added the new looup table for prefetch cache
authoradash <adash>
Thu, 18 Mar 2010 01:22:52 +0000 (01:22 +0000)
committeradash <adash>
Thu, 18 Mar 2010 01:22:52 +0000 (01:22 +0000)
updated gCollect as per the new prefetch cache
send trans disagree quickly and unlock the objects quickly
some bugs fixes that resulted from the dsm caching

Robust/src/Runtime/DSTM/interface_recovery/addPrefetchEnhance.c
Robust/src/Runtime/DSTM/interface_recovery/addPrefetchEnhance.h
Robust/src/Runtime/DSTM/interface_recovery/addUdpEnhance.c
Robust/src/Runtime/DSTM/interface_recovery/addUdpEnhance.h
Robust/src/Runtime/DSTM/interface_recovery/altprelookup.c [new file with mode: 0755]
Robust/src/Runtime/DSTM/interface_recovery/altprelookup.h [new file with mode: 0755]
Robust/src/Runtime/DSTM/interface_recovery/dstm.h
Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c
Robust/src/Runtime/DSTM/interface_recovery/gCollect.c
Robust/src/Runtime/DSTM/interface_recovery/prefetch.c
Robust/src/Runtime/DSTM/interface_recovery/trans.c

index f0436e0a76b2828c6ccc0389f3b0fa2319e211e6..ec000e3e5a085668a2c0be14f22cf741ea3f841b 100644 (file)
@@ -1,5 +1,5 @@
 #include "addPrefetchEnhance.h"
-#include "prelookup.h"
+#include "altprelookup.h"
 
 extern int numprefetchsites; // Number of prefetch sites
 extern pfcstats_t *evalPrefetch; //Global array that keeps track of operation mode (ON/OFF) for each prefetch site
