more changes to support range prefetching :
authoradash <adash>
Tue, 25 Nov 2008 21:19:07 +0000 (21:19 +0000)
committeradash <adash>
Tue, 25 Nov 2008 21:19:07 +0000 (21:19 +0000)
TODO
1. optimizing boundary condition for arrays

Robust/src/Runtime/DSTM/interface/prefetch.c
Robust/src/Runtime/DSTM/interface/prefetch.h

index 183923a98280bd9ba9c97716eeb192ffc819b79b..d263b048f26e105d286481416a2f2c0935e6e2ce 100644 (file)
@@ -1,5 +1,9 @@
 #include "prefetch.h"
 #include "prelookup.h"
+#include "sockpool.h"
+
+extern sockPoolHashTable_t *transPrefetchSockPool;
+extern unsigned int myIpAddr;
 
 /* Steps for the new prefetch call */
 // Function for new prefetch call
@@ -35,8 +39,24 @@ void *transPrefetchNew() {
     /* Read from prefetch queue */
     void *node = gettail();
     /* Check tuples if they are found locally */
-    checkIfLocal(node);
+    perMcPrefetchList_t* pilehead = checkIfLocal(node);
+
+    if (pilehead!=NULL) {
+      // Get sock from shared pool
+      int sd = getSock2(transPrefetchSockPool, pilehead->mid);
+
+      /* Send  Prefetch Request */
+      perMcPrefetchList_t *ptr = pilehead;
+      while(ptr != NULL) {
+        sendRangePrefetchReq(ptr, sd);
+        ptr = ptr->next;
+      }
 
+      /* Deallocated pilehead */
+      proPrefetchQDealloc(pilehead);
+    }
+    // Deallocate the prefetch queue pile node
+    inctail();
   }
 }
 
@@ -49,7 +69,7 @@ int getsize(short *ptr, int n) {
   return sum;
 }
 
