1 /*******************************************************************************
4 * High-performance Distributed Hash Table for finding the location of objects
5 * in a Distributed Shared Transactional Memory system.
10 * 1) Instead of having dhtInsertMult, dhtSearchMult, etc. call their single-key
11 * counterparts repeatedly, define some new messages to handle it more efficiently.
13 * 2) Improve the efficiency of functions that work with hostArray, hostReplied,
14 * and blockOwnerArray.
15 * 3) Currently a join or leave causes a rebuild of the entire hash table.
16 * Implement more graceful join and leave procedures.
17 * 4) Fine tune timeout values for performance, possibly implement a backoff
18 * algorithm to prevent overloading the network.
19 * 5) Whatever else I'm forgetting
21 *******************************************************************************/
22 /*******************************************************************************
24 *******************************************************************************/
26 #include <netinet/in.h>
27 #include <arpa/inet.h>
28 #include <sys/types.h>
29 #include <sys/socket.h>
30 #include <sys/ioctl.h>
41 #include <linux/sockios.h>
43 #include <sys/queue.h>
45 #include "clookup.h" //this works for now, do we need anything better?
48 /*******************************************************************************
49 * Local Defines, Structs
50 *******************************************************************************/
// Size in bytes of the listener's inBuffer/outBuffer and the maximum length
// passed to recvfrom() on the shared UDP socket.
52 #define MAX_MSG_SIZE 1500
// Initial number of entries allocated for hostArray and hostReplied.
54 #define INIT_HOST_ALLOC 3
// Initial number of hash blocks (entries in blockOwnerArray).
55 #define INIT_NUM_BLOCKS 16
// Network interface whose IPv4 address identifies this host.
56 #define DEFAULT_INTERFACE "eth0"
// poll() timeout used by the listener loop on the shared UDP socket
// (poll() interprets its timeout argument as milliseconds).
57 #define TIMEOUT_PERIOD 100
// Per-attempt poll() timeout and maximum number of send attempts for the
// client-side insert, remove and search operations respectively.
58 #define INSERT_TIMEOUT_MS 500
59 #define INSERT_RETRIES 50
60 #define REMOVE_TIMEOUT_MS 500
61 #define REMOVE_RETRIES 50
62 #define SEARCH_TIMEOUT_MS 500
63 #define SEARCH_RETRIES 50
66 //make sure this matches msg_types global var
96 //make sure this matches state_names, timeout_vals, and retry_vals global vars
134 unsigned int maxKeyCapacity;
137 /*******************************************************************************
138 * Local Function Prototypes
139 *******************************************************************************/
141 int msgSizeOk(unsigned char *msg, unsigned int size);
142 unsigned short read2(unsigned char *msg);
143 unsigned int read4(unsigned char *msg);
144 void write2(unsigned char *ptr, unsigned short tmp);
145 void write4(unsigned char *ptr, unsigned int tmp);
146 unsigned int getMyIpAddr(const char *interfaceStr);
147 int udpSend(unsigned char *msg, unsigned int size, unsigned int destIp);
148 int udpSendAll(unsigned char *msg, unsigned int size);
149 unsigned int hash(unsigned int x);
150 unsigned int getKeyOwner(unsigned int key);
151 void setState(unsigned int newState);
152 void makeAssignments();
153 int addHost(struct hostData newHost);
154 int removeHost(unsigned int ipAddr);
155 void removeUnresponsiveHosts();
156 int checkReplied(unsigned int ipAddr);
158 void writeHostList();
159 void dhtLog(const char *format, ...);
163 /*******************************************************************************
165 *******************************************************************************/
167 //make sure this matches enumeration above
168 const char *msg_types[NUM_MSG_TYPES] =
195 const char *state_names[NUM_STATES] =
200 "LEAD_NORMAL1_STATE",
201 "LEAD_NORMAL2_STATE",
210 "LEAD_REBUILD1_STATE",
211 "LEAD_REBUILD2_STATE",
212 "LEAD_REBUILD3_STATE",
213 "LEAD_REBUILD4_STATE",
218 //note: { 0, 0 } means no timeout
219 struct timeval timeout_vals[NUM_STATES] =
221 { 0, 500000 }, //INIT1_STATE
222 { 0, 500000 }, //INIT2_STATE
223 { 0, 0 }, //NORMAL_STATE
224 { 0, 0 }, //LEAD_NORMAL1_STATE
225 { 3, 0 }, //LEAD_NORMAL2_STATE
226 { 1, 0 }, //ELECT1_STATE
227 { 1, 0 }, //ELECT2_STATE
228 { 0, 500000 }, //REBUILD0_STATE
229 { 0, 500000 }, //REBUILD1_STATE
230 { 10, 0 }, //REBUILD2_STATE
231 { 10, 0 }, //REBUILD3_STATE
232 { 10, 0 }, //REBUILD4_STATE
233 { 1, 0 }, //REBUILD5_STATE
234 { 1, 0 }, //LEAD_REBUILD1_STATE
235 { 1, 0 }, //LEAD_REBUILD2_STATE
236 { 10, 0 }, //LEAD_REBUILD3_STATE
237 { 10, 0 }, //LEAD_REBUILD4_STATE
238 { 0, 500000 }, //EXIT1_STATE
239 { 0, 0 } //EXIT2_STATE
242 int retry_vals[NUM_STATES] =
247 0, //LEAD_NORMAL1_STATE
248 0, //LEAD_NORMAL2_STATE
257 10, //LEAD_REBUILD1_STATE
258 10, //LEAD_REBUILD2_STATE
259 10, //LEAD_REBUILD3_STATE
260 10, //LEAD_REBUILD4_STATE
// This node's own record (ipAddr and maxKeyCapacity), filled in by dhtInit().
266 struct hostData myHostData;
// Thread running udpListen(); created at the end of dhtInit().
267 pthread_t threadUdpListen;
// Thread running fillTask(); created by the listener during a rebuild.
268 pthread_t threadFillTask;
269 //status of fillTask: 0 = ready to run, 1 = running, 2 = completed, 3 = error
// Socket bound to UDP_PORT; polled by the listener and used by udpSend().
271 struct pollfd udpPollSock;
// IP address of the host that originated the election this node is taking
// part in (0 after dhtInit()).
275 unsigned int electionOriginator;
// Peer from which the current ELECT_LEADER_CMD was received.
276 unsigned int electionParent;
// Allocated capacity (in entries) of hostArray and hostReplied.
277 unsigned int hostArraySize = 0;
// Known hosts; the first numHosts entries are valid.
// NOTE(review): addHost() inserts before the first larger ipAddr, so the
// array appears to be kept in ascending ipAddr order — confirm.
278 struct hostData *hostArray = NULL;
// Number of hash blocks; hash() reduces a key modulo this value.
279 unsigned int numBlocks = 0;
// For each block, the index into hostArray of the host that owns it.
280 unsigned short *blockOwnerArray = NULL;
// Per-host flag parallel to hostArray; 0 means "has not replied yet".
281 unsigned char *hostReplied = NULL;
// stateMutex guards the protocol state; stateCond is broadcast by the
// listener whenever the state changes.
282 pthread_mutex_t stateMutex;
283 pthread_cond_t stateCond;
// Local hash table holding the key/value pairs this node owns.
284 chashtable_t *myHashTable;
// Number of valid entries in hostArray/hostReplied.
285 unsigned int numHosts;
// Absolute time at which the current state times out (set by setState()).
286 struct timeval timer;
290 /*******************************************************************************
291 * Interface Function Definitions
292 *******************************************************************************/
// Initializes this DHT node: opens a per-host log file, records the local
// address and key capacity, allocates the host and block tables, creates and
// binds the UDP socket, enters an initial state, and starts the listener
// thread.
//   seedIpAddr     - NOTE(review): presumably the address of an existing node
//                    to contact; the visible lines only use a separate `seed`
//                    variable — confirm how the two are related.
//   maxKeyCapacity - stored in myHostData.maxKeyCapacity.
294 void dhtInit(unsigned int seedIpAddr, unsigned int maxKeyCapacity)
296 struct in_addr tmpAddr;
// NOTE(review): "dht-" (4) + a dotted quad (up to 15) + ".log" (4) + NUL needs
// 24 bytes; a 15-character address overflows this 23-byte buffer.
297 char filename[23] = "dht-";
298 struct sockaddr_in myAddr;
299 struct sockaddr_in seedAddr;
300 socklen_t socklen = sizeof(struct sockaddr_in);
// Build the log file name "dht-<local ip>.log".
303 tmpAddr.s_addr = htonl(getMyIpAddr(DEFAULT_INTERFACE));
304 strcat(filename, inet_ntoa(tmpAddr));
305 strcat(filename, ".log");
306 printf("log file: %s\n", filename);
// NOTE(review): the fopen() result is not checked in the visible lines, yet
// dhtLog() writes to logfile immediately afterwards.
308 logfile = fopen(filename, "w");
309 dhtLog("dhtInit(): inializing...\n");
311 myHostData.ipAddr = getMyIpAddr(DEFAULT_INTERFACE);
312 myHostData.maxKeyCapacity = maxKeyCapacity;
316 electionOriginator = 0;
// Host table starts with room for INIT_HOST_ALLOC entries; this node is
// stored in slot 0.
318 hostArraySize = INIT_HOST_ALLOC;
319 hostArray = calloc(hostArraySize, sizeof(struct hostData));
320 hostReplied = calloc(hostArraySize, sizeof(unsigned char));
321 hostArray[0] = myHostData;
// calloc() zero-fills, so every block initially maps to host index 0.
323 numBlocks = INIT_NUM_BLOCKS;
324 blockOwnerArray = calloc(numBlocks, sizeof(unsigned short));
325 pthread_mutex_init(&stateMutex, NULL);
326 pthread_cond_init(&stateCond, NULL);
327 myHashTable = chashCreate(HASH_SIZE, LOADFACTOR);
// Shared UDP socket, bound to UDP_PORT on all local interfaces.
329 udpPollSock.fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
330 if (udpPollSock.fd < 0)
331 perror("dhtInit():socket()");
333 udpPollSock.events = POLLIN;
335 bzero(&myAddr, socklen);
336 myAddr.sin_family = AF_INET;
337 myAddr.sin_addr.s_addr = htonl(INADDR_ANY);
338 myAddr.sin_port = htons(UDP_PORT);
340 if (bind(udpPollSock.fd, (struct sockaddr *)&myAddr, socklen) < 0)
341 perror("dhtInit():bind()");
// Two start-up paths follow; the condition choosing between them is not
// visible here. First path: this node declares itself leader.
345 dhtLog("I am the leader\n");
346 leader = myHostData.ipAddr;
347 setState(LEAD_NORMAL1_STATE);
// Second path: ask the seed who the leader is and wait in INIT1_STATE.
351 initMsg = WHO_IS_LEADER_CMD;
352 udpSend(&initMsg, 1, seed);
353 setState(INIT1_STATE);
// Start the thread that services the shared UDP socket.
356 if (pthread_create(&threadUdpListen, NULL, udpListen, NULL) != 0)
357 dhtLog("dhtInit() - ERROR creating threadUdpListen\n");
363 { //TODO: do this gracefully, wait for response from leader, etc.
367 udpSend(&msg, 1, leader);
368 dhtLog("dhtExit(): cleaning up...\n");
369 pthread_cancel(threadUdpListen);
370 close(udpPollSock.fd);
373 free(blockOwnerArray);
// Inserts (key, val) into the DHT by sending an INSERT_CMD to the host that
// owns the key and waiting for its INSERT_RES. The request is repeated until
// the remote status is OPERATION_OK.
// NOTE(review): the return statement is not visible here; dhtInsertMult()
// treats a non-zero result as failure.
379 int dhtInsert(unsigned int key, unsigned int val)
381 struct sockaddr_in toAddr;
382 struct sockaddr_in fromAddr;
383 socklen_t socklen = sizeof(struct sockaddr_in);
384 struct pollfd pollsock;
392 bzero((char *)&toAddr, socklen);
393 toAddr.sin_family = AF_INET;
394 toAddr.sin_port = htons(UDP_PORT);
396 while (status != OPERATION_OK)
// Block until the node is in a state that accepts inserts, then resolve the
// key's owner while still holding stateMutex.
398 pthread_mutex_lock(&stateMutex);
399 while (!(state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
400 || state == LEAD_NORMAL2_STATE || state == REBUILD4_STATE
401 || state == LEAD_REBUILD3_STATE))
402 pthread_cond_wait(&stateCond, &stateMutex);
403 toAddr.sin_addr.s_addr = htonl(getKeyOwner(key));
404 pthread_mutex_unlock(&stateMutex);
// A private UDP socket is used for the request/reply exchange.
// NOTE(review): no matching close() is visible; confirm the descriptor is
// released on every pass.
406 if ((pollsock.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
408 perror("dhtInsert():socket()");
411 pollsock.events = POLLIN;
// Request layout: 1 byte type, 4 bytes key, 4 bytes value (9 bytes total).
413 outBuffer[0] = INSERT_CMD;
414 write4(&outBuffer[1], key);
415 write4(&outBuffer[5], val);
// Send up to INSERT_RETRIES times, waiting INSERT_TIMEOUT_MS for each reply.
417 for (i = 0; i < INSERT_RETRIES; i++)
419 if (sendto(pollsock.fd, outBuffer, 9, 0, (struct sockaddr *)&toAddr,
422 perror("dhtInsert():sendto()");
425 retval = poll(&pollsock, 1, INSERT_TIMEOUT_MS);
428 perror("dhtInsert():poll()");
// Accept only a 2-byte INSERT_RES coming from the address and port the
// request was sent to.
433 bytesRcvd = recvfrom(pollsock.fd, inBuffer, 2, 0,
434 (struct sockaddr *)&fromAddr, &socklen);
435 if (fromAddr.sin_addr.s_addr == toAddr.sin_addr.s_addr
436 && fromAddr.sin_port == toAddr.sin_port
437 && bytesRcvd == 2 && inBuffer[0] == INSERT_RES)
439 status = inBuffer[1]; //status from remote host
// Anything other than success: move to REBUILD0_STATE and ask the leader to
// rebuild before the outer loop tries again.
444 if (status != OPERATION_OK)
446 pthread_mutex_lock(&stateMutex);
447 setState(REBUILD0_STATE);
448 outBuffer[0] = REBUILD_REQ;
449 udpSend(outBuffer, 1, leader);
450 pthread_mutex_unlock(&stateMutex);
// Inserts numKeys pairs by calling dhtInsert() once per index, pairing
// keys[i] with vals[i].
// NOTE(review): how a failed insert affects the return value is decided on
// lines not visible here — confirm before relying on it.
459 int dhtInsertMult(unsigned int numKeys, unsigned int *keys, unsigned int *vals)
465 for (i = 0; i < numKeys; i++)
// A non-zero result from dhtInsert() is treated as a failure for that pair.
467 if (dhtInsert(keys[i], vals[i]) != 0)
// Removes key from the DHT by sending a REMOVE_CMD to the host that owns the
// key and waiting for its REMOVE_RES. The request is repeated until the
// remote status is OPERATION_OK or KEY_NOT_FOUND.
// NOTE(review): the return statement is not visible here; dhtRemoveMult()
// treats a non-zero result as failure.
473 int dhtRemove(unsigned int key)
475 struct sockaddr_in toAddr;
476 struct sockaddr_in fromAddr;
477 socklen_t socklen = sizeof(struct sockaddr_in);
478 struct pollfd pollsock;
486 bzero((char *)&toAddr, socklen);
487 toAddr.sin_family = AF_INET;
488 toAddr.sin_port = htons(UDP_PORT);
490 while (!(status == OPERATION_OK || status == KEY_NOT_FOUND))
// Removes are only issued in the normal states (unlike inserts, which are
// also allowed while the table is being refilled).
492 pthread_mutex_lock(&stateMutex);
493 while (!(state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
494 || state == LEAD_NORMAL2_STATE))
495 pthread_cond_wait(&stateCond, &stateMutex);
496 toAddr.sin_addr.s_addr = htonl(getKeyOwner(key));
497 pthread_mutex_unlock(&stateMutex);
// A private UDP socket is used for the request/reply exchange.
// NOTE(review): no matching close() is visible; confirm the descriptor is
// released on every pass.
499 if ((pollsock.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
501 perror("dhtRemove():socket()");
504 pollsock.events = POLLIN;
// Request layout: 1 byte type, 4 bytes key (5 bytes total).
506 outBuffer[0] = REMOVE_CMD;
507 write4(&outBuffer[1], key);
// Send up to REMOVE_RETRIES times, waiting REMOVE_TIMEOUT_MS for each reply.
509 for (i = 0; i < REMOVE_RETRIES; i++)
511 if (sendto(pollsock.fd, outBuffer, 5, 0, (struct sockaddr *)&toAddr,
514 perror("dhtRemove():sendto()");
517 retval = poll(&pollsock, 1, REMOVE_TIMEOUT_MS);
520 perror("dhtRemove():poll()");
// Accept only a 2-byte REMOVE_RES coming from the address and port the
// request was sent to.
525 bytesRcvd = recvfrom(pollsock.fd, inBuffer, 2, 0,
526 (struct sockaddr *)&fromAddr, &socklen);
527 if (fromAddr.sin_addr.s_addr == toAddr.sin_addr.s_addr
528 && fromAddr.sin_port == toAddr.sin_port
529 && bytesRcvd == 2 && inBuffer[0] == REMOVE_RES)
531 status = inBuffer[1]; //status from remote host
// No definitive answer: move to REBUILD0_STATE and ask the leader to rebuild
// before the outer loop tries again.
536 if (!(status == OPERATION_OK || status == KEY_NOT_FOUND))
538 pthread_mutex_lock(&stateMutex);
539 setState(REBUILD0_STATE);
540 outBuffer[0] = REBUILD_REQ;
541 udpSend(outBuffer, 1, leader);
542 pthread_mutex_unlock(&stateMutex);
// Removes numKeys keys by calling dhtRemove() once per entry of keys[].
// NOTE(review): how a failed removal affects the return value is decided on
// lines not visible here — confirm before relying on it.
551 int dhtRemoveMult(unsigned int numKeys, unsigned int *keys)
557 for (i = 0; i < numKeys; i++)
// A non-zero result from dhtRemove() is treated as a failure for that key.
559 if (dhtRemove(keys[i]) != 0)
// Looks up key in the DHT by sending a SEARCH_CMD to the host that owns the
// key and waiting for its SEARCH_RES. The request is repeated until the
// remote status is OPERATION_OK or KEY_NOT_FOUND.
//   val - receives the 4-byte value carried in the accepted reply.
// NOTE(review): the return statement is not visible here; dhtSearchMult()
// treats a non-zero result as failure.
565 int dhtSearch(unsigned int key, unsigned int *val)
567 struct sockaddr_in toAddr;
568 struct sockaddr_in fromAddr;
569 socklen_t socklen = sizeof(struct sockaddr_in);
570 struct pollfd pollsock;
578 bzero((char *)&toAddr, socklen);
579 toAddr.sin_family = AF_INET;
580 toAddr.sin_port = htons(UDP_PORT);
582 while (!(status == OPERATION_OK || status == KEY_NOT_FOUND))
// Unlike insert/remove, a search does not wait for a particular state; it
// only waits until there is at least one block so hash() is well defined.
584 pthread_mutex_lock(&stateMutex);
585 while (numBlocks == 0)
586 pthread_cond_wait(&stateCond, &stateMutex);
587 toAddr.sin_addr.s_addr = htonl(getKeyOwner(key));
588 pthread_mutex_unlock(&stateMutex);
// A private UDP socket is used for the request/reply exchange.
// NOTE(review): no matching close() is visible; confirm the descriptor is
// released on every pass.
590 if ((pollsock.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
592 perror("dhtSearch():socket()");
595 pollsock.events = POLLIN;
// Request layout: 1 byte type, 4 bytes key (5 bytes total).
597 outBuffer[0] = SEARCH_CMD;
598 write4(&outBuffer[1], key);
// Send up to SEARCH_RETRIES times, waiting SEARCH_TIMEOUT_MS for each reply.
600 for (i = 0; i < SEARCH_RETRIES; i++)
602 if (sendto(pollsock.fd, outBuffer, 5, 0, (struct sockaddr *)&toAddr,
605 perror("dhtSearch():sendto()");
608 retval = poll(&pollsock, 1, SEARCH_TIMEOUT_MS);
611 perror("dhtSearch():poll()");
// Accept only a 6-byte SEARCH_RES (type, status, 4-byte value) coming from
// the address and port the request was sent to.
616 bytesRcvd = recvfrom(pollsock.fd, inBuffer, 6, 0,
617 (struct sockaddr *)&fromAddr, &socklen);
618 if (fromAddr.sin_addr.s_addr == toAddr.sin_addr.s_addr
619 && fromAddr.sin_port == toAddr.sin_port
620 && bytesRcvd == 6 && inBuffer[0] == SEARCH_RES)
622 status = inBuffer[1]; //status from remote host
623 *val = read4(&inBuffer[2]);
// No definitive answer: move to REBUILD0_STATE and ask the leader to rebuild
// before the outer loop tries again.
628 if (!(status == OPERATION_OK || status == KEY_NOT_FOUND))
630 pthread_mutex_lock(&stateMutex);
631 setState(REBUILD0_STATE);
632 outBuffer[0] = REBUILD_REQ;
633 udpSend(outBuffer, 1, leader);
634 pthread_mutex_unlock(&stateMutex);
// Looks up numKeys keys by calling dhtSearch() once per index; the value for
// keys[i] is written to vals[i].
// NOTE(review): how a failed search affects the return value is decided on
// lines not visible here — confirm before relying on it.
643 int dhtSearchMult(unsigned int numKeys, unsigned int *keys, unsigned int *vals)
647 for (i = 0; i < numKeys; i++)
// A non-zero result from dhtSearch() is treated as a failure for that key.
649 if (dhtSearch(keys[i], &vals[i]) != 0)
655 /*******************************************************************************
656 * Local Function Definitions
657 *******************************************************************************/
// Checks that a received message of `size` bytes has the length expected for
// its type. The visible return statements yield non-zero when the length
// matches and 0 otherwise.
//   msg  - raw message; the case labels are message-type constants.
//   size - number of bytes actually received.
// NOTE(review): several case labels and fixed-size returns are on lines not
// visible here, so the label-to-return mapping below is partial.
659 int msgSizeOk(unsigned char *msg, unsigned int size)
661 unsigned short tmpNumHosts;
662 unsigned short tmpNumBlocks;
669 case WHO_IS_LEADER_CMD:
677 case RESUME_NORMAL_CMD:
678 case RESUME_NORMAL_RES:
686 case WHO_IS_LEADER_RES:
688 case ELECT_LEADER_CMD:
// Variable-length layout: 5-byte header (type, host count, block count),
// then one struct hostData per host, then 2 bytes per block.
// NOTE(review): msg[1..4] are read here; confirm size >= 5 is established
// before this point.
697 tmpNumHosts = read2(&msg[1]);
698 tmpNumBlocks = read2(&msg[3]);
699 return (size == (5+sizeof(struct hostData)*tmpNumHosts+2*tmpNumBlocks));
700 case ELECT_LEADER_RES:
// Variable-length layout: 4-byte header with the host count at offset 2,
// then one struct hostData per host.
707 tmpNumHosts = read2(&msg[2]);
708 return (size == (4 + sizeof(struct hostData) * tmpNumHosts));
// Variable-length layout: 3-byte header with the host count at offset 1,
// then one struct hostData per host.
712 tmpNumHosts = read2(&msg[1]);
713 return (size == (3 + sizeof(struct hostData) * tmpNumHosts));
// Decodes a 16-bit unsigned value stored little-endian (low byte first) in
// the two bytes starting at ptr. Counterpart of write2().
//   ptr - must point to at least 2 readable bytes.
// Returns the decoded value.
unsigned short read2(unsigned char *ptr)
{
	// Widen explicitly before shifting so the arithmetic is unsigned.
	unsigned int wide = ((unsigned int)ptr[1] << 8) | (unsigned int)ptr[0];

	return (unsigned short)wide;
}
// Decodes a 32-bit unsigned value stored little-endian (low byte first) in
// the four bytes starting at ptr. Counterpart of write4().
//   ptr - must point to at least 4 readable bytes.
// Returns the decoded value.
unsigned int read4(unsigned char *ptr)
{
	// Each byte is widened to unsigned int before shifting: an unsigned char
	// promotes to (signed) int, and shifting a value >= 0x80 left by 24 would
	// overflow into the sign bit, which is undefined behavior.
	unsigned int tmp = ((unsigned int)ptr[3] << 24)
		| ((unsigned int)ptr[2] << 16)
		| ((unsigned int)ptr[1] << 8)
		| (unsigned int)ptr[0];

	return tmp;
}
// Encodes a 16-bit unsigned value little-endian (low byte first) into the
// two bytes starting at ptr. Counterpart of read2().
//   ptr - must point to at least 2 writable bytes.
//   tmp - value to encode.
void write2(unsigned char *ptr, unsigned short tmp)
{
	ptr[1] = (unsigned char)((tmp >> 8) & 0xFF);
	ptr[0] = (unsigned char)(tmp & 0xFF);
}
// Encodes a 32-bit unsigned value little-endian (low byte first) into the
// four bytes starting at ptr. Counterpart of read4().
//   ptr - must point to at least 4 writable bytes.
//   tmp - value to encode.
void write4(unsigned char *ptr, unsigned int tmp)
{
	ptr[3] = (unsigned char)((tmp >> 24) & 0xFF);
	ptr[2] = (unsigned char)((tmp >> 16) & 0xFF);
	ptr[1] = (unsigned char)((tmp >> 8) & 0xFF);
	ptr[0] = (unsigned char)(tmp & 0xFF);
}
// Looks up the IPv4 address assigned to the named network interface using
// the SIOCGIFADDR ioctl.
//   interfaceStr - interface name, e.g. DEFAULT_INTERFACE.
// Returns the address in host byte order on success.
// NOTE(review): the values returned on the two error paths are on lines not
// visible here.
747 unsigned int getMyIpAddr(const char *interfaceStr)
750 struct ifreq interfaceInfo;
// myAddr aliases the address field inside interfaceInfo, so the ioctl result
// can be read through it afterwards.
751 struct sockaddr_in *myAddr = (struct sockaddr_in *)&interfaceInfo.ifr_addr;
753 memset(&interfaceInfo, 0, sizeof(struct ifreq));
// The socket exists only to give the ioctl a descriptor to operate on.
// NOTE(review): no close(sock) is visible; confirm the descriptor is not
// leaked (dhtInit() calls this function twice).
755 if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
757 perror("getMyIpAddr():socket()");
// NOTE(review): unbounded copy into the fixed-size ifr_name field; a name
// longer than the field would overflow it.
761 strcpy(interfaceInfo.ifr_name, interfaceStr);
762 myAddr->sin_family = AF_INET;
764 if(ioctl(sock, SIOCGIFADDR, &interfaceInfo) != 0)
766 perror("getMyIpAddr():ioctl()");
770 return ntohl(myAddr->sin_addr.s_addr);
// Sends one message over the shared UDP socket (udpPollSock) to UDP_PORT on
// the given host and logs what is being sent.
//   msg    - message bytes; msg[0] is the message type.
//   size   - number of bytes to send.
//   destIp - destination IPv4 address in host byte order (htonl is applied).
// NOTE(review): the return values are on lines not visible here;
// udpSendAll() treats a non-zero result as failure.
773 int udpSend(unsigned char *msg, unsigned int size, unsigned int destIp)
775 struct sockaddr_in peerAddr;
776 socklen_t socklen = sizeof(struct sockaddr_in);
778 bzero(&peerAddr, socklen);
779 peerAddr.sin_family = AF_INET;
780 peerAddr.sin_addr.s_addr = htonl(destIp);
781 peerAddr.sin_port = htons(UDP_PORT);
// Log the symbolic type name when msg[0] is a valid index into msg_types,
// otherwise log it as unknown.
785 if (msg[0] < NUM_MSG_TYPES)
786 dhtLog("udpSend(): sending %s to %s, %d bytes\n", msg_types[msg[0]],
787 inet_ntoa(peerAddr.sin_addr), size);
789 dhtLog("udpSend(): sending unknown message to %s, %d bytes\n",
790 inet_ntoa(peerAddr.sin_addr), size);
793 if (sendto(udpPollSock.fd, (void *)msg, size, 0, (struct sockaddr *)&peerAddr,
796 perror("udpSend():sendto()");
// Sends the same message to every known host that has not replied yet,
// skipping this node itself.
//   msg  - message bytes; size - number of bytes to send.
// NOTE(review): how individual send failures are reported in the return
// value is decided on lines not visible here.
803 int udpSendAll(unsigned char *msg, unsigned int size)
807 for (i = 0; i < numHosts; i++)
// hostReplied[i] == 0 means host i still owes a reply in the current state.
809 if ((hostReplied[i] == 0) && (hostArray[i].ipAddr != myHostData.ipAddr))
811 if (udpSend(msg, size, hostArray[i].ipAddr) != 0)
818 //note: make sure this is only executed in a valid state, where numBlocks != 0
819 unsigned int hash(unsigned int x)
821 return (x % numBlocks);
824 //note: make sure this is only executed in a valid state, where these arrays
825 // are allocated and the index mappings are consistent
826 unsigned int getKeyOwner(unsigned int key)
828 return hostArray[blockOwnerArray[hash(key)]].ipAddr;
831 //sets state and timer, if applicable
// Switches the protocol state machine to newState, arms the timeout for
// that state, and logs the change.
//   newState - index into state_names / timeout_vals; values >= NUM_STATES
//              are reported as an error.
// NOTE(review): callers in this file that run alongside the listener thread
// hold stateMutex around this call; it appears to be a precondition — confirm.
832 void setState(unsigned int newState)
837 gettimeofday(&now, NULL);
839 if (newState >= NUM_STATES)
841 dhtLog("setState(): ERROR: invalid state %d\n", newState);
// A { 0, 0 } entry in timeout_vals means the state has no timeout.
845 if (timeout_vals[newState].tv_sec == 0
846 && timeout_vals[newState].tv_usec == 0)
// Otherwise the deadline is the current time plus the state's timeout.
852 timeradd(&now, &timeout_vals[newState], &timer);
857 //TODO: only do this for states that require it
// NOTE(review): the loop body is not visible here; it runs once per known
// host and presumably resets per-host reply tracking — confirm.
858 for (i = 0; i < numHosts; i++)
861 dhtLog("setState(): state set to %s\n", state_names[state]);
867 //TODO: improve these simple and inefficient functions
// Records activity for the host with the given address; callers invoke it
// when a response arrives from a peer.
//   ipAddr - address of the peer, in the same form as hostData.ipAddr.
// NOTE(review): only the host lookup is visible here; the update of
// hostReplied and the return value are on lines not shown — confirm.
868 int checkReplied(unsigned int ipAddr)
// findHost() yields the host's index in hostArray, or -1 if it is unknown.
872 i = findHost(ipAddr);
886 for (i = 0; i < numHosts; i++)
887 if ((hostReplied[i] == 0) && (hostArray[i].ipAddr != myHostData.ipAddr))
893 int findHost(unsigned int ipAddr)
897 for (i = 0; i < numHosts; i++)
898 if (hostArray[i].ipAddr == ipAddr)
899 return i; //found, return index
901 return -1; //not found
// Removes the host with the given address from hostArray/hostReplied and
// rewrites blockOwnerArray so its indices stay valid.
//   ipAddr - address of the host to remove.
// NOTE(review): handling of an unknown address (findHost() returning -1),
// the numHosts update and the return values are on lines not visible here.
904 int removeHost(unsigned int ipAddr)
908 i = findHost(ipAddr);
// Blocks owned by the removed host fall back to host index 0; owners stored
// after the removed slot move down by one to match the compaction below.
913 for (j = 0; j < numBlocks; j++)
915 if (blockOwnerArray[j] == i)
916 blockOwnerArray[j] = 0; //TODO: is this what I want to have happen?
917 else if (blockOwnerArray[j] > i)
918 blockOwnerArray[j]--;
// Close the gap by shifting every later entry one slot towards the front.
// NOTE(review): numHosts is unsigned, so `numHosts - 1` wraps if it is 0.
921 for (; i < numHosts - 1; i++)
923 hostArray[i] = hostArray[i+1];
924 hostReplied[i] = hostReplied[i+1];
931 void removeUnresponsiveHosts()
935 for (i = 0; i < numHosts; i++)
937 if (!hostReplied[i] && hostArray[i].ipAddr != myHostData.ipAddr)
938 removeHost(hostArray[i].ipAddr);
// Adds newHost to the host table, or refreshes the stored record if a host
// with the same ipAddr is already present. A new entry is inserted before
// the first existing entry with a larger ipAddr, or appended if there is
// none; the tables are doubled in size when full.
//   newHost - record to store (copied by value).
// NOTE(review): the return values, the release of the old arrays after
// growing, and the numHosts update are on lines not visible here.
942 int addHost(struct hostData newHost)
944 struct hostData *newHostArray;
945 unsigned char *newHostReplied;
949 for (i = 0; i < numHosts; i++)
// Already known: overwrite the stored record.
951 if (hostArray[i].ipAddr == newHost.ipAddr)
953 hostArray[i] = newHost;
// First entry with a larger address: newHost belongs at index i.
957 else if (hostArray[i].ipAddr > newHost.ipAddr)
// Tables are full: allocate tables twice as large and copy the entries
// before i, the new entry, then the entries from i onwards.
959 if (numHosts == hostArraySize)
961 newHostArray = calloc(2 * hostArraySize, sizeof(struct hostData));
962 newHostReplied = calloc(2 * hostArraySize, sizeof(unsigned char));
963 memcpy(newHostArray, hostArray, (i * sizeof(struct hostData)));
964 memcpy(newHostReplied, hostReplied, (i * sizeof(unsigned char)));
965 newHostArray[i] = newHost;
966 newHostReplied[i] = 0;
967 memcpy(&newHostArray[i+1], &hostArray[i], ((numHosts - i) *
968 sizeof(struct hostData)));
969 memcpy(&newHostReplied[i+1], &hostReplied[i], ((numHosts - i) *
970 sizeof(unsigned char)));
973 hostArray = newHostArray;
974 hostReplied = newHostReplied;
975 hostArraySize = 2 * hostArraySize;
// Room available: shift entries i..numHosts-1 up by one, back to front.
// NOTE(review): no reset of hostReplied[i] is visible on this path, unlike
// the reallocating path above — confirm.
979 for (j = numHosts; j > i; j--)
981 hostArray[j] = hostArray[j-1];
982 hostReplied[j] = hostReplied[j-1];
984 hostArray[i] = newHost;
// Block owners stored as indices >= i now refer to entries one slot higher.
987 for(j = 0; j < numBlocks; j++)
989 if (blockOwnerArray[j] >= i)
990 blockOwnerArray[j]++;
997 //nothing greater, add to end
998 if (numHosts == hostArraySize)
1000 newHostArray = calloc(2 * hostArraySize, sizeof(struct hostData));
1001 newHostReplied = calloc(2 * hostArraySize, sizeof(unsigned char));
1002 memcpy(newHostArray, hostArray, (numHosts * sizeof(struct hostData)));
1003 memcpy(newHostReplied, hostReplied, (numHosts * sizeof(unsigned char)));
1006 hostArray = newHostArray;
1007 hostReplied = newHostReplied;
1008 hostArraySize = 2 * hostArraySize;
1011 hostArray[numHosts] = newHost;
1012 hostReplied[numHosts] = 0;
// Rebuilds blockOwnerArray so that blocks are spread over the known hosts
// round-robin: block i is owned by host index i % numHosts.
// NOTE(review): `i % numHosts` requires numHosts != 0 — confirm callers
// guarantee it.
1017 void makeAssignments()
// With fewer blocks than hosts some hosts would own nothing, so the block
// table is discarded and reallocated with a larger numBlocks.
// NOTE(review): the statement that grows numBlocks inside the while loop is
// on a line not visible here.
1021 if (numBlocks < numHosts)
1023 free(blockOwnerArray);
1024 while (numBlocks < numHosts)
1026 blockOwnerArray = calloc(numBlocks, sizeof(unsigned short));
1029 for (i = 0; i < numBlocks; i++)
1030 blockOwnerArray[i] = i % numHosts;
1035 void writeHostList()
1038 struct in_addr tmpAddr;
1040 fprintf(logfile, "numHosts = %d\n", numHosts);
1041 for (i = 0; i < numHosts; i++)
1043 tmpAddr.s_addr = htonl(hostArray[i].ipAddr);
1044 fprintf(logfile, "%d) %s, %d\n", i, inet_ntoa(tmpAddr),
1045 hostArray[i].maxKeyCapacity);
1050 void dhtLog(const char *format, ...)
1053 // struct timeval now;
1055 // if (gettimeofday(&now, NULL) < 0)
1056 // { perror("dhtLog():gettimeofday()"); }
1057 va_start(args, format);
1058 // if (fprintf(logfile, "%d.%06d:", now.tv_sec, now.tv_usec) < 0)
1059 // { perror("dhtLog():fprintf()"); }
1060 if (vfprintf(logfile, format, args) < 0)
1061 { perror("dhtLog():vfprintf()"); }
1062 if (fflush(logfile) == EOF)
1063 { perror("dhtLog():fflush()"); }
1073 unsigned int numKeys;
1076 vals = mhashGetKeys(&numKeys); //note: key of mhash is val of dht
1077 keys = calloc(numKeys, sizeof(unsigned int));
1079 for (i = 0; i < numKeys; i++)
1080 keys[i] = myHostData.ipAddr;
1082 if (dhtInsertMult(numKeys, keys, vals) == 0)
1093 struct sockaddr_in peerAddr;
1094 unsigned int peerIp;
1095 socklen_t socklen = sizeof(struct sockaddr_in);
1096 unsigned char inBuffer[MAX_MSG_SIZE];
1097 unsigned char outBuffer[MAX_MSG_SIZE];
1100 struct in_addr tmpAddr;
1101 struct hostData tmpHost;
1102 unsigned int tmpKey;
1103 unsigned int tmpVal;
1104 struct hostData *hostDataPtr;
1105 unsigned short *uShortPtr;
1106 unsigned int tmpUInt;
1107 unsigned int tmpUShort;
1109 unsigned int oldState;
1111 dhtLog("udpListen(): linstening on port %d...\n", UDP_PORT);
1115 pollret = poll(&udpPollSock, 1, TIMEOUT_PERIOD);
1116 pthread_mutex_lock(&stateMutex);
1120 perror("udpListen():poll()");
1122 else if (pollret > 0)
1124 bytesRcvd = recvfrom(udpPollSock.fd, inBuffer, MAX_MSG_SIZE, 0,
1125 (struct sockaddr *)&peerAddr, &socklen);
1128 dhtLog("udpListen(): ERROR: bytesRcvd = %d\n", bytesRcvd);
1130 else if (inBuffer[0] >= NUM_MSG_TYPES)
1132 dhtLog("udpListen(): ERROR: unknown msg type = %d\n", inBuffer[0]);
1134 else if (!msgSizeOk(inBuffer, bytesRcvd))
1136 dhtLog("udpListen(): ERROR: msg size not ok: type = %s\n, size = %d\n",
1137 msg_types[inBuffer[0]], bytesRcvd);
1139 else if (state == EXIT2_STATE)
1143 else if (state == INIT1_STATE)
1144 { //after initialization with seed, do not proceed until seed replies
1145 dhtLog("udpListen(): received %s from %s, %d bytes\n",
1146 msg_types[inBuffer[0]], inet_ntoa(peerAddr.sin_addr), bytesRcvd);
1147 for (i = 0; i < bytesRcvd; i++)
1148 dhtLog(" %x", inBuffer[i]);
1150 peerIp = ntohl(peerAddr.sin_addr.s_addr);
1151 if (peerIp == seed && inBuffer[0] == WHO_IS_LEADER_RES)
1153 tmpHost.ipAddr = peerIp;
1154 tmpHost.maxKeyCapacity = 0;
1157 leader = read4(&inBuffer[1]);
1158 tmpAddr.s_addr = htonl(leader);
1159 dhtLog("leader = %s\n", inet_ntoa(tmpAddr));
1162 setState(INIT2_STATE);
1163 outBuffer[0] = JOIN_REQ;
1164 write4(&outBuffer[1], myHostData.maxKeyCapacity);
1165 udpSend(outBuffer, 5, leader);
1169 electionOriginator = myHostData.ipAddr;
1170 setState(ELECT1_STATE);
1171 outBuffer[0] = ELECT_LEADER_CMD;
1172 write4(&outBuffer[1], myHostData.ipAddr); //originator = me
1173 udpSendAll(outBuffer, 5);
1179 dhtLog("udpListen(): received %s from %s, %d bytes\n",
1180 msg_types[inBuffer[0]], inet_ntoa(peerAddr.sin_addr), bytesRcvd);
1181 for (i = 0; i < bytesRcvd; i++)
1182 dhtLog(" %x", inBuffer[i]);
1184 peerIp = ntohl(peerAddr.sin_addr.s_addr);
1185 switch (inBuffer[0])
1188 if (state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
1189 || state == LEAD_NORMAL2_STATE || state == REBUILD4_STATE
1190 || state == REBUILD5_STATE || state == LEAD_REBUILD3_STATE)
1192 tmpKey = read4(&inBuffer[1]);
1193 tmpVal = read4(&inBuffer[5]);
1194 outBuffer[0] = INSERT_RES;
1195 if (getKeyOwner(tmpKey) == myHostData.ipAddr)
1197 if (chashInsert(myHashTable, tmpKey, (void *)tmpVal) == 0)
1198 outBuffer[1] = OPERATION_OK;
1200 outBuffer[1] = INTERNAL_ERROR;
1204 outBuffer[1] = NOT_KEY_OWNER;
1206 //reply to client socket
1207 sendto(udpPollSock.fd, outBuffer, 2, 0,
1208 (struct sockaddr *)&peerAddr, socklen);
1212 if (state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
1213 || state == LEAD_NORMAL2_STATE)
1215 tmpKey = read4(&inBuffer[1]);
1216 outBuffer[0] = REMOVE_RES;
1217 if (getKeyOwner(tmpKey) == myHostData.ipAddr)
1219 if (chashRemove(myHashTable, tmpKey) == 0)
1220 outBuffer[1] = OPERATION_OK;
1222 outBuffer[1] = KEY_NOT_FOUND;
1226 outBuffer[1] = NOT_KEY_OWNER;
1228 //reply to client socket
1229 sendto(udpPollSock.fd, outBuffer, 2, 0,
1230 (struct sockaddr *)&peerAddr, socklen);
1234 if (state == NORMAL_STATE || state == LEAD_NORMAL1_STATE
1235 || state == LEAD_NORMAL2_STATE)
1237 tmpKey = read4(&inBuffer[1]);
1238 outBuffer[0] = SEARCH_RES;
1239 if (getKeyOwner(tmpKey) == myHostData.ipAddr)
1241 if ((tmpVal = (unsigned int)chashSearch(myHashTable, tmpKey)) != 0)
1243 outBuffer[1] = OPERATION_OK;
1244 write4(&outBuffer[2], tmpVal);
1248 outBuffer[1] = KEY_NOT_FOUND;
1249 write4(&outBuffer[2], 0);
1254 outBuffer[1] = NOT_KEY_OWNER;
1255 write4(&outBuffer[2], 0);
1257 //reply to client socket
1258 sendto(udpPollSock.fd, outBuffer, 6, 0,
1259 (struct sockaddr *)&peerAddr, socklen);
1262 case WHO_IS_LEADER_CMD:
1263 tmpHost.ipAddr = peerIp;
1264 tmpHost.maxKeyCapacity = 0;
1267 outBuffer[0] = WHO_IS_LEADER_RES;
1268 //leader == 0 means I don't know who it is
1269 write4(&outBuffer[1], leader);
1270 udpSend(outBuffer, 5, peerIp);
1273 if (state == LEAD_NORMAL1_STATE || state == LEAD_NORMAL2_STATE)
1275 tmpHost.ipAddr = peerIp;
1276 tmpHost.maxKeyCapacity = read4(&inBuffer[1]);
1279 if (state == LEAD_NORMAL1_STATE)
1280 setState(LEAD_NORMAL2_STATE);
1281 outBuffer[0] = JOIN_RES;
1282 outBuffer[1] = 0; //status, success
1283 udpSend(outBuffer, 2, peerIp);
1285 else if (state == LEAD_REBUILD1_STATE)
1287 //note: I don't need to addHost().
1288 checkReplied(peerIp);
1289 outBuffer[0] = JOIN_RES;
1290 outBuffer[1] = 0; //status, success
1291 udpSend(outBuffer, 2, peerIp);
1295 setState(LEAD_REBUILD2_STATE);
1296 outBuffer[0] = DHT_UPDATE_CMD;
1297 write2(&outBuffer[1], numHosts);
1298 write2(&outBuffer[3], numBlocks);
1299 memcpy(&outBuffer[5], hostArray, numHosts*sizeof(struct hostData));
1300 memcpy(&outBuffer[5+numHosts*sizeof(struct hostData)],
1301 blockOwnerArray, numBlocks*2);
1302 udpSendAll(outBuffer, 5 + sizeof(struct hostData) * numHosts
1308 if (state == REBUILD1_STATE)
1310 setState(REBUILD2_STATE);
1312 else if (state == INIT2_STATE)
1314 setState(NORMAL_STATE);
1318 if (state == LEAD_NORMAL1_STATE || state == LEAD_NORMAL2_STATE)
1319 { //TODO: make this graceful, instead of just rebuilding
1321 if (state != LEAD_NORMAL2_STATE)
1322 setState(LEAD_NORMAL2_STATE);
1325 case DHT_UPDATE_CMD:
1326 if (state == REBUILD2_STATE && peerIp == leader)
1329 free(blockOwnerArray);
1330 numHosts = read2(&inBuffer[1]);
1331 numBlocks = read2(&inBuffer[3]);
1332 while (hostArraySize < numHosts)
1334 hostArray = calloc(hostArraySize, sizeof(struct hostData));
1335 blockOwnerArray = calloc(numBlocks, 2);
1336 memcpy(hostArray, &inBuffer[5], numHosts*sizeof(struct hostData));
1337 memcpy(blockOwnerArray, &inBuffer[5+numHosts*sizeof(struct hostData)], numBlocks*2);
1339 setState(REBUILD3_STATE);
1340 outBuffer[0] = DHT_UPDATE_RES;
1341 udpSend(outBuffer, 1, peerIp);
1344 case DHT_UPDATE_RES:
1345 if (state == LEAD_REBUILD2_STATE)
1347 checkReplied(peerIp);
1350 setState(LEAD_REBUILD3_STATE);
1351 outBuffer[0] = FILL_DHT_CMD;
1352 udpSendAll(outBuffer, 1);
1353 if (fillStatus != 0)
1354 dhtLog("udpListen(): ERROR: fillTask already running\n");
1356 if (pthread_create(&threadFillTask, NULL, fillTask, NULL) != 0)
1357 dhtLog("udpListen(): ERROR creating threadFillTask\n");
1361 case ELECT_LEADER_CMD:
1362 tmpUInt = read4(&inBuffer[1]);
1363 if ((state == ELECT1_STATE || state == ELECT2_STATE)
1364 && tmpUInt >= electionOriginator)
1365 { //already participating in a higher-priority election
1366 outBuffer[0] = ELECT_LEADER_RES;
1367 outBuffer[1] = 0xFF;
1368 udpSend(outBuffer, 2, peerIp);
1372 electionOriginator = tmpUInt;
1373 electionParent = peerIp;
1374 setState(ELECT1_STATE);
1375 outBuffer[0] = ELECT_LEADER_CMD;
1376 write4(&outBuffer[1], electionOriginator);
1377 //don't bother forwarding the message to originator or parent
1378 checkReplied(electionOriginator);
1379 checkReplied(electionParent);
1381 { //in case that is everybody I know of
1382 setState(ELECT2_STATE);
1383 outBuffer[0] = ELECT_LEADER_RES;
1385 write2(&outBuffer[2], numHosts);
1386 memcpy(&outBuffer[4], hostArray, sizeof(struct hostData)
1388 udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
1393 udpSendAll(outBuffer, 5);
1397 case ELECT_LEADER_RES:
1398 if (state == ELECT1_STATE)
1400 checkReplied(peerIp);
1401 if (inBuffer[1] != 0xFF)
1403 tmpUShort = read2(&inBuffer[2]);
1404 hostDataPtr = (struct hostData *)&inBuffer[4];
1405 for (i = 0; i < tmpUShort; i++)
1406 addHost(hostDataPtr[i]);
1411 setState(ELECT2_STATE);
1412 if (electionOriginator == myHostData.ipAddr)
1414 leader = hostArray[0].ipAddr;
1415 if (leader == myHostData.ipAddr)
1417 dhtLog("I am the leader!\n");
1418 setState(LEAD_REBUILD1_STATE);
1419 outBuffer[0] = REBUILD_CMD;
1420 udpSendAll(outBuffer, 1);
1424 outBuffer[0] = CONGRATS_CMD;
1425 write2(&outBuffer[1], numHosts);
1426 hostDataPtr = (struct hostData *)&outBuffer[3];
1427 for (i = 0; i < numHosts; i++)
1428 hostDataPtr[i] = hostArray[i];
1429 udpSend(outBuffer, 3 + sizeof(struct hostData) * numHosts,
1435 outBuffer[0] = ELECT_LEADER_RES;
1437 write2(&outBuffer[2], numHosts);
1438 hostDataPtr = (struct hostData *)&outBuffer[4];
1439 for (i = 0; i < numHosts; i++)
1440 hostDataPtr[i] = hostArray[i];
1441 udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
1448 if (state == ELECT2_STATE)
1450 leader = myHostData.ipAddr;
1451 dhtLog("I am the leader!\n");
1452 tmpUShort = read2(&inBuffer[1]);
1453 hostDataPtr = (struct hostData *)&inBuffer[3];
1454 for (i = 0; i < tmpUShort; i++)
1455 addHost(hostDataPtr[i]);
1457 setState(LEAD_REBUILD1_STATE);
1458 outBuffer[0] = REBUILD_CMD;
1459 udpSendAll(outBuffer, 1);
1463 if (state == LEAD_NORMAL1_STATE || state == LEAD_NORMAL2_STATE)
1465 setState(LEAD_REBUILD1_STATE);
1466 outBuffer[0] = REBUILD_CMD;
1467 udpSendAll(outBuffer, 1);
1471 leader = peerIp; //consider this a declaration of authority
1472 setState(REBUILD1_STATE);
1473 outBuffer[0] = JOIN_REQ;
1474 write4(&outBuffer[1], myHostData.maxKeyCapacity);
1475 udpSend(outBuffer, 5, leader);
1478 if (state == REBUILD3_STATE && peerIp == leader)
1480 setState(REBUILD4_STATE);
1481 if (fillStatus != 0)
1482 dhtLog("udpListen(): ERROR: fillTask already running\n");
1484 if (pthread_create(&threadFillTask, NULL, fillTask, NULL) != 0)
1485 dhtLog("udpListen(): ERROR creating threadFillTask\n");
1489 if (state == LEAD_REBUILD3_STATE)
1491 checkReplied(peerIp);
1492 if (allReplied() && fillStatus == 2)
1495 setState(LEAD_REBUILD4_STATE);
1496 outBuffer[0] = RESUME_NORMAL_CMD;
1497 udpSendAll(outBuffer, 1);
1501 case RESUME_NORMAL_CMD:
1502 if (state == REBUILD5_STATE && peerIp == leader)
1504 setState(NORMAL_STATE);
1505 outBuffer[0] = RESUME_NORMAL_RES;
1506 udpSend(outBuffer, 1, leader);
1509 case RESUME_NORMAL_RES:
1510 if (state == LEAD_REBUILD4_STATE)
1512 checkReplied(peerIp);
1515 setState(LEAD_NORMAL1_STATE);
1522 if (state == REBUILD4_STATE)
1526 case 0: dhtLog("udpListen(): ERROR: fillStatus=0 in REBUILD4_STATE\n");
1528 case 1: //do nothing
1530 case 2: //done filling the dht, notify leader
1532 setState(REBUILD5_STATE);
1533 outBuffer[0] = FILL_DHT_RES;
1534 udpSend(outBuffer, 1, leader);
1536 case 3: //error encountered -> restart rebuild
1538 setState(REBUILD0_STATE);
1539 outBuffer[0] = REBUILD_REQ;
1540 udpSend(outBuffer, 1, leader);
1544 if (state == LEAD_REBUILD3_STATE)
1548 case 0: dhtLog("udpListen(): ERROR: fillStatus=0 in LEAD_REBUILD3_STATE\n");
1550 case 1: //do nothing
1552 case 2: //I'm done, now is everybody else also done?
1556 setState(LEAD_REBUILD4_STATE);
1557 outBuffer[0] = RESUME_NORMAL_CMD;
1558 udpSendAll(outBuffer, 1);
1561 case 3: //error encountered -> restart rebuild
1563 setState(LEAD_REBUILD1_STATE);
1564 outBuffer[0] = REBUILD_CMD;
1565 udpSendAll(outBuffer, 1);
1571 gettimeofday(&now, NULL);
1572 if (timercmp(&now, &timer, >))
1574 if (timeoutCntr < retry_vals[state])
1577 timeradd(&now, &timeout_vals[state], &timer);
1578 dhtLog("udpListen(): retry: %d\n", timeoutCntr);
1582 outBuffer[0] = WHO_IS_LEADER_CMD;
1583 udpSend(outBuffer, 1, seed);
1586 outBuffer[0] = JOIN_REQ;
1587 write4(&outBuffer[1], myHostData.maxKeyCapacity);
1588 udpSend(outBuffer, 5, leader);
1591 outBuffer[0] = ELECT_LEADER_CMD;
1592 write4(&outBuffer[1], electionOriginator);
1593 udpSendAll(outBuffer, 5);
1596 if (electionOriginator == myHostData.ipAddr)
1597 { //retry notify leader
1598 outBuffer[0] = CONGRATS_CMD;
1599 write2(&outBuffer[1], numHosts);
1600 memcpy(&outBuffer[3], hostArray, sizeof(struct hostData)
1602 udpSend(outBuffer, 3 + sizeof(struct hostData) * numHosts,
1607 outBuffer[0] = ELECT_LEADER_RES;
1609 write2(&outBuffer[2], numHosts);
1610 memcpy(&outBuffer[4], hostArray, sizeof(struct hostData)
1612 udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
1616 case REBUILD0_STATE:
1617 outBuffer[0] = REBUILD_REQ;
1618 udpSend(outBuffer, 1, leader);
1620 case REBUILD1_STATE:
1621 outBuffer[0] = JOIN_REQ;
1622 write4(&outBuffer[1], myHostData.maxKeyCapacity);
1623 udpSend(outBuffer, 5, leader);
1625 case REBUILD5_STATE:
1626 outBuffer[0] = FILL_DHT_RES;
1627 udpSend(outBuffer, 1, leader);
1629 case LEAD_REBUILD1_STATE:
1630 outBuffer[0] = REBUILD_CMD;
1631 udpSendAll(outBuffer, 1);
1633 case LEAD_REBUILD2_STATE:
1634 outBuffer[0] = DHT_UPDATE_CMD;
1635 write2(&outBuffer[1], numHosts);
1636 write2(&outBuffer[3], numBlocks);
1637 memcpy(&outBuffer[5], hostArray, numHosts
1638 * sizeof(struct hostData));
1639 memcpy(&outBuffer[5+numHosts*sizeof(struct hostData)],
1640 blockOwnerArray, numBlocks*2);
1641 udpSendAll(outBuffer, 5 + sizeof(struct hostData) * numHosts
1644 case LEAD_REBUILD3_STATE:
1645 outBuffer[0] = FILL_DHT_CMD;
1646 udpSendAll(outBuffer, 1);
1648 case LEAD_REBUILD4_STATE:
1649 outBuffer[0] = RESUME_NORMAL_CMD;
1650 udpSendAll(outBuffer, 1);
1652 case EXIT1_STATE: //TODO...
1655 case LEAD_NORMAL1_STATE:
1656 case LEAD_NORMAL2_STATE:
1657 case REBUILD2_STATE:
1658 case REBUILD3_STATE:
1659 case REBUILD4_STATE:
1660 case EXIT2_STATE: //we shouldn't get here
1666 dhtLog("udpListen(): timed out in state %s after %d retries\n",
1667 state_names[state], timeoutCntr);
1671 setState(EXIT2_STATE);
1673 case LEAD_NORMAL2_STATE:
1674 setState(LEAD_REBUILD1_STATE);
1675 outBuffer[0] = REBUILD_CMD;
1676 udpSendAll(outBuffer, 1);
1679 dhtLog("removing unresponsive hosts, before:\n");
1681 removeUnresponsiveHosts();
1684 setState(ELECT2_STATE);
1685 if (electionOriginator == myHostData.ipAddr)
1687 leader = hostArray[0].ipAddr;
1688 if (leader == myHostData.ipAddr)
1690 dhtLog("I am the leader!\n");
1691 setState(LEAD_REBUILD1_STATE);
1692 outBuffer[0] = REBUILD_CMD;
1693 udpSendAll(outBuffer, 1);
1697 outBuffer[0] = CONGRATS_CMD;
1698 write2(&outBuffer[1], numHosts);
1699 memcpy(&outBuffer[3], hostArray, sizeof(struct hostData)
1701 udpSend(outBuffer, 3 + sizeof(struct hostData) * numHosts,
1707 outBuffer[0] = ELECT_LEADER_RES;
1709 write2(&outBuffer[2], numHosts);
1710 memcpy(&outBuffer[4], hostArray, sizeof(struct hostData)
1712 udpSend(outBuffer, 4 + sizeof(struct hostData) * numHosts,
1718 case REBUILD0_STATE:
1719 case REBUILD1_STATE:
1720 case REBUILD2_STATE:
1721 case REBUILD3_STATE:
1722 case REBUILD4_STATE:
1723 case REBUILD5_STATE:
1724 case LEAD_REBUILD1_STATE:
1725 case LEAD_REBUILD2_STATE:
1726 case LEAD_REBUILD3_STATE:
1727 case LEAD_REBUILD4_STATE:
1729 electionOriginator = myHostData.ipAddr;
1730 setState(ELECT1_STATE);
1731 outBuffer[0] = ELECT_LEADER_CMD;
1732 write4(&outBuffer[1], myHostData.ipAddr); //originator = me
1733 udpSendAll(outBuffer, 5);
1736 setState(EXIT2_STATE);
1739 case LEAD_NORMAL1_STATE:
1740 case EXIT2_STATE: //we shouldn't get here
1746 if (state != oldState)
1747 pthread_cond_broadcast(&stateCond);
1748 pthread_mutex_unlock(&stateMutex);