prefetch queue implementation with linked list
authoradash <adash>
Fri, 13 Jul 2007 08:02:13 +0000 (08:02 +0000)
committeradash <adash>
Fri, 13 Jul 2007 08:02:13 +0000 (08:02 +0000)
and prefetch call
TODO : some more testing

Robust/src/Runtime/DSTM/interface/trans.c

index ad30d377cd4e96be4d761e9053b563d32288abb6..d36b585b80bc42593421ccf935c4997790aa5812 100644 (file)
@@ -4,23 +4,26 @@
 #include "mlookup.h"
 #include "llookup.h"
 #include "plookup.h"
-#include<pthread.h>
-#include<sys/types.h>
-#include<sys/socket.h>
-#include<netdb.h>
-#include<netinet/in.h>
+//#include "queue.h"
+#include <pthread.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <netinet/in.h>
 #include <sys/types.h>
 #include <unistd.h>
 #include <time.h>
+#include <string.h>
 
 #define LISTEN_PORT 2156
-#define MACHINE_IP "127.0.0.1"
 #define RECEIVE_BUFFER_SIZE 2048
 
+/* Global Variables */
 extern int classsize[];
+//extern prefetchthreadqueue_t pfqueue; // Shared prefetch queue
 objstr_t *mainobjstore;
 plistnode_t *createPiles(transrecord_t *);
-//int checkPrefetchTuples(int **, int *, short);
+pthread_t tPrefetch;
 
 inline int arrayLength(int *array) {
        int i;
@@ -29,6 +32,139 @@ inline int arrayLength(int *array) {
        return i;
 }
 
+// DS that contains information to be shared between threads.
+typedef struct prefetchqelem {
+       struct prefetchqelem *next;
+} prefetchqelem_t;
+
+typedef struct primarypfq {
+       prefetchqelem_t *front, *rear;
+       pthread_mutex_t qlock;
+       pthread_cond_t qcond;
+} primarypfq_t; 
+
+primarypfq_t pqueue; //Global queue
+int flag; //Global flag
+
+void prefetch(int ntuples, unsigned int *oids, short *endoffsets, short *arrayfields);
+
+
+void queueInit(void) {
+       pqueue.front = pqueue.rear = NULL;
+       pthread_mutex_init(&pqueue.qlock, NULL);
+       pthread_cond_init(&pqueue.qcond, NULL);
+}
+
+/* Removes the first element of the queue */
+void delqnode() {
+       prefetchqelem_t *delnode;
+       if((pqueue.front == NULL) && (pqueue.rear == NULL)) {
+               printf("The queue is empty: UNDERFLOW %s, %d\n", __FILE__, __LINE__);
+               return;
+       } else if (pqueue.front == pqueue.rear) {
+               free(pqueue.front);
+               pqueue.front = pqueue.rear = NULL;
+       } else {
+               delnode = pqueue.front;
+               pqueue.front = pqueue.front->next;
+               free(delnode);
+       }
+}
+
+void queueDelete(void) {
+       /* Remove each element */
+       while(pqueue.front != NULL)
+               delqnode();
+       pqueue.front = pqueue.rear = NULL;
+}
+
+/* Inserts to the rear of primary prefetch queue */
+void enqueue(prefetchqelem_t *qnode) {
+       if(pqueue.front == NULL && pqueue.rear == NULL) {
+               pqueue.front = pqueue.rear = qnode;
+       } else {
+               qnode->next = NULL;
+               pqueue.rear->next = qnode;
+               pqueue.rear = qnode;
+       }
+}
+
+prefetchqelem_t *dequeue(void) {
+       prefetchqelem_t *retnode;
+
+       if (pqueue.front == NULL) {
+               printf("Queue empty: Underflow %s, %d\n", __FILE__, __LINE__);
+               return NULL;
+       }
+       retnode = pqueue.front;
+       //TODO make this atomic
+       pqueue.front = pqueue.front->next;
+
+       return retnode;
+}
+
+void queuedisplay(primarypfq_t pqueue) {
+       while(pqueue.front != NULL) {
+               /*
+               int *ptr = (int *)(pqueue + offset);
+               int oid = *ptr;
+               printf("oid = %d", oid);
+               //printf("Oid = %d", pqueue.front->oids[0]);
+               pqueue.front = pqueue.front->next;
+               */
+       }
+}
+
+void prefetch(int ntuples, unsigned int *oids, short *endoffsets, short *arrayfields) {
+       int qnodesize;
+       int len = 0;
+       /* Create element for the queue */
+       prefetchqelem_t *node;
+       qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(short) + sizeof(unsigned int)) + endoffsets[ntuples -1] * sizeof(short); 
+       if((node = calloc(1,qnodesize)) == NULL) {
+               printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
+               return;
+       }
+       len = sizeof(prefetchqelem_t);
+       memcpy(node + len, &ntuples, sizeof(int));
+       len += sizeof(int);
+       memcpy(node + len, oids, ntuples*sizeof(unsigned int));
+       len += ntuples * sizeof(unsigned int);
+       memcpy(node + len, endoffsets, ntuples*sizeof(short));
+       len += ntuples * sizeof(short);
+       memcpy(node + len, arrayfields, endoffsets[ntuples-1]*sizeof(short));
+
+       pthread_mutex_lock(&pqueue.qlock);
+       /* Check if primary queue thread operating on this then wait */
+       /*
+       if(flag) {
+               //TODO
+       }
+       */
+       enqueue(node);
+
+       pthread_mutex_unlock(&pqueue.qlock);
+}
+
+
+/* This function initiates the prefetch thread
+ * A queue is shared between the main thread of execution
+ * and the prefetch thread to process the prefetch call
+ * Call from compiler populates the shared queue with prefetch requests while prefetch thread
+ * processes the prefetch requests */
+void transInit() {
+       //Initialize shared queue
+       queueInit();
+       //Create the prefetch thread 
+       pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
+}
+
+/* This function stops the threads spawned */
+void transExit() {
+       pthread_cancel(tPrefetch);
+       return;
+}
+
 /* This functions inserts randowm wait delays in the order of msec */
 void randomdelay(void)
 {
@@ -42,11 +178,13 @@ void randomdelay(void)
        return;
 }
 
