1 /*******************************************************************************
4 * High-performance Distributed Hash Table for finding the location of objects
5 * in a Distributed Shared Transactional Memory system.
10 * 1) Instead of having dhtInsertMult, dhtSearchMult, etc. call their single-key
11 * counterparts repeatedly, define some new messages to handle it more efficiently.
13 * 2) Improve the efficiency of functions that work with hostArray, hostReplied,
14 * and blockOwnerArray.
15 * 3) Currently a join or leave causes a rebuild of the entire hash table.
16 * Implement more graceful join and leave procedures.
17 * 4) Fine tune timeout values for performance, possibly implement a backoff
18 * algorithm to prevent overloading the network.
19 * 5) Whatever else I'm forgetting
21 *******************************************************************************/
22 /*******************************************************************************
24 *******************************************************************************/
26 #include <netinet/in.h>
27 #include <arpa/inet.h>
28 #include <sys/types.h>
29 #include <sys/socket.h>
30 #include <sys/ioctl.h>
41 #include <linux/sockios.h>
42 #include <sys/queue.h>
44 #include "clookup.h" //this works for now, do we need anything better?
47 /*******************************************************************************
48 * Local Defines, Structs
49 *******************************************************************************/
// Size in bytes of the listener's inBuffer/outBuffer and the maximum length
// handed to recvfrom() on the listening socket.
51 #define MAX_MSG_SIZE 1500
// Initial capacity (entries) of hostArray and hostReplied, set in dhtInit().
53 #define INIT_HOST_ALLOC 3
// Initial numBlocks, i.e. entries allocated for blockOwnerArray in dhtInit().
54 #define INIT_NUM_BLOCKS 16
// Interface name dhtInit() passes to getMyIpAddr().
55 #define DEFAULT_INTERFACE "eth0"
// Timeout argument of the poll() on udpPollSock (poll() timeouts are in ms).
56 #define TIMEOUT_PERIOD 100
// dhtInsert(): per-attempt poll() timeout and number of send attempts.
57 #define INSERT_TIMEOUT_MS 500
58 #define INSERT_RETRIES 50
// dhtRemove(): per-attempt poll() timeout and number of send attempts.
59 #define REMOVE_TIMEOUT_MS 500
60 #define REMOVE_RETRIES 50
// dhtSearch(): per-attempt poll() timeout and number of send attempts.
61 #define SEARCH_TIMEOUT_MS 500
62 #define SEARCH_RETRIES 50
65 //make sure this matches msg_types global var
95 //make sure this matches state_names, timeout_vals, and retry_vals global vars
133 unsigned int maxKeyCapacity;
136 /*******************************************************************************
137 * Local Function Prototypes
138 *******************************************************************************/
// Returns nonzero when size is the expected length for the message type.
140 int msgSizeOk(unsigned char *msg, unsigned int size);
// Byte-order helpers: values are stored low byte first in message buffers.
141 unsigned short read2(unsigned char *msg);
142 unsigned int read4(unsigned char *msg);
143 void write2(unsigned char *ptr, unsigned short tmp);
144 void write4(unsigned char *ptr, unsigned int tmp);
// Returns the IPv4 address of the named interface in host byte order.
145 unsigned int getMyIpAddr(const char *interfaceStr);
// Sends msg to destIp (host byte order) on UDP_PORT through udpPollSock.
146 int udpSend(unsigned char *msg, unsigned int size, unsigned int destIp);
// Sends msg to every known host that has not replied yet, except this host.
147 int udpSendAll(unsigned char *msg, unsigned int size);
// Maps a key to a block index (x % numBlocks).
148 unsigned int hash(unsigned int x);
// Returns the IP address of the host that owns the block the key hashes to.
149 unsigned int getKeyOwner(unsigned int key);
// Switches the protocol state and arms the state timer where one is defined.
150 void setState(unsigned int newState);
// Distributes the blocks over the known hosts.
151 void makeAssignments();
// Host table maintenance; hostArray and hostReplied are kept in step.
152 int addHost(struct hostData newHost);
153 int removeHost(unsigned int ipAddr);
154 void removeUnresponsiveHosts();
155 int checkReplied(unsigned int ipAddr);
// Logging helpers that write to logfile.
157 void writeHostList();
158 void dhtLog(const char *format, ...);
162 /*******************************************************************************
164 *******************************************************************************/
166 //make sure this matches enumeration above
// Printable name for each message type, indexed by the type byte (msg[0]);
// used when logging sent and received messages.
167 const char *msg_types[NUM_MSG_TYPES] =
// Printable name for each protocol state, indexed by the state value; used by
// setState() and the timeout log messages.
194 const char *state_names[NUM_STATES] =
199 "LEAD_NORMAL1_STATE",
200 "LEAD_NORMAL2_STATE",
209 "LEAD_REBUILD1_STATE",
210 "LEAD_REBUILD2_STATE",
211 "LEAD_REBUILD3_STATE",
212 "LEAD_REBUILD4_STATE",
217 //note: { 0, 0 } means no timeout
// Timeout applied on entering each state, indexed by state. setState() adds
// the entry to the current time to compute the `timer` deadline, and the
// listener re-adds it on every retry. Entries are positional struct timeval
// initializers; with the usual { tv_sec, tv_usec } member order, { 0, 500000 }
// is half a second and { 10, 0 } is ten seconds.
218 struct timeval timeout_vals[NUM_STATES] =
220 { 0, 500000 }, //INIT1_STATE
221 { 0, 500000 }, //INIT2_STATE
222 { 0, 0 }, //NORMAL_STATE
223 { 0, 0 }, //LEAD_NORMAL1_STATE
224 { 3, 0 }, //LEAD_NORMAL2_STATE
225 { 1, 0 }, //ELECT1_STATE
226 { 1, 0 }, //ELECT2_STATE
227 { 0, 500000 }, //REBUILD0_STATE
228 { 0, 500000 }, //REBUILD1_STATE
229 { 10, 0 }, //REBUILD2_STATE
230 { 10, 0 }, //REBUILD3_STATE
231 { 10, 0 }, //REBUILD4_STATE
232 { 1, 0 }, //REBUILD5_STATE
233 { 1, 0 }, //LEAD_REBUILD1_STATE
234 { 1, 0 }, //LEAD_REBUILD2_STATE
235 { 10, 0 }, //LEAD_REBUILD3_STATE
236 { 10, 0 }, //LEAD_REBUILD4_STATE
237 { 0, 500000 }, //EXIT1_STATE
238 { 0, 0 } //EXIT2_STATE
// Retry budget for each state, indexed by state: when the state timer expires
// the listener retries only while timeoutCntr < retry_vals[state].
241 int retry_vals[NUM_STATES] =
246 0, //LEAD_NORMAL1_STATE
247 0, //LEAD_NORMAL2_STATE
256 10, //LEAD_REBUILD1_STATE
257 10, //LEAD_REBUILD2_STATE
258 10, //LEAD_REBUILD3_STATE
259 10, //LEAD_REBUILD4_STATE
// This host's own entry (ipAddr in host byte order, maxKeyCapacity).
265 struct hostData myHostData;
// Thread running udpListen, started by dhtInit().
266 pthread_t threadUdpListen;
// Thread running fillTask, started by the listener during a rebuild.
267 pthread_t threadFillTask;
//status of fillTask: 0 = ready to run, 1 = running, 2 = completed, 3 = error
// UDP socket bound to UDP_PORT; polled by the listener and used by udpSend().
270 struct pollfd udpPollSock;
// IP of the host that started the election this host is taking part in.
274 unsigned int electionOriginator;
// IP of the peer from which this host received the ELECT_LEADER_CMD.
275 unsigned int electionParent;
// Allocated capacity (entries) of hostArray and hostReplied.
276 unsigned int hostArraySize = 0;
// Known hosts; addHost() keeps the entries in ascending ipAddr order.
277 struct hostData *hostArray = NULL;
// Number of entries in blockOwnerArray; hash() reduces keys modulo this.
278 unsigned int numBlocks = 0;
// For each block, the index into hostArray of the host that owns it.
279 unsigned short *blockOwnerArray = NULL;
// Per-host flag parallel to hostArray; zero means "has not replied yet".
280 unsigned char *hostReplied = NULL;
// stateMutex guards the protocol state and host/block tables; stateCond is
// broadcast by the listener whenever the state changes.
281 pthread_mutex_t stateMutex;
282 pthread_cond_t stateCond;
// Local hash table holding the key/value pairs this host owns.
283 chashtable_t *myHashTable;
// Number of entries of hostArray currently in use.
284 unsigned int numHosts;
// Deadline for the current state, set by setState() and checked by the listener.
285 struct timeval timer;
289 /*******************************************************************************
290 * Interface Function Definitions
291 *******************************************************************************/
// Initializes this DHT node: opens a per-host log file, records this host's
// address and key capacity, allocates the host and block tables, creates the
// local hash table, binds the UDP socket to UDP_PORT and starts the udpListen
// thread.
//   seedIpAddr:     presumably the address of an existing node to contact; the
//                   code below sends WHO_IS_LEADER_CMD to `seed` - TODO confirm
//                   how `seed` is derived from this parameter.
//   maxKeyCapacity: stored in myHostData.maxKeyCapacity.
293 void dhtInit(unsigned int seedIpAddr, unsigned int maxKeyCapacity)
295 struct in_addr tmpAddr;
// NOTE(review): "dht-" (4) + a dotted quad of up to 15 characters + ".log" (4)
// needs 24 bytes including the terminator, but this buffer holds 23; the
// longest addresses overflow it by one byte in the strcat() calls below.
296 char filename[23] = "dht-";
297 struct sockaddr_in myAddr;
298 struct sockaddr_in seedAddr;
299 socklen_t socklen = sizeof(struct sockaddr_in);
// Build the log file name "dht-<this host's dotted-quad address>.log".
302 tmpAddr.s_addr = htonl(getMyIpAddr(DEFAULT_INTERFACE));
303 strcat(filename, inet_ntoa(tmpAddr));
304 strcat(filename, ".log");
305 printf("log file: %s\n", filename);
// NOTE(review): no check of fopen()'s result appears here, and dhtLog() hands
// logfile straight to vfprintf() - confirm a failed open cannot reach it.
307 logfile = fopen(filename, "w");
308 dhtLog("dhtInit(): inializing...\n");
310 myHostData.ipAddr = getMyIpAddr(DEFAULT_INTERFACE);
311 myHostData.maxKeyCapacity = maxKeyCapacity;
315 electionOriginator = 0;
// The host table starts with this host as its only entry.
317 hostArraySize = INIT_HOST_ALLOC;
318 hostArray = calloc(hostArraySize, sizeof(struct hostData));
319 hostReplied = calloc(hostArraySize, sizeof(unsigned char));
320 hostArray[0] = myHostData;
// calloc() zero-fills, so every block initially refers to hostArray[0].
322 numBlocks = INIT_NUM_BLOCKS;
323 blockOwnerArray = calloc(numBlocks, sizeof(unsigned short));
324 pthread_mutex_init(&stateMutex, NULL);
325 pthread_cond_init(&stateCond, NULL);
326 myHashTable = chashCreate(HASH_SIZE, LOADFACTOR);
// Create the UDP socket and bind it to UDP_PORT on all local addresses.
328 udpPollSock.fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
329 if (udpPollSock.fd < 0)
330 perror("dhtInit():socket()");
332 udpPollSock.events = POLLIN;
334 bzero(&myAddr, socklen);
335 myAddr.sin_family = AF_INET;
336 myAddr.sin_addr.s_addr = htonl(INADDR_ANY);
337 myAddr.sin_port = htons(UDP_PORT);
339 if (bind(udpPollSock.fd, (struct sockaddr *)&myAddr, socklen) < 0)
340 perror("dhtInit():bind()");
// Leader path: this host names itself leader and enters LEAD_NORMAL1_STATE.
// (The condition selecting between this path and the next is not shown here.)
344 dhtLog("I am the leader\n");
345 leader = myHostData.ipAddr;
346 setState(LEAD_NORMAL1_STATE);
// Join path: ask the seed who the leader is and wait in INIT1_STATE.
350 initMsg = WHO_IS_LEADER_CMD;
351 udpSend(&initMsg, 1, seed);
352 setState(INIT1_STATE);
// Start the listener thread that drives the protocol state machine.
355 if (pthread_create(&threadUdpListen, NULL, udpListen, NULL) != 0)
356 dhtLog("dhtInit() - ERROR creating threadUdpListen\n");
// Shutdown path (the log message below names it dhtExit): sends a one-byte
// message to the leader, cancels the listener thread, closes the UDP socket
// and releases the block table.
362 { //TODO: do this gracefully, wait for response from leader, etc.
366 udpSend(&msg, 1, leader);
367 dhtLog("dhtExit(): cleaning up...\n");
368 pthread_cancel(threadUdpListen);
369 close(udpPollSock.fd);
372 free(blockOwnerArray);
// Inserts (key, val) on the host that owns key, repeating until that host
// answers OPERATION_OK.
// Each pass waits for a state in which inserts are served, looks up the key's
// owner, and sends a 9-byte INSERT_CMD (type byte, 4-byte key, 4-byte val)
// up to INSERT_RETRIES times, waiting INSERT_TIMEOUT_MS per attempt for a
// 2-byte INSERT_RES. If the pass does not end in OPERATION_OK the node moves
// to REBUILD0_STATE and asks the leader for a rebuild.
// The return statement is not shown here; dhtInsertMult() compares the result
// with 0.
378 int dhtInsert(unsigned int key, unsigned int val)
380 struct sockaddr_in toAddr;
381 struct sockaddr_in fromAddr;
382 socklen_t socklen = sizeof(struct sockaddr_in);
383 struct pollfd pollsock;
391 bzero((char *)&toAddr, socklen);
392 toAddr.sin_family = AF_INET;
393 toAddr.sin_port = htons(UDP_PORT);
395 while (status != OPERATION_OK)
// Block until the state machine is in a state that accepts inserts. The list
// includes REBUILD4_STATE and LEAD_REBUILD3_STATE, the states in which the
// listener starts fillTask.
397 pthread_mutex_lock(&stateMutex);
398 while (!(state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
399 || state == LEAD_NORMAL2_STATE || state == REBUILD4_STATE
400 || state == LEAD_REBUILD3_STATE))
401 pthread_cond_wait(&stateCond, &stateMutex);
// Resolve the owner under the lock so the host/block tables are consistent.
402 toAddr.sin_addr.s_addr = htonl(getKeyOwner(key));
403 pthread_mutex_unlock(&stateMutex);
// NOTE(review): a socket is created on every pass of the outer loop; no
// close() of pollsock.fd appears in the lines shown here - confirm it is released.
405 if ((pollsock.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
407 perror("dhtInsert():socket()");
410 pollsock.events = POLLIN;
412 outBuffer[0] = INSERT_CMD;
413 write4(&outBuffer[1], key);
414 write4(&outBuffer[5], val);
416 for (i = 0; i < INSERT_RETRIES; i++)
418 if (sendto(pollsock.fd, outBuffer, 9, 0, (struct sockaddr *)&toAddr,
421 perror("dhtInsert():sendto()");
424 retval = poll(&pollsock, 1, INSERT_TIMEOUT_MS);
427 perror("dhtInsert():poll()");
// Accept only a 2-byte INSERT_RES coming from the address and port the
// request was sent to.
432 bytesRcvd = recvfrom(pollsock.fd, inBuffer, 2, 0,
433 (struct sockaddr *)&fromAddr, &socklen);
434 if (fromAddr.sin_addr.s_addr == toAddr.sin_addr.s_addr
435 && fromAddr.sin_port == toAddr.sin_port
436 && bytesRcvd == 2 && inBuffer[0] == INSERT_RES)
438 status = inBuffer[1]; //status from remote host
// Anything other than success triggers a rebuild request to the leader.
443 if (status != OPERATION_OK)
445 pthread_mutex_lock(&stateMutex);
446 setState(REBUILD0_STATE);
447 outBuffer[0] = REBUILD_REQ;
448 udpSend(outBuffer, 1, leader);
449 pthread_mutex_unlock(&stateMutex);
// Inserts numKeys pairs by calling dhtInsert(keys[i], vals[i]) once per pair;
// a nonzero result from dhtInsert() is treated as a failure (the handling
// and the return value are not shown here).
458 int dhtInsertMult(unsigned int numKeys, unsigned int *keys, unsigned int *vals)
464 for (i = 0; i < numKeys; i++)
466 if (dhtInsert(keys[i], vals[i]) != 0)
// Removes key from the host that owns it, repeating until that host answers
// OPERATION_OK or KEY_NOT_FOUND.
// Each pass waits for a normal state, looks up the key's owner, and sends a
// 5-byte REMOVE_CMD (type byte, 4-byte key) up to REMOVE_RETRIES times,
// waiting REMOVE_TIMEOUT_MS per attempt for a 2-byte REMOVE_RES. Any other
// outcome moves the node to REBUILD0_STATE and asks the leader for a rebuild.
// The return statement is not shown here; dhtRemoveMult() compares the result
// with 0.
472 int dhtRemove(unsigned int key)
474 struct sockaddr_in toAddr;
475 struct sockaddr_in fromAddr;
476 socklen_t socklen = sizeof(struct sockaddr_in);
477 struct pollfd pollsock;
485 bzero((char *)&toAddr, socklen);
486 toAddr.sin_family = AF_INET;
487 toAddr.sin_port = htons(UDP_PORT);
489 while (!(status == OPERATION_OK || status == KEY_NOT_FOUND))
// Removes are only issued in the three normal states (no rebuild states).
491 pthread_mutex_lock(&stateMutex);
492 while (!(state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
493 || state == LEAD_NORMAL2_STATE))
494 pthread_cond_wait(&stateCond, &stateMutex);
495 toAddr.sin_addr.s_addr = htonl(getKeyOwner(key));
496 pthread_mutex_unlock(&stateMutex);
// NOTE(review): a socket is created on every pass of the outer loop; no
// close() of pollsock.fd appears in the lines shown here - confirm it is released.
498 if ((pollsock.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
500 perror("dhtRemove():socket()");
503 pollsock.events = POLLIN;
505 outBuffer[0] = REMOVE_CMD;
506 write4(&outBuffer[1], key);
508 for (i = 0; i < REMOVE_RETRIES; i++)
510 if (sendto(pollsock.fd, outBuffer, 5, 0, (struct sockaddr *)&toAddr,
513 perror("dhtRemove():sendto()");
516 retval = poll(&pollsock, 1, REMOVE_TIMEOUT_MS);
519 perror("dhtRemove():poll()");
// Accept only a 2-byte REMOVE_RES coming from the address and port the
// request was sent to.
524 bytesRcvd = recvfrom(pollsock.fd, inBuffer, 2, 0,
525 (struct sockaddr *)&fromAddr, &socklen);
526 if (fromAddr.sin_addr.s_addr == toAddr.sin_addr.s_addr
527 && fromAddr.sin_port == toAddr.sin_port
528 && bytesRcvd == 2 && inBuffer[0] == REMOVE_RES)
530 status = inBuffer[1]; //status from remote host
535 if (!(status == OPERATION_OK || status == KEY_NOT_FOUND))
537 pthread_mutex_lock(&stateMutex);
538 setState(REBUILD0_STATE);
539 outBuffer[0] = REBUILD_REQ;
540 udpSend(outBuffer, 1, leader);
541 pthread_mutex_unlock(&stateMutex);
// Removes numKeys keys by calling dhtRemove(keys[i]) once per key; a nonzero
// result from dhtRemove() is treated as a failure (the handling and the
// return value are not shown here).
550 int dhtRemoveMult(unsigned int numKeys, unsigned int *keys)
556 for (i = 0; i < numKeys; i++)
558 if (dhtRemove(keys[i]) != 0)
// Looks up key on the host that owns it and stores the value found in *val,
// repeating until that host answers OPERATION_OK or KEY_NOT_FOUND.
// Each pass looks up the key's owner and sends a 5-byte SEARCH_CMD (type
// byte, 4-byte key) up to SEARCH_RETRIES times, waiting SEARCH_TIMEOUT_MS per
// attempt for a 6-byte SEARCH_RES (type, status, 4-byte value). Any other
// outcome moves the node to REBUILD0_STATE and asks the leader for a rebuild.
// The return statement is not shown here; dhtSearchMult() compares the result
// with 0.
564 int dhtSearch(unsigned int key, unsigned int *val)
566 struct sockaddr_in toAddr;
567 struct sockaddr_in fromAddr;
568 socklen_t socklen = sizeof(struct sockaddr_in);
569 struct pollfd pollsock;
577 bzero((char *)&toAddr, socklen);
578 toAddr.sin_family = AF_INET;
579 toAddr.sin_port = htons(UDP_PORT);
581 while (!(status == OPERATION_OK || status == KEY_NOT_FOUND))
// Unlike insert and remove, a search only waits for the block table to be
// non-empty (hash() divides by numBlocks); it does not wait on the state.
583 pthread_mutex_lock(&stateMutex);
584 while (numBlocks == 0)
585 pthread_cond_wait(&stateCond, &stateMutex);
586 toAddr.sin_addr.s_addr = htonl(getKeyOwner(key));
587 pthread_mutex_unlock(&stateMutex);
// NOTE(review): a socket is created on every pass of the outer loop; no
// close() of pollsock.fd appears in the lines shown here - confirm it is released.
589 if ((pollsock.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
591 perror("dhtSearch():socket()");
594 pollsock.events = POLLIN;
596 outBuffer[0] = SEARCH_CMD;
597 write4(&outBuffer[1], key);
599 for (i = 0; i < SEARCH_RETRIES; i++)
601 if (sendto(pollsock.fd, outBuffer, 5, 0, (struct sockaddr *)&toAddr,
604 perror("dhtSearch():sendto()");
607 retval = poll(&pollsock, 1, SEARCH_TIMEOUT_MS);
610 perror("dhtSearch():poll()");
// Accept only a 6-byte SEARCH_RES coming from the address and port the
// request was sent to; *val is written whatever the reported status is.
615 bytesRcvd = recvfrom(pollsock.fd, inBuffer, 6, 0,
616 (struct sockaddr *)&fromAddr, &socklen);
617 if (fromAddr.sin_addr.s_addr == toAddr.sin_addr.s_addr
618 && fromAddr.sin_port == toAddr.sin_port
619 && bytesRcvd == 6 && inBuffer[0] == SEARCH_RES)
621 status = inBuffer[1]; //status from remote host
622 *val = read4(&inBuffer[2]);
627 if (!(status == OPERATION_OK || status == KEY_NOT_FOUND))
629 pthread_mutex_lock(&stateMutex);
630 setState(REBUILD0_STATE);
631 outBuffer[0] = REBUILD_REQ;
632 udpSend(outBuffer, 1, leader);
633 pthread_mutex_unlock(&stateMutex);
// Looks up numKeys keys by calling dhtSearch(keys[i], &vals[i]) once per key,
// so vals must have room for numKeys results; a nonzero result from
// dhtSearch() is treated as a failure (the handling and the return value are
// not shown here).
642 int dhtSearchMult(unsigned int numKeys, unsigned int *keys, unsigned int *vals)
646 for (i = 0; i < numKeys; i++)
648 if (dhtSearch(keys[i], &vals[i]) != 0)
654 /*******************************************************************************
655 * Local Function Definitions
656 *******************************************************************************/
// Checks that a received message has the length its type requires; the
// listener drops messages for which this returns zero.
//   msg:  message bytes (the switch over the message type is not shown here).
//   size: number of bytes received.
// The three variable-length layouts below carry host and block counts in the
// message itself, and the expected size is computed from those counts.
// NOTE(review): the counts are read from msg before size is known to cover
// them - confirm short messages cannot reach these cases with stale bytes.
658 int msgSizeOk(unsigned char *msg, unsigned int size)
660 unsigned short tmpNumHosts;
661 unsigned short tmpNumBlocks;
668 case WHO_IS_LEADER_CMD:
676 case RESUME_NORMAL_CMD:
677 case RESUME_NORMAL_RES:
685 case WHO_IS_LEADER_RES:
687 case ELECT_LEADER_CMD:
// 5-byte header (type, host count, block count), then the host records and
// 2 bytes per block; this is the layout written for DHT_UPDATE_CMD.
696 tmpNumHosts = read2(&msg[1]);
697 tmpNumBlocks = read2(&msg[3]);
698 return (size == (5+sizeof(struct hostData)*tmpNumHosts+2*tmpNumBlocks));
// 4-byte header with the host count at offset 2, then the host records.
699 case ELECT_LEADER_RES:
706 tmpNumHosts = read2(&msg[2]);
707 return (size == (4 + sizeof(struct hostData) * tmpNumHosts));
// 3-byte header with the host count at offset 1, then the host records; this
// is the layout written for CONGRATS_CMD.
711 tmpNumHosts = read2(&msg[1]);
712 return (size == (3 + sizeof(struct hostData) * tmpNumHosts));
// Decodes a 16-bit value stored low byte first: ptr[0] supplies bits 0-7 and
// ptr[1] supplies bits 8-15.
718 unsigned short read2(unsigned char *ptr)
720 unsigned short tmp = (ptr[1] << 8) | ptr[0];
// Decodes a 32-bit value stored low byte first: ptr[0] supplies bits 0-7 and
// ptr[3] supplies bits 24-31. ptr must point to at least four readable bytes.
unsigned int read4(unsigned char *ptr)
{
	// Widen each byte to unsigned int before shifting. An unsigned char
	// promotes to (signed) int, and shifting a value of 0x80 or more left by
	// 24 would overflow that int, which is undefined behaviour.
	unsigned int tmp = ((unsigned int)ptr[3] << 24)
		| ((unsigned int)ptr[2] << 16)
		| ((unsigned int)ptr[1] << 8)
		| (unsigned int)ptr[0];
	return tmp;
}
// Stores a 16-bit value into the buffer at ptr. The store shown here puts
// bits 8-15 of tmp at ptr[1], matching the byte order read2() decodes.
730 void write2(unsigned char *ptr, unsigned short tmp)
732 ptr[1] = (tmp >> 8) & 0xFF;
// Stores a 32-bit value into the buffer at ptr. The stores shown here put
// bits 24-31 at ptr[3], bits 16-23 at ptr[2] and bits 8-15 at ptr[1],
// matching the byte order read4() decodes.
737 void write4(unsigned char *ptr, unsigned int tmp)
739 ptr[3] = (tmp >> 24) & 0xFF;
740 ptr[2] = (tmp >> 16) & 0xFF;
741 ptr[1] = (tmp >> 8) & 0xFF;
// Returns the IPv4 address assigned to the named network interface, in host
// byte order, by issuing a SIOCGIFADDR ioctl on a temporary socket.
//   interfaceStr: interface name, e.g. DEFAULT_INTERFACE.
// The values returned on the socket() and ioctl() error paths are not shown here.
746 unsigned int getMyIpAddr(const char *interfaceStr)
749 struct ifreq interfaceInfo;
// myAddr aliases the address field of the request, which the ioctl fills in.
750 struct sockaddr_in *myAddr = (struct sockaddr_in *)&interfaceInfo.ifr_addr;
752 memset(&interfaceInfo, 0, sizeof(struct ifreq));
// NOTE(review): no close(sock) appears in the lines shown here - confirm the
// descriptor is released on every path.
754 if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
756 perror("getMyIpAddr():socket()");
// NOTE(review): strcpy() is unbounded; a name longer than ifr_name would
// overflow it. The callers in this file pass DEFAULT_INTERFACE.
760 strcpy(interfaceInfo.ifr_name, interfaceStr);
761 myAddr->sin_family = AF_INET;
763 if(ioctl(sock, SIOCGIFADDR, &interfaceInfo) != 0)
765 perror("getMyIpAddr():ioctl()");
769 return ntohl(myAddr->sin_addr.s_addr);
// Sends one datagram through the listening socket (udpPollSock) to UDP_PORT
// on the given host, logging the message type first.
//   msg:    message bytes; msg[0] is the message type.
//   size:   number of bytes to send.
//   destIp: destination IPv4 address in host byte order.
// The return values are not shown here; udpSendAll() treats nonzero as failure.
772 int udpSend(unsigned char *msg, unsigned int size, unsigned int destIp)
774 struct sockaddr_in peerAddr;
775 socklen_t socklen = sizeof(struct sockaddr_in);
777 bzero(&peerAddr, socklen);
778 peerAddr.sin_family = AF_INET;
779 peerAddr.sin_addr.s_addr = htonl(destIp);
780 peerAddr.sin_port = htons(UDP_PORT);
// Log the symbolic type name when msg[0] is a valid index into msg_types.
784 if (msg[0] < NUM_MSG_TYPES)
785 dhtLog("udpSend(): sending %s to %s, %d bytes\n", msg_types[msg[0]],
786 inet_ntoa(peerAddr.sin_addr), size);
788 dhtLog("udpSend(): sending unknown message to %s, %d bytes\n",
789 inet_ntoa(peerAddr.sin_addr), size);
792 if (sendto(udpPollSock.fd, (void *)msg, size, 0, (struct sockaddr *)&peerAddr,
795 perror("udpSend():sendto()");
// Sends msg to every host in hostArray whose hostReplied flag is still zero,
// skipping this host's own entry. A nonzero result from udpSend() is treated
// as a failure (the handling and the return value are not shown here).
802 int udpSendAll(unsigned char *msg, unsigned int size)
806 for (i = 0; i < numHosts; i++)
808 if ((hostReplied[i] == 0) && (hostArray[i].ipAddr != myHostData.ipAddr))
810 if (udpSend(msg, size, hostArray[i].ipAddr) != 0)
// Maps x to a block index in the range [0, numBlocks).
817 //note: make sure this is only executed in a valid state, where numBlocks != 0
818 unsigned int hash(unsigned int x)
// A zero numBlocks would make this modulo a division by zero.
820 return (x % numBlocks);
// Returns the IP address (as stored in hostArray) of the host that owns key.
823 //note: make sure this is only executed in a valid state, where these arrays
824 // are allocated and the index mappings are consistent
825 unsigned int getKeyOwner(unsigned int key)
// key -> block index -> owning host's index in hostArray -> that host's address.
827 return hostArray[blockOwnerArray[hash(key)]].ipAddr;
// Validates newState, arms the state timer from timeout_vals, and logs the
// name of the resulting state.
830 //sets state and timer, if applicable
831 void setState(unsigned int newState)
836 gettimeofday(&now, NULL);
// Reject values that would index past the per-state tables.
838 if (newState >= NUM_STATES)
840 dhtLog("setState(): ERROR: invalid state %d\n", newState);
// An all-zero entry is handled separately (that branch is not shown here);
// otherwise the deadline is the current time plus the state's timeout.
844 if (timeout_vals[newState].tv_sec == 0
845 && timeout_vals[newState].tv_usec == 0)
851 timeradd(&now, &timeout_vals[newState], &timer);
856 //TODO: only do this for states that require it
// Presumably resets the per-host reply flags - the loop body is not shown here.
857 for (i = 0; i < numHosts; i++)
860 dhtLog("setState(): state set to %s\n", state_names[state]);
866 //TODO: improve these simple and inefficient functions
// Looks up ipAddr in the host table; presumably marks that host as having
// replied - the rest of the body and the return value are not shown here.
867 int checkReplied(unsigned int ipAddr)
871 i = findHost(ipAddr);
// Scan for any host other than this one whose reply flag is still zero.
// NOTE(review): this loop presumably belongs to allReplied(), which the
// listener calls; its header is not shown here - confirm.
885 for (i = 0; i < numHosts; i++)
886 if ((hostReplied[i] == 0) && (hostArray[i].ipAddr != myHostData.ipAddr))
// Linear search of hostArray for the entry whose ipAddr equals the argument.
// Returns the entry's index, or -1 when no host has that address.
892 int findHost(unsigned int ipAddr)
896 for (i = 0; i < numHosts; i++)
897 if (hostArray[i].ipAddr == ipAddr)
898 return i; //found, return index
900 return -1; //not found
// Removes the host with the given address from hostArray/hostReplied and
// renumbers blockOwnerArray to match. The handling of an unknown address, the
// update of numHosts and the return value are not shown here.
903 int removeHost(unsigned int ipAddr)
907 i = findHost(ipAddr);
// Blocks owned by the removed host fall back to index 0; owners stored after
// it move down one slot, so their indices are decremented.
912 for (j = 0; j < numBlocks; j++)
914 if (blockOwnerArray[j] == i)
915 blockOwnerArray[j] = 0; //TODO: is this what I want to have happen?
916 else if (blockOwnerArray[j] > i)
917 blockOwnerArray[j]--;
// Close the gap by shifting every later entry of both arrays down by one.
920 for (; i < numHosts - 1; i++)
922 hostArray[i] = hostArray[i+1];
923 hostReplied[i] = hostReplied[i+1];
930 void removeUnresponsiveHosts()
934 for (i = 0; i < numHosts; i++)
936 if (!hostReplied[i] && hostArray[i].ipAddr != myHostData.ipAddr)
937 removeHost(hostArray[i].ipAddr);
// Adds newHost to the host table, keeping hostArray in ascending ipAddr order
// and hostReplied in step with it.
//  - An entry with the same ipAddr is overwritten in place.
//  - Otherwise the host is inserted before the first entry with a larger
//    ipAddr, and every blockOwnerArray index at or after that slot is bumped.
//  - If no entry is larger, the host is appended.
// Both arrays double in capacity when full. The update of numHosts and the
// return values are not shown here.
// NOTE(review): the calloc() results are used without a check, and no free()
// of the old arrays appears in the lines shown here - confirm.
941 int addHost(struct hostData newHost)
943 struct hostData *newHostArray;
944 unsigned char *newHostReplied;
948 for (i = 0; i < numHosts; i++)
// Known host: refresh its record.
950 if (hostArray[i].ipAddr == newHost.ipAddr)
952 hostArray[i] = newHost;
// First entry with a larger address: newHost belongs at index i.
956 else if (hostArray[i].ipAddr > newHost.ipAddr)
// Table full: build arrays of twice the capacity, copying the entries before
// i, then newHost (reply flag cleared), then the entries from i onward.
958 if (numHosts == hostArraySize)
960 newHostArray = calloc(2 * hostArraySize, sizeof(struct hostData));
961 newHostReplied = calloc(2 * hostArraySize, sizeof(unsigned char));
962 memcpy(newHostArray, hostArray, (i * sizeof(struct hostData)));
963 memcpy(newHostReplied, hostReplied, (i * sizeof(unsigned char)));
964 newHostArray[i] = newHost;
965 newHostReplied[i] = 0;
966 memcpy(&newHostArray[i+1], &hostArray[i], ((numHosts - i) *
967 sizeof(struct hostData)));
968 memcpy(&newHostReplied[i+1], &hostReplied[i], ((numHosts - i) *
969 sizeof(unsigned char)));
972 hostArray = newHostArray;
973 hostReplied = newHostReplied;
974 hostArraySize = 2 * hostArraySize;
// Room available: shift the tail up one slot, from the end backwards, and
// drop newHost into the gap.
978 for (j = numHosts; j > i; j--)
980 hostArray[j] = hostArray[j-1];
981 hostReplied[j] = hostReplied[j-1];
983 hostArray[i] = newHost;
// Owners stored at index i or later moved up one slot; follow them.
986 for(j = 0; j < numBlocks; j++)
988 if (blockOwnerArray[j] >= i)
989 blockOwnerArray[j]++;
996 //nothing greater, add to end
// Table full: double the capacity and copy every existing entry across.
997 if (numHosts == hostArraySize)
999 newHostArray = calloc(2 * hostArraySize, sizeof(struct hostData));
1000 newHostReplied = calloc(2 * hostArraySize, sizeof(unsigned char));
1001 memcpy(newHostArray, hostArray, (numHosts * sizeof(struct hostData)));
1002 memcpy(newHostReplied, hostReplied, (numHosts * sizeof(unsigned char)));
1005 hostArray = newHostArray;
1006 hostReplied = newHostReplied;
1007 hostArraySize = 2 * hostArraySize;
1010 hostArray[numHosts] = newHost;
1011 hostReplied[numHosts] = 0;
// Assigns every block to a host in round-robin order (block i goes to host
// i % numHosts), first enlarging the block table if there are fewer blocks
// than hosts.
// NOTE(review): numHosts == 0 would make the modulo below a division by zero;
// dhtInit() always stores this host at hostArray[0] - confirm numHosts >= 1.
1016 void makeAssignments()
// Too few blocks: discard the table, grow numBlocks until it is at least
// numHosts (the growth step is not shown here) and allocate a new table.
1020 if (numBlocks < numHosts)
1022 free(blockOwnerArray);
1023 while (numBlocks < numHosts)
1025 blockOwnerArray = calloc(numBlocks, sizeof(unsigned short));
1028 for (i = 0; i < numBlocks; i++)
1029 blockOwnerArray[i] = i % numHosts;
// Writes the host table to logfile: the host count, then one line per host
// with its index, dotted-quad address and maxKeyCapacity.
1034 void writeHostList()
1037 struct in_addr tmpAddr;
1039 fprintf(logfile, "numHosts = %d\n", numHosts);
1040 for (i = 0; i < numHosts; i++)
// Addresses are kept in host byte order; inet_ntoa() expects network order.
1042 tmpAddr.s_addr = htonl(hostArray[i].ipAddr);
1043 fprintf(logfile, "%d) %s, %d\n", i, inet_ntoa(tmpAddr),
1044 hostArray[i].maxKeyCapacity);
// printf-style logging: formats the arguments into logfile and flushes the
// stream after every message. Failures of vfprintf() or fflush() are reported
// with perror().
// NOTE(review): logfile is used without a NULL check, and no va_end() appears
// in the lines shown here - confirm both.
1049 void dhtLog(const char *format, ...)
1052 // struct timeval now;
1054 // if (gettimeofday(&now, NULL) < 0)
1055 // { perror("dhtLog():gettimeofday()"); }
1056 va_start(args, format);
1057 // if (fprintf(logfile, "%d.%06d:", now.tv_sec, now.tv_usec) < 0)
1058 // { perror("dhtLog():fprintf()"); }
1059 if (vfprintf(logfile, format, args) < 0)
1060 { perror("dhtLog():vfprintf()"); }
1061 if (fflush(logfile) == EOF)
1062 { perror("dhtLog():fflush()"); }
// NOTE(review): presumably the body of fillTask, the routine the listener
// starts on threadFillTask - its header is not shown here, confirm.
// Re-publishes this host's entries: every key held by the local mhash table
// is inserted into the DHT as a value, under a key equal to this host's
// address.
1072 unsigned int numKeys;
1075 vals = mhashGetKeys(&numKeys); //note: key of mhash is val of dht
// NOTE(review): the calloc() result is used without a check, and no free()
// of keys appears in the lines shown here - confirm.
1076 keys = calloc(numKeys, sizeof(unsigned int));
1078 for (i = 0; i < numKeys; i++)
1079 keys[i] = myHostData.ipAddr;
// dhtInsertMult() returning 0 is the success case.
1081 if (dhtInsertMult(numKeys, keys, vals) == 0)
1092 struct sockaddr_in peerAddr;
1093 unsigned int peerIp;
1094 socklen_t socklen = sizeof(struct sockaddr_in);
1095 unsigned char inBuffer[MAX_MSG_SIZE];
1096 unsigned char outBuffer[MAX_MSG_SIZE];
1099 struct in_addr tmpAddr;
1100 struct hostData tmpHost;
1101 unsigned int tmpKey;
1102 unsigned int tmpVal;
1103 struct hostData *hostDataPtr;
1104 unsigned short *uShortPtr;
1105 unsigned int tmpUInt;
1106 unsigned int tmpUShort;
1108 unsigned int oldState;
1110 dhtLog("udpListen(): linstening on port %d...\n", UDP_PORT);
1114 pollret = poll(&udpPollSock, 1, TIMEOUT_PERIOD);
1115 pthread_mutex_lock(&stateMutex);
1119 perror("udpListen():poll()");
1121 else if (pollret > 0)
1123 bytesRcvd = recvfrom(udpPollSock.fd, inBuffer, MAX_MSG_SIZE, 0,
1124 (struct sockaddr *)&peerAddr, &socklen);
1127 dhtLog("udpListen(): ERROR: bytesRcvd = %d\n", bytesRcvd);
1129 else if (inBuffer[0] >= NUM_MSG_TYPES)
1131 dhtLog("udpListen(): ERROR: unknown msg type = %d\n", inBuffer[0]);
1133 else if (!msgSizeOk(inBuffer, bytesRcvd))
1135 dhtLog("udpListen(): ERROR: msg size not ok: type = %s\n, size = %d\n",
1136 msg_types[inBuffer[0]], bytesRcvd);
1138 else if (state == EXIT2_STATE)
1142 else if (state == INIT1_STATE)
1143 { //after initialization with seed, do not proceed until seed replies
1144 dhtLog("udpListen(): received %s from %s, %d bytes\n",
1145 msg_types[inBuffer[0]], inet_ntoa(peerAddr.sin_addr), bytesRcvd);
1146 for (i = 0; i < bytesRcvd; i++)
1147 dhtLog(" %x", inBuffer[i]);
1149 peerIp = ntohl(peerAddr.sin_addr.s_addr);
1150 if (peerIp == seed && inBuffer[0] == WHO_IS_LEADER_RES)
1152 tmpHost.ipAddr = peerIp;
1153 tmpHost.maxKeyCapacity = 0;
1156 leader = read4(&inBuffer[1]);
1157 tmpAddr.s_addr = htonl(leader);
1158 dhtLog("leader = %s\n", inet_ntoa(tmpAddr));
1161 setState(INIT2_STATE);
1162 outBuffer[0] = JOIN_REQ;
1163 write4(&outBuffer[1], myHostData.maxKeyCapacity);
1164 udpSend(outBuffer, 5, leader);
1168 electionOriginator = myHostData.ipAddr;
1169 setState(ELECT1_STATE);
1170 outBuffer[0] = ELECT_LEADER_CMD;
1171 write4(&outBuffer[1], myHostData.ipAddr); //originator = me
1172 udpSendAll(outBuffer, 5);
1178 dhtLog("udpListen(): received %s from %s, %d bytes\n",
1179 msg_types[inBuffer[0]], inet_ntoa(peerAddr.sin_addr), bytesRcvd);
1180 for (i = 0; i < bytesRcvd; i++)
1181 dhtLog(" %x", inBuffer[i]);
1183 peerIp = ntohl(peerAddr.sin_addr.s_addr);
1184 switch (inBuffer[0])
1187 if (state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
1188 || state == LEAD_NORMAL2_STATE || state == REBUILD4_STATE
1189 || state == REBUILD5_STATE || state == LEAD_REBUILD3_STATE)
1191 tmpKey = read4(&inBuffer[1]);
1192 tmpVal = read4(&inBuffer[5]);
1193 outBuffer[0] = INSERT_RES;
1194 if (getKeyOwner(tmpKey) == myHostData.ipAddr)
1196 if (chashInsert(myHashTable, tmpKey, (void *)tmpVal) == 0)
1197 outBuffer[1] = OPERATION_OK;
1199 outBuffer[1] = INTERNAL_ERROR;
1203 outBuffer[1] = NOT_KEY_OWNER;
1205 //reply to client socket
1206 sendto(udpPollSock.fd, outBuffer, 2, 0,
1207 (struct sockaddr *)&peerAddr, socklen);
1211 if (state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
1212 || state == LEAD_NORMAL2_STATE)
1214 tmpKey = read4(&inBuffer[1]);
1215 outBuffer[0] = REMOVE_RES;
1216 if (getKeyOwner(tmpKey) == myHostData.ipAddr)
1218 if (chashRemove(myHashTable, tmpKey) == 0)
1219 outBuffer[1] = OPERATION_OK;
1221 outBuffer[1] = KEY_NOT_FOUND;
1225 outBuffer[1] = NOT_KEY_OWNER;
1227 //reply to client socket
1228 sendto(udpPollSock.fd, outBuffer, 2, 0,
1229 (struct sockaddr *)&peerAddr, socklen);
1233 if (state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
1234 || state == LEAD_NORMAL2_STATE)
1236 tmpKey = read4(&inBuffer[1]);
1237 outBuffer[0] = SEARCH_RES;
1238 if (getKeyOwner(tmpKey) == myHostData.ipAddr)
1240 if ((tmpVal = (unsigned int)chashSearch(myHashTable, tmpKey)) != 0)
1242 outBuffer[1] = OPERATION_OK;
1243 write4(&outBuffer[2], tmpVal);
1247 outBuffer[1] = KEY_NOT_FOUND;
1248 write4(&outBuffer[2], 0);
1253 outBuffer[1] = NOT_KEY_OWNER;
1254 write4(&outBuffer[2], 0);
1256 //reply to client socket
1257 sendto(udpPollSock.fd, outBuffer, 6, 0,
1258 (struct sockaddr *)&peerAddr, socklen);
1261 case WHO_IS_LEADER_CMD:
1262 tmpHost.ipAddr = peerIp;
1263 tmpHost.maxKeyCapacity = 0;
1266 outBuffer[0] = WHO_IS_LEADER_RES;
1267 //leader == 0 means I don't know who it is
1268 write4(&outBuffer[1], leader);
1269 udpSend(outBuffer, 5, peerIp);
1272 if (state == LEAD_NORMAL1_STATE || state == LEAD_NORMAL2_STATE)
1274 tmpHost.ipAddr = peerIp;
1275 tmpHost.maxKeyCapacity = read4(&inBuffer[1]);
1278 if (state == LEAD_NORMAL1_STATE)
1279 setState(LEAD_NORMAL2_STATE);
1280 outBuffer[0] = JOIN_RES;
1281 outBuffer[1] = 0; //status, success
1282 udpSend(outBuffer, 2, peerIp);
1284 else if (state == LEAD_REBUILD1_STATE)
1286 //note: I don't need to addHost().
1287 checkReplied(peerIp);
1288 outBuffer[0] = JOIN_RES;
1289 outBuffer[1] = 0; //status, success
1290 udpSend(outBuffer, 2, peerIp);
1294 setState(LEAD_REBUILD2_STATE);
1295 outBuffer[0] = DHT_UPDATE_CMD;
1296 write2(&outBuffer[1], numHosts);
1297 write2(&outBuffer[3], numBlocks);
1298 memcpy(&outBuffer[5], hostArray, numHosts*sizeof(struct hostData));
1299 memcpy(&outBuffer[5+numHosts*sizeof(struct hostData)],
1300 blockOwnerArray, numBlocks*2);
1301 udpSendAll(outBuffer, 5 + sizeof(struct hostData) * numHosts
1307 if (state == REBUILD1_STATE)
1309 setState(REBUILD2_STATE);
1311 else if (state == INIT2_STATE)
1313 setState(NORMAL_STATE);
1317 if (state == LEAD_NORMAL1_STATE || state == LEAD_NORMAL2_STATE)
1318 { //TODO: make this graceful, instead of just rebuilding
1320 if (state != LEAD_NORMAL2_STATE)
1321 setState(LEAD_NORMAL2_STATE);
1324 case DHT_UPDATE_CMD:
1325 if (state == REBUILD2_STATE && peerIp == leader)
1328 free(blockOwnerArray);
1329 numHosts = read2(&inBuffer[1]);
1330 numBlocks = read2(&inBuffer[3]);
1331 while (hostArraySize < numHosts)
1333 hostArray = calloc(hostArraySize, sizeof(struct hostData));
1334 blockOwnerArray = calloc(numBlocks, 2);
1335 memcpy(hostArray, &inBuffer[5], numHosts*sizeof(struct hostData));
1336 memcpy(blockOwnerArray, &inBuffer[5+numHosts*sizeof(struct hostData)], numBlocks*2);
1338 setState(REBUILD3_STATE);
1339 outBuffer[0] = DHT_UPDATE_RES;
1340 udpSend(outBuffer, 1, peerIp);
1343 case DHT_UPDATE_RES:
1344 if (state == LEAD_REBUILD2_STATE)
1346 checkReplied(peerIp);
1349 setState(LEAD_REBUILD3_STATE);
1350 outBuffer[0] = FILL_DHT_CMD;
1351 udpSendAll(outBuffer, 1);
1352 if (fillStatus != 0)
1353 dhtLog("udpListen(): ERROR: fillTask already running\n");
1355 if (pthread_create(&threadFillTask, NULL, fillTask, NULL) != 0)
1356 dhtLog("udpListen(): ERROR creating threadFillTask\n");
1360 case ELECT_LEADER_CMD:
1361 tmpUInt = read4(&inBuffer[1]);
1362 if ((state == ELECT1_STATE || state == ELECT2_STATE)
1363 && tmpUInt >= electionOriginator)
1364 { //already participating in a higher-priority election
1365 outBuffer[0] = ELECT_LEADER_RES;
1366 outBuffer[1] = 0xFF;
1367 udpSend(outBuffer, 2, peerIp);
1371 electionOriginator = tmpUInt;
1372 electionParent = peerIp;
1373 setState(ELECT1_STATE);
1374 outBuffer[0] = ELECT_LEADER_CMD;
1375 write4(&outBuffer[1], electionOriginator);
1376 //don't bother forwarding the message to originator or parent
1377 checkReplied(electionOriginator);
1378 checkReplied(electionParent);
1380 { //in case that is everybody I know of
1381 setState(ELECT2_STATE);
1382 outBuffer[0] = ELECT_LEADER_RES;
1384 write2(&outBuffer[2], numHosts);
1385 memcpy(&outBuffer[4], hostArray, sizeof(struct hostData)
1387 udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
1392 udpSendAll(outBuffer, 5);
1396 case ELECT_LEADER_RES:
1397 if (state == ELECT1_STATE)
1399 checkReplied(peerIp);
1400 if (inBuffer[1] != 0xFF)
1402 tmpUShort = read2(&inBuffer[2]);
1403 hostDataPtr = (struct hostData *)&inBuffer[4];
1404 for (i = 0; i < tmpUShort; i++)
1405 addHost(hostDataPtr[i]);
1410 setState(ELECT2_STATE);
1411 if (electionOriginator == myHostData.ipAddr)
1413 leader = hostArray[0].ipAddr;
1414 if (leader == myHostData.ipAddr)
1416 dhtLog("I am the leader!\n");
1417 setState(LEAD_REBUILD1_STATE);
1418 outBuffer[0] = REBUILD_CMD;
1419 udpSendAll(outBuffer, 1);
1423 outBuffer[0] = CONGRATS_CMD;
1424 write2(&outBuffer[1], numHosts);
1425 hostDataPtr = (struct hostData *)&outBuffer[3];
1426 for (i = 0; i < numHosts; i++)
1427 hostDataPtr[i] = hostArray[i];
1428 udpSend(outBuffer, 3 + sizeof(struct hostData) * numHosts,
1434 outBuffer[0] = ELECT_LEADER_RES;
1436 write2(&outBuffer[2], numHosts);
1437 hostDataPtr = (struct hostData *)&outBuffer[4];
1438 for (i = 0; i < numHosts; i++)
1439 hostDataPtr[i] = hostArray[i];
1440 udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
1447 if (state == ELECT2_STATE)
1449 leader = myHostData.ipAddr;
1450 dhtLog("I am the leader!\n");
1451 tmpUShort = read2(&inBuffer[1]);
1452 hostDataPtr = (struct hostData *)&inBuffer[3];
1453 for (i = 0; i < tmpUShort; i++)
1454 addHost(hostDataPtr[i]);
1456 setState(LEAD_REBUILD1_STATE);
1457 outBuffer[0] = REBUILD_CMD;
1458 udpSendAll(outBuffer, 1);
1462 if (state == LEAD_NORMAL1_STATE || state == LEAD_NORMAL2_STATE)
1464 setState(LEAD_REBUILD1_STATE);
1465 outBuffer[0] = REBUILD_CMD;
1466 udpSendAll(outBuffer, 1);
1470 leader = peerIp; //consider this a declaration of authority
1471 setState(REBUILD1_STATE);
1472 outBuffer[0] = JOIN_REQ;
1473 write4(&outBuffer[1], myHostData.maxKeyCapacity);
1474 udpSend(outBuffer, 5, leader);
1477 if (state == REBUILD3_STATE && peerIp == leader)
1479 setState(REBUILD4_STATE);
1480 if (fillStatus != 0)
1481 dhtLog("udpListen(): ERROR: fillTask already running\n");
1483 if (pthread_create(&threadFillTask, NULL, fillTask, NULL) != 0)
1484 dhtLog("udpListen(): ERROR creating threadFillTask\n");
1488 if (state == LEAD_REBUILD3_STATE)
1490 checkReplied(peerIp);
1491 if (allReplied() && fillStatus == 2)
1494 setState(LEAD_REBUILD4_STATE);
1495 outBuffer[0] = RESUME_NORMAL_CMD;
1496 udpSendAll(outBuffer, 1);
1500 case RESUME_NORMAL_CMD:
1501 if (state == REBUILD5_STATE && peerIp == leader)
1503 setState(NORMAL_STATE);
1504 outBuffer[0] = RESUME_NORMAL_RES;
1505 udpSend(outBuffer, 1, leader);
1508 case RESUME_NORMAL_RES:
1509 if (state == LEAD_REBUILD4_STATE)
1511 checkReplied(peerIp);
1514 setState(LEAD_NORMAL1_STATE);
1521 if (state == REBUILD4_STATE)
1525 case 0: dhtLog("udpListen(): ERROR: fillStatus=0 in REBUILD4_STATE\n");
1527 case 1: //do nothing
1529 case 2: //done filling the dht, notify leader
1531 setState(REBUILD5_STATE);
1532 outBuffer[0] = FILL_DHT_RES;
1533 udpSend(outBuffer, 1, leader);
1535 case 3: //error encountered -> restart rebuild
1537 setState(REBUILD0_STATE);
1538 outBuffer[0] = REBUILD_REQ;
1539 udpSend(outBuffer, 1, leader);
1543 if (state == LEAD_REBUILD3_STATE)
1547 case 0: dhtLog("udpListen(): ERROR: fillStatus=0 in LEAD_REBUILD3_STATE\n");
1549 case 1: //do nothing
1551 case 2: //I'm done, now is everybody else also done?
1555 setState(LEAD_REBUILD4_STATE);
1556 outBuffer[0] = RESUME_NORMAL_CMD;
1557 udpSendAll(outBuffer, 1);
1560 case 3: //error encountered -> restart rebuild
1562 setState(LEAD_REBUILD1_STATE);
1563 outBuffer[0] = REBUILD_CMD;
1564 udpSendAll(outBuffer, 1);
1570 gettimeofday(&now, NULL);
1571 if (timercmp(&now, &timer, >))
1573 if (timeoutCntr < retry_vals[state])
1576 timeradd(&now, &timeout_vals[state], &timer);
1577 dhtLog("udpListen(): retry: %d\n", timeoutCntr);
1581 outBuffer[0] = WHO_IS_LEADER_CMD;
1582 udpSend(outBuffer, 1, seed);
1585 outBuffer[0] = JOIN_REQ;
1586 write4(&outBuffer[1], myHostData.maxKeyCapacity);
1587 udpSend(outBuffer, 5, leader);
1590 outBuffer[0] = ELECT_LEADER_CMD;
1591 write4(&outBuffer[1], electionOriginator);
1592 udpSendAll(outBuffer, 5);
1595 if (electionOriginator == myHostData.ipAddr)
1596 { //retry notify leader
1597 outBuffer[0] = CONGRATS_CMD;
1598 write2(&outBuffer[1], numHosts);
1599 memcpy(&outBuffer[3], hostArray, sizeof(struct hostData)
1601 udpSend(outBuffer, 3 + sizeof(struct hostData) * numHosts,
1606 outBuffer[0] = ELECT_LEADER_RES;
1608 write2(&outBuffer[2], numHosts);
1609 memcpy(&outBuffer[4], hostArray, sizeof(struct hostData)
1611 udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
1615 case REBUILD0_STATE:
1616 outBuffer[0] = REBUILD_REQ;
1617 udpSend(outBuffer, 1, leader);
1619 case REBUILD1_STATE:
1620 outBuffer[0] = JOIN_REQ;
1621 write4(&outBuffer[1], myHostData.maxKeyCapacity);
1622 udpSend(outBuffer, 5, leader);
1624 case REBUILD5_STATE:
1625 outBuffer[0] = FILL_DHT_RES;
1626 udpSend(outBuffer, 1, leader);
1628 case LEAD_REBUILD1_STATE:
1629 outBuffer[0] = REBUILD_CMD;
1630 udpSendAll(outBuffer, 1);
1632 case LEAD_REBUILD2_STATE:
1633 outBuffer[0] = DHT_UPDATE_CMD;
1634 write2(&outBuffer[1], numHosts);
1635 write2(&outBuffer[3], numBlocks);
1636 memcpy(&outBuffer[5], hostArray, numHosts
1637 * sizeof(struct hostData));
1638 memcpy(&outBuffer[5+numHosts*sizeof(struct hostData)],
1639 blockOwnerArray, numBlocks*2);
1640 udpSendAll(outBuffer, 5 + sizeof(struct hostData) * numHosts
1643 case LEAD_REBUILD3_STATE:
1644 outBuffer[0] = FILL_DHT_CMD;
1645 udpSendAll(outBuffer, 1);
1647 case LEAD_REBUILD4_STATE:
1648 outBuffer[0] = RESUME_NORMAL_CMD;
1649 udpSendAll(outBuffer, 1);
1651 case EXIT1_STATE: //TODO...
1654 case LEAD_NORMAL1_STATE:
1655 case LEAD_NORMAL2_STATE:
1656 case REBUILD2_STATE:
1657 case REBUILD3_STATE:
1658 case REBUILD4_STATE:
1659 case EXIT2_STATE: //we shouldn't get here
1665 dhtLog("udpListen(): timed out in state %s after %d retries\n",
1666 state_names[state], timeoutCntr);
1670 setState(EXIT2_STATE);
1672 case LEAD_NORMAL2_STATE:
1673 setState(LEAD_REBUILD1_STATE);
1674 outBuffer[0] = REBUILD_CMD;
1675 udpSendAll(outBuffer, 1);
1678 dhtLog("removing unresponsive hosts, before:\n");
1680 removeUnresponsiveHosts();
1683 setState(ELECT2_STATE);
1684 if (electionOriginator == myHostData.ipAddr)
1686 leader = hostArray[0].ipAddr;
1687 if (leader == myHostData.ipAddr)
1689 dhtLog("I am the leader!\n");
1690 setState(LEAD_REBUILD1_STATE);
1691 outBuffer[0] = REBUILD_CMD;
1692 udpSendAll(outBuffer, 1);
1696 outBuffer[0] = CONGRATS_CMD;
1697 write2(&outBuffer[1], numHosts);
1698 memcpy(&outBuffer[3], hostArray, sizeof(struct hostData)
1700 udpSend(outBuffer, 3 + sizeof(struct hostData) * numHosts,
1706 outBuffer[0] = ELECT_LEADER_RES;
1708 write2(&outBuffer[2], numHosts);
1709 memcpy(&outBuffer[4], hostArray, sizeof(struct hostData)
1711 udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
1717 case REBUILD0_STATE:
1718 case REBUILD1_STATE:
1719 case REBUILD2_STATE:
1720 case REBUILD3_STATE:
1721 case REBUILD4_STATE:
1722 case REBUILD5_STATE:
1723 case LEAD_REBUILD1_STATE:
1724 case LEAD_REBUILD2_STATE:
1725 case LEAD_REBUILD3_STATE:
1726 case LEAD_REBUILD4_STATE:
1728 electionOriginator = myHostData.ipAddr;
1729 setState(ELECT1_STATE);
1730 outBuffer[0] = ELECT_LEADER_CMD;
1731 write4(&outBuffer[1], myHostData.ipAddr); //originator = me
1732 udpSendAll(outBuffer, 5);
1735 setState(EXIT2_STATE);
1738 case LEAD_NORMAL1_STATE:
1739 case EXIT2_STATE: //we shouldn't get here
1745 if (state != oldState)
1746 pthread_cond_broadcast(&stateCond);
1747 pthread_mutex_unlock(&stateMutex);