#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/socket.h>
+#include <sys/ioctl.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/time.h>
#include <sys/poll.h>
+#include <netdb.h>
+#include <net/if.h>
+#include <linux/sockios.h>
#include "dht.h"
-#include "llookup.h"
+#include "clookup.h" //this works for now, do we need anything better?
-#define BUFFER_SIZE 512
-#define PORT 2157
+#define BUFFER_SIZE 512 //maximum message size
+#define LISTEN_PORT 2157
#define TIMEOUT_MS 500
+#define MAX_RETRIES 3
+#define INIT_NUM_BLOCKS 16
+#define DEFAULT_INTERFACE "eth0"
+
+//general commands
+#define INSERT_COMMAND 1
+#define REMOVE_COMMAND 2
+#define SEARCH_COMMAND 3
+//general responses
+#define INSERT_RESPONSE 4
+#define REMOVE_RESPONSE 5
+#define SEARCH_RESPONSE 6
+
+//#define JOIN
+//#define LEAVE
+//reserved for leader
+//#define REBUILD
+
+//etc...
+
+//status codes
+#define INSERT_OK 1
+#define INSERT_ERROR 2
+#define REMOVE_OK 3
+#define REMOVE_ERROR 4
+#define KEY_FOUND 5
+#define KEY_NOT_FOUND 6
+#define NOT_KEY_OWNER 7
+
+struct hostData {
+ unsigned int ipAddr;
+ unsigned int maxKeyCapacity;
+ struct hostData *next;
+};
+
+struct insertCmd {
+ unsigned char msgType;
+ unsigned int unused:12;
+ unsigned int key;
+ unsigned int val;
+};
+
+struct removeCmd {
+ unsigned char msgType;
+ unsigned int unused:12;
+ unsigned int key;
+};
+
+struct searchCmd {
+ unsigned char msgType;
+ unsigned int unused:12;
+ unsigned int key;
+};
+
+struct insertRes {
+ unsigned char msgType;
+ unsigned int status:12;
+};
+
+struct removeRes {
+ unsigned char msgType;
+ unsigned int status:12;
+};
+
+struct searchRes {
+ unsigned char msgType;
+ unsigned int status:12;
+ unsigned int val;
+};
+
+/*struct joinMsg {
+ unsigned char msgType;
+ unsigned int unused:12;
+ struct hostData newHost;
+};*/
+
+//TODO: leave message, rebuild message...
unsigned int numHosts;
struct hostData *hostList;
+struct hostData *myHostData;
unsigned int numBlocks;
-struct hostData *blockOwner;
+struct hostData **blockOwner;
+
+unsigned int getMyIpAddr();
void *dhtListen();
-int sendWaitForAck(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen);
+//returns number of bytes received in resBuffer, or -1 if an error occurred
+int sendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries);
+int sendNoWait(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen);
+unsigned int getKeyOwner(unsigned int key);
+unsigned int hash(unsigned int x);
-void dhtInit()
+void dhtInit(unsigned int maxKeyCapacity)
{
-
- //start server (udp)
- pthread_t threadListen;
- pthread_create(&threadListen, NULL, dhtListen, NULL);
+ int i;
+
+ myHostData = malloc(sizeof(struct hostData));
+ myHostData->ipAddr = getMyIpAddr();
+ myHostData->maxKeyCapacity;
+ myHostData->next = NULL;
+
//announce presence (udp), get data structures from leader (leader initiates tcp transfer)
//if no response, I am the first
+ hostList = myHostData;
+ numBlocks = INIT_NUM_BLOCKS;
+ blockOwner = malloc(numBlocks * sizeof(struct hostData));
+ for (i = 0; i < numBlocks; i++)
+ {
+ blockOwner[i] = myHostData;
+ }
+
//otherwise, scan array and choose blocks to take over
//get data from hosts that own those blocks (tcp), fill hash table
//notify (the leader or everybody?) of ownership changes
+
+ //start server (udp)
+ pthread_t threadListen;
+ pthread_create(&threadListen, NULL, dhtListen, NULL);
+
return;
}
int dhtInsert(unsigned int key, unsigned int val)
{
- unsigned int dest_ip = 0x7F000001;
- unsigned short dest_port = PORT;
- struct dhtInsertMsg myMessage;
- myMessage.msgType = DHT_INSERT;
+ unsigned int dest_ip = getKeyOwner(key);
+ unsigned short dest_port = LISTEN_PORT;
+ struct insertCmd myMessage;
+ struct insertRes response;
+ int bytesReceived;
+
+ myMessage.msgType = INSERT_COMMAND;
myMessage.key = key;
myMessage.val = val;
- return sendWaitForAck(dest_ip, dest_port, (void *)&myMessage, sizeof(struct dhtInsertMsg));
+
+ bytesReceived = sendWaitForResponse(dest_ip, dest_port, (void *)&myMessage, sizeof(struct insertCmd), (void *)&response, sizeof(struct insertRes), TIMEOUT_MS, MAX_RETRIES);
+ if (bytesReceived == sizeof(struct insertRes))
+ {
+ if (response.msgType == INSERT_RESPONSE)
+ {
+ if (response.status == INSERT_OK)
+ return 0;
+// if (response.status == NOT_KEY_OWNER)
+ }
+ }
+//TODO: find owner and try again, request rebuild if necessary
+ return -1; //this function should be robust enough to always return 0
}
int dhtRemove(unsigned int key)
{
- unsigned int dest_ip = 0x7F000001;
- unsigned short dest_port = PORT;
- struct dhtRemoveMsg myMessage;
- myMessage.msgType = DHT_REMOVE;
+ unsigned int dest_ip = getKeyOwner(key);
+ unsigned short dest_port = LISTEN_PORT;
+ struct removeCmd myMessage;
+ struct removeRes response;
+ int bytesReceived;
+
+ myMessage.msgType = REMOVE_COMMAND;
myMessage.key = key;
- return sendWaitForAck(dest_ip, dest_port, (void *)&myMessage, sizeof(struct dhtRemoveMsg));
+
+ bytesReceived = sendWaitForResponse(dest_ip, dest_port, (void *)&myMessage, sizeof(struct removeCmd), (void *)&response, sizeof(struct removeRes), TIMEOUT_MS, MAX_RETRIES);
+ if (bytesReceived == sizeof(struct removeRes))
+ {
+ if (response.msgType == REMOVE_RESPONSE)
+ {
+ if (response.status == REMOVE_OK)
+ return 0;
+// if (response.status == NOT_KEY_OWNER)
+ }
+ }
+//TODO: find owner and try again, request rebuild if necessary
+ return -1; //this function should be robust enough to always return 0
}
-int dhtSearch(unsigned int key)
+int dhtSearch(unsigned int key, unsigned int *val)
{
- unsigned int dest_ip = 0x7F000001;
- unsigned short dest_port = PORT;
- struct dhtSearchMsg myMessage;
- myMessage.msgType = DHT_SEARCH;
+ unsigned int dest_ip = getKeyOwner(key);
+ unsigned short dest_port = LISTEN_PORT;
+ struct searchCmd myMessage;
+ struct searchRes response;
+ int bytesReceived;
+
+ myMessage.msgType = SEARCH_COMMAND;
myMessage.key = key;
- //TODO:this obviously requires more than an ACK, first implement actual hash table
- return sendWaitForAck(dest_ip, dest_port, (void *)&myMessage, sizeof(struct dhtSearchMsg));
+
+ bytesReceived = sendWaitForResponse(dest_ip, dest_port, (void *)&myMessage, sizeof(struct searchCmd), (void *)&response, sizeof(struct searchRes), TIMEOUT_MS, MAX_RETRIES);
+ if (bytesReceived == sizeof(struct searchRes))
+ {
+ if (response.msgType == SEARCH_RESPONSE)
+ {
+ if (response.status == KEY_FOUND)
+ {
+ *val = response.val;
+ return 0;
+ }
+ if (response.status == KEY_NOT_FOUND)
+ {
+ return 1;
+ }
+// if (response.status == NOT_KEY_OWNER)
+ }
+ }
+//TODO: find owner and try again, request rebuild if necessary
+ return -1; //this function should be robust enough to always return 0 or 1
}
//helper functions
socklen_t socklen = sizeof(struct sockaddr_in);
char buffer[BUFFER_SIZE];
ssize_t bytesReceived;
+ struct insertCmd *insertCmdPtr;
+ struct removeCmd *removeCmdPtr;
+ struct searchCmd *searchCmdPtr;
+ struct insertRes *insertResPtr;
+ struct removeRes *removeResPtr;
+ struct searchRes *searchResPtr;
+ char replyBuffer[BUFFER_SIZE];
struct timeval now;
+ chashtable_t *myHashTable = chashCreate(HASH_SIZE, LOADFACTOR);
+
if ((sock = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
{
perror("socket()");
bzero(&my_addr, socklen);
my_addr.sin_family=AF_INET;
my_addr.sin_addr.s_addr=INADDR_ANY;
- my_addr.sin_port=htons(PORT);
+ my_addr.sin_port=htons(LISTEN_PORT);
if (bind(sock, (struct sockaddr *)&my_addr, socklen) == -1)
{
perror("bind()");
exit(1);
}
- printf("listening...\n");
+// printf("listening...\n");
while(1)
{
if ((bytesReceived = recvfrom(sock, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&client_addr, &socklen)) == -1)
{
- printf("recvfrom() returned -1\n");
+ perror("recvfrom()");
break;
}
if (bytesReceived == 0)
break;
}
gettimeofday(&now, NULL);
- printf("message received:%ds,%dus\n", now.tv_sec, now.tv_usec);
+// printf("message received:%ds,%dus\n", now.tv_sec, now.tv_usec);
- printf("Received %d bytes from %x:%d\n", bytesReceived, client_addr.sin_addr.s_addr, client_addr.sin_port);
+// printf("Received %d bytes from %x:%d\n", bytesReceived, client_addr.sin_addr.s_addr, client_addr.sin_port);
switch (buffer[0])
{
- case DHT_INSERT:
- if (bytesReceived != sizeof(struct dhtInsertMsg))
+ case INSERT_COMMAND:
+ if (bytesReceived != sizeof(struct insertCmd))
{
printf("error: incorrect message size\n");
break;
}
- printf("Insert: key=%d, val=%d\n",((struct dhtInsertMsg *)buffer)->key,((struct dhtInsertMsg *)buffer)->val);
- buffer[0] = DHT_ACK;
- sendto(sock, buffer, 1, 0, (struct sockaddr *)&client_addr, socklen);
+ insertCmdPtr = (struct insertCmd *)buffer;
+// printf("Insert: key=%d, val=%d\n", insertCmdPtr->key, insertCmdPtr->val);
+ insertResPtr = (struct insertRes *)replyBuffer;
+ insertResPtr->msgType = INSERT_RESPONSE;
+ if (getKeyOwner(insertCmdPtr->key) == myHostData->ipAddr)
+ {
+ //note: casting val to void * in order to conform to API
+ if(chashInsert(myHashTable, insertCmdPtr->key, (void *)insertCmdPtr->val) == 0)
+ insertResPtr->status = INSERT_OK;
+ else
+ insertResPtr->status = INSERT_ERROR;
+ }
+ else
+ {
+ insertResPtr->status = NOT_KEY_OWNER;;
+ }
+ sendto(sock, (void *)insertResPtr, sizeof(struct insertRes), 0, (struct sockaddr *)&client_addr, socklen);
break;
- case DHT_REMOVE:
- if (bytesReceived != sizeof(struct dhtRemoveMsg))
+ case REMOVE_COMMAND:
+ if (bytesReceived != sizeof(struct removeCmd))
{
printf("error: incorrect message size\n");
break;
}
- printf("Remove: key=%d\n",((struct dhtRemoveMsg *)buffer)->key);
- buffer[0] = DHT_ACK;
- sendto(sock, buffer, 1, 0, (struct sockaddr *)&client_addr, socklen);
+ removeCmdPtr = (struct removeCmd *)buffer;
+// printf("Remove: key=%d\n", removeCmdPtr->key);
+ removeResPtr = (struct removeRes *)replyBuffer;
+ removeResPtr->msgType = REMOVE_RESPONSE;
+ if (getKeyOwner(removeCmdPtr->key) == myHostData->ipAddr)
+ {
+ //note: casting val to void * in order to conform to API
+ if(chashRemove(myHashTable, removeCmdPtr->key) == 0)
+ removeResPtr->status = INSERT_OK;
+ else
+ removeResPtr->status = INSERT_ERROR;
+ }
+ else
+ {
+ removeResPtr->status = NOT_KEY_OWNER;
+ }
+ sendto(sock, (void *)removeResPtr, sizeof(struct removeRes), 0, (struct sockaddr *)&client_addr, socklen);
break;
- case DHT_SEARCH:
- if (bytesReceived != sizeof(struct dhtSearchMsg))
+ case SEARCH_COMMAND:
+ if (bytesReceived != sizeof(struct searchCmd))
{
printf("error: incorrect message size\n");
break;
}
- printf("Search: key=%d\n",((struct dhtSearchMsg *)buffer)->key);
- buffer[0] = DHT_ACK;
- sendto(sock, buffer, 1, 0, (struct sockaddr *)&client_addr, socklen);
- break;
- default:
- printf("Unknown message type\n");
+ searchCmdPtr = (struct searchCmd *)buffer;
+// printf("Search: key=%d\n",searchCmdPtr->key);
+ searchResPtr = (struct searchRes *)replyBuffer;
+ searchResPtr->msgType = SEARCH_RESPONSE;
+ if (getKeyOwner(searchCmdPtr->key) == myHostData->ipAddr)
+ {
+ //note: casting val to void * in order to conform to API
+ if((searchResPtr->val = (unsigned int)chashSearch(myHashTable, searchCmdPtr->key)) == 0)
+ searchResPtr->status = KEY_NOT_FOUND;
+ else
+ searchResPtr->status = KEY_FOUND;
+ }
+ else
+ {
+ searchResPtr->status = NOT_KEY_OWNER;
+ }
+ sendto(sock, (void *)searchResPtr, sizeof(struct searchRes), 0, (struct sockaddr *)&client_addr, socklen);
+ break;
+ //just ignore anything else
+// default:
+// printf("Unknown message type\n");
}
}
}
-//send message, wait for response, resend twice before return failure
-int sendWaitForAck(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen)
+int sendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries)
{
struct sockaddr_in server_addr;
struct sockaddr_in ack_addr;
socklen_t socklen = sizeof(struct sockaddr_in);
struct pollfd pollsock;
- struct timeval now;
+// struct timeval now;
int retval;
int i;
- char ackByte;
ssize_t bytesReceived;
bzero((char *) &server_addr, sizeof(server_addr));
if ((pollsock.fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
{
- printf("error creating socket\n");
- return 1;
+ perror("socket()");
+ return -1;
}
pollsock.events = POLLIN;
- for (i = 0; i < 3; i++)
+ for (i = 0; i < MAX_RETRIES; i++)
{
- if (i > 0)
- printf("trying again, count: %d\n", i+1);
+// if (i > 0)
+// printf("trying again, count: %d\n", i+1);
if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr, socklen) == -1)
{
- printf("error sending\n");
- return 1;
+ perror("sendto");
+ return -1;
}
- gettimeofday(&now, NULL);
- printf("message sent:%ds,%dus\n", now.tv_sec, now.tv_usec);
- retval = poll(&pollsock, 1, TIMEOUT_MS);
+// gettimeofday(&now, NULL);
+// printf("message sent:%ds,%dus\n", now.tv_sec, now.tv_usec);
+ retval = poll(&pollsock, 1, timeout);
if (retval !=0)
{
- bytesReceived = recvfrom(pollsock.fd, &ackByte, 1, 0, (struct sockaddr *)&ack_addr, &socklen);
- if ((bytesReceived == 1) && (ack_addr.sin_addr.s_addr == server_addr.sin_addr.s_addr)
- && (ack_addr.sin_port == server_addr.sin_port) && (ackByte == DHT_ACK))
+ bytesReceived = recvfrom(pollsock.fd, resBuffer, resBufferSize, 0, (struct sockaddr *)&ack_addr, &socklen);
+ if ((ack_addr.sin_addr.s_addr == server_addr.sin_addr.s_addr)
+ && (ack_addr.sin_port == server_addr.sin_port))
{
close(pollsock.fd);
- gettimeofday(&now, NULL);
- printf("received ack:%ds,%dus\n", now.tv_sec, now.tv_usec);
- return 0;
+// gettimeofday(&now, NULL);
+// printf("received response:%ds,%dus\n", now.tv_sec, now.tv_usec);
+ return bytesReceived;
}
}
}
close(pollsock.fd);
- gettimeofday(&now, NULL);
- printf("timed out, no ack:%ds,%dus\n", now.tv_sec, now.tv_usec);
- return 1;
+// gettimeofday(&now, NULL);
+// printf("timed out, no ack:%ds,%dus\n", now.tv_sec, now.tv_usec);
+ return -1;
+}
+
+unsigned int getKeyOwner(unsigned int key)
+{
+ return blockOwner[hash(key)]->ipAddr;
+}
+
+unsigned int getMyIpAddr()
+{
+ int sock;
+ struct ifreq interfaceInfo;
+ struct sockaddr_in *myAddr = (struct sockaddr_in *)&interfaceInfo.ifr_addr;
+
+ memset(&interfaceInfo, 0, sizeof(struct ifreq));
+
+ if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
+ {
+ perror("socket");
+ return 1;
+ }
+
+ strcpy(interfaceInfo.ifr_name, DEFAULT_INTERFACE);
+ myAddr->sin_family = AF_INET;
+
+ if(ioctl(sock, SIOCGIFADDR, &interfaceInfo) != 0)
+ {
+ perror("ioctl");
+ return 1;
+ }
+
+ return ntohl(myAddr->sin_addr.s_addr);
+}
+
+unsigned int hash(unsigned int x)
+{
+ return x % numBlocks;
}
#include <stdio.h>
#include "dht.h"
+#include "clookup.h"
+
+#define NUM_ITEMS 1000
int main()
{
- int i;
+ unsigned int key;
+ unsigned int val;
+ unsigned int vals[NUM_ITEMS];
+ int retval;
+ int error;
+ chashtable_t *localHash;
+
+ dhtInit(DHT_NO_KEY_LIMIT);
+ localHash = chashCreate(HASH_SIZE, LOADFACTOR);
+ srandom(time(0));
+
+ for (key = 0; key < NUM_ITEMS; key++)
+ {
+ vals[key] = random();
+ }
+
+ vals[NUM_ITEMS / 2] = 0;
+ vals[NUM_ITEMS / 3] = 1;
+ vals[NUM_ITEMS / 4] = 2;
+ vals[NUM_ITEMS / 5] = 0xFFFFFFFF;
+
+ printf("testing dhtInsert() and dhtSearch()\n");
+
+ for (key = 0; key < NUM_ITEMS; key++)
+ {
+ dhtInsert(key, vals[key]);
+ }
+
+ error = 0;
+ for (key = 0; key < NUM_ITEMS; key++)
+ {
+ retval = dhtSearch(key, &val);
+ if (retval == 1)
+ {
+ printf("item not found: key = %d, expected val = %d\n", key, vals[key]);
+ error = 1;
+ }
+ else if (retval == -1)
+ {
+ printf("internal error: key = %d, expected val = %d\n", key, vals[key]);
+ error = 1;
+ }
+ else if (retval == 0)
+ {
+ if (vals[key] != val)
+ {
+ printf("unexpected value: key = %d, expected val = %d, val = %d\n", key, vals[key], val);
+ error = 1;
+ }
+ }
+ }
+ if (!error)
+ printf("test completed successfully\n");
+ else
+ printf("one or more errors occurred\n");
+
+ printf("(this currently fails if key = 0 OR val = 0, due to underlying hash table)\n");
+ printf("testing underlying hash table (clookup.h)\n");
- dhtInit();
- sleep(1);
+ for (key = 0; key < NUM_ITEMS; key++)
+ {
+ chashInsert(localHash, key, (void *)vals[key]);
+ }
+
+ error = 0;
+ for (key = 0; key < NUM_ITEMS; key++)
+ {
+ val = (unsigned int)chashSearch(localHash, key);
+ if ((void *)val == NULL)
+ {
+ printf("item not found: key = %d, expected val = %d\n", key, vals[key]);
+ error = 1;
+ }
+ else
+ {
+ if (vals[key] != val)
+ {
+ printf("unexpected value: key = %d, expected val = %d, val = %d\n", key, vals[key], val);
+ error = 1;
+ }
+ }
+ for (key = NUM_ITEMS; key < NUM_ITEMS + 20; key++)
+ {
+ val = (unsigned int)chashSearch(localHash, key);
+ if ((void *)val != NULL)
+ {
+ printf("error: returned value for key that wasn't inserted: key = %d, val = %d\n", key, val);
+ error = 1;
+ }
+ }
+ }
+
+ if (!error)
+ printf("test completed successfully\n");
+ else
+ printf("one or more errors occurred\n");
+
+ printf("testing dhtRemove(), removing half of the keys, and verifying that the other half is still there\n");
- for(i = 0; i < 3; i++)
+ for (key = 0; key < NUM_ITEMS / 2; key++)
{
- dhtInsert(i, 10-i);
- sleep(1);
- dhtRemove(i);
- sleep(1);
- dhtSearch(i);
- sleep(1);
+ dhtRemove(key);
}
+ error = 0;
+ for (key = 0; key < NUM_ITEMS / 2; key++)
+ {
+ retval = dhtSearch(key, &val);
+ if (retval == 0)
+ {
+ printf("error: found removed item: key = %d, val = %d\n", key, val);
+ error = 1;
+ }
+ else if (retval == -1)
+ {
+ printf("internal error: key = %d, val = %d\n", key, val);
+ error = 1;
+ }
+ }
+ for (key = NUM_ITEMS / 2; key < NUM_ITEMS; key++)
+ {
+ retval = dhtSearch(key, &val);
+ if (retval == 1)
+ {
+ printf("item not found: key = %d, expected val = %d\n", key, vals[key]);
+ error = 1;
+ }
+ else if (retval == -1)
+ {
+ printf("internal error: key = %d, expected val = %d\n", key, vals[key]);
+ error = 1;
+ }
+ else if (retval == 0)
+ {
+ if (vals[key] != val)
+ {
+ printf("unexpected value: key = %d, expected val = %d, val = %d\n", key, vals[key], val);
+ error = 1;
+ }
+ }
+ }
+
+ if (!error)
+ printf("test completed successfully\n");
+ else
+ printf("one or more errors occurred\n");
+
return 0;
}
+