+/* This function initializes things required in the transaction start*/
 transrecord_t *transStart()
 {
        transrecord_t *tmp = malloc(sizeof(transrecord_t));
        tmp->cache = objstrCreate(1048576);
        tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
+       
        return tmp;
 }
 
@@ -60,7 +198,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid)
        int size;
        void *buf;
                /* Search local cache */
-       if((objheader =(objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
+       if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
                return(objheader);
        } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
                /* Look up in machine lookup table  and copy  into cache*/
@@ -86,6 +224,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid)
                }
        } 
 }
+
 /* This function creates objects in the transaction record */
 objheader_t *transCreateObj(transrecord_t *record, unsigned short type)
 {
@@ -99,6 +238,7 @@ objheader_t *transCreateObj(transrecord_t *record, unsigned short type)
        chashInsert(record->lookupTable, tmp->oid, tmp);
        return tmp;
 }
+
 /* This function creates machine piles based on all machines involved in a
  * transaction commit request */
 plistnode_t *createPiles(transrecord_t *record) {
@@ -147,9 +287,9 @@ plistnode_t *createPiles(transrecord_t *record) {
                        curr = next;
                }
        }
-
        return pile; 
 }
+
 /* This function initiates the transaction commit process
  * Spawns threads for each of the new connections with Participants 
  * and creates new piles by calling the createPiles(),
@@ -239,13 +379,13 @@ int transCommit(transrecord_t *record) {
                thread_data_array[threadnum].replyretry = &treplyretry;
                thread_data_array[threadnum].rec = record;
                /* If local do not create any extra connection */
-               if(pile->local != 1) {
+               if(pile->local != 1) { /* Not local */
                        rc = pthread_create(&thread[threadnum], NULL, transRequest, (void *) &thread_data_array[threadnum]);  
                        if (rc) {
                                perror("Error in pthread create\n");
                                return 1;
                        }
-               } else {
+               } else { /*Local*/
                        /*Unset the pile->local flag*/
                        pile->local = 0;
                        /*Set flag to identify that Local machine is involved*/
@@ -817,8 +957,28 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int
         return 0;
  }
 
-/*This function makes piles to prefetch records and prefetches the oids from remote machines */
-int transPrefetch(transrecord_t *record, int *arrayofoffset[], short numoids){
+/* This function is called by the thread calling transPrefetch */
+void *transPrefetch(void *prefdata) {
+       compprefetchdata_t *ptr = (compprefetchdata_t *) prefdata;
+       int *offstarray = NULL;
+
+       while(1) {
+               //TODO
+
+               //Typecast prequest to a struct 
+               //Gets requests from the compiler
+               //Creates and fills the shared queue DS 
+               //Creates the threads that processes the transPrefetch call or
+               //wakes up the prefetchprocess thread if it is sleeping
+
+       }
+}
+
+
+
+/*This function is called by the thread that processes the 
+ * prefetch request makes piles to prefetch records and prefetches the oids from remote machines */
+int transPrefetchProcess(transrecord_t *record, int *arrayofoffset[], short numoids){
        int i, k = 0, rc;
        int arraylength[numoids];
        unsigned int machinenumber;
@@ -831,7 +991,7 @@ int transPrefetch(transrecord_t *record, int *arrayofoffset[], short numoids){
        for(i = 0; i < numoids ; i++) {
                arraylength[i] = arrayLength(arrayofoffset[i]);
        }
-       /* Check for similar tuples or  other special case tuples that can be combined to a 
+       /* Check for similar tuples or other special case tuples that can be combined to a 
         * prefetch message*/
        if(checkPrefetchTuples(arrayofoffset, arraylength, numoids) != 0) {
                printf("Error on checkPrefetchTuples at %s, %d\n", __FILE__, __LINE__);
@@ -876,7 +1036,7 @@ int transPrefetch(transrecord_t *record, int *arrayofoffset[], short numoids){
                        /* Fill thread data structure */
                        rc = pthread_create(&thread[i] , &attr, sendPrefetchReq, (void *) arrayofoffset[i]);
                        if (rc) {
-                               perror("Error in pthread create at transPrefetch()\n");
+                               perror("Error in pthread create at transPrefetchProcess()\n");
                                return 1;
                        }
 
@@ -887,7 +1047,7 @@ int transPrefetch(transrecord_t *record, int *arrayofoffset[], short numoids){
        for (i = 0 ;i < numoids ; i++) {
                rc = pthread_join(thread[i], NULL);
                if (rc) {
-                       perror("Error pthread_join() in transPrefetch()\n");
+                       perror("Error pthread_join() in transPrefetchProcess()\n");
                        return 1;
                }
        }