manual prefetching changes
authoradash <adash>
Mon, 21 Dec 2009 21:58:16 +0000 (21:58 +0000)
committeradash <adash>
Mon, 21 Dec 2009 21:58:16 +0000 (21:58 +0000)
Robust/src/Runtime/DSTM/interface/prefetch.c
Robust/src/Runtime/DSTM/interface/prefetch.h

index 2464b48e1c902104684794f0d568aaad390bd8a1..fc65a716464598f7b821dc09d59e201e053d043f 100644 (file)
@@ -9,6 +9,27 @@ extern sockPoolHashTable_t *transPResponseSocketPool;
 extern pthread_mutex_t prefetchcache_mutex;
 extern prehashtable_t pflookup;
 
+#define LOGTIMES
+#ifdef LOGTIMES
+extern char bigarray1[6*1024*1024];
+extern unsigned int bigarray2[6*1024*1024];
+extern unsigned int bigarray3[6*1024*1024];
+extern long long bigarray4[6*1024*1024];
+extern int bigarray5[6*1024*1024];
+extern int bigindex1;
+#define LOGTIME(x,y,z,a,b) {\
+  int tmp=bigindex1; \
+  bigarray1[tmp]=x; \
+  bigarray2[tmp]=y; \
+  bigarray3[tmp]=z; \
+  bigarray4[tmp]=a; \
+  bigarray5[tmp]=b; \
+  bigindex1++; \
+}
+#else
+#define LOGTIME(x,y,z,a,b)
+//log(eventname, oid, type, time, unqiue id)
+#endif
 
 // Function for new prefetch call
 void rangePrefetch(unsigned int oid, short numoffset, short *offsets) {
@@ -21,6 +42,7 @@ void rangePrefetch(unsigned int oid, short numoffset, short *offsets) {
   ((unsigned int *)node)[0] = oid;
   index = index + (sizeof(unsigned int));
   *((short *)(node+index)) = numoffset;
+  LOGTIME('R',oid,numoffset,0,0);
   index = index + (sizeof(short));
   memcpy(node+index, offsets, numoffset * sizeof(short));
   movehead(qnodesize);
@@ -33,7 +55,9 @@ void *transPrefetchNew() {
 
     int count = numavailable();
     /* Check tuples if they are found locally */
+    LOGTIME('Z',0,0,0,0);
     perMcPrefetchList_t* pilehead = processLocal(node,count);
+    LOGTIME('Y',0,0,0,0);
 
     if (pilehead!=NULL) {
 
@@ -52,7 +76,6 @@ void *transPrefetchNew() {
 
     // Deallocate the prefetch queue pile node
     incmulttail(count);
-    //inctail();
   }
 }
 
@@ -69,9 +92,12 @@ perMcPrefetchList_t *processLocal(char *ptr, int numprefetches) {
     unsigned int dfsList[numoffset];
     int offstop=numoffset-2;
 
+    int countInvalidObj=0;
 
-    objheader_t * header = searchObj(oid);
+    objheader_t * header = searchObj(oid, &countInvalidObj);
+    //printf("%u %x\n", oid, header);
     if (header==NULL) {
+      LOGTIME('b',oid,0,0,countInvalidObj);
       //forward prefetch
       int machinenum = lhashSearch(oid);
       insertPrefetch(machinenum, oid, numoffset, offsetarray, &head);
@@ -80,17 +106,21 @@ perMcPrefetchList_t *processLocal(char *ptr, int numprefetches) {
     dfsList[0]=oid;
     dfsList[1]=0;
 
+    LOGTIME('B',oid,TYPE(header),0,countInvalidObj);
 
     //Start searching the dfsList
     for(top=0; top>=0;) {
-      oid=getNextOid(header, offsetarray, dfsList, top);
+      oid=getNextOid(header, offsetarray, dfsList, top, &countInvalidObj);
+      LOGTIME('O',oid,0,0,countInvalidObj);
+      int isLastOffset=0;
       if (oid&1) {
         int oldisField=TYPE(header) < NUMCLASSES;
         top+=2;
         dfsList[top]=oid;
         dfsList[top+1]=0;
-        header=searchObj(oid);
+        header=searchObj(oid, &countInvalidObj);
         if (header==NULL) {
+          LOGTIME('c',oid,top,0,countInvalidObj);
           //forward prefetch
           int machinenum = lhashSearch(oid);
 
@@ -98,10 +128,13 @@ perMcPrefetchList_t *processLocal(char *ptr, int numprefetches) {
             insertPrefetch(machinenum, oid, 2+numoffset-top, &offsetarray[top-2], &head);
           else
             insertPrefetch(machinenum, oid, numoffset-top, &offsetarray[top], &head);
-        } else if (top<offstop)
+        } else if (top<offstop) {
+          LOGTIME('C',oid,TYPE(header),0,top);
           //okay to continue going down
           continue;
+        }
       } else if (oid==2) {
+        LOGTIME('D',oid,0,0,top);
         //send prefetch first
         int objindex=top+2;
         int machinenum = lhashSearch(dfsList[objindex]);
@@ -110,16 +143,20 @@ perMcPrefetchList_t *processLocal(char *ptr, int numprefetches) {
       //oid is 0
       //go backwards until we can increment
       do {
+        int countInvalidObj1=1;
         do {
           top-=2;
-          if (top<0)
+          if (top<0) {
             return head;
+          }
         } while(dfsList[top+1] == GET_RANGE(offsetarray[top + 3]));
 
-        header=searchObj(dfsList[top]);
+        //header=searchObj(dfsList[top], &countInvalidObj1);
+        header=searchObj(dfsList[top], NULL);
         //header shouldn't be null unless the object moves away, but allow
         //ourselves the option to just continue on if we lose the object
       } while(header==NULL);
+      LOGTIME('F',OID(header),TYPE(header),0,top);
       //increment
       dfsList[top+1]++;
     }
@@ -139,13 +176,15 @@ perMcPrefetchList_t *processRemote(unsigned int oid,  short * offsetarray, int s
 
   /* Initialize */
   perMcPrefetchList_t *head = NULL;
+  int countInvalidObj=0;
 
-  objheader_t * header = searchObj(oid);
+  objheader_t * header = searchObj(oid, &countInvalidObj);
   int offstop=numoffset-2;
   if (header==NULL) {
+      LOGTIME('g',oid,0,0,countInvalidObj);
     //forward prefetch
-    int machinenum = lhashSearch(oid);
-    insertPrefetch(machinenum, oid, numoffset, offsetarray, &head);
+    //int machinenum = lhashSearch(oid);
+    //insertPrefetch(machinenum, oid, numoffset, offsetarray, &head);
     return head;
   } else {
     sendOidFound(header, oid, sd, buffer, &bufoffset);
@@ -153,34 +192,38 @@ perMcPrefetchList_t *processRemote(unsigned int oid,  short * offsetarray, int s
 
   dfsList[0]=oid;
   dfsList[1]=0;
+  LOGTIME('G',OID(header),TYPE(header),0,countInvalidObj);
 
   //Start searching the dfsList
   for(top=0; top>=0;) {
-    oid=getNextOid(header, offsetarray, dfsList, top);
+    oid=getNextOid(header, offsetarray, dfsList, top, &countInvalidObj);
     if (oid&1) {
       int oldisField=TYPE(header) < NUMCLASSES;
       top+=2;
       dfsList[top]=oid;
       dfsList[top+1]=0;
-      header=searchObj(oid);
+      header=searchObj(oid,&countInvalidObj);
       if (header==NULL) {
+        LOGTIME('h',oid,top,0,countInvalidObj);
        //forward prefetch
-       int machinenum = lhashSearch(oid);
-       if (oldisField&&(dfsList[top-1]!=GET_RANGE(offsetarray[top+1])))
-         insertPrefetch(machinenum, oid, 2+numoffset-top, &offsetarray[top-2], &head);
-       else
-         insertPrefetch(machinenum, oid, numoffset-top, &offsetarray[top], &head);
+       //int machinenum = lhashSearch(oid);
+       //if (oldisField&&(dfsList[top-1]!=GET_RANGE(offsetarray[top+1])))
+       //  insertPrefetch(machinenum, oid, 2+numoffset-top, &offsetarray[top-2], &head);
+       //else
+       //  insertPrefetch(machinenum, oid, numoffset-top, &offsetarray[top], &head);
       } else {
        sendOidFound(header, oid, sd, buffer, &bufoffset);
+    LOGTIME('H',oid,TYPE(header),0,top);
        if (top<offstop)
          //okay to continue going down
          continue;
       }
     } else if (oid==2) {
+      LOGTIME('I',oid,top,0,countInvalidObj);
       //send prefetch first
       int objindex=top+2;
-      int machinenum = lhashSearch(dfsList[objindex]);
-      insertPrefetch(machinenum, dfsList[objindex], numoffset-top, &offsetarray[top], &head);
+      //int machinenum = lhashSearch(dfsList[objindex]);
+      //insertPrefetch(machinenum, dfsList[objindex], numoffset-top, &offsetarray[top], &head);
     }
     //oid is 0
     //go backwards until we can increment
@@ -193,10 +236,12 @@ perMcPrefetchList_t *processRemote(unsigned int oid,  short * offsetarray, int s
        }
       } while(dfsList[top+1] == GET_RANGE(offsetarray[top + 3]));
 
-      header=searchObj(dfsList[top]);
+      //header=searchObj(dfsList[top], &countInvalidObj);
+      header=searchObj(dfsList[top], NULL);
       //header shouldn't be null unless the object moves away, but allow
       //ourselves the option to just continue on if we lose the object
     } while(header==NULL);
+    LOGTIME('K',OID(header),TYPE(header),0,top);
     //increment
     dfsList[top+1]++;
   }
@@ -205,12 +250,22 @@ perMcPrefetchList_t *processRemote(unsigned int oid,  short * offsetarray, int s
 }
 
 
-INLINE objheader_t *searchObj(unsigned int oid) {
+INLINE objheader_t *searchObj(unsigned int oid, int *countInvalidObj) {
   objheader_t *header;
   if ((header = (objheader_t *)mhashSearch(oid)) != NULL) {
     return header;
-  } else
-    return prehashSearch(oid);
+  } else {
+    header = prehashSearch(oid);
+    if(header != NULL) {
+      if((STATUS(header) & DIRTY) && (countInvalidObj!= NULL)) {
+        (*countInvalidObj)+=1;
+        if(*countInvalidObj > 1) {
+          return NULL;
+        }
+      }
+    }
+    return header;
+  }
 }
 
 /* Delete perMcPrefetchList_t and everything it points to */
@@ -464,7 +519,8 @@ int rangePrefetchReq(int acceptfd, struct readstruct * readbuffer) {
 }
 
 
-unsigned int getNextOid(objheader_t * header, short * offsetarray, unsigned int *dfsList, int top) {
+unsigned int getNextOid(objheader_t * header, short * offsetarray, unsigned int *dfsList, int top, int *countInvalidObj) 
+{
   int startindex= offsetarray[top+2];
   int currcount = dfsList[top+1];
   int range = GET_RANGE(offsetarray[top + 3]);
@@ -504,7 +560,7 @@ unsigned int getNextOid(objheader_t * header, short * offsetarray, unsigned int
 
     if(currcount!=0 & range != 0) {
       //go to the next offset
-      header=searchObj(dfsList[top+2]);
+      header=searchObj(dfsList[top+2],countInvalidObj);
       if (header==NULL)
        return 2;
     }
index 62b9703edd9e26bf50a5948e2989885d4e299e34..ee537c9a667ab7a6034dae622829823da237ee61 100644 (file)
@@ -62,11 +62,13 @@ void sendRangePrefetchReq(perMcPrefetchList_t *, int sd, unsigned int mid);
 int rangePrefetchReq(int acceptfd, struct readstruct * readbuffer);
 int processOidFound(objheader_t *, short *, int, int, int);
 int getRangePrefetchResponse(int sd, struct readstruct *);
-INLINE objheader_t *searchObj(unsigned int);
+//INLINE objheader_t *searchObj(unsigned int);
+INLINE objheader_t *searchObj(unsigned int, int*);
 
 
 /*********** Functions for computation at the participant end **********/
-unsigned int getNextOid(objheader_t * header, short * offsetarray, unsigned int *dfsList, int top);
+//unsigned int getNextOid(objheader_t * header, short * offsetarray, unsigned int *dfsList, int top);
+unsigned int getNextOid(objheader_t * header, short * offsetarray, unsigned int *dfsList, int top, int *);
 int sendOidFound(objheader_t *, unsigned int, int, char *buffer, int *bufoffset);
 int sendOidNotFound(unsigned int oid, int sd);
 void flushResponses(int sd, char * buffer, int * bufoffset);