From 047e7cdca01fbdcc012e52b4c50e3931ebe2388f Mon Sep 17 00:00:00 2001 From: adash Date: Tue, 7 Aug 2007 21:59:15 +0000 Subject: [PATCH] forgot file add deallocation of node code complete transRead calls TODO: modify memcpy and take care of buffer overflow --- .../src/Runtime/DSTM/interface/dstmserver.c | 2 +- .../src/Runtime/DSTM/interface/machinepile.c | 2 + .../src/Runtime/DSTM/interface/machinepile.h | 1 + Robust/src/Runtime/DSTM/interface/mcpileq.c | 19 ++-- Robust/src/Runtime/DSTM/interface/mcpileq.h | 1 + .../src/Runtime/DSTM/interface/prefetchpile.c | 99 +++++++++++++++++++ Robust/src/Runtime/DSTM/interface/queue.c | 5 + Robust/src/Runtime/DSTM/interface/queue.h | 3 +- Robust/src/Runtime/DSTM/interface/trans.c | 69 ++++++++++--- 9 files changed, 181 insertions(+), 20 deletions(-) create mode 100644 Robust/src/Runtime/DSTM/interface/prefetchpile.c diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 33cbd581..7dee7f73 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -619,7 +619,7 @@ int prefetchReq(int acceptfd) { } } - /* Send the buffer size */ + /* Add the buffer size into buffer as a parameter */ memcpy(buffer, &index, sizeof(unsigned int)); /* Send the entire buffer with its size and oids found and not found */ if(send((int)acceptfd, &buffer, sizeof(index - 1), MSG_NOSIGNAL) < sizeof(index -1)) { diff --git a/Robust/src/Runtime/DSTM/interface/machinepile.c b/Robust/src/Runtime/DSTM/interface/machinepile.c index 4e8dc6a5..9b5c416f 100644 --- a/Robust/src/Runtime/DSTM/interface/machinepile.c +++ b/Robust/src/Runtime/DSTM/interface/machinepile.c @@ -46,6 +46,8 @@ int insertPile(int mid, unsigned int oid, short numoffset, short *offset, prefet return 0; } +//TODO int deletePile() { + return 0; } diff --git a/Robust/src/Runtime/DSTM/interface/machinepile.h b/Robust/src/Runtime/DSTM/interface/machinepile.h index b8ca3d69..70fd47fb 100644 --- a/Robust/src/Runtime/DSTM/interface/machinepile.h +++ b/Robust/src/Runtime/DSTM/interface/machinepile.h @@ -6,5 +6,6 @@ #include int insertPile(int, unsigned int, short, short *, prefetchpile_t *); +int deletePile(); #endif diff --git a/Robust/src/Runtime/DSTM/interface/mcpileq.c b/Robust/src/Runtime/DSTM/interface/mcpileq.c index e5ec4a84..27081a08 100644 --- a/Robust/src/Runtime/DSTM/interface/mcpileq.c +++ b/Robust/src/Runtime/DSTM/interface/mcpileq.c @@ -92,9 +92,16 @@ void mcpiledisplay() { } } - - - - - - +void mcdealloc(prefetchpile_t *node) { + /* Remove the offset ptr and linked lists of objpile_t */ + objpile_t *delnode; + while(node->objpiles != NULL) { + node->objpiles->offset = NULL; + delnode = node->objpiles; + node->objpiles = node->objpiles->next; + free(delnode); + node->objpiles->next = NULL; + } + free(node); + node->next = NULL; +} diff --git a/Robust/src/Runtime/DSTM/interface/mcpileq.h b/Robust/src/Runtime/DSTM/interface/mcpileq.h index 7add8deb..93e20630 100644 --- a/Robust/src/Runtime/DSTM/interface/mcpileq.h +++ b/Robust/src/Runtime/DSTM/interface/mcpileq.h @@ -33,5 +33,6 @@ prefetchpile_t *mcpiledequeue(void); void delnode(); void mcpiledelete(); void mcpiledisplay(); +void mcdealloc(prefetchpile_t *); #endif diff --git a/Robust/src/Runtime/DSTM/interface/prefetchpile.c b/Robust/src/Runtime/DSTM/interface/prefetchpile.c new file mode 100644 index 00000000..9f87bdbe --- /dev/null +++ b/Robust/src/Runtime/DSTM/interface/prefetchpile.c @@ -0,0 +1,99 @@ +#include "dstm.h" + +/* Make a queue of prefetchpile_t type */ +prefetchpile_t poolqueue; //Global queue for machine piles + +/* Create new machine group */ +prefetchpile_t *createPile(int numoffsets) { + prefetchpile_t *pile; + if((pile = calloc(1, sizeof(prefetchpile_t))) == NULL) { + printf("Calloc error %s %d\n", __FILE__, __LINE__); + return NULL; + } + + /* Create a new object pile */ + if((pile->objpiles = calloc(1, sizeof(objpile_t))) == NULL) { + printf("Calloc error %s %d\n", __FILE__, __LINE__); + return NULL; + } + + /* Create a ptr to the offset array for a given prefetch oid tuple */ + if((pile->objpiles->offset = calloc(numoffsets, sizeof(short))) == NULL) { + printf("Calloc error %s %d\n", __FILE__, __LINE__); + return NULL; + } + + pile->next = NULL; + + return pile; +} + +/* Into into prefetch pile*/ +void pileIns(prefetchpile_t *pile, short *endoffsets, short* arryfields, unsigned int *oid,int mnum,int noffsets, int index) { + prefetchpile_t *tmp, *ptr; + objpile_t *opile; + short *offsetarry; + int found = 0, k; + + tmp = pile; + while(tmp != NULL) { + //Check if mnum already exists in the pile + if(tmp->mid == mnum) { + /* Create a new object pile */ + if((opile = calloc(1, sizeof(objpile_t))) == NULL) { + printf("Calloc error %s %d\n", __FILE__, __LINE__); + return NULL; + } + opile->next = tmp->objpiles; + tmp->objpiles = opile; + + tmp->objpiles->oid = oid[index]; + if(index == 0) + k = 0; + else + k = endoffsets[index -1]; + //Copy the offset values into objpile + for(i = 0; i < numoffsets[i]; i++) { + ptr->objpile->offsets[i] = arryfields[k]; + k++; + } + /* Create a ptr to the offset array for a given prefetch oid tuple */ + if((offsetarry = calloc(numoffsets, sizeof(short))) == NULL) { + printf("Calloc error %s %d\n", __FILE__, __LINE__); + return NULL; + } + + + found = 1; + break; + } + tmp = tmp->next; + } + + //Add New machine pile to the linked list + if(!found) { + if((ptr = createPile(noffsets)) == NULL) { + printf("No new pile created %s %d\n", __FILE__, __LINE__); + return; + } + ptr->mid = mnum; + ptr->objpile->oid = oid[index]; + if(index == 0) + k = 0; + else + k = endoffsets[index -1]; + //Copy the offset values into objpile + for(i = 0; i < numoffsets[i]; i++) { + ptr->objpile->offsets[i] = arryfields[k]; + k++; + } + ptr->next = pile; + pile = ptr; + } + + return pile; +} + +/* Insert into object pile */ + + diff --git a/Robust/src/Runtime/DSTM/interface/queue.c b/Robust/src/Runtime/DSTM/interface/queue.c index 298e0d11..ce82aa7e 100644 --- a/Robust/src/Runtime/DSTM/interface/queue.c +++ b/Robust/src/Runtime/DSTM/interface/queue.c @@ -73,6 +73,11 @@ void queueDisplay() { } } +void predealloc(prefetchqelem_t *node) { + free(node); + node->next = NULL; +} + #if 0 main() { diff --git a/Robust/src/Runtime/DSTM/interface/queue.h b/Robust/src/Runtime/DSTM/interface/queue.h index ea7dba2d..ff614903 100644 --- a/Robust/src/Runtime/DSTM/interface/queue.h +++ b/Robust/src/Runtime/DSTM/interface/queue.h @@ -21,7 +21,8 @@ typedef struct primarypfq { void queueInit(void); void delqnode(); void queueDelete(void); -void enqueue(prefetchqelem_t *qnode); +void enqueue(prefetchqelem_t *); prefetchqelem_t *dequeue(void); void queueDisplay(); +void predealloc(prefetchqelem_t *); #endif diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index fdbccf86..f2efc67d 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -14,6 +14,7 @@ #include #include #include +#include #include #include #include @@ -59,7 +60,7 @@ void prefetch(int ntuples, unsigned int *oids, short *endoffsets, short *arrayfi /* Allocate for the queue node*/ char *node; qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short); - if((node = calloc(1,qnodesize)) == NULL) { + if((node = calloc(1, qnodesize)) == NULL) { printf("Calloc Error %s, %d\n", __FILE__, __LINE__); return; } @@ -105,6 +106,7 @@ void transInit() { return; } } + //TODO when to deletethreads } /* This function stops the threads spawned */ @@ -148,21 +150,58 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) unsigned int machinenumber; objheader_t *tmp, *objheader; void *objcopy; - int size; + int size, rc, found = 0; void *buf; - /* Search local cache */ + struct timespec ts; + struct timeval tp; + + rc = gettimeofday(&tp, NULL); + + /* Convert from timeval to timespec */ + ts.tv_nsec = tp.tv_usec * 1000; + + /* Search local transaction cache */ if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){ return(objheader); } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) { /* Look up in machine lookup table and copy into cache*/ - // tmp = mhashSearch(oid); + tmp = mhashSearch(oid); size = sizeof(objheader_t)+classsize[tmp->type]; objcopy = objstrAlloc(record->cache, size); memcpy(objcopy, (void *)objheader, size); /* Insert into cache's lookup table */ chashInsert(record->lookupTable, objheader->oid, objcopy); return(objcopy); - } else { /* If not found in machine look up */ + } else if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { /* Look up in prefetch cache */ + found = 1; + size = sizeof(objheader_t)+classsize[tmp->type]; + objcopy = objstrAlloc(record->cache, size); + memcpy(objcopy, (void *)tmp, size); + /* Insert into cache's lookup table */ + chashInsert(record->lookupTable, tmp->oid, objcopy); + return(objcopy); + } else { /* If not found anywhere, then block until object appears in prefetch cache */ + pthread_mutex_lock(&pflookup.lock); + while(!found) { + rc = pthread_cond_timedwait(&pflookup.cond, &pflookup.lock, &ts); + if(rc == ETIMEDOUT) { + printf("Wait timed out\n"); + /* Check Prefetch cache again */ + if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { /* Look up in prefetch cache */ + found = 1; + size = sizeof(objheader_t)+classsize[tmp->type]; + objcopy = objstrAlloc(record->cache, size); + memcpy(objcopy, (void *)tmp, size); + /* Insert into cache's lookup table */ + chashInsert(record->lookupTable, tmp->oid, objcopy); + return(objcopy); + } else { + pthread_mutex_unlock(&pflookup.lock); + break; + } + pthread_mutex_unlock(&pflookup.lock); + } + } /* Get the object from the remote location */ machinenumber = lhashSearch(oid); objcopy = getRemoteObj(record, machinenumber, oid); @@ -234,6 +273,7 @@ plistnode_t *createPiles(transrecord_t *record) { /* Check if local or not */ if((localmachinenum = mhashSearch(curr->key)) != NULL) { + /* Set the pile->local flag*/ pile->local = 1; //True i.e. local } @@ -769,6 +809,9 @@ void *handleLocalReq(void *threadarg) { if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) { localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT; printf("DEBUG -> Sending TRANS_SOFT_ABORT\n"); + //TODO currently the only soft abort case that is supported is when object locked by previous + //transaction => v_matchlock > 0 + //The other case for SOFT ABORT i.e. when object is not found but versions match is not supported /* Send number of oids not found and the missing oids if objects are missing in the machine */ /* TODO Remember to store the oidnotfound for later use if(objnotfound != 0) { @@ -860,7 +903,6 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int header = mhashSearch(objlocked[i]);// find the header address ((objheader_t *)header)->status &= ~(LOCK); } - //TODO/* Unset the bit for local objects */ /* Send ack to Coordinator */ printf("DEBUG-> TRANS_SUCCESSFUL\n"); @@ -903,7 +945,6 @@ int transComProcess(trans_commit_data_t *transinfo) { } //TODO Update location lookup table - //TODO/* Unset the bit for local objects */ /* Send ack to Coordinator */ printf("DEBUG-> TRANS_SUCESSFUL\n"); @@ -1184,8 +1225,11 @@ void *transPrefetch(void *t) { mcpileenqueue(pilehead); /* Broadcast signal on machine pile queue */ pthread_cond_broadcast(&mcqueue.qcond); - /* Unlock mutex of mcahine pile queue */ + /* Unlock mutex of machine pile queue */ pthread_mutex_unlock(&mcqueue.qlock); + /* Deallocate the prefetch queue pile node */ + predealloc(qnode); + } } @@ -1200,11 +1244,11 @@ void *mcqProcess(void *threadid) { while(1) { /* Lock mutex of mc pile queue */ pthread_mutex_lock(&mcqueue.qlock); - /* while mc pile queue is empty, then wait */ + /* When mc pile queue is empty, wait */ while((mcqueue.front == NULL) && (mcqueue.rear == NULL)) { pthread_cond_wait(&mcqueue.qcond, &mcqueue.qlock); } - /* dequeue node to send remote machine connections*/ + /* Dequeue node to send remote machine connections*/ if((mcpilenode = mcpiledequeue()) == NULL) { printf("Dequeue Error: No node returned %s %d\n", __FILE__, __LINE__); return NULL; @@ -1217,7 +1261,8 @@ void *mcqProcess(void *threadid) { sendPrefetchReq(mcpilenode, tid); /* TODO: For each object not found query DHT for new location and retrieve the object */ - /* Deallocate the dequeued node */ + /* Deallocate the machine queue pile node */ + mcdealloc(mcpilenode); } } @@ -1392,7 +1437,7 @@ void getPrefetchResponse(int count, int sd) { memcpy(modptr, buffer+index, objsize); index += sizeof(int); /* Add pointer and oid to hash table */ - //TODO Do we need a version comparison herei ?? + //TODO Do we need a version comparison here?? prehashInsert(oid, modptr); /* Broadcast signal on prefetch cache condition variable */ pthread_cond_broadcast(&pflookup.cond); -- 2.34.1