the beginning of a DHT (only one host right now...)
[IRC.git] / Robust / src / Runtime / DSTM / interface / dht.c
1 #include <netinet/in.h>
2 #include <arpa/inet.h>
3 #include <sys/types.h>
4 #include <sys/socket.h>
5 #include <sys/ioctl.h>
6 #include <stdio.h>
7 #include <string.h>
8 #include <stdlib.h>
9 #include <unistd.h>
10 #include <pthread.h>
11 #include <sys/time.h>
12 #include <sys/poll.h>
13 #include <netdb.h>
14 #include <net/if.h>
15 #include <linux/sockios.h>
16 #include "dht.h"
17 #include "clookup.h" //this works for now, do we need anything better?
18
19 #define BUFFER_SIZE 512 //maximum message size; also the size in bytes of the listener's receive and reply buffers
20 #define LISTEN_PORT 2157 //UDP port the listener binds to and that every request is sent to
21 #define TIMEOUT_MS 500 //how long to wait for a reply after each send, in milliseconds (handed to poll())
22 #define MAX_RETRIES 3 //number of send attempts made for one request before giving up
23 #define INIT_NUM_BLOCKS 16 //number of key blocks the block-owner table starts with
24 #define DEFAULT_INTERFACE "eth0" //network interface whose IPv4 address is used as this host's identity
25
26 //general commands (requests; the code is stored in msgType, the first byte of the datagram)
27 #define INSERT_COMMAND 1 //store a key/value pair (struct insertCmd)
28 #define REMOVE_COMMAND 2 //delete a key (struct removeCmd)
29 #define SEARCH_COMMAND 3 //look up a key (struct searchCmd)
30 //general responses (replies sent back to the requester, also identified by msgType)
31 #define INSERT_RESPONSE 4 //reply to INSERT_COMMAND (struct insertRes)
32 #define REMOVE_RESPONSE 5 //reply to REMOVE_COMMAND (struct removeRes)
33 #define SEARCH_RESPONSE 6 //reply to SEARCH_COMMAND (struct searchRes)
34
35 //#define JOIN
36 //#define LEAVE
37 //reserved for leader
38 //#define REBUILD
39
40 //etc...
41
42 //status codes (carried in the 12-bit status field of the response structs)
43 #define INSERT_OK 1 //the pair was stored; dhtInsert() returns 0 for this
44 #define INSERT_ERROR 2 //the local hash table rejected the insert
45 #define REMOVE_OK 3 //dhtRemove() returns 0 for this
46 #define REMOVE_ERROR 4 //remove request failed
47 #define KEY_FOUND 5 //search hit; the val field of the reply holds the value
48 #define KEY_NOT_FOUND 6 //search miss; dhtSearch() returns 1 for this
49 #define NOT_KEY_OWNER 7 //the receiving host does not consider itself the owner of the key
50
//Describes one host taking part in the DHT. Entries are chained through next; hostList points at the first one.
51 struct hostData {
52         unsigned int ipAddr; //IPv4 address in host byte order (getMyIpAddr() applies ntohl)
53         unsigned int maxKeyCapacity; //presumably the capacity passed to dhtInit(); not read anywhere in the visible code - TODO confirm intended meaning
54         struct hostData *next; //next host in the list, NULL for the last entry
55 };
56
//Insert request. The struct is sent and received as raw memory, so its in-memory layout
//(including how the compiler packs the bit-field) and the host byte order are the wire format.
57 struct insertCmd {
58         unsigned char msgType; //INSERT_COMMAND
59         unsigned int unused:12; //never read by the visible code
60         unsigned int key; //key to store
61         unsigned int val; //value to associate with key
62 };
63
//Remove request; sent and received as raw memory, so the in-memory layout is the wire format.
64 struct removeCmd {
65         unsigned char msgType; //REMOVE_COMMAND
66         unsigned int unused:12; //never read by the visible code
67         unsigned int key; //key to delete
68 };
69
//Search request; sent and received as raw memory, so the in-memory layout is the wire format.
70 struct searchCmd {
71         unsigned char msgType; //SEARCH_COMMAND
72         unsigned int unused:12; //never read by the visible code
73         unsigned int key; //key to look up
74 };
75
//Reply to an insert request; sent and received as raw memory, so the in-memory layout is the wire format.
76 struct insertRes {
77         unsigned char msgType; //INSERT_RESPONSE
78         unsigned int status:12; //INSERT_OK, INSERT_ERROR or NOT_KEY_OWNER
79 };
80
//Reply to a remove request; sent and received as raw memory, so the in-memory layout is the wire format.
81 struct removeRes {
82         unsigned char msgType; //REMOVE_RESPONSE
83         unsigned int status:12; //status code; dhtRemove() treats REMOVE_OK as success
84 };
85
//Reply to a search request; sent and received as raw memory, so the in-memory layout is the wire format.
86 struct searchRes {
87         unsigned char msgType; //SEARCH_RESPONSE
88         unsigned int status:12; //KEY_FOUND, KEY_NOT_FOUND or NOT_KEY_OWNER
89         unsigned int val; //value stored under the key; dhtSearch() only uses it when status is KEY_FOUND
90 };
91
92 /*struct joinMsg {
93         unsigned char msgType;
94         unsigned int unused:12;
95         struct hostData newHost;
96 };*/
97
98 //TODO: leave message, rebuild message...
99
100 unsigned int numHosts; //presumably the number of entries in hostList; never assigned in the visible code
101 struct hostData *hostList; //head of the linked list of known hosts
102 struct hostData *myHostData; //this host's own entry, allocated by dhtInit()
103 unsigned int numBlocks; //number of entries in blockOwner; hash() reduces keys modulo this value
104 struct hostData **blockOwner; //blockOwner[b] is the host that owns key block b
105
106
107 unsigned int getMyIpAddr(); //IPv4 address of DEFAULT_INTERFACE in host byte order, or 1 on failure
108 void *dhtListen(); //body of the UDP listener thread started by dhtInit()
109 //returns number of bytes received in resBuffer, or -1 if an error occurred
//or no reply arrived from dest_ip:dest_port; dest_ip is in host byte order, timeout is in milliseconds per attempt
110 int sendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries);
111 int sendNoWait(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen); //declared only; no definition appears in the visible part of the file
112 unsigned int getKeyOwner(unsigned int key); //IP address (host byte order) of the host that owns key's block
113 unsigned int hash(unsigned int x); //maps a key to a block index, x modulo numBlocks
114
115 void dhtInit(unsigned int maxKeyCapacity)
116 {
117         int i;
118
119         myHostData = malloc(sizeof(struct hostData));
120         myHostData->ipAddr = getMyIpAddr();
121         myHostData->maxKeyCapacity;
122         myHostData->next = NULL;
123
124
125         //announce presence (udp), get data structures from leader (leader initiates tcp transfer)
126         //if no response, I am the first
127         hostList = myHostData;
128         numBlocks = INIT_NUM_BLOCKS;
129         blockOwner = malloc(numBlocks * sizeof(struct hostData));
130         for (i = 0; i < numBlocks; i++)
131         {
132                 blockOwner[i] = myHostData;
133         }
134         
135         //otherwise, scan array and choose blocks to take over
136         //get data from hosts that own those blocks (tcp), fill hash table
137         //notify (the leader or everybody?) of ownership changes
138         
139         //start server (udp)
140         pthread_t threadListen;
141         pthread_create(&threadListen, NULL, dhtListen, NULL);
142         
143         return;
144 }
145
//Shutdown hook for the DHT. Currently a no-op: nothing that dhtInit() allocated or started is released here.
146 void dhtExit()
147 {
148
149 }
150
151 int dhtInsert(unsigned int key, unsigned int val)
152 {
153         unsigned int dest_ip = getKeyOwner(key);
154         unsigned short dest_port = LISTEN_PORT;
155         struct insertCmd myMessage;
156         struct insertRes response;
157         int bytesReceived;
158
159         myMessage.msgType = INSERT_COMMAND;
160         myMessage.key = key;
161         myMessage.val = val;
162         
163         bytesReceived = sendWaitForResponse(dest_ip, dest_port, (void *)&myMessage, sizeof(struct insertCmd), (void *)&response, sizeof(struct insertRes), TIMEOUT_MS, MAX_RETRIES);
164         if (bytesReceived == sizeof(struct insertRes))
165         {
166                 if (response.msgType == INSERT_RESPONSE)
167                 {
168                         if (response.status == INSERT_OK)
169                                 return 0;
170 //                      if (response.status == NOT_KEY_OWNER)
171                 }
172         }
173 //TODO: find owner and try again, request rebuild if necessary
174         return -1; //this function should be robust enough to always return 0
175 }
176
177 int dhtRemove(unsigned int key)
178 {
179         unsigned int dest_ip = getKeyOwner(key);
180         unsigned short dest_port = LISTEN_PORT;
181         struct removeCmd myMessage;
182         struct removeRes response;
183         int bytesReceived;
184         
185         myMessage.msgType = REMOVE_COMMAND;
186         myMessage.key = key;
187
188         bytesReceived = sendWaitForResponse(dest_ip, dest_port, (void *)&myMessage, sizeof(struct removeCmd), (void *)&response, sizeof(struct removeRes), TIMEOUT_MS, MAX_RETRIES);
189         if (bytesReceived == sizeof(struct removeRes))
190         {
191                 if (response.msgType == REMOVE_RESPONSE)
192                 {
193                         if (response.status == REMOVE_OK)
194                                 return 0;
195 //                      if (response.status == NOT_KEY_OWNER)
196                 }
197         }
198 //TODO: find owner and try again, request rebuild if necessary
199         return -1; //this function should be robust enough to always return 0
200 }
201
202 int dhtSearch(unsigned int key, unsigned int *val)
203 {
204         unsigned int dest_ip = getKeyOwner(key);
205         unsigned short dest_port = LISTEN_PORT;
206         struct searchCmd myMessage;
207         struct searchRes response;
208         int bytesReceived;
209
210         myMessage.msgType = SEARCH_COMMAND;
211         myMessage.key = key;
212
213         bytesReceived = sendWaitForResponse(dest_ip, dest_port, (void *)&myMessage, sizeof(struct searchCmd), (void *)&response, sizeof(struct searchRes), TIMEOUT_MS, MAX_RETRIES);
214         if (bytesReceived == sizeof(struct searchRes))
215         {
216                 if (response.msgType == SEARCH_RESPONSE)
217                 {
218                         if (response.status == KEY_FOUND)
219                         {
220                                 *val = response.val;
221                                 return 0;
222                         }
223                         if (response.status == KEY_NOT_FOUND)
224                         {
225                                 return 1;
226                         }
227 //                      if (response.status == NOT_KEY_OWNER)
228                 }
229         }
230 //TODO: find owner and try again, request rebuild if necessary
231         return -1; //this function should be robust enough to always return 0 or 1
232 }
233
234 //helper functions
235 void *dhtListen()
236 {
237         struct sockaddr_in my_addr;
238         struct sockaddr_in client_addr;
239         int sock;
240         socklen_t socklen = sizeof(struct sockaddr_in);
241         char buffer[BUFFER_SIZE];
242         ssize_t bytesReceived;
243         struct insertCmd *insertCmdPtr;
244         struct removeCmd *removeCmdPtr;
245         struct searchCmd *searchCmdPtr;
246         struct insertRes *insertResPtr;
247         struct removeRes *removeResPtr;
248         struct searchRes *searchResPtr;
249         char replyBuffer[BUFFER_SIZE];
250         struct timeval now;
251
252         chashtable_t *myHashTable = chashCreate(HASH_SIZE, LOADFACTOR);
253
254         if ((sock = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
255         {
256                 perror("socket()");
257                 exit(1);
258         }
259         
260         bzero(&my_addr, socklen);
261         my_addr.sin_family=AF_INET;
262         my_addr.sin_addr.s_addr=INADDR_ANY;
263         my_addr.sin_port=htons(LISTEN_PORT);
264
265         if (bind(sock, (struct sockaddr *)&my_addr, socklen) == -1)
266         {
267                 perror("bind()");
268                 exit(1);
269         }
270 //      printf("listening...\n");
271         while(1)
272         {
273                 if ((bytesReceived = recvfrom(sock, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&client_addr, &socklen)) == -1)
274                 {
275                         perror("recvfrom()");
276                         break;
277                 }
278                 if (bytesReceived == 0)
279                 {
280                         printf("recvfrom() returned 0\n");
281                         break;
282                 }
283                 gettimeofday(&now, NULL);
284 //              printf("message received:%ds,%dus\n", now.tv_sec, now.tv_usec);
285
286 //              printf("Received %d bytes from %x:%d\n", bytesReceived, client_addr.sin_addr.s_addr, client_addr.sin_port);
287                 switch (buffer[0])
288                 {
289                         case INSERT_COMMAND:
290                                 if (bytesReceived != sizeof(struct insertCmd))
291                                 {
292                                         printf("error: incorrect message size\n");
293                                         break;
294                                 }
295                                 insertCmdPtr = (struct insertCmd *)buffer;
296 //                              printf("Insert: key=%d, val=%d\n", insertCmdPtr->key, insertCmdPtr->val);
297                                 insertResPtr = (struct insertRes *)replyBuffer;
298                                 insertResPtr->msgType = INSERT_RESPONSE;
299                                 if (getKeyOwner(insertCmdPtr->key) == myHostData->ipAddr)
300                                 {
301                                         //note: casting val to void * in order to conform to API
302                                         if(chashInsert(myHashTable, insertCmdPtr->key, (void *)insertCmdPtr->val) == 0)
303                                                 insertResPtr->status = INSERT_OK;
304                                         else
305                                                 insertResPtr->status = INSERT_ERROR;
306                                 }
307                                 else
308                                 {
309                                         insertResPtr->status = NOT_KEY_OWNER;;
310                                 }
311                                 sendto(sock, (void *)insertResPtr, sizeof(struct insertRes), 0, (struct sockaddr *)&client_addr, socklen);
312                                 break;
313                         case REMOVE_COMMAND:
314                                 if (bytesReceived != sizeof(struct removeCmd))
315                                 {
316                                         printf("error: incorrect message size\n");
317                                         break;
318                                 }
319                                 removeCmdPtr = (struct removeCmd *)buffer;
320 //                              printf("Remove: key=%d\n", removeCmdPtr->key);
321                                 removeResPtr = (struct removeRes *)replyBuffer;
322                                 removeResPtr->msgType = REMOVE_RESPONSE;
323                                 if (getKeyOwner(removeCmdPtr->key) == myHostData->ipAddr)
324                                 {
325                                         //note: casting val to void * in order to conform to API
326                                         if(chashRemove(myHashTable, removeCmdPtr->key) == 0)
327                                                 removeResPtr->status = INSERT_OK;
328                                         else
329                                                 removeResPtr->status = INSERT_ERROR;
330                                 }
331                                 else
332                                 {
333                                         removeResPtr->status = NOT_KEY_OWNER;
334                                 }
335                                 sendto(sock, (void *)removeResPtr, sizeof(struct removeRes), 0, (struct sockaddr *)&client_addr, socklen);
336                                 break;
337                         case SEARCH_COMMAND:
338                                 if (bytesReceived != sizeof(struct searchCmd))
339                                 {
340                                         printf("error: incorrect message size\n");
341                                         break;
342                                 }
343                                 searchCmdPtr = (struct searchCmd *)buffer;
344 //                              printf("Search: key=%d\n",searchCmdPtr->key);
345                                 searchResPtr = (struct searchRes *)replyBuffer;
346                                 searchResPtr->msgType = SEARCH_RESPONSE;
347                                 if (getKeyOwner(searchCmdPtr->key) == myHostData->ipAddr)
348                                 {
349                                         //note: casting val to void * in order to conform to API
350                                         if((searchResPtr->val = (unsigned int)chashSearch(myHashTable, searchCmdPtr->key)) == 0)
351                                                 searchResPtr->status = KEY_NOT_FOUND;
352                                         else
353                                                 searchResPtr->status = KEY_FOUND;
354                                 }
355                                 else
356                                 {
357                                         searchResPtr->status = NOT_KEY_OWNER;
358                                 }
359                                 sendto(sock, (void *)searchResPtr, sizeof(struct searchRes), 0, (struct sockaddr *)&client_addr, socklen);
360                                 break;
361                                 //just ignore anything else
362 //                      default:
363 //                              printf("Unknown message type\n");
364                 }
365         }
366 }
367
/*
 * Sends msg to dest_ip:dest_port over UDP and waits for a reply from that
 * same address and port.
 *
 * dest_ip        destination IPv4 address in host byte order
 * dest_port      destination UDP port in host byte order
 * msg, msglen    datagram to send
 * resBuffer      receives the reply (at most resBufferSize bytes)
 * timeout        how long to wait for a reply after each send, in milliseconds
 * numRetries     maximum number of send attempts
 *
 * A datagram arriving from any other address or port is discarded and the
 * request is sent again, which uses up one attempt.
 *
 * Returns the number of bytes received in resBuffer, or -1 if the socket
 * could not be created, sending failed, or no matching reply arrived within
 * numRetries attempts.
 */
int sendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries)
{
        struct sockaddr_in server_addr;
        struct sockaddr_in ack_addr;
        socklen_t ackLen;
        struct pollfd pollsock;
        ssize_t bytesReceived;
        unsigned int attempt;
        int ready;
        int result = -1;

        memset(&server_addr, 0, sizeof(server_addr));
        server_addr.sin_family = AF_INET;
        server_addr.sin_port = htons(dest_port);
        server_addr.sin_addr.s_addr = htonl(dest_ip);

        if ((pollsock.fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
        {
                perror("socket()");
                return -1;
        }

        pollsock.events = POLLIN;

        for (attempt = 0; attempt < numRetries; attempt++)
        {
                if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1)
                {
                        perror("sendto");
                        break; //fall through to close() so the socket is not leaked
                }

                pollsock.revents = 0;
                ready = poll(&pollsock, 1, (int)timeout);
                if (ready == -1)
                {
                        //do not call recvfrom() after a failed poll: it could block forever
                        perror("poll");
                        continue;
                }
                if (ready == 0)
                        continue; //timed out, send again

                ackLen = sizeof(ack_addr);
                memset(&ack_addr, 0, sizeof(ack_addr));
                bytesReceived = recvfrom(pollsock.fd, resBuffer, resBufferSize, 0, (struct sockaddr *)&ack_addr, &ackLen);
                if (bytesReceived == -1)
                {
                        perror("recvfrom");
                        continue;
                }
                if ((ack_addr.sin_addr.s_addr == server_addr.sin_addr.s_addr)
                && (ack_addr.sin_port == server_addr.sin_port))
                {
                        result = (int)bytesReceived;
                        break;
                }
        }

        close(pollsock.fd);
        return result;
}
422
423 unsigned int getKeyOwner(unsigned int key)
424 {
425         return blockOwner[hash(key)]->ipAddr;
426 }
427
428 unsigned int getMyIpAddr()
429 {       
430         int sock;
431         struct ifreq interfaceInfo;
432         struct sockaddr_in *myAddr = (struct sockaddr_in *)&interfaceInfo.ifr_addr;
433
434         memset(&interfaceInfo, 0, sizeof(struct ifreq));
435
436         if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
437         {
438                 perror("socket");
439                 return 1;
440         }
441
442         strcpy(interfaceInfo.ifr_name, DEFAULT_INTERFACE);
443         myAddr->sin_family = AF_INET;
444         
445         if(ioctl(sock, SIOCGIFADDR, &interfaceInfo) != 0)
446         {
447                 perror("ioctl");
448                 return 1;
449         }
450
451         return ntohl(myAddr->sin_addr.s_addr);
452 }
453
454 unsigned int hash(unsigned int x)
455 {
456         return x % numBlocks;
457 }
458