From 159c180e5e02e77fd5d21c1ffc4a21b56b8258cc Mon Sep 17 00:00:00 2001 From: adash Date: Mon, 21 Jan 2008 22:42:31 +0000 Subject: [PATCH] more changes and some bug fixes for thread notify --- Robust/src/Runtime/DSTM/interface/dstm.h | 12 +-- .../src/Runtime/DSTM/interface/dstmserver.c | 9 +- .../src/Runtime/DSTM/interface/threadnotify.c | 36 +++---- .../src/Runtime/DSTM/interface/threadnotify.h | 30 ++++-- Robust/src/Runtime/DSTM/interface/trans.c | 96 ++++++++++++++----- Robust/src/Runtime/thread.c | 11 +-- 6 files changed, 122 insertions(+), 72 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index ab3cc4ed..0d8b03d9 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -185,16 +185,6 @@ typedef struct local_thread_data_array { trans_commit_data_t *transinfo; /* Holds information of objects locked and not found in the participant */ } local_thread_data_array_t; -//Structure for objects involved in wait-notify call -//TODO Use it -typedef struct notifydata { - unsigned int numoid; /* Number of oids on which we are waiting for updated notification */ - unsigned int threadid; /* The threadid that is waiting for update notification response*/ - unsigned int *oidarry; /* Pointer to array of oids */ - unsigned short *version;/* Pointer to array of versions of the oids that we are waiting on */ -}notifydata_t; - - /* Initialize main object store and lookup tables, start server thread. */ int dstmInit(void); @@ -257,7 +247,7 @@ void getPrefetchResponse(int, int); unsigned short getObjType(unsigned int oid); int startRemoteThread(unsigned int oid, unsigned int mid); /* Sends notification request for thread join, if sucessful returns 0 else returns -1 */ -void reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int numoid); +int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid); void threadNotify(unsigned int oid, unsigned short version, unsigned int tid); int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version); diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 3ec7635c..5928799c 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -675,7 +675,6 @@ int prefetchReq(int acceptfd) { } while(sum < N && n != 0); /* Process each oid */ - printf("Oid 0x%x is being searched\n", oid); if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */ /* Save the oids not found in buffer for later use */ *(buffer + index) = OBJECT_NOT_FOUND; @@ -760,7 +759,7 @@ void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short objheader_t *header; unsigned int oid; unsigned short newversion; - char msg[1+ sizeof(unsigned int)]; + char msg[1+ 2 * sizeof(unsigned int) + sizeof(unsigned short)]; int sd; struct sockaddr_in remoteAddr; int bytesSent; @@ -799,10 +798,10 @@ checkversion: msg[0] = THREAD_NOTIFY_RESPONSE; msg[1] = oid; size = sizeof(unsigned int); - memcpy(&msg[1] + size, &newversion, sizeof(unsigned short)); + *((unsigned short *)(&msg[1]+size)) = newversion; size += sizeof(unsigned short); - memcpy(&msg[1] + size, &threadid, sizeof(unsigned int)); - bytesSent = send(sd, msg, 1+sizeof(unsigned int), 0); + *((unsigned int *)(&msg[1]+size)) = threadid; + bytesSent = send(sd, msg, 1+ 2*sizeof(unsigned int) + sizeof(unsigned short), 0); if (bytesSent < 0){ perror("processReqNotify():send()"); status = -1; diff --git a/Robust/src/Runtime/DSTM/interface/threadnotify.c b/Robust/src/Runtime/DSTM/interface/threadnotify.c index cfa45eec..eb2f5f39 100644 --- a/Robust/src/Runtime/DSTM/interface/threadnotify.c +++ b/Robust/src/Runtime/DSTM/interface/threadnotify.c @@ -2,6 +2,9 @@ notifyhashtable_t nlookup; //Global hash table +/* This function creates a new node in the linked list of threads waiting + * for an update notification from a particular object. + * This takes in the head of the linked list and inserts the new node to it */ void insNode(threadlist_t *head, unsigned int threadid, unsigned int mid) { threadlist_t *ptr; if(head == NULL) { @@ -24,6 +27,8 @@ void insNode(threadlist_t *head, unsigned int threadid, unsigned int mid) { } } +/* This function displays the linked list of threads waiting on update notification + * from an object */ void display(threadlist_t *head) { threadlist_t *ptr; if(head == NULL) { @@ -39,6 +44,8 @@ void display(threadlist_t *head) { } } +/* This function creates a new hash table that stores a mapping between the threadid and + * a pointer to the thread notify data */ unsigned int notifyhashCreate(unsigned int size, float loadfactor) { notifylistnode_t *nodes; @@ -62,8 +69,8 @@ unsigned int notifyhashFunction(unsigned int tid) { return( tid % (nlookup.size)); } -// Insert threadcond and threadid mapping into the hash table -unsigned int notifyhashInsert(unsigned int tid, pthread_cond_t threadcond) { +// Insert pointer to the notify data and threadid mapping into the hash table +unsigned int notifyhashInsert(unsigned int tid, notifydata_t *ndata) { unsigned int newsize; int index; notifylistnode_t *ptr, *node; @@ -85,7 +92,7 @@ unsigned int notifyhashInsert(unsigned int tid, pthread_cond_t threadcond) { pthread_mutex_lock(&nlookup.locktable); if(ptr[index].next == NULL && ptr[index].threadid == 0) { // Insert at the first position in the hashtable ptr[index].threadid = tid; - ptr[index].threadcond = threadcond; + ptr[index].ndata = ndata; } else { // Insert in the beginning of linked list if ((node = calloc(1, sizeof(notifylistnode_t))) == NULL) { printf("Calloc error %s, %d\n", __FILE__, __LINE__); @@ -93,7 +100,7 @@ unsigned int notifyhashInsert(unsigned int tid, pthread_cond_t threadcond) { return 1; } node->threadid = tid; - node->threadcond = threadcond; + node->ndata = ndata; node->next = ptr[index].next; ptr[index].next = node; } @@ -101,11 +108,10 @@ unsigned int notifyhashInsert(unsigned int tid, pthread_cond_t threadcond) { return 0; } -// Return pthread_cond_t threadcond for a given threadid in the hash table -pthread_cond_t notifyhashSearch(unsigned int tid) { +// Return pointer to thread notify data for a given threadid in the hash table +notifydata_t *notifyhashSearch(unsigned int tid) { int index; notifylistnode_t *ptr, *node; - pthread_cond_t tmp = PTHREAD_COND_INITIALIZER; ptr = nlookup.table; // Address of the beginning of hash table index = notifyhashFunction(tid); @@ -114,35 +120,33 @@ pthread_cond_t notifyhashSearch(unsigned int tid) { while(node != NULL) { if(node->threadid == tid) { pthread_mutex_unlock(&nlookup.locktable); - return node->threadcond; + return node->ndata; } node = node->next; } pthread_mutex_unlock(&nlookup.locktable); - return tmp; + return NULL; } // Remove an entry from the hash table unsigned int notifyhashRemove(unsigned int tid) { int index; - notifylistnode_t *curr, *prev; - notifylistnode_t *ptr, *node; + notifylistnode_t *curr, *prev, *ptr, *node; ptr = nlookup.table; index = notifyhashFunction(tid); curr = &ptr[index]; - pthread_cond_t tmp = PTHREAD_COND_INITIALIZER; pthread_mutex_lock(&nlookup.locktable); for (; curr != NULL; curr = curr->next) { if (curr->threadid == tid) { // Find a match in the hash table nlookup.numelements--; // Decrement the number of elements in the global hashtable if ((curr == &ptr[index]) && (curr->next == NULL)) { // Delete the first item inside the hashtable with no linked list of notifylistnode_t curr->threadid = 0; - curr->threadcond = tmp; + curr->ndata = NULL; } else if ((curr == &ptr[index]) && (curr->next != NULL)) { //Delete the first bin item with a linked list of notifylistnode_t connected curr->threadid = curr->next->threadid; - curr->threadcond = curr->next->threadcond; + curr->ndata = curr->next->ndata; node = curr->next; curr->next = curr->next->next; free(node); @@ -194,7 +198,7 @@ unsigned int notifyhashResize(unsigned int newsize) { // Insert into the new table if(nlookup.table[index].next == NULL && nlookup.table[index].threadid == 0) { nlookup.table[index].threadid = curr->threadid; - nlookup.table[index].threadcond = curr->threadcond; + nlookup.table[index].ndata = curr->ndata; nlookup.numelements++; }else { if((newnode = calloc(1, sizeof(notifylistnode_t))) == NULL) { @@ -202,7 +206,7 @@ unsigned int notifyhashResize(unsigned int newsize) { return 1; } newnode->threadid = curr->threadid; - newnode->threadcond = curr->threadcond; + newnode->ndata = curr->ndata; newnode->next = nlookup.table[index].next; nlookup.table[index].next = newnode; nlookup.numelements++; diff --git a/Robust/src/Runtime/DSTM/interface/threadnotify.h b/Robust/src/Runtime/DSTM/interface/threadnotify.h index 75071a77..6749af42 100644 --- a/Robust/src/Runtime/DSTM/interface/threadnotify.h +++ b/Robust/src/Runtime/DSTM/interface/threadnotify.h @@ -15,27 +15,37 @@ typedef struct threadlist { struct threadlist *next; } threadlist_t; +//Structure for objects involved in wait-notify call +typedef struct notifydata { + unsigned int numoid; /* Number of oids on which we are waiting for updated notification */ + unsigned int threadid; /* The threadid that is waiting for update notification response*/ + unsigned int *oidarry; /* Pointer to array of oids that this threadid is waiting on*/ + unsigned short *versionarry;/* Pointer to array of versions of the oids that we are waiting on */ + pthread_cond_t threadcond; /* Cond variable associated with each threadid that needs to be signaled*/ +}notifydata_t; + typedef struct notifylistnode { unsigned int threadid; - pthread_cond_t threadcond; + notifydata_t *ndata; struct notifylistnode *next; } notifylistnode_t; typedef struct notifyhashtable { - notifylistnode_t *table; //points to beginning of hash table + notifylistnode_t *table; //Points to beginning of hash table unsigned int size; unsigned int numelements; float loadfactor; - pthread_mutex_t locktable; + pthread_mutex_t locktable; //Lock for the hashtable } notifyhashtable_t; -void insNode(threadlist_t *head, unsigned int threadid, unsigned int mid); -void display(threadlist_t *head); -unsigned int notifyhashCreate(unsigned int size, float loadfactor); -unsigned int notifyhashFunction(unsigned int tid); -unsigned notifyhashInsert(unsigned int tid, pthread_cond_t threadcond); -pthread_cond_t notifyhashSearch(unsigned int tid); //returns val, NULL if not found -unsigned int notifyhashRemove(unsigned int tid); //returns -1 if not found +void insNode(threadlist_t *head, unsigned int threadid, unsigned int mid); //Inserts nodes for one object that + //needs to send notification to threads waiting on it +void display(threadlist_t *head);// Displays linked list of nodes for one object +unsigned int notifyhashCreate(unsigned int size, float loadfactor); //returns 1 if hashtable creation is not successful +unsigned int notifyhashFunction(unsigned int tid); //returns index in the hash table +unsigned int notifyhashInsert(unsigned int tid, notifydata_t *ndata); //returns 1 if insert not successful +notifydata_t *notifyhashSearch(unsigned int tid); //returns pointer to notify data, NULL if not found +unsigned int notifyhashRemove(unsigned int tid); //returns 1 if not successful unsigned int notifyhashResize(unsigned int newsize); #endif diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index ea8ecd98..48dcd424 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -1019,13 +1019,13 @@ int transAbortProcess(local_thread_data_array_t *localtdata) { STATUS(((objheader_t *)header)) &= ~(LOCK); } - printf("TRANS_ABORTED at Coordinator end\n"); - + printf("TRANS_ABORTED\n"); return 0; } /*This function completes the COMMIT process is the transaction is commiting*/ int transComProcess(local_thread_data_array_t *localtdata) { + static int prevsize = 0, *prevptr; objheader_t *header, *tcptr; int i, nummod, tmpsize, numcreated, numlocked; unsigned int *oidmod, *oidcreated, *oidlocked; @@ -1086,6 +1086,7 @@ int transComProcess(local_thread_data_array_t *localtdata) { } STATUS(header) &= ~(LOCK); } + return 0; } @@ -1440,9 +1441,10 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) { /* Send Trans Prefetch Request */ if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - perror("Error in socket for TRANS_REQUEST\n"); + perror("Error in socket for SEND_PREFETCH_REQUEST\n"); return; } + bzero((char*) &serv_addr, sizeof(serv_addr)); serv_addr.sin_family = AF_INET; serv_addr.sin_port = htons(LISTEN_PORT); @@ -1452,7 +1454,7 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) { /* Open Connection */ if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) { - perror("Error in connect for TRANS_REQUEST\n"); + perror("Error in connect for SEND_PREFETCH_REQUEST\n"); close(sd); return; } @@ -1788,22 +1790,27 @@ int findHost(unsigned int hostIp) /* This function sends notification request per thread waiting on object(s) whose version * changes */ -void reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int numoid) { +int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid) { int sock,i; objheader_t *objheader; struct sockaddr_in remoteAddr; - char msg[1 + numoid * (sizeof(short) + sizeof(unsigned int)) + sizeof(unsigned int) * 3]; + char msg[1 + numoid * (sizeof(short) + sizeof(unsigned int)) + 3 * sizeof(unsigned int)]; char *ptr; int bytesSent; int status, size; unsigned short version; - unsigned int oid, threadid; + unsigned int oid, threadid, mid; pthread_mutex_t threadnotify; //Lock and condition var for threadjoin and notification pthread_cond_t threadcond; + notifydata_t *ndata; + + //FIXME currently all oids belong to one machine + oid = oidarry[0]; + mid = lhashSearch(oid); if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){ perror("reqNotify():socket()"); - return; + return -1; } bzero(&remoteAddr, sizeof(remoteAddr)); @@ -1813,20 +1820,29 @@ void reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int /* Generate unique threadid */ threadid = (unsigned int) pthread_self(); - if((status = notifyhashInsert(threadid, threadcond)) != 0) { + + /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */ + if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) { + printf("Calloc Error %s, %d\n", __FILE__, __LINE__); + return -1; + } + ndata->numoid = numoid; + ndata->threadid = threadid; + ndata->oidarry = oidarry; + ndata->versionarry = versionarry; + ndata->threadcond = threadcond; + if((status = notifyhashInsert(threadid, ndata)) != 0) { printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__); - return; + free(ndata); + return -1; } - - /* Save data that is sent for later processing */ - //Save threadid, numoid, oidarray, versionarray, also the pthread_cond_variable in a linked list - //TODO - - /* Send oidarry, version array, threadid and machine id */ + + /* Send number of oids, oidarry, version array, machine id and threadid */ if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){ printf("reqNotify():error %d connecting to %s:%d\n", errno, inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT); - status = -1; + free(ndata); + return -1; } else { msg[0] = THREAD_NOTIFY_REQUEST; msg[1] = numoid; @@ -1873,14 +1889,47 @@ void reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int } close(sock); + return status; } void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) { - pthread_cond_t ret; + notifydata_t *ndata; + int i, objIsFound = 0, index; + void *ptr; + //Look up the tid and call the corresponding pthread_cond_signal - ret = notifyhashSearch(tid); - pthread_cond_signal(&ret); - //TODO process oid and version + if((ndata = notifyhashSearch(tid)) == NULL) { + printf("threadnotify(): No such threadid is present %s, %d\n", __FILE__, __LINE__); + return; + } else { + for(i = 0; i < ndata->numoid; i++) { + if(ndata->oidarry[i] == oid){ + objIsFound = 1; + index = i; + } + } + if(objIsFound == 0){ + return; + } else { + if(version <= ndata->versionarry[index]){ + return; + } else { + /* Clear from prefetch cache and free thread related data structure */ + if((ptr = prehashSearch(oid)) == NULL) { + //TODO Ask about freeing + printf("threadnotify(): No such oid %s, %d\n", __FILE__, __LINE__); + pthread_cond_signal(&ndata->threadcond); + free(ndata); + return; + } else { + prehashRemove(oid); + pthread_cond_signal(&ndata->threadcond); + free(ndata); + } + } + } + } + return; } int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) { @@ -1911,9 +1960,9 @@ int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) { msg[0] = THREAD_NOTIFY_RESPONSE; msg[1] = oid; size = sizeof(unsigned int); - *(&msg[1]+ size) = version; + *((unsigned short *)(&msg[1]+ size)) = version; size+= sizeof(unsigned short); - *(&msg[1]+ size) = ptr->threadid; + *((unsigned int *)(&msg[1]+ size)) = ptr->threadid; bytesSent = send(sock, msg, 1 + 2*sizeof(unsigned int) + sizeof(unsigned short), 0); if (bytesSent < 0){ @@ -1932,6 +1981,7 @@ int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) { *head = ptr->next; free(ptr); } + return status; } void transAbort(transrecord_t *trans) { diff --git a/Robust/src/Runtime/thread.c b/Robust/src/Runtime/thread.c index cc1bed73..5523426a 100644 --- a/Robust/src/Runtime/thread.c +++ b/Robust/src/Runtime/thread.c @@ -55,8 +55,7 @@ transstart: ptr = transRead(trans, oidvalue); struct ___Thread___ *p = (struct ___Thread___ *) ptr; p->___threadDone___ = 1; - while(!transCommit(trans)) { - printf("DEBUG-> Trans not committed yet\n"); + if(transCommit(trans) != 0) { transAbort(trans); goto transstart; } @@ -129,7 +128,7 @@ void CALL01(___Thread______join____, struct ___Thread___ * ___this___) { printf("DEBUG -> Inside thread join\n"); #ifdef DSTM pthread_t thread; - unsigned int *oidarray, mid; + unsigned int *oidarray; unsigned short *versionarray, version; transrecord_t *trans; objheader_t *ptr; @@ -156,9 +155,8 @@ transstart: return; } versionarray[0] = version; - mid = lhashSearch((unsigned int) VAR(___this___)); /* Request Notification */ - reqNotify(oidarray, versionarray, mid, 1); + reqNotify(oidarray, versionarray, 1); free(oidarray); free(versionarray); transAbort(trans); @@ -228,8 +226,7 @@ transstart: tmp = transRead(trans, (unsigned int) oid); struct ___Thread___ *t = (struct ___Thread___ *) tmp; t->___threadDone___ = 1; - while(!transCommit(trans)) { - printf("DEBUG-> Trans not committed yet\n"); + if(transCommit(trans)!= 0) { transAbort(trans); goto transstart; } -- 2.34.1