From: bdemsky Date: Fri, 13 Mar 2009 03:50:41 +0000 (+0000) Subject: batch communications for prefetches X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=99cc3306f34030265037aa1a231598c9ee73c5cf;p=IRC.git batch communications for prefetches --- diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index a621f9bd..bd271231 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -49,7 +49,6 @@ //Max number of objects #define MAX_OBJECTS 20 -#define DEFAULT_OBJ_STORE_SIZE 1048510 //1MB //Transaction id per machine #define TID_LEN 20 #define LISTEN_PORT 2156 diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index ff3987d0..ca40fac2 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -11,6 +11,7 @@ #ifdef COMPILER #include "thread.h" #endif +#include "gCollect.h" #define BACKLOG 10 //max pending connections #define RECEIVE_BUFFER_SIZE 2048 @@ -789,7 +790,7 @@ int prefetchReq(int acceptfd) { /* Calculate the oid corresponding to the offset value */ for(i = 0 ; i< numoffset ; i++) { /* Check for arrays */ - if(TYPE(header) > NUMCLASSES) { + if(TYPE(header) >= NUMCLASSES) { int elementsize = classsize[TYPE(header)]; struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t)); unsigned short length = ao->___length___; diff --git a/Robust/src/Runtime/DSTM/interface/gCollect.h b/Robust/src/Runtime/DSTM/interface/gCollect.h index 5a1b8e4a..747e987f 100644 --- a/Robust/src/Runtime/DSTM/interface/gCollect.h +++ b/Robust/src/Runtime/DSTM/interface/gCollect.h @@ -7,13 +7,14 @@ ****** Global constants ********** **********************************/ -#define STALE_MINTHRESHOLD 30 +#define STALE_MINTHRESHOLD 10 //minimum size -#define STALE_MAXTHRESHOLD 40 //ugly hack..if you make this too small things +#define STALE_MAXTHRESHOLD 30 //ugly hack..if you make this too small things // will fail in odd subtle ways -#define PREFETCH_FLUSH_THRESHOLD 20 -#define STALL_THRESHOLD 30 +#define DEFAULT_OBJ_STORE_SIZE (4194304-16) //just a little less the 4MB +#define PREFETCH_FLUSH_THRESHOLD 10 //MINIMUM SIZE BEFORE FLUSHING +#define STALL_THRESHOLD 15 //number of prefetches stores before we can start freeing old ones diff --git a/Robust/src/Runtime/DSTM/interface/objstr.c b/Robust/src/Runtime/DSTM/interface/objstr.c index 1e3bcb54..173b069d 100644 --- a/Robust/src/Runtime/DSTM/interface/objstr.c +++ b/Robust/src/Runtime/DSTM/interface/objstr.c @@ -1,4 +1,5 @@ #include "dstm.h" +#include "gCollect.h" #define OSUSED(x) (((unsigned int)(x)->top)-((unsigned int) (x+1))) #define OSFREE(x) ((x)->size-OSUSED(x)) diff --git a/Robust/src/Runtime/DSTM/interface/prefetch.c b/Robust/src/Runtime/DSTM/interface/prefetch.c index d28ba1f7..b85deec8 100644 --- a/Robust/src/Runtime/DSTM/interface/prefetch.c +++ b/Robust/src/Runtime/DSTM/interface/prefetch.c @@ -79,14 +79,19 @@ perMcPrefetchList_t *processLocal(char *ptr) { for(top=0; top>=0;) { oid=getNextOid(header, offsetarray, dfsList, top); if (oid&1) { + int oldisField=TYPE(header) < NUMCLASSES; top+=2; dfsList[top]=oid; dfsList[top+1]=0; header=searchObj(oid); if (header==NULL) { //forward prefetch - int machinenum = lhashSearch(dfsList[top]); - insertPrefetch(machinenum, dfsList[top], numoffset-top, &offsetarray[top], &head); + int machinenum = lhashSearch(oid); + + if (oldisField&&(dfsList[top-1]!=GET_RANGE(offsetarray[top+1]))) + insertPrefetch(machinenum, oid, 2+numoffset-top, &offsetarray[top-2], &head); + else + insertPrefetch(machinenum, oid, numoffset-top, &offsetarray[top], &head); } else if (top=0;) { oid=getNextOid(header, offsetarray, dfsList, top); - if (oid&1) { + int oldisField=TYPE(header) < NUMCLASSES; top+=2; dfsList[top]=oid; dfsList[top+1]=0; header=searchObj(oid); if (header==NULL) { //forward prefetch - int machinenum = lhashSearch(dfsList[top]); - insertPrefetch(machinenum, dfsList[top], numoffset-top, &offsetarray[top], &head); + int machinenum = lhashSearch(oid); + if (oldisField&&(dfsList[top-1]!=GET_RANGE(offsetarray[top+1]))) + insertPrefetch(machinenum, oid, 2+numoffset-top, &offsetarray[top-2], &head); + else + insertPrefetch(machinenum, oid, numoffset-top, &offsetarray[top], &head); } else { - sendOidFound(header, oid, sd); + sendOidFound(header, oid, sd, buffer, &bufoffset); if (topversion <= ((objheader_t *)ptr)->version) { - prehashRemove(oid); + //ignore oid value...we'll get it from the object + + while(size>0) { + unsigned int objsize; + GETSIZE(objsize, ptr); + STATUS(ptr)=0; + oid=OID(ptr); + objsize+=sizeof(objheader_t); + + /* Insert into prefetch hash lookup table */ + void * oldptr; + if((oldptr = prehashSearch(oid)) != NULL) { + if(((objheader_t *)oldptr)->version <= ((objheader_t *)ptr)->version) { + prehashRemove(oid); + prehashInsert(oid, ptr); + } + } else { prehashInsert(oid, ptr); } - } else { - prehashInsert(oid, ptr); + ptr=(void *)(((unsigned int)ptr)+objsize); + size-=objsize; } - objheader_t *head = prehashSearch(oid); + pthread_mutex_lock(&pflookup.lock); pthread_cond_broadcast(&pflookup.cond); pthread_mutex_unlock(&pflookup.lock); @@ -406,7 +436,7 @@ unsigned int getNextOid(objheader_t * header, short * offsetarray, unsigned int int currcount = dfsList[top+1]; int range = GET_RANGE(offsetarray[top + 3]); - if(TYPE(header) > NUMCLASSES) { + if(TYPE(header) >= NUMCLASSES) { //Array case struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t)); int stride = GET_STRIDE(offsetarray[top + 3])+1; @@ -450,22 +480,45 @@ unsigned int getNextOid(objheader_t * header, short * offsetarray, unsigned int } } -int sendOidFound(objheader_t * header, unsigned int oid, int sd) { - int incr = 0; +void flushResponses(int sd, char * buffer, int * bufoffset) { + if ((*bufoffset)!=0) { + send_data(sd, buffer, *bufoffset); + *bufoffset=0; + } +} + +int sendOidFound(objheader_t * header, unsigned int oid, int sd, char *buffer, int *bufoffset) { + int incr; int objsize; GETSIZE(objsize, header); - int size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize; - char sendbuffer[size]; - *((int *)(sendbuffer + incr)) = size; - incr += sizeof(int); - *((char *)(sendbuffer + incr)) = OBJECT_FOUND; - incr += sizeof(char); - *((unsigned int *)(sendbuffer + incr)) = oid; - incr += sizeof(unsigned int); - memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t)); + int size = sizeof(objheader_t) + objsize; + char *sendbuffer; + + if ((incr=(*bufoffset))==0) { + buffer[incr] = TRANS_PREFETCH_RESPONSE; + incr+=sizeof(char); + *((int *)(buffer + incr)) = size+sizeof(int)+sizeof(char)+sizeof(unsigned int); + incr += sizeof(int); + *((char *)(buffer + incr)) = OBJECT_FOUND; + incr += sizeof(char); + *((unsigned int *)(buffer + incr)) = oid; + incr += sizeof(unsigned int); + } else + *((int *)(buffer+sizeof(char)))+=size; + + if ((incr+size) NUMCLASSES) { + if(TYPE(header) >= NUMCLASSES) { int elementsize = classsize[TYPE(header)]; struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t)); int length = ao->___length___;