From b2aa4286156f424155a16ca2d94f42ddefd94e57 Mon Sep 17 00:00:00 2001 From: erubow Date: Wed, 12 Sep 2007 02:03:07 +0000 Subject: [PATCH] Added code to read config file of IP address, and use these for the mostly fair distribution of the OID space. Updated llookup to use this static partitioning for now. --- Robust/src/Runtime/DSTM/interface/dstm.c | 6 - Robust/src/Runtime/DSTM/interface/dstm.h | 2 + .../src/Runtime/DSTM/interface/dstmserver.c | 7 +- Robust/src/Runtime/DSTM/interface/llookup.c | 33 +++++ Robust/src/Runtime/DSTM/interface/llookup.h | 20 ++- Robust/src/Runtime/DSTM/interface/trans.c | 121 +++++++++++++++++- 6 files changed, 171 insertions(+), 18 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/dstm.c b/Robust/src/Runtime/DSTM/interface/dstm.c index c586dd75..e404925e 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.c +++ b/Robust/src/Runtime/DSTM/interface/dstm.c @@ -4,12 +4,6 @@ extern int classsize[]; /* BEGIN object header */ -// Get a new object id -unsigned int getNewOID(void) { - static int id = 1; - return id+=2; -} - // Get the size of the object for a given type unsigned int objSize(objheader_t *object) { return classsize[TYPE(object)]; diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index ffa30361..59de2485 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -206,6 +206,8 @@ int transCommitProcess(void *, unsigned int *, unsigned int *, int, int, int); int dstmStartup(const char *); void transInit(); +int processConfigFile(); +void addHost(unsigned int); void randomdelay(void); transrecord_t *transStart(); diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 8aa6e35f..dff96e32 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -105,7 +105,6 @@ void *dstmAccept(void *acceptfd) int fd_flags = fcntl((int)acceptfd, F_GETFD), size; - printf("Recieved connection: fd = %d\n", (int)acceptfd); /* Receive control messages from other machines */ if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) { if (retval == 0) { @@ -182,9 +181,9 @@ void *dstmAccept(void *acceptfd) retval); else { //TODO: execute run method on this global thread object - printf("dstmAccept(): received START_REMOTE_THREAD msg, oid=%d\n", oid); + printf("dstmAccept(): received START_REMOTE_THREAD msg, oid=0x%x\n", oid); objType = getObjType(oid); - printf("dstmAccept(): type of object %d is %d\n", oid, objType); + printf("dstmAccept(): type of object 0x%x is %d\n", oid, objType); } break; @@ -195,8 +194,6 @@ void *dstmAccept(void *acceptfd) /* Close connection */ if (close((int)acceptfd) == -1) perror("close"); - else - printf("Closed connection: fd = %d\n", (int)acceptfd); pthread_exit(NULL); } diff --git a/Robust/src/Runtime/DSTM/interface/llookup.c b/Robust/src/Runtime/DSTM/interface/llookup.c index cc989a28..30760978 100644 --- a/Robust/src/Runtime/DSTM/interface/llookup.c +++ b/Robust/src/Runtime/DSTM/interface/llookup.c @@ -9,6 +9,36 @@ ***************************************************************************************************/ #include "llookup.h" +#ifdef SIMPLE_LLOOKUP + +extern unsigned int *hostIpAddrs; +extern unsigned int oidsPerBlock; + +unsigned int lhashCreate(unsigned int size, float loadfactor) +{ + return 0; +} + +unsigned int lhashInsert(unsigned int oid, unsigned int mid) +{ + return 0; +} + +unsigned int lhashSearch(unsigned int oid) +{ + if (oidsPerBlock == 0) + return hostIpAddrs[0]; + else + return hostIpAddrs[oid / oidsPerBlock]; +} + +unsigned int lhashRemove(unsigned int oid) +{ + return 0; +} + +#else + lhashtable_t llookup; //Global Hash table // Creates a hash table with size and an array of lhashlistnode_t @@ -193,3 +223,6 @@ unsigned int lhashResize(unsigned int newsize) { free(ptr); //Free the memory of the old hash table return 0; } + +#endif + diff --git a/Robust/src/Runtime/DSTM/interface/llookup.h b/Robust/src/Runtime/DSTM/interface/llookup.h index 7a70b9bd..20dbac72 100644 --- a/Robust/src/Runtime/DSTM/interface/llookup.h +++ b/Robust/src/Runtime/DSTM/interface/llookup.h @@ -5,6 +5,8 @@ #include #include +#define SIMPLE_LLOOKUP + #define LOADFACTOR 0.75 #define HASH_SIZE 100 @@ -22,11 +24,17 @@ typedef struct lhashtable { pthread_mutex_t locktable; } lhashtable_t; -unsigned int lhashCreate(unsigned int size, float loadfactor);// returns 0 for success and 0 for failure -unsigned int lhashFunction(unsigned int oid); // returns 0 for success and 0 for failure -unsigned int lhashInsert(unsigned int oid, unsigned int mid); // returns 0 for success and 0 for failure -unsigned int lhashSearch(unsigned int oid); //returns mid, 0 if not found -unsigned int lhashRemove(unsigned int oid); //returns 0 if not success -unsigned int lhashResize(unsigned int newsize); // returns 0 for success and 0 for failure +//returns 0 for success and 1 for failure +unsigned int lhashCreate(unsigned int size, float loadfactor); +//returns 0 for success and 1 for failure +unsigned int lhashInsert(unsigned int oid, unsigned int mid); +//returns mid, 0 if not found +unsigned int lhashSearch(unsigned int oid); +//returns 0 for success and 1 for failure +unsigned int lhashRemove(unsigned int oid); + +//helper functions +unsigned int lhashResize(unsigned int newsize); +unsigned int lhashFunction(unsigned int oid); #endif diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index c5f3500a..487b3cb3 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -23,6 +23,7 @@ #define RECEIVE_BUFFER_SIZE 2048 #define NUM_THREADS 10 #define PREFETCH_CACHE_SIZE 1048576 //1MB +#define CONFIG_FILENAME "dstm.conf" /* Global Variables */ extern int classsize[]; @@ -34,6 +35,13 @@ pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch qu pthread_t tPrefetch; extern objstr_t *mainobjstore; unsigned int myIpAddr; +unsigned int *hostIpAddrs; +int sizeOfHostArray; +int numHostsInSystem; +int myIndexInHostArray; +unsigned int oidsPerBlock; +unsigned int oidMin; +unsigned int oidMax; plistnode_t *createPiles(transrecord_t *); inline int arrayLength(int *array) { @@ -87,7 +95,8 @@ int dstmStartup(const char * option) { pthread_attr_t attr; int master=strcmp(option, "master")==0; - myIpAddr = getMyIpAddr("eth0"); + if (processConfigFile() != 0) + return 0; //TODO: return error value, cause main program to exit dstmInit(); transInit(); @@ -1558,3 +1567,113 @@ int startRemoteThread(unsigned int oid, unsigned int mid) return status; } +//TODO: when reusing oids, make sure they are not already in use! +unsigned int getNewOID(void) { + static unsigned int id = 0xFFFFFFFF; + + id += 2; + if (id > oidMax || id < oidMin) + { + id = (oidMin | 1); + } + return id; +} + +int processConfigFile() +{ + FILE *configFile; + const int maxLineLength = 200; + char lineBuffer[maxLineLength]; + char *token; + const char *delimiters = " \t\n"; + char *commentBegin; + in_addr_t tmpAddr; + + configFile = fopen(CONFIG_FILENAME, "r"); + if (configFile == NULL) + { + printf("error opening %s:\n", CONFIG_FILENAME); + perror(""); + return -1; + } + + numHostsInSystem = 0; + sizeOfHostArray = 8; + hostIpAddrs = calloc(sizeOfHostArray, sizeof(unsigned int)); + + while(fgets(lineBuffer, maxLineLength, configFile) != NULL) + { + commentBegin = strchr(lineBuffer, '#'); + if (commentBegin != NULL) + *commentBegin = '\0'; + token = strtok(lineBuffer, delimiters); + while (token != NULL) + { + tmpAddr = inet_addr(token); + if ((int)tmpAddr == -1) + { + printf("error in %s: bad token:%s\n", CONFIG_FILENAME, token); + fclose(configFile); + return -1; + } + else + addHost(htonl(tmpAddr)); + token = strtok(NULL, delimiters); + } + } + + fclose(configFile); + + if (numHostsInSystem < 1) + { + printf("error in %s: no IP Adresses found\n", CONFIG_FILENAME); + return -1; + } + myIpAddr = getMyIpAddr("eth0"); + myIndexInHostArray = findHost(myIpAddr); + if (myIndexInHostArray == -1) + { + printf("error in %s: IP Address of eth0 not found\n", CONFIG_FILENAME); + return -1; + } + oidsPerBlock = (0xFFFFFFFF / numHostsInSystem) + 1; + oidMin = oidsPerBlock * myIndexInHostArray; + if (myIndexInHostArray == numHostsInSystem - 1) + oidMax = 0xFFFFFFFF; + else + oidMax = oidsPerBlock * (myIndexInHostArray + 1) - 1; + + return 0; +} + +void addHost(unsigned int hostIp) +{ + unsigned int *tmpArray; + + if (findHost(hostIp) != -1) + return; + + if (numHostsInSystem == sizeOfHostArray) + { + tmpArray = calloc(sizeOfHostArray * 2, sizeof(unsigned int)); + memcpy(tmpArray, hostIpAddrs, sizeof(unsigned int) * numHostsInSystem); + free(hostIpAddrs); + hostIpAddrs = tmpArray; + } + + hostIpAddrs[numHostsInSystem++] = hostIp; + + return; +} + +int findHost(unsigned int hostIp) +{ + int i; + for (i = 0; i < numHostsInSystem; i++) + if (hostIpAddrs[i] == hostIp) + return i; + + //not found + return -1; +} + -- 2.34.1