transaction clearing done.
authorjihoonl <jihoonl>
Thu, 8 Apr 2010 06:10:16 +0000 (06:10 +0000)
committerjihoonl <jihoonl>
Thu, 8 Apr 2010 06:10:16 +0000 (06:10 +0000)
need to test with more benchmarks

Robust/src/Runtime/DSTM/interface_recovery/dstm.h
Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c
Robust/src/Runtime/DSTM/interface_recovery/mlookup.c
Robust/src/Runtime/DSTM/interface_recovery/trans.c

index dbb8abfb168bb712bb566b1dd4ec2e931c8dee3f..fd0bbd3654fbd68c1a269472741097b04fdfcfb7 100644 (file)
@@ -291,7 +291,7 @@ int updateLiveHostsCommit();
 void receiveNewHostLists(int accept);
 void stopTransactions(int TRANS_FLAG);
 void sendTransList(int acceptfd);
-void receiveTransList(int acceptfd);
+int receiveTransList(int acceptfd);
 int combineTransactionList(tlist_node_t* tArray,int size);
 char inspectTransaction(char control,unsigned int transid,char* debug,int TRANS_FLAG);
 
index 3b071e461b7893e55e2a47e8c11579bc66703cd4..8e82efcbff209663739b5df7c18613ff317b1489 100644 (file)
@@ -556,11 +556,10 @@ void *dstmAccept(void *acceptfd) {
       case REQUEST_TRANS_LIST:
         printf("control -> REQUEST_TRANS_LIST\n");
         sendTransList((int)acceptfd);
-        receiveTransList((int)acceptfd);
-
-        pthread_mutex_lock(&liveHosts_mutex);
-        okCommit = TRANS_AFTER;
-        pthread_mutex_unlock(&liveHosts_mutex);
+        response =  receiveTransList((int)acceptfd);
+        stopTransactions(TRANS_AFTER);
+  
+        send_data((int)acceptfd,&response,sizeof(char));
         break;
 
       case REQUEST_TRANS_RESTART:
@@ -1031,18 +1030,15 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
        }
 
 #ifdef RECOVERY
-//  printf("%s -> transID : %u has been committed\n",__func__,transID);
+  inspectTransaction(control,fixed->transid,"processClientReq",TRANS_AFTER);
 
   tlist_node_t* tNode = tlistSearch(transList,fixed->transid);
   tNode->status = TRANS_OK;
-  inspectTransaction(control,fixed->transid,"processClientReq",TRANS_AFTER);
 
   pthread_mutex_lock(&clearNotifyList_mutex);
   transList = tlistRemove(transList,fixed->transid);
   pthread_mutex_unlock(&clearNotifyList_mutex);
 
-  // ====================after transaction point
-
 #endif
 
   /* Free memory */
