for(top=0; top>=0;) {
oid=getNextOid(header, offsetarray, dfsList, top);
if (oid&1) {
+ int oldisField=TYPE(header) < NUMCLASSES;
top+=2;
dfsList[top]=oid;
dfsList[top+1]=0;
header=searchObj(oid);
if (header==NULL) {
//forward prefetch
- int machinenum = lhashSearch(dfsList[top]);
- insertPrefetch(machinenum, dfsList[top], numoffset-top, &offsetarray[top], &head);
+ int machinenum = lhashSearch(oid);
+
+ if (oldisField&&(dfsList[top-1]!=GET_RANGE(offsetarray[top+1])))
+ insertPrefetch(machinenum, oid, 2+numoffset-top, &offsetarray[top-2], &head);
+ else
+ insertPrefetch(machinenum, oid, numoffset-top, &offsetarray[top], &head);
} else if (top<offstop)
//okay to continue going down
continue;
return head;
}
+#define PBUFFERSIZE 16384
+
+
perMcPrefetchList_t *processRemote(unsigned int oid, short * offsetarray, int sd, short numoffset) {
int top;
unsigned int dfsList[numoffset];
+ char buffer[PBUFFERSIZE];
+ int bufoffset=0;
/* Initialize */
perMcPrefetchList_t *head = NULL;
insertPrefetch(machinenum, oid, numoffset, offsetarray, &head);
return head;
} else {
- sendOidFound(header, oid, sd);
+ sendOidFound(header, oid, sd, buffer, &bufoffset);
}
dfsList[0]=oid;
//Start searching the dfsList
for(top=0; top>=0;) {
oid=getNextOid(header, offsetarray, dfsList, top);
-
if (oid&1) {
+ int oldisField=TYPE(header) < NUMCLASSES;
top+=2;
dfsList[top]=oid;
dfsList[top+1]=0;
header=searchObj(oid);
if (header==NULL) {
//forward prefetch
- int machinenum = lhashSearch(dfsList[top]);
- insertPrefetch(machinenum, dfsList[top], numoffset-top, &offsetarray[top], &head);
+ int machinenum = lhashSearch(oid);
+ if (oldisField&&(dfsList[top-1]!=GET_RANGE(offsetarray[top+1])))
+ insertPrefetch(machinenum, oid, 2+numoffset-top, &offsetarray[top-2], &head);
+ else
+ insertPrefetch(machinenum, oid, numoffset-top, &offsetarray[top], &head);
} else {
- sendOidFound(header, oid, sd);
+ sendOidFound(header, oid, sd, buffer, &bufoffset);
if (top<offstop)
//okay to continue going down
continue;
do {
do {
top-=2;
- if (top<0)
+ if (top<0) {
+ flushResponses(sd, buffer, &bufoffset);
return head;
+ }
} while(dfsList[top+1] == GET_RANGE(offsetarray[top + 3]));
header=searchObj(dfsList[top]);
//increment
dfsList[top+1]++;
}
+ flushResponses(sd, buffer, &bufoffset);
return head;
}
char control = *((char *) recvbuffer);
unsigned int oid;
if(control == OBJECT_FOUND) {
- oid = *((unsigned int *)(recvbuffer + sizeof(char)));
size = size - (sizeof(char) + sizeof(unsigned int));
pthread_mutex_lock(&prefetchcache_mutex);
void *ptr;
pthread_mutex_unlock(&prefetchcache_mutex);
return -1;
}
+
+ void *tmp=ptr;
+ int osize=size;
pthread_mutex_unlock(&prefetchcache_mutex);
+
memcpy(ptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
- STATUS(ptr)=0;
- /* Insert into prefetch hash lookup table */
- void * oldptr;
- if((oldptr = prehashSearch(oid)) != NULL) {
- if(((objheader_t *)oldptr)->version <= ((objheader_t *)ptr)->version) {
- prehashRemove(oid);
+ //ignore oid value...we'll get it from the object
+
+ while(size>0) {
+ unsigned int objsize;
+ GETSIZE(objsize, ptr);
+ STATUS(ptr)=0;
+ oid=OID(ptr);
+ objsize+=sizeof(objheader_t);
+
+ /* Insert into prefetch hash lookup table */
+ void * oldptr;
+ if((oldptr = prehashSearch(oid)) != NULL) {
+ if(((objheader_t *)oldptr)->version <= ((objheader_t *)ptr)->version) {
+ prehashRemove(oid);
+ prehashInsert(oid, ptr);
+ }
+ } else {
prehashInsert(oid, ptr);
}
- } else {
- prehashInsert(oid, ptr);
+ ptr=(void *)(((unsigned int)ptr)+objsize);
+ size-=objsize;
}
- objheader_t *head = prehashSearch(oid);
+
pthread_mutex_lock(&pflookup.lock);
pthread_cond_broadcast(&pflookup.cond);
pthread_mutex_unlock(&pflookup.lock);
int currcount = dfsList[top+1];
int range = GET_RANGE(offsetarray[top + 3]);
- if(TYPE(header) > NUMCLASSES) {
+ if(TYPE(header) >= NUMCLASSES) {
//Array case
struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
int stride = GET_STRIDE(offsetarray[top + 3])+1;
}
}
-int sendOidFound(objheader_t * header, unsigned int oid, int sd) {
- int incr = 0;
+void flushResponses(int sd, char * buffer, int * bufoffset) {
+ if ((*bufoffset)!=0) {
+ send_data(sd, buffer, *bufoffset);
+ *bufoffset=0;
+ }
+}
+
+int sendOidFound(objheader_t * header, unsigned int oid, int sd, char *buffer, int *bufoffset) {
+ int incr;
int objsize;
GETSIZE(objsize, header);
- int size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
- char sendbuffer[size];
- *((int *)(sendbuffer + incr)) = size;
- incr += sizeof(int);
- *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
- incr += sizeof(char);
- *((unsigned int *)(sendbuffer + incr)) = oid;
- incr += sizeof(unsigned int);
- memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
+ int size = sizeof(objheader_t) + objsize;
+ char *sendbuffer;
+
+ if ((incr=(*bufoffset))==0) {
+ buffer[incr] = TRANS_PREFETCH_RESPONSE;
+ incr+=sizeof(char);
+ *((int *)(buffer + incr)) = size+sizeof(int)+sizeof(char)+sizeof(unsigned int);
+ incr += sizeof(int);
+ *((char *)(buffer + incr)) = OBJECT_FOUND;
+ incr += sizeof(char);
+ *((unsigned int *)(buffer + incr)) = oid;
+ incr += sizeof(unsigned int);
+ } else
+ *((int *)(buffer+sizeof(char)))+=size;
+
+ if ((incr+size)<PBUFFERSIZE) {
+ //don't need to allocate, just copy
+ sendbuffer=buffer;
+ (*bufoffset)=incr+size;
+ } else {
+ sendbuffer=alloca(size+incr);
+ memcpy(sendbuffer, buffer, incr);
+ *bufoffset=0;
+ }
- char control = TRANS_PREFETCH_RESPONSE;
- sendPrefetchResponse(sd, &control, sendbuffer, &size);
+ memcpy(sendbuffer + incr, header, size);
+ if ((*bufoffset)==0)
+ send_data(sd, sendbuffer, size+incr);
return 0;
}