From: adash Date: Fri, 29 Jun 2007 19:11:57 +0000 (+0000) Subject: Bug fixes for local request processing X-Git-Tag: preEdgeChange~538 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=750469df34ecbb7a6cdb8cd2f63ad6778d2b1509;p=IRC.git Bug fixes for local request processing --- diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index a5d41cc9..cfcbecfd 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -181,7 +181,7 @@ void *handleLocalReq(void *); //the C routine that the local m/c thread will exe int decideResponse(thread_data_array_t *);// Coordinator decides what response to send to the participant char sendResponse(thread_data_array_t *, int); //Sends control message back to Participants void *getRemoteObj(transrecord_t *, unsigned int, unsigned int); -int transAbortProcess(void *, unsigned int *, int, int); +int transAbortProcess(void *, unsigned int *, int, int, int); int transComProcess(trans_commit_data_t *); /* end transactions */ #endif diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 01073590..b61d7f41 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -217,7 +217,6 @@ int transCommit(transrecord_t *record) { sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid); tosend->f.mcount = pilecount; tosend->f.numread = pile->numread; - printf("DEBUG-> pile numread = %d\n", pile->numread); tosend->f.nummod = pile->nummod; tosend->f.sum_bytes = pile->sum_bytes; tosend->listmid = listmid; @@ -245,14 +244,10 @@ int transCommit(transrecord_t *record) { } else { /*Unset the pile->local flag*/ pile->local = 0; - //header->status &= ~(LOCK); - /*Handle request of local pile */ /*Set flag to identify that Local machine is involved*/ ltdata->tdata = &thread_data_array[threadnum]; - printf("DEBUG->Address of ltdata sent = %x\n", <data); ltdata->transinfo = &transinfo; - printf("DEBUG-> Machine Pile numread = %d\n", ltdata->tdata->buffer->f.numread); - val = pthread_create(&thread[threadnum], NULL, handleLocalReq, (void *) <data); + val = pthread_create(&thread[threadnum], NULL, handleLocalReq, (void *) ltdata); if (val) { perror("Error in pthread create\n"); return 1; @@ -376,34 +371,6 @@ void *transRequest(void *threadarg) { (*(tdata->count))++; /* keeps track of no of messages received by the coordinator */ /* Wake up the threads and invoke decideResponse (once) */ -/* - if((*(tdata->localstatus) & LM_EXISTS) == LM_EXISTS) { //If there is a local machine involved in the transaction - if(*(tdata->count) == tdata->pilecount - 1) { - while(*(tdata->localstatus) & LM_UPDATED != LM_UPDATED) { - ;//Do nothing and wait until Local machine thread updates the common data structure - } - if(decideResponse(tdata) != 0) { - printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__); - pthread_mutex_unlock(tdata->lock); - return NULL; - } - pthread_cond_broadcast(tdata->threshold); - } - } else if ((*(tdata->localstatus) & LM_EXISTS) == 0) { //No local m/c involved in transaction - if(*(tdata->count) == tdata->pilecount) { - if (decideResponse(tdata) != 0) { - printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__); - pthread_mutex_unlock(tdata->lock); - close(sd); - return NULL; - } - pthread_cond_broadcast(tdata->threshold); - } else { - pthread_cond_wait(tdata->threshold, tdata->lock); - } - } -*/ - if(*(tdata->count) == tdata->pilecount) { if (decideResponse(tdata) != 0) { printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__); @@ -608,7 +575,6 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { * It also frees the calloced memory resources */ -//int handleLocalReq(thread_data_array_t *tdata, trans_commit_data_t *transinfo) { void *handleLocalReq(void *threadarg) { int val, i = 0; short version; @@ -620,11 +586,8 @@ void *handleLocalReq(void *threadarg) { local_thread_data_array_t *localtdata; localtdata = (local_thread_data_array_t *) threadarg; - printf("DEBUG->Address of localtdata = %x\n", localtdata); - printf("DEBUG-> Machine Pile numread recv = %d\n", localtdata->tdata->buffer->f.numread); /* Counters and arrays to formulate decision on control message to be sent */ - printf("DEBUG -> %d %d\n",localtdata->tdata->buffer->f.numread, localtdata->tdata->buffer->f.nummod); oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int)); oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int)); oidmod = (unsigned int *) calloc(localtdata->tdata->buffer->f.nummod, sizeof(unsigned int)); @@ -750,7 +713,7 @@ void *handleLocalReq(void *threadarg) { /*Based on DecideResponse(), Either COMMIT or ABORT the operation*/ if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){ - if(transAbortProcess(modptr,oidlocked, localtdata->transinfo->numlocked, localtdata->transinfo->nummod) != 0) { + if(transAbortProcess(modptr,oidlocked, localtdata->transinfo->numlocked, localtdata->transinfo->nummod, localtdata->tdata->buffer->f.numread) != 0) { printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__); return NULL; } @@ -781,7 +744,7 @@ void *handleLocalReq(void *threadarg) { } /* This function completes the ABORT process if the transaction is aborting */ -int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int nummod) { +int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int nummod, int numread) { char *ptr; int i; objheader_t *tmp_header; @@ -800,6 +763,7 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int header = mhashSearch(objlocked[i]);// find the header address ((objheader_t *)header)->status &= ~(LOCK); } + //TODO/* Unset the bit for local objects */ /* Send ack to Coordinator */ printf("DEBUG-> TRANS_SUCCESSFUL\n"); @@ -826,7 +790,6 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int header->rcount = 1; //TODO Not sure what would be the val /* Change ptr address in mhash table */ - printf("DEBUG -> removing object oid = %d\n", transinfo->objmod[i]); mhashRemove(transinfo->objmod[i]); mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset)); offset += sizeof(objheader_t) + classsize[header->type]; @@ -843,6 +806,7 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int } //TODO Update location lookup table + //TODO/* Unset the bit for local objects */ /* Send ack to Coordinator */ printf("DEBUG-> TRANS_SUCESSFUL\n");