fix bugs in sockpool...test and set has to be atomic
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
index 99e6356117ef19920ba791337a4ee86e0f73e95a..155a622271d64c4d320ac418d724d9bdab46620f 100644 (file)
@@ -127,7 +127,7 @@ void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short
   if(ntuples > 0) {
     int qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
     char * node;
-
+    
     if((node = calloc(1, qnodesize)) == NULL) {
       printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
       return;
@@ -163,40 +163,33 @@ void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short
 
 /* This function starts up the transaction runtime. */
 int dstmStartup(const char * option) {
-       pthread_t thread_Listen;
-       pthread_attr_t attr;
-       int master=option!=NULL && strcmp(option, "master")==0;
-
-       if (processConfigFile() != 0)
-               return 0; //TODO: return error value, cause main program to exit
+  pthread_t thread_Listen;
+  pthread_attr_t attr;
+  int master=option!=NULL && strcmp(option, "master")==0;
+  
+  if (processConfigFile() != 0)
+    return 0; //TODO: return error value, cause main program to exit
 #ifdef COMPILER
-       if (!master)
-         threadcount--;
+  if (!master)
+    threadcount--;
 #endif
-
-    //Initialize socket pool
-    if((transReadSockPool = createSockPool(transReadSockPool, 2*numHostsInSystem+1, 0.5)) == NULL) {
-        printf("Error in creating new socket pool at  %s line %d\n", __FILE__, __LINE__);
-        return 0;
-    }
-    if((transPrefetchSockPool = createSockPool(transPrefetchSockPool, 2*numHostsInSystem+1, 0.5)) == NULL) {
-        printf("Error in creating new socket pool at  %s line %d\n", __FILE__, __LINE__);
-        return 0;
-    }
-
-       dstmInit();
-       transInit();
-
-       if (master) {
-               pthread_attr_init(&attr);
-               pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
-               pthread_create(&thread_Listen, &attr, dstmListen, NULL);
-               return 1;
-       } else {
-               dstmListen();
-               return 0;
-       }
-
+  
+  //Initialize socket pool
+  transReadSockPool = createSockPool(transReadSockPool, 2*numHostsInSystem+1);
+  transPrefetchSockPool = createSockPool(transPrefetchSockPool, 2*numHostsInSystem+1);
+  
+  dstmInit();
+  transInit();
+  
+  if (master) {
+    pthread_attr_init(&attr);
+    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+    pthread_create(&thread_Listen, &attr, dstmListen, NULL);
+    return 1;
+  } else {
+    dstmListen();
+    return 0;
+  }
 }
 
 //TODO Use this later
@@ -841,52 +834,37 @@ char sendResponse(thread_data_array_t *tdata, int sd) {
  * */ 
 
 void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
-       int size, val;
-       struct sockaddr_in serv_addr;
-       char machineip[16];
-       char control;
-       objheader_t *h;
-       void *objcopy = NULL;
-
-    int sd;
-    if((sd = getSock(transReadSockPool, mnum)) == -1) {
-        printf("%s(): Error: no socket id in the pool of sockets at %s, %d\n", __func__, __FILE__, __LINE__);
-        return NULL;
-    }
+  int size, val;
+  struct sockaddr_in serv_addr;
+  char machineip[16];
+  char control;
+  objheader_t *h;
+  void *objcopy = NULL;
+  
+  int sd = getSock2(transReadSockPool, mnum);
+  char readrequest[sizeof(char)+sizeof(unsigned int)];
+  readrequest[0] = READ_REQUEST;
+  *((unsigned int *)(&readrequest[1])) = oid;
+  send_data(sd, readrequest, sizeof(readrequest));
+  
+  /* Read response from the Participant */
+  recv_data(sd, &control, sizeof(char));
+  
+  if (control==OBJECT_NOT_FOUND) {
+    objcopy = NULL;
+  } else {
+    /* Read object if found into local cache */
+    recv_data(sd, &size, sizeof(int));
+    objcopy = objstrAlloc(record->cache, size);
+    recv_data(sd, objcopy, size);
     
-       char readrequest[sizeof(char)+sizeof(unsigned int)];
-       readrequest[0] = READ_REQUEST;
-       *((unsigned int *)(&readrequest[1])) = oid;
-       send_data(sd, readrequest, sizeof(readrequest));
-
-       /* Read response from the Participant */
-       recv_data(sd, &control, sizeof(char));
-
-       switch(control) {
-               case OBJECT_NOT_FOUND:
-            objcopy = NULL;
-            break;
-               case OBJECT_FOUND:
-                       /* Read object if found into local cache */
-                       recv_data(sd, &size, sizeof(int));
-                       objcopy = objstrAlloc(record->cache, size);
-                       recv_data(sd, objcopy, size);
-                       
-                       /* Insert into cache's lookup table */
-                       chashInsert(record->lookupTable, oid, objcopy); 
-                       break;
-               default:
-                       printf("Error: in recv response from participant on a READ_REQUEST %s, %d\n",__FILE__, __LINE__);
-            break;
-       }
-
-    int status;
-    if((status = freeSock(transReadSockPool, mnum, sd)) == -1) {
-        printf("Error in releasing socket at %s line %d\n", __FILE__, __LINE__);
-        return NULL;
-    }
-
-       return objcopy;
+    /* Insert into cache's lookup table */
+    chashInsert(record->lookupTable, oid, objcopy); 
+  }
+  
+  //    freeSock(transReadSockPool, mnum, sd);
+  
+  return objcopy;
 }
 
 /* This function handles the local objects involved in a transaction commiting process.
@@ -1416,100 +1394,87 @@ void checkPreCache(prefetchqelem_t *node, int *numoffset, unsigned int objoid, i
 
 /* This function is called by the thread calling transPrefetch */
 void *transPrefetch(void *t) {
-       while(1) {
-               /* lock mutex of primary prefetch queue */
-               pthread_mutex_lock(&pqueue.qlock);
-               /* while primary queue is empty, then wait */
-               while((pqueue.front == NULL) && (pqueue.rear == NULL)) {
-                       pthread_cond_wait(&pqueue.qcond, &pqueue.qlock);
-               }
-
-               /* dequeue node to create a machine piles and  finally unlock mutex */
-        prefetchqelem_t *qnode;
-               if((qnode = pre_dequeue()) == NULL) {
-                       printf("Error: No node returned %s, %d\n", __FILE__, __LINE__);
-                       pthread_mutex_unlock(&pqueue.qlock);
-                       continue;
-               }
-               pthread_mutex_unlock(&pqueue.qlock);
-                               
-               /* Reduce redundant prefetch requests */
-               checkPrefetchTuples(qnode);
-               /* Check if the tuples are found locally, if yes then reduce them further*/ 
-               /* and group requests by remote machine ids by calling the makePreGroups() */
-        prefetchpile_t *pilehead = NULL;
-               if((pilehead = foundLocal(qnode)) == NULL) {
-                       printf("Error: No node created for serving prefetch request %s %d\n", __FILE__, __LINE__);
-                       pre_enqueue(qnode);
-                       continue;
-               }
-
-        // Get sock from shared pool 
-        int sd = -1;
-        if((sd = getSock(transPrefetchSockPool, pilehead->mid)) == -1) {
-            printf("Error: No socket id in pool of sockets at %s line %d\n", __FILE__, __LINE__);
-            exit(-1);
-        }
-
-        /* Send  Prefetch Request */
-        prefetchpile_t *ptr = pilehead;
-        while(ptr != NULL) {
-            sendPrefetchReq(ptr, sd);
-            ptr = ptr->next; 
-        }
-
-        /* Release socket */
-        int status;
-        if((status = freeSock(transPrefetchSockPool, pilehead->mid, sd)) == -1) {
-            printf("Error: In realeasing socket at %s line %d\n", __FILE__, __LINE__);
-            return;
-        }
-
-        /* Deallocated pilehead */
-        mcdealloc(pilehead);
-
-               // Deallocate the prefetch queue pile node
-               predealloc(qnode);
-       }
+  while(1) {
+    /* lock mutex of primary prefetch queue */
+    pthread_mutex_lock(&pqueue.qlock);
+    /* while primary queue is empty, then wait */
+    while((pqueue.front == NULL) && (pqueue.rear == NULL)) {
+      pthread_cond_wait(&pqueue.qcond, &pqueue.qlock);
+    }
+    
+    /* dequeue node to create a machine piles and  finally unlock mutex */
+    prefetchqelem_t *qnode;
+    if((qnode = pre_dequeue()) == NULL) {
+      printf("Error: No node returned %s, %d\n", __FILE__, __LINE__);
+      pthread_mutex_unlock(&pqueue.qlock);
+      continue;
+    }
+    pthread_mutex_unlock(&pqueue.qlock);
+    
+    /* Reduce redundant prefetch requests */
+    checkPrefetchTuples(qnode);
+    /* Check if the tuples are found locally, if yes then reduce them further*/ 
+    /* and group requests by remote machine ids by calling the makePreGroups() */
+    prefetchpile_t *pilehead = foundLocal(qnode);
+    
+    // Get sock from shared pool 
+    int sd = getSock2(transPrefetchSockPool, pilehead->mid);
+    
+    /* Send  Prefetch Request */
+    prefetchpile_t *ptr = pilehead;
+    while(ptr != NULL) {
+      sendPrefetchReq(ptr, sd);
+      ptr = ptr->next; 
+    }
+    
+    /* Release socket */
+    // freeSock(transPrefetchSockPool, pilehead->mid, sd);
+    
+    /* Deallocated pilehead */
+    mcdealloc(pilehead);
+    
+    // Deallocate the prefetch queue pile node
+    predealloc(qnode);
+  }
 }
 
 void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
-       int off, len, endpair, count = 0;
-       char control;
-       objpile_t *tmp;
-
-       /* Send TRANS_PREFETCH control message */
-       control = TRANS_PREFETCH;
-       send_data(sd, &control, sizeof(char));
-
-       /* Send Oids and offsets in pairs */
-       tmp = mcpilenode->objpiles;
-       while(tmp != NULL) {
-               off = 0;
-               count++;  /* Keeps track of the number of oid and offset tuples sent per remote machine */
-               len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
-               char oidnoffset[len];
-               bzero(oidnoffset, len);
-               *((int*)oidnoffset) = len;
-               off = sizeof(int);
-               *((unsigned int *)(oidnoffset + off)) = tmp->oid;
-               off += sizeof(unsigned int);
-               *((unsigned int *)(oidnoffset + off)) = myIpAddr; 
-               off += sizeof(unsigned int);
-        int i;
-               for(i = 0; i < tmp->numoffset; i++) {
-                       *((short*)(oidnoffset + off)) = tmp->offset[i];
-                       off+=sizeof(short);
-               }
-               send_data(sd, oidnoffset, len);
-               tmp = tmp->next;
-       }
-
-       /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
-       endpair = -1;
-       send_data(sd, &endpair, sizeof(int));
-       
-       return;
+  int off, len, endpair, count = 0;
+  char control;
+  objpile_t *tmp;
+  
+  /* Send TRANS_PREFETCH control message */
+  control = TRANS_PREFETCH;
+  send_data(sd, &control, sizeof(char));
+  
+  /* Send Oids and offsets in pairs */
+  tmp = mcpilenode->objpiles;
+  while(tmp != NULL) {
+    off = 0;
+    count++;  /* Keeps track of the number of oid and offset tuples sent per remote machine */
+    len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
+    char oidnoffset[len];
+    bzero(oidnoffset, len);
+    *((int*)oidnoffset) = len;
+    off = sizeof(int);
+    *((unsigned int *)(oidnoffset + off)) = tmp->oid;
+    off += sizeof(unsigned int);
+    *((unsigned int *)(oidnoffset + off)) = myIpAddr; 
+    off += sizeof(unsigned int);
+    int i;
+    for(i = 0; i < tmp->numoffset; i++) {
+      *((short*)(oidnoffset + off)) = tmp->offset[i];
+      off+=sizeof(short);
+    }
+    send_data(sd, oidnoffset, len);
+    tmp = tmp->next;
+  }
+  
+  /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
+  endpair = -1;
+  send_data(sd, &endpair, sizeof(int));
+  
+  return;
 }
 
 int getPrefetchResponse(int sd) {