fix minor errors
authoradash <adash>
Fri, 10 Aug 2007 20:09:13 +0000 (20:09 +0000)
committeradash <adash>
Fri, 10 Aug 2007 20:09:13 +0000 (20:09 +0000)
1.Modify Makefile and test*.c files to reflect new ipaddrs of dw-1 and dw-2
for testing purposes
2. compare versions before inserting objs into prefetch cache
3. When objects not found through a prefetch request, throw error
4. TODO : Implement when to delete old objects from prefetch cache

Robust/src/Runtime/DSTM/interface/Makefile
Robust/src/Runtime/DSTM/interface/dstm.c
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/prelookup.c
Robust/src/Runtime/DSTM/interface/testclient.c
Robust/src/Runtime/DSTM/interface/testserver.c
Robust/src/Runtime/DSTM/interface/trans.c

index b04fca6df53ee4ac8c378bdbff8c94574dcc9110..5ab26a4a17e012503913b0d8d7c123f03d4b4bc3 100644 (file)
@@ -1,22 +1,22 @@
-d-3:
-       gcc -dr -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c
+dw-2:
+       gcc -dr -lpthread -g -o dw-2 trans.c testdw-2.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c
 
 demsky:
        gcc -DDEBUG -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c mcpileq.c machinepile.c 
 
-d-4:
-       gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c
+dw-1:
+       gcc -lpthread -g -o dw-1 trans.c testdw-1.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c
 
 all:
-       gcc -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c
+       gcc -lpthread -g -o dw-2 trans.c testdw-2.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c
        gcc -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c mcpileq.c machinepile.c
-       gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c
+       gcc -lpthread -g -o dw-1 trans.c testdw-1.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c
 
 
 mac:
-       gcc -DMAC -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c
+       gcc -DMAC -lpthread -g -o dw-2 trans.c testdw-2.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c
        gcc -DMAC -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c mcpileq.c machinepile.c
-       gcc -DMAC -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c
+       gcc -DMAC -lpthread -g -o dw-1 trans.c testdw-1.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c
 
 clean:
-       rm -rf d-3 d-4 demsky
+       rm -rf dw-2 dw-1 demsky
index 7fbb553b4f0a2e8bc7d8fc75c616e3af7f4de09c..c586dd754c4e08dc33ee4b1f5d53b459b46b79b6 100644 (file)
@@ -7,7 +7,7 @@ extern int classsize[];
 // Get a new object id
 unsigned int getNewOID(void) {
        static int id = 1;
-       return id++;
+       return id+=2;
 }
 
 // Get the size of the object for a given type
index 30dd47d0da46e5530cde028862df47791348e3e3..a1136966d520eb703ecd4a0f3e00894bbadd1af2 100644 (file)
@@ -263,6 +263,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
                printf("Handle Trans Req error %s, %d\n", __FILE__, __LINE__);
                return 1;
        }
+
        /* Read new control message from Coordiator */
        if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0 ) {
                perror("Error in receiving control message\n");
@@ -293,7 +294,6 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
                                return 1;
                        }
                        ptr = NULL;
-       //              return 0;
                        break;
 
                case TRANS_COMMIT:
