Each host currenty joins by initiating a rebuild. To rebuild, a request
authorerubow <erubow>
Sat, 23 Jun 2007 01:16:59 +0000 (01:16 +0000)
committererubow <erubow>
Sat, 23 Jun 2007 01:16:59 +0000 (01:16 +0000)
is sent to the leader, and the leader acks the request. So far it stops
there, but the leader is in charge of coordinating the rebuild.

Robust/src/Runtime/DSTM/interface/dht.c

index 2a726f508fa6031b9d74a86d73760c30099ccd7b..173ff7b94ee75595d7eb204394e11624e62eca92 100644 (file)
@@ -69,6 +69,9 @@ int dhtSearch(unsigned int key, unsigned int *val)
 #define DEFAULT_INTERFACE "eth0"
 #define DHT_LOG "dht.log"
 
+
+#define NUM_MSG_TYPES 19
+
 enum {
        INSERT_CMD,
        INSERT_RES,
@@ -78,18 +81,41 @@ enum {
        SEARCH_RES,
        FIND_LEADER_CMD,
        FIND_LEADER_RES,
-
        REBUILD_REQ,
+       REBUILD_RES,
+       NOT_LEADER,
        REBUILD_CMD,
        JOIN_REQ,
        JOIN_RES,
-       DHT_INFO_CMD,
+       DHT_INFO_REQ,
        DHT_INFO_RES,
        FILL_DHT_CMD,
        FILL_DHT_RES,
        REBUILD_DONE_INFO
 };
 
+const char *msg_types[NUM_MSG_TYPES] =
+{
+       "INSERT_CMD",
+       "INSERT_RES",
+       "REMOVE_CMD",
+       "REMOVE_RES",
+       "SEARCH_CMD",
+       "SEARCH_RES",
+       "FIND_LEADER_CMD",
+       "FIND_LEADER_RES",
+       "REBUILD_REQ",
+       "REBUILD_RES",
+       "NOT_LEADER",
+       "REBUILD_CMD",
+       "JOIN_REQ",
+       "JOIN_RES",
+       "DHT_INFO_REQ",
+       "DHT_INFO_RES",
+       "FILL_DHT_CMD",
+       "FILL_DHT_RES",
+       "REBUILD_DONE_INFO"
+};
 
 //status codes
 enum {
@@ -99,7 +125,7 @@ enum {
        REMOVE_ERROR,
        KEY_FOUND,
        KEY_NOT_FOUND,
-       NOT_KEY_OWNER
+       NOT_KEY_OWNER,
 };
 
 struct hostData {
@@ -145,6 +171,11 @@ struct searchRes {
        unsigned int val;
 };
 
+struct rebuildRes {
+       unsigned int msgType:8;
+       unsigned int unused:24;
+       unsigned int status;
+};
 
 //TODO: leave message, rebuild message...
 
@@ -157,11 +188,15 @@ struct hostData *hostArray;
 unsigned int numBlocks;
 unsigned int *blockOwnerArray;
 /*----end DHT data----*/
+pthread_t threadUdpListen;
+pthread_t threadTcpListen;
+int udpServerSock;
+int tcpListenSock;
 
 //return my IP address
 unsigned int getMyIpAddr();
 //sends broadcast to discover leader
-unsigned int getLeadersIpAddr();
+unsigned int findLeader();
 //UDP server
 void *udpListen();
 //TCP server
@@ -187,6 +222,9 @@ unsigned int hash(unsigned int x);
 int getDHTdata();
 //outputs readable DHT data to outfile
 void writeDHTdata(FILE *outfile);
+void initRebuild();
+void leadRebuild();
+void followRebuild();
 
 void dhtInit(unsigned int maxKeyCapacity)
 {
@@ -206,7 +244,12 @@ void dhtInit(unsigned int maxKeyCapacity)
        hostArray = NULL;
        blockOwnerArray = NULL;
 
-       leader = getLeadersIpAddr();
+       pthread_create(&threadUdpListen, NULL, udpListen, NULL);
+       pthread_create(&threadTcpListen, NULL, tcpListen, NULL);
+
+       initRebuild();
+
+/*     leader = findLeader();
 
        if (leader == 0)
        { //no response: I am the first
@@ -227,11 +270,9 @@ void dhtInit(unsigned int maxKeyCapacity)
 
                //TODO: actually, just initiate a rebuild here instead
        }
+*/
 
        //start servers
-       pthread_t threadUdpListen, threadTcpListen;
-       pthread_create(&threadUdpListen, NULL, udpListen, NULL);
-       pthread_create(&threadTcpListen, NULL, tcpListen, NULL);
        
        return;
 }
@@ -239,6 +280,10 @@ void dhtInit(unsigned int maxKeyCapacity)
 void dhtExit()
 {
        fclose(logfile);
+       pthread_cancel(threadUdpListen);
+       pthread_cancel(threadTcpListen);
+       close(udpServerSock);
+       close(tcpListenSock);
 }
 
 int dhtInsert(unsigned int key, unsigned int val)
@@ -334,7 +379,6 @@ void *udpListen()
 {
        struct sockaddr_in myAddr;
        struct sockaddr_in clientAddr;
-       int sock;
        socklen_t socklen = sizeof(struct sockaddr_in);
        char buffer[BUFFER_SIZE];
        ssize_t bytesReceived;
@@ -349,7 +393,7 @@ void *udpListen()
 
        chashtable_t *myHashTable = chashCreate(HASH_SIZE, LOADFACTOR);
 
-       if ((sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
+       if ((udpServerSock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
        {
                perror("udpListen():socket()");
                pthread_exit(NULL);
@@ -360,7 +404,7 @@ void *udpListen()
        myAddr.sin_addr.s_addr=INADDR_ANY;
        myAddr.sin_port=htons(UDP_PORT);
 
-       if (bind(sock, (struct sockaddr *)&myAddr, socklen) == -1)
+       if (bind(udpServerSock, (struct sockaddr *)&myAddr, socklen) == -1)
        {
                perror("udpListen():bind()");
                pthread_exit(NULL);
@@ -371,160 +415,194 @@ void *udpListen()
 #endif
        while(1)
        {
-               if ((bytesReceived = recvfrom(sock, buffer, BUFFER_SIZE, 0,
+               if ((bytesReceived = recvfrom(udpServerSock, buffer, BUFFER_SIZE, 0,
                        (struct sockaddr *)&clientAddr, &socklen)) == -1)
                {
                        perror("udpListen():recvfrom()");
-                       break;
                }
-               if (bytesReceived == 0)
+               else if (bytesReceived == 0)
                {
 #ifdef DHT_LOG
                        fprintf(logfile,"udpListen(): recvfrom() returned 0\n");
                        fflush(logfile);
 #endif
-                       break;
                }
-               gettimeofday(&now, NULL);
+               else
+               {
+                       gettimeofday(&now, NULL);
 #ifdef DHT_LOG
-               fprintf(logfile,"udpListen(): message received:%ds,%dus\n", now.tv_sec,
-                       now.tv_usec);
-               fprintf(logfile,"udpListen(): received %d bytes from %s:%d\n",
-                       bytesReceived,inet_ntoa(clientAddr.sin_addr),htons(clientAddr.sin_port));
-               fflush(logfile);
+                       fprintf(logfile, "udpListen(): received %s from %s\n",
+                               (buffer[0] < NUM_MSG_TYPES ? msg_types[buffer[0]] : "unknown message"),
+                               inet_ntoa(clientAddr.sin_addr));
+//                     fprintf(logfile,"udpListen(): time received:%ds,%dus\n", now.tv_sec,
+//                             now.tv_usec);
+//                     fprintf(logfile,"udpListen(): msg size:%d bytes source:%s:%d\n",
+//                             bytesReceived,inet_ntoa(clientAddr.sin_addr),htons(clientAddr.sin_port));
+                       fflush(logfile);
 #endif
 
-               switch (buffer[0])
-               {
-                       case INSERT_CMD:
-                               if (bytesReceived != sizeof(struct insertCmd))
-                               {
+                       switch (buffer[0])
+                       {
+                               case INSERT_CMD:
+                                       if (bytesReceived != sizeof(struct insertCmd))
+                                       {
+#ifdef DHT_LOG
+                                               fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
+                                               fflush(logfile);
+#endif
+                                               break;
+                                       }
+                                       insertCmdPtr = (struct insertCmd *)buffer;
 #ifdef DHT_LOG
-                                       fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
+                                       fprintf(logfile, "udpListen(): Insert: key=%d, val=%d\n",
+                                               insertCmdPtr->key, insertCmdPtr->val);
                                        fflush(logfile);
 #endif
+                                       insertResPtr = (struct insertRes *)replyBuffer;
+                                       insertResPtr->msgType = INSERT_RES;
+                                       if (getKeyOwner(insertCmdPtr->key) == myHostData.ipAddr)
+                                       {
+                                               //note: casting val to void * in order to conform to API
+                                               if(chashInsert(myHashTable, insertCmdPtr->key,
+                                                               (void *)insertCmdPtr->val) == 0)
+                                                       insertResPtr->status = INSERT_OK;
+                                               else
+                                                       insertResPtr->status = INSERT_ERROR;
+                                       }
+                                       else
+                                       {
+                                               insertResPtr->status = NOT_KEY_OWNER;;
+                                       }
+                                       if (sendto(udpServerSock, (void *)insertResPtr,
+                                               sizeof(struct insertRes), 0, (struct sockaddr *)&clientAddr,
+                                               socklen) == -1)
+                                       {
+                                               perror("udpListen():sendto()");
+                                       }
                                        break;
-                               }
-                               insertCmdPtr = (struct insertCmd *)buffer;
+                               case REMOVE_CMD:
+                                       if (bytesReceived != sizeof(struct removeCmd))
+                                       {
 #ifdef DHT_LOG
-                               fprintf(logfile, "udpListen(): Insert: key=%d, val=%d\n",
-                                       insertCmdPtr->key, insertCmdPtr->val);
-                               fflush(logfile);
+                                               fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
+                                               fflush(logfile);
 #endif
-                               insertResPtr = (struct insertRes *)replyBuffer;
-                               insertResPtr->msgType = INSERT_RES;
-                               if (getKeyOwner(insertCmdPtr->key) == myHostData.ipAddr)
-                               {
-                                       //note: casting val to void * in order to conform to API
-                                       if(chashInsert(myHashTable, insertCmdPtr->key,
-                                                       (void *)insertCmdPtr->val) == 0)
-                                               insertResPtr->status = INSERT_OK;
-                                       else
-                                               insertResPtr->status = INSERT_ERROR;
-                               }
-                               else
-                               {
-                                       insertResPtr->status = NOT_KEY_OWNER;;
-                               }
-                               if (sendto(sock, (void *)insertResPtr, sizeof(struct insertRes), 0,
-                                       (struct sockaddr *)&clientAddr, socklen) == -1)
-                               {
-                                       perror("udpListen():sendto()");
-                               }
-                               break;
-                       case REMOVE_CMD:
-                               if (bytesReceived != sizeof(struct removeCmd))
-                               {
+                                               break;
+                                       }
+                                       removeCmdPtr = (struct removeCmd *)buffer;
 #ifdef DHT_LOG
-                                       fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
+                                       fprintf(logfile,"udpListen(): Remove: key=%d\n", removeCmdPtr->key);
                                        fflush(logfile);
 #endif
+                                       removeResPtr = (struct removeRes *)replyBuffer;
+                                       removeResPtr->msgType = REMOVE_RES;
+                                       if (getKeyOwner(removeCmdPtr->key) == myHostData.ipAddr)
+                                       {
+                                               //note: casting val to void * in order to conform to API
+                                               if(chashRemove(myHashTable, removeCmdPtr->key) == 0)
+                                                       removeResPtr->status = INSERT_OK;
+                                               else
+                                                       removeResPtr->status = INSERT_ERROR;
+                                       }
+                                       else
+                                       {
+                                               removeResPtr->status = NOT_KEY_OWNER;
+                                       }
+                                       if (sendto(udpServerSock, (void *)removeResPtr, sizeof(struct removeRes), 0,
+                                               (struct sockaddr *)&clientAddr, socklen) == -1)
+                                       {
+                                               perror("udpListen():sendto()");
+                                       }
                                        break;
-                               }
-                               removeCmdPtr = (struct removeCmd *)buffer;
+                               case SEARCH_CMD:
+                                       if (bytesReceived != sizeof(struct searchCmd))
+                                       {
 #ifdef DHT_LOG
-                               fprintf(logfile,"udpListen(): Remove: key=%d\n", removeCmdPtr->key);
-                               fflush(logfile);
+                                               fprintf(logfile,"udpListen(): ERROR: incorrect message size\n");
+                                               fflush(logfile);
 #endif
-                               removeResPtr = (struct removeRes *)replyBuffer;
-                               removeResPtr->msgType = REMOVE_RES;
-                               if (getKeyOwner(removeCmdPtr->key) == myHostData.ipAddr)
-                               {
-                                       //note: casting val to void * in order to conform to API
-                                       if(chashRemove(myHashTable, removeCmdPtr->key) == 0)
-                                               removeResPtr->status = INSERT_OK;
-                                       else
-                                               removeResPtr->status = INSERT_ERROR;
-                               }
-                               else
-                               {
-                                       removeResPtr->status = NOT_KEY_OWNER;
-                               }
-                               if (sendto(sock, (void *)removeResPtr, sizeof(struct removeRes), 0,
-                                       (struct sockaddr *)&clientAddr, socklen) == -1)
-                               {
-                                       perror("udpListen():sendto()");
-                               }
-                               break;
-                       case SEARCH_CMD:
-                               if (bytesReceived != sizeof(struct searchCmd))
-                               {
+                                               break;
+                                       }
+                                       searchCmdPtr = (struct searchCmd *)buffer;
 #ifdef DHT_LOG
-                                       fprintf(logfile,"udpListen(): ERROR: incorrect message size\n");
-                                       fflush(logfile);
+                                               fprintf(logfile,"udpListen(): Search: key=%d\n",searchCmdPtr->key);
+                                               fflush(logfile);
 #endif
+                                       searchResPtr = (struct searchRes *)replyBuffer;
+                                       searchResPtr->msgType = SEARCH_RES;
+                                       if (getKeyOwner(searchCmdPtr->key) == myHostData.ipAddr)
+                                       {
+                                               //note: casting val to void * in order to conform to API
+                                               if((searchResPtr->val = (unsigned int)chashSearch(myHashTable,
+                                                               searchCmdPtr->key)) == 0)
+                                                       searchResPtr->status = KEY_NOT_FOUND;
+                                               else
+                                                       searchResPtr->status = KEY_FOUND;
+                                       }
+                                       else
+                                       {
+                                               searchResPtr->status = NOT_KEY_OWNER;
+                                       }
+                                       if (sendto(udpServerSock, (void *)searchResPtr, sizeof(struct searchRes), 0,
+                                               (struct sockaddr *)&clientAddr, socklen) == -1)
+                                       {
+                                               perror("udpListen():sendto()");
+                                       }
                                        break;
-                               }
-                               searchCmdPtr = (struct searchCmd *)buffer;
+                               case FIND_LEADER_CMD:
+                                       if (bytesReceived != sizeof(char))
+                                       {
 #ifdef DHT_LOG
-                                       fprintf(logfile,"udpListen(): Search: key=%d\n",searchCmdPtr->key);
-                                       fflush(logfile);
+                                               fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
+                                               fflush(logfile);
 #endif
-                               searchResPtr = (struct searchRes *)replyBuffer;
-                               searchResPtr->msgType = SEARCH_RES;
-                               if (getKeyOwner(searchCmdPtr->key) == myHostData.ipAddr)
-                               {
-                                       //note: casting val to void * in order to conform to API
-                                       if((searchResPtr->val = (unsigned int)chashSearch(myHashTable,
-                                                       searchCmdPtr->key)) == 0)
-                                               searchResPtr->status = KEY_NOT_FOUND;
-                                       else
-                                               searchResPtr->status = KEY_FOUND;
-                               }
-                               else
-                               {
-                                       searchResPtr->status = NOT_KEY_OWNER;
-                               }
-                               if (sendto(sock, (void *)searchResPtr, sizeof(struct searchRes), 0,
-                                       (struct sockaddr *)&clientAddr, socklen) == -1)
-                               {
-                                       perror("udpListen():sendto()");
-                               }
-                               break;
-                       case FIND_LEADER_CMD:
-                               if (bytesReceived != sizeof(char))
-                               {
+                                               break;
+                                       }
+                                       if (leader == getMyIpAddr())
+                                       {
+                                               replyBuffer[0] = FIND_LEADER_RES;
+                                               if(sendto(udpServerSock, (void *)replyBuffer, sizeof(char), 0,
+                                                       (struct sockaddr *)&clientAddr, socklen) == -1)
+                                               {
+                                                       perror("udpListen():sendto");
+                                               }
+                                       }
+                                       break;
+                               case REBUILD_REQ:
+                                       if (bytesReceived != sizeof(char))
+                                       {
 #ifdef DHT_LOG
-                                       fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
-                                       fflush(logfile);
+                                               fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
+                                               fflush(logfile);
 #endif
-                                       break;
-                               }
-                               if (leader == getMyIpAddr())
-                               {
-                                       replyBuffer[0] = FIND_LEADER_RES;
-                                       if(sendto(sock, (void *)replyBuffer, sizeof(char), 0,
-                                               (struct sockaddr *)&clientAddr, socklen) == -1)
+                                               break;
+                                       }
+                                       if (leader == getMyIpAddr())
                                        {
-                                               perror("udpListen():sendto");
+                                               replyBuffer[0] = REBUILD_RES;
+                                               if(sendto(udpServerSock, (void *)replyBuffer, sizeof(char), 0,
+                                                       (struct sockaddr *)&clientAddr, socklen) == -1)
+                                               {
+                                                       perror("udpListen():sendto");
+                                               }
+                                               //TODO: leadRebuild()
                                        }
-                               }
-                               break;
-                       default:
+                                       else
+                                       {
+                                               replyBuffer[0] = NOT_LEADER;
+                                               if(sendto(udpServerSock, (void *)replyBuffer, sizeof(char), 0,
+                                                       (struct sockaddr *)&clientAddr, socklen) == -1)
+                                               {
+                                                       perror("udpListen():sendto");
+                                               }
+                                       }
+                                       break;
+//                             default:
 #ifdef DHT_LOG
-                               fprintf(logfile,"udpListen(): ERROR: Unknown message type\n");
-                               fflush(logfile);
+//                                     fprintf(logfile,"udpListen(): ERROR: Unknown message type\n");
+//                                     fflush(logfile);
 #endif
+                       }
                }
        }
 }
@@ -686,12 +764,12 @@ void *tcpListen()
 {
        struct sockaddr_in myAddr;
        struct sockaddr_in clientAddr;
-       int sockListen, sockAccept;
+       int tcpAcceptSock;
        socklen_t socklen = sizeof(struct sockaddr_in);
        pthread_t threadTcpAccept;
 
-       sockListen = socket(AF_INET, SOCK_STREAM, 0);
-       if (sockListen == -1)
+       tcpListenSock = socket(AF_INET, SOCK_STREAM, 0);
+       if (tcpListenSock == -1)
        {
                perror("tcpListen():socket()");
                pthread_exit(NULL);
@@ -702,13 +780,13 @@ void *tcpListen()
        myAddr.sin_addr.s_addr = INADDR_ANY;
        memset(&(myAddr.sin_zero), '\0', 8);
 
-       if (bind(sockListen, (struct sockaddr *)&myAddr, socklen) == -1)
+       if (bind(tcpListenSock, (struct sockaddr *)&myAddr, socklen) == -1)
        {
                perror("tcpListen():socket()");
                pthread_exit(NULL);
        }
 
-       if (listen(sockListen, BACKLOG) == -1)
+       if (listen(tcpListenSock, BACKLOG) == -1)
        {
                perror("tcpListen():listen()");
                pthread_exit(NULL);
@@ -721,23 +799,23 @@ void *tcpListen()
 
        while(1)
        {
-               sockAccept = accept(sockListen, (struct sockaddr *)&clientAddr, &socklen);
-               pthread_create(&threadTcpAccept, NULL, tcpAccept, (void *)sockAccept);
+               tcpAcceptSock = accept(tcpListenSock, (struct sockaddr *)&clientAddr, &socklen);
+               pthread_create(&threadTcpAccept, NULL, tcpAccept, (void *)tcpAcceptSock);
        }
 }
 
 void *tcpAccept(void *arg)
 {
-       int sockAccept = (int)arg;
+       int tcpAcceptSock = (int)arg;
        int bytesReceived;
        char msgType;
 
 #ifdef DHT_LOG
-       fprintf(logfile, "tcpAccept(): accepted tcp connection, file descriptor: %d\n", sockAccept);
+       fprintf(logfile, "tcpAccept(): accepted tcp connection, file descriptor: %d\n", tcpAcceptSock);
        fflush(logfile);
 #endif
 
-       bytesReceived = recv(sockAccept, &msgType, sizeof(char), 0);
+       bytesReceived = recv(tcpAcceptSock, &msgType, sizeof(char), 0);
        if (bytesReceived == -1)
        {
                perror("tcpAccept():recv()");
@@ -745,7 +823,7 @@ void *tcpAccept(void *arg)
        else if (bytesReceived == 0)
        {
 #ifdef DHT_LOG
-               fprintf(logfile, "tcpAccept(): bytesReceived = 0\n", sockAccept);
+               fprintf(logfile, "tcpAccept(): bytesReceived = 0\n", tcpAcceptSock);
                fflush(logfile);
 #endif
        }
@@ -753,24 +831,24 @@ void *tcpAccept(void *arg)
        {
                switch (msgType)
                {
-                       case DHT_INFO_CMD:
-                               if (send(sockAccept, &numHosts, sizeof(numHosts), 0) == -1)
+                       case DHT_INFO_REQ:
+                               if (send(tcpAcceptSock, &numHosts, sizeof(numHosts), 0) == -1)
                                {
                                        perror("tcpAccept():send()");
                                        break;
                                }
-                               if (send(sockAccept, &numBlocks, sizeof(numBlocks), 0) == -1)
+                               if (send(tcpAcceptSock, &numBlocks, sizeof(numBlocks), 0) == -1)
                                {
                                        perror("tcpAccept():send()");
                                        break;
                                }
-                               if (send(sockAccept, hostArray, numHosts*sizeof(struct hostData),
+                               if (send(tcpAcceptSock, hostArray, numHosts*sizeof(struct hostData),
                                                0) == -1)
                                {
                                        perror("tcpAccept():send()");
                                        break;
                                }
-                               if (send(sockAccept, blockOwnerArray, numBlocks*sizeof(unsigned int),
+                               if (send(tcpAcceptSock, blockOwnerArray, numBlocks*sizeof(unsigned int),
                                                0) == -1)
                                {
                                        perror("tcpAccept():send()");
@@ -785,14 +863,14 @@ void *tcpAccept(void *arg)
                }
        }
 
-       if (close(sockAccept) == -1)
+       if (close(tcpAcceptSock) == -1)
        {
                perror("tcpAccept():close()");
        }
 
 #ifdef DHT_LOG
        fprintf(logfile, "tcpAccept(): closed tcp connection, file descriptor: %d\n",
-               sockAccept);
+               tcpAcceptSock);
        fflush(logfile);
 #endif
 
@@ -830,7 +908,7 @@ unsigned int getMyIpAddr()
        return ntohl(myAddr->sin_addr.s_addr);
 }
 
-unsigned int getLeadersIpAddr()
+unsigned int findLeader()
 {
        unsigned int reply_ip;
        int bytesReceived;
@@ -838,7 +916,7 @@ unsigned int getLeadersIpAddr()
        char response;
 
 #ifdef DHT_LOG
-       fprintf(logfile, "getLeadersIpAddr(): broadcasting...\n");
+       fprintf(logfile, "findLeader(): broadcasting...\n");
        fflush(logfile);
 #endif
 
@@ -851,27 +929,27 @@ unsigned int getLeadersIpAddr()
        if (bytesReceived == -1)
        {
 #ifdef DHT_LOG
-       fprintf(logfile, "getLeadersIpAddr(): no response\n");
-       fflush(logfile);
+               fprintf(logfile, "findLeader(): no response\n");
+               fflush(logfile);
 #endif
                return 0;
        }
        else if (response == FIND_LEADER_RES)
        {
 #ifdef DHT_LOG
-       struct in_addr reply_addr;
-       reply_addr.s_addr = htonl(reply_ip);
-       fprintf(logfile, "getLeadersIpAddr(): leader found:%s\n",
+               struct in_addr reply_addr;
+               reply_addr.s_addr = htonl(reply_ip);
+               fprintf(logfile, "findLeader(): leader found:%s\n",
                                        inet_ntoa(reply_addr));
-       fflush(logfile);
+               fflush(logfile);
 #endif
                return reply_ip;
        }
        else
        {
 #ifdef DHT_LOG
-       fprintf(logfile, "getLeadersIpAddr(): unexpected response\n");
-       fflush(logfile);
+               fprintf(logfile, "findLeader(): unexpected response\n");
+               fflush(logfile);
 #endif
                return 0;
        }
@@ -901,7 +979,7 @@ int getDHTdata()
                close(sock);
                return -1;
        }
-       msg = DHT_INFO_CMD;
+       msg = DHT_INFO_REQ;
        if (send(sock, &msg, sizeof(char), 0) == -1)
        {
                perror("getDHTdata():send()");
@@ -920,9 +998,9 @@ int getDHTdata()
 #ifdef DHT_LOG
                fprintf(logfile,"getDHTdata(): ERROR: numHosts not completely received\n");
                fflush(logfile);
+#endif
                close(sock);
                return -1;
-#endif
        }
        bytesReceived = recv(sock, &numBlocks, sizeof(numBlocks), 0);
        if (bytesReceived == -1)
@@ -936,9 +1014,9 @@ int getDHTdata()
 #ifdef DHT_LOG
                fprintf(logfile,"getDHTdata(): ERROR: numBlocks not completely received\n");
                fflush(logfile);
+#endif
                close(sock);
                return -1;
-#endif
        }
        if (hostArray != NULL)
                free(hostArray);
@@ -955,9 +1033,9 @@ int getDHTdata()
 #ifdef DHT_LOG
                fprintf(logfile,"getDHTdata(): ERROR: hostArray not completely received\n");
                fflush(logfile);
+#endif
                close(sock);
                return -1;
-#endif
        }
        if (blockOwnerArray != NULL)
                free(blockOwnerArray);
@@ -974,9 +1052,9 @@ int getDHTdata()
 #ifdef DHT_LOG
                fprintf(logfile,"getDHTdata(): ERROR: blockOwnerArray not completely received\n");
                fflush(logfile);
+#endif
                close(sock);
                return -1;
-#endif
        }
 #ifdef DHT_LOG
                fprintf(logfile,"getDHTdata(): got data:\n");
@@ -991,11 +1069,105 @@ unsigned int hash(unsigned int x)
        return x % numBlocks;
 }
 
+//This function will not return until it succeeds in submitting
+// a rebuild request to the leader. It is then the leader's responibility
+// to ensure that the rebuild is caried out
+void initRebuild()
+{
+       int bytesReceived;
+       char msg;
+       char response;
+       int done;
+       int retry_count;
+       int i;
+
+       done = 0;
+       retry_count = 0;
+
+       while (!done)
+       {
+#ifdef DHT_LOG
+               if (retry_count > 0)
+               {
+                       fprintf(logfile,"initRebuild(): retry count:%d\n", retry_count);
+                       fflush(logfile);
+               }
+#endif
+
+               if (leader == 0 || retry_count > 0)
+               {
+                       leader = findLeader(); //broadcast
+                       if (leader == 0) //no response
+                       {
+                               //TODO:elect leader: this will do for now
+                               leader = getMyIpAddr();
+
+                               numHosts = 1;
+                               hostArray = calloc(numHosts, sizeof(struct hostData));
+                               hostArray[0] = myHostData;
+                               numBlocks = INIT_BLOCK_NUM;
+                               blockOwnerArray = calloc(numBlocks, sizeof(unsigned int));
+                               for (i = 0; i < numBlocks; i++)
+                                       blockOwnerArray[i] = 0;
+                       }
+               }
+       
+               msg = REBUILD_REQ;
+
+               bytesReceived = udpSendWaitForResponse(leader, UDP_PORT,
+                       (void *)&msg, sizeof(msg), (void *)&response, sizeof(response),
+                       TIMEOUT_MS, MAX_RETRIES);
+               if (bytesReceived == -1)
+               {
+                       perror("initRebuild():recv()");
+               }
+               else if (bytesReceived != sizeof(response))
+               {
+#ifdef DHT_LOG
+                       fprintf(logfile,"initRebuild(): ERROR: response not completely received\n");
+                       fflush(logfile);
+#endif
+               }
+               else if (response == NOT_LEADER)
+               {
+#ifdef DHT_LOG
+                       struct in_addr address;
+                       address.s_addr = htonl(leader);
+                       fprintf(logfile,"initRebuild(): ERROR: %s no longer leader\n",
+                               inet_ntoa(address));
+                       fflush(logfile);
+#endif
+               }
+               else if (response != REBUILD_RES)
+               {
+#ifdef DHT_LOG
+                       fprintf(logfile,"initRebuild(): ERROR: unexpected response\n");
+                       fflush(logfile);
+#endif
+               }
+               else
+               {
+#ifdef DHT_LOG
+                       fprintf(logfile,"initRebuild(): submitted rebuild request\n");
+                       writeDHTdata(logfile);
+                       fflush(logfile);
+#endif
+                       done = 1;
+               }
+       }
+       return;
+}
+
 void leadRebuild()
 {
        
 }
 
+void followRebuild()
+{
+
+}
+
 void writeDHTdata(FILE *outfile)
 {
        int i;