major changes to prefetching code
authorbdemsky <bdemsky>
Mon, 14 Apr 2008 07:16:27 +0000 (07:16 +0000)
committerbdemsky <bdemsky>
Mon, 14 Apr 2008 07:16:27 +0000 (07:16 +0000)
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/machinepile.c
Robust/src/Runtime/DSTM/interface/machinepile.h
Robust/src/Runtime/DSTM/interface/mcpileq.c
Robust/src/Runtime/DSTM/interface/mcpileq.h
Robust/src/Runtime/DSTM/interface/objstr.c
Robust/src/Runtime/DSTM/interface/queue.c
Robust/src/Runtime/DSTM/interface/threadnotify.c
Robust/src/Runtime/DSTM/interface/trans.c

index 8a86ea2fb9b61101e22c0b0f64a6e52ab8a60847..61c38a7888b4c070e95a07da3c37aebde51e0dfe 100644 (file)
@@ -266,10 +266,8 @@ void transAbort(transrecord_t *trans);
 void prefetch(int, unsigned int *, unsigned short *, short*);
 void *transPrefetch(void *);
 void *mcqProcess(void *);
-void checkPrefetchTuples(prefetchqelem_t *);
 prefetchpile_t *foundLocal(prefetchqelem_t *);// returns node with prefetch elements(oids, offsets)
-prefetchpile_t *makePreGroups(prefetchqelem_t *, int *);// returns node with prefetch elements(oids, offsets)
-void checkPreCache(prefetchqelem_t *, int *, unsigned int, int);
+int lookupObject(unsigned int * oid, short offset);
 int transPrefetchProcess(transrecord_t *, int **, short);
 void sendPrefetchReq(prefetchpile_t*, int);
 int getPrefetchResponse(int);
index c27ea01be31d6d977fdd44da5fb8f0b091d9af14..10b5aed87e0956789feed8b8a360fcbf9d8bd518 100644 (file)
@@ -1,85 +1,80 @@
 #include "machinepile.h"
 
-prefetchpile_t *insertPile(int mid, unsigned int oid, short numoffset, short *offset, prefetchpile_t *head) {
-       prefetchpile_t *tmp = head;
-       prefetchpile_t *ptr;
-       objpile_t *objnode;
-       unsigned int *oidarray;
-       short *offvalues;
-       int i;
-       char found = 0;
+void insertPile(int mid, unsigned int oid, short numoffset, short *offset, prefetchpile_t **head) {
+  prefetchpile_t *ptr;
+  objpile_t *objnode;
+  unsigned int *oidarray;
+  objpile_t **tmp;
 
-       while (tmp != NULL) {
-               if (tmp->mid == mid) { // Found a match with exsisting machine id
-                       if ((objnode = (objpile_t *) calloc(1, sizeof(objpile_t))) == NULL) {
-                               printf("Calloc error: %s %d\n", __FILE__, __LINE__);
-                               return NULL;
-                       }
-                       if ((offvalues = (short *) calloc(numoffset, sizeof(short))) == NULL) {
-                               printf("Calloc error: %s %d\n", __FILE__, __LINE__);
-                               return NULL;
-                       }
-                       /* Fill objpiles DS */
-                       objnode->oid = oid;
-                       objnode->numoffset = numoffset;
-                       for(i = 0; i<numoffset; i++)
-                               offvalues[i] = offset[i];
-                       objnode->offset = offvalues;
-                       objnode->next = tmp->objpiles;
-                       tmp->objpiles = objnode;
-                       found = 1;
-                       break;
-               }
-               tmp = tmp->next;
-       }
+  //Loop through the machines
+  for(;1;head=&((*head)->next)) {
+    int tmid;
+    if ((*head)==NULL||(tmid=(*head)->mid)>mid) {
+      prefetchpile_t * tmp = (prefetchpile_t *) malloc(sizeof(prefetchpile_t));
+      tmp->mid = mid;
+      objnode =  malloc(sizeof(objpile_t));
+      objnode->offset = offset;
+      objnode->oid = oid;
+      objnode->numoffset = numoffset;
+      objnode->next = NULL;
+      tmp->objpiles = objnode;
+      tmp->next = *head;
+      *head=tmp;
+      return;
+    }
+
+    //keep looking
+    if (tmid < mid)
+      continue;
+    
+    //found mid list
+    for(tmp=&((*head)->objpiles);1;tmp=&((*tmp)->next)) {
+      int toid;
+      int matchstatus;
+
+      if ((*tmp)==NULL||((toid=(*tmp)->oid)>oid)) {
+       objnode = (objpile_t *) malloc(sizeof(objpile_t));
+       objnode->offset = offset;
+       objnode->oid = oid;
+       objnode->numoffset = numoffset;
+       objnode->next = *tmp;
+       *tmp = objnode;
+       return;
+      }
+      if (toid < oid)
+       continue;
+      
+      /* Fill objpiles DS */
+      int i;
+      int onumoffset=(*tmp)->numoffset;
+      short * ooffset=(*tmp)->offset;
 
-       tmp = head;
-       if(found != 1) {
-                if(tmp->mid == 0) {//First time
-                       tmp->mid = mid;
-                       if ((objnode = (objpile_t *) calloc(1, sizeof(objpile_t))) == NULL) {
-                               printf("Calloc error: %s %d\n", __FILE__, __LINE__);
-                               return NULL;
-                       }
-                       if ((offvalues = (short *) calloc(numoffset, sizeof(short))) == NULL) {
-                               printf("Calloc error: %s %d\n", __FILE__, __LINE__);
-                               return NULL;
-                       }
-                       // Fill objpiles DS
-                       objnode->oid = oid;
-                       objnode->numoffset = numoffset;
-                       for(i = 0; i<numoffset; i++)
-                               offvalues[i] = *((short *)offset + i); 
-                       objnode->offset = offvalues;
-                       objnode->next = NULL;
-                       tmp->objpiles = objnode;
-                       tmp->next = NULL;
-               } else {
-                       if ((tmp = (prefetchpile_t *) calloc(1, sizeof(prefetchpile_t))) == NULL) {
-                               printf("Calloc error: %s %d\n", __FILE__, __LINE__);
-                               return NULL;
-                       }
-                       tmp->mid = mid;
-                       if ((objnode = (objpile_t *) calloc(1, sizeof(objpile_t))) == NULL) {
-                               printf("Calloc error: %s %d\n", __FILE__, __LINE__);
-                               return NULL;
-                       }
-                       if ((offvalues = (short *) calloc(numoffset, sizeof(short))) == NULL) {
-                               printf("Calloc error: %s %d\n", __FILE__, __LINE__);
-                               return NULL;
-                       }
-                       // Fill objpiles DS
-                       objnode->oid = oid;
-                       objnode->numoffset = numoffset;
-                       for(i = 0; i<numoffset; i++)
-                               offvalues[i] = *((short *)offset + i); 
-                       objnode->offset = offvalues;
-                       objnode->next = NULL;
-                       tmp->objpiles = objnode;
-                       tmp->next = head;
-                       head = tmp;
-               }
+      for(i=0;i<numoffset;i++) {
+       if (i>onumoffset) {
+         //We've matched, let's just extend the current prefetch
+         (*tmp)->numoffset=numoffset;
+         (*tmp)->offset=offset;
+         return;
        }
-       
-       return head;
+       if (ooffset[i]<offset[i]) {
+         goto oidloop;
+       } else if (ooffset[i]>offset[i]) {
+         //Place item before the current one
+         objnode = (objpile_t *) malloc(sizeof(objpile_t));
+         objnode->offset = offset;
+         objnode->oid = oid;
+         objnode->numoffset = numoffset;
+         objnode->next = *tmp;
+         *tmp = objnode;
+         return;
+       }
+      }
+      //if we get to the end, we're already covered by this prefetch
+      return;
+    oidloop:
+      ;
+    }
+  }
+  
+
 }
index 8add41b7c77644c59db4c21fd4d479361c90cb29..c32a02a86d7ded0a5d2f374ba236b6c56a53e361 100644 (file)
@@ -5,6 +5,6 @@
 #include <stdio.h>
 #include <stdlib.h>
 
-prefetchpile_t *insertPile(int, unsigned int, short, short *, prefetchpile_t *);
+void insertPile(int, unsigned int, short, short *, prefetchpile_t **);
 
 #endif
index fea54094aa4710e16595ed93459c3f804d9b81bf..a8e5d81f4e9dbd6745dd56cdf155c67b656857a4 100644 (file)
@@ -3,82 +3,76 @@
 mcpileq_t mcqueue; //Global queue
 
 void mcpileqInit(void) {
-       /* Initialize machine queue that containing prefetch oids and offset values  sorted by remote machineid */  
-       mcqueue.front = mcqueue.rear = NULL;
-       //Intiliaze and set machile pile queue's mutex attribute
-       pthread_mutexattr_init(&mcqueue.qlockattr);
-       pthread_mutexattr_settype(&mcqueue.qlockattr, PTHREAD_MUTEX_RECURSIVE_NP);
-       //pthread_mutex_init(&mcqueue.qlock, NULL); 
-       pthread_mutex_init(&mcqueue.qlock,&mcqueue.qlockattr); 
-       pthread_cond_init(&mcqueue.qcond, NULL); 
+  /* Initialize machine queue that containing prefetch oids and offset values  sorted by remote machineid */  
+  mcqueue.front = mcqueue.rear = NULL;
+  //Intiliaze and set machile pile queue's mutex attribute
+  pthread_mutexattr_init(&mcqueue.qlockattr);
+  pthread_mutexattr_settype(&mcqueue.qlockattr, PTHREAD_MUTEX_RECURSIVE_NP);
+  pthread_mutex_init(&mcqueue.qlock,&mcqueue.qlockattr); 
+  pthread_cond_init(&mcqueue.qcond, NULL); 
 }
 
 /* Insert to the rear of machine pile queue */
 void mcpileenqueue(prefetchpile_t *node, prefetchpile_t *tail) {
-       if(mcqueue.front == NULL && mcqueue.rear == NULL) {
-               mcqueue.front = node;
-               mcqueue.rear = tail;
-       } else {
-               mcqueue.rear->next = node;
-               mcqueue.rear = tail;
-       }
+  if(mcqueue.front == NULL) {
+    mcqueue.front = node;
+    mcqueue.rear = tail;
+  } else {
+    mcqueue.rear->next = node;
+    mcqueue.rear = tail;
+  }
 }
 
 /* Return the node pointed to by the front ptr of the queue */
 prefetchpile_t *mcpiledequeue(void) {
-       prefetchpile_t *retnode;
-       if(mcqueue.front == NULL) {
-               printf("Machine pile queue empty: Underflow %s %d\n", __FILE__, __LINE__);
-               return NULL;
-       }
-       retnode = mcqueue.front;
-       mcqueue.front = mcqueue.front->next;
-       if (mcqueue.front == NULL)
-               mcqueue.rear = NULL;
-       retnode->next = NULL;
-
-       return retnode;
+  prefetchpile_t *retnode=mcqueue.front;
+  if(retnode == NULL) {
+    printf("Machine pile queue empty: Underflow %s %d\n", __FILE__, __LINE__);
+    return NULL;
+  }
+  mcqueue.front = retnode->next;
+  if (mcqueue.front == NULL)
+    mcqueue.rear = NULL;
+  retnode->next = NULL;
+  
+  return retnode;
 }
 
 void mcpiledelete(void) {
-       /* Remove each element */
-       while(mcqueue.front != NULL)
-               delqnode();
-       mcqueue.front = mcqueue.rear = NULL;
+  /* Remove each element */
+  while(mcqueue.front != NULL)
+    delqnode();
 }
 
 
 void mcpiledisplay() {
-       int mid;
-
-       prefetchpile_t *tmp = mcqueue.front;
-       while(tmp != NULL) {
-               printf("Remote machine id = %d\n", tmp->mid);
-               tmp = tmp->next;
-       }
+  int mid;
+  
+  prefetchpile_t *tmp = mcqueue.front;
+  while(tmp != NULL) {
+    printf("Remote machine id = %d\n", tmp->mid);
+    tmp = tmp->next;
+  }
 }
 
 /* Delete prefetchpile_t and everything it points to */
 void mcdealloc(prefetchpile_t *node) {
-       prefetchpile_t *prefetchpile_ptr;
-       prefetchpile_t *prefetchpile_next_ptr;
-       objpile_t *objpile_ptr;
-       objpile_t *objpile_next_ptr;
-
-       prefetchpile_ptr = node;
-
-       while (prefetchpile_ptr != NULL)
-       {
-               prefetchpile_next_ptr = prefetchpile_ptr;
-               while(prefetchpile_ptr->objpiles != NULL) {
-                       if(prefetchpile_ptr->objpiles->numoffset > 0) {
-                               free(prefetchpile_ptr->objpiles->offset);
-                       }
-                       objpile_ptr = prefetchpile_ptr->objpiles;
-                       prefetchpile_ptr->objpiles = objpile_ptr->next;
-                       free(objpile_ptr);
-               }
-               prefetchpile_ptr = prefetchpile_next_ptr->next;
-               free(prefetchpile_next_ptr);
-       }
+  prefetchpile_t *prefetchpile_ptr;
+  prefetchpile_t *prefetchpile_next_ptr;
+  objpile_t *objpile_ptr;
+  objpile_t *objpile_next_ptr;
+  
+  prefetchpile_ptr = node;
+  
+  while (prefetchpile_ptr != NULL) {
+    prefetchpile_next_ptr = prefetchpile_ptr;
+    while(prefetchpile_ptr->objpiles != NULL) {
+      //offsets aren't owned by us, so we don't free them.
+      objpile_ptr = prefetchpile_ptr->objpiles;
+      prefetchpile_ptr->objpiles = objpile_ptr->next;
+      free(objpile_ptr);
+    }
+    prefetchpile_ptr = prefetchpile_next_ptr->next;
+    free(prefetchpile_next_ptr);
+  }
 }
index fa58eb0ff1b80645db85b681919e07262b0bbda2..5c4046c970bbef3ae302358a409b1b46fc9d4a24 100644 (file)
@@ -8,25 +8,25 @@
 
 //Structure to make machine groups when prefetching
 typedef struct objpile { 
-       unsigned int oid;
-       short numoffset;
-       short *offset;
-       struct objpile *next;
-}objpile_t;
+  unsigned int oid;
+  short numoffset;
+  short *offset;
+  struct objpile *next;
+} objpile_t;
 
 //Structure for prefetching tuples generated by the compiler
 typedef struct prefetchpile {
-       unsigned int mid;
-       objpile_t *objpiles;
-       struct prefetchpile *next;
-}prefetchpile_t;
+  unsigned int mid;
+  objpile_t *objpiles;
+  struct prefetchpile *next;
+} prefetchpile_t;
 
 typedef struct mcpileq {
-       prefetchpile_t *front, *rear;
-       pthread_mutex_t qlock;
-       pthread_mutexattr_t qlockattr;
-       pthread_cond_t qcond;
-}mcpileq_t;
+  prefetchpile_t *front, *rear;
+  pthread_mutex_t qlock;
+  pthread_mutexattr_t qlockattr;
+  pthread_cond_t qcond;
+} mcpileq_t;
 
 void mcpileqInit(void);
 void mcpileenqueue(prefetchpile_t *, prefetchpile_t *);
