From: erubow Date: Sat, 23 Jun 2007 01:16:59 +0000 (+0000) Subject: Each host currenty joins by initiating a rebuild. To rebuild, a request X-Git-Tag: preEdgeChange~541 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=bcc8159986604ccbc86d8f991eb815edfd47a48b;p=IRC.git Each host currenty joins by initiating a rebuild. To rebuild, a request is sent to the leader, and the leader acks the request. So far it stops there, but the leader is in charge of coordinating the rebuild. --- diff --git a/Robust/src/Runtime/DSTM/interface/dht.c b/Robust/src/Runtime/DSTM/interface/dht.c index 2a726f50..173ff7b9 100644 --- a/Robust/src/Runtime/DSTM/interface/dht.c +++ b/Robust/src/Runtime/DSTM/interface/dht.c @@ -69,6 +69,9 @@ int dhtSearch(unsigned int key, unsigned int *val) #define DEFAULT_INTERFACE "eth0" #define DHT_LOG "dht.log" + +#define NUM_MSG_TYPES 19 + enum { INSERT_CMD, INSERT_RES, @@ -78,18 +81,41 @@ enum { SEARCH_RES, FIND_LEADER_CMD, FIND_LEADER_RES, - REBUILD_REQ, + REBUILD_RES, + NOT_LEADER, REBUILD_CMD, JOIN_REQ, JOIN_RES, - DHT_INFO_CMD, + DHT_INFO_REQ, DHT_INFO_RES, FILL_DHT_CMD, FILL_DHT_RES, REBUILD_DONE_INFO }; +const char *msg_types[NUM_MSG_TYPES] = +{ + "INSERT_CMD", + "INSERT_RES", + "REMOVE_CMD", + "REMOVE_RES", + "SEARCH_CMD", + "SEARCH_RES", + "FIND_LEADER_CMD", + "FIND_LEADER_RES", + "REBUILD_REQ", + "REBUILD_RES", + "NOT_LEADER", + "REBUILD_CMD", + "JOIN_REQ", + "JOIN_RES", + "DHT_INFO_REQ", + "DHT_INFO_RES", + "FILL_DHT_CMD", + "FILL_DHT_RES", + "REBUILD_DONE_INFO" +}; //status codes enum { @@ -99,7 +125,7 @@ enum { REMOVE_ERROR, KEY_FOUND, KEY_NOT_FOUND, - NOT_KEY_OWNER + NOT_KEY_OWNER, }; struct hostData { @@ -145,6 +171,11 @@ struct searchRes { unsigned int val; }; +struct rebuildRes { + unsigned int msgType:8; + unsigned int unused:24; + unsigned int status; +}; //TODO: leave message, rebuild message... @@ -157,11 +188,15 @@ struct hostData *hostArray; unsigned int numBlocks; unsigned int *blockOwnerArray; /*----end DHT data----*/ +pthread_t threadUdpListen; +pthread_t threadTcpListen; +int udpServerSock; +int tcpListenSock; //return my IP address unsigned int getMyIpAddr(); //sends broadcast to discover leader -unsigned int getLeadersIpAddr(); +unsigned int findLeader(); //UDP server void *udpListen(); //TCP server @@ -187,6 +222,9 @@ unsigned int hash(unsigned int x); int getDHTdata(); //outputs readable DHT data to outfile void writeDHTdata(FILE *outfile); +void initRebuild(); +void leadRebuild(); +void followRebuild(); void dhtInit(unsigned int maxKeyCapacity) { @@ -206,7 +244,12 @@ void dhtInit(unsigned int maxKeyCapacity) hostArray = NULL; blockOwnerArray = NULL; - leader = getLeadersIpAddr(); + pthread_create(&threadUdpListen, NULL, udpListen, NULL); + pthread_create(&threadTcpListen, NULL, tcpListen, NULL); + + initRebuild(); + +/* leader = findLeader(); if (leader == 0) { //no response: I am the first @@ -227,11 +270,9 @@ void dhtInit(unsigned int maxKeyCapacity) //TODO: actually, just initiate a rebuild here instead } +*/ //start servers - pthread_t threadUdpListen, threadTcpListen; - pthread_create(&threadUdpListen, NULL, udpListen, NULL); - pthread_create(&threadTcpListen, NULL, tcpListen, NULL); return; } @@ -239,6 +280,10 @@ void dhtInit(unsigned int maxKeyCapacity) void dhtExit() { fclose(logfile); + pthread_cancel(threadUdpListen); + pthread_cancel(threadTcpListen); + close(udpServerSock); + close(tcpListenSock); } int dhtInsert(unsigned int key, unsigned int val) @@ -334,7 +379,6 @@ void *udpListen() { struct sockaddr_in myAddr; struct sockaddr_in clientAddr; - int sock; socklen_t socklen = sizeof(struct sockaddr_in); char buffer[BUFFER_SIZE]; ssize_t bytesReceived; @@ -349,7 +393,7 @@ void *udpListen() chashtable_t *myHashTable = chashCreate(HASH_SIZE, LOADFACTOR); - if ((sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) + if ((udpServerSock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) { perror("udpListen():socket()"); pthread_exit(NULL); @@ -360,7 +404,7 @@ void *udpListen() myAddr.sin_addr.s_addr=INADDR_ANY; myAddr.sin_port=htons(UDP_PORT); - if (bind(sock, (struct sockaddr *)&myAddr, socklen) == -1) + if (bind(udpServerSock, (struct sockaddr *)&myAddr, socklen) == -1) { perror("udpListen():bind()"); pthread_exit(NULL); @@ -371,160 +415,194 @@ void *udpListen() #endif while(1) { - if ((bytesReceived = recvfrom(sock, buffer, BUFFER_SIZE, 0, + if ((bytesReceived = recvfrom(udpServerSock, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&clientAddr, &socklen)) == -1) { perror("udpListen():recvfrom()"); - break; } - if (bytesReceived == 0) + else if (bytesReceived == 0) { #ifdef DHT_LOG fprintf(logfile,"udpListen(): recvfrom() returned 0\n"); fflush(logfile); #endif - break; } - gettimeofday(&now, NULL); + else + { + gettimeofday(&now, NULL); #ifdef DHT_LOG - fprintf(logfile,"udpListen(): message received:%ds,%dus\n", now.tv_sec, - now.tv_usec); - fprintf(logfile,"udpListen(): received %d bytes from %s:%d\n", - bytesReceived,inet_ntoa(clientAddr.sin_addr),htons(clientAddr.sin_port)); - fflush(logfile); + fprintf(logfile, "udpListen(): received %s from %s\n", + (buffer[0] < NUM_MSG_TYPES ? msg_types[buffer[0]] : "unknown message"), + inet_ntoa(clientAddr.sin_addr)); +// fprintf(logfile,"udpListen(): time received:%ds,%dus\n", now.tv_sec, +// now.tv_usec); +// fprintf(logfile,"udpListen(): msg size:%d bytes source:%s:%d\n", +// bytesReceived,inet_ntoa(clientAddr.sin_addr),htons(clientAddr.sin_port)); + fflush(logfile); #endif - switch (buffer[0]) - { - case INSERT_CMD: - if (bytesReceived != sizeof(struct insertCmd)) - { + switch (buffer[0]) + { + case INSERT_CMD: + if (bytesReceived != sizeof(struct insertCmd)) + { +#ifdef DHT_LOG + fprintf(logfile, "udpListen(): ERROR: incorrect message size\n"); + fflush(logfile); +#endif + break; + } + insertCmdPtr = (struct insertCmd *)buffer; #ifdef DHT_LOG - fprintf(logfile, "udpListen(): ERROR: incorrect message size\n"); + fprintf(logfile, "udpListen(): Insert: key=%d, val=%d\n", + insertCmdPtr->key, insertCmdPtr->val); fflush(logfile); #endif + insertResPtr = (struct insertRes *)replyBuffer; + insertResPtr->msgType = INSERT_RES; + if (getKeyOwner(insertCmdPtr->key) == myHostData.ipAddr) + { + //note: casting val to void * in order to conform to API + if(chashInsert(myHashTable, insertCmdPtr->key, + (void *)insertCmdPtr->val) == 0) + insertResPtr->status = INSERT_OK; + else + insertResPtr->status = INSERT_ERROR; + } + else + { + insertResPtr->status = NOT_KEY_OWNER;; + } + if (sendto(udpServerSock, (void *)insertResPtr, + sizeof(struct insertRes), 0, (struct sockaddr *)&clientAddr, + socklen) == -1) + { + perror("udpListen():sendto()"); + } break; - } - insertCmdPtr = (struct insertCmd *)buffer; + case REMOVE_CMD: + if (bytesReceived != sizeof(struct removeCmd)) + { #ifdef DHT_LOG - fprintf(logfile, "udpListen(): Insert: key=%d, val=%d\n", - insertCmdPtr->key, insertCmdPtr->val); - fflush(logfile); + fprintf(logfile, "udpListen(): ERROR: incorrect message size\n"); + fflush(logfile); #endif - insertResPtr = (struct insertRes *)replyBuffer; - insertResPtr->msgType = INSERT_RES; - if (getKeyOwner(insertCmdPtr->key) == myHostData.ipAddr) - { - //note: casting val to void * in order to conform to API - if(chashInsert(myHashTable, insertCmdPtr->key, - (void *)insertCmdPtr->val) == 0) - insertResPtr->status = INSERT_OK; - else - insertResPtr->status = INSERT_ERROR; - } - else - { - insertResPtr->status = NOT_KEY_OWNER;; - } - if (sendto(sock, (void *)insertResPtr, sizeof(struct insertRes), 0, - (struct sockaddr *)&clientAddr, socklen) == -1) - { - perror("udpListen():sendto()"); - } - break; - case REMOVE_CMD: - if (bytesReceived != sizeof(struct removeCmd)) - { + break; + } + removeCmdPtr = (struct removeCmd *)buffer; #ifdef DHT_LOG - fprintf(logfile, "udpListen(): ERROR: incorrect message size\n"); + fprintf(logfile,"udpListen(): Remove: key=%d\n", removeCmdPtr->key); fflush(logfile); #endif + removeResPtr = (struct removeRes *)replyBuffer; + removeResPtr->msgType = REMOVE_RES; + if (getKeyOwner(removeCmdPtr->key) == myHostData.ipAddr) + { + //note: casting val to void * in order to conform to API + if(chashRemove(myHashTable, removeCmdPtr->key) == 0) + removeResPtr->status = INSERT_OK; + else + removeResPtr->status = INSERT_ERROR; + } + else + { + removeResPtr->status = NOT_KEY_OWNER; + } + if (sendto(udpServerSock, (void *)removeResPtr, sizeof(struct removeRes), 0, + (struct sockaddr *)&clientAddr, socklen) == -1) + { + perror("udpListen():sendto()"); + } break; - } - removeCmdPtr = (struct removeCmd *)buffer; + case SEARCH_CMD: + if (bytesReceived != sizeof(struct searchCmd)) + { #ifdef DHT_LOG - fprintf(logfile,"udpListen(): Remove: key=%d\n", removeCmdPtr->key); - fflush(logfile); + fprintf(logfile,"udpListen(): ERROR: incorrect message size\n"); + fflush(logfile); #endif - removeResPtr = (struct removeRes *)replyBuffer; - removeResPtr->msgType = REMOVE_RES; - if (getKeyOwner(removeCmdPtr->key) == myHostData.ipAddr) - { - //note: casting val to void * in order to conform to API - if(chashRemove(myHashTable, removeCmdPtr->key) == 0) - removeResPtr->status = INSERT_OK; - else - removeResPtr->status = INSERT_ERROR; - } - else - { - removeResPtr->status = NOT_KEY_OWNER; - } - if (sendto(sock, (void *)removeResPtr, sizeof(struct removeRes), 0, - (struct sockaddr *)&clientAddr, socklen) == -1) - { - perror("udpListen():sendto()"); - } - break; - case SEARCH_CMD: - if (bytesReceived != sizeof(struct searchCmd)) - { + break; + } + searchCmdPtr = (struct searchCmd *)buffer; #ifdef DHT_LOG - fprintf(logfile,"udpListen(): ERROR: incorrect message size\n"); - fflush(logfile); + fprintf(logfile,"udpListen(): Search: key=%d\n",searchCmdPtr->key); + fflush(logfile); #endif + searchResPtr = (struct searchRes *)replyBuffer; + searchResPtr->msgType = SEARCH_RES; + if (getKeyOwner(searchCmdPtr->key) == myHostData.ipAddr) + { + //note: casting val to void * in order to conform to API + if((searchResPtr->val = (unsigned int)chashSearch(myHashTable, + searchCmdPtr->key)) == 0) + searchResPtr->status = KEY_NOT_FOUND; + else + searchResPtr->status = KEY_FOUND; + } + else + { + searchResPtr->status = NOT_KEY_OWNER; + } + if (sendto(udpServerSock, (void *)searchResPtr, sizeof(struct searchRes), 0, + (struct sockaddr *)&clientAddr, socklen) == -1) + { + perror("udpListen():sendto()"); + } break; - } - searchCmdPtr = (struct searchCmd *)buffer; + case FIND_LEADER_CMD: + if (bytesReceived != sizeof(char)) + { #ifdef DHT_LOG - fprintf(logfile,"udpListen(): Search: key=%d\n",searchCmdPtr->key); - fflush(logfile); + fprintf(logfile, "udpListen(): ERROR: incorrect message size\n"); + fflush(logfile); #endif - searchResPtr = (struct searchRes *)replyBuffer; - searchResPtr->msgType = SEARCH_RES; - if (getKeyOwner(searchCmdPtr->key) == myHostData.ipAddr) - { - //note: casting val to void * in order to conform to API - if((searchResPtr->val = (unsigned int)chashSearch(myHashTable, - searchCmdPtr->key)) == 0) - searchResPtr->status = KEY_NOT_FOUND; - else - searchResPtr->status = KEY_FOUND; - } - else - { - searchResPtr->status = NOT_KEY_OWNER; - } - if (sendto(sock, (void *)searchResPtr, sizeof(struct searchRes), 0, - (struct sockaddr *)&clientAddr, socklen) == -1) - { - perror("udpListen():sendto()"); - } - break; - case FIND_LEADER_CMD: - if (bytesReceived != sizeof(char)) - { + break; + } + if (leader == getMyIpAddr()) + { + replyBuffer[0] = FIND_LEADER_RES; + if(sendto(udpServerSock, (void *)replyBuffer, sizeof(char), 0, + (struct sockaddr *)&clientAddr, socklen) == -1) + { + perror("udpListen():sendto"); + } + } + break; + case REBUILD_REQ: + if (bytesReceived != sizeof(char)) + { #ifdef DHT_LOG - fprintf(logfile, "udpListen(): ERROR: incorrect message size\n"); - fflush(logfile); + fprintf(logfile, "udpListen(): ERROR: incorrect message size\n"); + fflush(logfile); #endif - break; - } - if (leader == getMyIpAddr()) - { - replyBuffer[0] = FIND_LEADER_RES; - if(sendto(sock, (void *)replyBuffer, sizeof(char), 0, - (struct sockaddr *)&clientAddr, socklen) == -1) + break; + } + if (leader == getMyIpAddr()) { - perror("udpListen():sendto"); + replyBuffer[0] = REBUILD_RES; + if(sendto(udpServerSock, (void *)replyBuffer, sizeof(char), 0, + (struct sockaddr *)&clientAddr, socklen) == -1) + { + perror("udpListen():sendto"); + } + //TODO: leadRebuild() } - } - break; - default: + else + { + replyBuffer[0] = NOT_LEADER; + if(sendto(udpServerSock, (void *)replyBuffer, sizeof(char), 0, + (struct sockaddr *)&clientAddr, socklen) == -1) + { + perror("udpListen():sendto"); + } + } + break; +// default: #ifdef DHT_LOG - fprintf(logfile,"udpListen(): ERROR: Unknown message type\n"); - fflush(logfile); +// fprintf(logfile,"udpListen(): ERROR: Unknown message type\n"); +// fflush(logfile); #endif + } } } } @@ -686,12 +764,12 @@ void *tcpListen() { struct sockaddr_in myAddr; struct sockaddr_in clientAddr; - int sockListen, sockAccept; + int tcpAcceptSock; socklen_t socklen = sizeof(struct sockaddr_in); pthread_t threadTcpAccept; - sockListen = socket(AF_INET, SOCK_STREAM, 0); - if (sockListen == -1) + tcpListenSock = socket(AF_INET, SOCK_STREAM, 0); + if (tcpListenSock == -1) { perror("tcpListen():socket()"); pthread_exit(NULL); @@ -702,13 +780,13 @@ void *tcpListen() myAddr.sin_addr.s_addr = INADDR_ANY; memset(&(myAddr.sin_zero), '\0', 8); - if (bind(sockListen, (struct sockaddr *)&myAddr, socklen) == -1) + if (bind(tcpListenSock, (struct sockaddr *)&myAddr, socklen) == -1) { perror("tcpListen():socket()"); pthread_exit(NULL); } - if (listen(sockListen, BACKLOG) == -1) + if (listen(tcpListenSock, BACKLOG) == -1) { perror("tcpListen():listen()"); pthread_exit(NULL); @@ -721,23 +799,23 @@ void *tcpListen() while(1) { - sockAccept = accept(sockListen, (struct sockaddr *)&clientAddr, &socklen); - pthread_create(&threadTcpAccept, NULL, tcpAccept, (void *)sockAccept); + tcpAcceptSock = accept(tcpListenSock, (struct sockaddr *)&clientAddr, &socklen); + pthread_create(&threadTcpAccept, NULL, tcpAccept, (void *)tcpAcceptSock); } } void *tcpAccept(void *arg) { - int sockAccept = (int)arg; + int tcpAcceptSock = (int)arg; int bytesReceived; char msgType; #ifdef DHT_LOG - fprintf(logfile, "tcpAccept(): accepted tcp connection, file descriptor: %d\n", sockAccept); + fprintf(logfile, "tcpAccept(): accepted tcp connection, file descriptor: %d\n", tcpAcceptSock); fflush(logfile); #endif - bytesReceived = recv(sockAccept, &msgType, sizeof(char), 0); + bytesReceived = recv(tcpAcceptSock, &msgType, sizeof(char), 0); if (bytesReceived == -1) { perror("tcpAccept():recv()"); @@ -745,7 +823,7 @@ void *tcpAccept(void *arg) else if (bytesReceived == 0) { #ifdef DHT_LOG - fprintf(logfile, "tcpAccept(): bytesReceived = 0\n", sockAccept); + fprintf(logfile, "tcpAccept(): bytesReceived = 0\n", tcpAcceptSock); fflush(logfile); #endif } @@ -753,24 +831,24 @@ void *tcpAccept(void *arg) { switch (msgType) { - case DHT_INFO_CMD: - if (send(sockAccept, &numHosts, sizeof(numHosts), 0) == -1) + case DHT_INFO_REQ: + if (send(tcpAcceptSock, &numHosts, sizeof(numHosts), 0) == -1) { perror("tcpAccept():send()"); break; } - if (send(sockAccept, &numBlocks, sizeof(numBlocks), 0) == -1) + if (send(tcpAcceptSock, &numBlocks, sizeof(numBlocks), 0) == -1) { perror("tcpAccept():send()"); break; } - if (send(sockAccept, hostArray, numHosts*sizeof(struct hostData), + if (send(tcpAcceptSock, hostArray, numHosts*sizeof(struct hostData), 0) == -1) { perror("tcpAccept():send()"); break; } - if (send(sockAccept, blockOwnerArray, numBlocks*sizeof(unsigned int), + if (send(tcpAcceptSock, blockOwnerArray, numBlocks*sizeof(unsigned int), 0) == -1) { perror("tcpAccept():send()"); @@ -785,14 +863,14 @@ void *tcpAccept(void *arg) } } - if (close(sockAccept) == -1) + if (close(tcpAcceptSock) == -1) { perror("tcpAccept():close()"); } #ifdef DHT_LOG fprintf(logfile, "tcpAccept(): closed tcp connection, file descriptor: %d\n", - sockAccept); + tcpAcceptSock); fflush(logfile); #endif @@ -830,7 +908,7 @@ unsigned int getMyIpAddr() return ntohl(myAddr->sin_addr.s_addr); } -unsigned int getLeadersIpAddr() +unsigned int findLeader() { unsigned int reply_ip; int bytesReceived; @@ -838,7 +916,7 @@ unsigned int getLeadersIpAddr() char response; #ifdef DHT_LOG - fprintf(logfile, "getLeadersIpAddr(): broadcasting...\n"); + fprintf(logfile, "findLeader(): broadcasting...\n"); fflush(logfile); #endif @@ -851,27 +929,27 @@ unsigned int getLeadersIpAddr() if (bytesReceived == -1) { #ifdef DHT_LOG - fprintf(logfile, "getLeadersIpAddr(): no response\n"); - fflush(logfile); + fprintf(logfile, "findLeader(): no response\n"); + fflush(logfile); #endif return 0; } else if (response == FIND_LEADER_RES) { #ifdef DHT_LOG - struct in_addr reply_addr; - reply_addr.s_addr = htonl(reply_ip); - fprintf(logfile, "getLeadersIpAddr(): leader found:%s\n", + struct in_addr reply_addr; + reply_addr.s_addr = htonl(reply_ip); + fprintf(logfile, "findLeader(): leader found:%s\n", inet_ntoa(reply_addr)); - fflush(logfile); + fflush(logfile); #endif return reply_ip; } else { #ifdef DHT_LOG - fprintf(logfile, "getLeadersIpAddr(): unexpected response\n"); - fflush(logfile); + fprintf(logfile, "findLeader(): unexpected response\n"); + fflush(logfile); #endif return 0; } @@ -901,7 +979,7 @@ int getDHTdata() close(sock); return -1; } - msg = DHT_INFO_CMD; + msg = DHT_INFO_REQ; if (send(sock, &msg, sizeof(char), 0) == -1) { perror("getDHTdata():send()"); @@ -920,9 +998,9 @@ int getDHTdata() #ifdef DHT_LOG fprintf(logfile,"getDHTdata(): ERROR: numHosts not completely received\n"); fflush(logfile); +#endif close(sock); return -1; -#endif } bytesReceived = recv(sock, &numBlocks, sizeof(numBlocks), 0); if (bytesReceived == -1) @@ -936,9 +1014,9 @@ int getDHTdata() #ifdef DHT_LOG fprintf(logfile,"getDHTdata(): ERROR: numBlocks not completely received\n"); fflush(logfile); +#endif close(sock); return -1; -#endif } if (hostArray != NULL) free(hostArray); @@ -955,9 +1033,9 @@ int getDHTdata() #ifdef DHT_LOG fprintf(logfile,"getDHTdata(): ERROR: hostArray not completely received\n"); fflush(logfile); +#endif close(sock); return -1; -#endif } if (blockOwnerArray != NULL) free(blockOwnerArray); @@ -974,9 +1052,9 @@ int getDHTdata() #ifdef DHT_LOG fprintf(logfile,"getDHTdata(): ERROR: blockOwnerArray not completely received\n"); fflush(logfile); +#endif close(sock); return -1; -#endif } #ifdef DHT_LOG fprintf(logfile,"getDHTdata(): got data:\n"); @@ -991,11 +1069,105 @@ unsigned int hash(unsigned int x) return x % numBlocks; } +//This function will not return until it succeeds in submitting +// a rebuild request to the leader. It is then the leader's responibility +// to ensure that the rebuild is caried out +void initRebuild() +{ + int bytesReceived; + char msg; + char response; + int done; + int retry_count; + int i; + + done = 0; + retry_count = 0; + + while (!done) + { +#ifdef DHT_LOG + if (retry_count > 0) + { + fprintf(logfile,"initRebuild(): retry count:%d\n", retry_count); + fflush(logfile); + } +#endif + + if (leader == 0 || retry_count > 0) + { + leader = findLeader(); //broadcast + if (leader == 0) //no response + { + //TODO:elect leader: this will do for now + leader = getMyIpAddr(); + + numHosts = 1; + hostArray = calloc(numHosts, sizeof(struct hostData)); + hostArray[0] = myHostData; + numBlocks = INIT_BLOCK_NUM; + blockOwnerArray = calloc(numBlocks, sizeof(unsigned int)); + for (i = 0; i < numBlocks; i++) + blockOwnerArray[i] = 0; + } + } + + msg = REBUILD_REQ; + + bytesReceived = udpSendWaitForResponse(leader, UDP_PORT, + (void *)&msg, sizeof(msg), (void *)&response, sizeof(response), + TIMEOUT_MS, MAX_RETRIES); + if (bytesReceived == -1) + { + perror("initRebuild():recv()"); + } + else if (bytesReceived != sizeof(response)) + { +#ifdef DHT_LOG + fprintf(logfile,"initRebuild(): ERROR: response not completely received\n"); + fflush(logfile); +#endif + } + else if (response == NOT_LEADER) + { +#ifdef DHT_LOG + struct in_addr address; + address.s_addr = htonl(leader); + fprintf(logfile,"initRebuild(): ERROR: %s no longer leader\n", + inet_ntoa(address)); + fflush(logfile); +#endif + } + else if (response != REBUILD_RES) + { +#ifdef DHT_LOG + fprintf(logfile,"initRebuild(): ERROR: unexpected response\n"); + fflush(logfile); +#endif + } + else + { +#ifdef DHT_LOG + fprintf(logfile,"initRebuild(): submitted rebuild request\n"); + writeDHTdata(logfile); + fflush(logfile); +#endif + done = 1; + } + } + return; +} + void leadRebuild() { } +void followRebuild() +{ + +} + void writeDHTdata(FILE *outfile) { int i;