From f7c4e59095efda12887fd8c5135c72ad79b7a687 Mon Sep 17 00:00:00 2001 From: adash Date: Thu, 28 Jun 2007 21:59:16 +0000 Subject: [PATCH] Support for local m/c trans request ( no need to set up new connection if local) --- Robust/src/Runtime/DSTM/interface/dstm.h | 53 ++- .../src/Runtime/DSTM/interface/dstmserver.c | 12 + Robust/src/Runtime/DSTM/interface/plookup.h | 1 + Robust/src/Runtime/DSTM/interface/trans.c | 377 ++++++++++++++++-- 4 files changed, 392 insertions(+), 51 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index f13a28a4..a5d41cc9 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -45,6 +45,7 @@ #define DIRTY 0x01 #define NEW 0x02 #define LOCK 0x04 +#define LOCAL 0x08 typedef struct objheader { unsigned int oid; @@ -87,6 +88,18 @@ typedef struct trans_req_data { unsigned int *oidmod; }trans_req_data_t; +// Structure passed to dstmAcceptinfo() on server side to complete TRANS_COMMIT process +typedef struct trans_commit_data{ + unsigned int *objmod; + unsigned int *objlocked; + unsigned int *objnotfound; + void *modptr; + int nummod; + int numlocked; + int numnotfound; +}trans_commit_data_t; + + #define PRINT_TID(PTR) printf("DEBUG -> %x %d\n", PTR->mid, PTR->thread_id); //structure for passing multiple arguments to thread typedef struct thread_data_array { @@ -100,11 +113,16 @@ typedef struct thread_data_array { int *count; //variable to count responses of TRANS_REQUEST protocol from all participants char *replyctrl; //shared ctrl message that stores the reply to be sent, filled by decideResp char *replyretry; //shared variable to find out if we need retry (TRANS_COMMIT case) +// char *localstatus; //shared variable to identify local requests transrecord_t *rec; // To send modified objects -}thread_data_array_t; - +} thread_data_array_t; +//Structure for passing arguments to the local m/c thread +typedef struct local_thread_data_array { + thread_data_array_t *tdata; + trans_commit_data_t *transinfo; //Required for trans commit process +} local_thread_data_array_t; // Structure to save information about an oid necesaary for the decideControl() typedef struct objinfo { @@ -112,16 +130,13 @@ typedef struct objinfo { int poss_val; //Status of object(locked but version matches, version mismatch, oid not present in machine etc) }objinfo_t; -// Structure passed to dstmAcceptinfo() on server side to complete TRANS_COMMIT process -typedef struct trans_commit_data{ - unsigned int *objmod; - unsigned int *objlocked; - unsigned int *objnotfound; - void *modptr; - int nummod; - int numlocked; - int numnotfound; -}trans_commit_data_t; +//Structure for members within prefetch tuples +typedef struct member { + short offset; + short index; + struct member *next; + }trans_member_t; + //Structure for prefetching tuples generated by teh compiler typedef struct trans_prefetchtuple{ @@ -131,13 +146,6 @@ typedef struct trans_commit_data{ struct trans_prefetchtuple *next; }trans_prefetchtuple_t; -//Structure for members within prefetch tuples -typedef struct member { - short offset; - short index; - struct member *next; - }trans_member_t; - /* Initialize main object store and lookup tables, start server thread. */ int dstmInit(void); @@ -163,16 +171,17 @@ int transCommitProcess(trans_commit_data_t *, int); /* end server portion */ /* Prototypes for transactions */ +void randomdelay(void); transrecord_t *transStart(); objheader_t *transRead(transrecord_t *, unsigned int); objheader_t *transCreateObj(transrecord_t *, unsigned short); //returns oid int transCommit(transrecord_t *record); //return 0 if successful void *transRequest(void *); //the C routine that the thread will execute when TRANS_REQUEST begins +void *handleLocalReq(void *); //the C routine that the local m/c thread will execute int decideResponse(thread_data_array_t *);// Coordinator decides what response to send to the participant char sendResponse(thread_data_array_t *, int); //Sends control message back to Participants void *getRemoteObj(transrecord_t *, unsigned int, unsigned int); +int transAbortProcess(void *, unsigned int *, int, int); +int transComProcess(trans_commit_data_t *); /* end transactions */ - -void *getRemoteObj(transrecord_t *, unsigned int, unsigned int); - #endif diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 52235800..7b5bd734 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -161,6 +161,13 @@ void *dstmAccept(void *acceptfd) return; } break; + case TRANS_PREFETCH: + printf("DEBUG -> Recv TRANS_PREFETCH\n"); + if((val = prefetchReq((int)acceptfd)) != 0) { + printf("Error in readClientReq\n"); + return; + } + break; default: printf("DEBUG -> dstmAccept: Error Unknown opcode %d\n", control); @@ -520,3 +527,8 @@ int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) { return 0; } + +int prefetchReq(int acceptfd) { + + return 0; +} diff --git a/Robust/src/Runtime/DSTM/interface/plookup.h b/Robust/src/Runtime/DSTM/interface/plookup.h index 0d8f67af..f4a84e29 100644 --- a/Robust/src/Runtime/DSTM/interface/plookup.h +++ b/Robust/src/Runtime/DSTM/interface/plookup.h @@ -7,6 +7,7 @@ typedef struct plistnode { unsigned int mid; + int local; /*Variable that keeps track if this pile is for LOCAL machine */ unsigned int *oidmod; unsigned int *oidread; int nummod; diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 71482691..01073590 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -18,7 +18,9 @@ #define RECEIVE_BUFFER_SIZE 2048 extern int classsize[]; +objstr_t *mainobjstore; plistnode_t *createPiles(transrecord_t *); + /* This functions inserts randowm wait delays in the order of msec */ void randomdelay(void) { @@ -51,6 +53,8 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) void *buf; /* Search local cache */ if((objheader =(objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){ + //LOCAL Object + objheader->status |= LOCAL; //printf("DEBUG -> transRead oid %d found local\n", oid); return(objheader); } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) { @@ -60,6 +64,8 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) size = sizeof(objheader_t)+classsize[tmp->type]; objcopy = objstrAlloc(record->cache, size); memcpy(objcopy, (void *)tmp, size); + //LOCAL Object + ((objheader_t *) objcopy)->status |= LOCAL; /* Insert into cache's lookup table */ chashInsert(record->lookupTable, objheader->oid, objcopy); return(objcopy); @@ -91,7 +97,8 @@ objheader_t *transCreateObj(transrecord_t *record, unsigned short type) chashInsert(record->lookupTable, tmp->oid, tmp); return tmp; } - +/* This function creates machine piles based on all machines involved in a + * transaction commit request */ plistnode_t *createPiles(transrecord_t *record) { int i = 0; unsigned int size;/* Represents number of bins in the chash table */ @@ -118,7 +125,7 @@ plistnode_t *createPiles(transrecord_t *record) { printf("Error: No such machine %s, %d\n", __FILE__, __LINE__); return NULL; } - + if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) { printf("Error: No such oid %s, %d\n", __FILE__, __LINE__); return NULL; @@ -128,6 +135,10 @@ plistnode_t *createPiles(transrecord_t *record) { printf("pInsert error %s, %d\n", __FILE__, __LINE__); return NULL; } + /* Check if local */ + if((headeraddr->status & LOCAL) == LOCAL) { + pile->local = 1; //True i.e. local + } curr = next; } } @@ -136,19 +147,21 @@ plistnode_t *createPiles(transrecord_t *record) { } /* This function initiates the transaction commit process * Spawns threads for each of the new connections with Participants - * by creating new piles, + * and creates new piles by calling the createPiles(), * Fills the piles with necesaary information and * Sends a transrequest() to each pile*/ int transCommit(transrecord_t *record) { unsigned int tot_bytes_mod, *listmid; plistnode_t *pile; - int i, rc; - int pilecount = 0, offset, numthreads = 0, trecvcount = 0, tmachcount = 0; + int i, rc, val; + int pilecount = 0, offset, threadnum = 0, trecvcount = 0, tmachcount = 0; char buffer[RECEIVE_BUFFER_SIZE],control; char transid[TID_LEN]; trans_req_data_t *tosend; + trans_commit_data_t transinfo; static int newtid = 0; char treplyctrl = 0, treplyretry = 0; /* keeps track of the common response that needs to be sent */ + char localstat = 0; /* Look through all the objects in the transaction record and make piles * for each machine involved in the transaction*/ @@ -177,6 +190,12 @@ int transCommit(transrecord_t *record) { thread_data_array_t *thread_data_array; thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount); + local_thread_data_array_t *ltdata; + if((ltdata = calloc(1, sizeof(local_thread_data_array_t))) == NULL) { + printf("Calloc error %s, %d\n", __FILE__, __LINE__); + return 1; + } + thread_response_t rcvd_control_msg[pilecount]; /* Shared thread array that keeps track of responses of participants */ /* Initialize and set thread detach attribute */ @@ -198,30 +217,48 @@ int transCommit(transrecord_t *record) { sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid); tosend->f.mcount = pilecount; tosend->f.numread = pile->numread; + printf("DEBUG-> pile numread = %d\n", pile->numread); tosend->f.nummod = pile->nummod; tosend->f.sum_bytes = pile->sum_bytes; tosend->listmid = listmid; tosend->objread = pile->objread; tosend->oidmod = pile->oidmod; - thread_data_array[numthreads].thread_id = numthreads; - thread_data_array[numthreads].mid = pile->mid; - thread_data_array[numthreads].pilecount = pilecount; - thread_data_array[numthreads].buffer = tosend; - thread_data_array[numthreads].recvmsg = rcvd_control_msg; - thread_data_array[numthreads].threshold = &tcond; - thread_data_array[numthreads].lock = &tlock; - thread_data_array[numthreads].count = &trecvcount; - thread_data_array[numthreads].replyctrl = &treplyctrl; - thread_data_array[numthreads].replyretry = &treplyretry; - thread_data_array[numthreads].rec = record; - - rc = pthread_create(&thread[numthreads], NULL, transRequest, (void *) &thread_data_array[numthreads]); - if (rc) { - perror("Error in pthread create"); - return 1; - } - numthreads++; - //TODO frees + thread_data_array[threadnum].thread_id = threadnum; + thread_data_array[threadnum].mid = pile->mid; + thread_data_array[threadnum].pilecount = pilecount; + thread_data_array[threadnum].buffer = tosend; + thread_data_array[threadnum].recvmsg = rcvd_control_msg; + thread_data_array[threadnum].threshold = &tcond; + thread_data_array[threadnum].lock = &tlock; + thread_data_array[threadnum].count = &trecvcount; + //thread_data_array[threadnum].localstatus = &localstat; + thread_data_array[threadnum].replyctrl = &treplyctrl; + thread_data_array[threadnum].replyretry = &treplyretry; + thread_data_array[threadnum].rec = record; + /* If local do not create any extra connection */ + if(pile->local != 1) { + rc = pthread_create(&thread[threadnum], NULL, transRequest, (void *) &thread_data_array[threadnum]); + if (rc) { + perror("Error in pthread create\n"); + return 1; + } + } else { + /*Unset the pile->local flag*/ + pile->local = 0; + //header->status &= ~(LOCK); + /*Handle request of local pile */ + /*Set flag to identify that Local machine is involved*/ + ltdata->tdata = &thread_data_array[threadnum]; + printf("DEBUG->Address of ltdata sent = %x\n", <data); + ltdata->transinfo = &transinfo; + printf("DEBUG-> Machine Pile numread = %d\n", ltdata->tdata->buffer->f.numread); + val = pthread_create(&thread[threadnum], NULL, handleLocalReq, (void *) <data); + if (val) { + perror("Error in pthread create\n"); + return 1; + } + } + threadnum++; pile = pile->next; } @@ -242,6 +279,8 @@ int transCommit(transrecord_t *record) { free(tosend); free(listmid); pDelete(pile); + free(thread_data_array); + free(ltdata); /* Retry trans commit procedure if not sucessful in the first try */ if(treplyretry == 1) { @@ -337,6 +376,34 @@ void *transRequest(void *threadarg) { (*(tdata->count))++; /* keeps track of no of messages received by the coordinator */ /* Wake up the threads and invoke decideResponse (once) */ +/* + if((*(tdata->localstatus) & LM_EXISTS) == LM_EXISTS) { //If there is a local machine involved in the transaction + if(*(tdata->count) == tdata->pilecount - 1) { + while(*(tdata->localstatus) & LM_UPDATED != LM_UPDATED) { + ;//Do nothing and wait until Local machine thread updates the common data structure + } + if(decideResponse(tdata) != 0) { + printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__); + pthread_mutex_unlock(tdata->lock); + return NULL; + } + pthread_cond_broadcast(tdata->threshold); + } + } else if ((*(tdata->localstatus) & LM_EXISTS) == 0) { //No local m/c involved in transaction + if(*(tdata->count) == tdata->pilecount) { + if (decideResponse(tdata) != 0) { + printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__); + pthread_mutex_unlock(tdata->lock); + close(sd); + return NULL; + } + pthread_cond_broadcast(tdata->threshold); + } else { + pthread_cond_wait(tdata->threshold, tdata->lock); + } + } +*/ + if(*(tdata->count) == tdata->pilecount) { if (decideResponse(tdata) != 0) { printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__); @@ -347,8 +414,7 @@ void *transRequest(void *threadarg) { pthread_cond_broadcast(tdata->threshold); } else { pthread_cond_wait(tdata->threshold, tdata->lock); - } - + } pthread_mutex_unlock(tdata->lock); /* Send the final response such as TRANS_COMMIT or TRANS_ABORT t @@ -450,9 +516,7 @@ char sendResponse(thread_data_array_t *tdata, int sd) { /* If the decided response is TRANS_ABORT */ if(*(tdata->replyctrl) == TRANS_ABORT) { retval = TRANS_ABORT; - } - /* If the decided response is TRANS_COMMIT */ - if(*(tdata->replyctrl) == TRANS_COMMIT) { + } else if(*(tdata->replyctrl) == TRANS_COMMIT) { /* If the decided response is TRANS_COMMIT */ retval = TRANS_COMMIT; } /* Send response to the Participant */ @@ -535,3 +599,258 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { close(sd); return objcopy; } + +/*This function handles the local trans requests involved in a transaction commiting process + * makes a decision if the local machine sends AGREE or DISAGREE or SOFT_ABORT + * Activates the other nonlocal threads that are waiting for the decision and the + * based on common decision by all groups involved in the transaction it + * either commits or aborts the transaction. + * It also frees the calloced memory resources + */ + +//int handleLocalReq(thread_data_array_t *tdata, trans_commit_data_t *transinfo) { +void *handleLocalReq(void *threadarg) { + int val, i = 0; + short version; + char control = 0, *ptr; + unsigned int oid; + unsigned int *oidnotfound = NULL, *oidlocked = NULL, *oidmod = NULL; + void *mobj, *modptr; + objheader_t *headptr; + local_thread_data_array_t *localtdata; + + localtdata = (local_thread_data_array_t *) threadarg; + printf("DEBUG->Address of localtdata = %x\n", localtdata); + + printf("DEBUG-> Machine Pile numread recv = %d\n", localtdata->tdata->buffer->f.numread); + /* Counters and arrays to formulate decision on control message to be sent */ + printf("DEBUG -> %d %d\n",localtdata->tdata->buffer->f.numread, localtdata->tdata->buffer->f.nummod); + oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int)); + oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int)); + oidmod = (unsigned int *) calloc(localtdata->tdata->buffer->f.nummod, sizeof(unsigned int)); + int objnotfound = 0, objlocked = 0, objmod =0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0; + int objmodnotfound = 0, nummodfound = 0; + + /* modptr points to the beginning of the object store + * created at the Pariticipant */ + if ((modptr = objstrAlloc(mainobjstore, localtdata->tdata->buffer->f.sum_bytes)) == NULL) { + printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__); + return NULL; + } + + ptr = modptr; + + /* Process each oid in the machine pile/ group per thread */ + for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) { + if (i < localtdata->tdata->buffer->f.numread) {//Objs only read and not modified + int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array + incr *= i; + oid = *((unsigned int *)(localtdata->tdata->buffer->objread + incr)); + incr += sizeof(unsigned int); + version = *((short *)(localtdata->tdata->buffer->objread + incr)); + } else {//Objs modified + headptr = (objheader_t *) ptr; + oid = headptr->oid; + oidmod[objmod] = oid;//Array containing modified oids + objmod++; + version = headptr->version; + ptr += sizeof(objheader_t) + classsize[headptr->type]; + } + + /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */ + + /* Save the oids not found and number of oids not found for later use */ + if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */ + /* Save the oids not found and number of oids not found for later use */ + + oidnotfound[objnotfound] = ((objheader_t *)mobj)->oid; + objnotfound++; + } else { /* If Obj found in machine (i.e. has not moved) */ + /* Check if Obj is locked by any previous transaction */ + if ((((objheader_t *)mobj)->status & LOCK) == LOCK) { + if (version == ((objheader_t *)mobj)->version) { /* If not locked then match versions */ + v_matchlock++; + } else {/* If versions don't match ...HARD ABORT */ + v_nomatch++; + /* Send TRANS_DISAGREE to Coordinator */ + localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE; + printf("DEBUG -> Sending TRANS_DISAGREE\n"); + //return tdata->recvmsg[tdata->thread_id].rcv_status; + } + } else {/* If Obj is not locked then lock object */ + ((objheader_t *)mobj)->status |= LOCK; + //TODO Remove this for Testing + randomdelay(); + + /* Save all object oids that are locked on this machine during this transaction request call */ + oidlocked[objlocked] = ((objheader_t *)mobj)->oid; + objlocked++; + if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */ + v_matchnolock++; + } else { /* If versions don't match ...HARD ABORT */ + v_nomatch++; + /* Send TRANS_DISAGREE to Coordinator */ + localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE; + printf("DEBUG -> Sending TRANS_DISAGREE\n"); + // return tdata->recvmsg[tdata->thread_id].rcv_status; + } + } + } + } + + /*Decide the response to be sent to the Coordinator( the local machine in this case)*/ + + /* Condition to send TRANS_AGREE */ + if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) { + localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE; + printf("DEBUG -> Sending TRANS_AGREE\n"); + } + /* Condition to send TRANS_SOFT_ABORT */ + if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) { + localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT; + printf("DEBUG -> Sending TRANS_SOFT_ABORT\n"); + /* Send number of oids not found and the missing oids if objects are missing in the machine */ + /* TODO Remember to store the oidnotfound for later use + if(objnotfound != 0) { + int size = sizeof(unsigned int)* objnotfound; + } + */ + } + + /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process + * if Participant receives a TRANS_COMMIT */ + localtdata->transinfo->objmod = oidmod; + localtdata->transinfo->objlocked = oidlocked; + localtdata->transinfo->objnotfound = oidnotfound; + localtdata->transinfo->modptr = modptr; + localtdata->transinfo->nummod = localtdata->tdata->buffer->f.nummod; + localtdata->transinfo->numlocked = objlocked; + localtdata->transinfo->numnotfound = objnotfound; + + /*Set flag to show that common data structure for this individual thread has been written to */ + //*(tdata->localstatus) |= LM_UPDATED; + + /* Lock and update count */ + //Thread sleeps until all messages from pariticipants are received by coordinator + pthread_mutex_lock(localtdata->tdata->lock); + (*(localtdata->tdata->count))++; /* keeps track of no of messages received by the coordinator */ + + /* Wake up the threads and invoke decideResponse (once) */ + if(*(localtdata->tdata->count) == localtdata->tdata->pilecount) { + if (decideResponse(localtdata->tdata) != 0) { + printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__); + pthread_mutex_unlock(localtdata->tdata->lock); + return NULL; + } + pthread_cond_broadcast(localtdata->tdata->threshold); + } else { + pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock); + } + pthread_mutex_unlock(localtdata->tdata->lock); + + /*Based on DecideResponse(), Either COMMIT or ABORT the operation*/ + if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){ + if(transAbortProcess(modptr,oidlocked, localtdata->transinfo->numlocked, localtdata->transinfo->nummod) != 0) { + printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__); + return NULL; + } + }else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT){ + if(transComProcess(localtdata->transinfo) != 0) { + printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__); + return NULL; + } + } + + /* Free memory */ + printf("DEBUG -> Freeing...\n"); + fflush(stdout); + if (localtdata->transinfo->objmod != NULL) { + free(localtdata->transinfo->objmod); + localtdata->transinfo->objmod = NULL; + } + if (localtdata->transinfo->objlocked != NULL) { + free(localtdata->transinfo->objlocked); + localtdata->transinfo->objlocked = NULL; + } + if (localtdata->transinfo->objnotfound != NULL) { + free(localtdata->transinfo->objnotfound); + localtdata->transinfo->objnotfound = NULL; + } + + pthread_exit(NULL); +} +/* This function completes the ABORT process if the transaction is aborting + */ +int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int nummod) { + char *ptr; + int i; + objheader_t *tmp_header; + void *header; + + printf("DEBUG -> Recv TRANS_ABORT\n"); + /* Set all ref counts as 1 and do garbage collection */ + ptr = modptr; + for(i = 0; i< nummod; i++) { + tmp_header = (objheader_t *)ptr; + tmp_header->rcount = 1; + ptr += sizeof(objheader_t) + classsize[tmp_header->type]; + } + /* Unlock objects that was locked due to this transaction */ + for(i = 0; i< numlocked; i++) { + header = mhashSearch(objlocked[i]);// find the header address + ((objheader_t *)header)->status &= ~(LOCK); + } + + /* Send ack to Coordinator */ + printf("DEBUG-> TRANS_SUCCESSFUL\n"); + + /*Free the pointer */ + ptr = NULL; + return 0; +} + +/*This function completes the COMMIT process is the transaction is commiting + */ + int transComProcess(trans_commit_data_t *transinfo) { + objheader_t *header; + int i = 0, offset = 0; + char control; + + printf("DEBUG -> Recv TRANS_COMMIT\n"); + /* Process each modified object saved in the mainobject store */ + for(i=0; inummod; i++) { + if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) { + printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); + } + /* Change reference count of older address and free space in objstr ?? */ + header->rcount = 1; //TODO Not sure what would be the val + + /* Change ptr address in mhash table */ + printf("DEBUG -> removing object oid = %d\n", transinfo->objmod[i]); + mhashRemove(transinfo->objmod[i]); + mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset)); + offset += sizeof(objheader_t) + classsize[header->type]; + + /* Update object version number */ + header = (objheader_t *) mhashSearch(transinfo->objmod[i]); + header->version += 1; + } + + /* Unlock locked objects */ + for(i=0; inumlocked; i++) { + header = (objheader_t *) mhashSearch(transinfo->objlocked[i]); + header->status &= ~(LOCK); + } + + //TODO Update location lookup table + + /* Send ack to Coordinator */ + printf("DEBUG-> TRANS_SUCESSFUL\n"); + return 0; + } + +/*This function makes piles to prefetch records and prefetches the oids from remote machines */ +int transPrefetch(transrecord_t *record, trans_prefetchtuple_t *prefetchtuple){ + /* Create Pile*/ + /* For each Pile in the machine send TRANS_PREFETCH */ +} -- 2.34.1