From: adash Date: Fri, 7 Sep 2007 18:21:30 +0000 (+0000) Subject: Comments added and several minor changes to get rid of extra variables X-Git-Tag: preEdgeChange~464 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=aebe4a0ab803b8a5726219d0d43a4cc0248b1bdc;p=IRC.git Comments added and several minor changes to get rid of extra variables --- diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index 6e9a481e..cf855326 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -108,85 +108,71 @@ typedef struct transrecord { struct ___Object___ * revertlist; #endif } transrecord_t; -// Structure that keeps track of responses from the participants +// Structure is a shared structure that keeps track of responses from the participants typedef struct thread_response { char rcv_status; } thread_response_t; -// Structure that holds fixed data sizes to be sent along with TRANS_REQUEST +// Structure that holds fixed data to be sent along with TRANS_REQUEST typedef struct fixed_data { - char control; - char trans_id[TID_LEN]; - int mcount; // Machine count - short numread; // Number of objects read - short nummod; // Number of objects modified - int sum_bytes; // Total bytes modified + char control; /* control message */ + char trans_id[TID_LEN]; /* transaction id */ + int mcount; /* participant count */ + short numread; /* no of objects read */ + short nummod; /* no of objects modified */ + int sum_bytes; /* total bytes of modified objects in a transaction */ } fixed_data_t; -// Structure that holds variable data sizes per machine participant +/* Structure that holds trans request information for each participant */ typedef struct trans_req_data { - fixed_data_t f; - unsigned int *listmid; - char *objread; - unsigned int *oidmod; -} trans_req_data_t; - -// Structure passed to dstmAcceptinfo() on server side to complete TRANS_COMMIT process - + fixed_data_t f; /* Holds first few fixed bytes of data sent during TRANS_REQUEST protcol*/ + unsigned int *listmid; /* Pointer to array holding list of participants */ + char *objread; /* Pointer to array holding oid and version number of objects that are only read */ + unsigned int *oidmod; /* Pointer to array holding oids of objects that are modified */ +} trans_req_data_t; + +/* Structure that holds information of objects that are not found in the participant + * and objs locked within a transaction during commit process */ typedef struct trans_commit_data{ - unsigned int *objmod; - unsigned int *objlocked; - unsigned int *objnotfound; - void *modptr; - int nummod; - int numlocked; - int numnotfound; + unsigned int *objlocked; /* Pointer to array holding oids of objects locked inside a transaction */ + unsigned int *objnotfound; /* Pointer to array holding oids of objects not found on the participant machine */ + void *modptr; /* Pointer to the address in the mainobject store of the participant that holds all modified objects */ + int numlocked; /* no of objects locked */ + int numnotfound; /* no of objects not found */ } trans_commit_data_t; #define PRINT_TID(PTR) printf("DEBUG -> %x %d\n", PTR->mid, PTR->thread_id); -//structure for passing multiple arguments to thread +/* Structure for passing multiple arguments to a thread + * spawned to process each transaction on a machine */ typedef struct thread_data_array { - int thread_id; + int thread_id; int mid; - int pilecount; - trans_req_data_t *buffer; - thread_response_t *recvmsg;//shared datastructure to keep track of the control message receiv - pthread_cond_t *threshold; //threshhold for waking up a thread - pthread_mutex_t *lock; //lock the count variable - int *count; //variable to count responses of TRANS_REQUEST protocol from all participants - char *replyctrl; //shared ctrl message that stores the reply to be sent, filled by decideResp - char *replyretry; //shared variable to find out if we need retry (TRANS_COMMIT case) - transrecord_t *rec; // To send modified objects + int pilecount; /* No of remote machines involved */ + trans_req_data_t *buffer; /* Holds trans request information sent to participants */ + thread_response_t *recvmsg; /* Shared datastructure to keep track of the participants response to a trans request */ + pthread_cond_t *threshold; /* Condition var to waking up a thread */ + pthread_mutex_t *lock; /* Lock for counting participants response */ + int *count; /* Variable to count responses from all participants to the TRANS_REQUEST protocol */ + char *replyctrl; /* Shared ctrl message that stores the reply to be sent to participants, filled by decideResponse() */ + char *replyretry; /* Shared variable that keep track if coordinator needs retry */ + transrecord_t *rec; /* To send modified objects */ } thread_data_array_t; //Structure for passing arguments to the local m/c thread typedef struct local_thread_data_array { - thread_data_array_t *tdata; - trans_commit_data_t *transinfo; //Required for trans commit process + thread_data_array_t *tdata; /* Holds all the arguments send to a thread that is spawned when transaction commits */ + trans_commit_data_t *transinfo; /* Holds information of objects locked and not found in the participant */ } local_thread_data_array_t; -// Structure to save information about an oid necesaary for the decideControl() -typedef struct objinfo { - unsigned int oid; - int poss_val; //Status of object(locked but version matches, version mismatch, oid not present in machine etc) -}objinfo_t; - //Structure for members within prefetch tuples typedef struct member { - short offset; - short index; - struct member *next; + short offset; /* Holds offset of the ptr field */ + short index; /* Holds the array index value */ + struct member *next; }trans_member_t; -/* -//Structure that holds the compiler generated prefetch data -typedef struct compprefetchdata { -transrecord_t *record; -} compprefetchdata_t; -*/ - /* Initialize main object store and lookup tables, start server thread. */ int dstmInit(void); @@ -205,10 +191,11 @@ void *objstrAlloc(objstr_t *store, unsigned int size); //size in bytes void *dstmListen(); void *dstmAccept(void *); int readClientReq(trans_commit_data_t *, int); -int processClientReq(fixed_data_t *, trans_commit_data_t *,unsigned int *, char *, void *, int); +int processClientReq(fixed_data_t *, trans_commit_data_t *,unsigned int *, char *, void *, unsigned int *, int); char handleTransReq(fixed_data_t *, trans_commit_data_t *, unsigned int *, char *, void *, int); -int decideCtrlMessage(fixed_data_t *, trans_commit_data_t *, int *, int *, int *, int *, int *, void *, unsigned int *, unsigned int *, unsigned int *, int); -int transCommitProcess(trans_commit_data_t *, int); +int decideCtrlMessage(fixed_data_t *, trans_commit_data_t *, int *, int *, int *, int *, int *, void *, unsigned int *, unsigned int *, int); +//int transCommitProcess(trans_commit_data_t *, int); +int transCommitProcess(void *, unsigned int *, unsigned int *, int, int, int); /* end server portion */ /* Prototypes for transactions */ @@ -229,8 +216,9 @@ void *handleLocalReq(void *); //the C routine that the local m/c thread will exe int decideResponse(thread_data_array_t *);// Coordinator decides what response to send to the participant char sendResponse(thread_data_array_t *, int); //Sends control message back to Participants void *getRemoteObj(transrecord_t *, unsigned int, unsigned int); -int transAbortProcess(void *, unsigned int *, int, int, int); -int transComProcess(trans_commit_data_t *); +int transAbortProcess(void *, unsigned int *, int, int); +//int transComProcess(trans_commit_data_t *); +int transComProcess(void*, unsigned int *, unsigned int *, int, int); void prefetch(int, unsigned int *, short *, short*); void *transPrefetch(void *); void *mcqProcess(void *); diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index a28e906f..65460bd9 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -20,11 +20,12 @@ extern int classsize[]; objstr_t *mainobjstore; +/* This function initializes the main objects store and creates the + * global machine and location lookup table */ + int dstmInit(void) { - /* Initialize main object store */ mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE); - /* Create machine lookup table and location lookup table */ if (mhashCreate(HASH_SIZE, LOADFACTOR)) return 1; //failure @@ -34,6 +35,8 @@ int dstmInit(void) return 0; } +/* This function starts the thread to listen on a socket + * for tranaction calls */ void *dstmListen() { int listenfd, acceptfd; @@ -203,7 +206,9 @@ void *dstmAccept(void *acceptfd) int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { char *ptr; void *modptr; + unsigned int *oidmod, oid; fixed_data_t fixed; + objheader_t *headaddr; int sum = 0, i, N, n, val; /* Read fixed_data_t data structure */ @@ -249,17 +254,31 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { } sum = 0; do { // Recv the objs that are modified by the Coordinator - n = recv((int)acceptfd, modptr+sum, fixed.sum_bytes-sum, 0); + n = recv((int)acceptfd, (char *) modptr+sum, fixed.sum_bytes-sum, 0); sum += n; } while (sum < fixed.sum_bytes && n != 0); } + /* Create an array of oids for modified objects */ + oidmod = (unsigned int *) calloc(fixed.nummod, sizeof(unsigned int)); + ptr = (char *) modptr; + for(i = 0 ; i < fixed.nummod; i++) { + headaddr = (objheader_t *) ptr; + oid = OID(headaddr); + oidmod[i] = oid; + ptr += sizeof(objheader_t) + classsize[TYPE(headaddr)]; + } + /*Process the information read */ - if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, acceptfd)) != 0) { + if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, oidmod, acceptfd)) != 0) { printf("Error in processClientReq %s, %d\n", __FILE__, __LINE__); return 1; } + + /* Free resources */ + free(oidmod); + return 0; } @@ -267,7 +286,7 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { * function and sends a reply to the co-ordinator. * Following this it also receives a new control message from the co-ordinator and processes this message*/ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, - unsigned int *listmid, char *objread, void *modptr, int acceptfd) { + unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd) { char *ptr, control, sendctrl; objheader_t *tmp_header; void *header; @@ -314,7 +333,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, case TRANS_COMMIT: /* Invoke the transCommit process() */ printf("DEBUG -> Recv TRANS_COMMIT \n"); - if((val = transCommitProcess(transinfo, (int)acceptfd)) != 0) { + if((val = transCommitProcess(modptr, oidmod, transinfo->objlocked, fixed->nummod, transinfo->numlocked, (int)acceptfd)) != 0) { printf("Error in transCommitProcess %s, %d\n", __FILE__, __LINE__); return 1; } @@ -332,10 +351,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, /* Free memory */ printf("DEBUG -> Freeing...\n"); fflush(stdout); - if (transinfo->objmod != NULL) { - free(transinfo->objmod); - transinfo->objmod = NULL; - } + if (transinfo->objlocked != NULL) { free(transinfo->objlocked); transinfo->objlocked = NULL; @@ -361,14 +377,13 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne /* Counters and arrays to formulate decision on control message to be sent */ oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); - oidmod = (unsigned int *) calloc(fixed->nummod, sizeof(unsigned int)); - int objnotfound = 0, objlocked = 0, objmod =0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0; - int objmodnotfound = 0, nummodfound = 0; + int objnotfound = 0, objlocked = 0; + int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0; /* modptr points to the beginning of the object store * created at the Pariticipant. * Object store holds the modified objects involved in the transaction request */ - ptr = modptr; + ptr = (char *) modptr; /* Process each oid in the machine pile/ group per thread */ for (i = 0; i < fixed->numread + fixed->nummod; i++) { @@ -381,8 +396,6 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne } else {//Objs modified headptr = (objheader_t *) ptr; oid = OID(headptr); - oidmod[objmod] = oid;//Array containing modified oids - objmod++; version = headptr->version; ptr += sizeof(objheader_t) + classsize[TYPE(headptr)]; } @@ -440,7 +453,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne /* Decide what control message to send to Coordinator */ if ((val = decideCtrlMessage(fixed, transinfo, &v_matchnolock, &v_matchlock, &v_nomatch, &objnotfound, &objlocked, - modptr, oidnotfound, oidlocked, oidmod, acceptfd)) == 0) { + modptr, oidnotfound, oidlocked, acceptfd)) == 0) { printf("Error in decideCtrlMessage %s, %d\n", __FILE__, __LINE__); return 0; } @@ -452,8 +465,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne * to send to Coordinator based on the votes of oids involved in the transaction */ int decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock, int *v_nomatch, int *objnotfound, int *objlocked, void *modptr, - unsigned int *oidnotfound, unsigned int *oidlocked, unsigned int *oidmod, - int acceptfd) { + unsigned int *oidnotfound, unsigned int *oidlocked, int acceptfd) { int val; char control = 0; /* Condition to send TRANS_AGREE */ @@ -490,11 +502,9 @@ int decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int * /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process * if Participant receives a TRANS_COMMIT */ - transinfo->objmod = oidmod; transinfo->objlocked = oidlocked; transinfo->objnotfound = oidnotfound; transinfo->modptr = modptr; - transinfo->nummod = fixed->nummod; transinfo->numlocked = *(objlocked); transinfo->numnotfound = *(objnotfound); @@ -504,31 +514,35 @@ int decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int * /* This function processes all modified objects involved in a TRANS_COMMIT and updates pointer * addresses in lookup table and also changes version number * Sends an ACK back to Coordinator */ -int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) { +int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlocked, int nummod, int numlocked, int acceptfd) { objheader_t *header; int i = 0, offset = 0; char control; + /* Process each modified object saved in the mainobject store */ - for(i=0; inummod; i++) { - if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) { + for(i = 0; i < nummod; i++) { + if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) { printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); + return 1; } /* Change reference count of older address and free space in objstr ?? */ header->rcount = 1; //Not sure what would be the val /* Change ptr address in mhash table */ - printf("DEBUG -> removing object oid = %d\n", transinfo->objmod[i]); - mhashRemove(transinfo->objmod[i]); - mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset)); + mhashRemove(oidmod[i]); + mhashInsert(oidmod[i], (((char *)modptr) + offset)); offset += sizeof(objheader_t) + classsize[TYPE(header)]; /* Update object version number */ - header = (objheader_t *) mhashSearch(transinfo->objmod[i]); + header = (objheader_t *) mhashSearch(oidmod[i]); header->version += 1; } /* Unlock locked objects */ - for(i=0; inumlocked; i++) { - header = (objheader_t *) mhashSearch(transinfo->objlocked[i]); + for(i = 0; i < numlocked; i++) { + if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) { + printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); + return 1; + } STATUS(header) &= ~(LOCK); } @@ -544,6 +558,11 @@ int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) { return 0; } +/* This function recevies the oid and offset tuples from the Coordinator's prefetch call. + * Looks for the objects to be prefetched in the main object store. + * If objects are not found then record those and if objects are found + * then use offset values to prefetch references to other objects */ + int prefetchReq(int acceptfd) { int i, length, sum, n, numbytes, numoffset, N, objnotfound = 0, size, count = 0; unsigned int oid, index = 0; @@ -574,7 +593,6 @@ int prefetchReq(int acceptfd) { } while(sum < N && n != 0); /* Process each oid */ - /* Check if object is still present in the machine since the beginning of TRANS_PREFETCH */ if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */ /* Save the oids not found in buffer for later use */ *(buffer + index) = OBJECT_NOT_FOUND; diff --git a/Robust/src/Runtime/DSTM/interface/plookup.c b/Robust/src/Runtime/DSTM/interface/plookup.c index 07a19fd2..7bf79ba6 100644 --- a/Robust/src/Runtime/DSTM/interface/plookup.c +++ b/Robust/src/Runtime/DSTM/interface/plookup.c @@ -1,6 +1,13 @@ #include "plookup.h" extern int classsize[]; +//NOTE: "pile" ptr points to the head of the linked list of the machine pile data structures + +/* This function creates a new pile data structure to hold + * obj ids of objects modified or read inside a transaction, + * no of objects read and no of objects modified + * that belong to a single machine */ + plistnode_t *pCreate(int objects) { plistnode_t *pile; @@ -9,32 +16,37 @@ plistnode_t *pCreate(int objects) { printf("Calloc error %s %d\n", __FILE__, __LINE__); return NULL; } - pile->next = NULL; if ((pile->oidmod = calloc(objects, sizeof(unsigned int))) == NULL) { printf("Calloc error %s %d\n", __FILE__, __LINE__); + free(pile); return NULL; } + /* if ((pile->oidread = calloc(objects, sizeof(unsigned int))) == NULL) { printf("Calloc error %s %d\n", __FILE__, __LINE__); return NULL; } - pile->nummod = pile->numread = pile->sum_bytes = 0; - if ((pile->objread = calloc(objects, sizeof(int) + sizeof(short))) == NULL) { + */ + if ((pile->objread = calloc(objects, sizeof(unsigned int) + sizeof(short))) == NULL) { printf("Calloc error %s %d\n", __FILE__, __LINE__); + free(pile); + free(pile->oidmod); return NULL; } - pile->objmodified = NULL; - pile->nummod = pile->numread = pile->sum_bytes = 0; + pile->nummod = pile->numread = pile->sum_bytes = 0; + pile->next = NULL; return pile; } +/* This function inserts necessary information into + * a machine pile data structure */ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) { plistnode_t *ptr, *tmp; int found = 0, offset; tmp = pile; - //Add oid into a machine that is a part of the pile linked list structure + //Add oid into a machine that is already present in the pile linked list structure while(tmp != NULL) { if (tmp->mid == mid) { if (STATUS(headeraddr) & DIRTY) { @@ -42,13 +54,12 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi tmp->nummod = tmp->nummod + 1; tmp->sum_bytes += sizeof(objheader_t) + classsize[TYPE(headeraddr)]; } else { - tmp->oidread[tmp->numread] = OID(headeraddr); + // tmp->oidread[tmp->numread] = OID(headeraddr); offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread; *((unsigned int *)(tmp->objread + offset))=OID(headeraddr); offset += sizeof(unsigned int); memcpy(tmp->objread + offset, &headeraddr->version, sizeof(short)); tmp->numread = tmp->numread + 1; - // printf("DEBUG->pInsert() No of obj read = %d\n", tmp->numread); } found = 1; break; @@ -66,7 +77,7 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi ptr->nummod = ptr->nummod + 1; ptr->sum_bytes += sizeof(objheader_t) + classsize[TYPE(headeraddr)]; } else { - ptr->oidread[ptr->numread] = OID(headeraddr); + // ptr->oidread[ptr->numread] = OID(headeraddr); *((unsigned int *)ptr->objread)=OID(headeraddr); memcpy(ptr->objread + sizeof(unsigned int), &headeraddr->version, sizeof(short)); ptr->numread = ptr->numread + 1; @@ -78,7 +89,7 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi return pile; } -//Count the number of machine groups +//Count the number of machine piles int pCount(plistnode_t *pile) { plistnode_t *tmp; int pcount = 0; @@ -110,7 +121,7 @@ void pDelete(plistnode_t *pile) { while(tmp != NULL) { next = tmp->next; free(tmp->oidmod); - free(tmp->oidread); + //free(tmp->oidread); free(tmp->objread); free(tmp); tmp = next; diff --git a/Robust/src/Runtime/DSTM/interface/plookup.h b/Robust/src/Runtime/DSTM/interface/plookup.h index f4a84e29..e35c451b 100644 --- a/Robust/src/Runtime/DSTM/interface/plookup.h +++ b/Robust/src/Runtime/DSTM/interface/plookup.h @@ -5,17 +5,18 @@ #include #include "dstm.h" +/* This structure is created using a transaction record. + * It is filled out with pile information necessary for + * participants involved in a transaction. */ typedef struct plistnode { unsigned int mid; - int local; /*Variable that keeps track if this pile is for LOCAL machine */ - unsigned int *oidmod; - unsigned int *oidread; - int nummod; - int numread; - int sum_bytes; - char *objread; - char *objmodified; - int vote; + int local; /* Variable that keeps track if this pile is for LOCAL machine */ + unsigned int *oidmod; /* Pointer to array containing oids of modified objects */ + unsigned int *oidread; /* TODO: REMOVE THIS Pointer to array of objects read */ + int nummod; /* no of objects read */ + int numread; /* no of objects modified */ + int sum_bytes; /* total bytes of objects modified */ + char *objread; /* Pointer to array containing oids of objects read and their version numbers*/ struct plistnode *next; } plistnode_t; diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 9053965f..8e86b346 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -218,6 +218,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { chashInsert(record->lookupTable, OID(tmp), objcopy); return(objcopy); } else { /* If not found anywhere, then block until object appears in prefetch cache */ +#if 0 printf("Inside remote machine\n"); pthread_mutex_lock(&pflookup.lock); while(!found) { @@ -240,16 +241,15 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { pthread_mutex_unlock(&pflookup.lock); } } +#endif /* Get the object from the remote location */ machinenumber = lhashSearch(oid); objcopy = getRemoteObj(record, machinenumber, oid); if(objcopy == NULL) { //If object is not found in Remote location - //printf("Object oid = %d not found in Machine %d\n", oid, machinenumber); return NULL; } else { - //printf("Object oid = %d found in Machine %d\n", oid, machinenumber); return(objcopy); } } @@ -292,6 +292,7 @@ plistnode_t *createPiles(transrecord_t *record) { } next = curr->next; //Get machine location for object id + //TODO Check is the object is newly created ...if not then lookup the location table if ((machinenum = lhashSearch(curr->key)) == 0) { printf("Error: No such machine %s, %d\n", __FILE__, __LINE__); @@ -322,9 +323,9 @@ plistnode_t *createPiles(transrecord_t *record) { /* This function initiates the transaction commit process * Spawns threads for each of the new connections with Participants - * and creates new piles by calling the createPiles(), - * Fills the piles with necesaary information and - * Sends a transrequest() to each pile*/ + * and creates new piles by calling the createPiles(), + * Sends a transrequest() to each remote machines for objects found remotely + * and calls handleLocalReq() to process objects found locally */ int transCommit(transrecord_t *record) { unsigned int tot_bytes_mod, *listmid; plistnode_t *pile, *pile_ptr; @@ -462,10 +463,10 @@ int transCommit(transrecord_t *record) { return 0; } -/* This function sends information involved in the transaction request and - * accepts a response from particpants. +/* This function sends information involved in the transaction request + * to participants and accepts a response from particpants. * It calls decideresponse() to decide on what control message - * to send next and sends the message using sendResponse()*/ + * to send next to participants and sends the message using sendResponse()*/ void *transRequest(void *threadarg) { int sd, i, n; struct sockaddr_in serv_addr; @@ -538,7 +539,7 @@ void *transRequest(void *threadarg) { tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol; /* Lock and update count */ - //Thread sleeps until all messages from pariticipants are received by coordinator + /* Thread sleeps until all messages from pariticipants are received by coordinator */ pthread_mutex_lock(tdata->lock); (*(tdata->count))++; /* keeps track of no of messages received by the coordinator */ @@ -572,15 +573,13 @@ void *transRequest(void *threadarg) { } /* This function decides the reponse that needs to be sent to - * all Participant machines involved in the transaction commit */ + * all Participant machines after the TRANS_REQUEST protocol */ int decideResponse(thread_data_array_t *tdata) { char control; int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what message to send */ - //Check common data structure for (i = 0 ; i < tdata->pilecount ; i++) { - /*Switch on response from Participant */ control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses written onto the shared array */ switch(control) { @@ -604,9 +603,8 @@ int decideResponse(thread_data_array_t *tdata) { } } - /* Decide what control message to send to Participant */ + /* Send Abort */ if(transdisagree > 0) { - /* Send Abort */ *(tdata->replyctrl) = TRANS_ABORT; printf("DEBUG-> trans.c Sending TRANS_ABORT\n"); /* Free resources */ @@ -646,6 +644,7 @@ char sendResponse(thread_data_array_t *tdata, int sd) { N = oidcount * sizeof(unsigned int); if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) { printf("Calloc error %s, %d\n", __FILE__, __LINE__); + return 0; } ptr = (char *) oidnotfound; do { @@ -661,7 +660,7 @@ char sendResponse(thread_data_array_t *tdata, int sd) { } else if(*(tdata->replyctrl) == TRANS_COMMIT) { /* If the decided response is TRANS_COMMIT */ retval = TRANS_COMMIT; } - /* Send response to the Participant */ + if (send(sd, tdata->replyctrl, sizeof(char),MSG_NOSIGNAL) < sizeof(char)) { perror("Error sending ctrl message for participant\n"); } @@ -742,22 +741,20 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { return objcopy; } -/*This function handles the local trans requests involved in a transaction commiting process - * makes a decision if the local machine sends AGREE or DISAGREE or SOFT_ABORT - * Activates the other nonlocal threads that are waiting for the decision and the - * based on common decision by all groups involved in the transaction it - * either commits or aborts the transaction. - * It also frees the calloced memory resources - */ - +/* This function handles the local objects involved in a transaction commiting process. + * It also makes a decision if this local machine sends AGREE or DISAGREE or SOFT_ABORT to coordinator. + * Note Coordinator = local machine + * It wakes up the other threads from remote participants that are waiting for the coordinator's decision and + * based on common agreement it either commits or aborts the transaction. + * It also frees the memory resources */ void *handleLocalReq(void *threadarg) { - int val, i = 0; + int val, i = 0, size, offset = 0; short version; char control = 0, *ptr; unsigned int oid; unsigned int *oidnotfound = NULL, *oidlocked = NULL, *oidmod = NULL; void *mobj, *modptr; - objheader_t *headptr; + objheader_t *headptr, *headeraddr; local_thread_data_array_t *localtdata; localtdata = (local_thread_data_array_t *) threadarg; @@ -765,9 +762,8 @@ void *handleLocalReq(void *threadarg) { /* Counters and arrays to formulate decision on control message to be sent */ oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int)); oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int)); - oidmod = (unsigned int *) calloc(localtdata->tdata->buffer->f.nummod, sizeof(unsigned int)); - int objnotfound = 0, objlocked = 0, objmod =0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0; - int objmodnotfound = 0, nummodfound = 0; + int objnotfound = 0, objlocked = 0; + int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0; /* modptr points to the beginning of the object store * created at the Pariticipant */ @@ -775,8 +771,16 @@ void *handleLocalReq(void *threadarg) { printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__); return NULL; } + /* Write modified objects into the mainobject store */ + for(i = 0; i< localtdata->tdata->buffer->f.nummod; i++) { + headeraddr = chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i]); + size = sizeof(objheader_t) + classsize[TYPE(headeraddr)]; + memcpy(modptr+offset, headeraddr, size); + offset += size; + } ptr = modptr; + offset = 0; //Reset /* Process each oid in the machine pile/ group per thread */ for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) { @@ -787,10 +791,8 @@ void *handleLocalReq(void *threadarg) { incr += sizeof(unsigned int); version = *((short *)(localtdata->tdata->buffer->objread + incr)); } else {//Objs modified - headptr = (objheader_t *) ptr; + headptr = (objheader_t *)ptr; oid = OID(headptr); - oidmod[objmod] = oid;//Array containing modified oids - objmod++; version = headptr->version; ptr += sizeof(objheader_t) + classsize[TYPE(headptr)]; } @@ -812,7 +814,6 @@ void *handleLocalReq(void *threadarg) { /* Send TRANS_DISAGREE to Coordinator */ localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE; printf("DEBUG -> Sending TRANS_DISAGREE\n"); - //return tdata->recvmsg[tdata->thread_id].rcv_status; } } else {/* If Obj is not locked then lock object */ STATUS(((objheader_t *)mobj)) |= LOCK; @@ -829,14 +830,11 @@ void *handleLocalReq(void *threadarg) { /* Send TRANS_DISAGREE to Coordinator */ localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE; printf("DEBUG -> Sending TRANS_DISAGREE\n"); - // return tdata->recvmsg[tdata->thread_id].rcv_status; } } } } - /*Decide the response to be sent to the Coordinator( the local machine in this case)*/ - /* Condition to send TRANS_AGREE */ if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) { localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE; @@ -859,17 +857,12 @@ void *handleLocalReq(void *threadarg) { /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process * if Participant receives a TRANS_COMMIT */ - localtdata->transinfo->objmod = oidmod; localtdata->transinfo->objlocked = oidlocked; localtdata->transinfo->objnotfound = oidnotfound; localtdata->transinfo->modptr = modptr; - localtdata->transinfo->nummod = localtdata->tdata->buffer->f.nummod; localtdata->transinfo->numlocked = objlocked; localtdata->transinfo->numnotfound = objnotfound; - /*Set flag to show that common data structure for this individual thread has been written to */ - //*(tdata->localstatus) |= LM_UPDATED; - /* Lock and update count */ //Thread sleeps until all messages from pariticipants are received by coordinator pthread_mutex_lock(localtdata->tdata->lock); @@ -890,12 +883,12 @@ void *handleLocalReq(void *threadarg) { /*Based on DecideResponse(), Either COMMIT or ABORT the operation*/ if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){ - if(transAbortProcess(modptr,oidlocked, localtdata->transinfo->numlocked, localtdata->transinfo->nummod, localtdata->tdata->buffer->f.numread) != 0) { + if(transAbortProcess(modptr,oidlocked, localtdata->transinfo->numlocked, localtdata->tdata->buffer->f.nummod) != 0) { printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__); return NULL; } }else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT){ - if(transComProcess(localtdata->transinfo) != 0) { + if(transComProcess(modptr, localtdata->tdata->buffer->oidmod, oidlocked, localtdata->tdata->buffer->f.nummod, localtdata->transinfo->numlocked) != 0) { printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__); return NULL; } @@ -904,10 +897,7 @@ void *handleLocalReq(void *threadarg) { /* Free memory */ printf("DEBUG -> Freeing...\n"); fflush(stdout); - if (localtdata->transinfo->objmod != NULL) { - free(localtdata->transinfo->objmod); - localtdata->transinfo->objmod = NULL; - } + if (localtdata->transinfo->objlocked != NULL) { free(localtdata->transinfo->objlocked); localtdata->transinfo->objlocked = NULL; @@ -921,7 +911,7 @@ void *handleLocalReq(void *threadarg) { } /* This function completes the ABORT process if the transaction is aborting */ -int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int nummod, int numread) { +int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int nummod) { char *ptr; int i; objheader_t *tmp_header; @@ -929,7 +919,7 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int printf("DEBUG -> Recv TRANS_ABORT\n"); /* Set all ref counts as 1 and do garbage collection */ - ptr = modptr; + ptr = (char *)modptr; for(i = 0; i< nummod; i++) { tmp_header = (objheader_t *)ptr; tmp_header->rcount = 1; @@ -937,7 +927,10 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int } /* Unlock objects that was locked due to this transaction */ for(i = 0; i< numlocked; i++) { - header = mhashSearch(objlocked[i]);// find the header address + if((header = mhashSearch(objlocked[i])) == NULL) { + printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); + return 1; + } STATUS(((objheader_t *)header)) &= ~(LOCK); } @@ -951,33 +944,36 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int /*This function completes the COMMIT process is the transaction is commiting */ -int transComProcess(trans_commit_data_t *transinfo) { +int transComProcess(void *modptr, unsigned int *oidmod, unsigned int *objlocked, int nummod, int numlocked) { objheader_t *header; int i = 0, offset = 0; char control; - printf("DEBUG -> Recv TRANS_COMMIT\n"); /* Process each modified object saved in the mainobject store */ - for(i=0; inummod; i++) { - if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) { + for(i = 0; i < nummod; i++) { + if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) { printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); + return 1; } /* Change reference count of older address and free space in objstr ?? */ header->rcount = 1; //TODO Not sure what would be the val /* Change ptr address in mhash table */ - mhashRemove(transinfo->objmod[i]); - mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset)); + mhashRemove(oidmod[i]); + mhashInsert(oidmod[i], (((char *)modptr) + offset)); offset += sizeof(objheader_t) + classsize[TYPE(header)]; /* Update object version number */ - header = (objheader_t *) mhashSearch(transinfo->objmod[i]); + header = (objheader_t *) mhashSearch(oidmod[i]); header->version += 1; } /* Unlock locked objects */ - for(i=0; inumlocked; i++) { - header = (objheader_t *) mhashSearch(transinfo->objlocked[i]); + for(i = 0; i < numlocked; i++) { + if((header = (objheader_t *) mhashSearch(objlocked[i])) == NULL) { + printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); + return 1; + } STATUS(header) &= ~(LOCK); } @@ -1232,7 +1228,6 @@ prefetchpile_t *foundLocal(prefetchqelem_t *node) { /* This function is called by the thread calling transPrefetch */ void *transPrefetch(void *t) { - //int *offstarray = NULL; prefetchqelem_t *qnode; prefetchpile_t *pilehead = NULL; @@ -1342,7 +1337,7 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) { tmp = mcpilenode->objpiles; while(tmp != NULL) { off = offset = 0; - count++; // Keeps track of the number of oid and offset tuples sent per remote machine + count++; /* Keeps track of the number of oid and offset tuples sent per remote machine */ len = sizeof(int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short)); char oidnoffset[len]; memcpy(oidnoffset, &len, sizeof(int));