DHT: Somewhat functional. Added a function to mlookup allowing the dht to retrieve...
authorerubow <erubow>
Sat, 4 Aug 2007 00:40:11 +0000 (00:40 +0000)
committererubow <erubow>
Sat, 4 Aug 2007 00:40:11 +0000 (00:40 +0000)
Robust/src/Runtime/DSTM/interface/clookup.c
Robust/src/Runtime/DSTM/interface/dht.c
Robust/src/Runtime/DSTM/interface/dht.h
Robust/src/Runtime/DSTM/interface/mlookup.c
Robust/src/Runtime/DSTM/interface/mlookup.h
Robust/src/Runtime/DSTM/interface/testdht.c

index aa01469829a04de04e6d6a72e329dfe20bbcbf94..ca7d34c9f5299b8e2b35fb631b13a7a48829d0f0 100644 (file)
@@ -57,7 +57,7 @@ unsigned int chashInsert(chashtable_t *table, unsigned int key, void *val) {
                        return 1;
                }
                node->key = key;
-               node->val = val ;
+               node->val = val;
                node->next = ptr[index].next;
                ptr[index].next = node;
        }
index 2b956665b89ad367c7f3f3ee0169506fc099a1c0..e0ef4445d97a377acf393a4dcf4a26013ee27c76 100644 (file)
@@ -1,46 +1,24 @@
-#include "dht.h"
-
-#ifdef SIMPLE_DHT
-
-#include <arpa/inet.h>
-
-#define NUM_HOSTS 4
-#define OIDS_PER_HOST 0x40000000
-
-//set these to your IP addresses
-unsigned int hosts[NUM_HOSTS] = {
-       0xc0a802c8,
-       0xc0a802c9,
-       0xc0a802ca,
-       0xc0a802cb,
-};
-
-//does nothing
-void dhtInit(unsigned int maxKeyCapaciy)
-{      return;}
-
-//does nothing
-void dhtExit()
-{      return;}
-
-//does nothing, returns 0
-int dhtInsert(unsigned int key, unsigned int val)
-{      return 0;}
-
-//does nothing, returns 0
-int dhtRemove(unsigned int key)
-{      return 0;}
-
-//returns 0 if successful and copies val into *val,
-// 1 if key not found, -1 if an error occurred
-int dhtSearch(unsigned int key, unsigned int *val)
-{
-       *val = hosts[key / OIDS_PER_HOST];
-       return 0;
-}
-
-#else
-
+/*******************************************************************************
+*                                 dht.c
+*
+*  High-performance Distributed Hash Table for finding the location of objects
+* in a Distributed Shared Transactional Memory system.
+*
+* Creator: Erik Rubow
+*
+* TODO:
+* 1) Instead of having dhtInsertMult, dhtSearchMult, etc. call their single-key
+* counterparts repeatedly, define some new messages to handle it more
+* efficiently.
+* 2) Improve the efficiency of functions that work with hostArray, hostReplied,
+* and blockOwnerArray.
+* 3) Currently a join or leave causes a rebuild of the entire hash table.
+* Implement more graceful join and leave procedures.
+* 4) Fine tune timeout values for performance, possibly implement a backoff
+* algorithm to prevent overloading the network.
+* 5) Whatever else I'm forgetting
+*
+*******************************************************************************/
 /*******************************************************************************
 *                              Includes
 *******************************************************************************/
@@ -62,120 +40,125 @@ int dhtSearch(unsigned int key, unsigned int *val)
 #include <net/if.h>
 #include <linux/sockios.h>
 #include <sys/time.h>
+#include <sys/queue.h>
+#include "dht.h"
 #include "clookup.h" //this works for now, do we need anything better?
+#include "mlookup.h"
 
 /*******************************************************************************
 *                           Local Defines, Structs
 *******************************************************************************/
 
-#define BUFFER_SIZE 512 //maximum message size
+#define MAX_MSG_SIZE 1500
 #define UDP_PORT 2157
-#define TCP_PORT 2157
-#define BACKLOG 10 //max pending tcp connections
-#define TIMEOUT_MS 500
-#define MAX_RETRIES 3
-#define INIT_HOST_ALLOC 1
-#define INIT_BLOCK_NUM 1
+#define INIT_HOST_ALLOC 3
+#define INIT_NUM_BLOCKS 16
 #define DEFAULT_INTERFACE "eth0"
-#define DHT_LOG "dht.log"
-
-//make sure this is consistent with enum below
-#define NUM_MSG_TYPES 20
-
+#define TIMEOUT_PERIOD 100
+#define INSERT_TIMEOUT_MS 500
+#define INSERT_RETRIES 50
+#define REMOVE_TIMEOUT_MS 500
+#define REMOVE_RETRIES 50
+#define SEARCH_TIMEOUT_MS 500
+#define SEARCH_RETRIES 50
+
+//message types
 //make sure this matches msg_types global var
