/* Globals declared extern here and used by the prefetch routines in this file. */
/* Socket pool from which transPrefetchNew() takes the socket for an outgoing prefetch request. */
6 extern sockPoolHashTable_t *transPrefetchSockPool;
/* Written into each request message built by sendRangePrefetchReq(). */
7 extern unsigned int myIpAddr;
/* Socket pool used by rangePrefetchReq() to obtain/release the socket passed to dfsOffsetTree(). */
8 extern sockPoolHashTable_t *transPResponseSocketPool;
/* Held by getRangePrefetchResponse() around its prefetchobjstrAlloc() call. */
9 extern pthread_mutex_t prefetchcache_mutex;
/* getRangePrefetchResponse() broadcasts on pflookup.cond (under pflookup.lock) after storing an object. */
10 extern prehashtable_t pflookup;
12 // Function for new prefetch call
/*
 * Packs one prefetch request into a buffer obtained from getmemory().
 *
 * oid       - object id written at the start of the node
 * numoffset - number of entries in offsets
 * offsets   - array of numoffset shorts, copied into the node
 *
 * NOTE(review): `index` is used below but its declaration and initial value
 * are not visible in this listing; the layout comments assume it starts at 0.
 * Whatever happens to `node` after the memcpy is also not visible here.
 */
13 void rangePrefetch(unsigned int oid, short numoffset, short *offsets) {
14 /* Allocate memory in prefetch queue and push the block there */
/* Size = oid (unsigned int) + offset count (unsigned short) + numoffset shorts. */
15 int qnodesize = sizeof(unsigned int) + sizeof(unsigned short) + numoffset * sizeof(short);
16 char *node = (char *) getmemory(qnodesize);
/* The oid occupies the first sizeof(unsigned int) bytes of the node. */
23 ((unsigned int *)node)[0] = oid;
24 index = index + (sizeof(unsigned int));
/* The offset count is stored at node+index as a short. */
25 *((short *)(node+index)) = numoffset;
26 index = index + (sizeof(short));
/* The offsets themselves follow the count. */
27 memcpy(node+index, offsets, numoffset * sizeof(short));
/*
 * Handles a request taken from the prefetch queue: gettail() supplies the
 * node, checkIfLocal() turns it into a perMcPrefetchList_t list,
 * sendRangePrefetchReq() sends that list over a socket obtained from
 * transPrefetchSockPool, and proPrefetchQDealloc() releases the list.
 *
 * NOTE(review): any enclosing loop, condition checks and the return value are
 * not visible in this listing.
 */
32 void *transPrefetchNew() {
34 /* Read from prefetch queue */
35 void *node = gettail();
37 /* Check tuples if they are found locally */
38 perMcPrefetchList_t* pilehead = checkIfLocal(node);
41 // Get sock from shared pool
/* NOTE(review): pilehead is dereferenced here; no NULL check is visible in
 * this listing even though checkIfLocal() initialises its result to NULL. */
42 int sd = getSock2(transPrefetchSockPool, pilehead->mid);
44 /* Send Prefetch Request */
45 perMcPrefetchList_t *ptr = pilehead;
47 sendRangePrefetchReq(ptr, sd);
51 /* Deallocate pilehead */
52 proPrefetchQDealloc(pilehead);
54 // Deallocate the prefetch queue pile node
/*
 * Folds the n shorts in ptr, walking from the last entry to the first.
 * Each step computes newsum = (1 + ptr[i]) + (1 + ptr[i]) * sum,
 * i.e. (1 + ptr[i]) * (1 + sum).
 *
 * NOTE(review): the assignment of newsum back into sum and the return
 * statement are not visible in this listing.
 */
59 int getsize(short *ptr, int n) {
60 int sum = 0, newsum, i;
61 for (i = n-1; i >= 0; i--) {
62 newsum = (1 + ptr[i])+((1 + ptr[i])*sum);
/*
 * Walks the objects reachable from the request's base oid along its offset
 * array. Each oid is looked up with searchObj(); for an oid that is not found,
 * the owning machine is obtained from lhashSearch() and the oid plus the
 * remaining offsets are recorded through insertPrefetch() in the list that
 * starts at `head`.
 *
 * ptr - packed request; read through the GET_OID / GET_NUM_OFFSETS /
 *       GET_OFFSETS macros.
 *
 * NOTE(review): the declarations of i, top, depth, prev, retval, odep and
 * header, the loop bodies, and all return statements are not visible in this
 * listing.
 */
69 perMcPrefetchList_t *checkIfLocal(char *ptr) {
70 unsigned int oid = *(GET_OID(ptr));
71 short numoffset = *(GET_NUM_OFFSETS(ptr));
72 short *offsetarray = GET_OFFSETS(ptr);
/* Work stack for the traversal, sized by the request's offset count.
 * NOTE(review): it is indexed with top and top + 1 below; the bounds of top
 * are not visible here - confirm they stay within numoffset. */
74 unsigned int dfsList[numoffset];
/* Result list; stays NULL unless insertPrefetch() adds to it. */
78 perMcPrefetchList_t *head = NULL;
82 for(i = 0; i<numoffset; i++) {
86 //Start searching the dfsList
/* getNextOid() reports the next oid through odep; non-zero means failure. */
89 if((retval = getNextOid(offsetarray, dfsList, &top, &depth, &odep, oid)) != 0) {
90 printf("%s() Error: Getting new oid at %s, %d\n", __func__, __FILE__, __LINE__);
93 dfsList[top] = odep.oid;
97 objheader_t *objhead = searchObj(dfsList[top]);
98 if(objhead == NULL) { //null oid or oid not found
/* Record the missing oid, with the offsets from the current depth onward,
 * under the machine returned by lhashSearch(). */
100 int machinenum = lhashSearch(dfsList[top]);
101 insertPrefetch(machinenum, dfsList[top], numoffset-(depth), &offsetarray[depth], &head);
/* NOTE(review): the array reads are evaluated before the depth >= 0 test. */
103 while((dfsList[top+1] == *(offsetarray + depth + 1)) && (depth >= 0)) {
111 //return if no more paths to explore
112 if(top < 0 || depth < 0) {
115 //If more paths to explore, proceed down the tree
119 header = searchObj(dfsList[prev]);
121 printf("%s() Error Object not found at %s , %d\n", __func__, __FILE__, __LINE__);
/* TYPE(header) > NUMCLASSES selects the array helper; otherwise the pointer helper. */
125 if(TYPE(header) > NUMCLASSES) {
126 dfsList[top] = getNextArrayOid(offsetarray, dfsList, &top, &depth);
127 } else { //linked list
128 dfsList[top] = getNextPointerOid(offsetarray, dfsList, &top, &depth);
132 } else { // increment and go down the tree
136 if(depth >= numoffset) { //reached the end of the path
140 while((dfsList[top + 1] == *(offsetarray + depth + 1)) && (depth >= 0)) {
147 //return if no more paths to explore
148 if(top < 0 || depth < 0) {
151 //If more paths to explore, go down the tree
154 objheader_t * header;
155 header = searchObj(dfsList[prev]);
157 printf("%s() Error Object not found at %s , %d\n", __func__, __FILE__, __LINE__);
161 if(TYPE(header) > NUMCLASSES) {
162 dfsList[top] = getNextArrayOid(offsetarray, dfsList, &top, &depth);
163 } else { //linked list
164 dfsList[top] = getNextPointerOid(offsetarray, dfsList, &top, &depth);
/*
 * Looks up the header for `oid`: mhashSearch() is tried first and
 * prehashSearch() is the fallback. Callers in this file compare the result
 * against NULL to detect a missing object.
 *
 * NOTE(review): the branch bodies and the return statement are not visible in
 * this listing.
 */
175 objheader_t *searchObj(unsigned int oid) {
176 objheader_t *header = NULL;
178 if ((header = (objheader_t *)mhashSearch(oid)) != NULL) {
180 } else if ((header = (objheader_t *) prehashSearch(oid)) != NULL) {
183 //printf("Error: Cannot find header %s, %d\n", __func__, __LINE__);
188 /* Delete perMcPrefetchList_t and everything it points to */
/*
 * node - head of the perMcPrefetchList_t chain to release (may be NULL, in
 *        which case the outer loop does not run).
 *
 * For every list node the objOffsetPile_t entries hanging off ->list are
 * unlinked one at a time, then the list node itself is freed.
 */
189 void proPrefetchQDealloc(perMcPrefetchList_t *node) {
190 perMcPrefetchList_t *prefetchpile_ptr;
191 perMcPrefetchList_t *prefetchpile_next_ptr;
192 objOffsetPile_t *objpile_ptr;
193 objOffsetPile_t *objpile_next_ptr;
195 prefetchpile_ptr = node;
196 while (prefetchpile_ptr != NULL) {
/* Despite its name, prefetchpile_next_ptr holds the CURRENT node so it can be
 * freed after the cursor has moved on. */
197 prefetchpile_next_ptr = prefetchpile_ptr;
198 while(prefetchpile_ptr->list != NULL) {
199 //offsets aren't owned by us, so we don't free them.
/* Pop the first entry off the per-node list. */
200 objpile_ptr = prefetchpile_ptr->list;
201 prefetchpile_ptr->list = objpile_ptr->next;
/* NOTE(review): the release of objpile_ptr is not visible in this listing -
 * confirm the popped entry is freed. */
/* Advance first, then free the node just left behind. */
204 prefetchpile_ptr = prefetchpile_next_ptr->next;
205 free(prefetchpile_next_ptr);
/*
 * Adds a prefetch entry (oid + offsets) for machine `mid` to the list at *head.
 *
 * mid       - machine id the entry belongs to
 * oid       - object id to prefetch
 * numoffset - number of entries in offsets
 * offsets   - offset array; the pointer itself is stored, not a copy
 * head      - address of the list head pointer; the loop steps it along the
 *             ->next links
 *
 * The outer loop stops at the end of the list or at the first node whose mid
 * is greater than `mid`, and allocates a new machine node there. Otherwise the
 * inner loop scans that machine's object list the same way, keyed on oid.
 * For an existing oid the stored offsets are compared element by element with
 * the new ones.
 *
 * NOTE(review): the declarations of tmid, toid and i, the linking of new
 * nodes, the equal-mid / equal-oid conditions and all loop exits are not
 * visible in this listing. No check of the malloc() results is visible either.
 */
209 void insertPrefetch(int mid, unsigned int oid, short numoffset, short *offsets, perMcPrefetchList_t **head) {
210 perMcPrefetchList_t *ptr;
211 objOffsetPile_t *objnode;
212 objOffsetPile_t **tmp;
214 //Loop through the machines
215 for(; 1; head=&((*head)->next)) {
217 if ((*head)==NULL||(tmid=(*head)->mid)>mid) {
/* NOTE(review): this local `tmp` shadows the objOffsetPile_t** `tmp` declared above. */
218 perMcPrefetchList_t * tmp = (perMcPrefetchList_t *) malloc(sizeof(perMcPrefetchList_t));
/* First object entry for the new machine node. */
220 objnode = malloc(sizeof(objOffsetPile_t));
221 objnode->offsets = offsets;
223 objnode->numoffset = numoffset;
224 objnode->next = NULL;
/* Scan the object list of the current machine node. */
236 for(tmp=&((*head)->list); 1; tmp=&((*tmp)->next)) {
240 if ((*tmp)==NULL||((toid=(*tmp)->oid)>oid)) {
/* New entry goes in front of *tmp (end of list or first larger oid). */
241 objnode = (objOffsetPile_t *) malloc(sizeof(objOffsetPile_t));
242 objnode->offsets = offsets;
244 objnode->numoffset = numoffset;
245 objnode->next = *tmp;
/* Existing entry: compare its offsets with the new ones. */
254 int onumoffset=(*tmp)->numoffset;
255 short * ooffset=(*tmp)->offsets;
257 for(i=0; i<numoffset; i++) {
259 //We've matched, let's just extend the current prefetch
260 (*tmp)->numoffset=numoffset;
261 (*tmp)->offsets=offsets;
264 if (ooffset[i]<offsets[i]) {
266 } else if (ooffset[i]>offsets[i]) {
267 //Place item before the current one
268 objnode = (objOffsetPile_t *) malloc(sizeof(objOffsetPile_t));
269 objnode->offsets = offsets;
271 objnode->numoffset = numoffset;
272 objnode->next = *tmp;
277 //if we get to the end, we're already covered by this prefetch
/*
 * Sends the prefetch entries of one machine node over socket `sd`.
 *
 * mcpilenode - list node whose ->list entries are sent
 * sd         - connected socket descriptor passed to send_data()
 *
 * Wire format visible here: a one-byte TRANS_PREFETCH control code, then per
 * entry [int numoffset][unsigned int oid][unsigned int myIpAddr][numoffset
 * shorts], then sizeof(int) bytes of `endpair` as the terminator. This matches
 * the order in which rangePrefetchReq() reads a request.
 *
 * NOTE(review): the declarations of control, len and endpair, the loop over
 * the entries, and the advance of buf past the numoffset field are not visible
 * in this listing.
 */
285 void sendRangePrefetchReq(perMcPrefetchList_t *mcpilenode, int sd) {
288 objOffsetPile_t *tmp;
290 /* Send TRANS_PREFETCH control message */
291 control = TRANS_PREFETCH;
292 send_data(sd, &control, sizeof(char));
294 /* Send Oids and offsets in pairs */
295 tmp = mcpilenode->list;
/* Message size: offset count + oid + sender address + the offsets. */
297 len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
298 char oidnoffset[len];
299 char *buf=oidnoffset;
300 *((int*)buf) = tmp->numoffset;
302 *((unsigned int *)buf) = tmp->oid;
303 buf+=sizeof(unsigned int);
304 *((unsigned int *)buf) = myIpAddr;
305 buf += sizeof(unsigned int);
306 memcpy(buf, tmp->offsets, (tmp->numoffset)*sizeof(short));
307 send_data(sd, oidnoffset, len);
311 /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
/* NOTE(review): sizeof(int) bytes are sent, so the marker is an int-sized
 * value rather than a single char; endpair's declaration is not visible here. */
313 send_data(sd, &endpair, sizeof(int));
/*
 * Reads one prefetch response from socket `sd` and, for OBJECT_FOUND, copies
 * the object into memory from prefetchobjstrAlloc() and registers it with
 * prehashInsert(), then wakes waiters on pflookup.cond.
 *
 * Message as read here: [int length][char control][unsigned int oid][payload],
 * where length counts the leading int as well.
 *
 * NOTE(review): the declarations of length, oid, ptr and oldptr, the bodies of
 * several branches, and the return statements are not visible in this listing.
 */
317 int getRangePrefetchResponse(int sd) {
319 recv_data(sd, &length, sizeof(int));
/* Remaining bytes after the length field.
 * NOTE(review): length comes from the peer and sizes a stack VLA; no range
 * check is visible in this listing. */
320 int size = length - sizeof(int);
321 char recvbuffer[size];
322 recv_data(sd, recvbuffer, size);
/* First byte of the remainder is the control code. */
323 char control = *((char *) recvbuffer);
325 if(control == OBJECT_FOUND) {
/* The oid follows the control byte; what is left after it is the payload. */
326 oid = *((unsigned int *)(recvbuffer + sizeof(char)));
327 size = size - (sizeof(char) + sizeof(unsigned int));
/* The allocation is done while holding prefetchcache_mutex. */
328 pthread_mutex_lock(&prefetchcache_mutex);
330 if((ptr = prefetchobjstrAlloc(size)) == NULL) {
331 printf("%s() Error: objstrAlloc error for copying into prefetch cache in line %d at %s\n",
332 __func__, __LINE__, __FILE__);
/* Unlock on the allocation-failure path. */
333 pthread_mutex_unlock(&prefetchcache_mutex);
/* Unlock on the success path; the copy below runs outside the mutex. */
336 pthread_mutex_unlock(&prefetchcache_mutex);
337 memcpy(ptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
340 /* Insert into prefetch hash lookup table */
342 if((oldptr = prehashSearch(oid)) != NULL) {
/* An existing entry is compared by version against the received copy. */
343 if(((objheader_t *)oldptr)->version <= ((objheader_t *)ptr)->version) {
345 prehashInsert(oid, ptr);
348 prehashInsert(oid, ptr);
/* Wake every thread waiting on pflookup.cond. */
350 pthread_mutex_lock(&pflookup.lock);
351 pthread_cond_broadcast(&pflookup.cond);
352 pthread_mutex_unlock(&pflookup.lock);
353 } else if(control == OBJECT_NOT_FOUND) {
354 oid = *((unsigned int *)(recvbuffer + sizeof(char)));
355 //printf("%s() Error: OBJ NOT FOUND.. THIS SHOULD NOT HAPPEN\n", __func__);
357 printf("%s() Error: in Decoding the control value %d, %s\n", __func__, __LINE__, __FILE__);
/*
 * Server side of a prefetch request arriving on `acceptfd`.
 *
 * Reads [int numoffset][unsigned int oid][unsigned int mid][numoffset shorts]
 * and hands the base oid and offsets to dfsOffsetTree(), together with a
 * socket from transPResponseSocketPool keyed by the requester's mid.
 *
 * NOTE(review): the declarations of oidmid and retval, the enclosing loop and
 * its termination, the update of `mid`, and the return statements are not
 * visible in this listing.
 */
362 int rangePrefetchReq(int acceptfd) {
363 int numoffset, sd = -1;
/* mid starts as (unsigned)-1, so the first request always differs from it. */
364 unsigned int baseoid, mid = -1;
368 recv_data(acceptfd, &numoffset, sizeof(int));
/* oid and mid are received together as two consecutive unsigned ints. */
371 recv_data(acceptfd, &oidmid, 2*sizeof(unsigned int));
372 baseoid = oidmid.oid;
/* Requester changed: give back the old socket and fetch one for the new mid. */
373 if(mid != oidmid.mid) {
375 freeSockWithLock(transPResponseSocketPool, mid, sd);
377 sd = getSockWithLock(transPResponseSocketPool, mid);
/* NOTE(review): numoffset comes from the peer and sizes a stack VLA; no range
 * check is visible in this listing. */
379 short offsetsarry[numoffset];
380 recv_data(acceptfd, offsetsarry, numoffset*sizeof(short));
382 if((retval = dfsOffsetTree(baseoid, offsetsarry, sd, numoffset)) != 0) {
383 printf("%s() Error: in dfsOffsetTree() at line %d in %s()\n",
384 __func__, __LINE__, __FILE__);
/* Return the response socket to the pool. */
391 freeSockWithLock(transPResponseSocketPool, mid, sd);
/*
 * Walks the objects reachable from `baseoid` along `offsetarray` and reports
 * each visited oid on socket `sd`: sendOidNotFound() when searchObj() returns
 * NULL, sendOidFound() otherwise. The traversal has the same shape as
 * checkIfLocal().
 *
 * baseoid     - oid the traversal starts from
 * offsetarray - numoffset shorts describing the path
 * sd          - socket passed to the send helpers
 * numoffset   - number of entries in offsetarray
 *
 * NOTE(review): the declarations of i, top, depth, prev, retval, odep and
 * header, the loop bodies, and all return statements are not visible in this
 * listing.
 */
395 int dfsOffsetTree(unsigned int baseoid, short * offsetarray, int sd, int numoffset) {
/* Work stack for the traversal.
 * NOTE(review): indexed with top and top + 1 below; confirm the bounds of top. */
397 unsigned int dfsList[numoffset];
404 for(i = 0; i<numoffset; i++) {
408 //Start searching the dfsList
/* getNextOid() reports the next oid through odep; non-zero means failure. */
411 if((retval = getNextOid(offsetarray, dfsList, &top, &depth, &odep, baseoid)) != 0) {
412 printf("%s() Error: Getting new oid at %s, %d\n", __func__, __FILE__, __LINE__);
415 dfsList[top] = odep.oid;
419 objheader_t *objhead = searchObj(dfsList[top]);
420 if(objhead == NULL) { //null oid or oid not found
/* Tell the requester that this oid is not available here. */
422 if((retval = sendOidNotFound(dfsList[top], sd)) != 0) {
423 printf("%s() Error in sendOidNotFound() at line %d in %s()\n", __func__, __LINE__, __FILE__);
/* NOTE(review): the array reads are evaluated before the depth >= 0 test. */
427 while((dfsList[top+1] == *(offsetarray + depth + 1)) && (depth >= 0)) {
435 //return if no more paths to explore
436 if(top < 0 || depth < 0) {
439 //If more paths to explore, proceed down the tree
443 header = searchObj(dfsList[prev]);
445 printf("%s() Error Object not found at %s , %d\n", __func__, __FILE__, __LINE__);
/* TYPE(header) > NUMCLASSES selects the array helper; otherwise the pointer helper. */
450 if(TYPE(header) > NUMCLASSES) {
451 dfsList[top] = getNextArrayOid(offsetarray, dfsList, &top, &depth);
452 } else { //linked list
453 dfsList[top] = getNextPointerOid(offsetarray, dfsList, &top, &depth);
457 } else { // increment and go down the tree
/* Object is present: ship it to the requester. */
458 if((retval = sendOidFound(OID(objhead), sd)) != 0) {
459 printf("%s() Error in sendOidFound() at line %d in %s()\n", __func__, __LINE__, __FILE__);
465 if(depth >= numoffset) { //reached the end of the path
469 while((dfsList[top + 1] == *(offsetarray + depth + 1)) && (depth >= 0)) {
476 //return if no more paths to explore
477 if(top < 0 || depth < 0) {
480 //If more paths to explore, go down the tree
483 objheader_t * header;
484 header = searchObj(dfsList[prev]);
486 printf("%s() Error Object not found at %s , %d\n", __func__, __FILE__, __LINE__);
491 if(TYPE(header) > NUMCLASSES) {
492 dfsList[top] = getNextArrayOid(offsetarray, dfsList, &top, &depth);
493 } else { //linked list
494 dfsList[top] = getNextPointerOid(offsetarray, dfsList, &top, &depth);
/*
 * Computes the next oid of the traversal and stores it in odep->oid (the
 * array branch also stores the current depth in odep->depth).
 *
 * The parent object is the oid at dfsList[*top - 2]; its header is fetched
 * with searchObj(). For an array parent (TYPE(header) > NUMCLASSES) the oid is
 * read from the element at startelement + sign*stride*index; otherwise it is
 * read from the field at byte offset `startelement` after the object header.
 *
 * NOTE(review): the use of baseoid, the declarations of sign and the
 * linked-list `startelement`, the NULL check on header, updates to *top and
 * *depth, and the return statements are not visible in this listing.
 */
505 int getNextOid(short * offsetarray, unsigned int *dfsList, int *top, int *depth, oidAtDepth_t *odep, unsigned int baseoid) {
/* Parent oid sits two slots below the current top of dfsList. */
510 int prev = (*top) - 2;
511 unsigned int oid = *(dfsList+prev);
512 objheader_t * header = searchObj(oid);
/* Range and stride are decoded from the short that follows the offset at *depth. */
518 int range = GET_RANGE(*(offsetarray+(*depth) + 1));
519 short stride = GET_STRIDE(*(offsetarray+(*depth) + 1));
520 stride++; //Note bit pattern 000 => stride = 1 etc
522 if(TYPE(header) > NUMCLASSES) {
523 int elementsize = classsize[TYPE(header)];
/* The ArrayObject starts right after the object header. */
524 struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
525 int length = ao->___length___;
526 //check if stride is +ve or -ve
528 if(GET_STRIDEINC(*(offsetarray+ (*depth) + 1))) {
533 int startelement = *(offsetarray + (*depth));
/* Only the start element is range-checked in the visible lines. */
534 if(startelement < 0 || startelement >=length) {
535 printf("%s() Error: Offset out of range at %d\n", __func__, __LINE__);
/* The slot above top holds the iteration index within the range. */
540 int index = *(dfsList+(*top)+1);
541 odep->oid = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) \
542 + (elementsize * (startelement + (sign*stride*index)))));
543 odep->depth = *(depth);
544 } else { //linked list
/* NOTE(review): both assignments below read the short offset array through an
 * int pointer, unlike the array branch above and getNextPointerOid(), which
 * read a single short - confirm this is intended. */
547 if(range > 0) { //go to the next offset
548 startelement = *((int *)(offsetarray + (*depth) + 2));
550 } else if(range == 0) {
551 startelement = *((int *)(offsetarray + (*depth)));
/* Field at byte offset startelement after the header holds the next oid. */
558 odep->oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + startelement));
/*
 * Reads the next oid out of an array object. The parent oid is
 * dfsList[*top - 2]; the element read is startelement + sign*stride*index,
 * where startelement is the offset at *depth, stride comes from the following
 * short, and index is the value stored at dfsList[*top + 1].
 *
 * NOTE(review): the NULL check guarding the error printf, the declaration and
 * assignment of sign, the handling after the range check, and the return
 * statement are not visible in this listing.
 */
566 unsigned int getNextArrayOid(short *offsetarray, unsigned int *dfsList, int *top, int* depth) {
567 int prev = (*top) - 2;
568 unsigned int oid = *(dfsList + prev);
569 objheader_t *header = searchObj(oid);
571 printf("%s() Error: Object not found at %s , %d\n", __func__, __FILE__, __LINE__);
574 short stride = GET_STRIDE(*(offsetarray+(*depth) + 1));
575 stride++; //Note bit pattern 000 => stride = 1 etc
576 //check if stride is +ve or -ve
578 if(GET_STRIDEINC(*(offsetarray+ (*depth) + 1))) {
583 int elementsize = classsize[TYPE(header)];
/* The ArrayObject starts right after the object header. */
584 struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
585 int length = ao->___length___;
586 int startelement = *(offsetarray + (*depth));
/* Only the start element is range-checked in the visible lines. */
587 if(startelement < 0 || startelement >=length) {
588 printf("%s() Error: Offset out of range at %d\n", __func__, __LINE__);
591 int index = *(dfsList + *top + 1);
592 oid = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) \
593 + (elementsize * (startelement + (sign*stride*index)))));
/*
 * Reads the next oid through a pointer field: the parent's header is fetched
 * with searchObj() and the oid is read at byte offset `startelement` (the
 * short at offsetarray[*depth]) after the object header.
 *
 * NOTE(review): the declaration and computation of prev, the body of the first
 * if, the NULL check guarding the error printf, and the return statement are
 * not visible in this listing.
 */
598 unsigned int getNextPointerOid(short *offsetarray, unsigned int *dfsList, int *top, int* depth) {
600 if(*(dfsList + *top + 1) > 1) { //tells which offset to calculate the oid from
605 unsigned int oid = *(dfsList + prev);
606 objheader_t *header = searchObj(oid);
608 printf("%s() Error: Object not found at %s , %d\n", __func__, __FILE__, __LINE__);
611 int startelement = *(offsetarray + *depth);
612 oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + startelement));
613 //TODO add optimization for checking if this oid has already not been found
/*
 * Builds an OBJECT_FOUND response for `oid` and passes it to
 * sendPrefetchResponse() on socket `sd` with control code
 * TRANS_PREFETCH_RESPONSE.
 *
 * Buffer layout: [int size][char OBJECT_FOUND][unsigned int oid]
 * [object header + objsize bytes of object data]; `size` counts the whole
 * buffer, including the leading int.
 *
 * NOTE(review): the declarations of header, objsize and incr, the advance of
 * incr past the size field, and the return statements are not visible in this
 * listing.
 */
618 int sendOidFound(unsigned int oid, int sd) {
/* Same lookup order as searchObj(): mhashSearch() first, then prehashSearch(). */
620 if((header = (objheader_t *) mhashSearch(oid)) != NULL) {
622 } else if((header = (objheader_t *) prehashSearch(oid))!=NULL) {
625 printf("%s() Error: THIS SHOULD NOT HAPPEN at line %d in %s()\n", __func__, __LINE__, __FILE__);
/* GETSIZE fills objsize from the header. */
631 GETSIZE(objsize, header);
632 int size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
633 char sendbuffer[size];
634 *((int *)(sendbuffer + incr)) = size;
636 *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
637 incr += sizeof(char);
638 *((unsigned int *)(sendbuffer + incr)) = oid;
639 incr += sizeof(unsigned int);
/* Header and object body are copied as one contiguous region. */
640 memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
642 char control = TRANS_PREFETCH_RESPONSE;
643 sendPrefetchResponse(sd, &control, sendbuffer, &size);
/*
 * Builds an OBJECT_NOT_FOUND response for `oid` and passes it to
 * sendPrefetchResponse() on socket `sd` with control code
 * TRANS_PREFETCH_RESPONSE.
 *
 * Buffer layout: [int size][char OBJECT_NOT_FOUND][unsigned int oid], where
 * `size` counts the whole buffer. This is the same field order that
 * sendOidFound() writes and that getRangePrefetchResponse() reads back
 * (control byte first, oid immediately after it).
 */
647 int sendOidNotFound(unsigned int oid, int sd) {
648 int size = sizeof(int) + sizeof(char) + sizeof(unsigned int);
649 char sendbuffer[size];
650 *((int *)sendbuffer) = size;
651 *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
/* The oid follows the one-byte control code. The previous offset of
 * sizeof(int) + sizeof(unsigned int) skipped a whole unsigned int instead of
 * one char, so the store ran past the end of sendbuffer and the receiver,
 * which reads the oid directly after the control byte, saw the wrong bytes. */
652 *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
653 char control = TRANS_PREFETCH_RESPONSE;
654 sendPrefetchResponse(sd, &control, sendbuffer, &size);