From 6860b4b5d5944ef92a9138b691661760e110e4ab Mon Sep 17 00:00:00 2001 From: adash Date: Mon, 15 Feb 2010 07:44:04 +0000 Subject: [PATCH] changes to improve commit intensive benchmarks and combine invalid messages and commit messages for speedup --- .../Runtime/DSTM/interface/addUdpEnhance.c | 94 +++++++++---------- .../Runtime/DSTM/interface/addUdpEnhance.h | 4 +- .../src/Runtime/DSTM/interface/dstmserver.c | 3 +- Robust/src/Runtime/DSTM/interface/sandbox.c | 43 ++++----- Robust/src/Runtime/DSTM/interface/trans.c | 59 +++++------- 5 files changed, 93 insertions(+), 110 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/addUdpEnhance.c b/Robust/src/Runtime/DSTM/interface/addUdpEnhance.c index adaf6671..71bc6baf 100644 --- a/Robust/src/Runtime/DSTM/interface/addUdpEnhance.c +++ b/Robust/src/Runtime/DSTM/interface/addUdpEnhance.c @@ -83,7 +83,6 @@ void *udpListenBroadcast(void *sockfd) { printf("Listening on port %d, fd = %d\n", UDP_PORT, (int)sockfd); - memset(readBuffer, 0, MAX_SIZE); while(1) { int bytesRcvd = recvfrom((int)sockfd, readBuffer, sizeof(readBuffer), 0, (struct sockaddr *)&servaddr, &socklen); if(bytesRcvd == -1) { @@ -113,76 +112,72 @@ void *udpListenBroadcast(void *sockfd) { /* Function that invalidate objects that * have been currently modified * returns -1 on error and 0 on success */ -int invalidateObj(trans_req_data_t *tdata) { +int invalidateObj(trans_req_data_t *tdata, int pilecount, char finalresponse, int *socklist) { + struct timeval start, end; struct sockaddr_in clientaddr; int retval; - + int i; + int nummod=0; + for(i=0;if.nummod < maxObjsPerMsg) { - /* send single udp msg */ - int iteration = 0; - if((retval = sendUdpMsg(tdata, &clientaddr, iteration)) < 0) { - printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__); - return -1; - } - } else { - /* Split into several udp msgs */ - int maxUdpMsg = tdata->f.nummod/maxObjsPerMsg; - if (tdata->f.nummod%maxObjsPerMsg) maxUdpMsg++; - int i; - for(i = 1; i <= maxUdpMsg; i++) { - if((retval = sendUdpMsg(tdata, &clientaddr, i)) < 0) { - printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__); - return -1; - } - } + /* send single udp msg */ + if((retval = sendUdpMsg(tdata, pilecount, nummod, &clientaddr, finalresponse, socklist)) < 0) { + printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__); + return -1; + } + /* Send trans commit or abort message when prefetching or caching */ + for(i=0;if.nummod)); //sizeof msg - offset += sizeof(short); - int i; - for(i = 0; i < tdata->f.nummod; i++) { - *((unsigned int *) (writeBuffer+offset)) = tdata->oidmod[i]; //copy objects - offset += sizeof(unsigned int); + + while(nummod>0) { + int numtosend=nummod>maxObjsPerMsg?maxObjsPerMsg:nummod; + int localoffset=offset; + int sentmsgs=0; + *((short *)(writeBuffer+offset)) = (short) (sizeof(unsigned int) * numtosend); + localoffset += sizeof(short); + + for(; j < pilecount; j++) { + for(; i < tdata[j].f.nummod; i++) { + *((unsigned int *) (writeBuffer+localoffset)) = tdata[j].oidmod[i]; //copy objects + localoffset += sizeof(unsigned int); + if ((++sentmsgs)==numtosend) { + i++; + goto send; + } + } + i=0; } - } else { // iteration flag > zero, send multiple udp msg - int numObj; - if((tdata->f.nummod - (iteration * maxObjsPerMsg)) > 0) - numObj = maxObjsPerMsg; - else - numObj = tdata->f.nummod - ((iteration - 1)*maxObjsPerMsg); - *((short *)(writeBuffer+offset)) = (short) (sizeof(unsigned int) * numObj); - offset += sizeof(short); - int index = (iteration - 1) * maxObjsPerMsg; - int i; - for(i = 0; i < numObj; i++) { - *((unsigned int *) (writeBuffer+offset)) = tdata->oidmod[index+i]; - offset += sizeof(unsigned int); +send: + if(sendto(udpSockFd, (const void *) writeBuffer, localoffset, 0, (const struct sockaddr *)clientaddr, sizeof(struct sockaddr_in)) < 0) { + perror("sendto error- "); + printf("DEBUG-> sendto error: errorno %d\n", errno); + return -1; } - } - int n; - if((n = sendto(udpSockFd, (const void *) writeBuffer, sizeof(writeBuffer), 0, (const struct sockaddr *)clientaddr, sizeof(struct sockaddr_in))) < 0) { - perror("sendto error- "); - printf("DEBUG-> sendto error: errorno %d\n", errno); - return -1; + nummod= nummod - numtosend; } return 0; } @@ -218,3 +213,4 @@ int invalidateFromPrefetchCache(char *buffer) { } return 0; } + diff --git a/Robust/src/Runtime/DSTM/interface/addUdpEnhance.h b/Robust/src/Runtime/DSTM/interface/addUdpEnhance.h index 5011df31..38cca125 100644 --- a/Robust/src/Runtime/DSTM/interface/addUdpEnhance.h +++ b/Robust/src/Runtime/DSTM/interface/addUdpEnhance.h @@ -21,7 +21,7 @@ int createUdpSocket(); int udpInit(); void *udpListenBroadcast(void *); -int invalidateObj(trans_req_data_t *); +int invalidateObj(trans_req_data_t *, int, char, int*); int invalidateFromPrefetchCache(char *); -int sendUdpMsg(trans_req_data_t *, struct sockaddr_in *, int); +int sendUdpMsg(trans_req_data_t *, int, int, struct sockaddr_in *, char, int*); #endif diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 40554000..62de32aa 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -472,7 +472,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, break; default: - printf("Error: No response to TRANS_AGREE OR DISAGREE protocol %s, %d\n", __FILE__, __LINE__); + printf("Error: No response to TRANS_AGREE OR DISAGREE protocol control = %d %s, %d\n", control, __FILE__, __LINE__); //TODO Use fixed.trans_id TID since Client may have died break; } @@ -814,7 +814,6 @@ void processVerNoMatch(unsigned int *oidnotfound, } /* Condition to send TRANS_SOFT_ABORT */ if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) { - //if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) { control = TRANS_SOFT_ABORT; /* Send control message */ diff --git a/Robust/src/Runtime/DSTM/interface/sandbox.c b/Robust/src/Runtime/DSTM/interface/sandbox.c index 29e1bbd6..301910f7 100644 --- a/Robust/src/Runtime/DSTM/interface/sandbox.c +++ b/Robust/src/Runtime/DSTM/interface/sandbox.c @@ -48,36 +48,31 @@ int checktrans() { break; objheader_t *headeraddr=(objheader_t*) curr->val; unsigned int machinenum; - if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) { - machinenum = myIpAddr; - } else if ((machinenum = lhashSearch(curr->key)) == 0) { - printf("Error: No such machine %s, %d\n", __func__, __LINE__); - return 1; - } - if(machinenum != myIpAddr) + objheader_t *tmp; + + if (STATUS(headeraddr) & NEW) { + //new objects cannot be stale + } else if ((tmp=mhashSearch(curr->key)) != NULL) { + if (tmp->version!=headeraddr->version) { + //version mismatch + deletehead(head); + return 1; //return 1 when objects are inconsistent + } + } else { + machinenum = lhashSearch(curr->key); head = createList(head, headeraddr, machinenum, c_numelements); + } + curr = curr->next; } } /* Send oid and versions for checking */ - int retval=-1; - if(head != NULL) { - retval = verify(head); - } - - if(retval == 1) { //consistent objects - /* free head */ - deletehead(head); + if(head == NULL) return 0; - } - - if(retval == 0) { - /* free head */ - deletehead(head); - return 1; //return 1 when objects are inconsistent - } - - return 0; + + int retval = verify(head); + deletehead(head); + return retval==0; } nodeElem_t * createList(nodeElem_t *head, objheader_t *headeraddr, unsigned int mid, diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 99daaeb9..eaf70309 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -121,8 +121,6 @@ void printhex(unsigned char *, int); plistnode_t *createPiles(); plistnode_t *sortPiles(plistnode_t *pileptr); - - /******************************* * Send and Recv function calls *******************************/ @@ -935,6 +933,7 @@ plistnode_t *createPiles() { * Sends a transrequest() to each remote machines for objects found remotely * and calls handleLocalReq() to process objects found locally */ int transCommit() { + char buffer[30]; unsigned int tot_bytes_mod, *listmid; plistnode_t *pile, *pile_ptr; char treplyretry; /* keeps track of the common response that needs to be sent */ @@ -944,6 +943,8 @@ int transCommit() { #ifdef SANDBOX abortenabled=0; #endif + struct writestruct writebuffer; + writebuffer.offset=0; #ifdef LOGEVENTS int iii; @@ -1031,18 +1032,18 @@ int transCommit() { } socklist[sockindex] = sd; /* Send bytes of data with TRANS_REQUEST control message */ - send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t)); + send_buf(sd, &writebuffer, &(tosend[sockindex].f), sizeof(fixed_data_t)); /* Send list of machines involved in the transaction */ { int size=sizeof(unsigned int)*(tosend[sockindex].f.mcount); - send_data(sd, tosend[sockindex].listmid, size); + send_buf(sd, &writebuffer, tosend[sockindex].listmid, size); } /* Send oids and version number tuples for objects that are read */ { int size=(sizeof(unsigned int)+sizeof(unsigned short))*(tosend[sockindex].f.numread); - send_data(sd, tosend[sockindex].objread, size); + send_buf(sd, &writebuffer, tosend[sockindex].objread, size); } /* Send objects that are modified */ @@ -1070,7 +1071,7 @@ int transCommit() { memcpy(modptr+offset, headeraddr, size); offset+=size; } - send_data(sd, modptr, tosend[sockindex].f.sum_bytes); + forcesend_buf(sd, &writebuffer, modptr, tosend[sockindex].f.sum_bytes); free(modptr); } else { //handle request locally handleLocalReq(&tosend[sockindex], &transinfo, &getReplyCtrl[sockindex]); @@ -1078,6 +1079,7 @@ int transCommit() { sockindex++; pile = pile->next; } //end of pile processing + /* Recv Ctrl msgs from all machines */ int i; for(i = 0; i < pilecount; i++) { @@ -1123,6 +1125,7 @@ int transCommit() { #endif } } + /* Decide the final response */ if((finalResponse = decideResponse(getReplyCtrl, &treplyretry, pilecount)) == 0) { printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); @@ -1145,17 +1148,6 @@ int transCommit() { free(listmid); return 1; } - - - /* Invalidate objects in other machine cache */ - if(tosend[i].f.nummod > 0) { - if((retval = invalidateObj(&(tosend[i]))) != 0) { - printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__); - free(tosend); - free(listmid); - return 1; - } - } #ifdef ABORTREADERS removetransaction(tosend[i].oidmod,tosend[i].f.nummod); removethisreadtransaction(tosend[i].objread, tosend[i].f.numread); @@ -1168,7 +1160,9 @@ int transCommit() { } #endif #endif - send_data(sd, &finalResponse, sizeof(char)); +#ifndef CACHE + send_data(sd, &finalResponse, sizeof(char)); +#endif } else { /* Complete local processing */ doLocalProcess(finalResponse, &(tosend[i]), &transinfo); @@ -1184,6 +1178,18 @@ int transCommit() { } } +#ifdef CACHE + { + /* Invalidate objects in other machine cache */ + int retval; + if((retval = invalidateObj(tosend, pilecount,finalResponse,socklist)) != 0) { + printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__); + free(tosend); + free(listmid); + return 1; + } + } +#endif /* Free resources */ free(tosend); free(listmid); @@ -1195,7 +1201,7 @@ int transCommit() { if(treplyretryCount >= NUM_TRY_TO_COMMIT) exponentialdelay(); else - randomdelay(); + randomdelay(); #ifdef TRANSSTATS nSoftAbort++; #endif @@ -1203,9 +1209,6 @@ int transCommit() { /* Retry trans commit procedure during soft_abort case */ } while (treplyretry); - exponential_backoff.tv_sec = 0; - exponential_backoff.tv_nsec = (long)(10000);//10 microsec_ - if(finalResponse == TRANS_ABORT) { #ifdef TRANSSTATS LOGEVENT('A'); @@ -1283,7 +1286,7 @@ void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, cha transinfo->modptr = NULL; transinfo->numlocked = numoidlocked; transinfo->numnotfound = numoidnotfound; - + /* Condition to send TRANS_AGREE */ if(v_matchnolock == tdata->f.numread + tdata->f.nummod) { *getReplyCtrl = TRANS_AGREE; @@ -1302,16 +1305,6 @@ void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_da return; } } else if(finalResponse == TRANS_COMMIT) { -#ifdef CACHE - /* Invalidate objects in other machine cache */ - if(tdata->f.nummod > 0) { - int retval; - if((retval = invalidateObj(tdata)) != 0) { - printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__); - return; - } - } -#endif if(transComProcess(tdata, transinfo) != 0) { printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__); fflush(stdout); -- 2.34.1