/* Socket pool handed to getSock2() in transPrefetchNew() to obtain the
 * descriptor used for sending prefetch requests. Declared extern here; the
 * definition is not part of this excerpt. */
5 extern sockPoolHashTable_t *transPrefetchSockPool;
/* Copied into every request record built by sendRangePrefetchReq().
 * NOTE(review): presumably this host's IPv4 address - confirm at the
 * definition. */
6 extern unsigned int myIpAddr;
8 /* Steps for the new prefetch call */
9 // Function for new prefetch call
/*
 * rangePrefetch - serialize one prefetch request into a single contiguous
 * buffer obtained from getmemory().
 *
 * Buffer layout written by the visible code, in order:
 *   int            prefetchsiteid
 *   int            ntuples
 *   unsigned int   baseoids[ntuples]
 *   unsigned short numoffsets[ntuples]
 *   short          offsets[top]          (top = numoffsets[ntuples-1])
 *
 * The last entry of numoffsets is used as the total number of shorts in
 * offsets, both when sizing the buffer and for the final memcpy.
 *
 * NOTE(review): numoffsets[ntuples-1] is read unconditionally, so
 * ntuples == 0 would index before the array - confirm callers never pass 0.
 * NOTE(review): neither a check of getmemory()'s result nor the declaration
 * of `index` appears in this excerpt - confirm both exist in the full file.
 */
