more changes and some bug fixes for thread notify
authoradash <adash>
Mon, 21 Jan 2008 22:42:31 +0000 (22:42 +0000)
committeradash <adash>
Mon, 21 Jan 2008 22:42:31 +0000 (22:42 +0000)
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/threadnotify.c
Robust/src/Runtime/DSTM/interface/threadnotify.h
Robust/src/Runtime/DSTM/interface/trans.c
Robust/src/Runtime/thread.c

index ab3cc4ed2094c7dff5bbd10777e6045ed5bc1a24..0d8b03d909671da66a87d828eca64477c8c3e999 100644 (file)
@@ -185,16 +185,6 @@ typedef struct local_thread_data_array {
        trans_commit_data_t *transinfo; /* Holds information of objects locked and not found in the participant */ 
 } local_thread_data_array_t;
 
-//Structure for objects involved in wait-notify call
-//TODO Use it
-typedef struct notifydata {
-       unsigned int numoid;    /* Number of oids on which we are waiting for updated notification */
-       unsigned int threadid;  /* The threadid that is waiting for  update notification response*/
-       unsigned int *oidarry;  /* Pointer to array of oids */
-       unsigned short *version;/* Pointer to array of versions of the oids that we are waiting on */
-}notifydata_t;
-
-
 /* Initialize main object store and lookup tables, start server thread. */
 int dstmInit(void);
 
@@ -257,7 +247,7 @@ void getPrefetchResponse(int, int);
 unsigned short getObjType(unsigned int oid);
 int startRemoteThread(unsigned int oid, unsigned int mid);
 /* Sends notification request for thread join, if sucessful returns 0 else returns -1 */
-void reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int numoid);
+int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid);
 void threadNotify(unsigned int oid, unsigned short version, unsigned int tid);
 int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version);
 
index 3ec7635ca6b688bf82905a6164e1a2bf97cfe239..5928799c780694b28b17ce4ab84ff96090424741 100644 (file)
@@ -675,7 +675,6 @@ int prefetchReq(int acceptfd) {
     } while(sum < N && n != 0);        
     
     /* Process each oid */
-    printf("Oid 0x%x is being searched\n", oid);
     if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
       /* Save the oids not found in buffer for later use */
       *(buffer + index) = OBJECT_NOT_FOUND;
@@ -760,7 +759,7 @@ void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short
        objheader_t *header;
        unsigned int oid;
        unsigned short newversion;
-       char msg[1+ sizeof(unsigned int)];
+       char msg[1+  2 * sizeof(unsigned int) + sizeof(unsigned short)];
        int sd;
        struct sockaddr_in remoteAddr;
        int bytesSent;
@@ -799,10 +798,10 @@ checkversion:
                                                msg[0] = THREAD_NOTIFY_RESPONSE;
                                                msg[1] = oid;
                                                size = sizeof(unsigned int);
-                                               memcpy(&msg[1] + size, &newversion, sizeof(unsigned short)); 
+                                               *((unsigned short *)(&msg[1]+size)) = newversion;
                                                size += sizeof(unsigned short);
