From 04957c42fc9a4b25e9bfb4e254b63d57e45eef2d Mon Sep 17 00:00:00 2001 From: bdemsky Date: Mon, 14 Apr 2008 07:16:27 +0000 Subject: [PATCH] major changes to prefetching code --- Robust/src/Runtime/DSTM/interface/dstm.h | 4 +- .../src/Runtime/DSTM/interface/machinepile.c | 153 +- .../src/Runtime/DSTM/interface/machinepile.h | 2 +- Robust/src/Runtime/DSTM/interface/mcpileq.c | 112 +- Robust/src/Runtime/DSTM/interface/mcpileq.h | 28 +- Robust/src/Runtime/DSTM/interface/objstr.c | 13 +- Robust/src/Runtime/DSTM/interface/queue.c | 137 +- .../src/Runtime/DSTM/interface/threadnotify.c | 398 ++-- Robust/src/Runtime/DSTM/interface/trans.c | 2044 +++++++---------- 9 files changed, 1256 insertions(+), 1635 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index 8a86ea2f..61c38a78 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -266,10 +266,8 @@ void transAbort(transrecord_t *trans); void prefetch(int, unsigned int *, unsigned short *, short*); void *transPrefetch(void *); void *mcqProcess(void *); -void checkPrefetchTuples(prefetchqelem_t *); prefetchpile_t *foundLocal(prefetchqelem_t *);// returns node with prefetch elements(oids, offsets) -prefetchpile_t *makePreGroups(prefetchqelem_t *, int *);// returns node with prefetch elements(oids, offsets) -void checkPreCache(prefetchqelem_t *, int *, unsigned int, int); +int lookupObject(unsigned int * oid, short offset); int transPrefetchProcess(transrecord_t *, int **, short); void sendPrefetchReq(prefetchpile_t*, int); int getPrefetchResponse(int); diff --git a/Robust/src/Runtime/DSTM/interface/machinepile.c b/Robust/src/Runtime/DSTM/interface/machinepile.c index c27ea01b..10b5aed8 100644 --- a/Robust/src/Runtime/DSTM/interface/machinepile.c +++ b/Robust/src/Runtime/DSTM/interface/machinepile.c @@ -1,85 +1,80 @@ #include "machinepile.h" -prefetchpile_t *insertPile(int mid, unsigned int oid, short numoffset, short *offset, prefetchpile_t *head) { - prefetchpile_t *tmp = head; - prefetchpile_t *ptr; - objpile_t *objnode; - unsigned int *oidarray; - short *offvalues; - int i; - char found = 0; +void insertPile(int mid, unsigned int oid, short numoffset, short *offset, prefetchpile_t **head) { + prefetchpile_t *ptr; + objpile_t *objnode; + unsigned int *oidarray; + objpile_t **tmp; - while (tmp != NULL) { - if (tmp->mid == mid) { // Found a match with exsisting machine id - if ((objnode = (objpile_t *) calloc(1, sizeof(objpile_t))) == NULL) { - printf("Calloc error: %s %d\n", __FILE__, __LINE__); - return NULL; - } - if ((offvalues = (short *) calloc(numoffset, sizeof(short))) == NULL) { - printf("Calloc error: %s %d\n", __FILE__, __LINE__); - return NULL; - } - /* Fill objpiles DS */ - objnode->oid = oid; - objnode->numoffset = numoffset; - for(i = 0; ioffset = offvalues; - objnode->next = tmp->objpiles; - tmp->objpiles = objnode; - found = 1; - break; - } - tmp = tmp->next; - } + //Loop through the machines + for(;1;head=&((*head)->next)) { + int tmid; + if ((*head)==NULL||(tmid=(*head)->mid)>mid) { + prefetchpile_t * tmp = (prefetchpile_t *) malloc(sizeof(prefetchpile_t)); + tmp->mid = mid; + objnode = malloc(sizeof(objpile_t)); + objnode->offset = offset; + objnode->oid = oid; + objnode->numoffset = numoffset; + objnode->next = NULL; + tmp->objpiles = objnode; + tmp->next = *head; + *head=tmp; + return; + } + + //keep looking + if (tmid < mid) + continue; + + //found mid list + for(tmp=&((*head)->objpiles);1;tmp=&((*tmp)->next)) { + int toid; + int matchstatus; + + if ((*tmp)==NULL||((toid=(*tmp)->oid)>oid)) { + objnode = (objpile_t *) malloc(sizeof(objpile_t)); + objnode->offset = offset; + objnode->oid = oid; + objnode->numoffset = numoffset; + objnode->next = *tmp; + *tmp = objnode; + return; + } + if (toid < oid) + continue; + + /* Fill objpiles DS */ + int i; + int onumoffset=(*tmp)->numoffset; + short * ooffset=(*tmp)->offset; - tmp = head; - if(found != 1) { - if(tmp->mid == 0) {//First time - tmp->mid = mid; - if ((objnode = (objpile_t *) calloc(1, sizeof(objpile_t))) == NULL) { - printf("Calloc error: %s %d\n", __FILE__, __LINE__); - return NULL; - } - if ((offvalues = (short *) calloc(numoffset, sizeof(short))) == NULL) { - printf("Calloc error: %s %d\n", __FILE__, __LINE__); - return NULL; - } - // Fill objpiles DS - objnode->oid = oid; - objnode->numoffset = numoffset; - for(i = 0; ioffset = offvalues; - objnode->next = NULL; - tmp->objpiles = objnode; - tmp->next = NULL; - } else { - if ((tmp = (prefetchpile_t *) calloc(1, sizeof(prefetchpile_t))) == NULL) { - printf("Calloc error: %s %d\n", __FILE__, __LINE__); - return NULL; - } - tmp->mid = mid; - if ((objnode = (objpile_t *) calloc(1, sizeof(objpile_t))) == NULL) { - printf("Calloc error: %s %d\n", __FILE__, __LINE__); - return NULL; - } - if ((offvalues = (short *) calloc(numoffset, sizeof(short))) == NULL) { - printf("Calloc error: %s %d\n", __FILE__, __LINE__); - return NULL; - } - // Fill objpiles DS - objnode->oid = oid; - objnode->numoffset = numoffset; - for(i = 0; ioffset = offvalues; - objnode->next = NULL; - tmp->objpiles = objnode; - tmp->next = head; - head = tmp; - } + for(i=0;ionumoffset) { + //We've matched, let's just extend the current prefetch + (*tmp)->numoffset=numoffset; + (*tmp)->offset=offset; + return; } - - return head; + if (ooffset[i]offset[i]) { + //Place item before the current one + objnode = (objpile_t *) malloc(sizeof(objpile_t)); + objnode->offset = offset; + objnode->oid = oid; + objnode->numoffset = numoffset; + objnode->next = *tmp; + *tmp = objnode; + return; + } + } + //if we get to the end, we're already covered by this prefetch + return; + oidloop: + ; + } + } + + } diff --git a/Robust/src/Runtime/DSTM/interface/machinepile.h b/Robust/src/Runtime/DSTM/interface/machinepile.h index 8add41b7..c32a02a8 100644 --- a/Robust/src/Runtime/DSTM/interface/machinepile.h +++ b/Robust/src/Runtime/DSTM/interface/machinepile.h @@ -5,6 +5,6 @@ #include #include -prefetchpile_t *insertPile(int, unsigned int, short, short *, prefetchpile_t *); +void insertPile(int, unsigned int, short, short *, prefetchpile_t **); #endif diff --git a/Robust/src/Runtime/DSTM/interface/mcpileq.c b/Robust/src/Runtime/DSTM/interface/mcpileq.c index fea54094..a8e5d81f 100644 --- a/Robust/src/Runtime/DSTM/interface/mcpileq.c +++ b/Robust/src/Runtime/DSTM/interface/mcpileq.c @@ -3,82 +3,76 @@ mcpileq_t mcqueue; //Global queue void mcpileqInit(void) { - /* Initialize machine queue that containing prefetch oids and offset values sorted by remote machineid */ - mcqueue.front = mcqueue.rear = NULL; - //Intiliaze and set machile pile queue's mutex attribute - pthread_mutexattr_init(&mcqueue.qlockattr); - pthread_mutexattr_settype(&mcqueue.qlockattr, PTHREAD_MUTEX_RECURSIVE_NP); - //pthread_mutex_init(&mcqueue.qlock, NULL); - pthread_mutex_init(&mcqueue.qlock,&mcqueue.qlockattr); - pthread_cond_init(&mcqueue.qcond, NULL); + /* Initialize machine queue that containing prefetch oids and offset values sorted by remote machineid */ + mcqueue.front = mcqueue.rear = NULL; + //Intiliaze and set machile pile queue's mutex attribute + pthread_mutexattr_init(&mcqueue.qlockattr); + pthread_mutexattr_settype(&mcqueue.qlockattr, PTHREAD_MUTEX_RECURSIVE_NP); + pthread_mutex_init(&mcqueue.qlock,&mcqueue.qlockattr); + pthread_cond_init(&mcqueue.qcond, NULL); } /* Insert to the rear of machine pile queue */ void mcpileenqueue(prefetchpile_t *node, prefetchpile_t *tail) { - if(mcqueue.front == NULL && mcqueue.rear == NULL) { - mcqueue.front = node; - mcqueue.rear = tail; - } else { - mcqueue.rear->next = node; - mcqueue.rear = tail; - } + if(mcqueue.front == NULL) { + mcqueue.front = node; + mcqueue.rear = tail; + } else { + mcqueue.rear->next = node; + mcqueue.rear = tail; + } } /* Return the node pointed to by the front ptr of the queue */ prefetchpile_t *mcpiledequeue(void) { - prefetchpile_t *retnode; - if(mcqueue.front == NULL) { - printf("Machine pile queue empty: Underflow %s %d\n", __FILE__, __LINE__); - return NULL; - } - retnode = mcqueue.front; - mcqueue.front = mcqueue.front->next; - if (mcqueue.front == NULL) - mcqueue.rear = NULL; - retnode->next = NULL; - - return retnode; + prefetchpile_t *retnode=mcqueue.front; + if(retnode == NULL) { + printf("Machine pile queue empty: Underflow %s %d\n", __FILE__, __LINE__); + return NULL; + } + mcqueue.front = retnode->next; + if (mcqueue.front == NULL) + mcqueue.rear = NULL; + retnode->next = NULL; + + return retnode; } void mcpiledelete(void) { - /* Remove each element */ - while(mcqueue.front != NULL) - delqnode(); - mcqueue.front = mcqueue.rear = NULL; + /* Remove each element */ + while(mcqueue.front != NULL) + delqnode(); } void mcpiledisplay() { - int mid; - - prefetchpile_t *tmp = mcqueue.front; - while(tmp != NULL) { - printf("Remote machine id = %d\n", tmp->mid); - tmp = tmp->next; - } + int mid; + + prefetchpile_t *tmp = mcqueue.front; + while(tmp != NULL) { + printf("Remote machine id = %d\n", tmp->mid); + tmp = tmp->next; + } } /* Delete prefetchpile_t and everything it points to */ void mcdealloc(prefetchpile_t *node) { - prefetchpile_t *prefetchpile_ptr; - prefetchpile_t *prefetchpile_next_ptr; - objpile_t *objpile_ptr; - objpile_t *objpile_next_ptr; - - prefetchpile_ptr = node; - - while (prefetchpile_ptr != NULL) - { - prefetchpile_next_ptr = prefetchpile_ptr; - while(prefetchpile_ptr->objpiles != NULL) { - if(prefetchpile_ptr->objpiles->numoffset > 0) { - free(prefetchpile_ptr->objpiles->offset); - } - objpile_ptr = prefetchpile_ptr->objpiles; - prefetchpile_ptr->objpiles = objpile_ptr->next; - free(objpile_ptr); - } - prefetchpile_ptr = prefetchpile_next_ptr->next; - free(prefetchpile_next_ptr); - } + prefetchpile_t *prefetchpile_ptr; + prefetchpile_t *prefetchpile_next_ptr; + objpile_t *objpile_ptr; + objpile_t *objpile_next_ptr; + + prefetchpile_ptr = node; + + while (prefetchpile_ptr != NULL) { + prefetchpile_next_ptr = prefetchpile_ptr; + while(prefetchpile_ptr->objpiles != NULL) { + //offsets aren't owned by us, so we don't free them. + objpile_ptr = prefetchpile_ptr->objpiles; + prefetchpile_ptr->objpiles = objpile_ptr->next; + free(objpile_ptr); + } + prefetchpile_ptr = prefetchpile_next_ptr->next; + free(prefetchpile_next_ptr); + } } diff --git a/Robust/src/Runtime/DSTM/interface/mcpileq.h b/Robust/src/Runtime/DSTM/interface/mcpileq.h index fa58eb0f..5c4046c9 100644 --- a/Robust/src/Runtime/DSTM/interface/mcpileq.h +++ b/Robust/src/Runtime/DSTM/interface/mcpileq.h @@ -8,25 +8,25 @@ //Structure to make machine groups when prefetching typedef struct objpile { - unsigned int oid; - short numoffset; - short *offset; - struct objpile *next; -}objpile_t; + unsigned int oid; + short numoffset; + short *offset; + struct objpile *next; +} objpile_t; //Structure for prefetching tuples generated by the compiler typedef struct prefetchpile { - unsigned int mid; - objpile_t *objpiles; - struct prefetchpile *next; -}prefetchpile_t; + unsigned int mid; + objpile_t *objpiles; + struct prefetchpile *next; +} prefetchpile_t; typedef struct mcpileq { - prefetchpile_t *front, *rear; - pthread_mutex_t qlock; - pthread_mutexattr_t qlockattr; - pthread_cond_t qcond; -}mcpileq_t; + prefetchpile_t *front, *rear; + pthread_mutex_t qlock; + pthread_mutexattr_t qlockattr; + pthread_cond_t qcond; +} mcpileq_t; void mcpileqInit(void); void mcpileenqueue(prefetchpile_t *, prefetchpile_t *); diff --git a/Robust/src/Runtime/DSTM/interface/objstr.c b/Robust/src/Runtime/DSTM/interface/objstr.c index 20004396..2589deb8 100644 --- a/Robust/src/Runtime/DSTM/interface/objstr.c +++ b/Robust/src/Runtime/DSTM/interface/objstr.c @@ -1,12 +1,11 @@ #include "dstm.h" -objstr_t *objstrCreate(unsigned int size) -{ - objstr_t *tmp = calloc(1, (sizeof(objstr_t) + size)); - tmp->size = size; - tmp->next = NULL; - tmp->top = tmp + 1; //points to end of objstr_t structure! - return tmp; +objstr_t *objstrCreate(unsigned int size) { + objstr_t *tmp = calloc(1, (sizeof(objstr_t) + size)); + tmp->size = size; + tmp->next = NULL; + tmp->top = tmp + 1; //points to end of objstr_t structure! + return tmp; } //free entire list, starting at store diff --git a/Robust/src/Runtime/DSTM/interface/queue.c b/Robust/src/Runtime/DSTM/interface/queue.c index 26e1bfc4..1056959b 100644 --- a/Robust/src/Runtime/DSTM/interface/queue.c +++ b/Robust/src/Runtime/DSTM/interface/queue.c @@ -3,110 +3,79 @@ primarypfq_t pqueue; //Global queue void queueInit(void) { - /* Intitialize primary queue */ - pqueue.front = pqueue.rear = NULL; - pthread_mutexattr_init(&pqueue.qlockattr); - pthread_mutexattr_settype(&pqueue.qlockattr, PTHREAD_MUTEX_RECURSIVE_NP); - pthread_mutex_init(&pqueue.qlock, &pqueue.qlockattr); - pthread_cond_init(&pqueue.qcond, NULL); + /* Intitialize primary queue */ + pqueue.front = pqueue.rear = NULL; + pthread_mutexattr_init(&pqueue.qlockattr); + pthread_mutexattr_settype(&pqueue.qlockattr, PTHREAD_MUTEX_RECURSIVE_NP); + pthread_mutex_init(&pqueue.qlock, &pqueue.qlockattr); + pthread_cond_init(&pqueue.qcond, NULL); } /* Delete the node pointed to by the front ptr of the queue */ void delqnode() { - prefetchqelem_t *delnode; - if((pqueue.front == NULL) && (pqueue.rear == NULL)) { - printf("The queue is empty: UNDERFLOW %s, %d\n", __FILE__, __LINE__); - return; - } else if ((pqueue.front == pqueue.rear) && pqueue.front != NULL && pqueue.rear != NULL) { - free(pqueue.front); - pqueue.front = pqueue.rear = NULL; - } else { - delnode = pqueue.front; - pqueue.front = pqueue.front->next; - free(delnode); - } + prefetchqelem_t *delnode; + if(pqueue.front == NULL) { + printf("The queue is empty: UNDERFLOW %s, %d\n", __FILE__, __LINE__); + return; + } else if (pqueue.front == pqueue.rear) { + free(pqueue.front); + pqueue.front = pqueue.rear = NULL; + } else { + delnode = pqueue.front; + pqueue.front = pqueue.front->next; + free(delnode); + } } void queueDelete(void) { - /* Remove each element */ - while(pqueue.front != NULL) - delqnode(); - pqueue.front = pqueue.rear = NULL; + /* Remove each element */ + while(pqueue.front != NULL) + delqnode(); } /* Inserts to the rear of primary prefetch queue */ void pre_enqueue(prefetchqelem_t *qnode) { - if(pqueue.front == NULL && pqueue.rear == NULL) { - pqueue.front = pqueue.rear = qnode; - } else { - qnode->next = NULL; - pqueue.rear->next = qnode; - pqueue.rear = qnode; - } + if(pqueue.front == NULL) { + pqueue.front = pqueue.rear = qnode; + qnode->next=NULL; + } else { + qnode->next = NULL; + pqueue.rear->next = qnode; + pqueue.rear = qnode; + } } /* Return the node pointed to by the front ptr of the queue */ prefetchqelem_t *pre_dequeue(void) { - prefetchqelem_t *retnode; - if (pqueue.front == NULL) { - printf("Queue empty: Underflow %s, %d\n", __FILE__, __LINE__); - if(pqueue.rear != NULL) { - printf("pqueue.front points to invalid location %s, %d\n", __FILE__, __LINE__); - } - return NULL; - } - retnode = pqueue.front; - pqueue.front = pqueue.front->next; - if (pqueue.front == NULL) - pqueue.rear = NULL; - retnode->next = NULL; - - return retnode; + prefetchqelem_t *retnode; + if (pqueue.front == NULL) { + printf("Queue empty: Underflow %s, %d\n", __FILE__, __LINE__); + return NULL; + } + retnode = pqueue.front; + pqueue.front = pqueue.front->next; + if (pqueue.front == NULL) + pqueue.rear = NULL; + retnode->next = NULL; + + return retnode; } void queueDisplay() { - int offset = sizeof(prefetchqelem_t); - int *ptr; - int ntuples; - char *ptr1; - prefetchqelem_t *tmp = pqueue.front; - while(tmp != NULL) { - ptr1 = (char *) tmp; - ptr = (int *)(ptr1 + offset); - ntuples = *ptr; - tmp = tmp->next; - } + int offset = sizeof(prefetchqelem_t); + int *ptr; + int ntuples; + char *ptr1; + prefetchqelem_t *tmp = pqueue.front; + while(tmp != NULL) { + ptr1 = (char *) tmp; + ptr = (int *)(ptr1 + offset); + ntuples = *ptr; + tmp = tmp->next; + } } void predealloc(prefetchqelem_t *node) { - free(node); + free(node); } - -#if 0 -main() { - unsigned int oids[] = {11, 13}; - short endoffsets[] = {2, 5}; - short arrayfields[] = {2, 2, 1, 5, 6}; - queueInit(); - queueDisplay(); - prefetch(2, oids, endoffsets, arrayfields); - queueDisplay(); - unsigned int oids1[] = {21, 23, 25, 27}; - short endoffsets1[] = {1, 2, 3, 4}; - short arrayfields1[] = {3, 2, 1, 3}; - prefetch(4, oids1, endoffsets1, arrayfields1); - queueDisplay(); - delqnode(); - queueDisplay(); - delqnode(); - queueDisplay(); - delqnode(); - queueDisplay(); - delqnode(); - -} - -#endif - - diff --git a/Robust/src/Runtime/DSTM/interface/threadnotify.c b/Robust/src/Runtime/DSTM/interface/threadnotify.c index a04f3135..9ff506e4 100644 --- a/Robust/src/Runtime/DSTM/interface/threadnotify.c +++ b/Robust/src/Runtime/DSTM/interface/threadnotify.c @@ -6,257 +6,215 @@ notifyhashtable_t nlookup; //Global hash table * for an update notification from a particular object. * This takes in the head of the linked list and inserts the new node to it */ threadlist_t *insNode(threadlist_t *head, unsigned int threadid, unsigned int mid) { - threadlist_t *ptr; - if(head == NULL) { - if((head = calloc(1, sizeof(threadlist_t))) == NULL) { - printf("Calloc Error %s, %d,\n", __FILE__, __LINE__); - return; - } - head->threadid = threadid; - head->mid = mid; - head->next = NULL; - } else { - if((ptr = calloc(1, sizeof(threadlist_t))) == NULL) { - printf("Calloc Error %s, %d,\n", __FILE__, __LINE__); - return; - } - ptr->threadid = threadid; - ptr->mid = mid; - ptr->next = head; - head = ptr; - } - - return head; + threadlist_t *ptr; + if(head == NULL) { + head = malloc(sizeof(threadlist_t)); + head->threadid = threadid; + head->mid = mid; + head->next = NULL; + } else { + ptr = malloc(sizeof(threadlist_t)); + ptr->threadid = threadid; + ptr->mid = mid; + ptr->next = head; + head = ptr; + } + return head; } /* This function displays the linked list of threads waiting on update notification * from an object */ void display(threadlist_t *head) { - threadlist_t *ptr; - if(head == NULL) { - printf("No thread is waiting\n"); - return; - } else { - while(head != NULL) { - ptr = head; - printf("The threadid waiting is = %d\n", ptr->threadid); - printf("The mid on which thread present = %d\n", ptr->mid); - head = ptr->next; - } - } + threadlist_t *ptr; + if(head == NULL) { + printf("No thread is waiting\n"); + return; + } else { + while(head != NULL) { + ptr = head; + printf("The threadid waiting is = %d\n", ptr->threadid); + printf("The mid on which thread present = %d\n", ptr->mid); + head = ptr->next; + } + } } /* This function creates a new hash table that stores a mapping between the threadid and * a pointer to the thread notify data */ unsigned int notifyhashCreate(unsigned int size, float loadfactor) { - notifylistnode_t *nodes; - - // Allocate space for the hash table - if((nodes = calloc(size, sizeof(notifylistnode_t))) == NULL) { - printf("Calloc error %s %d\n", __FILE__, __LINE__); - return 1; - } - - nlookup.table = nodes; - nlookup.size = size; - nlookup.numelements = 0; // Initial number of elements in the hash - nlookup.loadfactor = loadfactor; - //Initialize the pthread_mutex variable - pthread_mutex_init(&nlookup.locktable, NULL); - return 0; + notifylistnode_t *nodes = calloc(size, sizeof(notifylistnode_t)); + nlookup.table = nodes; + nlookup.size = size; + nlookup.numelements = 0; // Initial number of elements in the hash + nlookup.loadfactor = loadfactor; + //Initialize the pthread_mutex variable + pthread_mutex_init(&nlookup.locktable, NULL); + return 0; } // Assign to tids to bins inside hash table unsigned int notifyhashFunction(unsigned int tid) { - return( tid % (nlookup.size)); + return( tid % (nlookup.size)); } // Insert pointer to the notify data and threadid mapping into the hash table unsigned int notifyhashInsert(unsigned int tid, notifydata_t *ndata) { - unsigned int newsize; - int index; - notifylistnode_t *ptr, *node, *tmp; - int isFound = 0; - - if (nlookup.numelements > (nlookup.loadfactor * nlookup.size)) { - //Resize Table - newsize = 2 * nlookup.size + 1; - pthread_mutex_lock(&nlookup.locktable); - notifyhashResize(newsize); - pthread_mutex_unlock(&nlookup.locktable); - } - /* - ptr = nlookup.table; - nlookup.numelements++; - - index = notifyhashFunction(tid); -#ifdef DEBUG - printf("DEBUG -> index = %d, threadid = %d\n", index, tid); -#endif - pthread_mutex_lock(&nlookup.locktable); - if(ptr[index].next == NULL && ptr[index].threadid == 0) { // Insert at the first position in the hashtable - ptr[index].threadid = tid; - ptr[index].ndata = ndata; - } else { // Insert in the beginning of linked list - if ((node = calloc(1, sizeof(notifylistnode_t))) == NULL) { - printf("Calloc error %s, %d\n", __FILE__, __LINE__); - pthread_mutex_unlock(&nlookup.locktable); - return 1; - } - node->threadid = tid; - node->ndata = ndata; - node->next = ptr[index].next; - ptr[index].next = node; - } - pthread_mutex_unlock(&nlookup.locktable); - */ - ptr = nlookup.table; - index = notifyhashFunction(tid); - pthread_mutex_lock(&nlookup.locktable); - if(ptr[index].next == NULL && ptr[index].threadid == 0) { // Insert at the first position in the hashtable - ptr[index].threadid = tid; - ptr[index].ndata = ndata; - } else { - tmp = &ptr[index]; - while(tmp != NULL) { - if(tmp->threadid == tid) { - isFound = 1; - tmp->ndata = ndata; - } - tmp = tmp->next; - } - if(!isFound) { - if ((node = calloc(1, sizeof(notifylistnode_t))) == NULL) { - printf("Calloc error %s, %d\n", __FILE__, __LINE__); - pthread_mutex_unlock(&nlookup.locktable); - return 1; - } - node->threadid = tid; - node->ndata = ndata; - node->next = ptr[index].next; - ptr[index].next = node; - } - } + unsigned int newsize; + int index; + notifylistnode_t *ptr, *node, *tmp; + int isFound = 0; + + if (nlookup.numelements > (nlookup.loadfactor * nlookup.size)) { + //Resize Table + newsize = 2 * nlookup.size + 1; + pthread_mutex_lock(&nlookup.locktable); + notifyhashResize(newsize); + pthread_mutex_unlock(&nlookup.locktable); + } + ptr = nlookup.table; + index = notifyhashFunction(tid); + pthread_mutex_lock(&nlookup.locktable); + if(ptr[index].next == NULL && ptr[index].threadid == 0) { + // Insert at the first position in the hashtable + ptr[index].threadid = tid; + ptr[index].ndata = ndata; + } else { + tmp = &ptr[index]; + while(tmp != NULL) { + if(tmp->threadid == tid) { + isFound = 1; + tmp->ndata = ndata; + } + tmp = tmp->next; + } + if(!isFound) { + if ((node = calloc(1, sizeof(notifylistnode_t))) == NULL) { + printf("Calloc error %s, %d\n", __FILE__, __LINE__); pthread_mutex_unlock(&nlookup.locktable); - - return 0; + return 1; + } + node->threadid = tid; + node->ndata = ndata; + node->next = ptr[index].next; + ptr[index].next = node; + } + } + pthread_mutex_unlock(&nlookup.locktable); + + return 0; } // Return pointer to thread notify data for a given threadid in the hash table notifydata_t *notifyhashSearch(unsigned int tid) { - int index; - notifylistnode_t *ptr, *node; - - ptr = nlookup.table; // Address of the beginning of hash table - index = notifyhashFunction(tid); - node = &ptr[index]; - pthread_mutex_lock(&nlookup.locktable); - while(node != NULL) { - if(node->threadid == tid) { - pthread_mutex_unlock(&nlookup.locktable); - return node->ndata; - } - node = node->next; - } - pthread_mutex_unlock(&nlookup.locktable); - return NULL; + // Address of the beginning of hash table + notifylistnode_t *ptr = nlookup.table; + int index = notifyhashFunction(tid); + pthread_mutex_lock(&nlookup.locktable); + notifylistnode_t * node = &ptr[index]; + while(node != NULL) { + if(node->threadid == tid) { + pthread_mutex_unlock(&nlookup.locktable); + return node->ndata; + } + node = node->next; + } + pthread_mutex_unlock(&nlookup.locktable); + return NULL; } // Remove an entry from the hash table unsigned int notifyhashRemove(unsigned int tid) { - int index; - notifylistnode_t *curr, *prev, *ptr, *node; - - ptr = nlookup.table; - index = notifyhashFunction(tid); - curr = &ptr[index]; - - pthread_mutex_lock(&nlookup.locktable); - for (; curr != NULL; curr = curr->next) { - if (curr->threadid == tid) { // Find a match in the hash table - nlookup.numelements--; // Decrement the number of elements in the global hashtable - if ((curr == &ptr[index]) && (curr->next == NULL)) { // Delete the first item inside the hashtable with no linked list of notifylistnode_t - curr->threadid = 0; - curr->ndata = NULL; - } else if ((curr == &ptr[index]) && (curr->next != NULL)) { //Delete the first bin item with a linked list of notifylistnode_t connected - curr->threadid = curr->next->threadid; - curr->ndata = curr->next->ndata; - node = curr->next; - curr->next = curr->next->next; - free(node); - } else { // Regular delete from linked listed - prev->next = curr->next; - free(curr); - } - pthread_mutex_unlock(&nlookup.locktable); - return 0; - } - prev = curr; - } - pthread_mutex_unlock(&nlookup.locktable); - return 1; + notifylistnode_t *curr, *prev, *node; + + notifylistnode_t *ptr = nlookup.table; + int index = notifyhashFunction(tid); + + pthread_mutex_lock(&nlookup.locktable); + for (curr = &ptr[index]; curr != NULL; curr = curr->next) { + if (curr->threadid == tid) { // Find a match in the hash table + nlookup.numelements--; // Decrement the number of elements in the global hashtable + if ((curr == &ptr[index]) && (curr->next == NULL)) { // Delete the first item inside the hashtable with no linked list of notifylistnode_t + curr->threadid = 0; + curr->ndata = NULL; + } else if ((curr == &ptr[index]) && (curr->next != NULL)) { //Delete the first bin item with a linked list of notifylistnode_t connected + curr->threadid = curr->next->threadid; + curr->ndata = curr->next->ndata; + node = curr->next; + curr->next = curr->next->next; + free(node); + } else { // Regular delete from linked listed + prev->next = curr->next; + free(curr); + } + pthread_mutex_unlock(&nlookup.locktable); + return 0; + } + prev = curr; + } + pthread_mutex_unlock(&nlookup.locktable); + return 1; } // Resize table unsigned int notifyhashResize(unsigned int newsize) { - notifylistnode_t *node, *ptr, *curr, *next; // curr and next keep track of the current and the next notifyhashlistnodes in a linked list - unsigned int oldsize; - int isfirst; // Keeps track of the first element in the notifylistnode_t for each bin in hashtable - int i,index; - notifylistnode_t *newnode; - - ptr = nlookup.table; - oldsize = nlookup.size; - - if((node = calloc(newsize, sizeof(notifylistnode_t))) == NULL) { - printf("Calloc error %s %d\n", __FILE__, __LINE__); - return 1; - } - - nlookup.table = node; //Update the global hashtable upon resize() - nlookup.size = newsize; - nlookup.numelements = 0; - - for(i = 0; i < oldsize; i++) { //Outer loop for each bin in hash table - curr = &ptr[i]; - isfirst = 1; - while (curr != NULL) { //Inner loop to go through linked lists - if (curr->threadid == 0) { //Exit inner loop if there the first element for a given bin/index is NULL - break; //threadid = threadcond =0 for element if not present within the hash table - } - next = curr->next; - index = notifyhashFunction(curr->threadid); + notifylistnode_t *node, *ptr, *curr, *next; // curr and next keep track of the current and the next notifyhashlistnodes in a linked list + unsigned int oldsize; + int isfirst; // Keeps track of the first element in the notifylistnode_t for each bin in hashtable + int i,index; + notifylistnode_t *newnode; + + ptr = nlookup.table; + oldsize = nlookup.size; + + if((node = calloc(newsize, sizeof(notifylistnode_t))) == NULL) { + printf("Calloc error %s %d\n", __FILE__, __LINE__); + return 1; + } + + nlookup.table = node; //Update the global hashtable upon resize() + nlookup.size = newsize; + nlookup.numelements = 0; + + for(i = 0; i < oldsize; i++) { //Outer loop for each bin in hash table + curr = &ptr[i]; + isfirst = 1; + while (curr != NULL) { //Inner loop to go through linked lists + if (curr->threadid == 0) { //Exit inner loop if there the first element for a given bin/index is NULL + break; //threadid = threadcond =0 for element if not present within the hash table + } + next = curr->next; + index = notifyhashFunction(curr->threadid); #ifdef DEBUG - printf("DEBUG(resize) -> index = %d, threadid = %d\n", index, curr->threadid); + printf("DEBUG(resize) -> index = %d, threadid = %d\n", index, curr->threadid); #endif - // Insert into the new table - if(nlookup.table[index].next == NULL && nlookup.table[index].threadid == 0) { - nlookup.table[index].threadid = curr->threadid; - nlookup.table[index].ndata = curr->ndata; - nlookup.numelements++; - }else { - if((newnode = calloc(1, sizeof(notifylistnode_t))) == NULL) { - printf("Calloc error %s, %d\n", __FILE__, __LINE__); - return 1; - } - newnode->threadid = curr->threadid; - newnode->ndata = curr->ndata; - newnode->next = nlookup.table[index].next; - nlookup.table[index].next = newnode; - nlookup.numelements++; - } - - //free the linked list of notifylistnode_t if not the first element in the hash table - if (isfirst != 1) { - free(curr); - } - - isfirst = 0; - curr = next; - } - } - - free(ptr); //Free the memory of the old hash table - ptr = NULL; - return 0; + // Insert into the new table + if(nlookup.table[index].next == NULL && nlookup.table[index].threadid == 0) { + nlookup.table[index].threadid = curr->threadid; + nlookup.table[index].ndata = curr->ndata; + nlookup.numelements++; + }else { + if((newnode = calloc(1, sizeof(notifylistnode_t))) == NULL) { + printf("Calloc error %s, %d\n", __FILE__, __LINE__); + return 1; + } + newnode->threadid = curr->threadid; + newnode->ndata = curr->ndata; + newnode->next = nlookup.table[index].next; + nlookup.table[index].next = newnode; + nlookup.numelements++; + } + + //free the linked list of notifylistnode_t if not the first element in the hash table + if (isfirst != 1) { + free(curr); + } + + isfirst = 0; + curr = next; + } + } + + free(ptr); //Free the memory of the old hash table + ptr = NULL; + return 0; } diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 155a6222..046c83f9 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -46,33 +46,33 @@ plistnode_t *createPiles(transrecord_t *); * Send and Recv function calls *******************************/ void send_data(int fd , void *buf, int buflen) { - char *buffer = (char *)(buf); - int size = buflen; - int numbytes; - while (size > 0) { - numbytes = send(fd, buffer, size, MSG_NOSIGNAL); - if (numbytes == -1) { - perror("send"); - exit(-1); - } - buffer += numbytes; - size -= numbytes; - } + char *buffer = (char *)(buf); + int size = buflen; + int numbytes; + while (size > 0) { + numbytes = send(fd, buffer, size, MSG_NOSIGNAL); + if (numbytes == -1) { + perror("send"); + exit(-1); + } + buffer += numbytes; + size -= numbytes; + } } void recv_data(int fd , void *buf, int buflen) { - char *buffer = (char *)(buf); - int size = buflen; - int numbytes; - while (size > 0) { - numbytes = recv(fd, buffer, size, 0); - if (numbytes == -1) { - perror("recv"); - exit(-1); - } - buffer += numbytes; - size -= numbytes; - } + char *buffer = (char *)(buf); + int size = buflen; + int numbytes; + while (size > 0) { + numbytes = recv(fd, buffer, size, 0); + if (numbytes == -1) { + perror("recv"); + exit(-1); + } + buffer += numbytes; + size -= numbytes; + } } int recv_data_errorcode(int fd , void *buf, int buflen) { @@ -90,75 +90,57 @@ int recv_data_errorcode(int fd , void *buf, int buflen) { return 0; } -void printhex(unsigned char *ptr, int numBytes) -{ - int i; - for (i = 0; i < numBytes; i++) - { - if (ptr[i] < 16) - printf("0%x ", ptr[i]); - else - printf("%x ", ptr[i]); - } - printf("\n"); - return; +void printhex(unsigned char *ptr, int numBytes) { + int i; + for (i = 0; i < numBytes; i++) { + if (ptr[i] < 16) + printf("0%x ", ptr[i]); + else + printf("%x ", ptr[i]); + } + printf("\n"); + return; } inline int arrayLength(int *array) { - int i; - for(i=0 ;array[i] != -1; i++) - ; - return i; + int i; + for(i=0 ;array[i] != -1; i++) + ; + return i; } + inline int findmax(int *array, int arraylength) { - int max, i; - max = array[0]; - for(i = 0; i < arraylength; i++){ - if(array[i] > max) { - max = array[i]; - } - } - return max; + int max, i; + max = array[0]; + for(i = 0; i < arraylength; i++){ + if(array[i] > max) { + max = array[i]; + } + } + return max; } + /* This function is a prefetch call generated by the compiler that * populates the shared primary prefetch queue*/ void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) { /* Allocate for the queue node*/ - if(ntuples > 0) { - int qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short); - char * node; - - if((node = calloc(1, qnodesize)) == NULL) { - printf("Calloc Error %s, %d\n", __FILE__, __LINE__); - return; - } else { - /* Set queue node values */ - int len = sizeof(prefetchqelem_t); - int i; - unsigned int *narray; - unsigned short *narray2; - short * narray3; - int top=endoffsets[ntuples-1]; - *((int *)(node+len))=ntuples; - len += sizeof(int); - narray=(unsigned int *)(node+len); - narray2=(unsigned short *)(narray+ntuples); - narray3=(short *)(narray2+ntuples); - - for(i=0;inext != NULL) { - /* check if store is empty */ - if(((unsigned int)ptr->top - (unsigned int)ptr - sizeof(objstr_t) + size) <= ptr->size) { - tmp = ptr->top; - ptr->top += size; - success = 1; - return tmp; - } else { - ptr = ptr-> next; - } - } - - if(success == 0) { - return NULL; - } + void *tmp; + objstr_t *ptr; + ptr = store; + int success = 0; + + while(ptr->next != NULL) { + /* check if store is empty */ + if(((unsigned int)ptr->top - (unsigned int)ptr - sizeof(objstr_t) + size) <= ptr->size) { + tmp = ptr->top; + ptr->top += size; + success = 1; + return tmp; + } else { + ptr = ptr-> next; + } + } + + if(success == 0) { + return NULL; + } } -/* This function initiates the prefetch thread - * A queue is shared between the main thread of execution - * and the prefetch thread to process the prefetch call - * Call from compiler populates the shared queue with prefetch requests while prefetch thread - * processes the prefetch requests */ -void transInit() { - int t, rc; - int retval; - //Create and initialize prefetch cache structure - prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE); - //prefetchcache->next = objstrCreate(PREFETCH_CACHE_SIZE); - //prefetchcache->next->next = objstrCreate(PREFETCH_CACHE_SIZE); - - /* Initialize attributes for mutex */ - pthread_mutexattr_init(&prefetchcache_mutex_attr); - pthread_mutexattr_settype(&prefetchcache_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP); - - pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr); - - //Create prefetch cache lookup table - if(prehashCreate(HASH_SIZE, LOADFACTOR)) - return; //Failure - - //Initialize primary shared queue - queueInit(); - //Initialize machine pile w/prefetch oids and offsets shared queue - mcpileqInit(); +/* This function initiates the prefetch thread A queue is shared + * between the main thread of execution and the prefetch thread to + * process the prefetch call Call from compiler populates the shared + * queue with prefetch requests while prefetch thread processes the + * prefetch requests */ - //Create the primary prefetch thread - do { - retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL); - } while(retval!=0); - pthread_detach(tPrefetch); +void transInit() { + int t, rc; + int retval; + //Create and initialize prefetch cache structure + prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE); + + /* Initialize attributes for mutex */ + pthread_mutexattr_init(&prefetchcache_mutex_attr); + pthread_mutexattr_settype(&prefetchcache_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP); + + pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr); + + //Create prefetch cache lookup table + if(prehashCreate(HASH_SIZE, LOADFACTOR)) { + printf("ERROR\n"); + return; //Failure + } + + //Initialize primary shared queue + queueInit(); + //Initialize machine pile w/prefetch oids and offsets shared queue + mcpileqInit(); + + //Create the primary prefetch thread + do { + retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL); + } while(retval!=0); + pthread_detach(tPrefetch); } /* This function stops the threads spawned */ void transExit() { - int t; - pthread_cancel(tPrefetch); - for(t = 0; t < NUM_THREADS; t++) - pthread_cancel(wthreads[t]); - - return; + int t; + pthread_cancel(tPrefetch); + for(t = 0; t < NUM_THREADS; t++) + pthread_cancel(wthreads[t]); + + return; } /* This functions inserts randowm wait delays in the order of msec * Mostly used when transaction commits retry*/ -void randomdelay() -{ - struct timespec req; - time_t t; - - t = time(NULL); - req.tv_sec = 0; - req.tv_nsec = (long)(1000000 + (t%10000000)); //1-11 msec - nanosleep(&req, NULL); - return; +void randomdelay() { + struct timespec req; + time_t t; + + t = time(NULL); + req.tv_sec = 0; + req.tv_nsec = (long)(1000000 + (t%10000000)); //1-11 msec + nanosleep(&req, NULL); + return; } /* This function initializes things required in the transaction start*/ -transrecord_t *transStart() -{ - transrecord_t *tmp = calloc(1, sizeof(transrecord_t)); - tmp->cache = objstrCreate(1048576); - tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR); +transrecord_t *transStart() { + transrecord_t *tmp = calloc(1, sizeof(transrecord_t)); + tmp->cache = objstrCreate(1048576); + tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR); #ifdef COMPILER - tmp->revertlist=NULL; + tmp->revertlist=NULL; #endif - return tmp; + return tmp; } /* This function finds the location of the objects involved in a transaction @@ -301,8 +282,8 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { return NULL; } - /* Search local transaction cache */ if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){ + /* Search local transaction cache */ #ifdef COMPILER return &objheader[1]; #else @@ -321,7 +302,8 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { #else return objcopy; #endif - } else if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { /* Look up in prefetch cache */ + } else if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { + /* Look up in prefetch cache */ GETSIZE(size, tmp); size+=sizeof(objheader_t); objcopy = (objheader_t *) objstrAlloc(record->cache, size); @@ -356,70 +338,60 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { } /* This function creates objects in the transaction record */ -objheader_t *transCreateObj(transrecord_t *record, unsigned int size) -{ - objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + size)); - tmp->notifylist = NULL; - OID(tmp) = getNewOID(); - tmp->version = 1; - tmp->rcount = 1; - STATUS(tmp) = NEW; - chashInsert(record->lookupTable, OID(tmp), tmp); - +objheader_t *transCreateObj(transrecord_t *record, unsigned int size) { + objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + size)); + tmp->notifylist = NULL; + OID(tmp) = getNewOID(); + tmp->version = 1; + tmp->rcount = 1; + STATUS(tmp) = NEW; + chashInsert(record->lookupTable, OID(tmp), tmp); + #ifdef COMPILER - return &tmp[1]; //want space after object header + return &tmp[1]; //want space after object header #else - return tmp; + return tmp; #endif } /* This function creates machine piles based on all machines involved in a * transaction commit request */ plistnode_t *createPiles(transrecord_t *record) { - int i = 0; - unsigned int size;/* Represents number of bins in the chash table */ - chashlistnode_t *curr, *ptr, *next; - plistnode_t *pile = NULL; - unsigned int machinenum; - void *localmachinenum; - objheader_t *headeraddr; - - ptr = record->lookupTable->table; - size = record->lookupTable->size; - - for(i = 0; i < size ; i++) { - curr = &ptr[i]; - /* Inner loop to traverse the linked list of the cache lookupTable */ - while(curr != NULL) { - //if the first bin in hash table is empty - if(curr->key == 0) { - break; - } - next = curr->next; - - if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) { - printf("Error: No such oid %s, %d\n", __FILE__, __LINE__); - return NULL; - } - - //Get machine location for object id (and whether local or not) - if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) { - machinenum = myIpAddr; - } else if ((machinenum = lhashSearch(curr->key)) == 0) { - printf("Error: No such machine %s, %d\n", __FILE__, __LINE__); - return NULL; - } - - //Make machine groups - if ((pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements)) == NULL) { - printf("pInsert error %s, %d\n", __FILE__, __LINE__); - return NULL; - } - - curr = next; - } - } - return pile; + int i; + plistnode_t *pile = NULL; + unsigned int machinenum; + objheader_t *headeraddr; + chashlistnode_t * ptr = record->lookupTable->table; + /* Represents number of bins in the chash table */ + unsigned int size = record->lookupTable->size; + + for(i = 0; i < size ; i++) { + chashlistnode_t * curr = &ptr[i]; + /* Inner loop to traverse the linked list of the cache lookupTable */ + while(curr != NULL) { + //if the first bin in hash table is empty + if(curr->key == 0) + break; + + if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) { + printf("Error: No such oid %s, %d\n", __FILE__, __LINE__); + return NULL; + } + + //Get machine location for object id (and whether local or not) + if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) { + machinenum = myIpAddr; + } else if ((machinenum = lhashSearch(curr->key)) == 0) { + printf("Error: No such machine %s, %d\n", __FILE__, __LINE__); + return NULL; + } + + //Make machine groups + pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements); + curr = curr->next; + } + } + return pile; } /* This function initiates the transaction commit process @@ -428,213 +400,186 @@ plistnode_t *createPiles(transrecord_t *record) { * Sends a transrequest() to each remote machines for objects found remotely * and calls handleLocalReq() to process objects found locally */ int transCommit(transrecord_t *record) { - unsigned int tot_bytes_mod, *listmid; - plistnode_t *pile, *pile_ptr; - int i, j, rc, val; - int pilecount, offset, threadnum = 0, trecvcount = 0; - char control; - char transid[TID_LEN]; - trans_req_data_t *tosend; - trans_commit_data_t transinfo; - static int newtid = 0; - char treplyctrl = 0, treplyretry = 0; /* keeps track of the common response that needs to be sent */ - char localstat = 0; - thread_data_array_t *thread_data_array; - local_thread_data_array_t *ltdata; - - do { - trecvcount = 0; - threadnum = 0; - treplyretry = 0; - thread_data_array = NULL; - ltdata = NULL; - - /* Look through all the objects in the transaction record and make piles - * for each machine involved in the transaction*/ - pile_ptr = pile = createPiles(record); - - /* Create the packet to be sent in TRANS_REQUEST */ - - /* Count the number of participants */ - pilecount = pCount(pile); - - /* Create a list of machine ids(Participants) involved in transaction */ - if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) { - printf("Calloc error %s, %d\n", __FILE__, __LINE__); - return 1; - } - pListMid(pile, listmid); - - - /* Initialize thread variables, - * Spawn a thread for each Participant involved in a transaction */ - pthread_t thread[pilecount]; - pthread_attr_t attr; - pthread_cond_t tcond; - pthread_mutex_t tlock; - pthread_mutex_t tlshrd; - - if((thread_data_array = (thread_data_array_t *) calloc(pilecount, sizeof(thread_data_array_t))) == NULL) { - printf("Calloc error %s, %d\n", __FILE__, __LINE__); - pthread_cond_destroy(&tcond); - pthread_mutex_destroy(&tlock); - pDelete(pile_ptr); - free(listmid); - return 1; - } - - if((ltdata = calloc(1, sizeof(local_thread_data_array_t))) == NULL) { - printf("Calloc error %s, %d\n", __FILE__, __LINE__); - pthread_cond_destroy(&tcond); - pthread_mutex_destroy(&tlock); - pDelete(pile_ptr); - free(listmid); - free(thread_data_array); - return 1; - } - - thread_response_t rcvd_control_msg[pilecount]; /* Shared thread array that keeps track of responses of participants */ - - /* Initialize and set thread detach attribute */ - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); - pthread_mutex_init(&tlock, NULL); - pthread_cond_init(&tcond, NULL); - - /* Process each machine pile */ - while(pile != NULL) { - //Create transaction id - newtid++; - if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) { - printf("Calloc error %s, %d\n", __FILE__, __LINE__); - pthread_cond_destroy(&tcond); - pthread_mutex_destroy(&tlock); - pDelete(pile_ptr); - free(listmid); - free(thread_data_array); - free(ltdata); - return 1; - } - tosend->f.control = TRANS_REQUEST; - sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid); - tosend->f.mcount = pilecount; - tosend->f.numread = pile->numread; - tosend->f.nummod = pile->nummod; - tosend->f.numcreated = pile->numcreated; - tosend->f.sum_bytes = pile->sum_bytes; - tosend->listmid = listmid; - tosend->objread = pile->objread; - tosend->oidmod = pile->oidmod; - tosend->oidcreated = pile->oidcreated; - thread_data_array[threadnum].thread_id = threadnum; - thread_data_array[threadnum].mid = pile->mid; - thread_data_array[threadnum].buffer = tosend; - thread_data_array[threadnum].recvmsg = rcvd_control_msg; - thread_data_array[threadnum].threshold = &tcond; - thread_data_array[threadnum].lock = &tlock; - thread_data_array[threadnum].count = &trecvcount; - thread_data_array[threadnum].replyctrl = &treplyctrl; - thread_data_array[threadnum].replyretry = &treplyretry; - thread_data_array[threadnum].rec = record; - /* If local do not create any extra connection */ - if(pile->mid != myIpAddr) { /* Not local */ - do { - rc = pthread_create(&thread[threadnum], &attr, transRequest, (void *) &thread_data_array[threadnum]); - } while(rc!=0); - if(rc) { - perror("Error in pthread create\n"); - pthread_cond_destroy(&tcond); - pthread_mutex_destroy(&tlock); - pDelete(pile_ptr); - free(listmid); - for (i = 0; i < threadnum; i++) - free(thread_data_array[i].buffer); - free(thread_data_array); - free(ltdata); - return 1; - } - } else { /*Local*/ - ltdata->tdata = &thread_data_array[threadnum]; - ltdata->transinfo = &transinfo; - do { - val = pthread_create(&thread[threadnum], &attr, handleLocalReq, (void *) ltdata); - } while(val!=0); - if(val) { - perror("Error in pthread create\n"); - pthread_cond_destroy(&tcond); - pthread_mutex_destroy(&tlock); - pDelete(pile_ptr); - free(listmid); - for (i = 0; i < threadnum; i++) - free(thread_data_array[i].buffer); - free(thread_data_array); - free(ltdata); - return 1; - } - } - - threadnum++; - pile = pile->next; - } - /* Free attribute and wait for the other threads */ - pthread_attr_destroy(&attr); - - for (i = 0; i < threadnum; i++) { - rc = pthread_join(thread[i], NULL); - if(rc) - { - printf("Error: return code from pthread_join() is %d\n", rc); - pthread_cond_destroy(&tcond); - pthread_mutex_destroy(&tlock); - pDelete(pile_ptr); - free(listmid); - for (j = i; j < threadnum; j++) { - free(thread_data_array[j].buffer); - } - return 1; - } - free(thread_data_array[i].buffer); - } - - /* Free resources */ - pthread_cond_destroy(&tcond); - pthread_mutex_destroy(&tlock); - free(listmid); - pDelete(pile_ptr); - - /* wait a random amount of time before retrying to commit transaction*/ - if(treplyretry == 1) { - free(thread_data_array); - free(ltdata); - randomdelay(); - } - - /* Retry trans commit procedure during soft_abort case */ - } while (treplyretry == 1); - - - if(treplyctrl == TRANS_ABORT) { - /* Free Resources */ - objstrDelete(record->cache); - chashDelete(record->lookupTable); - free(record); - free(thread_data_array); - free(ltdata); - return TRANS_ABORT; - } else if(treplyctrl == TRANS_COMMIT) { - /* Free Resources */ - objstrDelete(record->cache); - chashDelete(record->lookupTable); - free(record); - free(thread_data_array); - free(ltdata); - return 0; - } else { - //TODO Add other cases - printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__); - exit(-1); + unsigned int tot_bytes_mod, *listmid; + plistnode_t *pile, *pile_ptr; + int i, j, rc, val; + int pilecount, offset, threadnum = 0, trecvcount = 0; + char control; + char transid[TID_LEN]; + trans_req_data_t *tosend; + trans_commit_data_t transinfo; + static int newtid = 0; + char treplyctrl = 0, treplyretry = 0; /* keeps track of the common response that needs to be sent */ + char localstat = 0; + thread_data_array_t *thread_data_array; + local_thread_data_array_t *ltdata; + + do { + trecvcount = 0; + threadnum = 0; + treplyretry = 0; + thread_data_array = NULL; + ltdata = NULL; + + /* Look through all the objects in the transaction record and make piles + * for each machine involved in the transaction*/ + pile_ptr = pile = createPiles(record); + + /* Create the packet to be sent in TRANS_REQUEST */ + + /* Count the number of participants */ + pilecount = pCount(pile); + + /* Create a list of machine ids(Participants) involved in transaction */ + listmid = calloc(pilecount, sizeof(unsigned int)); + pListMid(pile, listmid); + + + /* Initialize thread variables, + * Spawn a thread for each Participant involved in a transaction */ + pthread_t thread[pilecount]; + pthread_attr_t attr; + pthread_cond_t tcond; + pthread_mutex_t tlock; + pthread_mutex_t tlshrd; + + thread_data_array = (thread_data_array_t *) calloc(pilecount, sizeof(thread_data_array_t)); + + ltdata = calloc(1, sizeof(local_thread_data_array_t)); + + thread_response_t rcvd_control_msg[pilecount]; /* Shared thread array that keeps track of responses of participants */ + + /* Initialize and set thread detach attribute */ + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + pthread_mutex_init(&tlock, NULL); + pthread_cond_init(&tcond, NULL); + + /* Process each machine pile */ + while(pile != NULL) { + //Create transaction id + newtid++; + tosend = calloc(1, sizeof(trans_req_data_t)); + tosend->f.control = TRANS_REQUEST; + sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid); + tosend->f.mcount = pilecount; + tosend->f.numread = pile->numread; + tosend->f.nummod = pile->nummod; + tosend->f.numcreated = pile->numcreated; + tosend->f.sum_bytes = pile->sum_bytes; + tosend->listmid = listmid; + tosend->objread = pile->objread; + tosend->oidmod = pile->oidmod; + tosend->oidcreated = pile->oidcreated; + thread_data_array[threadnum].thread_id = threadnum; + thread_data_array[threadnum].mid = pile->mid; + thread_data_array[threadnum].buffer = tosend; + thread_data_array[threadnum].recvmsg = rcvd_control_msg; + thread_data_array[threadnum].threshold = &tcond; + thread_data_array[threadnum].lock = &tlock; + thread_data_array[threadnum].count = &trecvcount; + thread_data_array[threadnum].replyctrl = &treplyctrl; + thread_data_array[threadnum].replyretry = &treplyretry; + thread_data_array[threadnum].rec = record; + /* If local do not create any extra connection */ + if(pile->mid != myIpAddr) { /* Not local */ + do { + rc = pthread_create(&thread[threadnum], &attr, transRequest, (void *) &thread_data_array[threadnum]); + } while(rc!=0); + if(rc) { + perror("Error in pthread create\n"); + pthread_cond_destroy(&tcond); + pthread_mutex_destroy(&tlock); + pDelete(pile_ptr); + free(listmid); + for (i = 0; i < threadnum; i++) + free(thread_data_array[i].buffer); + free(thread_data_array); + free(ltdata); + return 1; } - return 0; + } else { /*Local*/ + ltdata->tdata = &thread_data_array[threadnum]; + ltdata->transinfo = &transinfo; + do { + val = pthread_create(&thread[threadnum], &attr, handleLocalReq, (void *) ltdata); + } while(val!=0); + if(val) { + perror("Error in pthread create\n"); + pthread_cond_destroy(&tcond); + pthread_mutex_destroy(&tlock); + pDelete(pile_ptr); + free(listmid); + for (i = 0; i < threadnum; i++) + free(thread_data_array[i].buffer); + free(thread_data_array); + free(ltdata); + return 1; + } + } + + threadnum++; + pile = pile->next; + } + /* Free attribute and wait for the other threads */ + pthread_attr_destroy(&attr); + + for (i = 0; i < threadnum; i++) { + rc = pthread_join(thread[i], NULL); + if(rc) + { + printf("Error: return code from pthread_join() is %d\n", rc); + pthread_cond_destroy(&tcond); + pthread_mutex_destroy(&tlock); + pDelete(pile_ptr); + free(listmid); + for (j = i; j < threadnum; j++) { + free(thread_data_array[j].buffer); + } + return 1; + } + free(thread_data_array[i].buffer); + } + + /* Free resources */ + pthread_cond_destroy(&tcond); + pthread_mutex_destroy(&tlock); + free(listmid); + pDelete(pile_ptr); + + /* wait a random amount of time before retrying to commit transaction*/ + if(treplyretry) { + free(thread_data_array); + free(ltdata); + randomdelay(); + } + + /* Retry trans commit procedure during soft_abort case */ + } while (treplyretry); + + + if(treplyctrl == TRANS_ABORT) { + /* Free Resources */ + objstrDelete(record->cache); + chashDelete(record->lookupTable); + free(record); + free(thread_data_array); + free(ltdata); + return TRANS_ABORT; + } else if(treplyctrl == TRANS_COMMIT) { + /* Free Resources */ + objstrDelete(record->cache); + chashDelete(record->lookupTable); + free(record); + free(thread_data_array); + free(ltdata); + return 0; + } else { + //TODO Add other cases + printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__); + exit(-1); + } + return 0; } /* This function sends information involved in the transaction request @@ -642,196 +587,184 @@ int transCommit(transrecord_t *record) { * It calls decideresponse() to decide on what control message * to send next to participants and sends the message using sendResponse()*/ void *transRequest(void *threadarg) { - int sd, i, n; - struct sockaddr_in serv_addr; - thread_data_array_t *tdata; - objheader_t *headeraddr; - char control, recvcontrol; - char machineip[16], retval; - - tdata = (thread_data_array_t *) threadarg; - - /* Send Trans Request */ - if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - perror("Error in socket for TRANS_REQUEST\n"); - pthread_exit(NULL); - } - bzero((char*) &serv_addr, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_port = htons(LISTEN_PORT); - midtoIP(tdata->mid,machineip); - machineip[15] = '\0'; - serv_addr.sin_addr.s_addr = inet_addr(machineip); - /* Open Connection */ - if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) { - perror("Error in connect for TRANS_REQUEST\n"); - close(sd); - pthread_exit(NULL); - } - - /* Send bytes of data with TRANS_REQUEST control message */ - send_data(sd, &(tdata->buffer->f), sizeof(fixed_data_t)); - - /* Send list of machines involved in the transaction */ - { - int size=sizeof(unsigned int)*tdata->buffer->f.mcount; - send_data(sd, tdata->buffer->listmid, size); - } - - /* Send oids and version number tuples for objects that are read */ - { - int size=(sizeof(unsigned int)+sizeof(unsigned short))*tdata->buffer->f.numread; - send_data(sd, tdata->buffer->objread, size); - } - - /* Send objects that are modified */ - for(i = 0; i < tdata->buffer->f.nummod ; i++) { - int size; - headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]); - GETSIZE(size,headeraddr); - size+=sizeof(objheader_t); - send_data(sd, headeraddr, size); - } - - /* Read control message from Participant */ - recv_data(sd, &control, sizeof(char)); - recvcontrol = control; - - /* Update common data structure and increment count */ - tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol; - - /* Lock and update count */ - /* Thread sleeps until all messages from pariticipants are received by coordinator */ - pthread_mutex_lock(tdata->lock); - - (*(tdata->count))++; /* keeps track of no of messages received by the coordinator */ - - /* Wake up the threads and invoke decideResponse (once) */ - if(*(tdata->count) == tdata->buffer->f.mcount) { - decideResponse(tdata); - pthread_cond_broadcast(tdata->threshold); - } else { - pthread_cond_wait(tdata->threshold, tdata->lock); - } - pthread_mutex_unlock(tdata->lock); - - /* Send the final response such as TRANS_COMMIT or TRANS_ABORT t - * to all participants in their respective socket */ - if (sendResponse(tdata, sd) == 0) { - printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__); - close(sd); - pthread_exit(NULL); - } - - recv_data((int)sd, &control, sizeof(char)); - - if(control == TRANS_UNSUCESSFUL) { - //printf("DEBUG-> TRANS_ABORTED\n"); - } else if(control == TRANS_SUCESSFUL) { - //printf("DEBUG-> TRANS_SUCCESSFUL\n"); - } else { - //printf("DEBUG-> Error: Incorrect Transaction End Message %d\n", control); - } - - /* Close connection */ - close(sd); - pthread_exit(NULL); + int sd, i, n; + struct sockaddr_in serv_addr; + thread_data_array_t *tdata; + objheader_t *headeraddr; + char control, recvcontrol; + char machineip[16], retval; + + tdata = (thread_data_array_t *) threadarg; + + /* Send Trans Request */ + if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + perror("Error in socket for TRANS_REQUEST\n"); + pthread_exit(NULL); + } + bzero((char*) &serv_addr, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + serv_addr.sin_port = htons(LISTEN_PORT); + midtoIP(tdata->mid,machineip); + machineip[15] = '\0'; + serv_addr.sin_addr.s_addr = inet_addr(machineip); + /* Open Connection */ + if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) { + perror("Error in connect for TRANS_REQUEST\n"); + close(sd); + pthread_exit(NULL); + } + + /* Send bytes of data with TRANS_REQUEST control message */ + send_data(sd, &(tdata->buffer->f), sizeof(fixed_data_t)); + + /* Send list of machines involved in the transaction */ + { + int size=sizeof(unsigned int)*tdata->buffer->f.mcount; + send_data(sd, tdata->buffer->listmid, size); + } + + /* Send oids and version number tuples for objects that are read */ + { + int size=(sizeof(unsigned int)+sizeof(unsigned short))*tdata->buffer->f.numread; + send_data(sd, tdata->buffer->objread, size); + } + + /* Send objects that are modified */ + for(i = 0; i < tdata->buffer->f.nummod ; i++) { + int size; + headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]); + GETSIZE(size,headeraddr); + size+=sizeof(objheader_t); + send_data(sd, headeraddr, size); + } + + /* Read control message from Participant */ + recv_data(sd, &control, sizeof(char)); + recvcontrol = control; + + /* Update common data structure and increment count */ + tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol; + + /* Lock and update count */ + /* Thread sleeps until all messages from pariticipants are received by coordinator */ + pthread_mutex_lock(tdata->lock); + + (*(tdata->count))++; /* keeps track of no of messages received by the coordinator */ + + /* Wake up the threads and invoke decideResponse (once) */ + if(*(tdata->count) == tdata->buffer->f.mcount) { + decideResponse(tdata); + pthread_cond_broadcast(tdata->threshold); + } else { + pthread_cond_wait(tdata->threshold, tdata->lock); + } + pthread_mutex_unlock(tdata->lock); + + /* Send the final response such as TRANS_COMMIT or TRANS_ABORT t + * to all participants in their respective socket */ + if (sendResponse(tdata, sd) == 0) { + printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__); + close(sd); + pthread_exit(NULL); + } + + recv_data((int)sd, &control, sizeof(char)); + + if(control == TRANS_UNSUCESSFUL) { + //printf("DEBUG-> TRANS_ABORTED\n"); + } else if(control == TRANS_SUCESSFUL) { + //printf("DEBUG-> TRANS_SUCCESSFUL\n"); + } else { + //printf("DEBUG-> Error: Incorrect Transaction End Message %d\n", control); + } + + /* Close connection */ + close(sd); + pthread_exit(NULL); } /* This function decides the reponse that needs to be sent to * all Participant machines after the TRANS_REQUEST protocol */ void decideResponse(thread_data_array_t *tdata) { - char control; - int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what - message to send */ - - for (i = 0 ; i < tdata->buffer->f.mcount; i++) { - control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses - written onto the shared array */ - switch(control) { - default: - printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__); - /* treat as disagree, pass thru */ - case TRANS_DISAGREE: - transdisagree++; - break; - - case TRANS_AGREE: - transagree++; - break; - - case TRANS_SOFT_ABORT: - transsoftabort++; - break; - } - } + char control; + int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what + message to send */ + + for (i = 0 ; i < tdata->buffer->f.mcount; i++) { + control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses + written onto the shared array */ + switch(control) { + default: + printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__); + /* treat as disagree, pass thru */ + case TRANS_DISAGREE: + transdisagree++; + break; + + case TRANS_AGREE: + transagree++; + break; + + case TRANS_SOFT_ABORT: + transsoftabort++; + break; + } + } + + if(transdisagree > 0) { + /* Send Abort */ + *(tdata->replyctrl) = TRANS_ABORT; + *(tdata->replyretry) = 0; + /* clear objects from prefetch cache */ + for (i = 0; i < tdata->buffer->f.numread; i++) + prehashRemove(*((unsigned int *)(((char *)tdata->buffer->objread) + (sizeof(unsigned int) + sizeof(unsigned short))*i))); + for (i = 0; i < tdata->buffer->f.nummod; i++) + prehashRemove(tdata->buffer->oidmod[i]); + } else if(transagree == tdata->buffer->f.mcount){ + /* Send Commit */ + *(tdata->replyctrl) = TRANS_COMMIT; + *(tdata->replyretry) = 0; + } else { + /* Send Abort in soft abort case followed by retry commiting transaction again*/ + *(tdata->replyctrl) = TRANS_ABORT; + *(tdata->replyretry) = 1; + } + + return; +} - if(transdisagree > 0) { - /* Send Abort */ - *(tdata->replyctrl) = TRANS_ABORT; - *(tdata->replyretry) = 0; - /* clear objects from prefetch cache */ - for (i = 0; i < tdata->buffer->f.numread; i++) - prehashRemove(*((unsigned int *)(((char *)tdata->buffer->objread) + (sizeof(unsigned int) + sizeof(unsigned short))*i))); - for (i = 0; i < tdata->buffer->f.nummod; i++) - prehashRemove(tdata->buffer->oidmod[i]); - } else if(transagree == tdata->buffer->f.mcount){ - /* Send Commit */ - *(tdata->replyctrl) = TRANS_COMMIT; - *(tdata->replyretry) = 0; - } else { - /* Send Abort in soft abort case followed by retry commiting transaction again*/ - *(tdata->replyctrl) = TRANS_ABORT; - *(tdata->replyretry) = 1; - } +/* This function sends the final response to remote machines per + * thread in their respective socket id It returns a char that is only + * needed to check the correctness of execution of this function + * inside transRequest()*/ - return; -} -/* This function sends the final response to remote machines per thread in their respective socket id - * It returns a char that is only needed to check the correctness of execution of this function inside - * transRequest()*/ char sendResponse(thread_data_array_t *tdata, int sd) { - int n, size, sum, oidcount = 0, control; - char *ptr, retval = 0; - unsigned int *oidnotfound; - - control = *(tdata->replyctrl); - send_data(sd, &control, sizeof(char)); - - //TODO read missing objects during object migration - /* If response is a soft abort due to missing objects at the Participant's side */ - /* - if(tdata->recvmsg[tdata->thread_id].rcv_status == TRANS_SOFT_ABORT) { - // Read list of objects missing - recv_data(sd, &oidcount, sizeof(int)); - //if((read(sd, &oidcount, sizeof(int)) != 0) && (oidcount != 0)) { - if(oidcount != 0) { - size = oidcount * sizeof(unsigned int); - if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) { - printf("Calloc error %s, %d\n", __FILE__, __LINE__); - return 0; - } - ptr = (char *) oidnotfound; - recv_data(sd, ptr, size); - } - retval = TRANS_SOFT_ABORT; - } - */ - - /* If the decided response is TRANS_ABORT */ - if(*(tdata->replyctrl) == TRANS_ABORT) { - retval = TRANS_ABORT; - } else if(*(tdata->replyctrl) == TRANS_COMMIT) { /* If the decided response is TRANS_COMMIT */ - retval = TRANS_COMMIT; - } - - return retval; + int n, size, sum, oidcount = 0, control; + char *ptr, retval = 0; + unsigned int *oidnotfound; + + control = *(tdata->replyctrl); + send_data(sd, &control, sizeof(char)); + + //TODO read missing objects during object migration + /* If response is a soft abort due to missing objects at the + Participant's side */ + + /* If the decided response is TRANS_ABORT */ + if(*(tdata->replyctrl) == TRANS_ABORT) { + retval = TRANS_ABORT; + } else if(*(tdata->replyctrl) == TRANS_COMMIT) { + /* If the decided response is TRANS_COMMIT */ + retval = TRANS_COMMIT; + } + + return retval; } -/* This function opens a connection, places an object read request to the - * remote machine, reads the control message and object if available and - * copies the object and its header to the local cache. - * */ +/* This function opens a connection, places an object read request to + * the remote machine, reads the control message and object if + * available and copies the object and its header to the local + * cache. */ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { int size, val; @@ -867,148 +800,150 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { return objcopy; } -/* This function handles the local objects involved in a transaction commiting process. - * It also makes a decision if this local machine sends AGREE or DISAGREE or SOFT_ABORT to coordinator. - * Note Coordinator = local machine - * It wakes up the other threads from remote participants that are waiting for the coordinator's decision and - * based on common agreement it either commits or aborts the transaction. - * It also frees the memory resources */ -void *handleLocalReq(void *threadarg) { - unsigned int *oidnotfound = NULL, *oidlocked = NULL; - local_thread_data_array_t *localtdata; - int objnotfound = 0, objlocked = 0; - int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0; - int numread, i; - unsigned int oid; - unsigned short version; - void *mobj; - objheader_t *headptr; - - localtdata = (local_thread_data_array_t *) threadarg; - - /* Counters and arrays to formulate decision on control message to be sent */ - oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int)); - oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int)); - - numread = localtdata->tdata->buffer->f.numread; - /* Process each oid in the machine pile/ group per thread */ - for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) { - if (i < localtdata->tdata->buffer->f.numread) { - int incr = sizeof(unsigned int) + sizeof(unsigned short);// Offset that points to next position in the objread array - incr *= i; - oid = *((unsigned int *)(((char *)localtdata->tdata->buffer->objread) + incr)); - version = *((unsigned short *)(((char *)localtdata->tdata->buffer->objread) + incr + sizeof(unsigned int))); - } else { // Objects Modified - int tmpsize; - headptr = (objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i-numread]); - if (headptr == NULL) { - printf("Error: handleLocalReq() returning NULL %s, %d\n", __FILE__, __LINE__); - return NULL; - } - oid = OID(headptr); - version = headptr->version; - } - /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */ - - /* Save the oids not found and number of oids not found for later use */ - if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */ - /* Save the oids not found and number of oids not found for later use */ - oidnotfound[objnotfound] = oid; - objnotfound++; - } else { /* If Obj found in machine (i.e. has not moved) */ - /* Check if Obj is locked by any previous transaction */ - if ((STATUS((objheader_t *)mobj) & LOCK) == LOCK) { - if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */ - v_matchlock++; - } else {/* If versions don't match ...HARD ABORT */ - v_nomatch++; - /* Send TRANS_DISAGREE to Coordinator */ - localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE; - } - } else {/* If Obj is not locked then lock object */ - STATUS(((objheader_t *)mobj)) |= LOCK; - /* Save all object oids that are locked on this machine during this transaction request call */ - oidlocked[objlocked] = OID(((objheader_t *)mobj)); - objlocked++; - if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */ - v_matchnolock++; - } else { /* If versions don't match ...HARD ABORT */ - v_nomatch++; - /* Send TRANS_DISAGREE to Coordinator */ - localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE; - } - } - } - } // End for - /* Condition to send TRANS_AGREE */ - if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) { - localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE; - } - /* Condition to send TRANS_SOFT_ABORT */ - if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) { - localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT; - } +/* This function handles the local objects involved in a transaction + * commiting process. It also makes a decision if this local machine + * sends AGREE or DISAGREE or SOFT_ABORT to coordinator. Note + * Coordinator = local machine It wakes up the other threads from + * remote participants that are waiting for the coordinator's decision + * and based on common agreement it either commits or aborts the + * transaction. It also frees the memory resources */ - /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process - * if Participant receives a TRANS_COMMIT */ - localtdata->transinfo->objlocked = oidlocked; - localtdata->transinfo->objnotfound = oidnotfound; - localtdata->transinfo->modptr = NULL; - localtdata->transinfo->numlocked = objlocked; - localtdata->transinfo->numnotfound = objnotfound; - /* Lock and update count */ - //Thread sleeps until all messages from pariticipants are received by coordinator - pthread_mutex_lock(localtdata->tdata->lock); - (*(localtdata->tdata->count))++; /* keeps track of no of messages received by the coordinator */ - - /* Wake up the threads and invoke decideResponse (once) */ - if(*(localtdata->tdata->count) == localtdata->tdata->buffer->f.mcount) { - decideResponse(localtdata->tdata); - pthread_cond_broadcast(localtdata->tdata->threshold); - } else { - pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock); - } - pthread_mutex_unlock(localtdata->tdata->lock); - if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){ - if(transAbortProcess(localtdata) != 0) { - printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__); - pthread_exit(NULL); - } - } else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT) { - if(transComProcess(localtdata) != 0) { - printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__); - pthread_exit(NULL); - } - } - /* Free memory */ - if (localtdata->transinfo->objlocked != NULL) { - free(localtdata->transinfo->objlocked); +void *handleLocalReq(void *threadarg) { + unsigned int *oidnotfound = NULL, *oidlocked = NULL; + local_thread_data_array_t *localtdata; + int objnotfound = 0, objlocked = 0; + int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0; + int numread, i; + unsigned int oid; + unsigned short version; + void *mobj; + objheader_t *headptr; + + localtdata = (local_thread_data_array_t *) threadarg; + + /* Counters and arrays to formulate decision on control message to be sent */ + oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int)); + oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int)); + + numread = localtdata->tdata->buffer->f.numread; + /* Process each oid in the machine pile/ group per thread */ + for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) { + if (i < localtdata->tdata->buffer->f.numread) { + int incr = sizeof(unsigned int) + sizeof(unsigned short);// Offset that points to next position in the objread array + incr *= i; + oid = *((unsigned int *)(((char *)localtdata->tdata->buffer->objread) + incr)); + version = *((unsigned short *)(((char *)localtdata->tdata->buffer->objread) + incr + sizeof(unsigned int))); + } else { // Objects Modified + int tmpsize; + headptr = (objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i-numread]); + if (headptr == NULL) { + printf("Error: handleLocalReq() returning NULL %s, %d\n", __FILE__, __LINE__); + return NULL; + } + oid = OID(headptr); + version = headptr->version; + } + /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */ + + /* Save the oids not found and number of oids not found for later use */ + if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */ + /* Save the oids not found and number of oids not found for later use */ + oidnotfound[objnotfound] = oid; + objnotfound++; + } else { /* If Obj found in machine (i.e. has not moved) */ + /* Check if Obj is locked by any previous transaction */ + if ((STATUS((objheader_t *)mobj) & LOCK) == LOCK) { + if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */ + v_matchlock++; + } else {/* If versions don't match ...HARD ABORT */ + v_nomatch++; + /* Send TRANS_DISAGREE to Coordinator */ + localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE; } - if (localtdata->transinfo->objnotfound != NULL) { - free(localtdata->transinfo->objnotfound); + } else {/* If Obj is not locked then lock object */ + STATUS(((objheader_t *)mobj)) |= LOCK; + /* Save all object oids that are locked on this machine during this transaction request call */ + oidlocked[objlocked] = OID(((objheader_t *)mobj)); + objlocked++; + if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */ + v_matchnolock++; + } else { /* If versions don't match ...HARD ABORT */ + v_nomatch++; + /* Send TRANS_DISAGREE to Coordinator */ + localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE; } - - pthread_exit(NULL); + } + } + } // End for + /* Condition to send TRANS_AGREE */ + if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) { + localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE; + } + /* Condition to send TRANS_SOFT_ABORT */ + if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) { + localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT; + } + + /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process + * if Participant receives a TRANS_COMMIT */ + localtdata->transinfo->objlocked = oidlocked; + localtdata->transinfo->objnotfound = oidnotfound; + localtdata->transinfo->modptr = NULL; + localtdata->transinfo->numlocked = objlocked; + localtdata->transinfo->numnotfound = objnotfound; + /* Lock and update count */ + //Thread sleeps until all messages from pariticipants are received by coordinator + pthread_mutex_lock(localtdata->tdata->lock); + (*(localtdata->tdata->count))++; /* keeps track of no of messages received by the coordinator */ + + /* Wake up the threads and invoke decideResponse (once) */ + if(*(localtdata->tdata->count) == localtdata->tdata->buffer->f.mcount) { + decideResponse(localtdata->tdata); + pthread_cond_broadcast(localtdata->tdata->threshold); + } else { + pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock); + } + pthread_mutex_unlock(localtdata->tdata->lock); + if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){ + if(transAbortProcess(localtdata) != 0) { + printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__); + pthread_exit(NULL); + } + } else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT) { + if(transComProcess(localtdata) != 0) { + printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__); + pthread_exit(NULL); + } + } + /* Free memory */ + if (localtdata->transinfo->objlocked != NULL) { + free(localtdata->transinfo->objlocked); + } + if (localtdata->transinfo->objnotfound != NULL) { + free(localtdata->transinfo->objnotfound); + } + + pthread_exit(NULL); } /* This function completes the ABORT process if the transaction is aborting */ int transAbortProcess(local_thread_data_array_t *localtdata) { - int i, numlocked; - unsigned int *objlocked; - void *header; - - numlocked = localtdata->transinfo->numlocked; - objlocked = localtdata->transinfo->objlocked; - - for (i = 0; i < numlocked; i++) { - if((header = mhashSearch(objlocked[i])) == NULL) { - printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); - return 1; - } - STATUS(((objheader_t *)header)) &= ~(LOCK); - } - - return 0; + int i, numlocked; + unsigned int *objlocked; + void *header; + + numlocked = localtdata->transinfo->numlocked; + objlocked = localtdata->transinfo->objlocked; + + for (i = 0; i < numlocked; i++) { + if((header = mhashSearch(objlocked[i])) == NULL) { + printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); + return 1; + } + STATUS(((objheader_t *)header)) &= ~(LOCK); + } + + return 0; } /*This function completes the COMMIT process is the transaction is commiting*/ @@ -1075,371 +1010,157 @@ int transComProcess(local_thread_data_array_t *localtdata) { return 0; } -/* This function checks if the prefetch oids are same and have same offsets - * for case x.a.b and y.a.b where x and y have same oid's - * or if a.b.c is a subset of x.b.c.d*/ -/* check for case where the generated request a.y.z or x.y.z.g then - * prefetch needs to be generated for x.y.z.g if oid of a and x are same*/ -void checkPrefetchTuples(prefetchqelem_t *node) { - int i,j, count,k, sindex, index; - char *ptr, *tmp; - int ntuples, slength; - unsigned int *oid; - unsigned short *endoffsets; - short *offsets; - - /* Check for the case x.y.z and a.b.c are same oids */ - ptr = (char *) node; - ntuples = *(GET_NTUPLES(ptr)); - oid = GET_PTR_OID(ptr); - endoffsets = GET_PTR_EOFF(ptr, ntuples); - offsets = GET_PTR_ARRYFLD(ptr, ntuples); - - /* Find offset length for each tuple */ - int numoffset[ntuples]; - numoffset[0] = endoffsets[0]; - for(i = 1; i numoffset[j]){ - slength = numoffset[j]; - sindex = j; - } - else { - slength = numoffset[i]; - sindex = i; - } - - /* Compare the offset values based on the current indices - * break if they do not match - * if all offset values match then pick the largest tuple*/ - - if(i == 0) { - k = 0; - } else { - k = endoffsets[i-1]; - } - index = endoffsets[j -1]; - for(count = 0; count < slength; count ++) { - if (offsets[k] != offsets[index]) { - break; - } - index++; - k++; - } - if(slength == count) { - oid[sindex] = 0; - } - } - } - } +prefetchpile_t *foundLocal(prefetchqelem_t *node) { + char * ptr = (char *) node; + int ntuples = *(GET_NTUPLES(ptr)); + unsigned int * oidarray = GET_PTR_OID(ptr); + unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples); + short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples); + prefetchpile_t * head=NULL; + + int i; + for(i=0;inext != NULL) { - if((tmp = (prefetchpile_t *) calloc(1, sizeof(prefetchpile_t))) == NULL) { - printf("Calloc error: %s %d\n", __FILE__, __LINE__); - return NULL; - } - tmp->mid = myIpAddr; - tmp->next = head; - head = tmp; - } else { - head->mid = myIpAddr; - } - continue; - } - /* For each tuple make piles */ - if ((machinenum = lhashSearch(oid[i])) == 0) { - printf("Error: No such Machine %s, %d\n", __FILE__, __LINE__); - return NULL; - } - /* Insert into machine pile */ - if(i == 0){ - offset = &arryfields[0]; - } else { - offset = &arryfields[endoffsets[i-1]]; - } - if((head = insertPile(machinenum, oid[i], numoffset[i], offset, head)) == NULL){ - printf("Error: Couldn't create a pile %s, %d\n", __FILE__, __LINE__); - return NULL; - } - } - - return head; +int checkoid(unsigned int oid) { + objheader_t *header; + if ((header=mhashSearch(oid))!=NULL) { + //Found on machine + return 1; + } else if ((header=prehashSearch(oid))!=NULL) { + //Found in cache + return 1; + } else { + return 0; + } } -prefetchpile_t *foundLocal(prefetchqelem_t *node) { - int ntuples,i, j, k, oidnfound = 0, arryfieldindex,nextarryfieldindex, flag = 0, val; - unsigned int *oid; - int isArray; - char *ptr, *tmp; - objheader_t *objheader; - unsigned short *endoffsets; - short *arryfields; - - ptr = (char *) node; - ntuples = *(GET_NTUPLES(ptr)); - oid = GET_PTR_OID(ptr); - endoffsets = GET_PTR_EOFF(ptr, ntuples); - arryfields = GET_PTR_ARRYFLD(ptr, ntuples); - - /* Find offset length for each tuple */ - int numoffset[ntuples];//Number of offsets for each tuple - numoffset[0] = endoffsets[0]; - for(i = 1; i NUMCLASSES) { - isArray = 1; - } - if(isArray == 1) { - int elementsize = classsize[TYPE(objheader)]; - struct ArrayObject *ao = (struct ArrayObject *) (tmp + sizeof(objheader_t)); - int length = ao->___length___; - /* Check if array out of bounds */ - if(arryfields[arryfieldindex] < 0 || arryfields[arryfieldindex] >= length) { - break; //if yes then treat the object as found - } - objoid = *((unsigned int *)(tmp + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*arryfields[arryfieldindex]))); - } else { - objoid = *((unsigned int *)(tmp + sizeof(objheader_t) + arryfields[arryfieldindex])); - } - //Update numoffset array - numoffset[i] = numoffset[i] - 1; - //Update oid array - oid[i] = objoid; - //Update endoffset array - endoffsets[0] = val = numoffset[0]; - for(k = 1; k < ntuples; k++) { - val = val + numoffset[k]; - endoffsets[k] = val; - } - //Update arrayfields array - for(k = 0; k < endoffsets[ntuples-1]; k++) { - arryfields[arryfieldindex+k] = arryfields[arryfieldindex+k+1]; - } - if((objheader = (objheader_t*) mhashSearch(oid[i])) == NULL) { - flag = 1; - checkPreCache(node, numoffset, oid[i], i); - break; - } - tmp = (char *) objheader; - isArray = 0; - } - /*If all offset oids are found locally,make the prefetch tuple invalid */ - if(flag == 0) { - oid[i] = 0; - } - } else { - /* Look in Prefetch cache */ - checkPreCache(node, numoffset, oid[i],i); - } - flag = 0; - } - - /* Make machine groups */ - prefetchpile_t *head = NULL; - if((head = makePreGroups(node, numoffset)) == NULL) { - printf("Error in makePreGroups() %s %d\n", __FILE__, __LINE__); - return NULL; - } - - return head; -} +int lookupObject(unsigned int * oid, short offset) { + objheader_t *header; + if ((header=mhashSearch(*oid))!=NULL) { + //Found on machine + ; + } else if ((header=prehashSearch(*oid))!=NULL) { + //Found in cache + ; + } else { + return 0; + } -void checkPreCache(prefetchqelem_t *node, int *numoffset, unsigned int objoid, int index) { - char *ptr, *tmp; - int ntuples, i, k, flag=0, isArray =0, arryfieldindex, val; - unsigned int * oid; - unsigned short *endoffsets; - short *arryfields; - objheader_t *header; - - ptr = (char *) node; - ntuples = *(GET_NTUPLES(ptr)); - oid = GET_PTR_OID(ptr); - endoffsets = GET_PTR_EOFF(ptr, ntuples); - arryfields = GET_PTR_ARRYFLD(ptr, ntuples); - - if((header = (objheader_t *) prehashSearch(objoid)) == NULL) { - return; - } else { //Found in Prefetch Cache - //TODO Decide if object is too old, if old remove from cache - tmp = (char *) header; - int loopcount = numoffset[index]; - if(index == 0) - arryfieldindex = 0; - else - arryfieldindex = endoffsets[(index - 1)]; - // Check if any of the offset oid is available in the Prefetch cache - for(i = 0; i < loopcount; i++) { - /* Check for arrays */ - if(TYPE(header) > NUMCLASSES) { - isArray = 1; - } - - if(isArray == 1) { - int elementsize = classsize[TYPE(header)]; - struct ArrayObject *ao = (struct ArrayObject *) (tmp + sizeof(objheader_t)); - int length = ao->___length___; - /* Check if array out of bounds */ - if(arryfields[arryfieldindex] < 0 || arryfields[arryfieldindex] >= length) { - break; //if yes treat the object as found - } - objoid = *((unsigned int *)(tmp + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*arryfields[arryfieldindex]))); - } else { - objoid = *((unsigned int *)(tmp + sizeof(objheader_t) + arryfields[arryfieldindex])); - } - //Update numoffset array - numoffset[index] = numoffset[index] - 1; - //Update oid array - oid[index] = objoid; - //Update endoffset array - endoffsets[0] = val = numoffset[0]; - for(k = 1; k < ntuples; k++) { - val = val + numoffset[k]; - endoffsets[k] = val; - } - //Update arrayfields array - for(k = 0; k < endoffsets[ntuples-1]; k++) { - arryfields[arryfieldindex+k] = arryfields[arryfieldindex+k+1]; - } - if((header = (objheader_t *)prehashSearch(oid[index])) != NULL) { - tmp = (char *) header; - isArray = 0; - } else { - flag = 1; - break; - } - } - } - //Found in the prefetch cache - if(flag == 0 && (numoffset[index] == 0)) { - oid[index] = 0; - } + if(TYPE(header) > NUMCLASSES) { + int elementsize = classsize[TYPE(header)]; + struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t)); + int length = ao->___length___; + /* Check if array out of bounds */ + if(offset < 0 || offset >= length) { + //if yes treat the object as found + (*oid)=0; + return 1; + } + (*oid) = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*offset))); + return 1; + } else { + (*oid) = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset)); + return 1; + } } - /* This function is called by the thread calling transPrefetch */ void *transPrefetch(void *t) { while(1) { /* lock mutex of primary prefetch queue */ pthread_mutex_lock(&pqueue.qlock); /* while primary queue is empty, then wait */ - while((pqueue.front == NULL) && (pqueue.rear == NULL)) { + while(pqueue.front == NULL) { pthread_cond_wait(&pqueue.qcond, &pqueue.qlock); } /* dequeue node to create a machine piles and finally unlock mutex */ - prefetchqelem_t *qnode; - if((qnode = pre_dequeue()) == NULL) { - printf("Error: No node returned %s, %d\n", __FILE__, __LINE__); - pthread_mutex_unlock(&pqueue.qlock); - continue; - } + prefetchqelem_t *qnode = pre_dequeue(); pthread_mutex_unlock(&pqueue.qlock); - /* Reduce redundant prefetch requests */ - checkPrefetchTuples(qnode); /* Check if the tuples are found locally, if yes then reduce them further*/ /* and group requests by remote machine ids by calling the makePreGroups() */ prefetchpile_t *pilehead = foundLocal(qnode); - - // Get sock from shared pool - int sd = getSock2(transPrefetchSockPool, pilehead->mid); - - /* Send Prefetch Request */ - prefetchpile_t *ptr = pilehead; - while(ptr != NULL) { - sendPrefetchReq(ptr, sd); - ptr = ptr->next; + + if (pilehead!=NULL) { + // Get sock from shared pool + int sd = getSock2(transPrefetchSockPool, pilehead->mid); + + /* Send Prefetch Request */ + prefetchpile_t *ptr = pilehead; + while(ptr != NULL) { + sendPrefetchReq(ptr, sd); + ptr = ptr->next; + } + + /* Release socket */ + // freeSock(transPrefetchSockPool, pilehead->mid, sd); + + /* Deallocated pilehead */ + mcdealloc(pilehead); + } - - /* Release socket */ - // freeSock(transPrefetchSockPool, pilehead->mid, sd); - - /* Deallocated pilehead */ - mcdealloc(pilehead); - // Deallocate the prefetch queue pile node predealloc(qnode); } } +void sendPrefetchReqnew(prefetchpile_t *mcpilenode, int sd) { + objpile_t *tmp; + + int size=sizeof(char)+sizeof(int); + for(tmp=mcpilenode->objpiles;tmp!=NULL;tmp=tmp->next) { + size += sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short)); + } + + char buft[size]; + char *buf=buft; + *buf=TRANS_PREFETCH; + buf+=sizeof(char); + + for(tmp=mcpilenode->objpiles;tmp!=NULL;tmp=tmp->next) { + int len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short)); + *((int*)buf)=len; + buf+=sizeof(int); + *((unsigned int *)buf)=tmp->oid; + buf+=sizeof(unsigned int); + *((unsigned int *)(buf)) = myIpAddr; + buf+=sizeof(unsigned int); + memcpy(buf, tmp->offset, tmp->numoffset*sizeof(short)); + buf+=tmp->numoffset*sizeof(short); + } + *((int *)buf)=-1; + send_data(sd, buft, size); + return; +} + void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) { - int off, len, endpair, count = 0; + int len, endpair; char control; objpile_t *tmp; @@ -1450,22 +1171,16 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) { /* Send Oids and offsets in pairs */ tmp = mcpilenode->objpiles; while(tmp != NULL) { - off = 0; - count++; /* Keeps track of the number of oid and offset tuples sent per remote machine */ len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short)); char oidnoffset[len]; - bzero(oidnoffset, len); - *((int*)oidnoffset) = len; - off = sizeof(int); - *((unsigned int *)(oidnoffset + off)) = tmp->oid; - off += sizeof(unsigned int); - *((unsigned int *)(oidnoffset + off)) = myIpAddr; - off += sizeof(unsigned int); - int i; - for(i = 0; i < tmp->numoffset; i++) { - *((short*)(oidnoffset + off)) = tmp->offset[i]; - off+=sizeof(short); - } + char *buf=oidnoffset; + *((int*)buf) = len; + buf+=sizeof(int); + *((unsigned int *)buf) = tmp->oid; + buf+=sizeof(unsigned int); + *((unsigned int *)buf) = myIpAddr; + buf += sizeof(unsigned int); + memcpy(buf, tmp->offset, tmp->numoffset*sizeof(short)); send_data(sd, oidnoffset, len); tmp = tmp->next; } @@ -1478,91 +1193,84 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) { } int getPrefetchResponse(int sd) { - int numbytes = 0, length = 0, size = 0; - char *recvbuffer, control; - unsigned int oid; - void *modptr, *oldptr; - - recv_data((int)sd, &length, sizeof(int)); - size = length - sizeof(int); - if((recvbuffer = calloc(1, size)) == NULL) { - printf("%s() Calloc error at %s,%d\n", __func__, __FILE__, __LINE__); - return -1; - } - - recv_data((int)sd, recvbuffer, size); - - control = *((char *) recvbuffer); - if(control == OBJECT_FOUND) { - numbytes = 0; - oid = *((unsigned int *)(recvbuffer + sizeof(char))); - size = size - (sizeof(char) + sizeof(unsigned int)); - pthread_mutex_lock(&prefetchcache_mutex); - if ((modptr = objstrAlloc(prefetchcache, size)) == NULL) { - printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__); - pthread_mutex_unlock(&prefetchcache_mutex); - free(recvbuffer); - return -1; - } - pthread_mutex_unlock(&prefetchcache_mutex); - memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size); - - /* Insert the oid and its address into the prefetch hash lookup table */ - /* Do a version comparison if the oid exists */ - if((oldptr = prehashSearch(oid)) != NULL) { - /* If older version then update with new object ptr */ - if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) { - prehashRemove(oid); - prehashInsert(oid, modptr); - } else { - /* TODO modptr should be reference counted */ - } - } else {/* Else add the object ptr to hash table*/ - prehashInsert(oid, modptr); - } - /* Lock the Prefetch Cache look up table*/ - pthread_mutex_lock(&pflookup.lock); - /* Broadcast signal on prefetch cache condition variable */ - pthread_cond_broadcast(&pflookup.cond); - /* Unlock the Prefetch Cache look up table*/ - pthread_mutex_unlock(&pflookup.lock); - } else if(control == OBJECT_NOT_FOUND) { - oid = *((unsigned int *)(recvbuffer + sizeof(char))); - /* TODO: For each object not found query DHT for new location and retrieve the object */ - /* Throw an error */ - printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid); - free(recvbuffer); - exit(-1); - } else { - printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__); - } - - free(recvbuffer); - - return 0; + int numbytes = 0, length = 0, size = 0; + char *recvbuffer, control; + unsigned int oid; + void *modptr, *oldptr; + + recv_data((int)sd, &length, sizeof(int)); + size = length - sizeof(int); + recvbuffer = calloc(1, size); + + recv_data((int)sd, recvbuffer, size); + + control = *((char *) recvbuffer); + if(control == OBJECT_FOUND) { + numbytes = 0; + oid = *((unsigned int *)(recvbuffer + sizeof(char))); + size = size - (sizeof(char) + sizeof(unsigned int)); + pthread_mutex_lock(&prefetchcache_mutex); + if ((modptr = objstrAlloc(prefetchcache, size)) == NULL) { + printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__); + pthread_mutex_unlock(&prefetchcache_mutex); + free(recvbuffer); + return -1; + } + pthread_mutex_unlock(&prefetchcache_mutex); + memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size); + + /* Insert the oid and its address into the prefetch hash lookup table */ + /* Do a version comparison if the oid exists */ + if((oldptr = prehashSearch(oid)) != NULL) { + /* If older version then update with new object ptr */ + if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) { + prehashRemove(oid); + prehashInsert(oid, modptr); + } else { + /* TODO modptr should be reference counted */ + } + } else {/* Else add the object ptr to hash table*/ + prehashInsert(oid, modptr); + } + /* Lock the Prefetch Cache look up table*/ + pthread_mutex_lock(&pflookup.lock); + /* Broadcast signal on prefetch cache condition variable */ + pthread_cond_broadcast(&pflookup.cond); + /* Unlock the Prefetch Cache look up table*/ + pthread_mutex_unlock(&pflookup.lock); + } else if(control == OBJECT_NOT_FOUND) { + oid = *((unsigned int *)(recvbuffer + sizeof(char))); + /* TODO: For each object not found query DHT for new location and retrieve the object */ + /* Throw an error */ + printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid); + free(recvbuffer); + exit(-1); + } else { + printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__); + } + + free(recvbuffer); + + return 0; } -unsigned short getObjType(unsigned int oid) -{ - objheader_t *objheader; - unsigned short numoffset[] ={0}; - short fieldoffset[] ={}; - - if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL) - { - if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) - { - prefetch(1, &oid, numoffset, fieldoffset); - pthread_mutex_lock(&pflookup.lock); - while ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) - { - pthread_cond_wait(&pflookup.cond, &pflookup.lock); - } - pthread_mutex_unlock(&pflookup.lock); - } +unsigned short getObjType(unsigned int oid) { + objheader_t *objheader; + unsigned short numoffset[] ={0}; + short fieldoffset[] ={}; + + if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL) { + if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) { + prefetch(1, &oid, numoffset, fieldoffset); + pthread_mutex_lock(&pflookup.lock); + while ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) { + pthread_cond_wait(&pflookup.cond, &pflookup.lock); } - - return TYPE(objheader); + pthread_mutex_unlock(&pflookup.lock); + } + } + + return TYPE(objheader); } int startRemoteThread(unsigned int oid, unsigned int mid) -- 2.34.1