Process pool of threads working on machine piles
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
index 2c6f8f4bb167dfd2faa091a420451be69da74100..ae8fd5ba75d1f57536368c71dd052fe9fb938782 100644 (file)
@@ -5,6 +5,7 @@
 #include "mlookup.h"
 #include "llookup.h"
 #include "plookup.h"
+#include "prelookup.h"
 #include "queue.h"
 #include <pthread.h>
 #include <sys/types.h>
 extern int classsize[];
 extern primarypfq_t pqueue; // shared prefetch queue
 extern mcpileq_t mcqueue;  //Shared queue containing prefetch requests sorted by remote machineids 
+objstr_t *prefetchcache; //Global Prefetch cache 
+extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
 pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
 pthread_t tPrefetch;
 extern objstr_t *mainobjstore;
-objstr_t *prefetchcache;
 
 plistnode_t *createPiles(transrecord_t *);
 inline int arrayLength(int *array) {
@@ -97,7 +99,7 @@ void transInit() {
        pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
        //Create and Initialize a pool of threads 
        for(t = 0; t< NUM_THREADS; t++) {
-       //      rc = pthread_create(&wthreads[t], NULL, pfqProcess, (void *)t);
+               rc = pthread_create(&wthreads[t], NULL, mcqProcess, (void *)t);
                if (rc) {
                        printf("Thread create error %s, %d\n", __FILE__, __LINE__);
                        return;
@@ -1180,13 +1182,47 @@ void *transPrefetch(void *t) {
                pthread_mutex_lock(&mcqueue.qlock);
                /* Update the pool queue with the new remote machine piles generated per prefetch call */
                mcpileenqueue(pilehead);
-               /* Broadcast signal on pool queue */
+               /* Broadcast signal on machine pile queue */
                pthread_cond_broadcast(&mcqueue.qcond);
-               /* Unlock mutex of pool queue */
+               /* Unlock mutex of  mcahine pile queue */
                pthread_mutex_unlock(&mcqueue.qlock);
        }
 }
 
+/*The pool of threads work on this function to establish connection with
+ * remote machines */
+
+void *mcqProcess(void *threadid) {
+       int tid;
+       prefetchpile_t *mcpilenode;
+
+       tid = (int) threadid;
+       while(1) {
+               /* Lock mutex of mc pile queue */
+               pthread_mutex_lock(&mcqueue.qlock);
+               /* while mc pile queue is empty, then wait */
+               while((mcqueue.front == NULL) && (mcqueue.rear == NULL)) {
+                       pthread_cond_wait(&mcqueue.qcond, &mcqueue.qlock);
+               }
+               /* dequeue node to send remote machine connections*/
+               if((mcpilenode = mcpiledequeue()) == NULL) {
+                       printf("Dequeue Error: No node returned %s %d\n", __FILE__, __LINE__);
+                       return NULL;
+               }
+               /* Unlock mutex */
+               pthread_mutex_unlock(&mcqueue.qlock);
+
+               /*Initiate connection to remote host and send request */ 
+               sendPrefetchReq(mcpilenode, tid);
+               /* Process Request */
+
+
+               /* TODO: For each object not found query DHT for new location and retrieve the object */
+       
+               /* Deallocate the dequeued node */
+       }
+}
+
 /*This function is called by the thread that processes the 
  * prefetch request makes piles to prefetch records and prefetches the oids from remote machines */
 int transPrefetchProcess(transrecord_t *record, int *arrayofoffset[], short numoids){
@@ -1217,7 +1253,7 @@ int transPrefetchProcess(transrecord_t *record, int *arrayofoffset[], short numo
                        /* For each Pile in the machine send TRANS_PREFETCH */
                        //makePiles(arrayofoffset, numoids);
                        /* Fill thread data structure */
-                       rc = pthread_create(&thread[i] , &attr, sendPrefetchReq, (void *) arrayofoffset[i]);
+                       //rc = pthread_create(&thread[i] , &attr, sendPrefetchReq, (void *) arrayofoffset[i]);
                        if (rc) {
                                perror("Error in pthread create at transPrefetchProcess()\n");
                                return 1;
@@ -1240,33 +1276,90 @@ int transPrefetchProcess(transrecord_t *record, int *arrayofoffset[], short numo
                
 }
 
-void *sendPrefetchReq(void *prefetchtuple) {
-       int sd, i;
+void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) {
+       int sd, i, offset, off, len, endpair;
        struct sockaddr_in serv_addr;
        struct hostent *server;
-       char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
-       char machineip[16], retval;
+       char machineip[16], control;
+       objpile_t *tmp;
 
 
        /* Send Trans Prefetch Request */
        if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
                perror("Error in socket for TRANS_REQUEST\n");
-               return NULL;
+               return;
        }
        bzero((char*) &serv_addr, sizeof(serv_addr));
        serv_addr.sin_family = AF_INET;
        serv_addr.sin_port = htons(LISTEN_PORT);
-       //midtoIP(tdata->mid,machineip);
-//     machineip[15] = '\0';
-//     serv_addr.sin_addr.s_addr = inet_addr(machineip);
+       //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
+       midtoIP(mcpilenode->mid ,machineip);
+       machineip[15] = '\0';
+       serv_addr.sin_addr.s_addr = inet_addr(machineip);
 
        /* Open Connection */
        if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
                perror("Error in connect for TRANS_REQUEST\n");
-               return NULL;
+               return;
        }
 
-       close(sd);
-       pthread_exit(NULL);
+       /* Send TRANS_PREFETCH control message */
+       control = TRANS_PREFETCH;
+       if(send(sd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
+               perror("Error in sending prefetch control\n");
+               return;
+       }
+
+       /* Send Oids and offsets in pairs */
+       tmp = mcpilenode->objpiles;
+       while(tmp != NULL) {
+               off = offset = 0;
+               len = sizeof(int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
+               char oidnoffset[len];
+               memcpy(oidnoffset, &len, sizeof(int));
+               off = sizeof(int);
+               memcpy(oidnoffset + off, &tmp->oid, sizeof(unsigned int));
+               off += sizeof(unsigned int);
+               for(i = 0; i < numoffsets; i++) {
+                       offset = off +  (i * sizeof(short));
+                       memcpy(oidnoffset + offset, tmp->offset, sizeof(short));
+               }
+               if (send(sd, &oidnoffset, sizeof(oidnoffset),MSG_NOSIGNAL) < sizeof(oidnoffset)) {
+                       perror("Error sending fixed bytes for thread\n");
+                       return;
+               }
+               tmp = tmp->next;
+       }
+
+       endpair = -1;
+       if (send(sd, &endpair, sizeof(int), MSG_NOSIGNAL) < sizeof(int)) {
+               perror("Error sending fixed bytes for thread\n");
+               return;
+       }
+
+       /* Get Response from the remote machine */
+       getPrefetchResponse();
+
+//     close(sd);
+}
+
+void getPrefetchResponse() {
+       int i;
+
+       /* Lock the Prefetch Cache look up table*/
+       pthread_mutex_lock(&pflookup.lock);
+
+       /*TODO For each object found add to Prefetch Cache */
+
+       /* Broadcast signal on prefetch cache condition variable */ 
+       pthread_cond_broadcast(&pflookup.qcond);
+       
+       /* Unlock the Prefetch Cache look up table*/
+       pthread_mutex_unlock(&pflookup.lock);
+
+
+
+
+
 
 }