8 #define OIDS_PER_HOST 0x40000000
10 //set these to your IP addresses
11 unsigned int hosts[NUM_HOSTS] = {
19 void dhtInit(unsigned int maxKeyCapaciy)
26 //does nothing, returns 0
27 int dhtInsert(unsigned int key, unsigned int val)
30 //does nothing, returns 0
31 int dhtRemove(unsigned int key)
34 //returns 0 if successful and copies val into *val,
35 // 1 if key not found, -1 if an error occurred
36 int dhtSearch(unsigned int key, unsigned int *val)
38 *val = hosts[key / OIDS_PER_HOST];
44 /*******************************************************************************
46 *******************************************************************************/
48 #include <netinet/in.h>
49 #include <arpa/inet.h>
50 #include <sys/types.h>
51 #include <sys/socket.h>
52 #include <sys/ioctl.h>
63 #include <linux/sockios.h>
65 #include "clookup.h" //this works for now, do we need anything better?
67 /*******************************************************************************
68 * Local Defines, Structs
69 *******************************************************************************/
71 #define BUFFER_SIZE 512 //maximum message size
74 #define BACKLOG 10 //max pending tcp connections
75 #define TIMEOUT_MS 500
77 #define INIT_HOST_ALLOC 1
78 #define INIT_BLOCK_NUM 1
79 #define DEFAULT_INTERFACE "eth0"
80 #define DHT_LOG "dht.log"
82 //make sure this is consistent with enum below
83 #define NUM_MSG_TYPES 20
85 //make sure this matches msg_types global var
133 unsigned int maxKeyCapacity;
137 unsigned int msgType:8;
138 unsigned int unused:24;
144 unsigned int msgType:8;
145 unsigned int unused:24;
150 unsigned int msgType:8;
151 unsigned int unused:24;
156 unsigned int msgType:8;
157 unsigned int unused:24;
162 unsigned int msgType:8;
163 unsigned int unused:24;
168 unsigned int msgType:8;
169 unsigned int unused:24;
175 unsigned int msgType:8;
176 unsigned int unused:24;
177 struct hostData newHostData;
180 /*******************************************************************************
182 *******************************************************************************/
184 //make sure this matches enumeration above
185 const char *msg_types[NUM_MSG_TYPES] =
210 //ip address of leader
213 struct hostData myHostData;
214 //number of hosts in the system
215 unsigned int numHosts;
216 //ip address and max key capacity of each host
217 struct hostData *hostArray;
218 //memory allocated for this many items in hostArray
219 unsigned int hostArraySize;
220 //number of keyspace divisions, preferably a power of 2 > numHosts
221 unsigned int numBlocks;
222 //this array has numBlocks elements, each of which contains an index to hostArray
223 // the key owner is found by hashing the key into one of these blocks and using this
224 // array to find the corresponding host in hostArray
225 unsigned int *blockOwnerArray;
226 //used by leader to track which hosts have responded, etc.
227 unsigned int *hostRebuildStates;
229 pthread_t threadUdpListen;
230 pthread_t threadTcpListen;
232 struct pollfd udpServerPollSock;
234 //see above for enumeration of states
237 /*******************************************************************************
238 * Local Function Prototypes
239 *******************************************************************************/
241 //log funtion, use like printf()
242 void dhtLog(const char *format, ...);
243 //return my IP address
244 unsigned int getMyIpAddr();
245 //sends broadcast to discover leader
246 unsigned int findLeader();
251 //TCP connection handler
252 void *tcpAccept(void *);
253 //returns number of bytes received in resBuffer, or -1 if an error occurred
254 int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port,
255 void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize,
256 unsigned int timeout, unsigned int numRetries);
257 //returns number of bytes received in resBuffer, or -1 if an error occurred
258 int udpBroadcastWaitForResponse(unsigned int *reply_ip,
259 unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer,
260 unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries);
262 int sendNoWait(unsigned int dest_ip, unsigned short dest_port, void *msg,
263 unsigned int msglen);
264 //right now this hashes the key into a block and returns the block owner
265 unsigned int getKeyOwner(unsigned int key);
267 unsigned int hash(unsigned int x);
268 //sends REBUILD_REQ to leader, retries until leader responds, or causes new leader to be chosen
270 //adds entry to end of hostArray, increments numHosts,
271 // allocates more space if necessary
272 void addHost(struct hostData newHost);
273 //initiates TCP connection with leader, gets DHT data
275 //outputs readable DHT data to outfile
276 void writeDHTdata(FILE *outfile);
279 void makeAssignments();
280 //returns not-zero if ok, zero if not ok
281 int msgSizeOk(unsigned char type, unsigned int size);
283 /*******************************************************************************
284 * Global Function Definitions
285 *******************************************************************************/
287 void dhtInit(unsigned int maxKeyCapacity)
289 unsigned int myMessage;
294 logfile = fopen(DHT_LOG, "w");
295 dhtLog("dhtInit() - initializing...\n");
297 myHostData.ipAddr = getMyIpAddr();
298 myHostData.maxKeyCapacity = maxKeyCapacity;
300 numHosts = numBlocks = hostArraySize = 0;
302 blockOwnerArray = NULL;
303 hostRebuildStates = NULL;
305 state = NORMAL_STATE;
307 pthread_create(&threadUdpListen, NULL, udpListen, NULL);
308 pthread_create(&threadTcpListen, NULL, tcpListen, NULL);
317 dhtLog("dhtExit(): cleaning up...\n");
319 pthread_cancel(threadUdpListen);
320 pthread_cancel(threadTcpListen);
321 close(udpServerPollSock.fd);
322 close(tcpListenSock);
326 int dhtInsert(unsigned int key, unsigned int val)
328 unsigned int dest_ip = getKeyOwner(key);
329 struct insertCmd myMessage;
330 struct insertRes response;
333 myMessage.msgType = INSERT_CMD;
337 bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage,
338 sizeof(struct insertCmd), (void *)&response, sizeof(struct insertRes),
339 TIMEOUT_MS, MAX_RETRIES);
340 if (bytesReceived == sizeof(struct insertRes))
342 if (response.msgType == INSERT_RES)
344 if (response.status == INSERT_OK)
346 // if (response.status == NOT_KEY_OWNER)
349 //TODO: find owner and try again, request rebuild if necessary
350 return -1; //this function should be robust enough to always return 0
353 int dhtRemove(unsigned int key)
355 unsigned int dest_ip = getKeyOwner(key);
356 struct removeCmd myMessage;
357 struct removeRes response;
360 myMessage.msgType = REMOVE_CMD;
363 bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage,
364 sizeof(struct removeCmd), (void *)&response, sizeof(struct removeRes),
365 TIMEOUT_MS, MAX_RETRIES);
366 if (bytesReceived == sizeof(struct removeRes))
368 if (response.msgType == REMOVE_RES)
370 if (response.status == REMOVE_OK)
372 // if (response.status == NOT_KEY_OWNER)
375 //TODO: find owner and try again, request rebuild if necessary
376 return -1; //this function should be robust enough to always return 0
379 int dhtSearch(unsigned int key, unsigned int *val)
381 unsigned int dest_ip = getKeyOwner(key);
382 struct searchCmd myMessage;
383 struct searchRes response;
386 myMessage.msgType = SEARCH_CMD;
389 bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage,
390 sizeof(struct searchCmd), (void *)&response, sizeof(struct searchRes),
391 TIMEOUT_MS, MAX_RETRIES);
392 if (bytesReceived == sizeof(struct searchRes))
394 if (response.msgType == SEARCH_RES)
396 if (response.status == KEY_FOUND)
401 if (response.status == KEY_NOT_FOUND)
405 // if (response.status == NOT_KEY_OWNER)
408 //TODO: find owner and try again, request rebuild if necessary
409 return -1; //this function should be robust enough to always return 0 or 1
412 /*******************************************************************************
413 * Local Function Definitions
414 *******************************************************************************/
416 //use UDP for messages that are frequent and short
419 struct sockaddr_in myAddr;
420 struct sockaddr_in clientAddr;
421 struct sockaddr_in bcastAddr;
422 socklen_t socklen = sizeof(struct sockaddr_in);
423 char buffer[BUFFER_SIZE];
424 ssize_t bytesReceived;
425 struct insertCmd *insertCmdPtr;
426 struct removeCmd *removeCmdPtr;
427 struct searchCmd *searchCmdPtr;
428 struct insertRes *insertResPtr;
429 struct removeRes *removeResPtr;
430 struct searchRes *searchResPtr;
431 struct joinReq *joinReqPtr;
432 char replyBuffer[BUFFER_SIZE];
434 struct timeval rebuild1Timeout;
435 int rebuild1TimerSet;
440 chashtable_t *myHashTable = chashCreate(HASH_SIZE, LOADFACTOR);
442 if ((udpServerPollSock.fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
444 perror("udpListen():socket()");
449 if (setsockopt(udpServerPollSock.fd, SOL_SOCKET, SO_BROADCAST, &on,
452 perror("udpBroadcastWaitForResponse():setsockopt()");
456 udpServerPollSock.events = POLLIN;
458 bzero(&myAddr, socklen);
459 myAddr.sin_family = AF_INET;
460 myAddr.sin_addr.s_addr = htonl(INADDR_ANY);
461 myAddr.sin_port = htons(UDP_PORT);
463 bzero(&bcastAddr, socklen);
464 bcastAddr.sin_family = AF_INET;
465 bcastAddr.sin_addr.s_addr = htonl(0xFFFFFFFF);
466 bcastAddr.sin_port = htons(UDP_PORT);
468 if (bind(udpServerPollSock.fd, (struct sockaddr *)&myAddr, socklen) == -1)
470 perror("udpListen():bind()");
473 dhtLog("udpListen(): listening on port %d\n", UDP_PORT);
475 rebuild1TimerSet = 0;
478 pollret = poll(&udpServerPollSock, 1, TIMEOUT_MS);
480 { perror("udpListen():poll()"); }
481 else if (pollret > 0)
483 if ((bytesReceived = recvfrom(udpServerPollSock.fd, buffer, BUFFER_SIZE,
484 0, (struct sockaddr *)&clientAddr, &socklen)) == -1)
485 { perror("udpListen():recvfrom()"); }
486 else if (bytesReceived == 0)
488 dhtLog("udpListen(): recvfrom() returned 0\n");
492 dhtLog("udpListen(): received %s from %s\n",
493 (buffer[0] < NUM_MSG_TYPES ? msg_types[buffer[0]] :
494 "unknown message"), inet_ntoa(clientAddr.sin_addr));
495 if (!msgSizeOk(buffer[0], bytesReceived))
497 dhtLog("udpListen(): ERROR: incorrect message size\n");
504 if (state == NORMAL_STATE || state == LEAD_NORMAL_STATE
505 || state == REBUILD3_STATE || state == LEAD_REBUILD3_STATE)
507 insertCmdPtr = (struct insertCmd *)buffer;
508 dhtLog( "udpListen(): Insert: key=%d, val=%d\n",
509 insertCmdPtr->key, insertCmdPtr->val);
510 insertResPtr = (struct insertRes *)replyBuffer;
511 insertResPtr->msgType = INSERT_RES;
512 insertResPtr->unused = 0;
513 if (getKeyOwner(insertCmdPtr->key) == myHostData.ipAddr)
515 //note: casting val to void * in order to conform to API
516 if(chashInsert(myHashTable, insertCmdPtr->key,
517 (void *)insertCmdPtr->val) == 0)
518 insertResPtr->status = INSERT_OK;
520 insertResPtr->status = INSERT_ERROR;
524 insertResPtr->status = NOT_KEY_OWNER;;
526 if (sendto(udpServerPollSock.fd, (void *)insertResPtr,
527 sizeof(struct insertRes), 0, (struct sockaddr *)&clientAddr,
529 { perror("udpListen():sendto()"); }
533 if (state == NORMAL_STATE || state == LEAD_NORMAL_STATE)
535 removeCmdPtr = (struct removeCmd *)buffer;
536 dhtLog("udpListen(): Remove: key=%d\n", removeCmdPtr->key);
537 removeResPtr = (struct removeRes *)replyBuffer;
538 removeResPtr->msgType = REMOVE_RES;
539 removeResPtr->unused = 0;
540 if (getKeyOwner(removeCmdPtr->key) == myHostData.ipAddr)
542 //note: casting val to void * in order to conform to API
543 if(chashRemove(myHashTable, removeCmdPtr->key) == 0)
544 removeResPtr->status = INSERT_OK;
546 removeResPtr->status = INSERT_ERROR;
550 removeResPtr->status = NOT_KEY_OWNER;
552 if (sendto(udpServerPollSock.fd, (void *)removeResPtr,
553 sizeof(struct removeRes), 0, (struct sockaddr *)&clientAddr,
555 { perror("udpListen():sendto()"); }
559 if (state == NORMAL_STATE || state == LEAD_NORMAL_STATE)
561 searchCmdPtr = (struct searchCmd *)buffer;
562 dhtLog("udpListen(): Search: key=%d\n",searchCmdPtr->key);
563 searchResPtr = (struct searchRes *)replyBuffer;
564 searchResPtr->msgType = SEARCH_RES;
565 searchResPtr->unused = 0;
566 if (getKeyOwner(searchCmdPtr->key) == myHostData.ipAddr)
568 //note: casting val to void * in order to conform to API
569 if((searchResPtr->val = (unsigned int)chashSearch(myHashTable,
570 searchCmdPtr->key)) == 0)
571 searchResPtr->status = KEY_NOT_FOUND;
573 searchResPtr->status = KEY_FOUND;
577 searchResPtr->status = NOT_KEY_OWNER;
579 if (sendto(udpServerPollSock.fd, (void *)searchResPtr,
580 sizeof(struct searchRes), 0, (struct sockaddr *)&clientAddr,
582 { perror("udpListen():sendto()"); }
585 case FIND_LEADER_REQ:
586 if (state == LEAD_NORMAL_STATE || state == LEAD_REBUILD1_STATE
587 || state == LEAD_REBUILD2_STATE || state == LEAD_REBUILD3_STATE)
589 replyBuffer[0] = FIND_LEADER_RES;
590 if(sendto(udpServerPollSock.fd, (void *)replyBuffer,
591 sizeof(char), 0,(struct sockaddr *)&clientAddr, socklen) == -1)
592 { perror("udpListen():sendto()"); }
596 if (state == LEAD_NORMAL_STATE || state == LEAD_REBUILD1_STATE
597 || state == LEAD_REBUILD2_STATE || state == LEAD_REBUILD3_STATE)
599 replyBuffer[0] = REBUILD_RES;
600 if (sendto(udpServerPollSock.fd, (void *)replyBuffer,
601 sizeof(char), 0, (struct sockaddr *)&clientAddr, socklen) == -1)
602 { perror("udpListen():sendto()"); }
603 if (gettimeofday(&rebuild1Timeout, NULL) < 0)
604 { perror("dhtLog():gettimeofday()"); }
605 //TODO: make this a configurable parameter
606 rebuild1Timeout.tv_sec += 3;
607 rebuild1TimerSet = 1;
608 //clear out previous host data
610 hostArray[0] = myHostData;
612 state = LEAD_REBUILD1_STATE;
614 replyBuffer[0] = REBUILD_CMD;
615 if (sendto(udpServerPollSock.fd, (void *)replyBuffer,
616 sizeof(char), 0, (struct sockaddr *)&bcastAddr, socklen) == -1)
617 { perror("udpListen():sendto()"); }
622 replyBuffer[0] = NOT_LEADER;
623 if(sendto(udpServerPollSock.fd, (void *)replyBuffer,
624 sizeof(char), 0,(struct sockaddr *)&clientAddr, socklen) == -1)
625 { perror("udpListen():sendto()"); }
628 if (state != LEAD_REBUILD1_STATE)
630 //consider this an official declaration of authority,
631 // in case I was confused about this
632 leader = htonl(clientAddr.sin_addr.s_addr);
636 joinReqPtr = (struct joinReq *)replyBuffer;
637 joinReqPtr->msgType = JOIN_REQ;
638 joinReqPtr->unused = 0;
639 joinReqPtr->newHostData = myHostData;
640 //note: I'm reusing bytesReceived and buffer
641 bytesReceived = udpSendWaitForResponse(leader, UDP_PORT,
642 (void *)replyBuffer, sizeof(struct joinReq), (void *)buffer,
643 BUFFER_SIZE, TIMEOUT_MS, MAX_RETRIES);
644 if ((bytesReceived == sizeof(char)) && (buffer[0] == JOIN_RES))
645 state = REBUILD1_STATE;
651 if (state == LEAD_REBUILD1_STATE)
653 joinReqPtr = (struct joinReq *)buffer;
654 addHost(joinReqPtr->newHostData);
656 replyBuffer[0] = JOIN_RES;
657 if (sendto(udpServerPollSock.fd, (void *)replyBuffer,
658 sizeof(char), 0,(struct sockaddr *)&clientAddr, socklen) == -1)
659 { perror("udpListen():sendto()"); }
662 case GET_DHT_INFO_CMD:
663 if (state == REBUILD1_STATE)
666 state = REBUILD2_STATE;
670 dhtLog("udpListen(): ERROR: Unknown message type\n");
674 } //end (pollret > 0)
675 else // (pollret == 0), timeout
677 if (gettimeofday(&now, NULL) < 0)
678 { perror("dhtLog():gettimeofday()"); }
679 if (rebuild1TimerSet && timercmp(&now, &rebuild1Timeout, >))
681 rebuild1TimerSet = 0;
682 if (state == LEAD_REBUILD1_STATE)
685 dhtLog("udpListen(): assignments made\n");
686 writeDHTdata(logfile);
687 if (hostRebuildStates != NULL)
688 free(hostRebuildStates);
689 hostRebuildStates = calloc(numHosts, sizeof(unsigned int));
690 for (i = 0; i < numHosts; i++)
691 hostRebuildStates[i] = REBUILD1_STATE;
692 state = LEAD_REBUILD2_STATE;
693 replyBuffer[0] = GET_DHT_INFO_CMD;
694 if (sendto(udpServerPollSock.fd, (void *)replyBuffer,
695 sizeof(char), 0, (struct sockaddr *)&bcastAddr, socklen) == -1)
696 { perror("udpListen():sendto()"); }
703 int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port,
704 void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize,
705 unsigned int timeout, unsigned int numRetries)
707 struct sockaddr_in server_addr;
708 struct sockaddr_in ack_addr;
709 socklen_t socklen = sizeof(struct sockaddr_in);
710 struct pollfd pollsock;
713 ssize_t bytesReceived;
715 bzero((char *) &server_addr, sizeof(server_addr));
716 server_addr.sin_family = AF_INET;
717 server_addr.sin_port = htons(dest_port);
718 server_addr.sin_addr.s_addr = htonl(dest_ip);
720 if ((pollsock.fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
722 perror("udpSendWaitForResponse():socket()");
726 pollsock.events = POLLIN;
728 for (i = 0; i < MAX_RETRIES; i++)
731 dhtLog("udpSendWaitForResponse(): trying again, count: %d\n", i+1);
732 if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr,
735 perror("udpSendWaitForResponse():sendto");
738 dhtLog("udpSendWaitForResponse(): message sent\n");
739 retval = poll(&pollsock, 1, timeout);
742 perror("udpSendWaitForResponse():poll()");
746 bytesReceived = recvfrom(pollsock.fd, resBuffer, resBufferSize, 0,
747 (struct sockaddr *)&ack_addr, &socklen);
748 if ((ack_addr.sin_addr.s_addr == server_addr.sin_addr.s_addr)
749 && (ack_addr.sin_port == server_addr.sin_port))
752 dhtLog("udpSendWaitForResponse(): received response\n");
753 return bytesReceived;
758 printf("udpSendWaitForResponse(): timed out, no ack\n");
762 int udpBroadcastWaitForResponse(unsigned int *reply_ip,
763 unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer,
764 unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries)
766 struct sockaddr_in server_addr;
767 struct sockaddr_in ack_addr;
768 socklen_t socklen = sizeof(struct sockaddr_in);
769 struct pollfd pollsock;
772 ssize_t bytesReceived;
775 bzero((char *) &server_addr, sizeof(server_addr));
776 server_addr.sin_family = AF_INET;
777 server_addr.sin_port = htons(dest_port);
778 server_addr.sin_addr.s_addr = htonl(0xFFFFFFFF);
780 if ((pollsock.fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
782 perror("udpBroadcastWaitForResponse():socket()");
787 if (setsockopt(pollsock.fd, SOL_SOCKET, SO_BROADCAST, &on, sizeof(on)) == -1)
789 perror("udpBroadcastWaitForResponse():setsockopt()");
793 pollsock.events = POLLIN;
795 for (i = 0; i < MAX_RETRIES; i++)
798 dhtLog("udpBroadcastWaitForResponse(): trying again, count: %d\n", i+1);
799 if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr,
802 perror("udpBroadcastWaitForResponse():sendto()");
805 dhtLog("udpBroadcastWaitForResponse(): message sent\n");
806 retval = poll(&pollsock, 1, timeout);
809 bytesReceived = recvfrom(pollsock.fd, resBuffer, resBufferSize, 0,
810 (struct sockaddr *)&ack_addr, &socklen);
812 *reply_ip = htonl(ack_addr.sin_addr.s_addr);
813 dhtLog("udpBroadcastWaitForResponse(): received response\n");
814 return bytesReceived;
818 dhtLog("udpBroadcastWaitForResponse(): timed out, no ack\n");
822 // use TCP for potentially large and/or important data transfer
825 struct sockaddr_in myAddr;
826 struct sockaddr_in clientAddr;
828 socklen_t socklen = sizeof(struct sockaddr_in);
829 pthread_t threadTcpAccept;
831 tcpListenSock = socket(AF_INET, SOCK_STREAM, 0);
832 if (tcpListenSock == -1)
834 perror("tcpListen():socket()");
838 myAddr.sin_family = AF_INET;
839 myAddr.sin_port = htons(TCP_PORT);
840 myAddr.sin_addr.s_addr = INADDR_ANY;
841 memset(&(myAddr.sin_zero), '\0', 8);
843 if (bind(tcpListenSock, (struct sockaddr *)&myAddr, socklen) == -1)
845 perror("tcpListen():socket()");
849 if (listen(tcpListenSock, BACKLOG) == -1)
851 perror("tcpListen():listen()");
855 dhtLog("tcpListen(): listening on port %d\n", TCP_PORT);
859 tcpAcceptSock = accept(tcpListenSock, (struct sockaddr *)&clientAddr,
861 pthread_create(&threadTcpAccept, NULL, tcpAccept, (void *)tcpAcceptSock);
865 void *tcpAccept(void *arg)
867 int tcpAcceptSock = (int)arg;
871 dhtLog("tcpAccept(): accepted tcp connection, file descriptor: %d\n",
874 bytesReceived = recv(tcpAcceptSock, &msgType, sizeof(char), 0);
875 if (bytesReceived == -1)
876 { perror("tcpAccept():recv()"); }
877 else if (bytesReceived == 0)
879 dhtLog( "tcpAccept(): bytesReceived = 0\n", tcpAcceptSock);
886 if (send(tcpAcceptSock, &numHosts, sizeof(numHosts), 0) == -1)
888 perror("tcpAccept():send()");
891 if (send(tcpAcceptSock, &numBlocks, sizeof(numBlocks), 0) == -1)
893 perror("tcpAccept():send()");
896 if (send(tcpAcceptSock, hostArray, numHosts*sizeof(struct hostData),
899 perror("tcpAccept():send()");
902 if (send(tcpAcceptSock, blockOwnerArray, numBlocks*sizeof(unsigned int),
905 perror("tcpAccept():send()");
910 dhtLog("tcpAccept(): unrecognized msg type\n");
914 if (close(tcpAcceptSock) == -1)
915 { perror("tcpAccept():close()"); }
917 dhtLog("tcpAccept(): closed tcp connection, file descriptor: %d\n",
923 unsigned int getKeyOwner(unsigned int key)
925 if (state == NORMAL_STATE || state == LEAD_NORMAL_STATE
926 || state == REBUILD3_STATE || state == LEAD_REBUILD3_STATE)
928 return hostArray[blockOwnerArray[hash(key)]].ipAddr;
931 { //TODO: figure out what is best to do here. Would like calls to dhtSearch,
932 // etc. to block rather than fail during rebuilds
937 unsigned int getMyIpAddr()
940 struct ifreq interfaceInfo;
941 struct sockaddr_in *myAddr = (struct sockaddr_in *)&interfaceInfo.ifr_addr;
943 memset(&interfaceInfo, 0, sizeof(struct ifreq));
945 if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
947 perror("getMyIpAddr():socket()");
951 strcpy(interfaceInfo.ifr_name, DEFAULT_INTERFACE);
952 myAddr->sin_family = AF_INET;
954 if(ioctl(sock, SIOCGIFADDR, &interfaceInfo) != 0)
956 perror("getMyIpAddr():ioctl()");
960 return ntohl(myAddr->sin_addr.s_addr);
963 unsigned int findLeader()
965 unsigned int reply_ip;
970 dhtLog("findLeader(): broadcasting...\n");
972 myMessage = FIND_LEADER_REQ;
974 bytesReceived = udpBroadcastWaitForResponse(&reply_ip, UDP_PORT,
975 (void *)&myMessage, sizeof(myMessage), (void *)&response,
976 sizeof(response), TIMEOUT_MS, MAX_RETRIES);
978 if (bytesReceived == -1)
980 dhtLog("findLeader(): no response\n");
983 else if (response == FIND_LEADER_RES)
985 struct in_addr reply_addr;
986 reply_addr.s_addr = htonl(reply_ip);
987 dhtLog("findLeader(): leader found:%s\n",
988 inet_ntoa(reply_addr));
993 dhtLog("findLeader(): unexpected response\n");
1000 struct sockaddr_in leader_addr;
1007 if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1)
1009 perror("getDHTdata():socket()");
1013 bzero((char *)&leader_addr, sizeof(leader_addr));
1014 leader_addr.sin_family = AF_INET;
1015 leader_addr.sin_port = htons(TCP_PORT);
1016 leader_addr.sin_addr.s_addr = htonl(leader);
1018 if (connect(sock, (struct sockaddr *)&leader_addr, sizeof(leader_addr)) == -1)
1020 perror("getDHTdata():connect()");
1025 if (send(sock, &msg, sizeof(char), 0) == -1)
1027 perror("getDHTdata():send()");
1031 bytesReceived = recv(sock, &numHosts, sizeof(numHosts), 0);
1032 if (bytesReceived == -1)
1034 perror("getDHTdata():recv()");
1038 if (bytesReceived != sizeof(numHosts))
1040 dhtLog("getDHTdata(): ERROR: numHosts not completely received\n");
1044 bytesReceived = recv(sock, &numBlocks, sizeof(numBlocks), 0);
1045 if (bytesReceived == -1)
1047 perror("getDHTdata():recv()");
1051 if (bytesReceived != sizeof(numBlocks))
1053 dhtLog("getDHTdata(): ERROR: numBlocks not completely received\n");
1057 hostArray = calloc(numHosts, sizeof(struct hostData));
1058 bytesReceived = recv(sock, hostArray, numHosts*sizeof(struct hostData), 0);
1059 if (bytesReceived == -1)
1061 perror("getDHTdata():recv()");
1065 if (bytesReceived != numHosts*sizeof(struct hostData))
1067 dhtLog("getDHTdata(): ERROR: hostArray not completely received\n");
1071 blockOwnerArray = calloc(numBlocks, sizeof(unsigned int));
1072 bytesReceived = recv(sock,blockOwnerArray,numBlocks*sizeof(unsigned int),0);
1073 if (bytesReceived == -1)
1075 perror("getDHTdata():recv()");
1079 if (bytesReceived != numBlocks*sizeof(unsigned int))
1081 dhtLog("getDHTdata(): ERROR: blockOwnerArray not completely received\n");
1085 dhtLog("getDHTdata(): got data:\n");
1086 writeDHTdata(logfile);
1091 unsigned int hash(unsigned int x)
1093 //this shouldn't be called when numBlocks = 0, so if you get a divide-by-zero,
1094 // make sure we are in a proper state for key owner lookups
1095 return x % numBlocks;
1098 //This function will not return until it succeeds in submitting
1099 // a rebuild request to the leader. It is then the leader's responibility
1100 // to ensure that the rebuild is caried out
1115 if (retry_count > 0)
1117 dhtLog("initRebuild(): retry count:%d\n", retry_count);
1120 if (leader == 0 || retry_count > 0)
1122 leader = findLeader(); //broadcast
1123 if (leader == 0) //no response
1125 //TODO:elect leader: this will do for now
1127 leader = getMyIpAddr();
1128 state = LEAD_NORMAL_STATE;
1134 bytesReceived = udpSendWaitForResponse(leader, UDP_PORT,
1135 (void *)&msg, sizeof(msg), (void *)&response, sizeof(response),
1136 TIMEOUT_MS, MAX_RETRIES);
1137 if (bytesReceived == -1)
1138 { perror("initRebuild():recv()"); }
1139 else if (bytesReceived != sizeof(response))
1141 dhtLog("initRebuild(): ERROR: response not completely received\n");
1143 else if (response == NOT_LEADER)
1145 struct in_addr address;
1146 address.s_addr = htonl(leader);
1147 dhtLog("initRebuild(): ERROR: %s no longer leader\n",
1148 inet_ntoa(address));
1150 else if (response != REBUILD_RES)
1152 dhtLog("initRebuild(): ERROR: unexpected response\n");
1156 dhtLog("initRebuild(): submitted rebuild request\n");
1157 writeDHTdata(logfile);
1164 void writeDHTdata(FILE *outfile)
1167 struct in_addr address;
1169 fprintf(outfile,"numHosts=%d,numBlocks=%d\n", numHosts, numBlocks);
1170 fprintf(outfile,"hostArray: index: ipAddr, maxKeyCapacity\n");
1171 for (i = 0; i < numHosts; i++)
1173 address.s_addr = htonl(hostArray[i].ipAddr);
1174 fprintf(outfile,"%d: %s, %d\n", i, inet_ntoa(address),
1175 hostArray[i].maxKeyCapacity);
1177 fprintf(outfile,"blockOwnerArray: index: blockOwner\n");
1178 for (i = 0; i < numBlocks; i++)
1179 fprintf(outfile,"%d: %d ", i, blockOwnerArray[i]);
1180 fprintf(outfile,"\n");
1185 if (hostArray != NULL)
1190 if (blockOwnerArray != NULL)
1192 free(blockOwnerArray);
1193 blockOwnerArray = NULL;
1195 numHosts = numBlocks = hostArraySize = 0;
1204 hostArraySize = INIT_HOST_ALLOC;
1205 hostArray = calloc(hostArraySize, sizeof(struct hostData));
1207 hostArray[0] = myHostData;
1208 numBlocks = INIT_BLOCK_NUM;
1209 blockOwnerArray = calloc(numBlocks, sizeof(unsigned int));
1210 for (i = 0; i < numBlocks; i++)
1211 blockOwnerArray[i] = 0;
1216 void addHost(struct hostData newHost)
1218 struct hostData *newArray;
1219 unsigned int newArraySize;
1221 if (hostArray == NULL || blockOwnerArray == NULL || hostArraySize == 0)
1224 if (numHosts == hostArraySize)
1226 newArraySize = hostArraySize * 2;
1227 newArray = calloc(newArraySize, sizeof(struct hostData));
1228 memcpy(newArray, hostArray, (hostArraySize * sizeof(struct hostData)));
1230 hostArray = newArray;
1231 hostArraySize = newArraySize;
1234 hostArray[numHosts] = newHost;
1240 void makeAssignments()
1244 if (hostArray == NULL || blockOwnerArray == NULL || hostArraySize == 0)
1247 if (numBlocks < numHosts)
1249 free(blockOwnerArray);
1250 while (numBlocks < numHosts)
1252 blockOwnerArray = calloc(numBlocks, sizeof(unsigned int));
1255 for (i = 0; i < numBlocks; i++)
1256 blockOwnerArray[i] = i % numHosts;
1261 //returns not-zero if ok, zero if not ok
1262 int msgSizeOk(unsigned char type, unsigned int size)
1269 status = (size == sizeof(struct insertCmd));
1272 status = (size == sizeof(struct insertRes));
1275 status = (size == sizeof(struct removeCmd));
1278 status = (size == sizeof(struct removeRes));
1281 status = (size == sizeof(struct searchCmd));
1284 status = (size == sizeof(struct searchRes));
1286 case FIND_LEADER_REQ:
1287 status = (size == sizeof(char));
1289 case FIND_LEADER_RES:
1290 status = (size == sizeof(char));
1293 status = (size == sizeof(char));
1296 status = (size == sizeof(char));
1299 status = (size == sizeof(char));
1302 status = (size == sizeof(char));
1305 status = (size == sizeof(struct joinReq));
1308 status = (size == sizeof(char));
1310 case GET_DHT_INFO_CMD:
1311 status = (size == sizeof(char));
1314 status = (size == sizeof(char));
1317 status = (size == sizeof(char));
1320 status = (size == sizeof(char));
1323 status = (size == sizeof(char));
1325 case REBUILD_DONE_INFO:
1326 status = (size == sizeof(char));
1335 void dhtLog(const char *format, ...)
1340 if (gettimeofday(&now, NULL) < 0)
1341 { perror("dhtLog():gettimeofday()"); }
1342 va_start(args, format);
1343 if (fprintf(logfile, "%d.%06d:", now.tv_sec, now.tv_usec) < 0)
1344 { perror("dhtLog():fprintf()"); }
1345 if (vfprintf(logfile, format, args) < 0)
1346 { perror("dhtLog():vfprintf()"); }
1347 if (fflush(logfile) == EOF)
1348 { perror("dhtLog():fflush()"); }