byte info
author jihoonl <jihoonl>
Tue, 25 May 2010 16:10:09 +0000 (16:10 +0000)
committer jihoonl <jihoonl>
Tue, 25 May 2010 16:10:09 +0000 (16:10 +0000)
Robust/src/Runtime/DSTM/interface_recovery/dstm.h
Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c
Robust/src/Runtime/DSTM/interface_recovery/trans.c

index 4bf76a4f80eb5c613f98d001616b4379e27a84d4..ece2f51cd6c02378349441a2d3c5592eb26d89ee 100644 (file)
@@ -249,10 +249,8 @@ typedef struct trans_commit_data {
  * Structure for Recovery stats
  **************************************/
 typedef struct recoverystat {
-  unsigned int deadMachine;
   long long elapsedTime;
   unsigned int recoveredData;
-  unsigned int recvData;
 } recovery_stat_t;
 #endif
 
index 4868964333476798688dac2aac34304ba30c1ae7..7c7fb5f7395a9d06ad4dc264a2adbd4e338f67c7 100644 (file)
@@ -43,6 +43,7 @@ tlist_t* transList;
 int okCommit; // machine flag
 extern numWaitMachine;
 extern unsigned int currentEpoch;
+extern unsigned int currentBackupMachine;
 unsigned int leader_index;
 
 #endif
@@ -577,7 +578,9 @@ void *dstmAccept(void *acceptfd) {
         printf("control -> UPDATE_LIVE_HOSTS\n");
 #endif
         receiveNewHostLists((int)acceptfd);
-
+        pthread_mutex_lock(&recovery_mutex);
+        currentBackupMachine = getBackupMachine(myIpAddr);
+        pthread_mutex_unlock(&recovery_mutex);
 #ifdef DEBUG
                                printHostsStatus();
                          printf("%s (UPDATE_LIVE_HOSTS)-> Finished\n", __func__);      
@@ -590,6 +593,7 @@ void *dstmAccept(void *acceptfd) {
        {
          struct sockaddr_in remoteAddr;
          int sd;
+         unsigned int dupeSize;
          unsigned int epoch_num;
 
          recv_data((int)acceptfd,&epoch_num,sizeof(unsigned int));
@@ -601,77 +605,86 @@ void *dstmAccept(void *acceptfd) {
          //object store stuffffff
          mid = getBackupMachine(myIpAddr);
 
-         dupeptr = (char*) mhashGetDuplicate(&tempsize, 0);
-
-                               //send control and dupes after
-                               ctrl = RECEIVE_DUPES;
-
-          if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
-            perror("ORIGINAL : ");
-            exit(0);
-          }
+         if(mid != currentBackupMachine) {
+           currentBackupMachine = mid;
+           dupeptr = (char*) mhashGetDuplicate(&tempsize, 0);
+           //send control and dupes after
+           ctrl = RECEIVE_DUPES;
 
-          bzero(&remoteAddr, sizeof(remoteAddr));
-          remoteAddr.sin_family = AF_INET;
-          remoteAddr.sin_port = htons(LISTEN_PORT);
-          remoteAddr.sin_addr.s_addr = htonl(mid);
-
-          if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr))<0) {
-            printf("REQUEST_DUPE ERROR : %s\n",strerror(errno));
-//            exit(0);
-            break;
-          }
-          else {
-                               send_data(sd, &ctrl, sizeof(char));
-                               send_data(sd, dupeptr, tempsize);
+            if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+              perror("ORIGINAL : ");
+              exit(0);
+            }
+            bzero(&remoteAddr, sizeof(remoteAddr));
+            remoteAddr.sin_family = AF_INET;
+            remoteAddr.sin_port = htons(LISTEN_PORT);
+            remoteAddr.sin_addr.s_addr = htonl(mid);
 
-            if((readDuplicateObjs(sd) )!= 0) {
-              printf("Fail in readDuplicateObj()\n");
+            if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr))<0) {
+              printf("REQUEST_DUPE ERROR : %s\n",strerror(errno));
               break;
-//              exit(0);
             }
