d-3:
- gcc -dr -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c
+ gcc -dr -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c
demsky:
- gcc -DDEBUG -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c
+ gcc -DDEBUG -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c mcpileq.c machinepile.c
d-4:
- gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c
+ gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c
all:
- gcc -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c
- gcc -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c
- gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c
+ gcc -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c
+ gcc -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c mcpileq.c machinepile.c
+ gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c
mac:
- gcc -DMAC -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c
- gcc -DMAC -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c
- gcc -DMAC -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c
+ gcc -DMAC -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c
+ gcc -DMAC -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c mcpileq.c machinepile.c
+ gcc -DMAC -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c
clean:
rm -rf d-3 d-4 demsky
#define RECEIVE_BUFFER_SIZE 2048
#define NUM_THREADS 10
#define PREFETCH_CACHE_SIZE 1048576 //1MB
-/*
-#define GET_NTUPLES(x) ((int *)(x + sizeof(prefetchqelem_t)))
-#define GET_PTR_OID(x) ((unsigned int *)(x + sizeof(prefetchqelem_t) + sizeof(int)))
-#define GET_PTR_EOFF(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int))))
-#define GET_PTR_ARRYFLD(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int)) + (n*sizeof(short))))
-*/
/* Global Variables */
extern int classsize[];
pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
//Create and Initialize a pool of threads
for(t = 0; t< NUM_THREADS; t++) {
- //rc = pthread_create(&wthreads[t], NULL, pfqProcess, (void *)t);
+ // rc = pthread_create(&wthreads[t], NULL, pfqProcess, (void *)t);
if (rc) {
printf("Thread create error %s, %d\n", __FILE__, __LINE__);
return;
return 0;
}
-/* This function is called by the thread calling transPrefetch */
-void *transPrefetch(void *prefdata) {
- int *offstarray = NULL;
- prefetchqelem_t *qnode;
-
- while(1) {
- /* lock mutex of primary prefetch queue */
- pthread_mutex_lock(&pqueue.qlock);
- /* while primary queue is empty, then wait */
- while((pqueue.front == NULL) && (pqueue.rear == NULL)) {
- pthread_cond_wait(&pqueue.qcond, &pqueue.qlock);
- }
-
- /* dequeue node to create a machine piles and finally unlock mutex */
- if((qnode = dequeue()) == NULL) {
- printf("Error: No node returned %s, %d\n", __FILE__, __LINE__);
- return NULL;
- }
- pthread_mutex_unlock(&pqueue.qlock);
- /* Reduce redundant prefetch requests */
- checkPrefetchTuples(qnode);
- /* Check if the tuples are found locally, if yes then reduce them further*/
- /* and group requests by remote machine ids by calling the makePreGroups() */
- foundLocal(qnode);
-
- /* Lock mutex of pool queue */
- pthread_mutex_lock(&mcqueue.qlock);
- /* Update the pool queue with the new remote machine piles generated per prefetch call */
-
-
- /* Broadcast signal on pool queue */
-
- /* Unlock mutex of pool queue */
-
- }
-}
-
/* This function checks if the prefetch oids are same and have same offsets
* for case x.a.b and y.a.b where x and y have same oid's
* or if a.b.c is a subset of x.b.c.d*/
}
/* This function makes machine piles to be added into the machine pile queue for each prefetch call */
-void makePreGroups(prefetchqelem_t *node, int *numoffset) {
+prefetchpile_t *makePreGroups(prefetchqelem_t *node, int *numoffset) {
char *ptr, *tmp;
int ntuples, slength, i, machinenum;
int maxoffset;
/* For each tuple make piles */
if ((machinenum = lhashSearch(oid[i])) == 0) {
printf("Error: No such Machine %s, %d\n", __FILE__, __LINE__);
- return;
+ return NULL;
}
/* Insert into machine pile */
offset = &arryfields[endoffsets[i-1]];
insertPile(machinenum, oid[i], numoffset[i], offset, head);
}
- return;
+ return head;
}
/* This function checks if the oids within the prefetch tuples are available locally.
* If yes then makes the tuple invalid. If no then rearranges oid and offset values in
* the prefetchqelem_t node to represent a new prefetch tuple */
-void foundLocal(prefetchqelem_t *node) {
+prefetchpile_t *foundLocal(prefetchqelem_t *node) {
int ntuples,i, j, k, oidnfound = 0, index, flag;
unsigned int *oid;
unsigned int objoid;
char *ptr, *tmp;
objheader_t *objheader;
short *endoffsets, *arryfields;
+ prefetchpile_t *head = NULL;
ptr = (char *) node;
ntuples = *(GET_NTUPLES(ptr));
/*New offset oid not found */
if((objheader = (objheader_t*) mhashSearch(objoid)) == NULL) {
flag = 1;
- checkPreCache(node, &numoffset, j, numoffset[i], objoid, index, i, oidnfound);
+ checkPreCache(node, numoffset, j, numoffset[i], objoid, index, i, oidnfound);
break;
} else
flag = 0;
} else {
oidnfound = 1;
/* Look in Prefetch cache */
- checkPreCache(node, &numoffset, 0, numoffset[i], oid[i], 0, i, oidnfound);
+ checkPreCache(node, numoffset, 0, numoffset[i], oid[i], 0, i, oidnfound);
}
}
- // Make machine groups
- makePreGroups(node, numoffset);
+ /* Make machine groups */
+ head = makePreGroups(node, numoffset);
+ return head;
}
+/* This function is called by the thread calling transPrefetch */
+void *transPrefetch(void *t) {
+ //int *offstarray = NULL;
+ prefetchqelem_t *qnode;
+ prefetchpile_t *pilehead = NULL;
+
+ while(1) {
+ /* lock mutex of primary prefetch queue */
+ pthread_mutex_lock(&pqueue.qlock);
+ /* while primary queue is empty, then wait */
+ while((pqueue.front == NULL) && (pqueue.rear == NULL)) {
+ pthread_cond_wait(&pqueue.qcond, &pqueue.qlock);
+ }
+
+ /* dequeue node to create a machine piles and finally unlock mutex */
+ if((qnode = dequeue()) == NULL) {
+ printf("Error: No node returned %s, %d\n", __FILE__, __LINE__);
+ return NULL;
+ }
+ pthread_mutex_unlock(&pqueue.qlock);
+ /* Reduce redundant prefetch requests */
+ checkPrefetchTuples(qnode);
+ /* Check if the tuples are found locally, if yes then reduce them further*/
+ /* and group requests by remote machine ids by calling the makePreGroups() */
+ pilehead = foundLocal(qnode);
+
+ /* Lock mutex of pool queue */
+ pthread_mutex_lock(&mcqueue.qlock);
+ /* Update the pool queue with the new remote machine piles generated per prefetch call */
+ mcpileenqueue(pilehead);
+ /* Broadcast signal on pool queue */
+ pthread_cond_broadcast(&mcqueue.qcond);
+ /* Unlock mutex of pool queue */
+ pthread_mutex_unlock(&mcqueue.qlock);
+ }
+}
/*This function is called by the thread that processes the
* prefetch request makes piles to prefetch records and prefetches the oids from remote machines */