1 /*******************************************************************************
4 * High-performance Distributed Hash Table for finding the location of objects
5 * in a Distributed Shared Transactional Memory system.
10 * 1) Instead of having dhtInsertMult, dhtSearchMult, etc. call their single-key
11 * counterparts repeatedly, define some new messages to handle it more efficiently.
13 * 2) Improve the efficiency of functions that work with hostArray, hostReplied,
14 * and blockOwnerArray.
15 * 3) Currently a join or leave causes a rebuild of the entire hash table.
16 * Implement more graceful join and leave procedures.
17 * 4) Fine tune timeout values for performance, possibly implement a backoff
18 * algorithm to prevent overloading the network.
19 * 5) Whatever else I'm forgetting
21 *******************************************************************************/
22 /*******************************************************************************
24 *******************************************************************************/
26 #include <netinet/in.h>
27 #include <arpa/inet.h>
28 #include <sys/types.h>
29 #include <sys/socket.h>
30 #include <sys/ioctl.h>
41 #include <linux/sockios.h>
42 #include <sys/queue.h>
44 #include "clookup.h" //this works for now, do we need anything better?
47 /*******************************************************************************
48 * Local Defines, Structs
49 *******************************************************************************/
//capacity in bytes of the inBuffer/outBuffer message buffers
51 #define MAX_MSG_SIZE 1500
//initial slot count for hostArray and hostReplied; addHost() doubles it when full
53 #define INIT_HOST_ALLOC 3
//initial number of entries in blockOwnerArray
54 #define INIT_NUM_BLOCKS 16
//interface name handed to getMyIpAddr() by dhtInit()
55 #define DEFAULT_INTERFACE "eth0"
//timeout argument (milliseconds) for poll() on the listening socket
56 #define TIMEOUT_PERIOD 100
//client request tuning: *_TIMEOUT_MS is the poll() wait per attempt and
//*_RETRIES bounds the number of sendto() attempts in dhtInsert(),
//dhtRemove() and dhtSearch() respectively
57 #define INSERT_TIMEOUT_MS 500
58 #define INSERT_RETRIES 50
59 #define REMOVE_TIMEOUT_MS 500
60 #define REMOVE_RETRIES 50
61 #define SEARCH_TIMEOUT_MS 500
62 #define SEARCH_RETRIES 50
65 //make sure this matches msg_types global var
95 //make sure this matches state_names, timeout_vals, and retry_vals global vars
132 unsigned int maxKeyCapacity;
135 /*******************************************************************************
136 * Local Function Prototypes
137 *******************************************************************************/
//--- wire-format helpers: length validation and little-endian field access ---
139 int msgSizeOk(unsigned char *msg, unsigned int size);
140 unsigned short read2(unsigned char *msg);
141 unsigned int read4(unsigned char *msg);
142 void write2(unsigned char *ptr, unsigned short tmp);
143 void write4(unsigned char *ptr, unsigned int tmp);
//--- networking: local address lookup and UDP transmission ---
144 unsigned int getMyIpAddr(const char *interfaceStr);
145 int udpSend(unsigned char *msg, unsigned int size, unsigned int destIp);
146 int udpSendAll(unsigned char *msg, unsigned int size);
//--- key -> block -> owning host mapping ---
147 unsigned int hash(unsigned int x);
148 unsigned int getKeyOwner(unsigned int key);
//--- state machine and host-table maintenance ---
149 void setState(unsigned int newState);
150 void makeAssignments();
151 int addHost(struct hostData newHost);
152 int removeHost(unsigned int ipAddr);
153 void removeUnresponsiveHosts();
154 int checkReplied(unsigned int ipAddr);
//--- logging ---
156 void writeHostList();
157 void dhtLog(const char *format, ...);
161 /*******************************************************************************
163 *******************************************************************************/
165 //make sure this matches enumeration above
166 const char *msg_types[NUM_MSG_TYPES] =
193 const char *state_names[NUM_STATES] =
198 "LEAD_NORMAL1_STATE",
199 "LEAD_NORMAL2_STATE",
208 "LEAD_REBUILD1_STATE",
209 "LEAD_REBUILD2_STATE",
210 "LEAD_REBUILD3_STATE",
211 "LEAD_REBUILD4_STATE",
//Per-state timeout, indexed by state number. Each entry is a struct timeval
//{ seconds, microseconds }. setState() adds the entry for the new state to the
//current time to obtain the deadline stored in 'timer', and the listener adds
//it again each time it retries after a timeout.
216 //note: { 0, 0 } means no timeout
217 struct timeval timeout_vals[NUM_STATES] ={
218 { 0, 500000 }, //INIT1_STATE
219 { 0, 500000 }, //INIT2_STATE
220 { 0, 0 }, //NORMAL_STATE
221 { 0, 0 }, //LEAD_NORMAL1_STATE
222 { 3, 0 }, //LEAD_NORMAL2_STATE
223 { 1, 0 }, //ELECT1_STATE
224 { 1, 0 }, //ELECT2_STATE
225 { 0, 500000 }, //REBUILD0_STATE
226 { 0, 500000 }, //REBUILD1_STATE
227 { 10, 0 }, //REBUILD2_STATE
228 { 10, 0 }, //REBUILD3_STATE
229 { 10, 0 }, //REBUILD4_STATE
230 { 1, 0 }, //REBUILD5_STATE
231 { 1, 0 }, //LEAD_REBUILD1_STATE
232 { 1, 0 }, //LEAD_REBUILD2_STATE
233 { 10, 0 }, //LEAD_REBUILD3_STATE
234 { 10, 0 }, //LEAD_REBUILD4_STATE
235 { 0, 500000 }, //EXIT1_STATE
236 { 0, 0 } //EXIT2_STATE
239 int retry_vals[NUM_STATES] =
244 0, //LEAD_NORMAL1_STATE
245 0, //LEAD_NORMAL2_STATE
254 10, //LEAD_REBUILD1_STATE
255 10, //LEAD_REBUILD2_STATE
256 10, //LEAD_REBUILD3_STATE
257 10, //LEAD_REBUILD4_STATE
//this host's own entry (ipAddr and maxKeyCapacity), filled in by dhtInit()
263 struct hostData myHostData;
//thread started by dhtInit() to run udpListen
264 pthread_t threadUdpListen;
//thread started during a rebuild to run fillTask
265 pthread_t threadFillTask;
266 //status of fillTask: 0 = ready to run, 1 = running, 2 = completed, 3 = error
//socket bound to UDP_PORT; polled by the listener and also used by udpSend()
268 struct pollfd udpPollSock;
//ipAddr carried in the ELECT_LEADER_CMD of the election this host is taking part in
272 unsigned int electionOriginator;
//peer from which the current election's ELECT_LEADER_CMD was received
273 unsigned int electionParent;
//allocated capacity (in entries) of hostArray and hostReplied
274 unsigned int hostArraySize = 0;
//known hosts; addHost() inserts new entries in ascending ipAddr order
275 struct hostData *hostArray = NULL;
//number of entries in blockOwnerArray; also the modulus used by hash()
276 unsigned int numBlocks = 0;
//for each block, the index into hostArray of the host that owns it
277 unsigned short *blockOwnerArray = NULL;
//per-host flag, parallel to hostArray; udpSendAll() skips hosts whose flag is nonzero
278 unsigned char *hostReplied = NULL;
//held around reads and writes of the state machine and host/block tables
279 pthread_mutex_t stateMutex;
//broadcast by the listener when the state changes; client calls wait on it
280 pthread_cond_t stateCond;
//local table holding the key/value pairs stored on this host
281 chashtable_t *myHashTable;
//number of valid entries in hostArray and hostReplied
282 unsigned int numHosts;
//absolute time at which the current state times out (see setState())
283 struct timeval timer;
287 /*******************************************************************************
288 * Interface Function Definitions
289 *******************************************************************************/
//dhtInit(): one-time setup of the local DHT node.
// seedIpAddr     - NOTE(review): presumably the address of an existing member
//                  used to find the leader, in host byte order - confirm.
// maxKeyCapacity - stored in myHostData and later sent in JOIN_REQ messages.
//Opens the log file "dht-<ip>.log", allocates the host and block tables,
//creates the local hash table, binds the UDP socket to UDP_PORT and starts the
//listener thread.
291 void dhtInit(unsigned int seedIpAddr, unsigned int maxKeyCapacity) {
292 struct in_addr tmpAddr;
//NOTE(review): "dht-" (4) + dotted quad (up to 15) + ".log" (4) + NUL needs
//24 bytes; a 15-character address overruns this 23-byte buffer by one.
293 char filename[23] = "dht-";
294 struct sockaddr_in myAddr;
295 struct sockaddr_in seedAddr;
296 socklen_t socklen = sizeof(struct sockaddr_in);
//getMyIpAddr() returns host byte order; inet_ntoa() wants network order
299 tmpAddr.s_addr = htonl(getMyIpAddr(DEFAULT_INTERFACE));
300 strcat(filename, inet_ntoa(tmpAddr));
301 strcat(filename, ".log");
302 printf("log file: %s\n", filename);
//NOTE(review): the fopen() result is not checked before dhtLog() writes to it
304 logfile = fopen(filename, "w");
305 dhtLog("dhtInit(): inializing...\n");
307 myHostData.ipAddr = getMyIpAddr(DEFAULT_INTERFACE);
308 myHostData.maxKeyCapacity = maxKeyCapacity;
312 electionOriginator = 0;
314 hostArraySize = INIT_HOST_ALLOC;
//NOTE(review): calloc() results are used without a NULL check
315 hostArray = calloc(hostArraySize, sizeof(struct hostData));
316 hostReplied = calloc(hostArraySize, sizeof(unsigned char));
//this host is the first (and so far only) entry in the host table
317 hostArray[0] = myHostData;
319 numBlocks = INIT_NUM_BLOCKS;
//zero-filled, so every block initially maps to hostArray[0]
320 blockOwnerArray = calloc(numBlocks, sizeof(unsigned short));
321 pthread_mutex_init(&stateMutex, NULL);
322 pthread_cond_init(&stateCond, NULL);
323 myHashTable = chashCreate(HASH_SIZE, LOADFACTOR);
325 udpPollSock.fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
//NOTE(review): a socket() failure is only reported; confirm setup does not
//carry on with an invalid descriptor
326 if (udpPollSock.fd < 0)
327 perror("dhtInit():socket()");
329 udpPollSock.events = POLLIN;
//listen on every local address, port UDP_PORT
331 bzero(&myAddr, socklen);
332 myAddr.sin_family = AF_INET;
333 myAddr.sin_addr.s_addr = htonl(INADDR_ANY);
334 myAddr.sin_port = htons(UDP_PORT);
336 if (bind(udpPollSock.fd, (struct sockaddr *)&myAddr, socklen) < 0)
337 perror("dhtInit():bind()");
//leader path: this host names itself leader and enters LEAD_NORMAL1_STATE
340 dhtLog("I am the leader\n");
341 leader = myHostData.ipAddr;
342 setState(LEAD_NORMAL1_STATE);
//seeded path: ask the seed who the leader is and wait in INIT1_STATE
345 initMsg = WHO_IS_LEADER_CMD;
346 udpSend(&initMsg, 1, seed);
347 setState(INIT1_STATE);
350 if (pthread_create(&threadUdpListen, NULL, udpListen, NULL) != 0)
351 dhtLog("dhtInit() - ERROR creating threadUdpListen\n");
//dhtExit(): tear down the local node. Sends a one-byte message to the leader,
//cancels the listener thread, closes the UDP socket and frees the block table.
356 void dhtExit() { //TODO: do this gracefully, wait for response from leader, etc.
360 udpSend(&msg, 1, leader);
361 dhtLog("dhtExit(): cleaning up...\n");
//NOTE(review): the listener is cancelled, not joined; confirm it cannot be
//holding stateMutex at the moment it is cancelled
362 pthread_cancel(threadUdpListen);
363 close(udpPollSock.fd);
366 free(blockOwnerArray);
//dhtInsert(): store (key, val) on the host that owns key.
//Repeats until the owner answers OPERATION_OK. Each pass waits for a state in
//which inserts are accepted, looks up the owner, and sends a 9-byte INSERT_CMD
//(type, 4-byte key, 4-byte val) up to INSERT_RETRIES times, waiting
//INSERT_TIMEOUT_MS for each reply. If no OPERATION_OK arrives, a rebuild is
//requested from the leader and the pass is repeated.
//NOTE(review): callers treat a nonzero return as failure (see dhtInsertMult()).
372 int dhtInsert(unsigned int key, unsigned int val) {
373 struct sockaddr_in toAddr;
374 struct sockaddr_in fromAddr;
375 socklen_t socklen = sizeof(struct sockaddr_in);
376 struct pollfd pollsock;
384 bzero((char *)&toAddr, socklen);
385 toAddr.sin_family = AF_INET;
386 toAddr.sin_port = htons(UDP_PORT);
388 while (status != OPERATION_OK) {
389 pthread_mutex_lock(&stateMutex);
//inserts are also accepted in REBUILD4_STATE and LEAD_REBUILD3_STATE, the
//states in which the fill task re-inserts local objects
390 while (!(state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
391 || state == LEAD_NORMAL2_STATE || state == REBUILD4_STATE
392 || state == LEAD_REBUILD3_STATE))
393 pthread_cond_wait(&stateCond, &stateMutex);
//resolve the owner while the tables are protected by stateMutex
394 toAddr.sin_addr.s_addr = htonl(getKeyOwner(key));
395 pthread_mutex_unlock(&stateMutex);
//NOTE(review): a new socket is opened on each pass of the outer loop;
//confirm it is closed on every exit path
397 if ((pollsock.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
398 perror("dhtInsert():socket()");
401 pollsock.events = POLLIN;
403 outBuffer[0] = INSERT_CMD;
404 write4(&outBuffer[1], key);
405 write4(&outBuffer[5], val);
407 for (i = 0; i < INSERT_RETRIES; i++) {
408 if (sendto(pollsock.fd, outBuffer, 9, 0, (struct sockaddr *)&toAddr,
410 perror("dhtInsert():sendto()");
413 retval = poll(&pollsock, 1, INSERT_TIMEOUT_MS);
415 perror("dhtInsert():poll()");
419 bytesRcvd = recvfrom(pollsock.fd, inBuffer, 2, 0,
420 (struct sockaddr *)&fromAddr, &socklen);
//accept only a 2-byte INSERT_RES coming from the address and port we sent to
421 if (fromAddr.sin_addr.s_addr == toAddr.sin_addr.s_addr
422 && fromAddr.sin_port == toAddr.sin_port
423 && bytesRcvd == 2 && inBuffer[0] == INSERT_RES) {
424 status = inBuffer[1]; //status from remote host
//no success from the owner: move to REBUILD0_STATE and ask the leader to rebuild
429 if (status != OPERATION_OK) {
430 pthread_mutex_lock(&stateMutex);
431 setState(REBUILD0_STATE);
432 outBuffer[0] = REBUILD_REQ;
433 udpSend(outBuffer, 1, leader);
434 pthread_mutex_unlock(&stateMutex);
//dhtInsertMult(): insert numKeys pairs, pairing keys[i] with vals[i], by
//calling dhtInsert() once per pair. A nonzero result from dhtInsert() is
//treated as a failure.
443 int dhtInsertMult(unsigned int numKeys, unsigned int *keys, unsigned int *vals) {
448 for (i = 0; i < numKeys; i++) {
449 if (dhtInsert(keys[i], vals[i]) != 0)
//dhtRemove(): delete key from the host that owns it.
//Repeats until the owner answers OPERATION_OK or KEY_NOT_FOUND. Each pass
//waits for a normal state, looks up the owner, and sends a 5-byte REMOVE_CMD
//(type, 4-byte key) up to REMOVE_RETRIES times, waiting REMOVE_TIMEOUT_MS for
//each reply. Otherwise a rebuild is requested from the leader.
//NOTE(review): callers treat a nonzero return as failure (see dhtRemoveMult()).
455 int dhtRemove(unsigned int key) {
456 struct sockaddr_in toAddr;
457 struct sockaddr_in fromAddr;
458 socklen_t socklen = sizeof(struct sockaddr_in);
459 struct pollfd pollsock;
467 bzero((char *)&toAddr, socklen);
468 toAddr.sin_family = AF_INET;
469 toAddr.sin_port = htons(UDP_PORT);
471 while (!(status == OPERATION_OK || status == KEY_NOT_FOUND)) {
472 pthread_mutex_lock(&stateMutex);
//unlike dhtInsert(), removal waits for one of the three normal states only
473 while (!(state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
474 || state == LEAD_NORMAL2_STATE))
475 pthread_cond_wait(&stateCond, &stateMutex);
476 toAddr.sin_addr.s_addr = htonl(getKeyOwner(key));
477 pthread_mutex_unlock(&stateMutex);
//NOTE(review): a new socket is opened on each pass of the outer loop;
//confirm it is closed on every exit path
479 if ((pollsock.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
480 perror("dhtRemove():socket()");
483 pollsock.events = POLLIN;
485 outBuffer[0] = REMOVE_CMD;
486 write4(&outBuffer[1], key);
488 for (i = 0; i < REMOVE_RETRIES; i++) {
489 if (sendto(pollsock.fd, outBuffer, 5, 0, (struct sockaddr *)&toAddr,
491 perror("dhtRemove():sendto()");
494 retval = poll(&pollsock, 1, REMOVE_TIMEOUT_MS);
496 perror("dhtRemove():poll()");
500 bytesRcvd = recvfrom(pollsock.fd, inBuffer, 2, 0,
501 (struct sockaddr *)&fromAddr, &socklen);
//accept only a 2-byte REMOVE_RES coming from the address and port we sent to
502 if (fromAddr.sin_addr.s_addr == toAddr.sin_addr.s_addr
503 && fromAddr.sin_port == toAddr.sin_port
504 && bytesRcvd == 2 && inBuffer[0] == REMOVE_RES) {
505 status = inBuffer[1]; //status from remote host
//no definitive answer: move to REBUILD0_STATE and ask the leader to rebuild
510 if (!(status == OPERATION_OK || status == KEY_NOT_FOUND)) {
511 pthread_mutex_lock(&stateMutex);
512 setState(REBUILD0_STATE);
513 outBuffer[0] = REBUILD_REQ;
514 udpSend(outBuffer, 1, leader);
515 pthread_mutex_unlock(&stateMutex);
//dhtRemoveMult(): remove numKeys keys by calling dhtRemove() once per key.
//A nonzero result from dhtRemove() is treated as a failure.
524 int dhtRemoveMult(unsigned int numKeys, unsigned int *keys) {
529 for (i = 0; i < numKeys; i++) {
530 if (dhtRemove(keys[i]) != 0)
//dhtSearch(): look up key on the host that owns it and store the result in *val.
//Repeats until the owner answers OPERATION_OK or KEY_NOT_FOUND. Each pass
//looks up the owner and sends a 5-byte SEARCH_CMD (type, 4-byte key) up to
//SEARCH_RETRIES times, waiting SEARCH_TIMEOUT_MS for each 6-byte reply
//(type, status, 4-byte value). Otherwise a rebuild is requested from the leader.
//NOTE(review): callers treat a nonzero return as failure (see dhtSearchMult()).
536 int dhtSearch(unsigned int key, unsigned int *val) {
537 struct sockaddr_in toAddr;
538 struct sockaddr_in fromAddr;
539 socklen_t socklen = sizeof(struct sockaddr_in);
540 struct pollfd pollsock;
548 bzero((char *)&toAddr, socklen);
549 toAddr.sin_family = AF_INET;
550 toAddr.sin_port = htons(UDP_PORT);
552 while (!(status == OPERATION_OK || status == KEY_NOT_FOUND)) {
553 pthread_mutex_lock(&stateMutex);
//searches do not wait for a particular state, only for a non-empty block
//table, because hash() divides by numBlocks
554 while (numBlocks == 0)
555 pthread_cond_wait(&stateCond, &stateMutex);
556 toAddr.sin_addr.s_addr = htonl(getKeyOwner(key));
557 pthread_mutex_unlock(&stateMutex);
//NOTE(review): a new socket is opened on each pass of the outer loop;
//confirm it is closed on every exit path
559 if ((pollsock.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
560 perror("dhtSearch():socket()");
563 pollsock.events = POLLIN;
565 outBuffer[0] = SEARCH_CMD;
566 write4(&outBuffer[1], key);
568 for (i = 0; i < SEARCH_RETRIES; i++) {
569 if (sendto(pollsock.fd, outBuffer, 5, 0, (struct sockaddr *)&toAddr,
571 perror("dhtSearch():sendto()");
574 retval = poll(&pollsock, 1, SEARCH_TIMEOUT_MS);
576 perror("dhtSearch():poll()");
580 bytesRcvd = recvfrom(pollsock.fd, inBuffer, 6, 0,
581 (struct sockaddr *)&fromAddr, &socklen);
//accept only a 6-byte SEARCH_RES coming from the address and port we sent to
582 if (fromAddr.sin_addr.s_addr == toAddr.sin_addr.s_addr
583 && fromAddr.sin_port == toAddr.sin_port
584 && bytesRcvd == 6 && inBuffer[0] == SEARCH_RES) {
585 status = inBuffer[1]; //status from remote host
//*val is overwritten for every accepted reply, whatever the status
586 *val = read4(&inBuffer[2]);
//no definitive answer: move to REBUILD0_STATE and ask the leader to rebuild
591 if (!(status == OPERATION_OK || status == KEY_NOT_FOUND)) {
592 pthread_mutex_lock(&stateMutex);
593 setState(REBUILD0_STATE);
594 outBuffer[0] = REBUILD_REQ;
595 udpSend(outBuffer, 1, leader);
596 pthread_mutex_unlock(&stateMutex);
//dhtSearchMult(): look up numKeys keys by calling dhtSearch() once per key;
//the result for keys[i] is written to vals[i]. A nonzero result from
//dhtSearch() is treated as a failure.
605 int dhtSearchMult(unsigned int numKeys, unsigned int *keys, unsigned int *vals) {
608 for (i = 0; i < numKeys; i++) {
609 if (dhtSearch(keys[i], &vals[i]) != 0)
615 /*******************************************************************************
616 * Local Function Definitions
617 *******************************************************************************/
//msgSizeOk(): check that a received message has the length its type requires.
// msg  - received bytes; the case labels below are message-type values
// size - number of bytes received
//Returns nonzero when the size matches. For variable-length messages the
//expected size is computed from host/block counts carried in the message.
//NOTE(review): the counts are read from msg[1..4]; confirm size is known to
//cover those bytes before they are read (the caller only guarantees size >= 1).
619 int msgSizeOk(unsigned char *msg, unsigned int size) {
620 unsigned short tmpNumHosts;
621 unsigned short tmpNumBlocks;
627 case WHO_IS_LEADER_CMD:
635 case RESUME_NORMAL_CMD:
636 case RESUME_NORMAL_RES:
646 case WHO_IS_LEADER_RES:
648 case ELECT_LEADER_CMD:
//5-byte header (type, 2-byte host count, 2-byte block count) followed by the
//host entries and then 2 bytes per block; this is the layout udpListen
//builds for DHT_UPDATE_CMD
660 tmpNumHosts = read2(&msg[1]);
661 tmpNumBlocks = read2(&msg[3]);
662 return (size == (5+sizeof(struct hostData)*tmpNumHosts+2*tmpNumBlocks));
664 case ELECT_LEADER_RES:
//4-byte header with the host count at offset 2, followed by the host entries
671 tmpNumHosts = read2(&msg[2]);
672 return (size == (4 + sizeof(struct hostData) * tmpNumHosts));
//3-byte header with the host count at offset 1, followed by the host entries;
//this is the layout udpListen builds for CONGRATS_CMD
677 tmpNumHosts = read2(&msg[1]);
678 return (size == (3 + sizeof(struct hostData) * tmpNumHosts));
//read2(): assemble a 16-bit value from two bytes at ptr, least-significant
//byte first (ptr[0] is the low byte, ptr[1] the high byte).
685 unsigned short read2(unsigned char *ptr) {
686 unsigned short tmp = (ptr[1] << 8) | ptr[0];
//read4(): assemble a 32-bit value from four bytes at ptr, least-significant
//byte first (ptr[0] is the low byte, ptr[3] the high byte).
690 unsigned int read4(unsigned char *ptr) {
//NOTE(review): ptr[3] is promoted to (signed) int before the shift, so a byte
//>= 0x80 shifted left by 24 overflows int, which is undefined behaviour in C;
//casting each byte to unsigned int before shifting would avoid it.
691 unsigned int tmp = (ptr[3] << 24) | (ptr[2] << 16) | (ptr[1] << 8) | ptr[0];
//write2(): store the 16-bit value tmp at ptr; the high byte goes to ptr[1],
//matching the byte order read2() expects.
695 void write2(unsigned char *ptr, unsigned short tmp) {
696 ptr[1] = (tmp >> 8) & 0xFF;
//write4(): store the 32-bit value tmp at ptr; higher-order bytes go to higher
//offsets (bits 31..24 in ptr[3]), matching the byte order read4() expects.
701 void write4(unsigned char *ptr, unsigned int tmp) {
702 ptr[3] = (tmp >> 24) & 0xFF;
703 ptr[2] = (tmp >> 16) & 0xFF;
704 ptr[1] = (tmp >> 8) & 0xFF;
//getMyIpAddr(): return the IPv4 address of the named network interface in
//host byte order, obtained with the SIOCGIFADDR ioctl.
// interfaceStr - interface name, e.g. DEFAULT_INTERFACE
//NOTE(review): confirm what is returned when socket() or ioctl() fails, and
//that the temporary socket is closed before returning.
709 unsigned int getMyIpAddr(const char *interfaceStr) {
711 struct ifreq interfaceInfo;
//myAddr aliases the address field of interfaceInfo, which the ioctl fills in
712 struct sockaddr_in *myAddr = (struct sockaddr_in *)&interfaceInfo.ifr_addr;
714 memset(&interfaceInfo, 0, sizeof(struct ifreq));
//the socket is only a handle for the ioctl; nothing is sent on it here
716 if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
717 perror("getMyIpAddr():socket()");
//NOTE(review): unbounded copy; ifr_name holds at most IFNAMSIZ bytes
721 strcpy(interfaceInfo.ifr_name, interfaceStr);
722 myAddr->sin_family = AF_INET;
724 if(ioctl(sock, SIOCGIFADDR, &interfaceInfo) != 0) {
725 perror("getMyIpAddr():ioctl()");
729 return ntohl(myAddr->sin_addr.s_addr);
//udpSend(): send one datagram from the shared listening socket.
// msg    - message bytes; msg[0] is the message type
// size   - number of bytes to send
// destIp - destination IPv4 address in host byte order (converted with htonl)
//The datagram goes to UDP_PORT on destIp and the send is logged.
//Callers treat a nonzero return as failure (see udpSendAll()).
732 int udpSend(unsigned char *msg, unsigned int size, unsigned int destIp) {
733 struct sockaddr_in peerAddr;
734 socklen_t socklen = sizeof(struct sockaddr_in);
736 bzero(&peerAddr, socklen);
737 peerAddr.sin_family = AF_INET;
738 peerAddr.sin_addr.s_addr = htonl(destIp);
739 peerAddr.sin_port = htons(UDP_PORT);
//log the symbolic name only when msg[0] is a valid index into msg_types
742 if (msg[0] < NUM_MSG_TYPES)
743 dhtLog("udpSend(): sending %s to %s, %d bytes\n", msg_types[msg[0]],
744 inet_ntoa(peerAddr.sin_addr), size);
746 dhtLog("udpSend(): sending unknown message to %s, %d bytes\n",
747 inet_ntoa(peerAddr.sin_addr), size);
750 if (sendto(udpPollSock.fd, (void *)msg, size, 0, (struct sockaddr *)&peerAddr,
752 perror("udpSend():sendto()");
//udpSendAll(): send msg to every known host that has not yet replied.
//Hosts whose hostReplied flag is set are skipped, as is this host itself.
759 int udpSendAll(unsigned char *msg, unsigned int size) {
762 for (i = 0; i < numHosts; i++) {
763 if ((hostReplied[i] == 0) && (hostArray[i].ipAddr != myHostData.ipAddr)) {
764 if (udpSend(msg, size, hostArray[i].ipAddr) != 0)
//hash(): map a key to a block index in the range [0, numBlocks).
771 //note: make sure this is only executed in a valid state, where numBlocks != 0
772 unsigned int hash(unsigned int x) {
//division by zero if numBlocks == 0, hence the precondition above
773 return (x % numBlocks);
//getKeyOwner(): return the ipAddr of the host that owns key.
//The key is hashed to a block, the block is mapped to a host index through
//blockOwnerArray, and that index selects the entry in hostArray.
776 //note: make sure this is only executed in a valid state, where these arrays
777 // are allocated and the index mappings are consistent
778 unsigned int getKeyOwner(unsigned int key) {
779 return hostArray[blockOwnerArray[hash(key)]].ipAddr;
//setState(): switch the state machine to newState.
//Rejects values >= NUM_STATES with a log message. For states with a nonzero
//entry in timeout_vals the deadline 'timer' is set to now + that timeout.
//NOTE(review): callers in this file hold stateMutex around this call; confirm
//that is a requirement.
782 //sets state and timer, if applicable
783 void setState(unsigned int newState) {
787 gettimeofday(&now, NULL);
789 if (newState >= NUM_STATES) {
790 dhtLog("setState(): ERROR: invalid state %d\n", newState);
793 if (timeout_vals[newState].tv_sec == 0
794 && timeout_vals[newState].tv_usec == 0) { //no timer
//deadline = current time + timeout configured for the new state
798 timeradd(&now, &timeout_vals[newState], &timer);
//NOTE(review): presumably clears the per-host reply flags - confirm
803 //TODO: only do this for states that require it
804 for (i = 0; i < numHosts; i++)
807 dhtLog("setState(): state set to %s\n", state_names[state]);
//checkReplied(): locate ipAddr in hostArray via findHost().
//NOTE(review): presumably marks the matching hostReplied entry so the host is
//skipped by udpSendAll(); confirm against the remainder of the function,
//including the handling of findHost() returning -1.
813 //TODO: improve these simple and inefficient functions
814 int checkReplied(unsigned int ipAddr) {
817 i = findHost(ipAddr);
830 for (i = 0; i < numHosts; i++)
831 if ((hostReplied[i] == 0) && (hostArray[i].ipAddr != myHostData.ipAddr))
//findHost(): linear search of hostArray for ipAddr.
//Returns the index of the matching entry, or -1 when no entry matches.
837 int findHost(unsigned int ipAddr) {
840 for (i = 0; i < numHosts; i++)
841 if (hostArray[i].ipAddr == ipAddr)
842 return i; //found, return index
844 return -1; //not found
//removeHost(): remove the entry for ipAddr from hostArray/hostReplied.
//Block-owner indexes are fixed up first: blocks owned by the removed host are
//handed to index 0, and indexes above the removed slot move down by one.
//The entries after the removed slot are then shifted down one position.
//NOTE(review): confirm that a -1 result from findHost() is rejected before
//the loops below run.
847 int removeHost(unsigned int ipAddr) {
850 i = findHost(ipAddr);
855 for (j = 0; j < numBlocks; j++) {
856 if (blockOwnerArray[j] == i)
857 blockOwnerArray[j] = 0; //TODO: is this what I want to have happen?
858 else if (blockOwnerArray[j] > i)
859 blockOwnerArray[j]--;
//close the gap left by the removed entry in both parallel arrays
862 for (; i < numHosts - 1; i++) {
863 hostArray[i] = hostArray[i+1];
864 hostReplied[i] = hostReplied[i+1];
//removeUnresponsiveHosts(): remove every host, other than this one, whose
//hostReplied flag is still clear.
//NOTE(review): removeHost() shifts later entries down into slot i, but the
//loop still advances i, so the entry that moved into slot i is not examined
//on this pass - confirm whether two adjacent unresponsive hosts are handled.
871 void removeUnresponsiveHosts() {
874 for (i = 0; i < numHosts; i++) {
875 if (!hostReplied[i] && hostArray[i].ipAddr != myHostData.ipAddr)
876 removeHost(hostArray[i].ipAddr);
//addHost(): add or refresh a host entry, keeping hostArray in ascending
//ipAddr order.
// - an entry with the same ipAddr is overwritten in place;
// - otherwise newHost is inserted before the first entry with a larger
//   ipAddr, or appended when there is none.
//When the arrays are full their capacity is doubled. On an insertion, block
//owner indexes at or above the insertion point are bumped by one so they keep
//referring to the same hosts.
//NOTE(review): calloc() results are used unchecked; confirm the old arrays
//are freed when they are replaced.
880 int addHost(struct hostData newHost) {
881 struct hostData *newHostArray;
882 unsigned char *newHostReplied;
886 for (i = 0; i < numHosts; i++) {
//already known: refresh the stored data
887 if (hostArray[i].ipAddr == newHost.ipAddr) {
888 hostArray[i] = newHost;
//first entry with a larger address: newHost belongs at index i
891 } else if (hostArray[i].ipAddr > newHost.ipAddr) {
//full: build arrays of twice the size with newHost spliced in at index i
892 if (numHosts == hostArraySize) {
893 newHostArray = calloc(2 * hostArraySize, sizeof(struct hostData));
894 newHostReplied = calloc(2 * hostArraySize, sizeof(unsigned char));
895 memcpy(newHostArray, hostArray, (i * sizeof(struct hostData)));
896 memcpy(newHostReplied, hostReplied, (i * sizeof(unsigned char)));
897 newHostArray[i] = newHost;
898 newHostReplied[i] = 0;
899 memcpy(&newHostArray[i+1], &hostArray[i], ((numHosts - i) *
900 sizeof(struct hostData)));
901 memcpy(&newHostReplied[i+1], &hostReplied[i], ((numHosts - i) *
902 sizeof(unsigned char)));
905 hostArray = newHostArray;
906 hostReplied = newHostReplied;
907 hostArraySize = 2 * hostArraySize;
//room available: shift the tail up by one slot, working from the end
910 for (j = numHosts; j > i; j--) {
911 hostArray[j] = hostArray[j-1];
912 hostReplied[j] = hostReplied[j-1];
914 hostArray[i] = newHost;
//hosts at index >= i moved up by one, so block ownership follows them
917 for(j = 0; j < numBlocks; j++) {
918 if (blockOwnerArray[j] >= i)
919 blockOwnerArray[j]++;
926 //nothing greater, add to end
927 if (numHosts == hostArraySize) {
928 newHostArray = calloc(2 * hostArraySize, sizeof(struct hostData));
929 newHostReplied = calloc(2 * hostArraySize, sizeof(unsigned char));
930 memcpy(newHostArray, hostArray, (numHosts * sizeof(struct hostData)));
931 memcpy(newHostReplied, hostReplied, (numHosts * sizeof(unsigned char)));
934 hostArray = newHostArray;
935 hostReplied = newHostReplied;
936 hostArraySize = 2 * hostArraySize;
939 hostArray[numHosts] = newHost;
940 hostReplied[numHosts] = 0;
//makeAssignments(): distribute the blocks over the known hosts round-robin,
//so block i is owned by host index i % numHosts. If there are fewer blocks
//than hosts the block table is reallocated first.
//NOTE(review): i % numHosts divides by zero when numHosts == 0, and the
//calloc() result is used unchecked.
945 void makeAssignments() {
948 if (numBlocks < numHosts) {
949 free(blockOwnerArray);
//NOTE(review): presumably grows numBlocks until it is >= numHosts - confirm
950 while (numBlocks < numHosts)
952 blockOwnerArray = calloc(numBlocks, sizeof(unsigned short));
955 for (i = 0; i < numBlocks; i++)
956 blockOwnerArray[i] = i % numHosts;
//writeHostList(): write the host table to the log file - first the host
//count, then one line per host with its index, dotted-quad address and
//maxKeyCapacity.
//NOTE(review): numHosts and maxKeyCapacity are unsigned int but are printed
//with %d; %u would match their type.
961 void writeHostList() {
963 struct in_addr tmpAddr;
965 fprintf(logfile, "numHosts = %d\n", numHosts);
966 for (i = 0; i < numHosts; i++) {
//addresses are kept in host byte order; inet_ntoa() wants network order
967 tmpAddr.s_addr = htonl(hostArray[i].ipAddr);
968 fprintf(logfile, "%d) %s, %d\n", i, inet_ntoa(tmpAddr),
969 hostArray[i].maxKeyCapacity);
//dhtLog(): printf-style logging to the log file.
// format, ... - as for printf
//The message is written with vfprintf() and the stream is flushed after
//every call; failures of either are reported with perror(). The
//commented-out lines are a disabled timestamp prefix.
//NOTE(review): logfile is used as-is; it is NULL if the fopen() in dhtInit()
//failed.
974 void dhtLog(const char *format, ...) {
976 // struct timeval now;
978 // if (gettimeofday(&now, NULL) < 0)
979 // { perror("dhtLog():gettimeofday()"); }
980 va_start(args, format);
981 // if (fprintf(logfile, "%d.%06d:", now.tv_sec, now.tv_usec) < 0)
982 // { perror("dhtLog():fprintf()"); }
983 if (vfprintf(logfile, format, args) < 0) {
984 perror("dhtLog():vfprintf()");
986 if (fflush(logfile) == EOF) {
987 perror("dhtLog():fflush()");
997 unsigned int numKeys;
1000 vals = mhashGetKeys(&numKeys); //note: key of mhash is val of dht
1001 keys = calloc(numKeys, sizeof(unsigned int));
1003 for (i = 0; i < numKeys; i++)
1004 keys[i] = myHostData.ipAddr;
1006 if (dhtInsertMult(numKeys, keys, vals) == 0)
1016 struct sockaddr_in peerAddr;
1017 unsigned int peerIp;
1018 socklen_t socklen = sizeof(struct sockaddr_in);
1019 unsigned char inBuffer[MAX_MSG_SIZE];
1020 unsigned char outBuffer[MAX_MSG_SIZE];
1023 struct in_addr tmpAddr;
1024 struct hostData tmpHost;
1025 unsigned int tmpKey;
1026 unsigned int tmpVal;
1027 struct hostData *hostDataPtr;
1028 unsigned short *uShortPtr;
1029 unsigned int tmpUInt;
1030 unsigned int tmpUShort;
1032 unsigned int oldState;
1034 dhtLog("udpListen(): linstening on port %d...\n", UDP_PORT);
1037 pollret = poll(&udpPollSock, 1, TIMEOUT_PERIOD);
1038 pthread_mutex_lock(&stateMutex);
1041 perror("udpListen():poll()");
1042 } else if (pollret > 0) {
1043 bytesRcvd = recvfrom(udpPollSock.fd, inBuffer, MAX_MSG_SIZE, 0,
1044 (struct sockaddr *)&peerAddr, &socklen);
1045 if (bytesRcvd < 1) {
1046 dhtLog("udpListen(): ERROR: bytesRcvd = %d\n", bytesRcvd);
1047 } else if (inBuffer[0] >= NUM_MSG_TYPES) {
1048 dhtLog("udpListen(): ERROR: unknown msg type = %d\n", inBuffer[0]);
1049 } else if (!msgSizeOk(inBuffer, bytesRcvd)) {
1050 dhtLog("udpListen(): ERROR: msg size not ok: type = %s\n, size = %d\n",
1051 msg_types[inBuffer[0]], bytesRcvd);
1052 } else if (state == EXIT2_STATE) {
1054 } else if (state == INIT1_STATE) { //after initialization with seed, do not proceed until seed replies
1055 dhtLog("udpListen(): received %s from %s, %d bytes\n",
1056 msg_types[inBuffer[0]], inet_ntoa(peerAddr.sin_addr), bytesRcvd);
1057 for (i = 0; i < bytesRcvd; i++)
1058 dhtLog(" %x", inBuffer[i]);
1060 peerIp = ntohl(peerAddr.sin_addr.s_addr);
1061 if (peerIp == seed && inBuffer[0] == WHO_IS_LEADER_RES) {
1062 tmpHost.ipAddr = peerIp;
1063 tmpHost.maxKeyCapacity = 0;
1066 leader = read4(&inBuffer[1]);
1067 tmpAddr.s_addr = htonl(leader);
1068 dhtLog("leader = %s\n", inet_ntoa(tmpAddr));
1070 setState(INIT2_STATE);
1071 outBuffer[0] = JOIN_REQ;
1072 write4(&outBuffer[1], myHostData.maxKeyCapacity);
1073 udpSend(outBuffer, 5, leader);
1076 electionOriginator = myHostData.ipAddr;
1077 setState(ELECT1_STATE);
1078 outBuffer[0] = ELECT_LEADER_CMD;
1079 write4(&outBuffer[1], myHostData.ipAddr); //originator = me
1080 udpSendAll(outBuffer, 5);
1085 dhtLog("udpListen(): received %s from %s, %d bytes\n",
1086 msg_types[inBuffer[0]], inet_ntoa(peerAddr.sin_addr), bytesRcvd);
1087 for (i = 0; i < bytesRcvd; i++)
1088 dhtLog(" %x", inBuffer[i]);
1090 peerIp = ntohl(peerAddr.sin_addr.s_addr);
1091 switch (inBuffer[0]) {
1093 if (state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
1094 || state == LEAD_NORMAL2_STATE || state == REBUILD4_STATE
1095 || state == REBUILD5_STATE || state == LEAD_REBUILD3_STATE) {
1096 tmpKey = read4(&inBuffer[1]);
1097 tmpVal = read4(&inBuffer[5]);
1098 outBuffer[0] = INSERT_RES;
1099 if (getKeyOwner(tmpKey) == myHostData.ipAddr) {
1100 if (chashInsert(myHashTable, tmpKey, (void *)tmpVal) == 0)
1101 outBuffer[1] = OPERATION_OK;
1103 outBuffer[1] = INTERNAL_ERROR;
1106 outBuffer[1] = NOT_KEY_OWNER;
1108 //reply to client socket
1109 sendto(udpPollSock.fd, outBuffer, 2, 0,
1110 (struct sockaddr *)&peerAddr, socklen);
1115 if (state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
1116 || state == LEAD_NORMAL2_STATE) {
1117 tmpKey = read4(&inBuffer[1]);
1118 outBuffer[0] = REMOVE_RES;
1119 if (getKeyOwner(tmpKey) == myHostData.ipAddr) {
1120 if (chashRemove(myHashTable, tmpKey) == 0)
1121 outBuffer[1] = OPERATION_OK;
1123 outBuffer[1] = KEY_NOT_FOUND;
1126 outBuffer[1] = NOT_KEY_OWNER;
1128 //reply to client socket
1129 sendto(udpPollSock.fd, outBuffer, 2, 0,
1130 (struct sockaddr *)&peerAddr, socklen);
1135 if (state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
1136 || state == LEAD_NORMAL2_STATE) {
1137 tmpKey = read4(&inBuffer[1]);
1138 outBuffer[0] = SEARCH_RES;
1139 if (getKeyOwner(tmpKey) == myHostData.ipAddr) {
1140 if ((tmpVal = (unsigned int)chashSearch(myHashTable, tmpKey)) != 0) {
1141 outBuffer[1] = OPERATION_OK;
1142 write4(&outBuffer[2], tmpVal);
1145 outBuffer[1] = KEY_NOT_FOUND;
1146 write4(&outBuffer[2], 0);
1150 outBuffer[1] = NOT_KEY_OWNER;
1151 write4(&outBuffer[2], 0);
1153 //reply to client socket
1154 sendto(udpPollSock.fd, outBuffer, 6, 0,
1155 (struct sockaddr *)&peerAddr, socklen);
1159 case WHO_IS_LEADER_CMD:
1160 tmpHost.ipAddr = peerIp;
1161 tmpHost.maxKeyCapacity = 0;
1164 outBuffer[0] = WHO_IS_LEADER_RES;
1165 //leader == 0 means I don't know who it is
1166 write4(&outBuffer[1], leader);
1167 udpSend(outBuffer, 5, peerIp);
1171 if (state == LEAD_NORMAL1_STATE || state == LEAD_NORMAL2_STATE) {
1172 tmpHost.ipAddr = peerIp;
1173 tmpHost.maxKeyCapacity = read4(&inBuffer[1]);
1176 if (state == LEAD_NORMAL1_STATE)
1177 setState(LEAD_NORMAL2_STATE);
1178 outBuffer[0] = JOIN_RES;
1179 outBuffer[1] = 0; //status, success
1180 udpSend(outBuffer, 2, peerIp);
1181 } else if (state == LEAD_REBUILD1_STATE) {
1182 //note: I don't need to addHost().
1183 checkReplied(peerIp);
1184 outBuffer[0] = JOIN_RES;
1185 outBuffer[1] = 0; //status, success
1186 udpSend(outBuffer, 2, peerIp);
1189 setState(LEAD_REBUILD2_STATE);
1190 outBuffer[0] = DHT_UPDATE_CMD;
1191 write2(&outBuffer[1], numHosts);
1192 write2(&outBuffer[3], numBlocks);
1193 memcpy(&outBuffer[5], hostArray, numHosts*sizeof(struct hostData));
1194 memcpy(&outBuffer[5+numHosts*sizeof(struct hostData)],
1195 blockOwnerArray, numBlocks*2);
1196 udpSendAll(outBuffer, 5 + sizeof(struct hostData) * numHosts
1203 if (state == REBUILD1_STATE) {
1204 setState(REBUILD2_STATE);
1205 } else if (state == INIT2_STATE) {
1206 setState(NORMAL_STATE);
1211 if (state == LEAD_NORMAL1_STATE || state == LEAD_NORMAL2_STATE) { //TODO: make this graceful, instead of just rebuilding
1213 if (state != LEAD_NORMAL2_STATE)
1214 setState(LEAD_NORMAL2_STATE);
1218 case DHT_UPDATE_CMD:
1219 if (state == REBUILD2_STATE && peerIp == leader) {
1221 free(blockOwnerArray);
1222 numHosts = read2(&inBuffer[1]);
1223 numBlocks = read2(&inBuffer[3]);
1224 while (hostArraySize < numHosts)
1226 hostArray = calloc(hostArraySize, sizeof(struct hostData));
1227 blockOwnerArray = calloc(numBlocks, 2);
1228 memcpy(hostArray, &inBuffer[5], numHosts*sizeof(struct hostData));
1229 memcpy(blockOwnerArray, &inBuffer[5+numHosts*sizeof(struct hostData)], numBlocks*2);
1231 setState(REBUILD3_STATE);
1232 outBuffer[0] = DHT_UPDATE_RES;
1233 udpSend(outBuffer, 1, peerIp);
1237 case DHT_UPDATE_RES:
1238 if (state == LEAD_REBUILD2_STATE) {
1239 checkReplied(peerIp);
1241 setState(LEAD_REBUILD3_STATE);
1242 outBuffer[0] = FILL_DHT_CMD;
1243 udpSendAll(outBuffer, 1);
1244 if (fillStatus != 0)
1245 dhtLog("udpListen(): ERROR: fillTask already running\n");
1247 if (pthread_create(&threadFillTask, NULL, fillTask, NULL) != 0)
1248 dhtLog("udpListen(): ERROR creating threadFillTask\n");
1253 case ELECT_LEADER_CMD:
1254 tmpUInt = read4(&inBuffer[1]);
1255 if ((state == ELECT1_STATE || state == ELECT2_STATE)
1256 && tmpUInt >= electionOriginator) { //already participating in a higher-priority election
1257 outBuffer[0] = ELECT_LEADER_RES;
1258 outBuffer[1] = 0xFF;
1259 udpSend(outBuffer, 2, peerIp);
1262 electionOriginator = tmpUInt;
1263 electionParent = peerIp;
1264 setState(ELECT1_STATE);
1265 outBuffer[0] = ELECT_LEADER_CMD;
1266 write4(&outBuffer[1], electionOriginator);
1267 //don't bother forwarding the message to originator or parent
1268 checkReplied(electionOriginator);
1269 checkReplied(electionParent);
1270 if (allReplied()) { //in case that is everybody I know of
1271 setState(ELECT2_STATE);
1272 outBuffer[0] = ELECT_LEADER_RES;
1274 write2(&outBuffer[2], numHosts);
1275 memcpy(&outBuffer[4], hostArray, sizeof(struct hostData)
1277 udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
1281 udpSendAll(outBuffer, 5);
1286 case ELECT_LEADER_RES:
1287 if (state == ELECT1_STATE) {
1288 checkReplied(peerIp);
1289 if (inBuffer[1] != 0xFF) {
1290 tmpUShort = read2(&inBuffer[2]);
1291 hostDataPtr = (struct hostData *)&inBuffer[4];
1292 for (i = 0; i < tmpUShort; i++)
1293 addHost(hostDataPtr[i]);
1297 setState(ELECT2_STATE);
1298 if (electionOriginator == myHostData.ipAddr) {
1299 leader = hostArray[0].ipAddr;
1300 if (leader == myHostData.ipAddr) { //I am the leader
1301 dhtLog("I am the leader!\n");
1302 setState(LEAD_REBUILD1_STATE);
1303 outBuffer[0] = REBUILD_CMD;
1304 udpSendAll(outBuffer, 1);
1307 outBuffer[0] = CONGRATS_CMD;
1308 write2(&outBuffer[1], numHosts);
1309 hostDataPtr = (struct hostData *)&outBuffer[3];
1310 for (i = 0; i < numHosts; i++)
1311 hostDataPtr[i] = hostArray[i];
1312 udpSend(outBuffer, 3 + sizeof(struct hostData) * numHosts,
1317 outBuffer[0] = ELECT_LEADER_RES;
1319 write2(&outBuffer[2], numHosts);
1320 hostDataPtr = (struct hostData *)&outBuffer[4];
1321 for (i = 0; i < numHosts; i++)
1322 hostDataPtr[i] = hostArray[i];
1323 udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
1331 if (state == ELECT2_STATE) { //I am the leader
1332 leader = myHostData.ipAddr;
1333 dhtLog("I am the leader!\n");
1334 tmpUShort = read2(&inBuffer[1]);
1335 hostDataPtr = (struct hostData *)&inBuffer[3];
1336 for (i = 0; i < tmpUShort; i++)
1337 addHost(hostDataPtr[i]);
1339 setState(LEAD_REBUILD1_STATE);
1340 outBuffer[0] = REBUILD_CMD;
1341 udpSendAll(outBuffer, 1);
1346 if (state == LEAD_NORMAL1_STATE || state == LEAD_NORMAL2_STATE) {
1347 setState(LEAD_REBUILD1_STATE);
1348 outBuffer[0] = REBUILD_CMD;
1349 udpSendAll(outBuffer, 1);
1354 leader = peerIp; //consider this a declaration of authority
1355 setState(REBUILD1_STATE);
1356 outBuffer[0] = JOIN_REQ;
1357 write4(&outBuffer[1], myHostData.maxKeyCapacity);
1358 udpSend(outBuffer, 5, leader);
1362 if (state == REBUILD3_STATE && peerIp == leader) {
1363 setState(REBUILD4_STATE);
1364 if (fillStatus != 0)
1365 dhtLog("udpListen(): ERROR: fillTask already running\n");
1367 if (pthread_create(&threadFillTask, NULL, fillTask, NULL) != 0)
1368 dhtLog("udpListen(): ERROR creating threadFillTask\n");
1373 if (state == LEAD_REBUILD3_STATE) {
1374 checkReplied(peerIp);
1375 if (allReplied() && fillStatus == 2) {
1377 setState(LEAD_REBUILD4_STATE);
1378 outBuffer[0] = RESUME_NORMAL_CMD;
1379 udpSendAll(outBuffer, 1);
1384 case RESUME_NORMAL_CMD:
1385 if (state == REBUILD5_STATE && peerIp == leader) {
1386 setState(NORMAL_STATE);
1387 outBuffer[0] = RESUME_NORMAL_RES;
1388 udpSend(outBuffer, 1, leader);
1392 case RESUME_NORMAL_RES:
1393 if (state == LEAD_REBUILD4_STATE) {
1394 checkReplied(peerIp);
1396 setState(LEAD_NORMAL1_STATE);
1403 if (state == REBUILD4_STATE) {
1404 switch (fillStatus) {
1405 case 0: dhtLog("udpListen(): ERROR: fillStatus=0 in REBUILD4_STATE\n");
1408 case 1: //do nothing
1411 case 2: //done filling the dht, notify leader
1413 setState(REBUILD5_STATE);
1414 outBuffer[0] = FILL_DHT_RES;
1415 udpSend(outBuffer, 1, leader);
1418 case 3: //error encountered -> restart rebuild
1420 setState(REBUILD0_STATE);
1421 outBuffer[0] = REBUILD_REQ;
1422 udpSend(outBuffer, 1, leader);
1426 if (state == LEAD_REBUILD3_STATE) {
1427 switch (fillStatus) {
1428 case 0: dhtLog("udpListen(): ERROR: fillStatus=0 in LEAD_REBUILD3_STATE\n");
1431 case 1: //do nothing
1434 case 2: //I'm done, now is everybody else also done?
1437 setState(LEAD_REBUILD4_STATE);
1438 outBuffer[0] = RESUME_NORMAL_CMD;
1439 udpSendAll(outBuffer, 1);
1443 case 3: //error encountered -> restart rebuild
1445 setState(LEAD_REBUILD1_STATE);
1446 outBuffer[0] = REBUILD_CMD;
1447 udpSendAll(outBuffer, 1);
1452 gettimeofday(&now, NULL);
1453 if (timercmp(&now, &timer, >)) {
1454 if (timeoutCntr < retry_vals[state]) {
1456 timeradd(&now, &timeout_vals[state], &timer);
1457 dhtLog("udpListen(): retry: %d\n", timeoutCntr);
1460 outBuffer[0] = WHO_IS_LEADER_CMD;
1461 udpSend(outBuffer, 1, seed);
1465 outBuffer[0] = JOIN_REQ;
1466 write4(&outBuffer[1], myHostData.maxKeyCapacity);
1467 udpSend(outBuffer, 5, leader);
1471 outBuffer[0] = ELECT_LEADER_CMD;
1472 write4(&outBuffer[1], electionOriginator);
1473 udpSendAll(outBuffer, 5);
1477 if (electionOriginator == myHostData.ipAddr) { //retry notify leader
1478 outBuffer[0] = CONGRATS_CMD;
1479 write2(&outBuffer[1], numHosts);
1480 memcpy(&outBuffer[3], hostArray, sizeof(struct hostData)
1482 udpSend(outBuffer, 3 + sizeof(struct hostData) * numHosts,
1486 outBuffer[0] = ELECT_LEADER_RES;
1488 write2(&outBuffer[2], numHosts);
1489 memcpy(&outBuffer[4], hostArray, sizeof(struct hostData)
1491 udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
1496 case REBUILD0_STATE:
1497 outBuffer[0] = REBUILD_REQ;
1498 udpSend(outBuffer, 1, leader);
1501 case REBUILD1_STATE:
1502 outBuffer[0] = JOIN_REQ;
1503 write4(&outBuffer[1], myHostData.maxKeyCapacity);
1504 udpSend(outBuffer, 5, leader);
1507 case REBUILD5_STATE:
1508 outBuffer[0] = FILL_DHT_RES;
1509 udpSend(outBuffer, 1, leader);
1512 case LEAD_REBUILD1_STATE:
1513 outBuffer[0] = REBUILD_CMD;
1514 udpSendAll(outBuffer, 1);
1517 case LEAD_REBUILD2_STATE:
1518 outBuffer[0] = DHT_UPDATE_CMD;
1519 write2(&outBuffer[1], numHosts);
1520 write2(&outBuffer[3], numBlocks);
1521 memcpy(&outBuffer[5], hostArray, numHosts
1522 * sizeof(struct hostData));
1523 memcpy(&outBuffer[5+numHosts*sizeof(struct hostData)],
1524 blockOwnerArray, numBlocks*2);
1525 udpSendAll(outBuffer, 5 + sizeof(struct hostData) * numHosts
1529 case LEAD_REBUILD3_STATE:
1530 outBuffer[0] = FILL_DHT_CMD;
1531 udpSendAll(outBuffer, 1);
1534 case LEAD_REBUILD4_STATE:
1535 outBuffer[0] = RESUME_NORMAL_CMD;
1536 udpSendAll(outBuffer, 1);
1539 case EXIT1_STATE: //TODO...
1543 case LEAD_NORMAL1_STATE:
1544 case LEAD_NORMAL2_STATE:
1545 case REBUILD2_STATE:
1546 case REBUILD3_STATE:
1547 case REBUILD4_STATE:
1548 case EXIT2_STATE: //we shouldn't get here
1553 dhtLog("udpListen(): timed out in state %s after %d retries\n",
1554 state_names[state], timeoutCntr);
1557 setState(EXIT2_STATE);
1560 case LEAD_NORMAL2_STATE:
1561 setState(LEAD_REBUILD1_STATE);
1562 outBuffer[0] = REBUILD_CMD;
1563 udpSendAll(outBuffer, 1);
1567 dhtLog("removing unresponsive hosts, before:\n");
1569 removeUnresponsiveHosts();
1572 setState(ELECT2_STATE);
1573 if (electionOriginator == myHostData.ipAddr) {
1574 leader = hostArray[0].ipAddr;
1575 if (leader == myHostData.ipAddr) { //I am the leader
1576 dhtLog("I am the leader!\n");
1577 setState(LEAD_REBUILD1_STATE);
1578 outBuffer[0] = REBUILD_CMD;
1579 udpSendAll(outBuffer, 1);
1582 outBuffer[0] = CONGRATS_CMD;
1583 write2(&outBuffer[1], numHosts);
1584 memcpy(&outBuffer[3], hostArray, sizeof(struct hostData)
1586 udpSend(outBuffer, 3 + sizeof(struct hostData) * numHosts,
1591 outBuffer[0] = ELECT_LEADER_RES;
1593 write2(&outBuffer[2], numHosts);
1594 memcpy(&outBuffer[4], hostArray, sizeof(struct hostData)
1596 udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
1603 case REBUILD0_STATE:
1604 case REBUILD1_STATE:
1605 case REBUILD2_STATE:
1606 case REBUILD3_STATE:
1607 case REBUILD4_STATE:
1608 case REBUILD5_STATE:
1609 case LEAD_REBUILD1_STATE:
1610 case LEAD_REBUILD2_STATE:
1611 case LEAD_REBUILD3_STATE:
1612 case LEAD_REBUILD4_STATE:
1614 electionOriginator = myHostData.ipAddr;
1615 setState(ELECT1_STATE);
1616 outBuffer[0] = ELECT_LEADER_CMD;
1617 write4(&outBuffer[1], myHostData.ipAddr); //originator = me
1618 udpSendAll(outBuffer, 5);
1622 setState(EXIT2_STATE);
1626 case LEAD_NORMAL1_STATE:
1627 case EXIT2_STATE: //we shouldn't get here
1633 if (state != oldState)
1634 pthread_cond_broadcast(&stateCond);
1635 pthread_mutex_unlock(&stateMutex);