index 20004396c34c7814cb04e835fce9e5b9d5e469e7..2589deb82532c94282277e5a255a8a5b448deb11 100644 (file)
@@ -1,12 +1,11 @@
 #include "dstm.h"
 
-objstr_t *objstrCreate(unsigned int size)
-{
-       objstr_t *tmp = calloc(1, (sizeof(objstr_t) + size));
-       tmp->size = size;
-       tmp->next = NULL;
-       tmp->top = tmp + 1; //points to end of objstr_t structure!
-       return tmp;
+objstr_t *objstrCreate(unsigned int size) {
+  objstr_t *tmp = calloc(1, (sizeof(objstr_t) + size));
+  tmp->size = size;
+  tmp->next = NULL;
+  tmp->top = tmp + 1; //points to end of objstr_t structure!
+  return tmp;
 }
 
 //free entire list, starting at store
index 26e1bfc460291c1c1d50f73b1fbeb851aa86ac41..1056959b4dd51aaac76242b2236750850059c1a0 100644 (file)
 primarypfq_t pqueue; //Global queue
 
 void queueInit(void) {
-       /* Intitialize primary queue */
-       pqueue.front = pqueue.rear = NULL;
-       pthread_mutexattr_init(&pqueue.qlockattr);
-       pthread_mutexattr_settype(&pqueue.qlockattr, PTHREAD_MUTEX_RECURSIVE_NP);
-       pthread_mutex_init(&pqueue.qlock, &pqueue.qlockattr);
-       pthread_cond_init(&pqueue.qcond, NULL);
+  /* Intitialize primary queue */
+  pqueue.front = pqueue.rear = NULL;
+  pthread_mutexattr_init(&pqueue.qlockattr);
+  pthread_mutexattr_settype(&pqueue.qlockattr, PTHREAD_MUTEX_RECURSIVE_NP);
+  pthread_mutex_init(&pqueue.qlock, &pqueue.qlockattr);
+  pthread_cond_init(&pqueue.qcond, NULL);
 }
 
 /* Delete the node pointed to by the front ptr of the queue */
 void delqnode() {
-       prefetchqelem_t *delnode;
-       if((pqueue.front == NULL) && (pqueue.rear == NULL)) {
-               printf("The queue is empty: UNDERFLOW %s, %d\n", __FILE__, __LINE__);
-               return;
-       } else if ((pqueue.front == pqueue.rear) && pqueue.front != NULL && pqueue.rear != NULL) {
-               free(pqueue.front);
-               pqueue.front = pqueue.rear = NULL;
-       } else {
-               delnode = pqueue.front;
-               pqueue.front = pqueue.front->next;
-               free(delnode);
-       }
+  prefetchqelem_t *delnode;
+  if(pqueue.front == NULL) {
+    printf("The queue is empty: UNDERFLOW %s, %d\n", __FILE__, __LINE__);
+    return;
+  } else if (pqueue.front == pqueue.rear) {
+    free(pqueue.front);
+    pqueue.front = pqueue.rear = NULL;
+  } else {
+    delnode = pqueue.front;
+    pqueue.front = pqueue.front->next;
+    free(delnode);
+  }
 }
 
 void queueDelete(void) {
-       /* Remove each element */
-       while(pqueue.front != NULL)
-               delqnode();
-       pqueue.front = pqueue.rear = NULL;
+  /* Remove each element */
+  while(pqueue.front != NULL)
+    delqnode();
 }
 
 /* Inserts to the rear of primary prefetch queue */
 void pre_enqueue(prefetchqelem_t *qnode) {
-       if(pqueue.front == NULL && pqueue.rear == NULL) {
-               pqueue.front = pqueue.rear = qnode;
-       } else {
-               qnode->next = NULL;
-               pqueue.rear->next = qnode;
-               pqueue.rear = qnode;
-       }
+  if(pqueue.front == NULL) {
+    pqueue.front = pqueue.rear = qnode;
+    qnode->next=NULL;
+  } else {
+    qnode->next = NULL;
+    pqueue.rear->next = qnode;
+    pqueue.rear = qnode;
+  }
 }
 
 /* Return the node pointed to by the front ptr of the queue */
 prefetchqelem_t *pre_dequeue(void) {
-       prefetchqelem_t *retnode;
-       if (pqueue.front == NULL) {
-               printf("Queue empty: Underflow %s, %d\n", __FILE__, __LINE__);
-               if(pqueue.rear != NULL) {
-                       printf("pqueue.front points to invalid location %s, %d\n", __FILE__, __LINE__);
-               }
-               return NULL;
-       }
-       retnode = pqueue.front;
-       pqueue.front = pqueue.front->next;
-       if (pqueue.front == NULL)
-               pqueue.rear = NULL;
-       retnode->next = NULL;
-
-       return retnode;
+  prefetchqelem_t *retnode;
+  if (pqueue.front == NULL) {
+    printf("Queue empty: Underflow %s, %d\n", __FILE__, __LINE__);
+    return NULL;
+  }
+  retnode = pqueue.front;
+  pqueue.front = pqueue.front->next;
+  if (pqueue.front == NULL)
+    pqueue.rear = NULL;
+  retnode->next = NULL;
+  
+  return retnode;
 }
 
 void queueDisplay() {
-       int offset = sizeof(prefetchqelem_t);
-       int *ptr;
-       int ntuples;
-       char *ptr1;
-       prefetchqelem_t *tmp = pqueue.front;
-       while(tmp != NULL) {
-               ptr1 = (char *) tmp;
-               ptr = (int *)(ptr1 + offset);
-               ntuples = *ptr;
-               tmp = tmp->next;
-       }
+  int offset = sizeof(prefetchqelem_t);
+  int *ptr;
+  int ntuples;
+  char *ptr1;
+  prefetchqelem_t *tmp = pqueue.front;
+  while(tmp != NULL) {
+    ptr1 = (char *) tmp;
+    ptr = (int *)(ptr1 + offset);
+    ntuples = *ptr;
+    tmp = tmp->next;
+  }
 }
 
 void predealloc(prefetchqelem_t *node) {
-       free(node);
+  free(node);
 }
 
-
-#if 0
-main() {
-       unsigned int oids[] = {11, 13};
-       short endoffsets[] = {2, 5};
-       short arrayfields[] = {2, 2, 1, 5, 6};
-       queueInit();
-       queueDisplay();
-       prefetch(2, oids, endoffsets, arrayfields);
-       queueDisplay();
-       unsigned int oids1[] = {21, 23, 25, 27};
-       short endoffsets1[] = {1, 2, 3, 4};
-       short arrayfields1[] = {3, 2, 1, 3};
-       prefetch(4, oids1, endoffsets1, arrayfields1);
-       queueDisplay();
-       delqnode();
-       queueDisplay();
-       delqnode();
-       queueDisplay();
-       delqnode();
-       queueDisplay();
-       delqnode();
-
-}
-
-#endif
-
-
index a04f3135cd24578025ae35dfc65ddcf6ff7be4bb..9ff506e47c723e26f7bab589ac399e6fa09216d4 100644 (file)
@@ -6,257 +6,215 @@ notifyhashtable_t nlookup; //Global hash table
  * for an update notification from a particular object.
  * This takes in the head of the linked list and inserts the new node to it */
 threadlist_t *insNode(threadlist_t *head, unsigned int threadid, unsigned int mid) {
-       threadlist_t *ptr;
-       if(head == NULL) {
-               if((head = calloc(1, sizeof(threadlist_t))) == NULL) {
-                       printf("Calloc Error %s, %d,\n", __FILE__, __LINE__);
-                       return;
-               }
-               head->threadid = threadid;
-               head->mid = mid;
-               head->next = NULL;
-       } else {
-               if((ptr = calloc(1, sizeof(threadlist_t))) == NULL) {
-                       printf("Calloc Error %s, %d,\n", __FILE__, __LINE__);
-                       return;
-               }
-               ptr->threadid = threadid;
-               ptr->mid = mid;
-               ptr->next = head;
-               head = ptr;
-       }
-
-       return head;
+  threadlist_t *ptr;
+  if(head == NULL) {
+    head = malloc(sizeof(threadlist_t));
+    head->threadid = threadid;
+    head->mid = mid;
+    head->next = NULL;
+  } else {
+    ptr = malloc(sizeof(threadlist_t));
+    ptr->threadid = threadid;
+    ptr->mid = mid;
+    ptr->next = head;
+    head = ptr;
+  }
+  return head;
 }
 
 /* This function displays the linked list of threads waiting on update notification
  * from an object */
 void display(threadlist_t *head) {
-       threadlist_t *ptr;
-       if(head == NULL) {
-               printf("No thread is waiting\n");
-               return;
-       } else {
-               while(head != NULL) {
-                       ptr = head;
-                       printf("The threadid waiting is = %d\n", ptr->threadid);
-                       printf("The mid on which thread present = %d\n", ptr->mid);
-                       head = ptr->next;
-               }
-       }
+  threadlist_t *ptr;
+  if(head == NULL) {
+    printf("No thread is waiting\n");
+    return;
+  } else {
+    while(head != NULL) {
+      ptr = head;
+      printf("The threadid waiting is = %d\n", ptr->threadid);
+      printf("The mid on which thread present = %d\n", ptr->mid);
+      head = ptr->next;
+    }
+  }
 }
 
 /* This function creates a new hash table that stores a mapping between the threadid and
  * a pointer to the thread notify data */
 unsigned int notifyhashCreate(unsigned int size, float loadfactor) { 
-       notifylistnode_t *nodes;
-
-       // Allocate space for the hash table 
-       if((nodes = calloc(size, sizeof(notifylistnode_t))) == NULL) {
-               printf("Calloc error %s %d\n", __FILE__, __LINE__);
-               return 1;
-       }
-
-       nlookup.table = nodes;
-       nlookup.size = size;
-       nlookup.numelements = 0; // Initial number of elements in the hash
-       nlookup.loadfactor = loadfactor;
-       //Initialize the pthread_mutex variable         
-       pthread_mutex_init(&nlookup.locktable, NULL);
-       return 0;
+  notifylistnode_t *nodes = calloc(size, sizeof(notifylistnode_t));
+  nlookup.table = nodes;
+  nlookup.size = size;
+  nlookup.numelements = 0; // Initial number of elements in the hash
+  nlookup.loadfactor = loadfactor;
+  //Initialize the pthread_mutex variable      
+  pthread_mutex_init(&nlookup.locktable, NULL);
+  return 0;
 }
 
 // Assign to tids to bins inside hash table
 unsigned int notifyhashFunction(unsigned int tid) {
-       return( tid % (nlookup.size));
+  return( tid % (nlookup.size));
 }
 
 // Insert pointer to the notify data and threadid mapping into the hash table
 unsigned int notifyhashInsert(unsigned int tid, notifydata_t *ndata) {
-       unsigned int newsize;
-       int index;
-       notifylistnode_t *ptr, *node, *tmp;
-       int isFound = 0;
-
-       if (nlookup.numelements > (nlookup.loadfactor * nlookup.size)) {
-               //Resize Table
-               newsize = 2 * nlookup.size + 1;         
-               pthread_mutex_lock(&nlookup.locktable);
-               notifyhashResize(newsize);
-               pthread_mutex_unlock(&nlookup.locktable);
-       }
-       /*
-       ptr = nlookup.table;
-       nlookup.numelements++;
-
-       index = notifyhashFunction(tid);
-#ifdef DEBUG
-       printf("DEBUG -> index = %d, threadid = %d\n", index, tid);
-#endif
-       pthread_mutex_lock(&nlookup.locktable);
-       if(ptr[index].next == NULL && ptr[index].threadid == 0) {       // Insert at the first position in the hashtable
-               ptr[index].threadid = tid;
-               ptr[index].ndata = ndata;
-       } else {                        // Insert in the beginning of linked list
-               if ((node = calloc(1, sizeof(notifylistnode_t))) == NULL) {
-                       printf("Calloc error %s, %d\n", __FILE__, __LINE__);
-                       pthread_mutex_unlock(&nlookup.locktable);
-                       return 1;
-               }
-               node->threadid = tid;
-               node->ndata = ndata;
-               node->next = ptr[index].next;
-               ptr[index].next = node;
-       }
-       pthread_mutex_unlock(&nlookup.locktable);
-       */
-       ptr = nlookup.table;
-       index = notifyhashFunction(tid);
-       pthread_mutex_lock(&nlookup.locktable);
-       if(ptr[index].next == NULL && ptr[index].threadid == 0) {       // Insert at the first position in the hashtable
-               ptr[index].threadid = tid;
-               ptr[index].ndata = ndata;
-       } else {
-               tmp = &ptr[index];
-               while(tmp != NULL) {
-                       if(tmp->threadid == tid) {
-                               isFound = 1;
-                               tmp->ndata = ndata;
-                       }
-                       tmp = tmp->next;
-               }
-               if(!isFound) {
-                       if ((node = calloc(1, sizeof(notifylistnode_t))) == NULL) {
-                               printf("Calloc error %s, %d\n", __FILE__, __LINE__);
-                               pthread_mutex_unlock(&nlookup.locktable);
-                               return 1;
-                       }
-                       node->threadid = tid;
-                       node->ndata = ndata;
-                       node->next = ptr[index].next;
-                       ptr[index].next = node;
-               }
-       }
+  unsigned int newsize;
+  int index;
+  notifylistnode_t *ptr, *node, *tmp;
+  int isFound = 0;
+  
+  if (nlookup.numelements > (nlookup.loadfactor * nlookup.size)) {
+    //Resize Table
+    newsize = 2 * nlookup.size + 1;            
+    pthread_mutex_lock(&nlookup.locktable);
+    notifyhashResize(newsize);
+    pthread_mutex_unlock(&nlookup.locktable);
+  }
+  ptr = nlookup.table;
+  index = notifyhashFunction(tid);
+  pthread_mutex_lock(&nlookup.locktable);
+  if(ptr[index].next == NULL && ptr[index].threadid == 0) {
+    // Insert at the first position in the hashtable
+    ptr[index].threadid = tid;
+    ptr[index].ndata = ndata;
+  } else {
+    tmp = &ptr[index];
+    while(tmp != NULL) {
+      if(tmp->threadid == tid) {
+       isFound = 1;
+       tmp->ndata = ndata;
+      }
+      tmp = tmp->next;
+    }
+    if(!isFound) {
+      if ((node = calloc(1, sizeof(notifylistnode_t))) == NULL) {
+       printf("Calloc error %s, %d\n", __FILE__, __LINE__);
        pthread_mutex_unlock(&nlookup.locktable);
-
-       return 0;
+       return 1;
+      }
+      node->threadid = tid;
+      node->ndata = ndata;
+      node->next = ptr[index].next;
+      ptr[index].next = node;
+    }
+  }
+  pthread_mutex_unlock(&nlookup.locktable);
+  
+  return 0;
 }
 
 // Return pointer to thread notify data for a given threadid in the hash table
 notifydata_t  *notifyhashSearch(unsigned int tid) {
-       int index;
-       notifylistnode_t *ptr, *node;
-
-       ptr = nlookup.table;    // Address of the beginning of hash table       
-       index = notifyhashFunction(tid);
-       node = &ptr[index];
-       pthread_mutex_lock(&nlookup.locktable);
-       while(node != NULL) {
-               if(node->threadid == tid) {
-                       pthread_mutex_unlock(&nlookup.locktable);
-                       return node->ndata;
-               }
-               node = node->next;
-       }
-       pthread_mutex_unlock(&nlookup.locktable);
-       return NULL;
+  // Address of the beginning of hash table    
+  notifylistnode_t *ptr = nlookup.table;       
+  int index = notifyhashFunction(tid);
+  pthread_mutex_lock(&nlookup.locktable);
+  notifylistnode_t * node = &ptr[index];
+  while(node != NULL) {
+    if(node->threadid == tid) {
+      pthread_mutex_unlock(&nlookup.locktable);
+      return node->ndata;
+    }
+    node = node->next;
+  }
+  pthread_mutex_unlock(&nlookup.locktable);
+  return NULL;
 }
 
 // Remove an entry from the hash table
 unsigned int notifyhashRemove(unsigned int tid) {
-       int index;
-       notifylistnode_t *curr, *prev, *ptr, *node;
-
-       ptr = nlookup.table;
-       index = notifyhashFunction(tid);
-       curr = &ptr[index];
-
-       pthread_mutex_lock(&nlookup.locktable);
-       for (; curr != NULL; curr = curr->next) {
-               if (curr->threadid == tid) {         // Find a match in the hash table
-                       nlookup.numelements--;  // Decrement the number of elements in the global hashtable
-                       if ((curr == &ptr[index]) && (curr->next == NULL))  { // Delete the first item inside the hashtable with no linked list of notifylistnode_t 
-                               curr->threadid = 0;
-                               curr->ndata = NULL;
-                       } else if ((curr == &ptr[index]) && (curr->next != NULL)) { //Delete the first bin item with a linked list of notifylistnode_t  connected 
-                               curr->threadid = curr->next->threadid;
-                               curr->ndata = curr->next->ndata;
-                               node = curr->next;
-                               curr->next = curr->next->next;
-                               free(node);
-                       } else {                                                // Regular delete from linked listed    
-                               prev->next = curr->next;
-                               free(curr);
-                       }
-                       pthread_mutex_unlock(&nlookup.locktable);
-                       return 0;
-               }       
-               prev = curr; 
-       }
-       pthread_mutex_unlock(&nlookup.locktable);
-       return 1;
+  notifylistnode_t *curr, *prev, *node;
+  
+  notifylistnode_t *ptr = nlookup.table;
+  int index = notifyhashFunction(tid);
+  
+  pthread_mutex_lock(&nlookup.locktable);
+  for (curr = &ptr[index]; curr != NULL; curr = curr->next) {
+    if (curr->threadid == tid) {         // Find a match in the hash table
+      nlookup.numelements--;  // Decrement the number of elements in the global hashtable
+      if ((curr == &ptr[index]) && (curr->next == NULL))  { // Delete the first item inside the hashtable with no linked list of notifylistnode_t 
+       curr->threadid = 0;
+       curr->ndata = NULL;
+      } else if ((curr == &ptr[index]) && (curr->next != NULL)) { //Delete the first bin item with a linked list of notifylistnode_t  connected 
+       curr->threadid = curr->next->threadid;
+       curr->ndata = curr->next->ndata;
+       node = curr->next;
+       curr->next = curr->next->next;
+       free(node);
+      } else {                                         // Regular delete from linked listed    
+       prev->next = curr->next;
+       free(curr);
+      }
+      pthread_mutex_unlock(&nlookup.locktable);
+      return 0;
+    }       
+    prev = curr; 
+  }
+  pthread_mutex_unlock(&nlookup.locktable);
+  return 1;
 }
 
 // Resize table
 unsigned int notifyhashResize(unsigned int newsize) {
-       notifylistnode_t *node, *ptr, *curr, *next;     // curr and next keep track of the current and the next notifyhashlistnodes in a linked list
-       unsigned int oldsize;
-       int isfirst;    // Keeps track of the first element in the notifylistnode_t for each bin in hashtable
-       int i,index;    
-       notifylistnode_t *newnode;              
-
-       ptr = nlookup.table;
-       oldsize = nlookup.size;
-
-       if((node = calloc(newsize, sizeof(notifylistnode_t))) == NULL) {
-               printf("Calloc error %s %d\n", __FILE__, __LINE__);
-               return 1;
-       }
-
-       nlookup.table = node;           //Update the global hashtable upon resize()
-       nlookup.size = newsize;
-       nlookup.numelements = 0;
-
-       for(i = 0; i < oldsize; i++) {                  //Outer loop for each bin in hash table
-               curr = &ptr[i];
-               isfirst = 1;                    
-               while (curr != NULL) {                  //Inner loop to go through linked lists
-                       if (curr->threadid == 0) {              //Exit inner loop if there the first element for a given bin/index is NULL
-                               break;                  //threadid = threadcond =0 for element if not present within the hash table
-                       }
-                       next = curr->next;
-                       index = notifyhashFunction(curr->threadid);
+  notifylistnode_t *node, *ptr, *curr, *next;  // curr and next keep track of the current and the next notifyhashlistnodes in a linked list
+  unsigned int oldsize;
+  int isfirst;    // Keeps track of the first element in the notifylistnode_t for each bin in hashtable
+  int i,index;         
+  notifylistnode_t *newnode;           
+
+  ptr = nlookup.table;
+  oldsize = nlookup.size;
+  
+  if((node = calloc(newsize, sizeof(notifylistnode_t))) == NULL) {
+    printf("Calloc error %s %d\n", __FILE__, __LINE__);
+    return 1;
+  }
+  
+  nlookup.table = node;                //Update the global hashtable upon resize()
+  nlookup.size = newsize;
+  nlookup.numelements = 0;
+  
+  for(i = 0; i < oldsize; i++) {                       //Outer loop for each bin in hash table
+    curr = &ptr[i];
+    isfirst = 1;                       
+    while (curr != NULL) {                     //Inner loop to go through linked lists
+      if (curr->threadid == 0) {               //Exit inner loop if there the first element for a given bin/index is NULL
+       break;                  //threadid = threadcond =0 for element if not present within the hash table
+      }
+      next = curr->next;
+      index = notifyhashFunction(curr->threadid);
 #ifdef DEBUG
-                       printf("DEBUG(resize) -> index = %d, threadid = %d\n", index, curr->threadid);
+      printf("DEBUG(resize) -> index = %d, threadid = %d\n", index, curr->threadid);
 #endif
-                       // Insert into the new table
-                       if(nlookup.table[index].next == NULL && nlookup.table[index].threadid == 0) { 
-                               nlookup.table[index].threadid = curr->threadid;
-                               nlookup.table[index].ndata = curr->ndata;
-                               nlookup.numelements++;
-                       }else { 
-                               if((newnode = calloc(1, sizeof(notifylistnode_t))) == NULL) { 
-                                       printf("Calloc error %s, %d\n", __FILE__, __LINE__);
-                                       return 1;
-                               }       
-                               newnode->threadid = curr->threadid;
-                               newnode->ndata = curr->ndata;
-                               newnode->next = nlookup.table[index].next;
-                               nlookup.table[index].next = newnode;    
-                               nlookup.numelements++;
-                       }       
-
-                       //free the linked list of notifylistnode_t if not the first element in the hash table
-                       if (isfirst != 1) {
-                               free(curr);
-                       
-
-                       isfirst = 0;
-                       curr = next;
-               }
-       }
-
-       free(ptr);              //Free the memory of the old hash table 
-       ptr = NULL;
-       return 0;
+      // Insert into the new table
+      if(nlookup.table[index].next == NULL && nlookup.table[index].threadid == 0) { 
+       nlookup.table[index].threadid = curr->threadid;
+       nlookup.table[index].ndata = curr->ndata;
+       nlookup.numelements++;
+      }else { 
+       if((newnode = calloc(1, sizeof(notifylistnode_t))) == NULL) { 
+         printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+         return 1;
+       }       
+       newnode->threadid = curr->threadid;
+       newnode->ndata = curr->ndata;
+       newnode->next = nlookup.table[index].next;
+       nlookup.table[index].next = newnode;    
+       nlookup.numelements++;
+      }       
+      
+      //free the linked list of notifylistnode_t if not the first element in the hash table
+      if (isfirst != 1) {
+       free(curr);
+      } 
+      
+      isfirst = 0;
+      curr = next;
+    }
+  }
+  
+  free(ptr);           //Free the memory of the old hash table 
+  ptr = NULL;
+  return 0;
 }
index 155a622271d64c4d320ac418d724d9bdab46620f..046c83f9ef56431d9c5367403b5a31320c2c525e 100644 (file)
@@ -46,33 +46,33 @@ plistnode_t *createPiles(transrecord_t *);
  * Send and Recv function calls 
  *******************************/
 void send_data(int fd , void *buf, int buflen) {
-       char *buffer = (char *)(buf); 
-       int size = buflen;
-       int numbytes; 
-       while (size > 0) {
-               numbytes = send(fd, buffer, size, MSG_NOSIGNAL);
-               if (numbytes == -1) {
-                       perror("send");
-                       exit(-1);
-               }
-               buffer += numbytes;
-               size -= numbytes;
-       }
+  char *buffer = (char *)(buf); 
+  int size = buflen;
+  int numbytes; 
+  while (size > 0) {
+    numbytes = send(fd, buffer, size, MSG_NOSIGNAL);
+    if (numbytes == -1) {
+      perror("send");
+      exit(-1);
+    }
+    buffer += numbytes;
+    size -= numbytes;
+  }
 }
 
 void recv_data(int fd , void *buf, int buflen) {
-       char *buffer = (char *)(buf); 
-       int size = buflen;
-       int numbytes; 
-       while (size > 0) {
-               numbytes = recv(fd, buffer, size, 0);
-               if (numbytes == -1) {
-                       perror("recv");
-                       exit(-1);
-               }
-               buffer += numbytes;
-               size -= numbytes;
-       }
+  char *buffer = (char *)(buf); 
+  int size = buflen;
+  int numbytes; 
+  while (size > 0) {
+    numbytes = recv(fd, buffer, size, 0);
+    if (numbytes == -1) {
+      perror("recv");
+      exit(-1);
+    }
+    buffer += numbytes;
+    size -= numbytes;
+  }
 }
 
 int recv_data_errorcode(int fd , void *buf, int buflen) {
@@ -90,75 +90,57 @@ int recv_data_errorcode(int fd , void *buf, int buflen) {
   return 0;
 }
 
-void printhex(unsigned char *ptr, int numBytes)
-{
-       int i;
-       for (i = 0; i < numBytes; i++)
-       {
-               if (ptr[i] < 16)
-                       printf("0%x ", ptr[i]);
-               else
-                       printf("%x ", ptr[i]);
-       }
-       printf("\n");
-       return;
+void printhex(unsigned char *ptr, int numBytes) {
+  int i;
+  for (i = 0; i < numBytes; i++) {
+    if (ptr[i] < 16)
+      printf("0%x ", ptr[i]);
+    else
+      printf("%x ", ptr[i]);
+  }
+  printf("\n");
+  return;
 }
 
 inline int arrayLength(int *array) {
-       int i;
-       for(i=0 ;array[i] != -1; i++)
-               ;
-       return i;
+  int i;
+  for(i=0 ;array[i] != -1; i++)
+    ;
+  return i;
 }
+
 inline int findmax(int *array, int arraylength) {
-       int max, i;
-       max = array[0];
-       for(i = 0; i < arraylength; i++){
-               if(array[i] > max) {
-                       max = array[i];
-               }
-       }
-       return max;
+  int max, i;
+  max = array[0];
+  for(i = 0; i < arraylength; i++){
+    if(array[i] > max) {
+      max = array[i];
+    }
+  }
+  return max;
 }
+
 /* This function is a prefetch call generated by the compiler that
  * populates the shared primary prefetch queue*/
 void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
   /* Allocate for the queue node*/
-  if(ntuples > 0) {
-    int qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
-    char * node;
-    
-    if((node = calloc(1, qnodesize)) == NULL) {
-      printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
-      return;
-    } else {
-      /* Set queue node values */
-      int len = sizeof(prefetchqelem_t);
-      int i;
-      unsigned int *narray;
-      unsigned short *narray2;
-      short * narray3;
-      int top=endoffsets[ntuples-1];
-      *((int *)(node+len))=ntuples;
-      len += sizeof(int);
-      narray=(unsigned int *)(node+len);
-      narray2=(unsigned short *)(narray+ntuples);
-      narray3=(short *)(narray2+ntuples);
-      
-      for(i=0;i<ntuples;i++) {
-       narray[i]=oids[i];
-       narray2[i]=endoffsets[i];
-      }
-      for(i=0;i<top;i++) {
-       narray3[i]=arrayfields[i];
-      }
-      /* Lock and insert into primary prefetch queue */
-      pthread_mutex_lock(&pqueue.qlock);
-      pre_enqueue((prefetchqelem_t *)node);
-      pthread_cond_signal(&pqueue.qcond);
-      pthread_mutex_unlock(&pqueue.qlock);
-    }
-  }
+  int qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
+  char * node= malloc(qnodesize);
+  /* Set queue node values */
+  int len = sizeof(prefetchqelem_t);
+  int top=endoffsets[ntuples-1];
+  *((int *)(node+len))=ntuples;
+  len += sizeof(int);
+  
+  memcpy(node+len, oids, ntuples*sizeof(unsigned int));
+  memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short));
+  memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
+
+  /* Lock and insert into primary prefetch queue */
+  pthread_mutex_lock(&pqueue.qlock);
+  pre_enqueue((prefetchqelem_t *)node);
+  pthread_cond_signal(&pqueue.qcond);
+  pthread_mutex_unlock(&pqueue.qlock);
 }
 
 /* This function starts up the transaction runtime. */
@@ -194,97 +176,96 @@ int dstmStartup(const char * option) {
 
 //TODO Use this later
 void *pCacheAlloc(objstr_t *store, unsigned int size) {
-       void *tmp;
-       objstr_t *ptr;
-       ptr = store;
-       int success = 0;
-
-       while(ptr->next != NULL) {
-               /* check if store is empty */
-               if(((unsigned int)ptr->top - (unsigned int)ptr - sizeof(objstr_t) + size) <= ptr->size) {
-                       tmp = ptr->top;
-                       ptr->top += size;
-                       success = 1;
-                       return tmp;
-               } else {
-                       ptr = ptr-> next;
-               }
-       }
-
-       if(success == 0) {
-               return NULL;
-       }
+  void *tmp;
+  objstr_t *ptr;
+  ptr = store;
+  int success = 0;
+  
+  while(ptr->next != NULL) {
+    /* check if store is empty */
+    if(((unsigned int)ptr->top - (unsigned int)ptr - sizeof(objstr_t) + size) <= ptr->size) {
+      tmp = ptr->top;
+      ptr->top += size;
+      success = 1;
+      return tmp;
+    } else {
+      ptr = ptr-> next;
+    }
+  }
+  
+  if(success == 0) {
+    return NULL;
+  }
 }
 
-/* This function initiates the prefetch thread
- * A queue is shared between the main thread of execution
- * and the prefetch thread to process the prefetch call
- * Call from compiler populates the shared queue with prefetch requests while prefetch thread
- * processes the prefetch requests */
-void transInit() {
-       int t, rc;
-       int retval;
-       //Create and initialize prefetch cache structure
-       prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE);
-       //prefetchcache->next = objstrCreate(PREFETCH_CACHE_SIZE);
-       //prefetchcache->next->next = objstrCreate(PREFETCH_CACHE_SIZE);
-
-       /* Initialize attributes for mutex */
-       pthread_mutexattr_init(&prefetchcache_mutex_attr);
-       pthread_mutexattr_settype(&prefetchcache_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
-       
-       pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr);
-
-       //Create prefetch cache lookup table
-       if(prehashCreate(HASH_SIZE, LOADFACTOR))
-               return; //Failure
-
-       //Initialize primary shared queue
-       queueInit();
-       //Initialize machine pile w/prefetch oids and offsets shared queue
-       mcpileqInit();
+/* This function initiates the prefetch thread A queue is shared
+ * between the main thread of execution and the prefetch thread to
+ * process the prefetch call Call from compiler populates the shared
+ * queue with prefetch requests while prefetch thread processes the
+ * prefetch requests */
 
-       //Create the primary prefetch thread 
-       do {
-         retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
-       } while(retval!=0);
-       pthread_detach(tPrefetch);
+void transInit() {
+  int t, rc;
+  int retval;
+  //Create and initialize prefetch cache structure
+  prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE);
+  
+  /* Initialize attributes for mutex */
+  pthread_mutexattr_init(&prefetchcache_mutex_attr);
+  pthread_mutexattr_settype(&prefetchcache_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
+  
+  pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr);
+  
+  //Create prefetch cache lookup table
+  if(prehashCreate(HASH_SIZE, LOADFACTOR)) {
+    printf("ERROR\n");
+    return; //Failure
+  }
+  
+  //Initialize primary shared queue
+  queueInit();
+  //Initialize machine pile w/prefetch oids and offsets shared queue
+  mcpileqInit();
+  
+  //Create the primary prefetch thread 
+  do {
+    retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
+  } while(retval!=0);
+  pthread_detach(tPrefetch);
 }
 
 /* This function stops the threads spawned */
 void transExit() {
-       int t;
-       pthread_cancel(tPrefetch);
-       for(t = 0; t < NUM_THREADS; t++)
-               pthread_cancel(wthreads[t]);
-
-       return;
+  int t;
+  pthread_cancel(tPrefetch);
+  for(t = 0; t < NUM_THREADS; t++)
+    pthread_cancel(wthreads[t]);
+  
+  return;
 }
 
 /* This functions inserts randowm wait delays in the order of msec
  * Mostly used when transaction commits retry*/
-void randomdelay()
-{
-       struct timespec req;
-       time_t t;
-
-       t = time(NULL);
-       req.tv_sec = 0;
-       req.tv_nsec = (long)(1000000 + (t%10000000)); //1-11 msec
-       nanosleep(&req, NULL);
-       return;
+void randomdelay() {
+  struct timespec req;
+  time_t t;
+  
+  t = time(NULL);
+  req.tv_sec = 0;
+  req.tv_nsec = (long)(1000000 + (t%10000000)); //1-11 msec
+  nanosleep(&req, NULL);
+  return;
 }
 
 /* This function initializes things required in the transaction start*/
-transrecord_t *transStart()
-{
-       transrecord_t *tmp = calloc(1, sizeof(transrecord_t));
-       tmp->cache = objstrCreate(1048576);
-       tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
+transrecord_t *transStart() {
+  transrecord_t *tmp = calloc(1, sizeof(transrecord_t));
+  tmp->cache = objstrCreate(1048576);
+  tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
 #ifdef COMPILER
-       tmp->revertlist=NULL;
+  tmp->revertlist=NULL;
 #endif
-       return tmp;
+  return tmp;
 }
 
 /* This function finds the location of the objects involved in a transaction
@@ -301,8 +282,8 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
     return NULL;
   }
   
-  /* Search local transaction cache */
   if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
+    /* Search local transaction cache */
 #ifdef COMPILER
     return &objheader[1];
 #else
@@ -321,7 +302,8 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
 #else
     return objcopy;
 #endif
-  } else if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { /* Look up in prefetch cache */
+  } else if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { 
+    /* Look up in prefetch cache */
     GETSIZE(size, tmp);
     size+=sizeof(objheader_t);
     objcopy = (objheader_t *) objstrAlloc(record->cache, size);
@@ -356,70 +338,60 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
 }
 
 /* This function creates objects in the transaction record */
-objheader_t *transCreateObj(transrecord_t *record, unsigned int size)
-{
-       objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + size));
-       tmp->notifylist = NULL;
-       OID(tmp) = getNewOID();
-       tmp->version = 1;
-       tmp->rcount = 1;
-       STATUS(tmp) = NEW;
-       chashInsert(record->lookupTable, OID(tmp), tmp);
-
+objheader_t *transCreateObj(transrecord_t *record, unsigned int size) {
+  objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + size));
+  tmp->notifylist = NULL;
+  OID(tmp) = getNewOID();
+  tmp->version = 1;
+  tmp->rcount = 1;
+  STATUS(tmp) = NEW;
+  chashInsert(record->lookupTable, OID(tmp), tmp);
+  
 #ifdef COMPILER
