HACK to make things work
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
index 0fdd0443a535af76fec3d50b2b602c948edc016f..d60bea5f3fbcafa7bfbd4368a109420233d069bd 100644 (file)
@@ -132,6 +132,7 @@ void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short
   /* Set queue node values */
   int len;
   int top=endoffsets[ntuples-1];
+
   *((int *)(node))=ntuples;
   len = sizeof(int);
   memcpy(node+len, oids, ntuples*sizeof(unsigned int));
@@ -147,7 +148,8 @@ int dstmStartup(const char * option) {
   pthread_t thread_Listen;
   pthread_attr_t attr;
   int master=option!=NULL && strcmp(option, "master")==0;
-  
+  int fd;
+
   if (processConfigFile() != 0)
     return 0; //TODO: return error value, cause main program to exit
 #ifdef COMPILER
@@ -162,13 +164,14 @@ int dstmStartup(const char * option) {
   dstmInit();
   transInit();
   
+  fd=startlistening();
   if (master) {
     pthread_attr_init(&attr);
     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
-    pthread_create(&thread_Listen, &attr, dstmListen, NULL);
+    pthread_create(&thread_Listen, &attr, dstmListen, (void*)fd);
     return 1;
   } else {
-    dstmListen();
+    dstmListen((void *)fd);
     return 0;
   }
 }
@@ -300,6 +303,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
     objcopy = (objheader_t *) objstrAlloc(record->cache, size);
     memcpy(objcopy, objheader, size);
     /* Insert into cache's lookup table */
+    STATUS(objcopy)=0;
     chashInsert(record->lookupTable, OID(objheader), objcopy); 
 #ifdef COMPILER
     return &objcopy[1];
@@ -331,7 +335,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
       printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
       return NULL;
     } else {
-      
+      STATUS(objcopy)=0;      
 #ifdef COMPILER
       return &objcopy[1];
 #else
@@ -344,7 +348,6 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
 /* This function creates objects in the transaction record */
 objheader_t *transCreateObj(transrecord_t *record, unsigned int size) {
   objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + size));
-  tmp->notifylist = NULL;
   OID(tmp) = getNewOID();
   tmp->version = 1;
   tmp->rcount = 1;
