minor changes for update cache call(where is it called from)
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
index 639ac7875060f9e55e5b78d2a9e03c2ddeb5493b..242e6bce56d7a0b8d7038a880faad75012b8f59c 100644 (file)
@@ -1,6 +1,5 @@
 #include "dstm.h"
 #include "ip.h"
-#include "clookup.h"
 #include "machinepile.h"
 #include "mlookup.h"
 #include "llookup.h"
@@ -50,6 +49,10 @@ pthread_mutex_t atomicObjLock;
  **********************************/
 int numTransCommit = 0;
 int numTransAbort = 0;
+int nchashSearch = 0;
+int nmhashSearch = 0;
+int nprehashSearch = 0;
+int nRemoteSend = 0;
 
 void printhex(unsigned char *, int);
 plistnode_t *createPiles(transrecord_t *);
@@ -189,10 +192,10 @@ int dstmStartup(const char * option) {
   
   fd=startlistening();
   udpfd = udpInit();
+  pthread_attr_init(&attr);
+  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
   pthread_create(&udp_thread_Listen, &attr, udpListenBroadcast, (void*)udpfd);
   if (master) {
-    pthread_attr_init(&attr);
-    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
     pthread_create(&thread_Listen, &attr, dstmListen, (void*)fd);
     return 1;
   } else {
@@ -299,7 +302,7 @@ transrecord_t *transStart() {
     return NULL;
   }
   tmp->cache = objstrCreate(1048576);
-  tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
+  tmp->lookupTable = chashCreate(CHASH_SIZE, CLOADFACTOR);
 #ifdef COMPILER
   tmp->revertlist=NULL;
 #endif
@@ -319,7 +322,10 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
     return NULL;
   }
   
-  if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
+  if((objheader = chashSearch(record->lookupTable, oid)) != NULL){
+#ifdef TRANSSTATS
+    nchashSearch++;
+#endif
     /* Search local transaction cache */
 #ifdef COMPILER
     return &objheader[1];
@@ -327,6 +333,9 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
     return objheader;
 #endif
   } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
+#ifdef TRANSSTATS
+    nmhashSearch++;
+#endif
     /* Look up in machine lookup table  and copy  into cache*/
     GETSIZE(size, objheader);
     size += sizeof(objheader_t);
@@ -341,6 +350,9 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
     return objcopy;
 #endif
   } else if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { 
+#ifdef TRANSSTATS
+    nprehashSearch++;
+#endif
     /* Look up in prefetch cache */
     GETSIZE(size, tmp);
     size+=sizeof(objheader_t);
@@ -365,6 +377,9 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
       printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
       return NULL;
     } else {
+#ifdef TRANSSTATS
+    nRemoteSend++;
+#endif
       STATUS(objcopy)=0;      
 #ifdef COMPILER
       return &objcopy[1];
@@ -410,7 +425,7 @@ plistnode_t *createPiles(transrecord_t *record) {
       if(curr->key == 0)
        break;
       
-      if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
+      if ((headeraddr = (objheader_t *) chashSearch(record->lookupTable, curr->key)) == NULL) {
        printf("Error: No such oid %s, %d\n", __FILE__, __LINE__);
        return NULL;
       }
@@ -522,7 +537,6 @@ int transCommit(transrecord_t *record) {
       thread_data_array[threadnum].replyctrl = &treplyctrl;
       thread_data_array[threadnum].replyretry = &treplyretry;
       thread_data_array[threadnum].rec = record;
-      thread_data_array[threadnum].pilehead = pile_ptr;
       /* If local do not create any extra connection */
       if(pile->mid != myIpAddr) { /* Not local */
        do {
@@ -604,6 +618,10 @@ int transCommit(transrecord_t *record) {
   if(treplyctrl == TRANS_ABORT) {
 #ifdef TRANSSTATS
     numTransAbort++;
+#endif
+#ifdef CHECKTB
+    char a[] = "Aborting";
+    TABORT1(a);
 #endif
     /* Free Resources */
     objstrDelete(record->cache);
@@ -615,6 +633,10 @@ int transCommit(transrecord_t *record) {
   } else if(treplyctrl == TRANS_COMMIT) {
 #ifdef TRANSSTATS
     numTransCommit++;
+#endif
+#ifdef CHECKTB
+    char a[] = "Commiting";
+    TABORT1(a);
 #endif
     /* Free Resources */
     objstrDelete(record->cache);
@@ -649,10 +671,10 @@ void *transRequest(void *threadarg) {
     printf("transRequest(): socket create error\n");
     pthread_exit(NULL);
   }
-  
+
   /* Send bytes of data with TRANS_REQUEST control message */
   send_data(sd, &(tdata->buffer->f), sizeof(fixed_data_t));
-  
+
   /* Send list of machines involved in the transaction */
   {
     int size=sizeof(unsigned int)*tdata->buffer->f.mcount;
@@ -694,6 +716,10 @@ void *transRequest(void *threadarg) {
       objheader_t * header;
       header = (objheader_t *) (((char *)newAddr) + offset);
       oidToPrefetch = OID(header);
+#ifdef CHECKTA
+      char a[] = "object type";
+      TABORT8(__func__, a, TYPE(header));
+#endif
       int size = 0;
       GETSIZE(size, header);
       size += sizeof(objheader_t);
@@ -709,7 +735,13 @@ void *transRequest(void *threadarg) {
       offset += size;
     }
   }
+
   recvcontrol = control;
+#ifdef CHECKTA
+  char a[] = "mid";
+  char c[] = "status";
+  TABORT5(__func__, a, c, tdata->mid, control);
+#endif
   /* Update common data structure and increment count */
   tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
   
@@ -727,6 +759,23 @@ void *transRequest(void *threadarg) {
     pthread_cond_wait(tdata->threshold, tdata->lock);
   }
   pthread_mutex_unlock(tdata->lock);
+
+  if(*(tdata->replyctrl) == TRANS_COMMIT) {
+    int retval;
+     /* Update prefetch cache */
+    if((retval = updatePrefetchCache(tdata)) != 0) {
+      printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+      return;
+    }
+
+    /* Invalidate objects in other machine cache */
+    if(tdata->buffer->f.nummod > 0) {
+      if((retval = invalidateObj(tdata)) != 0) {
+        printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
+        return;
+      }
+    }
+  }
   
   /* Send the final response such as TRANS_COMMIT or TRANS_ABORT 
    * to all participants in their respective socket */
@@ -785,19 +834,6 @@ void decideResponse(thread_data_array_t *tdata) {
     /* Send Commit */
     *(tdata->replyctrl) = TRANS_COMMIT;
     *(tdata->replyretry) = 0;
-    int retval;
-    if((retval = updatePrefetchCache(tdata)) != 0) {
-      printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
-      return;
-    }
-
-    /* Invalidate objects in other machine cache */
-    if(tdata->buffer->f.nummod > 0) {
-      if((retval = invalidateObj(tdata)) != 0) {
-        printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
-        return;
-      }
-    }
   } else { 
     /* Send Abort in soft abort case followed by retry commiting transaction again*/
     *(tdata->replyctrl) = TRANS_ABORT;
@@ -888,7 +924,7 @@ void *handleLocalReq(void *threadarg) {
   unsigned short version;
   void *mobj;
   objheader_t *headptr;
-  
+
   localtdata = (local_thread_data_array_t *) threadarg;
   
   /* Counters and arrays to formulate decision on control message to be sent */
@@ -929,6 +965,13 @@ void *handleLocalReq(void *threadarg) {
           v_nomatch++;
           /* Send TRANS_DISAGREE to Coordinator */
           localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+#ifdef CHECKTA
+  char a[] = "mid";
+  char b[] = "version mismatch";
+  char c[] = "object type";
+  TABORT7(__func__, b, a, c, localtdata->tdata->mid, TYPE(mobj));
+#endif
+          break;
         }
       } else {
        //we're locked
@@ -941,6 +984,13 @@ void *handleLocalReq(void *threadarg) {
           v_nomatch++;
           /* Send TRANS_DISAGREE to Coordinator */
           localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+#ifdef CHECKTA
+  char a[] = "mid";
+  char b[] = "version mismatch";
+  char c[] = "object type";
+  TABORT7(__func__, b, a, c, localtdata->tdata->mid, TYPE(mobj));
+#endif
+          break;
         }
       }
     }
@@ -951,6 +1001,12 @@ void *handleLocalReq(void *threadarg) {
   }
   /* Condition to send TRANS_SOFT_ABORT */
   if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) {
+#ifdef CHECKTA
+  char a[] = "mid";
+  char b[] = "version mismatch";
+  char c[] = "object type";
+  TABORT7(__func__, b, a, c, localtdata->tdata->mid, TYPE(mobj));
+#endif
     localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT;
   }
   
@@ -977,11 +1033,13 @@ void *handleLocalReq(void *threadarg) {
   if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
     if(transAbortProcess(localtdata) != 0) {
       printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
+      fflush(stdout);
       pthread_exit(NULL);
     }
   } else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT) {
     if(transComProcess(localtdata) != 0) {
       printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
+      fflush(stdout);
       pthread_exit(NULL);
     }
   }
@@ -992,7 +1050,6 @@ void *handleLocalReq(void *threadarg) {
   if (localtdata->transinfo->objnotfound != NULL) {
     free(localtdata->transinfo->objnotfound);
   }
-  
   pthread_exit(NULL);
 }
 
@@ -1305,7 +1362,7 @@ int getPrefetchResponse(int sd) {
     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
     /* TODO: For each object not found query DHT for new location and retrieve the object */
     /* Throw an error */
-    printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
+    //printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
     //    exit(-1);
   } else {
     printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
@@ -1435,6 +1492,10 @@ int processConfigFile()
 #else
        myIpAddr = getMyIpAddr("eth0");
 #endif
+
+#ifdef CHECKTA
+    printf("My ip address = %x", myIpAddr);
+#endif
        myIndexInHostArray = findHost(myIpAddr);
        if (myIndexInHostArray == -1)
        {
@@ -1643,8 +1704,8 @@ int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
     if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
       printf("notifyAll():error %d connecting to %s:%d\n", errno,
             inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
-      status = -1;
       fflush(stdout);
+      status = -1;
     } else {
       bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
       msg[0] = THREAD_NOTIFY_RESPONSE;