From d992d2b88e882c3f1997e7c1cf0715e5ef3d6447 Mon Sep 17 00:00:00 2001 From: erubow Date: Tue, 17 Apr 2007 07:25:53 +0000 Subject: [PATCH] Preliminary code for distributed hash table. Defines some basic udp message structures, etc. --- Robust/src/Runtime/DSTM/interface/dht.c | 217 ++++++++++++++++++++ Robust/src/Runtime/DSTM/interface/dht.h | 58 ++++++ Robust/src/Runtime/DSTM/interface/testdht.c | 21 ++ 3 files changed, 296 insertions(+) create mode 100644 Robust/src/Runtime/DSTM/interface/dht.c create mode 100644 Robust/src/Runtime/DSTM/interface/dht.h create mode 100644 Robust/src/Runtime/DSTM/interface/testdht.c diff --git a/Robust/src/Runtime/DSTM/interface/dht.c b/Robust/src/Runtime/DSTM/interface/dht.c new file mode 100644 index 00000000..b8d6da7c --- /dev/null +++ b/Robust/src/Runtime/DSTM/interface/dht.c @@ -0,0 +1,217 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "dht.h" +#include "llookup.h" + +#define BUFFER_SIZE 512 +#define PORT 2157 +#define TIMEOUT_MS 500 + +unsigned int numHosts; +struct hostData *hostList; +unsigned int numBlocks; +struct hostData *blockOwner; + +void *dhtListen(); +int sendWaitForAck(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen); + +void dhtInit() +{ + + //start server (udp) + pthread_t threadListen; + pthread_create(&threadListen, NULL, dhtListen, NULL); + + //announce presence (udp), get data structures from leader (leader initiates tcp transfer) + //if no response, I am the first + //otherwise, scan array and choose blocks to take over + //get data from hosts that own those blocks (tcp), fill hash table + //notify (the leader or everybody?) of ownership changes + return; +} + +void dhtExit() +{ + +} + +int dhtInsert(unsigned int key, unsigned int val) +{ + unsigned int dest_ip = 0x7F000001; + unsigned short dest_port = PORT; + struct dhtInsertMsg myMessage; + myMessage.msgType = DHT_INSERT; + myMessage.key = key; + myMessage.val = val; + return sendWaitForAck(dest_ip, dest_port, (void *)&myMessage, sizeof(struct dhtInsertMsg)); +} + +int dhtRemove(unsigned int key) +{ + unsigned int dest_ip = 0x7F000001; + unsigned short dest_port = PORT; + struct dhtRemoveMsg myMessage; + myMessage.msgType = DHT_REMOVE; + myMessage.key = key; + return sendWaitForAck(dest_ip, dest_port, (void *)&myMessage, sizeof(struct dhtRemoveMsg)); +} + +int dhtSearch(unsigned int key) +{ + unsigned int dest_ip = 0x7F000001; + unsigned short dest_port = PORT; + struct dhtSearchMsg myMessage; + myMessage.msgType = DHT_SEARCH; + myMessage.key = key; + //TODO:this obviously requires more than an ACK, first implement actual hash table + return sendWaitForAck(dest_ip, dest_port, (void *)&myMessage, sizeof(struct dhtSearchMsg)); +} + +//helper functions +void *dhtListen() +{ + struct sockaddr_in my_addr; + struct sockaddr_in client_addr; + int sock; + socklen_t socklen = sizeof(struct sockaddr_in); + char buffer[BUFFER_SIZE]; + ssize_t bytesReceived; + struct timeval now; + + if ((sock = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) + { + perror("socket()"); + exit(1); + } + + bzero(&my_addr, socklen); + my_addr.sin_family=AF_INET; + my_addr.sin_addr.s_addr=INADDR_ANY; + my_addr.sin_port=htons(PORT); + + if (bind(sock, (struct sockaddr *)&my_addr, socklen) == -1) + { + perror("bind()"); + exit(1); + } + printf("listening...\n"); + while(1) + { + if ((bytesReceived = recvfrom(sock, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&client_addr, &socklen)) == -1) + { + printf("recvfrom() returned -1\n"); + break; + } + if (bytesReceived == 0) + { + printf("recvfrom() returned 0\n"); + break; + } + gettimeofday(&now, NULL); + printf("message received:%ds,%dus\n", now.tv_sec, now.tv_usec); + + printf("Received %d bytes from %x:%d\n", bytesReceived, client_addr.sin_addr.s_addr, client_addr.sin_port); + switch (buffer[0]) + { + case DHT_INSERT: + if (bytesReceived != sizeof(struct dhtInsertMsg)) + { + printf("error: incorrect message size\n"); + break; + } + printf("Insert: key=%d, val=%d\n",((struct dhtInsertMsg *)buffer)->key,((struct dhtInsertMsg *)buffer)->val); + buffer[0] = DHT_ACK; + sendto(sock, buffer, 1, 0, (struct sockaddr *)&client_addr, socklen); + break; + case DHT_REMOVE: + if (bytesReceived != sizeof(struct dhtRemoveMsg)) + { + printf("error: incorrect message size\n"); + break; + } + printf("Remove: key=%d\n",((struct dhtRemoveMsg *)buffer)->key); + buffer[0] = DHT_ACK; + sendto(sock, buffer, 1, 0, (struct sockaddr *)&client_addr, socklen); + break; + case DHT_SEARCH: + if (bytesReceived != sizeof(struct dhtSearchMsg)) + { + printf("error: incorrect message size\n"); + break; + } + printf("Search: key=%d\n",((struct dhtSearchMsg *)buffer)->key); + buffer[0] = DHT_ACK; + sendto(sock, buffer, 1, 0, (struct sockaddr *)&client_addr, socklen); + break; + default: + printf("Unknown message type\n"); + } + } +} + +//send message, wait for response, resend twice before return failure +int sendWaitForAck(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen) +{ + struct sockaddr_in server_addr; + struct sockaddr_in ack_addr; + socklen_t socklen = sizeof(struct sockaddr_in); + struct pollfd pollsock; + struct timeval now; + int retval; + int i; + char ackByte; + ssize_t bytesReceived; + + bzero((char *) &server_addr, sizeof(server_addr)); + server_addr.sin_family = AF_INET; + server_addr.sin_port = htons(dest_port); + server_addr.sin_addr.s_addr = htonl(dest_ip); + + if ((pollsock.fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) + { + printf("error creating socket\n"); + return 1; + } + + pollsock.events = POLLIN; + + for (i = 0; i < 3; i++) + { + if (i > 0) + printf("trying again, count: %d\n", i+1); + if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr, socklen) == -1) + { + printf("error sending\n"); + return 1; + } + gettimeofday(&now, NULL); + printf("message sent:%ds,%dus\n", now.tv_sec, now.tv_usec); + retval = poll(&pollsock, 1, TIMEOUT_MS); + if (retval !=0) + { + bytesReceived = recvfrom(pollsock.fd, &ackByte, 1, 0, (struct sockaddr *)&ack_addr, &socklen); + if ((bytesReceived == 1) && (ack_addr.sin_addr.s_addr == server_addr.sin_addr.s_addr) + && (ack_addr.sin_port == server_addr.sin_port) && (ackByte == DHT_ACK)) + { + close(pollsock.fd); + gettimeofday(&now, NULL); + printf("received ack:%ds,%dus\n", now.tv_sec, now.tv_usec); + return 0; + } + } + } + close(pollsock.fd); + gettimeofday(&now, NULL); + printf("timed out, no ack:%ds,%dus\n", now.tv_sec, now.tv_usec); + return 1; +} + diff --git a/Robust/src/Runtime/DSTM/interface/dht.h b/Robust/src/Runtime/DSTM/interface/dht.h new file mode 100644 index 00000000..ac6e7489 --- /dev/null +++ b/Robust/src/Runtime/DSTM/interface/dht.h @@ -0,0 +1,58 @@ +#ifndef _DHT_H +#define _DHT_H + +#define INIT_NUM_BLOCKS 16 + +//messages +#define DHT_INSERT 1 +#define DHT_REMOVE 2 +#define DHT_SEARCH 3 +#define DHT_ACK 4 +#define DHT_JOIN 5 +#define DHT_LEAVE 6 +#define DHT_REBUILD 7 +//etc... + +struct hostData { + unsigned int ipAddr; + unsigned int maxKeyCapacity; + struct hostData *next; +}; + +struct dhtInsertMsg { + unsigned char msgType; + unsigned int unused:12; + unsigned int key; + unsigned int val; +}; + +struct dhtRemoveMsg { + unsigned char msgType; + unsigned int unused:12; + unsigned int key; +}; + +struct dhtSearchMsg { + unsigned char msgType; + unsigned int unused:12; + unsigned int key; +}; + +struct dhtJoinMsg { + unsigned char msgType; + unsigned int unused:12; + struct hostData newHost; +}; + +//called by host which joins (or starts) the system +void dhtInit(); +//exit system, cleanup +void dhtExit(); + +//called by whoever performs the creation, move, deletion +int dhtInsert(unsigned int key, unsigned int val); +int dhtRemove(unsigned int key); +int dhtSearch(unsigned int key); + +#endif + diff --git a/Robust/src/Runtime/DSTM/interface/testdht.c b/Robust/src/Runtime/DSTM/interface/testdht.c new file mode 100644 index 00000000..99368b10 --- /dev/null +++ b/Robust/src/Runtime/DSTM/interface/testdht.c @@ -0,0 +1,21 @@ +#include +#include "dht.h" + +int main() +{ + int i; + + dhtInit(); + sleep(1); + + for(i = 0; i < 3; i++) + { + dhtInsert(i, 10-i); + sleep(1); + dhtRemove(i); + sleep(1); + dhtSearch(i); + sleep(1); + } + return 0; +} -- 2.34.1