a. Change queue implementation to linked list
authoradash <adash>
Thu, 19 Jul 2007 16:34:15 +0000 (16:34 +0000)
committeradash <adash>
Thu, 19 Jul 2007 16:34:15 +0000 (16:34 +0000)
b. add new library for prefetch cache lookup
c. Inittialize data structures
d. prefetch call generated by compiler
e. primary prefetch thread processing included

Robust/src/Runtime/DSTM/interface/Makefile
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/prelookup.c [new file with mode: 0644]
Robust/src/Runtime/DSTM/interface/prelookup.h [new file with mode: 0644]
Robust/src/Runtime/DSTM/interface/queue.c
Robust/src/Runtime/DSTM/interface/queue.h
Robust/src/Runtime/DSTM/interface/testd-3.c
Robust/src/Runtime/DSTM/interface/testd-4.c
Robust/src/Runtime/DSTM/interface/testserver.c
Robust/src/Runtime/DSTM/interface/trans.c

index 7f38b40b10a4c1da4f537f1f5d6d0ba0de15b0b8..050152fdc8e2324b9ec0256807b158c520ea630b 100644 (file)
@@ -1,22 +1,22 @@
 d-3:
-       gcc -dr -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c
+       gcc -dr -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c
 
 demsky:
-       gcc -DDEBUG -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c
+       gcc -DDEBUG -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c 
 
 d-4:
-       gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c
+       gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c
 
 all:
-       gcc -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c
-       gcc -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c
-       gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c
+       gcc -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c
+       gcc -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c
+       gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c
 
 
 mac:
-       gcc -DMAC -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c
-       gcc -DMAC -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c
-       gcc -DMAC -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c
+       gcc -DMAC -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c
+       gcc -DMAC -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c
+       gcc -DMAC -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c
 
 clean:
        rm -rf d-3 d-4 demsky
index cfcbecfd279962bb75e9a489c7bf7be4793a20ec..c937a57b7f12aa8d7796e56bc49aa0058a776940 100644 (file)
 #define OBJ_UNLOCK_BUT_VERSION_MATCH   15
 #define VERSION_NO_MATCH               16
 
+//Max number of objects 
+#define MAX_OBJECTS  20
+
 
 #include <stdlib.h>
 #include <stdio.h>
 #include <string.h>
 #include <pthread.h>
 #include "clookup.h"
+#include "queue.h"
 
 #define DEFAULT_OBJ_STORE_SIZE 1048510 //1MB
 #define TID_LEN 20
@@ -113,7 +117,6 @@ typedef struct thread_data_array {
        int *count;             //variable to count responses of TRANS_REQUEST protocol from all participants
        char *replyctrl;        //shared ctrl message that stores the reply to be sent, filled by decideResp
        char *replyretry;       //shared variable to find out if we need retry (TRANS_COMMIT case) 
-//     char *localstatus;     //shared variable to identify local requests
        transrecord_t *rec;     // To send modified objects
 } thread_data_array_t;
 
