start of new file
[IRC.git] / Robust / src / Runtime / DSTM / interface / dstmserver.c
index e1ec8c926cd2620ab51a761e2c2574061bd6907c..0928321c310245151796b6cbe686c879d8c0a515 100644 (file)
@@ -143,35 +143,34 @@ void *dstmAccept(void *acceptfd) {
       /* Read oid requested and search if available */
       recv_data((int)acceptfd, &oid, sizeof(unsigned int));
       if((srcObj = mhashSearch(oid)) == NULL) {
-       printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__);
-       break;
+        printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__);
+        break;
       }
       h = (objheader_t *) srcObj;
       GETSIZE(size, h);
       size += sizeof(objheader_t);
       sockid = (int) acceptfd;
-      
       if (h == NULL) {
-       ctrl = OBJECT_NOT_FOUND;
-       send_data(sockid, &ctrl, sizeof(char));
+        ctrl = OBJECT_NOT_FOUND;
+        send_data(sockid, &ctrl, sizeof(char));
       } else {
-       /* Type */
-       char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
-       *((int *)&msg[1])=size;
-       send_data(sockid, &msg, sizeof(msg));
-       send_data(sockid, h, size);
+        // Type 
+        char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
+        *((int *)&msg[1])=size;
+        send_data(sockid, &msg, sizeof(msg));
+        send_data(sockid, h, size);
       }
       break;
-      
+
     case READ_MULT_REQUEST:
       break;
-      
+
     case MOVE_REQUEST:
       break;
-      
+
     case MOVE_MULT_REQUEST:
       break;
-      
+
     case TRANS_REQUEST:
       /* Read transaction request */
       transinfo.objlocked = NULL;
@@ -180,20 +179,20 @@ void *dstmAccept(void *acceptfd) {
       transinfo.numlocked = 0;
       transinfo.numnotfound = 0;
       if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
-       printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
-       pthread_exit(NULL);
+        printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
+        pthread_exit(NULL);
       }
       break;
     case TRANS_PREFETCH:
       if((val = prefetchReq((int)acceptfd)) != 0) {
-       printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
-       break;
+        printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
+        break;
       }
       break;
     case TRANS_PREFETCH_RESPONSE:
       if((val = getPrefetchResponse((int) acceptfd)) != 0) {
-       printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
-       break;
+        printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
+        break;
       }
       break;
     case START_REMOTE_THREAD:
@@ -201,17 +200,17 @@ void *dstmAccept(void *acceptfd) {
       objType = getObjType(oid);
       startDSMthread(oid, objType);
       break;
-      
+
     case THREAD_NOTIFY_REQUEST:
       recv_data((int)acceptfd, &numoid, sizeof(unsigned int));
       size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
       if((buffer = calloc(1,size)) == NULL) {
-       printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
-       pthread_exit(NULL);
+        printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
+        pthread_exit(NULL);
       }
-      
+
       recv_data((int)acceptfd, buffer, size);
-      
+
       oidarry = calloc(numoid, sizeof(unsigned int)); 
       memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
       size = sizeof(unsigned int) * numoid;
@@ -223,18 +222,18 @@ void *dstmAccept(void *acceptfd) {
       threadid = *((unsigned int *)(buffer+size));
       processReqNotify(numoid, oidarry, versionarry, mid, threadid);
       free(buffer);
-      
+
       break;
 
     case THREAD_NOTIFY_RESPONSE:
       size = sizeof(unsigned short) + 2 * sizeof(unsigned int);
       if((buffer = calloc(1,size)) == NULL) {
-       printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
-       pthread_exit(NULL);
+        printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
+        pthread_exit(NULL);
       }
-      
+
       recv_data((int)acceptfd, buffer, size);
-      
+
       oid = *((unsigned int *)buffer);
       size = sizeof(unsigned int);
       version = *((unsigned short *)(buffer+size));
@@ -252,56 +251,56 @@ void *dstmAccept(void *acceptfd) {
     }
   }
 
- closeconnection:
+closeconnection:
   /* Close connection */
   if (close((int)acceptfd) == -1)
     perror("close");
   pthread_exit(NULL);
 }
