From 96931de123d68dcf7f6d387778fb830302db9a9a Mon Sep 17 00:00:00 2001 From: adash Date: Sat, 6 Mar 2010 22:55:04 +0000 Subject: [PATCH] changes to trans.c for performance improvement --- .../DSTM/interface_recovery/dstmserver.c | 2 +- .../Runtime/DSTM/interface_recovery/trans.c | 173 ++++++++++++------ 2 files changed, 122 insertions(+), 53 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c index 56a26b57..fa17c3da 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c @@ -944,7 +944,7 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { } ptr = (char *) modptr; for(i = 0 ; i < fixed.nummod; i++) { - int tmpsize; + int tmpsize=0; headaddr = (objheader_t *) ptr; oid = OID(headaddr); oidmod[i] = oid; diff --git a/Robust/src/Runtime/DSTM/interface_recovery/trans.c b/Robust/src/Runtime/DSTM/interface_recovery/trans.c index 2f4186d2..a9030df0 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/trans.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/trans.c @@ -26,8 +26,6 @@ #include #include "tlookup.h" -//#define CPU_FREQ 2992440 - #define CPU_FREQ 3056842 #endif @@ -77,6 +75,7 @@ int numTransAbort = 0; int nchashSearch = 0; int nmhashSearch = 0; int nprehashSearch = 0; +int ndirtyCacheObj = 0; int nRemoteSend = 0; int nSoftAbort = 0; int bytesSent = 0; @@ -98,7 +97,7 @@ unsigned int *locateObjHosts; int waitThreadMid; unsigned int waitThreadID; -int transRetryFlag; +__thread int transRetryFlag; unsigned int transIDMin; unsigned int transIDMax; @@ -208,6 +207,34 @@ GDBSEND1: return 0; // completed sending data } +void send_buf(int fd, struct writestruct * sendbuffer, void *buffer, int buflen) { + if (buflen+sendbuffer->offset>WMAXBUF) { + send_data(fd, sendbuffer->buf, sendbuffer->offset); + sendbuffer->offset=0; + send_data(fd, buffer, buflen); + return; + } + memcpy(&sendbuffer->buf[sendbuffer->offset], buffer, buflen); + sendbuffer->offset+=buflen; + if (sendbuffer->offset>WTOP) { + send_data(fd, sendbuffer->buf, sendbuffer->offset); + sendbuffer->offset=0; + } +} + +void forcesend_buf(int fd, struct writestruct * sendbuffer, void *buffer, int buflen) { + if (buflen+sendbuffer->offset>WMAXBUF) { + send_data(fd, sendbuffer->buf, sendbuffer->offset); + sendbuffer->offset=0; + send_data(fd, buffer, buflen); + return; + } + memcpy(&sendbuffer->buf[sendbuffer->offset], buffer, buflen); + sendbuffer->offset+=buflen; + send_data(fd, sendbuffer->buf, sendbuffer->offset); + sendbuffer->offset=0; +} + //Returns negative value if receive cannot be completed because of //timeout or machine failure @@ -222,6 +249,7 @@ int recv_data(int fd, void *buf, int buflen) { GDBRECV1: #endif numbytes = recv(fd, buffer, size, 0); + bytesRecv += numbytes; if (numbytes>0) { buffer += numbytes; @@ -307,6 +335,7 @@ int recv_data_errorcode(int fd, void *buf, int buflen) { perror("recv_data_errorcode"); return -1; } + bytesRecv += numbytes; buffer += numbytes; size -= numbytes; } @@ -687,6 +716,12 @@ __attribute__((pure)) objheader_t *transRead(unsigned int oid) { } else { #ifdef CACHE if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { + if(STATUS(tmp) & DIRTY) { +#ifdef TRANSSTATS + ndirtyCacheObj++; +#endif + goto remoteread; + } #ifdef TRANSSTATS nprehashSearch++; #endif @@ -703,6 +738,7 @@ __attribute__((pure)) objheader_t *transRead(unsigned int oid) { return objcopy; #endif } +remoteread: #endif /* Get the object from the remote location */ if((machinenumber = lhashSearch(oid)) == 0) { @@ -719,6 +755,24 @@ __attribute__((pure)) objheader_t *transRead(unsigned int oid) { nRemoteSend++; #endif #ifdef COMPILER +#ifdef CACHE + //Copy object to prefetch cache + pthread_mutex_lock(&prefetchcache_mutex); + objheader_t *headerObj; + int size; + GETSIZE(size, objcopy); + if((headerObj = prefetchobjstrAlloc(size + sizeof(objheader_t))) == NULL) { + printf("%s(): Error in getting memory from prefetch cache at %s, %d\n", __func__, + __FILE__, __LINE__); + pthread_mutex_unlock(&prefetchcache_mutex); + return NULL; + } + pthread_mutex_unlock(&prefetchcache_mutex); + memcpy(headerObj, objcopy, size+sizeof(objheader_t)); + //make an entry in prefetch lookup hashtable + prehashInsert(oid, headerObj); + LOGEVENT('B'); +#endif return &objcopy[1]; #else return objcopy; @@ -779,6 +833,12 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) { } else { #ifdef CACHE if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { + if(STATUS(tmp) & DIRTY) { +#ifdef TRANSSTATS + ndirtyCacheObj++; +#endif + goto remoteread; + } #ifdef TRANSSTATS LOGEVENT('P') nprehashSearch++; @@ -796,8 +856,9 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) { return objcopy; #endif } +remoteread: #endif - /* Get the object from the remote location */ + /* Get the object from the remote location */ if((machinenumber = lhashSearch(oid)) == 0) { printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__); return NULL; @@ -818,7 +879,7 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) { #endif #endif - objcopy = getRemoteObj(machinenumber, oid); + objcopy = getRemoteObj(machinenumber, oid); #ifdef RECOVERY if(transRetryFlag) { @@ -836,9 +897,27 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) { } else { #ifdef TRANSSTATS LOGEVENT('R'); - nRemoteSend++; + nRemoteSend++; #endif #ifdef COMPILER +#ifdef CACHE + //Copy object to prefetch cache + pthread_mutex_lock(&prefetchcache_mutex); + objheader_t *headerObj; + int size; + GETSIZE(size, objcopy); + if((headerObj = prefetchobjstrAlloc(size+sizeof(objheader_t))) == NULL) { + printf("%s(): Error in getting memory from prefetch cache at %s, %d\n", __func__, + __FILE__, __LINE__); + pthread_mutex_unlock(&prefetchcache_mutex); + return NULL; + } + pthread_mutex_unlock(&prefetchcache_mutex); + memcpy(headerObj, objcopy, size+sizeof(objheader_t)); + //make an entry in prefetch lookup hashtable + prehashInsert(oid, headerObj); +#endif + return &objcopy[1]; #else return objcopy; @@ -848,7 +927,6 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) { #ifdef DEBUG printf("%s -> Finished!!\n",__func__); #endif - } /* This function creates objects in the transaction record */ @@ -857,8 +935,8 @@ objheader_t *transCreateObj(unsigned int size) { OID(tmp) = getNewOID(); tmp->notifylist = NULL; tmp->version = 1; - tmp->rcount = 1; - tmp->isBackup = 0; + //tmp->rcount = 1; + tmp->isBackup = 0; STATUS(tmp) = NEW; t_chashInsert(OID(tmp), tmp); @@ -903,8 +981,6 @@ plistnode_t *createPiles() { int makedirty = 0; unsigned int mid; - mid = lhashSearch(oid); - // if the obj is dirty or new if(STATUS(headeraddr) & DIRTY || STATUS(headeraddr) & NEW) { // set flag for backup machine @@ -912,7 +988,7 @@ plistnode_t *createPiles() { } // if the obj is new or local, destination will be my Ip - if((mid = lhashSearch(oid)) == 0) { + if((mid=lhashSearch(oid)) == 0) { mid = myIpAddr; } @@ -989,7 +1065,8 @@ int transCommit() { int firsttime=1; trans_commit_data_t transinfo; /* keeps track of objs locked during transaction */ char finalResponse; - + struct writestruct writebuffer; + writebuffer.offset=0; #ifdef RECOVERY int deadsd = -1; int deadmid = -1; @@ -1040,16 +1117,16 @@ int transCommit() { /* Create a socket and getReplyCtrl array, initialize */ int socklist[pilecount]; + char getReplyCtrl[pilecount]; int loopcount; - for(loopcount = 0 ; loopcount < pilecount; loopcount++) + for(loopcount = 0 ; loopcount < pilecount; loopcount++) { socklist[loopcount] = 0; - char getReplyCtrl[pilecount]; - for(loopcount = 0 ; loopcount < pilecount; loopcount++) getReplyCtrl[loopcount] = 0; + } /* Process each machine pile */ int sockindex = 0; - int localReqsock = -1; + int localReqsock = -1; trans_req_data_t *tosend; tosend = calloc(pilecount, sizeof(trans_req_data_t)); while(pile != NULL) { @@ -1082,18 +1159,21 @@ int transCommit() { } socklist[sockindex] = sd; /* Send bytes of data with TRANS_REQUEST control message */ - send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t)); + //send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t)); + send_buf(sd, &writebuffer, &(tosend[sockindex].f), sizeof(fixed_data_t)); /* Send list of machines involved in the transaction */ { int size=sizeof(unsigned int)*(tosend[sockindex].f.mcount); - send_data(sd, tosend[sockindex].listmid, size); + //send_data(sd, tosend[sockindex].listmid, size); + send_buf(sd, &writebuffer, tosend[sockindex].listmid, size); } /* Send oids and version number tuples for objects that are read */ { int size=(sizeof(unsigned int)+sizeof(unsigned short))*(tosend[sockindex].f.numread); - send_data(sd, tosend[sockindex].objread, size); + //send_data(sd, tosend[sockindex].objread, size); + send_buf(sd, &writebuffer, tosend[sockindex].objread, size); } /* Send objects that are modified */ @@ -1121,11 +1201,13 @@ int transCommit() { memcpy(modptr+offset, headeraddr, size); offset+=size; } - send_data(sd, modptr, tosend[sockindex].f.sum_bytes); + //send_data(sd, modptr, tosend[sockindex].f.sum_bytes); + forcesend_buf(sd, &writebuffer, modptr, tosend[sockindex].f.sum_bytes); #ifdef RECOVERY /* send transaction id, number of machine involved, machine ids */ - send_data(sd, &transID, sizeof(unsigned int)); + //send_data(sd, &transID, sizeof(unsigned int)); + forcesend_buf(sd, &writebuffer, &transID, sizeof(unsigned int)); #endif free(modptr); } else { //handle request locally @@ -1254,17 +1336,6 @@ int transCommit() { return 1; } -#if 0 - /* Invalidate objects in other machine cache */ - if(tosend[i].f.nummod > 0) { - if((retval = invalidateObj(&(tosend[i]))) != 0) { - printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__); - free(tosend); - free(listmid); - return 1; - } - } -#endif #ifdef ABORTREADERS removetransaction(tosend[i].oidmod,tosend[i].f.nummod); removethisreadtransaction(tosend[i].objread, tosend[i].f.numread); @@ -1863,7 +1934,6 @@ int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) { memcpy(ptrcreate, header, tmpsize); mhashInsert(oidcreated[i], ptrcreate); lhashInsert(oidcreated[i], myIpAddr); -// printf("oid created : %u\n",oidcreated[i]); } /* Unlock locked objects */ int useWriteUnlock = 0; @@ -2205,6 +2275,9 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) { *((int*)buf) = tmp->numoffset; buf+=sizeof(int); *((unsigned int *)buf) = tmp->oid; +#ifdef TRANSSTATS + sendRemoteReq++; +#endif buf+=sizeof(unsigned int); *((unsigned int *)buf) = myIpAddr; buf += sizeof(unsigned int); @@ -2216,7 +2289,6 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) { /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */ endpair = -1; send_data(sd, &endpair, sizeof(int)); - return; } @@ -2229,7 +2301,9 @@ int getPrefetchResponse(int sd) { recv_data((int)sd, &length, sizeof(int)); size = length - sizeof(int); char recvbuffer[size]; - +#ifdef TRANSSTATS + getResponse++; +#endif recv_data((int)sd, recvbuffer, size); control = *((char *) recvbuffer); if(control == OBJECT_FOUND) { @@ -2739,8 +2813,7 @@ void duplicateLostObjects(unsigned int mid){ numRecovery++; long long st; long long fi; - unsigned int dupeSize; // to calculate the size of backed up data - unsigned int recvDataSize = 0; // to calculate the size of recv data + unsigned int dupeSize = 0; // to calculate the size of backed up data st = myrdtsc(); // to get clock recoverStat[numRecovery-1].deadMachine = mid; @@ -2778,12 +2851,15 @@ void duplicateLostObjects(unsigned int mid){ * Backup 26 21,24 */ +#ifdef RECOVERYSTATS dupeSize = 0; +#endif if(((psd = getSockWithLock(transPrefetchSockPool, originalMid)) < 0 ) || ((bsd = getSockWithLock(transPrefetchSockPool,backupMid)) <0)) { printf("%s -> Socket create error\n",__func__); exit(0); } + /* request for original */ char duperequest; duperequest = DUPLICATE_ORIGINAL; @@ -2801,14 +2877,16 @@ void duplicateLostObjects(unsigned int mid){ recv_data(psd, &p_response, sizeof(char)); recv_data(psd, &p_receivedSize, sizeof(unsigned int)); +#ifdef RECOVERYSTATS dupeSize += p_receivedSize; // size of primary data - recvDataSize += p_receivedSize; // size of primary data +#endif recv_data(bsd, &b_response, sizeof(char)); recv_data(bsd, &b_receivedSize, sizeof(unsigned int)); +#ifdef RECOVERYSTATS dupeSize += b_receivedSize; // size of backup data - recvDataSize += b_receivedSize; // size of backup data +#endif if(p_response != DUPLICATION_COMPLETE || b_response != DUPLICATION_COMPLETE) { @@ -2823,8 +2901,6 @@ void duplicateLostObjects(unsigned int mid){ fi = myrdtsc(); recoverStat[numRecovery-1].elapsedTime = (fi-st)/CPU_FREQ; recoverStat[numRecovery-1].recoveredData = dupeSize; - recoverStat[numRecovery-1].recvData = recvDataSize; - printRecoveryStat(); #endif @@ -3678,9 +3754,10 @@ int checkiftheMachineDead(unsigned int mid) { int mIndex = findHost(mid); return getStatus(mIndex); } +#endif -#ifdef RECOVERYSTATS void printRecoveryStat() { +#ifdef RECOVERYSTATS printf("\n***** Recovery Stats *****\n"); printf("numRecovery = %d\n",numRecovery); int i; @@ -3688,18 +3765,10 @@ void printRecoveryStat() { printf("Dead Machine = %s\n",midtoIPString(recoverStat[i].deadMachine)); printf("Recoveryed data(byte) = %u\n",recoverStat[i].recoveredData); printf("Recovery Time(ms) = %ld\n",recoverStat[i].elapsedTime); - printf("Data recv(bytes) = %ld\n",recoverStat[i].recvData); } printf("**************************\n\n"); fflush(stdout); -} #else -void printRecoveryStat() { printf("No stat\n"); -} -#endif - - - - #endif +} -- 2.34.1