From: adash Date: Fri, 2 May 2008 21:46:47 +0000 (+0000) Subject: check LOCK versions X-Git-Tag: preEdgeChange~115 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=962a86de3fee386e1490fad89ab65da34acf111a;p=IRC.git check LOCK versions --- diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 505fd348..ddacde97 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -18,6 +18,7 @@ extern pthread_mutex_t notifymutex; objstr_t *mainobjstore; pthread_mutex_t mainobjstore_mutex; +pthread_mutex_t lockObjHeader; pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a recursive lock */ sockPoolHashTable_t *transPResponseSocketPool; @@ -32,6 +33,7 @@ int dstmInit(void) pthread_mutexattr_init(&mainobjstore_mutex_attr); pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP); pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr); + pthread_mutex_init(&lockObjHeader,NULL); if (mhashCreate(HASH_SIZE, LOADFACTOR)) return 1; //failure @@ -434,66 +436,69 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne int tmpsize; headptr = (objheader_t *) ptr; oid = OID(headptr); - version = headptr->version; - GETSIZE(tmpsize, headptr); - ptr += sizeof(objheader_t) + tmpsize; - } - - /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */ - - if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */ - /* Save the oids not found and number of oids not found for later use */ - oidnotfound[objnotfound] = oid; - objnotfound++; - } else { /* If Obj found in machine (i.e. has not moved) */ - /* Check if Obj is locked by any previous transaction */ - if ((STATUS(((objheader_t *)mobj)) & LOCK) == LOCK) { - if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */ - v_matchlock++; - } else {/* If versions don't match ...HARD ABORT */ - v_nomatch++; - /* Send TRANS_DISAGREE to Coordinator */ - control = TRANS_DISAGREE; - if (objlocked > 0) { - for(j = 0; j < objlocked; j++) { - if((headptr = mhashSearch(oidlocked[j])) == NULL) { - printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); - return 0; - } - STATUS(headptr) &= ~(LOCK); - } - free(oidlocked); - } - send_data(acceptfd, &control, sizeof(char)); - return control; - } - } else {/* If Obj is not locked then lock object */ - STATUS(((objheader_t *)mobj)) |= LOCK; - /* Save all object oids that are locked on this machine during this transaction request call */ - oidlocked[objlocked] = OID(((objheader_t *)mobj)); - objlocked++; - if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */ - v_matchnolock++; - } else { /* If versions don't match ...HARD ABORT */ - v_nomatch++; - control = TRANS_DISAGREE; - if (objlocked > 0) { - for(j = 0; j < objlocked; j++) { - if((headptr = mhashSearch(oidlocked[j])) == NULL) { - printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); - return 0; - } - STATUS(headptr) &= ~(LOCK); - } - free(oidlocked); - } - - /* Send TRANS_DISAGREE to Coordinator */ - send_data(acceptfd, &control, sizeof(char)); - return control; - } - } - } + version = headptr->version; + GETSIZE(tmpsize, headptr); + ptr += sizeof(objheader_t) + tmpsize; + } + + /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */ + + if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */ + /* Save the oids not found and number of oids not found for later use */ + oidnotfound[objnotfound] = oid; + objnotfound++; + } else { /* If Obj found in machine (i.e. has not moved) */ + /* Check if Obj is locked by any previous transaction */ + pthread_mutex_lock(&lockObjHeader); + if ((STATUS(((objheader_t *)mobj)) & LOCK) == LOCK) { + pthread_mutex_unlock(&lockObjHeader); + if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */ + v_matchlock++; + } else {/* If versions don't match ...HARD ABORT */ + v_nomatch++; + /* Send TRANS_DISAGREE to Coordinator */ + control = TRANS_DISAGREE; + if (objlocked > 0) { + for(j = 0; j < objlocked; j++) { + if((headptr = mhashSearch(oidlocked[j])) == NULL) { + printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); + return 0; + } + STATUS(headptr) &= ~(LOCK); + } + free(oidlocked); + } + send_data(acceptfd, &control, sizeof(char)); + return control; + } + } else {/* If Obj is not locked then lock object */ + STATUS(((objheader_t *)mobj)) |= LOCK; + pthread_mutex_unlock(&lockObjHeader); + /* Save all object oids that are locked on this machine during this transaction request call */ + oidlocked[objlocked] = OID(((objheader_t *)mobj)); + objlocked++; + if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */ + v_matchnolock++; + } else { /* If versions don't match ...HARD ABORT */ + v_nomatch++; + control = TRANS_DISAGREE; + if (objlocked > 0) { + for(j = 0; j < objlocked; j++) { + if((headptr = mhashSearch(oidlocked[j])) == NULL) { + printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); + return 0; + } + STATUS(headptr) &= ~(LOCK); + } + free(oidlocked); + } + + /* Send TRANS_DISAGREE to Coordinator */ + send_data(acceptfd, &control, sizeof(char)); + return control; + } + } + } } /* Decide what control message to send to Coordinator */ @@ -565,7 +570,6 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock return 1; } GETSIZE(tmpsize,header); - pthread_mutex_lock(&mainobjstore_mutex); memcpy((char*)header + sizeof(objheader_t), ((char *)modptr + sizeof(objheader_t) + offset), tmpsize); header->version += 1; /* If threads are waiting on this object to be updated, notify them */ @@ -574,7 +578,6 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock notifyAll(&header->notifylist, OID(header), header->version); } pthread_mutex_unlock(¬ifymutex); - pthread_mutex_unlock(&mainobjstore_mutex); offset += sizeof(objheader_t) + tmpsize; } diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index bbd87742..4141f222 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -39,6 +39,7 @@ unsigned int oidMax; sockPoolHashTable_t *transReadSockPool; sockPoolHashTable_t *transPrefetchSockPool; pthread_mutex_t notifymutex; +pthread_mutex_t atomicObjLock; void printhex(unsigned char *, int); plistnode_t *createPiles(transrecord_t *); @@ -217,6 +218,7 @@ void transInit() { pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr); pthread_mutex_init(¬ifymutex, NULL); + pthread_mutex_init(&atomicObjLock, NULL); //Create prefetch cache lookup table if(prehashCreate(HASH_SIZE, LOADFACTOR)) { printf("ERROR\n"); @@ -544,7 +546,7 @@ int transCommit(transrecord_t *record) { } free(thread_data_array[i].buffer); } - + /* Free resources */ pthread_cond_destroy(&tcond); pthread_mutex_destroy(&tlock); @@ -561,7 +563,6 @@ int transCommit(transrecord_t *record) { /* Retry trans commit procedure during soft_abort case */ } while (treplyretry); - if(treplyctrl == TRANS_ABORT) { /* Free Resources */ objstrDelete(record->cache); @@ -854,26 +855,29 @@ void *handleLocalReq(void *threadarg) { objnotfound++; } else { /* If Obj found in machine (i.e. has not moved) */ /* Check if Obj is locked by any previous transaction */ + pthread_mutex_lock(&atomicObjLock); if ((STATUS((objheader_t *)mobj) & LOCK) == LOCK) { - if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */ - v_matchlock++; - } else {/* If versions don't match ...HARD ABORT */ - v_nomatch++; - /* Send TRANS_DISAGREE to Coordinator */ - localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE; - } + pthread_mutex_unlock(&atomicObjLock); + if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */ + v_matchlock++; + } else {/* If versions don't match ...HARD ABORT */ + v_nomatch++; + /* Send TRANS_DISAGREE to Coordinator */ + localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE; + } } else {/* If Obj is not locked then lock object */ - STATUS(((objheader_t *)mobj)) |= LOCK; - /* Save all object oids that are locked on this machine during this transaction request call */ - oidlocked[objlocked] = OID(((objheader_t *)mobj)); - objlocked++; - if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */ - v_matchnolock++; - } else { /* If versions don't match ...HARD ABORT */ - v_nomatch++; - /* Send TRANS_DISAGREE to Coordinator */ - localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE; - } + STATUS(((objheader_t *)mobj)) |= LOCK; + pthread_mutex_unlock(&atomicObjLock); + /* Save all object oids that are locked on this machine during this transaction request call */ + oidlocked[objlocked] = OID(((objheader_t *)mobj)); + objlocked++; + if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */ + v_matchnolock++; + } else { /* If versions don't match ...HARD ABORT */ + v_nomatch++; + /* Send TRANS_DISAGREE to Coordinator */ + localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE; + } } } } // End for @@ -1509,7 +1513,6 @@ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int n *((unsigned int *)(&msg[1] + size)) = myIpAddr; size += sizeof(unsigned int); *((unsigned int *)(&msg[1] + size)) = threadid; - pthread_mutex_lock(&(ndata->threadnotify)); size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int); send_data(sock, msg, size);