Implementation for thread join and wait and notify design
authoradash <adash>
Fri, 18 Jan 2008 21:53:32 +0000 (21:53 +0000)
committeradash <adash>
Fri, 18 Jan 2008 21:53:32 +0000 (21:53 +0000)
Minor bug fixes

Robust/src/ClassLibrary/ThreadDSM.java
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/mlookup.c
Robust/src/Runtime/DSTM/interface/threadnotify.c [new file with mode: 0644]
Robust/src/Runtime/DSTM/interface/threadnotify.h [new file with mode: 0644]
Robust/src/Runtime/DSTM/interface/trans.c
Robust/src/Runtime/thread.c
Robust/src/Tests/Atomic3.java
Robust/src/Tests/Atomic4.java
Robust/src/buildscript

index 322a022b3c3a644155b7e5d95782d3eb66847535..d5c0a0ce9998f00b946083f66b573696e0783894 100644 (file)
@@ -1,14 +1,13 @@
 public class Thread {
     /* Don't allow overriding this method.  If you do, it will break dispatch
      * because we don't have the type information necessary. */
-    private boolean threadDone;
+    public boolean threadDone;
 
     public Thread() {
         threadDone = false;
     }
 
-    //public native static void join();
-    public native void join();
+    public final native void join();
 
     public final native void start(int mid);
 
index 917e546e1c5380304f278719923bf4627a0a97ed..ab3cc4ed2094c7dff5bbd10777e6045ed5bc1a24 100644 (file)
@@ -34,6 +34,8 @@
 #define TRANS_SUCESSFUL                        21
 #define TRANS_PREFETCH_RESPONSE                22
 #define START_REMOTE_THREAD            23
+#define THREAD_NOTIFY_REQUEST          24
+#define THREAD_NOTIFY_RESPONSE         25
 
 //Control bits for status of objects in Machine pile
 #define OBJ_LOCKED_BUT_VERSION_MATCH   14
@@ -51,6 +53,8 @@
 #include "clookup.h"
 #include "queue.h"
 #include "mcpileq.h"
+#include "threadnotify.h"
+
 
 #define DEFAULT_OBJ_STORE_SIZE 1048510 //1MB
 #define TID_LEN 20
@@ -65,6 +69,7 @@
 #include "structdefs.h"
 
 typedef struct objheader {
+       threadlist_t *notifylist;
        unsigned short version;
        unsigned short rcount;
 } objheader_t;
@@ -93,6 +98,7 @@ typedef struct objheader {
 #else
 
 typedef struct objheader {
+       threadlist_t *notifylist;
        unsigned int oid;
        unsigned short type;
        unsigned short version;
@@ -179,12 +185,15 @@ typedef struct local_thread_data_array {
        trans_commit_data_t *transinfo; /* Holds information of objects locked and not found in the participant */ 
 } local_thread_data_array_t;
 
-//Structure for members within prefetch tuples
-typedef struct member {
-       short offset;           /* Holds offset of the ptr field */
-       short index;            /* Holds the array index value */ 
-       struct member *next;    
-}trans_member_t;
+//Structure for objects involved in wait-notify call
+//TODO Use it
+typedef struct notifydata {
+       unsigned int numoid;    /* Number of oids on which we are waiting for updated notification */
+       unsigned int threadid;  /* The threadid that is waiting for  update notification response*/
+       unsigned int *oidarry;  /* Pointer to array of oids */
+       unsigned short *version;/* Pointer to array of versions of the oids that we are waiting on */
+}notifydata_t;
+
 
 /* Initialize main object store and lookup tables, start server thread. */
 int dstmInit(void);
@@ -206,8 +215,8 @@ int readClientReq(trans_commit_data_t *, int);
 int processClientReq(fixed_data_t *, trans_commit_data_t *,unsigned int *, char *, void *, unsigned int *, int);
 char handleTransReq(fixed_data_t *, trans_commit_data_t *, unsigned int *, char *, void *, int);
 int decideCtrlMessage(fixed_data_t *, trans_commit_data_t *, int *, int *, int *, int *, int *, void *, unsigned int *, unsigned int *, int);
-//int transCommitProcess(trans_commit_data_t *, int);
 int transCommitProcess(void *, unsigned int *, unsigned int *, int, int, int);
+void processReqNotify(unsigned int numoid, unsigned int *oid, unsigned short *version, unsigned int mid, unsigned int threadid);
 /* end server portion */
 
 /* Prototypes for transactions */
@@ -220,7 +229,7 @@ int processConfigFile();
 void addHost(unsigned int);
 void mapObjMethod(unsigned short);
 
-void randomdelay(void);
+void randomdelay();
 transrecord_t *transStart();
 objheader_t *transRead(transrecord_t *, unsigned int);
 objheader_t *transCreateObj(transrecord_t *, unsigned int); //returns oid
@@ -233,6 +242,7 @@ void *getRemoteObj(transrecord_t *, unsigned int, unsigned int);
 void *handleLocalReq(void *);
 int transComProcess(local_thread_data_array_t *);
 int transAbortProcess(local_thread_data_array_t *);
+void transAbort(transrecord_t *trans);
 
 void prefetch(int, unsigned int *, unsigned short *, short*);
 void *transPrefetch(void *);
@@ -246,5 +256,10 @@ void sendPrefetchReq(prefetchpile_t*, int);
 void getPrefetchResponse(int, int);
 unsigned short getObjType(unsigned int oid);
 int startRemoteThread(unsigned int oid, unsigned int mid);
+/* Sends notification request for thread join, if sucessful returns 0 else returns -1 */
+void reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int numoid);
+void threadNotify(unsigned int oid, unsigned short version, unsigned int tid);
+int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version);
+
 /* end transactions */
 #endif
index 11d2507f4586c4b60c270dc694f0ad259834e7a0..3ec7635ca6b688bf82905a6164e1a2bf97cfe239 100644 (file)
@@ -7,9 +7,12 @@
 #include <pthread.h>
 #include <netdb.h>
 #include <fcntl.h>
+#include <errno.h>
+#include <string.h>
 #include "dstm.h"
 #include "mlookup.h"
 #include "llookup.h"
+#include "threadnotify.h"
 #ifdef COMPILER
 #include "thread.h"
 #endif
@@ -41,6 +44,9 @@ int dstmInit(void)
        
        if (lhashCreate(HASH_SIZE, LOADFACTOR))
                return 1; //failure
+
+       if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR))
+               return 1; //failure
        
        return 0;
 }
