1 #include <netinet/in.h>
4 #include <sys/socket.h>
15 #include <linux/sockios.h>
17 #include "clookup.h" //this works for now, do we need anything better?
// Size of the receive and reply buffers used by the listener.
19 #define BUFFER_SIZE 512 //maximum message size
// UDP port the listener binds to; the client calls also use it as the destination port.
20 #define LISTEN_PORT 2157
// Per-attempt wait for a reply; it is handed to poll() as the timeout, so the unit is milliseconds.
21 #define TIMEOUT_MS 500
// Number of entries in the block-ownership table created by dhtInit().
23 #define INIT_NUM_BLOCKS 16
// Name of the network interface whose IPv4 address getMyIpAddr() queries.
24 #define DEFAULT_INTERFACE "eth0"
// Values carried in the msgType field of request messages.
27 #define INSERT_COMMAND 1
28 #define REMOVE_COMMAND 2
29 #define SEARCH_COMMAND 3
// Values carried in the msgType field of reply messages.
31 #define INSERT_RESPONSE 4
32 #define REMOVE_RESPONSE 5
33 #define SEARCH_RESPONSE 6
// Values carried in the status field of reply messages.
// NOTE(review): INSERT_OK, REMOVE_OK and KEY_FOUND are used elsewhere in this file but
// their definitions are not among the visible lines; presumably they take the values
// 1, 3 and 5 that are skipped here -- confirm. MAX_RETRIES, HASH_SIZE and LOADFACTOR
// are likewise used but not defined in the visible lines.
44 #define INSERT_ERROR 2
46 #define REMOVE_ERROR 4
48 #define KEY_NOT_FOUND 6
// Returned by the listener when getKeyOwner() says another host owns the key.
49 #define NOT_KEY_OWNER 7
// Members of struct hostData. The struct header and the ipAddr member that the rest
// of the file reads are not among the visible lines.
// Key capacity of the host; dhtInit() receives a value of the same name from its caller.
// NOTE(review): the unit (number of keys?) is not stated anywhere visible -- confirm.
53 unsigned int maxKeyCapacity;
// Link to another host record; dhtInit() sets it to NULL for the local host, which it
// also installs as the head of hostList.
54 struct hostData *next;
// Leading members of the wire-message structs. The struct headers, the key/val members
// and the closing braces are not among the visible lines, so the struct names below are
// inferred from declaration order and from how insertCmd/removeCmd/searchCmd and
// insertRes/removeRes/searchRes are used later in the file -- confirm against the full file.
// Every message starts with a one-byte msgType followed by a 12-bit bit-field.

