Implemented leader/follower state machines for rebuild procedure up to
authorerubow <erubow>
Sat, 30 Jun 2007 01:27:20 +0000 (01:27 +0000)
committererubow <erubow>
Sat, 30 Jun 2007 01:27:20 +0000 (01:27 +0000)
and including distribution of host list and keyspace responsibilities. I
did this using broadcasts under the assumption that all hosts were on a
single network segment, but this will be changed. Implemented a nice
time-stamped logging function to facilitate testing.

Robust/src/Runtime/DSTM/interface/dht.c

index 173ff7b94ee75595d7eb204394e11624e62eca92..2b956665b89ad367c7f3f3ee0169506fc099a1c0 100644 (file)
@@ -41,12 +41,17 @@ int dhtSearch(unsigned int key, unsigned int *val)
 
 #else
 
+/*******************************************************************************
+*                              Includes
+*******************************************************************************/
+
 #include <netinet/in.h>
 #include <arpa/inet.h>
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <sys/ioctl.h>
 #include <stdio.h>
+#include <stdarg.h>
 #include <string.h>
 #include <stdlib.h>
 #include <unistd.h>
@@ -56,22 +61,28 @@ int dhtSearch(unsigned int key, unsigned int *val)
 #include <netdb.h>
 #include <net/if.h>
 #include <linux/sockios.h>
+#include <sys/time.h>
 #include "clookup.h" //this works for now, do we need anything better?
 
+/*******************************************************************************
+*                           Local Defines, Structs
+*******************************************************************************/
+
 #define BUFFER_SIZE 512 //maximum message size
 #define UDP_PORT 2157
 #define TCP_PORT 2157
 #define BACKLOG 10 //max pending tcp connections
 #define TIMEOUT_MS 500
 #define MAX_RETRIES 3
-#define INIT_HOST_ALLOC 16
-#define INIT_BLOCK_NUM 64
+#define INIT_HOST_ALLOC 1
+#define INIT_BLOCK_NUM 1
 #define DEFAULT_INTERFACE "eth0"
 #define DHT_LOG "dht.log"
 
+//make sure this is consistent with enum below
+#define NUM_MSG_TYPES 20
 