-                                 recv_data(sd, &response, sizeof(char));
+            else {
+                               send_data(sd, &ctrl, sizeof(char));
+                               send_data(sd, dupeptr, tempsize);
+
+              dupeSize = tempsize;
+
+              if((dupeSize += readDuplicateObjs(sd) ) < 0) {
+                break;
+              }
+                                 recv_data(sd, &response, sizeof(char));
                                
-            if(response != DUPLICATION_COMPLETE) {
+              if(response != DUPLICATION_COMPLETE) {
 #ifndef DEBUG
-             printf("%s(REQUEST_DUPE) -> DUPLICATION FAIL\n",__func__);
+               printf("%s(REQUEST_DUPE) -> DUPLICATION FAIL\n",__func__);
 #endif
                                  //fail message
-             break;
+               break;
 //             exit(0);
-                                 }
+                                   }
 
-            close(sd);
-          }
-          free(dupeptr);
+              close(sd);
+            }
+           free(dupeptr);
+
+           ctrl = DUPLICATION_COMPLETE;
+           send_data((int)acceptfd, &ctrl, sizeof(char));
+           send_data((int)acceptfd, &dupeSize,sizeof(unsigned int));
 
-          ctrl = DUPLICATION_COMPLETE;
-                                 send_data((int)acceptfd, &ctrl, sizeof(char));
 #ifdef DEBUG
-                                 printf("%s (REQUEST_DUPE)-> Finished\n", __func__);   
+           printf("%s (REQUEST_DUPE)-> Finished\n", __func__); 
 #endif
+        }
+        else {
+          ctrl = DUPLICATION_COMPLETE;
+          send_data((int)acceptfd,&ctrl,sizeof(char));
+          tempsize = 0;
+          send_data((int)acceptfd,&tempsize,sizeof(unsigned int));
+        }
        }
-                                 break;
+                        break;
+          
 
                        case RECEIVE_DUPES:
 #ifdef DEBUG
         printf("control -> RECEIVE_DUPES sd : %d\n",(int)acceptfd);
 #endif