@@ -107,23 +113,22 @@ void *dstmListen()
  * and accordingly calls other functions to process new requests */
 void *dstmAccept(void *acceptfd)
 {
-       int numbytes,i, val, retval;
+       int val, retval, size;
        unsigned int oid;
        char buffer[RECEIVE_BUFFER_SIZE], control,ctrl;
        char *ptr;
        void *srcObj;
        objheader_t *h;
        trans_commit_data_t transinfo;
-       unsigned short objType;
-       
+       unsigned short objType, *versionarry, version;
+       unsigned int *oidarry, numoid, mid, threadid;
+
        transinfo.objlocked = NULL;
        transinfo.objnotfound = NULL;
        transinfo.modptr = NULL;
        transinfo.numlocked = 0;
        transinfo.numnotfound = 0;
 
-       int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
-
        /* Receive control messages from other machines */
        if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) {
                if (retval == 0) {
@@ -142,6 +147,7 @@ void *dstmAccept(void *acceptfd)
                        }
                        if((srcObj = mhashSearch(oid)) == NULL) {
                                printf("Object not found in Main Object Store %s %d\n", __FILE__, __LINE__);
+                               pthread_exit(NULL);
                        }
                        h = (objheader_t *) srcObj;
                        GETSIZE(size, h);
@@ -191,7 +197,7 @@ void *dstmAccept(void *acceptfd)
                case TRANS_PREFETCH:
                        printf("DEBUG -> Recv TRANS_PREFETCH\n");
                        if((val = prefetchReq((int)acceptfd)) != 0) {
-                               printf("Error in readClientReq\n");
+                               printf("Error in transPrefetch\n");
                                pthread_exit(NULL);
                        }
                        break;
@@ -209,6 +215,43 @@ void *dstmAccept(void *acceptfd)
                        }
                        break;
 
+               case THREAD_NOTIFY_REQUEST:
+                       size = sizeof(unsigned int);
+                       retval = recv((int)acceptfd, ptr, size, 0);
+                       numoid = *((unsigned int *) ptr);
+                       size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
+                       retval = recv((int)acceptfd, ptr, size, 0);
+                       oidarry = calloc(numoid, sizeof(unsigned int)); 
+                       memcpy(oidarry, ptr, sizeof(unsigned int) * numoid);
+                       size = sizeof(unsigned int) * numoid;
+                       versionarry = calloc(numoid, sizeof(unsigned short));
+                       memcpy(versionarry, ptr+size, sizeof(unsigned short) * numoid);
+                       size += sizeof(unsigned short) * numoid;
+                       mid = *((unsigned int *)(ptr+size));
+                       size += sizeof(unsigned int);
+                       threadid = *((unsigned int *)(ptr+size));
+                       processReqNotify(numoid, oidarry, versionarry, mid, threadid);
+
+                       break;
+
+               case THREAD_NOTIFY_RESPONSE:
+                       size = sizeof(unsigned short) + 2 * sizeof(unsigned int);
+                       retval = recv((int)acceptfd, ptr, size, 0);
+                       if(retval <= 0) 
+                               perror("dstmAccept(): error receiving THREAD_NOTIFY_RESPONSE msg");
+                       else if( retval != 2*sizeof(unsigned int) + sizeof(unsigned short))
+                               printf("dstmAccept(): incorrect smsg size %d for THREAD_NOTIFY_RESPONSE msg\n", retval);
+                       else {
+                               oid = *((unsigned int *)ptr);
+                               size = sizeof(unsigned int);
+                               version = *((unsigned short *)(ptr+size));
+                               size += sizeof(unsigned short);
+                               threadid = *((unsigned int *)(ptr+size));
+                               threadNotify(oid,version,threadid);
+                       }
+
+                       break;
+
                default:
                        printf("DEBUG -> dstmAccept: Error Unknown opcode %d\n", control);
        }
@@ -561,6 +604,10 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock
                pthread_mutex_lock(&mainobjstore_mutex);
                memcpy(header, (char *)modptr + offset, tmpsize + sizeof(objheader_t));
                header->version += 1; 
+               /* If threads are waiting on this object to be updated, notify them */
+               if(header->notifylist != NULL) {
+                       notifyAll(&header->notifylist, OID(header), header->version);
+               }
                pthread_mutex_unlock(&mainobjstore_mutex);
                offset += sizeof(objheader_t) + tmpsize;
        }
@@ -628,6 +675,7 @@ int prefetchReq(int acceptfd) {
     } while(sum < N && n != 0);        
     
     /* Process each oid */
+    printf("Oid 0x%x is being searched\n", oid);
     if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
       /* Save the oids not found in buffer for later use */
       *(buffer + index) = OBJECT_NOT_FOUND;
@@ -708,3 +756,75 @@ int prefetchReq(int acceptfd) {
   return 0;
 }
 
