Added: broadcast and wait function, leader discovery, initialization of
authorerubow <erubow>
Fri, 22 Jun 2007 09:25:59 +0000 (09:25 +0000)
committererubow <erubow>
Fri, 22 Jun 2007 09:25:59 +0000 (09:25 +0000)
host and block data, request and transfer of host and block data from
leader, also allow for hard-coding of OID locations: #define SIMPLE_DHT,
set IP addresses according to configuration.

Robust/src/Runtime/DSTM/interface/dht.c
Robust/src/Runtime/DSTM/interface/dht.h
Robust/src/Runtime/DSTM/interface/testdht.c

index 17b9dafd1fd65469be281a0902356bb7f30e61e4..2a726f508fa6031b9d74a86d73760c30099ccd7b 100644 (file)
@@ -1,3 +1,46 @@
+#include "dht.h"
+
+#ifdef SIMPLE_DHT
+
+#include <arpa/inet.h>
+
+#define NUM_HOSTS 4
+#define OIDS_PER_HOST 0x40000000
+
+//set these to your IP addresses
+unsigned int hosts[NUM_HOSTS] = {
+       0xc0a802c8,
+       0xc0a802c9,
+       0xc0a802ca,
+       0xc0a802cb,
+};
+
+//does nothing
+void dhtInit(unsigned int maxKeyCapaciy)
+{      return;}
+
+//does nothing
+void dhtExit()
+{      return;}
+
+//does nothing, returns 0
+int dhtInsert(unsigned int key, unsigned int val)
+{      return 0;}
+
+//does nothing, returns 0
+int dhtRemove(unsigned int key)
+{      return 0;}
+
+//returns 0 if successful and copies val into *val,
+// 1 if key not found, -1 if an error occurred
+int dhtSearch(unsigned int key, unsigned int *val)
+{
+       *val = hosts[key / OIDS_PER_HOST];
+       return 0;
+}
+
+#else
+
 #include <netinet/in.h>
 #include <arpa/inet.h>
 #include <sys/types.h>
@@ -13,7 +56,6 @@
 #include <netdb.h>
 #include <net/if.h>
 #include <linux/sockios.h>
-#include "dht.h"
 #include "clookup.h" //this works for now, do we need anything better?
 
 #define BUFFER_SIZE 512 //maximum message size
 #define INIT_HOST_ALLOC 16
 #define INIT_BLOCK_NUM 64
 #define DEFAULT_INTERFACE "eth0"
+#define DHT_LOG "dht.log"
 
 enum {
-       INSERT_COMMAND,
-       REMOVE_COMMAND,
-       SEARCH_COMMAND,
-       FIND_LEADER_COMMAND,
-       INSERT_RESPONSE,
-       REMOVE_RESPONSE,
-       SEARCH_RESPONSE,
-       FIND_LEADER_RESPONSE
+       INSERT_CMD,
+       INSERT_RES,
+       REMOVE_CMD,
+       REMOVE_RES,
+       SEARCH_CMD,
+       SEARCH_RES,
+       FIND_LEADER_CMD,
+       FIND_LEADER_RES,
+
+       REBUILD_REQ,
+       REBUILD_CMD,
+       JOIN_REQ,
+       JOIN_RES,
+       DHT_INFO_CMD,
+       DHT_INFO_RES,
+       FILL_DHT_CMD,
+       FILL_DHT_RES,
+       REBUILD_DONE_INFO
 };
 
 
@@ -55,33 +108,39 @@ struct hostData {
 };
 
 struct insertCmd {
-       unsigned int msgType;
+       unsigned int msgType:8;
+       unsigned int unused:24;
        unsigned int key;
        unsigned int val;
 };
 
 struct removeCmd {
-       unsigned int msgType;
+       unsigned int msgType:8;
+       unsigned int unused:24;
        unsigned int key;
 };
 
 struct searchCmd {
-       unsigned int msgType;
+       unsigned int msgType:8;
+       unsigned int unused:24;
        unsigned int key;
 };
 
 struct insertRes {
-       unsigned int msgType;
+       unsigned int msgType:8;
+       unsigned int unused:24;
        unsigned int status;
 };
 
 struct removeRes {
-       unsigned int msgType;
+       unsigned int msgType:8;
+       unsigned int unused:24;
        unsigned int status;
 };
 
 struct searchRes {
-       unsigned int msgType;
+       unsigned int msgType:8;
+       unsigned int unused:24;
        unsigned int status;
        unsigned int val;
 };
