From 5051519228ea2620e7b2898e4529c7ce8e8c5b9b Mon Sep 17 00:00:00 2001 From: adash Date: Tue, 25 Nov 2008 21:19:07 +0000 Subject: [PATCH] more changes to support range prefetching : TODO 1. optimizing boundary condition for arrays --- Robust/src/Runtime/DSTM/interface/prefetch.c | 258 ++++++++++--------- Robust/src/Runtime/DSTM/interface/prefetch.h | 33 ++- 2 files changed, 169 insertions(+), 122 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/prefetch.c b/Robust/src/Runtime/DSTM/interface/prefetch.c index 183923a9..d263b048 100644 --- a/Robust/src/Runtime/DSTM/interface/prefetch.c +++ b/Robust/src/Runtime/DSTM/interface/prefetch.c @@ -1,5 +1,9 @@ #include "prefetch.h" #include "prelookup.h" +#include "sockpool.h" + +extern sockPoolHashTable_t *transPrefetchSockPool; +extern unsigned int myIpAddr; /* Steps for the new prefetch call */ // Function for new prefetch call @@ -35,8 +39,24 @@ void *transPrefetchNew() { /* Read from prefetch queue */ void *node = gettail(); /* Check tuples if they are found locally */ - checkIfLocal(node); + perMcPrefetchList_t* pilehead = checkIfLocal(node); + + if (pilehead!=NULL) { + // Get sock from shared pool + int sd = getSock2(transPrefetchSockPool, pilehead->mid); + + /* Send Prefetch Request */ + perMcPrefetchList_t *ptr = pilehead; + while(ptr != NULL) { + sendRangePrefetchReq(ptr, sd); + ptr = ptr->next; + } + /* Deallocated pilehead */ + proPrefetchQDealloc(pilehead); + } + // Deallocate the prefetch queue pile node + inctail(); } } @@ -49,7 +69,7 @@ int getsize(short *ptr, int n) { return sum; } -void checkIfLocal(char *ptr) { +perMcPrefetchList_t* checkIfLocal(char *ptr) { int siteid = *(GET_SITEID(ptr)); unsigned int *baseoids = GET_PTR_OID(ptr); unsigned int ntuples = *(GET_NTUPLES(ptr)); @@ -58,7 +78,7 @@ void checkIfLocal(char *ptr) { int i, j, k; int numLocal = 0; - prefetchpile_t * head=NULL; + perMcPrefetchList_t * head=NULL; // Iterate for each object for (i = 0; i < ntuples; i++) { @@ -81,12 +101,13 @@ void checkIfLocal(char *ptr) { visited++; continue; } + if (!isOidAvail(chldOffstFrmBase[visited+1])) { // Add to remote requests unsigned int oid = chldOffstFrmBase[visited+1]; - unsigned int * oidarray = NULL; //TODO FILL THIS ARRAY int machinenum = lhashSearch(oid); - insertPile(machinenum, oidarray, numoffset-j, offsets, &head); + //TODO Group a bunch of oids to send in one prefetch request + insertPrefetch(machinenum, oid, numoffset-j, offsets, &head); break; } else { // iterate over each offset @@ -112,6 +133,7 @@ tuple: /* handle dynamic prefetching */ handleDynPrefetching(numLocal, ntuples, siteid); + return head; } int isOidAvail(unsigned int oid) { @@ -215,138 +237,132 @@ int lookForObjs(int *chldOffstFrmBase, short *offsets, return 1; } -#if 0 -int lookForObjs(unsigned int *oid, short *offset, int *numoids, int *newbase) { - objheader_t *header; - if((header = mhashSearch(*oid))!= NULL) { - //Found on machine - ; - } else if((header = prehashSearch(*oid))!=NULL) { - //Found in prefetch cache - ; - } else { - return 0; - } +/* Delete perMcPrefetchList_t and everything it points to */ +void proPrefetchQDealloc(perMcPrefetchList_t *node) { + perMcPrefetchList_t *prefetchpile_ptr; + perMcPrefetchList_t *prefetchpile_next_ptr; + objOffsetPile_t *objpile_ptr; + objOffsetPile_t *objpile_next_ptr; - if(TYPE(header) > NUMCLASSES) { - int elementsize = classsize[TYPE(header)]; - struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t)); - int length = ao->___length___; - /* Check if array out of bounds */ - if(offset[*newbase] < 0 || offset[*newbase] >= length) { - //if yes treat the object as found - (*oid)=0; - return 1; - } else { - if(getOtherOid(header, ao, offset, numoids, newbase)) - return 1; + prefetchpile_ptr = node; + while (prefetchpile_ptr != NULL) { + prefetchpile_next_ptr = prefetchpile_ptr; + while(prefetchpile_ptr->list != NULL) { + //offsets aren't owned by us, so we don't free them. + objpile_ptr = prefetchpile_ptr->list; + prefetchpile_ptr->list = objpile_ptr->next; + free(objpile_ptr); } - } else { //linked list - //(*oid) = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset)); - if(getNext(header, offset, numoids, newbase)) - return 1; - //(*newbase)++; + prefetchpile_ptr = prefetchpile_next_ptr->next; + free(prefetchpile_next_ptr); } } -void resolveArrays(unsigned int *arrayOfOids, short *offset, int *numoids, int *newbase) { - /* - int i; - */ -} +void insertPrefetch(int mid, unsigned int oid, short numoffset, short *offsets, perMcPrefetchList_t **head) { + perMcPrefetchList_t *ptr; + objOffsetPile_t *objnode; + objOffsetPile_t **tmp; -int getOtherOid(header, ao, offset, numoids, newbase) { - short range, stride; - short startindex = offset[*newbase]; - int getnewbaseVal = *newbase + 1; - if(getnewbaseVal == 0) { - (*oid) = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*startindex))); - (*newbase) = (*newbase) + 2; //skip the immediate offset - return 1; - } else if(getnewbaseVal > 0) { - /* Resolve the oids within a given range */ - (*newbase)++; - range = GET_RANGE(offset[*newbase]); - stride = GET_STRIDE((void *)(offset[*newbase])); - stride = stride + 1; //NOTE 000 => stride = 1, 001 => stride = 2 - int index = 0; - unsigned int arrayOfOids[range+1]; - if(GET_STRIDEINC(offset[*newbase])) { //-ve stride + //Loop through the machines + for(; 1; head=&((*head)->next)) { + int tmid; + if ((*head)==NULL||(tmid=(*head)->mid)>mid) { + perMcPrefetchList_t * tmp = (perMcPrefetchList_t *) malloc(sizeof(perMcPrefetchList_t)); + tmp->mid = mid; + objnode = malloc(sizeof(objOffsetPile_t)); + objnode->offsets = offsets; + objnode->oid = oid; + objnode->numoffset = numoffset; + objnode->next = NULL; + tmp->list = objnode; + tmp->next = *head; + *head=tmp; + return; + } + + //keep looking + if (tmid < mid) + continue; + + //found mid list + for(tmp=&((*head)->list); 1; tmp=&((*tmp)->next)) { + int toid; + int matchstatus; + + if ((*tmp)==NULL||((toid=(*tmp)->oid)>oid)) { + objnode = (objOffsetPile_t *) malloc(sizeof(objOffsetPile_t)); + objnode->offsets = offsets; + objnode->oid = oid; + objnode->numoffset = numoffset; + objnode->next = *tmp; + *tmp = objnode; + return; + } + if (toid < oid) + continue; + + /* Fill list DS */ int i; - for(i = startindex; i <= range; i = i - stride) { - if(i < 0 || i >= length) { - //if yes treat the object as found - (*oid)=0; - return 1; + int onumoffset=(*tmp)->numoffset; + short * ooffset=(*tmp)->offsets; + + for(i=0; ionumoffset) { + //We've matched, let's just extend the current prefetch + (*tmp)->numoffset=numoffset; + (*tmp)->offsets=offsets; + return; } - arrayOfOids[index] = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*i))); - index++; - } - } else { //+ve stride - int i; - for(i = startindex; i <= range; i = i + stride) { - if(i < 0 || i >= length) { - //if yes treat the object as found - (*oid)=0; - return 1; + if (ooffset[i]offsets[i]) { + //Place item before the current one + objnode = (objOffsetPile_t *) malloc(sizeof(objOffsetPile_t)); + objnode->offsets = offsets; + objnode->oid = oid; + objnode->numoffset = numoffset; + objnode->next = *tmp; + *tmp = objnode; + return; } - arrayOfOids[index] = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*i))); - index++; } + //if we get to the end, we're already covered by this prefetch + return; +oidloop: + ; } - //(*oid) = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*startindex))); - (*newbase) = (*newbase) + 2; - return 1; - } else { - ; } } +void sendRangePrefetchReq(perMcPrefetchList_t *mcpilenode, int sd) { + int len, endpair; + char control; + objOffsetPile_t *tmp; -void checkIfLocal(char *ptr) { - int siteid = *(GET_SITEID(ptr)); - int ntuples = *(GET_NTUPLES(ptr)); - unsigned int *baseoids = GET_PTR_OID(ptr); - unsigned short *numoffsets = GET_PTR_EOFF(ptr, ntuples); - short *offsets = GET_PTR_ARRYFLD(ptr, ntuples); - prefetchpile_t * head=NULL; - int numLocal = 0; + /* Send TRANS_PREFETCH control message */ + control = TRANS_PREFETCH; + send_data(sd, &control, sizeof(char)); - int i ; - for(i=0; ilist; + while(tmp != NULL) { + len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short)); + char oidnoffset[len]; + char *buf=oidnoffset; + *((int*)buf) = tmp->numoffset; + buf+=sizeof(int); + *((unsigned int *)buf) = tmp->oid; + buf+=sizeof(unsigned int); + *((unsigned int *)buf) = myIpAddr; + buf += sizeof(unsigned int); + memcpy(buf, tmp->offsets, (tmp->numoffset)*sizeof(short)); + send_data(sd, oidnoffset, len); + tmp = tmp->next; + } - //Look up fields locally - int newbase; - for(newbase=baseindex; newbase> 12) #define GET_RANGE(x) (x & 0x0fff) #define GET_STRIDEINC(x) ((x & 0x8000) >> 15) + +/****** Global structure **********/ +typedef struct objOffsetPile { + unsigned int oid; + short numoffset; + short *offsets; + struct objOffsetPile *next; +} objOffsetPile_t; + +typedef struct perMcPrefetchList { + unsigned int mid; + objOffsetPile_t *list; + struct perMcPrefetchList *next; +} perMcPrefetchList_t; + +typedef struct proPrefetchQ { + perMcPrefetchList_t *front, *rear; + pthread_mutex_t qlock; + pthread_mutexattr_t qlockattr; + pthread_cond_t qcond; +} proPrefetchQ_t; + +// Global Prefetch Processing Queue +proPrefetchQ_t prefetchQ; + +/**** Prefetch Queue to be processed functions ******/ +void proPrefetchQDealloc(perMcPrefetchList_t *); + +/******** Process Queue Element functions ***********/ void rangePrefetch(int, int, unsigned int *, unsigned short *, short *offset); void *transPrefetchNew(); -void checkIfLocal(char *ptr); +perMcPrefetchList_t* checkIfLocal(char *ptr); int isOidAvail(unsigned int oid); int lookForObjs(int*, short *, int *, int *, int *); +void insertPrefetch(int, unsigned int, short, short*, perMcPrefetchList_t **); +void sendRangePrefetchReq(perMcPrefetchList_t *, int sd); /************* Internal functions *******************/ int getsize(short *ptr, int n); -- 2.34.1