Added: broadcast and wait function, leader discovery, initialization of
[IRC.git] / Robust / src / Runtime / DSTM / interface / dht.c
1 #include "dht.h"
2
3 #ifdef SIMPLE_DHT
4
5 #include <arpa/inet.h>
6
7 #define NUM_HOSTS 4
8 #define OIDS_PER_HOST 0x40000000
9
10 //set these to your IP addresses
11 unsigned int hosts[NUM_HOSTS] = {
12         0xc0a802c8,
13         0xc0a802c9,
14         0xc0a802ca,
15         0xc0a802cb,
16 };
17
18 //does nothing
19 void dhtInit(unsigned int maxKeyCapaciy)
20 {       return;}
21
22 //does nothing
23 void dhtExit()
24 {       return;}
25
26 //does nothing, returns 0
27 int dhtInsert(unsigned int key, unsigned int val)
28 {       return 0;}
29
30 //does nothing, returns 0
31 int dhtRemove(unsigned int key)
32 {       return 0;}
33
34 //returns 0 if successful and copies val into *val,
35 // 1 if key not found, -1 if an error occurred
36 int dhtSearch(unsigned int key, unsigned int *val)
37 {
38         *val = hosts[key / OIDS_PER_HOST];
39         return 0;
40 }
41
42 #else
43
44 #include <netinet/in.h>
45 #include <arpa/inet.h>
46 #include <sys/types.h>
47 #include <sys/socket.h>
48 #include <sys/ioctl.h>
49 #include <stdio.h>
50 #include <string.h>
51 #include <stdlib.h>
52 #include <unistd.h>
53 #include <pthread.h>
54 #include <sys/time.h>
55 #include <sys/poll.h>
56 #include <netdb.h>
57 #include <net/if.h>
58 #include <linux/sockios.h>
59 #include "clookup.h" //this works for now, do we need anything better?
60
61 #define BUFFER_SIZE 512 //maximum message size
62 #define UDP_PORT 2157
63 #define TCP_PORT 2157
64 #define BACKLOG 10 //max pending tcp connections
65 #define TIMEOUT_MS 500
66 #define MAX_RETRIES 3
67 #define INIT_HOST_ALLOC 16
68 #define INIT_BLOCK_NUM 64
69 #define DEFAULT_INTERFACE "eth0"
70 #define DHT_LOG "dht.log"
71
72 enum {
73         INSERT_CMD,
74         INSERT_RES,
75         REMOVE_CMD,
76         REMOVE_RES,
77         SEARCH_CMD,
78         SEARCH_RES,
79         FIND_LEADER_CMD,
80         FIND_LEADER_RES,
81
82         REBUILD_REQ,
83         REBUILD_CMD,
84         JOIN_REQ,
85         JOIN_RES,
86         DHT_INFO_CMD,
87         DHT_INFO_RES,
88         FILL_DHT_CMD,
89         FILL_DHT_RES,
90         REBUILD_DONE_INFO
91 };
92
93
94 //status codes
95 enum {
96         INSERT_OK,
97         INSERT_ERROR,
98         REMOVE_OK,
99         REMOVE_ERROR,
100         KEY_FOUND,
101         KEY_NOT_FOUND,
102         NOT_KEY_OWNER
103 };
104
105 struct hostData {
106         unsigned int ipAddr;
107         unsigned int maxKeyCapacity;
108 };
109
110 struct insertCmd {
111         unsigned int msgType:8;
112         unsigned int unused:24;
113         unsigned int key;
114         unsigned int val;
115 };
116
117 struct removeCmd {
118         unsigned int msgType:8;
119         unsigned int unused:24;
120         unsigned int key;
121 };
122
123 struct searchCmd {
124         unsigned int msgType:8;
125         unsigned int unused:24;
126         unsigned int key;
127 };
128
129 struct insertRes {
130         unsigned int msgType:8;
131         unsigned int unused:24;
132         unsigned int status;
133 };
134
135 struct removeRes {
136         unsigned int msgType:8;
137         unsigned int unused:24;
138         unsigned int status;
139 };
140
141 struct searchRes {
142         unsigned int msgType:8;
143         unsigned int unused:24;
144         unsigned int status;
145         unsigned int val;
146 };
147
148
149 //TODO: leave message, rebuild message...
150
151 FILE *logfile;
152 unsigned int leader; //ip address of leader
153 struct hostData myHostData;
154 /*----DHT data----*/
155 unsigned int numHosts;
156 struct hostData *hostArray;
157 unsigned int numBlocks;
158 unsigned int *blockOwnerArray;
159 /*----end DHT data----*/
160
161 //return my IP address
162 unsigned int getMyIpAddr();
163 //sends broadcast to discover leader
164 unsigned int getLeadersIpAddr();
165 //UDP server
166 void *udpListen();
167 //TCP server
168 void *tcpListen();
169 //TCP connection handler
170 void *tcpAccept(void *);
171 //returns number of bytes received in resBuffer, or -1 if an error occurred
172 int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port,
173         void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize,
174         unsigned int timeout, unsigned int numRetries);
175 //returns number of bytes received in resBuffer, or -1 if an error occurred
176 int udpBroadcastWaitForResponse(unsigned int *reply_ip,
177         unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer,
178         unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries);
179 //just UDP it
180 int sendNoWait(unsigned int dest_ip, unsigned short dest_port, void *msg,
181         unsigned int msglen);
182 //right now this hashes the key into a block and returns the block owner
183 unsigned int getKeyOwner(unsigned int key);
184 //simple hash
185 unsigned int hash(unsigned int x);
186 //initiates TCP connection with leader, gets DHT data
187 int getDHTdata();
188 //outputs readable DHT data to outfile
189 void writeDHTdata(FILE *outfile);
190
191 void dhtInit(unsigned int maxKeyCapacity)
192 {
193         unsigned int myMessage;
194         int bytesReceived;
195         int i;
196         int ret;
197
198 #ifdef DHT_LOG
199         logfile = fopen(DHT_LOG, "w");
200 #endif
201
202         myHostData.ipAddr = getMyIpAddr();
203         myHostData.maxKeyCapacity = maxKeyCapacity;
204
205         numHosts = numBlocks = 0;
206         hostArray = NULL;
207         blockOwnerArray = NULL;
208
209         leader = getLeadersIpAddr();
210
211         if (leader == 0)
212         { //no response: I am the first
213                 leader = getMyIpAddr();
214
215                 numHosts = 1;
216                 hostArray = calloc(numHosts, sizeof(struct hostData));
217                 hostArray[0] = myHostData;
218                 numBlocks = INIT_BLOCK_NUM;
219                 blockOwnerArray = calloc(numBlocks, sizeof(unsigned int));
220                 for (i = 0; i < numBlocks; i++)
221                         blockOwnerArray[i] = 0;
222         }
223         else
224         {
225                 //get DHT data from leader
226                 ret = getDHTdata();
227
228                 //TODO: actually, just initiate a rebuild here instead
229         }
230
231         //start servers
232         pthread_t threadUdpListen, threadTcpListen;
233         pthread_create(&threadUdpListen, NULL, udpListen, NULL);
234         pthread_create(&threadTcpListen, NULL, tcpListen, NULL);
235         
236         return;
237 }
238
239 void dhtExit()
240 {
241         fclose(logfile);
242 }
243
244 int dhtInsert(unsigned int key, unsigned int val)
245 {
246         unsigned int dest_ip = getKeyOwner(key);
247         struct insertCmd myMessage;
248         struct insertRes response;
249         int bytesReceived;
250
251         myMessage.msgType = INSERT_CMD;
252         myMessage.key = key;
253         myMessage.val = val;
254         
255         bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage,
256                 sizeof(struct insertCmd), (void *)&response, sizeof(struct insertRes),
257                 TIMEOUT_MS, MAX_RETRIES);
258         if (bytesReceived == sizeof(struct insertRes))
259         {
260                 if (response.msgType == INSERT_RES)
261                 {
262                         if (response.status == INSERT_OK)
263                                 return 0;
264 //                      if (response.status == NOT_KEY_OWNER)
265                 }
266         }
267 //TODO: find owner and try again, request rebuild if necessary
268         return -1; //this function should be robust enough to always return 0
269 }
270
271 int dhtRemove(unsigned int key)
272 {
273         unsigned int dest_ip = getKeyOwner(key);
274         struct removeCmd myMessage;
275         struct removeRes response;
276         int bytesReceived;
277         
278         myMessage.msgType = REMOVE_CMD;
279         myMessage.key = key;
280
281         bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage,
282                 sizeof(struct removeCmd), (void *)&response, sizeof(struct removeRes),
283                 TIMEOUT_MS, MAX_RETRIES);
284         if (bytesReceived == sizeof(struct removeRes))
285         {
286                 if (response.msgType == REMOVE_RES)
287                 {
288                         if (response.status == REMOVE_OK)
289                                 return 0;
290 //                      if (response.status == NOT_KEY_OWNER)
291                 }
292         }
293 //TODO: find owner and try again, request rebuild if necessary
294         return -1; //this function should be robust enough to always return 0
295 }
296
297 int dhtSearch(unsigned int key, unsigned int *val)
298 {
299         unsigned int dest_ip = getKeyOwner(key);
300         struct searchCmd myMessage;
301         struct searchRes response;
302         int bytesReceived;
303
304         myMessage.msgType = SEARCH_CMD;
305         myMessage.key = key;
306
307         bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage,
308                 sizeof(struct searchCmd), (void *)&response, sizeof(struct searchRes),
309                 TIMEOUT_MS, MAX_RETRIES);
310         if (bytesReceived == sizeof(struct searchRes))
311         {
312                 if (response.msgType == SEARCH_RES)
313                 {
314                         if (response.status == KEY_FOUND)
315                         {
316                                 *val = response.val;
317                                 return 0;
318                         }
319                         if (response.status == KEY_NOT_FOUND)
320                         {
321                                 return 1;
322                         }
323 //                      if (response.status == NOT_KEY_OWNER)
324                 }
325         }
326 //TODO: find owner and try again, request rebuild if necessary
327         return -1; //this function should be robust enough to always return 0 or 1
328 }
329
330
331
332 //use UDP for messages that are frequent and short
333 void *udpListen()
334 {
335         struct sockaddr_in myAddr;
336         struct sockaddr_in clientAddr;
337         int sock;
338         socklen_t socklen = sizeof(struct sockaddr_in);
339         char buffer[BUFFER_SIZE];
340         ssize_t bytesReceived;
341         struct insertCmd *insertCmdPtr;
342         struct removeCmd *removeCmdPtr;
343         struct searchCmd *searchCmdPtr;
344         struct insertRes *insertResPtr;
345         struct removeRes *removeResPtr;
346         struct searchRes *searchResPtr;
347         char replyBuffer[BUFFER_SIZE];
348         struct timeval now;
349
350         chashtable_t *myHashTable = chashCreate(HASH_SIZE, LOADFACTOR);
351
352         if ((sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
353         {
354                 perror("udpListen():socket()");
355                 pthread_exit(NULL);
356         }
357         
358         bzero(&myAddr, socklen);
359         myAddr.sin_family=AF_INET;
360         myAddr.sin_addr.s_addr=INADDR_ANY;
361         myAddr.sin_port=htons(UDP_PORT);
362
363         if (bind(sock, (struct sockaddr *)&myAddr, socklen) == -1)
364         {
365                 perror("udpListen():bind()");
366                 pthread_exit(NULL);
367         }
368 #ifdef DHT_LOG
369         fprintf(logfile,"udpListen(): listening on port %d\n", UDP_PORT);
370         fflush(logfile);
371 #endif
372         while(1)
373         {
374                 if ((bytesReceived = recvfrom(sock, buffer, BUFFER_SIZE, 0,
375                         (struct sockaddr *)&clientAddr, &socklen)) == -1)
376                 {
377                         perror("udpListen():recvfrom()");
378                         break;
379                 }
380                 if (bytesReceived == 0)
381                 {
382 #ifdef DHT_LOG
383                         fprintf(logfile,"udpListen(): recvfrom() returned 0\n");
384                         fflush(logfile);
385 #endif
386                         break;
387                 }
388                 gettimeofday(&now, NULL);
389 #ifdef DHT_LOG
390                 fprintf(logfile,"udpListen(): message received:%ds,%dus\n", now.tv_sec,
391                         now.tv_usec);
392                 fprintf(logfile,"udpListen(): received %d bytes from %s:%d\n",
393                         bytesReceived,inet_ntoa(clientAddr.sin_addr),htons(clientAddr.sin_port));
394                 fflush(logfile);
395 #endif
396
397                 switch (buffer[0])
398                 {
399                         case INSERT_CMD:
400                                 if (bytesReceived != sizeof(struct insertCmd))
401                                 {
402 #ifdef DHT_LOG
403                                         fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
404                                         fflush(logfile);
405 #endif
406                                         break;
407                                 }
408                                 insertCmdPtr = (struct insertCmd *)buffer;
409 #ifdef DHT_LOG
410                                 fprintf(logfile, "udpListen(): Insert: key=%d, val=%d\n",
411                                         insertCmdPtr->key, insertCmdPtr->val);
412                                 fflush(logfile);
413 #endif
414                                 insertResPtr = (struct insertRes *)replyBuffer;
415                                 insertResPtr->msgType = INSERT_RES;
416                                 if (getKeyOwner(insertCmdPtr->key) == myHostData.ipAddr)
417                                 {
418                                         //note: casting val to void * in order to conform to API
419                                         if(chashInsert(myHashTable, insertCmdPtr->key,
420                                                         (void *)insertCmdPtr->val) == 0)
421                                                 insertResPtr->status = INSERT_OK;
422                                         else
423                                                 insertResPtr->status = INSERT_ERROR;
424                                 }
425                                 else
426                                 {
427                                         insertResPtr->status = NOT_KEY_OWNER;;
428                                 }
429                                 if (sendto(sock, (void *)insertResPtr, sizeof(struct insertRes), 0,
430                                         (struct sockaddr *)&clientAddr, socklen) == -1)
431                                 {
432                                         perror("udpListen():sendto()");
433                                 }
434                                 break;
435                         case REMOVE_CMD:
436                                 if (bytesReceived != sizeof(struct removeCmd))
437                                 {
438 #ifdef DHT_LOG
439                                         fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
440                                         fflush(logfile);
441 #endif
442                                         break;
443                                 }
444                                 removeCmdPtr = (struct removeCmd *)buffer;
445 #ifdef DHT_LOG
446                                 fprintf(logfile,"udpListen(): Remove: key=%d\n", removeCmdPtr->key);
447                                 fflush(logfile);
448 #endif
449                                 removeResPtr = (struct removeRes *)replyBuffer;
450                                 removeResPtr->msgType = REMOVE_RES;
451                                 if (getKeyOwner(removeCmdPtr->key) == myHostData.ipAddr)
452                                 {
453                                         //note: casting val to void * in order to conform to API
454                                         if(chashRemove(myHashTable, removeCmdPtr->key) == 0)
455                                                 removeResPtr->status = INSERT_OK;
456                                         else
457                                                 removeResPtr->status = INSERT_ERROR;
458                                 }
459                                 else
460                                 {
461                                         removeResPtr->status = NOT_KEY_OWNER;
462                                 }
463                                 if (sendto(sock, (void *)removeResPtr, sizeof(struct removeRes), 0,
464                                         (struct sockaddr *)&clientAddr, socklen) == -1)
465                                 {
466                                         perror("udpListen():sendto()");
467                                 }
468                                 break;
469                         case SEARCH_CMD:
470                                 if (bytesReceived != sizeof(struct searchCmd))
471                                 {
472 #ifdef DHT_LOG
473                                         fprintf(logfile,"udpListen(): ERROR: incorrect message size\n");
474                                         fflush(logfile);
475 #endif
476                                         break;
477                                 }
478                                 searchCmdPtr = (struct searchCmd *)buffer;
479 #ifdef DHT_LOG
480                                         fprintf(logfile,"udpListen(): Search: key=%d\n",searchCmdPtr->key);
481                                         fflush(logfile);
482 #endif
483                                 searchResPtr = (struct searchRes *)replyBuffer;
484                                 searchResPtr->msgType = SEARCH_RES;
485                                 if (getKeyOwner(searchCmdPtr->key) == myHostData.ipAddr)
486                                 {
487                                         //note: casting val to void * in order to conform to API
488                                         if((searchResPtr->val = (unsigned int)chashSearch(myHashTable,
489                                                         searchCmdPtr->key)) == 0)
490                                                 searchResPtr->status = KEY_NOT_FOUND;
491                                         else
492                                                 searchResPtr->status = KEY_FOUND;
493                                 }
494                                 else
495                                 {
496                                         searchResPtr->status = NOT_KEY_OWNER;
497                                 }
498                                 if (sendto(sock, (void *)searchResPtr, sizeof(struct searchRes), 0,
499                                         (struct sockaddr *)&clientAddr, socklen) == -1)
500                                 {
501                                         perror("udpListen():sendto()");
502                                 }
503                                 break;
504                         case FIND_LEADER_CMD:
505                                 if (bytesReceived != sizeof(char))
506                                 {
507 #ifdef DHT_LOG
508                                         fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
509                                         fflush(logfile);
510 #endif
511                                         break;
512                                 }
513                                 if (leader == getMyIpAddr())
514                                 {
515                                         replyBuffer[0] = FIND_LEADER_RES;
516                                         if(sendto(sock, (void *)replyBuffer, sizeof(char), 0,
517                                                 (struct sockaddr *)&clientAddr, socklen) == -1)
518                                         {
519                                                 perror("udpListen():sendto");
520                                         }
521                                 }
522                                 break;
523                         default:
524 #ifdef DHT_LOG
525                                 fprintf(logfile,"udpListen(): ERROR: Unknown message type\n");
526                                 fflush(logfile);
527 #endif
528                 }
529         }
530 }
531
532 int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port,
533         void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize,
534         unsigned int timeout, unsigned int numRetries)
535 {
536         struct sockaddr_in server_addr;
537         struct sockaddr_in ack_addr;
538         socklen_t socklen = sizeof(struct sockaddr_in);
539         struct pollfd pollsock;
540         struct timeval now;
541         int retval;
542         int i;
543         ssize_t bytesReceived;
544
545         bzero((char *) &server_addr, sizeof(server_addr));
546         server_addr.sin_family = AF_INET;
547         server_addr.sin_port = htons(dest_port);
548         server_addr.sin_addr.s_addr = htonl(dest_ip);
549
550         if ((pollsock.fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
551         {
552                 perror("udpSendWaitForResponse():socket()");
553                 return -1;
554         }
555         
556         pollsock.events = POLLIN;
557         
558         for (i = 0; i < MAX_RETRIES; i++)
559         {
560 #ifdef DHT_LOG
561                 if (i > 0)
562                         fprintf(logfile,"udpSendWaitForResponse(): trying again, count: %d\n",
563                                 i+1);
564                 fflush(logfile);
565 #endif
566                 if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr,
567                         socklen) == -1)
568                 {
569                         perror("udpSendWaitForResponse():sendto");
570                         return -1;
571                 }
572 #ifdef DHT_LOG
573                 gettimeofday(&now, NULL);
574                 fprintf(logfile,"udpSendWaitForResponse(): message sent:%ds,%dus\n",
575                         now.tv_sec, now.tv_usec);
576                 fflush(logfile);
577 #endif
578                 retval = poll(&pollsock, 1, timeout);
579                 if (retval !=0)
580                 {
581                         bytesReceived = recvfrom(pollsock.fd, resBuffer, resBufferSize, 0,
582                                 (struct sockaddr *)&ack_addr, &socklen);
583                         if ((ack_addr.sin_addr.s_addr == server_addr.sin_addr.s_addr)
584                         && (ack_addr.sin_port == server_addr.sin_port))
585                         {
586                                 close(pollsock.fd);
587 #ifdef DHT_LOG
588                                 gettimeofday(&now, NULL);
589                                 fprintf(logfile,"udpSendWaitForResponse(): received response:%ds,%dus\n", now.tv_sec, now.tv_usec);
590                                 fflush(logfile);
591 #endif
592                                 return bytesReceived;
593                         }
594                 }
595         }
596         close(pollsock.fd);
597 #ifdef DHT_LOG
598         gettimeofday(&now, NULL);
599         printf("udpSendWaitForResponse(): timed out, no ack:%ds,%dus\n",
600                 now.tv_sec, now.tv_usec);
601         fflush(logfile);
602 #endif
603         return -1;
604 }
605
606 int udpBroadcastWaitForResponse(unsigned int *reply_ip,
607         unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer,
608         unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries)
609 {
610         struct sockaddr_in server_addr;
611         struct sockaddr_in ack_addr;
612         socklen_t socklen = sizeof(struct sockaddr_in);
613         struct pollfd pollsock;
614         struct timeval now;
615         int retval;
616         int i;
617         ssize_t bytesReceived;
618         int on;
619
620         bzero((char *) &server_addr, sizeof(server_addr));
621         server_addr.sin_family = AF_INET;
622         server_addr.sin_port = htons(dest_port);
623         server_addr.sin_addr.s_addr = htonl(0xFFFFFFFF);
624
625         if ((pollsock.fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
626         {
627                 perror("udpBroadcastWaitForResponse():socket()");
628                 return -1;
629         }
630
631         on = 1;
632         if (setsockopt(pollsock.fd, SOL_SOCKET, SO_BROADCAST, &on, sizeof(on)) == -1)
633         {
634                 perror("udpBroadcastWaitForResponse():setsockopt()");
635                 return -1;
636         }
637
638         pollsock.events = POLLIN;
639         
640         for (i = 0; i < MAX_RETRIES; i++)
641         {
642 #ifdef DHT_LOG
643                 if (i > 0)
644                         fprintf(logfile,"udpBroadcastWaitForResponse(): trying again, count: %d\n", i+1);
645                         fflush(logfile);
646 #endif
647                 if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr,
648                         socklen) == -1)
649                 {
650                         perror("udpBroadcastWaitForResponse():sendto()");
651                         return -1;
652                 }
653 #ifdef DHT_LOG
654                 gettimeofday(&now, NULL);
655                 fprintf(logfile,"udpBroadcastWaitForResponse(): message sent:%ds,%dus\n",
656                         now.tv_sec, now.tv_usec);
657                 fflush(logfile);
658 #endif
659                 retval = poll(&pollsock, 1, timeout);
660                 if (retval !=0)
661                 {
662                         bytesReceived = recvfrom(pollsock.fd, resBuffer, resBufferSize, 0,
663                                 (struct sockaddr *)&ack_addr, &socklen);
664                         close(pollsock.fd);
665                         *reply_ip = htonl(ack_addr.sin_addr.s_addr);
666 #ifdef DHT_LOG
667                         gettimeofday(&now, NULL);
668                         fprintf(logfile,"udpBroadcastWaitForResponse(): received response:%ds,%dus\n", now.tv_sec, now.tv_usec);
669                         fflush(logfile);
670 #endif
671                         return bytesReceived;
672                 }
673         }
674         close(pollsock.fd);
675 #ifdef DHT_LOG
676         gettimeofday(&now, NULL);
677         fprintf(logfile,"udpBroadcastWaitForResponse(): timed out, no ack:%ds,%dus\n",
678                 now.tv_sec, now.tv_usec);
679         fflush(logfile);
680 #endif
681         return -1;
682 }
683
684 // use TCP for potentially large and/or important data transfer
685 void *tcpListen()
686 {
687         struct sockaddr_in myAddr;
688         struct sockaddr_in clientAddr;
689         int sockListen, sockAccept;
690         socklen_t socklen = sizeof(struct sockaddr_in);
691         pthread_t threadTcpAccept;
692
693         sockListen = socket(AF_INET, SOCK_STREAM, 0);
694         if (sockListen == -1)
695         {
696                 perror("tcpListen():socket()");
697                 pthread_exit(NULL);
698         }
699
700         myAddr.sin_family = AF_INET;
701         myAddr.sin_port = htons(TCP_PORT);
702         myAddr.sin_addr.s_addr = INADDR_ANY;
703         memset(&(myAddr.sin_zero), '\0', 8);
704
705         if (bind(sockListen, (struct sockaddr *)&myAddr, socklen) == -1)
706         {
707                 perror("tcpListen():socket()");
708                 pthread_exit(NULL);
709         }
710
711         if (listen(sockListen, BACKLOG) == -1)
712         {
713                 perror("tcpListen():listen()");
714                 pthread_exit(NULL);
715         }
716
717 #ifdef DHT_LOG
718         fprintf(logfile,"tcpListen(): listening on port %d\n", TCP_PORT);
719         fflush(logfile);
720 #endif
721
722         while(1)
723         {
724                 sockAccept = accept(sockListen, (struct sockaddr *)&clientAddr, &socklen);
725                 pthread_create(&threadTcpAccept, NULL, tcpAccept, (void *)sockAccept);
726         }
727 }
728
729 void *tcpAccept(void *arg)
730 {
731         int sockAccept = (int)arg;
732         int bytesReceived;
733         char msgType;
734
735 #ifdef DHT_LOG
736         fprintf(logfile, "tcpAccept(): accepted tcp connection, file descriptor: %d\n", sockAccept);
737         fflush(logfile);
738 #endif
739
740         bytesReceived = recv(sockAccept, &msgType, sizeof(char), 0);
741         if (bytesReceived == -1)
742         {
743                 perror("tcpAccept():recv()");
744         }
745         else if (bytesReceived == 0)
746         {
747 #ifdef DHT_LOG
748                 fprintf(logfile, "tcpAccept(): bytesReceived = 0\n", sockAccept);
749                 fflush(logfile);
750 #endif
751         }
752         else
753         {
754                 switch (msgType)
755                 {
756                         case DHT_INFO_CMD:
757                                 if (send(sockAccept, &numHosts, sizeof(numHosts), 0) == -1)
758                                 {
759                                         perror("tcpAccept():send()");
760                                         break;
761                                 }
762                                 if (send(sockAccept, &numBlocks, sizeof(numBlocks), 0) == -1)
763                                 {
764                                         perror("tcpAccept():send()");
765                                         break;
766                                 }
767                                 if (send(sockAccept, hostArray, numHosts*sizeof(struct hostData),
768                                                 0) == -1)
769                                 {
770                                         perror("tcpAccept():send()");
771                                         break;
772                                 }
773                                 if (send(sockAccept, blockOwnerArray, numBlocks*sizeof(unsigned int),
774                                                 0) == -1)
775                                 {
776                                         perror("tcpAccept():send()");
777                                         break;
778                                 }
779                                 break;
780                         default:
781 #ifdef DHT_LOG
782                                 fprintf(logfile, "tcpAccept(): unrecognized msg type\n");
783                                 fflush(logfile);
784 #endif
785                 }
786         }
787
788         if (close(sockAccept) == -1)
789         {
790                 perror("tcpAccept():close()");
791         }
792
793 #ifdef DHT_LOG
794         fprintf(logfile, "tcpAccept(): closed tcp connection, file descriptor: %d\n",
795                 sockAccept);
796         fflush(logfile);
797 #endif
798
799         pthread_exit(NULL);
800 }
801
802 unsigned int getKeyOwner(unsigned int key)
803 {
804         return hostArray[blockOwnerArray[hash(key)]].ipAddr;
805 }
806
807 unsigned int getMyIpAddr()
808 {       
809         int sock;
810         struct ifreq interfaceInfo;
811         struct sockaddr_in *myAddr = (struct sockaddr_in *)&interfaceInfo.ifr_addr;
812
813         memset(&interfaceInfo, 0, sizeof(struct ifreq));
814
815         if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
816         {
817                 perror("getMyIpAddr():socket()");
818                 return 1;
819         }
820
821         strcpy(interfaceInfo.ifr_name, DEFAULT_INTERFACE);
822         myAddr->sin_family = AF_INET;
823         
824         if(ioctl(sock, SIOCGIFADDR, &interfaceInfo) != 0)
825         {
826                 perror("getMyIpAddr():ioctl()");
827                 return 1;
828         }
829
830         return ntohl(myAddr->sin_addr.s_addr);
831 }
832
833 unsigned int getLeadersIpAddr()
834 {
835         unsigned int reply_ip;
836         int bytesReceived;
837         char myMessage;
838         char response;
839
840 #ifdef DHT_LOG
841         fprintf(logfile, "getLeadersIpAddr(): broadcasting...\n");
842         fflush(logfile);
843 #endif
844
845         myMessage = FIND_LEADER_CMD;
846
847         bytesReceived = udpBroadcastWaitForResponse(&reply_ip, UDP_PORT,
848                 (void *)&myMessage, sizeof(myMessage), (void *)&response,
849                 sizeof(response), TIMEOUT_MS, MAX_RETRIES);
850
851         if (bytesReceived == -1)
852         {
853 #ifdef DHT_LOG
854         fprintf(logfile, "getLeadersIpAddr(): no response\n");
855         fflush(logfile);
856 #endif
857                 return 0;
858         }
859         else if (response == FIND_LEADER_RES)
860         {
861 #ifdef DHT_LOG
862         struct in_addr reply_addr;
863         reply_addr.s_addr = htonl(reply_ip);
864         fprintf(logfile, "getLeadersIpAddr(): leader found:%s\n",
865                                         inet_ntoa(reply_addr));
866         fflush(logfile);
867 #endif
868                 return reply_ip;
869         }
870         else
871         {
872 #ifdef DHT_LOG
873         fprintf(logfile, "getLeadersIpAddr(): unexpected response\n");
874         fflush(logfile);
875 #endif
876                 return 0;
877         }
878 }
879
880 int getDHTdata()
881 {
882         struct sockaddr_in leader_addr;
883         int sock;
884         char msg;
885         int bytesReceived;
886
887         if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1)
888         {
889                 perror("getDHTdata():socket()");
890                 return -1;
891         }
892
893         bzero((char *)&leader_addr, sizeof(leader_addr));
894         leader_addr.sin_family = AF_INET;
895         leader_addr.sin_port = htons(TCP_PORT);
896         leader_addr.sin_addr.s_addr = htonl(leader);
897
898         if (connect(sock, (struct sockaddr *)&leader_addr, sizeof(leader_addr)) == -1)
899         {
900                 perror("getDHTdata():connect()");
901                 close(sock);
902                 return -1;
903         }
904         msg = DHT_INFO_CMD;
905         if (send(sock, &msg, sizeof(char), 0) == -1)
906         {
907                 perror("getDHTdata():send()");
908                 close(sock);
909                 return -1;
910         }
911         bytesReceived = recv(sock, &numHosts, sizeof(numHosts), 0);
912         if (bytesReceived == -1)
913         {
914                 perror("getDHTdata():recv()");
915                 close(sock);
916                 return -1;
917         }
918         if (bytesReceived != sizeof(numHosts))
919         {
920 #ifdef DHT_LOG
921                 fprintf(logfile,"getDHTdata(): ERROR: numHosts not completely received\n");
922                 fflush(logfile);
923                 close(sock);
924                 return -1;
925 #endif
926         }
927         bytesReceived = recv(sock, &numBlocks, sizeof(numBlocks), 0);
928         if (bytesReceived == -1)
929         {
930                 perror("getDHTdata():recv()");
931                 close(sock);
932                 return -1;
933         }
934         if (bytesReceived != sizeof(numBlocks))
935         {
936 #ifdef DHT_LOG
937                 fprintf(logfile,"getDHTdata(): ERROR: numBlocks not completely received\n");
938                 fflush(logfile);
939                 close(sock);
940                 return -1;
941 #endif
942         }
943         if (hostArray != NULL)
944                 free(hostArray);
945         hostArray = calloc(numHosts, sizeof(struct hostData));
946         bytesReceived = recv(sock, hostArray, numHosts*sizeof(struct hostData), 0);
947         if (bytesReceived == -1)
948         {
949                 perror("getDHTdata():recv()");
950                 close(sock);
951                 return -1;
952         }
953         if (bytesReceived != numHosts*sizeof(struct hostData))
954         {
955 #ifdef DHT_LOG
956                 fprintf(logfile,"getDHTdata(): ERROR: hostArray not completely received\n");
957                 fflush(logfile);
958                 close(sock);
959                 return -1;
960 #endif
961         }
962         if (blockOwnerArray != NULL)
963                 free(blockOwnerArray);
964         blockOwnerArray = calloc(numBlocks, sizeof(unsigned int));
965         bytesReceived = recv(sock, blockOwnerArray, numBlocks*sizeof(unsigned int), 0);
966         if (bytesReceived == -1)
967         {
968                 perror("getDHTdata():recv()");
969                 close(sock);
970                 return -1;
971         }
972         if (bytesReceived != numBlocks*sizeof(unsigned int))
973         {
974 #ifdef DHT_LOG
975                 fprintf(logfile,"getDHTdata(): ERROR: blockOwnerArray not completely received\n");
976                 fflush(logfile);
977                 close(sock);
978                 return -1;
979 #endif
980         }
981 #ifdef DHT_LOG
982                 fprintf(logfile,"getDHTdata(): got data:\n");
983                 writeDHTdata(logfile);
984                 fflush(logfile);
985 #endif
986         return 0;
987 }
988
989 unsigned int hash(unsigned int x)
990 {
991         return x % numBlocks;
992 }
993
994 void leadRebuild()
995 {
996         
997 }
998
999 void writeDHTdata(FILE *outfile)
1000 {
1001         int i;
1002         struct in_addr address;
1003         fprintf(outfile,"numHosts=%d,numBlocks=%d\n", numHosts, numBlocks);
1004         fprintf(outfile,"hostArray: index: ipAddr, maxKeyCapacity\n");
1005         for (i = 0; i < numHosts; i++)
1006         {
1007                 address.s_addr = htonl(hostArray[i].ipAddr);
1008                 fprintf(outfile,"%d: %s, %d\n", i, inet_ntoa(address),
1009                         hostArray[i].maxKeyCapacity);
1010         }
1011         fprintf(outfile,"blockOwnerArray: index: blockOwner\n");
1012         for (i = 0; i < numBlocks; i++)
1013         {
1014                 fprintf(outfile,"%d: %d\n", i, blockOwnerArray[i]);
1015         }
1016 }
1017
1018 #endif
1019