added a tcp server (that serves nothing), plus random changes
[IRC.git] / Robust / src / Runtime / DSTM / interface / dht.c
1 #include <netinet/in.h>
2 #include <arpa/inet.h>
3 #include <sys/types.h>
4 #include <sys/socket.h>
5 #include <sys/ioctl.h>
6 #include <stdio.h>
7 #include <string.h>
8 #include <stdlib.h>
9 #include <unistd.h>
10 #include <pthread.h>
11 #include <sys/time.h>
12 #include <sys/poll.h>
13 #include <netdb.h>
14 #include <net/if.h>
15 #include <linux/sockios.h>
16 #include "dht.h"
17 #include "clookup.h" //this works for now, do we need anything better?
18
19 #define BUFFER_SIZE 512 //maximum message size
20 #define UDP_PORT 2157
21 #define TCP_PORT 2157
22 #define BACKLOG 10 //max pending tcp connections
23 #define TIMEOUT_MS 500
24 #define MAX_RETRIES 3
25 #define INIT_HOST_ALLOC 16
26 #define INIT_BLOCK_NUM 64
27 #define DEFAULT_INTERFACE "eth0"
28
29 enum {
30         INSERT_COMMAND,
31         REMOVE_COMMAND,
32         SEARCH_COMMAND,
33         FIND_LEADER_COMMAND,
34         INSERT_RESPONSE,
35         REMOVE_RESPONSE,
36         SEARCH_RESPONSE,
37         FIND_LEADER_RESPONSE
38 };
39
40
41 //status codes
42 enum {
43         INSERT_OK,
44         INSERT_ERROR,
45         REMOVE_OK,
46         REMOVE_ERROR,
47         KEY_FOUND,
48         KEY_NOT_FOUND,
49         NOT_KEY_OWNER
50 };
51
52 struct hostData {
53         unsigned int ipAddr;
54         unsigned int maxKeyCapacity;
55 };
56
57 struct insertCmd {
58         unsigned int msgType;
59         unsigned int key;
60         unsigned int val;
61 };
62
63 struct removeCmd {
64         unsigned int msgType;
65         unsigned int key;
66 };
67
68 struct searchCmd {
69         unsigned int msgType;
70         unsigned int key;
71 };
72
73 struct insertRes {
74         unsigned int msgType;
75         unsigned int status;
76 };
77
78 struct removeRes {
79         unsigned int msgType;
80         unsigned int status;
81 };
82
83 struct searchRes {
84         unsigned int msgType;
85         unsigned int status;
86         unsigned int val;
87 };
88
89
90 //TODO: leave message, rebuild message...
91
92 struct hostData myHostData;
93 unsigned int numHosts;
94 struct hostData *hostArray;
95 unsigned int hostArraySize;
96 unsigned int numBlocks;
97 unsigned int *blockOwnerArray;
98 unsigned int blockOwnerArraySize;
99
100 unsigned int getMyIpAddr();
101 void *udpListen();
102 void *tcpListen();
103 void *tcpAccept(void *);
104 //returns number of bytes received in resBuffer, or -1 if an error occurred
105 int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries);
106 int sendNoWait(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen);
107 unsigned int getKeyOwner(unsigned int key);
108 unsigned int hash(unsigned int x);
109
110 void dhtInit(unsigned int maxKeyCapacity)
111 {
112         unsigned int myMessage;
113         int bytesReceived;
114         int i;
115
116         myHostData.ipAddr = getMyIpAddr();
117         myHostData.maxKeyCapacity = maxKeyCapacity;
118
119         
120
121         //announce presence (udp broadcast), get data structures from leader (leader initiates tcp transfer)
122         
123
124 //if no response, I am the first
125
126         numHosts = 1;
127         hostArray = malloc(INIT_HOST_ALLOC * sizeof(struct hostData));
128         hostArray[0] = myHostData;
129
130         numBlocks = INIT_BLOCK_NUM;
131         blockOwnerArray = malloc(numBlocks * sizeof(unsigned short));
132         for (i = 0; i < numBlocks; i++)
133         {
134                 blockOwnerArray[i] = 0;
135         }
136         
137         //otherwise, scan array and choose blocks to take over
138         //get data from hosts that own those blocks (tcp), fill hash table
139         //notify (the leader or everybody?) of ownership changes
140         
141         //start server (udp)
142         pthread_t threadUdpListen, threadTcpListen;
143         pthread_create(&threadUdpListen, NULL, udpListen, NULL);
144         pthread_create(&threadTcpListen, NULL, tcpListen, NULL);
145         
146         return;
147 }
148
149 void dhtExit()
150 {
151
152 }
153
154 int dhtInsert(unsigned int key, unsigned int val)
155 {
156         unsigned int dest_ip = getKeyOwner(key);
157         struct insertCmd myMessage;
158         struct insertRes response;
159         int bytesReceived;
160
161         myMessage.msgType = INSERT_COMMAND;
162         myMessage.key = key;
163         myMessage.val = val;
164         
165         bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage, sizeof(struct insertCmd), (void *)&response, sizeof(struct insertRes), TIMEOUT_MS, MAX_RETRIES);
166         if (bytesReceived == sizeof(struct insertRes))
167         {
168                 if (response.msgType == INSERT_RESPONSE)
169                 {
170                         if (response.status == INSERT_OK)
171                                 return 0;
172 //                      if (response.status == NOT_KEY_OWNER)
173                 }
174         }
175 //TODO: find owner and try again, request rebuild if necessary
176         return -1; //this function should be robust enough to always return 0
177 }
178
179 int dhtRemove(unsigned int key)
180 {
181         unsigned int dest_ip = getKeyOwner(key);
182         struct removeCmd myMessage;
183         struct removeRes response;
184         int bytesReceived;
185         
186         myMessage.msgType = REMOVE_COMMAND;
187         myMessage.key = key;
188
189         bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage, sizeof(struct removeCmd), (void *)&response, sizeof(struct removeRes), TIMEOUT_MS, MAX_RETRIES);
190         if (bytesReceived == sizeof(struct removeRes))
191         {
192                 if (response.msgType == REMOVE_RESPONSE)
193                 {
194                         if (response.status == REMOVE_OK)
195                                 return 0;
196 //                      if (response.status == NOT_KEY_OWNER)
197                 }
198         }
199 //TODO: find owner and try again, request rebuild if necessary
200         return -1; //this function should be robust enough to always return 0
201 }
202
203 int dhtSearch(unsigned int key, unsigned int *val)
204 {
205         unsigned int dest_ip = getKeyOwner(key);
206         struct searchCmd myMessage;
207         struct searchRes response;
208         int bytesReceived;
209
210         myMessage.msgType = SEARCH_COMMAND;
211         myMessage.key = key;
212
213         bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage, sizeof(struct searchCmd), (void *)&response, sizeof(struct searchRes), TIMEOUT_MS, MAX_RETRIES);
214         if (bytesReceived == sizeof(struct searchRes))
215         {
216                 if (response.msgType == SEARCH_RESPONSE)
217                 {
218                         if (response.status == KEY_FOUND)
219                         {
220                                 *val = response.val;
221                                 return 0;
222                         }
223                         if (response.status == KEY_NOT_FOUND)
224                         {
225                                 return 1;
226                         }
227 //                      if (response.status == NOT_KEY_OWNER)
228                 }
229         }
230 //TODO: find owner and try again, request rebuild if necessary
231         return -1; //this function should be robust enough to always return 0 or 1
232 }
233
234
235
236 //use UDP for messages that are frequent and short
237 void *udpListen()
238 {
239         struct sockaddr_in myAddr;
240         struct sockaddr_in clientAddr;
241         int sock;
242         socklen_t socklen = sizeof(struct sockaddr_in);
243         char buffer[BUFFER_SIZE];
244         ssize_t bytesReceived;
245         struct insertCmd *insertCmdPtr;
246         struct removeCmd *removeCmdPtr;
247         struct searchCmd *searchCmdPtr;
248         struct insertRes *insertResPtr;
249         struct removeRes *removeResPtr;
250         struct searchRes *searchResPtr;
251         char replyBuffer[BUFFER_SIZE];
252         struct timeval now;
253
254         chashtable_t *myHashTable = chashCreate(HASH_SIZE, LOADFACTOR);
255
256         if ((sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
257         {
258                 perror("socket()");
259                 pthread_exit(NULL);
260         }
261         
262         bzero(&myAddr, socklen);
263         myAddr.sin_family=AF_INET;
264         myAddr.sin_addr.s_addr=INADDR_ANY;
265         myAddr.sin_port=htons(UDP_PORT);
266
267         if (bind(sock, (struct sockaddr *)&myAddr, socklen) == -1)
268         {
269                 perror("bind()");
270                 pthread_exit(NULL);
271         }
272 //      printf("listening...\n");
273         while(1)
274         {
275                 if ((bytesReceived = recvfrom(sock, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&clientAddr, &socklen)) == -1)
276                 {
277                         perror("recvfrom()");
278                         break;
279                 }
280                 if (bytesReceived == 0)
281                 {
282                         printf("recvfrom() returned 0\n");
283                         break;
284                 }
285                 gettimeofday(&now, NULL);
286 //              printf("message received:%ds,%dus\n", now.tv_sec, now.tv_usec);
287
288 //              printf("Received %d bytes from %x:%d\n", bytesReceived, clientAddr.sin_addr.s_addr, clientAddr.sin_port);
289                 switch (buffer[0])
290                 {
291                         case INSERT_COMMAND:
292                                 if (bytesReceived != sizeof(struct insertCmd))
293                                 {
294                                         printf("error: incorrect message size\n");
295                                         break;
296                                 }
297                                 insertCmdPtr = (struct insertCmd *)buffer;
298 //                              printf("Insert: key=%d, val=%d\n", insertCmdPtr->key, insertCmdPtr->val);
299                                 insertResPtr = (struct insertRes *)replyBuffer;
300                                 insertResPtr->msgType = INSERT_RESPONSE;
301                                 if (getKeyOwner(insertCmdPtr->key) == myHostData.ipAddr)
302                                 {
303                                         //note: casting val to void * in order to conform to API
304                                         if(chashInsert(myHashTable, insertCmdPtr->key, (void *)insertCmdPtr->val) == 0)
305                                                 insertResPtr->status = INSERT_OK;
306                                         else
307                                                 insertResPtr->status = INSERT_ERROR;
308                                 }
309                                 else
310                                 {
311                                         insertResPtr->status = NOT_KEY_OWNER;;
312                                 }
313                                 sendto(sock, (void *)insertResPtr, sizeof(struct insertRes), 0, (struct sockaddr *)&clientAddr, socklen);
314                                 break;
315                         case REMOVE_COMMAND:
316                                 if (bytesReceived != sizeof(struct removeCmd))
317                                 {
318                                         printf("error: incorrect message size\n");
319                                         break;
320                                 }
321                                 removeCmdPtr = (struct removeCmd *)buffer;
322 //                              printf("Remove: key=%d\n", removeCmdPtr->key);
323                                 removeResPtr = (struct removeRes *)replyBuffer;
324                                 removeResPtr->msgType = REMOVE_RESPONSE;
325                                 if (getKeyOwner(removeCmdPtr->key) == myHostData.ipAddr)
326                                 {
327                                         //note: casting val to void * in order to conform to API
328                                         if(chashRemove(myHashTable, removeCmdPtr->key) == 0)
329                                                 removeResPtr->status = INSERT_OK;
330                                         else
331                                                 removeResPtr->status = INSERT_ERROR;
332                                 }
333                                 else
334                                 {
335                                         removeResPtr->status = NOT_KEY_OWNER;
336                                 }
337                                 sendto(sock, (void *)removeResPtr, sizeof(struct removeRes), 0, (struct sockaddr *)&clientAddr, socklen);
338                                 break;
339                         case SEARCH_COMMAND:
340                                 if (bytesReceived != sizeof(struct searchCmd))
341                                 {
342                                         printf("error: incorrect message size\n");
343                                         break;
344                                 }
345                                 searchCmdPtr = (struct searchCmd *)buffer;
346 //                              printf("Search: key=%d\n",searchCmdPtr->key);
347                                 searchResPtr = (struct searchRes *)replyBuffer;
348                                 searchResPtr->msgType = SEARCH_RESPONSE;
349                                 if (getKeyOwner(searchCmdPtr->key) == myHostData.ipAddr)
350                                 {
351                                         //note: casting val to void * in order to conform to API
352                                         if((searchResPtr->val = (unsigned int)chashSearch(myHashTable, searchCmdPtr->key)) == 0)
353                                                 searchResPtr->status = KEY_NOT_FOUND;
354                                         else
355                                                 searchResPtr->status = KEY_FOUND;
356                                 }
357                                 else
358                                 {
359                                         searchResPtr->status = NOT_KEY_OWNER;
360                                 }
361                                 sendto(sock, (void *)searchResPtr, sizeof(struct searchRes), 0, (struct sockaddr *)&clientAddr, socklen);
362                                 break;
363                                 //just ignore anything else
364 //                      default:
365 //                              printf("Unknown message type\n");
366                 }
367         }
368 }
369
370 int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries)
371 {
372         struct sockaddr_in server_addr;
373         struct sockaddr_in ack_addr;
374         socklen_t socklen = sizeof(struct sockaddr_in);
375         struct pollfd pollsock;
376 //      struct timeval now;
377         int retval;
378         int i;
379         ssize_t bytesReceived;
380
381         bzero((char *) &server_addr, sizeof(server_addr));
382         server_addr.sin_family = AF_INET;
383         server_addr.sin_port = htons(dest_port);
384         server_addr.sin_addr.s_addr = htonl(dest_ip);
385
386         if ((pollsock.fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
387         {
388                 perror("socket()");
389                 return -1;
390         }
391         
392         pollsock.events = POLLIN;
393         
394         for (i = 0; i < MAX_RETRIES; i++)
395         {
396 //              if (i > 0)
397 //                      printf("trying again, count: %d\n", i+1);
398                 if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr, socklen) == -1)
399                 {
400                         perror("sendto");
401                         return -1;
402                 }
403 //              gettimeofday(&now, NULL);
404 //              printf("message sent:%ds,%dus\n", now.tv_sec, now.tv_usec);
405                 retval = poll(&pollsock, 1, timeout);
406                 if (retval !=0)
407                 {
408                         bytesReceived = recvfrom(pollsock.fd, resBuffer, resBufferSize, 0, (struct sockaddr *)&ack_addr, &socklen);
409                         if ((ack_addr.sin_addr.s_addr == server_addr.sin_addr.s_addr)
410                         && (ack_addr.sin_port == server_addr.sin_port))
411                         {
412                                 close(pollsock.fd);
413 //                              gettimeofday(&now, NULL);
414 //                              printf("received response:%ds,%dus\n", now.tv_sec, now.tv_usec);
415                                 return bytesReceived;
416                         }
417                 }
418         }
419         close(pollsock.fd);
420 //      gettimeofday(&now, NULL);
421 //      printf("timed out, no ack:%ds,%dus\n", now.tv_sec, now.tv_usec);
422         return -1;
423 }
424
425 // use TCP for potentially large and/or important data transfer
426 void *tcpListen()
427 {
428         struct sockaddr_in myAddr;
429         struct sockaddr_in clientAddr;
430         int sockListen, sockAccept;
431         socklen_t socklen = sizeof(struct sockaddr_in);
432         pthread_t threadTcpAccept;
433
434         sockListen = socket(AF_INET, SOCK_STREAM, 0);
435         if (sockListen == -1)
436         {
437                 perror("socket()");
438                 pthread_exit(NULL);
439         }
440
441         myAddr.sin_family = AF_INET;
442         myAddr.sin_port = htons(TCP_PORT);
443         myAddr.sin_addr.s_addr = INADDR_ANY;
444         memset(&(myAddr.sin_zero), '\0', 8);
445
446         if (bind(sockListen, (struct sockaddr *)&myAddr, socklen) == -1)
447         {
448                 perror("socket()");
449                 pthread_exit(NULL);
450         }
451
452         if (listen(sockListen, BACKLOG) == -1)
453         {
454                 perror("listen()");
455                 pthread_exit(NULL);
456         }
457
458         while(1)
459         {
460                 sockAccept = accept(sockListen, (struct sockaddr *)&clientAddr, &socklen);
461                 pthread_create(&threadTcpAccept, NULL, tcpAccept, (void *)sockAccept);
462         }
463 }
464
465 void *tcpAccept(void *arg)
466 {
467         int sockAccept = (int)arg;
468         
469         printf("accepted tcp connection, file descriptor: %d\n", sockAccept);
470
471         sleep(30);
472
473         if (close(sockAccept) == -1)
474         {
475                 perror("close()");
476         }
477
478         printf("closed tcp connection, file descriptor: %d\n", sockAccept);
479
480         pthread_exit(NULL);
481 }
482
483 unsigned int getKeyOwner(unsigned int key)
484 {
485         return hostArray[blockOwnerArray[hash(key)]].ipAddr;
486 }
487
488 unsigned int getMyIpAddr()
489 {       
490         int sock;
491         struct ifreq interfaceInfo;
492         struct sockaddr_in *myAddr = (struct sockaddr_in *)&interfaceInfo.ifr_addr;
493
494         memset(&interfaceInfo, 0, sizeof(struct ifreq));
495
496         if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
497         {
498                 perror("socket()");
499                 return 1;
500         }
501
502         strcpy(interfaceInfo.ifr_name, DEFAULT_INTERFACE);
503         myAddr->sin_family = AF_INET;
504         
505         if(ioctl(sock, SIOCGIFADDR, &interfaceInfo) != 0)
506         {
507                 perror("ioctl()");
508                 return 1;
509         }
510
511         return ntohl(myAddr->sin_addr.s_addr);
512 }
513
514 unsigned int hash(unsigned int x)
515 {
516         return x % numBlocks;
517 }
518