simultaneous failure
authorjihoonl <jihoonl>
Sat, 22 May 2010 00:46:51 +0000 (00:46 +0000)
committerjihoonl <jihoonl>
Sat, 22 May 2010 00:46:51 +0000 (00:46 +0000)
Robust/src/Runtime/DSTM/interface_recovery/dstm.h
Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c
Robust/src/Runtime/DSTM/interface_recovery/ip.c
Robust/src/Runtime/DSTM/interface_recovery/plookup.c
Robust/src/Runtime/DSTM/interface_recovery/plookup.h
Robust/src/Runtime/DSTM/interface_recovery/tlookup.c
Robust/src/Runtime/DSTM/interface_recovery/trans.c

index 148439a7cf6301af0f438f735070eb0231bc04c5..4bf76a4f80eb5c613f98d001616b4379e27a84d4 100644 (file)
@@ -318,8 +318,8 @@ int pingMachines(unsigned int epoch_num,int* sdlist,tlist_t**);
 int releaseNewLists(unsigned int epoch_num,int* sdlist,tlist_t*);
 int duplicateLostObjects(unsigned int epoch_num,int* sdlist);
 void restartTransactions(unsigned int epoch_num,int* sdlist);
-void makeTransactionLists(tlist_t**,int);
-void computeLiveHosts(int);
+int makeTransactionLists(tlist_t**,int);
+int computeLiveHosts(int);
 void waitForAllMachine();
 int readDuplicateObjs(int);
 void printRecoveryStat();
index 180f67b59c1a94ad457c64b0416849fb7641b2f4..4868964333476798688dac2aac34304ba30c1ae7 100644 (file)
@@ -227,6 +227,8 @@ void* startPolling()
         socklist[deadMachineIndex] = -1;
       } // end of if 2
     } // end of while 1
+
+    free(socklist);
 }
 
 
@@ -239,7 +241,7 @@ unsigned int checkIfAnyMachineDead(int* socklist)
   
   while(1){
 
-    if(okCommit == TRANS_OK) {
+//    if(okCommit == TRANS_OK) {
       for(i = 0; i< numHostsInSystem;i++) {
         if(socklist[i] > 0) {
           send_data(socklist[i], &control,sizeof(char));
@@ -258,12 +260,13 @@ unsigned int checkIfAnyMachineDead(int* socklist)
       } // end for()
 
       clearDeadThreadsNotification();
-    }
+//    }
+    /*
     else {
       if(leader_index >= 0 ) {
-        send_data(socklist[i],&control,sizeof(char));
+        send_data(socklist[leader_index],&control,sizeof(char));
   
-        if(recv_data(socklist[i], &response, sizeof(char)) < 0) {
+        if(recv_data(socklist[leader_index], &response, sizeof(char)) < 0) {
           // if machine is dead, returns index of socket
           return i;
         }
@@ -275,7 +278,7 @@ unsigned int checkIfAnyMachineDead(int* socklist)
         } // end else
       }
     }
-
+*/
     sleep(numLiveHostsInSystem);  // wait for seconds for next checking
   } // end while(1)
 }
index d5c6b45a806720637eee2530692a74b4bd6930be..00170466f60acd15aaf49da5298b3448b86b150b 100644 (file)
@@ -29,9 +29,6 @@ void midtoIP(unsigned int mid, char *ptr) {
   i.c = (mid & 0x0000ff00) >> 8;
   i.d = mid & 0x000000ff;
   sprintf(ptr, "%d.%d.%d.%d", i.a, i.b, i.c, i.d);
-#ifdef DEBUG
-  printf("DEBUG-> midtoIP() mid = %d.%d.%d.%d\n", i.a, i.b, i.c, i.d);
-#endif
   return;
 }
 
index 3eee2cbd0abf66c75a2c5f9b00366f2f0bbb57f2..7f4631ef6836b9f7386dd179c0281ad3583100f9 100644 (file)
@@ -79,3 +79,30 @@ void pDelete(plistnode_t *pile) {
   }
   return;
 }
+
+void pPrint(plistnode_t *pile)
+{
+  int i;
+  plistnode_t* walker = pile;
+
+  printf("===========pPrint===========\n");
+
+  while(walker) {
+    printf("\nMid = %s\n",midtoIPString(walker->mid));
+    printf("== oidread ==\n");
+    for(i = 0 ; i < walker->numread; i++)
+      printf("%d\n",walker->objread[i]);
+    printf("\n== oidmood ==\n");
+    
+    for(i = 0 ; i < walker->nummod; i++)
+      printf("%d\n",walker->oidmod[i]);
+  
+    printf("\n== oidcreate ==\n");
+    for(i = 0 ; i < walker->numcreated; i++)
+      printf("%d\n",walker->oidcreated[i]);
+
+    walker = walker->next;
+  }
+
+  printf("============= End ========\n");
+}
index d4137839d12d5647836091fc1055fec50eca0576..801703e0fd49d591184c850843a1246ec4665818 100644 (file)
@@ -23,6 +23,6 @@ plistnode_t  *pCreate(int);
 int pCount(plistnode_t *pile);
 int pListMid(plistnode_t *pile, unsigned int *list);
 void pDelete(plistnode_t *pile);
