From cd01e10f44dc5b0005e49ae5f8b72c3516ebf017 Mon Sep 17 00:00:00 2001 From: jihoonl Date: Thu, 1 Apr 2010 05:38:34 +0000 Subject: [PATCH] still fixing --- .../Runtime/DSTM/interface_recovery/dstm.h | 1 + .../DSTM/interface_recovery/dstmserver.c | 57 ++++++++++++------- .../Runtime/DSTM/interface_recovery/trans.c | 8 ++- 3 files changed, 41 insertions(+), 25 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface_recovery/dstm.h b/Robust/src/Runtime/DSTM/interface_recovery/dstm.h index 07f1bb47..a5e9b6f9 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/dstm.h +++ b/Robust/src/Runtime/DSTM/interface_recovery/dstm.h @@ -304,6 +304,7 @@ void stopTransactions(); void sendTransList(int acceptfd); void receiveTransList(int acceptfd); int combineTransactionList(tlist_node_t* tArray,int size); +char inspectTransaction(char control,unsigned int transid); void respondToLeader(); void setLocateObjHosts(); diff --git a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c index 30ad3bea..b43beb15 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c @@ -1007,32 +1007,13 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, int timeout = recv_data((int)acceptfd, &control, sizeof(char)); #ifdef RECOVERY - tlist_node_t* tNode; - tNode = tlistSearch(transList,fixed->transid); - if(timeout < 0) { // timeout. failed to receiving data from coordinator - tNode->decision = DECISION_LOST; - printf("%s -> DECISON_LOST! control = %d\n",__func__,control); + control = -1; } - else - tNode->decision = control; - // check if it is allowed to commit - if(!((tNode->decision != DECISION_LOST) && (okCommit == TRANS_OK))) - { - pthread_mutex_lock(&liveHosts_mutex); - tNode->status = TRANS_WAIT; - pthread_mutex_unlock(&liveHosts_mutex); - - while(!((tNode->decision != DECISION_LOST) && (okCommit == TRANS_OK))) { - printf("%s -> transID : %u decision : %d is waiting\n",__func__,tNode->transid,tNode->decision); - sleep(1); - } - } - - control = tNode->decision; - + control = inspectTransaction(control,fixed->transid); thashInsert(fixed->transid, control); + #endif switch(control) { @@ -1089,6 +1070,8 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, #ifdef RECOVERY // printf("%s -> transID : %u has been committed\n",__func__,transID); + + tlist_node_t* tNode = tlistSearch(transList,fixed->transid); tNode->status = TRANS_OK; pthread_mutex_lock(&clearNotifyList_mutex); @@ -2046,4 +2029,34 @@ int combineTransactionList(tlist_node_t* tArray,int size) return flag; } +char inspectTransaction(char finalResponse,unsigned int transid) +{ + tlist_node_t* tNode; + + tNode = tlistSearch(transList,transid); + + if(finalResponse < 0) { + tNode->decision = DECISION_LOST; + } + else { + tNode->decision = finalResponse; + } + + if(!((tNode->decision != DECISION_LOST) && (okCommit == TRANS_OK))) + { + pthread_mutex_lock(&liveHosts_mutex); + tNode->status = TRANS_WAIT; + pthread_mutex_unlock(&liveHosts_mutex); + + while(!((tNode->decision != DECISION_LOST) && (okCommit == TRANS_OK))) { + printf("%s -> transID : %u decision : %d is waiting\n",__func__,tNode->transid,tNode->decision); + sleep(3); + } + + finalResponse = tNode->decision; + } + + return finalResponse; +} + #endif diff --git a/Robust/src/Runtime/DSTM/interface_recovery/trans.c b/Robust/src/Runtime/DSTM/interface_recovery/trans.c index 039477d1..5636064b 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/trans.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/trans.c @@ -1419,9 +1419,6 @@ int transCommit() { } else { /* Complete local processing */ -#ifdef RECOVERY - thashInsert(transID,finalResponse); -#endif doLocalProcess(finalResponse, &(tosend[i]), &transinfo); #ifdef ABORTREADERS @@ -1565,6 +1562,11 @@ void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, cha void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo) { +#ifdef RECOVERY + finalResponse = inspectTransaction(finalResponse,tdata->f.transid); + thashInsert(tdata->f.transid,finalResponse); +#endif + if(finalResponse == TRANS_ABORT) { if(transAbortProcess(transinfo) != 0) { printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__); -- 2.34.1