-       return &tmp[1]; //want space after object header
+  return &tmp[1]; //want space after object header
 #else
-       return tmp;
+  return tmp;
 #endif
 }
 
 /* This function creates machine piles based on all machines involved in a
  * transaction commit request */
 plistnode_t *createPiles(transrecord_t *record) {
-       int i = 0;
-       unsigned int size;/* Represents number of bins in the chash table */
-       chashlistnode_t *curr, *ptr, *next;
-       plistnode_t *pile = NULL;
-       unsigned int machinenum;
-       void *localmachinenum;
-       objheader_t *headeraddr;
-
-       ptr = record->lookupTable->table;
-       size = record->lookupTable->size;
-
-       for(i = 0; i < size ; i++) {
-               curr = &ptr[i];
-               /* Inner loop to traverse the linked list of the cache lookupTable */
-               while(curr != NULL) {
-                       //if the first bin in hash table is empty
-                       if(curr->key == 0) {
-                               break;
-                       }
-                       next = curr->next;
-
-                       if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
-                               printf("Error: No such oid %s, %d\n", __FILE__, __LINE__);
-                               return NULL;
-                       }
-
-                       //Get machine location for object id (and whether local or not)
-                       if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
-                               machinenum = myIpAddr;
-                       } else  if ((machinenum = lhashSearch(curr->key)) == 0) {
-                               printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
-                               return NULL;
-                       }
-
-                       //Make machine groups
-                       if ((pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements)) == NULL) {
-                               printf("pInsert error %s, %d\n", __FILE__, __LINE__);
-                               return NULL;
-                       }
-
-                       curr = next;
-               }
-       }
-       return pile; 
+  int i;
+  plistnode_t *pile = NULL;
+  unsigned int machinenum;
+  objheader_t *headeraddr;
+  chashlistnode_t * ptr = record->lookupTable->table;
+  /* Represents number of bins in the chash table */
+  unsigned int size = record->lookupTable->size;
+  
+  for(i = 0; i < size ; i++) {
+    chashlistnode_t * curr = &ptr[i];
+    /* Inner loop to traverse the linked list of the cache lookupTable */
+    while(curr != NULL) {
+      //if the first bin in hash table is empty
+      if(curr->key == 0)
+       break;
+      
+      if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
+       printf("Error: No such oid %s, %d\n", __FILE__, __LINE__);
+       return NULL;
+      }
+      
+      //Get machine location for object id (and whether local or not)
+      if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
+       machinenum = myIpAddr;
+      } else if ((machinenum = lhashSearch(curr->key)) == 0) {
+       printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
+       return NULL;
+      }
+      
+      //Make machine groups
+      pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements);
+      curr = curr->next;
+    }
+  }
+  return pile; 
 }
 
 /* This function initiates the transaction commit process
@@ -428,213 +400,186 @@ plistnode_t *createPiles(transrecord_t *record) {
  * Sends a transrequest() to each remote machines for objects found remotely 
  * and calls handleLocalReq() to process objects found locally */
 int transCommit(transrecord_t *record) {       
-       unsigned int tot_bytes_mod, *listmid;
-       plistnode_t *pile, *pile_ptr;
-       int i, j, rc, val;
-       int pilecount, offset, threadnum = 0, trecvcount = 0;
-       char control;
-       char transid[TID_LEN];
-       trans_req_data_t *tosend;
-       trans_commit_data_t transinfo;
-       static int newtid = 0;
-       char treplyctrl = 0, treplyretry = 0; /* keeps track of the common response that needs to be sent */
-       char localstat = 0;
-       thread_data_array_t *thread_data_array;
-       local_thread_data_array_t *ltdata;
-
-       do { 
-               trecvcount = 0; 
-               threadnum = 0; 
-               treplyretry = 0;
-               thread_data_array = NULL;
-               ltdata = NULL;
-
-               /* Look through all the objects in the transaction record and make piles 
-                * for each machine involved in the transaction*/
-               pile_ptr = pile = createPiles(record);
-
-               /* Create the packet to be sent in TRANS_REQUEST */
-
-               /* Count the number of participants */
-               pilecount = pCount(pile);
-
-               /* Create a list of machine ids(Participants) involved in transaction   */
-               if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
-                       printf("Calloc error %s, %d\n", __FILE__, __LINE__);
-                       return 1;
-               }               
-               pListMid(pile, listmid);
-
-
-               /* Initialize thread variables,
-                * Spawn a thread for each Participant involved in a transaction */
-               pthread_t thread[pilecount];
-               pthread_attr_t attr;                    
-               pthread_cond_t tcond;
-               pthread_mutex_t tlock;
-               pthread_mutex_t tlshrd;
-
-               if((thread_data_array = (thread_data_array_t *) calloc(pilecount, sizeof(thread_data_array_t))) == NULL) {
-                       printf("Calloc error %s, %d\n", __FILE__, __LINE__);
-                       pthread_cond_destroy(&tcond);
-                       pthread_mutex_destroy(&tlock);
-                       pDelete(pile_ptr);
-                       free(listmid);
-                       return 1;
-               }
-
-               if((ltdata = calloc(1, sizeof(local_thread_data_array_t))) == NULL) {
-                       printf("Calloc error %s, %d\n", __FILE__, __LINE__);
-                       pthread_cond_destroy(&tcond);
-                       pthread_mutex_destroy(&tlock);
-                       pDelete(pile_ptr);
-                       free(listmid);
-                       free(thread_data_array);
-                       return 1;
-               }
-
-               thread_response_t rcvd_control_msg[pilecount];  /* Shared thread array that keeps track of responses of participants */
-
-               /* Initialize and set thread detach attribute */
-               pthread_attr_init(&attr);
-               pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
-               pthread_mutex_init(&tlock, NULL);
-               pthread_cond_init(&tcond, NULL);
-
-               /* Process each machine pile */
-               while(pile != NULL) {
-                       //Create transaction id
-                       newtid++;
-                       if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
-                               printf("Calloc error %s, %d\n", __FILE__, __LINE__);
-                               pthread_cond_destroy(&tcond);
-                               pthread_mutex_destroy(&tlock);
-                               pDelete(pile_ptr);
-                               free(listmid);
-                               free(thread_data_array);
-                               free(ltdata);
-                               return 1;
-                       }
-                       tosend->f.control = TRANS_REQUEST;
-                       sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid);
-                       tosend->f.mcount = pilecount;
-                       tosend->f.numread = pile->numread;
-                       tosend->f.nummod = pile->nummod;
-                       tosend->f.numcreated = pile->numcreated;
-                       tosend->f.sum_bytes = pile->sum_bytes;
-                       tosend->listmid = listmid;
-                       tosend->objread = pile->objread;
-                       tosend->oidmod = pile->oidmod;
-                       tosend->oidcreated = pile->oidcreated;
-                       thread_data_array[threadnum].thread_id = threadnum;
-                       thread_data_array[threadnum].mid = pile->mid;
-                       thread_data_array[threadnum].buffer = tosend;
-                       thread_data_array[threadnum].recvmsg = rcvd_control_msg;
-                       thread_data_array[threadnum].threshold = &tcond;
-                       thread_data_array[threadnum].lock = &tlock;
-                       thread_data_array[threadnum].count = &trecvcount;
-                       thread_data_array[threadnum].replyctrl = &treplyctrl;
-                       thread_data_array[threadnum].replyretry = &treplyretry;
-                       thread_data_array[threadnum].rec = record;
-                       /* If local do not create any extra connection */
-                       if(pile->mid != myIpAddr) { /* Not local */
-                               do {
-                                       rc = pthread_create(&thread[threadnum], &attr, transRequest, (void *) &thread_data_array[threadnum]);  
-                               } while(rc!=0);
-                               if(rc) {
-                                       perror("Error in pthread create\n");
-                                       pthread_cond_destroy(&tcond);
-                                       pthread_mutex_destroy(&tlock);
-                                       pDelete(pile_ptr);
-                                       free(listmid);
-                                       for (i = 0; i < threadnum; i++)
-                                               free(thread_data_array[i].buffer);
-                                       free(thread_data_array);
-                                       free(ltdata);
-                                       return 1;
-                               }
-                       } else { /*Local*/
-                               ltdata->tdata = &thread_data_array[threadnum];
-                               ltdata->transinfo = &transinfo;
-                               do {
-                                       val = pthread_create(&thread[threadnum], &attr, handleLocalReq, (void *) ltdata);
-                               } while(val!=0);
-                               if(val) {
-                                       perror("Error in pthread create\n");
-                                       pthread_cond_destroy(&tcond);
-                                       pthread_mutex_destroy(&tlock);
-                                       pDelete(pile_ptr);
-                                       free(listmid);
-                                       for (i = 0; i < threadnum; i++)
-                                               free(thread_data_array[i].buffer);
-                                       free(thread_data_array);
-                                       free(ltdata);
-                                       return 1;
-                               }
-                       }
-
-                       threadnum++;            
-                       pile = pile->next;
-               }
-               /* Free attribute and wait for the other threads */
-               pthread_attr_destroy(&attr);
-
-               for (i = 0; i < threadnum; i++) {
-                       rc = pthread_join(thread[i], NULL);
-                       if(rc)
-                       {
-                               printf("Error: return code from pthread_join() is %d\n", rc);
-                               pthread_cond_destroy(&tcond);
-                               pthread_mutex_destroy(&tlock);
-                               pDelete(pile_ptr);
-                               free(listmid);
-                               for (j = i; j < threadnum; j++) {
-                                       free(thread_data_array[j].buffer);
-                               }
-                               return 1;
-                       }
-                       free(thread_data_array[i].buffer);
-               }
-
-               /* Free resources */    
-               pthread_cond_destroy(&tcond);
-               pthread_mutex_destroy(&tlock);
-               free(listmid);
-               pDelete(pile_ptr);
-
-               /* wait a random amount of time before retrying to commit transaction*/
-               if(treplyretry == 1) {
-                       free(thread_data_array);
-                       free(ltdata);
-                       randomdelay();
-               }
-
-       /* Retry trans commit procedure during soft_abort case */
-       } while (treplyretry == 1);
-
-
-       if(treplyctrl == TRANS_ABORT) {
-               /* Free Resources */
-               objstrDelete(record->cache);
-               chashDelete(record->lookupTable);
-               free(record);
-               free(thread_data_array);
-               free(ltdata);
-               return TRANS_ABORT;
-       } else if(treplyctrl == TRANS_COMMIT) {
-               /* Free Resources */
-               objstrDelete(record->cache);
-               chashDelete(record->lookupTable);
-               free(record);
-               free(thread_data_array);
-               free(ltdata);
-               return 0;
-       } else {
-               //TODO Add other cases
-               printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
-               exit(-1);
+  unsigned int tot_bytes_mod, *listmid;
+  plistnode_t *pile, *pile_ptr;
+  int i, j, rc, val;
+  int pilecount, offset, threadnum = 0, trecvcount = 0;
+  char control;
+  char transid[TID_LEN];
+  trans_req_data_t *tosend;
+  trans_commit_data_t transinfo;
+  static int newtid = 0;
+  char treplyctrl = 0, treplyretry = 0; /* keeps track of the common response that needs to be sent */
+  char localstat = 0;
+  thread_data_array_t *thread_data_array;
+  local_thread_data_array_t *ltdata;
+  
+  do { 
+    trecvcount = 0; 
+    threadnum = 0; 
+    treplyretry = 0;
+    thread_data_array = NULL;
+    ltdata = NULL;
+    
+    /* Look through all the objects in the transaction record and make piles 
+     * for each machine involved in the transaction*/
+    pile_ptr = pile = createPiles(record);
+    
+    /* Create the packet to be sent in TRANS_REQUEST */
+    
+    /* Count the number of participants */
+    pilecount = pCount(pile);
+    
+    /* Create a list of machine ids(Participants) involved in transaction      */
+    listmid = calloc(pilecount, sizeof(unsigned int));
+    pListMid(pile, listmid);
+    
+    
+    /* Initialize thread variables,
+     * Spawn a thread for each Participant involved in a transaction */
+    pthread_t thread[pilecount];
+    pthread_attr_t attr;                       
+    pthread_cond_t tcond;
+    pthread_mutex_t tlock;
+    pthread_mutex_t tlshrd;
+    
+    thread_data_array = (thread_data_array_t *) calloc(pilecount, sizeof(thread_data_array_t));
+    
+    ltdata = calloc(1, sizeof(local_thread_data_array_t));
+    
+    thread_response_t rcvd_control_msg[pilecount];     /* Shared thread array that keeps track of responses of participants */
+    
+    /* Initialize and set thread detach attribute */
+    pthread_attr_init(&attr);
+    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+    pthread_mutex_init(&tlock, NULL);
+    pthread_cond_init(&tcond, NULL);
+    
+    /* Process each machine pile */
+    while(pile != NULL) {
+      //Create transaction id
+      newtid++;
+      tosend = calloc(1, sizeof(trans_req_data_t));
+      tosend->f.control = TRANS_REQUEST;
+      sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid);
+      tosend->f.mcount = pilecount;
+      tosend->f.numread = pile->numread;
+      tosend->f.nummod = pile->nummod;
+      tosend->f.numcreated = pile->numcreated;
+      tosend->f.sum_bytes = pile->sum_bytes;
+      tosend->listmid = listmid;
+      tosend->objread = pile->objread;
+      tosend->oidmod = pile->oidmod;
+      tosend->oidcreated = pile->oidcreated;
+      thread_data_array[threadnum].thread_id = threadnum;
+      thread_data_array[threadnum].mid = pile->mid;
+      thread_data_array[threadnum].buffer = tosend;
+      thread_data_array[threadnum].recvmsg = rcvd_control_msg;
+      thread_data_array[threadnum].threshold = &tcond;
+      thread_data_array[threadnum].lock = &tlock;
+      thread_data_array[threadnum].count = &trecvcount;
+      thread_data_array[threadnum].replyctrl = &treplyctrl;
+      thread_data_array[threadnum].replyretry = &treplyretry;
+      thread_data_array[threadnum].rec = record;
+      /* If local do not create any extra connection */
+      if(pile->mid != myIpAddr) { /* Not local */
+       do {
+         rc = pthread_create(&thread[threadnum], &attr, transRequest, (void *) &thread_data_array[threadnum]);  
+       } while(rc!=0);
+       if(rc) {
+         perror("Error in pthread create\n");
+         pthread_cond_destroy(&tcond);
+         pthread_mutex_destroy(&tlock);
+         pDelete(pile_ptr);
+         free(listmid);
+         for (i = 0; i < threadnum; i++)
+           free(thread_data_array[i].buffer);
+         free(thread_data_array);
+         free(ltdata);
+         return 1;
        }
-       return 0;
+      } else { /*Local*/
+       ltdata->tdata = &thread_data_array[threadnum];
+       ltdata->transinfo = &transinfo;
+       do {
+         val = pthread_create(&thread[threadnum], &attr, handleLocalReq, (void *) ltdata);
+       } while(val!=0);
+       if(val) {
+         perror("Error in pthread create\n");
+         pthread_cond_destroy(&tcond);
+         pthread_mutex_destroy(&tlock);
+         pDelete(pile_ptr);
+         free(listmid);
+         for (i = 0; i < threadnum; i++)
+           free(thread_data_array[i].buffer);
+         free(thread_data_array);
+         free(ltdata);
+         return 1;
+       }
+      }
+      
+      threadnum++;             
+      pile = pile->next;
+    }
+    /* Free attribute and wait for the other threads */
+    pthread_attr_destroy(&attr);
+    
+    for (i = 0; i < threadnum; i++) {
+      rc = pthread_join(thread[i], NULL);
+      if(rc)
+       {
+         printf("Error: return code from pthread_join() is %d\n", rc);
+         pthread_cond_destroy(&tcond);
+         pthread_mutex_destroy(&tlock);
+         pDelete(pile_ptr);
+         free(listmid);
+         for (j = i; j < threadnum; j++) {
+           free(thread_data_array[j].buffer);
+         }
+         return 1;
+       }
+      free(thread_data_array[i].buffer);
+    }
+    
+    /* Free resources */       
+    pthread_cond_destroy(&tcond);
+    pthread_mutex_destroy(&tlock);
+    free(listmid);
+    pDelete(pile_ptr);
+    
+    /* wait a random amount of time before retrying to commit transaction*/
+    if(treplyretry) {
+      free(thread_data_array);
+      free(ltdata);
+      randomdelay();
+    }
+    
+    /* Retry trans commit procedure during soft_abort case */
+  } while (treplyretry);
+  
+  
+  if(treplyctrl == TRANS_ABORT) {
+    /* Free Resources */
+    objstrDelete(record->cache);
+    chashDelete(record->lookupTable);
+    free(record);
+    free(thread_data_array);
+    free(ltdata);
+    return TRANS_ABORT;
+  } else if(treplyctrl == TRANS_COMMIT) {
+    /* Free Resources */
+    objstrDelete(record->cache);
+    chashDelete(record->lookupTable);
+    free(record);
+    free(thread_data_array);
+    free(ltdata);
+    return 0;
+  } else {
+    //TODO Add other cases
+    printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
+    exit(-1);
+  }
+  return 0;
 }
 
 /* This function sends information involved in the transaction request 
@@ -642,196 +587,184 @@ int transCommit(transrecord_t *record) {
  * It calls decideresponse() to decide on what control message 
  * to send next to participants and sends the message using sendResponse()*/
 void *transRequest(void *threadarg) {
-       int sd, i, n;
-       struct sockaddr_in serv_addr;
-       thread_data_array_t *tdata;
-       objheader_t *headeraddr;
-       char control, recvcontrol;
-       char machineip[16], retval;
-
-       tdata = (thread_data_array_t *) threadarg;
-
-       /* Send Trans Request */
-       if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
-               perror("Error in socket for TRANS_REQUEST\n");
-               pthread_exit(NULL);
-       }
-       bzero((char*) &serv_addr, sizeof(serv_addr));
-       serv_addr.sin_family = AF_INET;
-       serv_addr.sin_port = htons(LISTEN_PORT);
-       midtoIP(tdata->mid,machineip);
-       machineip[15] = '\0';
-       serv_addr.sin_addr.s_addr = inet_addr(machineip);
-       /* Open Connection */
-       if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
-               perror("Error in connect for TRANS_REQUEST\n");
-               close(sd);
-               pthread_exit(NULL);
-       }
-
-       /* Send bytes of data with TRANS_REQUEST control message */
-       send_data(sd, &(tdata->buffer->f), sizeof(fixed_data_t));
-       
-       /* Send list of machines involved in the transaction */
-       {
-               int size=sizeof(unsigned int)*tdata->buffer->f.mcount;
-               send_data(sd, tdata->buffer->listmid, size);
-       }
-
-       /* Send oids and version number tuples for objects that are read */
-       {
-               int size=(sizeof(unsigned int)+sizeof(unsigned short))*tdata->buffer->f.numread;
-               send_data(sd, tdata->buffer->objread, size);
-       }
-
-       /* Send objects that are modified */
-       for(i = 0; i < tdata->buffer->f.nummod ; i++) {
-               int size;
-               headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
-               GETSIZE(size,headeraddr);
-               size+=sizeof(objheader_t);
-               send_data(sd, headeraddr, size);
-       }
-
-       /* Read control message from Participant */
-       recv_data(sd, &control, sizeof(char));
-       recvcontrol = control;
-
-       /* Update common data structure and increment count */
-       tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
-
-       /* Lock and update count */
-       /* Thread sleeps until all messages from pariticipants are received by coordinator */
-       pthread_mutex_lock(tdata->lock);
-
-       (*(tdata->count))++; /* keeps track of no of messages received by the coordinator */
-
-       /* Wake up the threads and invoke decideResponse (once) */
-       if(*(tdata->count) == tdata->buffer->f.mcount) {
-               decideResponse(tdata); 
-               pthread_cond_broadcast(tdata->threshold);
-       } else {
-               pthread_cond_wait(tdata->threshold, tdata->lock);
-       }
-       pthread_mutex_unlock(tdata->lock);
-
-       /* Send the final response such as TRANS_COMMIT or TRANS_ABORT t
-        * to all participants in their respective socket */
-       if (sendResponse(tdata, sd) == 0) { 
-               printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
-               close(sd);
-               pthread_exit(NULL);
-       }
-
-       recv_data((int)sd, &control, sizeof(char)); 
-
-       if(control == TRANS_UNSUCESSFUL) {
-               //printf("DEBUG-> TRANS_ABORTED\n");
-       } else if(control == TRANS_SUCESSFUL) {
-               //printf("DEBUG-> TRANS_SUCCESSFUL\n");
-       } else {
-               //printf("DEBUG-> Error: Incorrect Transaction End Message %d\n", control);
-       }
-
-       /* Close connection */
-       close(sd);
-       pthread_exit(NULL);
+  int sd, i, n;
+  struct sockaddr_in serv_addr;
+  thread_data_array_t *tdata;
+  objheader_t *headeraddr;
+  char control, recvcontrol;
+  char machineip[16], retval;
+  
+  tdata = (thread_data_array_t *) threadarg;
+  
+  /* Send Trans Request */
+  if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+    perror("Error in socket for TRANS_REQUEST\n");
+    pthread_exit(NULL);
+  }
+  bzero((char*) &serv_addr, sizeof(serv_addr));
+  serv_addr.sin_family = AF_INET;
+  serv_addr.sin_port = htons(LISTEN_PORT);
+  midtoIP(tdata->mid,machineip);
+  machineip[15] = '\0';
+  serv_addr.sin_addr.s_addr = inet_addr(machineip);
+  /* Open Connection */
+  if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
+    perror("Error in connect for TRANS_REQUEST\n");
+    close(sd);
+    pthread_exit(NULL);
+  }
+  
+  /* Send bytes of data with TRANS_REQUEST control message */
+  send_data(sd, &(tdata->buffer->f), sizeof(fixed_data_t));
+  
+  /* Send list of machines involved in the transaction */
+  {
+    int size=sizeof(unsigned int)*tdata->buffer->f.mcount;
+    send_data(sd, tdata->buffer->listmid, size);
+  }
+  
+  /* Send oids and version number tuples for objects that are read */
+  {
+    int size=(sizeof(unsigned int)+sizeof(unsigned short))*tdata->buffer->f.numread;
+    send_data(sd, tdata->buffer->objread, size);
+  }
+  
+  /* Send objects that are modified */
+  for(i = 0; i < tdata->buffer->f.nummod ; i++) {
+    int size;
+    headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
+    GETSIZE(size,headeraddr);
+    size+=sizeof(objheader_t);
+    send_data(sd, headeraddr, size);
+  }
+  
+  /* Read control message from Participant */
+  recv_data(sd, &control, sizeof(char));
+  recvcontrol = control;
+  
+  /* Update common data structure and increment count */
+  tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
+  
+  /* Lock and update count */
+  /* Thread sleeps until all messages from pariticipants are received by coordinator */
+  pthread_mutex_lock(tdata->lock);
+  
+  (*(tdata->count))++; /* keeps track of no of messages received by the coordinator */
+  
+  /* Wake up the threads and invoke decideResponse (once) */
+  if(*(tdata->count) == tdata->buffer->f.mcount) {
+    decideResponse(tdata); 
+    pthread_cond_broadcast(tdata->threshold);
+  } else {
+    pthread_cond_wait(tdata->threshold, tdata->lock);
+  }
+  pthread_mutex_unlock(tdata->lock);
+  
+  /* Send the final response such as TRANS_COMMIT or TRANS_ABORT t
+   * to all participants in their respective socket */
+  if (sendResponse(tdata, sd) == 0) { 
+    printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
+    close(sd);
+    pthread_exit(NULL);
+  }
+  
+  recv_data((int)sd, &control, sizeof(char)); 
+  
+  if(control == TRANS_UNSUCESSFUL) {
+    //printf("DEBUG-> TRANS_ABORTED\n");
+  } else if(control == TRANS_SUCESSFUL) {
+    //printf("DEBUG-> TRANS_SUCCESSFUL\n");
+  } else {
+    //printf("DEBUG-> Error: Incorrect Transaction End Message %d\n", control);
+  }
+  
+  /* Close connection */
+  close(sd);
+  pthread_exit(NULL);
 }
 
 /* This function decides the reponse that needs to be sent to 
  * all Participant machines after the TRANS_REQUEST protocol */
 void decideResponse(thread_data_array_t *tdata) {
-       char control;
-       int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
-                                                                        message to send */
-
-       for (i = 0 ; i < tdata->buffer->f.mcount; i++) {
-               control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses
-                                                          written onto the shared array */
-               switch(control) {
-                       default:
-                               printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
-                               /* treat as disagree, pass thru */
-                       case TRANS_DISAGREE:
-                               transdisagree++;
-                               break;
-
-                       case TRANS_AGREE:
-                               transagree++;
-                               break;
-
-                       case TRANS_SOFT_ABORT:
-                               transsoftabort++;
-                               break;
-               }
-       }
+  char control;
+  int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
+                                                                  message to send */
+  
+  for (i = 0 ; i < tdata->buffer->f.mcount; i++) {
+    control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses
+                                              written onto the shared array */
+    switch(control) {
+    default:
+      printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
+      /* treat as disagree, pass thru */
+    case TRANS_DISAGREE:
+      transdisagree++;
+      break;
+      
+    case TRANS_AGREE:
+      transagree++;
+      break;
+      
+    case TRANS_SOFT_ABORT:
+      transsoftabort++;
+      break;
+    }
+  }
+  
+  if(transdisagree > 0) {
+    /* Send Abort */
+    *(tdata->replyctrl) = TRANS_ABORT;
+    *(tdata->replyretry) = 0;
+    /* clear objects from prefetch cache */
+    for (i = 0; i < tdata->buffer->f.numread; i++)
+      prehashRemove(*((unsigned int *)(((char *)tdata->buffer->objread) + (sizeof(unsigned int) + sizeof(unsigned short))*i)));
+    for (i = 0; i < tdata->buffer->f.nummod; i++)
+      prehashRemove(tdata->buffer->oidmod[i]);
+  } else if(transagree == tdata->buffer->f.mcount){
+    /* Send Commit */
+    *(tdata->replyctrl) = TRANS_COMMIT;
+    *(tdata->replyretry) = 0;
+  } else { 
+    /* Send Abort in soft abort case followed by retry commiting transaction again*/
+    *(tdata->replyctrl) = TRANS_ABORT;
+    *(tdata->replyretry) = 1;
+  }
+  
+  return;
+}
 
