check LOCK versions
authoradash <adash>
Fri, 2 May 2008 21:46:47 +0000 (21:46 +0000)
committeradash <adash>
Fri, 2 May 2008 21:46:47 +0000 (21:46 +0000)
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/trans.c

index 505fd3489c141320ed00a3ce60554dc59276c741..ddacde978c9f36b67413a6e60c111962b25860d9 100644 (file)
@@ -18,6 +18,7 @@ extern pthread_mutex_t notifymutex;
 
 objstr_t *mainobjstore;
 pthread_mutex_t mainobjstore_mutex;
+pthread_mutex_t lockObjHeader;
 pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a recursive lock */
 
 sockPoolHashTable_t *transPResponseSocketPool;
@@ -32,6 +33,7 @@ int dstmInit(void)
        pthread_mutexattr_init(&mainobjstore_mutex_attr);
        pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
        pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr);
+       pthread_mutex_init(&lockObjHeader,NULL);
        if (mhashCreate(HASH_SIZE, LOADFACTOR))
                return 1; //failure
        
@@ -434,66 +436,69 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
                  int tmpsize;
                  headptr = (objheader_t *) ptr;
                  oid = OID(headptr);
-                 version = headptr->version;
-                 GETSIZE(tmpsize, headptr);
-                 ptr += sizeof(objheader_t) + tmpsize;
-               }
-               
-               /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
-
-               if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
-                       /* Save the oids not found and number of oids not found for later use */
-                       oidnotfound[objnotfound] = oid;
-                       objnotfound++;
-               } else { /* If Obj found in machine (i.e. has not moved) */
-                       /* Check if Obj is locked by any previous transaction */
-                       if ((STATUS(((objheader_t *)mobj)) & LOCK) == LOCK) {           
-                               if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */
-                                       v_matchlock++;
-                               } else {/* If versions don't match ...HARD ABORT */
-                                       v_nomatch++;
-                                       /* Send TRANS_DISAGREE to Coordinator */
-                                       control = TRANS_DISAGREE;
-                                       if (objlocked > 0) {
-                                         for(j = 0; j < objlocked; j++) {
-                                                       if((headptr = mhashSearch(oidlocked[j])) == NULL) {
-                                                               printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
-                                                               return 0;
-                                                       }
-                                                       STATUS(headptr) &= ~(LOCK);
-                                               }
-                                               free(oidlocked);
-                                       }
-                                       send_data(acceptfd, &control, sizeof(char));
-                                       return control;
-                               }
-                       } else {/* If Obj is not locked then lock object */
-                               STATUS(((objheader_t *)mobj)) |= LOCK;
-                               /* Save all object oids that are locked on this machine during this transaction request call */
-                               oidlocked[objlocked] = OID(((objheader_t *)mobj));
-                               objlocked++;
-                               if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
-                                       v_matchnolock++;
-                               } else { /* If versions don't match ...HARD ABORT */
-                                       v_nomatch++;
-                                       control = TRANS_DISAGREE;
-                                       if (objlocked > 0) {
-                                               for(j = 0; j < objlocked; j++) {
-                                                       if((headptr = mhashSearch(oidlocked[j])) == NULL) {
-                                                               printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
-                                                               return 0;
-                                                       }
-                                                       STATUS(headptr) &= ~(LOCK);
-                                               }
-                                               free(oidlocked);
-                                       }
-
-                                       /* Send TRANS_DISAGREE to Coordinator */
-                                       send_data(acceptfd, &control, sizeof(char));
-                                       return control;
-                               }
-                       }
-               }
+          version = headptr->version;
+          GETSIZE(tmpsize, headptr);
+          ptr += sizeof(objheader_t) + tmpsize;
+        }
+
+        /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
+
+        if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
+          /* Save the oids not found and number of oids not found for later use */
+          oidnotfound[objnotfound] = oid;
+          objnotfound++;
+        } else { /* If Obj found in machine (i.e. has not moved) */
+          /* Check if Obj is locked by any previous transaction */
+          pthread_mutex_lock(&lockObjHeader);
+          if ((STATUS(((objheader_t *)mobj)) & LOCK) == LOCK) {                
+            pthread_mutex_unlock(&lockObjHeader);
+            if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */
+              v_matchlock++;
+            } else {/* If versions don't match ...HARD ABORT */
+              v_nomatch++;
+              /* Send TRANS_DISAGREE to Coordinator */
+              control = TRANS_DISAGREE;
+              if (objlocked > 0) {
+                for(j = 0; j < objlocked; j++) {
+                  if((headptr = mhashSearch(oidlocked[j])) == NULL) {
+                    printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+                    return 0;
+                  }
+                  STATUS(headptr) &= ~(LOCK);
+                }
+                free(oidlocked);
+              }
+              send_data(acceptfd, &control, sizeof(char));
+              return control;
+            }
+          } else {/* If Obj is not locked then lock object */
+            STATUS(((objheader_t *)mobj)) |= LOCK;
+            pthread_mutex_unlock(&lockObjHeader);
+            /* Save all object oids that are locked on this machine during this transaction request call */
+            oidlocked[objlocked] = OID(((objheader_t *)mobj));
+            objlocked++;
+            if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
+              v_matchnolock++;
+            } else { /* If versions don't match ...HARD ABORT */
+              v_nomatch++;
+              control = TRANS_DISAGREE;
+              if (objlocked > 0) {
+                for(j = 0; j < objlocked; j++) {
+                  if((headptr = mhashSearch(oidlocked[j])) == NULL) {
+                    printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+                    return 0;
+                  }
+                  STATUS(headptr) &= ~(LOCK);
+                }
+                free(oidlocked);
+              }
+
+              /* Send TRANS_DISAGREE to Coordinator */
+              send_data(acceptfd, &control, sizeof(char));
+              return control;
+            }
+          }
+        }
        }
        
        /* Decide what control message to send to Coordinator */