@@ -407,18 +410,19 @@ int transCommit(transrecord_t *record) {
   unsigned int tot_bytes_mod, *listmid;
   plistnode_t *pile, *pile_ptr;
   int i, j, rc, val;
-  int pilecount, offset, threadnum = 0, trecvcount = 0;
+  int pilecount, offset, threadnum, trecvcount;
   char control;
   char transid[TID_LEN];
   trans_req_data_t *tosend;
   trans_commit_data_t transinfo;
   static int newtid = 0;
-  char treplyctrl = 0, treplyretry = 0; /* keeps track of the common response that needs to be sent */
-  char localstat = 0;
+  char treplyctrl, treplyretry; /* keeps track of the common response that needs to be sent */
   thread_data_array_t *thread_data_array;
   local_thread_data_array_t *ltdata;
+  int firsttime=1;
   
   do { 
+    treplyctrl=0;
     trecvcount = 0; 
     threadnum = 0; 
     treplyretry = 0;
@@ -427,8 +431,12 @@ int transCommit(transrecord_t *record) {
     
     /* Look through all the objects in the transaction record and make piles 
      * for each machine involved in the transaction*/
-    pile_ptr = pile = createPiles(record);
-    
+    if (firsttime)
+      pile_ptr = pile = createPiles(record);
+    else
+      pile=pile_ptr;
+    firsttime=0;
+
     /* Create the packet to be sent in TRANS_REQUEST */
     
     /* Count the number of participants */
@@ -458,7 +466,7 @@ int transCommit(transrecord_t *record) {
     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
     pthread_mutex_init(&tlock, NULL);
     pthread_cond_init(&tcond, NULL);
-    
+
     /* Process each machine pile */
     while(pile != NULL) {
       //Create transaction id
@@ -549,7 +557,9 @@ int transCommit(transrecord_t *record) {
     pthread_cond_destroy(&tcond);
     pthread_mutex_destroy(&tlock);
     free(listmid);
-    pDelete(pile_ptr);
+
+    if (!treplyretry)
+      pDelete(pile_ptr);
     
     /* wait a random amount of time before retrying to commit transaction*/
     if(treplyretry) {
@@ -644,7 +654,6 @@ void *transRequest(void *threadarg) {
   /* Read control message from Participant */
   recv_data(sd, &control, sizeof(char));
   recvcontrol = control;
-  
   /* Update common data structure and increment count */
   tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
   
@@ -853,9 +862,7 @@ void *handleLocalReq(void *threadarg) {
       objnotfound++;
     } else { /* If Obj found in machine (i.e. has not moved) */
       /* Check if Obj is locked by any previous transaction */
-      pthread_mutex_lock(&atomicObjLock);
-      if ((STATUS((objheader_t *)mobj) & LOCK) == LOCK) {
-        pthread_mutex_unlock(&atomicObjLock);
+      if (test_and_set(STATUSPTR(mobj))) {
         if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */ 
           v_matchlock++;
         } else {/* If versions don't match ...HARD ABORT */
@@ -863,9 +870,8 @@ void *handleLocalReq(void *threadarg) {
           /* Send TRANS_DISAGREE to Coordinator */
           localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
         }
-      } else {/* If Obj is not locked then lock object */
-        STATUS(((objheader_t *)mobj)) |= LOCK;
-        pthread_mutex_unlock(&atomicObjLock);
+      } else {
+       //we're locked
         /* Save all object oids that are locked on this machine during this transaction request call */
         oidlocked[objlocked] = OID(((objheader_t *)mobj));
         objlocked++;
@@ -944,7 +950,7 @@ int transAbortProcess(local_thread_data_array_t  *localtdata) {
       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
       return 1;
     }
-    STATUS(((objheader_t *)header)) &= ~(LOCK);
+    UnLock(STATUSPTR(header));
   }
   
   return 0;
@@ -952,69 +958,65 @@ int transAbortProcess(local_thread_data_array_t  *localtdata) {
 
 /*This function completes the COMMIT process is the transaction is commiting*/
 int transComProcess(local_thread_data_array_t  *localtdata) {
-       objheader_t *header, *tcptr;
-       int i, nummod, tmpsize, numcreated, numlocked;
-       unsigned int *oidmod, *oidcreated, *oidlocked;
-       void *ptrcreate;
-
-       nummod = localtdata->tdata->buffer->f.nummod;
-       oidmod = localtdata->tdata->buffer->oidmod;
-       numcreated = localtdata->tdata->buffer->f.numcreated;
-       oidcreated = localtdata->tdata->buffer->oidcreated;
-       numlocked = localtdata->transinfo->numlocked;
-       oidlocked = localtdata->transinfo->objlocked;
-
-       for (i = 0; i < nummod; i++) {
-               if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
-                       printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
-                       return 1;
-               }
-               /* Copy from transaction cache -> main object store */
-               if ((tcptr = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidmod[i]))) == NULL) {
-                       printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
-                       return 1;
-               }
-               GETSIZE(tmpsize, header);
-               pthread_mutex_lock(&mainobjstore_mutex);
-        char *tmptcptr = (char *) tcptr;
-               memcpy((char*)header+sizeof(objheader_t), (char *)tmptcptr+ sizeof(objheader_t), tmpsize);
-               header->version += 1;
-        pthread_mutex_lock(&notifymutex);
-               if(header->notifylist != NULL) {
-                       notifyAll(&header->notifylist, OID(header), header->version);
-               }
-        pthread_mutex_unlock(&notifymutex);
-               pthread_mutex_unlock(&mainobjstore_mutex);
-       }
-       /* If object is newly created inside transaction then commit it */
-       for (i = 0; i < numcreated; i++) {
-               if ((header = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidcreated[i]))) == NULL) {
-                       printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__);
-                       return 1;
-               }
-               GETSIZE(tmpsize, header);
-               tmpsize += sizeof(objheader_t);
-               pthread_mutex_lock(&mainobjstore_mutex);
-               if ((ptrcreate = objstrAlloc(mainobjstore, tmpsize)) == NULL) {
-                       printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
-                       pthread_mutex_unlock(&mainobjstore_mutex);
-                       return 1;
-               }
-               pthread_mutex_unlock(&mainobjstore_mutex);
-               memcpy(ptrcreate, header, tmpsize);
-               mhashInsert(oidcreated[i], ptrcreate);
-               lhashInsert(oidcreated[i], myIpAddr);
-       }
-       /* Unlock locked objects */
-       for(i = 0; i < numlocked; i++) {
-               if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
-                       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
-                       return 1;
-               }
-               STATUS(header) &= ~(LOCK);
-       }
-
-       return 0;
+  objheader_t *header, *tcptr;
+  int i, nummod, tmpsize, numcreated, numlocked;
+  unsigned int *oidmod, *oidcreated, *oidlocked;
+  void *ptrcreate;
+  
+  nummod = localtdata->tdata->buffer->f.nummod;
+  oidmod = localtdata->tdata->buffer->oidmod;
+  numcreated = localtdata->tdata->buffer->f.numcreated;
+  oidcreated = localtdata->tdata->buffer->oidcreated;
+  numlocked = localtdata->transinfo->numlocked;
+  oidlocked = localtdata->transinfo->objlocked;
+  
+  for (i = 0; i < nummod; i++) {
+    if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
+      printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+      return 1;
+    }
+    /* Copy from transaction cache -> main object store */
+    if ((tcptr = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidmod[i]))) == NULL) {
+      printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
+      return 1;
+    }
+    GETSIZE(tmpsize, header);
+    char *tmptcptr = (char *) tcptr;
+    memcpy((char*)header+sizeof(objheader_t), (char *)tmptcptr+ sizeof(objheader_t), tmpsize);
+    header->version += 1;
+    if(header->notifylist != NULL) {
+      notifyAll(&header->notifylist, OID(header), header->version);
+    }
+  }
+  /* If object is newly created inside transaction then commit it */
+  for (i = 0; i < numcreated; i++) {
+    if ((header = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidcreated[i]))) == NULL) {
+      printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__);
+      return 1;
+    }
+    GETSIZE(tmpsize, header);
+    tmpsize += sizeof(objheader_t);
+    pthread_mutex_lock(&mainobjstore_mutex);
+    if ((ptrcreate = objstrAlloc(mainobjstore, tmpsize)) == NULL) {
+      printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
+      pthread_mutex_unlock(&mainobjstore_mutex);
+      return 1;
+    }
+    pthread_mutex_unlock(&mainobjstore_mutex);
+    memcpy(ptrcreate, header, tmpsize);
+    mhashInsert(oidcreated[i], ptrcreate);
+    lhashInsert(oidcreated[i], myIpAddr);
+  }
+  /* Unlock locked objects */
+  for(i = 0; i < numlocked; i++) {
+    if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
+      printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+      return 1;
+    }
+    UnLock(STATUSPTR(header));
+  }
+  
+  return 0;
 }
 
 prefetchpile_t *foundLocal(char *ptr) {
@@ -1214,7 +1216,8 @@ int getPrefetchResponse(int sd) {
     }
     pthread_mutex_unlock(&prefetchcache_mutex);
     memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
-    
+    STATUS(modptr)=0;
+
     /* Insert the oid and its address into the prefetch hash lookup table */
     /* Do a version comparison if the oid exists */
     if((oldptr = prehashSearch(oid)) != NULL) {
@@ -1416,103 +1419,103 @@ int findHost(unsigned int hostIp)
 /* This function sends notification request per thread waiting on object(s) whose version 
  * changes */
 int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid) {
-       int sock,i;
-       objheader_t *objheader;
-       struct sockaddr_in remoteAddr;
-       char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) +  3 * sizeof(unsigned int)];
-       char *ptr;
-       int bytesSent;
-       int status, size;
-       unsigned short version;
-       unsigned int oid,mid;
-       static unsigned int threadid = 0;
-       pthread_mutex_t threadnotify = PTHREAD_MUTEX_INITIALIZER; //Lock and condition var for threadjoin and notification
-       pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER;
-       notifydata_t *ndata;
-
-       oid = oidarry[0];
-       if((mid = lhashSearch(oid)) == 0) {
-               printf("Error: %s() No such machine found for oid =%x\n",__func__, oid);
-               return;
-       }
-
-       if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
-               perror("reqNotify():socket()");
-               return -1;
-       }
-
-       bzero(&remoteAddr, sizeof(remoteAddr));
-       remoteAddr.sin_family = AF_INET;
-       remoteAddr.sin_port = htons(LISTEN_PORT);
-       remoteAddr.sin_addr.s_addr = htonl(mid);
-
-       /* Generate unique threadid */
-       threadid++;
-
-       /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */
-       if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) {
-               printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
-               return -1;
-       }
-       ndata->numoid = numoid;
-       ndata->threadid = threadid;
-       ndata->oidarry = oidarry;
-       ndata->versionarry = versionarry;
-       ndata->threadcond = threadcond;
-       ndata->threadnotify = threadnotify;
-       if((status = notifyhashInsert(threadid, ndata)) != 0) {
-               printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__);
-               free(ndata);
-               return -1; 
-       }
-       
-       /* Send  number of oids, oidarry, version array, machine id and threadid */     
-       if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
-               printf("reqNotify():error %d connecting to %s:%d\n", errno,
-                               inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
-               free(ndata);
-               return -1;
-       } else {
-               msg[0] = THREAD_NOTIFY_REQUEST;
-               *((unsigned int *)(&msg[1])) = numoid;
-               /* Send array of oids  */
-               size = sizeof(unsigned int);
-               {
-                       i = 0;
-                       while(i < numoid) {
-                               oid = oidarry[i];
-                               *((unsigned int *)(&msg[1] + size)) = oid;
-                               size += sizeof(unsigned int);
-                               i++;
-                       }
-               }
-
-               /* Send array of version  */
-               {
-                       i = 0;
-                       while(i < numoid) {
-                               version = versionarry[i];
-                               *((unsigned short *)(&msg[1] + size)) = version;
-                               size += sizeof(unsigned short);
-                               i++;
-                       }
-               }
-
-               *((unsigned int *)(&msg[1] + size)) = myIpAddr;
-               size += sizeof(unsigned int);
-               *((unsigned int *)(&msg[1] + size)) = threadid;
-               pthread_mutex_lock(&(ndata->threadnotify));
-               size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
-               send_data(sock, msg, size);
-               pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify));
-               pthread_mutex_unlock(&(ndata->threadnotify));
-       }
-
-       pthread_cond_destroy(&threadcond);
-       pthread_mutex_destroy(&threadnotify);
-       free(ndata);
-       close(sock);
-       return status;
+  int sock,i;
+  objheader_t *objheader;
+  struct sockaddr_in remoteAddr;
+  char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) +  3 * sizeof(unsigned int)];
+  char *ptr;
+  int bytesSent;
+  int status, size;
+  unsigned short version;
+  unsigned int oid,mid;
+  static unsigned int threadid = 0;
+  pthread_mutex_t threadnotify = PTHREAD_MUTEX_INITIALIZER; //Lock and condition var for threadjoin and notification
+  pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER;
+  notifydata_t *ndata;
+  
+  oid = oidarry[0];
+  if((mid = lhashSearch(oid)) == 0) {
+    printf("Error: %s() No such machine found for oid =%x\n",__func__, oid);
+    return;
+  }
+  
+  if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
+    perror("reqNotify():socket()");
+    return -1;
+  }
+  
+  bzero(&remoteAddr, sizeof(remoteAddr));
+  remoteAddr.sin_family = AF_INET;
+  remoteAddr.sin_port = htons(LISTEN_PORT);
+  remoteAddr.sin_addr.s_addr = htonl(mid);
+  
+  /* Generate unique threadid */
+  threadid++;
+  
+  /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */
+  if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) {
+    printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
+    return -1;
+  }
+  ndata->numoid = numoid;
+  ndata->threadid = threadid;
+  ndata->oidarry = oidarry;
+  ndata->versionarry = versionarry;
+  ndata->threadcond = threadcond;
+  ndata->threadnotify = threadnotify;
+  if((status = notifyhashInsert(threadid, ndata)) != 0) {
+    printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__);
+    free(ndata);
+    return -1; 
+  }
+  
+  /* Send  number of oids, oidarry, version array, machine id and threadid */  
+  if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
+    printf("reqNotify():error %d connecting to %s:%d\n", errno,
+          inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+    free(ndata);
+    return -1;
+  } else {
+    msg[0] = THREAD_NOTIFY_REQUEST;
+    *((unsigned int *)(&msg[1])) = numoid;
+    /* Send array of oids  */
+    size = sizeof(unsigned int);
+    {
+      i = 0;
+      while(i < numoid) {
+       oid = oidarry[i];
+       *((unsigned int *)(&msg[1] + size)) = oid;
+       size += sizeof(unsigned int);
+       i++;
+      }
+    }
+    
+    /* Send array of version  */
+    {
+      i = 0;
+      while(i < numoid) {
+       version = versionarry[i];
+       *((unsigned short *)(&msg[1] + size)) = version;
+       size += sizeof(unsigned short);
+       i++;
+      }
+    }
+    
+    *((unsigned int *)(&msg[1] + size)) = myIpAddr;
+    size += sizeof(unsigned int);
+    *((unsigned int *)(&msg[1] + size)) = threadid;
+    pthread_mutex_lock(&(ndata->threadnotify));
+    size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
+    send_data(sock, msg, size);
+    pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify));
+    pthread_mutex_unlock(&(ndata->threadnotify));
+  }
+  
+  pthread_cond_destroy(&threadcond);
+  pthread_mutex_destroy(&threadnotify);
+  free(ndata);
+  close(sock);
+  return status;
 }
 
 void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