-       if(transdisagree > 0) {
-               /* Send Abort */
-               *(tdata->replyctrl) = TRANS_ABORT;
-               *(tdata->replyretry) = 0;
-               /* clear objects from prefetch cache */
-               for (i = 0; i < tdata->buffer->f.numread; i++)
-                       prehashRemove(*((unsigned int *)(((char *)tdata->buffer->objread) + (sizeof(unsigned int) + sizeof(unsigned short))*i)));
-               for (i = 0; i < tdata->buffer->f.nummod; i++)
-                       prehashRemove(tdata->buffer->oidmod[i]);
-       } else if(transagree == tdata->buffer->f.mcount){
-               /* Send Commit */
-               *(tdata->replyctrl) = TRANS_COMMIT;
-               *(tdata->replyretry) = 0;
-       } else { 
-               /* Send Abort in soft abort case followed by retry commiting transaction again*/
-               *(tdata->replyctrl) = TRANS_ABORT;
-               *(tdata->replyretry) = 1;
-       }
+/* This function sends the final response to remote machines per
+ * thread in their respective socket id It returns a char that is only
+ * needed to check the correctness of execution of this function
+ * inside transRequest()*/
 
-       return;
-}
-/* This function sends the final response to remote machines per thread in their respective socket id 
- * It returns a char that is only needed to check the correctness of execution of this function inside
- * transRequest()*/
 char sendResponse(thread_data_array_t *tdata, int sd) {
-       int n, size, sum, oidcount = 0, control;
-       char *ptr, retval = 0;
-       unsigned int *oidnotfound;
-
-       control = *(tdata->replyctrl);
-       send_data(sd, &control, sizeof(char));
-
-       //TODO read missing objects during object migration
-       /* If response is a soft abort due to missing objects at the Participant's side */
-       /*
-       if(tdata->recvmsg[tdata->thread_id].rcv_status == TRANS_SOFT_ABORT) {
-               // Read list of objects missing  
-               recv_data(sd, &oidcount, sizeof(int));
-               //if((read(sd, &oidcount, sizeof(int)) != 0) && (oidcount != 0)) {
-               if(oidcount != 0) {
-                       size = oidcount * sizeof(unsigned int);
-                       if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
-                               printf("Calloc error %s, %d\n", __FILE__, __LINE__);
-                               return 0;
-                       }
-                       ptr = (char *) oidnotfound;
-                       recv_data(sd, ptr, size);
-               }
-               retval =  TRANS_SOFT_ABORT;
-       }
-       */
-
-       /* If the decided response is TRANS_ABORT */
-       if(*(tdata->replyctrl) == TRANS_ABORT) {
-               retval = TRANS_ABORT;
-       } else if(*(tdata->replyctrl) == TRANS_COMMIT) { /* If the decided response is TRANS_COMMIT */ 
-               retval = TRANS_COMMIT;
-       }
-       
-       return retval;
+  int n, size, sum, oidcount = 0, control;
+  char *ptr, retval = 0;
+  unsigned int *oidnotfound;
+  
+  control = *(tdata->replyctrl);
+  send_data(sd, &control, sizeof(char));
+  
+  //TODO read missing objects during object migration
+  /* If response is a soft abort due to missing objects at the
+     Participant's side */
+  
+  /* If the decided response is TRANS_ABORT */
+  if(*(tdata->replyctrl) == TRANS_ABORT) {
+    retval = TRANS_ABORT;
+  } else if(*(tdata->replyctrl) == TRANS_COMMIT) { 
+    /* If the decided response is TRANS_COMMIT */ 
+    retval = TRANS_COMMIT;
+  }
+  
+  return retval;
 }
 
