optimized picking of read-set during pile creation
authoradash <adash>
Tue, 18 May 2010 22:38:40 +0000 (22:38 +0000)
committeradash <adash>
Tue, 18 May 2010 22:38:40 +0000 (22:38 +0000)
Robust/src/Runtime/DSTM/interface_recovery/trans.c

index d5d433ef4acd0c2dd99a0d7f19896702c11183e9..731fab8bf308ad6b420ee2d8d352013dc96ab41b 100644 (file)
@@ -865,7 +865,10 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
       /* Look up in machine lookup table  and copy  into cache*/
       GETSIZE(size, objheader);
       size += sizeof(objheader_t);
-      objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
+      if((objcopy = (objheader_t *) objstrAlloc(&t_cache, size)) == NULL) {
+        printf("DEBUG: %s() mlookup objcopy= %x\n", __func__, objcopy);
+        exit(-1);
+      }
       memcpy(objcopy, objheader, size);
       /* Insert into cache's lookup table */
       STATUS(objcopy)=0;
@@ -895,7 +898,10 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
       /* Look up in prefetch cache */
       GETSIZE(size, tmp);
       size+=sizeof(objheader_t);
-      objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
+      if((objcopy = (objheader_t *) objstrAlloc(&t_cache, size)) == NULL) {
+        printf("DEBUG: %s() prefetch cache objcopy= %x\n", __func__, objcopy);
+        exit(-1);
+      }
       memcpy(objcopy, tmp, size);
       /* Insert into cache's lookup table */
       t_chashInsert(OID(tmp), objcopy);
@@ -961,6 +967,7 @@ remoteread:
       }
       pthread_mutex_unlock(&prefetchcache_mutex);
       memcpy(headerObj, objcopy, size+sizeof(objheader_t));
+      //printf("%s() DEBUG: type=%d\n",__func__, TYPE(headerObj));
       //make an entry in prefetch lookup hashtable
       prehashInsert(oid, headerObj);
       LOGEVENT('B');
@@ -1018,6 +1025,13 @@ plistnode_t *createPiles() {
   chashlistnode_t * ptr = c_table;
   /* Represents number of bins in the chash table */
   unsigned int size = c_size;
+#ifdef RECOVERY
+  int phostindex[numHostsInSystem];
+  int k;
+  for(k=0; k<numHostsInSystem; k++)
+    phostindex[k] = -1;
+  int hostIndex = 0;
+#endif
        for(i = 0; i < size ; i++) {
     chashlistnode_t * curr = &ptr[i];
                /* Inner loop to traverse the linked list of the cache lookupTable */
@@ -1028,6 +1042,7 @@ plistnode_t *createPiles() {
       headeraddr=(objheader_t *) curr->val;
 
 #ifdef RECOVERY
+
       oid = OID(headeraddr);
 
                          int makedirty = 0;
@@ -1044,28 +1059,70 @@ plistnode_t *createPiles() {
             mid = myIpAddr;
         }
 
-        //if(mid == myIpAddr) {
-        pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements);
-        //} else {
-        //  if(bit)
-        //   pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements);
-        //  else 
-        //    pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
-        //}
+        int selectMid=0;
+        if(mid == myIpAddr) {
+          pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements);
+          if(!checkIndex(findHost(myIpAddr), phostindex)) {
+            phostindex[hostIndex++] = findHost(myIpAddr);
+          }
+        } else {
+          int pindex = findHost(mid);//primary copy's index
+          int bindex = (findHost(mid) + 1) % numHostsInSystem;//backup copy's index
+          if(checkIndex(pindex, phostindex)) {
+            pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements);
+            selectMid = 1;
+          } else if(checkIndex(bindex, phostindex)) {
+            pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
+          } else {
+            //check if any indexes present in the phostindex arry is odd or even
+            int chktype;
+            if((chktype = typeIndex(phostindex)) != -1) {
+              if(chktype == 1) { //odd indexed machines
+                //pick up either backup or primary copy based on the type of previous indexes
+                if((pindex%2) == 0) { 
+                  pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
+                  phostindex[hostIndex++] = bindex;
+                } else {
+                  pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements);
+                  selectMid = 1;
+                  phostindex[hostIndex++] = pindex;
+                }
+              } else { //even indexed machines
+                if((pindex%2) == 0) {
+                  pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements);
+                  selectMid = 1;
+                  phostindex[hostIndex++] = pindex;
+                } else {
+                  pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
+                  phostindex[hostIndex++] = bindex;
+                }
+              }
+            } else {
+              if(((myIpAddr%2 == 0) && ((mid%2) == 0)) || ((myIpAddr%2 == 0) && ((mid%2) == 0))) {
+                pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements);
+                selectMid = 1;
+                phostindex[hostIndex++] = pindex;
+              } else {
+                pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
+                phostindex[hostIndex++] = bindex;
+              }
+            }
+          }
+        }
 
         if(numLiveHostsInSystem > 1) {
           if(makedirty) { 
             STATUS(headeraddr) = DIRTY;
-            //if(mid == myIpAddr) {
-            //  pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
-            //} else {
-            //  if(bit)
-            pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
-            //  else 
-            //    pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements);
-            // }
+            if(mid == myIpAddr) {
+              pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
+            } else {
+              if(selectMid) {
+                pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
+              } else {
+                pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements);
+              }
+            }
           }
-          //pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
         }
 #else
                // Get machine location for object id (and whether local or not)
