bug fixes
[IRC.git] / Robust / src / Runtime / DSTM / interface / dstmserver.c
index 92103bb32787b7dbd73c24768480d526d4a26b53..99f48f070727b627bf42f88a2230f3122d89e79d 100644 (file)
@@ -367,10 +367,10 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
  * Following this it also receives a new control message from the co-ordinator and processes this message*/
 int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
                unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd) {
-       char control, sendctrl;
+       char control, sendctrl, retval;
        objheader_t *tmp_header;
        void *header;
-       int  i = 0, val, retval;
+       int  i = 0, val;
 
        /* Send reply to the Coordinator */
        if((retval = handleTransReq(fixed, transinfo, listmid, objread, modptr,acceptfd)) == 0 ) {
@@ -389,7 +389,10 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
                                free(modptr);
                        /* Unlock objects that was locked due to this transaction */
                        for(i = 0; i< transinfo->numlocked; i++) {
-                               header = mhashSearch(transinfo->objlocked[i]);// find the header address
+                               if((header = mhashSearch(transinfo->objlocked[i])) == NULL) {
+                                       printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);// find the header address
+                                       return 1;
+                               }
                                STATUS(((objheader_t *)header)) &= ~(LOCK);             
                        }
 
@@ -445,7 +448,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
 /* This function increments counters while running a voting decision on all objects involved 
  * in TRANS_REQUEST and If a TRANS_DISAGREE sends the response immediately back to the coordinator */
 char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr, int acceptfd) {
-       int val, i = 0;
+       int val, i = 0, j;
        unsigned short version;
        char control = 0, *ptr;
        unsigned int oid;
@@ -496,6 +499,16 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
                                        v_nomatch++;
                                        /* Send TRANS_DISAGREE to Coordinator */
                                        control = TRANS_DISAGREE;
+                                       if (objlocked > 0) {
+                                               for(j = 0; j < objlocked; j++) {
+                                                       if((headptr = mhashSearch(oidlocked[j])) == NULL) {
+                                                               printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+                                                               return 0;
+                                                       }
+                                                       STATUS(headptr) &= ~(LOCK);
+                                               }
+                                               free(oidlocked);
+                                       }
                                        if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
                                                perror("Error in sending control to the Coordinator\n");
                                                return 0;
@@ -512,15 +525,23 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
                                } else { /* If versions don't match ...HARD ABORT */
                                        v_nomatch++;
                                        control = TRANS_DISAGREE;
+                                       if (objlocked > 0) {
+                                               for(j = 0; j < objlocked; j++) {
+                                                       if((headptr = mhashSearch(oidlocked[j])) == NULL) {
+                                                               printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+                                                               return 0;
+                                                       }
+                                                       STATUS(headptr) &= ~(LOCK);
+                                               }
+                                               free(oidlocked);
+                                       }
+
                                        /* Send TRANS_DISAGREE to Coordinator */
                                        if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
                                                perror("Error in sending control to the Coordinator\n");
                                                return 0;
                                        }
-                                       if (objlocked > 0) {
-                                               STATUS(((objheader_t *)mobj)) &= ~(LOCK);
-                                               free(oidlocked);
-                                       }
+                                       
                                        return control;
                                }
                        }
@@ -528,22 +549,23 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
        }
        
        /* Decide what control message to send to Coordinator */
