remove the system.clearPrefetchCache call
authoradash <adash>
Fri, 1 Aug 2008 17:48:33 +0000 (17:48 +0000)
committeradash <adash>
Fri, 1 Aug 2008 17:48:33 +0000 (17:48 +0000)
bug fix for openning large number of sockets (use socket pool and recycle sockets)
bug fix for updating prefetch cache using version increment for modified objects
fix compile errors by moving #include for plookup.h

15 files changed:
Robust/src/Benchmarks/Prefetch/Em3d/dsm/Em3dNold.java
Robust/src/Benchmarks/Prefetch/SOR/dsm/JGFSORBench.java
Robust/src/Benchmarks/Prefetch/SOR/dsm/JGFSORBenchSizeA.java
Robust/src/Benchmarks/Prefetch/SOR/dsm/SORRunner.java
Robust/src/IR/Flat/BuildCode.java
Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.c
Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.h
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/gCollect.c
Robust/src/Runtime/DSTM/interface/plookup.c
Robust/src/Runtime/DSTM/interface/plookup.h
Robust/src/Runtime/DSTM/interface/signal.c
Robust/src/Runtime/DSTM/interface/sockpool.c
Robust/src/Runtime/DSTM/interface/sockpool.h
Robust/src/Runtime/DSTM/interface/trans.c

index daf334efc863110c1bfac67124a83b79f0202ab8..d2769e7ee25a2723b13e93c80bd04c7590c9e00f 100644 (file)
@@ -37,7 +37,7 @@ public class Em3d extends Thread
    **/
   private boolean printMsgs;
 
-    int numThreads;
+  int numThreads;
 
   BiGraph bg;
   int upperlimit;
@@ -82,7 +82,6 @@ public class Em3d extends Thread
        }
        
        Barrier.enterBarrier(barr);
-       System.clearPrefetchCache();
 
        /* for  hNodes */
        atomic {
@@ -94,7 +93,6 @@ public class Em3d extends Thread
            }
        }
        Barrier.enterBarrier(barr);
-       System.clearPrefetchCache();
     }
   }
 
@@ -112,11 +110,11 @@ public class Em3d extends Thread
     long start0 = System.currentTimeMillis();
     int numThreads = em.numThreads;
     int[] mid = new int[4];
-    mid[0] = (128<<24)|(195<<16)|(175<<8)|69;
-    mid[1] = (128<<24)|(195<<16)|(175<<8)|80;
-    mid[2] = (128<<24)|(195<<16)|(175<<8)|73;
-    mid[3] = (128<<24)|(195<<16)|(175<<8)|78;
-    System.printString("DEBUG -> numThreads = " + numThreads+"\n");
+    mid[0] = (128<<24)|(195<<16)|(175<<8)|79;
+    mid[1] = (128<<24)|(195<<16)|(175<<8)|73;
+    mid[2] = (128<<24)|(195<<16)|(175<<8)|78;
+    mid[3] = (128<<24)|(195<<16)|(175<<8)|69;
+    //System.printString("DEBUG -> numThreads = " + numThreads+"\n");
     Barrier mybarr;
     BiGraph graph;
     Random rand = new Random(783);
