polling thread ping only leader machine when a machine failure is detected
authorjihoonl <jihoonl>
Tue, 27 Apr 2010 16:57:25 +0000 (16:57 +0000)
committerjihoonl <jihoonl>
Tue, 27 Apr 2010 16:57:25 +0000 (16:57 +0000)
Robust/src/Runtime/DSTM/interface_recovery/dstm.h
Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c
Robust/src/Runtime/DSTM/interface_recovery/trans.c

index c7c9f216e7d31c58da583b3849577034a3a85cb0..ae3c6d02570113aee716588a0b6182db17ab53d5 100644 (file)
@@ -301,7 +301,7 @@ unsigned int getBackupMachine(unsigned int mid);
 unsigned int getDuplicatedPrimaryMachine(unsigned int mid);
 int getNumLiveHostsInSystem();
 int getMyStatus();
-void* startAsking();
+void* startPolling();
 unsigned int checkIfAnyMachineDead(int*);
 void clearDeadThreadsNotification();
 /* end duplication */
index d104934798bc3eae60e719fc9556ea8c64165abe..8bf19a1811b12ccee1be3071b195f7b914f1e866 100644 (file)
@@ -43,6 +43,7 @@ tlist_t* transList;
 int okCommit; // machine flag
 extern numWaitMachine;
 extern unsigned int currentEpoch;
+unsigned int leader_index;
 
 #endif
 
@@ -176,7 +177,7 @@ void *dstmListen(void *lfd) {
 #ifdef RECOVERY
     if(firsttime) {
       do {
-        retval = pthread_create(&thread_dstm_asking, NULL, startAsking, NULL);
+        retval = pthread_create(&thread_dstm_asking, NULL, startPolling, NULL);
       }while(retval!=0);
       firsttime=0;
       pthread_detach(thread_dstm_asking);
@@ -191,7 +192,7 @@ void *dstmListen(void *lfd) {
 }
 
 #ifdef RECOVERY
-void* startAsking()
+void* startPolling()
 {
   unsigned int deadMachineIndex = -1;
   int i;
@@ -236,24 +237,41 @@ unsigned int checkIfAnyMachineDead(int* socklist)
   char response;
   
   while(1){
-    for(i = 0; i< numHostsInSystem;i++) {
-      if(socklist[i] > 0) {
-        send_data(socklist[i], &control,sizeof(char));
 
-        if(recv_data(socklist[i], &response, sizeof(char)) < 0) {
-          // if machine is dead, returns index of socket
-          return i;
-        }
-        else {
-          // machine responded
-          if(response != LIVE) {
+    if(okCommit == TRANS_OK) {
+      for(i = 0; i< numHostsInSystem;i++) {
+        if(socklist[i] > 0) {
+          send_data(socklist[i], &control,sizeof(char));
+  
+          if(recv_data(socklist[i], &response, sizeof(char)) < 0) {
+            // if machine is dead, returns index of socket
             return i;
           }
-        } // end else
-      }// end if(socklist[i]
-    } // end for()
+          else {
+            // machine responded
+            if(response != LIVE) {
+              return i;
+            }
+          } // end else
+        }// end if(socklist[i]
+      } // end for()
 
-    clearDeadThreadsNotification();
+      clearDeadThreadsNotification();
+    }
+    else {
+      send_data(socklist[i],&control,sizeof(char));
+
+      if(recv_data(socklist[i], &response, sizeof(char)) < 0) {
+        // if machine is dead, returns index of socket
+        return i;
+      }
+      else {
+        // machine responded
+        if(response != LIVE) {
+          return i;
+        }
+      } // end else
+    }
 
     sleep(numLiveHostsInSystem);  // wait for seconds for next checking
   } // end while(1)
@@ -498,7 +516,9 @@ void *dstmAccept(void *acceptfd) {
 #ifdef RECOVERY
       case REQUEST_TRANS_WAIT:
         { 
+          unsigned int new_leader_index;
           recv_data((int)acceptfd,&epoch_num,sizeof(unsigned int));
+          recv_data((int)acceptfd,&new_leader_index,sizeof(unsigned int));
 
           if(inspectEpoch(epoch_num) < 0) {
             response = RESPOND_HIGHER_EPOCH;
@@ -506,8 +526,14 @@ void *dstmAccept(void *acceptfd) {
           }
           else {
             printf("Got new Leader! : %d\n",epoch_num);
-            currentEpoch = epoch_num;
+
             stopTransactions(TRANS_BEFORE);
+
+            pthread_mutex_lock(&recovery_mutex);
+            currentEpoch = epoch_num;
+            leader_index = new_leader_index;
+            pthread_mutex_unlock(&recovery_mutex);
+            
             response = RESPOND_TRANS_WAIT;
             send_data((int)acceptfd,&response,sizeof(char));
             sendMyList((int)acceptfd);
index 073daa5f4ce463bbc86a2fe9797b2db2dbf38003..d5d433ef4acd0c2dd99a0d7f19896702c11183e9 100644 (file)
@@ -532,8 +532,6 @@ int dstmStartup(const char * option) {
                updateLiveHosts();
                setLocateObjHosts();
                updateLiveHostsCommit();
-//             leader = paxos(hostIpAddrs,liveHosts,myIpAddr,numHostsInSystem,numLiveHostsInSystem);
-//    printHostsStatus();
                if(!allHostsLive()) {
                        printf("Not all hosts live. Exiting.\n");
                        exit(-1);
@@ -1895,6 +1893,7 @@ int pingMachines(unsigned int epoch_num,int* sdlist,tlist_t** tList)
     request = REQUEST_TRANS_WAIT;
     send_data(sdlist[i],&request, sizeof(char));
     send_data(sdlist[i],&epoch_num,sizeof(unsigned int));
+    send_data(sdlist[i],&myIndexInHostArray,sizeof(unsigned int));
   }
 
   /* stop all local transactions */