// presumably struct insertCmd
58 unsigned char msgType;
59 unsigned int unused:12;
// presumably struct removeCmd
65 unsigned char msgType;
66 unsigned int unused:12;
// presumably struct searchCmd
71 unsigned char msgType;
72 unsigned int unused:12;
// presumably struct insertRes; status carries one of the status codes (e.g. INSERT_ERROR, NOT_KEY_OWNER)
77 unsigned char msgType;
78 unsigned int status:12;
// presumably struct removeRes
82 unsigned char msgType;
83 unsigned int status:12;
// presumably struct searchRes; the listener also fills a val member that is not visible here
87 unsigned char msgType;
88 unsigned int status:12;
// A message that embeds a whole hostData record; its name and its use are not visible
// in this extract (presumably a join/announce message -- confirm).
// NOTE(review): struct hostData contains a pointer (next), which has no meaning on
// another host if this struct is sent over the network as-is.
93 unsigned char msgType;
94 unsigned int unused:12;
95 struct hostData newHost;
98 //TODO: leave message, rebuild message...
// File-level DHT state.
// Count of known hosts. NOTE(review): no visible line assigns it -- confirm where it is maintained.
100 unsigned int numHosts;
// Head of the linked list of host records (linked through hostData.next).
101 struct hostData *hostList;
// Record describing the local host; allocated in dhtInit().
102 struct hostData *myHostData;
// Number of entries in blockOwner; also the modulus used by hash().
103 unsigned int numBlocks;
// Array of numBlocks pointers; entry hash(key) is the host that owns that key.
104 struct hostData **blockOwner;
// Forward declarations of the helpers defined later in this file.
// Returns the IPv4 address of DEFAULT_INTERFACE, converted with ntohl().
107 unsigned int getMyIpAddr();
109 //returns number of bytes received in resBuffer, or -1 if an error occurred
// dest_ip is passed through htonl() and dest_port through htons() before use;
// timeout is the poll() timeout in milliseconds.
110 int sendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries);
// NOTE(review): no definition of sendNoWait is visible in this extract.
111 int sendNoWait(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen);
// Returns the ipAddr of the host recorded in blockOwner[hash(key)].
112 unsigned int getKeyOwner(unsigned int key);
// Maps a key to a block index: x % numBlocks.
113 unsigned int hash(unsigned int x);
// Sets up the local node: allocates the local host record, makes it the head of
// hostList, gives it ownership of every block in blockOwner, and starts the listener
// thread (dhtListen).
// maxKeyCapacity: stored in the local host record.
// NOTE(review): in the visible lines neither malloc() result nor the return value of
// pthread_create() is checked.
115 void dhtInit(unsigned int maxKeyCapacity)
119 myHostData = malloc(sizeof(struct hostData));
120 myHostData->ipAddr = getMyIpAddr();
// Store the caller-supplied capacity. A bare `myHostData->maxKeyCapacity;` is an
// expression statement with no effect and leaves the field uninitialized.
121 myHostData->maxKeyCapacity = maxKeyCapacity;
122 myHostData->next = NULL;
125 //announce presence (udp), get data structures from leader (leader initiates tcp transfer)
126 //if no response, I am the first
127 hostList = myHostData;
128 numBlocks = INIT_NUM_BLOCKS;
// blockOwner holds pointers to host records, so each element is pointer-sized.
129 blockOwner = malloc(numBlocks * sizeof(*blockOwner));
130 for (i = 0; i < numBlocks; i++)
132 blockOwner[i] = myHostData;
135 //otherwise, scan array and choose blocks to take over
136 //get data from hosts that own those blocks (tcp), fill hash table
137 //notify (the leader or everybody?) of ownership changes
// Start the thread that serves insert/remove/search requests from other hosts.
140 pthread_t threadListen;
141 pthread_create(&threadListen, NULL, dhtListen, NULL);
// Asks the host that owns `key` (per getKeyOwner) to store the key/value pair.
// Sends an INSERT_COMMAND to LISTEN_PORT and waits for the reply through
// sendWaitForResponse(), using TIMEOUT_MS and MAX_RETRIES.
// The reply is accepted only if it has exactly sizeof(struct insertRes) bytes, its
// msgType is INSERT_RESPONSE and its status is INSERT_OK.
// NOTE(review): the lines that copy key/val into the message and the success return
// are not visible in this extract; presumably 0 is returned on INSERT_OK -- confirm.
// Any other outcome reaches the final `return -1`.
151 int dhtInsert(unsigned int key, unsigned int val)
153 unsigned int dest_ip = getKeyOwner(key);
154 unsigned short dest_port = LISTEN_PORT;
155 struct insertCmd myMessage;
156 struct insertRes response;
159 myMessage.msgType = INSERT_COMMAND;
163 bytesReceived = sendWaitForResponse(dest_ip, dest_port, (void *)&myMessage, sizeof(struct insertCmd), (void *)&response, sizeof(struct insertRes), TIMEOUT_MS, MAX_RETRIES);
// A short, oversized or failed (-1) receive does not match the expected size and falls through.
164 if (bytesReceived == sizeof(struct insertRes))
166 if (response.msgType == INSERT_RESPONSE)
168 if (response.status == INSERT_OK)
170 // if (response.status == NOT_KEY_OWNER)
173 //TODO: find owner and try again, request rebuild if necessary
174 return -1; //this function should be robust enough to always return 0
// Asks the host that owns `key` (per getKeyOwner) to remove it.
// Sends a REMOVE_COMMAND to LISTEN_PORT and waits for the reply through
// sendWaitForResponse(), using TIMEOUT_MS and MAX_RETRIES.
// The reply is accepted only if it has exactly sizeof(struct removeRes) bytes, its
// msgType is REMOVE_RESPONSE and its status is REMOVE_OK.
// NOTE(review): the line that copies key into the message and the success return are
// not visible in this extract; presumably 0 is returned on REMOVE_OK -- confirm.
// Any other outcome reaches the final `return -1`.
177 int dhtRemove(unsigned int key)
179 unsigned int dest_ip = getKeyOwner(key);
180 unsigned short dest_port = LISTEN_PORT;
181 struct removeCmd myMessage;
182 struct removeRes response;
185 myMessage.msgType = REMOVE_COMMAND;
188 bytesReceived = sendWaitForResponse(dest_ip, dest_port, (void *)&myMessage, sizeof(struct removeCmd), (void *)&response, sizeof(struct removeRes), TIMEOUT_MS, MAX_RETRIES);
// A short, oversized or failed (-1) receive does not match the expected size and falls through.
189 if (bytesReceived == sizeof(struct removeRes))
191 if (response.msgType == REMOVE_RESPONSE)
// NOTE(review): this only succeeds if the listener answers removes with REMOVE_OK;
// check the status codes set by the remove handler in the listener.
193 if (response.status == REMOVE_OK)
195 // if (response.status == NOT_KEY_OWNER)
198 //TODO: find owner and try again, request rebuild if necessary
199 return -1; //this function should be robust enough to always return 0
// Asks the host that owns `key` (per getKeyOwner) for the value stored under it.
// Sends a SEARCH_COMMAND to LISTEN_PORT and waits for the reply through
// sendWaitForResponse(), using TIMEOUT_MS and MAX_RETRIES.
// The reply must have exactly sizeof(struct searchRes) bytes and msgType
// SEARCH_RESPONSE; its status is then compared with KEY_FOUND and KEY_NOT_FOUND.
// val: output parameter. NOTE(review): the lines that store the result through val and
// the returns for the two statuses are not visible; per the trailing comment the
// intended results are 0 and 1 (presumably found / not found) -- confirm.
// Any other outcome reaches the final `return -1`.
202 int dhtSearch(unsigned int key, unsigned int *val)
204 unsigned int dest_ip = getKeyOwner(key);
205 unsigned short dest_port = LISTEN_PORT;
206 struct searchCmd myMessage;
207 struct searchRes response;
210 myMessage.msgType = SEARCH_COMMAND;
213 bytesReceived = sendWaitForResponse(dest_ip, dest_port, (void *)&myMessage, sizeof(struct searchCmd), (void *)&response, sizeof(struct searchRes), TIMEOUT_MS, MAX_RETRIES);
// A short, oversized or failed (-1) receive does not match the expected size and falls through.
214 if (bytesReceived == sizeof(struct searchRes))
216 if (response.msgType == SEARCH_RESPONSE)
218 if (response.status == KEY_FOUND)
223 if (response.status == KEY_NOT_FOUND)
227 // if (response.status == NOT_KEY_OWNER)
230 //TODO: find owner and try again, request rebuild if necessary
231 return -1; //this function should be robust enough to always return 0 or 1
// Body of the listener started by dhtInit() via pthread_create(..., dhtListen, ...).
// The function signature, the receive loop construct, the dispatch on message type and
// the error-path statements are not among the visible lines.
// What the visible lines do: create a local hash table, bind a UDP socket to
// INADDR_ANY:LISTEN_PORT, receive datagrams with recvfrom(), and for insert / remove /
// search requests validate the datagram size, check that this host owns the key
// (getKeyOwner(key) == myHostData->ipAddr), perform the table operation, and send a
// reply of the matching *Res type back to the sender with sendto().
237 struct sockaddr_in my_addr;
238 struct sockaddr_in client_addr;
240 socklen_t socklen = sizeof(struct sockaddr_in);
241 char buffer[BUFFER_SIZE];
242 ssize_t bytesReceived;
243 struct insertCmd *insertCmdPtr;
244 struct removeCmd *removeCmdPtr;
245 struct searchCmd *searchCmdPtr;
246 struct insertRes *insertResPtr;
247 struct removeRes *removeResPtr;
248 struct searchRes *searchResPtr;
249 char replyBuffer[BUFFER_SIZE];
// Table holding the keys this host owns; it is local to the listener.
252 chashtable_t *myHashTable = chashCreate(HASH_SIZE, LOADFACTOR);
254 if ((sock = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
260 bzero(&my_addr, socklen);
261 my_addr.sin_family=AF_INET;
262 my_addr.sin_addr.s_addr=INADDR_ANY;
263 my_addr.sin_port=htons(LISTEN_PORT);
265 if (bind(sock, (struct sockaddr *)&my_addr, socklen) == -1)
270 // printf("listening...\n");
// recvfrom() also overwrites socklen with the size of the sender's address; that value
// is reused as the address length in the sendto() calls below.
273 if ((bytesReceived = recvfrom(sock, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&client_addr, &socklen)) == -1)
275 perror("recvfrom()");
278 if (bytesReceived == 0)
280 printf("recvfrom() returned 0\n");
283 gettimeofday(&now, NULL);
284 // printf("message received:%ds,%dus\n", now.tv_sec, now.tv_usec);
286 // printf("Received %d bytes from %x:%d\n", bytesReceived, client_addr.sin_addr.s_addr, client_addr.sin_port);
// --- insert request ---
290 if (bytesReceived != sizeof(struct insertCmd))
292 printf("error: incorrect message size\n");
295 insertCmdPtr = (struct insertCmd *)buffer;
296 // printf("Insert: key=%d, val=%d\n", insertCmdPtr->key, insertCmdPtr->val);
297 insertResPtr = (struct insertRes *)replyBuffer;
298 insertResPtr->msgType = INSERT_RESPONSE;
299 if (getKeyOwner(insertCmdPtr->key) == myHostData->ipAddr)
301 //note: casting val to void * in order to conform to API
302 if(chashInsert(myHashTable, insertCmdPtr->key, (void *)insertCmdPtr->val) == 0)
303 insertResPtr->status = INSERT_OK;
305 insertResPtr->status = INSERT_ERROR;
309 insertResPtr->status = NOT_KEY_OWNER;
311 sendto(sock, (void *)insertResPtr, sizeof(struct insertRes), 0, (struct sockaddr *)&client_addr, socklen);
// --- remove request ---
314 if (bytesReceived != sizeof(struct removeCmd))
316 printf("error: incorrect message size\n");
319 removeCmdPtr = (struct removeCmd *)buffer;
320 // printf("Remove: key=%d\n", removeCmdPtr->key);
321 removeResPtr = (struct removeRes *)replyBuffer;
322 removeResPtr->msgType = REMOVE_RESPONSE;
323 if (getKeyOwner(removeCmdPtr->key) == myHostData->ipAddr)
325 //remove the key from the local table
// Reply with the remove-specific status codes: dhtRemove() tests the reply for
// REMOVE_OK, so answering with the insert codes makes a successful remove look failed.
326 if(chashRemove(myHashTable, removeCmdPtr->key) == 0)
327 removeResPtr->status = REMOVE_OK;
329 removeResPtr->status = REMOVE_ERROR;
333 removeResPtr->status = NOT_KEY_OWNER;
335 sendto(sock, (void *)removeResPtr, sizeof(struct removeRes), 0, (struct sockaddr *)&client_addr, socklen);
// --- search request ---
338 if (bytesReceived != sizeof(struct searchCmd))
340 printf("error: incorrect message size\n");
343 searchCmdPtr = (struct searchCmd *)buffer;
344 // printf("Search: key=%d\n",searchCmdPtr->key);
345 searchResPtr = (struct searchRes *)replyBuffer;
346 searchResPtr->msgType = SEARCH_RESPONSE;
347 if (getKeyOwner(searchCmdPtr->key) == myHostData->ipAddr)
349 //note: casting the stored void * back to unsigned int; a result of 0 is reported as KEY_NOT_FOUND, so a stored value of 0 cannot be told apart from a missing key
350 if((searchResPtr->val = (unsigned int)chashSearch(myHashTable, searchCmdPtr->key)) == 0)
351 searchResPtr->status = KEY_NOT_FOUND;
353 searchResPtr->status = KEY_FOUND;
357 searchResPtr->status = NOT_KEY_OWNER;
359 sendto(sock, (void *)searchResPtr, sizeof(struct searchRes), 0, (struct sockaddr *)&client_addr, socklen);
361 //just ignore anything else
363 // printf("Unknown message type\n");
// Sends `msg` (msglen bytes) as a UDP datagram to dest_ip:dest_port and waits for a reply.
// dest_ip / dest_port: destination in host byte order (converted here with htonl/htons).
// resBuffer / resBufferSize: where the reply is stored and how many bytes it can hold.
// timeout: how long poll() waits for a reply on each attempt, in milliseconds.
// numRetries: upper bound on the number of send attempts.
// Returns the number of bytes received when a datagram arrives whose source address and
// port match the destination; per the prototype comment, -1 is returned on error (those
// return statements are not among the visible lines).
// NOTE(review): no close() of the socket is visible in this extract -- confirm the
// descriptor is released on every return path.
368 int sendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries)
370 struct sockaddr_in server_addr;
371 struct sockaddr_in ack_addr;
372 socklen_t socklen = sizeof(struct sockaddr_in);
373 struct pollfd pollsock;
374 // struct timeval now;
377 ssize_t bytesReceived;
379 bzero((char *) &server_addr, sizeof(server_addr));
380 server_addr.sin_family = AF_INET;
381 server_addr.sin_port = htons(dest_port);
382 server_addr.sin_addr.s_addr = htonl(dest_ip);
384 if ((pollsock.fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
390 pollsock.events = POLLIN;
// Honour the caller's retry budget rather than the MAX_RETRIES macro; every caller in
// this file passes MAX_RETRIES, so their behaviour is unchanged.
392 for (i = 0; i < numRetries; i++)
395 // printf("trying again, count: %d\n", i+1);
396 if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr, socklen) == -1)
401 // gettimeofday(&now, NULL);
402 // printf("message sent:%ds,%dus\n", now.tv_sec, now.tv_usec);
403 retval = poll(&pollsock, 1, timeout);
406 bytesReceived = recvfrom(pollsock.fd, resBuffer, resBufferSize, 0, (struct sockaddr *)&ack_addr, &socklen);
// Only accept a datagram that comes from the host and port the request was sent to.
407 if ((ack_addr.sin_addr.s_addr == server_addr.sin_addr.s_addr)
408 && (ack_addr.sin_port == server_addr.sin_port))
411 // gettimeofday(&now, NULL);
412 // printf("received response:%ds,%dus\n", now.tv_sec, now.tv_usec);
413 return bytesReceived;
418 // gettimeofday(&now, NULL);
419 // printf("timed out, no ack:%ds,%dus\n", now.tv_sec, now.tv_usec);
// Returns the ipAddr of the host that owns `key`: the key is mapped to a block index
// with hash() and the owner is read from blockOwner.
// NOTE(review): blockOwner and numBlocks are set in dhtInit(); calling this earlier
// would dereference an unset table -- confirm callers run after initialization.
423 unsigned int getKeyOwner(unsigned int key)
425 return blockOwner[hash(key)]->ipAddr;
// Looks up the IPv4 address of DEFAULT_INTERFACE with the SIOCGIFADDR ioctl and returns
// it converted with ntohl(), i.e. in host byte order.
// NOTE(review): the error-path statements after the socket() and ioctl() checks are not
// visible in this extract, and no close() of `sock` is visible either -- confirm both.
428 unsigned int getMyIpAddr()
431 struct ifreq interfaceInfo;
// View of the request's address field as an IPv4 socket address.
432 struct sockaddr_in *myAddr = (struct sockaddr_in *)&interfaceInfo.ifr_addr;
434 memset(&interfaceInfo, 0, sizeof(struct ifreq));
// The socket is only used as a handle for the ioctl below.
436 if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
// Select the interface by name and ask for its AF_INET address.
442 strcpy(interfaceInfo.ifr_name, DEFAULT_INTERFACE);
443 myAddr->sin_family = AF_INET;
445 if(ioctl(sock, SIOCGIFADDR, &interfaceInfo) != 0)
451 return ntohl(myAddr->sin_addr.s_addr);
// Maps a key to an index into blockOwner: the remainder of x divided by numBlocks.
// NOTE(review): numBlocks is set in dhtInit(); if it were still 0 this would be a
// modulo by zero -- confirm callers run after initialization.
454 unsigned int hash(unsigned int x)
456 return x % numBlocks;