#include "mlookup.h"
#include "llookup.h"
#include "plookup.h"
-#include<pthread.h>
-#include<sys/types.h>
-#include<sys/socket.h>
-#include<netdb.h>
-#include<netinet/in.h>
+//#include "queue.h"
+#include <pthread.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <netinet/in.h>
#include <sys/types.h>
#include <unistd.h>
#include <time.h>
+#include <string.h>
#define LISTEN_PORT 2156
-#define MACHINE_IP "127.0.0.1"
#define RECEIVE_BUFFER_SIZE 2048
+/* Global Variables */
extern int classsize[];
+//extern prefetchthreadqueue_t pfqueue; // Shared prefetch queue
objstr_t *mainobjstore;
plistnode_t *createPiles(transrecord_t *);
-//int checkPrefetchTuples(int **, int *, short);
+pthread_t tPrefetch;
inline int arrayLength(int *array) {
int i;
return i;
}
+// DS that contains information to be shared between threads.
+typedef struct prefetchqelem {
+ struct prefetchqelem *next;
+} prefetchqelem_t;
+
+typedef struct primarypfq {
+ prefetchqelem_t *front, *rear;
+ pthread_mutex_t qlock;
+ pthread_cond_t qcond;
+} primarypfq_t;
+
+primarypfq_t pqueue; //Global queue
+int flag; //Global flag
+
+void prefetch(int ntuples, unsigned int *oids, short *endoffsets, short *arrayfields);
+
+
+void queueInit(void) {
+ pqueue.front = pqueue.rear = NULL;
+ pthread_mutex_init(&pqueue.qlock, NULL);
+ pthread_cond_init(&pqueue.qcond, NULL);
+}
+
+/* Removes the first element of the queue */
+void delqnode() {
+ prefetchqelem_t *delnode;
+ if((pqueue.front == NULL) && (pqueue.rear == NULL)) {
+ printf("The queue is empty: UNDERFLOW %s, %d\n", __FILE__, __LINE__);
+ return;
+ } else if (pqueue.front == pqueue.rear) {
+ free(pqueue.front);
+ pqueue.front = pqueue.rear = NULL;
+ } else {
+ delnode = pqueue.front;
+ pqueue.front = pqueue.front->next;
+ free(delnode);
+ }
+}
+
+void queueDelete(void) {
+ /* Remove each element */
+ while(pqueue.front != NULL)
+ delqnode();
+ pqueue.front = pqueue.rear = NULL;
+}
+
+/* Inserts to the rear of primary prefetch queue */
+void enqueue(prefetchqelem_t *qnode) {
+ if(pqueue.front == NULL && pqueue.rear == NULL) {
+ pqueue.front = pqueue.rear = qnode;
+ } else {
+ qnode->next = NULL;
+ pqueue.rear->next = qnode;
+ pqueue.rear = qnode;
+ }
+}
+
+prefetchqelem_t *dequeue(void) {
+ prefetchqelem_t *retnode;
+
+ if (pqueue.front == NULL) {
+ printf("Queue empty: Underflow %s, %d\n", __FILE__, __LINE__);
+ return NULL;
+ }
+ retnode = pqueue.front;
+ //TODO make this atomic
+ pqueue.front = pqueue.front->next;
+
+ return retnode;
+}
+
+void queuedisplay(primarypfq_t pqueue) {
+ while(pqueue.front != NULL) {
+ /*
+ int *ptr = (int *)(pqueue + offset);
+ int oid = *ptr;
+ printf("oid = %d", oid);
+ //printf("Oid = %d", pqueue.front->oids[0]);
+ pqueue.front = pqueue.front->next;
+ */
+ }
+}
+
+void prefetch(int ntuples, unsigned int *oids, short *endoffsets, short *arrayfields) {
+ int qnodesize;
+ int len = 0;
+ /* Create element for the queue */
+ prefetchqelem_t *node;
+ qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(short) + sizeof(unsigned int)) + endoffsets[ntuples -1] * sizeof(short);
+ if((node = calloc(1,qnodesize)) == NULL) {
+ printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
+ return;
+ }
+ len = sizeof(prefetchqelem_t);
+ memcpy(node + len, &ntuples, sizeof(int));
+ len += sizeof(int);
+ memcpy(node + len, oids, ntuples*sizeof(unsigned int));
+ len += ntuples * sizeof(unsigned int);
+ memcpy(node + len, endoffsets, ntuples*sizeof(short));
+ len += ntuples * sizeof(short);
+ memcpy(node + len, arrayfields, endoffsets[ntuples-1]*sizeof(short));
+
+ pthread_mutex_lock(&pqueue.qlock);
+ /* Check if primary queue thread operating on this then wait */
+ /*
+ if(flag) {
+ //TODO
+ }
+ */
+ enqueue(node);
+
+ pthread_mutex_unlock(&pqueue.qlock);
+}
+
+
+/* This function initiates the prefetch thread
+ * A queue is shared between the main thread of execution
+ * and the prefetch thread to process the prefetch call
+ * Call from compiler populates the shared queue with prefetch requests while prefetch thread
+ * processes the prefetch requests */
+void transInit() {
+ //Initialize shared queue
+ queueInit();
+ //Create the prefetch thread
+ pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
+}
+
+/* This function stops the threads spawned */
+void transExit() {
+ pthread_cancel(tPrefetch);
+ return;
+}
+
/* This functions inserts randowm wait delays in the order of msec */
void randomdelay(void)
{
return;
}
+/* This function initializes things required in the transaction start*/
transrecord_t *transStart()
{
transrecord_t *tmp = malloc(sizeof(transrecord_t));
tmp->cache = objstrCreate(1048576);
tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
+
return tmp;
}
int size;
void *buf;
/* Search local cache */
- if((objheader =(objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
+ if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
return(objheader);
} else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
/* Look up in machine lookup table and copy into cache*/
}
}
}
+
/* This function creates objects in the transaction record */
objheader_t *transCreateObj(transrecord_t *record, unsigned short type)
{
chashInsert(record->lookupTable, tmp->oid, tmp);
return tmp;
}
+
/* This function creates machine piles based on all machines involved in a
* transaction commit request */
plistnode_t *createPiles(transrecord_t *record) {
curr = next;
}
}
-
return pile;
}
+
/* This function initiates the transaction commit process
* Spawns threads for each of the new connections with Participants
* and creates new piles by calling the createPiles(),
thread_data_array[threadnum].replyretry = &treplyretry;
thread_data_array[threadnum].rec = record;
/* If local do not create any extra connection */
- if(pile->local != 1) {
+ if(pile->local != 1) { /* Not local */
rc = pthread_create(&thread[threadnum], NULL, transRequest, (void *) &thread_data_array[threadnum]);
if (rc) {
perror("Error in pthread create\n");
return 1;
}
- } else {
+ } else { /*Local*/
/*Unset the pile->local flag*/
pile->local = 0;
/*Set flag to identify that Local machine is involved*/
return 0;
}
-/*This function makes piles to prefetch records and prefetches the oids from remote machines */
-int transPrefetch(transrecord_t *record, int *arrayofoffset[], short numoids){
+/* This function is called by the thread calling transPrefetch */
+void *transPrefetch(void *prefdata) {
+ compprefetchdata_t *ptr = (compprefetchdata_t *) prefdata;
+ int *offstarray = NULL;
+
+ while(1) {
+ //TODO
+
+ //Typecast prequest to a struct
+ //Gets requests from the compiler
+ //Creates and fills the shared queue DS
+ //Creates the threads that processes the transPrefetch call or
+ //wakes up the prefetchprocess thread if it is sleeping
+
+ }
+}
+
+
+
+/*This function is called by the thread that processes the
+ * prefetch request makes piles to prefetch records and prefetches the oids from remote machines */
+int transPrefetchProcess(transrecord_t *record, int *arrayofoffset[], short numoids){
int i, k = 0, rc;
int arraylength[numoids];
unsigned int machinenumber;
for(i = 0; i < numoids ; i++) {
arraylength[i] = arrayLength(arrayofoffset[i]);
}
- /* Check for similar tuples or other special case tuples that can be combined to a
+ /* Check for similar tuples or other special case tuples that can be combined to a
* prefetch message*/
if(checkPrefetchTuples(arrayofoffset, arraylength, numoids) != 0) {
printf("Error on checkPrefetchTuples at %s, %d\n", __FILE__, __LINE__);
/* Fill thread data structure */
rc = pthread_create(&thread[i] , &attr, sendPrefetchReq, (void *) arrayofoffset[i]);
if (rc) {
- perror("Error in pthread create at transPrefetch()\n");
+ perror("Error in pthread create at transPrefetchProcess()\n");
return 1;
}
for (i = 0 ;i < numoids ; i++) {
rc = pthread_join(thread[i], NULL);
if (rc) {
- perror("Error pthread_join() in transPrefetch()\n");
+ perror("Error pthread_join() in transPrefetchProcess()\n");
return 1;
}
}