#define GET_PTR_OID(x) ((unsigned int *)(x + 2*sizeof(int)))
#define GET_PTR_EOFF(x,n) ((short *)(x + 2*sizeof(int) + (n*sizeof(unsigned int))))
#define GET_PTR_ARRYFLD(x,n) ((short *)(x + 2*sizeof(int) + (n*sizeof(unsigned int)) + (n*sizeof(short))))
+
#define ENDEBUG(s) { printf("Inside %s()\n", s); fflush(stdout);}
#define EXDEBUG(s) {printf("Outside %s()\n", s); fflush(stdout);}
/*****************************************
void prefetch(int, int, unsigned int *, unsigned short *, short*);
void *transPrefetch(void *);
void *mcqProcess(void *);
-prefetchpile_t *foundLocal(char *); // returns node with prefetch elements(oids, offsets)
+prefetchpile_t *foundLocal(char *, int); // returns node with prefetch elements(oids, offsets)
int lookupObject(unsigned int * oid, short offset);
int checkoid(unsigned int oid);
int transPrefetchProcess(int **, short);
return max;
}
-#define INLINEPREFETCH 1
+#define INLINEPREFETCH 0
+#define PREFTHRESHOLD 4
/* This function is a prefetch call generated by the compiler that
* populates the shared primary prefetch queue*/
int qnodesize = 2*sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
int len;
#ifdef INLINEPREFETCH
- char node[qnodesize];
+ int attempted=0;
+ char *node;
+ do {
+ node=getmemory(qnodesize);
+ if (node==NULL&&attempted)
+ break;
+ if (node!=NULL) {
#else
char *node=getmemory(qnodesize);
#endif
memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
#ifdef INLINEPREFETCH
- prefetchpile_t *pilehead = foundLocal(node);
+ movehead(qnodesize);
+ }
+ int numpref=numavailable();
+ attempted=1;
- if (pilehead!=NULL) {
- // Get sock from shared pool
-
- /* Send Prefetch Request */
- prefetchpile_t *ptr = pilehead;
- while(ptr != NULL) {
- int sd = getSock2(transPrefetchSockPool, ptr->mid);
- sendPrefetchReq(ptr, sd);
- ptr = ptr->next;
+ if (node==NULL && numpref!=0 || numpref==PREFTHRESHOLD) {
+ node=gettail();
+ prefetchpile_t *pilehead = foundLocal(node,numpref);
+ if (pilehead!=NULL) {
+ // Get sock from shared pool
+
+ /* Send Prefetch Request */
+ prefetchpile_t *ptr = pilehead;
+ while(ptr != NULL) {
+ int sd = getSock2(transPrefetchSockPool, ptr->mid);
+ sendPrefetchReq(ptr, sd);
+ ptr = ptr->next;
+ }
+
+ mcdealloc(pilehead);
+ resetqueue();
}
-
- /* Release socket */
- // freeSock(transPrefetchSockPool, pilehead->mid, sd);
-
- /* Deallocated pilehead */
- mcdealloc(pilehead);
- }
+ }//end do prefetch if condition
+ } while(node==NULL);
#else
/* Lock and insert into primary prefetch queue */
movehead(qnodesize);
return 0;
}
-prefetchpile_t *foundLocal(char *ptr) {
- int siteid = *(GET_SITEID(ptr));
- int ntuples = *(GET_NTUPLES(ptr));
- unsigned int * oidarray = GET_PTR_OID(ptr);
- unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples);
- short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
+prefetchpile_t *foundLocal(char *ptr, int numprefetches) {
+ int i;
+ int j;
prefetchpile_t * head=NULL;
- int numLocal = 0;
- int i;
- for(i=0; i<ntuples; i++) {
- unsigned short baseindex=(i==0) ? 0 : endoffsets[i-1];
- unsigned short endindex=endoffsets[i];
- unsigned int oid=oidarray[i];
- int newbase;
- int machinenum;
-
- if (oid==0)
- continue;
- //Look up fields locally
- for(newbase=baseindex; newbase<endindex; newbase++) {
- if (!lookupObject(&oid, arryfields[newbase]))
- break;
- //Ended in a null pointer...
+ for(j=0;j<numprefetches;j++) {
+ int siteid = *(GET_SITEID(ptr));
+ int ntuples = *(GET_NTUPLES(ptr));
+ unsigned int * oidarray = GET_PTR_OID(ptr);
+ unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples);
+ short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
+ int numLocal = 0;
+
+ for(i=0; i<ntuples; i++) {
+ unsigned short baseindex=(i==0) ? 0 : endoffsets[i-1];
+ unsigned short endindex=endoffsets[i];
+ unsigned int oid=oidarray[i];
+ int newbase;
+ int machinenum;
+
if (oid==0)
+ continue;
+ //Look up fields locally
+ for(newbase=baseindex; newbase<endindex; newbase++) {
+ if (!lookupObject(&oid, arryfields[newbase]))
+ break;
+ //Ended in a null pointer...
+ if (oid==0)
+ goto tuple;
+ }
+ //Entire prefetch is local
+ if (newbase==endindex&&checkoid(oid)) {
+ numLocal++;
goto tuple;
+ }
+ //Add to remote requests
+ machinenum=lhashSearch(oid);
+ insertPile(machinenum, oid, endindex-newbase, &arryfields[newbase], &head);
+ tuple:
+ ;
}
- //Entire prefetch is local
- if (newbase==endindex&&checkoid(oid)) {
- numLocal++;
- goto tuple;
- }
- //Add to remote requests
- machinenum=lhashSearch(oid);
- insertPile(machinenum, oid, endindex-newbase, &arryfields[newbase], &head);
-tuple:
- ;
+
+ /* handle dynamic prefetching */
+ handleDynPrefetching(numLocal, ntuples, siteid);
+ ptr=((char *)&arryfields[endoffsets[ntuples-1]])+sizeof(int);
}
- /* handle dynamic prefetching */
- handleDynPrefetching(numLocal, ntuples, siteid);
return head;
}
void *node=gettail();
/* Check if the tuples are found locally, if yes then reduce them further*/
/* and group requests by remote machine ids by calling the makePreGroups() */
- prefetchpile_t *pilehead = foundLocal(node);
+ int count=numavailable();
+ prefetchpile_t *pilehead = foundLocal(node, count);
if (pilehead!=NULL) {
// Get sock from shared pool
mcdealloc(pilehead);
}
// Deallocate the prefetch queue pile node
- inctail();
+ inctail(numavailable);
}
}
int len, endpair;
char control;
objpile_t *tmp;
+ struct writestruct writebuffer;
+ writebuffer.offset=0;
/* Send TRANS_PREFETCH control message */
int first=1;
*((int *)(&oidnoffset[len]))=-1;
len+=sizeof(int);
}
- send_data(sd, oidnoffset, len);
+ if (tmp!=NULL)
+ send_buf(sd, & writebuffer, oidnoffset, len);
+ else
+ forcesend_buf(sd, & writebuffer, oidnoffset, len);
}
LOGEVENT('S');