-#define NUM_MSG_TYPES 19
-
+//make sure this matches msg_types global var
 enum {
        INSERT_CMD,
        INSERT_RES,
@@ -79,7 +90,7 @@ enum {
        REMOVE_RES,
        SEARCH_CMD,
        SEARCH_RES,
-       FIND_LEADER_CMD,
+       FIND_LEADER_REQ,
        FIND_LEADER_RES,
        REBUILD_REQ,
        REBUILD_RES,
@@ -87,6 +98,7 @@ enum {
        REBUILD_CMD,
        JOIN_REQ,
        JOIN_RES,
+       GET_DHT_INFO_CMD,
        DHT_INFO_REQ,
        DHT_INFO_RES,
        FILL_DHT_CMD,
@@ -94,29 +106,6 @@ enum {
        REBUILD_DONE_INFO
 };
 
-const char *msg_types[NUM_MSG_TYPES] =
-{
-       "INSERT_CMD",
-       "INSERT_RES",
-       "REMOVE_CMD",
-       "REMOVE_RES",
-       "SEARCH_CMD",
-       "SEARCH_RES",
-       "FIND_LEADER_CMD",
-       "FIND_LEADER_RES",
-       "REBUILD_REQ",
-       "REBUILD_RES",
-       "NOT_LEADER",
-       "REBUILD_CMD",
-       "JOIN_REQ",
-       "JOIN_RES",
-       "DHT_INFO_REQ",
-       "DHT_INFO_RES",
-       "FILL_DHT_CMD",
-       "FILL_DHT_RES",
-       "REBUILD_DONE_INFO"
-};
-
 //status codes
 enum {
        INSERT_OK,
@@ -128,6 +117,17 @@ enum {
        NOT_KEY_OWNER,
 };
 
+enum {
+       NORMAL_STATE,
+       REBUILD1_STATE,
+       REBUILD2_STATE,
+       REBUILD3_STATE,
+       LEAD_NORMAL_STATE,
+       LEAD_REBUILD1_STATE,
+       LEAD_REBUILD2_STATE,
+       LEAD_REBUILD3_STATE
+};
+
 struct hostData {
        unsigned int ipAddr;
        unsigned int maxKeyCapacity;
@@ -171,28 +171,75 @@ struct searchRes {
        unsigned int val;
 };
 
-struct rebuildRes {
+struct joinReq {
        unsigned int msgType:8;
        unsigned int unused:24;
-       unsigned int status;
+       struct hostData newHostData;
 };
 
-//TODO: leave message, rebuild message...
+/*******************************************************************************
+*                           Global Variables
+*******************************************************************************/
+
+//make sure this matches enumeration above
+const char *msg_types[NUM_MSG_TYPES] =
+{
+       "INSERT_CMD",
+       "INSERT_RES",
+       "REMOVE_CMD",
+       "REMOVE_RES",
+       "SEARCH_CMD",
+       "SEARCH_RES",
+       "FIND_LEADER_REQ",
+       "FIND_LEADER_RES",
+       "REBUILD_REQ",
+       "REBUILD_RES",
+       "NOT_LEADER",
+       "REBUILD_CMD",
+       "JOIN_REQ",
+       "JOIN_RES",
+       "GET_DHT_INFO_CMD",
+       "DHT_INFO_REQ",
+       "DHT_INFO_RES",
+       "FILL_DHT_CMD",
+       "FILL_DHT_RES",
+       "REBUILD_DONE_INFO"
+};
 
 FILE *logfile;
-unsigned int leader; //ip address of leader
+//ip address of leader
+unsigned int leader;
+//set by dhtInit()
 struct hostData myHostData;
-/*----DHT data----*/
+//number of hosts in the system
 unsigned int numHosts;
+//ip address and max key capacity of each host
 struct hostData *hostArray;
+//memory allocated for this many items in hostArray
+unsigned int hostArraySize;
+//number of keyspace divisions, preferably a power of 2 > numHosts
 unsigned int numBlocks;
+//this array has numBlocks elements, each of which contains an index to hostArray
+// the key owner is found by hashing the key into one of these blocks and using this
+// array to find the corresponding host in hostArray
 unsigned int *blockOwnerArray;
-/*----end DHT data----*/
+//used by leader to track which hosts have responded, etc.
+unsigned int *hostRebuildStates;
+//thread handles
 pthread_t threadUdpListen;
 pthread_t threadTcpListen;
-int udpServerSock;
+//server sockets
+struct pollfd udpServerPollSock;
 int tcpListenSock;
+//see above for enumeration of states
+int state;
+
+/*******************************************************************************
+*                         Local Function Prototypes
+*******************************************************************************/
 
+//log funtion, use like printf()
+void dhtLog(const char *format, ...);
 //return my IP address
 unsigned int getMyIpAddr();
 //sends broadcast to discover leader
@@ -218,13 +265,24 @@ int sendNoWait(unsigned int dest_ip, unsigned short dest_port, void *msg,
 unsigned int getKeyOwner(unsigned int key);
 //simple hash
 unsigned int hash(unsigned int x);
+//sends REBUILD_REQ to leader, retries until leader responds, or causes new leader to be chosen
+void initRebuild();
+//adds entry to end of hostArray, increments numHosts,
+// allocates more space if necessary
+void addHost(struct hostData newHost);
 //initiates TCP connection with leader, gets DHT data
 int getDHTdata();
 //outputs readable DHT data to outfile
 void writeDHTdata(FILE *outfile);
-void initRebuild();
-void leadRebuild();
-void followRebuild();
+void clearDHTdata();
+void initDHTdata();
+void makeAssignments();
+//returns not-zero if ok, zero if not ok
+int msgSizeOk(unsigned char type, unsigned int size);
+
+/*******************************************************************************
+*                      Global Function Definitions
+*******************************************************************************/
 
 void dhtInit(unsigned int maxKeyCapacity)
 {
@@ -233,57 +291,36 @@ void dhtInit(unsigned int maxKeyCapacity)
        int i;
        int ret;
 
-#ifdef DHT_LOG
        logfile = fopen(DHT_LOG, "w");
-#endif
+       dhtLog("dhtInit() - initializing...\n");
 
        myHostData.ipAddr = getMyIpAddr();
        myHostData.maxKeyCapacity = maxKeyCapacity;
 
-       numHosts = numBlocks = 0;
+       numHosts = numBlocks = hostArraySize = 0;
        hostArray = NULL;
        blockOwnerArray = NULL;
+       hostRebuildStates = NULL;
+
+       state = NORMAL_STATE;
 
        pthread_create(&threadUdpListen, NULL, udpListen, NULL);
        pthread_create(&threadTcpListen, NULL, tcpListen, NULL);
 
        initRebuild();
 
-/*     leader = findLeader();
-
-       if (leader == 0)
-       { //no response: I am the first
-               leader = getMyIpAddr();
-
-               numHosts = 1;
-               hostArray = calloc(numHosts, sizeof(struct hostData));
-               hostArray[0] = myHostData;
-               numBlocks = INIT_BLOCK_NUM;
-               blockOwnerArray = calloc(numBlocks, sizeof(unsigned int));
-               for (i = 0; i < numBlocks; i++)
-                       blockOwnerArray[i] = 0;
-       }
-       else
-       {
-               //get DHT data from leader
-               ret = getDHTdata();
-
-               //TODO: actually, just initiate a rebuild here instead
-       }
-*/
-
-       //start servers
-       
        return;
 }
 
 void dhtExit()
 {
+       dhtLog("dhtExit(): cleaning up...\n");
        fclose(logfile);
        pthread_cancel(threadUdpListen);
        pthread_cancel(threadTcpListen);
-       close(udpServerSock);
+       close(udpServerPollSock.fd);
        close(tcpListenSock);
+       clearDHTdata();
 }
 
 int dhtInsert(unsigned int key, unsigned int val)
@@ -372,13 +409,16 @@ int dhtSearch(unsigned int key, unsigned int *val)
        return -1; //this function should be robust enough to always return 0 or 1
 }
 
-
+/*******************************************************************************
+*                      Local Function Definitions
+*******************************************************************************/
 
 //use UDP for messages that are frequent and short
 void *udpListen()
 {
        struct sockaddr_in myAddr;
        struct sockaddr_in clientAddr;
+       struct sockaddr_in bcastAddr;
        socklen_t socklen = sizeof(struct sockaddr_in);
        char buffer[BUFFER_SIZE];
        ssize_t bytesReceived;
@@ -388,220 +428,273 @@ void *udpListen()
        struct insertRes *insertResPtr;
        struct removeRes *removeResPtr;
        struct searchRes *searchResPtr;
+       struct joinReq *joinReqPtr;
        char replyBuffer[BUFFER_SIZE];
        struct timeval now;
+       struct timeval rebuild1Timeout;
+       int rebuild1TimerSet;
+       int on;
+       int pollret;
+       int i;
 
        chashtable_t *myHashTable = chashCreate(HASH_SIZE, LOADFACTOR);
 
-       if ((udpServerSock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
+       if ((udpServerPollSock.fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
        {
                perror("udpListen():socket()");
                pthread_exit(NULL);
        }
+
+       on = 1;
+       if (setsockopt(udpServerPollSock.fd, SOL_SOCKET, SO_BROADCAST, &on,
+               sizeof(on)) == -1)
+       {
+               perror("udpBroadcastWaitForResponse():setsockopt()");
+               pthread_exit(NULL);
+       }
+       
+       udpServerPollSock.events = POLLIN;
        
        bzero(&myAddr, socklen);
-       myAddr.sin_family=AF_INET;
-       myAddr.sin_addr.s_addr=INADDR_ANY;
-       myAddr.sin_port=htons(UDP_PORT);
+       myAddr.sin_family = AF_INET;
+       myAddr.sin_addr.s_addr = htonl(INADDR_ANY);
+       myAddr.sin_port = htons(UDP_PORT);
+
+       bzero(&bcastAddr, socklen);
+       bcastAddr.sin_family = AF_INET;
+       bcastAddr.sin_addr.s_addr = htonl(0xFFFFFFFF);
+       bcastAddr.sin_port = htons(UDP_PORT);
 
-       if (bind(udpServerSock, (struct sockaddr *)&myAddr, socklen) == -1)
+       if (bind(udpServerPollSock.fd, (struct sockaddr *)&myAddr, socklen) == -1)
        {
                perror("udpListen():bind()");
                pthread_exit(NULL);
        }
-#ifdef DHT_LOG
-       fprintf(logfile,"udpListen(): listening on port %d\n", UDP_PORT);
-       fflush(logfile);
-#endif
+       dhtLog("udpListen(): listening on port %d\n", UDP_PORT);
+
+       rebuild1TimerSet = 0;
        while(1)
        {
-               if ((bytesReceived = recvfrom(udpServerSock, buffer, BUFFER_SIZE, 0,
-                       (struct sockaddr *)&clientAddr, &socklen)) == -1)
-               {
-                       perror("udpListen():recvfrom()");
-               }
-               else if (bytesReceived == 0)
+               pollret = poll(&udpServerPollSock, 1, TIMEOUT_MS);
+               if (pollret < 0)
+               {       perror("udpListen():poll()");   }
+               else if (pollret > 0)
                {
-#ifdef DHT_LOG
-                       fprintf(logfile,"udpListen(): recvfrom() returned 0\n");
-                       fflush(logfile);
-#endif
-               }
-               else
-               {
-                       gettimeofday(&now, NULL);
-#ifdef DHT_LOG
-                       fprintf(logfile, "udpListen(): received %s from %s\n",
-                               (buffer[0] < NUM_MSG_TYPES ? msg_types[buffer[0]] : "unknown message"),
-                               inet_ntoa(clientAddr.sin_addr));
-//                     fprintf(logfile,"udpListen(): time received:%ds,%dus\n", now.tv_sec,
-//                             now.tv_usec);
-//                     fprintf(logfile,"udpListen(): msg size:%d bytes source:%s:%d\n",
-//                             bytesReceived,inet_ntoa(clientAddr.sin_addr),htons(clientAddr.sin_port));
-                       fflush(logfile);
-#endif
-
-                       switch (buffer[0])
+                       if ((bytesReceived = recvfrom(udpServerPollSock.fd, buffer, BUFFER_SIZE,
+                               0, (struct sockaddr *)&clientAddr, &socklen)) == -1)
+                       {       perror("udpListen():recvfrom()");       }
+                       else if (bytesReceived == 0)
                        {
-                               case INSERT_CMD:
-                                       if (bytesReceived != sizeof(struct insertCmd))
-                                       {
-#ifdef DHT_LOG
-                                               fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
-                                               fflush(logfile);
-#endif
-                                               break;
-                                       }
-                                       insertCmdPtr = (struct insertCmd *)buffer;
-#ifdef DHT_LOG
-                                       fprintf(logfile, "udpListen(): Insert: key=%d, val=%d\n",
-                                               insertCmdPtr->key, insertCmdPtr->val);
-                                       fflush(logfile);
-#endif
-                                       insertResPtr = (struct insertRes *)replyBuffer;
-                                       insertResPtr->msgType = INSERT_RES;
-                                       if (getKeyOwner(insertCmdPtr->key) == myHostData.ipAddr)
-                                       {
-                                               //note: casting val to void * in order to conform to API
-                                               if(chashInsert(myHashTable, insertCmdPtr->key,
-                                                               (void *)insertCmdPtr->val) == 0)
-                                                       insertResPtr->status = INSERT_OK;
-                                               else
-                                                       insertResPtr->status = INSERT_ERROR;
-                                       }
-                                       else
-                                       {
-                                               insertResPtr->status = NOT_KEY_OWNER;;
-                                       }
-                                       if (sendto(udpServerSock, (void *)insertResPtr,
-                                               sizeof(struct insertRes), 0, (struct sockaddr *)&clientAddr,
-                                               socklen) == -1)
-                                       {
-                                               perror("udpListen():sendto()");
-                                       }
-                                       break;
-                               case REMOVE_CMD:
-                                       if (bytesReceived != sizeof(struct removeCmd))
-                                       {
-#ifdef DHT_LOG
-                                               fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
-                                               fflush(logfile);
-#endif
-                                               break;
-                                       }
-                                       removeCmdPtr = (struct removeCmd *)buffer;
-#ifdef DHT_LOG
-                                       fprintf(logfile,"udpListen(): Remove: key=%d\n", removeCmdPtr->key);
-                                       fflush(logfile);
-#endif
-                                       removeResPtr = (struct removeRes *)replyBuffer;
-                                       removeResPtr->msgType = REMOVE_RES;
-                                       if (getKeyOwner(removeCmdPtr->key) == myHostData.ipAddr)
-                                       {
-                                               //note: casting val to void * in order to conform to API
-                                               if(chashRemove(myHashTable, removeCmdPtr->key) == 0)
-                                                       removeResPtr->status = INSERT_OK;
-                                               else
-                                                       removeResPtr->status = INSERT_ERROR;
-                                       }
-                                       else
-                                       {
-                                               removeResPtr->status = NOT_KEY_OWNER;
-                                       }
-                                       if (sendto(udpServerSock, (void *)removeResPtr, sizeof(struct removeRes), 0,
-                                               (struct sockaddr *)&clientAddr, socklen) == -1)
-                                       {
-                                               perror("udpListen():sendto()");
-                                       }
-                                       break;
-                               case SEARCH_CMD:
-                                       if (bytesReceived != sizeof(struct searchCmd))
-                                       {
-#ifdef DHT_LOG
-                                               fprintf(logfile,"udpListen(): ERROR: incorrect message size\n");
-                                               fflush(logfile);
-#endif
-                                               break;
-                                       }
-                                       searchCmdPtr = (struct searchCmd *)buffer;
-#ifdef DHT_LOG
-                                               fprintf(logfile,"udpListen(): Search: key=%d\n",searchCmdPtr->key);
-                                               fflush(logfile);
-#endif
-                                       searchResPtr = (struct searchRes *)replyBuffer;
-                                       searchResPtr->msgType = SEARCH_RES;
-                                       if (getKeyOwner(searchCmdPtr->key) == myHostData.ipAddr)
-                                       {
-                                               //note: casting val to void * in order to conform to API
-                                               if((searchResPtr->val = (unsigned int)chashSearch(myHashTable,
-                                                               searchCmdPtr->key)) == 0)
-                                                       searchResPtr->status = KEY_NOT_FOUND;
-                                               else
-                                                       searchResPtr->status = KEY_FOUND;
-                                       }
-                                       else
-                                       {
-                                               searchResPtr->status = NOT_KEY_OWNER;
-                                       }
-                                       if (sendto(udpServerSock, (void *)searchResPtr, sizeof(struct searchRes), 0,
-                                               (struct sockaddr *)&clientAddr, socklen) == -1)
-                                       {
-                                               perror("udpListen():sendto()");
-                                       }
-                                       break;
-                               case FIND_LEADER_CMD:
-                                       if (bytesReceived != sizeof(char))
-                                       {
-#ifdef DHT_LOG
-                                               fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
-                                               fflush(logfile);
-#endif
-                                               break;
-                                       }
-                                       if (leader == getMyIpAddr())
-                                       {
-                                               replyBuffer[0] = FIND_LEADER_RES;
-                                               if(sendto(udpServerSock, (void *)replyBuffer, sizeof(char), 0,
-                                                       (struct sockaddr *)&clientAddr, socklen) == -1)
-                                               {
-                                                       perror("udpListen():sendto");
-                                               }
-                                       }
-                                       break;
-                               case REBUILD_REQ:
-                                       if (bytesReceived != sizeof(char))
-                                       {
-#ifdef DHT_LOG
-                                               fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
-                                               fflush(logfile);
-#endif
-                                               break;
-                                       }
-                                       if (leader == getMyIpAddr())
-                                       {
-                                               replyBuffer[0] = REBUILD_RES;
-                                               if(sendto(udpServerSock, (void *)replyBuffer, sizeof(char), 0,
-                                                       (struct sockaddr *)&clientAddr, socklen) == -1)
-                                               {
-                                                       perror("udpListen():sendto");
-                                               }
-                                               //TODO: leadRebuild()
-                                       }
-                                       else
+                               dhtLog("udpListen(): recvfrom() returned 0\n");
+                       }
+                       else
+                       {
+                               dhtLog("udpListen(): received %s from %s\n",
+                                       (buffer[0] < NUM_MSG_TYPES ? msg_types[buffer[0]] :
+                                       "unknown message"), inet_ntoa(clientAddr.sin_addr));
+                               if (!msgSizeOk(buffer[0], bytesReceived))
+                               {
+                                       dhtLog("udpListen(): ERROR: incorrect message size\n");
+                               }
+                               else
+                               {
+                                       switch (buffer[0])
                                        {
-                                               replyBuffer[0] = NOT_LEADER;
-                                               if(sendto(udpServerSock, (void *)replyBuffer, sizeof(char), 0,
-                                                       (struct sockaddr *)&clientAddr, socklen) == -1)
-                                               {
-                                                       perror("udpListen():sendto");
-                                               }
+                                               case INSERT_CMD:
+                                                       if (state == NORMAL_STATE || state == LEAD_NORMAL_STATE
+                                                               || state == REBUILD3_STATE || state == LEAD_REBUILD3_STATE)
+                                                       {
+                                                               insertCmdPtr = (struct insertCmd *)buffer;
+                                                               dhtLog( "udpListen(): Insert: key=%d, val=%d\n",
+                                                                       insertCmdPtr->key, insertCmdPtr->val);
+                                                               insertResPtr = (struct insertRes *)replyBuffer;
+                                                               insertResPtr->msgType = INSERT_RES;
+                                                               insertResPtr->unused = 0;
+                                                               if (getKeyOwner(insertCmdPtr->key) == myHostData.ipAddr)
+                                                               {
+                                                                       //note: casting val to void * in order to conform to API
+                                                                       if(chashInsert(myHashTable, insertCmdPtr->key,
+                                                                                       (void *)insertCmdPtr->val) == 0)
+                                                                               insertResPtr->status = INSERT_OK;
+                                                                       else
+                                                                               insertResPtr->status = INSERT_ERROR;
+                                                               }
+                                                               else
+                                                               {
+                                                                       insertResPtr->status = NOT_KEY_OWNER;;
+                                                               }
+                                                               if (sendto(udpServerPollSock.fd, (void *)insertResPtr,
+                                                                       sizeof(struct insertRes), 0, (struct sockaddr *)&clientAddr,
+                                                                       socklen) == -1)
+                                                               {       perror("udpListen():sendto()"); }
+                                                       }
+                                                       break;
+                                               case REMOVE_CMD:
+                                                       if (state == NORMAL_STATE || state == LEAD_NORMAL_STATE)
+                                                       {
+                                                               removeCmdPtr = (struct removeCmd *)buffer;
+                                                               dhtLog("udpListen(): Remove: key=%d\n", removeCmdPtr->key);
+                                                               removeResPtr = (struct removeRes *)replyBuffer;
+                                                               removeResPtr->msgType = REMOVE_RES;
+                                                               removeResPtr->unused = 0;
+                                                               if (getKeyOwner(removeCmdPtr->key) == myHostData.ipAddr)
+                                                               {
+                                                                       //note: casting val to void * in order to conform to API
+                                                                       if(chashRemove(myHashTable, removeCmdPtr->key) == 0)
+                                                                               removeResPtr->status = INSERT_OK;
+                                                                       else
+                                                                               removeResPtr->status = INSERT_ERROR;
+                                                               }
+                                                               else
+                                                               {
+                                                                       removeResPtr->status = NOT_KEY_OWNER;
+                                                               }
+                                                               if (sendto(udpServerPollSock.fd, (void *)removeResPtr,
+                                                                       sizeof(struct removeRes), 0, (struct sockaddr *)&clientAddr,
+                                                                       socklen) == -1)
+                                                               {       perror("udpListen():sendto()"); }
+                                                       }
+                                                       break;
+                                               case SEARCH_CMD:
+                                                       if (state == NORMAL_STATE || state == LEAD_NORMAL_STATE)
+                                                       {
+                                                               searchCmdPtr = (struct searchCmd *)buffer;
+                                                               dhtLog("udpListen(): Search: key=%d\n",searchCmdPtr->key);
+                                                               searchResPtr = (struct searchRes *)replyBuffer;
+                                                               searchResPtr->msgType = SEARCH_RES;
+                                                               searchResPtr->unused = 0;
+                                                               if (getKeyOwner(searchCmdPtr->key) == myHostData.ipAddr)
+                                                               {
+                                                                       //note: casting val to void * in order to conform to API
+                                                                       if((searchResPtr->val = (unsigned int)chashSearch(myHashTable,
+                                                                                       searchCmdPtr->key)) == 0)
+                                                                               searchResPtr->status = KEY_NOT_FOUND;
+                                                                       else
+                                                                               searchResPtr->status = KEY_FOUND;
+                                                               }
+                                                               else
+                                                               {
+                                                                       searchResPtr->status = NOT_KEY_OWNER;
+                                                               }
+                                                               if (sendto(udpServerPollSock.fd, (void *)searchResPtr,
+                                                                       sizeof(struct searchRes), 0, (struct sockaddr *)&clientAddr,
+                                                                       socklen) == -1)
+                                                               {       perror("udpListen():sendto()"); }
+                                                       }
+                                                       break;
+                                               case FIND_LEADER_REQ:
+                                                       if (state == LEAD_NORMAL_STATE || state == LEAD_REBUILD1_STATE
+                                                               || state == LEAD_REBUILD2_STATE || state == LEAD_REBUILD3_STATE)
+                                                       {
+                                                               replyBuffer[0] = FIND_LEADER_RES;
+                                                               if(sendto(udpServerPollSock.fd, (void *)replyBuffer,
+                                                                       sizeof(char), 0,(struct sockaddr *)&clientAddr, socklen) == -1)
+                                                               {       perror("udpListen():sendto()"); }
+                                                       }
+                                                       break;
+                                               case REBUILD_REQ:
+                                                       if (state == LEAD_NORMAL_STATE || state == LEAD_REBUILD1_STATE
+                                                               || state == LEAD_REBUILD2_STATE || state == LEAD_REBUILD3_STATE)
+                                                       {
+                                                               replyBuffer[0] = REBUILD_RES;
+                                                               if (sendto(udpServerPollSock.fd, (void *)replyBuffer,
+                                                                       sizeof(char), 0, (struct sockaddr *)&clientAddr, socklen) == -1)
+                                                               {       perror("udpListen():sendto()"); }
+                                                               if (gettimeofday(&rebuild1Timeout, NULL) < 0)
+                                                               {       perror("dhtLog():gettimeofday()"); }
+                                                               //TODO: make this a configurable parameter
+                                                               rebuild1Timeout.tv_sec += 3;
+                                                               rebuild1TimerSet = 1;
+                                                               //clear out previous host data
+                                                               numHosts = 1;
+                                                               hostArray[0] = myHostData;
+
+                                                               state = LEAD_REBUILD1_STATE;
+
+                                                               replyBuffer[0] = REBUILD_CMD;
+                                                               if (sendto(udpServerPollSock.fd, (void *)replyBuffer,
+                                                                       sizeof(char), 0, (struct sockaddr *)&bcastAddr, socklen) == -1)
+                                                               {       perror("udpListen():sendto()"); }
+                                                               
+                                                       }
+                                                       else
+                                                       {
+                                                               replyBuffer[0] = NOT_LEADER;
+                                                               if(sendto(udpServerPollSock.fd, (void *)replyBuffer,
+                                                                       sizeof(char), 0,(struct sockaddr *)&clientAddr, socklen) == -1)
+                                                               {       perror("udpListen():sendto()"); }
+                                                       }
+                                               case REBUILD_CMD:
+                                                       if (state != LEAD_REBUILD1_STATE)
+                                                       {
+                                                               //consider this an official declaration of authority,
+                                                               // in case I was confused about this
+                                                               leader = htonl(clientAddr.sin_addr.s_addr);
+                                                               
+                                                               clearDHTdata();
+
+                                                               joinReqPtr = (struct joinReq *)replyBuffer;
+                                                               joinReqPtr->msgType = JOIN_REQ;
+                                                               joinReqPtr->unused = 0;
+                                                               joinReqPtr->newHostData = myHostData;
+                                                               //note: I'm reusing bytesReceived and buffer
+                                                               bytesReceived = udpSendWaitForResponse(leader, UDP_PORT,
+                                                                       (void *)replyBuffer, sizeof(struct joinReq), (void *)buffer,
+                                                                       BUFFER_SIZE, TIMEOUT_MS, MAX_RETRIES);
+                                                               if ((bytesReceived == sizeof(char)) && (buffer[0] == JOIN_RES))
+                                                                       state = REBUILD1_STATE;
+                                                               else
+                                                                       initRebuild();
+                                                       }
+                                                       break;
+                                               case JOIN_REQ:
+                                                       if (state == LEAD_REBUILD1_STATE)
+                                                       {
+                                                               joinReqPtr = (struct joinReq *)buffer;
+                                                               addHost(joinReqPtr->newHostData);
+
+                                                               replyBuffer[0] = JOIN_RES;
+                                                               if (sendto(udpServerPollSock.fd, (void *)replyBuffer,
+                                                                       sizeof(char), 0,(struct sockaddr *)&clientAddr, socklen) == -1)
+                                                               {       perror("udpListen():sendto()"); }
+                                                       }
+                                                       break;
+                                               case GET_DHT_INFO_CMD:
+                                                       if (state == REBUILD1_STATE)
+                                                       {
+                                                               getDHTdata();
+                                                               state = REBUILD2_STATE;
+                                                       }
+                                                       break;
+                                               default:
+                                                       dhtLog("udpListen(): ERROR: Unknown message type\n");
                                        }
-                                       break;
-//                             default:
-#ifdef DHT_LOG
-//                                     fprintf(logfile,"udpListen(): ERROR: Unknown message type\n");
-//                                     fflush(logfile);
-#endif
+                               }
+                       }
+               } //end (pollret > 0)
+               else // (pollret == 0), timeout
+               {
+                       if (gettimeofday(&now, NULL) < 0)
+                       {       perror("dhtLog():gettimeofday()"); }
+                       if (rebuild1TimerSet && timercmp(&now, &rebuild1Timeout, >))
+                       {
+                               rebuild1TimerSet = 0;
+                               if (state == LEAD_REBUILD1_STATE)
+                               {
+                                       makeAssignments();
+                                       dhtLog("udpListen(): assignments made\n");
+                                       writeDHTdata(logfile);
+                                       if (hostRebuildStates != NULL)
+                                               free(hostRebuildStates);
+                                       hostRebuildStates = calloc(numHosts, sizeof(unsigned int));
+                                       for (i = 0; i < numHosts; i++)
+                                               hostRebuildStates[i] = REBUILD1_STATE;
+                                       state = LEAD_REBUILD2_STATE;
+                                       replyBuffer[0] = GET_DHT_INFO_CMD;
+                                       if (sendto(udpServerPollSock.fd, (void *)replyBuffer,
+                                               sizeof(char), 0, (struct sockaddr *)&bcastAddr, socklen) == -1)
+                                       {       perror("udpListen():sendto()"); }
+                               }
                        }
                }
        }
@@ -615,7 +708,6 @@ int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port,
        struct sockaddr_in ack_addr;
        socklen_t socklen = sizeof(struct sockaddr_in);
        struct pollfd pollsock;
-       struct timeval now;
        int retval;
        int i;
        ssize_t bytesReceived;
@@ -635,26 +727,21 @@ int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port,
        
        for (i = 0; i < MAX_RETRIES; i++)
        {
-#ifdef DHT_LOG
                if (i > 0)
-                       fprintf(logfile,"udpSendWaitForResponse(): trying again, count: %d\n",
-                               i+1);
-               fflush(logfile);
-#endif
+                       dhtLog("udpSendWaitForResponse(): trying again, count: %d\n", i+1);
                if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr,
                        socklen) == -1)
                {
                        perror("udpSendWaitForResponse():sendto");
                        return -1;
                }
-#ifdef DHT_LOG
-               gettimeofday(&now, NULL);
-               fprintf(logfile,"udpSendWaitForResponse(): message sent:%ds,%dus\n",
-                       now.tv_sec, now.tv_usec);
-               fflush(logfile);
-#endif
+               dhtLog("udpSendWaitForResponse(): message sent\n");
                retval = poll(&pollsock, 1, timeout);
-               if (retval !=0)
+               if (retval < 0)
+               {
+                       perror("udpSendWaitForResponse():poll()");
+               }
+               else if (retval > 0)
                {
                        bytesReceived = recvfrom(pollsock.fd, resBuffer, resBufferSize, 0,
                                (struct sockaddr *)&ack_addr, &socklen);
@@ -662,22 +749,13 @@ int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port,
                        && (ack_addr.sin_port == server_addr.sin_port))
                        {
                                close(pollsock.fd);
-#ifdef DHT_LOG
-                               gettimeofday(&now, NULL);
-                               fprintf(logfile,"udpSendWaitForResponse(): received response:%ds,%dus\n", now.tv_sec, now.tv_usec);
-                               fflush(logfile);
-#endif
+                               dhtLog("udpSendWaitForResponse(): received response\n");
                                return bytesReceived;
                        }
                }
        }
        close(pollsock.fd);
-#ifdef DHT_LOG
-       gettimeofday(&now, NULL);
-       printf("udpSendWaitForResponse(): timed out, no ack:%ds,%dus\n",
-               now.tv_sec, now.tv_usec);
-       fflush(logfile);
-#endif
+       printf("udpSendWaitForResponse(): timed out, no ack\n");
        return -1;
 }
 
@@ -689,7 +767,6 @@ int udpBroadcastWaitForResponse(unsigned int *reply_ip,
        struct sockaddr_in ack_addr;
        socklen_t socklen = sizeof(struct sockaddr_in);
        struct pollfd pollsock;
-       struct timeval now;
        int retval;
        int i;
        ssize_t bytesReceived;
@@ -717,23 +794,15 @@ int udpBroadcastWaitForResponse(unsigned int *reply_ip,
        
        for (i = 0; i < MAX_RETRIES; i++)
        {
-#ifdef DHT_LOG
                if (i > 0)
-                       fprintf(logfile,"udpBroadcastWaitForResponse(): trying again, count: %d\n", i+1);
-                       fflush(logfile);
-#endif
+                       dhtLog("udpBroadcastWaitForResponse(): trying again, count: %d\n", i+1);
                if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr,
                        socklen) == -1)
                {
                        perror("udpBroadcastWaitForResponse():sendto()");
                        return -1;
                }
-#ifdef DHT_LOG
-               gettimeofday(&now, NULL);
-               fprintf(logfile,"udpBroadcastWaitForResponse(): message sent:%ds,%dus\n",
-                       now.tv_sec, now.tv_usec);
-               fflush(logfile);
-#endif
+               dhtLog("udpBroadcastWaitForResponse(): message sent\n");
                retval = poll(&pollsock, 1, timeout);
                if (retval !=0)
                {
@@ -741,21 +810,12 @@ int udpBroadcastWaitForResponse(unsigned int *reply_ip,
                                (struct sockaddr *)&ack_addr, &socklen);
                        close(pollsock.fd);
                        *reply_ip = htonl(ack_addr.sin_addr.s_addr);
-#ifdef DHT_LOG
-                       gettimeofday(&now, NULL);
-                       fprintf(logfile,"udpBroadcastWaitForResponse(): received response:%ds,%dus\n", now.tv_sec, now.tv_usec);
-                       fflush(logfile);
-#endif
+                       dhtLog("udpBroadcastWaitForResponse(): received response\n");
                        return bytesReceived;
                }
        }
        close(pollsock.fd);
-#ifdef DHT_LOG
-       gettimeofday(&now, NULL);
-       fprintf(logfile,"udpBroadcastWaitForResponse(): timed out, no ack:%ds,%dus\n",
-               now.tv_sec, now.tv_usec);
-       fflush(logfile);
-#endif
+       dhtLog("udpBroadcastWaitForResponse(): timed out, no ack\n");
        return -1;
 }
 
@@ -792,14 +852,12 @@ void *tcpListen()
                pthread_exit(NULL);
        }
 
-#ifdef DHT_LOG
-       fprintf(logfile,"tcpListen(): listening on port %d\n", TCP_PORT);
-       fflush(logfile);
-#endif
+       dhtLog("tcpListen(): listening on port %d\n", TCP_PORT);
 
        while(1)
        {
-               tcpAcceptSock = accept(tcpListenSock, (struct sockaddr *)&clientAddr, &socklen);
+               tcpAcceptSock = accept(tcpListenSock, (struct sockaddr *)&clientAddr,
+                       &socklen);
                pthread_create(&threadTcpAccept, NULL, tcpAccept, (void *)tcpAcceptSock);
        }
 }
@@ -810,22 +868,15 @@ void *tcpAccept(void *arg)
        int bytesReceived;
        char msgType;
 
-#ifdef DHT_LOG
-       fprintf(logfile, "tcpAccept(): accepted tcp connection, file descriptor: %d\n", tcpAcceptSock);
-       fflush(logfile);
-#endif
+       dhtLog("tcpAccept(): accepted tcp connection, file descriptor: %d\n",
+               tcpAcceptSock);
 
        bytesReceived = recv(tcpAcceptSock, &msgType, sizeof(char), 0);
        if (bytesReceived == -1)
-       {
-               perror("tcpAccept():recv()");
-       }
+       {       perror("tcpAccept():recv()");   }
        else if (bytesReceived == 0)
        {
-#ifdef DHT_LOG
-               fprintf(logfile, "tcpAccept(): bytesReceived = 0\n", tcpAcceptSock);
-               fflush(logfile);
-#endif
+               dhtLog( "tcpAccept(): bytesReceived = 0\n", tcpAcceptSock);
        }
        else
        {
@@ -856,30 +907,31 @@ void *tcpAccept(void *arg)
                                }
                                break;
                        default:
-#ifdef DHT_LOG
-                               fprintf(logfile, "tcpAccept(): unrecognized msg type\n");
-                               fflush(logfile);
-#endif
+                               dhtLog("tcpAccept(): unrecognized msg type\n");
                }
        }
 
        if (close(tcpAcceptSock) == -1)
-       {
-               perror("tcpAccept():close()");
-       }
+       {       perror("tcpAccept():close()"); }
 
-#ifdef DHT_LOG
-       fprintf(logfile, "tcpAccept(): closed tcp connection, file descriptor: %d\n",
+       dhtLog("tcpAccept(): closed tcp connection, file descriptor: %d\n",
                tcpAcceptSock);
-       fflush(logfile);
-#endif
 
        pthread_exit(NULL);
 }
 
 unsigned int getKeyOwner(unsigned int key)
 {
-       return hostArray[blockOwnerArray[hash(key)]].ipAddr;
+       if (state == NORMAL_STATE || state == LEAD_NORMAL_STATE
+               || state == REBUILD3_STATE || state == LEAD_REBUILD3_STATE)
+       {
+               return hostArray[blockOwnerArray[hash(key)]].ipAddr;
+       }
+       else
+       { //TODO: figure out what is best to do here. Would like calls to dhtSearch,
+               // etc. to block rather than fail during rebuilds
+               return 0;
+       }
 }
 
 unsigned int getMyIpAddr()
@@ -915,12 +967,9 @@ unsigned int findLeader()
        char myMessage;
        char response;
 
-#ifdef DHT_LOG
-       fprintf(logfile, "findLeader(): broadcasting...\n");
-       fflush(logfile);
-#endif
+       dhtLog("findLeader(): broadcasting...\n");
 
-       myMessage = FIND_LEADER_CMD;
+       myMessage = FIND_LEADER_REQ;
 
        bytesReceived = udpBroadcastWaitForResponse(&reply_ip, UDP_PORT,
                (void *)&myMessage, sizeof(myMessage), (void *)&response,
@@ -928,29 +977,20 @@ unsigned int findLeader()
 
        if (bytesReceived == -1)
        {
-#ifdef DHT_LOG
-               fprintf(logfile, "findLeader(): no response\n");
-               fflush(logfile);
-#endif
+               dhtLog("findLeader(): no response\n");
                return 0;
        }
        else if (response == FIND_LEADER_RES)
        {
-#ifdef DHT_LOG
                struct in_addr reply_addr;
                reply_addr.s_addr = htonl(reply_ip);
-               fprintf(logfile, "findLeader(): leader found:%s\n",
+               dhtLog("findLeader(): leader found:%s\n",
                                        inet_ntoa(reply_addr));
-               fflush(logfile);
-#endif
                return reply_ip;
        }
        else
        {
-#ifdef DHT_LOG
-               fprintf(logfile, "findLeader(): unexpected response\n");
-               fflush(logfile);
-#endif
+               dhtLog("findLeader(): unexpected response\n");
                return 0;
        }
 }
@@ -962,6 +1002,8 @@ int getDHTdata()
        char msg;
        int bytesReceived;
 
+       clearDHTdata();
+
        if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1)
        {
                perror("getDHTdata():socket()");
@@ -995,10 +1037,7 @@ int getDHTdata()
        }
        if (bytesReceived != sizeof(numHosts))
        {
-#ifdef DHT_LOG
-               fprintf(logfile,"getDHTdata(): ERROR: numHosts not completely received\n");
-               fflush(logfile);
-#endif
+               dhtLog("getDHTdata(): ERROR: numHosts not completely received\n");
                close(sock);
                return -1;
        }
@@ -1011,15 +1050,10 @@ int getDHTdata()
        }
        if (bytesReceived != sizeof(numBlocks))
        {
-#ifdef DHT_LOG
-               fprintf(logfile,"getDHTdata(): ERROR: numBlocks not completely received\n");
-               fflush(logfile);
-#endif
+               dhtLog("getDHTdata(): ERROR: numBlocks not completely received\n");
                close(sock);
                return -1;
        }
-       if (hostArray != NULL)
-               free(hostArray);
        hostArray = calloc(numHosts, sizeof(struct hostData));
        bytesReceived = recv(sock, hostArray, numHosts*sizeof(struct hostData), 0);
        if (bytesReceived == -1)
@@ -1030,17 +1064,12 @@ int getDHTdata()
        }
        if (bytesReceived != numHosts*sizeof(struct hostData))
        {
-#ifdef DHT_LOG
-               fprintf(logfile,"getDHTdata(): ERROR: hostArray not completely received\n");
-               fflush(logfile);
-#endif
+               dhtLog("getDHTdata(): ERROR: hostArray not completely received\n");
                close(sock);
                return -1;
        }