+void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int threadid) {
+       objheader_t *header;
+       unsigned int oid;
+       unsigned short newversion;
+       char msg[1+ sizeof(unsigned int)];
+       int sd;
+       struct sockaddr_in remoteAddr;
+       int bytesSent;
+       int status, size, retry = 0;
+
+       int i = 0;
+       while(i < numoid) {
+               oid = *(oidarry + i);
+               if((header = (objheader_t *) mhashSearch(oid)) == NULL) {
+                       printf("processReqNotify(): Object is not found in mlookup %s, %d\n", __FILE__, __LINE__);
+                       return;
+               } else {
+                       /* Check to see if versions are same */
+checkversion:
+                       if ((STATUS(header) & LOCK) != LOCK) {          
+                               STATUS(header) |= LOCK;
+                               if(header->version == *(versionarry + i)) {
+                                       //Add to the notify list 
+                                       insNode(header->notifylist, threadid, mid); 
+                               } else {
+                                       if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
+                                               perror("processReqNotify():socket()");
+                                               return;
+                                       }
+                                       bzero(&remoteAddr, sizeof(remoteAddr));
+                                       remoteAddr.sin_family = AF_INET;
+                                       remoteAddr.sin_port = htons(LISTEN_PORT);
+                                       remoteAddr.sin_addr.s_addr = htonl(mid);
+
+                                       if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
+                                               printf("processReqNotify():error %d connecting to %s:%d\n", errno,
+                                                               inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+                                               status = -1;
+                                       } else {
+                                               //Send Update notification
+                                               msg[0] = THREAD_NOTIFY_RESPONSE;
+                                               msg[1] = oid;
+                                               size = sizeof(unsigned int);
+                                               memcpy(&msg[1] + size, &newversion, sizeof(unsigned short)); 
+                                               size += sizeof(unsigned short);
+                                               memcpy(&msg[1] + size, &threadid, sizeof(unsigned int)); 
+                                               bytesSent = send(sd, msg, 1+sizeof(unsigned int), 0);
+                                               if (bytesSent < 0){
+                                                       perror("processReqNotify():send()");
+                                                       status = -1;
+                                               } else if (bytesSent != 1 + sizeof(unsigned short) + 2*sizeof(unsigned int)){
+                                                       printf("processReqNotify(): error, sent %d bytes\n", bytesSent);
+                                                       status = -1;
+                                               } else {
+                                                       status = 0;
+                                               }
+
+                                       }
+                                       close(sd);
+                               }
+                               STATUS(header) &= ~(LOCK);              
+                       } else {
+                               randomdelay();
+                               printf("DEBUG-> processReqNotify() Object is still locked\n");
+                               goto checkversion;
+                       }
+               }
+       }
+       free(oidarry);
+       free(versionarry);
+}
+
index a9cca79536745d1babb8c490f7e4f5a3a465fb0e..0629adbec6c768ac618a08c46dd9b4df792dce3f 100644 (file)
@@ -5,8 +5,6 @@ mhashtable_t mlookup;   //Global hash table
 // Creates a machine lookup table with size =" size" 
 unsigned int mhashCreate(unsigned int size, float loadfactor)  {
        mhashlistnode_t *nodes;
-       int i;
-
        // Allocate space for the hash table 
        if((nodes = calloc(size, sizeof(mhashlistnode_t))) == NULL) {
                printf("Calloc error %s %d\n", __FILE__, __LINE__);
diff --git a/Robust/src/Runtime/DSTM/interface/threadnotify.c b/Robust/src/Runtime/DSTM/interface/threadnotify.c
new file mode 100644 (file)
index 0000000..cfa45ee
--- /dev/null
@@ -0,0 +1,223 @@
+#include "threadnotify.h"
+
+notifyhashtable_t nlookup; //Global hash table
+
+void insNode(threadlist_t *head, unsigned int threadid, unsigned int mid) {
+       threadlist_t *ptr;
+       if(head == NULL) {
+               if((head = calloc(1, sizeof(threadlist_t))) == NULL) {
+                       printf("Calloc Error %s, %d,\n", __FILE__, __LINE__);
+                       return;
+               }
+               head->threadid = threadid;
+               head->mid = mid;
+               head->next = NULL;
+       } else {
+               if((ptr = calloc(1, sizeof(threadlist_t))) == NULL) {
+                       printf("Calloc Error %s, %d,\n", __FILE__, __LINE__);
+                       return;
+               }
+               ptr->threadid = threadid;
+               ptr->mid = mid;
+               ptr->next = head;
+               head = ptr;
+       }
+}
+
+void display(threadlist_t *head) {
+       threadlist_t *ptr;
+       if(head == NULL) {
+               printf("No thread is waiting\n");
+               return;
+       } else {
+               while(head != NULL) {
+                       ptr = head;
+                       printf("The threadid waiting is = %d\n", ptr->threadid);
+                       printf("The mid on which thread present = %d\n", ptr->mid);
+                       head = ptr->next;
+               }
+       }
+}
+
+unsigned int notifyhashCreate(unsigned int size, float loadfactor) { 
+       notifylistnode_t *nodes;
+
+       // Allocate space for the hash table 
+       if((nodes = calloc(size, sizeof(notifylistnode_t))) == NULL) {
+               printf("Calloc error %s %d\n", __FILE__, __LINE__);
+               return 1;
+       }
+
+       nlookup.table = nodes;
+       nlookup.size = size;
+       nlookup.numelements = 0; // Initial number of elements in the hash
+       nlookup.loadfactor = loadfactor;
+       //Initialize the pthread_mutex variable         
+       pthread_mutex_init(&nlookup.locktable, NULL);
+       return 0;
+}
+
+// Assign to tids to bins inside hash table
+unsigned int notifyhashFunction(unsigned int tid) {
+       return( tid % (nlookup.size));
+}
+
+// Insert threadcond and threadid mapping into the hash table
+unsigned int notifyhashInsert(unsigned int tid, pthread_cond_t threadcond) {
+       unsigned int newsize;
+       int index;
+       notifylistnode_t *ptr, *node;
+
+       if (nlookup.numelements > (nlookup.loadfactor * nlookup.size)) {
+               //Resize Table
+               newsize = 2 * nlookup.size + 1;         
+               pthread_mutex_lock(&nlookup.locktable);
+               notifyhashResize(newsize);
+               pthread_mutex_unlock(&nlookup.locktable);
+       }
+       ptr = nlookup.table;
+       nlookup.numelements++;
+
+       index = notifyhashFunction(tid);
+#ifdef DEBUG
+       printf("DEBUG -> index = %d, threadid = %d\n", index, tid);
+#endif
+       pthread_mutex_lock(&nlookup.locktable);
+       if(ptr[index].next == NULL && ptr[index].threadid == 0) {       // Insert at the first position in the hashtable
+               ptr[index].threadid = tid;
+               ptr[index].threadcond = threadcond;
+       } else {                        // Insert in the beginning of linked list
+               if ((node = calloc(1, sizeof(notifylistnode_t))) == NULL) {
+                       printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+                       pthread_mutex_unlock(&nlookup.locktable);
+                       return 1;
+               }
+               node->threadid = tid;
+               node->threadcond = threadcond;
+               node->next = ptr[index].next;
+               ptr[index].next = node;
+       }
+       pthread_mutex_unlock(&nlookup.locktable);
+       return 0;
+}
+
+// Return pthread_cond_t threadcond for a given threadid in the hash table
+pthread_cond_t notifyhashSearch(unsigned int tid) {
+       int index;
+       notifylistnode_t *ptr, *node;
+       pthread_cond_t tmp = PTHREAD_COND_INITIALIZER;
+
+       ptr = nlookup.table;    // Address of the beginning of hash table       
+       index = notifyhashFunction(tid);
+       node = &ptr[index];
+       pthread_mutex_lock(&nlookup.locktable);
+       while(node != NULL) {
+               if(node->threadid == tid) {
+                       pthread_mutex_unlock(&nlookup.locktable);
+                       return node->threadcond;
+               }
+               node = node->next;
+       }
+       pthread_mutex_unlock(&nlookup.locktable);
+       return tmp;
+}
+
+// Remove an entry from the hash table
+unsigned int notifyhashRemove(unsigned int tid) {
+       int index;
+       notifylistnode_t *curr, *prev;
+       notifylistnode_t *ptr, *node;
+
+       ptr = nlookup.table;
+       index = notifyhashFunction(tid);
+       curr = &ptr[index];
+
+       pthread_cond_t tmp = PTHREAD_COND_INITIALIZER;
+       pthread_mutex_lock(&nlookup.locktable);
+       for (; curr != NULL; curr = curr->next) {
+               if (curr->threadid == tid) {         // Find a match in the hash table
+                       nlookup.numelements--;  // Decrement the number of elements in the global hashtable
+                       if ((curr == &ptr[index]) && (curr->next == NULL))  { // Delete the first item inside the hashtable with no linked list of notifylistnode_t 
+                               curr->threadid = 0;
+                               curr->threadcond = tmp;
+                       } else if ((curr == &ptr[index]) && (curr->next != NULL)) { //Delete the first bin item with a linked list of notifylistnode_t  connected 
+                               curr->threadid = curr->next->threadid;
+                               curr->threadcond = curr->next->threadcond;
+                               node = curr->next;
+                               curr->next = curr->next->next;
+                               free(node);
+                       } else {                                                // Regular delete from linked listed    
+                               prev->next = curr->next;
+                               free(curr);
+                       }
+                       pthread_mutex_unlock(&nlookup.locktable);
+                       return 0;
+               }       
+               prev = curr; 
+       }
+       pthread_mutex_unlock(&nlookup.locktable);
+       return 1;
+}
+
+// Resize table
+unsigned int notifyhashResize(unsigned int newsize) {
+       notifylistnode_t *node, *ptr, *curr, *next;     // curr and next keep track of the current and the next notifyhashlistnodes in a linked list
+       unsigned int oldsize;
+       int isfirst;    // Keeps track of the first element in the notifylistnode_t for each bin in hashtable
+       int i,index;    
+       notifylistnode_t *newnode;              
+
+       ptr = nlookup.table;
+       oldsize = nlookup.size;
+
+       if((node = calloc(newsize, sizeof(notifylistnode_t))) == NULL) {
+               printf("Calloc error %s %d\n", __FILE__, __LINE__);
+               return 1;
+       }
+
+       nlookup.table = node;           //Update the global hashtable upon resize()
+       nlookup.size = newsize;
+       nlookup.numelements = 0;
+
+       for(i = 0; i < oldsize; i++) {                  //Outer loop for each bin in hash table
+               curr = &ptr[i];
+               isfirst = 1;                    
+               while (curr != NULL) {                  //Inner loop to go through linked lists
+                       if (curr->threadid == 0) {              //Exit inner loop if there the first element for a given bin/index is NULL
+                               break;                  //threadid = threadcond =0 for element if not present within the hash table
+                       }
+                       next = curr->next;
+                       index = notifyhashFunction(curr->threadid);
+#ifdef DEBUG
+                       printf("DEBUG(resize) -> index = %d, threadid = %d\n", index, curr->threadid);
+#endif
+                       // Insert into the new table
+                       if(nlookup.table[index].next == NULL && nlookup.table[index].threadid == 0) { 
+                               nlookup.table[index].threadid = curr->threadid;
+                               nlookup.table[index].threadcond = curr->threadcond;
+                               nlookup.numelements++;
+                       }else { 
+                               if((newnode = calloc(1, sizeof(notifylistnode_t))) == NULL) { 
+                                       printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+                                       return 1;
+                               }       
+                               newnode->threadid = curr->threadid;
+                               newnode->threadcond = curr->threadcond;
+                               newnode->next = nlookup.table[index].next;
+                               nlookup.table[index].next = newnode;    
+                               nlookup.numelements++;
+                       }       
+
+                       //free the linked list of notifylistnode_t if not the first element in the hash table
+                       if (isfirst != 1) {
+                               free(curr);
+                       } 
+
+                       isfirst = 0;
+                       curr = next;
+               }
+       }
+
+       free(ptr);              //Free the memory of the old hash table 
+       return 0;
+}
diff --git a/Robust/src/Runtime/DSTM/interface/threadnotify.h b/Robust/src/Runtime/DSTM/interface/threadnotify.h
new file mode 100644 (file)
index 0000000..75071a7
--- /dev/null
@@ -0,0 +1,41 @@
+#ifndef _THREADNOTIFY_H_
+#define _THREADNOTIFY_H_
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <pthread.h>
+
+#define N_LOADFACTOR 0.75
+#define N_HASH_SIZE 20
+
+//Structure to notify object of which other objects/threads are waiting on it
+typedef struct threadlist {
+       unsigned int threadid;
+       unsigned int mid;
+       struct threadlist *next;
+} threadlist_t;
+
+typedef struct notifylistnode {
+       unsigned int threadid;
+       pthread_cond_t threadcond;
+       struct notifylistnode *next;
+} notifylistnode_t;
+
+typedef struct notifyhashtable {
+       notifylistnode_t *table; //points to beginning of hash table
+       unsigned int size;
+       unsigned int numelements;
+       float loadfactor;
+       pthread_mutex_t locktable;
+} notifyhashtable_t;
+
+void insNode(threadlist_t *head, unsigned int threadid, unsigned int mid);
+void display(threadlist_t *head);
+unsigned int notifyhashCreate(unsigned int size, float loadfactor);
+unsigned int notifyhashFunction(unsigned int tid);
+unsigned notifyhashInsert(unsigned int tid, pthread_cond_t threadcond);
+pthread_cond_t notifyhashSearch(unsigned int tid); //returns val, NULL if not found
+unsigned int notifyhashRemove(unsigned int tid); //returns -1 if not found
+unsigned int notifyhashResize(unsigned int newsize);
+
+#endif
index 26facf03161c68e526b5a9003f97f2a9b5fb1aa6..ea8ecd9869382ba55b8fc2818b9c63b56a6a026f 100644 (file)
@@ -6,6 +6,7 @@
 #include "llookup.h"
 #include "plookup.h"
 #include "prelookup.h"
+#include "threadnotify.h"
 #include "queue.h"
 #include <pthread.h>
 #include <sys/types.h>
@@ -17,7 +18,6 @@
 #include <errno.h>
 #include <time.h>
 #include <string.h>
-#include <pthread.h>
 
 #define LISTEN_PORT 2156
 #define RECEIVE_BUFFER_SIZE 2048
@@ -154,7 +154,6 @@ void *pCacheAlloc(objstr_t *store, unsigned int size) {
        }
 
        if(success == 0) {
-               printf("DEBUG-> Unable to insert object in Prefetch cache\n");
                return NULL;
        }
 }
@@ -214,7 +213,7 @@ void transExit() {
 
 /* This functions inserts randowm wait delays in the order of msec
  * Mostly used when transaction commits retry*/
-void randomdelay(void)
+void randomdelay()
 {
        struct timespec req;
        time_t t;
@@ -326,6 +325,8 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
 
                /* Get the object from the remote location */
                machinenumber = lhashSearch(oid);
+               char* ipaddr;
+               midtoIP(machinenumber, ipaddr);
                objcopy = getRemoteObj(record, machinenumber, oid);
                if(objcopy == NULL) {
                        printf("Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
@@ -344,6 +345,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
 objheader_t *transCreateObj(transrecord_t *record, unsigned int size)
 {
   objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + size));
+  tmp->notifylist = NULL;
   OID(tmp) = getNewOID();
   tmp->version = 1;
   tmp->rcount = 1;
@@ -441,7 +443,7 @@ int transCommit(transrecord_t *record) {
                /* Create a list of machine ids(Participants) involved in transaction   */
                if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
                        printf("Calloc error %s, %d\n", __FILE__, __LINE__);
-                       free(record);
+                       //free(record);
                        return 1;
                }               
                pListMid(pile, listmid);
@@ -462,7 +464,7 @@ int transCommit(transrecord_t *record) {
                        pthread_mutex_destroy(&tlock);
                        pDelete(pile_ptr);
                        free(listmid);
-                       free(record);
+                       //free(record);
                        return 1;
                }
 
@@ -474,7 +476,7 @@ int transCommit(transrecord_t *record) {
                        pDelete(pile_ptr);
                        free(listmid);
                        free(thread_data_array);
-                       free(record);
+                       //free(record);
                        return 1;
                }
 
@@ -498,7 +500,7 @@ int transCommit(transrecord_t *record) {
                                free(listmid);
                                free(thread_data_array);
                                free(ltdata);
-                               free(record);
+                               //free(record);
                                return 1;
                        }
                        tosend->f.control = TRANS_REQUEST;
@@ -537,7 +539,7 @@ int transCommit(transrecord_t *record) {
                                                free(thread_data_array[i].buffer);
                                        free(thread_data_array);
                                        free(ltdata);
-                                       free(record);
+                                       //free(record);
                                        return 1;
                                }
                        } else { /*Local*/
@@ -556,7 +558,7 @@ int transCommit(transrecord_t *record) {
                                                free(thread_data_array[i].buffer);
                                        free(thread_data_array);
                                        free(ltdata);
-                                       free(record);
+                                       //free(record);
                                        return 1;
                                }
                        }
@@ -581,7 +583,7 @@ int transCommit(transrecord_t *record) {
                                        free(thread_data_array[j].buffer);
                                free(thread_data_array);
                                free(ltdata);
-                               free(record);
+                               //free(record);
                                return 1;
                        }
                        free(thread_data_array[i].buffer);
@@ -1050,6 +1052,11 @@ int transComProcess(local_thread_data_array_t  *localtdata) {
                pthread_mutex_lock(&mainobjstore_mutex);
                memcpy(header, tcptr, tmpsize + sizeof(objheader_t));
                header->version += 1;
+               /* If threads are waiting on this object to be updated, notify them */
+               if(header->notifylist != NULL) {
+                       notifyAll(&header->notifylist, OID(header), header->version);
+               }
+
                pthread_mutex_unlock(&mainobjstore_mutex);
        }
        /* If object is newly created inside transaction then commit it */
@@ -1059,6 +1066,7 @@ int transComProcess(local_thread_data_array_t  *localtdata) {
                        return 1;
                }
                GETSIZE(tmpsize, header);
+               tmpsize += sizeof(objheader_t);
                pthread_mutex_lock(&mainobjstore_mutex);
                if ((ptrcreate = objstrAlloc(mainobjstore, tmpsize)) == NULL) {
                        printf("Error: transComProcess() failed objstrAlloc\n");
@@ -1067,7 +1075,6 @@ int transComProcess(local_thread_data_array_t  *localtdata) {
                }
                pthread_mutex_unlock(&mainobjstore_mutex);
                memcpy(ptrcreate, header, tmpsize + sizeof(objheader_t));
-
                mhashInsert(oidcreated[i], ptrcreate);
                lhashInsert(oidcreated[i], myIpAddr);
        }
@@ -1100,6 +1107,7 @@ void checkPrefetchTuples(prefetchqelem_t *node) {
        oid = GET_PTR_OID(ptr);
        endoffsets = GET_PTR_EOFF(ptr, ntuples); 
        arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
+       
        /* Find offset length for each tuple */
        int numoffset[ntuples];
        numoffset[0] = endoffsets[0];
@@ -1240,10 +1248,9 @@ prefetchpile_t *foundLocal(prefetchqelem_t *node) {
                                }
                                if(isArray == 1) {
                                        int elementsize = classsize[TYPE(objheader)];
-                                       struct ArrayObject *ao = (struct ArrayObject *) (tmp + sizeof(objheader_t));
                                        objoid = *((unsigned int *)(tmp + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*arryfields[arryfieldindex])));
                                } else {
-                                       objoid = *(tmp + sizeof(objheader_t) + arryfields[arryfieldindex]);
+                                       objoid = *((unsigned int *)(tmp + sizeof(objheader_t) + arryfields[arryfieldindex]));
                                }
                                //Update numoffset array
                                numoffset[i] = numoffset[i] - 1;
@@ -1315,7 +1322,7 @@ void checkPreCache(prefetchqelem_t *node, int *numoffset, unsigned int objoid, i
                                int elementsize = classsize[TYPE(header)];
                                objoid = *((unsigned int *)(tmp + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*arryfields[arryfieldindex])));
                        } else {
-                               objoid = *(tmp + sizeof(objheader_t) + arryfields[arryfieldindex]);
+                               objoid = *((unsigned int *)(tmp + sizeof(objheader_t) + arryfields[arryfieldindex]));
                        }
                        //Update numoffset array
                        numoffset[index] = numoffset[index] - 1;
@@ -1572,7 +1579,8 @@ void getPrefetchResponse(int count, int sd) {
                                        /* Increment it to get the object */
                                        /* TODO: For each object not found query DHT for new location and retrieve the object */
                                        index += sizeof(char);
-                                       memcpy(&oid, buffer + index, sizeof(unsigned int));
+                                       //memcpy(&oid, buffer + index, sizeof(unsigned int));
+                                       oid = *((unsigned int *)(buffer + index));
                                        index += sizeof(unsigned int);
                                        /* Throw an error */
                                        printf("OBJECT NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n");
@@ -1600,7 +1608,6 @@ unsigned short getObjType(unsigned int oid)
        {
                if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL)
                {
-                       //prefetch(1, &oid, &numoffsets, NULL);
                        prefetch(1, &oid, numoffset, fieldoffset);
                        pthread_mutex_lock(&pflookup.lock);
                        while ((objheader = (objheader_t *) prehashSearch(oid)) == NULL)
@@ -1778,3 +1785,157 @@ int findHost(unsigned int hostIp)
        //not found
        return -1;
 }
+
+/* This function sends notification request per thread waiting on object(s) whose version 
+ * changes */
+void reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int numoid) {
+       int sock,i;
+       objheader_t *objheader;
+       struct sockaddr_in remoteAddr;
+       char msg[1 + numoid * (sizeof(short) + sizeof(unsigned int)) + sizeof(unsigned int) * 3];
+       char *ptr;
+       int bytesSent;
+       int status, size;
+       unsigned short version;
+       unsigned int oid, threadid;
+       pthread_mutex_t threadnotify; //Lock and condition var for threadjoin and notification
+       pthread_cond_t threadcond;
+
+       if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
+               perror("reqNotify():socket()");
+               return;
+       }
+
+       bzero(&remoteAddr, sizeof(remoteAddr));
+       remoteAddr.sin_family = AF_INET;
+       remoteAddr.sin_port = htons(LISTEN_PORT);
+       remoteAddr.sin_addr.s_addr = htonl(mid);
+
+       /* Generate unique threadid */
+       threadid = (unsigned int) pthread_self();
+       if((status = notifyhashInsert(threadid, threadcond)) != 0) {
+               printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__);
+               return;
+       }
+
+       /* Save data that is sent for later processing */
+       //Save threadid, numoid, oidarray, versionarray, also the pthread_cond_variable in a linked list
+       //TODO
+
+       /* Send oidarry, version array, threadid and machine id */      
+       if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
+               printf("reqNotify():error %d connecting to %s:%d\n", errno,
+                               inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+               status = -1;
+       } else {
+               msg[0] = THREAD_NOTIFY_REQUEST;
+               msg[1] = numoid;
+               /* Send array of oids  */
+               size = sizeof(unsigned int);
+               {
+                       i = 0;
+                       while(i < numoid) {
+                               oid = oidarry[i];
+                               *((unsigned int *)(&msg[1] + size)) = oid;
+                               size += sizeof(unsigned int);
+                               i++;
+                       }
+               }
+
+               /* Send array of version  */
+               {
+                       i = 0;
+                       while(i < numoid) {
+                               version = versionarry[i];
+                               *((unsigned short *)(&msg[1] + size)) = oid;
+                               size += sizeof(unsigned short);
+                               i++;
+                       }
+               }
+
+               *((unsigned int *)(&msg[1] + size)) = myIpAddr;
+               size += sizeof(unsigned int);
+               *((unsigned int *)(&msg[1] + size)) = threadid;
+
+               pthread_mutex_lock(&threadnotify);
+               bytesSent = send(sock, msg, 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 2 * sizeof(unsigned int) , 0);
+               if (bytesSent < 0){
+                       perror("reqNotify():send()");
+                       status = -1;
+               } else if (bytesSent != 1 + 5*sizeof(unsigned int)){
+                       printf("reNotify(): error, sent %d bytes\n", bytesSent);
+                       status = -1;
+               } else {
+                       status = 0;
+               }
+               pthread_cond_wait(&threadcond, &threadnotify);
+               pthread_mutex_unlock(&threadnotify);
+       }
+
+       close(sock);
+}
+
+void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
+       pthread_cond_t ret;
+       //Look up the tid and call the corresponding pthread_cond_signal
+       ret = notifyhashSearch(tid);
+       pthread_cond_signal(&ret);
+       //TODO process oid and version
+}
+
+int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
+       threadlist_t *ptr;
+       unsigned int mid;
+       struct sockaddr_in remoteAddr;
+       char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
+       int sock, status, size, bytesSent;
+       while(*head != NULL) {
+               ptr = *head;
+               mid = ptr->mid; 
+               //create a socket connection to that machine
+               if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
+                       perror("notifyAll():socket()");
+                       return -1;
+               }
+
+               bzero(&remoteAddr, sizeof(remoteAddr));
+               remoteAddr.sin_family = AF_INET;
+               remoteAddr.sin_port = htons(LISTEN_PORT);
+               remoteAddr.sin_addr.s_addr = htonl(mid);
+               //send Thread Notify response and threadid to that machine
+               if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
+                       printf("notifyAll():error %d connecting to %s:%d\n", errno,
+                                       inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+                       status = -1;
+               } else {
+                       msg[0] = THREAD_NOTIFY_RESPONSE;
+                       msg[1] = oid;
+                       size = sizeof(unsigned int);
+                       *(&msg[1]+ size) = version;
+                       size+= sizeof(unsigned short);
+                       *(&msg[1]+ size) = ptr->threadid;
+
+                       bytesSent = send(sock, msg, 1 + 2*sizeof(unsigned int) + sizeof(unsigned short), 0);
+                       if (bytesSent < 0){
+                               perror("notifyAll():send()");
+                               status = -1;
+                       } else if (bytesSent != 1 + 2*sizeof(unsigned int) + sizeof(unsigned short)){
+                               printf("notifyAll(): error, sent %d bytes\n", bytesSent);
+                               status = -1;
+                       } else {
+                               status = 0;
+                       }
+               }
+               //close socket
+               close(sock);
+               // Update head
+               *head = ptr->next;
+               free(ptr);
+       }
+}
+
+void transAbort(transrecord_t *trans) {
+       objstrDelete(trans->cache);
+       chashDelete(trans->lookupTable);
+       free(trans);
+}
index 009677fb3b86a7a5f2563207a4019774f7139234..cc1bed7359b632d405ffcbca63ed87d580ccb644 100644 (file)
@@ -7,6 +7,7 @@
 #include "option.h"
 #include <signal.h>
 #include <DSTM/interface/dstm.h>