index 273155a7662f94a40f1b69bb6bf132b400623d7d..11741f1dddec1db4466c24d3caaca11fbe11c016 100644 (file)
@@ -93,9 +93,9 @@ public class JGFSORBench {
     SORRunner tmp;
     int[] mid = new int[4];
     mid[0] = (128<<24)|(195<<16)|(175<<8)|79;
-    mid[1] = (128<<24)|(195<<16)|(175<<8)|80;
-    mid[2] = (128<<24)|(195<<16)|(175<<8)|73;
-    mid[3] = (128<<24)|(195<<16)|(175<<8)|78;
+    mid[1] = (128<<24)|(195<<16)|(175<<8)|73;
+    mid[2] = (128<<24)|(195<<16)|(175<<8)|78;
+    mid[3] = (128<<24)|(195<<16)|(175<<8)|69;
     for(int i=1;i<numthreads;i++) {
       atomic {
         thobjects[i] =  global new SORRunner(i,omega,G,num_iterations,sor.sync,numthreads);
index 23f272240cae5f3c85030fa57ba18042de244dc7..ed50e2a22947aeb2108d07d1a09a0990c6005822 100644 (file)
@@ -62,5 +62,6 @@ public class JGFSORBenchSizeA{
 
     JGFInstrumentor.addOpsToTimer("Section2:SOR:Kernel", (double) jacobi, instr.timers);
     JGFInstrumentor.printTimer("Section2:SOR:Kernel", instr.timers); 
+    System.printString("Finished\n");
   }
 }
index 611d5a857c97a0a4ded6bef265ad6599813ed994..9a41faecacdd710d1dfa345aa318ead642898f96 100644 (file)
@@ -126,8 +126,6 @@ class SORRunner extends Thread {
             done=false;
         }
       }
-
-      System.clearPrefetchCache();
     }//end of for
   } //end of run()
 }
