From 4235a490badf00e2a1043ba22972309dd3673530 Mon Sep 17 00:00:00 2001 From: erubow Date: Sat, 30 Jun 2007 01:27:20 +0000 Subject: [PATCH] Implemented leader/follower state machines for rebuild procedure up to and including distribution of host list and keyspace responsibilities. I did this using broadcasts under the assumption that all hosts were on a single network segment, but this will be changed. Implemented a nice time-stamped logging function to facilitate testing. --- Robust/src/Runtime/DSTM/interface/dht.c | 1005 +++++++++++++---------- 1 file changed, 585 insertions(+), 420 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/dht.c b/Robust/src/Runtime/DSTM/interface/dht.c index 173ff7b9..2b956665 100644 --- a/Robust/src/Runtime/DSTM/interface/dht.c +++ b/Robust/src/Runtime/DSTM/interface/dht.c @@ -41,12 +41,17 @@ int dhtSearch(unsigned int key, unsigned int *val) #else +/******************************************************************************* +* Includes +*******************************************************************************/ + #include #include #include #include #include #include +#include #include #include #include @@ -56,22 +61,28 @@ int dhtSearch(unsigned int key, unsigned int *val) #include #include #include +#include #include "clookup.h" //this works for now, do we need anything better? +/******************************************************************************* +* Local Defines, Structs +*******************************************************************************/ + #define BUFFER_SIZE 512 //maximum message size #define UDP_PORT 2157 #define TCP_PORT 2157 #define BACKLOG 10 //max pending tcp connections #define TIMEOUT_MS 500 #define MAX_RETRIES 3 -#define INIT_HOST_ALLOC 16 -#define INIT_BLOCK_NUM 64 +#define INIT_HOST_ALLOC 1 +#define INIT_BLOCK_NUM 1 #define DEFAULT_INTERFACE "eth0" #define DHT_LOG "dht.log" +//make sure this is consistent with enum below +#define NUM_MSG_TYPES 20 -#define NUM_MSG_TYPES 19 - +//make sure this matches msg_types global var enum { INSERT_CMD, INSERT_RES, @@ -79,7 +90,7 @@ enum { REMOVE_RES, SEARCH_CMD, SEARCH_RES, - FIND_LEADER_CMD, + FIND_LEADER_REQ, FIND_LEADER_RES, REBUILD_REQ, REBUILD_RES, @@ -87,6 +98,7 @@ enum { REBUILD_CMD, JOIN_REQ, JOIN_RES, + GET_DHT_INFO_CMD, DHT_INFO_REQ, DHT_INFO_RES, FILL_DHT_CMD, @@ -94,29 +106,6 @@ enum { REBUILD_DONE_INFO }; -const char *msg_types[NUM_MSG_TYPES] = -{ - "INSERT_CMD", - "INSERT_RES", - "REMOVE_CMD", - "REMOVE_RES", - "SEARCH_CMD", - "SEARCH_RES", - "FIND_LEADER_CMD", - "FIND_LEADER_RES", - "REBUILD_REQ", - "REBUILD_RES", - "NOT_LEADER", - "REBUILD_CMD", - "JOIN_REQ", - "JOIN_RES", - "DHT_INFO_REQ", - "DHT_INFO_RES", - "FILL_DHT_CMD", - "FILL_DHT_RES", - "REBUILD_DONE_INFO" -}; - //status codes enum { INSERT_OK, @@ -128,6 +117,17 @@ enum { NOT_KEY_OWNER, }; +enum { + NORMAL_STATE, + REBUILD1_STATE, + REBUILD2_STATE, + REBUILD3_STATE, + LEAD_NORMAL_STATE, + LEAD_REBUILD1_STATE, + LEAD_REBUILD2_STATE, + LEAD_REBUILD3_STATE +}; + struct hostData { unsigned int ipAddr; unsigned int maxKeyCapacity; @@ -171,28 +171,75 @@ struct searchRes { unsigned int val; }; -struct rebuildRes { +struct joinReq { unsigned int msgType:8; unsigned int unused:24; - unsigned int status; + struct hostData newHostData; }; -//TODO: leave message, rebuild message... +/******************************************************************************* +* Global Variables +*******************************************************************************/ + +//make sure this matches enumeration above +const char *msg_types[NUM_MSG_TYPES] = +{ + "INSERT_CMD", + "INSERT_RES", + "REMOVE_CMD", + "REMOVE_RES", + "SEARCH_CMD", + "SEARCH_RES", + "FIND_LEADER_REQ", + "FIND_LEADER_RES", + "REBUILD_REQ", + "REBUILD_RES", + "NOT_LEADER", + "REBUILD_CMD", + "JOIN_REQ", + "JOIN_RES", + "GET_DHT_INFO_CMD", + "DHT_INFO_REQ", + "DHT_INFO_RES", + "FILL_DHT_CMD", + "FILL_DHT_RES", + "REBUILD_DONE_INFO" +}; FILE *logfile; -unsigned int leader; //ip address of leader +//ip address of leader +unsigned int leader; +//set by dhtInit() struct hostData myHostData; -/*----DHT data----*/ +//number of hosts in the system unsigned int numHosts; +//ip address and max key capacity of each host struct hostData *hostArray; +//memory allocated for this many items in hostArray +unsigned int hostArraySize; +//number of keyspace divisions, preferably a power of 2 > numHosts unsigned int numBlocks; +//this array has numBlocks elements, each of which contains an index to hostArray +// the key owner is found by hashing the key into one of these blocks and using this +// array to find the corresponding host in hostArray unsigned int *blockOwnerArray; -/*----end DHT data----*/ +//used by leader to track which hosts have responded, etc. +unsigned int *hostRebuildStates; +//thread handles pthread_t threadUdpListen; pthread_t threadTcpListen; -int udpServerSock; +//server sockets +struct pollfd udpServerPollSock; int tcpListenSock; +//see above for enumeration of states +int state; + +/******************************************************************************* +* Local Function Prototypes +*******************************************************************************/ +//log funtion, use like printf() +void dhtLog(const char *format, ...); //return my IP address unsigned int getMyIpAddr(); //sends broadcast to discover leader @@ -218,13 +265,24 @@ int sendNoWait(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int getKeyOwner(unsigned int key); //simple hash unsigned int hash(unsigned int x); +//sends REBUILD_REQ to leader, retries until leader responds, or causes new leader to be chosen +void initRebuild(); +//adds entry to end of hostArray, increments numHosts, +// allocates more space if necessary +void addHost(struct hostData newHost); //initiates TCP connection with leader, gets DHT data int getDHTdata(); //outputs readable DHT data to outfile void writeDHTdata(FILE *outfile); -void initRebuild(); -void leadRebuild(); -void followRebuild(); +void clearDHTdata(); +void initDHTdata(); +void makeAssignments(); +//returns not-zero if ok, zero if not ok +int msgSizeOk(unsigned char type, unsigned int size); + +/******************************************************************************* +* Global Function Definitions +*******************************************************************************/ void dhtInit(unsigned int maxKeyCapacity) { @@ -233,57 +291,36 @@ void dhtInit(unsigned int maxKeyCapacity) int i; int ret; -#ifdef DHT_LOG logfile = fopen(DHT_LOG, "w"); -#endif + dhtLog("dhtInit() - initializing...\n"); myHostData.ipAddr = getMyIpAddr(); myHostData.maxKeyCapacity = maxKeyCapacity; - numHosts = numBlocks = 0; + numHosts = numBlocks = hostArraySize = 0; hostArray = NULL; blockOwnerArray = NULL; + hostRebuildStates = NULL; + + state = NORMAL_STATE; pthread_create(&threadUdpListen, NULL, udpListen, NULL); pthread_create(&threadTcpListen, NULL, tcpListen, NULL); initRebuild(); -/* leader = findLeader(); - - if (leader == 0) - { //no response: I am the first - leader = getMyIpAddr(); - - numHosts = 1; - hostArray = calloc(numHosts, sizeof(struct hostData)); - hostArray[0] = myHostData; - numBlocks = INIT_BLOCK_NUM; - blockOwnerArray = calloc(numBlocks, sizeof(unsigned int)); - for (i = 0; i < numBlocks; i++) - blockOwnerArray[i] = 0; - } - else - { - //get DHT data from leader - ret = getDHTdata(); - - //TODO: actually, just initiate a rebuild here instead - } -*/ - - //start servers - return; } void dhtExit() { + dhtLog("dhtExit(): cleaning up...\n"); fclose(logfile); pthread_cancel(threadUdpListen); pthread_cancel(threadTcpListen); - close(udpServerSock); + close(udpServerPollSock.fd); close(tcpListenSock); + clearDHTdata(); } int dhtInsert(unsigned int key, unsigned int val) @@ -372,13 +409,16 @@ int dhtSearch(unsigned int key, unsigned int *val) return -1; //this function should be robust enough to always return 0 or 1 } - +/******************************************************************************* +* Local Function Definitions +*******************************************************************************/ //use UDP for messages that are frequent and short void *udpListen() { struct sockaddr_in myAddr; struct sockaddr_in clientAddr; + struct sockaddr_in bcastAddr; socklen_t socklen = sizeof(struct sockaddr_in); char buffer[BUFFER_SIZE]; ssize_t bytesReceived; @@ -388,220 +428,273 @@ void *udpListen() struct insertRes *insertResPtr; struct removeRes *removeResPtr; struct searchRes *searchResPtr; + struct joinReq *joinReqPtr; char replyBuffer[BUFFER_SIZE]; struct timeval now; + struct timeval rebuild1Timeout; + int rebuild1TimerSet; + int on; + int pollret; + int i; chashtable_t *myHashTable = chashCreate(HASH_SIZE, LOADFACTOR); - if ((udpServerSock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) + if ((udpServerPollSock.fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) { perror("udpListen():socket()"); pthread_exit(NULL); } + + on = 1; + if (setsockopt(udpServerPollSock.fd, SOL_SOCKET, SO_BROADCAST, &on, + sizeof(on)) == -1) + { + perror("udpBroadcastWaitForResponse():setsockopt()"); + pthread_exit(NULL); + } + + udpServerPollSock.events = POLLIN; bzero(&myAddr, socklen); - myAddr.sin_family=AF_INET; - myAddr.sin_addr.s_addr=INADDR_ANY; - myAddr.sin_port=htons(UDP_PORT); + myAddr.sin_family = AF_INET; + myAddr.sin_addr.s_addr = htonl(INADDR_ANY); + myAddr.sin_port = htons(UDP_PORT); + + bzero(&bcastAddr, socklen); + bcastAddr.sin_family = AF_INET; + bcastAddr.sin_addr.s_addr = htonl(0xFFFFFFFF); + bcastAddr.sin_port = htons(UDP_PORT); - if (bind(udpServerSock, (struct sockaddr *)&myAddr, socklen) == -1) + if (bind(udpServerPollSock.fd, (struct sockaddr *)&myAddr, socklen) == -1) { perror("udpListen():bind()"); pthread_exit(NULL); } -#ifdef DHT_LOG - fprintf(logfile,"udpListen(): listening on port %d\n", UDP_PORT); - fflush(logfile); -#endif + dhtLog("udpListen(): listening on port %d\n", UDP_PORT); + + rebuild1TimerSet = 0; while(1) { - if ((bytesReceived = recvfrom(udpServerSock, buffer, BUFFER_SIZE, 0, - (struct sockaddr *)&clientAddr, &socklen)) == -1) - { - perror("udpListen():recvfrom()"); - } - else if (bytesReceived == 0) + pollret = poll(&udpServerPollSock, 1, TIMEOUT_MS); + if (pollret < 0) + { perror("udpListen():poll()"); } + else if (pollret > 0) { -#ifdef DHT_LOG - fprintf(logfile,"udpListen(): recvfrom() returned 0\n"); - fflush(logfile); -#endif - } - else - { - gettimeofday(&now, NULL); -#ifdef DHT_LOG - fprintf(logfile, "udpListen(): received %s from %s\n", - (buffer[0] < NUM_MSG_TYPES ? msg_types[buffer[0]] : "unknown message"), - inet_ntoa(clientAddr.sin_addr)); -// fprintf(logfile,"udpListen(): time received:%ds,%dus\n", now.tv_sec, -// now.tv_usec); -// fprintf(logfile,"udpListen(): msg size:%d bytes source:%s:%d\n", -// bytesReceived,inet_ntoa(clientAddr.sin_addr),htons(clientAddr.sin_port)); - fflush(logfile); -#endif - - switch (buffer[0]) + if ((bytesReceived = recvfrom(udpServerPollSock.fd, buffer, BUFFER_SIZE, + 0, (struct sockaddr *)&clientAddr, &socklen)) == -1) + { perror("udpListen():recvfrom()"); } + else if (bytesReceived == 0) { - case INSERT_CMD: - if (bytesReceived != sizeof(struct insertCmd)) - { -#ifdef DHT_LOG - fprintf(logfile, "udpListen(): ERROR: incorrect message size\n"); - fflush(logfile); -#endif - break; - } - insertCmdPtr = (struct insertCmd *)buffer; -#ifdef DHT_LOG - fprintf(logfile, "udpListen(): Insert: key=%d, val=%d\n", - insertCmdPtr->key, insertCmdPtr->val); - fflush(logfile); -#endif - insertResPtr = (struct insertRes *)replyBuffer; - insertResPtr->msgType = INSERT_RES; - if (getKeyOwner(insertCmdPtr->key) == myHostData.ipAddr) - { - //note: casting val to void * in order to conform to API - if(chashInsert(myHashTable, insertCmdPtr->key, - (void *)insertCmdPtr->val) == 0) - insertResPtr->status = INSERT_OK; - else - insertResPtr->status = INSERT_ERROR; - } - else - { - insertResPtr->status = NOT_KEY_OWNER;; - } - if (sendto(udpServerSock, (void *)insertResPtr, - sizeof(struct insertRes), 0, (struct sockaddr *)&clientAddr, - socklen) == -1) - { - perror("udpListen():sendto()"); - } - break; - case REMOVE_CMD: - if (bytesReceived != sizeof(struct removeCmd)) - { -#ifdef DHT_LOG - fprintf(logfile, "udpListen(): ERROR: incorrect message size\n"); - fflush(logfile); -#endif - break; - } - removeCmdPtr = (struct removeCmd *)buffer; -#ifdef DHT_LOG - fprintf(logfile,"udpListen(): Remove: key=%d\n", removeCmdPtr->key); - fflush(logfile); -#endif - removeResPtr = (struct removeRes *)replyBuffer; - removeResPtr->msgType = REMOVE_RES; - if (getKeyOwner(removeCmdPtr->key) == myHostData.ipAddr) - { - //note: casting val to void * in order to conform to API - if(chashRemove(myHashTable, removeCmdPtr->key) == 0) - removeResPtr->status = INSERT_OK; - else - removeResPtr->status = INSERT_ERROR; - } - else - { - removeResPtr->status = NOT_KEY_OWNER; - } - if (sendto(udpServerSock, (void *)removeResPtr, sizeof(struct removeRes), 0, - (struct sockaddr *)&clientAddr, socklen) == -1) - { - perror("udpListen():sendto()"); - } - break; - case SEARCH_CMD: - if (bytesReceived != sizeof(struct searchCmd)) - { -#ifdef DHT_LOG - fprintf(logfile,"udpListen(): ERROR: incorrect message size\n"); - fflush(logfile); -#endif - break; - } - searchCmdPtr = (struct searchCmd *)buffer; -#ifdef DHT_LOG - fprintf(logfile,"udpListen(): Search: key=%d\n",searchCmdPtr->key); - fflush(logfile); -#endif - searchResPtr = (struct searchRes *)replyBuffer; - searchResPtr->msgType = SEARCH_RES; - if (getKeyOwner(searchCmdPtr->key) == myHostData.ipAddr) - { - //note: casting val to void * in order to conform to API - if((searchResPtr->val = (unsigned int)chashSearch(myHashTable, - searchCmdPtr->key)) == 0) - searchResPtr->status = KEY_NOT_FOUND; - else - searchResPtr->status = KEY_FOUND; - } - else - { - searchResPtr->status = NOT_KEY_OWNER; - } - if (sendto(udpServerSock, (void *)searchResPtr, sizeof(struct searchRes), 0, - (struct sockaddr *)&clientAddr, socklen) == -1) - { - perror("udpListen():sendto()"); - } - break; - case FIND_LEADER_CMD: - if (bytesReceived != sizeof(char)) - { -#ifdef DHT_LOG - fprintf(logfile, "udpListen(): ERROR: incorrect message size\n"); - fflush(logfile); -#endif - break; - } - if (leader == getMyIpAddr()) - { - replyBuffer[0] = FIND_LEADER_RES; - if(sendto(udpServerSock, (void *)replyBuffer, sizeof(char), 0, - (struct sockaddr *)&clientAddr, socklen) == -1) - { - perror("udpListen():sendto"); - } - } - break; - case REBUILD_REQ: - if (bytesReceived != sizeof(char)) - { -#ifdef DHT_LOG - fprintf(logfile, "udpListen(): ERROR: incorrect message size\n"); - fflush(logfile); -#endif - break; - } - if (leader == getMyIpAddr()) - { - replyBuffer[0] = REBUILD_RES; - if(sendto(udpServerSock, (void *)replyBuffer, sizeof(char), 0, - (struct sockaddr *)&clientAddr, socklen) == -1) - { - perror("udpListen():sendto"); - } - //TODO: leadRebuild() - } - else + dhtLog("udpListen(): recvfrom() returned 0\n"); + } + else + { + dhtLog("udpListen(): received %s from %s\n", + (buffer[0] < NUM_MSG_TYPES ? msg_types[buffer[0]] : + "unknown message"), inet_ntoa(clientAddr.sin_addr)); + if (!msgSizeOk(buffer[0], bytesReceived)) + { + dhtLog("udpListen(): ERROR: incorrect message size\n"); + } + else + { + switch (buffer[0]) { - replyBuffer[0] = NOT_LEADER; - if(sendto(udpServerSock, (void *)replyBuffer, sizeof(char), 0, - (struct sockaddr *)&clientAddr, socklen) == -1) - { - perror("udpListen():sendto"); - } + case INSERT_CMD: + if (state == NORMAL_STATE || state == LEAD_NORMAL_STATE + || state == REBUILD3_STATE || state == LEAD_REBUILD3_STATE) + { + insertCmdPtr = (struct insertCmd *)buffer; + dhtLog( "udpListen(): Insert: key=%d, val=%d\n", + insertCmdPtr->key, insertCmdPtr->val); + insertResPtr = (struct insertRes *)replyBuffer; + insertResPtr->msgType = INSERT_RES; + insertResPtr->unused = 0; + if (getKeyOwner(insertCmdPtr->key) == myHostData.ipAddr) + { + //note: casting val to void * in order to conform to API + if(chashInsert(myHashTable, insertCmdPtr->key, + (void *)insertCmdPtr->val) == 0) + insertResPtr->status = INSERT_OK; + else + insertResPtr->status = INSERT_ERROR; + } + else + { + insertResPtr->status = NOT_KEY_OWNER;; + } + if (sendto(udpServerPollSock.fd, (void *)insertResPtr, + sizeof(struct insertRes), 0, (struct sockaddr *)&clientAddr, + socklen) == -1) + { perror("udpListen():sendto()"); } + } + break; + case REMOVE_CMD: + if (state == NORMAL_STATE || state == LEAD_NORMAL_STATE) + { + removeCmdPtr = (struct removeCmd *)buffer; + dhtLog("udpListen(): Remove: key=%d\n", removeCmdPtr->key); + removeResPtr = (struct removeRes *)replyBuffer; + removeResPtr->msgType = REMOVE_RES; + removeResPtr->unused = 0; + if (getKeyOwner(removeCmdPtr->key) == myHostData.ipAddr) + { + //note: casting val to void * in order to conform to API + if(chashRemove(myHashTable, removeCmdPtr->key) == 0) + removeResPtr->status = INSERT_OK; + else + removeResPtr->status = INSERT_ERROR; + } + else + { + removeResPtr->status = NOT_KEY_OWNER; + } + if (sendto(udpServerPollSock.fd, (void *)removeResPtr, + sizeof(struct removeRes), 0, (struct sockaddr *)&clientAddr, + socklen) == -1) + { perror("udpListen():sendto()"); } + } + break; + case SEARCH_CMD: + if (state == NORMAL_STATE || state == LEAD_NORMAL_STATE) + { + searchCmdPtr = (struct searchCmd *)buffer; + dhtLog("udpListen(): Search: key=%d\n",searchCmdPtr->key); + searchResPtr = (struct searchRes *)replyBuffer; + searchResPtr->msgType = SEARCH_RES; + searchResPtr->unused = 0; + if (getKeyOwner(searchCmdPtr->key) == myHostData.ipAddr) + { + //note: casting val to void * in order to conform to API + if((searchResPtr->val = (unsigned int)chashSearch(myHashTable, + searchCmdPtr->key)) == 0) + searchResPtr->status = KEY_NOT_FOUND; + else + searchResPtr->status = KEY_FOUND; + } + else + { + searchResPtr->status = NOT_KEY_OWNER; + } + if (sendto(udpServerPollSock.fd, (void *)searchResPtr, + sizeof(struct searchRes), 0, (struct sockaddr *)&clientAddr, + socklen) == -1) + { perror("udpListen():sendto()"); } + } + break; + case FIND_LEADER_REQ: + if (state == LEAD_NORMAL_STATE || state == LEAD_REBUILD1_STATE + || state == LEAD_REBUILD2_STATE || state == LEAD_REBUILD3_STATE) + { + replyBuffer[0] = FIND_LEADER_RES; + if(sendto(udpServerPollSock.fd, (void *)replyBuffer, + sizeof(char), 0,(struct sockaddr *)&clientAddr, socklen) == -1) + { perror("udpListen():sendto()"); } + } + break; + case REBUILD_REQ: + if (state == LEAD_NORMAL_STATE || state == LEAD_REBUILD1_STATE + || state == LEAD_REBUILD2_STATE || state == LEAD_REBUILD3_STATE) + { + replyBuffer[0] = REBUILD_RES; + if (sendto(udpServerPollSock.fd, (void *)replyBuffer, + sizeof(char), 0, (struct sockaddr *)&clientAddr, socklen) == -1) + { perror("udpListen():sendto()"); } + if (gettimeofday(&rebuild1Timeout, NULL) < 0) + { perror("dhtLog():gettimeofday()"); } + //TODO: make this a configurable parameter + rebuild1Timeout.tv_sec += 3; + rebuild1TimerSet = 1; + //clear out previous host data + numHosts = 1; + hostArray[0] = myHostData; + + state = LEAD_REBUILD1_STATE; + + replyBuffer[0] = REBUILD_CMD; + if (sendto(udpServerPollSock.fd, (void *)replyBuffer, + sizeof(char), 0, (struct sockaddr *)&bcastAddr, socklen) == -1) + { perror("udpListen():sendto()"); } + + } + else + { + replyBuffer[0] = NOT_LEADER; + if(sendto(udpServerPollSock.fd, (void *)replyBuffer, + sizeof(char), 0,(struct sockaddr *)&clientAddr, socklen) == -1) + { perror("udpListen():sendto()"); } + } + case REBUILD_CMD: + if (state != LEAD_REBUILD1_STATE) + { + //consider this an official declaration of authority, + // in case I was confused about this + leader = htonl(clientAddr.sin_addr.s_addr); + + clearDHTdata(); + + joinReqPtr = (struct joinReq *)replyBuffer; + joinReqPtr->msgType = JOIN_REQ; + joinReqPtr->unused = 0; + joinReqPtr->newHostData = myHostData; + //note: I'm reusing bytesReceived and buffer + bytesReceived = udpSendWaitForResponse(leader, UDP_PORT, + (void *)replyBuffer, sizeof(struct joinReq), (void *)buffer, + BUFFER_SIZE, TIMEOUT_MS, MAX_RETRIES); + if ((bytesReceived == sizeof(char)) && (buffer[0] == JOIN_RES)) + state = REBUILD1_STATE; + else + initRebuild(); + } + break; + case JOIN_REQ: + if (state == LEAD_REBUILD1_STATE) + { + joinReqPtr = (struct joinReq *)buffer; + addHost(joinReqPtr->newHostData); + + replyBuffer[0] = JOIN_RES; + if (sendto(udpServerPollSock.fd, (void *)replyBuffer, + sizeof(char), 0,(struct sockaddr *)&clientAddr, socklen) == -1) + { perror("udpListen():sendto()"); } + } + break; + case GET_DHT_INFO_CMD: + if (state == REBUILD1_STATE) + { + getDHTdata(); + state = REBUILD2_STATE; + } + break; + default: + dhtLog("udpListen(): ERROR: Unknown message type\n"); } - break; -// default: -#ifdef DHT_LOG -// fprintf(logfile,"udpListen(): ERROR: Unknown message type\n"); -// fflush(logfile); -#endif + } + } + } //end (pollret > 0) + else // (pollret == 0), timeout + { + if (gettimeofday(&now, NULL) < 0) + { perror("dhtLog():gettimeofday()"); } + if (rebuild1TimerSet && timercmp(&now, &rebuild1Timeout, >)) + { + rebuild1TimerSet = 0; + if (state == LEAD_REBUILD1_STATE) + { + makeAssignments(); + dhtLog("udpListen(): assignments made\n"); + writeDHTdata(logfile); + if (hostRebuildStates != NULL) + free(hostRebuildStates); + hostRebuildStates = calloc(numHosts, sizeof(unsigned int)); + for (i = 0; i < numHosts; i++) + hostRebuildStates[i] = REBUILD1_STATE; + state = LEAD_REBUILD2_STATE; + replyBuffer[0] = GET_DHT_INFO_CMD; + if (sendto(udpServerPollSock.fd, (void *)replyBuffer, + sizeof(char), 0, (struct sockaddr *)&bcastAddr, socklen) == -1) + { perror("udpListen():sendto()"); } + } } } } @@ -615,7 +708,6 @@ int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, struct sockaddr_in ack_addr; socklen_t socklen = sizeof(struct sockaddr_in); struct pollfd pollsock; - struct timeval now; int retval; int i; ssize_t bytesReceived; @@ -635,26 +727,21 @@ int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, for (i = 0; i < MAX_RETRIES; i++) { -#ifdef DHT_LOG if (i > 0) - fprintf(logfile,"udpSendWaitForResponse(): trying again, count: %d\n", - i+1); - fflush(logfile); -#endif + dhtLog("udpSendWaitForResponse(): trying again, count: %d\n", i+1); if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr, socklen) == -1) { perror("udpSendWaitForResponse():sendto"); return -1; } -#ifdef DHT_LOG - gettimeofday(&now, NULL); - fprintf(logfile,"udpSendWaitForResponse(): message sent:%ds,%dus\n", - now.tv_sec, now.tv_usec); - fflush(logfile); -#endif + dhtLog("udpSendWaitForResponse(): message sent\n"); retval = poll(&pollsock, 1, timeout); - if (retval !=0) + if (retval < 0) + { + perror("udpSendWaitForResponse():poll()"); + } + else if (retval > 0) { bytesReceived = recvfrom(pollsock.fd, resBuffer, resBufferSize, 0, (struct sockaddr *)&ack_addr, &socklen); @@ -662,22 +749,13 @@ int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, && (ack_addr.sin_port == server_addr.sin_port)) { close(pollsock.fd); -#ifdef DHT_LOG - gettimeofday(&now, NULL); - fprintf(logfile,"udpSendWaitForResponse(): received response:%ds,%dus\n", now.tv_sec, now.tv_usec); - fflush(logfile); -#endif + dhtLog("udpSendWaitForResponse(): received response\n"); return bytesReceived; } } } close(pollsock.fd); -#ifdef DHT_LOG - gettimeofday(&now, NULL); - printf("udpSendWaitForResponse(): timed out, no ack:%ds,%dus\n", - now.tv_sec, now.tv_usec); - fflush(logfile); -#endif + printf("udpSendWaitForResponse(): timed out, no ack\n"); return -1; } @@ -689,7 +767,6 @@ int udpBroadcastWaitForResponse(unsigned int *reply_ip, struct sockaddr_in ack_addr; socklen_t socklen = sizeof(struct sockaddr_in); struct pollfd pollsock; - struct timeval now; int retval; int i; ssize_t bytesReceived; @@ -717,23 +794,15 @@ int udpBroadcastWaitForResponse(unsigned int *reply_ip, for (i = 0; i < MAX_RETRIES; i++) { -#ifdef DHT_LOG if (i > 0) - fprintf(logfile,"udpBroadcastWaitForResponse(): trying again, count: %d\n", i+1); - fflush(logfile); -#endif + dhtLog("udpBroadcastWaitForResponse(): trying again, count: %d\n", i+1); if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr, socklen) == -1) { perror("udpBroadcastWaitForResponse():sendto()"); return -1; } -#ifdef DHT_LOG - gettimeofday(&now, NULL); - fprintf(logfile,"udpBroadcastWaitForResponse(): message sent:%ds,%dus\n", - now.tv_sec, now.tv_usec); - fflush(logfile); -#endif + dhtLog("udpBroadcastWaitForResponse(): message sent\n"); retval = poll(&pollsock, 1, timeout); if (retval !=0) { @@ -741,21 +810,12 @@ int udpBroadcastWaitForResponse(unsigned int *reply_ip, (struct sockaddr *)&ack_addr, &socklen); close(pollsock.fd); *reply_ip = htonl(ack_addr.sin_addr.s_addr); -#ifdef DHT_LOG - gettimeofday(&now, NULL); - fprintf(logfile,"udpBroadcastWaitForResponse(): received response:%ds,%dus\n", now.tv_sec, now.tv_usec); - fflush(logfile); -#endif + dhtLog("udpBroadcastWaitForResponse(): received response\n"); return bytesReceived; } } close(pollsock.fd); -#ifdef DHT_LOG - gettimeofday(&now, NULL); - fprintf(logfile,"udpBroadcastWaitForResponse(): timed out, no ack:%ds,%dus\n", - now.tv_sec, now.tv_usec); - fflush(logfile); -#endif + dhtLog("udpBroadcastWaitForResponse(): timed out, no ack\n"); return -1; } @@ -792,14 +852,12 @@ void *tcpListen() pthread_exit(NULL); } -#ifdef DHT_LOG - fprintf(logfile,"tcpListen(): listening on port %d\n", TCP_PORT); - fflush(logfile); -#endif + dhtLog("tcpListen(): listening on port %d\n", TCP_PORT); while(1) { - tcpAcceptSock = accept(tcpListenSock, (struct sockaddr *)&clientAddr, &socklen); + tcpAcceptSock = accept(tcpListenSock, (struct sockaddr *)&clientAddr, + &socklen); pthread_create(&threadTcpAccept, NULL, tcpAccept, (void *)tcpAcceptSock); } } @@ -810,22 +868,15 @@ void *tcpAccept(void *arg) int bytesReceived; char msgType; -#ifdef DHT_LOG - fprintf(logfile, "tcpAccept(): accepted tcp connection, file descriptor: %d\n", tcpAcceptSock); - fflush(logfile); -#endif + dhtLog("tcpAccept(): accepted tcp connection, file descriptor: %d\n", + tcpAcceptSock); bytesReceived = recv(tcpAcceptSock, &msgType, sizeof(char), 0); if (bytesReceived == -1) - { - perror("tcpAccept():recv()"); - } + { perror("tcpAccept():recv()"); } else if (bytesReceived == 0) { -#ifdef DHT_LOG - fprintf(logfile, "tcpAccept(): bytesReceived = 0\n", tcpAcceptSock); - fflush(logfile); -#endif + dhtLog( "tcpAccept(): bytesReceived = 0\n", tcpAcceptSock); } else { @@ -856,30 +907,31 @@ void *tcpAccept(void *arg) } break; default: -#ifdef DHT_LOG - fprintf(logfile, "tcpAccept(): unrecognized msg type\n"); - fflush(logfile); -#endif + dhtLog("tcpAccept(): unrecognized msg type\n"); } } if (close(tcpAcceptSock) == -1) - { - perror("tcpAccept():close()"); - } + { perror("tcpAccept():close()"); } -#ifdef DHT_LOG - fprintf(logfile, "tcpAccept(): closed tcp connection, file descriptor: %d\n", + dhtLog("tcpAccept(): closed tcp connection, file descriptor: %d\n", tcpAcceptSock); - fflush(logfile); -#endif pthread_exit(NULL); } unsigned int getKeyOwner(unsigned int key) { - return hostArray[blockOwnerArray[hash(key)]].ipAddr; + if (state == NORMAL_STATE || state == LEAD_NORMAL_STATE + || state == REBUILD3_STATE || state == LEAD_REBUILD3_STATE) + { + return hostArray[blockOwnerArray[hash(key)]].ipAddr; + } + else + { //TODO: figure out what is best to do here. Would like calls to dhtSearch, + // etc. to block rather than fail during rebuilds + return 0; + } } unsigned int getMyIpAddr() @@ -915,12 +967,9 @@ unsigned int findLeader() char myMessage; char response; -#ifdef DHT_LOG - fprintf(logfile, "findLeader(): broadcasting...\n"); - fflush(logfile); -#endif + dhtLog("findLeader(): broadcasting...\n"); - myMessage = FIND_LEADER_CMD; + myMessage = FIND_LEADER_REQ; bytesReceived = udpBroadcastWaitForResponse(&reply_ip, UDP_PORT, (void *)&myMessage, sizeof(myMessage), (void *)&response, @@ -928,29 +977,20 @@ unsigned int findLeader() if (bytesReceived == -1) { -#ifdef DHT_LOG - fprintf(logfile, "findLeader(): no response\n"); - fflush(logfile); -#endif + dhtLog("findLeader(): no response\n"); return 0; } else if (response == FIND_LEADER_RES) { -#ifdef DHT_LOG struct in_addr reply_addr; reply_addr.s_addr = htonl(reply_ip); - fprintf(logfile, "findLeader(): leader found:%s\n", + dhtLog("findLeader(): leader found:%s\n", inet_ntoa(reply_addr)); - fflush(logfile); -#endif return reply_ip; } else { -#ifdef DHT_LOG - fprintf(logfile, "findLeader(): unexpected response\n"); - fflush(logfile); -#endif + dhtLog("findLeader(): unexpected response\n"); return 0; } } @@ -962,6 +1002,8 @@ int getDHTdata() char msg; int bytesReceived; + clearDHTdata(); + if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) { perror("getDHTdata():socket()"); @@ -995,10 +1037,7 @@ int getDHTdata() } if (bytesReceived != sizeof(numHosts)) { -#ifdef DHT_LOG - fprintf(logfile,"getDHTdata(): ERROR: numHosts not completely received\n"); - fflush(logfile); -#endif + dhtLog("getDHTdata(): ERROR: numHosts not completely received\n"); close(sock); return -1; } @@ -1011,15 +1050,10 @@ int getDHTdata() } if (bytesReceived != sizeof(numBlocks)) { -#ifdef DHT_LOG - fprintf(logfile,"getDHTdata(): ERROR: numBlocks not completely received\n"); - fflush(logfile); -#endif + dhtLog("getDHTdata(): ERROR: numBlocks not completely received\n"); close(sock); return -1; } - if (hostArray != NULL) - free(hostArray); hostArray = calloc(numHosts, sizeof(struct hostData)); bytesReceived = recv(sock, hostArray, numHosts*sizeof(struct hostData), 0); if (bytesReceived == -1) @@ -1030,17 +1064,12 @@ int getDHTdata() } if (bytesReceived != numHosts*sizeof(struct hostData)) { -#ifdef DHT_LOG - fprintf(logfile,"getDHTdata(): ERROR: hostArray not completely received\n"); - fflush(logfile); -#endif + dhtLog("getDHTdata(): ERROR: hostArray not completely received\n"); close(sock); return -1; } - if (blockOwnerArray != NULL) - free(blockOwnerArray); blockOwnerArray = calloc(numBlocks, sizeof(unsigned int)); - bytesReceived = recv(sock, blockOwnerArray, numBlocks*sizeof(unsigned int), 0); + bytesReceived = recv(sock,blockOwnerArray,numBlocks*sizeof(unsigned int),0); if (bytesReceived == -1) { perror("getDHTdata():recv()"); @@ -1049,24 +1078,21 @@ int getDHTdata() } if (bytesReceived != numBlocks*sizeof(unsigned int)) { -#ifdef DHT_LOG - fprintf(logfile,"getDHTdata(): ERROR: blockOwnerArray not completely received\n"); - fflush(logfile); -#endif + dhtLog("getDHTdata(): ERROR: blockOwnerArray not completely received\n"); close(sock); return -1; } -#ifdef DHT_LOG - fprintf(logfile,"getDHTdata(): got data:\n"); - writeDHTdata(logfile); - fflush(logfile); -#endif + dhtLog("getDHTdata(): got data:\n"); + writeDHTdata(logfile); + return 0; } unsigned int hash(unsigned int x) { - return x % numBlocks; + //this shouldn't be called when numBlocks = 0, so if you get a divide-by-zero, + // make sure we are in a proper state for key owner lookups + return x % numBlocks; } //This function will not return until it succeeds in submitting @@ -1086,13 +1112,10 @@ void initRebuild() while (!done) { -#ifdef DHT_LOG if (retry_count > 0) { - fprintf(logfile,"initRebuild(): retry count:%d\n", retry_count); - fflush(logfile); + dhtLog("initRebuild(): retry count:%d\n", retry_count); } -#endif if (leader == 0 || retry_count > 0) { @@ -1100,15 +1123,9 @@ void initRebuild() if (leader == 0) //no response { //TODO:elect leader: this will do for now + initDHTdata(); leader = getMyIpAddr(); - - numHosts = 1; - hostArray = calloc(numHosts, sizeof(struct hostData)); - hostArray[0] = myHostData; - numBlocks = INIT_BLOCK_NUM; - blockOwnerArray = calloc(numBlocks, sizeof(unsigned int)); - for (i = 0; i < numBlocks; i++) - blockOwnerArray[i] = 0; + state = LEAD_NORMAL_STATE; } } @@ -1118,60 +1135,37 @@ void initRebuild() (void *)&msg, sizeof(msg), (void *)&response, sizeof(response), TIMEOUT_MS, MAX_RETRIES); if (bytesReceived == -1) - { - perror("initRebuild():recv()"); - } + { perror("initRebuild():recv()"); } else if (bytesReceived != sizeof(response)) { -#ifdef DHT_LOG - fprintf(logfile,"initRebuild(): ERROR: response not completely received\n"); - fflush(logfile); -#endif + dhtLog("initRebuild(): ERROR: response not completely received\n"); } else if (response == NOT_LEADER) { -#ifdef DHT_LOG struct in_addr address; address.s_addr = htonl(leader); - fprintf(logfile,"initRebuild(): ERROR: %s no longer leader\n", + dhtLog("initRebuild(): ERROR: %s no longer leader\n", inet_ntoa(address)); - fflush(logfile); -#endif } else if (response != REBUILD_RES) { -#ifdef DHT_LOG - fprintf(logfile,"initRebuild(): ERROR: unexpected response\n"); - fflush(logfile); -#endif + dhtLog("initRebuild(): ERROR: unexpected response\n"); } else { -#ifdef DHT_LOG - fprintf(logfile,"initRebuild(): submitted rebuild request\n"); + dhtLog("initRebuild(): submitted rebuild request\n"); writeDHTdata(logfile); - fflush(logfile); -#endif done = 1; } } return; } -void leadRebuild() -{ - -} - -void followRebuild() -{ - -} - void writeDHTdata(FILE *outfile) { int i; struct in_addr address; + fprintf(outfile,"numHosts=%d,numBlocks=%d\n", numHosts, numBlocks); fprintf(outfile,"hostArray: index: ipAddr, maxKeyCapacity\n"); for (i = 0; i < numHosts; i++) @@ -1182,10 +1176,181 @@ void writeDHTdata(FILE *outfile) } fprintf(outfile,"blockOwnerArray: index: blockOwner\n"); for (i = 0; i < numBlocks; i++) + fprintf(outfile,"%d: %d ", i, blockOwnerArray[i]); + fprintf(outfile,"\n"); +} + +void clearDHTdata() +{ + if (hostArray != NULL) + { + free(hostArray); + hostArray = NULL; + } + if (blockOwnerArray != NULL) + { + free(blockOwnerArray); + blockOwnerArray = NULL; + } + numHosts = numBlocks = hostArraySize = 0; + return; +} + +void initDHTdata() +{ + int i; + + clearDHTdata(); + hostArraySize = INIT_HOST_ALLOC; + hostArray = calloc(hostArraySize, sizeof(struct hostData)); + numHosts = 1; + hostArray[0] = myHostData; + numBlocks = INIT_BLOCK_NUM; + blockOwnerArray = calloc(numBlocks, sizeof(unsigned int)); + for (i = 0; i < numBlocks; i++) + blockOwnerArray[i] = 0; + + return; +} + +void addHost(struct hostData newHost) +{ + struct hostData *newArray; + unsigned int newArraySize; + + if (hostArray == NULL || blockOwnerArray == NULL || hostArraySize == 0) + initDHTdata(); + + if (numHosts == hostArraySize) + { + newArraySize = hostArraySize * 2; + newArray = calloc(newArraySize, sizeof(struct hostData)); + memcpy(newArray, hostArray, (hostArraySize * sizeof(struct hostData))); + free(hostArray); + hostArray = newArray; + hostArraySize = newArraySize; + } + + hostArray[numHosts] = newHost; + numHosts++; + + return; +} + +void makeAssignments() +{ + int i; + + if (hostArray == NULL || blockOwnerArray == NULL || hostArraySize == 0) + initDHTdata(); + + if (numBlocks < numHosts) + { + free(blockOwnerArray); + while (numBlocks < numHosts) + numBlocks *= 2; + blockOwnerArray = calloc(numBlocks, sizeof(unsigned int)); + } + + for (i = 0; i < numBlocks; i++) + blockOwnerArray[i] = i % numHosts; + + return; +} + +//returns not-zero if ok, zero if not ok +int msgSizeOk(unsigned char type, unsigned int size) +{ + int status; + + switch (type) { - fprintf(outfile,"%d: %d\n", i, blockOwnerArray[i]); + case INSERT_CMD: + status = (size == sizeof(struct insertCmd)); + break; + case INSERT_RES: + status = (size == sizeof(struct insertRes)); + break; + case REMOVE_CMD: + status = (size == sizeof(struct removeCmd)); + break; + case REMOVE_RES: + status = (size == sizeof(struct removeRes)); + break; + case SEARCH_CMD: + status = (size == sizeof(struct searchCmd)); + break; + case SEARCH_RES: + status = (size == sizeof(struct searchRes)); + break; + case FIND_LEADER_REQ: + status = (size == sizeof(char)); + break; + case FIND_LEADER_RES: + status = (size == sizeof(char)); + break; + case REBUILD_REQ: + status = (size == sizeof(char)); + break; + case REBUILD_RES: + status = (size == sizeof(char)); + break; + case NOT_LEADER: + status = (size == sizeof(char)); + break; + case REBUILD_CMD: + status = (size == sizeof(char)); + break; + case JOIN_REQ: + status = (size == sizeof(struct joinReq)); + break; + case JOIN_RES: + status = (size == sizeof(char)); + break; + case GET_DHT_INFO_CMD: + status = (size == sizeof(char)); + break; + case DHT_INFO_REQ: + status = (size == sizeof(char)); + break; + case DHT_INFO_RES: + status = (size == sizeof(char)); + break; + case FILL_DHT_CMD: + status = (size == sizeof(char)); + break; + case FILL_DHT_RES: + status = (size == sizeof(char)); + break; + case REBUILD_DONE_INFO: + status = (size == sizeof(char)); + break; + default: + status = 0; + break; } + return status; +} + +void dhtLog(const char *format, ...) +{ + va_list args; + struct timeval now; + + if (gettimeofday(&now, NULL) < 0) + { perror("dhtLog():gettimeofday()"); } + va_start(args, format); + if (fprintf(logfile, "%d.%06d:", now.tv_sec, now.tv_usec) < 0) + { perror("dhtLog():fprintf()"); } + if (vfprintf(logfile, format, args) < 0) + { perror("dhtLog():vfprintf()"); } + if (fflush(logfile) == EOF) + { perror("dhtLog():fflush()"); } + va_end(args); + + return; } #endif + -- 2.34.1