Support for local m/c trans request ( no need to set up new connection if local)
authoradash <adash>
Thu, 28 Jun 2007 21:59:16 +0000 (21:59 +0000)
committeradash <adash>
Thu, 28 Jun 2007 21:59:16 +0000 (21:59 +0000)
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/plookup.h
Robust/src/Runtime/DSTM/interface/trans.c

index f13a28a4e8f2af00a7e48c4075782c26b8188233..a5d41cc9fd122b9f5a64ac922a1616b0fa6acc02 100644 (file)
@@ -45,6 +45,7 @@
 #define DIRTY 0x01
 #define NEW   0x02
 #define LOCK  0x04
+#define LOCAL  0x08
 
 typedef struct objheader {
        unsigned int oid;
@@ -87,6 +88,18 @@ typedef struct trans_req_data {
        unsigned int *oidmod;
 }trans_req_data_t;
 
+// Structure passed to dstmAcceptinfo() on server side to complete TRANS_COMMIT process 
+typedef struct trans_commit_data{
+       unsigned int *objmod;
+       unsigned int *objlocked;
+       unsigned int *objnotfound;
+       void *modptr;
+       int nummod;
+       int numlocked;
+       int numnotfound;
+}trans_commit_data_t;
+
+
 #define PRINT_TID(PTR) printf("DEBUG -> %x %d\n", PTR->mid, PTR->thread_id);
 //structure for passing multiple arguments to thread
 typedef struct thread_data_array {
@@ -100,11 +113,16 @@ typedef struct thread_data_array {
        int *count;             //variable to count responses of TRANS_REQUEST protocol from all participants
        char *replyctrl;        //shared ctrl message that stores the reply to be sent, filled by decideResp
        char *replyretry;       //shared variable to find out if we need retry (TRANS_COMMIT case) 
+//     char *localstatus;     //shared variable to identify local requests
        transrecord_t *rec;     // To send modified objects
-}thread_data_array_t;
-
+} thread_data_array_t;
 
 
+//Structure for passing arguments to the local m/c thread
+typedef struct local_thread_data_array {
+       thread_data_array_t *tdata;
+       trans_commit_data_t *transinfo; //Required for trans commit process
+} local_thread_data_array_t;
 
 // Structure to save information about an oid necesaary for the decideControl()
 typedef struct objinfo {
@@ -112,16 +130,13 @@ typedef struct objinfo {
        int poss_val; //Status of object(locked but version matches, version mismatch, oid not present in machine etc) 
 }objinfo_t;
 
-// Structure passed to dstmAcceptinfo() on server side to complete TRANS_COMMIT process 
-typedef struct trans_commit_data{
-       unsigned int *objmod;
-       unsigned int *objlocked;
-       unsigned int *objnotfound;
-       void *modptr;
-       int nummod;
-       int numlocked;
-       int numnotfound;
-}trans_commit_data_t;
+//Structure for members within prefetch tuples
+typedef struct member {
+        short offset;
+        short index;
+        struct member *next;
+ }trans_member_t;
+
 
 //Structure for prefetching tuples generated by teh compiler
  typedef struct trans_prefetchtuple{
@@ -131,13 +146,6 @@ typedef struct trans_commit_data{
         struct trans_prefetchtuple *next;
  }trans_prefetchtuple_t;
 
-//Structure for members within prefetch tuples
-typedef struct member {
-        short offset;
-        short index;
-        struct member *next;
- }trans_member_t;
-
 /* Initialize main object store and lookup tables, start server thread. */
 int dstmInit(void);
 
@@ -163,16 +171,17 @@ int transCommitProcess(trans_commit_data_t *, int);
 /* end server portion */
 
 /* Prototypes for transactions */
+void randomdelay(void);
 transrecord_t *transStart();
 objheader_t *transRead(transrecord_t *, unsigned int);
 objheader_t *transCreateObj(transrecord_t *, unsigned short); //returns oid
 int transCommit(transrecord_t *record); //return 0 if successful
 void *transRequest(void *);    //the C routine that the thread will execute when TRANS_REQUEST begins
+void *handleLocalReq(void *);  //the C routine that the local m/c thread will execute 
 int decideResponse(thread_data_array_t *);// Coordinator decides what response to send to the participant
 char sendResponse(thread_data_array_t *, int); //Sends control message back to Participants
 void *getRemoteObj(transrecord_t *, unsigned int, unsigned int);
+int transAbortProcess(void *, unsigned int *, int, int);
+int transComProcess(trans_commit_data_t *);
 /* end transactions */
-
-void *getRemoteObj(transrecord_t *, unsigned int, unsigned int);
-
 #endif
index 522358000404a024e695393e46e2a46db6056a80..7b5bd7345982fa5f47c78ed18148dc2e751a024e 100644 (file)
@@ -161,6 +161,13 @@ void *dstmAccept(void *acceptfd)
                                return;
                        }
                        break;
+               case TRANS_PREFETCH:
+                       printf("DEBUG -> Recv TRANS_PREFETCH\n");
+                       if((val = prefetchReq((int)acceptfd)) != 0) {
+                               printf("Error in readClientReq\n");
+                               return;
+                       }
+                       break;
 
                default:
                        printf("DEBUG -> dstmAccept: Error Unknown opcode %d\n", control);
@@ -520,3 +527,8 @@ int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) {
        return 0;
 }
 
+
+int prefetchReq(int acceptfd) {
+
+       return 0;
+}
index 0d8f67af120fb445b6e3572aacfce1a0d080174f..f4a84e29c81619478e9c30dc8f03c43a7301747e 100644 (file)
@@ -7,6 +7,7 @@
 
 typedef struct plistnode {
        unsigned int mid;
+       int local;              /*Variable that keeps track if this pile is for LOCAL machine */
        unsigned int *oidmod;
        unsigned int *oidread;
        int nummod;
index 714826913f82123afd4da5d9da4f9f204d6777f2..010735901d5283ca9e8924d969ffdaf227a72e9e 100644 (file)
@@ -18,7 +18,9 @@
 #define RECEIVE_BUFFER_SIZE 2048
 
 extern int classsize[];
+objstr_t *mainobjstore;
 plistnode_t *createPiles(transrecord_t *);
+
 /* This functions inserts randowm wait delays in the order of msec */
 void randomdelay(void)
 {
@@ -51,6 +53,8 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid)
        void *buf;
                /* Search local cache */
        if((objheader =(objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
+               //LOCAL Object
+               objheader->status |= LOCAL;
                //printf("DEBUG -> transRead oid %d found local\n", oid);
                return(objheader);
        } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
@@ -60,6 +64,8 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid)
                size = sizeof(objheader_t)+classsize[tmp->type];
                objcopy = objstrAlloc(record->cache, size);
                memcpy(objcopy, (void *)tmp, size);
+               //LOCAL Object
+               ((objheader_t *) objcopy)->status |= LOCAL;
                /* Insert into cache's lookup table */
                chashInsert(record->lookupTable, objheader->oid, objcopy); 
                return(objcopy);
@@ -91,7 +97,8 @@ objheader_t *transCreateObj(transrecord_t *record, unsigned short type)
        chashInsert(record->lookupTable, tmp->oid, tmp);
        return tmp;
 }
-
+/* This function creates machine piles based on all machines involved in a
+ * transaction commit request */
 plistnode_t *createPiles(transrecord_t *record) {
        int i = 0;
        unsigned int size;/* Represents number of bins in the chash table */
@@ -118,7 +125,7 @@ plistnode_t *createPiles(transrecord_t *record) {
                               printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
                               return NULL;
                        }
-                                       
+
                        if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
                                printf("Error: No such oid %s, %d\n", __FILE__, __LINE__);
                                return NULL;
@@ -128,6 +135,10 @@ plistnode_t *createPiles(transrecord_t *record) {
                                printf("pInsert error %s, %d\n", __FILE__, __LINE__);
                                return NULL;
                        }
+                       /* Check if local */
+                       if((headeraddr->status & LOCAL) == LOCAL) {
+                               pile->local = 1; //True i.e. local
+                       }
                        curr = next;
                }
        }
@@ -136,19 +147,21 @@ plistnode_t *createPiles(transrecord_t *record) {
 }
 /* This function initiates the transaction commit process
  * Spawns threads for each of the new connections with Participants 
- * by creating new piles,
+ * and creates new piles by calling the createPiles(),
  * Fills the piles with necesaary information and 
  * Sends a transrequest() to each pile*/
 int transCommit(transrecord_t *record) {       
        unsigned int tot_bytes_mod, *listmid;
        plistnode_t *pile;
-       int i, rc;
-       int pilecount = 0, offset, numthreads = 0, trecvcount = 0, tmachcount = 0;
+       int i, rc, val;
+       int pilecount = 0, offset, threadnum = 0, trecvcount = 0, tmachcount = 0;
        char buffer[RECEIVE_BUFFER_SIZE],control;
        char transid[TID_LEN];
        trans_req_data_t *tosend;
+       trans_commit_data_t transinfo;
        static int newtid = 0;
        char treplyctrl = 0, treplyretry = 0; /* keeps track of the common response that needs to be sent */
+       char localstat = 0;
 
        /* Look through all the objects in the transaction record and make piles 
         * for each machine involved in the transaction*/
@@ -177,6 +190,12 @@ int transCommit(transrecord_t *record) {
        
        thread_data_array_t *thread_data_array;
        thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount);
+       local_thread_data_array_t *ltdata;
+       if((ltdata = calloc(1, sizeof(local_thread_data_array_t))) == NULL) {
+               printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+               return 1;
+       }
+
        thread_response_t rcvd_control_msg[pilecount];  /* Shared thread array that keeps track of responses of participants */
 
        /* Initialize and set thread detach attribute */
@@ -198,30 +217,48 @@ int transCommit(transrecord_t *record) {
                sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid);
                tosend->f.mcount = pilecount;
                tosend->f.numread = pile->numread;
+               printf("DEBUG-> pile numread = %d\n", pile->numread);
                tosend->f.nummod = pile->nummod;
                tosend->f.sum_bytes = pile->sum_bytes;
                tosend->listmid = listmid;
                tosend->objread = pile->objread;
                tosend->oidmod = pile->oidmod;
-               thread_data_array[numthreads].thread_id = numthreads;
-               thread_data_array[numthreads].mid = pile->mid;
-               thread_data_array[numthreads].pilecount = pilecount;
-               thread_data_array[numthreads].buffer = tosend;
-               thread_data_array[numthreads].recvmsg = rcvd_control_msg;
-               thread_data_array[numthreads].threshold = &tcond;
-               thread_data_array[numthreads].lock = &tlock;
-               thread_data_array[numthreads].count = &trecvcount;
-               thread_data_array[numthreads].replyctrl = &treplyctrl;
-               thread_data_array[numthreads].replyretry = &treplyretry;
-               thread_data_array[numthreads].rec = record;
-
-               rc = pthread_create(&thread[numthreads], NULL, transRequest, (void *) &thread_data_array[numthreads]);  
-               if (rc) {
-                       perror("Error in pthread create");
-                       return 1;
-               }               
-               numthreads++;           
-               //TODO frees 
+               thread_data_array[threadnum].thread_id = threadnum;
+               thread_data_array[threadnum].mid = pile->mid;
+               thread_data_array[threadnum].pilecount = pilecount;
+               thread_data_array[threadnum].buffer = tosend;
+               thread_data_array[threadnum].recvmsg = rcvd_control_msg;
+               thread_data_array[threadnum].threshold = &tcond;
+               thread_data_array[threadnum].lock = &tlock;
+               thread_data_array[threadnum].count = &trecvcount;
+               //thread_data_array[threadnum].localstatus = &localstat;
+               thread_data_array[threadnum].replyctrl = &treplyctrl;
+               thread_data_array[threadnum].replyretry = &treplyretry;
+               thread_data_array[threadnum].rec = record;
+               /* If local do not create any extra connection */
+               if(pile->local != 1) {
+                       rc = pthread_create(&thread[threadnum], NULL, transRequest, (void *) &thread_data_array[threadnum]);  
+                       if (rc) {
+                               perror("Error in pthread create\n");
+                               return 1;
+                       }
+               } else {
+                       /*Unset the pile->local flag*/
+                       pile->local = 0;
+                       //header->status &= ~(LOCK);
+                       /*Handle request of local pile */
+                       /*Set flag to identify that Local machine is involved*/
+                       ltdata->tdata = &thread_data_array[threadnum];
+                       printf("DEBUG->Address of ltdata sent = %x\n", &ltdata);
+                       ltdata->transinfo = &transinfo;
+                       printf("DEBUG-> Machine Pile numread = %d\n", ltdata->tdata->buffer->f.numread);
+                       val = pthread_create(&thread[threadnum], NULL, handleLocalReq, (void *) &ltdata);
+                       if (val) {
+                               perror("Error in pthread create\n");
+                               return 1;
+                       }
+               }
+               threadnum++;            
                pile = pile->next;
        }
 
@@ -242,6 +279,8 @@ int transCommit(transrecord_t *record) {
        free(tosend);
        free(listmid);
        pDelete(pile);
+       free(thread_data_array);
+       free(ltdata);
 
        /* Retry trans commit procedure if not sucessful in the first try */
        if(treplyretry == 1) {
@@ -337,6 +376,34 @@ void *transRequest(void *threadarg) {
        (*(tdata->count))++; /* keeps track of no of messages received by the coordinator */
 
        /* Wake up the threads and invoke decideResponse (once) */
+/*
+       if((*(tdata->localstatus) & LM_EXISTS) == LM_EXISTS) { //If there is a local machine involved in the transaction
+               if(*(tdata->count) == tdata->pilecount - 1) {
+                       while(*(tdata->localstatus) & LM_UPDATED != LM_UPDATED) {
+                               ;//Do nothing and wait until Local machine thread updates the common data structure
+                       }
+                       if(decideResponse(tdata) != 0) {
+                               printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
+                               pthread_mutex_unlock(tdata->lock);
+                               return NULL;
+                       }
+                       pthread_cond_broadcast(tdata->threshold);
+               }
+       } else if ((*(tdata->localstatus) & LM_EXISTS) == 0) { //No local m/c involved in transaction
+               if(*(tdata->count) == tdata->pilecount) {
+                       if (decideResponse(tdata) != 0) { 
+                               printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
+                               pthread_mutex_unlock(tdata->lock);
+                               close(sd);
+                               return NULL;
+                       }
+                       pthread_cond_broadcast(tdata->threshold);
+               } else {
+                       pthread_cond_wait(tdata->threshold, tdata->lock);
+               }       
+       }
+*/
+
        if(*(tdata->count) == tdata->pilecount) {
                if (decideResponse(tdata) != 0) { 
                        printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
@@ -347,8 +414,7 @@ void *transRequest(void *threadarg) {
                pthread_cond_broadcast(tdata->threshold);
        } else {
                pthread_cond_wait(tdata->threshold, tdata->lock);
-       }       
-
+       }
        pthread_mutex_unlock(tdata->lock);
 
        /* Send the final response such as TRANS_COMMIT or TRANS_ABORT t
@@ -450,9 +516,7 @@ char sendResponse(thread_data_array_t *tdata, int sd) {
        /* If the decided response is TRANS_ABORT */
        if(*(tdata->replyctrl) == TRANS_ABORT) {
                retval = TRANS_ABORT;
-       }
-       /* If the decided response is TRANS_COMMIT */
-       if(*(tdata->replyctrl) == TRANS_COMMIT) {
+       } else if(*(tdata->replyctrl) == TRANS_COMMIT) { /* If the decided response is TRANS_COMMIT */
                retval = TRANS_COMMIT;
        }
        /* Send response to the Participant */
@@ -535,3 +599,258 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
        close(sd);
        return objcopy;
 }
+
+/*This function handles the local trans requests involved in a transaction commiting process
+ * makes a decision if the local machine sends AGREE or DISAGREE or SOFT_ABORT
+ * Activates the other nonlocal threads that are waiting for the decision and the
+ * based on common decision by all groups involved in the transaction it 
+ * either commits or aborts the transaction.
+ * It also frees the calloced memory resources
+ */
+
+//int handleLocalReq(thread_data_array_t *tdata, trans_commit_data_t *transinfo) {
+void *handleLocalReq(void *threadarg) {
+       int val, i = 0;
+       short version;
+       char control = 0, *ptr;
+       unsigned int oid;
+       unsigned int *oidnotfound = NULL, *oidlocked = NULL, *oidmod = NULL;
+       void *mobj, *modptr;
+       objheader_t *headptr;
+       local_thread_data_array_t *localtdata;
+
+       localtdata = (local_thread_data_array_t *) threadarg;
+       printf("DEBUG->Address of localtdata = %x\n", localtdata);
+
+       printf("DEBUG-> Machine Pile numread recv = %d\n", localtdata->tdata->buffer->f.numread);
+       /* Counters and arrays to formulate decision on control message to be sent */
+       printf("DEBUG -> %d %d\n",localtdata->tdata->buffer->f.numread, localtdata->tdata->buffer->f.nummod);
+       oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
+       oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
+       oidmod = (unsigned int *) calloc(localtdata->tdata->buffer->f.nummod, sizeof(unsigned int));
+       int objnotfound = 0, objlocked = 0, objmod =0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
+       int objmodnotfound = 0, nummodfound = 0;
+
+       /* modptr points to the beginning of the object store 
+        * created at the Pariticipant */ 
+       if ((modptr = objstrAlloc(mainobjstore, localtdata->tdata->buffer->f.sum_bytes)) == NULL) {
+               printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__);
+               return NULL;
+       }
+
+       ptr = modptr;
+
+       /* Process each oid in the machine pile/ group per thread */
+       for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) {
+               if (i < localtdata->tdata->buffer->f.numread) {//Objs only read and not modified
+                       int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array
+                       incr *= i;
+                       oid = *((unsigned int *)(localtdata->tdata->buffer->objread + incr));
+                       incr += sizeof(unsigned int);
+                       version = *((short *)(localtdata->tdata->buffer->objread + incr));
+               } else {//Objs modified
+                       headptr = (objheader_t *) ptr;
+                       oid = headptr->oid;
+                       oidmod[objmod] = oid;//Array containing modified oids
+                       objmod++;
+                       version = headptr->version;
+                       ptr += sizeof(objheader_t) + classsize[headptr->type];
+               }
+
+               /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
+
+               /* Save the oids not found and number of oids not found for later use */
+               if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
+                       /* Save the oids not found and number of oids not found for later use */
+
+                       oidnotfound[objnotfound] = ((objheader_t *)mobj)->oid;
+                       objnotfound++;
+               } else { /* If Obj found in machine (i.e. has not moved) */
+                       /* Check if Obj is locked by any previous transaction */
+                       if ((((objheader_t *)mobj)->status & LOCK) == LOCK) {
+                               if (version == ((objheader_t *)mobj)->version) {      /* If not locked then match versions */ 
+                                       v_matchlock++;
+                               } else {/* If versions don't match ...HARD ABORT */
+                                       v_nomatch++;
+                                       /* Send TRANS_DISAGREE to Coordinator */
+                                       localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+                                       printf("DEBUG -> Sending TRANS_DISAGREE\n");
+                                       //return tdata->recvmsg[tdata->thread_id].rcv_status;  
+                               }
+                       } else {/* If Obj is not locked then lock object */
+                               ((objheader_t *)mobj)->status |= LOCK;
+                               //TODO Remove this for Testing
+                               randomdelay();
+
+                               /* Save all object oids that are locked on this machine during this transaction request call */
+                               oidlocked[objlocked] = ((objheader_t *)mobj)->oid;
+                               objlocked++;
+                               if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
+                                       v_matchnolock++;
+                               } else { /* If versions don't match ...HARD ABORT */
+                                       v_nomatch++;
+                                       /* Send TRANS_DISAGREE to Coordinator */
+                                       localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+                                       printf("DEBUG -> Sending TRANS_DISAGREE\n");
+                               //      return tdata->recvmsg[tdata->thread_id].rcv_status;  
+                               }
+                       }
+               }
+       }
+
+       /*Decide the response to be sent to the Coordinator( the local machine in this case)*/
+
+       /* Condition to send TRANS_AGREE */
+       if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) {
+               localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
+               printf("DEBUG -> Sending TRANS_AGREE\n");
+       }
+       /* Condition to send TRANS_SOFT_ABORT */
+       if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
+               localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT;
+               printf("DEBUG -> Sending TRANS_SOFT_ABORT\n");
+               /* Send number of oids not found and the missing oids if objects are missing in the machine */
+               /* TODO Remember to store the oidnotfound for later use
+               if(objnotfound != 0) {
+                       int size = sizeof(unsigned int)* objnotfound;
+               }
+               */
+       }
+
+       /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
+        * if Participant receives a TRANS_COMMIT */
+       localtdata->transinfo->objmod = oidmod;
+       localtdata->transinfo->objlocked = oidlocked;
+       localtdata->transinfo->objnotfound = oidnotfound;
+       localtdata->transinfo->modptr = modptr;
+       localtdata->transinfo->nummod = localtdata->tdata->buffer->f.nummod;
+       localtdata->transinfo->numlocked = objlocked;
+       localtdata->transinfo->numnotfound = objnotfound;
+
+       /*Set flag to show that common data structure for this individual thread has been written to */
+       //*(tdata->localstatus) |= LM_UPDATED;
+       
+       /* Lock and update count */
+       //Thread sleeps until all messages from pariticipants are received by coordinator
+       pthread_mutex_lock(localtdata->tdata->lock);
+       (*(localtdata->tdata->count))++; /* keeps track of no of messages received by the coordinator */
+
+       /* Wake up the threads and invoke decideResponse (once) */
+       if(*(localtdata->tdata->count) == localtdata->tdata->pilecount) {
+               if (decideResponse(localtdata->tdata) != 0) { 
+                       printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
+                       pthread_mutex_unlock(localtdata->tdata->lock);
+                       return NULL;
+               }
+               pthread_cond_broadcast(localtdata->tdata->threshold);
+       } else {
+               pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock);
+       }
+       pthread_mutex_unlock(localtdata->tdata->lock);
+
+       /*Based on DecideResponse(), Either COMMIT or ABORT the operation*/
+       if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
+               if(transAbortProcess(modptr,oidlocked, localtdata->transinfo->numlocked, localtdata->transinfo->nummod) != 0) {
+                       printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
+                       return NULL;
+               }
+       }else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT){
+               if(transComProcess(localtdata->transinfo) != 0) {
+                       printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
+                       return NULL;
+               }
+       }
+
+       /* Free memory */
+       printf("DEBUG -> Freeing...\n");
+       fflush(stdout);
+       if (localtdata->transinfo->objmod != NULL) {
+               free(localtdata->transinfo->objmod);
+               localtdata->transinfo->objmod = NULL;
+       }
+       if (localtdata->transinfo->objlocked != NULL) {
+               free(localtdata->transinfo->objlocked);
+               localtdata->transinfo->objlocked = NULL;
+       }
+       if (localtdata->transinfo->objnotfound != NULL) {
+               free(localtdata->transinfo->objnotfound);
+               localtdata->transinfo->objnotfound = NULL;
+       }
+       
+       pthread_exit(NULL);
+}
+/* This function completes the ABORT process if the transaction is aborting 
+ */
+int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int nummod) {
+       char *ptr;
+       int i;
+       objheader_t *tmp_header;
+       void *header;
+
+       printf("DEBUG -> Recv TRANS_ABORT\n");
+       /* Set all ref counts as 1 and do garbage collection */
+       ptr = modptr;
+       for(i = 0; i< nummod; i++) {
+               tmp_header = (objheader_t *)ptr;
+               tmp_header->rcount = 1;
+               ptr += sizeof(objheader_t) + classsize[tmp_header->type];
+       }
+       /* Unlock objects that was locked due to this transaction */
+       for(i = 0; i< numlocked; i++) {
+               header = mhashSearch(objlocked[i]);// find the header address
+               ((objheader_t *)header)->status &= ~(LOCK);
+       }
+
+       /* Send ack to Coordinator */
+       printf("DEBUG-> TRANS_SUCCESSFUL\n");
+
+       /*Free the pointer */
+       ptr = NULL;
+       return 0;
+}
+
+/*This function completes the COMMIT process is the transaction is commiting
+ */
+ int transComProcess(trans_commit_data_t *transinfo) {
+        objheader_t *header;
+        int i = 0, offset = 0;
+        char control;
+        
+        printf("DEBUG -> Recv TRANS_COMMIT\n");
+        /* Process each modified object saved in the mainobject store */
+        for(i=0; i<transinfo->nummod; i++) {
+                if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) {
+                        printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+                }
+                /* Change reference count of older address and free space in objstr ?? */
+                header->rcount = 1; //TODO Not sure what would be the val
+
+                /* Change ptr address in mhash table */
+                printf("DEBUG -> removing object oid = %d\n", transinfo->objmod[i]);
+                mhashRemove(transinfo->objmod[i]);
+                mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset));
+                offset += sizeof(objheader_t) + classsize[header->type];
+
+                /* Update object version number */
+                header = (objheader_t *) mhashSearch(transinfo->objmod[i]);
+                header->version += 1;
+        }
+
+        /* Unlock locked objects */
+        for(i=0; i<transinfo->numlocked; i++) {
+                header = (objheader_t *) mhashSearch(transinfo->objlocked[i]);
+                header->status &= ~(LOCK);
+        }
+
+        //TODO Update location lookup table
+
+        /* Send ack to Coordinator */
+        printf("DEBUG-> TRANS_SUCESSFUL\n");
+        return 0;
+ }
+
+/*This function makes piles to prefetch records and prefetches the oids from remote machines */
+int transPrefetch(transrecord_t *record, trans_prefetchtuple_t *prefetchtuple){
+       /* Create Pile*/
+       /* For each Pile in the machine send TRANS_PREFETCH */
+}