From a857c6567945a0de5161fb11a54f1db516fd5148 Mon Sep 17 00:00:00 2001 From: adash Date: Wed, 12 Nov 2008 03:59:45 +0000 Subject: [PATCH] Changes to increase garbage collector heap Runtime changes for a single threaded transaction commit --- .../DSTM/interface/addPrefetchEnhance.c | 32 +- .../DSTM/interface/addPrefetchEnhance.h | 6 +- .../Runtime/DSTM/interface/addUdpEnhance.c | 22 +- .../Runtime/DSTM/interface/addUdpEnhance.h | 4 +- Robust/src/Runtime/DSTM/interface/dstm.h | 41 +- .../src/Runtime/DSTM/interface/dstmserver.c | 10 +- Robust/src/Runtime/DSTM/interface/signal.c | 188 ++++- Robust/src/Runtime/DSTM/interface/trans.c | 727 ++++++++---------- Robust/src/Runtime/garbage.c | 2 +- 9 files changed, 567 insertions(+), 465 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.c b/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.c index e86b860e..d88e077d 100644 --- a/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.c +++ b/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.c @@ -59,8 +59,8 @@ void handleDynPrefetching(int numLocal, int ntuples, int siteid) { /* This function clears from prefetch cache those * entries that caused a transaction abort */ -void cleanPCache(thread_data_array_t *tdata) { - transrecord_t *rec = tdata->rec; +void cleanPCache(transrecord_t *record) { + transrecord_t *rec = record; unsigned int size = rec->lookupTable->size; chashlistnode_t *ptr = rec->lookupTable->table; int i; @@ -70,8 +70,8 @@ void cleanPCache(thread_data_array_t *tdata) { if(curr->key == 0) break; objheader_t *header1, *header2; + /* Not found in local machine's object store and found in prefetch cache */ if((header1 = mhashSearch(curr->key)) == NULL && ((header2 = prehashSearch(curr->key)) != NULL)) { - /* Not found in local machine's object store and found in prefetch cache */ /* Remove from prefetch cache */ prehashRemove(curr->key); } @@ -81,26 +81,30 @@ void cleanPCache(thread_data_array_t *tdata) { } /* This function updates the prefetch cache with - * entires from the transaction cache when a + * entries from the transaction cache when a * transaction commits * Return -1 on error else returns 0 */ -int updatePrefetchCache(thread_data_array_t* tdata) { +int updatePrefetchCache(trans_req_data_t *tdata, transrecord_t *rec) { int retval; char oidType; oidType = 'R'; - if((retval = copyToCache(tdata->buffer->f.numread, (unsigned int *)(tdata->buffer->objread), tdata, oidType)) != 0) { - printf("%s(): Error in copying objects read at %s, %d\n", __func__, __FILE__, __LINE__); - return -1; + if(tdata->f.numread > 0) { + if((retval = copyToCache(tdata->f.numread, (unsigned int *)(tdata->objread), rec, oidType)) != 0) { + printf("%s(): Error in copying objects read at %s, %d\n", __func__, __FILE__, __LINE__); + return -1; + } } - oidType = 'M'; - if((retval = copyToCache(tdata->buffer->f.nummod, tdata->buffer->oidmod, tdata, oidType)) != 0) { - printf("%s(): Error in copying objects read at %s, %d\n", __func__, __FILE__, __LINE__); - return -1; + if(tdata->f.nummod > 0) { + oidType = 'M'; + if((retval = copyToCache(tdata->f.nummod, tdata->oidmod, rec, oidType)) != 0) { + printf("%s(): Error in copying objects read at %s, %d\n", __func__, __FILE__, __LINE__); + return -1; + } } return 0; } -int copyToCache(int numoid, unsigned int *oidarray, thread_data_array_t *tdata, char oidType) { +int copyToCache(int numoid, unsigned int *oidarray, transrecord_t *rec, char oidType) { int i; for (i = 0; i < numoid; i++) { unsigned int oid; @@ -113,7 +117,7 @@ int copyToCache(int numoid, unsigned int *oidarray, thread_data_array_t *tdata, } pthread_mutex_lock(&prefetchcache_mutex); objheader_t * header; - if((header = (objheader_t *) chashSearch(tdata->rec->lookupTable, oid)) == NULL) { + if((header = (objheader_t *) chashSearch(rec->lookupTable, oid)) == NULL) { printf("%s() obj %x is no longer in transaction cache at %s , %d\n", __func__, oid,__FILE__, __LINE__); fflush(stdout); return -1; diff --git a/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.h b/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.h index 5336451f..93dd853e 100644 --- a/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.h +++ b/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.h @@ -17,8 +17,8 @@ int getRetryCount(int siteid); int getUselessCount(int siteid); char getOperationMode(int); void handleDynPrefetching(int, int, int); -void cleanPCache(thread_data_array_t *tdata); -int updatePrefetchCache(thread_data_array_t *); -int copyToCache(int, unsigned int *, thread_data_array_t *, char); +void cleanPCache(transrecord_t *); +int updatePrefetchCache(trans_req_data_t *, transrecord_t *); +int copyToCache(int, unsigned int *, transrecord_t *rec, char); #endif diff --git a/Robust/src/Runtime/DSTM/interface/addUdpEnhance.c b/Robust/src/Runtime/DSTM/interface/addUdpEnhance.c index bd080de8..8280d15f 100644 --- a/Robust/src/Runtime/DSTM/interface/addUdpEnhance.c +++ b/Robust/src/Runtime/DSTM/interface/addUdpEnhance.c @@ -110,7 +110,7 @@ void *udpListenBroadcast(void *sockfd) { /* Function that invalidate objects that * have been currently modified * returns -1 on error and 0 on success */ -int invalidateObj(thread_data_array_t *tdata) { +int invalidateObj(trans_req_data_t *tdata) { struct sockaddr_in clientaddr; int retval; @@ -119,7 +119,7 @@ int invalidateObj(thread_data_array_t *tdata) { clientaddr.sin_port = htons(UDP_PORT); clientaddr.sin_addr.s_addr = INADDR_BROADCAST; int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int); - if(tdata->buffer->f.nummod < maxObjsPerMsg) { + if(tdata->f.nummod < maxObjsPerMsg) { /* send single udp msg */ int iteration = 0; if((retval = sendUdpMsg(tdata, &clientaddr, iteration)) < 0) { @@ -128,8 +128,8 @@ int invalidateObj(thread_data_array_t *tdata) { } } else { /* Split into several udp msgs */ - int maxUdpMsg = tdata->buffer->f.nummod/maxObjsPerMsg; - if (tdata->buffer->f.nummod%maxObjsPerMsg) maxUdpMsg++; + int maxUdpMsg = tdata->f.nummod/maxObjsPerMsg; + if (tdata->f.nummod%maxObjsPerMsg) maxUdpMsg++; int i; for(i = 1; i <= maxUdpMsg; i++) { if((retval = sendUdpMsg(tdata, &clientaddr, i)) < 0) { @@ -144,7 +144,7 @@ int invalidateObj(thread_data_array_t *tdata) { /* Function sends a udp broadcast, also distinguishes * msg size to be sent based on the iteration flag * returns -1 on error and 0 on success */ -int sendUdpMsg(thread_data_array_t *tdata, struct sockaddr_in *clientaddr, int iteration) { +int sendUdpMsg(trans_req_data_t *tdata, struct sockaddr_in *clientaddr, int iteration) { char writeBuffer[MAX_SIZE]; int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int); int offset = 0; @@ -153,25 +153,25 @@ int sendUdpMsg(thread_data_array_t *tdata, struct sockaddr_in *clientaddr, int i *((unsigned int *)(writeBuffer+offset)) = myIpAddr; //mid sending invalidation offset += sizeof(unsigned int); if(iteration == 0) { // iteration flag == zero, send single udp msg - *((short *)(writeBuffer+offset)) = (short) (sizeof(unsigned int) * (tdata->buffer->f.nummod)); //sizeof msg + *((short *)(writeBuffer+offset)) = (short) (sizeof(unsigned int) * (tdata->f.nummod)); //sizeof msg offset += sizeof(short); int i; - for(i = 0; i < tdata->buffer->f.nummod; i++) { - *((unsigned int *) (writeBuffer+offset)) = tdata->buffer->oidmod[i]; //copy objects + for(i = 0; i < tdata->f.nummod; i++) { + *((unsigned int *) (writeBuffer+offset)) = tdata->oidmod[i]; //copy objects offset += sizeof(unsigned int); } } else { // iteration flag > zero, send multiple udp msg int numObj; - if((tdata->buffer->f.nummod - (iteration * maxObjsPerMsg)) > 0) + if((tdata->f.nummod - (iteration * maxObjsPerMsg)) > 0) numObj = maxObjsPerMsg; else - numObj = tdata->buffer->f.nummod - ((iteration - 1)*maxObjsPerMsg); + numObj = tdata->f.nummod - ((iteration - 1)*maxObjsPerMsg); *((short *)(writeBuffer+offset)) = (short) (sizeof(unsigned int) * numObj); offset += sizeof(short); int index = (iteration - 1) * maxObjsPerMsg; int i; for(i = 0; i < numObj; i++) { - *((unsigned int *) (writeBuffer+offset)) = tdata->buffer->oidmod[index+i]; + *((unsigned int *) (writeBuffer+offset)) = tdata->oidmod[index+i]; offset += sizeof(unsigned int); } } diff --git a/Robust/src/Runtime/DSTM/interface/addUdpEnhance.h b/Robust/src/Runtime/DSTM/interface/addUdpEnhance.h index 21a4d12a..b3964bce 100644 --- a/Robust/src/Runtime/DSTM/interface/addUdpEnhance.h +++ b/Robust/src/Runtime/DSTM/interface/addUdpEnhance.h @@ -21,7 +21,7 @@ int createUdpSocket(); int udpInit(); void *udpListenBroadcast(void *); -int invalidateObj(thread_data_array_t *); +int invalidateObj(trans_req_data_t *); int invalidateFromPrefetchCache(char *); -int sendUdpMsg(thread_data_array_t *, struct sockaddr_in *, int); +int sendUdpMsg(trans_req_data_t *, struct sockaddr_in *, int); #endif diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index 6eb801b8..2c7867ca 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -55,8 +55,10 @@ #define LISTEN_PORT 2156 #define UDP_PORT 2158 //Prefetch tuning paramters -#define RETRYINTERVAL 20 //N -#define SHUTDOWNINTERVAL 3 //M +//#define RETRYINTERVAL 20 //N (For Em3d, SOR, Moldyn benchmarks) +//#define SHUTDOWNINTERVAL 3 //M +#define RETRYINTERVAL 100 //N (For MatrixMultiply, 2DFFT benchmarks) +#define SHUTDOWNINTERVAL 1 //M #include #include @@ -91,6 +93,9 @@ /*******Global statistics *********/ extern int numprefetchsites; +double idForTimeDelay; /* TODO Remove, necessary to get time delay for starting transRequest for this id */ +int transCount; /* TODO Remove, necessary to the transaction id */ + #ifdef COMPILER #include "structdefs.h" @@ -208,9 +213,9 @@ typedef struct thread_data_array { int mid; trans_req_data_t *buffer; /* Holds trans request information sent to a participant, based on threadid */ thread_response_t *recvmsg; /* Shared datastructure to keep track of the participants response to a trans request */ - pthread_cond_t *threshold; /* Condition var to waking up a thread */ + pthread_cond_t *threshold; /* Condition var to wake up a thread */ pthread_mutex_t *lock; /* Lock for counting participants response */ - int *count; /* Variable to count responses from all participants to the TRANS_REQUEST protocol */ + int *count; /* Shared variable to count responses from all participants to the TRANS_REQUEST protocol */ char *replyctrl; /* Shared ctrl message that stores the reply to be sent to participants, filled by decideResponse() */ char *replyretry; /* Shared variable that keep track if coordinator needs retry */ transrecord_t *rec; /* Shared variable transaction record send to all thread data */ @@ -278,12 +283,12 @@ objheader_t *transRead(transrecord_t *, unsigned int); objheader_t *transCreateObj(transrecord_t *, unsigned int); //returns oid header int transCommit(transrecord_t *record); //return 0 if successful void *transRequest(void *); //the C routine that the thread will execute when TRANS_REQUEST begins -void decideResponse(thread_data_array_t *); // Coordinator decides what response to send to the participant -char sendResponse(thread_data_array_t *, int); //Sends control message back to Participants +char decideResponse(char *, char *, transrecord_t *, int); // Coordinator decides what response to send to the participant void *getRemoteObj(transrecord_t *, unsigned int, unsigned int); // returns object header from main object store after object is copied into it from remote machine -void *handleLocalReq(void *); //handles Local requests -int transComProcess(local_thread_data_array_t *); -int transAbortProcess(local_thread_data_array_t *); +void handleLocalReq(trans_req_data_t *, trans_commit_data_t *, transrecord_t *, char *); +int transComProcess(trans_req_data_t *, trans_commit_data_t *, transrecord_t *); +void doLocalProcess(char, trans_req_data_t *tdata, trans_commit_data_t *, transrecord_t *); +int transAbortProcess(trans_commit_data_t *); void transAbort(transrecord_t *trans); void sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size); void prefetch(int, int, unsigned int *, unsigned short *, short*); @@ -298,13 +303,27 @@ int getPrefetchResponse(int); unsigned short getObjType(unsigned int oid); int startRemoteThread(unsigned int oid, unsigned int mid); plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs); -void commitCountForObjRead(local_thread_data_array_t *, unsigned int *, unsigned int *, int *, int *, int *, int *, int *, unsigned int, unsigned short); -void commitCountForObjMod(local_thread_data_array_t *, unsigned int *, unsigned int *, int *, int *, int *, int *, int *, unsigned int, unsigned short); +void commitCountForObjRead(char *, unsigned int *, unsigned int *, int *, int *, int *, int *, int *, unsigned int, unsigned short); +void commitCountForObjMod(char *, unsigned int *, unsigned int *, int *, int *, int *, int *, int *, unsigned int, unsigned short); /* Sends notification request for thread join, if sucessful returns 0 else returns -1 */ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid); void threadNotify(unsigned int oid, unsigned short version, unsigned int tid); int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version); +/* Internal functions from signal.c */ +int getthreadid(); +double getMax(double *array, int size); +double getMin(double *array, int size); +double getfast(int siteid, int threadid); +double getslowest(int siteid, int threadid); +double getavg(int siteid, int threadid); +double getavgperthd(int siteid, int threadid); +double avgfast(int siteid, int threadid); +double avgslow(int siteid, int threadid); +void bubblesort(); +void swap(double *e1, double *e2); +double avgofthreads(int siteid, int threadid); + /* end transactions */ #endif diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index d65c0df5..69e10ca1 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -24,7 +24,6 @@ pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a sockPoolHashTable_t *transPResponseSocketPool; - /* This function initializes the main objects store and creates the * global machine and location lookup table */ @@ -344,6 +343,7 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { * Following this it also receives a new control message from the co-ordinator and processes this message*/ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd) { + char control, sendctrl, retval; objheader_t *tmp_header; void *header; @@ -354,6 +354,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, printf("Error: In handleTransReq() %s, %d\n", __FILE__, __LINE__); return 1; } + recv_data((int)acceptfd, &control, sizeof(char)); /* Process the new control message */ switch(control) { @@ -402,6 +403,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, //TODO Use fixed.trans_id TID since Client may have died break; } + /* Free memory */ if (transinfo->objlocked != NULL) { free(transinfo->objlocked); @@ -409,6 +411,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, if (transinfo->objnotfound != NULL) { free(transinfo->objnotfound); } + return 0; } @@ -542,6 +545,7 @@ void getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked, *numBytes += size; /* Send TRANS_DISAGREE to Coordinator */ *control = TRANS_DISAGREE; + //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj)); } //Keep track of oid locked oidlocked[*objlocked] = OID(((objheader_t *)mobj)); @@ -558,6 +562,7 @@ void getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked, size += sizeof(objheader_t); *numBytes += size; *control = TRANS_DISAGREE; + //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj)); } } } @@ -588,6 +593,7 @@ void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked *numBytes += size; /* Send TRANS_DISAGREE to Coordinator */ *control = TRANS_DISAGREE; + //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj)); } //Keep track of oid locked oidlocked[*objlocked] = OID(((objheader_t *)mobj)); @@ -604,6 +610,7 @@ void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked size += sizeof(objheader_t); *numBytes += size; *control = TRANS_DISAGREE; + //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj)); } } } @@ -647,7 +654,6 @@ char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int transinfo->modptr = modptr; transinfo->numlocked = *(objlocked); transinfo->numnotfound = *(objnotfound); - return control; } diff --git a/Robust/src/Runtime/DSTM/interface/signal.c b/Robust/src/Runtime/DSTM/interface/signal.c index 9433e4fb..6cd99c6c 100644 --- a/Robust/src/Runtime/DSTM/interface/signal.c +++ b/Robust/src/Runtime/DSTM/interface/signal.c @@ -9,7 +9,8 @@ extern int nmhashSearch; extern int nprehashSearch; extern int nRemoteSend; extern int nSoftAbort; -extern int numprefetchsites; +extern unsigned int myIpAddr; + void handle(); extern pfcstats_t *evalPrefetch; @@ -23,13 +24,6 @@ void transStatsHandler(int sig, siginfo_t* info, void *context) { printf("nprehashSearch = %d\n", nprehashSearch); printf("nRemoteReadSend = %d\n", nRemoteSend); printf("nSoftAbort = %d\n", nSoftAbort); - //TODO Remove later - /* - int i; - for(i=0; i array[i]) + min = array[i]; + } + return min; + } + + int getthreadid() { + int val; + if(((128<<24)|(195<<16)|(175<<8)|84) == myIpAddr) + val = 0; + else if(((128<<24)|(195<<16)|(175<<8)|86) == myIpAddr) + val = 1; + else if(((128<<24)|(195<<16)|(175<<8)|87) == myIpAddr) + val = 2; + else if(((128<<24)|(195<<16)|(175<<8)|88) == myIpAddr) + val = 3; + else + val = 4; + printf("threadid/mid = %d\n", val); + return val; + } + + double getfast(int siteid, int threadid) { + int i, j, k; + double fast = 0.0; + //for(i = 0; i < 2; i++) { // for 2 MC + for(i = 0; i < 5; i++) { // for 5 MC + if(i == threadid) + continue; + for(k= 0; k threadstats[j+1][siteid][k]) { + double temp; + temp = threadstats[j+1][siteid][k]; + threadstats[j+1][siteid][k] = threadstats[j][siteid][k]; + threadstats[j][siteid][k] = temp; + } + } + } //end of sorting + } // end for each transaction + } // end for each siteid + } + + double avgofthreads(int siteid, int threadid) { + double total = 0.0; + int k; + for(k = 0; k threadstats[i][siteid][k]) { + slow = threadstats[i][siteid][k]; + } + } + avgslowtime[k] = slow; + } + double total= 0.0; + for(k = 0; k threadstats[i][siteid][k]) { + slow = threadstats[i][siteid][k]; + } + } + } + return slow; + } + + double getavg(int siteid, int threadid) { + double total=0.0; + int i, j, k; + int totalcount = 0; + //for(i = 0; i < 2; i++) { //for 2 MC + for(i = 0; i < 5; i++) { //for 5 MC + if(i == threadid) + continue; + for(k= 0; kf.control = TRANS_REQUEST; - sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid); - tosend->f.mcount = pilecount; - tosend->f.numread = pile->numread; - tosend->f.nummod = pile->nummod; - tosend->f.numcreated = pile->numcreated; - tosend->f.sum_bytes = pile->sum_bytes; - tosend->listmid = listmid; - tosend->objread = pile->objread; - tosend->oidmod = pile->oidmod; - tosend->oidcreated = pile->oidcreated; - thread_data_array[threadnum].thread_id = threadnum; - thread_data_array[threadnum].mid = pile->mid; - thread_data_array[threadnum].buffer = tosend; - thread_data_array[threadnum].recvmsg = rcvd_control_msg; - thread_data_array[threadnum].threshold = &tcond; - thread_data_array[threadnum].lock = &tlock; - thread_data_array[threadnum].count = &trecvcount; - thread_data_array[threadnum].replyctrl = &treplyctrl; - thread_data_array[threadnum].replyretry = &treplyretry; - thread_data_array[threadnum].rec = record; - /* If local do not create any extra connection */ - if(pile->mid != myIpAddr) { /* Not local */ - do { - rc = pthread_create(&thread[threadnum], &attr, transRequest, (void *) &thread_data_array[threadnum]); - } while(rc!=0); - if(rc) { - perror("Error in pthread create\n"); - pthread_cond_destroy(&tcond); - pthread_mutex_destroy(&tlock); - pDelete(pile_ptr); + tosend[sockindex].f.control = TRANS_REQUEST; + tosend[sockindex].f.mcount = pilecount; + tosend[sockindex].f.numread = pile->numread; + tosend[sockindex].f.nummod = pile->nummod; + tosend[sockindex].f.numcreated = pile->numcreated; + tosend[sockindex].f.sum_bytes = pile->sum_bytes; + tosend[sockindex].listmid = listmid; + tosend[sockindex].objread = pile->objread; + tosend[sockindex].oidmod = pile->oidmod; + tosend[sockindex].oidcreated = pile->oidcreated; + int sd = 0; + if(pile->mid != myIpAddr) { + if((sd = getSock2WithLock(transRequestSockPool, pile->mid)) < 0) { + printf("transRequest(): socket create error\n"); free(listmid); - for (i = 0; i < threadnum; i++) - free(thread_data_array[i].buffer); - free(thread_data_array); - free(ltdata); + free(tosend); return 1; } - } else { /*Local*/ - ltdata->tdata = &thread_data_array[threadnum]; - ltdata->transinfo = &transinfo; - do { - val = pthread_create(&thread[threadnum], &attr, handleLocalReq, (void *) ltdata); - } while(val!=0); - if(val) { - perror("Error in pthread create\n"); - pthread_cond_destroy(&tcond); - pthread_mutex_destroy(&tlock); - pDelete(pile_ptr); + socklist[sockindex] = sd; + /* Send bytes of data with TRANS_REQUEST control message */ + send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t)); + + /* Send list of machines involved in the transaction */ + { + int size=sizeof(unsigned int)*(tosend[sockindex].f.mcount); + send_data(sd, tosend[sockindex].listmid, size); + } + + /* Send oids and version number tuples for objects that are read */ + { + int size=(sizeof(unsigned int)+sizeof(unsigned short))*(tosend[sockindex].f.numread); + send_data(sd, tosend[sockindex].objread, size); + } + + /* Send objects that are modified */ + void *modptr; + if((modptr = calloc(1, tosend[sockindex].f.sum_bytes)) == NULL) { + printf("Calloc error for modified objects %s, %d\n", __FILE__, __LINE__); free(listmid); - for (i = 0; i < threadnum; i++) - free(thread_data_array[i].buffer); - free(thread_data_array); - free(ltdata); + free(tosend); return 1; } + int offset = 0; + int i; + for(i = 0; i < tosend[sockindex].f.nummod ; i++) { + int size; + objheader_t *headeraddr; + if((headeraddr = chashSearch(record->lookupTable, tosend[sockindex].oidmod[i])) == NULL) { + printf("%s() Error: No such oid %s, %d\n", __func__, __FILE__, __LINE__); + free(modptr); + free(listmid); + free(tosend); + return 1; + } + GETSIZE(size,headeraddr); + size+=sizeof(objheader_t); + memcpy(modptr+offset, headeraddr, size); + offset+=size; + } + send_data(sd, modptr, tosend[sockindex].f.sum_bytes); + free(modptr); + } else { //handle request locally + handleLocalReq(&tosend[sockindex], &transinfo, record, &getReplyCtrl[sockindex]); } - - threadnum++; + sockindex++; pile = pile->next; + } //end of pile processing + /* Recv Ctrl msgs from all machines */ + int i; + for(i = 0; i < pilecount; i++) { + int sd = socklist[i]; + if(sd != 0) { + char control; + recv_data(sd, &control, sizeof(char)); + //Update common data structure with new ctrl msg + getReplyCtrl[i] = control; + /* Recv Objects if participant sends TRANS_DISAGREE */ +#ifdef CACHE + if(control == TRANS_DISAGREE) { + int length; + recv_data(sd, &length, sizeof(int)); + void *newAddr; + pthread_mutex_lock(&prefetchcache_mutex); + if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) { + printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); + free(tosend); + free(listmid); + pthread_mutex_unlock(&prefetchcache_mutex); + return 1; + } + pthread_mutex_unlock(&prefetchcache_mutex); + recv_data(sd, newAddr, length); + int offset = 0; + while(length != 0) { + unsigned int oidToPrefetch; + objheader_t * header; + header = (objheader_t *)(((char *)newAddr) + offset); + oidToPrefetch = OID(header); + int size = 0; + GETSIZE(size, header); + size += sizeof(objheader_t); + //make an entry in prefetch hash table + void *oldptr; + if((oldptr = prehashSearch(oidToPrefetch)) != NULL) { + prehashRemove(oidToPrefetch); + prehashInsert(oidToPrefetch, header); + } else { + prehashInsert(oidToPrefetch, header); + } + length = length - size; + offset += size; + } + } //end of receiving objs +#endif + } + } + /* Decide the final response */ + if((finalResponse = decideResponse(getReplyCtrl, &treplyretry, record, pilecount)) == 0) { + printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); + free(tosend); + free(listmid); + return 1; } - /* Free attribute and wait for the other threads */ - pthread_attr_destroy(&attr); - for (i = 0; i < threadnum; i++) { - rc = pthread_join(thread[i], NULL); - if(rc) { - printf("Error: return code from pthread_join() is %d\n", rc); - pthread_cond_destroy(&tcond); - pthread_mutex_destroy(&tlock); - pDelete(pile_ptr); - free(listmid); - for (j = i; j < threadnum; j++) { - free(thread_data_array[j].buffer); + /* Send responses to all machines */ + for(i = 0; i < pilecount; i++) { + int sd = socklist[i]; + if(sd != 0) { +#ifdef CACHE + if(finalResponse == TRANS_COMMIT) { + int retval; + /* Update prefetch cache */ + if((retval = updatePrefetchCache(&(tosend[i]), record)) != 0) { + printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); + free(tosend); + free(listmid); + return 1; + } + /* Invalidate objects in other machine cache */ + if(tosend[i].f.nummod > 0) { + if((retval = invalidateObj(&(tosend[i]))) != 0) { + printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__); + free(tosend); + free(listmid); + return 1; + } + } } - return 1; +#endif + send_data(sd, &finalResponse, sizeof(char)); + } else { + /* Complete local processing */ + doLocalProcess(finalResponse, &(tosend[i]), &transinfo, record); } - free(thread_data_array[i].buffer); } + /* Free resources */ - pthread_cond_destroy(&tcond); - pthread_mutex_destroy(&tlock); + free(tosend); free(listmid); - if (!treplyretry) pDelete(pile_ptr); - /* wait a random amount of time before retrying to commit transaction*/ if(treplyretry) { - free(thread_data_array); - free(ltdata); randomdelay(); #ifdef TRANSSTATS nSoftAbort++; #endif } - /* Retry trans commit procedure during soft_abort case */ } while (treplyretry); - if(treplyctrl == TRANS_ABORT) { + if(finalResponse == TRANS_ABORT) { + //printf("Aborting trans\n"); #ifdef TRANSSTATS numTransAbort++; #endif @@ -640,10 +694,8 @@ int transCommit(transrecord_t *record) { objstrDelete(record->cache); chashDelete(record->lookupTable); free(record); - free(thread_data_array); - free(ltdata); return TRANS_ABORT; - } else if(treplyctrl == TRANS_COMMIT) { + } else if(finalResponse == TRANS_COMMIT) { #ifdef TRANSSTATS numTransCommit++; #endif @@ -651,8 +703,6 @@ int transCommit(transrecord_t *record) { objstrDelete(record->cache); chashDelete(record->lookupTable); free(record); - free(thread_data_array); - free(ltdata); return 0; } else { //TODO Add other cases @@ -662,154 +712,108 @@ int transCommit(transrecord_t *record) { return 0; } -/* This function sends information involved in the transaction request - * to participants and accepts a response from particpants. - * It calls decideresponse() to decide on what control message - * to send next to participants and sends the message using sendResponse()*/ -void *transRequest(void *threadarg) { - int sd, i, n; - struct sockaddr_in serv_addr; - thread_data_array_t *tdata; - objheader_t *headeraddr; - char control, recvcontrol; - char machineip[16], retval; - - tdata = (thread_data_array_t *) threadarg; - if((sd = getSock2WithLock(transRequestSockPool, tdata->mid)) < 0) { - printf("transRequest(): socket create error\n"); - pthread_exit(NULL); - } - - /* Send bytes of data with TRANS_REQUEST control message */ - send_data(sd, &(tdata->buffer->f), sizeof(fixed_data_t)); - - /* Send list of machines involved in the transaction */ - { - int size=sizeof(unsigned int)*tdata->buffer->f.mcount; - send_data(sd, tdata->buffer->listmid, size); - } - - /* Send oids and version number tuples for objects that are read */ - { - int size=(sizeof(unsigned int)+sizeof(unsigned short))*tdata->buffer->f.numread; - send_data(sd, tdata->buffer->objread, size); - } - - /* Send objects that are modified */ - void *modptr; - if((modptr = calloc(1, tdata->buffer->f.sum_bytes)) == NULL) { - printf("Calloc error for modified objects %s, %d\n", __FILE__, __LINE__); - pthread_exit(NULL); - } - int offset = 0; - for(i = 0; i < tdata->buffer->f.nummod ; i++) { - int size; - if((headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i])) == NULL) { - printf("%s() Error: No such oid %s, %d\n", __func__, __FILE__, __LINE__); - pthread_exit(NULL); - } - GETSIZE(size,headeraddr); - size+=sizeof(objheader_t); - memcpy(modptr+offset, headeraddr, size); - offset+=size; - } - send_data(sd, modptr, tdata->buffer->f.sum_bytes); - free(modptr); - /* Read control message from Participant */ - recv_data(sd, &control, sizeof(char)); +/* This function handles the local objects involved in a transaction + * commiting process. It also makes a decision if this local machine + * sends AGREE or DISAGREE or SOFT_ABORT to coordinator */ +void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, transrecord_t *rec, char *getReplyCtrl) { + unsigned int *oidnotfound = NULL, *oidlocked = NULL; + int numoidnotfound = 0, numoidlocked = 0; + int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0; + int numread, i; + unsigned int oid; + unsigned short version; - /* Recv Objects if participant sends TRANS_DISAGREE */ -#ifdef CACHE - if(control == TRANS_DISAGREE) { - int length; - recv_data(sd, &length, sizeof(int)); - void *newAddr; - pthread_mutex_lock(&prefetchcache_mutex); - if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) { - printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); - pthread_mutex_unlock(&prefetchcache_mutex); - pthread_exit(NULL); - } - pthread_mutex_unlock(&prefetchcache_mutex); - recv_data(sd, newAddr, length); - int offset = 0; - while(length != 0) { - unsigned int oidToPrefetch; - objheader_t * header; - header = (objheader_t *)(((char *)newAddr) + offset); - oidToPrefetch = OID(header); - int size = 0; - GETSIZE(size, header); - size += sizeof(objheader_t); - //make an entry in prefetch hash table - void *oldptr; - if((oldptr = prehashSearch(oidToPrefetch)) != NULL) { - prehashRemove(oidToPrefetch); - prehashInsert(oidToPrefetch, header); - } else { - prehashInsert(oidToPrefetch, header); + /* Counters and arrays to formulate decision on control message to be sent */ + oidnotfound = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod), sizeof(unsigned int)); + oidlocked = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod +1), sizeof(unsigned int)); // calloc additional 1 byte for + //setting a divider between read and write locks + numread = tdata->f.numread; + /* Process each oid in the machine pile/ group per thread */ + for (i = 0; i < tdata->f.numread + tdata->f.nummod; i++) { + if (i < tdata->f.numread) { + int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array + incr *= i; + oid = *((unsigned int *)(((char *)tdata->objread) + incr)); + version = *((unsigned short *)(((char *)tdata->objread) + incr + sizeof(unsigned int))); + commitCountForObjRead(getReplyCtrl, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version); + } else { // Objects Modified + if(i == tdata->f.numread) { + oidlocked[numoidlocked] = -1; + numoidlocked++; + } + int tmpsize; + objheader_t *headptr; + headptr = (objheader_t *) chashSearch(rec->lookupTable, tdata->oidmod[i-numread]); + if (headptr == NULL) { + printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__); + return; } - length = length - size; - offset += size; + oid = OID(headptr); + version = headptr->version; + commitCountForObjMod(getReplyCtrl, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version); } } -#endif - recvcontrol = control; - /* Update common data structure and increment count */ - tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol; - - /* Lock and update count */ - /* Thread sleeps until all messages from pariticipants are received by coordinator */ - pthread_mutex_lock(tdata->lock); - (*(tdata->count))++; /* keeps track of no of messages received by the coordinator */ +/* Fill out the trans_commit_data_t data structure. This is required for a trans commit process + * if Participant receives a TRANS_COMMIT */ + transinfo->objlocked = oidlocked; + transinfo->objnotfound = oidnotfound; + transinfo->modptr = NULL; + transinfo->numlocked = numoidlocked; + transinfo->numnotfound = numoidnotfound; - /* Wake up the threads and invoke decideResponse (once) */ - if(*(tdata->count) == tdata->buffer->f.mcount) { - decideResponse(tdata); - pthread_cond_broadcast(tdata->threshold); - } else { - pthread_cond_wait(tdata->threshold, tdata->lock); + /* Condition to send TRANS_AGREE */ + if(v_matchnolock == tdata->f.numread + tdata->f.nummod) { + *getReplyCtrl = TRANS_AGREE; } - pthread_mutex_unlock(tdata->lock); + /* Condition to send TRANS_SOFT_ABORT */ + if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) { + *getReplyCtrl = TRANS_SOFT_ABORT; + } +} -#ifdef CACHE - if(*(tdata->replyctrl) == TRANS_COMMIT) { - int retval; - /* Update prefetch cache */ - if((retval = updatePrefetchCache(tdata)) != 0) { - printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); +void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo, transrecord_t *record) { + if(finalResponse == TRANS_ABORT) { + if(transAbortProcess(transinfo) != 0) { + printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__); + fflush(stdout); return; } - + } else if(finalResponse == TRANS_COMMIT) { +#ifdef CACHE /* Invalidate objects in other machine cache */ - if(tdata->buffer->f.nummod > 0) { + if(tdata->f.nummod > 0) { + int retval; if((retval = invalidateObj(tdata)) != 0) { printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__); return; } } - } #endif + if(transComProcess(tdata, transinfo, record) != 0) { + printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__); + fflush(stdout); + return; + } + } - /* Send the final response such as TRANS_COMMIT or TRANS_ABORT - * to all participants in their respective socket */ - if (sendResponse(tdata, sd) == 0) { - printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__); - pthread_exit(NULL); + /* Free memory */ + if (transinfo->objlocked != NULL) { + free(transinfo->objlocked); + } + if (transinfo->objnotfound != NULL) { + free(transinfo->objnotfound); } - pthread_exit(NULL); } /* This function decides the reponse that needs to be sent to * all Participant machines after the TRANS_REQUEST protocol */ -void decideResponse(thread_data_array_t *tdata) { - char control; +char decideResponse(char *getReplyCtrl, char *treplyretry, transrecord_t *record, int pilecount) { int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what message to send */ - for (i = 0 ; i < tdata->buffer->f.mcount; i++) { - control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses - written onto the shared array */ + for (i = 0 ; i < pilecount; i++) { + char control; + control = getReplyCtrl[i]; switch(control) { default: printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__); @@ -831,49 +835,22 @@ void decideResponse(thread_data_array_t *tdata) { if(transdisagree > 0) { /* Send Abort */ - *(tdata->replyctrl) = TRANS_ABORT; - *(tdata->replyretry) = 0; + *treplyretry = 0; + return TRANS_ABORT; #ifdef CACHE /* clear objects from prefetch cache */ - cleanPCache(tdata); + cleanPCache(record); #endif - } else if(transagree == tdata->buffer->f.mcount) { + } else if(transagree == pilecount) { /* Send Commit */ - *(tdata->replyctrl) = TRANS_COMMIT; - *(tdata->replyretry) = 0; + *treplyretry = 0; + return TRANS_COMMIT; } else { /* Send Abort in soft abort case followed by retry commiting transaction again*/ - *(tdata->replyctrl) = TRANS_ABORT; - *(tdata->replyretry) = 1; - } - return; -} - -/* This function sends the final response to remote machines per - * thread in their respective socket id It returns a char that is only - * needed to check the correctness of execution of this function - * inside transRequest()*/ - -char sendResponse(thread_data_array_t *tdata, int sd) { - int n, size, sum, oidcount = 0, control; - char *ptr, retval = 0; - unsigned int *oidnotfound; - - control = *(tdata->replyctrl); - send_data(sd, &control, sizeof(char)); - - //TODO read missing objects during object migration - /* If response is a soft abort due to missing objects at the - Participant's side */ - - /* If the decided response is TRANS_ABORT */ - if(*(tdata->replyctrl) == TRANS_ABORT) { - retval = TRANS_ABORT; - } else if(*(tdata->replyctrl) == TRANS_COMMIT) { - /* If the decided response is TRANS_COMMIT */ - retval = TRANS_COMMIT; + *treplyretry = 1; + return TRANS_ABORT; } - return retval; + return 0; } /* This function opens a connection, places an object read request to @@ -912,122 +889,8 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { return objcopy; } -/* This function handles the local objects involved in a transaction - * commiting process. It also makes a decision if this local machine - * sends AGREE or DISAGREE or SOFT_ABORT to coordinator. Note - * Coordinator = local machine It wakes up the other threads from - * remote participants that are waiting for the coordinator's decision - * and based on common agreement it either commits or aborts the - * transaction. It also frees the memory resources */ - -void *handleLocalReq(void *threadarg) { - unsigned int *oidnotfound = NULL, *oidlocked = NULL; - local_thread_data_array_t *localtdata; - int numoidnotfound = 0, numoidlocked = 0; - int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0; - int numread, i; - unsigned int oid; - unsigned short version; - localtdata = (local_thread_data_array_t *) threadarg; - /* Counters and arrays to formulate decision on control message to be sent */ - oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int)); - oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod +1), sizeof(unsigned int)); // calloc additional 1 byte for - //setting a divider of read locks - //and write locks - - numread = localtdata->tdata->buffer->f.numread; - /* Process each oid in the machine pile/ group per thread */ - for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) { - if (i < localtdata->tdata->buffer->f.numread) { - int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array - incr *= i; - oid = *((unsigned int *)(((char *)localtdata->tdata->buffer->objread) + incr)); - version = *((unsigned short *)(((char *)localtdata->tdata->buffer->objread) + incr + sizeof(unsigned int))); - commitCountForObjRead(localtdata, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version); - } else { // Objects Modified - if(i == localtdata->tdata->buffer->f.numread) { - oidlocked[numoidlocked] = -1; - numoidlocked++; - } - int tmpsize; - objheader_t *headptr; - headptr = (objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i-numread]); - if (headptr == NULL) { - printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__); - return NULL; - } - oid = OID(headptr); - version = headptr->version; - commitCountForObjMod(localtdata, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version); - } - } - - /* Condition to send TRANS_AGREE */ - if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) { - localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE; - } - /* Condition to send TRANS_SOFT_ABORT */ - if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) { - localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT; - } - - /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process - * if Participant receives a TRANS_COMMIT */ - localtdata->transinfo->objlocked = oidlocked; - localtdata->transinfo->objnotfound = oidnotfound; - localtdata->transinfo->modptr = NULL; - localtdata->transinfo->numlocked = numoidlocked; - localtdata->transinfo->numnotfound = numoidnotfound; - - /* Lock and update count */ - //Thread sleeps until all messages from pariticipants are received by coordinator - pthread_mutex_lock(localtdata->tdata->lock); - (*(localtdata->tdata->count))++; /* keeps track of no of messages received by the coordinator */ - - /* Wake up the threads and invoke decideResponse (once) */ - if(*(localtdata->tdata->count) == localtdata->tdata->buffer->f.mcount) { - decideResponse(localtdata->tdata); - pthread_cond_broadcast(localtdata->tdata->threshold); - } else { - pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock); - } - pthread_mutex_unlock(localtdata->tdata->lock); - if(*(localtdata->tdata->replyctrl) == TRANS_ABORT) { - if(transAbortProcess(localtdata) != 0) { - printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__); - fflush(stdout); - pthread_exit(NULL); - } - } else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT) { -#ifdef CACHE - /* Invalidate objects in other machine cache */ - if(localtdata->tdata->buffer->f.nummod > 0) { - int retval; - if((retval = invalidateObj(localtdata->tdata)) != 0) { - printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__); - return; - } - } -#endif - if(transComProcess(localtdata) != 0) { - printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__); - fflush(stdout); - pthread_exit(NULL); - } - } - - /* Free memory */ - if (localtdata->transinfo->objlocked != NULL) { - free(localtdata->transinfo->objlocked); - } - if (localtdata->transinfo->objnotfound != NULL) { - free(localtdata->transinfo->objnotfound); - } - pthread_exit(NULL); -} - /* Commit info for objects modified */ -void commitCountForObjMod(local_thread_data_array_t *localtdata, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound, +void commitCountForObjMod(char *getReplyCtrl, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound, int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) { void *mobj; /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */ @@ -1047,10 +910,12 @@ void commitCountForObjMod(local_thread_data_array_t *localtdata, unsigned int *o } else { /* If versions don't match ...HARD ABORT */ (*v_nomatch)++; /* Send TRANS_DISAGREE to Coordinator */ - localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE; + *getReplyCtrl = TRANS_DISAGREE; + //Keep track of what is locked oidlocked[*numoidlocked] = OID(((objheader_t *)mobj)); (*numoidlocked)++; + //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj)); return; } } else { //A lock is acquired some place else @@ -1059,7 +924,8 @@ void commitCountForObjMod(local_thread_data_array_t *localtdata, unsigned int *o } else { /* If versions don't match ...HARD ABORT */ (*v_nomatch)++; /* Send TRANS_DISAGREE to Coordinator */ - localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE; + *getReplyCtrl = TRANS_DISAGREE; + //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj)); return; } } @@ -1067,7 +933,7 @@ void commitCountForObjMod(local_thread_data_array_t *localtdata, unsigned int *o } /* Commit info for objects modified */ -void commitCountForObjRead(local_thread_data_array_t *localtdata, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound, +void commitCountForObjRead(char *getReplyCtrl, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound, int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) { void *mobj; /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */ @@ -1087,10 +953,11 @@ void commitCountForObjRead(local_thread_data_array_t *localtdata, unsigned int * } else { /* If versions don't match ...HARD ABORT */ (*v_nomatch)++; /* Send TRANS_DISAGREE to Coordinator */ - localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE; + *getReplyCtrl = TRANS_DISAGREE; //Keep track of what is locked oidlocked[*numoidlocked] = OID(((objheader_t *)mobj)); (*numoidlocked)++; + //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj)); return; } } else { //Has reached max number of readers or some other transaction @@ -1100,7 +967,8 @@ void commitCountForObjRead(local_thread_data_array_t *localtdata, unsigned int * } else { /* If versions don't match ...HARD ABORT */ (*v_nomatch)++; /* Send TRANS_DISAGREE to Coordinator */ - localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE; + *getReplyCtrl = TRANS_DISAGREE; + //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj)); return; } } @@ -1108,13 +976,13 @@ void commitCountForObjRead(local_thread_data_array_t *localtdata, unsigned int * } /* This function completes the ABORT process if the transaction is aborting */ -int transAbortProcess(local_thread_data_array_t *localtdata) { +int transAbortProcess(trans_commit_data_t *transinfo) { int i, numlocked; unsigned int *objlocked; void *header; - numlocked = localtdata->transinfo->numlocked; - objlocked = localtdata->transinfo->objlocked; + numlocked = transinfo->numlocked; + objlocked = transinfo->objlocked; int useWriteUnlock = 0; for (i = 0; i < numlocked; i++) { @@ -1137,18 +1005,18 @@ int transAbortProcess(local_thread_data_array_t *localtdata) { } /*This function completes the COMMIT process if the transaction is commiting*/ -int transComProcess(local_thread_data_array_t *localtdata) { +int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo, transrecord_t *rec) { objheader_t *header, *tcptr; int i, nummod, tmpsize, numcreated, numlocked; unsigned int *oidmod, *oidcreated, *oidlocked; void *ptrcreate; - nummod = localtdata->tdata->buffer->f.nummod; - oidmod = localtdata->tdata->buffer->oidmod; - numcreated = localtdata->tdata->buffer->f.numcreated; - oidcreated = localtdata->tdata->buffer->oidcreated; - numlocked = localtdata->transinfo->numlocked; - oidlocked = localtdata->transinfo->objlocked; + nummod = tdata->f.nummod; + oidmod = tdata->oidmod; + numcreated = tdata->f.numcreated; + oidcreated = tdata->oidcreated; + numlocked = transinfo->numlocked; + oidlocked = transinfo->objlocked; for (i = 0; i < nummod; i++) { if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) { @@ -1156,7 +1024,7 @@ int transComProcess(local_thread_data_array_t *localtdata) { return 1; } /* Copy from transaction cache -> main object store */ - if ((tcptr = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidmod[i]))) == NULL) { + if ((tcptr = ((objheader_t *) chashSearch(rec->lookupTable, oidmod[i]))) == NULL) { printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__); return 1; } @@ -1170,7 +1038,7 @@ int transComProcess(local_thread_data_array_t *localtdata) { } /* If object is newly created inside transaction then commit it */ for (i = 0; i < numcreated; i++) { - if ((header = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidcreated[i]))) == NULL) { + if ((header = ((objheader_t *) chashSearch(rec->lookupTable, oidcreated[i]))) == NULL) { printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__); return 1; } @@ -1299,7 +1167,7 @@ int lookupObject(unsigned int * oid, short offset) { /* This function is called by the thread calling transPrefetch */ void *transPrefetch(void *t) { while(1) { - /* lock mutex of primary prefetch queue */ + /* read from prefetch queue */ void *node=gettail(); /* Check if the tuples are found locally, if yes then reduce them further*/ /* and group requests by remote machine ids by calling the makePreGroups() */ @@ -1895,3 +1763,36 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi return pile; } + +plistnode_t *sortPiles(plistnode_t *pileptr) { + plistnode_t *head, *ptr, *tail; + head = pileptr; + ptr = pileptr; + /* Get tail pointer */ + while(ptr!= NULL) { + tail = ptr; + ptr = ptr->next; + } + ptr = pileptr; + plistnode_t *prev = pileptr; + /* Arrange local machine processing at the end of the pile list */ + while(ptr != NULL) { + if(ptr != tail) { + if(ptr->mid == myIpAddr && (prev != pileptr)) { + prev->next = ptr->next; + ptr->next = NULL; + tail->next = ptr; + return pileptr; + } + if((ptr->mid == myIpAddr) && (prev == pileptr)) { + prev = ptr->next; + ptr->next = NULL; + tail->next = ptr; + return prev; + } + prev = ptr; + } + ptr = ptr->next; + } + return pileptr; +} diff --git a/Robust/src/Runtime/garbage.c b/Robust/src/Runtime/garbage.c index f608e4d6..9684923d 100644 --- a/Robust/src/Runtime/garbage.c +++ b/Robust/src/Runtime/garbage.c @@ -19,7 +19,7 @@ #define NUMPTRS 100 -#define INITIALHEAPSIZE 10*1024 +#define INITIALHEAPSIZE 8192*1024 #define GCPOINT(x) ((int)((x)*0.9)) /* This define takes in how full the heap is initially and returns a new heap size to use */ #define HEAPSIZE(x,y) (((int)((x)/0.6))+y) -- 2.34.1