various changes...
authorbdemsky <bdemsky>
Sat, 3 May 2008 20:07:04 +0000 (20:07 +0000)
committerbdemsky <bdemsky>
Sat, 3 May 2008 20:07:04 +0000 (20:07 +0000)
bug fix for transactions with a large number of objects

Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/objstr.c
Robust/src/Runtime/DSTM/interface/plookup.c
Robust/src/Runtime/DSTM/interface/plookup.h
Robust/src/Runtime/DSTM/interface/prelookup.c
Robust/src/Runtime/DSTM/interface/sockpool.c
Robust/src/Runtime/DSTM/interface/sockpool.h
Robust/src/Runtime/DSTM/interface/trans.c

index 4b5b9e4be3de141af64f48347923d9950b0494fe..6311e213f27c6353ba221ed4a9721778ac275d9b 100644 (file)
@@ -99,6 +99,9 @@ typedef struct objheader {
 #define STATUS(x)\
         *((unsigned int *) &(((struct ___Object___ *)((unsigned int) x + sizeof(objheader_t)))->___localcopy___))
 
+#define STATUSPTR(x)\
+        ((unsigned int *) &(((struct ___Object___ *)((unsigned int) x + sizeof(objheader_t)))->___localcopy___))
+
 #define TYPE(x)\
         ((struct ___Object___ *)((unsigned int) x + sizeof(objheader_t)))->type
 
@@ -125,6 +128,7 @@ typedef struct objheader {
 #define OID(x) x->oid
 #define TYPE(x) x->type
 #define STATUS(x) x->status
+#define STATUSPTR(x) &x->status
 #define GETSIZE(size, x) size=classsize[TYPE(x)]
 #endif
 
@@ -158,9 +162,9 @@ typedef struct fixed_data {
   char control;                        /* control message */
   char trans_id[TID_LEN];      /* transaction id */
   int mcount;                  /* participant count */
-  short numread;               /* no of objects read */
-  short nummod;                        /* no of objects modified */
-  short numcreated;            /* no of objects created */
+  unsigned int numread;                /* no of objects read */
+  unsigned int nummod;                 /* no of objects modified */
+  unsigned int numcreated;             /* no of objects created */
   int sum_bytes;               /* total bytes of modified objects in a transaction */
 } fixed_data_t;
 
index b5bcac5bc1a316ddd6f4efa748e9f529ffe4f421..2674a911fda741bf145dc7cbb9f05e7aeee891b1 100644 (file)
@@ -365,7 +365,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
                                        printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);// find the header address
                                        return 1;
                                }
-                               STATUS(((objheader_t *)header)) &= ~(LOCK);             
+                               UnLock(STATUSPTR(header));
                        }
 
                        /* Send ack to Coordinator */
@@ -454,9 +454,8 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
           objnotfound++;
         } else { /* If Obj found in machine (i.e. has not moved) */
           /* Check if Obj is locked by any previous transaction */
-          pthread_mutex_lock(&lockObjHeader);
-          if ((STATUS(((objheader_t *)mobj)) & LOCK) == LOCK) {                
-            pthread_mutex_unlock(&lockObjHeader);
+          if (test_and_set(STATUSPTR(mobj))) {
+           //don't have lock
             if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */
               v_matchlock++;
             } else {/* If versions don't match ...HARD ABORT */
@@ -469,7 +468,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
                     printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
                     return 0;
                   }
-                  STATUS(headptr) &= ~(LOCK);
+                 UnLock(STATUSPTR(headptr));
                 }
                 free(oidlocked);
               }
@@ -477,8 +476,6 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
               return control;
             }
           } else {/* If Obj is not locked then lock object */
-            STATUS(((objheader_t *)mobj)) |= LOCK;
-            pthread_mutex_unlock(&lockObjHeader);
             /* Save all object oids that are locked on this machine during this transaction request call */
             oidlocked[objlocked] = OID(((objheader_t *)mobj));
             objlocked++;
@@ -493,7 +490,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
                     printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
                     return 0;
                   }
-                  STATUS(headptr) &= ~(LOCK);
+                 UnLock(STATUSPTR(headptr));
                 }
                 free(oidlocked);
               }
