From: adash Date: Wed, 3 Mar 2010 23:53:11 +0000 (+0000) Subject: changes for prefetch/caching and performance improvements X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=cfe36eac1ddc6daed864dce98a0f7e6b58f566a5;p=IRC.git changes for prefetch/caching and performance improvements --- diff --git a/Robust/src/IR/Flat/BuildCode.java b/Robust/src/IR/Flat/BuildCode.java index e7b6291b..182a9aa2 100644 --- a/Robust/src/IR/Flat/BuildCode.java +++ b/Robust/src/IR/Flat/BuildCode.java @@ -313,10 +313,15 @@ public class BuildCode { } if (state.DSM) { - outmethod.println("#ifdef TRANSSTATS \n"); - outmethod.println("handle();\n"); - outmethod.println("#endif\n"); + if (state.DSMRECOVERYSTATS) + outmethod.println("handle();\n"); + else { + outmethod.println("#ifdef TRANSSTATS \n"); + outmethod.println("handle();\n"); + outmethod.println("#endif\n"); + } } + if (state.THREAD||state.DSM||state.SINGLETM) { outmethod.println("initializethreads();"); } diff --git a/Robust/src/Runtime/DSTM/interface_recovery/addUdpEnhance.c b/Robust/src/Runtime/DSTM/interface_recovery/addUdpEnhance.c index 5c9363a0..c7e3f9f4 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/addUdpEnhance.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/addUdpEnhance.c @@ -113,6 +113,29 @@ void *udpListenBroadcast(void *sockfd) { /* Function that invalidate objects that * have been currently modified * returns -1 on error and 0 on success */ +int invalidateObj(trans_req_data_t *tdata, int pilecount, char finalresponse, int *socklist) { + struct timeval start, end; + struct sockaddr_in clientaddr; + int retval; + int i; + int nummod=0; + for(i=0;if.nummod < maxObjsPerMsg) { - /* send single udp msg */ - int iteration = 0; - if((retval = sendUdpMsg(tdata, &clientaddr, iteration)) < 0) { - printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__); - return -1; - } - } else { - /* Split into several udp msgs */ - int maxUdpMsg = tdata->f.nummod/maxObjsPerMsg; - if (tdata->f.nummod%maxObjsPerMsg) maxUdpMsg++; - int i; - for(i = 1; i <= maxUdpMsg; i++) { - if((retval = sendUdpMsg(tdata, &clientaddr, i)) < 0) { - printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__); - return -1; + /* send single udp msg */ + if((retval = sendUdpMsg(tdata, pilecount, nummod, &clientaddr, finalresponse, socklist)) < 0) { + printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__); + return -1; + } + return 0; +} + +#endif + +/* Function sends a udp broadcast, also distinguishes + * msg size to be sent based on the total number of objects modified + * returns -1 on error and 0 on success */ +int sendUdpMsg(trans_req_data_t *tdata, int pilecount, int nummod, struct sockaddr_in *clientaddr, char finalresponse, int *socklist) { + char writeBuffer[MAX_SIZE]; + int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int); + int offset = 0; + int i=0,j=0; + + *((short *)&writeBuffer[0]) = INVALIDATE_OBJS; //control msg + offset += sizeof(short); + *((unsigned int *)(writeBuffer+offset)) = myIpAddr; //mid sending invalidation + offset += sizeof(unsigned int); + + while(nummod>0) { + int numtosend=nummod>maxObjsPerMsg?maxObjsPerMsg:nummod; + int localoffset=offset; + int sentmsgs=0; + *((short *)(writeBuffer+offset)) = (short) (sizeof(unsigned int) * numtosend); + localoffset += sizeof(short); + + for(; j < pilecount; j++) { + for(; i < tdata[j].f.nummod; i++) { + *((unsigned int *) (writeBuffer+localoffset)) = tdata[j].oidmod[i]; //copy objects + localoffset += sizeof(unsigned int); + if ((++sentmsgs)==numtosend) { + i++; + goto send; + } } + i=0; } +send: + if(sendto(udpSockFd, (const void *) writeBuffer, localoffset, 0, (const struct sockaddr *)clientaddr, sizeof(struct sockaddr_in)) < 0) { + perror("sendto error- "); + printf("DEBUG-> sendto error: errorno %d\n", errno); + return -1; + } + nummod= nummod - numtosend; } return 0; } +#if 0 + /* Function sends a udp broadcast, also distinguishes * msg size to be sent based on the iteration flag * returns -1 on error and 0 on success */ @@ -186,6 +242,7 @@ int sendUdpMsg(trans_req_data_t *tdata, struct sockaddr_in *clientaddr, int iter } return 0; } +#endif /* Function searches given oid in prefetch cache and invalidates obj from cache * returns -1 on error and 0 on success */ @@ -209,7 +266,9 @@ int invalidateFromPrefetchCache(char *buffer) { objheader_t *header; /* Lookup Objects in prefetch cache and remove them */ if(((header = prehashSearch(oid)) != NULL)) { - prehashRemove(oid); + //Keep invalid objects + STATUS(header)=DIRTY; + //prehashRemove(oid); } offset += sizeof(unsigned int); } diff --git a/Robust/src/Runtime/DSTM/interface_recovery/addUdpEnhance.h b/Robust/src/Runtime/DSTM/interface_recovery/addUdpEnhance.h index b3964bce..295f8af4 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/addUdpEnhance.h +++ b/Robust/src/Runtime/DSTM/interface_recovery/addUdpEnhance.h @@ -21,7 +21,9 @@ int createUdpSocket(); int udpInit(); void *udpListenBroadcast(void *); -int invalidateObj(trans_req_data_t *); +//int invalidateObj(trans_req_data_t *); +int invalidateObj(trans_req_data_t *, int, char, int*); int invalidateFromPrefetchCache(char *); -int sendUdpMsg(trans_req_data_t *, struct sockaddr_in *, int); +//int sendUdpMsg(trans_req_data_t *, struct sockaddr_in *, int); +int sendUdpMsg(trans_req_data_t *, int, int, struct sockaddr_in *, char, int*); #endif diff --git a/Robust/src/Runtime/DSTM/interface_recovery/altmlookup.h b/Robust/src/Runtime/DSTM/interface_recovery/altmlookup.h index 34984bdc..877dd3a4 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/altmlookup.h +++ b/Robust/src/Runtime/DSTM/interface_recovery/altmlookup.h @@ -1,5 +1,5 @@ -#ifndef _ALTMLOOKUP_H_ -#define _ALTMLOOKUP_H_ +#ifndef _MLOOKUP_H_ +#define _MLOOKUP_H_ #include #include diff --git a/Robust/src/Runtime/DSTM/interface_recovery/dstm.h b/Robust/src/Runtime/DSTM/interface_recovery/dstm.h index 5f4eb9d2..1583232d 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/dstm.h +++ b/Robust/src/Runtime/DSTM/interface_recovery/dstm.h @@ -242,6 +242,7 @@ typedef struct recoverystat { unsigned int deadMachine; long long elapsedTime; unsigned int recoveredData; + unsigned int recvData; } recovery_stat_t; #endif @@ -363,9 +364,9 @@ void sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size); void prefetch(int, int, unsigned int *, unsigned short *, short*); void *transPrefetch(void *); void *mcqProcess(void *); -prefetchpile_t *foundLocal(char *); // returns node with prefetch elements(oids, offsets) -int lookupObject(unsigned int * oid, short offset); -int checkoid(unsigned int oid); +prefetchpile_t *foundLocal(char *,int); // returns node with prefetch elements(oids, offsets) +int lookupObject(unsigned int * oid, short offset, int *); +int checkoid(unsigned int oid, int); int transPrefetchProcess(int **, short); void sendPrefetchReq(prefetchpile_t*, int); void sendPrefetchReqnew(prefetchpile_t*, int); diff --git a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c index 811888cd..56a26b57 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c @@ -4,7 +4,7 @@ #include #include #include "dstm.h" -#include "mlookup.h" +#include "altmlookup.h" #include "llookup.h" #include "threadnotify.h" #include "prefetch.h" diff --git a/Robust/src/Runtime/DSTM/interface_recovery/queue.c b/Robust/src/Runtime/DSTM/interface_recovery/queue.c index fcb58191..06641b62 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/queue.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/queue.c @@ -58,10 +58,10 @@ void movehead(int size) { void * gettail() { while(tailoffset==headoffset) { //Sleep - // pthread_mutex_lock(&qlock); - // if (tailoffset==headoffset) - // pthread_cond_wait(&qcond, &qlock); - // pthread_mutex_unlock(&qlock); + pthread_mutex_lock(&qlock); + if (tailoffset==headoffset) + pthread_cond_wait(&qcond, &qlock); + pthread_mutex_unlock(&qlock); } if (*((int *)(memory+tailoffset))==-1) { tailoffset=0; //do loop @@ -70,6 +70,38 @@ void * gettail() { return memory+tailoffset+sizeof(int); } +int numavailable() { + int tmp=tailoffset; + int available=0; + if (*((int *)(memory+tmp))==-1) { + tmp=0; + } + while(tmp!=headoffset) { + available++; + tmp=tmp+*((int *)(memory+tmp)); + if (tmp>QSIZE|| (*((int *)(memory+tmp))==-1)) { + break; + } + } + return available; +} + +void incmulttail(int num) { + int i; + for(i=0;iQSIZE) + tailoffset=0; + else + tailoffset=tmpoffset; + } +} + +void resetqueue() { + headoffset=0; + tailoffset=0; +} + void inctail() { int tmpoffset=tailoffset+*((int *)(memory+tailoffset)); if (tmpoffset>QSIZE) diff --git a/Robust/src/Runtime/DSTM/interface_recovery/queue.h b/Robust/src/Runtime/DSTM/interface_recovery/queue.h index 2e1aa9ec..e284615d 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/queue.h +++ b/Robust/src/Runtime/DSTM/interface_recovery/queue.h @@ -13,4 +13,7 @@ void movehead(int size); void * gettail(); void inctail(); void predealloc(); +int numavailable(); +void resetqueue(); +void incmulttail(int); #endif diff --git a/Robust/src/Runtime/DSTM/interface_recovery/trans.c b/Robust/src/Runtime/DSTM/interface_recovery/trans.c index b751584b..31aa60ab 100644 --- a/Robust/src/Runtime/DSTM/interface_recovery/trans.c +++ b/Robust/src/Runtime/DSTM/interface_recovery/trans.c @@ -1,7 +1,7 @@ #include "dstm.h" #include "ip.h" #include "machinepile.h" -#include "mlookup.h" +#include "altmlookup.h" #include "llookup.h" #include "plookup.h" #include "prelookup.h" @@ -352,17 +352,33 @@ char* midtoIPString(unsigned int mid){ return ip; } #endif + +#define INLINEPREFETCH +#define PREFTHRESHOLD 0 + /* This function is a prefetch call generated by the compiler that * populates the shared primary prefetch queue*/ void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) { /* Allocate for the queue node*/ int qnodesize = 2*sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short); int len; - char * node= getmemory(qnodesize); +#ifdef INLINEPREFETCH + int attempted=0; + char *node; + do { + node=getmemory(qnodesize); + if (node==NULL&&attempted) + break; + if (node!=NULL) { +#else + char *node=getmemory(qnodesize); +#endif int top=endoffsets[ntuples-1]; - if (node==NULL) + if (node==NULL) { + LOGEVENT('D'); return; + } /* Set queue node values */ /* TODO: Remove this after testing */ @@ -375,8 +391,35 @@ void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endof memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short)); memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short)); +#ifdef INLINEPREFETCH + movehead(qnodesize); + } + int numpref=numavailable(); + attempted=1; + + if (node==NULL && numpref!=0 || numpref>=PREFTHRESHOLD) { + node=gettail(); + prefetchpile_t *pilehead = foundLocal(node,numpref); + if (pilehead!=NULL) { + // Get sock from shared pool + + /* Send Prefetch Request */ + prefetchpile_t *ptr = pilehead; + while(ptr != NULL) { + int sd = getSock2(transPrefetchSockPool, ptr->mid); + sendPrefetchReq(ptr, sd); + ptr = ptr->next; + } + + mcdealloc(pilehead); + } + resetqueue(); + }//end do prefetch if condition + } while(node==NULL); +#else /* Lock and insert into primary prefetch queue */ movehead(qnodesize); +#endif } /* This function starts up the transaction runtime. */ @@ -503,12 +546,16 @@ void transInit() { retval=pthread_create(&tPrefetch, NULL, transPrefetchNew, NULL); } while(retval!=0); #else +#ifndef INLINEPREFETCH do { retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL); } while(retval!=0); #endif +#endif +#ifndef INLINEPREFETCH pthread_detach(tPrefetch); #endif +#endif } /* This function stops the threads spawned */ @@ -1175,7 +1222,20 @@ int transCommit() { #ifdef DEBUG printf("%s-> Final Response: %d\n", __func__, (int)finalResponse); #endif - + +#ifdef CACHE + if (finalResponse == TRANS_COMMIT) { + /* Invalidate objects in other machine cache */ + int retval; + if((retval = invalidateObj(tosend, pilecount,finalResponse,socklist)) != 0) { + printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__); + free(tosend); + free(listmid); + return 1; + } + } +#endif + /* Send responses to all machines */ for(i = 0; i < pilecount; i++) { int sd = socklist[i]; @@ -1194,6 +1254,7 @@ int transCommit() { return 1; } +#if 0 /* Invalidate objects in other machine cache */ if(tosend[i].f.nummod > 0) { if((retval = invalidateObj(&(tosend[i]))) != 0) { @@ -1203,6 +1264,7 @@ int transCommit() { return 1; } } +#endif #ifdef ABORTREADERS removetransaction(tosend[i].oidmod,tosend[i].f.nummod); removethisreadtransaction(tosend[i].objread, tosend[i].f.numread); @@ -1369,16 +1431,6 @@ void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_da return; } } else if(finalResponse == TRANS_COMMIT) { -#ifdef CACHE - /* Invalidate objects in other machine cache */ - if(tdata->f.nummod > 0) { - int retval; - if((retval = invalidateObj(tdata)) != 0) { - printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__); - return; - } - } -#endif if(transComProcess(tdata, transinfo) != 0) { printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__); fflush(stdout); @@ -1833,6 +1885,70 @@ int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) { return 0; } +prefetchpile_t *foundLocal(char *ptr, int numprefetches) { + int i; + int j; + prefetchpile_t * head=NULL; + + for(j=0;j 1) { + return 0; + } + } + } else { + return 0; + } + if(TYPE(header) >= NUMCLASSES) { + int elementsize = classsize[TYPE(header)]; + struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t)); + int length = ao->___length___; + /* Check if array out of bounds */ + if(offset < 0 || offset >= length) { + //if yes treat the object as found + (*oid)=0; + return 1; + } + (*oid) = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*offset))); + return 1; + } else { + (*oid) = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset)); + return 1; + } +} + +#if 0 int lookupObject(unsigned int * oid, short offset) { objheader_t *header; if ((header=mhashSearch(*oid))!=NULL) { @@ -1918,9 +2091,42 @@ int lookupObject(unsigned int * oid, short offset) { return 1; } } +#endif + +/* This function is called by the thread calling transPrefetch */ +void *transPrefetch(void *t) { + while(1) { + /* read from prefetch queue */ + void *node=gettail(); + /* Check if the tuples are found locally, if yes then reduce them further*/ + /* and group requests by remote machine ids by calling the makePreGroups() */ + int count=numavailable(); + prefetchpile_t *pilehead = foundLocal(node, count); + if (pilehead!=NULL) { + // Get sock from shared pool + + /* Send Prefetch Request */ + prefetchpile_t *ptr = pilehead; + while(ptr != NULL) { + int sd = getSock2(transPrefetchSockPool, ptr->mid); + sendPrefetchReq(ptr, sd); + ptr = ptr->next; + } + + /* Release socket */ + // freeSock(transPrefetchSockPool, pilehead->mid, sd); + + /* Deallocated pilehead */ + mcdealloc(pilehead); + } + // Deallocate the prefetch queue pile node + incmulttail(count); + } +} /* This function is called by the thread calling transPrefetch */ +#if 0 void *transPrefetch(void *t) { while(1) { /* read from prefetch queue */ @@ -1950,6 +2156,7 @@ void *transPrefetch(void *t) { inctail(); } } +#endif void sendPrefetchReqnew(prefetchpile_t *mcpilenode, int sd) { objpile_t *tmp; @@ -2532,7 +2739,8 @@ void duplicateLostObjects(unsigned int mid){ numRecovery++; long long st; long long fi; - unsigned int dupeSize = 0; // to calculate the size of backed up data + unsigned int dupeSize; // to calculate the size of backed up data + unsigned int recvDataSize = 0; // to calculate the size of recv data st = myrdtsc(); // to get clock recoverStat[numRecovery-1].deadMachine = mid; @@ -2577,7 +2785,6 @@ void duplicateLostObjects(unsigned int mid){ if(((psd = getSockWithLock(transPrefetchSockPool, originalMid)) < 0 ) || ((bsd = getSockWithLock(transPrefetchSockPool,backupMid)) <0)) { printf("%s -> Socket create error\n",__func__); exit(0); - } /* request for original */ char duperequest; @@ -2591,17 +2798,19 @@ void duplicateLostObjects(unsigned int mid){ send_data(bsd, &originalMid, sizeof(unsigned int)); char p_response,b_response; - unsigned int p_receivedSize,b_receivedSize; + unsigned int p_receivedSize,b_receivedSize; recv_data(psd, &p_response, sizeof(char)); recv_data(psd, &p_receivedSize, sizeof(unsigned int)); dupeSize += p_receivedSize; // size of primary data + recvDataSize += p_receivedSize; // size of primary data recv_data(bsd, &b_response, sizeof(char)); recv_data(bsd, &b_receivedSize, sizeof(unsigned int)); dupeSize += b_receivedSize; // size of backup data + recvDataSize += b_receivedSize; // size of backup data if(p_response != DUPLICATION_COMPLETE || b_response != DUPLICATION_COMPLETE) { @@ -2616,6 +2825,7 @@ void duplicateLostObjects(unsigned int mid){ fi = myrdtsc(); recoverStat[numRecovery-1].elapsedTime = (fi-st)/CPU_FREQ; recoverStat[numRecovery-1].recoveredData = dupeSize; + recoverStat[numRecovery-1].recvData = recvDataSize; printRecoveryStat(); #endif @@ -3479,7 +3689,8 @@ void printRecoveryStat() { for(i=0; i < numRecovery;i++) { printf("Dead Machine = %s\n",midtoIPString(recoverStat[i].deadMachine)); printf("Recoveryed data(byte) = %u\n",recoverStat[i].recoveredData); - printf("Recovery Time(us) = %ld\n",recoverStat[i].elapsedTime); + printf("Recovery Time(ms) = %ld\n",recoverStat[i].elapsedTime); + printf("Data recv(bytes) = %ld\n",recoverStat[i].recvData); } printf("**************************\n\n"); fflush(stdout); diff --git a/Robust/src/Runtime/runtime.c b/Robust/src/Runtime/runtime.c index 56c003dc..9e939227 100644 --- a/Robust/src/Runtime/runtime.c +++ b/Robust/src/Runtime/runtime.c @@ -232,9 +232,11 @@ void CALL02(___System______deepArrayCopy____L___Object____L___Object___, struct void CALL11(___System______exit____I,int ___status___, int ___status___) { #ifdef TRANSSTATS +#ifndef RECOVERY printf("numTransCommit = %d\n", numTransCommit); printf("numTransAbort = %d\n", numTransAbort); printf("nSoftAbort = %d\n", nSoftAbort); +#endif #ifdef STM printf("nSoftAbortCommit = %d\n", nSoftAbortCommit); printf("nSoftAbortAbort = %d\n", nSoftAbortAbort);