#define STATUS(x)\
*((unsigned int *) &(((struct ___Object___ *)((unsigned int) x + sizeof(objheader_t)))->___localcopy___))
+#define STATUSPTR(x)\
+ ((unsigned int *) &(((struct ___Object___ *)((unsigned int) x + sizeof(objheader_t)))->___localcopy___))
+
#define TYPE(x)\
((struct ___Object___ *)((unsigned int) x + sizeof(objheader_t)))->type
#define OID(x) x->oid
#define TYPE(x) x->type
#define STATUS(x) x->status
+#define STATUSPTR(x) &x->status
#define GETSIZE(size, x) size=classsize[TYPE(x)]
#endif
char control; /* control message */
char trans_id[TID_LEN]; /* transaction id */
int mcount; /* participant count */
- short numread; /* no of objects read */
- short nummod; /* no of objects modified */
- short numcreated; /* no of objects created */
+ unsigned int numread; /* no of objects read */
+ unsigned int nummod; /* no of objects modified */
+ unsigned int numcreated; /* no of objects created */
int sum_bytes; /* total bytes of modified objects in a transaction */
} fixed_data_t;
printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);// find the header address
return 1;
}
- STATUS(((objheader_t *)header)) &= ~(LOCK);
+ UnLock(STATUSPTR(header));
}
/* Send ack to Coordinator */
objnotfound++;
} else { /* If Obj found in machine (i.e. has not moved) */
/* Check if Obj is locked by any previous transaction */
- pthread_mutex_lock(&lockObjHeader);
- if ((STATUS(((objheader_t *)mobj)) & LOCK) == LOCK) {
- pthread_mutex_unlock(&lockObjHeader);
+ if (test_and_set(STATUSPTR(mobj))) {
+ //don't have lock
if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */
v_matchlock++;
} else {/* If versions don't match ...HARD ABORT */
printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
return 0;
}
- STATUS(headptr) &= ~(LOCK);
+ UnLock(STATUSPTR(headptr));
}
free(oidlocked);
}
return control;
}
} else {/* If Obj is not locked then lock object */
- STATUS(((objheader_t *)mobj)) |= LOCK;
- pthread_mutex_unlock(&lockObjHeader);
/* Save all object oids that are locked on this machine during this transaction request call */
oidlocked[objlocked] = OID(((objheader_t *)mobj));
objlocked++;
printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
return 0;
}
- STATUS(headptr) &= ~(LOCK);
+ UnLock(STATUSPTR(headptr));
}
free(oidlocked);
}
* addresses in lookup table and also changes version number
* Sends an ACK back to Coordinator */
int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlocked, int nummod, int numlocked, int acceptfd) {
- objheader_t *header;
- objheader_t *newheader;
- int i = 0, offset = 0;
- char control;
- int tmpsize;
-
- /* Process each modified object saved in the mainobject store */
- for(i = 0; i < nummod; i++) {
- if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
- printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
- return 1;
- }
- GETSIZE(tmpsize,header);
- memcpy((char*)header + sizeof(objheader_t), ((char *)modptr + sizeof(objheader_t) + offset), tmpsize);
- header->version += 1;
- /* If threads are waiting on this object to be updated, notify them */
- pthread_mutex_lock(¬ifymutex);
- if(header->notifylist != NULL) {
- notifyAll(&header->notifylist, OID(header), header->version);
- }
- pthread_mutex_unlock(¬ifymutex);
- offset += sizeof(objheader_t) + tmpsize;
- }
-
- if (nummod > 0)
- free(modptr);
-
- /* Unlock locked objects */
- for(i = 0; i < numlocked; i++) {
- if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
- printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
- return 1;
- }
- STATUS(header) &= ~(LOCK);
- }
- //TODO Update location lookup table
-
- /* Send ack to coordinator */
- control = TRANS_SUCESSFUL;
- send_data((int)acceptfd, &control, sizeof(char));
- return 0;
+ objheader_t *header;
+ objheader_t *newheader;
+ int i = 0, offset = 0;
+ char control;
+ int tmpsize;
+
+ /* Process each modified object saved in the mainobject store */
+ for(i = 0; i < nummod; i++) {
+ if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
+ printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+ GETSIZE(tmpsize,header);
+ memcpy((char*)header + sizeof(objheader_t), ((char *)modptr + sizeof(objheader_t) + offset), tmpsize);
+ header->version += 1;
+ /* If threads are waiting on this object to be updated, notify them */
+ if(header->notifylist != NULL) {
+ notifyAll(&header->notifylist, OID(header), header->version);
+ }
+ offset += sizeof(objheader_t) + tmpsize;
+ }
+
+ if (nummod > 0)
+ free(modptr);
+
+ /* Unlock locked objects */
+ for(i = 0; i < numlocked; i++) {
+ if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
+ printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+ UnLock(STATUSPTR(header));
+ }
+ //TODO Update location lookup table
+
+ /* Send ack to coordinator */
+ control = TRANS_SUCESSFUL;
+ send_data((int)acceptfd, &control, sizeof(char));
+ return 0;
}
/* This function recevies the oid and offset tuples from the Coordinator's prefetch call.
}
void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int threadid) {
- objheader_t *header;
- unsigned int oid;
- unsigned short newversion;
- char msg[1+ 2 * sizeof(unsigned int) + sizeof(unsigned short)];
- int sd;
- struct sockaddr_in remoteAddr;
- int bytesSent;
- int size;
- int i = 0;
-
- while(i < numoid) {
- oid = *(oidarry + i);
- if((header = (objheader_t *) mhashSearch(oid)) == NULL) {
- printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
- return;
- } else {
- /* Check to see if versions are same */
-checkversion:
- if ((STATUS(header) & LOCK) != LOCK) {
- pthread_mutex_lock(¬ifymutex);
- STATUS(header) |= LOCK;
- newversion = header->version;
- if(newversion == *(versionarry + i)) {
- //Add to the notify list
- if((header->notifylist = insNode(header->notifylist, threadid, mid)) == NULL) {
- printf("Error: Obj notify list points to NULL %s, %d\n", __FILE__, __LINE__);
- pthread_mutex_unlock(¬ifymutex);
- return;
- }
- STATUS(header) &= ~(LOCK);
- pthread_mutex_unlock(¬ifymutex);
- } else {
- STATUS(header) &= ~(LOCK);
- pthread_mutex_unlock(¬ifymutex);
- if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
- perror("processReqNotify():socket()");
- return;
- }
- bzero(&remoteAddr, sizeof(remoteAddr));
- remoteAddr.sin_family = AF_INET;
- remoteAddr.sin_port = htons(LISTEN_PORT);
- remoteAddr.sin_addr.s_addr = htonl(mid);
-
- if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
- printf("Error: processReqNotify():error %d connecting to %s:%d\n", errno,
- inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
- close(sd);
- return;
- } else {
- //Send Update notification
- msg[0] = THREAD_NOTIFY_RESPONSE;
- *((unsigned int *)&msg[1]) = oid;
- size = sizeof(unsigned int);
- *((unsigned short *)(&msg[1]+size)) = newversion;
- size += sizeof(unsigned short);
- *((unsigned int *)(&msg[1]+size)) = threadid;
- size = 1+ 2*sizeof(unsigned int) + sizeof(unsigned short);
- send_data(sd, msg, size);
- }
- close(sd);
- }
- } else {
- randomdelay();
- goto checkversion;
- }
- }
- i++;
+ objheader_t *header;
+ unsigned int oid;
+ unsigned short newversion;
+ char msg[1+ 2 * sizeof(unsigned int) + sizeof(unsigned short)];
+ int sd;
+ struct sockaddr_in remoteAddr;
+ int bytesSent;
+ int size;
+ int i = 0;
+
+ while(i < numoid) {
+ oid = *(oidarry + i);
+ if((header = (objheader_t *) mhashSearch(oid)) == NULL) {
+ printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return;
+ } else {
+ /* Check to see if versions are same */
+ checkversion:
+ if (test_and_set(STATUSPTR(header))==0) {
+ //have lock
+ newversion = header->version;
+ if(newversion == *(versionarry + i)) {
+ //Add to the notify list
+ if((header->notifylist = insNode(header->notifylist, threadid, mid)) == NULL) {
+ printf("Error: Obj notify list points to NULL %s, %d\n", __FILE__, __LINE__);
+ return;
+ }
+ UnLock(STATUSPTR(header));
+ } else {
+ UnLock(STATUSPTR(header));
+ if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
+ perror("processReqNotify():socket()");
+ return;
+ }
+ bzero(&remoteAddr, sizeof(remoteAddr));
+ remoteAddr.sin_family = AF_INET;
+ remoteAddr.sin_port = htons(LISTEN_PORT);
+ remoteAddr.sin_addr.s_addr = htonl(mid);
+
+ if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
+ printf("Error: processReqNotify():error %d connecting to %s:%d\n", errno,
+ inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+ close(sd);
+ return;
+ } else {
+ //Send Update notification
+ msg[0] = THREAD_NOTIFY_RESPONSE;
+ *((unsigned int *)&msg[1]) = oid;
+ size = sizeof(unsigned int);
+ *((unsigned short *)(&msg[1]+size)) = newversion;
+ size += sizeof(unsigned short);
+ *((unsigned int *)(&msg[1]+size)) = threadid;
+ size = 1+ 2*sizeof(unsigned int) + sizeof(unsigned short);
+ send_data(sd, msg, size);
+ }
+ close(sd);
}
- free(oidarry);
- free(versionarry);
+ } else {
+ randomdelay();
+ goto checkversion;
+ }
+ }
+ i++;
+ }
+ free(oidarry);
+ free(versionarry);
}
}
//free entire list, starting at store
-void objstrDelete(objstr_t *store)
-{
- objstr_t *tmp;
- while (store != NULL)
- {
- tmp = store->next;
- free(store);
- store = tmp;
- }
- return;
+void objstrDelete(objstr_t *store) {
+ objstr_t *tmp;
+ while (store != NULL) {
+ tmp = store->next;
+ free(store);
+ store = tmp;
+ }
+ return;
}
-void *objstrAlloc(objstr_t *store, unsigned int size)
-{
- void *tmp;
- while (1)
- {
- if (((unsigned int)store->top - (unsigned int)store - sizeof(objstr_t) + size) <= store->size)
- { //store not full
- tmp = store->top;
- store->top += size;
- return tmp;
- }
- //store full
- if (store->next == NULL)
- { //end of list, all full
- if (size > DEFAULT_OBJ_STORE_SIZE) //in case of large objects
- {
- if((store->next = (objstr_t *)calloc(1,(sizeof(objstr_t) + size))) == NULL) {
- printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
- return NULL;
- }
- if (store->next == NULL)
- return NULL;
- store = store->next;
- store->size = size;
- }
- else
- {
- if((store->next = calloc(1,(sizeof(objstr_t) + DEFAULT_OBJ_STORE_SIZE))) == NULL) {
- printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
- return NULL;
- }
- if (store->next == NULL)
- return NULL;
- store = store->next;
- store->next = NULL;
- store->size = DEFAULT_OBJ_STORE_SIZE;
- }
- store->top = (void *)((unsigned int)store + sizeof(objstr_t) + size);
- return (void *)((unsigned int)store + sizeof(objstr_t));
- }
- else //try the next one
- store = store->next;
+void *objstrAlloc(objstr_t *store, unsigned int size) {
+ void *tmp;
+ while (1) {
+ if (((unsigned int)store->top - (((unsigned int)store) + sizeof(objstr_t)) + size) <= store->size) { //store not full
+ tmp = store->top;
+ store->top += size;
+ return tmp;
+ }
+ //store full
+ if (store->next == NULL) {
+ //end of list, all full
+ if (size > DEFAULT_OBJ_STORE_SIZE) {
+ //in case of large objects
+ if((store->next = (objstr_t *)calloc(1,(sizeof(objstr_t) + size))) == NULL) {
+ printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
+ return NULL;
}
+ store = store->next;
+ store->size = size;
+ } else {
+ if((store->next = calloc(1,(sizeof(objstr_t) + DEFAULT_OBJ_STORE_SIZE))) == NULL) {
+ printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
+ return NULL;
+ }
+ store = store->next;
+ store->size = DEFAULT_OBJ_STORE_SIZE;
+ }
+ store->top = (void *)(((unsigned int)store) + sizeof(objstr_t) + size);
+ return (void *)(((unsigned int)store) + sizeof(objstr_t));
+ } else
+ store = store->next;
+ }
}
/* This function inserts necessary information into
* a machine pile data structure */
plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) {
- plistnode_t *ptr, *tmp;
- int found = 0, offset = 0;
-
- tmp = pile;
- //Add oid into a machine that is already present in the pile linked list structure
- while(tmp != NULL) {
- if (tmp->mid == mid) {
- int tmpsize;
-
- if (STATUS(headeraddr) & NEW) {
- tmp->oidcreated[tmp->numcreated] = OID(headeraddr);
- tmp->numcreated = tmp->numcreated + 1;
- GETSIZE(tmpsize, headeraddr);
- tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
- }else if (STATUS(headeraddr) & DIRTY) {
- tmp->oidmod[tmp->nummod] = OID(headeraddr);
- tmp->nummod = tmp->nummod + 1;
- GETSIZE(tmpsize, headeraddr);
- tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
- } else {
- offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
- *((unsigned int *)(((char *)tmp->objread) + offset))=OID(headeraddr);
- offset += sizeof(unsigned int);
- *((short *)(((char *)tmp->objread) + offset)) = headeraddr->version;
- tmp->numread = tmp->numread + 1;
- }
- found = 1;
- break;
- }
- tmp = tmp->next;
- }
- //Add oid for any new machine
- if (!found) {
- int tmpsize;
- if((ptr = pCreate(num_objs)) == NULL) {
- return NULL;
- }
- ptr->mid = mid;
- if (STATUS(headeraddr) & NEW) {
- ptr->oidcreated[ptr->numcreated] = OID(headeraddr);
- ptr->numcreated = ptr->numcreated + 1;
- GETSIZE(tmpsize, headeraddr);
- ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
- } else if (STATUS(headeraddr) & DIRTY) {
- ptr->oidmod[ptr->nummod] = OID(headeraddr);
- ptr->nummod = ptr->nummod + 1;
- GETSIZE(tmpsize, headeraddr);
- ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
- } else {
- *((unsigned int *)ptr->objread)=OID(headeraddr);
- offset = sizeof(unsigned int);
- *((short *)(((char *)ptr->objread) + offset)) = headeraddr->version;
- ptr->numread = ptr->numread + 1;
- }
- ptr->next = pile;
- pile = ptr;
- }
-
- /* Clear Flags */
- STATUS(headeraddr) &= ~NEW;
- STATUS(headeraddr) &= ~DIRTY;
-
- return pile;
+ plistnode_t *ptr, *tmp;
+ int found = 0, offset = 0;
+
+ tmp = pile;
+ //Add oid into a machine that is already present in the pile linked list structure
+ while(tmp != NULL) {
+ if (tmp->mid == mid) {
+ int tmpsize;
+
+ if (STATUS(headeraddr) & NEW) {
+ tmp->oidcreated[tmp->numcreated] = OID(headeraddr);
+ tmp->numcreated++;
+ GETSIZE(tmpsize, headeraddr);
+ tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
+ }else if (STATUS(headeraddr) & DIRTY) {
+ tmp->oidmod[tmp->nummod] = OID(headeraddr);
+ tmp->nummod++;
+ GETSIZE(tmpsize, headeraddr);
+ tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
+ } else {
+ offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
+ *((unsigned int *)(((char *)tmp->objread) + offset))=OID(headeraddr);
+ offset += sizeof(unsigned int);
+ *((short *)(((char *)tmp->objread) + offset)) = headeraddr->version;
+ tmp->numread ++;
+ }
+ found = 1;
+ break;
+ }
+ tmp = tmp->next;
+ }
+ //Add oid for any new machine
+ if (!found) {
+ int tmpsize;
+ if((ptr = pCreate(num_objs)) == NULL) {
+ return NULL;
+ }
+ ptr->mid = mid;
+ if (STATUS(headeraddr) & NEW) {
+ ptr->oidcreated[ptr->numcreated] = OID(headeraddr);
+ ptr->numcreated ++;
+ GETSIZE(tmpsize, headeraddr);
+ ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
+ } else if (STATUS(headeraddr) & DIRTY) {
+ ptr->oidmod[ptr->nummod] = OID(headeraddr);
+ ptr->nummod ++;
+ GETSIZE(tmpsize, headeraddr);
+ ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
+ } else {
+ *((unsigned int *)ptr->objread)=OID(headeraddr);
+ offset = sizeof(unsigned int);
+ *((short *)(((char *)ptr->objread) + offset)) = headeraddr->version;
+ ptr->numread ++;
+ }
+ ptr->next = pile;
+ pile = ptr;
+ }
+
+ /* Clear Flags */
+ STATUS(headeraddr) =0;
+
+ return pile;
}
//Count the number of machine piles
* participants involved in a transaction. */
typedef struct plistnode {
unsigned int mid;
- short numread; /* no of objects modified */
- short nummod; /* no of objects read */
- short numcreated; /* no of objects created */
+ unsigned int numread; /* no of objects modified */
+ unsigned int nummod; /* no of objects read */
+ unsigned int numcreated; /* no of objects created */
int sum_bytes; /* total bytes of objects modified */
char *objread; /* Pointer to array containing oids of objects read and their version numbers*/
unsigned int *oidmod; /* Pointer to array containing oids of modified objects */
prehashtable_t pflookup; //Global prefetch cache table
unsigned int prehashCreate(unsigned int size, float loadfactor) {
- prehashlistnode_t *nodes;
- int i;
-
- // Allocate space for the hash table
- if((nodes = calloc(size, sizeof(prehashlistnode_t))) == NULL) {
- printf("Calloc error %s %d\n", __FILE__, __LINE__);
- return 1;
- }
-
- pflookup.table = nodes;
- pflookup.size = size;
- pflookup.numelements = 0; // Initial number of elements in the hash
- pflookup.loadfactor = loadfactor;
-
- //Intiliaze and set prefetch table mutex attribute
- pthread_mutexattr_init(&pflookup.prefetchmutexattr);
- //NOTE:PTHREAD_MUTEX_RECURSIVE is currently inside a #if_def UNIX98 in the pthread.h file
- //Therefore use PTHREAD_MUTEX_RECURSIVE_NP instead
- pthread_mutexattr_settype(&pflookup.prefetchmutexattr, PTHREAD_MUTEX_RECURSIVE_NP);
-
- //Initialize mutex var
- pthread_mutex_init(&pflookup.lock, &pflookup.prefetchmutexattr);
- //pthread_mutex_init(&pflookup.lock, NULL);
- pthread_cond_init(&pflookup.cond, NULL);
- return 0;
+ prehashlistnode_t *nodes;
+ int i;
+
+ // Allocate space for the hash table
+ if((nodes = calloc(size, sizeof(prehashlistnode_t))) == NULL) {
+ printf("Calloc error %s %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+
+ pflookup.table = nodes;
+ pflookup.size = size;
+ pflookup.numelements = 0; // Initial number of elements in the hash
+ pflookup.loadfactor = loadfactor;
+
+ //Intiliaze and set prefetch table mutex attribute
+ pthread_mutexattr_init(&pflookup.prefetchmutexattr);
+ //NOTE:PTHREAD_MUTEX_RECURSIVE is currently inside a #if_def UNIX98 in the pthread.h file
+ //Therefore use PTHREAD_MUTEX_RECURSIVE_NP instead
+ pthread_mutexattr_settype(&pflookup.prefetchmutexattr, PTHREAD_MUTEX_RECURSIVE_NP);
+
+ //Initialize mutex var
+ pthread_mutex_init(&pflookup.lock, &pflookup.prefetchmutexattr);
+ //pthread_mutex_init(&pflookup.lock, NULL);
+ pthread_cond_init(&pflookup.cond, NULL);
+ return 0;
}
//Assign keys to bins inside hash table
unsigned int prehashFunction(unsigned int key) {
- return ( key % (pflookup.size));
+ return ( key % (pflookup.size));
}
//Store oids and their pointers into hash
unsigned int prehashInsert(unsigned int key, void *val) {
- unsigned int newsize;
- int index;
- prehashlistnode_t *ptr, *node;
-
- if(pflookup.numelements > (pflookup.loadfactor * pflookup.size)) {
- //Resize
- newsize = 2 * pflookup.size + 1;
- pthread_mutex_lock(&pflookup.lock);
- prehashResize(newsize);
- pthread_mutex_unlock(&pflookup.lock);
- }
-
- ptr = pflookup.table;
- pflookup.numelements++;
- index = prehashFunction(key);
-
- pthread_mutex_lock(&pflookup.lock);
- if(ptr[index].next == NULL && ptr[index].key == 0) { // Insert at the first position in the hashtable
- ptr[index].key = key;
- ptr[index].val = val;
- } else { // Insert in the beginning of linked list
- if ((node = calloc(1, sizeof(prehashlistnode_t))) == NULL) {
- printf("Calloc error %s, %d\n", __FILE__, __LINE__);
- pthread_mutex_unlock(&pflookup.lock);
- return 1;
- }
- node->key = key;
- node->val = val ;
- node->next = ptr[index].next;
- ptr[index].next = node;
- }
- pthread_mutex_unlock(&pflookup.lock);
- return 0;
+ unsigned int newsize;
+ int index;
+ prehashlistnode_t *ptr, *node;
+
+ if(pflookup.numelements > (pflookup.loadfactor * pflookup.size)) {
+ //Resize
+ newsize = 2 * pflookup.size + 1;
+ pthread_mutex_lock(&pflookup.lock);
+ prehashResize(newsize);
+ pthread_mutex_unlock(&pflookup.lock);
+ }
+
+ ptr = pflookup.table;
+ pflookup.numelements++;
+ index = prehashFunction(key);
+
+ pthread_mutex_lock(&pflookup.lock);
+ if(ptr[index].next == NULL && ptr[index].key == 0) { // Insert at the first position in the hashtable
+ ptr[index].key = key;
+ ptr[index].val = val;
+ } else { // Insert in the beginning of linked list
+ if ((node = calloc(1, sizeof(prehashlistnode_t))) == NULL) {
+ printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+ pthread_mutex_unlock(&pflookup.lock);
+ return 1;
+ }
+ node->key = key;
+ node->val = val ;
+ node->next = ptr[index].next;
+ ptr[index].next = node;
+ }
+ pthread_mutex_unlock(&pflookup.lock);
+ return 0;
}
// Search for an address for a given oid
void *prehashSearch(unsigned int key) {
- int index;
- prehashlistnode_t *ptr, *node;
-
- ptr = pflookup.table;
- index = prehashFunction(key);
- node = &ptr[index];
- pthread_mutex_lock(&pflookup.lock);
- while(node != NULL) {
- if(node->key == key) {
- pthread_mutex_unlock(&pflookup.lock);
- return node->val;
- }
- node = node->next;
- }
- pthread_mutex_unlock(&pflookup.lock);
- return NULL;
+ int index;
+ prehashlistnode_t *ptr, *node;
+
+ ptr = pflookup.table;
+ index = prehashFunction(key);
+ node = &ptr[index];
+ pthread_mutex_lock(&pflookup.lock);
+ while(node != NULL) {
+ if(node->key == key) {
+ pthread_mutex_unlock(&pflookup.lock);
+ return node->val;
+ }
+ node = node->next;
+ }
+ pthread_mutex_unlock(&pflookup.lock);
+ return NULL;
}
unsigned int prehashRemove(unsigned int key) {
- int index;
- prehashlistnode_t *curr, *prev;
- prehashlistnode_t *ptr, *node;
-
- ptr = pflookup.table;
- index = prehashFunction(key);
- curr = &ptr[index];
-
- pthread_mutex_lock(&pflookup.lock);
- for (; curr != NULL; curr = curr->next) {
- if (curr->key == key) { // Find a match in the hash table
- pflookup.numelements--; // Decrement the number of elements in the global hashtable
- if ((curr == &ptr[index]) && (curr->next == NULL)) { // Delete the first item inside the hashtable with no linked list of prehashlistnode_t
- curr->key = 0;
- curr->val = NULL;
- } else if ((curr == &ptr[index]) && (curr->next != NULL)) { //Delete the first item with a linked list of prehashlistnode_t connected
- curr->key = curr->next->key;
- curr->val = curr->next->val;
- node = curr->next;
- curr->next = curr->next->next;
- free(node);
- } else { // Regular delete from linked listed
- prev->next = curr->next;
- free(curr);
- }
- pthread_mutex_unlock(&pflookup.lock);
- return 0;
- }
- prev = curr;
- }
- pthread_mutex_unlock(&pflookup.lock);
- return 1;
+ int index;
+ prehashlistnode_t *curr, *prev;
+ prehashlistnode_t *ptr, *node;
+
+ ptr = pflookup.table;
+ index = prehashFunction(key);
+ curr = &ptr[index];
+
+ pthread_mutex_lock(&pflookup.lock);
+ for (; curr != NULL; curr = curr->next) {
+ if (curr->key == key) { // Find a match in the hash table
+ pflookup.numelements--; // Decrement the number of elements in the global hashtable
+ if ((curr == &ptr[index]) && (curr->next == NULL)) { // Delete the first item inside the hashtable with no linked list of prehashlistnode_t
+ curr->key = 0;
+ curr->val = NULL;
+ } else if ((curr == &ptr[index]) && (curr->next != NULL)) { //Delete the first item with a linked list of prehashlistnode_t connected
+ curr->key = curr->next->key;
+ curr->val = curr->next->val;
+ node = curr->next;
+ curr->next = curr->next->next;
+ free(node);
+ } else { // Regular delete from linked listed
+ prev->next = curr->next;
+ free(curr);
+ }
+ pthread_mutex_unlock(&pflookup.lock);
+ return 0;
+ }
+ prev = curr;
+ }
+ pthread_mutex_unlock(&pflookup.lock);
+ return 1;
}
unsigned int prehashResize(unsigned int newsize) {
- prehashlistnode_t *node, *ptr, *curr, *next; // curr and next keep track of the current and the next chashlistnodes in a linked list
- unsigned int oldsize;
- int isfirst; // Keeps track of the first element in the prehashlistnode_t for each bin in hashtable
- int i,index;
- prehashlistnode_t *newnode;
-
- ptr = pflookup.table;
- oldsize = pflookup.size;
-
- if((node = calloc(newsize, sizeof(prehashlistnode_t))) == NULL) {
- printf("Calloc error %s %d\n", __FILE__, __LINE__);
- return 1;
- }
-
- pflookup.table = node; //Update the global hashtable upon resize()
- pflookup.size = newsize;
- pflookup.numelements = 0;
-
- for(i = 0; i < oldsize; i++) { //Outer loop for each bin in hash table
- curr = &ptr[i];
- isfirst = 1;
- while (curr != NULL) { //Inner loop to go through linked lists
- if (curr->key == 0) { //Exit inner loop if there the first element for a given bin/index is NULL
- break; //key = val =0 for element if not present within the hash table
- }
- next = curr->next;
- index = prehashFunction(curr->key);
- // Insert into the new table
- if(pflookup.table[index].next == NULL && pflookup.table[index].key == 0) {
- pflookup.table[index].key = curr->key;
- pflookup.table[index].val = curr->val;
- pflookup.numelements++;
- }else {
- if((newnode = calloc(1, sizeof(prehashlistnode_t))) == NULL) {
- printf("Calloc error %s, %d\n", __FILE__, __LINE__);
- return 1;
- }
- newnode->key = curr->key;
- newnode->val = curr->val;
- newnode->next = pflookup.table[index].next;
- pflookup.table[index].next = newnode;
- pflookup.numelements++;
- }
-
- //free the linked list of prehashlistnode_t if not the first element in the hash table
- if (isfirst != 1) {
- free(curr);
- }
-
- isfirst = 0;
- curr = next;
- }
- }
-
- free(ptr); //Free the memory of the old hash table
- return 0;
+ prehashlistnode_t *node, *ptr, *curr, *next; // curr and next keep track of the current and the next chashlistnodes in a linked list
+ unsigned int oldsize;
+ int isfirst; // Keeps track of the first element in the prehashlistnode_t for each bin in hashtable
+ int i,index;
+ prehashlistnode_t *newnode;
+
+ ptr = pflookup.table;
+ oldsize = pflookup.size;
+
+ if((node = calloc(newsize, sizeof(prehashlistnode_t))) == NULL) {
+ printf("Calloc error %s %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+
+ pflookup.table = node; //Update the global hashtable upon resize()
+ pflookup.size = newsize;
+ pflookup.numelements = 0;
+
+ for(i = 0; i < oldsize; i++) { //Outer loop for each bin in hash table
+ curr = &ptr[i];
+ isfirst = 1;
+ while (curr != NULL) { //Inner loop to go through linked lists
+ if (curr->key == 0) { //Exit inner loop if there the first element for a given bin/index is NULL
+ break; //key = val =0 for element if not present within the hash table
+ }
+ next = curr->next;
+ index = prehashFunction(curr->key);
+ // Insert into the new table
+ if(pflookup.table[index].next == NULL && pflookup.table[index].key == 0) {
+ pflookup.table[index].key = curr->key;
+ pflookup.table[index].val = curr->val;
+ pflookup.numelements++;
+ }else {
+ if((newnode = calloc(1, sizeof(prehashlistnode_t))) == NULL) {
+ printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+ newnode->key = curr->key;
+ newnode->val = curr->val;
+ newnode->next = pflookup.table[index].next;
+ pflookup.table[index].next = newnode;
+ pflookup.numelements++;
+ }
+
+ //free the linked list of prehashlistnode_t if not the first element in the hash table
+ if (isfirst != 1) {
+ free(curr);
+ }
+
+ isfirst = 0;
+ curr = next;
+ }
+ }
+
+ free(ptr); //Free the memory of the old hash table
+ return 0;
}
/* Deletes the prefetch Cache */
void prehashDelete() {
- int i, isFirst;
- prehashlistnode_t *ptr, *curr, *next;
- ptr = pflookup.table;
-
- for(i=0 ; i<pflookup.size ; i++) {
- curr = &ptr[i];
- isFirst = 1;
- while(curr != NULL) {
- next = curr->next;
- if(isFirst != 1) {
- free(curr);
- }
- isFirst = 0;
- curr = next;
- }
- }
-
- free(ptr);
+ int i, isFirst;
+ prehashlistnode_t *ptr, *curr, *next;
+ ptr = pflookup.table;
+
+ for(i=0 ; i<pflookup.size ; i++) {
+ curr = &ptr[i];
+ isFirst = 1;
+ while(curr != NULL) {
+ next = curr->next;
+ if(isFirst != 1) {
+ free(curr);
+ }
+ isFirst = 0;
+ curr = next;
+ }
+ }
+
+ free(ptr);
}
//Note: This is based on the implementation of the inserting a key in the first position of the hashtable
void prehashClear() {
int i, isFirstBin;
prehashlistnode_t *ptr, *prev, *curr;
-
+
pthread_mutex_lock(&pflookup.lock);
ptr = pflookup.table;
for(i = 0; i < pflookup.size; i++) {
#include <netinet/tcp.h>
#if defined(__i386__)
-inline static int test_and_set(volatile unsigned int *addr) {
+inline int test_and_set(volatile unsigned int *addr) {
int oldval;
/* Note: the "xchg" instruction does not need a "lock" prefix */
__asm__ __volatile__("xchgl %0, %1"
: "0"(1), "m"(*(addr)));
return oldval;
}
-inline static void UnLock(volatile unsigned int *addr) {
+inline void UnLock(volatile unsigned int *addr) {
int oldval;
/* Note: the "xchg" instruction does not need a "lock" prefix */
__asm__ __volatile__("xchgl %0, %1"
#include "dstm.h"
+int test_and_set(volatile unsigned int *addr);
+void UnLock(volatile unsigned int *addr);
typedef struct socknode {
int sd;
objcopy = (objheader_t *) objstrAlloc(record->cache, size);
memcpy(objcopy, objheader, size);
/* Insert into cache's lookup table */
+ STATUS(objcopy)=0;
chashInsert(record->lookupTable, OID(objheader), objcopy);
#ifdef COMPILER
return &objcopy[1];
printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
return NULL;
} else {
-
+ STATUS(objcopy)=0;
#ifdef COMPILER
return &objcopy[1];
#else
/* This function creates objects in the transaction record */
objheader_t *transCreateObj(transrecord_t *record, unsigned int size) {
objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + size));
- tmp->notifylist = NULL;
OID(tmp) = getNewOID();
tmp->version = 1;
tmp->rcount = 1;
objnotfound++;
} else { /* If Obj found in machine (i.e. has not moved) */
/* Check if Obj is locked by any previous transaction */
- pthread_mutex_lock(&atomicObjLock);
- if ((STATUS((objheader_t *)mobj) & LOCK) == LOCK) {
- pthread_mutex_unlock(&atomicObjLock);
+ if (test_and_set(STATUSPTR(mobj))) {
if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */
v_matchlock++;
} else {/* If versions don't match ...HARD ABORT */
/* Send TRANS_DISAGREE to Coordinator */
localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
}
- } else {/* If Obj is not locked then lock object */
- STATUS(((objheader_t *)mobj)) |= LOCK;
- pthread_mutex_unlock(&atomicObjLock);
+ } else {
+ //we're locked
/* Save all object oids that are locked on this machine during this transaction request call */
oidlocked[objlocked] = OID(((objheader_t *)mobj));
objlocked++;
printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
return 1;
}
- STATUS(((objheader_t *)header)) &= ~(LOCK);
+ UnLock(STATUSPTR(header));
}
return 0;
/*This function completes the COMMIT process is the transaction is commiting*/
int transComProcess(local_thread_data_array_t *localtdata) {
- objheader_t *header, *tcptr;
- int i, nummod, tmpsize, numcreated, numlocked;
- unsigned int *oidmod, *oidcreated, *oidlocked;
- void *ptrcreate;
-
- nummod = localtdata->tdata->buffer->f.nummod;
- oidmod = localtdata->tdata->buffer->oidmod;
- numcreated = localtdata->tdata->buffer->f.numcreated;
- oidcreated = localtdata->tdata->buffer->oidcreated;
- numlocked = localtdata->transinfo->numlocked;
- oidlocked = localtdata->transinfo->objlocked;
-
- for (i = 0; i < nummod; i++) {
- if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
- printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
- return 1;
- }
- /* Copy from transaction cache -> main object store */
- if ((tcptr = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidmod[i]))) == NULL) {
- printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
- return 1;
- }
- GETSIZE(tmpsize, header);
- pthread_mutex_lock(&mainobjstore_mutex);
- char *tmptcptr = (char *) tcptr;
- memcpy((char*)header+sizeof(objheader_t), (char *)tmptcptr+ sizeof(objheader_t), tmpsize);
- header->version += 1;
- pthread_mutex_lock(¬ifymutex);
- if(header->notifylist != NULL) {
- notifyAll(&header->notifylist, OID(header), header->version);
- }
- pthread_mutex_unlock(¬ifymutex);
- pthread_mutex_unlock(&mainobjstore_mutex);
- }
- /* If object is newly created inside transaction then commit it */
- for (i = 0; i < numcreated; i++) {
- if ((header = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidcreated[i]))) == NULL) {
- printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__);
- return 1;
- }
- GETSIZE(tmpsize, header);
- tmpsize += sizeof(objheader_t);
- pthread_mutex_lock(&mainobjstore_mutex);
- if ((ptrcreate = objstrAlloc(mainobjstore, tmpsize)) == NULL) {
- printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
- pthread_mutex_unlock(&mainobjstore_mutex);
- return 1;
- }
- pthread_mutex_unlock(&mainobjstore_mutex);
- memcpy(ptrcreate, header, tmpsize);
- mhashInsert(oidcreated[i], ptrcreate);
- lhashInsert(oidcreated[i], myIpAddr);
- }
- /* Unlock locked objects */
- for(i = 0; i < numlocked; i++) {
- if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
- printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
- return 1;
- }
- STATUS(header) &= ~(LOCK);
- }
-
- return 0;
+ objheader_t *header, *tcptr;
+ int i, nummod, tmpsize, numcreated, numlocked;
+ unsigned int *oidmod, *oidcreated, *oidlocked;
+ void *ptrcreate;
+
+ nummod = localtdata->tdata->buffer->f.nummod;
+ oidmod = localtdata->tdata->buffer->oidmod;
+ numcreated = localtdata->tdata->buffer->f.numcreated;
+ oidcreated = localtdata->tdata->buffer->oidcreated;
+ numlocked = localtdata->transinfo->numlocked;
+ oidlocked = localtdata->transinfo->objlocked;
+
+ for (i = 0; i < nummod; i++) {
+ if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
+ printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+ /* Copy from transaction cache -> main object store */
+ if ((tcptr = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidmod[i]))) == NULL) {
+ printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+ GETSIZE(tmpsize, header);
+ char *tmptcptr = (char *) tcptr;
+ memcpy((char*)header+sizeof(objheader_t), (char *)tmptcptr+ sizeof(objheader_t), tmpsize);
+ header->version += 1;
+ if(header->notifylist != NULL) {
+ notifyAll(&header->notifylist, OID(header), header->version);
+ }
+ }
+ /* If object is newly created inside transaction then commit it */
+ for (i = 0; i < numcreated; i++) {
+ if ((header = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidcreated[i]))) == NULL) {
+ printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__);
+ return 1;
+ }
+ GETSIZE(tmpsize, header);
+ tmpsize += sizeof(objheader_t);
+ pthread_mutex_lock(&mainobjstore_mutex);
+ if ((ptrcreate = objstrAlloc(mainobjstore, tmpsize)) == NULL) {
+ printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
+ pthread_mutex_unlock(&mainobjstore_mutex);
+ return 1;
+ }
+ pthread_mutex_unlock(&mainobjstore_mutex);
+ memcpy(ptrcreate, header, tmpsize);
+ mhashInsert(oidcreated[i], ptrcreate);
+ lhashInsert(oidcreated[i], myIpAddr);
+ }
+ /* Unlock locked objects */
+ for(i = 0; i < numlocked; i++) {
+ if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
+ printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+ UnLock(STATUSPTR(header));
+ }
+
+ return 0;
}
prefetchpile_t *foundLocal(char *ptr) {
}
pthread_mutex_unlock(&prefetchcache_mutex);
memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
-
+ STATUS(modptr)=0;
+
/* Insert the oid and its address into the prefetch hash lookup table */
/* Do a version comparison if the oid exists */
if((oldptr = prehashSearch(oid)) != NULL) {
/* This function sends notification request per thread waiting on object(s) whose version
* changes */
int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid) {
- int sock,i;
- objheader_t *objheader;
- struct sockaddr_in remoteAddr;
- char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) + 3 * sizeof(unsigned int)];
- char *ptr;
- int bytesSent;
- int status, size;
- unsigned short version;
- unsigned int oid,mid;
- static unsigned int threadid = 0;
- pthread_mutex_t threadnotify = PTHREAD_MUTEX_INITIALIZER; //Lock and condition var for threadjoin and notification
- pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER;
- notifydata_t *ndata;
-
- oid = oidarry[0];
- if((mid = lhashSearch(oid)) == 0) {
- printf("Error: %s() No such machine found for oid =%x\n",__func__, oid);
- return;
- }
-
- if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
- perror("reqNotify():socket()");
- return -1;
- }
-
- bzero(&remoteAddr, sizeof(remoteAddr));
- remoteAddr.sin_family = AF_INET;
- remoteAddr.sin_port = htons(LISTEN_PORT);
- remoteAddr.sin_addr.s_addr = htonl(mid);
-
- /* Generate unique threadid */
- threadid++;
-
- /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */
- if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) {
- printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
- return -1;
- }
- ndata->numoid = numoid;
- ndata->threadid = threadid;
- ndata->oidarry = oidarry;
- ndata->versionarry = versionarry;
- ndata->threadcond = threadcond;
- ndata->threadnotify = threadnotify;
- if((status = notifyhashInsert(threadid, ndata)) != 0) {
- printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__);
- free(ndata);
- return -1;
- }
-
- /* Send number of oids, oidarry, version array, machine id and threadid */
- if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
- printf("reqNotify():error %d connecting to %s:%d\n", errno,
- inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
- free(ndata);
- return -1;
- } else {
- msg[0] = THREAD_NOTIFY_REQUEST;
- *((unsigned int *)(&msg[1])) = numoid;
- /* Send array of oids */
- size = sizeof(unsigned int);
- {
- i = 0;
- while(i < numoid) {
- oid = oidarry[i];
- *((unsigned int *)(&msg[1] + size)) = oid;
- size += sizeof(unsigned int);
- i++;
- }
- }
-
- /* Send array of version */
- {
- i = 0;
- while(i < numoid) {
- version = versionarry[i];
- *((unsigned short *)(&msg[1] + size)) = version;
- size += sizeof(unsigned short);
- i++;
- }
- }
-
- *((unsigned int *)(&msg[1] + size)) = myIpAddr;
- size += sizeof(unsigned int);
- *((unsigned int *)(&msg[1] + size)) = threadid;
- pthread_mutex_lock(&(ndata->threadnotify));
- size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
- send_data(sock, msg, size);
- pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify));
- pthread_mutex_unlock(&(ndata->threadnotify));
- }
-
- pthread_cond_destroy(&threadcond);
- pthread_mutex_destroy(&threadnotify);
- free(ndata);
- close(sock);
- return status;
+ int sock,i;
+ objheader_t *objheader;
+ struct sockaddr_in remoteAddr;
+ char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) + 3 * sizeof(unsigned int)];
+ char *ptr;
+ int bytesSent;
+ int status, size;
+ unsigned short version;
+ unsigned int oid,mid;
+ static unsigned int threadid = 0;
+ pthread_mutex_t threadnotify = PTHREAD_MUTEX_INITIALIZER; //Lock and condition var for threadjoin and notification
+ pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER;
+ notifydata_t *ndata;
+
+ oid = oidarry[0];
+ if((mid = lhashSearch(oid)) == 0) {
+ printf("Error: %s() No such machine found for oid =%x\n",__func__, oid);
+ return;
+ }
+
+ if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
+ perror("reqNotify():socket()");
+ return -1;
+ }
+
+ bzero(&remoteAddr, sizeof(remoteAddr));
+ remoteAddr.sin_family = AF_INET;
+ remoteAddr.sin_port = htons(LISTEN_PORT);
+ remoteAddr.sin_addr.s_addr = htonl(mid);
+
+ /* Generate unique threadid */
+ threadid++;
+
+ /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */
+ if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) {
+ printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
+ return -1;
+ }
+ ndata->numoid = numoid;
+ ndata->threadid = threadid;
+ ndata->oidarry = oidarry;
+ ndata->versionarry = versionarry;
+ ndata->threadcond = threadcond;
+ ndata->threadnotify = threadnotify;
+ if((status = notifyhashInsert(threadid, ndata)) != 0) {
+ printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__);
+ free(ndata);
+ return -1;
+ }
+
+ /* Send number of oids, oidarry, version array, machine id and threadid */
+ if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
+ printf("reqNotify():error %d connecting to %s:%d\n", errno,
+ inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+ free(ndata);
+ return -1;
+ } else {
+ msg[0] = THREAD_NOTIFY_REQUEST;
+ *((unsigned int *)(&msg[1])) = numoid;
+ /* Send array of oids */
+ size = sizeof(unsigned int);
+ {
+ i = 0;
+ while(i < numoid) {
+ oid = oidarry[i];
+ *((unsigned int *)(&msg[1] + size)) = oid;
+ size += sizeof(unsigned int);
+ i++;
+ }
+ }
+
+ /* Send array of version */
+ {
+ i = 0;
+ while(i < numoid) {
+ version = versionarry[i];
+ *((unsigned short *)(&msg[1] + size)) = version;
+ size += sizeof(unsigned short);
+ i++;
+ }
+ }
+
+ *((unsigned int *)(&msg[1] + size)) = myIpAddr;
+ size += sizeof(unsigned int);
+ *((unsigned int *)(&msg[1] + size)) = threadid;
+ pthread_mutex_lock(&(ndata->threadnotify));
+ size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
+ send_data(sock, msg, size);
+ pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify));
+ pthread_mutex_unlock(&(ndata->threadnotify));
+ }
+
+ pthread_cond_destroy(&threadcond);
+ pthread_mutex_destroy(&threadnotify);
+ free(ndata);
+ close(sock);
+ return status;
}
void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
}
int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
- threadlist_t *ptr;
- unsigned int mid;
- struct sockaddr_in remoteAddr;
- char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
- int sock, status, size, bytesSent;
-
- while(*head != NULL) {
- ptr = *head;
- mid = ptr->mid;
- //create a socket connection to that machine
- if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
- perror("notifyAll():socket()");
- return -1;
- }
-
- bzero(&remoteAddr, sizeof(remoteAddr));
- remoteAddr.sin_family = AF_INET;
- remoteAddr.sin_port = htons(LISTEN_PORT);
- remoteAddr.sin_addr.s_addr = htonl(mid);
- //send Thread Notify response and threadid to that machine
- if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
- printf("notifyAll():error %d connecting to %s:%d\n", errno,
- inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
- status = -1;
- fflush(stdout);
- } else {
- bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
- msg[0] = THREAD_NOTIFY_RESPONSE;
- *((unsigned int *)&msg[1]) = oid;
- size = sizeof(unsigned int);
- *((unsigned short *)(&msg[1]+ size)) = version;
- size+= sizeof(unsigned short);
- *((unsigned int *)(&msg[1]+ size)) = ptr->threadid;
-
- size = 1 + 2*sizeof(unsigned int) + sizeof(unsigned short);
- send_data(sock, msg, size);
- }
- //close socket
- close(sock);
- // Update head
- *head = ptr->next;
- free(ptr);
- }
- return status;
+ threadlist_t *ptr;
+ unsigned int mid;
+ struct sockaddr_in remoteAddr;
+ char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
+ int sock, status, size, bytesSent;
+
+ while(*head != NULL) {
+ ptr = *head;
+ mid = ptr->mid;
+ //create a socket connection to that machine
+ if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
+ perror("notifyAll():socket()");
+ return -1;
+ }
+
+ bzero(&remoteAddr, sizeof(remoteAddr));
+ remoteAddr.sin_family = AF_INET;
+ remoteAddr.sin_port = htons(LISTEN_PORT);
+ remoteAddr.sin_addr.s_addr = htonl(mid);
+ //send Thread Notify response and threadid to that machine
+ if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
+ printf("notifyAll():error %d connecting to %s:%d\n", errno,
+ inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+ status = -1;
+ fflush(stdout);
+ } else {
+ bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
+ msg[0] = THREAD_NOTIFY_RESPONSE;
+ *((unsigned int *)&msg[1]) = oid;
+ size = sizeof(unsigned int);
+ *((unsigned short *)(&msg[1]+ size)) = version;
+ size+= sizeof(unsigned short);
+ *((unsigned int *)(&msg[1]+ size)) = ptr->threadid;
+
+ size = 1 + 2*sizeof(unsigned int) + sizeof(unsigned short);
+ send_data(sock, msg, size);
+ }
+ //close socket
+ close(sock);
+ // Update head
+ *head = ptr->next;
+ free(ptr);
+ }
+ return status;
}
void transAbort(transrecord_t *trans) {