@@ -562,47 +559,45 @@ char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int
  * addresses in lookup table and also changes version number
  * Sends an ACK back to Coordinator */
 int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlocked, int nummod, int numlocked, int acceptfd) {
-       objheader_t *header;
-       objheader_t *newheader;
-       int i = 0, offset = 0;
-       char control;
-       int tmpsize;
-
-       /* Process each modified object saved in the mainobject store */
-       for(i = 0; i < nummod; i++) {
-               if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
-                       printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
-                       return 1;
-               }
-               GETSIZE(tmpsize,header);
-               memcpy((char*)header + sizeof(objheader_t), ((char *)modptr + sizeof(objheader_t) + offset), tmpsize);
-               header->version += 1; 
-               /* If threads are waiting on this object to be updated, notify them */
-        pthread_mutex_lock(&notifymutex);
-               if(header->notifylist != NULL) {
-                       notifyAll(&header->notifylist, OID(header), header->version);
-               }
-        pthread_mutex_unlock(&notifymutex);
-               offset += sizeof(objheader_t) + tmpsize;
-       }
-
-       if (nummod > 0)
-               free(modptr);
-
-       /* Unlock locked objects */
-       for(i = 0; i < numlocked; i++) {
-               if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
-                       printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
-                       return 1;
-               }
-               STATUS(header) &= ~(LOCK);
-       }
-       //TODO Update location lookup table
-
-       /* Send ack to coordinator */
-       control = TRANS_SUCESSFUL;
-       send_data((int)acceptfd, &control, sizeof(char));
-       return 0;
+  objheader_t *header;
+  objheader_t *newheader;
+  int i = 0, offset = 0;
+  char control;
+  int tmpsize;
+  
+  /* Process each modified object saved in the mainobject store */
+  for(i = 0; i < nummod; i++) {
+    if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
+      printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+      return 1;
+    }
+    GETSIZE(tmpsize,header);
+    memcpy((char*)header + sizeof(objheader_t), ((char *)modptr + sizeof(objheader_t) + offset), tmpsize);
+    header->version += 1; 
+    /* If threads are waiting on this object to be updated, notify them */
+    if(header->notifylist != NULL) {
+      notifyAll(&header->notifylist, OID(header), header->version);
+    }
+    offset += sizeof(objheader_t) + tmpsize;
+  }
+  
+  if (nummod > 0)
+    free(modptr);
+  
+  /* Unlock locked objects */
+  for(i = 0; i < numlocked; i++) {
+    if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
+      printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+      return 1;
+    }
+    UnLock(STATUSPTR(header));
+  }
+  //TODO Update location lookup table
+  
+  /* Send ack to coordinator */
+  control = TRANS_SUCESSFUL;
+  send_data((int)acceptfd, &control, sizeof(char));
+  return 0;
 }
 
 /* This function recevies the oid and offset tuples from the Coordinator's prefetch call.
@@ -725,74 +720,70 @@ void sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size) {
 }
 
 void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int threadid) {
-       objheader_t *header;
-       unsigned int oid;
-       unsigned short newversion;
-       char msg[1+  2 * sizeof(unsigned int) + sizeof(unsigned short)];
-       int sd;
-       struct sockaddr_in remoteAddr;
-       int bytesSent;
-       int size;
-       int i = 0;
-
-       while(i < numoid) {
-               oid = *(oidarry + i);
-               if((header = (objheader_t *) mhashSearch(oid)) == NULL) {
-                       printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
-                       return;
-               } else {
-                       /* Check to see if versions are same */
-checkversion:
-                       if ((STATUS(header) & LOCK) != LOCK) {          
-                pthread_mutex_lock(&notifymutex);
-                               STATUS(header) |= LOCK;
-                               newversion = header->version;
-                               if(newversion == *(versionarry + i)) {
-                                       //Add to the notify list 
-                                       if((header->notifylist = insNode(header->notifylist, threadid, mid)) == NULL) {
-                                               printf("Error: Obj notify list points to NULL %s, %d\n", __FILE__, __LINE__); 
-                        pthread_mutex_unlock(&notifymutex);
-                                               return;
-                                       }
-                                       STATUS(header) &= ~(LOCK);              
-                    pthread_mutex_unlock(&notifymutex);
-                               } else {
-                                       STATUS(header) &= ~(LOCK);              
-                    pthread_mutex_unlock(&notifymutex);
-                                       if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
-                                               perror("processReqNotify():socket()");
-                                               return;
-                                       }
-                                       bzero(&remoteAddr, sizeof(remoteAddr));
-                                       remoteAddr.sin_family = AF_INET;
-                                       remoteAddr.sin_port = htons(LISTEN_PORT);
-                                       remoteAddr.sin_addr.s_addr = htonl(mid);
-
-                                       if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
-                                               printf("Error: processReqNotify():error %d connecting to %s:%d\n", errno,
-                                                               inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
-                                               close(sd);
-                                               return;
-                                       } else {
-                                               //Send Update notification
-                                               msg[0] = THREAD_NOTIFY_RESPONSE;
-                                               *((unsigned int *)&msg[1]) = oid;
-                                               size = sizeof(unsigned int);
-                                               *((unsigned short *)(&msg[1]+size)) = newversion;
-                                               size += sizeof(unsigned short);
-                                               *((unsigned int *)(&msg[1]+size)) = threadid;
-                                               size = 1+ 2*sizeof(unsigned int) + sizeof(unsigned short);
-                                               send_data(sd, msg, size);
-                                       }
-                                       close(sd);
-                               }
-                       } else {
-                               randomdelay();
-                               goto checkversion;
-                       }
-               }
-               i++;
+  objheader_t *header;
+  unsigned int oid;
+  unsigned short newversion;
+  char msg[1+  2 * sizeof(unsigned int) + sizeof(unsigned short)];
+  int sd;
+  struct sockaddr_in remoteAddr;
+  int bytesSent;
+  int size;
+  int i = 0;
+  
+  while(i < numoid) {
+    oid = *(oidarry + i);
+    if((header = (objheader_t *) mhashSearch(oid)) == NULL) {
+      printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+      return;
+    } else {
+      /* Check to see if versions are same */
+    checkversion:
+      if (test_and_set(STATUSPTR(header))==0) {
+       //have lock
+       newversion = header->version;
+       if(newversion == *(versionarry + i)) {
+         //Add to the notify list 
+         if((header->notifylist = insNode(header->notifylist, threadid, mid)) == NULL) {
+           printf("Error: Obj notify list points to NULL %s, %d\n", __FILE__, __LINE__); 
+           return;
+         }
+         UnLock(STATUSPTR(header));
+       } else {
+         UnLock(STATUSPTR(header));
+         if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
+           perror("processReqNotify():socket()");
+           return;
+         }
+         bzero(&remoteAddr, sizeof(remoteAddr));
+         remoteAddr.sin_family = AF_INET;
+         remoteAddr.sin_port = htons(LISTEN_PORT);
+         remoteAddr.sin_addr.s_addr = htonl(mid);
+         
+         if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
+           printf("Error: processReqNotify():error %d connecting to %s:%d\n", errno,
+                  inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+           close(sd);
+           return;
+         } else {
+           //Send Update notification
+           msg[0] = THREAD_NOTIFY_RESPONSE;
+           *((unsigned int *)&msg[1]) = oid;
+           size = sizeof(unsigned int);
+           *((unsigned short *)(&msg[1]+size)) = newversion;
+           size += sizeof(unsigned short);
+           *((unsigned int *)(&msg[1]+size)) = threadid;
+           size = 1+ 2*sizeof(unsigned int) + sizeof(unsigned short);
+           send_data(sd, msg, size);
+         }
+         close(sd);
        }
-       free(oidarry);
-       free(versionarry);
+      } else {
+       randomdelay();
+       goto checkversion;
+      }
+    }
+    i++;
+  }
+  free(oidarry);
+  free(versionarry);
 }
index b8279487fd79de813cea049559a7e5049a5ae816..e7caec5661c13b29ab477c4298d4f732bfa3281a 100644 (file)
@@ -14,59 +14,46 @@ objstr_t *objstrCreate(unsigned int size) {
 }
 
 //free entire list, starting at store