+#include <DSTM/interface/llookup.h>
 
 #include <stdio.h>
 int threadcount;
@@ -19,9 +20,13 @@ pthread_key_t threadlocks;
 pthread_mutex_t threadnotifylock;
 pthread_cond_t threadnotifycond;
 transrecord_t * trans;
+pthread_key_t oid;
 
 void threadexit() {
-  void *ptr;
+  objheader_t* ptr;
+  void *value;
+  unsigned int oidvalue;
+
 #ifdef THREADS
   struct ___Object___ *ll=pthread_getspecific(threadlocks);
   while(ll!=NULL) {
@@ -40,20 +45,22 @@ void threadexit() {
   threadcount--;
   pthread_cond_signal(&gccond);
   pthread_mutex_unlock(&gclistlock);
+#ifdef DSTM
   /* Add transaction to check if thread finished for join operation */
+  value = pthread_getspecific(oid);
+  oidvalue = *((unsigned int *)value);
   goto transstart;
-transretry:
-
 transstart:
-       trans = transStart();
-       ptr  = (void *)transRead(trans, (unsigned int) a);
-       struct ___Thread___ *tmp = ((char *) ptr + sizeof(objheader_t));
-       tmp->___threadDone___ = 1;
-       if(transCommit(trans)) {
-                                       goto transretry;
-  } else {
-                                       COMMIT_OBJ();
+  trans = transStart();
+  ptr = transRead(trans, oidvalue);
+  struct ___Thread___ *p = (struct ___Thread___ *) ptr;
+  p->___threadDone___ = 1;
+  while(!transCommit(trans)) {
+         printf("DEBUG-> Trans not committed yet\n");
+         transAbort(trans);
+         goto transstart;
   }
+#endif 
   pthread_exit(NULL);
 }
 
@@ -118,28 +125,48 @@ void CALL11(___Thread______sleep____J, long long ___millis___, long long ___mill
 }
 
 /* Add thread join capability */
-#ifdef DSTM
 void CALL01(___Thread______join____, struct ___Thread___ * ___this___) {
-  pthread_t thread;
   printf("DEBUG -> Inside thread join\n");
-  int status;
-  if(VAR(___this___)->___threadDone___) {
+#ifdef DSTM
+  pthread_t thread;
+  unsigned int *oidarray, mid;
+  unsigned short *versionarray, version;
+  transrecord_t *trans;
+  objheader_t *ptr;
+  /* Add transaction to check if thread finished for join operation */
+transstart:
+  trans = transStart();
+  ptr = transRead(trans, (unsigned int) VAR(___this___));
+  struct ___Thread___ *p = (struct ___Thread___ *) ptr;
+  if(p->___threadDone___ == 1) {
+         transAbort(trans);
          return;
   } else {
-         /* Request Notification */
-         pthread_cond_broadcast(&objcond);
-         pthread_mutex_unlock(&objlock);
-         pthread_mutex_lock(&threadnotifylock);//wake everyone up
-         status = reqNotify((unsigned int)VAR(___this___));
-         
-         if((status = reqNotify((unsigned int)VAR(___this___))) != 0) {
-                 printf("No notification is sent %s, %d\n", __FILE__, __LINE__);
-         } else {
+         version = (ptr-1)->version;
+         if((oidarray = calloc(1, sizeof(unsigned int))) == NULL) {
+                 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+                 return;
+         }
+
+         oidarray[0] = (unsigned int) VAR(___this___);
+
+         if((versionarray = calloc(1, sizeof(unsigned short))) == NULL) {
+                 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+                 free(oidarray);
                  return;
          }
+         versionarray[0] = version;
+         mid = lhashSearch((unsigned int) VAR(___this___));
+         /* Request Notification */
+         reqNotify(oidarray, versionarray, mid, 1); 
+         free(oidarray);
+         free(versionarray);
+         transAbort(trans);
+         goto transstart;
   }
-}
+  return;
 #endif
+}
 
 #ifdef THREADS
 void CALL01(___Thread______nativeCreate____, struct ___Thread___ * ___this___) {
@@ -170,7 +197,14 @@ void CALL12(___Thread______start____I, int ___mid___, struct ___Thread___ * ___t
 #endif
 
 #ifdef DSTM
+void globalDestructor(void *value) {
+       free(value);
+       pthread_setspecific(oid, NULL);
+}
+
 void initDSMthread(int *ptr) {
+  objheader_t *tmp;    
+  void *threadData;
   int oid=ptr[0];
   int type=ptr[1];
   free(ptr);
@@ -180,25 +214,26 @@ void initDSMthread(int *ptr) {
 #else
   ((void (*)(void *))virtualtable[type*MAXCOUNT+RUNMETHOD])(oid);
 #endif
+  threadData = calloc(1, sizeof(unsigned int));
+  *((unsigned int *) threadData) = oid;
+  pthread_setspecific(oid, threadData);
   pthread_mutex_lock(&gclistlock);
-       threadcount--;
-       pthread_cond_signal(&gccond);
-       pthread_mutex_unlock(&gclistlock);
-       /* Add transaction to check if thread finished for join operation */
-       goto transstart;
-transretry:
-       //TODO
-
+  threadcount--;
+  pthread_cond_signal(&gccond);
+  pthread_mutex_unlock(&gclistlock);
+  /* Add transaction to check if thread finished for join operation */
+  goto transstart;
 transstart:
-       trans = transStart();
-       ptr  = (void *)transRead(trans, (unsigned int) oid);
-       struct ___Thread___ *tmp = ((char *) ptr + sizeof(objheader_t));
-       tmp->___threadDone___ = 1;
-       if(transCommit(trans)) {
-                                       goto transretry;
-       } else {
-                                       //TODO
-       }
+  trans = transStart();
+  tmp  = transRead(trans, (unsigned int) oid);
+  struct ___Thread___ *t = (struct ___Thread___ *) tmp;
+  t->___threadDone___ = 1;
+  while(!transCommit(trans)) {
+         printf("DEBUG-> Trans not committed yet\n");
+         transAbort(trans);
+         goto transstart;
+  }
+  pthread_exit(NULL);
 }
 
 void startDSMthread(int oid, int objType) {
@@ -214,6 +249,7 @@ void startDSMthread(int oid, int objType) {
   int * ptr=malloc(sizeof(int)*2);
   ptr[0]=oid;
   ptr[1]=objType;
+  pthread_key_create(&oid, globalDestructor);
   do {
     retval=pthread_create(&thread, &nattr, (void * (*)(void *)) &initDSMthread,  ptr);
     if (retval!=0)
index 6eba697cdd0fa87eefda0e0f8159c4d8bcc8b7d3..e38acce11930691b6e17826ceaa69487dd229e7f 100644 (file)
@@ -1,13 +1,13 @@
 public class Atomic3 extends Thread {
        public Atomic3() {
        }
-       static Tree root;
+       Tree root;
        Integer count;
        public static void main(String[] st) {
                int mid = (128<<24)|(195<<16)|(175<<8)|70;
                int b;
                Atomic3 at3 = null;
-               Integer z;
+               Integer y,z;
                atomic {
                        at3 = global new Atomic3();
                        z = global new Integer(300);
@@ -15,9 +15,15 @@ public class Atomic3 extends Thread {
                        at3.root.insert(z);
                        b = at3.root.value.intValue();
                }
+               System.printString("b is ");
+               System.printInt(b);
                atomic{
                        at3.root.item = 2445;
+                       y = global new Integer(400);
+                       at3.root.value = y;
+                       b = at3.root.value.intValue();
                }
+               System.printString("b is ");
                System.printInt(b);
                System.printString("\n");
                System.printString("Starting\n");
@@ -31,10 +37,9 @@ public class Atomic3 extends Thread {
        public int run() {
                int a;
                atomic {
-                       //FIXME a bug value of trans commit is not saved
                        a = root.value.intValue();
-                       //a = root.item;
                }
+               System.printString("a is ");
                System.printInt(a);
                System.printString("\n");
        }
index 563acbf95901afdc78b93609f64334d681ce82c0..d883c0b92add43f5a8ef76001231d652f7b4f404 100644 (file)
@@ -86,6 +86,6 @@ public class People {
        public boolean isSenior() {
                if(this.getAge() > 65)
                        return true;
-               return false;;
+               return false;
        }
 }
index 85e31c0d6869148eae68c5111b3af691869e1b92..6c8f1f10a665849ee2775d5c99780853ebc2c084 100755 (executable)
@@ -223,7 +223,7 @@ $ROBUSTROOT/Runtime/GenericHashtable.c $ROBUSTROOT/Runtime/object.c"
 if $DSMFLAG
 then
 EXTRAOPTIONS="$EXTRAOPTIONS -lpthread -DCOMPILER -DDSTM -I$DSMRUNTIME"
-FILES="$FILES $DSMRUNTIME/trans.c $DSMRUNTIME/mcpileq.c $DSMRUNTIME/objstr.c $DSMRUNTIME/dstm.c $DSMRUNTIME/mlookup.c $DSMRUNTIME/clookup.c $DSMRUNTIME/llookup.c $DSMRUNTIME/dstmserver.c $DSMRUNTIME/plookup.c $DSMRUNTIME/ip.c $DSMRUNTIME/queue.c $DSMRUNTIME/prelookup.c $DSMRUNTIME/machinepile.c $DSMRUNTIME/localobjects.c $ROBUSTROOT/Runtime/thread.c"
+FILES="$FILES $DSMRUNTIME/trans.c $DSMRUNTIME/mcpileq.c $DSMRUNTIME/objstr.c $DSMRUNTIME/dstm.c $DSMRUNTIME/mlookup.c $DSMRUNTIME/clookup.c $DSMRUNTIME/llookup.c $DSMRUNTIME/threadnotify.c $DSMRUNTIME/dstmserver.c $DSMRUNTIME/plookup.c $DSMRUNTIME/ip.c $DSMRUNTIME/queue.c $DSMRUNTIME/prelookup.c $DSMRUNTIME/machinepile.c $DSMRUNTIME/localobjects.c $ROBUSTROOT/Runtime/thread.c"
 fi
 
 if $RECOVERFLAG