bug fix for invalidation (also allow invalidation of objects when these objects are
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
index c4139fc0ee4718c391f8302c293d52ebe593b66c..c0dfe4e5f038c80a54e090e1cee1c2c5c618d3d4 100644 (file)
@@ -1,6 +1,5 @@
 #include "dstm.h"
 #include "ip.h"
-#include "clookup.h"
 #include "machinepile.h"
 #include "mlookup.h"
 #include "llookup.h"
@@ -9,6 +8,8 @@
 #include "threadnotify.h"
 #include "queue.h"
 #include "addUdpEnhance.h"
+#include "addPrefetchEnhance.h"
+#include "gCollect.h"
 #ifdef COMPILER
 #include "thread.h"
 #endif
@@ -19,6 +20,7 @@
 
 /* Global Variables */
 extern int classsize[];
+pfcstats_t *evalPrefetch;
 objstr_t *prefetchcache; //Global Prefetch cache
 pthread_mutex_t prefetchcache_mutex;// Mutex to lock Prefetch Cache
 pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */
@@ -38,14 +40,19 @@ unsigned int oidMax;
 
 sockPoolHashTable_t *transReadSockPool;
 sockPoolHashTable_t *transPrefetchSockPool;
+sockPoolHashTable_t *transRequestSockPool;
 pthread_mutex_t notifymutex;
 pthread_mutex_t atomicObjLock;
 
 /***********************************
  * Global Variables for statistics
  **********************************/
-extern int numTransCommit;
-extern int numTransAbort;
+int numTransCommit = 0;
+int numTransAbort = 0;
+int nchashSearch = 0;
+int nmhashSearch = 0;
+int nprehashSearch = 0;
+int nRemoteSend = 0;
 
 void printhex(unsigned char *, int);
 plistnode_t *createPiles(transrecord_t *);
@@ -61,7 +68,7 @@ void send_data(int fd , void *buf, int buflen) {
     numbytes = send(fd, buffer, size, MSG_NOSIGNAL);
     if (numbytes == -1) {
       perror("send");
-      exit(-1);
+      return;
     }
     buffer += numbytes;
     size -= numbytes;
@@ -76,7 +83,7 @@ void recv_data(int fd , void *buf, int buflen) {
     numbytes = recv(fd, buffer, size, 0);
     if (numbytes == -1) {
       perror("recv");
-      exit(-1);
+      return;
     }
     buffer += numbytes;
     size -= numbytes;
@@ -92,6 +99,7 @@ int recv_data_errorcode(int fd , void *buf, int buflen) {
     if (numbytes==0)
       return 0;
     if (numbytes == -1) {
+      perror("recv");
       return -1;
     }
     buffer += numbytes;
@@ -132,16 +140,23 @@ inline int findmax(int *array, int arraylength) {
 
 /* This function is a prefetch call generated by the compiler that
  * populates the shared primary prefetch queue*/
-void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
+void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
   /* Allocate for the queue node*/
-  int qnodesize = sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
-  char * node= getmemory(qnodesize);
-  /* Set queue node values */
+  int qnodesize = 2*sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
   int len;
+  char * node= getmemory(qnodesize);
   int top=endoffsets[ntuples-1];
+  
+  if (node==NULL) 
+    return;
+  /* Set queue node values */
 
-  *((int *)(node))=ntuples;
-  len = sizeof(int);
+  /* TODO: Remove this after testing */
+  evalPrefetch[siteid].callcount++;
+
+  *((int *)(node))=siteid;
+  *((int *)(node + sizeof(int))) = ntuples;
+  len = 2*sizeof(int);
   memcpy(node+len, oids, ntuples*sizeof(unsigned int));
   memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short));
   memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
@@ -173,16 +188,17 @@ int dstmStartup(const char * option) {
   //Initialize socket pool
   transReadSockPool = createSockPool(transReadSockPool, 2*numHostsInSystem+1);
   transPrefetchSockPool = createSockPool(transPrefetchSockPool, 2*numHostsInSystem+1);
+  transRequestSockPool = createSockPool(transRequestSockPool, 2*numHostsInSystem+1);
   
   dstmInit();
   transInit();
   
   fd=startlistening();
   udpfd = udpInit();
+  pthread_attr_init(&attr);
+  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
   pthread_create(&udp_thread_Listen, &attr, udpListenBroadcast, (void*)udpfd);
   if (master) {
-    pthread_attr_init(&attr);
-    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
     pthread_create(&thread_Listen, &attr, dstmListen, (void*)fd);
     return 1;
   } else {
@@ -226,6 +242,11 @@ void transInit() {
   int retval;
   //Create and initialize prefetch cache structure
   prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE);
+  initializePCache();
+  if((evalPrefetch = initPrefetchStats()) == NULL) {
+    printf("%s() Error allocating memory at %s, %d\n", __func__, __FILE__, __LINE__);
+    exit(0);
+  }
   
   /* Initialize attributes for mutex */
   pthread_mutexattr_init(&prefetchcache_mutex_attr);
@@ -284,7 +305,7 @@ transrecord_t *transStart() {
     return NULL;
   }
   tmp->cache = objstrCreate(1048576);
-  tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
+  tmp->lookupTable = chashCreate(CHASH_SIZE, CLOADFACTOR);
 #ifdef COMPILER
   tmp->revertlist=NULL;
 #endif
@@ -297,14 +318,17 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
   unsigned int machinenumber;
   objheader_t *tmp, *objheader;
   objheader_t *objcopy;
-  int size, found = 0;
+  int size;
   void *buf;
   
   if(oid == 0) {
     return NULL;
   }
   
-  if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
+  if((objheader = chashSearch(record->lookupTable, oid)) != NULL){
+#ifdef TRANSSTATS
+    nchashSearch++;
+#endif
     /* Search local transaction cache */
 #ifdef COMPILER
     return &objheader[1];
@@ -312,6 +336,9 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
     return objheader;
 #endif
   } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
+#ifdef TRANSSTATS
+    nmhashSearch++;
+#endif
     /* Look up in machine lookup table  and copy  into cache*/
     GETSIZE(size, objheader);
     size += sizeof(objheader_t);
@@ -326,6 +353,13 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
     return objcopy;
 #endif
   } else if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { 
+#ifdef TRANSSTATS
+    nprehashSearch++;
+#endif
+#ifdef CHECKTA
+    printf("Prefetch cache read, oid = %x, oidtype =%d\n", oid, TYPE(tmp));
+    fflush(stdout);
+#endif
     /* Look up in prefetch cache */
     GETSIZE(size, tmp);
     size+=sizeof(objheader_t);
@@ -350,6 +384,13 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
       printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
       return NULL;
     } else {
+#ifdef TRANSSTATS
+    nRemoteSend++;
+#endif
+#ifdef CHECKTA
+    printf("Remote read, oid = %x, oidtype =%d\n", oid, TYPE(objcopy));
+    fflush(stdout);
+#endif
       STATUS(objcopy)=0;      
 #ifdef COMPILER
       return &objcopy[1];
@@ -395,7 +436,7 @@ plistnode_t *createPiles(transrecord_t *record) {
       if(curr->key == 0)
        break;
       
-      if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
+      if ((headeraddr = (objheader_t *) chashSearch(record->lookupTable, curr->key)) == NULL) {
        printf("Error: No such oid %s, %d\n", __FILE__, __LINE__);
        return NULL;
       }
@@ -471,7 +512,6 @@ int transCommit(transrecord_t *record) {
     pthread_mutex_t tlshrd;
     
     thread_data_array = (thread_data_array_t *) calloc(pilecount, sizeof(thread_data_array_t));
-    
     ltdata = calloc(1, sizeof(local_thread_data_array_t));
     
     thread_response_t rcvd_control_msg[pilecount];     /* Shared thread array that keeps track of responses of participants */
@@ -588,7 +628,11 @@ int transCommit(transrecord_t *record) {
   
   if(treplyctrl == TRANS_ABORT) {
 #ifdef TRANSSTATS
-    ++numTransAbort;
+    numTransAbort++;
+#endif
+#ifdef CHECKTA
+    char a[] = "Aborting";
+    TABORT1(a);
 #endif
     /* Free Resources */
     objstrDelete(record->cache);
@@ -599,7 +643,11 @@ int transCommit(transrecord_t *record) {
     return TRANS_ABORT;
   } else if(treplyctrl == TRANS_COMMIT) {
 #ifdef TRANSSTATS
-    ++numTransCommit;
+    numTransCommit++;
+#endif
+#ifdef CHECKTA
+    char a[] = "Commiting";
+    TABORT1(a);
 #endif
     /* Free Resources */
     objstrDelete(record->cache);
@@ -627,29 +675,17 @@ void *transRequest(void *threadarg) {
   objheader_t *headeraddr;
   char control, recvcontrol;
   char machineip[16], retval;
-  
+
   tdata = (thread_data_array_t *) threadarg;
-  
-  /* Send Trans Request */
-  if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
-    perror("Error in socket for TRANS_REQUEST\n");
-    pthread_exit(NULL);
-  }
-  bzero((char*) &serv_addr, sizeof(serv_addr));
-  serv_addr.sin_family = AF_INET;
-  serv_addr.sin_port = htons(LISTEN_PORT);
-  serv_addr.sin_addr.s_addr = htonl(tdata->mid);
-
-  /* Open Connection */
-  if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
-    perror("Error in connect for TRANS_REQUEST\n");
-    close(sd);
+
+  if((sd = getSock2WithLock(transRequestSockPool, tdata->mid)) < 0) {
+    printf("transRequest(): socket create error\n");
     pthread_exit(NULL);
   }
-  
+
   /* Send bytes of data with TRANS_REQUEST control message */
   send_data(sd, &(tdata->buffer->f), sizeof(fixed_data_t));
-  
+
   /* Send list of machines involved in the transaction */
   {
     int size=sizeof(unsigned int)*tdata->buffer->f.mcount;
@@ -665,7 +701,10 @@ void *transRequest(void *threadarg) {
   /* Send objects that are modified */
   for(i = 0; i < tdata->buffer->f.nummod ; i++) {
     int size;
-    headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
+    if((headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i])) == NULL) {
+      printf("%s() Error: No such oid %s, %d\n", __func__, __FILE__, __LINE__);
+      pthread_exit(NULL);
+    }
     GETSIZE(size,headeraddr);
     size+=sizeof(objheader_t);
     send_data(sd, headeraddr, size);
@@ -679,9 +718,8 @@ void *transRequest(void *threadarg) {
     recv_data(sd, &length, sizeof(int));
     void *newAddr;
     pthread_mutex_lock(&prefetchcache_mutex);
-    if ((newAddr = objstrAlloc(prefetchcache, length)) == NULL) {
+    if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) {
       printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
-      close(sd);
       pthread_exit(NULL);
     }
     pthread_mutex_unlock(&prefetchcache_mutex);
@@ -692,6 +730,11 @@ void *transRequest(void *threadarg) {
       objheader_t * header;
       header = (objheader_t *) (((char *)newAddr) + offset);
       oidToPrefetch = OID(header);
+#ifdef CHECKTA
+      printf("Trans disagree for oid = %x: ", OID(header));
+      char a[] = "object type";
+      TABORT8(__func__, a, TYPE(header));
+#endif
       int size = 0;
       GETSIZE(size, header);
       size += sizeof(objheader_t);
@@ -707,6 +750,7 @@ void *transRequest(void *threadarg) {
       offset += size;
     }
   }
+
   recvcontrol = control;
   /* Update common data structure and increment count */
   tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
@@ -725,12 +769,51 @@ void *transRequest(void *threadarg) {
     pthread_cond_wait(tdata->threshold, tdata->lock);
   }
   pthread_mutex_unlock(tdata->lock);
+
+  /* clear objects from prefetch cache */
+  /*
+  if(*(tdata->replyctrl) == TRANS_ABORT) {
+    int i;
+    for(i=0; i<tdata->buffer->f.nummod; i++) {
+      unsigned int oid = tdata->buffer->oidmod[i];
+      objheader_t *header;
+      if((header = prehashSearch(oid)) != NULL) {
+        prehashRemove(oid);
+      }
+    }
+    for(i=0; i<tdata->buffer->f.numread; i++) {
+      char *objread = tdata->buffer->objread;
+      unsigned int oid = *((unsigned int *)(objread+(sizeof(unsigned int) +
+                  sizeof(unsigned short))*i));
+      objheader_t *header;
+      if((header = prehashSearch(oid)) != NULL) {
+        prehashRemove(oid);
+      }
+    }
+  }
+  */
+
+  if(*(tdata->replyctrl) == TRANS_COMMIT) {
+    int retval;
+     /* Update prefetch cache */
+    if((retval = updatePrefetchCache(tdata)) != 0) {
+      printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+      return;
+    }
+
+    /* Invalidate objects in other machine cache */
+    if(tdata->buffer->f.nummod > 0) {
+      if((retval = invalidateObj(tdata)) != 0) {
+        printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
+        return;
+      }
+    }
+  }
   
   /* Send the final response such as TRANS_COMMIT or TRANS_ABORT 
    * to all participants in their respective socket */
   if (sendResponse(tdata, sd) == 0) { 
     printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
-    close(sd);
     pthread_exit(NULL);
   }
   
@@ -743,9 +826,6 @@ void *transRequest(void *threadarg) {
   } else {
     //printf("DEBUG-> Error: Incorrect Transaction End Message %d\n", control);
   }
-  
-  /* Close connection */
-  close(sd);
   pthread_exit(NULL);
 }
 
@@ -782,37 +862,11 @@ void decideResponse(thread_data_array_t *tdata) {
     *(tdata->replyctrl) = TRANS_ABORT;
     *(tdata->replyretry) = 0;
     /* clear objects from prefetch cache */
-    for (i = 0; i < tdata->buffer->f.numread; i++) {
-      prehashRemove(*((unsigned int *)(((char *)tdata->buffer->objread) + (sizeof(unsigned int) + sizeof(unsigned short))*i)));
-    }
-    for (i = 0; i < tdata->buffer->f.nummod; i++) {
-      prehashRemove(tdata->buffer->oidmod[i]);
-    }
+    cleanPCache(tdata);
   } else if(transagree == tdata->buffer->f.mcount){
     /* Send Commit */
     *(tdata->replyctrl) = TRANS_COMMIT;
     *(tdata->replyretry) = 0;
-    /* update prefetch cache */
-    /* For objects read */
-    char oidType;
-    int retval;
-    oidType = 'R'; 
-    if((retval = updatePrefetchCache(tdata, tdata->buffer->f.numread, oidType)) != 0) {
-      printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
-      return;
-    }
-    oidType = 'M'; 
-    if((retval = updatePrefetchCache(tdata, tdata->buffer->f.nummod, oidType)) != 0) {
-      printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
-      return;
-    }
-    /* Invalidate objects in other machine cache */
-    if(tdata->buffer->f.nummod > 0) {
-      if((retval = invalidateObj(tdata)) != 0) {
-        printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
-        return;
-      }
-    }
   } else { 
     /* Send Abort in soft abort case followed by retry commiting transaction again*/
     *(tdata->replyctrl) = TRANS_ABORT;
@@ -821,45 +875,6 @@ void decideResponse(thread_data_array_t *tdata) {
   return;
 }
 
-/* This function updates the prefetch cache when commiting objects 
- * based on the type of oid i.e. if oid is read or oid is modified
- * Return -1 on error else returns 0 
- */
-int updatePrefetchCache(thread_data_array_t* tdata, int numoid, char oidType) {
-  int i;
-  for (i = 0; i < numoid; i++) {
-    //find address object 
-    objheader_t *header, *newAddr;
-    int size;
-    unsigned int oid;
-    if(oidType == 'R') {
-      oid = *((unsigned int *)(tdata->buffer->objread + (sizeof(unsigned int) + sizeof(unsigned short))*i)); 
-    } else {
-      oid = tdata->buffer->oidmod[i];
-    }
-    pthread_mutex_lock(&prefetchcache_mutex);
-    header = (objheader_t *) chashSearch(tdata->rec->lookupTable, oid);
-    //copy object into prefetch cache
-    GETSIZE(size, header);
-    if ((newAddr = objstrAlloc(prefetchcache, (size + sizeof(objheader_t)))) == NULL) {
-      printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
-      return -1;
-    }
-    pthread_mutex_unlock(&prefetchcache_mutex);
-    memcpy(newAddr, header, (size + sizeof(objheader_t)));
-    //make an entry in prefetch hash table
-    void *oldptr;
-    if((oldptr = prehashSearch(oid)) != NULL) {
-      prehashRemove(oid);
-      prehashInsert(oid, newAddr);
-    } else {
-      prehashInsert(oid, newAddr);
-    }
-  }
-  return 0;
-}
-
-
 /* This function sends the final response to remote machines per
  * thread in their respective socket id It returns a char that is only
  * needed to check the correctness of execution of this function
@@ -942,7 +957,7 @@ void *handleLocalReq(void *threadarg) {
   unsigned short version;
   void *mobj;
   objheader_t *headptr;
-  
+
   localtdata = (local_thread_data_array_t *) threadarg;
   
   /* Counters and arrays to formulate decision on control message to be sent */
@@ -961,8 +976,8 @@ void *handleLocalReq(void *threadarg) {
       int tmpsize;
       headptr = (objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i-numread]);
       if (headptr == NULL) {
-       printf("Error: handleLocalReq() returning NULL %s, %d\n", __FILE__, __LINE__);
-       return NULL;
+        printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__);
+        return NULL;
       }
       oid = OID(headptr);
       version = headptr->version;
@@ -983,6 +998,20 @@ void *handleLocalReq(void *threadarg) {
           v_nomatch++;
           /* Send TRANS_DISAGREE to Coordinator */
           localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+#ifdef CHECKTA
+      printf("Trans disagree for oid = %x: ", OID(mobj));
+      char a[] = "object type";
+      TABORT8(__func__, a, TYPE(mobj));
+#endif
+
+#ifdef CHECKTA
+  //char a[] = "mid";
+  //char b[] = "version mismatch";
+  //char c[] = "object type";
+  //char d[] = "oid";
+  //TABORT9(__func__, b, a, c, d, localtdata->tdata->mid, TYPE(mobj), OID(mobj));
+#endif
+          break;
         }
       } else {
        //we're locked
@@ -995,6 +1024,19 @@ void *handleLocalReq(void *threadarg) {
           v_nomatch++;
           /* Send TRANS_DISAGREE to Coordinator */
           localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+#ifdef CHECKTA
+      printf("Trans disagree for oid = %x: ", OID(mobj));
+      char a[] = "object type";
+      TABORT8(__func__, a, TYPE(mobj));
+#endif
+#ifdef CHECKTA
+  //char a[] = "mid";
+  //char b[] = "version mismatch";
+  //char c[] = "object type";
+  //char d[] = "oid";
+  //TABORT9(__func__, b, a, c, d, localtdata->tdata->mid, TYPE(mobj), OID(mobj));
+#endif
+          break;
         }
       }
     }
@@ -1005,6 +1047,13 @@ void *handleLocalReq(void *threadarg) {
   }
   /* Condition to send TRANS_SOFT_ABORT */
   if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) {
+#ifdef CHECKTA
+  //char a[] = "mid";
+  //char b[] = "version mismatch";
+  //char c[] = "object type";
+  //TABORT7(__func__, b, a, c, localtdata->tdata->mid, TYPE(mobj));
+  printf("%s() Soft abort\n", __func__);
+#endif
     localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT;
   }
   
@@ -1031,11 +1080,21 @@ void *handleLocalReq(void *threadarg) {
   if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
     if(transAbortProcess(localtdata) != 0) {
       printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
+      fflush(stdout);
       pthread_exit(NULL);
     }
   } else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT) {
+    /* Invalidate objects in other machine cache */
+    if(localtdata->tdata->buffer->f.nummod > 0) {
+      int retval;
+      if((retval = invalidateObj(localtdata->tdata)) != 0) {
+        printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
+        return;
+      }
+    }
     if(transComProcess(localtdata) != 0) {
       printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
+      fflush(stdout);
       pthread_exit(NULL);
     }
   }
@@ -1046,7 +1105,6 @@ void *handleLocalReq(void *threadarg) {
   if (localtdata->transinfo->objnotfound != NULL) {
     free(localtdata->transinfo->objnotfound);
   }
-  
   pthread_exit(NULL);
 }
 
@@ -1070,7 +1128,7 @@ int transAbortProcess(local_thread_data_array_t  *localtdata) {
   return 0;
 }
 
-/*This function completes the COMMIT process is the transaction is commiting*/
+/*This function completes the COMMIT process if the transaction is commiting*/
 int transComProcess(local_thread_data_array_t  *localtdata) {
   objheader_t *header, *tcptr;
   int i, nummod, tmpsize, numcreated, numlocked;
@@ -1083,7 +1141,7 @@ int transComProcess(local_thread_data_array_t  *localtdata) {
   oidcreated = localtdata->tdata->buffer->oidcreated;
   numlocked = localtdata->transinfo->numlocked;
   oidlocked = localtdata->transinfo->objlocked;
-  
+
   for (i = 0; i < nummod; i++) {
     if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
       printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
@@ -1134,11 +1192,13 @@ int transComProcess(local_thread_data_array_t  *localtdata) {
 }
 
 prefetchpile_t *foundLocal(char *ptr) {
+  int siteid = *(GET_SITEID(ptr));
   int ntuples = *(GET_NTUPLES(ptr));
   unsigned int * oidarray = GET_PTR_OID(ptr);
   unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples); 
   short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
   prefetchpile_t * head=NULL;
+  int numLocal = 0;
   
   int i;
   for(i=0;i<ntuples; i++) {
@@ -1158,14 +1218,19 @@ prefetchpile_t *foundLocal(char *ptr) {
        goto tuple;
     }
     //Entire prefetch is local
-    if (newbase==endindex&&checkoid(oid))
+    if (newbase==endindex&&checkoid(oid)){
+      numLocal++;
       goto tuple;
+    }
     //Add to remote requests
     machinenum=lhashSearch(oid);
     insertPile(machinenum, oid, endindex-newbase, &arryfields[newbase], &head);
   tuple:
     ;
   }
+
+  /* handle dynamic prefetching */
+  handleDynPrefetching(numLocal, ntuples, siteid);
   return head;
 }
 
@@ -1320,10 +1385,9 @@ int getPrefetchResponse(int sd) {
   control = *((char *) recvbuffer);
   if(control == OBJECT_FOUND) {
     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
-    //printf("oid %d found\n",oid);
     size = size - (sizeof(char) + sizeof(unsigned int));
     pthread_mutex_lock(&prefetchcache_mutex);
-    if ((modptr = objstrAlloc(prefetchcache, size)) == NULL) {
+    if ((modptr = prefetchobjstrAlloc(size)) == NULL) {
       printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
       pthread_mutex_unlock(&prefetchcache_mutex);
       return -1;
@@ -1353,7 +1417,7 @@ int getPrefetchResponse(int sd) {
     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
     /* TODO: For each object not found query DHT for new location and retrieve the object */
     /* Throw an error */
-    printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
+    //printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
     //    exit(-1);
   } else {
     printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
@@ -1366,18 +1430,39 @@ unsigned short getObjType(unsigned int oid) {
   objheader_t *objheader;
   unsigned short numoffset[] ={0};
   short fieldoffset[] ={};
-  
+
   if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL) {
-      if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
-       prefetch(1, &oid, numoffset, fieldoffset);
-       pthread_mutex_lock(&pflookup.lock);
-       while ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
-         pthread_cond_wait(&pflookup.cond, &pflookup.lock);
-       }
-       pthread_mutex_unlock(&pflookup.lock);
+    if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
+      unsigned int mid = lhashSearch(oid);
+      int sd = getSock2(transReadSockPool, mid);
+      char remotereadrequest[sizeof(char)+sizeof(unsigned int)];
+      remotereadrequest[0] = READ_REQUEST;
+      *((unsigned int *)(&remotereadrequest[1])) = oid;
+      send_data(sd, remotereadrequest, sizeof(remotereadrequest));
+
+      /* Read response from the Participant */
+      char control;
+      recv_data(sd, &control, sizeof(char));
+
+      if (control==OBJECT_NOT_FOUND) {
+        printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
+        fflush(stdout);
+        exit(-1);
+      } else {
+        /* Read object if found into local cache */
+        int size;
+        recv_data(sd, &size, sizeof(int));
+        pthread_mutex_lock(&prefetchcache_mutex);
+        if ((objheader = prefetchobjstrAlloc(size)) == NULL) {
+          printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+          pthread_exit(NULL);
+        }
+        pthread_mutex_unlock(&prefetchcache_mutex);
+        recv_data(sd, objheader, size);
+        prehashInsert(oid, objheader);
       }
+    }
   }
-  
   return TYPE(objheader);
 }
 
@@ -1653,7 +1738,7 @@ void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
                        return;
                } else {
                        if(version <= ndata->versionarry[index]){
-                               printf("threadNotify(): New version %d has not changed since last version %s, %d\n", version, __FILE__, __LINE__);
+                               printf("threadNotify(): New version %d has not changed since last version for oid = %d, %s, %d\n", version, oid, __FILE__, __LINE__);
                                return;
                        } else {
                                /* Clear from prefetch cache and free thread related data structure */
@@ -1691,8 +1776,8 @@ int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
     if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
       printf("notifyAll():error %d connecting to %s:%d\n", errno,
             inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
-      status = -1;
       fflush(stdout);
+      status = -1;
     } else {
       bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
       msg[0] = THREAD_NOTIFY_RESPONSE;
@@ -1719,3 +1804,70 @@ void transAbort(transrecord_t *trans) {
   chashDelete(trans->lookupTable);
   free(trans);
 }
+
+/* This function inserts necessary information into 
+ * a machine pile data structure */
+plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) {
+  plistnode_t *ptr, *tmp;
+  int found = 0, offset = 0;
+  
+  tmp = pile;
+  //Add oid into a machine that is already present in the pile linked list structure
+  while(tmp != NULL) {
+    if (tmp->mid == mid) {
+      int tmpsize;
+      
+      if (STATUS(headeraddr) & NEW) {
+       tmp->oidcreated[tmp->numcreated] = OID(headeraddr);
+       tmp->numcreated++;
+       GETSIZE(tmpsize, headeraddr);
+       tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
+      }else if (STATUS(headeraddr) & DIRTY) {
+       tmp->oidmod[tmp->nummod] = OID(headeraddr);
+       tmp->nummod++;
+       GETSIZE(tmpsize, headeraddr);
+       tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
+      } else {
+       offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
+       *((unsigned int *)(((char *)tmp->objread) + offset))=OID(headeraddr);
+       offset += sizeof(unsigned int);
+       *((short *)(((char *)tmp->objread) + offset)) = headeraddr->version;
+       tmp->numread ++;
+      }
+      found = 1;
+      break;
+    }
+    tmp = tmp->next;
+  }
+  //Add oid for any new machine 
+  if (!found) {
+    int tmpsize;
+    if((ptr = pCreate(num_objs)) == NULL) {
+      return NULL;
+    }
+    ptr->mid = mid;
+    if (STATUS(headeraddr) & NEW) {
+      ptr->oidcreated[ptr->numcreated] = OID(headeraddr);
+      ptr->numcreated ++;
+      GETSIZE(tmpsize, headeraddr);
+      ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
+    } else if (STATUS(headeraddr) & DIRTY) {
+      ptr->oidmod[ptr->nummod] = OID(headeraddr);
+      ptr->nummod ++;
+      GETSIZE(tmpsize, headeraddr);
+      ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
+    } else {
+      *((unsigned int *)ptr->objread)=OID(headeraddr);
+      offset = sizeof(unsigned int);
+      *((short *)(((char *)ptr->objread) + offset)) = headeraddr->version;
+      ptr->numread ++;
+    }
+    ptr->next = pile;
+    pile = ptr;
+  }
+  
+  /* Clear Flags */
+  STATUS(headeraddr) =0;
+  
+  return pile;
+}