From: adash Date: Sat, 13 Mar 2010 01:59:42 +0000 (+0000) Subject: changes for optimizations X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=352a0863ffe7315faeb8bfd91f32f7e5b319163e;p=IRC.git changes for optimizations --- diff --git a/Robust/src/Runtime/DSTM/interface_recovery/dstm.h b/Robust/src/Runtime/DSTM/interface_recovery/dstm.h index dc0965a1..31212ea4 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/dstm.h +++ b/Robust/src/Runtime/DSTM/interface_recovery/dstm.h @@ -84,8 +84,9 @@ //Prefetch tuning paramters //#define RETRYINTERVAL 20 //N (For Em3d, SOR, Moldyn benchmarks) //#define SHUTDOWNINTERVAL 3 //M -#define RETRYINTERVAL 100 //N (For MatrixMultiply, 2DFFT benchmarks) +#define RETRYINTERVAL 75 //N (For MatrixMultiply, 2DFFT benchmarks) #define SHUTDOWNINTERVAL 1 //M +#define NUM_TRY_TO_COMMIT 2 #include #include diff --git a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c index fa17c3da..cf5a0611 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c @@ -13,6 +13,7 @@ #include "thread.h" #endif #include "gCollect.h" +#include "readstruct.h" #ifdef RECOVERY #include @@ -294,7 +295,7 @@ void *dstmAccept(void *acceptfd) { trans_commit_data_t transinfo; unsigned short objType, *versionarry, version; unsigned int *oidarry, numoid, mid, threadid; - int n, v; + int n, v; #ifdef DEBUG printf("%s-> Entering dstmAccept\n", __func__); fflush(stdout); @@ -302,6 +303,7 @@ void *dstmAccept(void *acceptfd) { /* Receive control messages from other machines */ while(1) { int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char)); + //int ret=recv_data_errorcode_buf((int)acceptfd, &readbuffer, &control, sizeof(char)); dupeptr = NULL; if (ret==0) @@ -366,6 +368,7 @@ void *dstmAccept(void *acceptfd) { transinfo.modptr = NULL; transinfo.numlocked = 0; transinfo.numnotfound = 0; + //if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) { if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) { printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__); pthread_exit(NULL); @@ -920,6 +923,7 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { return 0; /* Read modified objects */ + //printf("fixed.sum_bytes= %d\n", fixed.sum_bytes); if(fixed.nummod != 0) { if ((modptr = calloc(1, fixed.sum_bytes)) == NULL) { printf("calloc error for modified objects %s, %d\n", __FILE__, __LINE__); @@ -943,12 +947,15 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { return 1; } ptr = (char *) modptr; + //printf("fixed.nummod= %d\n", fixed.nummod); + //fflush(stdout); for(i = 0 ; i < fixed.nummod; i++) { int tmpsize=0; headaddr = (objheader_t *) ptr; oid = OID(headaddr); oidmod[i] = oid; GETSIZE(tmpsize, headaddr); + //printf("i= %d, tmpsize= %d, oid= %u\n", i, tmpsize, oid); ptr += sizeof(objheader_t) + tmpsize; } #ifdef DEBUG diff --git a/Robust/src/Runtime/DSTM/interface_recovery/readstruct.h b/Robust/src/Runtime/DSTM/interface_recovery/readstruct.h new file mode 100755 index 00000000..2cd83118 --- /dev/null +++ b/Robust/src/Runtime/DSTM/interface_recovery/readstruct.h @@ -0,0 +1,22 @@ +#ifndef READSTRUCT_H +#define READSTRUCT_H +#define MAXBUF 1024 +struct readstruct { + char buf[MAXBUF]; + int head; + int tail; +}; + +#define WMAXBUF 2048 +#define WTOP 512 +struct writestruct { + char buf[WMAXBUF]; + int offset; +}; + +void recv_data_buf(int fd, struct readstruct *, void *buffer, int buflen); +int recv_data_errorcode_buf(int fd, struct readstruct *, void *buffer, int buflen); +void send_buf(int fd, struct writestruct * sendbuffer, void *buffer, int buflen); +void forcesend_buf(int fd, struct writestruct * sendbuffer, void *buffer, int buflen); + +#endif diff --git a/Robust/src/Runtime/DSTM/interface_recovery/trans.c b/Robust/src/Runtime/DSTM/interface_recovery/trans.c index a9030df0..88dd8f6e 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/trans.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/trans.c @@ -36,6 +36,9 @@ __thread objstr_t *t_cache; __thread struct ___Object___ *revertlist; +__thread struct timespec exponential_backoff; +__thread int count_exponential_backoff; +__thread const int max_exponential_backoff = 1000; // safety limit #ifdef ABORTREADERS __thread int t_abort; __thread jmp_buf aborttrans; @@ -612,6 +615,19 @@ void randomdelay() { return; } +/* This functions inserts exponential backoff delays in the order of msec + * Mostly used when transaction commits retry*/ +void exponentialdelay() { + exponential_backoff.tv_nsec = exponential_backoff.tv_nsec * 2; + nanosleep(&exponential_backoff, NULL); + ++count_exponential_backoff; + if (count_exponential_backoff >= max_exponential_backoff) { + printf(" reached max_exponential_backoff at %s, %s(), %d\n", __FILE__, __func__, __LINE__); + exit(-1); + } + return; +} + /* This function initializes things required in the transaction start*/ void transStart() { t_cache = objstrCreate(1048576); @@ -991,14 +1007,15 @@ plistnode_t *createPiles() { if((mid=lhashSearch(oid)) == 0) { mid = myIpAddr; } - - pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements); + + pile = pInsert(pile, headeraddr, getPrimaryMachine(mid), c_numelements); if(numLiveHostsInSystem > 1) { if(makedirty) { STATUS(headeraddr) = DIRTY; + pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements); } - pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements); + //pile = pInsert(pile, headeraddr, getBackupMachine(mid), c_numelements); } #else // Get machine location for object id (and whether local or not) @@ -1093,6 +1110,11 @@ int transCommit() { } #endif + int treplyretryCount = 0; + /* Initialize timeout for exponential delay */ + exponential_backoff.tv_sec = 0; + exponential_backoff.tv_nsec = (long)(10000);//10 microsec + count_exponential_backoff = 0; do { treplyretry = 0; @@ -1159,21 +1181,21 @@ int transCommit() { } socklist[sockindex] = sd; /* Send bytes of data with TRANS_REQUEST control message */ - //send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t)); - send_buf(sd, &writebuffer, &(tosend[sockindex].f), sizeof(fixed_data_t)); + send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t)); + //send_buf(sd, &writebuffer, &(tosend[sockindex].f), sizeof(fixed_data_t)); /* Send list of machines involved in the transaction */ { int size=sizeof(unsigned int)*(tosend[sockindex].f.mcount); - //send_data(sd, tosend[sockindex].listmid, size); - send_buf(sd, &writebuffer, tosend[sockindex].listmid, size); + send_data(sd, tosend[sockindex].listmid, size); + //send_buf(sd, &writebuffer, tosend[sockindex].listmid, size); } /* Send oids and version number tuples for objects that are read */ { int size=(sizeof(unsigned int)+sizeof(unsigned short))*(tosend[sockindex].f.numread); - //send_data(sd, tosend[sockindex].objread, size); - send_buf(sd, &writebuffer, tosend[sockindex].objread, size); + send_data(sd, tosend[sockindex].objread, size); + //send_buf(sd, &writebuffer, tosend[sockindex].objread, size); } /* Send objects that are modified */ @@ -1186,6 +1208,7 @@ int transCommit() { } int offset = 0; int i; + //printf("tosend[sockindex].f.nummod = %d\n", tosend[sockindex].f.nummod); for(i = 0; i < tosend[sockindex].f.nummod ; i++) { int size; objheader_t *headeraddr; @@ -1197,17 +1220,20 @@ int transCommit() { return 1; } GETSIZE(size,headeraddr); + //printf("i= %d, tmpsize= %d, oid= %u\n", i, size, OID(headeraddr)); size+=sizeof(objheader_t); memcpy(modptr+offset, headeraddr, size); offset+=size; } - //send_data(sd, modptr, tosend[sockindex].f.sum_bytes); - forcesend_buf(sd, &writebuffer, modptr, tosend[sockindex].f.sum_bytes); + //printf("tosend[sockindex].f.sum_bytes= %d\n", tosend[sockindex].f.sum_bytes); + //fflush(stdout); + send_data(sd, modptr, tosend[sockindex].f.sum_bytes); + //forcesend_buf(sd, &writebuffer, modptr, tosend[sockindex].f.sum_bytes); #ifdef RECOVERY /* send transaction id, number of machine involved, machine ids */ - //send_data(sd, &transID, sizeof(unsigned int)); - forcesend_buf(sd, &writebuffer, &transID, sizeof(unsigned int)); + send_data(sd, &transID, sizeof(unsigned int)); + //forcesend_buf(sd, &writebuffer, &transID, sizeof(unsigned int)); #endif free(modptr); } else { //handle request locally @@ -1389,13 +1415,15 @@ int transCommit() { pDelete(pile_ptr); /* wait a random amount of time before retrying to commit transaction*/ if(treplyretry) { - randomdelay(); + treplyretryCount++; + //if(treplyretryCount >= NUM_TRY_TO_COMMIT) + // exponentialdelay(); + //else + randomdelay(); #ifdef TRANSSTATS nSoftAbort++; #endif } - - } while (treplyretry && deadmid != -1); if(finalResponse == TRANS_ABORT) { @@ -2005,6 +2033,15 @@ prefetchpile_t *foundLocal(char *ptr, int numprefetches) { //Add to remote requests machinenum=lhashSearch(oid); +#ifdef RECOVERY + static int flipBit = 0;// Used to distribute requests between primary and backup evenly + // either primary or backup machine + machinenum = (flipBit)?getPrimaryMachine(lhashSearch(oid)):getBackupMachine(lhashSearch(oid)); + flipBit ^= 1; +#ifdef DEBUG + printf("mindex:%d, oid:%d, machinenumber:%s\n", machinenumber, oid, midtoIPString(machinenumber)); +#endif +#endif insertPile(machinenum, oid, endindex-newbase, &arryfields[newbase], &head); tuple: ; @@ -2362,7 +2399,7 @@ unsigned short getObjType(unsigned int oid) { #ifdef RECOVERY unsigned int mid = lhashSearch(oid); unsigned int machineID; - static flipBit = 0; + static int flipBit = 0; machineID = (flipBit)?(getPrimaryMachine(mid)):(getBackupMachine(mid)); int sd = getSock2(transReadSockPool, machineID); #else