Process pool of threads working on machine piles
authoradash <adash>
Thu, 2 Aug 2007 22:51:57 +0000 (22:51 +0000)
committeradash <adash>
Thu, 2 Aug 2007 22:51:57 +0000 (22:51 +0000)
some minor bug fixes

Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/machinepile.c
Robust/src/Runtime/DSTM/interface/mcpileq.c
Robust/src/Runtime/DSTM/interface/prelookup.h
Robust/src/Runtime/DSTM/interface/trans.c

index 82f4093276c31b6c2ce19ad598e797ba3d8b18fe..f2f477fa6105db1557cfcabc4e8c133b21f5db4f 100644 (file)
@@ -193,11 +193,12 @@ int transAbortProcess(void *, unsigned int *, int, int, int);
 int transComProcess(trans_commit_data_t *);
 void prefetch(int, unsigned int *, short *, short*);
 void *transPrefetch(void *);
+void *mcqProcess(void *);
 void checkPrefetchTuples(prefetchqelem_t *);
 prefetchpile_t *foundLocal(prefetchqelem_t *);
 prefetchpile_t *makePreGroups(prefetchqelem_t *, int *);
 void checkPreCache(prefetchqelem_t *, int *, int, int, unsigned int, int, int, int);
 int transPrefetchProcess(transrecord_t *, int **, short);
-void *sendPrefetchReq(void *);
+void *sendPrefetchReq(prefetchpile_t*, int);
 /* end transactions */
 #endif
index 7b5bd7345982fa5f47c78ed18148dc2e751a024e..aaf1676285233160aa00279d2fe955445c00d735 100644 (file)
@@ -376,7 +376,8 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
 
                if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
                        /* Save the oids not found and number of oids not found for later use */
