8 #define OIDS_PER_HOST 0x40000000
10 //set these to your IP addresses
11 unsigned int hosts[NUM_HOSTS] = {
19 void dhtInit(unsigned int maxKeyCapaciy)
26 //does nothing, returns 0
27 int dhtInsert(unsigned int key, unsigned int val)
30 //does nothing, returns 0
31 int dhtRemove(unsigned int key)
34 //returns 0 if successful and copies val into *val,
35 // 1 if key not found, -1 if an error occurred
36 int dhtSearch(unsigned int key, unsigned int *val)
38 *val = hosts[key / OIDS_PER_HOST];
44 #include <netinet/in.h>
45 #include <arpa/inet.h>
46 #include <sys/types.h>
47 #include <sys/socket.h>
48 #include <sys/ioctl.h>
58 #include <linux/sockios.h>
59 #include "clookup.h" //this works for now, do we need anything better?
61 #define BUFFER_SIZE 512 //maximum message size
64 #define BACKLOG 10 //max pending tcp connections
65 #define TIMEOUT_MS 500
67 #define INIT_HOST_ALLOC 16
68 #define INIT_BLOCK_NUM 64
69 #define DEFAULT_INTERFACE "eth0"
70 #define DHT_LOG "dht.log"
107 unsigned int maxKeyCapacity;
111 unsigned int msgType:8;
112 unsigned int unused:24;
118 unsigned int msgType:8;
119 unsigned int unused:24;
124 unsigned int msgType:8;
125 unsigned int unused:24;
130 unsigned int msgType:8;
131 unsigned int unused:24;
136 unsigned int msgType:8;
137 unsigned int unused:24;
142 unsigned int msgType:8;
143 unsigned int unused:24;
149 //TODO: leave message, rebuild message...
152 unsigned int leader; //ip address of leader
153 struct hostData myHostData;
155 unsigned int numHosts;
156 struct hostData *hostArray;
157 unsigned int numBlocks;
158 unsigned int *blockOwnerArray;
159 /*----end DHT data----*/
161 //return my IP address
162 unsigned int getMyIpAddr();
163 //sends broadcast to discover leader
164 unsigned int getLeadersIpAddr();
169 //TCP connection handler
170 void *tcpAccept(void *);
171 //returns number of bytes received in resBuffer, or -1 if an error occurred
172 int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port,
173 void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize,
174 unsigned int timeout, unsigned int numRetries);
175 //returns number of bytes received in resBuffer, or -1 if an error occurred
176 int udpBroadcastWaitForResponse(unsigned int *reply_ip,
177 unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer,
178 unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries);
180 int sendNoWait(unsigned int dest_ip, unsigned short dest_port, void *msg,
181 unsigned int msglen);
182 //right now this hashes the key into a block and returns the block owner
183 unsigned int getKeyOwner(unsigned int key);
185 unsigned int hash(unsigned int x);
186 //initiates TCP connection with leader, gets DHT data
188 //outputs readable DHT data to outfile
189 void writeDHTdata(FILE *outfile);
191 void dhtInit(unsigned int maxKeyCapacity)
193 unsigned int myMessage;
199 logfile = fopen(DHT_LOG, "w");
202 myHostData.ipAddr = getMyIpAddr();
203 myHostData.maxKeyCapacity = maxKeyCapacity;
205 numHosts = numBlocks = 0;
207 blockOwnerArray = NULL;
209 leader = getLeadersIpAddr();
212 { //no response: I am the first
213 leader = getMyIpAddr();
216 hostArray = calloc(numHosts, sizeof(struct hostData));
217 hostArray[0] = myHostData;
218 numBlocks = INIT_BLOCK_NUM;
219 blockOwnerArray = calloc(numBlocks, sizeof(unsigned int));
220 for (i = 0; i < numBlocks; i++)
221 blockOwnerArray[i] = 0;
225 //get DHT data from leader
228 //TODO: actually, just initiate a rebuild here instead
232 pthread_t threadUdpListen, threadTcpListen;
233 pthread_create(&threadUdpListen, NULL, udpListen, NULL);
234 pthread_create(&threadTcpListen, NULL, tcpListen, NULL);
244 int dhtInsert(unsigned int key, unsigned int val)
246 unsigned int dest_ip = getKeyOwner(key);
247 struct insertCmd myMessage;
248 struct insertRes response;
251 myMessage.msgType = INSERT_CMD;
255 bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage,
256 sizeof(struct insertCmd), (void *)&response, sizeof(struct insertRes),
257 TIMEOUT_MS, MAX_RETRIES);
258 if (bytesReceived == sizeof(struct insertRes))
260 if (response.msgType == INSERT_RES)
262 if (response.status == INSERT_OK)
264 // if (response.status == NOT_KEY_OWNER)
267 //TODO: find owner and try again, request rebuild if necessary
268 return -1; //this function should be robust enough to always return 0
271 int dhtRemove(unsigned int key)
273 unsigned int dest_ip = getKeyOwner(key);
274 struct removeCmd myMessage;
275 struct removeRes response;
278 myMessage.msgType = REMOVE_CMD;
281 bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage,
282 sizeof(struct removeCmd), (void *)&response, sizeof(struct removeRes),
283 TIMEOUT_MS, MAX_RETRIES);
284 if (bytesReceived == sizeof(struct removeRes))
286 if (response.msgType == REMOVE_RES)
288 if (response.status == REMOVE_OK)
290 // if (response.status == NOT_KEY_OWNER)
293 //TODO: find owner and try again, request rebuild if necessary
294 return -1; //this function should be robust enough to always return 0
297 int dhtSearch(unsigned int key, unsigned int *val)
299 unsigned int dest_ip = getKeyOwner(key);
300 struct searchCmd myMessage;
301 struct searchRes response;
304 myMessage.msgType = SEARCH_CMD;
307 bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage,
308 sizeof(struct searchCmd), (void *)&response, sizeof(struct searchRes),
309 TIMEOUT_MS, MAX_RETRIES);
310 if (bytesReceived == sizeof(struct searchRes))
312 if (response.msgType == SEARCH_RES)
314 if (response.status == KEY_FOUND)
319 if (response.status == KEY_NOT_FOUND)
323 // if (response.status == NOT_KEY_OWNER)
326 //TODO: find owner and try again, request rebuild if necessary
327 return -1; //this function should be robust enough to always return 0 or 1
332 //use UDP for messages that are frequent and short
335 struct sockaddr_in myAddr;
336 struct sockaddr_in clientAddr;
338 socklen_t socklen = sizeof(struct sockaddr_in);
339 char buffer[BUFFER_SIZE];
340 ssize_t bytesReceived;
341 struct insertCmd *insertCmdPtr;
342 struct removeCmd *removeCmdPtr;
343 struct searchCmd *searchCmdPtr;
344 struct insertRes *insertResPtr;
345 struct removeRes *removeResPtr;
346 struct searchRes *searchResPtr;
347 char replyBuffer[BUFFER_SIZE];
350 chashtable_t *myHashTable = chashCreate(HASH_SIZE, LOADFACTOR);
352 if ((sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
354 perror("udpListen():socket()");
358 bzero(&myAddr, socklen);
359 myAddr.sin_family=AF_INET;
360 myAddr.sin_addr.s_addr=INADDR_ANY;
361 myAddr.sin_port=htons(UDP_PORT);
363 if (bind(sock, (struct sockaddr *)&myAddr, socklen) == -1)
365 perror("udpListen():bind()");
369 fprintf(logfile,"udpListen(): listening on port %d\n", UDP_PORT);
374 if ((bytesReceived = recvfrom(sock, buffer, BUFFER_SIZE, 0,
375 (struct sockaddr *)&clientAddr, &socklen)) == -1)
377 perror("udpListen():recvfrom()");
380 if (bytesReceived == 0)
383 fprintf(logfile,"udpListen(): recvfrom() returned 0\n");
388 gettimeofday(&now, NULL);
390 fprintf(logfile,"udpListen(): message received:%ds,%dus\n", now.tv_sec,
392 fprintf(logfile,"udpListen(): received %d bytes from %s:%d\n",
393 bytesReceived,inet_ntoa(clientAddr.sin_addr),htons(clientAddr.sin_port));
400 if (bytesReceived != sizeof(struct insertCmd))
403 fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
408 insertCmdPtr = (struct insertCmd *)buffer;
410 fprintf(logfile, "udpListen(): Insert: key=%d, val=%d\n",
411 insertCmdPtr->key, insertCmdPtr->val);
414 insertResPtr = (struct insertRes *)replyBuffer;
415 insertResPtr->msgType = INSERT_RES;
416 if (getKeyOwner(insertCmdPtr->key) == myHostData.ipAddr)
418 //note: casting val to void * in order to conform to API
419 if(chashInsert(myHashTable, insertCmdPtr->key,
420 (void *)insertCmdPtr->val) == 0)
421 insertResPtr->status = INSERT_OK;
423 insertResPtr->status = INSERT_ERROR;
427 insertResPtr->status = NOT_KEY_OWNER;;
429 if (sendto(sock, (void *)insertResPtr, sizeof(struct insertRes), 0,
430 (struct sockaddr *)&clientAddr, socklen) == -1)
432 perror("udpListen():sendto()");
436 if (bytesReceived != sizeof(struct removeCmd))
439 fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
444 removeCmdPtr = (struct removeCmd *)buffer;
446 fprintf(logfile,"udpListen(): Remove: key=%d\n", removeCmdPtr->key);
449 removeResPtr = (struct removeRes *)replyBuffer;
450 removeResPtr->msgType = REMOVE_RES;
451 if (getKeyOwner(removeCmdPtr->key) == myHostData.ipAddr)
453 //note: casting val to void * in order to conform to API
454 if(chashRemove(myHashTable, removeCmdPtr->key) == 0)
455 removeResPtr->status = INSERT_OK;
457 removeResPtr->status = INSERT_ERROR;
461 removeResPtr->status = NOT_KEY_OWNER;
463 if (sendto(sock, (void *)removeResPtr, sizeof(struct removeRes), 0,
464 (struct sockaddr *)&clientAddr, socklen) == -1)
466 perror("udpListen():sendto()");
470 if (bytesReceived != sizeof(struct searchCmd))
473 fprintf(logfile,"udpListen(): ERROR: incorrect message size\n");
478 searchCmdPtr = (struct searchCmd *)buffer;
480 fprintf(logfile,"udpListen(): Search: key=%d\n",searchCmdPtr->key);
483 searchResPtr = (struct searchRes *)replyBuffer;
484 searchResPtr->msgType = SEARCH_RES;
485 if (getKeyOwner(searchCmdPtr->key) == myHostData.ipAddr)
487 //note: casting val to void * in order to conform to API
488 if((searchResPtr->val = (unsigned int)chashSearch(myHashTable,
489 searchCmdPtr->key)) == 0)
490 searchResPtr->status = KEY_NOT_FOUND;
492 searchResPtr->status = KEY_FOUND;
496 searchResPtr->status = NOT_KEY_OWNER;
498 if (sendto(sock, (void *)searchResPtr, sizeof(struct searchRes), 0,
499 (struct sockaddr *)&clientAddr, socklen) == -1)
501 perror("udpListen():sendto()");
504 case FIND_LEADER_CMD:
505 if (bytesReceived != sizeof(char))
508 fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
513 if (leader == getMyIpAddr())
515 replyBuffer[0] = FIND_LEADER_RES;
516 if(sendto(sock, (void *)replyBuffer, sizeof(char), 0,
517 (struct sockaddr *)&clientAddr, socklen) == -1)
519 perror("udpListen():sendto");
525 fprintf(logfile,"udpListen(): ERROR: Unknown message type\n");
532 int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port,
533 void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize,
534 unsigned int timeout, unsigned int numRetries)
536 struct sockaddr_in server_addr;
537 struct sockaddr_in ack_addr;
538 socklen_t socklen = sizeof(struct sockaddr_in);
539 struct pollfd pollsock;
543 ssize_t bytesReceived;
545 bzero((char *) &server_addr, sizeof(server_addr));
546 server_addr.sin_family = AF_INET;
547 server_addr.sin_port = htons(dest_port);
548 server_addr.sin_addr.s_addr = htonl(dest_ip);
550 if ((pollsock.fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
552 perror("udpSendWaitForResponse():socket()");
556 pollsock.events = POLLIN;
558 for (i = 0; i < MAX_RETRIES; i++)
562 fprintf(logfile,"udpSendWaitForResponse(): trying again, count: %d\n",
566 if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr,
569 perror("udpSendWaitForResponse():sendto");
573 gettimeofday(&now, NULL);
574 fprintf(logfile,"udpSendWaitForResponse(): message sent:%ds,%dus\n",
575 now.tv_sec, now.tv_usec);
578 retval = poll(&pollsock, 1, timeout);
581 bytesReceived = recvfrom(pollsock.fd, resBuffer, resBufferSize, 0,
582 (struct sockaddr *)&ack_addr, &socklen);
583 if ((ack_addr.sin_addr.s_addr == server_addr.sin_addr.s_addr)
584 && (ack_addr.sin_port == server_addr.sin_port))
588 gettimeofday(&now, NULL);
589 fprintf(logfile,"udpSendWaitForResponse(): received response:%ds,%dus\n", now.tv_sec, now.tv_usec);
592 return bytesReceived;
598 gettimeofday(&now, NULL);
599 printf("udpSendWaitForResponse(): timed out, no ack:%ds,%dus\n",
600 now.tv_sec, now.tv_usec);
606 int udpBroadcastWaitForResponse(unsigned int *reply_ip,
607 unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer,
608 unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries)
610 struct sockaddr_in server_addr;
611 struct sockaddr_in ack_addr;
612 socklen_t socklen = sizeof(struct sockaddr_in);
613 struct pollfd pollsock;
617 ssize_t bytesReceived;
620 bzero((char *) &server_addr, sizeof(server_addr));
621 server_addr.sin_family = AF_INET;
622 server_addr.sin_port = htons(dest_port);
623 server_addr.sin_addr.s_addr = htonl(0xFFFFFFFF);
625 if ((pollsock.fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
627 perror("udpBroadcastWaitForResponse():socket()");
632 if (setsockopt(pollsock.fd, SOL_SOCKET, SO_BROADCAST, &on, sizeof(on)) == -1)
634 perror("udpBroadcastWaitForResponse():setsockopt()");
638 pollsock.events = POLLIN;
640 for (i = 0; i < MAX_RETRIES; i++)
644 fprintf(logfile,"udpBroadcastWaitForResponse(): trying again, count: %d\n", i+1);
647 if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr,
650 perror("udpBroadcastWaitForResponse():sendto()");
654 gettimeofday(&now, NULL);
655 fprintf(logfile,"udpBroadcastWaitForResponse(): message sent:%ds,%dus\n",
656 now.tv_sec, now.tv_usec);
659 retval = poll(&pollsock, 1, timeout);
662 bytesReceived = recvfrom(pollsock.fd, resBuffer, resBufferSize, 0,
663 (struct sockaddr *)&ack_addr, &socklen);
665 *reply_ip = htonl(ack_addr.sin_addr.s_addr);
667 gettimeofday(&now, NULL);
668 fprintf(logfile,"udpBroadcastWaitForResponse(): received response:%ds,%dus\n", now.tv_sec, now.tv_usec);
671 return bytesReceived;
676 gettimeofday(&now, NULL);
677 fprintf(logfile,"udpBroadcastWaitForResponse(): timed out, no ack:%ds,%dus\n",
678 now.tv_sec, now.tv_usec);
684 // use TCP for potentially large and/or important data transfer
687 struct sockaddr_in myAddr;
688 struct sockaddr_in clientAddr;
689 int sockListen, sockAccept;
690 socklen_t socklen = sizeof(struct sockaddr_in);
691 pthread_t threadTcpAccept;
693 sockListen = socket(AF_INET, SOCK_STREAM, 0);
694 if (sockListen == -1)
696 perror("tcpListen():socket()");
700 myAddr.sin_family = AF_INET;
701 myAddr.sin_port = htons(TCP_PORT);
702 myAddr.sin_addr.s_addr = INADDR_ANY;
703 memset(&(myAddr.sin_zero), '\0', 8);
705 if (bind(sockListen, (struct sockaddr *)&myAddr, socklen) == -1)
707 perror("tcpListen():socket()");
711 if (listen(sockListen, BACKLOG) == -1)
713 perror("tcpListen():listen()");
718 fprintf(logfile,"tcpListen(): listening on port %d\n", TCP_PORT);
724 sockAccept = accept(sockListen, (struct sockaddr *)&clientAddr, &socklen);
725 pthread_create(&threadTcpAccept, NULL, tcpAccept, (void *)sockAccept);
729 void *tcpAccept(void *arg)
731 int sockAccept = (int)arg;
736 fprintf(logfile, "tcpAccept(): accepted tcp connection, file descriptor: %d\n", sockAccept);
740 bytesReceived = recv(sockAccept, &msgType, sizeof(char), 0);
741 if (bytesReceived == -1)
743 perror("tcpAccept():recv()");
745 else if (bytesReceived == 0)
748 fprintf(logfile, "tcpAccept(): bytesReceived = 0\n", sockAccept);
757 if (send(sockAccept, &numHosts, sizeof(numHosts), 0) == -1)
759 perror("tcpAccept():send()");
762 if (send(sockAccept, &numBlocks, sizeof(numBlocks), 0) == -1)
764 perror("tcpAccept():send()");
767 if (send(sockAccept, hostArray, numHosts*sizeof(struct hostData),
770 perror("tcpAccept():send()");
773 if (send(sockAccept, blockOwnerArray, numBlocks*sizeof(unsigned int),
776 perror("tcpAccept():send()");
782 fprintf(logfile, "tcpAccept(): unrecognized msg type\n");
788 if (close(sockAccept) == -1)
790 perror("tcpAccept():close()");
794 fprintf(logfile, "tcpAccept(): closed tcp connection, file descriptor: %d\n",
802 unsigned int getKeyOwner(unsigned int key)
804 return hostArray[blockOwnerArray[hash(key)]].ipAddr;
807 unsigned int getMyIpAddr()
810 struct ifreq interfaceInfo;
811 struct sockaddr_in *myAddr = (struct sockaddr_in *)&interfaceInfo.ifr_addr;
813 memset(&interfaceInfo, 0, sizeof(struct ifreq));
815 if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
817 perror("getMyIpAddr():socket()");
821 strcpy(interfaceInfo.ifr_name, DEFAULT_INTERFACE);
822 myAddr->sin_family = AF_INET;
824 if(ioctl(sock, SIOCGIFADDR, &interfaceInfo) != 0)
826 perror("getMyIpAddr():ioctl()");
830 return ntohl(myAddr->sin_addr.s_addr);
833 unsigned int getLeadersIpAddr()
835 unsigned int reply_ip;
841 fprintf(logfile, "getLeadersIpAddr(): broadcasting...\n");
845 myMessage = FIND_LEADER_CMD;
847 bytesReceived = udpBroadcastWaitForResponse(&reply_ip, UDP_PORT,
848 (void *)&myMessage, sizeof(myMessage), (void *)&response,
849 sizeof(response), TIMEOUT_MS, MAX_RETRIES);
851 if (bytesReceived == -1)
854 fprintf(logfile, "getLeadersIpAddr(): no response\n");
859 else if (response == FIND_LEADER_RES)
862 struct in_addr reply_addr;
863 reply_addr.s_addr = htonl(reply_ip);
864 fprintf(logfile, "getLeadersIpAddr(): leader found:%s\n",
865 inet_ntoa(reply_addr));
873 fprintf(logfile, "getLeadersIpAddr(): unexpected response\n");
882 struct sockaddr_in leader_addr;
887 if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1)
889 perror("getDHTdata():socket()");
893 bzero((char *)&leader_addr, sizeof(leader_addr));
894 leader_addr.sin_family = AF_INET;
895 leader_addr.sin_port = htons(TCP_PORT);
896 leader_addr.sin_addr.s_addr = htonl(leader);
898 if (connect(sock, (struct sockaddr *)&leader_addr, sizeof(leader_addr)) == -1)
900 perror("getDHTdata():connect()");
905 if (send(sock, &msg, sizeof(char), 0) == -1)
907 perror("getDHTdata():send()");
911 bytesReceived = recv(sock, &numHosts, sizeof(numHosts), 0);
912 if (bytesReceived == -1)
914 perror("getDHTdata():recv()");
918 if (bytesReceived != sizeof(numHosts))
921 fprintf(logfile,"getDHTdata(): ERROR: numHosts not completely received\n");
927 bytesReceived = recv(sock, &numBlocks, sizeof(numBlocks), 0);
928 if (bytesReceived == -1)
930 perror("getDHTdata():recv()");
934 if (bytesReceived != sizeof(numBlocks))
937 fprintf(logfile,"getDHTdata(): ERROR: numBlocks not completely received\n");
943 if (hostArray != NULL)
945 hostArray = calloc(numHosts, sizeof(struct hostData));
946 bytesReceived = recv(sock, hostArray, numHosts*sizeof(struct hostData), 0);
947 if (bytesReceived == -1)
949 perror("getDHTdata():recv()");
953 if (bytesReceived != numHosts*sizeof(struct hostData))
956 fprintf(logfile,"getDHTdata(): ERROR: hostArray not completely received\n");
962 if (blockOwnerArray != NULL)
963 free(blockOwnerArray);
964 blockOwnerArray = calloc(numBlocks, sizeof(unsigned int));
965 bytesReceived = recv(sock, blockOwnerArray, numBlocks*sizeof(unsigned int), 0);
966 if (bytesReceived == -1)
968 perror("getDHTdata():recv()");
972 if (bytesReceived != numBlocks*sizeof(unsigned int))
975 fprintf(logfile,"getDHTdata(): ERROR: blockOwnerArray not completely received\n");
982 fprintf(logfile,"getDHTdata(): got data:\n");
983 writeDHTdata(logfile);
989 unsigned int hash(unsigned int x)
991 return x % numBlocks;
999 void writeDHTdata(FILE *outfile)
1002 struct in_addr address;
1003 fprintf(outfile,"numHosts=%d,numBlocks=%d\n", numHosts, numBlocks);
1004 fprintf(outfile,"hostArray: index: ipAddr, maxKeyCapacity\n");
1005 for (i = 0; i < numHosts; i++)
1007 address.s_addr = htonl(hostArray[i].ipAddr);
1008 fprintf(outfile,"%d: %s, %d\n", i, inet_ntoa(address),
1009 hostArray[i].maxKeyCapacity);
1011 fprintf(outfile,"blockOwnerArray: index: blockOwner\n");
1012 for (i = 0; i < numBlocks; i++)
1014 fprintf(outfile,"%d: %d\n", i, blockOwnerArray[i]);