Update Makefile
authoradash <adash>
Tue, 31 Jul 2007 00:11:21 +0000 (00:11 +0000)
committeradash <adash>
Tue, 31 Jul 2007 00:11:21 +0000 (00:11 +0000)
Bug fixes for enqueue process of machine pile queue (pool thread)
complete primary prefetch thread processing

Robust/src/Runtime/DSTM/interface/Makefile
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/mcpileq.c
Robust/src/Runtime/DSTM/interface/trans.c

index 050152fdc8e2324b9ec0256807b158c520ea630b..b04fca6df53ee4ac8c378bdbff8c94574dcc9110 100644 (file)
@@ -1,22 +1,22 @@
 d-3:
-       gcc -dr -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c
+       gcc -dr -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c
 
 demsky:
-       gcc -DDEBUG -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c 
+       gcc -DDEBUG -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c mcpileq.c machinepile.c 
 
 d-4:
-       gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c
+       gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c
 
 all:
-       gcc -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c
-       gcc -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c
-       gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c
+       gcc -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c
+       gcc -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c mcpileq.c machinepile.c
+       gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c
 
 
 mac:
-       gcc -DMAC -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c
-       gcc -DMAC -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c
-       gcc -DMAC -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c
+       gcc -DMAC -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c
+       gcc -DMAC -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c mcpileq.c machinepile.c
+       gcc -DMAC -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c
 
 clean:
        rm -rf d-3 d-4 demsky
index 957a68e95bb3b8d81dcfd8438512c78278821ef1..82f4093276c31b6c2ce19ad598e797ba3d8b18fe 100644 (file)
@@ -194,8 +194,8 @@ int transComProcess(trans_commit_data_t *);
 void prefetch(int, unsigned int *, short *, short*);
 void *transPrefetch(void *);
 void checkPrefetchTuples(prefetchqelem_t *);
-void foundLocal(prefetchqelem_t *);
-void makePreGroups(prefetchqelem_t *node);
+prefetchpile_t *foundLocal(prefetchqelem_t *);
+prefetchpile_t *makePreGroups(prefetchqelem_t *, int *);
 void checkPreCache(prefetchqelem_t *, int *, int, int, unsigned int, int, int, int);
 int transPrefetchProcess(transrecord_t *, int **, short);
 void *sendPrefetchReq(void *);
index bbb608d69faabf83b19662239d6dc67d476d2b7f..379ffd271f67872b790740685a77c73f15fef84d 100644 (file)
@@ -10,6 +10,7 @@ void mcpileqInit(void) {
 }
 
 /* Insert to the rear of machine pile queue */
