d-3:
- gcc -dr -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c
+ gcc -dr -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c
demsky:
- gcc -DDEBUG -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c
+ gcc -DDEBUG -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c
d-4:
- gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c
+ gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c
all:
- gcc -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c
- gcc -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c
- gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c
+ gcc -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c
+ gcc -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c
+ gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c
mac:
- gcc -DMAC -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c
- gcc -DMAC -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c
- gcc -DMAC -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c
+ gcc -DMAC -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c
+ gcc -DMAC -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c
+ gcc -DMAC -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c
clean:
rm -rf d-3 d-4 demsky
#define OBJ_UNLOCK_BUT_VERSION_MATCH 15
#define VERSION_NO_MATCH 16
+//Max number of objects
+#define MAX_OBJECTS 20
+
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <pthread.h>
#include "clookup.h"
+#include "queue.h"
#define DEFAULT_OBJ_STORE_SIZE 1048510 //1MB
#define TID_LEN 20
int *count; //variable to count responses of TRANS_REQUEST protocol from all participants
char *replyctrl; //shared ctrl message that stores the reply to be sent, filled by decideResp
char *replyretry; //shared variable to find out if we need retry (TRANS_COMMIT case)
-// char *localstatus; //shared variable to identify local requests
transrecord_t *rec; // To send modified objects
} thread_data_array_t;
//Structure for prefetching tuples generated by teh compiler
- typedef struct trans_prefetchtuple{
- unsigned int depth;
- unsigned int oid;
- trans_member_t member;
- struct trans_prefetchtuple *next;
- }trans_prefetchtuple_t;
+ typedef struct prefetchpile{
+ int mid;
+ int *oids;
+
+ int **numofarrys;
+ struct prefetchpile *next;
+ }prefetchpile_t;
+
+//Structure per Oid in the prefetch call
+
+/*
+//Structure that holds the compiler generated prefetch data
+typedef struct compprefetchdata {
+ transrecord_t *record;
+} compprefetchdata_t;
+*/
/* Initialize main object store and lookup tables, start server thread. */
int dstmInit(void);
void *getRemoteObj(transrecord_t *, unsigned int, unsigned int);
int transAbortProcess(void *, unsigned int *, int, int, int);
int transComProcess(trans_commit_data_t *);
+void prefetch(int, unsigned int *, short *, short*);
+void *transPrefetch(void *);
+void checkPrefetchTuples(prefetchqelem_t *);
+void foundLocal(prefetchqelem_t *);
+void makePreGroups(prefetchqelem_t *node);
+void checkPreCache(prefetchqelem_t *, int *, int, int, unsigned int, int, int, int);
+int transPrefetchProcess(transrecord_t *, int **, short);
+void *sendPrefetchReq(void *);
/* end transactions */
#endif
--- /dev/null
+ #include "prelookup.h"
+
+prehashtable_t pflookup; //Global prefetch cache table
+
+unsigned int prehashCreate(unsigned int size, float loadfactor) {
+ prehashlistnode_t *nodes;
+ int i;
+
+ // Allocate space for the hash table
+ if((nodes = calloc(size, sizeof(prehashlistnode_t))) == NULL) {
+ printf("Calloc error %s %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+
+ pflookup.table = nodes;
+ pflookup.size = size;
+ pflookup.numelements = 0; // Initial number of elements in the hash
+ pflookup.loadfactor = loadfactor;
+
+ //Initialize mutex var
+ pthread_mutex_init(&pflookup.lock, NULL);
+
+ return 0;
+}
+
+//Assign keys to bins inside hash table
+unsigned int prehashFunction(unsigned int key) {
+ return ( key % (pflookup.size));
+}
+
+//Store oids and their pointers into hash
+unsigned int prehashInsert(unsigned int key, void *val) {
+ unsigned int newsize;
+ int index;
+ prehashlistnode_t *ptr, *node;
+
+ if(pflookup.numelements > (pflookup.loadfactor * pflookup.size)) {
+ //Resize
+ newsize = 2 * pflookup.size + 1;
+ pthread_mutex_lock(&pflookup.lock);
+ prehashResize(newsize);
+ pthread_mutex_unlock(&pflookup.lock);
+ }
+
+ ptr = pflookup.table;
+ pflookup.numelements++;
+ index = prehashFunction(key);
+
+ pthread_mutex_lock(&pflookup.lock);
+ if(ptr[index].next == NULL && ptr[index].key == 0) { // Insert at the first position in the hashtable
+ ptr[index].key = key;
+ ptr[index].val = val;
+ } else { // Insert in the beginning of linked list
+ if ((node = calloc(1, sizeof(prehashlistnode_t))) == NULL) {
+ printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+ pthread_mutex_unlock(&pflookup.lock);
+ return 1;
+ }
+ node->key = key;
+ node->val = val ;
+ node->next = ptr[index].next;
+ ptr[index].next = node;
+ }
+ pthread_mutex_unlock(&pflookup.lock);
+ return 0;
+}
+
+// Search for an address for a given oid
+void *prehashSearch(unsigned int key) {
+ int index;
+ prehashlistnode_t *ptr, *node;
+
+ ptr = pflookup.table;
+ index = prehashFunction(key);
+ node = &ptr[index];
+ pthread_mutex_lock(&pflookup.lock);
+ while(node != NULL) {
+ if(node->key == key) {
+ pthread_mutex_unlock(&pflookup.lock);
+ return node->val;
+ }
+ node = node->next;
+ }
+ pthread_mutex_unlock(&pflookup.lock);
+ return NULL;
+}
+
+unsigned int prehashRemove(unsigned int key) {
+ int index;
+ prehashlistnode_t *curr, *prev;
+ prehashlistnode_t *ptr, *node;
+
+ ptr = pflookup.table;
+ index = prehashFunction(key);
+ curr = &ptr[index];
+
+ pthread_mutex_lock(&pflookup.lock);
+ for (; curr != NULL; curr = curr->next) {
+ if (curr->key == key) { // Find a match in the hash table
+ pflookup.numelements--; // Decrement the number of elements in the global hashtable
+ if ((curr == &ptr[index]) && (curr->next == NULL)) { // Delete the first item inside the hashtable with no linked list of prehashlistnode_t
+ curr->key = 0;
+ curr->val = NULL;
+ } else if ((curr == &ptr[index]) && (curr->next != NULL)) { //Delete the first item with a linked list of prehashlistnode_t connected
+ curr->key = curr->next->key;
+ curr->val = curr->next->val;
+ node = curr->next;
+ curr->next = curr->next->next;
+ free(node);
+ } else { // Regular delete from linked listed
+ prev->next = curr->next;
+ free(curr);
+ }
+ pthread_mutex_unlock(&pflookup.lock);
+ return 0;
+ }
+ prev = curr;
+ }
+ pthread_mutex_unlock(&pflookup.lock);
+ return 1;
+}
+
+unsigned int prehashResize(unsigned int newsize) {
+ prehashlistnode_t *node, *ptr, *curr, *next; // curr and next keep track of the current and the next chashlistnodes in a linked list
+ unsigned int oldsize;
+ int isfirst; // Keeps track of the first element in the prehashlistnode_t for each bin in hashtable
+ int i,index;
+ prehashlistnode_t *newnode;
+
+ ptr = pflookup.table;
+ oldsize = pflookup.size;
+
+ if((node = calloc(newsize, sizeof(prehashlistnode_t))) == NULL) {
+ printf("Calloc error %s %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+
+ pflookup.table = node; //Update the global hashtable upon resize()
+ pflookup.size = newsize;
+ pflookup.numelements = 0;
+
+ for(i = 0; i < oldsize; i++) { //Outer loop for each bin in hash table
+ curr = &ptr[i];
+ isfirst = 1;
+ while (curr != NULL) { //Inner loop to go through linked lists
+ if (curr->key == 0) { //Exit inner loop if there the first element for a given bin/index is NULL
+ break; //key = val =0 for element if not present within the hash table
+ }
+ next = curr->next;
+ index = prehashFunction(curr->key);
+ // Insert into the new table
+ if(pflookup.table[index].next == NULL && pflookup.table[index].key == 0) {
+ pflookup.table[index].key = curr->key;
+ pflookup.table[index].val = curr->val;
+ pflookup.numelements++;
+ }else {
+ if((newnode = calloc(1, sizeof(prehashlistnode_t))) == NULL) {
+ printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+ newnode->key = curr->key;
+ newnode->val = curr->val;
+ newnode->next = pflookup.table[index].next;
+ pflookup.table[index].next = newnode;
+ pflookup.numelements++;
+ }
+
+ //free the linked list of prehashlistnode_t if not the first element in the hash table
+ if (isfirst != 1) {
+ free(curr);
+ }
+
+ isfirst = 0;
+ curr = next;
+ }
+ }
+
+ free(ptr); //Free the memory of the old hash table
+ return 0;
+}
+
+/* Deletes the prefetch Cache */
+void prehashDelete() {
+ int i, isFirst;
+ prehashlistnode_t *ptr, *curr, *next;
+ ptr = pflookup.table;
+
+ for(i=0 ; i<pflookup.size ; i++) {
+ curr = &ptr[i];
+ isFirst = 1;
+ while(curr != NULL) {
+ next = curr->next;
+ if(isFirst != 1) {
+ free(curr);
+ }
+ isFirst = 0;
+ curr = next;
+ }
+ }
+
+ free(ptr);
+}
--- /dev/null
+#ifndef _PRELOOKUP_H_
+#define _PRELOOKUP_H_
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <pthread.h>
+
+#define LOADFACTOR 0.75
+#define HASH_SIZE 100
+
+typedef struct prehashlistnode {
+ unsigned int key;
+ void *val; //this can be cast to another type or used to point to a larger structure
+ struct prehashlistnode *next;
+} prehashlistnode_t;
+
+typedef struct prehashtable {
+ prehashlistnode_t *table; // points to beginning of hash table
+ unsigned int size;
+ unsigned int numelements;
+ float loadfactor;
+ pthread_mutex_t lock;
+} prehashtable_t;
+
+/* Prototypes for hash*/
+unsigned int prehashCreate(unsigned int size, float loadfactor);
+unsigned int prehashFunction(unsigned int key);
+unsigned int prehashInsert(unsigned int key, void *val);
+void *prehashSearch(unsigned int key); //returns val, NULL if not found
+unsigned int prehashRemove(unsigned int key); //returns -1 if not found
+unsigned int prehashResize(unsigned int newsize);
+/* end hash */
+
+#endif
+
#include "queue.h"
-prefetchthreadqueue_t queue; //Global shared prefetch queue
+primarypfq_t pqueue; //Global queue
-void queueInsert(int *array) {
- pthread_mutex_lock(&queue.qlock);
- queue.rear = queue.rear % ARRAY_SIZE;
- if(queue.front == queue.rear && queue.buffer[queue.front] != NULL) {
- printf("The Circular Queue is Full : OVERFLOW\n");
- pthread_mutex_unlock(&queue.qlock);
+void queueInit(void) {
+ /* Intitialize primary thread */
+ pqueue.front = pqueue.rear = NULL;
+ pthread_mutex_init(&pqueue.qlock, NULL);
+ pthread_cond_init(&pqueue.qcond, NULL);
+}
+
+/* Removes the first element of the queue */
+void delqnode() {
+ prefetchqelem_t *delnode;
+ if((pqueue.front == NULL) && (pqueue.rear == NULL)) {
+ printf("The queue is empty: UNDERFLOW %s, %d\n", __FILE__, __LINE__);
return;
+ } else if ((pqueue.front == pqueue.rear) && pqueue.front != NULL && pqueue.rear != NULL) {
+ printf("TEST1\n");
+ free(pqueue.front);
+ pqueue.front = pqueue.rear = NULL;
} else {
- queue.buffer[queue.rear] = array;
- queue.rear++;
+ delnode = pqueue.front;
+ pqueue.front = pqueue.front->next;
+ printf("TEST2\n");
+ free(delnode);
}
- pthread_mutex_unlock(&queue.qlock);
}
-int *queueDelete() {
- int *i;
- i = NULL;
- pthread_mutex_lock(&queue.qlock);
- if(queue.front == queue.rear && queue.buffer[queue.front] == NULL) {
- printf("The Circular Queue is Empty : UNDERFLOW\n");
- pthread_mutex_unlock(&queue.qlock);
- return NULL;
+void queueDelete(void) {
+ /* Remove each element */
+ while(pqueue.front != NULL)
+ delqnode();
+ pqueue.front = pqueue.rear = NULL;
+}
+
+/* Inserts to the rear of primary prefetch queue */
+void enqueue(prefetchqelem_t *qnode) {
+ if(pqueue.front == NULL && pqueue.rear == NULL) {
+ pqueue.front = pqueue.rear = qnode;
} else {
- i = queue.buffer[queue.front];
- queue.buffer[queue.front] = NULL;
- queue.front++;
- queue.front = queue.front % ARRAY_SIZE;
- pthread_mutex_unlock(&queue.qlock);
- return i;
+ qnode->next = NULL;
+ pqueue.rear->next = qnode;
+ pqueue.rear = qnode;
}
}
-void queueInit() {
- int i;
- queue.front = 0;
- queue.rear = 0;
- for(i = 0; i< ARRAY_SIZE; i++)
- queue.buffer[i] = NULL;
- /* Initialize the pthread_mutex variable */
- pthread_mutex_init(&queue.qlock, NULL);
+prefetchqelem_t *dequeue(void) {
+ prefetchqelem_t *retnode;
+ if (pqueue.front == NULL) {
+ printf("Queue empty: Underflow %s, %d\n", __FILE__, __LINE__);
+ return NULL;
+ }
+ retnode = pqueue.front;
+ //TODO make this atomic
+ pqueue.front = pqueue.front->next;
+
+ return retnode;
}
-/* For testing purposes */
+void queueDisplay() {
+ int offset = sizeof(prefetchqelem_t);
+ int *ptr;
+ int ntuples;
+ char *ptr1;
+ prefetchqelem_t *tmp = pqueue.front;
+ while(tmp != NULL) {
+ ptr1 = (char *) tmp;
+ ptr = (int *)(ptr1 + offset);
+ ntuples = *ptr;
+ printf("Number of tuples = %d\n", ntuples);
+ tmp = tmp->next;
+ }
+}
+
+
#if 0
main() {
- int *d;
- queueIsEmpty();
- int a[] = {5, 2, 8, -1};
- int b[] = {11, 8, 4, 19, -1};
- int c[] = {16, 8, 4, -1};
- printf("Front = %d, Rear = %d\n",queue.front, queue.rear);
- d = queueDelete();
- printf("Front = %d, Rear = %d\n",queue.front, queue.rear);
- queueInsert(a);
- printf("Enqueued ptr is %x\n", a);
- printf("1st Insert Front = %d, Rear = %d\n",queue.front, queue.rear);
- queueInsert(b);
- printf("Enqueued ptr is %x\n", b);
- printf("2nd Insert Front = %d, Rear = %d\n",queue.front, queue.rear);
- queueInsert(c);
- printf("3rd Insert Front = %d, Rear = %d\n",queue.front, queue.rear);
- d = queueDelete();
- printf("Dequeued ptr is %x\n", d);
- printf("After 1st del Front = %d, Rear = %d\n",queue.front, queue.rear);
- queueInsert(c);
- printf("Enqueued ptr is %x\n", c);
- printf("After 4th insert Front = %d, Rear = %d\n",queue.front, queue.rear);
- d = queueDelete();
- printf("Dequeued ptr is %x\n", d);
- printf("After 2nd del Front = %d, Rear = %d\n",queue.front, queue.rear);
- d = queueDelete();
- printf("Dequeued ptr is %x\n", d);
- printf("After 3rd del Front = %d, Rear = %d\n",queue.front, queue.rear);
- d = queueDelete();
- printf("Dequeued ptr is %x\n", d);
- printf("After 4th del Front = %d, Rear = %d\n",queue.front, queue.rear);
+ unsigned int oids[] = {11, 13};
+ short endoffsets[] = {2, 5};
+ short arrayfields[] = {2, 2, 1, 5, 6};
+ queueInit();
+ queueDisplay();
+ prefetch(2, oids, endoffsets, arrayfields);
+ queueDisplay();
+ unsigned int oids1[] = {21, 23, 25, 27};
+ short endoffsets1[] = {1, 2, 3, 4};
+ short arrayfields1[] = {3, 2, 1, 3};
+ prefetch(4, oids1, endoffsets1, arrayfields1);
+ queueDisplay();
+ delqnode();
+ queueDisplay();
+ delqnode();
+ queueDisplay();
+ delqnode();
+ queueDisplay();
+ delqnode();
+
}
#endif
+
+
#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>
-
-#define ARRAY_SIZE 20
+#include<string.h>
// DS that contains information to be shared between threads.
-typedef struct prefetchthreadqueue {
- int *buffer[ARRAY_SIZE];
- int front;
- int rear;
+typedef struct prefetchqelem {
+ struct prefetchqelem *next;
+} prefetchqelem_t;
+
+typedef struct primarypfq {
+ prefetchqelem_t *front, *rear;
pthread_mutex_t qlock;
-} prefetchthreadqueue_t;
+ pthread_cond_t qcond;
+} primarypfq_t;
-void queueInsert(int *);
-int *queueDelete();
-void queueInit(); //Initializes the queue and qlock mutex
+void queueInit(void);
+void delqnode();
+void queueDelete(void);
+void enqueue(prefetchqelem_t *qnode);
+prefetchqelem_t *dequeue(void);
+void queueDisplay();
#endif
lhashInsert(header->oid, mid);
//Inserting into lhashtable
- mid = iptoMid("128.200.9.30"); //d-4.eecs.uci.edu
+ mid = iptoMid("128.195.175.69"); //dw-1.eecs.uci.edu
lhashInsert(31, mid);
lhashInsert(32, mid);
//Check if machine demsky is up and running
checkServer(mid, "128.200.9.10");
- mid = iptoMid("128.200.9.30");
- //Check if machine d-4 is up and running
- checkServer(mid, "128.200.9.30");
+ mid = iptoMid("128.195.175.69");
+ //Check if machine dw-1 is up and running
+ checkServer(mid, "128.195.175.69");
// Start Transaction
myTrans = transStart();
lhashInsert(header->oid, mid);
//Inserting into lhashtable
- mid = iptoMid("128.200.9.30"); //d-4.eecs.uci.edu
+ mid = iptoMid("128.195.175.69"); //dw-1.eecs.uci.edu
lhashInsert(31, mid);
lhashInsert(32, mid);
lhashInsert(33, mid);
//Check if machine demsky is up and running
checkServer(mid, "128.200.9.10");
- mid = iptoMid("128.200.9.30");
- //Check if machine d-4 is up and running
- checkServer(mid, "128.200.9.30");
+ mid = iptoMid("128.195.175.69");
+ //Check if machine dw-1 is up and running
+ checkServer(mid, "128.195.175.69");
// Start Transaction
myTrans = transStart();
printf("Object not found\n");
}
- //read object 32 (found on d-4)
+ //read object 32 (found on dw-1)
if((h4 = transRead(myTrans, 32)) == NULL) {
printf("Object not found\n");
}
lhashInsert(header->oid, mid);
//Inserting into lhashtable
- mid = iptoMid("128.200.9.30"); //d-4.eecs.uci.edu
+ mid = iptoMid("128.195.175.69"); //dw-1.eecs.uci.edu
lhashInsert(31, mid);
lhashInsert(32, mid);
lhashInsert(33, mid);
pthread_create(&thread_Listen, &attr, dstmListen, NULL);
//Check if machine demsky is up and running
checkServer(mid, "128.200.9.10");
- mid = iptoMid("128.200.9.30");
- //Check if machine d-4 is up and running
- checkServer(mid, "128.200.9.30");
+ mid = iptoMid("128.195.175.69");
+ //Check if machine dw-1 is up and running
+ checkServer(mid, "128.195.175.69");
// Start Transaction
myTrans = transStart();
printf("Object not found\n");
}
//read object 2 (found on demksy)
- if((h1 = transRead(myTrans,2)) == NULL){
+ if((h2 = transRead(myTrans,2)) == NULL){
printf("Object not found\n");
}
- //read object 31 (found on d-4)
- if((h2 = transRead(myTrans, 31)) == NULL) {
+ //read object 22 (found locally )
+ if((h3 = transRead(myTrans, 22)) == NULL) {
printf("Object not found\n");
}
header = (objheader_t *) objstrAlloc(mainobjstore, size);
header->oid = 21;
header->type = 1;
- //read object 31 (found on d-4)
+ //read object 31 (found on dw-1)
if((h2 = transRead(myTrans, 31)) == NULL) {
printf("Object not found\n");
}
lhashInsert(header->oid, mid);
//Inserting into lhashtable
- mid = iptoMid("128.200.9.30"); //d-4.eecs.uci.edu
+ mid = iptoMid("128.195.175.69"); //dw-1.eecs.uci.edu
lhashInsert(31, mid);
lhashInsert(32, mid);
lhashInsert(33, mid);
//Check if machine demsky is up and running
checkServer(mid, "128.200.9.10");
- mid = iptoMid("128.200.9.30");
- //Check if machine d-4 is up and running
- checkServer(mid, "128.200.9.30");
+ mid = iptoMid("128.195.175.69");
+ //Check if machine dw-1 is up and running
+ checkServer(mid, "128.195.175.69");
// Start Transaction
myTrans = transStart();
if((h1 = transRead(myTrans, 1)) == NULL){
printf("Object not found\n");
}
- //read object 31 (found on d-4)
+ //read object 31 (found on dw-1)
if((h2 = transRead(myTrans, 31)) == NULL) {
printf("Object not found\n");
}
lhashInsert(header->oid, mid);
//Inserting into lhashtable
- mid = iptoMid("128.200.9.30"); //d-4.eecs.uci.edu
+ mid = iptoMid("128.195.175.69"); //dw-1.eecs.uci.edu
lhashInsert(31, mid);
lhashInsert(32, mid);
lhashInsert(33, mid);
//Check if machine demsky is up and running
checkServer(mid, "128.200.9.10");
- mid = iptoMid("128.200.9.30");
- //Check if machine d-4 is up and running
- checkServer(mid, "128.200.9.30");
+ mid = iptoMid("128.195.175.69");
+ //Check if machine dw-1 is up and running
+ checkServer(mid, "128.195.175.69");
// Start Transaction
myTrans = transStart();
if((h1 = transRead(myTrans, 3)) == NULL){
printf("Object not found\n");
}
- //read object 32 (found on d-4)
+ //read object 32 (found on dw-1)
if((h2 = transRead(myTrans, 32)) == NULL) {
printf("Object not found\n");
}
header = (objheader_t *) objstrAlloc(mainobjstore, size);
init_obj(header, 31, 2, 1, 0, NEW);
mhashInsert(header->oid, header);
- mid = iptoMid("128.200.9.30");
+ mid = iptoMid("128.195.175.69");
lhashInsert(header->oid, mid);
//Create and Insert Oid 32
header = (objheader_t *) objstrAlloc(mainobjstore, size);
init_obj(header, 32, 1, 1, 0, NEW);
mhashInsert(header->oid, header);
- mid = iptoMid("128.200.9.30");
+ mid = iptoMid("128.195.175.69");
lhashInsert(header->oid, mid);
//Create and Insert Oid 33
header = (objheader_t *) objstrAlloc(mainobjstore, size);
init_obj(header, 33, 0, 1, 0, NEW);
mhashInsert(header->oid, header);
- mid = iptoMid("128.200.9.30");
+ mid = iptoMid("128.195.175.69");
lhashInsert(header->oid, mid);
//Inserting into lhashtable into d-3.eecs
header = (objheader_t *) objstrAlloc(mainobjstore, size);
init_obj(header, 31, 2, 1, 0, NEW);
mhashInsert(header->oid, header);
- mid = iptoMid("128.200.9.30");
+ mid = iptoMid("128.195.175.69");
lhashInsert(header->oid, mid);
//Create and Insert Oid 32
header = (objheader_t *) objstrAlloc(mainobjstore, size);
init_obj(header, 32, 1, 1, 0, NEW);
mhashInsert(header->oid, header);
- mid = iptoMid("128.200.9.30");
+ mid = iptoMid("128.195.175.69");
lhashInsert(header->oid, mid);
//Create and Insert Oid 33
header = (objheader_t *) objstrAlloc(mainobjstore, size);
init_obj(header, 33, 0, 1, 0, NEW);
mhashInsert(header->oid, header);
- mid = iptoMid("128.200.9.30");
+ mid = iptoMid("128.195.175.69");
lhashInsert(header->oid, mid);
//Inserting into lhashtable into d-3.eecs
header = (objheader_t *) objstrAlloc(mainobjstore, size);
init_obj(header, 31, 2, 1, 0, NEW);
mhashInsert(header->oid, header);
- mid = iptoMid("128.200.9.30");
+ mid = iptoMid("128.195.175.69");
lhashInsert(header->oid, mid);
//Create and Insert Oid 32
header = (objheader_t *) objstrAlloc(mainobjstore, size);
init_obj(header, 32, 1, 1, 0, NEW);
mhashInsert(header->oid, header);
- mid = iptoMid("128.200.9.30");
+ mid = iptoMid("128.195.175.69");
lhashInsert(header->oid, mid);
//Create and Insert Oid 33
header = (objheader_t *) objstrAlloc(mainobjstore, size);
init_obj(header, 33, 0, 1, 0, NEW);
mhashInsert(header->oid, header);
- mid = iptoMid("128.200.9.30");
+ mid = iptoMid("128.195.175.69");
lhashInsert(header->oid, mid);
//Inserting into lhashtable into d-3.eecs
header = (objheader_t *) objstrAlloc(mainobjstore, size);
init_obj(header, 31, 2, 1, 0, NEW);
mhashInsert(header->oid, header);
- mid = iptoMid("128.200.9.30");
+ mid = iptoMid("128.195.175.69");
lhashInsert(header->oid, mid);
//Create and Insert Oid 32
header = (objheader_t *) objstrAlloc(mainobjstore, size);
init_obj(header, 32, 1, 1, 0, NEW);
mhashInsert(header->oid, header);
- mid = iptoMid("128.200.9.30");
+ mid = iptoMid("128.195.175.69");
lhashInsert(header->oid, mid);
//Create and Insert Oid 33
header = (objheader_t *) objstrAlloc(mainobjstore, size);
init_obj(header, 33, 0, 1, 0, NEW);
mhashInsert(header->oid, header);
- mid = iptoMid("128.200.9.30");
+ mid = iptoMid("128.195.175.69");
lhashInsert(header->oid, mid);
//Inserting into lhashtable into d-3.eecs
int main()
{
- //test1();
+// test1();
// test3();
test4();
}
lhashInsert(21, mid);
lhashInsert(22, mid);
- mid = iptoMid("128.200.9.30"); //d-4.eecs.uci.edu
+ mid = iptoMid("128.195.175.69"); //dw-1.eecs.uci.edu
//Inserting into lhashtable
lhashInsert(31, mid);
lhashInsert(32, mid);
lhashInsert(33, mid);
pthread_create(&thread_Listen, &attr, dstmListen, NULL);
- //Check if machine d-4 is up and running
- checkServer(mid, "128.200.9.30");
+ //Check if machine dw-1 is up and running
+ checkServer(mid, "128.195.175.69");
mid = iptoMid("128.200.9.29");
//Check if machine d-3 is up and running
checkServer(mid, "128.200.9.29");
lhashInsert(21, mid);
lhashInsert(22, mid);
- mid = iptoMid("128.200.9.30"); //d-4.eecs.uci.edu
+ mid = iptoMid("128.195.175.69"); //dw-1.eecs.uci.edu
//Inserting into lhashtable
lhashInsert(31, mid);
lhashInsert(32, mid);
lhashInsert(33, mid);
pthread_create(&thread_Listen, &attr, dstmListen, NULL);
- //Check if machine d-4 is up and running
- checkServer(mid, "128.200.9.30");
+ //Check if machine dw-1 is up and running
+ checkServer(mid, "128.195.175.69");
mid = iptoMid("128.200.9.29");
//Check if machine d-3 is up and running
checkServer(mid, "128.200.9.29");
if((h2 = transRead(myTrans, 1)) == NULL) {
printf("Object not found\n");
}
- //read object 31(present in d-4 machine)
+ //read object 31(present in dw-1 machine)
if((h3 = transRead(myTrans, 31)) == NULL) {
printf("Object not found\n");
}
#include "mlookup.h"
#include "llookup.h"
#include "plookup.h"
-//#include "queue.h"
+#include "queue.h"
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <time.h>
#include <string.h>
+#include <pthread.h>
#define LISTEN_PORT 2156
#define RECEIVE_BUFFER_SIZE 2048
+#define NUM_THREADS 10
+#define PREFETCH_CACHE_SIZE 1048576 //1MB
+
+#define GET_NTUPLES(x) ((int *)(x + sizeof(prefetchqelem_t)))
+#define GET_PTR_OID(x) ((unsigned int *)(x + sizeof(prefetchqelem_t) + sizeof(int)))
+#define GET_PTR_EOFF(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int))))
+#define GET_PTR_ARRYFLD(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int)) + (n*sizeof(short))))
/* Global Variables */
extern int classsize[];
-//extern prefetchthreadqueue_t pfqueue; // Shared prefetch queue
-objstr_t *mainobjstore;
-plistnode_t *createPiles(transrecord_t *);
+extern primarypfq_t pqueue; // shared prefetch queue
+pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
pthread_t tPrefetch;
+extern objstr_t *mainobjstore;
+objstr_t *prefetchcache;
+plistnode_t *createPiles(transrecord_t *);
inline int arrayLength(int *array) {
int i;
for(i=0 ;array[i] != -1; i++)
;
return i;
}
-
-// DS that contains information to be shared between threads.
-typedef struct prefetchqelem {
- struct prefetchqelem *next;
-} prefetchqelem_t;
-
-typedef struct primarypfq {
- prefetchqelem_t *front, *rear;
- pthread_mutex_t qlock;
- pthread_cond_t qcond;
-} primarypfq_t;
-
-primarypfq_t pqueue; //Global queue
-int flag; //Global flag
-
-void prefetch(int ntuples, unsigned int *oids, short *endoffsets, short *arrayfields);
-
-
-void queueInit(void) {
- pqueue.front = pqueue.rear = NULL;
- pthread_mutex_init(&pqueue.qlock, NULL);
- pthread_cond_init(&pqueue.qcond, NULL);
-}
-
-/* Removes the first element of the queue */
-void delqnode() {
- prefetchqelem_t *delnode;
- if((pqueue.front == NULL) && (pqueue.rear == NULL)) {
- printf("The queue is empty: UNDERFLOW %s, %d\n", __FILE__, __LINE__);
- return;
- } else if (pqueue.front == pqueue.rear) {
- free(pqueue.front);
- pqueue.front = pqueue.rear = NULL;
- } else {
- delnode = pqueue.front;
- pqueue.front = pqueue.front->next;
- free(delnode);
- }
-}
-
-void queueDelete(void) {
- /* Remove each element */
- while(pqueue.front != NULL)
- delqnode();
- pqueue.front = pqueue.rear = NULL;
-}
-
-/* Inserts to the rear of primary prefetch queue */
-void enqueue(prefetchqelem_t *qnode) {
- if(pqueue.front == NULL && pqueue.rear == NULL) {
- pqueue.front = pqueue.rear = qnode;
- } else {
- qnode->next = NULL;
- pqueue.rear->next = qnode;
- pqueue.rear = qnode;
- }
-}
-
-prefetchqelem_t *dequeue(void) {
- prefetchqelem_t *retnode;
-
- if (pqueue.front == NULL) {
- printf("Queue empty: Underflow %s, %d\n", __FILE__, __LINE__);
- return NULL;
- }
- retnode = pqueue.front;
- //TODO make this atomic
- pqueue.front = pqueue.front->next;
-
- return retnode;
-}
-
-void queuedisplay(primarypfq_t pqueue) {
- while(pqueue.front != NULL) {
- /*
- int *ptr = (int *)(pqueue + offset);
- int oid = *ptr;
- printf("oid = %d", oid);
- //printf("Oid = %d", pqueue.front->oids[0]);
- pqueue.front = pqueue.front->next;
- */
- }
-}
-
+/* This function is a prefetch call generated by the compiler that
+ * populates the shared primary prefetch queue*/
void prefetch(int ntuples, unsigned int *oids, short *endoffsets, short *arrayfields) {
int qnodesize;
int len = 0;
- /* Create element for the queue */
- prefetchqelem_t *node;
- qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(short) + sizeof(unsigned int)) + endoffsets[ntuples -1] * sizeof(short);
+
+ /* Allocate for the queue node*/
+ char *node;
+ qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
if((node = calloc(1,qnodesize)) == NULL) {
printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
return;
}
+ /* Set queue node values */
len = sizeof(prefetchqelem_t);
memcpy(node + len, &ntuples, sizeof(int));
len += sizeof(int);
memcpy(node + len, endoffsets, ntuples*sizeof(short));
len += ntuples * sizeof(short);
memcpy(node + len, arrayfields, endoffsets[ntuples-1]*sizeof(short));
-
+ /* Lock and insert into primary prefetch queue */
pthread_mutex_lock(&pqueue.qlock);
- /* Check if primary queue thread operating on this then wait */
- /*
- if(flag) {
- //TODO
- }
- */
- enqueue(node);
-
+ enqueue((prefetchqelem_t *)node);
+ pthread_cond_signal(&pqueue.qcond);
pthread_mutex_unlock(&pqueue.qlock);
}
-
/* This function initiates the prefetch thread
* A queue is shared between the main thread of execution
* and the prefetch thread to process the prefetch call
* Call from compiler populates the shared queue with prefetch requests while prefetch thread
* processes the prefetch requests */
void transInit() {
- //Initialize shared queue
+ int t, rc;
+ //Create and initialize prefetch cache structure
+ prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE);
+ //Create prefetch cache lookup table
+ if(prehashCreate(HASH_SIZE, LOADFACTOR))
+ return; //Failure
+ //Initialize primary shared queue
queueInit();
- //Create the prefetch thread
+ //Create the primary prefetch thread
pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
+ //Create and Initialize a pool of threads
+ for(t = 0; t< NUM_THREADS; t++) {
+ //rc = pthread_create(&wthreads[t], NULL, pfqProcess, (void *)t);
+ if (rc) {
+ printf("Thread create error %s, %d\n", __FILE__, __LINE__);
+ return;
+ }
+ }
}
/* This function stops the threads spawned */
void transExit() {
+ int t;
pthread_cancel(tPrefetch);
+ for(t = 0; t < NUM_THREADS; t++)
+ pthread_cancel(wthreads[t]);
+
return;
}
-/* This functions inserts randowm wait delays in the order of msec */
+/* This functions inserts randowm wait delays in the order of msec
+ * Mostly used when transaction commits retry*/
void randomdelay(void)
{
struct timespec req, rem;
/* This function is called by the thread calling transPrefetch */
void *transPrefetch(void *prefdata) {
- compprefetchdata_t *ptr = (compprefetchdata_t *) prefdata;
int *offstarray = NULL;
+ prefetchqelem_t *qnode;
while(1) {
- //TODO
+ /* lock mutex of primary prefetch queue */
+ pthread_mutex_lock(&pqueue.qlock);
+ /* while primary queue is empty, then wait */
+ while((pqueue.front == NULL) && (pqueue.rear == NULL)) {
+ pthread_cond_wait(&pqueue.qcond, &pqueue.qlock);
+ }
+
+ /* dequeue node to create a machine piles and finally unlock mutex */
+ if((qnode = dequeue()) == NULL) {
+ printf("Error: No node returned %s, %d\n", __FILE__, __LINE__);
+ return NULL;
+ }
+ pthread_mutex_unlock(&pqueue.qlock);
+ /* Reduce redundant prefetch requests */
+ /* Group Requests by where objects are located */
+
+
+
+ }
+}
+
+/* This function checks if the prefetch oids are same and have same offsets
+ * for case x.a.b and y.a.b where x and y have same oid's
+ * or if a.b.c is a subset of x.b.c.d*/
+/* check for case where the generated request a.y.z or x.y.z.g then
+ * prefetch needs to be generated for x.y.z.g if oid of a and x are same*/
+void checkPrefetchTuples(prefetchqelem_t *node) {
+ int i,j, count,k, sindex, index;
+ char *ptr, *tmp;
+ int ntuples, slength;
+ unsigned int *oid;
+ short *endoffsets, *arryfields;
+
+ /* Check for the case x.y.z and a.b.c are same oids */
+ ptr = (char *) node;
+ ntuples = *(GET_NTUPLES(ptr));
+ oid = GET_PTR_OID(ptr);
+ endoffsets = GET_PTR_EOFF(ptr, ntuples);
+ arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
+ /* Find offset length for each tuple */
+ int numoffset[ntuples];
+ numoffset[0] = endoffsets[0];
+ for(i = 1; i<ntuples; i++) {
+ numoffset[i] = endoffsets[i] - endoffsets[i-1];
+ }
+ /* Check for redundant tuples by comparing oids of each tuple */
+ for(i = 0; i < ntuples; i++) {
+ if(oid[i] == -1)
+ continue;
+ for(j = i+1 ; j < ntuples; j++) {
+ if(oid[j] == -1)
+ continue;
+ /*If oids of tuples match */
+ if (oid[i] == oid[j]) {
+ /* Find the smallest offset length of two tuples*/
+ if(numoffset[i] > numoffset[j]){
+ slength = numoffset[j];
+ sindex = j;
+ }
+ else {
+ slength = numoffset[i];
+ sindex = i;
+ }
+
+ /* Compare the offset values based on the current indices
+ * break if they do not match
+ * if all offset values match then pick the largest tuple*/
+
+ if(i == 0) {
+ k = 0;
+ index = endoffsets[j -1];
+ for(count = 0; count < slength; count ++) {
+ if (arryfields[k] != arryfields[index]) {
+ break;
+ }
+ index++;
+ k++;
+ }
+ } else {
+ printf("i = %d, j = %d\n", i, j);
+ k = endoffsets[i-1];
+ index = endoffsets[j-1];
+ printf("Value of slength = %d\n", slength);
+ for(count = 0; count < slength; count++) {
+ printf("Value of count =%d\n", count);
+ if(arryfields[k] != arryfields[index]) {
+ break;
+ }
+ index++;
+ k++;
+ }
+ printf("Value of count =%d\n", count);
+ }
+ printf("The value of sindex = %d\n", sindex);
+
+ if(slength == count) {
+ printf("DEBUG-> Inside slength if %d\n", sindex);
+ oid[sindex] = -1;
+ }
+ }
+ }
+ }
+}
+
+void checkPreCache(prefetchqelem_t *node, int *numoffset, int counter, int loopcount, unsigned int objoid, int index, int iter, int oidnfound) {
+ char *ptr, *tmp;
+ int ntuples, i, k, flag;
+ unsigned int * oid;
+ short *endoffsets, *arryfields;
+ objheader_t *header;
+
+ ptr = (char *) node;
+ ntuples = *(GET_NTUPLES(ptr));
+ oid = GET_PTR_OID(ptr);
+ endoffsets = GET_PTR_EOFF(ptr, ntuples);
+ arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
+
+ if(oidnfound == 1) {
+ if((header = (objheader_t *) prehashSearch(objoid)) == NULL) {
+ return;
+ } else { //Found in Prefetch Cache
+ //TODO Decide if object is too old, if old remove from cache
+ tmp = (char *) header;
+ /* Check if any of the offset oid is available in the Prefetch cache */
+ for(i = counter; i < loopcount; i++) {
+ objoid = *(tmp + sizeof(objheader_t) + arryfields[counter]);
+ if((header = (objheader_t *)prehashSearch(objoid)) != NULL) {
+ flag = 0;
+ } else {
+ flag = 1;
+ break;
+ }
+ }
+ }
+ } else {
+ for(i = counter; i<loopcount; i++) {
+ if((header = (objheader_t *)prehashSearch(objoid)) != NULL) {
+ tmp = (char *) header;
+ objoid = *(tmp + sizeof(objheader_t) + arryfields[index]);
+ flag = 0;
+ index++;
+ } else {
+ flag = 1;
+ break;
+ }
+ }
+ }
- //Typecast prequest to a struct
- //Gets requests from the compiler
- //Creates and fills the shared queue DS
- //Creates the threads that processes the transPrefetch call or
- //wakes up the prefetchprocess thread if it is sleeping
+ /* If oid not found locally or in prefetch cache then
+ * assign the latest oid found as the new oid
+ * and copy left over offsets into the arrayoffsetfieldarray*/
+ oid[iter] = objoid;
+ numoffset[iter] = numoffset[iter] - (i+1);
+ if(iter == 0)
+ endoffsets[iter] = numoffset[iter];
+ else
+ endoffsets[iter] = numoffset[iter] + endoffsets[iter - 1];
+ for(k = 0; k < numoffset[iter] ; k++) {
+ arryfields[k] = arryfields[counter+1];
+ counter++;
+ }
+
+ if(flag == 0) {
+ oid[iter] = -1;
+ numoffset[iter] = 0;
+ endoffsets[iter] = 0;
+ }
+}
+
+/* This function checks if the oids within the prefetch tuples are available locally.
+ * If yes then makes the tuple invalid. If no then rearranges oid and offset values in
+ * the prefetchqelem_t node to represent a new prefetch tuple */
+void foundLocal(prefetchqelem_t *node) {
+ int ntuples,i, j, k, oidnfound = 0, index, flag;
+ unsigned int *oid;
+ unsigned int objoid;
+ char *ptr, *tmp;
+ objheader_t *objheader;
+ short *endoffsets, *arryfields;
+
+ ptr = (char *) node;
+ ntuples = *(GET_NTUPLES(ptr));
+ oid = GET_PTR_OID(ptr);
+ endoffsets = GET_PTR_EOFF(ptr, ntuples);
+ arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
+ /* Find offset length for each tuple */
+ int numoffset[ntuples];//Number of offsets for each tuple
+ numoffset[0] = endoffsets[0];
+ for(i = 1; i<ntuples; i++) {
+ numoffset[i] = endoffsets[i] - endoffsets[i-1];
+ }
+ for(i = 0; i < ntuples; i++) {
+ if(oid[i] == -1)
+ continue;
+ /* If object found locally */
+ if((objheader = (objheader_t*) mhashSearch(oid[i])) != NULL) {
+ oidnfound = 0;
+ tmp = (char *) objheader;
+ /* Find the oid of its offset value */
+ if(i == 0)
+ index = 0;
+ else
+ index = endoffsets[i - 1];
+ for(j = 0 ; j < numoffset[i] ; j++) {
+ objoid = *(tmp + sizeof(objheader_t) + arryfields[index]);
+ index++;
+ /*New offset oid not found */
+ if((objheader = (objheader_t*) mhashSearch(objoid)) == NULL) {
+ flag = 1;
+ checkPreCache(node, numoffset, j, numoffset[i], objoid, index, i, oidnfound);
+ break;
+ } else
+ flag = 0;
+ }
+ /*If oid not found locally then
+ *assign the latest oid found as the new oid
+ *and copy left over offsets into the arrayoffsetfieldarray*/
+ oid[i] = objoid;
+ numoffset[i] = numoffset[i] - (j+1);
+ if(i == 0)
+ endoffsets[i] = numoffset[i];
+ else
+ endoffsets[i] = numoffset[i] - endoffsets[i - 1];
+ for(k = 0; k < numoffset[i]; k++) {
+ arryfields[k] = arryfields[j+1];
+ j++;
+ }
+ /*If all offset oids are found locally,make the prefetch tuple invalid */
+ if(flag == 0) {
+ oid[i] = -1;
+ numoffset[i] = 0;
+ endoffsets[i] = 0;
+ }
+ } else {
+ oidnfound = 1;
+ /* Look in Prefetch cache */
+ checkPreCache(node, numoffset, 0, numoffset[i], oid[i], 0, i, oidnfound);
+ }
}
}
+void makePreGroups(prefetchqelem_t *node) {
+ char *ptr, *tmp;
+ int ntuples, slength, i, machinenum;
+ unsigned int *oid;
+ short *endoffsets, *arryfields;
+
+
+ /* Check for the case x.y.z and a.b.c are same oids */
+ ptr = (char *) node;
+ ntuples = *(GET_NTUPLES(ptr));
+ oid = GET_PTR_OID(ptr);
+ endoffsets = GET_PTR_EOFF(ptr, ntuples);
+ arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
+ /* Find offset length for each tuple */
+ int numoffset[ntuples];
+ numoffset[0] = endoffsets[0];
+ for(i = 1; i<ntuples; i++) {
+ numoffset[i] = endoffsets[i] - endoffsets[i-1];
+ }
+ /* Check for redundant tuples by comparing oids of each tuple */
+ for(i = 0; i < ntuples; i++) {
+ if(oid[i] == -1)
+ continue;
+ /* For each tuple make piles */
+ if ((machinenum = lhashSearch(oid[i])) == 0) {
+ printf("Error: No such Machine %s, %d\n", __FILE__, __LINE__);
+ return;
+ }
+ }
+
+}
/*This function is called by the thread that processes the
* prefetch request makes piles to prefetch records and prefetches the oids from remote machines */
for(i = 0; i < numoids ; i++) {
arraylength[i] = arrayLength(arrayofoffset[i]);
}
- /* Check for similar tuples or other special case tuples that can be combined to a
- * prefetch message*/
- if(checkPrefetchTuples(arrayofoffset, arraylength, numoids) != 0) {
- printf("Error on checkPrefetchTuples at %s, %d\n", __FILE__, __LINE__);
- return 1;
- }
-
- /* Check if part of prefetch request available locally */
- for(i = 0; i < numoids ; i++) {
- if(arrayofoffset[i][0] != -1) {
- if((objheader = (objheader_t *) mhashSearch(arrayofoffset[i][k])) != NULL) {
- /* Look up in machine lookup table and copy into cache*/
- //tmp = mhashSearch(oid);
- size = sizeof(objheader_t)+classsize[tmp->type];
- objcopy = objstrAlloc(record->cache, size);
- memcpy(objcopy, (void *)objheader, size);
- /* Insert into cache's lookup table */
- chashInsert(record->lookupTable, objheader->oid, objcopy);
- /* Find the offset field*/
- if(foundLocal(arrayofoffset, record, arrayofoffset[i], tmp, arrayofoffset[i][k+1], i) != 0 ) {
- printf("Error in foundLocal() %s, %d\n", __FILE__, __LINE__);
- return 1;
- }
- } else
- continue;
- } else
- continue;
- }
/* Initialize and set thread attributes
* Spawn a thread for each prefetch request sent*/
}
-int checkPrefetchTuples(int *arrayofoffset[], int *arraylength, short numoids) {
- int i, j, k, matchfound = 0, count = 0, slength, length;
- int *sarray, *larray;
-
- /* Check if the prefetch oids are same and have same offsets
- * for case x.a.b and y.a.b where x and y have same oid's
- * or if a.b.c is a subset of x.b.c.d*/
- /* check for case where the generated request a.y.z or x.y.z.g then
- * prefetch needs to be generated for x.y.z.g if oid of a and x are same*/
- for(i = 0; i < numoids -1 ; i++) {
- if(arrayofoffset[i][0] == -1)
- continue;
- for( j = 0; j < (numoids - (i+1)); j++) {
- if(arrayofoffset[j][0] == -1)
- continue;
- /* If the oids of the tuples match */
- if((arrayofoffset[i][0] == arrayofoffset[j][0]) && (i != j )) {
- /*Find the smallest length of two arrays, find ptrs to smallest and long array */
- if(arraylength[i] > arraylength[j]) {
- slength = arraylength[j];
- sarray = arrayofoffset[j];
- larray = arrayofoffset[i];
- } else {
- slength = arraylength[i];
- sarray = arrayofoffset[i];
- larray = arrayofoffset[j];
- }
- /* From first offset until end of tuple compare all offsets of sarray and larray
- * if not a match then break */
- for(k = 1 ; k < slength -1 ; k++) {
- if(sarray[k] != larray[k]) {
- break;
- }
- }
- /* If number of same offsets encountered is same as
- * no of offsets in small array then common tuples found*/
- if(k == (slength -1))
- sarray[0] = -1;
- }
- }
- }
-
- return 0;
-}
-
-/* This function goes through an array and finds out until what offets
- * * can it make a new pile
- * * -1 indicates a special character to mark end of oid, offset tuple*/
-int foundLocal(int *arrayofoffset[], transrecord_t *record, int *array, objheader_t *header, int offset, int index) {
- int i, j, counter, len;
- unsigned int oid;
- void *objcopy;
- objheader_t *tmp, *objheader;
- int size;
-
- /* The oid corresponding to the first offset */
- oid = (int) (header + sizeof(objheader_t) + offset);
- /* Repeat for all offset values in the array */
- for(i = 1; array[i] != -1 ; ) {
- /* If oid is found locally then insert into cache and continue
- * with next offset to find the corresponding oid */
- if((objheader = (objheader_t*) mhashSearch(oid)) != NULL) {
- tmp = mhashSearch(oid);
- size = sizeof(objheader_t)+classsize[tmp->type];
- objcopy = objstrAlloc(record->cache, size);
- memcpy(objcopy, (void *)tmp, size);
- /* Insert into cache's lookup table */
- chashInsert(record->lookupTable, objheader->oid, objcopy);
- oid = (int) (tmp + sizeof(objheader_t) + array[i+1]);
- } else {
- /*If oid not found locally then
- *assign the latest oid found as the first element
- *of array and copy left over offsets as other
- *array elements to build the new prefetch array*/
-
- arrayofoffset[index][0] = (int) oid;
- counter = i;
- len = arrayLength((int *)array[index]);
- for( j = 1 ; counter <= len; j++,counter++) {
- arrayofoffset[index][j] = array[counter + 1];
- }
- break;
- }
- i++;
- }
-
- /* If all offsets are found locally, then do not treat this as valid prefetch tuple */
- if(i == arrayLength((int *)array[index]) - 1) {
- arrayofoffset[index][0] = -1; //Mark beginning of array as -1
- }
-
- return 0;
-}
-
void *sendPrefetchReq(void *prefetchtuple) {
int sd, i;
struct sockaddr_in serv_addr;