bug fixes
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
index b5527c2be573efd1ca69930b98d0e58a5fc1e57d..834175108e3be0240e926fd7840e0d0beb969e41 100644 (file)
@@ -68,7 +68,7 @@ void send_data(int fd , void *buf, int buflen) {
     numbytes = send(fd, buffer, size, MSG_NOSIGNAL);
     if (numbytes == -1) {
       perror("send");
-      exit(-1);
+      return;
     }
     buffer += numbytes;
     size -= numbytes;
@@ -83,7 +83,7 @@ void recv_data(int fd , void *buf, int buflen) {
     numbytes = recv(fd, buffer, size, 0);
     if (numbytes == -1) {
       perror("recv");
-      exit(-1);
+      return;
     }
     buffer += numbytes;
     size -= numbytes;
@@ -143,10 +143,13 @@ inline int findmax(int *array, int arraylength) {
 void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
   /* Allocate for the queue node*/
   int qnodesize = 2*sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
-  char * node= getmemory(qnodesize);
-  /* Set queue node values */
   int len;
+  char * node= getmemory(qnodesize);
   int top=endoffsets[ntuples-1];
+  
+  if (node==NULL) 
+    return;
+  /* Set queue node values */
 
   /* TODO: Remove this after testing */
   evalPrefetch[siteid].callcount++;
@@ -192,10 +195,10 @@ int dstmStartup(const char * option) {
   
   fd=startlistening();
   udpfd = udpInit();
+  pthread_attr_init(&attr);
+  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
   pthread_create(&udp_thread_Listen, &attr, udpListenBroadcast, (void*)udpfd);
   if (master) {
-    pthread_attr_init(&attr);
-    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
     pthread_create(&thread_Listen, &attr, dstmListen, (void*)fd);
     return 1;
   } else {
@@ -537,7 +540,6 @@ int transCommit(transrecord_t *record) {
       thread_data_array[threadnum].replyctrl = &treplyctrl;
       thread_data_array[threadnum].replyretry = &treplyretry;
       thread_data_array[threadnum].rec = record;
-      thread_data_array[threadnum].pilehead = pile_ptr;
       /* If local do not create any extra connection */
       if(pile->mid != myIpAddr) { /* Not local */
        do {
@@ -691,7 +693,10 @@ void *transRequest(void *threadarg) {
   /* Send objects that are modified */
   for(i = 0; i < tdata->buffer->f.nummod ; i++) {
     int size;
-    headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
+    if((headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i])) == NULL) {
+      printf("%s() Error: No such oid %s, %d\n", __func__, __FILE__, __LINE__);
+      pthread_exit(NULL);
+    }
     GETSIZE(size,headeraddr);
     size+=sizeof(objheader_t);
     send_data(sd, headeraddr, size);
@@ -761,8 +766,15 @@ void *transRequest(void *threadarg) {
   }
   pthread_mutex_unlock(tdata->lock);
 
-  /* Invalidate objects in other machine cache */
   if(*(tdata->replyctrl) == TRANS_COMMIT) {
+    int retval;
+     /* Update prefetch cache */
+    if((retval = updatePrefetchCache(tdata)) != 0) {
+      printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+      return;
+    }
+
+    /* Invalidate objects in other machine cache */
     if(tdata->buffer->f.nummod > 0) {
       if((retval = invalidateObj(tdata)) != 0) {
         printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
@@ -771,7 +783,6 @@ void *transRequest(void *threadarg) {
     }
   }
   
-
   /* Send the final response such as TRANS_COMMIT or TRANS_ABORT 
    * to all participants in their respective socket */
   if (sendResponse(tdata, sd) == 0) { 
@@ -788,7 +799,6 @@ void *transRequest(void *threadarg) {
   } else {
     //printf("DEBUG-> Error: Incorrect Transaction End Message %d\n", control);
   }
-
   pthread_exit(NULL);
 }
 
@@ -830,12 +840,6 @@ void decideResponse(thread_data_array_t *tdata) {
     /* Send Commit */
     *(tdata->replyctrl) = TRANS_COMMIT;
     *(tdata->replyretry) = 0;
-    int retval;
-    /* Update prefetch cache */
-    if((retval = updatePrefetchCache(tdata)) != 0) {
-      printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
-      return;
-    }
   } else { 
     /* Send Abort in soft abort case followed by retry commiting transaction again*/
     *(tdata->replyctrl) = TRANS_ABORT;
@@ -926,7 +930,7 @@ void *handleLocalReq(void *threadarg) {
   unsigned short version;
   void *mobj;
   objheader_t *headptr;
-  
+
   localtdata = (local_thread_data_array_t *) threadarg;
   
   /* Counters and arrays to formulate decision on control message to be sent */
@@ -945,8 +949,8 @@ void *handleLocalReq(void *threadarg) {
       int tmpsize;
       headptr = (objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i-numread]);
       if (headptr == NULL) {
-       printf("Error: handleLocalReq() returning NULL %s, %d\n", __FILE__, __LINE__);
-       return NULL;
+        printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__);
+        return NULL;
       }
       oid = OID(headptr);
       version = headptr->version;
@@ -1035,11 +1039,13 @@ void *handleLocalReq(void *threadarg) {
   if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
     if(transAbortProcess(localtdata) != 0) {
       printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
+      fflush(stdout);
       pthread_exit(NULL);
     }
   } else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT) {
     if(transComProcess(localtdata) != 0) {
       printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
+      fflush(stdout);
       pthread_exit(NULL);
     }
   }
@@ -1050,7 +1056,6 @@ void *handleLocalReq(void *threadarg) {
   if (localtdata->transinfo->objnotfound != NULL) {
     free(localtdata->transinfo->objnotfound);
   }
-  
   pthread_exit(NULL);
 }
 
@@ -1363,7 +1368,7 @@ int getPrefetchResponse(int sd) {
     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
     /* TODO: For each object not found query DHT for new location and retrieve the object */
     /* Throw an error */
-    printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
+    //printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
     //    exit(-1);
   } else {
     printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
@@ -1376,18 +1381,39 @@ unsigned short getObjType(unsigned int oid) {
   objheader_t *objheader;
   unsigned short numoffset[] ={0};
   short fieldoffset[] ={};
-  
+
   if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL) {
-      if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
-       prefetch(0, 1, &oid, numoffset, fieldoffset);
-       pthread_mutex_lock(&pflookup.lock);
-       while ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
-         pthread_cond_wait(&pflookup.cond, &pflookup.lock);
-       }
-       pthread_mutex_unlock(&pflookup.lock);
+    if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
+      unsigned int mid = lhashSearch(oid);
+      int sd = getSock2(transReadSockPool, mid);
+      char remotereadrequest[sizeof(char)+sizeof(unsigned int)];
+      remotereadrequest[0] = READ_REQUEST;
+      *((unsigned int *)(&remotereadrequest[1])) = oid;
+      send_data(sd, remotereadrequest, sizeof(remotereadrequest));
+
+      /* Read response from the Participant */
+      char control;
+      recv_data(sd, &control, sizeof(char));
+
+      if (control==OBJECT_NOT_FOUND) {
+        printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
+        fflush(stdout);
+        exit(-1);
+      } else {
+        /* Read object if found into local cache */
+        int size;
+        recv_data(sd, &size, sizeof(int));
+        pthread_mutex_lock(&prefetchcache_mutex);
+        if ((objheader = prefetchobjstrAlloc(size)) == NULL) {
+          printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+          pthread_exit(NULL);
+        }
+        pthread_mutex_unlock(&prefetchcache_mutex);
+        recv_data(sd, objheader, size);
+        prehashInsert(oid, objheader);
       }
+    }
   }
-  
   return TYPE(objheader);
 }
 
@@ -1493,10 +1519,6 @@ int processConfigFile()
 #else
        myIpAddr = getMyIpAddr("eth0");
 #endif
-
-#ifdef CHECKTA
-    printf("My ip address = %x", myIpAddr);
-#endif
        myIndexInHostArray = findHost(myIpAddr);
        if (myIndexInHostArray == -1)
        {
@@ -1667,7 +1689,7 @@ void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
                        return;
                } else {
                        if(version <= ndata->versionarry[index]){
-                               printf("threadNotify(): New version %d has not changed since last version %s, %d\n", version, __FILE__, __LINE__);
+                               printf("threadNotify(): New version %d has not changed since last version for oid = %d, %s, %d\n", version, oid, __FILE__, __LINE__);
                                return;
                        } else {
                                /* Clear from prefetch cache and free thread related data structure */
@@ -1705,8 +1727,8 @@ int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
     if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
       printf("notifyAll():error %d connecting to %s:%d\n", errno,
             inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
-      status = -1;
       fflush(stdout);
+      status = -1;
     } else {
       bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
       msg[0] = THREAD_NOTIFY_RESPONSE;