From 2426f01007760726da7aeedde8adce82dbadfa20 Mon Sep 17 00:00:00 2001 From: jihoonl Date: Sat, 22 May 2010 00:46:51 +0000 Subject: [PATCH] simultaneous failure --- .../Runtime/DSTM/interface_recovery/dstm.h | 4 +- .../DSTM/interface_recovery/dstmserver.c | 13 ++- .../src/Runtime/DSTM/interface_recovery/ip.c | 3 - .../Runtime/DSTM/interface_recovery/plookup.c | 27 +++++ .../Runtime/DSTM/interface_recovery/plookup.h | 2 +- .../Runtime/DSTM/interface_recovery/tlookup.c | 3 - .../Runtime/DSTM/interface_recovery/trans.c | 102 +++++++++++------- 7 files changed, 104 insertions(+), 50 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface_recovery/dstm.h b/Robust/src/Runtime/DSTM/interface_recovery/dstm.h index 148439a7..4bf76a4f 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/dstm.h +++ b/Robust/src/Runtime/DSTM/interface_recovery/dstm.h @@ -318,8 +318,8 @@ int pingMachines(unsigned int epoch_num,int* sdlist,tlist_t**); int releaseNewLists(unsigned int epoch_num,int* sdlist,tlist_t*); int duplicateLostObjects(unsigned int epoch_num,int* sdlist); void restartTransactions(unsigned int epoch_num,int* sdlist); -void makeTransactionLists(tlist_t**,int); -void computeLiveHosts(int); +int makeTransactionLists(tlist_t**,int); +int computeLiveHosts(int); void waitForAllMachine(); int readDuplicateObjs(int); void printRecoveryStat(); diff --git a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c index 180f67b5..48689643 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c @@ -227,6 +227,8 @@ void* startPolling() socklist[deadMachineIndex] = -1; } // end of if 2 } // end of while 1 + + free(socklist); } @@ -239,7 +241,7 @@ unsigned int checkIfAnyMachineDead(int* socklist) while(1){ - if(okCommit == TRANS_OK) { +// if(okCommit == TRANS_OK) { for(i = 0; i< numHostsInSystem;i++) { if(socklist[i] > 0) { send_data(socklist[i], &control,sizeof(char)); @@ -258,12 +260,13 @@ unsigned int checkIfAnyMachineDead(int* socklist) } // end for() clearDeadThreadsNotification(); - } +// } + /* else { if(leader_index >= 0 ) { - send_data(socklist[i],&control,sizeof(char)); + send_data(socklist[leader_index],&control,sizeof(char)); - if(recv_data(socklist[i], &response, sizeof(char)) < 0) { + if(recv_data(socklist[leader_index], &response, sizeof(char)) < 0) { // if machine is dead, returns index of socket return i; } @@ -275,7 +278,7 @@ unsigned int checkIfAnyMachineDead(int* socklist) } // end else } } - +*/ sleep(numLiveHostsInSystem); // wait for seconds for next checking } // end while(1) } diff --git a/Robust/src/Runtime/DSTM/interface_recovery/ip.c b/Robust/src/Runtime/DSTM/interface_recovery/ip.c index d5c6b45a..00170466 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/ip.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/ip.c @@ -29,9 +29,6 @@ void midtoIP(unsigned int mid, char *ptr) { i.c = (mid & 0x0000ff00) >> 8; i.d = mid & 0x000000ff; sprintf(ptr, "%d.%d.%d.%d", i.a, i.b, i.c, i.d); -#ifdef DEBUG - printf("DEBUG-> midtoIP() mid = %d.%d.%d.%d\n", i.a, i.b, i.c, i.d); -#endif return; } diff --git a/Robust/src/Runtime/DSTM/interface_recovery/plookup.c b/Robust/src/Runtime/DSTM/interface_recovery/plookup.c index 3eee2cbd..7f4631ef 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/plookup.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/plookup.c @@ -79,3 +79,30 @@ void pDelete(plistnode_t *pile) { } return; } + +void pPrint(plistnode_t *pile) +{ + int i; + plistnode_t* walker = pile; + + printf("===========pPrint===========\n"); + + while(walker) { + printf("\nMid = %s\n",midtoIPString(walker->mid)); + printf("== oidread ==\n"); + for(i = 0 ; i < walker->numread; i++) + printf("%d\n",walker->objread[i]); + printf("\n== oidmood ==\n"); + + for(i = 0 ; i < walker->nummod; i++) + printf("%d\n",walker->oidmod[i]); + + printf("\n== oidcreate ==\n"); + for(i = 0 ; i < walker->numcreated; i++) + printf("%d\n",walker->oidcreated[i]); + + walker = walker->next; + } + + printf("============= End ========\n"); +} diff --git a/Robust/src/Runtime/DSTM/interface_recovery/plookup.h b/Robust/src/Runtime/DSTM/interface_recovery/plookup.h index d4137839..801703e0 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/plookup.h +++ b/Robust/src/Runtime/DSTM/interface_recovery/plookup.h @@ -23,6 +23,6 @@ plistnode_t *pCreate(int); int pCount(plistnode_t *pile); int pListMid(plistnode_t *pile, unsigned int *list); void pDelete(plistnode_t *pile); - +void pPrint(plistnode_t *pile); #endif diff --git a/Robust/src/Runtime/DSTM/interface_recovery/tlookup.c b/Robust/src/Runtime/DSTM/interface_recovery/tlookup.c index 23b00a63..cafd594c 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/tlookup.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/tlookup.c @@ -46,9 +46,6 @@ unsigned int thashInsert(unsigned int transid, char decision) { ptr = tlookup.table; tlookup.numelements++; -#ifdef DEBUG - printf("DEBUG(insert) transid = %d, decision = %d, index = %d\n",transid, decision, index); -#endif if(ptr[index].next == NULL && ptr[index].transid == 0) { // Insert at the first position in the hashtable ptr[index].transid = transid; ptr[index].decision = decision; diff --git a/Robust/src/Runtime/DSTM/interface_recovery/trans.c b/Robust/src/Runtime/DSTM/interface_recovery/trans.c index 63431d08..2086cf1a 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/trans.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/trans.c @@ -1225,7 +1225,8 @@ int transCommit() { #ifdef RECOVERY while(okCommit != TRANS_OK) { -// printf("%s -> new Transactin is waiting\n",__func__); +// printf("%s -> new Transaction is waiting\n",__func__); + // sleep(1); randomdelay(); } @@ -1794,6 +1795,14 @@ void notifyLeaderDeadMachine(unsigned int deadHost) { return; } + pthread_mutex_lock(&liveHosts_mutex); + liveHosts[findHost(deadHost)] = 0; + numLiveHostsInSystem--; + pthread_mutex_unlock(&liveHosts_mutex); + + if(numLiveHostsInSystem == 1) + return; + // increase epoch number by number machines in the system pthread_mutex_lock(&recovery_mutex); epoch_num = currentEpoch = INCREASE_EPOCH(currentEpoch,numHostsInSystem,myIndexInHostArray); @@ -1824,28 +1833,32 @@ void restoreDuplicationState(unsigned int deadHost,unsigned int epoch_num) // update leader's live host list and object locations do { - sdlist = getSocketLists(); + do { + sdlist = getSocketLists(); + + printf("%s -> I'm currently leader num : %d\n\n",__func__,epoch_num); + if((flag = pingMachines(epoch_num,sdlist,&tList)) < 0) break; - printf("%s -> I'm currently leader num : %d\n\n",__func__,epoch_num); - if((flag = pingMachines(epoch_num,sdlist,&tList)) < 0) { - break;; - } + printf("%s -> I'm currently leader num : %d\n\n",__func__,epoch_num); + if((flag = releaseNewLists(epoch_num,sdlist,tList)) < 0) break; - printf("%s -> I'm currently leader num : %d\n\n",__func__,epoch_num); + // transfer lost objects + if((flag= duplicateLostObjects(epoch_num,sdlist)) < 0) break; - if((flag = releaseNewLists(epoch_num,sdlist,tList)) < 0) { - break;; - } + // restart transactions + restartTransactions(epoch_num,sdlist); + }while(0); + + freeSocketLists(sdlist); - // transfer lost objects - if((flag= duplicateLostObjects(epoch_num,sdlist)) < 0) { + // falg == 0 - fixed + // == -1 - higher epoch + // == -2 - found another failure, redo everything + if(flag > -2) break; - } - // restart transactions - restartTransactions(epoch_num,sdlist); - }while(0); - freeSocketLists(sdlist); + printf("%s -> Retry \n",__func__); + }while(0); if(flag < 0) { printf("%s -> higher epoch\n",__func__); @@ -1854,7 +1867,8 @@ void restoreDuplicationState(unsigned int deadHost,unsigned int epoch_num) randomdelay(); } - }else { + }else { + printf("%s -> I was leader! num : %d\n",__func__,epoch_num); #ifdef RECOVERYSTATS fi = myrdtsc(); recoverStat[numRecovery].elapsedTime = (fi-st)/CPU_FREQ; @@ -1979,14 +1993,19 @@ int pingMachines(unsigned int epoch_num,int* sdlist,tlist_t** tList) if(sdlist[i] == -1 || hostIpAddrs[i] == myIpAddr) continue; - recv_data(sdlist[i],&response,sizeof(char)); + if(recv_data(sdlist[i],&response,sizeof(char)) < 0) + { + tlistDestroy(currentTransactionList); + return -2; + } if(response == RESPOND_TRANS_WAIT) { - // receive live host list - computeLiveHosts(sdlist[i]); - // receive transaction list - makeTransactionLists(¤tTransactionList,sdlist[i]); + // receive live host list // receive transaction list + if(computeLiveHosts(sdlist[i]) < 0 || makeTransactionLists(¤tTransactionList,sdlist[i]) < 0) { + tlistDestroy(currentTransactionList); + return -2; + } } else if(response == RESPOND_HIGHER_EPOCH) { @@ -2017,12 +2036,13 @@ int pingMachines(unsigned int epoch_num,int* sdlist,tlist_t** tList) return 0; } -void computeLiveHosts(int sd) +int computeLiveHosts(int sd) { int receivedHost[numHostsInSystem]; int i; - recv_data(sd,receivedHost,sizeof(int)*numHostsInSystem); + if(recv_data(sd,receivedHost,sizeof(int)*numHostsInSystem)) + return -2; for(i = 0 ; i < numHostsInSystem;i ++) { @@ -2034,7 +2054,7 @@ void computeLiveHosts(int sd) liveHosts[i] = 0; } - return; + return 0; } int releaseNewLists(unsigned int epoch_num,int* sdlist,tlist_t* tlist) @@ -2092,7 +2112,10 @@ int releaseNewLists(unsigned int epoch_num,int* sdlist,tlist_t* tlist) if(sdlist[i] != -1 && hostIpAddrs[i] != myIpAddr) { // printf("%s -> Waiting for %s\n",__func__,midtoIPString(hostIpAddrs[i])); - recv_data(sdlist[i], &response, sizeof(char)); + if(recv_data(sdlist[i], &response, sizeof(char)) < 0) { + tlistDestroy(tlist); + return -2; + } if(response != TRANS_OK) { printf("%s -> response : %d Need to fix\n",__func__,response); @@ -2106,11 +2129,12 @@ int releaseNewLists(unsigned int epoch_num,int* sdlist,tlist_t* tlist) } tlistDestroy(tlist); printf("%s -> End\n",__func__); + return 0; } // after this fuction // leader knows all the on-going transaction list and their decisions -void makeTransactionLists(tlist_t** tlist,int sd) +int makeTransactionLists(tlist_t** tlist,int sd) { tlist_node_t* transArray; tlist_node_t* tmp; @@ -2119,14 +2143,18 @@ void makeTransactionLists(tlist_t** tlist,int sd) int size; // receive all on-going transaction list - recv_data(sd, &size, sizeof(int)); + if(recv_data(sd, &size, sizeof(int)) < 0) + return -2; if((transArray = calloc(size, sizeof(tlist_node_t))) == NULL) { printf("%s -> calloc error\n",__func__); exit(0); } - recv_data(sd,transArray, sizeof(tlist_node_t) * size); + if(recv_data(sd,transArray, sizeof(tlist_node_t) * size) < 0) { + free(transArray); + return -2; + } // add into currentTransactionList for(j = 0 ; j < size; j ++) { @@ -2158,7 +2186,8 @@ void makeTransactionLists(tlist_t** tlist,int sd) send_data(sd, &request, sizeof(char)); send_data(sd, &(walker->transid), sizeof(unsigned int)); - recv_data(sd, &respond, sizeof(char)); + if(recv_data(sd, &respond, sizeof(char)) < 0) + return -2; if(respond > 0) { @@ -2171,7 +2200,7 @@ void makeTransactionLists(tlist_t** tlist,int sd) request = REQUEST_TRANS_COMPLETE; send_data(sd, &request,sizeof(char)); - return; + return 0; } void restartTransactions(unsigned int epoch_num,int* sdlist) @@ -3016,7 +3045,7 @@ int processConfigFile() { myIndexInHostArray = findHost(myIpAddr); #ifdef RECOVERY liveHosts[myIndexInHostArray] = 1; - currentEpoch = myIndexInHostArray; + currentEpoch = 1; #ifdef RECOVERYSTATS numRecovery = 0; @@ -3288,17 +3317,18 @@ int duplicateLostObjects(unsigned int epoch_num,int *sdlist){ for(i = 0 ; i < numHostsInSystem; i ++) { if(sdlist[i] == -1) continue; - recv_data(sdlist[i],&response,sizeof(char)); + if(recv_data(sdlist[i],&response,sizeof(char))) + return -2; if(response != DUPLICATION_COMPLETE) { - printf("%s -> fail!\n",__func__); - exit(0); + return -2; } } #ifndef DEBUG printf("%s-> End\n", __func__); #endif + return 0; } #endif void addHost(unsigned int hostIp) { -- 2.34.1