+#include "dht.h"
+
+#ifdef SIMPLE_DHT
+
+#include <arpa/inet.h>
+
+#define NUM_HOSTS 4
+#define OIDS_PER_HOST 0x40000000
+
+//set these to your IP addresses
+unsigned int hosts[NUM_HOSTS] = {
+ 0xc0a802c8,
+ 0xc0a802c9,
+ 0xc0a802ca,
+ 0xc0a802cb,
+};
+
+//does nothing
+void dhtInit(unsigned int maxKeyCapaciy)
+{ return;}
+
+//does nothing
+void dhtExit()
+{ return;}
+
+//does nothing, returns 0
+int dhtInsert(unsigned int key, unsigned int val)
+{ return 0;}
+
+//does nothing, returns 0
+int dhtRemove(unsigned int key)
+{ return 0;}
+
+//returns 0 if successful and copies val into *val,
+// 1 if key not found, -1 if an error occurred
+int dhtSearch(unsigned int key, unsigned int *val)
+{
+ *val = hosts[key / OIDS_PER_HOST];
+ return 0;
+}
+
+#else
+
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <netdb.h>
#include <net/if.h>
#include <linux/sockios.h>
-#include "dht.h"
#include "clookup.h" //this works for now, do we need anything better?
#define BUFFER_SIZE 512 //maximum message size
#define INIT_HOST_ALLOC 16
#define INIT_BLOCK_NUM 64
#define DEFAULT_INTERFACE "eth0"
+#define DHT_LOG "dht.log"
enum {
- INSERT_COMMAND,
- REMOVE_COMMAND,
- SEARCH_COMMAND,
- FIND_LEADER_COMMAND,
- INSERT_RESPONSE,
- REMOVE_RESPONSE,
- SEARCH_RESPONSE,
- FIND_LEADER_RESPONSE
+ INSERT_CMD,
+ INSERT_RES,
+ REMOVE_CMD,
+ REMOVE_RES,
+ SEARCH_CMD,
+ SEARCH_RES,
+ FIND_LEADER_CMD,
+ FIND_LEADER_RES,
+
+ REBUILD_REQ,
+ REBUILD_CMD,
+ JOIN_REQ,
+ JOIN_RES,
+ DHT_INFO_CMD,
+ DHT_INFO_RES,
+ FILL_DHT_CMD,
+ FILL_DHT_RES,
+ REBUILD_DONE_INFO
};
};
struct insertCmd {
- unsigned int msgType;
+ unsigned int msgType:8;
+ unsigned int unused:24;
unsigned int key;
unsigned int val;
};
struct removeCmd {
- unsigned int msgType;
+ unsigned int msgType:8;
+ unsigned int unused:24;
unsigned int key;
};
struct searchCmd {
- unsigned int msgType;
+ unsigned int msgType:8;
+ unsigned int unused:24;
unsigned int key;
};
struct insertRes {
- unsigned int msgType;
+ unsigned int msgType:8;
+ unsigned int unused:24;
unsigned int status;
};
struct removeRes {
- unsigned int msgType;
+ unsigned int msgType:8;
+ unsigned int unused:24;
unsigned int status;
};
struct searchRes {
- unsigned int msgType;
+ unsigned int msgType:8;
+ unsigned int unused:24;
unsigned int status;
unsigned int val;
};
//TODO: leave message, rebuild message...
+FILE *logfile;
+unsigned int leader; //ip address of leader
struct hostData myHostData;
+/*----DHT data----*/
unsigned int numHosts;
struct hostData *hostArray;
-unsigned int hostArraySize;
unsigned int numBlocks;
unsigned int *blockOwnerArray;
-unsigned int blockOwnerArraySize;
+/*----end DHT data----*/
+//return my IP address
unsigned int getMyIpAddr();
+//sends broadcast to discover leader
+unsigned int getLeadersIpAddr();
+//UDP server
void *udpListen();
+//TCP server
void *tcpListen();
+//TCP connection handler
void *tcpAccept(void *);
//returns number of bytes received in resBuffer, or -1 if an error occurred
-int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries);
-int sendNoWait(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen);
+int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port,
+ void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize,
+ unsigned int timeout, unsigned int numRetries);
+//returns number of bytes received in resBuffer, or -1 if an error occurred
+int udpBroadcastWaitForResponse(unsigned int *reply_ip,
+ unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer,
+ unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries);
+//just UDP it
+int sendNoWait(unsigned int dest_ip, unsigned short dest_port, void *msg,
+ unsigned int msglen);
+//right now this hashes the key into a block and returns the block owner
unsigned int getKeyOwner(unsigned int key);
+//simple hash
unsigned int hash(unsigned int x);
+//initiates TCP connection with leader, gets DHT data
+int getDHTdata();
+//outputs readable DHT data to outfile
+void writeDHTdata(FILE *outfile);
void dhtInit(unsigned int maxKeyCapacity)
{
unsigned int myMessage;
int bytesReceived;
int i;
+ int ret;
+
+#ifdef DHT_LOG
+ logfile = fopen(DHT_LOG, "w");
+#endif
myHostData.ipAddr = getMyIpAddr();
myHostData.maxKeyCapacity = maxKeyCapacity;
-
-
- //announce presence (udp broadcast), get data structures from leader (leader initiates tcp transfer)
-
+ numHosts = numBlocks = 0;
+ hostArray = NULL;
+ blockOwnerArray = NULL;
-//if no response, I am the first
+ leader = getLeadersIpAddr();
- numHosts = 1;
- hostArray = malloc(INIT_HOST_ALLOC * sizeof(struct hostData));
- hostArray[0] = myHostData;
+ if (leader == 0)
+ { //no response: I am the first
+ leader = getMyIpAddr();
- numBlocks = INIT_BLOCK_NUM;
- blockOwnerArray = malloc(numBlocks * sizeof(unsigned short));
- for (i = 0; i < numBlocks; i++)
+ numHosts = 1;
+ hostArray = calloc(numHosts, sizeof(struct hostData));
+ hostArray[0] = myHostData;
+ numBlocks = INIT_BLOCK_NUM;
+ blockOwnerArray = calloc(numBlocks, sizeof(unsigned int));
+ for (i = 0; i < numBlocks; i++)
+ blockOwnerArray[i] = 0;
+ }
+ else
{
- blockOwnerArray[i] = 0;
+ //get DHT data from leader
+ ret = getDHTdata();
+
+ //TODO: actually, just initiate a rebuild here instead
}
-
- //otherwise, scan array and choose blocks to take over
- //get data from hosts that own those blocks (tcp), fill hash table
- //notify (the leader or everybody?) of ownership changes
-
- //start server (udp)
+
+ //start servers
pthread_t threadUdpListen, threadTcpListen;
pthread_create(&threadUdpListen, NULL, udpListen, NULL);
pthread_create(&threadTcpListen, NULL, tcpListen, NULL);
void dhtExit()
{
-
+ fclose(logfile);
}
int dhtInsert(unsigned int key, unsigned int val)
struct insertRes response;
int bytesReceived;
- myMessage.msgType = INSERT_COMMAND;
+ myMessage.msgType = INSERT_CMD;
myMessage.key = key;
myMessage.val = val;
- bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage, sizeof(struct insertCmd), (void *)&response, sizeof(struct insertRes), TIMEOUT_MS, MAX_RETRIES);
+ bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage,
+ sizeof(struct insertCmd), (void *)&response, sizeof(struct insertRes),
+ TIMEOUT_MS, MAX_RETRIES);
if (bytesReceived == sizeof(struct insertRes))
{
- if (response.msgType == INSERT_RESPONSE)
+ if (response.msgType == INSERT_RES)
{
if (response.status == INSERT_OK)
return 0;
struct removeRes response;
int bytesReceived;
- myMessage.msgType = REMOVE_COMMAND;
+ myMessage.msgType = REMOVE_CMD;
myMessage.key = key;
- bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage, sizeof(struct removeCmd), (void *)&response, sizeof(struct removeRes), TIMEOUT_MS, MAX_RETRIES);
+ bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage,
+ sizeof(struct removeCmd), (void *)&response, sizeof(struct removeRes),
+ TIMEOUT_MS, MAX_RETRIES);
if (bytesReceived == sizeof(struct removeRes))
{
- if (response.msgType == REMOVE_RESPONSE)
+ if (response.msgType == REMOVE_RES)
{
if (response.status == REMOVE_OK)
return 0;
struct searchRes response;
int bytesReceived;
- myMessage.msgType = SEARCH_COMMAND;
+ myMessage.msgType = SEARCH_CMD;
myMessage.key = key;
- bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage, sizeof(struct searchCmd), (void *)&response, sizeof(struct searchRes), TIMEOUT_MS, MAX_RETRIES);
+ bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage,
+ sizeof(struct searchCmd), (void *)&response, sizeof(struct searchRes),
+ TIMEOUT_MS, MAX_RETRIES);
if (bytesReceived == sizeof(struct searchRes))
{
- if (response.msgType == SEARCH_RESPONSE)
+ if (response.msgType == SEARCH_RES)
{
if (response.status == KEY_FOUND)
{
if ((sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
{
- perror("socket()");
+ perror("udpListen():socket()");
pthread_exit(NULL);
}
if (bind(sock, (struct sockaddr *)&myAddr, socklen) == -1)
{
- perror("bind()");
+ perror("udpListen():bind()");
pthread_exit(NULL);
}
-// printf("listening...\n");
+#ifdef DHT_LOG
+ fprintf(logfile,"udpListen(): listening on port %d\n", UDP_PORT);
+ fflush(logfile);
+#endif
while(1)
{
- if ((bytesReceived = recvfrom(sock, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&clientAddr, &socklen)) == -1)
+ if ((bytesReceived = recvfrom(sock, buffer, BUFFER_SIZE, 0,
+ (struct sockaddr *)&clientAddr, &socklen)) == -1)
{
- perror("recvfrom()");
+ perror("udpListen():recvfrom()");
break;
}
if (bytesReceived == 0)
{
- printf("recvfrom() returned 0\n");
+#ifdef DHT_LOG
+ fprintf(logfile,"udpListen(): recvfrom() returned 0\n");
+ fflush(logfile);
+#endif
break;
}
gettimeofday(&now, NULL);
-// printf("message received:%ds,%dus\n", now.tv_sec, now.tv_usec);
+#ifdef DHT_LOG
+ fprintf(logfile,"udpListen(): message received:%ds,%dus\n", now.tv_sec,
+ now.tv_usec);
+ fprintf(logfile,"udpListen(): received %d bytes from %s:%d\n",
+ bytesReceived,inet_ntoa(clientAddr.sin_addr),htons(clientAddr.sin_port));
+ fflush(logfile);
+#endif
-// printf("Received %d bytes from %x:%d\n", bytesReceived, clientAddr.sin_addr.s_addr, clientAddr.sin_port);
switch (buffer[0])
{
- case INSERT_COMMAND:
+ case INSERT_CMD:
if (bytesReceived != sizeof(struct insertCmd))
{
- printf("error: incorrect message size\n");
+#ifdef DHT_LOG
+ fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
+ fflush(logfile);
+#endif
break;
}
insertCmdPtr = (struct insertCmd *)buffer;
-// printf("Insert: key=%d, val=%d\n", insertCmdPtr->key, insertCmdPtr->val);
+#ifdef DHT_LOG
+ fprintf(logfile, "udpListen(): Insert: key=%d, val=%d\n",
+ insertCmdPtr->key, insertCmdPtr->val);
+ fflush(logfile);
+#endif
insertResPtr = (struct insertRes *)replyBuffer;
- insertResPtr->msgType = INSERT_RESPONSE;
+ insertResPtr->msgType = INSERT_RES;
if (getKeyOwner(insertCmdPtr->key) == myHostData.ipAddr)
{
//note: casting val to void * in order to conform to API
- if(chashInsert(myHashTable, insertCmdPtr->key, (void *)insertCmdPtr->val) == 0)
+ if(chashInsert(myHashTable, insertCmdPtr->key,
+ (void *)insertCmdPtr->val) == 0)
insertResPtr->status = INSERT_OK;
else
insertResPtr->status = INSERT_ERROR;
{
insertResPtr->status = NOT_KEY_OWNER;;
}
- sendto(sock, (void *)insertResPtr, sizeof(struct insertRes), 0, (struct sockaddr *)&clientAddr, socklen);
+ if (sendto(sock, (void *)insertResPtr, sizeof(struct insertRes), 0,
+ (struct sockaddr *)&clientAddr, socklen) == -1)
+ {
+ perror("udpListen():sendto()");
+ }
break;
- case REMOVE_COMMAND:
+ case REMOVE_CMD:
if (bytesReceived != sizeof(struct removeCmd))
{
- printf("error: incorrect message size\n");
+#ifdef DHT_LOG
+ fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
+ fflush(logfile);
+#endif
break;
}
removeCmdPtr = (struct removeCmd *)buffer;
-// printf("Remove: key=%d\n", removeCmdPtr->key);
+#ifdef DHT_LOG
+ fprintf(logfile,"udpListen(): Remove: key=%d\n", removeCmdPtr->key);
+ fflush(logfile);
+#endif
removeResPtr = (struct removeRes *)replyBuffer;
- removeResPtr->msgType = REMOVE_RESPONSE;
+ removeResPtr->msgType = REMOVE_RES;
if (getKeyOwner(removeCmdPtr->key) == myHostData.ipAddr)
{
//note: casting val to void * in order to conform to API
{
removeResPtr->status = NOT_KEY_OWNER;
}
- sendto(sock, (void *)removeResPtr, sizeof(struct removeRes), 0, (struct sockaddr *)&clientAddr, socklen);
+ if (sendto(sock, (void *)removeResPtr, sizeof(struct removeRes), 0,
+ (struct sockaddr *)&clientAddr, socklen) == -1)
+ {
+ perror("udpListen():sendto()");
+ }
break;
- case SEARCH_COMMAND:
+ case SEARCH_CMD:
if (bytesReceived != sizeof(struct searchCmd))
{
- printf("error: incorrect message size\n");
+#ifdef DHT_LOG
+ fprintf(logfile,"udpListen(): ERROR: incorrect message size\n");
+ fflush(logfile);
+#endif
break;
}
searchCmdPtr = (struct searchCmd *)buffer;
-// printf("Search: key=%d\n",searchCmdPtr->key);
+#ifdef DHT_LOG
+ fprintf(logfile,"udpListen(): Search: key=%d\n",searchCmdPtr->key);
+ fflush(logfile);
+#endif
searchResPtr = (struct searchRes *)replyBuffer;
- searchResPtr->msgType = SEARCH_RESPONSE;
+ searchResPtr->msgType = SEARCH_RES;
if (getKeyOwner(searchCmdPtr->key) == myHostData.ipAddr)
{
//note: casting val to void * in order to conform to API
- if((searchResPtr->val = (unsigned int)chashSearch(myHashTable, searchCmdPtr->key)) == 0)
+ if((searchResPtr->val = (unsigned int)chashSearch(myHashTable,
+ searchCmdPtr->key)) == 0)
searchResPtr->status = KEY_NOT_FOUND;
else
searchResPtr->status = KEY_FOUND;
{
searchResPtr->status = NOT_KEY_OWNER;
}
- sendto(sock, (void *)searchResPtr, sizeof(struct searchRes), 0, (struct sockaddr *)&clientAddr, socklen);
+ if (sendto(sock, (void *)searchResPtr, sizeof(struct searchRes), 0,
+ (struct sockaddr *)&clientAddr, socklen) == -1)
+ {
+ perror("udpListen():sendto()");
+ }
break;
- //just ignore anything else
-// default:
-// printf("Unknown message type\n");
+ case FIND_LEADER_CMD:
+ if (bytesReceived != sizeof(char))
+ {
+#ifdef DHT_LOG
+ fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
+ fflush(logfile);
+#endif
+ break;
+ }
+ if (leader == getMyIpAddr())
+ {
+ replyBuffer[0] = FIND_LEADER_RES;
+ if(sendto(sock, (void *)replyBuffer, sizeof(char), 0,
+ (struct sockaddr *)&clientAddr, socklen) == -1)
+ {
+ perror("udpListen():sendto");
+ }
+ }
+ break;
+ default:
+#ifdef DHT_LOG
+ fprintf(logfile,"udpListen(): ERROR: Unknown message type\n");
+ fflush(logfile);
+#endif
}
}
}
-int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries)
+int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port,
+ void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize,
+ unsigned int timeout, unsigned int numRetries)
{
struct sockaddr_in server_addr;
struct sockaddr_in ack_addr;
socklen_t socklen = sizeof(struct sockaddr_in);
struct pollfd pollsock;
-// struct timeval now;
+ struct timeval now;
int retval;
int i;
ssize_t bytesReceived;
if ((pollsock.fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
{
- perror("socket()");
+ perror("udpSendWaitForResponse():socket()");
return -1;
}
for (i = 0; i < MAX_RETRIES; i++)
{
-// if (i > 0)
-// printf("trying again, count: %d\n", i+1);
- if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr, socklen) == -1)
+#ifdef DHT_LOG
+ if (i > 0)
+ fprintf(logfile,"udpSendWaitForResponse(): trying again, count: %d\n",
+ i+1);
+ fflush(logfile);
+#endif
+ if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr,
+ socklen) == -1)
{
- perror("sendto");
+ perror("udpSendWaitForResponse():sendto");
return -1;
}
-// gettimeofday(&now, NULL);
-// printf("message sent:%ds,%dus\n", now.tv_sec, now.tv_usec);
+#ifdef DHT_LOG
+ gettimeofday(&now, NULL);
+ fprintf(logfile,"udpSendWaitForResponse(): message sent:%ds,%dus\n",
+ now.tv_sec, now.tv_usec);
+ fflush(logfile);
+#endif
retval = poll(&pollsock, 1, timeout);
if (retval !=0)
{
- bytesReceived = recvfrom(pollsock.fd, resBuffer, resBufferSize, 0, (struct sockaddr *)&ack_addr, &socklen);
+ bytesReceived = recvfrom(pollsock.fd, resBuffer, resBufferSize, 0,
+ (struct sockaddr *)&ack_addr, &socklen);
if ((ack_addr.sin_addr.s_addr == server_addr.sin_addr.s_addr)
&& (ack_addr.sin_port == server_addr.sin_port))
{
close(pollsock.fd);
-// gettimeofday(&now, NULL);
-// printf("received response:%ds,%dus\n", now.tv_sec, now.tv_usec);
+#ifdef DHT_LOG
+ gettimeofday(&now, NULL);
+ fprintf(logfile,"udpSendWaitForResponse(): received response:%ds,%dus\n", now.tv_sec, now.tv_usec);
+ fflush(logfile);
+#endif
return bytesReceived;
}
}
}
close(pollsock.fd);
-// gettimeofday(&now, NULL);
-// printf("timed out, no ack:%ds,%dus\n", now.tv_sec, now.tv_usec);
+#ifdef DHT_LOG
+ gettimeofday(&now, NULL);
+ printf("udpSendWaitForResponse(): timed out, no ack:%ds,%dus\n",
+ now.tv_sec, now.tv_usec);
+ fflush(logfile);
+#endif
+ return -1;
+}
+
+int udpBroadcastWaitForResponse(unsigned int *reply_ip,
+ unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer,
+ unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries)
+{
+ struct sockaddr_in server_addr;
+ struct sockaddr_in ack_addr;
+ socklen_t socklen = sizeof(struct sockaddr_in);
+ struct pollfd pollsock;
+ struct timeval now;
+ int retval;
+ int i;
+ ssize_t bytesReceived;
+ int on;
+
+ bzero((char *) &server_addr, sizeof(server_addr));
+ server_addr.sin_family = AF_INET;
+ server_addr.sin_port = htons(dest_port);
+ server_addr.sin_addr.s_addr = htonl(0xFFFFFFFF);
+
+ if ((pollsock.fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
+ {
+ perror("udpBroadcastWaitForResponse():socket()");
+ return -1;
+ }
+
+ on = 1;
+ if (setsockopt(pollsock.fd, SOL_SOCKET, SO_BROADCAST, &on, sizeof(on)) == -1)
+ {
+ perror("udpBroadcastWaitForResponse():setsockopt()");
+ return -1;
+ }
+
+ pollsock.events = POLLIN;
+
+ for (i = 0; i < MAX_RETRIES; i++)
+ {
+#ifdef DHT_LOG
+ if (i > 0)
+ fprintf(logfile,"udpBroadcastWaitForResponse(): trying again, count: %d\n", i+1);
+ fflush(logfile);
+#endif
+ if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr,
+ socklen) == -1)
+ {
+ perror("udpBroadcastWaitForResponse():sendto()");
+ return -1;
+ }
+#ifdef DHT_LOG
+ gettimeofday(&now, NULL);
+ fprintf(logfile,"udpBroadcastWaitForResponse(): message sent:%ds,%dus\n",
+ now.tv_sec, now.tv_usec);
+ fflush(logfile);
+#endif
+ retval = poll(&pollsock, 1, timeout);
+ if (retval !=0)
+ {
+ bytesReceived = recvfrom(pollsock.fd, resBuffer, resBufferSize, 0,
+ (struct sockaddr *)&ack_addr, &socklen);
+ close(pollsock.fd);
+ *reply_ip = htonl(ack_addr.sin_addr.s_addr);
+#ifdef DHT_LOG
+ gettimeofday(&now, NULL);
+ fprintf(logfile,"udpBroadcastWaitForResponse(): received response:%ds,%dus\n", now.tv_sec, now.tv_usec);
+ fflush(logfile);
+#endif
+ return bytesReceived;
+ }
+ }
+ close(pollsock.fd);
+#ifdef DHT_LOG
+ gettimeofday(&now, NULL);
+ fprintf(logfile,"udpBroadcastWaitForResponse(): timed out, no ack:%ds,%dus\n",
+ now.tv_sec, now.tv_usec);
+ fflush(logfile);
+#endif
return -1;
}
sockListen = socket(AF_INET, SOCK_STREAM, 0);
if (sockListen == -1)
{
- perror("socket()");
+ perror("tcpListen():socket()");
pthread_exit(NULL);
}
if (bind(sockListen, (struct sockaddr *)&myAddr, socklen) == -1)
{
- perror("socket()");
+ perror("tcpListen():socket()");
pthread_exit(NULL);
}
if (listen(sockListen, BACKLOG) == -1)
{
- perror("listen()");
+ perror("tcpListen():listen()");
pthread_exit(NULL);
}
+#ifdef DHT_LOG
+ fprintf(logfile,"tcpListen(): listening on port %d\n", TCP_PORT);
+ fflush(logfile);
+#endif
+
while(1)
{
sockAccept = accept(sockListen, (struct sockaddr *)&clientAddr, &socklen);
void *tcpAccept(void *arg)
{
int sockAccept = (int)arg;
-
- printf("accepted tcp connection, file descriptor: %d\n", sockAccept);
+ int bytesReceived;
+ char msgType;
- sleep(30);
+#ifdef DHT_LOG
+ fprintf(logfile, "tcpAccept(): accepted tcp connection, file descriptor: %d\n", sockAccept);
+ fflush(logfile);
+#endif
+
+ bytesReceived = recv(sockAccept, &msgType, sizeof(char), 0);
+ if (bytesReceived == -1)
+ {
+ perror("tcpAccept():recv()");
+ }
+ else if (bytesReceived == 0)
+ {
+#ifdef DHT_LOG
+ fprintf(logfile, "tcpAccept(): bytesReceived = 0\n", sockAccept);
+ fflush(logfile);
+#endif
+ }
+ else
+ {
+ switch (msgType)
+ {
+ case DHT_INFO_CMD:
+ if (send(sockAccept, &numHosts, sizeof(numHosts), 0) == -1)
+ {
+ perror("tcpAccept():send()");
+ break;
+ }
+ if (send(sockAccept, &numBlocks, sizeof(numBlocks), 0) == -1)
+ {
+ perror("tcpAccept():send()");
+ break;
+ }
+ if (send(sockAccept, hostArray, numHosts*sizeof(struct hostData),
+ 0) == -1)
+ {
+ perror("tcpAccept():send()");
+ break;
+ }
+ if (send(sockAccept, blockOwnerArray, numBlocks*sizeof(unsigned int),
+ 0) == -1)
+ {
+ perror("tcpAccept():send()");
+ break;
+ }
+ break;
+ default:
+#ifdef DHT_LOG
+ fprintf(logfile, "tcpAccept(): unrecognized msg type\n");
+ fflush(logfile);
+#endif
+ }
+ }
if (close(sockAccept) == -1)
{
- perror("close()");
+ perror("tcpAccept():close()");
}
- printf("closed tcp connection, file descriptor: %d\n", sockAccept);
+#ifdef DHT_LOG
+ fprintf(logfile, "tcpAccept(): closed tcp connection, file descriptor: %d\n",
+ sockAccept);
+ fflush(logfile);
+#endif
pthread_exit(NULL);
}
if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
- perror("socket()");
+ perror("getMyIpAddr():socket()");
return 1;
}
if(ioctl(sock, SIOCGIFADDR, &interfaceInfo) != 0)
{
- perror("ioctl()");
+ perror("getMyIpAddr():ioctl()");
return 1;
}
return ntohl(myAddr->sin_addr.s_addr);
}
+unsigned int getLeadersIpAddr()
+{
+ unsigned int reply_ip;
+ int bytesReceived;
+ char myMessage;
+ char response;
+
+#ifdef DHT_LOG
+ fprintf(logfile, "getLeadersIpAddr(): broadcasting...\n");
+ fflush(logfile);
+#endif
+
+ myMessage = FIND_LEADER_CMD;
+
+ bytesReceived = udpBroadcastWaitForResponse(&reply_ip, UDP_PORT,
+ (void *)&myMessage, sizeof(myMessage), (void *)&response,
+ sizeof(response), TIMEOUT_MS, MAX_RETRIES);
+
+ if (bytesReceived == -1)
+ {
+#ifdef DHT_LOG
+ fprintf(logfile, "getLeadersIpAddr(): no response\n");
+ fflush(logfile);
+#endif
+ return 0;
+ }
+ else if (response == FIND_LEADER_RES)
+ {
+#ifdef DHT_LOG
+ struct in_addr reply_addr;
+ reply_addr.s_addr = htonl(reply_ip);
+ fprintf(logfile, "getLeadersIpAddr(): leader found:%s\n",
+ inet_ntoa(reply_addr));
+ fflush(logfile);
+#endif
+ return reply_ip;
+ }
+ else
+ {
+#ifdef DHT_LOG
+ fprintf(logfile, "getLeadersIpAddr(): unexpected response\n");
+ fflush(logfile);
+#endif
+ return 0;
+ }
+}
+
+int getDHTdata()
+{
+ struct sockaddr_in leader_addr;
+ int sock;
+ char msg;
+ int bytesReceived;
+
+ if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1)
+ {
+ perror("getDHTdata():socket()");
+ return -1;
+ }
+
+ bzero((char *)&leader_addr, sizeof(leader_addr));
+ leader_addr.sin_family = AF_INET;
+ leader_addr.sin_port = htons(TCP_PORT);
+ leader_addr.sin_addr.s_addr = htonl(leader);
+
+ if (connect(sock, (struct sockaddr *)&leader_addr, sizeof(leader_addr)) == -1)
+ {
+ perror("getDHTdata():connect()");
+ close(sock);
+ return -1;
+ }
+ msg = DHT_INFO_CMD;
+ if (send(sock, &msg, sizeof(char), 0) == -1)
+ {
+ perror("getDHTdata():send()");
+ close(sock);
+ return -1;
+ }
+ bytesReceived = recv(sock, &numHosts, sizeof(numHosts), 0);
+ if (bytesReceived == -1)
+ {
+ perror("getDHTdata():recv()");
+ close(sock);
+ return -1;
+ }
+ if (bytesReceived != sizeof(numHosts))
+ {
+#ifdef DHT_LOG
+ fprintf(logfile,"getDHTdata(): ERROR: numHosts not completely received\n");
+ fflush(logfile);
+ close(sock);
+ return -1;
+#endif
+ }
+ bytesReceived = recv(sock, &numBlocks, sizeof(numBlocks), 0);
+ if (bytesReceived == -1)
+ {
+ perror("getDHTdata():recv()");
+ close(sock);
+ return -1;
+ }
+ if (bytesReceived != sizeof(numBlocks))
+ {
+#ifdef DHT_LOG
+ fprintf(logfile,"getDHTdata(): ERROR: numBlocks not completely received\n");
+ fflush(logfile);
+ close(sock);
+ return -1;
+#endif
+ }
+ if (hostArray != NULL)
+ free(hostArray);
+ hostArray = calloc(numHosts, sizeof(struct hostData));
+ bytesReceived = recv(sock, hostArray, numHosts*sizeof(struct hostData), 0);
+ if (bytesReceived == -1)
+ {
+ perror("getDHTdata():recv()");
+ close(sock);
+ return -1;
+ }
+ if (bytesReceived != numHosts*sizeof(struct hostData))
+ {
+#ifdef DHT_LOG
+ fprintf(logfile,"getDHTdata(): ERROR: hostArray not completely received\n");
+ fflush(logfile);
+ close(sock);
+ return -1;
+#endif
+ }
+ if (blockOwnerArray != NULL)
+ free(blockOwnerArray);
+ blockOwnerArray = calloc(numBlocks, sizeof(unsigned int));
+ bytesReceived = recv(sock, blockOwnerArray, numBlocks*sizeof(unsigned int), 0);
+ if (bytesReceived == -1)
+ {
+ perror("getDHTdata():recv()");
+ close(sock);
+ return -1;
+ }
+ if (bytesReceived != numBlocks*sizeof(unsigned int))
+ {
+#ifdef DHT_LOG
+ fprintf(logfile,"getDHTdata(): ERROR: blockOwnerArray not completely received\n");
+ fflush(logfile);
+ close(sock);
+ return -1;
+#endif
+ }
+#ifdef DHT_LOG
+ fprintf(logfile,"getDHTdata(): got data:\n");
+ writeDHTdata(logfile);
+ fflush(logfile);
+#endif
+ return 0;
+}
+
unsigned int hash(unsigned int x)
{
return x % numBlocks;
}
+void leadRebuild()
+{
+
+}
+
+void writeDHTdata(FILE *outfile)
+{
+ int i;
+ struct in_addr address;
+ fprintf(outfile,"numHosts=%d,numBlocks=%d\n", numHosts, numBlocks);
+ fprintf(outfile,"hostArray: index: ipAddr, maxKeyCapacity\n");
+ for (i = 0; i < numHosts; i++)
+ {
+ address.s_addr = htonl(hostArray[i].ipAddr);
+ fprintf(outfile,"%d: %s, %d\n", i, inet_ntoa(address),
+ hostArray[i].maxKeyCapacity);
+ }
+ fprintf(outfile,"blockOwnerArray: index: blockOwner\n");
+ for (i = 0; i < numBlocks; i++)
+ {
+ fprintf(outfile,"%d: %d\n", i, blockOwnerArray[i]);
+ }
+}
+
+#endif
+