if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
/* Save the oids not found and number of oids not found for later use */
- oidnotfound[objnotfound] = ((objheader_t *)mobj)->oid;
+ //oidnotfound[objnotfound] = ((objheader_t *)mobj)->oid;
+ oidnotfound[objnotfound] = oid;
objnotfound++;
} else { /* If Obj found in machine (i.e. has not moved) */
/* Check if Obj is locked by any previous transaction */
int prefetchReq(int acceptfd) {
+ int length, sum, n, numbytes, N, oidnfound = 0;
+ unsigned int oid;
+ char *ptr;
+ void *mobj;
+ unsigned int *oidnotfound;
+
+ ptr = (char *)&fixed;;
+
+ /* Counters and arrays to formulate decision on control message to be sent */
+ oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
+
+ /* Repeated recv the oid and offset pairs sent for prefetch */
+ while((numbytes = recv(int)acceptfd, &length, sizeof(int), 0) != -1) {
+ sum = 0;
+ oid = recv((int)acceptfd, &oid, sizeof(unsigned int), 0);
+ numoffset = (length - (sizeof(int) + sizeof(unsigned int)))/ sizeof(short);
+ N = numoffset * sizeof(short);
+ short offset[numoffset];
+ ptr = (char *)&offset;
+ /* Recv the offset values per oid */
+ do {
+ n = recv((int)acceptfd, (void *)ptr+sum, N-sum, 0);
+ sum += n;
+
+ } while(sum < N && n != 0);
+
+ /* Process each oid */
+ /* Check if object is still present in the machine since the beginning of TRANS_PREFETCH */
+ if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
+ /* Save the oids not found and number of oids not found for later use */
+ oidnotfound[objnotfound] = oid;
+ objnotfound++;
+ } else { /* If Obj found in machine (i.e. has not moved) */
+ /* Return the oid ..its header and data */
+
+
+ }
return 0;
}
#include "mlookup.h"
#include "llookup.h"
#include "plookup.h"
+#include "prelookup.h"
#include "queue.h"
#include <pthread.h>
#include <sys/types.h>
extern int classsize[];
extern primarypfq_t pqueue; // shared prefetch queue
extern mcpileq_t mcqueue; //Shared queue containing prefetch requests sorted by remote machineids
+objstr_t *prefetchcache; //Global Prefetch cache
+extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
pthread_t tPrefetch;
extern objstr_t *mainobjstore;
-objstr_t *prefetchcache;
plistnode_t *createPiles(transrecord_t *);
inline int arrayLength(int *array) {
pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
//Create and Initialize a pool of threads
for(t = 0; t< NUM_THREADS; t++) {
- // rc = pthread_create(&wthreads[t], NULL, pfqProcess, (void *)t);
+ rc = pthread_create(&wthreads[t], NULL, mcqProcess, (void *)t);
if (rc) {
printf("Thread create error %s, %d\n", __FILE__, __LINE__);
return;
pthread_mutex_lock(&mcqueue.qlock);
/* Update the pool queue with the new remote machine piles generated per prefetch call */
mcpileenqueue(pilehead);
- /* Broadcast signal on pool queue */
+ /* Broadcast signal on machine pile queue */
pthread_cond_broadcast(&mcqueue.qcond);
- /* Unlock mutex of pool queue */
+ /* Unlock mutex of mcahine pile queue */
pthread_mutex_unlock(&mcqueue.qlock);
}
}
+/*The pool of threads work on this function to establish connection with
+ * remote machines */
+
+void *mcqProcess(void *threadid) {
+ int tid;
+ prefetchpile_t *mcpilenode;
+
+ tid = (int) threadid;
+ while(1) {
+ /* Lock mutex of mc pile queue */
+ pthread_mutex_lock(&mcqueue.qlock);
+ /* while mc pile queue is empty, then wait */
+ while((mcqueue.front == NULL) && (mcqueue.rear == NULL)) {
+ pthread_cond_wait(&mcqueue.qcond, &mcqueue.qlock);
+ }
+ /* dequeue node to send remote machine connections*/
+ if((mcpilenode = mcpiledequeue()) == NULL) {
+ printf("Dequeue Error: No node returned %s %d\n", __FILE__, __LINE__);
+ return NULL;
+ }
+ /* Unlock mutex */
+ pthread_mutex_unlock(&mcqueue.qlock);
+
+ /*Initiate connection to remote host and send request */
+ sendPrefetchReq(mcpilenode, tid);
+ /* Process Request */
+
+
+ /* TODO: For each object not found query DHT for new location and retrieve the object */
+
+ /* Deallocate the dequeued node */
+ }
+}
+
/*This function is called by the thread that processes the
* prefetch request makes piles to prefetch records and prefetches the oids from remote machines */
int transPrefetchProcess(transrecord_t *record, int *arrayofoffset[], short numoids){
/* For each Pile in the machine send TRANS_PREFETCH */
//makePiles(arrayofoffset, numoids);
/* Fill thread data structure */
- rc = pthread_create(&thread[i] , &attr, sendPrefetchReq, (void *) arrayofoffset[i]);
+ //rc = pthread_create(&thread[i] , &attr, sendPrefetchReq, (void *) arrayofoffset[i]);
if (rc) {
perror("Error in pthread create at transPrefetchProcess()\n");
return 1;
}
-void *sendPrefetchReq(void *prefetchtuple) {
- int sd, i;
+void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) {
+ int sd, i, offset, off, len, endpair;
struct sockaddr_in serv_addr;
struct hostent *server;
- char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
- char machineip[16], retval;
+ char machineip[16], control;
+ objpile_t *tmp;
/* Send Trans Prefetch Request */
if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("Error in socket for TRANS_REQUEST\n");
- return NULL;
+ return;
}
bzero((char*) &serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(LISTEN_PORT);
- //midtoIP(tdata->mid,machineip);
-// machineip[15] = '\0';
-// serv_addr.sin_addr.s_addr = inet_addr(machineip);
+ //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
+ midtoIP(mcpilenode->mid ,machineip);
+ machineip[15] = '\0';
+ serv_addr.sin_addr.s_addr = inet_addr(machineip);
/* Open Connection */
if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
perror("Error in connect for TRANS_REQUEST\n");
- return NULL;
+ return;
}
- close(sd);
- pthread_exit(NULL);
+ /* Send TRANS_PREFETCH control message */
+ control = TRANS_PREFETCH;
+ if(send(sd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
+ perror("Error in sending prefetch control\n");
+ return;
+ }
+
+ /* Send Oids and offsets in pairs */
+ tmp = mcpilenode->objpiles;
+ while(tmp != NULL) {
+ off = offset = 0;
+ len = sizeof(int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
+ char oidnoffset[len];
+ memcpy(oidnoffset, &len, sizeof(int));
+ off = sizeof(int);
+ memcpy(oidnoffset + off, &tmp->oid, sizeof(unsigned int));
+ off += sizeof(unsigned int);
+ for(i = 0; i < numoffsets; i++) {
+ offset = off + (i * sizeof(short));
+ memcpy(oidnoffset + offset, tmp->offset, sizeof(short));
+ }
+ if (send(sd, &oidnoffset, sizeof(oidnoffset),MSG_NOSIGNAL) < sizeof(oidnoffset)) {
+ perror("Error sending fixed bytes for thread\n");
+ return;
+ }
+ tmp = tmp->next;
+ }
+
+ endpair = -1;
+ if (send(sd, &endpair, sizeof(int), MSG_NOSIGNAL) < sizeof(int)) {
+ perror("Error sending fixed bytes for thread\n");
+ return;
+ }
+
+ /* Get Response from the remote machine */
+ getPrefetchResponse();
+
+// close(sd);
+}
+
+void getPrefetchResponse() {
+ int i;
+
+ /* Lock the Prefetch Cache look up table*/
+ pthread_mutex_lock(&pflookup.lock);
+
+ /*TODO For each object found add to Prefetch Cache */
+
+ /* Broadcast signal on prefetch cache condition variable */
+ pthread_cond_broadcast(&pflookup.qcond);
+
+ /* Unlock the Prefetch Cache look up table*/
+ pthread_mutex_unlock(&pflookup.lock);
+
+
+
+
+
}