From 5f4380353682f532edcdbfe82c145fc4b3879b74 Mon Sep 17 00:00:00 2001 From: adash Date: Thu, 2 Aug 2007 22:51:57 +0000 Subject: [PATCH] Process pool of threads working on machine piles some minor bug fixes --- Robust/src/Runtime/DSTM/interface/dstm.h | 3 +- .../src/Runtime/DSTM/interface/dstmserver.c | 40 +++++- .../src/Runtime/DSTM/interface/machinepile.c | 3 + Robust/src/Runtime/DSTM/interface/mcpileq.c | 2 + Robust/src/Runtime/DSTM/interface/prelookup.h | 1 + Robust/src/Runtime/DSTM/interface/trans.c | 125 +++++++++++++++--- 6 files changed, 156 insertions(+), 18 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index 82f40932..f2f477fa 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -193,11 +193,12 @@ int transAbortProcess(void *, unsigned int *, int, int, int); int transComProcess(trans_commit_data_t *); void prefetch(int, unsigned int *, short *, short*); void *transPrefetch(void *); +void *mcqProcess(void *); void checkPrefetchTuples(prefetchqelem_t *); prefetchpile_t *foundLocal(prefetchqelem_t *); prefetchpile_t *makePreGroups(prefetchqelem_t *, int *); void checkPreCache(prefetchqelem_t *, int *, int, int, unsigned int, int, int, int); int transPrefetchProcess(transrecord_t *, int **, short); -void *sendPrefetchReq(void *); +void *sendPrefetchReq(prefetchpile_t*, int); /* end transactions */ #endif diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 7b5bd734..aaf16762 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -376,7 +376,8 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */ /* Save the oids not found and number of oids not found for later use */ - oidnotfound[objnotfound] = ((objheader_t *)mobj)->oid; + //oidnotfound[objnotfound] = ((objheader_t *)mobj)->oid; + oidnotfound[objnotfound] = oid; objnotfound++; } else { /* If Obj found in machine (i.e. has not moved) */ /* Check if Obj is locked by any previous transaction */ @@ -529,6 +530,43 @@ int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) { int prefetchReq(int acceptfd) { + int length, sum, n, numbytes, N, oidnfound = 0; + unsigned int oid; + char *ptr; + void *mobj; + unsigned int *oidnotfound; + + ptr = (char *)&fixed;; + + /* Counters and arrays to formulate decision on control message to be sent */ + oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); + + /* Repeated recv the oid and offset pairs sent for prefetch */ + while((numbytes = recv(int)acceptfd, &length, sizeof(int), 0) != -1) { + sum = 0; + oid = recv((int)acceptfd, &oid, sizeof(unsigned int), 0); + numoffset = (length - (sizeof(int) + sizeof(unsigned int)))/ sizeof(short); + N = numoffset * sizeof(short); + short offset[numoffset]; + ptr = (char *)&offset; + /* Recv the offset values per oid */ + do { + n = recv((int)acceptfd, (void *)ptr+sum, N-sum, 0); + sum += n; + + } while(sum < N && n != 0); + + /* Process each oid */ + /* Check if object is still present in the machine since the beginning of TRANS_PREFETCH */ + if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */ + /* Save the oids not found and number of oids not found for later use */ + oidnotfound[objnotfound] = oid; + objnotfound++; + } else { /* If Obj found in machine (i.e. has not moved) */ + /* Return the oid ..its header and data */ + + + } return 0; } diff --git a/Robust/src/Runtime/DSTM/interface/machinepile.c b/Robust/src/Runtime/DSTM/interface/machinepile.c index 58fe1b92..4e8dc6a5 100644 --- a/Robust/src/Runtime/DSTM/interface/machinepile.c +++ b/Robust/src/Runtime/DSTM/interface/machinepile.c @@ -46,3 +46,6 @@ int insertPile(int mid, unsigned int oid, short numoffset, short *offset, prefet return 0; } +int deletePile() { + +} diff --git a/Robust/src/Runtime/DSTM/interface/mcpileq.c b/Robust/src/Runtime/DSTM/interface/mcpileq.c index 379ffd27..e5ec4a84 100644 --- a/Robust/src/Runtime/DSTM/interface/mcpileq.c +++ b/Robust/src/Runtime/DSTM/interface/mcpileq.c @@ -22,6 +22,7 @@ void mcpileenqueue(prefetchpile_t *node) { } */ +/* Insert to the rear of machine pile queue */ void mcpileenqueue(prefetchpile_t *node) { prefetchpile_t *tmp, *prev; if(mcqueue.front == NULL && mcqueue.rear == NULL) { @@ -50,6 +51,7 @@ prefetchpile_t *mcpiledequeue(void) { } retnode = mcqueue.front; mcqueue.front = mcqueue.front->next; + retnode->next = NULL; return retnode; } diff --git a/Robust/src/Runtime/DSTM/interface/prelookup.h b/Robust/src/Runtime/DSTM/interface/prelookup.h index 0f68d20b..e8bb204d 100644 --- a/Robust/src/Runtime/DSTM/interface/prelookup.h +++ b/Robust/src/Runtime/DSTM/interface/prelookup.h @@ -20,6 +20,7 @@ typedef struct prehashtable { unsigned int numelements; float loadfactor; pthread_mutex_t lock; + pthread_cond_t cond; } prehashtable_t; /* Prototypes for hash*/ diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 2c6f8f4b..ae8fd5ba 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -5,6 +5,7 @@ #include "mlookup.h" #include "llookup.h" #include "plookup.h" +#include "prelookup.h" #include "queue.h" #include #include @@ -26,10 +27,11 @@ extern int classsize[]; extern primarypfq_t pqueue; // shared prefetch queue extern mcpileq_t mcqueue; //Shared queue containing prefetch requests sorted by remote machineids +objstr_t *prefetchcache; //Global Prefetch cache +extern prehashtable_t pflookup; //Global Prefetch cache's lookup table pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue pthread_t tPrefetch; extern objstr_t *mainobjstore; -objstr_t *prefetchcache; plistnode_t *createPiles(transrecord_t *); inline int arrayLength(int *array) { @@ -97,7 +99,7 @@ void transInit() { pthread_create(&tPrefetch, NULL, transPrefetch, NULL); //Create and Initialize a pool of threads for(t = 0; t< NUM_THREADS; t++) { - // rc = pthread_create(&wthreads[t], NULL, pfqProcess, (void *)t); + rc = pthread_create(&wthreads[t], NULL, mcqProcess, (void *)t); if (rc) { printf("Thread create error %s, %d\n", __FILE__, __LINE__); return; @@ -1180,13 +1182,47 @@ void *transPrefetch(void *t) { pthread_mutex_lock(&mcqueue.qlock); /* Update the pool queue with the new remote machine piles generated per prefetch call */ mcpileenqueue(pilehead); - /* Broadcast signal on pool queue */ + /* Broadcast signal on machine pile queue */ pthread_cond_broadcast(&mcqueue.qcond); - /* Unlock mutex of pool queue */ + /* Unlock mutex of mcahine pile queue */ pthread_mutex_unlock(&mcqueue.qlock); } } +/*The pool of threads work on this function to establish connection with + * remote machines */ + +void *mcqProcess(void *threadid) { + int tid; + prefetchpile_t *mcpilenode; + + tid = (int) threadid; + while(1) { + /* Lock mutex of mc pile queue */ + pthread_mutex_lock(&mcqueue.qlock); + /* while mc pile queue is empty, then wait */ + while((mcqueue.front == NULL) && (mcqueue.rear == NULL)) { + pthread_cond_wait(&mcqueue.qcond, &mcqueue.qlock); + } + /* dequeue node to send remote machine connections*/ + if((mcpilenode = mcpiledequeue()) == NULL) { + printf("Dequeue Error: No node returned %s %d\n", __FILE__, __LINE__); + return NULL; + } + /* Unlock mutex */ + pthread_mutex_unlock(&mcqueue.qlock); + + /*Initiate connection to remote host and send request */ + sendPrefetchReq(mcpilenode, tid); + /* Process Request */ + + + /* TODO: For each object not found query DHT for new location and retrieve the object */ + + /* Deallocate the dequeued node */ + } +} + /*This function is called by the thread that processes the * prefetch request makes piles to prefetch records and prefetches the oids from remote machines */ int transPrefetchProcess(transrecord_t *record, int *arrayofoffset[], short numoids){ @@ -1217,7 +1253,7 @@ int transPrefetchProcess(transrecord_t *record, int *arrayofoffset[], short numo /* For each Pile in the machine send TRANS_PREFETCH */ //makePiles(arrayofoffset, numoids); /* Fill thread data structure */ - rc = pthread_create(&thread[i] , &attr, sendPrefetchReq, (void *) arrayofoffset[i]); + //rc = pthread_create(&thread[i] , &attr, sendPrefetchReq, (void *) arrayofoffset[i]); if (rc) { perror("Error in pthread create at transPrefetchProcess()\n"); return 1; @@ -1240,33 +1276,90 @@ int transPrefetchProcess(transrecord_t *record, int *arrayofoffset[], short numo } -void *sendPrefetchReq(void *prefetchtuple) { - int sd, i; +void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) { + int sd, i, offset, off, len, endpair; struct sockaddr_in serv_addr; struct hostent *server; - char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol; - char machineip[16], retval; + char machineip[16], control; + objpile_t *tmp; /* Send Trans Prefetch Request */ if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { perror("Error in socket for TRANS_REQUEST\n"); - return NULL; + return; } bzero((char*) &serv_addr, sizeof(serv_addr)); serv_addr.sin_family = AF_INET; serv_addr.sin_port = htons(LISTEN_PORT); - //midtoIP(tdata->mid,machineip); -// machineip[15] = '\0'; -// serv_addr.sin_addr.s_addr = inet_addr(machineip); + //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP); + midtoIP(mcpilenode->mid ,machineip); + machineip[15] = '\0'; + serv_addr.sin_addr.s_addr = inet_addr(machineip); /* Open Connection */ if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) { perror("Error in connect for TRANS_REQUEST\n"); - return NULL; + return; } - close(sd); - pthread_exit(NULL); + /* Send TRANS_PREFETCH control message */ + control = TRANS_PREFETCH; + if(send(sd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) { + perror("Error in sending prefetch control\n"); + return; + } + + /* Send Oids and offsets in pairs */ + tmp = mcpilenode->objpiles; + while(tmp != NULL) { + off = offset = 0; + len = sizeof(int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short)); + char oidnoffset[len]; + memcpy(oidnoffset, &len, sizeof(int)); + off = sizeof(int); + memcpy(oidnoffset + off, &tmp->oid, sizeof(unsigned int)); + off += sizeof(unsigned int); + for(i = 0; i < numoffsets; i++) { + offset = off + (i * sizeof(short)); + memcpy(oidnoffset + offset, tmp->offset, sizeof(short)); + } + if (send(sd, &oidnoffset, sizeof(oidnoffset),MSG_NOSIGNAL) < sizeof(oidnoffset)) { + perror("Error sending fixed bytes for thread\n"); + return; + } + tmp = tmp->next; + } + + endpair = -1; + if (send(sd, &endpair, sizeof(int), MSG_NOSIGNAL) < sizeof(int)) { + perror("Error sending fixed bytes for thread\n"); + return; + } + + /* Get Response from the remote machine */ + getPrefetchResponse(); + +// close(sd); +} + +void getPrefetchResponse() { + int i; + + /* Lock the Prefetch Cache look up table*/ + pthread_mutex_lock(&pflookup.lock); + + /*TODO For each object found add to Prefetch Cache */ + + /* Broadcast signal on prefetch cache condition variable */ + pthread_cond_broadcast(&pflookup.qcond); + + /* Unlock the Prefetch Cache look up table*/ + pthread_mutex_unlock(&pflookup.lock); + + + + + } -- 2.34.1