-  
+
 /* This function reads the information available in a transaction request
  * and makes a function call to process the request */
 int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
-       char *ptr;
-       void *modptr;
-       unsigned int *oidmod, oid;
-       fixed_data_t fixed;
-       objheader_t *headaddr;
-       int sum, i, size, n, val;
-
-       oidmod = NULL;
-
-       /* Read fixed_data_t data structure */ 
-       size = sizeof(fixed) - 1;
-       ptr = (char *)&fixed;;
-       fixed.control = TRANS_REQUEST;
-       recv_data((int)acceptfd, ptr+1, size);
-
-       /* Read list of mids */
-       int mcount = fixed.mcount;
-       size = mcount * sizeof(unsigned int);
-       unsigned int listmid[mcount];
-       ptr = (char *) listmid;
-       recv_data((int)acceptfd, ptr, size);
-       
-       /* Read oid and version tuples for those objects that are not modified in the transaction */
-       int numread = fixed.numread;
-       size = numread * (sizeof(unsigned int) + sizeof(unsigned short));
-       char objread[size];
-       if(numread != 0) { //If pile contains more than one object to be read, 
-                         // keep reading all objects
-               recv_data((int)acceptfd, objread, size);        
-       }
-       
-       /* Read modified objects */
-       if(fixed.nummod != 0) {
-               if ((modptr = calloc(1, fixed.sum_bytes)) == NULL) {
-                       printf("calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
-                       return 1;
-               }
-               size = fixed.sum_bytes;
-               recv_data((int)acceptfd, modptr, size); 
-       }
+  char *ptr;
+  void *modptr;
+  unsigned int *oidmod, oid;
+  fixed_data_t fixed;
+  objheader_t *headaddr;
+  int sum, i, size, n, val;
+
+  oidmod = NULL;
+
+  /* Read fixed_data_t data structure */ 
+  size = sizeof(fixed) - 1;
+  ptr = (char *)&fixed;;
+  fixed.control = TRANS_REQUEST;
+  recv_data((int)acceptfd, ptr+1, size);
+
+  /* Read list of mids */
+  int mcount = fixed.mcount;
+  size = mcount * sizeof(unsigned int);
+  unsigned int listmid[mcount];
+  ptr = (char *) listmid;
+  recv_data((int)acceptfd, ptr, size);
+
+  /* Read oid and version tuples for those objects that are not modified in the transaction */
+  int numread = fixed.numread;
+  size = numread * (sizeof(unsigned int) + sizeof(unsigned short));
+  char objread[size];
+  if(numread != 0) { //If pile contains more than one object to be read, 
+    // keep reading all objects
+    recv_data((int)acceptfd, objread, size);   
+  }
+
+  /* Read modified objects */
+  if(fixed.nummod != 0) {
+    if ((modptr = calloc(1, fixed.sum_bytes)) == NULL) {
+      printf("calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
+      return 1;
+    }
+    size = fixed.sum_bytes;
+    recv_data((int)acceptfd, modptr, size);    
+  }
 
        /* Create an array of oids for modified objects */
        oidmod = (unsigned int *) calloc(fixed.nummod, sizeof(unsigned int));
@@ -405,7 +404,6 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
        if (transinfo->objnotfound != NULL) {
                free(transinfo->objnotfound);
        }
-
        return 0;
 }
 
@@ -416,16 +414,17 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
        unsigned short version;
        char control = 0, *ptr;
        unsigned int oid;
-       unsigned int *oidnotfound, *oidlocked;
+       unsigned int *oidnotfound, *oidlocked, *oidvernotmatch;
        void *mobj;
        objheader_t *headptr;
 
        /* Counters and arrays to formulate decision on control message to be sent */
        oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
        oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
-       int objnotfound = 0, objlocked = 0;
+       oidvernotmatch = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
+       int objnotfound = 0, objlocked = 0, objvernotmatch = 0;
        int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
-
+    int numBytes = 0;
        /* modptr points to the beginning of the object store 
         * created at the Pariticipant. 
         * Object store holds the modified objects involved in the transaction request */ 
@@ -462,20 +461,20 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
               v_matchlock++;
             } else {/* If versions don't match ...HARD ABORT */
               v_nomatch++;
+              oidvernotmatch[objvernotmatch] = oid;
+              objvernotmatch++;
+              int size;
+              GETSIZE(size, mobj);
+              size += sizeof(objheader_t);
+              numBytes += size;
               /* Send TRANS_DISAGREE to Coordinator */
               control = TRANS_DISAGREE;
-              if (objlocked > 0) {
-                for(j = 0; j < objlocked; j++) {
-                  if((headptr = mhashSearch(oidlocked[j])) == NULL) {
-                    printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
-                    return 0;
-                  }
-                 UnLock(STATUSPTR(headptr));
-                }
-                free(oidlocked);
-              }
-              send_data(acceptfd, &control, sizeof(char));
-              return control;
+#ifdef CHECKTA
+  char b[] = "version mismatch";
+  char c[] = "object type";
+  TABORT3(__func__, b, c, TYPE(mobj));
+#endif
+
             }
           } else {/* If Obj is not locked then lock object */
             /* Save all object oids that are locked on this machine during this transaction request call */
@@ -485,26 +484,56 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
               v_matchnolock++;
             } else { /* If versions don't match ...HARD ABORT */
               v_nomatch++;
+              oidvernotmatch[objvernotmatch] = oid;
+              objvernotmatch++;
+              int size;
+              GETSIZE(size, mobj);
+              size += sizeof(objheader_t);
+              numBytes += size;
               control = TRANS_DISAGREE;
-              if (objlocked > 0) {
-                for(j = 0; j < objlocked; j++) {
-                  if((headptr = mhashSearch(oidlocked[j])) == NULL) {
-                    printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
-                    return 0;
-                  }
-                 UnLock(STATUSPTR(headptr));
-                }
-                free(oidlocked);
-              }
 
-              /* Send TRANS_DISAGREE to Coordinator */
-              send_data(acceptfd, &control, sizeof(char));
-              return control;
+#ifdef CHECKTA
+  char b[] = "version mismatch";
+  char c[] = "object type";
+  TABORT3(__func__, b, c, TYPE(mobj));
+#endif
             }
           }
         }
        }
        
