From 73741d7e7f4c211a887f5cf73135bb9c3658eed3 Mon Sep 17 00:00:00 2001 From: adash Date: Thu, 8 Jan 2009 07:53:18 +0000 Subject: [PATCH] more changes in rangeprefeches to handle prefetch requests at the server end --- .../src/Runtime/DSTM/interface/dstmserver.c | 15 ++ Robust/src/Runtime/DSTM/interface/prefetch.c | 199 +++++++++++++++++- Robust/src/Runtime/DSTM/interface/prefetch.h | 4 + 3 files changed, 215 insertions(+), 3 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 69e10ca1..578886e1 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -6,6 +6,7 @@ #include "mlookup.h" #include "llookup.h" #include "threadnotify.h" +#include "prefetch.h" #ifdef COMPILER #include "thread.h" #endif @@ -184,17 +185,31 @@ void *dstmAccept(void *acceptfd) { break; case TRANS_PREFETCH: +#ifdef RANGEPREFETCH + if((val = rangePrefetchReq((int)acceptfd)) != 0) { + printf("Error: In rangePrefetchReq() %s, %d\n", __FILE__, __LINE__); + break; + } +#else if((val = prefetchReq((int)acceptfd)) != 0) { printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__); break; } +#endif break; case TRANS_PREFETCH_RESPONSE: +#ifdef RANGEPREFETCH + if((val = getRangePrefetchResponse((int)acceptfd)) != 0) { + printf("Error: In getRangePrefetchRespose() %s, %d\n", __FILE__, __LINE__); + break; + } +#else if((val = getPrefetchResponse((int) acceptfd)) != 0) { printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__); break; } +#endif break; case START_REMOTE_THREAD: diff --git a/Robust/src/Runtime/DSTM/interface/prefetch.c b/Robust/src/Runtime/DSTM/interface/prefetch.c index de2826a1..4bb74e97 100644 --- a/Robust/src/Runtime/DSTM/interface/prefetch.c +++ b/Robust/src/Runtime/DSTM/interface/prefetch.c @@ -4,6 +4,7 @@ extern sockPoolHashTable_t *transPrefetchSockPool; extern unsigned int myIpAddr; +extern sockPoolHashTable_t *transPResponseSocketPool; // Function for new prefetch call void rangePrefetch(unsigned int oid, short numoffset, short *offsets) { @@ -72,6 +73,7 @@ perMcPrefetchList_t* checkIfLocal(char *ptr) { int numLocal = 0; perMcPrefetchList_t * head=NULL; + //printf("Inside %s()\n", __func__); // Iterate for the object int noffset = (int) numoffsets; @@ -82,10 +84,12 @@ perMcPrefetchList_t* checkIfLocal(char *ptr) { tmpobjset[l] = GET_RANGE(offsets[2*l+1]); } int maxChldOids = getsize(tmpobjset, sizetmpObjSet)+1; + //printf("%s() maxChldOids = %d\n", __func__, maxChldOids); unsigned int chldOffstFrmBase[maxChldOids]; chldOffstFrmBase[0] = oid; int tovisit = 0, visited = -1; // Iterate for each element of offsets + //printf("%s() noffset = %d, sizetmpObjSet= %d, visited = %d, tovisit= %d\n", __func__, noffset, sizetmpObjSet, visited, tovisit); for (j = 0; j < noffset; j++) { // Iterate over each element to be visited while (visited != tovisit) { @@ -99,8 +103,9 @@ perMcPrefetchList_t* checkIfLocal(char *ptr) { unsigned int oid = chldOffstFrmBase[visited+1]; int machinenum = lhashSearch(oid); //TODO Group a bunch of oids to send in one prefetch request - insertPrefetch(machinenum, oid, noffset-j, offsets, &head); - break; + //printf("Oid Not Found, send Prefetch for oid = %d, noffset-j= %d j should point to the new offset = %d\n", oid, noffset-j, j); + insertPrefetch(machinenum, oid, noffset-j, &offsets[j], &head); + goto tuple; } else { // iterate over each offset int retval; @@ -111,7 +116,9 @@ perMcPrefetchList_t* checkIfLocal(char *ptr) { return NULL; } } + //printf("%s() visited = %d, tovisit= %d\n", __func__, visited, tovisit); visited++; + fflush(stdout); } } // end iterate for each element of offsets @@ -140,6 +147,7 @@ int isOidAvail(unsigned int oid) { int lookForObjs(int *chldOffstFrmBase, short *offsets, int *index, int *visited, int *tovisit, int *noffset) { + //printf("Inside %s()\n", __func__); objheader_t *header; unsigned int oid = chldOffstFrmBase[*visited+1]; if((header = (objheader_t *)mhashSearch(oid))!= NULL) { @@ -160,6 +168,7 @@ int lookForObjs(int *chldOffstFrmBase, short *offsets, /* Check if array out of bounds */ int startindex = offsets[*index]; int range = GET_RANGE(offsets[(*index)+1]); + //printf("%s() Array range = %d\n", __func__, range); if(range > 0 && range < length) { short stride = GET_STRIDE(offsets[(*index)+1]); stride = stride + 1; //NOTE bit pattern 000 => stride = 1, 001 => stride = 2 @@ -208,15 +217,20 @@ int lookForObjs(int *chldOffstFrmBase, short *offsets, } else { //linked list int startindex = offsets[*index]; int range = GET_RANGE(offsets[(*index)+1]); + //printf("%s() LinkedList range = %d\n", __func__, range); unsigned int oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + startindex)); + //printf("Oid = %d\n", oid); if (range == 0) { chldOffstFrmBase[*tovisit+1] = oid; if(isOidAvail(oid)) { *visited = *visited + 1; + *tovisit = *tovisit + 1; *index = *index + 2; + //printf("%s() Found visited = %d, tovisit=%d, index=%d\n", __func__, *visited, *tovisit, *index); return 1; } else { *tovisit = *tovisit + 1; + //printf("%s() Not Found visited = %d, tovisit=%d, index=%d\n", __func__, *visited, *tovisit, *index); return 1; } } else { @@ -246,7 +260,6 @@ int lookForObjs(int *chldOffstFrmBase, short *offsets, } return 1; } - } return 1; } @@ -380,3 +393,183 @@ void sendRangePrefetchReq(perMcPrefetchList_t *mcpilenode, int sd) { return; } + +int getRangePrefetchResponse(int sd) { + //printf("Inside %s()\n", __func__); + + + return 0; +} + + +int rangePrefetchReq(int acceptfd) { + int numoffset, sd = -1; + unsigned int oid, mid = -1; + oidmidpair_t oidmid; + //printf("Inside %s()\n", __func__); + + while (1) { + recv_data(acceptfd, &numoffset, sizeof(int)); + if(numoffset == -1) + break; + recv_data(acceptfd, &oidmid, 2*sizeof(unsigned int)); + oid = oidmid.oid; + if(mid != oidmid.mid) { + if(mid!= -1) + freeSockWithLock(transPResponseSocketPool, mid, sd); + mid = oidmid.mid; + sd = getSockWithLock(transPResponseSocketPool, mid); + } + + short offsetsarry[numoffset]; + recv_data(acceptfd, offsetsarry, numoffset*sizeof(short)); + + /*Process each oid */ + objheader_t *header; + if((header = (objheader_t *)mhashSearch(oid)) == NULL) { + int size = sizeof(int) + sizeof(char) + sizeof(unsigned int); + char sendbuffer[size]; + *((int *) sendbuffer) = size; + *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND; + *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(unsigned int))) = oid; + char control = TRANS_PREFETCH_RESPONSE; + sendPrefetchResponse(sd, &control, sendbuffer, &size); + break; + } else { //Obj found + int retval; + if((retval = processOidFound(header, offsetsarry, 0, sd)) != 0) { + printf("%s() Error: in processOidFound() at line %d in %s()\n", + __func__, __LINE__, __FILE__); + return -1; + } + } + } + + //Release socket + if(mid!=-1) + freeSockWithLock(transPResponseSocketPool, mid, sd); + + return 0; +} + +int processOidFound(objheader_t *header, short * offsetsarry, int index, int sd) { + int objsize; + GETSIZE(objsize, header); + int size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + + sizeof(objheader_t) + objsize; + char sendbuffer[size]; + int incr = 0; + *((int *)(sendbuffer + incr)) = size; + incr += sizeof(int); + *((char *)(sendbuffer + incr)) = OBJECT_FOUND; + incr += sizeof(char); + *((unsigned int *)(sendbuffer + incr)) = OID(header); + incr += sizeof(unsigned int); + memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t)); + + char control = TRANS_PREFETCH_RESPONSE; + sendPrefetchResponse(sd, &control, sendbuffer, &size); + + if(TYPE(header) > NUMCLASSES) { + int elementsize = classsize[TYPE(header)]; + struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t)); + int length = ao->___length___; + /* Check if array out of bounds */ + int startindex = offsetsarry[index]; + int range = GET_RANGE(offsetsarry[index+1]); + //printf("%s() Array range = %d\n", __func__, range); + if(range > 0 && range < length) { + short stride = GET_STRIDE(offsets[index+1]); + stride = stride + 1; //NOTE bit pattern 000 => stride = 1, 001 => stride = 2 + int i; + //check is stride +ve or negative + if(GET_STRIDEINC(offsets[index]+1)) { //-ve stride + for(i = startindex; i <= range+1; i = i - stride) { + unsigned int oid = 0; + if((i < 0 || i >= length)) { + //if yes treat the object as found + oid = 0; + continue; + } else { + // compute new object + oid = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*i))); + } + if(oid == 0) + goto end; + } + } else { //+ve stride + for(i = startindex; i <= range; i = i + stride) { + unsigned int oid = 0; + if(i < 0 || i >= length) { + //if yes treat the object as found + oid = 0; + continue; + } else { + // compute new object + oid = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*i))); + } + // add new object + chldOffstFrmBase[*tovisit] = oid; + *tovisit = *tovisit + 1; + } + } + } else if(range == 0) { + if(startindex >=0 || startindex < length) { + unsigned int oid = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*startindex))); + // add new object + chldOffstFrmBase[*tovisit] = oid; + *tovisit = *tovisit + 1; + } + } + *index = *index + 2; + } else { //linked list + int startindex = offsets[*index]; + int range = GET_RANGE(offsets[(*index)+1]); + //printf("%s() LinkedList range = %d\n", __func__, range); + unsigned int oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + startindex)); + //printf("Oid = %d\n", oid); + if (range == 0) { + chldOffstFrmBase[*tovisit+1] = oid; + if(isOidAvail(oid)) { + *visited = *visited + 1; + *tovisit = *tovisit + 1; + *index = *index + 2; + //printf("%s() Found visited = %d, tovisit=%d, index=%d\n", __func__, *visited, *tovisit, *index); + return 1; + } else { + *tovisit = *tovisit + 1; + //printf("%s() Not Found visited = %d, tovisit=%d, index=%d\n", __func__, *visited, *tovisit, *index); + return 1; + } + } else { + int i; + for(i = 0; i