Bug fixes
authoradash <adash>
Wed, 27 Feb 2008 18:43:56 +0000 (18:43 +0000)
committeradash <adash>
Wed, 27 Feb 2008 18:43:56 +0000 (18:43 +0000)
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/trans.c

index a6f3213cffb8d387b6c4e9bb792f3b5cf683985a..c250439ff9eb49cba3b1bff9c2ce421610e2810e 100644 (file)
@@ -205,7 +205,7 @@ void *dstmAccept(void *);
 int readClientReq(trans_commit_data_t *, int);
 int processClientReq(fixed_data_t *, trans_commit_data_t *,unsigned int *, char *, void *, unsigned int *, int);
 char handleTransReq(fixed_data_t *, trans_commit_data_t *, unsigned int *, char *, void *, int);
-int decideCtrlMessage(fixed_data_t *, trans_commit_data_t *, int *, int *, int *, int *, int *, void *, unsigned int *, unsigned int *, int);
+char decideCtrlMessage(fixed_data_t *, trans_commit_data_t *, int *, int *, int *, int *, int *, void *, unsigned int *, unsigned int *, int);
 int transCommitProcess(void *, unsigned int *, unsigned int *, int, int, int);
 void processReqNotify(unsigned int numoid, unsigned int *oid, unsigned short *version, unsigned int mid, unsigned int threadid);
 /* end server portion */
index a91134af64e1f049ff526572b1789fe5f61ffa7e..0c96b83f8aa5c24113428f529c13f2a08da50dee 100644 (file)
@@ -367,10 +367,10 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
  * Following this it also receives a new control message from the co-ordinator and processes this message*/
 int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
                unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd) {
-       char control, sendctrl;
+       char control, sendctrl, retval;
        objheader_t *tmp_header;
        void *header;
-       int  i = 0, val, retval;
+       int  i = 0, val;
 
        /* Send reply to the Coordinator */
        if((retval = handleTransReq(fixed, transinfo, listmid, objread, modptr,acceptfd)) == 0 ) {
@@ -389,7 +389,10 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
                                free(modptr);
                        /* Unlock objects that was locked due to this transaction */
                        for(i = 0; i< transinfo->numlocked; i++) {
-                               header = mhashSearch(transinfo->objlocked[i]);// find the header address
+                               if((header = mhashSearch(transinfo->objlocked[i])) == NULL) {
+                                       printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);// find the header address
+                                       return 1;
+                               }
                                STATUS(((objheader_t *)header)) &= ~(LOCK);             
                        }
 
@@ -496,6 +499,16 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
                                        v_nomatch++;
                                        /* Send TRANS_DISAGREE to Coordinator */
                                        control = TRANS_DISAGREE;
+                                       if (objlocked > 0) {
+                                               for(j = 0; j < objlocked; j++) {
+                                                       if((headptr = mhashSearch(oidlocked[j])) == NULL) {
+                                                               printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+                                                               return 0;
+                                                       }
+                                                       STATUS(headptr) &= ~(LOCK);
+                                               }
+                                               free(oidlocked);
+                                       }
                                        if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
                                                perror("Error in sending control to the Coordinator\n");
                                                return 0;
@@ -512,15 +525,23 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
                                } else { /* If versions don't match ...HARD ABORT */
                                        v_nomatch++;
                                        control = TRANS_DISAGREE;
+                                       if (objlocked > 0) {
+                                               for(j = 0; j < objlocked; j++) {
+                                                       if((headptr = mhashSearch(oidlocked[j])) == NULL) {
+                                                               printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+                                                               return 0;
+                                                       }
+                                                       STATUS(headptr) &= ~(LOCK);
+                                               }
+                                               free(oidlocked);
+                                       }
+
                                        /* Send TRANS_DISAGREE to Coordinator */
                                        if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
                                                perror("Error in sending control to the Coordinator\n");
                                                return 0;
                                        }
-                                       if (objlocked > 0) {
-                                               STATUS(((objheader_t *)mobj)) &= ~(LOCK);
-                                               free(oidlocked);
-                                       }
+                                       
                                        return control;
                                }
                        }
@@ -528,22 +549,23 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
        }
        
        /* Decide what control message to send to Coordinator */