+       /* send TRANS_DISAGREE and objs*/
+    if(v_nomatch > 0) {
+      char *objs = calloc(1, numBytes);
+      int j, offset = 0;
+      for(j = 0; j<objvernotmatch; j++) {
+        objheader_t *header = mhashSearch(oidvernotmatch[j]);
+        int size = 0;
+        GETSIZE(size, header);
+        size += sizeof(objheader_t);
+        memcpy(objs+offset, header, size);
+        offset += size;
+      }
+      if (objlocked > 0) {
+        for(j = 0; j < objlocked; j++) {
+          if((headptr = mhashSearch(oidlocked[j])) == NULL) {
+            printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+            return 0;
+          }
+          UnLock(STATUSPTR(headptr));
+        }
+        free(oidlocked);
+      }
+      send_data(acceptfd, &control, sizeof(char));
+      send_data(acceptfd, &numBytes, sizeof(int));
+      send_data(acceptfd, objs, numBytes);
+      transinfo->objvernotmatch = oidvernotmatch;
+      transinfo->numvernotmatch = objvernotmatch;
+      free(objs);
+      free(transinfo->objvernotmatch);
+      return control;
+    }
+
        /* Decide what control message to send to Coordinator */
        if ((control = decideCtrlMessage(fixed, transinfo, &v_matchnolock, &v_matchlock, &v_nomatch, &objnotfound, &objlocked,
                                        modptr, oidnotfound, oidlocked, acceptfd)) == 0) {
@@ -513,7 +542,6 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
        }
        
        return control;
-
 }
 /* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT
  * to send to Coordinator based on the votes of oids involved in the transaction */
@@ -536,7 +564,7 @@ char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int
                /* Send control message */
                send_data(acceptfd, &control, sizeof(char));
        
-               /* Send number of oids not found and the missing oids if objects are missing in the machine */
+               /*  FIXME how to send objs Send number of oids not found and the missing oids if objects are missing in the machine */
                if(*(objnotfound) != 0) { 
                        int msg[1];
                        msg[0] = *(objnotfound);