@@ -89,56 +148,87 @@ struct searchRes {
 
 //TODO: leave message, rebuild message...
 
+FILE *logfile;
+unsigned int leader; //ip address of leader
 struct hostData myHostData;
+/*----DHT data----*/
 unsigned int numHosts;
 struct hostData *hostArray;
-unsigned int hostArraySize;
 unsigned int numBlocks;
 unsigned int *blockOwnerArray;
-unsigned int blockOwnerArraySize;
+/*----end DHT data----*/
 
+//return my IP address
 unsigned int getMyIpAddr();
+//sends broadcast to discover leader
+unsigned int getLeadersIpAddr();
+//UDP server
 void *udpListen();
+//TCP server
 void *tcpListen();
+//TCP connection handler
 void *tcpAccept(void *);
 //returns number of bytes received in resBuffer, or -1 if an error occurred
-int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries);
-int sendNoWait(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen);
+int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port,
+       void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize,
+       unsigned int timeout, unsigned int numRetries);
+//returns number of bytes received in resBuffer, or -1 if an error occurred
+int udpBroadcastWaitForResponse(unsigned int *reply_ip,
+       unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer,
+       unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries);
+//just UDP it
+int sendNoWait(unsigned int dest_ip, unsigned short dest_port, void *msg,
+       unsigned int msglen);
+//right now this hashes the key into a block and returns the block owner
 unsigned int getKeyOwner(unsigned int key);
+//simple hash
 unsigned int hash(unsigned int x);
