From 0a36799b9073ae23a0f7915efdeccd6b9292ad3d Mon Sep 17 00:00:00 2001 From: adash Date: Tue, 31 Jul 2007 00:11:21 +0000 Subject: [PATCH] Update Makefile Bug fixes for enqueue process of machine pile queue (pool thread) complete primary prefetch thread processing --- Robust/src/Runtime/DSTM/interface/Makefile | 18 ++-- Robust/src/Runtime/DSTM/interface/dstm.h | 4 +- Robust/src/Runtime/DSTM/interface/mcpileq.c | 21 +++++ Robust/src/Runtime/DSTM/interface/trans.c | 99 ++++++++++----------- 4 files changed, 79 insertions(+), 63 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/Makefile b/Robust/src/Runtime/DSTM/interface/Makefile index 050152fd..b04fca6d 100644 --- a/Robust/src/Runtime/DSTM/interface/Makefile +++ b/Robust/src/Runtime/DSTM/interface/Makefile @@ -1,22 +1,22 @@ d-3: - gcc -dr -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c + gcc -dr -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c demsky: - gcc -DDEBUG -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c + gcc -DDEBUG -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c mcpileq.c machinepile.c d-4: - gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c + gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c all: - gcc -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c - gcc -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c - gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c + gcc -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c + gcc -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c mcpileq.c machinepile.c + gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c mac: - gcc -DMAC -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c - gcc -DMAC -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c - gcc -DMAC -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c + gcc -DMAC -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c + gcc -DMAC -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c mcpileq.c machinepile.c + gcc -DMAC -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c clean: rm -rf d-3 d-4 demsky diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index 957a68e9..82f40932 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -194,8 +194,8 @@ int transComProcess(trans_commit_data_t *); void prefetch(int, unsigned int *, short *, short*); void *transPrefetch(void *); void checkPrefetchTuples(prefetchqelem_t *); -void foundLocal(prefetchqelem_t *); -void makePreGroups(prefetchqelem_t *node); +prefetchpile_t *foundLocal(prefetchqelem_t *); +prefetchpile_t *makePreGroups(prefetchqelem_t *, int *); void checkPreCache(prefetchqelem_t *, int *, int, int, unsigned int, int, int, int); int transPrefetchProcess(transrecord_t *, int **, short); void *sendPrefetchReq(void *); diff --git a/Robust/src/Runtime/DSTM/interface/mcpileq.c b/Robust/src/Runtime/DSTM/interface/mcpileq.c index bbb608d6..379ffd27 100644 --- a/Robust/src/Runtime/DSTM/interface/mcpileq.c +++ b/Robust/src/Runtime/DSTM/interface/mcpileq.c @@ -10,6 +10,7 @@ void mcpileqInit(void) { } /* Insert to the rear of machine pile queue */ +/* void mcpileenqueue(prefetchpile_t *node) { if(mcqueue.front == NULL && mcqueue.rear == NULL) { mcqueue.front = mcqueue.rear = node; @@ -19,6 +20,26 @@ void mcpileenqueue(prefetchpile_t *node) { mcqueue.rear = node; } } +*/ + +void mcpileenqueue(prefetchpile_t *node) { + prefetchpile_t *tmp, *prev; + if(mcqueue.front == NULL && mcqueue.rear == NULL) { + tmp = mcqueue.front = node; + while(tmp != NULL) { + prev = tmp; + tmp = tmp->next; + } + mcqueue.rear = prev; + } else { + tmp = mcqueue.rear->next = node; + while(tmp != NULL) { + prev = tmp; + tmp = tmp->next; + } + mcqueue.rear = prev; + } +} /* Return the node pointed to by the front ptr of the queue */ prefetchpile_t *mcpiledequeue(void) { diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 2ddf7d26..2c6f8f4b 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -21,12 +21,6 @@ #define RECEIVE_BUFFER_SIZE 2048 #define NUM_THREADS 10 #define PREFETCH_CACHE_SIZE 1048576 //1MB -/* -#define GET_NTUPLES(x) ((int *)(x + sizeof(prefetchqelem_t))) -#define GET_PTR_OID(x) ((unsigned int *)(x + sizeof(prefetchqelem_t) + sizeof(int))) -#define GET_PTR_EOFF(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int)))) -#define GET_PTR_ARRYFLD(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int)) + (n*sizeof(short)))) -*/ /* Global Variables */ extern int classsize[]; @@ -103,7 +97,7 @@ void transInit() { pthread_create(&tPrefetch, NULL, transPrefetch, NULL); //Create and Initialize a pool of threads for(t = 0; t< NUM_THREADS; t++) { - //rc = pthread_create(&wthreads[t], NULL, pfqProcess, (void *)t); + // rc = pthread_create(&wthreads[t], NULL, pfqProcess, (void *)t); if (rc) { printf("Thread create error %s, %d\n", __FILE__, __LINE__); return; @@ -914,43 +908,6 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int return 0; } -/* This function is called by the thread calling transPrefetch */ -void *transPrefetch(void *prefdata) { - int *offstarray = NULL; - prefetchqelem_t *qnode; - - while(1) { - /* lock mutex of primary prefetch queue */ - pthread_mutex_lock(&pqueue.qlock); - /* while primary queue is empty, then wait */ - while((pqueue.front == NULL) && (pqueue.rear == NULL)) { - pthread_cond_wait(&pqueue.qcond, &pqueue.qlock); - } - - /* dequeue node to create a machine piles and finally unlock mutex */ - if((qnode = dequeue()) == NULL) { - printf("Error: No node returned %s, %d\n", __FILE__, __LINE__); - return NULL; - } - pthread_mutex_unlock(&pqueue.qlock); - /* Reduce redundant prefetch requests */ - checkPrefetchTuples(qnode); - /* Check if the tuples are found locally, if yes then reduce them further*/ - /* and group requests by remote machine ids by calling the makePreGroups() */ - foundLocal(qnode); - - /* Lock mutex of pool queue */ - pthread_mutex_lock(&mcqueue.qlock); - /* Update the pool queue with the new remote machine piles generated per prefetch call */ - - - /* Broadcast signal on pool queue */ - - /* Unlock mutex of pool queue */ - - } -} - /* This function checks if the prefetch oids are same and have same offsets * for case x.a.b and y.a.b where x and y have same oid's * or if a.b.c is a subset of x.b.c.d*/ @@ -1089,7 +1046,7 @@ void checkPreCache(prefetchqelem_t *node, int *numoffset, int counter, int loopc } /* This function makes machine piles to be added into the machine pile queue for each prefetch call */ -void makePreGroups(prefetchqelem_t *node, int *numoffset) { +prefetchpile_t *makePreGroups(prefetchqelem_t *node, int *numoffset) { char *ptr, *tmp; int ntuples, slength, i, machinenum; int maxoffset; @@ -1111,27 +1068,28 @@ void makePreGroups(prefetchqelem_t *node, int *numoffset) { /* For each tuple make piles */ if ((machinenum = lhashSearch(oid[i])) == 0) { printf("Error: No such Machine %s, %d\n", __FILE__, __LINE__); - return; + return NULL; } /* Insert into machine pile */ offset = &arryfields[endoffsets[i-1]]; insertPile(machinenum, oid[i], numoffset[i], offset, head); } - return; + return head; } /* This function checks if the oids within the prefetch tuples are available locally. * If yes then makes the tuple invalid. If no then rearranges oid and offset values in * the prefetchqelem_t node to represent a new prefetch tuple */ -void foundLocal(prefetchqelem_t *node) { +prefetchpile_t *foundLocal(prefetchqelem_t *node) { int ntuples,i, j, k, oidnfound = 0, index, flag; unsigned int *oid; unsigned int objoid; char *ptr, *tmp; objheader_t *objheader; short *endoffsets, *arryfields; + prefetchpile_t *head = NULL; ptr = (char *) node; ntuples = *(GET_NTUPLES(ptr)); @@ -1169,7 +1127,7 @@ void foundLocal(prefetchqelem_t *node) { /*New offset oid not found */ if((objheader = (objheader_t*) mhashSearch(objoid)) == NULL) { flag = 1; - checkPreCache(node, &numoffset, j, numoffset[i], objoid, index, i, oidnfound); + checkPreCache(node, numoffset, j, numoffset[i], objoid, index, i, oidnfound); break; } else flag = 0; @@ -1183,14 +1141,51 @@ void foundLocal(prefetchqelem_t *node) { } else { oidnfound = 1; /* Look in Prefetch cache */ - checkPreCache(node, &numoffset, 0, numoffset[i], oid[i], 0, i, oidnfound); + checkPreCache(node, numoffset, 0, numoffset[i], oid[i], 0, i, oidnfound); } } - // Make machine groups - makePreGroups(node, numoffset); + /* Make machine groups */ + head = makePreGroups(node, numoffset); + return head; } +/* This function is called by the thread calling transPrefetch */ +void *transPrefetch(void *t) { + //int *offstarray = NULL; + prefetchqelem_t *qnode; + prefetchpile_t *pilehead = NULL; + + while(1) { + /* lock mutex of primary prefetch queue */ + pthread_mutex_lock(&pqueue.qlock); + /* while primary queue is empty, then wait */ + while((pqueue.front == NULL) && (pqueue.rear == NULL)) { + pthread_cond_wait(&pqueue.qcond, &pqueue.qlock); + } + + /* dequeue node to create a machine piles and finally unlock mutex */ + if((qnode = dequeue()) == NULL) { + printf("Error: No node returned %s, %d\n", __FILE__, __LINE__); + return NULL; + } + pthread_mutex_unlock(&pqueue.qlock); + /* Reduce redundant prefetch requests */ + checkPrefetchTuples(qnode); + /* Check if the tuples are found locally, if yes then reduce them further*/ + /* and group requests by remote machine ids by calling the makePreGroups() */ + pilehead = foundLocal(qnode); + + /* Lock mutex of pool queue */ + pthread_mutex_lock(&mcqueue.qlock); + /* Update the pool queue with the new remote machine piles generated per prefetch call */ + mcpileenqueue(pilehead); + /* Broadcast signal on pool queue */ + pthread_cond_broadcast(&mcqueue.qcond); + /* Unlock mutex of pool queue */ + pthread_mutex_unlock(&mcqueue.qlock); + } +} /*This function is called by the thread that processes the * prefetch request makes piles to prefetch records and prefetches the oids from remote machines */ -- 2.34.1