New locking strategy... We no longer read lock objects... Instead:
authorbdemsky <bdemsky>
Fri, 17 Apr 2009 22:53:48 +0000 (22:53 +0000)
committerbdemsky <bdemsky>
Fri, 17 Apr 2009 22:53:48 +0000 (22:53 +0000)
We first acquire all write locks.
We next for read object, we check that (1) it isn't locked and the version number matches.
The serialization point for the transaction occurs between the two checks...

Robust/src/Runtime/STM/stm.c
Robust/src/Runtime/STM/tm.h

index a891ee3b1c535bda1a550d795fbe959068bb0058..b00c3602599703789baa47713ec33041845c0690 100644 (file)
@@ -282,17 +282,21 @@ int traverseCache() {
   int numoidrdlocked=0;
   int numoidwrlocked=0;
   void * rdlocked[200];
+  int rdversion[200];
   void * wrlocked[200];
   int softabort=0;
   int i;
   void ** oidrdlocked;
   void ** oidwrlocked;
+  int * oidrdversion;
   if (c_numelements<200) {
     oidrdlocked=rdlocked;
+    oidrdversion=rdversion;
     oidwrlocked=wrlocked;
   } else {
     int size=c_numelements*sizeof(void*);
     oidrdlocked=malloc(size);
+    oidrdversion=malloc(size);
     oidwrlocked=malloc(size);
   }
   chashlistnode_t *ptr = c_table;
@@ -306,9 +310,8 @@ int traverseCache() {
       if(curr->key == NULL)
         break;
       objheader_t * headeraddr=&((objheader_t *) curr->val)[-1];
-      
-      unsigned int version = headeraddr->version;
       objheader_t *header=(objheader_t *) (((char *)curr->key)-sizeof(objheader_t));
+      unsigned int version = headeraddr->version;
       
       if(STATUS(headeraddr) & DIRTY) {
        /* Read from the main heap  and compare versions */
@@ -318,8 +321,13 @@ int traverseCache() {
            oidwrlocked[numoidwrlocked++] = OID(header);
          } else { 
            oidwrlocked[numoidwrlocked++] = OID(header);
-           transAbortProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
+           transAbortProcess(oidwrlocked, numoidwrlocked);
            DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
+           if (c_numelements>=200) {
+             free(oidrdlocked);
+             free(oidrdversion);
+             free(oidwrlocked);
+           }
            return TRANS_ABORT;
          }
        } else { /* cannot aquire lock */
@@ -327,45 +335,77 @@ int traverseCache() {
            /* versions match */
            softabort=1;
          } else {
-           transAbortProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
+           transAbortProcess(oidwrlocked, numoidwrlocked);
            DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
+           if (c_numelements>=200) {
+             free(oidrdlocked);
+             free(oidrdversion);
+             free(oidwrlocked);
+           }
            return TRANS_ABORT;
          }
        }
       } else {
-       /* Read from the main heap  and compare versions */
-       if(read_trylock(&header->lock)) { //can further acquire read locks
-         if(version == header->version) {/* versions match */
-           oidrdlocked[numoidrdlocked++] = OID(header);
-         } else {
-           oidrdlocked[numoidrdlocked++] = OID(header);
-           transAbortProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
-           DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
-           return TRANS_ABORT;
-         }
-       } else { /* cannot aquire lock */
-         if(version == header->version) {
-           softabort=1;
-         } else {
-           transAbortProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
-           DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
-           return TRANS_ABORT;
-         }
-       }
+       oidrdversion[numoidrdlocked]=version;
+       oidrdlocked[numoidrdlocked++] = header;
       }
-    
       curr = curr->next;
     }
   } //end of for
+
+  //THIS IS THE SERIALIZATION POINT *****
+
+  for(i=0;i<numoidrdlocked;i++) {
+    /* Read from the main heap  and compare versions */
+    objheader_t *header=oidrdlocked[i];
+    unsigned int version=oidrdversion[i];
+    if(header->lock>0) { //not write locked
+      if(version != header->version) {/* versions do not match */
+       oidrdlocked[numoidrdlocked++] = OID(header);
+       transAbortProcess(oidwrlocked, numoidwrlocked);
+       DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
+       if (c_numelements>=200) {
+         free(oidrdlocked);
+         free(oidrdversion);
+         free(oidwrlocked);
+       }
+       return TRANS_ABORT;
+      }
+    } else { /* cannot aquire lock */
+      //do increment as we didn't get lock
+      if(version == header->version) {
+       softabort=1;
+      } else {
+       transAbortProcess(oidwrlocked, numoidwrlocked);
+       DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
+       if (c_numelements>=200) {
+         free(oidrdlocked);
+         free(oidrdversion);
+         free(oidwrlocked);
+       }
+       return TRANS_ABORT;
+      }
+    }
+  }
   
   /* Decide the final response */
   if (softabort) {
-    transAbortProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
+    transAbortProcess(oidwrlocked, numoidwrlocked);
     DEBUGSTM("Soft Abort: rd: %u wr: %u tot: %u\n", numoidrdlocked, numoidwrlocked, c_numelements);
+    if (c_numelements>=200) {
+      free(oidrdlocked);
+      free(oidrdversion);
+      free(oidwrlocked);
+    }
     return TRANS_SOFT_ABORT;
   } else {
-    transCommitProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
+    transCommitProcess(oidwrlocked, numoidwrlocked);
     DEBUGSTM("Commit: rd: %u wr: %u tot: %u\n", numoidrdlocked, numoidwrlocked, c_numelements);
+    if (c_numelements>=200) {
+      free(oidrdlocked);
+      free(oidrdversion);
+      free(oidwrlocked);
+    }
     return TRANS_COMMIT;
   }
 }
