two phase recovery
authorjihoonl <jihoonl>
Wed, 7 Apr 2010 00:55:20 +0000 (00:55 +0000)
committerjihoonl <jihoonl>
Wed, 7 Apr 2010 00:55:20 +0000 (00:55 +0000)
Robust/src/Runtime/DSTM/interface_recovery/dstm.h
Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c
Robust/src/Runtime/DSTM/interface_recovery/trans.c

index 2800d8ab66e20f47062bd001892ee978ec398bd4..dbb8abfb168bb712bb566b1dd4ec2e931c8dee3f 100644 (file)
@@ -289,11 +289,11 @@ unsigned int updateLiveHosts();
 void updateLiveHostsList(int mid);
 int updateLiveHostsCommit();
 void receiveNewHostLists(int accept);
-void stopTransactions();
+void stopTransactions(int TRANS_FLAG);
 void sendTransList(int acceptfd);
 void receiveTransList(int acceptfd);
 int combineTransactionList(tlist_node_t* tArray,int size);
-char inspectTransaction(char control,unsigned int transid);
+char inspectTransaction(char control,unsigned int transid,char* debug,int TRANS_FLAG);
 
 void respondToLeader();
 void setLocateObjHosts();
index 5a93f44c064f926dd2c06cf5a848db4682866186..3b071e461b7893e55e2a47e8c11579bc66703cd4 100644 (file)
@@ -538,7 +538,7 @@ void *dstmAccept(void *acceptfd) {
 #ifdef RECOVERY
       case REQUEST_TRANS_WAIT:
         receiveNewHostLists((int)acceptfd);        
-        stopTransactions();
+        stopTransactions(TRANS_BEFORE);
 
         response = RESPOND_TRANS_WAIT;
         send_data((int)acceptfd,&response,sizeof(char));
@@ -557,6 +557,10 @@ void *dstmAccept(void *acceptfd) {
         printf("control -> REQUEST_TRANS_LIST\n");
         sendTransList((int)acceptfd);
         receiveTransList((int)acceptfd);
+
+        pthread_mutex_lock(&liveHosts_mutex);
+        okCommit = TRANS_AFTER;
+        pthread_mutex_unlock(&liveHosts_mutex);
         break;
 
       case REQUEST_TRANS_RESTART:
@@ -960,6 +964,8 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
     return 1;
   }
 
+//  printf("%s -> Waiting for transID : %u\n",__func__,fixed->transid);
+
        int timeout = recv_data((int)acceptfd, &control, sizeof(char));
 
 #ifdef RECOVERY
@@ -967,7 +973,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
     control = -1;
   }
   // check if it is allowed to commit
-  control = inspectTransaction(control,fixed->transid);
+  control = inspectTransaction(control,fixed->transid,"processClientReq",TRANS_BEFORE);
   thashInsert(fixed->transid, control);
 
 #endif
@@ -1029,11 +1035,14 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
 
   tlist_node_t* tNode = tlistSearch(transList,fixed->transid);
   tNode->status = TRANS_OK;
+  inspectTransaction(control,fixed->transid,"processClientReq",TRANS_AFTER);
 
   pthread_mutex_lock(&clearNotifyList_mutex);
   transList = tlistRemove(transList,fixed->transid);
   pthread_mutex_unlock(&clearNotifyList_mutex);
 
+  // ====================after transaction point
+
 #endif
 
   /* Free memory */
@@ -1845,15 +1854,15 @@ void receiveNewHostLists(int acceptfd)
 }
 
 /* wait until all transaction waits for leader's decision */