-       if ((val = decideCtrlMessage(fixed, transinfo, &v_matchnolock, &v_matchlock, &v_nomatch, &objnotfound, &objlocked,
+       if ((control = decideCtrlMessage(fixed, transinfo, &v_matchnolock, &v_matchlock, &v_nomatch, &objnotfound, &objlocked,
                                        modptr, oidnotfound, oidlocked, acceptfd)) == 0) {
                printf("Error: In decideCtrlMessage() %s, %d\n", __FILE__, __LINE__);
                return 0;
        }
        
-       return val;
+       return control;
 
 }
 /* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT
  * to send to Coordinator based on the votes of oids involved in the transaction */
-int decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock, 
+char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock, 
                int *v_nomatch, int *objnotfound, int *objlocked, void *modptr, 
                unsigned int *oidnotfound, unsigned int *oidlocked, int acceptfd) {
        int val;
        char control = 0;
+
        /* Condition to send TRANS_AGREE */
        if(*(v_matchnolock) == fixed->numread + fixed->nummod) {
                control = TRANS_AGREE;
@@ -556,16 +578,21 @@ int decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *
        /* Condition to send TRANS_SOFT_ABORT */
        if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
                control = TRANS_SOFT_ABORT;
-               char msg[]={TRANS_SOFT_ABORT, 0,0,0,0};
-               *((int*)&msg[1])= *(objnotfound);
 
                /* Send control message */
-               if((val = send(acceptfd, &msg, sizeof(msg),MSG_NOSIGNAL)) < sizeof(msg)) {
-                       perror("Error in sending no of objects that are not found\n");
+               if((val = send(acceptfd, &control, sizeof(char),MSG_NOSIGNAL)) < sizeof(char)) {
+                       perror("Error in sending TRANS_SOFT_ABORT control\n");
                        return 0;
                }
+
                /* Send number of oids not found and the missing oids if objects are missing in the machine */
                if(*(objnotfound) != 0) { 
+                       int msg[1];
+                       msg[0] = *(objnotfound);
+                       if((val = send(acceptfd, msg, sizeof(int) ,MSG_NOSIGNAL)) < sizeof(int)) {
+                               perror("Error in sending objects that are not found\n");
+                               return 0;
+                       }
                        int size = sizeof(unsigned int)* *(objnotfound);
                        if((val = send(acceptfd, oidnotfound, size ,MSG_NOSIGNAL)) < size) {
                                perror("Error in sending objects that are not found\n");
@@ -781,6 +808,7 @@ void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short
                        /* Check to see if versions are same */
 checkversion:
                        if ((STATUS(header) & LOCK) != LOCK) {          
+                               //FIXME make locking atomic
                                STATUS(header) |= LOCK;
                                newversion = header->version;
                                if(newversion == *(versionarry + i)) {
index d560c825f0d981ffc3c3d5cb00211b329dfc6f99..c82ab10498f50d173abc4aac227c588ea3bc71cc 100644 (file)
@@ -434,6 +434,7 @@ int transCommit(transrecord_t *record) {
        char localstat = 0;
 
 
+
        /* Look through all the objects in the transaction record and make piles 
         * for each machine involved in the transaction*/
        pile_ptr = pile = createPiles(record);
@@ -623,7 +624,6 @@ int transCommit(transrecord_t *record) {
 void *transRequest(void *threadarg) {
        int sd, i, n;
        struct sockaddr_in serv_addr;
-       struct hostent *server;
        thread_data_array_t *tdata;
        objheader_t *headeraddr;
        char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
@@ -722,11 +722,9 @@ void *transRequest(void *threadarg) {
                pthread_exit(NULL);
        }
 
-       if ((retval = recv((int)sd, &control, sizeof(char), 0))<= 0) {
-               printf("Error: In receiving control %s,%d\n", __FILE__, __LINE__);
-               close(sd);
-               pthread_exit(NULL);
-       }
+       do {
+          retval = recv((int)sd, &control, sizeof(char), 0);
+       } while (retval < sizeof(char));
 
        if(control == TRANS_UNSUCESSFUL) {
                //printf("DEBUG-> TRANS_ABORTED\n");
@@ -798,9 +796,17 @@ char sendResponse(thread_data_array_t *tdata, int sd) {
        char *ptr, retval = 0;
        unsigned int *oidnotfound;
 
+       control = *(tdata->replyctrl);
+       if (send(sd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
+               perror("Error sending ctrl message for participant\n");
+               return 0;
+       }
+
+       //FIXME read missing objects 
        /* If the decided response is due to a soft abort and missing objects at the Participant's side */
+       /*
        if(tdata->recvmsg[tdata->thread_id].rcv_status == TRANS_SOFT_ABORT) {
-               /* Read list of objects missing */
+               // Read list of objects missing  
                if((read(sd, &oidcount, sizeof(int)) != 0) && (oidcount != 0)) {
                        N = oidcount * sizeof(unsigned int);
                        if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
@@ -815,18 +821,15 @@ char sendResponse(thread_data_array_t *tdata, int sd) {
                }
                retval =  TRANS_SOFT_ABORT;
        }
+       */
+
        /* If the decided response is TRANS_ABORT */
        if(*(tdata->replyctrl) == TRANS_ABORT) {
                retval = TRANS_ABORT;
-       } else if(*(tdata->replyctrl) == TRANS_COMMIT) { /* If the decided response is TRANS_COMMIT */
+       } else if(*(tdata->replyctrl) == TRANS_COMMIT) { /* If the decided response is TRANS_COMMIT */ 
                retval = TRANS_COMMIT;
        }
-
-       if (send(sd, tdata->replyctrl, sizeof(char),MSG_NOSIGNAL) < sizeof(char)) {
-               perror("Error sending ctrl message for participant\n");
-               return 0;
-       }
-
+       
        return retval;
 }
 
@@ -838,7 +841,6 @@ char sendResponse(thread_data_array_t *tdata, int sd) {
 void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
        int sd, size, val;
        struct sockaddr_in serv_addr;
-       struct hostent *server;
        char control;
        char machineip[16];
        objheader_t *h;
@@ -1495,10 +1497,10 @@ void *mcqProcess(void *threadid) {
 
 void sendPrefetchReq(prefetchpile_t *mcpilenode) {
        int sd, i, off, len, endpair, count = 0;
-       struct sockaddr_in serv_addr;
-       struct hostent *server;
+       struct sockaddr_in remoteAddr;
        char machineip[16], control;
        objpile_t *tmp;
+       unsigned int mid;
 
        /* Send Trans Prefetch Request */
        if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
@@ -1506,16 +1508,17 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode) {
                return;
        }
 
-       bzero((char*) &serv_addr, sizeof(serv_addr));
-       serv_addr.sin_family = AF_INET;
-       serv_addr.sin_port = htons(LISTEN_PORT);
-       midtoIP(mcpilenode->mid ,machineip);
-       machineip[15] = '\0';
-       serv_addr.sin_addr.s_addr = inet_addr(machineip);
+       mid = mcpilenode->mid;
+
+       bzero(&remoteAddr, sizeof(remoteAddr));
+       remoteAddr.sin_family = AF_INET;
+       remoteAddr.sin_port = htons(LISTEN_PORT);
+       remoteAddr.sin_addr.s_addr = htonl(mid);
 
        /* Open Connection */
-       if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
-               perror("Error in connect for SEND_PREFETCH_REQUEST\n");
+       if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
+               printf("%s():error %d connecting to %s:%d\n", __func__, errno,
+                       inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
                close(sd);
                return;
        }
@@ -1537,14 +1540,11 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode) {
                char oidnoffset[len];
                bzero(oidnoffset, len);
                *((unsigned int*)oidnoffset) = len;
-               //memcpy(oidnoffset, &len, sizeof(int));
                off = sizeof(int);
                *((unsigned int *)((char *)oidnoffset + off)) = tmp->oid;
-               //memcpy(oidnoffset + off, &tmp->oid, sizeof(unsigned int));
                off += sizeof(unsigned int);
                for(i = 0; i < tmp->numoffset; i++) {
                        *((unsigned short*)((char *)oidnoffset + off)) = tmp->offset[i];
-                       //memcpy(oidnoffset + off, &(tmp->offset[i]), sizeof(short));
                        off+=sizeof(unsigned short);
                }