From 0ad060629e27ee9c6e6f746baf08d2bbb983cef2 Mon Sep 17 00:00:00 2001 From: adash Date: Tue, 18 Sep 2007 17:31:31 +0000 Subject: [PATCH] Added new PTHREAD_MUTEX_RECURSIVE mutex attribute. Checked locks for correcting waits. Fixed double frees. TODO: Correct the segmentation violation due to corrupt pointers in order to run the third remotethread test case. --- Robust/src/Runtime/DSTM/interface/dstm.h | 1 - .../src/Runtime/DSTM/interface/dstmserver.c | 63 +++++++++++---- Robust/src/Runtime/DSTM/interface/prelookup.c | 9 ++- Robust/src/Runtime/DSTM/interface/prelookup.h | 1 + Robust/src/Runtime/DSTM/interface/trans.c | 76 ++++++++++--------- 5 files changed, 97 insertions(+), 53 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index efbc4a6b..d4ea42b7 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -162,7 +162,6 @@ typedef struct trans_commit_data{ typedef struct thread_data_array { int thread_id; int mid; - int pilecount; /* No of remote machines involved */ trans_req_data_t *buffer; /* Holds trans request information sent to participants */ thread_response_t *recvmsg; /* Shared datastructure to keep track of the participants response to a trans request */ pthread_cond_t *threshold; /* Condition var to waking up a thread */ diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 4ddf42a3..2d293577 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -127,7 +127,9 @@ void *dstmAccept(void *acceptfd) perror("Error receiving object from cooridnator\n"); return NULL; } - srcObj = mhashSearch(oid); + if((srcObj = mhashSearch(oid)) == NULL) { + printf("Object not found in Main Object Store %s %d\n", __FILE__, __LINE__); + } h = (objheader_t *) srcObj; GETSIZE(size, h); size += sizeof(objheader_t); @@ -140,16 +142,16 @@ void *dstmAccept(void *acceptfd) } 
} else { /* Type */ - char msg[]={OBJECT_FOUND, 0, 0, 0, 0}; - *((int *)&msg[1])=size; - if(send((int)acceptfd, &msg, sizeof(msg), MSG_NOSIGNAL) < sizeof(msg)) { - perror("Error sending size of object to coordinator\n"); - return NULL; - } - if(send((int)acceptfd, h, size, MSG_NOSIGNAL) < size) { - perror("Error in sending object\n"); - return NULL; - } + char msg[]={OBJECT_FOUND, 0, 0, 0, 0}; + *((int *)&msg[1])=size; + if(send((int)acceptfd, &msg, sizeof(msg), MSG_NOSIGNAL) < sizeof(msg)) { + perror("Error sending size of object to coordinator\n"); + return NULL; + } + if(send((int)acceptfd, h, size, MSG_NOSIGNAL) < size) { + perror("Error in sending object\n"); + return NULL; + } } break; @@ -285,12 +287,19 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { /*Process the information read */ if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, oidmod, acceptfd)) != 0) { printf("Error in processClientReq %s, %d\n", __FILE__, __LINE__); + /* Free resources */ + if(oidmod != NULL) { + free(oidmod); + oidmod = NULL; + } return 1; } - /* Free resources */ - free(oidmod); + if(oidmod != NULL) { + free(oidmod); + oidmod = NULL; + } return 0; } @@ -334,12 +343,21 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, header = mhashSearch(transinfo->objlocked[i]);// find the header address STATUS(((objheader_t *)header)) &= ~(LOCK); } - + /* Send ack to Coordinator */ printf("DEBUG -> Recv TRANS_ABORT\n"); sendctrl = TRANS_SUCESSFUL; if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) { perror("Error sending ACK to coordinator\n"); + if (transinfo->objlocked != NULL) { + free(transinfo->objlocked); + transinfo->objlocked = NULL; + } + if (transinfo->objnotfound != NULL) { + free(transinfo->objnotfound); + transinfo->objnotfound = NULL; + } + return 1; } ptr = NULL; @@ -350,6 +368,17 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, printf("DEBUG -> Recv 
TRANS_COMMIT \n"); if((val = transCommitProcess(modptr, oidmod, transinfo->objlocked, fixed->nummod, transinfo->numlocked, (int)acceptfd)) != 0) { printf("Error in transCommitProcess %s, %d\n", __FILE__, __LINE__); + /* Free memory */ + printf("DEBUG -> Freeing...\n"); + fflush(stdout); + if (transinfo->objlocked != NULL) { + free(transinfo->objlocked); + transinfo->objlocked = NULL; + } + if (transinfo->objnotfound != NULL) { + free(transinfo->objnotfound); + transinfo->objnotfound = NULL; + } return 1; } break; @@ -363,10 +392,11 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, //TODO Use fixed.trans_id TID since Client may have died break; } + /* Free memory */ printf("DEBUG -> Freeing...\n"); fflush(stdout); - + if (transinfo->objlocked != NULL) { free(transinfo->objlocked); transinfo->objlocked = NULL; @@ -375,6 +405,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, free(transinfo->objnotfound); transinfo->objnotfound = NULL; } + return 0; } @@ -557,6 +588,7 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock header = (objheader_t *) mhashSearch(oidmod[i]); header->version += 1; } + /* Unlock locked objects */ for(i = 0; i < numlocked; i++) { if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) { @@ -565,7 +597,6 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock } STATUS(header) &= ~(LOCK); } - //TODO Update location lookup table /* Send ack to coordinator */ diff --git a/Robust/src/Runtime/DSTM/interface/prelookup.c b/Robust/src/Runtime/DSTM/interface/prelookup.c index e92d5957..64f756ba 100644 --- a/Robust/src/Runtime/DSTM/interface/prelookup.c +++ b/Robust/src/Runtime/DSTM/interface/prelookup.c @@ -18,8 +18,15 @@ unsigned int prehashCreate(unsigned int size, float loadfactor) { pflookup.numelements = 0; // Initial number of elements in the hash pflookup.loadfactor = loadfactor; + //Intiliaze and set prefetch table mutex 
attribute + pthread_mutexattr_init(&pflookup.prefetchmutexattr); + //NOTE:PTHREAD_MUTEX_RECURSIVE is currently inside a #if_def UNIX98 in the pthread.h file + //Therefore use PTHREAD_MUTEX_RECURSIVE_NP instead + pthread_mutexattr_settype(&pflookup.prefetchmutexattr, PTHREAD_MUTEX_RECURSIVE_NP); + //Initialize mutex var - pthread_mutex_init(&pflookup.lock, NULL); + pthread_mutex_init(&pflookup.lock, &pflookup.prefetchmutexattr); + //pthread_mutex_init(&pflookup.lock, NULL); return 0; } diff --git a/Robust/src/Runtime/DSTM/interface/prelookup.h b/Robust/src/Runtime/DSTM/interface/prelookup.h index e8bb204d..acb540d9 100644 --- a/Robust/src/Runtime/DSTM/interface/prelookup.h +++ b/Robust/src/Runtime/DSTM/interface/prelookup.h @@ -20,6 +20,7 @@ typedef struct prehashtable { unsigned int numelements; float loadfactor; pthread_mutex_t lock; + pthread_mutexattr_t prefetchmutexattr; pthread_cond_t cond; } prehashtable_t; diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index f85e8fbc..2b0b5fbc 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -30,11 +30,11 @@ extern int classsize[]; extern primarypfq_t pqueue; // shared prefetch queue extern mcpileq_t mcqueue; //Shared queue containing prefetch requests sorted by remote machineids objstr_t *prefetchcache; //Global Prefetch cache -pthread_mutex_t prefetchcache_mutex; -extern pthread_mutex_t mainobjstore_mutex; +pthread_mutex_t prefetchcache_mutex;// Mutex to lock Prefetch Cache +extern pthread_mutex_t mainobjstore_mutex;// Mutex to lock main Object store extern prehashtable_t pflookup; //Global Prefetch cache's lookup table pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue -pthread_t tPrefetch; +pthread_t tPrefetch; /* Primary Prefetch thread that processes the prefetch queue */ extern objstr_t *mainobjstore; unsigned int myIpAddr; unsigned int *hostIpAddrs; @@ -196,7 +196,7 @@ 
objheader_t *transRead(transrecord_t *record, unsigned int oid) { rc = gettimeofday(&tp, NULL); /* Convert from timeval to timespec */ - ts.tv_nsec = tp.tv_usec * 1000; + ts.tv_nsec = tp.tv_usec * 10; /* Search local transaction cache */ if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){ @@ -210,6 +210,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { tmp = mhashSearch(oid); GETSIZE(size, tmp); size += sizeof(objheader_t); + //TODO:Lock the local trans cache while copying the object here objcopy = objstrAlloc(record->cache, size); memcpy(objcopy, (void *)objheader, size); /* Insert into cache's lookup table */ @@ -223,6 +224,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { found = 1; GETSIZE(size, tmp); size+=sizeof(objheader_t); + //TODO:Lock the local trans cache while copying the object here objcopy = objstrAlloc(record->cache, size); memcpy(objcopy, (void *)tmp, size); /* Insert into cache's lookup table */ @@ -232,41 +234,39 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { #else return objcopy; #endif - } else { /* If not found anywhere, then block until object appears in prefetch cache */ -#if 0 - printf("Inside remote machine\n"); - pthread_mutex_lock(&pflookup.lock); + } else { + /*If object not found in prefetch cache then block until object appears in the prefetch cache */ + pthread_mutex_lock(&prefetchcache_mutex); while(!found) { - rc = pthread_cond_timedwait(&pflookup.cond, &pflookup.lock, &ts); + rc = pthread_cond_timedwait(&pflookup.cond, &pflookup.lock, &ts); if(rc == ETIMEDOUT) { printf("Wait timed out\n"); /* Check Prefetch cache again */ - if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { /* Look up in prefetch cache */ + if((tmp =(objheader_t *) prehashSearch(oid)) != NULL) { found = 1; - GETSIZE(size, tmp); + GETSIZE(size,tmp); size+=sizeof(objheader_t); objcopy = objstrAlloc(record->cache, size); memcpy(objcopy, (void *)tmp, size); - /* Insert 
into cache's lookup table */ chashInsert(record->lookupTable, OID(tmp), objcopy); + pthread_mutex_unlock(&prefetchcache_mutex); #ifdef COMPILER return &objcopy[1]; #else return objcopy; #endif } else { - pthread_mutex_unlock(&pflookup.lock); + pthread_mutex_unlock(&prefetchcache_mutex); break; } - pthread_mutex_unlock(&pflookup.lock); } } -#endif + /* Get the object from the remote location */ machinenumber = lhashSearch(oid); objcopy = getRemoteObj(record, machinenumber, oid); if(objcopy == NULL) { - //If object is not found in Remote location + printf("Object not found in Remote location %s, %d\n", __FILE__, __LINE__); return NULL; } else { #ifdef COMPILER @@ -275,7 +275,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { return objcopy; #endif } - } + } } /* This function creates objects in the transaction record */ @@ -444,7 +444,6 @@ int transCommit(transrecord_t *record) { tosend->oidcreated = pile->oidcreated; thread_data_array[threadnum].thread_id = threadnum; thread_data_array[threadnum].mid = pile->mid; - thread_data_array[threadnum].pilecount = pilecount; thread_data_array[threadnum].buffer = tosend; thread_data_array[threadnum].recvmsg = rcvd_control_msg; thread_data_array[threadnum].threshold = &tcond; @@ -483,13 +482,15 @@ int transCommit(transrecord_t *record) { return 1; } } + threadnum++; pile = pile->next; } /* Free attribute and wait for the other threads */ pthread_attr_destroy(&attr); - for (i = 0 ;i < pilecount ; i++) { + + for (i = 0; i < pilecount; i++) { rc = pthread_join(thread[i], NULL); if(rc) { @@ -505,14 +506,18 @@ int transCommit(transrecord_t *record) { } free(thread_data_array[i].buffer); } + /* Free resources */ pthread_cond_destroy(&tcond); pthread_mutex_destroy(&tlock); - free(listmid); + if(listmid != NULL) + free(listmid); pDelete(pile_ptr); - free(thread_data_array); - free(ltdata); + if(thread_data_array != NULL) + free(thread_data_array); + if(ltdata != NULL) + free(ltdata); /* wait a random amount of time 
*/ if (treplyretry == 1) @@ -567,7 +572,7 @@ void *transRequest(void *threadarg) { } /* Send list of machines involved in the transaction */ { - int size=sizeof(unsigned int)*tdata->pilecount; + int size=sizeof(unsigned int)*tdata->buffer->f.mcount; if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) { perror("Error sending list of machines for thread\n"); pthread_exit(NULL); @@ -614,7 +619,7 @@ void *transRequest(void *threadarg) { (*(tdata->count))++; /* keeps track of no of messages received by the coordinator */ /* Wake up the threads and invoke decideResponse (once) */ - if(*(tdata->count) == tdata->pilecount) { + if(*(tdata->count) == tdata->buffer->f.mcount) { if (decideResponse(tdata) != 0) { printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__); pthread_mutex_unlock(tdata->lock); @@ -648,7 +653,7 @@ int decideResponse(thread_data_array_t *tdata) { int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what message to send */ - for (i = 0 ; i < tdata->pilecount ; i++) { + for (i = 0 ; i < tdata->buffer->f.mcount; i++) { control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses written onto the shared array */ switch(control) { @@ -676,7 +681,7 @@ int decideResponse(thread_data_array_t *tdata) { objstrDelete(tdata->rec->cache); chashDelete(tdata->rec->lookupTable); free(tdata->rec); - } else if(transagree == tdata->pilecount){ + } else if(transagree == tdata->buffer->f.mcount){ /* Send Commit */ *(tdata->replyctrl) = TRANS_COMMIT; /* Free resources */ @@ -840,7 +845,7 @@ void *handleLocalReq(void *threadarg) { headeraddr = chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i]); GETSIZE(size,headeraddr); size+=sizeof(objheader_t); - memcpy(modptr+offset, headeraddr, size); + memcpy((char *)modptr+offset, headeraddr, size); offset += size; } /* Write new objects into the mainobject store */ @@ -848,7 +853,7 @@ void 
*handleLocalReq(void *threadarg) { headeraddr = chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidcreated[i]); GETSIZE(size, headeraddr); size+=sizeof(objheader_t); - memcpy(modptr+offset, headeraddr, size); + memcpy((char *)modptr+offset, headeraddr, size); offset += size; } @@ -940,7 +945,7 @@ void *handleLocalReq(void *threadarg) { (*(localtdata->tdata->count))++; /* keeps track of no of messages received by the coordinator */ /* Wake up the threads and invoke decideResponse (once) */ - if(*(localtdata->tdata->count) == localtdata->tdata->pilecount) { + if(*(localtdata->tdata->count) == localtdata->tdata->buffer->f.mcount) { if (decideResponse(localtdata->tdata) != 0) { printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__); pthread_mutex_unlock(localtdata->tdata->lock); @@ -966,7 +971,6 @@ void *handleLocalReq(void *threadarg) { } /* Free memory */ - if (localtdata->transinfo->objlocked != NULL) { free(localtdata->transinfo->objlocked); localtdata->transinfo->objlocked = NULL; @@ -987,9 +991,9 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int void *header; /* Set all ref counts as 1 and do garbage collection */ - ptr = (char *)modptr; + ptr = modptr; for(i = 0; i< nummod; i++) { - int tmpsize; + int tmpsize; tmp_header = (objheader_t *)ptr; tmp_header->rcount = 0; GETSIZE(tmpsize, tmp_header); @@ -1005,6 +1009,7 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int } /* Send ack to Coordinator */ + printf("TRANS_SUCCESSFUL\n"); /*Free the pointer */ ptr = NULL; @@ -1041,7 +1046,7 @@ int transComProcess(void *modptr, unsigned int *oidmod, unsigned int *oidcreated for (i = 0; i < numcreated; i++) { - int tmpsize; + int tmpsize; header = (objheader_t *)(((char *)modptr) + offset); mhashInsert(oidcreated[i], (((char *)modptr) + offset)); GETSIZE(tmpsize, header); @@ -1062,6 +1067,7 @@ int transComProcess(void *modptr, unsigned int *oidmod, unsigned int 
*oidcreated //TODO Update location lookup table /* Send ack to Coordinator */ + printf("TRANS_SUCCESSFUL\n"); return 0; } @@ -1552,12 +1558,12 @@ unsigned short getObjType(unsigned int oid) if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) { prefetch(1, &oid, &numoffsets, NULL); + pthread_mutex_lock(&pflookup.lock); while ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) { - pthread_mutex_lock(&pflookup.lock); pthread_cond_wait(&pflookup.cond, &pflookup.lock); - pthread_mutex_unlock(&pflookup.lock); } + pthread_mutex_unlock(&pflookup.lock); } } -- 2.34.1