@@ -333,8 +333,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
 }
 
 /* This function increments counters while running a voting decision on all objects involved 
- * in TRANS_REQUEST */
-
+ * in TRANS_REQUEST and If a TRANS_DISAGREE sends the response immediately back to the coordinator */
 char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr, int acceptfd) {
        int val, i = 0;
        short version;
@@ -445,6 +444,7 @@ int decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *
        /* Condition to send TRANS_AGREE */
        if(*(v_matchnolock) == fixed->numread + fixed->nummod) {
                control = TRANS_AGREE;
+               /* Send control message */
                if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
                        perror("Error in sending control to Coordinator\n");
                        return 0;
index 77733dfd05e0fa586af5107aecd4869a737f1b52..e92d5957b120cda38d36b0f562bf13ef3492b65a 100644 (file)
@@ -1,4 +1,5 @@
- #include "prelookup.h"
+/* LOCK THE ENTIRE HASH TABLE */
+#include "prelookup.h"
 
 prehashtable_t pflookup; //Global prefetch cache table
 
index 0cf1cf0acae2cafe4b7c395f7c1b7427b665a2ba..66ad94d888e90314c1bc5101b3af01af6b691f2d 100644 (file)
@@ -7,8 +7,36 @@
 #define LISTEN_PORT 2156
 
 extern objstr_t *mainobjstore;
-
-int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *)};      
+typedef struct testobj1 {
+       int x;
+       char z;
+} testobj1_t;
+
+typedef struct testobj2 {
+       char z[10];
+       char c;
+       testobj1_t *y;
+} testobj2_t;
+
+typedef struct testobj3 {
+       short p;
+       testobj1_t *q;
+       testobj2_t *r;
+} testobj3_t;
+
+typedef struct testobj4 {
+       int b;
+       void *q;
+       testobj3_t *a;
+} testobj4_t;
+
+typedef struct testobj5 {
+       testobj4_t *a;
+} testobj5_t;
+
+
+int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *), sizeof(testobj1_t), 
+       sizeof(testobj2_t), sizeof(testobj3_t), sizeof(testobj4_t), sizeof(testobj5_t)};        
 
 unsigned int createObjects(transrecord_t *record) {
        objheader_t *header, *tmp;
index 85d3b920b922224621ed8b6f250fb107c98e27db..84b1abfe8cf36dbf639731610de657c7fc9873f4 100644 (file)
@@ -6,7 +6,39 @@
 #include "ip.h"
 
 extern objstr_t *mainobjstore;
-int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *)};
+typedef struct testobj1 {
+       int x;
+       char z;
+} testobj1_t;
+
+typedef struct testobj2 {
+       char z[10];
+       char c;
+       testobj1_t *y;
+} testobj2_t;
+
+typedef struct testobj3 {
+       short p;
+       testobj1_t *q;
+       testobj2_t *r;
+} testobj3_t;
+
+typedef struct testobj4 {
+       int b;
+       void *q;
+       testobj3_t *a;
+} testobj4_t;
+
+typedef struct testobj5 {
+       testobj4_t *a;
+} testobj5_t;
+
+
+int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *), sizeof(testobj1_t), 
+       sizeof(testobj2_t), sizeof(testobj3_t), sizeof(testobj4_t), sizeof(testobj5_t)};        
+
+
+//int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *)};
 
 int test1(void);
 int test2(void);
