From 6cef6c1ded0405ccce889ce6f340953aacc26116 Mon Sep 17 00:00:00 2001 From: bdemsky Date: Sat, 3 May 2008 20:07:04 +0000 Subject: [PATCH] various changes... bug fix for transactions with a large number of objects --- Robust/src/Runtime/DSTM/interface/dstm.h | 10 +- .../src/Runtime/DSTM/interface/dstmserver.c | 227 +++++----- Robust/src/Runtime/DSTM/interface/objstr.c | 91 ++-- Robust/src/Runtime/DSTM/interface/plookup.c | 125 +++--- Robust/src/Runtime/DSTM/interface/plookup.h | 6 +- Robust/src/Runtime/DSTM/interface/prelookup.c | 364 +++++++-------- Robust/src/Runtime/DSTM/interface/sockpool.c | 4 +- Robust/src/Runtime/DSTM/interface/sockpool.h | 2 + Robust/src/Runtime/DSTM/interface/trans.c | 422 +++++++++--------- 9 files changed, 614 insertions(+), 637 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index 4b5b9e4b..6311e213 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -99,6 +99,9 @@ typedef struct objheader { #define STATUS(x)\ *((unsigned int *) &(((struct ___Object___ *)((unsigned int) x + sizeof(objheader_t)))->___localcopy___)) +#define STATUSPTR(x)\ + ((unsigned int *) &(((struct ___Object___ *)((unsigned int) x + sizeof(objheader_t)))->___localcopy___)) + #define TYPE(x)\ ((struct ___Object___ *)((unsigned int) x + sizeof(objheader_t)))->type @@ -125,6 +128,7 @@ typedef struct objheader { #define OID(x) x->oid #define TYPE(x) x->type #define STATUS(x) x->status +#define STATUSPTR(x) &x->status #define GETSIZE(size, x) size=classsize[TYPE(x)] #endif @@ -158,9 +162,9 @@ typedef struct fixed_data { char control; /* control message */ char trans_id[TID_LEN]; /* transaction id */ int mcount; /* participant count */ - short numread; /* no of objects read */ - short nummod; /* no of objects modified */ - short numcreated; /* no of objects created */ + unsigned int numread; /* no of objects read */ + unsigned int nummod; /* no of objects modified */ + unsigned int numcreated; /* no of objects created */ int sum_bytes; /* total bytes of modified objects in a transaction */ } fixed_data_t; diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index b5bcac5b..2674a911 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -365,7 +365,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);// find the header address return 1; } - STATUS(((objheader_t *)header)) &= ~(LOCK); + UnLock(STATUSPTR(header)); } /* Send ack to Coordinator */ @@ -454,9 +454,8 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne objnotfound++; } else { /* If Obj found in machine (i.e. has not moved) */ /* Check if Obj is locked by any previous transaction */ - pthread_mutex_lock(&lockObjHeader); - if ((STATUS(((objheader_t *)mobj)) & LOCK) == LOCK) { - pthread_mutex_unlock(&lockObjHeader); + if (test_and_set(STATUSPTR(mobj))) { + //don't have lock if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */ v_matchlock++; } else {/* If versions don't match ...HARD ABORT */ @@ -469,7 +468,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); return 0; } - STATUS(headptr) &= ~(LOCK); + UnLock(STATUSPTR(headptr)); } free(oidlocked); } @@ -477,8 +476,6 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne return control; } } else {/* If Obj is not locked then lock object */ - STATUS(((objheader_t *)mobj)) |= LOCK; - pthread_mutex_unlock(&lockObjHeader); /* Save all object oids that are locked on this machine during this transaction request call */ oidlocked[objlocked] = OID(((objheader_t *)mobj)); objlocked++; @@ -493,7 +490,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); return 0; } - STATUS(headptr) &= ~(LOCK); + UnLock(STATUSPTR(headptr)); } free(oidlocked); } @@ -562,47 +559,45 @@ char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int * addresses in lookup table and also changes version number * Sends an ACK back to Coordinator */ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlocked, int nummod, int numlocked, int acceptfd) { - objheader_t *header; - objheader_t *newheader; - int i = 0, offset = 0; - char control; - int tmpsize; - - /* Process each modified object saved in the mainobject store */ - for(i = 0; i < nummod; i++) { - if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) { - printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); - return 1; - } - GETSIZE(tmpsize,header); - memcpy((char*)header + sizeof(objheader_t), ((char *)modptr + sizeof(objheader_t) + offset), tmpsize); - header->version += 1; - /* If threads are waiting on this object to be updated, notify them */ - pthread_mutex_lock(¬ifymutex); - if(header->notifylist != NULL) { - notifyAll(&header->notifylist, OID(header), header->version); - } - pthread_mutex_unlock(¬ifymutex); - offset += sizeof(objheader_t) + tmpsize; - } - - if (nummod > 0) - free(modptr); - - /* Unlock locked objects */ - for(i = 0; i < numlocked; i++) { - if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) { - printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); - return 1; - } - STATUS(header) &= ~(LOCK); - } - //TODO Update location lookup table - - /* Send ack to coordinator */ - control = TRANS_SUCESSFUL; - send_data((int)acceptfd, &control, sizeof(char)); - return 0; + objheader_t *header; + objheader_t *newheader; + int i = 0, offset = 0; + char control; + int tmpsize; + + /* Process each modified object saved in the mainobject store */ + for(i = 0; i < nummod; i++) { + if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) { + printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); + return 1; + } + GETSIZE(tmpsize,header); + memcpy((char*)header + sizeof(objheader_t), ((char *)modptr + sizeof(objheader_t) + offset), tmpsize); + header->version += 1; + /* If threads are waiting on this object to be updated, notify them */ + if(header->notifylist != NULL) { + notifyAll(&header->notifylist, OID(header), header->version); + } + offset += sizeof(objheader_t) + tmpsize; + } + + if (nummod > 0) + free(modptr); + + /* Unlock locked objects */ + for(i = 0; i < numlocked; i++) { + if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) { + printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); + return 1; + } + UnLock(STATUSPTR(header)); + } + //TODO Update location lookup table + + /* Send ack to coordinator */ + control = TRANS_SUCESSFUL; + send_data((int)acceptfd, &control, sizeof(char)); + return 0; } /* This function recevies the oid and offset tuples from the Coordinator's prefetch call. @@ -725,74 +720,70 @@ void sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size) { } void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int threadid) { - objheader_t *header; - unsigned int oid; - unsigned short newversion; - char msg[1+ 2 * sizeof(unsigned int) + sizeof(unsigned short)]; - int sd; - struct sockaddr_in remoteAddr; - int bytesSent; - int size; - int i = 0; - - while(i < numoid) { - oid = *(oidarry + i); - if((header = (objheader_t *) mhashSearch(oid)) == NULL) { - printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); - return; - } else { - /* Check to see if versions are same */ -checkversion: - if ((STATUS(header) & LOCK) != LOCK) { - pthread_mutex_lock(¬ifymutex); - STATUS(header) |= LOCK; - newversion = header->version; - if(newversion == *(versionarry + i)) { - //Add to the notify list - if((header->notifylist = insNode(header->notifylist, threadid, mid)) == NULL) { - printf("Error: Obj notify list points to NULL %s, %d\n", __FILE__, __LINE__); - pthread_mutex_unlock(¬ifymutex); - return; - } - STATUS(header) &= ~(LOCK); - pthread_mutex_unlock(¬ifymutex); - } else { - STATUS(header) &= ~(LOCK); - pthread_mutex_unlock(¬ifymutex); - if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){ - perror("processReqNotify():socket()"); - return; - } - bzero(&remoteAddr, sizeof(remoteAddr)); - remoteAddr.sin_family = AF_INET; - remoteAddr.sin_port = htons(LISTEN_PORT); - remoteAddr.sin_addr.s_addr = htonl(mid); - - if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){ - printf("Error: processReqNotify():error %d connecting to %s:%d\n", errno, - inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT); - close(sd); - return; - } else { - //Send Update notification - msg[0] = THREAD_NOTIFY_RESPONSE; - *((unsigned int *)&msg[1]) = oid; - size = sizeof(unsigned int); - *((unsigned short *)(&msg[1]+size)) = newversion; - size += sizeof(unsigned short); - *((unsigned int *)(&msg[1]+size)) = threadid; - size = 1+ 2*sizeof(unsigned int) + sizeof(unsigned short); - send_data(sd, msg, size); - } - close(sd); - } - } else { - randomdelay(); - goto checkversion; - } - } - i++; + objheader_t *header; + unsigned int oid; + unsigned short newversion; + char msg[1+ 2 * sizeof(unsigned int) + sizeof(unsigned short)]; + int sd; + struct sockaddr_in remoteAddr; + int bytesSent; + int size; + int i = 0; + + while(i < numoid) { + oid = *(oidarry + i); + if((header = (objheader_t *) mhashSearch(oid)) == NULL) { + printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); + return; + } else { + /* Check to see if versions are same */ + checkversion: + if (test_and_set(STATUSPTR(header))==0) { + //have lock + newversion = header->version; + if(newversion == *(versionarry + i)) { + //Add to the notify list + if((header->notifylist = insNode(header->notifylist, threadid, mid)) == NULL) { + printf("Error: Obj notify list points to NULL %s, %d\n", __FILE__, __LINE__); + return; + } + UnLock(STATUSPTR(header)); + } else { + UnLock(STATUSPTR(header)); + if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){ + perror("processReqNotify():socket()"); + return; + } + bzero(&remoteAddr, sizeof(remoteAddr)); + remoteAddr.sin_family = AF_INET; + remoteAddr.sin_port = htons(LISTEN_PORT); + remoteAddr.sin_addr.s_addr = htonl(mid); + + if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){ + printf("Error: processReqNotify():error %d connecting to %s:%d\n", errno, + inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT); + close(sd); + return; + } else { + //Send Update notification + msg[0] = THREAD_NOTIFY_RESPONSE; + *((unsigned int *)&msg[1]) = oid; + size = sizeof(unsigned int); + *((unsigned short *)(&msg[1]+size)) = newversion; + size += sizeof(unsigned short); + *((unsigned int *)(&msg[1]+size)) = threadid; + size = 1+ 2*sizeof(unsigned int) + sizeof(unsigned short); + send_data(sd, msg, size); + } + close(sd); } - free(oidarry); - free(versionarry); + } else { + randomdelay(); + goto checkversion; + } + } + i++; + } + free(oidarry); + free(versionarry); } diff --git a/Robust/src/Runtime/DSTM/interface/objstr.c b/Robust/src/Runtime/DSTM/interface/objstr.c index b8279487..e7caec56 100644 --- a/Robust/src/Runtime/DSTM/interface/objstr.c +++ b/Robust/src/Runtime/DSTM/interface/objstr.c @@ -14,59 +14,46 @@ objstr_t *objstrCreate(unsigned int size) { } //free entire list, starting at store -void objstrDelete(objstr_t *store) -{ - objstr_t *tmp; - while (store != NULL) - { - tmp = store->next; - free(store); - store = tmp; - } - return; +void objstrDelete(objstr_t *store) { + objstr_t *tmp; + while (store != NULL) { + tmp = store->next; + free(store); + store = tmp; + } + return; } -void *objstrAlloc(objstr_t *store, unsigned int size) -{ - void *tmp; - while (1) - { - if (((unsigned int)store->top - (unsigned int)store - sizeof(objstr_t) + size) <= store->size) - { //store not full - tmp = store->top; - store->top += size; - return tmp; - } - //store full - if (store->next == NULL) - { //end of list, all full - if (size > DEFAULT_OBJ_STORE_SIZE) //in case of large objects - { - if((store->next = (objstr_t *)calloc(1,(sizeof(objstr_t) + size))) == NULL) { - printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__); - return NULL; - } - if (store->next == NULL) - return NULL; - store = store->next; - store->size = size; - } - else - { - if((store->next = calloc(1,(sizeof(objstr_t) + DEFAULT_OBJ_STORE_SIZE))) == NULL) { - printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__); - return NULL; - } - if (store->next == NULL) - return NULL; - store = store->next; - store->next = NULL; - store->size = DEFAULT_OBJ_STORE_SIZE; - } - store->top = (void *)((unsigned int)store + sizeof(objstr_t) + size); - return (void *)((unsigned int)store + sizeof(objstr_t)); - } - else //try the next one - store = store->next; +void *objstrAlloc(objstr_t *store, unsigned int size) { + void *tmp; + while (1) { + if (((unsigned int)store->top - (((unsigned int)store) + sizeof(objstr_t)) + size) <= store->size) { //store not full + tmp = store->top; + store->top += size; + return tmp; + } + //store full + if (store->next == NULL) { + //end of list, all full + if (size > DEFAULT_OBJ_STORE_SIZE) { + //in case of large objects + if((store->next = (objstr_t *)calloc(1,(sizeof(objstr_t) + size))) == NULL) { + printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__); + return NULL; } + store = store->next; + store->size = size; + } else { + if((store->next = calloc(1,(sizeof(objstr_t) + DEFAULT_OBJ_STORE_SIZE))) == NULL) { + printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__); + return NULL; + } + store = store->next; + store->size = DEFAULT_OBJ_STORE_SIZE; + } + store->top = (void *)(((unsigned int)store) + sizeof(objstr_t) + size); + return (void *)(((unsigned int)store) + sizeof(objstr_t)); + } else + store = store->next; + } } diff --git a/Robust/src/Runtime/DSTM/interface/plookup.c b/Robust/src/Runtime/DSTM/interface/plookup.c index 7672ee0e..aafebf0a 100644 --- a/Robust/src/Runtime/DSTM/interface/plookup.c +++ b/Robust/src/Runtime/DSTM/interface/plookup.c @@ -43,69 +43,68 @@ plistnode_t *pCreate(int objects) { /* This function inserts necessary information into * a machine pile data structure */ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) { - plistnode_t *ptr, *tmp; - int found = 0, offset = 0; - - tmp = pile; - //Add oid into a machine that is already present in the pile linked list structure - while(tmp != NULL) { - if (tmp->mid == mid) { - int tmpsize; - - if (STATUS(headeraddr) & NEW) { - tmp->oidcreated[tmp->numcreated] = OID(headeraddr); - tmp->numcreated = tmp->numcreated + 1; - GETSIZE(tmpsize, headeraddr); - tmp->sum_bytes += sizeof(objheader_t) + tmpsize; - }else if (STATUS(headeraddr) & DIRTY) { - tmp->oidmod[tmp->nummod] = OID(headeraddr); - tmp->nummod = tmp->nummod + 1; - GETSIZE(tmpsize, headeraddr); - tmp->sum_bytes += sizeof(objheader_t) + tmpsize; - } else { - offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread; - *((unsigned int *)(((char *)tmp->objread) + offset))=OID(headeraddr); - offset += sizeof(unsigned int); - *((short *)(((char *)tmp->objread) + offset)) = headeraddr->version; - tmp->numread = tmp->numread + 1; - } - found = 1; - break; - } - tmp = tmp->next; - } - //Add oid for any new machine - if (!found) { - int tmpsize; - if((ptr = pCreate(num_objs)) == NULL) { - return NULL; - } - ptr->mid = mid; - if (STATUS(headeraddr) & NEW) { - ptr->oidcreated[ptr->numcreated] = OID(headeraddr); - ptr->numcreated = ptr->numcreated + 1; - GETSIZE(tmpsize, headeraddr); - ptr->sum_bytes += sizeof(objheader_t) + tmpsize; - } else if (STATUS(headeraddr) & DIRTY) { - ptr->oidmod[ptr->nummod] = OID(headeraddr); - ptr->nummod = ptr->nummod + 1; - GETSIZE(tmpsize, headeraddr); - ptr->sum_bytes += sizeof(objheader_t) + tmpsize; - } else { - *((unsigned int *)ptr->objread)=OID(headeraddr); - offset = sizeof(unsigned int); - *((short *)(((char *)ptr->objread) + offset)) = headeraddr->version; - ptr->numread = ptr->numread + 1; - } - ptr->next = pile; - pile = ptr; - } - - /* Clear Flags */ - STATUS(headeraddr) &= ~NEW; - STATUS(headeraddr) &= ~DIRTY; - - return pile; + plistnode_t *ptr, *tmp; + int found = 0, offset = 0; + + tmp = pile; + //Add oid into a machine that is already present in the pile linked list structure + while(tmp != NULL) { + if (tmp->mid == mid) { + int tmpsize; + + if (STATUS(headeraddr) & NEW) { + tmp->oidcreated[tmp->numcreated] = OID(headeraddr); + tmp->numcreated++; + GETSIZE(tmpsize, headeraddr); + tmp->sum_bytes += sizeof(objheader_t) + tmpsize; + }else if (STATUS(headeraddr) & DIRTY) { + tmp->oidmod[tmp->nummod] = OID(headeraddr); + tmp->nummod++; + GETSIZE(tmpsize, headeraddr); + tmp->sum_bytes += sizeof(objheader_t) + tmpsize; + } else { + offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread; + *((unsigned int *)(((char *)tmp->objread) + offset))=OID(headeraddr); + offset += sizeof(unsigned int); + *((short *)(((char *)tmp->objread) + offset)) = headeraddr->version; + tmp->numread ++; + } + found = 1; + break; + } + tmp = tmp->next; + } + //Add oid for any new machine + if (!found) { + int tmpsize; + if((ptr = pCreate(num_objs)) == NULL) { + return NULL; + } + ptr->mid = mid; + if (STATUS(headeraddr) & NEW) { + ptr->oidcreated[ptr->numcreated] = OID(headeraddr); + ptr->numcreated ++; + GETSIZE(tmpsize, headeraddr); + ptr->sum_bytes += sizeof(objheader_t) + tmpsize; + } else if (STATUS(headeraddr) & DIRTY) { + ptr->oidmod[ptr->nummod] = OID(headeraddr); + ptr->nummod ++; + GETSIZE(tmpsize, headeraddr); + ptr->sum_bytes += sizeof(objheader_t) + tmpsize; + } else { + *((unsigned int *)ptr->objread)=OID(headeraddr); + offset = sizeof(unsigned int); + *((short *)(((char *)ptr->objread) + offset)) = headeraddr->version; + ptr->numread ++; + } + ptr->next = pile; + pile = ptr; + } + + /* Clear Flags */ + STATUS(headeraddr) =0; + + return pile; } //Count the number of machine piles diff --git a/Robust/src/Runtime/DSTM/interface/plookup.h b/Robust/src/Runtime/DSTM/interface/plookup.h index 777f0259..fdcfc535 100644 --- a/Robust/src/Runtime/DSTM/interface/plookup.h +++ b/Robust/src/Runtime/DSTM/interface/plookup.h @@ -10,9 +10,9 @@ * participants involved in a transaction. */ typedef struct plistnode { unsigned int mid; - short numread; /* no of objects modified */ - short nummod; /* no of objects read */ - short numcreated; /* no of objects created */ + unsigned int numread; /* no of objects modified */ + unsigned int nummod; /* no of objects read */ + unsigned int numcreated; /* no of objects created */ int sum_bytes; /* total bytes of objects modified */ char *objread; /* Pointer to array containing oids of objects read and their version numbers*/ unsigned int *oidmod; /* Pointer to array containing oids of modified objects */ diff --git a/Robust/src/Runtime/DSTM/interface/prelookup.c b/Robust/src/Runtime/DSTM/interface/prelookup.c index 4d596f1f..6564f3ef 100644 --- a/Robust/src/Runtime/DSTM/interface/prelookup.c +++ b/Robust/src/Runtime/DSTM/interface/prelookup.c @@ -4,216 +4,216 @@ prehashtable_t pflookup; //Global prefetch cache table unsigned int prehashCreate(unsigned int size, float loadfactor) { - prehashlistnode_t *nodes; - int i; - - // Allocate space for the hash table - if((nodes = calloc(size, sizeof(prehashlistnode_t))) == NULL) { - printf("Calloc error %s %d\n", __FILE__, __LINE__); - return 1; - } - - pflookup.table = nodes; - pflookup.size = size; - pflookup.numelements = 0; // Initial number of elements in the hash - pflookup.loadfactor = loadfactor; - - //Intiliaze and set prefetch table mutex attribute - pthread_mutexattr_init(&pflookup.prefetchmutexattr); - //NOTE:PTHREAD_MUTEX_RECURSIVE is currently inside a #if_def UNIX98 in the pthread.h file - //Therefore use PTHREAD_MUTEX_RECURSIVE_NP instead - pthread_mutexattr_settype(&pflookup.prefetchmutexattr, PTHREAD_MUTEX_RECURSIVE_NP); - - //Initialize mutex var - pthread_mutex_init(&pflookup.lock, &pflookup.prefetchmutexattr); - //pthread_mutex_init(&pflookup.lock, NULL); - pthread_cond_init(&pflookup.cond, NULL); - return 0; + prehashlistnode_t *nodes; + int i; + + // Allocate space for the hash table + if((nodes = calloc(size, sizeof(prehashlistnode_t))) == NULL) { + printf("Calloc error %s %d\n", __FILE__, __LINE__); + return 1; + } + + pflookup.table = nodes; + pflookup.size = size; + pflookup.numelements = 0; // Initial number of elements in the hash + pflookup.loadfactor = loadfactor; + + //Intiliaze and set prefetch table mutex attribute + pthread_mutexattr_init(&pflookup.prefetchmutexattr); + //NOTE:PTHREAD_MUTEX_RECURSIVE is currently inside a #if_def UNIX98 in the pthread.h file + //Therefore use PTHREAD_MUTEX_RECURSIVE_NP instead + pthread_mutexattr_settype(&pflookup.prefetchmutexattr, PTHREAD_MUTEX_RECURSIVE_NP); + + //Initialize mutex var + pthread_mutex_init(&pflookup.lock, &pflookup.prefetchmutexattr); + //pthread_mutex_init(&pflookup.lock, NULL); + pthread_cond_init(&pflookup.cond, NULL); + return 0; } //Assign keys to bins inside hash table unsigned int prehashFunction(unsigned int key) { - return ( key % (pflookup.size)); + return ( key % (pflookup.size)); } //Store oids and their pointers into hash unsigned int prehashInsert(unsigned int key, void *val) { - unsigned int newsize; - int index; - prehashlistnode_t *ptr, *node; - - if(pflookup.numelements > (pflookup.loadfactor * pflookup.size)) { - //Resize - newsize = 2 * pflookup.size + 1; - pthread_mutex_lock(&pflookup.lock); - prehashResize(newsize); - pthread_mutex_unlock(&pflookup.lock); - } - - ptr = pflookup.table; - pflookup.numelements++; - index = prehashFunction(key); - - pthread_mutex_lock(&pflookup.lock); - if(ptr[index].next == NULL && ptr[index].key == 0) { // Insert at the first position in the hashtable - ptr[index].key = key; - ptr[index].val = val; - } else { // Insert in the beginning of linked list - if ((node = calloc(1, sizeof(prehashlistnode_t))) == NULL) { - printf("Calloc error %s, %d\n", __FILE__, __LINE__); - pthread_mutex_unlock(&pflookup.lock); - return 1; - } - node->key = key; - node->val = val ; - node->next = ptr[index].next; - ptr[index].next = node; - } - pthread_mutex_unlock(&pflookup.lock); - return 0; + unsigned int newsize; + int index; + prehashlistnode_t *ptr, *node; + + if(pflookup.numelements > (pflookup.loadfactor * pflookup.size)) { + //Resize + newsize = 2 * pflookup.size + 1; + pthread_mutex_lock(&pflookup.lock); + prehashResize(newsize); + pthread_mutex_unlock(&pflookup.lock); + } + + ptr = pflookup.table; + pflookup.numelements++; + index = prehashFunction(key); + + pthread_mutex_lock(&pflookup.lock); + if(ptr[index].next == NULL && ptr[index].key == 0) { // Insert at the first position in the hashtable + ptr[index].key = key; + ptr[index].val = val; + } else { // Insert in the beginning of linked list + if ((node = calloc(1, sizeof(prehashlistnode_t))) == NULL) { + printf("Calloc error %s, %d\n", __FILE__, __LINE__); + pthread_mutex_unlock(&pflookup.lock); + return 1; + } + node->key = key; + node->val = val ; + node->next = ptr[index].next; + ptr[index].next = node; + } + pthread_mutex_unlock(&pflookup.lock); + return 0; } // Search for an address for a given oid void *prehashSearch(unsigned int key) { - int index; - prehashlistnode_t *ptr, *node; - - ptr = pflookup.table; - index = prehashFunction(key); - node = &ptr[index]; - pthread_mutex_lock(&pflookup.lock); - while(node != NULL) { - if(node->key == key) { - pthread_mutex_unlock(&pflookup.lock); - return node->val; - } - node = node->next; - } - pthread_mutex_unlock(&pflookup.lock); - return NULL; + int index; + prehashlistnode_t *ptr, *node; + + ptr = pflookup.table; + index = prehashFunction(key); + node = &ptr[index]; + pthread_mutex_lock(&pflookup.lock); + while(node != NULL) { + if(node->key == key) { + pthread_mutex_unlock(&pflookup.lock); + return node->val; + } + node = node->next; + } + pthread_mutex_unlock(&pflookup.lock); + return NULL; } unsigned int prehashRemove(unsigned int key) { - int index; - prehashlistnode_t *curr, *prev; - prehashlistnode_t *ptr, *node; - - ptr = pflookup.table; - index = prehashFunction(key); - curr = &ptr[index]; - - pthread_mutex_lock(&pflookup.lock); - for (; curr != NULL; curr = curr->next) { - if (curr->key == key) { // Find a match in the hash table - pflookup.numelements--; // Decrement the number of elements in the global hashtable - if ((curr == &ptr[index]) && (curr->next == NULL)) { // Delete the first item inside the hashtable with no linked list of prehashlistnode_t - curr->key = 0; - curr->val = NULL; - } else if ((curr == &ptr[index]) && (curr->next != NULL)) { //Delete the first item with a linked list of prehashlistnode_t connected - curr->key = curr->next->key; - curr->val = curr->next->val; - node = curr->next; - curr->next = curr->next->next; - free(node); - } else { // Regular delete from linked listed - prev->next = curr->next; - free(curr); - } - pthread_mutex_unlock(&pflookup.lock); - return 0; - } - prev = curr; - } - pthread_mutex_unlock(&pflookup.lock); - return 1; + int index; + prehashlistnode_t *curr, *prev; + prehashlistnode_t *ptr, *node; + + ptr = pflookup.table; + index = prehashFunction(key); + curr = &ptr[index]; + + pthread_mutex_lock(&pflookup.lock); + for (; curr != NULL; curr = curr->next) { + if (curr->key == key) { // Find a match in the hash table + pflookup.numelements--; // Decrement the number of elements in the global hashtable + if ((curr == &ptr[index]) && (curr->next == NULL)) { // Delete the first item inside the hashtable with no linked list of prehashlistnode_t + curr->key = 0; + curr->val = NULL; + } else if ((curr == &ptr[index]) && (curr->next != NULL)) { //Delete the first item with a linked list of prehashlistnode_t connected + curr->key = curr->next->key; + curr->val = curr->next->val; + node = curr->next; + curr->next = curr->next->next; + free(node); + } else { // Regular delete from linked listed + prev->next = curr->next; + free(curr); + } + pthread_mutex_unlock(&pflookup.lock); + return 0; + } + prev = curr; + } + pthread_mutex_unlock(&pflookup.lock); + return 1; } unsigned int prehashResize(unsigned int newsize) { - prehashlistnode_t *node, *ptr, *curr, *next; // curr and next keep track of the current and the next chashlistnodes in a linked list - unsigned int oldsize; - int isfirst; // Keeps track of the first element in the prehashlistnode_t for each bin in hashtable - int i,index; - prehashlistnode_t *newnode; - - ptr = pflookup.table; - oldsize = pflookup.size; - - if((node = calloc(newsize, sizeof(prehashlistnode_t))) == NULL) { - printf("Calloc error %s %d\n", __FILE__, __LINE__); - return 1; - } - - pflookup.table = node; //Update the global hashtable upon resize() - pflookup.size = newsize; - pflookup.numelements = 0; - - for(i = 0; i < oldsize; i++) { //Outer loop for each bin in hash table - curr = &ptr[i]; - isfirst = 1; - while (curr != NULL) { //Inner loop to go through linked lists - if (curr->key == 0) { //Exit inner loop if there the first element for a given bin/index is NULL - break; //key = val =0 for element if not present within the hash table - } - next = curr->next; - index = prehashFunction(curr->key); - // Insert into the new table - if(pflookup.table[index].next == NULL && pflookup.table[index].key == 0) { - pflookup.table[index].key = curr->key; - pflookup.table[index].val = curr->val; - pflookup.numelements++; - }else { - if((newnode = calloc(1, sizeof(prehashlistnode_t))) == NULL) { - printf("Calloc error %s, %d\n", __FILE__, __LINE__); - return 1; - } - newnode->key = curr->key; - newnode->val = curr->val; - newnode->next = pflookup.table[index].next; - pflookup.table[index].next = newnode; - pflookup.numelements++; - } - - //free the linked list of prehashlistnode_t if not the first element in the hash table - if (isfirst != 1) { - free(curr); - } - - isfirst = 0; - curr = next; - } - } - - free(ptr); //Free the memory of the old hash table - return 0; + prehashlistnode_t *node, *ptr, *curr, *next; // curr and next keep track of the current and the next chashlistnodes in a linked list + unsigned int oldsize; + int isfirst; // Keeps track of the first element in the prehashlistnode_t for each bin in hashtable + int i,index; + prehashlistnode_t *newnode; + + ptr = pflookup.table; + oldsize = pflookup.size; + + if((node = calloc(newsize, sizeof(prehashlistnode_t))) == NULL) { + printf("Calloc error %s %d\n", __FILE__, __LINE__); + return 1; + } + + pflookup.table = node; //Update the global hashtable upon resize() + pflookup.size = newsize; + pflookup.numelements = 0; + + for(i = 0; i < oldsize; i++) { //Outer loop for each bin in hash table + curr = &ptr[i]; + isfirst = 1; + while (curr != NULL) { //Inner loop to go through linked lists + if (curr->key == 0) { //Exit inner loop if there the first element for a given bin/index is NULL + break; //key = val =0 for element if not present within the hash table + } + next = curr->next; + index = prehashFunction(curr->key); + // Insert into the new table + if(pflookup.table[index].next == NULL && pflookup.table[index].key == 0) { + pflookup.table[index].key = curr->key; + pflookup.table[index].val = curr->val; + pflookup.numelements++; + }else { + if((newnode = calloc(1, sizeof(prehashlistnode_t))) == NULL) { + printf("Calloc error %s, %d\n", __FILE__, __LINE__); + return 1; + } + newnode->key = curr->key; + newnode->val = curr->val; + newnode->next = pflookup.table[index].next; + pflookup.table[index].next = newnode; + pflookup.numelements++; + } + + //free the linked list of prehashlistnode_t if not the first element in the hash table + if (isfirst != 1) { + free(curr); + } + + isfirst = 0; + curr = next; + } + } + + free(ptr); //Free the memory of the old hash table + return 0; } /* Deletes the prefetch Cache */ void prehashDelete() { - int i, isFirst; - prehashlistnode_t *ptr, *curr, *next; - ptr = pflookup.table; - - for(i=0 ; inext; - if(isFirst != 1) { - free(curr); - } - isFirst = 0; - curr = next; - } - } - - free(ptr); + int i, isFirst; + prehashlistnode_t *ptr, *curr, *next; + ptr = pflookup.table; + + for(i=0 ; inext; + if(isFirst != 1) { + free(curr); + } + isFirst = 0; + curr = next; + } + } + + free(ptr); } //Note: This is based on the implementation of the inserting a key in the first position of the hashtable void prehashClear() { int i, isFirstBin; prehashlistnode_t *ptr, *prev, *curr; - + pthread_mutex_lock(&pflookup.lock); ptr = pflookup.table; for(i = 0; i < pflookup.size; i++) { diff --git a/Robust/src/Runtime/DSTM/interface/sockpool.c b/Robust/src/Runtime/DSTM/interface/sockpool.c index e45cdc55..98a537ef 100644 --- a/Robust/src/Runtime/DSTM/interface/sockpool.c +++ b/Robust/src/Runtime/DSTM/interface/sockpool.c @@ -2,7 +2,7 @@ #include #if defined(__i386__) -inline static int test_and_set(volatile unsigned int *addr) { +inline int test_and_set(volatile unsigned int *addr) { int oldval; /* Note: the "xchg" instruction does not need a "lock" prefix */ __asm__ __volatile__("xchgl %0, %1" @@ -10,7 +10,7 @@ inline static int test_and_set(volatile unsigned int *addr) { : "0"(1), "m"(*(addr))); return oldval; } -inline static void UnLock(volatile unsigned int *addr) { +inline void UnLock(volatile unsigned int *addr) { int oldval; /* Note: the "xchg" instruction does not need a "lock" prefix */ __asm__ __volatile__("xchgl %0, %1" diff --git a/Robust/src/Runtime/DSTM/interface/sockpool.h b/Robust/src/Runtime/DSTM/interface/sockpool.h index bc1b4fab..c85d7da8 100644 --- a/Robust/src/Runtime/DSTM/interface/sockpool.h +++ b/Robust/src/Runtime/DSTM/interface/sockpool.h @@ -3,6 +3,8 @@ #include "dstm.h" +int test_and_set(volatile unsigned int *addr); +void UnLock(volatile unsigned int *addr); typedef struct socknode { int sd; diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 0fdd0443..d3064030 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -300,6 +300,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { objcopy = (objheader_t *) objstrAlloc(record->cache, size); memcpy(objcopy, objheader, size); /* Insert into cache's lookup table */ + STATUS(objcopy)=0; chashInsert(record->lookupTable, OID(objheader), objcopy); #ifdef COMPILER return &objcopy[1]; @@ -331,7 +332,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__); return NULL; } else { - + STATUS(objcopy)=0; #ifdef COMPILER return &objcopy[1]; #else @@ -344,7 +345,6 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { /* This function creates objects in the transaction record */ objheader_t *transCreateObj(transrecord_t *record, unsigned int size) { objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + size)); - tmp->notifylist = NULL; OID(tmp) = getNewOID(); tmp->version = 1; tmp->rcount = 1; @@ -853,9 +853,7 @@ void *handleLocalReq(void *threadarg) { objnotfound++; } else { /* If Obj found in machine (i.e. has not moved) */ /* Check if Obj is locked by any previous transaction */ - pthread_mutex_lock(&atomicObjLock); - if ((STATUS((objheader_t *)mobj) & LOCK) == LOCK) { - pthread_mutex_unlock(&atomicObjLock); + if (test_and_set(STATUSPTR(mobj))) { if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */ v_matchlock++; } else {/* If versions don't match ...HARD ABORT */ @@ -863,9 +861,8 @@ void *handleLocalReq(void *threadarg) { /* Send TRANS_DISAGREE to Coordinator */ localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE; } - } else {/* If Obj is not locked then lock object */ - STATUS(((objheader_t *)mobj)) |= LOCK; - pthread_mutex_unlock(&atomicObjLock); + } else { + //we're locked /* Save all object oids that are locked on this machine during this transaction request call */ oidlocked[objlocked] = OID(((objheader_t *)mobj)); objlocked++; @@ -944,7 +941,7 @@ int transAbortProcess(local_thread_data_array_t *localtdata) { printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); return 1; } - STATUS(((objheader_t *)header)) &= ~(LOCK); + UnLock(STATUSPTR(header)); } return 0; @@ -952,69 +949,65 @@ int transAbortProcess(local_thread_data_array_t *localtdata) { /*This function completes the COMMIT process is the transaction is commiting*/ int transComProcess(local_thread_data_array_t *localtdata) { - objheader_t *header, *tcptr; - int i, nummod, tmpsize, numcreated, numlocked; - unsigned int *oidmod, *oidcreated, *oidlocked; - void *ptrcreate; - - nummod = localtdata->tdata->buffer->f.nummod; - oidmod = localtdata->tdata->buffer->oidmod; - numcreated = localtdata->tdata->buffer->f.numcreated; - oidcreated = localtdata->tdata->buffer->oidcreated; - numlocked = localtdata->transinfo->numlocked; - oidlocked = localtdata->transinfo->objlocked; - - for (i = 0; i < nummod; i++) { - if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) { - printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); - return 1; - } - /* Copy from transaction cache -> main object store */ - if ((tcptr = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidmod[i]))) == NULL) { - printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__); - return 1; - } - GETSIZE(tmpsize, header); - pthread_mutex_lock(&mainobjstore_mutex); - char *tmptcptr = (char *) tcptr; - memcpy((char*)header+sizeof(objheader_t), (char *)tmptcptr+ sizeof(objheader_t), tmpsize); - header->version += 1; - pthread_mutex_lock(¬ifymutex); - if(header->notifylist != NULL) { - notifyAll(&header->notifylist, OID(header), header->version); - } - pthread_mutex_unlock(¬ifymutex); - pthread_mutex_unlock(&mainobjstore_mutex); - } - /* If object is newly created inside transaction then commit it */ - for (i = 0; i < numcreated; i++) { - if ((header = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidcreated[i]))) == NULL) { - printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__); - return 1; - } - GETSIZE(tmpsize, header); - tmpsize += sizeof(objheader_t); - pthread_mutex_lock(&mainobjstore_mutex); - if ((ptrcreate = objstrAlloc(mainobjstore, tmpsize)) == NULL) { - printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__); - pthread_mutex_unlock(&mainobjstore_mutex); - return 1; - } - pthread_mutex_unlock(&mainobjstore_mutex); - memcpy(ptrcreate, header, tmpsize); - mhashInsert(oidcreated[i], ptrcreate); - lhashInsert(oidcreated[i], myIpAddr); - } - /* Unlock locked objects */ - for(i = 0; i < numlocked; i++) { - if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) { - printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); - return 1; - } - STATUS(header) &= ~(LOCK); - } - - return 0; + objheader_t *header, *tcptr; + int i, nummod, tmpsize, numcreated, numlocked; + unsigned int *oidmod, *oidcreated, *oidlocked; + void *ptrcreate; + + nummod = localtdata->tdata->buffer->f.nummod; + oidmod = localtdata->tdata->buffer->oidmod; + numcreated = localtdata->tdata->buffer->f.numcreated; + oidcreated = localtdata->tdata->buffer->oidcreated; + numlocked = localtdata->transinfo->numlocked; + oidlocked = localtdata->transinfo->objlocked; + + for (i = 0; i < nummod; i++) { + if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) { + printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); + return 1; + } + /* Copy from transaction cache -> main object store */ + if ((tcptr = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidmod[i]))) == NULL) { + printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__); + return 1; + } + GETSIZE(tmpsize, header); + char *tmptcptr = (char *) tcptr; + memcpy((char*)header+sizeof(objheader_t), (char *)tmptcptr+ sizeof(objheader_t), tmpsize); + header->version += 1; + if(header->notifylist != NULL) { + notifyAll(&header->notifylist, OID(header), header->version); + } + } + /* If object is newly created inside transaction then commit it */ + for (i = 0; i < numcreated; i++) { + if ((header = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidcreated[i]))) == NULL) { + printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__); + return 1; + } + GETSIZE(tmpsize, header); + tmpsize += sizeof(objheader_t); + pthread_mutex_lock(&mainobjstore_mutex); + if ((ptrcreate = objstrAlloc(mainobjstore, tmpsize)) == NULL) { + printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__); + pthread_mutex_unlock(&mainobjstore_mutex); + return 1; + } + pthread_mutex_unlock(&mainobjstore_mutex); + memcpy(ptrcreate, header, tmpsize); + mhashInsert(oidcreated[i], ptrcreate); + lhashInsert(oidcreated[i], myIpAddr); + } + /* Unlock locked objects */ + for(i = 0; i < numlocked; i++) { + if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) { + printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); + return 1; + } + UnLock(STATUSPTR(header)); + } + + return 0; } prefetchpile_t *foundLocal(char *ptr) { @@ -1214,7 +1207,8 @@ int getPrefetchResponse(int sd) { } pthread_mutex_unlock(&prefetchcache_mutex); memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size); - + STATUS(modptr)=0; + /* Insert the oid and its address into the prefetch hash lookup table */ /* Do a version comparison if the oid exists */ if((oldptr = prehashSearch(oid)) != NULL) { @@ -1416,103 +1410,103 @@ int findHost(unsigned int hostIp) /* This function sends notification request per thread waiting on object(s) whose version * changes */ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid) { - int sock,i; - objheader_t *objheader; - struct sockaddr_in remoteAddr; - char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) + 3 * sizeof(unsigned int)]; - char *ptr; - int bytesSent; - int status, size; - unsigned short version; - unsigned int oid,mid; - static unsigned int threadid = 0; - pthread_mutex_t threadnotify = PTHREAD_MUTEX_INITIALIZER; //Lock and condition var for threadjoin and notification - pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER; - notifydata_t *ndata; - - oid = oidarry[0]; - if((mid = lhashSearch(oid)) == 0) { - printf("Error: %s() No such machine found for oid =%x\n",__func__, oid); - return; - } - - if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){ - perror("reqNotify():socket()"); - return -1; - } - - bzero(&remoteAddr, sizeof(remoteAddr)); - remoteAddr.sin_family = AF_INET; - remoteAddr.sin_port = htons(LISTEN_PORT); - remoteAddr.sin_addr.s_addr = htonl(mid); - - /* Generate unique threadid */ - threadid++; - - /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */ - if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) { - printf("Calloc Error %s, %d\n", __FILE__, __LINE__); - return -1; - } - ndata->numoid = numoid; - ndata->threadid = threadid; - ndata->oidarry = oidarry; - ndata->versionarry = versionarry; - ndata->threadcond = threadcond; - ndata->threadnotify = threadnotify; - if((status = notifyhashInsert(threadid, ndata)) != 0) { - printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__); - free(ndata); - return -1; - } - - /* Send number of oids, oidarry, version array, machine id and threadid */ - if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){ - printf("reqNotify():error %d connecting to %s:%d\n", errno, - inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT); - free(ndata); - return -1; - } else { - msg[0] = THREAD_NOTIFY_REQUEST; - *((unsigned int *)(&msg[1])) = numoid; - /* Send array of oids */ - size = sizeof(unsigned int); - { - i = 0; - while(i < numoid) { - oid = oidarry[i]; - *((unsigned int *)(&msg[1] + size)) = oid; - size += sizeof(unsigned int); - i++; - } - } - - /* Send array of version */ - { - i = 0; - while(i < numoid) { - version = versionarry[i]; - *((unsigned short *)(&msg[1] + size)) = version; - size += sizeof(unsigned short); - i++; - } - } - - *((unsigned int *)(&msg[1] + size)) = myIpAddr; - size += sizeof(unsigned int); - *((unsigned int *)(&msg[1] + size)) = threadid; - pthread_mutex_lock(&(ndata->threadnotify)); - size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int); - send_data(sock, msg, size); - pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify)); - pthread_mutex_unlock(&(ndata->threadnotify)); - } - - pthread_cond_destroy(&threadcond); - pthread_mutex_destroy(&threadnotify); - free(ndata); - close(sock); - return status; + int sock,i; + objheader_t *objheader; + struct sockaddr_in remoteAddr; + char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) + 3 * sizeof(unsigned int)]; + char *ptr; + int bytesSent; + int status, size; + unsigned short version; + unsigned int oid,mid; + static unsigned int threadid = 0; + pthread_mutex_t threadnotify = PTHREAD_MUTEX_INITIALIZER; //Lock and condition var for threadjoin and notification + pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER; + notifydata_t *ndata; + + oid = oidarry[0]; + if((mid = lhashSearch(oid)) == 0) { + printf("Error: %s() No such machine found for oid =%x\n",__func__, oid); + return; + } + + if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){ + perror("reqNotify():socket()"); + return -1; + } + + bzero(&remoteAddr, sizeof(remoteAddr)); + remoteAddr.sin_family = AF_INET; + remoteAddr.sin_port = htons(LISTEN_PORT); + remoteAddr.sin_addr.s_addr = htonl(mid); + + /* Generate unique threadid */ + threadid++; + + /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */ + if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) { + printf("Calloc Error %s, %d\n", __FILE__, __LINE__); + return -1; + } + ndata->numoid = numoid; + ndata->threadid = threadid; + ndata->oidarry = oidarry; + ndata->versionarry = versionarry; + ndata->threadcond = threadcond; + ndata->threadnotify = threadnotify; + if((status = notifyhashInsert(threadid, ndata)) != 0) { + printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__); + free(ndata); + return -1; + } + + /* Send number of oids, oidarry, version array, machine id and threadid */ + if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){ + printf("reqNotify():error %d connecting to %s:%d\n", errno, + inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT); + free(ndata); + return -1; + } else { + msg[0] = THREAD_NOTIFY_REQUEST; + *((unsigned int *)(&msg[1])) = numoid; + /* Send array of oids */ + size = sizeof(unsigned int); + { + i = 0; + while(i < numoid) { + oid = oidarry[i]; + *((unsigned int *)(&msg[1] + size)) = oid; + size += sizeof(unsigned int); + i++; + } + } + + /* Send array of version */ + { + i = 0; + while(i < numoid) { + version = versionarry[i]; + *((unsigned short *)(&msg[1] + size)) = version; + size += sizeof(unsigned short); + i++; + } + } + + *((unsigned int *)(&msg[1] + size)) = myIpAddr; + size += sizeof(unsigned int); + *((unsigned int *)(&msg[1] + size)) = threadid; + pthread_mutex_lock(&(ndata->threadnotify)); + size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int); + send_data(sock, msg, size); + pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify)); + pthread_mutex_unlock(&(ndata->threadnotify)); + } + + pthread_cond_destroy(&threadcond); + pthread_mutex_destroy(&threadnotify); + free(ndata); + close(sock); + return status; } void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) { @@ -1551,50 +1545,50 @@ void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) { } int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) { - threadlist_t *ptr; - unsigned int mid; - struct sockaddr_in remoteAddr; - char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)]; - int sock, status, size, bytesSent; - - while(*head != NULL) { - ptr = *head; - mid = ptr->mid; - //create a socket connection to that machine - if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){ - perror("notifyAll():socket()"); - return -1; - } - - bzero(&remoteAddr, sizeof(remoteAddr)); - remoteAddr.sin_family = AF_INET; - remoteAddr.sin_port = htons(LISTEN_PORT); - remoteAddr.sin_addr.s_addr = htonl(mid); - //send Thread Notify response and threadid to that machine - if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){ - printf("notifyAll():error %d connecting to %s:%d\n", errno, - inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT); - status = -1; - fflush(stdout); - } else { - bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int))); - msg[0] = THREAD_NOTIFY_RESPONSE; - *((unsigned int *)&msg[1]) = oid; - size = sizeof(unsigned int); - *((unsigned short *)(&msg[1]+ size)) = version; - size+= sizeof(unsigned short); - *((unsigned int *)(&msg[1]+ size)) = ptr->threadid; - - size = 1 + 2*sizeof(unsigned int) + sizeof(unsigned short); - send_data(sock, msg, size); - } - //close socket - close(sock); - // Update head - *head = ptr->next; - free(ptr); - } - return status; + threadlist_t *ptr; + unsigned int mid; + struct sockaddr_in remoteAddr; + char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)]; + int sock, status, size, bytesSent; + + while(*head != NULL) { + ptr = *head; + mid = ptr->mid; + //create a socket connection to that machine + if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){ + perror("notifyAll():socket()"); + return -1; + } + + bzero(&remoteAddr, sizeof(remoteAddr)); + remoteAddr.sin_family = AF_INET; + remoteAddr.sin_port = htons(LISTEN_PORT); + remoteAddr.sin_addr.s_addr = htonl(mid); + //send Thread Notify response and threadid to that machine + if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){ + printf("notifyAll():error %d connecting to %s:%d\n", errno, + inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT); + status = -1; + fflush(stdout); + } else { + bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int))); + msg[0] = THREAD_NOTIFY_RESPONSE; + *((unsigned int *)&msg[1]) = oid; + size = sizeof(unsigned int); + *((unsigned short *)(&msg[1]+ size)) = version; + size+= sizeof(unsigned short); + *((unsigned int *)(&msg[1]+ size)) = ptr->threadid; + + size = 1 + 2*sizeof(unsigned int) + sizeof(unsigned short); + send_data(sock, msg, size); + } + //close socket + close(sock); + // Update head + *head = ptr->next; + free(ptr); + } + return status; } void transAbort(transrecord_t *trans) { -- 2.34.1