-/* This function opens a connection, places an object read request to the 
- * remote machine, reads the control message and object if available  and 
- * copies the object and its header to the local cache.
- * */ 
+/* This function opens a connection, places an object read request to
+ * the remote machine, reads the control message and object if
+ * available and copies the object and its header to the local
+ * cache. */ 
 
 void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
   int size, val;
@@ -867,148 +800,150 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
   return objcopy;
 }
 
-/* This function handles the local objects involved in a transaction commiting process.
- * It also makes a decision if this local machine sends AGREE or DISAGREE or SOFT_ABORT to coordinator.
- * Note Coordinator = local machine
- * It wakes up the other threads from remote participants that are waiting for the coordinator's decision and
- * based on common agreement it either commits or aborts the transaction.
- * It also frees the memory resources */
-void *handleLocalReq(void *threadarg) {
-       unsigned int *oidnotfound = NULL, *oidlocked = NULL;
-       local_thread_data_array_t *localtdata;
-       int objnotfound = 0, objlocked = 0; 
-       int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
-       int numread, i;
-       unsigned int oid;
-       unsigned short version;
-       void *mobj;
-       objheader_t *headptr;
-
-       localtdata = (local_thread_data_array_t *) threadarg;
-
-       /* Counters and arrays to formulate decision on control message to be sent */
-       oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
-       oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
-
-       numread = localtdata->tdata->buffer->f.numread;
-       /* Process each oid in the machine pile/ group per thread */
-       for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) {
-               if (i < localtdata->tdata->buffer->f.numread) {
-                       int incr = sizeof(unsigned int) + sizeof(unsigned short);// Offset that points to next position in the objread array
-                       incr *= i;
-                       oid = *((unsigned int *)(((char *)localtdata->tdata->buffer->objread) + incr));
-                       version = *((unsigned short *)(((char *)localtdata->tdata->buffer->objread) + incr + sizeof(unsigned int)));
-               } else { // Objects Modified
-                       int tmpsize;
-                       headptr = (objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i-numread]);
-                       if (headptr == NULL) {
-                               printf("Error: handleLocalReq() returning NULL %s, %d\n", __FILE__, __LINE__);
-                               return NULL;
-                       }
-                       oid = OID(headptr);
-                       version = headptr->version;
-               }
-               /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
-
-               /* Save the oids not found and number of oids not found for later use */
-               if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
-                       /* Save the oids not found and number of oids not found for later use */
-                       oidnotfound[objnotfound] = oid;
-                       objnotfound++;
-               } else { /* If Obj found in machine (i.e. has not moved) */
-                       /* Check if Obj is locked by any previous transaction */
-                       if ((STATUS((objheader_t *)mobj) & LOCK) == LOCK) {
-                               if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */ 
-                                       v_matchlock++;
-                               } else {/* If versions don't match ...HARD ABORT */
-                                       v_nomatch++;
-                                       /* Send TRANS_DISAGREE to Coordinator */
-                                       localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
-                               }
-                       } else {/* If Obj is not locked then lock object */
-                               STATUS(((objheader_t *)mobj)) |= LOCK;
-                               /* Save all object oids that are locked on this machine during this transaction request call */
-                               oidlocked[objlocked] = OID(((objheader_t *)mobj));
-                               objlocked++;
-                               if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
-                                       v_matchnolock++;
-                               } else { /* If versions don't match ...HARD ABORT */
-                                       v_nomatch++;
-                                       /* Send TRANS_DISAGREE to Coordinator */
-                                       localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
-                               }
-                       }
-               }
-       } // End for
-       /* Condition to send TRANS_AGREE */
-       if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) {
-               localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
-       }
-       /* Condition to send TRANS_SOFT_ABORT */
-       if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
-               localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT;
-       }
+/* This function handles the local objects involved in a transaction
+ * commiting process.  It also makes a decision if this local machine
+ * sends AGREE or DISAGREE or SOFT_ABORT to coordinator.  Note
+ * Coordinator = local machine It wakes up the other threads from
+ * remote participants that are waiting for the coordinator's decision
+ * and based on common agreement it either commits or aborts the
+ * transaction.  It also frees the memory resources */
 
