From adc12595ec51a2b2d3195a5fd37b4864cd4e09e9 Mon Sep 17 00:00:00 2001 From: jihoonl Date: Tue, 22 Jun 2010 04:09:44 +0000 Subject: [PATCH] recovery.. still needs to fix --- .../Runtime/DSTM/interface_recovery/dstm.h | 9 +- .../DSTM/interface_recovery/dstmserver.c | 392 ++++++++++-------- .../Runtime/DSTM/interface_recovery/trans.c | 323 +++++++++------ .../DSTM/interface_recovery/translist.c | 32 +- .../DSTM/interface_recovery/translist.h | 9 +- 5 files changed, 442 insertions(+), 323 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface_recovery/dstm.h b/Robust/src/Runtime/DSTM/interface_recovery/dstm.h index ece2f51c..80c42f64 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/dstm.h +++ b/Robust/src/Runtime/DSTM/interface_recovery/dstm.h @@ -282,12 +282,11 @@ unsigned int updateLiveHosts(); void updateLiveHostsList(int mid); int updateLiveHostsCommit(); void receiveNewHostLists(int accept); -void stopTransactions(int TRANS_FLAG); +int stopTransactions(int TRANS_FLAG,unsigned int epoch_num); void sendMyList(int); void sendTransList(int acceptfd); int receiveTransList(int acceptfd); int combineTransactionList(tlist_node_t* tArray,int size); -char inspectTransaction(char control,unsigned int transid,char* debug,int TRANS_FLAG); void respondToLeader(); void setLocateObjHosts(); @@ -311,12 +310,12 @@ void notifyLeaderDeadMachine(unsigned int deadHost); void restoreDuplicationState(unsigned int deadHost,unsigned int epoch_num); int* getSocketLists(); void freeSocketLists(int*); -int inspectEpoch(unsigned int); +int inspectEpoch(unsigned int,const char*); int pingMachines(unsigned int epoch_num,int* sdlist,tlist_t**); int releaseNewLists(unsigned int epoch_num,int* sdlist,tlist_t*); int duplicateLostObjects(unsigned int epoch_num,int* sdlist); void restartTransactions(unsigned int epoch_num,int* sdlist); -int makeTransactionLists(tlist_t**,int); +int makeTransactionLists(tlist_t**,int sd,unsigned int epoch_num); int computeLiveHosts(int); void waitForAllMachine(); int readDuplicateObjs(int); @@ -335,6 +334,7 @@ void *dstmAccept(void *); int readClientReq(trans_commit_data_t *, int); int processClientReq(fixed_data_t *, trans_commit_data_t *,unsigned int *, char *, void *, unsigned int *, int); +void commitObjects(char control,fixed_data_t* fixed,trans_commit_data_t* transinfo,void* modptr,unsigned int* oidmod,int acceptfd); char checkDecision(unsigned int); char receiveDecisionFromBackup(unsigned int,int,unsigned int*); char handleTransReq(fixed_data_t *, trans_commit_data_t *, unsigned int *, char *, void *, int); @@ -378,6 +378,7 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int); objheader_t *transCreateObj(unsigned int); //returns oid header unsigned int locateBackupMachine(unsigned int oid); int transCommit(); //return 0 if successful +void commitMessages(unsigned int epoch_num,int* sdlist,unsigned int deadsd,int pilecount,trans_req_data_t* tosend,char finalResponse,char treplyretry,trans_commit_data_t transinfo); void *transRequest(void *); //the C routine that the thread will execute when TRANS_REQUEST begins char decideResponse(char *, char *, int); // Coordinator decides what response to send to the participant void *getRemoteObj(unsigned int, unsigned int); // returns object header from main object store after object is copied into it from remote machine diff --git a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c index 7c7fb5f7..b28d3140 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c @@ -38,6 +38,7 @@ extern int *liveHosts; extern int numLiveHostsInSystem; int clearNotifyListFlag; pthread_mutex_t clearNotifyList_mutex; +pthread_mutex_t translist_mutex; tlist_t* transList; int okCommit; // machine flag @@ -80,6 +81,7 @@ int dstmInit(void) { pthread_mutex_init(&liveHosts_mutex, NULL); pthread_mutex_init(&recovery_mutex, NULL); pthread_mutex_init(&clearNotifyList_mutex,NULL); + pthread_mutex_init(&translist_mutex,NULL); #endif if (mhashCreate(MHASH_SIZE, MLOADFACTOR)) @@ -315,6 +317,7 @@ void *dstmAccept(void *acceptfd) { while(1) { int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char)); //int ret=recv_data_errorcode_buf((int)acceptfd, &readbuffer, &control, sizeof(char)); +// printf("%s -> Received control = %d\n",__func__,control); dupeptr = NULL; if (ret==0) @@ -514,28 +517,31 @@ void *dstmAccept(void *acceptfd) { #endif #ifdef RECOVERY case REQUEST_TRANS_WAIT: - { + { unsigned int new_leader_index; recv_data((int)acceptfd,&epoch_num,sizeof(unsigned int)); recv_data((int)acceptfd,&new_leader_index,sizeof(unsigned int)); - if(inspectEpoch(epoch_num) < 0) { + if(inspectEpoch(epoch_num,"REQUEST_TRANS_WAIT") < 0) { response = RESPOND_HIGHER_EPOCH; send_data((int)acceptfd,&response,sizeof(char)); } else { printf("Got new Leader! : %d\n",epoch_num); - - stopTransactions(TRANS_BEFORE); - pthread_mutex_lock(&recovery_mutex); currentEpoch = epoch_num; + okCommit = TRANS_BEFORE; leader_index = new_leader_index; pthread_mutex_unlock(&recovery_mutex); - - response = RESPOND_TRANS_WAIT; - send_data((int)acceptfd,&response,sizeof(char)); - sendMyList((int)acceptfd); + if(stopTransactions(TRANS_BEFORE,epoch_num) < 0) { + response = RESPOND_HIGHER_EPOCH; + send_data((int)acceptfd,&response,sizeof(char)); + } + else { + response = RESPOND_TRANS_WAIT; + send_data((int)acceptfd,&response,sizeof(char)); + sendMyList((int)acceptfd); + } } } break; @@ -545,14 +551,16 @@ void *dstmAccept(void *acceptfd) { { recv_data((int)acceptfd,&epoch_num,sizeof(unsigned int)); - if(inspectEpoch(epoch_num) < 0) { + if(inspectEpoch(epoch_num,"RELEASE_NEW_LIST") < 0) { response = RESPOND_HIGHER_EPOCH; } else { response = receiveNewList((int)acceptfd); - stopTransactions(TRANS_AFTER); + if(stopTransactions(TRANS_AFTER,epoch_num) < -1) + response = RESPOND_HIGHER_EPOCH; } + printf("After stop transaction\n"); send_data((int)acceptfd,&response,sizeof(char)); } break; @@ -561,7 +569,7 @@ void *dstmAccept(void *acceptfd) { recv_data((int)acceptfd,&epoch_num,sizeof(char)); - if(inspectEpoch(epoch_num) < 0) break; + if(inspectEpoch(epoch_num,"REQUEST_TRANS_RESTART") < 0) break; pthread_mutex_lock(&liveHosts_mutex); printf("RESTART!!!\n"); @@ -598,7 +606,7 @@ void *dstmAccept(void *acceptfd) { recv_data((int)acceptfd,&epoch_num,sizeof(unsigned int)); - if(inspectEpoch(epoch_num) < 0) { + if(inspectEpoch(epoch_num,"REQUEST_DUPLICATE") < 0) { break; } @@ -731,10 +739,8 @@ int readDuplicateObjs(int acceptfd) { return -1; } - printf("%s -> PAss this point\n",__func__); ptr = dupeptr; - printf("%s -> numoid = %u\n",__func__,numoid); for(i = 0; i < numoid; i++) { header = (objheader_t *)ptr; oid = OID(header); @@ -824,10 +830,6 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { fixed.control = TRANS_REQUEST; timeout = recv_data((int)acceptfd, ptr+1, size); -#ifdef RECOVERY - transList = tlistInsertNode(transList,fixed.transid,TRYING_TO_COMMIT,TRANS_OK); -#endif - /* Read list of mids */ int mcount = fixed.mcount; size = mcount * sizeof(unsigned int); @@ -915,109 +917,155 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, char control, sendctrl, retval; objheader_t *tmp_header; - void *header; - int i = 0, val; + int i = 0; + unsigned int epoch_num; + tlist_node_t* tNode; +#ifdef DEBUG + printf("%s -> Enter\n",__func__); +#endif +// printf("%s -> transID : %u\n",__func__,fixed->transid); + if(inspectEpoch(fixed->epoch_num,"procesClient1") < 0) { + printf("%s-> Higher Epoch current epoch = %u epoch %u\n",__func__,currentEpoch,fixed->epoch_num); + control = RESPOND_HIGHER_EPOCH; + send_data((int)acceptfd,&control,sizeof(char)); + } /* Send reply to the Coordinator */ - if((retval = handleTransReq(fixed, transinfo, listmid, objread, modptr,acceptfd)) == 0 ) { + else if((retval = handleTransReq(fixed, transinfo, listmid, objread, modptr,acceptfd)) == 0 ) { printf("Error: In handleTransReq() %s, %d\n", __FILE__, __LINE__); printf("DEBUG-> Exiting processClientReq, line = %d\n", __LINE__); return 1; } // printf("%s -> Waiting for transID : %u\n",__func__,fixed->transid); + int timeout1 = recv_data((int)acceptfd, &control, sizeof(char)); + int timeout2 = recv_data((int)acceptfd, &epoch_num, sizeof(unsigned int)); - int timeout = recv_data((int)acceptfd, &control, sizeof(char)); - -#ifdef RECOVERY - if(timeout < 0) { // timeout. failed to receiving data from coordinator - control = -1; + if(timeout1 < 0 || timeout2 < 0) { // timeout. failed to receiving data from coordinator + control = DECISION_LOST; } + + pthread_mutex_lock(&translist_mutex); + transList = tlistInsertNode(transList,fixed->transid,control,TRYING_TO_COMMIT,epoch_num); + pthread_mutex_unlock(&translist_mutex); + + pthread_mutex_lock(&translist_mutex); + tNode = tlistSearch(transList,fixed->transid); + pthread_mutex_unlock(&translist_mutex); + // check if it is allowed to commit - control = inspectTransaction(control,fixed->transid,"processClientReq",TRANS_BEFORE); - thashInsert(fixed->transid, control); + do { + tNode->status = TRANS_INPROGRESS; + if(okCommit != TRANS_BEFORE) { + if(inspectEpoch(tNode->epoch_num,"processCleint2") > 0) { + tNode->status = TRANS_INPROGRESS; + thashInsert(fixed->transid,tNode->decision); + commitObjects(tNode->decision,fixed,transinfo,modptr,oidmod,acceptfd); + tNode->status = TRANS_AFTER; + } + if(okCommit == TRANS_AFTER) { + printf("%s -> 11 \ttransID : %u decision : %d status : %d \n",__func__,tNode->transid,tNode->decision,tNode->status); + sleep(3); + } + } + else { + tNode->status = TRYING_TO_COMMIT; + printf("%s -> Waiting!! \ttransID : %u decision : %d status : %d \n",__func__,tNode->transid,tNode->decision,tNode->status); + sleep(3); + randomdelay(); + } + + }while(tNode->status != TRANS_AFTER); + + if(okCommit == TRANS_AFTER) + { + printf("%s -> TRANS_AFTER!! \ttransID : %u decision : %d status : %d \n",__func__,tNode->transid,tNode->decision,tNode->status); + printf("%s -> Before removing\n",__func__); + } + + + pthread_mutex_lock(&translist_mutex); + transList = tlistRemove(transList,fixed->transid); + pthread_mutex_unlock(&translist_mutex); + + if(okCommit == TRANS_AFTER) + printf("%s -> After removing\n",__func__); + + /* Free memory */ + if (transinfo->objlocked != NULL) { + free(transinfo->objlocked); + } + if (transinfo->objnotfound != NULL) { + free(transinfo->objnotfound); + } +#ifdef DEBUG + printf("%s-> Exit\n", __func__); #endif + return 0; +} + +void commitObjects(char control,fixed_data_t* fixed,trans_commit_data_t* transinfo,void* modptr,unsigned int* oidmod,int acceptfd) +{ + void *header; + int val; + int i; + switch(control) { - case TRANS_ABORT: - if (fixed->nummod > 0) - free(modptr); - /* Unlock objects that was locked due to this transaction */ - int useWriteUnlock = 0; + case TRANS_ABORT: + if (fixed->nummod > 0) + free(modptr); + /* Unlock objects that was locked due to this transaction */ + int useWriteUnlock = 0; for(i = 0; i< transinfo->numlocked; i++) { - if(transinfo->objlocked[i] == -1) { - useWriteUnlock = 1; - continue; + if(transinfo->objlocked[i] == -1) { + useWriteUnlock = 1; + continue; } if((header = mhashSearch(transinfo->objlocked[i])) == NULL) { - printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); // find the header address - printf("%s-> Exiting, line:%d\n", __func__, __LINE__); - return 1; + printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); // find the header address + printf("%s-> Exiting, line:%d\n", __func__, __LINE__); + exit(0); + return ; } if(useWriteUnlock) { - write_unlock(STATUSPTR(header)); + write_unlock(STATUSPTR(header)); } else { - read_unlock(STATUSPTR(header)); - } + read_unlock(STATUSPTR(header)); + } } break; - - case TRANS_COMMIT: + case TRANS_COMMIT: /* insert received control into thash for another transaction*/ /* Invoke the transCommit process() */ - if((val = transCommitProcess(modptr, oidmod, transinfo->objlocked, fixed->nummod, transinfo->numlocked, (int)acceptfd)) != 0) { - printf("Error: In transCommitProcess() %s, %d\n", __FILE__, __LINE__); + if((val = transCommitProcess(modptr, oidmod, transinfo->objlocked, fixed->nummod, transinfo->numlocked, (int)acceptfd)) != 0) { + printf("Error: In transCommitProcess() %s, %d\n", __FILE__, __LINE__); /* Free memory */ if (transinfo->objlocked != NULL) { - free(transinfo->objlocked); + free(transinfo->objlocked); } if (transinfo->objnotfound != NULL) { - free(transinfo->objnotfound); - } - printf("%s-> Exiting, line:%d\n", __func__, __LINE__); - return 1; + free(transinfo->objnotfound); + } + printf("%s-> Exiting, line:%d\n", __func__, __LINE__); + exit(0); + return; } - - - break; - - case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING: - break; - - default: - printf("Error: No response to TRANS_AGREE OR DISAGREE protocol %s, %d\n", __FILE__, __LINE__); - //TODO Use fixed.trans_id TID since Client may have died - break; - } - -#ifdef RECOVERY - inspectTransaction(control,fixed->transid,"processClientReq",TRANS_AFTER); - - tlist_node_t* tNode = tlistSearch(transList,fixed->transid); - tNode->status = TRANS_OK; - - pthread_mutex_lock(&clearNotifyList_mutex); - transList = tlistRemove(transList,fixed->transid); - pthread_mutex_unlock(&clearNotifyList_mutex); - -#endif - - /* Free memory */ - if (transinfo->objlocked != NULL) { - free(transinfo->objlocked); - } - if (transinfo->objnotfound != NULL) { - free(transinfo->objnotfound); + break; + case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING: + break; + default: + printf("%s : No response to TRANS_AGREE OR DISAGREE protocol - transID = %u, control = %d\a\n",__func__,fixed->transid); + //TODO Use fixed.trans_id TID since Client may have died + break; } -#ifdef DEBUG - printf("%s-> Exiting, line:%d\n", __func__, __LINE__); -#endif - - return 0; -} - -#ifdef RECOVERY + + return; +} + + + char checkDecision(unsigned int transID) { #ifdef DEBUG @@ -1031,7 +1079,6 @@ char checkDecision(unsigned int transID) else return response; } -#endif /* This function increments counters while running a voting decision on all objects involved * in TRANS_REQUEST and If a TRANS_DISAGREE sends the response immediately back to the coordinator */ @@ -1042,6 +1089,9 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne unsigned int oid; unsigned int *oidnotfound, *oidlocked, *oidvernotmatch; objheader_t *headptr; +#ifdef DEBUG + printf("%s -> Enter\n",__func__); +#endif /* Counters and arrays to formulate decision on control message to be sent */ oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); @@ -1135,27 +1185,6 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne offset += size; } #endif - /* - if (objlocked > 0) { - int useWriteUnlock = 0; - for(j = 0; j < objlocked; j++) { - if(oidlocked[j] == -1) { - useWriteUnlock = 1; - continue; - } - if((headptr = mhashSearch(oidlocked[j])) == NULL) { - printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); - return 0; - } - if(useWriteUnlock) { - write_unlock(STATUSPTR(headptr)); - } else { - read_unlock(STATUSPTR(headptr)); - } - } - free(oidlocked); - } - */ #ifdef DEBUG printf("%s -> control = %d, file = %s, line = %d\n", __func__,(int)control, __FILE__, __LINE__); @@ -1165,6 +1194,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne printf("control = %d\n",control); control=TRANS_DISAGREE; + printf("%s -> Sent message!\n",__func__); send_data(acceptfd, &control, sizeof(char)); #ifdef CACHE send_data(acceptfd, &numBytes, sizeof(int)); @@ -1183,6 +1213,9 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne printf("Error: In decideCtrlMessage() %s, %d\n", __FILE__, __LINE__); return 0; } +#ifdef DEBUG + printf("%s -> Exit\n",__func__); +#endif return control; } @@ -1414,6 +1447,9 @@ char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int unsigned int *oidnotfound, unsigned int *oidlocked, int acceptfd) { int val; char control = 0; +#ifdef DEBUG + printf("%s -> Enter\n",__func__); +#endif /* Condition to send TRANS_AGREE */ if(*(v_matchnolock) == fixed->numread + fixed->nummod) { @@ -1436,16 +1472,6 @@ char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int #endif /* Send control message */ send_data(acceptfd, &control, sizeof(char)); - - - /* FIXME how to send objs Send number of oids not found and the missing oids if objects are missing in the machine */ - /*if(*(objnotfound) != 0) { - int msg[1]; - msg[0] = *(objnotfound); - send_data(acceptfd, &msg, sizeof(int)); - int size = sizeof(unsigned int)* *(objnotfound); - send_data(acceptfd, oidnotfound, size); - }*/ } /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process @@ -1455,6 +1481,10 @@ char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int transinfo->modptr = modptr; transinfo->numlocked = *(objlocked); transinfo->numnotfound = *(objnotfound); + +#ifdef DEBUG + printf("%s -> Exit\n",__func__); +#endif return control; } @@ -1816,37 +1846,62 @@ void receiveNewHostLists(int acceptfd) } /* wait until all transaction waits for leader's decision */ -void stopTransactions(int TRANS_FLAG) +int stopTransactions(int TRANS_FLAG,unsigned int epoch_num) { // printf("%s - > Enter flag :%d\n",__func__,TRANS_FLAG); int size = transList->size; int i; + int flag; tlist_node_t* walker; - - pthread_mutex_lock(&liveHosts_mutex); - okCommit = TRANS_FLAG; - pthread_mutex_unlock(&liveHosts_mutex); - /* make sure that all transactions are stopped */ - pthread_mutex_lock(&clearNotifyList_mutex); + if(TRANS_FLAG == TRANS_BEFORE) { + okCommit = TRANS_BEFORE; + /* make sure that all transactions are stopped */ + do { + transList->flag = 0; + walker = transList->head; - do { - transList->flag = 0; - walker = transList->head; + while(walker) + { + // locking + while(walker->status == TRANS_INPROGRESS) { + printf("%s ->transid : %u - decision %d Status : %d Waitflag = %d\n",__func__,walker->transid,walker->decision,walker->status,TRANS_FLAG); + if(inspectEpoch(epoch_num,"stopTrans_Before") < 0) + return -1; + sleep(3); + } + walker = walker->next; + } - while(walker) - { - // locking - while(!(walker->status == TRANS_FLAG || walker->status == TRANS_OK)) { -// printf("%s -> Waiting for %u - Status : %d tHash : %d\n",__func__,walker->transid,walker->status,thashSearch(walker->transid)); - randomdelay(); + pthread_mutex_lock(&translist_mutex); + flag = transList->flag; + pthread_mutex_unlock(&translist_mutex); + }while(flag == 1); + } + else if(TRANS_FLAG == TRANS_AFTER) + { + printf("%s -> TRANS_AFTER\n",__func__); + okCommit = TRANS_AFTER; + do { + pthread_mutex_lock(&translist_mutex); + size = transList->size; + printf("%s -> size = %d\n",__func__,size); + printf("%s -> okCommit = %d\n",__func__,okCommit); + walker = transList->head; + while(walker){ + printf("%s ->transid : %u - decision %d Status : %d epoch = %u current epoch : %u\n",__func__,walker->transid,walker->decision,walker->status,walker->epoch_num,currentEpoch); + walker = walker->next; } + pthread_mutex_unlock(&translist_mutex); - walker = walker->next; - } - }while(transList->flag == 1); + if(inspectEpoch(epoch_num,"stopTrans_Before") < 0) + return -1; - pthread_mutex_unlock(&clearNotifyList_mutex); + sleep(3); + }while(size != 0); + } + + return 0; } void sendMyList(int acceptfd) @@ -1860,18 +1915,28 @@ void sendMyList(int acceptfd) void sendTransList(int acceptfd) { + printf("%s -> Enter\n",__func__); int size; char response; int transid; + int i; + tlist_node_t* walker = transList->head; // send on-going transaction + pthread_mutex_lock(&translist_mutex); tlist_node_t* transArray = tlistToArray(transList,&size); + pthread_mutex_unlock(&translist_mutex); -/* if(transList->size != 0) + if(transList->size != 0) tlistPrint(transList); printf("%s -> transList->size : %d size = %d\n",__func__,transList->size,size); -*/ + + for(i = 0; i< size; i++) { + printf("ID : %u Decision : %d status : %d\n",transArray[i].transid,transArray[i].decision,transArray[i].status); + } + printf("%s -> End transArray\n",__func__); + send_data((int)acceptfd,&size,sizeof(int)); send_data((int)acceptfd,transArray, sizeof(tlist_node_t) * size); @@ -1894,6 +1959,7 @@ void sendTransList(int acceptfd) int receiveNewList(int acceptfd) { + printf("%s -> Enter\n",__func__); int size; tlist_node_t* tArray; tlist_node_t* walker; @@ -1933,6 +1999,7 @@ int receiveNewList(int acceptfd) response = -1; } + printf("%s -> Exit\n",__func__); return response; } @@ -1951,7 +2018,7 @@ int combineTransactionList(tlist_node_t* tArray,int size) if(walker->transid == tArray[i].transid) { walker->decision = tArray[i].decision; -// walker->status = tArray[i].status; + walker->epoch_num = tArray[i].epoch_num; break; } } @@ -1961,37 +2028,4 @@ int combineTransactionList(tlist_node_t* tArray,int size) return flag; } -char inspectTransaction(char finalResponse,unsigned int transid,char* debug,int TRANS_FLAG) -{ - tlist_node_t* tNode; - - tNode = tlistSearch(transList,transid); - - if(finalResponse <= 0) { - tNode->decision = DECISION_LOST; - } - else { - tNode->decision = finalResponse; - } - -// printf("%s -> decision = %d okCommit = %d\n",__func__,tNode->decision,okCommit); - - if((tNode->decision == DECISION_LOST) || (okCommit != TRANS_OK)) - { - pthread_mutex_lock(&liveHosts_mutex); - tNode->status = TRANS_FLAG; - pthread_mutex_unlock(&liveHosts_mutex); - - // if decision is not lost and okCommit is not TRANS_FLAG, get out of this loop - while(!((tNode->decision != DECISION_LOST) && (okCommit != TRANS_FLAG))) { -// printf("%s -> transID : %u decision : %d is waiting flag : %d\n",debug,tNode->transid,tNode->decision,TRANS_FLAG); - randomdelay(); - } - - finalResponse = tNode->decision; - } - - return finalResponse; -} - #endif diff --git a/Robust/src/Runtime/DSTM/interface_recovery/trans.c b/Robust/src/Runtime/DSTM/interface_recovery/trans.c index 6a8b12d4..1d8ad41d 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/trans.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/trans.c @@ -111,6 +111,7 @@ unsigned int transIDMax; char ip[16]; // for debugging purpose extern tlist_t* transList; +extern pthread_mutex_t translist_mutex; extern pthread_mutex_t clearNotifyList_mutex; unsigned int currentEpoch; @@ -240,6 +241,7 @@ GDBRECV1: numbytes = recv(fd, buffer, size, 0); bytesRecv += numbytes; + if (numbytes>0) { buffer += numbytes; size -= numbytes; @@ -275,6 +277,8 @@ GDBRECV1: return -2; } } else { +// printf("%s -> Here?\n",__func__); +// printf("%s-> errno = %d %s\n", __func__, errno, strerror(errno)); //Case: numbytes==0 //machine has failed -- this case probably doesn't occur in reality // @@ -1202,6 +1206,8 @@ int transCommit() { int deadsd = -1; int deadmid = -1; unsigned int transID = getNewTransID(); + unsigned int epoch_num; + tlist_node_t* tNode; #endif #ifdef DEBUG @@ -1217,7 +1223,7 @@ int transCommit() { removetransactionhash(); objstrDelete(t_cache); t_chashDelete(); -#ifdef DEBUG +#ifndef DEBUG printf("%s-> End, line:%d\n\n", __func__, __LINE__); #endif return 1; @@ -1230,10 +1236,8 @@ int transCommit() { // sleep(1); randomdelay(); } - - transList = tlistInsertNode(transList,transID,TRYING_TO_COMMIT,TRANS_OK); #endif - + int treplyretryCount = 0; /* Initialize timeout for exponential delay */ exponential_backoff.tv_sec = 0; @@ -1242,6 +1246,10 @@ int transCommit() { do { treplyretry = 0; + pthread_mutex_lock(&recovery_mutex); + epoch_num = currentEpoch; + pthread_mutex_unlock(&recovery_mutex); + /* Look through all the objects in the transaction record and make piles * for each machine involved in the transaction*/ if (firsttime) { @@ -1263,6 +1271,7 @@ int transCommit() { /* Create a socket and getReplyCtrl array, initialize */ int socklist[pilecount]; + unsigned int midlist[pilecount]; char getReplyCtrl[pilecount]; int loopcount; for(loopcount = 0 ; loopcount < pilecount; loopcount++) { @@ -1276,6 +1285,8 @@ int transCommit() { trans_req_data_t *tosend; tosend = calloc(pilecount, sizeof(trans_req_data_t)); +// printf("%s -> transID : %u Start!\n",__func__,transID); + while(pile != NULL) { #ifdef DEBUG printf("%s-> New pile:[%s],", __func__, midtoIPString(pile->mid)); @@ -1288,20 +1299,22 @@ int transCommit() { tosend[sockindex].f.nummod = pile->nummod; tosend[sockindex].f.numcreated = pile->numcreated; tosend[sockindex].f.sum_bytes = pile->sum_bytes; - tosend[sockindex].f.epoch_num = currentEpoch; + tosend[sockindex].f.epoch_num = epoch_num; tosend[sockindex].listmid = listmid; tosend[sockindex].objread = pile->objread; tosend[sockindex].oidmod = pile->oidmod; tosend[sockindex].oidcreated = pile->oidcreated; - int sd = 0; + midlist[sockindex] = pile->mid; // debugging purpose + + int sd = 0; if(pile->mid != myIpAddr) { if((sd = getSockWithLock(transRequestSockPool, pile->mid)) < 0) { printf("\ntransRequest(): socket create error\n"); free(listmid); free(tosend); -#ifdef DEBUG +#ifndef DEBUG printf("%s-> End, line:%d\n\n", __func__, __LINE__); #endif return 1; @@ -1353,6 +1366,7 @@ int transCommit() { send_data(sd, modptr, tosend[sockindex].f.sum_bytes); //forcesend_buf(sd, &writebuffer, modptr, tosend[sockindex].f.sum_bytes); + free(modptr); } else { //handle request locally handleLocalReq(&tosend[sockindex], &transinfo, &getReplyCtrl[sockindex]); @@ -1363,7 +1377,7 @@ int transCommit() { /* Recv Ctrl msgs from all machines */ #ifdef DEBUG - printf("%s-> Finished sending transaction read/mod objects\n",__func__); + printf("%s-> Finished sending transaction read/mod objects transID = %u\n",__func__,transID); #endif int i; @@ -1372,11 +1386,10 @@ int transCommit() { if(sd != 0) { char control; int timeout; // a variable to check if the connection is still alive. if it is -1, then need to transcommit again +// printf("%s -> Waiting for mid : %s transID = %u\n",__func__,midtoIPString(midlist[i]),transID); timeout = recv_data(sd, &control, sizeof(char)); -// printf("i = %d control = %d\n",i,control); - - +// printf("%s -> Received mid : %s control %d timeout = %d\n",__func__,midtoIPString(midlist[i]),control,timeout); //Update common data structure with new ctrl msg getReplyCtrl[i] = control; /* Recv Objects if participant sends TRANS_DISAGREE */ @@ -1406,13 +1419,14 @@ int transCommit() { GETSIZE(size, header); size += sizeof(objheader_t); //make an entry in prefetch hash table - prehashInsert(oidToPrefetch, header); + prehashInsert(oidToPrefetch, header); length = length - size; offset += size; } } //end of receiving objs #endif - + +// printf("%s -> Pass this point2\n",__func__); #ifdef RECOVERY if(timeout < 0) { deadmid = listmid[i]; @@ -1432,12 +1446,13 @@ int transCommit() { #ifdef RECOVERY // wait until leader fix the system - if(okCommit != TRANS_OK) { - inspectTransaction(finalResponse,transID,"transCommit before response",TRANS_BEFORE); + if(finalResponse == RESPOND_HIGHER_EPOCH) { + printf("%s -> Received Higher epoch\n",__func__); finalResponse = TRANS_ABORT; treplyretry = 0; } #endif +// printf("%s -> transID = %u Passed this point\n",__func__,transID); #ifdef CACHE if (finalResponse == TRANS_COMMIT) { @@ -1452,73 +1467,51 @@ int transCommit() { } #endif - /* Send responses to all machines */ - for(i = 0; i < pilecount; i++) { - int sd = socklist[i]; -#ifdef RECOVERY - if(sd != deadsd) { -#endif - if(sd != 0) { -#ifdef CACHE - if(finalResponse == TRANS_COMMIT) { - int retval; - /* Update prefetch cache */ - if((retval = updatePrefetchCache(&(tosend[i]))) != 0) { - printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); - free(tosend); - free(listmid); - return 1; - } - -#ifdef ABORTREADERS - removetransaction(tosend[i].oidmod,tosend[i].f.nummod); - removethisreadtransaction(tosend[i].objread, tosend[i].f.numread); -#endif - } -#ifdef ABORTREADERS - else if (!treplyretry) { - removethistransaction(tosend[i].oidmod,tosend[i].f.nummod); - removethisreadtransaction(tosend[i].objread,tosend[i].f.numread); - } -#endif -#endif - send_data(sd,&finalResponse,sizeof(char)); - } else { - /* Complete local processing */ - finalResponse = doLocalProcess(finalResponse, &(tosend[i]), &transinfo); - -#ifdef ABORTREADERS - if(finalResponse == TRANS_COMMIT) { - removetransaction(tosend[i].oidmod,tosend[i].f.nummod); - removethisreadtransaction(tosend[i].objread,tosend[i].f.numread); - } else if (!treplyretry) { - removethistransaction(tosend[i].oidmod,tosend[i].f.nummod); - removethisreadtransaction(tosend[i].objread,tosend[i].f.numread); - } -#endif - } -#ifdef RECOVERY - } -#endif + if(finalResponse == TRANS_COMMIT) { + pthread_mutex_lock(&translist_mutex); + transList = tlistInsertNode(transList,transID,TRYING_TO_COMMIT,TRYING_TO_COMMIT,epoch_num); + tNode = tlistSearch(transList,transID); + pthread_mutex_unlock(&translist_mutex); + + tNode->decision = finalResponse; + tNode->status = TRANS_INPROGRESS; + if(okCommit == TRANS_OK && inspectEpoch(epoch_num,"TRANS_COMMIT") > 0) + { + finalResponse = tNode->decision; + thashInsert(transID,tNode->decision); + commitMessages(epoch_num,socklist,deadsd,pilecount,tosend,tNode->decision,treplyretry,transinfo); + tNode->status = TRANS_AFTER; + } + else { + tNode->status = TRYING_TO_COMMIT; + if(inspectEpoch(epoch_num,"TRANS_COMMIT2") > 0) { +// treplyretry = 1; + } + finalResponse = TRANS_ABORT; + commitMessages(epoch_num,socklist,deadsd,pilecount,tosend,finalResponse,treplyretry,transinfo); } - - for(i = 0; i< pilecount; i++) { + + //=========== after transaction point + pthread_mutex_lock(&translist_mutex); + transList = tlistRemove(transList,transID); + pthread_mutex_unlock(&translist_mutex); + } + else { + commitMessages(epoch_num,socklist,deadsd,pilecount,tosend,finalResponse,treplyretry,transinfo); + } + + for(i = 0; i< pilecount; i++) { if(socklist[i] > 0) { freeSockWithLock(transRequestSockPool,listmid[i], socklist[i]); } } - - /* Free resources */ - free(tosend); - free(listmid); - if (!treplyretry) - pDelete(pile_ptr); - /* wait a random amount of time before retrying to commit transaction*/ - if(treplyretry) { - //treplyretryCount++; - //if(treplyretryCount >= NUM_TRY_TO_COMMIT) - // exponentialdelay(); - //else + /* Free resources */ + free(tosend); + free(listmid); + if (!treplyretry) + pDelete(pile_ptr); + /* wait a random amount of time before retrying to commit transaction*/ + if(treplyretry) { randomdelay(); #ifdef TRANSSTATS nSoftAbort++; @@ -1527,16 +1520,10 @@ int transCommit() { } while (treplyretry && deadmid != -1); #ifdef RECOVERY - //=========== after transaction point - tlist_node_t* tNode = tlistSearch(transList,transID); - inspectTransaction(finalResponse,transID,"Coordinator",TRANS_AFTER); - tNode->status = TRANS_OK; - finalResponse = tNode->decision; - pthread_mutex_lock(&clearNotifyList_mutex); - transList = tlistRemove(transList,transID); - pthread_mutex_unlock(&clearNotifyList_mutex); + + #endif if(finalResponse == TRANS_ABORT) { @@ -1547,8 +1534,7 @@ int transCommit() { objstrDelete(t_cache); t_chashDelete(); #ifdef RECOVERY - if(deadmid != -1) { /* if deadmid is greater than or equal to 0, - then there is dead machine. */ + if(deadmid != -1) { /* if deadmid is greater than or equal to 0, then there is dead machine. */ notifyLeaderDeadMachine(deadmid); } #endif @@ -1572,6 +1558,59 @@ int transCommit() { return 0; } +void commitMessages(unsigned int epoch_num,int* socklist,unsigned int deadsd,int pilecount,trans_req_data_t* tosend,char finalResponse,char treplyretry,trans_commit_data_t transinfo ) { + int i; + /* Send responses to all machines */ + for(i = 0; i < pilecount; i++) { + int sd = socklist[i]; +#ifdef RECOVERY + if(sd != deadsd) { +#endif + if(sd != 0) { +#ifdef CACHE + if(finalResponse == TRANS_COMMIT) { + int retval; + /* Update prefetch cache */ + if((retval = updatePrefetchCache(&(tosend[i]))) != 0) { + printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); +// free(tosend); + // free(listmid); + exit(0); +// return 1; + } +#ifdef ABORTREADERS + removetransaction(tosend[i].oidmod,tosend[i].f.nummod); + removethisreadtransaction(tosend[i].objread, tosend[i].f.numread); +#endif + } +#ifdef ABORTREADERS + else if (!treplyretry) { + removethistransaction(tosend[i].oidmod,tosend[i].f.nummod); + removethisreadtransaction(tosend[i].objread,tosend[i].f.numread); + } +#endif +#endif + send_data(sd,&finalResponse,sizeof(char)); + send_data(sd,&epoch_num,sizeof(unsigned int)); + } else { + /* Complete local processing */ + finalResponse = doLocalProcess(finalResponse, &(tosend[i]), &transinfo); +#ifdef ABORTREADERS + if(finalResponse == TRANS_COMMIT) { + removetransaction(tosend[i].oidmod,tosend[i].f.nummod); + removethisreadtransaction(tosend[i].objread,tosend[i].f.numread); + } else if (!treplyretry) { + removethistransaction(tosend[i].oidmod,tosend[i].f.nummod); + removethisreadtransaction(tosend[i].objread,tosend[i].f.numread); + } +#endif + } +#ifdef RECOVERY + } +#endif + } +} + /* This function handles the local objects involved in a transaction * commiting process. It also makes a decision if this local machine * sends AGREE or DISAGREE or SOFT_ABORT to coordinator */ @@ -1632,12 +1671,6 @@ void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, cha } char doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo) { - -#ifdef RECOVERY - finalResponse = inspectTransaction(finalResponse,tdata->f.transid,"Local Commit",TRANS_BEFORE); - thashInsert(tdata->f.transid,finalResponse); -#endif - if(finalResponse == TRANS_ABORT) { if(transAbortProcess(transinfo) != 0) { printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__); @@ -1651,7 +1684,7 @@ char doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_da return; } } else { - printf("ERROR...No Decision\n"); + printf("%s -> ERROR...No Decision transID = %u finalResponse = %d\a\n",__func__,tdata->f.transid,finalResponse); } @@ -1671,6 +1704,8 @@ char doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_da char decideResponse(char *getReplyCtrl, char *treplyretry, int pilecount) { int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what message to send */ + + int higher_epoch_num=0; for (i = 0 ; i < pilecount; i++) { char control; control = getReplyCtrl[i]; @@ -1701,9 +1736,19 @@ char decideResponse(char *getReplyCtrl, char *treplyretry, int pilecount) { printf("%s-> Participant sent TRANS_SOFT_ABORT, i:%d, Control: %d\n", __func__, i, (int)control); #endif break; + case RESPOND_HIGHER_EPOCH: + higher_epoch_num++; +#ifdef DEBUG + printf("%s-> Participant sent TRANS_DISAGREE, i:%d, Control: %d\n", __func__, i, (int)control); +#endif + break; } } + if(higher_epoch_num > 0) + return RESPOND_HIGHER_EPOCH; + + if(transdisagree > 0) { /* Send Abort */ *treplyretry = 0; @@ -1791,7 +1836,7 @@ void notifyLeaderDeadMachine(unsigned int deadHost) { unsigned int epoch_num; if(!liveHosts[findHost(deadHost)]) { // if it is already fixed - printf("%s -> already fixed\n",__func__); +// printf("%s -> already fixed\n",__func__); sleep(WAIT_TIME); return; } @@ -1807,6 +1852,7 @@ void notifyLeaderDeadMachine(unsigned int deadHost) { // increase epoch number by number machines in the system pthread_mutex_lock(&recovery_mutex); epoch_num = currentEpoch = INCREASE_EPOCH(currentEpoch,numHostsInSystem,myIndexInHostArray); + okCommit = TRANS_BEFORE; pthread_mutex_unlock(&recovery_mutex); // notify all machines that this machien will act as leader. @@ -1817,13 +1863,12 @@ void notifyLeaderDeadMachine(unsigned int deadHost) { /* Leader's role */ void restoreDuplicationState(unsigned int deadHost,unsigned int epoch_num) { - printf("%s -> Entering\n",__func__); int* sdlist; tlist_t* tList; int flag = 0; #ifdef RECOVERYSTATS - printf("Recovery Start\n"); +// printf("Recovery Start\n"); long long st; long long fi; unsigned int dupeSize = 0; // to calculate the size of backed up data @@ -1836,12 +1881,17 @@ void restoreDuplicationState(unsigned int deadHost,unsigned int epoch_num) do { sdlist = getSocketLists(); - printf("%s -> I'm currently leader num : %d\n\n",__func__,epoch_num); + printf("%s -> I'm currently leader num : %d ping machines\n\n",__func__,epoch_num); if((flag = pingMachines(epoch_num,sdlist,&tList)) < 0) break; - printf("%s -> I'm currently leader num : %d\n\n",__func__,epoch_num); + pthread_mutex_lock(&translist_mutex); + tlistPrint(tList); + pthread_mutex_unlock(&translist_mutex); + getchar(); + printf("%s -> I'm currently leader num : %d releaseing new lists\n\n",__func__,epoch_num); if((flag = releaseNewLists(epoch_num,sdlist,tList)) < 0) break; - + getchar(); + printf("%s -> I'm currently leader num : %d duplicate objects\n\n",__func__,epoch_num); // transfer lost objects if((flag= duplicateLostObjects(epoch_num,sdlist)) < 0) break; @@ -1877,7 +1927,6 @@ void restoreDuplicationState(unsigned int deadHost,unsigned int epoch_num) printRecoveryStat(); #endif } - printf("%s -> Exiting\n",__func__); } int* getSocketLists() @@ -1957,7 +2006,7 @@ int pingMachines(unsigned int epoch_num,int* sdlist,tlist_t** tList) char response; tlist_t* currentTransactionList; - if(inspectEpoch(epoch_num) < 0) { + if(inspectEpoch(epoch_num,__func__) < 0) { printf("%s -> Higher Epoch\n",__func__); return -1; } @@ -1970,47 +2019,72 @@ int pingMachines(unsigned int epoch_num,int* sdlist,tlist_t** tList) if(sdlist[i] == -1 || hostIpAddrs[i] == myIpAddr) continue; + printf("%s -> sending request_trans_wait to %s\n",__func__,midtoIPString(hostIpAddrs[i])); request = REQUEST_TRANS_WAIT; send_data(sdlist[i],&request, sizeof(char)); send_data(sdlist[i],&epoch_num,sizeof(unsigned int)); send_data(sdlist[i],&myIndexInHostArray,sizeof(unsigned int)); } + printf("%s -> Stop transaction\n",__func__); /* stop all local transactions */ - stopTransactions(TRANS_BEFORE); + if(stopTransactions(TRANS_BEFORE,epoch_num) < 0) + return -1; + printf("After Stop transaction\n"); // grab leader's transaction list first tlist_node_t* walker = transList->head; - + while(walker) { - walker->status = TRANS_OK; - currentTransactionList = tlistInsertNode2(currentTransactionList,walker); + pthread_mutex_lock(&translist_mutex); + currentTransactionList = tlistInsertNode2(currentTransactionList,walker,epoch_num); + pthread_mutex_unlock(&translist_mutex); walker = walker->next; } +// printf("%s -> Local Transactions\n",__func__); +// tlistPrint(currentTransactionList); + for(i = 0; i < numHostsInSystem; i++) { if(sdlist[i] == -1 || hostIpAddrs[i] == myIpAddr) continue; + printf("%s -> receving from %s\n",__func__,midtoIPString(hostIpAddrs[i])); if(recv_data(sdlist[i],&response,sizeof(char)) < 0) { + printf("Here\n"); + pthread_mutex_lock(&translist_mutex); tlistDestroy(currentTransactionList); + pthread_mutex_unlock(&translist_mutex); return -2; } + printf("recevied response = %d\n",response); if(response == RESPOND_TRANS_WAIT) { + printf("%s -> RESPOND_TRANS_WAIT\n",__func__); + int timeout1 = computeLiveHosts(sdlist[i]); + printf("%s -> received host list\n",__func__); + int timeout2 = makeTransactionLists(¤tTransactionList,sdlist[i],epoch_num); + printf("%s -> received transaction list\n",__func__); // receive live host list // receive transaction list - if(computeLiveHosts(sdlist[i]) < 0 || makeTransactionLists(¤tTransactionList,sdlist[i]) < 0) { + if(timeout1 < 0 || timeout2 < 0) { + pthread_mutex_lock(&translist_mutex); tlistDestroy(currentTransactionList); + pthread_mutex_unlock(&translist_mutex); return -2; } + printf("\n\n\nAfter mid : %s \n",midtoIPString(hostIpAddrs[i])); + tlistPrint(currentTransactionList); } else if(response == RESPOND_HIGHER_EPOCH) { + printf("%s -> RESPOND_HIGHER_EPOCH\n",__func__); + pthread_mutex_lock(&translist_mutex); tlistDestroy(currentTransactionList); + pthread_mutex_unlock(&translist_mutex); return -1; } else { @@ -2029,8 +2103,6 @@ int pingMachines(unsigned int epoch_num,int* sdlist,tlist_t** tList) } walker = walker->next; } - - tlistPrint(currentTransactionList); *tList = currentTransactionList; printf("%s -> Exit\n",__func__); @@ -2060,6 +2132,7 @@ int computeLiveHosts(int sd) int releaseNewLists(unsigned int epoch_num,int* sdlist,tlist_t* tlist) { + printf("%s -> Enter\n",__func__); int i; char response = RELEASE_NEW_LIST; int size; @@ -2067,7 +2140,7 @@ int releaseNewLists(unsigned int epoch_num,int* sdlist,tlist_t* tlist) tlist_node_t* tArray; - if(inspectEpoch(epoch_num) < 0) return -1; + if(inspectEpoch(epoch_num,__func__) < 0) return -1; tArray = tlistToArray(tlist,&size); @@ -2101,7 +2174,8 @@ int releaseNewLists(unsigned int epoch_num,int* sdlist,tlist_t* tlist) printf("%s -> problem\n",__func__); exit(0); } - stopTransactions(TRANS_AFTER); + if(stopTransactions(TRANS_AFTER,epoch_num) < 0) + return -1; } } @@ -2135,12 +2209,13 @@ int releaseNewLists(unsigned int epoch_num,int* sdlist,tlist_t* tlist) // after this fuction // leader knows all the on-going transaction list and their decisions -int makeTransactionLists(tlist_t** tlist,int sd) +int makeTransactionLists(tlist_t** tlist,int sd,unsigned int epoch_num) { tlist_node_t* transArray; tlist_node_t* tmp; tlist_node_t* walker; int j; + int i; int size; // receive all on-going transaction list @@ -2157,6 +2232,12 @@ int makeTransactionLists(tlist_t** tlist,int sd) return -2; } + printf("%s -> Received TransArray\n",__func__); + for(i = 0; i< size; i++) { + printf("ID : %u Decision : %d status : %d\n",transArray[i].transid,transArray[i].decision,transArray[i].status); + } + printf("%s -> End transArray\n",__func__); + // add into currentTransactionList for(j = 0 ; j < size; j ++) { tmp = tlistSearch(*tlist,transArray[j].transid); @@ -2164,7 +2245,9 @@ int makeTransactionLists(tlist_t** tlist,int sd) if(tmp == NULL) { tlist_node_t* tNode = &transArray[j]; tNode->status = TRANS_OK; - *tlist = tlistInsertNode2(*tlist,&(transArray[j])); + + printf("%s -> transid = %u decision = %d\n",__func__,transArray[j].transid,transArray[j].decision); + *tlist = tlistInsertNode2(*tlist,&(transArray[j]),epoch_num); } else { if(tmp->decision == DECISION_LOST && transArray[j].decision != DECISION_LOST) @@ -2222,13 +2305,19 @@ void restartTransactions(unsigned int epoch_num,int* sdlist) } } -int inspectEpoch(unsigned int epoch_num) +int inspectEpoch(unsigned int epoch_num,const char* f) { int flag = 1; + +// printf("%s -> current epoch %u epoch num = %u\n",__func__,currentEpoch,epoch_num); pthread_mutex_lock(&recovery_mutex); if(epoch_num < currentEpoch) { flag = -1; - } + }/* + else if(epoch_num > currentEpoch) { +// printf("%s -> current epoch %u is changed to epoch num = %u\n",f,currentEpoch,epoch_num); +// currentEpoch = epoch_num; + }*/ pthread_mutex_unlock(&recovery_mutex); return flag; @@ -3199,8 +3288,6 @@ int getNumLiveHostsInSystem() { return count; } -// if flag = TRANS_OK, allow transactions -// flag = TRANS_WAIT, stop transactins int updateLiveHostsCommit() { #ifdef DEBUG printf("%s -> Enter\n",__func__); @@ -3311,7 +3398,7 @@ int duplicateLostObjects(unsigned int epoch_num,int *sdlist){ * Backup 26 21,24 */ - if(inspectEpoch(epoch_num) < 0) return -1; + if(inspectEpoch(epoch_num,__func__) < 0) return -1; response = REQUEST_DUPLICATE; diff --git a/Robust/src/Runtime/DSTM/interface_recovery/translist.c b/Robust/src/Runtime/DSTM/interface_recovery/translist.c index 446a78fb..8176fd5e 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/translist.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/translist.c @@ -11,8 +11,6 @@ tlist_t* tlistCreate() transList->head = NULL; transList->size = 0; - pthread_mutex_init(&(transList->mutex),NULL); - return transList; } @@ -34,17 +32,16 @@ tlist_t* tlistDestroy(tlist_t* transList) } // tlistInsertNode extension -tlist_t* tlistInsertNode2(tlist_t* transList,tlist_node_t* tNode) +tlist_t* tlistInsertNode2(tlist_t* transList,tlist_node_t* tNode,unsigned int epoch_num) { - transList = tlistInsertNode(transList,tNode->transid,tNode->decision,tNode->status); + transList = tlistInsertNode(transList,tNode->transid,tNode->decision,tNode->status,epoch_num); return transList; } // return 0 if success, return -1 if fail -tlist_t* tlistInsertNode(tlist_t* transList,unsigned int transid,char decision,char status) { +tlist_t* tlistInsertNode(tlist_t* transList,unsigned int transid,char decision,char status,unsigned int epoch_num) { // printf("%s -> ADD transID : %u decision %d status %d\n",__func__,transid,decision,status); - pthread_mutex_lock(&(transList->mutex)); tlist_node_t* head = transList->head; if(head == NULL) { @@ -56,13 +53,12 @@ tlist_t* tlistInsertNode(tlist_t* transList,unsigned int transid,char decision,c head->transid = transid; head->decision = decision; head->status = status; + head->epoch_num = epoch_num; head->next = NULL; - //pthread_mutex_lock(&(transList->mutex)); transList->head = head; (transList->size)++; transList->flag = 1; - pthread_mutex_unlock(&(transList->mutex)); return transList; } else { @@ -77,12 +73,12 @@ tlist_t* tlistInsertNode(tlist_t* transList,unsigned int transid,char decision,c tmp->transid = transid; tmp->decision = decision; tmp->status = status; + tmp->epoch_num = epoch_num; tmp->next = transList->head; transList->head = tmp; (transList->size)++; transList->flag = 1; - pthread_mutex_unlock(&(transList->mutex)); return transList; } } @@ -90,7 +86,6 @@ tlist_t* tlistInsertNode(tlist_t* transList,unsigned int transid,char decision,c // return tlist_t if success, return null if cannot find tlist_node_t* tlistSearch(tlist_t* transList,unsigned int transid) { - pthread_mutex_lock(&(transList->mutex)); tlist_node_t* ptr = transList->head; while(ptr != NULL) @@ -100,20 +95,21 @@ tlist_node_t* tlistSearch(tlist_t* transList,unsigned int transid) ptr = ptr->next; } - pthread_mutex_unlock(&(transList->mutex)); return ptr; } tlist_t* tlistRemove(tlist_t* transList,unsigned int transid) { // printf("%s -> REMOVE transID : %u \n",__func__,transid); - pthread_mutex_lock(&(transList->mutex)); int flag = -1; tlist_node_t* tmp; tlist_node_t* ptr = transList->head; tlist_node_t* prev = NULL; + if(transList->head == NULL) + return transList; + /* if it is head */ if(transList->head->transid == transid) { @@ -123,7 +119,6 @@ tlist_t* tlistRemove(tlist_t* transList,unsigned int transid) (transList->size)--; transList->flag = 1; - pthread_mutex_unlock(&(transList->mutex)); return transList; } @@ -140,16 +135,12 @@ tlist_t* tlistRemove(tlist_t* transList,unsigned int transid) (transList->size)--; flag = 0; transList->flag = 1; - pthread_mutex_unlock(&(transList->mutex)); return transList; } prev = ptr; ptr = ptr->next; } - pthread_mutex_unlock(&(transList->mutex)); - printf("%s -> remove Fail!\n",__func__); - return transList; } @@ -171,7 +162,12 @@ tlist_node_t* tlistToArray(tlist_t* transList,int* size) while(walker) { - array[i++] = *walker; + array[i].transid = walker->transid; + array[i].decision = walker->decision; + array[i].status = walker->status; + array[i].epoch_num = walker->epoch_num; + + i++; walker = walker->next; } diff --git a/Robust/src/Runtime/DSTM/interface_recovery/translist.h b/Robust/src/Runtime/DSTM/interface_recovery/translist.h index 5c4da451..9364fb4d 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/translist.h +++ b/Robust/src/Runtime/DSTM/interface_recovery/translist.h @@ -12,7 +12,8 @@ /* for machine flag */ #define TRANS_OK 3 #define TRANS_BEFORE 4 -#define TRANS_AFTER 5 +#define TRANS_INPROGRESS 5 +#define TRANS_AFTER 6 /* Status @@ -26,6 +27,7 @@ typedef struct trans_list_node { unsigned int transid; char decision; char status; + unsigned int epoch_num; struct trans_list_node *next; } tlist_node_t; @@ -34,7 +36,6 @@ typedef struct trans_list tlist_node_t *head; int size; int flag; - pthread_mutex_t mutex; } tlist_t; // allocate tlist_t, return -1 if memory overflow @@ -42,8 +43,8 @@ tlist_t* tlistCreate(); tlist_t* tlistDestroy(tlist_t*); // return 0 if success, return -1 if fail -tlist_t* tlistInsertNode(tlist_t* transList,unsigned int transid,char decision,char status); -tlist_t* tlistInsertNode2(tlist_t* transList,tlist_node_t* tNode) ; +tlist_t* tlistInsertNode(tlist_t* transList,unsigned int transid,char decision,char status,unsigned int epoch_num); +tlist_t* tlistInsertNode2(tlist_t* transList,tlist_node_t* tNode,unsigned int epoch_num) ; // remove node. // return 0 if success, return -1 if fail -- 2.34.1