fix bugs in sockpool...test and set has to be atomic
authorbdemsky <bdemsky>
Fri, 11 Apr 2008 08:54:06 +0000 (08:54 +0000)
committerbdemsky <bdemsky>
Fri, 11 Apr 2008 08:54:06 +0000 (08:54 +0000)
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/sockpool.c
Robust/src/Runtime/DSTM/interface/sockpool.h
Robust/src/Runtime/DSTM/interface/trans.c

index 1f892e35f0a5fb84eb12e64d9f990b9af398e52f..b17319b9d48c7394afe28866400a07bd3f929eed 100644 (file)
@@ -41,7 +41,7 @@ int dstmInit(void)
                return 1; //failure
 
     //Initialize socket pool
-    if((transPResponseSocketPool = createSockPool(transPResponseSocketPool, 2*numHostsInSystem+1, LOADFACTOR)) == NULL) {
+    if((transPResponseSocketPool = createSockPool(transPResponseSocketPool, 2*numHostsInSystem+1)) == NULL) {
         printf("Error in creating new socket pool at  %s line %d\n", __FILE__, __LINE__);
         return 0;
     }
@@ -606,157 +606,143 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock
  * then use offset values to prefetch references to other objects */
 
 int prefetchReq(int acceptfd) {
-       int i, size, objsize, numbytes = 0, isArray = 0, numoffset = 0;
-       int length;
-       char *recvbuffer, *sendbuffer, control;
-       unsigned int oid, mid;
-       objheader_t *header;
-       struct sockaddr_in remoteAddr;
-    oidmidpair_t oidmid;
-
-       do {
-               recv_data((int)acceptfd, &length, sizeof(int));
-               if(length != -1) {
-            recv_data((int)acceptfd, &oidmid, 2*sizeof(unsigned int));
-            oid = oidmid.oid;
-            mid = oidmid.mid;
-            size = length - sizeof(int) - (2 * sizeof(unsigned int));
-            numoffset = size/sizeof(short);
-            short offsetarry[numoffset];
-            recv_data((int) acceptfd, offsetarry, size);
-
-            int sd = -1;
-            if((sd = getSock(transPResponseSocketPool, mid)) == -1) {
-                printf("Error: No socket id in pool of sockets at %s line %d\n", __FILE__, __LINE__);
-                exit(-1);
-            }
-
-                       /*Process each oid */
-                       if ((header = mhashSearch(oid)) == NULL) {/* Obj not found */
-                               /* Save the oids not found in buffer for later use */
-                               size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
-                               if((sendbuffer = calloc(1, size)) == NULL) {
-                                       printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
-                                       close(sd);
-                                       return -1;
-                               }
-                               *((int *) sendbuffer) = size;
-                               *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
-                               *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
-
-                               control = TRANS_PREFETCH_RESPONSE;
-                               if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
-                                       printf("Error: %s() in sending prefetch response at %s, %d\n",
-                                                       __func__, __FILE__, __LINE__);
-                                       close(sd);
-                                       return -1;
-                               }
-                       } else { /* Object Found */
-                               int incr = 0;
-                               GETSIZE(objsize, header);
-                               size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
-                               if((sendbuffer = calloc(1, size)) == NULL) {
-                                       printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
-                                       close(sd);
-                                       return -1;
-                               }
-                               *((int *) (sendbuffer + incr)) = size;
-                               incr += sizeof(int);
-                               *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
-                               incr += sizeof(char);
-                               *((unsigned int *)(sendbuffer+incr)) = oid;
-                               incr += sizeof(unsigned int);
-                               memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
-
-                               control = TRANS_PREFETCH_RESPONSE;
-                               if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
-                                       printf("Error: %s() in sending prefetch response at %s, %d\n",
-                                                       __func__, __FILE__, __LINE__);
-                                       close(sd);
-                                       return -1;
-                               }
-
-                               /* Calculate the oid corresponding to the offset value */
-                               for(i = 0 ; i< numoffset ; i++) {
-                                       /* Check for arrays  */
-                                       if(TYPE(header) > NUMCLASSES) {
-                                               isArray = 1;
-                                       }
-                                       if(isArray == 1) {
-                                               int elementsize = classsize[TYPE(header)];
-                                               struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
-                                               unsigned short length = ao->___length___;
-                                               /* Check if array out of bounds */
-                                               if(offsetarry[i]< 0 || offsetarry[i] >= length) {
-                                                       break;
-                                               }
-                                               oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offsetarry[i])));
-                                       } else {
-                                               oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offsetarry[i]));
-                                       }
-
-                                       if((header = mhashSearch(oid)) == NULL) {
-                                               size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
-                                               if((sendbuffer = calloc(1, size)) == NULL) {
-                                                       printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
-                                                       close(sd);
-                                                       return -1;
-                                               }
-                                               *((int *) sendbuffer) = size;
-                                               *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
-                                               *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
-
-                                               control = TRANS_PREFETCH_RESPONSE;
-                                               if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
-                                                       printf("Error: %s() in sending prefetch response at %s, %d\n",
-                                                                       __FILE__, __LINE__);
-                                                       close(sd);
-                                                       return -1;
-                                               }
-                                               break;
-                                       } else {/* Obj Found */
-                                               int incr = 0;
-                                               GETSIZE(objsize, header);
-                                               size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
-                                               if((sendbuffer = calloc(1, size)) == NULL) {
-                                                       printf("Calloc error at %s,%d\n", __func__, __FILE__, __LINE__);
-                                                       close(sd);
-                                                       return -1;
-                                               }
-                                               *((int *) (sendbuffer + incr)) = size;
-                                               incr += sizeof(int);
-                                               *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
-                                               incr += sizeof(char);
-                                               *((unsigned int *)(sendbuffer+incr)) = oid;
-                                               incr += sizeof(unsigned int);
-                                               memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
-
-                                               control = TRANS_PREFETCH_RESPONSE;
-                                               if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
-                                                       printf("Error: %s() in sending prefetch response at %s, %d\n",
-                                                                       __func__, __FILE__, __LINE__);
-                                                       close(sd);
-                                                       return -1;
-                                               }
-                                       }
-                                       isArray = 0;
-                               }
-                       }
-
-            //Release socket
-            int status;
-            if((status = freeSock(transPResponseSocketPool, mid, sd)) == -1) {
-                printf("Error: in releasing socket at %s line %d\n", __FILE__, __LINE__);
-                return -1;
-            }
-               }
-       } while (length != -1);
-       return 0;
+  int i, size, objsize, numoffset = 0;
+  int length;
+  char *recvbuffer, *sendbuffer, control;
+  unsigned int oid, mid=-1;
+  objheader_t *header;
+  oidmidpair_t oidmid;
+  int sd = -1;
+      
+  while(1) {
+    recv_data((int)acceptfd, &length, sizeof(int));
+    if(length == -1) 
+      break;
+    recv_data((int)acceptfd, &oidmid, 2*sizeof(unsigned int));
+    oid = oidmid.oid;
+    if (mid != oidmid.mid) {
+      if (mid!=-1) {
+       freeSockWithLock(transPResponseSocketPool, mid, sd);
+      }
+      mid=oidmid.mid;
+      if((sd = getSockWithLock(transPResponseSocketPool, mid)) == -1) {
+       printf("Error: No socket id in pool of sockets at %s line %d\n", __FILE__, __LINE__);
+       exit(-1);
+      }
+    }
+    size = length - sizeof(int) - (2 * sizeof(unsigned int));
+    numoffset = size/sizeof(short);
+    short offsetarry[numoffset];
+    recv_data((int) acceptfd, offsetarry, size);
+    
+    /*Process each oid */
+    if ((header = mhashSearch(oid)) == NULL) {/* Obj not found */
+      /* Save the oids not found in buffer for later use */
+      size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
+      sendbuffer = calloc(1, size);
+      *((int *) sendbuffer) = size;
+      *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
+      *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
+      control = TRANS_PREFETCH_RESPONSE;
+      
+      if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
+       printf("Error: %s() in sending prefetch response at %s, %d\n",
+              __func__, __FILE__, __LINE__);
+       close(sd);
+       return -1;
+      }
+    } else { /* Object Found */
+      int incr = 0;
+      GETSIZE(objsize, header);
+      size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
+      sendbuffer = calloc(1, size);
+      *((int *) (sendbuffer + incr)) = size;
+      incr += sizeof(int);
+      *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
+      incr += sizeof(char);
+      *((unsigned int *)(sendbuffer+incr)) = oid;
+      incr += sizeof(unsigned int);
+      memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
+      
+      control = TRANS_PREFETCH_RESPONSE;
+      if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
+       printf("Error: %s() in sending prefetch response at %s, %d\n",
+              __func__, __FILE__, __LINE__);
+       close(sd);
+       return -1;
+      }
+      
+      /* Calculate the oid corresponding to the offset value */
+      for(i = 0 ; i< numoffset ; i++) {
+       /* Check for arrays  */
+       if(TYPE(header) > NUMCLASSES) {
+         int elementsize = classsize[TYPE(header)];
+         struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
+         unsigned short length = ao->___length___;
+         /* Check if array out of bounds */
+         if(offsetarry[i]< 0 || offsetarry[i] >= length) {
+           break;
+         }
+         oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offsetarry[i])));
+       } else {
+         oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offsetarry[i]));
+       }
+       
+       if((header = mhashSearch(oid)) == NULL) {
+         size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
+         if((sendbuffer = calloc(1, size)) == NULL) {
+           printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
+           close(sd);
+           return -1;
+         }
+         *((int *) sendbuffer) = size;
+         *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
+         *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
+         
+         control = TRANS_PREFETCH_RESPONSE;
+         if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
+           printf("Error: %s() in sending prefetch response at %s, %d\n",
+                  __FILE__, __LINE__);
+           close(sd);
+           return -1;
+         }
+         break;
+       } else {/* Obj Found */
+         int incr = 0;
+         GETSIZE(objsize, header);
+         size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
+         if((sendbuffer = calloc(1, size)) == NULL) {
+           printf("Calloc error at %s,%d\n", __func__, __FILE__, __LINE__);
+           close(sd);
+           return -1;
+         }
+         *((int *) (sendbuffer + incr)) = size;
+         incr += sizeof(int);
+         *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
+         incr += sizeof(char);
+         *((unsigned int *)(sendbuffer+incr)) = oid;
+         incr += sizeof(unsigned int);
+         memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
+         
+         control = TRANS_PREFETCH_RESPONSE;
+         if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
+           printf("Error: %s() in sending prefetch response at %s, %d\n",
+                  __func__, __FILE__, __LINE__);
+           close(sd);
+           return -1;
+         }
+       }
+      }
+    }
+  }
+  //Release socket
+  if (mid!=-1)
+    freeSockWithLock(transPResponseSocketPool, mid, sd);
+    
+  return 0;
 }
 
 int sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size) {
-       int numbytes = 0;
-
        send_data(sd, control, sizeof(char));
        /* Send the buffer with its size */
        int length = *(size);
index 78e7272d5aa873a8866be90cea42b761a1692f51..64337b25927215a187a8e8b87799ba45f099a490 100644 (file)
@@ -23,7 +23,7 @@ inline static void UnLock(volatile unsigned int *addr) {
 
 #define MAXSPINS 1000
 
-inline void Lock(unsigned int *s) {
+inline void Lock(volatile unsigned int *s) {
   while(test_and_set(s)) {
     int i=0;
     while(*s) {
@@ -85,7 +85,7 @@ int getSockWithLock(sockPoolHashTable_t *sockhash, unsigned int mid) {
   Lock(&sockhash->mylock);
   ptr=&sockhash->table[key];
   
-  while(ptr!=NULL) {
+  while(*ptr!=NULL) {
     if (mid == (*ptr)->mid) {
       socknode_t *tmp=*ptr;
       sd = tmp->sd;
@@ -114,7 +114,7 @@ int getSock(sockPoolHashTable_t *sockhash, unsigned int mid) {
   
   ptr=&sockhash->table[key];
   
-  while(ptr!=NULL) {
+  while(*ptr!=NULL) {
     if (mid == (*ptr)->mid) {
       socknode_t *tmp=*ptr;
       sd = tmp->sd;
@@ -129,14 +129,36 @@ int getSock(sockPoolHashTable_t *sockhash, unsigned int mid) {
     socknode_t *inusenode = calloc(1, sizeof(socknode_t));
     inusenode->next=sockhash->inuse;
     sockhash->inuse=inusenode;
-    inusenode->next=sockhash;
-    sockhash=inusenode;
     return sd;
   } else {
     return -1;
   }
 }
 
+int getSock2(sockPoolHashTable_t *sockhash, unsigned int mid) {
+  socknode_t **ptr;
+  int key = mid%(sockhash->size);
+  int sd;
+  
+  ptr=&sockhash->table[key];
+  
+  while(*ptr!=NULL) {
+    if (mid == (*ptr)->mid) {
+      return (*ptr)->sd;
+    }
+    ptr=&((*ptr)->next);
+  }
+  if((sd = createNewSocket(mid)) != -1) {
+    *ptr=calloc(1, sizeof(socknode_t));
+    (*ptr)->mid=mid;
+    (*ptr)->sd=sd;
+    return sd;
+  } else {
+    return -1;
+  }
+}
+
+
 void insToListWithLock(sockPoolHashTable_t *sockhash, socknode_t *inusenode) {
     Lock(&sockhash->mylock);
     inusenode->next = sockhash->inuse;
index 96d4684098e47c339739b582588270537fd93ba4..bc1b4fab4aa28bdfcc6e91f65217bf2b163ff160 100644 (file)
@@ -17,18 +17,15 @@ typedef struct sockPoolHashTable {
     volatile unsigned int mylock;
 } sockPoolHashTable_t;
 
-sockPoolHashTable_t *createSockPool(sockPoolHashTable_t *, unsigned int, float);
+sockPoolHashTable_t *createSockPool(sockPoolHashTable_t *, unsigned int);
 int getSock(sockPoolHashTable_t *, unsigned int);
+int getSock2(sockPoolHashTable_t *, unsigned int);
 int getSockWithLock(sockPoolHashTable_t *, unsigned int);
-int freeSock(sockPoolHashTable_t *, unsigned int, int);
-int freeSockWithLock(sockPoolHashTable_t *, unsigned int, int);
+void freeSock(sockPoolHashTable_t *, unsigned int, int);
+void freeSockWithLock(sockPoolHashTable_t *, unsigned int, int);
 void insToList(sockPoolHashTable_t *, socknode_t *);
 void insToListWithLock(sockPoolHashTable_t *, socknode_t *);
 int createNewSocket(unsigned int);
-int CompareAndSwap(int *, int, int);
-void InitLock(SpinLock *);
-void Lock (SpinLock *);
-void UnLock (SpinLock *);
 
 #if 0
 /************************************************
index 99e6356117ef19920ba791337a4ee86e0f73e95a..155a622271d64c4d320ac418d724d9bdab46620f 100644 (file)
@@ -127,7 +127,7 @@ void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short
   if(ntuples > 0) {
     int qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
     char * node;
-
+    
     if((node = calloc(1, qnodesize)) == NULL) {
       printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
       return;
@@ -163,40 +163,33 @@ void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short
 
 /* This function starts up the transaction runtime. */
 int dstmStartup(const char * option) {
-       pthread_t thread_Listen;
-       pthread_attr_t attr;
-       int master=option!=NULL && strcmp(option, "master")==0;
-
-       if (processConfigFile() != 0)
-               return 0; //TODO: return error value, cause main program to exit
+  pthread_t thread_Listen;
+  pthread_attr_t attr;
+  int master=option!=NULL && strcmp(option, "master")==0;
+  
+  if (processConfigFile() != 0)
+    return 0; //TODO: return error value, cause main program to exit
 #ifdef COMPILER
-       if (!master)
-         threadcount--;
+  if (!master)
+    threadcount--;
 #endif
-
-    //Initialize socket pool
-    if((transReadSockPool = createSockPool(transReadSockPool, 2*numHostsInSystem+1, 0.5)) == NULL) {
-        printf("Error in creating new socket pool at  %s line %d\n", __FILE__, __LINE__);
-        return 0;
-    }
-    if((transPrefetchSockPool = createSockPool(transPrefetchSockPool, 2*numHostsInSystem+1, 0.5)) == NULL) {
-        printf("Error in creating new socket pool at  %s line %d\n", __FILE__, __LINE__);
-        return 0;
-    }
-
-       dstmInit();
-       transInit();
-
-       if (master) {
-               pthread_attr_init(&attr);
-               pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
-               pthread_create(&thread_Listen, &attr, dstmListen, NULL);
-               return 1;
-       } else {
-               dstmListen();
-               return 0;
-       }
-
+  
+  //Initialize socket pool
+  transReadSockPool = createSockPool(transReadSockPool, 2*numHostsInSystem+1);
+  transPrefetchSockPool = createSockPool(transPrefetchSockPool, 2*numHostsInSystem+1);
+  
+  dstmInit();
+  transInit();
+  
+  if (master) {
+    pthread_attr_init(&attr);
+    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+    pthread_create(&thread_Listen, &attr, dstmListen, NULL);
+    return 1;
+  } else {
+    dstmListen();
+    return 0;
+  }
 }
 
 //TODO Use this later
@@ -841,52 +834,37 @@ char sendResponse(thread_data_array_t *tdata, int sd) {
  * */ 
 
 void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
-       int size, val;
-       struct sockaddr_in serv_addr;
-       char machineip[16];
-       char control;
-       objheader_t *h;
-       void *objcopy = NULL;
-
-    int sd;
-    if((sd = getSock(transReadSockPool, mnum)) == -1) {
-        printf("%s(): Error: no socket id in the pool of sockets at %s, %d\n", __func__, __FILE__, __LINE__);
-        return NULL;
-    }
+  int size, val;
+  struct sockaddr_in serv_addr;
+  char machineip[16];
+  char control;
+  objheader_t *h;
+  void *objcopy = NULL;
+  
+  int sd = getSock2(transReadSockPool, mnum);
+  char readrequest[sizeof(char)+sizeof(unsigned int)];
+  readrequest[0] = READ_REQUEST;
+  *((unsigned int *)(&readrequest[1])) = oid;
+  send_data(sd, readrequest, sizeof(readrequest));
+  
+  /* Read response from the Participant */
+  recv_data(sd, &control, sizeof(char));
+  
+  if (control==OBJECT_NOT_FOUND) {
+    objcopy = NULL;
+  } else {
+    /* Read object if found into local cache */
+    recv_data(sd, &size, sizeof(int));
+    objcopy = objstrAlloc(record->cache, size);
+    recv_data(sd, objcopy, size);
     
-       char readrequest[sizeof(char)+sizeof(unsigned int)];
-       readrequest[0] = READ_REQUEST;
-       *((unsigned int *)(&readrequest[1])) = oid;
-       send_data(sd, readrequest, sizeof(readrequest));
-
-       /* Read response from the Participant */
-       recv_data(sd, &control, sizeof(char));
-
-       switch(control) {
-               case OBJECT_NOT_FOUND:
-            objcopy = NULL;
-            break;
-               case OBJECT_FOUND:
-                       /* Read object if found into local cache */
-                       recv_data(sd, &size, sizeof(int));
-                       objcopy = objstrAlloc(record->cache, size);
-                       recv_data(sd, objcopy, size);
-                       
-                       /* Insert into cache's lookup table */
-                       chashInsert(record->lookupTable, oid, objcopy); 
-                       break;
-               default:
-                       printf("Error: in recv response from participant on a READ_REQUEST %s, %d\n",__FILE__, __LINE__);
-            break;
-       }
-
-    int status;
-    if((status = freeSock(transReadSockPool, mnum, sd)) == -1) {
-        printf("Error in releasing socket at %s line %d\n", __FILE__, __LINE__);
-        return NULL;
-    }
-
-       return objcopy;
+    /* Insert into cache's lookup table */
+    chashInsert(record->lookupTable, oid, objcopy); 
+  }
+  
+  //    freeSock(transReadSockPool, mnum, sd);
+  
+  return objcopy;
 }
 
 /* This function handles the local objects involved in a transaction commiting process.
@@ -1416,100 +1394,87 @@ void checkPreCache(prefetchqelem_t *node, int *numoffset, unsigned int objoid, i
 
 /* This function is called by the thread calling transPrefetch */
 void *transPrefetch(void *t) {
-       while(1) {
-               /* lock mutex of primary prefetch queue */
-               pthread_mutex_lock(&pqueue.qlock);
-               /* while primary queue is empty, then wait */
-               while((pqueue.front == NULL) && (pqueue.rear == NULL)) {
-                       pthread_cond_wait(&pqueue.qcond, &pqueue.qlock);
-               }
-
-               /* dequeue node to create a machine piles and  finally unlock mutex */
-        prefetchqelem_t *qnode;
-               if((qnode = pre_dequeue()) == NULL) {
-                       printf("Error: No node returned %s, %d\n", __FILE__, __LINE__);
-                       pthread_mutex_unlock(&pqueue.qlock);
-                       continue;
-               }
-               pthread_mutex_unlock(&pqueue.qlock);
-                               
-               /* Reduce redundant prefetch requests */
-               checkPrefetchTuples(qnode);
-               /* Check if the tuples are found locally, if yes then reduce them further*/ 
-               /* and group requests by remote machine ids by calling the makePreGroups() */
-        prefetchpile_t *pilehead = NULL;
-               if((pilehead = foundLocal(qnode)) == NULL) {
-                       printf("Error: No node created for serving prefetch request %s %d\n", __FILE__, __LINE__);
-                       pre_enqueue(qnode);
-                       continue;
-               }
-
-        // Get sock from shared pool 
-        int sd = -1;
-        if((sd = getSock(transPrefetchSockPool, pilehead->mid)) == -1) {
-            printf("Error: No socket id in pool of sockets at %s line %d\n", __FILE__, __LINE__);
-            exit(-1);
-        }
-
-        /* Send  Prefetch Request */
-        prefetchpile_t *ptr = pilehead;
-        while(ptr != NULL) {
-            sendPrefetchReq(ptr, sd);
-            ptr = ptr->next; 
-        }
-
-        /* Release socket */
-        int status;
-        if((status = freeSock(transPrefetchSockPool, pilehead->mid, sd)) == -1) {
-            printf("Error: In realeasing socket at %s line %d\n", __FILE__, __LINE__);
-            return;
-        }
-
-        /* Deallocated pilehead */
-        mcdealloc(pilehead);
-
-               // Deallocate the prefetch queue pile node
-               predealloc(qnode);
-       }
+  while(1) {
+    /* lock mutex of primary prefetch queue */
+    pthread_mutex_lock(&pqueue.qlock);
+    /* while primary queue is empty, then wait */
+    while((pqueue.front == NULL) && (pqueue.rear == NULL)) {
+      pthread_cond_wait(&pqueue.qcond, &pqueue.qlock);
+    }
+    
+    /* dequeue node to create a machine piles and  finally unlock mutex */
+    prefetchqelem_t *qnode;
+    if((qnode = pre_dequeue()) == NULL) {
+      printf("Error: No node returned %s, %d\n", __FILE__, __LINE__);
+      pthread_mutex_unlock(&pqueue.qlock);
+      continue;
+    }
+    pthread_mutex_unlock(&pqueue.qlock);
+    
+    /* Reduce redundant prefetch requests */
+    checkPrefetchTuples(qnode);
+    /* Check if the tuples are found locally, if yes then reduce them further*/ 
+    /* and group requests by remote machine ids by calling the makePreGroups() */
+    prefetchpile_t *pilehead = foundLocal(qnode);
+    
+    // Get sock from shared pool 
+    int sd = getSock2(transPrefetchSockPool, pilehead->mid);
+    
+    /* Send  Prefetch Request */
+    prefetchpile_t *ptr = pilehead;
+    while(ptr != NULL) {
+      sendPrefetchReq(ptr, sd);
+      ptr = ptr->next; 
+    }
+    
+    /* Release socket */
+    // freeSock(transPrefetchSockPool, pilehead->mid, sd);
+    
+    /* Deallocated pilehead */
+    mcdealloc(pilehead);
+    
+    // Deallocate the prefetch queue pile node
+    predealloc(qnode);
+  }
 }
 
 void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
-       int off, len, endpair, count = 0;
-       char control;
-       objpile_t *tmp;
-
-       /* Send TRANS_PREFETCH control message */
-       control = TRANS_PREFETCH;
-       send_data(sd, &control, sizeof(char));
-
-       /* Send Oids and offsets in pairs */
-       tmp = mcpilenode->objpiles;
-       while(tmp != NULL) {
-               off = 0;
-               count++;  /* Keeps track of the number of oid and offset tuples sent per remote machine */
-               len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
-               char oidnoffset[len];
-               bzero(oidnoffset, len);
-               *((int*)oidnoffset) = len;
-               off = sizeof(int);
-               *((unsigned int *)(oidnoffset + off)) = tmp->oid;
-               off += sizeof(unsigned int);
-               *((unsigned int *)(oidnoffset + off)) = myIpAddr; 
-               off += sizeof(unsigned int);
-        int i;
-               for(i = 0; i < tmp->numoffset; i++) {
-                       *((short*)(oidnoffset + off)) = tmp->offset[i];
-                       off+=sizeof(short);
-               }
-               send_data(sd, oidnoffset, len);
-               tmp = tmp->next;
-       }
-
-       /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
-       endpair = -1;
-       send_data(sd, &endpair, sizeof(int));
-       
-       return;
+  int off, len, endpair, count = 0;
+  char control;
+  objpile_t *tmp;
+  
+  /* Send TRANS_PREFETCH control message */
+  control = TRANS_PREFETCH;
+  send_data(sd, &control, sizeof(char));
+  
+  /* Send Oids and offsets in pairs */
+  tmp = mcpilenode->objpiles;
+  while(tmp != NULL) {
+    off = 0;
+    count++;  /* Keeps track of the number of oid and offset tuples sent per remote machine */
+    len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
+    char oidnoffset[len];
+    bzero(oidnoffset, len);
+    *((int*)oidnoffset) = len;
+    off = sizeof(int);
+    *((unsigned int *)(oidnoffset + off)) = tmp->oid;
+    off += sizeof(unsigned int);
+    *((unsigned int *)(oidnoffset + off)) = myIpAddr; 
+    off += sizeof(unsigned int);
+    int i;
+    for(i = 0; i < tmp->numoffset; i++) {
+      *((short*)(oidnoffset + off)) = tmp->offset[i];
+      off+=sizeof(short);
+    }
+    send_data(sd, oidnoffset, len);
+    tmp = tmp->next;
+  }
+  
+  /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
+  endpair = -1;
+  send_data(sd, &endpair, sizeof(int));
+  
+  return;
 }
 
 int getPrefetchResponse(int sd) {