+/*
 void mcpileenqueue(prefetchpile_t *node) {
        if(mcqueue.front == NULL && mcqueue.rear == NULL) {
                mcqueue.front = mcqueue.rear = node;
@@ -19,6 +20,26 @@ void mcpileenqueue(prefetchpile_t *node) {
                mcqueue.rear = node;
        }
 }
+*/
+
+void mcpileenqueue(prefetchpile_t *node) {
+       prefetchpile_t *tmp, *prev;
+       if(mcqueue.front == NULL && mcqueue.rear == NULL) {
+               tmp = mcqueue.front = node;
+               while(tmp != NULL) {
+                       prev = tmp;
+                       tmp = tmp->next;
+               }
+               mcqueue.rear = prev;
+       } else {
+               tmp = mcqueue.rear->next = node;
+               while(tmp != NULL) {
+                       prev = tmp;
+                       tmp = tmp->next;
+               }
+               mcqueue.rear = prev;
+       }
+}
 
 /* Return the node pointed to by the front ptr of the queue */
 prefetchpile_t *mcpiledequeue(void) {
index 2ddf7d2693e4772f138c2d492d04afd1c94e617c..2c6f8f4bb167dfd2faa091a420451be69da74100 100644 (file)
 #define RECEIVE_BUFFER_SIZE 2048
 #define NUM_THREADS 10
 #define PREFETCH_CACHE_SIZE 1048576 //1MB
-/*
-#define GET_NTUPLES(x)         ((int *)(x + sizeof(prefetchqelem_t)))
-#define GET_PTR_OID(x)         ((unsigned int *)(x + sizeof(prefetchqelem_t) + sizeof(int)))
-#define GET_PTR_EOFF(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int))))
-#define GET_PTR_ARRYFLD(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int)) + (n*sizeof(short))))
-*/
 
 /* Global Variables */
 extern int classsize[];
@@ -103,7 +97,7 @@ void transInit() {
        pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
        //Create and Initialize a pool of threads 
        for(t = 0; t< NUM_THREADS; t++) {
-               //rc = pthread_create(&wthreads[t], NULL, pfqProcess, (void *)t);
+       //      rc = pthread_create(&wthreads[t], NULL, pfqProcess, (void *)t);
                if (rc) {
                        printf("Thread create error %s, %d\n", __FILE__, __LINE__);
                        return;
@@ -914,43 +908,6 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int
         return 0;
  }
 
-/* This function is called by the thread calling transPrefetch */
-void *transPrefetch(void *prefdata) {
-       int *offstarray = NULL;
-       prefetchqelem_t *qnode;
-
-       while(1) {
-               /* lock mutex of primary prefetch queue */
-               pthread_mutex_lock(&pqueue.qlock);
-               /* while primary queue is empty, then wait */
-               while((pqueue.front == NULL) && (pqueue.rear == NULL)) {
-                       pthread_cond_wait(&pqueue.qcond, &pqueue.qlock);
-               }
-
-               /* dequeue node to create a machine piles and  finally unlock mutex */
-               if((qnode = dequeue()) == NULL) {
-                       printf("Error: No node returned %s, %d\n", __FILE__, __LINE__);
-                       return NULL;
-               }
-               pthread_mutex_unlock(&pqueue.qlock);
-               /* Reduce redundant prefetch requests */
-               checkPrefetchTuples(qnode);
-               /* Check if the tuples are found locally, if yes then reduce them further*/ 
-               /* and group requests by remote machine ids by calling the makePreGroups() */
-               foundLocal(qnode);
-               
-               /* Lock mutex of pool queue */
-               pthread_mutex_lock(&mcqueue.qlock);
-               /* Update the pool queue with the new remote machine piles generated per prefetch call */
-
-
-               /* Broadcast signal on pool queue */
-
-               /* Unlock mutex of pool queue */
-
-       }
-}
-
 /* This function checks if the prefetch oids are same and have same offsets  
  * for case x.a.b and y.a.b where x and y have same oid's
  * or if a.b.c is a subset of x.b.c.d*/ 
@@ -1089,7 +1046,7 @@ void checkPreCache(prefetchqelem_t *node, int *numoffset, int counter, int loopc
 }
 
 /* This function makes machine piles to be added into the machine pile queue for each prefetch call */
-void makePreGroups(prefetchqelem_t *node, int *numoffset) {
+prefetchpile_t *makePreGroups(prefetchqelem_t *node, int *numoffset) {
        char *ptr, *tmp;
        int ntuples, slength, i, machinenum;
        int maxoffset;
@@ -1111,27 +1068,28 @@ void makePreGroups(prefetchqelem_t *node, int *numoffset) {
                /* For each tuple make piles */
                if ((machinenum = lhashSearch(oid[i])) == 0) {
                        printf("Error: No such Machine %s, %d\n", __FILE__, __LINE__);
-                       return;
+                       return NULL;
                }
                /* Insert into machine pile */
                offset = &arryfields[endoffsets[i-1]];
                insertPile(machinenum, oid[i], numoffset[i], offset, head);
        }
 
-       return;
+       return head;
 }
 
 
 /* This function checks if the oids within the prefetch tuples are available locally.
  * If yes then makes the tuple invalid. If no then rearranges oid and offset values in 
  * the prefetchqelem_t node to represent a new prefetch tuple */
-void foundLocal(prefetchqelem_t *node) {
+prefetchpile_t *foundLocal(prefetchqelem_t *node) {
        int ntuples,i, j, k, oidnfound = 0, index, flag;
        unsigned int *oid;
        unsigned int  objoid;
        char *ptr, *tmp;
        objheader_t *objheader;
        short *endoffsets, *arryfields; 
+       prefetchpile_t *head = NULL;
 
        ptr = (char *) node;
        ntuples = *(GET_NTUPLES(ptr));
@@ -1169,7 +1127,7 @@ void foundLocal(prefetchqelem_t *node) {
                                /*New offset oid not found */
                                if((objheader = (objheader_t*) mhashSearch(objoid)) == NULL) {
                                        flag = 1;
-                                       checkPreCache(node, &numoffset, j, numoffset[i], objoid, index, i, oidnfound); 
+                                       checkPreCache(node, numoffset, j, numoffset[i], objoid, index, i, oidnfound); 
                                        break;
                                } else 
                                        flag = 0;
@@ -1183,14 +1141,51 @@ void foundLocal(prefetchqelem_t *node) {
                } else {
                        oidnfound = 1;
                        /* Look in Prefetch cache */
-                       checkPreCache(node, &numoffset, 0, numoffset[i], oid[i], 0, i, oidnfound); 
+                       checkPreCache(node, numoffset, 0, numoffset[i], oid[i], 0, i, oidnfound); 
                }
 
        }
-       // Make machine groups
-       makePreGroups(node, numoffset);
+       /* Make machine groups */
+       head = makePreGroups(node, numoffset);
+       return head;
 }
 
+/* This function is called by the thread calling transPrefetch */
+void *transPrefetch(void *t) {
+       //int *offstarray = NULL;
+       prefetchqelem_t *qnode;
+       prefetchpile_t *pilehead = NULL;
+
+       while(1) {
+               /* lock mutex of primary prefetch queue */
+               pthread_mutex_lock(&pqueue.qlock);
+               /* while primary queue is empty, then wait */
+               while((pqueue.front == NULL) && (pqueue.rear == NULL)) {
+                       pthread_cond_wait(&pqueue.qcond, &pqueue.qlock);
+               }
+
+               /* dequeue node to create a machine piles and  finally unlock mutex */
+               if((qnode = dequeue()) == NULL) {
+                       printf("Error: No node returned %s, %d\n", __FILE__, __LINE__);
+                       return NULL;
+               }
+               pthread_mutex_unlock(&pqueue.qlock);
+               /* Reduce redundant prefetch requests */
+               checkPrefetchTuples(qnode);
+               /* Check if the tuples are found locally, if yes then reduce them further*/ 
+               /* and group requests by remote machine ids by calling the makePreGroups() */
+               pilehead = foundLocal(qnode);
+               
+               /* Lock mutex of pool queue */
+               pthread_mutex_lock(&mcqueue.qlock);
+               /* Update the pool queue with the new remote machine piles generated per prefetch call */
+               mcpileenqueue(pilehead);
+               /* Broadcast signal on pool queue */
+               pthread_cond_broadcast(&mcqueue.qcond);
+               /* Unlock mutex of pool queue */
+               pthread_mutex_unlock(&mcqueue.qlock);
+       }
+}
 
 /*This function is called by the thread that processes the 
  * prefetch request makes piles to prefetch records and prefetches the oids from remote machines */