From: jihoonl Date: Sat, 11 Jun 2011 22:56:19 +0000 (+0000) Subject: support multi threading X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=0b68833009e3891cd1161b0b1bb88377deef81b2;p=IRC.git support multi threading --- diff --git a/Robust/src/Runtime/DSTM/interface_recovery/dstm.h b/Robust/src/Runtime/DSTM/interface_recovery/dstm.h index 80c42f64..9c86fa98 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/dstm.h +++ b/Robust/src/Runtime/DSTM/interface_recovery/dstm.h @@ -9,7 +9,7 @@ #define WAIT_TIME 3 #endif - +#define CFENCE asm volatile("":::"memory"); /*********************************************************** * Macros **********************************************************/ diff --git a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c index 55749251..c455ec84 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c @@ -1499,6 +1499,27 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock printf("nummod: %d, numlocked: %d\n", nummod, numlocked); #endif + char* ptr; + objheader_t* headaddr; + objheader_t* ttmp; + ptr = (char *) modptr; + for(i = 0 ; i < nummod; i++){ + int tmpsize=0; + headaddr = (objheader_t *) ptr; +// printf("44 before OID = %u version = %d Type = %d\n",OID(headaddr),headaddr->version,TYPE(headaddr)); + ttmp = (objheader_t*)mhashSearch(oidmod[i]); + if(ttmp != NULL && TYPE(ttmp) != TYPE(headaddr)) { + printf("before OID = %u Type = %d\n",OID(headaddr),TYPE(headaddr)); + printf("After OID = %u Type = %d\n",OID(ttmp),TYPE(ttmp)); + printf("\n"); + } +// printf("44 after OID = %u version = %d Type = %d\n",OID(headaddr),headaddr->version,TYPE(headaddr)); +// else +// printf("44 after OID = %u version = %d Type = %d\n",OID(ttmp),ttmp->version,TYPE(ttmp)); + GETSIZE(tmpsize, headaddr); + ptr += sizeof(objheader_t) + tmpsize; + } + /* Process each modified object saved in the mainobject store */ for(i = 0; i < nummod; i++) { if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) { @@ -1541,6 +1562,7 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock dst->___cachedHash___=src->___cachedHash___; memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___)); } + CFENCE; header->version += 1; #ifdef DEBUG printf("oid: %u, new header version: %d\n", oidmod[i], header->version); diff --git a/Robust/src/Runtime/DSTM/interface_recovery/trans.c b/Robust/src/Runtime/DSTM/interface_recovery/trans.c index e5725028..ab12ef68 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/trans.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/trans.c @@ -112,6 +112,9 @@ char ip[16]; // for debugging purpose extern tlist_t* transList; extern pthread_mutex_t translist_mutex; extern pthread_mutex_t clearNotifyList_mutex; +pthread_mutex_t oidlock; +pthread_mutex_t tidlock; + unsigned int currentEpoch; unsigned int currentBackupMachine; @@ -571,6 +574,12 @@ void transInit() { pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr); pthread_mutex_init(¬ifymutex, NULL); pthread_mutex_init(&atomicObjLock, NULL); +#ifdef RECOVERY + pthread_mutex_init(&oidlock,NULL); + pthread_mutex_init(&tidlock,NULL); +#endif + + #ifdef CACHE //Create prefetch cache lookup table if(prehashCreate(PHASH_SIZE, PLOADFACTOR)) { @@ -973,6 +982,7 @@ remoteread: /* This function creates objects in the transaction record */ objheader_t *transCreateObj(unsigned int size) { + pthread_mutex_lock(&oidlock); objheader_t *tmp = (objheader_t *) objstrAlloc(&t_cache, (sizeof(objheader_t) + size)); OID(tmp) = getNewOID(); tmp->notifylist = NULL; @@ -980,6 +990,7 @@ objheader_t *transCreateObj(unsigned int size) { tmp->isBackup = 0; STATUS(tmp) = NEW; t_chashInsert(OID(tmp), tmp); + pthread_mutex_unlock(&oidlock); #ifdef COMPILER return &tmp[1]; //want space after object header #else @@ -2462,6 +2473,9 @@ int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) { memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___)); } + // memory barrier + CFENCE; + header->version += 1; if(header->notifylist != NULL) { #ifdef RECOVERY @@ -3062,10 +3076,12 @@ unsigned int getNewOID(void) { #ifdef RECOVERY static unsigned int tid = 0xFFFFFFFF; unsigned int getNewTransID(void) { - tid++; + pthread_mutex_lock(&tidlock); + tid+=2; if (tid > transIDMax || tid < transIDMin) { tid = (transIDMin | 1); } + pthread_mutex_unlock(&tidlock); return tid; } #endif