From fab1dc4a577089eca6c90bdc68d83cda89657c4a Mon Sep 17 00:00:00 2001 From: jihoonl Date: Wed, 29 Jun 2011 06:31:18 +0000 Subject: [PATCH] debugging in progress --- .../DSTM/interface_recovery/dstmserver.c | 15 ++- .../Runtime/DSTM/interface_recovery/trans.c | 109 ++++++++++-------- 2 files changed, 66 insertions(+), 58 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c index c455ec84..eb0836ee 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c @@ -529,7 +529,6 @@ void *dstmAccept(void *acceptfd) { else { printf("Got new Leader! : %d\n",epoch_num); pthread_mutex_lock(&recovery_mutex); - currentEpoch = epoch_num; okCommit = TRANS_BEFORE; leader_index = new_leader_index; pthread_mutex_unlock(&recovery_mutex); @@ -932,7 +931,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, // printf("%s -> transID : %u\n",__func__,fixed->transid); if(inspectEpoch(fixed->epoch_num,"procesClient1") < 0) { - printf("%s-> Higher Epoch current epoch = %u epoch %u\n",__func__,currentEpoch,fixed->epoch_num); +// printf("%s-> Higher Epoch current epoch = %u epoch %u\n",__func__,currentEpoch,fixed->epoch_num); control = RESPOND_HIGHER_EPOCH; send_data((int)acceptfd,&control,sizeof(char)); } @@ -947,6 +946,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, int timeout1 = recv_data((int)acceptfd, &control, sizeof(char)); int timeout2 = recv_data((int)acceptfd, &epoch_num, sizeof(unsigned int)); +// printf("%s -> Received for transID : %u\n",__func__,fixed->transid); if(timeout1 < 0 || timeout2 < 0) { // timeout. failed to receiving data from coordinator control = DECISION_LOST; } @@ -958,7 +958,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, // check if it is allowed to commit tNode->decision = control; do { - tNode->status = TRANS_INPROGRESS; + tNode->status = TRANS_BEFORE; if(okCommit != TRANS_BEFORE) { if(inspectEpoch(tNode->epoch_num,"processCleint2") > 0) { tNode->status = TRANS_INPROGRESS; @@ -976,11 +976,10 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, } else { tNode->status = TRANS_WAIT; -// printf("%s -> Waiting!! \ttransID : %u decision : %d status : %d \n",__func__,tNode->transid,tNode->decision,tNode->status); -// sleep(3); + printf("%s -> Waiting!! \ttransID : %u decision : %d status : %d \n",__func__,tNode->transid,tNode->decision,tNode->status); + sleep(3); randomdelay(); } - }while(tNode->status != TRANS_AFTER); // printf("%s -> trans ID : %u is cleared\n",__func__,tNode->transid); @@ -1882,7 +1881,7 @@ int stopTransactions(int TRANS_FLAG,unsigned int epoch_num) while(walker) { // locking - while(walker->status != TRANS_WAIT && tlistSearch(transList,walker->transid) != NULL) { + while((walker->status != TRYING_TO_COMMIT && walker->status != TRANS_WAIT) && tlistSearch(transList,walker->transid) != NULL) { // printf("%s -> BEFORE transid : %u - decision %d Status : %d \n",__func__,walker->transid,walker->decision,walker->status); if(inspectEpoch(epoch_num,"stopTrans_Before") < 0) { // printf("%s -> Higher Epoch is seen, walker->epoch = %u currentEpoch = %u\n",__func__,epoch_num,currentEpoch); @@ -1919,7 +1918,7 @@ int stopTransactions(int TRANS_FLAG,unsigned int epoch_num) return -1; } -// sleep(5); + // sleep(5); randomdelay(); }while(size != 0); } diff --git a/Robust/src/Runtime/DSTM/interface_recovery/trans.c b/Robust/src/Runtime/DSTM/interface_recovery/trans.c index 5b5d0d61..6b07b3d0 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/trans.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/trans.c @@ -19,7 +19,6 @@ #include "abortreaders.h" #endif #include "trans.h" -#include "mlp_lock.h" #ifdef RECOVERY #include @@ -113,6 +112,8 @@ char ip[16]; // for debugging purpose extern tlist_t* transList; extern pthread_mutex_t translist_mutex; extern pthread_mutex_t clearNotifyList_mutex; +pthread_mutex_t oidlock; +pthread_mutex_t tidlock; unsigned int currentEpoch; @@ -269,7 +270,7 @@ GDBRECV1: goto GDBRECV1; #endif -#ifdef DEBUG +#ifndef DEBUG printf("%s -> Unexpected ERROR!\n",__func__); printf("%s-> errno = %d %s\n", __func__, errno, strerror(errno)); #endif @@ -281,7 +282,7 @@ GDBRECV1: //Case: numbytes==0 //machine has failed -- this case probably doesn't occur in reality // -#ifdef DEBUG +#ifndef DEBUG printf("%s -> SHOULD NOT BE HERE\n",__func__); #endif return -1; @@ -573,6 +574,10 @@ void transInit() { pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr); pthread_mutex_init(¬ifymutex, NULL); pthread_mutex_init(&atomicObjLock, NULL); +#ifdef RECOVERY + pthread_mutex_init(&oidlock,NULL); + pthread_mutex_init(&tidlock,NULL); +#endif #ifdef CACHE @@ -977,6 +982,7 @@ remoteread: /* This function creates objects in the transaction record */ objheader_t *transCreateObj(unsigned int size) { + pthread_mutex_lock(&oidlock); objheader_t *tmp = (objheader_t *) objstrAlloc(&t_cache, (sizeof(objheader_t) + size)); OID(tmp) = getNewOID(); tmp->notifylist = NULL; @@ -984,6 +990,7 @@ objheader_t *transCreateObj(unsigned int size) { tmp->isBackup = 0; STATUS(tmp) = NEW; t_chashInsert(OID(tmp), tmp); + pthread_mutex_unlock(&oidlock); #ifdef COMPILER return &tmp[1]; //want space after object header #else @@ -1382,7 +1389,14 @@ int transCommit() { pile = pile->next; } //end of pile processing - /* Recv Ctrl msgs from all machines */ + + pthread_mutex_lock(&translist_mutex); + transList = tlistInsertNode(transList,transID,-3,TRYING_TO_COMMIT,epoch_num); + tNode = tlistSearch(transList,transID); + pthread_mutex_unlock(&translist_mutex); + + + /* Recv Ctrl msgs from all machines */ #ifdef DEBUG printf("%s-> Finished sending transaction read/mod objects transID = %u\n",__func__,transID); #endif @@ -1393,7 +1407,7 @@ int transCommit() { if(sd != 0) { char control; int timeout; // a variable to check if the connection is still alive. if it is -1, then need to transcommit again -// printf("%s -> Waiting for mid : %s transID = %u\n",__func__,midtoIPString(midlist[i]),transID); +// printf("%s -> Waiting for mid : %s transID = %u sd = %d\n",__func__,midtoIPString(midlist[i]),transID,sd); timeout = recv_data(sd, &control, sizeof(char)); // printf("%s -> Received mid : %s control %d timeout = %d\n",__func__,midtoIPString(midlist[i]),control,timeout); @@ -1457,13 +1471,14 @@ int transCommit() { printf("%s -> Received Higher epoch\n",__func__); finalResponse = TRANS_ABORT; treplyretry = 0; +// sleep(5); } #endif -// printf("%s -> transID = %u Passed this point\n",__func__,transID); - pthread_mutex_lock(&translist_mutex); - transList = tlistInsertNode(transList,transID,-3,TRYING_TO_COMMIT,epoch_num); - tNode = tlistSearch(transList,transID); - pthread_mutex_unlock(&translist_mutex); + // printf("%s -> transID = %u Passed this point\n",__func__,transID); + + + + #ifdef CACHE if (finalResponse == TRANS_COMMIT) { @@ -1489,7 +1504,6 @@ int transCommit() { tNode->status = TRANS_AFTER; } else { - tNode->status = TRYING_TO_COMMIT; if(inspectEpoch(epoch_num,"TRANS_COMMIT2") > 0) { treplyretry = 1; } @@ -1589,6 +1603,7 @@ void commitMessages(unsigned int epoch_num,int* socklist,unsigned int deadsd,int } #endif #endif +// printf("%s -> Trans Id = %u Sending to sd = %d\n",__func__,tosend[i].f.transid,sd); send_data(sd,&finalResponse,sizeof(char)); send_data(sd,&epoch_num,sizeof(unsigned int)); } else { @@ -1867,7 +1882,7 @@ void restoreDuplicationState(unsigned int deadHost,unsigned int epoch_num) int flag = 0; #ifdef RECOVERYSTATS -// printf("Recovery Start\n"); + printf("Recovery Start dead = %s\n",midtoIPString(deadHost)); long long st; long long fi; unsigned int dupeSize = 0; // to calculate the size of backed up data @@ -1884,13 +1899,13 @@ void restoreDuplicationState(unsigned int deadHost,unsigned int epoch_num) if((flag = pingMachines(epoch_num,sdlist,&tList)) < 0) break; pthread_mutex_lock(&translist_mutex); -// tlistPrint(tList); + tlistPrint(tList); pthread_mutex_unlock(&translist_mutex); // getchar(); -// printf("%s -> I'm currently leader num : %d releaseing new lists\n\n",__func__,epoch_num); + printf("%s -> I'm currently leader num : %d releaseing new lists\n\n",__func__,epoch_num); if((flag = releaseNewLists(epoch_num,sdlist,tList)) < 0) break; // getchar(); -// printf("%s -> I'm currently leader num : %d duplicate objects\n\n",__func__,epoch_num); + printf("%s -> I'm currently leader num : %d duplicate objects\n\n",__func__,epoch_num); // transfer lost objects if((flag= duplicateLostObjects(epoch_num,sdlist)) < 0) break; @@ -2018,19 +2033,19 @@ int pingMachines(unsigned int epoch_num,int* sdlist,tlist_t** tList) if(sdlist[i] == -1 || hostIpAddrs[i] == myIpAddr) continue; -// printf("%s -> sending request_trans_wait to %s\n",__func__,midtoIPString(hostIpAddrs[i])); + printf("%s -> sending request_trans_wait to %s\n",__func__,midtoIPString(hostIpAddrs[i])); request = REQUEST_TRANS_WAIT; send_data(sdlist[i],&request, sizeof(char)); send_data(sdlist[i],&epoch_num,sizeof(unsigned int)); send_data(sdlist[i],&myIndexInHostArray,sizeof(unsigned int)); } -// printf("%s -> Stop transaction\n",__func__); + printf("%s -> Stop transaction\n",__func__); /* stop all local transactions */ if(stopTransactions(TRANS_BEFORE,epoch_num) < 0) return -1; -// printf("After Stop transaction\n"); + printf("After Stop transaction\n"); // grab leader's transaction list first tlist_node_t* walker = transList->head; @@ -2042,15 +2057,15 @@ int pingMachines(unsigned int epoch_num,int* sdlist,tlist_t** tList) walker = walker->next; } -// printf("%s -> Local Transactions\n",__func__); -// tlistPrint(currentTransactionList); + printf("%s -> Local Transactions\n",__func__); + tlistPrint(currentTransactionList); for(i = 0; i < numHostsInSystem; i++) { if(sdlist[i] == -1 || hostIpAddrs[i] == myIpAddr) continue; -// printf("%s -> receving from %s\n",__func__,midtoIPString(hostIpAddrs[i])); + printf("%s -> receving from %s\n",__func__,midtoIPString(hostIpAddrs[i])); if(recv_data(sdlist[i],&response,sizeof(char)) < 0) { printf("Here\n"); @@ -2062,11 +2077,11 @@ int pingMachines(unsigned int epoch_num,int* sdlist,tlist_t** tList) if(response == RESPOND_TRANS_WAIT) { -// printf("%s -> RESPOND_TRANS_WAIT\n",__func__); + printf("%s -> RESPOND_TRANS_WAIT\n",__func__); int timeout1 = computeLiveHosts(sdlist[i]); -// printf("%s -> received host list\n",__func__); + printf("%s -> received host list\n",__func__); int timeout2 = makeTransactionLists(¤tTransactionList,sdlist[i],epoch_num); -// printf("%s -> received transaction list\n",__func__); + printf("%s -> received transaction list\n",__func__); // receive live host list // receive transaction list if(timeout1 < 0 || timeout2 < 0) { pthread_mutex_lock(&translist_mutex); @@ -2074,7 +2089,7 @@ int pingMachines(unsigned int epoch_num,int* sdlist,tlist_t** tList) pthread_mutex_unlock(&translist_mutex); return -2; } - // tlistPrint(currentTransactionList); + tlistPrint(currentTransactionList); } else if(response == RESPOND_HIGHER_EPOCH) { @@ -2102,7 +2117,7 @@ int pingMachines(unsigned int epoch_num,int* sdlist,tlist_t** tList) } *tList = currentTransactionList; -// printf("%s -> Exit\n",__func__); + printf("%s -> Exit\n",__func__); return 0; } @@ -2242,7 +2257,7 @@ int makeTransactionLists(tlist_t** tlist,int sd,unsigned int epoch_num) tlist_node_t* tNode = &transArray[j]; tNode->status = TRANS_OK; -// printf("%s -> transid = %u decision = %d\n",__func__,transArray[j].transid,transArray[j].decision); + printf("%s -> transid = %u decision = %d\n",__func__,transArray[j].transid,transArray[j].decision); *tlist = tlistInsertNode2(*tlist,&(transArray[j]),epoch_num); } else { @@ -2308,11 +2323,11 @@ int inspectEpoch(unsigned int epoch_num,const char* f) pthread_mutex_lock(&recovery_mutex); if(epoch_num < currentEpoch) { flag = -1; - }/* - else if(epoch_num > currentEpoch) { -// printf("%s -> current epoch %u is changed to epoch num = %u\n",f,currentEpoch,epoch_num); -// currentEpoch = epoch_num; - }*/ + } + else if((epoch_num > currentEpoch) && strcmp(f,"REQUEST_TRANS_WAIT")==0) { + printf("%s -> current epoch %u is changed to epoch num = %u\n",f,currentEpoch,epoch_num); + currentEpoch = epoch_num; + } pthread_mutex_unlock(&recovery_mutex); return flag; @@ -3059,29 +3074,23 @@ int startRemoteThread(unsigned int oid, unsigned int mid) { //TODO: when reusing oids, make sure they are not already in use! static unsigned int id = 0xFFFFFFFF; unsigned int getNewOID(void) { - do { - unsigned int origid=id; - unsigned int newid=id+2; - if (newid> oidMax || newid < oidMin) { - newid=oidMin | 1; - } - if (CAS32(&id, origid, newid)==origid) - return newid; - } while(1); + id += 2; + if (id > oidMax || id < oidMin) { + id = (oidMin | 1); + } + return id; } #ifdef RECOVERY static unsigned int tid = 0xFFFFFFFF; unsigned int getNewTransID(void) { - do { - unsigned int origtid=tid; - unsigned int newtid=tid+2; - if (newtid>transIDMax || newtid < transIDMin) { - newtid=transIDMin | 1; - } - if (CAS32(&tid, origtid, newtid)==origtid) - return newtid; - } while(1); + pthread_mutex_lock(&tidlock); + tid+=2; + if (tid > transIDMax || tid < transIDMin) { + tid = (transIDMin | 1); + } + pthread_mutex_unlock(&tidlock); + return tid; } #endif -- 2.34.1