+++ /dev/null
-#include <netinet/in.h>
-#include <arpa/inet.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <stdio.h>
-#include <string.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <pthread.h>
-#include <sys/time.h>
-#include <sys/poll.h>
-#include "dht.h"
-#include "llookup.h"
-
-#define BUFFER_SIZE 512
-#define PORT 2157
-#define TIMEOUT_MS 500
-
-unsigned int numHosts;
-struct hostData *hostList;
-unsigned int numBlocks;
-struct hostData *blockOwner;
-
-void *dhtListen();
-int sendWaitForAck(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen);
-
-void dhtInit()
-{
-
- //start server (udp)
- pthread_t threadListen;
- pthread_create(&threadListen, NULL, dhtListen, NULL);
-
- //announce presence (udp), get data structures from leader (leader initiates tcp transfer)
- //if no response, I am the first
- //otherwise, scan array and choose blocks to take over
- //get data from hosts that own those blocks (tcp), fill hash table
- //notify (the leader or everybody?) of ownership changes
- return;
-}
-
-void dhtExit()
-{
-
-}
-
-int dhtInsert(unsigned int key, unsigned int val)
-{
- unsigned int dest_ip = 0x7F000001;
- unsigned short dest_port = PORT;
- struct dhtInsertMsg myMessage;
- myMessage.msgType = DHT_INSERT;
- myMessage.key = key;
- myMessage.val = val;
- return sendWaitForAck(dest_ip, dest_port, (void *)&myMessage, sizeof(struct dhtInsertMsg));
-}
-
-int dhtRemove(unsigned int key)
-{
- unsigned int dest_ip = 0x7F000001;
- unsigned short dest_port = PORT;
- struct dhtRemoveMsg myMessage;
- myMessage.msgType = DHT_REMOVE;
- myMessage.key = key;
- return sendWaitForAck(dest_ip, dest_port, (void *)&myMessage, sizeof(struct dhtRemoveMsg));
-}
-
-int dhtSearch(unsigned int key)
-{
- unsigned int dest_ip = 0x7F000001;
- unsigned short dest_port = PORT;
- struct dhtSearchMsg myMessage;
- myMessage.msgType = DHT_SEARCH;
- myMessage.key = key;
- //TODO:this obviously requires more than an ACK, first implement actual hash table
- return sendWaitForAck(dest_ip, dest_port, (void *)&myMessage, sizeof(struct dhtSearchMsg));
-}
-
-//helper functions
-void *dhtListen()
-{
- struct sockaddr_in my_addr;
- struct sockaddr_in client_addr;
- int sock;
- socklen_t socklen = sizeof(struct sockaddr_in);
- char buffer[BUFFER_SIZE];
- ssize_t bytesReceived;
- struct timeval now;
-
- if ((sock = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
- {
- perror("socket()");
- exit(1);
- }
-
- bzero(&my_addr, socklen);
- my_addr.sin_family=AF_INET;
- my_addr.sin_addr.s_addr=INADDR_ANY;
- my_addr.sin_port=htons(PORT);
-
- if (bind(sock, (struct sockaddr *)&my_addr, socklen) == -1)
- {
- perror("bind()");
- exit(1);
- }
- printf("listening...\n");
- while(1)
- {
- if ((bytesReceived = recvfrom(sock, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&client_addr, &socklen)) == -1)
- {
- printf("recvfrom() returned -1\n");
- break;
- }
- if (bytesReceived == 0)
- {
- printf("recvfrom() returned 0\n");
- break;
- }
- gettimeofday(&now, NULL);
- printf("message received:%ds,%dus\n", now.tv_sec, now.tv_usec);
-
- printf("Received %d bytes from %x:%d\n", bytesReceived, client_addr.sin_addr.s_addr, client_addr.sin_port);
- switch (buffer[0])
- {
- case DHT_INSERT:
- if (bytesReceived != sizeof(struct dhtInsertMsg))
- {
- printf("error: incorrect message size\n");
- break;
- }
- printf("Insert: key=%d, val=%d\n",((struct dhtInsertMsg *)buffer)->key,((struct dhtInsertMsg *)buffer)->val);
- buffer[0] = DHT_ACK;
- sendto(sock, buffer, 1, 0, (struct sockaddr *)&client_addr, socklen);
- break;
- case DHT_REMOVE:
- if (bytesReceived != sizeof(struct dhtRemoveMsg))
- {
- printf("error: incorrect message size\n");
- break;
- }
- printf("Remove: key=%d\n",((struct dhtRemoveMsg *)buffer)->key);
- buffer[0] = DHT_ACK;
- sendto(sock, buffer, 1, 0, (struct sockaddr *)&client_addr, socklen);
- break;
- case DHT_SEARCH:
- if (bytesReceived != sizeof(struct dhtSearchMsg))
- {
- printf("error: incorrect message size\n");
- break;
- }
- printf("Search: key=%d\n",((struct dhtSearchMsg *)buffer)->key);
- buffer[0] = DHT_ACK;
- sendto(sock, buffer, 1, 0, (struct sockaddr *)&client_addr, socklen);
- break;
- default:
- printf("Unknown message type\n");
- }
- }
-}
-
-//send message, wait for response, resend twice before return failure
-int sendWaitForAck(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen)
-{
- struct sockaddr_in server_addr;
- struct sockaddr_in ack_addr;
- socklen_t socklen = sizeof(struct sockaddr_in);
- struct pollfd pollsock;
- struct timeval now;
- int retval;
- int i;
- char ackByte;
- ssize_t bytesReceived;
-
- bzero((char *) &server_addr, sizeof(server_addr));
- server_addr.sin_family = AF_INET;
- server_addr.sin_port = htons(dest_port);
- server_addr.sin_addr.s_addr = htonl(dest_ip);
-
- if ((pollsock.fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
- {
- printf("error creating socket\n");
- return 1;
- }
-
- pollsock.events = POLLIN;
-
- for (i = 0; i < 3; i++)
- {
- if (i > 0)
- printf("trying again, count: %d\n", i+1);
- if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr, socklen) == -1)
- {
- printf("error sending\n");
- return 1;
- }
- gettimeofday(&now, NULL);
- printf("message sent:%ds,%dus\n", now.tv_sec, now.tv_usec);
- retval = poll(&pollsock, 1, TIMEOUT_MS);
- if (retval !=0)
- {
- bytesReceived = recvfrom(pollsock.fd, &ackByte, 1, 0, (struct sockaddr *)&ack_addr, &socklen);
- if ((bytesReceived == 1) && (ack_addr.sin_addr.s_addr == server_addr.sin_addr.s_addr)
- && (ack_addr.sin_port == server_addr.sin_port) && (ackByte == DHT_ACK))
- {
- close(pollsock.fd);
- gettimeofday(&now, NULL);
- printf("received ack:%ds,%dus\n", now.tv_sec, now.tv_usec);
- return 0;
- }
- }
- }
- close(pollsock.fd);
- gettimeofday(&now, NULL);
- printf("timed out, no ack:%ds,%dus\n", now.tv_sec, now.tv_usec);
- return 1;
-}
-