-                               if((readDuplicateObjs((int)acceptfd)) != 0) {
+                               if((readDuplicateObjs((int)acceptfd)) < 0) {
                                        printf("Error: In readDuplicateObjs() %s, %d\n", __FILE__, __LINE__);
-                                       pthread_exit(NULL);
+//                                     pthread_exit(NULL);
                                }
-        
-        dupeptr = (char*) mhashGetDuplicate(&tempsize, 1);
-        
-        send_data((int)acceptfd,dupeptr,tempsize);
+        else {
+          dupeptr = (char*) mhashGetDuplicate(&tempsize, 1);
+          send_data((int)acceptfd,dupeptr,tempsize);
 
-        free(dupeptr);
-                               ctrl = DUPLICATION_COMPLETE;
-                               send_data((int)acceptfd, &ctrl, sizeof(char));
+          free(dupeptr);
+                               ctrl = DUPLICATION_COMPLETE;
+                               send_data((int)acceptfd, &ctrl, sizeof(char));
 #ifdef DEBUG
-        printf("%s (RECEIVE_DUPES) -> Finished\n",__func__);
+          printf("%s (RECEIVE_DUPES) -> Finished\n",__func__);
 #endif
+        }
                                break;
 #endif
                        default:
@@ -693,22 +706,35 @@ int readDuplicateObjs(int acceptfd) {
        unsigned int oid;
        void *dupeptr, *ptrcreate, *ptr;
        objheader_t *header;
+  int timeout1;
+  int timeout2;
 
-#ifdef DEBUG
+#ifndef DEBUG
        printf("%s-> Start\n", __func__);
 #endif
-       recv_data((int)acceptfd, &numoid, sizeof(unsigned int));
-       recv_data((int)acceptfd, &size, sizeof(int));   
+       timeout1 = recv_data((int)acceptfd, &numoid, sizeof(unsigned int));
+       timeout2 = recv_data((int)acceptfd, &size, sizeof(int));        
+
+  if(timeout1 < 0 || timeout2 < 0) {
+    return -1;
+  }
        
   if(numoid != 0) {
                if ((dupeptr = calloc(1, size)) == NULL) {
                        printf("calloc error for duplicated objects %s, %d\n", __FILE__, __LINE__);
-                       return 1;
+                       return -1;
                }
 
-               recv_data((int)acceptfd, dupeptr, size);
+
+               if(recv_data((int)acceptfd, dupeptr, size) < 0) {
+      free(dupeptr);
+      return -1;
+    }
+  
+    printf("%s -> PAss this point\n",__func__);
 
                ptr = dupeptr;
+    printf("%s -> numoid = %u\n",__func__,numoid);
                for(i = 0; i < numoid; i++) {
                        header = (objheader_t *)ptr;
                        oid = OID(header);
@@ -736,7 +762,7 @@ int readDuplicateObjs(int acceptfd) {
                        if ((ptrcreate = objstrAlloc(&mainobjstore, tmpsize)) == NULL) {
                                printf("Error: readDuplicateObjs() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
                                pthread_mutex_unlock(&mainobjstore_mutex);
-                                 return 1;
+                                 return -1;
                        }
                        pthread_mutex_unlock(&mainobjstore_mutex);
              memcpy(ptrcreate, header, tmpsize);
@@ -762,14 +788,14 @@ int readDuplicateObjs(int acceptfd) {
       }
                ptr += tmpsize;
                }
-#ifdef DEBUG
+#ifndef DEBUG
                printf("%s-> End\n", __func__);
 #endif
     free(dupeptr);
-               return 0;
+               return size;
        }
        else {
-#ifdef DEBUG
+#ifndef DEBUG
                printf("%s-> No objects duplicated\n", __func__);
 #endif
                return 0;
index 2086cf1a1f1db78f532b28efce38555aa2dea8a2..6a8b12d42a2bf6d82ac9d6ae6d55ce735de4cc13 100644 (file)
@@ -114,6 +114,7 @@ extern tlist_t* transList;
 extern pthread_mutex_t clearNotifyList_mutex;
 
 unsigned int currentEpoch;
+unsigned int currentBackupMachine;
 
 #ifdef RECOVERYSTATS
   int numRecovery = 0;
@@ -1828,7 +1829,6 @@ void restoreDuplicationState(unsigned int deadHost,unsigned int epoch_num)
   unsigned int dupeSize = 0;  // to calculate the size of backed up data
 
   st = myrdtsc(); // to get clock
-  recoverStat[numRecovery].deadMachine = deadHost;
 #endif
   // update leader's live host list and object locations
   
@@ -1872,6 +1872,7 @@ void restoreDuplicationState(unsigned int deadHost,unsigned int epoch_num)
 #ifdef RECOVERYSTATS
   fi = myrdtsc();
   recoverStat[numRecovery].elapsedTime = (fi-st)/CPU_FREQ;
+  recoverStat[numRecovery].recoveredData = flag;
   numRecovery++;
   printRecoveryStat();
 #endif
@@ -3230,6 +3231,11 @@ int updateLiveHostsCommit() {
       freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sd);
                }
        }
+
+  pthread_mutex_lock(&recovery_mutex);
+  currentBackupMachine = getBackupMachine(myIpAddr);
+  pthread_mutex_unlock(&recovery_mutex);
+
 #ifdef DEBUG
        printHostsStatus();
   printf("%s -> Finish\n",__func__);
@@ -3290,6 +3296,8 @@ int allHostsLive() {
 int duplicateLostObjects(unsigned int epoch_num,int *sdlist){
   int i;
   char response;
+  unsigned int dupeSize = 0;
+  unsigned int tempSize;
   printf("%s -> Enter\n",__func__);
 
   /* duplicateLostObject example
@@ -3317,18 +3325,21 @@ int duplicateLostObjects(unsigned int epoch_num,int *sdlist){
   for(i = 0 ; i < numHostsInSystem; i ++) {
     if(sdlist[i] == -1)
       continue;
-    if(recv_data(sdlist[i],&response,sizeof(char)))
-      return -2;
 
-    if(response != DUPLICATION_COMPLETE) {
-      return -2;
-    } 
+    if(recv_data(sdlist[i],&response,sizeof(char)))    return -2;
+
+    if(response != DUPLICATION_COMPLETE) return -2; 
+
+    if(recv_data(sdlist[i],&tempSize,sizeof(unsigned int)) < 0) return -2;
+
+    dupeSize += tempSize;
+    
   }
 
 #ifndef DEBUG
        printf("%s-> End\n", __func__);  
 #endif
-  return 0;
+  return dupeSize; 
 }
 #endif
 void addHost(unsigned int hostIp) {
@@ -3868,8 +3879,9 @@ void printRecoveryStat() {
   printf("numRecovery = %d\n",numRecovery);
   int i;
   for(i=0; i < numRecovery;i++) {
-    printf("Dead Machine = %s\n",midtoIPString(recoverStat[i].deadMachine));
+//    printf("Dead Machine = %s\n",midtoIPString(recoverStat[i].deadMachine));
     printf("Recovery Time(ms) = %ld\n",recoverStat[i].elapsedTime);
+    printf("Recovery Byte     = %u\n",recoverStat[i].recoveredData);
   }
   printf("**************************\n\n");
   fflush(stdout);