From: jihoonl Date: Thu, 1 Apr 2010 01:55:31 +0000 (+0000) Subject: still have racing condition.. X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=8d6cf6cc10ff5e71a482027b1d5986b21806d668;p=IRC.git still have racing condition.. --- diff --git a/Robust/src/Runtime/DSTM/interface_recovery/altmlookup.c b/Robust/src/Runtime/DSTM/interface_recovery/altmlookup.c index 95cb6484..12e7339c 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/altmlookup.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/altmlookup.c @@ -303,11 +303,6 @@ void* mhashGetDuplicate(unsigned int *dupeSize, int backup) { //how big? } // printf("%s -> size = %d\n",__func__,size); - for(i=0;i End\n", __func__); #endif + for(i=0;i #endif +#ifdef RECOVERY +#include "translist.h" +#endif //bit designations for status field of objheader #define DIRTY 0x01 @@ -171,7 +188,7 @@ typedef struct objheader { unsigned int oid; unsigned short type; unsigned short version; - unsigned short rcount; +// unsigned short rcount; char status; } objheader_t; @@ -202,7 +219,7 @@ typedef struct thread_response { // Structure that holds fixed data to be sent along with TRANS_REQUEST typedef struct fixed_data { char control; /* control message */ - char trans_id[TID_LEN]; /* transaction id */ + unsigned int transid; /* transaction id */ int mcount; /* participant count */ unsigned int numread; /* no of objects read */ unsigned int nummod; /* no of objects modified */ @@ -217,6 +234,11 @@ typedef struct trans_req_data { char *objread; /* Pointer to array holding oid and version number of objects that are only read */ unsigned int *oidmod; /* Pointer to array holding oids of objects that are modified */ unsigned int *oidcreated; /* Pointer to array holding oids of objects that are newly created */ + +#ifdef RECOVERY + unsigned int transid; +#endif + } trans_req_data_t; /* Structure that holds information of objects that are not found in the participant @@ -235,6 +257,7 @@ typedef struct trans_commit_data { int leaderFixing; pthread_mutex_t leaderFixing_mutex; pthread_mutex_t liveHosts_mutex; + #endif #ifdef RECOVERYSTATS @@ -274,7 +297,15 @@ unsigned int getNewTransID(void); #ifdef RECOVERY /* Prototypes for duplications */ unsigned int updateLiveHosts(); +void updateLiveHostsList(int mid); int updateLiveHostsCommit(); +void receiveNewHostLists(int accept); +void stopTransactions(); +void sendTransList(int acceptfd); +void receiveTransList(int acceptfd); +int combineTransactionList(tlist_node_t* tArray,int size); + +void respondToLeader(); void setLocateObjHosts(); void setReLocateObjHosts(); void printHostsStatus(); @@ -295,7 +326,14 @@ void clearNotifyList(unsigned int oid); void duplicateLostObjects(unsigned int mid); unsigned int duplicateLocalBackupObjects(); unsigned int duplicateLocalOriginalObjects(); +void notifyLeaderDeadMachine(unsigned int deadHost); void restoreDuplicationState(unsigned int deadHost); +void notifyRestoration(); +void clearTransaction(); +void makeTransactionLists(tlist_t**,int*); +void releaseTransactionLists(tlist_t*,int*); +void waitForAllMachine(); +void restartTransactions(); int readDuplicateObjs(int); void printRecoveryStat(); diff --git a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c index 44b8b895..30ad3bea 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c @@ -19,6 +19,7 @@ #include #include #include "tlookup.h" +#include "translist.h" #endif #define BACKLOG 10 //max pending connections @@ -37,6 +38,11 @@ extern int *liveHosts; extern int numLiveHostsInSystem; int clearNotifyListFlag; pthread_mutex_t clearNotifyList_mutex; + +tlist_t* transList; +int okCommit; // machine flag +extern numWaitMachine; + #endif objstr_t *mainobjstore; @@ -91,6 +97,10 @@ int dstmInit(void) { #ifdef RECOVERY if (thashCreate(THASH_SIZE, LOADFACTOR)) return 1; + if ((transList = tlistCreate())== NULL) { + printf("well error\n"); + return 1; + } #endif if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR)) @@ -102,6 +112,8 @@ int dstmInit(void) { return 0; } + okCommit = TRANS_OK; + return 0; } @@ -191,9 +203,6 @@ void* startAsking() int validHost; int *socklist; int sd; -#ifdef DEBUG - printf("%s -> Entering\n",__func__); -#endif socklist = (int*) calloc(numHostsInSystem,sizeof(int)); @@ -216,14 +225,11 @@ void* startAsking() #ifdef DEBUG printf("%s -> Dead Machine : %s\n",__func__, midtoIPString(hostIpAddrs[deadMachineIndex])); #endif - restoreDuplicationState(hostIpAddrs[deadMachineIndex]); + notifyLeaderDeadMachine(hostIpAddrs[deadMachineIndex]); freeSockWithLock(transPResponseSocketPool, hostIpAddrs[deadMachineIndex], socklist[deadMachineIndex]); socklist[deadMachineIndex] = -1; } // end of if 2 } // end of while 1 -#ifdef DEBUG - printf("%s -> Exiting\n",__func__); -#endif } @@ -233,31 +239,19 @@ unsigned int checkIfAnyMachineDead(int* socklist) int i; char control = RESPOND_LIVE; char response; -#ifdef DEBUG - printf("%s -> Entering\n",__func__); -#endif while(1){ for(i = 0; i< numHostsInSystem;i++) { -#ifdef DEBUG - printf("%s -> socklist[%d] = %d\n",__func__,i,socklist[i]); -#endif if(socklist[i] > 0) { send_data(socklist[i], &control,sizeof(char)); if(recv_data(socklist[i], &response, sizeof(char)) < 0) { // if machine is dead, returns index of socket -#ifdef DEBUG - printf("%s -> Machine dead detecteed\n",__func__); -#endif return i; } else { // machine responded if(response != LIVE) { -#ifdef DEBUG - printf("%s -> Machine dead detected\n",__func__); -#endif return i; } } // end else @@ -513,14 +507,8 @@ void *dstmAccept(void *acceptfd) { #ifdef RECOVERY case RESPOND_LIVE: -#ifdef DEBUG - printf("control -> RESPOND_LIVE\n"); -#endif ctrl = LIVE; send_data((int)acceptfd, &ctrl, sizeof(ctrl)); -#ifdef DEBUG - printf("%s (RESPOND_LIVE)-> Sending LIVE!\n", __func__); -#endif break; #endif #ifdef RECOVERY @@ -539,18 +527,12 @@ void *dstmAccept(void *acceptfd) { if(!leaderFixing) { leaderFixing = 1; pthread_mutex_unlock(&leaderFixing_mutex); - // begin fixing - updateLiveHosts(); - duplicateLostObjects(mid); - if(updateLiveHostsCommit() != 0) { - printf("error updateLiveHostsCommit()\n"); - exit(1); - } - - // finish fixing - pthread_mutex_lock(&leaderFixing_mutex); - leaderFixing = 0; - pthread_mutex_unlock(&leaderFixing_mutex); + + restoreDuplicationState(mid); + // finish fixing + pthread_mutex_lock(&leaderFixing_mutex); + leaderFixing = 0; + pthread_mutex_unlock(&leaderFixing_mutex); } else { pthread_mutex_unlock(&leaderFixing_mutex); @@ -562,27 +544,49 @@ void *dstmAccept(void *acceptfd) { break; #endif #ifdef RECOVERY + case REQUEST_TRANS_WAIT: + receiveNewHostLists((int)acceptfd); + stopTransactions(); + + response = RESPOND_TRANS_WAIT; + send_data((int)acceptfd,&response,sizeof(char)); +// respondToLeader(); + break; + + case RESPOND_TRANS_WAIT: + printf("control -> RESPOND_TRANS_WAIT\n"); + pthread_mutex_lock(&liveHosts_mutex); + numWaitMachine++; + pthread_mutex_unlock(&liveHosts_mutex); + printf("numWaitMachine = %d\n",numWaitMachine); + break; + + case REQUEST_TRANS_LIST: + printf("control -> REQUEST_TRANS_LIST\n"); + sendTransList((int)acceptfd); + receiveTransList((int)acceptfd); + break; + + case REQUEST_TRANS_RESTART: + pthread_mutex_lock(&liveHosts_mutex); + okCommit = TRANS_OK; + pthread_mutex_unlock(&liveHosts_mutex); + break; case UPDATE_LIVE_HOSTS: #ifdef DEBUG printf("control -> UPDATE_LIVE_HOSTS\n"); #endif - // copy back - pthread_mutex_lock(&liveHosts_mutex); - recv_data((int)acceptfd, liveHosts, sizeof(int)*numHostsInSystem); - recv_data((int)acceptfd, locateObjHosts, sizeof(unsigned int)*numHostsInSystem*2); - pthread_mutex_unlock(&liveHosts_mutex); - numLiveHostsInSystem = getNumLiveHostsInSystem(); + receiveNewHostLists((int)acceptfd); + #ifdef DEBUG printHostsStatus(); printf("%s (UPDATE_LIVE_HOSTS)-> Finished\n", __func__); #endif - //exit(0); break; #endif #ifdef RECOVERY case DUPLICATE_ORIGINAL: - { struct sockaddr_in remoteAddr; int sd; @@ -899,6 +903,10 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { fixed.control = TRANS_REQUEST; timeout = recv_data((int)acceptfd, ptr+1, size); +#ifdef RECOVERY + transList = tlistInsertNode(transList,fixed.transid,TRYING_TO_COMMIT,TRANS_OK); +#endif + /* Read list of mids */ int mcount = fixed.mcount; size = mcount * sizeof(unsigned int); @@ -988,13 +996,6 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, objheader_t *tmp_header; void *header; int i = 0, val; - unsigned int transID; -#ifdef DEBUG - printf("%s-> Entering\n", __func__); -#endif - - /* receives transaction id */ - recv_data((int)acceptfd, &transID, sizeof(unsigned int)); /* Send reply to the Coordinator */ if((retval = handleTransReq(fixed, transinfo, listmid, objread, modptr,acceptfd)) == 0 ) { @@ -1004,23 +1005,34 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, } int timeout = recv_data((int)acceptfd, &control, sizeof(char)); - /* Process the new control message */ -#ifdef DEBUG - printf("%s -> timeout = %d control = %d\n",__func__,timeout,control); -#endif - + #ifdef RECOVERY + tlist_node_t* tNode; + tNode = tlistSearch(transList,fixed->transid); + if(timeout < 0) { // timeout. failed to receiving data from coordinator -#ifdef DEBUG - printf("%s -> timeout!! assumes coordinator is dead\n",__func__); -#endif - control = receiveDecisionFromBackup(transID,fixed->mcount,listmid); -#ifdef DEBUG - printf("%s -> received Decision %d\n",__func__,control); -#endif - } - /* insert received control into thash for another transaction*/ - thashInsert(transID, control); + tNode->decision = DECISION_LOST; + printf("%s -> DECISON_LOST! control = %d\n",__func__,control); + } + else + tNode->decision = control; + + // check if it is allowed to commit + if(!((tNode->decision != DECISION_LOST) && (okCommit == TRANS_OK))) + { + pthread_mutex_lock(&liveHosts_mutex); + tNode->status = TRANS_WAIT; + pthread_mutex_unlock(&liveHosts_mutex); + + while(!((tNode->decision != DECISION_LOST) && (okCommit == TRANS_OK))) { + printf("%s -> transID : %u decision : %d is waiting\n",__func__,tNode->transid,tNode->decision); + sleep(1); + } + } + + control = tNode->decision; + + thashInsert(fixed->transid, control); #endif switch(control) { @@ -1048,6 +1060,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, break; case TRANS_COMMIT: + /* insert received control into thash for another transaction*/ /* Invoke the transCommit process() */ if((val = transCommitProcess(modptr, oidmod, transinfo->objlocked, fixed->nummod, transinfo->numlocked, (int)acceptfd)) != 0) { printf("Error: In transCommitProcess() %s, %d\n", __FILE__, __LINE__); @@ -1074,6 +1087,16 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, break; } +#ifdef RECOVERY +// printf("%s -> transID : %u has been committed\n",__func__,transID); + tNode->status = TRANS_OK; + + pthread_mutex_lock(&clearNotifyList_mutex); + transList = tlistRemove(transList,fixed->transid); + pthread_mutex_unlock(&clearNotifyList_mutex); + +#endif + /* Free memory */ if (transinfo->objlocked != NULL) { free(transinfo->objlocked); @@ -1870,4 +1893,157 @@ void clearNotifyList(unsigned int oid) printf("%s -> finished\n",__func__); #endif } + +void receiveNewHostLists(int acceptfd) +{ + // copy back + pthread_mutex_lock(&liveHosts_mutex); + recv_data((int)acceptfd, liveHosts, sizeof(int)*numHostsInSystem); + recv_data((int)acceptfd, locateObjHosts, sizeof(unsigned int)*numHostsInSystem*2); + pthread_mutex_unlock(&liveHosts_mutex); + + numLiveHostsInSystem = getNumLiveHostsInSystem(); +} + +/* wait until all transaction waits for leader's decision */ +void stopTransactions() +{ + printf("%s - > Enter\n",__func__); + int size = transList->size; + int i; + tlist_node_t* walker; + + pthread_mutex_lock(&liveHosts_mutex); + okCommit = TRANS_WAIT; + pthread_mutex_unlock(&liveHosts_mutex); + /* make sure that all transactions are stopped */ + + pthread_mutex_lock(&clearNotifyList_mutex); + + do { + transList->flag = 0; + walker = transList->head; + + while(walker) + { + // locking + while(!(walker->status == TRANS_WAIT || walker->status == TRANS_OK)) { + printf("%s -> Waiting for %u - Status : %d tHash : %d\n",__func__,walker->transid,walker->status,thashSearch(walker->transid)); + sleep(2); + } + + walker = walker->next; + } + }while(transList->flag == 1); + + pthread_mutex_unlock(&clearNotifyList_mutex); + printf("%s - > Exit\n",__func__); +} + +void sendTransList(int acceptfd) +{ + printf("%s -> Enter\n",__func__); + + int size; + char response; + int transid; + + // send on-going transaction + tlist_node_t* transArray = tlistToArray(transList,&size); + + if(transList->size != 0) + tlistPrint(transList); + + printf("%s -> transList->size : %d size = %d\n",__func__,transList->size,size); + + send_data((int)acceptfd,&size,sizeof(int)); + send_data((int)acceptfd,transArray, sizeof(tlist_node_t) * size); + + // check if it already commit the decision for a transaction + recv_data((int)acceptfd,&response, sizeof(char)); + + while(response == REQUEST_TRANS_CHECK) + { + int transid; + recv_data((int)acceptfd,&transid, sizeof(unsigned int)); + + response = checkDecision(transid); + send_data((int)acceptfd,&response, sizeof(char)); + + recv_data((int)acceptfd,&response,sizeof(char)); + } + + free(transArray); + printf("%s - > Exit\n",__func__); +} + +void receiveTransList(int acceptfd) +{ + printf("%s -> Enter\n",__func__); + int size; + tlist_node_t* tArray; + tlist_node_t* walker; + int i; + int flag = 1; + char response; + + recv_data((int)acceptfd,&size,sizeof(int)); + + printf("%s -> size : %d\n",__func__,size); + + if(size > 0) { + if((tArray = calloc(size,sizeof(tlist_node_t) * size)) == NULL) + { + printf("%s -> calloc error\n",__func__); + exit(0); + } + + recv_data((int)acceptfd,tArray,sizeof(tlist_node_t) * size); + + flag = combineTransactionList(tArray,size); + + free(tArray); + } + + + if(flag == 1) + { + response = TRANS_OK; + } + else + { + response = -1; + } + + printf("%s -> response : %d\n",__func__,response); + + send_data((int)acceptfd,&response,sizeof(char)); + + printf("%s -> End\n",__func__); +} + + +int combineTransactionList(tlist_node_t* tArray,int size) +{ + int flag = 1; + tlist_node_t* walker; + int i; + + walker = transList->head; + + while(walker){ + for(i = 0; i < size; i++) + { + if(walker->transid == tArray[i].transid) + { + walker->decision = tArray[i].decision; + break; + } + } + walker = walker->next; + } + + return flag; +} + #endif diff --git a/Robust/src/Runtime/DSTM/interface_recovery/sockpool.c b/Robust/src/Runtime/DSTM/interface_recovery/sockpool.c index 4508ef04..d96d8254 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/sockpool.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/sockpool.c @@ -82,7 +82,7 @@ int createNewSocket(unsigned int mid) { int getSockWithLock(sockPoolHashTable_t *sockhash, unsigned int mid) { socknode_t **ptr; int key = mid&(sockhash->mask); - int sd; + int sd = -1; Lock(&sockhash->mylock); ptr=&(sockhash->table[key]); @@ -105,6 +105,8 @@ int getSockWithLock(sockPoolHashTable_t *sockhash, unsigned int mid) { inusenode->sd = sd; inusenode->mid = mid; insToListWithLock(sockhash, inusenode); + if(sd < 0) + printf("%s -> sd : %d\n",__func__,sd); return sd; } else { return -1; diff --git a/Robust/src/Runtime/DSTM/interface_recovery/trans.c b/Robust/src/Runtime/DSTM/interface_recovery/trans.c index e35829ef..039477d1 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/trans.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/trans.c @@ -25,6 +25,7 @@ #include #include #include "tlookup.h" +#include "translist.h" #define CPU_FREQ 3056842 #endif @@ -94,6 +95,8 @@ int *liveHosts; int numLiveHostsInSystem; unsigned int *locateObjHosts; +unsigned int numWaitMachine; +extern int okCommit; /* variables to clear dead threads */ int waitThreadMid; @@ -105,6 +108,9 @@ unsigned int transIDMax; char ip[16]; // for debugging purpose +extern tlist_t* transList; +extern pthread_mutex_t clearNotifyList_mutex; + /****************************** * Global variables for Paxos ******************************/ @@ -261,22 +267,13 @@ GDBRECV1: else if (numbytes<0){ //Receive returned an error. //Analyze underlying cause -#ifdef DEBUG - printf("%s-> fd : %d errno = %d %s\n", __func__, fd, errno, strerror(errno)); -#endif if(errno == ECONNRESET || errno == EAGAIN || errno == EWOULDBLOCK) { //machine has failed //if we see EAGAIN w/o failures, we should record the time //when we start read and finish read and see if it is longer //than our threshold -#ifdef DEBUG - printf("%s -> EAGAIN : %s\n",__func__,(errno == EAGAIN)?"TRUE":"FALSE"); -#endif if(errno == EAGAIN) { if(trycounter < 5) { -#ifdef DEBUG - printf("%s -> TRYcounter increases\n",__func__); -#endif trycounter++; continue; } @@ -312,9 +309,6 @@ GDBRECV1: } #endif } //close while loop -#ifdef DEBUG - printf("%s -> fd = %d Exiting\n",__func__,fd); -#endif return 0; // got all the data } @@ -937,6 +931,18 @@ remoteread: #endif objcopy = getRemoteObj(machinenumber, oid); + +#ifdef RECOVERY + if(transRetryFlag) { + notifyLeaderDeadMachine(machinenumber); + return transRead2(oid); + } +#endif + + if(objcopy == NULL) { + printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__); + return NULL; + } else { #ifdef TRANSSTATS LOGEVENT('R'); nRemoteSend++; @@ -963,27 +969,18 @@ remoteread: #endif } -#ifdef RECOVERY - if(transRetryFlag) { - restoreDuplicationState(machinenumber); -#ifdef DEBUG - printf("%s -> Recall transRead2\n",__func__); -#endif - return transRead2(oid); - } -#endif - - if(objcopy == NULL) { - printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__); - return NULL; - } else { + if(objcopy == NULL) { + printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__); + return NULL; + } else { #ifdef COMPILER - return &objcopy[1]; + return &objcopy[1]; #else - return objcopy; + return objcopy; #endif - } - } + } + } + } #ifdef DEBUG printf("%s -> Finished!!\n",__func__); #endif @@ -1212,6 +1209,7 @@ int transCommit() { printf(" myIp:[%s]\n", midtoIPString(myIpAddr)); #endif tosend[sockindex].f.control = TRANS_REQUEST; + tosend[sockindex].f.transid = transID; tosend[sockindex].f.mcount = pilecount; tosend[sockindex].f.numread = pile->numread; tosend[sockindex].f.nummod = pile->nummod; @@ -1281,11 +1279,6 @@ int transCommit() { send_data(sd, modptr, tosend[sockindex].f.sum_bytes); //send_buf(sd, &writebuffer, modptr, tosend[sockindex].f.sum_bytes); -#ifdef RECOVERY - /* send transaction id, number of machine involved, machine ids */ - send_data(sd, &transID, sizeof(unsigned int)); - //forcesend_buf(sd, &writebuffer, &transID, sizeof(unsigned int)); -#endif free(modptr); } else { //handle request locally handleLocalReq(&tosend[sockindex], &transinfo, &getReplyCtrl[sockindex]); @@ -1484,7 +1477,7 @@ int transCommit() { #ifdef DEBUG printf("%s -> Dead machine Detected : %s\n",__func__,midtoIPString(deadmid)); #endif - restoreDuplicationState(deadmid); + notifyLeaderDeadMachine(deadmid); } #endif return TRANS_ABORT; @@ -1518,6 +1511,10 @@ void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, cha unsigned int oid; unsigned short version; +#ifdef RECOVERY + transList = tlistInsertNode(transList,tdata->f.transid,TRYING_TO_COMMIT,TRANS_OK); +#endif + /* Counters and arrays to formulate decision on control message to be sent */ oidnotfound = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod), sizeof(unsigned int)); oidlocked = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod +1), sizeof(unsigned int)); // calloc additional 1 byte for @@ -1584,6 +1581,12 @@ void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_da printf("ERROR...No Decision\n"); } +#ifdef RECOVERY + pthread_mutex_lock(&clearNotifyList_mutex); + transList = tlistRemove(transList,tdata->f.transid); + pthread_mutex_unlock(&clearNotifyList_mutex); +#endif + /* Free memory */ if (transinfo->objlocked != NULL) { free(transinfo->objlocked); @@ -1748,11 +1751,12 @@ char receiveDecisionFromBackup(unsigned int transID,int nummid,unsigned int *lis #endif #ifdef RECOVERY -void restoreDuplicationState(unsigned int deadHost) { +void notifyLeaderDeadMachine(unsigned int deadHost) { int sd; char ctrl; if(!liveHosts[findHost(deadHost)]) { // if it is already fixed + printf("%s -> already fixed\n",__func__); sleep(WAIT_TIME); return; } @@ -1764,40 +1768,31 @@ void restoreDuplicationState(unsigned int deadHost) { printf("%s-> leader?:%s, me?:%s\n", __func__, midtoIPString(leader), (myIpAddr == leader)?"LEADER":"NOT LEADER"); #endif - if(leader == myIpAddr) { + if(leader == myIpAddr) { // if i am the leader pthread_mutex_lock(&leaderFixing_mutex); if(!leaderFixing) { leaderFixing = 1; pthread_mutex_unlock(&leaderFixing_mutex); if(!liveHosts[findHost(deadHost)]) { // if it is already fixed -#ifdef DEBUG +#ifndef DEBUG printf("%s -> already fixed\n",__func__); #endif pthread_mutex_lock(&leaderFixing_mutex); leaderFixing =0; pthread_mutex_unlock(&leaderFixing_mutex); } - else { // if i am the leader - updateLiveHosts(); - - if(numLiveHostsInSystem == 1) - setReLocateObjHosts(deadHost); - else - duplicateLostObjects(deadHost); - - if(updateLiveHostsCommit() != 0) { - printf("%s -> error updateLiveHostsCommit()\n",__func__); - exit(1); - } - pthread_mutex_lock(&leaderFixing_mutex); + else { + restoreDuplicationState(deadHost); + + pthread_mutex_lock(&leaderFixing_mutex); leaderFixing = 0; pthread_mutex_unlock(&leaderFixing_mutex); } } else { pthread_mutex_unlock(&leaderFixing_mutex); -#ifdef DEBUG +#ifndef DEBUG printf("%s -> LEADER is already fixing\n",__func__); #endif sleep(WAIT_TIME); @@ -1815,9 +1810,400 @@ void restoreDuplicationState(unsigned int deadHost) { printf("%s -> Message sent\n",__func__); sleep(WAIT_TIME); } +} + +/* Leader's role */ +void restoreDuplicationState(unsigned int deadHost) +{ + printf("%s -> Entering\n",__func__); + + // update leader's live host list and object locations + updateLiveHostsList(deadHost); + setReLocateObjHosts(deadHost); + + // stop all transactions and update all other's machine list + notifyRestoration(); + + + // wait until all machines wait for leader + waitForAllMachine(); + + + // clear transaction + clearTransaction(); + + // transfer lost objects + duplicateLostObjects(deadHost); + getchar(); + // restart transactions + restartTransactions(); + + printf("%s -> Exiting\n",__func__); +} + +/* + 1. request all other machines to stop transactions + 2. update their live machine list + */ + +void notifyRestoration() +{ + int i; + int sd; + int sdlist[numHostsInSystem]; + + printf("%s -> Enter\n",__func__); + + printHostsStatus(); + + pthread_mutex_lock(&liveHosts_mutex); + numWaitMachine = 0; + pthread_mutex_unlock(&liveHosts_mutex); + // for other machines + for(i = 0; i < numHostsInSystem; i++) { + if(liveHosts[i] != 1 || hostIpAddrs[i] == myIpAddr) { + sdlist[i] = -1; + continue; + } + + if((sd = getSockWithLock(transPrefetchSockPool, hostIpAddrs[i])) < 0) + { + printf("%s -> socket create error\n",__func__); + exit(0); + } + else { + sdlist[i] = sd; + char request = REQUEST_TRANS_WAIT; + + send_data(sd, &request, sizeof(char)); + + /* send new host lists and object location */ + pthread_mutex_lock(&liveHosts_mutex); + send_data(sd, liveHosts, sizeof(int)*numHostsInSystem); + send_data(sd, locateObjHosts, sizeof(unsigned int)*numHostsInSystem*2); + pthread_mutex_unlock(&liveHosts_mutex); + } + } + + + for(i = 0 ; i < numHostsInSystem; i++) { + if(sdlist[i] != -1) + { + char response; + recv_data(sdlist[i],&response,sizeof(char)); + if(response == RESPOND_TRANS_WAIT) { + pthread_mutex_lock(&liveHosts_mutex); + numWaitMachine++; + pthread_mutex_unlock(&liveHosts_mutex); + } + + freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sdlist[i]); + } + } + /* stop all local transactions */ + stopTransactions(); + printf("%s -> End\n",__func__); +} + +/* acknowledge leader that all transactions are waiting */ +void respondToLeader() +{ + printf("%s -> Enter\n",__func__); + int sd; + + if((sd = getSockWithLock(transPrefetchSockPool, leader)) < 0) { + printf("%s -> cannot open the socket\n",__func__); + exit(0); + } + else { + char request = RESPOND_TRANS_WAIT; +// printf("%s -> request = %s\n sd = %d\n",__func__,(request==RESPOND_TRANS_WAIT)?"RESPOND_TRANS_WAIT":"NONO"); + send_data(sd,&request,sizeof(char)); + freeSockWithLock(transPrefetchSockPool,leader,sd); + } + + printf("%s -> Exit\n",__func__); + return; +} + +/* wait untill receive from all machine */ +void waitForAllMachine() +{ + + pthread_mutex_lock(&liveHosts_mutex); + numWaitMachine++; // for local. It is done + pthread_mutex_unlock(&liveHosts_mutex); + - printf("%s -> Finished!\n",__func__); + /* wait untill receive from all machine */ + while(numWaitMachine < numLiveHostsInSystem) { + sleep(1); + } } + +void clearTransaction() +{ + int size; + tlist_t* tlist; + int sd; + struct sockaddr_in remoteAddr[numHostsInSystem]; + int sdlist[numHostsInSystem]; + int i; + + // open sockets to all live machines + for(i = 0 ; i < numHostsInSystem; i++) { + if(liveHosts[i] == 1 && hostIpAddrs[i] != myIpAddr) { + if((sd = socket(AF_INET , SOCK_STREAM, 0 )) < 0) + { + printf("%s -> socket create Error\n",__func__); + } + else { + bzero(&remoteAddr[i], sizeof(remoteAddr[i])); + remoteAddr[i].sin_family = AF_INET; + remoteAddr[i].sin_port = htons(LISTEN_PORT); + remoteAddr[i].sin_addr.s_addr = htonl(hostIpAddrs[i]); +// printf("%s -> open sd : %d to %s\n",__func__,sd,midtoIPString(hostIpAddrs[i])); + + if(connect(sd, (struct sockaddr *)&remoteAddr[i], sizeof(remoteAddr[i])) < 0) { + printf("%s -> socket connect error\n",__func__); + exit(0); + } + else { + sdlist[i] = sd; + } + } + } + else { + sdlist[i] = -1; + } + } + + /* receive transaction lists from all machines and + clarefy all decisions + returns an array of ongoing transactions */ + makeTransactionLists(&tlist,sdlist); + + /* release the cleared decisions to all machines */ + releaseTransactionLists(tlist,sdlist); + + for(i = 0 ; i < numHostsInSystem; i++) { + if(sdlist[i] != -1) { + close(sdlist[i]); + } + } + + tlistDestroy(tlist); + + printf("%s -> End\n",__func__); +} + +// after this fuction +// leader knows all the on-going transaction list and their decisions +void makeTransactionLists(tlist_t** tlist,int* sdlist) +{ + printf("%s -> Enter\n",__func__); + int sd; + int i; + tlist_t* currentTransactionList = tlistCreate(); + + printf("%s -> tlist size : %d\n",__func__,transList->size); + printf("%s -> currentTransactionList size : %d\n",__func__,currentTransactionList->size); + + + // grab leader's transaction list first + tlist_node_t* walker = transList->head; + + while(walker) { + tlistInsertNode2(currentTransactionList,walker); + walker = walker->next; + } + + // receive others transaction list + for(i = 0; i < numHostsInSystem;i ++) { + if(sdlist[i] != -1 && hostIpAddrs[i] != myIpAddr) { + char request = REQUEST_TRANS_LIST; + int size; + int j; + tlist_node_t* transArray; + tlist_node_t* tmp; + + sd = sdlist[i]; + + // send request + send_data(sd, &request, sizeof(char)); + + // receive all on-going transaction list + recv_data(sd, &size, sizeof(int)); + + printf("%s -> %s - size : %d\n",__func__,midtoIPString(hostIpAddrs[i]),size); + if((transArray = calloc(size, sizeof(tlist_node_t))) == NULL) { + printf("%s -> calloc error\n",__func__); + exit(0); + } + + recv_data(sd,transArray, sizeof(tlist_node_t) * size); + + // add into currentTransactionList + for(j = 0 ; j < size; j ++) { + tmp = tlistSearch(currentTransactionList,transArray[j].transid); + + if(tmp == NULL) { + currentTransactionList = tlistInsertNode2(currentTransactionList,&(transArray[j])); + } + else { + if(tmp->decision == DECISION_LOST) + { + tmp->decision = transArray[j].decision; + } + } + } // j loop + } + } // i loop + + // current transaction list is completed + // now see if any transaction is still missing + walker = currentTransactionList->head; + + while(walker) { + if(walker->decision == DECISION_LOST) { + for(i = 0 ; i < numHostsInSystem; i++) { + if(sdlist[i] != -1 && hostIpAddrs[i] != myIpAddr) + { + char request = REQUEST_TRANS_CHECK; + char respond; + + send_data(sdlist[i], &request, sizeof(char)); + send_data(sdlist[i], &(walker->transid), sizeof(unsigned int)); + + recv_data(sdlist[i], &respond, sizeof(char)); + + if(respond > 0) + { + walker->decision = respond; + break; + } + } + else if(hostIpAddrs[i] == myIpAddr) + { + char decision = checkDecision(walker->transid); + if(decision > 0) { + walker->decision = decision; + break; + } + } + } // i loop + + if(walker->decision == DECISION_LOST) { + printf("%s -> No one knows decision for transID : %u\n",__func__,walker->transid); + walker->decision = TRANS_ABORT; + } + } + walker = walker->next; + } // while loop + + printf("%s -> currentTransactionList size : %d\n",__func__,currentTransactionList->size); + + for(i = 0; i < numHostsInSystem; i++) { + if(sdlist[i] != -1 && hostIpAddrs[i] != myIpAddr) + { + char request = REQUEST_TRANS_COMPLETE; + send_data(sdlist[i], &request,sizeof(char)); + } + } + + *tlist = currentTransactionList; + tlistPrint(currentTransactionList); + + printf("%s -> End\n",__func__); +} + +// send out current on-going transaction +void releaseTransactionLists(tlist_t* tlist,int* sdlist) +{ + printf("%s -> Enter\n",__func__); + int size; + tlist_node_t* tArray = tlistToArray(tlist,&size); + int i; + char response; + int flag; + + printf("%s -> size : %d\n",__func__,size); + + for(i = 0; i < numHostsInSystem; i++) + { + if(sdlist[i] != -1 && hostIpAddrs[i] != myIpAddr) + { +// printf("%s -> Sent to sd : %d\n",__func__,sdlist[i]); + + if(size == 0) { + size = -1; + send_data(sdlist[i],&size,sizeof(int)); + } + else { + send_data(sdlist[i],&size,sizeof(int)); + send_data(sdlist[i],tArray,sizeof(tlist_node_t) * size); + } + } + else { + flag = combineTransactionList(tArray,size); + + if(flag == 0) { + printf("%s -> problem\n",__func__); + exit(0); + } + } + } + + if(size > 0) + free(tArray); + + for(i = 0; i < numHostsInSystem; i ++) { + if(sdlist[i] != -1 && hostIpAddrs[i] != myIpAddr) + { + // printf("%s -> Waiting for %s\n",__func__,midtoIPString(hostIpAddrs[i])); + recv_data(sdlist[i], &response, sizeof(char)); + + if(response != TRANS_OK) + { + printf("%s -> response : %d Need to fix\n",__func__,response); + } + } + } + + printf("%s -> End\n",__func__); +} + +void restartTransactions() +{ + int i; + int sd; + printf("%s -> Enter\n",__func__); + for(i = 0; i < numHostsInSystem; i++) { + if(hostIpAddrs[i] == myIpAddr) { + pthread_mutex_lock(&liveHosts_mutex); + okCommit = TRANS_OK; + pthread_mutex_unlock(&liveHosts_mutex); + continue; + } + if(liveHosts[i] == 1) + { + if((sd = getSockWithLock(transPrefetchSockPool, hostIpAddrs[i])) < 0) + { + printf("%s -> socket create error sd : %d\n",__func__,sd); + exit(0); + } + else { + char request = REQUEST_TRANS_RESTART; + + send_data(sd, &request, sizeof(char)); + + freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sd); + } + } + } + printf("%s -> End\n",__func__); +} + #endif @@ -2083,7 +2469,7 @@ prefetchpile_t *foundLocal(char *ptr, int numprefetches) { machinenum = (flipBit)?getPrimaryMachine(lhashSearch(oid)):getBackupMachine(lhashSearch(oid)); flipBit ^= 1; #ifdef DEBUG - printf("mindex:%d, oid:%d, machinenumber:%s\n", machinenumber, oid, midtoIPString(machinenumber)); +// printf("mindex:%d, oid:%d, machinenumber:%s\n", machinenumber, oid, midtoIPString(machinenumber)); #endif #endif insertPile(machinenum, oid, endindex-newbase, &arryfields[newbase], &head); @@ -2523,6 +2909,7 @@ int startRemoteThread(unsigned int oid, unsigned int mid) { { msg[0] = START_REMOTE_THREAD; *((unsigned int *) &msg[1]) = oid; + send_data(sock, msg, 1 + sizeof(unsigned int)); } @@ -2758,6 +3145,8 @@ int getNumLiveHostsInSystem() { return count; } +// if flag = TRANS_OK, allow transactions +// flag = TRANS_WAIT, stop transactins int updateLiveHostsCommit() { #ifdef DEBUG printf("%s -> Enter\n",__func__); @@ -2829,6 +3218,40 @@ void setLocateObjHosts() { } } +// check the passed machine if it is still alive +void updateLiveHostsList(int mid) +{ + int mIndex = findHost(mid); + int sd; + + printf("%s -> Enter with %s\n",__func__,midtoIPString(mid)); + + if((sd = getSockWithLock(transPrefetchSockPool, hostIpAddrs[mIndex])) < 0) { + liveHosts[mIndex] = 0; + numLiveHostsInSystem--; + printf("%s -> 111End with %s\n",__func__,midtoIPString(mid)); + return; + } + + char liverequest = RESPOND_LIVE; + + send_data(sd, &liverequest, sizeof(char)); + + char response = 0; + int timeout = recv_data(sd, &response, sizeof(char)); + + if(response != LIVE) { + liveHosts[mIndex] = 0; + numLiveHostsInSystem--; + } + + freeSockWithLock(transPrefetchSockPool,hostIpAddrs[mIndex],sd); + printf("%s -> 222End with %s\n",__func__,midtoIPString(mid)); + return; + +} + +// rearrange object location array of leader machine void setReLocateObjHosts(int mid) { int mIndex = findHost(mid); @@ -2901,21 +3324,20 @@ void duplicateLostObjects(unsigned int mid){ recoverStat[numRecovery-1].deadMachine = mid; #endif -#ifndef DEBUG +#ifdef DEBUG printf("%s-> Start, mid: [%s]\n", __func__, midtoIPString(mid)); #endif //this needs to be changed. unsigned int backupMid = getBackupMachine(mid); // get backup machine of dead machine - unsigned int originalMid = getDuplicatedPrimaryMachine(mid); // get primary machine that used deadmachine as backup machine. + unsigned int originalMid = getPrimaryMachine(mid); // get primary machine that used deadmachine as backup machine. -#ifdef DEBUG - printf("%s-> backupMid: [%s], ", __func__, midtoIPString(backupMid)); - printf("originalMid: [%s]\n", midtoIPString(originalMid)); +#ifndef DEBUG + printf("%s-> backupMid: %d\t[%s]", __func__, backupMid,midtoIPString(backupMid)); + printf("originalMid: %d\t[%s]\n", originalMid,midtoIPString(originalMid)); printHostsStatus(); #endif - setReLocateObjHosts(mid); //connect to these machines //go through their object store copying necessary (in a transaction) @@ -2933,13 +3355,14 @@ void duplicateLostObjects(unsigned int mid){ * Backup 26 21,24 */ -#ifdef RECOVERYSTATS - dupeSize = 0; -#endif + if(((psd = getSockWithLock(transRequestSockPool, originalMid)) < 0 ) || + ((bsd = getSockWithLock(transRequestSockPool,backupMid)) <0)) { - if(((psd = getSockWithLock(transPrefetchSockPool, originalMid)) < 0 ) || ((bsd = getSockWithLock(transPrefetchSockPool,backupMid)) <0)) { + printf("%s -> psd : %d bsd : %d\n",__func__,psd,bsd); printf("%s -> Socket create error\n",__func__); - exit(0); + + while(1) + sleep(10); } /* request for original */ @@ -2954,7 +3377,7 @@ void duplicateLostObjects(unsigned int mid){ send_data(bsd, &originalMid, sizeof(unsigned int)); char p_response,b_response; - unsigned int p_receivedSize,b_receivedSize; + unsigned int p_receivedSize,b_receivedSize; recv_data(psd, &p_response, sizeof(char)); recv_data(psd, &p_receivedSize, sizeof(unsigned int)); @@ -2976,8 +3399,8 @@ void duplicateLostObjects(unsigned int mid){ exit(0); } - freeSockWithLock(transPrefetchSockPool, originalMid, psd); - freeSockWithLock(transPrefetchSockPool, backupMid, bsd); + freeSockWithLock(transRequestSockPool, originalMid, psd); + freeSockWithLock(transRequestSockPool, backupMid, bsd); #ifdef RECOVERYSTATS fi = myrdtsc(); @@ -3591,16 +4014,10 @@ int paxosPrepare() printf("paxosPrepare(): socket create error\n"); continue; } -#ifdef DEBUG - printf("%s-> Send PAXOS_PREPARE to mid [%s] with my_n=%d\n", __func__, midtoIPString(hostIpAddrs[i]), my_n); -#endif send_data(sd, &control, sizeof(char)); send_data(sd, &my_n, sizeof(int)); int timeout = recv_data(sd, &control, sizeof(char)); if ((sd == -1) || (timeout < 0)) { -#ifdef DEBUG - printf("%s-> timeout to machine [%s]\n", __func__, midtoIPString(hostIpAddrs[i])); -#endif continue; } @@ -3609,9 +4026,6 @@ int paxosPrepare() cnt++; recv_data(sd, &remote_n, sizeof(int)); recv_data(sd, &remote_v, sizeof(int)); -#ifdef DEBUG - printf("%s-> Received PAXOS_PREPARE_OK from mindex [%d] with remote_v=%s\n", __func__, i, midtoIPString(remote_v)); -#endif if(remote_v != origleader) { if (remote_n > tmp_n) { tmp_n = remote_n; @@ -3626,10 +4040,6 @@ int paxosPrepare() freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sd); } -#ifdef DEBUG - printf("%s-> cnt:%d, numLiveHostsInSystem:%d\n", __func__, cnt, numLiveHostsInSystem); -#endif - if (cnt >= (numLiveHostsInSystem / 2)) { // majority of OK replies return 1; } @@ -3646,9 +4056,6 @@ int paxosAccept() int sd; int remote_v = temp_v_a; -#ifdef DEBUG - printf("[Accept]...\n"); -#endif for (i = 0; i < numHostsInSystem; ++i) { control = PAXOS_ACCEPT; @@ -3665,10 +4072,8 @@ int paxosAccept() send_data(sd, &remote_v, sizeof(int)); int timeout = recv_data(sd, &control, sizeof(char)); - if ((sd == -1) || (timeout < 0)) { -#ifdef DEBUG - printf("%s-> timeout to machine [%s]\n", __func__, midtoIPString(hostIpAddrs[i])); -#endif + if (timeout < 0) { + freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sd); continue; } @@ -3712,14 +4117,10 @@ void paxosLearn() { leader = v_a; paxosRound++; -#ifdef DEBUG - printf("This is my leader!!!: [%s]\n", midtoIPString(leader)); -#endif continue; } if ((sd = getSockWithLock(transPrefetchSockPool, hostIpAddrs[i])) < 0) { continue; - // printf("paxosLearn(): socket create error, attemp\n"); } send_data(sd, &control, sizeof(char)); @@ -3728,23 +4129,15 @@ void paxosLearn() freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sd); } - //return v_a; } #endif #ifdef RECOVERY void clearDeadThreadsNotification() { - -#ifdef DEBUG - printf("%s -> Entered\n",__func__); -#endif // clear all the threadnotify request first if(waitThreadID != -1) { -#ifdef DEBUG - printf("%s -> I was waitng for %s\n",__func__,midtoIPString(waitThreadMid)); -#endif int waitThreadIndex = findHost(waitThreadMid); int i; notifydata_t *ndata; @@ -3768,9 +4161,6 @@ void clearDeadThreadsNotification() } } -#ifdef DEBUG - printf("%s -> Finished\n",__func__); -#endif } /* request the primary and the backup machines to clear