-void checkIfLocal(char *ptr) {
+perMcPrefetchList_t*  checkIfLocal(char *ptr) {
   int siteid = *(GET_SITEID(ptr));
   unsigned int *baseoids = GET_PTR_OID(ptr);
   unsigned int ntuples = *(GET_NTUPLES(ptr));
@@ -58,7 +78,7 @@ void checkIfLocal(char *ptr) {
   int i, j, k;
   int numLocal = 0;
 
-  prefetchpile_t * head=NULL;
+  perMcPrefetchList_t * head=NULL;
 
   // Iterate for each object
   for (i = 0; i < ntuples; i++) {
@@ -81,12 +101,13 @@ void checkIfLocal(char *ptr) {
           visited++;
           continue;
         }
+
         if (!isOidAvail(chldOffstFrmBase[visited+1])) { 
           // Add to remote requests 
           unsigned int oid = chldOffstFrmBase[visited+1];
-          unsigned int * oidarray = NULL; //TODO FILL THIS ARRAY
           int machinenum = lhashSearch(oid);
-          insertPile(machinenum, oidarray, numoffset-j, offsets, &head);
+          //TODO Group a bunch of oids to send in one prefetch request
+          insertPrefetch(machinenum, oid, numoffset-j, offsets, &head);
           break;
         } else {
           // iterate over each offset
@@ -112,6 +133,7 @@ tuple:
 
   /* handle dynamic prefetching */
   handleDynPrefetching(numLocal, ntuples, siteid);
+  return head;
 }
 
 int isOidAvail(unsigned int oid) {
@@ -215,138 +237,132 @@ int lookForObjs(int *chldOffstFrmBase, short *offsets,
   return 1;
 }
 
-#if 0
-int lookForObjs(unsigned int *oid, short *offset, int *numoids, int *newbase) {
-  objheader_t *header;
-  if((header = mhashSearch(*oid))!= NULL) {
-    //Found on machine
-    ;
-  } else if((header = prehashSearch(*oid))!=NULL) {
-    //Found in prefetch cache
-    ;
-  } else {
-    return 0;
-  }
+/* Delete perMcPrefetchList_t and everything it points to */
+void proPrefetchQDealloc(perMcPrefetchList_t *node) {
+  perMcPrefetchList_t *prefetchpile_ptr;
+  perMcPrefetchList_t *prefetchpile_next_ptr;
+  objOffsetPile_t *objpile_ptr;
+  objOffsetPile_t *objpile_next_ptr;
 
-  if(TYPE(header) > NUMCLASSES) {
-    int elementsize = classsize[TYPE(header)];
-    struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
-    int length = ao->___length___;
-        /* Check if array out of bounds */
-    if(offset[*newbase] < 0 || offset[*newbase] >= length) {
-      //if yes treat the object as found
-      (*oid)=0;
-      return 1;
-    } else {
-      if(getOtherOid(header, ao, offset, numoids, newbase))
-        return 1;
+  prefetchpile_ptr = node;
+  while (prefetchpile_ptr != NULL) {
+    prefetchpile_next_ptr = prefetchpile_ptr;
+    while(prefetchpile_ptr->list != NULL) {
+      //offsets aren't owned by us, so we don't free them.
+      objpile_ptr = prefetchpile_ptr->list;
+      prefetchpile_ptr->list = objpile_ptr->next;
+      free(objpile_ptr);
     }
-  } else { //linked list
-    //(*oid) = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset));
-    if(getNext(header, offset, numoids, newbase)) 
-      return 1;
-    //(*newbase)++;
+    prefetchpile_ptr = prefetchpile_next_ptr->next;
+    free(prefetchpile_next_ptr);
   }
 }
 
-void resolveArrays(unsigned int *arrayOfOids, short *offset, int *numoids, int *newbase) {
-  /*
-  int i;
-  */
-}
+void insertPrefetch(int mid, unsigned int oid, short numoffset, short *offsets, perMcPrefetchList_t **head) {
+  perMcPrefetchList_t *ptr;
+  objOffsetPile_t *objnode;
+  objOffsetPile_t **tmp;
 
-int getOtherOid(header, ao, offset, numoids, newbase) {
-  short range, stride;
-  short startindex = offset[*newbase];
-  int getnewbaseVal = *newbase + 1;
-  if(getnewbaseVal == 0) {
-    (*oid) = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*startindex)));
-    (*newbase) = (*newbase) + 2; //skip the immediate offset
-    return 1;
-  } else if(getnewbaseVal > 0) {
-    /* Resolve the oids within a given range */
-    (*newbase)++;
-    range = GET_RANGE(offset[*newbase]);
-    stride = GET_STRIDE((void *)(offset[*newbase]));
-    stride = stride + 1; //NOTE 000 => stride = 1, 001 => stride = 2
-    int index = 0;
-    unsigned int arrayOfOids[range+1];
-    if(GET_STRIDEINC(offset[*newbase])) { //-ve stride
+  //Loop through the machines
+  for(; 1; head=&((*head)->next)) {
+    int tmid;
+    if ((*head)==NULL||(tmid=(*head)->mid)>mid) {
+      perMcPrefetchList_t * tmp = (perMcPrefetchList_t *) malloc(sizeof(perMcPrefetchList_t));
+      tmp->mid = mid;
+      objnode =  malloc(sizeof(objOffsetPile_t));
+      objnode->offsets = offsets;
+      objnode->oid = oid;
+      objnode->numoffset = numoffset;
+      objnode->next = NULL;
+      tmp->list = objnode;
+      tmp->next = *head;
+      *head=tmp;
+      return;
+    }   
+
+    //keep looking
+    if (tmid < mid)
+      continue;
+
+    //found mid list
+    for(tmp=&((*head)->list); 1; tmp=&((*tmp)->next)) {
+      int toid;
+      int matchstatus;
+
+      if ((*tmp)==NULL||((toid=(*tmp)->oid)>oid)) {
+        objnode = (objOffsetPile_t *) malloc(sizeof(objOffsetPile_t));
+        objnode->offsets = offsets;
+        objnode->oid = oid;
+        objnode->numoffset = numoffset;
+        objnode->next = *tmp;
+        *tmp = objnode;
+        return;
+      }   
+      if (toid < oid)
+        continue;
+
+      /* Fill list DS */
       int i;
-      for(i = startindex; i <= range; i = i - stride) {
-        if(i < 0 || i >= length) {
-          //if yes treat the object as found
-          (*oid)=0;
-          return 1;
+      int onumoffset=(*tmp)->numoffset;
+      short * ooffset=(*tmp)->offsets;
+
+      for(i=0; i<numoffset; i++) {
+        if (i>onumoffset) {
+          //We've matched, let's just extend the current prefetch
+          (*tmp)->numoffset=numoffset;
+          (*tmp)->offsets=offsets;
+          return;
         }
-        arrayOfOids[index] = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*i)));
-        index++;
-      }
-    } else { //+ve stride
-      int i;
-      for(i = startindex; i <= range; i = i + stride) {
-        if(i < 0 || i >= length) {
-          //if yes treat the object as found
-          (*oid)=0;
-          return 1;
+        if (ooffset[i]<offsets[i]) {
+          goto oidloop;
+        } else if (ooffset[i]>offsets[i]) {
+          //Place item before the current one
+          objnode = (objOffsetPile_t *) malloc(sizeof(objOffsetPile_t));
+          objnode->offsets = offsets;
+          objnode->oid = oid;
+          objnode->numoffset = numoffset;
+          objnode->next = *tmp;
+          *tmp = objnode;
+          return;
         }
-        arrayOfOids[index] = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*i)));
-        index++;
       }
+      //if we get to the end, we're already covered by this prefetch
+      return;
+oidloop:
+      ;
     }
-    //(*oid) = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*startindex)));
-    (*newbase) = (*newbase) + 2; 
-    return 1;
-  } else {
-    ;
   }
 }
 
+void sendRangePrefetchReq(perMcPrefetchList_t *mcpilenode, int sd) {
+  int len, endpair;
+  char control;
+  objOffsetPile_t *tmp;
 
-void checkIfLocal(char *ptr) {
-  int siteid = *(GET_SITEID(ptr));
-  int ntuples = *(GET_NTUPLES(ptr));
-  unsigned int *baseoids = GET_PTR_OID(ptr);
-  unsigned short *numoffsets = GET_PTR_EOFF(ptr, ntuples);
-  short *offsets = GET_PTR_ARRYFLD(ptr, ntuples);
-  prefetchpile_t * head=NULL;
-  int numLocal = 0;
+  /* Send TRANS_PREFETCH control message */
+  control = TRANS_PREFETCH;
+  send_data(sd, &control, sizeof(char));
 
-  int i ;
-  for(i=0; i<ntuples; i++) {
-    unsigned short baseindex=(i==0) ? 0 : numoffsets[i -1];
-    unsigned short endindex = numoffsets[i];
-    unsigned int oid = baseoids[i];
-    int numoids = 0;
-    if(oid == 0)
-      continue;
+  /* Send Oids and offsets in pairs */
+  tmp = mcpilenode->list;
+  while(tmp != NULL) {
+    len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
+    char oidnoffset[len];
+    char *buf=oidnoffset;
+    *((int*)buf) = tmp->numoffset;
+    buf+=sizeof(int);
+    *((unsigned int *)buf) = tmp->oid;
+    buf+=sizeof(unsigned int);
+    *((unsigned int *)buf) = myIpAddr;
+    buf += sizeof(unsigned int);
+    memcpy(buf, tmp->offsets, (tmp->numoffset)*sizeof(short));
+    send_data(sd, oidnoffset, len);
+    tmp = tmp->next;
+  }
 
-    //Look up fields locally
-    int newbase;
-    for(newbase=baseindex; newbase<endindex; ) {
-      if(!lookForObjs(&oid, &offsets[newbase], &numoids, &newbase)) {
-       break;
-      }
-      //Ended in a null pointer
-      if(oid == 0)
-       goto tuple;
-    }
+  /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
+  endpair = -1;
+  send_data(sd, &endpair, sizeof(int));
 
-    //Add to remote request
-    machinenum=lhashSearch(oid);
-    // Create an array of oids and offsets
-    unsigned int arrayoid[numoids];
-    unsigned short arraynumoffset[numoids];
-    void *arryfields[numoids];
-    for(i = 0; i<numoids; i++) {
-      arrayoid[i] = oid;
-      arraynumoffset[i] = endindex - newbase;
-      arryfields[i] = (void*)(&arryfields[newbase]);
-    }
-    //insertPile(machinenum, oid, endindex-newbase, &arryfields[newbase], &head);
-    insertPile(machinenum, arrayoid, arraynumoffset, arryfields, &head);
-tuple:
-    ;
-  }
+  return;
 }
-#endif
index 0920bce6c65bcec44aa721f35d140071d4900abe..79e0a49c76e746a9a973871492b543724cca8b35 100644 (file)
@@ -4,11 +4,42 @@
 #define GET_STRIDE(x) ((x & 0x7000) >> 12)
 #define GET_RANGE(x) (x & 0x0fff)
 #define GET_STRIDEINC(x) ((x & 0x8000) >> 15)
+
+/****** Global structure **********/
+typedef struct objOffsetPile {
+  unsigned int oid;
+  short numoffset;
+  short *offsets;
+  struct objOffsetPile *next;
+} objOffsetPile_t;
+
+typedef struct perMcPrefetchList {
+  unsigned int mid;
+  objOffsetPile_t *list;
+  struct perMcPrefetchList *next;
+} perMcPrefetchList_t;
+
+typedef struct proPrefetchQ {
+  perMcPrefetchList_t *front, *rear;
+  pthread_mutex_t qlock;
+  pthread_mutexattr_t qlockattr;
+  pthread_cond_t qcond;
+} proPrefetchQ_t;
+
+// Global Prefetch Processing Queue
+proPrefetchQ_t prefetchQ;
+
+/**** Prefetch Queue to be processed functions ******/
+void proPrefetchQDealloc(perMcPrefetchList_t *);
+
+/******** Process Queue Element functions ***********/
 void rangePrefetch(int, int, unsigned int *, unsigned short *, short *offset);
 void *transPrefetchNew();
-void checkIfLocal(char *ptr);
+perMcPrefetchList_t* checkIfLocal(char *ptr);
 int isOidAvail(unsigned int oid);
 int lookForObjs(int*, short *, int *, int *, int *);
+void insertPrefetch(int, unsigned int, short, short*, perMcPrefetchList_t **); 
+void sendRangePrefetchReq(perMcPrefetchList_t *, int sd);
 
 /************* Internal functions *******************/
 int getsize(short *ptr, int n);