1 #include <netinet/in.h>
4 #include <sys/socket.h>
15 #include <linux/sockios.h>
17 #include "clookup.h" //this works for now, do we need anything better?
// Tunable constants for the DHT node.
// Size of the receive and reply buffers used by the UDP request handler.
19 #define BUFFER_SIZE 512 //maximum message size
// Passed as the backlog argument of listen() on the TCP socket.
22 #define BACKLOG 10 //max pending tcp connections
// Passed as the `timeout` argument of udpSendWaitForResponse(), which hands it
// to poll() (i.e. milliseconds per attempt).
23 #define TIMEOUT_MS 500
// Number of struct hostData slots allocated for hostArray in dhtInit().
25 #define INIT_HOST_ALLOC 16
// Initial value assigned to numBlocks in dhtInit().
26 #define INIT_BLOCK_NUM 64
// Name of the network interface whose address getMyIpAddr() looks up.
27 #define DEFAULT_INTERFACE "eth0"
54 unsigned int maxKeyCapacity;
90 //TODO: leave message, rebuild message...
// Shared node state.
// NOTE(review): no locking is visible in this listing although these globals are
// read by getKeyOwner(), which the UDP handler and the dht* calls both use —
// confirm the threading model before changing them at run time.
// This host's own record (ipAddr, maxKeyCapacity); filled in by dhtInit().
92 struct hostData myHostData;
// NOTE(review): presumably the number of valid entries in hostArray — confirm where it is maintained.
93 unsigned int numHosts;
// Table of known hosts; getKeyOwner() indexes it with values taken from blockOwnerArray.
94 struct hostData *hostArray;
// NOTE(review): presumably the allocated capacity of hostArray — confirm the unit (elements vs bytes).
95 unsigned int hostArraySize;
// Number of key blocks; hash() reduces a key modulo this value.
96 unsigned int numBlocks;
// Per-block owner table: entry [hash(key)] is an index into hostArray.
97 unsigned int *blockOwnerArray;
// NOTE(review): presumably the allocated capacity of blockOwnerArray — confirm the unit.
98 unsigned int blockOwnerArraySize;
// Forward declarations for the helpers defined later in this file.
// Returns the IPv4 address of DEFAULT_INTERFACE, converted with ntohl().
100 unsigned int getMyIpAddr();
// Thread entry point for one accepted TCP connection; the argument carries the
// accepted socket descriptor cast to void *.
103 void *tcpAccept(void *);
104 //returns number of bytes received in resBuffer, or -1 if an error occurred
// dest_ip and dest_port are given in host byte order (the definition applies
// htonl()/htons()); timeout is handed to poll().
105 int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries);
// NOTE(review): no definition of sendNoWait is visible in this listing; presumably
// a fire-and-forget send — confirm its return convention.
106 int sendNoWait(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen);
// Returns the ipAddr of the host that owns the block `key` hashes to.
107 unsigned int getKeyOwner(unsigned int key);
// Maps a key to a block index (x modulo numBlocks).
108 unsigned int hash(unsigned int x);
110 void dhtInit(unsigned int maxKeyCapacity)
112 unsigned int myMessage;
116 myHostData.ipAddr = getMyIpAddr();
117 myHostData.maxKeyCapacity = maxKeyCapacity;
121 //announce presence (udp broadcast), get data structures from leader (leader initiates tcp transfer)
124 //if no response, I am the first
127 hostArray = malloc(INIT_HOST_ALLOC * sizeof(struct hostData));
128 hostArray[0] = myHostData;
130 numBlocks = INIT_BLOCK_NUM;
131 blockOwnerArray = malloc(numBlocks * sizeof(unsigned short));
132 for (i = 0; i < numBlocks; i++)
134 blockOwnerArray[i] = 0;
137 //otherwise, scan array and choose blocks to take over
138 //get data from hosts that own those blocks (tcp), fill hash table
139 //notify (the leader or everybody?) of ownership changes
142 pthread_t threadUdpListen, threadTcpListen;
143 pthread_create(&threadUdpListen, NULL, udpListen, NULL);
144 pthread_create(&threadTcpListen, NULL, tcpListen, NULL);
154 int dhtInsert(unsigned int key, unsigned int val)
156 unsigned int dest_ip = getKeyOwner(key);
157 struct insertCmd myMessage;
158 struct insertRes response;
161 myMessage.msgType = INSERT_COMMAND;
165 bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage, sizeof(struct insertCmd), (void *)&response, sizeof(struct insertRes), TIMEOUT_MS, MAX_RETRIES);
166 if (bytesReceived == sizeof(struct insertRes))
168 if (response.msgType == INSERT_RESPONSE)
170 if (response.status == INSERT_OK)
172 // if (response.status == NOT_KEY_OWNER)
175 //TODO: find owner and try again, request rebuild if necessary
176 return -1; //this function should be robust enough to always return 0
179 int dhtRemove(unsigned int key)
181 unsigned int dest_ip = getKeyOwner(key);
182 struct removeCmd myMessage;
183 struct removeRes response;
186 myMessage.msgType = REMOVE_COMMAND;
189 bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage, sizeof(struct removeCmd), (void *)&response, sizeof(struct removeRes), TIMEOUT_MS, MAX_RETRIES);
190 if (bytesReceived == sizeof(struct removeRes))
192 if (response.msgType == REMOVE_RESPONSE)
194 if (response.status == REMOVE_OK)
196 // if (response.status == NOT_KEY_OWNER)
199 //TODO: find owner and try again, request rebuild if necessary
200 return -1; //this function should be robust enough to always return 0
203 int dhtSearch(unsigned int key, unsigned int *val)
205 unsigned int dest_ip = getKeyOwner(key);
206 struct searchCmd myMessage;
207 struct searchRes response;
210 myMessage.msgType = SEARCH_COMMAND;
213 bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage, sizeof(struct searchCmd), (void *)&response, sizeof(struct searchRes), TIMEOUT_MS, MAX_RETRIES);
214 if (bytesReceived == sizeof(struct searchRes))
216 if (response.msgType == SEARCH_RESPONSE)
218 if (response.status == KEY_FOUND)
223 if (response.status == KEY_NOT_FOUND)
227 // if (response.status == NOT_KEY_OWNER)
230 //TODO: find owner and try again, request rebuild if necessary
231 return -1; //this function should be robust enough to always return 0 or 1
236 //use UDP for messages that are frequent and short
// UDP request handler (presumably the body of udpListen(), the routine dhtInit()
// starts as a thread): binds a datagram socket to UDP_PORT and answers insert,
// remove and search commands against a hash table created locally below.
// NOTE(review): the function header, the receive loop and the dispatch on
// message type are not among the visible lines; the comments here describe only
// the statements that are shown.
239 struct sockaddr_in myAddr;
240 struct sockaddr_in clientAddr;
// socklen is the address length for bind(), then the value-result length for
// recvfrom(), then the destination length for sendto().
242 socklen_t socklen = sizeof(struct sockaddr_in);
// Raw request datagram; reinterpreted below as one of the *Cmd structs.
243 char buffer[BUFFER_SIZE];
244 ssize_t bytesReceived;
245 struct insertCmd *insertCmdPtr;
246 struct removeCmd *removeCmdPtr;
247 struct searchCmd *searchCmdPtr;
248 struct insertRes *insertResPtr;
249 struct removeRes *removeResPtr;
250 struct searchRes *searchResPtr;
// Reply datagram; each handler views it through the matching *Res struct.
251 char replyBuffer[BUFFER_SIZE];
// The key/value store served by this handler.
254 chashtable_t *myHashTable = chashCreate(HASH_SIZE, LOADFACTOR);
256 if ((sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
// Listen on every local address, port UDP_PORT.
262 bzero(&myAddr, socklen);
263 myAddr.sin_family=AF_INET;
264 myAddr.sin_addr.s_addr=INADDR_ANY;
265 myAddr.sin_port=htons(UDP_PORT);
267 if (bind(sock, (struct sockaddr *)&myAddr, socklen) == -1)
272 // printf("listening...\n");
// Receive one request; clientAddr records the sender so the reply can be sent back.
275 if ((bytesReceived = recvfrom(sock, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&clientAddr, &socklen)) == -1)
277 perror("recvfrom()");
280 if (bytesReceived == 0)
282 printf("recvfrom() returned 0\n");
// The timestamp is only consumed by the commented-out trace below.
285 gettimeofday(&now, NULL);
286 // printf("message received:%ds,%dus\n", now.tv_sec, now.tv_usec);
288 // printf("Received %d bytes from %x:%d\n", bytesReceived, clientAddr.sin_addr.s_addr, clientAddr.sin_port);
// --- insert request: the datagram must be exactly one struct insertCmd ---
292 if (bytesReceived != sizeof(struct insertCmd))
294 printf("error: incorrect message size\n");
297 insertCmdPtr = (struct insertCmd *)buffer;
298 // printf("Insert: key=%d, val=%d\n", insertCmdPtr->key, insertCmdPtr->val);
299 insertResPtr = (struct insertRes *)replyBuffer;
300 insertResPtr->msgType = INSERT_RESPONSE;
// Only store the key when this host owns it; otherwise reply NOT_KEY_OWNER.
301 if (getKeyOwner(insertCmdPtr->key) == myHostData.ipAddr)
303 //note: casting val to void * in order to conform to API
304 if(chashInsert(myHashTable, insertCmdPtr->key, (void *)insertCmdPtr->val) == 0)
305 insertResPtr->status = INSERT_OK;
307 insertResPtr->status = INSERT_ERROR;
311 insertResPtr->status = NOT_KEY_OWNER;;
313 sendto(sock, (void *)insertResPtr, sizeof(struct insertRes), 0, (struct sockaddr *)&clientAddr, socklen);
// --- remove request: the datagram must be exactly one struct removeCmd ---
316 if (bytesReceived != sizeof(struct removeCmd))
318 printf("error: incorrect message size\n");
321 removeCmdPtr = (struct removeCmd *)buffer;
322 // printf("Remove: key=%d\n", removeCmdPtr->key);
323 removeResPtr = (struct removeRes *)replyBuffer;
324 removeResPtr->msgType = REMOVE_RESPONSE;
325 if (getKeyOwner(removeCmdPtr->key) == myHostData.ipAddr)
327 //delete the key from the local table (no value is involved on this path)
328 if(chashRemove(myHashTable, removeCmdPtr->key) == 0)
// NOTE(review): the remove reply is filled with the INSERT_* status codes, while
// dhtRemove() compares the status against REMOVE_OK. Unless the two constants
// happen to share a value, a successful remove is reported to the caller as a
// failure — these should presumably be the REMOVE_* codes.
329 removeResPtr->status = INSERT_OK;
331 removeResPtr->status = INSERT_ERROR;
335 removeResPtr->status = NOT_KEY_OWNER;
337 sendto(sock, (void *)removeResPtr, sizeof(struct removeRes), 0, (struct sockaddr *)&clientAddr, socklen);
// --- search request: the datagram must be exactly one struct searchCmd ---
340 if (bytesReceived != sizeof(struct searchCmd))
342 printf("error: incorrect message size\n");
345 searchCmdPtr = (struct searchCmd *)buffer;
346 // printf("Search: key=%d\n",searchCmdPtr->key);
347 searchResPtr = (struct searchRes *)replyBuffer;
348 searchResPtr->msgType = SEARCH_RESPONSE;
349 if (getKeyOwner(searchCmdPtr->key) == myHostData.ipAddr)
351 //note: the result of chashSearch is cast back to unsigned int (mirror of the cast on insert)
// NOTE(review): a result of 0 is treated as "not found", so a key stored with
// value 0 cannot be told apart from a missing key.
352 if((searchResPtr->val = (unsigned int)chashSearch(myHashTable, searchCmdPtr->key)) == 0)
353 searchResPtr->status = KEY_NOT_FOUND;
355 searchResPtr->status = KEY_FOUND;
359 searchResPtr->status = NOT_KEY_OWNER;
361 sendto(sock, (void *)searchResPtr, sizeof(struct searchRes), 0, (struct sockaddr *)&clientAddr, socklen);
363 //just ignore anything else
365 // printf("Unknown message type\n");
/*
 * Send one UDP datagram and wait for the reply from the same peer.
 *
 * dest_ip, dest_port  destination address and port, in host byte order
 * msg, msglen         datagram to send
 * resBuffer,
 * resBufferSize       where the reply is stored and how much room it has
 * timeout             how long to wait for a reply after each send, in
 *                     milliseconds (handed to poll())
 * numRetries          maximum number of send attempts; 0 means nothing is sent
 *
 * Returns the number of bytes received in resBuffer, or -1 if the socket could
 * not be created, a send failed, or no reply from dest_ip:dest_port arrived
 * within numRetries attempts. The socket is closed on every path.
 */
int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries)
{
	struct sockaddr_in server_addr;
	struct sockaddr_in ack_addr;
	socklen_t ack_len;
	struct pollfd pollsock;
	ssize_t bytesReceived;
	unsigned int i;
	int retval;

	memset(&server_addr, 0, sizeof(server_addr));
	server_addr.sin_family = AF_INET;
	server_addr.sin_port = htons(dest_port);
	server_addr.sin_addr.s_addr = htonl(dest_ip);

	if ((pollsock.fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
	{
		perror("socket()");
		return -1;
	}

	pollsock.events = POLLIN;

	//honour the caller's retry budget rather than a file-wide constant
	for (i = 0; i < numRetries; i++)
	{
		if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1)
		{
			perror("sendto()");
			close(pollsock.fd);
			return -1;
		}

		pollsock.revents = 0;
		retval = poll(&pollsock, 1, timeout);
		if (retval <= 0)
			continue; //timed out (or poll failed): use up one attempt and resend

		//recvfrom treats the length as a value-result argument, so it has to
		//be reset before every call
		ack_len = sizeof(ack_addr);
		memset(&ack_addr, 0, sizeof(ack_addr));
		bytesReceived = recvfrom(pollsock.fd, resBuffer, resBufferSize, 0, (struct sockaddr *)&ack_addr, &ack_len);
		if (bytesReceived == -1)
			continue; //a failed receive is not a reply

		//only accept the datagram if it really comes from the peer we asked
		if ((ack_addr.sin_addr.s_addr == server_addr.sin_addr.s_addr)
		&& (ack_addr.sin_port == server_addr.sin_port))
		{
			close(pollsock.fd);
			return (int)bytesReceived;
		}
	}

	//no matching reply within the allowed number of attempts
	close(pollsock.fd);
	return -1;
}
425 // use TCP for potentially large and/or important data transfer
// TCP listener (presumably the body of tcpListen(), the routine dhtInit()
// starts as a thread): listens on TCP_PORT and hands each accepted connection
// to its own tcpAccept thread.
// NOTE(review): the function header, the accept loop and the error-handling
// bodies are not among the visible lines; the comments here describe only the
// statements that are shown.
428 struct sockaddr_in myAddr;
429 struct sockaddr_in clientAddr;
430 int sockListen, sockAccept;
// Address length for bind(), then the value-result length for accept().
431 socklen_t socklen = sizeof(struct sockaddr_in);
432 pthread_t threadTcpAccept;
434 sockListen = socket(AF_INET, SOCK_STREAM, 0);
435 if (sockListen == -1)
// Listen on every local address, port TCP_PORT.
441 myAddr.sin_family = AF_INET;
442 myAddr.sin_port = htons(TCP_PORT);
443 myAddr.sin_addr.s_addr = INADDR_ANY;
444 memset(&(myAddr.sin_zero), '\0', 8);
446 if (bind(sockListen, (struct sockaddr *)&myAddr, socklen) == -1)
452 if (listen(sockListen, BACKLOG) == -1)
// NOTE(review): no check of the accept() result is visible before the
// descriptor is handed to the new thread — confirm that -1 is handled.
460 sockAccept = accept(sockListen, (struct sockaddr *)&clientAddr, &socklen);
// The descriptor travels to the thread by value, packed into the pointer
// argument; tcpAccept() casts it back to int.
461 pthread_create(&threadTcpAccept, NULL, tcpAccept, (void *)sockAccept);
// Thread entry point for one accepted TCP connection. `arg` is not a real
// pointer: it carries the socket descriptor by value.
// In the visible lines no data is exchanged; the connection is logged, closed
// and logged again.
465 void *tcpAccept(void *arg)
// NOTE(review): pointer-to-int cast; where pointers are wider than int this
// narrows the value (and draws a compiler warning). Going through intptr_t on
// both sides of the hand-off would make the round trip well defined.
467 int sockAccept = (int)arg;
469 printf("accepted tcp connection, file descriptor: %d\n", sockAccept);
// NOTE(review): the body taken when close() fails is not visible in this listing.
473 if (close(sockAccept) == -1)
478 printf("closed tcp connection, file descriptor: %d\n", sockAccept);
483 unsigned int getKeyOwner(unsigned int key)
485 return hostArray[blockOwnerArray[hash(key)]].ipAddr;
// Returns the IPv4 address of the DEFAULT_INTERFACE network interface in host
// byte order, obtained with the SIOCGIFADDR ioctl.
// NOTE(review): the bodies run when socket() or ioctl() fail are not visible
// in this listing, and neither is any close(sock) — confirm what is returned
// on error and that the descriptor is released before returning.
488 unsigned int getMyIpAddr()
// interfaceInfo is both the ioctl request (interface name) and its result;
// myAddr is a view of its address field as an IPv4 socket address.
491 struct ifreq interfaceInfo;
492 struct sockaddr_in *myAddr = (struct sockaddr_in *)&interfaceInfo.ifr_addr;
494 memset(&interfaceInfo, 0, sizeof(struct ifreq));
// The socket is only needed as a handle for the ioctl.
496 if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
// Select the interface by name and ask for its AF_INET address.
502 strcpy(interfaceInfo.ifr_name, DEFAULT_INTERFACE);
503 myAddr->sin_family = AF_INET;
505 if(ioctl(sock, SIOCGIFADDR, &interfaceInfo) != 0)
// The kernel fills the address in network byte order; convert it for callers.
511 return ntohl(myAddr->sin_addr.s_addr);
514 unsigned int hash(unsigned int x)
516 return x % numBlocks;