-       if (blockOwnerArray != NULL)
-               free(blockOwnerArray);
        blockOwnerArray = calloc(numBlocks, sizeof(unsigned int));
-       bytesReceived = recv(sock, blockOwnerArray, numBlocks*sizeof(unsigned int), 0);
+       bytesReceived = recv(sock,blockOwnerArray,numBlocks*sizeof(unsigned int),0);
        if (bytesReceived == -1)
        {
                perror("getDHTdata():recv()");
@@ -1049,24 +1078,21 @@ int getDHTdata()
        }
        if (bytesReceived != numBlocks*sizeof(unsigned int))
        {
-#ifdef DHT_LOG
-               fprintf(logfile,"getDHTdata(): ERROR: blockOwnerArray not completely received\n");
-               fflush(logfile);
-#endif
+               dhtLog("getDHTdata(): ERROR: blockOwnerArray not completely received\n");
                close(sock);
                return -1;
        }
-#ifdef DHT_LOG
-               fprintf(logfile,"getDHTdata(): got data:\n");
-               writeDHTdata(logfile);
-               fflush(logfile);
-#endif
+       dhtLog("getDHTdata(): got data:\n");
+       writeDHTdata(logfile);
+
        return 0;
 }
 
 unsigned int hash(unsigned int x)
 {
-       return x % numBlocks;
+       //this shouldn't be called when numBlocks = 0, so if you get a divide-by-zero,
+       // make sure we are in a proper state for key owner lookups
+               return x % numBlocks;
 }
 
 //This function will not return until it succeeds in submitting
