From: adash Date: Mon, 30 Jul 2007 19:47:14 +0000 (+0000) Subject: bug fixes and add machine pile queue DS that saves oids and offsets meant for X-Git-Tag: preEdgeChange~508 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=ba8111dac1d4c79c8edcc955275df61d11d53831;p=IRC.git bug fixes and add machine pile queue DS that saves oids and offsets meant for remote machines --- diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index c937a57b..957a68e9 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -5,6 +5,12 @@ #define MSG_NOSIGNAL 0 #endif +#define GET_NTUPLES(x) ((int *)(x + sizeof(prefetchqelem_t))) +#define GET_PTR_OID(x) ((unsigned int *)(x + sizeof(prefetchqelem_t) + sizeof(int))) +#define GET_PTR_EOFF(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int)))) +#define GET_PTR_ARRYFLD(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int)) + (n*sizeof(short)))) + + //Coordinator Messages #define READ_REQUEST 1 #define READ_MULT_REQUEST 2 @@ -42,6 +48,7 @@ #include #include "clookup.h" #include "queue.h" +#include "mcpileq.h" #define DEFAULT_OBJ_STORE_SIZE 1048510 //1MB #define TID_LEN 20 @@ -135,27 +142,15 @@ typedef struct objinfo { //Structure for members within prefetch tuples typedef struct member { - short offset; - short index; - struct member *next; - }trans_member_t; - - -//Structure for prefetching tuples generated by teh compiler - typedef struct prefetchpile{ - int mid; - int *oids; - - int **numofarrys; - struct prefetchpile *next; - }prefetchpile_t; - -//Structure per Oid in the prefetch call + short offset; + short index; + struct member *next; +}trans_member_t; /* //Structure that holds the compiler generated prefetch data typedef struct compprefetchdata { - transrecord_t *record; +transrecord_t *record; } compprefetchdata_t; */ diff --git a/Robust/src/Runtime/DSTM/interface/machinepile.c b/Robust/src/Runtime/DSTM/interface/machinepile.c new file mode 100644 index 00000000..58fe1b92 --- /dev/null +++ b/Robust/src/Runtime/DSTM/interface/machinepile.c @@ -0,0 +1,48 @@ +#include "machinepile.h" + +int insertPile(int mid, unsigned int oid, short numoffset, short *offset, prefetchpile_t *head) { + prefetchpile_t *tmp = head; + objpile_t *objnode; + unsigned int *oidarray; + int ntuples; + char found = 0; + + while (tmp != NULL) { + if (tmp->mid == mid) { // Found a match with exsisting machine id + if ((objnode = (objpile_t *) calloc(1, sizeof(objpile_t))) == NULL) { + printf("Calloc error: %s %d\n", __FILE__, __LINE__); + return -1; + } + /* Fill objpiles DS */ + objnode->oid = oid; + objnode->numoffset = numoffset; + objnode->offset = offset; + objnode->next = tmp->objpiles; + tmp->objpiles = objnode; + found = 1; + break; + } + tmp = tmp->next; + } + if (!found) {// Not found => insert new mid DS + if ((tmp = (prefetchpile_t *) calloc(1, sizeof(prefetchpile_t))) == NULL) { + printf("Calloc error: %s %d\n", __FILE__, __LINE__); + return -1; + } + tmp->mid = mid; + if ((objnode = (objpile_t *) calloc(1, sizeof(objpile_t))) == NULL) { + printf("Calloc error: %s %d\n", __FILE__, __LINE__); + return -1; + } + /* Fill objpiles DS */ + objnode->oid = oid; + objnode->numoffset = numoffset; + objnode->offset = offset; + objnode->next = tmp->objpiles; // i.e., objnode->next = NULL; + tmp->objpiles = objnode; + tmp->next = head; + head = tmp; + } + return 0; +} + diff --git a/Robust/src/Runtime/DSTM/interface/machinepile.h b/Robust/src/Runtime/DSTM/interface/machinepile.h new file mode 100644 index 00000000..b8ca3d69 --- /dev/null +++ b/Robust/src/Runtime/DSTM/interface/machinepile.h @@ -0,0 +1,10 @@ +#ifndef _MACHINEPILE_H_ +#define _MACHINEPILE_H_ + +#include "mcpileq.h" +#include +#include + +int insertPile(int, unsigned int, short, short *, prefetchpile_t *); + +#endif diff --git a/Robust/src/Runtime/DSTM/interface/mcpileq.c b/Robust/src/Runtime/DSTM/interface/mcpileq.c new file mode 100644 index 00000000..bbb608d6 --- /dev/null +++ b/Robust/src/Runtime/DSTM/interface/mcpileq.c @@ -0,0 +1,77 @@ +#include "mcpileq.h" + +mcpileq_t mcqueue; + +void mcpileqInit(void) { + /* Initialize machine queue that containing prefetch oids and offset values sorted by remote machineid */ + mcqueue.front = mcqueue.rear = NULL; + pthread_mutex_init(&mcqueue.qlock, NULL); + pthread_cond_init(&mcqueue.qcond, NULL); +} + +/* Insert to the rear of machine pile queue */ +void mcpileenqueue(prefetchpile_t *node) { + if(mcqueue.front == NULL && mcqueue.rear == NULL) { + mcqueue.front = mcqueue.rear = node; + } else { + node->next = NULL; + mcqueue.rear->next = node; + mcqueue.rear = node; + } +} + +/* Return the node pointed to by the front ptr of the queue */ +prefetchpile_t *mcpiledequeue(void) { + prefetchpile_t *retnode; + if(mcqueue.front == NULL) { + printf("Machune pile queue empty: Underfloe %s %d\n", __FILE__, __LINE__); + return NULL; + } + retnode = mcqueue.front; + mcqueue.front = mcqueue.front->next; + + return retnode; +} + +/* Delete the node pointed to by the front ptr of the queue */ +void delnode() { + prefetchpile_t *delnode; + if((mcqueue.front == NULL) && (mcqueue.rear == NULL)) { + printf("The queue is empty: UNDERFLOW %s, %d\n", __FILE__, __LINE__); + return; + } else if ((mcqueue.front == mcqueue.rear) && mcqueue.front != NULL && mcqueue.rear != NULL) { + printf("TEST1\n"); + free(mcqueue.front); + mcqueue.front = mcqueue.rear = NULL; + } else { + delnode = mcqueue.front; + mcqueue.front = mcqueue.front->next; + printf("TEST2\n"); + free(delnode); + } +} + +void mcpiledelete(void) { + /* Remove each element */ + while(mcqueue.front != NULL) + delqnode(); + mcqueue.front = mcqueue.rear = NULL; +} + + +void mcpiledisplay() { + int mid; + + prefetchpile_t *tmp = mcqueue.front; + while(tmp != NULL) { + printf("Remote machine id = %d\n", tmp->mid); + tmp = tmp->next; + } +} + + + + + + + diff --git a/Robust/src/Runtime/DSTM/interface/mcpileq.h b/Robust/src/Runtime/DSTM/interface/mcpileq.h new file mode 100644 index 00000000..7add8deb --- /dev/null +++ b/Robust/src/Runtime/DSTM/interface/mcpileq.h @@ -0,0 +1,37 @@ +#ifndef _MCPILEQ_H_ +#define _MCPILEQ_H_ + +#include +#include +#include +#include + +//Structure to make machine groups when prefetching +typedef struct objpile { + unsigned int oid; + short numoffset; + short *offset; + struct objpile *next; +}objpile_t; + +//Structure for prefetching tuples generated by the compiler +typedef struct prefetchpile { + int mid; + objpile_t *objpiles; + struct prefetchpile *next; +}prefetchpile_t; + +typedef struct mcpileq { + prefetchpile_t *front, *rear; + pthread_mutex_t qlock; + pthread_cond_t qcond; +}mcpileq_t; + +void mcpileqInit(void); +void mcpileenqueue(prefetchpile_t *); +prefetchpile_t *mcpiledequeue(void); +void delnode(); +void mcpiledelete(); +void mcpiledisplay(); + +#endif diff --git a/Robust/src/Runtime/DSTM/interface/queue.c b/Robust/src/Runtime/DSTM/interface/queue.c index f164528f..298e0d11 100644 --- a/Robust/src/Runtime/DSTM/interface/queue.c +++ b/Robust/src/Runtime/DSTM/interface/queue.c @@ -3,13 +3,13 @@ primarypfq_t pqueue; //Global queue void queueInit(void) { - /* Intitialize primary thread */ + /* Intitialize primary queue */ pqueue.front = pqueue.rear = NULL; pthread_mutex_init(&pqueue.qlock, NULL); pthread_cond_init(&pqueue.qcond, NULL); } -/* Removes the first element of the queue */ +/* Delete the node pointed to by the front ptr of the queue */ void delqnode() { prefetchqelem_t *delnode; if((pqueue.front == NULL) && (pqueue.rear == NULL)) { @@ -45,6 +45,7 @@ void enqueue(prefetchqelem_t *qnode) { } } +/* Return the node pointed to by the front ptr of the queue */ prefetchqelem_t *dequeue(void) { prefetchqelem_t *retnode; if (pqueue.front == NULL) { @@ -52,7 +53,6 @@ prefetchqelem_t *dequeue(void) { return NULL; } retnode = pqueue.front; - //TODO make this atomic pqueue.front = pqueue.front->next; return retnode; diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 2bf2d019..2ddf7d26 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -1,6 +1,7 @@ #include "dstm.h" #include "ip.h" #include "clookup.h" +#include "machinepile.h" #include "mlookup.h" #include "llookup.h" #include "plookup.h" @@ -20,15 +21,17 @@ #define RECEIVE_BUFFER_SIZE 2048 #define NUM_THREADS 10 #define PREFETCH_CACHE_SIZE 1048576 //1MB - +/* #define GET_NTUPLES(x) ((int *)(x + sizeof(prefetchqelem_t))) #define GET_PTR_OID(x) ((unsigned int *)(x + sizeof(prefetchqelem_t) + sizeof(int))) #define GET_PTR_EOFF(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int)))) #define GET_PTR_ARRYFLD(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int)) + (n*sizeof(short)))) +*/ /* Global Variables */ extern int classsize[]; extern primarypfq_t pqueue; // shared prefetch queue +extern mcpileq_t mcqueue; //Shared queue containing prefetch requests sorted by remote machineids pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue pthread_t tPrefetch; extern objstr_t *mainobjstore; @@ -41,6 +44,16 @@ inline int arrayLength(int *array) { ; return i; } +inline int findmax(int *array, int arraylength) { + int max, i; + max = array[0]; + for(i = 0; i < arraylength; i++){ + if(array[i] > max) { + max = array[i]; + } + } + return max; +} /* This function is a prefetch call generated by the compiler that * populates the shared primary prefetch queue*/ void prefetch(int ntuples, unsigned int *oids, short *endoffsets, short *arrayfields) { @@ -84,6 +97,8 @@ void transInit() { return; //Failure //Initialize primary shared queue queueInit(); + //Initialize machine pile w/prefetch oids and offsets shared queue + mcpileqInit(); //Create the primary prefetch thread pthread_create(&tPrefetch, NULL, transPrefetch, NULL); //Create and Initialize a pool of threads @@ -919,9 +934,19 @@ void *transPrefetch(void *prefdata) { } pthread_mutex_unlock(&pqueue.qlock); /* Reduce redundant prefetch requests */ - /* Group Requests by where objects are located */ - + checkPrefetchTuples(qnode); + /* Check if the tuples are found locally, if yes then reduce them further*/ + /* and group requests by remote machine ids by calling the makePreGroups() */ + foundLocal(qnode); + + /* Lock mutex of pool queue */ + pthread_mutex_lock(&mcqueue.qlock); + /* Update the pool queue with the new remote machine piles generated per prefetch call */ + + /* Broadcast signal on pool queue */ + + /* Unlock mutex of pool queue */ } } @@ -984,24 +1009,19 @@ void checkPrefetchTuples(prefetchqelem_t *node) { k++; } } else { - printf("i = %d, j = %d\n", i, j); k = endoffsets[i-1]; index = endoffsets[j-1]; printf("Value of slength = %d\n", slength); for(count = 0; count < slength; count++) { - printf("Value of count =%d\n", count); if(arryfields[k] != arryfields[index]) { break; } index++; k++; } - printf("Value of count =%d\n", count); } - printf("The value of sindex = %d\n", sindex); if(slength == count) { - printf("DEBUG-> Inside slength if %d\n", sindex); oid[sindex] = -1; } } @@ -1058,22 +1078,50 @@ void checkPreCache(prefetchqelem_t *node, int *numoffset, int counter, int loopc * and copy left over offsets into the arrayoffsetfieldarray*/ oid[iter] = objoid; numoffset[iter] = numoffset[iter] - (i+1); - if(iter == 0) - endoffsets[iter] = numoffset[iter]; - else - endoffsets[iter] = numoffset[iter] + endoffsets[iter - 1]; for(k = 0; k < numoffset[iter] ; k++) { - arryfields[k] = arryfields[counter+1]; - counter++; + arryfields[endoffsets[counter]+k] = arryfields[endoffsets[counter]+k+1]; } if(flag == 0) { oid[iter] = -1; numoffset[iter] = 0; - endoffsets[iter] = 0; } } +/* This function makes machine piles to be added into the machine pile queue for each prefetch call */ +void makePreGroups(prefetchqelem_t *node, int *numoffset) { + char *ptr, *tmp; + int ntuples, slength, i, machinenum; + int maxoffset; + unsigned int *oid; + short *endoffsets, *arryfields, *offset; + prefetchpile_t *head = NULL; + + /* Check for the case x.y.z and a.b.c are same oids */ + ptr = (char *) node; + ntuples = *(GET_NTUPLES(ptr)); + oid = GET_PTR_OID(ptr); + endoffsets = GET_PTR_EOFF(ptr, ntuples); + arryfields = GET_PTR_ARRYFLD(ptr, ntuples); + + /* Check for redundant tuples by comparing oids of each tuple */ + for(i = 0; i < ntuples; i++) { + if(oid[i] == -1) + continue; + /* For each tuple make piles */ + if ((machinenum = lhashSearch(oid[i])) == 0) { + printf("Error: No such Machine %s, %d\n", __FILE__, __LINE__); + return; + } + /* Insert into machine pile */ + offset = &arryfields[endoffsets[i-1]]; + insertPile(machinenum, oid[i], numoffset[i], offset, head); + } + + return; +} + + /* This function checks if the oids within the prefetch tuples are available locally. * If yes then makes the tuple invalid. If no then rearranges oid and offset values in * the prefetchqelem_t node to represent a new prefetch tuple */ @@ -1110,75 +1158,39 @@ void foundLocal(prefetchqelem_t *node) { index = endoffsets[i - 1]; for(j = 0 ; j < numoffset[i] ; j++) { objoid = *(tmp + sizeof(objheader_t) + arryfields[index]); + /*If oid found locally then + *assign the latest oid found as the new oid + *and copy left over offsets into the arrayoffsetfieldarray*/ + oid[i] = objoid; + numoffset[i] = numoffset[i] - (j+1); + for(k = 0; k < numoffset[i]; k++) + arryfields[endoffsets[j]+ k] = arryfields[endoffsets[j]+k+1]; index++; /*New offset oid not found */ if((objheader = (objheader_t*) mhashSearch(objoid)) == NULL) { flag = 1; - checkPreCache(node, numoffset, j, numoffset[i], objoid, index, i, oidnfound); + checkPreCache(node, &numoffset, j, numoffset[i], objoid, index, i, oidnfound); break; } else flag = 0; } - /*If oid not found locally then - *assign the latest oid found as the new oid - *and copy left over offsets into the arrayoffsetfieldarray*/ - oid[i] = objoid; - numoffset[i] = numoffset[i] - (j+1); - if(i == 0) - endoffsets[i] = numoffset[i]; - else - endoffsets[i] = numoffset[i] - endoffsets[i - 1]; - for(k = 0; k < numoffset[i]; k++) { - arryfields[k] = arryfields[j+1]; - j++; - } + /*If all offset oids are found locally,make the prefetch tuple invalid */ if(flag == 0) { oid[i] = -1; numoffset[i] = 0; - endoffsets[i] = 0; } } else { oidnfound = 1; /* Look in Prefetch cache */ - checkPreCache(node, numoffset, 0, numoffset[i], oid[i], 0, i, oidnfound); + checkPreCache(node, &numoffset, 0, numoffset[i], oid[i], 0, i, oidnfound); } } + // Make machine groups + makePreGroups(node, numoffset); } -void makePreGroups(prefetchqelem_t *node) { - char *ptr, *tmp; - int ntuples, slength, i, machinenum; - unsigned int *oid; - short *endoffsets, *arryfields; - - - /* Check for the case x.y.z and a.b.c are same oids */ - ptr = (char *) node; - ntuples = *(GET_NTUPLES(ptr)); - oid = GET_PTR_OID(ptr); - endoffsets = GET_PTR_EOFF(ptr, ntuples); - arryfields = GET_PTR_ARRYFLD(ptr, ntuples); - /* Find offset length for each tuple */ - int numoffset[ntuples]; - numoffset[0] = endoffsets[0]; - for(i = 1; i