@@ -1329,12 +1325,6 @@ char getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked
     (*objnotfound)++;
        *control = TRANS_DISAGREE;
   } else {     /* If Obj found in machine (i.e. has not moved) */
-#ifdef DEBUG
-    printf("%s -> Obj found!!\n",__func__);
-       printf("%s -> Obj found: oid = %d, type = %d\t\n", __func__, OID(mobj), TYPE((objheader_t *)mobj));
-         fflush(stdout);
-#endif
-    
     /* Check if Obj is locked by any previous transaction */
     if (read_trylock(STATUSPTR(mobj))) { //Can further acquire read locks
       if (version == ((objheader_t *)mobj)->version) { /* match versions */
@@ -1864,8 +1854,8 @@ void stopTransactions(int TRANS_FLAG)
   pthread_mutex_lock(&liveHosts_mutex);
   okCommit = TRANS_FLAG;
   pthread_mutex_unlock(&liveHosts_mutex);
-  /* make sure that all transactions are stopped */
 
+  /* make sure that all transactions are stopped */
   pthread_mutex_lock(&clearNotifyList_mutex);
 
   do {
@@ -1876,8 +1866,8 @@ void stopTransactions(int TRANS_FLAG)
     {
       // locking
       while(!(walker->status == TRANS_FLAG || walker->status == TRANS_OK)) {
-        printf("%s -> Waiting for %u - Status : %d tHash : %d\n",__func__,walker->transid,walker->status,thashSearch(walker->transid));
-        sleep(2);
+//        printf("%s -> Waiting for %u - Status : %d tHash : %d\n",__func__,walker->transid,walker->status,thashSearch(walker->transid));
+        randomdelay();
       }
 
       walker = walker->next;
@@ -1885,7 +1875,6 @@ void stopTransactions(int TRANS_FLAG)
   }while(transList->flag == 1);
 
   pthread_mutex_unlock(&clearNotifyList_mutex);
-//  printf("%s - > Exit\n",__func__);
 }
 
 void sendTransList(int acceptfd)
@@ -1910,7 +1899,7 @@ void sendTransList(int acceptfd)
   // check if it already commit the decision for a transaction
   recv_data((int)acceptfd,&response, sizeof(char));
 
-  while(response == REQUEST_TRANS_CHECK)
+  while(response == REQUEST_TRANS_CHECK && response != REQUEST_TRANS_COMPLETE )
   {  
     int transid;
     recv_data((int)acceptfd,&transid, sizeof(unsigned int));
@@ -1924,7 +1913,7 @@ void sendTransList(int acceptfd)
   free(transArray);
 }
 
-void receiveTransList(int acceptfd)
+int receiveTransList(int acceptfd)
 {
   int size;
   tlist_node_t* tArray;
@@ -1960,7 +1949,7 @@ void receiveTransList(int acceptfd)
     response = -1;
   }
 
-  send_data((int)acceptfd,&response,sizeof(char));
+  return response;
 }
 
 
@@ -1978,7 +1967,7 @@ int combineTransactionList(tlist_node_t* tArray,int size)
         if(walker->transid == tArray[i].transid)
         {
           walker->decision = tArray[i].decision;
-          walker->status = tArray[i].status;
+//          walker->status = tArray[i].status;
           break;
         }
       }
@@ -2001,7 +1990,9 @@ char inspectTransaction(char finalResponse,unsigned int transid,char* debug,int
     tNode->decision = finalResponse;
   }
 
-  if(!((tNode->decision != DECISION_LOST) && (okCommit == TRANS_OK))) 
+//  printf("%s -> decision = %d okCommit = %d\n",__func__,tNode->decision,okCommit);
+
+  if((tNode->decision == DECISION_LOST) || (okCommit != TRANS_OK))
   {
     pthread_mutex_lock(&liveHosts_mutex);
     tNode->status = TRANS_FLAG;
@@ -2009,8 +2000,8 @@ char inspectTransaction(char finalResponse,unsigned int transid,char* debug,int
 
     // if decision is not lost and okCommit is not TRANS_FLAG, get out of this loop
     while(!((tNode->decision != DECISION_LOST) && (okCommit != TRANS_FLAG))) { 
-      printf("%s -> transID : %u decision : %d is waiting flag : %d\n",debug,tNode->transid,tNode->decision,TRANS_FLAG);
-      sleep(3);
+//      printf("%s -> transID : %u decision : %d is waiting flag : %d\n",debug,tNode->transid,tNode->decision,TRANS_FLAG);
+      randomdelay();
     }
 
     finalResponse = tNode->decision;
index 786281b1f2328d25b9d0274bb407ed56779744df..8e47aa751f23fb4f660ef4adb06b21edefe7a2f1 100644 (file)
@@ -207,7 +207,7 @@ unsigned int *mhashGetKeys(unsigned int *numKeys) {
 }
 
 #ifdef RECOVERY
-void* mhashGetDuplicate(int *dupeSize, int backup) { //how big?
+void* mhashGetDuplicate(int *dupeSize, int backup) { 
 #ifdef DEBUG
        printf("%s-> Start\n", __func__); 
 #endif
index e0bcf79dcf2666e38ef87e07fa0b671e681686bf..4139144ecd2dc75a6a5078c649922619184d2a4f 100644 (file)
@@ -1155,7 +1155,7 @@ int transCommit() {
 #ifdef RECOVERY
   while(okCommit != TRANS_OK) {
 //    printf("%s -> new Transactin is waiting\n",__func__);
-    sleep(2);
+    randomdelay();
   }
 
   transList = tlistInsertNode(transList,transID,TRYING_TO_COMMIT,TRANS_OK);
@@ -1358,10 +1358,10 @@ int transCommit() {
 
 #ifdef RECOVERY
 // wait until leader fix the system
-
     if(okCommit != TRANS_OK) {
-      inspectTransaction(finalResponse,transID,"transCommit before response",TRANS_AFTER);
+      inspectTransaction(finalResponse,transID,"transCommit before response",TRANS_BEFORE);
       finalResponse = TRANS_ABORT;
+      treplyretry = 0;
     }
 #endif
 
@@ -1836,10 +1836,10 @@ void restoreDuplicationState(unsigned int deadHost)
 
   // clear transaction
   clearTransaction();
-//  getchar();
 
   // transfer lost objects
   duplicateLostObjects(deadHost);
+
   // restart transactions
   restartTransactions();
 
@@ -1991,7 +1991,6 @@ void clearTransaction()
      returns an array of ongoing transactions  */
   makeTransactionLists(&tlist,sdlist);
 
-//  getchar();
 
   /* release the cleared decisions to all machines */
   releaseTransactionLists(tlist,sdlist);
@@ -2070,6 +2069,8 @@ void makeTransactionLists(tlist_t** tlist,int* sdlist)
           }
         }
       }  // j loop
+
+      free(transArray);
     }
   }  // i loop
  
@@ -2169,10 +2170,8 @@ void releaseTransactionLists(tlist_t* tlist,int* sdlist)
         exit(0);
       }
 
-      pthread_mutex_lock(&liveHosts_mutex);
-      okCommit = TRANS_AFTER;
-      pthread_mutex_unlock(&liveHosts_mutex);
-
+//      okCommit = TRANS_AFTER;
+      stopTransactions(TRANS_AFTER);
     }
   }