@@ -1086,13 +1112,10 @@ void initRebuild()
 
        while (!done)
        {
-#ifdef DHT_LOG
                if (retry_count > 0)
                {
-                       fprintf(logfile,"initRebuild(): retry count:%d\n", retry_count);
-                       fflush(logfile);
+                       dhtLog("initRebuild(): retry count:%d\n", retry_count);
                }
-#endif
 
                if (leader == 0 || retry_count > 0)
                {
@@ -1100,15 +1123,9 @@ void initRebuild()
                        if (leader == 0) //no response
                        {
                                //TODO:elect leader: this will do for now
+                               initDHTdata();
                                leader = getMyIpAddr();
-
-                               numHosts = 1;
-                               hostArray = calloc(numHosts, sizeof(struct hostData));
-                               hostArray[0] = myHostData;
-                               numBlocks = INIT_BLOCK_NUM;
-                               blockOwnerArray = calloc(numBlocks, sizeof(unsigned int));
-                               for (i = 0; i < numBlocks; i++)
-                                       blockOwnerArray[i] = 0;
+                               state = LEAD_NORMAL_STATE;
                        }
                }
        
@@ -1118,60 +1135,37 @@ void initRebuild()
                        (void *)&msg, sizeof(msg), (void *)&response, sizeof(response),
                        TIMEOUT_MS, MAX_RETRIES);
                if (bytesReceived == -1)