@@ -1551,50 +1554,50 @@ void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
 }
 
 int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
-       threadlist_t *ptr;
-       unsigned int mid;
-       struct sockaddr_in remoteAddr;
-       char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
-       int sock, status, size, bytesSent;
-
-       while(*head != NULL) {
-               ptr = *head;
-               mid = ptr->mid; 
-               //create a socket connection to that machine
-               if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
-                       perror("notifyAll():socket()");
-                       return -1;
-               }
-
-               bzero(&remoteAddr, sizeof(remoteAddr));
-               remoteAddr.sin_family = AF_INET;
-               remoteAddr.sin_port = htons(LISTEN_PORT);
-               remoteAddr.sin_addr.s_addr = htonl(mid);
-               //send Thread Notify response and threadid to that machine
-               if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
-                       printf("notifyAll():error %d connecting to %s:%d\n", errno,
-                                       inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
-                       status = -1;
-            fflush(stdout);
-               } else {
-                       bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
-                       msg[0] = THREAD_NOTIFY_RESPONSE;
-                       *((unsigned int *)&msg[1]) = oid;
-                       size = sizeof(unsigned int);
-                       *((unsigned short *)(&msg[1]+ size)) = version;
-                       size+= sizeof(unsigned short);
-                       *((unsigned int *)(&msg[1]+ size)) = ptr->threadid;
-
-                       size = 1 + 2*sizeof(unsigned int) + sizeof(unsigned short);
-                       send_data(sock, msg, size);
-               }
-               //close socket
-               close(sock);
-               // Update head
-               *head = ptr->next;
-               free(ptr);
-       }
-       return status;
+  threadlist_t *ptr;
+  unsigned int mid;
+  struct sockaddr_in remoteAddr;
+  char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
+  int sock, status, size, bytesSent;
+  
+  while(*head != NULL) {
+    ptr = *head;
+    mid = ptr->mid; 
+    //create a socket connection to that machine
+    if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
+      perror("notifyAll():socket()");
+      return -1;
+    }
+    
+    bzero(&remoteAddr, sizeof(remoteAddr));
+    remoteAddr.sin_family = AF_INET;
+    remoteAddr.sin_port = htons(LISTEN_PORT);
+    remoteAddr.sin_addr.s_addr = htonl(mid);
+    //send Thread Notify response and threadid to that machine
+    if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
+      printf("notifyAll():error %d connecting to %s:%d\n", errno,
+            inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+      status = -1;
+      fflush(stdout);
+    } else {
+      bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
+      msg[0] = THREAD_NOTIFY_RESPONSE;
+      *((unsigned int *)&msg[1]) = oid;
+      size = sizeof(unsigned int);
+      *((unsigned short *)(&msg[1]+ size)) = version;
+      size+= sizeof(unsigned short);
+      *((unsigned int *)(&msg[1]+ size)) = ptr->threadid;
+      
+      size = 1 + 2*sizeof(unsigned int) + sizeof(unsigned short);
+      send_data(sock, msg, size);
+    }
+    //close socket
+    close(sock);
+    // Update head
+    *head = ptr->next;
+    free(ptr);
+  }
+  return status;
 }
 
 void transAbort(transrecord_t *trans) {