From 39c05c51369998566aeb7fc6a5d653618013ba6a Mon Sep 17 00:00:00 2001 From: adash Date: Thu, 17 Dec 2009 00:05:57 +0000 Subject: [PATCH] runtime changes for ecoop submission --- .../DSTM/interface/addPrefetchEnhance.c | 15 +- .../DSTM/interface/addPrefetchEnhance.h | 2 +- .../Runtime/DSTM/interface/addUdpEnhance.c | 6 +- .../Runtime/DSTM/interface/addUdpEnhance.h | 2 +- .../src/Runtime/DSTM/interface/altprelookup.c | 66 +++-- .../Runtime/DSTM/interface/clocksyncclient.c | 19 +- .../Runtime/DSTM/interface/clocksyncserver.c | 22 +- .../src/Runtime/DSTM/interface/debugmacro.h | 59 +++++ Robust/src/Runtime/DSTM/interface/dstm.h | 16 +- .../src/Runtime/DSTM/interface/dstmserver.c | 73 +++-- Robust/src/Runtime/DSTM/interface/gCollect.c | 60 ----- .../src/Runtime/DSTM/interface/machinepile.c | 5 +- .../src/Runtime/DSTM/interface/machinepile.h | 3 +- Robust/src/Runtime/DSTM/interface/mcpileq.h | 1 + Robust/src/Runtime/DSTM/interface/prefetch.c | 6 +- Robust/src/Runtime/DSTM/interface/signal.c | 27 +- .../src/Runtime/DSTM/interface/threadnotify.c | 4 + Robust/src/Runtime/DSTM/interface/trans.c | 249 +++++++++++------- Robust/src/Runtime/garbage.c | 2 +- Robust/src/Runtime/runtime.c | 2 +- 20 files changed, 399 insertions(+), 240 deletions(-) create mode 100755 Robust/src/Runtime/DSTM/interface/debugmacro.h diff --git a/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.c b/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.c index d4160902..dee6025c 100644 --- a/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.c +++ b/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.c @@ -1,5 +1,5 @@ #include "addPrefetchEnhance.h" -#include "prelookup.h" +#include "altprelookup.h" extern int numprefetchsites; // Number of prefetch sites extern pfcstats_t *evalPrefetch; //Global array that keeps track of operation mode (ON/OFF) for each prefetch site @@ -57,10 +57,11 @@ char getOperationMode(int siteid) { * we take action accordingly */ void handleDynPrefetching(int numLocal, int ntuples, int siteid) { if(numLocal < ntuples) { - /* prefetch not found locally(miss in cache) */ + /* prefetch not found locally(miss in cache); turn on prefetching*/ evalPrefetch[siteid].operMode = 1; evalPrefetch[siteid].uselesscount = SHUTDOWNINTERVAL; } else { + //Turn off prefetch site if(getOperationMode(siteid) != 0) { evalPrefetch[siteid].uselesscount--; if(evalPrefetch[siteid].uselesscount <= 0) { @@ -175,14 +176,10 @@ int copyToCache(int numoid, unsigned int *oidarray, char oidType) { newAddr->version += 1; newAddr->notifylist = NULL; } + STATUS(newAddr)=0; + //make an entry in prefetch lookup hashtable - void *oldptr; - if((oldptr = prehashSearch(oid)) != NULL) { - prehashRemove(oid); - prehashInsert(oid, newAddr); - } else { - prehashInsert(oid, newAddr); - } + prehashInsert(oid, newAddr); } //end of for return 0; } diff --git a/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.h b/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.h index 44c87049..7eb3c519 100644 --- a/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.h +++ b/Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.h @@ -2,7 +2,7 @@ #define _ADDPREFETCHENHANCE_H_ #include "dstm.h" -#include "mlookup.h" +#include "altmlookup.h" #include "gCollect.h" typedef struct prefetchCountStats { diff --git a/Robust/src/Runtime/DSTM/interface/addUdpEnhance.c b/Robust/src/Runtime/DSTM/interface/addUdpEnhance.c index 5c9363a0..adaf6671 100644 --- a/Robust/src/Runtime/DSTM/interface/addUdpEnhance.c +++ b/Robust/src/Runtime/DSTM/interface/addUdpEnhance.c @@ -5,7 +5,7 @@ #include #include #include "addUdpEnhance.h" -#include "prelookup.h" +#include "altprelookup.h" #ifdef ABORTREADERS #include "abortreaders.h" #endif @@ -209,7 +209,9 @@ int invalidateFromPrefetchCache(char *buffer) { objheader_t *header; /* Lookup Objects in prefetch cache and remove them */ if(((header = prehashSearch(oid)) != NULL)) { - prehashRemove(oid); + //Keep invalid objects + STATUS(header)=DIRTY; + //prehashRemove(oid); } offset += sizeof(unsigned int); } diff --git a/Robust/src/Runtime/DSTM/interface/addUdpEnhance.h b/Robust/src/Runtime/DSTM/interface/addUdpEnhance.h index b3964bce..5011df31 100644 --- a/Robust/src/Runtime/DSTM/interface/addUdpEnhance.h +++ b/Robust/src/Runtime/DSTM/interface/addUdpEnhance.h @@ -2,7 +2,7 @@ #define _ADDUDPENHANCE_H #include "dstm.h" -#include "mlookup.h" +#include "altmlookup.h" /******************************* diff --git a/Robust/src/Runtime/DSTM/interface/altprelookup.c b/Robust/src/Runtime/DSTM/interface/altprelookup.c index 87afbe89..b44e8e99 100644 --- a/Robust/src/Runtime/DSTM/interface/altprelookup.c +++ b/Robust/src/Runtime/DSTM/interface/altprelookup.c @@ -121,54 +121,52 @@ void *prehashSearch(unsigned int key) { } unsigned int prehashRemove(unsigned int key) { - int index; - prehashlistnode_t *prev; - prehashlistnode_t *ptr, *node; - - //eom - unsigned int keyindex=key>>1; + unsigned int keyindex = key >> 1; volatile unsigned int * lockptr=&pflookup.larray[keyindex&PRELOCKMASK].lock; + prehashlistnode_t *node, *prev; while(!write_trylock(lockptr)) { sched_yield(); } - prehashlistnode_t *curr = &pflookup.table[keyindex&pflookup.mask]; - //eom - - for (; curr != NULL; curr = curr->next) { - if (curr->key == key) { - // Find a match in the hash table - //decrement the number of elements in the global hashtable + // If there are no elements + //delete from first bin of table + if (curr->next == NULL && curr->key == key) { + curr->key = 0; + //TODO free(val) ? + curr->val = NULL; + atomic_dec(&(pflookup.numelements)); + write_unlock(lockptr); + return 0; + } + //delete from first bin of table but elements follow in linked list + if (curr->next != NULL && curr->key == key) { + curr->key = curr->next->key; + curr->val = curr->next->val; + node = curr->next; + curr->next = node->next; + free(node); + atomic_dec(&(pflookup.numelements)); + write_unlock(lockptr); + return 0; + } + prev = curr; + curr = curr->next; + //delete from elements in the linked list + for(; curr != NULL; curr = curr->next) { + if (curr->key == key) { + prev->next = curr->next; + free(curr); atomic_dec(&(pflookup.numelements)); - - if ((curr == &ptr[index]) && (curr->next == NULL)) { - // Delete the first item inside the hashtable with no linked list of prehashlistnode_t - curr->key = 0; - curr->val = NULL; - } else if ((curr == &ptr[index]) && (curr->next != NULL)) { - //Delete the first item with a linked list of prehashlistnode_t connected - curr->key = curr->next->key; - curr->val = curr->next->val; - node = curr->next; - curr->next = curr->next->next; - free(node); - } else { - // Regular delete from linked listed - prev->next = curr->next; - free(curr); - } - //pthread_mutex_unlock(&pflookup.lock); - write_unlock(lockptr); + write_unlock(lockptr); return 0; } prev = curr; } write_unlock(lockptr); - return 1; } - + unsigned int prehashResize(unsigned int newsize) { prehashlistnode_t *node, *ptr; // curr and next keep track of the current and the next chashlistnodes in a linked list unsigned int oldsize; diff --git a/Robust/src/Runtime/DSTM/interface/clocksyncclient.c b/Robust/src/Runtime/DSTM/interface/clocksyncclient.c index 749ac47e..46084aca 100644 --- a/Robust/src/Runtime/DSTM/interface/clocksyncclient.c +++ b/Robust/src/Runtime/DSTM/interface/clocksyncclient.c @@ -13,11 +13,12 @@ #include #include #include +#include #define PORT 8500 /* REPLACE with your server machine name*/ #define DIRSIZE 64 -#define NUMITER 1024 +#define NUMITER 10000 static __inline__ unsigned long long rdtsc(void) @@ -108,14 +109,30 @@ int main(int argc, char **argv) { } //printf("DEBUG: dir[0]= %lld\n", dir[0]); array2[i]=rdtsc() - dir[0]; + printf("%lld\n", array2[i]); } for(i=0;i<(NUMITER-1);i++) { norm += array2[i]; } + + /* spew-out the results */ //printf("DEBUG: Average offset= %lld\n", (norm/(NUMITER-1))); + long long average=(norm/(NUMITER-1)); + printf("average= %lld",(norm/(NUMITER-1))); + long long stddev, avg1=0; + for(i=0;i<(NUMITER-1);i++) { + avg1 += ((array2[i] - average) * (array2[i] - average)); + } + float ans = (avg1/(NUMITER-1)); + float squareroot= sqrt(ans); + float squareroot2= sqrt(avg1); + + printf("stddev= %f\n", squareroot); + printf("error= %f\n", squareroot2/(NUMITER-1)); + fprintf(f1,"%lld",(norm/(NUMITER-1))); close(sd); diff --git a/Robust/src/Runtime/DSTM/interface/clocksyncserver.c b/Robust/src/Runtime/DSTM/interface/clocksyncserver.c index 9e481d15..9e1ddafe 100644 --- a/Robust/src/Runtime/DSTM/interface/clocksyncserver.c +++ b/Robust/src/Runtime/DSTM/interface/clocksyncserver.c @@ -13,9 +13,10 @@ #include #include #include +#include #define PORT 8500 -#define NUMITER 1024 +#define NUMITER 10000 #define DIRSIZE 1 static __inline__ unsigned long long rdtsc(void) @@ -89,7 +90,9 @@ int main() { //printf("DEBUG: dir[0]= %lld\n", dir[0]); array2[i] = rdtsc(); //printf("DEBUG: array2[i]= %lld\n", array2[i]); - array1[i]=array2[i] - dir[0]; + //array1[i]=array2[i] - dir[0]; + array1[i]= dir[0] - array2[i]; + printf("%lld\n", array1[i]); /* acknowledge the message, reply w/ the file names */ if (send(sd_current, &array2[i], sizeof(unsigned long long), MSG_NOSIGNAL) == -1) { @@ -105,7 +108,20 @@ int main() { /* spew-out the results */ //printf("DEBUG: Average offset= %lld\n", (norm/(NUMITER-1))); - fprintf(f1,"%lld",(norm/(NUMITER-1))); + long long average=(norm/(NUMITER-1)); + printf("average= %lld",(norm/(NUMITER-1))); + + long long stddev, avg1=0; + for(i=0;i<(NUMITER-1);i++) { + avg1 += ((array1[i] - average) * (array1[i] - average)); + } + float ans = (avg1/(NUMITER-1)); + float squareroot= sqrt(ans); + float squareroot2= sqrt(avg1); + + printf("stddev= %f\n", squareroot); + printf("error= %f\n", squareroot2/(NUMITER-1)); + fprintf(f1,"%lld\n",(norm/(NUMITER-1))); /* give client a chance to properly shutdown */ diff --git a/Robust/src/Runtime/DSTM/interface/debugmacro.h b/Robust/src/Runtime/DSTM/interface/debugmacro.h new file mode 100755 index 00000000..c11be59a --- /dev/null +++ b/Robust/src/Runtime/DSTM/interface/debugmacro.h @@ -0,0 +1,59 @@ +#ifndef _DEBUGMACRO_H_ +#define _DEBUGMACRO_H_ + +/** Macro to print oid and object type **/ +//#define LOGOIDTYPES //turn on printing oid and type events +#ifdef LOGOIDTYPES +#define LOGOIDTYPE(x,y,z,t) printf("[%s: %u %u %lld]\n", x, y, z, t); +#else +#define LOGOIDTYPE(x,y,z,t) +#endif + + +/** Macro to print prefetch site id **/ +//#define LOGPREFETCHSITES +#ifdef LOGPREFETCHSITES +#define LOGPREFETCHSITE(PTR) printf("[siteid= %u] ", PTR->siteid); +#else +#define LOGPREFETCHSITE(PTR) +#endif + + +/* +#define LOGEVENTS //turn on Logging events +#ifdef LOGEVENTS +char bigarray[16*1024*1024]; +int bigindex=0; +#define LOGEVENT(x) { \ + int tmp=bigindex++; \ + bigarray[tmp]=x; \ + } +#else +#define LOGEVENT(x) +#endif +*/ + +/** + * Record Time after clock synchronization + **/ +/* +#define LOGTIMES +#ifdef LOGTIMES +char bigarray1[8*1024*1024]; +unsigned int bigarray2[8*1024*1024]; +unsigned int bigarray3[8*1024*1024]; +long long bigarray4[8*1024*1024]; +int bigindex1=0; +#define LOGTIME(x,y,z,a) {\ + int tmp=bigindex1++; \ + bigarray1[tmp]=x; \ + bigarray2[tmp]=y; \ + bigarray3[tmp]=z; \ + bigarray4[tmp]=a; \ +} +#else +#define LOGTIME(x,y,z,a) +#endif +*/ + +#endif diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index 2e9c6e9d..df4a20ca 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -56,9 +56,9 @@ #define UDP_PORT 2158 //Prefetch tuning paramters //#define RETRYINTERVAL 20 //N (For Em3d, SOR, Moldyn benchmarks) -//#define SHUTDOWNINTERVAL 3 //M -#define RETRYINTERVAL 20 //N (For MatrixMultiply, 2DFFT benchmarks) -#define SHUTDOWNINTERVAL 1 //M +//#define SHUTDOWNINTERVAL 3 //M +#define RETRYINTERVAL 100 //N (For MatrixMultiply, 2DFFT, 2DConv benchmarks) +#define SHUTDOWNINTERVAL 1 //M #include #include @@ -264,6 +264,7 @@ void mapObjMethod(unsigned short); void randomdelay(); void transStart(); +//#define TRANSREAD(x,y,z(tobe passed as a parameter to transRead2)) { #define TRANSREAD(x,y) { \ unsigned int inputvalue;\ if ((inputvalue=(unsigned int)y)==0) x=NULL;\ @@ -292,11 +293,11 @@ void sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size); void prefetch(int, int, unsigned int *, unsigned short *, short*); void *transPrefetch(void *); void *mcqProcess(void *); -prefetchpile_t *foundLocal(char *, int); // returns node with prefetch elements(oids, offsets) -int lookupObject(unsigned int * oid, short offset); -int checkoid(unsigned int oid); +prefetchpile_t *foundLocal(char *, int, int); // returns node with prefetch elements(oids, offsets, siteid) +int lookupObject(unsigned int * oid, short offset, int *); +int checkoid(unsigned int oid, int isLastOffset); int transPrefetchProcess(int **, short); -void sendPrefetchReq(prefetchpile_t*, int); +void sendPrefetchReq(prefetchpile_t*, int, int); void sendPrefetchReqnew(prefetchpile_t*, int); int getPrefetchResponse(int, struct readstruct *); unsigned short getObjType(unsigned int oid); @@ -305,6 +306,7 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi void commitCountForObjRead(char *, unsigned int *, unsigned int *, int *, int *, int *, int *, int *, unsigned int, unsigned short); void commitCountForObjMod(char *, unsigned int *, unsigned int *, int *, int *, int *, int *, int *, unsigned int, unsigned short); +long long myrdtsc(void); /* Sends notification request for thread join, if sucessful returns 0 else returns -1 */ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid); void threadNotify(unsigned int oid, unsigned short version, unsigned int tid); diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 4122ddc8..859bc4c7 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -3,7 +3,7 @@ #include #include "dstm.h" -#include "mlookup.h" +#include "altmlookup.h" #include "llookup.h" #include "threadnotify.h" #include "prefetch.h" @@ -13,6 +13,7 @@ #endif #include "gCollect.h" #include "readstruct.h" +#include "debugmacro.h" #define BACKLOG 10 //max pending connections #define RECEIVE_BUFFER_SIZE 2048 @@ -20,6 +21,37 @@ extern int classsize[]; extern int numHostsInSystem; extern pthread_mutex_t notifymutex; +extern unsigned long long clockoffset; +long long startreq, endreq, diff; + +//#define LOGTIMES +#ifdef LOGTIMES +extern char bigarray1[6*1024*1024]; +extern unsigned int bigarray2[6*1024*1024]; +extern unsigned int bigarray3[6*1024*1024]; +extern long long bigarray4[6*1024*1024]; +extern int bigarray5[6*1024*1024]; +extern int bigindex1; +#define LOGTIME(x,y,z,a,b) {\ + int tmp=bigindex1; \ + bigarray1[tmp]=x; \ + bigarray2[tmp]=y; \ + bigarray3[tmp]=z; \ + bigarray4[tmp]=a; \ + bigarray5[tmp]=b; \ + bigindex1++; \ +} +#else +#define LOGTIME(x,y,z,a,b) +#endif + + +long long myrdtsc(void) +{ + unsigned hi, lo; + __asm__ __volatile__ ("rdtsc" : "=a"(lo), "=d"(hi)); + return ( (unsigned long long)lo)|( ((unsigned long long)hi)<<32 ); +} objstr_t *mainobjstore; pthread_mutex_t mainobjstore_mutex; @@ -199,6 +231,7 @@ void *dstmAccept(void *acceptfd) { break; } #else + LOGTIME('X',0,0,myrdtsc(),0); if((val = prefetchReq((int)acceptfd, &readbuffer)) != 0) { printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__); break; @@ -552,7 +585,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne free(oidlocked); } */ - //control=TRANS_DISAGREE; + control=TRANS_DISAGREE; send_data(acceptfd, &control, sizeof(char)); #ifdef CACHE send_data(acceptfd, &numBytes, sizeof(int)); @@ -603,7 +636,6 @@ char getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked, *numBytes += size; /* Send TRANS_DISAGREE to Coordinator */ *control = TRANS_DISAGREE; - //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj)); } //Keep track of oid locked oidlocked[(*objlocked)++] = OID(((objheader_t *)mobj)); @@ -620,7 +652,6 @@ char getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked, size += sizeof(objheader_t); *numBytes += size; *control = TRANS_DISAGREE; - //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj)); } } } @@ -653,7 +684,6 @@ char getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked *numBytes += size; /* Send TRANS_DISAGREE to Coordinator */ *control = TRANS_DISAGREE; - //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj)); } //Keep track of oid locked oidlocked[(*objlocked)++] = OID(((objheader_t *)mobj)); @@ -670,7 +700,6 @@ char getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked size += sizeof(objheader_t); *numBytes += size; *control = TRANS_DISAGREE; - //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj)); } } } @@ -693,10 +722,7 @@ void procRestObjs(char *objread, unsigned short version; /* Process each oid in the machine pile/ group per thread */ - //printf("DEBUG: index= %d, numread= %d, nummod= %d numread+nummod= %d\n", index,numread,nummod,numread+nummod); for (i = index; i < numread+nummod; i++) { - //printf("DEBUG: i= %d\n", i); - //fflush(stdout); if (i < numread) { //Objs only read and not modified int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array incr *= i; @@ -859,9 +885,8 @@ void processVerNoMatch(unsigned int *oidnotfound, * Looks for the objects to be prefetched in the main object store. * If objects are not found then record those and if objects are found * then use offset values to prefetch references to other objects */ - int prefetchReq(int acceptfd, struct readstruct * readbuffer) { - int i, size, objsize, numoffset = 0; + int i, size, objsize, numoffset = 0, gid=0; int length; char *recvbuffer, control; unsigned int oid, mid=-1; @@ -869,6 +894,7 @@ int prefetchReq(int acceptfd, struct readstruct * readbuffer) { oidmidpair_t oidmid; struct writestruct writebuffer; int sd = -1; + while(1) { recv_data_buf((int)acceptfd, readbuffer, &numoffset, sizeof(int)); if(numoffset == -1) @@ -885,22 +911,26 @@ int prefetchReq(int acceptfd, struct readstruct * readbuffer) { writebuffer.offset=0; } short offsetarry[numoffset]; + recv_data_buf((int)acceptfd, readbuffer, &gid, sizeof(int)); recv_data_buf((int) acceptfd, readbuffer, offsetarry, numoffset*sizeof(short)); + LOGTIME('A',oid ,0,myrdtsc(),gid); //after recv the entire prefetch request /*Process each oid */ if ((header = mhashSearch(oid)) == NULL) { /* Obj not found */ /* Save the oids not found in buffer for later use */ - size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ; + size = sizeof(int)+sizeof(int) + sizeof(char) + sizeof(unsigned int) ; char sendbuffer[size+1]; sendbuffer[0]=TRANS_PREFETCH_RESPONSE; *((int *) (sendbuffer+sizeof(char))) = size; *((char *)(sendbuffer + sizeof(char)+sizeof(int))) = OBJECT_NOT_FOUND; *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char)+sizeof(char))) = oid; + *((int *)(sendbuffer+sizeof(int) + sizeof(char)+sizeof(char)+sizeof(unsigned int))) = gid; send_buf(sd, &writebuffer, sendbuffer, size+1); + LOGTIME('J',oid, 0,myrdtsc(), gid); //send first oid not found prefetch request } else { /* Object Found */ int incr = 1; GETSIZE(objsize, header); - size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize; + size = sizeof(int)+sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize; char sendbuffer[size+1]; sendbuffer[0]=TRANS_PREFETCH_RESPONSE; *((int *)(sendbuffer + incr)) = size; @@ -909,8 +939,12 @@ int prefetchReq(int acceptfd, struct readstruct * readbuffer) { incr += sizeof(char); *((unsigned int *)(sendbuffer+incr)) = oid; incr += sizeof(unsigned int); + *((int *)(sendbuffer+incr)) = gid; + incr += sizeof(int); memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t)); send_buf(sd, &writebuffer, sendbuffer, size+1); + LOGOIDTYPE("SRES", oid, TYPE(header), (myrdtsc()-clockoffset)); + LOGTIME('C',oid,TYPE(header),myrdtsc(), gid); //send first oid found from prefetch request /* Calculate the oid corresponding to the offset value */ for(i = 0 ; i< numoffset ; i++) { @@ -932,20 +966,24 @@ int prefetchReq(int acceptfd, struct readstruct * readbuffer) { if (oid==0) break; + LOGTIME('B',oid,0,myrdtsc(),gid); //send next oid found from prefetch request + if((header = mhashSearch(oid)) == NULL) { - size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ; + size = sizeof(int)+sizeof(int) + sizeof(char) + sizeof(unsigned int) ; char sendbuffer[size+1]; sendbuffer[0]=TRANS_PREFETCH_RESPONSE; *((int *) (sendbuffer+1)) = size; *((char *)(sendbuffer + sizeof(char)+sizeof(int))) = OBJECT_NOT_FOUND; *((unsigned int *)(sendbuffer + sizeof(char)+sizeof(int) + sizeof(char))) = oid; + *((int *)(sendbuffer+sizeof(int) + sizeof(char)+sizeof(char)+sizeof(unsigned int))) = gid; send_buf(sd, &writebuffer, sendbuffer, size+1); + LOGTIME('J',oid, 0,myrdtsc(), gid); //send first oid not found prefetch request break; } else { /* Obj Found */ int incr = 1; GETSIZE(objsize, header); - size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize; + size = sizeof(int)+sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize; char sendbuffer[size+1]; sendbuffer[0]=TRANS_PREFETCH_RESPONSE; *((int *)(sendbuffer + incr)) = size; @@ -954,12 +992,17 @@ int prefetchReq(int acceptfd, struct readstruct * readbuffer) { incr += sizeof(char); *((unsigned int *)(sendbuffer+incr)) = oid; incr += sizeof(unsigned int); + *((int *)(sendbuffer+incr)) = gid; + incr += sizeof(int); memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t)); send_buf(sd, &writebuffer, sendbuffer, size+1); + LOGOIDTYPE("SRES", oid, TYPE(header), (myrdtsc()-clockoffset)); + LOGTIME('C',oid,TYPE(header),myrdtsc(), gid); //send first oid found from prefetch request } } //end of for } } //end of while + //Release socket if (mid!=-1) { forcesend_buf(sd, &writebuffer, NULL, 0); diff --git a/Robust/src/Runtime/DSTM/interface/gCollect.c b/Robust/src/Runtime/DSTM/interface/gCollect.c index 0be92327..185a02f6 100644 --- a/Robust/src/Runtime/DSTM/interface/gCollect.c +++ b/Robust/src/Runtime/DSTM/interface/gCollect.c @@ -1,9 +1,5 @@ #include "gCollect.h" -#if 0 #include "altprelookup.h" -#else -#inlcude "prelookup.h" -#endif extern pthread_mutex_t prefetchcache_mutex; //Mutex to lock Prefetch Cache @@ -99,23 +95,12 @@ void *prefetchobjstrAlloc(unsigned int size) { return ptr; } -#if 0 void clearBlock(objstr_t *block) { unsigned long int tmpbegin=(unsigned int)block; unsigned long int tmpend=(unsigned int)block->top; int i, j; prehashlistnode_t *ptr; - //pthread_mutex_lock(&pflookup.lock); - /* - for(i=0;itop; - int i, j; - prehashlistnode_t *ptr; - pthread_mutex_lock(&pflookup.lock); - - ptr = pflookup.table; - for(i = 0; inext; - for(; next != NULL; curr=next, next = next->next) { - unsigned int val=(unsigned int)next->val; - if ((val>=tmpbegin)&(valnext=next->next; - free(next); - next=curr; - //loop condition is broken now...need to check before incrementing - //if (next==NULL) - // break; - } - } - { - unsigned int val=(unsigned int)orig->val; - if ((val>=tmpbegin)&(valnext==NULL) { - orig->key=0; - orig->val=NULL; - } else { - next=orig->next; - orig->val=next->val; - orig->key=next->key; - orig->next=next->next; - free(next); - } - } - } - } - pthread_mutex_unlock(&pflookup.lock); } -#endif objstr_t *allocateNew(unsigned int size) { objstr_t *tmp; diff --git a/Robust/src/Runtime/DSTM/interface/machinepile.c b/Robust/src/Runtime/DSTM/interface/machinepile.c index 9d4a15de..49905865 100644 --- a/Robust/src/Runtime/DSTM/interface/machinepile.c +++ b/Robust/src/Runtime/DSTM/interface/machinepile.c @@ -1,6 +1,6 @@ #include "machinepile.h" -void insertPile(int mid, unsigned int oid, short numoffset, short *offset, prefetchpile_t **head) { +void insertPile(int mid, unsigned int oid, int siteid, short numoffset, short *offset, prefetchpile_t **head) { prefetchpile_t *ptr; objpile_t *objnode; unsigned int *oidarray; @@ -16,6 +16,7 @@ void insertPile(int mid, unsigned int oid, short numoffset, short *offset, prefe objnode->offset = offset; objnode->oid = oid; objnode->numoffset = numoffset; + objnode->siteid = siteid; objnode->next = NULL; tmp->objpiles = objnode; tmp->next = *head; @@ -37,6 +38,7 @@ void insertPile(int mid, unsigned int oid, short numoffset, short *offset, prefe objnode->offset = offset; objnode->oid = oid; objnode->numoffset = numoffset; + objnode->siteid = siteid; objnode->next = *tmp; *tmp = objnode; return; @@ -64,6 +66,7 @@ void insertPile(int mid, unsigned int oid, short numoffset, short *offset, prefe objnode->offset = offset; objnode->oid = oid; objnode->numoffset = numoffset; + objnode->siteid = siteid; objnode->next = *tmp; *tmp = objnode; return; diff --git a/Robust/src/Runtime/DSTM/interface/machinepile.h b/Robust/src/Runtime/DSTM/interface/machinepile.h index c32a02a8..53f94d6c 100644 --- a/Robust/src/Runtime/DSTM/interface/machinepile.h +++ b/Robust/src/Runtime/DSTM/interface/machinepile.h @@ -5,6 +5,7 @@ #include #include -void insertPile(int, unsigned int, short, short *, prefetchpile_t **); +//add prefetch site as an argument for debugging +void insertPile(int, unsigned int, int, short, short *, prefetchpile_t **); #endif diff --git a/Robust/src/Runtime/DSTM/interface/mcpileq.h b/Robust/src/Runtime/DSTM/interface/mcpileq.h index 5c0ab8ba..53937e73 100644 --- a/Robust/src/Runtime/DSTM/interface/mcpileq.h +++ b/Robust/src/Runtime/DSTM/interface/mcpileq.h @@ -9,6 +9,7 @@ //Structure to make machine groups when prefetching typedef struct objpile { unsigned int oid; + int siteid; short numoffset; short *offset; struct objpile *next; diff --git a/Robust/src/Runtime/DSTM/interface/prefetch.c b/Robust/src/Runtime/DSTM/interface/prefetch.c index 23454d5a..2464b48e 100644 --- a/Robust/src/Runtime/DSTM/interface/prefetch.c +++ b/Robust/src/Runtime/DSTM/interface/prefetch.c @@ -1,5 +1,5 @@ #include "prefetch.h" -#include "prelookup.h" +#include "altprelookup.h" #include "sockpool.h" #include "gCollect.h" @@ -400,7 +400,6 @@ int getRangePrefetchResponse(int sd, struct readstruct * readbuffer) { void * oldptr; if((oldptr = prehashSearch(oid)) != NULL) { if(((objheader_t *)oldptr)->version < ((objheader_t *)ptr)->version) { - //prehashRemove(oid); prehashInsert(oid, ptr); } } else { @@ -410,9 +409,6 @@ int getRangePrefetchResponse(int sd, struct readstruct * readbuffer) { size-=objsize; } - pthread_mutex_lock(&pflookup.lock); - pthread_cond_broadcast(&pflookup.cond); - pthread_mutex_unlock(&pflookup.lock); } else if(control == OBJECT_NOT_FOUND) { oid = *((unsigned int *)(recvbuffer + sizeof(char))); } else { diff --git a/Robust/src/Runtime/DSTM/interface/signal.c b/Robust/src/Runtime/DSTM/interface/signal.c index 2f9c1105..0c2d9c6a 100644 --- a/Robust/src/Runtime/DSTM/interface/signal.c +++ b/Robust/src/Runtime/DSTM/interface/signal.c @@ -2,12 +2,14 @@ #include "addPrefetchEnhance.h" #include #include +#include extern int numTransAbort; extern int numTransCommit; extern int nchashSearch; extern int nmhashSearch; extern int nprehashSearch; +extern int ndirtyCacheObj; extern int nRemoteSend; extern int nSoftAbort; extern int bytesSent; @@ -22,10 +24,28 @@ extern pfcstats_t *evalPrefetch; void transStatsHandler(int sig, siginfo_t* info, void *context) { #ifdef TRANSSTATS - FILE *fp; - if ((fp = fopen("/tmp/client_stats.txt", "a+")) == NULL) { + char filepath[200], exectime[10]; + struct utsname buf; + FILE *fp, *envfp; + + if ((envfp = fopen("/home/adash/.tmpenvs", "r")) == NULL) { + fprintf(stderr, "Error opening file .tmpenvfs"); + exit(-1); + } + memset(filepath, 0, 200); + fscanf(envfp, "%s\n", filepath); + uname(&buf); + strncat(filepath + strlen(filepath), buf.nodename, 4); + strcat(filepath, (const char *) ".txt"); + + memset(exectime, 0, 10); + fscanf(envfp, "%s\n", exectime); + fclose(envfp); + + if ((fp = fopen(filepath, "a+")) == NULL) { exit(-1); } + fprintf(fp, "****** Transaction Stats ******\n"); fprintf(fp, "myIpAddr = %x\n", myIpAddr); fprintf(fp, "numTransAbort = %d\n", numTransAbort); @@ -33,6 +53,7 @@ void transStatsHandler(int sig, siginfo_t* info, void *context) { fprintf(fp, "nchashSearch = %d\n", nchashSearch); fprintf(fp, "nmhashSearch = %d\n", nmhashSearch); fprintf(fp, "nprehashSearch = %d\n", nprehashSearch); + fprintf(fp, "ndirtyCacheObj = %d\n", ndirtyCacheObj); fprintf(fp, "nRemoteReadSend = %d\n", nRemoteSend); fprintf(fp, "nSoftAbort = %d\n", nSoftAbort); fprintf(fp, "bytesSent = %d\n", bytesSent); @@ -40,6 +61,7 @@ void transStatsHandler(int sig, siginfo_t* info, void *context) { fprintf(fp, "totalObjSize= %d\n", totalObjSize); fprintf(fp, "sendRemoteReq= %d\n", sendRemoteReq); fprintf(fp, "getResponse= %d\n", getResponse); + fprintf(fp, "executionTime = %s\n", exectime); fprintf(fp, "**********************************\n"); fflush(fp); fclose(fp); @@ -57,6 +79,7 @@ void transStatsHandler(int sig, siginfo_t* info, void *context) { printf("nchashSearch = %d\n", nchashSearch); printf("nmhashSearch = %d\n", nmhashSearch); printf("nprehashSearch = %d\n", nprehashSearch); + printf("ndirtyCacheObj = %d\n", ndirtyCacheObj); printf("nRemoteReadSend = %d\n", nRemoteSend); printf("nSoftAbort = %d\n", nSoftAbort); printf("bytesSent = %d\n", bytesSent); diff --git a/Robust/src/Runtime/DSTM/interface/threadnotify.c b/Robust/src/Runtime/DSTM/interface/threadnotify.c index 4df4dc43..d2614dbd 100644 --- a/Robust/src/Runtime/DSTM/interface/threadnotify.c +++ b/Robust/src/Runtime/DSTM/interface/threadnotify.c @@ -78,12 +78,15 @@ unsigned int notifyhashInsert(unsigned int tid, notifydata_t *ndata) { // Insert at the first position in the hashtable ptr[index].threadid = tid; ptr[index].ndata = ndata; + nlookup.numelements++; } else { tmp = &ptr[index]; while(tmp != NULL) { if(tmp->threadid == tid) { isFound = 1; tmp->ndata = ndata; + pthread_mutex_unlock(&nlookup.locktable); + return 0; } tmp = tmp->next; } @@ -97,6 +100,7 @@ unsigned int notifyhashInsert(unsigned int tid, notifydata_t *ndata) { node->ndata = ndata; node->next = ptr[index].next; ptr[index].next = node; + nlookup.numelements++; } } pthread_mutex_unlock(&nlookup.locktable); diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 9007bc7e..11e27984 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -1,10 +1,11 @@ #include "dstm.h" +#include "debugmacro.h" #include "ip.h" #include "machinepile.h" -#include "mlookup.h" +#include "altmlookup.h" #include "llookup.h" #include "plookup.h" -#include "prelookup.h" +#include "altprelookup.h" #include "threadnotify.h" #include "queue.h" #include "addUdpEnhance.h" @@ -23,6 +24,39 @@ #define NUM_THREADS 1 #define CONFIG_FILENAME "dstm.conf" +//#define LOGEVENTS //turn on Logging events +#ifdef LOGEVENTS +char bigarray[16*1024*1024]; +int bigindex=0; +#define LOGEVENT(x) { \ + int tmp=bigindex++; \ + bigarray[tmp]=x; \ + } +#else +#define LOGEVENT(x) +#endif + +//#define LOGTIMES +#ifdef LOGTIMES +char bigarray1[6*1024*1024]; +unsigned int bigarray2[6*1024*1024]; +unsigned int bigarray3[6*1024*1024]; +long long bigarray4[6*1024*1024]; +int bigarray5[6*1024*1024]; +int bigindex1=0; +#define LOGTIME(x,y,z,a,b) {\ + int tmp=bigindex1; \ + bigarray1[tmp]=x; \ + bigarray2[tmp]=y; \ + bigarray3[tmp]=z; \ + bigarray4[tmp]=a; \ + bigarray5[tmp]=b; \ + bigindex1++; \ +} +#else +#define LOGTIME(x,y,z,a,b) +#endif + /* Thread transaction variables */ __thread objstr_t *t_cache; @@ -32,6 +66,7 @@ __thread int t_abort; __thread jmp_buf aborttrans; #endif +int globalid=0; /* This variable is a unique global identifier for a sendPrefetch request */ /* Global Variables */ extern int classsize[]; @@ -67,6 +102,7 @@ int numTransAbort = 0; int nchashSearch = 0; int nmhashSearch = 0; int nprehashSearch = 0; +int ndirtyCacheObj = 0; int nRemoteSend = 0; int nSoftAbort = 0; int bytesSent = 0; @@ -79,17 +115,7 @@ void printhex(unsigned char *, int); plistnode_t *createPiles(); plistnode_t *sortPiles(plistnode_t *pileptr); -//#define LOGEVENTS -#ifdef LOGEVENTS -char bigarray[16*1024*1024]; -int bigindex=0; -#define LOGEVENT(x) { \ - int tmp=bigindex++; \ - bigarray[tmp]=x; \ - } -#else -#define LOGEVENT(x) -#endif + /******************************* * Send and Recv function calls @@ -289,7 +315,7 @@ inline int findmax(int *array, int arraylength) { } //#define INLINEPREFETCH -#define PREFTHRESHOLD 4 +#define PREFTHRESHOLD 0 /* This function is a prefetch call generated by the compiler that * populates the shared primary prefetch queue*/ @@ -332,23 +358,24 @@ void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endof int numpref=numavailable(); attempted=1; - if (node==NULL && numpref!=0 || numpref==PREFTHRESHOLD) { + if (node==NULL && numpref!=0 || numpref>=PREFTHRESHOLD) { node=gettail(); - prefetchpile_t *pilehead = foundLocal(node,numpref); + prefetchpile_t *pilehead = foundLocal(node,numpref,siteid); if (pilehead!=NULL) { // Get sock from shared pool /* Send Prefetch Request */ prefetchpile_t *ptr = pilehead; while(ptr != NULL) { - int sd = getSock2(transPrefetchSockPool, ptr->mid); - sendPrefetchReq(ptr, sd); - ptr = ptr->next; + globalid++; + int sd = getSock2(transPrefetchSockPool, ptr->mid); + sendPrefetchReq(ptr, sd, globalid); + ptr = ptr->next; } mcdealloc(pilehead); - resetqueue(); } + resetqueue(); }//end do prefetch if condition } while(node==NULL); #else @@ -584,7 +611,6 @@ __attribute__((pure)) objheader_t *transRead(unsigned int oid) { #ifdef ABORTREADERS if (t_abort) { //abort this transaction - //printf("ABORTING\n"); removetransactionhash(); objstrDelete(t_cache); t_chashDelete(); @@ -613,6 +639,12 @@ __attribute__((pure)) objheader_t *transRead(unsigned int oid) { } else { #ifdef CACHE if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { + if(STATUS(tmp) & DIRTY) { +#ifdef TRANSSTATS + ndirtyCacheObj++; +#endif + goto remoteread; + } #ifdef TRANSSTATS nprehashSearch++; #endif @@ -629,6 +661,7 @@ __attribute__((pure)) objheader_t *transRead(unsigned int oid) { return objcopy; #endif } +remoteread: #endif /* Get the object from the remote location */ if((machinenumber = lhashSearch(oid)) == 0) { @@ -660,13 +693,7 @@ __attribute__((pure)) objheader_t *transRead(unsigned int oid) { pthread_mutex_unlock(&prefetchcache_mutex); memcpy(headerObj, objcopy, size+sizeof(objheader_t)); //make an entry in prefetch lookup hashtable - void *oldptr; - if((oldptr = prehashSearch(oid)) != NULL) { - prehashRemove(oid); - prehashInsert(oid, headerObj); - } else { - prehashInsert(oid, headerObj); - } + prehashInsert(oid, headerObj); LOGEVENT('B'); #endif return &objcopy[1]; @@ -681,6 +708,7 @@ __attribute__((pure)) objheader_t *transRead(unsigned int oid) { /* This function finds the location of the objects involved in a transaction * and returns the pointer to the object if found in a remote location */ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) { +//DEBUG: __attribute__((pure)) objheader_t *transRead2(unsigned int oid, char tmpptr[]) { unsigned int machinenumber; objheader_t *tmp, *objheader; objheader_t *objcopy; @@ -689,7 +717,6 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) { #ifdef ABORTREADERS if (t_abort) { //abort this transaction - //printf("ABORTING\n"); removetransactionhash(); objstrDelete(t_cache); t_chashDelete(); @@ -718,6 +745,12 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) { } else { #ifdef CACHE if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { + if(STATUS(tmp) & DIRTY) { +#ifdef TRANSSTATS + ndirtyCacheObj++; +#endif + goto remoteread; + } #ifdef TRANSSTATS LOGEVENT('P') nprehashSearch++; @@ -727,6 +760,7 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) { size+=sizeof(objheader_t); objcopy = (objheader_t *) objstrAlloc(&t_cache, size); memcpy(objcopy, tmp, size); + LOGOIDTYPE("P",oid, TYPE(objcopy), myrdtsc()); /* Insert into cache's lookup table */ t_chashInsert(OID(tmp), objcopy); #ifdef COMPILER @@ -735,6 +769,7 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) { return objcopy; #endif } +remoteread: #endif /* Get the object from the remote location */ if((machinenumber = lhashSearch(oid)) == 0) { @@ -742,18 +777,18 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) { return NULL; } objcopy = getRemoteObj(machinenumber, oid); - - if(objcopy == NULL) { - printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__); - return NULL; - } else { #ifdef TRANSSTATS - LOGEVENT('R'); nRemoteSend++; #endif + + if(objcopy == NULL) { + printf("Error: Object %u not found in Remote location %s, %d\n", oid,__FILE__, __LINE__); + return NULL; + } else { #ifdef COMPILER #ifdef CACHE + LOGOIDTYPE("RR",oid, TYPE(objcopy),myrdtsc()); //Copy object to prefetch cache pthread_mutex_lock(&prefetchcache_mutex); objheader_t *headerObj; @@ -768,13 +803,7 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) { pthread_mutex_unlock(&prefetchcache_mutex); memcpy(headerObj, objcopy, size+sizeof(objheader_t)); //make an entry in prefetch lookup hashtable - void *oldptr; - if((oldptr = prehashSearch(oid)) != NULL) { - prehashRemove(oid); - prehashInsert(oid, headerObj); - } else { - prehashInsert(oid, headerObj); - } + prehashInsert(oid, headerObj); LOGEVENT('B'); #endif return &objcopy[1]; @@ -892,12 +921,16 @@ int transCommit() { } #endif +#ifdef LOGTIMES + int jjj; + for(jjj=0; jjj 1) { + return 0; + } + } } else { return 0; } @@ -1639,7 +1683,7 @@ void *transPrefetch(void *t) { /* Check if the tuples are found locally, if yes then reduce them further*/ /* and group requests by remote machine ids by calling the makePreGroups() */ int count=numavailable(); - prefetchpile_t *pilehead = foundLocal(node, count); + prefetchpile_t *pilehead = foundLocal(node, count, 0); if (pilehead!=NULL) { // Get sock from shared pool @@ -1647,9 +1691,10 @@ void *transPrefetch(void *t) { /* Send Prefetch Request */ prefetchpile_t *ptr = pilehead; while(ptr != NULL) { - int sd = getSock2(transPrefetchSockPool, ptr->mid); - sendPrefetchReq(ptr, sd); - ptr = ptr->next; + globalid++; + int sd = getSock2(transPrefetchSockPool, ptr->mid); + sendPrefetchReq(ptr, sd,globalid); + ptr = ptr->next; } /* Release socket */ @@ -1692,20 +1737,26 @@ void sendPrefetchReqnew(prefetchpile_t *mcpilenode, int sd) { return; } -void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) { +/** + * parameters: mcpilenode -> pile node to traverse to assemble pref requests + * sd -> socket id + * gid -> global identifier for each prefetch request sent, starts with 0 + **/ +void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd, int gid) { int len, endpair; char control; objpile_t *tmp; struct writestruct writebuffer; writebuffer.offset=0; + /* Send TRANS_PREFETCH control message */ int first=1; /* Send Oids and offsets in pairs */ tmp = mcpilenode->objpiles; while(tmp != NULL) { - len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short)); + len = sizeof(int)+sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short)); char oidnoffset[len+5]; char *buf=oidnoffset; if (first) { @@ -1716,12 +1767,15 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) { *((int*)buf) = tmp->numoffset; buf+=sizeof(int); *((unsigned int *)buf) = tmp->oid; + LOGOIDTYPE("S",tmp->oid,tmp->numoffset,myrdtsc()); #ifdef TRANSSTATS sendRemoteReq++; #endif buf+=sizeof(unsigned int); *((unsigned int *)buf) = myIpAddr; - buf += sizeof(unsigned int); + buf+= sizeof(unsigned int); + *((int*)buf) = gid; + buf+=sizeof(int); memcpy(buf, tmp->offset, (tmp->numoffset)*sizeof(short)); tmp = tmp->next; if (tmp==NULL) { @@ -1729,17 +1783,18 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) { len+=sizeof(int); } if (tmp!=NULL) - send_buf(sd, & writebuffer, oidnoffset, len); + send_buf(sd, &writebuffer, oidnoffset, len); else - forcesend_buf(sd, & writebuffer, oidnoffset, len); + forcesend_buf(sd, &writebuffer, oidnoffset, len); } - + LOGOIDTYPE("SREQ",0,0,myrdtsc()); LOGEVENT('S'); + LOGTIME('S',0,0,myrdtsc(),gid); //after sending return; } int getPrefetchResponse(int sd, struct readstruct *readbuffer) { - int length = 0, size = 0; + int gid,length = 0, size = 0; char control; unsigned int oid; void *modptr, *oldptr; @@ -1748,14 +1803,17 @@ int getPrefetchResponse(int sd, struct readstruct *readbuffer) { size = length - sizeof(int); char recvbuffer[size]; #ifdef TRANSSTATS - getResponse++; - LOGEVENT('Z'); + getResponse++; + LOGEVENT('Z'); + LOGTIME('K',0,0, myrdtsc(),0); //log time after first recv #endif - recv_data_buf(sd, readbuffer, recvbuffer, size); + recv_data_buf(sd, readbuffer, recvbuffer, size); control = *((char *) recvbuffer); if(control == OBJECT_FOUND) { oid = *((unsigned int *)(recvbuffer + sizeof(char))); - size = size - (sizeof(char) + sizeof(unsigned int)); + gid = *((int *) (recvbuffer+sizeof(char)+sizeof(unsigned int))); + LOGTIME('G',oid,0, myrdtsc(),gid); //log time after first recv + size = size - (sizeof(char) + sizeof(unsigned int) + sizeof(int)); pthread_mutex_lock(&prefetchcache_mutex); if ((modptr = prefetchobjstrAlloc(size)) == NULL) { printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__); @@ -1763,28 +1821,27 @@ int getPrefetchResponse(int sd, struct readstruct *readbuffer) { return -1; } pthread_mutex_unlock(&prefetchcache_mutex); - memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size); + memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int)+sizeof(int), size); STATUS(modptr)=0; + /* Insert the oid and its address into the prefetch hash lookup table */ /* Do a version comparison if the oid exists */ if((oldptr = prehashSearch(oid)) != NULL) { /* If older version then update with new object ptr */ if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) { - prehashRemove(oid); - prehashInsert(oid, modptr); + prehashInsert(oid, modptr); } } else { /* Else add the object ptr to hash table*/ prehashInsert(oid, modptr); } - /* Lock the Prefetch Cache look up table*/ - pthread_mutex_lock(&pflookup.lock); - /* Broadcast signal on prefetch cache condition variable */ - pthread_cond_broadcast(&pflookup.cond); - /* Unlock the Prefetch Cache look up table*/ - pthread_mutex_unlock(&pflookup.lock); + LOGOIDTYPE("GR",oid, TYPE(modptr),myrdtsc()); + LOGTIME('Z',oid, TYPE(modptr), myrdtsc(),gid); //log time after copying it into the prefetch cache } else if(control == OBJECT_NOT_FOUND) { oid = *((unsigned int *)(recvbuffer + sizeof(char))); + gid = *((int *) (recvbuffer+sizeof(char)+sizeof(unsigned int))); + LOGOIDTYPE("NF",oid,0,myrdtsc()); + LOGTIME('F',oid, 0, myrdtsc(),gid); //log time after copying it into the prefetch cache /* TODO: For each object not found query DHT for new location and retrieve the object */ /* Throw an error */ //printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid); diff --git a/Robust/src/Runtime/garbage.c b/Robust/src/Runtime/garbage.c index 5160bc9f..2c404c74 100644 --- a/Robust/src/Runtime/garbage.c +++ b/Robust/src/Runtime/garbage.c @@ -30,7 +30,7 @@ #define NUMPTRS 100 -#define INITIALHEAPSIZE 5000*1024*1024L +#define INITIALHEAPSIZE 256*1024*1024L #define GCPOINT(x) ((INTPTR)((x)*0.99)) /* This define takes in how full the heap is initially and returns a new heap size to use */ #define HEAPSIZE(x,y) ((INTPTR)(x+y))*2 diff --git a/Robust/src/Runtime/runtime.c b/Robust/src/Runtime/runtime.c index 0eacaa6f..7a339cf3 100644 --- a/Robust/src/Runtime/runtime.c +++ b/Robust/src/Runtime/runtime.c @@ -13,7 +13,7 @@ #include "DSTM/interface_recovery/prelookup.h" #else #include "DSTM/interface/dstm.h" -#include "DSTM/interface/prelookup.h" +#include "DSTM/interface/altprelookup.h" #include "DSTM/interface/prefetch.h" #endif #endif -- 2.34.1