#include "ip.h"
extern objstr_t *mainobjstore;
-int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *)};
+typedef struct testobj1 {
+ int x;
+ char z;
+} testobj1_t;
+
+typedef struct testobj2 {
+ char z[10];
+ char c;
+ testobj1_t *y;
+} testobj2_t;
+
+typedef struct testobj3 {
+ short p;
+ testobj1_t *q;
+ testobj2_t *r;
+} testobj3_t;
+
+typedef struct testobj4 {
+ int b;
+ void *q;
+ testobj3_t *a;
+} testobj4_t;
+
+typedef struct testobj5 {
+ testobj4_t *a;
+} testobj5_t;
+
+
+int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *), sizeof(testobj1_t),
+ sizeof(testobj2_t), sizeof(testobj3_t), sizeof(testobj4_t), sizeof(testobj5_t)};
+
+
+//int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *)};
int test1(void);
int test2(void);
lhashInsert(header->oid, mid);
//Inserting into lhashtable
- mid = iptoMid("128.200.9.29"); //d-3.eecs.uci.edu
+ mid = iptoMid("128.195.175.70"); //dw-2.eecs.uci.edu
lhashInsert(20, mid);
lhashInsert(21, mid);
lhashInsert(22, mid);
//Check if machine dw-1 is up and running
checkServer(mid, "128.195.175.69");
- mid = iptoMid("128.200.9.29");
- //Check if machine d-3 is up and running
- checkServer(mid, "128.200.9.29");
+ mid = iptoMid("128.195.175.70");
+ //Check if machine dw-2 is up and running
+ checkServer(mid, "128.195.175.70");
// Start Transaction
myTrans = transStart();
objheader_t *h1, *h2, *h3, *h4;//h1,h2 from local ; h3 from d-1 , h-4 from d-2
dstmInit();
+ transInit();
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
lhashInsert(header->oid, mid);
//Inserting into lhashtable
- mid = iptoMid("128.200.9.29"); //d-3.eecs.uci.edu
+ mid = iptoMid("128.195.175.70"); //dw-2.eecs.uci.edu
lhashInsert(20, mid);
lhashInsert(21, mid);
lhashInsert(22, mid);
pthread_create(&thread_Listen, &attr, dstmListen, NULL);
//Check if machine dw-1 is up and running
checkServer(mid, "128.195.175.69");
- mid = iptoMid("128.200.9.29");
- //Check if machine d-3 is up and running
- checkServer(mid, "128.200.9.29");
+ mid = iptoMid("128.195.175.70");
+ //Check if machine dw-2 is up and running
+ checkServer(mid, "128.195.175.70");
// Start Transaction
myTrans = transStart();
if((h3 = transRead(myTrans, 31)) == NULL) {
printf("Object not found\n");
}
- //read object 21(present in d-3 machine)
+ //read object 21(present in dw-2 machine)
if((h4 = transRead(myTrans, 21)) == NULL) {
printf("Object not found\n");
}
//Create the primary prefetch thread
pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
//Create and Initialize a pool of threads
+ /* Threads are active for the entire period runtime is running */
for(t = 0; t< NUM_THREADS; t++) {
rc = pthread_create(&wthreads[t], NULL, mcqProcess, (void *)t);
if (rc) {
return;
}
}
- //TODO when to deletethreads
}
/* This function stops the threads spawned */
/* This function finds the location of the objects involved in a transaction
* and returns the pointer to the object if found in a remote location */
-objheader_t *transRead(transrecord_t *record, unsigned int oid)
-{
+objheader_t *transRead(transrecord_t *record, unsigned int oid) {
+ printf("Inside transaction read call\n");
unsigned int machinenumber;
objheader_t *tmp, *objheader;
void *objcopy;
/* Search local transaction cache */
if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
+ printf("Inside transaction cache \n");
return(objheader);
} else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
/* Look up in machine lookup table and copy into cache*/
+ printf("Inside mainobject store \n");
tmp = mhashSearch(oid);
size = sizeof(objheader_t)+classsize[TYPE(tmp)];
objcopy = objstrAlloc(record->cache, size);
chashInsert(record->lookupTable, OID(objheader), objcopy);
return(objcopy);
} else if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { /* Look up in prefetch cache */
+ printf("Inside prefetch cache \n");
found = 1;
size = sizeof(objheader_t)+classsize[TYPE(tmp)];
objcopy = objstrAlloc(record->cache, size);
chashInsert(record->lookupTable, OID(tmp), objcopy);
return(objcopy);
} else { /* If not found anywhere, then block until object appears in prefetch cache */
+ printf("Inside remote machine\n");
pthread_mutex_lock(&pflookup.lock);
while(!found) {
rc = pthread_cond_timedwait(&pflookup.cond, &pflookup.lock, &ts);
char treplyctrl = 0, treplyretry = 0; /* keeps track of the common response that needs to be sent */
char localstat = 0;
+
/* Look through all the objects in the transaction record and make piles
* for each machine involved in the transaction*/
pile = createPiles(record);
if(treplyretry == 1) {
/* wait a random amount of time */
randomdelay();
- //sleep(1);
/* Retry the commiting transaction again */
transCommit(record);
}
/* Send Abort */
*(tdata->replyctrl) = TRANS_ABORT;
printf("DEBUG-> trans.c Sending TRANS_ABORT\n");
+ /* Free resources */
objstrDelete(tdata->rec->cache);
chashDelete(tdata->rec->lookupTable);
free(tdata->rec);
/* Send Commit */
*(tdata->replyctrl) = TRANS_COMMIT;
printf("DEBUG-> trans.c Sending TRANS_COMMIT\n");
+ /* Free resources */
objstrDelete(tdata->rec->cache);
chashDelete(tdata->rec->lookupTable);
free(tdata->rec);
return 0;
}
-/* This function sends the final response to all threads in their respective socket id */
+/* This function sends the final response to remote machines per thread in their respective socket id */
char sendResponse(thread_data_array_t *tdata, int sd) {
int n, N, sum, oidcount = 0;
char *ptr, retval = 0;
}
}
-/*The pool of threads work on this function to establish connection with
- * remote machines */
+/* Each thread in the pool of threads calls this function to establish connection with
+ * remote machines, send the prefetch requests and process the reponses from
+ * the remote machines .
+ * The thread is active throughout the period of runtime */
void *mcqProcess(void *threadid) {
int tid;
/*Initiate connection to remote host and send request */
/* Process Request */
sendPrefetchReq(mcpilenode, tid);
- /* TODO: For each object not found query DHT for new location and retrieve the object */
/* Deallocate the machine queue pile node */
mcdealloc(mcpilenode);
}
}
-/*This function is called by the thread that processes the
- * prefetch request makes piles to prefetch records and prefetches the oids from remote machines */
-int transPrefetchProcess(transrecord_t *record, int *arrayofoffset[], short numoids){
- int i, k = 0, rc;
- int arraylength[numoids];
- unsigned int machinenumber;
- objheader_t *tmp, *objheader;
- void *objcopy;
- int size;
- pthread_attr_t attr;
-
- /* Given tuple find length of tuple*/
- for(i = 0; i < numoids ; i++) {
- arraylength[i] = arrayLength(arrayofoffset[i]);
- }
-
- /* Initialize and set thread attributes
- * Spawn a thread for each prefetch request sent*/
- pthread_t thread[numoids];
- pthread_attr_init(&attr);
- pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
-
- /* Create Machine Piles to send prefetch requests use threads*/
- for( i = 0 ; i< numoids ; i++) {
- if(arrayofoffset[i][0] == -1)
- continue;
- else{
- /* For each Pile in the machine send TRANS_PREFETCH */
- //makePiles(arrayofoffset, numoids);
- /* Fill thread data structure */
- //rc = pthread_create(&thread[i] , &attr, sendPrefetchReq, (void *) arrayofoffset[i]);
- if (rc) {
- perror("Error in pthread create at transPrefetchProcess()\n");
- return 1;
- }
-
- }
- }
-
- /* Free attribute and wait to join other threads */
- for (i = 0 ;i < numoids ; i++) {
- rc = pthread_join(thread[i], NULL);
- if (rc) {
- perror("Error pthread_join() in transPrefetchProcess()\n");
- return 1;
- }
- }
- pthread_attr_destroy(&attr);
-
- return 0;
-
-}
-
void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) {
int sd, i, offset, off, len, endpair, numoffsets, count = 0;
struct sockaddr_in serv_addr;
/* Get Response from the remote machine */
getPrefetchResponse(count,sd);
close(sd);
+ return;
}
void getPrefetchResponse(int count, int sd) {
unsigned int bufsize,oid;
char buffer[RECEIVE_BUFFER_SIZE], control;
char *ptr;
- void *modptr;
+ void *modptr, *oldptr;
/* Read prefetch response from the Remote machine */
if((val = read(sd, &control, sizeof(char))) <= 0) {
perror("No control response for Prefetch request sent\n");
return;
}
+
if(control == TRANS_PREFETCH_RESPONSE) {
/*For each oid and offset tuple sent as prefetch request to remote machine*/
while(i < count) {
}
memcpy(modptr, buffer+index, objsize);
index += sizeof(int);
- /* Add pointer and oid to hash table */
- //TODO Do we need a version comparison here??
- prehashInsert(oid, modptr);
+ /* Insert the oid and its address into the prefetch hash lookup table */
+ /* Do a version comparison if the oid exists */
+ if((oldptr = prehashSearch(oid)) != NULL) {
+ /* If older version then update with new object ptr */
+ if(((objheader_t *)oldptr)->version < ((objheader_t *)modptr)->version) {
+ prehashRemove(oid);
+ prehashInsert(oid, modptr);
+ } else if(((objheader_t *)oldptr)->version == ((objheader_t *)modptr)->version) {
+ /* Add the new object ptr to hash table */
+ prehashInsert(oid, modptr);
+ } else { /* Do nothing */
+ ;
+ }
+ } else {/*If doesn't no match found in hashtable, add the object ptr to hash table*/
+ prehashInsert(oid, modptr);
+ }
/* Broadcast signal on prefetch cache condition variable */
pthread_cond_broadcast(&pflookup.cond);
/* Unlock the Prefetch Cache look up table*/
pthread_mutex_unlock(&pflookup.lock);
} else if(buffer[index] == OBJECT_NOT_FOUND) {
/* Increment it to get the object */
- // TODO If object not found, local machine takes inventory
+ /* TODO: For each object not found query DHT for new location and retrieve the object */
index += sizeof(char);
memcpy(&oid, buffer + index, sizeof(unsigned int));
index += sizeof(unsigned int);
+ /* Throw an error */
+ printf("OBJECT NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n");
+ exit(-1);
} else
printf("Error in decoding the index value %s, %d\n",__FILE__, __LINE__);
}
}
} else
printf("Error in receving response for prefetch request %s, %d\n",__FILE__, __LINE__);
+ return;
}