@@ -184,7 +216,7 @@ int test3() {
        lhashInsert(header->oid, mid);
 
        //Inserting into lhashtable
-       mid = iptoMid("128.200.9.29"); //d-3.eecs.uci.edu
+       mid = iptoMid("128.195.175.70"); //dw-2.eecs.uci.edu
        lhashInsert(20, mid);
        lhashInsert(21, mid);
        lhashInsert(22, mid);
@@ -198,9 +230,9 @@ int test3() {
 
        //Check if machine dw-1 is up and running
        checkServer(mid, "128.195.175.69");
-       mid = iptoMid("128.200.9.29");
-       //Check if machine d-3 is up and running
-       checkServer(mid, "128.200.9.29");
+       mid = iptoMid("128.195.175.70");
+       //Check if machine dw-2 is up and running
+       checkServer(mid, "128.195.175.70");
 
        // Start Transaction    
        myTrans = transStart();
@@ -239,6 +271,7 @@ int test4() {
        objheader_t *h1, *h2, *h3, *h4;//h1,h2 from local ; h3 from d-1 , h-4 from d-2
 
        dstmInit();     
+       transInit();
        pthread_attr_init(&attr);
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
 
@@ -276,7 +309,7 @@ int test4() {
        lhashInsert(header->oid, mid);
 
        //Inserting into lhashtable
-       mid = iptoMid("128.200.9.29"); //d-3.eecs.uci.edu
+       mid = iptoMid("128.195.175.70"); //dw-2.eecs.uci.edu
        lhashInsert(20, mid);
        lhashInsert(21, mid);
        lhashInsert(22, mid);
@@ -290,9 +323,9 @@ int test4() {
        pthread_create(&thread_Listen, &attr, dstmListen, NULL);
        //Check if machine dw-1 is up and running
        checkServer(mid, "128.195.175.69");
-       mid = iptoMid("128.200.9.29");
-       //Check if machine d-3 is up and running
-       checkServer(mid, "128.200.9.29");
+       mid = iptoMid("128.195.175.70");
+       //Check if machine dw-2 is up and running
+       checkServer(mid, "128.195.175.70");
 
        // Start Transaction    
        myTrans = transStart();
@@ -310,7 +343,7 @@ int test4() {
        if((h3 = transRead(myTrans, 31)) == NULL) {
                printf("Object not found\n");
        }
-       //read object 21(present in d-3 machine)
+       //read object 21(present in dw-2 machine)
        if((h4 = transRead(myTrans, 21)) == NULL) {
                printf("Object not found\n");
        }
index 31d86115c307f94047c28e9fa731da04e35dd183..aaa16ef48cff6c9e6abb6661480bd05e5b8fc5cf 100644 (file)
@@ -99,6 +99,7 @@ void transInit() {
        //Create the primary prefetch thread 
        pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
        //Create and Initialize a pool of threads 
+       /* Threads are active for the entire period runtime is running */
        for(t = 0; t< NUM_THREADS; t++) {
                rc = pthread_create(&wthreads[t], NULL, mcqProcess, (void *)t);
                if (rc) {
@@ -106,7 +107,6 @@ void transInit() {
                        return;
                }
        }
-       //TODO when to deletethreads
 }
 
 /* This function stops the threads spawned */
@@ -145,8 +145,8 @@ transrecord_t *transStart()
 
 /* This function finds the location of the objects involved in a transaction
  * and returns the pointer to the object if found in a remote location */
-objheader_t *transRead(transrecord_t *record, unsigned int oid)
-{      
+objheader_t *transRead(transrecord_t *record, unsigned int oid) {      
+       printf("Inside transaction read call\n");
        unsigned int machinenumber;
        objheader_t *tmp, *objheader;
        void *objcopy;
@@ -162,9 +162,11 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid)
 
        /* Search local transaction cache */
        if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
+               printf("Inside transaction cache \n");
                return(objheader);
        } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
                /* Look up in machine lookup table  and copy  into cache*/
+               printf("Inside mainobject store \n");
                tmp = mhashSearch(oid);
                size = sizeof(objheader_t)+classsize[TYPE(tmp)];
                objcopy = objstrAlloc(record->cache, size);
@@ -173,6 +175,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid)
                chashInsert(record->lookupTable, OID(objheader), objcopy); 
                return(objcopy);
        } else if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { /* Look up in prefetch cache */
+               printf("Inside prefetch cache \n");
                found = 1;
                size = sizeof(objheader_t)+classsize[TYPE(tmp)];
                objcopy = objstrAlloc(record->cache, size);
@@ -181,6 +184,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid)
                chashInsert(record->lookupTable, OID(tmp), objcopy); 
                return(objcopy);
        } else { /* If not found anywhere, then block until object appears in prefetch cache */
+               printf("Inside remote machine\n");
                pthread_mutex_lock(&pflookup.lock);
                while(!found) {
                        rc = pthread_cond_timedwait(&pflookup.cond, &pflookup.lock, &ts);
@@ -300,6 +304,7 @@ int transCommit(transrecord_t *record) {
        char treplyctrl = 0, treplyretry = 0; /* keeps track of the common response that needs to be sent */
        char localstat = 0;
 
+
        /* Look through all the objects in the transaction record and make piles 
         * for each machine involved in the transaction*/
        pile = createPiles(record);
@@ -417,7 +422,6 @@ int transCommit(transrecord_t *record) {
        if(treplyretry == 1) {
                /* wait a random amount of time */
                randomdelay();
-               //sleep(1);
                /* Retry the commiting transaction again */
                transCommit(record);
        }
@@ -572,6 +576,7 @@ int decideResponse(thread_data_array_t *tdata) {
                /* Send Abort */
                *(tdata->replyctrl) = TRANS_ABORT;
                printf("DEBUG-> trans.c Sending TRANS_ABORT\n");
+               /* Free resources */
                objstrDelete(tdata->rec->cache);
                chashDelete(tdata->rec->lookupTable);
                free(tdata->rec);
@@ -579,6 +584,7 @@ int decideResponse(thread_data_array_t *tdata) {
                /* Send Commit */
                *(tdata->replyctrl) = TRANS_COMMIT;
                printf("DEBUG-> trans.c Sending TRANS_COMMIT\n");
+               /* Free resources */
                objstrDelete(tdata->rec->cache);
                chashDelete(tdata->rec->lookupTable);
                free(tdata->rec);
@@ -594,7 +600,7 @@ int decideResponse(thread_data_array_t *tdata) {
 
        return 0;
 }
-/* This function sends the final response to all threads in their respective socket id */
+/* This function sends the final response to remote machines per thread in their respective socket id */
 char sendResponse(thread_data_array_t *tdata, int sd) {
        int n, N, sum, oidcount = 0;
        char *ptr, retval = 0;
@@ -1232,8 +1238,10 @@ void *transPrefetch(void *t) {
        }
 }
 
-/*The pool of threads work on this function to establish connection with
- * remote machines */
+/* Each thread in the  pool of threads calls this function to establish connection with
+ * remote machines, send the prefetch requests and process the reponses from
+ * the remote machines .
+ * The thread is active throughout the period of runtime */
 
 void *mcqProcess(void *threadid) {
        int tid;
@@ -1258,66 +1266,12 @@ void *mcqProcess(void *threadid) {
                /*Initiate connection to remote host and send request */ 
                /* Process Request */
                sendPrefetchReq(mcpilenode, tid);
-               /* TODO: For each object not found query DHT for new location and retrieve the object */
 
                /* Deallocate the machine queue pile node */
                mcdealloc(mcpilenode);
        }
 }
 
-/*This function is called by the thread that processes the 
- * prefetch request makes piles to prefetch records and prefetches the oids from remote machines */
-int transPrefetchProcess(transrecord_t *record, int *arrayofoffset[], short numoids){
-       int i, k = 0, rc;
-       int arraylength[numoids];
-       unsigned int machinenumber;
-       objheader_t *tmp, *objheader;
-       void *objcopy;
-       int size;
-       pthread_attr_t attr;
-
-       /* Given tuple find length of tuple*/
-       for(i = 0; i < numoids ; i++) {
-               arraylength[i] = arrayLength(arrayofoffset[i]);
-       }
-
-       /* Initialize and set thread attributes 
-        * Spawn a thread for each prefetch request sent*/
-       pthread_t thread[numoids];
-       pthread_attr_init(&attr);
-       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
-
-       /* Create  Machine Piles to send prefetch requests use threads*/
-       for( i = 0 ; i< numoids ; i++) {
-               if(arrayofoffset[i][0] == -1) 
-                       continue;
-               else{
-                       /* For each Pile in the machine send TRANS_PREFETCH */
-                       //makePiles(arrayofoffset, numoids);
-                       /* Fill thread data structure */
-                       //rc = pthread_create(&thread[i] , &attr, sendPrefetchReq, (void *) arrayofoffset[i]);
-                       if (rc) {
-                               perror("Error in pthread create at transPrefetchProcess()\n");
-                               return 1;
-                       }
-
-               }
-       }
-
-       /* Free attribute and wait to join other threads */
-       for (i = 0 ;i < numoids ; i++) {
-               rc = pthread_join(thread[i], NULL);
-               if (rc) {
-                       perror("Error pthread_join() in transPrefetchProcess()\n");
-                       return 1;
-               }
-       }
-       pthread_attr_destroy(&attr);
-
-       return 0;
-
-}
-
 void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) {
        int sd, i, offset, off, len, endpair, numoffsets, count = 0;
        struct sockaddr_in serv_addr;
@@ -1384,6 +1338,7 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) {
        /* Get Response from the remote machine */
        getPrefetchResponse(count,sd);
        close(sd);
+       return;
 }
 
 void getPrefetchResponse(int count, int sd) {
@@ -1391,13 +1346,14 @@ void getPrefetchResponse(int count, int sd) {
        unsigned int bufsize,oid;
        char buffer[RECEIVE_BUFFER_SIZE], control;
        char *ptr;
-       void *modptr;
+       void *modptr, *oldptr;
 
        /* Read  prefetch response from the Remote machine */
        if((val = read(sd, &control, sizeof(char))) <= 0) {
                perror("No control response for Prefetch request sent\n");
                return;
        }
+
        if(control == TRANS_PREFETCH_RESPONSE) {
                /*For each oid and offset tuple sent as prefetch request to remote machine*/
                while(i < count) {
@@ -1435,19 +1391,35 @@ void getPrefetchResponse(int count, int sd) {
                                        }
                                        memcpy(modptr, buffer+index, objsize);
                                        index += sizeof(int);
-                                       /* Add pointer and oid to hash table */
-                                       //TODO Do we need a version comparison here??
-                                       prehashInsert(oid, modptr);
+                                       /* Insert the oid and its address into the prefetch hash lookup table */
+                                       /* Do a version comparison if the oid exists */
+                                       if((oldptr = prehashSearch(oid)) != NULL) {
+                                               /* If older version then update with new object ptr */
+                                               if(((objheader_t *)oldptr)->version < ((objheader_t *)modptr)->version) {
+                                                       prehashRemove(oid);
+                                                       prehashInsert(oid, modptr);
+                                               } else if(((objheader_t *)oldptr)->version == ((objheader_t *)modptr)->version) { 
+                                                       /* Add the new object ptr to hash table */
+                                                       prehashInsert(oid, modptr);
+                                               } else { /* Do nothing */
+                                                       ;
+                                               }
+                                       } else {/*If doesn't no match found in hashtable, add the object ptr to hash table*/
+                                               prehashInsert(oid, modptr);
+                                       }
                                        /* Broadcast signal on prefetch cache condition variable */ 
                                        pthread_cond_broadcast(&pflookup.cond);
                                        /* Unlock the Prefetch Cache look up table*/
                                        pthread_mutex_unlock(&pflookup.lock);
                                } else if(buffer[index] == OBJECT_NOT_FOUND) {
                                        /* Increment it to get the object */
-                                       // TODO If object not found, local machine takes inventory
+                                       /* TODO: For each object not found query DHT for new location and retrieve the object */
                                        index += sizeof(char);
                                        memcpy(&oid, buffer + index, sizeof(unsigned int));
                                        index += sizeof(unsigned int);
+                                       /* Throw an error */
+                                       printf("OBJECT NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n");
+                                       exit(-1);
                                } else 
                                        printf("Error in decoding the index value %s, %d\n",__FILE__, __LINE__);
                        }
@@ -1456,4 +1428,5 @@ void getPrefetchResponse(int count, int sd) {
                }
        } else
                printf("Error in receving response for prefetch request %s, %d\n",__FILE__, __LINE__);
+       return;
 }