@@ -381,17 +421,21 @@ int alttraverseCache() {
   int numoidrdlocked=0;
   int numoidwrlocked=0;
   void * rdlocked[200];
+  int rdversion[200];
   void * wrlocked[200];
   int softabort=0;
   int i;
   void ** oidrdlocked;
+  int * oidrdversion;
   void ** oidwrlocked;
   if (c_numelements<200) {
     oidrdlocked=rdlocked;
+    oidrdversion=rdversion;
     oidwrlocked=wrlocked;
   } else {
     int size=c_numelements*sizeof(void*);
     oidrdlocked=malloc(size);
+    oidrdversion=malloc(size);
     oidwrlocked=malloc(size);
   }
   chashlistnode_t *curr = c_list;
@@ -399,9 +443,8 @@ int alttraverseCache() {
   while(curr != NULL) {
     //if the first bin in hash table is empty
     objheader_t * headeraddr=&((objheader_t *) curr->val)[-1];
-    
-    unsigned int version = headeraddr->version;
     objheader_t *header=(objheader_t *) (((char *)curr->key)-sizeof(objheader_t));
+    unsigned int version = headeraddr->version;
     
     if(STATUS(headeraddr) & DIRTY) {
       /* Read from the main heap  and compare versions */
@@ -411,8 +454,13 @@ int alttraverseCache() {
          oidwrlocked[numoidwrlocked++] = OID(header);
        } else { 
          oidwrlocked[numoidwrlocked++] = OID(header);
-         transAbortProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
+         transAbortProcess(oidwrlocked, numoidwrlocked);
          DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
+         if (c_numelements>=200) {
+           free(oidrdlocked);
+           free(oidrdversion);
+           free(oidwrlocked);
+         }
          return TRANS_ABORT;
        }
       } else { /* cannot aquire lock */
@@ -420,44 +468,72 @@ int alttraverseCache() {
          /* versions match */
          softabort=1;
        } else {
-         transAbortProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
+         transAbortProcess(oidwrlocked, numoidwrlocked);
          DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
+         if (c_numelements>=200) {
+           free(oidrdlocked);
+           free(oidrdversion);
+           free(oidwrlocked);
+         }
          return TRANS_ABORT;
        }
       }
     } else {
       /* Read from the main heap  and compare versions */
-      if(read_trylock(&header->lock)) { //can further aquire read locks
-       if(version == header->version) {/* versions match */
-         oidrdlocked[numoidrdlocked++] = OID(header);
-       } else {
-         oidrdlocked[numoidrdlocked++] = OID(header);
-         transAbortProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
-         DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
-         return TRANS_ABORT;
+      oidrdversion[numoidrdlocked]=version;
+      oidrdlocked[numoidrdlocked++] = header;
+    }
+    curr = curr->lnext;
+  }
+  //THIS IS THE SERIALIZATION POINT *****
+  for(i=0;i<numoidrdlocked;i++) {
+    objheader_t * header = oidrdlocked[i];
+    unsigned int version=oidrdversion[i];
+    if(header->lock>=0) {
+      if(version != header->version) {
+       transAbortProcess(oidwrlocked, numoidwrlocked);
+       DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
+       if (c_numelements>=200) {
+         free(oidrdlocked);
+         free(oidrdversion);
+         free(oidwrlocked);
        }
-      } else { /* cannot aquire lock */
-       if(version == header->version) {
-         softabort=1;
-       } else {
-         transAbortProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
-         DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
-         return TRANS_ABORT;
+       return TRANS_ABORT;
+      }
+    } else { /* cannot aquire lock */
+      if(version == header->version) {
+       softabort=1;
+      } else {
+       transAbortProcess(oidwrlocked, numoidwrlocked);
+       DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
+       if (c_numelements>=200) {
+         free(oidrdlocked);
+         free(oidrdversion);
+         free(oidwrlocked);
        }
+       return TRANS_ABORT;
       }
     }
-    
-    curr = curr->lnext;
   }
   
   /* Decide the final response */
   if (softabort) {
-    transAbortProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
+    transAbortProcess(oidwrlocked, numoidwrlocked);
     DEBUGSTM("Soft Abort: rd: %u wr: %u tot: %u\n", numoidrdlocked, numoidwrlocked, c_numelements);
+    if (c_numelements>=200) {
+      free(oidrdlocked);
+      free(oidrdversion);
+      free(oidwrlocked);
+    }
     return TRANS_SOFT_ABORT;
   } else {
-    transCommitProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
+    transCommitProcess(oidwrlocked, numoidwrlocked);
     DEBUGSTM("Commit: rd: %u wr: %u tot: %u\n", numoidrdlocked, numoidwrlocked, c_numelements);
+    if (c_numelements>=200) {
+      free(oidrdlocked);
+      free(oidrdversion);
+      free(oidwrlocked);
+    }
     return TRANS_COMMIT;
   }
 }
@@ -468,26 +544,17 @@ int alttraverseCache() {
  *
  * =================================
  */
-int transAbortProcess(void **oidrdlocked, int *numoidrdlocked, void **oidwrlocked, int *numoidwrlocked) {
+int transAbortProcess(void **oidwrlocked, int numoidwrlocked) {
   int i;
   objheader_t *header;
   /* Release read locks */
-  for(i=0; i< *numoidrdlocked; i++) {
-    /* Read from the main heap */
-    header = (objheader_t *)(((char *)(oidrdlocked[i])) - sizeof(objheader_t));
-    read_unlock(&header->lock);
-  }
 
   /* Release write locks */
-  for(i=0; i< *numoidwrlocked; i++) {
+  for(i=0; i< numoidwrlocked; i++) {
     /* Read from the main heap */
     header = (objheader_t *)(((char *)(oidwrlocked[i])) - sizeof(objheader_t));
     write_unlock(&header->lock);
   }
-  if (c_numelements>=200) {
-    free(oidrdlocked);
-    free(oidwrlocked);
-  }
 }
 
 /* ==================================
@@ -495,8 +562,7 @@ int transAbortProcess(void **oidrdlocked, int *numoidrdlocked, void **oidwrlocke
  *
  * =================================
  */
-int transCommitProcess(void ** oidrdlocked, int *numoidrdlocked,
-                    void ** oidwrlocked, int *numoidwrlocked) {
+int transCommitProcess(void ** oidwrlocked, int numoidwrlocked) {
   objheader_t *header;
   void *ptrcreate;
   int i;
@@ -511,7 +577,7 @@ int transCommitProcess(void ** oidrdlocked, int *numoidrdlocked,
   }
   
   /* Copy from transaction cache -> main object store */
-  for (i = 0; i < *numoidwrlocked; i++) {
+  for (i = 0; i < numoidwrlocked; i++) {
     /* Read from the main heap */ 
     header = (objheader_t *)(((char *)(oidwrlocked[i])) - sizeof(objheader_t));
     int tmpsize;
@@ -524,22 +590,11 @@ int transCommitProcess(void ** oidrdlocked, int *numoidrdlocked,
     header->version += 1;
   }
   
-  /* Release read locks */
-  for(i=0; i< *numoidrdlocked; i++) {
-    /* Read from the main heap */
-    header = (objheader_t *)(((char *)(oidrdlocked[i])) - sizeof(objheader_t)); 
-    read_unlock(&header->lock);
-  }
-
   /* Release write locks */
-  for(i=0; i< *numoidwrlocked; i++) {
+  for(i=0; i< numoidwrlocked; i++) {
     header = (objheader_t *)(((char *)(oidwrlocked[i])) - sizeof(objheader_t)); 
     write_unlock(&header->lock);
   }
-  if (c_numelements>=200) {
-    free(oidrdlocked);
-    free(oidwrlocked);
-  }
   return 0;
 }
 
index 1631d66658c33d0cc07dc61611f3f432a10fb1af..c51ab03d33ffad5f160de5950bf7f80d32ab5679 100644 (file)
@@ -147,8 +147,8 @@ __attribute__((pure)) void *transRead(void * oid);
 int transCommit();
 int traverseCache();
 int alttraverseCache();
-int transAbortProcess(void **, int *, void **, int *);
-int transCommmitProcess(void **, int *, void **, int *);
+int transAbortProcess(void **, int);
+int transCommmitProcess(void **, int);
 void randomdelay(int);
 
 #endif