Preliminary code for distributed hash table. Defines some basic udp message structure...
authorerubow <erubow>
Tue, 17 Apr 2007 07:25:53 +0000 (07:25 +0000)
committererubow <erubow>
Tue, 17 Apr 2007 07:25:53 +0000 (07:25 +0000)
Robust/src/Runtime/DSTM/interface/dht.c [new file with mode: 0644]
Robust/src/Runtime/DSTM/interface/dht.h [new file with mode: 0644]
Robust/src/Runtime/DSTM/interface/testdht.c [new file with mode: 0644]

diff --git a/Robust/src/Runtime/DSTM/interface/dht.c b/Robust/src/Runtime/DSTM/interface/dht.c
new file mode 100644 (file)
index 0000000..b8d6da7
--- /dev/null
@@ -0,0 +1,217 @@
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <pthread.h>
+#include <sys/time.h>
+#include <sys/poll.h>
+#include "dht.h"
+#include "llookup.h"
+
+#define BUFFER_SIZE 512
+#define PORT 2157
+#define TIMEOUT_MS 500
+
+unsigned int numHosts;
+struct hostData *hostList;
+unsigned int numBlocks;
+struct hostData *blockOwner;
+
+void *dhtListen();
+int sendWaitForAck(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen);
+
+void dhtInit()
+{
+       
+       //start server (udp)
+       pthread_t threadListen;
+       pthread_create(&threadListen, NULL, dhtListen, NULL);
+
+       //announce presence (udp), get data structures from leader (leader initiates tcp transfer)
+       //if no response, I am the first
+       //otherwise, scan array and choose blocks to take over
+       //get data from hosts that own those blocks (tcp), fill hash table
+       //notify (the leader or everybody?) of ownership changes
+       return;
+}
+
+void dhtExit()
+{
+
+}
+
+int dhtInsert(unsigned int key, unsigned int val)
+{
+       unsigned int dest_ip = 0x7F000001;
+       unsigned short dest_port = PORT;
+       struct dhtInsertMsg myMessage;
+       myMessage.msgType = DHT_INSERT;
+       myMessage.key = key;
+       myMessage.val = val;
+       return sendWaitForAck(dest_ip, dest_port, (void *)&myMessage, sizeof(struct dhtInsertMsg));
+}
+
+int dhtRemove(unsigned int key)
+{
+       unsigned int dest_ip = 0x7F000001;
+       unsigned short dest_port = PORT;
+       struct dhtRemoveMsg myMessage;
+       myMessage.msgType = DHT_REMOVE;
+       myMessage.key = key;
+       return sendWaitForAck(dest_ip, dest_port, (void *)&myMessage, sizeof(struct dhtRemoveMsg));
+}
+
+int dhtSearch(unsigned int key)
+{
+       unsigned int dest_ip = 0x7F000001;
+       unsigned short dest_port = PORT;
+       struct dhtSearchMsg myMessage;
+       myMessage.msgType = DHT_SEARCH;
+       myMessage.key = key;
+       //TODO:this obviously requires more than an ACK, first implement actual hash table
+       return sendWaitForAck(dest_ip, dest_port, (void *)&myMessage, sizeof(struct dhtSearchMsg));
+}
+
+//helper functions
+void *dhtListen()
+{
+       struct sockaddr_in my_addr;
+       struct sockaddr_in client_addr;
+       int sock;
+       socklen_t socklen = sizeof(struct sockaddr_in);
+       char buffer[BUFFER_SIZE];
+       ssize_t bytesReceived;
+       struct timeval now;
+
+       if ((sock = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
+       {
+               perror("socket()");
+               exit(1);
+       }
+       
+       bzero(&my_addr, socklen);
+       my_addr.sin_family=AF_INET;
+       my_addr.sin_addr.s_addr=INADDR_ANY;
+       my_addr.sin_port=htons(PORT);
+
+       if (bind(sock, (struct sockaddr *)&my_addr, socklen) == -1)
+       {
+               perror("bind()");
+               exit(1);
+       }
+       printf("listening...\n");
+       while(1)
+       {
+               if ((bytesReceived = recvfrom(sock, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&client_addr, &socklen)) == -1)
+               {
+                       printf("recvfrom() returned -1\n");
+                       break;
+               }
+               if (bytesReceived == 0)
+               {
+                       printf("recvfrom() returned 0\n");
+                       break;
+               }
+               gettimeofday(&now, NULL);
+               printf("message received:%ds,%dus\n", now.tv_sec, now.tv_usec);
+
+               printf("Received %d bytes from %x:%d\n", bytesReceived, client_addr.sin_addr.s_addr, client_addr.sin_port);
+               switch (buffer[0])
+               {
+                       case DHT_INSERT:
+                               if (bytesReceived != sizeof(struct dhtInsertMsg))
+                               {
+                                       printf("error: incorrect message size\n");
+                                       break;
+                               }
+                               printf("Insert: key=%d, val=%d\n",((struct dhtInsertMsg *)buffer)->key,((struct dhtInsertMsg *)buffer)->val);
+                               buffer[0] = DHT_ACK;
+                               sendto(sock, buffer, 1, 0, (struct sockaddr *)&client_addr, socklen);
+                               break;
+                       case DHT_REMOVE:
+                               if (bytesReceived != sizeof(struct dhtRemoveMsg))
+                               {
+                                       printf("error: incorrect message size\n");
+                                       break;
+                               }
+                               printf("Remove: key=%d\n",((struct dhtRemoveMsg *)buffer)->key);
+                               buffer[0] = DHT_ACK;
+                               sendto(sock, buffer, 1, 0, (struct sockaddr *)&client_addr, socklen);
+                               break;
+                       case DHT_SEARCH:
+                               if (bytesReceived != sizeof(struct dhtSearchMsg))
+                               {
+                                       printf("error: incorrect message size\n");
+                                       break;
+                               }
+                               printf("Search: key=%d\n",((struct dhtSearchMsg *)buffer)->key);
+                               buffer[0] = DHT_ACK;
+                               sendto(sock, buffer, 1, 0, (struct sockaddr *)&client_addr, socklen);
+                               break;                  
+                       default:
+                               printf("Unknown message type\n");
+               }
+       }
+}
+
+//send message, wait for response, resend twice before return failure
+int sendWaitForAck(unsigned int dest_ip, unsigned  short dest_port, void *msg, unsigned int msglen)
+{
+       struct sockaddr_in server_addr;
+       struct sockaddr_in ack_addr;
+       socklen_t socklen = sizeof(struct sockaddr_in);
+       struct pollfd pollsock;
+       struct timeval now;
+       int retval;
+       int i;
+       char ackByte;
+       ssize_t bytesReceived;
+
+       bzero((char *) &server_addr, sizeof(server_addr));
+       server_addr.sin_family = AF_INET;
+       server_addr.sin_port = htons(dest_port);
+       server_addr.sin_addr.s_addr = htonl(dest_ip);
+
+       if ((pollsock.fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
+       {
+               printf("error creating socket\n");
+               return 1;
+       }
+       
+       pollsock.events = POLLIN;
+       
+       for (i = 0; i < 3; i++)
+       {
+               if (i > 0)
+                       printf("trying again, count: %d\n", i+1);
+               if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr, socklen) == -1)
+               {
+                       printf("error sending\n");
+                       return 1;
+               }
+               gettimeofday(&now, NULL);
+               printf("message sent:%ds,%dus\n", now.tv_sec, now.tv_usec);
+               retval = poll(&pollsock, 1, TIMEOUT_MS);
+               if (retval !=0)
+               {
+                       bytesReceived = recvfrom(pollsock.fd, &ackByte, 1, 0, (struct sockaddr *)&ack_addr, &socklen);
+                       if ((bytesReceived == 1) && (ack_addr.sin_addr.s_addr == server_addr.sin_addr.s_addr)
+                       && (ack_addr.sin_port == server_addr.sin_port) && (ackByte == DHT_ACK))
+                       {
+                               close(pollsock.fd);
+                               gettimeofday(&now, NULL);
+                               printf("received ack:%ds,%dus\n", now.tv_sec, now.tv_usec);
+                               return 0;
+                       }
+               }
+       }
+       close(pollsock.fd);
+       gettimeofday(&now, NULL);
+       printf("timed out, no ack:%ds,%dus\n", now.tv_sec, now.tv_usec);
+       return 1;
+}
+
diff --git a/Robust/src/Runtime/DSTM/interface/dht.h b/Robust/src/Runtime/DSTM/interface/dht.h
new file mode 100644 (file)
index 0000000..ac6e748
--- /dev/null
@@ -0,0 +1,58 @@
+#ifndef _DHT_H
+#define _DHT_H
+
+#define INIT_NUM_BLOCKS 16
+
+//messages
+#define DHT_INSERT 1
+#define DHT_REMOVE 2
+#define DHT_SEARCH 3
+#define DHT_ACK 4
+#define DHT_JOIN 5
+#define DHT_LEAVE 6
+#define DHT_REBUILD 7
+//etc...
+
+struct hostData {
+       unsigned int ipAddr;
+       unsigned int maxKeyCapacity;
+       struct hostData *next;
+};
+
+struct dhtInsertMsg {
+       unsigned char msgType;
+       unsigned int unused:12;
+       unsigned int key;
+       unsigned int val;
+};
+
+struct dhtRemoveMsg {
+       unsigned char msgType;
+       unsigned int unused:12;
+       unsigned int key;
+};
+
+struct dhtSearchMsg {
+       unsigned char msgType;
+       unsigned int unused:12;
+       unsigned int key;
+};
+
+struct dhtJoinMsg {
+       unsigned char msgType;
+       unsigned int unused:12;
+       struct hostData newHost;
+};
+
+//called by host which joins (or starts) the system
+void dhtInit();
+//exit system, cleanup
+void dhtExit();
+
+//called by whoever performs the creation, move, deletion
+int dhtInsert(unsigned int key, unsigned int val);
+int dhtRemove(unsigned int key);
+int dhtSearch(unsigned int key);
+
+#endif
+
diff --git a/Robust/src/Runtime/DSTM/interface/testdht.c b/Robust/src/Runtime/DSTM/interface/testdht.c
new file mode 100644 (file)
index 0000000..99368b1
--- /dev/null
@@ -0,0 +1,21 @@
+#include <stdio.h>
+#include "dht.h"
+
+int main()
+{
+       int i;
+
+       dhtInit();
+       sleep(1);
+       
+       for(i = 0; i < 3; i++)
+       {
+               dhtInsert(i, 10-i);
+               sleep(1);
+               dhtRemove(i);
+               sleep(1);
+               dhtSearch(i);
+               sleep(1);
+       }
+       return 0;
+}