From: jihoonl Date: Tue, 25 May 2010 16:10:09 +0000 (+0000) Subject: byte info X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=9ce7eb037b6311e5209c5d1a5022d2e25c3ec247;p=IRC.git byte info --- diff --git a/Robust/src/Runtime/DSTM/interface_recovery/dstm.h b/Robust/src/Runtime/DSTM/interface_recovery/dstm.h index 4bf76a4f..ece2f51c 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/dstm.h +++ b/Robust/src/Runtime/DSTM/interface_recovery/dstm.h @@ -249,10 +249,8 @@ typedef struct trans_commit_data { * Structure for Recovery stats **************************************/ typedef struct recoverystat { - unsigned int deadMachine; long long elapsedTime; unsigned int recoveredData; - unsigned int recvData; } recovery_stat_t; #endif diff --git a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c index 48689643..7c7fb5f7 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c @@ -43,6 +43,7 @@ tlist_t* transList; int okCommit; // machine flag extern numWaitMachine; extern unsigned int currentEpoch; +extern unsigned int currentBackupMachine; unsigned int leader_index; #endif @@ -577,7 +578,9 @@ void *dstmAccept(void *acceptfd) { printf("control -> UPDATE_LIVE_HOSTS\n"); #endif receiveNewHostLists((int)acceptfd); - + pthread_mutex_lock(&recovery_mutex); + currentBackupMachine = getBackupMachine(myIpAddr); + pthread_mutex_unlock(&recovery_mutex); #ifdef DEBUG printHostsStatus(); printf("%s (UPDATE_LIVE_HOSTS)-> Finished\n", __func__); @@ -590,6 +593,7 @@ void *dstmAccept(void *acceptfd) { { struct sockaddr_in remoteAddr; int sd; + unsigned int dupeSize; unsigned int epoch_num; recv_data((int)acceptfd,&epoch_num,sizeof(unsigned int)); @@ -601,77 +605,86 @@ void *dstmAccept(void *acceptfd) { //object store stuffffff mid = getBackupMachine(myIpAddr); - dupeptr = (char*) mhashGetDuplicate(&tempsize, 0); - - //send control and dupes after - ctrl = RECEIVE_DUPES; - - if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - perror("ORIGINAL : "); - exit(0); - } + if(mid != currentBackupMachine) { + currentBackupMachine = mid; + dupeptr = (char*) mhashGetDuplicate(&tempsize, 0); + //send control and dupes after + ctrl = RECEIVE_DUPES; - bzero(&remoteAddr, sizeof(remoteAddr)); - remoteAddr.sin_family = AF_INET; - remoteAddr.sin_port = htons(LISTEN_PORT); - remoteAddr.sin_addr.s_addr = htonl(mid); - - if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr))<0) { - printf("REQUEST_DUPE ERROR : %s\n",strerror(errno)); -// exit(0); - break; - } - else { - send_data(sd, &ctrl, sizeof(char)); - send_data(sd, dupeptr, tempsize); + if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + perror("ORIGINAL : "); + exit(0); + } + bzero(&remoteAddr, sizeof(remoteAddr)); + remoteAddr.sin_family = AF_INET; + remoteAddr.sin_port = htons(LISTEN_PORT); + remoteAddr.sin_addr.s_addr = htonl(mid); - if((readDuplicateObjs(sd) )!= 0) { - printf("Fail in readDuplicateObj()\n"); + if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr))<0) { + printf("REQUEST_DUPE ERROR : %s\n",strerror(errno)); break; -// exit(0); } - recv_data(sd, &response, sizeof(char)); + else { + send_data(sd, &ctrl, sizeof(char)); + send_data(sd, dupeptr, tempsize); + + dupeSize = tempsize; + + if((dupeSize += readDuplicateObjs(sd) ) < 0) { + break; + } + recv_data(sd, &response, sizeof(char)); - if(response != DUPLICATION_COMPLETE) { + if(response != DUPLICATION_COMPLETE) { #ifndef DEBUG - printf("%s(REQUEST_DUPE) -> DUPLICATION FAIL\n",__func__); + printf("%s(REQUEST_DUPE) -> DUPLICATION FAIL\n",__func__); #endif //fail message - break; + break; // exit(0); - } + } - close(sd); - } - free(dupeptr); + close(sd); + } + free(dupeptr); + + ctrl = DUPLICATION_COMPLETE; + send_data((int)acceptfd, &ctrl, sizeof(char)); + send_data((int)acceptfd, &dupeSize,sizeof(unsigned int)); - ctrl = DUPLICATION_COMPLETE; - send_data((int)acceptfd, &ctrl, sizeof(char)); #ifdef DEBUG - printf("%s (REQUEST_DUPE)-> Finished\n", __func__); + printf("%s (REQUEST_DUPE)-> Finished\n", __func__); #endif + } + else { + ctrl = DUPLICATION_COMPLETE; + send_data((int)acceptfd,&ctrl,sizeof(char)); + tempsize = 0; + send_data((int)acceptfd,&tempsize,sizeof(unsigned int)); + } } - break; + break; + case RECEIVE_DUPES: #ifdef DEBUG printf("control -> RECEIVE_DUPES sd : %d\n",(int)acceptfd); #endif - if((readDuplicateObjs((int)acceptfd)) != 0) { + if((readDuplicateObjs((int)acceptfd)) < 0) { printf("Error: In readDuplicateObjs() %s, %d\n", __FILE__, __LINE__); - pthread_exit(NULL); +// pthread_exit(NULL); } - - dupeptr = (char*) mhashGetDuplicate(&tempsize, 1); - - send_data((int)acceptfd,dupeptr,tempsize); + else { + dupeptr = (char*) mhashGetDuplicate(&tempsize, 1); + send_data((int)acceptfd,dupeptr,tempsize); - free(dupeptr); - ctrl = DUPLICATION_COMPLETE; - send_data((int)acceptfd, &ctrl, sizeof(char)); + free(dupeptr); + ctrl = DUPLICATION_COMPLETE; + send_data((int)acceptfd, &ctrl, sizeof(char)); #ifdef DEBUG - printf("%s (RECEIVE_DUPES) -> Finished\n",__func__); + printf("%s (RECEIVE_DUPES) -> Finished\n",__func__); #endif + } break; #endif default: @@ -693,22 +706,35 @@ int readDuplicateObjs(int acceptfd) { unsigned int oid; void *dupeptr, *ptrcreate, *ptr; objheader_t *header; + int timeout1; + int timeout2; -#ifdef DEBUG +#ifndef DEBUG printf("%s-> Start\n", __func__); #endif - recv_data((int)acceptfd, &numoid, sizeof(unsigned int)); - recv_data((int)acceptfd, &size, sizeof(int)); + timeout1 = recv_data((int)acceptfd, &numoid, sizeof(unsigned int)); + timeout2 = recv_data((int)acceptfd, &size, sizeof(int)); + + if(timeout1 < 0 || timeout2 < 0) { + return -1; + } if(numoid != 0) { if ((dupeptr = calloc(1, size)) == NULL) { printf("calloc error for duplicated objects %s, %d\n", __FILE__, __LINE__); - return 1; + return -1; } - recv_data((int)acceptfd, dupeptr, size); + + if(recv_data((int)acceptfd, dupeptr, size) < 0) { + free(dupeptr); + return -1; + } + + printf("%s -> PAss this point\n",__func__); ptr = dupeptr; + printf("%s -> numoid = %u\n",__func__,numoid); for(i = 0; i < numoid; i++) { header = (objheader_t *)ptr; oid = OID(header); @@ -736,7 +762,7 @@ int readDuplicateObjs(int acceptfd) { if ((ptrcreate = objstrAlloc(&mainobjstore, tmpsize)) == NULL) { printf("Error: readDuplicateObjs() failed objstrAlloc %s, %d\n", __FILE__, __LINE__); pthread_mutex_unlock(&mainobjstore_mutex); - return 1; + return -1; } pthread_mutex_unlock(&mainobjstore_mutex); memcpy(ptrcreate, header, tmpsize); @@ -762,14 +788,14 @@ int readDuplicateObjs(int acceptfd) { } ptr += tmpsize; } -#ifdef DEBUG +#ifndef DEBUG printf("%s-> End\n", __func__); #endif free(dupeptr); - return 0; + return size; } else { -#ifdef DEBUG +#ifndef DEBUG printf("%s-> No objects duplicated\n", __func__); #endif return 0; diff --git a/Robust/src/Runtime/DSTM/interface_recovery/trans.c b/Robust/src/Runtime/DSTM/interface_recovery/trans.c index 2086cf1a..6a8b12d4 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/trans.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/trans.c @@ -114,6 +114,7 @@ extern tlist_t* transList; extern pthread_mutex_t clearNotifyList_mutex; unsigned int currentEpoch; +unsigned int currentBackupMachine; #ifdef RECOVERYSTATS int numRecovery = 0; @@ -1828,7 +1829,6 @@ void restoreDuplicationState(unsigned int deadHost,unsigned int epoch_num) unsigned int dupeSize = 0; // to calculate the size of backed up data st = myrdtsc(); // to get clock - recoverStat[numRecovery].deadMachine = deadHost; #endif // update leader's live host list and object locations @@ -1872,6 +1872,7 @@ void restoreDuplicationState(unsigned int deadHost,unsigned int epoch_num) #ifdef RECOVERYSTATS fi = myrdtsc(); recoverStat[numRecovery].elapsedTime = (fi-st)/CPU_FREQ; + recoverStat[numRecovery].recoveredData = flag; numRecovery++; printRecoveryStat(); #endif @@ -3230,6 +3231,11 @@ int updateLiveHostsCommit() { freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sd); } } + + pthread_mutex_lock(&recovery_mutex); + currentBackupMachine = getBackupMachine(myIpAddr); + pthread_mutex_unlock(&recovery_mutex); + #ifdef DEBUG printHostsStatus(); printf("%s -> Finish\n",__func__); @@ -3290,6 +3296,8 @@ int allHostsLive() { int duplicateLostObjects(unsigned int epoch_num,int *sdlist){ int i; char response; + unsigned int dupeSize = 0; + unsigned int tempSize; printf("%s -> Enter\n",__func__); /* duplicateLostObject example @@ -3317,18 +3325,21 @@ int duplicateLostObjects(unsigned int epoch_num,int *sdlist){ for(i = 0 ; i < numHostsInSystem; i ++) { if(sdlist[i] == -1) continue; - if(recv_data(sdlist[i],&response,sizeof(char))) - return -2; - if(response != DUPLICATION_COMPLETE) { - return -2; - } + if(recv_data(sdlist[i],&response,sizeof(char))) return -2; + + if(response != DUPLICATION_COMPLETE) return -2; + + if(recv_data(sdlist[i],&tempSize,sizeof(unsigned int)) < 0) return -2; + + dupeSize += tempSize; + } #ifndef DEBUG printf("%s-> End\n", __func__); #endif - return 0; + return dupeSize; } #endif void addHost(unsigned int hostIp) { @@ -3868,8 +3879,9 @@ void printRecoveryStat() { printf("numRecovery = %d\n",numRecovery); int i; for(i=0; i < numRecovery;i++) { - printf("Dead Machine = %s\n",midtoIPString(recoverStat[i].deadMachine)); +// printf("Dead Machine = %s\n",midtoIPString(recoverStat[i].deadMachine)); printf("Recovery Time(ms) = %ld\n",recoverStat[i].elapsedTime); + printf("Recovery Byte = %u\n",recoverStat[i].recoveredData); } printf("**************************\n\n"); fflush(stdout);