support multi threading
authorjihoonl <jihoonl>
Sat, 11 Jun 2011 22:56:19 +0000 (22:56 +0000)
committerjihoonl <jihoonl>
Sat, 11 Jun 2011 22:56:19 +0000 (22:56 +0000)
Robust/src/Runtime/DSTM/interface_recovery/dstm.h
Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c
Robust/src/Runtime/DSTM/interface_recovery/trans.c

index 80c42f643c90d3e01083e0ce7e342eb06b998619..9c86fa98e2fb0347fca137c66485a7da11e84051 100644 (file)
@@ -9,7 +9,7 @@
 #define WAIT_TIME 3
 #endif
 
-
+#define CFENCE   asm volatile("":::"memory");
 /***********************************************************
  *       Macros
  **********************************************************/
index 557492514e21bfc0468365a4655998fa4aee2f73..c455ec84a3e4abab2ae7106230d0caea8f34cba6 100644 (file)
@@ -1499,6 +1499,27 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock
        printf("nummod: %d, numlocked: %d\n", nummod, numlocked);
 #endif
 
+  char* ptr;
+  objheader_t* headaddr;
+  objheader_t* ttmp;
+  ptr = (char *) modptr;
+  for(i = 0 ; i < nummod; i++){
+    int tmpsize=0;
+    headaddr = (objheader_t *) ptr;
+//    printf("44 before OID = %u version = %d Type = %d\n",OID(headaddr),headaddr->version,TYPE(headaddr));
+    ttmp = (objheader_t*)mhashSearch(oidmod[i]);
+    if(ttmp != NULL && TYPE(ttmp) != TYPE(headaddr)) {
+      printf("before OID = %u Type = %d\n",OID(headaddr),TYPE(headaddr));
+      printf("After  OID = %u Type = %d\n",OID(ttmp),TYPE(ttmp));
+      printf("\n");
+    }
+//      printf("44 after  OID = %u version = %d Type = %d\n",OID(headaddr),headaddr->version,TYPE(headaddr));
+//    else
+//      printf("44 after  OID = %u version = %d Type = %d\n",OID(ttmp),ttmp->version,TYPE(ttmp));
+    GETSIZE(tmpsize, headaddr);
+    ptr += sizeof(objheader_t) + tmpsize;
+  }
+
   /* Process each modified object saved in the mainobject store */
   for(i = 0; i < nummod; i++) {
     if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
@@ -1541,6 +1562,7 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock
       dst->___cachedHash___=src->___cachedHash___;
       memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
     }
+    CFENCE;
     header->version += 1;
 #ifdef DEBUG
     printf("oid: %u, new header version: %d\n", oidmod[i], header->version);
index e5725028b8373425cb53b1064c504b0b45d23f2d..ab12ef684c36cf298eb041e12fdf32aa8ca32e9d 100644 (file)
@@ -112,6 +112,9 @@ char ip[16];      // for debugging purpose
 extern tlist_t* transList;
 extern pthread_mutex_t translist_mutex;
 extern pthread_mutex_t clearNotifyList_mutex;
+pthread_mutex_t oidlock;
+pthread_mutex_t tidlock;
+
 
 unsigned int currentEpoch;
 unsigned int currentBackupMachine;
@@ -571,6 +574,12 @@ void transInit() {
   pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr);
   pthread_mutex_init(&notifymutex, NULL);
   pthread_mutex_init(&atomicObjLock, NULL);
+#ifdef RECOVERY                      
+  pthread_mutex_init(&oidlock,NULL);
+  pthread_mutex_init(&tidlock,NULL);
+#endif
+
+
 #ifdef CACHE
   //Create prefetch cache lookup table
   if(prehashCreate(PHASH_SIZE, PLOADFACTOR)) {
@@ -973,6 +982,7 @@ remoteread:
 
 /* This function creates objects in the transaction record */
 objheader_t *transCreateObj(unsigned int size) {
+  pthread_mutex_lock(&oidlock);
   objheader_t *tmp = (objheader_t *) objstrAlloc(&t_cache, (sizeof(objheader_t) + size));
   OID(tmp) = getNewOID();
   tmp->notifylist = NULL;
@@ -980,6 +990,7 @@ objheader_t *transCreateObj(unsigned int size) {
   tmp->isBackup = 0;
   STATUS(tmp) = NEW;
   t_chashInsert(OID(tmp), tmp);
+  pthread_mutex_unlock(&oidlock);
 #ifdef COMPILER
   return &tmp[1]; //want space after object header
 #else
@@ -2462,6 +2473,9 @@ int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
       memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
     }
 
+    // memory barrier
+    CFENCE;
+
     header->version += 1;
     if(header->notifylist != NULL) {
 #ifdef RECOVERY
@@ -3062,10 +3076,12 @@ unsigned int getNewOID(void) {
 #ifdef RECOVERY
 static unsigned int tid = 0xFFFFFFFF;
 unsigned int getNewTransID(void) {
-  tid++;
+  pthread_mutex_lock(&tidlock);
+  tid+=2;
   if (tid > transIDMax || tid < transIDMin) {
     tid = (transIDMin | 1);
   }
+  pthread_mutex_unlock(&tidlock);
   return tid;
 }
 #endif