From: erubow Date: Fri, 22 Jun 2007 09:25:59 +0000 (+0000) Subject: Added: broadcast and wait function, leader discovery, initialization of X-Git-Tag: preEdgeChange~544 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=a90b1c453d6cbf1a075fa3fdbf4c30ee9a8ffca0;p=IRC.git Added: broadcast and wait function, leader discovery, initialization of host and block data, request and transfer of host and block data from leader, also allow for hard-coding of OID locations: #define SIMPLE_DHT, set IP addresses according to configuration. --- diff --git a/Robust/src/Runtime/DSTM/interface/dht.c b/Robust/src/Runtime/DSTM/interface/dht.c index 17b9dafd..2a726f50 100644 --- a/Robust/src/Runtime/DSTM/interface/dht.c +++ b/Robust/src/Runtime/DSTM/interface/dht.c @@ -1,3 +1,46 @@ +#include "dht.h" + +#ifdef SIMPLE_DHT + +#include + +#define NUM_HOSTS 4 +#define OIDS_PER_HOST 0x40000000 + +//set these to your IP addresses +unsigned int hosts[NUM_HOSTS] = { + 0xc0a802c8, + 0xc0a802c9, + 0xc0a802ca, + 0xc0a802cb, +}; + +//does nothing +void dhtInit(unsigned int maxKeyCapaciy) +{ return;} + +//does nothing +void dhtExit() +{ return;} + +//does nothing, returns 0 +int dhtInsert(unsigned int key, unsigned int val) +{ return 0;} + +//does nothing, returns 0 +int dhtRemove(unsigned int key) +{ return 0;} + +//returns 0 if successful and copies val into *val, +// 1 if key not found, -1 if an error occurred +int dhtSearch(unsigned int key, unsigned int *val) +{ + *val = hosts[key / OIDS_PER_HOST]; + return 0; +} + +#else + #include #include #include @@ -13,7 +56,6 @@ #include #include #include -#include "dht.h" #include "clookup.h" //this works for now, do we need anything better? #define BUFFER_SIZE 512 //maximum message size @@ -25,16 +67,27 @@ #define INIT_HOST_ALLOC 16 #define INIT_BLOCK_NUM 64 #define DEFAULT_INTERFACE "eth0" +#define DHT_LOG "dht.log" enum { - INSERT_COMMAND, - REMOVE_COMMAND, - SEARCH_COMMAND, - FIND_LEADER_COMMAND, - INSERT_RESPONSE, - REMOVE_RESPONSE, - SEARCH_RESPONSE, - FIND_LEADER_RESPONSE + INSERT_CMD, + INSERT_RES, + REMOVE_CMD, + REMOVE_RES, + SEARCH_CMD, + SEARCH_RES, + FIND_LEADER_CMD, + FIND_LEADER_RES, + + REBUILD_REQ, + REBUILD_CMD, + JOIN_REQ, + JOIN_RES, + DHT_INFO_CMD, + DHT_INFO_RES, + FILL_DHT_CMD, + FILL_DHT_RES, + REBUILD_DONE_INFO }; @@ -55,33 +108,39 @@ struct hostData { }; struct insertCmd { - unsigned int msgType; + unsigned int msgType:8; + unsigned int unused:24; unsigned int key; unsigned int val; }; struct removeCmd { - unsigned int msgType; + unsigned int msgType:8; + unsigned int unused:24; unsigned int key; }; struct searchCmd { - unsigned int msgType; + unsigned int msgType:8; + unsigned int unused:24; unsigned int key; }; struct insertRes { - unsigned int msgType; + unsigned int msgType:8; + unsigned int unused:24; unsigned int status; }; struct removeRes { - unsigned int msgType; + unsigned int msgType:8; + unsigned int unused:24; unsigned int status; }; struct searchRes { - unsigned int msgType; + unsigned int msgType:8; + unsigned int unused:24; unsigned int status; unsigned int val; }; @@ -89,56 +148,87 @@ struct searchRes { //TODO: leave message, rebuild message... +FILE *logfile; +unsigned int leader; //ip address of leader struct hostData myHostData; +/*----DHT data----*/ unsigned int numHosts; struct hostData *hostArray; -unsigned int hostArraySize; unsigned int numBlocks; unsigned int *blockOwnerArray; -unsigned int blockOwnerArraySize; +/*----end DHT data----*/ +//return my IP address unsigned int getMyIpAddr(); +//sends broadcast to discover leader +unsigned int getLeadersIpAddr(); +//UDP server void *udpListen(); +//TCP server void *tcpListen(); +//TCP connection handler void *tcpAccept(void *); //returns number of bytes received in resBuffer, or -1 if an error occurred -int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries); -int sendNoWait(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen); +int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, + void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize, + unsigned int timeout, unsigned int numRetries); +//returns number of bytes received in resBuffer, or -1 if an error occurred +int udpBroadcastWaitForResponse(unsigned int *reply_ip, + unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer, + unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries); +//just UDP it +int sendNoWait(unsigned int dest_ip, unsigned short dest_port, void *msg, + unsigned int msglen); +//right now this hashes the key into a block and returns the block owner unsigned int getKeyOwner(unsigned int key); +//simple hash unsigned int hash(unsigned int x); +//initiates TCP connection with leader, gets DHT data +int getDHTdata(); +//outputs readable DHT data to outfile +void writeDHTdata(FILE *outfile); void dhtInit(unsigned int maxKeyCapacity) { unsigned int myMessage; int bytesReceived; int i; + int ret; + +#ifdef DHT_LOG + logfile = fopen(DHT_LOG, "w"); +#endif myHostData.ipAddr = getMyIpAddr(); myHostData.maxKeyCapacity = maxKeyCapacity; - - - //announce presence (udp broadcast), get data structures from leader (leader initiates tcp transfer) - + numHosts = numBlocks = 0; + hostArray = NULL; + blockOwnerArray = NULL; -//if no response, I am the first + leader = getLeadersIpAddr(); - numHosts = 1; - hostArray = malloc(INIT_HOST_ALLOC * sizeof(struct hostData)); - hostArray[0] = myHostData; + if (leader == 0) + { //no response: I am the first + leader = getMyIpAddr(); - numBlocks = INIT_BLOCK_NUM; - blockOwnerArray = malloc(numBlocks * sizeof(unsigned short)); - for (i = 0; i < numBlocks; i++) + numHosts = 1; + hostArray = calloc(numHosts, sizeof(struct hostData)); + hostArray[0] = myHostData; + numBlocks = INIT_BLOCK_NUM; + blockOwnerArray = calloc(numBlocks, sizeof(unsigned int)); + for (i = 0; i < numBlocks; i++) + blockOwnerArray[i] = 0; + } + else { - blockOwnerArray[i] = 0; + //get DHT data from leader + ret = getDHTdata(); + + //TODO: actually, just initiate a rebuild here instead } - - //otherwise, scan array and choose blocks to take over - //get data from hosts that own those blocks (tcp), fill hash table - //notify (the leader or everybody?) of ownership changes - - //start server (udp) + + //start servers pthread_t threadUdpListen, threadTcpListen; pthread_create(&threadUdpListen, NULL, udpListen, NULL); pthread_create(&threadTcpListen, NULL, tcpListen, NULL); @@ -148,7 +238,7 @@ void dhtInit(unsigned int maxKeyCapacity) void dhtExit() { - + fclose(logfile); } int dhtInsert(unsigned int key, unsigned int val) @@ -158,14 +248,16 @@ int dhtInsert(unsigned int key, unsigned int val) struct insertRes response; int bytesReceived; - myMessage.msgType = INSERT_COMMAND; + myMessage.msgType = INSERT_CMD; myMessage.key = key; myMessage.val = val; - bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage, sizeof(struct insertCmd), (void *)&response, sizeof(struct insertRes), TIMEOUT_MS, MAX_RETRIES); + bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage, + sizeof(struct insertCmd), (void *)&response, sizeof(struct insertRes), + TIMEOUT_MS, MAX_RETRIES); if (bytesReceived == sizeof(struct insertRes)) { - if (response.msgType == INSERT_RESPONSE) + if (response.msgType == INSERT_RES) { if (response.status == INSERT_OK) return 0; @@ -183,13 +275,15 @@ int dhtRemove(unsigned int key) struct removeRes response; int bytesReceived; - myMessage.msgType = REMOVE_COMMAND; + myMessage.msgType = REMOVE_CMD; myMessage.key = key; - bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage, sizeof(struct removeCmd), (void *)&response, sizeof(struct removeRes), TIMEOUT_MS, MAX_RETRIES); + bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage, + sizeof(struct removeCmd), (void *)&response, sizeof(struct removeRes), + TIMEOUT_MS, MAX_RETRIES); if (bytesReceived == sizeof(struct removeRes)) { - if (response.msgType == REMOVE_RESPONSE) + if (response.msgType == REMOVE_RES) { if (response.status == REMOVE_OK) return 0; @@ -207,13 +301,15 @@ int dhtSearch(unsigned int key, unsigned int *val) struct searchRes response; int bytesReceived; - myMessage.msgType = SEARCH_COMMAND; + myMessage.msgType = SEARCH_CMD; myMessage.key = key; - bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage, sizeof(struct searchCmd), (void *)&response, sizeof(struct searchRes), TIMEOUT_MS, MAX_RETRIES); + bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage, + sizeof(struct searchCmd), (void *)&response, sizeof(struct searchRes), + TIMEOUT_MS, MAX_RETRIES); if (bytesReceived == sizeof(struct searchRes)) { - if (response.msgType == SEARCH_RESPONSE) + if (response.msgType == SEARCH_RES) { if (response.status == KEY_FOUND) { @@ -255,7 +351,7 @@ void *udpListen() if ((sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) { - perror("socket()"); + perror("udpListen():socket()"); pthread_exit(NULL); } @@ -266,42 +362,62 @@ void *udpListen() if (bind(sock, (struct sockaddr *)&myAddr, socklen) == -1) { - perror("bind()"); + perror("udpListen():bind()"); pthread_exit(NULL); } -// printf("listening...\n"); +#ifdef DHT_LOG + fprintf(logfile,"udpListen(): listening on port %d\n", UDP_PORT); + fflush(logfile); +#endif while(1) { - if ((bytesReceived = recvfrom(sock, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&clientAddr, &socklen)) == -1) + if ((bytesReceived = recvfrom(sock, buffer, BUFFER_SIZE, 0, + (struct sockaddr *)&clientAddr, &socklen)) == -1) { - perror("recvfrom()"); + perror("udpListen():recvfrom()"); break; } if (bytesReceived == 0) { - printf("recvfrom() returned 0\n"); +#ifdef DHT_LOG + fprintf(logfile,"udpListen(): recvfrom() returned 0\n"); + fflush(logfile); +#endif break; } gettimeofday(&now, NULL); -// printf("message received:%ds,%dus\n", now.tv_sec, now.tv_usec); +#ifdef DHT_LOG + fprintf(logfile,"udpListen(): message received:%ds,%dus\n", now.tv_sec, + now.tv_usec); + fprintf(logfile,"udpListen(): received %d bytes from %s:%d\n", + bytesReceived,inet_ntoa(clientAddr.sin_addr),htons(clientAddr.sin_port)); + fflush(logfile); +#endif -// printf("Received %d bytes from %x:%d\n", bytesReceived, clientAddr.sin_addr.s_addr, clientAddr.sin_port); switch (buffer[0]) { - case INSERT_COMMAND: + case INSERT_CMD: if (bytesReceived != sizeof(struct insertCmd)) { - printf("error: incorrect message size\n"); +#ifdef DHT_LOG + fprintf(logfile, "udpListen(): ERROR: incorrect message size\n"); + fflush(logfile); +#endif break; } insertCmdPtr = (struct insertCmd *)buffer; -// printf("Insert: key=%d, val=%d\n", insertCmdPtr->key, insertCmdPtr->val); +#ifdef DHT_LOG + fprintf(logfile, "udpListen(): Insert: key=%d, val=%d\n", + insertCmdPtr->key, insertCmdPtr->val); + fflush(logfile); +#endif insertResPtr = (struct insertRes *)replyBuffer; - insertResPtr->msgType = INSERT_RESPONSE; + insertResPtr->msgType = INSERT_RES; if (getKeyOwner(insertCmdPtr->key) == myHostData.ipAddr) { //note: casting val to void * in order to conform to API - if(chashInsert(myHashTable, insertCmdPtr->key, (void *)insertCmdPtr->val) == 0) + if(chashInsert(myHashTable, insertCmdPtr->key, + (void *)insertCmdPtr->val) == 0) insertResPtr->status = INSERT_OK; else insertResPtr->status = INSERT_ERROR; @@ -310,18 +426,28 @@ void *udpListen() { insertResPtr->status = NOT_KEY_OWNER;; } - sendto(sock, (void *)insertResPtr, sizeof(struct insertRes), 0, (struct sockaddr *)&clientAddr, socklen); + if (sendto(sock, (void *)insertResPtr, sizeof(struct insertRes), 0, + (struct sockaddr *)&clientAddr, socklen) == -1) + { + perror("udpListen():sendto()"); + } break; - case REMOVE_COMMAND: + case REMOVE_CMD: if (bytesReceived != sizeof(struct removeCmd)) { - printf("error: incorrect message size\n"); +#ifdef DHT_LOG + fprintf(logfile, "udpListen(): ERROR: incorrect message size\n"); + fflush(logfile); +#endif break; } removeCmdPtr = (struct removeCmd *)buffer; -// printf("Remove: key=%d\n", removeCmdPtr->key); +#ifdef DHT_LOG + fprintf(logfile,"udpListen(): Remove: key=%d\n", removeCmdPtr->key); + fflush(logfile); +#endif removeResPtr = (struct removeRes *)replyBuffer; - removeResPtr->msgType = REMOVE_RESPONSE; + removeResPtr->msgType = REMOVE_RES; if (getKeyOwner(removeCmdPtr->key) == myHostData.ipAddr) { //note: casting val to void * in order to conform to API @@ -334,22 +460,33 @@ void *udpListen() { removeResPtr->status = NOT_KEY_OWNER; } - sendto(sock, (void *)removeResPtr, sizeof(struct removeRes), 0, (struct sockaddr *)&clientAddr, socklen); + if (sendto(sock, (void *)removeResPtr, sizeof(struct removeRes), 0, + (struct sockaddr *)&clientAddr, socklen) == -1) + { + perror("udpListen():sendto()"); + } break; - case SEARCH_COMMAND: + case SEARCH_CMD: if (bytesReceived != sizeof(struct searchCmd)) { - printf("error: incorrect message size\n"); +#ifdef DHT_LOG + fprintf(logfile,"udpListen(): ERROR: incorrect message size\n"); + fflush(logfile); +#endif break; } searchCmdPtr = (struct searchCmd *)buffer; -// printf("Search: key=%d\n",searchCmdPtr->key); +#ifdef DHT_LOG + fprintf(logfile,"udpListen(): Search: key=%d\n",searchCmdPtr->key); + fflush(logfile); +#endif searchResPtr = (struct searchRes *)replyBuffer; - searchResPtr->msgType = SEARCH_RESPONSE; + searchResPtr->msgType = SEARCH_RES; if (getKeyOwner(searchCmdPtr->key) == myHostData.ipAddr) { //note: casting val to void * in order to conform to API - if((searchResPtr->val = (unsigned int)chashSearch(myHashTable, searchCmdPtr->key)) == 0) + if((searchResPtr->val = (unsigned int)chashSearch(myHashTable, + searchCmdPtr->key)) == 0) searchResPtr->status = KEY_NOT_FOUND; else searchResPtr->status = KEY_FOUND; @@ -358,22 +495,49 @@ void *udpListen() { searchResPtr->status = NOT_KEY_OWNER; } - sendto(sock, (void *)searchResPtr, sizeof(struct searchRes), 0, (struct sockaddr *)&clientAddr, socklen); + if (sendto(sock, (void *)searchResPtr, sizeof(struct searchRes), 0, + (struct sockaddr *)&clientAddr, socklen) == -1) + { + perror("udpListen():sendto()"); + } break; - //just ignore anything else -// default: -// printf("Unknown message type\n"); + case FIND_LEADER_CMD: + if (bytesReceived != sizeof(char)) + { +#ifdef DHT_LOG + fprintf(logfile, "udpListen(): ERROR: incorrect message size\n"); + fflush(logfile); +#endif + break; + } + if (leader == getMyIpAddr()) + { + replyBuffer[0] = FIND_LEADER_RES; + if(sendto(sock, (void *)replyBuffer, sizeof(char), 0, + (struct sockaddr *)&clientAddr, socklen) == -1) + { + perror("udpListen():sendto"); + } + } + break; + default: +#ifdef DHT_LOG + fprintf(logfile,"udpListen(): ERROR: Unknown message type\n"); + fflush(logfile); +#endif } } } -int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries) +int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, + void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize, + unsigned int timeout, unsigned int numRetries) { struct sockaddr_in server_addr; struct sockaddr_in ack_addr; socklen_t socklen = sizeof(struct sockaddr_in); struct pollfd pollsock; -// struct timeval now; + struct timeval now; int retval; int i; ssize_t bytesReceived; @@ -385,7 +549,7 @@ int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void if ((pollsock.fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) { - perror("socket()"); + perror("udpSendWaitForResponse():socket()"); return -1; } @@ -393,32 +557,127 @@ int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void for (i = 0; i < MAX_RETRIES; i++) { -// if (i > 0) -// printf("trying again, count: %d\n", i+1); - if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr, socklen) == -1) +#ifdef DHT_LOG + if (i > 0) + fprintf(logfile,"udpSendWaitForResponse(): trying again, count: %d\n", + i+1); + fflush(logfile); +#endif + if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr, + socklen) == -1) { - perror("sendto"); + perror("udpSendWaitForResponse():sendto"); return -1; } -// gettimeofday(&now, NULL); -// printf("message sent:%ds,%dus\n", now.tv_sec, now.tv_usec); +#ifdef DHT_LOG + gettimeofday(&now, NULL); + fprintf(logfile,"udpSendWaitForResponse(): message sent:%ds,%dus\n", + now.tv_sec, now.tv_usec); + fflush(logfile); +#endif retval = poll(&pollsock, 1, timeout); if (retval !=0) { - bytesReceived = recvfrom(pollsock.fd, resBuffer, resBufferSize, 0, (struct sockaddr *)&ack_addr, &socklen); + bytesReceived = recvfrom(pollsock.fd, resBuffer, resBufferSize, 0, + (struct sockaddr *)&ack_addr, &socklen); if ((ack_addr.sin_addr.s_addr == server_addr.sin_addr.s_addr) && (ack_addr.sin_port == server_addr.sin_port)) { close(pollsock.fd); -// gettimeofday(&now, NULL); -// printf("received response:%ds,%dus\n", now.tv_sec, now.tv_usec); +#ifdef DHT_LOG + gettimeofday(&now, NULL); + fprintf(logfile,"udpSendWaitForResponse(): received response:%ds,%dus\n", now.tv_sec, now.tv_usec); + fflush(logfile); +#endif return bytesReceived; } } } close(pollsock.fd); -// gettimeofday(&now, NULL); -// printf("timed out, no ack:%ds,%dus\n", now.tv_sec, now.tv_usec); +#ifdef DHT_LOG + gettimeofday(&now, NULL); + printf("udpSendWaitForResponse(): timed out, no ack:%ds,%dus\n", + now.tv_sec, now.tv_usec); + fflush(logfile); +#endif + return -1; +} + +int udpBroadcastWaitForResponse(unsigned int *reply_ip, + unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer, + unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries) +{ + struct sockaddr_in server_addr; + struct sockaddr_in ack_addr; + socklen_t socklen = sizeof(struct sockaddr_in); + struct pollfd pollsock; + struct timeval now; + int retval; + int i; + ssize_t bytesReceived; + int on; + + bzero((char *) &server_addr, sizeof(server_addr)); + server_addr.sin_family = AF_INET; + server_addr.sin_port = htons(dest_port); + server_addr.sin_addr.s_addr = htonl(0xFFFFFFFF); + + if ((pollsock.fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) + { + perror("udpBroadcastWaitForResponse():socket()"); + return -1; + } + + on = 1; + if (setsockopt(pollsock.fd, SOL_SOCKET, SO_BROADCAST, &on, sizeof(on)) == -1) + { + perror("udpBroadcastWaitForResponse():setsockopt()"); + return -1; + } + + pollsock.events = POLLIN; + + for (i = 0; i < MAX_RETRIES; i++) + { +#ifdef DHT_LOG + if (i > 0) + fprintf(logfile,"udpBroadcastWaitForResponse(): trying again, count: %d\n", i+1); + fflush(logfile); +#endif + if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr, + socklen) == -1) + { + perror("udpBroadcastWaitForResponse():sendto()"); + return -1; + } +#ifdef DHT_LOG + gettimeofday(&now, NULL); + fprintf(logfile,"udpBroadcastWaitForResponse(): message sent:%ds,%dus\n", + now.tv_sec, now.tv_usec); + fflush(logfile); +#endif + retval = poll(&pollsock, 1, timeout); + if (retval !=0) + { + bytesReceived = recvfrom(pollsock.fd, resBuffer, resBufferSize, 0, + (struct sockaddr *)&ack_addr, &socklen); + close(pollsock.fd); + *reply_ip = htonl(ack_addr.sin_addr.s_addr); +#ifdef DHT_LOG + gettimeofday(&now, NULL); + fprintf(logfile,"udpBroadcastWaitForResponse(): received response:%ds,%dus\n", now.tv_sec, now.tv_usec); + fflush(logfile); +#endif + return bytesReceived; + } + } + close(pollsock.fd); +#ifdef DHT_LOG + gettimeofday(&now, NULL); + fprintf(logfile,"udpBroadcastWaitForResponse(): timed out, no ack:%ds,%dus\n", + now.tv_sec, now.tv_usec); + fflush(logfile); +#endif return -1; } @@ -434,7 +693,7 @@ void *tcpListen() sockListen = socket(AF_INET, SOCK_STREAM, 0); if (sockListen == -1) { - perror("socket()"); + perror("tcpListen():socket()"); pthread_exit(NULL); } @@ -445,16 +704,21 @@ void *tcpListen() if (bind(sockListen, (struct sockaddr *)&myAddr, socklen) == -1) { - perror("socket()"); + perror("tcpListen():socket()"); pthread_exit(NULL); } if (listen(sockListen, BACKLOG) == -1) { - perror("listen()"); + perror("tcpListen():listen()"); pthread_exit(NULL); } +#ifdef DHT_LOG + fprintf(logfile,"tcpListen(): listening on port %d\n", TCP_PORT); + fflush(logfile); +#endif + while(1) { sockAccept = accept(sockListen, (struct sockaddr *)&clientAddr, &socklen); @@ -465,17 +729,72 @@ void *tcpListen() void *tcpAccept(void *arg) { int sockAccept = (int)arg; - - printf("accepted tcp connection, file descriptor: %d\n", sockAccept); + int bytesReceived; + char msgType; - sleep(30); +#ifdef DHT_LOG + fprintf(logfile, "tcpAccept(): accepted tcp connection, file descriptor: %d\n", sockAccept); + fflush(logfile); +#endif + + bytesReceived = recv(sockAccept, &msgType, sizeof(char), 0); + if (bytesReceived == -1) + { + perror("tcpAccept():recv()"); + } + else if (bytesReceived == 0) + { +#ifdef DHT_LOG + fprintf(logfile, "tcpAccept(): bytesReceived = 0\n", sockAccept); + fflush(logfile); +#endif + } + else + { + switch (msgType) + { + case DHT_INFO_CMD: + if (send(sockAccept, &numHosts, sizeof(numHosts), 0) == -1) + { + perror("tcpAccept():send()"); + break; + } + if (send(sockAccept, &numBlocks, sizeof(numBlocks), 0) == -1) + { + perror("tcpAccept():send()"); + break; + } + if (send(sockAccept, hostArray, numHosts*sizeof(struct hostData), + 0) == -1) + { + perror("tcpAccept():send()"); + break; + } + if (send(sockAccept, blockOwnerArray, numBlocks*sizeof(unsigned int), + 0) == -1) + { + perror("tcpAccept():send()"); + break; + } + break; + default: +#ifdef DHT_LOG + fprintf(logfile, "tcpAccept(): unrecognized msg type\n"); + fflush(logfile); +#endif + } + } if (close(sockAccept) == -1) { - perror("close()"); + perror("tcpAccept():close()"); } - printf("closed tcp connection, file descriptor: %d\n", sockAccept); +#ifdef DHT_LOG + fprintf(logfile, "tcpAccept(): closed tcp connection, file descriptor: %d\n", + sockAccept); + fflush(logfile); +#endif pthread_exit(NULL); } @@ -495,7 +814,7 @@ unsigned int getMyIpAddr() if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - perror("socket()"); + perror("getMyIpAddr():socket()"); return 1; } @@ -504,15 +823,197 @@ unsigned int getMyIpAddr() if(ioctl(sock, SIOCGIFADDR, &interfaceInfo) != 0) { - perror("ioctl()"); + perror("getMyIpAddr():ioctl()"); return 1; } return ntohl(myAddr->sin_addr.s_addr); } +unsigned int getLeadersIpAddr() +{ + unsigned int reply_ip; + int bytesReceived; + char myMessage; + char response; + +#ifdef DHT_LOG + fprintf(logfile, "getLeadersIpAddr(): broadcasting...\n"); + fflush(logfile); +#endif + + myMessage = FIND_LEADER_CMD; + + bytesReceived = udpBroadcastWaitForResponse(&reply_ip, UDP_PORT, + (void *)&myMessage, sizeof(myMessage), (void *)&response, + sizeof(response), TIMEOUT_MS, MAX_RETRIES); + + if (bytesReceived == -1) + { +#ifdef DHT_LOG + fprintf(logfile, "getLeadersIpAddr(): no response\n"); + fflush(logfile); +#endif + return 0; + } + else if (response == FIND_LEADER_RES) + { +#ifdef DHT_LOG + struct in_addr reply_addr; + reply_addr.s_addr = htonl(reply_ip); + fprintf(logfile, "getLeadersIpAddr(): leader found:%s\n", + inet_ntoa(reply_addr)); + fflush(logfile); +#endif + return reply_ip; + } + else + { +#ifdef DHT_LOG + fprintf(logfile, "getLeadersIpAddr(): unexpected response\n"); + fflush(logfile); +#endif + return 0; + } +} + +int getDHTdata() +{ + struct sockaddr_in leader_addr; + int sock; + char msg; + int bytesReceived; + + if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) + { + perror("getDHTdata():socket()"); + return -1; + } + + bzero((char *)&leader_addr, sizeof(leader_addr)); + leader_addr.sin_family = AF_INET; + leader_addr.sin_port = htons(TCP_PORT); + leader_addr.sin_addr.s_addr = htonl(leader); + + if (connect(sock, (struct sockaddr *)&leader_addr, sizeof(leader_addr)) == -1) + { + perror("getDHTdata():connect()"); + close(sock); + return -1; + } + msg = DHT_INFO_CMD; + if (send(sock, &msg, sizeof(char), 0) == -1) + { + perror("getDHTdata():send()"); + close(sock); + return -1; + } + bytesReceived = recv(sock, &numHosts, sizeof(numHosts), 0); + if (bytesReceived == -1) + { + perror("getDHTdata():recv()"); + close(sock); + return -1; + } + if (bytesReceived != sizeof(numHosts)) + { +#ifdef DHT_LOG + fprintf(logfile,"getDHTdata(): ERROR: numHosts not completely received\n"); + fflush(logfile); + close(sock); + return -1; +#endif + } + bytesReceived = recv(sock, &numBlocks, sizeof(numBlocks), 0); + if (bytesReceived == -1) + { + perror("getDHTdata():recv()"); + close(sock); + return -1; + } + if (bytesReceived != sizeof(numBlocks)) + { +#ifdef DHT_LOG + fprintf(logfile,"getDHTdata(): ERROR: numBlocks not completely received\n"); + fflush(logfile); + close(sock); + return -1; +#endif + } + if (hostArray != NULL) + free(hostArray); + hostArray = calloc(numHosts, sizeof(struct hostData)); + bytesReceived = recv(sock, hostArray, numHosts*sizeof(struct hostData), 0); + if (bytesReceived == -1) + { + perror("getDHTdata():recv()"); + close(sock); + return -1; + } + if (bytesReceived != numHosts*sizeof(struct hostData)) + { +#ifdef DHT_LOG + fprintf(logfile,"getDHTdata(): ERROR: hostArray not completely received\n"); + fflush(logfile); + close(sock); + return -1; +#endif + } + if (blockOwnerArray != NULL) + free(blockOwnerArray); + blockOwnerArray = calloc(numBlocks, sizeof(unsigned int)); + bytesReceived = recv(sock, blockOwnerArray, numBlocks*sizeof(unsigned int), 0); + if (bytesReceived == -1) + { + perror("getDHTdata():recv()"); + close(sock); + return -1; + } + if (bytesReceived != numBlocks*sizeof(unsigned int)) + { +#ifdef DHT_LOG + fprintf(logfile,"getDHTdata(): ERROR: blockOwnerArray not completely received\n"); + fflush(logfile); + close(sock); + return -1; +#endif + } +#ifdef DHT_LOG + fprintf(logfile,"getDHTdata(): got data:\n"); + writeDHTdata(logfile); + fflush(logfile); +#endif + return 0; +} + unsigned int hash(unsigned int x) { return x % numBlocks; } +void leadRebuild() +{ + +} + +void writeDHTdata(FILE *outfile) +{ + int i; + struct in_addr address; + fprintf(outfile,"numHosts=%d,numBlocks=%d\n", numHosts, numBlocks); + fprintf(outfile,"hostArray: index: ipAddr, maxKeyCapacity\n"); + for (i = 0; i < numHosts; i++) + { + address.s_addr = htonl(hostArray[i].ipAddr); + fprintf(outfile,"%d: %s, %d\n", i, inet_ntoa(address), + hostArray[i].maxKeyCapacity); + } + fprintf(outfile,"blockOwnerArray: index: blockOwner\n"); + for (i = 0; i < numBlocks; i++) + { + fprintf(outfile,"%d: %d\n", i, blockOwnerArray[i]); + } +} + +#endif + diff --git a/Robust/src/Runtime/DSTM/interface/dht.h b/Robust/src/Runtime/DSTM/interface/dht.h index 886c7871..ceeb2a0a 100644 --- a/Robust/src/Runtime/DSTM/interface/dht.h +++ b/Robust/src/Runtime/DSTM/interface/dht.h @@ -1,6 +1,8 @@ #ifndef _DHT_H #define _DHT_H +//#define SIMPLE_DHT + #define DHT_NO_KEY_LIMIT 0xFFFFFFFF //called by host which joins (or starts) the system @@ -14,7 +16,8 @@ void dhtExit(); int dhtInsert(unsigned int key, unsigned int val); //returns 0 if successful, -1 if an error occurred int dhtRemove(unsigned int key); -//returns 0 if successful and copies val into *val, 1 if key not found, -1 if an error occurred +//returns 0 if successful and copies val into *val, +// 1 if key not found, -1 if an error occurred int dhtSearch(unsigned int key, unsigned int *val); #endif diff --git a/Robust/src/Runtime/DSTM/interface/testdht.c b/Robust/src/Runtime/DSTM/interface/testdht.c index dbff2265..636b4dd4 100644 --- a/Robust/src/Runtime/DSTM/interface/testdht.c +++ b/Robust/src/Runtime/DSTM/interface/testdht.c @@ -18,16 +18,11 @@ int main() localHash = chashCreate(HASH_SIZE, LOADFACTOR); srandom(time(0)); - for (key = 0; key < NUM_ITEMS; key++) + for (key = 1; key < NUM_ITEMS; key++) { vals[key] = random(); } - vals[NUM_ITEMS / 2] = 0; - vals[NUM_ITEMS / 3] = 1; - vals[NUM_ITEMS / 4] = 2; - vals[NUM_ITEMS / 5] = 0xFFFFFFFF; - printf("testing dhtInsert() and dhtSearch()\n"); for (key = 0; key < NUM_ITEMS; key++) @@ -36,7 +31,7 @@ int main() } error = 0; - for (key = 0; key < NUM_ITEMS; key++) + for (key = 1; key < NUM_ITEMS; key++) { retval = dhtSearch(key, &val); if (retval == 1) @@ -66,13 +61,13 @@ int main() printf("(this currently fails if key = 0 OR val = 0, due to underlying hash table)\n"); printf("testing underlying hash table (clookup.h)\n"); - for (key = 0; key < NUM_ITEMS; key++) + for (key = 1; key < NUM_ITEMS; key++) { chashInsert(localHash, key, (void *)vals[key]); } error = 0; - for (key = 0; key < NUM_ITEMS; key++) + for (key = 1; key < NUM_ITEMS; key++) { val = (unsigned int)chashSearch(localHash, key); if ((void *)val == NULL) @@ -153,7 +148,9 @@ int main() else printf("one or more errors occurred\n"); - sleep(60); + sleep(5); + + dhtExit(); return 0; }