b43f12678523840e97fc2729879a2c00153d0450
[IRC.git] / Robust / src / Runtime / DSTM / interface / prefetch.c
1 #include "prefetch.h"
2 #include "prelookup.h"
3 #include "sockpool.h"
4 #include "gCollect.h"
5
6 extern sockPoolHashTable_t *transPrefetchSockPool;
7 extern unsigned int myIpAddr;
8 extern sockPoolHashTable_t *transPResponseSocketPool;
9 extern pthread_mutex_t prefetchcache_mutex;
10 extern prehashtable_t pflookup;
11
12 // Function for new prefetch call
13 void rangePrefetch(unsigned int oid, short numoffset, short *offsets) {
14   /* Allocate memory in prefetch queue and push the block there */
15   int qnodesize = sizeof(unsigned int) + sizeof(unsigned short) + numoffset * sizeof(short);
16   char *node = (char *) getmemory(qnodesize);
17   int i;
18
19   if(node == NULL)
20     return;
21
22   int index = 0;
23   ((unsigned int *)node)[0] = oid;
24   index = index + (sizeof(unsigned int));
25   *((short *)(node+index)) = numoffset;
26   index = index + (sizeof(short));
27   memcpy(node+index, offsets, numoffset * sizeof(short));
28
29   movehead(qnodesize);
30 }
31
32 void *transPrefetchNew() {
33   while(1) {
34     /* Read from prefetch queue */
35     void *node = gettail();
36
37     /* Check tuples if they are found locally */
38     perMcPrefetchList_t* pilehead = checkIfLocal(node);
39
40     if (pilehead!=NULL) {
41       // Get sock from shared pool
42       int sd = getSock2(transPrefetchSockPool, pilehead->mid);
43
44       /* Send  Prefetch Request */
45       perMcPrefetchList_t *ptr = pilehead;
46       while(ptr != NULL) {
47         sendRangePrefetchReq(ptr, sd);
48         ptr = ptr->next;
49       }
50
51       /* Deallocated pilehead */
52       proPrefetchQDealloc(pilehead);
53     }
54     // Deallocate the prefetch queue pile node
55     inctail();
56   }
57 }
58
59 int getsize(short *ptr, int n) {
60   int sum = 0, newsum, i;
61   for (i = n-1; i >= 0; i--) {
62     newsum = (1 + ptr[i])+((1 + ptr[i])*sum);
63     sum = newsum;
64   }
65   return sum;
66 }
67
68
69 perMcPrefetchList_t *checkIfLocal(char *ptr) {
70   unsigned int oid = *(GET_OID(ptr));
71   short numoffset = *(GET_NUM_OFFSETS(ptr));
72   short *offsetarray = GET_OFFSETS(ptr);
73   int depth=0, top=0;
74   unsigned int dfsList[numoffset];
75   oidAtDepth_t odep;
76
77   /* Initialize */
78   perMcPrefetchList_t *head = NULL;
79   odep.oid = 0;
80   odep.depth = depth;
81   int i;
82   for(i = 0; i<numoffset; i++) {
83     dfsList[i] = 0;
84   }
85
86   //Start searching the dfsList
87   while(top >= 0) {
88     int retval;
89     if((retval = getNextOid(offsetarray, dfsList, &top, &depth, &odep, oid)) != 0) {
90       printf("%s() Error: Getting new oid at %s, %d\n", __func__, __FILE__, __LINE__);
91       return NULL;
92     }
93     dfsList[top] = odep.oid;
94     dfsList[top+1] = 0;
95 labelL1:
96     ;
97     objheader_t *objhead = searchObj(dfsList[top]);
98     if(objhead == NULL) { //null oid or oid not found
99       // Not found
100       int machinenum = lhashSearch(dfsList[top]);
101       insertPrefetch(machinenum, dfsList[top], numoffset-(depth), &offsetarray[depth], &head);
102       //go up the tree
103       while((dfsList[top+1] == *(offsetarray + depth + 1)) && (depth >= 0)) {
104         if(top == depth) {
105           top -= 2;
106           depth -= 2;
107         } else {
108           depth -= 2;
109         }
110       }
111       //return if no more paths to explore
112       if(top < 0 || depth < 0) {
113         return head;
114       }
115       //If more paths to explore, proceed down the tree
116       dfsList[top+1]++;
117       int prev = top - 2;
118       objheader_t *header;
119       header = searchObj(dfsList[prev]);
120       if(header == NULL) {
121         printf("%s() Error Object not found at %s , %d\n", __func__, __FILE__, __LINE__);
122         return NULL;
123       } else {
124         //if Array
125         if(TYPE(header) > NUMCLASSES) {
126           dfsList[top] = getNextArrayOid(offsetarray, dfsList, &top, &depth);
127         } else { //linked list
128           dfsList[top] = getNextPointerOid(offsetarray, dfsList, &top, &depth);
129         }
130         goto labelL1;
131       }
132     } else { // increment and go down the tree
133       //Increment top
134       top += 2;
135       depth += 2;
136       if(depth >= numoffset) { //reached the end of the path
137         top -= 2;
138         depth -= 2;
139         //go up the tree
140         while((dfsList[top + 1] == *(offsetarray + depth + 1)) && (depth >= 0)) {
141           if(top == depth) {
142             top -= 2;
143             depth -= 2;
144           } else
145             depth -= 2;
146         }
147         //return if no more paths to explore
148         if(top < 0 || depth < 0) {
149           return head;
150         }
151         //If more paths to explore, go down the tree
152         dfsList[top + 1]++;
153         int prev = top - 2;
154         objheader_t * header;
155         header = searchObj(dfsList[prev]);
156         if(header == NULL) {
157           printf("%s() Error Object not found at %s , %d\n", __func__, __FILE__, __LINE__);
158           return NULL;
159         } else {
160           //if Array
161           if(TYPE(header) > NUMCLASSES) {
162             dfsList[top] = getNextArrayOid(offsetarray, dfsList, &top, &depth);
163           } else { //linked list
164             dfsList[top] = getNextPointerOid(offsetarray, dfsList, &top, &depth);
165           }
166           goto labelL1;
167         }
168       } else
169         continue;
170     }
171   } //end of while
172   return head;
173 }
174
175 objheader_t *searchObj(unsigned int oid) {
176   objheader_t *header = NULL;
177
178   if ((header = (objheader_t *)mhashSearch(oid)) != NULL) {
179     return header;
180   } else if ((header = (objheader_t *) prehashSearch(oid)) != NULL) {
181     return header;
182   } else {
183     //printf("Error: Cannot find header %s, %d\n", __func__, __LINE__);
184   }
185   return NULL;
186 }
187
188 /* Delete perMcPrefetchList_t and everything it points to */
189 void proPrefetchQDealloc(perMcPrefetchList_t *node) {
190   perMcPrefetchList_t *prefetchpile_ptr;
191   perMcPrefetchList_t *prefetchpile_next_ptr;
192   objOffsetPile_t *objpile_ptr;
193   objOffsetPile_t *objpile_next_ptr;
194
195   prefetchpile_ptr = node;
196   while (prefetchpile_ptr != NULL) {
197     prefetchpile_next_ptr = prefetchpile_ptr;
198     while(prefetchpile_ptr->list != NULL) {
199       //offsets aren't owned by us, so we don't free them.
200       objpile_ptr = prefetchpile_ptr->list;
201       prefetchpile_ptr->list = objpile_ptr->next;
202       free(objpile_ptr);
203     }
204     prefetchpile_ptr = prefetchpile_next_ptr->next;
205     free(prefetchpile_next_ptr);
206   }
207 }
208
209 void insertPrefetch(int mid, unsigned int oid, short numoffset, short *offsets, perMcPrefetchList_t **head) {
210   perMcPrefetchList_t *ptr;
211   objOffsetPile_t *objnode;
212   objOffsetPile_t **tmp;
213
214   //Loop through the machines
215   for(; 1; head=&((*head)->next)) {
216     int tmid;
217     if ((*head)==NULL||(tmid=(*head)->mid)>mid) {
218       perMcPrefetchList_t * tmp = (perMcPrefetchList_t *) malloc(sizeof(perMcPrefetchList_t));
219       tmp->mid = mid;
220       objnode =  malloc(sizeof(objOffsetPile_t));
221       objnode->offsets = offsets;
222       objnode->oid = oid;
223       objnode->numoffset = numoffset;
224       objnode->next = NULL;
225       tmp->list = objnode;
226       tmp->next = *head;
227       *head=tmp;
228       return;
229     }
230
231     //keep looking
232     if (tmid < mid)
233       continue;
234
235     //found mid list
236     for(tmp=&((*head)->list); 1; tmp=&((*tmp)->next)) {
237       int toid;
238       int matchstatus;
239
240       if ((*tmp)==NULL||((toid=(*tmp)->oid)>oid)) {
241         objnode = (objOffsetPile_t *) malloc(sizeof(objOffsetPile_t));
242         objnode->offsets = offsets;
243         objnode->oid = oid;
244         objnode->numoffset = numoffset;
245         objnode->next = *tmp;
246         *tmp = objnode;
247         return;
248       }
249       if (toid < oid)
250         continue;
251
252       /* Fill list DS */
253       int i;
254       int onumoffset=(*tmp)->numoffset;
255       short * ooffset=(*tmp)->offsets;
256
257       for(i=0; i<numoffset; i++) {
258         if (i>onumoffset) {
259           //We've matched, let's just extend the current prefetch
260           (*tmp)->numoffset=numoffset;
261           (*tmp)->offsets=offsets;
262           return;
263         }
264         if (ooffset[i]<offsets[i]) {
265           goto oidloop;
266         } else if (ooffset[i]>offsets[i]) {
267           //Place item before the current one
268           objnode = (objOffsetPile_t *) malloc(sizeof(objOffsetPile_t));
269           objnode->offsets = offsets;
270           objnode->oid = oid;
271           objnode->numoffset = numoffset;
272           objnode->next = *tmp;
273           *tmp = objnode;
274           return;
275         }
276       }
277       //if we get to the end, we're already covered by this prefetch
278       return;
279 oidloop:
280       ;
281     }
282   }
283 }
284
285 void sendRangePrefetchReq(perMcPrefetchList_t *mcpilenode, int sd) {
286   int len, endpair;
287   char control;
288   objOffsetPile_t *tmp;
289
290   /* Send TRANS_PREFETCH control message */
291   control = TRANS_PREFETCH;
292   send_data(sd, &control, sizeof(char));
293
294   /* Send Oids and offsets in pairs */
295   tmp = mcpilenode->list;
296   while(tmp != NULL) {
297     len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
298     char oidnoffset[len];
299     char *buf=oidnoffset;
300     *((int*)buf) = tmp->numoffset;
301     buf+=sizeof(int);
302     *((unsigned int *)buf) = tmp->oid;
303     buf+=sizeof(unsigned int);
304     *((unsigned int *)buf) = myIpAddr;
305     buf += sizeof(unsigned int);
306     memcpy(buf, tmp->offsets, (tmp->numoffset)*sizeof(short));
307     send_data(sd, oidnoffset, len);
308     tmp = tmp->next;
309   }
310
311   /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
312   endpair = -1;
313   send_data(sd, &endpair, sizeof(int));
314   return;
315 }
316
317 int getRangePrefetchResponse(int sd) {
318   int length = 0;
319   recv_data(sd, &length, sizeof(int));
320   int size = length - sizeof(int);
321   char recvbuffer[size];
322   recv_data(sd, recvbuffer, size);
323   char control = *((char *) recvbuffer);
324   unsigned int oid;
325   if(control == OBJECT_FOUND) {
326     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
327     size = size - (sizeof(char) + sizeof(unsigned int));
328     pthread_mutex_lock(&prefetchcache_mutex);
329     void *ptr;
330     if((ptr = prefetchobjstrAlloc(size)) == NULL) {
331       printf("%s() Error: objstrAlloc error for copying into prefetch cache in line %d at %s\n",
332              __func__, __LINE__, __FILE__);
333       pthread_mutex_unlock(&prefetchcache_mutex);
334       return -1;
335     }
336     pthread_mutex_unlock(&prefetchcache_mutex);
337     memcpy(ptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
338     STATUS(ptr)=0;
339
340     /* Insert into prefetch hash lookup table */
341     void * oldptr;
342     if((oldptr = prehashSearch(oid)) != NULL) {
343       if(((objheader_t *)oldptr)->version <= ((objheader_t *)ptr)->version) {
344         prehashRemove(oid);
345         prehashInsert(oid, ptr);
346       }
347     } else {
348       prehashInsert(oid, ptr);
349     }
350     pthread_mutex_lock(&pflookup.lock);
351     pthread_cond_broadcast(&pflookup.cond);
352     pthread_mutex_unlock(&pflookup.lock);
353   } else if(control == OBJECT_NOT_FOUND) {
354     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
355     //printf("%s() Error: OBJ NOT FOUND.. THIS SHOULD NOT HAPPEN\n", __func__);
356   } else {
357     printf("%s() Error: in Decoding the control value %d, %s\n", __func__, __LINE__, __FILE__);
358   }
359   return 0;
360 }
361
362 int rangePrefetchReq(int acceptfd) {
363   int numoffset, sd = -1;
364   unsigned int baseoid, mid = -1;
365   oidmidpair_t oidmid;
366
367   while (1) {
368     recv_data(acceptfd, &numoffset, sizeof(int));
369     if(numoffset == -1)
370       break;
371     recv_data(acceptfd, &oidmid, 2*sizeof(unsigned int));
372     baseoid = oidmid.oid;
373     if(mid != oidmid.mid) {
374       if(mid!= -1)
375         freeSockWithLock(transPResponseSocketPool, mid, sd);
376       mid = oidmid.mid;
377       sd = getSockWithLock(transPResponseSocketPool, mid);
378     }
379     short offsetsarry[numoffset];
380     recv_data(acceptfd, offsetsarry, numoffset*sizeof(short));
381     int retval;
382     if((retval = dfsOffsetTree(baseoid, offsetsarry, sd, numoffset)) != 0) {
383       printf("%s() Error: in dfsOffsetTree() at line %d in %s()\n",
384              __func__, __LINE__, __FILE__);
385       return -1;
386     }
387   }
388
389   //Release socket
390   if(mid!=-1)
391     freeSockWithLock(transPResponseSocketPool, mid, sd);
392   return 0;
393 }
394
395 int dfsOffsetTree(unsigned int baseoid, short * offsetarray, int sd, int numoffset) {
396   int depth=0, top=0;
397   unsigned int dfsList[numoffset];
398   oidAtDepth_t odep;
399
400   /* Initialize */
401   odep.oid = 0;
402   odep.depth = depth;
403   int i;
404   for(i = 0; i<numoffset; i++) {
405     dfsList[i] = 0;
406   }
407
408   //Start searching the dfsList
409   while(top >= 0) {
410     int retval;
411     if((retval = getNextOid(offsetarray, dfsList, &top, &depth, &odep, baseoid)) != 0) {
412       printf("%s() Error: Getting new oid at %s, %d\n", __func__, __FILE__, __LINE__);
413       return -1;
414     }
415     dfsList[top] = odep.oid;
416     dfsList[top+1] = 0;
417 labelL1:
418     ;
419     objheader_t *objhead = searchObj(dfsList[top]);
420     if(objhead == NULL) { //null oid or oid not found
421       int retval;
422       if((retval = sendOidNotFound(dfsList[top], sd)) != 0) {
423         printf("%s() Error in sendOidNotFound() at line %d in %s()\n", __func__, __LINE__, __FILE__);
424         return -1;
425       }
426       //go up the tree
427       while((dfsList[top+1] == *(offsetarray + depth + 1)) && (depth >= 0)) {
428         if(top == depth) {
429           top -= 2;
430           depth -= 2;
431         } else {
432           depth -= 2;
433         }
434       }
435       //return if no more paths to explore
436       if(top < 0 || depth < 0) {
437         return 0;
438       }
439       //If more paths to explore, proceed down the tree
440       dfsList[top+1]++;
441       int prev = top - 2;
442       objheader_t *header;
443       header = searchObj(dfsList[prev]);
444       if(header == NULL) {
445         printf("%s() Error Object not found at %s , %d\n", __func__, __FILE__, __LINE__);
446         return -1;
447         //return 0;
448       } else {
449         //if Array
450         if(TYPE(header) > NUMCLASSES) {
451           dfsList[top] = getNextArrayOid(offsetarray, dfsList, &top, &depth);
452         } else { //linked list
453           dfsList[top] = getNextPointerOid(offsetarray, dfsList, &top, &depth);
454         }
455         goto labelL1;
456       }
457     } else { // increment and go down the tree
458       if((retval = sendOidFound(OID(objhead), sd)) != 0) {
459         printf("%s() Error in sendOidFound() at line %d in %s()\n", __func__, __LINE__, __FILE__);
460         return -1;
461       }
462       //Increment top
463       top += 2;
464       depth += 2;
465       if(depth >= numoffset) { //reached the end of the path
466         top -= 2;
467         depth -= 2;
468         //go up the tree
469         while((dfsList[top + 1] == *(offsetarray + depth + 1)) && (depth >= 0)) {
470           if(top == depth) {
471             top -= 2;
472             depth -= 2;
473           } else
474             depth -= 2;
475         }
476         //return if no more paths to explore
477         if(top < 0 || depth < 0) {
478           return 0;
479         }
480         //If more paths to explore, go down the tree
481         dfsList[top + 1]++;
482         int prev = top - 2;
483         objheader_t * header;
484         header = searchObj(dfsList[prev]);
485         if(header == NULL) {
486           printf("%s() Error Object not found at %s , %d\n", __func__, __FILE__, __LINE__);
487           return -1;
488           //return 0;
489         } else {
490           //if Array
491           if(TYPE(header) > NUMCLASSES) {
492             dfsList[top] = getNextArrayOid(offsetarray, dfsList, &top, &depth);
493           } else { //linked list
494             dfsList[top] = getNextPointerOid(offsetarray, dfsList, &top, &depth);
495           }
496           goto labelL1;
497         }
498       } else
499         continue;
500     }
501   } //end of while
502   return 0;
503 }
504
505 int getNextOid(short * offsetarray, unsigned int *dfsList, int *top, int *depth, oidAtDepth_t *odep, unsigned int baseoid) {
506   if(*top == 0) {
507     odep->oid = baseoid;
508     odep->depth = 0;
509   } else {
510     int prev = (*top) - 2;
511     unsigned int oid = *(dfsList+prev);
512     objheader_t * header = searchObj(oid);
513     if(header == NULL) {
514       odep->oid = 0;
515       odep->depth = 0;
516       return 0;
517     } else {
518       int range = GET_RANGE(*(offsetarray+(*depth) + 1));
519       short stride = GET_STRIDE(*(offsetarray+(*depth) + 1));
520       stride++; //Note bit pattern 000 => stride = 1 etc
521       //if Array
522       if(TYPE(header) > NUMCLASSES) {
523         int elementsize = classsize[TYPE(header)];
524         struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
525         int length = ao->___length___;
526         //check is stride is +ve or -ve
527         int sign;
528         if(GET_STRIDEINC(*(offsetarray+ (*depth) + 1))) {
529           sign = -1;
530         } else {
531           sign = 1;
532         }
533         int startelement = *(offsetarray + (*depth));
534         if(startelement < 0 || startelement >=length) {
535           printf("%s() Error: Offset out of range at %d\n", __func__, __LINE__);
536           odep->oid = 0;
537           odep->depth = 0;
538           return 0;
539         }
540         int index = *(dfsList+(*top)+1);
541         odep->oid = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) \
542                                        + (elementsize * (startelement + (sign*stride*index)))));
543         odep->depth = *(depth);
544       } else { //linked list
545         int dep;
546         int startelement;
547         if(range > 0) { //go to the next offset
548           startelement = *((int *)(offsetarray + (*depth) + 2));
549           dep = *depth + 2;
550         } else if(range == 0) {
551           startelement = *((int *)(offsetarray + (*depth)));
552           dep = *depth;
553         } else { //range < 0
554           odep->oid = 0;
555           odep->depth = 0;
556           return 0;
557         }
558         odep->oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + startelement));
559         odep->depth = dep;
560       }
561     }
562   }
563   return 0;
564 }
565
566 unsigned int getNextArrayOid(short *offsetarray, unsigned int *dfsList, int *top, int* depth) {
567   int prev = (*top) - 2;
568   unsigned int oid = *(dfsList + prev);
569   objheader_t *header = searchObj(oid);
570   if(header == NULL) {
571     printf("%s() Error: Object not found at %s , %d\n", __func__, __FILE__, __LINE__);
572     return 0;
573   } else {
574     short stride = GET_STRIDE(*(offsetarray+(*depth) + 1));
575     stride++; //Note bit pattern 000 => stride = 1 etc
576     //check is stride is +ve or -ve
577     int sign;
578     if(GET_STRIDEINC(*(offsetarray+ (*depth) + 1))) {
579       sign = -1;
580     } else {
581       sign = 1;
582     }
583     int elementsize = classsize[TYPE(header)];
584     struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
585     int length = ao->___length___;
586     int startelement = *(offsetarray + (*depth));
587     if(startelement < 0 || startelement >=length) {
588       printf("%s() Error: Offset out of range at %d\n", __func__, __LINE__);
589       return 0;
590     }
591     int index = *(dfsList + *top + 1);
592     oid = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) \
593                              + (elementsize * (startelement + (sign*stride*index)))));
594   }
595   return oid;
596 }
597
598 unsigned int getNextPointerOid(short *offsetarray, unsigned int *dfsList, int *top, int* depth) {
599   int prev;
600   if(*(dfsList + *top + 1) > 1) { //tells which offset to calculate the oid from
601     prev = *top;
602   } else {
603     prev = *top - 2;
604   }
605   unsigned int oid = *(dfsList + prev);
606   objheader_t *header = searchObj(oid);
607   if(header == NULL) {
608     printf("%s() Error: Object not found at %s , %d\n", __func__, __FILE__, __LINE__);
609     return 0;
610   } else {
611     int startelement = *(offsetarray + *depth);
612     oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + startelement));
613     //TODO add optimization for checking if this oid has already not been found
614   }
615   return oid;
616 }
617
618 int sendOidFound(unsigned int oid, int sd) {
619   objheader_t *header;
620   if((header = (objheader_t *) mhashSearch(oid)) != NULL) {
621     ;
622   } else if((header = (objheader_t *) prehashSearch(oid))!=NULL) {
623     ;
624   } else {
625     printf("%s() Error: THIS SHOULD NOT HAPPEN at line %d in %s()\n", __func__, __LINE__, __FILE__);
626     return -1;
627   }
628
629   int incr = 0;
630   int objsize;
631   GETSIZE(objsize, header);
632   int size  = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
633   char sendbuffer[size];
634   *((int *)(sendbuffer + incr)) = size;
635   incr += sizeof(int);
636   *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
637   incr += sizeof(char);
638   *((unsigned int *)(sendbuffer + incr)) = oid;
639   incr += sizeof(unsigned int);
640   memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
641
642   char control = TRANS_PREFETCH_RESPONSE;
643   sendPrefetchResponse(sd, &control, sendbuffer, &size);
644   return 0;
645 }
646
647 int sendOidNotFound(unsigned int oid, int sd) {
648   int size  = sizeof(int) + sizeof(char) + sizeof(unsigned int);
649   char sendbuffer[size];
650   *((int *)sendbuffer) = size;
651   *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
652   *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(unsigned int))) = oid;
653   char control = TRANS_PREFETCH_RESPONSE;
654   sendPrefetchResponse(sd, &control, sendbuffer, &size);
655   return 0;
656 }