-void objstrDelete(objstr_t *store)
-{
-       objstr_t *tmp;
-       while (store != NULL)
-       {
-               tmp = store->next;
-               free(store);
-               store = tmp;
-       }
-       return;
+void objstrDelete(objstr_t *store) {
+  objstr_t *tmp;
+  while (store != NULL) {
+    tmp = store->next;
+    free(store);
+    store = tmp;
+  }
+  return;
 }
 
-void *objstrAlloc(objstr_t *store, unsigned int size)
-{
-       void *tmp;
-       while (1)
-       {
-               if (((unsigned int)store->top - (unsigned int)store - sizeof(objstr_t) + size) <= store->size)
-               {  //store not full
-                       tmp = store->top;
-                       store->top += size;
-                       return tmp;
-               }
-               //store full
-               if (store->next == NULL)
-               {  //end of list, all full
-                       if (size > DEFAULT_OBJ_STORE_SIZE) //in case of large objects
-                       {
-                               if((store->next = (objstr_t *)calloc(1,(sizeof(objstr_t) + size))) == NULL) {
-                  printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
-                  return NULL;
-                }
-                               if (store->next == NULL)
-                                       return NULL;
-                               store = store->next;
-                               store->size = size;
-                       }
-                       else
-                       {
-                               if((store->next = calloc(1,(sizeof(objstr_t) + DEFAULT_OBJ_STORE_SIZE))) == NULL) {
-                  printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
-                  return NULL;
-                }
-                               if (store->next == NULL)
-                                       return NULL;
-                               store = store->next;
-                               store->next = NULL;
-                               store->size = DEFAULT_OBJ_STORE_SIZE;
-                       }
-                       store->top = (void *)((unsigned int)store + sizeof(objstr_t) + size);
-                       return (void *)((unsigned int)store + sizeof(objstr_t));
-               }
-               else  //try the next one
-                       store = store->next;
+void *objstrAlloc(objstr_t *store, unsigned int size) {
+  void *tmp;
+  while (1) {
+    if (((unsigned int)store->top - (((unsigned int)store) + sizeof(objstr_t)) + size) <= store->size) { //store not full
+      tmp = store->top;
+      store->top += size;
+      return tmp;
+    }
+    //store full
+    if (store->next == NULL) {
+      //end of list, all full
+      if (size > DEFAULT_OBJ_STORE_SIZE) {
+       //in case of large objects
+       if((store->next = (objstr_t *)calloc(1,(sizeof(objstr_t) + size))) == NULL) {
+         printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
+         return NULL;
        }
+       store = store->next;
+       store->size = size;
+      } else {
+       if((store->next = calloc(1,(sizeof(objstr_t) + DEFAULT_OBJ_STORE_SIZE))) == NULL) {
+         printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
+         return NULL;
+       }
+       store = store->next;
+       store->size = DEFAULT_OBJ_STORE_SIZE;
+      }
+      store->top = (void *)(((unsigned int)store) + sizeof(objstr_t) + size);
+      return (void *)(((unsigned int)store) + sizeof(objstr_t));
+    } else
+      store = store->next;
+  }
 }
index 7672ee0ee11140e4785126abae6dc13b5279d516..aafebf0a306fbf921a4bebb4a6476f8db40fe418 100644 (file)
@@ -43,69 +43,68 @@ plistnode_t *pCreate(int objects) {
 /* This function inserts necessary information into 
  * a machine pile data structure */
 plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) {
-       plistnode_t *ptr, *tmp;
-       int found = 0, offset = 0;
-
-       tmp = pile;
-       //Add oid into a machine that is already present in the pile linked list structure
-       while(tmp != NULL) {
-               if (tmp->mid == mid) {
-                 int tmpsize;
-                 
-                 if (STATUS(headeraddr) & NEW) {
-                   tmp->oidcreated[tmp->numcreated] = OID(headeraddr);
-                   tmp->numcreated = tmp->numcreated + 1;
-                   GETSIZE(tmpsize, headeraddr);
-                   tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
-                 }else if (STATUS(headeraddr) & DIRTY) {
-                   tmp->oidmod[tmp->nummod] = OID(headeraddr);
-                   tmp->nummod = tmp->nummod + 1;
-                   GETSIZE(tmpsize, headeraddr);
-                   tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
-                 } else {
-                   offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
-                   *((unsigned int *)(((char *)tmp->objread) + offset))=OID(headeraddr);
-                   offset += sizeof(unsigned int);
-                   *((short *)(((char *)tmp->objread) + offset)) = headeraddr->version;
-                   tmp->numread = tmp->numread + 1;
-                 }
-                 found = 1;
-                 break;
-               }
-               tmp = tmp->next;
-       }
-       //Add oid for any new machine 
-       if (!found) {
-         int tmpsize;
-         if((ptr = pCreate(num_objs)) == NULL) {
-           return NULL;
-         }
-         ptr->mid = mid;
-         if (STATUS(headeraddr) & NEW) {
-           ptr->oidcreated[ptr->numcreated] = OID(headeraddr);
-           ptr->numcreated = ptr->numcreated + 1;
-           GETSIZE(tmpsize, headeraddr);
-           ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
-         } else if (STATUS(headeraddr) & DIRTY) {
-           ptr->oidmod[ptr->nummod] = OID(headeraddr);
-           ptr->nummod = ptr->nummod + 1;
-           GETSIZE(tmpsize, headeraddr);
-           ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
-         } else {
-           *((unsigned int *)ptr->objread)=OID(headeraddr);
-           offset = sizeof(unsigned int);
-           *((short *)(((char *)ptr->objread) + offset)) = headeraddr->version;
-           ptr->numread = ptr->numread + 1;
-         }
-         ptr->next = pile;
-         pile = ptr;
-       }
-
-       /* Clear Flags */
-       STATUS(headeraddr) &= ~NEW;
-       STATUS(headeraddr) &= ~DIRTY;
-
-       return pile;
+  plistnode_t *ptr, *tmp;
+  int found = 0, offset = 0;
+  
+  tmp = pile;
+  //Add oid into a machine that is already present in the pile linked list structure
+  while(tmp != NULL) {
+    if (tmp->mid == mid) {
+      int tmpsize;
+      
+      if (STATUS(headeraddr) & NEW) {
+       tmp->oidcreated[tmp->numcreated] = OID(headeraddr);
+       tmp->numcreated++;
+       GETSIZE(tmpsize, headeraddr);
+       tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
+      }else if (STATUS(headeraddr) & DIRTY) {
+       tmp->oidmod[tmp->nummod] = OID(headeraddr);
+       tmp->nummod++;
+       GETSIZE(tmpsize, headeraddr);
+       tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
+      } else {
+       offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
+       *((unsigned int *)(((char *)tmp->objread) + offset))=OID(headeraddr);
+       offset += sizeof(unsigned int);
+       *((short *)(((char *)tmp->objread) + offset)) = headeraddr->version;
+       tmp->numread ++;
+      }
+      found = 1;
+      break;
+    }
+    tmp = tmp->next;
+  }
+  //Add oid for any new machine 
+  if (!found) {
+    int tmpsize;
+    if((ptr = pCreate(num_objs)) == NULL) {
+      return NULL;
+    }
+    ptr->mid = mid;
+    if (STATUS(headeraddr) & NEW) {
+      ptr->oidcreated[ptr->numcreated] = OID(headeraddr);
+      ptr->numcreated ++;
+      GETSIZE(tmpsize, headeraddr);
+      ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
+    } else if (STATUS(headeraddr) & DIRTY) {
+      ptr->oidmod[ptr->nummod] = OID(headeraddr);
+      ptr->nummod ++;
+      GETSIZE(tmpsize, headeraddr);
+      ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
+    } else {
+      *((unsigned int *)ptr->objread)=OID(headeraddr);
+      offset = sizeof(unsigned int);
+      *((short *)(((char *)ptr->objread) + offset)) = headeraddr->version;
+      ptr->numread ++;
+    }
+    ptr->next = pile;
+    pile = ptr;
+  }
+  
+  /* Clear Flags */
+  STATUS(headeraddr) =0;
+  
+  return pile;
 }
 
 //Count the number of machine piles
index 777f0259adca6a644e6ec5dcc1f152c5bbbce17b..fdcfc535052b3fc86091f208894b0428b2ea28d7 100644 (file)
@@ -10,9 +10,9 @@
  * participants involved in a transaction. */
 typedef struct plistnode {
        unsigned int mid;
-       short numread;          /* no of objects modified */
-       short nummod;           /* no of objects read */
-       short numcreated; /* no of objects created */
+       unsigned int numread;           /* no of objects modified */
+       unsigned int nummod;            /* no of objects read */
+       unsigned int  numcreated; /* no of objects created */
        int sum_bytes;          /* total bytes of objects modified */
        char *objread;          /* Pointer to array containing oids of objects read and their version numbers*/
        unsigned int *oidmod;   /* Pointer to array containing oids of modified objects */ 
index 4d596f1f281c330b187e2a790cd771aaf2183ddc..6564f3efa39bf6b475e23816318895cd8b6fed1f 100644 (file)
 prehashtable_t pflookup; //Global prefetch cache table
 
 unsigned int prehashCreate(unsigned int size, float loadfactor) {
-       prehashlistnode_t *nodes; 
-        int i; 
-
-        // Allocate space for the hash table 
-       if((nodes = calloc(size, sizeof(prehashlistnode_t))) == NULL) { 
-               printf("Calloc error %s %d\n", __FILE__, __LINE__);
-               return 1;
-       }       
-
-        pflookup.table = nodes;
-        pflookup.size = size; 
-        pflookup.numelements = 0; // Initial number of elements in the hash
-        pflookup.loadfactor = loadfactor;
-
-       //Intiliaze and set prefetch table mutex attribute
-       pthread_mutexattr_init(&pflookup.prefetchmutexattr);
-       //NOTE:PTHREAD_MUTEX_RECURSIVE is currently inside a #if_def UNIX98 in the pthread.h file
-       //Therefore use PTHREAD_MUTEX_RECURSIVE_NP instead
-       pthread_mutexattr_settype(&pflookup.prefetchmutexattr, PTHREAD_MUTEX_RECURSIVE_NP);
-
-       //Initialize mutex var
-       pthread_mutex_init(&pflookup.lock, &pflookup.prefetchmutexattr);
-       //pthread_mutex_init(&pflookup.lock, NULL);
-       pthread_cond_init(&pflookup.cond, NULL); 
-       return 0;
+  prehashlistnode_t *nodes; 
+  int i; 
+  
+  // Allocate space for the hash table 
+  if((nodes = calloc(size, sizeof(prehashlistnode_t))) == NULL) { 
+    printf("Calloc error %s %d\n", __FILE__, __LINE__);
+    return 1;
+  }
+  
+  pflookup.table = nodes;
+  pflookup.size = size; 
+  pflookup.numelements = 0; // Initial number of elements in the hash
+  pflookup.loadfactor = loadfactor;
+  
+  //Intiliaze and set prefetch table mutex attribute
+  pthread_mutexattr_init(&pflookup.prefetchmutexattr);
+  //NOTE:PTHREAD_MUTEX_RECURSIVE is currently inside a #if_def UNIX98 in the pthread.h file
+  //Therefore use PTHREAD_MUTEX_RECURSIVE_NP instead
+  pthread_mutexattr_settype(&pflookup.prefetchmutexattr, PTHREAD_MUTEX_RECURSIVE_NP);
+  
+  //Initialize mutex var
+  pthread_mutex_init(&pflookup.lock, &pflookup.prefetchmutexattr);
+  //pthread_mutex_init(&pflookup.lock, NULL);
+  pthread_cond_init(&pflookup.cond, NULL); 
+  return 0;
 }
 
 //Assign keys to bins inside hash table
 unsigned int prehashFunction(unsigned int key) {
-       return ( key % (pflookup.size));
+  return ( key % (pflookup.size));
 }
 
 //Store oids and their pointers into hash
 unsigned int prehashInsert(unsigned int key, void *val) {
-       unsigned int newsize;
-       int index;
-       prehashlistnode_t *ptr, *node;
-
-       if(pflookup.numelements > (pflookup.loadfactor * pflookup.size)) {
-               //Resize
-               newsize = 2 * pflookup.size + 1;
-               pthread_mutex_lock(&pflookup.lock);
-               prehashResize(newsize);
-               pthread_mutex_unlock(&pflookup.lock);
-       }
-
-       ptr = pflookup.table;
-       pflookup.numelements++;
-       index = prehashFunction(key);
-       
-       pthread_mutex_lock(&pflookup.lock);
-       if(ptr[index].next == NULL && ptr[index].key == 0) {    // Insert at the first position in the hashtable
-               ptr[index].key = key;
-               ptr[index].val = val;
-       } else {                        // Insert in the beginning of linked list
-               if ((node = calloc(1, sizeof(prehashlistnode_t))) == NULL) {
-                       printf("Calloc error %s, %d\n", __FILE__, __LINE__);
-                       pthread_mutex_unlock(&pflookup.lock);
-                       return 1;
-               }
-               node->key = key;
-               node->val = val ;
-               node->next = ptr[index].next;
-               ptr[index].next = node;
-       }
-       pthread_mutex_unlock(&pflookup.lock);
-       return 0;
+  unsigned int newsize;
+  int index;
+  prehashlistnode_t *ptr, *node;
+  
+  if(pflookup.numelements > (pflookup.loadfactor * pflookup.size)) {
+    //Resize
+    newsize = 2 * pflookup.size + 1;
+    pthread_mutex_lock(&pflookup.lock);
+    prehashResize(newsize);
+    pthread_mutex_unlock(&pflookup.lock);
+  }
+  
+  ptr = pflookup.table;
+  pflookup.numelements++;
+  index = prehashFunction(key);
+  
+  pthread_mutex_lock(&pflookup.lock);
+  if(ptr[index].next == NULL && ptr[index].key == 0) { // Insert at the first position in the hashtable
+    ptr[index].key = key;
+    ptr[index].val = val;
+  } else {                     // Insert in the beginning of linked list
+    if ((node = calloc(1, sizeof(prehashlistnode_t))) == NULL) {
+      printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+      pthread_mutex_unlock(&pflookup.lock);
+      return 1;
+    }
+    node->key = key;
+    node->val = val ;
+    node->next = ptr[index].next;
+    ptr[index].next = node;
+  }
+  pthread_mutex_unlock(&pflookup.lock);
+  return 0;
 }
 
 // Search for an address for a given oid
 void *prehashSearch(unsigned int key) {
-       int index;
-       prehashlistnode_t *ptr, *node;
-
-       ptr = pflookup.table;
-       index = prehashFunction(key);
-       node = &ptr[index];
-       pthread_mutex_lock(&pflookup.lock);
-       while(node != NULL) {
-               if(node->key == key) {
-                       pthread_mutex_unlock(&pflookup.lock);
-                       return node->val;
-               }
-               node = node->next;
-       }
-       pthread_mutex_unlock(&pflookup.lock);
-       return NULL;
+  int index;
+  prehashlistnode_t *ptr, *node;
+  
+  ptr = pflookup.table;
+  index = prehashFunction(key);
+  node = &ptr[index];
+  pthread_mutex_lock(&pflookup.lock);
+  while(node != NULL) {
+    if(node->key == key) {
+      pthread_mutex_unlock(&pflookup.lock);
+      return node->val;
+    }
+    node = node->next;
+  }
+  pthread_mutex_unlock(&pflookup.lock);
+  return NULL;
 }
 
 unsigned int prehashRemove(unsigned int key) {
-       int index;
-       prehashlistnode_t *curr, *prev;
-       prehashlistnode_t *ptr, *node;
-       
-       ptr = pflookup.table;
-       index = prehashFunction(key);
-       curr = &ptr[index];
-
-       pthread_mutex_lock(&pflookup.lock);
-       for (; curr != NULL; curr = curr->next) {
-               if (curr->key == key) {         // Find a match in the hash table
-                       pflookup.numelements--;  // Decrement the number of elements in the global hashtable
-                       if ((curr == &ptr[index]) && (curr->next == NULL))  { // Delete the first item inside the hashtable with no linked list of prehashlistnode_t 
-                               curr->key = 0;
-                               curr->val = NULL;
-                       } else if ((curr == &ptr[index]) && (curr->next != NULL)) { //Delete the first item with a linked list of prehashlistnode_t  connected 
-                               curr->key = curr->next->key;
-                               curr->val = curr->next->val;
-                               node = curr->next;
-                               curr->next = curr->next->next;
-                               free(node);
-                       } else {                                                // Regular delete from linked listed    
-                               prev->next = curr->next;
-                               free(curr);
-                       }
-                       pthread_mutex_unlock(&pflookup.lock);
-                       return 0;
-               }       
-               prev = curr; 
-       }
-       pthread_mutex_unlock(&pflookup.lock);
-       return 1;
+  int index;
+  prehashlistnode_t *curr, *prev;
+  prehashlistnode_t *ptr, *node;
+  
+  ptr = pflookup.table;
+  index = prehashFunction(key);
+  curr = &ptr[index];
+  
+  pthread_mutex_lock(&pflookup.lock);
+  for (; curr != NULL; curr = curr->next) {
+    if (curr->key == key) {         // Find a match in the hash table
+      pflookup.numelements--;  // Decrement the number of elements in the global hashtable
+      if ((curr == &ptr[index]) && (curr->next == NULL))  { // Delete the first item inside the hashtable with no linked list of prehashlistnode_t 
+       curr->key = 0;
+       curr->val = NULL;
+      } else if ((curr == &ptr[index]) && (curr->next != NULL)) { //Delete the first item with a linked list of prehashlistnode_t  connected 
+       curr->key = curr->next->key;
+       curr->val = curr->next->val;
+       node = curr->next;
+       curr->next = curr->next->next;
+       free(node);
+      } else {                                         // Regular delete from linked listed    
+       prev->next = curr->next;
+       free(curr);
+      }
+      pthread_mutex_unlock(&pflookup.lock);
+      return 0;
+    }       
+    prev = curr; 
+  }
+  pthread_mutex_unlock(&pflookup.lock);
+  return 1;
 }
 
 unsigned int prehashResize(unsigned int newsize) {
-       prehashlistnode_t *node, *ptr, *curr, *next;    // curr and next keep track of the current and the next chashlistnodes in a linked list
-       unsigned int oldsize;
-       int isfirst;    // Keeps track of the first element in the prehashlistnode_t for each bin in hashtable
-       int i,index;    
-       prehashlistnode_t *newnode;             
-       
-       ptr = pflookup.table;
-       oldsize = pflookup.size;
-       
-       if((node = calloc(newsize, sizeof(prehashlistnode_t))) == NULL) {
-               printf("Calloc error %s %d\n", __FILE__, __LINE__);
-               return 1;
-       }
-
-       pflookup.table = node;          //Update the global hashtable upon resize()
-       pflookup.size = newsize;
-       pflookup.numelements = 0;
-
-       for(i = 0; i < oldsize; i++) {                  //Outer loop for each bin in hash table
-               curr = &ptr[i];
-               isfirst = 1;                    
-               while (curr != NULL) {                  //Inner loop to go through linked lists
-                       if (curr->key == 0) {           //Exit inner loop if there the first element for a given bin/index is NULL
-                               break;                  //key = val =0 for element if not present within the hash table
-                       }
-                       next = curr->next;
-                       index = prehashFunction(curr->key);
-                       // Insert into the new table
-                       if(pflookup.table[index].next == NULL && pflookup.table[index].key == 0) { 
-                               pflookup.table[index].key = curr->key;
-                               pflookup.table[index].val = curr->val;
-                               pflookup.numelements++;
-                       }else { 
-                               if((newnode = calloc(1, sizeof(prehashlistnode_t))) == NULL) { 
-                                       printf("Calloc error %s, %d\n", __FILE__, __LINE__);
-                                       return 1;
-                               }       
-                               newnode->key = curr->key;
-                               newnode->val = curr->val;
-                               newnode->next = pflookup.table[index].next;
-                               pflookup.table[index].next = newnode;    
-                               pflookup.numelements++;
-                       }       
-
-                       //free the linked list of prehashlistnode_t if not the first element in the hash table
-                       if (isfirst != 1) {
-                               free(curr);
-                       
-                       
-                       isfirst = 0;
-                       curr = next;
-               }
-       }
-
-       free(ptr);              //Free the memory of the old hash table 
-       return 0;
+  prehashlistnode_t *node, *ptr, *curr, *next; // curr and next keep track of the current and the next chashlistnodes in a linked list
+  unsigned int oldsize;
+  int isfirst;    // Keeps track of the first element in the prehashlistnode_t for each bin in hashtable
+  int i,index;         
+  prehashlistnode_t *newnode;          
+  
+  ptr = pflookup.table;
+  oldsize = pflookup.size;
+  
+  if((node = calloc(newsize, sizeof(prehashlistnode_t))) == NULL) {
+    printf("Calloc error %s %d\n", __FILE__, __LINE__);
+    return 1;
+  }
+  
+  pflookup.table = node;               //Update the global hashtable upon resize()
+  pflookup.size = newsize;
+  pflookup.numelements = 0;
+  
+  for(i = 0; i < oldsize; i++) {                       //Outer loop for each bin in hash table
+    curr = &ptr[i];
+    isfirst = 1;                       
+    while (curr != NULL) {                     //Inner loop to go through linked lists
+      if (curr->key == 0) {            //Exit inner loop if there the first element for a given bin/index is NULL
+       break;                  //key = val =0 for element if not present within the hash table
+      }
+      next = curr->next;
+      index = prehashFunction(curr->key);
+      // Insert into the new table
+      if(pflookup.table[index].next == NULL && pflookup.table[index].key == 0) { 
+       pflookup.table[index].key = curr->key;
+       pflookup.table[index].val = curr->val;
+       pflookup.numelements++;
+      }else { 
+       if((newnode = calloc(1, sizeof(prehashlistnode_t))) == NULL) { 
+         printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+         return 1;
+       }       
+       newnode->key = curr->key;
+       newnode->val = curr->val;
+       newnode->next = pflookup.table[index].next;
+       pflookup.table[index].next = newnode;    
+       pflookup.numelements++;
+      }       
+      
+      //free the linked list of prehashlistnode_t if not the first element in the hash table
+      if (isfirst != 1) {
+       free(curr);
+      } 
+      
+      isfirst = 0;
+      curr = next;
+    }
+  }
+  
+  free(ptr);           //Free the memory of the old hash table 
+  return 0;
 }
 
 /* Deletes the prefetch Cache */
 void prehashDelete() {
-       int i, isFirst;
-       prehashlistnode_t *ptr, *curr, *next;
-       ptr = pflookup.table; 
-
-       for(i=0 ; i<pflookup.size ; i++) {
-               curr = &ptr[i];
-               isFirst = 1;
-               while(curr != NULL) {
-                       next = curr->next;
-                       if(isFirst != 1) {
-                               free(curr);
-                       }
-                       isFirst = 0;
-                       curr = next;
-               }
-       }
-
-       free(ptr);
+  int i, isFirst;
+  prehashlistnode_t *ptr, *curr, *next;
+  ptr = pflookup.table; 
+  
+  for(i=0 ; i<pflookup.size ; i++) {
+    curr = &ptr[i];
+    isFirst = 1;
+    while(curr != NULL) {
+      next = curr->next;
+      if(isFirst != 1) {
+       free(curr);
+      }
+      isFirst = 0;
+      curr = next;
+    }
+  }
+  
+  free(ptr);
 }
 
 //Note: This is based on the implementation of the inserting a key in the first position of the hashtable 
 void prehashClear() {
   int i, isFirstBin;
   prehashlistnode_t *ptr, *prev, *curr;
-
+  
   pthread_mutex_lock(&pflookup.lock);
   ptr = pflookup.table; 
   for(i = 0; i < pflookup.size; i++) {
index e45cdc552456d836db450b77b72b00416ad129c4..98a537ef20c5f7ee664c68a3b6532ce930b10255 100644 (file)
@@ -2,7 +2,7 @@
 #include <netinet/tcp.h>
 
 #if defined(__i386__)
-inline static int test_and_set(volatile unsigned int *addr) {
+inline int test_and_set(volatile unsigned int *addr) {
     int oldval;
     /* Note: the "xchg" instruction does not need a "lock" prefix */
     __asm__ __volatile__("xchgl %0, %1"
@@ -10,7 +10,7 @@ inline static int test_and_set(volatile unsigned int *addr) {
         : "0"(1), "m"(*(addr)));
     return oldval;
 }
-inline static void UnLock(volatile unsigned int *addr) {
+inline void UnLock(volatile unsigned int *addr) {
     int oldval;
     /* Note: the "xchg" instruction does not need a "lock" prefix */
     __asm__ __volatile__("xchgl %0, %1"
index bc1b4fab4aa28bdfcc6e91f65217bf2b163ff160..c85d7da826a9bdde5bcf5d46fbf3863da0279f9f 100644 (file)
@@ -3,6 +3,8 @@
 
 #include "dstm.h"
 
+int test_and_set(volatile unsigned int *addr);
+void UnLock(volatile unsigned int *addr);
 
 typedef struct socknode {
     int sd;
index 0fdd0443a535af76fec3d50b2b602c948edc016f..d3064030f80d73a469272eae674af5737fdf12a2 100644 (file)
@@ -300,6 +300,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
     objcopy = (objheader_t *) objstrAlloc(record->cache, size);
     memcpy(objcopy, objheader, size);
     /* Insert into cache's lookup table */
+    STATUS(objcopy)=0;
     chashInsert(record->lookupTable, OID(objheader), objcopy); 
 #ifdef COMPILER
     return &objcopy[1];
@@ -331,7 +332,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
       printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
       return NULL;
     } else {
-      
+      STATUS(objcopy)=0;      
 #ifdef COMPILER
       return &objcopy[1];
 #else
@@ -344,7 +345,6 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
 /* This function creates objects in the transaction record */
 objheader_t *transCreateObj(transrecord_t *record, unsigned int size) {
   objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + size));
-  tmp->notifylist = NULL;
   OID(tmp) = getNewOID();
   tmp->version = 1;
   tmp->rcount = 1;
@@ -853,9 +853,7 @@ void *handleLocalReq(void *threadarg) {
       objnotfound++;
     } else { /* If Obj found in machine (i.e. has not moved) */
       /* Check if Obj is locked by any previous transaction */
-      pthread_mutex_lock(&atomicObjLock);
-      if ((STATUS((objheader_t *)mobj) & LOCK) == LOCK) {
-        pthread_mutex_unlock(&atomicObjLock);
+      if (test_and_set(STATUSPTR(mobj))) {
         if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */ 
           v_matchlock++;
         } else {/* If versions don't match ...HARD ABORT */
@@ -863,9 +861,8 @@ void *handleLocalReq(void *threadarg) {
           /* Send TRANS_DISAGREE to Coordinator */
           localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
         }
-      } else {/* If Obj is not locked then lock object */
-        STATUS(((objheader_t *)mobj)) |= LOCK;
-        pthread_mutex_unlock(&atomicObjLock);
+      } else {
+       //we're locked
         /* Save all object oids that are locked on this machine during this transaction request call */
         oidlocked[objlocked] = OID(((objheader_t *)mobj));
         objlocked++;
@@ -944,7 +941,7 @@ int transAbortProcess(local_thread_data_array_t  *localtdata) {
       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
       return 1;
     }
-    STATUS(((objheader_t *)header)) &= ~(LOCK);
+    UnLock(STATUSPTR(header));
   }
   
   return 0;
@@ -952,69 +949,65 @@ int transAbortProcess(local_thread_data_array_t  *localtdata) {
 
 /*This function completes the COMMIT process is the transaction is commiting*/
 int transComProcess(local_thread_data_array_t  *localtdata) {
-       objheader_t *header, *tcptr;
-       int i, nummod, tmpsize, numcreated, numlocked;
-       unsigned int *oidmod, *oidcreated, *oidlocked;
-       void *ptrcreate;
-
-       nummod = localtdata->tdata->buffer->f.nummod;
-       oidmod = localtdata->tdata->buffer->oidmod;
-       numcreated = localtdata->tdata->buffer->f.numcreated;
-       oidcreated = localtdata->tdata->buffer->oidcreated;
-       numlocked = localtdata->transinfo->numlocked;
-       oidlocked = localtdata->transinfo->objlocked;
-
-       for (i = 0; i < nummod; i++) {
-               if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
-                       printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
-                       return 1;
-               }
-               /* Copy from transaction cache -> main object store */
-               if ((tcptr = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidmod[i]))) == NULL) {
-                       printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
-                       return 1;
-               }
-               GETSIZE(tmpsize, header);
-               pthread_mutex_lock(&mainobjstore_mutex);
-        char *tmptcptr = (char *) tcptr;
-               memcpy((char*)header+sizeof(objheader_t), (char *)tmptcptr+ sizeof(objheader_t), tmpsize);
-               header->version += 1;
-        pthread_mutex_lock(&notifymutex);
-               if(header->notifylist != NULL) {
-                       notifyAll(&header->notifylist, OID(header), header->version);
-               }
-        pthread_mutex_unlock(&notifymutex);
-               pthread_mutex_unlock(&mainobjstore_mutex);
-       }
-       /* If object is newly created inside transaction then commit it */
-       for (i = 0; i < numcreated; i++) {
-               if ((header = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidcreated[i]))) == NULL) {
-                       printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__);
-                       return 1;
-               }
-               GETSIZE(tmpsize, header);
-               tmpsize += sizeof(objheader_t);
-               pthread_mutex_lock(&mainobjstore_mutex);
-               if ((ptrcreate = objstrAlloc(mainobjstore, tmpsize)) == NULL) {
-                       printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
-                       pthread_mutex_unlock(&mainobjstore_mutex);
-                       return 1;
-               }
-               pthread_mutex_unlock(&mainobjstore_mutex);
-               memcpy(ptrcreate, header, tmpsize);
-               mhashInsert(oidcreated[i], ptrcreate);
-               lhashInsert(oidcreated[i], myIpAddr);
-       }
-       /* Unlock locked objects */
-       for(i = 0; i < numlocked; i++) {
-               if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
-                       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
-                       return 1;
-               }
-               STATUS(header) &= ~(LOCK);
-       }
-
-       return 0;
+  objheader_t *header, *tcptr;
+  int i, nummod, tmpsize, numcreated, numlocked;
+  unsigned int *oidmod, *oidcreated, *oidlocked;
+  void *ptrcreate;
+  
+  nummod = localtdata->tdata->buffer->f.nummod;
+  oidmod = localtdata->tdata->buffer->oidmod;
+  numcreated = localtdata->tdata->buffer->f.numcreated;
+  oidcreated = localtdata->tdata->buffer->oidcreated;
+  numlocked = localtdata->transinfo->numlocked;
+  oidlocked = localtdata->transinfo->objlocked;
+  
+  for (i = 0; i < nummod; i++) {
+    if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
+      printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+      return 1;
+    }
+    /* Copy from transaction cache -> main object store */
+    if ((tcptr = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidmod[i]))) == NULL) {
+      printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
+      return 1;
+    }
+    GETSIZE(tmpsize, header);
+    char *tmptcptr = (char *) tcptr;
+    memcpy((char*)header+sizeof(objheader_t), (char *)tmptcptr+ sizeof(objheader_t), tmpsize);
+    header->version += 1;
+    if(header->notifylist != NULL) {
+      notifyAll(&header->notifylist, OID(header), header->version);
+    }
+  }
+  /* If object is newly created inside transaction then commit it */
+  for (i = 0; i < numcreated; i++) {
+    if ((header = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidcreated[i]))) == NULL) {
+      printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__);
+      return 1;
+    }
+    GETSIZE(tmpsize, header);
+    tmpsize += sizeof(objheader_t);
+    pthread_mutex_lock(&mainobjstore_mutex);
+    if ((ptrcreate = objstrAlloc(mainobjstore, tmpsize)) == NULL) {
+      printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
+      pthread_mutex_unlock(&mainobjstore_mutex);
+      return 1;
+    }
+    pthread_mutex_unlock(&mainobjstore_mutex);
+    memcpy(ptrcreate, header, tmpsize);
+    mhashInsert(oidcreated[i], ptrcreate);
+    lhashInsert(oidcreated[i], myIpAddr);
+  }
+  /* Unlock locked objects */
+  for(i = 0; i < numlocked; i++) {
+    if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
+      printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+      return 1;
+    }
+    UnLock(STATUSPTR(header));
+  }
+  
+  return 0;
 }
 
 prefetchpile_t *foundLocal(char *ptr) {
@@ -1214,7 +1207,8 @@ int getPrefetchResponse(int sd) {
     }
     pthread_mutex_unlock(&prefetchcache_mutex);
     memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
-    
+    STATUS(modptr)=0;
+
     /* Insert the oid and its address into the prefetch hash lookup table */
     /* Do a version comparison if the oid exists */
     if((oldptr = prehashSearch(oid)) != NULL) {
@@ -1416,103 +1410,103 @@ int findHost(unsigned int hostIp)
 /* This function sends notification request per thread waiting on object(s) whose version 
  * changes */
 int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid) {
-       int sock,i;
-       objheader_t *objheader;
-       struct sockaddr_in remoteAddr;
-       char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) +  3 * sizeof(unsigned int)];
-       char *ptr;
-       int bytesSent;
-       int status, size;
-       unsigned short version;
-       unsigned int oid,mid;
-       static unsigned int threadid = 0;
-       pthread_mutex_t threadnotify = PTHREAD_MUTEX_INITIALIZER; //Lock and condition var for threadjoin and notification
-       pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER;
-       notifydata_t *ndata;
-
-       oid = oidarry[0];
-       if((mid = lhashSearch(oid)) == 0) {
-               printf("Error: %s() No such machine found for oid =%x\n",__func__, oid);
-               return;
-       }
-
-       if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
-               perror("reqNotify():socket()");
-               return -1;
-       }
-
-       bzero(&remoteAddr, sizeof(remoteAddr));
-       remoteAddr.sin_family = AF_INET;
-       remoteAddr.sin_port = htons(LISTEN_PORT);
-       remoteAddr.sin_addr.s_addr = htonl(mid);
-
-       /* Generate unique threadid */
-       threadid++;
-
-       /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */
-       if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) {
-               printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
-               return -1;
-       }
-       ndata->numoid = numoid;
-       ndata->threadid = threadid;
-       ndata->oidarry = oidarry;
-       ndata->versionarry = versionarry;
-       ndata->threadcond = threadcond;
-       ndata->threadnotify = threadnotify;
-       if((status = notifyhashInsert(threadid, ndata)) != 0) {
-               printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__);
-               free(ndata);
-               return -1; 
-       }
-       
-       /* Send  number of oids, oidarry, version array, machine id and threadid */     
-       if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
-               printf("reqNotify():error %d connecting to %s:%d\n", errno,
-                               inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
-               free(ndata);
-               return -1;
-       } else {
-               msg[0] = THREAD_NOTIFY_REQUEST;
-               *((unsigned int *)(&msg[1])) = numoid;
-               /* Send array of oids  */
-               size = sizeof(unsigned int);
-               {
-                       i = 0;
-                       while(i < numoid) {
-                               oid = oidarry[i];
-                               *((unsigned int *)(&msg[1] + size)) = oid;
-                               size += sizeof(unsigned int);
-                               i++;
-                       }
-               }
-
-               /* Send array of version  */
-               {
-                       i = 0;
-                       while(i < numoid) {
-                               version = versionarry[i];
-                               *((unsigned short *)(&msg[1] + size)) = version;
-                               size += sizeof(unsigned short);
-                               i++;
-                       }
-               }
-
-               *((unsigned int *)(&msg[1] + size)) = myIpAddr;
-               size += sizeof(unsigned int);
-               *((unsigned int *)(&msg[1] + size)) = threadid;
-               pthread_mutex_lock(&(ndata->threadnotify));
-               size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
-               send_data(sock, msg, size);
-               pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify));
-               pthread_mutex_unlock(&(ndata->threadnotify));
-       }
-
-       pthread_cond_destroy(&threadcond);
-       pthread_mutex_destroy(&threadnotify);
-       free(ndata);
-       close(sock);
-       return status;
+  int sock,i;
+  objheader_t *objheader;
+  struct sockaddr_in remoteAddr;
+  char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) +  3 * sizeof(unsigned int)];
+  char *ptr;
+  int bytesSent;
+  int status, size;
+  unsigned short version;
+  unsigned int oid,mid;
+  static unsigned int threadid = 0;
+  pthread_mutex_t threadnotify = PTHREAD_MUTEX_INITIALIZER; //Lock and condition var for threadjoin and notification
+  pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER;
+  notifydata_t *ndata;
+  
+  oid = oidarry[0];
+  if((mid = lhashSearch(oid)) == 0) {
+    printf("Error: %s() No such machine found for oid =%x\n",__func__, oid);
+    return;
+  }
+  
+  if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
+    perror("reqNotify():socket()");
+    return -1;
+  }
+  
+  bzero(&remoteAddr, sizeof(remoteAddr));
+  remoteAddr.sin_family = AF_INET;
+  remoteAddr.sin_port = htons(LISTEN_PORT);
+  remoteAddr.sin_addr.s_addr = htonl(mid);
+  
+  /* Generate unique threadid */
+  threadid++;
+  
+  /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */
+  if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) {
+    printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
+    return -1;
+  }
+  ndata->numoid = numoid;
+  ndata->threadid = threadid;
+  ndata->oidarry = oidarry;
+  ndata->versionarry = versionarry;
+  ndata->threadcond = threadcond;
+  ndata->threadnotify = threadnotify;
+  if((status = notifyhashInsert(threadid, ndata)) != 0) {
+    printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__);
+    free(ndata);
+    return -1; 
+  }
+  
+  /* Send  number of oids, oidarry, version array, machine id and threadid */  
+  if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
+    printf("reqNotify():error %d connecting to %s:%d\n", errno,
+          inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+    free(ndata);
+    return -1;
+  } else {
+    msg[0] = THREAD_NOTIFY_REQUEST;
+    *((unsigned int *)(&msg[1])) = numoid;
+    /* Send array of oids  */
+    size = sizeof(unsigned int);
+    {
+      i = 0;
+      while(i < numoid) {
+       oid = oidarry[i];
+       *((unsigned int *)(&msg[1] + size)) = oid;
+       size += sizeof(unsigned int);
+       i++;
+      }
+    }
+    
+    /* Send array of version  */
+    {
+      i = 0;
+      while(i < numoid) {
+       version = versionarry[i];
+       *((unsigned short *)(&msg[1] + size)) = version;
+       size += sizeof(unsigned short);
+       i++;
+      }
+    }
+    
+    *((unsigned int *)(&msg[1] + size)) = myIpAddr;
+    size += sizeof(unsigned int);
+    *((unsigned int *)(&msg[1] + size)) = threadid;
+    pthread_mutex_lock(&(ndata->threadnotify));
+    size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
+    send_data(sock, msg, size);
+    pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify));
+    pthread_mutex_unlock(&(ndata->threadnotify));
+  }
+  
+  pthread_cond_destroy(&threadcond);
+  pthread_mutex_destroy(&threadnotify);
+  free(ndata);
+  close(sock);
+  return status;
 }
 
 void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
