remove the system.clearPrefetchCache call
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
index d706678bffb2e2fa61dd8e55168e80187cc79d02..087c208af5d25c1051b9ba4ddea222a6b4c88ac6 100644 (file)
@@ -9,6 +9,7 @@
 #include "threadnotify.h"
 #include "queue.h"
 #include "addUdpEnhance.h"
+#include "addPrefetchEnhance.h"
 #include "gCollect.h"
 #ifdef COMPILER
 #include "thread.h"
@@ -40,6 +41,7 @@ unsigned int oidMax;
 
 sockPoolHashTable_t *transReadSockPool;
 sockPoolHashTable_t *transPrefetchSockPool;
+sockPoolHashTable_t *transRequestSockPool;
 pthread_mutex_t notifymutex;
 pthread_mutex_t atomicObjLock;
 
@@ -180,6 +182,7 @@ int dstmStartup(const char * option) {
   //Initialize socket pool
   transReadSockPool = createSockPool(transReadSockPool, 2*numHostsInSystem+1);
   transPrefetchSockPool = createSockPool(transPrefetchSockPool, 2*numHostsInSystem+1);
+  transRequestSockPool = createSockPool(transRequestSockPool, 2*numHostsInSystem+1);
   
   dstmInit();
   transInit();
@@ -483,7 +486,6 @@ int transCommit(transrecord_t *record) {
     pthread_mutex_t tlshrd;
     
     thread_data_array = (thread_data_array_t *) calloc(pilecount, sizeof(thread_data_array_t));
-    
     ltdata = calloc(1, sizeof(local_thread_data_array_t));
     
     thread_response_t rcvd_control_msg[pilecount];     /* Shared thread array that keeps track of responses of participants */
@@ -520,6 +522,7 @@ int transCommit(transrecord_t *record) {
       thread_data_array[threadnum].replyctrl = &treplyctrl;
       thread_data_array[threadnum].replyretry = &treplyretry;
       thread_data_array[threadnum].rec = record;
+      thread_data_array[threadnum].pilehead = pile_ptr;
       /* If local do not create any extra connection */
       if(pile->mid != myIpAddr) { /* Not local */
        do {
@@ -639,25 +642,11 @@ void *transRequest(void *threadarg) {
   objheader_t *headeraddr;
   char control, recvcontrol;
   char machineip[16], retval;
-  
+
   tdata = (thread_data_array_t *) threadarg;
-  
-  /* Send Trans Request */
-  if ((sd = socket(AF_INET, SOCK_STREAM, 0)) <= 0) {
-    printf("transRequest():error %d\n", errno);
-    perror("transRequest() socket error");
-    pthread_exit(NULL);
-  }
-  bzero((char*) &serv_addr, sizeof(serv_addr));
-  serv_addr.sin_family = AF_INET;
-  serv_addr.sin_port = htons(LISTEN_PORT);
-  serv_addr.sin_addr.s_addr = htonl(tdata->mid);
-
-  /* Open Connection */
-  if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
-    printf("transRequest():error %d, sd= %d\n", errno, sd);
-    perror("transRequest() connect");
-    close(sd);
+
+  if((sd = getSock2WithLock(transRequestSockPool, tdata->mid)) < 0) {
+    printf("transRequest(): socket create error\n");
     pthread_exit(NULL);
   }
   
@@ -695,7 +684,6 @@ void *transRequest(void *threadarg) {
     pthread_mutex_lock(&prefetchcache_mutex);
     if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) {
       printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
-      close(sd);
       pthread_exit(NULL);
     }
     pthread_mutex_unlock(&prefetchcache_mutex);
@@ -744,7 +732,6 @@ void *transRequest(void *threadarg) {
    * to all participants in their respective socket */
   if (sendResponse(tdata, sd) == 0) { 
     printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
-    close(sd);
     pthread_exit(NULL);
   }
   
@@ -757,9 +744,6 @@ void *transRequest(void *threadarg) {
   } else {
     //printf("DEBUG-> Error: Incorrect Transaction End Message %d\n", control);
   }
-  
-  /* Close connection */
-  close(sd);
   pthread_exit(NULL);
 }
 
@@ -796,30 +780,17 @@ void decideResponse(thread_data_array_t *tdata) {
     *(tdata->replyctrl) = TRANS_ABORT;
     *(tdata->replyretry) = 0;
     /* clear objects from prefetch cache */
-    for (i = 0; i < tdata->buffer->f.numread; i++) {
-      prehashRemove(*((unsigned int *)(((char *)tdata->buffer->objread) + (sizeof(unsigned int) + sizeof(unsigned short))*i)));
-    }
-    for (i = 0; i < tdata->buffer->f.nummod; i++) {
-      prehashRemove(tdata->buffer->oidmod[i]);
-    }
+    cleanPCache(tdata);
   } else if(transagree == tdata->buffer->f.mcount){
     /* Send Commit */
     *(tdata->replyctrl) = TRANS_COMMIT;
     *(tdata->replyretry) = 0;
-    /* update prefetch cache */
-    /* For objects read */
-    char oidType;
     int retval;
-    oidType = 'R'; 
-    if((retval = updatePrefetchCache(tdata, tdata->buffer->f.numread, oidType)) != 0) {
-      printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
-      return;
-    }
-    oidType = 'M'; 
-    if((retval = updatePrefetchCache(tdata, tdata->buffer->f.nummod, oidType)) != 0) {
+    if((retval = updatePrefetchCache(tdata)) != 0) {
       printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
       return;
     }
+
     /* Invalidate objects in other machine cache */
     if(tdata->buffer->f.nummod > 0) {
       if((retval = invalidateObj(tdata)) != 0) {
@@ -835,47 +806,6 @@ void decideResponse(thread_data_array_t *tdata) {
   return;
 }
 
-/* This function updates the prefetch cache when commiting objects 
- * based on the type of oid i.e. if oid is read or oid is modified
- * Return -1 on error else returns 0 
- */
-int updatePrefetchCache(thread_data_array_t* tdata, int numoid, char oidType) {
-  int i;
-  for (i = 0; i < numoid; i++) {
-    //find address object 
-    objheader_t *header, *newAddr;
-    int size;
-    unsigned int oid;
-    if(oidType == 'R') {
-      oid = *((unsigned int *)(tdata->buffer->objread + (sizeof(unsigned int) + sizeof(unsigned short))*i)); 
-    } else {
-      oid = tdata->buffer->oidmod[i];
-    }
-    pthread_mutex_lock(&prefetchcache_mutex);
-    header = (objheader_t *) chashSearch(tdata->rec->lookupTable, oid);
-    header->version += 1;
-    //copy object into prefetch cache
-    GETSIZE(size, header);
-    if ((newAddr = prefetchobjstrAlloc(size + sizeof(objheader_t))) == NULL) {
-      printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
-      pthread_mutex_unlock(&prefetchcache_mutex);
-      return -1;
-    }
-    pthread_mutex_unlock(&prefetchcache_mutex);
-    memcpy(newAddr, header, (size + sizeof(objheader_t)));
-    //make an entry in prefetch hash table
-    void *oldptr;
-    if((oldptr = prehashSearch(oid)) != NULL) {
-      prehashRemove(oid);
-      prehashInsert(oid, newAddr);
-    } else {
-      prehashInsert(oid, newAddr);
-    }
-  }
-  return 0;
-}
-
-
 /* This function sends the final response to remote machines per
  * thread in their respective socket id It returns a char that is only
  * needed to check the correctness of execution of this function
@@ -1741,3 +1671,70 @@ void transAbort(transrecord_t *trans) {
   chashDelete(trans->lookupTable);
   free(trans);
 }
+
+/* This function inserts necessary information into 
+ * a machine pile data structure */
+plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) {
+  plistnode_t *ptr, *tmp;
+  int found = 0, offset = 0;
+  
+  tmp = pile;
+  //Add oid into a machine that is already present in the pile linked list structure
+  while(tmp != NULL) {
+    if (tmp->mid == mid) {
+      int tmpsize;
+      
+      if (STATUS(headeraddr) & NEW) {
+       tmp->oidcreated[tmp->numcreated] = OID(headeraddr);
+       tmp->numcreated++;
+       GETSIZE(tmpsize, headeraddr);
+       tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
+      }else if (STATUS(headeraddr) & DIRTY) {
+       tmp->oidmod[tmp->nummod] = OID(headeraddr);
+       tmp->nummod++;
+       GETSIZE(tmpsize, headeraddr);
+       tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
+      } else {
+       offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
+       *((unsigned int *)(((char *)tmp->objread) + offset))=OID(headeraddr);
+       offset += sizeof(unsigned int);
+       *((short *)(((char *)tmp->objread) + offset)) = headeraddr->version;
+       tmp->numread ++;
+      }
+      found = 1;
+      break;
+    }
+    tmp = tmp->next;
+  }
+  //Add oid for any new machine 
+  if (!found) {
+    int tmpsize;
+    if((ptr = pCreate(num_objs)) == NULL) {
+      return NULL;
+    }
+    ptr->mid = mid;
+    if (STATUS(headeraddr) & NEW) {
+      ptr->oidcreated[ptr->numcreated] = OID(headeraddr);
+      ptr->numcreated ++;
+      GETSIZE(tmpsize, headeraddr);
+      ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
+    } else if (STATUS(headeraddr) & DIRTY) {
+      ptr->oidmod[ptr->nummod] = OID(headeraddr);
+      ptr->nummod ++;
+      GETSIZE(tmpsize, headeraddr);
+      ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
+    } else {
+      *((unsigned int *)ptr->objread)=OID(headeraddr);
+      offset = sizeof(unsigned int);
+      *((short *)(((char *)ptr->objread) + offset)) = headeraddr->version;
+      ptr->numread ++;
+    }
+    ptr->next = pile;
+    pile = ptr;
+  }
+  
+  /* Clear Flags */
+  STATUS(headeraddr) =0;
+  
+  return pile;
+}