-enum {
+enum
+{
        INSERT_CMD,
        INSERT_RES,
        REMOVE_CMD,
        REMOVE_RES,
        SEARCH_CMD,
        SEARCH_RES,
-       FIND_LEADER_REQ,
-       FIND_LEADER_RES,
-       REBUILD_REQ,
-       REBUILD_RES,
-       NOT_LEADER,
-       REBUILD_CMD,
+       WHO_IS_LEADER_CMD,
+       WHO_IS_LEADER_RES,
        JOIN_REQ,
        JOIN_RES,
-       GET_DHT_INFO_CMD,
-       DHT_INFO_REQ,
-       DHT_INFO_RES,
+       LEAVE_REQ,
+       LEAVE_RES,
+       DHT_UPDATE_CMD,
+       DHT_UPDATE_RES,
+       ELECT_LEADER_CMD,
+       ELECT_LEADER_RES,
+       CONGRATS_CMD,
+       REBUILD_REQ,
+       REBUILD_CMD,
        FILL_DHT_CMD,
        FILL_DHT_RES,
-       REBUILD_DONE_INFO
+       RESUME_NORMAL_CMD,
+       RESUME_NORMAL_RES,
+       NUM_MSG_TYPES
 };
 
-//status codes
-enum {
-       INSERT_OK,
-       INSERT_ERROR,
-       REMOVE_OK,
-       REMOVE_ERROR,
-       KEY_FOUND,
-       KEY_NOT_FOUND,
-       NOT_KEY_OWNER,
-};
-
-enum {
+//states
+//make sure this matches state_names, timeout_vals, and retry_vals global vars
+enum
+{
+       INIT1_STATE,
+       INIT2_STATE,
        NORMAL_STATE,
+       LEAD_NORMAL1_STATE,
+       LEAD_NORMAL2_STATE,
+       ELECT1_STATE,
+       ELECT2_STATE,
+       REBUILD0_STATE,
        REBUILD1_STATE,
        REBUILD2_STATE,
        REBUILD3_STATE,
-       LEAD_NORMAL_STATE,
+       REBUILD4_STATE,
+       REBUILD5_STATE,
        LEAD_REBUILD1_STATE,
        LEAD_REBUILD2_STATE,
-       LEAD_REBUILD3_STATE
-};
-
-struct hostData {
-       unsigned int ipAddr;
-       unsigned int maxKeyCapacity;
-};
-
-struct insertCmd {
-       unsigned int msgType:8;
-       unsigned int unused:24;
-       unsigned int key;
-       unsigned int val;
-};
-
-struct removeCmd {
-       unsigned int msgType:8;
-       unsigned int unused:24;
-       unsigned int key;
-};
-
-struct searchCmd {
-       unsigned int msgType:8;
-       unsigned int unused:24;
-       unsigned int key;
+       LEAD_REBUILD3_STATE,
+       LEAD_REBUILD4_STATE,
+       EXIT1_STATE,
+       EXIT2_STATE,
+       NUM_STATES
 };
 
-struct insertRes {
-       unsigned int msgType:8;
-       unsigned int unused:24;
-       unsigned int status;
+//status codes
+enum
+{
+       OPERATION_OK,
+       KEY_NOT_FOUND,
+       NOT_KEY_OWNER,
+       NOT_LEADER,
+       INTERNAL_ERROR
 };
 
-struct removeRes {
-       unsigned int msgType:8;
-       unsigned int unused:24;
-       unsigned int status;
+struct hostData
+{
+       unsigned int ipAddr;
+       unsigned int maxKeyCapacity;
 };
 
-struct searchRes {
-       unsigned int msgType:8;
-       unsigned int unused:24;
-       unsigned int status;
-       unsigned int val;
-};
+/*******************************************************************************
+*                         Local Function Prototypes
+*******************************************************************************/
 
-struct joinReq {
-       unsigned int msgType:8;
-       unsigned int unused:24;
-       struct hostData newHostData;
-};
+int msgSizeOk(unsigned char *msg, unsigned int size);
+unsigned short read2(unsigned char *msg);
+unsigned int read4(unsigned char *msg);
+void write2(unsigned char *ptr, unsigned short tmp);
+void write4(unsigned char *ptr, unsigned int tmp);
+unsigned int getMyIpAddr(const char *interfaceStr);
+int udpSend(unsigned char *msg, unsigned int size, unsigned int destIp);
+int udpSendAll(unsigned char *msg, unsigned int size);
+unsigned int hash(unsigned int x);
+unsigned int getKeyOwner(unsigned int key);
+void setState(unsigned int newState);
+void makeAssignments();
+int addHost(struct hostData newHost);
+int removeHost(unsigned int ipAddr);
+void removeUnresponsiveHosts();
+int checkReplied(unsigned int ipAddr);
+int allReplied();
+void writeHostList();
+void dhtLog(const char *format, ...);
+void *fillTask();
+void *udpListen();
 
 /*******************************************************************************
 *                           Global Variables
@@ -190,751 +173,578 @@ const char *msg_types[NUM_MSG_TYPES] =
        "REMOVE_RES",
        "SEARCH_CMD",
        "SEARCH_RES",
-       "FIND_LEADER_REQ",
-       "FIND_LEADER_RES",
-       "REBUILD_REQ",
-       "REBUILD_RES",
-       "NOT_LEADER",
-       "REBUILD_CMD",
+       "WHO_IS_LEADER_CMD",
+       "WHO_IS_LEADER_RES",
        "JOIN_REQ",
        "JOIN_RES",
-       "GET_DHT_INFO_CMD",
-       "DHT_INFO_REQ",
-       "DHT_INFO_RES",
+       "LEAVE_REQ",
+       "LEAVE_RES",
+       "DHT_UPDATE_CMD",
+       "DHT_UPDATE_RES",
+       "ELECT_LEADER_CMD",
+       "ELECT_LEADER_RES",
+       "CONGRATS_CMD",
+       "REBUILD_REQ",
+       "REBUILD_CMD",
        "FILL_DHT_CMD",
        "FILL_DHT_RES",
-       "REBUILD_DONE_INFO"
+       "RESUME_NORMAL_CMD",
+       "RESUME_NORMAL_RES"
+};
+
+const char *state_names[NUM_STATES] =
+{
+       "INIT1_STATE",
+       "INIT2_STATE",
+       "NORMAL_STATE",
+       "LEAD_NORMAL1_STATE",
+       "LEAD_NORMAL2_STATE",
+       "ELECT1_STATE",
+       "ELECT2_STATE",
+       "REBUILD0_STATE",
+       "REBUILD1_STATE",
+       "REBUILD2_STATE",
+       "REBUILD3_STATE",
+       "REBUILD4_STATE",
+       "REBUILD5_STATE",
+       "LEAD_REBUILD1_STATE",
+       "LEAD_REBUILD2_STATE",
+       "LEAD_REBUILD3_STATE",
+       "LEAD_REBUILD4_STATE",
+       "EXIT1_STATE",
+       "EXIT2_STATE",
+};
+
+//note: { 0, 0 } means no timeout
+struct timeval timeout_vals[NUM_STATES] =
+{
+       { 0, 500000 }, //INIT1_STATE
+       { 0, 500000 }, //INIT2_STATE
+       { 0, 0 }, //NORMAL_STATE
+       { 0, 0 }, //LEAD_NORMAL1_STATE
+       { 3, 0 }, //LEAD_NORMAL2_STATE
+       { 1, 0 }, //ELECT1_STATE
+       { 1, 0 }, //ELECT2_STATE
+       { 0, 500000 }, //REBUILD0_STATE
+       { 0, 500000 }, //REBUILD1_STATE
+       { 10, 0 }, //REBUILD2_STATE
+       { 10, 0 }, //REBUILD3_STATE
+       { 10, 0 }, //REBUILD4_STATE
+       { 1, 0 }, //REBUILD5_STATE
+       { 1, 0 }, //LEAD_REBUILD1_STATE
+       { 1, 0 }, //LEAD_REBUILD2_STATE
+       { 10, 0 }, //LEAD_REBUILD3_STATE
+       { 10, 0 }, //LEAD_REBUILD4_STATE
+       { 0, 500000 }, //EXIT1_STATE
+       { 0, 0 } //EXIT2_STATE
+};
+
+int retry_vals[NUM_STATES] =
+{
+       100, //INIT1_STATE
+       10, //INIT2_STATE
+       0, //NORMAL_STATE
+       0, //LEAD_NORMAL1_STATE
+       0, //LEAD_NORMAL2_STATE
+       10, //ELECT1_STATE
+       10, //ELECT2_STATE
+       10, //REBUILD0_STATE
+       10, //REBUILD1_STATE
+       0, //REBUILD2_STATE
+       0, //REBUILD3_STATE
+       0, //REBUILD4_STATE
+       10, //REBUILD5_STATE
+       10, //LEAD_REBUILD1_STATE
+       10, //LEAD_REBUILD2_STATE
+       10, //LEAD_REBUILD3_STATE
+       10, //LEAD_REBUILD4_STATE
+       10, //EXIT1_STATE
+       0 //EXIT2_STATE
 };
 
 FILE *logfile;
-//ip address of leader
-unsigned int leader;
-//set by dhtInit()
 struct hostData myHostData;
-//number of hosts in the system
-unsigned int numHosts;
-//ip address and max key capacity of each host
-struct hostData *hostArray;
-//memory allocated for this many items in hostArray
-unsigned int hostArraySize;
-//number of keyspace divisions, preferably a power of 2 > numHosts
-unsigned int numBlocks;
-//this array has numBlocks elements, each of which contains an index to hostArray
-// the key owner is found by hashing the key into one of these blocks and using this
-// array to find the corresponding host in hostArray
-unsigned int *blockOwnerArray;
-//used by leader to track which hosts have responded, etc.
-unsigned int *hostRebuildStates;
-//thread handles
 pthread_t threadUdpListen;
-pthread_t threadTcpListen;
-//server sockets
-struct pollfd udpServerPollSock;
-int tcpListenSock;
-//see above for enumeration of states
-int state;
-
-/*******************************************************************************
-*                         Local Function Prototypes
-*******************************************************************************/
-
-//log funtion, use like printf()
-void dhtLog(const char *format, ...);
-//return my IP address
-unsigned int getMyIpAddr();
-//sends broadcast to discover leader
-unsigned int findLeader();
-//UDP server
-void *udpListen();
-//TCP server
-void *tcpListen();
-//TCP connection handler
-void *tcpAccept(void *);
-//returns number of bytes received in resBuffer, or -1 if an error occurred
-int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port,
-       void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize,
-       unsigned int timeout, unsigned int numRetries);
-//returns number of bytes received in resBuffer, or -1 if an error occurred
-int udpBroadcastWaitForResponse(unsigned int *reply_ip,
-       unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer,
-       unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries);
-//just UDP it
-int sendNoWait(unsigned int dest_ip, unsigned short dest_port, void *msg,
-       unsigned int msglen);
-//right now this hashes the key into a block and returns the block owner
-unsigned int getKeyOwner(unsigned int key);
-//simple hash
-unsigned int hash(unsigned int x);
-//sends REBUILD_REQ to leader, retries until leader responds, or causes new leader to be chosen
-void initRebuild();
-//adds entry to end of hostArray, increments numHosts,
-// allocates more space if necessary
-void addHost(struct hostData newHost);
-//initiates TCP connection with leader, gets DHT data
-int getDHTdata();
-//outputs readable DHT data to outfile
-void writeDHTdata(FILE *outfile);
-void clearDHTdata();
-void initDHTdata();
-void makeAssignments();
-//returns not-zero if ok, zero if not ok
-int msgSizeOk(unsigned char type, unsigned int size);
+pthread_t threadFillTask;
+//status of fillTask: 0 = ready to run, 1 = running, 2 = completed, 3 = error
+int fillStatus;
+struct pollfd udpPollSock;
+unsigned int state;
+unsigned int seed;
+unsigned int leader;
+unsigned int electionOriginator;
+unsigned int electionParent;
+unsigned int hostArraySize = 0;
+struct hostData *hostArray = NULL;
+unsigned int numBlocks = 0;
+unsigned short *blockOwnerArray = NULL;
+unsigned char *hostReplied = NULL;
+pthread_mutex_t stateMutex;
+pthread_cond_t stateCond;
+chashtable_t *myHashTable;
+unsigned int numHosts;
+struct timeval timer;
+int timerSet;
+int timeoutCntr;
 
 /*******************************************************************************
-*                      Global Function Definitions
+*                      Interface Function Definitions
 *******************************************************************************/
 
-void dhtInit(unsigned int maxKeyCapacity)
+void dhtInit(unsigned int seedIpAddr, unsigned int maxKeyCapacity)
 {
-       unsigned int myMessage;
-       int bytesReceived;
-       int i;
-       int ret;
+       struct in_addr tmpAddr;
+       char filename[23] = "dht-";
+       struct sockaddr_in myAddr;
+       struct sockaddr_in seedAddr;
+       socklen_t socklen = sizeof(struct sockaddr_in);
+       char initMsg;
 
-       logfile = fopen(DHT_LOG, "w");
-       dhtLog("dhtInit() - initializing...\n");
+       tmpAddr.s_addr = htonl(getMyIpAddr(DEFAULT_INTERFACE));
+       strcat(filename, inet_ntoa(tmpAddr));
+       strcat(filename, ".log");
+       printf("log file: %s\n", filename);
 
-       myHostData.ipAddr = getMyIpAddr();
+       logfile = fopen(filename, "w");
+       dhtLog("dhtInit(): inializing...\n");
+
+       myHostData.ipAddr = getMyIpAddr(DEFAULT_INTERFACE);
        myHostData.maxKeyCapacity = maxKeyCapacity;
 
-       numHosts = numBlocks = hostArraySize = 0;
-       hostArray = NULL;
-       blockOwnerArray = NULL;
-       hostRebuildStates = NULL;
+       seed = seedIpAddr;
+       leader = 0;
+       electionOriginator = 0;
+       electionParent = 0;
+       hostArraySize = INIT_HOST_ALLOC;
+       hostArray = calloc(hostArraySize, sizeof(struct hostData));
+       hostReplied = calloc(hostArraySize, sizeof(unsigned char));
+       hostArray[0] = myHostData;
+       numHosts = 1;
+       numBlocks = INIT_NUM_BLOCKS;
+       blockOwnerArray = calloc(numBlocks, sizeof(unsigned short));
+       pthread_mutex_init(&stateMutex, NULL);
+       pthread_cond_init(&stateCond, NULL);
+       myHashTable = chashCreate(HASH_SIZE, LOADFACTOR);
+
+       udpPollSock.fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
+       if (udpPollSock.fd < 0)
+               perror("dhtInit():socket()");
+
+       udpPollSock.events = POLLIN;
+       
+       bzero(&myAddr, socklen);
+       myAddr.sin_family = AF_INET;
+       myAddr.sin_addr.s_addr = htonl(INADDR_ANY);
+       myAddr.sin_port = htons(UDP_PORT);
 
-       state = NORMAL_STATE;
+       if (bind(udpPollSock.fd, (struct sockaddr *)&myAddr, socklen) < 0)
+               perror("dhtInit():bind()");
 
-       pthread_create(&threadUdpListen, NULL, udpListen, NULL);
-       pthread_create(&threadTcpListen, NULL, tcpListen, NULL);
+       if (seed == 0)
+       {
+               dhtLog("I am the leader\n");
+               leader = myHostData.ipAddr;
+               setState(LEAD_NORMAL1_STATE);
+       }
+       else
+       {
+               initMsg = WHO_IS_LEADER_CMD;
+               udpSend(&initMsg, 1, seed);
+               setState(INIT1_STATE);
+       }
 
-       initRebuild();
+       if (pthread_create(&threadUdpListen, NULL, udpListen, NULL) != 0)
+               dhtLog("dhtInit() - ERROR creating threadUdpListen\n");
 
        return;
 }
 
 void dhtExit()
-{
+{ //TODO: do this gracefully, wait for response from leader, etc.
+       char msg;
+
+       msg = LEAVE_REQ;
+       udpSend(&msg, 1, leader);
        dhtLog("dhtExit(): cleaning up...\n");
-       fclose(logfile);
        pthread_cancel(threadUdpListen);
-       pthread_cancel(threadTcpListen);
-       close(udpServerPollSock.fd);
-       close(tcpListenSock);
-       clearDHTdata();
+       close(udpPollSock.fd);
+       free(hostArray);
+       free(hostReplied);
+       free(blockOwnerArray);
+       fclose(logfile);
+
+       return;
 }
 
 int dhtInsert(unsigned int key, unsigned int val)
 {
-       unsigned int dest_ip = getKeyOwner(key);
-       struct insertCmd myMessage;
-       struct insertRes response;
-       int bytesReceived;
-
-       myMessage.msgType = INSERT_CMD;
-       myMessage.key = key;
-       myMessage.val = val;
-       
-       bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage,
-               sizeof(struct insertCmd), (void *)&response, sizeof(struct insertRes),
-               TIMEOUT_MS, MAX_RETRIES);
-       if (bytesReceived == sizeof(struct insertRes))
-       {
-               if (response.msgType == INSERT_RES)
-               {
-                       if (response.status == INSERT_OK)
-                               return 0;
-//                     if (response.status == NOT_KEY_OWNER)
-               }
-       }
-//TODO: find owner and try again, request rebuild if necessary
-       return -1; //this function should be robust enough to always return 0
-}
+       struct sockaddr_in toAddr;
+       struct sockaddr_in fromAddr;
+       socklen_t socklen = sizeof(struct sockaddr_in);
+       struct pollfd pollsock;
+       char inBuffer[2];
+       char outBuffer[9];
+       ssize_t bytesRcvd;
+       int i;
+       int retval;
+       int status = -1;
 
-int dhtRemove(unsigned int key)
-{
-       unsigned int dest_ip = getKeyOwner(key);
-       struct removeCmd myMessage;
-       struct removeRes response;
-       int bytesReceived;
-       
-       myMessage.msgType = REMOVE_CMD;
-       myMessage.key = key;
+       bzero((char *)&toAddr, socklen);
+       toAddr.sin_family = AF_INET;
+       toAddr.sin_port = htons(UDP_PORT);
 
-       bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage,
-               sizeof(struct removeCmd), (void *)&response, sizeof(struct removeRes),
-               TIMEOUT_MS, MAX_RETRIES);
-       if (bytesReceived == sizeof(struct removeRes))
+       while (status != OPERATION_OK)
        {
-               if (response.msgType == REMOVE_RES)
+               pthread_mutex_lock(&stateMutex);
+               while (!(state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
+                               || state == LEAD_NORMAL2_STATE || state == REBUILD4_STATE
+                               || state == LEAD_REBUILD3_STATE))
+                       pthread_cond_wait(&stateCond, &stateMutex);
+               toAddr.sin_addr.s_addr = htonl(getKeyOwner(key));
+               pthread_mutex_unlock(&stateMutex);
+
+               if ((pollsock.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
                {
-                       if (response.status == REMOVE_OK)
-                               return 0;
-//                     if (response.status == NOT_KEY_OWNER)
+                       perror("dhtInsert():socket()");
+                       return -1;
                }
-       }
-//TODO: find owner and try again, request rebuild if necessary
-       return -1; //this function should be robust enough to always return 0
-}
+               pollsock.events = POLLIN;
 
-int dhtSearch(unsigned int key, unsigned int *val)
-{
-       unsigned int dest_ip = getKeyOwner(key);
-       struct searchCmd myMessage;
-       struct searchRes response;
-       int bytesReceived;
-
-       myMessage.msgType = SEARCH_CMD;
-       myMessage.key = key;
-
-       bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage,
-               sizeof(struct searchCmd), (void *)&response, sizeof(struct searchRes),
-               TIMEOUT_MS, MAX_RETRIES);
-       if (bytesReceived == sizeof(struct searchRes))
-       {
-               if (response.msgType == SEARCH_RES)
+               outBuffer[0] = INSERT_CMD;
+               write4(&outBuffer[1], key);
+               write4(&outBuffer[5], val);
+
+               for (i = 0; i < INSERT_RETRIES; i++)
                {
-                       if (response.status == KEY_FOUND)
+                       if (sendto(pollsock.fd, outBuffer, 9, 0, (struct sockaddr *)&toAddr,
+                               socklen) < 0)
                        {
-                               *val = response.val;
-                               return 0;
+                               perror("dhtInsert():sendto()");
+                               break;
                        }
-                       if (response.status == KEY_NOT_FOUND)
+                       retval = poll(&pollsock, 1, INSERT_TIMEOUT_MS);
+                       if (retval < 0)
                        {
-                               return 1;
+                               perror("dhtInsert():poll()");
+                               break;
+                       }
+                       if (retval > 0)
+                       {
+                               bytesRcvd = recvfrom(pollsock.fd, inBuffer, 2, 0,
+                                       (struct sockaddr *)&fromAddr, &socklen);
+                               if (fromAddr.sin_addr.s_addr == toAddr.sin_addr.s_addr
+                                       && fromAddr.sin_port == toAddr.sin_port
+                                       && bytesRcvd == 2 && inBuffer[0] == INSERT_RES)
+                               {
+                                       status = inBuffer[1]; //status from remote host
+                                       break;
+                               }
                        }
-//                     if (response.status == NOT_KEY_OWNER)
+               }
+               if (status != OPERATION_OK)
+               {
+                       pthread_mutex_lock(&stateMutex);
+                       setState(REBUILD0_STATE);
+                       outBuffer[0] = REBUILD_REQ;
+                       udpSend(outBuffer, 1, leader);
+                       pthread_mutex_unlock(&stateMutex);
                }
        }
-//TODO: find owner and try again, request rebuild if necessary
-       return -1; //this function should be robust enough to always return 0 or 1
-}
 
-/*******************************************************************************
-*                      Local Function Definitions
-*******************************************************************************/
+       close(pollsock.fd);
 
-//use UDP for messages that are frequent and short
-void *udpListen()
+       return status;
+}
+
+int dhtInsertMult(unsigned int numKeys, unsigned int *keys,    unsigned int *vals)
 {
-       struct sockaddr_in myAddr;
-       struct sockaddr_in clientAddr;
-       struct sockaddr_in bcastAddr;
-       socklen_t socklen = sizeof(struct sockaddr_in);
-       char buffer[BUFFER_SIZE];
-       ssize_t bytesReceived;
-       struct insertCmd *insertCmdPtr;
-       struct removeCmd *removeCmdPtr;
-       struct searchCmd *searchCmdPtr;
-       struct insertRes *insertResPtr;
-       struct removeRes *removeResPtr;
-       struct searchRes *searchResPtr;
-       struct joinReq *joinReqPtr;
-       char replyBuffer[BUFFER_SIZE];
-       struct timeval now;
-       struct timeval rebuild1Timeout;
-       int rebuild1TimerSet;
-       int on;
-       int pollret;
+       int status;
        int i;
 
-       chashtable_t *myHashTable = chashCreate(HASH_SIZE, LOADFACTOR);
-
-       if ((udpServerPollSock.fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
+       status = 0;
+       for (i = 0; i < numKeys; i++)
        {
-               perror("udpListen():socket()");
-               pthread_exit(NULL);
+               if (dhtInsert(keys[i], vals[i]) != 0)
+                       status = -1;
        }
+       return status;
+}
 
-       on = 1;
-       if (setsockopt(udpServerPollSock.fd, SOL_SOCKET, SO_BROADCAST, &on,
-               sizeof(on)) == -1)
-       {
-               perror("udpBroadcastWaitForResponse():setsockopt()");
-               pthread_exit(NULL);
-       }
-       
-       udpServerPollSock.events = POLLIN;
-       
-       bzero(&myAddr, socklen);
-       myAddr.sin_family = AF_INET;
-       myAddr.sin_addr.s_addr = htonl(INADDR_ANY);
-       myAddr.sin_port = htons(UDP_PORT);
+int dhtRemove(unsigned int key)
+{
+       struct sockaddr_in toAddr;
+       struct sockaddr_in fromAddr;
+       socklen_t socklen = sizeof(struct sockaddr_in);
+       struct pollfd pollsock;
+       char inBuffer[2];
+       char outBuffer[5];
+       ssize_t bytesRcvd;
+       int i;
+       int retval;
+       int status = -1;
 
-       bzero(&bcastAddr, socklen);
-       bcastAddr.sin_family = AF_INET;
-       bcastAddr.sin_addr.s_addr = htonl(0xFFFFFFFF);
-       bcastAddr.sin_port = htons(UDP_PORT);
+       bzero((char *)&toAddr, socklen);
+       toAddr.sin_family = AF_INET;
+       toAddr.sin_port = htons(UDP_PORT);
 
-       if (bind(udpServerPollSock.fd, (struct sockaddr *)&myAddr, socklen) == -1)
+       while (!(status == OPERATION_OK || status == KEY_NOT_FOUND))
        {
-               perror("udpListen():bind()");
-               pthread_exit(NULL);
-       }
-       dhtLog("udpListen(): listening on port %d\n", UDP_PORT);
+               pthread_mutex_lock(&stateMutex);
+               while (!(state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
+                               || state == LEAD_NORMAL2_STATE))
+                       pthread_cond_wait(&stateCond, &stateMutex);
+               toAddr.sin_addr.s_addr = htonl(getKeyOwner(key));
+               pthread_mutex_unlock(&stateMutex);
+
+               if ((pollsock.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
+               {
+                       perror("dhtRemove():socket()");
+                       return -1;
+               }
+               pollsock.events = POLLIN;
 
-       rebuild1TimerSet = 0;
-       while(1)
-       {
-               pollret = poll(&udpServerPollSock, 1, TIMEOUT_MS);
-               if (pollret < 0)
-               {       perror("udpListen():poll()");   }
-               else if (pollret > 0)
+               outBuffer[0] = REMOVE_CMD;
+               write4(&outBuffer[1], key);
+
+               for (i = 0; i < REMOVE_RETRIES; i++)
                {
-                       if ((bytesReceived = recvfrom(udpServerPollSock.fd, buffer, BUFFER_SIZE,
-                               0, (struct sockaddr *)&clientAddr, &socklen)) == -1)
-                       {       perror("udpListen():recvfrom()");       }
-                       else if (bytesReceived == 0)
+                       if (sendto(pollsock.fd, outBuffer, 5, 0, (struct sockaddr *)&toAddr,
+                               socklen) < 0)
                        {
-                               dhtLog("udpListen(): recvfrom() returned 0\n");
+                               perror("dhtRemove():sendto()");
+                               break;
                        }
-                       else
+                       retval = poll(&pollsock, 1, REMOVE_TIMEOUT_MS);
+                       if (retval < 0)
                        {
-                               dhtLog("udpListen(): received %s from %s\n",
-                                       (buffer[0] < NUM_MSG_TYPES ? msg_types[buffer[0]] :
-                                       "unknown message"), inet_ntoa(clientAddr.sin_addr));
-                               if (!msgSizeOk(buffer[0], bytesReceived))
-                               {
-                                       dhtLog("udpListen(): ERROR: incorrect message size\n");
-                               }
-                               else
-                               {
-                                       switch (buffer[0])
-                                       {
-                                               case INSERT_CMD:
-                                                       if (state == NORMAL_STATE || state == LEAD_NORMAL_STATE
-                                                               || state == REBUILD3_STATE || state == LEAD_REBUILD3_STATE)
-                                                       {
-                                                               insertCmdPtr = (struct insertCmd *)buffer;
-                                                               dhtLog( "udpListen(): Insert: key=%d, val=%d\n",
-                                                                       insertCmdPtr->key, insertCmdPtr->val);
-                                                               insertResPtr = (struct insertRes *)replyBuffer;
-                                                               insertResPtr->msgType = INSERT_RES;
-                                                               insertResPtr->unused = 0;
-                                                               if (getKeyOwner(insertCmdPtr->key) == myHostData.ipAddr)
-                                                               {
-                                                                       //note: casting val to void * in order to conform to API
-                                                                       if(chashInsert(myHashTable, insertCmdPtr->key,
-                                                                                       (void *)insertCmdPtr->val) == 0)
-                                                                               insertResPtr->status = INSERT_OK;
-                                                                       else
-                                                                               insertResPtr->status = INSERT_ERROR;
-                                                               }
-                                                               else
-                                                               {
-                                                                       insertResPtr->status = NOT_KEY_OWNER;;
-                                                               }
-                                                               if (sendto(udpServerPollSock.fd, (void *)insertResPtr,
-                                                                       sizeof(struct insertRes), 0, (struct sockaddr *)&clientAddr,
-                                                                       socklen) == -1)
-                                                               {       perror("udpListen():sendto()"); }
-                                                       }
-                                                       break;
-                                               case REMOVE_CMD:
-                                                       if (state == NORMAL_STATE || state == LEAD_NORMAL_STATE)
-                                                       {
-                                                               removeCmdPtr = (struct removeCmd *)buffer;
-                                                               dhtLog("udpListen(): Remove: key=%d\n", removeCmdPtr->key);
-                                                               removeResPtr = (struct removeRes *)replyBuffer;
-                                                               removeResPtr->msgType = REMOVE_RES;
-                                                               removeResPtr->unused = 0;
-                                                               if (getKeyOwner(removeCmdPtr->key) == myHostData.ipAddr)
-                                                               {
-                                                                       //note: casting val to void * in order to conform to API
-                                                                       if(chashRemove(myHashTable, removeCmdPtr->key) == 0)
-                                                                               removeResPtr->status = INSERT_OK;
-                                                                       else
-                                                                               removeResPtr->status = INSERT_ERROR;
-                                                               }
-                                                               else
-                                                               {
-                                                                       removeResPtr->status = NOT_KEY_OWNER;
-                                                               }
-                                                               if (sendto(udpServerPollSock.fd, (void *)removeResPtr,
-                                                                       sizeof(struct removeRes), 0, (struct sockaddr *)&clientAddr,
-                                                                       socklen) == -1)
-                                                               {       perror("udpListen():sendto()"); }
-                                                       }
-                                                       break;
-                                               case SEARCH_CMD:
-                                                       if (state == NORMAL_STATE || state == LEAD_NORMAL_STATE)
-                                                       {
-                                                               searchCmdPtr = (struct searchCmd *)buffer;
-                                                               dhtLog("udpListen(): Search: key=%d\n",searchCmdPtr->key);
-                                                               searchResPtr = (struct searchRes *)replyBuffer;
-                                                               searchResPtr->msgType = SEARCH_RES;
-                                                               searchResPtr->unused = 0;
-                                                               if (getKeyOwner(searchCmdPtr->key) == myHostData.ipAddr)
-                                                               {
-                                                                       //note: casting val to void * in order to conform to API
-                                                                       if((searchResPtr->val = (unsigned int)chashSearch(myHashTable,
-                                                                                       searchCmdPtr->key)) == 0)
-                                                                               searchResPtr->status = KEY_NOT_FOUND;
-                                                                       else
-                                                                               searchResPtr->status = KEY_FOUND;
-                                                               }
-                                                               else
-                                                               {
-                                                                       searchResPtr->status = NOT_KEY_OWNER;
-                                                               }
-                                                               if (sendto(udpServerPollSock.fd, (void *)searchResPtr,
-                                                                       sizeof(struct searchRes), 0, (struct sockaddr *)&clientAddr,
-                                                                       socklen) == -1)
-                                                               {       perror("udpListen():sendto()"); }
-                                                       }
-                                                       break;
-                                               case FIND_LEADER_REQ:
-                                                       if (state == LEAD_NORMAL_STATE || state == LEAD_REBUILD1_STATE
-                                                               || state == LEAD_REBUILD2_STATE || state == LEAD_REBUILD3_STATE)
-                                                       {
-                                                               replyBuffer[0] = FIND_LEADER_RES;
-                                                               if(sendto(udpServerPollSock.fd, (void *)replyBuffer,
-                                                                       sizeof(char), 0,(struct sockaddr *)&clientAddr, socklen) == -1)
-                                                               {       perror("udpListen():sendto()"); }
-                                                       }
-                                                       break;
-                                               case REBUILD_REQ:
-                                                       if (state == LEAD_NORMAL_STATE || state == LEAD_REBUILD1_STATE
-                                                               || state == LEAD_REBUILD2_STATE || state == LEAD_REBUILD3_STATE)
-                                                       {
-                                                               replyBuffer[0] = REBUILD_RES;
-                                                               if (sendto(udpServerPollSock.fd, (void *)replyBuffer,
-                                                                       sizeof(char), 0, (struct sockaddr *)&clientAddr, socklen) == -1)
-                                                               {       perror("udpListen():sendto()"); }
-                                                               if (gettimeofday(&rebuild1Timeout, NULL) < 0)
-                                                               {       perror("dhtLog():gettimeofday()"); }
-                                                               //TODO: make this a configurable parameter
-                                                               rebuild1Timeout.tv_sec += 3;
-                                                               rebuild1TimerSet = 1;
-                                                               //clear out previous host data
-                                                               numHosts = 1;
-                                                               hostArray[0] = myHostData;
-
-                                                               state = LEAD_REBUILD1_STATE;
-
-                                                               replyBuffer[0] = REBUILD_CMD;
-                                                               if (sendto(udpServerPollSock.fd, (void *)replyBuffer,
-                                                                       sizeof(char), 0, (struct sockaddr *)&bcastAddr, socklen) == -1)
-                                                               {       perror("udpListen():sendto()"); }
-                                                               
-                                                       }
-                                                       else
-                                                       {
-                                                               replyBuffer[0] = NOT_LEADER;
-                                                               if(sendto(udpServerPollSock.fd, (void *)replyBuffer,
-                                                                       sizeof(char), 0,(struct sockaddr *)&clientAddr, socklen) == -1)
-                                                               {       perror("udpListen():sendto()"); }
-                                                       }
-                                               case REBUILD_CMD:
-                                                       if (state != LEAD_REBUILD1_STATE)
-                                                       {
-                                                               //consider this an official declaration of authority,
-                                                               // in case I was confused about this
-                                                               leader = htonl(clientAddr.sin_addr.s_addr);
-                                                               
-                                                               clearDHTdata();
-
-                                                               joinReqPtr = (struct joinReq *)replyBuffer;
-                                                               joinReqPtr->msgType = JOIN_REQ;
-                                                               joinReqPtr->unused = 0;
-                                                               joinReqPtr->newHostData = myHostData;
-                                                               //note: I'm reusing bytesReceived and buffer
-                                                               bytesReceived = udpSendWaitForResponse(leader, UDP_PORT,
-                                                                       (void *)replyBuffer, sizeof(struct joinReq), (void *)buffer,
-                                                                       BUFFER_SIZE, TIMEOUT_MS, MAX_RETRIES);
-                                                               if ((bytesReceived == sizeof(char)) && (buffer[0] == JOIN_RES))
-                                                                       state = REBUILD1_STATE;
-                                                               else
-                                                                       initRebuild();
-                                                       }
-                                                       break;
-                                               case JOIN_REQ:
-                                                       if (state == LEAD_REBUILD1_STATE)
-                                                       {
-                                                               joinReqPtr = (struct joinReq *)buffer;
-                                                               addHost(joinReqPtr->newHostData);
-
-                                                               replyBuffer[0] = JOIN_RES;
-                                                               if (sendto(udpServerPollSock.fd, (void *)replyBuffer,
-                                                                       sizeof(char), 0,(struct sockaddr *)&clientAddr, socklen) == -1)
-                                                               {       perror("udpListen():sendto()"); }
-                                                       }
-                                                       break;
-                                               case GET_DHT_INFO_CMD:
-                                                       if (state == REBUILD1_STATE)
-                                                       {
-                                                               getDHTdata();
-                                                               state = REBUILD2_STATE;
-                                                       }
-                                                       break;
-                                               default:
-                                                       dhtLog("udpListen(): ERROR: Unknown message type\n");
-                                       }
-                               }
+                               perror("dhtRemove():poll()");
+                               break;
                        }
-               } //end (pollret > 0)
-               else // (pollret == 0), timeout
-               {
-                       if (gettimeofday(&now, NULL) < 0)
-                       {       perror("dhtLog():gettimeofday()"); }
-                       if (rebuild1TimerSet && timercmp(&now, &rebuild1Timeout, >))
+                       if (retval > 0)
                        {
-                               rebuild1TimerSet = 0;
-                               if (state == LEAD_REBUILD1_STATE)
+                               bytesRcvd = recvfrom(pollsock.fd, inBuffer, 2, 0,
+                                       (struct sockaddr *)&fromAddr, &socklen);
+                               if (fromAddr.sin_addr.s_addr == toAddr.sin_addr.s_addr
+                                       && fromAddr.sin_port == toAddr.sin_port
+                                       && bytesRcvd == 2 && inBuffer[0] == REMOVE_RES)
                                {
-                                       makeAssignments();
-                                       dhtLog("udpListen(): assignments made\n");
-                                       writeDHTdata(logfile);
-                                       if (hostRebuildStates != NULL)
-                                               free(hostRebuildStates);
-                                       hostRebuildStates = calloc(numHosts, sizeof(unsigned int));
-                                       for (i = 0; i < numHosts; i++)
-                                               hostRebuildStates[i] = REBUILD1_STATE;
-                                       state = LEAD_REBUILD2_STATE;
-                                       replyBuffer[0] = GET_DHT_INFO_CMD;
-                                       if (sendto(udpServerPollSock.fd, (void *)replyBuffer,
-                                               sizeof(char), 0, (struct sockaddr *)&bcastAddr, socklen) == -1)
-                                       {       perror("udpListen():sendto()"); }
+                                       status = inBuffer[1]; //status from remote host
+                                       break;
                                }
                        }
                }
+               if (!(status == OPERATION_OK || status == KEY_NOT_FOUND))
+               {
+                       pthread_mutex_lock(&stateMutex);
+                       setState(REBUILD0_STATE);
+                       outBuffer[0] = REBUILD_REQ;
+                       udpSend(outBuffer, 1, leader);
+                       pthread_mutex_unlock(&stateMutex);
+               }
        }
+
+       close(pollsock.fd);
+
+       return status;
 }
 
-int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port,
-       void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize,
-       unsigned int timeout, unsigned int numRetries)
+int dhtRemoveMult(unsigned int numKeys, unsigned int *keys)
 {
-       struct sockaddr_in server_addr;
-       struct sockaddr_in ack_addr;
-       socklen_t socklen = sizeof(struct sockaddr_in);
-       struct pollfd pollsock;
-       int retval;
+       int status;
        int i;
-       ssize_t bytesReceived;
-
-       bzero((char *) &server_addr, sizeof(server_addr));
-       server_addr.sin_family = AF_INET;
-       server_addr.sin_port = htons(dest_port);
-       server_addr.sin_addr.s_addr = htonl(dest_ip);
 
-       if ((pollsock.fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
+       status = 0;
+       for (i = 0; i < numKeys; i++)
        {
-               perror("udpSendWaitForResponse():socket()");
-               return -1;
+               if (dhtRemove(keys[i]) != 0)
+                       status = -1;
        }
-       
-       pollsock.events = POLLIN;
-       
-       for (i = 0; i < MAX_RETRIES; i++)
-       {
-               if (i > 0)
-                       dhtLog("udpSendWaitForResponse(): trying again, count: %d\n", i+1);
-               if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr,
-                       socklen) == -1)
-               {
-                       perror("udpSendWaitForResponse():sendto");
-                       return -1;
-               }
-               dhtLog("udpSendWaitForResponse(): message sent\n");
-               retval = poll(&pollsock, 1, timeout);
-               if (retval < 0)
-               {
-                       perror("udpSendWaitForResponse():poll()");
-               }
-               else if (retval > 0)
-               {
-                       bytesReceived = recvfrom(pollsock.fd, resBuffer, resBufferSize, 0,
-                               (struct sockaddr *)&ack_addr, &socklen);
-                       if ((ack_addr.sin_addr.s_addr == server_addr.sin_addr.s_addr)
-                       && (ack_addr.sin_port == server_addr.sin_port))
-                       {
-                               close(pollsock.fd);
-                               dhtLog("udpSendWaitForResponse(): received response\n");
-                               return bytesReceived;
-                       }
-               }
-       }
-       close(pollsock.fd);
-       printf("udpSendWaitForResponse(): timed out, no ack\n");
-       return -1;
+       return status;
 }
 
-int udpBroadcastWaitForResponse(unsigned int *reply_ip,
-       unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer,
-       unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries)
+int dhtSearch(unsigned int key, unsigned int *val)
 {
-       struct sockaddr_in server_addr;
-       struct sockaddr_in ack_addr;
+       struct sockaddr_in toAddr;
+       struct sockaddr_in fromAddr;
        socklen_t socklen = sizeof(struct sockaddr_in);
        struct pollfd pollsock;
-       int retval;
+       char inBuffer[6];
+       char outBuffer[5];
+       ssize_t bytesRcvd;
        int i;
-       ssize_t bytesReceived;
-       int on;
-
-       bzero((char *) &server_addr, sizeof(server_addr));
-       server_addr.sin_family = AF_INET;
-       server_addr.sin_port = htons(dest_port);
-       server_addr.sin_addr.s_addr = htonl(0xFFFFFFFF);
+       int retval;
+       int status = -1;
 
-       if ((pollsock.fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
-       {
-               perror("udpBroadcastWaitForResponse():socket()");
-               return -1;
-       }
+       bzero((char *)&toAddr, socklen);
+       toAddr.sin_family = AF_INET;
+       toAddr.sin_port = htons(UDP_PORT);
 
-       on = 1;
-       if (setsockopt(pollsock.fd, SOL_SOCKET, SO_BROADCAST, &on, sizeof(on)) == -1)
+       while (!(status == OPERATION_OK || status == KEY_NOT_FOUND))
        {
-               perror("udpBroadcastWaitForResponse():setsockopt()");
-               return -1;
-       }
+               pthread_mutex_lock(&stateMutex);
+               while (numBlocks == 0)
+                       pthread_cond_wait(&stateCond, &stateMutex);
+               toAddr.sin_addr.s_addr = htonl(getKeyOwner(key));
+               pthread_mutex_unlock(&stateMutex);
 
-       pollsock.events = POLLIN;
-       
-       for (i = 0; i < MAX_RETRIES; i++)
-       {
-               if (i > 0)
-                       dhtLog("udpBroadcastWaitForResponse(): trying again, count: %d\n", i+1);
-               if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr,
-                       socklen) == -1)
+               if ((pollsock.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
                {
-                       perror("udpBroadcastWaitForResponse():sendto()");
+                       perror("dhtSearch():socket()");
                        return -1;
                }
-               dhtLog("udpBroadcastWaitForResponse(): message sent\n");
-               retval = poll(&pollsock, 1, timeout);
-               if (retval !=0)
+               pollsock.events = POLLIN;
+
+               outBuffer[0] = SEARCH_CMD;
+               write4(&outBuffer[1], key);
+
+               for (i = 0; i < SEARCH_RETRIES; i++)
                {
-                       bytesReceived = recvfrom(pollsock.fd, resBuffer, resBufferSize, 0,
-                               (struct sockaddr *)&ack_addr, &socklen);
-                       close(pollsock.fd);
-                       *reply_ip = htonl(ack_addr.sin_addr.s_addr);
-                       dhtLog("udpBroadcastWaitForResponse(): received response\n");
-                       return bytesReceived;
+                       if (sendto(pollsock.fd, outBuffer, 5, 0, (struct sockaddr *)&toAddr,
+                               socklen) < 0)
+                       {
+                               perror("dhtSearch():sendto()");
+                               break;
+                       }
+                       retval = poll(&pollsock, 1, SEARCH_TIMEOUT_MS);
+                       if (retval < 0)
+                       {
+                               perror("dhtSearch():poll()");
+                               break;
+                       }
+                       if (retval > 0)
+                       {
+                               bytesRcvd = recvfrom(pollsock.fd, inBuffer, 6, 0,
+                                       (struct sockaddr *)&fromAddr, &socklen);
+                               if (fromAddr.sin_addr.s_addr == toAddr.sin_addr.s_addr
+                                       && fromAddr.sin_port == toAddr.sin_port
+                                       && bytesRcvd == 6 && inBuffer[0] == SEARCH_RES)
+                               {
+                                       status = inBuffer[1]; //status from remote host
+                                       *val = read4(&inBuffer[2]);
+                                       break;
+                               }
+                       }
+               }
+               if (!(status == OPERATION_OK || status == KEY_NOT_FOUND))
+               {
+                       pthread_mutex_lock(&stateMutex);
+                       setState(REBUILD0_STATE);
+                       outBuffer[0] = REBUILD_REQ;
+                       udpSend(outBuffer, 1, leader);
+                       pthread_mutex_unlock(&stateMutex);
                }
        }
+
        close(pollsock.fd);
-       dhtLog("udpBroadcastWaitForResponse(): timed out, no ack\n");
-       return -1;
+
+       return status;
 }
 
-// use TCP for potentially large and/or important data transfer
-void *tcpListen()
+int dhtSearchMult(unsigned int numKeys, unsigned int *keys, unsigned int *vals)
 {
-       struct sockaddr_in myAddr;
-       struct sockaddr_in clientAddr;
-       int tcpAcceptSock;
-       socklen_t socklen = sizeof(struct sockaddr_in);
-       pthread_t threadTcpAccept;
-
-       tcpListenSock = socket(AF_INET, SOCK_STREAM, 0);
-       if (tcpListenSock == -1)
+       int i;
+       int status = 0;
+       for (i = 0; i < numKeys; i++)
        {
-               perror("tcpListen():socket()");
-               pthread_exit(NULL);
+               if (dhtSearch(keys[i], &vals[i]) != 0)
+                       status = -1;
        }
+       return status;
+}
 
-       myAddr.sin_family = AF_INET;
-       myAddr.sin_port = htons(TCP_PORT);
-       myAddr.sin_addr.s_addr = INADDR_ANY;
-       memset(&(myAddr.sin_zero), '\0', 8);
-
-       if (bind(tcpListenSock, (struct sockaddr *)&myAddr, socklen) == -1)
-       {
-               perror("tcpListen():socket()");
-               pthread_exit(NULL);
-       }
+/*******************************************************************************
+*                      Local Function Definitions
+*******************************************************************************/
 
-       if (listen(tcpListenSock, BACKLOG) == -1)
-       {
-               perror("tcpListen():listen()");
-               pthread_exit(NULL);
-       }
+int msgSizeOk(unsigned char *msg, unsigned int size)
+{
+       unsigned short tmpNumHosts;
+       unsigned short tmpNumBlocks;
 
-       dhtLog("tcpListen(): listening on port %d\n", TCP_PORT);
+       if (size < 1)
+               return 1;
 
-       while(1)
+       switch (msg[0])
        {
-               tcpAcceptSock = accept(tcpListenSock, (struct sockaddr *)&clientAddr,
-                       &socklen);
-               pthread_create(&threadTcpAccept, NULL, tcpAccept, (void *)tcpAcceptSock);
+               case WHO_IS_LEADER_CMD:
+               case LEAVE_REQ:
+               case LEAVE_RES:
+               case DHT_UPDATE_RES:
+               case REBUILD_REQ:
+               case REBUILD_CMD:
+               case FILL_DHT_CMD:
+               case FILL_DHT_RES:
+               case RESUME_NORMAL_CMD:
+               case RESUME_NORMAL_RES:
+                       return (size == 1);
+               case INSERT_RES:
+               case REMOVE_RES:
+               case JOIN_RES:
+                       return (size == 2);
+               case REMOVE_CMD:
+               case SEARCH_CMD:
+               case WHO_IS_LEADER_RES:
+               case JOIN_REQ:
+               case ELECT_LEADER_CMD:
+                       return (size == 5);
+               case SEARCH_RES:
+                       return (size == 6);
+               case INSERT_CMD:
+                       return (size == 9);
+               case DHT_UPDATE_CMD:
+                       if (size < 5)
+                               return 1;
+                       tmpNumHosts = read2(&msg[1]);
+                       tmpNumBlocks = read2(&msg[3]);
+                       return (size == (5+sizeof(struct hostData)*tmpNumHosts+2*tmpNumBlocks));
+               case ELECT_LEADER_RES:
+                       if (size < 2)
+                               return 1;
+                       if (msg[1] == 0xFF)
+                               return (size == 2);
+                       if (size < 4)
+                               return 1;
+                       tmpNumHosts = read2(&msg[2]);
+                       return (size == (4 + sizeof(struct hostData) * tmpNumHosts));
+               case CONGRATS_CMD:
+                       if (size < 3)
+                               return 1;
+                       tmpNumHosts = read2(&msg[1]);
+                       return (size == (3 + sizeof(struct hostData) * tmpNumHosts));
+               default:
+                       return 1;
        }
 }
 
-void *tcpAccept(void *arg)
+unsigned short read2(unsigned char *ptr)
 {
-       int tcpAcceptSock = (int)arg;
-       int bytesReceived;
-       char msgType;
-
-       dhtLog("tcpAccept(): accepted tcp connection, file descriptor: %d\n",
-               tcpAcceptSock);
-
-       bytesReceived = recv(tcpAcceptSock, &msgType, sizeof(char), 0);
-       if (bytesReceived == -1)
-       {       perror("tcpAccept():recv()");   }
-       else if (bytesReceived == 0)
-       {
-               dhtLog( "tcpAccept(): bytesReceived = 0\n", tcpAcceptSock);
-       }
-       else
-       {
-               switch (msgType)
-               {
-                       case DHT_INFO_REQ:
-                               if (send(tcpAcceptSock, &numHosts, sizeof(numHosts), 0) == -1)
-                               {
-                                       perror("tcpAccept():send()");
-                                       break;
-                               }
-                               if (send(tcpAcceptSock, &numBlocks, sizeof(numBlocks), 0) == -1)
-                               {
-                                       perror("tcpAccept():send()");
-                                       break;
-                               }
-                               if (send(tcpAcceptSock, hostArray, numHosts*sizeof(struct hostData),
-                                               0) == -1)
-                               {
-                                       perror("tcpAccept():send()");
-                                       break;
-                               }
-                               if (send(tcpAcceptSock, blockOwnerArray, numBlocks*sizeof(unsigned int),
-                                               0) == -1)
-                               {
-                                       perror("tcpAccept():send()");
-                                       break;
-                               }
-                               break;
-                       default:
-                               dhtLog("tcpAccept(): unrecognized msg type\n");
-               }
-       }
-
-       if (close(tcpAcceptSock) == -1)
-       {       perror("tcpAccept():close()"); }
+       unsigned short tmp = (ptr[1] << 8) | ptr[0];
+       return tmp;
+}
 
-       dhtLog("tcpAccept(): closed tcp connection, file descriptor: %d\n",
-               tcpAcceptSock);
+unsigned int read4(unsigned char *ptr)
+{
+       unsigned int tmp = (ptr[3] << 24) | (ptr[2] << 16) | (ptr[1] << 8) | ptr[0];
+       return tmp;
+}
 
-       pthread_exit(NULL);
+void write2(unsigned char *ptr, unsigned short tmp)
+{
+       ptr[1] = (tmp >> 8) & 0xFF;
+       ptr[0] = tmp & 0xFF;
+       return;
 }
 
-unsigned int getKeyOwner(unsigned int key)
+void write4(unsigned char *ptr, unsigned int tmp)
 {
-       if (state == NORMAL_STATE || state == LEAD_NORMAL_STATE
-               || state == REBUILD3_STATE || state == LEAD_REBUILD3_STATE)
-       {
-               return hostArray[blockOwnerArray[hash(key)]].ipAddr;
-       }
-       else
-       { //TODO: figure out what is best to do here. Would like calls to dhtSearch,
-               // etc. to block rather than fail during rebuilds
-               return 0;
-       }
+       ptr[3] = (tmp >> 24) & 0xFF;
+       ptr[2] = (tmp >> 16) & 0xFF;
+       ptr[1] = (tmp >> 8) & 0xFF;
+       ptr[0] = tmp & 0xFF;
+       return;
 }
 
-unsigned int getMyIpAddr()
+unsigned int getMyIpAddr(const char *interfaceStr)
 {      
        int sock;
        struct ifreq interfaceInfo;
@@ -948,7 +758,7 @@ unsigned int getMyIpAddr()
                return 1;
        }
 
-       strcpy(interfaceInfo.ifr_name, DEFAULT_INTERFACE);
+       strcpy(interfaceInfo.ifr_name, interfaceStr);
        myAddr->sin_family = AF_INET;
        
        if(ioctl(sock, SIOCGIFADDR, &interfaceInfo) != 0)
@@ -960,296 +770,260 @@ unsigned int getMyIpAddr()
        return ntohl(myAddr->sin_addr.s_addr);
 }
 
-unsigned int findLeader()
+int udpSend(unsigned char *msg, unsigned int size, unsigned int destIp)
 {
-       unsigned int reply_ip;
-       int bytesReceived;
-       char myMessage;
-       char response;
-
-       dhtLog("findLeader(): broadcasting...\n");
-
-       myMessage = FIND_LEADER_REQ;
+       struct sockaddr_in peerAddr;
+       socklen_t socklen = sizeof(struct sockaddr_in);
 
-       bytesReceived = udpBroadcastWaitForResponse(&reply_ip, UDP_PORT,
-               (void *)&myMessage, sizeof(myMessage), (void *)&response,
-               sizeof(response), TIMEOUT_MS, MAX_RETRIES);
+       bzero(&peerAddr, socklen);
+       peerAddr.sin_family = AF_INET;
+       peerAddr.sin_addr.s_addr = htonl(destIp);
+       peerAddr.sin_port = htons(UDP_PORT);
 
-       if (bytesReceived == -1)
+       if (size >= 1)
        {
-               dhtLog("findLeader(): no response\n");
-               return 0;
-       }
-       else if (response == FIND_LEADER_RES)
-       {
-               struct in_addr reply_addr;
-               reply_addr.s_addr = htonl(reply_ip);
-               dhtLog("findLeader(): leader found:%s\n",
-                                       inet_ntoa(reply_addr));
-               return reply_ip;
+               if (msg[0] < NUM_MSG_TYPES)
+                       dhtLog("udpSend(): sending %s to %s, %d bytes\n", msg_types[msg[0]],
+                               inet_ntoa(peerAddr.sin_addr), size);
+               else
+                       dhtLog("udpSend(): sending unknown message to %s, %d bytes\n",
+                               inet_ntoa(peerAddr.sin_addr), size);
        }
-       else
+
+       if (sendto(udpPollSock.fd, (void *)msg, size, 0, (struct sockaddr *)&peerAddr,
+                       socklen) < 0)
        {
-               dhtLog("findLeader(): unexpected response\n");
-               return 0;
+               perror("udpSend():sendto()");
+               return -1;
        }
+       
+       return 0;
 }
 
-int getDHTdata()
+int udpSendAll(unsigned char *msg, unsigned int size)
 {
-       struct sockaddr_in leader_addr;
-       int sock;
-       char msg;
-       int bytesReceived;
-
-       clearDHTdata();
-
-       if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1)
+       int i;
+       int status = 0;
+       for (i = 0; i < numHosts; i++)
        {
-               perror("getDHTdata():socket()");
-               return -1;
+               if ((hostReplied[i] == 0) && (hostArray[i].ipAddr != myHostData.ipAddr))
+               {
+                       if (udpSend(msg, size, hostArray[i].ipAddr) != 0)
+                               status = -1;
+               }
        }
+       return status;
+}
 
-       bzero((char *)&leader_addr, sizeof(leader_addr));
-       leader_addr.sin_family = AF_INET;
-       leader_addr.sin_port = htons(TCP_PORT);
-       leader_addr.sin_addr.s_addr = htonl(leader);
+//note: make sure this is only executed in a valid state, where numBlocks != 0
+unsigned int hash(unsigned int x)
+{
+       return (x % numBlocks);
+}
 
-       if (connect(sock, (struct sockaddr *)&leader_addr, sizeof(leader_addr)) == -1)
-       {
-               perror("getDHTdata():connect()");
-               close(sock);
-               return -1;
-       }
-       msg = DHT_INFO_REQ;
-       if (send(sock, &msg, sizeof(char), 0) == -1)
-       {
-               perror("getDHTdata():send()");
-               close(sock);
-               return -1;
-       }
-       bytesReceived = recv(sock, &numHosts, sizeof(numHosts), 0);
-       if (bytesReceived == -1)
-       {
-               perror("getDHTdata():recv()");
-               close(sock);
-               return -1;
-       }
-       if (bytesReceived != sizeof(numHosts))
-       {
-               dhtLog("getDHTdata(): ERROR: numHosts not completely received\n");
-               close(sock);
-               return -1;
-       }
-       bytesReceived = recv(sock, &numBlocks, sizeof(numBlocks), 0);
-       if (bytesReceived == -1)
-       {
-               perror("getDHTdata():recv()");
-               close(sock);
-               return -1;
-       }
-       if (bytesReceived != sizeof(numBlocks))
-       {
-               dhtLog("getDHTdata(): ERROR: numBlocks not completely received\n");
-               close(sock);
-               return -1;
-       }
-       hostArray = calloc(numHosts, sizeof(struct hostData));
-       bytesReceived = recv(sock, hostArray, numHosts*sizeof(struct hostData), 0);
-       if (bytesReceived == -1)
-       {
-               perror("getDHTdata():recv()");
-               close(sock);
-               return -1;
-       }
-       if (bytesReceived != numHosts*sizeof(struct hostData))
-       {
-               dhtLog("getDHTdata(): ERROR: hostArray not completely received\n");
-               close(sock);
-               return -1;
-       }
-       blockOwnerArray = calloc(numBlocks, sizeof(unsigned int));
-       bytesReceived = recv(sock,blockOwnerArray,numBlocks*sizeof(unsigned int),0);
-       if (bytesReceived == -1)
+//note: make sure this is only executed in a valid state, where these arrays
+// are allocated and the index mappings are consistent
+unsigned int getKeyOwner(unsigned int key)
+{
+       return hostArray[blockOwnerArray[hash(key)]].ipAddr;
+}
+
+//sets state and timer, if applicable
+void setState(unsigned int newState)
+{
+       struct timeval now;
+       int i;
+       
+       gettimeofday(&now, NULL);
+       
+       if (newState >= NUM_STATES)
        {
-               perror("getDHTdata():recv()");
-               close(sock);
-               return -1;
+               dhtLog("setState(): ERROR: invalid state %d\n", newState);
        }
-       if (bytesReceived != numBlocks*sizeof(unsigned int))
+       else
        {
-               dhtLog("getDHTdata(): ERROR: blockOwnerArray not completely received\n");
-               close(sock);
-               return -1;
+               if (timeout_vals[newState].tv_sec == 0
+                       && timeout_vals[newState].tv_usec == 0)
+               { //no timer
+                       timerSet = 0;
+               }
+               else
+               {
+                       timeradd(&now, &timeout_vals[newState], &timer);
+                       timerSet = 1;
+               }
+               timeoutCntr = 0;
+               state = newState;
+               //TODO: only do this for states that require it
+               for (i = 0; i < numHosts; i++)
+                       hostReplied[i] = 0;
+
+               dhtLog("setState(): state set to %s\n", state_names[state]);
        }
-       dhtLog("getDHTdata(): got data:\n");
-       writeDHTdata(logfile);
 
-       return 0;
+       return;
 }
 
-unsigned int hash(unsigned int x)
+//TODO: improve these simple and inefficient functions
+int checkReplied(unsigned int ipAddr)
 {
-       //this shouldn't be called when numBlocks = 0, so if you get a divide-by-zero,
-       // make sure we are in a proper state for key owner lookups
-               return x % numBlocks;
+       int i;
+
+       i = findHost(ipAddr);
+
+       if (i == -1)
+               return -1;
+
+       hostReplied[i] = 1;
+
+       return 0;
 }
 
-//This function will not return until it succeeds in submitting
-// a rebuild request to the leader. It is then the leader's responibility
-// to ensure that the rebuild is caried out
-void initRebuild()
+int allReplied()
 {
-       int bytesReceived;
-       char msg;
-       char response;
-       int done;
-       int retry_count;
        int i;
 
-       done = 0;
-       retry_count = 0;
-
-       while (!done)
-       {
-               if (retry_count > 0)
-               {
-                       dhtLog("initRebuild(): retry count:%d\n", retry_count);
-               }
-
-               if (leader == 0 || retry_count > 0)
-               {
-                       leader = findLeader(); //broadcast
-                       if (leader == 0) //no response
-                       {
-                               //TODO:elect leader: this will do for now
-                               initDHTdata();
-                               leader = getMyIpAddr();
-                               state = LEAD_NORMAL_STATE;
-                       }
-               }
+       for (i = 0; i < numHosts; i++)
+               if ((hostReplied[i] == 0) && (hostArray[i].ipAddr != myHostData.ipAddr))
+                       return 0;
        
-               msg = REBUILD_REQ;
-
-               bytesReceived = udpSendWaitForResponse(leader, UDP_PORT,
-                       (void *)&msg, sizeof(msg), (void *)&response, sizeof(response),
-                       TIMEOUT_MS, MAX_RETRIES);
-               if (bytesReceived == -1)
-               {       perror("initRebuild():recv()"); }
-               else if (bytesReceived != sizeof(response))
-               {
-                       dhtLog("initRebuild(): ERROR: response not completely received\n");
-               }
-               else if (response == NOT_LEADER)
-               {
-                       struct in_addr address;
-                       address.s_addr = htonl(leader);
-                       dhtLog("initRebuild(): ERROR: %s no longer leader\n",
-                               inet_ntoa(address));
-               }
-               else if (response != REBUILD_RES)
-               {
-                       dhtLog("initRebuild(): ERROR: unexpected response\n");
-               }
-               else
-               {
-                       dhtLog("initRebuild(): submitted rebuild request\n");
-                       writeDHTdata(logfile);
-                       done = 1;
-               }
-       }
-       return;
+       return 1;
 }
 
-void writeDHTdata(FILE *outfile)
+int findHost(unsigned int ipAddr)
 {
        int i;
-       struct in_addr address;
 
-       fprintf(outfile,"numHosts=%d,numBlocks=%d\n", numHosts, numBlocks);
-       fprintf(outfile,"hostArray: index: ipAddr, maxKeyCapacity\n");
        for (i = 0; i < numHosts; i++)
-       {
-               address.s_addr = htonl(hostArray[i].ipAddr);
-               fprintf(outfile,"%d: %s, %d\n", i, inet_ntoa(address),
-                       hostArray[i].maxKeyCapacity);
-       }
-       fprintf(outfile,"blockOwnerArray: index: blockOwner\n");
-       for (i = 0; i < numBlocks; i++)
-               fprintf(outfile,"%d: %d  ", i, blockOwnerArray[i]);
-       fprintf(outfile,"\n");
+               if (hostArray[i].ipAddr == ipAddr)
+                       return i; //found, return index
+       
+       return -1; //not found
 }
 
-void clearDHTdata()
+int removeHost(unsigned int ipAddr)
 {
-       if (hostArray != NULL)
+       int i, j;
+
+       i = findHost(ipAddr);
+
+       if (i == -1)
+               return -1;
+
+       for (j = 0; j < numBlocks; j++)
        {
-               free(hostArray);
-               hostArray = NULL;
+               if (blockOwnerArray[j] == i)
+                       blockOwnerArray[j] = 0; //TODO: is this what I want to have happen?
+               else if (blockOwnerArray[j] > i)
+                       blockOwnerArray[j]--;
        }
-       if (blockOwnerArray != NULL)
+
+       for (; i < numHosts - 1; i++)
        {
-               free(blockOwnerArray);
-               blockOwnerArray = NULL;
+               hostArray[i] = hostArray[i+1];
+               hostReplied[i] = hostReplied[i+1];
        }
-       numHosts = numBlocks = hostArraySize = 0;
-       return;
+       numHosts--;
+
+       return 0;
 }
 
-void initDHTdata()
+void removeUnresponsiveHosts()
 {
        int i;
 
-       clearDHTdata();
-       hostArraySize = INIT_HOST_ALLOC;
-       hostArray = calloc(hostArraySize, sizeof(struct hostData));
-       numHosts = 1;
-       hostArray[0] = myHostData;
-       numBlocks = INIT_BLOCK_NUM;
-       blockOwnerArray = calloc(numBlocks, sizeof(unsigned int));
-       for (i = 0; i < numBlocks; i++)
-               blockOwnerArray[i] = 0;
-       
-       return;
+       for (i = 0; i < numHosts; i++)
+       {
+               if (!hostReplied[i] && hostArray[i].ipAddr != myHostData.ipAddr)
+                       removeHost(hostArray[i].ipAddr);
+       }
 }
 
-void addHost(struct hostData newHost)
+int addHost(struct hostData newHost)
 {
-       struct hostData *newArray;
-       unsigned int newArraySize;
+       struct hostData *newHostArray;
+       unsigned char *newHostReplied;
+       int i;
+       int j;
 
-       if (hostArray == NULL || blockOwnerArray == NULL || hostArraySize == 0)
-               initDHTdata();
+       for (i = 0; i < numHosts; i++)
+       {
+               if (hostArray[i].ipAddr == newHost.ipAddr)
+               {
+                       hostArray[i] = newHost;
+                       hostReplied[i] = 0;
+                       return 0;
+               }
+               else if (hostArray[i].ipAddr > newHost.ipAddr)
+               {
+                       if (numHosts == hostArraySize)
+                       {
+                               newHostArray = calloc(2 * hostArraySize, sizeof(struct hostData));
+                               newHostReplied = calloc(2 * hostArraySize, sizeof(unsigned char));
+                               memcpy(newHostArray, hostArray, (i * sizeof(struct hostData)));
+                               memcpy(newHostReplied, hostReplied, (i * sizeof(unsigned char)));
+                               newHostArray[i] = newHost;
+                               newHostReplied[i] = 0;
+                               memcpy(&newHostArray[i+1], &hostArray[i], ((numHosts - i) *
+                                       sizeof(struct hostData)));
+                               memcpy(&newHostReplied[i+1], &hostReplied[i], ((numHosts - i) *
+                                       sizeof(unsigned char)));
+                               free(hostArray);
+                               free(hostReplied);
+                               hostArray = newHostArray;
+                               hostReplied = newHostReplied;
+                               hostArraySize = 2 * hostArraySize;
+                       }
+                       else
+                       {
+                               for (j = numHosts; j > i; j--)
+                               {
+                                       hostArray[j] = hostArray[j-1];
+                                       hostReplied[j] = hostReplied[j-1];
+                               }
+                               hostArray[i] = newHost;
+                               hostReplied[i] = 0;
+                       }
+                       for(j = 0; j < numBlocks; j++)
+                       {
+                               if (blockOwnerArray[j] >= i)
+                                       blockOwnerArray[j]++;
+                       }
+                       numHosts++;
+                       return 1;
+               }
+       }
 
+       //nothing greater, add to end
        if (numHosts == hostArraySize)
        {
-               newArraySize = hostArraySize * 2;
-               newArray = calloc(newArraySize, sizeof(struct hostData));
-               memcpy(newArray, hostArray, (hostArraySize * sizeof(struct hostData)));
+               newHostArray = calloc(2 * hostArraySize, sizeof(struct hostData));
+               newHostReplied = calloc(2 * hostArraySize, sizeof(unsigned char));
+               memcpy(newHostArray, hostArray, (numHosts * sizeof(struct hostData)));
+               memcpy(newHostReplied, hostReplied, (numHosts * sizeof(unsigned char)));
                free(hostArray);
-               hostArray = newArray;
-               hostArraySize = newArraySize;
+               free(hostReplied);
+               hostArray = newHostArray;
+               hostReplied = newHostReplied;
+               hostArraySize = 2 * hostArraySize;
        }
 
        hostArray[numHosts] = newHost;
+       hostReplied[numHosts] = 0;
        numHosts++;
-
-       return;
+       return 1;
 }
 
 void makeAssignments()
 {
        int i;
 
-       if (hostArray == NULL || blockOwnerArray == NULL || hostArraySize == 0)
-               initDHTdata();
-       
        if (numBlocks < numHosts)
        {
                free(blockOwnerArray);
                while (numBlocks < numHosts)
                        numBlocks *= 2;
-               blockOwnerArray = calloc(numBlocks, sizeof(unsigned int));
+               blockOwnerArray = calloc(numBlocks, sizeof(unsigned short));
        }
 
        for (i = 0; i < numBlocks; i++)
@@ -1258,90 +1032,31 @@ void makeAssignments()
        return;
 }
 
-//returns not-zero if ok, zero if not ok
-int msgSizeOk(unsigned char type, unsigned int size)
+void writeHostList()
 {
-       int status;
+       int i;
+       struct in_addr tmpAddr;
 
-       switch (type)
+       fprintf(logfile, "numHosts = %d\n", numHosts);
+       for (i = 0; i < numHosts; i++)
        {
-               case INSERT_CMD:
-                       status = (size == sizeof(struct insertCmd));
-                       break;
-               case INSERT_RES:
-                       status = (size == sizeof(struct insertRes));
-                       break;
-               case REMOVE_CMD:
-                       status = (size == sizeof(struct removeCmd));
-                       break;
-               case REMOVE_RES:
-                       status = (size == sizeof(struct removeRes));
-                       break;
-               case SEARCH_CMD:
-                       status = (size == sizeof(struct searchCmd));
-                       break;
-               case SEARCH_RES:
-                       status = (size == sizeof(struct searchRes));
-                       break;
-               case FIND_LEADER_REQ:
-                       status = (size == sizeof(char));
-                       break;
-               case FIND_LEADER_RES:
-                       status = (size == sizeof(char));
-                       break;
-               case REBUILD_REQ:
-                       status = (size == sizeof(char));
-                       break;
-               case REBUILD_RES:
-                       status = (size == sizeof(char));
-                       break;
-               case NOT_LEADER:
-                       status = (size == sizeof(char));
-                       break;
-               case REBUILD_CMD:
-                       status = (size == sizeof(char));
-                       break;
-               case JOIN_REQ:
-                       status = (size == sizeof(struct joinReq));
-                       break;
-               case JOIN_RES:
-                       status = (size == sizeof(char));
-                       break;
-               case GET_DHT_INFO_CMD:
-                       status = (size == sizeof(char));
-                       break;
-               case DHT_INFO_REQ:
-                       status = (size == sizeof(char));
-                       break;
-               case DHT_INFO_RES:
-                       status = (size == sizeof(char));
-                       break;
-               case FILL_DHT_CMD:
-                       status = (size == sizeof(char));
-                       break;
-               case FILL_DHT_RES:
-                       status = (size == sizeof(char));
-                       break;
-               case REBUILD_DONE_INFO:
-                       status = (size == sizeof(char));
-                       break;
-               default:
-                       status = 0;
-                       break;
+               tmpAddr.s_addr = htonl(hostArray[i].ipAddr);
+               fprintf(logfile, "%d) %s, %d\n", i, inet_ntoa(tmpAddr),
+                       hostArray[i].maxKeyCapacity);
        }
-       return status;
+       return;
 }
 
 void dhtLog(const char *format, ...)
 {
        va_list args;
-       struct timeval now;
+//     struct timeval now;
 
-       if (gettimeofday(&now, NULL) < 0)
-       {       perror("dhtLog():gettimeofday()"); }
+//     if (gettimeofday(&now, NULL) < 0)
+//     {       perror("dhtLog():gettimeofday()"); }
        va_start(args, format);
-       if (fprintf(logfile, "%d.%06d:", now.tv_sec, now.tv_usec) < 0)
-       {       perror("dhtLog():fprintf()"); }
+//     if (fprintf(logfile, "%d.%06d:", now.tv_sec, now.tv_usec) < 0)
+//     {       perror("dhtLog():fprintf()"); }
        if (vfprintf(logfile, format, args) < 0)
        {       perror("dhtLog():vfprintf()"); }
        if (fflush(logfile) == EOF)
@@ -1351,6 +1066,686 @@ void dhtLog(const char *format, ...)
        return;
 }
 
-#endif
+void *fillTask()
+{
+       unsigned int *vals;
+       unsigned int *keys;
+       unsigned int numKeys;
+       int i;
+       
+       vals = mhashGetKeys(&numKeys); //note: key of mhash is val of dht
+       keys = calloc(numKeys, sizeof(unsigned int));
+
+       for (i = 0; i < numKeys; i++)
+               keys[i] = myHostData.ipAddr;
 
+       if (dhtInsertMult(numKeys, keys, vals) == 0)
+               fillStatus = 2;
+       else
+               fillStatus = 3;
+       
+       pthread_exit(NULL);
+}
+
+void *udpListen()
+{
+       ssize_t bytesRcvd;
+       struct sockaddr_in peerAddr;
+       unsigned int peerIp;
+       socklen_t socklen = sizeof(struct sockaddr_in);
+       unsigned char inBuffer[MAX_MSG_SIZE];
+       unsigned char outBuffer[MAX_MSG_SIZE];
+       int pollret;
+       struct timeval now;
+       struct in_addr tmpAddr;
+       struct hostData tmpHost;
+       unsigned int tmpKey;
+       unsigned int tmpVal;
+       struct hostData *hostDataPtr;
+       unsigned short *uShortPtr;
+       unsigned int tmpUInt;
+       unsigned int tmpUShort;
+       int i;
+       unsigned int oldState;
+
+       dhtLog("udpListen(): linstening on port %d...\n", UDP_PORT);
+
+       while (1)
+       {
+               pollret = poll(&udpPollSock, 1, TIMEOUT_PERIOD);
+               pthread_mutex_lock(&stateMutex);
+               oldState = state;
+               if (pollret < 0)
+               {
+                       perror("udpListen():poll()");
+               }
+               else if (pollret > 0)
+               {
+                       bytesRcvd = recvfrom(udpPollSock.fd, inBuffer, MAX_MSG_SIZE, 0,
+                               (struct sockaddr *)&peerAddr, &socklen);
+                       if (bytesRcvd < 1)
+                       {
+                               dhtLog("udpListen(): ERROR: bytesRcvd = %d\n", bytesRcvd);
+                       }
+                       else if (inBuffer[0] >= NUM_MSG_TYPES)
+                       {
+                               dhtLog("udpListen(): ERROR: unknown msg type = %d\n", inBuffer[0]);
+                       }
+                       else if (!msgSizeOk(inBuffer, bytesRcvd))
+                       {
+                               dhtLog("udpListen(): ERROR: msg size not ok: type = %s\n, size = %d\n",
+                                       msg_types[inBuffer[0]], bytesRcvd);
+                       }
+                       else if (state == EXIT2_STATE)
+                       {
+                               //do nothing
+                       }
+                       else if (state == INIT1_STATE)
+                       { //after initialization with seed, do not proceed until seed replies
+                               dhtLog("udpListen(): received %s from %s, %d bytes\n",
+                                       msg_types[inBuffer[0]], inet_ntoa(peerAddr.sin_addr), bytesRcvd);
+                               for (i = 0; i < bytesRcvd; i++)
+                                       dhtLog(" %x", inBuffer[i]);
+                               dhtLog("\n");
+                               peerIp = ntohl(peerAddr.sin_addr.s_addr);
+                               if (peerIp == seed && inBuffer[0] == WHO_IS_LEADER_RES)
+                               {
+                                       tmpHost.ipAddr = peerIp;
+                                       tmpHost.maxKeyCapacity = 0;
+                                       addHost(tmpHost);
+                                       writeHostList();
+                                       leader = read4(&inBuffer[1]);
+                                       tmpAddr.s_addr = htonl(leader);
+                                       dhtLog("leader = %s\n", inet_ntoa(tmpAddr));
+                                       if (leader != 0)
+                                       {
+                                               setState(INIT2_STATE);
+                                               outBuffer[0] = JOIN_REQ;
+                                               write4(&outBuffer[1], myHostData.maxKeyCapacity);
+                                               udpSend(outBuffer, 5, leader);
+                                       }
+                                       else
+                                       {
+                                               electionOriginator = myHostData.ipAddr;
+                                               setState(ELECT1_STATE);
+                                               outBuffer[0] = ELECT_LEADER_CMD;
+                                               write4(&outBuffer[1], myHostData.ipAddr); //originator = me
+                                               udpSendAll(outBuffer, 5);
+                                       }
+                               }
+                       }
+                       else
+                       {
+                               dhtLog("udpListen(): received %s from %s, %d bytes\n",
+                                       msg_types[inBuffer[0]], inet_ntoa(peerAddr.sin_addr), bytesRcvd);
+                               for (i = 0; i < bytesRcvd; i++)
+                                       dhtLog(" %x", inBuffer[i]);
+                               dhtLog("\n");
+                               peerIp = ntohl(peerAddr.sin_addr.s_addr);
+                               switch (inBuffer[0])
+                               {
+                                       case INSERT_CMD:
+                                               if (state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
+                                                       || state == LEAD_NORMAL2_STATE || state == REBUILD4_STATE
+                                                       || state == REBUILD5_STATE || state == LEAD_REBUILD3_STATE)
+                                               {
+                                                       tmpKey = read4(&inBuffer[1]);
+                                                       tmpVal = read4(&inBuffer[5]);
+                                                       outBuffer[0] = INSERT_RES;
+                                                       if (getKeyOwner(tmpKey) == myHostData.ipAddr)
+                                                       {
+                                                               if (chashInsert(myHashTable, tmpKey, (void *)tmpVal) == 0)
+                                                                       outBuffer[1] = OPERATION_OK;
+                                                               else
+                                                                       outBuffer[1] = INTERNAL_ERROR;
+                                                       }
+                                                       else
+                                                       {
+                                                               outBuffer[1] = NOT_KEY_OWNER;
+                                                       }
+                                                       //reply to client socket
+                                                       sendto(udpPollSock.fd, outBuffer, 2, 0,
+                                                               (struct sockaddr *)&peerAddr, socklen);
+                                               }
+                                               break;
+                                       case REMOVE_CMD:
+                                               if (state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
+                                                       || state == LEAD_NORMAL2_STATE)
+                                               {
+                                                       tmpKey = read4(&inBuffer[1]);
+                                                       outBuffer[0] = REMOVE_RES;
+                                                       if (getKeyOwner(tmpKey) == myHostData.ipAddr)
+                                                       {
+                                                               if (chashRemove(myHashTable, tmpKey) == 0)
+                                                                       outBuffer[1] = OPERATION_OK;
+                                                               else
+                                                                       outBuffer[1] = KEY_NOT_FOUND;
+                                                       }
+                                                       else
+                                                       {
+                                                               outBuffer[1] = NOT_KEY_OWNER;
+                                                       }
+                                                       //reply to client socket
+                                                       sendto(udpPollSock.fd, outBuffer, 2, 0,
+                                                               (struct sockaddr *)&peerAddr, socklen);
+                                               }
+                                               break;
+                                       case SEARCH_CMD:
+                                               if (state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
+                                                       || state == LEAD_NORMAL2_STATE)
+                                               {
+                                                       tmpKey = read4(&inBuffer[1]);
+                                                       outBuffer[0] = SEARCH_RES;
+                                                       if (getKeyOwner(tmpKey) == myHostData.ipAddr)
+                                                       {
+                                                               if ((tmpVal = (unsigned int)chashSearch(myHashTable, tmpKey)) != 0)
+                                                               {
+                                                                       outBuffer[1] = OPERATION_OK;
+                                                                       write4(&outBuffer[2], tmpVal);
+                                                               }
+                                                               else
+                                                               {
+                                                                       outBuffer[1] = KEY_NOT_FOUND;
+                                                                       write4(&outBuffer[2], 0);
+                                                               }
+                                                       }
+                                                       else
+                                                       {
+                                                               outBuffer[1] = NOT_KEY_OWNER;
+                                                               write4(&outBuffer[2], 0);
+                                                       }
+                                                       //reply to client socket
+                                                       sendto(udpPollSock.fd, outBuffer, 6, 0,
+                                                               (struct sockaddr *)&peerAddr, socklen);
+                                               }
+                                               break;
+                                       case WHO_IS_LEADER_CMD:
+                                               tmpHost.ipAddr = peerIp;
+                                               tmpHost.maxKeyCapacity = 0;
+                                               addHost(tmpHost);
+                                               writeHostList();
+                                               outBuffer[0] = WHO_IS_LEADER_RES;
+                                               //leader == 0 means I don't know who it is
+                                               write4(&outBuffer[1], leader);
+                                               udpSend(outBuffer, 5, peerIp);
+                                               break;
+                                       case JOIN_REQ:
+                                               if (state == LEAD_NORMAL1_STATE || state == LEAD_NORMAL2_STATE)
+                                               {
+                                                       tmpHost.ipAddr = peerIp;
+                                                       tmpHost.maxKeyCapacity = read4(&inBuffer[1]);
+                                                       addHost(tmpHost);
+                                                       writeHostList();
+                                                       if (state == LEAD_NORMAL1_STATE)
+                                                               setState(LEAD_NORMAL2_STATE);
+                                                       outBuffer[0] = JOIN_RES;
+                                                       outBuffer[1] = 0; //status, success
+                                                       udpSend(outBuffer, 2, peerIp);
+                                               }
+                                               else if (state == LEAD_REBUILD1_STATE)
+                                               {
+                                                       //note: I don't need to addHost().
+                                                       checkReplied(peerIp);
+                                                       outBuffer[0] = JOIN_RES;
+                                                       outBuffer[1] = 0; //status, success
+                                                       udpSend(outBuffer, 2, peerIp);
+                                                       if (allReplied())
+                                                       {
+                                                               makeAssignments();
+                                                               setState(LEAD_REBUILD2_STATE);
+                                                               outBuffer[0] = DHT_UPDATE_CMD;
+                                                               write2(&outBuffer[1], numHosts);
+                                                               write2(&outBuffer[3], numBlocks);
+                                                               memcpy(&outBuffer[5], hostArray, numHosts*sizeof(struct hostData));
+                                                               memcpy(&outBuffer[5+numHosts*sizeof(struct hostData)],
+                                                                       blockOwnerArray, numBlocks*2);
+                                                               udpSendAll(outBuffer, 5 + sizeof(struct hostData) * numHosts
+                                                                       + 2 * numBlocks);
+                                                       }
+                                               }
+                                               break;
+                                       case JOIN_RES:
+                                               if (state == REBUILD1_STATE)
+                                               {
+                                                       setState(REBUILD2_STATE);
+                                               }
+                                               else if (state == INIT2_STATE)
+                                               {
+                                                       setState(NORMAL_STATE);
+                                               }
+                                               break;
+                                       case LEAVE_REQ:
+                                               if (state == LEAD_NORMAL1_STATE || state == LEAD_NORMAL2_STATE)
+                                               { //TODO: make this graceful, instead of just rebuilding
+                                                       removeHost(peerIp);
+                                                       if (state != LEAD_NORMAL2_STATE)
+                                                               setState(LEAD_NORMAL2_STATE);
+                                               }
+                                               break;
+                                       case DHT_UPDATE_CMD:
+                                               if (state == REBUILD2_STATE && peerIp == leader)
+                                               {
+                                                       free(hostArray);
+                                                       free(blockOwnerArray);
+                                                       numHosts = read2(&inBuffer[1]);
+                                                       numBlocks = read2(&inBuffer[3]);
+                                                       while (hostArraySize < numHosts)
+                                                               hostArraySize *= 2;
+                                                       hostArray = calloc(hostArraySize, sizeof(struct hostData));
+                                                       blockOwnerArray = calloc(numBlocks, 2);
+                                                       memcpy(hostArray, &inBuffer[5], numHosts*sizeof(struct hostData));
+                                                       memcpy(blockOwnerArray, &inBuffer[5+numHosts*sizeof(struct hostData)], numBlocks*2);
+                                                       writeHostList();
+                                                       setState(REBUILD3_STATE);
+                                                       outBuffer[0] = DHT_UPDATE_RES;
+                                                       udpSend(outBuffer, 1, peerIp);
+                                               }
+                                               break;
+                                       case DHT_UPDATE_RES:
+                                               if (state == LEAD_REBUILD2_STATE)
+                                               {
+                                                       checkReplied(peerIp);
+                                                       if (allReplied())
+                                                       {
+                                                               setState(LEAD_REBUILD3_STATE);
+                                                               outBuffer[0] = FILL_DHT_CMD;
+                                                               udpSendAll(outBuffer, 1);
+                                                               if (fillStatus != 0)
+                                                                       dhtLog("udpListen(): ERROR: fillTask already running\n");
+                                                               fillStatus = 1;
+                                                               if (pthread_create(&threadFillTask, NULL, fillTask, NULL) != 0)
+                                                                       dhtLog("udpListen(): ERROR creating threadFillTask\n");
+                                                       }
+                                               }
+                                               break;
+                                       case ELECT_LEADER_CMD:
+                                               tmpUInt = read4(&inBuffer[1]);
+                                               if ((state == ELECT1_STATE || state == ELECT2_STATE)
+                                                       && tmpUInt >= electionOriginator)
+                                               { //already participating in a higher-priority election
+                                                       outBuffer[0] = ELECT_LEADER_RES;
+                                                       outBuffer[1] = 0xFF;
+                                                       udpSend(outBuffer, 2, peerIp);
+                                               }
+                                               else
+                                               { //join election
+                                                       electionOriginator = tmpUInt;
+                                                       electionParent = peerIp;
+                                                       setState(ELECT1_STATE);
+                                                       outBuffer[0] = ELECT_LEADER_CMD;
+                                                       write4(&outBuffer[1], electionOriginator);
+                                                       //don't bother forwarding the message to originator or parent
+                                                       checkReplied(electionOriginator);
+                                                       checkReplied(electionParent);
+                                                       if (allReplied())
+                                                       { //in case that is everybody I know of
+                                                               setState(ELECT2_STATE);
+                                                               outBuffer[0] = ELECT_LEADER_RES;
+                                                               outBuffer[1] = 0;
+                                                               write2(&outBuffer[2], numHosts);
+                                                               memcpy(&outBuffer[4], hostArray, sizeof(struct hostData)
+                                                                       * numHosts);
+                                                               udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
+                                                                       electionParent);
+                                                       }
+                                                       else
+                                                       {
+                                                               udpSendAll(outBuffer, 5);
+                                                       }
+                                               }
+                                               break;
+                                       case ELECT_LEADER_RES:
+                                               if (state == ELECT1_STATE)
+                                               {
+                                                       checkReplied(peerIp);
+                                                       if (inBuffer[1] != 0xFF)
+                                                       {
+                                                               tmpUShort = read2(&inBuffer[2]);
+                                                               hostDataPtr = (struct hostData *)&inBuffer[4];
+                                                               for (i = 0; i < tmpUShort; i++)
+                                                                       addHost(hostDataPtr[i]);
+                                                               writeHostList();
+                                                       }
+                                                       if (allReplied())
+                                                       {
+                                                               setState(ELECT2_STATE);
+                                                               if (electionOriginator == myHostData.ipAddr)
+                                                               {
+                                                                       leader = hostArray[0].ipAddr;
+                                                                       if (leader == myHostData.ipAddr)
+                                                                       { //I am the leader
+                                                                               dhtLog("I am the leader!\n");
+                                                                               setState(LEAD_REBUILD1_STATE);
+                                                                               outBuffer[0] = REBUILD_CMD;
+                                                                               udpSendAll(outBuffer, 1);
+                                                                       }
+                                                                       else
+                                                                       { //notify leader
+                                                                               outBuffer[0] = CONGRATS_CMD;
+                                                                               write2(&outBuffer[1], numHosts);
+                                                                               hostDataPtr = (struct hostData *)&outBuffer[3];
+                                                                               for (i = 0; i < numHosts; i++)
+                                                                                       hostDataPtr[i] = hostArray[i];
+                                                                               udpSend(outBuffer, 3 + sizeof(struct hostData) * numHosts,
+                                                                                       leader);
+                                                                       }
+                                                               }
+                                                               else
+                                                               {
+                                                                       outBuffer[0] = ELECT_LEADER_RES;
+                                                                       outBuffer[1] = 0;
+                                                                       write2(&outBuffer[2], numHosts);
+                                                                       hostDataPtr = (struct hostData *)&outBuffer[4];
+                                                                       for (i = 0; i < numHosts; i++)
+                                                                               hostDataPtr[i] = hostArray[i];
+                                                                       udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
+                                                                               electionParent);
+                                                               }
+                                                       }
+                                               }
+                                               break;
+                                       case CONGRATS_CMD:
+                                               if (state == ELECT2_STATE)
+                                               { //I am the leader
+                                                       leader = myHostData.ipAddr;
+                                                       dhtLog("I am the leader!\n");
+                                                       tmpUShort = read2(&inBuffer[1]);
+                                                       hostDataPtr = (struct hostData *)&inBuffer[3];
+                                                       for (i = 0; i < tmpUShort; i++)
+                                                               addHost(hostDataPtr[i]);
+                                                       writeHostList();
+                                                       setState(LEAD_REBUILD1_STATE);
+                                                       outBuffer[0] = REBUILD_CMD;
+                                                       udpSendAll(outBuffer, 1);
+                                               }
+                                               break;
+                                       case REBUILD_REQ:
+                                               if (state == LEAD_NORMAL1_STATE || state == LEAD_NORMAL2_STATE)
+                                               {
+                                                       setState(LEAD_REBUILD1_STATE);
+                                                       outBuffer[0] = REBUILD_CMD;
+                                                       udpSendAll(outBuffer, 1);
+                                               }
+                                               break;
+                                       case REBUILD_CMD:
+                                               leader = peerIp; //consider this a declaration of authority
+                                               setState(REBUILD1_STATE);
+                                               outBuffer[0] = JOIN_REQ;
+                                               write4(&outBuffer[1], myHostData.maxKeyCapacity);
+                                               udpSend(outBuffer, 5, leader);
+                                               break;
+                                       case FILL_DHT_CMD:
+                                               if (state == REBUILD3_STATE && peerIp == leader)
+                                               {
+                                                       setState(REBUILD4_STATE);
+                                                       if (fillStatus != 0)
+                                                               dhtLog("udpListen(): ERROR: fillTask already running\n");
+                                                       fillStatus = 1;
+                                                       if (pthread_create(&threadFillTask, NULL, fillTask, NULL) != 0)
+                                                               dhtLog("udpListen(): ERROR creating threadFillTask\n");
+                                               }
+                                               break;
+                                       case FILL_DHT_RES:
+                                               if (state == LEAD_REBUILD3_STATE)
+                                               {
+                                                       checkReplied(peerIp);
+                                                       if (allReplied() && fillStatus == 2)
+                                                       {
+                                                               fillStatus = 0;
+                                                               setState(LEAD_REBUILD4_STATE);
+                                                               outBuffer[0] = RESUME_NORMAL_CMD;
+                                                               udpSendAll(outBuffer, 1);
+                                                       }
+                                               }
+                                               break;
+                                       case RESUME_NORMAL_CMD:
+                                               if (state == REBUILD5_STATE && peerIp == leader)
+                                               {
+                                                       setState(NORMAL_STATE);
+                                                       outBuffer[0] = RESUME_NORMAL_RES;
+                                                       udpSend(outBuffer, 1, leader);
+                                               }
+                                               break;
+                                       case RESUME_NORMAL_RES:
+                                               if (state == LEAD_REBUILD4_STATE)
+                                               {
+                                                       checkReplied(peerIp);
+                                                       if (allReplied())
+                                                       {
+                                                               setState(LEAD_NORMAL1_STATE);
+                                                       }
+                                               }
+                                               break;
+                               }
+                       }
+               }
+               if (state == REBUILD4_STATE)
+               {
+                       switch (fillStatus)
+                       {
+                               case 0: dhtLog("udpListen(): ERROR: fillStatus=0 in REBUILD4_STATE\n");
+                                       break;
+                               case 1: //do nothing
+                                       break;
+                               case 2: //done filling the dht, notify leader
+                                       fillStatus = 0;
+                                       setState(REBUILD5_STATE);
+                                       outBuffer[0] = FILL_DHT_RES;
+                                       udpSend(outBuffer, 1, leader);
+                                       break;
+                               case 3: //error encountered -> restart rebuild
+                                       fillStatus = 0;
+                                       setState(REBUILD0_STATE);
+                                       outBuffer[0] = REBUILD_REQ;
+                                       udpSend(outBuffer, 1, leader);
+                                       break;
+                       }
+               }
+               if (state == LEAD_REBUILD3_STATE)
+               {
+                       switch (fillStatus)
+                       {
+                               case 0: dhtLog("udpListen(): ERROR: fillStatus=0 in LEAD_REBUILD3_STATE\n");
+                                       break;
+                               case 1: //do nothing
+                                       break;
+                               case 2: //I'm done, now is everybody else also done?
+                                       if (allReplied())
+                                       {
+                                               fillStatus = 0;
+                                               setState(LEAD_REBUILD4_STATE);
+                                               outBuffer[0] = RESUME_NORMAL_CMD;
+                                               udpSendAll(outBuffer, 1);
+                                       }
+                                       break;
+                               case 3: //error encountered -> restart rebuild
+                                       fillStatus = 0;
+                                       setState(LEAD_REBUILD1_STATE);
+                                       outBuffer[0] = REBUILD_CMD;
+                                       udpSendAll(outBuffer, 1);
+                                       break;
+                       }
+               }
+               if (timerSet)
+               {
+                       gettimeofday(&now, NULL);
+                       if (timercmp(&now, &timer, >))
+                       {
+                               if (timeoutCntr < retry_vals[state])
+                               {
+                                       timeoutCntr++;
+                                       timeradd(&now, &timeout_vals[state], &timer);
+                                       dhtLog("udpListen(): retry: %d\n", timeoutCntr);
+                                       switch (state)
+                                       {
+                                               case INIT1_STATE:
+                                                       outBuffer[0] = WHO_IS_LEADER_CMD;
+                                                       udpSend(outBuffer, 1, seed);
+                                                       break;
+                                               case INIT2_STATE:
+                                                       outBuffer[0] = JOIN_REQ;
+                                                       write4(&outBuffer[1], myHostData.maxKeyCapacity);
+                                                       udpSend(outBuffer, 5, leader);
+                                                       break;
+                                               case ELECT1_STATE:
+                                                       outBuffer[0] = ELECT_LEADER_CMD;
+                                                       write4(&outBuffer[1], electionOriginator);
+                                                       udpSendAll(outBuffer, 5);
+                                                       break;
+                                               case ELECT2_STATE:
+                                                       if (electionOriginator == myHostData.ipAddr)
+                                                       { //retry notify leader
+                                                               outBuffer[0] = CONGRATS_CMD;
+                                                               write2(&outBuffer[1], numHosts);
+                                                               memcpy(&outBuffer[3], hostArray, sizeof(struct hostData)
+                                                                       * numHosts);
+                                                               udpSend(outBuffer, 3 + sizeof(struct hostData) * numHosts,
+                                                                       leader);
+                                                       }
+                                                       else
+                                                       {
+                                                               outBuffer[0] = ELECT_LEADER_RES;
+                                                               outBuffer[1] = 0;
+                                                               write2(&outBuffer[2], numHosts);
+                                                               memcpy(&outBuffer[4], hostArray, sizeof(struct hostData)
+                                                                       * numHosts);
+                                                               udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
+                                                                       electionParent);
+                                                       }
+                                                       break;
+                                               case REBUILD0_STATE:
+                                                       outBuffer[0] = REBUILD_REQ;
+                                                       udpSend(outBuffer, 1, leader);
+                                                       break;
+                                               case REBUILD1_STATE:
+                                                       outBuffer[0] = JOIN_REQ;
+                                                       write4(&outBuffer[1], myHostData.maxKeyCapacity);
+                                                       udpSend(outBuffer, 5, leader);
+                                                       break;
+                                               case REBUILD5_STATE:
+                                                       outBuffer[0] = FILL_DHT_RES;
+                                                       udpSend(outBuffer, 1, leader);
+                                                       break;
+                                               case LEAD_REBUILD1_STATE:
+                                                       outBuffer[0] = REBUILD_CMD;
+                                                       udpSendAll(outBuffer, 1);
+                                                       break;
+                                               case LEAD_REBUILD2_STATE:
+                                                       outBuffer[0] = DHT_UPDATE_CMD;
+                                                       write2(&outBuffer[1], numHosts);
+                                                       write2(&outBuffer[3], numBlocks);
+                                                       memcpy(&outBuffer[5], hostArray, numHosts
+                                                               * sizeof(struct hostData));
+                                                       memcpy(&outBuffer[5+numHosts*sizeof(struct hostData)],
+                                                               blockOwnerArray, numBlocks*2);
+                                                       udpSendAll(outBuffer, 5 + sizeof(struct hostData) * numHosts
+                                                               + 2 * numBlocks);
+                                                       break;
+                                               case LEAD_REBUILD3_STATE:
+                                                       outBuffer[0] = FILL_DHT_CMD;
+                                                       udpSendAll(outBuffer, 1);
+                                                       break;
+                                               case LEAD_REBUILD4_STATE:
+                                                       outBuffer[0] = RESUME_NORMAL_CMD;
+                                                       udpSendAll(outBuffer, 1);
+                                                       break;
+                                               case EXIT1_STATE: //TODO...
+                                                       break;
+                                               case NORMAL_STATE:
+                                               case LEAD_NORMAL1_STATE:
+                                               case LEAD_NORMAL2_STATE:
+                                               case REBUILD2_STATE:
+                                               case REBUILD3_STATE:
+                                               case REBUILD4_STATE:
+                                               case EXIT2_STATE: //we shouldn't get here
+                                                       break;
+                                       }
+                               }
+                               else
+                               {
+                                       dhtLog("udpListen(): timed out in state %s after %d retries\n",
+                                               state_names[state], timeoutCntr);
+                                       switch (state)
+                                       {
+                                               case INIT1_STATE:
+                                                       setState(EXIT2_STATE);
+                                                       break;
+                                               case LEAD_NORMAL2_STATE:
+                                                       setState(LEAD_REBUILD1_STATE);
+                                                       outBuffer[0] = REBUILD_CMD;
+                                                       udpSendAll(outBuffer, 1);
+                                                       break;
+                                               case ELECT1_STATE:
+                                                       dhtLog("removing unresponsive hosts, before:\n");
+                                                       writeHostList();
+                                                       removeUnresponsiveHosts();
+                                                       dhtLog("after\n");
+                                                       writeHostList();
+                                                       setState(ELECT2_STATE);
+                                                       if (electionOriginator == myHostData.ipAddr)
+                                                       {
+                                                               leader = hostArray[0].ipAddr;
+                                                               if (leader == myHostData.ipAddr)
+                                                               { //I am the leader
+                                                                       dhtLog("I am the leader!\n");
+                                                                       setState(LEAD_REBUILD1_STATE);
+                                                                       outBuffer[0] = REBUILD_CMD;
+                                                                       udpSendAll(outBuffer, 1);
+                                                               }
+                                                               else
+                                                               { //notify leader
+                                                                       outBuffer[0] = CONGRATS_CMD;
+                                                                       write2(&outBuffer[1], numHosts);
+                                                                       memcpy(&outBuffer[3], hostArray, sizeof(struct hostData)
+                                                                               * numHosts);
+                                                                       udpSend(outBuffer, 3 + sizeof(struct hostData) * numHosts,
+                                                                               leader);
+                                                               }
+                                                       }
+                                                       else
+                                                       {
+                                                               outBuffer[0] = ELECT_LEADER_RES;
+                                                               outBuffer[1] = 0;
+                                                               write2(&outBuffer[2], numHosts);
+                                                               memcpy(&outBuffer[4], hostArray, sizeof(struct hostData)
+                                                                       * numHosts);
+                                                               udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
+                                                                       electionParent);
+                                                       }
+                                                       break;
+                                               case INIT2_STATE:
+                                               case ELECT2_STATE:
+                                               case REBUILD0_STATE:
+                                               case REBUILD1_STATE:
+                                               case REBUILD2_STATE:
+                                               case REBUILD3_STATE:
+                                               case REBUILD4_STATE:
+                                               case REBUILD5_STATE:
+                                               case LEAD_REBUILD1_STATE:
+                                               case LEAD_REBUILD2_STATE:
+                                               case LEAD_REBUILD3_STATE:
+                                               case LEAD_REBUILD4_STATE:
+                                                       //start election
+                                                       electionOriginator = myHostData.ipAddr;
+                                                       setState(ELECT1_STATE);
+                                                       outBuffer[0] = ELECT_LEADER_CMD;
+                                                       write4(&outBuffer[1], myHostData.ipAddr); //originator = me
+                                                       udpSendAll(outBuffer, 5);
+                                                       break;
+                                               case EXIT1_STATE:
+                                                       setState(EXIT2_STATE);
+                                                       break;
+                                               case NORMAL_STATE:
+                                               case LEAD_NORMAL1_STATE:
+                                               case EXIT2_STATE: //we shouldn't get here
+                                                       break;
+                                       }
+                               }
+                       }
+               }
+               if (state != oldState)
+                       pthread_cond_broadcast(&stateCond);
+               pthread_mutex_unlock(&stateMutex);
+       }
+}
 
index ceeb2a0a74af019113d4a14cfdb8e970b6354974..344ab5303766661bfd7e1919204dac8e8aec0503 100644 (file)
@@ -1,12 +1,20 @@
 #ifndef _DHT_H
 #define _DHT_H
 
-//#define SIMPLE_DHT
+#include <stdio.h>
+
+/*******************************************************************************
+*                             Local Structs
+*******************************************************************************/
 
 #define DHT_NO_KEY_LIMIT 0xFFFFFFFF
 
+/*******************************************************************************
+*                       Interface Function Prototypes
+*******************************************************************************/
+
 //called by host which joins (or starts) the system
-void dhtInit(unsigned int maxKeyCapaciy);
+void dhtInit(unsigned int seedIp, unsigned int maxKeyCapaciy);
 //exit system, cleanup
 void dhtExit();
 
@@ -14,11 +22,17 @@ void dhtExit();
 
 //returns 0 if successful, -1 if an error occurred
 int dhtInsert(unsigned int key, unsigned int val);
+//simultaneously inserts the key-val pairs in the given arrays
+int dhtInsertMult(unsigned int numKeys, unsigned int *keys,    unsigned int *vals);
 //returns 0 if successful, -1 if an error occurred
 int dhtRemove(unsigned int key);
+//simultaneously delete the keys in the given array
+int dhtRemoveMult(unsigned int numKeys, unsigned int *keys);
 //returns 0 if successful and copies val into *val,
 // 1 if key not found, -1 if an error occurred
 int dhtSearch(unsigned int key, unsigned int *val);
-
+//simultaneously search for the vals that correspond to the given keys.
+// result is placed in vals[]
+int dhtSearchMult(unsigned int numKeys, unsigned int *keys, unsigned int *vals);
 #endif
 
index 8bc046dc90428338f64cb35c36de9c7af815a11f..2c95263805946da2ed5126158783b0eff1cf3047 100644 (file)
@@ -187,3 +187,35 @@ unsigned int mhashResize(unsigned int newsize) {
        return 0;
 }
 
+unsigned int *mhashGetKeys(unsigned int *numKeys)
+{
+       unsigned int *keys;
+       int i, keyindex;
+       mhashlistnode_t *curr;
+
+       pthread_mutex_lock(&mlookup.locktable);
+
+       *numKeys = mlookup.numelements;
+       keys = calloc(*numKeys, sizeof(unsigned int));
+
+       keyindex = 0;
+       for (i = 0; i < mlookup.size; i++)
+       {
+               if (mlookup.table[i].key != 0)
+               {
+                       curr = &mlookup.table[i];
+                       while (curr != NULL)
+                       {
+                               keys[keyindex++] = curr->key;
+                               curr = curr->next;
+                       }
+               }
+       }
+
+       if (keyindex != *numKeys)
+               printf("mhashGetKeys(): WARNING: incorrect mlookup.numelements value!\n");
+
+       pthread_mutex_unlock(&mlookup.locktable);
+       return keys;
+}
+
index f3b0159df9983fbc317c68d4fdc7ead6b1b2d0e0..85396c524fc05086f6c7a2dcd6e797b169720329 100644 (file)
@@ -28,6 +28,8 @@ unsigned mhashInsert(unsigned int key, void *val);
 void *mhashSearch(unsigned int key); //returns val, NULL if not found
 unsigned int mhashRemove(unsigned int key); //returns -1 if not found
 unsigned int mhashResize(unsigned int newsize);
+unsigned int *mhashGetKeys(unsigned int *numKeys);
+void mhashPrint();
 
 #endif
 
index 636b4dd4399019696cf0fd5419ea0cf3de2eb40f..acc86a0e504eb6ed9389b059a1bfc787c0c0e686 100644 (file)
@@ -2,7 +2,7 @@
 #include "dht.h"
 #include "clookup.h"
 
-#define NUM_ITEMS 1000
+#define NUM_ITEMS 100000
 
 int main()
 {
@@ -13,7 +13,7 @@ int main()
        int error;
        chashtable_t *localHash;
 
-       dhtInit(DHT_NO_KEY_LIMIT);
+       dhtInit(0x80C3AF45, DHT_NO_KEY_LIMIT);
 
        localHash = chashCreate(HASH_SIZE, LOADFACTOR);
        srandom(time(0));
@@ -23,6 +23,8 @@ int main()
                vals[key] = random();
        }
 
+       sleep(5);
+
        printf("testing dhtInsert() and dhtSearch()\n");
 
        for (key = 0; key < NUM_ITEMS; key++)