recovery..
authorjihoonl <jihoonl>
Tue, 22 Jun 2010 04:09:44 +0000 (04:09 +0000)
committerjihoonl <jihoonl>
Tue, 22 Jun 2010 04:09:44 +0000 (04:09 +0000)
still needs to fix

Robust/src/Runtime/DSTM/interface_recovery/dstm.h
Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c
Robust/src/Runtime/DSTM/interface_recovery/trans.c
Robust/src/Runtime/DSTM/interface_recovery/translist.c
Robust/src/Runtime/DSTM/interface_recovery/translist.h

index ece2f51cd6c02378349441a2d3c5592eb26d89ee..80c42f643c90d3e01083e0ce7e342eb06b998619 100644 (file)
@@ -282,12 +282,11 @@ unsigned int updateLiveHosts();
 void updateLiveHostsList(int mid);
 int updateLiveHostsCommit();
 void receiveNewHostLists(int accept);
-void stopTransactions(int TRANS_FLAG);
+int stopTransactions(int TRANS_FLAG,unsigned int epoch_num);
 void sendMyList(int);
 void sendTransList(int acceptfd);
 int receiveTransList(int acceptfd);
 int combineTransactionList(tlist_node_t* tArray,int size);
-char inspectTransaction(char control,unsigned int transid,char* debug,int TRANS_FLAG);
 
 void respondToLeader();
 void setLocateObjHosts();
@@ -311,12 +310,12 @@ void notifyLeaderDeadMachine(unsigned int deadHost);
 void restoreDuplicationState(unsigned int deadHost,unsigned int epoch_num);
 int* getSocketLists();
 void freeSocketLists(int*);
-int inspectEpoch(unsigned int);
+int inspectEpoch(unsigned int,const char*);
 int pingMachines(unsigned int epoch_num,int* sdlist,tlist_t**);
 int releaseNewLists(unsigned int epoch_num,int* sdlist,tlist_t*);
 int duplicateLostObjects(unsigned int epoch_num,int* sdlist);
 void restartTransactions(unsigned int epoch_num,int* sdlist);
-int makeTransactionLists(tlist_t**,int);
+int makeTransactionLists(tlist_t**,int sd,unsigned int epoch_num);
 int computeLiveHosts(int);
 void waitForAllMachine();
 int readDuplicateObjs(int);
@@ -335,6 +334,7 @@ void *dstmAccept(void *);
 
 int readClientReq(trans_commit_data_t *, int);
 int processClientReq(fixed_data_t *, trans_commit_data_t *,unsigned int *, char *, void *, unsigned int *, int);
+void commitObjects(char control,fixed_data_t* fixed,trans_commit_data_t* transinfo,void* modptr,unsigned int* oidmod,int acceptfd);
 char checkDecision(unsigned int);
 char receiveDecisionFromBackup(unsigned int,int,unsigned int*);
 char handleTransReq(fixed_data_t *, trans_commit_data_t *, unsigned int *, char *, void *, int);
@@ -378,6 +378,7 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int);
 objheader_t *transCreateObj(unsigned int); //returns oid header
 unsigned int locateBackupMachine(unsigned int oid);
 int transCommit(); //return 0 if successful
+void commitMessages(unsigned int epoch_num,int* sdlist,unsigned int deadsd,int pilecount,trans_req_data_t* tosend,char finalResponse,char treplyretry,trans_commit_data_t transinfo);
 void *transRequest(void *);     //the C routine that the thread will execute when TRANS_REQUEST begins
 char decideResponse(char *, char *,  int); // Coordinator decides what response to send to the participant
 void *getRemoteObj(unsigned int, unsigned int); // returns object header from main object store after object is copied into it from remote machine
index 7c7fb5f7395a9d06ad4dc264a2adbd4e338f67c7..b28d3140fbc4738d338e9fe8363c905a2f008a9c 100644 (file)
@@ -38,6 +38,7 @@ extern int *liveHosts;
 extern int numLiveHostsInSystem;
 int clearNotifyListFlag;
 pthread_mutex_t clearNotifyList_mutex;
+pthread_mutex_t translist_mutex;
 
 tlist_t* transList;
 int okCommit; // machine flag