-void stopTransactions()
+void stopTransactions(int TRANS_FLAG)
 {
-  printf("%s - > Enter\n",__func__);
+//  printf("%s - > Enter flag :%d\n",__func__,TRANS_FLAG);
   int size = transList->size;
   int i;
   tlist_node_t* walker;
   
   pthread_mutex_lock(&liveHosts_mutex);
-  okCommit = TRANS_WAIT;
+  okCommit = TRANS_FLAG;
   pthread_mutex_unlock(&liveHosts_mutex);
   /* make sure that all transactions are stopped */
 
@@ -1866,7 +1875,7 @@ void stopTransactions()
     while(walker)
     {
       // locking
-      while(!(walker->status == TRANS_WAIT || walker->status == TRANS_OK)) {
+      while(!(walker->status == TRANS_FLAG || walker->status == TRANS_OK)) {
         printf("%s -> Waiting for %u - Status : %d tHash : %d\n",__func__,walker->transid,walker->status,thashSearch(walker->transid));
         sleep(2);
       }
@@ -1876,7 +1885,7 @@ void stopTransactions()
   }while(transList->flag == 1);
 
   pthread_mutex_unlock(&clearNotifyList_mutex);
-  printf("%s - > Exit\n",__func__);
+//  printf("%s - > Exit\n",__func__);
 }
 
 void sendTransList(int acceptfd)
@@ -1913,12 +1922,10 @@ void sendTransList(int acceptfd)
   }
 
   free(transArray);
-  printf("%s - > Exit\n",__func__);
 }
 
 void receiveTransList(int acceptfd)
 {
-  printf("%s -> Enter\n",__func__);
   int size;
   tlist_node_t* tArray;
   tlist_node_t* walker;
@@ -1928,7 +1935,6 @@ void receiveTransList(int acceptfd)
   
   recv_data((int)acceptfd,&size,sizeof(int));
 
-  printf("%s -> size : %d\n",__func__,size);
 
   if(size > 0) {
     if((tArray = calloc(size,sizeof(tlist_node_t) * size)) == NULL)
@@ -1954,11 +1960,7 @@ void receiveTransList(int acceptfd)
     response = -1;
   }
 
-  printf("%s -> response : %d\n",__func__,response);
-  
   send_data((int)acceptfd,&response,sizeof(char));
-
-  printf("%s -> End\n",__func__);
 }
 
 
@@ -1976,6 +1978,7 @@ int combineTransactionList(tlist_node_t* tArray,int size)
         if(walker->transid == tArray[i].transid)
         {
           walker->decision = tArray[i].decision;
+          walker->status = tArray[i].status;
           break;
         }
       }
@@ -1985,13 +1988,13 @@ int combineTransactionList(tlist_node_t* tArray,int size)
   return flag;
 }
 
