#include "prefetch.h"
#include "prelookup.h"
+#include "sockpool.h"
+
+extern sockPoolHashTable_t *transPrefetchSockPool;
+extern unsigned int myIpAddr;
/* Steps for the new prefetch call */
// Function for new prefetch call
/* Read from prefetch queue */
void *node = gettail();
/* Check tuples if they are found locally */
- checkIfLocal(node);
+ perMcPrefetchList_t* pilehead = checkIfLocal(node);
+
+ if (pilehead!=NULL) {
+ // Get sock from shared pool
+ int sd = getSock2(transPrefetchSockPool, pilehead->mid);
+
+ /* Send Prefetch Request */
+ perMcPrefetchList_t *ptr = pilehead;
+ while(ptr != NULL) {
+ sendRangePrefetchReq(ptr, sd);
+ ptr = ptr->next;
+ }
+ /* Deallocated pilehead */
+ proPrefetchQDealloc(pilehead);
+ }
+ // Deallocate the prefetch queue pile node
+ inctail();
}
}
return sum;
}
-void checkIfLocal(char *ptr) {
+perMcPrefetchList_t* checkIfLocal(char *ptr) {
int siteid = *(GET_SITEID(ptr));
unsigned int *baseoids = GET_PTR_OID(ptr);
unsigned int ntuples = *(GET_NTUPLES(ptr));
int i, j, k;
int numLocal = 0;
- prefetchpile_t * head=NULL;
+ perMcPrefetchList_t * head=NULL;
// Iterate for each object
for (i = 0; i < ntuples; i++) {
visited++;
continue;
}
+
if (!isOidAvail(chldOffstFrmBase[visited+1])) {
// Add to remote requests
unsigned int oid = chldOffstFrmBase[visited+1];
- unsigned int * oidarray = NULL; //TODO FILL THIS ARRAY
int machinenum = lhashSearch(oid);
- insertPile(machinenum, oidarray, numoffset-j, offsets, &head);
+ //TODO Group a bunch of oids to send in one prefetch request
+ insertPrefetch(machinenum, oid, numoffset-j, offsets, &head);
break;
} else {
// iterate over each offset
/* handle dynamic prefetching */
handleDynPrefetching(numLocal, ntuples, siteid);
+ return head;
}
int isOidAvail(unsigned int oid) {
return 1;
}
-#if 0
-int lookForObjs(unsigned int *oid, short *offset, int *numoids, int *newbase) {
- objheader_t *header;
- if((header = mhashSearch(*oid))!= NULL) {
- //Found on machine
- ;
- } else if((header = prehashSearch(*oid))!=NULL) {
- //Found in prefetch cache
- ;
- } else {
- return 0;
- }
+/* Delete perMcPrefetchList_t and everything it points to */
+void proPrefetchQDealloc(perMcPrefetchList_t *node) {
+ perMcPrefetchList_t *prefetchpile_ptr;
+ perMcPrefetchList_t *prefetchpile_next_ptr;
+ objOffsetPile_t *objpile_ptr;
+ objOffsetPile_t *objpile_next_ptr;
- if(TYPE(header) > NUMCLASSES) {
- int elementsize = classsize[TYPE(header)];
- struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
- int length = ao->___length___;
- /* Check if array out of bounds */
- if(offset[*newbase] < 0 || offset[*newbase] >= length) {
- //if yes treat the object as found
- (*oid)=0;
- return 1;
- } else {
- if(getOtherOid(header, ao, offset, numoids, newbase))
- return 1;
+ prefetchpile_ptr = node;
+ while (prefetchpile_ptr != NULL) {
+ prefetchpile_next_ptr = prefetchpile_ptr;
+ while(prefetchpile_ptr->list != NULL) {
+ //offsets aren't owned by us, so we don't free them.
+ objpile_ptr = prefetchpile_ptr->list;
+ prefetchpile_ptr->list = objpile_ptr->next;
+ free(objpile_ptr);
}
- } else { //linked list
- //(*oid) = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset));
- if(getNext(header, offset, numoids, newbase))
- return 1;
- //(*newbase)++;
+ prefetchpile_ptr = prefetchpile_next_ptr->next;
+ free(prefetchpile_next_ptr);
}
}
-void resolveArrays(unsigned int *arrayOfOids, short *offset, int *numoids, int *newbase) {
- /*
- int i;
- */
-}
+void insertPrefetch(int mid, unsigned int oid, short numoffset, short *offsets, perMcPrefetchList_t **head) {
+ perMcPrefetchList_t *ptr;
+ objOffsetPile_t *objnode;
+ objOffsetPile_t **tmp;
-int getOtherOid(header, ao, offset, numoids, newbase) {
- short range, stride;
- short startindex = offset[*newbase];
- int getnewbaseVal = *newbase + 1;
- if(getnewbaseVal == 0) {
- (*oid) = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*startindex)));
- (*newbase) = (*newbase) + 2; //skip the immediate offset
- return 1;
- } else if(getnewbaseVal > 0) {
- /* Resolve the oids within a given range */
- (*newbase)++;
- range = GET_RANGE(offset[*newbase]);
- stride = GET_STRIDE((void *)(offset[*newbase]));
- stride = stride + 1; //NOTE 000 => stride = 1, 001 => stride = 2
- int index = 0;
- unsigned int arrayOfOids[range+1];
- if(GET_STRIDEINC(offset[*newbase])) { //-ve stride
+ //Loop through the machines
+ for(; 1; head=&((*head)->next)) {
+ int tmid;
+ if ((*head)==NULL||(tmid=(*head)->mid)>mid) {
+ perMcPrefetchList_t * tmp = (perMcPrefetchList_t *) malloc(sizeof(perMcPrefetchList_t));
+ tmp->mid = mid;
+ objnode = malloc(sizeof(objOffsetPile_t));
+ objnode->offsets = offsets;
+ objnode->oid = oid;
+ objnode->numoffset = numoffset;
+ objnode->next = NULL;
+ tmp->list = objnode;
+ tmp->next = *head;
+ *head=tmp;
+ return;
+ }
+
+ //keep looking
+ if (tmid < mid)
+ continue;
+
+ //found mid list
+ for(tmp=&((*head)->list); 1; tmp=&((*tmp)->next)) {
+ int toid;
+ int matchstatus;
+
+ if ((*tmp)==NULL||((toid=(*tmp)->oid)>oid)) {
+ objnode = (objOffsetPile_t *) malloc(sizeof(objOffsetPile_t));
+ objnode->offsets = offsets;
+ objnode->oid = oid;
+ objnode->numoffset = numoffset;
+ objnode->next = *tmp;
+ *tmp = objnode;
+ return;
+ }
+ if (toid < oid)
+ continue;
+
+ /* Fill list DS */
int i;
- for(i = startindex; i <= range; i = i - stride) {
- if(i < 0 || i >= length) {
- //if yes treat the object as found
- (*oid)=0;
- return 1;
+ int onumoffset=(*tmp)->numoffset;
+ short * ooffset=(*tmp)->offsets;
+
+ for(i=0; i<numoffset; i++) {
+ if (i>onumoffset) {
+ //We've matched, let's just extend the current prefetch
+ (*tmp)->numoffset=numoffset;
+ (*tmp)->offsets=offsets;
+ return;
}
- arrayOfOids[index] = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*i)));
- index++;
- }
- } else { //+ve stride
- int i;
- for(i = startindex; i <= range; i = i + stride) {
- if(i < 0 || i >= length) {
- //if yes treat the object as found
- (*oid)=0;
- return 1;
+ if (ooffset[i]<offsets[i]) {
+ goto oidloop;
+ } else if (ooffset[i]>offsets[i]) {
+ //Place item before the current one
+ objnode = (objOffsetPile_t *) malloc(sizeof(objOffsetPile_t));
+ objnode->offsets = offsets;
+ objnode->oid = oid;
+ objnode->numoffset = numoffset;
+ objnode->next = *tmp;
+ *tmp = objnode;
+ return;
}
- arrayOfOids[index] = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*i)));
- index++;
}
+ //if we get to the end, we're already covered by this prefetch
+ return;
+oidloop:
+ ;
}
- //(*oid) = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*startindex)));
- (*newbase) = (*newbase) + 2;
- return 1;
- } else {
- ;
}
}
+void sendRangePrefetchReq(perMcPrefetchList_t *mcpilenode, int sd) {
+ int len, endpair;
+ char control;
+ objOffsetPile_t *tmp;
-void checkIfLocal(char *ptr) {
- int siteid = *(GET_SITEID(ptr));
- int ntuples = *(GET_NTUPLES(ptr));
- unsigned int *baseoids = GET_PTR_OID(ptr);
- unsigned short *numoffsets = GET_PTR_EOFF(ptr, ntuples);
- short *offsets = GET_PTR_ARRYFLD(ptr, ntuples);
- prefetchpile_t * head=NULL;
- int numLocal = 0;
+ /* Send TRANS_PREFETCH control message */
+ control = TRANS_PREFETCH;
+ send_data(sd, &control, sizeof(char));
- int i ;
- for(i=0; i<ntuples; i++) {
- unsigned short baseindex=(i==0) ? 0 : numoffsets[i -1];
- unsigned short endindex = numoffsets[i];
- unsigned int oid = baseoids[i];
- int numoids = 0;
- if(oid == 0)
- continue;
+ /* Send Oids and offsets in pairs */
+ tmp = mcpilenode->list;
+ while(tmp != NULL) {
+ len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
+ char oidnoffset[len];
+ char *buf=oidnoffset;
+ *((int*)buf) = tmp->numoffset;
+ buf+=sizeof(int);
+ *((unsigned int *)buf) = tmp->oid;
+ buf+=sizeof(unsigned int);
+ *((unsigned int *)buf) = myIpAddr;
+ buf += sizeof(unsigned int);
+ memcpy(buf, tmp->offsets, (tmp->numoffset)*sizeof(short));
+ send_data(sd, oidnoffset, len);
+ tmp = tmp->next;
+ }
- //Look up fields locally
- int newbase;
- for(newbase=baseindex; newbase<endindex; ) {
- if(!lookForObjs(&oid, &offsets[newbase], &numoids, &newbase)) {
- break;
- }
- //Ended in a null pointer
- if(oid == 0)
- goto tuple;
- }
+ /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
+ endpair = -1;
+ send_data(sd, &endpair, sizeof(int));
- //Add to remote request
- machinenum=lhashSearch(oid);
- // Create an array of oids and offsets
- unsigned int arrayoid[numoids];
- unsigned short arraynumoffset[numoids];
- void *arryfields[numoids];
- for(i = 0; i<numoids; i++) {
- arrayoid[i] = oid;
- arraynumoffset[i] = endindex - newbase;
- arryfields[i] = (void*)(&arryfields[newbase]);
- }
- //insertPile(machinenum, oid, endindex-newbase, &arryfields[newbase], &head);
- insertPile(machinenum, arrayoid, arraynumoffset, arryfields, &head);
-tuple:
- ;
- }
+ return;
}
-#endif