From: jihoonl Date: Wed, 7 Oct 2009 19:18:00 +0000 (+0000) Subject: recovery final X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=66da65e8bca2d9002d41f3bdf4949aca26ab509f;p=IRC.git recovery final --- diff --git a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c index 3025d42e..6e6c3168 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c @@ -35,19 +35,15 @@ extern unsigned int *locateObjHosts; extern int *liveHosts; extern int numLiveHostsInSystem; int clearNotifyListFlag; +pthread_mutex_t clearNotifyList_mutex; #endif objstr_t *mainobjstore; pthread_mutex_t mainobjstore_mutex; pthread_mutex_t lockObjHeader; -pthread_mutex_t clearNotifyList_mutex; pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a recursive lock */ sockPoolHashTable_t *transPResponseSocketPool; -extern sockPoolHashTable_t *transRequestSockPool; -extern sockPoolHashTable_t *transReadSockPool; - -int failFlag = 0; //debug #ifdef RECOVERY /****************************** @@ -150,16 +146,16 @@ void *dstmListen(void *lfd) { pthread_t thread_dstm_accept; #ifdef RECOVERY - int firsttime = 1; + int firsttime = 1; // these two are for periodic checking pthread_t thread_dstm_asking; #endif -#ifdef DEBUG + printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd); -#endif while(1) { int retval; int flag=1; acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength); + setsockopt(acceptfd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag)); #ifdef RECOVERY if(firsttime) { @@ -170,11 +166,7 @@ void *dstmListen(void *lfd) { pthread_detach(thread_dstm_asking); } #endif -#ifdef debug - printf("%s -> fd accepted\n",__func__); -#endif - setsockopt(acceptfd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag)); do { retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd); } while(retval!=0); @@ -280,16 +272,20 @@ void *dstmAccept(void *acceptfd) { char control,ctrl, response; char *ptr; void *srcObj; + +#ifdef RECOVERY void *dupeptr; + unsigned int transIDreceived; + char decision; + int timeout; +#endif + int i, tempsize; objheader_t *h; trans_commit_data_t transinfo; unsigned short objType, *versionarry, version; unsigned int *oidarry, numoid, mid, threadid; int n, v; - unsigned int transIDreceived; - char decision; - struct sockaddr_in remoteAddr; #ifdef DEBUG printf("%s-> Entering dstmAccept\n", __func__); fflush(stdout); @@ -302,9 +298,7 @@ void *dstmAccept(void *acceptfd) { if (ret==0) break; if (ret==-1) { -#ifdef DEBUG printf("DEBUG -> RECV Error!.. retrying\n"); -#endif // exit(0); break; } @@ -317,7 +311,10 @@ void *dstmAccept(void *acceptfd) { printf("control -> READ_REQUEST\n"); #endif /* Read oid requested and search if available */ - recv_data((int)acceptfd, &oid, sizeof(unsigned int)); + timeout = recv_data((int)acceptfd, &oid, sizeof(unsigned int)); + + if(timeout < 0) + break; while((srcObj = mhashSearch(oid)) == NULL) { int ret; // printf("HERE!!\n"); @@ -368,7 +365,8 @@ void *dstmAccept(void *acceptfd) { #ifdef RECOVERY case ASK_COMMIT : - recv_data((int)acceptfd, &transIDreceived, sizeof(unsigned int)); + if(recv_data((int)acceptfd, &transIDreceived, sizeof(unsigned int)) < 0) + break; decision = checkDecision(transIDreceived); @@ -573,116 +571,125 @@ void *dstmAccept(void *acceptfd) { #ifdef RECOVERY case DUPLICATE_ORIGINAL: + + { + struct sockaddr_in remoteAddr; + int sd; + #ifdef DEBUG - printf("control -> DUPLICATE_ORIGINAL\n"); - printf("%s (DUPLICATE_ORIGINAL)-> Attempt to duplicate original objects\n", __func__); + printf("control -> DUPLICATE_ORIGINAL\n"); + printf("%s (DUPLICATE_ORIGINAL)-> Attempt to duplicate original objects\n", __func__); #endif - //object store stuffffff - recv_data((int)acceptfd, &mid, sizeof(unsigned int)); - tempsize = mhashGetDuplicate(&dupeptr, 0); + //object store stuffffff + recv_data((int)acceptfd, &mid, sizeof(unsigned int)); + tempsize = mhashGetDuplicate(&dupeptr, 0); - //send control and dupes after - ctrl = RECEIVE_DUPES; + //send control and dupes after + ctrl = RECEIVE_DUPES; - if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - perror("ORIGINAL : "); - exit(0); - } + if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + perror("ORIGINAL : "); + exit(0); + } - bzero(&remoteAddr, sizeof(remoteAddr)); - remoteAddr.sin_family = AF_INET; - remoteAddr.sin_port = htons(LISTEN_PORT); - remoteAddr.sin_addr.s_addr = htonl(mid); + bzero(&remoteAddr, sizeof(remoteAddr)); + remoteAddr.sin_family = AF_INET; + remoteAddr.sin_port = htons(LISTEN_PORT); + remoteAddr.sin_addr.s_addr = htonl(mid); - if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr))<0) { - printf("ORIGINAL ERROR : %s\n",strerror(errno)); - exit(0); - } - else { - send_data(sd, &ctrl, sizeof(char)); - send_data(sd, dupeptr, tempsize); + if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr))<0) { + printf("ORIGINAL ERROR : %s\n",strerror(errno)); + exit(0); + } + else { + send_data(sd, &ctrl, sizeof(char)); + send_data(sd, dupeptr, tempsize); - recv_data(sd, &response, sizeof(char)); + recv_data(sd, &response, sizeof(char)); #ifdef DEBUG - printf("%s ->response : %d - %d\n",__func__,response,DUPLICATION_COMPLETE); + printf("%s ->response : %d - %d\n",__func__,response,DUPLICATION_COMPLETE); #endif - if(response != DUPLICATION_COMPLETE) { -#ifdef DEBUG - printf("%s(DUPLICATION_ORIGINAL) -> DUPLICATION FAIL\n",__func__); + if(response != DUPLICATION_COMPLETE) { +#ifndef DEBUG + printf("%s(DUPLICATION_ORIGINAL) -> DUPLICATION FAIL\n",__func__); #endif - //fail message - exit(0); - } + //fail message + exit(0); + } - close(sd); - } - free(dupeptr); + close(sd); + } + free(dupeptr); - ctrl = DUPLICATION_COMPLETE; - send_data((int)acceptfd, &ctrl, sizeof(char)); -#ifndef DEBUG - printf("%s (DUPLICATE_ORIGINAL)-> Finished\n", __func__); + ctrl = DUPLICATION_COMPLETE; + send_data((int)acceptfd, &ctrl, sizeof(char)); +#ifdef DEBUG + printf("%s (DUPLICATE_ORIGINAL)-> Finished\n", __func__); #endif - break; + } + break; case DUPLICATE_BACKUP: -#ifndef DEBUG - printf("control -> DUPLICATE_BACKUP\n"); - printf("%s (DUPLICATE_BACKUP)-> Attempt to duplicate backup objects\n", __func__); + { + struct sockaddr_in remoteAddr; + int sd; +#ifdef DEBUG + printf("control -> DUPLICATE_BACKUP\n"); + printf("%s (DUPLICATE_BACKUP)-> Attempt to duplicate backup objects\n", __func__); #endif - //object store stuffffff - recv_data((int)acceptfd, &mid, sizeof(unsigned int)); + //object store stuffffff + recv_data((int)acceptfd, &mid, sizeof(unsigned int)); - tempsize = mhashGetDuplicate(&dupeptr, 1); + tempsize = mhashGetDuplicate(&dupeptr, 1); - //send control and dupes after - ctrl = RECEIVE_DUPES; + //send control and dupes after + ctrl = RECEIVE_DUPES; - if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - perror("BACKUP : "); - exit(0); - } + if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + perror("BACKUP : "); + exit(0); + } - bzero(&remoteAddr, sizeof(remoteAddr)); - remoteAddr.sin_family = AF_INET; - remoteAddr.sin_port = htons(LISTEN_PORT); - remoteAddr.sin_addr.s_addr = htonl(mid); + bzero(&remoteAddr, sizeof(remoteAddr)); + remoteAddr.sin_family = AF_INET; + remoteAddr.sin_port = htons(LISTEN_PORT); + remoteAddr.sin_addr.s_addr = htonl(mid); - if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr))<0) { - printf("BACKUP ERROR : %s\n",strerror(errno)); - exit(0); - } - else { - send_data(sd, &ctrl, sizeof(char)); - send_data(sd, dupeptr, tempsize); + if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr))<0) { + printf("BACKUP ERROR : %s\n",strerror(errno)); + exit(0); + } + else { + send_data(sd, &ctrl, sizeof(char)); + send_data(sd, dupeptr, tempsize); - recv_data(sd, &response, sizeof(char)); + recv_data(sd, &response, sizeof(char)); #ifdef DEBUG - printf("%s ->response : %d - %d\n",__func__,response,DUPLICATION_COMPLETE); + printf("%s ->response : %d - %d\n",__func__,response,DUPLICATION_COMPLETE); #endif - if(response != DUPLICATION_COMPLETE) { + if(response != DUPLICATION_COMPLETE) { #ifndef DEBUG - printf("%s(DUPLICATION_BACKUP) -> DUPLICATION FAIL\n",__func__); + printf("%s(DUPLICATION_BACKUP) -> DUPLICATION FAIL\n",__func__); #endif - exit(0); - } + exit(0); + } - close(sd); - } + close(sd); + } - free(dupeptr); + free(dupeptr); - ctrl = DUPLICATION_COMPLETE; - send_data((int)acceptfd, &ctrl, sizeof(char)); -#ifndef DEBUG - printf("%s (DUPLICATE_BACKUP)-> Finished\n", __func__); + ctrl = DUPLICATION_COMPLETE; + send_data((int)acceptfd, &ctrl, sizeof(char)); +#ifdef DEBUG + printf("%s (DUPLICATE_BACKUP)-> Finished\n", __func__); #endif - + } break; case RECEIVE_DUPES: -#ifndef DEBUG +#ifdef DEBUG printf("control -> RECEIVE_DUPES sd : %d\n",(int)acceptfd); #endif if((readDuplicateObjs((int)acceptfd)) != 0) { @@ -692,7 +699,7 @@ void *dstmAccept(void *acceptfd) { ctrl = DUPLICATION_COMPLETE; send_data((int)acceptfd, &ctrl, sizeof(char)); -#ifndef DEBUG +#ifdef DEBUG printf("%s (RECEIVE_DUPES) -> Finished\n",__func__); #endif break; @@ -780,15 +787,15 @@ int readDuplicateObjs(int acceptfd) { #endif recv_data((int)acceptfd, &numoid, sizeof(unsigned int)); recv_data((int)acceptfd, &size, sizeof(int)); - // do i need array of oids? - // answer: no! now get to work - if(numoid != 0) { + + if(numoid != 0) { if ((dupeptr = calloc(1, size)) == NULL) { printf("calloc error for duplicated objects %s, %d\n", __FILE__, __LINE__); return 1; } recv_data((int)acceptfd, dupeptr, size); + ptr = dupeptr; for(i = 0; i < numoid; i++) { header = (objheader_t *)ptr; @@ -898,6 +905,9 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { timeout = recv_data((int)acceptfd, objread, size); } + if(timeout < 0) + return 0; + /* Read modified objects */ if(fixed.nummod != 0) { if ((modptr = calloc(1, fixed.sum_bytes)) == NULL) { @@ -909,7 +919,11 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { } if(timeout < 0) // coordinator failed + { + if(modptr != NULL) + free(modptr); return 0; + } /* Create an array of oids for modified objects */ oidmod = (unsigned int *) calloc(fixed.nummod, sizeof(unsigned int)); @@ -992,10 +1006,10 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, printf("%s -> received Decision %d\n",__func__,control); #endif } - /* insert received control into thash for another transaction*/ thashInsert(transID, control); #endif + switch(control) { case TRANS_ABORT: if (fixed->nummod > 0) @@ -1168,6 +1182,9 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne printf("%s -> control = %d, file = %s, line = %d\n", __func__,(int)control, __FILE__, __LINE__); #endif + if(control < 0) + printf("control = %d\n",control); + send_data(acceptfd, &control, sizeof(char)); #ifdef CACHE send_data(acceptfd, &numBytes, sizeof(int)); @@ -1362,14 +1379,15 @@ char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int /* Send control message */ send_data(acceptfd, &control, sizeof(char)); + /* FIXME how to send objs Send number of oids not found and the missing oids if objects are missing in the machine */ - if(*(objnotfound) != 0) { + /*if(*(objnotfound) != 0) { int msg[1]; msg[0] = *(objnotfound); send_data(acceptfd, &msg, sizeof(int)); int size = sizeof(unsigned int)* *(objnotfound); send_data(acceptfd, oidnotfound, size); - } + }*/ } /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process diff --git a/Robust/src/Runtime/DSTM/interface_recovery/trans.c b/Robust/src/Runtime/DSTM/interface_recovery/trans.c index b8f69dd2..e4433700 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/trans.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/trans.c @@ -234,7 +234,7 @@ GDBRECV1: #endif if(errno == EAGAIN) { if(trycounter < 5) { -#ifndef DEBUG +#ifdef DEBUG printf("%s -> TRYcounter increases\n",__func__); #endif trycounter++; @@ -414,7 +414,7 @@ int dstmStartup(const char * option) { setLocateObjHosts(); updateLiveHostsCommit(); paxos(); - printHostsStatus(); +// printHostsStatus(); if(!allHostsLive()) { printf("Not all hosts live. Exiting.\n"); exit(-1); @@ -751,7 +751,6 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) { #ifdef RECOVERY transRetryFlag = 0; - unsigned int machinenumber; static int flipBit = 0; // Used to distribute requests between primary and backup evenly // either primary or backup machine machinenumber = (flipBit)?getPrimaryMachine(lhashSearch(oid)):getBackupMachine(lhashSearch(oid)); @@ -1070,7 +1069,7 @@ int transCommit() { #endif free(modptr); } else { //handle request locally - handleLocalReq(&tosend[sockindex], &transinfo, &getReplyCtrl[sockindex]); + handleLocalReq(&tosend[sockindex], &transinfo, &getReplyCtrl[sockindex]); } sockindex++; pile = pile->next; @@ -1088,13 +1087,17 @@ int transCommit() { char control; int timeout; // a variable to check if the connection is still alive. if it is -1, then need to transcommit again timeout = recv_data(sd, &control, sizeof(char)); + +// printf("i = %d control = %d\n",i,control); + + //Update common data structure with new ctrl msg getReplyCtrl[i] = control; /* Recv Objects if participant sends TRANS_DISAGREE */ #ifdef CACHE if(control == TRANS_DISAGREE) { int length; - recv_data(sd, &length, sizeof(int)); + timeout = recv_data(sd, &length, sizeof(int)); void *newAddr; pthread_mutex_lock(&prefetchcache_mutex); if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) { @@ -1105,7 +1108,7 @@ int transCommit() { return 1; } pthread_mutex_unlock(&prefetchcache_mutex); - recv_data(sd, newAddr, length); + timeout = recv_data(sd, newAddr, length); int offset = 0; while(length != 0) { unsigned int oidToPrefetch; @@ -1147,6 +1150,8 @@ int transCommit() { #ifdef DEBUG printf("%s-> Decide final response now\n", __func__); #endif + + /* Decide the final response */ if((finalResponse = decideResponse(getReplyCtrl, &treplyretry, pilecount)) == 0) { printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); @@ -1277,7 +1282,7 @@ int transCommit() { printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__); exit(-1); } -#ifndef DEBUG +#ifdef DEBUG printf("%s-> End, line:%d\n\n", __func__, __LINE__); #endif return 0; @@ -1389,7 +1394,7 @@ char decideResponse(char *getReplyCtrl, char *treplyretry, int pilecount) { control = getReplyCtrl[i]; switch(control) { default: -#ifdef DEBUG +#ifndef DEBUG printf("%s-> Participant sent unknown message, i:%d, Control: %d\n", __func__, i, (int)control); #endif @@ -1452,7 +1457,7 @@ void *getRemoteObj(unsigned int mnum, unsigned int oid) { objheader_t *h; void *objcopy = NULL; - int sd = getSock2(transRequestSockPool, mnum); + int sd = getSock2(transReadSockPool, mnum); char readrequest[sizeof(char)+sizeof(unsigned int)]; readrequest[0] = READ_REQUEST; *((unsigned int *)(&readrequest[1])) = oid; @@ -1505,7 +1510,7 @@ char receiveDecisionFromBackup(unsigned int transID,int nummid,unsigned int *lis char response; for(i = 0; i < nummid; i++) { - if((sd = getSock(transPrefetchSockPool, listmid[i])) < 0) { + if((sd = getSockWithLock(transPrefetchSockPool, listmid[i])) < 0) { printf("%s -> socket Error!!\n"); } else { @@ -1522,7 +1527,7 @@ char receiveDecisionFromBackup(unsigned int transID,int nummid,unsigned int *lis break; // received response // else check next machine - freeSock(transPrefetchSockPool, listmid[i],sd); + freeSockWithLock(transPrefetchSockPool, listmid[i],sd); } } #ifdef DEBUG @@ -1567,7 +1572,8 @@ void restoreDuplicationState(unsigned int deadHost) { else { // if i am the leader updateLiveHosts(); duplicateLostObjects(deadHost); - + printf("%s -> got to this point\n",__func__); + if(updateLiveHostsCommit() != 0) { printf("%s -> error updateLiveHostsCommit()\n",__func__); exit(1); @@ -1580,7 +1586,7 @@ void restoreDuplicationState(unsigned int deadHost) { else { pthread_mutex_unlock(&leaderFixing_mutex); #ifdef DEBUG - printf("%s (REMOTE_RESTORE_DUPLICATED_STATE -> LEADER is already fixing\n",__func__); + printf("%s -> LEADER is already fixing\n",__func__); #endif sleep(WAIT_TIME); } @@ -1594,6 +1600,7 @@ void restoreDuplicationState(unsigned int deadHost) { send_data(sd, &ctrl, sizeof(char)); send_data(sd, &deadHost, sizeof(unsigned int)); freeSockWithLock(transPrefetchSockPool,leader,sd); + printf("%s -> Message sent\n",__func__); sleep(WAIT_TIME); } @@ -2380,7 +2387,7 @@ int updateLiveHostsCommit() { //for each machine send data for(i = 0; i < numHostsInSystem; i++) { // hard define num of retries - if(i == myIndexInHostArray) + if(hostIpAddrs[i] == myIpAddr) continue; if(liveHosts[i] == 1) { if((sd = getSockWithLock(transPrefetchSockPool, hostIpAddrs[i])) < 0) { @@ -2492,8 +2499,9 @@ int allHostsLive() { #ifdef RECOVERY void duplicateLostObjects(unsigned int mid){ - +#ifndef DEBUG printf("%s-> Start, mid: [%s]\n", __func__, midtoIPString(mid)); +#endif //this needs to be changed. unsigned int backupMid = getBackupMachine(mid); // get backup machine of dead machine @@ -2629,7 +2637,7 @@ void duplicateLocalOriginalObjects(unsigned int mid) { int tempsize, sd; char *dupeptr, ctrl, response; -#ifndef DEBUG +#ifdef DEBUG printf("%s-> Start\n", __func__); #endif //copy code fom dstmserver here @@ -2659,7 +2667,7 @@ void duplicateLocalOriginalObjects(unsigned int mid) { if(response != DUPLICATION_COMPLETE) { //fail message -#ifndef DEBUG +#ifdef DEBUG printf("%s -> DUPLICATION_FAIL\n",__func__); #endif exit(0); @@ -2667,7 +2675,7 @@ void duplicateLocalOriginalObjects(unsigned int mid) { free(dupeptr); -#ifndef DEBUG +#ifdef DEBUG printf("%s-> End\n", __func__); #endif @@ -3060,47 +3068,28 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi return pile; } +// relocate the position of myIp pile to end of list plistnode_t *sortPiles(plistnode_t *pileptr) { - plistnode_t *head, *ptr, *tail; - head = pileptr; - ptr = pileptr; - /* Get tail pointer */ - while(ptr!= NULL) { - tail = ptr; - ptr = ptr->next; + plistnode_t *ptr, *tail; + tail = pileptr; + ptr = NULL; + /* Get tail pointer and myIp pile ptr */ + while(tail->next != NULL) { + if(tail->mid == myIpAddr) + ptr = tail; + tail = tail->next; } - ptr = pileptr; - plistnode_t *prev = pileptr; - /* Arrange local machine processing at the end of the pile list */ - while(ptr != NULL) { - if(ptr != tail) { - /* - if(ptr->mid == myIpAddr && (prev != pileptr)) { - prev->next = ptr->next; - ptr->next = NULL; - tail->next = ptr; - return pileptr; - } - if((ptr->mid == myIpAddr) && (prev == pileptr)) { - prev->next = ptr->next; - ptr->next = NULL; - tail->next = ptr; - return pileptr; - } - */ - - if((ptr->mid == myIpAddr)) - { - tail->next = pileptr; - pileptr = ptr->next; - ptr->next = NULL; - return pileptr; - } - prev = ptr; - } - ptr = ptr->next; - } - return pileptr; + // if ptr is null, then myIp pile is already at tail + if(ptr != NULL) { + /* Arrange local machine processing at the end of the pile list */ + tail->next = pileptr; + pileptr = ptr->next; + ptr->next = NULL; + return pileptr; + } + + /* get too this point iff myIpAddr pile is at tail */ + return pileptr; } #ifdef RECOVERY