-char inspectTransaction(char finalResponse,unsigned int transid)
+char inspectTransaction(char finalResponse,unsigned int transid,char* debug,int TRANS_FLAG)
 {
   tlist_node_t* tNode;
 
   tNode = tlistSearch(transList,transid);
   
-  if(finalResponse < 0) {
+  if(finalResponse <= 0) {
     tNode->decision = DECISION_LOST;
   }
   else {
@@ -2001,11 +2004,12 @@ char inspectTransaction(char finalResponse,unsigned int transid)
   if(!((tNode->decision != DECISION_LOST) && (okCommit == TRANS_OK))) 
   {
     pthread_mutex_lock(&liveHosts_mutex);
-    tNode->status = TRANS_WAIT;
+    tNode->status = TRANS_FLAG;
     pthread_mutex_unlock(&liveHosts_mutex);
 
-    while(!((tNode->decision != DECISION_LOST) && (okCommit == TRANS_OK))) { 
-      printf("%s -> transID : %u decision : %d is waiting\n",__func__,tNode->transid,tNode->decision);
+    // if decision is not lost and okCommit is not TRANS_FLAG, get out of this loop
+    while(!((tNode->decision != DECISION_LOST) && (okCommit != TRANS_FLAG))) { 
+      printf("%s -> transID : %u decision : %d is waiting flag : %d\n",debug,tNode->transid,tNode->decision,TRANS_FLAG);
       sleep(3);
     }
 
index 6a197c478a36d43f35eb5abcabdaaae96f7b36cb..e0bcf79dcf2666e38ef87e07fa0b671e681686bf 100644 (file)
@@ -1154,7 +1154,7 @@ int transCommit() {
 
 #ifdef RECOVERY
   while(okCommit != TRANS_OK) {
-    printf("%s -> new Transactin is waiting\n",__func__);
+//    printf("%s -> new Transactin is waiting\n",__func__);
     sleep(2);
   }
 
@@ -1358,11 +1358,9 @@ int transCommit() {
 
 #ifdef RECOVERY
 // wait until leader fix the system
+
     if(okCommit != TRANS_OK) {
-      while(okCommit != TRANS_OK) {
-        printf("%s -> Coordinator is waiting finalResponse : %d\n",__func__,finalResponse);
-        sleep(1);
-      }
+      inspectTransaction(finalResponse,transID,"transCommit before response",TRANS_AFTER);
       finalResponse = TRANS_ABORT;
     }
 #endif
@@ -1452,8 +1450,13 @@ int transCommit() {
        } while (treplyretry && deadmid != -1);
 
 #ifdef RECOVERY
+
+  //===========  after transaction point
   tlist_node_t* tNode = tlistSearch(transList,transID);
+  inspectTransaction(finalResponse,transID,"Coordinator",TRANS_AFTER);
+
   tNode->status = TRANS_OK;
+  finalResponse = tNode->decision;
 
   pthread_mutex_lock(&clearNotifyList_mutex);
   transList = tlistRemove(transList,transID);
@@ -1555,7 +1558,7 @@ void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, cha
 char doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
 
 #ifdef RECOVERY
-  finalResponse = inspectTransaction(finalResponse,tdata->f.transid);
+  finalResponse = inspectTransaction(finalResponse,tdata->f.transid,"Local Commit",TRANS_BEFORE);
   thashInsert(tdata->f.transid,finalResponse);
 #endif
 
@@ -1833,10 +1836,10 @@ void restoreDuplicationState(unsigned int deadHost)
 
   // clear transaction
   clearTransaction();
+//  getchar();
 
   // transfer lost objects
   duplicateLostObjects(deadHost);
-  getchar();
   // restart transactions
   restartTransactions();
 
@@ -1860,8 +1863,6 @@ void notifyRestoration()
   int sd;
   int sdlist[numHostsInSystem];
 
-  printf("%s -> Enter\n",__func__);
-       
   printHostsStatus();
 
   pthread_mutex_lock(&liveHosts_mutex);
@@ -1909,8 +1910,7 @@ void notifyRestoration()
     }
   }
   /* stop all local transactions */
-  stopTransactions();
-  printf("%s -> End\n",__func__);
+  stopTransactions(TRANS_BEFORE);
 }
 
 /* acknowledge leader that all transactions are waiting */
@@ -1991,6 +1991,8 @@ void clearTransaction()
      returns an array of ongoing transactions  */
   makeTransactionLists(&tlist,sdlist);
 
+//  getchar();
+
   /* release the cleared decisions to all machines */
   releaseTransactionLists(tlist,sdlist);
 
@@ -2001,8 +2003,7 @@ void clearTransaction()
   }
 
   tlistDestroy(tlist);
-  
-  printf("%s -> End\n",__func__);
+   printf("%s -> End\n",__func__);
 }
 
 // after this fuction
@@ -2022,7 +2023,8 @@ void makeTransactionLists(tlist_t** tlist,int* sdlist)
   tlist_node_t* walker = transList->head;
   
   while(walker) {
-    tlistInsertNode2(currentTransactionList,walker);
+    walker->status = TRANS_OK;
+    currentTransactionList = tlistInsertNode2(currentTransactionList,walker);
     walker = walker->next;
   }
 
@@ -2056,24 +2058,30 @@ void makeTransactionLists(tlist_t** tlist,int* sdlist)
         tmp = tlistSearch(currentTransactionList,transArray[j].transid);
           
         if(tmp == NULL) {
+          tlist_node_t* tNode = &transArray[j];
+          tNode->status = TRANS_OK;
           currentTransactionList = tlistInsertNode2(currentTransactionList,&(transArray[j]));
         }
         else {
-          if(tmp->decision == DECISION_LOST)
+          if((tmp->decision != TRANS_COMMIT && tmp->decision != TRANS_ABORT) 
+                && (transArray[j].decision == TRANS_COMMIT || transArray[j].decision == TRANS_ABORT))
           {
            tmp->decision = transArray[j].decision;
-           }
+          }
         }
       }  // j loop
     }
   }  // i loop