@@ -1551,50 +1545,50 @@ void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
 }
 
 int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
-       threadlist_t *ptr;
-       unsigned int mid;
-       struct sockaddr_in remoteAddr;
-       char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
-       int sock, status, size, bytesSent;
-
-       while(*head != NULL) {
-               ptr = *head;
-               mid = ptr->mid; 
-               //create a socket connection to that machine
-               if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
-                       perror("notifyAll():socket()");
-                       return -1;
-               }
-
-               bzero(&remoteAddr, sizeof(remoteAddr));
-               remoteAddr.sin_family = AF_INET;
-               remoteAddr.sin_port = htons(LISTEN_PORT);
-               remoteAddr.sin_addr.s_addr = htonl(mid);
-               //send Thread Notify response and threadid to that machine
-               if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
-                       printf("notifyAll():error %d connecting to %s:%d\n", errno,
-                                       inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
-                       status = -1;
-            fflush(stdout);
-               } else {
-                       bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
-                       msg[0] = THREAD_NOTIFY_RESPONSE;
-                       *((unsigned int *)&msg[1]) = oid;
-                       size = sizeof(unsigned int);
-                       *((unsigned short *)(&msg[1]+ size)) = version;
-                       size+= sizeof(unsigned short);
-                       *((unsigned int *)(&msg[1]+ size)) = ptr->threadid;
-
-                       size = 1 + 2*sizeof(unsigned int) + sizeof(unsigned short);
-                       send_data(sock, msg, size);
-               }
-               //close socket
-               close(sock);
-               // Update head
-               *head = ptr->next;
-               free(ptr);
-       }
-       return status;
+  threadlist_t *ptr;
+  unsigned int mid;
+  struct sockaddr_in remoteAddr;
+  char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
+  int sock, status, size, bytesSent;
+  
+  while(*head != NULL) {
+    ptr = *head;
+    mid = ptr->mid; 
+    //create a socket connection to that machine
+    if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
+      perror("notifyAll():socket()");
+      return -1;
+    }
+    
+    bzero(&remoteAddr, sizeof(remoteAddr));
+    remoteAddr.sin_family = AF_INET;
+    remoteAddr.sin_port = htons(LISTEN_PORT);
+    remoteAddr.sin_addr.s_addr = htonl(mid);
+    //send Thread Notify response and threadid to that machine
+    if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
+      printf("notifyAll():error %d connecting to %s:%d\n", errno,
+            inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+      status = -1;
+      fflush(stdout);
+    } else {
+      bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
+      msg[0] = THREAD_NOTIFY_RESPONSE;
+      *((unsigned int *)&msg[1]) = oid;
+      size = sizeof(unsigned int);
+      *((unsigned short *)(&msg[1]+ size)) = version;
+      size+= sizeof(unsigned short);
+      *((unsigned int *)(&msg[1]+ size)) = ptr->threadid;
+      
+      size = 1 + 2*sizeof(unsigned int) + sizeof(unsigned short);
+      send_data(sock, msg, size);
+    }
+    //close socket
+    close(sock);
+    // Update head
+    *head = ptr->next;
+    free(ptr);
+  }
+  return status;
 }
 
 void transAbort(transrecord_t *trans) {