From 2d788801359e7c6e3b49ca6c7692b23a31df8180 Mon Sep 17 00:00:00 2001 From: jihoonl Date: Wed, 7 Apr 2010 00:55:20 +0000 Subject: [PATCH] two phase recovery --- .../Runtime/DSTM/interface_recovery/dstm.h | 4 +- .../DSTM/interface_recovery/dstmserver.c | 42 ++++++------ .../Runtime/DSTM/interface_recovery/trans.c | 65 +++++++++++-------- 3 files changed, 64 insertions(+), 47 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface_recovery/dstm.h b/Robust/src/Runtime/DSTM/interface_recovery/dstm.h index 2800d8ab..dbb8abfb 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/dstm.h +++ b/Robust/src/Runtime/DSTM/interface_recovery/dstm.h @@ -289,11 +289,11 @@ unsigned int updateLiveHosts(); void updateLiveHostsList(int mid); int updateLiveHostsCommit(); void receiveNewHostLists(int accept); -void stopTransactions(); +void stopTransactions(int TRANS_FLAG); void sendTransList(int acceptfd); void receiveTransList(int acceptfd); int combineTransactionList(tlist_node_t* tArray,int size); -char inspectTransaction(char control,unsigned int transid); +char inspectTransaction(char control,unsigned int transid,char* debug,int TRANS_FLAG); void respondToLeader(); void setLocateObjHosts(); diff --git a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c index 5a93f44c..3b071e46 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c @@ -538,7 +538,7 @@ void *dstmAccept(void *acceptfd) { #ifdef RECOVERY case REQUEST_TRANS_WAIT: receiveNewHostLists((int)acceptfd); - stopTransactions(); + stopTransactions(TRANS_BEFORE); response = RESPOND_TRANS_WAIT; send_data((int)acceptfd,&response,sizeof(char)); @@ -557,6 +557,10 @@ void *dstmAccept(void *acceptfd) { printf("control -> REQUEST_TRANS_LIST\n"); sendTransList((int)acceptfd); receiveTransList((int)acceptfd); + + pthread_mutex_lock(&liveHosts_mutex); + okCommit = TRANS_AFTER; + pthread_mutex_unlock(&liveHosts_mutex); break; case REQUEST_TRANS_RESTART: @@ -960,6 +964,8 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, return 1; } +// printf("%s -> Waiting for transID : %u\n",__func__,fixed->transid); + int timeout = recv_data((int)acceptfd, &control, sizeof(char)); #ifdef RECOVERY @@ -967,7 +973,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, control = -1; } // check if it is allowed to commit - control = inspectTransaction(control,fixed->transid); + control = inspectTransaction(control,fixed->transid,"processClientReq",TRANS_BEFORE); thashInsert(fixed->transid, control); #endif @@ -1029,11 +1035,14 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, tlist_node_t* tNode = tlistSearch(transList,fixed->transid); tNode->status = TRANS_OK; + inspectTransaction(control,fixed->transid,"processClientReq",TRANS_AFTER); pthread_mutex_lock(&clearNotifyList_mutex); transList = tlistRemove(transList,fixed->transid); pthread_mutex_unlock(&clearNotifyList_mutex); + // ====================after transaction point + #endif /* Free memory */ @@ -1845,15 +1854,15 @@ void receiveNewHostLists(int acceptfd) } /* wait until all transaction waits for leader's decision */ -void stopTransactions() +void stopTransactions(int TRANS_FLAG) { - printf("%s - > Enter\n",__func__); +// printf("%s - > Enter flag :%d\n",__func__,TRANS_FLAG); int size = transList->size; int i; tlist_node_t* walker; pthread_mutex_lock(&liveHosts_mutex); - okCommit = TRANS_WAIT; + okCommit = TRANS_FLAG; pthread_mutex_unlock(&liveHosts_mutex); /* make sure that all transactions are stopped */ @@ -1866,7 +1875,7 @@ void stopTransactions() while(walker) { // locking - while(!(walker->status == TRANS_WAIT || walker->status == TRANS_OK)) { + while(!(walker->status == TRANS_FLAG || walker->status == TRANS_OK)) { printf("%s -> Waiting for %u - Status : %d tHash : %d\n",__func__,walker->transid,walker->status,thashSearch(walker->transid)); sleep(2); } @@ -1876,7 +1885,7 @@ void stopTransactions() }while(transList->flag == 1); pthread_mutex_unlock(&clearNotifyList_mutex); - printf("%s - > Exit\n",__func__); +// printf("%s - > Exit\n",__func__); } void sendTransList(int acceptfd) @@ -1913,12 +1922,10 @@ void sendTransList(int acceptfd) } free(transArray); - printf("%s - > Exit\n",__func__); } void receiveTransList(int acceptfd) { - printf("%s -> Enter\n",__func__); int size; tlist_node_t* tArray; tlist_node_t* walker; @@ -1928,7 +1935,6 @@ void receiveTransList(int acceptfd) recv_data((int)acceptfd,&size,sizeof(int)); - printf("%s -> size : %d\n",__func__,size); if(size > 0) { if((tArray = calloc(size,sizeof(tlist_node_t) * size)) == NULL) @@ -1954,11 +1960,7 @@ void receiveTransList(int acceptfd) response = -1; } - printf("%s -> response : %d\n",__func__,response); - send_data((int)acceptfd,&response,sizeof(char)); - - printf("%s -> End\n",__func__); } @@ -1976,6 +1978,7 @@ int combineTransactionList(tlist_node_t* tArray,int size) if(walker->transid == tArray[i].transid) { walker->decision = tArray[i].decision; + walker->status = tArray[i].status; break; } } @@ -1985,13 +1988,13 @@ int combineTransactionList(tlist_node_t* tArray,int size) return flag; } -char inspectTransaction(char finalResponse,unsigned int transid) +char inspectTransaction(char finalResponse,unsigned int transid,char* debug,int TRANS_FLAG) { tlist_node_t* tNode; tNode = tlistSearch(transList,transid); - if(finalResponse < 0) { + if(finalResponse <= 0) { tNode->decision = DECISION_LOST; } else { @@ -2001,11 +2004,12 @@ char inspectTransaction(char finalResponse,unsigned int transid) if(!((tNode->decision != DECISION_LOST) && (okCommit == TRANS_OK))) { pthread_mutex_lock(&liveHosts_mutex); - tNode->status = TRANS_WAIT; + tNode->status = TRANS_FLAG; pthread_mutex_unlock(&liveHosts_mutex); - while(!((tNode->decision != DECISION_LOST) && (okCommit == TRANS_OK))) { - printf("%s -> transID : %u decision : %d is waiting\n",__func__,tNode->transid,tNode->decision); + // if decision is not lost and okCommit is not TRANS_FLAG, get out of this loop + while(!((tNode->decision != DECISION_LOST) && (okCommit != TRANS_FLAG))) { + printf("%s -> transID : %u decision : %d is waiting flag : %d\n",debug,tNode->transid,tNode->decision,TRANS_FLAG); sleep(3); } diff --git a/Robust/src/Runtime/DSTM/interface_recovery/trans.c b/Robust/src/Runtime/DSTM/interface_recovery/trans.c index 6a197c47..e0bcf79d 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/trans.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/trans.c @@ -1154,7 +1154,7 @@ int transCommit() { #ifdef RECOVERY while(okCommit != TRANS_OK) { - printf("%s -> new Transactin is waiting\n",__func__); +// printf("%s -> new Transactin is waiting\n",__func__); sleep(2); } @@ -1358,11 +1358,9 @@ int transCommit() { #ifdef RECOVERY // wait until leader fix the system + if(okCommit != TRANS_OK) { - while(okCommit != TRANS_OK) { - printf("%s -> Coordinator is waiting finalResponse : %d\n",__func__,finalResponse); - sleep(1); - } + inspectTransaction(finalResponse,transID,"transCommit before response",TRANS_AFTER); finalResponse = TRANS_ABORT; } #endif @@ -1452,8 +1450,13 @@ int transCommit() { } while (treplyretry && deadmid != -1); #ifdef RECOVERY + + //=========== after transaction point tlist_node_t* tNode = tlistSearch(transList,transID); + inspectTransaction(finalResponse,transID,"Coordinator",TRANS_AFTER); + tNode->status = TRANS_OK; + finalResponse = tNode->decision; pthread_mutex_lock(&clearNotifyList_mutex); transList = tlistRemove(transList,transID); @@ -1555,7 +1558,7 @@ void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, cha char doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo) { #ifdef RECOVERY - finalResponse = inspectTransaction(finalResponse,tdata->f.transid); + finalResponse = inspectTransaction(finalResponse,tdata->f.transid,"Local Commit",TRANS_BEFORE); thashInsert(tdata->f.transid,finalResponse); #endif @@ -1833,10 +1836,10 @@ void restoreDuplicationState(unsigned int deadHost) // clear transaction clearTransaction(); +// getchar(); // transfer lost objects duplicateLostObjects(deadHost); - getchar(); // restart transactions restartTransactions(); @@ -1860,8 +1863,6 @@ void notifyRestoration() int sd; int sdlist[numHostsInSystem]; - printf("%s -> Enter\n",__func__); - printHostsStatus(); pthread_mutex_lock(&liveHosts_mutex); @@ -1909,8 +1910,7 @@ void notifyRestoration() } } /* stop all local transactions */ - stopTransactions(); - printf("%s -> End\n",__func__); + stopTransactions(TRANS_BEFORE); } /* acknowledge leader that all transactions are waiting */ @@ -1991,6 +1991,8 @@ void clearTransaction() returns an array of ongoing transactions */ makeTransactionLists(&tlist,sdlist); +// getchar(); + /* release the cleared decisions to all machines */ releaseTransactionLists(tlist,sdlist); @@ -2001,8 +2003,7 @@ void clearTransaction() } tlistDestroy(tlist); - - printf("%s -> End\n",__func__); + printf("%s -> End\n",__func__); } // after this fuction @@ -2022,7 +2023,8 @@ void makeTransactionLists(tlist_t** tlist,int* sdlist) tlist_node_t* walker = transList->head; while(walker) { - tlistInsertNode2(currentTransactionList,walker); + walker->status = TRANS_OK; + currentTransactionList = tlistInsertNode2(currentTransactionList,walker); walker = walker->next; } @@ -2056,24 +2058,30 @@ void makeTransactionLists(tlist_t** tlist,int* sdlist) tmp = tlistSearch(currentTransactionList,transArray[j].transid); if(tmp == NULL) { + tlist_node_t* tNode = &transArray[j]; + tNode->status = TRANS_OK; currentTransactionList = tlistInsertNode2(currentTransactionList,&(transArray[j])); } else { - if(tmp->decision == DECISION_LOST) + if((tmp->decision != TRANS_COMMIT && tmp->decision != TRANS_ABORT) + && (transArray[j].decision == TRANS_COMMIT || transArray[j].decision == TRANS_ABORT)) { tmp->decision = transArray[j].decision; - } + } } } // j loop } } // i loop + + printf("Before\n"); + tlistPrint(currentTransactionList); // current transaction list is completed // now see if any transaction is still missing walker = currentTransactionList->head; while(walker) { - if(walker->decision == DECISION_LOST) { +// if(walker->decision == DECISION_LOST) { for(i = 0 ; i < numHostsInSystem; i++) { if(sdlist[i] != -1 && hostIpAddrs[i] != myIpAddr) { @@ -2105,7 +2113,9 @@ void makeTransactionLists(tlist_t** tlist,int* sdlist) printf("%s -> No one knows decision for transID : %u\n",__func__,walker->transid); walker->decision = TRANS_ABORT; } - } + if(walker->decision == TRYING_TO_COMMIT) { + printf("%s -> no decision yet transID : %u\n",__func__,walker->transid); + } walker = walker->next; } // while loop @@ -2120,6 +2130,7 @@ void makeTransactionLists(tlist_t** tlist,int* sdlist) } *tlist = currentTransactionList; + printf("\n\nAfter\n"); tlistPrint(currentTransactionList); printf("%s -> End\n",__func__); @@ -2141,8 +2152,6 @@ void releaseTransactionLists(tlist_t* tlist,int* sdlist) { if(sdlist[i] != -1 && hostIpAddrs[i] != myIpAddr) { -// printf("%s -> Sent to sd : %d\n",__func__,sdlist[i]); - if(size == 0) { size = -1; send_data(sdlist[i],&size,sizeof(int)); @@ -2159,6 +2168,11 @@ void releaseTransactionLists(tlist_t* tlist,int* sdlist) printf("%s -> problem\n",__func__); exit(0); } + + pthread_mutex_lock(&liveHosts_mutex); + okCommit = TRANS_AFTER; + pthread_mutex_unlock(&liveHosts_mutex); + } } @@ -2185,7 +2199,6 @@ void restartTransactions() { int i; int sd; - printf("%s -> Enter\n",__func__); for(i = 0; i < numHostsInSystem; i++) { if(hostIpAddrs[i] == myIpAddr) { pthread_mutex_lock(&liveHosts_mutex); @@ -2209,7 +2222,6 @@ void restartTransactions() } } } - printf("%s -> End\n",__func__); } #endif @@ -3316,6 +3328,7 @@ int allHostsLive() { #ifdef RECOVERY void duplicateLostObjects(unsigned int mid){ + printf("%s -> Enter\n",__func__); #ifdef RECOVERYSTATS unsigned int dupeSize = 0; @@ -3346,8 +3359,8 @@ void duplicateLostObjects(unsigned int mid){ * Backup 26 21,24 */ - if(((psd = getSockWithLock(transRequestSockPool, originalMid)) < 0 ) || - ((bsd = getSockWithLock(transRequestSockPool,backupMid)) <0)) { + if(((psd = getSockWithLock(transPrefetchSockPool, originalMid)) < 0 ) || + ((bsd = getSockWithLock(transPrefetchSockPool,backupMid)) <0)) { printf("%s -> psd : %d bsd : %d\n",__func__,psd,bsd); printf("%s -> Socket create error\n",__func__); @@ -3390,8 +3403,8 @@ void duplicateLostObjects(unsigned int mid){ exit(0); } - freeSockWithLock(transRequestSockPool, originalMid, psd); - freeSockWithLock(transRequestSockPool, backupMid, bsd); + freeSockWithLock(transPrefetchSockPool, originalMid, psd); + freeSockWithLock(transPrefetchSockPool, backupMid, bsd); #ifdef RECOVERYSTATS recoverStat[numRecovery-1].recoveredData = dupeSize; -- 2.34.1