-       if ((val = decideCtrlMessage(fixed, transinfo, &v_matchnolock, &v_matchlock, &v_nomatch, &objnotfound, &objlocked,
+       if ((control = decideCtrlMessage(fixed, transinfo, &v_matchnolock, &v_matchlock, &v_nomatch, &objnotfound, &objlocked,
                                        modptr, oidnotfound, oidlocked, acceptfd)) == 0) {
                printf("Error: In decideCtrlMessage() %s, %d\n", __FILE__, __LINE__);
                return 0;
        }
        
-       return val;
+       return control;
 
 }
 /* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT
  * to send to Coordinator based on the votes of oids involved in the transaction */
-int decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock, 
+char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock, 
                int *v_nomatch, int *objnotfound, int *objlocked, void *modptr, 
                unsigned int *oidnotfound, unsigned int *oidlocked, int acceptfd) {
        int val;
        char control = 0;
+
        /* Condition to send TRANS_AGREE */
        if(*(v_matchnolock) == fixed->numread + fixed->nummod) {
                control = TRANS_AGREE;
@@ -556,16 +578,21 @@ int decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *
        /* Condition to send TRANS_SOFT_ABORT */
        if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
                control = TRANS_SOFT_ABORT;
-               char msg[]={TRANS_SOFT_ABORT, 0,0,0,0};
-               *((int*)&msg[1])= *(objnotfound);
 
                /* Send control message */
-               if((val = send(acceptfd, &msg, sizeof(msg),MSG_NOSIGNAL)) < sizeof(msg)) {
-                       perror("Error in sending no of objects that are not found\n");
+               if((val = send(acceptfd, &control, sizeof(char),MSG_NOSIGNAL)) < sizeof(char)) {
+                       perror("Error in sending TRANS_SOFT_ABORT control\n");
                        return 0;
                }
+
                /* Send number of oids not found and the missing oids if objects are missing in the machine */
                if(*(objnotfound) != 0) { 
+                       int msg[1];
+                       msg[0] = *(objnotfound);
+                       if((val = send(acceptfd, msg, sizeof(int) ,MSG_NOSIGNAL)) < sizeof(int)) {
+                               perror("Error in sending objects that are not found\n");
+                               return 0;
+                       }
                        int size = sizeof(unsigned int)* *(objnotfound);
                        if((val = send(acceptfd, oidnotfound, size ,MSG_NOSIGNAL)) < size) {
                                perror("Error in sending objects that are not found\n");
@@ -651,23 +678,13 @@ int prefetchReq(int acceptfd) {
        char control;
        objheader_t * header;
        int bytesRecvd;
-/*
-       unsigned int myIpAddr;
 
-#ifdef MAC
-       myIpAddr = getMyIpAddr("en1");
-#else
-       myIpAddr = getMyIpAddr("eth0");
-#endif
-*/
-       /* Repeatedly recv the oid and offset pairs sent for prefetch */
+       /* Repeatedly recv one oid and offset pair sent for prefetch */
        while(numbytes = recv((int)acceptfd, &length, sizeof(int), 0) != 0) {
                count++;
                if(length == -1)
                        break;
-               index = sizeof(unsigned int); // Index starts with sizeof  unsigned int because the 
-               // first 4 bytes are saved to send the
-               // size of the buffer (that is computed at the end of the loop)
+               index = 0;  
                bytesRecvd = 0;
                do {
                        bytesRecvd += recv((int)acceptfd, (char *)&oid +bytesRecvd,
@@ -684,6 +701,7 @@ int prefetchReq(int acceptfd) {
                        sum += n; 
                } while(sum < N && n != 0);     
 
+               bzero(&buffer, PRE_BUF_SIZE);
                /* Process each oid */
                if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
                        /* Save the oids not found in buffer for later use */
@@ -740,6 +758,7 @@ int prefetchReq(int acceptfd) {
                                }
                        }
                }
+
                /* Check for overflow in the buffer */
                if (index >= PRE_BUF_SIZE) {
                        printf("Error: Buffer array overflow %s, %d\n", __FILE__, __LINE__);
@@ -754,11 +773,14 @@ int prefetchReq(int acceptfd) {
                        }
                }
 
-               /* Add the buffer size into buffer as a parameter */
-               *((unsigned int *)buffer)=index;
+               //Send buffer size 
+               if((numbytes = send(acceptfd, &index, sizeof(unsigned int), MSG_NOSIGNAL)) < sizeof(unsigned int)) {
+                       perror("Error: in sending PREFETCH RESPONSE to Coordinator\n");
+                       return 1;
+               }
 
                /* Send the entire buffer with its size and oids found and not found */
-               if(send((int)acceptfd, &buffer, index, MSG_NOSIGNAL) < sizeof(index -1)) {
+               if(send((int)acceptfd, &buffer, index, MSG_NOSIGNAL) < sizeof(index)) {
                        perror("Error: sending oids found\n");
                        return 1;
                }
@@ -774,8 +796,8 @@ void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short
        int sd;
        struct sockaddr_in remoteAddr;
        int bytesSent;
-       int status, size, retry = 0;
-       
+       int status, size;
+
        int i = 0;
        while(i < numoid) {
                oid = *(oidarry + i);
@@ -786,6 +808,7 @@ void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short
                        /* Check to see if versions are same */
 checkversion:
                        if ((STATUS(header) & LOCK) != LOCK) {          
+                               //FIXME make locking atomic
                                STATUS(header) |= LOCK;
                                newversion = header->version;
                                if(newversion == *(versionarry + i)) {