-
+void pPrint(plistnode_t *pile);
 #endif
 
index 23b00a63331baf6719c89517570a38a9415cad06..cafd594c3bd7c85d1873c11ff8097adca5065bea 100644 (file)
@@ -46,9 +46,6 @@ unsigned int thashInsert(unsigned int transid, char decision) {
   ptr = tlookup.table;
   tlookup.numelements++;
 
-#ifdef DEBUG
-  printf("DEBUG(insert) transid = %d, decision  = %d, index = %d\n",transid, decision, index);
-#endif
   if(ptr[index].next == NULL && ptr[index].transid == 0) {          // Insert at the first position in the hashtable
     ptr[index].transid = transid;
     ptr[index].decision = decision;
index 63431d0836ec6e1f5b18d83b58b9d62a9ab3cbd2..2086cf1a1f1db78f532b28efce38555aa2dea8a2 100644 (file)
@@ -1225,7 +1225,8 @@ int transCommit() {
 
 #ifdef RECOVERY
   while(okCommit != TRANS_OK) {
-//    printf("%s -> new Transactin is waiting\n",__func__);
+//    printf("%s -> new Transaction is waiting\n",__func__);
+ //   sleep(1);
     randomdelay();
   }
 
@@ -1794,6 +1795,14 @@ void notifyLeaderDeadMachine(unsigned int deadHost) {
                return;
        }
   
+  pthread_mutex_lock(&liveHosts_mutex);
+  liveHosts[findHost(deadHost)] = 0;
+  numLiveHostsInSystem--;
+  pthread_mutex_unlock(&liveHosts_mutex);
+
+  if(numLiveHostsInSystem == 1)
+    return;
+
   // increase epoch number by number machines in the system
   pthread_mutex_lock(&recovery_mutex);
   epoch_num = currentEpoch = INCREASE_EPOCH(currentEpoch,numHostsInSystem,myIndexInHostArray);
@@ -1824,28 +1833,32 @@ void restoreDuplicationState(unsigned int deadHost,unsigned int epoch_num)
   // update leader's live host list and object locations
   
   do {
-    sdlist = getSocketLists();
+    do {
+      sdlist = getSocketLists();
+  
+      printf("%s -> I'm currently leader num : %d\n\n",__func__,epoch_num);
+      if((flag = pingMachines(epoch_num,sdlist,&tList)) < 0) break;
 
-    printf("%s -> I'm currently leader num : %d\n\n",__func__,epoch_num);
-    if((flag = pingMachines(epoch_num,sdlist,&tList)) < 0) {
-      break;;
-    }
+      printf("%s -> I'm currently leader num : %d\n\n",__func__,epoch_num);
+      if((flag = releaseNewLists(epoch_num,sdlist,tList)) < 0) break;
     
-    printf("%s -> I'm currently leader num : %d\n\n",__func__,epoch_num);
+      // transfer lost objects
+      if((flag= duplicateLostObjects(epoch_num,sdlist)) < 0) break;
 
-    if((flag = releaseNewLists(epoch_num,sdlist,tList)) < 0) {
-      break;;
-    }
+      // restart transactions
+      restartTransactions(epoch_num,sdlist);
+    }while(0);
+
+    freeSocketLists(sdlist);
 
-    // transfer lost objects
-    if((flag= duplicateLostObjects(epoch_num,sdlist)) < 0) {
+    // falg == 0  - fixed
+    //      == -1 - higher epoch
+    //      == -2 - found another failure, redo everything
+    if(flag > -2)
       break;
-    }
-    // restart transactions
-    restartTransactions(epoch_num,sdlist);
-  }while(0);
 
-  freeSocketLists(sdlist);
+    printf("%s -> Retry \n",__func__);
+  }while(0);
 
   if(flag < 0) {
     printf("%s -> higher epoch\n",__func__);
@@ -1854,7 +1867,8 @@ void restoreDuplicationState(unsigned int deadHost,unsigned int epoch_num)
       randomdelay();
     }
     
-  }else {
+  }else { 
+    printf("%s -> I was leader! num : %d\n",__func__,epoch_num);
 #ifdef RECOVERYSTATS
   fi = myrdtsc();
   recoverStat[numRecovery].elapsedTime = (fi-st)/CPU_FREQ;
@@ -1979,14 +1993,19 @@ int pingMachines(unsigned int epoch_num,int* sdlist,tlist_t** tList)
     if(sdlist[i] == -1 || hostIpAddrs[i] == myIpAddr)
       continue;
 
-    recv_data(sdlist[i],&response,sizeof(char));
+    if(recv_data(sdlist[i],&response,sizeof(char)) < 0)
+    {
+      tlistDestroy(currentTransactionList);
+      return -2;
+    }
 
     if(response == RESPOND_TRANS_WAIT) 
     {
-      // receive live host list
-      computeLiveHosts(sdlist[i]);
-      // receive transaction list
-      makeTransactionLists(&currentTransactionList,sdlist[i]);
+      // receive live host list       // receive transaction list
+      if(computeLiveHosts(sdlist[i]) < 0 || makeTransactionLists(&currentTransactionList,sdlist[i]) < 0) {
+        tlistDestroy(currentTransactionList);
+        return -2;
+      }
     }
     else if(response == RESPOND_HIGHER_EPOCH)
     {
@@ -2017,12 +2036,13 @@ int pingMachines(unsigned int epoch_num,int* sdlist,tlist_t** tList)
   return 0;
 }
 
-void computeLiveHosts(int sd)
+int computeLiveHosts(int sd)
 {
   int receivedHost[numHostsInSystem];
   int i;
   
-  recv_data(sd,receivedHost,sizeof(int)*numHostsInSystem);
+  if(recv_data(sd,receivedHost,sizeof(int)*numHostsInSystem))
+    return -2;
 
   for(i = 0 ; i < numHostsInSystem;i ++)
   {
@@ -2034,7 +2054,7 @@ void computeLiveHosts(int sd)
       liveHosts[i] = 0;
   }
   
-  return;
+  return 0;
 }
 
 int releaseNewLists(unsigned int epoch_num,int* sdlist,tlist_t* tlist)
@@ -2092,7 +2112,10 @@ int releaseNewLists(unsigned int epoch_num,int* sdlist,tlist_t* tlist)
     if(sdlist[i] != -1 && hostIpAddrs[i] != myIpAddr)
     {
  //     printf("%s -> Waiting for %s\n",__func__,midtoIPString(hostIpAddrs[i]));
-      recv_data(sdlist[i], &response, sizeof(char));
+      if(recv_data(sdlist[i], &response, sizeof(char)) < 0) {
+        tlistDestroy(tlist);  
+        return -2;
+      }
       if(response != TRANS_OK)
       {
         printf("%s -> response : %d Need to fix\n",__func__,response);
@@ -2106,11 +2129,12 @@ int releaseNewLists(unsigned int epoch_num,int* sdlist,tlist_t* tlist)
   }
   tlistDestroy(tlist);  
   printf("%s -> End\n",__func__);
+  return 0;
 }
 
 // after this fuction
 // leader knows all the on-going transaction list and their decisions
-void makeTransactionLists(tlist_t** tlist,int sd)
+int makeTransactionLists(tlist_t** tlist,int sd)
 {
   tlist_node_t* transArray;
   tlist_node_t* tmp;
@@ -2119,14 +2143,18 @@ void makeTransactionLists(tlist_t** tlist,int sd)
   int size;
 
   // receive all on-going transaction list
-  recv_data(sd, &size, sizeof(int));
+  if(recv_data(sd, &size, sizeof(int)) < 0) 
+    return -2;
 
   if((transArray = calloc(size, sizeof(tlist_node_t))) == NULL) {
     printf("%s -> calloc error\n",__func__);
     exit(0);
   }
       
-  recv_data(sd,transArray, sizeof(tlist_node_t) * size);
+  if(recv_data(sd,transArray, sizeof(tlist_node_t) * size) < 0) {
+    free(transArray);
+    return -2;
+  }
 
   // add into currentTransactionList
   for(j = 0 ; j < size; j ++) {
@@ -2158,7 +2186,8 @@ void makeTransactionLists(tlist_t** tlist,int sd)
     send_data(sd, &request, sizeof(char));
     send_data(sd, &(walker->transid), sizeof(unsigned int));
 
-    recv_data(sd, &respond, sizeof(char));
+    if(recv_data(sd, &respond, sizeof(char)) < 0)
+      return -2;
 
     if(respond  > 0)
     {
@@ -2171,7 +2200,7 @@ void makeTransactionLists(tlist_t** tlist,int sd)
   request = REQUEST_TRANS_COMPLETE;
   send_data(sd, &request,sizeof(char));    
 
-  return;
+  return 0;
 }
 
 void restartTransactions(unsigned int epoch_num,int* sdlist)
@@ -3016,7 +3045,7 @@ int processConfigFile() {
   myIndexInHostArray = findHost(myIpAddr);
 #ifdef RECOVERY
        liveHosts[myIndexInHostArray] = 1;
-  currentEpoch = myIndexInHostArray;
+  currentEpoch = 1;
 
 #ifdef RECOVERYSTATS
   numRecovery = 0;
@@ -3288,17 +3317,18 @@ int duplicateLostObjects(unsigned int epoch_num,int *sdlist){
   for(i = 0 ; i < numHostsInSystem; i ++) {
     if(sdlist[i] == -1)
       continue;
-    recv_data(sdlist[i],&response,sizeof(char));
+    if(recv_data(sdlist[i],&response,sizeof(char)))
+      return -2;
 
     if(response != DUPLICATION_COMPLETE) {
-      printf("%s -> fail!\n",__func__);
-      exit(0);
+      return -2;
     } 
   }
 
 #ifndef DEBUG
        printf("%s-> End\n", __func__);  
 #endif
+  return 0;
 }
 #endif
 void addHost(unsigned int hostIp) {