extern int classsize[];
objstr_t *mainobjstore;
plistnode_t *createPiles(transrecord_t *);
-int checkPrefetchTuples(int **, int *, short);
+//int checkPrefetchTuples(int **, int *, short);
inline int arrayLength(int *array) {
int i;
return(objheader);
} else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
/* Look up in machine lookup table and copy into cache*/
- tmp = mhashSearch(oid);
+// tmp = mhashSearch(oid);
size = sizeof(objheader_t)+classsize[tmp->type];
objcopy = objstrAlloc(record->cache, size);
- memcpy(objcopy, (void *)tmp, size);
+ memcpy(objcopy, (void *)objheader, size);
/* Insert into cache's lookup table */
chashInsert(record->lookupTable, objheader->oid, objcopy);
return(objcopy);
thread_data_array[threadnum].threshold = &tcond;
thread_data_array[threadnum].lock = &tlock;
thread_data_array[threadnum].count = &trecvcount;
- //thread_data_array[threadnum].localstatus = &localstat;
thread_data_array[threadnum].replyctrl = &treplyctrl;
thread_data_array[threadnum].replyretry = &treplyretry;
thread_data_array[threadnum].rec = record;
}
/*This function makes piles to prefetch records and prefetches the oids from remote machines */
-int transPrefetch(int *arrayofoffset[], short numoids){
- int i, k;
+int transPrefetch(transrecord_t *record, int *arrayofoffset[], short numoids){
+ int i, k = 0, rc;
int arraylength[numoids];
unsigned int machinenumber;
objheader_t *tmp, *objheader;
void *objcopy;
int size;
- void *buf;
+ pthread_attr_t attr;
/* Given tuple find length of tuple*/
for(i = 0; i < numoids ; i++) {
}
/* Check for similar tuples or other special case tuples that can be combined to a
* prefetch message*/
- checkPrefetchTuples(arrayofoffset, arraylength, numoids);
-
+ if(checkPrefetchTuples(arrayofoffset, arraylength, numoids) != 0) {
+ printf("Error on checkPrefetchTuples at %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
/* Check if part of prefetch request available locally */
for(i = 0; i < numoids ; i++) {
- while(arrayofoffsets[i][k] != -1) {
- if((objheader = (objheader_t *) mhashSearch(arrayofoffsets[i][k])) != NULL) {
+ if(arrayofoffset[i][0] != -1) {
+ if((objheader = (objheader_t *) mhashSearch(arrayofoffset[i][k])) != NULL) {
/* Look up in machine lookup table and copy into cache*/
- tmp = mhashSearch(oid);
+ //tmp = mhashSearch(oid);
size = sizeof(objheader_t)+classsize[tmp->type];
objcopy = objstrAlloc(record->cache, size);
- memcpy(objcopy, (void *)tmp, size);
+ memcpy(objcopy, (void *)objheader, size);
/* Insert into cache's lookup table */
chashInsert(record->lookupTable, objheader->oid, objcopy);
/* Find the offset field*/
- oid = (int)(tmp + sizeof(objheader_t) + arrayofoffsets[i][k+1]);
- } else { /* If not found in machine look up */
- /* Get the object from the remote location */
- machinenumber = lhashSearch(oid);
- /* Create Machine Piles t send prefetch requests use threads*/
- /* For each Pile in the machine send TRANS_PREFETCH */
+ if(foundLocal(arrayofoffset, record, arrayofoffset[i], tmp, arrayofoffset[i][k+1], i) != 0 ) {
+ printf("Error in foundLocal() %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+ } else
+ continue;
+ } else
+ continue;
+ }
+
+ /* Initialize and set thread attributes
+ * Spawn a thread for each prefetch request sent*/
+ pthread_t thread[numoids];
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+
+ /* Create Machine Piles to send prefetch requests use threads*/
+ for( i = 0 ; i< numoids ; i++) {
+ if(arrayofoffset[i][0] == -1)
+ continue;
+ else{
+ /* For each Pile in the machine send TRANS_PREFETCH */
+ //makePiles(arrayofoffset, numoids);
+ /* Fill thread data structure */
+ rc = pthread_create(&thread[i] , &attr, sendPrefetchReq, (void *) arrayofoffset[i]);
+ if (rc) {
+ perror("Error in pthread create at transPrefetch()\n");
+ return 1;
}
- k = k+1;
+
+ }
+ }
+
+ /* Free attribute and wait to join other threads */
+ for (i = 0 ;i < numoids ; i++) {
+ rc = pthread_join(thread[i], NULL);
+ if (rc) {
+ perror("Error pthread_join() in transPrefetch()\n");
+ return 1;
}
}
+ pthread_attr_destroy(&attr);
+
+ return 0;
}
* for case x.a.b and y.a.b where x and y have same oid's
* or if a.b.c is a subset of x.b.c.d*/
/* check for case where the generated request a.y.z or x.y.z.g then
- * prefetch needs to be generated for x.y.z.g */
+ * prefetch needs to be generated for x.y.z.g if oid of a and x are same*/
for(i = 0; i < numoids -1 ; i++) {
if(arrayofoffset[i][0] == -1)
continue;
larray = arrayofoffset[j];
}
/* From first offset until end of tuple compare all offsets of sarray and larray
- * if not a match then break */
- for(k = 1 ; slength -1 ; k++) {
+ * if not a match then break */
+ for(k = 1 ; k < slength -1 ; k++) {
if(sarray[k] != larray[k]) {
break;
}
return 0;
}
+
+/* This function goes through an array and finds out until what offets
+ * * can it make a new pile
+ * * -1 indicates a special character to mark end of oid, offset tuple*/
+int foundLocal(int *arrayofoffset[], transrecord_t *record, int *array, objheader_t *header, int offset, int index) {
+ int i, j, counter, len;
+ unsigned int oid;
+ void *objcopy;
+ objheader_t *tmp, *objheader;
+ int size;
+
+ /* The oid corresponding to the first offset */
+ oid = (int) (header + sizeof(objheader_t) + offset);
+ /* Repeat for all offset values in the array */
+ for(i = 1; array[i] != -1 ; ) {
+ /* If oid is found locally then insert into cache and continue
+ * with next offset to find the corresponding oid */
+ if((objheader = (objheader_t*) mhashSearch(oid)) != NULL) {
+ tmp = mhashSearch(oid);
+ size = sizeof(objheader_t)+classsize[tmp->type];
+ objcopy = objstrAlloc(record->cache, size);
+ memcpy(objcopy, (void *)tmp, size);
+ /* Insert into cache's lookup table */
+ chashInsert(record->lookupTable, objheader->oid, objcopy);
+ oid = (int) (tmp + sizeof(objheader_t) + array[i+1]);
+ } else {
+ /*If oid not found locally then
+ *assign the latest oid found as the first element
+ *of array and copy left over offsets as other
+ *array elements to build the new prefetch array*/
+
+ arrayofoffset[index][0] = (int) oid;
+ counter = i;
+ len = arrayLength((int *)array[index]);
+ for( j = 1 ; counter <= len; j++,counter++) {
+ arrayofoffset[index][j] = array[counter + 1];
+ }
+ break;
+ }
+ i++;
+ }
+
+ /* If all offsets are found locally, then do not treat this as valid prefetch tuple */
+ if(i == arrayLength((int *)array[index]) - 1) {
+ arrayofoffset[index][0] = -1; //Mark beginning of array as -1
+ }
+
+ return 0;
+}
+
+void *sendPrefetchReq(void *prefetchtuple) {
+ int sd, i;
+ struct sockaddr_in serv_addr;
+ struct hostent *server;
+ char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
+ char machineip[16], retval;
+
+
+ /* Send Trans Prefetch Request */
+ if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+ perror("Error in socket for TRANS_REQUEST\n");
+ return NULL;
+ }
+ bzero((char*) &serv_addr, sizeof(serv_addr));
+ serv_addr.sin_family = AF_INET;
+ serv_addr.sin_port = htons(LISTEN_PORT);
+ //midtoIP(tdata->mid,machineip);
+// machineip[15] = '\0';
+// serv_addr.sin_addr.s_addr = inet_addr(machineip);
+
+ /* Open Connection */
+ if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
+ perror("Error in connect for TRANS_REQUEST\n");
+ return NULL;
+ }
+
+ close(sd);
+ pthread_exit(NULL);
+
+}