+//initiates TCP connection with leader, gets DHT data
+int getDHTdata();
+//outputs readable DHT data to outfile
+void writeDHTdata(FILE *outfile);
 
 void dhtInit(unsigned int maxKeyCapacity)
 {
        unsigned int myMessage;
        int bytesReceived;
        int i;
+       int ret;
+
+#ifdef DHT_LOG
+       logfile = fopen(DHT_LOG, "w");
+#endif
 
        myHostData.ipAddr = getMyIpAddr();
        myHostData.maxKeyCapacity = maxKeyCapacity;
 
-       
-
-       //announce presence (udp broadcast), get data structures from leader (leader initiates tcp transfer)
-       
+       numHosts = numBlocks = 0;
+       hostArray = NULL;
+       blockOwnerArray = NULL;
 
-//if no response, I am the first
+       leader = getLeadersIpAddr();
 
-       numHosts = 1;
-       hostArray = malloc(INIT_HOST_ALLOC * sizeof(struct hostData));
-       hostArray[0] = myHostData;
+       if (leader == 0)
+       { //no response: I am the first
+               leader = getMyIpAddr();
 
-       numBlocks = INIT_BLOCK_NUM;
-       blockOwnerArray = malloc(numBlocks * sizeof(unsigned short));
-       for (i = 0; i < numBlocks; i++)
+               numHosts = 1;
+               hostArray = calloc(numHosts, sizeof(struct hostData));
+               hostArray[0] = myHostData;
+               numBlocks = INIT_BLOCK_NUM;
+               blockOwnerArray = calloc(numBlocks, sizeof(unsigned int));
+               for (i = 0; i < numBlocks; i++)
+                       blockOwnerArray[i] = 0;
+       }
+       else
        {
-               blockOwnerArray[i] = 0;
+               //get DHT data from leader
+               ret = getDHTdata();
+
+               //TODO: actually, just initiate a rebuild here instead
        }
-       
-       //otherwise, scan array and choose blocks to take over
-       //get data from hosts that own those blocks (tcp), fill hash table
-       //notify (the leader or everybody?) of ownership changes
-       
-       //start server (udp)
+
+       //start servers
        pthread_t threadUdpListen, threadTcpListen;
        pthread_create(&threadUdpListen, NULL, udpListen, NULL);
        pthread_create(&threadTcpListen, NULL, tcpListen, NULL);
@@ -148,7 +238,7 @@ void dhtInit(unsigned int maxKeyCapacity)
 
 void dhtExit()
 {
-
+       fclose(logfile);
 }
 
 int dhtInsert(unsigned int key, unsigned int val)
@@ -158,14 +248,16 @@ int dhtInsert(unsigned int key, unsigned int val)
        struct insertRes response;
        int bytesReceived;
 
-       myMessage.msgType = INSERT_COMMAND;
+       myMessage.msgType = INSERT_CMD;
        myMessage.key = key;
        myMessage.val = val;
        
-       bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage, sizeof(struct insertCmd), (void *)&response, sizeof(struct insertRes), TIMEOUT_MS, MAX_RETRIES);
+       bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage,
+               sizeof(struct insertCmd), (void *)&response, sizeof(struct insertRes),
+               TIMEOUT_MS, MAX_RETRIES);
        if (bytesReceived == sizeof(struct insertRes))
        {
-               if (response.msgType == INSERT_RESPONSE)
+               if (response.msgType == INSERT_RES)
                {
                        if (response.status == INSERT_OK)
                                return 0;
@@ -183,13 +275,15 @@ int dhtRemove(unsigned int key)
        struct removeRes response;
        int bytesReceived;
        
-       myMessage.msgType = REMOVE_COMMAND;
+       myMessage.msgType = REMOVE_CMD;
        myMessage.key = key;
 
-       bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage, sizeof(struct removeCmd), (void *)&response, sizeof(struct removeRes), TIMEOUT_MS, MAX_RETRIES);
+       bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage,
+               sizeof(struct removeCmd), (void *)&response, sizeof(struct removeRes),
+               TIMEOUT_MS, MAX_RETRIES);
        if (bytesReceived == sizeof(struct removeRes))
        {
-               if (response.msgType == REMOVE_RESPONSE)
+               if (response.msgType == REMOVE_RES)
                {
                        if (response.status == REMOVE_OK)
                                return 0;
@@ -207,13 +301,15 @@ int dhtSearch(unsigned int key, unsigned int *val)
        struct searchRes response;
        int bytesReceived;
 
-       myMessage.msgType = SEARCH_COMMAND;
+       myMessage.msgType = SEARCH_CMD;
        myMessage.key = key;
 
-       bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage, sizeof(struct searchCmd), (void *)&response, sizeof(struct searchRes), TIMEOUT_MS, MAX_RETRIES);
+       bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage,
+               sizeof(struct searchCmd), (void *)&response, sizeof(struct searchRes),
+               TIMEOUT_MS, MAX_RETRIES);
        if (bytesReceived == sizeof(struct searchRes))
        {
-               if (response.msgType == SEARCH_RESPONSE)
+               if (response.msgType == SEARCH_RES)
                {
                        if (response.status == KEY_FOUND)
                        {
@@ -255,7 +351,7 @@ void *udpListen()
 
        if ((sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
        {
-               perror("socket()");
+               perror("udpListen():socket()");
                pthread_exit(NULL);
        }
        
@@ -266,42 +362,62 @@ void *udpListen()
 
        if (bind(sock, (struct sockaddr *)&myAddr, socklen) == -1)
        {
-               perror("bind()");
+               perror("udpListen():bind()");
                pthread_exit(NULL);
        }
-//     printf("listening...\n");
+#ifdef DHT_LOG
+       fprintf(logfile,"udpListen(): listening on port %d\n", UDP_PORT);
+       fflush(logfile);
+#endif
        while(1)
        {
-               if ((bytesReceived = recvfrom(sock, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&clientAddr, &socklen)) == -1)
+               if ((bytesReceived = recvfrom(sock, buffer, BUFFER_SIZE, 0,
+                       (struct sockaddr *)&clientAddr, &socklen)) == -1)
                {
-                       perror("recvfrom()");
+                       perror("udpListen():recvfrom()");
                        break;
                }
                if (bytesReceived == 0)
                {
-                       printf("recvfrom() returned 0\n");
+#ifdef DHT_LOG
+                       fprintf(logfile,"udpListen(): recvfrom() returned 0\n");
+                       fflush(logfile);
+#endif
                        break;
                }
                gettimeofday(&now, NULL);
-//             printf("message received:%ds,%dus\n", now.tv_sec, now.tv_usec);
+#ifdef DHT_LOG
+               fprintf(logfile,"udpListen(): message received:%ds,%dus\n", now.tv_sec,
+                       now.tv_usec);
+               fprintf(logfile,"udpListen(): received %d bytes from %s:%d\n",
+                       bytesReceived,inet_ntoa(clientAddr.sin_addr),htons(clientAddr.sin_port));
+               fflush(logfile);
+#endif
 
-//             printf("Received %d bytes from %x:%d\n", bytesReceived, clientAddr.sin_addr.s_addr, clientAddr.sin_port);
                switch (buffer[0])
                {
-                       case INSERT_COMMAND:
+                       case INSERT_CMD:
                                if (bytesReceived != sizeof(struct insertCmd))
                                {
-                                       printf("error: incorrect message size\n");
+#ifdef DHT_LOG
+                                       fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
+                                       fflush(logfile);
+#endif
                                        break;
                                }
                                insertCmdPtr = (struct insertCmd *)buffer;
-//                             printf("Insert: key=%d, val=%d\n", insertCmdPtr->key, insertCmdPtr->val);
+#ifdef DHT_LOG
+                               fprintf(logfile, "udpListen(): Insert: key=%d, val=%d\n",
+                                       insertCmdPtr->key, insertCmdPtr->val);
+                               fflush(logfile);
+#endif
                                insertResPtr = (struct insertRes *)replyBuffer;
-                               insertResPtr->msgType = INSERT_RESPONSE;
+                               insertResPtr->msgType = INSERT_RES;
                                if (getKeyOwner(insertCmdPtr->key) == myHostData.ipAddr)
                                {
                                        //note: casting val to void * in order to conform to API
-                                       if(chashInsert(myHashTable, insertCmdPtr->key, (void *)insertCmdPtr->val) == 0)
+                                       if(chashInsert(myHashTable, insertCmdPtr->key,
+                                                       (void *)insertCmdPtr->val) == 0)
                                                insertResPtr->status = INSERT_OK;
                                        else
                                                insertResPtr->status = INSERT_ERROR;
@@ -310,18 +426,28 @@ void *udpListen()
                                {
                                        insertResPtr->status = NOT_KEY_OWNER;;
                                }
-                               sendto(sock, (void *)insertResPtr, sizeof(struct insertRes), 0, (struct sockaddr *)&clientAddr, socklen);
+                               if (sendto(sock, (void *)insertResPtr, sizeof(struct insertRes), 0,
+                                       (struct sockaddr *)&clientAddr, socklen) == -1)
+                               {
+                                       perror("udpListen():sendto()");
+                               }
                                break;
-                       case REMOVE_COMMAND:
+                       case REMOVE_CMD:
                                if (bytesReceived != sizeof(struct removeCmd))
                                {
-                                       printf("error: incorrect message size\n");
+#ifdef DHT_LOG
+                                       fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
+                                       fflush(logfile);
+#endif
                                        break;
                                }
                                removeCmdPtr = (struct removeCmd *)buffer;
-//                             printf("Remove: key=%d\n", removeCmdPtr->key);
+#ifdef DHT_LOG
+                               fprintf(logfile,"udpListen(): Remove: key=%d\n", removeCmdPtr->key);
+                               fflush(logfile);
+#endif
                                removeResPtr = (struct removeRes *)replyBuffer;
-                               removeResPtr->msgType = REMOVE_RESPONSE;
+                               removeResPtr->msgType = REMOVE_RES;
                                if (getKeyOwner(removeCmdPtr->key) == myHostData.ipAddr)
                                {
                                        //note: casting val to void * in order to conform to API
@@ -334,22 +460,33 @@ void *udpListen()
                                {
                                        removeResPtr->status = NOT_KEY_OWNER;
                                }
-                               sendto(sock, (void *)removeResPtr, sizeof(struct removeRes), 0, (struct sockaddr *)&clientAddr, socklen);
+                               if (sendto(sock, (void *)removeResPtr, sizeof(struct removeRes), 0,
+                                       (struct sockaddr *)&clientAddr, socklen) == -1)
+                               {
+                                       perror("udpListen():sendto()");
+                               }
                                break;
-                       case SEARCH_COMMAND:
+                       case SEARCH_CMD:
                                if (bytesReceived != sizeof(struct searchCmd))
                                {
-                                       printf("error: incorrect message size\n");
+#ifdef DHT_LOG
+                                       fprintf(logfile,"udpListen(): ERROR: incorrect message size\n");
+                                       fflush(logfile);
+#endif
                                        break;
                                }
                                searchCmdPtr = (struct searchCmd *)buffer;
-//                             printf("Search: key=%d\n",searchCmdPtr->key);
+#ifdef DHT_LOG
+                                       fprintf(logfile,"udpListen(): Search: key=%d\n",searchCmdPtr->key);
+                                       fflush(logfile);
+#endif
                                searchResPtr = (struct searchRes *)replyBuffer;
-                               searchResPtr->msgType = SEARCH_RESPONSE;
+                               searchResPtr->msgType = SEARCH_RES;
                                if (getKeyOwner(searchCmdPtr->key) == myHostData.ipAddr)
                                {
                                        //note: casting val to void * in order to conform to API
-                                       if((searchResPtr->val = (unsigned int)chashSearch(myHashTable, searchCmdPtr->key)) == 0)
+                                       if((searchResPtr->val = (unsigned int)chashSearch(myHashTable,
+                                                       searchCmdPtr->key)) == 0)
                                                searchResPtr->status = KEY_NOT_FOUND;
                                        else
                                                searchResPtr->status = KEY_FOUND;
@@ -358,22 +495,49 @@ void *udpListen()
                                {
                                        searchResPtr->status = NOT_KEY_OWNER;
                                }
-                               sendto(sock, (void *)searchResPtr, sizeof(struct searchRes), 0, (struct sockaddr *)&clientAddr, socklen);
+                               if (sendto(sock, (void *)searchResPtr, sizeof(struct searchRes), 0,
+                                       (struct sockaddr *)&clientAddr, socklen) == -1)
+                               {
+                                       perror("udpListen():sendto()");
+                               }
                                break;
-                               //just ignore anything else
-//                     default:
-//                             printf("Unknown message type\n");
+                       case FIND_LEADER_CMD:
+                               if (bytesReceived != sizeof(char))
+                               {
+#ifdef DHT_LOG
+                                       fprintf(logfile, "udpListen(): ERROR: incorrect message size\n");
+                                       fflush(logfile);
+#endif
+                                       break;
+                               }
+                               if (leader == getMyIpAddr())
+                               {
+                                       replyBuffer[0] = FIND_LEADER_RES;
+                                       if(sendto(sock, (void *)replyBuffer, sizeof(char), 0,
+                                               (struct sockaddr *)&clientAddr, socklen) == -1)
+                                       {
+                                               perror("udpListen():sendto");
+                                       }
+                               }
+                               break;
+                       default:
+#ifdef DHT_LOG
+                               fprintf(logfile,"udpListen(): ERROR: Unknown message type\n");
+                               fflush(logfile);
+#endif
                }
        }
 }
 
-int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries)
+int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port,
+       void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize,
+       unsigned int timeout, unsigned int numRetries)
 {
        struct sockaddr_in server_addr;
        struct sockaddr_in ack_addr;
        socklen_t socklen = sizeof(struct sockaddr_in);
        struct pollfd pollsock;
-//     struct timeval now;
+       struct timeval now;
        int retval;
        int i;
        ssize_t bytesReceived;
@@ -385,7 +549,7 @@ int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void
 
        if ((pollsock.fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
        {
-               perror("socket()");
+               perror("udpSendWaitForResponse():socket()");
                return -1;
        }
        
@@ -393,32 +557,127 @@ int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void
        
        for (i = 0; i < MAX_RETRIES; i++)
        {
-//             if (i > 0)
-//                     printf("trying again, count: %d\n", i+1);
-               if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr, socklen) == -1)
+#ifdef DHT_LOG
+               if (i > 0)
+                       fprintf(logfile,"udpSendWaitForResponse(): trying again, count: %d\n",
+                               i+1);
+               fflush(logfile);
+#endif
+               if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr,
+                       socklen) == -1)
                {
-                       perror("sendto");
+                       perror("udpSendWaitForResponse():sendto");
                        return -1;
                }
-//             gettimeofday(&now, NULL);
-//             printf("message sent:%ds,%dus\n", now.tv_sec, now.tv_usec);
+#ifdef DHT_LOG
+               gettimeofday(&now, NULL);
+               fprintf(logfile,"udpSendWaitForResponse(): message sent:%ds,%dus\n",
+                       now.tv_sec, now.tv_usec);
+               fflush(logfile);
+#endif
                retval = poll(&pollsock, 1, timeout);
                if (retval !=0)
                {
-                       bytesReceived = recvfrom(pollsock.fd, resBuffer, resBufferSize, 0, (struct sockaddr *)&ack_addr, &socklen);
+                       bytesReceived = recvfrom(pollsock.fd, resBuffer, resBufferSize, 0,
+                               (struct sockaddr *)&ack_addr, &socklen);
                        if ((ack_addr.sin_addr.s_addr == server_addr.sin_addr.s_addr)
                        && (ack_addr.sin_port == server_addr.sin_port))
                        {
                                close(pollsock.fd);
-//                             gettimeofday(&now, NULL);
-//                             printf("received response:%ds,%dus\n", now.tv_sec, now.tv_usec);
+#ifdef DHT_LOG
+                               gettimeofday(&now, NULL);
+                               fprintf(logfile,"udpSendWaitForResponse(): received response:%ds,%dus\n", now.tv_sec, now.tv_usec);
+                               fflush(logfile);
+#endif
                                return bytesReceived;
                        }
                }
        }
        close(pollsock.fd);
-//     gettimeofday(&now, NULL);
-//     printf("timed out, no ack:%ds,%dus\n", now.tv_sec, now.tv_usec);
+#ifdef DHT_LOG
+       gettimeofday(&now, NULL);
+       printf("udpSendWaitForResponse(): timed out, no ack:%ds,%dus\n",
+               now.tv_sec, now.tv_usec);
+       fflush(logfile);
+#endif
+       return -1;
+}
+
+int udpBroadcastWaitForResponse(unsigned int *reply_ip,
+       unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer,
+       unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries)
+{
+       struct sockaddr_in server_addr;
+       struct sockaddr_in ack_addr;
+       socklen_t socklen = sizeof(struct sockaddr_in);
+       struct pollfd pollsock;
+       struct timeval now;
+       int retval;
+       int i;
+       ssize_t bytesReceived;
+       int on;
+
+       bzero((char *) &server_addr, sizeof(server_addr));
+       server_addr.sin_family = AF_INET;
+       server_addr.sin_port = htons(dest_port);
+       server_addr.sin_addr.s_addr = htonl(0xFFFFFFFF);
+
+       if ((pollsock.fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
+       {
+               perror("udpBroadcastWaitForResponse():socket()");
+               return -1;
+       }
+
+       on = 1;
+       if (setsockopt(pollsock.fd, SOL_SOCKET, SO_BROADCAST, &on, sizeof(on)) == -1)
+       {
+               perror("udpBroadcastWaitForResponse():setsockopt()");
+               return -1;
+       }
+
+       pollsock.events = POLLIN;
+       
+       for (i = 0; i < MAX_RETRIES; i++)
+       {
+#ifdef DHT_LOG
+               if (i > 0)
+                       fprintf(logfile,"udpBroadcastWaitForResponse(): trying again, count: %d\n", i+1);
+                       fflush(logfile);
+#endif
+               if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr,
+                       socklen) == -1)
+               {
+                       perror("udpBroadcastWaitForResponse():sendto()");
+                       return -1;
+               }
+#ifdef DHT_LOG
+               gettimeofday(&now, NULL);
+               fprintf(logfile,"udpBroadcastWaitForResponse(): message sent:%ds,%dus\n",
+                       now.tv_sec, now.tv_usec);
+               fflush(logfile);
+#endif
+               retval = poll(&pollsock, 1, timeout);
+               if (retval !=0)
+               {
+                       bytesReceived = recvfrom(pollsock.fd, resBuffer, resBufferSize, 0,
+                               (struct sockaddr *)&ack_addr, &socklen);
+                       close(pollsock.fd);
+                       *reply_ip = htonl(ack_addr.sin_addr.s_addr);
+#ifdef DHT_LOG
+                       gettimeofday(&now, NULL);
+                       fprintf(logfile,"udpBroadcastWaitForResponse(): received response:%ds,%dus\n", now.tv_sec, now.tv_usec);
+                       fflush(logfile);
+#endif
+                       return bytesReceived;
+               }
+       }
+       close(pollsock.fd);
+#ifdef DHT_LOG
+       gettimeofday(&now, NULL);
+       fprintf(logfile,"udpBroadcastWaitForResponse(): timed out, no ack:%ds,%dus\n",
+               now.tv_sec, now.tv_usec);
+       fflush(logfile);
+#endif
        return -1;
 }
 
@@ -434,7 +693,7 @@ void *tcpListen()
        sockListen = socket(AF_INET, SOCK_STREAM, 0);
        if (sockListen == -1)
        {
-               perror("socket()");
+               perror("tcpListen():socket()");
                pthread_exit(NULL);
        }
 
@@ -445,16 +704,21 @@ void *tcpListen()
 
        if (bind(sockListen, (struct sockaddr *)&myAddr, socklen) == -1)
        {
-               perror("socket()");
+               perror("tcpListen():socket()");
                pthread_exit(NULL);
        }
 
        if (listen(sockListen, BACKLOG) == -1)
        {
-               perror("listen()");
+               perror("tcpListen():listen()");
                pthread_exit(NULL);
        }
 
+#ifdef DHT_LOG
+       fprintf(logfile,"tcpListen(): listening on port %d\n", TCP_PORT);
+       fflush(logfile);
+#endif
+
        while(1)
        {
                sockAccept = accept(sockListen, (struct sockaddr *)&clientAddr, &socklen);
@@ -465,17 +729,72 @@ void *tcpListen()
 void *tcpAccept(void *arg)
 {
        int sockAccept = (int)arg;
-       
-       printf("accepted tcp connection, file descriptor: %d\n", sockAccept);
+       int bytesReceived;
+       char msgType;
 
-       sleep(30);
+#ifdef DHT_LOG
+       fprintf(logfile, "tcpAccept(): accepted tcp connection, file descriptor: %d\n", sockAccept);
+       fflush(logfile);
+#endif
+
+       bytesReceived = recv(sockAccept, &msgType, sizeof(char), 0);
+       if (bytesReceived == -1)
+       {
+               perror("tcpAccept():recv()");
+       }
+       else if (bytesReceived == 0)
+       {
+#ifdef DHT_LOG
+               fprintf(logfile, "tcpAccept(): bytesReceived = 0\n", sockAccept);
+               fflush(logfile);
+#endif
+       }
+       else
+       {
+               switch (msgType)
+               {
+                       case DHT_INFO_CMD:
+                               if (send(sockAccept, &numHosts, sizeof(numHosts), 0) == -1)
+                               {
+                                       perror("tcpAccept():send()");
+                                       break;
+                               }
+                               if (send(sockAccept, &numBlocks, sizeof(numBlocks), 0) == -1)
+                               {
+                                       perror("tcpAccept():send()");
+                                       break;
+                               }
+                               if (send(sockAccept, hostArray, numHosts*sizeof(struct hostData),
+                                               0) == -1)
+                               {
+                                       perror("tcpAccept():send()");
+                                       break;
+                               }
+                               if (send(sockAccept, blockOwnerArray, numBlocks*sizeof(unsigned int),
+                                               0) == -1)
+                               {
+                                       perror("tcpAccept():send()");
+                                       break;
+                               }
+                               break;
+                       default:
+#ifdef DHT_LOG
+                               fprintf(logfile, "tcpAccept(): unrecognized msg type\n");
+                               fflush(logfile);
+#endif
+               }
+       }
 
        if (close(sockAccept) == -1)
        {
-               perror("close()");
+               perror("tcpAccept():close()");
        }
 
-       printf("closed tcp connection, file descriptor: %d\n", sockAccept);
+#ifdef DHT_LOG
+       fprintf(logfile, "tcpAccept(): closed tcp connection, file descriptor: %d\n",
+               sockAccept);
+       fflush(logfile);
+#endif
 
        pthread_exit(NULL);
 }
@@ -495,7 +814,7 @@ unsigned int getMyIpAddr()
 
        if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
        {
-               perror("socket()");
+               perror("getMyIpAddr():socket()");
                return 1;
        }
 
@@ -504,15 +823,197 @@ unsigned int getMyIpAddr()
        
        if(ioctl(sock, SIOCGIFADDR, &interfaceInfo) != 0)
        {
-               perror("ioctl()");
+               perror("getMyIpAddr():ioctl()");
                return 1;
        }
 
        return ntohl(myAddr->sin_addr.s_addr);
 }
 
+unsigned int getLeadersIpAddr()
+{
+       unsigned int reply_ip;
+       int bytesReceived;
+       char myMessage;
+       char response;
+
+#ifdef DHT_LOG
+       fprintf(logfile, "getLeadersIpAddr(): broadcasting...\n");
+       fflush(logfile);
+#endif
+
+       myMessage = FIND_LEADER_CMD;
+
+       bytesReceived = udpBroadcastWaitForResponse(&reply_ip, UDP_PORT,
+               (void *)&myMessage, sizeof(myMessage), (void *)&response,
+               sizeof(response), TIMEOUT_MS, MAX_RETRIES);
+
+       if (bytesReceived == -1)
+       {
+#ifdef DHT_LOG
+       fprintf(logfile, "getLeadersIpAddr(): no response\n");
+       fflush(logfile);
+#endif
+               return 0;
+       }
+       else if (response == FIND_LEADER_RES)
+       {
+#ifdef DHT_LOG
+       struct in_addr reply_addr;
+       reply_addr.s_addr = htonl(reply_ip);
+       fprintf(logfile, "getLeadersIpAddr(): leader found:%s\n",
+                                       inet_ntoa(reply_addr));
+       fflush(logfile);
+#endif
+               return reply_ip;
+       }
+       else
+       {
+#ifdef DHT_LOG
+       fprintf(logfile, "getLeadersIpAddr(): unexpected response\n");
+       fflush(logfile);
+#endif
+               return 0;
+       }
+}
+
+int getDHTdata()
+{
+       struct sockaddr_in leader_addr;
+       int sock;
+       char msg;
+       int bytesReceived;
+
+       if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1)
+       {
+               perror("getDHTdata():socket()");
+               return -1;
+       }
+
+       bzero((char *)&leader_addr, sizeof(leader_addr));
+       leader_addr.sin_family = AF_INET;
+       leader_addr.sin_port = htons(TCP_PORT);
+       leader_addr.sin_addr.s_addr = htonl(leader);
+
+       if (connect(sock, (struct sockaddr *)&leader_addr, sizeof(leader_addr)) == -1)
+       {
+               perror("getDHTdata():connect()");
+               close(sock);
+               return -1;
+       }
+       msg = DHT_INFO_CMD;
+       if (send(sock, &msg, sizeof(char), 0) == -1)
+       {
+               perror("getDHTdata():send()");
+               close(sock);
+               return -1;
+       }
+       bytesReceived = recv(sock, &numHosts, sizeof(numHosts), 0);
+       if (bytesReceived == -1)
+       {
+               perror("getDHTdata():recv()");
+               close(sock);
+               return -1;
+       }
+       if (bytesReceived != sizeof(numHosts))
+       {
+#ifdef DHT_LOG
+               fprintf(logfile,"getDHTdata(): ERROR: numHosts not completely received\n");
+               fflush(logfile);
+               close(sock);
+               return -1;
+#endif
+       }
+       bytesReceived = recv(sock, &numBlocks, sizeof(numBlocks), 0);
+       if (bytesReceived == -1)
+       {
+               perror("getDHTdata():recv()");
+               close(sock);
+               return -1;
+       }
+       if (bytesReceived != sizeof(numBlocks))
+       {
+#ifdef DHT_LOG
+               fprintf(logfile,"getDHTdata(): ERROR: numBlocks not completely received\n");
+               fflush(logfile);
+               close(sock);
+               return -1;
+#endif
+       }
+       if (hostArray != NULL)
+               free(hostArray);
+       hostArray = calloc(numHosts, sizeof(struct hostData));
+       bytesReceived = recv(sock, hostArray, numHosts*sizeof(struct hostData), 0);
+       if (bytesReceived == -1)
+       {
+               perror("getDHTdata():recv()");
+               close(sock);
+               return -1;
+       }
+       if (bytesReceived != numHosts*sizeof(struct hostData))
+       {
+#ifdef DHT_LOG
+               fprintf(logfile,"getDHTdata(): ERROR: hostArray not completely received\n");
+               fflush(logfile);
+               close(sock);
+               return -1;
+#endif
+       }
+       if (blockOwnerArray != NULL)
+               free(blockOwnerArray);
+       blockOwnerArray = calloc(numBlocks, sizeof(unsigned int));
+       bytesReceived = recv(sock, blockOwnerArray, numBlocks*sizeof(unsigned int), 0);
+       if (bytesReceived == -1)
+       {
+               perror("getDHTdata():recv()");
+               close(sock);
+               return -1;
+       }
+       if (bytesReceived != numBlocks*sizeof(unsigned int))
+       {
+#ifdef DHT_LOG
+               fprintf(logfile,"getDHTdata(): ERROR: blockOwnerArray not completely received\n");
+               fflush(logfile);
+               close(sock);
+               return -1;
+#endif
+       }
+#ifdef DHT_LOG
+               fprintf(logfile,"getDHTdata(): got data:\n");
+               writeDHTdata(logfile);
+               fflush(logfile);
+#endif
+       return 0;
+}
+
 unsigned int hash(unsigned int x)
 {
        return x % numBlocks;
 }
 
+void leadRebuild()
+{
+       
+}
+
+void writeDHTdata(FILE *outfile)
+{
+       int i;
+       struct in_addr address;
+       fprintf(outfile,"numHosts=%d,numBlocks=%d\n", numHosts, numBlocks);
+       fprintf(outfile,"hostArray: index: ipAddr, maxKeyCapacity\n");
+       for (i = 0; i < numHosts; i++)
+       {
+               address.s_addr = htonl(hostArray[i].ipAddr);
+               fprintf(outfile,"%d: %s, %d\n", i, inet_ntoa(address),
+                       hostArray[i].maxKeyCapacity);
+       }
+       fprintf(outfile,"blockOwnerArray: index: blockOwner\n");
+       for (i = 0; i < numBlocks; i++)
+       {
+               fprintf(outfile,"%d: %d\n", i, blockOwnerArray[i]);
+       }
+}
+
+#endif
+
index 886c7871038eb6ab29469b896cc8fdde82f47513..ceeb2a0a74af019113d4a14cfdb8e970b6354974 100644 (file)
@@ -1,6 +1,8 @@
 #ifndef _DHT_H
 #define _DHT_H
 
+//#define SIMPLE_DHT
+
 #define DHT_NO_KEY_LIMIT 0xFFFFFFFF
 
 //called by host which joins (or starts) the system
@@ -14,7 +16,8 @@ void dhtExit();
 int dhtInsert(unsigned int key, unsigned int val);
 //returns 0 if successful, -1 if an error occurred
 int dhtRemove(unsigned int key);
-//returns 0 if successful and copies val into *val, 1 if key not found, -1 if an error occurred
+//returns 0 if successful and copies val into *val,
+// 1 if key not found, -1 if an error occurred
 int dhtSearch(unsigned int key, unsigned int *val);
 
 #endif
index dbff22659ea52e42a4a00f9064716463df716e23..636b4dd4399019696cf0fd5419ea0cf3de2eb40f 100644 (file)
@@ -18,16 +18,11 @@ int main()
        localHash = chashCreate(HASH_SIZE, LOADFACTOR);
        srandom(time(0));
 
-       for (key = 0; key < NUM_ITEMS; key++)
+       for (key = 1; key < NUM_ITEMS; key++)
        {
                vals[key] = random();
        }
 
-       vals[NUM_ITEMS / 2] = 0;
-       vals[NUM_ITEMS / 3] = 1;
-       vals[NUM_ITEMS / 4] = 2;
-       vals[NUM_ITEMS / 5] = 0xFFFFFFFF;
-
        printf("testing dhtInsert() and dhtSearch()\n");
 
        for (key = 0; key < NUM_ITEMS; key++)
@@ -36,7 +31,7 @@ int main()
        }
 
        error = 0;
-       for (key = 0; key < NUM_ITEMS; key++)
+       for (key = 1; key < NUM_ITEMS; key++)
        {
                retval = dhtSearch(key, &val);
                if (retval == 1)
@@ -66,13 +61,13 @@ int main()
        printf("(this currently fails if key = 0 OR val = 0, due to underlying hash table)\n");
        printf("testing underlying hash table (clookup.h)\n");
 
-       for (key = 0; key < NUM_ITEMS; key++)
+       for (key = 1; key < NUM_ITEMS; key++)
        {
                chashInsert(localHash, key, (void *)vals[key]);
        }
 
        error = 0;
-       for (key = 0; key < NUM_ITEMS; key++)
+       for (key = 1; key < NUM_ITEMS; key++)
        {
                val = (unsigned int)chashSearch(localHash, key);
                if ((void *)val == NULL)
@@ -153,7 +148,9 @@ int main()
        else
                printf("one or more errors occurred\n");
 
-       sleep(60);
+       sleep(5);
+
+       dhtExit();
 
        return 0;
 }