untested code for prefetch request
authoradash <adash>
Wed, 4 Jul 2007 00:45:30 +0000 (00:45 +0000)
committeradash <adash>
Wed, 4 Jul 2007 00:45:30 +0000 (00:45 +0000)
a) check if part of prefetch request available locally
b) Spawn threads for each new connection to send prefetch request

Robust/src/Runtime/DSTM/interface/trans.c

index 573d221d08433231d08eea5b5be8e5412814860b..ad30d377cd4e96be4d761e9053b563d32288abb6 100644 (file)
@@ -20,7 +20,7 @@
 extern int classsize[];
 objstr_t *mainobjstore;
 plistnode_t *createPiles(transrecord_t *);
-int checkPrefetchTuples(int **, int *, short);
+//int checkPrefetchTuples(int **, int *, short);
 
 inline int arrayLength(int *array) {
        int i;
@@ -64,10 +64,10 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid)
                return(objheader);
        } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
                /* Look up in machine lookup table  and copy  into cache*/
-               tmp = mhashSearch(oid);
+//             tmp = mhashSearch(oid);
                size = sizeof(objheader_t)+classsize[tmp->type];
                objcopy = objstrAlloc(record->cache, size);
-               memcpy(objcopy, (void *)tmp, size);
+               memcpy(objcopy, (void *)objheader, size);
                /* Insert into cache's lookup table */
                chashInsert(record->lookupTable, objheader->oid, objcopy); 
                return(objcopy);
@@ -235,7 +235,6 @@ int transCommit(transrecord_t *record) {
                thread_data_array[threadnum].threshold = &tcond;
                thread_data_array[threadnum].lock = &tlock;
                thread_data_array[threadnum].count = &trecvcount;
-               //thread_data_array[threadnum].localstatus = &localstat;
                thread_data_array[threadnum].replyctrl = &treplyctrl;
                thread_data_array[threadnum].replyretry = &treplyretry;
                thread_data_array[threadnum].rec = record;
@@ -819,14 +818,14 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int
  }
 
 /*This function makes piles to prefetch records and prefetches the oids from remote machines */
