From: adash Date: Wed, 19 Sep 2007 21:15:03 +0000 (+0000) Subject: Fixed : crashing due to pile creation X-Git-Tag: preEdgeChange~426 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=a70277d9a8adfe8110e026976a083e188abb3273;p=IRC.git Fixed : crashing due to pile creation Changed locks to have recursive attributes --- diff --git a/Robust/src/Runtime/DSTM/interface/mcpileq.c b/Robust/src/Runtime/DSTM/interface/mcpileq.c index 38941ef5..d6da34f3 100644 --- a/Robust/src/Runtime/DSTM/interface/mcpileq.c +++ b/Robust/src/Runtime/DSTM/interface/mcpileq.c @@ -5,23 +5,14 @@ mcpileq_t mcqueue; void mcpileqInit(void) { /* Initialize machine queue that containing prefetch oids and offset values sorted by remote machineid */ mcqueue.front = mcqueue.rear = NULL; - pthread_mutex_init(&mcqueue.qlock, NULL); + //Intiliaze and set machile pile queue's mutex attribute + pthread_mutexattr_init(&mcqueue.qlockattr); + pthread_mutexattr_settype(&mcqueue.qlockattr, PTHREAD_MUTEX_RECURSIVE_NP); + //pthread_mutex_init(&mcqueue.qlock, NULL); + pthread_mutex_init(&mcqueue.qlock,&mcqueue.qlockattr); pthread_cond_init(&mcqueue.qcond, NULL); } -/* Insert to the rear of machine pile queue */ -/* -void mcpileenqueue(prefetchpile_t *node) { - if(mcqueue.front == NULL && mcqueue.rear == NULL) { - mcqueue.front = mcqueue.rear = node; - } else { - node->next = NULL; - mcqueue.rear->next = node; - mcqueue.rear = node; - } -} -*/ - /* Insert to the rear of machine pile queue */ void mcpileenqueue(prefetchpile_t *node) { prefetchpile_t *tmp, *prev; diff --git a/Robust/src/Runtime/DSTM/interface/mcpileq.h b/Robust/src/Runtime/DSTM/interface/mcpileq.h index 8291c591..8c570d7f 100644 --- a/Robust/src/Runtime/DSTM/interface/mcpileq.h +++ b/Robust/src/Runtime/DSTM/interface/mcpileq.h @@ -24,6 +24,7 @@ typedef struct prefetchpile { typedef struct mcpileq { prefetchpile_t *front, *rear; pthread_mutex_t qlock; + pthread_mutexattr_t qlockattr; pthread_cond_t qcond; }mcpileq_t; diff --git a/Robust/src/Runtime/DSTM/interface/prelookup.c b/Robust/src/Runtime/DSTM/interface/prelookup.c index 64f756ba..6eda49cf 100644 --- a/Robust/src/Runtime/DSTM/interface/prelookup.c +++ b/Robust/src/Runtime/DSTM/interface/prelookup.c @@ -27,7 +27,7 @@ unsigned int prehashCreate(unsigned int size, float loadfactor) { //Initialize mutex var pthread_mutex_init(&pflookup.lock, &pflookup.prefetchmutexattr); //pthread_mutex_init(&pflookup.lock, NULL); - + pthread_cond_init(&pflookup.cond, NULL); return 0; } diff --git a/Robust/src/Runtime/DSTM/interface/queue.c b/Robust/src/Runtime/DSTM/interface/queue.c index 7a8fbba3..954a52b5 100644 --- a/Robust/src/Runtime/DSTM/interface/queue.c +++ b/Robust/src/Runtime/DSTM/interface/queue.c @@ -5,7 +5,10 @@ primarypfq_t pqueue; //Global queue void queueInit(void) { /* Intitialize primary queue */ pqueue.front = pqueue.rear = NULL; - pthread_mutex_init(&pqueue.qlock, NULL); + pthread_mutexattr_init(&pqueue.qlockattr); + pthread_mutexattr_settype(&pqueue.qlockattr, PTHREAD_MUTEX_RECURSIVE_NP); + pthread_mutex_init(&pqueue.qlock, &pqueue.qlockattr); + //pthread_mutex_init(&pqueue.qlock, NULL); pthread_cond_init(&pqueue.qcond, NULL); } diff --git a/Robust/src/Runtime/DSTM/interface/queue.h b/Robust/src/Runtime/DSTM/interface/queue.h index 2a3754dd..d315135b 100644 --- a/Robust/src/Runtime/DSTM/interface/queue.h +++ b/Robust/src/Runtime/DSTM/interface/queue.h @@ -14,6 +14,7 @@ typedef struct prefetchqelem { typedef struct primarypfq { prefetchqelem_t *front, *rear; pthread_mutex_t qlock; + pthread_mutexattr_t qlockattr; pthread_cond_t qcond; } primarypfq_t; diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index c50a10b0..fd0cd0d6 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -27,10 +27,11 @@ /* Global Variables */ extern int classsize[]; -extern primarypfq_t pqueue; // shared prefetch queue +extern primarypfq_t pqueue; //Shared prefetch queue extern mcpileq_t mcqueue; //Shared queue containing prefetch requests sorted by remote machineids objstr_t *prefetchcache; //Global Prefetch cache pthread_mutex_t prefetchcache_mutex;// Mutex to lock Prefetch Cache +pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */ extern pthread_mutex_t mainobjstore_mutex;// Mutex to lock main Object store extern prehashtable_t pflookup; //Global Prefetch cache's lookup table pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue @@ -125,7 +126,13 @@ void transInit() { int t, rc; //Create and initialize prefetch cache structure prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE); - pthread_mutex_init(&prefetchcache_mutex, NULL); + + /* Initialize attributes for mutex */ + pthread_mutexattr_init(&prefetchcache_mutex_attr); + pthread_mutexattr_settype(&prefetchcache_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP); + + pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr); + //Create prefetch cache lookup table if(prehashCreate(HASH_SIZE, LOADFACTOR)) return; //Failure @@ -207,8 +214,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { #endif } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) { /* Look up in machine lookup table and copy into cache*/ - tmp = mhashSearch(oid); - GETSIZE(size, tmp); + GETSIZE(size, objheader); size += sizeof(objheader_t); //TODO:Lock the local trans cache while copying the object here objcopy = objstrAlloc(record->cache, size); @@ -236,7 +242,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { #endif } else { /*If object not found in prefetch cache then block until object appears in the prefetch cache */ - pthread_mutex_lock(&prefetchcache_mutex); + pthread_mutex_lock(&pflookup.lock); while(!found) { rc = pthread_cond_timedwait(&pflookup.cond, &pflookup.lock, &ts); if(rc == ETIMEDOUT) { @@ -249,14 +255,14 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { objcopy = objstrAlloc(record->cache, size); memcpy(objcopy, (void *)tmp, size); chashInsert(record->lookupTable, OID(tmp), objcopy); - pthread_mutex_unlock(&prefetchcache_mutex); + pthread_mutex_unlock(&pflookup.lock); #ifdef COMPILER return &objcopy[1]; #else return objcopy; #endif } else { - pthread_mutex_unlock(&prefetchcache_mutex); + pthread_mutex_unlock(&pflookup.lock); break; } } @@ -535,6 +541,9 @@ int transCommit(transrecord_t *record) { /* Retry trans commit procedure if not sucessful in the first try */ } while (treplyretry == 1); + /* Free Resources */ + objstrDelete(record->cache); + chashDelete(record->lookupTable); free(record); return 0; } @@ -677,19 +686,13 @@ void decideResponse(thread_data_array_t *tdata) { } } - /* Send Abort */ if(transdisagree > 0) { + /* Send Abort */ *(tdata->replyctrl) = TRANS_ABORT; - /* Free resources */ - objstrDelete(tdata->rec->cache); - chashDelete(tdata->rec->lookupTable); } else if(transagree == tdata->buffer->f.mcount){ /* Send Commit */ *(tdata->replyctrl) = TRANS_COMMIT; - /* Free resources */ - objstrDelete(tdata->rec->cache); - chashDelete(tdata->rec->lookupTable); - } else { /* (transsoftabort > 0 && transdisagree == 0) */ + } else { /* Send Abort in soft abort case followed by retry commiting transaction again*/ *(tdata->replyctrl) = TRANS_ABORT; *(tdata->replyretry) = 1; @@ -1039,6 +1042,10 @@ int transComProcess(void *modptr, unsigned int *oidmod, unsigned int *oidcreated header->version += 1; } + /*If object is in prefetch cache then update it in prefetch cache */ + + + /* If object is newly created inside transaction then commit it */ for (i = 0; i < numcreated; i++) { int tmpsize; @@ -1046,7 +1053,6 @@ int transComProcess(void *modptr, unsigned int *oidmod, unsigned int *oidcreated mhashInsert(oidcreated[i], (((char *)modptr) + offset)); GETSIZE(tmpsize, header); offset += sizeof(objheader_t) + tmpsize; - lhashInsert(oidcreated[i], myIpAddr); }