+  printf("Before\n");
+  tlistPrint(currentTransactionList);
 
   // current transaction list is completed
   // now see if any transaction is still missing
   walker = currentTransactionList->head;
 
   while(walker) {
-    if(walker->decision == DECISION_LOST) {
+//    if(walker->decision == DECISION_LOST) {
       for(i = 0 ; i < numHostsInSystem; i++) {
         if(sdlist[i] != -1 && hostIpAddrs[i] != myIpAddr)
         {
@@ -2105,7 +2113,9 @@ void makeTransactionLists(tlist_t** tlist,int* sdlist)
         printf("%s -> No one knows decision for transID : %u\n",__func__,walker->transid);
         walker->decision = TRANS_ABORT;
       }
-    }
+      if(walker->decision == TRYING_TO_COMMIT) {
+        printf("%s -> no decision yet transID : %u\n",__func__,walker->transid);
+      }
     walker = walker->next;
   } // while loop
 
@@ -2120,6 +2130,7 @@ void makeTransactionLists(tlist_t** tlist,int* sdlist)
   }
 
   *tlist = currentTransactionList;
+  printf("\n\nAfter\n");
   tlistPrint(currentTransactionList);
 
   printf("%s -> End\n",__func__);
@@ -2141,8 +2152,6 @@ void releaseTransactionLists(tlist_t* tlist,int* sdlist)
   {
     if(sdlist[i] != -1 && hostIpAddrs[i] != myIpAddr)
     {
-//      printf("%s -> Sent to sd : %d\n",__func__,sdlist[i]);
-
       if(size == 0) {
         size = -1;
         send_data(sdlist[i],&size,sizeof(int));
@@ -2159,6 +2168,11 @@ void releaseTransactionLists(tlist_t* tlist,int* sdlist)
         printf("%s -> problem\n",__func__);
         exit(0);
       }
+
+      pthread_mutex_lock(&liveHosts_mutex);
+      okCommit = TRANS_AFTER;
+      pthread_mutex_unlock(&liveHosts_mutex);
+
     }
   }
   
@@ -2185,7 +2199,6 @@ void restartTransactions()
 {
   int i;
   int sd;
-  printf("%s -> Enter\n",__func__);
   for(i = 0; i < numHostsInSystem; i++) {
     if(hostIpAddrs[i] == myIpAddr) {
       pthread_mutex_lock(&liveHosts_mutex);
@@ -2209,7 +2222,6 @@ void restartTransactions()
       }
     }
   }
-  printf("%s -> End\n",__func__);
 }
 
 #endif
@@ -3316,6 +3328,7 @@ int allHostsLive() {
 
 #ifdef RECOVERY
 void duplicateLostObjects(unsigned int mid){
+  printf("%s -> Enter\n",__func__);
 
 #ifdef RECOVERYSTATS
   unsigned int dupeSize = 0;
@@ -3346,8 +3359,8 @@ void duplicateLostObjects(unsigned int mid){
    * Backup     26      21,24
    */
 
-  if(((psd = getSockWithLock(transRequestSockPool, originalMid)) < 0 ) || 
-      ((bsd = getSockWithLock(transRequestSockPool,backupMid)) <0)) {
+  if(((psd = getSockWithLock(transPrefetchSockPool, originalMid)) < 0 ) || 
+      ((bsd = getSockWithLock(transPrefetchSockPool,backupMid)) <0)) {
 
     printf("%s -> psd : %d bsd : %d\n",__func__,psd,bsd);
     printf("%s -> Socket create error\n",__func__);
@@ -3390,8 +3403,8 @@ void duplicateLostObjects(unsigned int mid){
     exit(0);
   }
 
-  freeSockWithLock(transRequestSockPool, originalMid, psd);
-  freeSockWithLock(transRequestSockPool, backupMid, bsd);
+  freeSockWithLock(transPrefetchSockPool, originalMid, psd);
+  freeSockWithLock(transPrefetchSockPool, backupMid, bsd);
 
 #ifdef RECOVERYSTATS
   recoverStat[numRecovery-1].recoveredData = dupeSize;