@@ -1120,6 +1177,34 @@ plistnode_t *createPiles() {
 }
 #endif
 
+/**
+ *  This function return 0 if indexes present are even
+ *  returns 1 if indexes present are odd
+ *  return -1 if indexes present are -1
+**/
+int typeIndex(int *phostindex) {
+  if(phostindex[0] == -1)
+    return -1;
+  if((phostindex[0]%2) == 0)
+    return 0;
+  else 
+    return 1;
+}
+
+/**
+ * This function returns 1 is pindex is found 
+ * in the phostindex array else
+ * returns 0
+ **/
+int checkIndex(int pindex, int *phostindex) {
+  int i;
+  for(i=0; i<numHostsInSystem; i++) {
+    if(phostindex[i] == pindex)
+      return 1;
+  }
+  return 0;
+}
+
 /* This function initiates the transaction commit process
  * Spawns threads for each of the new connections with Participants
  * and creates new piles by calling the createPiles(),
@@ -1210,6 +1295,7 @@ int transCommit() {
     int localReqsock = -1;
     trans_req_data_t *tosend;
     tosend = calloc(pilecount, sizeof(trans_req_data_t));
+
     while(pile != NULL) {
 #ifdef DEBUG
                        printf("%s-> New pile:[%s],", __func__, midtoIPString(pile->mid));
@@ -1229,7 +1315,7 @@ int transCommit() {
                        tosend[sockindex].oidcreated = pile->oidcreated;
 
 
-      int sd = 0;
+            int sd = 0;
                        if(pile->mid != myIpAddr) {
                                if((sd = getSockWithLock(transRequestSockPool, pile->mid)) < 0) {
                                        printf("\ntransRequest(): socket create error\n");
@@ -1285,7 +1371,7 @@ int transCommit() {
                                        offset+=size;
                                }
                                send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
-                //send_buf(sd, &writebuffer, modptr, tosend[sockindex].f.sum_bytes);
+                //forcesend_buf(sd, &writebuffer, modptr, tosend[sockindex].f.sum_bytes);
 
                                free(modptr);
                        } else { //handle request locally
@@ -1294,7 +1380,7 @@ int transCommit() {
                        sockindex++;
                        pile = pile->next;
                } //end of pile processing
-
+   
                /* Recv Ctrl msgs from all machines */
 #ifdef DEBUG
                printf("%s-> Finished sending transaction read/mod objects\n",__func__);
@@ -1356,8 +1442,7 @@ int transCommit() {
 #endif
                        }
                }
-
-               /* Decide the final response */
+        /* Decide the final response */
                if((finalResponse = decideResponse(getReplyCtrl, &treplyretry, pilecount)) == 0) {
                        printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
                        free(tosend);
@@ -1404,7 +1489,7 @@ int transCommit() {
                                                free(listmid);
                                                return 1;
                                        }
-
+                    
 #ifdef ABORTREADERS
                                        removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
                                        removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
@@ -1436,8 +1521,7 @@ int transCommit() {
       } 
 #endif
     }
-
-
+       
    for(i = 0; i< pilecount; i++) {
      if(socklist[i] > 0) {
        freeSockWithLock(transRequestSockPool,listmid[i], socklist[i]);
@@ -1451,7 +1535,11 @@ int transCommit() {
       pDelete(pile_ptr);
     /* wait a random amount of time before retrying to commit transaction*/
     if(treplyretry) {
-        randomdelay();
+      //treplyretryCount++;
+      //if(treplyretryCount >= NUM_TRY_TO_COMMIT)
+      //  exponentialdelay();
+      //else
+      randomdelay();
 #ifdef TRANSSTATS
                        nSoftAbort++;
 #endif
@@ -1459,7 +1547,6 @@ int transCommit() {
        } while (treplyretry && deadmid != -1);
 
 #ifdef RECOVERY
-
   //===========  after transaction point
   tlist_node_t* tNode = tlistSearch(transList,transID);
   inspectTransaction(finalResponse,transID,"Coordinator",TRANS_AFTER);
@@ -2166,7 +2253,6 @@ void commitCountForObjMod(char *getReplyCtrl, unsigned int *oidnotfound, unsigne
 
        //Keep track of what is locked
        oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
-       //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
        return;
       }
     } else { //A lock is acquired some place else
@@ -2205,7 +2291,6 @@ void commitCountForObjRead(char *getReplyCtrl, unsigned int *oidnotfound, unsign
                                *getReplyCtrl = TRANS_DISAGREE;
                                //Keep track of what is locked
                                oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
-                               //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
                                return;
                        }
                } else { //Has reached max number of readers or some other transaction
@@ -2216,7 +2301,6 @@ void commitCountForObjRead(char *getReplyCtrl, unsigned int *oidnotfound, unsign
                                (*v_nomatch)++;
                                /* Send TRANS_DISAGREE to Coordinator */
                                *getReplyCtrl = TRANS_DISAGREE;
-                               //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
                                return;
                        }
     }
@@ -2293,9 +2377,9 @@ int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
     header->version += 1;
     if(header->notifylist != NULL) {
 #ifdef RECOVERY
-      printf("%s -> to notifyAll\n",__func__);
+      //printf("%s -> to notifyAll\n",__func__);
       if(header->isBackup == 0) {  // if it is primary obj, notify 
-        printf("%s -> Called notifyAll\n",__func__);
+        //printf("%s -> Called notifyAll\n",__func__);
         notifyAll(&header->notifylist, OID(header), header->version);
       }
       else                        // if not, just clear the notification list