1 #include <netinet/in.h>
4 #include <sys/socket.h>
15 #define BUFFER_SIZE 512
17 #define TIMEOUT_MS 500
19 unsigned int numHosts;
20 struct hostData *hostList;
21 unsigned int numBlocks;
22 struct hostData *blockOwner;
25 int sendWaitForAck(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen);
31 pthread_t threadListen;
32 pthread_create(&threadListen, NULL, dhtListen, NULL);
34 //announce presence (udp), get data structures from leader (leader initiates tcp transfer)
35 //if no response, I am the first
36 //otherwise, scan array and choose blocks to take over
37 //get data from hosts that own those blocks (tcp), fill hash table
38 //notify (the leader or everybody?) of ownership changes
47 int dhtInsert(unsigned int key, unsigned int val)
49 unsigned int dest_ip = 0x7F000001;
50 unsigned short dest_port = PORT;
51 struct dhtInsertMsg myMessage;
52 myMessage.msgType = DHT_INSERT;
55 return sendWaitForAck(dest_ip, dest_port, (void *)&myMessage, sizeof(struct dhtInsertMsg));
58 int dhtRemove(unsigned int key)
60 unsigned int dest_ip = 0x7F000001;
61 unsigned short dest_port = PORT;
62 struct dhtRemoveMsg myMessage;
63 myMessage.msgType = DHT_REMOVE;
65 return sendWaitForAck(dest_ip, dest_port, (void *)&myMessage, sizeof(struct dhtRemoveMsg));
68 int dhtSearch(unsigned int key)
70 unsigned int dest_ip = 0x7F000001;
71 unsigned short dest_port = PORT;
72 struct dhtSearchMsg myMessage;
73 myMessage.msgType = DHT_SEARCH;
75 //TODO:this obviously requires more than an ACK, first implement actual hash table
76 return sendWaitForAck(dest_ip, dest_port, (void *)&myMessage, sizeof(struct dhtSearchMsg));
82 struct sockaddr_in my_addr;
83 struct sockaddr_in client_addr;
85 socklen_t socklen = sizeof(struct sockaddr_in);
86 char buffer[BUFFER_SIZE];
87 ssize_t bytesReceived;
90 if ((sock = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
96 bzero(&my_addr, socklen);
97 my_addr.sin_family=AF_INET;
98 my_addr.sin_addr.s_addr=INADDR_ANY;
99 my_addr.sin_port=htons(PORT);
101 if (bind(sock, (struct sockaddr *)&my_addr, socklen) == -1)
106 printf("listening...\n");
109 if ((bytesReceived = recvfrom(sock, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&client_addr, &socklen)) == -1)
111 printf("recvfrom() returned -1\n");
114 if (bytesReceived == 0)
116 printf("recvfrom() returned 0\n");
119 gettimeofday(&now, NULL);
120 printf("message received:%ds,%dus\n", now.tv_sec, now.tv_usec);
122 printf("Received %d bytes from %x:%d\n", bytesReceived, client_addr.sin_addr.s_addr, client_addr.sin_port);
126 if (bytesReceived != sizeof(struct dhtInsertMsg))
128 printf("error: incorrect message size\n");
131 printf("Insert: key=%d, val=%d\n",((struct dhtInsertMsg *)buffer)->key,((struct dhtInsertMsg *)buffer)->val);
133 sendto(sock, buffer, 1, 0, (struct sockaddr *)&client_addr, socklen);
136 if (bytesReceived != sizeof(struct dhtRemoveMsg))
138 printf("error: incorrect message size\n");
141 printf("Remove: key=%d\n",((struct dhtRemoveMsg *)buffer)->key);
143 sendto(sock, buffer, 1, 0, (struct sockaddr *)&client_addr, socklen);
146 if (bytesReceived != sizeof(struct dhtSearchMsg))
148 printf("error: incorrect message size\n");
151 printf("Search: key=%d\n",((struct dhtSearchMsg *)buffer)->key);
153 sendto(sock, buffer, 1, 0, (struct sockaddr *)&client_addr, socklen);
156 printf("Unknown message type\n");
161 //send message, wait for response, resend twice before return failure
162 int sendWaitForAck(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen)
164 struct sockaddr_in server_addr;
165 struct sockaddr_in ack_addr;
166 socklen_t socklen = sizeof(struct sockaddr_in);
167 struct pollfd pollsock;
172 ssize_t bytesReceived;
174 bzero((char *) &server_addr, sizeof(server_addr));
175 server_addr.sin_family = AF_INET;
176 server_addr.sin_port = htons(dest_port);
177 server_addr.sin_addr.s_addr = htonl(dest_ip);
179 if ((pollsock.fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
181 printf("error creating socket\n");
185 pollsock.events = POLLIN;
187 for (i = 0; i < 3; i++)
190 printf("trying again, count: %d\n", i+1);
191 if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr, socklen) == -1)
193 printf("error sending\n");
196 gettimeofday(&now, NULL);
197 printf("message sent:%ds,%dus\n", now.tv_sec, now.tv_usec);
198 retval = poll(&pollsock, 1, TIMEOUT_MS);
201 bytesReceived = recvfrom(pollsock.fd, &ackByte, 1, 0, (struct sockaddr *)&ack_addr, &socklen);
202 if ((bytesReceived == 1) && (ack_addr.sin_addr.s_addr == server_addr.sin_addr.s_addr)
203 && (ack_addr.sin_port == server_addr.sin_port) && (ackByte == DHT_ACK))
206 gettimeofday(&now, NULL);
207 printf("received ack:%ds,%dus\n", now.tv_sec, now.tv_usec);
213 gettimeofday(&now, NULL);
214 printf("timed out, no ack:%ds,%dus\n", now.tv_sec, now.tv_usec);