-                                               memcpy(&msg[1] + size, &threadid, sizeof(unsigned int)); 
-                                               bytesSent = send(sd, msg, 1+sizeof(unsigned int), 0);
+                                               *((unsigned int *)(&msg[1]+size)) = threadid;
+                                               bytesSent = send(sd, msg, 1+ 2*sizeof(unsigned int) + sizeof(unsigned short), 0);
                                                if (bytesSent < 0){
                                                        perror("processReqNotify():send()");
                                                        status = -1;
index cfa45eeceaa517a7bf27412554f931af1a744c6d..eb2f5f39be3757e3b4897b8dfa53e1a8d9abde07 100644 (file)
@@ -2,6 +2,9 @@
 
 notifyhashtable_t nlookup; //Global hash table
 
+/* This function creates a new node in the linked list of threads waiting
+ * for an update notification from a particular object.
+ * This takes in the head of the linked list and inserts the new node to it */
 void insNode(threadlist_t *head, unsigned int threadid, unsigned int mid) {
        threadlist_t *ptr;
        if(head == NULL) {
@@ -24,6 +27,8 @@ void insNode(threadlist_t *head, unsigned int threadid, unsigned int mid) {
        }
 }
 
+/* This function displays the linked list of threads waiting on update notification
+ * from an object */
 void display(threadlist_t *head) {
        threadlist_t *ptr;
        if(head == NULL) {
@@ -39,6 +44,8 @@ void display(threadlist_t *head) {
        }
 }
 
+/* This function creates a new hash table that stores a mapping between the threadid and
+ * a pointer to the thread notify data */
 unsigned int notifyhashCreate(unsigned int size, float loadfactor) { 
        notifylistnode_t *nodes;
 
@@ -62,8 +69,8 @@ unsigned int notifyhashFunction(unsigned int tid) {
        return( tid % (nlookup.size));
 }
 
-// Insert threadcond and threadid mapping into the hash table
-unsigned int notifyhashInsert(unsigned int tid, pthread_cond_t threadcond) {
+// Insert pointer to the notify data and threadid mapping into the hash table
+unsigned int notifyhashInsert(unsigned int tid, notifydata_t *ndata) {
        unsigned int newsize;
        int index;
        notifylistnode_t *ptr, *node;
@@ -85,7 +92,7 @@ unsigned int notifyhashInsert(unsigned int tid, pthread_cond_t threadcond) {
        pthread_mutex_lock(&nlookup.locktable);
        if(ptr[index].next == NULL && ptr[index].threadid == 0) {       // Insert at the first position in the hashtable
                ptr[index].threadid = tid;
-               ptr[index].threadcond = threadcond;
+               ptr[index].ndata = ndata;
        } else {                        // Insert in the beginning of linked list
                if ((node = calloc(1, sizeof(notifylistnode_t))) == NULL) {
                        printf("Calloc error %s, %d\n", __FILE__, __LINE__);
@@ -93,7 +100,7 @@ unsigned int notifyhashInsert(unsigned int tid, pthread_cond_t threadcond) {
                        return 1;
                }
                node->threadid = tid;
-               node->threadcond = threadcond;
+               node->ndata = ndata;
                node->next = ptr[index].next;
                ptr[index].next = node;
        }
@@ -101,11 +108,10 @@ unsigned int notifyhashInsert(unsigned int tid, pthread_cond_t threadcond) {
        return 0;
 }
 
-// Return pthread_cond_t threadcond for a given threadid in the hash table
-pthread_cond_t notifyhashSearch(unsigned int tid) {
+// Return pointer to thread notify data for a given threadid in the hash table
+notifydata_t  *notifyhashSearch(unsigned int tid) {
        int index;
        notifylistnode_t *ptr, *node;
-       pthread_cond_t tmp = PTHREAD_COND_INITIALIZER;
 
        ptr = nlookup.table;    // Address of the beginning of hash table       
        index = notifyhashFunction(tid);
@@ -114,35 +120,33 @@ pthread_cond_t notifyhashSearch(unsigned int tid) {
        while(node != NULL) {
                if(node->threadid == tid) {
                        pthread_mutex_unlock(&nlookup.locktable);
-                       return node->threadcond;
+                       return node->ndata;
                }
                node = node->next;
        }
        pthread_mutex_unlock(&nlookup.locktable);
-       return tmp;
+       return NULL;
 }
 
 // Remove an entry from the hash table
 unsigned int notifyhashRemove(unsigned int tid) {
        int index;
-       notifylistnode_t *curr, *prev;
-       notifylistnode_t *ptr, *node;
+       notifylistnode_t *curr, *prev, *ptr, *node;
 
        ptr = nlookup.table;
        index = notifyhashFunction(tid);
        curr = &ptr[index];
 
-       pthread_cond_t tmp = PTHREAD_COND_INITIALIZER;
        pthread_mutex_lock(&nlookup.locktable);
        for (; curr != NULL; curr = curr->next) {
                if (curr->threadid == tid) {         // Find a match in the hash table
                        nlookup.numelements--;  // Decrement the number of elements in the global hashtable
                        if ((curr == &ptr[index]) && (curr->next == NULL))  { // Delete the first item inside the hashtable with no linked list of notifylistnode_t 
                                curr->threadid = 0;
-                               curr->threadcond = tmp;
+                               curr->ndata = NULL;
                        } else if ((curr == &ptr[index]) && (curr->next != NULL)) { //Delete the first bin item with a linked list of notifylistnode_t  connected 
                                curr->threadid = curr->next->threadid;
-                               curr->threadcond = curr->next->threadcond;
+                               curr->ndata = curr->next->ndata;
                                node = curr->next;
                                curr->next = curr->next->next;
                                free(node);
@@ -194,7 +198,7 @@ unsigned int notifyhashResize(unsigned int newsize) {
                        // Insert into the new table
                        if(nlookup.table[index].next == NULL && nlookup.table[index].threadid == 0) { 
                                nlookup.table[index].threadid = curr->threadid;
-                               nlookup.table[index].threadcond = curr->threadcond;
+                               nlookup.table[index].ndata = curr->ndata;
                                nlookup.numelements++;
                        }else { 
                                if((newnode = calloc(1, sizeof(notifylistnode_t))) == NULL) { 
@@ -202,7 +206,7 @@ unsigned int notifyhashResize(unsigned int newsize) {
                                        return 1;
                                }       
                                newnode->threadid = curr->threadid;
-                               newnode->threadcond = curr->threadcond;
+                               newnode->ndata = curr->ndata;
                                newnode->next = nlookup.table[index].next;
                                nlookup.table[index].next = newnode;    
                                nlookup.numelements++;
index 75071a77e818db35b92649a2cb1063ee0c240e3f..6749af4255d503f6d7dfbca9a414825a564493f1 100644 (file)
@@ -15,27 +15,37 @@ typedef struct threadlist {
        struct threadlist *next;
 } threadlist_t;
 
+//Structure for objects involved in wait-notify call
+typedef struct notifydata {
+       unsigned int numoid;    /* Number of oids on which we are waiting for updated notification */
+       unsigned int threadid;  /* The threadid that is waiting for  update notification response*/
+       unsigned int *oidarry;  /* Pointer to array of oids that this threadid is waiting on*/
+       unsigned short *versionarry;/* Pointer to array of versions of the oids that we are waiting on */
+       pthread_cond_t threadcond; /* Cond variable associated with each threadid that needs to be signaled*/
+}notifydata_t;
+
 typedef struct notifylistnode {
        unsigned int threadid;
-       pthread_cond_t threadcond;
+       notifydata_t *ndata; 
        struct notifylistnode *next;
 } notifylistnode_t;
 
 typedef struct notifyhashtable {
-       notifylistnode_t *table; //points to beginning of hash table
+       notifylistnode_t *table; //Points to beginning of hash table
        unsigned int size;
        unsigned int numelements;
        float loadfactor;
-       pthread_mutex_t locktable;
+       pthread_mutex_t locktable; //Lock for the hashtable
 } notifyhashtable_t;
 
-void insNode(threadlist_t *head, unsigned int threadid, unsigned int mid);
-void display(threadlist_t *head);
-unsigned int notifyhashCreate(unsigned int size, float loadfactor);
-unsigned int notifyhashFunction(unsigned int tid);
-unsigned notifyhashInsert(unsigned int tid, pthread_cond_t threadcond);
-pthread_cond_t notifyhashSearch(unsigned int tid); //returns val, NULL if not found
-unsigned int notifyhashRemove(unsigned int tid); //returns -1 if not found
+void insNode(threadlist_t *head, unsigned int threadid, unsigned int mid); //Inserts nodes for one object that 
+                                                                          //needs to send notification to threads waiting on it
+void display(threadlist_t *head);// Displays linked list of nodes for one object
+unsigned int notifyhashCreate(unsigned int size, float loadfactor); //returns 1 if hashtable creation is not successful
+unsigned int notifyhashFunction(unsigned int tid); //returns index in the hash table 
+unsigned int notifyhashInsert(unsigned int tid, notifydata_t *ndata); //returns 1 if insert not successful
+notifydata_t *notifyhashSearch(unsigned int tid); //returns pointer to notify data, NULL if not found
+unsigned int notifyhashRemove(unsigned int tid); //returns 1 if not successful
 unsigned int notifyhashResize(unsigned int newsize);
 
 #endif
index ea8ecd9869382ba55b8fc2818b9c63b56a6a026f..48dcd42476f20882288374dfd91ea6e2ba626144 100644 (file)
@@ -1019,13 +1019,13 @@ int transAbortProcess(local_thread_data_array_t  *localtdata) {
                STATUS(((objheader_t *)header)) &= ~(LOCK);
        }
 
-       printf("TRANS_ABORTED at Coordinator end\n");
-
+       printf("TRANS_ABORTED\n");
        return 0;
 }
 
 /*This function completes the COMMIT process is the transaction is commiting*/
 int transComProcess(local_thread_data_array_t  *localtdata) {
+       static int prevsize = 0, *prevptr;
        objheader_t *header, *tcptr;
        int i, nummod, tmpsize, numcreated, numlocked;
        unsigned int *oidmod, *oidcreated, *oidlocked;
@@ -1086,6 +1086,7 @@ int transComProcess(local_thread_data_array_t  *localtdata) {
                }
                STATUS(header) &= ~(LOCK);
        }
+
        return 0;
 }
 
@@ -1440,9 +1441,10 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) {
 
        /* Send Trans Prefetch Request */
        if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
-               perror("Error in socket for TRANS_REQUEST\n");
+               perror("Error in socket for SEND_PREFETCH_REQUEST\n");
                return;
        }
+
        bzero((char*) &serv_addr, sizeof(serv_addr));
        serv_addr.sin_family = AF_INET;
        serv_addr.sin_port = htons(LISTEN_PORT);
@@ -1452,7 +1454,7 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) {
 
        /* Open Connection */
        if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
-               perror("Error in connect for TRANS_REQUEST\n");
+               perror("Error in connect for SEND_PREFETCH_REQUEST\n");
                close(sd);
                return;
        }
@@ -1788,22 +1790,27 @@ int findHost(unsigned int hostIp)
 
 /* This function sends notification request per thread waiting on object(s) whose version 
  * changes */
-void reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int numoid) {
+int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid) {
        int sock,i;
        objheader_t *objheader;
        struct sockaddr_in remoteAddr;
-       char msg[1 + numoid * (sizeof(short) + sizeof(unsigned int)) + sizeof(unsigned int) * 3];
+       char msg[1 + numoid * (sizeof(short) + sizeof(unsigned int)) +  3 * sizeof(unsigned int)];
        char *ptr;
        int bytesSent;
        int status, size;
        unsigned short version;
-       unsigned int oid, threadid;
+       unsigned int oid, threadid, mid;
        pthread_mutex_t threadnotify; //Lock and condition var for threadjoin and notification
        pthread_cond_t threadcond;
+       notifydata_t *ndata;
+
+       //FIXME currently all oids belong to one machine
+       oid = oidarry[0];
+       mid = lhashSearch(oid);
 
        if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
                perror("reqNotify():socket()");
-               return;
+               return -1;
        }
 
        bzero(&remoteAddr, sizeof(remoteAddr));
@@ -1813,20 +1820,29 @@ void reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int
 
        /* Generate unique threadid */
        threadid = (unsigned int) pthread_self();
-       if((status = notifyhashInsert(threadid, threadcond)) != 0) {
+
+       /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */
+       if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) {
+               printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
+               return -1;
+       }
+       ndata->numoid = numoid;
+       ndata->threadid = threadid;
+       ndata->oidarry = oidarry;
+       ndata->versionarry = versionarry;
+       ndata->threadcond = threadcond;
+       if((status = notifyhashInsert(threadid, ndata)) != 0) {
                printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__);
-               return;
+               free(ndata);
+               return -1; 
        }
-
-       /* Save data that is sent for later processing */
-       //Save threadid, numoid, oidarray, versionarray, also the pthread_cond_variable in a linked list
-       //TODO
-
-       /* Send oidarry, version array, threadid and machine id */      
+       
+       /* Send  number of oids, oidarry, version array, machine id and threadid */     
        if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
                printf("reqNotify():error %d connecting to %s:%d\n", errno,
                                inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
-               status = -1;
+               free(ndata);
+               return -1;
        } else {
                msg[0] = THREAD_NOTIFY_REQUEST;
                msg[1] = numoid;
@@ -1873,14 +1889,47 @@ void reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int
        }
 
        close(sock);
+       return status;
 }
 
 void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
-       pthread_cond_t ret;
+       notifydata_t *ndata;
+       int i, objIsFound = 0, index;
+       void *ptr;
+
        //Look up the tid and call the corresponding pthread_cond_signal
-       ret = notifyhashSearch(tid);
-       pthread_cond_signal(&ret);
-       //TODO process oid and version
+       if((ndata = notifyhashSearch(tid)) == NULL) {
+               printf("threadnotify(): No such threadid is present %s, %d\n", __FILE__, __LINE__);
+               return;
+       } else  {
+               for(i = 0; i < ndata->numoid; i++) {
+                       if(ndata->oidarry[i] == oid){
+                               objIsFound = 1;
+                               index = i;
+                       }
+               }
+               if(objIsFound == 0){
+                       return;
+               } else {
+                       if(version <= ndata->versionarry[index]){
+                               return;
+                       } else {
+                               /* Clear from prefetch cache and free thread related data structure */
+                               if((ptr = prehashSearch(oid)) == NULL) {
+                                       //TODO Ask about freeing
+                                       printf("threadnotify(): No such oid %s, %d\n", __FILE__, __LINE__);
+                                       pthread_cond_signal(&ndata->threadcond);
+                                       free(ndata);
+                                       return;
+                               } else {
+                                       prehashRemove(oid);
+                                       pthread_cond_signal(&ndata->threadcond);
+                                       free(ndata);
+                               }
+                       }
+               }
+       }
+       return;
 }
 
 int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
@@ -1911,9 +1960,9 @@ int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
                        msg[0] = THREAD_NOTIFY_RESPONSE;
                        msg[1] = oid;
                        size = sizeof(unsigned int);
-                       *(&msg[1]+ size) = version;
+                       *((unsigned short *)(&msg[1]+ size)) = version;
                        size+= sizeof(unsigned short);
-                       *(&msg[1]+ size) = ptr->threadid;
+                       *((unsigned int *)(&msg[1]+ size)) = ptr->threadid;
 
                        bytesSent = send(sock, msg, 1 + 2*sizeof(unsigned int) + sizeof(unsigned short), 0);
                        if (bytesSent < 0){
@@ -1932,6 +1981,7 @@ int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
                *head = ptr->next;
                free(ptr);
        }
+       return status;
 }
 
 void transAbort(transrecord_t *trans) {
index cc1bed7359b632d405ffcbca63ed87d580ccb644..5523426a20a301a3d262991e547778eb5bcbda21 100644 (file)
@@ -55,8 +55,7 @@ transstart:
   ptr = transRead(trans, oidvalue);
   struct ___Thread___ *p = (struct ___Thread___ *) ptr;
   p->___threadDone___ = 1;
-  while(!transCommit(trans)) {
-         printf("DEBUG-> Trans not committed yet\n");
+  if(transCommit(trans) != 0) {
          transAbort(trans);
          goto transstart;
   }
@@ -129,7 +128,7 @@ void CALL01(___Thread______join____, struct ___Thread___ * ___this___) {
   printf("DEBUG -> Inside thread join\n");
 #ifdef DSTM
   pthread_t thread;
-  unsigned int *oidarray, mid;
+  unsigned int *oidarray;
   unsigned short *versionarray, version;
   transrecord_t *trans;
   objheader_t *ptr;
@@ -156,9 +155,8 @@ transstart:
                  return;
          }
          versionarray[0] = version;
-         mid = lhashSearch((unsigned int) VAR(___this___));
          /* Request Notification */
-         reqNotify(oidarray, versionarray, mid, 1); 
+         reqNotify(oidarray, versionarray, 1); 
          free(oidarray);
          free(versionarray);
          transAbort(trans);
@@ -228,8 +226,7 @@ transstart:
   tmp  = transRead(trans, (unsigned int) oid);
   struct ___Thread___ *t = (struct ___Thread___ *) tmp;
   t->___threadDone___ = 1;
-  while(!transCommit(trans)) {
-         printf("DEBUG-> Trans not committed yet\n");
+  if(transCommit(trans)!= 0) {
          transAbort(trans);
          goto transstart;
   }