From cee554294359a867028fcfe8974baa759e102615 Mon Sep 17 00:00:00 2001 From: jihoonl Date: Thu, 8 Apr 2010 06:10:16 +0000 Subject: [PATCH] transaction clearing done. need to test with more benchmarks --- .../Runtime/DSTM/interface_recovery/dstm.h | 2 +- .../DSTM/interface_recovery/dstmserver.c | 43 ++++++++----------- .../Runtime/DSTM/interface_recovery/mlookup.c | 2 +- .../Runtime/DSTM/interface_recovery/trans.c | 17 ++++---- 4 files changed, 27 insertions(+), 37 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface_recovery/dstm.h b/Robust/src/Runtime/DSTM/interface_recovery/dstm.h index dbb8abfb..fd0bbd36 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/dstm.h +++ b/Robust/src/Runtime/DSTM/interface_recovery/dstm.h @@ -291,7 +291,7 @@ int updateLiveHostsCommit(); void receiveNewHostLists(int accept); void stopTransactions(int TRANS_FLAG); void sendTransList(int acceptfd); -void receiveTransList(int acceptfd); +int receiveTransList(int acceptfd); int combineTransactionList(tlist_node_t* tArray,int size); char inspectTransaction(char control,unsigned int transid,char* debug,int TRANS_FLAG); diff --git a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c index 3b071e46..8e82efcb 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c @@ -556,11 +556,10 @@ void *dstmAccept(void *acceptfd) { case REQUEST_TRANS_LIST: printf("control -> REQUEST_TRANS_LIST\n"); sendTransList((int)acceptfd); - receiveTransList((int)acceptfd); - - pthread_mutex_lock(&liveHosts_mutex); - okCommit = TRANS_AFTER; - pthread_mutex_unlock(&liveHosts_mutex); + response = receiveTransList((int)acceptfd); + stopTransactions(TRANS_AFTER); + + send_data((int)acceptfd,&response,sizeof(char)); break; case REQUEST_TRANS_RESTART: @@ -1031,18 +1030,15 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, } #ifdef RECOVERY -// printf("%s -> transID : %u has been committed\n",__func__,transID); + inspectTransaction(control,fixed->transid,"processClientReq",TRANS_AFTER); tlist_node_t* tNode = tlistSearch(transList,fixed->transid); tNode->status = TRANS_OK; - inspectTransaction(control,fixed->transid,"processClientReq",TRANS_AFTER); pthread_mutex_lock(&clearNotifyList_mutex); transList = tlistRemove(transList,fixed->transid); pthread_mutex_unlock(&clearNotifyList_mutex); - // ====================after transaction point - #endif /* Free memory */ @@ -1329,12 +1325,6 @@ char getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked (*objnotfound)++; *control = TRANS_DISAGREE; } else { /* If Obj found in machine (i.e. has not moved) */ -#ifdef DEBUG - printf("%s -> Obj found!!\n",__func__); - printf("%s -> Obj found: oid = %d, type = %d\t\n", __func__, OID(mobj), TYPE((objheader_t *)mobj)); - fflush(stdout); -#endif - /* Check if Obj is locked by any previous transaction */ if (read_trylock(STATUSPTR(mobj))) { //Can further acquire read locks if (version == ((objheader_t *)mobj)->version) { /* match versions */ @@ -1864,8 +1854,8 @@ void stopTransactions(int TRANS_FLAG) pthread_mutex_lock(&liveHosts_mutex); okCommit = TRANS_FLAG; pthread_mutex_unlock(&liveHosts_mutex); - /* make sure that all transactions are stopped */ + /* make sure that all transactions are stopped */ pthread_mutex_lock(&clearNotifyList_mutex); do { @@ -1876,8 +1866,8 @@ void stopTransactions(int TRANS_FLAG) { // locking while(!(walker->status == TRANS_FLAG || walker->status == TRANS_OK)) { - printf("%s -> Waiting for %u - Status : %d tHash : %d\n",__func__,walker->transid,walker->status,thashSearch(walker->transid)); - sleep(2); +// printf("%s -> Waiting for %u - Status : %d tHash : %d\n",__func__,walker->transid,walker->status,thashSearch(walker->transid)); + randomdelay(); } walker = walker->next; @@ -1885,7 +1875,6 @@ void stopTransactions(int TRANS_FLAG) }while(transList->flag == 1); pthread_mutex_unlock(&clearNotifyList_mutex); -// printf("%s - > Exit\n",__func__); } void sendTransList(int acceptfd) @@ -1910,7 +1899,7 @@ void sendTransList(int acceptfd) // check if it already commit the decision for a transaction recv_data((int)acceptfd,&response, sizeof(char)); - while(response == REQUEST_TRANS_CHECK) + while(response == REQUEST_TRANS_CHECK && response != REQUEST_TRANS_COMPLETE ) { int transid; recv_data((int)acceptfd,&transid, sizeof(unsigned int)); @@ -1924,7 +1913,7 @@ void sendTransList(int acceptfd) free(transArray); } -void receiveTransList(int acceptfd) +int receiveTransList(int acceptfd) { int size; tlist_node_t* tArray; @@ -1960,7 +1949,7 @@ void receiveTransList(int acceptfd) response = -1; } - send_data((int)acceptfd,&response,sizeof(char)); + return response; } @@ -1978,7 +1967,7 @@ int combineTransactionList(tlist_node_t* tArray,int size) if(walker->transid == tArray[i].transid) { walker->decision = tArray[i].decision; - walker->status = tArray[i].status; +// walker->status = tArray[i].status; break; } } @@ -2001,7 +1990,9 @@ char inspectTransaction(char finalResponse,unsigned int transid,char* debug,int tNode->decision = finalResponse; } - if(!((tNode->decision != DECISION_LOST) && (okCommit == TRANS_OK))) +// printf("%s -> decision = %d okCommit = %d\n",__func__,tNode->decision,okCommit); + + if((tNode->decision == DECISION_LOST) || (okCommit != TRANS_OK)) { pthread_mutex_lock(&liveHosts_mutex); tNode->status = TRANS_FLAG; @@ -2009,8 +2000,8 @@ char inspectTransaction(char finalResponse,unsigned int transid,char* debug,int // if decision is not lost and okCommit is not TRANS_FLAG, get out of this loop while(!((tNode->decision != DECISION_LOST) && (okCommit != TRANS_FLAG))) { - printf("%s -> transID : %u decision : %d is waiting flag : %d\n",debug,tNode->transid,tNode->decision,TRANS_FLAG); - sleep(3); +// printf("%s -> transID : %u decision : %d is waiting flag : %d\n",debug,tNode->transid,tNode->decision,TRANS_FLAG); + randomdelay(); } finalResponse = tNode->decision; diff --git a/Robust/src/Runtime/DSTM/interface_recovery/mlookup.c b/Robust/src/Runtime/DSTM/interface_recovery/mlookup.c index 786281b1..8e47aa75 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/mlookup.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/mlookup.c @@ -207,7 +207,7 @@ unsigned int *mhashGetKeys(unsigned int *numKeys) { } #ifdef RECOVERY -void* mhashGetDuplicate(int *dupeSize, int backup) { //how big? +void* mhashGetDuplicate(int *dupeSize, int backup) { #ifdef DEBUG printf("%s-> Start\n", __func__); #endif diff --git a/Robust/src/Runtime/DSTM/interface_recovery/trans.c b/Robust/src/Runtime/DSTM/interface_recovery/trans.c index e0bcf79d..4139144e 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/trans.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/trans.c @@ -1155,7 +1155,7 @@ int transCommit() { #ifdef RECOVERY while(okCommit != TRANS_OK) { // printf("%s -> new Transactin is waiting\n",__func__); - sleep(2); + randomdelay(); } transList = tlistInsertNode(transList,transID,TRYING_TO_COMMIT,TRANS_OK); @@ -1358,10 +1358,10 @@ int transCommit() { #ifdef RECOVERY // wait until leader fix the system - if(okCommit != TRANS_OK) { - inspectTransaction(finalResponse,transID,"transCommit before response",TRANS_AFTER); + inspectTransaction(finalResponse,transID,"transCommit before response",TRANS_BEFORE); finalResponse = TRANS_ABORT; + treplyretry = 0; } #endif @@ -1836,10 +1836,10 @@ void restoreDuplicationState(unsigned int deadHost) // clear transaction clearTransaction(); -// getchar(); // transfer lost objects duplicateLostObjects(deadHost); + // restart transactions restartTransactions(); @@ -1991,7 +1991,6 @@ void clearTransaction() returns an array of ongoing transactions */ makeTransactionLists(&tlist,sdlist); -// getchar(); /* release the cleared decisions to all machines */ releaseTransactionLists(tlist,sdlist); @@ -2070,6 +2069,8 @@ void makeTransactionLists(tlist_t** tlist,int* sdlist) } } } // j loop + + free(transArray); } } // i loop @@ -2169,10 +2170,8 @@ void releaseTransactionLists(tlist_t* tlist,int* sdlist) exit(0); } - pthread_mutex_lock(&liveHosts_mutex); - okCommit = TRANS_AFTER; - pthread_mutex_unlock(&liveHosts_mutex); - +// okCommit = TRANS_AFTER; + stopTransactions(TRANS_AFTER); } } -- 2.34.1