d263b048f26e105d286481416a2f2c0935e6e2ce
[IRC.git] / Robust / src / Runtime / DSTM / interface / prefetch.c
1 #include "prefetch.h"
2 #include "prelookup.h"
3 #include "sockpool.h"
4
5 extern sockPoolHashTable_t *transPrefetchSockPool;
6 extern unsigned int myIpAddr;
7
8 /* Steps for the new prefetch call */
9 // Function for new prefetch call
10 void rangePrefetch(int prefetchsiteid, int ntuples, unsigned int *baseoids, 
11     unsigned short *numoffsets, short *offsets) {
12   // a[0][1] - a[3][1] = a.0.3
13   // a.f.h   = a.f.h
14   // a.f.next.h = a.f.0.next.0.h
15   // a.f.next.next.h  = a.f.next.2.h
16   /* Allocate memory in prefetch queue and push the block there */
17   int qnodesize = 2*sizeof(int)+ ntuples *(sizeof(unsigned int) + sizeof(unsigned short)) + numoffsets[ntuples -1] * sizeof(short);
18   char *node = (char *) getmemory(qnodesize);
19   int top = numoffsets[ntuples -1];
20
21   if(node == NULL)
22     return;
23
24   int index = 0;
25   *((int *)(node)) = prefetchsiteid;
26   *((int *)(node + sizeof(int))) = ntuples;
27   index = 2 * sizeof(int);
28   memcpy(node+index, baseoids, ntuples * sizeof(unsigned int));
29   index = index + ntuples *(sizeof(unsigned int));
30   memcpy(node+index, numoffsets, ntuples * sizeof(unsigned short));
31   index = index + ntuples *(sizeof(unsigned short));
32   memcpy(node+index, offsets, top * sizeof(short));
33
34   movehead(qnodesize);
35 }
36
37 void *transPrefetchNew() {
38   while(1) {
39     /* Read from prefetch queue */
40     void *node = gettail();
41     /* Check tuples if they are found locally */
42     perMcPrefetchList_t* pilehead = checkIfLocal(node);
43
44     if (pilehead!=NULL) {
45       // Get sock from shared pool
46       int sd = getSock2(transPrefetchSockPool, pilehead->mid);
47
48       /* Send  Prefetch Request */
49       perMcPrefetchList_t *ptr = pilehead;
50       while(ptr != NULL) {
51         sendRangePrefetchReq(ptr, sd);
52         ptr = ptr->next;
53       }
54
55       /* Deallocated pilehead */
56       proPrefetchQDealloc(pilehead);
57     }
58     // Deallocate the prefetch queue pile node
59     inctail();
60   }
61 }
62
63 int getsize(short *ptr, int n) {
64   int sum = 0, newsum, i;
65   for (i = n-1; i >= 0; i--) {
66     newsum = (1 + ptr[i])+((1 + ptr[i])*sum);
67     sum = newsum;
68   }
69   return sum;
70 }
71
72 perMcPrefetchList_t*  checkIfLocal(char *ptr) {
73   int siteid = *(GET_SITEID(ptr));
74   unsigned int *baseoids = GET_PTR_OID(ptr);
75   unsigned int ntuples = *(GET_NTUPLES(ptr));
76   unsigned short *endoffsets = GET_PTR_EOFF(ptr, ntuples);
77   short *offsets = GET_PTR_ARRYFLD(ptr, ntuples);
78   int i, j, k;
79   int numLocal = 0;
80
81   perMcPrefetchList_t * head=NULL;
82
83   // Iterate for each object
84   for (i = 0; i < ntuples; i++) {
85     int numoffset = (i == 0) ? endoffsets[0] : (endoffsets[i] - endoffsets[i-1]);
86     int sizetmpObjSet = numoffset >> 1; 
87     unsigned short tmpobjset[sizetmpObjSet];
88     int l;
89     for (l = 0; l < sizetmpObjSet; l++) {
90       tmpobjset[l] = GET_RANGE(offsets[2*l+1]);
91     }
92     int maxChldOids = getsize(tmpobjset, sizetmpObjSet)+1;
93     unsigned int chldOffstFrmBase[maxChldOids];
94     chldOffstFrmBase[0] = baseoids[i];
95     int tovisit = 0, visited = -1;
96     // Iterate for each element of offsets
97     for (j = 0; j < numoffset; j++) {
98       // Iterate over each element to be visited
99       while (visited != tovisit) {
100         if(chldOffstFrmBase[visited+1] == 0) {
101           visited++;
102           continue;
103         }
104
105         if (!isOidAvail(chldOffstFrmBase[visited+1])) { 
106           // Add to remote requests 
107           unsigned int oid = chldOffstFrmBase[visited+1];
108           int machinenum = lhashSearch(oid);
109           //TODO Group a bunch of oids to send in one prefetch request
110           insertPrefetch(machinenum, oid, numoffset-j, offsets, &head);
111           break;
112         } else {
113           // iterate over each offset
114           int retval;
115           if((retval = lookForObjs(chldOffstFrmBase, offsets, &j, 
116               &visited, &tovisit)) == 0) {
117             printf("%s() Error: Object not found %s at line %d\n", 
118                 __func__, __FILE__, __LINE__); 
119           }
120         }
121         visited++;
122       } 
123     } // end iterate for each element of offsets
124
125     //Entire prefetch found locally
126     if(j == numoffset) {
127       numLocal++;
128       goto tuple;
129     }
130 tuple:
131     ;
132   } // end iterate for each object
133
134   /* handle dynamic prefetching */
135   handleDynPrefetching(numLocal, ntuples, siteid);
136   return head;
137 }
138
139 int isOidAvail(unsigned int oid) {
140   objheader_t * header;
141   if((header=(objheader_t *)mhashSearch(oid))!=NULL) {
142     //Found on machine
143     return 1;
144   } else if ((header=(objheader_t *)prehashSearch(oid))!=NULL) {
145     return 1;
146   } else {
147     return 0;
148   }
149 }
150
151 int lookForObjs(int *chldOffstFrmBase, short *offsets, 
152     int *index, int *visited, int *tovisit) {
153   objheader_t *header;
154   unsigned int oid = chldOffstFrmBase[*visited+1];
155   if((header = (objheader_t *)mhashSearch(oid))!= NULL) {
156     //Found on machine
157     ;
158   } else if((header = (objheader_t *)prehashSearch(oid))!=NULL) {
159     //Found in prefetch cache
160     ;
161   } else {
162     printf("DEBUG->%s()THIS SHOULD NOR HAPPEN\n", __func__);
163     return 0;
164   }
165
166   if(TYPE(header) > NUMCLASSES) {
167     int elementsize = classsize[TYPE(header)];
168     struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
169     int length = ao->___length___;
170     /* Check if array out of bounds */
171     int startindex = offsets[*index];
172     int range = GET_RANGE(offsets[(*index)+1]);
173     if(range > 0 && range < length) {
174       short stride = GET_STRIDE(offsets[(*index)+1]);
175       stride = stride + 1; //NOTE  bit pattern 000 => stride = 1, 001 => stride = 2
176       int i;
177       //check is stride +ve or negative
178       if(GET_STRIDEINC(offsets[(*index)]+1)) { //-ve stride
179         for(i = startindex; i <= range+1; i = i - stride) {
180           unsigned int oid = 0;
181           if((i < 0 || i >= length)) {
182             //if yes treat the object as found
183             oid = 0;
184             continue;
185           } else {
186             // compute new object
187             oid = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*i)));
188           }
189           // add new object
190           chldOffstFrmBase[*tovisit] = oid;
191           *tovisit = *tovisit + 1;
192         }
193       } else { //+ve stride
194         for(i = startindex; i <= range; i = i + stride) {
195           unsigned int oid = 0;
196           if(i < 0 || i >= length) {
197             //if yes treat the object as found
198             oid = 0;
199             continue;
200           } else {
201             // compute new object
202             oid = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*i)));
203           }
204           // add new object
205           chldOffstFrmBase[*tovisit] = oid;
206           *tovisit = *tovisit + 1;
207         }
208       }
209     } else if(range == 0) {
210       if(startindex >=0 || startindex < length) {
211         unsigned int oid = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*startindex)));
212         // add new object
213         chldOffstFrmBase[*tovisit] = oid;
214         *tovisit = *tovisit + 1;
215       }
216     }
217   } else { //linked list
218     int startindex = offsets[*index];
219     int range = GET_RANGE(offsets[(*index)+1]);
220     unsigned int oid;
221     if(range == 0) {
222       oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + startindex));
223       // add new object
224       chldOffstFrmBase[*tovisit] = oid;
225       *tovisit = *tovisit + 1;
226     } else {
227       int i;
228       for(i = 0; i < range; i++) {
229         oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + startindex));
230         // add new object
231         chldOffstFrmBase[*tovisit] = oid;
232         *tovisit = *tovisit + 1;
233       }
234     }
235   }
236   *index = *index + 2;
237   return 1;
238 }
239
240 /* Delete perMcPrefetchList_t and everything it points to */
241 void proPrefetchQDealloc(perMcPrefetchList_t *node) {
242   perMcPrefetchList_t *prefetchpile_ptr;
243   perMcPrefetchList_t *prefetchpile_next_ptr;
244   objOffsetPile_t *objpile_ptr;
245   objOffsetPile_t *objpile_next_ptr;
246
247   prefetchpile_ptr = node;
248   while (prefetchpile_ptr != NULL) {
249     prefetchpile_next_ptr = prefetchpile_ptr;
250     while(prefetchpile_ptr->list != NULL) {
251       //offsets aren't owned by us, so we don't free them.
252       objpile_ptr = prefetchpile_ptr->list;
253       prefetchpile_ptr->list = objpile_ptr->next;
254       free(objpile_ptr);
255     }
256     prefetchpile_ptr = prefetchpile_next_ptr->next;
257     free(prefetchpile_next_ptr);
258   }
259 }
260
261 void insertPrefetch(int mid, unsigned int oid, short numoffset, short *offsets, perMcPrefetchList_t **head) {
262   perMcPrefetchList_t *ptr;
263   objOffsetPile_t *objnode;
264   objOffsetPile_t **tmp;
265
266   //Loop through the machines
267   for(; 1; head=&((*head)->next)) {
268     int tmid;
269     if ((*head)==NULL||(tmid=(*head)->mid)>mid) {
270       perMcPrefetchList_t * tmp = (perMcPrefetchList_t *) malloc(sizeof(perMcPrefetchList_t));
271       tmp->mid = mid;
272       objnode =  malloc(sizeof(objOffsetPile_t));
273       objnode->offsets = offsets;
274       objnode->oid = oid;
275       objnode->numoffset = numoffset;
276       objnode->next = NULL;
277       tmp->list = objnode;
278       tmp->next = *head;
279       *head=tmp;
280       return;
281     }   
282
283     //keep looking
284     if (tmid < mid)
285       continue;
286
287     //found mid list
288     for(tmp=&((*head)->list); 1; tmp=&((*tmp)->next)) {
289       int toid;
290       int matchstatus;
291
292       if ((*tmp)==NULL||((toid=(*tmp)->oid)>oid)) {
293         objnode = (objOffsetPile_t *) malloc(sizeof(objOffsetPile_t));
294         objnode->offsets = offsets;
295         objnode->oid = oid;
296         objnode->numoffset = numoffset;
297         objnode->next = *tmp;
298         *tmp = objnode;
299         return;
300       }   
301       if (toid < oid)
302         continue;
303
304       /* Fill list DS */
305       int i;
306       int onumoffset=(*tmp)->numoffset;
307       short * ooffset=(*tmp)->offsets;
308
309       for(i=0; i<numoffset; i++) {
310         if (i>onumoffset) {
311           //We've matched, let's just extend the current prefetch
312           (*tmp)->numoffset=numoffset;
313           (*tmp)->offsets=offsets;
314           return;
315         }
316         if (ooffset[i]<offsets[i]) {
317           goto oidloop;
318         } else if (ooffset[i]>offsets[i]) {
319           //Place item before the current one
320           objnode = (objOffsetPile_t *) malloc(sizeof(objOffsetPile_t));
321           objnode->offsets = offsets;
322           objnode->oid = oid;
323           objnode->numoffset = numoffset;
324           objnode->next = *tmp;
325           *tmp = objnode;
326           return;
327         }
328       }
329       //if we get to the end, we're already covered by this prefetch
330       return;
331 oidloop:
332       ;
333     }
334   }
335 }
336
337 void sendRangePrefetchReq(perMcPrefetchList_t *mcpilenode, int sd) {
338   int len, endpair;
339   char control;
340   objOffsetPile_t *tmp;
341
342   /* Send TRANS_PREFETCH control message */
343   control = TRANS_PREFETCH;
344   send_data(sd, &control, sizeof(char));
345
346   /* Send Oids and offsets in pairs */
347   tmp = mcpilenode->list;
348   while(tmp != NULL) {
349     len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
350     char oidnoffset[len];
351     char *buf=oidnoffset;
352     *((int*)buf) = tmp->numoffset;
353     buf+=sizeof(int);
354     *((unsigned int *)buf) = tmp->oid;
355     buf+=sizeof(unsigned int);
356     *((unsigned int *)buf) = myIpAddr;
357     buf += sizeof(unsigned int);
358     memcpy(buf, tmp->offsets, (tmp->numoffset)*sizeof(short));
359     send_data(sd, oidnoffset, len);
360     tmp = tmp->next;
361   }
362
363   /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
364   endpair = -1;
365   send_data(sd, &endpair, sizeof(int));
366
367   return;
368 }