From 7792d3e433322d0391a97202760576166c3d4281 Mon Sep 17 00:00:00 2001 From: adash Date: Wed, 4 Jul 2007 00:45:30 +0000 Subject: [PATCH] untested code for prefetch request a) check if part of prefetch request available locally b) Spawn threads for each new connection to send prefetch request --- Robust/src/Runtime/DSTM/interface/trans.c | 161 ++++++++++++++++++---- 1 file changed, 138 insertions(+), 23 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 573d221d..ad30d377 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -20,7 +20,7 @@ extern int classsize[]; objstr_t *mainobjstore; plistnode_t *createPiles(transrecord_t *); -int checkPrefetchTuples(int **, int *, short); +//int checkPrefetchTuples(int **, int *, short); inline int arrayLength(int *array) { int i; @@ -64,10 +64,10 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) return(objheader); } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) { /* Look up in machine lookup table and copy into cache*/ - tmp = mhashSearch(oid); +// tmp = mhashSearch(oid); size = sizeof(objheader_t)+classsize[tmp->type]; objcopy = objstrAlloc(record->cache, size); - memcpy(objcopy, (void *)tmp, size); + memcpy(objcopy, (void *)objheader, size); /* Insert into cache's lookup table */ chashInsert(record->lookupTable, objheader->oid, objcopy); return(objcopy); @@ -235,7 +235,6 @@ int transCommit(transrecord_t *record) { thread_data_array[threadnum].threshold = &tcond; thread_data_array[threadnum].lock = &tlock; thread_data_array[threadnum].count = &trecvcount; - //thread_data_array[threadnum].localstatus = &localstat; thread_data_array[threadnum].replyctrl = &treplyctrl; thread_data_array[threadnum].replyretry = &treplyretry; thread_data_array[threadnum].rec = record; @@ -819,14 +818,14 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int } /*This function makes piles to prefetch records and prefetches the oids from remote machines */ -int transPrefetch(int *arrayofoffset[], short numoids){ - int i, k; +int transPrefetch(transrecord_t *record, int *arrayofoffset[], short numoids){ + int i, k = 0, rc; int arraylength[numoids]; unsigned int machinenumber; objheader_t *tmp, *objheader; void *objcopy; int size; - void *buf; + pthread_attr_t attr; /* Given tuple find length of tuple*/ for(i = 0; i < numoids ; i++) { @@ -834,31 +833,67 @@ int transPrefetch(int *arrayofoffset[], short numoids){ } /* Check for similar tuples or other special case tuples that can be combined to a * prefetch message*/ - checkPrefetchTuples(arrayofoffset, arraylength, numoids); - + if(checkPrefetchTuples(arrayofoffset, arraylength, numoids) != 0) { + printf("Error on checkPrefetchTuples at %s, %d\n", __FILE__, __LINE__); + return 1; + } /* Check if part of prefetch request available locally */ for(i = 0; i < numoids ; i++) { - while(arrayofoffsets[i][k] != -1) { - if((objheader = (objheader_t *) mhashSearch(arrayofoffsets[i][k])) != NULL) { + if(arrayofoffset[i][0] != -1) { + if((objheader = (objheader_t *) mhashSearch(arrayofoffset[i][k])) != NULL) { /* Look up in machine lookup table and copy into cache*/ - tmp = mhashSearch(oid); + //tmp = mhashSearch(oid); size = sizeof(objheader_t)+classsize[tmp->type]; objcopy = objstrAlloc(record->cache, size); - memcpy(objcopy, (void *)tmp, size); + memcpy(objcopy, (void *)objheader, size); /* Insert into cache's lookup table */ chashInsert(record->lookupTable, objheader->oid, objcopy); /* Find the offset field*/ - oid = (int)(tmp + sizeof(objheader_t) + arrayofoffsets[i][k+1]); - } else { /* If not found in machine look up */ - /* Get the object from the remote location */ - machinenumber = lhashSearch(oid); - /* Create Machine Piles t send prefetch requests use threads*/ - /* For each Pile in the machine send TRANS_PREFETCH */ + if(foundLocal(arrayofoffset, record, arrayofoffset[i], tmp, arrayofoffset[i][k+1], i) != 0 ) { + printf("Error in foundLocal() %s, %d\n", __FILE__, __LINE__); + return 1; + } + } else + continue; + } else + continue; + } + + /* Initialize and set thread attributes + * Spawn a thread for each prefetch request sent*/ + pthread_t thread[numoids]; + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + + /* Create Machine Piles to send prefetch requests use threads*/ + for( i = 0 ; i< numoids ; i++) { + if(arrayofoffset[i][0] == -1) + continue; + else{ + /* For each Pile in the machine send TRANS_PREFETCH */ + //makePiles(arrayofoffset, numoids); + /* Fill thread data structure */ + rc = pthread_create(&thread[i] , &attr, sendPrefetchReq, (void *) arrayofoffset[i]); + if (rc) { + perror("Error in pthread create at transPrefetch()\n"); + return 1; } - k = k+1; + + } + } + + /* Free attribute and wait to join other threads */ + for (i = 0 ;i < numoids ; i++) { + rc = pthread_join(thread[i], NULL); + if (rc) { + perror("Error pthread_join() in transPrefetch()\n"); + return 1; } } + pthread_attr_destroy(&attr); + + return 0; } @@ -870,7 +905,7 @@ int checkPrefetchTuples(int *arrayofoffset[], int *arraylength, short numoids) { * for case x.a.b and y.a.b where x and y have same oid's * or if a.b.c is a subset of x.b.c.d*/ /* check for case where the generated request a.y.z or x.y.z.g then - * prefetch needs to be generated for x.y.z.g */ + * prefetch needs to be generated for x.y.z.g if oid of a and x are same*/ for(i = 0; i < numoids -1 ; i++) { if(arrayofoffset[i][0] == -1) continue; @@ -890,8 +925,8 @@ int checkPrefetchTuples(int *arrayofoffset[], int *arraylength, short numoids) { larray = arrayofoffset[j]; } /* From first offset until end of tuple compare all offsets of sarray and larray - * if not a match then break */ - for(k = 1 ; slength -1 ; k++) { + * if not a match then break */ + for(k = 1 ; k < slength -1 ; k++) { if(sarray[k] != larray[k]) { break; } @@ -906,3 +941,83 @@ int checkPrefetchTuples(int *arrayofoffset[], int *arraylength, short numoids) { return 0; } + +/* This function goes through an array and finds out until what offets + * * can it make a new pile + * * -1 indicates a special character to mark end of oid, offset tuple*/ +int foundLocal(int *arrayofoffset[], transrecord_t *record, int *array, objheader_t *header, int offset, int index) { + int i, j, counter, len; + unsigned int oid; + void *objcopy; + objheader_t *tmp, *objheader; + int size; + + /* The oid corresponding to the first offset */ + oid = (int) (header + sizeof(objheader_t) + offset); + /* Repeat for all offset values in the array */ + for(i = 1; array[i] != -1 ; ) { + /* If oid is found locally then insert into cache and continue + * with next offset to find the corresponding oid */ + if((objheader = (objheader_t*) mhashSearch(oid)) != NULL) { + tmp = mhashSearch(oid); + size = sizeof(objheader_t)+classsize[tmp->type]; + objcopy = objstrAlloc(record->cache, size); + memcpy(objcopy, (void *)tmp, size); + /* Insert into cache's lookup table */ + chashInsert(record->lookupTable, objheader->oid, objcopy); + oid = (int) (tmp + sizeof(objheader_t) + array[i+1]); + } else { + /*If oid not found locally then + *assign the latest oid found as the first element + *of array and copy left over offsets as other + *array elements to build the new prefetch array*/ + + arrayofoffset[index][0] = (int) oid; + counter = i; + len = arrayLength((int *)array[index]); + for( j = 1 ; counter <= len; j++,counter++) { + arrayofoffset[index][j] = array[counter + 1]; + } + break; + } + i++; + } + + /* If all offsets are found locally, then do not treat this as valid prefetch tuple */ + if(i == arrayLength((int *)array[index]) - 1) { + arrayofoffset[index][0] = -1; //Mark beginning of array as -1 + } + + return 0; +} + +void *sendPrefetchReq(void *prefetchtuple) { + int sd, i; + struct sockaddr_in serv_addr; + struct hostent *server; + char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol; + char machineip[16], retval; + + + /* Send Trans Prefetch Request */ + if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + perror("Error in socket for TRANS_REQUEST\n"); + return NULL; + } + bzero((char*) &serv_addr, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + serv_addr.sin_port = htons(LISTEN_PORT); + //midtoIP(tdata->mid,machineip); +// machineip[15] = '\0'; +// serv_addr.sin_addr.s_addr = inet_addr(machineip); + + /* Open Connection */ + if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) { + perror("Error in connect for TRANS_REQUEST\n"); + return NULL; + } + + close(sd); + pthread_exit(NULL); + +} -- 2.34.1