-int transPrefetch(int *arrayofoffset[], short numoids){
-       int i, k;
+int transPrefetch(transrecord_t *record, int *arrayofoffset[], short numoids){
+       int i, k = 0, rc;
        int arraylength[numoids];
        unsigned int machinenumber;
        objheader_t *tmp, *objheader;
        void *objcopy;
        int size;
-       void *buf;
+       pthread_attr_t attr;
 
        /* Given tuple find length of tuple*/
        for(i = 0; i < numoids ; i++) {
@@ -834,31 +833,67 @@ int transPrefetch(int *arrayofoffset[], short numoids){
        }
        /* Check for similar tuples or  other special case tuples that can be combined to a 
         * prefetch message*/
-       checkPrefetchTuples(arrayofoffset, arraylength, numoids);
-
+       if(checkPrefetchTuples(arrayofoffset, arraylength, numoids) != 0) {
+               printf("Error on checkPrefetchTuples at %s, %d\n", __FILE__, __LINE__);
+               return 1;
+       }
 
        /* Check if part of prefetch request available locally */
        for(i = 0; i < numoids ; i++) {
-               while(arrayofoffsets[i][k] != -1) {
-                       if((objheader = (objheader_t *) mhashSearch(arrayofoffsets[i][k])) != NULL) {
+               if(arrayofoffset[i][0] != -1) {
+                       if((objheader = (objheader_t *) mhashSearch(arrayofoffset[i][k])) != NULL) {
                                /* Look up in machine lookup table  and copy  into cache*/
-                               tmp = mhashSearch(oid);
+                               //tmp = mhashSearch(oid);
                                size = sizeof(objheader_t)+classsize[tmp->type];
                                objcopy = objstrAlloc(record->cache, size);
-                               memcpy(objcopy, (void *)tmp, size);
+                               memcpy(objcopy, (void *)objheader, size);
                                /* Insert into cache's lookup table */
                                chashInsert(record->lookupTable, objheader->oid, objcopy); 
                                /* Find the offset field*/
-                               oid = (int)(tmp + sizeof(objheader_t) + arrayofoffsets[i][k+1]);
-                       } else  { /* If not found in machine look up */
-                               /* Get the object from the remote location */
-                               machinenumber = lhashSearch(oid);
-                               /* Create  Machine Piles t send prefetch requests use threads*/
-                               /* For each Pile in the machine send TRANS_PREFETCH */
+                               if(foundLocal(arrayofoffset, record, arrayofoffset[i], tmp, arrayofoffset[i][k+1], i) != 0 ) {
+                                       printf("Error in foundLocal() %s, %d\n", __FILE__, __LINE__);
+                                       return 1;
+                               }
+                       } else 
+                               continue;
+               } else 
+                       continue;
+       }
+
+       /* Initialize and set thread attributes 
+        * Spawn a thread for each prefetch request sent*/
+       pthread_t thread[numoids];
+       pthread_attr_init(&attr);
+       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+               
+       /* Create  Machine Piles to send prefetch requests use threads*/
+       for( i = 0 ; i< numoids ; i++) {
+               if(arrayofoffset[i][0] == -1) 
+                       continue;
+               else{
+                       /* For each Pile in the machine send TRANS_PREFETCH */
+                       //makePiles(arrayofoffset, numoids);
+                       /* Fill thread data structure */
+                       rc = pthread_create(&thread[i] , &attr, sendPrefetchReq, (void *) arrayofoffset[i]);
+                       if (rc) {
+                               perror("Error in pthread create at transPrefetch()\n");
+                               return 1;
                        }
-                       k = k+1;
+
+               }
+       }
+
+       /* Free attribute and wait to join other threads */
+       for (i = 0 ;i < numoids ; i++) {
+               rc = pthread_join(thread[i], NULL);
+               if (rc) {
+                       perror("Error pthread_join() in transPrefetch()\n");
+                       return 1;
                }
        }
+       pthread_attr_destroy(&attr);
+       
+       return 0;
                
 }
 
@@ -870,7 +905,7 @@ int checkPrefetchTuples(int *arrayofoffset[], int *arraylength, short numoids) {
         * for case x.a.b and y.a.b where x and y have same oid's
         * or if a.b.c is a subset of x.b.c.d*/ 
        /* check for case where the generated request a.y.z or x.y.z.g then 
-        * prefetch needs to be generated for x.y.z.g */
+        * prefetch needs to be generated for x.y.z.g  if oid of a and x are same*/
        for(i = 0; i < numoids -1 ; i++) {
                if(arrayofoffset[i][0] == -1)
                        continue;
@@ -890,8 +925,8 @@ int checkPrefetchTuples(int *arrayofoffset[], int *arraylength, short numoids) {
                                        larray = arrayofoffset[j];
                                }
                                /* From first offset until end of tuple compare all offsets of sarray and larray
-                                * if not a match then break  */
-                               for(k = 1 ; slength -1 ; k++) {
+                                * if not a match then break */
+                               for(k = 1 ; k < slength -1 ; k++) {
                                        if(sarray[k] != larray[k]) {
                                                break;
                                        }
@@ -906,3 +941,83 @@ int checkPrefetchTuples(int *arrayofoffset[], int *arraylength, short numoids) {
 
        return 0;
 }
+
+/* This function goes through an array and finds out until what offets 
+ *  * can it make a new pile
+ *   * -1 indicates a special character to mark end of oid, offset tuple*/
+int foundLocal(int *arrayofoffset[], transrecord_t *record, int *array, objheader_t *header, int offset, int index) {
+       int i, j, counter, len;
+       unsigned int oid;
+       void *objcopy;
+       objheader_t *tmp, *objheader;
+       int size;
+
+       /* The oid corresponding to the first offset */
+       oid = (int) (header + sizeof(objheader_t) + offset);
+       /* Repeat for all offset values in the array */
+       for(i = 1; array[i] != -1 ; ) { 
+               /* If oid  is found locally  then insert into cache and continue
+                * with next offset to find the corresponding oid */
+               if((objheader = (objheader_t*) mhashSearch(oid)) != NULL) {
+                       tmp = mhashSearch(oid);
+                       size = sizeof(objheader_t)+classsize[tmp->type];
+                       objcopy = objstrAlloc(record->cache, size);
+                       memcpy(objcopy, (void *)tmp, size);
+                       /* Insert into cache's lookup table */
+                       chashInsert(record->lookupTable, objheader->oid, objcopy);
+                       oid = (int) (tmp + sizeof(objheader_t) + array[i+1]);
+               } else {
+                       /*If oid not found locally then 
+                        *assign the latest oid found as the first element
+                        *of array and copy left over offsets as other
+                        *array elements to build the new prefetch array*/
+
+                       arrayofoffset[index][0] = (int) oid;
+                       counter = i;
+                       len = arrayLength((int *)array[index]);
+                       for( j = 1 ; counter <= len; j++,counter++) {
+                               arrayofoffset[index][j] = array[counter + 1]; 
+                       }
+                       break;
+               }
+               i++;
+       }
+
+       /* If all offsets are found locally, then do not treat this as valid prefetch tuple */
+       if(i == arrayLength((int *)array[index]) - 1) {
+               arrayofoffset[index][0] = -1; //Mark beginning of array as -1
+       }
+       
+       return 0;
+}
+
+void *sendPrefetchReq(void *prefetchtuple) {
+       int sd, i;
+       struct sockaddr_in serv_addr;
+       struct hostent *server;
+       char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
+       char machineip[16], retval;
+
+
+       /* Send Trans Prefetch Request */
+       if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+               perror("Error in socket for TRANS_REQUEST\n");
+               return NULL;
+       }
+       bzero((char*) &serv_addr, sizeof(serv_addr));
+       serv_addr.sin_family = AF_INET;
+       serv_addr.sin_port = htons(LISTEN_PORT);
+       //midtoIP(tdata->mid,machineip);
+//     machineip[15] = '\0';
+//     serv_addr.sin_addr.s_addr = inet_addr(machineip);
+
+       /* Open Connection */
+       if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
+               perror("Error in connect for TRANS_REQUEST\n");
+               return NULL;
+       }
+
+       close(sd);
+       pthread_exit(NULL);
+
+}