From d94659160520a148559d5ac90d1cc29afdc8366e Mon Sep 17 00:00:00 2001 From: adash Date: Thu, 19 Jul 2007 16:34:15 +0000 Subject: [PATCH] a. Change queue implementation to linked list b. add new library for prefetch cache lookup c. Inittialize data structures d. prefetch call generated by compiler e. primary prefetch thread processing included --- Robust/src/Runtime/DSTM/interface/Makefile | 18 +- Robust/src/Runtime/DSTM/interface/dstm.h | 35 +- Robust/src/Runtime/DSTM/interface/prelookup.c | 202 +++++++ Robust/src/Runtime/DSTM/interface/prelookup.h | 35 ++ Robust/src/Runtime/DSTM/interface/queue.c | 148 +++-- Robust/src/Runtime/DSTM/interface/queue.h | 25 +- Robust/src/Runtime/DSTM/interface/testd-3.c | 54 +- Robust/src/Runtime/DSTM/interface/testd-4.c | 24 +- .../src/Runtime/DSTM/interface/testserver.c | 16 +- Robust/src/Runtime/DSTM/interface/trans.c | 548 ++++++++++-------- 10 files changed, 735 insertions(+), 370 deletions(-) create mode 100644 Robust/src/Runtime/DSTM/interface/prelookup.c create mode 100644 Robust/src/Runtime/DSTM/interface/prelookup.h diff --git a/Robust/src/Runtime/DSTM/interface/Makefile b/Robust/src/Runtime/DSTM/interface/Makefile index 7f38b40b..050152fd 100644 --- a/Robust/src/Runtime/DSTM/interface/Makefile +++ b/Robust/src/Runtime/DSTM/interface/Makefile @@ -1,22 +1,22 @@ d-3: - gcc -dr -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c + gcc -dr -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c demsky: - gcc -DDEBUG -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c + gcc -DDEBUG -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c d-4: - gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c + gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c all: - gcc -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c - gcc -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c - gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c + gcc -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c + gcc -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c + gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mac: - gcc -DMAC -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c - gcc -DMAC -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c - gcc -DMAC -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c + gcc -DMAC -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c + gcc -DMAC -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c + gcc -DMAC -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c clean: rm -rf d-3 d-4 demsky diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index cfcbecfd..c937a57b 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -32,12 +32,16 @@ #define OBJ_UNLOCK_BUT_VERSION_MATCH 15 #define VERSION_NO_MATCH 16 +//Max number of objects +#define MAX_OBJECTS 20 + #include #include #include #include #include "clookup.h" +#include "queue.h" #define DEFAULT_OBJ_STORE_SIZE 1048510 //1MB #define TID_LEN 20 @@ -113,7 +117,6 @@ typedef struct thread_data_array { int *count; //variable to count responses of TRANS_REQUEST protocol from all participants char *replyctrl; //shared ctrl message that stores the reply to be sent, filled by decideResp char *replyretry; //shared variable to find out if we need retry (TRANS_COMMIT case) -// char *localstatus; //shared variable to identify local requests transrecord_t *rec; // To send modified objects } thread_data_array_t; @@ -139,12 +142,22 @@ typedef struct member { //Structure for prefetching tuples generated by teh compiler - typedef struct trans_prefetchtuple{ - unsigned int depth; - unsigned int oid; - trans_member_t member; - struct trans_prefetchtuple *next; - }trans_prefetchtuple_t; + typedef struct prefetchpile{ + int mid; + int *oids; + + int **numofarrys; + struct prefetchpile *next; + }prefetchpile_t; + +//Structure per Oid in the prefetch call + +/* +//Structure that holds the compiler generated prefetch data +typedef struct compprefetchdata { + transrecord_t *record; +} compprefetchdata_t; +*/ /* Initialize main object store and lookup tables, start server thread. */ int dstmInit(void); @@ -183,5 +196,13 @@ char sendResponse(thread_data_array_t *, int); //Sends control message back to P void *getRemoteObj(transrecord_t *, unsigned int, unsigned int); int transAbortProcess(void *, unsigned int *, int, int, int); int transComProcess(trans_commit_data_t *); +void prefetch(int, unsigned int *, short *, short*); +void *transPrefetch(void *); +void checkPrefetchTuples(prefetchqelem_t *); +void foundLocal(prefetchqelem_t *); +void makePreGroups(prefetchqelem_t *node); +void checkPreCache(prefetchqelem_t *, int *, int, int, unsigned int, int, int, int); +int transPrefetchProcess(transrecord_t *, int **, short); +void *sendPrefetchReq(void *); /* end transactions */ #endif diff --git a/Robust/src/Runtime/DSTM/interface/prelookup.c b/Robust/src/Runtime/DSTM/interface/prelookup.c new file mode 100644 index 00000000..77733dfd --- /dev/null +++ b/Robust/src/Runtime/DSTM/interface/prelookup.c @@ -0,0 +1,202 @@ + #include "prelookup.h" + +prehashtable_t pflookup; //Global prefetch cache table + +unsigned int prehashCreate(unsigned int size, float loadfactor) { + prehashlistnode_t *nodes; + int i; + + // Allocate space for the hash table + if((nodes = calloc(size, sizeof(prehashlistnode_t))) == NULL) { + printf("Calloc error %s %d\n", __FILE__, __LINE__); + return 1; + } + + pflookup.table = nodes; + pflookup.size = size; + pflookup.numelements = 0; // Initial number of elements in the hash + pflookup.loadfactor = loadfactor; + + //Initialize mutex var + pthread_mutex_init(&pflookup.lock, NULL); + + return 0; +} + +//Assign keys to bins inside hash table +unsigned int prehashFunction(unsigned int key) { + return ( key % (pflookup.size)); +} + +//Store oids and their pointers into hash +unsigned int prehashInsert(unsigned int key, void *val) { + unsigned int newsize; + int index; + prehashlistnode_t *ptr, *node; + + if(pflookup.numelements > (pflookup.loadfactor * pflookup.size)) { + //Resize + newsize = 2 * pflookup.size + 1; + pthread_mutex_lock(&pflookup.lock); + prehashResize(newsize); + pthread_mutex_unlock(&pflookup.lock); + } + + ptr = pflookup.table; + pflookup.numelements++; + index = prehashFunction(key); + + pthread_mutex_lock(&pflookup.lock); + if(ptr[index].next == NULL && ptr[index].key == 0) { // Insert at the first position in the hashtable + ptr[index].key = key; + ptr[index].val = val; + } else { // Insert in the beginning of linked list + if ((node = calloc(1, sizeof(prehashlistnode_t))) == NULL) { + printf("Calloc error %s, %d\n", __FILE__, __LINE__); + pthread_mutex_unlock(&pflookup.lock); + return 1; + } + node->key = key; + node->val = val ; + node->next = ptr[index].next; + ptr[index].next = node; + } + pthread_mutex_unlock(&pflookup.lock); + return 0; +} + +// Search for an address for a given oid +void *prehashSearch(unsigned int key) { + int index; + prehashlistnode_t *ptr, *node; + + ptr = pflookup.table; + index = prehashFunction(key); + node = &ptr[index]; + pthread_mutex_lock(&pflookup.lock); + while(node != NULL) { + if(node->key == key) { + pthread_mutex_unlock(&pflookup.lock); + return node->val; + } + node = node->next; + } + pthread_mutex_unlock(&pflookup.lock); + return NULL; +} + +unsigned int prehashRemove(unsigned int key) { + int index; + prehashlistnode_t *curr, *prev; + prehashlistnode_t *ptr, *node; + + ptr = pflookup.table; + index = prehashFunction(key); + curr = &ptr[index]; + + pthread_mutex_lock(&pflookup.lock); + for (; curr != NULL; curr = curr->next) { + if (curr->key == key) { // Find a match in the hash table + pflookup.numelements--; // Decrement the number of elements in the global hashtable + if ((curr == &ptr[index]) && (curr->next == NULL)) { // Delete the first item inside the hashtable with no linked list of prehashlistnode_t + curr->key = 0; + curr->val = NULL; + } else if ((curr == &ptr[index]) && (curr->next != NULL)) { //Delete the first item with a linked list of prehashlistnode_t connected + curr->key = curr->next->key; + curr->val = curr->next->val; + node = curr->next; + curr->next = curr->next->next; + free(node); + } else { // Regular delete from linked listed + prev->next = curr->next; + free(curr); + } + pthread_mutex_unlock(&pflookup.lock); + return 0; + } + prev = curr; + } + pthread_mutex_unlock(&pflookup.lock); + return 1; +} + +unsigned int prehashResize(unsigned int newsize) { + prehashlistnode_t *node, *ptr, *curr, *next; // curr and next keep track of the current and the next chashlistnodes in a linked list + unsigned int oldsize; + int isfirst; // Keeps track of the first element in the prehashlistnode_t for each bin in hashtable + int i,index; + prehashlistnode_t *newnode; + + ptr = pflookup.table; + oldsize = pflookup.size; + + if((node = calloc(newsize, sizeof(prehashlistnode_t))) == NULL) { + printf("Calloc error %s %d\n", __FILE__, __LINE__); + return 1; + } + + pflookup.table = node; //Update the global hashtable upon resize() + pflookup.size = newsize; + pflookup.numelements = 0; + + for(i = 0; i < oldsize; i++) { //Outer loop for each bin in hash table + curr = &ptr[i]; + isfirst = 1; + while (curr != NULL) { //Inner loop to go through linked lists + if (curr->key == 0) { //Exit inner loop if there the first element for a given bin/index is NULL + break; //key = val =0 for element if not present within the hash table + } + next = curr->next; + index = prehashFunction(curr->key); + // Insert into the new table + if(pflookup.table[index].next == NULL && pflookup.table[index].key == 0) { + pflookup.table[index].key = curr->key; + pflookup.table[index].val = curr->val; + pflookup.numelements++; + }else { + if((newnode = calloc(1, sizeof(prehashlistnode_t))) == NULL) { + printf("Calloc error %s, %d\n", __FILE__, __LINE__); + return 1; + } + newnode->key = curr->key; + newnode->val = curr->val; + newnode->next = pflookup.table[index].next; + pflookup.table[index].next = newnode; + pflookup.numelements++; + } + + //free the linked list of prehashlistnode_t if not the first element in the hash table + if (isfirst != 1) { + free(curr); + } + + isfirst = 0; + curr = next; + } + } + + free(ptr); //Free the memory of the old hash table + return 0; +} + +/* Deletes the prefetch Cache */ +void prehashDelete() { + int i, isFirst; + prehashlistnode_t *ptr, *curr, *next; + ptr = pflookup.table; + + for(i=0 ; inext; + if(isFirst != 1) { + free(curr); + } + isFirst = 0; + curr = next; + } + } + + free(ptr); +} diff --git a/Robust/src/Runtime/DSTM/interface/prelookup.h b/Robust/src/Runtime/DSTM/interface/prelookup.h new file mode 100644 index 00000000..0f68d20b --- /dev/null +++ b/Robust/src/Runtime/DSTM/interface/prelookup.h @@ -0,0 +1,35 @@ +#ifndef _PRELOOKUP_H_ +#define _PRELOOKUP_H_ + +#include +#include +#include + +#define LOADFACTOR 0.75 +#define HASH_SIZE 100 + +typedef struct prehashlistnode { + unsigned int key; + void *val; //this can be cast to another type or used to point to a larger structure + struct prehashlistnode *next; +} prehashlistnode_t; + +typedef struct prehashtable { + prehashlistnode_t *table; // points to beginning of hash table + unsigned int size; + unsigned int numelements; + float loadfactor; + pthread_mutex_t lock; +} prehashtable_t; + +/* Prototypes for hash*/ +unsigned int prehashCreate(unsigned int size, float loadfactor); +unsigned int prehashFunction(unsigned int key); +unsigned int prehashInsert(unsigned int key, void *val); +void *prehashSearch(unsigned int key); //returns val, NULL if not found +unsigned int prehashRemove(unsigned int key); //returns -1 if not found +unsigned int prehashResize(unsigned int newsize); +/* end hash */ + +#endif + diff --git a/Robust/src/Runtime/DSTM/interface/queue.c b/Robust/src/Runtime/DSTM/interface/queue.c index fc12eaab..f164528f 100644 --- a/Robust/src/Runtime/DSTM/interface/queue.c +++ b/Robust/src/Runtime/DSTM/interface/queue.c @@ -1,83 +1,103 @@ #include "queue.h" -prefetchthreadqueue_t queue; //Global shared prefetch queue +primarypfq_t pqueue; //Global queue -void queueInsert(int *array) { - pthread_mutex_lock(&queue.qlock); - queue.rear = queue.rear % ARRAY_SIZE; - if(queue.front == queue.rear && queue.buffer[queue.front] != NULL) { - printf("The Circular Queue is Full : OVERFLOW\n"); - pthread_mutex_unlock(&queue.qlock); +void queueInit(void) { + /* Intitialize primary thread */ + pqueue.front = pqueue.rear = NULL; + pthread_mutex_init(&pqueue.qlock, NULL); + pthread_cond_init(&pqueue.qcond, NULL); +} + +/* Removes the first element of the queue */ +void delqnode() { + prefetchqelem_t *delnode; + if((pqueue.front == NULL) && (pqueue.rear == NULL)) { + printf("The queue is empty: UNDERFLOW %s, %d\n", __FILE__, __LINE__); return; + } else if ((pqueue.front == pqueue.rear) && pqueue.front != NULL && pqueue.rear != NULL) { + printf("TEST1\n"); + free(pqueue.front); + pqueue.front = pqueue.rear = NULL; } else { - queue.buffer[queue.rear] = array; - queue.rear++; + delnode = pqueue.front; + pqueue.front = pqueue.front->next; + printf("TEST2\n"); + free(delnode); } - pthread_mutex_unlock(&queue.qlock); } -int *queueDelete() { - int *i; - i = NULL; - pthread_mutex_lock(&queue.qlock); - if(queue.front == queue.rear && queue.buffer[queue.front] == NULL) { - printf("The Circular Queue is Empty : UNDERFLOW\n"); - pthread_mutex_unlock(&queue.qlock); - return NULL; +void queueDelete(void) { + /* Remove each element */ + while(pqueue.front != NULL) + delqnode(); + pqueue.front = pqueue.rear = NULL; +} + +/* Inserts to the rear of primary prefetch queue */ +void enqueue(prefetchqelem_t *qnode) { + if(pqueue.front == NULL && pqueue.rear == NULL) { + pqueue.front = pqueue.rear = qnode; } else { - i = queue.buffer[queue.front]; - queue.buffer[queue.front] = NULL; - queue.front++; - queue.front = queue.front % ARRAY_SIZE; - pthread_mutex_unlock(&queue.qlock); - return i; + qnode->next = NULL; + pqueue.rear->next = qnode; + pqueue.rear = qnode; } } -void queueInit() { - int i; - queue.front = 0; - queue.rear = 0; - for(i = 0; i< ARRAY_SIZE; i++) - queue.buffer[i] = NULL; - /* Initialize the pthread_mutex variable */ - pthread_mutex_init(&queue.qlock, NULL); +prefetchqelem_t *dequeue(void) { + prefetchqelem_t *retnode; + if (pqueue.front == NULL) { + printf("Queue empty: Underflow %s, %d\n", __FILE__, __LINE__); + return NULL; + } + retnode = pqueue.front; + //TODO make this atomic + pqueue.front = pqueue.front->next; + + return retnode; } -/* For testing purposes */ +void queueDisplay() { + int offset = sizeof(prefetchqelem_t); + int *ptr; + int ntuples; + char *ptr1; + prefetchqelem_t *tmp = pqueue.front; + while(tmp != NULL) { + ptr1 = (char *) tmp; + ptr = (int *)(ptr1 + offset); + ntuples = *ptr; + printf("Number of tuples = %d\n", ntuples); + tmp = tmp->next; + } +} + + #if 0 main() { - int *d; - queueIsEmpty(); - int a[] = {5, 2, 8, -1}; - int b[] = {11, 8, 4, 19, -1}; - int c[] = {16, 8, 4, -1}; - printf("Front = %d, Rear = %d\n",queue.front, queue.rear); - d = queueDelete(); - printf("Front = %d, Rear = %d\n",queue.front, queue.rear); - queueInsert(a); - printf("Enqueued ptr is %x\n", a); - printf("1st Insert Front = %d, Rear = %d\n",queue.front, queue.rear); - queueInsert(b); - printf("Enqueued ptr is %x\n", b); - printf("2nd Insert Front = %d, Rear = %d\n",queue.front, queue.rear); - queueInsert(c); - printf("3rd Insert Front = %d, Rear = %d\n",queue.front, queue.rear); - d = queueDelete(); - printf("Dequeued ptr is %x\n", d); - printf("After 1st del Front = %d, Rear = %d\n",queue.front, queue.rear); - queueInsert(c); - printf("Enqueued ptr is %x\n", c); - printf("After 4th insert Front = %d, Rear = %d\n",queue.front, queue.rear); - d = queueDelete(); - printf("Dequeued ptr is %x\n", d); - printf("After 2nd del Front = %d, Rear = %d\n",queue.front, queue.rear); - d = queueDelete(); - printf("Dequeued ptr is %x\n", d); - printf("After 3rd del Front = %d, Rear = %d\n",queue.front, queue.rear); - d = queueDelete(); - printf("Dequeued ptr is %x\n", d); - printf("After 4th del Front = %d, Rear = %d\n",queue.front, queue.rear); + unsigned int oids[] = {11, 13}; + short endoffsets[] = {2, 5}; + short arrayfields[] = {2, 2, 1, 5, 6}; + queueInit(); + queueDisplay(); + prefetch(2, oids, endoffsets, arrayfields); + queueDisplay(); + unsigned int oids1[] = {21, 23, 25, 27}; + short endoffsets1[] = {1, 2, 3, 4}; + short arrayfields1[] = {3, 2, 1, 3}; + prefetch(4, oids1, endoffsets1, arrayfields1); + queueDisplay(); + delqnode(); + queueDisplay(); + delqnode(); + queueDisplay(); + delqnode(); + queueDisplay(); + delqnode(); + } #endif + + diff --git a/Robust/src/Runtime/DSTM/interface/queue.h b/Robust/src/Runtime/DSTM/interface/queue.h index 9ad5df45..ea7dba2d 100644 --- a/Robust/src/Runtime/DSTM/interface/queue.h +++ b/Robust/src/Runtime/DSTM/interface/queue.h @@ -4,19 +4,24 @@ #include #include #include - -#define ARRAY_SIZE 20 +#include // DS that contains information to be shared between threads. -typedef struct prefetchthreadqueue { - int *buffer[ARRAY_SIZE]; - int front; - int rear; +typedef struct prefetchqelem { + struct prefetchqelem *next; +} prefetchqelem_t; + +typedef struct primarypfq { + prefetchqelem_t *front, *rear; pthread_mutex_t qlock; -} prefetchthreadqueue_t; + pthread_cond_t qcond; +} primarypfq_t; -void queueInsert(int *); -int *queueDelete(); -void queueInit(); //Initializes the queue and qlock mutex +void queueInit(void); +void delqnode(); +void queueDelete(void); +void enqueue(prefetchqelem_t *qnode); +prefetchqelem_t *dequeue(void); +void queueDisplay(); #endif diff --git a/Robust/src/Runtime/DSTM/interface/testd-3.c b/Robust/src/Runtime/DSTM/interface/testd-3.c index 1dfcea0c..d3261a21 100644 --- a/Robust/src/Runtime/DSTM/interface/testd-3.c +++ b/Robust/src/Runtime/DSTM/interface/testd-3.c @@ -133,7 +133,7 @@ int test2a(void) { lhashInsert(header->oid, mid); //Inserting into lhashtable - mid = iptoMid("128.200.9.30"); //d-4.eecs.uci.edu + mid = iptoMid("128.195.175.69"); //dw-1.eecs.uci.edu lhashInsert(31, mid); lhashInsert(32, mid); @@ -148,9 +148,9 @@ int test2a(void) { //Check if machine demsky is up and running checkServer(mid, "128.200.9.10"); - mid = iptoMid("128.200.9.30"); - //Check if machine d-4 is up and running - checkServer(mid, "128.200.9.30"); + mid = iptoMid("128.195.175.69"); + //Check if machine dw-1 is up and running + checkServer(mid, "128.195.175.69"); // Start Transaction myTrans = transStart(); @@ -222,7 +222,7 @@ int test2b(void) { lhashInsert(header->oid, mid); //Inserting into lhashtable - mid = iptoMid("128.200.9.30"); //d-4.eecs.uci.edu + mid = iptoMid("128.195.175.69"); //dw-1.eecs.uci.edu lhashInsert(31, mid); lhashInsert(32, mid); lhashInsert(33, mid); @@ -238,9 +238,9 @@ int test2b(void) { //Check if machine demsky is up and running checkServer(mid, "128.200.9.10"); - mid = iptoMid("128.200.9.30"); - //Check if machine d-4 is up and running - checkServer(mid, "128.200.9.30"); + mid = iptoMid("128.195.175.69"); + //Check if machine dw-1 is up and running + checkServer(mid, "128.195.175.69"); // Start Transaction myTrans = transStart(); @@ -259,7 +259,7 @@ int test2b(void) { printf("Object not found\n"); } - //read object 32 (found on d-4) + //read object 32 (found on dw-1) if((h4 = transRead(myTrans, 32)) == NULL) { printf("Object not found\n"); } @@ -378,7 +378,7 @@ int test5(void) { lhashInsert(header->oid, mid); //Inserting into lhashtable - mid = iptoMid("128.200.9.30"); //d-4.eecs.uci.edu + mid = iptoMid("128.195.175.69"); //dw-1.eecs.uci.edu lhashInsert(31, mid); lhashInsert(32, mid); lhashInsert(33, mid); @@ -393,9 +393,9 @@ int test5(void) { pthread_create(&thread_Listen, &attr, dstmListen, NULL); //Check if machine demsky is up and running checkServer(mid, "128.200.9.10"); - mid = iptoMid("128.200.9.30"); - //Check if machine d-4 is up and running - checkServer(mid, "128.200.9.30"); + mid = iptoMid("128.195.175.69"); + //Check if machine dw-1 is up and running + checkServer(mid, "128.195.175.69"); // Start Transaction myTrans = transStart(); @@ -405,11 +405,11 @@ int test5(void) { printf("Object not found\n"); } //read object 2 (found on demksy) - if((h1 = transRead(myTrans,2)) == NULL){ + if((h2 = transRead(myTrans,2)) == NULL){ printf("Object not found\n"); } - //read object 31 (found on d-4) - if((h2 = transRead(myTrans, 31)) == NULL) { + //read object 22 (found locally ) + if((h3 = transRead(myTrans, 22)) == NULL) { printf("Object not found\n"); } @@ -449,7 +449,7 @@ int test5a(void) { header = (objheader_t *) objstrAlloc(mainobjstore, size); header->oid = 21; header->type = 1; - //read object 31 (found on d-4) + //read object 31 (found on dw-1) if((h2 = transRead(myTrans, 31)) == NULL) { printf("Object not found\n"); } @@ -512,7 +512,7 @@ int test5b(void) { lhashInsert(header->oid, mid); //Inserting into lhashtable - mid = iptoMid("128.200.9.30"); //d-4.eecs.uci.edu + mid = iptoMid("128.195.175.69"); //dw-1.eecs.uci.edu lhashInsert(31, mid); lhashInsert(32, mid); lhashInsert(33, mid); @@ -528,9 +528,9 @@ int test5b(void) { //Check if machine demsky is up and running checkServer(mid, "128.200.9.10"); - mid = iptoMid("128.200.9.30"); - //Check if machine d-4 is up and running - checkServer(mid, "128.200.9.30"); + mid = iptoMid("128.195.175.69"); + //Check if machine dw-1 is up and running + checkServer(mid, "128.195.175.69"); // Start Transaction myTrans = transStart(); @@ -539,7 +539,7 @@ int test5b(void) { if((h1 = transRead(myTrans, 1)) == NULL){ printf("Object not found\n"); } - //read object 31 (found on d-4) + //read object 31 (found on dw-1) if((h2 = transRead(myTrans, 31)) == NULL) { printf("Object not found\n"); } @@ -612,7 +612,7 @@ int test7(void) { lhashInsert(header->oid, mid); //Inserting into lhashtable - mid = iptoMid("128.200.9.30"); //d-4.eecs.uci.edu + mid = iptoMid("128.195.175.69"); //dw-1.eecs.uci.edu lhashInsert(31, mid); lhashInsert(32, mid); lhashInsert(33, mid); @@ -628,9 +628,9 @@ int test7(void) { //Check if machine demsky is up and running checkServer(mid, "128.200.9.10"); - mid = iptoMid("128.200.9.30"); - //Check if machine d-4 is up and running - checkServer(mid, "128.200.9.30"); + mid = iptoMid("128.195.175.69"); + //Check if machine dw-1 is up and running + checkServer(mid, "128.195.175.69"); // Start Transaction myTrans = transStart(); @@ -639,7 +639,7 @@ int test7(void) { if((h1 = transRead(myTrans, 3)) == NULL){ printf("Object not found\n"); } - //read object 32 (found on d-4) + //read object 32 (found on dw-1) if((h2 = transRead(myTrans, 32)) == NULL) { printf("Object not found\n"); } diff --git a/Robust/src/Runtime/DSTM/interface/testd-4.c b/Robust/src/Runtime/DSTM/interface/testd-4.c index 9876bdb2..26dba3a9 100644 --- a/Robust/src/Runtime/DSTM/interface/testd-4.c +++ b/Robust/src/Runtime/DSTM/interface/testd-4.c @@ -41,7 +41,7 @@ int test1() { header = (objheader_t *) objstrAlloc(mainobjstore, size); init_obj(header, 31, 2, 1, 0, NEW); mhashInsert(header->oid, header); - mid = iptoMid("128.200.9.30"); + mid = iptoMid("128.195.175.69"); lhashInsert(header->oid, mid); //Create and Insert Oid 32 @@ -49,7 +49,7 @@ int test1() { header = (objheader_t *) objstrAlloc(mainobjstore, size); init_obj(header, 32, 1, 1, 0, NEW); mhashInsert(header->oid, header); - mid = iptoMid("128.200.9.30"); + mid = iptoMid("128.195.175.69"); lhashInsert(header->oid, mid); //Create and Insert Oid 33 @@ -57,7 +57,7 @@ int test1() { header = (objheader_t *) objstrAlloc(mainobjstore, size); init_obj(header, 33, 0, 1, 0, NEW); mhashInsert(header->oid, header); - mid = iptoMid("128.200.9.30"); + mid = iptoMid("128.195.175.69"); lhashInsert(header->oid, mid); //Inserting into lhashtable into d-3.eecs @@ -104,7 +104,7 @@ int test2() { header = (objheader_t *) objstrAlloc(mainobjstore, size); init_obj(header, 31, 2, 1, 0, NEW); mhashInsert(header->oid, header); - mid = iptoMid("128.200.9.30"); + mid = iptoMid("128.195.175.69"); lhashInsert(header->oid, mid); //Create and Insert Oid 32 @@ -112,7 +112,7 @@ int test2() { header = (objheader_t *) objstrAlloc(mainobjstore, size); init_obj(header, 32, 1, 1, 0, NEW); mhashInsert(header->oid, header); - mid = iptoMid("128.200.9.30"); + mid = iptoMid("128.195.175.69"); lhashInsert(header->oid, mid); //Create and Insert Oid 33 @@ -120,7 +120,7 @@ int test2() { header = (objheader_t *) objstrAlloc(mainobjstore, size); init_obj(header, 33, 0, 1, 0, NEW); mhashInsert(header->oid, header); - mid = iptoMid("128.200.9.30"); + mid = iptoMid("128.195.175.69"); lhashInsert(header->oid, mid); //Inserting into lhashtable into d-3.eecs @@ -190,7 +190,7 @@ int test3() { header = (objheader_t *) objstrAlloc(mainobjstore, size); init_obj(header, 31, 2, 1, 0, NEW); mhashInsert(header->oid, header); - mid = iptoMid("128.200.9.30"); + mid = iptoMid("128.195.175.69"); lhashInsert(header->oid, mid); //Create and Insert Oid 32 @@ -198,7 +198,7 @@ int test3() { header = (objheader_t *) objstrAlloc(mainobjstore, size); init_obj(header, 32, 1, 1, 0, NEW); mhashInsert(header->oid, header); - mid = iptoMid("128.200.9.30"); + mid = iptoMid("128.195.175.69"); lhashInsert(header->oid, mid); //Create and Insert Oid 33 @@ -206,7 +206,7 @@ int test3() { header = (objheader_t *) objstrAlloc(mainobjstore, size); init_obj(header, 33, 0, 1, 0, NEW); mhashInsert(header->oid, header); - mid = iptoMid("128.200.9.30"); + mid = iptoMid("128.195.175.69"); lhashInsert(header->oid, mid); //Inserting into lhashtable into d-3.eecs @@ -272,7 +272,7 @@ int test4() { header = (objheader_t *) objstrAlloc(mainobjstore, size); init_obj(header, 31, 2, 1, 0, NEW); mhashInsert(header->oid, header); - mid = iptoMid("128.200.9.30"); + mid = iptoMid("128.195.175.69"); lhashInsert(header->oid, mid); //Create and Insert Oid 32 @@ -280,7 +280,7 @@ int test4() { header = (objheader_t *) objstrAlloc(mainobjstore, size); init_obj(header, 32, 1, 1, 0, NEW); mhashInsert(header->oid, header); - mid = iptoMid("128.200.9.30"); + mid = iptoMid("128.195.175.69"); lhashInsert(header->oid, mid); //Create and Insert Oid 33 @@ -288,7 +288,7 @@ int test4() { header = (objheader_t *) objstrAlloc(mainobjstore, size); init_obj(header, 33, 0, 1, 0, NEW); mhashInsert(header->oid, header); - mid = iptoMid("128.200.9.30"); + mid = iptoMid("128.195.175.69"); lhashInsert(header->oid, mid); //Inserting into lhashtable into d-3.eecs diff --git a/Robust/src/Runtime/DSTM/interface/testserver.c b/Robust/src/Runtime/DSTM/interface/testserver.c index 61e397db..85d3b920 100644 --- a/Robust/src/Runtime/DSTM/interface/testserver.c +++ b/Robust/src/Runtime/DSTM/interface/testserver.c @@ -44,7 +44,7 @@ void init_obj(objheader_t *h, unsigned int oid, unsigned short type, \ int main() { - //test1(); +// test1(); // test3(); test4(); } @@ -189,15 +189,15 @@ int test3() { lhashInsert(21, mid); lhashInsert(22, mid); - mid = iptoMid("128.200.9.30"); //d-4.eecs.uci.edu + mid = iptoMid("128.195.175.69"); //dw-1.eecs.uci.edu //Inserting into lhashtable lhashInsert(31, mid); lhashInsert(32, mid); lhashInsert(33, mid); pthread_create(&thread_Listen, &attr, dstmListen, NULL); - //Check if machine d-4 is up and running - checkServer(mid, "128.200.9.30"); + //Check if machine dw-1 is up and running + checkServer(mid, "128.195.175.69"); mid = iptoMid("128.200.9.29"); //Check if machine d-3 is up and running checkServer(mid, "128.200.9.29"); @@ -281,15 +281,15 @@ int test4() { lhashInsert(21, mid); lhashInsert(22, mid); - mid = iptoMid("128.200.9.30"); //d-4.eecs.uci.edu + mid = iptoMid("128.195.175.69"); //dw-1.eecs.uci.edu //Inserting into lhashtable lhashInsert(31, mid); lhashInsert(32, mid); lhashInsert(33, mid); pthread_create(&thread_Listen, &attr, dstmListen, NULL); - //Check if machine d-4 is up and running - checkServer(mid, "128.200.9.30"); + //Check if machine dw-1 is up and running + checkServer(mid, "128.195.175.69"); mid = iptoMid("128.200.9.29"); //Check if machine d-3 is up and running checkServer(mid, "128.200.9.29"); @@ -306,7 +306,7 @@ int test4() { if((h2 = transRead(myTrans, 1)) == NULL) { printf("Object not found\n"); } - //read object 31(present in d-4 machine) + //read object 31(present in dw-1 machine) if((h3 = transRead(myTrans, 31)) == NULL) { printf("Object not found\n"); } diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index d36b585b..2bf2d019 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -4,7 +4,7 @@ #include "mlookup.h" #include "llookup.h" #include "plookup.h" -//#include "queue.h" +#include "queue.h" #include #include #include @@ -14,117 +14,47 @@ #include #include #include +#include #define LISTEN_PORT 2156 #define RECEIVE_BUFFER_SIZE 2048 +#define NUM_THREADS 10 +#define PREFETCH_CACHE_SIZE 1048576 //1MB + +#define GET_NTUPLES(x) ((int *)(x + sizeof(prefetchqelem_t))) +#define GET_PTR_OID(x) ((unsigned int *)(x + sizeof(prefetchqelem_t) + sizeof(int))) +#define GET_PTR_EOFF(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int)))) +#define GET_PTR_ARRYFLD(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int)) + (n*sizeof(short)))) /* Global Variables */ extern int classsize[]; -//extern prefetchthreadqueue_t pfqueue; // Shared prefetch queue -objstr_t *mainobjstore; -plistnode_t *createPiles(transrecord_t *); +extern primarypfq_t pqueue; // shared prefetch queue +pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue pthread_t tPrefetch; +extern objstr_t *mainobjstore; +objstr_t *prefetchcache; +plistnode_t *createPiles(transrecord_t *); inline int arrayLength(int *array) { int i; for(i=0 ;array[i] != -1; i++) ; return i; } - -// DS that contains information to be shared between threads. -typedef struct prefetchqelem { - struct prefetchqelem *next; -} prefetchqelem_t; - -typedef struct primarypfq { - prefetchqelem_t *front, *rear; - pthread_mutex_t qlock; - pthread_cond_t qcond; -} primarypfq_t; - -primarypfq_t pqueue; //Global queue -int flag; //Global flag - -void prefetch(int ntuples, unsigned int *oids, short *endoffsets, short *arrayfields); - - -void queueInit(void) { - pqueue.front = pqueue.rear = NULL; - pthread_mutex_init(&pqueue.qlock, NULL); - pthread_cond_init(&pqueue.qcond, NULL); -} - -/* Removes the first element of the queue */ -void delqnode() { - prefetchqelem_t *delnode; - if((pqueue.front == NULL) && (pqueue.rear == NULL)) { - printf("The queue is empty: UNDERFLOW %s, %d\n", __FILE__, __LINE__); - return; - } else if (pqueue.front == pqueue.rear) { - free(pqueue.front); - pqueue.front = pqueue.rear = NULL; - } else { - delnode = pqueue.front; - pqueue.front = pqueue.front->next; - free(delnode); - } -} - -void queueDelete(void) { - /* Remove each element */ - while(pqueue.front != NULL) - delqnode(); - pqueue.front = pqueue.rear = NULL; -} - -/* Inserts to the rear of primary prefetch queue */ -void enqueue(prefetchqelem_t *qnode) { - if(pqueue.front == NULL && pqueue.rear == NULL) { - pqueue.front = pqueue.rear = qnode; - } else { - qnode->next = NULL; - pqueue.rear->next = qnode; - pqueue.rear = qnode; - } -} - -prefetchqelem_t *dequeue(void) { - prefetchqelem_t *retnode; - - if (pqueue.front == NULL) { - printf("Queue empty: Underflow %s, %d\n", __FILE__, __LINE__); - return NULL; - } - retnode = pqueue.front; - //TODO make this atomic - pqueue.front = pqueue.front->next; - - return retnode; -} - -void queuedisplay(primarypfq_t pqueue) { - while(pqueue.front != NULL) { - /* - int *ptr = (int *)(pqueue + offset); - int oid = *ptr; - printf("oid = %d", oid); - //printf("Oid = %d", pqueue.front->oids[0]); - pqueue.front = pqueue.front->next; - */ - } -} - +/* This function is a prefetch call generated by the compiler that + * populates the shared primary prefetch queue*/ void prefetch(int ntuples, unsigned int *oids, short *endoffsets, short *arrayfields) { int qnodesize; int len = 0; - /* Create element for the queue */ - prefetchqelem_t *node; - qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(short) + sizeof(unsigned int)) + endoffsets[ntuples -1] * sizeof(short); + + /* Allocate for the queue node*/ + char *node; + qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short); if((node = calloc(1,qnodesize)) == NULL) { printf("Calloc Error %s, %d\n", __FILE__, __LINE__); return; } + /* Set queue node values */ len = sizeof(prefetchqelem_t); memcpy(node + len, &ntuples, sizeof(int)); len += sizeof(int); @@ -133,39 +63,51 @@ void prefetch(int ntuples, unsigned int *oids, short *endoffsets, short *arrayfi memcpy(node + len, endoffsets, ntuples*sizeof(short)); len += ntuples * sizeof(short); memcpy(node + len, arrayfields, endoffsets[ntuples-1]*sizeof(short)); - + /* Lock and insert into primary prefetch queue */ pthread_mutex_lock(&pqueue.qlock); - /* Check if primary queue thread operating on this then wait */ - /* - if(flag) { - //TODO - } - */ - enqueue(node); - + enqueue((prefetchqelem_t *)node); + pthread_cond_signal(&pqueue.qcond); pthread_mutex_unlock(&pqueue.qlock); } - /* This function initiates the prefetch thread * A queue is shared between the main thread of execution * and the prefetch thread to process the prefetch call * Call from compiler populates the shared queue with prefetch requests while prefetch thread * processes the prefetch requests */ void transInit() { - //Initialize shared queue + int t, rc; + //Create and initialize prefetch cache structure + prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE); + //Create prefetch cache lookup table + if(prehashCreate(HASH_SIZE, LOADFACTOR)) + return; //Failure + //Initialize primary shared queue queueInit(); - //Create the prefetch thread + //Create the primary prefetch thread pthread_create(&tPrefetch, NULL, transPrefetch, NULL); + //Create and Initialize a pool of threads + for(t = 0; t< NUM_THREADS; t++) { + //rc = pthread_create(&wthreads[t], NULL, pfqProcess, (void *)t); + if (rc) { + printf("Thread create error %s, %d\n", __FILE__, __LINE__); + return; + } + } } /* This function stops the threads spawned */ void transExit() { + int t; pthread_cancel(tPrefetch); + for(t = 0; t < NUM_THREADS; t++) + pthread_cancel(wthreads[t]); + return; } -/* This functions inserts randowm wait delays in the order of msec */ +/* This functions inserts randowm wait delays in the order of msec + * Mostly used when transaction commits retry*/ void randomdelay(void) { struct timespec req, rem; @@ -959,22 +901,284 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int /* This function is called by the thread calling transPrefetch */ void *transPrefetch(void *prefdata) { - compprefetchdata_t *ptr = (compprefetchdata_t *) prefdata; int *offstarray = NULL; + prefetchqelem_t *qnode; while(1) { - //TODO + /* lock mutex of primary prefetch queue */ + pthread_mutex_lock(&pqueue.qlock); + /* while primary queue is empty, then wait */ + while((pqueue.front == NULL) && (pqueue.rear == NULL)) { + pthread_cond_wait(&pqueue.qcond, &pqueue.qlock); + } + + /* dequeue node to create a machine piles and finally unlock mutex */ + if((qnode = dequeue()) == NULL) { + printf("Error: No node returned %s, %d\n", __FILE__, __LINE__); + return NULL; + } + pthread_mutex_unlock(&pqueue.qlock); + /* Reduce redundant prefetch requests */ + /* Group Requests by where objects are located */ + + + + } +} + +/* This function checks if the prefetch oids are same and have same offsets + * for case x.a.b and y.a.b where x and y have same oid's + * or if a.b.c is a subset of x.b.c.d*/ +/* check for case where the generated request a.y.z or x.y.z.g then + * prefetch needs to be generated for x.y.z.g if oid of a and x are same*/ +void checkPrefetchTuples(prefetchqelem_t *node) { + int i,j, count,k, sindex, index; + char *ptr, *tmp; + int ntuples, slength; + unsigned int *oid; + short *endoffsets, *arryfields; + + /* Check for the case x.y.z and a.b.c are same oids */ + ptr = (char *) node; + ntuples = *(GET_NTUPLES(ptr)); + oid = GET_PTR_OID(ptr); + endoffsets = GET_PTR_EOFF(ptr, ntuples); + arryfields = GET_PTR_ARRYFLD(ptr, ntuples); + /* Find offset length for each tuple */ + int numoffset[ntuples]; + numoffset[0] = endoffsets[0]; + for(i = 1; i numoffset[j]){ + slength = numoffset[j]; + sindex = j; + } + else { + slength = numoffset[i]; + sindex = i; + } + + /* Compare the offset values based on the current indices + * break if they do not match + * if all offset values match then pick the largest tuple*/ + + if(i == 0) { + k = 0; + index = endoffsets[j -1]; + for(count = 0; count < slength; count ++) { + if (arryfields[k] != arryfields[index]) { + break; + } + index++; + k++; + } + } else { + printf("i = %d, j = %d\n", i, j); + k = endoffsets[i-1]; + index = endoffsets[j-1]; + printf("Value of slength = %d\n", slength); + for(count = 0; count < slength; count++) { + printf("Value of count =%d\n", count); + if(arryfields[k] != arryfields[index]) { + break; + } + index++; + k++; + } + printf("Value of count =%d\n", count); + } + printf("The value of sindex = %d\n", sindex); + + if(slength == count) { + printf("DEBUG-> Inside slength if %d\n", sindex); + oid[sindex] = -1; + } + } + } + } +} + +void checkPreCache(prefetchqelem_t *node, int *numoffset, int counter, int loopcount, unsigned int objoid, int index, int iter, int oidnfound) { + char *ptr, *tmp; + int ntuples, i, k, flag; + unsigned int * oid; + short *endoffsets, *arryfields; + objheader_t *header; + + ptr = (char *) node; + ntuples = *(GET_NTUPLES(ptr)); + oid = GET_PTR_OID(ptr); + endoffsets = GET_PTR_EOFF(ptr, ntuples); + arryfields = GET_PTR_ARRYFLD(ptr, ntuples); + + if(oidnfound == 1) { + if((header = (objheader_t *) prehashSearch(objoid)) == NULL) { + return; + } else { //Found in Prefetch Cache + //TODO Decide if object is too old, if old remove from cache + tmp = (char *) header; + /* Check if any of the offset oid is available in the Prefetch cache */ + for(i = counter; i < loopcount; i++) { + objoid = *(tmp + sizeof(objheader_t) + arryfields[counter]); + if((header = (objheader_t *)prehashSearch(objoid)) != NULL) { + flag = 0; + } else { + flag = 1; + break; + } + } + } + } else { + for(i = counter; itype]; - objcopy = objstrAlloc(record->cache, size); - memcpy(objcopy, (void *)objheader, size); - /* Insert into cache's lookup table */ - chashInsert(record->lookupTable, objheader->oid, objcopy); - /* Find the offset field*/ - if(foundLocal(arrayofoffset, record, arrayofoffset[i], tmp, arrayofoffset[i][k+1], i) != 0 ) { - printf("Error in foundLocal() %s, %d\n", __FILE__, __LINE__); - return 1; - } - } else - continue; - } else - continue; - } /* Initialize and set thread attributes * Spawn a thread for each prefetch request sent*/ @@ -1057,100 +1233,6 @@ int transPrefetchProcess(transrecord_t *record, int *arrayofoffset[], short numo } -int checkPrefetchTuples(int *arrayofoffset[], int *arraylength, short numoids) { - int i, j, k, matchfound = 0, count = 0, slength, length; - int *sarray, *larray; - - /* Check if the prefetch oids are same and have same offsets - * for case x.a.b and y.a.b where x and y have same oid's - * or if a.b.c is a subset of x.b.c.d*/ - /* check for case where the generated request a.y.z or x.y.z.g then - * prefetch needs to be generated for x.y.z.g if oid of a and x are same*/ - for(i = 0; i < numoids -1 ; i++) { - if(arrayofoffset[i][0] == -1) - continue; - for( j = 0; j < (numoids - (i+1)); j++) { - if(arrayofoffset[j][0] == -1) - continue; - /* If the oids of the tuples match */ - if((arrayofoffset[i][0] == arrayofoffset[j][0]) && (i != j )) { - /*Find the smallest length of two arrays, find ptrs to smallest and long array */ - if(arraylength[i] > arraylength[j]) { - slength = arraylength[j]; - sarray = arrayofoffset[j]; - larray = arrayofoffset[i]; - } else { - slength = arraylength[i]; - sarray = arrayofoffset[i]; - larray = arrayofoffset[j]; - } - /* From first offset until end of tuple compare all offsets of sarray and larray - * if not a match then break */ - for(k = 1 ; k < slength -1 ; k++) { - if(sarray[k] != larray[k]) { - break; - } - } - /* If number of same offsets encountered is same as - * no of offsets in small array then common tuples found*/ - if(k == (slength -1)) - sarray[0] = -1; - } - } - } - - return 0; -} - -/* This function goes through an array and finds out until what offets - * * can it make a new pile - * * -1 indicates a special character to mark end of oid, offset tuple*/ -int foundLocal(int *arrayofoffset[], transrecord_t *record, int *array, objheader_t *header, int offset, int index) { - int i, j, counter, len; - unsigned int oid; - void *objcopy; - objheader_t *tmp, *objheader; - int size; - - /* The oid corresponding to the first offset */ - oid = (int) (header + sizeof(objheader_t) + offset); - /* Repeat for all offset values in the array */ - for(i = 1; array[i] != -1 ; ) { - /* If oid is found locally then insert into cache and continue - * with next offset to find the corresponding oid */ - if((objheader = (objheader_t*) mhashSearch(oid)) != NULL) { - tmp = mhashSearch(oid); - size = sizeof(objheader_t)+classsize[tmp->type]; - objcopy = objstrAlloc(record->cache, size); - memcpy(objcopy, (void *)tmp, size); - /* Insert into cache's lookup table */ - chashInsert(record->lookupTable, objheader->oid, objcopy); - oid = (int) (tmp + sizeof(objheader_t) + array[i+1]); - } else { - /*If oid not found locally then - *assign the latest oid found as the first element - *of array and copy left over offsets as other - *array elements to build the new prefetch array*/ - - arrayofoffset[index][0] = (int) oid; - counter = i; - len = arrayLength((int *)array[index]); - for( j = 1 ; counter <= len; j++,counter++) { - arrayofoffset[index][j] = array[counter + 1]; - } - break; - } - i++; - } - - /* If all offsets are found locally, then do not treat this as valid prefetch tuple */ - if(i == arrayLength((int *)array[index]) - 1) { - arrayofoffset[index][0] = -1; //Mark beginning of array as -1 - } - - return 0; -} - void *sendPrefetchReq(void *prefetchtuple) { int sd, i; struct sockaddr_in serv_addr; -- 2.34.1