@@ -565,7 +570,6 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock
                        return 1;
                }
                GETSIZE(tmpsize,header);
-               pthread_mutex_lock(&mainobjstore_mutex);
                memcpy((char*)header + sizeof(objheader_t), ((char *)modptr + sizeof(objheader_t) + offset), tmpsize);
                header->version += 1; 
                /* If threads are waiting on this object to be updated, notify them */
@@ -574,7 +578,6 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock
                        notifyAll(&header->notifylist, OID(header), header->version);
                }
         pthread_mutex_unlock(&notifymutex);
-               pthread_mutex_unlock(&mainobjstore_mutex);
                offset += sizeof(objheader_t) + tmpsize;
        }
 
index bbd87742e025ab01e3bb2422ca597d33c03e1b2a..4141f2227a9f5a1e8485dfedf11513892bf7ec87 100644 (file)
@@ -39,6 +39,7 @@ unsigned int oidMax;
 sockPoolHashTable_t *transReadSockPool;
 sockPoolHashTable_t *transPrefetchSockPool;
 pthread_mutex_t notifymutex;
+pthread_mutex_t atomicObjLock;
 
 void printhex(unsigned char *, int);
 plistnode_t *createPiles(transrecord_t *);
@@ -217,6 +218,7 @@ void transInit() {
   pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr);
   
   pthread_mutex_init(&notifymutex, NULL);
+  pthread_mutex_init(&atomicObjLock, NULL);
   //Create prefetch cache lookup table
   if(prehashCreate(HASH_SIZE, LOADFACTOR)) {
     printf("ERROR\n");
@@ -544,7 +546,7 @@ int transCommit(transrecord_t *record) {
        }
       free(thread_data_array[i].buffer);
     }
-    
+
     /* Free resources */       
     pthread_cond_destroy(&tcond);
     pthread_mutex_destroy(&tlock);
@@ -561,7 +563,6 @@ int transCommit(transrecord_t *record) {
     /* Retry trans commit procedure during soft_abort case */
   } while (treplyretry);
   
-  
   if(treplyctrl == TRANS_ABORT) {
     /* Free Resources */
     objstrDelete(record->cache);
@@ -854,26 +855,29 @@ void *handleLocalReq(void *threadarg) {
       objnotfound++;
     } else { /* If Obj found in machine (i.e. has not moved) */
       /* Check if Obj is locked by any previous transaction */
+      pthread_mutex_lock(&atomicObjLock);
       if ((STATUS((objheader_t *)mobj) & LOCK) == LOCK) {
-       if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */ 
-         v_matchlock++;
-       } else {/* If versions don't match ...HARD ABORT */
-         v_nomatch++;
-         /* Send TRANS_DISAGREE to Coordinator */
-         localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
-       }
+        pthread_mutex_unlock(&atomicObjLock);
+        if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */ 
+          v_matchlock++;
+        } else {/* If versions don't match ...HARD ABORT */
+          v_nomatch++;
+          /* Send TRANS_DISAGREE to Coordinator */
+          localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+        }
       } else {/* If Obj is not locked then lock object */
-       STATUS(((objheader_t *)mobj)) |= LOCK;
-       /* Save all object oids that are locked on this machine during this transaction request call */
-       oidlocked[objlocked] = OID(((objheader_t *)mobj));
-       objlocked++;
-       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
-         v_matchnolock++;
-       } else { /* If versions don't match ...HARD ABORT */
-         v_nomatch++;
-         /* Send TRANS_DISAGREE to Coordinator */
-         localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
-       }
+        STATUS(((objheader_t *)mobj)) |= LOCK;
+        pthread_mutex_unlock(&atomicObjLock);
+        /* Save all object oids that are locked on this machine during this transaction request call */
+        oidlocked[objlocked] = OID(((objheader_t *)mobj));
+        objlocked++;
+        if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
+          v_matchnolock++;
+        } else { /* If versions don't match ...HARD ABORT */
+          v_nomatch++;
+          /* Send TRANS_DISAGREE to Coordinator */
+          localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+        }
       }
     }
   } // End for
@@ -1509,7 +1513,6 @@ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int n
                *((unsigned int *)(&msg[1] + size)) = myIpAddr;
                size += sizeof(unsigned int);
                *((unsigned int *)(&msg[1] + size)) = threadid;
-
                pthread_mutex_lock(&(ndata->threadnotify));
                size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
                send_data(sock, msg, size);