From 1c25e9e3361d5f6ad879249f2a4fd535a4dc2f66 Mon Sep 17 00:00:00 2001 From: adash Date: Wed, 14 Oct 2009 00:14:00 +0000 Subject: [PATCH] changes to manual prefetching (combine severals sends/recvs) add logevent to SingleTM microbenchmark --- .../MicroBenchmarks/SingleObjectMod.java | 1 + Robust/src/ClassLibrary/System.java | 1 + Robust/src/Runtime/DSTM/interface/prefetch.c | 172 +++++++++++------- Robust/src/Runtime/DSTM/interface/prefetch.h | 6 +- Robust/src/Runtime/runtime.c | 25 ++- 5 files changed, 132 insertions(+), 73 deletions(-) diff --git a/Robust/src/Benchmarks/SingleTM/MicroBenchmarks/SingleObjectMod.java b/Robust/src/Benchmarks/SingleTM/MicroBenchmarks/SingleObjectMod.java index 0f6ed69b..75403dd5 100644 --- a/Robust/src/Benchmarks/SingleTM/MicroBenchmarks/SingleObjectMod.java +++ b/Robust/src/Benchmarks/SingleTM/MicroBenchmarks/SingleObjectMod.java @@ -190,6 +190,7 @@ public class SingleObjectMod extends Thread { mysom[i] = new SingleObjectMod(nthreads, som.arrysize, som.loopsize, som.lsize1, som.lsize2, som.prob, i, som.mainobj, rand); } + System.logevent(); for(int i = 0; i < nthreads; i++) { mysom[i].start(); } diff --git a/Robust/src/ClassLibrary/System.java b/Robust/src/ClassLibrary/System.java index 81845e29..b1dd8f7f 100644 --- a/Robust/src/ClassLibrary/System.java +++ b/Robust/src/ClassLibrary/System.java @@ -68,6 +68,7 @@ public class System { /* Only used for microbenchmark testing of SingleTM version */ public static native void logevent(int event); + public static native void logevent(); /* Only used for microbenchmark testing of SingleTM version */ public static native void initLog(); diff --git a/Robust/src/Runtime/DSTM/interface/prefetch.c b/Robust/src/Runtime/DSTM/interface/prefetch.c index 956dd000..2a083aaf 100644 --- a/Robust/src/Runtime/DSTM/interface/prefetch.c +++ b/Robust/src/Runtime/DSTM/interface/prefetch.c @@ -31,8 +31,9 @@ void *transPrefetchNew() { /* Read from prefetch queue */ void *node = gettail(); + int count = numavailable(); /* Check tuples if they are found locally */ - perMcPrefetchList_t* pilehead = processLocal(node); + perMcPrefetchList_t* pilehead = processLocal(node,count); if (pilehead!=NULL) { @@ -48,74 +49,80 @@ void *transPrefetchNew() { /* Deallocated pilehead */ proPrefetchQDealloc(pilehead); } + // Deallocate the prefetch queue pile node - inctail(); + incmulttail(count); + //inctail(); } } -perMcPrefetchList_t *processLocal(char *ptr) { - unsigned int oid = *(GET_OID(ptr)); - short numoffset = *(GET_NUM_OFFSETS(ptr)); - short *offsetarray = GET_OFFSETS(ptr); - int top; - unsigned int dfsList[numoffset]; - int offstop=numoffset-2; - +perMcPrefetchList_t *processLocal(char *ptr, int numprefetches) { + int i,j; /* Initialize */ perMcPrefetchList_t *head = NULL; - objheader_t * header = searchObj(oid); - if (header==NULL) { - //forward prefetch - int machinenum = lhashSearch(oid); - insertPrefetch(machinenum, oid, numoffset, offsetarray, &head); - return head; - } - dfsList[0]=oid; - dfsList[1]=0; - - - //Start searching the dfsList - for(top=0; top>=0;) { - oid=getNextOid(header, offsetarray, dfsList, top); - if (oid&1) { - int oldisField=TYPE(header) < NUMCLASSES; - top+=2; - dfsList[top]=oid; - dfsList[top+1]=0; - header=searchObj(oid); - if (header==NULL) { - //forward prefetch - int machinenum = lhashSearch(oid); - - if (oldisField&&(dfsList[top-1]!=GET_RANGE(offsetarray[top+1]))) - insertPrefetch(machinenum, oid, 2+numoffset-top, &offsetarray[top-2], &head); - else - insertPrefetch(machinenum, oid, numoffset-top, &offsetarray[top], &head); - } else if (top=0;) { + oid=getNextOid(header, offsetarray, dfsList, top); + if (oid&1) { + int oldisField=TYPE(header) < NUMCLASSES; + top+=2; + dfsList[top]=oid; + dfsList[top+1]=0; + header=searchObj(oid); + if (header==NULL) { + //forward prefetch + int machinenum = lhashSearch(oid); + + if (oldisField&&(dfsList[top-1]!=GET_RANGE(offsetarray[top+1]))) + insertPrefetch(machinenum, oid, 2+numoffset-top, &offsetarray[top-2], &head); + else + insertPrefetch(machinenum, oid, numoffset-top, &offsetarray[top], &head); + } else if (toplist; while(tmp != NULL) { len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short)); - char oidnoffset[len]; + char oidnoffset[len+5]; char *buf=oidnoffset; + if (first) { + *buf=TRANS_PREFETCH; + buf++;len++; + first=0; + } *((int*)buf) = tmp->numoffset; buf+=sizeof(int); *((unsigned int *)buf) = tmp->oid; buf+=sizeof(unsigned int); +#ifdef TRANSSTATS + sendRemoteReq++; +#endif *((unsigned int *)buf) = mid; buf += sizeof(unsigned int); memcpy(buf, tmp->offsets, (tmp->numoffset)*sizeof(short)); - send_data(sd, oidnoffset, len); tmp = tmp->next; + if(tmp==NULL) { + *((int*)(&oidnoffset[len]))=-1; + len+=sizeof(int); + } + if(tmp!=NULL) + send_buf(sd, & writebuffer, oidnoffset, len); + else + forcesend_buf(sd, & writebuffer, oidnoffset, len); + //send_data(sd, oidnoffset, len); + //tmp = tmp->next; } /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */ - endpair = -1; - send_data(sd, &endpair, sizeof(int)); + //endpair = -1; + //send_data(sd, &endpair, sizeof(int)); return; } @@ -336,6 +364,9 @@ int getRangePrefetchResponse(int sd, struct readstruct * readbuffer) { recv_data_buf(sd, readbuffer, &length, sizeof(int)); int size = length - sizeof(int); char recvbuffer[size]; +#ifdef TRANSSTATS + getResponse++; +#endif recv_data_buf(sd, readbuffer, recvbuffer, size); char control = *((char *) recvbuffer); unsigned int oid; @@ -394,6 +425,7 @@ int rangePrefetchReq(int acceptfd, struct readstruct * readbuffer) { int numoffset, sd = -1; unsigned int baseoid, mid = -1; oidmidpair_t oidmid; + struct writestruct writebuffer; while (1) { recv_data_buf(acceptfd, readbuffer, &numoffset, sizeof(int)); @@ -402,10 +434,13 @@ int rangePrefetchReq(int acceptfd, struct readstruct * readbuffer) { recv_data_buf(acceptfd, readbuffer, &oidmid, 2*sizeof(unsigned int)); baseoid = oidmid.oid; if(mid != oidmid.mid) { - if(mid!= -1) - freeSockWithLock(transPResponseSocketPool, mid, sd); + if(mid!= -1) { + forcesend_buf(sd, &writebuffer, NULL, 0); + freeSockWithLock(transPResponseSocketPool, mid, sd); + } mid = oidmid.mid; sd = getSockWithLock(transPResponseSocketPool, mid); + writebuffer.offset=0; } short offsetsarry[numoffset]; recv_data_buf(acceptfd, readbuffer, offsetsarry, numoffset*sizeof(short)); @@ -427,6 +462,7 @@ int rangePrefetchReq(int acceptfd, struct readstruct * readbuffer) { //Release socket if(mid!=-1) + forcesend_buf(sd,&writebuffer, NULL, 0); freeSockWithLock(transPResponseSocketPool, mid, sd); return 0; } diff --git a/Robust/src/Runtime/DSTM/interface/prefetch.h b/Robust/src/Runtime/DSTM/interface/prefetch.h index 3e1c2dd9..62b9703e 100644 --- a/Robust/src/Runtime/DSTM/interface/prefetch.h +++ b/Robust/src/Runtime/DSTM/interface/prefetch.h @@ -12,6 +12,10 @@ #define GET_OFFSETS(x) ((short *) (x + sizeof(unsigned int) + sizeof(short))) #define INLINE inline __attribute__((always_inline)) +#ifdef TRANSSTATS +extern int getResponse; +extern int sendRemoteReq; +#endif /****** Global structure **********/ @@ -49,7 +53,7 @@ void proPrefetchQDealloc(perMcPrefetchList_t *); /******** Process Queue Element functions ***********/ void rangePrefetch(unsigned int, short, short *); void *transPrefetchNew(); -perMcPrefetchList_t* processLocal(char *ptr); +perMcPrefetchList_t* processLocal(char *ptr, int); perMcPrefetchList_t *processRemote(unsigned int oid, short * offsetarray, int sd, short numoffset); void insertPrefetch(int, unsigned int, short, short*, perMcPrefetchList_t **); diff --git a/Robust/src/Runtime/runtime.c b/Robust/src/Runtime/runtime.c index 1fa8f0a3..3002aebb 100644 --- a/Robust/src/Runtime/runtime.c +++ b/Robust/src/Runtime/runtime.c @@ -22,9 +22,11 @@ #include #endif #ifdef STMLOG +#define ARRAY_LENGTH 700003 __thread int counter; -__thread int event[100000*7+3]; -__thread unsigned long long clkticks[100000*7+3]; +__thread int event[ARRAY_LENGTH]; +__thread unsigned long long clkticks[ARRAY_LENGTH]; +unsigned long long beginClock=0; #define FILENAME "log" #endif @@ -262,6 +264,13 @@ void CALL11(___System______logevent____I,int ___event___, int ___event___) { return; } +void CALL00(___System______logevent____) { +#ifdef STMLOG + beginClock= rdtsc(); +#endif + return; +} + void CALL11(___System______flushToFile____I, int ___threadid___, int ___threadid___) { #ifdef STMLOG FILE *fp; @@ -275,9 +284,9 @@ void CALL11(___System______flushToFile____I, int ___threadid___, int ___threadid } int i; for (i = 0; i < counter-1; i++) { - fprintf(fp, "%d %lld %lld\n", event[i], clkticks[i], clkticks[i+1]); + fprintf(fp, "%d %lld %lld\n", event[i], clkticks[i]-beginClock, clkticks[i+1]-beginClock); } - fprintf(fp, "%d %lld\n", event[i], clkticks[i]); + fprintf(fp, "%d %lld\n", event[i], clkticks[i]-beginClock); fclose(fp); #endif @@ -287,6 +296,12 @@ void CALL11(___System______flushToFile____I, int ___threadid___, int ___threadid void CALL00(___System______initLog____) { #ifdef STMLOG counter=0; + int i; + for(i=0; itype=type; + //printf("DEBUG %s(), type= %x\n", __func__, type); #ifdef THREADS v->tid=0; v->lockentry=0; -- 2.34.1