Each host currenty joins by initiating a rebuild. To rebuild, a request
[IRC.git] / Robust / src / Runtime / DSTM / interface / dht.c
1 #include "dht.h"
2
3 #ifdef SIMPLE_DHT
4
5 #include <arpa/inet.h>
6
7 #define NUM_HOSTS 4
8 #define OIDS_PER_HOST 0x40000000
9
10 //set these to your IP addresses
11 unsigned int hosts[NUM_HOSTS] = {
12         0xc0a802c8,
13         0xc0a802c9,
14         0xc0a802ca,
15         0xc0a802cb,
16 };
17
18 //does nothing
19 void dhtInit(unsigned int maxKeyCapaciy)
20 {       return;}
21
22 //does nothing
23 void dhtExit()
24 {       return;}
25
26 //does nothing, returns 0
27 int dhtInsert(unsigned int key, unsigned int val)
28 {       return 0;}
29
30 //does nothing, returns 0
31 int dhtRemove(unsigned int key)
32 {       return 0;}
33
34 //returns 0 if successful and copies val into *val,
35 // 1 if key not found, -1 if an error occurred
36 int dhtSearch(unsigned int key, unsigned int *val)
37 {
38         *val = hosts[key / OIDS_PER_HOST];
39         return 0;
40 }
41
42 #else
43
44 #include <netinet/in.h>
45 #include <arpa/inet.h>
46 #include <sys/types.h>
47 #include <sys/socket.h>
48 #include <sys/ioctl.h>
49 #include <stdio.h>
50 #include <string.h>
51 #include <stdlib.h>
52 #include <unistd.h>
53 #include <pthread.h>
54 #include <sys/time.h>
55 #include <sys/poll.h>
56 #include <netdb.h>
57 #include <net/if.h>
58 #include <linux/sockios.h>
59 #include "clookup.h" //this works for now, do we need anything better?
60
61 #define BUFFER_SIZE 512 //maximum message size
62 #define UDP_PORT 2157
63 #define TCP_PORT 2157
64 #define BACKLOG 10 //max pending tcp connections
65 #define TIMEOUT_MS 500
66 #define MAX_RETRIES 3
67 #define INIT_HOST_ALLOC 16
68 #define INIT_BLOCK_NUM 64
69 #define DEFAULT_INTERFACE "eth0"
70 #define DHT_LOG "dht.log"
71
72
73 #define NUM_MSG_TYPES 19
74
75 enum {
76         INSERT_CMD,
77         INSERT_RES,
78         REMOVE_CMD,
79         REMOVE_RES,
80         SEARCH_CMD,
81         SEARCH_RES,
82         FIND_LEADER_CMD,
83         FIND_LEADER_RES,
84         REBUILD_REQ,
85         REBUILD_RES,
86         NOT_LEADER,
87         REBUILD_CMD,
88         JOIN_REQ,
89         JOIN_RES,
90         DHT_INFO_REQ,
91         DHT_INFO_RES,
92         FILL_DHT_CMD,
93         FILL_DHT_RES,
94         REBUILD_DONE_INFO
95 };
96
97 const char *msg_types[NUM_MSG_TYPES] =
98 {
99         "INSERT_CMD",
100         "INSERT_RES",
101         "REMOVE_CMD",
102         "REMOVE_RES",
103         "SEARCH_CMD",
104         "SEARCH_RES",
105         "FIND_LEADER_CMD",
106         "FIND_LEADER_RES",
107         "REBUILD_REQ",
108         "REBUILD_RES",
109         "NOT_LEADER",
110         "REBUILD_CMD",
111         "JOIN_REQ",
112         "JOIN_RES",
113         "DHT_INFO_REQ",
114         "DHT_INFO_RES",
115         "FILL_DHT_CMD",
116         "FILL_DHT_RES",
117         "REBUILD_DONE_INFO"
118 };
119
120 //status codes
121 enum {
122         INSERT_OK,
123         INSERT_ERROR,
124         REMOVE_OK,
125         REMOVE_ERROR,
126         KEY_FOUND,
127         KEY_NOT_FOUND,
128         NOT_KEY_OWNER,
129 };
130
131 struct hostData {
132         unsigned int ipAddr;
133         unsigned int maxKeyCapacity;
134 };
135
136 struct insertCmd {
137         unsigned int msgType:8;
138         unsigned int unused:24;
139         unsigned int key;
140         unsigned int val;
141 };
142
143 struct removeCmd {
144         unsigned int msgType:8;
145         unsigned int unused:24;
146         unsigned int key;
147 };
148
149 struct searchCmd {
150         unsigned int msgType:8;
151         unsigned int unused:24;
152         unsigned int key;
153 };
154
155 struct insertRes {
156         unsigned int msgType:8;
157         unsigned int unused:24;
158         unsigned int status;
159 };
160
161 struct removeRes {
162         unsigned int msgType:8;
163         unsigned int unused:24;
164         unsigned int status;
165 };
166
167 struct searchRes {
168         unsigned int msgType:8;
169         unsigned int unused:24;
170         unsigned int status;
171         unsigned int val;
172 };
173
174 struct rebuildRes {
175         unsigned int msgType:8;
176         unsigned int unused:24;
177         unsigned int status;
178 };
179
180 //TODO: leave message, rebuild message...
181
182 FILE *logfile;
183 unsigned int leader; //ip address of leader
184 struct hostData myHostData;
185 /*----DHT data----*/
186 unsigned int numHosts;
187 struct hostData *hostArray;
188 unsigned int numBlocks;
189 unsigned int *blockOwnerArray;
190 /*----end DHT data----*/
191 pthread_t threadUdpListen;
192 pthread_t threadTcpListen;
193 int udpServerSock;
194 int tcpListenSock;
195
196 //return my IP address
197 unsigned int getMyIpAddr();
198 //sends broadcast to discover leader
199 unsigned int findLeader();
200 //UDP server
201 void *udpListen();
202 //TCP server
203 void *tcpListen();
204 //TCP connection handler
205 void *tcpAccept(void *);
206 //returns number of bytes received in resBuffer, or -1 if an error occurred
207 int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port,
208         void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize,
209         unsigned int timeout, unsigned int numRetries);
210 //returns number of bytes received in resBuffer, or -1 if an error occurred
211 int udpBroadcastWaitForResponse(unsigned int *reply_ip,
212         unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer,
213         unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries);
214 //just UDP it
215 int sendNoWait(unsigned int dest_ip, unsigned short dest_port, void *msg,
216         unsigned int msglen);
217 //right now this hashes the key into a block and returns the block owner
218 unsigned int getKeyOwner(unsigned int key);
219 //simple hash
220 unsigned int hash(unsigned int x);
221 //initiates TCP connection with leader, gets DHT data
222 int getDHTdata();
223 //outputs readable DHT data to outfile
224 void writeDHTdata(FILE *outfile);
225 void initRebuild();
226 void leadRebuild();
227 void followRebuild();
228
229 void dhtInit(unsigned int maxKeyCapacity)
230 {
231         unsigned int myMessage;
232         int bytesReceived;
233         int i;
234         int ret;
235
236 #ifdef DHT_LOG
237         logfile = fopen(DHT_LOG, "w");
238 #endif
239
240         myHostData.ipAddr = getMyIpAddr();
241         myHostData.maxKeyCapacity = maxKeyCapacity;
242
243         numHosts = numBlocks = 0;
244         hostArray = NULL;
245         blockOwnerArray = NULL;
246
247         pthread_create(&threadUdpListen, NULL, udpListen, NULL);
248         pthread_create(&threadTcpListen, NULL, tcpListen, NULL);
249
250         initRebuild();
251
252 /*      leader = findLeader();
253
254         if (leader == 0)
255         { //no response: I am the first
256                 leader = getMyIpAddr();
257
258                 numHosts = 1;
259                 hostArray = calloc(numHosts, sizeof(struct hostData));
260                 hostArray[0] = myHostData;
261                 numBlocks = INIT_BLOCK_NUM;
262                 blockOwnerArray = calloc(numBlocks, sizeof(unsigned int));
263                 for (i = 0; i < numBlocks; i++)
264                         blockOwnerArray[i] = 0;
265         }
266         else
267         {
268                 //get DHT data from leader
269                 ret = getDHTdata();
270
271                 //TODO: actually, just initiate a rebuild here instead
272         }
273 */
274
275         //start servers
276         
277         return;
278 }
279
280 void dhtExit()
281 {
282         fclose(logfile);
283         pthread_cancel(threadUdpListen);
284         pthread_cancel(threadTcpListen);
285         close(udpServerSock);
286         close(tcpListenSock);
287 }
288
289 int dhtInsert(unsigned int key, unsigned int val)
290 {
291         unsigned int dest_ip = getKeyOwner(key);
292         struct insertCmd myMessage;
293         struct insertRes response;
294         int bytesReceived;
295
296         myMessage.msgType = INSERT_CMD;
297         myMessage.key = key;
298         myMessage.val = val;
299         
300         bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage,
301                 sizeof(struct insertCmd), (void *)&response, sizeof(struct insertRes),
302                 TIMEOUT_MS, MAX_RETRIES);
303         if (bytesReceived == sizeof(struct insertRes))
304         {
305                 if (response.msgType == INSERT_RES)
306                 {
307                         if (response.status == INSERT_OK)
308                                 return 0;
309 //                      if (response.status == NOT_KEY_OWNER)
310                 }
311         }
312 //TODO: find owner and try again, request rebuild if necessary
313         return -1; //this function should be robust enough to always return 0
314 }
315
316 int dhtRemove(unsigned int key)
317 {
318         unsigned int dest_ip = getKeyOwner(key);
319         struct removeCmd myMessage;
320         struct removeRes response;
321         int bytesReceived;
322         
323         myMessage.msgType = REMOVE_CMD;
324         myMessage.key = key;
325
326         bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage,
327                 sizeof(struct removeCmd), (void *)&response, sizeof(struct removeRes),
328                 TIMEOUT_MS, MAX_RETRIES);
329         if (bytesReceived == sizeof(struct removeRes))
330         {
331                 if (response.msgType == REMOVE_RES)
332                 {
333                         if (response.status == REMOVE_OK)
334                                 return 0;
335 //                      if (response.status == NOT_KEY_OWNER)
336                 }
337         }
338 //TODO: find owner and try again, request rebuild if necessary
339         return -1; //this function should be robust enough to always return 0
340 }
341
342 int dhtSearch(unsigned int key, unsigned int *val)
343 {
344         unsigned int dest_ip = getKeyOwner(key);
345         struct searchCmd myMessage;
346         struct searchRes response;
347         int bytesReceived;
348
349         myMessage.msgType = SEARCH_CMD;
350         myMessage.key = key;
351
352         bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage,
353                 sizeof(struct searchCmd), (void *)&response, sizeof(struct searchRes),
354                 TIMEOUT_MS, MAX_RETRIES);
355         if (bytesReceived == sizeof(struct searchRes))
356         {
357                 if (response.msgType == SEARCH_RES)
358                 {
359                         if (response.status == KEY_FOUND)
360                         {
361                                 *val = response.val;
362                                 return 0;
363                         }
364                         if (response.status == KEY_NOT_FOUND)
365                         {
366                                 return 1;
367                         }
368 //                      if (response.status == NOT_KEY_OWNER)
369                 }
370         }
371 //TODO: find owner and try again, request rebuild if necessary
372         return -1; //this function should be robust enough to always return 0 or 1
373 }
374
375
376
377 //use UDP for messages that are frequent and short
378 void *udpListen()
379 {
380         struct sockaddr_in myAddr;
381         struct sockaddr_in clientAddr;
382         socklen_t socklen = sizeof(struct sockaddr_in);
383         char buffer[BUFFER_SIZE];
384         ssize_t bytesReceived;
385         struct insertCmd *insertCmdPtr;
386         struct removeCmd *removeCmdPtr;
387         struct searchCmd *searchCmdPtr;
388         struct insertRes *insertResPtr;
389         struct removeRes *removeResPtr;
390         struct searchRes *searchResPtr;
391         char replyBuffer[BUFFER_SIZE];
392         struct timeval now;
393
394         chashtable_t *myHashTable = chashCreate(HASH_SIZE, LOADFACTOR);
395
396         if ((udpServerSock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
397         {
398                 perror("udpListen():socket()");
399                 pthread_exit(NULL);
400         }
401         
402         bzero(&myAddr, socklen);
403         myAddr.sin_family=AF_INET;
404         myAddr.sin_addr.s_addr=INADDR_ANY;
405         myAddr.sin_port=htons(UDP_PORT);
406
407         if (bind(udpServerSock, (struct sockaddr *)&myAddr, socklen) == -1)
408         {
409                 perror("udpListen():bind()");
410                 pthread_exit(NULL);
411         }
412 #ifdef DHT_LOG
413         fprintf(logfile,"udpListen(): listening on port %d\n", UDP_PORT);
414         fflush(logfile);
415 #endif
416         while(1)
417         {
418                 if ((bytesReceived = recvfrom(udpServerSock, buffer, BUFFER_SIZE, 0,
419                         (struct sockaddr *)&clientAddr, &socklen)) == -1)
420                 {
421                         perror("udpListen():recvfrom()");
422                 }
423                 else if (bytesReceived == 0)
424                 {
425 #ifdef DHT_LOG
426                         fprintf(logfile,"udpListen(): recvfrom() returned 0\n");
427                         fflush(logfile);
428 #endif
429                 }
430                 else
431                 {
432                         gettimeofday(&now, NULL);
433 #ifdef DHT_LOG
434                         fprintf(logfile, "udpListen(): received %s from %s\n",
435                                 (buffer[0] < NUM_MSG_TYPES ? msg_types[buffer[0]] : "unknown message"),
436                                 inet_ntoa(clientAddr.sin_addr));
437 //                      fprintf(logfile,"udpListen(): time received:%ds,%dus\n", now.tv_sec,
438 //                              now.tv_usec);
439 //                      fprintf(logfile,"udpListen(): msg size:%d bytes source:%s:%d\n",
440 //                              bytesReceived,inet_ntoa(clientAddr.sin_addr),htons(clientAddr.sin_port));
441                         fflush(logfile);
442 #endif
443
444                         switch (buffer[0])
445                         {
446                                 case INSERT_CMD:
447                                         if (bytesReceived != sizeof(struct insertCmd))
448                                         {
449 #ifdef DHT_LOG
450                                                 fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
451                                                 fflush(logfile);
452 #endif
453                                                 break;
454                                         }
455                                         insertCmdPtr = (struct insertCmd *)buffer;
456 #ifdef DHT_LOG
457                                         fprintf(logfile, "udpListen(): Insert: key=%d, val=%d\n",
458                                                 insertCmdPtr->key, insertCmdPtr->val);
459                                         fflush(logfile);
460 #endif
461                                         insertResPtr = (struct insertRes *)replyBuffer;
462                                         insertResPtr->msgType = INSERT_RES;
463                                         if (getKeyOwner(insertCmdPtr->key) == myHostData.ipAddr)
464                                         {
465                                                 //note: casting val to void * in order to conform to API
466                                                 if(chashInsert(myHashTable, insertCmdPtr->key,
467                                                                 (void *)insertCmdPtr->val) == 0)
468                                                         insertResPtr->status = INSERT_OK;
469                                                 else
470                                                         insertResPtr->status = INSERT_ERROR;
471                                         }
472                                         else
473                                         {
474                                                 insertResPtr->status = NOT_KEY_OWNER;;
475                                         }
476                                         if (sendto(udpServerSock, (void *)insertResPtr,
477                                                 sizeof(struct insertRes), 0, (struct sockaddr *)&clientAddr,
478                                                 socklen) == -1)
479                                         {
480                                                 perror("udpListen():sendto()");
481                                         }
482                                         break;
483                                 case REMOVE_CMD:
484                                         if (bytesReceived != sizeof(struct removeCmd))
485                                         {
486 #ifdef DHT_LOG
487                                                 fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
488                                                 fflush(logfile);
489 #endif
490                                                 break;
491                                         }
492                                         removeCmdPtr = (struct removeCmd *)buffer;
493 #ifdef DHT_LOG
494                                         fprintf(logfile,"udpListen(): Remove: key=%d\n", removeCmdPtr->key);
495                                         fflush(logfile);
496 #endif
497                                         removeResPtr = (struct removeRes *)replyBuffer;
498                                         removeResPtr->msgType = REMOVE_RES;
499                                         if (getKeyOwner(removeCmdPtr->key) == myHostData.ipAddr)
500                                         {
501                                                 //note: casting val to void * in order to conform to API
502                                                 if(chashRemove(myHashTable, removeCmdPtr->key) == 0)
503                                                         removeResPtr->status = INSERT_OK;
504                                                 else
505                                                         removeResPtr->status = INSERT_ERROR;
506                                         }
507                                         else
508                                         {
509                                                 removeResPtr->status = NOT_KEY_OWNER;
510                                         }
511                                         if (sendto(udpServerSock, (void *)removeResPtr, sizeof(struct removeRes), 0,
512                                                 (struct sockaddr *)&clientAddr, socklen) == -1)
513                                         {
514                                                 perror("udpListen():sendto()");
515                                         }
516                                         break;
517                                 case SEARCH_CMD:
518                                         if (bytesReceived != sizeof(struct searchCmd))
519                                         {
520 #ifdef DHT_LOG
521                                                 fprintf(logfile,"udpListen(): ERROR: incorrect message size\n");
522                                                 fflush(logfile);
523 #endif
524                                                 break;
525                                         }
526                                         searchCmdPtr = (struct searchCmd *)buffer;
527 #ifdef DHT_LOG
528                                                 fprintf(logfile,"udpListen(): Search: key=%d\n",searchCmdPtr->key);
529                                                 fflush(logfile);
530 #endif
531                                         searchResPtr = (struct searchRes *)replyBuffer;
532                                         searchResPtr->msgType = SEARCH_RES;
533                                         if (getKeyOwner(searchCmdPtr->key) == myHostData.ipAddr)
534                                         {
535                                                 //note: casting val to void * in order to conform to API
536                                                 if((searchResPtr->val = (unsigned int)chashSearch(myHashTable,
537                                                                 searchCmdPtr->key)) == 0)
538                                                         searchResPtr->status = KEY_NOT_FOUND;
539                                                 else
540                                                         searchResPtr->status = KEY_FOUND;
541                                         }
542                                         else
543                                         {
544                                                 searchResPtr->status = NOT_KEY_OWNER;
545                                         }
546                                         if (sendto(udpServerSock, (void *)searchResPtr, sizeof(struct searchRes), 0,
547                                                 (struct sockaddr *)&clientAddr, socklen) == -1)
548                                         {
549                                                 perror("udpListen():sendto()");
550                                         }
551                                         break;
552                                 case FIND_LEADER_CMD:
553                                         if (bytesReceived != sizeof(char))
554                                         {
555 #ifdef DHT_LOG
556                                                 fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
557                                                 fflush(logfile);
558 #endif
559                                                 break;
560                                         }
561                                         if (leader == getMyIpAddr())
562                                         {
563                                                 replyBuffer[0] = FIND_LEADER_RES;
564                                                 if(sendto(udpServerSock, (void *)replyBuffer, sizeof(char), 0,
565                                                         (struct sockaddr *)&clientAddr, socklen) == -1)
566                                                 {
567                                                         perror("udpListen():sendto");
568                                                 }
569                                         }
570                                         break;
571                                 case REBUILD_REQ:
572                                         if (bytesReceived != sizeof(char))
573                                         {
574 #ifdef DHT_LOG
575                                                 fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
576                                                 fflush(logfile);
577 #endif
578                                                 break;
579                                         }
580                                         if (leader == getMyIpAddr())
581                                         {
582                                                 replyBuffer[0] = REBUILD_RES;
583                                                 if(sendto(udpServerSock, (void *)replyBuffer, sizeof(char), 0,
584                                                         (struct sockaddr *)&clientAddr, socklen) == -1)
585                                                 {
586                                                         perror("udpListen():sendto");
587                                                 }
588                                                 //TODO: leadRebuild()
589                                         }
590                                         else
591                                         {
592                                                 replyBuffer[0] = NOT_LEADER;
593                                                 if(sendto(udpServerSock, (void *)replyBuffer, sizeof(char), 0,
594                                                         (struct sockaddr *)&clientAddr, socklen) == -1)
595                                                 {
596                                                         perror("udpListen():sendto");
597                                                 }
598                                         }
599                                         break;
600 //                              default:
601 #ifdef DHT_LOG
602 //                                      fprintf(logfile,"udpListen(): ERROR: Unknown message type\n");
603 //                                      fflush(logfile);
604 #endif
605                         }
606                 }
607         }
608 }
609
610 int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port,
611         void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize,
612         unsigned int timeout, unsigned int numRetries)
613 {
614         struct sockaddr_in server_addr;
615         struct sockaddr_in ack_addr;
616         socklen_t socklen = sizeof(struct sockaddr_in);
617         struct pollfd pollsock;
618         struct timeval now;
619         int retval;
620         int i;
621         ssize_t bytesReceived;
622
623         bzero((char *) &server_addr, sizeof(server_addr));
624         server_addr.sin_family = AF_INET;
625         server_addr.sin_port = htons(dest_port);
626         server_addr.sin_addr.s_addr = htonl(dest_ip);
627
628         if ((pollsock.fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
629         {
630                 perror("udpSendWaitForResponse():socket()");
631                 return -1;
632         }
633         
634         pollsock.events = POLLIN;
635         
636         for (i = 0; i < MAX_RETRIES; i++)
637         {
638 #ifdef DHT_LOG
639                 if (i > 0)
640                         fprintf(logfile,"udpSendWaitForResponse(): trying again, count: %d\n",
641                                 i+1);
642                 fflush(logfile);
643 #endif
644                 if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr,
645                         socklen) == -1)
646                 {
647                         perror("udpSendWaitForResponse():sendto");
648                         return -1;
649                 }
650 #ifdef DHT_LOG
651                 gettimeofday(&now, NULL);
652                 fprintf(logfile,"udpSendWaitForResponse(): message sent:%ds,%dus\n",
653                         now.tv_sec, now.tv_usec);
654                 fflush(logfile);
655 #endif
656                 retval = poll(&pollsock, 1, timeout);
657                 if (retval !=0)
658                 {
659                         bytesReceived = recvfrom(pollsock.fd, resBuffer, resBufferSize, 0,
660                                 (struct sockaddr *)&ack_addr, &socklen);
661                         if ((ack_addr.sin_addr.s_addr == server_addr.sin_addr.s_addr)
662                         && (ack_addr.sin_port == server_addr.sin_port))
663                         {
664                                 close(pollsock.fd);
665 #ifdef DHT_LOG
666                                 gettimeofday(&now, NULL);
667                                 fprintf(logfile,"udpSendWaitForResponse(): received response:%ds,%dus\n", now.tv_sec, now.tv_usec);
668                                 fflush(logfile);
669 #endif
670                                 return bytesReceived;
671                         }
672                 }
673         }
674         close(pollsock.fd);
675 #ifdef DHT_LOG
676         gettimeofday(&now, NULL);
677         printf("udpSendWaitForResponse(): timed out, no ack:%ds,%dus\n",
678                 now.tv_sec, now.tv_usec);
679         fflush(logfile);
680 #endif
681         return -1;
682 }
683
684 int udpBroadcastWaitForResponse(unsigned int *reply_ip,
685         unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer,
686         unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries)
687 {
688         struct sockaddr_in server_addr;
689         struct sockaddr_in ack_addr;
690         socklen_t socklen = sizeof(struct sockaddr_in);
691         struct pollfd pollsock;
692         struct timeval now;
693         int retval;
694         int i;
695         ssize_t bytesReceived;
696         int on;
697
698         bzero((char *) &server_addr, sizeof(server_addr));
699         server_addr.sin_family = AF_INET;
700         server_addr.sin_port = htons(dest_port);
701         server_addr.sin_addr.s_addr = htonl(0xFFFFFFFF);
702
703         if ((pollsock.fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
704         {
705                 perror("udpBroadcastWaitForResponse():socket()");
706                 return -1;
707         }
708
709         on = 1;
710         if (setsockopt(pollsock.fd, SOL_SOCKET, SO_BROADCAST, &on, sizeof(on)) == -1)
711         {
712                 perror("udpBroadcastWaitForResponse():setsockopt()");
713                 return -1;
714         }
715
716         pollsock.events = POLLIN;
717         
718         for (i = 0; i < MAX_RETRIES; i++)
719         {
720 #ifdef DHT_LOG
721                 if (i > 0)
722                         fprintf(logfile,"udpBroadcastWaitForResponse(): trying again, count: %d\n", i+1);
723                         fflush(logfile);
724 #endif
725                 if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr,
726                         socklen) == -1)
727                 {
728                         perror("udpBroadcastWaitForResponse():sendto()");
729                         return -1;
730                 }
731 #ifdef DHT_LOG
732                 gettimeofday(&now, NULL);
733                 fprintf(logfile,"udpBroadcastWaitForResponse(): message sent:%ds,%dus\n",
734                         now.tv_sec, now.tv_usec);
735                 fflush(logfile);
736 #endif
737                 retval = poll(&pollsock, 1, timeout);
738                 if (retval !=0)
739                 {
740                         bytesReceived = recvfrom(pollsock.fd, resBuffer, resBufferSize, 0,
741                                 (struct sockaddr *)&ack_addr, &socklen);
742                         close(pollsock.fd);
743                         *reply_ip = htonl(ack_addr.sin_addr.s_addr);
744 #ifdef DHT_LOG
745                         gettimeofday(&now, NULL);
746                         fprintf(logfile,"udpBroadcastWaitForResponse(): received response:%ds,%dus\n", now.tv_sec, now.tv_usec);
747                         fflush(logfile);
748 #endif
749                         return bytesReceived;
750                 }
751         }
752         close(pollsock.fd);
753 #ifdef DHT_LOG
754         gettimeofday(&now, NULL);
755         fprintf(logfile,"udpBroadcastWaitForResponse(): timed out, no ack:%ds,%dus\n",
756                 now.tv_sec, now.tv_usec);
757         fflush(logfile);
758 #endif
759         return -1;
760 }
761
762 // use TCP for potentially large and/or important data transfer
763 void *tcpListen()
764 {
765         struct sockaddr_in myAddr;
766         struct sockaddr_in clientAddr;
767         int tcpAcceptSock;
768         socklen_t socklen = sizeof(struct sockaddr_in);
769         pthread_t threadTcpAccept;
770
771         tcpListenSock = socket(AF_INET, SOCK_STREAM, 0);
772         if (tcpListenSock == -1)
773         {
774                 perror("tcpListen():socket()");
775                 pthread_exit(NULL);
776         }
777
778         myAddr.sin_family = AF_INET;
779         myAddr.sin_port = htons(TCP_PORT);
780         myAddr.sin_addr.s_addr = INADDR_ANY;
781         memset(&(myAddr.sin_zero), '\0', 8);
782
783         if (bind(tcpListenSock, (struct sockaddr *)&myAddr, socklen) == -1)
784         {
785                 perror("tcpListen():socket()");
786                 pthread_exit(NULL);
787         }
788
789         if (listen(tcpListenSock, BACKLOG) == -1)
790         {
791                 perror("tcpListen():listen()");
792                 pthread_exit(NULL);
793         }
794
795 #ifdef DHT_LOG
796         fprintf(logfile,"tcpListen(): listening on port %d\n", TCP_PORT);
797         fflush(logfile);
798 #endif
799
800         while(1)
801         {
802                 tcpAcceptSock = accept(tcpListenSock, (struct sockaddr *)&clientAddr, &socklen);
803                 pthread_create(&threadTcpAccept, NULL, tcpAccept, (void *)tcpAcceptSock);
804         }
805 }
806
807 void *tcpAccept(void *arg)
808 {
809         int tcpAcceptSock = (int)arg;
810         int bytesReceived;
811         char msgType;
812
813 #ifdef DHT_LOG
814         fprintf(logfile, "tcpAccept(): accepted tcp connection, file descriptor: %d\n", tcpAcceptSock);
815         fflush(logfile);
816 #endif
817
818         bytesReceived = recv(tcpAcceptSock, &msgType, sizeof(char), 0);
819         if (bytesReceived == -1)
820         {
821                 perror("tcpAccept():recv()");
822         }
823         else if (bytesReceived == 0)
824         {
825 #ifdef DHT_LOG
826                 fprintf(logfile, "tcpAccept(): bytesReceived = 0\n", tcpAcceptSock);
827                 fflush(logfile);
828 #endif
829         }
830         else
831         {
832                 switch (msgType)
833                 {
834                         case DHT_INFO_REQ:
835                                 if (send(tcpAcceptSock, &numHosts, sizeof(numHosts), 0) == -1)
836                                 {
837                                         perror("tcpAccept():send()");
838                                         break;
839                                 }
840                                 if (send(tcpAcceptSock, &numBlocks, sizeof(numBlocks), 0) == -1)
841                                 {
842                                         perror("tcpAccept():send()");
843                                         break;
844                                 }
845                                 if (send(tcpAcceptSock, hostArray, numHosts*sizeof(struct hostData),
846                                                 0) == -1)
847                                 {
848                                         perror("tcpAccept():send()");
849                                         break;
850                                 }
851                                 if (send(tcpAcceptSock, blockOwnerArray, numBlocks*sizeof(unsigned int),
852                                                 0) == -1)
853                                 {
854                                         perror("tcpAccept():send()");
855                                         break;
856                                 }
857                                 break;
858                         default:
859 #ifdef DHT_LOG
860                                 fprintf(logfile, "tcpAccept(): unrecognized msg type\n");
861                                 fflush(logfile);
862 #endif
863                 }
864         }
865
866         if (close(tcpAcceptSock) == -1)
867         {
868                 perror("tcpAccept():close()");
869         }
870
871 #ifdef DHT_LOG
872         fprintf(logfile, "tcpAccept(): closed tcp connection, file descriptor: %d\n",
873                 tcpAcceptSock);
874         fflush(logfile);
875 #endif
876
877         pthread_exit(NULL);
878 }
879
880 unsigned int getKeyOwner(unsigned int key)
881 {
882         return hostArray[blockOwnerArray[hash(key)]].ipAddr;
883 }
884
885 unsigned int getMyIpAddr()
886 {       
887         int sock;
888         struct ifreq interfaceInfo;
889         struct sockaddr_in *myAddr = (struct sockaddr_in *)&interfaceInfo.ifr_addr;
890
891         memset(&interfaceInfo, 0, sizeof(struct ifreq));
892
893         if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
894         {
895                 perror("getMyIpAddr():socket()");
896                 return 1;
897         }
898
899         strcpy(interfaceInfo.ifr_name, DEFAULT_INTERFACE);
900         myAddr->sin_family = AF_INET;
901         
902         if(ioctl(sock, SIOCGIFADDR, &interfaceInfo) != 0)
903         {
904                 perror("getMyIpAddr():ioctl()");
905                 return 1;
906         }
907
908         return ntohl(myAddr->sin_addr.s_addr);
909 }
910
911 unsigned int findLeader()
912 {
913         unsigned int reply_ip;
914         int bytesReceived;
915         char myMessage;
916         char response;
917
918 #ifdef DHT_LOG
919         fprintf(logfile, "findLeader(): broadcasting...\n");
920         fflush(logfile);
921 #endif
922
923         myMessage = FIND_LEADER_CMD;
924
925         bytesReceived = udpBroadcastWaitForResponse(&reply_ip, UDP_PORT,
926                 (void *)&myMessage, sizeof(myMessage), (void *)&response,
927                 sizeof(response), TIMEOUT_MS, MAX_RETRIES);
928
929         if (bytesReceived == -1)
930         {
931 #ifdef DHT_LOG
932                 fprintf(logfile, "findLeader(): no response\n");
933                 fflush(logfile);
934 #endif
935                 return 0;
936         }
937         else if (response == FIND_LEADER_RES)
938         {
939 #ifdef DHT_LOG
940                 struct in_addr reply_addr;
941                 reply_addr.s_addr = htonl(reply_ip);
942                 fprintf(logfile, "findLeader(): leader found:%s\n",
943                                         inet_ntoa(reply_addr));
944                 fflush(logfile);
945 #endif
946                 return reply_ip;
947         }
948         else
949         {
950 #ifdef DHT_LOG
951                 fprintf(logfile, "findLeader(): unexpected response\n");
952                 fflush(logfile);
953 #endif
954                 return 0;
955         }
956 }
957
958 int getDHTdata()
959 {
960         struct sockaddr_in leader_addr;
961         int sock;
962         char msg;
963         int bytesReceived;
964
965         if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1)
966         {
967                 perror("getDHTdata():socket()");
968                 return -1;
969         }
970
971         bzero((char *)&leader_addr, sizeof(leader_addr));
972         leader_addr.sin_family = AF_INET;
973         leader_addr.sin_port = htons(TCP_PORT);
974         leader_addr.sin_addr.s_addr = htonl(leader);
975
976         if (connect(sock, (struct sockaddr *)&leader_addr, sizeof(leader_addr)) == -1)
977         {
978                 perror("getDHTdata():connect()");
979                 close(sock);
980                 return -1;
981         }
982         msg = DHT_INFO_REQ;
983         if (send(sock, &msg, sizeof(char), 0) == -1)
984         {
985                 perror("getDHTdata():send()");
986                 close(sock);
987                 return -1;
988         }
989         bytesReceived = recv(sock, &numHosts, sizeof(numHosts), 0);
990         if (bytesReceived == -1)
991         {
992                 perror("getDHTdata():recv()");
993                 close(sock);
994                 return -1;
995         }
996         if (bytesReceived != sizeof(numHosts))
997         {
998 #ifdef DHT_LOG
999                 fprintf(logfile,"getDHTdata(): ERROR: numHosts not completely received\n");
1000                 fflush(logfile);
1001 #endif
1002                 close(sock);
1003                 return -1;
1004         }
1005         bytesReceived = recv(sock, &numBlocks, sizeof(numBlocks), 0);
1006         if (bytesReceived == -1)
1007         {
1008                 perror("getDHTdata():recv()");
1009                 close(sock);
1010                 return -1;
1011         }
1012         if (bytesReceived != sizeof(numBlocks))
1013         {
1014 #ifdef DHT_LOG
1015                 fprintf(logfile,"getDHTdata(): ERROR: numBlocks not completely received\n");
1016                 fflush(logfile);
1017 #endif
1018                 close(sock);
1019                 return -1;
1020         }
1021         if (hostArray != NULL)
1022                 free(hostArray);
1023         hostArray = calloc(numHosts, sizeof(struct hostData));
1024         bytesReceived = recv(sock, hostArray, numHosts*sizeof(struct hostData), 0);
1025         if (bytesReceived == -1)
1026         {
1027                 perror("getDHTdata():recv()");
1028                 close(sock);
1029                 return -1;
1030         }
1031         if (bytesReceived != numHosts*sizeof(struct hostData))
1032         {
1033 #ifdef DHT_LOG
1034                 fprintf(logfile,"getDHTdata(): ERROR: hostArray not completely received\n");
1035                 fflush(logfile);
1036 #endif
1037                 close(sock);
1038                 return -1;
1039         }
1040         if (blockOwnerArray != NULL)
1041                 free(blockOwnerArray);
1042         blockOwnerArray = calloc(numBlocks, sizeof(unsigned int));
1043         bytesReceived = recv(sock, blockOwnerArray, numBlocks*sizeof(unsigned int), 0);
1044         if (bytesReceived == -1)
1045         {
1046                 perror("getDHTdata():recv()");
1047                 close(sock);
1048                 return -1;
1049         }
1050         if (bytesReceived != numBlocks*sizeof(unsigned int))
1051         {
1052 #ifdef DHT_LOG
1053                 fprintf(logfile,"getDHTdata(): ERROR: blockOwnerArray not completely received\n");
1054                 fflush(logfile);
1055 #endif
1056                 close(sock);
1057                 return -1;
1058         }
1059 #ifdef DHT_LOG
1060                 fprintf(logfile,"getDHTdata(): got data:\n");
1061                 writeDHTdata(logfile);
1062                 fflush(logfile);
1063 #endif
1064         return 0;
1065 }
1066
1067 unsigned int hash(unsigned int x)
1068 {
1069         return x % numBlocks;
1070 }
1071
1072 //This function will not return until it succeeds in submitting
1073 // a rebuild request to the leader. It is then the leader's responibility
1074 // to ensure that the rebuild is caried out
1075 void initRebuild()
1076 {
1077         int bytesReceived;
1078         char msg;
1079         char response;
1080         int done;
1081         int retry_count;
1082         int i;
1083
1084         done = 0;
1085         retry_count = 0;
1086
1087         while (!done)
1088         {
1089 #ifdef DHT_LOG
1090                 if (retry_count > 0)
1091                 {
1092                         fprintf(logfile,"initRebuild(): retry count:%d\n", retry_count);
1093                         fflush(logfile);
1094                 }
1095 #endif
1096
1097                 if (leader == 0 || retry_count > 0)
1098                 {
1099                         leader = findLeader(); //broadcast
1100                         if (leader == 0) //no response
1101                         {
1102                                 //TODO:elect leader: this will do for now
1103                                 leader = getMyIpAddr();
1104
1105                                 numHosts = 1;
1106                                 hostArray = calloc(numHosts, sizeof(struct hostData));
1107                                 hostArray[0] = myHostData;
1108                                 numBlocks = INIT_BLOCK_NUM;
1109                                 blockOwnerArray = calloc(numBlocks, sizeof(unsigned int));
1110                                 for (i = 0; i < numBlocks; i++)
1111                                         blockOwnerArray[i] = 0;
1112                         }
1113                 }
1114         
1115                 msg = REBUILD_REQ;
1116
1117                 bytesReceived = udpSendWaitForResponse(leader, UDP_PORT,
1118                         (void *)&msg, sizeof(msg), (void *)&response, sizeof(response),
1119                         TIMEOUT_MS, MAX_RETRIES);
1120                 if (bytesReceived == -1)
1121                 {
1122                         perror("initRebuild():recv()");
1123                 }
1124                 else if (bytesReceived != sizeof(response))
1125                 {
1126 #ifdef DHT_LOG
1127                         fprintf(logfile,"initRebuild(): ERROR: response not completely received\n");
1128                         fflush(logfile);
1129 #endif
1130                 }
1131                 else if (response == NOT_LEADER)
1132                 {
1133 #ifdef DHT_LOG
1134                         struct in_addr address;
1135                         address.s_addr = htonl(leader);
1136                         fprintf(logfile,"initRebuild(): ERROR: %s no longer leader\n",
1137                                 inet_ntoa(address));
1138                         fflush(logfile);
1139 #endif
1140                 }
1141                 else if (response != REBUILD_RES)
1142                 {
1143 #ifdef DHT_LOG
1144                         fprintf(logfile,"initRebuild(): ERROR: unexpected response\n");
1145                         fflush(logfile);
1146 #endif
1147                 }
1148                 else
1149                 {
1150 #ifdef DHT_LOG
1151                         fprintf(logfile,"initRebuild(): submitted rebuild request\n");
1152                         writeDHTdata(logfile);
1153                         fflush(logfile);
1154 #endif
1155                         done = 1;
1156                 }
1157         }
1158         return;
1159 }
1160
1161 void leadRebuild()
1162 {
1163         
1164 }
1165
1166 void followRebuild()
1167 {
1168
1169 }
1170
1171 void writeDHTdata(FILE *outfile)
1172 {
1173         int i;
1174         struct in_addr address;
1175         fprintf(outfile,"numHosts=%d,numBlocks=%d\n", numHosts, numBlocks);
1176         fprintf(outfile,"hostArray: index: ipAddr, maxKeyCapacity\n");
1177         for (i = 0; i < numHosts; i++)
1178         {
1179                 address.s_addr = htonl(hostArray[i].ipAddr);
1180                 fprintf(outfile,"%d: %s, %d\n", i, inet_ntoa(address),
1181                         hostArray[i].maxKeyCapacity);
1182         }
1183         fprintf(outfile,"blockOwnerArray: index: blockOwner\n");
1184         for (i = 0; i < numBlocks; i++)
1185         {
1186                 fprintf(outfile,"%d: %d\n", i, blockOwnerArray[i]);
1187         }
1188 }
1189
1190 #endif
1191