Preliminary code for distributed hash table. Defines some basic udp message structure...
[IRC.git] / Robust / src / Runtime / DSTM / interface / dht.c
1 #include <netinet/in.h>
2 #include <arpa/inet.h>
3 #include <sys/types.h>
4 #include <sys/socket.h>
5 #include <stdio.h>
6 #include <string.h>
7 #include <stdlib.h>
8 #include <unistd.h>
9 #include <pthread.h>
10 #include <sys/time.h>
11 #include <sys/poll.h>
12 #include "dht.h"
13 #include "llookup.h"
14
15 #define BUFFER_SIZE 512
16 #define PORT 2157
17 #define TIMEOUT_MS 500
18
19 unsigned int numHosts;
20 struct hostData *hostList;
21 unsigned int numBlocks;
22 struct hostData *blockOwner;
23
24 void *dhtListen();
25 int sendWaitForAck(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen);
26
27 void dhtInit()
28 {
29         
30         //start server (udp)
31         pthread_t threadListen;
32         pthread_create(&threadListen, NULL, dhtListen, NULL);
33
34         //announce presence (udp), get data structures from leader (leader initiates tcp transfer)
35         //if no response, I am the first
36         //otherwise, scan array and choose blocks to take over
37         //get data from hosts that own those blocks (tcp), fill hash table
38         //notify (the leader or everybody?) of ownership changes
39         return;
40 }
41
42 void dhtExit()
43 {
44
45 }
46
47 int dhtInsert(unsigned int key, unsigned int val)
48 {
49         unsigned int dest_ip = 0x7F000001;
50         unsigned short dest_port = PORT;
51         struct dhtInsertMsg myMessage;
52         myMessage.msgType = DHT_INSERT;
53         myMessage.key = key;
54         myMessage.val = val;
55         return sendWaitForAck(dest_ip, dest_port, (void *)&myMessage, sizeof(struct dhtInsertMsg));
56 }
57
58 int dhtRemove(unsigned int key)
59 {
60         unsigned int dest_ip = 0x7F000001;
61         unsigned short dest_port = PORT;
62         struct dhtRemoveMsg myMessage;
63         myMessage.msgType = DHT_REMOVE;
64         myMessage.key = key;
65         return sendWaitForAck(dest_ip, dest_port, (void *)&myMessage, sizeof(struct dhtRemoveMsg));
66 }
67
68 int dhtSearch(unsigned int key)
69 {
70         unsigned int dest_ip = 0x7F000001;
71         unsigned short dest_port = PORT;
72         struct dhtSearchMsg myMessage;
73         myMessage.msgType = DHT_SEARCH;
74         myMessage.key = key;
75         //TODO:this obviously requires more than an ACK, first implement actual hash table
76         return sendWaitForAck(dest_ip, dest_port, (void *)&myMessage, sizeof(struct dhtSearchMsg));
77 }
78
79 //helper functions
80 void *dhtListen()
81 {
82         struct sockaddr_in my_addr;
83         struct sockaddr_in client_addr;
84         int sock;
85         socklen_t socklen = sizeof(struct sockaddr_in);
86         char buffer[BUFFER_SIZE];
87         ssize_t bytesReceived;
88         struct timeval now;
89
90         if ((sock = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
91         {
92                 perror("socket()");
93                 exit(1);
94         }
95         
96         bzero(&my_addr, socklen);
97         my_addr.sin_family=AF_INET;
98         my_addr.sin_addr.s_addr=INADDR_ANY;
99         my_addr.sin_port=htons(PORT);
100
101         if (bind(sock, (struct sockaddr *)&my_addr, socklen) == -1)
102         {
103                 perror("bind()");
104                 exit(1);
105         }
106         printf("listening...\n");
107         while(1)
108         {
109                 if ((bytesReceived = recvfrom(sock, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&client_addr, &socklen)) == -1)
110                 {
111                         printf("recvfrom() returned -1\n");
112                         break;
113                 }
114                 if (bytesReceived == 0)
115                 {
116                         printf("recvfrom() returned 0\n");
117                         break;
118                 }
119                 gettimeofday(&now, NULL);
120                 printf("message received:%ds,%dus\n", now.tv_sec, now.tv_usec);
121
122                 printf("Received %d bytes from %x:%d\n", bytesReceived, client_addr.sin_addr.s_addr, client_addr.sin_port);
123                 switch (buffer[0])
124                 {
125                         case DHT_INSERT:
126                                 if (bytesReceived != sizeof(struct dhtInsertMsg))
127                                 {
128                                         printf("error: incorrect message size\n");
129                                         break;
130                                 }
131                                 printf("Insert: key=%d, val=%d\n",((struct dhtInsertMsg *)buffer)->key,((struct dhtInsertMsg *)buffer)->val);
132                                 buffer[0] = DHT_ACK;
133                                 sendto(sock, buffer, 1, 0, (struct sockaddr *)&client_addr, socklen);
134                                 break;
135                         case DHT_REMOVE:
136                                 if (bytesReceived != sizeof(struct dhtRemoveMsg))
137                                 {
138                                         printf("error: incorrect message size\n");
139                                         break;
140                                 }
141                                 printf("Remove: key=%d\n",((struct dhtRemoveMsg *)buffer)->key);
142                                 buffer[0] = DHT_ACK;
143                                 sendto(sock, buffer, 1, 0, (struct sockaddr *)&client_addr, socklen);
144                                 break;
145                         case DHT_SEARCH:
146                                 if (bytesReceived != sizeof(struct dhtSearchMsg))
147                                 {
148                                         printf("error: incorrect message size\n");
149                                         break;
150                                 }
151                                 printf("Search: key=%d\n",((struct dhtSearchMsg *)buffer)->key);
152                                 buffer[0] = DHT_ACK;
153                                 sendto(sock, buffer, 1, 0, (struct sockaddr *)&client_addr, socklen);
154                                 break;                  
155                         default:
156                                 printf("Unknown message type\n");
157                 }
158         }
159 }
160
161 //send message, wait for response, resend twice before return failure
162 int sendWaitForAck(unsigned int dest_ip, unsigned  short dest_port, void *msg, unsigned int msglen)
163 {
164         struct sockaddr_in server_addr;
165         struct sockaddr_in ack_addr;
166         socklen_t socklen = sizeof(struct sockaddr_in);
167         struct pollfd pollsock;
168         struct timeval now;
169         int retval;
170         int i;
171         char ackByte;
172         ssize_t bytesReceived;
173
174         bzero((char *) &server_addr, sizeof(server_addr));
175         server_addr.sin_family = AF_INET;
176         server_addr.sin_port = htons(dest_port);
177         server_addr.sin_addr.s_addr = htonl(dest_ip);
178
179         if ((pollsock.fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
180         {
181                 printf("error creating socket\n");
182                 return 1;
183         }
184         
185         pollsock.events = POLLIN;
186         
187         for (i = 0; i < 3; i++)
188         {
189                 if (i > 0)
190                         printf("trying again, count: %d\n", i+1);
191                 if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr, socklen) == -1)
192                 {
193                         printf("error sending\n");
194                         return 1;
195                 }
196                 gettimeofday(&now, NULL);
197                 printf("message sent:%ds,%dus\n", now.tv_sec, now.tv_usec);
198                 retval = poll(&pollsock, 1, TIMEOUT_MS);
199                 if (retval !=0)
200                 {
201                         bytesReceived = recvfrom(pollsock.fd, &ackByte, 1, 0, (struct sockaddr *)&ack_addr, &socklen);
202                         if ((bytesReceived == 1) && (ack_addr.sin_addr.s_addr == server_addr.sin_addr.s_addr)
203                         && (ack_addr.sin_port == server_addr.sin_port) && (ackByte == DHT_ACK))
204                         {
205                                 close(pollsock.fd);
206                                 gettimeofday(&now, NULL);
207                                 printf("received ack:%ds,%dus\n", now.tv_sec, now.tv_usec);
208                                 return 0;
209                         }
210                 }
211         }
212         close(pollsock.fd);
213         gettimeofday(&now, NULL);
214         printf("timed out, no ack:%ds,%dus\n", now.tv_sec, now.tv_usec);
215         return 1;
216 }
217