#else
+/*******************************************************************************
+* Includes
+*******************************************************************************/
+
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <stdio.h>
+#include <stdarg.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <netdb.h>
#include <net/if.h>
#include <linux/sockios.h>
+#include <sys/time.h>
#include "clookup.h" //this works for now, do we need anything better?
+/*******************************************************************************
+* Local Defines, Structs
+*******************************************************************************/
+
#define BUFFER_SIZE 512 //maximum message size
#define UDP_PORT 2157
#define TCP_PORT 2157
#define BACKLOG 10 //max pending tcp connections
#define TIMEOUT_MS 500
#define MAX_RETRIES 3
-#define INIT_HOST_ALLOC 16
-#define INIT_BLOCK_NUM 64
+#define INIT_HOST_ALLOC 1
+#define INIT_BLOCK_NUM 1
#define DEFAULT_INTERFACE "eth0"
#define DHT_LOG "dht.log"
+//make sure this is consistent with enum below
+#define NUM_MSG_TYPES 20
-#define NUM_MSG_TYPES 19
-
+//make sure this matches msg_types global var
enum {
INSERT_CMD,
INSERT_RES,
REMOVE_RES,
SEARCH_CMD,
SEARCH_RES,
- FIND_LEADER_CMD,
+ FIND_LEADER_REQ,
FIND_LEADER_RES,
REBUILD_REQ,
REBUILD_RES,
REBUILD_CMD,
JOIN_REQ,
JOIN_RES,
+ GET_DHT_INFO_CMD,
DHT_INFO_REQ,
DHT_INFO_RES,
FILL_DHT_CMD,
REBUILD_DONE_INFO
};
-const char *msg_types[NUM_MSG_TYPES] =
-{
- "INSERT_CMD",
- "INSERT_RES",
- "REMOVE_CMD",
- "REMOVE_RES",
- "SEARCH_CMD",
- "SEARCH_RES",
- "FIND_LEADER_CMD",
- "FIND_LEADER_RES",
- "REBUILD_REQ",
- "REBUILD_RES",
- "NOT_LEADER",
- "REBUILD_CMD",
- "JOIN_REQ",
- "JOIN_RES",
- "DHT_INFO_REQ",
- "DHT_INFO_RES",
- "FILL_DHT_CMD",
- "FILL_DHT_RES",
- "REBUILD_DONE_INFO"
-};
-
//status codes
enum {
INSERT_OK,
NOT_KEY_OWNER,
};
+enum {
+ NORMAL_STATE,
+ REBUILD1_STATE,
+ REBUILD2_STATE,
+ REBUILD3_STATE,
+ LEAD_NORMAL_STATE,
+ LEAD_REBUILD1_STATE,
+ LEAD_REBUILD2_STATE,
+ LEAD_REBUILD3_STATE
+};
+
struct hostData {
unsigned int ipAddr;
unsigned int maxKeyCapacity;
unsigned int val;
};
-struct rebuildRes {
+struct joinReq {
unsigned int msgType:8;
unsigned int unused:24;
- unsigned int status;
+ struct hostData newHostData;
};
-//TODO: leave message, rebuild message...
+/*******************************************************************************
+* Global Variables
+*******************************************************************************/
+
+//make sure this matches enumeration above
+const char *msg_types[NUM_MSG_TYPES] =
+{
+ "INSERT_CMD",
+ "INSERT_RES",
+ "REMOVE_CMD",
+ "REMOVE_RES",
+ "SEARCH_CMD",
+ "SEARCH_RES",
+ "FIND_LEADER_REQ",
+ "FIND_LEADER_RES",
+ "REBUILD_REQ",
+ "REBUILD_RES",
+ "NOT_LEADER",
+ "REBUILD_CMD",
+ "JOIN_REQ",
+ "JOIN_RES",
+ "GET_DHT_INFO_CMD",
+ "DHT_INFO_REQ",
+ "DHT_INFO_RES",
+ "FILL_DHT_CMD",
+ "FILL_DHT_RES",
+ "REBUILD_DONE_INFO"
+};
FILE *logfile;
-unsigned int leader; //ip address of leader
+//ip address of leader
+unsigned int leader;
+//set by dhtInit()
struct hostData myHostData;
-/*----DHT data----*/
+//number of hosts in the system
unsigned int numHosts;
+//ip address and max key capacity of each host
struct hostData *hostArray;
+//memory allocated for this many items in hostArray
+unsigned int hostArraySize;
+//number of keyspace divisions, preferably a power of 2 > numHosts
unsigned int numBlocks;
+//this array has numBlocks elements, each of which contains an index to hostArray
+// the key owner is found by hashing the key into one of these blocks and using this
+// array to find the corresponding host in hostArray
unsigned int *blockOwnerArray;
-/*----end DHT data----*/
+//used by leader to track which hosts have responded, etc.
+unsigned int *hostRebuildStates;
+//thread handles
pthread_t threadUdpListen;
pthread_t threadTcpListen;
-int udpServerSock;
+//server sockets
+struct pollfd udpServerPollSock;
int tcpListenSock;
+//see above for enumeration of states
+int state;
+
+/*******************************************************************************
+* Local Function Prototypes
+*******************************************************************************/
+//log funtion, use like printf()
+void dhtLog(const char *format, ...);
//return my IP address
unsigned int getMyIpAddr();
//sends broadcast to discover leader
unsigned int getKeyOwner(unsigned int key);
//simple hash
unsigned int hash(unsigned int x);
+//sends REBUILD_REQ to leader, retries until leader responds, or causes new leader to be chosen
+void initRebuild();
+//adds entry to end of hostArray, increments numHosts,
+// allocates more space if necessary
+void addHost(struct hostData newHost);
//initiates TCP connection with leader, gets DHT data
int getDHTdata();
//outputs readable DHT data to outfile
void writeDHTdata(FILE *outfile);
-void initRebuild();
-void leadRebuild();
-void followRebuild();
+void clearDHTdata();
+void initDHTdata();
+void makeAssignments();
+//returns not-zero if ok, zero if not ok
+int msgSizeOk(unsigned char type, unsigned int size);
+
+/*******************************************************************************
+* Global Function Definitions
+*******************************************************************************/
void dhtInit(unsigned int maxKeyCapacity)
{
int i;
int ret;
-#ifdef DHT_LOG
logfile = fopen(DHT_LOG, "w");
-#endif
+ dhtLog("dhtInit() - initializing...\n");
myHostData.ipAddr = getMyIpAddr();
myHostData.maxKeyCapacity = maxKeyCapacity;
- numHosts = numBlocks = 0;
+ numHosts = numBlocks = hostArraySize = 0;
hostArray = NULL;
blockOwnerArray = NULL;
+ hostRebuildStates = NULL;
+
+ state = NORMAL_STATE;
pthread_create(&threadUdpListen, NULL, udpListen, NULL);
pthread_create(&threadTcpListen, NULL, tcpListen, NULL);
initRebuild();
-/* leader = findLeader();
-
- if (leader == 0)
- { //no response: I am the first
- leader = getMyIpAddr();
-
- numHosts = 1;
- hostArray = calloc(numHosts, sizeof(struct hostData));
- hostArray[0] = myHostData;
- numBlocks = INIT_BLOCK_NUM;
- blockOwnerArray = calloc(numBlocks, sizeof(unsigned int));
- for (i = 0; i < numBlocks; i++)
- blockOwnerArray[i] = 0;
- }
- else
- {
- //get DHT data from leader
- ret = getDHTdata();
-
- //TODO: actually, just initiate a rebuild here instead
- }
-*/
-
- //start servers
-
return;
}
void dhtExit()
{
+ dhtLog("dhtExit(): cleaning up...\n");
fclose(logfile);
pthread_cancel(threadUdpListen);
pthread_cancel(threadTcpListen);
- close(udpServerSock);
+ close(udpServerPollSock.fd);
close(tcpListenSock);
+ clearDHTdata();
}
int dhtInsert(unsigned int key, unsigned int val)
return -1; //this function should be robust enough to always return 0 or 1
}
-
+/*******************************************************************************
+* Local Function Definitions
+*******************************************************************************/
//use UDP for messages that are frequent and short
void *udpListen()
{
struct sockaddr_in myAddr;
struct sockaddr_in clientAddr;
+ struct sockaddr_in bcastAddr;
socklen_t socklen = sizeof(struct sockaddr_in);
char buffer[BUFFER_SIZE];
ssize_t bytesReceived;
struct insertRes *insertResPtr;
struct removeRes *removeResPtr;
struct searchRes *searchResPtr;
+ struct joinReq *joinReqPtr;
char replyBuffer[BUFFER_SIZE];
struct timeval now;
+ struct timeval rebuild1Timeout;
+ int rebuild1TimerSet;
+ int on;
+ int pollret;
+ int i;
chashtable_t *myHashTable = chashCreate(HASH_SIZE, LOADFACTOR);
- if ((udpServerSock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
+ if ((udpServerPollSock.fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
{
perror("udpListen():socket()");
pthread_exit(NULL);
}
+
+ on = 1;
+ if (setsockopt(udpServerPollSock.fd, SOL_SOCKET, SO_BROADCAST, &on,
+ sizeof(on)) == -1)
+ {
+ perror("udpBroadcastWaitForResponse():setsockopt()");
+ pthread_exit(NULL);
+ }
+
+ udpServerPollSock.events = POLLIN;
bzero(&myAddr, socklen);
- myAddr.sin_family=AF_INET;
- myAddr.sin_addr.s_addr=INADDR_ANY;
- myAddr.sin_port=htons(UDP_PORT);
+ myAddr.sin_family = AF_INET;
+ myAddr.sin_addr.s_addr = htonl(INADDR_ANY);
+ myAddr.sin_port = htons(UDP_PORT);
+
+ bzero(&bcastAddr, socklen);
+ bcastAddr.sin_family = AF_INET;
+ bcastAddr.sin_addr.s_addr = htonl(0xFFFFFFFF);
+ bcastAddr.sin_port = htons(UDP_PORT);
- if (bind(udpServerSock, (struct sockaddr *)&myAddr, socklen) == -1)
+ if (bind(udpServerPollSock.fd, (struct sockaddr *)&myAddr, socklen) == -1)
{
perror("udpListen():bind()");
pthread_exit(NULL);
}
-#ifdef DHT_LOG
- fprintf(logfile,"udpListen(): listening on port %d\n", UDP_PORT);
- fflush(logfile);
-#endif
+ dhtLog("udpListen(): listening on port %d\n", UDP_PORT);
+
+ rebuild1TimerSet = 0;
while(1)
{
- if ((bytesReceived = recvfrom(udpServerSock, buffer, BUFFER_SIZE, 0,
- (struct sockaddr *)&clientAddr, &socklen)) == -1)
- {
- perror("udpListen():recvfrom()");
- }
- else if (bytesReceived == 0)
+ pollret = poll(&udpServerPollSock, 1, TIMEOUT_MS);
+ if (pollret < 0)
+ { perror("udpListen():poll()"); }
+ else if (pollret > 0)
{
-#ifdef DHT_LOG
- fprintf(logfile,"udpListen(): recvfrom() returned 0\n");
- fflush(logfile);
-#endif
- }
- else
- {
- gettimeofday(&now, NULL);
-#ifdef DHT_LOG
- fprintf(logfile, "udpListen(): received %s from %s\n",
- (buffer[0] < NUM_MSG_TYPES ? msg_types[buffer[0]] : "unknown message"),
- inet_ntoa(clientAddr.sin_addr));
-// fprintf(logfile,"udpListen(): time received:%ds,%dus\n", now.tv_sec,
-// now.tv_usec);
-// fprintf(logfile,"udpListen(): msg size:%d bytes source:%s:%d\n",
-// bytesReceived,inet_ntoa(clientAddr.sin_addr),htons(clientAddr.sin_port));
- fflush(logfile);
-#endif
-
- switch (buffer[0])
+ if ((bytesReceived = recvfrom(udpServerPollSock.fd, buffer, BUFFER_SIZE,
+ 0, (struct sockaddr *)&clientAddr, &socklen)) == -1)
+ { perror("udpListen():recvfrom()"); }
+ else if (bytesReceived == 0)
{
- case INSERT_CMD:
- if (bytesReceived != sizeof(struct insertCmd))
- {
-#ifdef DHT_LOG
- fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
- fflush(logfile);
-#endif
- break;
- }
- insertCmdPtr = (struct insertCmd *)buffer;
-#ifdef DHT_LOG
- fprintf(logfile, "udpListen(): Insert: key=%d, val=%d\n",
- insertCmdPtr->key, insertCmdPtr->val);
- fflush(logfile);
-#endif
- insertResPtr = (struct insertRes *)replyBuffer;
- insertResPtr->msgType = INSERT_RES;
- if (getKeyOwner(insertCmdPtr->key) == myHostData.ipAddr)
- {
- //note: casting val to void * in order to conform to API
- if(chashInsert(myHashTable, insertCmdPtr->key,
- (void *)insertCmdPtr->val) == 0)
- insertResPtr->status = INSERT_OK;
- else
- insertResPtr->status = INSERT_ERROR;
- }
- else
- {
- insertResPtr->status = NOT_KEY_OWNER;;
- }
- if (sendto(udpServerSock, (void *)insertResPtr,
- sizeof(struct insertRes), 0, (struct sockaddr *)&clientAddr,
- socklen) == -1)
- {
- perror("udpListen():sendto()");
- }
- break;
- case REMOVE_CMD:
- if (bytesReceived != sizeof(struct removeCmd))
- {
-#ifdef DHT_LOG
- fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
- fflush(logfile);
-#endif
- break;
- }
- removeCmdPtr = (struct removeCmd *)buffer;
-#ifdef DHT_LOG
- fprintf(logfile,"udpListen(): Remove: key=%d\n", removeCmdPtr->key);
- fflush(logfile);
-#endif
- removeResPtr = (struct removeRes *)replyBuffer;
- removeResPtr->msgType = REMOVE_RES;
- if (getKeyOwner(removeCmdPtr->key) == myHostData.ipAddr)
- {
- //note: casting val to void * in order to conform to API
- if(chashRemove(myHashTable, removeCmdPtr->key) == 0)
- removeResPtr->status = INSERT_OK;
- else
- removeResPtr->status = INSERT_ERROR;
- }
- else
- {
- removeResPtr->status = NOT_KEY_OWNER;
- }
- if (sendto(udpServerSock, (void *)removeResPtr, sizeof(struct removeRes), 0,
- (struct sockaddr *)&clientAddr, socklen) == -1)
- {
- perror("udpListen():sendto()");
- }
- break;
- case SEARCH_CMD:
- if (bytesReceived != sizeof(struct searchCmd))
- {
-#ifdef DHT_LOG
- fprintf(logfile,"udpListen(): ERROR: incorrect message size\n");
- fflush(logfile);
-#endif
- break;
- }
- searchCmdPtr = (struct searchCmd *)buffer;
-#ifdef DHT_LOG
- fprintf(logfile,"udpListen(): Search: key=%d\n",searchCmdPtr->key);
- fflush(logfile);
-#endif
- searchResPtr = (struct searchRes *)replyBuffer;
- searchResPtr->msgType = SEARCH_RES;
- if (getKeyOwner(searchCmdPtr->key) == myHostData.ipAddr)
- {
- //note: casting val to void * in order to conform to API
- if((searchResPtr->val = (unsigned int)chashSearch(myHashTable,
- searchCmdPtr->key)) == 0)
- searchResPtr->status = KEY_NOT_FOUND;
- else
- searchResPtr->status = KEY_FOUND;
- }
- else
- {
- searchResPtr->status = NOT_KEY_OWNER;
- }
- if (sendto(udpServerSock, (void *)searchResPtr, sizeof(struct searchRes), 0,
- (struct sockaddr *)&clientAddr, socklen) == -1)
- {
- perror("udpListen():sendto()");
- }
- break;
- case FIND_LEADER_CMD:
- if (bytesReceived != sizeof(char))
- {
-#ifdef DHT_LOG
- fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
- fflush(logfile);
-#endif
- break;
- }
- if (leader == getMyIpAddr())
- {
- replyBuffer[0] = FIND_LEADER_RES;
- if(sendto(udpServerSock, (void *)replyBuffer, sizeof(char), 0,
- (struct sockaddr *)&clientAddr, socklen) == -1)
- {
- perror("udpListen():sendto");
- }
- }
- break;
- case REBUILD_REQ:
- if (bytesReceived != sizeof(char))
- {
-#ifdef DHT_LOG
- fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
- fflush(logfile);
-#endif
- break;
- }
- if (leader == getMyIpAddr())
- {
- replyBuffer[0] = REBUILD_RES;
- if(sendto(udpServerSock, (void *)replyBuffer, sizeof(char), 0,
- (struct sockaddr *)&clientAddr, socklen) == -1)
- {
- perror("udpListen():sendto");
- }
- //TODO: leadRebuild()
- }
- else
+ dhtLog("udpListen(): recvfrom() returned 0\n");
+ }
+ else
+ {
+ dhtLog("udpListen(): received %s from %s\n",
+ (buffer[0] < NUM_MSG_TYPES ? msg_types[buffer[0]] :
+ "unknown message"), inet_ntoa(clientAddr.sin_addr));
+ if (!msgSizeOk(buffer[0], bytesReceived))
+ {
+ dhtLog("udpListen(): ERROR: incorrect message size\n");
+ }
+ else
+ {
+ switch (buffer[0])
{
- replyBuffer[0] = NOT_LEADER;
- if(sendto(udpServerSock, (void *)replyBuffer, sizeof(char), 0,
- (struct sockaddr *)&clientAddr, socklen) == -1)
- {
- perror("udpListen():sendto");
- }
+ case INSERT_CMD:
+ if (state == NORMAL_STATE || state == LEAD_NORMAL_STATE
+ || state == REBUILD3_STATE || state == LEAD_REBUILD3_STATE)
+ {
+ insertCmdPtr = (struct insertCmd *)buffer;
+ dhtLog( "udpListen(): Insert: key=%d, val=%d\n",
+ insertCmdPtr->key, insertCmdPtr->val);
+ insertResPtr = (struct insertRes *)replyBuffer;
+ insertResPtr->msgType = INSERT_RES;
+ insertResPtr->unused = 0;
+ if (getKeyOwner(insertCmdPtr->key) == myHostData.ipAddr)
+ {
+ //note: casting val to void * in order to conform to API
+ if(chashInsert(myHashTable, insertCmdPtr->key,
+ (void *)insertCmdPtr->val) == 0)
+ insertResPtr->status = INSERT_OK;
+ else
+ insertResPtr->status = INSERT_ERROR;
+ }
+ else
+ {
+ insertResPtr->status = NOT_KEY_OWNER;;
+ }
+ if (sendto(udpServerPollSock.fd, (void *)insertResPtr,
+ sizeof(struct insertRes), 0, (struct sockaddr *)&clientAddr,
+ socklen) == -1)
+ { perror("udpListen():sendto()"); }
+ }
+ break;
+ case REMOVE_CMD:
+ if (state == NORMAL_STATE || state == LEAD_NORMAL_STATE)
+ {
+ removeCmdPtr = (struct removeCmd *)buffer;
+ dhtLog("udpListen(): Remove: key=%d\n", removeCmdPtr->key);
+ removeResPtr = (struct removeRes *)replyBuffer;
+ removeResPtr->msgType = REMOVE_RES;
+ removeResPtr->unused = 0;
+ if (getKeyOwner(removeCmdPtr->key) == myHostData.ipAddr)
+ {
+ //note: casting val to void * in order to conform to API
+ if(chashRemove(myHashTable, removeCmdPtr->key) == 0)
+ removeResPtr->status = INSERT_OK;
+ else
+ removeResPtr->status = INSERT_ERROR;
+ }
+ else
+ {
+ removeResPtr->status = NOT_KEY_OWNER;
+ }
+ if (sendto(udpServerPollSock.fd, (void *)removeResPtr,
+ sizeof(struct removeRes), 0, (struct sockaddr *)&clientAddr,
+ socklen) == -1)
+ { perror("udpListen():sendto()"); }
+ }
+ break;
+ case SEARCH_CMD:
+ if (state == NORMAL_STATE || state == LEAD_NORMAL_STATE)
+ {
+ searchCmdPtr = (struct searchCmd *)buffer;
+ dhtLog("udpListen(): Search: key=%d\n",searchCmdPtr->key);
+ searchResPtr = (struct searchRes *)replyBuffer;
+ searchResPtr->msgType = SEARCH_RES;
+ searchResPtr->unused = 0;
+ if (getKeyOwner(searchCmdPtr->key) == myHostData.ipAddr)
+ {
+ //note: casting val to void * in order to conform to API
+ if((searchResPtr->val = (unsigned int)chashSearch(myHashTable,
+ searchCmdPtr->key)) == 0)
+ searchResPtr->status = KEY_NOT_FOUND;
+ else
+ searchResPtr->status = KEY_FOUND;
+ }
+ else
+ {
+ searchResPtr->status = NOT_KEY_OWNER;
+ }
+ if (sendto(udpServerPollSock.fd, (void *)searchResPtr,
+ sizeof(struct searchRes), 0, (struct sockaddr *)&clientAddr,
+ socklen) == -1)
+ { perror("udpListen():sendto()"); }
+ }
+ break;
+ case FIND_LEADER_REQ:
+ if (state == LEAD_NORMAL_STATE || state == LEAD_REBUILD1_STATE
+ || state == LEAD_REBUILD2_STATE || state == LEAD_REBUILD3_STATE)
+ {
+ replyBuffer[0] = FIND_LEADER_RES;
+ if(sendto(udpServerPollSock.fd, (void *)replyBuffer,
+ sizeof(char), 0,(struct sockaddr *)&clientAddr, socklen) == -1)
+ { perror("udpListen():sendto()"); }
+ }
+ break;
+ case REBUILD_REQ:
+ if (state == LEAD_NORMAL_STATE || state == LEAD_REBUILD1_STATE
+ || state == LEAD_REBUILD2_STATE || state == LEAD_REBUILD3_STATE)
+ {
+ replyBuffer[0] = REBUILD_RES;
+ if (sendto(udpServerPollSock.fd, (void *)replyBuffer,
+ sizeof(char), 0, (struct sockaddr *)&clientAddr, socklen) == -1)
+ { perror("udpListen():sendto()"); }
+ if (gettimeofday(&rebuild1Timeout, NULL) < 0)
+ { perror("dhtLog():gettimeofday()"); }
+ //TODO: make this a configurable parameter
+ rebuild1Timeout.tv_sec += 3;
+ rebuild1TimerSet = 1;
+ //clear out previous host data
+ numHosts = 1;
+ hostArray[0] = myHostData;
+
+ state = LEAD_REBUILD1_STATE;
+
+ replyBuffer[0] = REBUILD_CMD;
+ if (sendto(udpServerPollSock.fd, (void *)replyBuffer,
+ sizeof(char), 0, (struct sockaddr *)&bcastAddr, socklen) == -1)
+ { perror("udpListen():sendto()"); }
+
+ }
+ else
+ {
+ replyBuffer[0] = NOT_LEADER;
+ if(sendto(udpServerPollSock.fd, (void *)replyBuffer,
+ sizeof(char), 0,(struct sockaddr *)&clientAddr, socklen) == -1)
+ { perror("udpListen():sendto()"); }
+ }
+ case REBUILD_CMD:
+ if (state != LEAD_REBUILD1_STATE)
+ {
+ //consider this an official declaration of authority,
+ // in case I was confused about this
+ leader = htonl(clientAddr.sin_addr.s_addr);
+
+ clearDHTdata();
+
+ joinReqPtr = (struct joinReq *)replyBuffer;
+ joinReqPtr->msgType = JOIN_REQ;
+ joinReqPtr->unused = 0;
+ joinReqPtr->newHostData = myHostData;
+ //note: I'm reusing bytesReceived and buffer
+ bytesReceived = udpSendWaitForResponse(leader, UDP_PORT,
+ (void *)replyBuffer, sizeof(struct joinReq), (void *)buffer,
+ BUFFER_SIZE, TIMEOUT_MS, MAX_RETRIES);
+ if ((bytesReceived == sizeof(char)) && (buffer[0] == JOIN_RES))
+ state = REBUILD1_STATE;
+ else
+ initRebuild();
+ }
+ break;
+ case JOIN_REQ:
+ if (state == LEAD_REBUILD1_STATE)
+ {
+ joinReqPtr = (struct joinReq *)buffer;
+ addHost(joinReqPtr->newHostData);
+
+ replyBuffer[0] = JOIN_RES;
+ if (sendto(udpServerPollSock.fd, (void *)replyBuffer,
+ sizeof(char), 0,(struct sockaddr *)&clientAddr, socklen) == -1)
+ { perror("udpListen():sendto()"); }
+ }
+ break;
+ case GET_DHT_INFO_CMD:
+ if (state == REBUILD1_STATE)
+ {
+ getDHTdata();
+ state = REBUILD2_STATE;
+ }
+ break;
+ default:
+ dhtLog("udpListen(): ERROR: Unknown message type\n");
}
- break;
-// default:
-#ifdef DHT_LOG
-// fprintf(logfile,"udpListen(): ERROR: Unknown message type\n");
-// fflush(logfile);
-#endif
+ }
+ }
+ } //end (pollret > 0)
+ else // (pollret == 0), timeout
+ {
+ if (gettimeofday(&now, NULL) < 0)
+ { perror("dhtLog():gettimeofday()"); }
+ if (rebuild1TimerSet && timercmp(&now, &rebuild1Timeout, >))
+ {
+ rebuild1TimerSet = 0;
+ if (state == LEAD_REBUILD1_STATE)
+ {
+ makeAssignments();
+ dhtLog("udpListen(): assignments made\n");
+ writeDHTdata(logfile);
+ if (hostRebuildStates != NULL)
+ free(hostRebuildStates);
+ hostRebuildStates = calloc(numHosts, sizeof(unsigned int));
+ for (i = 0; i < numHosts; i++)
+ hostRebuildStates[i] = REBUILD1_STATE;
+ state = LEAD_REBUILD2_STATE;
+ replyBuffer[0] = GET_DHT_INFO_CMD;
+ if (sendto(udpServerPollSock.fd, (void *)replyBuffer,
+ sizeof(char), 0, (struct sockaddr *)&bcastAddr, socklen) == -1)
+ { perror("udpListen():sendto()"); }
+ }
}
}
}
struct sockaddr_in ack_addr;
socklen_t socklen = sizeof(struct sockaddr_in);
struct pollfd pollsock;
- struct timeval now;
int retval;
int i;
ssize_t bytesReceived;
for (i = 0; i < MAX_RETRIES; i++)
{
-#ifdef DHT_LOG
if (i > 0)
- fprintf(logfile,"udpSendWaitForResponse(): trying again, count: %d\n",
- i+1);
- fflush(logfile);
-#endif
+ dhtLog("udpSendWaitForResponse(): trying again, count: %d\n", i+1);
if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr,
socklen) == -1)
{
perror("udpSendWaitForResponse():sendto");
return -1;
}
-#ifdef DHT_LOG
- gettimeofday(&now, NULL);
- fprintf(logfile,"udpSendWaitForResponse(): message sent:%ds,%dus\n",
- now.tv_sec, now.tv_usec);
- fflush(logfile);
-#endif
+ dhtLog("udpSendWaitForResponse(): message sent\n");
retval = poll(&pollsock, 1, timeout);
- if (retval !=0)
+ if (retval < 0)
+ {
+ perror("udpSendWaitForResponse():poll()");
+ }
+ else if (retval > 0)
{
bytesReceived = recvfrom(pollsock.fd, resBuffer, resBufferSize, 0,
(struct sockaddr *)&ack_addr, &socklen);
&& (ack_addr.sin_port == server_addr.sin_port))
{
close(pollsock.fd);
-#ifdef DHT_LOG
- gettimeofday(&now, NULL);
- fprintf(logfile,"udpSendWaitForResponse(): received response:%ds,%dus\n", now.tv_sec, now.tv_usec);
- fflush(logfile);
-#endif
+ dhtLog("udpSendWaitForResponse(): received response\n");
return bytesReceived;
}
}
}
close(pollsock.fd);
-#ifdef DHT_LOG
- gettimeofday(&now, NULL);
- printf("udpSendWaitForResponse(): timed out, no ack:%ds,%dus\n",
- now.tv_sec, now.tv_usec);
- fflush(logfile);
-#endif
+ printf("udpSendWaitForResponse(): timed out, no ack\n");
return -1;
}
struct sockaddr_in ack_addr;
socklen_t socklen = sizeof(struct sockaddr_in);
struct pollfd pollsock;
- struct timeval now;
int retval;
int i;
ssize_t bytesReceived;
for (i = 0; i < MAX_RETRIES; i++)
{
-#ifdef DHT_LOG
if (i > 0)
- fprintf(logfile,"udpBroadcastWaitForResponse(): trying again, count: %d\n", i+1);
- fflush(logfile);
-#endif
+ dhtLog("udpBroadcastWaitForResponse(): trying again, count: %d\n", i+1);
if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr,
socklen) == -1)
{
perror("udpBroadcastWaitForResponse():sendto()");
return -1;
}
-#ifdef DHT_LOG
- gettimeofday(&now, NULL);
- fprintf(logfile,"udpBroadcastWaitForResponse(): message sent:%ds,%dus\n",
- now.tv_sec, now.tv_usec);
- fflush(logfile);
-#endif
+ dhtLog("udpBroadcastWaitForResponse(): message sent\n");
retval = poll(&pollsock, 1, timeout);
if (retval !=0)
{
(struct sockaddr *)&ack_addr, &socklen);
close(pollsock.fd);
*reply_ip = htonl(ack_addr.sin_addr.s_addr);
-#ifdef DHT_LOG
- gettimeofday(&now, NULL);
- fprintf(logfile,"udpBroadcastWaitForResponse(): received response:%ds,%dus\n", now.tv_sec, now.tv_usec);
- fflush(logfile);
-#endif
+ dhtLog("udpBroadcastWaitForResponse(): received response\n");
return bytesReceived;
}
}
close(pollsock.fd);
-#ifdef DHT_LOG
- gettimeofday(&now, NULL);
- fprintf(logfile,"udpBroadcastWaitForResponse(): timed out, no ack:%ds,%dus\n",
- now.tv_sec, now.tv_usec);
- fflush(logfile);
-#endif
+ dhtLog("udpBroadcastWaitForResponse(): timed out, no ack\n");
return -1;
}
pthread_exit(NULL);
}
-#ifdef DHT_LOG
- fprintf(logfile,"tcpListen(): listening on port %d\n", TCP_PORT);
- fflush(logfile);
-#endif
+ dhtLog("tcpListen(): listening on port %d\n", TCP_PORT);
while(1)
{
- tcpAcceptSock = accept(tcpListenSock, (struct sockaddr *)&clientAddr, &socklen);
+ tcpAcceptSock = accept(tcpListenSock, (struct sockaddr *)&clientAddr,
+ &socklen);
pthread_create(&threadTcpAccept, NULL, tcpAccept, (void *)tcpAcceptSock);
}
}
int bytesReceived;
char msgType;
-#ifdef DHT_LOG
- fprintf(logfile, "tcpAccept(): accepted tcp connection, file descriptor: %d\n", tcpAcceptSock);
- fflush(logfile);
-#endif
+ dhtLog("tcpAccept(): accepted tcp connection, file descriptor: %d\n",
+ tcpAcceptSock);
bytesReceived = recv(tcpAcceptSock, &msgType, sizeof(char), 0);
if (bytesReceived == -1)
- {
- perror("tcpAccept():recv()");
- }
+ { perror("tcpAccept():recv()"); }
else if (bytesReceived == 0)
{
-#ifdef DHT_LOG
- fprintf(logfile, "tcpAccept(): bytesReceived = 0\n", tcpAcceptSock);
- fflush(logfile);
-#endif
+ dhtLog( "tcpAccept(): bytesReceived = 0\n", tcpAcceptSock);
}
else
{
}
break;
default:
-#ifdef DHT_LOG
- fprintf(logfile, "tcpAccept(): unrecognized msg type\n");
- fflush(logfile);
-#endif
+ dhtLog("tcpAccept(): unrecognized msg type\n");
}
}
if (close(tcpAcceptSock) == -1)
- {
- perror("tcpAccept():close()");
- }
+ { perror("tcpAccept():close()"); }
-#ifdef DHT_LOG
- fprintf(logfile, "tcpAccept(): closed tcp connection, file descriptor: %d\n",
+ dhtLog("tcpAccept(): closed tcp connection, file descriptor: %d\n",
tcpAcceptSock);
- fflush(logfile);
-#endif
pthread_exit(NULL);
}
unsigned int getKeyOwner(unsigned int key)
{
- return hostArray[blockOwnerArray[hash(key)]].ipAddr;
+ if (state == NORMAL_STATE || state == LEAD_NORMAL_STATE
+ || state == REBUILD3_STATE || state == LEAD_REBUILD3_STATE)
+ {
+ return hostArray[blockOwnerArray[hash(key)]].ipAddr;
+ }
+ else
+ { //TODO: figure out what is best to do here. Would like calls to dhtSearch,
+ // etc. to block rather than fail during rebuilds
+ return 0;
+ }
}
unsigned int getMyIpAddr()
char myMessage;
char response;
-#ifdef DHT_LOG
- fprintf(logfile, "findLeader(): broadcasting...\n");
- fflush(logfile);
-#endif
+ dhtLog("findLeader(): broadcasting...\n");
- myMessage = FIND_LEADER_CMD;
+ myMessage = FIND_LEADER_REQ;
bytesReceived = udpBroadcastWaitForResponse(&reply_ip, UDP_PORT,
(void *)&myMessage, sizeof(myMessage), (void *)&response,
if (bytesReceived == -1)
{
-#ifdef DHT_LOG
- fprintf(logfile, "findLeader(): no response\n");
- fflush(logfile);
-#endif
+ dhtLog("findLeader(): no response\n");
return 0;
}
else if (response == FIND_LEADER_RES)
{
-#ifdef DHT_LOG
struct in_addr reply_addr;
reply_addr.s_addr = htonl(reply_ip);
- fprintf(logfile, "findLeader(): leader found:%s\n",
+ dhtLog("findLeader(): leader found:%s\n",
inet_ntoa(reply_addr));
- fflush(logfile);
-#endif
return reply_ip;
}
else
{
-#ifdef DHT_LOG
- fprintf(logfile, "findLeader(): unexpected response\n");
- fflush(logfile);
-#endif
+ dhtLog("findLeader(): unexpected response\n");
return 0;
}
}
char msg;
int bytesReceived;
+ clearDHTdata();
+
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1)
{
perror("getDHTdata():socket()");
}
if (bytesReceived != sizeof(numHosts))
{
-#ifdef DHT_LOG
- fprintf(logfile,"getDHTdata(): ERROR: numHosts not completely received\n");
- fflush(logfile);
-#endif
+ dhtLog("getDHTdata(): ERROR: numHosts not completely received\n");
close(sock);
return -1;
}
}
if (bytesReceived != sizeof(numBlocks))
{
-#ifdef DHT_LOG
- fprintf(logfile,"getDHTdata(): ERROR: numBlocks not completely received\n");
- fflush(logfile);
-#endif
+ dhtLog("getDHTdata(): ERROR: numBlocks not completely received\n");
close(sock);
return -1;
}
- if (hostArray != NULL)
- free(hostArray);
hostArray = calloc(numHosts, sizeof(struct hostData));
bytesReceived = recv(sock, hostArray, numHosts*sizeof(struct hostData), 0);
if (bytesReceived == -1)
}
if (bytesReceived != numHosts*sizeof(struct hostData))
{
-#ifdef DHT_LOG
- fprintf(logfile,"getDHTdata(): ERROR: hostArray not completely received\n");
- fflush(logfile);
-#endif
+ dhtLog("getDHTdata(): ERROR: hostArray not completely received\n");
close(sock);
return -1;
}
- if (blockOwnerArray != NULL)
- free(blockOwnerArray);
blockOwnerArray = calloc(numBlocks, sizeof(unsigned int));
- bytesReceived = recv(sock, blockOwnerArray, numBlocks*sizeof(unsigned int), 0);
+ bytesReceived = recv(sock,blockOwnerArray,numBlocks*sizeof(unsigned int),0);
if (bytesReceived == -1)
{
perror("getDHTdata():recv()");
}
if (bytesReceived != numBlocks*sizeof(unsigned int))
{
-#ifdef DHT_LOG
- fprintf(logfile,"getDHTdata(): ERROR: blockOwnerArray not completely received\n");
- fflush(logfile);
-#endif
+ dhtLog("getDHTdata(): ERROR: blockOwnerArray not completely received\n");
close(sock);
return -1;
}
-#ifdef DHT_LOG
- fprintf(logfile,"getDHTdata(): got data:\n");
- writeDHTdata(logfile);
- fflush(logfile);
-#endif
+ dhtLog("getDHTdata(): got data:\n");
+ writeDHTdata(logfile);
+
return 0;
}
unsigned int hash(unsigned int x)
{
- return x % numBlocks;
+ //this shouldn't be called when numBlocks = 0, so if you get a divide-by-zero,
+ // make sure we are in a proper state for key owner lookups
+ return x % numBlocks;
}
//This function will not return until it succeeds in submitting
while (!done)
{
-#ifdef DHT_LOG
if (retry_count > 0)
{
- fprintf(logfile,"initRebuild(): retry count:%d\n", retry_count);
- fflush(logfile);
+ dhtLog("initRebuild(): retry count:%d\n", retry_count);
}
-#endif
if (leader == 0 || retry_count > 0)
{
if (leader == 0) //no response
{
//TODO:elect leader: this will do for now
+ initDHTdata();
leader = getMyIpAddr();
-
- numHosts = 1;
- hostArray = calloc(numHosts, sizeof(struct hostData));
- hostArray[0] = myHostData;
- numBlocks = INIT_BLOCK_NUM;
- blockOwnerArray = calloc(numBlocks, sizeof(unsigned int));
- for (i = 0; i < numBlocks; i++)
- blockOwnerArray[i] = 0;
+ state = LEAD_NORMAL_STATE;
}
}
(void *)&msg, sizeof(msg), (void *)&response, sizeof(response),
TIMEOUT_MS, MAX_RETRIES);
if (bytesReceived == -1)
- {
- perror("initRebuild():recv()");
- }
+ { perror("initRebuild():recv()"); }
else if (bytesReceived != sizeof(response))
{
-#ifdef DHT_LOG
- fprintf(logfile,"initRebuild(): ERROR: response not completely received\n");
- fflush(logfile);
-#endif
+ dhtLog("initRebuild(): ERROR: response not completely received\n");
}
else if (response == NOT_LEADER)
{
-#ifdef DHT_LOG
struct in_addr address;
address.s_addr = htonl(leader);
- fprintf(logfile,"initRebuild(): ERROR: %s no longer leader\n",
+ dhtLog("initRebuild(): ERROR: %s no longer leader\n",
inet_ntoa(address));
- fflush(logfile);
-#endif
}
else if (response != REBUILD_RES)
{
-#ifdef DHT_LOG
- fprintf(logfile,"initRebuild(): ERROR: unexpected response\n");
- fflush(logfile);
-#endif
+ dhtLog("initRebuild(): ERROR: unexpected response\n");
}
else
{
-#ifdef DHT_LOG
- fprintf(logfile,"initRebuild(): submitted rebuild request\n");
+ dhtLog("initRebuild(): submitted rebuild request\n");
writeDHTdata(logfile);
- fflush(logfile);
-#endif
done = 1;
}
}
return;
}
-void leadRebuild()
-{
-
-}
-
-void followRebuild()
-{
-
-}
-
void writeDHTdata(FILE *outfile)
{
int i;
struct in_addr address;
+
fprintf(outfile,"numHosts=%d,numBlocks=%d\n", numHosts, numBlocks);
fprintf(outfile,"hostArray: index: ipAddr, maxKeyCapacity\n");
for (i = 0; i < numHosts; i++)
}
fprintf(outfile,"blockOwnerArray: index: blockOwner\n");
for (i = 0; i < numBlocks; i++)
+ fprintf(outfile,"%d: %d ", i, blockOwnerArray[i]);
+ fprintf(outfile,"\n");
+}
+
+void clearDHTdata()
+{
+ if (hostArray != NULL)
+ {
+ free(hostArray);
+ hostArray = NULL;
+ }
+ if (blockOwnerArray != NULL)
+ {
+ free(blockOwnerArray);
+ blockOwnerArray = NULL;
+ }
+ numHosts = numBlocks = hostArraySize = 0;
+ return;
+}
+
+void initDHTdata()
+{
+ int i;
+
+ clearDHTdata();
+ hostArraySize = INIT_HOST_ALLOC;
+ hostArray = calloc(hostArraySize, sizeof(struct hostData));
+ numHosts = 1;
+ hostArray[0] = myHostData;
+ numBlocks = INIT_BLOCK_NUM;
+ blockOwnerArray = calloc(numBlocks, sizeof(unsigned int));
+ for (i = 0; i < numBlocks; i++)
+ blockOwnerArray[i] = 0;
+
+ return;
+}
+
+void addHost(struct hostData newHost)
+{
+ struct hostData *newArray;
+ unsigned int newArraySize;
+
+ if (hostArray == NULL || blockOwnerArray == NULL || hostArraySize == 0)
+ initDHTdata();
+
+ if (numHosts == hostArraySize)
+ {
+ newArraySize = hostArraySize * 2;
+ newArray = calloc(newArraySize, sizeof(struct hostData));
+ memcpy(newArray, hostArray, (hostArraySize * sizeof(struct hostData)));
+ free(hostArray);
+ hostArray = newArray;
+ hostArraySize = newArraySize;
+ }
+
+ hostArray[numHosts] = newHost;
+ numHosts++;
+
+ return;
+}
+
+void makeAssignments()
+{
+ int i;
+
+ if (hostArray == NULL || blockOwnerArray == NULL || hostArraySize == 0)
+ initDHTdata();
+
+ if (numBlocks < numHosts)
+ {
+ free(blockOwnerArray);
+ while (numBlocks < numHosts)
+ numBlocks *= 2;
+ blockOwnerArray = calloc(numBlocks, sizeof(unsigned int));
+ }
+
+ for (i = 0; i < numBlocks; i++)
+ blockOwnerArray[i] = i % numHosts;
+
+ return;
+}
+
+//returns not-zero if ok, zero if not ok
+int msgSizeOk(unsigned char type, unsigned int size)
+{
+ int status;
+
+ switch (type)
{
- fprintf(outfile,"%d: %d\n", i, blockOwnerArray[i]);
+ case INSERT_CMD:
+ status = (size == sizeof(struct insertCmd));
+ break;
+ case INSERT_RES:
+ status = (size == sizeof(struct insertRes));
+ break;
+ case REMOVE_CMD:
+ status = (size == sizeof(struct removeCmd));
+ break;
+ case REMOVE_RES:
+ status = (size == sizeof(struct removeRes));
+ break;
+ case SEARCH_CMD:
+ status = (size == sizeof(struct searchCmd));
+ break;
+ case SEARCH_RES:
+ status = (size == sizeof(struct searchRes));
+ break;
+ case FIND_LEADER_REQ:
+ status = (size == sizeof(char));
+ break;
+ case FIND_LEADER_RES:
+ status = (size == sizeof(char));
+ break;
+ case REBUILD_REQ:
+ status = (size == sizeof(char));
+ break;
+ case REBUILD_RES:
+ status = (size == sizeof(char));
+ break;
+ case NOT_LEADER:
+ status = (size == sizeof(char));
+ break;
+ case REBUILD_CMD:
+ status = (size == sizeof(char));
+ break;
+ case JOIN_REQ:
+ status = (size == sizeof(struct joinReq));
+ break;
+ case JOIN_RES:
+ status = (size == sizeof(char));
+ break;
+ case GET_DHT_INFO_CMD:
+ status = (size == sizeof(char));
+ break;
+ case DHT_INFO_REQ:
+ status = (size == sizeof(char));
+ break;
+ case DHT_INFO_RES:
+ status = (size == sizeof(char));
+ break;
+ case FILL_DHT_CMD:
+ status = (size == sizeof(char));
+ break;
+ case FILL_DHT_RES:
+ status = (size == sizeof(char));
+ break;
+ case REBUILD_DONE_INFO:
+ status = (size == sizeof(char));
+ break;
+ default:
+ status = 0;
+ break;
}
+ return status;
+}
+
+void dhtLog(const char *format, ...)
+{
+ va_list args;
+ struct timeval now;
+
+ if (gettimeofday(&now, NULL) < 0)
+ { perror("dhtLog():gettimeofday()"); }
+ va_start(args, format);
+ if (fprintf(logfile, "%d.%06d:", now.tv_sec, now.tv_usec) < 0)
+ { perror("dhtLog():fprintf()"); }
+ if (vfprintf(logfile, format, args) < 0)
+ { perror("dhtLog():vfprintf()"); }
+ if (fflush(logfile) == EOF)
+ { perror("dhtLog():fflush()"); }
+ va_end(args);
+
+ return;
}
#endif
+