From: adash Date: Fri, 18 Jan 2008 21:53:32 +0000 (+0000) Subject: Implementation for thread join and wait and notify design X-Git-Tag: preEdgeChange~299 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=56379fd5311533f8029ad04c12ca093d10be5105;p=IRC.git Implementation for thread join and wait and notify design Minor bug fixes --- diff --git a/Robust/src/ClassLibrary/ThreadDSM.java b/Robust/src/ClassLibrary/ThreadDSM.java index 322a022b..d5c0a0ce 100644 --- a/Robust/src/ClassLibrary/ThreadDSM.java +++ b/Robust/src/ClassLibrary/ThreadDSM.java @@ -1,14 +1,13 @@ public class Thread { /* Don't allow overriding this method. If you do, it will break dispatch * because we don't have the type information necessary. */ - private boolean threadDone; + public boolean threadDone; public Thread() { threadDone = false; } - //public native static void join(); - public native void join(); + public final native void join(); public final native void start(int mid); diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index 917e546e..ab3cc4ed 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -34,6 +34,8 @@ #define TRANS_SUCESSFUL 21 #define TRANS_PREFETCH_RESPONSE 22 #define START_REMOTE_THREAD 23 +#define THREAD_NOTIFY_REQUEST 24 +#define THREAD_NOTIFY_RESPONSE 25 //Control bits for status of objects in Machine pile #define OBJ_LOCKED_BUT_VERSION_MATCH 14 @@ -51,6 +53,8 @@ #include "clookup.h" #include "queue.h" #include "mcpileq.h" +#include "threadnotify.h" + #define DEFAULT_OBJ_STORE_SIZE 1048510 //1MB #define TID_LEN 20 @@ -65,6 +69,7 @@ #include "structdefs.h" typedef struct objheader { + threadlist_t *notifylist; unsigned short version; unsigned short rcount; } objheader_t; @@ -93,6 +98,7 @@ typedef struct objheader { #else typedef struct objheader { + threadlist_t *notifylist; unsigned int oid; unsigned short type; unsigned short version; @@ -179,12 +185,15 @@ typedef struct local_thread_data_array { trans_commit_data_t *transinfo; /* Holds information of objects locked and not found in the participant */ } local_thread_data_array_t; -//Structure for members within prefetch tuples -typedef struct member { - short offset; /* Holds offset of the ptr field */ - short index; /* Holds the array index value */ - struct member *next; -}trans_member_t; +//Structure for objects involved in wait-notify call +//TODO Use it +typedef struct notifydata { + unsigned int numoid; /* Number of oids on which we are waiting for updated notification */ + unsigned int threadid; /* The threadid that is waiting for update notification response*/ + unsigned int *oidarry; /* Pointer to array of oids */ + unsigned short *version;/* Pointer to array of versions of the oids that we are waiting on */ +}notifydata_t; + /* Initialize main object store and lookup tables, start server thread. */ int dstmInit(void); @@ -206,8 +215,8 @@ int readClientReq(trans_commit_data_t *, int); int processClientReq(fixed_data_t *, trans_commit_data_t *,unsigned int *, char *, void *, unsigned int *, int); char handleTransReq(fixed_data_t *, trans_commit_data_t *, unsigned int *, char *, void *, int); int decideCtrlMessage(fixed_data_t *, trans_commit_data_t *, int *, int *, int *, int *, int *, void *, unsigned int *, unsigned int *, int); -//int transCommitProcess(trans_commit_data_t *, int); int transCommitProcess(void *, unsigned int *, unsigned int *, int, int, int); +void processReqNotify(unsigned int numoid, unsigned int *oid, unsigned short *version, unsigned int mid, unsigned int threadid); /* end server portion */ /* Prototypes for transactions */ @@ -220,7 +229,7 @@ int processConfigFile(); void addHost(unsigned int); void mapObjMethod(unsigned short); -void randomdelay(void); +void randomdelay(); transrecord_t *transStart(); objheader_t *transRead(transrecord_t *, unsigned int); objheader_t *transCreateObj(transrecord_t *, unsigned int); //returns oid @@ -233,6 +242,7 @@ void *getRemoteObj(transrecord_t *, unsigned int, unsigned int); void *handleLocalReq(void *); int transComProcess(local_thread_data_array_t *); int transAbortProcess(local_thread_data_array_t *); +void transAbort(transrecord_t *trans); void prefetch(int, unsigned int *, unsigned short *, short*); void *transPrefetch(void *); @@ -246,5 +256,10 @@ void sendPrefetchReq(prefetchpile_t*, int); void getPrefetchResponse(int, int); unsigned short getObjType(unsigned int oid); int startRemoteThread(unsigned int oid, unsigned int mid); +/* Sends notification request for thread join, if sucessful returns 0 else returns -1 */ +void reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int numoid); +void threadNotify(unsigned int oid, unsigned short version, unsigned int tid); +int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version); + /* end transactions */ #endif diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 11d2507f..3ec7635c 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -7,9 +7,12 @@ #include #include #include +#include +#include #include "dstm.h" #include "mlookup.h" #include "llookup.h" +#include "threadnotify.h" #ifdef COMPILER #include "thread.h" #endif @@ -41,6 +44,9 @@ int dstmInit(void) if (lhashCreate(HASH_SIZE, LOADFACTOR)) return 1; //failure + + if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR)) + return 1; //failure return 0; } @@ -107,23 +113,22 @@ void *dstmListen() * and accordingly calls other functions to process new requests */ void *dstmAccept(void *acceptfd) { - int numbytes,i, val, retval; + int val, retval, size; unsigned int oid; char buffer[RECEIVE_BUFFER_SIZE], control,ctrl; char *ptr; void *srcObj; objheader_t *h; trans_commit_data_t transinfo; - unsigned short objType; - + unsigned short objType, *versionarry, version; + unsigned int *oidarry, numoid, mid, threadid; + transinfo.objlocked = NULL; transinfo.objnotfound = NULL; transinfo.modptr = NULL; transinfo.numlocked = 0; transinfo.numnotfound = 0; - int fd_flags = fcntl((int)acceptfd, F_GETFD), size; - /* Receive control messages from other machines */ if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) { if (retval == 0) { @@ -142,6 +147,7 @@ void *dstmAccept(void *acceptfd) } if((srcObj = mhashSearch(oid)) == NULL) { printf("Object not found in Main Object Store %s %d\n", __FILE__, __LINE__); + pthread_exit(NULL); } h = (objheader_t *) srcObj; GETSIZE(size, h); @@ -191,7 +197,7 @@ void *dstmAccept(void *acceptfd) case TRANS_PREFETCH: printf("DEBUG -> Recv TRANS_PREFETCH\n"); if((val = prefetchReq((int)acceptfd)) != 0) { - printf("Error in readClientReq\n"); + printf("Error in transPrefetch\n"); pthread_exit(NULL); } break; @@ -209,6 +215,43 @@ void *dstmAccept(void *acceptfd) } break; + case THREAD_NOTIFY_REQUEST: + size = sizeof(unsigned int); + retval = recv((int)acceptfd, ptr, size, 0); + numoid = *((unsigned int *) ptr); + size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int); + retval = recv((int)acceptfd, ptr, size, 0); + oidarry = calloc(numoid, sizeof(unsigned int)); + memcpy(oidarry, ptr, sizeof(unsigned int) * numoid); + size = sizeof(unsigned int) * numoid; + versionarry = calloc(numoid, sizeof(unsigned short)); + memcpy(versionarry, ptr+size, sizeof(unsigned short) * numoid); + size += sizeof(unsigned short) * numoid; + mid = *((unsigned int *)(ptr+size)); + size += sizeof(unsigned int); + threadid = *((unsigned int *)(ptr+size)); + processReqNotify(numoid, oidarry, versionarry, mid, threadid); + + break; + + case THREAD_NOTIFY_RESPONSE: + size = sizeof(unsigned short) + 2 * sizeof(unsigned int); + retval = recv((int)acceptfd, ptr, size, 0); + if(retval <= 0) + perror("dstmAccept(): error receiving THREAD_NOTIFY_RESPONSE msg"); + else if( retval != 2*sizeof(unsigned int) + sizeof(unsigned short)) + printf("dstmAccept(): incorrect smsg size %d for THREAD_NOTIFY_RESPONSE msg\n", retval); + else { + oid = *((unsigned int *)ptr); + size = sizeof(unsigned int); + version = *((unsigned short *)(ptr+size)); + size += sizeof(unsigned short); + threadid = *((unsigned int *)(ptr+size)); + threadNotify(oid,version,threadid); + } + + break; + default: printf("DEBUG -> dstmAccept: Error Unknown opcode %d\n", control); } @@ -561,6 +604,10 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock pthread_mutex_lock(&mainobjstore_mutex); memcpy(header, (char *)modptr + offset, tmpsize + sizeof(objheader_t)); header->version += 1; + /* If threads are waiting on this object to be updated, notify them */ + if(header->notifylist != NULL) { + notifyAll(&header->notifylist, OID(header), header->version); + } pthread_mutex_unlock(&mainobjstore_mutex); offset += sizeof(objheader_t) + tmpsize; } @@ -628,6 +675,7 @@ int prefetchReq(int acceptfd) { } while(sum < N && n != 0); /* Process each oid */ + printf("Oid 0x%x is being searched\n", oid); if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */ /* Save the oids not found in buffer for later use */ *(buffer + index) = OBJECT_NOT_FOUND; @@ -708,3 +756,75 @@ int prefetchReq(int acceptfd) { return 0; } +void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int threadid) { + objheader_t *header; + unsigned int oid; + unsigned short newversion; + char msg[1+ sizeof(unsigned int)]; + int sd; + struct sockaddr_in remoteAddr; + int bytesSent; + int status, size, retry = 0; + + int i = 0; + while(i < numoid) { + oid = *(oidarry + i); + if((header = (objheader_t *) mhashSearch(oid)) == NULL) { + printf("processReqNotify(): Object is not found in mlookup %s, %d\n", __FILE__, __LINE__); + return; + } else { + /* Check to see if versions are same */ +checkversion: + if ((STATUS(header) & LOCK) != LOCK) { + STATUS(header) |= LOCK; + if(header->version == *(versionarry + i)) { + //Add to the notify list + insNode(header->notifylist, threadid, mid); + } else { + if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){ + perror("processReqNotify():socket()"); + return; + } + bzero(&remoteAddr, sizeof(remoteAddr)); + remoteAddr.sin_family = AF_INET; + remoteAddr.sin_port = htons(LISTEN_PORT); + remoteAddr.sin_addr.s_addr = htonl(mid); + + if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){ + printf("processReqNotify():error %d connecting to %s:%d\n", errno, + inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT); + status = -1; + } else { + //Send Update notification + msg[0] = THREAD_NOTIFY_RESPONSE; + msg[1] = oid; + size = sizeof(unsigned int); + memcpy(&msg[1] + size, &newversion, sizeof(unsigned short)); + size += sizeof(unsigned short); + memcpy(&msg[1] + size, &threadid, sizeof(unsigned int)); + bytesSent = send(sd, msg, 1+sizeof(unsigned int), 0); + if (bytesSent < 0){ + perror("processReqNotify():send()"); + status = -1; + } else if (bytesSent != 1 + sizeof(unsigned short) + 2*sizeof(unsigned int)){ + printf("processReqNotify(): error, sent %d bytes\n", bytesSent); + status = -1; + } else { + status = 0; + } + + } + close(sd); + } + STATUS(header) &= ~(LOCK); + } else { + randomdelay(); + printf("DEBUG-> processReqNotify() Object is still locked\n"); + goto checkversion; + } + } + } + free(oidarry); + free(versionarry); +} + diff --git a/Robust/src/Runtime/DSTM/interface/mlookup.c b/Robust/src/Runtime/DSTM/interface/mlookup.c index a9cca795..0629adbe 100644 --- a/Robust/src/Runtime/DSTM/interface/mlookup.c +++ b/Robust/src/Runtime/DSTM/interface/mlookup.c @@ -5,8 +5,6 @@ mhashtable_t mlookup; //Global hash table // Creates a machine lookup table with size =" size" unsigned int mhashCreate(unsigned int size, float loadfactor) { mhashlistnode_t *nodes; - int i; - // Allocate space for the hash table if((nodes = calloc(size, sizeof(mhashlistnode_t))) == NULL) { printf("Calloc error %s %d\n", __FILE__, __LINE__); diff --git a/Robust/src/Runtime/DSTM/interface/threadnotify.c b/Robust/src/Runtime/DSTM/interface/threadnotify.c new file mode 100644 index 00000000..cfa45eec --- /dev/null +++ b/Robust/src/Runtime/DSTM/interface/threadnotify.c @@ -0,0 +1,223 @@ +#include "threadnotify.h" + +notifyhashtable_t nlookup; //Global hash table + +void insNode(threadlist_t *head, unsigned int threadid, unsigned int mid) { + threadlist_t *ptr; + if(head == NULL) { + if((head = calloc(1, sizeof(threadlist_t))) == NULL) { + printf("Calloc Error %s, %d,\n", __FILE__, __LINE__); + return; + } + head->threadid = threadid; + head->mid = mid; + head->next = NULL; + } else { + if((ptr = calloc(1, sizeof(threadlist_t))) == NULL) { + printf("Calloc Error %s, %d,\n", __FILE__, __LINE__); + return; + } + ptr->threadid = threadid; + ptr->mid = mid; + ptr->next = head; + head = ptr; + } +} + +void display(threadlist_t *head) { + threadlist_t *ptr; + if(head == NULL) { + printf("No thread is waiting\n"); + return; + } else { + while(head != NULL) { + ptr = head; + printf("The threadid waiting is = %d\n", ptr->threadid); + printf("The mid on which thread present = %d\n", ptr->mid); + head = ptr->next; + } + } +} + +unsigned int notifyhashCreate(unsigned int size, float loadfactor) { + notifylistnode_t *nodes; + + // Allocate space for the hash table + if((nodes = calloc(size, sizeof(notifylistnode_t))) == NULL) { + printf("Calloc error %s %d\n", __FILE__, __LINE__); + return 1; + } + + nlookup.table = nodes; + nlookup.size = size; + nlookup.numelements = 0; // Initial number of elements in the hash + nlookup.loadfactor = loadfactor; + //Initialize the pthread_mutex variable + pthread_mutex_init(&nlookup.locktable, NULL); + return 0; +} + +// Assign to tids to bins inside hash table +unsigned int notifyhashFunction(unsigned int tid) { + return( tid % (nlookup.size)); +} + +// Insert threadcond and threadid mapping into the hash table +unsigned int notifyhashInsert(unsigned int tid, pthread_cond_t threadcond) { + unsigned int newsize; + int index; + notifylistnode_t *ptr, *node; + + if (nlookup.numelements > (nlookup.loadfactor * nlookup.size)) { + //Resize Table + newsize = 2 * nlookup.size + 1; + pthread_mutex_lock(&nlookup.locktable); + notifyhashResize(newsize); + pthread_mutex_unlock(&nlookup.locktable); + } + ptr = nlookup.table; + nlookup.numelements++; + + index = notifyhashFunction(tid); +#ifdef DEBUG + printf("DEBUG -> index = %d, threadid = %d\n", index, tid); +#endif + pthread_mutex_lock(&nlookup.locktable); + if(ptr[index].next == NULL && ptr[index].threadid == 0) { // Insert at the first position in the hashtable + ptr[index].threadid = tid; + ptr[index].threadcond = threadcond; + } else { // Insert in the beginning of linked list + if ((node = calloc(1, sizeof(notifylistnode_t))) == NULL) { + printf("Calloc error %s, %d\n", __FILE__, __LINE__); + pthread_mutex_unlock(&nlookup.locktable); + return 1; + } + node->threadid = tid; + node->threadcond = threadcond; + node->next = ptr[index].next; + ptr[index].next = node; + } + pthread_mutex_unlock(&nlookup.locktable); + return 0; +} + +// Return pthread_cond_t threadcond for a given threadid in the hash table +pthread_cond_t notifyhashSearch(unsigned int tid) { + int index; + notifylistnode_t *ptr, *node; + pthread_cond_t tmp = PTHREAD_COND_INITIALIZER; + + ptr = nlookup.table; // Address of the beginning of hash table + index = notifyhashFunction(tid); + node = &ptr[index]; + pthread_mutex_lock(&nlookup.locktable); + while(node != NULL) { + if(node->threadid == tid) { + pthread_mutex_unlock(&nlookup.locktable); + return node->threadcond; + } + node = node->next; + } + pthread_mutex_unlock(&nlookup.locktable); + return tmp; +} + +// Remove an entry from the hash table +unsigned int notifyhashRemove(unsigned int tid) { + int index; + notifylistnode_t *curr, *prev; + notifylistnode_t *ptr, *node; + + ptr = nlookup.table; + index = notifyhashFunction(tid); + curr = &ptr[index]; + + pthread_cond_t tmp = PTHREAD_COND_INITIALIZER; + pthread_mutex_lock(&nlookup.locktable); + for (; curr != NULL; curr = curr->next) { + if (curr->threadid == tid) { // Find a match in the hash table + nlookup.numelements--; // Decrement the number of elements in the global hashtable + if ((curr == &ptr[index]) && (curr->next == NULL)) { // Delete the first item inside the hashtable with no linked list of notifylistnode_t + curr->threadid = 0; + curr->threadcond = tmp; + } else if ((curr == &ptr[index]) && (curr->next != NULL)) { //Delete the first bin item with a linked list of notifylistnode_t connected + curr->threadid = curr->next->threadid; + curr->threadcond = curr->next->threadcond; + node = curr->next; + curr->next = curr->next->next; + free(node); + } else { // Regular delete from linked listed + prev->next = curr->next; + free(curr); + } + pthread_mutex_unlock(&nlookup.locktable); + return 0; + } + prev = curr; + } + pthread_mutex_unlock(&nlookup.locktable); + return 1; +} + +// Resize table +unsigned int notifyhashResize(unsigned int newsize) { + notifylistnode_t *node, *ptr, *curr, *next; // curr and next keep track of the current and the next notifyhashlistnodes in a linked list + unsigned int oldsize; + int isfirst; // Keeps track of the first element in the notifylistnode_t for each bin in hashtable + int i,index; + notifylistnode_t *newnode; + + ptr = nlookup.table; + oldsize = nlookup.size; + + if((node = calloc(newsize, sizeof(notifylistnode_t))) == NULL) { + printf("Calloc error %s %d\n", __FILE__, __LINE__); + return 1; + } + + nlookup.table = node; //Update the global hashtable upon resize() + nlookup.size = newsize; + nlookup.numelements = 0; + + for(i = 0; i < oldsize; i++) { //Outer loop for each bin in hash table + curr = &ptr[i]; + isfirst = 1; + while (curr != NULL) { //Inner loop to go through linked lists + if (curr->threadid == 0) { //Exit inner loop if there the first element for a given bin/index is NULL + break; //threadid = threadcond =0 for element if not present within the hash table + } + next = curr->next; + index = notifyhashFunction(curr->threadid); +#ifdef DEBUG + printf("DEBUG(resize) -> index = %d, threadid = %d\n", index, curr->threadid); +#endif + // Insert into the new table + if(nlookup.table[index].next == NULL && nlookup.table[index].threadid == 0) { + nlookup.table[index].threadid = curr->threadid; + nlookup.table[index].threadcond = curr->threadcond; + nlookup.numelements++; + }else { + if((newnode = calloc(1, sizeof(notifylistnode_t))) == NULL) { + printf("Calloc error %s, %d\n", __FILE__, __LINE__); + return 1; + } + newnode->threadid = curr->threadid; + newnode->threadcond = curr->threadcond; + newnode->next = nlookup.table[index].next; + nlookup.table[index].next = newnode; + nlookup.numelements++; + } + + //free the linked list of notifylistnode_t if not the first element in the hash table + if (isfirst != 1) { + free(curr); + } + + isfirst = 0; + curr = next; + } + } + + free(ptr); //Free the memory of the old hash table + return 0; +} diff --git a/Robust/src/Runtime/DSTM/interface/threadnotify.h b/Robust/src/Runtime/DSTM/interface/threadnotify.h new file mode 100644 index 00000000..75071a77 --- /dev/null +++ b/Robust/src/Runtime/DSTM/interface/threadnotify.h @@ -0,0 +1,41 @@ +#ifndef _THREADNOTIFY_H_ +#define _THREADNOTIFY_H_ + +#include +#include +#include + +#define N_LOADFACTOR 0.75 +#define N_HASH_SIZE 20 + +//Structure to notify object of which other objects/threads are waiting on it +typedef struct threadlist { + unsigned int threadid; + unsigned int mid; + struct threadlist *next; +} threadlist_t; + +typedef struct notifylistnode { + unsigned int threadid; + pthread_cond_t threadcond; + struct notifylistnode *next; +} notifylistnode_t; + +typedef struct notifyhashtable { + notifylistnode_t *table; //points to beginning of hash table + unsigned int size; + unsigned int numelements; + float loadfactor; + pthread_mutex_t locktable; +} notifyhashtable_t; + +void insNode(threadlist_t *head, unsigned int threadid, unsigned int mid); +void display(threadlist_t *head); +unsigned int notifyhashCreate(unsigned int size, float loadfactor); +unsigned int notifyhashFunction(unsigned int tid); +unsigned notifyhashInsert(unsigned int tid, pthread_cond_t threadcond); +pthread_cond_t notifyhashSearch(unsigned int tid); //returns val, NULL if not found +unsigned int notifyhashRemove(unsigned int tid); //returns -1 if not found +unsigned int notifyhashResize(unsigned int newsize); + +#endif diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 26facf03..ea8ecd98 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -6,6 +6,7 @@ #include "llookup.h" #include "plookup.h" #include "prelookup.h" +#include "threadnotify.h" #include "queue.h" #include #include @@ -17,7 +18,6 @@ #include #include #include -#include #define LISTEN_PORT 2156 #define RECEIVE_BUFFER_SIZE 2048 @@ -154,7 +154,6 @@ void *pCacheAlloc(objstr_t *store, unsigned int size) { } if(success == 0) { - printf("DEBUG-> Unable to insert object in Prefetch cache\n"); return NULL; } } @@ -214,7 +213,7 @@ void transExit() { /* This functions inserts randowm wait delays in the order of msec * Mostly used when transaction commits retry*/ -void randomdelay(void) +void randomdelay() { struct timespec req; time_t t; @@ -326,6 +325,8 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { /* Get the object from the remote location */ machinenumber = lhashSearch(oid); + char* ipaddr; + midtoIP(machinenumber, ipaddr); objcopy = getRemoteObj(record, machinenumber, oid); if(objcopy == NULL) { printf("Object not found in Remote location %s, %d\n", __FILE__, __LINE__); @@ -344,6 +345,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { objheader_t *transCreateObj(transrecord_t *record, unsigned int size) { objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + size)); + tmp->notifylist = NULL; OID(tmp) = getNewOID(); tmp->version = 1; tmp->rcount = 1; @@ -441,7 +443,7 @@ int transCommit(transrecord_t *record) { /* Create a list of machine ids(Participants) involved in transaction */ if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) { printf("Calloc error %s, %d\n", __FILE__, __LINE__); - free(record); + //free(record); return 1; } pListMid(pile, listmid); @@ -462,7 +464,7 @@ int transCommit(transrecord_t *record) { pthread_mutex_destroy(&tlock); pDelete(pile_ptr); free(listmid); - free(record); + //free(record); return 1; } @@ -474,7 +476,7 @@ int transCommit(transrecord_t *record) { pDelete(pile_ptr); free(listmid); free(thread_data_array); - free(record); + //free(record); return 1; } @@ -498,7 +500,7 @@ int transCommit(transrecord_t *record) { free(listmid); free(thread_data_array); free(ltdata); - free(record); + //free(record); return 1; } tosend->f.control = TRANS_REQUEST; @@ -537,7 +539,7 @@ int transCommit(transrecord_t *record) { free(thread_data_array[i].buffer); free(thread_data_array); free(ltdata); - free(record); + //free(record); return 1; } } else { /*Local*/ @@ -556,7 +558,7 @@ int transCommit(transrecord_t *record) { free(thread_data_array[i].buffer); free(thread_data_array); free(ltdata); - free(record); + //free(record); return 1; } } @@ -581,7 +583,7 @@ int transCommit(transrecord_t *record) { free(thread_data_array[j].buffer); free(thread_data_array); free(ltdata); - free(record); + //free(record); return 1; } free(thread_data_array[i].buffer); @@ -1050,6 +1052,11 @@ int transComProcess(local_thread_data_array_t *localtdata) { pthread_mutex_lock(&mainobjstore_mutex); memcpy(header, tcptr, tmpsize + sizeof(objheader_t)); header->version += 1; + /* If threads are waiting on this object to be updated, notify them */ + if(header->notifylist != NULL) { + notifyAll(&header->notifylist, OID(header), header->version); + } + pthread_mutex_unlock(&mainobjstore_mutex); } /* If object is newly created inside transaction then commit it */ @@ -1059,6 +1066,7 @@ int transComProcess(local_thread_data_array_t *localtdata) { return 1; } GETSIZE(tmpsize, header); + tmpsize += sizeof(objheader_t); pthread_mutex_lock(&mainobjstore_mutex); if ((ptrcreate = objstrAlloc(mainobjstore, tmpsize)) == NULL) { printf("Error: transComProcess() failed objstrAlloc\n"); @@ -1067,7 +1075,6 @@ int transComProcess(local_thread_data_array_t *localtdata) { } pthread_mutex_unlock(&mainobjstore_mutex); memcpy(ptrcreate, header, tmpsize + sizeof(objheader_t)); - mhashInsert(oidcreated[i], ptrcreate); lhashInsert(oidcreated[i], myIpAddr); } @@ -1100,6 +1107,7 @@ void checkPrefetchTuples(prefetchqelem_t *node) { oid = GET_PTR_OID(ptr); endoffsets = GET_PTR_EOFF(ptr, ntuples); arryfields = GET_PTR_ARRYFLD(ptr, ntuples); + /* Find offset length for each tuple */ int numoffset[ntuples]; numoffset[0] = endoffsets[0]; @@ -1240,10 +1248,9 @@ prefetchpile_t *foundLocal(prefetchqelem_t *node) { } if(isArray == 1) { int elementsize = classsize[TYPE(objheader)]; - struct ArrayObject *ao = (struct ArrayObject *) (tmp + sizeof(objheader_t)); objoid = *((unsigned int *)(tmp + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*arryfields[arryfieldindex]))); } else { - objoid = *(tmp + sizeof(objheader_t) + arryfields[arryfieldindex]); + objoid = *((unsigned int *)(tmp + sizeof(objheader_t) + arryfields[arryfieldindex])); } //Update numoffset array numoffset[i] = numoffset[i] - 1; @@ -1315,7 +1322,7 @@ void checkPreCache(prefetchqelem_t *node, int *numoffset, unsigned int objoid, i int elementsize = classsize[TYPE(header)]; objoid = *((unsigned int *)(tmp + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*arryfields[arryfieldindex]))); } else { - objoid = *(tmp + sizeof(objheader_t) + arryfields[arryfieldindex]); + objoid = *((unsigned int *)(tmp + sizeof(objheader_t) + arryfields[arryfieldindex])); } //Update numoffset array numoffset[index] = numoffset[index] - 1; @@ -1572,7 +1579,8 @@ void getPrefetchResponse(int count, int sd) { /* Increment it to get the object */ /* TODO: For each object not found query DHT for new location and retrieve the object */ index += sizeof(char); - memcpy(&oid, buffer + index, sizeof(unsigned int)); + //memcpy(&oid, buffer + index, sizeof(unsigned int)); + oid = *((unsigned int *)(buffer + index)); index += sizeof(unsigned int); /* Throw an error */ printf("OBJECT NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n"); @@ -1600,7 +1608,6 @@ unsigned short getObjType(unsigned int oid) { if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) { - //prefetch(1, &oid, &numoffsets, NULL); prefetch(1, &oid, numoffset, fieldoffset); pthread_mutex_lock(&pflookup.lock); while ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) @@ -1778,3 +1785,157 @@ int findHost(unsigned int hostIp) //not found return -1; } + +/* This function sends notification request per thread waiting on object(s) whose version + * changes */ +void reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int numoid) { + int sock,i; + objheader_t *objheader; + struct sockaddr_in remoteAddr; + char msg[1 + numoid * (sizeof(short) + sizeof(unsigned int)) + sizeof(unsigned int) * 3]; + char *ptr; + int bytesSent; + int status, size; + unsigned short version; + unsigned int oid, threadid; + pthread_mutex_t threadnotify; //Lock and condition var for threadjoin and notification + pthread_cond_t threadcond; + + if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){ + perror("reqNotify():socket()"); + return; + } + + bzero(&remoteAddr, sizeof(remoteAddr)); + remoteAddr.sin_family = AF_INET; + remoteAddr.sin_port = htons(LISTEN_PORT); + remoteAddr.sin_addr.s_addr = htonl(mid); + + /* Generate unique threadid */ + threadid = (unsigned int) pthread_self(); + if((status = notifyhashInsert(threadid, threadcond)) != 0) { + printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__); + return; + } + + /* Save data that is sent for later processing */ + //Save threadid, numoid, oidarray, versionarray, also the pthread_cond_variable in a linked list + //TODO + + /* Send oidarry, version array, threadid and machine id */ + if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){ + printf("reqNotify():error %d connecting to %s:%d\n", errno, + inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT); + status = -1; + } else { + msg[0] = THREAD_NOTIFY_REQUEST; + msg[1] = numoid; + /* Send array of oids */ + size = sizeof(unsigned int); + { + i = 0; + while(i < numoid) { + oid = oidarry[i]; + *((unsigned int *)(&msg[1] + size)) = oid; + size += sizeof(unsigned int); + i++; + } + } + + /* Send array of version */ + { + i = 0; + while(i < numoid) { + version = versionarry[i]; + *((unsigned short *)(&msg[1] + size)) = oid; + size += sizeof(unsigned short); + i++; + } + } + + *((unsigned int *)(&msg[1] + size)) = myIpAddr; + size += sizeof(unsigned int); + *((unsigned int *)(&msg[1] + size)) = threadid; + + pthread_mutex_lock(&threadnotify); + bytesSent = send(sock, msg, 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 2 * sizeof(unsigned int) , 0); + if (bytesSent < 0){ + perror("reqNotify():send()"); + status = -1; + } else if (bytesSent != 1 + 5*sizeof(unsigned int)){ + printf("reNotify(): error, sent %d bytes\n", bytesSent); + status = -1; + } else { + status = 0; + } + pthread_cond_wait(&threadcond, &threadnotify); + pthread_mutex_unlock(&threadnotify); + } + + close(sock); +} + +void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) { + pthread_cond_t ret; + //Look up the tid and call the corresponding pthread_cond_signal + ret = notifyhashSearch(tid); + pthread_cond_signal(&ret); + //TODO process oid and version +} + +int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) { + threadlist_t *ptr; + unsigned int mid; + struct sockaddr_in remoteAddr; + char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)]; + int sock, status, size, bytesSent; + while(*head != NULL) { + ptr = *head; + mid = ptr->mid; + //create a socket connection to that machine + if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){ + perror("notifyAll():socket()"); + return -1; + } + + bzero(&remoteAddr, sizeof(remoteAddr)); + remoteAddr.sin_family = AF_INET; + remoteAddr.sin_port = htons(LISTEN_PORT); + remoteAddr.sin_addr.s_addr = htonl(mid); + //send Thread Notify response and threadid to that machine + if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){ + printf("notifyAll():error %d connecting to %s:%d\n", errno, + inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT); + status = -1; + } else { + msg[0] = THREAD_NOTIFY_RESPONSE; + msg[1] = oid; + size = sizeof(unsigned int); + *(&msg[1]+ size) = version; + size+= sizeof(unsigned short); + *(&msg[1]+ size) = ptr->threadid; + + bytesSent = send(sock, msg, 1 + 2*sizeof(unsigned int) + sizeof(unsigned short), 0); + if (bytesSent < 0){ + perror("notifyAll():send()"); + status = -1; + } else if (bytesSent != 1 + 2*sizeof(unsigned int) + sizeof(unsigned short)){ + printf("notifyAll(): error, sent %d bytes\n", bytesSent); + status = -1; + } else { + status = 0; + } + } + //close socket + close(sock); + // Update head + *head = ptr->next; + free(ptr); + } +} + +void transAbort(transrecord_t *trans) { + objstrDelete(trans->cache); + chashDelete(trans->lookupTable); + free(trans); +} diff --git a/Robust/src/Runtime/thread.c b/Robust/src/Runtime/thread.c index 009677fb..cc1bed73 100644 --- a/Robust/src/Runtime/thread.c +++ b/Robust/src/Runtime/thread.c @@ -7,6 +7,7 @@ #include "option.h" #include #include +#include #include int threadcount; @@ -19,9 +20,13 @@ pthread_key_t threadlocks; pthread_mutex_t threadnotifylock; pthread_cond_t threadnotifycond; transrecord_t * trans; +pthread_key_t oid; void threadexit() { - void *ptr; + objheader_t* ptr; + void *value; + unsigned int oidvalue; + #ifdef THREADS struct ___Object___ *ll=pthread_getspecific(threadlocks); while(ll!=NULL) { @@ -40,20 +45,22 @@ void threadexit() { threadcount--; pthread_cond_signal(&gccond); pthread_mutex_unlock(&gclistlock); +#ifdef DSTM /* Add transaction to check if thread finished for join operation */ + value = pthread_getspecific(oid); + oidvalue = *((unsigned int *)value); goto transstart; -transretry: - transstart: - trans = transStart(); - ptr = (void *)transRead(trans, (unsigned int) a); - struct ___Thread___ *tmp = ((char *) ptr + sizeof(objheader_t)); - tmp->___threadDone___ = 1; - if(transCommit(trans)) { - goto transretry; - } else { - COMMIT_OBJ(); + trans = transStart(); + ptr = transRead(trans, oidvalue); + struct ___Thread___ *p = (struct ___Thread___ *) ptr; + p->___threadDone___ = 1; + while(!transCommit(trans)) { + printf("DEBUG-> Trans not committed yet\n"); + transAbort(trans); + goto transstart; } +#endif pthread_exit(NULL); } @@ -118,28 +125,48 @@ void CALL11(___Thread______sleep____J, long long ___millis___, long long ___mill } /* Add thread join capability */ -#ifdef DSTM void CALL01(___Thread______join____, struct ___Thread___ * ___this___) { - pthread_t thread; printf("DEBUG -> Inside thread join\n"); - int status; - if(VAR(___this___)->___threadDone___) { +#ifdef DSTM + pthread_t thread; + unsigned int *oidarray, mid; + unsigned short *versionarray, version; + transrecord_t *trans; + objheader_t *ptr; + /* Add transaction to check if thread finished for join operation */ +transstart: + trans = transStart(); + ptr = transRead(trans, (unsigned int) VAR(___this___)); + struct ___Thread___ *p = (struct ___Thread___ *) ptr; + if(p->___threadDone___ == 1) { + transAbort(trans); return; } else { - /* Request Notification */ - pthread_cond_broadcast(&objcond); - pthread_mutex_unlock(&objlock); - pthread_mutex_lock(&threadnotifylock);//wake everyone up - status = reqNotify((unsigned int)VAR(___this___)); - - if((status = reqNotify((unsigned int)VAR(___this___))) != 0) { - printf("No notification is sent %s, %d\n", __FILE__, __LINE__); - } else { + version = (ptr-1)->version; + if((oidarray = calloc(1, sizeof(unsigned int))) == NULL) { + printf("Calloc error %s, %d\n", __FILE__, __LINE__); + return; + } + + oidarray[0] = (unsigned int) VAR(___this___); + + if((versionarray = calloc(1, sizeof(unsigned short))) == NULL) { + printf("Calloc error %s, %d\n", __FILE__, __LINE__); + free(oidarray); return; } + versionarray[0] = version; + mid = lhashSearch((unsigned int) VAR(___this___)); + /* Request Notification */ + reqNotify(oidarray, versionarray, mid, 1); + free(oidarray); + free(versionarray); + transAbort(trans); + goto transstart; } -} + return; #endif +} #ifdef THREADS void CALL01(___Thread______nativeCreate____, struct ___Thread___ * ___this___) { @@ -170,7 +197,14 @@ void CALL12(___Thread______start____I, int ___mid___, struct ___Thread___ * ___t #endif #ifdef DSTM +void globalDestructor(void *value) { + free(value); + pthread_setspecific(oid, NULL); +} + void initDSMthread(int *ptr) { + objheader_t *tmp; + void *threadData; int oid=ptr[0]; int type=ptr[1]; free(ptr); @@ -180,25 +214,26 @@ void initDSMthread(int *ptr) { #else ((void (*)(void *))virtualtable[type*MAXCOUNT+RUNMETHOD])(oid); #endif + threadData = calloc(1, sizeof(unsigned int)); + *((unsigned int *) threadData) = oid; + pthread_setspecific(oid, threadData); pthread_mutex_lock(&gclistlock); - threadcount--; - pthread_cond_signal(&gccond); - pthread_mutex_unlock(&gclistlock); - /* Add transaction to check if thread finished for join operation */ - goto transstart; -transretry: - //TODO - + threadcount--; + pthread_cond_signal(&gccond); + pthread_mutex_unlock(&gclistlock); + /* Add transaction to check if thread finished for join operation */ + goto transstart; transstart: - trans = transStart(); - ptr = (void *)transRead(trans, (unsigned int) oid); - struct ___Thread___ *tmp = ((char *) ptr + sizeof(objheader_t)); - tmp->___threadDone___ = 1; - if(transCommit(trans)) { - goto transretry; - } else { - //TODO - } + trans = transStart(); + tmp = transRead(trans, (unsigned int) oid); + struct ___Thread___ *t = (struct ___Thread___ *) tmp; + t->___threadDone___ = 1; + while(!transCommit(trans)) { + printf("DEBUG-> Trans not committed yet\n"); + transAbort(trans); + goto transstart; + } + pthread_exit(NULL); } void startDSMthread(int oid, int objType) { @@ -214,6 +249,7 @@ void startDSMthread(int oid, int objType) { int * ptr=malloc(sizeof(int)*2); ptr[0]=oid; ptr[1]=objType; + pthread_key_create(&oid, globalDestructor); do { retval=pthread_create(&thread, &nattr, (void * (*)(void *)) &initDSMthread, ptr); if (retval!=0) diff --git a/Robust/src/Tests/Atomic3.java b/Robust/src/Tests/Atomic3.java index 6eba697c..e38acce1 100644 --- a/Robust/src/Tests/Atomic3.java +++ b/Robust/src/Tests/Atomic3.java @@ -1,13 +1,13 @@ public class Atomic3 extends Thread { public Atomic3() { } - static Tree root; + Tree root; Integer count; public static void main(String[] st) { int mid = (128<<24)|(195<<16)|(175<<8)|70; int b; Atomic3 at3 = null; - Integer z; + Integer y,z; atomic { at3 = global new Atomic3(); z = global new Integer(300); @@ -15,9 +15,15 @@ public class Atomic3 extends Thread { at3.root.insert(z); b = at3.root.value.intValue(); } + System.printString("b is "); + System.printInt(b); atomic{ at3.root.item = 2445; + y = global new Integer(400); + at3.root.value = y; + b = at3.root.value.intValue(); } + System.printString("b is "); System.printInt(b); System.printString("\n"); System.printString("Starting\n"); @@ -31,10 +37,9 @@ public class Atomic3 extends Thread { public int run() { int a; atomic { - //FIXME a bug value of trans commit is not saved a = root.value.intValue(); - //a = root.item; } + System.printString("a is "); System.printInt(a); System.printString("\n"); } diff --git a/Robust/src/Tests/Atomic4.java b/Robust/src/Tests/Atomic4.java index 563acbf9..d883c0b9 100644 --- a/Robust/src/Tests/Atomic4.java +++ b/Robust/src/Tests/Atomic4.java @@ -86,6 +86,6 @@ public class People { public boolean isSenior() { if(this.getAge() > 65) return true; - return false;; + return false; } } diff --git a/Robust/src/buildscript b/Robust/src/buildscript index 85e31c0d..6c8f1f10 100755 --- a/Robust/src/buildscript +++ b/Robust/src/buildscript @@ -223,7 +223,7 @@ $ROBUSTROOT/Runtime/GenericHashtable.c $ROBUSTROOT/Runtime/object.c" if $DSMFLAG then EXTRAOPTIONS="$EXTRAOPTIONS -lpthread -DCOMPILER -DDSTM -I$DSMRUNTIME" -FILES="$FILES $DSMRUNTIME/trans.c $DSMRUNTIME/mcpileq.c $DSMRUNTIME/objstr.c $DSMRUNTIME/dstm.c $DSMRUNTIME/mlookup.c $DSMRUNTIME/clookup.c $DSMRUNTIME/llookup.c $DSMRUNTIME/dstmserver.c $DSMRUNTIME/plookup.c $DSMRUNTIME/ip.c $DSMRUNTIME/queue.c $DSMRUNTIME/prelookup.c $DSMRUNTIME/machinepile.c $DSMRUNTIME/localobjects.c $ROBUSTROOT/Runtime/thread.c" +FILES="$FILES $DSMRUNTIME/trans.c $DSMRUNTIME/mcpileq.c $DSMRUNTIME/objstr.c $DSMRUNTIME/dstm.c $DSMRUNTIME/mlookup.c $DSMRUNTIME/clookup.c $DSMRUNTIME/llookup.c $DSMRUNTIME/threadnotify.c $DSMRUNTIME/dstmserver.c $DSMRUNTIME/plookup.c $DSMRUNTIME/ip.c $DSMRUNTIME/queue.c $DSMRUNTIME/prelookup.c $DSMRUNTIME/machinepile.c $DSMRUNTIME/localobjects.c $ROBUSTROOT/Runtime/thread.c" fi if $RECOVERFLAG