-       /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
-        * if Participant receives a TRANS_COMMIT */
-       localtdata->transinfo->objlocked = oidlocked;
-       localtdata->transinfo->objnotfound = oidnotfound;
-       localtdata->transinfo->modptr = NULL;
-       localtdata->transinfo->numlocked = objlocked;
-       localtdata->transinfo->numnotfound = objnotfound;
-       /* Lock and update count */
-       //Thread sleeps until all messages from pariticipants are received by coordinator
-       pthread_mutex_lock(localtdata->tdata->lock);
-       (*(localtdata->tdata->count))++; /* keeps track of no of messages received by the coordinator */
-
-       /* Wake up the threads and invoke decideResponse (once) */
-       if(*(localtdata->tdata->count) == localtdata->tdata->buffer->f.mcount) {
-               decideResponse(localtdata->tdata); 
-               pthread_cond_broadcast(localtdata->tdata->threshold);
-       } else {
-               pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock);
-       }
-       pthread_mutex_unlock(localtdata->tdata->lock);
-       if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
-               if(transAbortProcess(localtdata) != 0) {
-                       printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
-                       pthread_exit(NULL);
-               }
-       } else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT) {
-               if(transComProcess(localtdata) != 0) {
-                       printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
-                       pthread_exit(NULL);
-               }
-       }
-       /* Free memory */
-       if (localtdata->transinfo->objlocked != NULL) {
-               free(localtdata->transinfo->objlocked);
+void *handleLocalReq(void *threadarg) {
+  unsigned int *oidnotfound = NULL, *oidlocked = NULL;
+  local_thread_data_array_t *localtdata;
+  int objnotfound = 0, objlocked = 0; 
+  int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
+  int numread, i;
+  unsigned int oid;
+  unsigned short version;
+  void *mobj;
+  objheader_t *headptr;
+  
+  localtdata = (local_thread_data_array_t *) threadarg;
+  
+  /* Counters and arrays to formulate decision on control message to be sent */
+  oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
+  oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
+  
+  numread = localtdata->tdata->buffer->f.numread;
+  /* Process each oid in the machine pile/ group per thread */
+  for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) {
+    if (i < localtdata->tdata->buffer->f.numread) {
+      int incr = sizeof(unsigned int) + sizeof(unsigned short);// Offset that points to next position in the objread array
+      incr *= i;
+      oid = *((unsigned int *)(((char *)localtdata->tdata->buffer->objread) + incr));
+      version = *((unsigned short *)(((char *)localtdata->tdata->buffer->objread) + incr + sizeof(unsigned int)));
+    } else { // Objects Modified
+      int tmpsize;
+      headptr = (objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i-numread]);
+      if (headptr == NULL) {
+       printf("Error: handleLocalReq() returning NULL %s, %d\n", __FILE__, __LINE__);
+       return NULL;
+      }
+      oid = OID(headptr);
+      version = headptr->version;
+    }
+    /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
+    
+    /* Save the oids not found and number of oids not found for later use */
+    if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
+      /* Save the oids not found and number of oids not found for later use */
+      oidnotfound[objnotfound] = oid;
+      objnotfound++;
+    } else { /* If Obj found in machine (i.e. has not moved) */
+      /* Check if Obj is locked by any previous transaction */
+      if ((STATUS((objheader_t *)mobj) & LOCK) == LOCK) {
+       if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */ 
+         v_matchlock++;
+       } else {/* If versions don't match ...HARD ABORT */
+         v_nomatch++;
+         /* Send TRANS_DISAGREE to Coordinator */
+         localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
        }
-       if (localtdata->transinfo->objnotfound != NULL) {
-               free(localtdata->transinfo->objnotfound);
+      } else {/* If Obj is not locked then lock object */
+       STATUS(((objheader_t *)mobj)) |= LOCK;
+       /* Save all object oids that are locked on this machine during this transaction request call */
+       oidlocked[objlocked] = OID(((objheader_t *)mobj));
+       objlocked++;
+       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
+         v_matchnolock++;
+       } else { /* If versions don't match ...HARD ABORT */
+         v_nomatch++;
+         /* Send TRANS_DISAGREE to Coordinator */
+         localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
        }
-
-       pthread_exit(NULL);
+      }
+    }
+  } // End for
+  /* Condition to send TRANS_AGREE */
+  if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) {
+    localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
+  }
+  /* Condition to send TRANS_SOFT_ABORT */
+  if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
+    localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT;
+  }
+  
+  /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
+   * if Participant receives a TRANS_COMMIT */
+  localtdata->transinfo->objlocked = oidlocked;
+  localtdata->transinfo->objnotfound = oidnotfound;
+  localtdata->transinfo->modptr = NULL;
+  localtdata->transinfo->numlocked = objlocked;
+  localtdata->transinfo->numnotfound = objnotfound;
+  /* Lock and update count */
+  //Thread sleeps until all messages from pariticipants are received by coordinator
+  pthread_mutex_lock(localtdata->tdata->lock);
+  (*(localtdata->tdata->count))++; /* keeps track of no of messages received by the coordinator */
+  
+  /* Wake up the threads and invoke decideResponse (once) */
+  if(*(localtdata->tdata->count) == localtdata->tdata->buffer->f.mcount) {
+    decideResponse(localtdata->tdata); 
+    pthread_cond_broadcast(localtdata->tdata->threshold);
+  } else {
+    pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock);
+  }
+  pthread_mutex_unlock(localtdata->tdata->lock);
+  if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
+    if(transAbortProcess(localtdata) != 0) {
+      printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
+      pthread_exit(NULL);
+    }
+  } else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT) {
+    if(transComProcess(localtdata) != 0) {
+      printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
+      pthread_exit(NULL);
+    }
+  }
+  /* Free memory */
+  if (localtdata->transinfo->objlocked != NULL) {
+    free(localtdata->transinfo->objlocked);
+  }
+  if (localtdata->transinfo->objnotfound != NULL) {
+    free(localtdata->transinfo->objnotfound);
+  }
+  
+  pthread_exit(NULL);
 }
 
 /* This function completes the ABORT process if the transaction is aborting */
 int transAbortProcess(local_thread_data_array_t  *localtdata) {
-       int i, numlocked;
-       unsigned int *objlocked;
-       void *header;
-
-       numlocked = localtdata->transinfo->numlocked;
-       objlocked = localtdata->transinfo->objlocked;
-
-       for (i = 0; i < numlocked; i++) {
-               if((header = mhashSearch(objlocked[i])) == NULL) {
-                       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
-                       return 1;
-               }
-               STATUS(((objheader_t *)header)) &= ~(LOCK);
-       }
-
-       return 0;
+  int i, numlocked;
+  unsigned int *objlocked;
+  void *header;
+  
+  numlocked = localtdata->transinfo->numlocked;
+  objlocked = localtdata->transinfo->objlocked;
+  
+  for (i = 0; i < numlocked; i++) {
+    if((header = mhashSearch(objlocked[i])) == NULL) {
+      printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+      return 1;
+    }
+    STATUS(((objheader_t *)header)) &= ~(LOCK);
+  }
+  
+  return 0;
 }
 
 /*This function completes the COMMIT process is the transaction is commiting*/
