From 0e58191e5627ba45b808b87ed04d6d8fa3f8b9f0 Mon Sep 17 00:00:00 2001 From: adash Date: Wed, 27 Feb 2008 18:43:56 +0000 Subject: [PATCH] Bug fixes --- Robust/src/Runtime/DSTM/interface/dstm.h | 2 +- .../src/Runtime/DSTM/interface/dstmserver.c | 56 ++++++++++++++----- Robust/src/Runtime/DSTM/interface/trans.c | 56 +++++++++---------- 3 files changed, 71 insertions(+), 43 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index a6f3213c..c250439f 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -205,7 +205,7 @@ void *dstmAccept(void *); int readClientReq(trans_commit_data_t *, int); int processClientReq(fixed_data_t *, trans_commit_data_t *,unsigned int *, char *, void *, unsigned int *, int); char handleTransReq(fixed_data_t *, trans_commit_data_t *, unsigned int *, char *, void *, int); -int decideCtrlMessage(fixed_data_t *, trans_commit_data_t *, int *, int *, int *, int *, int *, void *, unsigned int *, unsigned int *, int); +char decideCtrlMessage(fixed_data_t *, trans_commit_data_t *, int *, int *, int *, int *, int *, void *, unsigned int *, unsigned int *, int); int transCommitProcess(void *, unsigned int *, unsigned int *, int, int, int); void processReqNotify(unsigned int numoid, unsigned int *oid, unsigned short *version, unsigned int mid, unsigned int threadid); /* end server portion */ diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index a91134af..0c96b83f 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -367,10 +367,10 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { * Following this it also receives a new control message from the co-ordinator and processes this message*/ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd) { - char control, sendctrl; + char control, sendctrl, retval; objheader_t *tmp_header; void *header; - int i = 0, val, retval; + int i = 0, val; /* Send reply to the Coordinator */ if((retval = handleTransReq(fixed, transinfo, listmid, objread, modptr,acceptfd)) == 0 ) { @@ -389,7 +389,10 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, free(modptr); /* Unlock objects that was locked due to this transaction */ for(i = 0; i< transinfo->numlocked; i++) { - header = mhashSearch(transinfo->objlocked[i]);// find the header address + if((header = mhashSearch(transinfo->objlocked[i])) == NULL) { + printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);// find the header address + return 1; + } STATUS(((objheader_t *)header)) &= ~(LOCK); } @@ -496,6 +499,16 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne v_nomatch++; /* Send TRANS_DISAGREE to Coordinator */ control = TRANS_DISAGREE; + if (objlocked > 0) { + for(j = 0; j < objlocked; j++) { + if((headptr = mhashSearch(oidlocked[j])) == NULL) { + printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); + return 0; + } + STATUS(headptr) &= ~(LOCK); + } + free(oidlocked); + } if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) { perror("Error in sending control to the Coordinator\n"); return 0; @@ -512,15 +525,23 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne } else { /* If versions don't match ...HARD ABORT */ v_nomatch++; control = TRANS_DISAGREE; + if (objlocked > 0) { + for(j = 0; j < objlocked; j++) { + if((headptr = mhashSearch(oidlocked[j])) == NULL) { + printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); + return 0; + } + STATUS(headptr) &= ~(LOCK); + } + free(oidlocked); + } + /* Send TRANS_DISAGREE to Coordinator */ if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) { perror("Error in sending control to the Coordinator\n"); return 0; } - if (objlocked > 0) { - STATUS(((objheader_t *)mobj)) &= ~(LOCK); - free(oidlocked); - } + return control; } } @@ -528,22 +549,23 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne } /* Decide what control message to send to Coordinator */ - if ((val = decideCtrlMessage(fixed, transinfo, &v_matchnolock, &v_matchlock, &v_nomatch, &objnotfound, &objlocked, + if ((control = decideCtrlMessage(fixed, transinfo, &v_matchnolock, &v_matchlock, &v_nomatch, &objnotfound, &objlocked, modptr, oidnotfound, oidlocked, acceptfd)) == 0) { printf("Error: In decideCtrlMessage() %s, %d\n", __FILE__, __LINE__); return 0; } - return val; + return control; } /* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT * to send to Coordinator based on the votes of oids involved in the transaction */ -int decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock, +char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock, int *v_nomatch, int *objnotfound, int *objlocked, void *modptr, unsigned int *oidnotfound, unsigned int *oidlocked, int acceptfd) { int val; char control = 0; + /* Condition to send TRANS_AGREE */ if(*(v_matchnolock) == fixed->numread + fixed->nummod) { control = TRANS_AGREE; @@ -556,16 +578,21 @@ int decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int * /* Condition to send TRANS_SOFT_ABORT */ if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) { control = TRANS_SOFT_ABORT; - char msg[]={TRANS_SOFT_ABORT, 0,0,0,0}; - *((int*)&msg[1])= *(objnotfound); /* Send control message */ - if((val = send(acceptfd, &msg, sizeof(msg),MSG_NOSIGNAL)) < sizeof(msg)) { - perror("Error in sending no of objects that are not found\n"); + if((val = send(acceptfd, &control, sizeof(char),MSG_NOSIGNAL)) < sizeof(char)) { + perror("Error in sending TRANS_SOFT_ABORT control\n"); return 0; } + /* Send number of oids not found and the missing oids if objects are missing in the machine */ if(*(objnotfound) != 0) { + int msg[1]; + msg[0] = *(objnotfound); + if((val = send(acceptfd, msg, sizeof(int) ,MSG_NOSIGNAL)) < sizeof(int)) { + perror("Error in sending objects that are not found\n"); + return 0; + } int size = sizeof(unsigned int)* *(objnotfound); if((val = send(acceptfd, oidnotfound, size ,MSG_NOSIGNAL)) < size) { perror("Error in sending objects that are not found\n"); @@ -781,6 +808,7 @@ void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short /* Check to see if versions are same */ checkversion: if ((STATUS(header) & LOCK) != LOCK) { + //FIXME make locking atomic STATUS(header) |= LOCK; newversion = header->version; if(newversion == *(versionarry + i)) { diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index d560c825..c82ab104 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -434,6 +434,7 @@ int transCommit(transrecord_t *record) { char localstat = 0; + /* Look through all the objects in the transaction record and make piles * for each machine involved in the transaction*/ pile_ptr = pile = createPiles(record); @@ -623,7 +624,6 @@ int transCommit(transrecord_t *record) { void *transRequest(void *threadarg) { int sd, i, n; struct sockaddr_in serv_addr; - struct hostent *server; thread_data_array_t *tdata; objheader_t *headeraddr; char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol; @@ -722,11 +722,9 @@ void *transRequest(void *threadarg) { pthread_exit(NULL); } - if ((retval = recv((int)sd, &control, sizeof(char), 0))<= 0) { - printf("Error: In receiving control %s,%d\n", __FILE__, __LINE__); - close(sd); - pthread_exit(NULL); - } + do { + retval = recv((int)sd, &control, sizeof(char), 0); + } while (retval < sizeof(char)); if(control == TRANS_UNSUCESSFUL) { //printf("DEBUG-> TRANS_ABORTED\n"); @@ -798,9 +796,17 @@ char sendResponse(thread_data_array_t *tdata, int sd) { char *ptr, retval = 0; unsigned int *oidnotfound; + control = *(tdata->replyctrl); + if (send(sd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) { + perror("Error sending ctrl message for participant\n"); + return 0; + } + + //FIXME read missing objects /* If the decided response is due to a soft abort and missing objects at the Participant's side */ + /* if(tdata->recvmsg[tdata->thread_id].rcv_status == TRANS_SOFT_ABORT) { - /* Read list of objects missing */ + // Read list of objects missing if((read(sd, &oidcount, sizeof(int)) != 0) && (oidcount != 0)) { N = oidcount * sizeof(unsigned int); if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) { @@ -815,18 +821,15 @@ char sendResponse(thread_data_array_t *tdata, int sd) { } retval = TRANS_SOFT_ABORT; } + */ + /* If the decided response is TRANS_ABORT */ if(*(tdata->replyctrl) == TRANS_ABORT) { retval = TRANS_ABORT; - } else if(*(tdata->replyctrl) == TRANS_COMMIT) { /* If the decided response is TRANS_COMMIT */ + } else if(*(tdata->replyctrl) == TRANS_COMMIT) { /* If the decided response is TRANS_COMMIT */ retval = TRANS_COMMIT; } - - if (send(sd, tdata->replyctrl, sizeof(char),MSG_NOSIGNAL) < sizeof(char)) { - perror("Error sending ctrl message for participant\n"); - return 0; - } - + return retval; } @@ -838,7 +841,6 @@ char sendResponse(thread_data_array_t *tdata, int sd) { void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { int sd, size, val; struct sockaddr_in serv_addr; - struct hostent *server; char control; char machineip[16]; objheader_t *h; @@ -1495,10 +1497,10 @@ void *mcqProcess(void *threadid) { void sendPrefetchReq(prefetchpile_t *mcpilenode) { int sd, i, off, len, endpair, count = 0; - struct sockaddr_in serv_addr; - struct hostent *server; + struct sockaddr_in remoteAddr; char machineip[16], control; objpile_t *tmp; + unsigned int mid; /* Send Trans Prefetch Request */ if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { @@ -1506,16 +1508,17 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode) { return; } - bzero((char*) &serv_addr, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_port = htons(LISTEN_PORT); - midtoIP(mcpilenode->mid ,machineip); - machineip[15] = '\0'; - serv_addr.sin_addr.s_addr = inet_addr(machineip); + mid = mcpilenode->mid; + + bzero(&remoteAddr, sizeof(remoteAddr)); + remoteAddr.sin_family = AF_INET; + remoteAddr.sin_port = htons(LISTEN_PORT); + remoteAddr.sin_addr.s_addr = htonl(mid); /* Open Connection */ - if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) { - perror("Error in connect for SEND_PREFETCH_REQUEST\n"); + if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) { + printf("%s():error %d connecting to %s:%d\n", __func__, errno, + inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT); close(sd); return; } @@ -1537,14 +1540,11 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode) { char oidnoffset[len]; bzero(oidnoffset, len); *((unsigned int*)oidnoffset) = len; - //memcpy(oidnoffset, &len, sizeof(int)); off = sizeof(int); *((unsigned int *)((char *)oidnoffset + off)) = tmp->oid; - //memcpy(oidnoffset + off, &tmp->oid, sizeof(unsigned int)); off += sizeof(unsigned int); for(i = 0; i < tmp->numoffset; i++) { *((unsigned short*)((char *)oidnoffset + off)) = tmp->offset[i]; - //memcpy(oidnoffset + off, &(tmp->offset[i]), sizeof(short)); off+=sizeof(unsigned short); } -- 2.34.1