Bug fixes for local request processing
authoradash <adash>
Fri, 29 Jun 2007 19:11:57 +0000 (19:11 +0000)
committeradash <adash>
Fri, 29 Jun 2007 19:11:57 +0000 (19:11 +0000)
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/trans.c

index a5d41cc9fd122b9f5a64ac922a1616b0fa6acc02..cfcbecfd279962bb75e9a489c7bf7be4793a20ec 100644 (file)
@@ -181,7 +181,7 @@ void *handleLocalReq(void *);       //the C routine that the local m/c thread will exe
 int decideResponse(thread_data_array_t *);// Coordinator decides what response to send to the participant
 char sendResponse(thread_data_array_t *, int); //Sends control message back to Participants
 void *getRemoteObj(transrecord_t *, unsigned int, unsigned int);
-int transAbortProcess(void *, unsigned int *, int, int);
+int transAbortProcess(void *, unsigned int *, int, int, int);
 int transComProcess(trans_commit_data_t *);
 /* end transactions */
 #endif
index 010735901d5283ca9e8924d969ffdaf227a72e9e..b61d7f41c6db64adb31bbdbefd4e2ea3aa1f39e9 100644 (file)
@@ -217,7 +217,6 @@ int transCommit(transrecord_t *record) {
                sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid);
                tosend->f.mcount = pilecount;
                tosend->f.numread = pile->numread;
-               printf("DEBUG-> pile numread = %d\n", pile->numread);
                tosend->f.nummod = pile->nummod;
                tosend->f.sum_bytes = pile->sum_bytes;
                tosend->listmid = listmid;
@@ -245,14 +244,10 @@ int transCommit(transrecord_t *record) {
                } else {
                        /*Unset the pile->local flag*/
                        pile->local = 0;
-                       //header->status &= ~(LOCK);
-                       /*Handle request of local pile */
                        /*Set flag to identify that Local machine is involved*/
                        ltdata->tdata = &thread_data_array[threadnum];
-                       printf("DEBUG->Address of ltdata sent = %x\n", &ltdata);
                        ltdata->transinfo = &transinfo;
-                       printf("DEBUG-> Machine Pile numread = %d\n", ltdata->tdata->buffer->f.numread);
-                       val = pthread_create(&thread[threadnum], NULL, handleLocalReq, (void *) &ltdata);
+                       val = pthread_create(&thread[threadnum], NULL, handleLocalReq, (void *) ltdata);
                        if (val) {
                                perror("Error in pthread create\n");
                                return 1;
@@ -376,34 +371,6 @@ void *transRequest(void *threadarg) {
        (*(tdata->count))++; /* keeps track of no of messages received by the coordinator */
 
        /* Wake up the threads and invoke decideResponse (once) */
-/*
-       if((*(tdata->localstatus) & LM_EXISTS) == LM_EXISTS) { //If there is a local machine involved in the transaction
-               if(*(tdata->count) == tdata->pilecount - 1) {
-                       while(*(tdata->localstatus) & LM_UPDATED != LM_UPDATED) {
-                               ;//Do nothing and wait until Local machine thread updates the common data structure
-                       }
-                       if(decideResponse(tdata) != 0) {
-                               printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
-                               pthread_mutex_unlock(tdata->lock);
-                               return NULL;
-                       }
-                       pthread_cond_broadcast(tdata->threshold);
-               }
-       } else if ((*(tdata->localstatus) & LM_EXISTS) == 0) { //No local m/c involved in transaction
-               if(*(tdata->count) == tdata->pilecount) {
-                       if (decideResponse(tdata) != 0) { 
-                               printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
-                               pthread_mutex_unlock(tdata->lock);
-                               close(sd);
-                               return NULL;
-                       }
-                       pthread_cond_broadcast(tdata->threshold);
-               } else {
-                       pthread_cond_wait(tdata->threshold, tdata->lock);
-               }       
-       }
-*/
-
        if(*(tdata->count) == tdata->pilecount) {
                if (decideResponse(tdata) != 0) { 
                        printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
@@ -608,7 +575,6 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
  * It also frees the calloced memory resources
  */
 
-//int handleLocalReq(thread_data_array_t *tdata, trans_commit_data_t *transinfo) {
 void *handleLocalReq(void *threadarg) {
        int val, i = 0;
        short version;
@@ -620,11 +586,8 @@ void *handleLocalReq(void *threadarg) {
        local_thread_data_array_t *localtdata;
 
        localtdata = (local_thread_data_array_t *) threadarg;
-       printf("DEBUG->Address of localtdata = %x\n", localtdata);
 
-       printf("DEBUG-> Machine Pile numread recv = %d\n", localtdata->tdata->buffer->f.numread);
        /* Counters and arrays to formulate decision on control message to be sent */
-       printf("DEBUG -> %d %d\n",localtdata->tdata->buffer->f.numread, localtdata->tdata->buffer->f.nummod);
        oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
        oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
        oidmod = (unsigned int *) calloc(localtdata->tdata->buffer->f.nummod, sizeof(unsigned int));
@@ -750,7 +713,7 @@ void *handleLocalReq(void *threadarg) {
 
        /*Based on DecideResponse(), Either COMMIT or ABORT the operation*/
        if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
-               if(transAbortProcess(modptr,oidlocked, localtdata->transinfo->numlocked, localtdata->transinfo->nummod) != 0) {
+               if(transAbortProcess(modptr,oidlocked, localtdata->transinfo->numlocked, localtdata->transinfo->nummod, localtdata->tdata->buffer->f.numread) != 0) {
                        printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
                        return NULL;
                }
@@ -781,7 +744,7 @@ void *handleLocalReq(void *threadarg) {
 }
 /* This function completes the ABORT process if the transaction is aborting 
  */
-int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int nummod) {
+int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int nummod, int numread) {
        char *ptr;
        int i;
        objheader_t *tmp_header;
@@ -800,6 +763,7 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int
                header = mhashSearch(objlocked[i]);// find the header address
                ((objheader_t *)header)->status &= ~(LOCK);
        }
+       //TODO/* Unset the bit for local objects */
 
        /* Send ack to Coordinator */
        printf("DEBUG-> TRANS_SUCCESSFUL\n");
@@ -826,7 +790,6 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int
                 header->rcount = 1; //TODO Not sure what would be the val
 
                 /* Change ptr address in mhash table */
-                printf("DEBUG -> removing object oid = %d\n", transinfo->objmod[i]);
                 mhashRemove(transinfo->objmod[i]);
                 mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset));
                 offset += sizeof(objheader_t) + classsize[header->type];
@@ -843,6 +806,7 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int
         }
 
         //TODO Update location lookup table
+        //TODO/* Unset the bit for local objects */
 
         /* Send ack to Coordinator */
         printf("DEBUG-> TRANS_SUCESSFUL\n");