8 #define OIDS_PER_HOST 0x40000000
10 //set these to your IP addresses
11 unsigned int hosts[NUM_HOSTS] = {
19 void dhtInit(unsigned int maxKeyCapaciy)
26 //does nothing, returns 0
27 int dhtInsert(unsigned int key, unsigned int val)
30 //does nothing, returns 0
31 int dhtRemove(unsigned int key)
// Static-table variant of the lookup: key is integer-divided by OIDS_PER_HOST
// and the quotient indexes hosts[]; the selected entry is stored through val.
// NOTE(review): with OIDS_PER_HOST == 0x40000000 and a 32-bit unsigned key the
// index ranges over 0..3; NUM_HOSTS is defined outside this view, so confirm
// that hosts[] has at least four entries.
34 //returns 0 if successful and copies val into *val,
35 // 1 if key not found, -1 if an error occurred
36 int dhtSearch(unsigned int key, unsigned int *val)
38 *val = hosts[key / OIDS_PER_HOST];
44 #include <netinet/in.h>
45 #include <arpa/inet.h>
46 #include <sys/types.h>
47 #include <sys/socket.h>
48 #include <sys/ioctl.h>
58 #include <linux/sockios.h>
59 #include "clookup.h" //this works for now, do we need anything better?
61 #define BUFFER_SIZE 512 //maximum message size
64 #define BACKLOG 10 //max pending tcp connections
65 #define TIMEOUT_MS 500
67 #define INIT_HOST_ALLOC 16
68 #define INIT_BLOCK_NUM 64
69 #define DEFAULT_INTERFACE "eth0"
70 #define DHT_LOG "dht.log"
73 #define NUM_MSG_TYPES 19
97 const char *msg_types[NUM_MSG_TYPES] =
133 unsigned int maxKeyCapacity;
137 unsigned int msgType:8;
138 unsigned int unused:24;
144 unsigned int msgType:8;
145 unsigned int unused:24;
150 unsigned int msgType:8;
151 unsigned int unused:24;
156 unsigned int msgType:8;
157 unsigned int unused:24;
162 unsigned int msgType:8;
163 unsigned int unused:24;
168 unsigned int msgType:8;
169 unsigned int unused:24;
175 unsigned int msgType:8;
176 unsigned int unused:24;
180 //TODO: leave message, rebuild message...
183 unsigned int leader; //ip address of leader
184 struct hostData myHostData;
186 unsigned int numHosts;
187 struct hostData *hostArray;
188 unsigned int numBlocks;
189 unsigned int *blockOwnerArray;
190 /*----end DHT data----*/
191 pthread_t threadUdpListen;
192 pthread_t threadTcpListen;
196 //return my IP address
197 unsigned int getMyIpAddr();
198 //sends broadcast to discover leader
199 unsigned int findLeader();
204 //TCP connection handler
205 void *tcpAccept(void *);
206 //returns number of bytes received in resBuffer, or -1 if an error occurred
207 int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port,
208 void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize,
209 unsigned int timeout, unsigned int numRetries);
210 //returns number of bytes received in resBuffer, or -1 if an error occurred
211 int udpBroadcastWaitForResponse(unsigned int *reply_ip,
212 unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer,
213 unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries);
215 int sendNoWait(unsigned int dest_ip, unsigned short dest_port, void *msg,
216 unsigned int msglen);
217 //right now this hashes the key into a block and returns the block owner
218 unsigned int getKeyOwner(unsigned int key);
220 unsigned int hash(unsigned int x);
221 //initiates TCP connection with leader, gets DHT data
223 //outputs readable DHT data to outfile
224 void writeDHTdata(FILE *outfile);
227 void followRebuild();
// Brings this node up: opens DHT_LOG for writing as the log file, records the
// local IP (from getMyIpAddr()) and maxKeyCapacity in myHostData, zeroes
// numHosts and numBlocks, clears blockOwnerArray, and starts the udpListen and
// tcpListen threads.
// NOTE(review): no check of the fopen() or pthread_create() results is visible
// on these lines; confirm they are handled.
// NOTE(review): the statements after the thread creation start with an opening
// block comment on the findLeader() line, so the leader discovery that follows
// appears to be disabled; confirm where that comment closes.
229 void dhtInit(unsigned int maxKeyCapacity)
231 unsigned int myMessage;
237 logfile = fopen(DHT_LOG, "w");
240 myHostData.ipAddr = getMyIpAddr();
241 myHostData.maxKeyCapacity = maxKeyCapacity;
243 numHosts = numBlocks = 0;
245 blockOwnerArray = NULL;
247 pthread_create(&threadUdpListen, NULL, udpListen, NULL);
248 pthread_create(&threadTcpListen, NULL, tcpListen, NULL);
252 /* leader = findLeader();
255 { //no response: I am the first
256 leader = getMyIpAddr();
259 hostArray = calloc(numHosts, sizeof(struct hostData));
260 hostArray[0] = myHostData;
261 numBlocks = INIT_BLOCK_NUM;
262 blockOwnerArray = calloc(numBlocks, sizeof(unsigned int));
263 for (i = 0; i < numBlocks; i++)
264 blockOwnerArray[i] = 0;
268 //get DHT data from leader
271 //TODO: actually, just initiate a rebuild here instead
283 pthread_cancel(threadUdpListen);
284 pthread_cancel(threadTcpListen);
285 close(udpServerSock);
286 close(tcpListenSock);
// Client-side insert: looks up the owning host with getKeyOwner(key), sends an
// INSERT_CMD to it over UDP and waits for the reply (TIMEOUT_MS per attempt,
// MAX_RETRIES requested).
// The reply is examined only when its size equals sizeof(struct insertRes) and
// its msgType is INSERT_RES; anything that is not handled by the checks below
// ends at the final return -1.
289 int dhtInsert(unsigned int key, unsigned int val)
291 unsigned int dest_ip = getKeyOwner(key);
292 struct insertCmd myMessage;
293 struct insertRes response;
296 myMessage.msgType = INSERT_CMD;
300 bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage,
301 sizeof(struct insertCmd), (void *)&response, sizeof(struct insertRes),
302 TIMEOUT_MS, MAX_RETRIES);
303 if (bytesReceived == sizeof(struct insertRes))
305 if (response.msgType == INSERT_RES)
307 if (response.status == INSERT_OK)
// The NOT_KEY_OWNER branch is still commented out, so that status is not
// treated specially yet.
309 // if (response.status == NOT_KEY_OWNER)
312 //TODO: find owner and try again, request rebuild if necessary
313 return -1; //this function should be robust enough to always return 0
// Client-side remove: looks up the owning host with getKeyOwner(key), sends a
// REMOVE_CMD to it over UDP and waits for the reply (TIMEOUT_MS per attempt,
// MAX_RETRIES requested).
// The reply is examined only when its size equals sizeof(struct removeRes) and
// its msgType is REMOVE_RES; anything not handled below ends at return -1.
// NOTE(review): the check below compares against REMOVE_OK, but the responder
// in this file fills the remove reply with INSERT_OK / INSERT_ERROR; confirm
// that the two constants have the same value.
316 int dhtRemove(unsigned int key)
318 unsigned int dest_ip = getKeyOwner(key);
319 struct removeCmd myMessage;
320 struct removeRes response;
323 myMessage.msgType = REMOVE_CMD;
326 bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage,
327 sizeof(struct removeCmd), (void *)&response, sizeof(struct removeRes),
328 TIMEOUT_MS, MAX_RETRIES);
329 if (bytesReceived == sizeof(struct removeRes))
331 if (response.msgType == REMOVE_RES)
333 if (response.status == REMOVE_OK)
// The NOT_KEY_OWNER branch is still commented out, so that status is not
// treated specially yet.
335 // if (response.status == NOT_KEY_OWNER)
338 //TODO: find owner and try again, request rebuild if necessary
339 return -1; //this function should be robust enough to always return 0
// Client-side search: looks up the owning host with getKeyOwner(key), sends a
// SEARCH_CMD to it over UDP and waits for the reply (TIMEOUT_MS per attempt,
// MAX_RETRIES requested).
// The reply is examined only when its size equals sizeof(struct searchRes) and
// its msgType is SEARCH_RES; KEY_FOUND and KEY_NOT_FOUND are distinguished and
// anything else ends at the final return -1.
// NOTE(review): the lines that copy the value into *val and return 0 or 1 are
// not visible here; the trailing comment implies those are the intended
// results.
342 int dhtSearch(unsigned int key, unsigned int *val)
344 unsigned int dest_ip = getKeyOwner(key);
345 struct searchCmd myMessage;
346 struct searchRes response;
349 myMessage.msgType = SEARCH_CMD;
352 bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage,
353 sizeof(struct searchCmd), (void *)&response, sizeof(struct searchRes),
354 TIMEOUT_MS, MAX_RETRIES);
355 if (bytesReceived == sizeof(struct searchRes))
357 if (response.msgType == SEARCH_RES)
359 if (response.status == KEY_FOUND)
364 if (response.status == KEY_NOT_FOUND)
// The NOT_KEY_OWNER branch is still commented out, so that status is not
// treated specially yet.
368 // if (response.status == NOT_KEY_OWNER)
371 //TODO: find owner and try again, request rebuild if necessary
372 return -1; //this function should be robust enough to always return 0 or 1
// UDP server thread body (logs with the prefix udpListen()): binds a datagram
// socket to UDP_PORT on INADDR_ANY, receives requests into buffer and answers
// each one from replyBuffer on the same socket.
// Insert, remove and search requests operate on a hash table created here with
// chashCreate(HASH_SIZE, LOADFACTOR) and are applied only when getKeyOwner()
// names this host; otherwise the reply status is NOT_KEY_OWNER.
377 //use UDP for messages that are frequent and short
380 struct sockaddr_in myAddr;
381 struct sockaddr_in clientAddr;
382 socklen_t socklen = sizeof(struct sockaddr_in);
383 char buffer[BUFFER_SIZE];
384 ssize_t bytesReceived;
385 struct insertCmd *insertCmdPtr;
386 struct removeCmd *removeCmdPtr;
387 struct searchCmd *searchCmdPtr;
388 struct insertRes *insertResPtr;
389 struct removeRes *removeResPtr;
390 struct searchRes *searchResPtr;
391 char replyBuffer[BUFFER_SIZE];
394 chashtable_t *myHashTable = chashCreate(HASH_SIZE, LOADFACTOR);
396 if ((udpServerSock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
398 perror("udpListen():socket()");
402 bzero(&myAddr, socklen);
403 myAddr.sin_family=AF_INET;
404 myAddr.sin_addr.s_addr=INADDR_ANY;
405 myAddr.sin_port=htons(UDP_PORT);
407 if (bind(udpServerSock, (struct sockaddr *)&myAddr, socklen) == -1)
409 perror("udpListen():bind()");
413 fprintf(logfile,"udpListen(): listening on port %d\n", UDP_PORT);
// clientAddr receives the sender address and is reused as the destination of
// every reply below.
418 if ((bytesReceived = recvfrom(udpServerSock, buffer, BUFFER_SIZE, 0,
419 (struct sockaddr *)&clientAddr, &socklen)) == -1)
421 perror("udpListen():recvfrom()");
423 else if (bytesReceived == 0)
426 fprintf(logfile,"udpListen(): recvfrom() returned 0\n");
432 gettimeofday(&now, NULL);
// The first byte of the datagram is the message type and is used to pick a
// name from msg_types for the log line.
// NOTE(review): buffer is plain char; where char is signed, a type byte of
// 0x80 or above is negative, passes the < NUM_MSG_TYPES test and indexes
// msg_types out of bounds. Confirm and consider an unsigned char read.
434 fprintf(logfile, "udpListen(): received %s from %s\n",
435 (buffer[0] < NUM_MSG_TYPES ? msg_types[buffer[0]] : "unknown message"),
436 inet_ntoa(clientAddr.sin_addr));
437 // fprintf(logfile,"udpListen(): time received:%ds,%dus\n", now.tv_sec,
439 // fprintf(logfile,"udpListen(): msg size:%d bytes source:%s:%d\n",
440 // bytesReceived,inet_ntoa(clientAddr.sin_addr),htons(clientAddr.sin_port));
// Insert request: the datagram must be exactly sizeof(struct insertCmd).
447 if (bytesReceived != sizeof(struct insertCmd))
450 fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
455 insertCmdPtr = (struct insertCmd *)buffer;
457 fprintf(logfile, "udpListen(): Insert: key=%d, val=%d\n",
458 insertCmdPtr->key, insertCmdPtr->val);
461 insertResPtr = (struct insertRes *)replyBuffer;
462 insertResPtr->msgType = INSERT_RES;
463 if (getKeyOwner(insertCmdPtr->key) == myHostData.ipAddr)
465 //note: casting val to void * in order to conform to API
466 if(chashInsert(myHashTable, insertCmdPtr->key,
467 (void *)insertCmdPtr->val) == 0)
468 insertResPtr->status = INSERT_OK;
470 insertResPtr->status = INSERT_ERROR;
474 insertResPtr->status = NOT_KEY_OWNER;;
476 if (sendto(udpServerSock, (void *)insertResPtr,
477 sizeof(struct insertRes), 0, (struct sockaddr *)&clientAddr,
480 perror("udpListen():sendto()");
// Remove request: the datagram must be exactly sizeof(struct removeCmd).
484 if (bytesReceived != sizeof(struct removeCmd))
487 fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
492 removeCmdPtr = (struct removeCmd *)buffer;
494 fprintf(logfile,"udpListen(): Remove: key=%d\n", removeCmdPtr->key);
497 removeResPtr = (struct removeRes *)replyBuffer;
498 removeResPtr->msgType = REMOVE_RES;
499 if (getKeyOwner(removeCmdPtr->key) == myHostData.ipAddr)
501 //note: chashRemove() is called with the key only, so nothing is cast here
502 if(chashRemove(myHashTable, removeCmdPtr->key) == 0)
// NOTE(review): the remove reply is filled with INSERT_OK / INSERT_ERROR while
// dhtRemove() compares response.status against REMOVE_OK; confirm the values
// agree or use the REMOVE_* names.
503 removeResPtr->status = INSERT_OK;
505 removeResPtr->status = INSERT_ERROR;
509 removeResPtr->status = NOT_KEY_OWNER;
511 if (sendto(udpServerSock, (void *)removeResPtr, sizeof(struct removeRes), 0,
512 (struct sockaddr *)&clientAddr, socklen) == -1)
514 perror("udpListen():sendto()");
// Search request: the datagram must be exactly sizeof(struct searchCmd).
518 if (bytesReceived != sizeof(struct searchCmd))
521 fprintf(logfile,"udpListen(): ERROR: incorrect message size\n");
526 searchCmdPtr = (struct searchCmd *)buffer;
528 fprintf(logfile,"udpListen(): Search: key=%d\n",searchCmdPtr->key);
531 searchResPtr = (struct searchRes *)replyBuffer;
532 searchResPtr->msgType = SEARCH_RES;
533 if (getKeyOwner(searchCmdPtr->key) == myHostData.ipAddr)
535 //note: casting the chashSearch() result to unsigned int in order to conform to API
// A result of 0 is reported as KEY_NOT_FOUND, so a stored value of 0 cannot be
// told apart from a missing key.
536 if((searchResPtr->val = (unsigned int)chashSearch(myHashTable,
537 searchCmdPtr->key)) == 0)
538 searchResPtr->status = KEY_NOT_FOUND;
540 searchResPtr->status = KEY_FOUND;
544 searchResPtr->status = NOT_KEY_OWNER;
546 if (sendto(udpServerSock, (void *)searchResPtr, sizeof(struct searchRes), 0,
547 (struct sockaddr *)&clientAddr, socklen) == -1)
549 perror("udpListen():sendto()");
// Leader discovery: a one-byte request; a FIND_LEADER_RES byte is sent back
// only when this host is the current leader.
552 case FIND_LEADER_CMD:
553 if (bytesReceived != sizeof(char))
556 fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
561 if (leader == getMyIpAddr())
563 replyBuffer[0] = FIND_LEADER_RES;
564 if(sendto(udpServerSock, (void *)replyBuffer, sizeof(char), 0,
565 (struct sockaddr *)&clientAddr, socklen) == -1)
567 perror("udpListen():sendto");
// Rebuild request (its case label is not visible here): a one-byte request
// answered with REBUILD_RES when this host is the leader; the NOT_LEADER reply
// further down is presumably the else branch; confirm.
572 if (bytesReceived != sizeof(char))
575 fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
580 if (leader == getMyIpAddr())
582 replyBuffer[0] = REBUILD_RES;
583 if(sendto(udpServerSock, (void *)replyBuffer, sizeof(char), 0,
584 (struct sockaddr *)&clientAddr, socklen) == -1)
586 perror("udpListen():sendto");
588 //TODO: leadRebuild()
592 replyBuffer[0] = NOT_LEADER;
593 if(sendto(udpServerSock, (void *)replyBuffer, sizeof(char), 0,
594 (struct sockaddr *)&clientAddr, socklen) == -1)
596 perror("udpListen():sendto");
602 // fprintf(logfile,"udpListen(): ERROR: Unknown message type\n");
// Sends msg (msglen bytes) over UDP to dest_ip:dest_port and waits for a reply
// in resBuffer (at most resBufferSize bytes).
// dest_ip and dest_port are converted with htonl()/htons() below, so callers
// pass them in host byte order.
// timeout is handed to poll() unchanged (poll() takes milliseconds).
// A datagram is accepted as the reply only when its source address and port
// equal the destination; its byte count is then returned.
// NOTE(review): the retry loop is bounded by MAX_RETRIES, and numRetries is
// not used on any line visible here; confirm whether the parameter should be
// the bound.
// NOTE(review): no close() of pollsock.fd is visible on these lines; confirm
// the socket is released on every return path.
610 int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port,
611 void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize,
612 unsigned int timeout, unsigned int numRetries)
614 struct sockaddr_in server_addr;
615 struct sockaddr_in ack_addr;
616 socklen_t socklen = sizeof(struct sockaddr_in);
617 struct pollfd pollsock;
621 ssize_t bytesReceived;
623 bzero((char *) &server_addr, sizeof(server_addr));
624 server_addr.sin_family = AF_INET;
625 server_addr.sin_port = htons(dest_port);
626 server_addr.sin_addr.s_addr = htonl(dest_ip);
628 if ((pollsock.fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
630 perror("udpSendWaitForResponse():socket()");
634 pollsock.events = POLLIN;
636 for (i = 0; i < MAX_RETRIES; i++)
640 fprintf(logfile,"udpSendWaitForResponse(): trying again, count: %d\n",
644 if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr,
647 perror("udpSendWaitForResponse():sendto");
651 gettimeofday(&now, NULL);
652 fprintf(logfile,"udpSendWaitForResponse(): message sent:%ds,%dus\n",
653 now.tv_sec, now.tv_usec);
656 retval = poll(&pollsock, 1, timeout);
659 bytesReceived = recvfrom(pollsock.fd, resBuffer, resBufferSize, 0,
660 (struct sockaddr *)&ack_addr, &socklen);
// Replies from any other address or port are not accepted.
661 if ((ack_addr.sin_addr.s_addr == server_addr.sin_addr.s_addr)
662 && (ack_addr.sin_port == server_addr.sin_port))
666 gettimeofday(&now, NULL);
667 fprintf(logfile,"udpSendWaitForResponse(): received response:%ds,%dus\n", now.tv_sec, now.tv_usec);
670 return bytesReceived;
676 gettimeofday(&now, NULL);
// NOTE(review): this timeout message goes to stdout through printf(), unlike
// the other messages in this function, which go to logfile.
677 printf("udpSendWaitForResponse(): timed out, no ack:%ds,%dus\n",
678 now.tv_sec, now.tv_usec);
// Broadcast counterpart of udpSendWaitForResponse(): sends msg (msglen bytes)
// to 0xFFFFFFFF on dest_port from a socket with SO_BROADCAST enabled and waits
// for a reply in resBuffer (at most resBufferSize bytes).
// The address of whichever host answers is stored through reply_ip and the
// byte count of the reply is returned; the source of the reply is not
// filtered on the lines visible here.
// timeout is handed to poll() unchanged (poll() takes milliseconds).
// NOTE(review): the retry loop is bounded by MAX_RETRIES, and numRetries is
// not used on any line visible here; confirm whether the parameter should be
// the bound.
// NOTE(review): no close() of pollsock.fd is visible on these lines; confirm
// the socket is released on every return path.
684 int udpBroadcastWaitForResponse(unsigned int *reply_ip,
685 unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer,
686 unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries)
688 struct sockaddr_in server_addr;
689 struct sockaddr_in ack_addr;
690 socklen_t socklen = sizeof(struct sockaddr_in);
691 struct pollfd pollsock;
695 ssize_t bytesReceived;
698 bzero((char *) &server_addr, sizeof(server_addr));
699 server_addr.sin_family = AF_INET;
700 server_addr.sin_port = htons(dest_port);
701 server_addr.sin_addr.s_addr = htonl(0xFFFFFFFF);
703 if ((pollsock.fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
705 perror("udpBroadcastWaitForResponse():socket()");
710 if (setsockopt(pollsock.fd, SOL_SOCKET, SO_BROADCAST, &on, sizeof(on)) == -1)
712 perror("udpBroadcastWaitForResponse():setsockopt()");
716 pollsock.events = POLLIN;
718 for (i = 0; i < MAX_RETRIES; i++)
722 fprintf(logfile,"udpBroadcastWaitForResponse(): trying again, count: %d\n", i+1);
725 if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr,
728 perror("udpBroadcastWaitForResponse():sendto()");
732 gettimeofday(&now, NULL);
733 fprintf(logfile,"udpBroadcastWaitForResponse(): message sent:%ds,%dus\n",
734 now.tv_sec, now.tv_usec);
737 retval = poll(&pollsock, 1, timeout);
740 bytesReceived = recvfrom(pollsock.fd, resBuffer, resBufferSize, 0,
741 (struct sockaddr *)&ack_addr, &socklen);
// NOTE(review): this converts a network-order address for host-side use, for
// which ntohl() is the conventional call; htonl() is used here. Confirm.
743 *reply_ip = htonl(ack_addr.sin_addr.s_addr);
745 gettimeofday(&now, NULL);
746 fprintf(logfile,"udpBroadcastWaitForResponse(): received response:%ds,%dus\n", now.tv_sec, now.tv_usec);
749 return bytesReceived;
754 gettimeofday(&now, NULL);
755 fprintf(logfile,"udpBroadcastWaitForResponse(): timed out, no ack:%ds,%dus\n",
756 now.tv_sec, now.tv_usec);
// TCP server thread body (logs with the prefix tcpListen()): creates a stream
// socket, binds it to TCP_PORT on INADDR_ANY, listens with a backlog of
// BACKLOG and hands each accepted connection to a new tcpAccept thread.
762 // use TCP for potentially large and/or important data transfer
765 struct sockaddr_in myAddr;
766 struct sockaddr_in clientAddr;
768 socklen_t socklen = sizeof(struct sockaddr_in);
769 pthread_t threadTcpAccept;
771 tcpListenSock = socket(AF_INET, SOCK_STREAM, 0);
772 if (tcpListenSock == -1)
774 perror("tcpListen():socket()");
778 myAddr.sin_family = AF_INET;
779 myAddr.sin_port = htons(TCP_PORT);
780 myAddr.sin_addr.s_addr = INADDR_ANY;
781 memset(&(myAddr.sin_zero), '\0', 8);
783 if (bind(tcpListenSock, (struct sockaddr *)&myAddr, socklen) == -1)
// NOTE(review): this reports a bind() failure, but the label says socket().
785 perror("tcpListen():socket()");
789 if (listen(tcpListenSock, BACKLOG) == -1)
791 perror("tcpListen():listen()");
796 fprintf(logfile,"tcpListen(): listening on port %d\n", TCP_PORT);
// NOTE(review): no check of the accept() result is visible before it is passed
// on; confirm a failed accept is handled.
802 tcpAcceptSock = accept(tcpListenSock, (struct sockaddr *)&clientAddr, &socklen);
// The descriptor itself is carried in the thread argument as a cast integer
// and is cast back to int in tcpAccept().
803 pthread_create(&threadTcpAccept, NULL, tcpAccept, (void *)tcpAcceptSock);
// Per-connection TCP handler thread.
// arg carries the accepted socket descriptor as a cast integer.
// Reads a one-byte message type from the connection; the branch shown below
// sends numHosts, numBlocks, the hostArray contents and the blockOwnerArray
// contents to the peer, in that order. Other types are logged as unrecognized.
// The connection is closed before the handler finishes.
// NOTE(review): which message type selects the send branch, and the return
// value of the thread, are on lines not visible here.
807 void *tcpAccept(void *arg)
809 int tcpAcceptSock = (int)arg;
814 fprintf(logfile, "tcpAccept(): accepted tcp connection, file descriptor: %d\n", tcpAcceptSock);
818 bytesReceived = recv(tcpAcceptSock, &msgType, sizeof(char), 0);
819 if (bytesReceived == -1)
821 perror("tcpAccept():recv()");
823 else if (bytesReceived == 0)
// NOTE(review): tcpAcceptSock is passed here but the format string has no
// conversion for it, so the argument is ignored.
826 fprintf(logfile, "tcpAccept(): bytesReceived = 0\n", tcpAcceptSock);
// The values are sent as raw in-memory bytes, so both ends must agree on the
// layout of unsigned int and struct hostData.
835 if (send(tcpAcceptSock, &numHosts, sizeof(numHosts), 0) == -1)
837 perror("tcpAccept():send()");
840 if (send(tcpAcceptSock, &numBlocks, sizeof(numBlocks), 0) == -1)
842 perror("tcpAccept():send()");
845 if (send(tcpAcceptSock, hostArray, numHosts*sizeof(struct hostData),
848 perror("tcpAccept():send()");
851 if (send(tcpAcceptSock, blockOwnerArray, numBlocks*sizeof(unsigned int),
854 perror("tcpAccept():send()");
860 fprintf(logfile, "tcpAccept(): unrecognized msg type\n");
866 if (close(tcpAcceptSock) == -1)
868 perror("tcpAccept():close()");
872 fprintf(logfile, "tcpAccept(): closed tcp connection, file descriptor: %d\n",
// Returns the ipAddr of the host that owns key: hash(key) selects a block,
// blockOwnerArray maps that block to an index into hostArray.
// NOTE(review): dhtInit() sets numBlocks to 0 and blockOwnerArray to NULL, so
// this must not be called before the tables are populated; confirm callers.
880 unsigned int getKeyOwner(unsigned int key)
882 return hostArray[blockOwnerArray[hash(key)]].ipAddr;
// Returns the IPv4 address of the interface named DEFAULT_INTERFACE, obtained
// with the SIOCGIFADDR ioctl on a temporary socket.
// The result is passed through ntohl(), so it is in host byte order.
// NOTE(review): the values returned on the socket() and ioctl() failure paths,
// and any close() of sock, are on lines not visible here; confirm.
885 unsigned int getMyIpAddr()
888 struct ifreq interfaceInfo;
// myAddr aliases the address field inside interfaceInfo, so the ioctl result
// is read back through it.
889 struct sockaddr_in *myAddr = (struct sockaddr_in *)&interfaceInfo.ifr_addr;
891 memset(&interfaceInfo, 0, sizeof(struct ifreq));
893 if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
895 perror("getMyIpAddr():socket()");
899 strcpy(interfaceInfo.ifr_name, DEFAULT_INTERFACE);
900 myAddr->sin_family = AF_INET;
902 if(ioctl(sock, SIOCGIFADDR, &interfaceInfo) != 0)
904 perror("getMyIpAddr():ioctl()");
908 return ntohl(myAddr->sin_addr.s_addr);
// Broadcasts a FIND_LEADER_CMD on UDP_PORT and classifies the outcome: no
// reply (-1 from the broadcast helper), a FIND_LEADER_RES reply, or anything
// else, which is logged as unexpected.
// NOTE(review): the return statements are not visible here; initRebuild()
// treats a result of 0 as no response, and the responder address in reply_ip
// is presumably what is returned on success. Confirm.
911 unsigned int findLeader()
913 unsigned int reply_ip;
919 fprintf(logfile, "findLeader(): broadcasting...\n");
923 myMessage = FIND_LEADER_CMD;
925 bytesReceived = udpBroadcastWaitForResponse(&reply_ip, UDP_PORT,
926 (void *)&myMessage, sizeof(myMessage), (void *)&response,
927 sizeof(response), TIMEOUT_MS, MAX_RETRIES);
929 if (bytesReceived == -1)
932 fprintf(logfile, "findLeader(): no response\n");
937 else if (response == FIND_LEADER_RES)
940 struct in_addr reply_addr;
// reply_ip is converted with htonl() only to print it with inet_ntoa().
941 reply_addr.s_addr = htonl(reply_ip);
942 fprintf(logfile, "findLeader(): leader found:%s\n",
943 inet_ntoa(reply_addr));
951 fprintf(logfile, "findLeader(): unexpected response\n");
// Fetches the DHT tables from the leader over TCP (logs with the prefix
// getDHTdata()): connects to leader on TCP_PORT, sends a one-byte request,
// then receives numHosts, numBlocks, the hostArray contents and the
// blockOwnerArray contents, in that order, and logs the result.
// NOTE(review): each field is read with a single recv() and a short count is
// treated as an error; on a stream socket recv() may legitimately return
// fewer bytes than requested, so confirm whether a read loop is needed.
// NOTE(review): numHosts and numBlocks come from the peer and size the
// calloc() calls and later recv() lengths without any visible validation or
// check of the calloc() results.
960 struct sockaddr_in leader_addr;
965 if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1)
967 perror("getDHTdata():socket()");
971 bzero((char *)&leader_addr, sizeof(leader_addr));
972 leader_addr.sin_family = AF_INET;
973 leader_addr.sin_port = htons(TCP_PORT);
974 leader_addr.sin_addr.s_addr = htonl(leader);
976 if (connect(sock, (struct sockaddr *)&leader_addr, sizeof(leader_addr)) == -1)
978 perror("getDHTdata():connect()");
983 if (send(sock, &msg, sizeof(char), 0) == -1)
985 perror("getDHTdata():send()");
989 bytesReceived = recv(sock, &numHosts, sizeof(numHosts), 0);
990 if (bytesReceived == -1)
992 perror("getDHTdata():recv()");
996 if (bytesReceived != sizeof(numHosts))
999 fprintf(logfile,"getDHTdata(): ERROR: numHosts not completely received\n");
1005 bytesReceived = recv(sock, &numBlocks, sizeof(numBlocks), 0);
1006 if (bytesReceived == -1)
1008 perror("getDHTdata():recv()");
1012 if (bytesReceived != sizeof(numBlocks))
1015 fprintf(logfile,"getDHTdata(): ERROR: numBlocks not completely received\n");
// Any previous host table is replaced by a freshly allocated one.
// NOTE(review): the statement guarded by this NULL check is not visible;
// presumably free(hostArray), as for blockOwnerArray below. Confirm.
1021 if (hostArray != NULL)
1023 hostArray = calloc(numHosts, sizeof(struct hostData));
1024 bytesReceived = recv(sock, hostArray, numHosts*sizeof(struct hostData), 0);
1025 if (bytesReceived == -1)
1027 perror("getDHTdata():recv()");
1031 if (bytesReceived != numHosts*sizeof(struct hostData))
1034 fprintf(logfile,"getDHTdata(): ERROR: hostArray not completely received\n");
// Same replacement for the block ownership table.
1040 if (blockOwnerArray != NULL)
1041 free(blockOwnerArray);
1042 blockOwnerArray = calloc(numBlocks, sizeof(unsigned int));
1043 bytesReceived = recv(sock, blockOwnerArray, numBlocks*sizeof(unsigned int), 0);
1044 if (bytesReceived == -1)
1046 perror("getDHTdata():recv()");
1050 if (bytesReceived != numBlocks*sizeof(unsigned int))
1053 fprintf(logfile,"getDHTdata(): ERROR: blockOwnerArray not completely received\n");
1060 fprintf(logfile,"getDHTdata(): got data:\n");
1061 writeDHTdata(logfile);
// Maps a key to a block index by reducing it modulo the global numBlocks.
// NOTE(review): dhtInit() sets numBlocks to 0; calling this before the block
// table is built would divide by zero. Confirm callers.
1067 unsigned int hash(unsigned int x)
1069 return x % numBlocks;
// Summary of the visible flow (logs with the prefix initRebuild()): when no
// leader is known, or on a retry, the leader is rediscovered with
// findLeader(); if nobody answers, this host names itself leader and builds a
// one-host table with INIT_BLOCK_NUM blocks, all owned by host index 0. A
// rebuild request is then sent to the leader over UDP and the one-value reply
// is classified.
1072 //This function will not return until it succeeds in submitting
1073 // a rebuild request to the leader. It is then the leader's responsibility
1074 // to ensure that the rebuild is carried out
1090 if (retry_count > 0)
1092 fprintf(logfile,"initRebuild(): retry count:%d\n", retry_count);
1097 if (leader == 0 || retry_count > 0)
1099 leader = findLeader(); //broadcast
1100 if (leader == 0) //no response
1102 //TODO:elect leader: this will do for now
1103 leader = getMyIpAddr();
// NOTE(review): the line that sets numHosts before this allocation is not
// visible; hostArray[0] is written next, so numHosts must be at least 1 here.
// Confirm, and note that the calloc() results are not checked on these lines.
1106 hostArray = calloc(numHosts, sizeof(struct hostData));
1107 hostArray[0] = myHostData;
1108 numBlocks = INIT_BLOCK_NUM;
1109 blockOwnerArray = calloc(numBlocks, sizeof(unsigned int));
1110 for (i = 0; i < numBlocks; i++)
1111 blockOwnerArray[i] = 0;
1117 bytesReceived = udpSendWaitForResponse(leader, UDP_PORT,
1118 (void *)&msg, sizeof(msg), (void *)&response, sizeof(response),
1119 TIMEOUT_MS, MAX_RETRIES);
1120 if (bytesReceived == -1)
// NOTE(review): the failing call is udpSendWaitForResponse(), not recv(), and
// errno may not describe the failure (for example after a timeout).
1122 perror("initRebuild():recv()");
1124 else if (bytesReceived != sizeof(response))
1127 fprintf(logfile,"initRebuild(): ERROR: response not completely received\n");
1131 else if (response == NOT_LEADER)
1134 struct in_addr address;
1135 address.s_addr = htonl(leader);
1136 fprintf(logfile,"initRebuild(): ERROR: %s no longer leader\n",
1137 inet_ntoa(address));
1141 else if (response != REBUILD_RES)
1144 fprintf(logfile,"initRebuild(): ERROR: unexpected response\n");
1151 fprintf(logfile,"initRebuild(): submitted rebuild request\n");
1152 writeDHTdata(logfile);
1166 void followRebuild()
// Writes a readable dump of the DHT tables to outfile: the two counts, then
// one line per host (index, dotted address, maxKeyCapacity), then one line per
// block (index, owning host index).
// NOTE(review): numHosts, numBlocks and the table values are unsigned int but
// are printed with %d; large values would show as negative.
1171 void writeDHTdata(FILE *outfile)
1174 struct in_addr address;
1175 fprintf(outfile,"numHosts=%d,numBlocks=%d\n", numHosts, numBlocks);
1176 fprintf(outfile,"hostArray: index: ipAddr, maxKeyCapacity\n");
1177 for (i = 0; i < numHosts; i++)
// ipAddr is converted with htonl() before printing, i.e. it is kept in host
// byte order in the table.
1179 address.s_addr = htonl(hostArray[i].ipAddr);
1180 fprintf(outfile,"%d: %s, %d\n", i, inet_ntoa(address),
1181 hostArray[i].maxKeyCapacity);
1183 fprintf(outfile,"blockOwnerArray: index: blockOwner\n");
1184 for (i = 0; i < numBlocks; i++)
1186 fprintf(outfile,"%d: %d\n", i, blockOwnerArray[i]);