@@ -44,10 +44,11 @@ char getOperationMode(int siteid) {
  * we take action accordingly */
 void handleDynPrefetching(int numLocal, int ntuples, int siteid) {
   if(numLocal < ntuples) {
-    /* prefetch not found locally(miss in cache) */
+    /* prefetch not found locally(miss in cache); turn on prefetching*/
     evalPrefetch[siteid].operMode = 1;
     evalPrefetch[siteid].uselesscount = SHUTDOWNINTERVAL;
   } else {
+    //Turn off prefetch site
     if(getOperationMode(siteid) != 0) {
       evalPrefetch[siteid].uselesscount--;
       if(evalPrefetch[siteid].uselesscount <= 0) {
@@ -107,15 +108,15 @@ void cleanPCache() {
 int updatePrefetchCache(trans_req_data_t *tdata) {
   int retval;
   char oidType;
-  /* TODO commit it for now because objects read
-   * are already copied to cache during remote reading */
-  //oidType = 'R';
-  //if(tdata->f.numread > 0) {
-  //  if((retval = copyToCache(tdata->f.numread, (unsigned int *)(tdata->objread), oidType)) != 0) {
-  //    printf("%s(): Error in copying objects read at %s, %d\n", __func__, __FILE__, __LINE__);
-  //    return -1;
-  //  }
-  //}
+  /*//TODO comment it for now because remote objects read are already in the prefetch cache
+  oidType = 'R';
+  if(tdata->f.numread > 0) {
+    if((retval = copyToCache(tdata->f.numread, (unsigned int *)(tdata->objread), oidType)) != 0) {
+      printf("%s(): Error in copying objects read at %s, %d\n", __func__, __FILE__, __LINE__);
+      return -1;
+    }
+  }
+  */
   if(tdata->f.nummod > 0) {
     oidType = 'M';
     if((retval = copyToCache(tdata->f.nummod, tdata->oidmod, oidType)) != 0) {
@@ -130,13 +131,13 @@ int copyToCache(int numoid, unsigned int *oidarray, char oidType) {
   int i;
   for (i = 0; i < numoid; i++) {
     unsigned int oid;
-    if(oidType == 'R') {
-      char * objread = (char *) oidarray;
-      oid = *((unsigned int *)(objread+(sizeof(unsigned int)+
-                                        sizeof(unsigned short))*i));
-    } else {
+    //if(oidType == 'R') {
+    //  char * objread = (char *) oidarray;
+    //  oid = *((unsigned int *)(objread+(sizeof(unsigned int)+
+    //                                    sizeof(unsigned short))*i));
+    //} else {
       oid = oidarray[i];
-    }
+    //}
     pthread_mutex_lock(&prefetchcache_mutex);
     objheader_t * header;
     if((header = (objheader_t *) t_chashSearch(oid)) == NULL) {
@@ -161,14 +162,10 @@ int copyToCache(int numoid, unsigned int *oidarray, char oidType) {
       newAddr->version += 1;
       newAddr->notifylist = NULL;
     }
+    STATUS(newAddr)=0;
+
     //make an entry in prefetch lookup hashtable
-    void *oldptr;
-    if((oldptr = prehashSearch(oid)) != NULL) {
-      prehashRemove(oid);
-      prehashInsert(oid, newAddr);
-    } else {
-      prehashInsert(oid, newAddr);
-    }
+    prehashInsert(oid, newAddr);
   } //end of for
   return 0;
 }
index 44c87049fedbb4e2e7f29bda53c84a1883b1d2fe..7eb3c5192247f90771a81b2d9810fb0cdd629be8 100644 (file)
@@ -2,7 +2,7 @@
 #define _ADDPREFETCHENHANCE_H_
 
 #include "dstm.h"
-#include "mlookup.h"
+#include "altmlookup.h"
 #include "gCollect.h"
 
 typedef struct prefetchCountStats {
index c7e3f9f4bc333b409d13c9e3029276c600aad770..ae49de7f1ea2c10a23cb1a026bf2ed6e537b28b4 100644 (file)
@@ -5,7 +5,7 @@
 #include <math.h>
 #include <netinet/tcp.h>
 #include "addUdpEnhance.h"
-#include "prelookup.h"
+#include "altprelookup.h"
 #ifdef ABORTREADERS
 #include "abortreaders.h"
 #endif
@@ -135,26 +135,6 @@ int invalidateObj(trans_req_data_t *tdata, int pilecount, char finalresponse, in
   return 0;
 }
 
-#if 0
-int invalidateObj(trans_req_data_t *tdata) {
-  struct sockaddr_in clientaddr;
-  int retval;
-
-  bzero(&clientaddr, sizeof(clientaddr));
-  clientaddr.sin_family = AF_INET;
-  clientaddr.sin_port = htons(UDP_PORT);
-  clientaddr.sin_addr.s_addr = INADDR_BROADCAST;
-  int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int);
-  /* send single udp msg */
-  if((retval = sendUdpMsg(tdata, pilecount, nummod, &clientaddr, finalresponse, socklist)) < 0) {
-    printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
-    return -1;
-  }
-  return 0;
-}
-
-#endif
-
 /* Function sends a udp broadcast, also distinguishes
  * msg size to be sent based on the total number of objects modified
  * returns -1 on error and 0 on success */
@@ -198,52 +178,6 @@ send:
   return 0;
 }
 
-#if 0
-
-/* Function sends a udp broadcast, also distinguishes
- * msg size to be sent based on the iteration flag
- * returns -1 on error and 0 on success */
-int sendUdpMsg(trans_req_data_t *tdata, struct sockaddr_in *clientaddr, int iteration) {
-  char writeBuffer[MAX_SIZE];
-  int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int);
-  int offset = 0;
-  *((short *)&writeBuffer[0]) = INVALIDATE_OBJS; //control msg
-  offset += sizeof(short);
-  *((unsigned int *)(writeBuffer+offset)) = myIpAddr; //mid sending invalidation
-  offset += sizeof(unsigned int);
-  if(iteration == 0) { // iteration flag == zero, send single udp msg
-    *((short *)(writeBuffer+offset)) = (short) (sizeof(unsigned int) * (tdata->f.nummod));  //sizeof msg
-    offset += sizeof(short);
-    int i;
-    for(i = 0; i < tdata->f.nummod; i++) {
-      *((unsigned int *) (writeBuffer+offset)) = tdata->oidmod[i];  //copy objects
-      offset += sizeof(unsigned int);
-    }
-  } else { // iteration flag > zero, send multiple udp msg
-    int numObj;
-    if((tdata->f.nummod - (iteration * maxObjsPerMsg)) > 0)
-      numObj = maxObjsPerMsg;
-    else
-      numObj = tdata->f.nummod - ((iteration - 1)*maxObjsPerMsg);
-    *((short *)(writeBuffer+offset)) = (short) (sizeof(unsigned int) * numObj);
-    offset += sizeof(short);
-    int index = (iteration - 1) * maxObjsPerMsg;
-    int i;
-    for(i = 0; i < numObj; i++) {
-      *((unsigned int *) (writeBuffer+offset)) = tdata->oidmod[index+i];
-      offset += sizeof(unsigned int);
-    }
-  }
-  int n;
-  if((n = sendto(udpSockFd, (const void *) writeBuffer, sizeof(writeBuffer), 0, (const struct sockaddr *)clientaddr, sizeof(struct sockaddr_in))) < 0) {
-    perror("sendto error- ");
-    printf("DEBUG-> sendto error: errorno %d\n", errno);
-    return -1;
-  }
-  return 0;
-}
-#endif
-
 /* Function searches given oid in prefetch cache and invalidates obj from cache
  * returns -1 on error and 0 on success */
 int invalidateFromPrefetchCache(char *buffer) {
index 295f8af454ef3f6a5f1561784f4675737c7281bf..38cca125e64a0d21a0ddccece5c23dc246f46f87 100644 (file)
@@ -2,7 +2,7 @@
 #define _ADDUDPENHANCE_H
 
 #include "dstm.h"
-#include "mlookup.h"
+#include "altmlookup.h"
 
 
 /*******************************
@@ -21,9 +21,7 @@
 int createUdpSocket();
 int udpInit();
 void *udpListenBroadcast(void *);
-//int invalidateObj(trans_req_data_t *);
 int invalidateObj(trans_req_data_t *, int, char, int*);
 int invalidateFromPrefetchCache(char *);
-//int sendUdpMsg(trans_req_data_t *, struct sockaddr_in *, int);
 int sendUdpMsg(trans_req_data_t *, int, int, struct sockaddr_in *, char, int*);
 #endif
diff --git a/Robust/src/Runtime/DSTM/interface_recovery/altprelookup.c b/Robust/src/Runtime/DSTM/interface_recovery/altprelookup.c
new file mode 100755 (executable)
index 0000000..b44e8e9
--- /dev/null
@@ -0,0 +1,304 @@
+#include "altprelookup.h"
+#include "dsmlock.h"
+#include "gCollect.h"
+extern objstr_t *prefetchcache;
+extern pthread_mutex_t prefetchcache_mutex; //Mutex to lock Prefetch Cache
+extern prefetchNodeInfo_t pNodeInfo;
+
+prehashtable_t pflookup; //Global prefetch cache table
+
+unsigned int prehashCreate(unsigned int size, float loadfactor) {
+  prehashlistnode_t *nodes;
+  int i;
+
+  // Allocate space for the hash table
+  if((nodes = calloc(size, sizeof(prehashlistnode_t))) == NULL) {
+    printf("Calloc error %s %d\n", __FILE__, __LINE__);
+    return 1;
+  }
+  pflookup.table = nodes;
+  pflookup.size = size;
+  pflookup.mask = size -1;
+  pflookup.numelements = 0; // Initial number of elements in the hash
+  pflookup.loadfactor = loadfactor;
+  pflookup.threshold=loadfactor*size;
+  
+  //Initilize 
+  for(i=0;i<PRENUMLOCKS;i++){
+    pflookup.larray[i].lock=RW_LOCK_BIAS;
+  }
+
+  /*
+  //Intiliaze and set prefetch table mutex attribute
+  pthread_mutexattr_init(&pflookup.prefetchmutexattr);
+  //NOTE:PTHREAD_MUTEX_RECURSIVE is currently inside a #if_def UNIX98 in the pthread.h file
+  //Therefore use PTHREAD_MUTEX_RECURSIVE_NP instead
+  pthread_mutexattr_settype(&pflookup.prefetchmutexattr, PTHREAD_MUTEX_RECURSIVE_NP);
+
+  //Initialize mutex var
+  pthread_mutex_init(&pflookup.lock, &pflookup.prefetchmutexattr);
+  //pthread_mutex_init(&pflookup.lock, NULL);
+  pthread_cond_init(&pflookup.cond, NULL);
+  */
+
+  return 0;
+}
+
+//Assign keys to bins inside hash table
+unsigned int prehashFunction(unsigned int key) {
+  return ( key & pflookup.mask) >> 1;
+}
+
+//Store oids and their pointers into hash
+void prehashInsert(unsigned int key, void *val) {
+  
+  int isFound=0;
+  prehashlistnode_t *ptr, *tmp, *node;
+
+  if(pflookup.numelements > (pflookup.threshold)) {
+    //Resize
+    unsigned int newsize = pflookup.size << 1;
+    prehashResize(newsize);
+  }
+
+  unsigned int keyindex=key>>1;
+  volatile unsigned int * lockptr=&pflookup.larray[keyindex&PRELOCKMASK].lock;
+  while(!write_trylock(lockptr)) {
+    sched_yield();
+  }
+
+  ptr = &pflookup.table[keyindex&pflookup.mask];
+
+  if(ptr->key==0) { //Insert at the first bin of the table
+    ptr->key = key;
+    ptr->val = val;
+    atomic_inc(&pflookup.numelements);
+  } else {
+    tmp = ptr;
+    while(tmp != NULL) { 
+      if(tmp->key == key) {
+        isFound=1;
+        tmp->val = val;//Replace value for an exsisting key
+        write_unlock(lockptr);
+        return;
+      }
+      tmp=tmp->next;
+    }
+    if(!isFound) { //Insert new key and value into the chain of linked list for the given bin
+      node = calloc(1, sizeof(prehashlistnode_t));
+      node->key = key;
+      node->val = val ;
+      node->next = ptr->next;
+      ptr->next=node;
+      atomic_inc(&pflookup.numelements);
+    }
+  }
+  write_unlock(lockptr);
+  return;
+}
+
+// Search for an address for a given oid
+void *prehashSearch(unsigned int key) {
+  int index;
+  unsigned int keyindex=key>>1;
+  volatile unsigned int * lockptr=&pflookup.larray[keyindex&PRELOCKMASK].lock;
+  while(!read_trylock(lockptr)) {
+    sched_yield();
+  }
+  prehashlistnode_t *node = &pflookup.table[keyindex&pflookup.mask];
+  do {
+    if(node->key == key) {
+      void * tmp=node->val;
+      read_unlock(lockptr);
+      return tmp;
+    }
+    node = node->next;
+  } while (node!=NULL);
+  read_unlock(lockptr);
+  return NULL;
+}
+
+unsigned int prehashRemove(unsigned int key) {
+  unsigned int keyindex = key >> 1;
+  volatile unsigned int * lockptr=&pflookup.larray[keyindex&PRELOCKMASK].lock;
+  prehashlistnode_t *node, *prev;
+
+  while(!write_trylock(lockptr)) {
+    sched_yield();
+  }
+  prehashlistnode_t *curr = &pflookup.table[keyindex&pflookup.mask];
+  // If there are no elements
+  //delete from first bin of table
+  if (curr->next == NULL && curr->key == key) {
+    curr->key = 0;
+    //TODO free(val) ?
+    curr->val = NULL;
+    atomic_dec(&(pflookup.numelements));
+    write_unlock(lockptr);
+    return 0;
+  }
+  //delete from first bin of table but elements follow in linked list
+  if (curr->next != NULL && curr->key == key) {
+    curr->key = curr->next->key;
+    curr->val = curr->next->val;
+    node = curr->next;
+    curr->next = node->next;
+    free(node);
+    atomic_dec(&(pflookup.numelements));
+    write_unlock(lockptr);
+    return 0;
+  }
+  prev = curr;
+  curr = curr->next;
+  //delete from elements in the linked list
+  for(; curr != NULL; curr = curr->next) {
+    if (curr->key == key) {
+      prev->next = curr->next;
+      free(curr);
+      atomic_dec(&(pflookup.numelements));
+      write_unlock(lockptr);
+      return 0;
+    }
+    prev = curr;
+  }
+  write_unlock(lockptr);
+  return 1;
+}
+unsigned int prehashResize(unsigned int newsize) {
+  prehashlistnode_t *node, *ptr;  // curr and next keep track of the current and the next chashlistnodes in a linked list
+  unsigned int oldsize;
+  int i,index;
+  unsigned int mask;
+
+  for(i=0;i<PRENUMLOCKS;i++) {
+    volatile unsigned int * lockptr=&pflookup.larray[i].lock;
+    
+    while(!write_trylock(lockptr)) {
+      sched_yield();
+    }
+  }
+  
+  if (pflookup.numelements < pflookup.threshold) {
+    //release lock and return
+    for(i=0;i<PRENUMLOCKS;i++) {
+      volatile unsigned int * lockptr=&pflookup.larray[i].lock;
+      write_unlock(lockptr);
+    }
+    return;
+  }
+
+  ptr = pflookup.table;
+  oldsize = pflookup.size;
+
+  if((node = calloc(newsize, sizeof(prehashlistnode_t))) == NULL) {
+    printf("Calloc error %s %d\n", __FILE__, __LINE__);
+    return 1;
+  }
+
+  pflookup.table = node;                //Update the global hashtable upon resize()
+  pflookup.size = newsize;
+  pflookup.threshold=newsize*pflookup.loadfactor;
+  mask=pflookup.mask = newsize -1;
+
+  for(i = 0; i < oldsize; i++) {                        //Outer loop for each bin in hash table
+    prehashlistnode_t * curr = &ptr[i];
+    prehashlistnode_t *tmp, *next;
+    int isfirst = 1;
+    do {
+      unsigned int key;
+      if ((key=curr->key) == 0) {             //Exit inner loop if there the first element for a given bin/index is NULL
+       break;                  //key = val =0 for element if not present within the hash table
+      }
+      next = curr->next;
+      //index = (key & mask)>>1;
+      index = (key >> 1) & mask;
+      tmp=&pflookup.table[index];
+      // Insert into the new table
+      if(tmp->key==0) {
+       tmp->key=curr->key;
+       tmp->val=curr->val;
+       if (!isfirst)
+         free(curr);
+      } /*
+         NOTE:  Add this case if you change this...                                                        
+         This case currently never happens because of the way things rehash....                            
+else if (isfirst) {
+       prehashlistnode_t * newnode = calloc(1, sizeof(prehashlistnode_t));
+       newnode->key = curr->key;
+       newnode->val = curr->val;
+       newnode->next = tmp->next;
+       tmp->next=newnode;
+       } */
+      else {
+       curr->next=tmp->next;
+       tmp->next=curr;
+      }
+
+      isfirst = 0;
+      curr = next;
+    } while(curr!=NULL);
+  }
+
+  free(ptr);            //Free the memory of the old hash table
+  for(i=0;i<PRENUMLOCKS;i++) {
+    volatile unsigned int * lockptr=&pflookup.larray[i].lock;
+    write_unlock(lockptr);
+  }
+  return 0;
+}
+
+//Note: This is based on the implementation of the inserting a key in the first position of the hashtable
+void prehashClear() {
+  /*
+#ifdef CACHE
+  int i, isFirstBin;
+  prehashlistnode_t *ptr, *prev, *curr;
+
+  pthread_mutex_lock(&pflookup.lock);
+
+  ptr = pflookup.table;
+  for(i = 0; i < pflookup.size; i++) {
+    prev = &ptr[i];
+    isFirstBin = 1;
+    while(prev->next != NULL) {
+      isFirstBin = 0;
+      curr = prev->next;
+      prev->next = curr->next;
+      free(curr);
+    }
+    if(isFirstBin == 1) {
+      prev->key = 0;
+      prev->next = NULL;
+    }
+  }
+  {
+    int stale;
+    pthread_mutex_unlock(&pflookup.lock);
+    pthread_mutex_lock(&prefetchcache_mutex);
+    if (pNodeInfo.newstale==NULL) {
+      //transfer the list wholesale;
+      pNodeInfo.oldstale=pNodeInfo.oldptr;
+      pNodeInfo.newstale=pNodeInfo.newptr;
+    } else {
+      //merge the two lists
+      pNodeInfo.newstale->prev=pNodeInfo.oldptr;
+      pNodeInfo.newstale=pNodeInfo.newptr;
+    }
+    stale=STALL_THRESHOLD-pNodeInfo.stale_count;
+    
+    if (stale>0&&stale>pNodeInfo.stall)
+      pNodeInfo.stall=stale;
+
+    pNodeInfo.stale_count+=pNodeInfo.os_count;
+    pNodeInfo.oldptr=getObjStr(DEFAULT_OBJ_STORE_SIZE);
+    pNodeInfo.newptr=pNodeInfo.oldptr;
+    pNodeInfo.os_count=1;
+    pthread_mutex_unlock(&prefetchcache_mutex);
+  }
+#endif
+  */
+}
+
diff --git a/Robust/src/Runtime/DSTM/interface_recovery/altprelookup.h b/Robust/src/Runtime/DSTM/interface_recovery/altprelookup.h
new file mode 100755 (executable)
index 0000000..fc08e29
--- /dev/null
@@ -0,0 +1,50 @@
+#ifndef _PRELOOKUP_H_
+#define _PRELOOKUP_H_
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <pthread.h>
+
+#define PLOADFACTOR 0.25
+#define PHASH_SIZE 1024
+
+typedef struct prehashlistnode {
+  unsigned int key;
+  void *val;       //this can be cast to another type or used to point to a larger structure
+  struct prehashlistnode *next;
+} prehashlistnode_t;
+
+
+struct prelockarray {
+   volatile unsigned int lock;
+   int buf[15];
+};
+
+#define PRENUMLOCKS 16
+#define PRELOCKMASK (PRENUMLOCKS-1)
+
+
+struct objstr;
+
+typedef struct prehashtable {
+  prehashlistnode_t *table;     // points to beginning of hash table
+  unsigned int size;
+  unsigned int mask;
+  unsigned int numelements;
+  unsigned int threshold;
+  double loadfactor;
+  struct prelockarray larray[PRENUMLOCKS];
+} prehashtable_t;
+
+/* Prototypes for hash*/
+unsigned int prehashCreate(unsigned int size, float loadfactor);
+unsigned int prehashFunction(unsigned int key);
+void prehashInsert(unsigned int key, void *val);
+void *prehashSearch(unsigned int key); //returns val, NULL if not found
+unsigned int prehashRemove(unsigned int key); //returns -1 if not found
+unsigned int prehashResize(unsigned int newsize);
+void prehashClear();
+/* end hash */
+
+#endif
+
index 31212ea4db39ae7a5787d9e9166c2141c26a29ee..2ed4d70a10d916e598cea88ef8916ec778da7c18 100644 (file)
@@ -87,6 +87,7 @@
 #define RETRYINTERVAL  75 //N  (For MatrixMultiply, 2DFFT benchmarks)
 #define SHUTDOWNINTERVAL  1  //M
 #define NUM_TRY_TO_COMMIT 2
+#define MEM_ALLOC_THRESHOLD 20485760//20MB
 
 #include <stdlib.h>
 #include <stdio.h>
@@ -319,10 +320,12 @@ char handleTransReq(fixed_data_t *, trans_commit_data_t *, unsigned int *, char
 char decideCtrlMessage(fixed_data_t *, trans_commit_data_t *, int *, int *, int *, int *, int *, void *, unsigned int *, unsigned int *, int);
 int transCommitProcess(void *, unsigned int *, unsigned int *, int, int, int);
 void processReqNotify(unsigned int numoid, unsigned int *oid, unsigned short *version, unsigned int mid, unsigned int threadid);
-void getCommitCountForObjMod(unsigned int *, unsigned int *, unsigned int *, int *,
+char getCommitCountForObjMod(unsigned int *, unsigned int *, unsigned int *, int *,
                              int *, int *, int *, int *, int *, int *, char *, unsigned int, unsigned short);
-void getCommitCountForObjRead(unsigned int *, unsigned int *, unsigned int *, int *, int *, int *, int *, int *,
+char getCommitCountForObjRead(unsigned int *, unsigned int *, unsigned int *, int *, int *, int *, int *, int *,
                               int *, int *, char *, unsigned int, unsigned short);
+void procRestObjs(char *, char *, int , int, int, unsigned int *, unsigned int *, int *, int *, int *, int *);
+void processVerNoMatch(unsigned int *, unsigned int *, int *, int *, int *, int *, unsigned int, unsigned short);
 /* end server portion */
 
 /* Prototypes for transactions */
index cf5a0611ea6df8eb44b2054227f4f5bc5031998c..44b8b895929362008dd82fad74a4eb6286902bca 100644 (file)
@@ -281,7 +281,6 @@ void *dstmAccept(void *acceptfd) {
        char control,ctrl, response;
        char *ptr;
        void *srcObj;
-
 #ifdef RECOVERY
        void *dupeptr;
   unsigned int transIDreceived;
@@ -368,7 +367,6 @@ void *dstmAccept(void *acceptfd) {
                                transinfo.modptr = NULL;
                                transinfo.numlocked = 0;
                                transinfo.numnotfound = 0;
-                               //if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
                                if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
                                        printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
                                        pthread_exit(NULL);
@@ -479,6 +477,7 @@ void *dstmAccept(void *acceptfd) {
                                threadNotify(oid,version,threadid);
                                free(buffer);
                                break;
+
 #ifdef RECOVERY
       case CLEAR_NOTIFY_LIST:
 #ifdef DEBUG
@@ -923,7 +922,6 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
     return 0;
 
   /* Read modified objects */
-  //printf("fixed.sum_bytes= %d\n", fixed.sum_bytes);
   if(fixed.nummod != 0) {
     if ((modptr = calloc(1, fixed.sum_bytes)) == NULL) {
       printf("calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
@@ -947,15 +945,12 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
     return 1;
   }
   ptr = (char *) modptr;
-  //printf("fixed.nummod= %d\n", fixed.nummod);
-  //fflush(stdout);
-  for(i = 0 ; i < fixed.nummod; i++) {
+  for(i = 0 ; i < fixed.nummod; i++){
     int tmpsize=0;
     headaddr = (objheader_t *) ptr;
     oid = OID(headaddr);
     oidmod[i] = oid;
     GETSIZE(tmpsize, headaddr);
-    //printf("i= %d, tmpsize= %d, oid= %u\n", i, tmpsize, oid);
     ptr += sizeof(objheader_t) + tmpsize;
   }
 #ifdef DEBUG
@@ -1131,6 +1126,8 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
    * Object store holds the modified objects involved in the transaction request */
   ptr = (char *) modptr;
 
+  char retval;
+
   /* Process each oid in the machine pile/ group per thread */
   for (i = 0; i < fixed->numread + fixed->nummod; i++) {
     if (i < fixed->numread) { //Objs only read and not modified
@@ -1142,7 +1139,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
 #ifdef DEBUG
       printf("%s -> oid : %u    version : %d\n",__func__,oid,version);
 #endif
-      getCommitCountForObjRead(oidnotfound, oidlocked, oidvernotmatch, &objnotfound, &objlocked, &objvernotmatch,
+      retval=getCommitCountForObjRead(oidnotfound, oidlocked, oidvernotmatch, &objnotfound, &objlocked, &objvernotmatch,
                                &v_matchnolock, &v_matchlock, &v_nomatch, &numBytes, &control, oid, version);
     } else {  //Objs modified
       if(i == fixed->numread) {
@@ -1155,13 +1152,45 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
       GETSIZE(tmpsize, headptr);
       ptr += sizeof(objheader_t) + tmpsize;
 
-      getCommitCountForObjMod(oidnotfound, oidlocked, oidvernotmatch, &objnotfound,
+      retval=getCommitCountForObjMod(oidnotfound, oidlocked, oidvernotmatch, &objnotfound,
                               &objlocked, &objvernotmatch, &v_matchnolock, &v_matchlock, &v_nomatch,
                               &numBytes, &control, oid, version);
     }
+    if(retval==TRANS_DISAGREE || retval==TRANS_SOFT_ABORT) {
+      //unlock objects as soon versions mismatch or else 
+      //locks cannot be acquired elsewhere
+      if (objlocked > 0) {
+        int useWriteUnlock = 0; 
+        for(j = 0; j < objlocked; j++) {
+          if(oidlocked[j] == -1) {
+             useWriteUnlock = 1; 
+             continue;
+          } 
+          if((headptr = mhashSearch(oidlocked[j])) == NULL) {
+            printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+            return 0;
+          }
+          if(useWriteUnlock) {
+            write_unlock(STATUSPTR(headptr));
+          } else {
+            read_unlock(STATUSPTR(headptr));
+          }
+        }
+        if(v_nomatch > 0)
+          free(oidlocked);
+      }
+      objlocked=0;
+      break;
+    }
+  }
+
+  //go through rest of the objects for version mismatches
+  if(retval==TRANS_DISAGREE || retval==TRANS_SOFT_ABORT) {
+    i++;
+    procRestObjs(objread, ptr, i, fixed->numread, fixed->nummod, oidnotfound, oidvernotmatch, &objnotfound, &objvernotmatch, &v_nomatch, &numBytes);
   }
 
-  /* send TRANS_DISAGREE and objs*/
+  /* send TRANS_DISAGREE and objs that caused the ABORTS*/
        if(v_nomatch > 0) {
 #ifdef CACHE
                char *objs = calloc(1, numBytes);
@@ -1175,6 +1204,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
                        offset += size;
                }
 #endif
+        /*
                if (objlocked > 0) {
                        int useWriteUnlock = 0;
                        for(j = 0; j < objlocked; j++) {
@@ -1194,20 +1224,20 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
                        }
                        free(oidlocked);
                }
+        */
     
-
 #ifdef DEBUG
                printf("%s -> control = %d, file = %s, line = %d\n", __func__,(int)control, __FILE__, __LINE__);
 #endif
 
     if(control < 0)
       printf("control = %d\n",control);
+    control=TRANS_DISAGREE;
 
                send_data(acceptfd, &control, sizeof(char));
 #ifdef CACHE
                send_data(acceptfd, &numBytes, sizeof(int));
                send_data(acceptfd, objs, numBytes);
-
                transinfo->objvernotmatch = oidvernotmatch;
                transinfo->numvernotmatch = objvernotmatch;
                free(objs);
@@ -1226,7 +1256,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
 }
 
 /* Update Commit info for objects that are modified */
-void getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked,
+char getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked,
                              unsigned int *oidvernotmatch, int *objnotfound, int *objlocked, int *objvernotmatch,
                              int *v_matchnolock, int *v_matchlock, int *v_nomatch, int *numBytes,
                              char *control, unsigned int oid, unsigned short version) {
@@ -1251,6 +1281,7 @@ void getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked,
                /* Save the oids not found and number of oids not found for later use */
                oidnotfound[*objnotfound] = oid;
                (*objnotfound)++;
+        *control = TRANS_DISAGREE;
        } else {     /* If Obj found in machine (i.e. has not moved) */
                /* Check if Obj is locked by any previous transaction */
                if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
@@ -1260,6 +1291,7 @@ void getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked,
 #endif
                        if (version == ((objheader_t *)mobj)->version) { /* match versions */
                                (*v_matchnolock)++;
+                               *control = TRANS_AGREE;
                        } else { /* If versions don't match ...HARD ABORT */
                                (*v_nomatch)++;
                                oidvernotmatch[*objvernotmatch] = oid;
@@ -1277,6 +1309,7 @@ void getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked,
                } else {  //we are locked
                        if (version == ((objheader_t *)mobj)->version) {     /* Check if versions match */
                                (*v_matchlock)++;
+                               *control = TRANS_SOFT_ABORT;
                        } else { /* If versions don't match ...HARD ABORT */
                                (*v_nomatch)++;
                                oidvernotmatch[*objvernotmatch] = oid;
@@ -1293,10 +1326,11 @@ void getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked,
 #ifdef DEBUG
        printf("%s -> oid: %u, v_matchnolock: %d, v_matchlock: %d, v_nomatch: %d\n",__func__,oid, *v_matchnolock, *v_matchlock, *v_nomatch);
 #endif
+    return *control;
 }
 
 /* Update Commit info for objects that are read */
-void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked, unsigned int *oidvernotmatch,
+char getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked, unsigned int *oidvernotmatch,
                               int *objnotfound, int *objlocked, int * objvernotmatch, int *v_matchnolock, int *v_matchlock,
                               int *v_nomatch, int *numBytes, char *control, unsigned int oid, unsigned short version) {
   void *mobj;
@@ -1322,6 +1356,7 @@ void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked
     /* Save the oids not found and number of oids not found for later use */
     oidnotfound[*objnotfound] = oid;
     (*objnotfound)++;
+       *control = TRANS_DISAGREE;
   } else {     /* If Obj found in machine (i.e. has not moved) */
 #ifdef DEBUG
     printf("%s -> Obj found!!\n",__func__);
@@ -1333,6 +1368,7 @@ void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked
     if (read_trylock(STATUSPTR(mobj))) { //Can further acquire read locks
       if (version == ((objheader_t *)mobj)->version) { /* match versions */
        (*v_matchnolock)++;
+        *control = TRANS_AGREE;
       } else { /* If versions don't match ...HARD ABORT */
        (*v_nomatch)++;
        oidvernotmatch[(*objvernotmatch)++] = oid;
@@ -1350,6 +1386,7 @@ void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked
     } else { /* Some other transaction has aquired a write lock on this object */
       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
        (*v_matchlock)++;
+       *control = TRANS_SOFT_ABORT;
       } else { /* If versions don't match ...HARD ABORT */
        (*v_nomatch)++;
        oidvernotmatch[*objvernotmatch] = oid;
@@ -1365,6 +1402,80 @@ void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked
 #ifdef DEBUG
        printf("%s -> oid: %u, v_matchnolock: %d, v_matchlock: %d, v_nomatch: %d\n",__func__, oid, *v_matchnolock, *v_matchlock, *v_nomatch);
 #endif
+  return *control;
+}
+
+void procRestObjs(char *objread, 
+                  char *objmod, 
+                  int index, 
+                  int numread, 
+                  int nummod, 
+                  unsigned int *oidnotfound, 
+                  unsigned int *oidvernotmatch,
+                  int *objnotfound, 
+                  int *objvernotmatch, 
+                  int *v_nomatch, 
+                  int *numBytes) {
+  int i;
+  unsigned int oid;
+  unsigned short version;
+
+  /* Process each oid in the machine pile/ group per thread */
+  for (i = index; i < numread+nummod; i++) {
+    if (i < numread) { //Objs only read and not modified
+      int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
+      incr *= i;
+      oid = *((unsigned int *)(objread + incr));
+      incr += sizeof(unsigned int);
+      version = *((unsigned short *)(objread + incr));
+    } else {  //Objs modified
+      objheader_t *headptr;
+      headptr = (objheader_t *) objmod;
+      oid = OID(headptr);
+      version = headptr->version;
+      int tmpsize;
+      GETSIZE(tmpsize, headptr);
+      objmod += sizeof(objheader_t) + tmpsize;
+    }
+    processVerNoMatch(oidnotfound,
+        oidvernotmatch,
+        objnotfound,
+        objvernotmatch,
+        v_nomatch,
+        numBytes,
+        oid, 
+        version);
+  }
+  return;
+}
+
+void processVerNoMatch(unsigned int *oidnotfound, 
+                      unsigned int *oidvernotmatch, 
+                      int *objnotfound, 
+                      int *objvernotmatch, 
+                      int *v_nomatch, 
+                      int *numBytes,
+                      unsigned int oid, 
+                      unsigned short version) {
+  void *mobj;
+  /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
+
+  if ((mobj = mhashSearch(oid)) == NULL) {    /* Obj not found */
+    /* Save the oids not found and number of oids not found for later use */
+    oidnotfound[*objnotfound] = oid;
+    (*objnotfound)++;
+  } else {     /* If Obj found in machine (i.e. has not moved) */
+    /* Check if Obj is locked by any previous transaction */
+    if (version != ((objheader_t *)mobj)->version) { /* match versions */
+      (*v_nomatch)++;
+      oidvernotmatch[*objvernotmatch] = oid;
+         (*objvernotmatch)++;
+         int size;
+      GETSIZE(size, mobj);
+      size += sizeof(objheader_t);
+      *numBytes += size;
+    }
+  }
 }
 
 /* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT
index 6e9d339e204ff9f0f9005d4c1f4306cada81ca56..185a02f618c422d1afa2fb3e7845ae5fb1c577a1 100644 (file)
@@ -1,5 +1,5 @@
 #include "gCollect.h"
-#include "prelookup.h"
+#include "altprelookup.h"
 
 
 extern pthread_mutex_t prefetchcache_mutex; //Mutex to lock Prefetch Cache
@@ -62,7 +62,7 @@ void *prefetchobjstrAlloc(unsigned int size) {
   pNodeInfo.newptr->prev=tmp;
   pNodeInfo.newptr=tmp;
   pNodeInfo.os_count++;
-  
+
   if (pNodeInfo.os_count>PREFETCH_FLUSH_THRESHOLD) {
     //remove oldest from linked list
     objstr_t *tofree=pNodeInfo.oldptr;
@@ -96,14 +96,21 @@ void *prefetchobjstrAlloc(unsigned int size) {
 }
 
 void clearBlock(objstr_t *block) {
+
   unsigned long int tmpbegin=(unsigned int)block;
   unsigned long int tmpend=(unsigned int)block->top;
   int i, j;
   prehashlistnode_t *ptr;
-  pthread_mutex_lock(&pflookup.lock);
 
+  int lockindex=0;
   ptr = pflookup.table;
+  volatile unsigned int * lockptr_current=&pflookup.larray[lockindex].lock;
+  while(!write_trylock(lockptr_current)) {
+    sched_yield();
+  }
+
   for(i = 0; i<pflookup.size; i++) {
+
     prehashlistnode_t *orig=&ptr[i];
     prehashlistnode_t *curr = orig;
     prehashlistnode_t *next=curr->next;
@@ -112,10 +119,10 @@ void clearBlock(objstr_t *block) {
       if ((val>=tmpbegin)&(val<tmpend)) {
        prehashlistnode_t *tmp=curr->next=next->next;
        free(next);
-       next=tmp;
+       next=curr;
        //loop condition is broken now...need to check before incrementing
-       if (next==NULL)
-         break;
+       //      if (next==NULL)
+       // break;
       }
     }
     {
@@ -133,8 +140,21 @@ void clearBlock(objstr_t *block) {
        }
       }
     }
-  }
-  pthread_mutex_unlock(&pflookup.lock);
+
+    if(((i+1)&(pflookup.mask>>4))==0 && (i+1)<pflookup.size){
+      // try to grab new lock
+      lockindex++;
+      volatile unsigned int * lockptr_new=&pflookup.larray[lockindex].lock;
+      while(!write_trylock(lockptr_new)){
+        sched_yield();
+      }
+      write_unlock(lockptr_current);
+      lockptr_current=lockptr_new;      
+    }
+    
+  }// end of for (pflokup)
+  
+  write_unlock(lockptr_current);
 }
 
 objstr_t *allocateNew(unsigned int size) {
index de522845df044925c7d4fc6c00c7d99315f3c30f..7c3b29729256be9f8dfb023aaed8ccb7ac343a88 100644 (file)
@@ -1,5 +1,5 @@
 #include "prefetch.h"
-#include "prelookup.h"
+#include "altprelookup.h"
 #include "sockpool.h"
 #include "gCollect.h"
 
index 88dd8f6eef39781d6a47b1d51079c587a1b35a18..e35829ef0d5792ebecf07625deb1b10281504350 100644 (file)
@@ -4,7 +4,7 @@
 #include "altmlookup.h"
 #include "llookup.h"
 #include "plookup.h"
-#include "prelookup.h"
+#include "altprelookup.h"
 #include "threadnotify.h"
 #include "queue.h"
 #include "addUdpEnhance.h"
@@ -51,7 +51,6 @@ extern int numprefetchsites; //Global variable containing number of prefetch sit
 extern pthread_mutex_t mainobjstore_mutex; // Mutex to lock main Object store
 pthread_mutex_t prefetchcache_mutex; // Mutex to lock Prefetch Cache
 pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */
-extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
 pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
 pthread_t tPrefetch;            /* Primary Prefetch thread that processes the prefetch queue */
 extern objstr_t *mainobjstore;
@@ -319,6 +318,48 @@ GDBRECV1:
   return 0; // got all the data
 }
 
+int recvw(int fd, void *buf, int len, int flags) {
+  return recv(fd, buf, len, flags);
+}
+
+void recv_data_buf(int fd, struct readstruct * readbuffer, void *buffer, int buflen) {
+  char *buf=(char *)buffer;
+  int numbytes=readbuffer->head-readbuffer->tail;
+  if (numbytes>buflen)
+    numbytes=buflen;
+  if (numbytes>0) {
+    memcpy(buf, &readbuffer->buf[readbuffer->tail], numbytes);
+    readbuffer->tail+=numbytes;
+    buflen-=numbytes;
+    buf+=numbytes;
+  }
+  if (buflen==0) {
+    return;
+  }
+  if (buflen>=MAXBUF) {
+    recv_data(fd, buf, buflen);
+    return;
+  }
+  
+  int maxbuf=MAXBUF;
+  int obufflen=buflen;
+  readbuffer->head=0;
+  
+  while (buflen > 0) {
+    int numbytes = recvw(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0);
+    if (numbytes == -1) {
+      perror("recv");
+      exit(0);
+    }
+    bytesRecv+=numbytes;
+    buflen-=numbytes;
+    readbuffer->head+=numbytes;
+    maxbuf-=numbytes;
+  }
+  memcpy(buf,readbuffer->buf,obufflen);
+  readbuffer->tail=obufflen;
+}
+
 int recv_data_errorcode(int fd, void *buf, int buflen) {
 #ifdef DEBUG
   printf("%s-> Start; fd:%d, buflen:%d\n", __func__, fd, buflen);
@@ -896,26 +937,12 @@ remoteread:
 #endif
 
     objcopy = getRemoteObj(machinenumber, oid);
-
-#ifdef RECOVERY
-    if(transRetryFlag) {
-      restoreDuplicationState(machinenumber);
-#ifdef DEBUG
-      printf("%s -> Recall transRead2\n",__func__);
-#endif
-      return transRead2(oid);
-    }
-#endif
-
-  if(objcopy == NULL) {
-         printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
-               return NULL;
-       } else {
 #ifdef TRANSSTATS
     LOGEVENT('R');
     nRemoteSend++;
 #endif
-#ifdef COMPILER
+
+    if(objcopy!=NULL) {
 #ifdef CACHE
       //Copy object to prefetch cache
       pthread_mutex_lock(&prefetchcache_mutex);
@@ -932,8 +959,25 @@ remoteread:
       memcpy(headerObj, objcopy, size+sizeof(objheader_t));
       //make an entry in prefetch lookup hashtable
       prehashInsert(oid, headerObj);
+      LOGEVENT('B');
 #endif
+    }
 
+#ifdef RECOVERY
+    if(transRetryFlag) {
+      restoreDuplicationState(machinenumber);
+#ifdef DEBUG
+      printf("%s -> Recall transRead2\n",__func__);
+#endif
+      return transRead2(oid);
+    }
+#endif
+
+  if(objcopy == NULL) {
+         printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
+               return NULL;
+       } else {
+#ifdef COMPILER
                return &objcopy[1];
 #else
                return objcopy;
@@ -951,11 +995,9 @@ objheader_t *transCreateObj(unsigned int size) {
   OID(tmp) = getNewOID();
   tmp->notifylist = NULL;
   tmp->version = 1;
-  //tmp->rcount = 1;
   tmp->isBackup = 0;
   STATUS(tmp) = NEW;
   t_chashInsert(OID(tmp), tmp);
-
 #ifdef COMPILER
   return &tmp[1]; //want space after object header
 #else
@@ -981,7 +1023,6 @@ plistnode_t *createPiles() {
   chashlistnode_t * ptr = c_table;
   /* Represents number of bins in the chash table */
   unsigned int size = c_size;
-
        for(i = 0; i < size ; i++) {
     chashlistnode_t * curr = &ptr[i];
                /* Inner loop to traverse the linked list of the cache lookupTable */
@@ -1008,14 +1049,28 @@ plistnode_t *createPiles() {
             mid = myIpAddr;
         }
 
+        //if(mid == myIpAddr) {
         pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements);
+        //} else {
+        //  if(bit)
+        //   pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements);
+        //  else 
+        //    pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
+        //}
 
         if(numLiveHostsInSystem > 1) {
-                           if(makedirty) { 
-                                   STATUS(headeraddr) = DIRTY;
-                    pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
-                       }
-                         //pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
+          if(makedirty) { 
+            STATUS(headeraddr) = DIRTY;
+            //if(mid == myIpAddr) {
+            //  pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
+            //} else {
+            //  if(bit)
+            pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
+            //  else 
+            //    pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements);
+            // }
+          }
+          //pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements);
         }
 #else
                // Get machine location for object id (and whether local or not)
@@ -1113,7 +1168,7 @@ int transCommit() {
   int treplyretryCount = 0;
   /* Initialize timeout for exponential delay */
   exponential_backoff.tv_sec = 0;
-  exponential_backoff.tv_nsec = (long)(10000);//10 microsec
+  exponential_backoff.tv_nsec = (long)(12000);//12 microsec
   count_exponential_backoff = 0;
   do {
     treplyretry = 0;
@@ -1208,7 +1263,6 @@ int transCommit() {
                                }
                                int offset = 0;
                                int i;
-                //printf("tosend[sockindex].f.nummod = %d\n", tosend[sockindex].f.nummod);
                                for(i = 0; i < tosend[sockindex].f.nummod ; i++) {
                                        int size;
                                        objheader_t *headeraddr;
@@ -1220,15 +1274,12 @@ int transCommit() {
                                                return 1;
                                        }
                                        GETSIZE(size,headeraddr);
-                    //printf("i= %d, tmpsize= %d, oid= %u\n", i, size, OID(headeraddr));
                                        size+=sizeof(objheader_t);
                                        memcpy(modptr+offset, headeraddr, size);
                                        offset+=size;
                                }
-                //printf("tosend[sockindex].f.sum_bytes= %d\n", tosend[sockindex].f.sum_bytes);
-                //fflush(stdout);
                                send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
-                //forcesend_buf(sd, &writebuffer, modptr, tosend[sockindex].f.sum_bytes);
+                //send_buf(sd, &writebuffer, modptr, tosend[sockindex].f.sum_bytes);
 
 #ifdef RECOVERY
         /* send transaction id, number of machine involved, machine ids */
@@ -1288,13 +1339,7 @@ int transCommit() {
                                                GETSIZE(size, header);
                                                size += sizeof(objheader_t);
                                                //make an entry in prefetch hash table
-                                               void *oldptr;
-                                               if((oldptr = prehashSearch(oidToPrefetch)) != NULL) {
-                                                       prehashRemove(oidToPrefetch);
-                                                       prehashInsert(oidToPrefetch, header);
-                                               } else {
-                                                       prehashInsert(oidToPrefetch, header);
-                                               }
+                        prehashInsert(oidToPrefetch, header);
                                                length = length - size;
                                                offset += size;
                                        }
@@ -1415,7 +1460,7 @@ int transCommit() {
       pDelete(pile_ptr);
     /* wait a random amount of time before retrying to commit transaction*/
     if(treplyretry) {
-      treplyretryCount++;
+      //treplyretryCount++;
       //if(treplyretryCount >= NUM_TRY_TO_COMMIT)
       //  exponentialdelay();
       //else 
@@ -1592,7 +1637,7 @@ char decideResponse(char *getReplyCtrl, char *treplyretry, int pilecount) {
     return TRANS_ABORT;
 #ifdef CACHE
     /* clear objects from prefetch cache */
-    cleanPCache();
+    //cleanPCache();
 #endif
   } else if(transagree == pilecount) {
     /* Send Commit */
@@ -1810,7 +1855,6 @@ void commitCountForObjMod(char *getReplyCtrl, unsigned int *oidnotfound, unsigne
        (*v_nomatch)++;
        /* Send TRANS_DISAGREE to Coordinator */
        *getReplyCtrl = TRANS_DISAGREE;
-       //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
        return;
       }
     }
@@ -2360,19 +2404,20 @@ int getPrefetchResponse(int sd) {
     /* Do a version comparison if the oid exists */
     if((oldptr = prehashSearch(oid)) != NULL) {
       /* If older version then update with new object ptr */
-      if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) {
-       prehashRemove(oid);
+      if(((objheader_t *)oldptr)->version < ((objheader_t *)modptr)->version) {
        prehashInsert(oid, modptr);
       }
     } else { /* Else add the object ptr to hash table*/
       prehashInsert(oid, modptr);
     }
+#if 0
     /* Lock the Prefetch Cache look up table*/
     pthread_mutex_lock(&pflookup.lock);
     /* Broadcast signal on prefetch cache condition variable */
     pthread_cond_broadcast(&pflookup.cond);
     /* Unlock the Prefetch Cache look up table*/
     pthread_mutex_unlock(&pflookup.lock);
+#endif
   } else if(control == OBJECT_NOT_FOUND) {
     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
     /* TODO: For each object not found query DHT for new location and retrieve the object */