@@ -80,6 +81,7 @@ int dstmInit(void) {
        pthread_mutex_init(&liveHosts_mutex, NULL);
        pthread_mutex_init(&recovery_mutex, NULL);
   pthread_mutex_init(&clearNotifyList_mutex,NULL);
+  pthread_mutex_init(&translist_mutex,NULL);
 #endif
 
   if (mhashCreate(MHASH_SIZE, MLOADFACTOR))
@@ -315,6 +317,7 @@ void *dstmAccept(void *acceptfd) {
        while(1) {
                int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char));
                //int ret=recv_data_errorcode_buf((int)acceptfd, &readbuffer, &control, sizeof(char));
+//    printf("%s -> Received control = %d\n",__func__,control);
     dupeptr = NULL;
 
                if (ret==0)
@@ -514,28 +517,31 @@ void *dstmAccept(void *acceptfd) {
 #endif
 #ifdef RECOVERY
       case REQUEST_TRANS_WAIT:
-        { 
+        {
           unsigned int new_leader_index;
           recv_data((int)acceptfd,&epoch_num,sizeof(unsigned int));
           recv_data((int)acceptfd,&new_leader_index,sizeof(unsigned int));
 
-          if(inspectEpoch(epoch_num) < 0) {
+          if(inspectEpoch(epoch_num,"REQUEST_TRANS_WAIT") < 0) {
             response = RESPOND_HIGHER_EPOCH;
             send_data((int)acceptfd,&response,sizeof(char));
           }
           else {
             printf("Got new Leader! : %d\n",epoch_num);
-
-            stopTransactions(TRANS_BEFORE);
-
             pthread_mutex_lock(&recovery_mutex);
             currentEpoch = epoch_num;
+            okCommit = TRANS_BEFORE;
             leader_index = new_leader_index;
             pthread_mutex_unlock(&recovery_mutex);
-            
-            response = RESPOND_TRANS_WAIT;
-            send_data((int)acceptfd,&response,sizeof(char));
-            sendMyList((int)acceptfd);
+            if(stopTransactions(TRANS_BEFORE,epoch_num) < 0) {
+              response = RESPOND_HIGHER_EPOCH;
+              send_data((int)acceptfd,&response,sizeof(char));
+            }
+            else {
+              response = RESPOND_TRANS_WAIT;
+              send_data((int)acceptfd,&response,sizeof(char));
+              sendMyList((int)acceptfd);
+            }
           }
         } 
         break;
@@ -545,14 +551,16 @@ void *dstmAccept(void *acceptfd) {
         { 
           recv_data((int)acceptfd,&epoch_num,sizeof(unsigned int));
 
-          if(inspectEpoch(epoch_num) < 0) {
+          if(inspectEpoch(epoch_num,"RELEASE_NEW_LIST") < 0) {
             response = RESPOND_HIGHER_EPOCH;
           }
           else 
           {
             response =  receiveNewList((int)acceptfd);
-            stopTransactions(TRANS_AFTER);
+            if(stopTransactions(TRANS_AFTER,epoch_num) < -1)
+              response = RESPOND_HIGHER_EPOCH;
           }
+          printf("After stop transaction\n");
           send_data((int)acceptfd,&response,sizeof(char));
         }
       break;
@@ -561,7 +569,7 @@ void *dstmAccept(void *acceptfd) {
 
         recv_data((int)acceptfd,&epoch_num,sizeof(char));
 
-        if(inspectEpoch(epoch_num) < 0) break;
+        if(inspectEpoch(epoch_num,"REQUEST_TRANS_RESTART") < 0) break;
         
         pthread_mutex_lock(&liveHosts_mutex);
         printf("RESTART!!!\n");
@@ -598,7 +606,7 @@ void *dstmAccept(void *acceptfd) {
 
          recv_data((int)acceptfd,&epoch_num,sizeof(unsigned int));
 
-         if(inspectEpoch(epoch_num) < 0) {
+         if(inspectEpoch(epoch_num,"REQUEST_DUPLICATE") < 0) {
            break;
          }
                                
@@ -731,10 +739,8 @@ int readDuplicateObjs(int acceptfd) {
       return -1;
     }
   
-    printf("%s -> PAss this point\n",__func__);
 
                ptr = dupeptr;
-    printf("%s -> numoid = %u\n",__func__,numoid);
                for(i = 0; i < numoid; i++) {
                        header = (objheader_t *)ptr;
                        oid = OID(header);
@@ -824,10 +830,6 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
   fixed.control = TRANS_REQUEST;
   timeout = recv_data((int)acceptfd, ptr+1, size);
 
-#ifdef RECOVERY
-  transList = tlistInsertNode(transList,fixed.transid,TRYING_TO_COMMIT,TRANS_OK);
-#endif
-
   /* Read list of mids */
   int mcount = fixed.mcount;
   size = mcount * sizeof(unsigned int);
@@ -915,109 +917,155 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
   char control, sendctrl, retval;
   
   objheader_t *tmp_header;
-  void *header;
-  int i = 0, val;
+  int i = 0;
+  unsigned int epoch_num;
+  tlist_node_t* tNode;
+#ifdef DEBUG
+  printf("%s -> Enter\n",__func__);
+#endif
 
+//  printf("%s -> transID : %u\n",__func__,fixed->transid);
+  if(inspectEpoch(fixed->epoch_num,"procesClient1") < 0) {
+    printf("%s-> Higher Epoch current epoch = %u epoch %u\n",__func__,currentEpoch,fixed->epoch_num);
+    control = RESPOND_HIGHER_EPOCH;
+    send_data((int)acceptfd,&control,sizeof(char));
+  }
   /* Send reply to the Coordinator */
-  if((retval = handleTransReq(fixed, transinfo, listmid, objread, modptr,acceptfd)) == 0 ) {
+  else if((retval = handleTransReq(fixed, transinfo, listmid, objread, modptr,acceptfd)) == 0 ) {
     printf("Error: In handleTransReq() %s, %d\n", __FILE__, __LINE__);
          printf("DEBUG-> Exiting processClientReq, line = %d\n", __LINE__);
     return 1;
   }
 
 //  printf("%s -> Waiting for transID : %u\n",__func__,fixed->transid);
+       int timeout1 = recv_data((int)acceptfd, &control, sizeof(char));
+  int timeout2 = recv_data((int)acceptfd, &epoch_num, sizeof(unsigned int));
 
-       int timeout = recv_data((int)acceptfd, &control, sizeof(char));
-
-#ifdef RECOVERY
-  if(timeout < 0) {  // timeout. failed to receiving data from coordinator
-    control = -1;
+  if(timeout1 < 0 || timeout2 < 0) {  // timeout. failed to receiving data from coordinator
+    control = DECISION_LOST;
   }
+  
+  pthread_mutex_lock(&translist_mutex);
+  transList = tlistInsertNode(transList,fixed->transid,control,TRYING_TO_COMMIT,epoch_num);
+  pthread_mutex_unlock(&translist_mutex);
+
+  pthread_mutex_lock(&translist_mutex);
+  tNode = tlistSearch(transList,fixed->transid);  
+  pthread_mutex_unlock(&translist_mutex);         
+  
   // check if it is allowed to commit
-  control = inspectTransaction(control,fixed->transid,"processClientReq",TRANS_BEFORE);
-  thashInsert(fixed->transid, control);
+  do {
+    tNode->status = TRANS_INPROGRESS; 
+    if(okCommit != TRANS_BEFORE) {
+      if(inspectEpoch(tNode->epoch_num,"processCleint2") > 0) {
+        tNode->status = TRANS_INPROGRESS;
+        thashInsert(fixed->transid,tNode->decision);
+        commitObjects(tNode->decision,fixed,transinfo,modptr,oidmod,acceptfd);
+        tNode->status = TRANS_AFTER;
+      }
+      if(okCommit == TRANS_AFTER) {
+      printf("%s -> 11 \ttransID : %u decision : %d status : %d \n",__func__,tNode->transid,tNode->decision,tNode->status);
+      sleep(3);     
+      }
+    }
+    else {
+      tNode->status = TRYING_TO_COMMIT;
+      printf("%s -> Waiting!! \ttransID : %u decision : %d status : %d \n",__func__,tNode->transid,tNode->decision,tNode->status);
+      sleep(3);
+      randomdelay();
+    }
+    
+  }while(tNode->status != TRANS_AFTER);
+
+  if(okCommit == TRANS_AFTER)
+  {
+    printf("%s -> TRANS_AFTER!! \ttransID : %u decision : %d status : %d \n",__func__,tNode->transid,tNode->decision,tNode->status);
+    printf("%s -> Before removing\n",__func__);
+  }
+
+
+  pthread_mutex_lock(&translist_mutex);
+  transList = tlistRemove(transList,fixed->transid);
+  pthread_mutex_unlock(&translist_mutex);
+
+  if(okCommit == TRANS_AFTER)
+    printf("%s -> After removing\n",__func__);
+
 
+  /* Free memory */
+  if (transinfo->objlocked != NULL) {
+    free(transinfo->objlocked);
+  }
+  if (transinfo->objnotfound != NULL) {
+    free(transinfo->objnotfound);
+  }
+#ifdef DEBUG
+       printf("%s-> Exit\n", __func__);
 #endif
 
+  return 0;
+}
+
+void commitObjects(char control,fixed_data_t* fixed,trans_commit_data_t* transinfo,void* modptr,unsigned int* oidmod,int acceptfd)
+{
+  void *header;
+  int val;
+  int i;
+
   switch(control) {
-               case TRANS_ABORT:
-                       if (fixed->nummod > 0)
-                               free(modptr);
-                       /* Unlock objects that was locked due to this transaction */
-                       int useWriteUnlock = 0;
+    case TRANS_ABORT:
+      if (fixed->nummod > 0)       
+        free(modptr);
+      /* Unlock objects that was locked due to this transaction */
+      int useWriteUnlock = 0;
                        for(i = 0; i< transinfo->numlocked; i++) {
-                               if(transinfo->objlocked[i] == -1) {
-                                       useWriteUnlock = 1;
-                                       continue;
+                         if(transinfo->objlocked[i] == -1) {
+                                 useWriteUnlock = 1;
+                                 continue;
                                }
                                if((header = mhashSearch(transinfo->objlocked[i])) == NULL) {
-                                       printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); // find the header address
-                                       printf("%s-> Exiting, line:%d\n", __func__, __LINE__);
-                                       return 1;
+                           printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); // find the header address
+                                 printf("%s-> Exiting, line:%d\n", __func__, __LINE__);
+                           exit(0);
+          return ;
                                }
                                if(useWriteUnlock) {
-                                       write_unlock(STATUSPTR(header));
+                                 write_unlock(STATUSPTR(header));
                                } else {
-                                       read_unlock(STATUSPTR(header));
-                               }
+                                 read_unlock(STATUSPTR(header));
+                         }
                        }
                        break;
-
-               case TRANS_COMMIT:
+      case TRANS_COMMIT:
       /* insert received control into thash for another transaction*/
                        /* Invoke the transCommit process() */
-                       if((val = transCommitProcess(modptr, oidmod, transinfo->objlocked, fixed->nummod, transinfo->numlocked, (int)acceptfd)) != 0) {
-                               printf("Error: In transCommitProcess() %s, %d\n", __FILE__, __LINE__);
+                 if((val = transCommitProcess(modptr, oidmod, transinfo->objlocked, fixed->nummod, transinfo->numlocked, (int)acceptfd)) != 0) {
+                         printf("Error: In transCommitProcess() %s, %d\n", __FILE__, __LINE__);
                                /* Free memory */
                                if (transinfo->objlocked != NULL) {
-                                       free(transinfo->objlocked);
+                                 free(transinfo->objlocked);
                                }
                                if (transinfo->objnotfound != NULL) {
-                                       free(transinfo->objnotfound);
-                               }
-                               printf("%s-> Exiting, line:%d\n", __func__, __LINE__);
-                               return 1;
+                                 free(transinfo->objnotfound);
+             }
+                         printf("%s-> Exiting, line:%d\n", __func__, __LINE__);
+        exit(0);
+        return;
                        }
-
-
-                       break;
-
-               case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
-                       break;
-
-               default:
-                       printf("Error: No response to TRANS_AGREE OR DISAGREE protocol %s, %d\n", __FILE__, __LINE__);
-                       //TODO Use fixed.trans_id  TID since Client may have died
-                       break;
-       }
-
-#ifdef RECOVERY
-  inspectTransaction(control,fixed->transid,"processClientReq",TRANS_AFTER);
-
-  tlist_node_t* tNode = tlistSearch(transList,fixed->transid);
-  tNode->status = TRANS_OK;
-
-  pthread_mutex_lock(&clearNotifyList_mutex);
-  transList = tlistRemove(transList,fixed->transid);
-  pthread_mutex_unlock(&clearNotifyList_mutex);
-
-#endif
-
-  /* Free memory */
-  if (transinfo->objlocked != NULL) {
-    free(transinfo->objlocked);
-  }
-  if (transinfo->objnotfound != NULL) {
-    free(transinfo->objnotfound);
+      break;
+    case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
+      break;
+    default:
+      printf("%s : No response to TRANS_AGREE OR DISAGREE protocol - transID = %u, control =  %d\a\n",__func__,fixed->transid);
+      //TODO Use fixed.trans_id  TID since Client may have died
+                       break;                                                                                                                        
   }
-#ifdef DEBUG
-       printf("%s-> Exiting, line:%d\n", __func__, __LINE__);
-#endif
-
-  return 0;
-}
-
-#ifdef RECOVERY
+  
+  return;
+} 
+            
+            
+           
 char checkDecision(unsigned int transID) 
 {
 #ifdef DEBUG
@@ -1031,7 +1079,6 @@ char checkDecision(unsigned int transID)
   else
     return response;
 }
-#endif
 
 /* This function increments counters while running a voting decision on all objects involved
  * in TRANS_REQUEST and If a TRANS_DISAGREE sends the response immediately back to the coordinator */
@@ -1042,6 +1089,9 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
   unsigned int oid;
   unsigned int *oidnotfound, *oidlocked, *oidvernotmatch;
   objheader_t *headptr;
+#ifdef DEBUG
+  printf("%s -> Enter\n",__func__);
+#endif
 
   /* Counters and arrays to formulate decision on control message to be sent */
   oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
@@ -1135,27 +1185,6 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
                        offset += size;
                }
 #endif
-        /*
-               if (objlocked > 0) {
-                       int useWriteUnlock = 0;
-                       for(j = 0; j < objlocked; j++) {
-                               if(oidlocked[j] == -1) {
-                                       useWriteUnlock = 1;
-                                       continue;
-                               }
-                               if((headptr = mhashSearch(oidlocked[j])) == NULL) {
-                                       printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
-                                       return 0;
-                               }
-                               if(useWriteUnlock) {
-                                       write_unlock(STATUSPTR(headptr));
-                               } else {
-                                       read_unlock(STATUSPTR(headptr));
-                               }
-                       }
-                       free(oidlocked);
-               }
-        */
     
 #ifdef DEBUG
                printf("%s -> control = %d, file = %s, line = %d\n", __func__,(int)control, __FILE__, __LINE__);
@@ -1165,6 +1194,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
       printf("control = %d\n",control);
     control=TRANS_DISAGREE;
 
+    printf("%s -> Sent message!\n",__func__);
                send_data(acceptfd, &control, sizeof(char));
 #ifdef CACHE
                send_data(acceptfd, &numBytes, sizeof(int));
@@ -1183,6 +1213,9 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
     printf("Error: In decideCtrlMessage() %s, %d\n", __FILE__, __LINE__);
     return 0;
   }
+#ifdef DEBUG
+  printf("%s -> Exit\n",__func__);
+#endif
   return control;
 }
 
@@ -1414,6 +1447,9 @@ char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int
                        unsigned int *oidnotfound, unsigned int *oidlocked, int acceptfd) {
   int val;
   char control = 0;
+#ifdef DEBUG
+  printf("%s -> Enter\n",__func__);
+#endif
 
   /* Condition to send TRANS_AGREE */
   if(*(v_matchnolock) == fixed->numread + fixed->nummod) {
@@ -1436,16 +1472,6 @@ char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int
 #endif
     /* Send control message */
     send_data(acceptfd, &control, sizeof(char));
-
-    
-    /*  FIXME how to send objs Send number of oids not found and the missing oids if objects are missing in the machine */
-    /*if(*(objnotfound) != 0) {
-      int msg[1];
-      msg[0] = *(objnotfound);
-      send_data(acceptfd, &msg, sizeof(int));
-      int size = sizeof(unsigned int)* *(objnotfound);
-      send_data(acceptfd, oidnotfound, size);
-    }*/
   }
 
   /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
@@ -1455,6 +1481,10 @@ char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int
   transinfo->modptr = modptr;
   transinfo->numlocked = *(objlocked);
   transinfo->numnotfound = *(objnotfound);
+
+#ifdef DEBUG
+  printf("%s -> Exit\n",__func__);
+#endif
   return control;
 }
 
@@ -1816,37 +1846,62 @@ void receiveNewHostLists(int acceptfd)
 }
 
 /* wait until all transaction waits for leader's decision */
-void stopTransactions(int TRANS_FLAG)
+int stopTransactions(int TRANS_FLAG,unsigned int epoch_num)
 {
 //  printf("%s - > Enter flag :%d\n",__func__,TRANS_FLAG);
   int size = transList->size;
   int i;
+  int flag;
   tlist_node_t* walker;
-  
-  pthread_mutex_lock(&liveHosts_mutex);
-  okCommit = TRANS_FLAG;
-  pthread_mutex_unlock(&liveHosts_mutex);
 
-  /* make sure that all transactions are stopped */
-  pthread_mutex_lock(&clearNotifyList_mutex);
+  if(TRANS_FLAG == TRANS_BEFORE) {
+    okCommit = TRANS_BEFORE;
+    /* make sure that all transactions are stopped */
+    do {
+      transList->flag = 0;
+      walker = transList->head;
 
-  do {
-    transList->flag = 0;
-    walker = transList->head;
+      while(walker)
+      {
+        // locking
+        while(walker->status == TRANS_INPROGRESS) {
+          printf("%s ->transid : %u - decision %d Status : %d Waitflag = %d\n",__func__,walker->transid,walker->decision,walker->status,TRANS_FLAG);
+          if(inspectEpoch(epoch_num,"stopTrans_Before") < 0)
+            return -1;                                                
+          sleep(3);
+        }
+      walker = walker->next;
+      }
 
-    while(walker)
-    {
-      // locking
-      while(!(walker->status == TRANS_FLAG || walker->status == TRANS_OK)) {
-//        printf("%s -> Waiting for %u - Status : %d tHash : %d\n",__func__,walker->transid,walker->status,thashSearch(walker->transid));
-        randomdelay();
+      pthread_mutex_lock(&translist_mutex);
+      flag = transList->flag;
+      pthread_mutex_unlock(&translist_mutex);
+    }while(flag == 1);
+  }
+  else if(TRANS_FLAG == TRANS_AFTER)
+  {
+    printf("%s -> TRANS_AFTER\n",__func__);
+    okCommit = TRANS_AFTER;
+    do {
+      pthread_mutex_lock(&translist_mutex);
+      size = transList->size;
+      printf("%s -> size = %d\n",__func__,size);
+      printf("%s -> okCommit = %d\n",__func__,okCommit);
+      walker = transList->head;
+      while(walker){
+        printf("%s ->transid : %u - decision %d Status : %d epoch = %u  current epoch : %u\n",__func__,walker->transid,walker->decision,walker->status,walker->epoch_num,currentEpoch);
+        walker = walker->next;
       }
+      pthread_mutex_unlock(&translist_mutex);
 
-      walker = walker->next;
-    }
-  }while(transList->flag == 1);
+      if(inspectEpoch(epoch_num,"stopTrans_Before") < 0)
+        return -1;
 
-  pthread_mutex_unlock(&clearNotifyList_mutex);
+      sleep(3);
+    }while(size != 0);
+  }
+
+  return 0;
 }
 
 void sendMyList(int acceptfd)
@@ -1860,18 +1915,28 @@ void sendMyList(int acceptfd)
 
 void sendTransList(int acceptfd)
 {
+  printf("%s -> Enter\n",__func__);
   int size;
   char response;
   int transid;
+  int i;
+  tlist_node_t* walker = transList->head;
 
   // send on-going transaction
+  pthread_mutex_lock(&translist_mutex);
   tlist_node_t* transArray = tlistToArray(transList,&size);
+  pthread_mutex_unlock(&translist_mutex);
 
-/*  if(transList->size != 0)
+  if(transList->size != 0)
     tlistPrint(transList);
 
   printf("%s -> transList->size : %d  size = %d\n",__func__,transList->size,size);
-*/
+
+  for(i = 0; i< size; i++) {
+    printf("ID : %u  Decision : %d  status : %d\n",transArray[i].transid,transArray[i].decision,transArray[i].status);
+  }
+  printf("%s -> End transArray\n",__func__);
+
   send_data((int)acceptfd,&size,sizeof(int));
   send_data((int)acceptfd,transArray, sizeof(tlist_node_t) * size);
 
@@ -1894,6 +1959,7 @@ void sendTransList(int acceptfd)
 
 int receiveNewList(int acceptfd)
 {
+  printf("%s -> Enter\n",__func__);
   int size;
   tlist_node_t* tArray;
   tlist_node_t* walker;
@@ -1933,6 +1999,7 @@ int receiveNewList(int acceptfd)
     response = -1;
   }
 
+  printf("%s -> Exit\n",__func__);
   return response;
 }
 
@@ -1951,7 +2018,7 @@ int combineTransactionList(tlist_node_t* tArray,int size)
         if(walker->transid == tArray[i].transid)
         {
           walker->decision = tArray[i].decision;
-//          walker->status = tArray[i].status;
+          walker->epoch_num = tArray[i].epoch_num;
           break;
         }
       }
@@ -1961,37 +2028,4 @@ int combineTransactionList(tlist_node_t* tArray,int size)
   return flag;
 }
 
-char inspectTransaction(char finalResponse,unsigned int transid,char* debug,int TRANS_FLAG)
-{
-  tlist_node_t* tNode;
-
-  tNode = tlistSearch(transList,transid);
-  
-  if(finalResponse <= 0) {
-    tNode->decision = DECISION_LOST;
-  }
-  else {
-    tNode->decision = finalResponse;
-  }
-
-//  printf("%s -> decision = %d okCommit = %d\n",__func__,tNode->decision,okCommit);
-
-  if((tNode->decision == DECISION_LOST) || (okCommit != TRANS_OK))
-  {
-    pthread_mutex_lock(&liveHosts_mutex);
-    tNode->status = TRANS_FLAG;
-    pthread_mutex_unlock(&liveHosts_mutex);
-
-    // if decision is not lost and okCommit is not TRANS_FLAG, get out of this loop
-    while(!((tNode->decision != DECISION_LOST) && (okCommit != TRANS_FLAG))) { 
-//      printf("%s -> transID : %u decision : %d is waiting flag : %d\n",debug,tNode->transid,tNode->decision,TRANS_FLAG);
-      randomdelay();
-    }
-
-    finalResponse = tNode->decision;
-  }
-
-  return finalResponse;
-}
-  
 #endif
index 6a8b12d42a2bf6d82ac9d6ae6d55ce735de4cc13..1d8ad41d343391e3afd3e67410e7905b9798ecbe 100644 (file)
@@ -111,6 +111,7 @@ unsigned int transIDMax;
 char ip[16];      // for debugging purpose
 
 extern tlist_t* transList;
+extern pthread_mutex_t translist_mutex;
 extern pthread_mutex_t clearNotifyList_mutex;
 
 unsigned int currentEpoch;
@@ -240,6 +241,7 @@ GDBRECV1:
     numbytes = recv(fd, buffer, size, 0);
     bytesRecv += numbytes;
     
+    
     if (numbytes>0) {
       buffer += numbytes;
       size -= numbytes;
@@ -275,6 +277,8 @@ GDBRECV1:
        return -2;
       }
     } else {
+//      printf("%s -> Here?\n",__func__);
+//      printf("%s-> errno = %d %s\n", __func__, errno, strerror(errno));
       //Case: numbytes==0
       //machine has failed -- this case probably doesn't occur in reality
       //
@@ -1202,6 +1206,8 @@ int transCommit() {
   int deadsd = -1;
   int deadmid = -1;
   unsigned int transID = getNewTransID();
+  unsigned int epoch_num;
+  tlist_node_t* tNode;
 #endif
 
 #ifdef DEBUG
@@ -1217,7 +1223,7 @@ int transCommit() {
     removetransactionhash();
     objstrDelete(t_cache);
     t_chashDelete();
-#ifdef DEBUG
+#ifndef DEBUG
          printf("%s-> End, line:%d\n\n", __func__, __LINE__);
 #endif
     return 1;
@@ -1230,10 +1236,8 @@ int transCommit() {
  //   sleep(1);
     randomdelay();
   }
-
-  transList = tlistInsertNode(transList,transID,TRYING_TO_COMMIT,TRANS_OK);
 #endif
-
+    
   int treplyretryCount = 0;
   /* Initialize timeout for exponential delay */
   exponential_backoff.tv_sec = 0;
@@ -1242,6 +1246,10 @@ int transCommit() {
   do {
     treplyretry = 0;
 
+    pthread_mutex_lock(&recovery_mutex);
+    epoch_num = currentEpoch;
+    pthread_mutex_unlock(&recovery_mutex);
+
     /* Look through all the objects in the transaction record and make piles
      * for each machine involved in the transaction*/
     if (firsttime) {
@@ -1263,6 +1271,7 @@ int transCommit() {
 
     /* Create a socket and getReplyCtrl array, initialize */
     int socklist[pilecount];
+    unsigned int midlist[pilecount];
     char getReplyCtrl[pilecount];
     int loopcount;
     for(loopcount = 0 ; loopcount < pilecount; loopcount++) {
@@ -1276,6 +1285,8 @@ int transCommit() {
     trans_req_data_t *tosend;
     tosend = calloc(pilecount, sizeof(trans_req_data_t));
 
+//    printf("%s -> transID : %u Start!\n",__func__,transID);
+
     while(pile != NULL) {
 #ifdef DEBUG
                        printf("%s-> New pile:[%s],", __func__, midtoIPString(pile->mid));
@@ -1288,20 +1299,22 @@ int transCommit() {
                        tosend[sockindex].f.nummod = pile->nummod;
                        tosend[sockindex].f.numcreated = pile->numcreated;
                        tosend[sockindex].f.sum_bytes = pile->sum_bytes;
-      tosend[sockindex].f.epoch_num = currentEpoch;
+      tosend[sockindex].f.epoch_num = epoch_num;
                        tosend[sockindex].listmid = listmid;
                        tosend[sockindex].objread = pile->objread;
                        tosend[sockindex].oidmod = pile->oidmod;
                        tosend[sockindex].oidcreated = pile->oidcreated;
 
 
-            int sd = 0;
+      midlist[sockindex] = pile->mid; // debugging purpose
+
+      int sd = 0;
                        if(pile->mid != myIpAddr) {
                                if((sd = getSockWithLock(transRequestSockPool, pile->mid)) < 0) {
                                        printf("\ntransRequest(): socket create error\n");
                                        free(listmid);
                                        free(tosend);
-#ifdef DEBUG
+#ifndef DEBUG
                                        printf("%s-> End, line:%d\n\n", __func__, __LINE__);
 #endif
                                        return 1;
@@ -1353,6 +1366,7 @@ int transCommit() {
                                send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
                 //forcesend_buf(sd, &writebuffer, modptr, tosend[sockindex].f.sum_bytes);
 
+
                                free(modptr);
                        } else { //handle request locally
         handleLocalReq(&tosend[sockindex], &transinfo, &getReplyCtrl[sockindex]);
@@ -1363,7 +1377,7 @@ int transCommit() {
    
                /* Recv Ctrl msgs from all machines */
 #ifdef DEBUG
-               printf("%s-> Finished sending transaction read/mod objects\n",__func__);
+               printf("%s-> Finished sending transaction read/mod objects transID = %u\n",__func__,transID);
 #endif
                int i;
 
@@ -1372,11 +1386,10 @@ int transCommit() {
                        if(sd != 0) {
                                char control;
         int timeout;            // a variable to check if the connection is still alive. if it is -1, then need to transcommit again
+//        printf("%s -> Waiting for mid : %s transID = %u\n",__func__,midtoIPString(midlist[i]),transID);
         timeout = recv_data(sd, &control, sizeof(char));
 
-//        printf("i = %d control = %d\n",i,control);
-        
-
+//        printf("%s -> Received mid : %s control %d timeout = %d\n",__func__,midtoIPString(midlist[i]),control,timeout);
                                //Update common data structure with new ctrl msg
                                getReplyCtrl[i] = control;
                                /* Recv Objects if participant sends TRANS_DISAGREE */
@@ -1406,13 +1419,14 @@ int transCommit() {
                                                GETSIZE(size, header);
                                                size += sizeof(objheader_t);
                                                //make an entry in prefetch hash table
-                        prehashInsert(oidToPrefetch, header);
+            prehashInsert(oidToPrefetch, header);
                                                length = length - size;
                                                offset += size;
                                        }
                                } //end of receiving objs
 #endif
-
+        
+//        printf("%s -> Pass this point2\n",__func__);
 #ifdef RECOVERY
         if(timeout < 0) {
           deadmid = listmid[i];
@@ -1432,12 +1446,13 @@ int transCommit() {
 
 #ifdef RECOVERY
 // wait until leader fix the system
-    if(okCommit != TRANS_OK) {
-      inspectTransaction(finalResponse,transID,"transCommit before response",TRANS_BEFORE);
+    if(finalResponse == RESPOND_HIGHER_EPOCH) {
+      printf("%s -> Received Higher epoch\n",__func__);
       finalResponse = TRANS_ABORT;
       treplyretry = 0;
     }
 #endif
+//    printf("%s -> transID = %u Passed this point\n",__func__,transID);
 
 #ifdef CACHE
     if (finalResponse == TRANS_COMMIT) {
@@ -1452,73 +1467,51 @@ int transCommit() {
     }
 #endif
 
-               /* Send responses to all machines */
-               for(i = 0; i < pilecount; i++) {
-                       int sd = socklist[i];
-#ifdef RECOVERY
-      if(sd != deadsd) {
-#endif
-                       if(sd != 0) {
-#ifdef CACHE
-                               if(finalResponse == TRANS_COMMIT) {
-                                       int retval;
-                                       /* Update prefetch cache */
-                                       if((retval = updatePrefetchCache(&(tosend[i]))) != 0) {
-                                               printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
-                                                 free(tosend);
-                                               free(listmid);
-                                               return 1;
-                                       }
-                    
-#ifdef ABORTREADERS
-                                       removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
-                                       removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
-#endif
-                                 }
-#ifdef ABORTREADERS
-                               else if (!treplyretry) {
-                                       removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
-                                       removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
-                               }
-#endif
-#endif
-          send_data(sd,&finalResponse,sizeof(char));
-      } else {
-                               /* Complete local processing */
-          finalResponse = doLocalProcess(finalResponse, &(tosend[i]), &transinfo);
-
-#ifdef ABORTREADERS
-                                 if(finalResponse == TRANS_COMMIT) {
-                                         removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
-                                       removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
-                               } else if (!treplyretry) {
-                                       removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
-                                       removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
-                                 }
-#endif
-                       }
-#ifdef RECOVERY
-      } 
-#endif
+  if(finalResponse == TRANS_COMMIT) {
+    pthread_mutex_lock(&translist_mutex);
+    transList = tlistInsertNode(transList,transID,TRYING_TO_COMMIT,TRYING_TO_COMMIT,epoch_num);
+    tNode = tlistSearch(transList,transID);
+    pthread_mutex_unlock(&translist_mutex);
+    
+    tNode->decision = finalResponse;
+    tNode->status = TRANS_INPROGRESS;
+    if(okCommit == TRANS_OK && inspectEpoch(epoch_num,"TRANS_COMMIT") > 0)
+    {
+      finalResponse = tNode->decision;
+      thashInsert(transID,tNode->decision);
+      commitMessages(epoch_num,socklist,deadsd,pilecount,tosend,tNode->decision,treplyretry,transinfo);
+      tNode->status = TRANS_AFTER;
+    }
+    else { 
+      tNode->status = TRYING_TO_COMMIT;
+      if(inspectEpoch(epoch_num,"TRANS_COMMIT2") > 0) {
+//        treplyretry = 1; 
+      }
+      finalResponse = TRANS_ABORT;
+      commitMessages(epoch_num,socklist,deadsd,pilecount,tosend,finalResponse,treplyretry,transinfo);
     }
-       
-   for(i = 0; i< pilecount; i++) {
+
+    //===========  after transaction point
+    pthread_mutex_lock(&translist_mutex);
+    transList = tlistRemove(transList,transID);
+    pthread_mutex_unlock(&translist_mutex);
+  }
+  else {
+    commitMessages(epoch_num,socklist,deadsd,pilecount,tosend,finalResponse,treplyretry,transinfo);
+  }
+
+  for(i = 0; i< pilecount; i++) {
      if(socklist[i] > 0) {
        freeSockWithLock(transRequestSockPool,listmid[i], socklist[i]);
      }
    }
-
-                       /* Free resources */
-    free(tosend);
-    free(listmid);
-    if (!treplyretry)
-      pDelete(pile_ptr);
-    /* wait a random amount of time before retrying to commit transaction*/
-    if(treplyretry) {
-      //treplyretryCount++;
-      //if(treplyretryCount >= NUM_TRY_TO_COMMIT)
-      //  exponentialdelay();
-      //else
+               /* Free resources */
+  free(tosend);
+  free(listmid);
+  if (!treplyretry)
+    pDelete(pile_ptr);
+  /* wait a random amount of time before retrying to commit transaction*/
+  if(treplyretry) {
       randomdelay();
 #ifdef TRANSSTATS
                        nSoftAbort++;
@@ -1527,16 +1520,10 @@ int transCommit() {
        } while (treplyretry && deadmid != -1);
 
 #ifdef RECOVERY
-  //===========  after transaction point
-  tlist_node_t* tNode = tlistSearch(transList,transID);
-  inspectTransaction(finalResponse,transID,"Coordinator",TRANS_AFTER);
 
-  tNode->status = TRANS_OK;
-  finalResponse = tNode->decision;
 
-  pthread_mutex_lock(&clearNotifyList_mutex);
-  transList = tlistRemove(transList,transID);
-  pthread_mutex_unlock(&clearNotifyList_mutex);
+
+
 #endif
 
        if(finalResponse == TRANS_ABORT) {
@@ -1547,8 +1534,7 @@ int transCommit() {
     objstrDelete(t_cache);
     t_chashDelete();
 #ifdef RECOVERY
-    if(deadmid != -1) { /* if deadmid is greater than or equal to 0, 
-                          then there is dead machine. */
+    if(deadmid != -1) { /* if deadmid is greater than or equal to 0,                           then there is dead machine. */
       notifyLeaderDeadMachine(deadmid);
     }
 #endif
@@ -1572,6 +1558,59 @@ int transCommit() {
   return 0;
 }
 
+void commitMessages(unsigned int epoch_num,int* socklist,unsigned int deadsd,int pilecount,trans_req_data_t* tosend,char finalResponse,char treplyretry,trans_commit_data_t transinfo ) {
+  int i;
+  /* Send responses to all machines */
+       for(i = 0; i < pilecount; i++) {
+         int sd = socklist[i];
+#ifdef RECOVERY
+    if(sd != deadsd) {
+#endif
+    if(sd != 0) {
+#ifdef CACHE
+           if(finalResponse == TRANS_COMMIT) {
+                   int retval;
+                         /* Update prefetch cache */
+                         if((retval = updatePrefetchCache(&(tosend[i]))) != 0) {
+                           printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+//                               free(tosend);
+  //                     free(listmid);
+          exit(0);
+//                       return 1;
+                         }
+#ifdef ABORTREADERS
+                       removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
+                         removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
+#endif
+      }
+#ifdef ABORTREADERS
+      else if (!treplyretry) {
+             removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
+             removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
+           }
+#endif
+#endif
+      send_data(sd,&finalResponse,sizeof(char));
+      send_data(sd,&epoch_num,sizeof(unsigned int));
+     } else {
+     /* Complete local processing */
+     finalResponse = doLocalProcess(finalResponse, &(tosend[i]), &transinfo);
+#ifdef ABORTREADERS
+      if(finalResponse == TRANS_COMMIT) {
+                         removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
+                 removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
+               } else if (!treplyretry) {
+                   removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
+                         removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
+                       }
+#endif
+       }
+#ifdef RECOVERY
+    } 
+#endif    
+  }
+}
+
 /* This function handles the local objects involved in a transaction
  * commiting process.  It also makes a decision if this local machine
  * sends AGREE or DISAGREE or SOFT_ABORT to coordinator */
@@ -1632,12 +1671,6 @@ void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, cha
 }
 
 char doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
-
-#ifdef RECOVERY
-  finalResponse = inspectTransaction(finalResponse,tdata->f.transid,"Local Commit",TRANS_BEFORE);
-  thashInsert(tdata->f.transid,finalResponse);
-#endif
-
   if(finalResponse == TRANS_ABORT) {
     if(transAbortProcess(transinfo) != 0) {
       printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
@@ -1651,7 +1684,7 @@ char doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_da
       return;
     }
   } else {
-    printf("ERROR...No Decision\n");
+    printf("%s -> ERROR...No Decision transID = %u finalResponse = %d\a\n",__func__,tdata->f.transid,finalResponse);
   }
 
 
@@ -1671,6 +1704,8 @@ char doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_da
 char decideResponse(char *getReplyCtrl, char *treplyretry, int pilecount) {
   int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
                                                                   message to send */
+
+  int higher_epoch_num=0;
   for (i = 0 ; i < pilecount; i++) {
     char control;
     control = getReplyCtrl[i];
@@ -1701,9 +1736,19 @@ char decideResponse(char *getReplyCtrl, char *treplyretry, int pilecount) {
       printf("%s-> Participant sent TRANS_SOFT_ABORT, i:%d, Control: %d\n", __func__, i, (int)control);
 #endif
       break;
+    case RESPOND_HIGHER_EPOCH:
+      higher_epoch_num++;
+#ifdef DEBUG                                                                                              
+      printf("%s-> Participant sent TRANS_DISAGREE, i:%d, Control: %d\n", __func__, i, (int)control);     
+#endif 
+      break;
     }
   }
 
+  if(higher_epoch_num > 0)
+    return RESPOND_HIGHER_EPOCH;
+
+
   if(transdisagree > 0) {
     /* Send Abort */
     *treplyretry = 0;
@@ -1791,7 +1836,7 @@ void notifyLeaderDeadMachine(unsigned int deadHost) {
   unsigned int epoch_num;
 
        if(!liveHosts[findHost(deadHost)]) {  // if it is already fixed
-    printf("%s -> already fixed\n",__func__);
+//    printf("%s -> already fixed\n",__func__);
                sleep(WAIT_TIME);
                return;
        }
@@ -1807,6 +1852,7 @@ void notifyLeaderDeadMachine(unsigned int deadHost) {
   // increase epoch number by number machines in the system
   pthread_mutex_lock(&recovery_mutex);
   epoch_num = currentEpoch = INCREASE_EPOCH(currentEpoch,numHostsInSystem,myIndexInHostArray);
+  okCommit = TRANS_BEFORE;
   pthread_mutex_unlock(&recovery_mutex);
 
   // notify all machines that this machien will act as leader.
@@ -1817,13 +1863,12 @@ void notifyLeaderDeadMachine(unsigned int deadHost) {
 /* Leader's role */ 
 void restoreDuplicationState(unsigned int deadHost,unsigned int epoch_num)
 {
-  printf("%s -> Entering\n",__func__);
   int* sdlist;
   tlist_t* tList;
   int flag = 0;
 
 #ifdef RECOVERYSTATS
-  printf("Recovery Start\n");
+//  printf("Recovery Start\n");
   long long st;
   long long fi;
   unsigned int dupeSize = 0;  // to calculate the size of backed up data
@@ -1836,12 +1881,17 @@ void restoreDuplicationState(unsigned int deadHost,unsigned int epoch_num)
     do {
       sdlist = getSocketLists();
   
-      printf("%s -> I'm currently leader num : %d\n\n",__func__,epoch_num);
+      printf("%s -> I'm currently leader num : %d ping machines\n\n",__func__,epoch_num);
       if((flag = pingMachines(epoch_num,sdlist,&tList)) < 0) break;
 
-      printf("%s -> I'm currently leader num : %d\n\n",__func__,epoch_num);
+      pthread_mutex_lock(&translist_mutex);
+      tlistPrint(tList);
+      pthread_mutex_unlock(&translist_mutex);
+      getchar();
+      printf("%s -> I'm currently leader num : %d releaseing new lists\n\n",__func__,epoch_num);
       if((flag = releaseNewLists(epoch_num,sdlist,tList)) < 0) break;
-    
+      getchar();
+      printf("%s -> I'm currently leader num : %d duplicate objects\n\n",__func__,epoch_num);
       // transfer lost objects
       if((flag= duplicateLostObjects(epoch_num,sdlist)) < 0) break;
 
@@ -1877,7 +1927,6 @@ void restoreDuplicationState(unsigned int deadHost,unsigned int epoch_num)
   printRecoveryStat();
 #endif
   }
-  printf("%s -> Exiting\n",__func__);
 }
 
 int* getSocketLists()
@@ -1957,7 +2006,7 @@ int pingMachines(unsigned int epoch_num,int* sdlist,tlist_t** tList)
   char response;
   tlist_t* currentTransactionList;
    
-  if(inspectEpoch(epoch_num) < 0) {
+  if(inspectEpoch(epoch_num,__func__) < 0) {
     printf("%s -> Higher Epoch\n",__func__);
     return -1;
   }
@@ -1970,47 +2019,72 @@ int pingMachines(unsigned int epoch_num,int* sdlist,tlist_t** tList)
     if(sdlist[i] == -1 || hostIpAddrs[i] == myIpAddr)
       continue;
   
+    printf("%s -> sending request_trans_wait to %s\n",__func__,midtoIPString(hostIpAddrs[i]));
     request = REQUEST_TRANS_WAIT;
     send_data(sdlist[i],&request, sizeof(char));
     send_data(sdlist[i],&epoch_num,sizeof(unsigned int));
     send_data(sdlist[i],&myIndexInHostArray,sizeof(unsigned int));
   }
 
+  printf("%s -> Stop transaction\n",__func__);
   /* stop all local transactions */
-  stopTransactions(TRANS_BEFORE);
+  if(stopTransactions(TRANS_BEFORE,epoch_num) < 0)
+    return -1;
 
+  printf("After Stop transaction\n");
 
   // grab leader's transaction list first
   tlist_node_t* walker = transList->head;
-  
   while(walker) {
-    walker->status = TRANS_OK;
-    currentTransactionList = tlistInsertNode2(currentTransactionList,walker);
+    pthread_mutex_lock(&translist_mutex);
+    currentTransactionList = tlistInsertNode2(currentTransactionList,walker,epoch_num);
+    pthread_mutex_unlock(&translist_mutex);
     walker = walker->next;
   }
 
+//  printf("%s -> Local Transactions\n",__func__);
+//  tlistPrint(currentTransactionList);
+
   for(i = 0; i < numHostsInSystem; i++)
   {
     if(sdlist[i] == -1 || hostIpAddrs[i] == myIpAddr)
       continue;
 
+    printf("%s -> receving from %s\n",__func__,midtoIPString(hostIpAddrs[i]));
     if(recv_data(sdlist[i],&response,sizeof(char)) < 0)
     {
+      printf("Here\n");
+      pthread_mutex_lock(&translist_mutex);
       tlistDestroy(currentTransactionList);
+      pthread_mutex_unlock(&translist_mutex);
       return -2;
     }
 
+    printf("recevied response = %d\n",response);
     if(response == RESPOND_TRANS_WAIT) 
     {
+      printf("%s -> RESPOND_TRANS_WAIT\n",__func__);
+      int timeout1 = computeLiveHosts(sdlist[i]);
+      printf("%s -> received host list\n",__func__);
+      int timeout2 = makeTransactionLists(&currentTransactionList,sdlist[i],epoch_num);
+      printf("%s -> received transaction list\n",__func__);
       // receive live host list       // receive transaction list
-      if(computeLiveHosts(sdlist[i]) < 0 || makeTransactionLists(&currentTransactionList,sdlist[i]) < 0) {
+      if(timeout1 < 0 || timeout2 < 0) {
+        pthread_mutex_lock(&translist_mutex);
         tlistDestroy(currentTransactionList);
+        pthread_mutex_unlock(&translist_mutex);
         return -2;
       }
+      printf("\n\n\nAfter mid : %s \n",midtoIPString(hostIpAddrs[i]));
+      tlistPrint(currentTransactionList);
     }
     else if(response == RESPOND_HIGHER_EPOCH)
     {
+      printf("%s -> RESPOND_HIGHER_EPOCH\n",__func__);
+      pthread_mutex_lock(&translist_mutex);
       tlistDestroy(currentTransactionList);
+      pthread_mutex_unlock(&translist_mutex);
       return -1;
     }
     else {
@@ -2029,8 +2103,6 @@ int pingMachines(unsigned int epoch_num,int* sdlist,tlist_t** tList)
     }
     walker = walker->next;
   }
-
-  tlistPrint(currentTransactionList);
   *tList = currentTransactionList;
 
   printf("%s -> Exit\n",__func__);
@@ -2060,6 +2132,7 @@ int computeLiveHosts(int sd)
 
 int releaseNewLists(unsigned int epoch_num,int* sdlist,tlist_t* tlist)
 {
+  printf("%s -> Enter\n",__func__);
   int i;
   char response = RELEASE_NEW_LIST;
   int size;
@@ -2067,7 +2140,7 @@ int releaseNewLists(unsigned int epoch_num,int* sdlist,tlist_t* tlist)
   tlist_node_t* tArray;
   
   
-  if(inspectEpoch(epoch_num) < 0) return -1;  
+  if(inspectEpoch(epoch_num,__func__) < 0) return -1;  
   
   tArray = tlistToArray(tlist,&size);
 
@@ -2101,7 +2174,8 @@ int releaseNewLists(unsigned int epoch_num,int* sdlist,tlist_t* tlist)
         printf("%s -> problem\n",__func__);
         exit(0);
       }
-      stopTransactions(TRANS_AFTER);
+      if(stopTransactions(TRANS_AFTER,epoch_num) < 0)
+        return -1;
     }
   }
 
@@ -2135,12 +2209,13 @@ int releaseNewLists(unsigned int epoch_num,int* sdlist,tlist_t* tlist)
 
 // after this fuction
 // leader knows all the on-going transaction list and their decisions
-int makeTransactionLists(tlist_t** tlist,int sd)
+int makeTransactionLists(tlist_t** tlist,int sd,unsigned int epoch_num)
 {
   tlist_node_t* transArray;
   tlist_node_t* tmp;
   tlist_node_t* walker;
   int j;
+  int i;
   int size;
 
   // receive all on-going transaction list
@@ -2157,6 +2232,12 @@ int makeTransactionLists(tlist_t** tlist,int sd)
     return -2;
   }
 
+  printf("%s -> Received TransArray\n",__func__);
+  for(i = 0; i< size; i++) {
+    printf("ID : %u  Decision : %d  status : %d\n",transArray[i].transid,transArray[i].decision,transArray[i].status);
+  }
+  printf("%s -> End transArray\n",__func__);
+
   // add into currentTransactionList
   for(j = 0 ; j < size; j ++) {
     tmp = tlistSearch(*tlist,transArray[j].transid);
@@ -2164,7 +2245,9 @@ int makeTransactionLists(tlist_t** tlist,int sd)
     if(tmp == NULL) {
       tlist_node_t* tNode = &transArray[j];
       tNode->status = TRANS_OK;
-      *tlist = tlistInsertNode2(*tlist,&(transArray[j]));
+
+      printf("%s -> transid = %u decision = %d\n",__func__,transArray[j].transid,transArray[j].decision);
+      *tlist = tlistInsertNode2(*tlist,&(transArray[j]),epoch_num);
     }
     else {
       if(tmp->decision == DECISION_LOST && transArray[j].decision != DECISION_LOST)
@@ -2222,13 +2305,19 @@ void restartTransactions(unsigned int epoch_num,int* sdlist)
   }
 }
 
-int inspectEpoch(unsigned int epoch_num)
+int inspectEpoch(unsigned int epoch_num,const char* f)
 {
   int flag = 1;
+
+//  printf("%s -> current epoch %u epoch num = %u\n",__func__,currentEpoch,epoch_num);
   pthread_mutex_lock(&recovery_mutex);
   if(epoch_num < currentEpoch) {
     flag = -1;
-  }
+  }/*
+  else if(epoch_num > currentEpoch) {
+//    printf("%s -> current epoch %u is changed to epoch num = %u\n",f,currentEpoch,epoch_num);
+//    currentEpoch = epoch_num;
+  }*/
   pthread_mutex_unlock(&recovery_mutex);
 
   return flag;
@@ -3199,8 +3288,6 @@ int getNumLiveHostsInSystem() {
        return count;
 }
 
-// if flag = TRANS_OK, allow transactions
-//    flag = TRANS_WAIT, stop transactins
 int updateLiveHostsCommit() {
 #ifdef DEBUG      
   printf("%s -> Enter\n",__func__);
@@ -3311,7 +3398,7 @@ int duplicateLostObjects(unsigned int epoch_num,int *sdlist){
    * Backup     26      21,24
    */
 
-  if(inspectEpoch(epoch_num) < 0) return -1;
+  if(inspectEpoch(epoch_num,__func__) < 0) return -1;
 
   response = REQUEST_DUPLICATE;
 
index 446a78fb87c62cb6665e5a3fd4036676fbdca3bd..8176fd5e31b2ef398ab774be769da1de5b3f5ca1 100644 (file)
@@ -11,8 +11,6 @@ tlist_t* tlistCreate()
   transList->head = NULL;
   transList->size = 0;
 
-  pthread_mutex_init(&(transList->mutex),NULL);
-
   return transList;
 }
 
@@ -34,17 +32,16 @@ tlist_t* tlistDestroy(tlist_t* transList)
 }
 
 // tlistInsertNode extension
-tlist_t* tlistInsertNode2(tlist_t* transList,tlist_node_t* tNode) 
+tlist_t* tlistInsertNode2(tlist_t* transList,tlist_node_t* tNode,unsigned int epoch_num
 {
-  transList = tlistInsertNode(transList,tNode->transid,tNode->decision,tNode->status);
+  transList = tlistInsertNode(transList,tNode->transid,tNode->decision,tNode->status,epoch_num);
   return transList;
 }
 
 // return 0 if success, return -1 if fail
-tlist_t* tlistInsertNode(tlist_t* transList,unsigned int transid,char decision,char status) {
+tlist_t* tlistInsertNode(tlist_t* transList,unsigned int transid,char decision,char status,unsigned int epoch_num) {
 
 //  printf("%s -> ADD transID : %u decision %d  status  %d\n",__func__,transid,decision,status);
-  pthread_mutex_lock(&(transList->mutex));
   tlist_node_t* head = transList->head;
 
   if(head == NULL) {
@@ -56,13 +53,12 @@ tlist_t* tlistInsertNode(tlist_t* transList,unsigned int transid,char decision,c
     head->transid = transid;
     head->decision = decision;
     head->status = status;
+    head->epoch_num = epoch_num;
     head->next = NULL;
 
-    //pthread_mutex_lock(&(transList->mutex));
     transList->head = head;
     (transList->size)++;
     transList->flag = 1;
-    pthread_mutex_unlock(&(transList->mutex));
     return transList;
   }
   else {
@@ -77,12 +73,12 @@ tlist_t* tlistInsertNode(tlist_t* transList,unsigned int transid,char decision,c
     tmp->transid = transid;
     tmp->decision = decision;
     tmp->status = status;
+    tmp->epoch_num = epoch_num;
 
     tmp->next = transList->head;
     transList->head = tmp;
     (transList->size)++;
     transList->flag = 1;
-    pthread_mutex_unlock(&(transList->mutex));
     return transList;
   }
 }
@@ -90,7 +86,6 @@ tlist_t* tlistInsertNode(tlist_t* transList,unsigned int transid,char decision,c
 // return tlist_t if success, return null if cannot find
 tlist_node_t* tlistSearch(tlist_t* transList,unsigned int transid)
 {
-  pthread_mutex_lock(&(transList->mutex));
   tlist_node_t* ptr = transList->head;
 
   while(ptr != NULL)
@@ -100,20 +95,21 @@ tlist_node_t* tlistSearch(tlist_t* transList,unsigned int transid)
 
     ptr = ptr->next;
   }
-  pthread_mutex_unlock(&(transList->mutex));
   return ptr;
 }
 
 tlist_t* tlistRemove(tlist_t* transList,unsigned int transid)
 {
 //  printf("%s -> REMOVE transID : %u \n",__func__,transid);
-  pthread_mutex_lock(&(transList->mutex));
 
   int flag = -1;
   tlist_node_t* tmp;
   tlist_node_t* ptr = transList->head;
   tlist_node_t* prev = NULL;
 
+  if(transList->head == NULL)
+    return transList;
+
   /* if it is head */
   if(transList->head->transid == transid)
   {
@@ -123,7 +119,6 @@ tlist_t* tlistRemove(tlist_t* transList,unsigned int transid)
     (transList->size)--;
     transList->flag = 1;
 
-    pthread_mutex_unlock(&(transList->mutex));
     return transList;
   }
 
@@ -140,16 +135,12 @@ tlist_t* tlistRemove(tlist_t* transList,unsigned int transid)
       (transList->size)--;
       flag = 0;
       transList->flag = 1;
-      pthread_mutex_unlock(&(transList->mutex));
       return transList;
     }
     prev = ptr;
     ptr = ptr->next;
   }
     
-  pthread_mutex_unlock(&(transList->mutex));
-  printf("%s -> remove Fail!\n",__func__);
-
   return transList;
 }
 
@@ -171,7 +162,12 @@ tlist_node_t* tlistToArray(tlist_t* transList,int* size)
 
   while(walker)
   {
-    array[i++] = *walker;
+    array[i].transid = walker->transid;
+    array[i].decision = walker->decision;
+    array[i].status = walker->status;
+    array[i].epoch_num = walker->epoch_num;
+
+    i++;
     walker = walker->next;
   }
 
index 5c4da45141cc9e584a27610db0f456a94729f582..9364fb4ded4e1b26e9675c0ed69e89f12124b3d0 100644 (file)
@@ -12,7 +12,8 @@
 /* for machine flag */
 #define TRANS_OK     3
 #define TRANS_BEFORE 4
-#define TRANS_AFTER  5
+#define TRANS_INPROGRESS   5
+#define TRANS_AFTER  6
 
 /*
    Status
@@ -26,6 +27,7 @@ typedef struct trans_list_node {
   unsigned int transid;
   char decision;
   char status;
+  unsigned int epoch_num;
   struct trans_list_node *next;
 } tlist_node_t;
 
@@ -34,7 +36,6 @@ typedef struct trans_list
   tlist_node_t *head;
   int size;
   int flag;
-  pthread_mutex_t mutex;
 } tlist_t;
 
 // allocate tlist_t, return -1 if memory overflow
@@ -42,8 +43,8 @@ tlist_t* tlistCreate();
 tlist_t* tlistDestroy(tlist_t*);
 
 // return 0 if success, return -1 if fail
-tlist_t* tlistInsertNode(tlist_t* transList,unsigned int transid,char decision,char status);
-tlist_t* tlistInsertNode2(tlist_t* transList,tlist_node_t* tNode) ;
+tlist_t* tlistInsertNode(tlist_t* transList,unsigned int transid,char decision,char status,unsigned int epoch_num);
+tlist_t* tlistInsertNode2(tlist_t* transList,tlist_node_t* tNode,unsigned int epoch_num) ;
 
 // remove node.
 // return 0 if success, return -1 if fail