@@ -1075,371 +1010,157 @@ int transComProcess(local_thread_data_array_t  *localtdata) {
        return 0;
 }
 
-/* This function checks if the prefetch oids are same and have same offsets  
- * for case x.a.b and y.a.b where x and y have same oid's
- * or if a.b.c is a subset of x.b.c.d*/ 
-/* check for case where the generated request a.y.z or x.y.z.g then 
- * prefetch needs to be generated for x.y.z.g  if oid of a and x are same*/
-void checkPrefetchTuples(prefetchqelem_t *node) {
-       int i,j, count,k, sindex, index;
-       char *ptr, *tmp;
-       int ntuples, slength;
-       unsigned int *oid;
-       unsigned short *endoffsets;
-       short *offsets; 
-
-       /* Check for the case x.y.z and a.b.c are same oids */ 
-       ptr = (char *) node;
-       ntuples = *(GET_NTUPLES(ptr));
-       oid = GET_PTR_OID(ptr);
-       endoffsets = GET_PTR_EOFF(ptr, ntuples); 
-       offsets = GET_PTR_ARRYFLD(ptr, ntuples);
-       
-       /* Find offset length for each tuple */
-       int numoffset[ntuples];
-       numoffset[0] = endoffsets[0];
-       for(i = 1; i<ntuples; i++) {
-               numoffset[i] = endoffsets[i] - endoffsets[i-1];
-       }
-       /* Check for redundant tuples by comparing oids of each tuple */
-       for(i = 0; i < ntuples; i++) {
-               if(oid[i] == 0)
-                       continue;
-               for(j = i+1 ; j < ntuples; j++) {
-                       if(oid[j] == 0)
-                               continue;
-                       /*If oids of tuples match */ 
-                       if (oid[i] == oid[j]) {
-                               /* Find the smallest offset length of two tuples*/
-                               if(numoffset[i] >  numoffset[j]){
-                                       slength = numoffset[j];
-                                       sindex = j;
-                               }
-                               else {
-                                       slength = numoffset[i];
-                                       sindex = i;
-                               }
-
-                               /* Compare the offset values based on the current indices
-                                * break if they do not match
-                                * if all offset values match then pick the largest tuple*/
-
-                               if(i == 0) {
-                                       k = 0;
-                               } else {
-                                       k = endoffsets[i-1];
-                               }
-                               index = endoffsets[j -1];
-                               for(count = 0; count < slength; count ++) {
-                                       if (offsets[k] != offsets[index]) { 
-                                               break;
-                                       }
-                                       index++;
-                                       k++;
-                               }       
-                               if(slength == count) {
-                                       oid[sindex] = 0;
-                               }
-                       }
-               }
-       }
+prefetchpile_t *foundLocal(prefetchqelem_t *node) {
+  char * ptr = (char *) node;
+  int ntuples = *(GET_NTUPLES(ptr));
+  unsigned int * oidarray = GET_PTR_OID(ptr);
+  unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples); 
+  short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
+  prefetchpile_t * head=NULL;
+  
+  int i;
+  for(i=0;i<ntuples; i++) {
+    unsigned short baseindex=(i==0)?0:endoffsets[i-1];
+    unsigned short endindex=endoffsets[i];
+    unsigned int oid=oidarray[i];
+    int newbase;
+    int machinenum;
+    //Look up fields locally
+    for(newbase=baseindex;newbase<endindex;newbase++) {
+      if (!lookupObject(&oid, arryfields[newbase]))
+       break;
+      //Ended in a null pointer...
+      if (oid==0)
+       goto tuple;
+    }
+    //Entire prefetch is local
+    if (newbase==endindex&&checkoid(oid))
+      goto tuple;
+    //Add to remote requests
+    machinenum=lhashSearch(oid);
+    insertPile(machinenum, oid, endindex-newbase, &arryfields[newbase], &head);
+  tuple:
+    ;
+  }
+  return head;
 }
-/* This function makes machine piles to be added into the machine pile queue for each prefetch call */
-prefetchpile_t *makePreGroups(prefetchqelem_t *node, int *numoffset) {
-       char *ptr;
-       int ntuples, i, machinenum, count=0;
-       unsigned int *oid;
-       unsigned short *endoffsets;
-       short *arryfields, *offset; 
-       prefetchpile_t *head = NULL, *tmp = NULL;
-
-       /* Check for the case x.y.z and a.b.c are same oids */ 
-       ptr = (char *) node;
-       ntuples = *(GET_NTUPLES(ptr));
-       oid = GET_PTR_OID(ptr);
-       endoffsets = GET_PTR_EOFF(ptr, ntuples); 
-       arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
-
-       if((head = (prefetchpile_t *) calloc(1, sizeof(prefetchpile_t))) == NULL) {
-               printf("Calloc error: %s %d\n", __FILE__, __LINE__);
-               return NULL;
-       }
-
-       /* Check for redundant tuples by comparing oids of each tuple */
-       for(i = 0; i < ntuples; i++) {
-               if(oid[i] == 0){
-                       if(head->next != NULL) {
-                               if((tmp = (prefetchpile_t *) calloc(1, sizeof(prefetchpile_t))) == NULL) {
-                                       printf("Calloc error: %s %d\n", __FILE__, __LINE__);
-                                       return NULL;
-                               }
-                               tmp->mid = myIpAddr;
-                               tmp->next = head;
-                               head = tmp;
-                       } else {
-                               head->mid = myIpAddr;
-                       }
-                       continue;
-               }
-               /* For each tuple make piles */
-               if ((machinenum = lhashSearch(oid[i])) == 0) {
-                       printf("Error: No such Machine %s, %d\n", __FILE__, __LINE__);
-                       return NULL;
-               }
-               /* Insert into machine pile */
-               if(i == 0){
-                       offset = &arryfields[0];
-               } else {
-                       offset = &arryfields[endoffsets[i-1]];
-               }
 
-               if((head = insertPile(machinenum, oid[i], numoffset[i], offset, head)) == NULL){
-                       printf("Error: Couldn't create a pile %s, %d\n", __FILE__, __LINE__);
-                       return NULL;
-               }
-       }
-
-       return head;
+int checkoid(unsigned int oid) {
+  objheader_t *header;
+  if ((header=mhashSearch(oid))!=NULL) {
+    //Found on machine
+    return 1;
+  } else if ((header=prehashSearch(oid))!=NULL) {
+    //Found in cache
+    return 1;
+  } else {
+    return 0;
+  }
 }
 
-prefetchpile_t *foundLocal(prefetchqelem_t *node) {
-       int ntuples,i, j, k, oidnfound = 0, arryfieldindex,nextarryfieldindex, flag = 0, val;
-       unsigned int *oid;
-       int isArray;
-       char *ptr, *tmp;
-       objheader_t *objheader;
-       unsigned short *endoffsets;
-       short *arryfields; 
-
-       ptr = (char *) node;
-       ntuples = *(GET_NTUPLES(ptr));
-       oid = GET_PTR_OID(ptr);
-       endoffsets = GET_PTR_EOFF(ptr, ntuples); 
-       arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
-
-       /* Find offset length for each tuple */
-       int numoffset[ntuples];//Number of offsets for each tuple
-       numoffset[0] = endoffsets[0];
-       for(i = 1; i<ntuples; i++) {
-               numoffset[i] = endoffsets[i] - endoffsets[i-1];
-       }
-
-       for(i = 0; i < ntuples; i++) { 
-               if(oid[i] == 0){
-                       if(i == 0) {
-                               arryfieldindex = 0;
-                               nextarryfieldindex =  endoffsets[0];
-                       }else {
-                               arryfieldindex = endoffsets[i-1];
-                               nextarryfieldindex =  endoffsets[i];
-                       }
-                       numoffset[i] = 0;
-                       endoffsets[0] = val = numoffset[0];
-                       for(k = 1; k < ntuples; k++) {
-                               val = val + numoffset[k];
-                               endoffsets[k] = val; 
-                       }
-                       
-                       for(k = 0; k<endoffsets[ntuples-1]; k++) {
-                               arryfields[arryfieldindex+k] = arryfields[nextarryfieldindex+k];
-                       }
-                       continue;
-               }
-
-               /* If object found locally */
-               if((objheader = (objheader_t*) mhashSearch(oid[i])) != NULL) { 
-                       isArray = 0;
-                       tmp = (char *) objheader;
-                       int orgnumoffset = numoffset[i];
-                       if(i == 0) {
-                               arryfieldindex = 0;
-                       }else {
-                               arryfieldindex = endoffsets[i-1];
-                       }
-
-                       for(j = 0; j<orgnumoffset; j++) {
-                               unsigned int objoid = 0;
-                               /* Check for arrays  */
-                               if(TYPE(objheader) > NUMCLASSES) {
-                                       isArray = 1;
-                               }
-                               if(isArray == 1) {
-                                       int elementsize = classsize[TYPE(objheader)];
-                                       struct ArrayObject *ao = (struct ArrayObject *) (tmp + sizeof(objheader_t));
-                                       int length = ao->___length___;
-                                       /* Check if array out of bounds */
-                                       if(arryfields[arryfieldindex] < 0 || arryfields[arryfieldindex] >= length) {
-                                               break; //if yes then treat the object as found 
-                                       }
-                                       objoid = *((unsigned int *)(tmp + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*arryfields[arryfieldindex])));
-                               } else {
-                                       objoid = *((unsigned int *)(tmp + sizeof(objheader_t) + arryfields[arryfieldindex]));
-                               }
-                               //Update numoffset array
-                               numoffset[i] = numoffset[i] - 1;
-                               //Update oid array
-                               oid[i] = objoid;
-                               //Update endoffset array
-                               endoffsets[0] = val = numoffset[0];
-                               for(k = 1; k < ntuples; k++) {
-                                       val = val + numoffset[k];
-                                       endoffsets[k] = val; 
-                               }
-                               //Update arrayfields array
-                               for(k = 0; k < endoffsets[ntuples-1]; k++) {
-                                       arryfields[arryfieldindex+k] = arryfields[arryfieldindex+k+1];
-                               }
-                               if((objheader = (objheader_t*) mhashSearch(oid[i])) == NULL) {
-                                       flag = 1;
-                                       checkPreCache(node, numoffset, oid[i], i); 
-                                       break;
-                               }
-                               tmp = (char *) objheader;
-                               isArray = 0;
-                       }
-                       /*If all offset oids are found locally,make the prefetch tuple invalid */
-                       if(flag == 0) {
-                               oid[i] = 0;
-                       }
-               } else {
-                       /* Look in Prefetch cache */
-                       checkPreCache(node, numoffset, oid[i],i); 
-               }
-               flag = 0;
-       }
-       
-       /* Make machine groups */
-       prefetchpile_t *head = NULL;
-       if((head = makePreGroups(node, numoffset)) == NULL) {
-               printf("Error in makePreGroups() %s %d\n", __FILE__, __LINE__);
-               return NULL;
-       }
-
-       return head;
-}
+int lookupObject(unsigned int * oid, short offset) {
+  objheader_t *header;
+  if ((header=mhashSearch(*oid))!=NULL) {
+    //Found on machine
+    ;
+  } else if ((header=prehashSearch(*oid))!=NULL) {
+    //Found in cache
+    ;
+  } else {
+    return 0;
+  }
 
-void checkPreCache(prefetchqelem_t *node, int *numoffset, unsigned int objoid, int index) {
-       char *ptr, *tmp;
-       int ntuples, i, k, flag=0, isArray =0, arryfieldindex, val;
-       unsigned int * oid;
-       unsigned short *endoffsets;
-       short *arryfields;
-       objheader_t *header;
-
-       ptr = (char *) node;
-       ntuples = *(GET_NTUPLES(ptr));
-       oid = GET_PTR_OID(ptr);
-       endoffsets = GET_PTR_EOFF(ptr, ntuples);
-       arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
-
-       if((header = (objheader_t *) prehashSearch(objoid)) == NULL) {
-               return;
-       } else { //Found in Prefetch Cache
-               //TODO Decide if object is too old, if old remove from cache
-               tmp = (char *) header;
-               int loopcount = numoffset[index];       
-               if(index == 0)
-                       arryfieldindex = 0;
-               else
-                       arryfieldindex = endoffsets[(index - 1)];
-               // Check if any of the offset oid is available in the Prefetch cache
-               for(i = 0; i < loopcount; i++) {
-                       /* Check for arrays  */
-                       if(TYPE(header) > NUMCLASSES) {
-                               isArray = 1;
-                       }
-               
-                       if(isArray == 1) {
-                               int elementsize = classsize[TYPE(header)];
-                               struct ArrayObject *ao = (struct ArrayObject *) (tmp + sizeof(objheader_t));
-                               int length = ao->___length___;
-                               /* Check if array out of bounds */
-                               if(arryfields[arryfieldindex] < 0 || arryfields[arryfieldindex] >= length) {
-                                       break; //if yes treat the object as found
-                               }
-                               objoid = *((unsigned int *)(tmp + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*arryfields[arryfieldindex])));
-                       } else {
-                               objoid = *((unsigned int *)(tmp + sizeof(objheader_t) + arryfields[arryfieldindex]));
-                       }
-                       //Update numoffset array
-                       numoffset[index] = numoffset[index] - 1;
-                       //Update oid array
-                       oid[index] = objoid;
-                       //Update endoffset array
-                       endoffsets[0] = val = numoffset[0];
-                       for(k = 1; k < ntuples; k++) {
-                               val = val + numoffset[k];
-                               endoffsets[k] = val; 
-                       }
-                       //Update arrayfields array
-                       for(k = 0; k < endoffsets[ntuples-1]; k++) {
-                               arryfields[arryfieldindex+k] = arryfields[arryfieldindex+k+1];
-                       }
-                       if((header = (objheader_t *)prehashSearch(oid[index])) != NULL) {
-                               tmp = (char *) header;
-                               isArray = 0;
-                       } else {
-                               flag = 1;
-                               break;
-                       }
-               }
-       }
-       //Found in the prefetch cache
-       if(flag == 0 && (numoffset[index] == 0)) {
-               oid[index] = 0;
-       }
+  if(TYPE(header) > NUMCLASSES) {
+    int elementsize = classsize[TYPE(header)];
+    struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
+    int length = ao->___length___;
+    /* Check if array out of bounds */
+    if(offset < 0 || offset >= length) {
+      //if yes treat the object as found
+      (*oid)=0;
+      return 1;
+    }
+    (*oid) = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*offset)));
+    return 1;
+  } else {
+    (*oid) = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset));
+    return 1;
+  }
 }
 
 