-               {
-                       perror("initRebuild():recv()");
-               }
+               {       perror("initRebuild():recv()"); }
                else if (bytesReceived != sizeof(response))
                {
-#ifdef DHT_LOG
-                       fprintf(logfile,"initRebuild(): ERROR: response not completely received\n");
-                       fflush(logfile);
-#endif
+                       dhtLog("initRebuild(): ERROR: response not completely received\n");
                }
                else if (response == NOT_LEADER)
                {
-#ifdef DHT_LOG
                        struct in_addr address;
                        address.s_addr = htonl(leader);
-                       fprintf(logfile,"initRebuild(): ERROR: %s no longer leader\n",
+                       dhtLog("initRebuild(): ERROR: %s no longer leader\n",
                                inet_ntoa(address));
-                       fflush(logfile);
-#endif
                }
                else if (response != REBUILD_RES)
                {
-#ifdef DHT_LOG
-                       fprintf(logfile,"initRebuild(): ERROR: unexpected response\n");
-                       fflush(logfile);
-#endif
+                       dhtLog("initRebuild(): ERROR: unexpected response\n");
                }
                else
                {
-#ifdef DHT_LOG
-                       fprintf(logfile,"initRebuild(): submitted rebuild request\n");
+                       dhtLog("initRebuild(): submitted rebuild request\n");
                        writeDHTdata(logfile);
-                       fflush(logfile);
-#endif
                        done = 1;
                }
        }
        return;
 }
 