@@ -139,12 +142,22 @@ typedef struct member {
 
 
 //Structure for prefetching tuples generated by teh compiler
- typedef struct trans_prefetchtuple{
-        unsigned int depth;
-        unsigned int oid;
-        trans_member_t member;
-        struct trans_prefetchtuple *next;
- }trans_prefetchtuple_t;
+ typedef struct prefetchpile{
+        int mid;
+        int *oids;
+
+        int **numofarrys;
+        struct prefetchpile *next;
+ }prefetchpile_t;
+
+//Structure per Oid in the prefetch call
+
+/*
+//Structure that holds the compiler generated prefetch data
+typedef struct compprefetchdata {
+       transrecord_t *record;
+} compprefetchdata_t;
+*/
 
 /* Initialize main object store and lookup tables, start server thread. */
 int dstmInit(void);
@@ -183,5 +196,13 @@ char sendResponse(thread_data_array_t *, int); //Sends control message back to P
 void *getRemoteObj(transrecord_t *, unsigned int, unsigned int);
 int transAbortProcess(void *, unsigned int *, int, int, int);
 int transComProcess(trans_commit_data_t *);
+void prefetch(int, unsigned int *, short *, short*);
+void *transPrefetch(void *);
+void checkPrefetchTuples(prefetchqelem_t *);
+void foundLocal(prefetchqelem_t *);
+void makePreGroups(prefetchqelem_t *node);
+void checkPreCache(prefetchqelem_t *, int *, int, int, unsigned int, int, int, int);
+int transPrefetchProcess(transrecord_t *, int **, short);
+void *sendPrefetchReq(void *);
 /* end transactions */
 #endif
diff --git a/Robust/src/Runtime/DSTM/interface/prelookup.c b/Robust/src/Runtime/DSTM/interface/prelookup.c
new file mode 100644 (file)
index 0000000..77733df
--- /dev/null
@@ -0,0 +1,202 @@
+ #include "prelookup.h"
+
+prehashtable_t pflookup; //Global prefetch cache table
+
+unsigned int prehashCreate(unsigned int size, float loadfactor) {
+       prehashlistnode_t *nodes; 
+        int i; 
+
+        // Allocate space for the hash table 
+       if((nodes = calloc(size, sizeof(prehashlistnode_t))) == NULL) { 
+               printf("Calloc error %s %d\n", __FILE__, __LINE__);
+               return 1;
+       }       
+
+        pflookup.table = nodes;
+        pflookup.size = size; 
+        pflookup.numelements = 0; // Initial number of elements in the hash
+        pflookup.loadfactor = loadfactor;
+
+       //Initialize mutex var
+       pthread_mutex_init(&pflookup.lock, NULL);
+        
+       return 0;
+}
+
+//Assign keys to bins inside hash table
+unsigned int prehashFunction(unsigned int key) {
+       return ( key % (pflookup.size));
+}
+
+//Store oids and their pointers into hash
+unsigned int prehashInsert(unsigned int key, void *val) {
+       unsigned int newsize;
+       int index;
+       prehashlistnode_t *ptr, *node;
+
+       if(pflookup.numelements > (pflookup.loadfactor * pflookup.size)) {
+               //Resize
+               newsize = 2 * pflookup.size + 1;
+               pthread_mutex_lock(&pflookup.lock);
+               prehashResize(newsize);
+               pthread_mutex_unlock(&pflookup.lock);
+       }
+
+       ptr = pflookup.table;
+       pflookup.numelements++;
+       index = prehashFunction(key);
+       
+       pthread_mutex_lock(&pflookup.lock);
+       if(ptr[index].next == NULL && ptr[index].key == 0) {    // Insert at the first position in the hashtable
+               ptr[index].key = key;
+               ptr[index].val = val;
+       } else {                        // Insert in the beginning of linked list
+               if ((node = calloc(1, sizeof(prehashlistnode_t))) == NULL) {
+                       printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+                       pthread_mutex_unlock(&pflookup.lock);
+                       return 1;
+               }
+               node->key = key;
+               node->val = val ;
+               node->next = ptr[index].next;
+               ptr[index].next = node;
+       }
+       pthread_mutex_unlock(&pflookup.lock);
+       return 0;
+}
+
+// Search for an address for a given oid
+void *prehashSearch(unsigned int key) {
+       int index;
+       prehashlistnode_t *ptr, *node;
+
+       ptr = pflookup.table;
+       index = prehashFunction(key);
+       node = &ptr[index];
+       pthread_mutex_lock(&pflookup.lock);
+       while(node != NULL) {
+               if(node->key == key) {
+                       pthread_mutex_unlock(&pflookup.lock);
+                       return node->val;
+               }
+               node = node->next;
+       }
+       pthread_mutex_unlock(&pflookup.lock);
+       return NULL;
+}
+
+unsigned int prehashRemove(unsigned int key) {
+       int index;
+       prehashlistnode_t *curr, *prev;
+       prehashlistnode_t *ptr, *node;
+       
+       ptr = pflookup.table;
+       index = prehashFunction(key);
+       curr = &ptr[index];
+
+       pthread_mutex_lock(&pflookup.lock);
+       for (; curr != NULL; curr = curr->next) {
+               if (curr->key == key) {         // Find a match in the hash table
+                       pflookup.numelements--;  // Decrement the number of elements in the global hashtable
+                       if ((curr == &ptr[index]) && (curr->next == NULL))  { // Delete the first item inside the hashtable with no linked list of prehashlistnode_t 
+                               curr->key = 0;
+                               curr->val = NULL;
+                       } else if ((curr == &ptr[index]) && (curr->next != NULL)) { //Delete the first item with a linked list of prehashlistnode_t  connected 
+                               curr->key = curr->next->key;
+                               curr->val = curr->next->val;
+                               node = curr->next;
+                               curr->next = curr->next->next;
+                               free(node);
+                       } else {                                                // Regular delete from linked listed    
+                               prev->next = curr->next;
+                               free(curr);
+                       }
+                       pthread_mutex_unlock(&pflookup.lock);
+                       return 0;
+               }       
+               prev = curr; 
+       }
+       pthread_mutex_unlock(&pflookup.lock);
+       return 1;
+}
+
+unsigned int prehashResize(unsigned int newsize) {
+       prehashlistnode_t *node, *ptr, *curr, *next;    // curr and next keep track of the current and the next chashlistnodes in a linked list
+       unsigned int oldsize;
+       int isfirst;    // Keeps track of the first element in the prehashlistnode_t for each bin in hashtable
+       int i,index;    
+       prehashlistnode_t *newnode;             
+       
+       ptr = pflookup.table;
+       oldsize = pflookup.size;
+       
+       if((node = calloc(newsize, sizeof(prehashlistnode_t))) == NULL) {
+               printf("Calloc error %s %d\n", __FILE__, __LINE__);
+               return 1;
+       }
+
+       pflookup.table = node;          //Update the global hashtable upon resize()
+       pflookup.size = newsize;
+       pflookup.numelements = 0;
+
+       for(i = 0; i < oldsize; i++) {                  //Outer loop for each bin in hash table
+               curr = &ptr[i];
+               isfirst = 1;                    
+               while (curr != NULL) {                  //Inner loop to go through linked lists
+                       if (curr->key == 0) {           //Exit inner loop if there the first element for a given bin/index is NULL
+                               break;                  //key = val =0 for element if not present within the hash table
+                       }
+                       next = curr->next;
+                       index = prehashFunction(curr->key);
+                       // Insert into the new table
+                       if(pflookup.table[index].next == NULL && pflookup.table[index].key == 0) { 
+                               pflookup.table[index].key = curr->key;
+                               pflookup.table[index].val = curr->val;
+                               pflookup.numelements++;
+                       }else { 
+                               if((newnode = calloc(1, sizeof(prehashlistnode_t))) == NULL) { 
+                                       printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+                                       return 1;
+                               }       
+                               newnode->key = curr->key;
+                               newnode->val = curr->val;
+                               newnode->next = pflookup.table[index].next;
+                               pflookup.table[index].next = newnode;    
+                               pflookup.numelements++;
+                       }       
+
+                       //free the linked list of prehashlistnode_t if not the first element in the hash table
+                       if (isfirst != 1) {
+                               free(curr);
+                       } 
+                       
+                       isfirst = 0;
+                       curr = next;
+               }
+       }
+
+       free(ptr);              //Free the memory of the old hash table 
+       return 0;
+}
+
+/* Deletes the prefetch Cache */
+void prehashDelete() {
+       int i, isFirst;
+       prehashlistnode_t *ptr, *curr, *next;
+       ptr = pflookup.table; 
+
+       for(i=0 ; i<pflookup.size ; i++) {
+               curr = &ptr[i];
+               isFirst = 1;
+               while(curr != NULL) {
+                       next = curr->next;
+                       if(isFirst != 1) {
+                               free(curr);
+                       }
+                       isFirst = 0;
+                       curr = next;
+               }
+       }
+
+       free(ptr);
+}
diff --git a/Robust/src/Runtime/DSTM/interface/prelookup.h b/Robust/src/Runtime/DSTM/interface/prelookup.h
new file mode 100644 (file)
index 0000000..0f68d20
--- /dev/null
@@ -0,0 +1,35 @@
+#ifndef _PRELOOKUP_H_
+#define _PRELOOKUP_H_
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <pthread.h>
+
+#define LOADFACTOR 0.75
+#define HASH_SIZE 100
+
+typedef struct prehashlistnode {
+       unsigned int key;
+       void *val; //this can be cast to another type or used to point to a larger structure
+       struct prehashlistnode *next;
+} prehashlistnode_t;
+
+typedef struct prehashtable {
+       prehashlistnode_t *table;       // points to beginning of hash table
+       unsigned int size;
+       unsigned int numelements;
+       float loadfactor;
+       pthread_mutex_t lock;
+} prehashtable_t;
+
+/* Prototypes for hash*/
+unsigned int prehashCreate(unsigned int size, float loadfactor);
+unsigned int prehashFunction(unsigned int key);
+unsigned int prehashInsert(unsigned int key, void *val);
+void *prehashSearch(unsigned int key); //returns val, NULL if not found
+unsigned int prehashRemove(unsigned int key); //returns -1 if not found
+unsigned int prehashResize(unsigned int newsize);
+/* end hash */
+
+#endif
+
index fc12eaabebc36cb003bdc270d279d367ef149f92..f164528f6e4462ab5af24ddfc68024406f44c99a 100644 (file)
 #include "queue.h"
 
-prefetchthreadqueue_t queue; //Global shared prefetch queue
+primarypfq_t pqueue; //Global queue
 
-void queueInsert(int *array) {
-       pthread_mutex_lock(&queue.qlock);
-       queue.rear = queue.rear % ARRAY_SIZE;
-       if(queue.front == queue.rear && queue.buffer[queue.front] != NULL) {
-               printf("The Circular Queue is Full : OVERFLOW\n");
-               pthread_mutex_unlock(&queue.qlock);
+void queueInit(void) {
+       /* Intitialize primary thread */
+       pqueue.front = pqueue.rear = NULL;
+       pthread_mutex_init(&pqueue.qlock, NULL);
+       pthread_cond_init(&pqueue.qcond, NULL);
+}
+
+/* Removes the first element of the queue */
+void delqnode() {
+       prefetchqelem_t *delnode;
+       if((pqueue.front == NULL) && (pqueue.rear == NULL)) {
+               printf("The queue is empty: UNDERFLOW %s, %d\n", __FILE__, __LINE__);
                return;
+       } else if ((pqueue.front == pqueue.rear) && pqueue.front != NULL && pqueue.rear != NULL) {
+               printf("TEST1\n");
+               free(pqueue.front);
+               pqueue.front = pqueue.rear = NULL;
        } else {
-               queue.buffer[queue.rear] = array;
-               queue.rear++;
+               delnode = pqueue.front;
+               pqueue.front = pqueue.front->next;
+               printf("TEST2\n");
+               free(delnode);
        }
-       pthread_mutex_unlock(&queue.qlock);
 }
 
-int *queueDelete() {
-       int *i; 
-       i = NULL;
-       pthread_mutex_lock(&queue.qlock);
-       if(queue.front == queue.rear && queue.buffer[queue.front] == NULL) {
-               printf("The Circular Queue is Empty : UNDERFLOW\n");
-               pthread_mutex_unlock(&queue.qlock);
-               return NULL;
+void queueDelete(void) {
+       /* Remove each element */
+       while(pqueue.front != NULL)
+               delqnode();
+       pqueue.front = pqueue.rear = NULL;
+}
+
+/* Inserts to the rear of primary prefetch queue */
+void enqueue(prefetchqelem_t *qnode) {
+       if(pqueue.front == NULL && pqueue.rear == NULL) {
+               pqueue.front = pqueue.rear = qnode;
        } else {
-               i = queue.buffer[queue.front];
-               queue.buffer[queue.front] = NULL;
-               queue.front++;
-               queue.front = queue.front % ARRAY_SIZE;
-               pthread_mutex_unlock(&queue.qlock);
-               return i;
+               qnode->next = NULL;
+               pqueue.rear->next = qnode;
+               pqueue.rear = qnode;
        }
 }
 
-void queueInit() {
-       int i;
-       queue.front = 0;
-       queue.rear = 0;
-       for(i = 0; i< ARRAY_SIZE; i++) 
-               queue.buffer[i] = NULL;
-       /* Initialize the pthread_mutex variable */
-       pthread_mutex_init(&queue.qlock, NULL);
+prefetchqelem_t *dequeue(void) {
+       prefetchqelem_t *retnode;
+       if (pqueue.front == NULL) {
+               printf("Queue empty: Underflow %s, %d\n", __FILE__, __LINE__);
+               return NULL;
+       }
+       retnode = pqueue.front;
+       //TODO make this atomic
+       pqueue.front = pqueue.front->next;
+
+       return retnode;
 }
 
-/* For testing purposes */
+void queueDisplay() {
+       int offset = sizeof(prefetchqelem_t);
+       int *ptr;
+       int ntuples;
+       char *ptr1;
+       prefetchqelem_t *tmp = pqueue.front;
+       while(tmp != NULL) {
+               ptr1 = (char *) tmp;
+               ptr = (int *)(ptr1 + offset);
+               ntuples = *ptr;
+               printf("Number of tuples = %d\n", ntuples);
+               tmp = tmp->next;
+       }
+}
+
+
 #if 0
 main() {
-       int *d; 
-       queueIsEmpty();
-       int a[] = {5, 2, 8, -1};
-       int b[] = {11, 8, 4, 19, -1};
-       int c[] = {16, 8, 4, -1};
-       printf("Front = %d, Rear = %d\n",queue.front, queue.rear);
-       d = queueDelete();
-       printf("Front = %d, Rear = %d\n",queue.front, queue.rear);
-       queueInsert(a);
-       printf("Enqueued ptr is %x\n", a);
-       printf("1st Insert Front = %d, Rear = %d\n",queue.front, queue.rear);
-       queueInsert(b);
-       printf("Enqueued ptr is %x\n", b);
-       printf("2nd Insert Front = %d, Rear = %d\n",queue.front, queue.rear);
-       queueInsert(c);
-       printf("3rd Insert Front = %d, Rear = %d\n",queue.front, queue.rear);
-       d = queueDelete();
-       printf("Dequeued ptr is %x\n", d);
-       printf("After 1st del Front = %d, Rear = %d\n",queue.front, queue.rear);
-       queueInsert(c);
-       printf("Enqueued ptr is %x\n", c);
-       printf("After 4th insert Front = %d, Rear = %d\n",queue.front, queue.rear);
-       d = queueDelete();
-       printf("Dequeued ptr is %x\n", d);
-       printf("After 2nd del Front = %d, Rear = %d\n",queue.front, queue.rear);
-       d = queueDelete();
-       printf("Dequeued ptr is %x\n", d);
-       printf("After 3rd del Front = %d, Rear = %d\n",queue.front, queue.rear);
-       d = queueDelete();
-       printf("Dequeued ptr is %x\n", d);
-       printf("After 4th del Front = %d, Rear = %d\n",queue.front, queue.rear);
+       unsigned int oids[] = {11, 13};
+       short endoffsets[] = {2, 5};
+       short arrayfields[] = {2, 2, 1, 5, 6};
+       queueInit();
+       queueDisplay();
+       prefetch(2, oids, endoffsets, arrayfields);
+       queueDisplay();
+       unsigned int oids1[] = {21, 23, 25, 27};
+       short endoffsets1[] = {1, 2, 3, 4};
+       short arrayfields1[] = {3, 2, 1, 3};
+       prefetch(4, oids1, endoffsets1, arrayfields1);
+       queueDisplay();
+       delqnode();
+       queueDisplay();
+       delqnode();
+       queueDisplay();
+       delqnode();
+       queueDisplay();
+       delqnode();
+
 }
 
 #endif
+
+
index 9ad5df452c5afb4bcc22cf831d0714c235590018..ea7dba2d91a3d15a283e3aa47fde3a022bb90c55 100644 (file)
@@ -4,19 +4,24 @@
 #include<stdio.h>
 #include<stdlib.h>
 #include<pthread.h>
-
-#define ARRAY_SIZE 20
+#include<string.h>
 
 // DS that contains information to be shared between threads.
-typedef struct prefetchthreadqueue {
-       int *buffer[ARRAY_SIZE];
-       int front;
-       int rear;
+typedef struct prefetchqelem {
+       struct prefetchqelem *next;
+} prefetchqelem_t;
+
+typedef struct primarypfq {
+       prefetchqelem_t *front, *rear;
        pthread_mutex_t qlock;
-} prefetchthreadqueue_t;
+       pthread_cond_t qcond;
+} primarypfq_t; 
 
-void queueInsert(int *);
-int *queueDelete();
-void queueInit(); //Initializes the queue and qlock mutex 
 
+void queueInit(void);
+void delqnode(); 
+void queueDelete(void);
+void enqueue(prefetchqelem_t *qnode);
+prefetchqelem_t *dequeue(void);
+void queueDisplay();
 #endif
index 1dfcea0c5a697be4c9a939fad3e4f0c734ee45b7..d3261a21bfa5b36edca74bc508828ed70c486d88 100644 (file)
@@ -133,7 +133,7 @@ int test2a(void) {
         lhashInsert(header->oid, mid);
 
         //Inserting into lhashtable
-        mid = iptoMid("128.200.9.30"); //d-4.eecs.uci.edu
+        mid = iptoMid("128.195.175.69"); //dw-1.eecs.uci.edu
         lhashInsert(31, mid);
         lhashInsert(32, mid);
 
@@ -148,9 +148,9 @@ int test2a(void) {
 
         //Check if machine demsky is up and running
         checkServer(mid, "128.200.9.10");
-        mid = iptoMid("128.200.9.30");
-        //Check if machine d-4 is up and running
-        checkServer(mid, "128.200.9.30");
+        mid = iptoMid("128.195.175.69");
+        //Check if machine dw-1 is up and running
+        checkServer(mid, "128.195.175.69");
 
         // Start Transaction    
         myTrans = transStart();
@@ -222,7 +222,7 @@ int test2b(void) {
        lhashInsert(header->oid, mid);
 
        //Inserting into lhashtable
-       mid = iptoMid("128.200.9.30"); //d-4.eecs.uci.edu
+       mid = iptoMid("128.195.175.69"); //dw-1.eecs.uci.edu
        lhashInsert(31, mid);
        lhashInsert(32, mid);
        lhashInsert(33, mid);
@@ -238,9 +238,9 @@ int test2b(void) {
 
        //Check if machine demsky is up and running
        checkServer(mid, "128.200.9.10");
-       mid = iptoMid("128.200.9.30");
-       //Check if machine d-4 is up and running
-       checkServer(mid, "128.200.9.30");
+       mid = iptoMid("128.195.175.69");
+       //Check if machine dw-1 is up and running
+       checkServer(mid, "128.195.175.69");
 
        // Start Transaction    
        myTrans = transStart();
@@ -259,7 +259,7 @@ int test2b(void) {
                printf("Object not found\n");
        }
        
-       //read object 32 (found on d-4)
+       //read object 32 (found on dw-1)
        if((h4 = transRead(myTrans, 32)) == NULL) {
                printf("Object not found\n");
        }
@@ -378,7 +378,7 @@ int test5(void) {
        lhashInsert(header->oid, mid);
 
        //Inserting into lhashtable
-       mid = iptoMid("128.200.9.30"); //d-4.eecs.uci.edu
+       mid = iptoMid("128.195.175.69"); //dw-1.eecs.uci.edu
        lhashInsert(31, mid);
        lhashInsert(32, mid);
        lhashInsert(33, mid);
@@ -393,9 +393,9 @@ int test5(void) {
        pthread_create(&thread_Listen, &attr, dstmListen, NULL);
        //Check if machine demsky is up and running
        checkServer(mid, "128.200.9.10");
-       mid = iptoMid("128.200.9.30");
-       //Check if machine d-4 is up and running
-       checkServer(mid, "128.200.9.30");
+       mid = iptoMid("128.195.175.69");
+       //Check if machine dw-1 is up and running
+       checkServer(mid, "128.195.175.69");
 
        // Start Transaction    
        myTrans = transStart();
@@ -405,11 +405,11 @@ int test5(void) {
                printf("Object not found\n");
        }
        //read object 2 (found on demksy)
-       if((h1 = transRead(myTrans,2)) == NULL){
+       if((h2 = transRead(myTrans,2)) == NULL){
                printf("Object not found\n");
        }
-       //read object 31 (found on d-4)
-       if((h2 = transRead(myTrans, 31)) == NULL) {
+       //read object 22 (found locally )
+       if((h3 = transRead(myTrans, 22)) == NULL) {
                printf("Object not found\n");
        }
        
@@ -449,7 +449,7 @@ int test5a(void) {
        header = (objheader_t *) objstrAlloc(mainobjstore, size);
        header->oid = 21;
        header->type = 1;
-       //read object 31 (found on d-4)
+       //read object 31 (found on dw-1)
        if((h2 = transRead(myTrans, 31)) == NULL) {
                printf("Object not found\n");
        }
@@ -512,7 +512,7 @@ int test5b(void) {
        lhashInsert(header->oid, mid);
 
        //Inserting into lhashtable
-       mid = iptoMid("128.200.9.30"); //d-4.eecs.uci.edu
+       mid = iptoMid("128.195.175.69"); //dw-1.eecs.uci.edu
        lhashInsert(31, mid);
        lhashInsert(32, mid);
        lhashInsert(33, mid);
@@ -528,9 +528,9 @@ int test5b(void) {
 
        //Check if machine demsky is up and running
        checkServer(mid, "128.200.9.10");
-       mid = iptoMid("128.200.9.30");
-       //Check if machine d-4 is up and running
-       checkServer(mid, "128.200.9.30");
+       mid = iptoMid("128.195.175.69");
+       //Check if machine dw-1 is up and running
+       checkServer(mid, "128.195.175.69");
 
        // Start Transaction    
        myTrans = transStart();
@@ -539,7 +539,7 @@ int test5b(void) {
        if((h1 = transRead(myTrans, 1)) == NULL){
                printf("Object not found\n");
        }
-       //read object 31 (found on d-4)
+       //read object 31 (found on dw-1)
        if((h2 = transRead(myTrans, 31)) == NULL) {
                printf("Object not found\n");
        }
@@ -612,7 +612,7 @@ int test7(void) {
        lhashInsert(header->oid, mid);
 
        //Inserting into lhashtable
-       mid = iptoMid("128.200.9.30"); //d-4.eecs.uci.edu
+       mid = iptoMid("128.195.175.69"); //dw-1.eecs.uci.edu
        lhashInsert(31, mid);
        lhashInsert(32, mid);
        lhashInsert(33, mid);
@@ -628,9 +628,9 @@ int test7(void) {
 
        //Check if machine demsky is up and running
        checkServer(mid, "128.200.9.10");
-       mid = iptoMid("128.200.9.30");
-       //Check if machine d-4 is up and running
-       checkServer(mid, "128.200.9.30");
+       mid = iptoMid("128.195.175.69");
+       //Check if machine dw-1 is up and running
+       checkServer(mid, "128.195.175.69");
 
        // Start Transaction    
        myTrans = transStart();
@@ -639,7 +639,7 @@ int test7(void) {
        if((h1 = transRead(myTrans, 3)) == NULL){
                printf("Object not found\n");
        }
-       //read object 32 (found on d-4)
+       //read object 32 (found on dw-1)
        if((h2 = transRead(myTrans, 32)) == NULL) {
                printf("Object not found\n");
        }
index 9876bdb2058aabd26d7938fb0a1c0bedded20598..26dba3a9f3cb6e18cf5ad4e13b150c1f7711466c 100644 (file)
@@ -41,7 +41,7 @@ int test1() {
        header = (objheader_t *) objstrAlloc(mainobjstore, size);
        init_obj(header, 31, 2, 1, 0, NEW);
        mhashInsert(header->oid, header);
-       mid = iptoMid("128.200.9.30");
+       mid = iptoMid("128.195.175.69");
        lhashInsert(header->oid, mid);
 
        //Create and Insert Oid 32
@@ -49,7 +49,7 @@ int test1() {
        header = (objheader_t *) objstrAlloc(mainobjstore, size);
        init_obj(header, 32, 1, 1, 0, NEW);
        mhashInsert(header->oid, header);
-       mid = iptoMid("128.200.9.30");
+       mid = iptoMid("128.195.175.69");
        lhashInsert(header->oid, mid);
 
        //Create and Insert Oid 33
@@ -57,7 +57,7 @@ int test1() {
        header = (objheader_t *) objstrAlloc(mainobjstore, size);
        init_obj(header, 33, 0, 1, 0, NEW);
        mhashInsert(header->oid, header);
-       mid = iptoMid("128.200.9.30");
+       mid = iptoMid("128.195.175.69");
        lhashInsert(header->oid, mid);
 
        //Inserting into lhashtable into d-3.eecs
@@ -104,7 +104,7 @@ int test2() {
        header = (objheader_t *) objstrAlloc(mainobjstore, size);
        init_obj(header, 31, 2, 1, 0, NEW);
        mhashInsert(header->oid, header);
-       mid = iptoMid("128.200.9.30");
+       mid = iptoMid("128.195.175.69");
        lhashInsert(header->oid, mid);
 
        //Create and Insert Oid 32
@@ -112,7 +112,7 @@ int test2() {
        header = (objheader_t *) objstrAlloc(mainobjstore, size);
        init_obj(header, 32, 1, 1, 0, NEW);
        mhashInsert(header->oid, header);
-       mid = iptoMid("128.200.9.30");
+       mid = iptoMid("128.195.175.69");
        lhashInsert(header->oid, mid);
 
        //Create and Insert Oid 33
@@ -120,7 +120,7 @@ int test2() {
        header = (objheader_t *) objstrAlloc(mainobjstore, size);
        init_obj(header, 33, 0, 1, 0, NEW);
        mhashInsert(header->oid, header);
-       mid = iptoMid("128.200.9.30");
+       mid = iptoMid("128.195.175.69");
        lhashInsert(header->oid, mid);
 
        //Inserting into lhashtable into d-3.eecs
@@ -190,7 +190,7 @@ int test3() {
        header = (objheader_t *) objstrAlloc(mainobjstore, size);
        init_obj(header, 31, 2, 1, 0, NEW);
        mhashInsert(header->oid, header);
-       mid = iptoMid("128.200.9.30");
+       mid = iptoMid("128.195.175.69");
        lhashInsert(header->oid, mid);
 
        //Create and Insert Oid 32
@@ -198,7 +198,7 @@ int test3() {
        header = (objheader_t *) objstrAlloc(mainobjstore, size);
        init_obj(header, 32, 1, 1, 0, NEW);
        mhashInsert(header->oid, header);
-       mid = iptoMid("128.200.9.30");
+       mid = iptoMid("128.195.175.69");
        lhashInsert(header->oid, mid);
 
        //Create and Insert Oid 33
@@ -206,7 +206,7 @@ int test3() {
        header = (objheader_t *) objstrAlloc(mainobjstore, size);
        init_obj(header, 33, 0, 1, 0, NEW);
        mhashInsert(header->oid, header);
-       mid = iptoMid("128.200.9.30");
+       mid = iptoMid("128.195.175.69");
        lhashInsert(header->oid, mid);
 
        //Inserting into lhashtable into d-3.eecs
@@ -272,7 +272,7 @@ int test4() {
        header = (objheader_t *) objstrAlloc(mainobjstore, size);
        init_obj(header, 31, 2, 1, 0, NEW);
        mhashInsert(header->oid, header);
-       mid = iptoMid("128.200.9.30");
+       mid = iptoMid("128.195.175.69");
        lhashInsert(header->oid, mid);
 
        //Create and Insert Oid 32
@@ -280,7 +280,7 @@ int test4() {
        header = (objheader_t *) objstrAlloc(mainobjstore, size);
        init_obj(header, 32, 1, 1, 0, NEW);
        mhashInsert(header->oid, header);
-       mid = iptoMid("128.200.9.30");
+       mid = iptoMid("128.195.175.69");
        lhashInsert(header->oid, mid);
 
        //Create and Insert Oid 33
@@ -288,7 +288,7 @@ int test4() {
        header = (objheader_t *) objstrAlloc(mainobjstore, size);
        init_obj(header, 33, 0, 1, 0, NEW);
        mhashInsert(header->oid, header);
-       mid = iptoMid("128.200.9.30");
+       mid = iptoMid("128.195.175.69");
        lhashInsert(header->oid, mid);
 
        //Inserting into lhashtable into d-3.eecs
index 61e397db1434aaf69010b4cef593bfe08f679ede..85d3b920b922224621ed8b6f250fb107c98e27db 100644 (file)
@@ -44,7 +44,7 @@ void init_obj(objheader_t *h, unsigned int oid, unsigned short type, \
 
 int main()
 {
-       //test1();
+//     test1();
 //     test3();
        test4();
 }
@@ -189,15 +189,15 @@ int test3() {
        lhashInsert(21, mid);
        lhashInsert(22, mid);
 
-       mid = iptoMid("128.200.9.30"); //d-4.eecs.uci.edu
+       mid = iptoMid("128.195.175.69"); //dw-1.eecs.uci.edu
        //Inserting into lhashtable
        lhashInsert(31, mid);
        lhashInsert(32, mid);
        lhashInsert(33, mid);
        pthread_create(&thread_Listen, &attr, dstmListen, NULL);
 
-       //Check if machine d-4 is up and running
-       checkServer(mid, "128.200.9.30");
+       //Check if machine dw-1 is up and running
+       checkServer(mid, "128.195.175.69");
        mid = iptoMid("128.200.9.29");
        //Check if machine d-3 is up and running
        checkServer(mid, "128.200.9.29");
@@ -281,15 +281,15 @@ int test4() {
        lhashInsert(21, mid);
        lhashInsert(22, mid);
 
-       mid = iptoMid("128.200.9.30"); //d-4.eecs.uci.edu
+       mid = iptoMid("128.195.175.69"); //dw-1.eecs.uci.edu
        //Inserting into lhashtable
        lhashInsert(31, mid);
        lhashInsert(32, mid);
        lhashInsert(33, mid);
 
        pthread_create(&thread_Listen, &attr, dstmListen, NULL);
-       //Check if machine d-4 is up and running
-       checkServer(mid, "128.200.9.30");
+       //Check if machine dw-1 is up and running
+       checkServer(mid, "128.195.175.69");
        mid = iptoMid("128.200.9.29");
        //Check if machine d-3 is up and running
        checkServer(mid, "128.200.9.29");
@@ -306,7 +306,7 @@ int test4() {
        if((h2 = transRead(myTrans, 1)) == NULL) {
                printf("Object not found\n");
        }
-       //read object 31(present in d-4 machine)
+       //read object 31(present in dw-1 machine)
        if((h3 = transRead(myTrans, 31)) == NULL) {
                printf("Object not found\n");
        }
index d36b585b80bc42593421ccf935c4997790aa5812..2bf2d0195700bb2bf506b9ca36a5d47710545447 100644 (file)
@@ -4,7 +4,7 @@
 #include "mlookup.h"
 #include "llookup.h"
 #include "plookup.h"
-//#include "queue.h"
+#include "queue.h"
 #include <pthread.h>
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <unistd.h>
 #include <time.h>
 #include <string.h>
+#include <pthread.h>
 
 #define LISTEN_PORT 2156
 #define RECEIVE_BUFFER_SIZE 2048
+#define NUM_THREADS 10
+#define PREFETCH_CACHE_SIZE 1048576 //1MB
+
+#define GET_NTUPLES(x)         ((int *)(x + sizeof(prefetchqelem_t)))
+#define GET_PTR_OID(x)         ((unsigned int *)(x + sizeof(prefetchqelem_t) + sizeof(int)))
+#define GET_PTR_EOFF(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int))))
+#define GET_PTR_ARRYFLD(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int)) + (n*sizeof(short))))
 
 /* Global Variables */
 extern int classsize[];
-//extern prefetchthreadqueue_t pfqueue; // Shared prefetch queue
-objstr_t *mainobjstore;
-plistnode_t *createPiles(transrecord_t *);
+extern primarypfq_t pqueue; // shared prefetch queue
+pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
 pthread_t tPrefetch;
+extern objstr_t *mainobjstore;
+objstr_t *prefetchcache;
 
+plistnode_t *createPiles(transrecord_t *);
 inline int arrayLength(int *array) {
        int i;
        for(i=0 ;array[i] != -1; i++)
                ;
        return i;
 }
-
-// DS that contains information to be shared between threads.
-typedef struct prefetchqelem {
-       struct prefetchqelem *next;
-} prefetchqelem_t;
-
-typedef struct primarypfq {
-       prefetchqelem_t *front, *rear;
-       pthread_mutex_t qlock;
-       pthread_cond_t qcond;
-} primarypfq_t; 
-
-primarypfq_t pqueue; //Global queue
-int flag; //Global flag
-
-void prefetch(int ntuples, unsigned int *oids, short *endoffsets, short *arrayfields);
-
-
-void queueInit(void) {
-       pqueue.front = pqueue.rear = NULL;
-       pthread_mutex_init(&pqueue.qlock, NULL);
-       pthread_cond_init(&pqueue.qcond, NULL);
-}
-
-/* Removes the first element of the queue */
-void delqnode() {
-       prefetchqelem_t *delnode;
-       if((pqueue.front == NULL) && (pqueue.rear == NULL)) {
-               printf("The queue is empty: UNDERFLOW %s, %d\n", __FILE__, __LINE__);
-               return;
-       } else if (pqueue.front == pqueue.rear) {
-               free(pqueue.front);
-               pqueue.front = pqueue.rear = NULL;
-       } else {
-               delnode = pqueue.front;
-               pqueue.front = pqueue.front->next;
-               free(delnode);
-       }
-}
-
-void queueDelete(void) {
-       /* Remove each element */
-       while(pqueue.front != NULL)
-               delqnode();
-       pqueue.front = pqueue.rear = NULL;
-}
-
-/* Inserts to the rear of primary prefetch queue */
-void enqueue(prefetchqelem_t *qnode) {
-       if(pqueue.front == NULL && pqueue.rear == NULL) {
-               pqueue.front = pqueue.rear = qnode;
-       } else {
-               qnode->next = NULL;
-               pqueue.rear->next = qnode;
-               pqueue.rear = qnode;
-       }
-}
-
-prefetchqelem_t *dequeue(void) {
-       prefetchqelem_t *retnode;
-
-       if (pqueue.front == NULL) {
-               printf("Queue empty: Underflow %s, %d\n", __FILE__, __LINE__);
-               return NULL;
-       }
-       retnode = pqueue.front;
-       //TODO make this atomic
-       pqueue.front = pqueue.front->next;
-
-       return retnode;
-}
-
-void queuedisplay(primarypfq_t pqueue) {
-       while(pqueue.front != NULL) {
-               /*
-               int *ptr = (int *)(pqueue + offset);
-               int oid = *ptr;
-               printf("oid = %d", oid);
-               //printf("Oid = %d", pqueue.front->oids[0]);
-               pqueue.front = pqueue.front->next;
-               */
-       }
-}
-
+/* This function is a prefetch call generated by the compiler that
+ * populates the shared primary prefetch queue*/
 void prefetch(int ntuples, unsigned int *oids, short *endoffsets, short *arrayfields) {
        int qnodesize;
        int len = 0;
-       /* Create element for the queue */
-       prefetchqelem_t *node;
-       qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(short) + sizeof(unsigned int)) + endoffsets[ntuples -1] * sizeof(short); 
+
+       /* Allocate for the queue node*/
+       char *node;
+       qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short); 
        if((node = calloc(1,qnodesize)) == NULL) {
                printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
                return;
        }
+       /* Set queue node values */
        len = sizeof(prefetchqelem_t);
        memcpy(node + len, &ntuples, sizeof(int));
        len += sizeof(int);
@@ -133,39 +63,51 @@ void prefetch(int ntuples, unsigned int *oids, short *endoffsets, short *arrayfi
        memcpy(node + len, endoffsets, ntuples*sizeof(short));
        len += ntuples * sizeof(short);
        memcpy(node + len, arrayfields, endoffsets[ntuples-1]*sizeof(short));
-
+       /* Lock and insert into primary prefetch queue */
        pthread_mutex_lock(&pqueue.qlock);
-       /* Check if primary queue thread operating on this then wait */
-       /*
-       if(flag) {
-               //TODO
-       }
-       */
-       enqueue(node);
-
+       enqueue((prefetchqelem_t *)node);
+       pthread_cond_signal(&pqueue.qcond);
        pthread_mutex_unlock(&pqueue.qlock);
 }
 
-
 /* This function initiates the prefetch thread
  * A queue is shared between the main thread of execution
  * and the prefetch thread to process the prefetch call
  * Call from compiler populates the shared queue with prefetch requests while prefetch thread
  * processes the prefetch requests */
 void transInit() {
-       //Initialize shared queue
+       int t, rc;
+       //Create and initialize prefetch cache structure
+       prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE);
+       //Create prefetch cache lookup table
+       if(prehashCreate(HASH_SIZE, LOADFACTOR))
+                       return; //Failure
+       //Initialize primary shared queue
        queueInit();
-       //Create the prefetch thread 
+       //Create the primary prefetch thread 
        pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
+       //Create and Initialize a pool of threads 
+       for(t = 0; t< NUM_THREADS; t++) {
+               //rc = pthread_create(&wthreads[t], NULL, pfqProcess, (void *)t);
+               if (rc) {
+                       printf("Thread create error %s, %d\n", __FILE__, __LINE__);
+                       return;
+               }
+       }
 }
 
 /* This function stops the threads spawned */
 void transExit() {
+       int t;
        pthread_cancel(tPrefetch);
+       for(t = 0; t < NUM_THREADS; t++)
+               pthread_cancel(wthreads[t]);
+
        return;
 }
 
-/* This functions inserts randowm wait delays in the order of msec */
+/* This functions inserts randowm wait delays in the order of msec
+ * Mostly used when transaction commits retry*/
 void randomdelay(void)
 {
        struct timespec req, rem;
@@ -959,22 +901,284 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int
 
 /* This function is called by the thread calling transPrefetch */
 void *transPrefetch(void *prefdata) {
-       compprefetchdata_t *ptr = (compprefetchdata_t *) prefdata;
        int *offstarray = NULL;
+       prefetchqelem_t *qnode;
 
        while(1) {
-               //TODO
+               /* lock mutex of primary prefetch queue */
+               pthread_mutex_lock(&pqueue.qlock);
+               /* while primary queue is empty, then wait */
+               while((pqueue.front == NULL) && (pqueue.rear == NULL)) {
+                       pthread_cond_wait(&pqueue.qcond, &pqueue.qlock);
+               }
+
+               /* dequeue node to create a machine piles and  finally unlock mutex */
+               if((qnode = dequeue()) == NULL) {
+                       printf("Error: No node returned %s, %d\n", __FILE__, __LINE__);
+                       return NULL;
+               }
+               pthread_mutex_unlock(&pqueue.qlock);
+               /* Reduce redundant prefetch requests */
+               /* Group Requests by where objects are located */
+       
+
+
+       }
+}
+
+/* This function checks if the prefetch oids are same and have same offsets  
+ * for case x.a.b and y.a.b where x and y have same oid's
+ * or if a.b.c is a subset of x.b.c.d*/ 
+/* check for case where the generated request a.y.z or x.y.z.g then 
+ * prefetch needs to be generated for x.y.z.g  if oid of a and x are same*/
+void checkPrefetchTuples(prefetchqelem_t *node) {
+       int i,j, count,k, sindex, index;
+       char *ptr, *tmp;
+       int ntuples, slength;
+       unsigned int *oid;
+       short *endoffsets, *arryfields; 
+
+       /* Check for the case x.y.z and a.b.c are same oids */ 
+       ptr = (char *) node;
+       ntuples = *(GET_NTUPLES(ptr));
+       oid = GET_PTR_OID(ptr);
+       endoffsets = GET_PTR_EOFF(ptr, ntuples); 
+       arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
+       /* Find offset length for each tuple */
+       int numoffset[ntuples];
+       numoffset[0] = endoffsets[0];
+       for(i = 1; i<ntuples; i++) {
+               numoffset[i] = endoffsets[i] - endoffsets[i-1];
+       }
+       /* Check for redundant tuples by comparing oids of each tuple */
+       for(i = 0; i < ntuples; i++) {
+               if(oid[i] == -1)
+                       continue;
+               for(j = i+1 ; j < ntuples; j++) {
+                       if(oid[j] == -1)
+                               continue;
+                       /*If oids of tuples match */ 
+                       if (oid[i] == oid[j]) {
+                               /* Find the smallest offset length of two tuples*/
+                               if(numoffset[i] >  numoffset[j]){
+                                       slength = numoffset[j];
+                                       sindex = j;
+                               }
+                               else {
+                                       slength = numoffset[i];
+                                       sindex = i;
+                               }
+
+                               /* Compare the offset values based on the current indices
+                                * break if they do not match
+                                * if all offset values match then pick the largest tuple*/
+
+                               if(i == 0) {
+                                       k = 0;
+                                       index = endoffsets[j -1];
+                                       for(count = 0; count < slength; count ++) {
+                                               if (arryfields[k] != arryfields[index]) { 
+                                                       break;
+                                               }
+                                               index++;
+                                               k++;
+                                       }       
+                               } else {
+                                       printf("i = %d, j = %d\n", i, j);
+                                       k = endoffsets[i-1];
+                                       index = endoffsets[j-1];
+                                       printf("Value of slength = %d\n", slength);
+                                       for(count = 0; count < slength; count++) {
+                                               printf("Value of count =%d\n", count);
+                                               if(arryfields[k] != arryfields[index]) {
+                                                       break;
+                                               }
+                                               index++;
+                                               k++;
+                                       }
+                                       printf("Value of count =%d\n", count);
+                               }
+                               printf("The value of sindex = %d\n", sindex);
+
+                               if(slength == count) {
+                                       printf("DEBUG-> Inside slength if %d\n", sindex);
+                                       oid[sindex] = -1;
+                               }
+                       }
+               }
+       }
+}
+
+void checkPreCache(prefetchqelem_t *node, int *numoffset, int counter, int loopcount, unsigned int objoid, int index, int iter, int oidnfound) {
+       char *ptr, *tmp;
+       int ntuples, i, k, flag;
+       unsigned int * oid;
+       short *endoffsets, *arryfields;
+       objheader_t *header;
+
+       ptr = (char *) node;
+       ntuples = *(GET_NTUPLES(ptr));
+       oid = GET_PTR_OID(ptr);
+       endoffsets = GET_PTR_EOFF(ptr, ntuples);
+       arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
+
+       if(oidnfound == 1) {
+               if((header = (objheader_t *) prehashSearch(objoid)) == NULL) {
+                       return;
+               } else { //Found in Prefetch Cache
+                       //TODO Decide if object is too old, if old remove from cache
+                       tmp = (char *) header;
+                       /* Check if any of the offset oid is available in the Prefetch cache */
+                       for(i = counter; i < loopcount; i++) {
+                               objoid = *(tmp + sizeof(objheader_t) + arryfields[counter]);
+                               if((header = (objheader_t *)prehashSearch(objoid)) != NULL) {
+                                       flag = 0;
+                               } else {
+                                       flag = 1;
+                                       break;
+                               }
+                       }
+               }
+       } else {
+               for(i = counter; i<loopcount; i++) {
+                       if((header = (objheader_t *)prehashSearch(objoid)) != NULL) {
+                               tmp = (char *) header;
+                               objoid = *(tmp + sizeof(objheader_t) + arryfields[index]);
+                               flag = 0;
+                               index++;
+                       } else {
+                               flag = 1;
+                               break;
+                       }
+               }
+       }
 
-               //Typecast prequest to a struct 
-               //Gets requests from the compiler
-               //Creates and fills the shared queue DS 
-               //Creates the threads that processes the transPrefetch call or
-               //wakes up the prefetchprocess thread if it is sleeping
+       /* If oid not found locally or in prefetch cache then 
+        * assign the latest oid found as the new oid 
+        * and copy left over offsets into the arrayoffsetfieldarray*/
+       oid[iter] = objoid;
+       numoffset[iter] = numoffset[iter] - (i+1);
+       if(iter == 0)
+               endoffsets[iter] = numoffset[iter];
+       else
+               endoffsets[iter] = numoffset[iter] + endoffsets[iter - 1];
+       for(k = 0; k < numoffset[iter] ; k++) {
+               arryfields[k] = arryfields[counter+1];
+               counter++;
+       }
+
+       if(flag == 0) {
+               oid[iter] = -1;
+               numoffset[iter] = 0;
+               endoffsets[iter] = 0;
+       }
+}
+
+/* This function checks if the oids within the prefetch tuples are available locally.
+ * If yes then makes the tuple invalid. If no then rearranges oid and offset values in 
+ * the prefetchqelem_t node to represent a new prefetch tuple */
+void foundLocal(prefetchqelem_t *node) {
+       int ntuples,i, j, k, oidnfound = 0, index, flag;
+       unsigned int *oid;
+       unsigned int  objoid;
+       char *ptr, *tmp;
+       objheader_t *objheader;
+       short *endoffsets, *arryfields; 
+
+       ptr = (char *) node;
+       ntuples = *(GET_NTUPLES(ptr));
+       oid = GET_PTR_OID(ptr);
+       endoffsets = GET_PTR_EOFF(ptr, ntuples); 
+       arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
+       /* Find offset length for each tuple */
+       int numoffset[ntuples];//Number of offsets for each tuple
+       numoffset[0] = endoffsets[0];
+       for(i = 1; i<ntuples; i++) {
+               numoffset[i] = endoffsets[i] - endoffsets[i-1];
+       }
+       for(i = 0; i < ntuples; i++) { 
+               if(oid[i] == -1)
+                       continue;
+               /* If object found locally */
+               if((objheader = (objheader_t*) mhashSearch(oid[i])) != NULL) { 
+                       oidnfound = 0;
+                       tmp = (char *) objheader;
+                       /* Find the oid of its offset value */
+                       if(i == 0) 
+                               index = 0;
+                       else 
+                               index = endoffsets[i - 1];
+                       for(j = 0 ; j < numoffset[i] ; j++) {
+                               objoid = *(tmp + sizeof(objheader_t) + arryfields[index]);
+                               index++;
+                               /*New offset oid not found */
+                               if((objheader = (objheader_t*) mhashSearch(objoid)) == NULL) {
+                                       flag = 1;
+                                       checkPreCache(node, numoffset, j, numoffset[i], objoid, index, i, oidnfound); 
+                                       break;
+                               } else 
+                                       flag = 0;
+                       }
+                       /*If oid not found locally then 
+                        *assign the latest oid found as the new oid 
+                        *and copy left over offsets into the arrayoffsetfieldarray*/
+                       oid[i] = objoid;
+                       numoffset[i] = numoffset[i] - (j+1);
+                       if(i == 0)
+                               endoffsets[i] = numoffset[i];
+                       else 
+                               endoffsets[i] = numoffset[i] - endoffsets[i - 1];
+                       for(k = 0; k < numoffset[i]; k++) {
+                               arryfields[k] = arryfields[j+1];
+                               j++;
+                       }
+                       /*If all offset oids are found locally,make the prefetch tuple invalid */
+                       if(flag == 0) {
+                               oid[i] = -1;
+                               numoffset[i] = 0;
+                               endoffsets[i] = 0;
+                       }
+               } else {
+                       oidnfound = 1;
+                       /* Look in Prefetch cache */
+                       checkPreCache(node, numoffset, 0, numoffset[i], oid[i], 0, i, oidnfound); 
+               }
 
        }
 }
 
+void makePreGroups(prefetchqelem_t *node) {
+       char *ptr, *tmp;
+       int ntuples, slength, i, machinenum;
+       unsigned int *oid;
+       short *endoffsets, *arryfields; 
+
+
+       /* Check for the case x.y.z and a.b.c are same oids */ 
+       ptr = (char *) node;
+       ntuples = *(GET_NTUPLES(ptr));
+       oid = GET_PTR_OID(ptr);
+       endoffsets = GET_PTR_EOFF(ptr, ntuples); 
+       arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
+       /* Find offset length for each tuple */
+       int numoffset[ntuples];
+       numoffset[0] = endoffsets[0];
+       for(i = 1; i<ntuples; i++) {
+               numoffset[i] = endoffsets[i] - endoffsets[i-1];
+       }
 
+       /* Check for redundant tuples by comparing oids of each tuple */
+       for(i = 0; i < ntuples; i++) {
+               if(oid[i] == -1)
+                       continue;
+               /* For each tuple make piles */
+               if ((machinenum = lhashSearch(oid[i])) == 0) {
+                       printf("Error: No such Machine %s, %d\n", __FILE__, __LINE__);
+                       return;
+               }
+       }
+
+}
 
 /*This function is called by the thread that processes the 
  * prefetch request makes piles to prefetch records and prefetches the oids from remote machines */
@@ -991,34 +1195,6 @@ int transPrefetchProcess(transrecord_t *record, int *arrayofoffset[], short numo
        for(i = 0; i < numoids ; i++) {
                arraylength[i] = arrayLength(arrayofoffset[i]);
        }
-       /* Check for similar tuples or other special case tuples that can be combined to a 
-        * prefetch message*/
-       if(checkPrefetchTuples(arrayofoffset, arraylength, numoids) != 0) {
-               printf("Error on checkPrefetchTuples at %s, %d\n", __FILE__, __LINE__);
-               return 1;
-       }
-
-       /* Check if part of prefetch request available locally */
-       for(i = 0; i < numoids ; i++) {
-               if(arrayofoffset[i][0] != -1) {
-                       if((objheader = (objheader_t *) mhashSearch(arrayofoffset[i][k])) != NULL) {
-                               /* Look up in machine lookup table  and copy  into cache*/
-                               //tmp = mhashSearch(oid);
-                               size = sizeof(objheader_t)+classsize[tmp->type];
-                               objcopy = objstrAlloc(record->cache, size);
-                               memcpy(objcopy, (void *)objheader, size);
-                               /* Insert into cache's lookup table */
-                               chashInsert(record->lookupTable, objheader->oid, objcopy); 
-                               /* Find the offset field*/
-                               if(foundLocal(arrayofoffset, record, arrayofoffset[i], tmp, arrayofoffset[i][k+1], i) != 0 ) {
-                                       printf("Error in foundLocal() %s, %d\n", __FILE__, __LINE__);
-                                       return 1;
-                               }
-                       } else 
-                               continue;
-               } else 
-                       continue;
-       }
 
        /* Initialize and set thread attributes 
         * Spawn a thread for each prefetch request sent*/
@@ -1057,100 +1233,6 @@ int transPrefetchProcess(transrecord_t *record, int *arrayofoffset[], short numo
                
 }
 
-int checkPrefetchTuples(int *arrayofoffset[], int *arraylength, short numoids) {
-       int i, j, k, matchfound = 0, count = 0, slength, length;
-       int *sarray, *larray;
-
-       /* Check if the prefetch oids are same and have same offsets  
-        * for case x.a.b and y.a.b where x and y have same oid's
-        * or if a.b.c is a subset of x.b.c.d*/ 
-       /* check for case where the generated request a.y.z or x.y.z.g then 
-        * prefetch needs to be generated for x.y.z.g  if oid of a and x are same*/
-       for(i = 0; i < numoids -1 ; i++) {
-               if(arrayofoffset[i][0] == -1)
-                       continue;
-               for( j = 0; j < (numoids - (i+1)); j++) {
-                       if(arrayofoffset[j][0] == -1)
-                               continue;
-                       /* If the oids of the tuples match */
-                       if((arrayofoffset[i][0] == arrayofoffset[j][0]) && (i != j )) {
-                               /*Find the  smallest length of two arrays, find ptrs to smallest and long array */
-                               if(arraylength[i] > arraylength[j]) {
-                                       slength = arraylength[j];
-                                       sarray = arrayofoffset[j];
-                                       larray = arrayofoffset[i];
-                               } else {
-                                       slength = arraylength[i];
-                                       sarray = arrayofoffset[i];
-                                       larray = arrayofoffset[j];
-                               }
-                               /* From first offset until end of tuple compare all offsets of sarray and larray
-                                * if not a match then break */
-                               for(k = 1 ; k < slength -1 ; k++) {
-                                       if(sarray[k] != larray[k]) {
-                                               break;
-                                       }
-                               }
-                               /* If number of same offsets encountered is same as 
-                                * no of offsets in small array then common tuples found*/
-                                if(k == (slength -1))
-                                        sarray[0] = -1;
-                       }
-               }
-       }
-
-       return 0;
-}
-
-/* This function goes through an array and finds out until what offets 
- *  * can it make a new pile
- *   * -1 indicates a special character to mark end of oid, offset tuple*/
-int foundLocal(int *arrayofoffset[], transrecord_t *record, int *array, objheader_t *header, int offset, int index) {
-       int i, j, counter, len;
-       unsigned int oid;
-       void *objcopy;
-       objheader_t *tmp, *objheader;
-       int size;
-
-       /* The oid corresponding to the first offset */
-       oid = (int) (header + sizeof(objheader_t) + offset);
-       /* Repeat for all offset values in the array */
-       for(i = 1; array[i] != -1 ; ) { 
-               /* If oid  is found locally  then insert into cache and continue
-                * with next offset to find the corresponding oid */
-               if((objheader = (objheader_t*) mhashSearch(oid)) != NULL) {
-                       tmp = mhashSearch(oid);
-                       size = sizeof(objheader_t)+classsize[tmp->type];
-                       objcopy = objstrAlloc(record->cache, size);
-                       memcpy(objcopy, (void *)tmp, size);
-                       /* Insert into cache's lookup table */
-                       chashInsert(record->lookupTable, objheader->oid, objcopy);
-                       oid = (int) (tmp + sizeof(objheader_t) + array[i+1]);
-               } else {
-                       /*If oid not found locally then 
-                        *assign the latest oid found as the first element
-                        *of array and copy left over offsets as other
-                        *array elements to build the new prefetch array*/
-
-                       arrayofoffset[index][0] = (int) oid;
-                       counter = i;
-                       len = arrayLength((int *)array[index]);
-                       for( j = 1 ; counter <= len; j++,counter++) {
-                               arrayofoffset[index][j] = array[counter + 1]; 
-                       }
-                       break;
-               }
-               i++;
-       }
-
-       /* If all offsets are found locally, then do not treat this as valid prefetch tuple */
-       if(i == arrayLength((int *)array[index]) - 1) {
-               arrayofoffset[index][0] = -1; //Mark beginning of array as -1
-       }
-       
-       return 0;
-}
-
 void *sendPrefetchReq(void *prefetchtuple) {
        int sd, i;
        struct sockaddr_in serv_addr;