#include "prefetch.h"
#include "prelookup.h"
#include "sockpool.h"
+#include "gCollect.h"
extern sockPoolHashTable_t *transPrefetchSockPool;
extern unsigned int myIpAddr;
extern sockPoolHashTable_t *transPResponseSocketPool;
+extern pthread_mutex_t prefetchcache_mutex;
+extern prehashtable_t pflookup;
// Function for new prefetch call
void rangePrefetch(unsigned int oid, short numoffset, short *offsets) {
int numLocal = 0;
perMcPrefetchList_t * head=NULL;
- //printf("Inside %s()\n", __func__);
// Iterate for the object
int noffset = (int) numoffsets;
tmpobjset[l] = GET_RANGE(offsets[2*l+1]);
}
int maxChldOids = getsize(tmpobjset, sizetmpObjSet)+1;
- //printf("%s() maxChldOids = %d\n", __func__, maxChldOids);
unsigned int chldOffstFrmBase[maxChldOids];
chldOffstFrmBase[0] = oid;
int tovisit = 0, visited = -1;
// Iterate for each element of offsets
- //printf("%s() noffset = %d, sizetmpObjSet= %d, visited = %d, tovisit= %d\n", __func__, noffset, sizetmpObjSet, visited, tovisit);
for (j = 0; j < noffset; j++) {
// Iterate over each element to be visited
while (visited != tovisit) {
visited++;
continue;
}
-
- if (!isOidAvail(chldOffstFrmBase[visited+1])) {
+ if (!checkoid(chldOffstFrmBase[visited+1])) {
// Add to remote requests
unsigned int oid = chldOffstFrmBase[visited+1];
int machinenum = lhashSearch(oid);
//TODO Group a bunch of oids to send in one prefetch request
- //printf("Oid Not Found, send Prefetch for oid = %d, noffset-j= %d j should point to the new offset = %d\n", oid, noffset-j, j);
insertPrefetch(machinenum, oid, noffset-j, &offsets[j], &head);
goto tuple;
} else {
return NULL;
}
}
- //printf("%s() visited = %d, tovisit= %d\n", __func__, visited, tovisit);
visited++;
- fflush(stdout);
}
} // end iterate for each element of offsets
}
tuple:
;
-
return head;
}
-int isOidAvail(unsigned int oid) {
- objheader_t * header;
- if((header=(objheader_t *)mhashSearch(oid))!=NULL) {
- //Found on machine
- return 1;
- } else if ((header=(objheader_t *)prehashSearch(oid))!=NULL) {
- return 1;
- } else {
- return 0;
- }
-}
-
int lookForObjs(int *chldOffstFrmBase, short *offsets,
int *index, int *visited, int *tovisit, int *noffset) {
- //printf("Inside %s()\n", __func__);
objheader_t *header;
unsigned int oid = chldOffstFrmBase[*visited+1];
if((header = (objheader_t *)mhashSearch(oid))!= NULL) {
//Found in prefetch cache
;
} else {
- printf("DEBUG->%s()THIS SHOULD NOR HAPPEN\n", __func__);
+ printf("%s() Error: THIS SHOULD NOT HAPPEN\n", __func__);
return -1;
}
/* Check if array out of bounds */
int startindex = offsets[*index];
int range = GET_RANGE(offsets[(*index)+1]);
- //printf("%s() Array range = %d\n", __func__, range);
if(range > 0 && range < length) {
short stride = GET_STRIDE(offsets[(*index)+1]);
stride = stride + 1; //NOTE bit pattern 000 => stride = 1, 001 => stride = 2
} else { //linked list
int startindex = offsets[*index];
int range = GET_RANGE(offsets[(*index)+1]);
- //printf("%s() LinkedList range = %d\n", __func__, range);
unsigned int oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + startindex));
- //printf("Oid = %d\n", oid);
if (range == 0) {
chldOffstFrmBase[*tovisit+1] = oid;
- if(isOidAvail(oid)) {
+ if(checkoid(oid)) {
*visited = *visited + 1;
*tovisit = *tovisit + 1;
*index = *index + 2;
- //printf("%s() Found visited = %d, tovisit=%d, index=%d\n", __func__, *visited, *tovisit, *index);
return 1;
} else {
*tovisit = *tovisit + 1;
- //printf("%s() Not Found visited = %d, tovisit=%d, index=%d\n", __func__, *visited, *tovisit, *index);
return 1;
}
} else {
int i;
for(i = 0; i<range; i++) {
chldOffstFrmBase[*tovisit+1] = oid;
- if(isOidAvail(oid)) {
+ if(checkoid(oid)) {
//get the next object
if((header = (objheader_t *)mhashSearch(oid))!= NULL) {
//Found on machine
*visited = *visited + 1;
} else {
//update range
- offsets[(*index)+1]= (offsets[(*index)+1] & 0x0fff) - 1;
+ offsets[(*index)+1] = (offsets[(*index)+1] & 0x0fff) - 1; //TODO Check this range updation
*tovisit = *tovisit + 1;
return 1;
}
}
+ *index = *index + 2;
return 1;
}
}
/* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
endpair = -1;
send_data(sd, &endpair, sizeof(int));
-
return;
}
int getRangePrefetchResponse(int sd) {
- //printf("Inside %s()\n", __func__);
-
-
+ int length = 0;
+ recv_data(sd, &length, sizeof(int));
+ int size = length - sizeof(int);
+ char recvbuffer[size];
+ recv_data(sd, recvbuffer, size);
+ char control = *((char *) recvbuffer);
+ unsigned int oid;
+ if(control == OBJECT_FOUND) {
+ oid = *((unsigned int *)(recvbuffer + sizeof(char)));
+ size = size - (sizeof(char) + sizeof(unsigned int));
+ pthread_mutex_lock(&prefetchcache_mutex);
+ void *ptr;
+ if((ptr = prefetchobjstrAlloc(size)) == NULL) {
+ printf("%s() Error: objstrAlloc error for copying into prefetch cache in line %d at %s\n",
+ __func__, __LINE__, __FILE__);
+ pthread_mutex_unlock(&prefetchcache_mutex);
+ return -1;
+ }
+ pthread_mutex_unlock(&prefetchcache_mutex);
+ memcpy(ptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
+ STATUS(ptr)=0;
+
+ /* Insert into prefetch hash lookup table */
+ void * oldptr;
+ if((oldptr = prehashSearch(oid)) != NULL) {
+ if(((objheader_t *)oldptr)->version <= ((objheader_t *)ptr)->version) {
+ prehashRemove(oid);
+ prehashInsert(oid, ptr);
+ }
+ } else {
+ prehashInsert(oid, ptr);
+ }
+ pthread_mutex_lock(&pflookup.lock);
+ pthread_cond_broadcast(&pflookup.cond);
+ pthread_mutex_unlock(&pflookup.lock);
+ } else if(control == OBJECT_NOT_FOUND) {
+ oid = *((unsigned int *)(recvbuffer + sizeof(char)));
+ //printf("%s() Error: OBJ NOT FOUND.. THIS SHOULD NOT HAPPEN\n", __func__);
+ } else {
+ printf("%s() Error: in Decoding the control value %d, %s\n", __func__, __LINE__, __FILE__);
+ }
return 0;
}
-
int rangePrefetchReq(int acceptfd) {
int numoffset, sd = -1;
unsigned int oid, mid = -1;
oidmidpair_t oidmid;
- //printf("Inside %s()\n", __func__);
while (1) {
recv_data(acceptfd, &numoffset, sizeof(int));
short offsetsarry[numoffset];
recv_data(acceptfd, offsetsarry, numoffset*sizeof(short));
- /*Process each oid */
+ /* Obj not found */
objheader_t *header;
if((header = (objheader_t *)mhashSearch(oid)) == NULL) {
int size = sizeof(int) + sizeof(char) + sizeof(unsigned int);
break;
} else { //Obj found
int retval;
- if((retval = processOidFound(header, offsetsarry, 0, sd)) != 0) {
+ if((retval = processOidFound(header, offsetsarry, 0, sd, numoffset)) != 0) {
printf("%s() Error: in processOidFound() at line %d in %s()\n",
__func__, __LINE__, __FILE__);
return -1;
}
}
}
-
//Release socket
if(mid!=-1)
freeSockWithLock(transPResponseSocketPool, mid, sd);
-
return 0;
}
-int processOidFound(objheader_t *header, short * offsetsarry, int index, int sd) {
+int processOidFound(objheader_t *header, short * offsetsarry, int index, int sd, int numoffset) {
int objsize;
GETSIZE(objsize, header);
int size = sizeof(int) + sizeof(char) + sizeof(unsigned int) +
char control = TRANS_PREFETCH_RESPONSE;
sendPrefetchResponse(sd, &control, sendbuffer, &size);
- if(TYPE(header) > NUMCLASSES) {
- int elementsize = classsize[TYPE(header)];
- struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
- int length = ao->___length___;
- /* Check if array out of bounds */
- int startindex = offsetsarry[index];
- int range = GET_RANGE(offsetsarry[index+1]);
- //printf("%s() Array range = %d\n", __func__, range);
- if(range > 0 && range < length) {
- short stride = GET_STRIDE(offsets[index+1]);
- stride = stride + 1; //NOTE bit pattern 000 => stride = 1, 001 => stride = 2
- int i;
- //check is stride +ve or negative
- if(GET_STRIDEINC(offsets[index]+1)) { //-ve stride
- for(i = startindex; i <= range+1; i = i - stride) {
- unsigned int oid = 0;
- if((i < 0 || i >= length)) {
- //if yes treat the object as found
- oid = 0;
- continue;
- } else {
- // compute new object
- oid = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*i)));
- }
- if(oid == 0)
- goto end;
- }
- } else { //+ve stride
- for(i = startindex; i <= range; i = i + stride) {
- unsigned int oid = 0;
- if(i < 0 || i >= length) {
- //if yes treat the object as found
- oid = 0;
- continue;
- } else {
- // compute new object
- oid = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*i)));
- }
- // add new object
- chldOffstFrmBase[*tovisit] = oid;
- *tovisit = *tovisit + 1;
- }
+ /* Calculate the oid corresponding to the offset value */
+ int i;
+ for(i = 0; i<numoffset;) {
+ //check for arrays
+ if(TYPE(header) > NUMCLASSES) {
+ int retval;
+ if((retval = processArrayOids(offsetsarry, header, &i, sd)) != 0) {
+ printf("%s() Error: in processArrayOids() at line %d for %s()\n", __func__, __LINE__, __FILE__);
+ return -1;
}
- } else if(range == 0) {
- if(startindex >=0 || startindex < length) {
- unsigned int oid = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*startindex)));
- // add new object
- chldOffstFrmBase[*tovisit] = oid;
- *tovisit = *tovisit + 1;
+ } else { //linked list
+ int retval;
+ if((retval = processLinkedListOids(offsetsarry, header, &i, sd)) != 0) {
+ printf("%s() Error: in processLinkedListOids() at line %d for %s()\n", __func__, __LINE__, __FILE__);
+ return -1;
}
}
- *index = *index + 2;
- } else { //linked list
- int startindex = offsets[*index];
- int range = GET_RANGE(offsets[(*index)+1]);
- //printf("%s() LinkedList range = %d\n", __func__, range);
- unsigned int oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + startindex));
- //printf("Oid = %d\n", oid);
- if (range == 0) {
- chldOffstFrmBase[*tovisit+1] = oid;
- if(isOidAvail(oid)) {
- *visited = *visited + 1;
- *tovisit = *tovisit + 1;
- *index = *index + 2;
- //printf("%s() Found visited = %d, tovisit=%d, index=%d\n", __func__, *visited, *tovisit, *index);
- return 1;
+ }
+ return 0;
+}
+
+int findOidinStride(short * offsets, struct ArrayObject *ao, int index, int elementsize,
+ int startindex, int range, int length, int sd) {
+ short stride = GET_STRIDE(offsets[index+1]);
+ stride = stride + 1; //NOTE bit pattern 000 => stride = 1, 001 => stride = 2
+ int i;
+ //check is stride +ve or negative
+ unsigned int oid;
+ if(GET_STRIDEINC(offsets[index]+1)) { //-ve stride
+ for(i = startindex; i <= range+1; i = i - stride) {
+ if((i < 0 || i >= length)) {
+ //if yes treat the object as found
+ oid = 0;
+ continue;
} else {
- *tovisit = *tovisit + 1;
- //printf("%s() Not Found visited = %d, tovisit=%d, index=%d\n", __func__, *visited, *tovisit, *index);
- return 1;
+ // compute new object
+ oid = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*i)));
+ }
+ if(oid == 0)
+ break;
+ if(checkoid(oid)) {
+ int retval;
+ if((retval = sendOidFound(oid, sd)) != 0) {
+ printf("%s() Error in sendOidFound() at line %d in %s()\n", __func__, __LINE__, __FILE__);
+ return -1;
+ }
+ }
+ }
+ } else { //+ve stride
+ for(i = startindex; i <= range; i = i + stride) {
+ if(i < 0 || i >= length) {
+ //if yes treat the object as found
+ oid = 0;
+ continue;
+ } else {
+ // compute new object
+ oid = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*i)));
+ }
+ if(oid == 0)
+ break;
+ if(checkoid(oid)) {
+ int retval;
+ if((retval = sendOidFound(oid, sd)) != 0) {
+ printf("%s() Error in sendOidFound() at line %d in %s()\n", __func__, __LINE__, __FILE__);
+ return -1;
+ }
+ }
+ }
+ }
+ return 0;
+}
+
+int sendOidFound(unsigned int oid, int sd) {
+ objheader_t *header;
+ if((header = (objheader_t *) mhashSearch(oid)) != NULL) {
+ ;
+ } else if((header = (objheader_t *) prehashSearch(oid))!=NULL) {
+ ;
+ } else {
+ printf("%s() Error: THIS SHOULD NOT HAPPEN at line %d in %s()\n", __func__, __LINE__, __FILE__);
+ return -1;
+ }
+
+ int incr = 0;
+ int objsize;
+ GETSIZE(objsize, header);
+ int size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
+ char sendbuffer[size];
+ *((int *)(sendbuffer + incr)) = size;
+ incr += sizeof(int);
+ *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
+ incr += sizeof(char);
+ *((unsigned int *)(sendbuffer + incr)) = oid;
+ incr += sizeof(unsigned int);
+ memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
+
+ char control = TRANS_PREFETCH_RESPONSE;
+ sendPrefetchResponse(sd, &control, sendbuffer, &size);
+ return 0;
+}
+
+int processArrayOids(short *offsetsarry, objheader_t *header, int *index, int sd) {
+ int elementsize = classsize[TYPE(header)];
+ struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
+ int length = ao->___length___;
+ /* Check if array out of bounds */
+ int startindex = offsetsarry[*index];
+ int range = GET_RANGE(offsetsarry[*index+1]);
+ if(range > 0 && range < length) {
+ int retval;
+ if((retval = findOidinStride(offsetsarry, ao, *index, elementsize, startindex, range, length, sd)) != 0) {
+ printf("%s() Error: in findOidinStride in line %d at file %s\n", __func__, __LINE__, __FILE__);
+ return -1;
+ }
+ } else if(range == 0) {
+ unsigned int oid;
+ if(startindex >=0 || startindex < length) {
+ oid = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*startindex)));
+ }
+ if(oid == 0)
+ return 0;
+ if(checkoid(oid)) {
+ int retval;
+ if((retval = sendOidFound(oid, sd)) != 0) {
+ printf("%s() Error: in sendOidFound() at line %d in %s()\n", __func__, __LINE__, __FILE__);
+ return -1;
}
} else {
- int i;
- for(i = 0; i<range; i++) {
- chldOffstFrmBase[*tovisit+1] = oid;
- if(isOidAvail(oid)) {
- //get the next object
- if((header = (objheader_t *)mhashSearch(oid))!= NULL) {
- //Found on machine
- ;
- } else if((header = (objheader_t *)prehashSearch(oid))!=NULL) {
- //Found in prefetch cache
- ;
- } else {
- ;
- }
- oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + startindex));
- *tovisit = *tovisit + 1;
- *visited = *visited + 1;
+ int size = sizeof(int) + sizeof(char) + sizeof(unsigned int);
+ char sendbuffer[size];
+ *((int *) sendbuffer) = size;
+ *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
+ *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
+ char control = TRANS_PREFETCH_RESPONSE;
+ sendPrefetchResponse(sd, &control, sendbuffer, &size);
+ }
+ }
+ *index = *index + 2;
+ return 0;
+}
+
+int processLinkedListOids(short *offsetsarry, objheader_t *header, int *index, int sd) {
+ int startindex = offsetsarry[*index];
+ int range = GET_RANGE(offsetsarry[(*index)+1]);
+ unsigned int oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + startindex));
+ if(oid == 0)
+ return 0;
+ if(range == 0) {
+ if(checkoid(oid)) {
+ int retval;
+ if((retval = sendOidFound(oid, sd)) != 0) {
+ printf("%s() Error: in sendOidFound() at line %d in %s()\n", __func__, __LINE__, __FILE__);
+ return -1;
+ }
+ }
+ } else {
+ int i;
+ for(i = 0; i<range; i++) {
+ if(checkoid(oid)) {
+ int retval;
+ if((retval = sendOidFound(oid, sd)) != 0) {
+ printf("%s() Error: in sendOidFound() at line %d in %s()\n", __func__, __LINE__, __FILE__);
+ return -1;
+ }
+ //get the next object
+ if((header = (objheader_t *)mhashSearch(oid))!= NULL) {
+ //Found on machine
+ ;
+ } else if((header = (objheader_t *)prehashSearch(oid))!=NULL) {
+ //Found in prefetch cache
+ ;
} else {
- //update range
- offsets[(*index)+1]= (offsets[(*index)+1] & 0x0fff) - 1;
- *tovisit = *tovisit + 1;
- return 1;
+ ;
}
+ oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + startindex));
+ } else {
+ // Object not found
+ int size = sizeof(int) + sizeof(char) + sizeof(unsigned int);
+ char sendbuffer[size];
+ *((int *) sendbuffer) = size;
+ *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
+ *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
+ char control = TRANS_PREFETCH_RESPONSE;
+ sendPrefetchResponse(sd, &control, sendbuffer, &size);
+ return 0;
}
- return 1;
}
}
-end:
- ;
+ *index = *index + 2;
return 0;
}