#include "clookup.h" //this works for now, do we need anything better?
#define BUFFER_SIZE 512 //maximum message size
-#define LISTEN_PORT 2157
+#define UDP_PORT 2157
+#define TCP_PORT 2157
+#define BACKLOG 10 //max pending tcp connections
#define TIMEOUT_MS 500
#define MAX_RETRIES 3
-#define INIT_NUM_BLOCKS 16
+#define INIT_HOST_ALLOC 16
+#define INIT_BLOCK_NUM 64
#define DEFAULT_INTERFACE "eth0"
-//general commands
-#define INSERT_COMMAND 1
-#define REMOVE_COMMAND 2
-#define SEARCH_COMMAND 3
-//general responses
-#define INSERT_RESPONSE 4
-#define REMOVE_RESPONSE 5
-#define SEARCH_RESPONSE 6
-
-//#define JOIN
-//#define LEAVE
-//reserved for leader
-//#define REBUILD
+enum {
+ INSERT_COMMAND,
+ REMOVE_COMMAND,
+ SEARCH_COMMAND,
+ FIND_LEADER_COMMAND,
+ INSERT_RESPONSE,
+ REMOVE_RESPONSE,
+ SEARCH_RESPONSE,
+ FIND_LEADER_RESPONSE
+};
-//etc...
//status codes
-#define INSERT_OK 1
-#define INSERT_ERROR 2
-#define REMOVE_OK 3
-#define REMOVE_ERROR 4
-#define KEY_FOUND 5
-#define KEY_NOT_FOUND 6
-#define NOT_KEY_OWNER 7
+enum {
+ INSERT_OK,
+ INSERT_ERROR,
+ REMOVE_OK,
+ REMOVE_ERROR,
+ KEY_FOUND,
+ KEY_NOT_FOUND,
+ NOT_KEY_OWNER
+};
struct hostData {
unsigned int ipAddr;
unsigned int maxKeyCapacity;
- struct hostData *next;
};
struct insertCmd {
- unsigned char msgType;
- unsigned int unused:12;
+ unsigned int msgType;
unsigned int key;
unsigned int val;
};
struct removeCmd {
- unsigned char msgType;
- unsigned int unused:12;
+ unsigned int msgType;
unsigned int key;
};
struct searchCmd {
- unsigned char msgType;
- unsigned int unused:12;
+ unsigned int msgType;
unsigned int key;
};
struct insertRes {
- unsigned char msgType;
- unsigned int status:12;
+ unsigned int msgType;
+ unsigned int status;
};
struct removeRes {
- unsigned char msgType;
- unsigned int status:12;
+ unsigned int msgType;
+ unsigned int status;
};
struct searchRes {
- unsigned char msgType;
- unsigned int status:12;
+ unsigned int msgType;
+ unsigned int status;
unsigned int val;
};
-/*struct joinMsg {
- unsigned char msgType;
- unsigned int unused:12;
- struct hostData newHost;
-};*/
//TODO: leave message, rebuild message...
+struct hostData myHostData;
unsigned int numHosts;
-struct hostData *hostList;
-struct hostData *myHostData;
+struct hostData *hostArray;
+unsigned int hostArraySize;
unsigned int numBlocks;
-struct hostData **blockOwner;
-
+unsigned int *blockOwnerArray;
+unsigned int blockOwnerArraySize;
unsigned int getMyIpAddr();
-void *dhtListen();
+void *udpListen();
+void *tcpListen();
+void *tcpAccept(void *);
//returns number of bytes received in resBuffer, or -1 if an error occurred
-int sendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries);
+int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries);
int sendNoWait(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen);
unsigned int getKeyOwner(unsigned int key);
unsigned int hash(unsigned int x);
void dhtInit(unsigned int maxKeyCapacity)
{
+ unsigned int myMessage;
+ int bytesReceived;
int i;
- myHostData = malloc(sizeof(struct hostData));
- myHostData->ipAddr = getMyIpAddr();
- myHostData->maxKeyCapacity;
- myHostData->next = NULL;
+ myHostData.ipAddr = getMyIpAddr();
+ myHostData.maxKeyCapacity = maxKeyCapacity;
+
- //announce presence (udp), get data structures from leader (leader initiates tcp transfer)
- //if no response, I am the first
- hostList = myHostData;
- numBlocks = INIT_NUM_BLOCKS;
- blockOwner = malloc(numBlocks * sizeof(struct hostData));
+ //announce presence (udp broadcast), get data structures from leader (leader initiates tcp transfer)
+
+
+//if no response, I am the first
+
+ numHosts = 1;
+ hostArray = malloc(INIT_HOST_ALLOC * sizeof(struct hostData));
+ hostArray[0] = myHostData;
+
+ numBlocks = INIT_BLOCK_NUM;
+ blockOwnerArray = malloc(numBlocks * sizeof(unsigned short));
for (i = 0; i < numBlocks; i++)
{
- blockOwner[i] = myHostData;
+ blockOwnerArray[i] = 0;
}
//otherwise, scan array and choose blocks to take over
//notify (the leader or everybody?) of ownership changes
//start server (udp)
- pthread_t threadListen;
- pthread_create(&threadListen, NULL, dhtListen, NULL);
+ pthread_t threadUdpListen, threadTcpListen;
+ pthread_create(&threadUdpListen, NULL, udpListen, NULL);
+ pthread_create(&threadTcpListen, NULL, tcpListen, NULL);
return;
}
int dhtInsert(unsigned int key, unsigned int val)
{
unsigned int dest_ip = getKeyOwner(key);
- unsigned short dest_port = LISTEN_PORT;
struct insertCmd myMessage;
struct insertRes response;
int bytesReceived;
myMessage.key = key;
myMessage.val = val;
- bytesReceived = sendWaitForResponse(dest_ip, dest_port, (void *)&myMessage, sizeof(struct insertCmd), (void *)&response, sizeof(struct insertRes), TIMEOUT_MS, MAX_RETRIES);
+ bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage, sizeof(struct insertCmd), (void *)&response, sizeof(struct insertRes), TIMEOUT_MS, MAX_RETRIES);
if (bytesReceived == sizeof(struct insertRes))
{
if (response.msgType == INSERT_RESPONSE)
int dhtRemove(unsigned int key)
{
unsigned int dest_ip = getKeyOwner(key);
- unsigned short dest_port = LISTEN_PORT;
struct removeCmd myMessage;
struct removeRes response;
int bytesReceived;
myMessage.msgType = REMOVE_COMMAND;
myMessage.key = key;
- bytesReceived = sendWaitForResponse(dest_ip, dest_port, (void *)&myMessage, sizeof(struct removeCmd), (void *)&response, sizeof(struct removeRes), TIMEOUT_MS, MAX_RETRIES);
+ bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage, sizeof(struct removeCmd), (void *)&response, sizeof(struct removeRes), TIMEOUT_MS, MAX_RETRIES);
if (bytesReceived == sizeof(struct removeRes))
{
if (response.msgType == REMOVE_RESPONSE)
int dhtSearch(unsigned int key, unsigned int *val)
{
unsigned int dest_ip = getKeyOwner(key);
- unsigned short dest_port = LISTEN_PORT;
struct searchCmd myMessage;
struct searchRes response;
int bytesReceived;
myMessage.msgType = SEARCH_COMMAND;
myMessage.key = key;
- bytesReceived = sendWaitForResponse(dest_ip, dest_port, (void *)&myMessage, sizeof(struct searchCmd), (void *)&response, sizeof(struct searchRes), TIMEOUT_MS, MAX_RETRIES);
+ bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage, sizeof(struct searchCmd), (void *)&response, sizeof(struct searchRes), TIMEOUT_MS, MAX_RETRIES);
if (bytesReceived == sizeof(struct searchRes))
{
if (response.msgType == SEARCH_RESPONSE)
return -1; //this function should be robust enough to always return 0 or 1
}
-//helper functions
-void *dhtListen()
+
+
+//use UDP for messages that are frequent and short
+void *udpListen()
{
- struct sockaddr_in my_addr;
- struct sockaddr_in client_addr;
+ struct sockaddr_in myAddr;
+ struct sockaddr_in clientAddr;
int sock;
socklen_t socklen = sizeof(struct sockaddr_in);
char buffer[BUFFER_SIZE];
chashtable_t *myHashTable = chashCreate(HASH_SIZE, LOADFACTOR);
- if ((sock = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
+ if ((sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
{
perror("socket()");
- exit(1);
+ pthread_exit(NULL);
}
- bzero(&my_addr, socklen);
- my_addr.sin_family=AF_INET;
- my_addr.sin_addr.s_addr=INADDR_ANY;
- my_addr.sin_port=htons(LISTEN_PORT);
+ bzero(&myAddr, socklen);
+ myAddr.sin_family=AF_INET;
+ myAddr.sin_addr.s_addr=INADDR_ANY;
+ myAddr.sin_port=htons(UDP_PORT);
- if (bind(sock, (struct sockaddr *)&my_addr, socklen) == -1)
+ if (bind(sock, (struct sockaddr *)&myAddr, socklen) == -1)
{
perror("bind()");
- exit(1);
+ pthread_exit(NULL);
}
// printf("listening...\n");
while(1)
{
- if ((bytesReceived = recvfrom(sock, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&client_addr, &socklen)) == -1)
+ if ((bytesReceived = recvfrom(sock, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&clientAddr, &socklen)) == -1)
{
perror("recvfrom()");
break;
gettimeofday(&now, NULL);
// printf("message received:%ds,%dus\n", now.tv_sec, now.tv_usec);
-// printf("Received %d bytes from %x:%d\n", bytesReceived, client_addr.sin_addr.s_addr, client_addr.sin_port);
+// printf("Received %d bytes from %x:%d\n", bytesReceived, clientAddr.sin_addr.s_addr, clientAddr.sin_port);
switch (buffer[0])
{
case INSERT_COMMAND:
// printf("Insert: key=%d, val=%d\n", insertCmdPtr->key, insertCmdPtr->val);
insertResPtr = (struct insertRes *)replyBuffer;
insertResPtr->msgType = INSERT_RESPONSE;
- if (getKeyOwner(insertCmdPtr->key) == myHostData->ipAddr)
+ if (getKeyOwner(insertCmdPtr->key) == myHostData.ipAddr)
{
//note: casting val to void * in order to conform to API
if(chashInsert(myHashTable, insertCmdPtr->key, (void *)insertCmdPtr->val) == 0)
{
insertResPtr->status = NOT_KEY_OWNER;;
}
- sendto(sock, (void *)insertResPtr, sizeof(struct insertRes), 0, (struct sockaddr *)&client_addr, socklen);
+ sendto(sock, (void *)insertResPtr, sizeof(struct insertRes), 0, (struct sockaddr *)&clientAddr, socklen);
break;
case REMOVE_COMMAND:
if (bytesReceived != sizeof(struct removeCmd))
// printf("Remove: key=%d\n", removeCmdPtr->key);
removeResPtr = (struct removeRes *)replyBuffer;
removeResPtr->msgType = REMOVE_RESPONSE;
- if (getKeyOwner(removeCmdPtr->key) == myHostData->ipAddr)
+ if (getKeyOwner(removeCmdPtr->key) == myHostData.ipAddr)
{
//note: casting val to void * in order to conform to API
if(chashRemove(myHashTable, removeCmdPtr->key) == 0)
{
removeResPtr->status = NOT_KEY_OWNER;
}
- sendto(sock, (void *)removeResPtr, sizeof(struct removeRes), 0, (struct sockaddr *)&client_addr, socklen);
+ sendto(sock, (void *)removeResPtr, sizeof(struct removeRes), 0, (struct sockaddr *)&clientAddr, socklen);
break;
case SEARCH_COMMAND:
if (bytesReceived != sizeof(struct searchCmd))
// printf("Search: key=%d\n",searchCmdPtr->key);
searchResPtr = (struct searchRes *)replyBuffer;
searchResPtr->msgType = SEARCH_RESPONSE;
- if (getKeyOwner(searchCmdPtr->key) == myHostData->ipAddr)
+ if (getKeyOwner(searchCmdPtr->key) == myHostData.ipAddr)
{
//note: casting val to void * in order to conform to API
if((searchResPtr->val = (unsigned int)chashSearch(myHashTable, searchCmdPtr->key)) == 0)
{
searchResPtr->status = NOT_KEY_OWNER;
}
- sendto(sock, (void *)searchResPtr, sizeof(struct searchRes), 0, (struct sockaddr *)&client_addr, socklen);
+ sendto(sock, (void *)searchResPtr, sizeof(struct searchRes), 0, (struct sockaddr *)&clientAddr, socklen);
break;
//just ignore anything else
// default:
}
}
-int sendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries)
+int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries)
{
struct sockaddr_in server_addr;
struct sockaddr_in ack_addr;
return -1;
}
+// use TCP for potentially large and/or important data transfer
+void *tcpListen()
+{
+ struct sockaddr_in myAddr;
+ struct sockaddr_in clientAddr;
+ int sockListen, sockAccept;
+ socklen_t socklen = sizeof(struct sockaddr_in);
+ pthread_t threadTcpAccept;
+
+ sockListen = socket(AF_INET, SOCK_STREAM, 0);
+ if (sockListen == -1)
+ {
+ perror("socket()");
+ pthread_exit(NULL);
+ }
+
+ myAddr.sin_family = AF_INET;
+ myAddr.sin_port = htons(TCP_PORT);
+ myAddr.sin_addr.s_addr = INADDR_ANY;
+ memset(&(myAddr.sin_zero), '\0', 8);
+
+ if (bind(sockListen, (struct sockaddr *)&myAddr, socklen) == -1)
+ {
+ perror("socket()");
+ pthread_exit(NULL);
+ }
+
+ if (listen(sockListen, BACKLOG) == -1)
+ {
+ perror("listen()");
+ pthread_exit(NULL);
+ }
+
+ while(1)
+ {
+ sockAccept = accept(sockListen, (struct sockaddr *)&clientAddr, &socklen);
+ pthread_create(&threadTcpAccept, NULL, tcpAccept, (void *)sockAccept);
+ }
+}
+
+void *tcpAccept(void *arg)
+{
+ int sockAccept = (int)arg;
+
+ printf("accepted tcp connection, file descriptor: %d\n", sockAccept);
+
+ sleep(30);
+
+ if (close(sockAccept) == -1)
+ {
+ perror("close()");
+ }
+
+ printf("closed tcp connection, file descriptor: %d\n", sockAccept);
+
+ pthread_exit(NULL);
+}
+
unsigned int getKeyOwner(unsigned int key)
{
- return blockOwner[hash(key)]->ipAddr;
+ return hostArray[blockOwnerArray[hash(key)]].ipAddr;
}
unsigned int getMyIpAddr()
if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
- perror("socket");
+ perror("socket()");
return 1;
}
if(ioctl(sock, SIOCGIFADDR, &interfaceInfo) != 0)
{
- perror("ioctl");
+ perror("ioctl()");
return 1;
}