From: adash Date: Thu, 19 Jul 2007 16:34:15 +0000 (+0000) Subject: a. Change queue implementation to linked list X-Git-Tag: preEdgeChange~517 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=d94659160520a148559d5ac90d1cc29afdc8366e;p=IRC.git a. Change queue implementation to linked list b. add new library for prefetch cache lookup c. Inittialize data structures d. prefetch call generated by compiler e. primary prefetch thread processing included --- diff --git a/Robust/src/Runtime/DSTM/interface/Makefile b/Robust/src/Runtime/DSTM/interface/Makefile index 7f38b40b..050152fd 100644 --- a/Robust/src/Runtime/DSTM/interface/Makefile +++ b/Robust/src/Runtime/DSTM/interface/Makefile @@ -1,22 +1,22 @@ d-3: - gcc -dr -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c + gcc -dr -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c demsky: - gcc -DDEBUG -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c + gcc -DDEBUG -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c d-4: - gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c + gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c all: - gcc -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c - gcc -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c - gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c + gcc -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c + gcc -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c + gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mac: - gcc -DMAC -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c - gcc -DMAC -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c - gcc -DMAC -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c + gcc -DMAC -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c + gcc -DMAC -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c + gcc -DMAC -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c clean: rm -rf d-3 d-4 demsky diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index cfcbecfd..c937a57b 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -32,12 +32,16 @@ #define OBJ_UNLOCK_BUT_VERSION_MATCH 15 #define VERSION_NO_MATCH 16 +//Max number of objects +#define MAX_OBJECTS 20 + #include #include #include #include #include "clookup.h" +#include "queue.h" #define DEFAULT_OBJ_STORE_SIZE 1048510 //1MB #define TID_LEN 20 @@ -113,7 +117,6 @@ typedef struct thread_data_array { int *count; //variable to count responses of TRANS_REQUEST protocol from all participants char *replyctrl; //shared ctrl message that stores the reply to be sent, filled by decideResp char *replyretry; //shared variable to find out if we need retry (TRANS_COMMIT case) -// char *localstatus; //shared variable to identify local requests transrecord_t *rec; // To send modified objects } thread_data_array_t; @@ -139,12 +142,22 @@ typedef struct member { //Structure for prefetching tuples generated by teh compiler - typedef struct trans_prefetchtuple{ - unsigned int depth; - unsigned int oid; - trans_member_t member; - struct trans_prefetchtuple *next; - }trans_prefetchtuple_t; + typedef struct prefetchpile{ + int mid; + int *oids; + + int **numofarrys; + struct prefetchpile *next; + }prefetchpile_t; + +//Structure per Oid in the prefetch call + +/* +//Structure that holds the compiler generated prefetch data +typedef struct compprefetchdata { + transrecord_t *record; +} compprefetchdata_t; +*/ /* Initialize main object store and lookup tables, start server thread. */ int dstmInit(void); @@ -183,5 +196,13 @@ char sendResponse(thread_data_array_t *, int); //Sends control message back to P void *getRemoteObj(transrecord_t *, unsigned int, unsigned int); int transAbortProcess(void *, unsigned int *, int, int, int); int transComProcess(trans_commit_data_t *); +void prefetch(int, unsigned int *, short *, short*); +void *transPrefetch(void *); +void checkPrefetchTuples(prefetchqelem_t *); +void foundLocal(prefetchqelem_t *); +void makePreGroups(prefetchqelem_t *node); +void checkPreCache(prefetchqelem_t *, int *, int, int, unsigned int, int, int, int); +int transPrefetchProcess(transrecord_t *, int **, short); +void *sendPrefetchReq(void *); /* end transactions */ #endif diff --git a/Robust/src/Runtime/DSTM/interface/prelookup.c b/Robust/src/Runtime/DSTM/interface/prelookup.c new file mode 100644 index 00000000..77733dfd --- /dev/null +++ b/Robust/src/Runtime/DSTM/interface/prelookup.c @@ -0,0 +1,202 @@ + #include "prelookup.h" + +prehashtable_t pflookup; //Global prefetch cache table + +unsigned int prehashCreate(unsigned int size, float loadfactor) { + prehashlistnode_t *nodes; + int i; + + // Allocate space for the hash table + if((nodes = calloc(size, sizeof(prehashlistnode_t))) == NULL) { + printf("Calloc error %s %d\n", __FILE__, __LINE__); + return 1; + } + + pflookup.table = nodes; + pflookup.size = size; + pflookup.numelements = 0; // Initial number of elements in the hash + pflookup.loadfactor = loadfactor; + + //Initialize mutex var + pthread_mutex_init(&pflookup.lock, NULL); + + return 0; +} + +//Assign keys to bins inside hash table +unsigned int prehashFunction(unsigned int key) { + return ( key % (pflookup.size)); +} + +//Store oids and their pointers into hash +unsigned int prehashInsert(unsigned int key, void *val) { + unsigned int newsize; + int index; + prehashlistnode_t *ptr, *node; + + if(pflookup.numelements > (pflookup.loadfactor * pflookup.size)) { + //Resize + newsize = 2 * pflookup.size + 1; + pthread_mutex_lock(&pflookup.lock); + prehashResize(newsize); + pthread_mutex_unlock(&pflookup.lock); + } + + ptr = pflookup.table; + pflookup.numelements++; + index = prehashFunction(key); + + pthread_mutex_lock(&pflookup.lock); + if(ptr[index].next == NULL && ptr[index].key == 0) { // Insert at the first position in the hashtable + ptr[index].key = key; + ptr[index].val = val; + } else { // Insert in the beginning of linked list + if ((node = calloc(1, sizeof(prehashlistnode_t))) == NULL) { + printf("Calloc error %s, %d\n", __FILE__, __LINE__); + pthread_mutex_unlock(&pflookup.lock); + return 1; + } + node->key = key; + node->val = val ; + node->next = ptr[index].next; + ptr[index].next = node; + } + pthread_mutex_unlock(&pflookup.lock); + return 0; +} + +// Search for an address for a given oid +void *prehashSearch(unsigned int key) { + int index; + prehashlistnode_t *ptr, *node; + + ptr = pflookup.table; + index = prehashFunction(key); + node = &ptr[index]; + pthread_mutex_lock(&pflookup.lock); + while(node != NULL) { + if(node->key == key) { + pthread_mutex_unlock(&pflookup.lock); + return node->val; + } + node = node->next; + } + pthread_mutex_unlock(&pflookup.lock); + return NULL; +} + +unsigned int prehashRemove(unsigned int key) { + int index; + prehashlistnode_t *curr, *prev; + prehashlistnode_t *ptr, *node; + + ptr = pflookup.table; + index = prehashFunction(key); + curr = &ptr[index]; + + pthread_mutex_lock(&pflookup.lock); + for (; curr != NULL; curr = curr->next) { + if (curr->key == key) { // Find a match in the hash table + pflookup.numelements--; // Decrement the number of elements in the global hashtable + if ((curr == &ptr[index]) && (curr->next == NULL)) { // Delete the first item inside the hashtable with no linked list of prehashlistnode_t + curr->key = 0; + curr->val = NULL; + } else if ((curr == &ptr[index]) && (curr->next != NULL)) { //Delete the first item with a linked list of prehashlistnode_t connected + curr->key = curr->next->key; + curr->val = curr->next->val; + node = curr->next; + curr->next = curr->next->next; + free(node); + } else { // Regular delete from linked listed + prev->next = curr->next; + free(curr); + } + pthread_mutex_unlock(&pflookup.lock); + return 0; + } + prev = curr; + } + pthread_mutex_unlock(&pflookup.lock); + return 1; +} + +unsigned int prehashResize(unsigned int newsize) { + prehashlistnode_t *node, *ptr, *curr, *next; // curr and next keep track of the current and the next chashlistnodes in a linked list + unsigned int oldsize; + int isfirst; // Keeps track of the first element in the prehashlistnode_t for each bin in hashtable + int i,index; + prehashlistnode_t *newnode; + + ptr = pflookup.table; + oldsize = pflookup.size; + + if((node = calloc(newsize, sizeof(prehashlistnode_t))) == NULL) { + printf("Calloc error %s %d\n", __FILE__, __LINE__); + return 1; + } + + pflookup.table = node; //Update the global hashtable upon resize() + pflookup.size = newsize; + pflookup.numelements = 0; + + for(i = 0; i < oldsize; i++) { //Outer loop for each bin in hash table + curr = &ptr[i]; + isfirst = 1; + while (curr != NULL) { //Inner loop to go through linked lists + if (curr->key == 0) { //Exit inner loop if there the first element for a given bin/index is NULL + break; //key = val =0 for element if not present within the hash table + } + next = curr->next; + index = prehashFunction(curr->key); + // Insert into the new table + if(pflookup.table[index].next == NULL && pflookup.table[index].key == 0) { + pflookup.table[index].key = curr->key; + pflookup.table[index].val = curr->val; + pflookup.numelements++; + }else { + if((newnode = calloc(1, sizeof(prehashlistnode_t))) == NULL) { + printf("Calloc error %s, %d\n", __FILE__, __LINE__); + return 1; + } + newnode->key = curr->key; + newnode->val = curr->val; + newnode->next = pflookup.table[index].next; + pflookup.table[index].next = newnode; + pflookup.numelements++; + } + + //free the linked list of prehashlistnode_t if not the first element in the hash table + if (isfirst != 1) { + free(curr); + } + + isfirst = 0; + curr = next; + } + } + + free(ptr); //Free the memory of the old hash table + return 0; +} + +/* Deletes the prefetch Cache */ +void prehashDelete() { + int i, isFirst; + prehashlistnode_t *ptr, *curr, *next; + ptr = pflookup.table; + + for(i=0 ; inext; + if(isFirst != 1) { + free(curr); + } + isFirst = 0; + curr = next; + } + } + + free(ptr); +} diff --git a/Robust/src/Runtime/DSTM/interface/prelookup.h b/Robust/src/Runtime/DSTM/interface/prelookup.h new file mode 100644 index 00000000..0f68d20b --- /dev/null +++ b/Robust/src/Runtime/DSTM/interface/prelookup.h @@ -0,0 +1,35 @@ +#ifndef _PRELOOKUP_H_ +#define _PRELOOKUP_H_ + +#include +#include +#include + +#define LOADFACTOR 0.75 +#define HASH_SIZE 100 + +typedef struct prehashlistnode { + unsigned int key; + void *val; //this can be cast to another type or used to point to a larger structure + struct prehashlistnode *next; +} prehashlistnode_t; + +typedef struct prehashtable { + prehashlistnode_t *table; // points to beginning of hash table + unsigned int size; + unsigned int numelements; + float loadfactor; + pthread_mutex_t lock; +} prehashtable_t; + +/* Prototypes for hash*/ +unsigned int prehashCreate(unsigned int size, float loadfactor); +unsigned int prehashFunction(unsigned int key); +unsigned int prehashInsert(unsigned int key, void *val); +void *prehashSearch(unsigned int key); //returns val, NULL if not found +unsigned int prehashRemove(unsigned int key); //returns -1 if not found +unsigned int prehashResize(unsigned int newsize); +/* end hash */ + +#endif + diff --git a/Robust/src/Runtime/DSTM/interface/queue.c b/Robust/src/Runtime/DSTM/interface/queue.c index fc12eaab..f164528f 100644 --- a/Robust/src/Runtime/DSTM/interface/queue.c +++ b/Robust/src/Runtime/DSTM/interface/queue.c @@ -1,83 +1,103 @@ #include "queue.h" -prefetchthreadqueue_t queue; //Global shared prefetch queue +primarypfq_t pqueue; //Global queue -void queueInsert(int *array) { - pthread_mutex_lock(&queue.qlock); - queue.rear = queue.rear % ARRAY_SIZE; - if(queue.front == queue.rear && queue.buffer[queue.front] != NULL) { - printf("The Circular Queue is Full : OVERFLOW\n"); - pthread_mutex_unlock(&queue.qlock); +void queueInit(void) { + /* Intitialize primary thread */ + pqueue.front = pqueue.rear = NULL; + pthread_mutex_init(&pqueue.qlock, NULL); + pthread_cond_init(&pqueue.qcond, NULL); +} + +/* Removes the first element of the queue */ +void delqnode() { + prefetchqelem_t *delnode; + if((pqueue.front == NULL) && (pqueue.rear == NULL)) { + printf("The queue is empty: UNDERFLOW %s, %d\n", __FILE__, __LINE__); return; + } else if ((pqueue.front == pqueue.rear) && pqueue.front != NULL && pqueue.rear != NULL) { + printf("TEST1\n"); + free(pqueue.front); + pqueue.front = pqueue.rear = NULL; } else { - queue.buffer[queue.rear] = array; - queue.rear++; + delnode = pqueue.front; + pqueue.front = pqueue.front->next; + printf("TEST2\n"); + free(delnode); } - pthread_mutex_unlock(&queue.qlock); } -int *queueDelete() { - int *i; - i = NULL; - pthread_mutex_lock(&queue.qlock); - if(queue.front == queue.rear && queue.buffer[queue.front] == NULL) { - printf("The Circular Queue is Empty : UNDERFLOW\n"); - pthread_mutex_unlock(&queue.qlock); - return NULL; +void queueDelete(void) { + /* Remove each element */ + while(pqueue.front != NULL) + delqnode(); + pqueue.front = pqueue.rear = NULL; +} + +/* Inserts to the rear of primary prefetch queue */ +void enqueue(prefetchqelem_t *qnode) { + if(pqueue.front == NULL && pqueue.rear == NULL) { + pqueue.front = pqueue.rear = qnode; } else { - i = queue.buffer[queue.front]; - queue.buffer[queue.front] = NULL; - queue.front++; - queue.front = queue.front % ARRAY_SIZE; - pthread_mutex_unlock(&queue.qlock); - return i; + qnode->next = NULL; + pqueue.rear->next = qnode; + pqueue.rear = qnode; } } -void queueInit() { - int i; - queue.front = 0; - queue.rear = 0; - for(i = 0; i< ARRAY_SIZE; i++) - queue.buffer[i] = NULL; - /* Initialize the pthread_mutex variable */ - pthread_mutex_init(&queue.qlock, NULL); +prefetchqelem_t *dequeue(void) { + prefetchqelem_t *retnode; + if (pqueue.front == NULL) { + printf("Queue empty: Underflow %s, %d\n", __FILE__, __LINE__); + return NULL; + } + retnode = pqueue.front; + //TODO make this atomic + pqueue.front = pqueue.front->next; + + return retnode; } -/* For testing purposes */ +void queueDisplay() { + int offset = sizeof(prefetchqelem_t); + int *ptr; + int ntuples; + char *ptr1; + prefetchqelem_t *tmp = pqueue.front; + while(tmp != NULL) { + ptr1 = (char *) tmp; + ptr = (int *)(ptr1 + offset); + ntuples = *ptr; + printf("Number of tuples = %d\n", ntuples); + tmp = tmp->next; + } +} + + #if 0 main() { - int *d; - queueIsEmpty(); - int a[] = {5, 2, 8, -1}; - int b[] = {11, 8, 4, 19, -1}; - int c[] = {16, 8, 4, -1}; - printf("Front = %d, Rear = %d\n",queue.front, queue.rear); - d = queueDelete(); - printf("Front = %d, Rear = %d\n",queue.front, queue.rear); - queueInsert(a); - printf("Enqueued ptr is %x\n", a); - printf("1st Insert Front = %d, Rear = %d\n",queue.front, queue.rear); - queueInsert(b); - printf("Enqueued ptr is %x\n", b); - printf("2nd Insert Front = %d, Rear = %d\n",queue.front, queue.rear); - queueInsert(c); - printf("3rd Insert Front = %d, Rear = %d\n",queue.front, queue.rear); - d = queueDelete(); - printf("Dequeued ptr is %x\n", d); - printf("After 1st del Front = %d, Rear = %d\n",queue.front, queue.rear); - queueInsert(c); - printf("Enqueued ptr is %x\n", c); - printf("After 4th insert Front = %d, Rear = %d\n",queue.front, queue.rear); - d = queueDelete(); - printf("Dequeued ptr is %x\n", d); - printf("After 2nd del Front = %d, Rear = %d\n",queue.front, queue.rear); - d = queueDelete(); - printf("Dequeued ptr is %x\n", d); - printf("After 3rd del Front = %d, Rear = %d\n",queue.front, queue.rear); - d = queueDelete(); - printf("Dequeued ptr is %x\n", d); - printf("After 4th del Front = %d, Rear = %d\n",queue.front, queue.rear); + unsigned int oids[] = {11, 13}; + short endoffsets[] = {2, 5}; + short arrayfields[] = {2, 2, 1, 5, 6}; + queueInit(); + queueDisplay(); + prefetch(2, oids, endoffsets, arrayfields); + queueDisplay(); + unsigned int oids1[] = {21, 23, 25, 27}; + short endoffsets1[] = {1, 2, 3, 4}; + short arrayfields1[] = {3, 2, 1, 3}; + prefetch(4, oids1, endoffsets1, arrayfields1); + queueDisplay(); + delqnode(); + queueDisplay(); + delqnode(); + queueDisplay(); + delqnode(); + queueDisplay(); + delqnode(); + } #endif + + diff --git a/Robust/src/Runtime/DSTM/interface/queue.h b/Robust/src/Runtime/DSTM/interface/queue.h index 9ad5df45..ea7dba2d 100644 --- a/Robust/src/Runtime/DSTM/interface/queue.h +++ b/Robust/src/Runtime/DSTM/interface/queue.h @@ -4,19 +4,24 @@ #include #include #include - -#define ARRAY_SIZE 20 +#include // DS that contains information to be shared between threads. -typedef struct prefetchthreadqueue { - int *buffer[ARRAY_SIZE]; - int front; - int rear; +typedef struct prefetchqelem { + struct prefetchqelem *next; +} prefetchqelem_t; + +typedef struct primarypfq { + prefetchqelem_t *front, *rear; pthread_mutex_t qlock; -} prefetchthreadqueue_t; + pthread_cond_t qcond; +} primarypfq_t; -void queueInsert(int *); -int *queueDelete(); -void queueInit(); //Initializes the queue and qlock mutex +void queueInit(void); +void delqnode(); +void queueDelete(void); +void enqueue(prefetchqelem_t *qnode); +prefetchqelem_t *dequeue(void); +void queueDisplay(); #endif diff --git a/Robust/src/Runtime/DSTM/interface/testd-3.c b/Robust/src/Runtime/DSTM/interface/testd-3.c index 1dfcea0c..d3261a21 100644 --- a/Robust/src/Runtime/DSTM/interface/testd-3.c +++ b/Robust/src/Runtime/DSTM/interface/testd-3.c @@ -133,7 +133,7 @@ int test2a(void) { lhashInsert(header->oid, mid); //Inserting into lhashtable - mid = iptoMid("128.200.9.30"); //d-4.eecs.uci.edu + mid = iptoMid("128.195.175.69"); //dw-1.eecs.uci.edu lhashInsert(31, mid); lhashInsert(32, mid); @@ -148,9 +148,9 @@ int test2a(void) { //Check if machine demsky is up and running checkServer(mid, "128.200.9.10"); - mid = iptoMid("128.200.9.30"); - //Check if machine d-4 is up and running - checkServer(mid, "128.200.9.30"); + mid = iptoMid("128.195.175.69"); + //Check if machine dw-1 is up and running + checkServer(mid, "128.195.175.69"); // Start Transaction myTrans = transStart(); @@ -222,7 +222,7 @@ int test2b(void) { lhashInsert(header->oid, mid); //Inserting into lhashtable - mid = iptoMid("128.200.9.30"); //d-4.eecs.uci.edu + mid = iptoMid("128.195.175.69"); //dw-1.eecs.uci.edu lhashInsert(31, mid); lhashInsert(32, mid); lhashInsert(33, mid); @@ -238,9 +238,9 @@ int test2b(void) { //Check if machine demsky is up and running checkServer(mid, "128.200.9.10"); - mid = iptoMid("128.200.9.30"); - //Check if machine d-4 is up and running - checkServer(mid, "128.200.9.30"); + mid = iptoMid("128.195.175.69"); + //Check if machine dw-1 is up and running + checkServer(mid, "128.195.175.69"); // Start Transaction myTrans = transStart(); @@ -259,7 +259,7 @@ int test2b(void) { printf("Object not found\n"); } - //read object 32 (found on d-4) + //read object 32 (found on dw-1) if((h4 = transRead(myTrans, 32)) == NULL) { printf("Object not found\n"); } @@ -378,7 +378,7 @@ int test5(void) { lhashInsert(header->oid, mid); //Inserting into lhashtable - mid = iptoMid("128.200.9.30"); //d-4.eecs.uci.edu + mid = iptoMid("128.195.175.69"); //dw-1.eecs.uci.edu lhashInsert(31, mid); lhashInsert(32, mid); lhashInsert(33, mid); @@ -393,9 +393,9 @@ int test5(void) { pthread_create(&thread_Listen, &attr, dstmListen, NULL); //Check if machine demsky is up and running checkServer(mid, "128.200.9.10"); - mid = iptoMid("128.200.9.30"); - //Check if machine d-4 is up and running - checkServer(mid, "128.200.9.30"); + mid = iptoMid("128.195.175.69"); + //Check if machine dw-1 is up and running + checkServer(mid, "128.195.175.69"); // Start Transaction myTrans = transStart(); @@ -405,11 +405,11 @@ int test5(void) { printf("Object not found\n"); } //read object 2 (found on demksy) - if((h1 = transRead(myTrans,2)) == NULL){ + if((h2 = transRead(myTrans,2)) == NULL){ printf("Object not found\n"); } - //read object 31 (found on d-4) - if((h2 = transRead(myTrans, 31)) == NULL) { + //read object 22 (found locally ) + if((h3 = transRead(myTrans, 22)) == NULL) { printf("Object not found\n"); } @@ -449,7 +449,7 @@ int test5a(void) { header = (objheader_t *) objstrAlloc(mainobjstore, size); header->oid = 21; header->type = 1; - //read object 31 (found on d-4) + //read object 31 (found on dw-1) if((h2 = transRead(myTrans, 31)) == NULL) { printf("Object not found\n"); } @@ -512,7 +512,7 @@ int test5b(void) { lhashInsert(header->oid, mid); //Inserting into lhashtable - mid = iptoMid("128.200.9.30"); //d-4.eecs.uci.edu + mid = iptoMid("128.195.175.69"); //dw-1.eecs.uci.edu lhashInsert(31, mid); lhashInsert(32, mid); lhashInsert(33, mid); @@ -528,9 +528,9 @@ int test5b(void) { //Check if machine demsky is up and running checkServer(mid, "128.200.9.10"); - mid = iptoMid("128.200.9.30"); - //Check if machine d-4 is up and running - checkServer(mid, "128.200.9.30"); + mid = iptoMid("128.195.175.69"); + //Check if machine dw-1 is up and running + checkServer(mid, "128.195.175.69"); // Start Transaction myTrans = transStart(); @@ -539,7 +539,7 @@ int test5b(void) { if((h1 = transRead(myTrans, 1)) == NULL){ printf("Object not found\n"); } - //read object 31 (found on d-4) + //read object 31 (found on dw-1) if((h2 = transRead(myTrans, 31)) == NULL) { printf("Object not found\n"); } @@ -612,7 +612,7 @@ int test7(void) { lhashInsert(header->oid, mid); //Inserting into lhashtable - mid = iptoMid("128.200.9.30"); //d-4.eecs.uci.edu + mid = iptoMid("128.195.175.69"); //dw-1.eecs.uci.edu lhashInsert(31, mid); lhashInsert(32, mid); lhashInsert(33, mid); @@ -628,9 +628,9 @@ int test7(void) { //Check if machine demsky is up and running checkServer(mid, "128.200.9.10"); - mid = iptoMid("128.200.9.30"); - //Check if machine d-4 is up and running - checkServer(mid, "128.200.9.30"); + mid = iptoMid("128.195.175.69"); + //Check if machine dw-1 is up and running + checkServer(mid, "128.195.175.69"); // Start Transaction myTrans = transStart(); @@ -639,7 +639,7 @@ int test7(void) { if((h1 = transRead(myTrans, 3)) == NULL){ printf("Object not found\n"); } - //read object 32 (found on d-4) + //read object 32 (found on dw-1) if((h2 = transRead(myTrans, 32)) == NULL) { printf("Object not found\n"); } diff --git a/Robust/src/Runtime/DSTM/interface/testd-4.c b/Robust/src/Runtime/DSTM/interface/testd-4.c index 9876bdb2..26dba3a9 100644 --- a/Robust/src/Runtime/DSTM/interface/testd-4.c +++ b/Robust/src/Runtime/DSTM/interface/testd-4.c @@ -41,7 +41,7 @@ int test1() { header = (objheader_t *) objstrAlloc(mainobjstore, size); init_obj(header, 31, 2, 1, 0, NEW); mhashInsert(header->oid, header); - mid = iptoMid("128.200.9.30"); + mid = iptoMid("128.195.175.69"); lhashInsert(header->oid, mid); //Create and Insert Oid 32 @@ -49,7 +49,7 @@ int test1() { header = (objheader_t *) objstrAlloc(mainobjstore, size); init_obj(header, 32, 1, 1, 0, NEW); mhashInsert(header->oid, header); - mid = iptoMid("128.200.9.30"); + mid = iptoMid("128.195.175.69"); lhashInsert(header->oid, mid); //Create and Insert Oid 33 @@ -57,7 +57,7 @@ int test1() { header = (objheader_t *) objstrAlloc(mainobjstore, size); init_obj(header, 33, 0, 1, 0, NEW); mhashInsert(header->oid, header); - mid = iptoMid("128.200.9.30"); + mid = iptoMid("128.195.175.69"); lhashInsert(header->oid, mid); //Inserting into lhashtable into d-3.eecs @@ -104,7 +104,7 @@ int test2() { header = (objheader_t *) objstrAlloc(mainobjstore, size); init_obj(header, 31, 2, 1, 0, NEW); mhashInsert(header->oid, header); - mid = iptoMid("128.200.9.30"); + mid = iptoMid("128.195.175.69"); lhashInsert(header->oid, mid); //Create and Insert Oid 32 @@ -112,7 +112,7 @@ int test2() { header = (objheader_t *) objstrAlloc(mainobjstore, size); init_obj(header, 32, 1, 1, 0, NEW); mhashInsert(header->oid, header); - mid = iptoMid("128.200.9.30"); + mid = iptoMid("128.195.175.69"); lhashInsert(header->oid, mid); //Create and Insert Oid 33 @@ -120,7 +120,7 @@ int test2() { header = (objheader_t *) objstrAlloc(mainobjstore, size); init_obj(header, 33, 0, 1, 0, NEW); mhashInsert(header->oid, header); - mid = iptoMid("128.200.9.30"); + mid = iptoMid("128.195.175.69"); lhashInsert(header->oid, mid); //Inserting into lhashtable into d-3.eecs @@ -190,7 +190,7 @@ int test3() { header = (objheader_t *) objstrAlloc(mainobjstore, size); init_obj(header, 31, 2, 1, 0, NEW); mhashInsert(header->oid, header); - mid = iptoMid("128.200.9.30"); + mid = iptoMid("128.195.175.69"); lhashInsert(header->oid, mid); //Create and Insert Oid 32 @@ -198,7 +198,7 @@ int test3() { header = (objheader_t *) objstrAlloc(mainobjstore, size); init_obj(header, 32, 1, 1, 0, NEW); mhashInsert(header->oid, header); - mid = iptoMid("128.200.9.30"); + mid = iptoMid("128.195.175.69"); lhashInsert(header->oid, mid); //Create and Insert Oid 33 @@ -206,7 +206,7 @@ int test3() { header = (objheader_t *) objstrAlloc(mainobjstore, size); init_obj(header, 33, 0, 1, 0, NEW); mhashInsert(header->oid, header); - mid = iptoMid("128.200.9.30"); + mid = iptoMid("128.195.175.69"); lhashInsert(header->oid, mid); //Inserting into lhashtable into d-3.eecs @@ -272,7 +272,7 @@ int test4() { header = (objheader_t *) objstrAlloc(mainobjstore, size); init_obj(header, 31, 2, 1, 0, NEW); mhashInsert(header->oid, header); - mid = iptoMid("128.200.9.30"); + mid = iptoMid("128.195.175.69"); lhashInsert(header->oid, mid); //Create and Insert Oid 32 @@ -280,7 +280,7 @@ int test4() { header = (objheader_t *) objstrAlloc(mainobjstore, size); init_obj(header, 32, 1, 1, 0, NEW); mhashInsert(header->oid, header); - mid = iptoMid("128.200.9.30"); + mid = iptoMid("128.195.175.69"); lhashInsert(header->oid, mid); //Create and Insert Oid 33 @@ -288,7 +288,7 @@ int test4() { header = (objheader_t *) objstrAlloc(mainobjstore, size); init_obj(header, 33, 0, 1, 0, NEW); mhashInsert(header->oid, header); - mid = iptoMid("128.200.9.30"); + mid = iptoMid("128.195.175.69"); lhashInsert(header->oid, mid); //Inserting into lhashtable into d-3.eecs diff --git a/Robust/src/Runtime/DSTM/interface/testserver.c b/Robust/src/Runtime/DSTM/interface/testserver.c index 61e397db..85d3b920 100644 --- a/Robust/src/Runtime/DSTM/interface/testserver.c +++ b/Robust/src/Runtime/DSTM/interface/testserver.c @@ -44,7 +44,7 @@ void init_obj(objheader_t *h, unsigned int oid, unsigned short type, \ int main() { - //test1(); +// test1(); // test3(); test4(); } @@ -189,15 +189,15 @@ int test3() { lhashInsert(21, mid); lhashInsert(22, mid); - mid = iptoMid("128.200.9.30"); //d-4.eecs.uci.edu + mid = iptoMid("128.195.175.69"); //dw-1.eecs.uci.edu //Inserting into lhashtable lhashInsert(31, mid); lhashInsert(32, mid); lhashInsert(33, mid); pthread_create(&thread_Listen, &attr, dstmListen, NULL); - //Check if machine d-4 is up and running - checkServer(mid, "128.200.9.30"); + //Check if machine dw-1 is up and running + checkServer(mid, "128.195.175.69"); mid = iptoMid("128.200.9.29"); //Check if machine d-3 is up and running checkServer(mid, "128.200.9.29"); @@ -281,15 +281,15 @@ int test4() { lhashInsert(21, mid); lhashInsert(22, mid); - mid = iptoMid("128.200.9.30"); //d-4.eecs.uci.edu + mid = iptoMid("128.195.175.69"); //dw-1.eecs.uci.edu //Inserting into lhashtable lhashInsert(31, mid); lhashInsert(32, mid); lhashInsert(33, mid); pthread_create(&thread_Listen, &attr, dstmListen, NULL); - //Check if machine d-4 is up and running - checkServer(mid, "128.200.9.30"); + //Check if machine dw-1 is up and running + checkServer(mid, "128.195.175.69"); mid = iptoMid("128.200.9.29"); //Check if machine d-3 is up and running checkServer(mid, "128.200.9.29"); @@ -306,7 +306,7 @@ int test4() { if((h2 = transRead(myTrans, 1)) == NULL) { printf("Object not found\n"); } - //read object 31(present in d-4 machine) + //read object 31(present in dw-1 machine) if((h3 = transRead(myTrans, 31)) == NULL) { printf("Object not found\n"); } diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index d36b585b..2bf2d019 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -4,7 +4,7 @@ #include "mlookup.h" #include "llookup.h" #include "plookup.h" -//#include "queue.h" +#include "queue.h" #include #include #include @@ -14,117 +14,47 @@ #include #include #include +#include #define LISTEN_PORT 2156 #define RECEIVE_BUFFER_SIZE 2048 +#define NUM_THREADS 10 +#define PREFETCH_CACHE_SIZE 1048576 //1MB + +#define GET_NTUPLES(x) ((int *)(x + sizeof(prefetchqelem_t))) +#define GET_PTR_OID(x) ((unsigned int *)(x + sizeof(prefetchqelem_t) + sizeof(int))) +#define GET_PTR_EOFF(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int)))) +#define GET_PTR_ARRYFLD(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int)) + (n*sizeof(short)))) /* Global Variables */ extern int classsize[]; -//extern prefetchthreadqueue_t pfqueue; // Shared prefetch queue -objstr_t *mainobjstore; -plistnode_t *createPiles(transrecord_t *); +extern primarypfq_t pqueue; // shared prefetch queue +pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue pthread_t tPrefetch; +extern objstr_t *mainobjstore; +objstr_t *prefetchcache; +plistnode_t *createPiles(transrecord_t *); inline int arrayLength(int *array) { int i; for(i=0 ;array[i] != -1; i++) ; return i; } - -// DS that contains information to be shared between threads. -typedef struct prefetchqelem { - struct prefetchqelem *next; -} prefetchqelem_t; - -typedef struct primarypfq { - prefetchqelem_t *front, *rear; - pthread_mutex_t qlock; - pthread_cond_t qcond; -} primarypfq_t; - -primarypfq_t pqueue; //Global queue -int flag; //Global flag - -void prefetch(int ntuples, unsigned int *oids, short *endoffsets, short *arrayfields); - - -void queueInit(void) { - pqueue.front = pqueue.rear = NULL; - pthread_mutex_init(&pqueue.qlock, NULL); - pthread_cond_init(&pqueue.qcond, NULL); -} - -/* Removes the first element of the queue */ -void delqnode() { - prefetchqelem_t *delnode; - if((pqueue.front == NULL) && (pqueue.rear == NULL)) { - printf("The queue is empty: UNDERFLOW %s, %d\n", __FILE__, __LINE__); - return; - } else if (pqueue.front == pqueue.rear) { - free(pqueue.front); - pqueue.front = pqueue.rear = NULL; - } else { - delnode = pqueue.front; - pqueue.front = pqueue.front->next; - free(delnode); - } -} - -void queueDelete(void) { - /* Remove each element */ - while(pqueue.front != NULL) - delqnode(); - pqueue.front = pqueue.rear = NULL; -} - -/* Inserts to the rear of primary prefetch queue */ -void enqueue(prefetchqelem_t *qnode) { - if(pqueue.front == NULL && pqueue.rear == NULL) { - pqueue.front = pqueue.rear = qnode; - } else { - qnode->next = NULL; - pqueue.rear->next = qnode; - pqueue.rear = qnode; - } -} - -prefetchqelem_t *dequeue(void) { - prefetchqelem_t *retnode; - - if (pqueue.front == NULL) { - printf("Queue empty: Underflow %s, %d\n", __FILE__, __LINE__); - return NULL; - } - retnode = pqueue.front; - //TODO make this atomic - pqueue.front = pqueue.front->next; - - return retnode; -} - -void queuedisplay(primarypfq_t pqueue) { - while(pqueue.front != NULL) { - /* - int *ptr = (int *)(pqueue + offset); - int oid = *ptr; - printf("oid = %d", oid); - //printf("Oid = %d", pqueue.front->oids[0]); - pqueue.front = pqueue.front->next; - */ - } -} - +/* This function is a prefetch call generated by the compiler that + * populates the shared primary prefetch queue*/ void prefetch(int ntuples, unsigned int *oids, short *endoffsets, short *arrayfields) { int qnodesize; int len = 0; - /* Create element for the queue */ - prefetchqelem_t *node; - qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(short) + sizeof(unsigned int)) + endoffsets[ntuples -1] * sizeof(short); + + /* Allocate for the queue node*/ + char *node; + qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short); if((node = calloc(1,qnodesize)) == NULL) { printf("Calloc Error %s, %d\n", __FILE__, __LINE__); return; } + /* Set queue node values */ len = sizeof(prefetchqelem_t); memcpy(node + len, &ntuples, sizeof(int)); len += sizeof(int); @@ -133,39 +63,51 @@ void prefetch(int ntuples, unsigned int *oids, short *endoffsets, short *arrayfi memcpy(node + len, endoffsets, ntuples*sizeof(short)); len += ntuples * sizeof(short); memcpy(node + len, arrayfields, endoffsets[ntuples-1]*sizeof(short)); - + /* Lock and insert into primary prefetch queue */ pthread_mutex_lock(&pqueue.qlock); - /* Check if primary queue thread operating on this then wait */ - /* - if(flag) { - //TODO - } - */ - enqueue(node); - + enqueue((prefetchqelem_t *)node); + pthread_cond_signal(&pqueue.qcond); pthread_mutex_unlock(&pqueue.qlock); } - /* This function initiates the prefetch thread * A queue is shared between the main thread of execution * and the prefetch thread to process the prefetch call * Call from compiler populates the shared queue with prefetch requests while prefetch thread * processes the prefetch requests */ void transInit() { - //Initialize shared queue + int t, rc; + //Create and initialize prefetch cache structure + prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE); + //Create prefetch cache lookup table + if(prehashCreate(HASH_SIZE, LOADFACTOR)) + return; //Failure + //Initialize primary shared queue queueInit(); - //Create the prefetch thread + //Create the primary prefetch thread pthread_create(&tPrefetch, NULL, transPrefetch, NULL); + //Create and Initialize a pool of threads + for(t = 0; t< NUM_THREADS; t++) { + //rc = pthread_create(&wthreads[t], NULL, pfqProcess, (void *)t); + if (rc) { + printf("Thread create error %s, %d\n", __FILE__, __LINE__); + return; + } + } } /* This function stops the threads spawned */ void transExit() { + int t; pthread_cancel(tPrefetch); + for(t = 0; t < NUM_THREADS; t++) + pthread_cancel(wthreads[t]); + return; } -/* This functions inserts randowm wait delays in the order of msec */ +/* This functions inserts randowm wait delays in the order of msec + * Mostly used when transaction commits retry*/ void randomdelay(void) { struct timespec req, rem; @@ -959,22 +901,284 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int /* This function is called by the thread calling transPrefetch */ void *transPrefetch(void *prefdata) { - compprefetchdata_t *ptr = (compprefetchdata_t *) prefdata; int *offstarray = NULL; + prefetchqelem_t *qnode; while(1) { - //TODO + /* lock mutex of primary prefetch queue */ + pthread_mutex_lock(&pqueue.qlock); + /* while primary queue is empty, then wait */ + while((pqueue.front == NULL) && (pqueue.rear == NULL)) { + pthread_cond_wait(&pqueue.qcond, &pqueue.qlock); + } + + /* dequeue node to create a machine piles and finally unlock mutex */ + if((qnode = dequeue()) == NULL) { + printf("Error: No node returned %s, %d\n", __FILE__, __LINE__); + return NULL; + } + pthread_mutex_unlock(&pqueue.qlock); + /* Reduce redundant prefetch requests */ + /* Group Requests by where objects are located */ + + + + } +} + +/* This function checks if the prefetch oids are same and have same offsets + * for case x.a.b and y.a.b where x and y have same oid's + * or if a.b.c is a subset of x.b.c.d*/ +/* check for case where the generated request a.y.z or x.y.z.g then + * prefetch needs to be generated for x.y.z.g if oid of a and x are same*/ +void checkPrefetchTuples(prefetchqelem_t *node) { + int i,j, count,k, sindex, index; + char *ptr, *tmp; + int ntuples, slength; + unsigned int *oid; + short *endoffsets, *arryfields; + + /* Check for the case x.y.z and a.b.c are same oids */ + ptr = (char *) node; + ntuples = *(GET_NTUPLES(ptr)); + oid = GET_PTR_OID(ptr); + endoffsets = GET_PTR_EOFF(ptr, ntuples); + arryfields = GET_PTR_ARRYFLD(ptr, ntuples); + /* Find offset length for each tuple */ + int numoffset[ntuples]; + numoffset[0] = endoffsets[0]; + for(i = 1; i numoffset[j]){ + slength = numoffset[j]; + sindex = j; + } + else { + slength = numoffset[i]; + sindex = i; + } + + /* Compare the offset values based on the current indices + * break if they do not match + * if all offset values match then pick the largest tuple*/ + + if(i == 0) { + k = 0; + index = endoffsets[j -1]; + for(count = 0; count < slength; count ++) { + if (arryfields[k] != arryfields[index]) { + break; + } + index++; + k++; + } + } else { + printf("i = %d, j = %d\n", i, j); + k = endoffsets[i-1]; + index = endoffsets[j-1]; + printf("Value of slength = %d\n", slength); + for(count = 0; count < slength; count++) { + printf("Value of count =%d\n", count); + if(arryfields[k] != arryfields[index]) { + break; + } + index++; + k++; + } + printf("Value of count =%d\n", count); + } + printf("The value of sindex = %d\n", sindex); + + if(slength == count) { + printf("DEBUG-> Inside slength if %d\n", sindex); + oid[sindex] = -1; + } + } + } + } +} + +void checkPreCache(prefetchqelem_t *node, int *numoffset, int counter, int loopcount, unsigned int objoid, int index, int iter, int oidnfound) { + char *ptr, *tmp; + int ntuples, i, k, flag; + unsigned int * oid; + short *endoffsets, *arryfields; + objheader_t *header; + + ptr = (char *) node; + ntuples = *(GET_NTUPLES(ptr)); + oid = GET_PTR_OID(ptr); + endoffsets = GET_PTR_EOFF(ptr, ntuples); + arryfields = GET_PTR_ARRYFLD(ptr, ntuples); + + if(oidnfound == 1) { + if((header = (objheader_t *) prehashSearch(objoid)) == NULL) { + return; + } else { //Found in Prefetch Cache + //TODO Decide if object is too old, if old remove from cache + tmp = (char *) header; + /* Check if any of the offset oid is available in the Prefetch cache */ + for(i = counter; i < loopcount; i++) { + objoid = *(tmp + sizeof(objheader_t) + arryfields[counter]); + if((header = (objheader_t *)prehashSearch(objoid)) != NULL) { + flag = 0; + } else { + flag = 1; + break; + } + } + } + } else { + for(i = counter; itype]; - objcopy = objstrAlloc(record->cache, size); - memcpy(objcopy, (void *)objheader, size); - /* Insert into cache's lookup table */ - chashInsert(record->lookupTable, objheader->oid, objcopy); - /* Find the offset field*/ - if(foundLocal(arrayofoffset, record, arrayofoffset[i], tmp, arrayofoffset[i][k+1], i) != 0 ) { - printf("Error in foundLocal() %s, %d\n", __FILE__, __LINE__); - return 1; - } - } else - continue; - } else - continue; - } /* Initialize and set thread attributes * Spawn a thread for each prefetch request sent*/ @@ -1057,100 +1233,6 @@ int transPrefetchProcess(transrecord_t *record, int *arrayofoffset[], short numo } -int checkPrefetchTuples(int *arrayofoffset[], int *arraylength, short numoids) { - int i, j, k, matchfound = 0, count = 0, slength, length; - int *sarray, *larray; - - /* Check if the prefetch oids are same and have same offsets - * for case x.a.b and y.a.b where x and y have same oid's - * or if a.b.c is a subset of x.b.c.d*/ - /* check for case where the generated request a.y.z or x.y.z.g then - * prefetch needs to be generated for x.y.z.g if oid of a and x are same*/ - for(i = 0; i < numoids -1 ; i++) { - if(arrayofoffset[i][0] == -1) - continue; - for( j = 0; j < (numoids - (i+1)); j++) { - if(arrayofoffset[j][0] == -1) - continue; - /* If the oids of the tuples match */ - if((arrayofoffset[i][0] == arrayofoffset[j][0]) && (i != j )) { - /*Find the smallest length of two arrays, find ptrs to smallest and long array */ - if(arraylength[i] > arraylength[j]) { - slength = arraylength[j]; - sarray = arrayofoffset[j]; - larray = arrayofoffset[i]; - } else { - slength = arraylength[i]; - sarray = arrayofoffset[i]; - larray = arrayofoffset[j]; - } - /* From first offset until end of tuple compare all offsets of sarray and larray - * if not a match then break */ - for(k = 1 ; k < slength -1 ; k++) { - if(sarray[k] != larray[k]) { - break; - } - } - /* If number of same offsets encountered is same as - * no of offsets in small array then common tuples found*/ - if(k == (slength -1)) - sarray[0] = -1; - } - } - } - - return 0; -} - -/* This function goes through an array and finds out until what offets - * * can it make a new pile - * * -1 indicates a special character to mark end of oid, offset tuple*/ -int foundLocal(int *arrayofoffset[], transrecord_t *record, int *array, objheader_t *header, int offset, int index) { - int i, j, counter, len; - unsigned int oid; - void *objcopy; - objheader_t *tmp, *objheader; - int size; - - /* The oid corresponding to the first offset */ - oid = (int) (header + sizeof(objheader_t) + offset); - /* Repeat for all offset values in the array */ - for(i = 1; array[i] != -1 ; ) { - /* If oid is found locally then insert into cache and continue - * with next offset to find the corresponding oid */ - if((objheader = (objheader_t*) mhashSearch(oid)) != NULL) { - tmp = mhashSearch(oid); - size = sizeof(objheader_t)+classsize[tmp->type]; - objcopy = objstrAlloc(record->cache, size); - memcpy(objcopy, (void *)tmp, size); - /* Insert into cache's lookup table */ - chashInsert(record->lookupTable, objheader->oid, objcopy); - oid = (int) (tmp + sizeof(objheader_t) + array[i+1]); - } else { - /*If oid not found locally then - *assign the latest oid found as the first element - *of array and copy left over offsets as other - *array elements to build the new prefetch array*/ - - arrayofoffset[index][0] = (int) oid; - counter = i; - len = arrayLength((int *)array[index]); - for( j = 1 ; counter <= len; j++,counter++) { - arrayofoffset[index][j] = array[counter + 1]; - } - break; - } - i++; - } - - /* If all offsets are found locally, then do not treat this as valid prefetch tuple */ - if(i == arrayLength((int *)array[index]) - 1) { - arrayofoffset[index][0] = -1; //Mark beginning of array as -1 - } - - return 0; -} - void *sendPrefetchReq(void *prefetchtuple) { int sd, i; struct sockaddr_in serv_addr;