-void leadRebuild()
-{
-       
-}
-
-void followRebuild()
-{
-
-}
-
 void writeDHTdata(FILE *outfile)
 {
        int i;
        struct in_addr address;
+
        fprintf(outfile,"numHosts=%d,numBlocks=%d\n", numHosts, numBlocks);
        fprintf(outfile,"hostArray: index: ipAddr, maxKeyCapacity\n");
        for (i = 0; i < numHosts; i++)
@@ -1182,10 +1176,181 @@ void writeDHTdata(FILE *outfile)
        }
        fprintf(outfile,"blockOwnerArray: index: blockOwner\n");
        for (i = 0; i < numBlocks; i++)
+               fprintf(outfile,"%d: %d  ", i, blockOwnerArray[i]);
+       fprintf(outfile,"\n");
+}
+
+void clearDHTdata()
+{
+       if (hostArray != NULL)
+       {
+               free(hostArray);
+               hostArray = NULL;
+       }
+       if (blockOwnerArray != NULL)
+       {
+               free(blockOwnerArray);
+               blockOwnerArray = NULL;
+       }
+       numHosts = numBlocks = hostArraySize = 0;
+       return;
+}
+
+void initDHTdata()
+{
+       int i;
+
+       clearDHTdata();
+       hostArraySize = INIT_HOST_ALLOC;
+       hostArray = calloc(hostArraySize, sizeof(struct hostData));
+       numHosts = 1;
+       hostArray[0] = myHostData;
+       numBlocks = INIT_BLOCK_NUM;
+       blockOwnerArray = calloc(numBlocks, sizeof(unsigned int));
+       for (i = 0; i < numBlocks; i++)
+               blockOwnerArray[i] = 0;
+       
+       return;
+}
+
+void addHost(struct hostData newHost)
+{
+       struct hostData *newArray;
+       unsigned int newArraySize;
+
+       if (hostArray == NULL || blockOwnerArray == NULL || hostArraySize == 0)
+               initDHTdata();
+
+       if (numHosts == hostArraySize)
+       {
+               newArraySize = hostArraySize * 2;
+               newArray = calloc(newArraySize, sizeof(struct hostData));
+               memcpy(newArray, hostArray, (hostArraySize * sizeof(struct hostData)));
+               free(hostArray);
+               hostArray = newArray;
+               hostArraySize = newArraySize;
+       }
+
+       hostArray[numHosts] = newHost;
+       numHosts++;
+
+       return;
+}
+
+void makeAssignments()
+{
+       int i;
+
+       if (hostArray == NULL || blockOwnerArray == NULL || hostArraySize == 0)
+               initDHTdata();
+       
+       if (numBlocks < numHosts)
+       {
+               free(blockOwnerArray);
+               while (numBlocks < numHosts)
+                       numBlocks *= 2;
+               blockOwnerArray = calloc(numBlocks, sizeof(unsigned int));
+       }
+
+       for (i = 0; i < numBlocks; i++)
+               blockOwnerArray[i]  = i % numHosts;
+
+       return;
+}
+
+//returns not-zero if ok, zero if not ok
+int msgSizeOk(unsigned char type, unsigned int size)
+{
+       int status;
+
+       switch (type)
        {
-               fprintf(outfile,"%d: %d\n", i, blockOwnerArray[i]);
+               case INSERT_CMD:
+                       status = (size == sizeof(struct insertCmd));
+                       break;
+               case INSERT_RES:
+                       status = (size == sizeof(struct insertRes));
+                       break;
+               case REMOVE_CMD:
+                       status = (size == sizeof(struct removeCmd));
+                       break;
+               case REMOVE_RES:
+                       status = (size == sizeof(struct removeRes));
+                       break;
+               case SEARCH_CMD:
+                       status = (size == sizeof(struct searchCmd));
+                       break;
+               case SEARCH_RES:
+                       status = (size == sizeof(struct searchRes));
+                       break;
+               case FIND_LEADER_REQ:
+                       status = (size == sizeof(char));
+                       break;
+               case FIND_LEADER_RES:
+                       status = (size == sizeof(char));
+                       break;
+               case REBUILD_REQ:
+                       status = (size == sizeof(char));
+                       break;
+               case REBUILD_RES:
+                       status = (size == sizeof(char));
+                       break;
+               case NOT_LEADER:
+                       status = (size == sizeof(char));
+                       break;
+               case REBUILD_CMD:
+                       status = (size == sizeof(char));
+                       break;
+               case JOIN_REQ:
+                       status = (size == sizeof(struct joinReq));
+                       break;
+               case JOIN_RES:
+                       status = (size == sizeof(char));
+                       break;
+               case GET_DHT_INFO_CMD:
+                       status = (size == sizeof(char));
+                       break;
+               case DHT_INFO_REQ:
+                       status = (size == sizeof(char));
+                       break;
+               case DHT_INFO_RES:
+                       status = (size == sizeof(char));
+                       break;
+               case FILL_DHT_CMD:
+                       status = (size == sizeof(char));
+                       break;
+               case FILL_DHT_RES:
+                       status = (size == sizeof(char));
+                       break;
+               case REBUILD_DONE_INFO:
+                       status = (size == sizeof(char));
+                       break;
+               default:
+                       status = 0;
+                       break;
        }
+       return status;
+}
+
+void dhtLog(const char *format, ...)
+{
+       va_list args;
+       struct timeval now;
+
+       if (gettimeofday(&now, NULL) < 0)
+       {       perror("dhtLog():gettimeofday()"); }
+       va_start(args, format);
+       if (fprintf(logfile, "%d.%06d:", now.tv_sec, now.tv_usec) < 0)
+       {       perror("dhtLog():fprintf()"); }
+       if (vfprintf(logfile, format, args) < 0)
+       {       perror("dhtLog():vfprintf()"); }
+       if (fflush(logfile) == EOF)
+       {       perror("dhtLog():fflush()"); }
+       va_end(args);
+
+       return;
 }
 
 #endif
 
+