From d6f780361bf72f4e2dffb524e68c2769b33d94d9 Mon Sep 17 00:00:00 2001 From: adash Date: Fri, 16 Apr 2010 00:18:03 +0000 Subject: [PATCH] grab locks on sockpool for all communications involving cache and prefetch --- .../Runtime/DSTM/interface_recovery/dstm.h | 4 +- .../DSTM/interface_recovery/dstmserver.c | 6 ++- .../Runtime/DSTM/interface_recovery/trans.c | 46 +++++++++++++++++-- 3 files changed, 49 insertions(+), 7 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface_recovery/dstm.h b/Robust/src/Runtime/DSTM/interface_recovery/dstm.h index fd0bbd36..3349b21a 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/dstm.h +++ b/Robust/src/Runtime/DSTM/interface_recovery/dstm.h @@ -86,8 +86,8 @@ //Prefetch tuning paramters //#define RETRYINTERVAL 20 //N (For Em3d, SOR, Moldyn benchmarks) //#define SHUTDOWNINTERVAL 3 //M -#define RETRYINTERVAL 75 //N (For MatrixMultiply, 2DFFT benchmarks) -#define SHUTDOWNINTERVAL 1 //M +#define RETRYINTERVAL 60 //N (For MatrixMultiply, 2DFFT benchmarks) +#define SHUTDOWNINTERVAL 10 //M #define NUM_TRY_TO_COMMIT 2 #define MEM_ALLOC_THRESHOLD 20485760//20MB diff --git a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c index 8e82efcb..163a1684 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c @@ -1626,7 +1626,11 @@ int prefetchReq(int acceptfd) { freeSockWithLock(transPResponseSocketPool, mid, sd); } mid=oidmid.mid; - sd = getSockWithLock(transPResponseSocketPool, mid); + //sd = getSockWithLock(transPResponseSocketPool, mid); + if((sd = getSockWithLock(transPResponseSocketPool, mid)) < 0) { + printf("%s() Socket Create Error at %s, %d\n", __func__, __FILE__, __LINE__); + return -1; + } } short offsetarry[numoffset]; recv_data((int) acceptfd, offsetarry, numoffset*sizeof(short)); diff --git a/Robust/src/Runtime/DSTM/interface_recovery/trans.c b/Robust/src/Runtime/DSTM/interface_recovery/trans.c index 38954dab..edc9c1be 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/trans.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/trans.c @@ -463,8 +463,14 @@ void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endof /* Send Prefetch Request */ prefetchpile_t *ptr = pilehead; while(ptr != NULL) { - int sd = getSock2(transPrefetchSockPool, ptr->mid); + //int sd = getSock2(transPrefetchSockPool, ptr->mid); + int sd; + if((sd = getSockWithLock(transPrefetchSockPool, ptr->mid)) < 0) { + printf("%s() Socket Create Error at %s, %d\n", __func__, __FILE__, __LINE__); + return; + } sendPrefetchReq(ptr, sd); + freeSockWithLock(transPrefetchSockPool, ptr->mid, sd); ptr = ptr->next; } @@ -1663,7 +1669,12 @@ void *getRemoteObj(unsigned int mnum, unsigned int oid) { objheader_t *h; void *objcopy = NULL; - int sd = getSock2(transReadSockPool, mnum); + //int sd = getSock2(transReadSockPool, mnum); + int sd; + if((sd = getSockWithLock(transReadSockPool, mnum)) < 0) { + printf("%s() Socket Create Error at %s, %d\n", __func__, __FILE__, __LINE__); + return NULL; + } char readrequest[sizeof(char)+sizeof(unsigned int)]; readrequest[0] = READ_REQUEST; *((unsigned int *)(&readrequest[1])) = oid; @@ -1700,6 +1711,7 @@ void *getRemoteObj(unsigned int mnum, unsigned int oid) { totalObjSize += size; #endif } + freeSockWithLock(transReadSockPool, mnum, sd); return objcopy; } @@ -2840,6 +2852,7 @@ unsigned short getObjType(unsigned int oid) { objheader_t *objheader; unsigned short numoffset[] ={0}; short fieldoffset[] ={}; + int sd=0; if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL) { #ifdef CACHE @@ -2851,10 +2864,18 @@ unsigned short getObjType(unsigned int oid) { unsigned int machineID; static int flipBit = 0; machineID = (flipBit)?(getPrimaryMachine(mid)):(getBackupMachine(mid)); - int sd = getSock2(transReadSockPool, machineID); + //int sd = getSock2(transReadSockPool, machineID); + if((sd = getSockWithLock(transReadSockPool, machineID)) < 0) { + printf("%s() Socket Create Error at %s, %d\n", __func__, __FILE__, __LINE__); + return 0; + } #else unsigned int mid = lhashSearch(oid); - int sd = getSock2(transReadSockPool, mid); + //int sd = getSock2(transReadSockPool, mid); + if((sd = getSockWithLock(transReadSockPool, mid)) < 0) { + printf("%s() Socket Create Error at %s, %d\n", __func__, __FILE__, __LINE__); + return 0; + } #endif char remotereadrequest[sizeof(char)+sizeof(unsigned int)]; remotereadrequest[0] = READ_REQUEST; @@ -2881,18 +2902,34 @@ unsigned short getObjType(unsigned int oid) { pthread_mutex_unlock(&prefetchcache_mutex); recv_data(sd, objheader, size); prehashInsert(oid, objheader); +#ifdef RECOVERY + freeSockWithLock(transReadSockPool, machineID, sd); +#else + freeSockWithLock(transReadSockPool, mid, sd); return TYPE(objheader); +#endif #else char *buffer; if((buffer = calloc(1, size)) == NULL) { printf("%s() Calloc Error %s at line %d\n", __func__, __FILE__, __LINE__); fflush(stdout); +#ifdef RECOVERY + freeSockWithLock(transReadSockPool, machineID, sd); +#else + freeSockWithLock(transReadSockPool, mid, sd); +#endif return 0; } recv_data(sd, buffer, size); objheader = (objheader_t *)buffer; unsigned short type = TYPE(objheader); free(buffer); +#ifdef RECOVERY + freeSockWithLock(transReadSockPool, machineID, sd); +#else + freeSockWithLock(transReadSockPool, mid, sd); +#endif + return type; #endif } @@ -4060,6 +4097,7 @@ void printRecoveryStat() { } printf("**************************\n\n"); fflush(stdout); + fflush(stdout); #else printf("No stat\n"); #endif -- 2.34.1