From d4799bf6bcf4602ba351ea0f191ca14785fd085f Mon Sep 17 00:00:00 2001 From: adash Date: Fri, 13 Jul 2007 08:02:13 +0000 Subject: [PATCH] prefetch queue implementation with linked list and prefetch call TODO : some more testing --- Robust/src/Runtime/DSTM/interface/trans.c | 192 ++++++++++++++++++++-- 1 file changed, 176 insertions(+), 16 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index ad30d377..d36b585b 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -4,23 +4,26 @@ #include "mlookup.h" #include "llookup.h" #include "plookup.h" -#include -#include -#include -#include -#include +//#include "queue.h" +#include +#include +#include +#include +#include #include #include #include +#include #define LISTEN_PORT 2156 -#define MACHINE_IP "127.0.0.1" #define RECEIVE_BUFFER_SIZE 2048 +/* Global Variables */ extern int classsize[]; +//extern prefetchthreadqueue_t pfqueue; // Shared prefetch queue objstr_t *mainobjstore; plistnode_t *createPiles(transrecord_t *); -//int checkPrefetchTuples(int **, int *, short); +pthread_t tPrefetch; inline int arrayLength(int *array) { int i; @@ -29,6 +32,139 @@ inline int arrayLength(int *array) { return i; } +// DS that contains information to be shared between threads. +typedef struct prefetchqelem { + struct prefetchqelem *next; +} prefetchqelem_t; + +typedef struct primarypfq { + prefetchqelem_t *front, *rear; + pthread_mutex_t qlock; + pthread_cond_t qcond; +} primarypfq_t; + +primarypfq_t pqueue; //Global queue +int flag; //Global flag + +void prefetch(int ntuples, unsigned int *oids, short *endoffsets, short *arrayfields); + + +void queueInit(void) { + pqueue.front = pqueue.rear = NULL; + pthread_mutex_init(&pqueue.qlock, NULL); + pthread_cond_init(&pqueue.qcond, NULL); +} + +/* Removes the first element of the queue */ +void delqnode() { + prefetchqelem_t *delnode; + if((pqueue.front == NULL) && (pqueue.rear == NULL)) { + printf("The queue is empty: UNDERFLOW %s, %d\n", __FILE__, __LINE__); + return; + } else if (pqueue.front == pqueue.rear) { + free(pqueue.front); + pqueue.front = pqueue.rear = NULL; + } else { + delnode = pqueue.front; + pqueue.front = pqueue.front->next; + free(delnode); + } +} + +void queueDelete(void) { + /* Remove each element */ + while(pqueue.front != NULL) + delqnode(); + pqueue.front = pqueue.rear = NULL; +} + +/* Inserts to the rear of primary prefetch queue */ +void enqueue(prefetchqelem_t *qnode) { + if(pqueue.front == NULL && pqueue.rear == NULL) { + pqueue.front = pqueue.rear = qnode; + } else { + qnode->next = NULL; + pqueue.rear->next = qnode; + pqueue.rear = qnode; + } +} + +prefetchqelem_t *dequeue(void) { + prefetchqelem_t *retnode; + + if (pqueue.front == NULL) { + printf("Queue empty: Underflow %s, %d\n", __FILE__, __LINE__); + return NULL; + } + retnode = pqueue.front; + //TODO make this atomic + pqueue.front = pqueue.front->next; + + return retnode; +} + +void queuedisplay(primarypfq_t pqueue) { + while(pqueue.front != NULL) { + /* + int *ptr = (int *)(pqueue + offset); + int oid = *ptr; + printf("oid = %d", oid); + //printf("Oid = %d", pqueue.front->oids[0]); + pqueue.front = pqueue.front->next; + */ + } +} + +void prefetch(int ntuples, unsigned int *oids, short *endoffsets, short *arrayfields) { + int qnodesize; + int len = 0; + /* Create element for the queue */ + prefetchqelem_t *node; + qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(short) + sizeof(unsigned int)) + endoffsets[ntuples -1] * sizeof(short); + if((node = calloc(1,qnodesize)) == NULL) { + printf("Calloc Error %s, %d\n", __FILE__, __LINE__); + return; + } + len = sizeof(prefetchqelem_t); + memcpy(node + len, &ntuples, sizeof(int)); + len += sizeof(int); + memcpy(node + len, oids, ntuples*sizeof(unsigned int)); + len += ntuples * sizeof(unsigned int); + memcpy(node + len, endoffsets, ntuples*sizeof(short)); + len += ntuples * sizeof(short); + memcpy(node + len, arrayfields, endoffsets[ntuples-1]*sizeof(short)); + + pthread_mutex_lock(&pqueue.qlock); + /* Check if primary queue thread operating on this then wait */ + /* + if(flag) { + //TODO + } + */ + enqueue(node); + + pthread_mutex_unlock(&pqueue.qlock); +} + + +/* This function initiates the prefetch thread + * A queue is shared between the main thread of execution + * and the prefetch thread to process the prefetch call + * Call from compiler populates the shared queue with prefetch requests while prefetch thread + * processes the prefetch requests */ +void transInit() { + //Initialize shared queue + queueInit(); + //Create the prefetch thread + pthread_create(&tPrefetch, NULL, transPrefetch, NULL); +} + +/* This function stops the threads spawned */ +void transExit() { + pthread_cancel(tPrefetch); + return; +} + /* This functions inserts randowm wait delays in the order of msec */ void randomdelay(void) { @@ -42,11 +178,13 @@ void randomdelay(void) return; } +/* This function initializes things required in the transaction start*/ transrecord_t *transStart() { transrecord_t *tmp = malloc(sizeof(transrecord_t)); tmp->cache = objstrCreate(1048576); tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR); + return tmp; } @@ -60,7 +198,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) int size; void *buf; /* Search local cache */ - if((objheader =(objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){ + if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){ return(objheader); } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) { /* Look up in machine lookup table and copy into cache*/ @@ -86,6 +224,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) } } } + /* This function creates objects in the transaction record */ objheader_t *transCreateObj(transrecord_t *record, unsigned short type) { @@ -99,6 +238,7 @@ objheader_t *transCreateObj(transrecord_t *record, unsigned short type) chashInsert(record->lookupTable, tmp->oid, tmp); return tmp; } + /* This function creates machine piles based on all machines involved in a * transaction commit request */ plistnode_t *createPiles(transrecord_t *record) { @@ -147,9 +287,9 @@ plistnode_t *createPiles(transrecord_t *record) { curr = next; } } - return pile; } + /* This function initiates the transaction commit process * Spawns threads for each of the new connections with Participants * and creates new piles by calling the createPiles(), @@ -239,13 +379,13 @@ int transCommit(transrecord_t *record) { thread_data_array[threadnum].replyretry = &treplyretry; thread_data_array[threadnum].rec = record; /* If local do not create any extra connection */ - if(pile->local != 1) { + if(pile->local != 1) { /* Not local */ rc = pthread_create(&thread[threadnum], NULL, transRequest, (void *) &thread_data_array[threadnum]); if (rc) { perror("Error in pthread create\n"); return 1; } - } else { + } else { /*Local*/ /*Unset the pile->local flag*/ pile->local = 0; /*Set flag to identify that Local machine is involved*/ @@ -817,8 +957,28 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int return 0; } -/*This function makes piles to prefetch records and prefetches the oids from remote machines */ -int transPrefetch(transrecord_t *record, int *arrayofoffset[], short numoids){ +/* This function is called by the thread calling transPrefetch */ +void *transPrefetch(void *prefdata) { + compprefetchdata_t *ptr = (compprefetchdata_t *) prefdata; + int *offstarray = NULL; + + while(1) { + //TODO + + //Typecast prequest to a struct + //Gets requests from the compiler + //Creates and fills the shared queue DS + //Creates the threads that processes the transPrefetch call or + //wakes up the prefetchprocess thread if it is sleeping + + } +} + + + +/*This function is called by the thread that processes the + * prefetch request makes piles to prefetch records and prefetches the oids from remote machines */ +int transPrefetchProcess(transrecord_t *record, int *arrayofoffset[], short numoids){ int i, k = 0, rc; int arraylength[numoids]; unsigned int machinenumber; @@ -831,7 +991,7 @@ int transPrefetch(transrecord_t *record, int *arrayofoffset[], short numoids){ for(i = 0; i < numoids ; i++) { arraylength[i] = arrayLength(arrayofoffset[i]); } - /* Check for similar tuples or other special case tuples that can be combined to a + /* Check for similar tuples or other special case tuples that can be combined to a * prefetch message*/ if(checkPrefetchTuples(arrayofoffset, arraylength, numoids) != 0) { printf("Error on checkPrefetchTuples at %s, %d\n", __FILE__, __LINE__); @@ -876,7 +1036,7 @@ int transPrefetch(transrecord_t *record, int *arrayofoffset[], short numoids){ /* Fill thread data structure */ rc = pthread_create(&thread[i] , &attr, sendPrefetchReq, (void *) arrayofoffset[i]); if (rc) { - perror("Error in pthread create at transPrefetch()\n"); + perror("Error in pthread create at transPrefetchProcess()\n"); return 1; } @@ -887,7 +1047,7 @@ int transPrefetch(transrecord_t *record, int *arrayofoffset[], short numoids){ for (i = 0 ;i < numoids ; i++) { rc = pthread_join(thread[i], NULL); if (rc) { - perror("Error pthread_join() in transPrefetch()\n"); + perror("Error pthread_join() in transPrefetchProcess()\n"); return 1; } } -- 2.34.1