-
 /* This function is called by the thread calling transPrefetch */
 void *transPrefetch(void *t) {
   while(1) {
     /* lock mutex of primary prefetch queue */
     pthread_mutex_lock(&pqueue.qlock);
     /* while primary queue is empty, then wait */
-    while((pqueue.front == NULL) && (pqueue.rear == NULL)) {
+    while(pqueue.front == NULL) {
       pthread_cond_wait(&pqueue.qcond, &pqueue.qlock);
     }
     
     /* dequeue node to create a machine piles and  finally unlock mutex */
-    prefetchqelem_t *qnode;
-    if((qnode = pre_dequeue()) == NULL) {
-      printf("Error: No node returned %s, %d\n", __FILE__, __LINE__);
-      pthread_mutex_unlock(&pqueue.qlock);
-      continue;
-    }
+    prefetchqelem_t *qnode = pre_dequeue();
     pthread_mutex_unlock(&pqueue.qlock);
     
-    /* Reduce redundant prefetch requests */
-    checkPrefetchTuples(qnode);
     /* Check if the tuples are found locally, if yes then reduce them further*/ 
     /* and group requests by remote machine ids by calling the makePreGroups() */
     prefetchpile_t *pilehead = foundLocal(qnode);
-    
-    // Get sock from shared pool 
-    int sd = getSock2(transPrefetchSockPool, pilehead->mid);
-    
-    /* Send  Prefetch Request */
-    prefetchpile_t *ptr = pilehead;
-    while(ptr != NULL) {
-      sendPrefetchReq(ptr, sd);
-      ptr = ptr->next; 
+
+    if (pilehead!=NULL) {
+      // Get sock from shared pool 
+      int sd = getSock2(transPrefetchSockPool, pilehead->mid);
+      
+      /* Send  Prefetch Request */
+      prefetchpile_t *ptr = pilehead;
+      while(ptr != NULL) {
+       sendPrefetchReq(ptr, sd);
+       ptr = ptr->next; 
+      }
+      
+      /* Release socket */
+      //       freeSock(transPrefetchSockPool, pilehead->mid, sd);
+      
+      /* Deallocated pilehead */
+      mcdealloc(pilehead);
+      
     }
-    
-    /* Release socket */
-    // freeSock(transPrefetchSockPool, pilehead->mid, sd);
-    
-    /* Deallocated pilehead */
-    mcdealloc(pilehead);
-    
     // Deallocate the prefetch queue pile node
     predealloc(qnode);
   }
 }
 
+void sendPrefetchReqnew(prefetchpile_t *mcpilenode, int sd) {
+  objpile_t *tmp;
+  
+  int size=sizeof(char)+sizeof(int);
+  for(tmp=mcpilenode->objpiles;tmp!=NULL;tmp=tmp->next) {
+    size += sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
+  }
+  
+  char buft[size];
+  char *buf=buft;
+  *buf=TRANS_PREFETCH;
+  buf+=sizeof(char);
+  
+  for(tmp=mcpilenode->objpiles;tmp!=NULL;tmp=tmp->next) {
+    int len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
+    *((int*)buf)=len;
+    buf+=sizeof(int);
+    *((unsigned int *)buf)=tmp->oid;
+    buf+=sizeof(unsigned int);
+    *((unsigned int *)(buf)) = myIpAddr; 
+    buf+=sizeof(unsigned int);
+    memcpy(buf, tmp->offset, tmp->numoffset*sizeof(short));
+    buf+=tmp->numoffset*sizeof(short);
+  }
+  *((int *)buf)=-1;
+  send_data(sd, buft, size);
+  return;
+}
+
 void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
-  int off, len, endpair, count = 0;
+  int len, endpair;
   char control;
   objpile_t *tmp;
   
@@ -1450,22 +1171,16 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
   /* Send Oids and offsets in pairs */
   tmp = mcpilenode->objpiles;
   while(tmp != NULL) {
-    off = 0;
-    count++;  /* Keeps track of the number of oid and offset tuples sent per remote machine */
     len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
     char oidnoffset[len];
-    bzero(oidnoffset, len);
-    *((int*)oidnoffset) = len;
-    off = sizeof(int);
-    *((unsigned int *)(oidnoffset + off)) = tmp->oid;
-    off += sizeof(unsigned int);
-    *((unsigned int *)(oidnoffset + off)) = myIpAddr; 
-    off += sizeof(unsigned int);
-    int i;
-    for(i = 0; i < tmp->numoffset; i++) {
-      *((short*)(oidnoffset + off)) = tmp->offset[i];
-      off+=sizeof(short);
-    }
+    char *buf=oidnoffset;
+    *((int*)buf) = len;
+    buf+=sizeof(int);
+    *((unsigned int *)buf) = tmp->oid;
+    buf+=sizeof(unsigned int);
+    *((unsigned int *)buf) = myIpAddr; 
+    buf += sizeof(unsigned int);
+    memcpy(buf, tmp->offset, tmp->numoffset*sizeof(short));
     send_data(sd, oidnoffset, len);
     tmp = tmp->next;
   }
@@ -1478,91 +1193,84 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
 }
 
 int getPrefetchResponse(int sd) {
-       int numbytes = 0, length = 0, size = 0;
-       char *recvbuffer, control;
-       unsigned int oid;
-       void *modptr, *oldptr;
-
-       recv_data((int)sd, &length, sizeof(int)); 
-       size = length - sizeof(int);
-       if((recvbuffer = calloc(1, size)) == NULL) {
-               printf("%s() Calloc error at %s,%d\n", __func__, __FILE__, __LINE__);
-               return -1;
-       }
-
-       recv_data((int)sd, recvbuffer, size);
-
-       control = *((char *) recvbuffer);
-       if(control == OBJECT_FOUND) {
-               numbytes = 0;
-               oid = *((unsigned int *)(recvbuffer + sizeof(char)));
-               size = size - (sizeof(char) + sizeof(unsigned int));
-               pthread_mutex_lock(&prefetchcache_mutex);
-               if ((modptr = objstrAlloc(prefetchcache, size)) == NULL) {
-                       printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
-                       pthread_mutex_unlock(&prefetchcache_mutex);
-                       free(recvbuffer);
-                       return -1;
-               }
-               pthread_mutex_unlock(&prefetchcache_mutex);
-               memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
-
-               /* Insert the oid and its address into the prefetch hash lookup table */
-               /* Do a version comparison if the oid exists */
-               if((oldptr = prehashSearch(oid)) != NULL) {
-                       /* If older version then update with new object ptr */
-                       if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) {
-                               prehashRemove(oid);
-                               prehashInsert(oid, modptr);
-                       } else {
-                               /* TODO modptr should be reference counted */
-                       }
-               } else {/* Else add the object ptr to hash table*/
-                       prehashInsert(oid, modptr);
-               }
-               /* Lock the Prefetch Cache look up table*/
-               pthread_mutex_lock(&pflookup.lock);
-               /* Broadcast signal on prefetch cache condition variable */ 
-               pthread_cond_broadcast(&pflookup.cond);
-               /* Unlock the Prefetch Cache look up table*/
-               pthread_mutex_unlock(&pflookup.lock);
-       } else if(control == OBJECT_NOT_FOUND) {
-               oid = *((unsigned int *)(recvbuffer + sizeof(char)));
-               /* TODO: For each object not found query DHT for new location and retrieve the object */
-               /* Throw an error */
-               printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
-               free(recvbuffer);
-               exit(-1);
-       } else {
-               printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
-       }
-
-       free(recvbuffer);
-
-       return 0;
+  int numbytes = 0, length = 0, size = 0;
+  char *recvbuffer, control;
+  unsigned int oid;
+  void *modptr, *oldptr;
+  
+  recv_data((int)sd, &length, sizeof(int)); 
+  size = length - sizeof(int);
+  recvbuffer = calloc(1, size);
+
+  recv_data((int)sd, recvbuffer, size);
+
+  control = *((char *) recvbuffer);
+  if(control == OBJECT_FOUND) {
+    numbytes = 0;
+    oid = *((unsigned int *)(recvbuffer + sizeof(char)));
+    size = size - (sizeof(char) + sizeof(unsigned int));
+    pthread_mutex_lock(&prefetchcache_mutex);
+    if ((modptr = objstrAlloc(prefetchcache, size)) == NULL) {
+      printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
+      pthread_mutex_unlock(&prefetchcache_mutex);
+      free(recvbuffer);
+      return -1;
+    }
+    pthread_mutex_unlock(&prefetchcache_mutex);
+    memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
+    
+    /* Insert the oid and its address into the prefetch hash lookup table */
+    /* Do a version comparison if the oid exists */
+    if((oldptr = prehashSearch(oid)) != NULL) {
+      /* If older version then update with new object ptr */
+      if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) {
+       prehashRemove(oid);
+       prehashInsert(oid, modptr);
+      } else {
+       /* TODO modptr should be reference counted */
+      }
+    } else {/* Else add the object ptr to hash table*/
+      prehashInsert(oid, modptr);
+    }
+    /* Lock the Prefetch Cache look up table*/
+    pthread_mutex_lock(&pflookup.lock);
+    /* Broadcast signal on prefetch cache condition variable */ 
+    pthread_cond_broadcast(&pflookup.cond);
+    /* Unlock the Prefetch Cache look up table*/
+    pthread_mutex_unlock(&pflookup.lock);
+  } else if(control == OBJECT_NOT_FOUND) {
+    oid = *((unsigned int *)(recvbuffer + sizeof(char)));
+    /* TODO: For each object not found query DHT for new location and retrieve the object */
+    /* Throw an error */
+    printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
+    free(recvbuffer);
+    exit(-1);
+  } else {
+    printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
+  }
+  
+  free(recvbuffer);
+  
+  return 0;
 }
 
-unsigned short getObjType(unsigned int oid)
-{
-       objheader_t *objheader;
-       unsigned short numoffset[] ={0};
-       short fieldoffset[] ={};
-
-       if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL)
-       {
-               if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL)
-               {
-                       prefetch(1, &oid, numoffset, fieldoffset);
-                       pthread_mutex_lock(&pflookup.lock);
-                       while ((objheader = (objheader_t *) prehashSearch(oid)) == NULL)
-                       {
-                               pthread_cond_wait(&pflookup.cond, &pflookup.lock);
-                       }
-                       pthread_mutex_unlock(&pflookup.lock);
-               }
+unsigned short getObjType(unsigned int oid) {
+  objheader_t *objheader;
+  unsigned short numoffset[] ={0};
+  short fieldoffset[] ={};
+  
+  if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL) {
+      if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
+       prefetch(1, &oid, numoffset, fieldoffset);
+       pthread_mutex_lock(&pflookup.lock);
+       while ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
+         pthread_cond_wait(&pflookup.cond, &pflookup.lock);
        }
-
-       return TYPE(objheader);
+       pthread_mutex_unlock(&pflookup.lock);
+      }
+  }
+  
+  return TYPE(objheader);
 }
 
 int startRemoteThread(unsigned int oid, unsigned int mid)