10 void rangePrefetch(int prefetchsiteid, int ntuples, unsigned int *baseoids,
11 unsigned short *numoffsets, short *offsets) {
12 // a[0][1] - a[3][1] = a.0.3
14 // a.f.next.h = a.f.0.next.0.h
15 // a.f.next.next.h = a.f.next.2.h
16 /* Allocate memory in prefetch queue and push the block there */
/* Size = two int header fields + one oid and one count per tuple + all offsets. */
17 int qnodesize = 2*sizeof(int)+ ntuples *(sizeof(unsigned int) + sizeof(unsigned short)) + numoffsets[ntuples -1] * sizeof(short);
18 char *node = (char *) getmemory(qnodesize);
19 int top = numoffsets[ntuples -1];
/* Header: site id followed by the tuple count. */
25 *((int *)(node)) = prefetchsiteid;
26 *((int *)(node + sizeof(int))) = ntuples;
/* Payload: the three arrays are copied back to back; index is the running
 * byte offset into node. */
27 index = 2 * sizeof(int);
28 memcpy(node+index, baseoids, ntuples * sizeof(unsigned int));
29 index = index + ntuples *(sizeof(unsigned int));
30 memcpy(node+index, numoffsets, ntuples * sizeof(unsigned short));
31 index = index + ntuples *(sizeof(unsigned short));
32 memcpy(node+index, offsets, top * sizeof(short));
/*
 * transPrefetchNew - take a queued prefetch request and forward the part
 * that is not available locally.
 *
 * Visible steps: fetch a node with gettail(), pass it to checkIfLocal()
 * (which returns a perMcPrefetchList_t list), obtain a socket for the first
 * list entry's machine id from transPrefetchSockPool, send the request with
 * sendRangePrefetchReq(), then release the list with proPrefetchQDealloc().
 *
 * NOTE(review): checkIfLocal() initializes its list head to NULL, and no
 * NULL check on pilehead is visible here before pilehead->mid is read -
 * confirm the full file guards the case where nothing needs a remote fetch.
 * NOTE(review): any enclosing loop, the iteration over ptr, the return
 * value, and the release of the socket and queue node are not visible in
 * this excerpt.
 */
37 void *transPrefetchNew() {
39 /* Read from prefetch queue */
40 void *node = gettail();
41 /* Check tuples if they are found locally */
42 perMcPrefetchList_t* pilehead = checkIfLocal(node);
45 // Get sock from shared pool
46 int sd = getSock2(transPrefetchSockPool, pilehead->mid);
48 /* Send Prefetch Request */
49 perMcPrefetchList_t *ptr = pilehead;
51 sendRangePrefetchReq(ptr, sd);
55 /* Deallocate pilehead */
56 proPrefetchQDealloc(pilehead);
58 // Deallocate the prefetch queue pile node
/*
 * getsize - walk the n entries of ptr from last to first, computing
 *   newsum = (1 + ptr[i]) + (1 + ptr[i]) * sum  ==  (1 + ptr[i]) * (1 + sum)
 * with sum starting at 0.
 *
 * checkIfLocal() calls this with the per-level range values and uses the
 * result plus one as the capacity of its oid work array.
 *
 * NOTE(review): the statement that carries newsum back into sum and the
 * return statement are not visible in this excerpt.
 * NOTE(review): the parameter is `short *`, but checkIfLocal() passes an
 * `unsigned short` array - the pointer types do not match.
 * NOTE(review): the arithmetic is plain int with no overflow check, and the
 * value grows multiplicatively with each entry.
 */
63 int getsize(short *ptr, int n) {
64 int sum = 0, newsum, i;
65 for (i = n-1; i >= 0; i--) {
66 newsum = (1 + ptr[i])+((1 + ptr[i])*sum);
/*
 * checkIfLocal - decode a queued prefetch request and collect the objects
 * that are not available on this machine.
 *
 * ptr is decoded with the GET_SITEID / GET_NTUPLES / GET_PTR_OID /
 * GET_PTR_EOFF / GET_PTR_ARRYFLD macros (defined outside this excerpt).
 * For every tuple the code starts from its base oid and calls lookForObjs()
 * to discover further oids; any oid for which isOidAvail() returns 0 is
 * handed to insertPrefetch(), keyed by the machine number returned from
 * lhashSearch(). handleDynPrefetching() is called once all tuples are done.
 *
 * NOTE(review): the declarations of i, j, l, retval and numLocal, and the
 * return statement, are not visible in this excerpt.
 */
72 perMcPrefetchList_t* checkIfLocal(char *ptr) {
73 int siteid = *(GET_SITEID(ptr));
74 unsigned int *baseoids = GET_PTR_OID(ptr);
75 unsigned int ntuples = *(GET_NTUPLES(ptr));
76 unsigned short *endoffsets = GET_PTR_EOFF(ptr, ntuples);
77 short *offsets = GET_PTR_ARRYFLD(ptr, ntuples);
/* Result list; stays NULL unless insertPrefetch() adds an entry. */
81 perMcPrefetchList_t * head=NULL;
83 // Iterate for each object
84 for (i = 0; i < ntuples; i++) {
/* endoffsets holds running end positions, so the count for tuple i is the
 * difference between consecutive entries. */
85 int numoffset = (i == 0) ? endoffsets[0] : (endoffsets[i] - endoffsets[i-1]);
/* Offsets are consumed in pairs; the second short of each pair carries the
 * range that GET_RANGE extracts. */
86 int sizetmpObjSet = numoffset >> 1;
/* NOTE(review): variable-length array sized from request data; a zero or
 * very large numoffset is not checked in the visible lines. */
87 unsigned short tmpobjset[sizetmpObjSet];
/* NOTE(review): offsets is indexed from element 0 here and below for every
 * tuple i; nothing visible advances it by the previous tuples' counts -
 * confirm against the full file. */
89 for (l = 0; l < sizetmpObjSet; l++) {
90 tmpobjset[l] = GET_RANGE(offsets[2*l+1]);
/* NOTE(review): tmpobjset is `unsigned short[]` but getsize() takes
 * `short *`. */
92 int maxChldOids = getsize(tmpobjset, sizetmpObjSet)+1;
/* Work array of oids; slot 0 is the tuple's base oid, and lookForObjs()
 * writes further entries. Also a variable-length array. */
93 unsigned int chldOffstFrmBase[maxChldOids];
94 chldOffstFrmBase[0] = baseoids[i];
/* visited+1 is the index of the next oid to examine. */
95 int tovisit = 0, visited = -1;
96 // Iterate for each element of offsets
97 for (j = 0; j < numoffset; j++) {
98 // Iterate over each element to be visited
99 while (visited != tovisit) {
/* An oid of 0 gets special handling; the body of this branch is not
 * visible in this excerpt. */
100 if(chldOffstFrmBase[visited+1] == 0) {
105 if (!isOidAvail(chldOffstFrmBase[visited+1])) {
106 // Add to remote requests
107 unsigned int oid = chldOffstFrmBase[visited+1];
108 int machinenum = lhashSearch(oid);
109 //TODO Group a bunch of oids to send in one prefetch request
/* NOTE(review): the count passed is numoffset-j but the pointer is the
 * start of offsets, not offsets + j - confirm this is intended. */
110 insertPrefetch(machinenum, oid, numoffset-j, offsets, &head);
113 // iterate over each offset
/* lookForObjs() receives j, visited and tovisit by address. A return of 0
 * is reported as "Object not found".
 * NOTE(review): chldOffstFrmBase is `unsigned int[]` but lookForObjs()
 * declares the parameter as `int *`. */
115 if((retval = lookForObjs(chldOffstFrmBase, offsets, &j,
116 &visited, &tovisit)) == 0) {
117 printf("%s() Error: Object not found %s at line %d\n",
118 __func__, __FILE__, __LINE__);
123 } // end iterate for each element of offsets
125 //Entire prefetch found locally
132 } // end iterate for each object
134 /* handle dynamic prefetching */
135 handleDynPrefetching(numLocal, ntuples, siteid);
/*
 * isOidAvail - look an oid up with mhashSearch() first and, if that returns
 * NULL, with prehashSearch().
 *
 * checkIfLocal() treats a zero return as "not available" and queues a
 * remote request for the oid.
 *
 * NOTE(review): the return statements of each branch are not visible in
 * this excerpt.
 */
139 int isOidAvail(unsigned int oid) {
140 objheader_t * header;
141 if((header=(objheader_t *)mhashSearch(oid))!=NULL) {
144 } else if ((header=(objheader_t *)prehashSearch(oid))!=NULL) {
/*
 * lookForObjs - follow one (offset, range/stride) pair from the object whose
 * oid is chldOffstFrmBase[*visited+1] and append the oids found there to
 * chldOffstFrmBase.
 *
 * The object header is looked up with mhashSearch() and then
 * prehashSearch(). When TYPE(header) > NUMCLASSES the object is handled as
 * an array (elements read after a struct ArrayObject header); otherwise the
 * field at byte offset offsets[*index] after the object header is read.
 * Each oid found is stored at chldOffstFrmBase[*tovisit] and *tovisit is
 * then incremented.
 *
 * checkIfLocal() treats a return value of 0 as "object not found".
 *
 * NOTE(review): the declarations of header, i and oid, the updates to
 * *index and *visited, and all return statements are not visible in this
 * excerpt.
 * NOTE(review): chldOffstFrmBase is declared `int *` here, while the caller
 * passes an `unsigned int` array and the values stored are unsigned oids.
 * NOTE(review): no bound on *tovisit against the capacity of
 * chldOffstFrmBase is visible.
 */
151 int lookForObjs(int *chldOffstFrmBase, short *offsets,
152 int *index, int *visited, int *tovisit) {
154 unsigned int oid = chldOffstFrmBase[*visited+1];
155 if((header = (objheader_t *)mhashSearch(oid))!= NULL) {
158 } else if((header = (objheader_t *)prehashSearch(oid))!=NULL) {
159 //Found in prefetch cache
/* NOTE(review): reached when neither lookup found the oid; confirm the
 * function returns here, since header would be NULL below. */
162 printf("DEBUG->%s()THIS SHOULD NOR HAPPEN\n", __func__);
/* Array objects: type ids above NUMCLASSES. */
166 if(TYPE(header) > NUMCLASSES) {
167 int elementsize = classsize[TYPE(header)];
168 struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
169 int length = ao->___length___;
170 /* Check if array out of bounds */
171 int startindex = offsets[*index];
172 int range = GET_RANGE(offsets[(*index)+1]);
/* NOTE(review): a range >= length matches neither this branch nor the
 * range == 0 branch further down. */
173 if(range > 0 && range < length) {
174 short stride = GET_STRIDE(offsets[(*index)+1]);
175 stride = stride + 1; //NOTE bit pattern 000 => stride = 1, 001 => stride = 2
177 //check is stride +ve or negative
/* NOTE(review): this reads offsets[(*index)] and adds 1 to the value,
 * whereas GET_RANGE and GET_STRIDE above read offsets[(*index)+1]; the
 * parenthesis looks misplaced - confirm. */
178 if(GET_STRIDEINC(offsets[(*index)]+1)) { //-ve stride
/* NOTE(review): i decreases while the condition is i <= range+1, so the
 * loop is not bounded below by the condition itself - confirm the intended
 * termination. */
179 for(i = startindex; i <= range+1; i = i - stride) {
180 unsigned int oid = 0;
181 if((i < 0 || i >= length)) {
182 //if yes treat the object as found
186 // compute new object
187 oid = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*i)));
190 chldOffstFrmBase[*tovisit] = oid;
191 *tovisit = *tovisit + 1;
193 } else { //+ve stride
194 for(i = startindex; i <= range; i = i + stride) {
195 unsigned int oid = 0;
196 if(i < 0 || i >= length) {
197 //if yes treat the object as found
201 // compute new object
202 oid = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*i)));
205 chldOffstFrmBase[*tovisit] = oid;
206 *tovisit = *tovisit + 1;
/* Single element: only startindex is read. */
209 } else if(range == 0) {
/* NOTE(review): with `||` this test is true for every startindex whenever
 * length > 0, so it does not reject out-of-range indices; `&&` looks
 * intended - confirm. */
210 if(startindex >=0 || startindex < length) {
211 unsigned int oid = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*startindex)));
213 chldOffstFrmBase[*tovisit] = oid;
214 *tovisit = *tovisit + 1;
217 } else { //linked list
/* Here startindex is used as a byte offset into the object body. */
218 int startindex = offsets[*index];
219 int range = GET_RANGE(offsets[(*index)+1]);
222 oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + startindex));
224 chldOffstFrmBase[*tovisit] = oid;
225 *tovisit = *tovisit + 1;
/* NOTE(review): in the visible lines header is not updated inside this
 * loop, so every iteration would read the same address - confirm the full
 * file re-resolves header from the new oid. */
228 for(i = 0; i < range; i++) {
229 oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + startindex));
231 chldOffstFrmBase[*tovisit] = oid;
232 *tovisit = *tovisit + 1;
240 /* Delete perMcPrefetchList_t and everything it points to */
/*
 * Walks the machine list starting at node. For each machine entry the
 * objOffsetPile_t entries are unlinked from ->list one by one, then the
 * machine entry itself is released with free().
 *
 * NOTE(review): no free() of the unlinked objOffsetPile_t entry is visible
 * in this excerpt - confirm the full file releases objpile_ptr, otherwise
 * those nodes leak.
 * NOTE(review): objpile_next_ptr is declared but not used in the visible
 * lines.
 */
241 void proPrefetchQDealloc(perMcPrefetchList_t *node) {
242 perMcPrefetchList_t *prefetchpile_ptr;
243 perMcPrefetchList_t *prefetchpile_next_ptr;
244 objOffsetPile_t *objpile_ptr;
245 objOffsetPile_t *objpile_next_ptr;
247 prefetchpile_ptr = node;
248 while (prefetchpile_ptr != NULL) {
/* Despite its name, prefetchpile_next_ptr holds the CURRENT entry so it can
 * be freed after the cursor has moved on. */
249 prefetchpile_next_ptr = prefetchpile_ptr;
250 while(prefetchpile_ptr->list != NULL) {
251 //offsets aren't owned by us, so we don't free them.
252 objpile_ptr = prefetchpile_ptr->list;
253 prefetchpile_ptr->list = objpile_ptr->next;
/* Advance first, then free the entry that was just left. */
256 prefetchpile_ptr = prefetchpile_next_ptr->next;
257 free(prefetchpile_next_ptr);
/*
 * insertPrefetch - record that object oid, with numoffset entries of
 * offsets, must be requested from machine mid.
 *
 * *head is a list of per-machine entries. The outer loop stops at the first
 * entry that is NULL or has a machine id greater than mid and allocates a
 * new machine entry with a single object entry there. The inner loop walks
 * an existing machine's object list and allocates a new object entry before
 * the first one that is NULL or has a larger oid. For an entry with the
 * same oid, the stored offsets are compared element by element with the new
 * ones to decide between extending, skipping, and inserting before.
 *
 * The offsets pointer is stored as given, not copied.
 *
 * NOTE(review): the declarations of tmid, toid and i, the conditions that
 * select between the branches, the statements that link new nodes into the
 * lists, and all returns are not visible in this excerpt.
 * NOTE(review): no check of any malloc() result is visible.
 */
261 void insertPrefetch(int mid, unsigned int oid, short numoffset, short *offsets, perMcPrefetchList_t **head) {
262 perMcPrefetchList_t *ptr;
263 objOffsetPile_t *objnode;
264 objOffsetPile_t **tmp;
266 //Loop through the machines
267 for(; 1; head=&((*head)->next)) {
/* End of list, or first machine id past mid: a new machine entry goes here. */
269 if ((*head)==NULL||(tmid=(*head)->mid)>mid) {
/* NOTE(review): this local tmp shadows the objOffsetPile_t **tmp declared
 * at function scope. */
270 perMcPrefetchList_t * tmp = (perMcPrefetchList_t *) malloc(sizeof(perMcPrefetchList_t));
272 objnode = malloc(sizeof(objOffsetPile_t));
273 objnode->offsets = offsets;
275 objnode->numoffset = numoffset;
276 objnode->next = NULL;
/* Walk the object entries of the current machine. */
288 for(tmp=&((*head)->list); 1; tmp=&((*tmp)->next)) {
/* End of list, or first oid past the new one: insert before *tmp. */
292 if ((*tmp)==NULL||((toid=(*tmp)->oid)>oid)) {
293 objnode = (objOffsetPile_t *) malloc(sizeof(objOffsetPile_t));
294 objnode->offsets = offsets;
296 objnode->numoffset = numoffset;
297 objnode->next = *tmp;
/* Existing entry: compare its offsets with the new ones. */
306 int onumoffset=(*tmp)->numoffset;
307 short * ooffset=(*tmp)->offsets;
309 for(i=0; i<numoffset; i++) {
311 //We've matched, let's just extend the current prefetch
312 (*tmp)->numoffset=numoffset;
313 (*tmp)->offsets=offsets;
316 if (ooffset[i]<offsets[i]) {
318 } else if (ooffset[i]>offsets[i]) {
319 //Place item before the current one
320 objnode = (objOffsetPile_t *) malloc(sizeof(objOffsetPile_t));
321 objnode->offsets = offsets;
323 objnode->numoffset = numoffset;
324 objnode->next = *tmp;
329 //if we get to the end, we're already covered by this prefetch
/*
 * sendRangePrefetchReq - write the prefetch request for one machine entry
 * to socket sd.
 *
 * Visible wire sequence:
 *   1. one char: TRANS_PREFETCH
 *   2. a record built from an objOffsetPile_t entry of mcpilenode->list:
 *        int          numoffset
 *        unsigned int oid
 *        unsigned int myIpAddr
 *        short        offsets[numoffset]
 *   3. sizeof(int) bytes of endpair as the terminator
 *
 * Values are stored in host representation; no byte-order conversion is
 * visible.
 *
 * NOTE(review): the declarations of control, len and endpair, and the loop
 * that advances tmp through the list, are not visible in this excerpt.
 * NOTE(review): no advance of buf is visible between the numoffset store
 * and the oid store, although len reserves sizeof(int) for numoffset -
 * confirm the full file has it, otherwise oid overwrites numoffset.
 * NOTE(review): tmp is dereferenced to compute len; no NULL check on
 * mcpilenode->list is visible.
 */
337 void sendRangePrefetchReq(perMcPrefetchList_t *mcpilenode, int sd) {
340 objOffsetPile_t *tmp;
342 /* Send TRANS_PREFETCH control message */
343 control = TRANS_PREFETCH;
344 send_data(sd, &control, sizeof(char));
346 /* Send Oids and offsets in pairs */
347 tmp = mcpilenode->list;
349 len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
/* Variable-length scratch buffer holding exactly one record. */
350 char oidnoffset[len];
351 char *buf=oidnoffset;
352 *((int*)buf) = tmp->numoffset;
354 *((unsigned int *)buf) = tmp->oid;
355 buf+=sizeof(unsigned int);
356 *((unsigned int *)buf) = myIpAddr;
357 buf += sizeof(unsigned int);
358 memcpy(buf, tmp->offsets, (tmp->numoffset)*sizeof(short));
359 send_data(sd, oidnoffset, len);
363 /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
/* NOTE(review): the comment above says "char", but sizeof(int) bytes are
 * sent; the value assigned to endpair is not visible here. */
365 send_data(sd, &endpair, sizeof(int));