index b6679ed2b00858e89b8a11f2c07246ebd458fdb1..50795313ed29bd34784c5678e88af71b796b6a48 100644 (file)
@@ -316,7 +316,7 @@ public class BuildCode {
        outmethod.println("#include \"virtualtable.h\"");
        outmethod.println("#include \"runtime.h\"");
        if (state.DSM) {
-           outmethod.println("#include \"dstm.h\"");
+           outmethod.println("#include \"addPrefetchEnhance.h\"");
            outmethod.println("#include \"localobjects.h\"");
        }
        if(state.MULTICORE) {
index ef7d7aa4c5f8a7aab2ce36db3151fbef978785d3..a39bc897727e2d4727b2cabdf0fdb48d41be38eb 100644 (file)
@@ -1,10 +1,14 @@
-#include "dstm.h"
 #include "addPrefetchEnhance.h"
+#include "prelookup.h"
 
-extern int numprefetchsites;
-//pfcstats_t evalPrefetch[numprefetchsites]; //Global array for all prefetch sites in the executable
-extern pfcstats_t *evalPrefetch;
+extern int numprefetchsites; // Number of prefetch sites
+extern pfcstats_t *evalPrefetch; //Global array that keeps track of operation mode (ON/OFF) for each prefetch site
+extern objstr_t *prefetchcache; //Global Prefetch cache
+extern pthread_mutex_t prefetchcache_mutex; //Mutex to lock Prefetch Cache
+extern unsigned int myIpAddr;
 
+/* This function creates and initializes the 
+ * evalPrefetch global array */
 pfcstats_t *initPrefetchStats() {
   pfcstats_t *ptr;
   if((ptr = calloc(numprefetchsites, sizeof(pfcstats_t))) == NULL) {
@@ -34,6 +38,10 @@ char getOperationMode(int siteid) {
   return evalPrefetch[siteid].operMode;
 }
 
+/* This function updates counters and mode of operation of a 
+ * prefetch site during runtime. When the prefetch call at a site
+ * generates oids that are found/not found in the prefetch cache,
+ * we take action accordingly */
 void handleDynPrefetching(int numLocal, int ntuples, int siteid) {
   if(numLocal < ntuples) {
     /* prefetch not found locally(miss in cache) */
@@ -47,3 +55,93 @@ void handleDynPrefetching(int numLocal, int ntuples, int siteid) {
     }
   }
 }
+
+/* This function clears from prefetch cache those 
+ * entries that caused a transaction abort */
+void cleanPCache(thread_data_array_t *tdata) {
+  transrecord_t *rec = tdata->rec;
+  unsigned int size = rec->lookupTable->size;
+  chashlistnode_t *ptr = rec->lookupTable->table;
+  int i;
+  for(i = 0; i < size; i++) {
+    chashlistnode_t *curr = &ptr[i]; //for each entry in the cache lookupTable
+    while(curr != NULL) {
+      if(curr->key == 0) 
+        break;
+      objheader_t *header1, *header2;
+      if((header1 = mhashSearch(curr->key)) == NULL && ((header2 = prehashSearch(curr->key)) != NULL)) {
+        /* Not found in local machine's object store and found in prefetch cache */
+        /* Remove from prefetch cache */
+        prehashRemove(curr->key);
+      }
+      curr = curr->next;
+    }
+  }
+}
+
+/* This function updates the prefetch cache with
+ * entires from the transaction cache when a 
+ * transaction commits 
+ * Return -1 on error else returns 0 */ 
+int updatePrefetchCache(thread_data_array_t* tdata) {
+  plistnode_t *pile = tdata->pilehead;
+  while(pile != NULL) {
+    if(pile->mid != myIpAddr) { //Not local machine
+      int retval;
+      char oidType;
+      oidType = 'R';
+      if((retval = copyToCache(pile->numread, (unsigned int *)(pile->objread), tdata, oidType)) != 0) {
+        printf("%s(): Error in copying objects read at %s, %d\n", __func__, __FILE__, __LINE__);
+        return -1;
+      }
+      oidType = 'M';
+      if((retval = copyToCache(pile->nummod, pile->oidmod, tdata, oidType)) != 0) {
+        printf("%s(): Error in copying objects read at %s, %d\n", __func__, __FILE__, __LINE__);
+        return -1;
+      }
+    }
+    pile = pile->next;
+  }
+  return 0;
+}
+
+int copyToCache(int numoid, unsigned int *oidarray, thread_data_array_t *tdata, char oidType) { 
+  int i;
+  for (i = 0; i < numoid; i++) {
+    unsigned int oid;
+    if(oidType == 'R') {
+      char * objread = (char *) oidarray;
+      oid = *((unsigned int *)(objread+(sizeof(unsigned int)+
+              sizeof(unsigned short))*i));
+    } else {
+      oid = oidarray[i];
+    }
+    pthread_mutex_lock(&prefetchcache_mutex);
+    objheader_t *header = (objheader_t *) chashSearch(tdata->rec->lookupTable, oid); 
+    //copy into prefetch cache
+    int size;
+    GETSIZE(size, header);
+    objheader_t * newAddr;
+    if((newAddr = prefetchobjstrAlloc(size + sizeof(objheader_t))) == NULL) {
+      printf("%s(): Error in getting memory from prefetch cache at %s, %d\n", __func__, 
+          __FILE__, __LINE__);
+      pthread_mutex_unlock(&prefetchcache_mutex);
+      return -1;
+    }
+    pthread_mutex_unlock(&prefetchcache_mutex);
+    memcpy(newAddr, header, size+sizeof(objheader_t));
+    //Increment version for every modified object
+    if(oidType == 'M') {
+      newAddr->version += 1;
+    }
+    //make an entry in prefetch lookup hashtable
+    void *oldptr;
+    if((oldptr = prehashSearch(oid)) != NULL) {
+      prehashRemove(oid);
+      prehashInsert(oid, newAddr);
+    } else {
+      prehashInsert(oid, newAddr);
+    }
+  } //end of for
+  return 0;
+}
index e33e37661aa549bc079c578b631159fa9009f0ec..a335c0ac52f2cc040ed8a4ee00f9fab8470d35e2 100644 (file)
@@ -1,6 +1,10 @@
 #ifndef _ADDPREFETCHENHANCE_H_
 #define _ADDPREFETCHENHANCE_H_
 
+#include "dstm.h"
+#include "mlookup.h"
+#include "gCollect.h"
+
 typedef struct prefetchCountStats {
   int retrycount;    /* keeps track of when to retry and check if we can turn on this prefetch site */ 
   int uselesscount; /* keeps track of how long was the prefetching at site useles */ 
@@ -13,5 +17,8 @@ int getRetryCount(int siteid);
 int getUselessCount(int siteid);
 char getOperationMode(int);
 void handleDynPrefetching(int, int, int);
+void cleanPCache(thread_data_array_t *tdata);
+int updatePrefetchCache(thread_data_array_t *);
+int copyToCache(int , unsigned int *, thread_data_array_t *, char );
 
 #endif
index e2ad79cf66f61a94f155bdd7a3c5d1c73cea9d62..11927ab1f8d7028f62cc8e739e21a58286572562 100644 (file)
@@ -58,8 +58,8 @@
 #define LISTEN_PORT 2156
 #define UDP_PORT 2158
 //Prefetch tuning paramters
-#define RETRYINTERVAL  100  //N
-#define SHUTDOWNINTERVAL  1  //M
+#define RETRYINTERVAL  7 //N
+#define SHUTDOWNINTERVAL  4  //M
 
 #include <stdlib.h>
 #include <stdio.h>
@@ -81,9 +81,8 @@
 #include <errno.h>
 #include <time.h>
 #include "sockpool.h"
-#include "prelookup.h"
 #include <signal.h>
-#include "addPrefetchEnhance.h"
+#include "plookup.h"
 
 //bit designations for status field of objheader
 #define DIRTY 0x01
@@ -209,7 +208,7 @@ typedef struct trans_commit_data{
 typedef struct thread_data_array {
   int thread_id;       
   int mid;    
-  trans_req_data_t *buffer;    /* Holds trans request information sent to participants */  
+  trans_req_data_t *buffer;    /* Holds trans request information sent to a participant, based on threadid */  
   thread_response_t *recvmsg;  /* Shared datastructure to keep track of the participants response to a trans request */
   pthread_cond_t *threshold;    /* Condition var to waking up a thread */
   pthread_mutex_t *lock;       /* Lock for counting participants response */
@@ -217,6 +216,7 @@ typedef struct thread_data_array {
   char *replyctrl;             /* Shared ctrl message that stores the reply to be sent to participants, filled by decideResponse() */
   char *replyretry;            /* Shared variable that keep track if coordinator needs retry */
   transrecord_t *rec;          /* To send modified objects */
+  plistnode_t *pilehead;   /*  Shared variable, ptr to the head of the machine piles for the transaction rec */
 } thread_data_array_t;
 
 
@@ -296,9 +296,7 @@ void sendPrefetchReqnew(prefetchpile_t*, int);
 int getPrefetchResponse(int);
 unsigned short getObjType(unsigned int oid);
 int startRemoteThread(unsigned int oid, unsigned int mid);
-int updatePrefetchCache(thread_data_array_t *, int, char);
-
-
+plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs);
 
 /* Sends notification request for thread join, if sucessful returns 0 else returns -1 */
 int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid);
index 3d1371b3afcbf917043269ee7e9b2f3d70f8ab19..fed70e2d2013162ab861713e38ef93094611f22f 100644 (file)
@@ -1,4 +1,5 @@
 #include "gCollect.h"
+#include "prelookup.h"
 
 extern objstr_t *prefetchcache; //Global Prefetch cache
 extern pthread_mutex_t prefetchcache_mutex; //Mutex to lock Prefetch Cache
index aafebf0a306fbf921a4bebb4a6476f8db40fe418..683f11d474abcfcc8816307893516878dd63a4d3 100644 (file)
@@ -40,73 +40,6 @@ plistnode_t *pCreate(int objects) {
        return pile;
 }
 
-/* This function inserts necessary information into 
- * a machine pile data structure */
-plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) {
-  plistnode_t *ptr, *tmp;
-  int found = 0, offset = 0;
-  
-  tmp = pile;
-  //Add oid into a machine that is already present in the pile linked list structure
-  while(tmp != NULL) {
-    if (tmp->mid == mid) {
-      int tmpsize;
-      
-      if (STATUS(headeraddr) & NEW) {
-       tmp->oidcreated[tmp->numcreated] = OID(headeraddr);
-       tmp->numcreated++;
-       GETSIZE(tmpsize, headeraddr);
-       tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
-      }else if (STATUS(headeraddr) & DIRTY) {
-       tmp->oidmod[tmp->nummod] = OID(headeraddr);
-       tmp->nummod++;
-       GETSIZE(tmpsize, headeraddr);
-       tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
-      } else {
-       offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
-       *((unsigned int *)(((char *)tmp->objread) + offset))=OID(headeraddr);
-       offset += sizeof(unsigned int);
-       *((short *)(((char *)tmp->objread) + offset)) = headeraddr->version;
-       tmp->numread ++;
-      }
-      found = 1;
-      break;
-    }
-    tmp = tmp->next;
-  }
-  //Add oid for any new machine 
-  if (!found) {
-    int tmpsize;
-    if((ptr = pCreate(num_objs)) == NULL) {
-      return NULL;
-    }
-    ptr->mid = mid;
-    if (STATUS(headeraddr) & NEW) {
-      ptr->oidcreated[ptr->numcreated] = OID(headeraddr);
-      ptr->numcreated ++;
-      GETSIZE(tmpsize, headeraddr);
-      ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
-    } else if (STATUS(headeraddr) & DIRTY) {
-      ptr->oidmod[ptr->nummod] = OID(headeraddr);
-      ptr->nummod ++;
-      GETSIZE(tmpsize, headeraddr);
-      ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
-    } else {
-      *((unsigned int *)ptr->objread)=OID(headeraddr);
-      offset = sizeof(unsigned int);
-      *((short *)(((char *)ptr->objread) + offset)) = headeraddr->version;
-      ptr->numread ++;
-    }
-    ptr->next = pile;
-    pile = ptr;
-  }
-  
-  /* Clear Flags */
-  STATUS(headeraddr) =0;
-  
-  return pile;
-}
-
 //Count the number of machine piles
 int pCount(plistnode_t *pile) {
        plistnode_t *tmp;
index fdcfc535052b3fc86091f208894b0428b2ea28d7..4d15b4a761aa6efdb7dd2e6838b6d4e6cc5fdefd 100644 (file)
@@ -3,7 +3,6 @@
 
 #include <stdlib.h>
 #include <stdio.h>
-#include "dstm.h"
 
 /* This structure is created using a transaction record.
  * It is filled out with pile information necessary for 
@@ -21,7 +20,6 @@ typedef struct plistnode {
 } plistnode_t;
 
 plistnode_t  *pCreate(int);
-plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs);
 int pCount(plistnode_t *pile);
 int pListMid(plistnode_t *pile, unsigned int *list);
 void pDelete(plistnode_t *pile);
index 062e8a0f1d2e922c4c0ebb47230e7ba49221ec0d..dae52b8d55fbd1521b9fcb783990bab61d8545be 100644 (file)
@@ -1,4 +1,5 @@
 #include "dstm.h"
+#include "addPrefetchEnhance.h"
 #include <signal.h>
 
 extern int numTransAbort;
index 2cb2a2412039c5068f62c83e4b11cae6cc79ff1f..27d2e953f42f163acaaaaf28d9844f3245dd8b23 100644 (file)
@@ -101,6 +101,8 @@ int getSockWithLock(sockPoolHashTable_t *sockhash, unsigned int mid) {
   UnLock(&sockhash->mylock);
   if((sd = createNewSocket(mid)) != -1) {
     socknode_t *inusenode = calloc(1, sizeof(socknode_t));
+    inusenode->sd = sd;
+    inusenode->mid = mid;
     insToListWithLock(sockhash, inusenode);
     return sd;
   } else {
@@ -159,6 +161,33 @@ int getSock2(sockPoolHashTable_t *sockhash, unsigned int mid) {
   }
 }
 
+/*socket pool with multiple TR threads asking to connect to same machine  */
+int getSock2WithLock(sockPoolHashTable_t *sockhash, unsigned int mid) {
+  socknode_t **ptr;
+  int key = mid%(sockhash->size);
+  int sd;
+  
+  Lock(&sockhash->mylock);
+  ptr=&(sockhash->table[key]);
+  while(*ptr!=NULL) {
+    if (mid == (*ptr)->mid) {
+      UnLock(&sockhash->mylock);
+      return (*ptr)->sd;
+    }
+    ptr=&((*ptr)->next);
+  }
+  UnLock(&sockhash->mylock);
+  if((sd = createNewSocket(mid)) != -1) {
+    *ptr=calloc(1, sizeof(socknode_t));
+    (*ptr)->mid=mid;
+    (*ptr)->sd=sd;
+    //insToListWithLock(sockhash, *ptr);
+    return sd;
+  } else {
+    return -1;
+  }
+}
+
 void insToListWithLock(sockPoolHashTable_t *sockhash, socknode_t *inusenode) {
     Lock(&sockhash->mylock);
     inusenode->next = sockhash->inuse;
index be392e47ce4b534d37f614e6729848b44e828209..212c426a9ed6489a3d8bd38aca9af5fdd3620cb3 100644 (file)
@@ -23,6 +23,7 @@ typedef struct sockPoolHashTable {
 sockPoolHashTable_t *createSockPool(sockPoolHashTable_t *, unsigned int);
 int getSock(sockPoolHashTable_t *, unsigned int);
 int getSock2(sockPoolHashTable_t *, unsigned int);
+int getSock2WithLock(sockPoolHashTable_t *h, unsigned int); 
 int getSockWithLock(sockPoolHashTable_t *, unsigned int);
 void freeSock(sockPoolHashTable_t *, unsigned int, int);
 void freeSockWithLock(sockPoolHashTable_t *, unsigned int, int);
index d706678bffb2e2fa61dd8e55168e80187cc79d02..087c208af5d25c1051b9ba4ddea222a6b4c88ac6 100644 (file)
@@ -9,6 +9,7 @@
 #include "threadnotify.h"
 #include "queue.h"
 #include "addUdpEnhance.h"
+#include "addPrefetchEnhance.h"
 #include "gCollect.h"
 #ifdef COMPILER
 #include "thread.h"
@@ -40,6 +41,7 @@ unsigned int oidMax;
 
 sockPoolHashTable_t *transReadSockPool;
 sockPoolHashTable_t *transPrefetchSockPool;
+sockPoolHashTable_t *transRequestSockPool;
 pthread_mutex_t notifymutex;
 pthread_mutex_t atomicObjLock;
 
@@ -180,6 +182,7 @@ int dstmStartup(const char * option) {
   //Initialize socket pool
   transReadSockPool = createSockPool(transReadSockPool, 2*numHostsInSystem+1);
   transPrefetchSockPool = createSockPool(transPrefetchSockPool, 2*numHostsInSystem+1);
+  transRequestSockPool = createSockPool(transRequestSockPool, 2*numHostsInSystem+1);
   
   dstmInit();
   transInit();
@@ -483,7 +486,6 @@ int transCommit(transrecord_t *record) {
     pthread_mutex_t tlshrd;
     
     thread_data_array = (thread_data_array_t *) calloc(pilecount, sizeof(thread_data_array_t));
-    
     ltdata = calloc(1, sizeof(local_thread_data_array_t));
     
     thread_response_t rcvd_control_msg[pilecount];     /* Shared thread array that keeps track of responses of participants */
@@ -520,6 +522,7 @@ int transCommit(transrecord_t *record) {
       thread_data_array[threadnum].replyctrl = &treplyctrl;
       thread_data_array[threadnum].replyretry = &treplyretry;
       thread_data_array[threadnum].rec = record;
+      thread_data_array[threadnum].pilehead = pile_ptr;
       /* If local do not create any extra connection */
       if(pile->mid != myIpAddr) { /* Not local */
        do {
@@ -639,25 +642,11 @@ void *transRequest(void *threadarg) {
   objheader_t *headeraddr;
   char control, recvcontrol;
   char machineip[16], retval;
-  
+
   tdata = (thread_data_array_t *) threadarg;
-  
-  /* Send Trans Request */
-  if ((sd = socket(AF_INET, SOCK_STREAM, 0)) <= 0) {
-    printf("transRequest():error %d\n", errno);
-    perror("transRequest() socket error");
-    pthread_exit(NULL);
-  }
-  bzero((char*) &serv_addr, sizeof(serv_addr));
-  serv_addr.sin_family = AF_INET;
-  serv_addr.sin_port = htons(LISTEN_PORT);
-  serv_addr.sin_addr.s_addr = htonl(tdata->mid);
-
-  /* Open Connection */
-  if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
-    printf("transRequest():error %d, sd= %d\n", errno, sd);
-    perror("transRequest() connect");
-    close(sd);
+
+  if((sd = getSock2WithLock(transRequestSockPool, tdata->mid)) < 0) {
+    printf("transRequest(): socket create error\n");
     pthread_exit(NULL);
   }
   
@@ -695,7 +684,6 @@ void *transRequest(void *threadarg) {
     pthread_mutex_lock(&prefetchcache_mutex);
     if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) {
       printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
-      close(sd);
       pthread_exit(NULL);
     }
     pthread_mutex_unlock(&prefetchcache_mutex);
@@ -744,7 +732,6 @@ void *transRequest(void *threadarg) {
    * to all participants in their respective socket */
   if (sendResponse(tdata, sd) == 0) { 
     printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
-    close(sd);
     pthread_exit(NULL);
   }
   
@@ -757,9 +744,6 @@ void *transRequest(void *threadarg) {
   } else {
     //printf("DEBUG-> Error: Incorrect Transaction End Message %d\n", control);
   }
-  
-  /* Close connection */
-  close(sd);
   pthread_exit(NULL);
 }
 
@@ -796,30 +780,17 @@ void decideResponse(thread_data_array_t *tdata) {
     *(tdata->replyctrl) = TRANS_ABORT;
     *(tdata->replyretry) = 0;
     /* clear objects from prefetch cache */
-    for (i = 0; i < tdata->buffer->f.numread; i++) {
-      prehashRemove(*((unsigned int *)(((char *)tdata->buffer->objread) + (sizeof(unsigned int) + sizeof(unsigned short))*i)));
-    }
-    for (i = 0; i < tdata->buffer->f.nummod; i++) {
-      prehashRemove(tdata->buffer->oidmod[i]);
-    }
+    cleanPCache(tdata);
   } else if(transagree == tdata->buffer->f.mcount){
     /* Send Commit */
     *(tdata->replyctrl) = TRANS_COMMIT;
     *(tdata->replyretry) = 0;
-    /* update prefetch cache */
-    /* For objects read */
-    char oidType;
     int retval;
-    oidType = 'R'; 
-    if((retval = updatePrefetchCache(tdata, tdata->buffer->f.numread, oidType)) != 0) {
-      printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
-      return;
-    }
-    oidType = 'M'; 
-    if((retval = updatePrefetchCache(tdata, tdata->buffer->f.nummod, oidType)) != 0) {
+    if((retval = updatePrefetchCache(tdata)) != 0) {
       printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
       return;
     }
+
     /* Invalidate objects in other machine cache */
     if(tdata->buffer->f.nummod > 0) {
       if((retval = invalidateObj(tdata)) != 0) {
@@ -835,47 +806,6 @@ void decideResponse(thread_data_array_t *tdata) {
   return;
 }
 
-/* This function updates the prefetch cache when commiting objects 
- * based on the type of oid i.e. if oid is read or oid is modified
- * Return -1 on error else returns 0 
- */
-int updatePrefetchCache(thread_data_array_t* tdata, int numoid, char oidType) {
-  int i;
-  for (i = 0; i < numoid; i++) {
-    //find address object 
-    objheader_t *header, *newAddr;
-    int size;
-    unsigned int oid;
-    if(oidType == 'R') {
-      oid = *((unsigned int *)(tdata->buffer->objread + (sizeof(unsigned int) + sizeof(unsigned short))*i)); 
-    } else {
-      oid = tdata->buffer->oidmod[i];
-    }
-    pthread_mutex_lock(&prefetchcache_mutex);
-    header = (objheader_t *) chashSearch(tdata->rec->lookupTable, oid);
-    header->version += 1;
-    //copy object into prefetch cache
-    GETSIZE(size, header);
-    if ((newAddr = prefetchobjstrAlloc(size + sizeof(objheader_t))) == NULL) {
-      printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
-      pthread_mutex_unlock(&prefetchcache_mutex);
-      return -1;
-    }
-    pthread_mutex_unlock(&prefetchcache_mutex);
-    memcpy(newAddr, header, (size + sizeof(objheader_t)));
-    //make an entry in prefetch hash table
-    void *oldptr;
-    if((oldptr = prehashSearch(oid)) != NULL) {
-      prehashRemove(oid);
-      prehashInsert(oid, newAddr);
-    } else {
-      prehashInsert(oid, newAddr);
-    }
-  }
-  return 0;
-}
-
-
 /* This function sends the final response to remote machines per
  * thread in their respective socket id It returns a char that is only
  * needed to check the correctness of execution of this function
@@ -1741,3 +1671,70 @@ void transAbort(transrecord_t *trans) {
   chashDelete(trans->lookupTable);
   free(trans);
 }
+
+/* This function inserts necessary information into 
+ * a machine pile data structure */
+plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) {
+  plistnode_t *ptr, *tmp;
+  int found = 0, offset = 0;
+  
+  tmp = pile;
+  //Add oid into a machine that is already present in the pile linked list structure
+  while(tmp != NULL) {
+    if (tmp->mid == mid) {
+      int tmpsize;
+      
+      if (STATUS(headeraddr) & NEW) {
+       tmp->oidcreated[tmp->numcreated] = OID(headeraddr);
+       tmp->numcreated++;
+       GETSIZE(tmpsize, headeraddr);
+       tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
+      }else if (STATUS(headeraddr) & DIRTY) {
+       tmp->oidmod[tmp->nummod] = OID(headeraddr);
+       tmp->nummod++;
+       GETSIZE(tmpsize, headeraddr);
+       tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
+      } else {
+       offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
+       *((unsigned int *)(((char *)tmp->objread) + offset))=OID(headeraddr);
+       offset += sizeof(unsigned int);
+       *((short *)(((char *)tmp->objread) + offset)) = headeraddr->version;
+       tmp->numread ++;
+      }
+      found = 1;
+      break;
+    }
+    tmp = tmp->next;
+  }
+  //Add oid for any new machine 
+  if (!found) {
+    int tmpsize;
+    if((ptr = pCreate(num_objs)) == NULL) {
+      return NULL;
+    }
+    ptr->mid = mid;
+    if (STATUS(headeraddr) & NEW) {
+      ptr->oidcreated[ptr->numcreated] = OID(headeraddr);
+      ptr->numcreated ++;
+      GETSIZE(tmpsize, headeraddr);
+      ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
+    } else if (STATUS(headeraddr) & DIRTY) {
+      ptr->oidmod[ptr->nummod] = OID(headeraddr);
+      ptr->nummod ++;
+      GETSIZE(tmpsize, headeraddr);
+      ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
+    } else {
+      *((unsigned int *)ptr->objread)=OID(headeraddr);
+      offset = sizeof(unsigned int);
+      *((short *)(((char *)ptr->objread) + offset)) = headeraddr->version;
+      ptr->numread ++;
+    }
+    ptr->next = pile;
+    pile = ptr;
+  }
+  
+  /* Clear Flags */
+  STATUS(headeraddr) =0;
+  
+  return pile;
+}