-                       oidnotfound[objnotfound] = ((objheader_t *)mobj)->oid;
+                       //oidnotfound[objnotfound] = ((objheader_t *)mobj)->oid;
+                       oidnotfound[objnotfound] = oid;
                        objnotfound++;
                } else { /* If Obj found in machine (i.e. has not moved) */
                        /* Check if Obj is locked by any previous transaction */
@@ -529,6 +530,43 @@ int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) {
 
 
 int prefetchReq(int acceptfd) {
+       int length, sum, n, numbytes, N, oidnfound = 0;
+       unsigned int oid;
+       char *ptr;
+       void *mobj;
+       unsigned int *oidnotfound;
+       
+       ptr = (char *)&fixed;;
+
+       /* Counters and arrays to formulate decision on control message to be sent */
+       oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
+       
+       /* Repeated recv the oid and offset pairs sent for prefetch */
+       while((numbytes = recv(int)acceptfd, &length, sizeof(int), 0) != -1) {
+               sum = 0;
+               oid = recv((int)acceptfd, &oid, sizeof(unsigned int), 0);
+               numoffset = (length - (sizeof(int) + sizeof(unsigned int)))/ sizeof(short);
+               N = numoffset * sizeof(short);
+               short offset[numoffset];
+               ptr = (char *)&offset;
+               /* Recv the offset values per oid */ 
+               do {
+                       n = recv((int)acceptfd, (void *)ptr+sum, N-sum, 0); 
+                       sum += n; 
+
+               } while(sum < N && n != 0);     
+
+               /* Process each oid */
+               /* Check if object is still present in the machine since the beginning of TRANS_PREFETCH */
+               if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
+                       /* Save the oids not found and number of oids not found for later use */
+                       oidnotfound[objnotfound] = oid;
+                       objnotfound++;
+               } else { /* If Obj found in machine (i.e. has not moved) */
+                       /* Return the oid ..its header and data */
+
+
+       }
 
        return 0;
 }
index 58fe1b924c78cfe617015772db203cff43cd44a6..4e8dc6a54a6dc7ec103d04031afc924bdfbfa413 100644 (file)
@@ -46,3 +46,6 @@ int insertPile(int mid, unsigned int oid, short numoffset, short *offset, prefet
        return 0;
 }
 
+int deletePile() {
+
+}
index 379ffd271f67872b790740685a77c73f15fef84d..e5ec4a84d7c27c337e10937d4e32212d84b9bb02 100644 (file)
@@ -22,6 +22,7 @@ void mcpileenqueue(prefetchpile_t *node) {
 }
 */
 
+/* Insert to the rear of machine pile queue */
 void mcpileenqueue(prefetchpile_t *node) {
        prefetchpile_t *tmp, *prev;
        if(mcqueue.front == NULL && mcqueue.rear == NULL) {
@@ -50,6 +51,7 @@ prefetchpile_t *mcpiledequeue(void) {
        }
        retnode = mcqueue.front;
        mcqueue.front = mcqueue.front->next;
+       retnode->next = NULL;
 
        return retnode;
 }
index 0f68d20bac08840e01d7831e6715a8b24ee4b71d..e8bb204dbc23dd674e7662018132aab8bd609363 100644 (file)
@@ -20,6 +20,7 @@ typedef struct prehashtable {
        unsigned int numelements;
        float loadfactor;
        pthread_mutex_t lock;
+       pthread_cond_t cond;
 } prehashtable_t;
 
 /* Prototypes for hash*/
index 2c6f8f4bb167dfd2faa091a420451be69da74100..ae8fd5ba75d1f57536368c71dd052fe9fb938782 100644 (file)
@@ -5,6 +5,7 @@
 #include "mlookup.h"
 #include "llookup.h"
 #include "plookup.h"
+#include "prelookup.h"
 #include "queue.h"
 #include <pthread.h>
 #include <sys/types.h>
 extern int classsize[];
 extern primarypfq_t pqueue; // shared prefetch queue
 extern mcpileq_t mcqueue;  //Shared queue containing prefetch requests sorted by remote machineids 
+objstr_t *prefetchcache; //Global Prefetch cache 
+extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
 pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
 pthread_t tPrefetch;
 extern objstr_t *mainobjstore;
-objstr_t *prefetchcache;
 
 plistnode_t *createPiles(transrecord_t *);
 inline int arrayLength(int *array) {
@@ -97,7 +99,7 @@ void transInit() {
        pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
        //Create and Initialize a pool of threads 
        for(t = 0; t< NUM_THREADS; t++) {
-       //      rc = pthread_create(&wthreads[t], NULL, pfqProcess, (void *)t);
+               rc = pthread_create(&wthreads[t], NULL, mcqProcess, (void *)t);
                if (rc) {
                        printf("Thread create error %s, %d\n", __FILE__, __LINE__);
                        return;
@@ -1180,13 +1182,47 @@ void *transPrefetch(void *t) {
                pthread_mutex_lock(&mcqueue.qlock);
                /* Update the pool queue with the new remote machine piles generated per prefetch call */
                mcpileenqueue(pilehead);
-               /* Broadcast signal on pool queue */
+               /* Broadcast signal on machine pile queue */
                pthread_cond_broadcast(&mcqueue.qcond);
-               /* Unlock mutex of pool queue */
+               /* Unlock mutex of  mcahine pile queue */
                pthread_mutex_unlock(&mcqueue.qlock);
        }
 }
 
+/*The pool of threads work on this function to establish connection with
+ * remote machines */
+
+void *mcqProcess(void *threadid) {
+       int tid;
+       prefetchpile_t *mcpilenode;
+
+       tid = (int) threadid;
+       while(1) {
+               /* Lock mutex of mc pile queue */
+               pthread_mutex_lock(&mcqueue.qlock);
+               /* while mc pile queue is empty, then wait */
+               while((mcqueue.front == NULL) && (mcqueue.rear == NULL)) {
+                       pthread_cond_wait(&mcqueue.qcond, &mcqueue.qlock);
+               }
+               /* dequeue node to send remote machine connections*/
+               if((mcpilenode = mcpiledequeue()) == NULL) {
+                       printf("Dequeue Error: No node returned %s %d\n", __FILE__, __LINE__);
+                       return NULL;
+               }
+               /* Unlock mutex */
+               pthread_mutex_unlock(&mcqueue.qlock);
+
+               /*Initiate connection to remote host and send request */ 
+               sendPrefetchReq(mcpilenode, tid);
+               /* Process Request */
+
+
+               /* TODO: For each object not found query DHT for new location and retrieve the object */
+       
+               /* Deallocate the dequeued node */
+       }
+}
+
 /*This function is called by the thread that processes the 
  * prefetch request makes piles to prefetch records and prefetches the oids from remote machines */
 int transPrefetchProcess(transrecord_t *record, int *arrayofoffset[], short numoids){
@@ -1217,7 +1253,7 @@ int transPrefetchProcess(transrecord_t *record, int *arrayofoffset[], short numo
                        /* For each Pile in the machine send TRANS_PREFETCH */
                        //makePiles(arrayofoffset, numoids);
                        /* Fill thread data structure */
-                       rc = pthread_create(&thread[i] , &attr, sendPrefetchReq, (void *) arrayofoffset[i]);
+                       //rc = pthread_create(&thread[i] , &attr, sendPrefetchReq, (void *) arrayofoffset[i]);
                        if (rc) {
                                perror("Error in pthread create at transPrefetchProcess()\n");
                                return 1;
@@ -1240,33 +1276,90 @@ int transPrefetchProcess(transrecord_t *record, int *arrayofoffset[], short numo
                
 }
 
-void *sendPrefetchReq(void *prefetchtuple) {
-       int sd, i;
+void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) {
+       int sd, i, offset, off, len, endpair;
        struct sockaddr_in serv_addr;
        struct hostent *server;
-       char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
-       char machineip[16], retval;
+       char machineip[16], control;
+       objpile_t *tmp;
 
 
        /* Send Trans Prefetch Request */
        if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
                perror("Error in socket for TRANS_REQUEST\n");
-               return NULL;
+               return;
        }
        bzero((char*) &serv_addr, sizeof(serv_addr));
        serv_addr.sin_family = AF_INET;
        serv_addr.sin_port = htons(LISTEN_PORT);
-       //midtoIP(tdata->mid,machineip);
-//     machineip[15] = '\0';
-//     serv_addr.sin_addr.s_addr = inet_addr(machineip);
+       //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
+       midtoIP(mcpilenode->mid ,machineip);
+       machineip[15] = '\0';
+       serv_addr.sin_addr.s_addr = inet_addr(machineip);
 
        /* Open Connection */
        if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
                perror("Error in connect for TRANS_REQUEST\n");
-               return NULL;
+               return;
        }
 
-       close(sd);
-       pthread_exit(NULL);
+       /* Send TRANS_PREFETCH control message */
+       control = TRANS_PREFETCH;
+       if(send(sd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
+               perror("Error in sending prefetch control\n");
+               return;
+       }
+
+       /* Send Oids and offsets in pairs */
+       tmp = mcpilenode->objpiles;
+       while(tmp != NULL) {
+               off = offset = 0;
+               len = sizeof(int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
+               char oidnoffset[len];
+               memcpy(oidnoffset, &len, sizeof(int));
+               off = sizeof(int);
+               memcpy(oidnoffset + off, &tmp->oid, sizeof(unsigned int));
+               off += sizeof(unsigned int);
+               for(i = 0; i < numoffsets; i++) {
+                       offset = off +  (i * sizeof(short));
+                       memcpy(oidnoffset + offset, tmp->offset, sizeof(short));
+               }
+               if (send(sd, &oidnoffset, sizeof(oidnoffset),MSG_NOSIGNAL) < sizeof(oidnoffset)) {
+                       perror("Error sending fixed bytes for thread\n");
+                       return;
+               }
+               tmp = tmp->next;
+       }
+
+       endpair = -1;
+       if (send(sd, &endpair, sizeof(int), MSG_NOSIGNAL) < sizeof(int)) {
+               perror("Error sending fixed bytes for thread\n");
+               return;
+       }
+
+       /* Get Response from the remote machine */
+       getPrefetchResponse();
+
+//     close(sd);
+}
+
+void getPrefetchResponse() {
+       int i;
+
+       /* Lock the Prefetch Cache look up table*/
+       pthread_mutex_lock(&pflookup.lock);
+
+       /*TODO For each object found add to Prefetch Cache */
+
+       /* Broadcast signal on prefetch cache condition variable */ 
+       pthread_cond_broadcast(&pflookup.qcond);
+       
+       /* Unlock the Prefetch Cache look up table*/
+       pthread_mutex_unlock(&pflookup.lock);
+
+
+
+
+
 
 }