From: erubow Date: Sat, 4 Aug 2007 00:40:11 +0000 (+0000) Subject: DHT: Somewhat functional. Added a function to mlookup allowing the dht to retrieve... X-Git-Tag: preEdgeChange~502 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=bfeb7d3a7af4da97c883850ab403d1483f73cac9;p=IRC.git DHT: Somewhat functional. Added a function to mlookup allowing the dht to retrieve the locally stored oids for the purpose of rebuilding (not tested yet!). Rebuilds are pretty slow when hosts disappear, so some fine tuning is needed. --- diff --git a/Robust/src/Runtime/DSTM/interface/clookup.c b/Robust/src/Runtime/DSTM/interface/clookup.c index aa014698..ca7d34c9 100644 --- a/Robust/src/Runtime/DSTM/interface/clookup.c +++ b/Robust/src/Runtime/DSTM/interface/clookup.c @@ -57,7 +57,7 @@ unsigned int chashInsert(chashtable_t *table, unsigned int key, void *val) { return 1; } node->key = key; - node->val = val ; + node->val = val; node->next = ptr[index].next; ptr[index].next = node; } diff --git a/Robust/src/Runtime/DSTM/interface/dht.c b/Robust/src/Runtime/DSTM/interface/dht.c index 2b956665..e0ef4445 100644 --- a/Robust/src/Runtime/DSTM/interface/dht.c +++ b/Robust/src/Runtime/DSTM/interface/dht.c @@ -1,46 +1,24 @@ -#include "dht.h" - -#ifdef SIMPLE_DHT - -#include - -#define NUM_HOSTS 4 -#define OIDS_PER_HOST 0x40000000 - -//set these to your IP addresses -unsigned int hosts[NUM_HOSTS] = { - 0xc0a802c8, - 0xc0a802c9, - 0xc0a802ca, - 0xc0a802cb, -}; - -//does nothing -void dhtInit(unsigned int maxKeyCapaciy) -{ return;} - -//does nothing -void dhtExit() -{ return;} - -//does nothing, returns 0 -int dhtInsert(unsigned int key, unsigned int val) -{ return 0;} - -//does nothing, returns 0 -int dhtRemove(unsigned int key) -{ return 0;} - -//returns 0 if successful and copies val into *val, -// 1 if key not found, -1 if an error occurred -int dhtSearch(unsigned int key, unsigned int *val) -{ - *val = hosts[key / OIDS_PER_HOST]; - return 0; -} - -#else - +/******************************************************************************* +* dht.c +* +* High-performance Distributed Hash Table for finding the location of objects +* in a Distributed Shared Transactional Memory system. +* +* Creator: Erik Rubow +* +* TODO: +* 1) Instead of having dhtInsertMult, dhtSearchMult, etc. call their single-key +* counterparts repeatedly, define some new messages to handle it more +* efficiently. +* 2) Improve the efficiency of functions that work with hostArray, hostReplied, +* and blockOwnerArray. +* 3) Currently a join or leave causes a rebuild of the entire hash table. +* Implement more graceful join and leave procedures. +* 4) Fine tune timeout values for performance, possibly implement a backoff +* algorithm to prevent overloading the network. +* 5) Whatever else I'm forgetting +* +*******************************************************************************/ /******************************************************************************* * Includes *******************************************************************************/ @@ -62,120 +40,125 @@ int dhtSearch(unsigned int key, unsigned int *val) #include #include #include +#include +#include "dht.h" #include "clookup.h" //this works for now, do we need anything better? +#include "mlookup.h" /******************************************************************************* * Local Defines, Structs *******************************************************************************/ -#define BUFFER_SIZE 512 //maximum message size +#define MAX_MSG_SIZE 1500 #define UDP_PORT 2157 -#define TCP_PORT 2157 -#define BACKLOG 10 //max pending tcp connections -#define TIMEOUT_MS 500 -#define MAX_RETRIES 3 -#define INIT_HOST_ALLOC 1 -#define INIT_BLOCK_NUM 1 +#define INIT_HOST_ALLOC 3 +#define INIT_NUM_BLOCKS 16 #define DEFAULT_INTERFACE "eth0" -#define DHT_LOG "dht.log" - -//make sure this is consistent with enum below -#define NUM_MSG_TYPES 20 - +#define TIMEOUT_PERIOD 100 +#define INSERT_TIMEOUT_MS 500 +#define INSERT_RETRIES 50 +#define REMOVE_TIMEOUT_MS 500 +#define REMOVE_RETRIES 50 +#define SEARCH_TIMEOUT_MS 500 +#define SEARCH_RETRIES 50 + +//message types //make sure this matches msg_types global var -enum { +enum +{ INSERT_CMD, INSERT_RES, REMOVE_CMD, REMOVE_RES, SEARCH_CMD, SEARCH_RES, - FIND_LEADER_REQ, - FIND_LEADER_RES, - REBUILD_REQ, - REBUILD_RES, - NOT_LEADER, - REBUILD_CMD, + WHO_IS_LEADER_CMD, + WHO_IS_LEADER_RES, JOIN_REQ, JOIN_RES, - GET_DHT_INFO_CMD, - DHT_INFO_REQ, - DHT_INFO_RES, + LEAVE_REQ, + LEAVE_RES, + DHT_UPDATE_CMD, + DHT_UPDATE_RES, + ELECT_LEADER_CMD, + ELECT_LEADER_RES, + CONGRATS_CMD, + REBUILD_REQ, + REBUILD_CMD, FILL_DHT_CMD, FILL_DHT_RES, - REBUILD_DONE_INFO + RESUME_NORMAL_CMD, + RESUME_NORMAL_RES, + NUM_MSG_TYPES }; -//status codes -enum { - INSERT_OK, - INSERT_ERROR, - REMOVE_OK, - REMOVE_ERROR, - KEY_FOUND, - KEY_NOT_FOUND, - NOT_KEY_OWNER, -}; - -enum { +//states +//make sure this matches state_names, timeout_vals, and retry_vals global vars +enum +{ + INIT1_STATE, + INIT2_STATE, NORMAL_STATE, + LEAD_NORMAL1_STATE, + LEAD_NORMAL2_STATE, + ELECT1_STATE, + ELECT2_STATE, + REBUILD0_STATE, REBUILD1_STATE, REBUILD2_STATE, REBUILD3_STATE, - LEAD_NORMAL_STATE, + REBUILD4_STATE, + REBUILD5_STATE, LEAD_REBUILD1_STATE, LEAD_REBUILD2_STATE, - LEAD_REBUILD3_STATE -}; - -struct hostData { - unsigned int ipAddr; - unsigned int maxKeyCapacity; -}; - -struct insertCmd { - unsigned int msgType:8; - unsigned int unused:24; - unsigned int key; - unsigned int val; -}; - -struct removeCmd { - unsigned int msgType:8; - unsigned int unused:24; - unsigned int key; -}; - -struct searchCmd { - unsigned int msgType:8; - unsigned int unused:24; - unsigned int key; + LEAD_REBUILD3_STATE, + LEAD_REBUILD4_STATE, + EXIT1_STATE, + EXIT2_STATE, + NUM_STATES }; -struct insertRes { - unsigned int msgType:8; - unsigned int unused:24; - unsigned int status; +//status codes +enum +{ + OPERATION_OK, + KEY_NOT_FOUND, + NOT_KEY_OWNER, + NOT_LEADER, + INTERNAL_ERROR }; -struct removeRes { - unsigned int msgType:8; - unsigned int unused:24; - unsigned int status; +struct hostData +{ + unsigned int ipAddr; + unsigned int maxKeyCapacity; }; -struct searchRes { - unsigned int msgType:8; - unsigned int unused:24; - unsigned int status; - unsigned int val; -}; +/******************************************************************************* +* Local Function Prototypes +*******************************************************************************/ -struct joinReq { - unsigned int msgType:8; - unsigned int unused:24; - struct hostData newHostData; -}; +int msgSizeOk(unsigned char *msg, unsigned int size); +unsigned short read2(unsigned char *msg); +unsigned int read4(unsigned char *msg); +void write2(unsigned char *ptr, unsigned short tmp); +void write4(unsigned char *ptr, unsigned int tmp); +unsigned int getMyIpAddr(const char *interfaceStr); +int udpSend(unsigned char *msg, unsigned int size, unsigned int destIp); +int udpSendAll(unsigned char *msg, unsigned int size); +unsigned int hash(unsigned int x); +unsigned int getKeyOwner(unsigned int key); +void setState(unsigned int newState); +void makeAssignments(); +int addHost(struct hostData newHost); +int removeHost(unsigned int ipAddr); +void removeUnresponsiveHosts(); +int checkReplied(unsigned int ipAddr); +int allReplied(); +void writeHostList(); +void dhtLog(const char *format, ...); +void *fillTask(); +void *udpListen(); /******************************************************************************* * Global Variables @@ -190,751 +173,578 @@ const char *msg_types[NUM_MSG_TYPES] = "REMOVE_RES", "SEARCH_CMD", "SEARCH_RES", - "FIND_LEADER_REQ", - "FIND_LEADER_RES", - "REBUILD_REQ", - "REBUILD_RES", - "NOT_LEADER", - "REBUILD_CMD", + "WHO_IS_LEADER_CMD", + "WHO_IS_LEADER_RES", "JOIN_REQ", "JOIN_RES", - "GET_DHT_INFO_CMD", - "DHT_INFO_REQ", - "DHT_INFO_RES", + "LEAVE_REQ", + "LEAVE_RES", + "DHT_UPDATE_CMD", + "DHT_UPDATE_RES", + "ELECT_LEADER_CMD", + "ELECT_LEADER_RES", + "CONGRATS_CMD", + "REBUILD_REQ", + "REBUILD_CMD", "FILL_DHT_CMD", "FILL_DHT_RES", - "REBUILD_DONE_INFO" + "RESUME_NORMAL_CMD", + "RESUME_NORMAL_RES" +}; + +const char *state_names[NUM_STATES] = +{ + "INIT1_STATE", + "INIT2_STATE", + "NORMAL_STATE", + "LEAD_NORMAL1_STATE", + "LEAD_NORMAL2_STATE", + "ELECT1_STATE", + "ELECT2_STATE", + "REBUILD0_STATE", + "REBUILD1_STATE", + "REBUILD2_STATE", + "REBUILD3_STATE", + "REBUILD4_STATE", + "REBUILD5_STATE", + "LEAD_REBUILD1_STATE", + "LEAD_REBUILD2_STATE", + "LEAD_REBUILD3_STATE", + "LEAD_REBUILD4_STATE", + "EXIT1_STATE", + "EXIT2_STATE", +}; + +//note: { 0, 0 } means no timeout +struct timeval timeout_vals[NUM_STATES] = +{ + { 0, 500000 }, //INIT1_STATE + { 0, 500000 }, //INIT2_STATE + { 0, 0 }, //NORMAL_STATE + { 0, 0 }, //LEAD_NORMAL1_STATE + { 3, 0 }, //LEAD_NORMAL2_STATE + { 1, 0 }, //ELECT1_STATE + { 1, 0 }, //ELECT2_STATE + { 0, 500000 }, //REBUILD0_STATE + { 0, 500000 }, //REBUILD1_STATE + { 10, 0 }, //REBUILD2_STATE + { 10, 0 }, //REBUILD3_STATE + { 10, 0 }, //REBUILD4_STATE + { 1, 0 }, //REBUILD5_STATE + { 1, 0 }, //LEAD_REBUILD1_STATE + { 1, 0 }, //LEAD_REBUILD2_STATE + { 10, 0 }, //LEAD_REBUILD3_STATE + { 10, 0 }, //LEAD_REBUILD4_STATE + { 0, 500000 }, //EXIT1_STATE + { 0, 0 } //EXIT2_STATE +}; + +int retry_vals[NUM_STATES] = +{ + 100, //INIT1_STATE + 10, //INIT2_STATE + 0, //NORMAL_STATE + 0, //LEAD_NORMAL1_STATE + 0, //LEAD_NORMAL2_STATE + 10, //ELECT1_STATE + 10, //ELECT2_STATE + 10, //REBUILD0_STATE + 10, //REBUILD1_STATE + 0, //REBUILD2_STATE + 0, //REBUILD3_STATE + 0, //REBUILD4_STATE + 10, //REBUILD5_STATE + 10, //LEAD_REBUILD1_STATE + 10, //LEAD_REBUILD2_STATE + 10, //LEAD_REBUILD3_STATE + 10, //LEAD_REBUILD4_STATE + 10, //EXIT1_STATE + 0 //EXIT2_STATE }; FILE *logfile; -//ip address of leader -unsigned int leader; -//set by dhtInit() struct hostData myHostData; -//number of hosts in the system -unsigned int numHosts; -//ip address and max key capacity of each host -struct hostData *hostArray; -//memory allocated for this many items in hostArray -unsigned int hostArraySize; -//number of keyspace divisions, preferably a power of 2 > numHosts -unsigned int numBlocks; -//this array has numBlocks elements, each of which contains an index to hostArray -// the key owner is found by hashing the key into one of these blocks and using this -// array to find the corresponding host in hostArray -unsigned int *blockOwnerArray; -//used by leader to track which hosts have responded, etc. -unsigned int *hostRebuildStates; -//thread handles pthread_t threadUdpListen; -pthread_t threadTcpListen; -//server sockets -struct pollfd udpServerPollSock; -int tcpListenSock; -//see above for enumeration of states -int state; - -/******************************************************************************* -* Local Function Prototypes -*******************************************************************************/ - -//log funtion, use like printf() -void dhtLog(const char *format, ...); -//return my IP address -unsigned int getMyIpAddr(); -//sends broadcast to discover leader -unsigned int findLeader(); -//UDP server -void *udpListen(); -//TCP server -void *tcpListen(); -//TCP connection handler -void *tcpAccept(void *); -//returns number of bytes received in resBuffer, or -1 if an error occurred -int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, - void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize, - unsigned int timeout, unsigned int numRetries); -//returns number of bytes received in resBuffer, or -1 if an error occurred -int udpBroadcastWaitForResponse(unsigned int *reply_ip, - unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer, - unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries); -//just UDP it -int sendNoWait(unsigned int dest_ip, unsigned short dest_port, void *msg, - unsigned int msglen); -//right now this hashes the key into a block and returns the block owner -unsigned int getKeyOwner(unsigned int key); -//simple hash -unsigned int hash(unsigned int x); -//sends REBUILD_REQ to leader, retries until leader responds, or causes new leader to be chosen -void initRebuild(); -//adds entry to end of hostArray, increments numHosts, -// allocates more space if necessary -void addHost(struct hostData newHost); -//initiates TCP connection with leader, gets DHT data -int getDHTdata(); -//outputs readable DHT data to outfile -void writeDHTdata(FILE *outfile); -void clearDHTdata(); -void initDHTdata(); -void makeAssignments(); -//returns not-zero if ok, zero if not ok -int msgSizeOk(unsigned char type, unsigned int size); +pthread_t threadFillTask; +//status of fillTask: 0 = ready to run, 1 = running, 2 = completed, 3 = error +int fillStatus; +struct pollfd udpPollSock; +unsigned int state; +unsigned int seed; +unsigned int leader; +unsigned int electionOriginator; +unsigned int electionParent; +unsigned int hostArraySize = 0; +struct hostData *hostArray = NULL; +unsigned int numBlocks = 0; +unsigned short *blockOwnerArray = NULL; +unsigned char *hostReplied = NULL; +pthread_mutex_t stateMutex; +pthread_cond_t stateCond; +chashtable_t *myHashTable; +unsigned int numHosts; +struct timeval timer; +int timerSet; +int timeoutCntr; /******************************************************************************* -* Global Function Definitions +* Interface Function Definitions *******************************************************************************/ -void dhtInit(unsigned int maxKeyCapacity) +void dhtInit(unsigned int seedIpAddr, unsigned int maxKeyCapacity) { - unsigned int myMessage; - int bytesReceived; - int i; - int ret; + struct in_addr tmpAddr; + char filename[23] = "dht-"; + struct sockaddr_in myAddr; + struct sockaddr_in seedAddr; + socklen_t socklen = sizeof(struct sockaddr_in); + char initMsg; - logfile = fopen(DHT_LOG, "w"); - dhtLog("dhtInit() - initializing...\n"); + tmpAddr.s_addr = htonl(getMyIpAddr(DEFAULT_INTERFACE)); + strcat(filename, inet_ntoa(tmpAddr)); + strcat(filename, ".log"); + printf("log file: %s\n", filename); - myHostData.ipAddr = getMyIpAddr(); + logfile = fopen(filename, "w"); + dhtLog("dhtInit(): inializing...\n"); + + myHostData.ipAddr = getMyIpAddr(DEFAULT_INTERFACE); myHostData.maxKeyCapacity = maxKeyCapacity; - numHosts = numBlocks = hostArraySize = 0; - hostArray = NULL; - blockOwnerArray = NULL; - hostRebuildStates = NULL; + seed = seedIpAddr; + leader = 0; + electionOriginator = 0; + electionParent = 0; + hostArraySize = INIT_HOST_ALLOC; + hostArray = calloc(hostArraySize, sizeof(struct hostData)); + hostReplied = calloc(hostArraySize, sizeof(unsigned char)); + hostArray[0] = myHostData; + numHosts = 1; + numBlocks = INIT_NUM_BLOCKS; + blockOwnerArray = calloc(numBlocks, sizeof(unsigned short)); + pthread_mutex_init(&stateMutex, NULL); + pthread_cond_init(&stateCond, NULL); + myHashTable = chashCreate(HASH_SIZE, LOADFACTOR); + + udpPollSock.fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + if (udpPollSock.fd < 0) + perror("dhtInit():socket()"); + + udpPollSock.events = POLLIN; + + bzero(&myAddr, socklen); + myAddr.sin_family = AF_INET; + myAddr.sin_addr.s_addr = htonl(INADDR_ANY); + myAddr.sin_port = htons(UDP_PORT); - state = NORMAL_STATE; + if (bind(udpPollSock.fd, (struct sockaddr *)&myAddr, socklen) < 0) + perror("dhtInit():bind()"); - pthread_create(&threadUdpListen, NULL, udpListen, NULL); - pthread_create(&threadTcpListen, NULL, tcpListen, NULL); + if (seed == 0) + { + dhtLog("I am the leader\n"); + leader = myHostData.ipAddr; + setState(LEAD_NORMAL1_STATE); + } + else + { + initMsg = WHO_IS_LEADER_CMD; + udpSend(&initMsg, 1, seed); + setState(INIT1_STATE); + } - initRebuild(); + if (pthread_create(&threadUdpListen, NULL, udpListen, NULL) != 0) + dhtLog("dhtInit() - ERROR creating threadUdpListen\n"); return; } void dhtExit() -{ +{ //TODO: do this gracefully, wait for response from leader, etc. + char msg; + + msg = LEAVE_REQ; + udpSend(&msg, 1, leader); dhtLog("dhtExit(): cleaning up...\n"); - fclose(logfile); pthread_cancel(threadUdpListen); - pthread_cancel(threadTcpListen); - close(udpServerPollSock.fd); - close(tcpListenSock); - clearDHTdata(); + close(udpPollSock.fd); + free(hostArray); + free(hostReplied); + free(blockOwnerArray); + fclose(logfile); + + return; } int dhtInsert(unsigned int key, unsigned int val) { - unsigned int dest_ip = getKeyOwner(key); - struct insertCmd myMessage; - struct insertRes response; - int bytesReceived; - - myMessage.msgType = INSERT_CMD; - myMessage.key = key; - myMessage.val = val; - - bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage, - sizeof(struct insertCmd), (void *)&response, sizeof(struct insertRes), - TIMEOUT_MS, MAX_RETRIES); - if (bytesReceived == sizeof(struct insertRes)) - { - if (response.msgType == INSERT_RES) - { - if (response.status == INSERT_OK) - return 0; -// if (response.status == NOT_KEY_OWNER) - } - } -//TODO: find owner and try again, request rebuild if necessary - return -1; //this function should be robust enough to always return 0 -} + struct sockaddr_in toAddr; + struct sockaddr_in fromAddr; + socklen_t socklen = sizeof(struct sockaddr_in); + struct pollfd pollsock; + char inBuffer[2]; + char outBuffer[9]; + ssize_t bytesRcvd; + int i; + int retval; + int status = -1; -int dhtRemove(unsigned int key) -{ - unsigned int dest_ip = getKeyOwner(key); - struct removeCmd myMessage; - struct removeRes response; - int bytesReceived; - - myMessage.msgType = REMOVE_CMD; - myMessage.key = key; + bzero((char *)&toAddr, socklen); + toAddr.sin_family = AF_INET; + toAddr.sin_port = htons(UDP_PORT); - bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage, - sizeof(struct removeCmd), (void *)&response, sizeof(struct removeRes), - TIMEOUT_MS, MAX_RETRIES); - if (bytesReceived == sizeof(struct removeRes)) + while (status != OPERATION_OK) { - if (response.msgType == REMOVE_RES) + pthread_mutex_lock(&stateMutex); + while (!(state == NORMAL_STATE || state == LEAD_NORMAL1_STATE + || state == LEAD_NORMAL2_STATE || state == REBUILD4_STATE + || state == LEAD_REBUILD3_STATE)) + pthread_cond_wait(&stateCond, &stateMutex); + toAddr.sin_addr.s_addr = htonl(getKeyOwner(key)); + pthread_mutex_unlock(&stateMutex); + + if ((pollsock.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { - if (response.status == REMOVE_OK) - return 0; -// if (response.status == NOT_KEY_OWNER) + perror("dhtInsert():socket()"); + return -1; } - } -//TODO: find owner and try again, request rebuild if necessary - return -1; //this function should be robust enough to always return 0 -} + pollsock.events = POLLIN; -int dhtSearch(unsigned int key, unsigned int *val) -{ - unsigned int dest_ip = getKeyOwner(key); - struct searchCmd myMessage; - struct searchRes response; - int bytesReceived; - - myMessage.msgType = SEARCH_CMD; - myMessage.key = key; - - bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage, - sizeof(struct searchCmd), (void *)&response, sizeof(struct searchRes), - TIMEOUT_MS, MAX_RETRIES); - if (bytesReceived == sizeof(struct searchRes)) - { - if (response.msgType == SEARCH_RES) + outBuffer[0] = INSERT_CMD; + write4(&outBuffer[1], key); + write4(&outBuffer[5], val); + + for (i = 0; i < INSERT_RETRIES; i++) { - if (response.status == KEY_FOUND) + if (sendto(pollsock.fd, outBuffer, 9, 0, (struct sockaddr *)&toAddr, + socklen) < 0) { - *val = response.val; - return 0; + perror("dhtInsert():sendto()"); + break; } - if (response.status == KEY_NOT_FOUND) + retval = poll(&pollsock, 1, INSERT_TIMEOUT_MS); + if (retval < 0) { - return 1; + perror("dhtInsert():poll()"); + break; + } + if (retval > 0) + { + bytesRcvd = recvfrom(pollsock.fd, inBuffer, 2, 0, + (struct sockaddr *)&fromAddr, &socklen); + if (fromAddr.sin_addr.s_addr == toAddr.sin_addr.s_addr + && fromAddr.sin_port == toAddr.sin_port + && bytesRcvd == 2 && inBuffer[0] == INSERT_RES) + { + status = inBuffer[1]; //status from remote host + break; + } } -// if (response.status == NOT_KEY_OWNER) + } + if (status != OPERATION_OK) + { + pthread_mutex_lock(&stateMutex); + setState(REBUILD0_STATE); + outBuffer[0] = REBUILD_REQ; + udpSend(outBuffer, 1, leader); + pthread_mutex_unlock(&stateMutex); } } -//TODO: find owner and try again, request rebuild if necessary - return -1; //this function should be robust enough to always return 0 or 1 -} -/******************************************************************************* -* Local Function Definitions -*******************************************************************************/ + close(pollsock.fd); -//use UDP for messages that are frequent and short -void *udpListen() + return status; +} + +int dhtInsertMult(unsigned int numKeys, unsigned int *keys, unsigned int *vals) { - struct sockaddr_in myAddr; - struct sockaddr_in clientAddr; - struct sockaddr_in bcastAddr; - socklen_t socklen = sizeof(struct sockaddr_in); - char buffer[BUFFER_SIZE]; - ssize_t bytesReceived; - struct insertCmd *insertCmdPtr; - struct removeCmd *removeCmdPtr; - struct searchCmd *searchCmdPtr; - struct insertRes *insertResPtr; - struct removeRes *removeResPtr; - struct searchRes *searchResPtr; - struct joinReq *joinReqPtr; - char replyBuffer[BUFFER_SIZE]; - struct timeval now; - struct timeval rebuild1Timeout; - int rebuild1TimerSet; - int on; - int pollret; + int status; int i; - chashtable_t *myHashTable = chashCreate(HASH_SIZE, LOADFACTOR); - - if ((udpServerPollSock.fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) + status = 0; + for (i = 0; i < numKeys; i++) { - perror("udpListen():socket()"); - pthread_exit(NULL); + if (dhtInsert(keys[i], vals[i]) != 0) + status = -1; } + return status; +} - on = 1; - if (setsockopt(udpServerPollSock.fd, SOL_SOCKET, SO_BROADCAST, &on, - sizeof(on)) == -1) - { - perror("udpBroadcastWaitForResponse():setsockopt()"); - pthread_exit(NULL); - } - - udpServerPollSock.events = POLLIN; - - bzero(&myAddr, socklen); - myAddr.sin_family = AF_INET; - myAddr.sin_addr.s_addr = htonl(INADDR_ANY); - myAddr.sin_port = htons(UDP_PORT); +int dhtRemove(unsigned int key) +{ + struct sockaddr_in toAddr; + struct sockaddr_in fromAddr; + socklen_t socklen = sizeof(struct sockaddr_in); + struct pollfd pollsock; + char inBuffer[2]; + char outBuffer[5]; + ssize_t bytesRcvd; + int i; + int retval; + int status = -1; - bzero(&bcastAddr, socklen); - bcastAddr.sin_family = AF_INET; - bcastAddr.sin_addr.s_addr = htonl(0xFFFFFFFF); - bcastAddr.sin_port = htons(UDP_PORT); + bzero((char *)&toAddr, socklen); + toAddr.sin_family = AF_INET; + toAddr.sin_port = htons(UDP_PORT); - if (bind(udpServerPollSock.fd, (struct sockaddr *)&myAddr, socklen) == -1) + while (!(status == OPERATION_OK || status == KEY_NOT_FOUND)) { - perror("udpListen():bind()"); - pthread_exit(NULL); - } - dhtLog("udpListen(): listening on port %d\n", UDP_PORT); + pthread_mutex_lock(&stateMutex); + while (!(state == NORMAL_STATE || state == LEAD_NORMAL1_STATE + || state == LEAD_NORMAL2_STATE)) + pthread_cond_wait(&stateCond, &stateMutex); + toAddr.sin_addr.s_addr = htonl(getKeyOwner(key)); + pthread_mutex_unlock(&stateMutex); + + if ((pollsock.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) + { + perror("dhtRemove():socket()"); + return -1; + } + pollsock.events = POLLIN; - rebuild1TimerSet = 0; - while(1) - { - pollret = poll(&udpServerPollSock, 1, TIMEOUT_MS); - if (pollret < 0) - { perror("udpListen():poll()"); } - else if (pollret > 0) + outBuffer[0] = REMOVE_CMD; + write4(&outBuffer[1], key); + + for (i = 0; i < REMOVE_RETRIES; i++) { - if ((bytesReceived = recvfrom(udpServerPollSock.fd, buffer, BUFFER_SIZE, - 0, (struct sockaddr *)&clientAddr, &socklen)) == -1) - { perror("udpListen():recvfrom()"); } - else if (bytesReceived == 0) + if (sendto(pollsock.fd, outBuffer, 5, 0, (struct sockaddr *)&toAddr, + socklen) < 0) { - dhtLog("udpListen(): recvfrom() returned 0\n"); + perror("dhtRemove():sendto()"); + break; } - else + retval = poll(&pollsock, 1, REMOVE_TIMEOUT_MS); + if (retval < 0) { - dhtLog("udpListen(): received %s from %s\n", - (buffer[0] < NUM_MSG_TYPES ? msg_types[buffer[0]] : - "unknown message"), inet_ntoa(clientAddr.sin_addr)); - if (!msgSizeOk(buffer[0], bytesReceived)) - { - dhtLog("udpListen(): ERROR: incorrect message size\n"); - } - else - { - switch (buffer[0]) - { - case INSERT_CMD: - if (state == NORMAL_STATE || state == LEAD_NORMAL_STATE - || state == REBUILD3_STATE || state == LEAD_REBUILD3_STATE) - { - insertCmdPtr = (struct insertCmd *)buffer; - dhtLog( "udpListen(): Insert: key=%d, val=%d\n", - insertCmdPtr->key, insertCmdPtr->val); - insertResPtr = (struct insertRes *)replyBuffer; - insertResPtr->msgType = INSERT_RES; - insertResPtr->unused = 0; - if (getKeyOwner(insertCmdPtr->key) == myHostData.ipAddr) - { - //note: casting val to void * in order to conform to API - if(chashInsert(myHashTable, insertCmdPtr->key, - (void *)insertCmdPtr->val) == 0) - insertResPtr->status = INSERT_OK; - else - insertResPtr->status = INSERT_ERROR; - } - else - { - insertResPtr->status = NOT_KEY_OWNER;; - } - if (sendto(udpServerPollSock.fd, (void *)insertResPtr, - sizeof(struct insertRes), 0, (struct sockaddr *)&clientAddr, - socklen) == -1) - { perror("udpListen():sendto()"); } - } - break; - case REMOVE_CMD: - if (state == NORMAL_STATE || state == LEAD_NORMAL_STATE) - { - removeCmdPtr = (struct removeCmd *)buffer; - dhtLog("udpListen(): Remove: key=%d\n", removeCmdPtr->key); - removeResPtr = (struct removeRes *)replyBuffer; - removeResPtr->msgType = REMOVE_RES; - removeResPtr->unused = 0; - if (getKeyOwner(removeCmdPtr->key) == myHostData.ipAddr) - { - //note: casting val to void * in order to conform to API - if(chashRemove(myHashTable, removeCmdPtr->key) == 0) - removeResPtr->status = INSERT_OK; - else - removeResPtr->status = INSERT_ERROR; - } - else - { - removeResPtr->status = NOT_KEY_OWNER; - } - if (sendto(udpServerPollSock.fd, (void *)removeResPtr, - sizeof(struct removeRes), 0, (struct sockaddr *)&clientAddr, - socklen) == -1) - { perror("udpListen():sendto()"); } - } - break; - case SEARCH_CMD: - if (state == NORMAL_STATE || state == LEAD_NORMAL_STATE) - { - searchCmdPtr = (struct searchCmd *)buffer; - dhtLog("udpListen(): Search: key=%d\n",searchCmdPtr->key); - searchResPtr = (struct searchRes *)replyBuffer; - searchResPtr->msgType = SEARCH_RES; - searchResPtr->unused = 0; - if (getKeyOwner(searchCmdPtr->key) == myHostData.ipAddr) - { - //note: casting val to void * in order to conform to API - if((searchResPtr->val = (unsigned int)chashSearch(myHashTable, - searchCmdPtr->key)) == 0) - searchResPtr->status = KEY_NOT_FOUND; - else - searchResPtr->status = KEY_FOUND; - } - else - { - searchResPtr->status = NOT_KEY_OWNER; - } - if (sendto(udpServerPollSock.fd, (void *)searchResPtr, - sizeof(struct searchRes), 0, (struct sockaddr *)&clientAddr, - socklen) == -1) - { perror("udpListen():sendto()"); } - } - break; - case FIND_LEADER_REQ: - if (state == LEAD_NORMAL_STATE || state == LEAD_REBUILD1_STATE - || state == LEAD_REBUILD2_STATE || state == LEAD_REBUILD3_STATE) - { - replyBuffer[0] = FIND_LEADER_RES; - if(sendto(udpServerPollSock.fd, (void *)replyBuffer, - sizeof(char), 0,(struct sockaddr *)&clientAddr, socklen) == -1) - { perror("udpListen():sendto()"); } - } - break; - case REBUILD_REQ: - if (state == LEAD_NORMAL_STATE || state == LEAD_REBUILD1_STATE - || state == LEAD_REBUILD2_STATE || state == LEAD_REBUILD3_STATE) - { - replyBuffer[0] = REBUILD_RES; - if (sendto(udpServerPollSock.fd, (void *)replyBuffer, - sizeof(char), 0, (struct sockaddr *)&clientAddr, socklen) == -1) - { perror("udpListen():sendto()"); } - if (gettimeofday(&rebuild1Timeout, NULL) < 0) - { perror("dhtLog():gettimeofday()"); } - //TODO: make this a configurable parameter - rebuild1Timeout.tv_sec += 3; - rebuild1TimerSet = 1; - //clear out previous host data - numHosts = 1; - hostArray[0] = myHostData; - - state = LEAD_REBUILD1_STATE; - - replyBuffer[0] = REBUILD_CMD; - if (sendto(udpServerPollSock.fd, (void *)replyBuffer, - sizeof(char), 0, (struct sockaddr *)&bcastAddr, socklen) == -1) - { perror("udpListen():sendto()"); } - - } - else - { - replyBuffer[0] = NOT_LEADER; - if(sendto(udpServerPollSock.fd, (void *)replyBuffer, - sizeof(char), 0,(struct sockaddr *)&clientAddr, socklen) == -1) - { perror("udpListen():sendto()"); } - } - case REBUILD_CMD: - if (state != LEAD_REBUILD1_STATE) - { - //consider this an official declaration of authority, - // in case I was confused about this - leader = htonl(clientAddr.sin_addr.s_addr); - - clearDHTdata(); - - joinReqPtr = (struct joinReq *)replyBuffer; - joinReqPtr->msgType = JOIN_REQ; - joinReqPtr->unused = 0; - joinReqPtr->newHostData = myHostData; - //note: I'm reusing bytesReceived and buffer - bytesReceived = udpSendWaitForResponse(leader, UDP_PORT, - (void *)replyBuffer, sizeof(struct joinReq), (void *)buffer, - BUFFER_SIZE, TIMEOUT_MS, MAX_RETRIES); - if ((bytesReceived == sizeof(char)) && (buffer[0] == JOIN_RES)) - state = REBUILD1_STATE; - else - initRebuild(); - } - break; - case JOIN_REQ: - if (state == LEAD_REBUILD1_STATE) - { - joinReqPtr = (struct joinReq *)buffer; - addHost(joinReqPtr->newHostData); - - replyBuffer[0] = JOIN_RES; - if (sendto(udpServerPollSock.fd, (void *)replyBuffer, - sizeof(char), 0,(struct sockaddr *)&clientAddr, socklen) == -1) - { perror("udpListen():sendto()"); } - } - break; - case GET_DHT_INFO_CMD: - if (state == REBUILD1_STATE) - { - getDHTdata(); - state = REBUILD2_STATE; - } - break; - default: - dhtLog("udpListen(): ERROR: Unknown message type\n"); - } - } + perror("dhtRemove():poll()"); + break; } - } //end (pollret > 0) - else // (pollret == 0), timeout - { - if (gettimeofday(&now, NULL) < 0) - { perror("dhtLog():gettimeofday()"); } - if (rebuild1TimerSet && timercmp(&now, &rebuild1Timeout, >)) + if (retval > 0) { - rebuild1TimerSet = 0; - if (state == LEAD_REBUILD1_STATE) + bytesRcvd = recvfrom(pollsock.fd, inBuffer, 2, 0, + (struct sockaddr *)&fromAddr, &socklen); + if (fromAddr.sin_addr.s_addr == toAddr.sin_addr.s_addr + && fromAddr.sin_port == toAddr.sin_port + && bytesRcvd == 2 && inBuffer[0] == REMOVE_RES) { - makeAssignments(); - dhtLog("udpListen(): assignments made\n"); - writeDHTdata(logfile); - if (hostRebuildStates != NULL) - free(hostRebuildStates); - hostRebuildStates = calloc(numHosts, sizeof(unsigned int)); - for (i = 0; i < numHosts; i++) - hostRebuildStates[i] = REBUILD1_STATE; - state = LEAD_REBUILD2_STATE; - replyBuffer[0] = GET_DHT_INFO_CMD; - if (sendto(udpServerPollSock.fd, (void *)replyBuffer, - sizeof(char), 0, (struct sockaddr *)&bcastAddr, socklen) == -1) - { perror("udpListen():sendto()"); } + status = inBuffer[1]; //status from remote host + break; } } } + if (!(status == OPERATION_OK || status == KEY_NOT_FOUND)) + { + pthread_mutex_lock(&stateMutex); + setState(REBUILD0_STATE); + outBuffer[0] = REBUILD_REQ; + udpSend(outBuffer, 1, leader); + pthread_mutex_unlock(&stateMutex); + } } + + close(pollsock.fd); + + return status; } -int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, - void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize, - unsigned int timeout, unsigned int numRetries) +int dhtRemoveMult(unsigned int numKeys, unsigned int *keys) { - struct sockaddr_in server_addr; - struct sockaddr_in ack_addr; - socklen_t socklen = sizeof(struct sockaddr_in); - struct pollfd pollsock; - int retval; + int status; int i; - ssize_t bytesReceived; - - bzero((char *) &server_addr, sizeof(server_addr)); - server_addr.sin_family = AF_INET; - server_addr.sin_port = htons(dest_port); - server_addr.sin_addr.s_addr = htonl(dest_ip); - if ((pollsock.fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) + status = 0; + for (i = 0; i < numKeys; i++) { - perror("udpSendWaitForResponse():socket()"); - return -1; + if (dhtRemove(keys[i]) != 0) + status = -1; } - - pollsock.events = POLLIN; - - for (i = 0; i < MAX_RETRIES; i++) - { - if (i > 0) - dhtLog("udpSendWaitForResponse(): trying again, count: %d\n", i+1); - if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr, - socklen) == -1) - { - perror("udpSendWaitForResponse():sendto"); - return -1; - } - dhtLog("udpSendWaitForResponse(): message sent\n"); - retval = poll(&pollsock, 1, timeout); - if (retval < 0) - { - perror("udpSendWaitForResponse():poll()"); - } - else if (retval > 0) - { - bytesReceived = recvfrom(pollsock.fd, resBuffer, resBufferSize, 0, - (struct sockaddr *)&ack_addr, &socklen); - if ((ack_addr.sin_addr.s_addr == server_addr.sin_addr.s_addr) - && (ack_addr.sin_port == server_addr.sin_port)) - { - close(pollsock.fd); - dhtLog("udpSendWaitForResponse(): received response\n"); - return bytesReceived; - } - } - } - close(pollsock.fd); - printf("udpSendWaitForResponse(): timed out, no ack\n"); - return -1; + return status; } -int udpBroadcastWaitForResponse(unsigned int *reply_ip, - unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer, - unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries) +int dhtSearch(unsigned int key, unsigned int *val) { - struct sockaddr_in server_addr; - struct sockaddr_in ack_addr; + struct sockaddr_in toAddr; + struct sockaddr_in fromAddr; socklen_t socklen = sizeof(struct sockaddr_in); struct pollfd pollsock; - int retval; + char inBuffer[6]; + char outBuffer[5]; + ssize_t bytesRcvd; int i; - ssize_t bytesReceived; - int on; - - bzero((char *) &server_addr, sizeof(server_addr)); - server_addr.sin_family = AF_INET; - server_addr.sin_port = htons(dest_port); - server_addr.sin_addr.s_addr = htonl(0xFFFFFFFF); + int retval; + int status = -1; - if ((pollsock.fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) - { - perror("udpBroadcastWaitForResponse():socket()"); - return -1; - } + bzero((char *)&toAddr, socklen); + toAddr.sin_family = AF_INET; + toAddr.sin_port = htons(UDP_PORT); - on = 1; - if (setsockopt(pollsock.fd, SOL_SOCKET, SO_BROADCAST, &on, sizeof(on)) == -1) + while (!(status == OPERATION_OK || status == KEY_NOT_FOUND)) { - perror("udpBroadcastWaitForResponse():setsockopt()"); - return -1; - } + pthread_mutex_lock(&stateMutex); + while (numBlocks == 0) + pthread_cond_wait(&stateCond, &stateMutex); + toAddr.sin_addr.s_addr = htonl(getKeyOwner(key)); + pthread_mutex_unlock(&stateMutex); - pollsock.events = POLLIN; - - for (i = 0; i < MAX_RETRIES; i++) - { - if (i > 0) - dhtLog("udpBroadcastWaitForResponse(): trying again, count: %d\n", i+1); - if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr, - socklen) == -1) + if ((pollsock.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { - perror("udpBroadcastWaitForResponse():sendto()"); + perror("dhtSearch():socket()"); return -1; } - dhtLog("udpBroadcastWaitForResponse(): message sent\n"); - retval = poll(&pollsock, 1, timeout); - if (retval !=0) + pollsock.events = POLLIN; + + outBuffer[0] = SEARCH_CMD; + write4(&outBuffer[1], key); + + for (i = 0; i < SEARCH_RETRIES; i++) { - bytesReceived = recvfrom(pollsock.fd, resBuffer, resBufferSize, 0, - (struct sockaddr *)&ack_addr, &socklen); - close(pollsock.fd); - *reply_ip = htonl(ack_addr.sin_addr.s_addr); - dhtLog("udpBroadcastWaitForResponse(): received response\n"); - return bytesReceived; + if (sendto(pollsock.fd, outBuffer, 5, 0, (struct sockaddr *)&toAddr, + socklen) < 0) + { + perror("dhtSearch():sendto()"); + break; + } + retval = poll(&pollsock, 1, SEARCH_TIMEOUT_MS); + if (retval < 0) + { + perror("dhtSearch():poll()"); + break; + } + if (retval > 0) + { + bytesRcvd = recvfrom(pollsock.fd, inBuffer, 6, 0, + (struct sockaddr *)&fromAddr, &socklen); + if (fromAddr.sin_addr.s_addr == toAddr.sin_addr.s_addr + && fromAddr.sin_port == toAddr.sin_port + && bytesRcvd == 6 && inBuffer[0] == SEARCH_RES) + { + status = inBuffer[1]; //status from remote host + *val = read4(&inBuffer[2]); + break; + } + } + } + if (!(status == OPERATION_OK || status == KEY_NOT_FOUND)) + { + pthread_mutex_lock(&stateMutex); + setState(REBUILD0_STATE); + outBuffer[0] = REBUILD_REQ; + udpSend(outBuffer, 1, leader); + pthread_mutex_unlock(&stateMutex); } } + close(pollsock.fd); - dhtLog("udpBroadcastWaitForResponse(): timed out, no ack\n"); - return -1; + + return status; } -// use TCP for potentially large and/or important data transfer -void *tcpListen() +int dhtSearchMult(unsigned int numKeys, unsigned int *keys, unsigned int *vals) { - struct sockaddr_in myAddr; - struct sockaddr_in clientAddr; - int tcpAcceptSock; - socklen_t socklen = sizeof(struct sockaddr_in); - pthread_t threadTcpAccept; - - tcpListenSock = socket(AF_INET, SOCK_STREAM, 0); - if (tcpListenSock == -1) + int i; + int status = 0; + for (i = 0; i < numKeys; i++) { - perror("tcpListen():socket()"); - pthread_exit(NULL); + if (dhtSearch(keys[i], &vals[i]) != 0) + status = -1; } + return status; +} - myAddr.sin_family = AF_INET; - myAddr.sin_port = htons(TCP_PORT); - myAddr.sin_addr.s_addr = INADDR_ANY; - memset(&(myAddr.sin_zero), '\0', 8); - - if (bind(tcpListenSock, (struct sockaddr *)&myAddr, socklen) == -1) - { - perror("tcpListen():socket()"); - pthread_exit(NULL); - } +/******************************************************************************* +* Local Function Definitions +*******************************************************************************/ - if (listen(tcpListenSock, BACKLOG) == -1) - { - perror("tcpListen():listen()"); - pthread_exit(NULL); - } +int msgSizeOk(unsigned char *msg, unsigned int size) +{ + unsigned short tmpNumHosts; + unsigned short tmpNumBlocks; - dhtLog("tcpListen(): listening on port %d\n", TCP_PORT); + if (size < 1) + return 1; - while(1) + switch (msg[0]) { - tcpAcceptSock = accept(tcpListenSock, (struct sockaddr *)&clientAddr, - &socklen); - pthread_create(&threadTcpAccept, NULL, tcpAccept, (void *)tcpAcceptSock); + case WHO_IS_LEADER_CMD: + case LEAVE_REQ: + case LEAVE_RES: + case DHT_UPDATE_RES: + case REBUILD_REQ: + case REBUILD_CMD: + case FILL_DHT_CMD: + case FILL_DHT_RES: + case RESUME_NORMAL_CMD: + case RESUME_NORMAL_RES: + return (size == 1); + case INSERT_RES: + case REMOVE_RES: + case JOIN_RES: + return (size == 2); + case REMOVE_CMD: + case SEARCH_CMD: + case WHO_IS_LEADER_RES: + case JOIN_REQ: + case ELECT_LEADER_CMD: + return (size == 5); + case SEARCH_RES: + return (size == 6); + case INSERT_CMD: + return (size == 9); + case DHT_UPDATE_CMD: + if (size < 5) + return 1; + tmpNumHosts = read2(&msg[1]); + tmpNumBlocks = read2(&msg[3]); + return (size == (5+sizeof(struct hostData)*tmpNumHosts+2*tmpNumBlocks)); + case ELECT_LEADER_RES: + if (size < 2) + return 1; + if (msg[1] == 0xFF) + return (size == 2); + if (size < 4) + return 1; + tmpNumHosts = read2(&msg[2]); + return (size == (4 + sizeof(struct hostData) * tmpNumHosts)); + case CONGRATS_CMD: + if (size < 3) + return 1; + tmpNumHosts = read2(&msg[1]); + return (size == (3 + sizeof(struct hostData) * tmpNumHosts)); + default: + return 1; } } -void *tcpAccept(void *arg) +unsigned short read2(unsigned char *ptr) { - int tcpAcceptSock = (int)arg; - int bytesReceived; - char msgType; - - dhtLog("tcpAccept(): accepted tcp connection, file descriptor: %d\n", - tcpAcceptSock); - - bytesReceived = recv(tcpAcceptSock, &msgType, sizeof(char), 0); - if (bytesReceived == -1) - { perror("tcpAccept():recv()"); } - else if (bytesReceived == 0) - { - dhtLog( "tcpAccept(): bytesReceived = 0\n", tcpAcceptSock); - } - else - { - switch (msgType) - { - case DHT_INFO_REQ: - if (send(tcpAcceptSock, &numHosts, sizeof(numHosts), 0) == -1) - { - perror("tcpAccept():send()"); - break; - } - if (send(tcpAcceptSock, &numBlocks, sizeof(numBlocks), 0) == -1) - { - perror("tcpAccept():send()"); - break; - } - if (send(tcpAcceptSock, hostArray, numHosts*sizeof(struct hostData), - 0) == -1) - { - perror("tcpAccept():send()"); - break; - } - if (send(tcpAcceptSock, blockOwnerArray, numBlocks*sizeof(unsigned int), - 0) == -1) - { - perror("tcpAccept():send()"); - break; - } - break; - default: - dhtLog("tcpAccept(): unrecognized msg type\n"); - } - } - - if (close(tcpAcceptSock) == -1) - { perror("tcpAccept():close()"); } + unsigned short tmp = (ptr[1] << 8) | ptr[0]; + return tmp; +} - dhtLog("tcpAccept(): closed tcp connection, file descriptor: %d\n", - tcpAcceptSock); +unsigned int read4(unsigned char *ptr) +{ + unsigned int tmp = (ptr[3] << 24) | (ptr[2] << 16) | (ptr[1] << 8) | ptr[0]; + return tmp; +} - pthread_exit(NULL); +void write2(unsigned char *ptr, unsigned short tmp) +{ + ptr[1] = (tmp >> 8) & 0xFF; + ptr[0] = tmp & 0xFF; + return; } -unsigned int getKeyOwner(unsigned int key) +void write4(unsigned char *ptr, unsigned int tmp) { - if (state == NORMAL_STATE || state == LEAD_NORMAL_STATE - || state == REBUILD3_STATE || state == LEAD_REBUILD3_STATE) - { - return hostArray[blockOwnerArray[hash(key)]].ipAddr; - } - else - { //TODO: figure out what is best to do here. Would like calls to dhtSearch, - // etc. to block rather than fail during rebuilds - return 0; - } + ptr[3] = (tmp >> 24) & 0xFF; + ptr[2] = (tmp >> 16) & 0xFF; + ptr[1] = (tmp >> 8) & 0xFF; + ptr[0] = tmp & 0xFF; + return; } -unsigned int getMyIpAddr() +unsigned int getMyIpAddr(const char *interfaceStr) { int sock; struct ifreq interfaceInfo; @@ -948,7 +758,7 @@ unsigned int getMyIpAddr() return 1; } - strcpy(interfaceInfo.ifr_name, DEFAULT_INTERFACE); + strcpy(interfaceInfo.ifr_name, interfaceStr); myAddr->sin_family = AF_INET; if(ioctl(sock, SIOCGIFADDR, &interfaceInfo) != 0) @@ -960,296 +770,260 @@ unsigned int getMyIpAddr() return ntohl(myAddr->sin_addr.s_addr); } -unsigned int findLeader() +int udpSend(unsigned char *msg, unsigned int size, unsigned int destIp) { - unsigned int reply_ip; - int bytesReceived; - char myMessage; - char response; - - dhtLog("findLeader(): broadcasting...\n"); - - myMessage = FIND_LEADER_REQ; + struct sockaddr_in peerAddr; + socklen_t socklen = sizeof(struct sockaddr_in); - bytesReceived = udpBroadcastWaitForResponse(&reply_ip, UDP_PORT, - (void *)&myMessage, sizeof(myMessage), (void *)&response, - sizeof(response), TIMEOUT_MS, MAX_RETRIES); + bzero(&peerAddr, socklen); + peerAddr.sin_family = AF_INET; + peerAddr.sin_addr.s_addr = htonl(destIp); + peerAddr.sin_port = htons(UDP_PORT); - if (bytesReceived == -1) + if (size >= 1) { - dhtLog("findLeader(): no response\n"); - return 0; - } - else if (response == FIND_LEADER_RES) - { - struct in_addr reply_addr; - reply_addr.s_addr = htonl(reply_ip); - dhtLog("findLeader(): leader found:%s\n", - inet_ntoa(reply_addr)); - return reply_ip; + if (msg[0] < NUM_MSG_TYPES) + dhtLog("udpSend(): sending %s to %s, %d bytes\n", msg_types[msg[0]], + inet_ntoa(peerAddr.sin_addr), size); + else + dhtLog("udpSend(): sending unknown message to %s, %d bytes\n", + inet_ntoa(peerAddr.sin_addr), size); } - else + + if (sendto(udpPollSock.fd, (void *)msg, size, 0, (struct sockaddr *)&peerAddr, + socklen) < 0) { - dhtLog("findLeader(): unexpected response\n"); - return 0; + perror("udpSend():sendto()"); + return -1; } + + return 0; } -int getDHTdata() +int udpSendAll(unsigned char *msg, unsigned int size) { - struct sockaddr_in leader_addr; - int sock; - char msg; - int bytesReceived; - - clearDHTdata(); - - if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) + int i; + int status = 0; + for (i = 0; i < numHosts; i++) { - perror("getDHTdata():socket()"); - return -1; + if ((hostReplied[i] == 0) && (hostArray[i].ipAddr != myHostData.ipAddr)) + { + if (udpSend(msg, size, hostArray[i].ipAddr) != 0) + status = -1; + } } + return status; +} - bzero((char *)&leader_addr, sizeof(leader_addr)); - leader_addr.sin_family = AF_INET; - leader_addr.sin_port = htons(TCP_PORT); - leader_addr.sin_addr.s_addr = htonl(leader); +//note: make sure this is only executed in a valid state, where numBlocks != 0 +unsigned int hash(unsigned int x) +{ + return (x % numBlocks); +} - if (connect(sock, (struct sockaddr *)&leader_addr, sizeof(leader_addr)) == -1) - { - perror("getDHTdata():connect()"); - close(sock); - return -1; - } - msg = DHT_INFO_REQ; - if (send(sock, &msg, sizeof(char), 0) == -1) - { - perror("getDHTdata():send()"); - close(sock); - return -1; - } - bytesReceived = recv(sock, &numHosts, sizeof(numHosts), 0); - if (bytesReceived == -1) - { - perror("getDHTdata():recv()"); - close(sock); - return -1; - } - if (bytesReceived != sizeof(numHosts)) - { - dhtLog("getDHTdata(): ERROR: numHosts not completely received\n"); - close(sock); - return -1; - } - bytesReceived = recv(sock, &numBlocks, sizeof(numBlocks), 0); - if (bytesReceived == -1) - { - perror("getDHTdata():recv()"); - close(sock); - return -1; - } - if (bytesReceived != sizeof(numBlocks)) - { - dhtLog("getDHTdata(): ERROR: numBlocks not completely received\n"); - close(sock); - return -1; - } - hostArray = calloc(numHosts, sizeof(struct hostData)); - bytesReceived = recv(sock, hostArray, numHosts*sizeof(struct hostData), 0); - if (bytesReceived == -1) - { - perror("getDHTdata():recv()"); - close(sock); - return -1; - } - if (bytesReceived != numHosts*sizeof(struct hostData)) - { - dhtLog("getDHTdata(): ERROR: hostArray not completely received\n"); - close(sock); - return -1; - } - blockOwnerArray = calloc(numBlocks, sizeof(unsigned int)); - bytesReceived = recv(sock,blockOwnerArray,numBlocks*sizeof(unsigned int),0); - if (bytesReceived == -1) +//note: make sure this is only executed in a valid state, where these arrays +// are allocated and the index mappings are consistent +unsigned int getKeyOwner(unsigned int key) +{ + return hostArray[blockOwnerArray[hash(key)]].ipAddr; +} + +//sets state and timer, if applicable +void setState(unsigned int newState) +{ + struct timeval now; + int i; + + gettimeofday(&now, NULL); + + if (newState >= NUM_STATES) { - perror("getDHTdata():recv()"); - close(sock); - return -1; + dhtLog("setState(): ERROR: invalid state %d\n", newState); } - if (bytesReceived != numBlocks*sizeof(unsigned int)) + else { - dhtLog("getDHTdata(): ERROR: blockOwnerArray not completely received\n"); - close(sock); - return -1; + if (timeout_vals[newState].tv_sec == 0 + && timeout_vals[newState].tv_usec == 0) + { //no timer + timerSet = 0; + } + else + { + timeradd(&now, &timeout_vals[newState], &timer); + timerSet = 1; + } + timeoutCntr = 0; + state = newState; + //TODO: only do this for states that require it + for (i = 0; i < numHosts; i++) + hostReplied[i] = 0; + + dhtLog("setState(): state set to %s\n", state_names[state]); } - dhtLog("getDHTdata(): got data:\n"); - writeDHTdata(logfile); - return 0; + return; } -unsigned int hash(unsigned int x) +//TODO: improve these simple and inefficient functions +int checkReplied(unsigned int ipAddr) { - //this shouldn't be called when numBlocks = 0, so if you get a divide-by-zero, - // make sure we are in a proper state for key owner lookups - return x % numBlocks; + int i; + + i = findHost(ipAddr); + + if (i == -1) + return -1; + + hostReplied[i] = 1; + + return 0; } -//This function will not return until it succeeds in submitting -// a rebuild request to the leader. It is then the leader's responibility -// to ensure that the rebuild is caried out -void initRebuild() +int allReplied() { - int bytesReceived; - char msg; - char response; - int done; - int retry_count; int i; - done = 0; - retry_count = 0; - - while (!done) - { - if (retry_count > 0) - { - dhtLog("initRebuild(): retry count:%d\n", retry_count); - } - - if (leader == 0 || retry_count > 0) - { - leader = findLeader(); //broadcast - if (leader == 0) //no response - { - //TODO:elect leader: this will do for now - initDHTdata(); - leader = getMyIpAddr(); - state = LEAD_NORMAL_STATE; - } - } + for (i = 0; i < numHosts; i++) + if ((hostReplied[i] == 0) && (hostArray[i].ipAddr != myHostData.ipAddr)) + return 0; - msg = REBUILD_REQ; - - bytesReceived = udpSendWaitForResponse(leader, UDP_PORT, - (void *)&msg, sizeof(msg), (void *)&response, sizeof(response), - TIMEOUT_MS, MAX_RETRIES); - if (bytesReceived == -1) - { perror("initRebuild():recv()"); } - else if (bytesReceived != sizeof(response)) - { - dhtLog("initRebuild(): ERROR: response not completely received\n"); - } - else if (response == NOT_LEADER) - { - struct in_addr address; - address.s_addr = htonl(leader); - dhtLog("initRebuild(): ERROR: %s no longer leader\n", - inet_ntoa(address)); - } - else if (response != REBUILD_RES) - { - dhtLog("initRebuild(): ERROR: unexpected response\n"); - } - else - { - dhtLog("initRebuild(): submitted rebuild request\n"); - writeDHTdata(logfile); - done = 1; - } - } - return; + return 1; } -void writeDHTdata(FILE *outfile) +int findHost(unsigned int ipAddr) { int i; - struct in_addr address; - fprintf(outfile,"numHosts=%d,numBlocks=%d\n", numHosts, numBlocks); - fprintf(outfile,"hostArray: index: ipAddr, maxKeyCapacity\n"); for (i = 0; i < numHosts; i++) - { - address.s_addr = htonl(hostArray[i].ipAddr); - fprintf(outfile,"%d: %s, %d\n", i, inet_ntoa(address), - hostArray[i].maxKeyCapacity); - } - fprintf(outfile,"blockOwnerArray: index: blockOwner\n"); - for (i = 0; i < numBlocks; i++) - fprintf(outfile,"%d: %d ", i, blockOwnerArray[i]); - fprintf(outfile,"\n"); + if (hostArray[i].ipAddr == ipAddr) + return i; //found, return index + + return -1; //not found } -void clearDHTdata() +int removeHost(unsigned int ipAddr) { - if (hostArray != NULL) + int i, j; + + i = findHost(ipAddr); + + if (i == -1) + return -1; + + for (j = 0; j < numBlocks; j++) { - free(hostArray); - hostArray = NULL; + if (blockOwnerArray[j] == i) + blockOwnerArray[j] = 0; //TODO: is this what I want to have happen? + else if (blockOwnerArray[j] > i) + blockOwnerArray[j]--; } - if (blockOwnerArray != NULL) + + for (; i < numHosts - 1; i++) { - free(blockOwnerArray); - blockOwnerArray = NULL; + hostArray[i] = hostArray[i+1]; + hostReplied[i] = hostReplied[i+1]; } - numHosts = numBlocks = hostArraySize = 0; - return; + numHosts--; + + return 0; } -void initDHTdata() +void removeUnresponsiveHosts() { int i; - clearDHTdata(); - hostArraySize = INIT_HOST_ALLOC; - hostArray = calloc(hostArraySize, sizeof(struct hostData)); - numHosts = 1; - hostArray[0] = myHostData; - numBlocks = INIT_BLOCK_NUM; - blockOwnerArray = calloc(numBlocks, sizeof(unsigned int)); - for (i = 0; i < numBlocks; i++) - blockOwnerArray[i] = 0; - - return; + for (i = 0; i < numHosts; i++) + { + if (!hostReplied[i] && hostArray[i].ipAddr != myHostData.ipAddr) + removeHost(hostArray[i].ipAddr); + } } -void addHost(struct hostData newHost) +int addHost(struct hostData newHost) { - struct hostData *newArray; - unsigned int newArraySize; + struct hostData *newHostArray; + unsigned char *newHostReplied; + int i; + int j; - if (hostArray == NULL || blockOwnerArray == NULL || hostArraySize == 0) - initDHTdata(); + for (i = 0; i < numHosts; i++) + { + if (hostArray[i].ipAddr == newHost.ipAddr) + { + hostArray[i] = newHost; + hostReplied[i] = 0; + return 0; + } + else if (hostArray[i].ipAddr > newHost.ipAddr) + { + if (numHosts == hostArraySize) + { + newHostArray = calloc(2 * hostArraySize, sizeof(struct hostData)); + newHostReplied = calloc(2 * hostArraySize, sizeof(unsigned char)); + memcpy(newHostArray, hostArray, (i * sizeof(struct hostData))); + memcpy(newHostReplied, hostReplied, (i * sizeof(unsigned char))); + newHostArray[i] = newHost; + newHostReplied[i] = 0; + memcpy(&newHostArray[i+1], &hostArray[i], ((numHosts - i) * + sizeof(struct hostData))); + memcpy(&newHostReplied[i+1], &hostReplied[i], ((numHosts - i) * + sizeof(unsigned char))); + free(hostArray); + free(hostReplied); + hostArray = newHostArray; + hostReplied = newHostReplied; + hostArraySize = 2 * hostArraySize; + } + else + { + for (j = numHosts; j > i; j--) + { + hostArray[j] = hostArray[j-1]; + hostReplied[j] = hostReplied[j-1]; + } + hostArray[i] = newHost; + hostReplied[i] = 0; + } + for(j = 0; j < numBlocks; j++) + { + if (blockOwnerArray[j] >= i) + blockOwnerArray[j]++; + } + numHosts++; + return 1; + } + } + //nothing greater, add to end if (numHosts == hostArraySize) { - newArraySize = hostArraySize * 2; - newArray = calloc(newArraySize, sizeof(struct hostData)); - memcpy(newArray, hostArray, (hostArraySize * sizeof(struct hostData))); + newHostArray = calloc(2 * hostArraySize, sizeof(struct hostData)); + newHostReplied = calloc(2 * hostArraySize, sizeof(unsigned char)); + memcpy(newHostArray, hostArray, (numHosts * sizeof(struct hostData))); + memcpy(newHostReplied, hostReplied, (numHosts * sizeof(unsigned char))); free(hostArray); - hostArray = newArray; - hostArraySize = newArraySize; + free(hostReplied); + hostArray = newHostArray; + hostReplied = newHostReplied; + hostArraySize = 2 * hostArraySize; } hostArray[numHosts] = newHost; + hostReplied[numHosts] = 0; numHosts++; - - return; + return 1; } void makeAssignments() { int i; - if (hostArray == NULL || blockOwnerArray == NULL || hostArraySize == 0) - initDHTdata(); - if (numBlocks < numHosts) { free(blockOwnerArray); while (numBlocks < numHosts) numBlocks *= 2; - blockOwnerArray = calloc(numBlocks, sizeof(unsigned int)); + blockOwnerArray = calloc(numBlocks, sizeof(unsigned short)); } for (i = 0; i < numBlocks; i++) @@ -1258,90 +1032,31 @@ void makeAssignments() return; } -//returns not-zero if ok, zero if not ok -int msgSizeOk(unsigned char type, unsigned int size) +void writeHostList() { - int status; + int i; + struct in_addr tmpAddr; - switch (type) + fprintf(logfile, "numHosts = %d\n", numHosts); + for (i = 0; i < numHosts; i++) { - case INSERT_CMD: - status = (size == sizeof(struct insertCmd)); - break; - case INSERT_RES: - status = (size == sizeof(struct insertRes)); - break; - case REMOVE_CMD: - status = (size == sizeof(struct removeCmd)); - break; - case REMOVE_RES: - status = (size == sizeof(struct removeRes)); - break; - case SEARCH_CMD: - status = (size == sizeof(struct searchCmd)); - break; - case SEARCH_RES: - status = (size == sizeof(struct searchRes)); - break; - case FIND_LEADER_REQ: - status = (size == sizeof(char)); - break; - case FIND_LEADER_RES: - status = (size == sizeof(char)); - break; - case REBUILD_REQ: - status = (size == sizeof(char)); - break; - case REBUILD_RES: - status = (size == sizeof(char)); - break; - case NOT_LEADER: - status = (size == sizeof(char)); - break; - case REBUILD_CMD: - status = (size == sizeof(char)); - break; - case JOIN_REQ: - status = (size == sizeof(struct joinReq)); - break; - case JOIN_RES: - status = (size == sizeof(char)); - break; - case GET_DHT_INFO_CMD: - status = (size == sizeof(char)); - break; - case DHT_INFO_REQ: - status = (size == sizeof(char)); - break; - case DHT_INFO_RES: - status = (size == sizeof(char)); - break; - case FILL_DHT_CMD: - status = (size == sizeof(char)); - break; - case FILL_DHT_RES: - status = (size == sizeof(char)); - break; - case REBUILD_DONE_INFO: - status = (size == sizeof(char)); - break; - default: - status = 0; - break; + tmpAddr.s_addr = htonl(hostArray[i].ipAddr); + fprintf(logfile, "%d) %s, %d\n", i, inet_ntoa(tmpAddr), + hostArray[i].maxKeyCapacity); } - return status; + return; } void dhtLog(const char *format, ...) { va_list args; - struct timeval now; +// struct timeval now; - if (gettimeofday(&now, NULL) < 0) - { perror("dhtLog():gettimeofday()"); } +// if (gettimeofday(&now, NULL) < 0) +// { perror("dhtLog():gettimeofday()"); } va_start(args, format); - if (fprintf(logfile, "%d.%06d:", now.tv_sec, now.tv_usec) < 0) - { perror("dhtLog():fprintf()"); } +// if (fprintf(logfile, "%d.%06d:", now.tv_sec, now.tv_usec) < 0) +// { perror("dhtLog():fprintf()"); } if (vfprintf(logfile, format, args) < 0) { perror("dhtLog():vfprintf()"); } if (fflush(logfile) == EOF) @@ -1351,6 +1066,686 @@ void dhtLog(const char *format, ...) return; } -#endif +void *fillTask() +{ + unsigned int *vals; + unsigned int *keys; + unsigned int numKeys; + int i; + + vals = mhashGetKeys(&numKeys); //note: key of mhash is val of dht + keys = calloc(numKeys, sizeof(unsigned int)); + + for (i = 0; i < numKeys; i++) + keys[i] = myHostData.ipAddr; + if (dhtInsertMult(numKeys, keys, vals) == 0) + fillStatus = 2; + else + fillStatus = 3; + + pthread_exit(NULL); +} + +void *udpListen() +{ + ssize_t bytesRcvd; + struct sockaddr_in peerAddr; + unsigned int peerIp; + socklen_t socklen = sizeof(struct sockaddr_in); + unsigned char inBuffer[MAX_MSG_SIZE]; + unsigned char outBuffer[MAX_MSG_SIZE]; + int pollret; + struct timeval now; + struct in_addr tmpAddr; + struct hostData tmpHost; + unsigned int tmpKey; + unsigned int tmpVal; + struct hostData *hostDataPtr; + unsigned short *uShortPtr; + unsigned int tmpUInt; + unsigned int tmpUShort; + int i; + unsigned int oldState; + + dhtLog("udpListen(): linstening on port %d...\n", UDP_PORT); + + while (1) + { + pollret = poll(&udpPollSock, 1, TIMEOUT_PERIOD); + pthread_mutex_lock(&stateMutex); + oldState = state; + if (pollret < 0) + { + perror("udpListen():poll()"); + } + else if (pollret > 0) + { + bytesRcvd = recvfrom(udpPollSock.fd, inBuffer, MAX_MSG_SIZE, 0, + (struct sockaddr *)&peerAddr, &socklen); + if (bytesRcvd < 1) + { + dhtLog("udpListen(): ERROR: bytesRcvd = %d\n", bytesRcvd); + } + else if (inBuffer[0] >= NUM_MSG_TYPES) + { + dhtLog("udpListen(): ERROR: unknown msg type = %d\n", inBuffer[0]); + } + else if (!msgSizeOk(inBuffer, bytesRcvd)) + { + dhtLog("udpListen(): ERROR: msg size not ok: type = %s\n, size = %d\n", + msg_types[inBuffer[0]], bytesRcvd); + } + else if (state == EXIT2_STATE) + { + //do nothing + } + else if (state == INIT1_STATE) + { //after initialization with seed, do not proceed until seed replies + dhtLog("udpListen(): received %s from %s, %d bytes\n", + msg_types[inBuffer[0]], inet_ntoa(peerAddr.sin_addr), bytesRcvd); + for (i = 0; i < bytesRcvd; i++) + dhtLog(" %x", inBuffer[i]); + dhtLog("\n"); + peerIp = ntohl(peerAddr.sin_addr.s_addr); + if (peerIp == seed && inBuffer[0] == WHO_IS_LEADER_RES) + { + tmpHost.ipAddr = peerIp; + tmpHost.maxKeyCapacity = 0; + addHost(tmpHost); + writeHostList(); + leader = read4(&inBuffer[1]); + tmpAddr.s_addr = htonl(leader); + dhtLog("leader = %s\n", inet_ntoa(tmpAddr)); + if (leader != 0) + { + setState(INIT2_STATE); + outBuffer[0] = JOIN_REQ; + write4(&outBuffer[1], myHostData.maxKeyCapacity); + udpSend(outBuffer, 5, leader); + } + else + { + electionOriginator = myHostData.ipAddr; + setState(ELECT1_STATE); + outBuffer[0] = ELECT_LEADER_CMD; + write4(&outBuffer[1], myHostData.ipAddr); //originator = me + udpSendAll(outBuffer, 5); + } + } + } + else + { + dhtLog("udpListen(): received %s from %s, %d bytes\n", + msg_types[inBuffer[0]], inet_ntoa(peerAddr.sin_addr), bytesRcvd); + for (i = 0; i < bytesRcvd; i++) + dhtLog(" %x", inBuffer[i]); + dhtLog("\n"); + peerIp = ntohl(peerAddr.sin_addr.s_addr); + switch (inBuffer[0]) + { + case INSERT_CMD: + if (state == NORMAL_STATE || state == LEAD_NORMAL1_STATE + || state == LEAD_NORMAL2_STATE || state == REBUILD4_STATE + || state == REBUILD5_STATE || state == LEAD_REBUILD3_STATE) + { + tmpKey = read4(&inBuffer[1]); + tmpVal = read4(&inBuffer[5]); + outBuffer[0] = INSERT_RES; + if (getKeyOwner(tmpKey) == myHostData.ipAddr) + { + if (chashInsert(myHashTable, tmpKey, (void *)tmpVal) == 0) + outBuffer[1] = OPERATION_OK; + else + outBuffer[1] = INTERNAL_ERROR; + } + else + { + outBuffer[1] = NOT_KEY_OWNER; + } + //reply to client socket + sendto(udpPollSock.fd, outBuffer, 2, 0, + (struct sockaddr *)&peerAddr, socklen); + } + break; + case REMOVE_CMD: + if (state == NORMAL_STATE || state == LEAD_NORMAL1_STATE + || state == LEAD_NORMAL2_STATE) + { + tmpKey = read4(&inBuffer[1]); + outBuffer[0] = REMOVE_RES; + if (getKeyOwner(tmpKey) == myHostData.ipAddr) + { + if (chashRemove(myHashTable, tmpKey) == 0) + outBuffer[1] = OPERATION_OK; + else + outBuffer[1] = KEY_NOT_FOUND; + } + else + { + outBuffer[1] = NOT_KEY_OWNER; + } + //reply to client socket + sendto(udpPollSock.fd, outBuffer, 2, 0, + (struct sockaddr *)&peerAddr, socklen); + } + break; + case SEARCH_CMD: + if (state == NORMAL_STATE || state == LEAD_NORMAL1_STATE + || state == LEAD_NORMAL2_STATE) + { + tmpKey = read4(&inBuffer[1]); + outBuffer[0] = SEARCH_RES; + if (getKeyOwner(tmpKey) == myHostData.ipAddr) + { + if ((tmpVal = (unsigned int)chashSearch(myHashTable, tmpKey)) != 0) + { + outBuffer[1] = OPERATION_OK; + write4(&outBuffer[2], tmpVal); + } + else + { + outBuffer[1] = KEY_NOT_FOUND; + write4(&outBuffer[2], 0); + } + } + else + { + outBuffer[1] = NOT_KEY_OWNER; + write4(&outBuffer[2], 0); + } + //reply to client socket + sendto(udpPollSock.fd, outBuffer, 6, 0, + (struct sockaddr *)&peerAddr, socklen); + } + break; + case WHO_IS_LEADER_CMD: + tmpHost.ipAddr = peerIp; + tmpHost.maxKeyCapacity = 0; + addHost(tmpHost); + writeHostList(); + outBuffer[0] = WHO_IS_LEADER_RES; + //leader == 0 means I don't know who it is + write4(&outBuffer[1], leader); + udpSend(outBuffer, 5, peerIp); + break; + case JOIN_REQ: + if (state == LEAD_NORMAL1_STATE || state == LEAD_NORMAL2_STATE) + { + tmpHost.ipAddr = peerIp; + tmpHost.maxKeyCapacity = read4(&inBuffer[1]); + addHost(tmpHost); + writeHostList(); + if (state == LEAD_NORMAL1_STATE) + setState(LEAD_NORMAL2_STATE); + outBuffer[0] = JOIN_RES; + outBuffer[1] = 0; //status, success + udpSend(outBuffer, 2, peerIp); + } + else if (state == LEAD_REBUILD1_STATE) + { + //note: I don't need to addHost(). + checkReplied(peerIp); + outBuffer[0] = JOIN_RES; + outBuffer[1] = 0; //status, success + udpSend(outBuffer, 2, peerIp); + if (allReplied()) + { + makeAssignments(); + setState(LEAD_REBUILD2_STATE); + outBuffer[0] = DHT_UPDATE_CMD; + write2(&outBuffer[1], numHosts); + write2(&outBuffer[3], numBlocks); + memcpy(&outBuffer[5], hostArray, numHosts*sizeof(struct hostData)); + memcpy(&outBuffer[5+numHosts*sizeof(struct hostData)], + blockOwnerArray, numBlocks*2); + udpSendAll(outBuffer, 5 + sizeof(struct hostData) * numHosts + + 2 * numBlocks); + } + } + break; + case JOIN_RES: + if (state == REBUILD1_STATE) + { + setState(REBUILD2_STATE); + } + else if (state == INIT2_STATE) + { + setState(NORMAL_STATE); + } + break; + case LEAVE_REQ: + if (state == LEAD_NORMAL1_STATE || state == LEAD_NORMAL2_STATE) + { //TODO: make this graceful, instead of just rebuilding + removeHost(peerIp); + if (state != LEAD_NORMAL2_STATE) + setState(LEAD_NORMAL2_STATE); + } + break; + case DHT_UPDATE_CMD: + if (state == REBUILD2_STATE && peerIp == leader) + { + free(hostArray); + free(blockOwnerArray); + numHosts = read2(&inBuffer[1]); + numBlocks = read2(&inBuffer[3]); + while (hostArraySize < numHosts) + hostArraySize *= 2; + hostArray = calloc(hostArraySize, sizeof(struct hostData)); + blockOwnerArray = calloc(numBlocks, 2); + memcpy(hostArray, &inBuffer[5], numHosts*sizeof(struct hostData)); + memcpy(blockOwnerArray, &inBuffer[5+numHosts*sizeof(struct hostData)], numBlocks*2); + writeHostList(); + setState(REBUILD3_STATE); + outBuffer[0] = DHT_UPDATE_RES; + udpSend(outBuffer, 1, peerIp); + } + break; + case DHT_UPDATE_RES: + if (state == LEAD_REBUILD2_STATE) + { + checkReplied(peerIp); + if (allReplied()) + { + setState(LEAD_REBUILD3_STATE); + outBuffer[0] = FILL_DHT_CMD; + udpSendAll(outBuffer, 1); + if (fillStatus != 0) + dhtLog("udpListen(): ERROR: fillTask already running\n"); + fillStatus = 1; + if (pthread_create(&threadFillTask, NULL, fillTask, NULL) != 0) + dhtLog("udpListen(): ERROR creating threadFillTask\n"); + } + } + break; + case ELECT_LEADER_CMD: + tmpUInt = read4(&inBuffer[1]); + if ((state == ELECT1_STATE || state == ELECT2_STATE) + && tmpUInt >= electionOriginator) + { //already participating in a higher-priority election + outBuffer[0] = ELECT_LEADER_RES; + outBuffer[1] = 0xFF; + udpSend(outBuffer, 2, peerIp); + } + else + { //join election + electionOriginator = tmpUInt; + electionParent = peerIp; + setState(ELECT1_STATE); + outBuffer[0] = ELECT_LEADER_CMD; + write4(&outBuffer[1], electionOriginator); + //don't bother forwarding the message to originator or parent + checkReplied(electionOriginator); + checkReplied(electionParent); + if (allReplied()) + { //in case that is everybody I know of + setState(ELECT2_STATE); + outBuffer[0] = ELECT_LEADER_RES; + outBuffer[1] = 0; + write2(&outBuffer[2], numHosts); + memcpy(&outBuffer[4], hostArray, sizeof(struct hostData) + * numHosts); + udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts, + electionParent); + } + else + { + udpSendAll(outBuffer, 5); + } + } + break; + case ELECT_LEADER_RES: + if (state == ELECT1_STATE) + { + checkReplied(peerIp); + if (inBuffer[1] != 0xFF) + { + tmpUShort = read2(&inBuffer[2]); + hostDataPtr = (struct hostData *)&inBuffer[4]; + for (i = 0; i < tmpUShort; i++) + addHost(hostDataPtr[i]); + writeHostList(); + } + if (allReplied()) + { + setState(ELECT2_STATE); + if (electionOriginator == myHostData.ipAddr) + { + leader = hostArray[0].ipAddr; + if (leader == myHostData.ipAddr) + { //I am the leader + dhtLog("I am the leader!\n"); + setState(LEAD_REBUILD1_STATE); + outBuffer[0] = REBUILD_CMD; + udpSendAll(outBuffer, 1); + } + else + { //notify leader + outBuffer[0] = CONGRATS_CMD; + write2(&outBuffer[1], numHosts); + hostDataPtr = (struct hostData *)&outBuffer[3]; + for (i = 0; i < numHosts; i++) + hostDataPtr[i] = hostArray[i]; + udpSend(outBuffer, 3 + sizeof(struct hostData) * numHosts, + leader); + } + } + else + { + outBuffer[0] = ELECT_LEADER_RES; + outBuffer[1] = 0; + write2(&outBuffer[2], numHosts); + hostDataPtr = (struct hostData *)&outBuffer[4]; + for (i = 0; i < numHosts; i++) + hostDataPtr[i] = hostArray[i]; + udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts, + electionParent); + } + } + } + break; + case CONGRATS_CMD: + if (state == ELECT2_STATE) + { //I am the leader + leader = myHostData.ipAddr; + dhtLog("I am the leader!\n"); + tmpUShort = read2(&inBuffer[1]); + hostDataPtr = (struct hostData *)&inBuffer[3]; + for (i = 0; i < tmpUShort; i++) + addHost(hostDataPtr[i]); + writeHostList(); + setState(LEAD_REBUILD1_STATE); + outBuffer[0] = REBUILD_CMD; + udpSendAll(outBuffer, 1); + } + break; + case REBUILD_REQ: + if (state == LEAD_NORMAL1_STATE || state == LEAD_NORMAL2_STATE) + { + setState(LEAD_REBUILD1_STATE); + outBuffer[0] = REBUILD_CMD; + udpSendAll(outBuffer, 1); + } + break; + case REBUILD_CMD: + leader = peerIp; //consider this a declaration of authority + setState(REBUILD1_STATE); + outBuffer[0] = JOIN_REQ; + write4(&outBuffer[1], myHostData.maxKeyCapacity); + udpSend(outBuffer, 5, leader); + break; + case FILL_DHT_CMD: + if (state == REBUILD3_STATE && peerIp == leader) + { + setState(REBUILD4_STATE); + if (fillStatus != 0) + dhtLog("udpListen(): ERROR: fillTask already running\n"); + fillStatus = 1; + if (pthread_create(&threadFillTask, NULL, fillTask, NULL) != 0) + dhtLog("udpListen(): ERROR creating threadFillTask\n"); + } + break; + case FILL_DHT_RES: + if (state == LEAD_REBUILD3_STATE) + { + checkReplied(peerIp); + if (allReplied() && fillStatus == 2) + { + fillStatus = 0; + setState(LEAD_REBUILD4_STATE); + outBuffer[0] = RESUME_NORMAL_CMD; + udpSendAll(outBuffer, 1); + } + } + break; + case RESUME_NORMAL_CMD: + if (state == REBUILD5_STATE && peerIp == leader) + { + setState(NORMAL_STATE); + outBuffer[0] = RESUME_NORMAL_RES; + udpSend(outBuffer, 1, leader); + } + break; + case RESUME_NORMAL_RES: + if (state == LEAD_REBUILD4_STATE) + { + checkReplied(peerIp); + if (allReplied()) + { + setState(LEAD_NORMAL1_STATE); + } + } + break; + } + } + } + if (state == REBUILD4_STATE) + { + switch (fillStatus) + { + case 0: dhtLog("udpListen(): ERROR: fillStatus=0 in REBUILD4_STATE\n"); + break; + case 1: //do nothing + break; + case 2: //done filling the dht, notify leader + fillStatus = 0; + setState(REBUILD5_STATE); + outBuffer[0] = FILL_DHT_RES; + udpSend(outBuffer, 1, leader); + break; + case 3: //error encountered -> restart rebuild + fillStatus = 0; + setState(REBUILD0_STATE); + outBuffer[0] = REBUILD_REQ; + udpSend(outBuffer, 1, leader); + break; + } + } + if (state == LEAD_REBUILD3_STATE) + { + switch (fillStatus) + { + case 0: dhtLog("udpListen(): ERROR: fillStatus=0 in LEAD_REBUILD3_STATE\n"); + break; + case 1: //do nothing + break; + case 2: //I'm done, now is everybody else also done? + if (allReplied()) + { + fillStatus = 0; + setState(LEAD_REBUILD4_STATE); + outBuffer[0] = RESUME_NORMAL_CMD; + udpSendAll(outBuffer, 1); + } + break; + case 3: //error encountered -> restart rebuild + fillStatus = 0; + setState(LEAD_REBUILD1_STATE); + outBuffer[0] = REBUILD_CMD; + udpSendAll(outBuffer, 1); + break; + } + } + if (timerSet) + { + gettimeofday(&now, NULL); + if (timercmp(&now, &timer, >)) + { + if (timeoutCntr < retry_vals[state]) + { + timeoutCntr++; + timeradd(&now, &timeout_vals[state], &timer); + dhtLog("udpListen(): retry: %d\n", timeoutCntr); + switch (state) + { + case INIT1_STATE: + outBuffer[0] = WHO_IS_LEADER_CMD; + udpSend(outBuffer, 1, seed); + break; + case INIT2_STATE: + outBuffer[0] = JOIN_REQ; + write4(&outBuffer[1], myHostData.maxKeyCapacity); + udpSend(outBuffer, 5, leader); + break; + case ELECT1_STATE: + outBuffer[0] = ELECT_LEADER_CMD; + write4(&outBuffer[1], electionOriginator); + udpSendAll(outBuffer, 5); + break; + case ELECT2_STATE: + if (electionOriginator == myHostData.ipAddr) + { //retry notify leader + outBuffer[0] = CONGRATS_CMD; + write2(&outBuffer[1], numHosts); + memcpy(&outBuffer[3], hostArray, sizeof(struct hostData) + * numHosts); + udpSend(outBuffer, 3 + sizeof(struct hostData) * numHosts, + leader); + } + else + { + outBuffer[0] = ELECT_LEADER_RES; + outBuffer[1] = 0; + write2(&outBuffer[2], numHosts); + memcpy(&outBuffer[4], hostArray, sizeof(struct hostData) + * numHosts); + udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts, + electionParent); + } + break; + case REBUILD0_STATE: + outBuffer[0] = REBUILD_REQ; + udpSend(outBuffer, 1, leader); + break; + case REBUILD1_STATE: + outBuffer[0] = JOIN_REQ; + write4(&outBuffer[1], myHostData.maxKeyCapacity); + udpSend(outBuffer, 5, leader); + break; + case REBUILD5_STATE: + outBuffer[0] = FILL_DHT_RES; + udpSend(outBuffer, 1, leader); + break; + case LEAD_REBUILD1_STATE: + outBuffer[0] = REBUILD_CMD; + udpSendAll(outBuffer, 1); + break; + case LEAD_REBUILD2_STATE: + outBuffer[0] = DHT_UPDATE_CMD; + write2(&outBuffer[1], numHosts); + write2(&outBuffer[3], numBlocks); + memcpy(&outBuffer[5], hostArray, numHosts + * sizeof(struct hostData)); + memcpy(&outBuffer[5+numHosts*sizeof(struct hostData)], + blockOwnerArray, numBlocks*2); + udpSendAll(outBuffer, 5 + sizeof(struct hostData) * numHosts + + 2 * numBlocks); + break; + case LEAD_REBUILD3_STATE: + outBuffer[0] = FILL_DHT_CMD; + udpSendAll(outBuffer, 1); + break; + case LEAD_REBUILD4_STATE: + outBuffer[0] = RESUME_NORMAL_CMD; + udpSendAll(outBuffer, 1); + break; + case EXIT1_STATE: //TODO... + break; + case NORMAL_STATE: + case LEAD_NORMAL1_STATE: + case LEAD_NORMAL2_STATE: + case REBUILD2_STATE: + case REBUILD3_STATE: + case REBUILD4_STATE: + case EXIT2_STATE: //we shouldn't get here + break; + } + } + else + { + dhtLog("udpListen(): timed out in state %s after %d retries\n", + state_names[state], timeoutCntr); + switch (state) + { + case INIT1_STATE: + setState(EXIT2_STATE); + break; + case LEAD_NORMAL2_STATE: + setState(LEAD_REBUILD1_STATE); + outBuffer[0] = REBUILD_CMD; + udpSendAll(outBuffer, 1); + break; + case ELECT1_STATE: + dhtLog("removing unresponsive hosts, before:\n"); + writeHostList(); + removeUnresponsiveHosts(); + dhtLog("after\n"); + writeHostList(); + setState(ELECT2_STATE); + if (electionOriginator == myHostData.ipAddr) + { + leader = hostArray[0].ipAddr; + if (leader == myHostData.ipAddr) + { //I am the leader + dhtLog("I am the leader!\n"); + setState(LEAD_REBUILD1_STATE); + outBuffer[0] = REBUILD_CMD; + udpSendAll(outBuffer, 1); + } + else + { //notify leader + outBuffer[0] = CONGRATS_CMD; + write2(&outBuffer[1], numHosts); + memcpy(&outBuffer[3], hostArray, sizeof(struct hostData) + * numHosts); + udpSend(outBuffer, 3 + sizeof(struct hostData) * numHosts, + leader); + } + } + else + { + outBuffer[0] = ELECT_LEADER_RES; + outBuffer[1] = 0; + write2(&outBuffer[2], numHosts); + memcpy(&outBuffer[4], hostArray, sizeof(struct hostData) + * numHosts); + udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts, + electionParent); + } + break; + case INIT2_STATE: + case ELECT2_STATE: + case REBUILD0_STATE: + case REBUILD1_STATE: + case REBUILD2_STATE: + case REBUILD3_STATE: + case REBUILD4_STATE: + case REBUILD5_STATE: + case LEAD_REBUILD1_STATE: + case LEAD_REBUILD2_STATE: + case LEAD_REBUILD3_STATE: + case LEAD_REBUILD4_STATE: + //start election + electionOriginator = myHostData.ipAddr; + setState(ELECT1_STATE); + outBuffer[0] = ELECT_LEADER_CMD; + write4(&outBuffer[1], myHostData.ipAddr); //originator = me + udpSendAll(outBuffer, 5); + break; + case EXIT1_STATE: + setState(EXIT2_STATE); + break; + case NORMAL_STATE: + case LEAD_NORMAL1_STATE: + case EXIT2_STATE: //we shouldn't get here + break; + } + } + } + } + if (state != oldState) + pthread_cond_broadcast(&stateCond); + pthread_mutex_unlock(&stateMutex); + } +} diff --git a/Robust/src/Runtime/DSTM/interface/dht.h b/Robust/src/Runtime/DSTM/interface/dht.h index ceeb2a0a..344ab530 100644 --- a/Robust/src/Runtime/DSTM/interface/dht.h +++ b/Robust/src/Runtime/DSTM/interface/dht.h @@ -1,12 +1,20 @@ #ifndef _DHT_H #define _DHT_H -//#define SIMPLE_DHT +#include + +/******************************************************************************* +* Local Structs +*******************************************************************************/ #define DHT_NO_KEY_LIMIT 0xFFFFFFFF +/******************************************************************************* +* Interface Function Prototypes +*******************************************************************************/ + //called by host which joins (or starts) the system -void dhtInit(unsigned int maxKeyCapaciy); +void dhtInit(unsigned int seedIp, unsigned int maxKeyCapaciy); //exit system, cleanup void dhtExit(); @@ -14,11 +22,17 @@ void dhtExit(); //returns 0 if successful, -1 if an error occurred int dhtInsert(unsigned int key, unsigned int val); +//simultaneously inserts the key-val pairs in the given arrays +int dhtInsertMult(unsigned int numKeys, unsigned int *keys, unsigned int *vals); //returns 0 if successful, -1 if an error occurred int dhtRemove(unsigned int key); +//simultaneously delete the keys in the given array +int dhtRemoveMult(unsigned int numKeys, unsigned int *keys); //returns 0 if successful and copies val into *val, // 1 if key not found, -1 if an error occurred int dhtSearch(unsigned int key, unsigned int *val); - +//simultaneously search for the vals that correspond to the given keys. +// result is placed in vals[] +int dhtSearchMult(unsigned int numKeys, unsigned int *keys, unsigned int *vals); #endif diff --git a/Robust/src/Runtime/DSTM/interface/mlookup.c b/Robust/src/Runtime/DSTM/interface/mlookup.c index 8bc046dc..2c952638 100644 --- a/Robust/src/Runtime/DSTM/interface/mlookup.c +++ b/Robust/src/Runtime/DSTM/interface/mlookup.c @@ -187,3 +187,35 @@ unsigned int mhashResize(unsigned int newsize) { return 0; } +unsigned int *mhashGetKeys(unsigned int *numKeys) +{ + unsigned int *keys; + int i, keyindex; + mhashlistnode_t *curr; + + pthread_mutex_lock(&mlookup.locktable); + + *numKeys = mlookup.numelements; + keys = calloc(*numKeys, sizeof(unsigned int)); + + keyindex = 0; + for (i = 0; i < mlookup.size; i++) + { + if (mlookup.table[i].key != 0) + { + curr = &mlookup.table[i]; + while (curr != NULL) + { + keys[keyindex++] = curr->key; + curr = curr->next; + } + } + } + + if (keyindex != *numKeys) + printf("mhashGetKeys(): WARNING: incorrect mlookup.numelements value!\n"); + + pthread_mutex_unlock(&mlookup.locktable); + return keys; +} + diff --git a/Robust/src/Runtime/DSTM/interface/mlookup.h b/Robust/src/Runtime/DSTM/interface/mlookup.h index f3b0159d..85396c52 100644 --- a/Robust/src/Runtime/DSTM/interface/mlookup.h +++ b/Robust/src/Runtime/DSTM/interface/mlookup.h @@ -28,6 +28,8 @@ unsigned mhashInsert(unsigned int key, void *val); void *mhashSearch(unsigned int key); //returns val, NULL if not found unsigned int mhashRemove(unsigned int key); //returns -1 if not found unsigned int mhashResize(unsigned int newsize); +unsigned int *mhashGetKeys(unsigned int *numKeys); +void mhashPrint(); #endif diff --git a/Robust/src/Runtime/DSTM/interface/testdht.c b/Robust/src/Runtime/DSTM/interface/testdht.c index 636b4dd4..acc86a0e 100644 --- a/Robust/src/Runtime/DSTM/interface/testdht.c +++ b/Robust/src/Runtime/DSTM/interface/testdht.c @@ -2,7 +2,7 @@ #include "dht.h" #include "clookup.h" -#define NUM_ITEMS 1000 +#define NUM_ITEMS 100000 int main() { @@ -13,7 +13,7 @@ int main() int error; chashtable_t *localHash; - dhtInit(DHT_NO_KEY_LIMIT); + dhtInit(0x80C3AF45, DHT_NO_KEY_LIMIT); localHash = chashCreate(HASH_SIZE, LOADFACTOR); srandom(time(0)); @@ -23,6 +23,8 @@ int main() vals[key] = random(); } + sleep(5); + printf("testing dhtInsert() and dhtSearch()\n"); for (key = 0; key < NUM_ITEMS; key++)