From a96bba74169327818b5e165fdcac5c8c2876fc43 Mon Sep 17 00:00:00 2001 From: adash Date: Fri, 10 Aug 2007 20:09:13 +0000 Subject: [PATCH] fix minor errors 1.Modify Makefile and test*.c files to reflect new ipaddrs of dw-1 and dw-2 for testing purposes 2. compare versions before inserting objs into prefetch cache 3. When objects not found through a prefetch request, throw error 4. TODO : Implement when to delete old objects from prefetch cache --- Robust/src/Runtime/DSTM/interface/Makefile | 18 +-- Robust/src/Runtime/DSTM/interface/dstm.c | 2 +- .../src/Runtime/DSTM/interface/dstmserver.c | 6 +- Robust/src/Runtime/DSTM/interface/prelookup.c | 3 +- .../src/Runtime/DSTM/interface/testclient.c | 32 +++++- .../src/Runtime/DSTM/interface/testserver.c | 53 +++++++-- Robust/src/Runtime/DSTM/interface/trans.c | 105 +++++++----------- 7 files changed, 127 insertions(+), 92 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/Makefile b/Robust/src/Runtime/DSTM/interface/Makefile index b04fca6d..5ab26a4a 100644 --- a/Robust/src/Runtime/DSTM/interface/Makefile +++ b/Robust/src/Runtime/DSTM/interface/Makefile @@ -1,22 +1,22 @@ -d-3: - gcc -dr -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c +dw-2: + gcc -dr -lpthread -g -o dw-2 trans.c testdw-2.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c demsky: gcc -DDEBUG -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c mcpileq.c machinepile.c -d-4: - gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c +dw-1: + gcc -lpthread -g -o dw-1 trans.c testdw-1.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c all: - gcc -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c + gcc -lpthread -g -o dw-2 trans.c testdw-2.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c gcc -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c mcpileq.c machinepile.c - gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c + gcc -lpthread -g -o dw-1 trans.c testdw-1.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c mac: - gcc -DMAC -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c + gcc -DMAC -lpthread -g -o dw-2 trans.c testdw-2.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c gcc -DMAC -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c queue.c prelookup.c mcpileq.c machinepile.c - gcc -DMAC -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c + gcc -DMAC -lpthread -g -o dw-1 trans.c testdw-1.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c queue.c prelookup.c mcpileq.c machinepile.c clean: - rm -rf d-3 d-4 demsky + rm -rf dw-2 dw-1 demsky diff --git a/Robust/src/Runtime/DSTM/interface/dstm.c b/Robust/src/Runtime/DSTM/interface/dstm.c index 7fbb553b..c586dd75 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.c +++ b/Robust/src/Runtime/DSTM/interface/dstm.c @@ -7,7 +7,7 @@ extern int classsize[]; // Get a new object id unsigned int getNewOID(void) { static int id = 1; - return id++; + return id+=2; } // Get the size of the object for a given type diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 30dd47d0..a1136966 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -263,6 +263,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, printf("Handle Trans Req error %s, %d\n", __FILE__, __LINE__); return 1; } + /* Read new control message from Coordiator */ if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0 ) { perror("Error in receiving control message\n"); @@ -293,7 +294,6 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, return 1; } ptr = NULL; - // return 0; break; case TRANS_COMMIT: @@ -333,8 +333,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, } /* This function increments counters while running a voting decision on all objects involved - * in TRANS_REQUEST */ - + * in TRANS_REQUEST and If a TRANS_DISAGREE sends the response immediately back to the coordinator */ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr, int acceptfd) { int val, i = 0; short version; @@ -445,6 +444,7 @@ int decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int * /* Condition to send TRANS_AGREE */ if(*(v_matchnolock) == fixed->numread + fixed->nummod) { control = TRANS_AGREE; + /* Send control message */ if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) { perror("Error in sending control to Coordinator\n"); return 0; diff --git a/Robust/src/Runtime/DSTM/interface/prelookup.c b/Robust/src/Runtime/DSTM/interface/prelookup.c index 77733dfd..e92d5957 100644 --- a/Robust/src/Runtime/DSTM/interface/prelookup.c +++ b/Robust/src/Runtime/DSTM/interface/prelookup.c @@ -1,4 +1,5 @@ - #include "prelookup.h" +/* LOCK THE ENTIRE HASH TABLE */ +#include "prelookup.h" prehashtable_t pflookup; //Global prefetch cache table diff --git a/Robust/src/Runtime/DSTM/interface/testclient.c b/Robust/src/Runtime/DSTM/interface/testclient.c index 0cf1cf0a..66ad94d8 100644 --- a/Robust/src/Runtime/DSTM/interface/testclient.c +++ b/Robust/src/Runtime/DSTM/interface/testclient.c @@ -7,8 +7,36 @@ #define LISTEN_PORT 2156 extern objstr_t *mainobjstore; - -int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *)}; +typedef struct testobj1 { + int x; + char z; +} testobj1_t; + +typedef struct testobj2 { + char z[10]; + char c; + testobj1_t *y; +} testobj2_t; + +typedef struct testobj3 { + short p; + testobj1_t *q; + testobj2_t *r; +} testobj3_t; + +typedef struct testobj4 { + int b; + void *q; + testobj3_t *a; +} testobj4_t; + +typedef struct testobj5 { + testobj4_t *a; +} testobj5_t; + + +int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *), sizeof(testobj1_t), + sizeof(testobj2_t), sizeof(testobj3_t), sizeof(testobj4_t), sizeof(testobj5_t)}; unsigned int createObjects(transrecord_t *record) { objheader_t *header, *tmp; diff --git a/Robust/src/Runtime/DSTM/interface/testserver.c b/Robust/src/Runtime/DSTM/interface/testserver.c index 85d3b920..84b1abfe 100644 --- a/Robust/src/Runtime/DSTM/interface/testserver.c +++ b/Robust/src/Runtime/DSTM/interface/testserver.c @@ -6,7 +6,39 @@ #include "ip.h" extern objstr_t *mainobjstore; -int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *)}; +typedef struct testobj1 { + int x; + char z; +} testobj1_t; + +typedef struct testobj2 { + char z[10]; + char c; + testobj1_t *y; +} testobj2_t; + +typedef struct testobj3 { + short p; + testobj1_t *q; + testobj2_t *r; +} testobj3_t; + +typedef struct testobj4 { + int b; + void *q; + testobj3_t *a; +} testobj4_t; + +typedef struct testobj5 { + testobj4_t *a; +} testobj5_t; + + +int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *), sizeof(testobj1_t), + sizeof(testobj2_t), sizeof(testobj3_t), sizeof(testobj4_t), sizeof(testobj5_t)}; + + +//int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *)}; int test1(void); int test2(void); @@ -184,7 +216,7 @@ int test3() { lhashInsert(header->oid, mid); //Inserting into lhashtable - mid = iptoMid("128.200.9.29"); //d-3.eecs.uci.edu + mid = iptoMid("128.195.175.70"); //dw-2.eecs.uci.edu lhashInsert(20, mid); lhashInsert(21, mid); lhashInsert(22, mid); @@ -198,9 +230,9 @@ int test3() { //Check if machine dw-1 is up and running checkServer(mid, "128.195.175.69"); - mid = iptoMid("128.200.9.29"); - //Check if machine d-3 is up and running - checkServer(mid, "128.200.9.29"); + mid = iptoMid("128.195.175.70"); + //Check if machine dw-2 is up and running + checkServer(mid, "128.195.175.70"); // Start Transaction myTrans = transStart(); @@ -239,6 +271,7 @@ int test4() { objheader_t *h1, *h2, *h3, *h4;//h1,h2 from local ; h3 from d-1 , h-4 from d-2 dstmInit(); + transInit(); pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); @@ -276,7 +309,7 @@ int test4() { lhashInsert(header->oid, mid); //Inserting into lhashtable - mid = iptoMid("128.200.9.29"); //d-3.eecs.uci.edu + mid = iptoMid("128.195.175.70"); //dw-2.eecs.uci.edu lhashInsert(20, mid); lhashInsert(21, mid); lhashInsert(22, mid); @@ -290,9 +323,9 @@ int test4() { pthread_create(&thread_Listen, &attr, dstmListen, NULL); //Check if machine dw-1 is up and running checkServer(mid, "128.195.175.69"); - mid = iptoMid("128.200.9.29"); - //Check if machine d-3 is up and running - checkServer(mid, "128.200.9.29"); + mid = iptoMid("128.195.175.70"); + //Check if machine dw-2 is up and running + checkServer(mid, "128.195.175.70"); // Start Transaction myTrans = transStart(); @@ -310,7 +343,7 @@ int test4() { if((h3 = transRead(myTrans, 31)) == NULL) { printf("Object not found\n"); } - //read object 21(present in d-3 machine) + //read object 21(present in dw-2 machine) if((h4 = transRead(myTrans, 21)) == NULL) { printf("Object not found\n"); } diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 31d86115..aaa16ef4 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -99,6 +99,7 @@ void transInit() { //Create the primary prefetch thread pthread_create(&tPrefetch, NULL, transPrefetch, NULL); //Create and Initialize a pool of threads + /* Threads are active for the entire period runtime is running */ for(t = 0; t< NUM_THREADS; t++) { rc = pthread_create(&wthreads[t], NULL, mcqProcess, (void *)t); if (rc) { @@ -106,7 +107,6 @@ void transInit() { return; } } - //TODO when to deletethreads } /* This function stops the threads spawned */ @@ -145,8 +145,8 @@ transrecord_t *transStart() /* This function finds the location of the objects involved in a transaction * and returns the pointer to the object if found in a remote location */ -objheader_t *transRead(transrecord_t *record, unsigned int oid) -{ +objheader_t *transRead(transrecord_t *record, unsigned int oid) { + printf("Inside transaction read call\n"); unsigned int machinenumber; objheader_t *tmp, *objheader; void *objcopy; @@ -162,9 +162,11 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) /* Search local transaction cache */ if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){ + printf("Inside transaction cache \n"); return(objheader); } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) { /* Look up in machine lookup table and copy into cache*/ + printf("Inside mainobject store \n"); tmp = mhashSearch(oid); size = sizeof(objheader_t)+classsize[TYPE(tmp)]; objcopy = objstrAlloc(record->cache, size); @@ -173,6 +175,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) chashInsert(record->lookupTable, OID(objheader), objcopy); return(objcopy); } else if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { /* Look up in prefetch cache */ + printf("Inside prefetch cache \n"); found = 1; size = sizeof(objheader_t)+classsize[TYPE(tmp)]; objcopy = objstrAlloc(record->cache, size); @@ -181,6 +184,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) chashInsert(record->lookupTable, OID(tmp), objcopy); return(objcopy); } else { /* If not found anywhere, then block until object appears in prefetch cache */ + printf("Inside remote machine\n"); pthread_mutex_lock(&pflookup.lock); while(!found) { rc = pthread_cond_timedwait(&pflookup.cond, &pflookup.lock, &ts); @@ -300,6 +304,7 @@ int transCommit(transrecord_t *record) { char treplyctrl = 0, treplyretry = 0; /* keeps track of the common response that needs to be sent */ char localstat = 0; + /* Look through all the objects in the transaction record and make piles * for each machine involved in the transaction*/ pile = createPiles(record); @@ -417,7 +422,6 @@ int transCommit(transrecord_t *record) { if(treplyretry == 1) { /* wait a random amount of time */ randomdelay(); - //sleep(1); /* Retry the commiting transaction again */ transCommit(record); } @@ -572,6 +576,7 @@ int decideResponse(thread_data_array_t *tdata) { /* Send Abort */ *(tdata->replyctrl) = TRANS_ABORT; printf("DEBUG-> trans.c Sending TRANS_ABORT\n"); + /* Free resources */ objstrDelete(tdata->rec->cache); chashDelete(tdata->rec->lookupTable); free(tdata->rec); @@ -579,6 +584,7 @@ int decideResponse(thread_data_array_t *tdata) { /* Send Commit */ *(tdata->replyctrl) = TRANS_COMMIT; printf("DEBUG-> trans.c Sending TRANS_COMMIT\n"); + /* Free resources */ objstrDelete(tdata->rec->cache); chashDelete(tdata->rec->lookupTable); free(tdata->rec); @@ -594,7 +600,7 @@ int decideResponse(thread_data_array_t *tdata) { return 0; } -/* This function sends the final response to all threads in their respective socket id */ +/* This function sends the final response to remote machines per thread in their respective socket id */ char sendResponse(thread_data_array_t *tdata, int sd) { int n, N, sum, oidcount = 0; char *ptr, retval = 0; @@ -1232,8 +1238,10 @@ void *transPrefetch(void *t) { } } -/*The pool of threads work on this function to establish connection with - * remote machines */ +/* Each thread in the pool of threads calls this function to establish connection with + * remote machines, send the prefetch requests and process the reponses from + * the remote machines . + * The thread is active throughout the period of runtime */ void *mcqProcess(void *threadid) { int tid; @@ -1258,66 +1266,12 @@ void *mcqProcess(void *threadid) { /*Initiate connection to remote host and send request */ /* Process Request */ sendPrefetchReq(mcpilenode, tid); - /* TODO: For each object not found query DHT for new location and retrieve the object */ /* Deallocate the machine queue pile node */ mcdealloc(mcpilenode); } } -/*This function is called by the thread that processes the - * prefetch request makes piles to prefetch records and prefetches the oids from remote machines */ -int transPrefetchProcess(transrecord_t *record, int *arrayofoffset[], short numoids){ - int i, k = 0, rc; - int arraylength[numoids]; - unsigned int machinenumber; - objheader_t *tmp, *objheader; - void *objcopy; - int size; - pthread_attr_t attr; - - /* Given tuple find length of tuple*/ - for(i = 0; i < numoids ; i++) { - arraylength[i] = arrayLength(arrayofoffset[i]); - } - - /* Initialize and set thread attributes - * Spawn a thread for each prefetch request sent*/ - pthread_t thread[numoids]; - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); - - /* Create Machine Piles to send prefetch requests use threads*/ - for( i = 0 ; i< numoids ; i++) { - if(arrayofoffset[i][0] == -1) - continue; - else{ - /* For each Pile in the machine send TRANS_PREFETCH */ - //makePiles(arrayofoffset, numoids); - /* Fill thread data structure */ - //rc = pthread_create(&thread[i] , &attr, sendPrefetchReq, (void *) arrayofoffset[i]); - if (rc) { - perror("Error in pthread create at transPrefetchProcess()\n"); - return 1; - } - - } - } - - /* Free attribute and wait to join other threads */ - for (i = 0 ;i < numoids ; i++) { - rc = pthread_join(thread[i], NULL); - if (rc) { - perror("Error pthread_join() in transPrefetchProcess()\n"); - return 1; - } - } - pthread_attr_destroy(&attr); - - return 0; - -} - void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) { int sd, i, offset, off, len, endpair, numoffsets, count = 0; struct sockaddr_in serv_addr; @@ -1384,6 +1338,7 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) { /* Get Response from the remote machine */ getPrefetchResponse(count,sd); close(sd); + return; } void getPrefetchResponse(int count, int sd) { @@ -1391,13 +1346,14 @@ void getPrefetchResponse(int count, int sd) { unsigned int bufsize,oid; char buffer[RECEIVE_BUFFER_SIZE], control; char *ptr; - void *modptr; + void *modptr, *oldptr; /* Read prefetch response from the Remote machine */ if((val = read(sd, &control, sizeof(char))) <= 0) { perror("No control response for Prefetch request sent\n"); return; } + if(control == TRANS_PREFETCH_RESPONSE) { /*For each oid and offset tuple sent as prefetch request to remote machine*/ while(i < count) { @@ -1435,19 +1391,35 @@ void getPrefetchResponse(int count, int sd) { } memcpy(modptr, buffer+index, objsize); index += sizeof(int); - /* Add pointer and oid to hash table */ - //TODO Do we need a version comparison here?? - prehashInsert(oid, modptr); + /* Insert the oid and its address into the prefetch hash lookup table */ + /* Do a version comparison if the oid exists */ + if((oldptr = prehashSearch(oid)) != NULL) { + /* If older version then update with new object ptr */ + if(((objheader_t *)oldptr)->version < ((objheader_t *)modptr)->version) { + prehashRemove(oid); + prehashInsert(oid, modptr); + } else if(((objheader_t *)oldptr)->version == ((objheader_t *)modptr)->version) { + /* Add the new object ptr to hash table */ + prehashInsert(oid, modptr); + } else { /* Do nothing */ + ; + } + } else {/*If doesn't no match found in hashtable, add the object ptr to hash table*/ + prehashInsert(oid, modptr); + } /* Broadcast signal on prefetch cache condition variable */ pthread_cond_broadcast(&pflookup.cond); /* Unlock the Prefetch Cache look up table*/ pthread_mutex_unlock(&pflookup.lock); } else if(buffer[index] == OBJECT_NOT_FOUND) { /* Increment it to get the object */ - // TODO If object not found, local machine takes inventory + /* TODO: For each object not found query DHT for new location and retrieve the object */ index += sizeof(char); memcpy(&oid, buffer + index, sizeof(unsigned int)); index += sizeof(unsigned int); + /* Throw an error */ + printf("OBJECT NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n"); + exit(-1); } else printf("Error in decoding the index value %s, %d\n",__FILE__, __LINE__); } @@ -1456,4 +1428,5 @@ void getPrefetchResponse(int count, int sd) { } } else printf("Error in receving response for prefetch request %s, %d\n",__FILE__, __LINE__); + return; } -- 2.34.1