From b3de95daa1e5e77953af53657532a1e7cf40ae90 Mon Sep 17 00:00:00 2001 From: erubow Date: Sat, 28 Apr 2007 21:49:15 +0000 Subject: [PATCH] the begining of a DHT (only one host right now...) --- Robust/src/Runtime/DSTM/interface/dht.c | 387 ++++++++++++++++---- Robust/src/Runtime/DSTM/interface/dht.h | 51 +-- Robust/src/Runtime/DSTM/interface/testdht.c | 156 +++++++- 3 files changed, 467 insertions(+), 127 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/dht.c b/Robust/src/Runtime/DSTM/interface/dht.c index b8d6da7c..9253ba19 100644 --- a/Robust/src/Runtime/DSTM/interface/dht.c +++ b/Robust/src/Runtime/DSTM/interface/dht.c @@ -2,6 +2,7 @@ #include #include #include +#include #include #include #include @@ -9,33 +10,136 @@ #include #include #include +#include +#include +#include #include "dht.h" -#include "llookup.h" +#include "clookup.h" //this works for now, do we need anything better? -#define BUFFER_SIZE 512 -#define PORT 2157 +#define BUFFER_SIZE 512 //maximum message size +#define LISTEN_PORT 2157 #define TIMEOUT_MS 500 +#define MAX_RETRIES 3 +#define INIT_NUM_BLOCKS 16 +#define DEFAULT_INTERFACE "eth0" + +//general commands +#define INSERT_COMMAND 1 +#define REMOVE_COMMAND 2 +#define SEARCH_COMMAND 3 +//general responses +#define INSERT_RESPONSE 4 +#define REMOVE_RESPONSE 5 +#define SEARCH_RESPONSE 6 + +//#define JOIN +//#define LEAVE +//reserved for leader +//#define REBUILD + +//etc... + +//status codes +#define INSERT_OK 1 +#define INSERT_ERROR 2 +#define REMOVE_OK 3 +#define REMOVE_ERROR 4 +#define KEY_FOUND 5 +#define KEY_NOT_FOUND 6 +#define NOT_KEY_OWNER 7 + +struct hostData { + unsigned int ipAddr; + unsigned int maxKeyCapacity; + struct hostData *next; +}; + +struct insertCmd { + unsigned char msgType; + unsigned int unused:12; + unsigned int key; + unsigned int val; +}; + +struct removeCmd { + unsigned char msgType; + unsigned int unused:12; + unsigned int key; +}; + +struct searchCmd { + unsigned char msgType; + unsigned int unused:12; + unsigned int key; +}; + +struct insertRes { + unsigned char msgType; + unsigned int status:12; +}; + +struct removeRes { + unsigned char msgType; + unsigned int status:12; +}; + +struct searchRes { + unsigned char msgType; + unsigned int status:12; + unsigned int val; +}; + +/*struct joinMsg { + unsigned char msgType; + unsigned int unused:12; + struct hostData newHost; +};*/ + +//TODO: leave message, rebuild message... unsigned int numHosts; struct hostData *hostList; +struct hostData *myHostData; unsigned int numBlocks; -struct hostData *blockOwner; +struct hostData **blockOwner; + +unsigned int getMyIpAddr(); void *dhtListen(); -int sendWaitForAck(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen); +//returns number of bytes received in resBuffer, or -1 if an error occurred +int sendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries); +int sendNoWait(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen); +unsigned int getKeyOwner(unsigned int key); +unsigned int hash(unsigned int x); -void dhtInit() +void dhtInit(unsigned int maxKeyCapacity) { - - //start server (udp) - pthread_t threadListen; - pthread_create(&threadListen, NULL, dhtListen, NULL); + int i; + + myHostData = malloc(sizeof(struct hostData)); + myHostData->ipAddr = getMyIpAddr(); + myHostData->maxKeyCapacity; + myHostData->next = NULL; + //announce presence (udp), get data structures from leader (leader initiates tcp transfer) //if no response, I am the first + hostList = myHostData; + numBlocks = INIT_NUM_BLOCKS; + blockOwner = malloc(numBlocks * sizeof(struct hostData)); + for (i = 0; i < numBlocks; i++) + { + blockOwner[i] = myHostData; + } + //otherwise, scan array and choose blocks to take over //get data from hosts that own those blocks (tcp), fill hash table //notify (the leader or everybody?) of ownership changes + + //start server (udp) + pthread_t threadListen; + pthread_create(&threadListen, NULL, dhtListen, NULL); + return; } @@ -46,34 +150,85 @@ void dhtExit() int dhtInsert(unsigned int key, unsigned int val) { - unsigned int dest_ip = 0x7F000001; - unsigned short dest_port = PORT; - struct dhtInsertMsg myMessage; - myMessage.msgType = DHT_INSERT; + unsigned int dest_ip = getKeyOwner(key); + unsigned short dest_port = LISTEN_PORT; + struct insertCmd myMessage; + struct insertRes response; + int bytesReceived; + + myMessage.msgType = INSERT_COMMAND; myMessage.key = key; myMessage.val = val; - return sendWaitForAck(dest_ip, dest_port, (void *)&myMessage, sizeof(struct dhtInsertMsg)); + + bytesReceived = sendWaitForResponse(dest_ip, dest_port, (void *)&myMessage, sizeof(struct insertCmd), (void *)&response, sizeof(struct insertRes), TIMEOUT_MS, MAX_RETRIES); + if (bytesReceived == sizeof(struct insertRes)) + { + if (response.msgType == INSERT_RESPONSE) + { + if (response.status == INSERT_OK) + return 0; +// if (response.status == NOT_KEY_OWNER) + } + } +//TODO: find owner and try again, request rebuild if necessary + return -1; //this function should be robust enough to always return 0 } int dhtRemove(unsigned int key) { - unsigned int dest_ip = 0x7F000001; - unsigned short dest_port = PORT; - struct dhtRemoveMsg myMessage; - myMessage.msgType = DHT_REMOVE; + unsigned int dest_ip = getKeyOwner(key); + unsigned short dest_port = LISTEN_PORT; + struct removeCmd myMessage; + struct removeRes response; + int bytesReceived; + + myMessage.msgType = REMOVE_COMMAND; myMessage.key = key; - return sendWaitForAck(dest_ip, dest_port, (void *)&myMessage, sizeof(struct dhtRemoveMsg)); + + bytesReceived = sendWaitForResponse(dest_ip, dest_port, (void *)&myMessage, sizeof(struct removeCmd), (void *)&response, sizeof(struct removeRes), TIMEOUT_MS, MAX_RETRIES); + if (bytesReceived == sizeof(struct removeRes)) + { + if (response.msgType == REMOVE_RESPONSE) + { + if (response.status == REMOVE_OK) + return 0; +// if (response.status == NOT_KEY_OWNER) + } + } +//TODO: find owner and try again, request rebuild if necessary + return -1; //this function should be robust enough to always return 0 } -int dhtSearch(unsigned int key) +int dhtSearch(unsigned int key, unsigned int *val) { - unsigned int dest_ip = 0x7F000001; - unsigned short dest_port = PORT; - struct dhtSearchMsg myMessage; - myMessage.msgType = DHT_SEARCH; + unsigned int dest_ip = getKeyOwner(key); + unsigned short dest_port = LISTEN_PORT; + struct searchCmd myMessage; + struct searchRes response; + int bytesReceived; + + myMessage.msgType = SEARCH_COMMAND; myMessage.key = key; - //TODO:this obviously requires more than an ACK, first implement actual hash table - return sendWaitForAck(dest_ip, dest_port, (void *)&myMessage, sizeof(struct dhtSearchMsg)); + + bytesReceived = sendWaitForResponse(dest_ip, dest_port, (void *)&myMessage, sizeof(struct searchCmd), (void *)&response, sizeof(struct searchRes), TIMEOUT_MS, MAX_RETRIES); + if (bytesReceived == sizeof(struct searchRes)) + { + if (response.msgType == SEARCH_RESPONSE) + { + if (response.status == KEY_FOUND) + { + *val = response.val; + return 0; + } + if (response.status == KEY_NOT_FOUND) + { + return 1; + } +// if (response.status == NOT_KEY_OWNER) + } + } +//TODO: find owner and try again, request rebuild if necessary + return -1; //this function should be robust enough to always return 0 or 1 } //helper functions @@ -85,8 +240,17 @@ void *dhtListen() socklen_t socklen = sizeof(struct sockaddr_in); char buffer[BUFFER_SIZE]; ssize_t bytesReceived; + struct insertCmd *insertCmdPtr; + struct removeCmd *removeCmdPtr; + struct searchCmd *searchCmdPtr; + struct insertRes *insertResPtr; + struct removeRes *removeResPtr; + struct searchRes *searchResPtr; + char replyBuffer[BUFFER_SIZE]; struct timeval now; + chashtable_t *myHashTable = chashCreate(HASH_SIZE, LOADFACTOR); + if ((sock = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) { perror("socket()"); @@ -96,19 +260,19 @@ void *dhtListen() bzero(&my_addr, socklen); my_addr.sin_family=AF_INET; my_addr.sin_addr.s_addr=INADDR_ANY; - my_addr.sin_port=htons(PORT); + my_addr.sin_port=htons(LISTEN_PORT); if (bind(sock, (struct sockaddr *)&my_addr, socklen) == -1) { perror("bind()"); exit(1); } - printf("listening...\n"); +// printf("listening...\n"); while(1) { if ((bytesReceived = recvfrom(sock, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&client_addr, &socklen)) == -1) { - printf("recvfrom() returned -1\n"); + perror("recvfrom()"); break; } if (bytesReceived == 0) @@ -117,58 +281,99 @@ void *dhtListen() break; } gettimeofday(&now, NULL); - printf("message received:%ds,%dus\n", now.tv_sec, now.tv_usec); +// printf("message received:%ds,%dus\n", now.tv_sec, now.tv_usec); - printf("Received %d bytes from %x:%d\n", bytesReceived, client_addr.sin_addr.s_addr, client_addr.sin_port); +// printf("Received %d bytes from %x:%d\n", bytesReceived, client_addr.sin_addr.s_addr, client_addr.sin_port); switch (buffer[0]) { - case DHT_INSERT: - if (bytesReceived != sizeof(struct dhtInsertMsg)) + case INSERT_COMMAND: + if (bytesReceived != sizeof(struct insertCmd)) { printf("error: incorrect message size\n"); break; } - printf("Insert: key=%d, val=%d\n",((struct dhtInsertMsg *)buffer)->key,((struct dhtInsertMsg *)buffer)->val); - buffer[0] = DHT_ACK; - sendto(sock, buffer, 1, 0, (struct sockaddr *)&client_addr, socklen); + insertCmdPtr = (struct insertCmd *)buffer; +// printf("Insert: key=%d, val=%d\n", insertCmdPtr->key, insertCmdPtr->val); + insertResPtr = (struct insertRes *)replyBuffer; + insertResPtr->msgType = INSERT_RESPONSE; + if (getKeyOwner(insertCmdPtr->key) == myHostData->ipAddr) + { + //note: casting val to void * in order to conform to API + if(chashInsert(myHashTable, insertCmdPtr->key, (void *)insertCmdPtr->val) == 0) + insertResPtr->status = INSERT_OK; + else + insertResPtr->status = INSERT_ERROR; + } + else + { + insertResPtr->status = NOT_KEY_OWNER;; + } + sendto(sock, (void *)insertResPtr, sizeof(struct insertRes), 0, (struct sockaddr *)&client_addr, socklen); break; - case DHT_REMOVE: - if (bytesReceived != sizeof(struct dhtRemoveMsg)) + case REMOVE_COMMAND: + if (bytesReceived != sizeof(struct removeCmd)) { printf("error: incorrect message size\n"); break; } - printf("Remove: key=%d\n",((struct dhtRemoveMsg *)buffer)->key); - buffer[0] = DHT_ACK; - sendto(sock, buffer, 1, 0, (struct sockaddr *)&client_addr, socklen); + removeCmdPtr = (struct removeCmd *)buffer; +// printf("Remove: key=%d\n", removeCmdPtr->key); + removeResPtr = (struct removeRes *)replyBuffer; + removeResPtr->msgType = REMOVE_RESPONSE; + if (getKeyOwner(removeCmdPtr->key) == myHostData->ipAddr) + { + //note: casting val to void * in order to conform to API + if(chashRemove(myHashTable, removeCmdPtr->key) == 0) + removeResPtr->status = INSERT_OK; + else + removeResPtr->status = INSERT_ERROR; + } + else + { + removeResPtr->status = NOT_KEY_OWNER; + } + sendto(sock, (void *)removeResPtr, sizeof(struct removeRes), 0, (struct sockaddr *)&client_addr, socklen); break; - case DHT_SEARCH: - if (bytesReceived != sizeof(struct dhtSearchMsg)) + case SEARCH_COMMAND: + if (bytesReceived != sizeof(struct searchCmd)) { printf("error: incorrect message size\n"); break; } - printf("Search: key=%d\n",((struct dhtSearchMsg *)buffer)->key); - buffer[0] = DHT_ACK; - sendto(sock, buffer, 1, 0, (struct sockaddr *)&client_addr, socklen); - break; - default: - printf("Unknown message type\n"); + searchCmdPtr = (struct searchCmd *)buffer; +// printf("Search: key=%d\n",searchCmdPtr->key); + searchResPtr = (struct searchRes *)replyBuffer; + searchResPtr->msgType = SEARCH_RESPONSE; + if (getKeyOwner(searchCmdPtr->key) == myHostData->ipAddr) + { + //note: casting val to void * in order to conform to API + if((searchResPtr->val = (unsigned int)chashSearch(myHashTable, searchCmdPtr->key)) == 0) + searchResPtr->status = KEY_NOT_FOUND; + else + searchResPtr->status = KEY_FOUND; + } + else + { + searchResPtr->status = NOT_KEY_OWNER; + } + sendto(sock, (void *)searchResPtr, sizeof(struct searchRes), 0, (struct sockaddr *)&client_addr, socklen); + break; + //just ignore anything else +// default: +// printf("Unknown message type\n"); } } } -//send message, wait for response, resend twice before return failure -int sendWaitForAck(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen) +int sendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries) { struct sockaddr_in server_addr; struct sockaddr_in ack_addr; socklen_t socklen = sizeof(struct sockaddr_in); struct pollfd pollsock; - struct timeval now; +// struct timeval now; int retval; int i; - char ackByte; ssize_t bytesReceived; bzero((char *) &server_addr, sizeof(server_addr)); @@ -178,40 +383,76 @@ int sendWaitForAck(unsigned int dest_ip, unsigned short dest_port, void *msg, u if ((pollsock.fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) { - printf("error creating socket\n"); - return 1; + perror("socket()"); + return -1; } pollsock.events = POLLIN; - for (i = 0; i < 3; i++) + for (i = 0; i < MAX_RETRIES; i++) { - if (i > 0) - printf("trying again, count: %d\n", i+1); +// if (i > 0) +// printf("trying again, count: %d\n", i+1); if (sendto(pollsock.fd, msg, msglen, 0, (struct sockaddr *)&server_addr, socklen) == -1) { - printf("error sending\n"); - return 1; + perror("sendto"); + return -1; } - gettimeofday(&now, NULL); - printf("message sent:%ds,%dus\n", now.tv_sec, now.tv_usec); - retval = poll(&pollsock, 1, TIMEOUT_MS); +// gettimeofday(&now, NULL); +// printf("message sent:%ds,%dus\n", now.tv_sec, now.tv_usec); + retval = poll(&pollsock, 1, timeout); if (retval !=0) { - bytesReceived = recvfrom(pollsock.fd, &ackByte, 1, 0, (struct sockaddr *)&ack_addr, &socklen); - if ((bytesReceived == 1) && (ack_addr.sin_addr.s_addr == server_addr.sin_addr.s_addr) - && (ack_addr.sin_port == server_addr.sin_port) && (ackByte == DHT_ACK)) + bytesReceived = recvfrom(pollsock.fd, resBuffer, resBufferSize, 0, (struct sockaddr *)&ack_addr, &socklen); + if ((ack_addr.sin_addr.s_addr == server_addr.sin_addr.s_addr) + && (ack_addr.sin_port == server_addr.sin_port)) { close(pollsock.fd); - gettimeofday(&now, NULL); - printf("received ack:%ds,%dus\n", now.tv_sec, now.tv_usec); - return 0; +// gettimeofday(&now, NULL); +// printf("received response:%ds,%dus\n", now.tv_sec, now.tv_usec); + return bytesReceived; } } } close(pollsock.fd); - gettimeofday(&now, NULL); - printf("timed out, no ack:%ds,%dus\n", now.tv_sec, now.tv_usec); - return 1; +// gettimeofday(&now, NULL); +// printf("timed out, no ack:%ds,%dus\n", now.tv_sec, now.tv_usec); + return -1; +} + +unsigned int getKeyOwner(unsigned int key) +{ + return blockOwner[hash(key)]->ipAddr; +} + +unsigned int getMyIpAddr() +{ + int sock; + struct ifreq interfaceInfo; + struct sockaddr_in *myAddr = (struct sockaddr_in *)&interfaceInfo.ifr_addr; + + memset(&interfaceInfo, 0, sizeof(struct ifreq)); + + if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) + { + perror("socket"); + return 1; + } + + strcpy(interfaceInfo.ifr_name, DEFAULT_INTERFACE); + myAddr->sin_family = AF_INET; + + if(ioctl(sock, SIOCGIFADDR, &interfaceInfo) != 0) + { + perror("ioctl"); + return 1; + } + + return ntohl(myAddr->sin_addr.s_addr); +} + +unsigned int hash(unsigned int x) +{ + return x % numBlocks; } diff --git a/Robust/src/Runtime/DSTM/interface/dht.h b/Robust/src/Runtime/DSTM/interface/dht.h index ac6e7489..886c7871 100644 --- a/Robust/src/Runtime/DSTM/interface/dht.h +++ b/Robust/src/Runtime/DSTM/interface/dht.h @@ -1,58 +1,21 @@ #ifndef _DHT_H #define _DHT_H -#define INIT_NUM_BLOCKS 16 - -//messages -#define DHT_INSERT 1 -#define DHT_REMOVE 2 -#define DHT_SEARCH 3 -#define DHT_ACK 4 -#define DHT_JOIN 5 -#define DHT_LEAVE 6 -#define DHT_REBUILD 7 -//etc... - -struct hostData { - unsigned int ipAddr; - unsigned int maxKeyCapacity; - struct hostData *next; -}; - -struct dhtInsertMsg { - unsigned char msgType; - unsigned int unused:12; - unsigned int key; - unsigned int val; -}; - -struct dhtRemoveMsg { - unsigned char msgType; - unsigned int unused:12; - unsigned int key; -}; - -struct dhtSearchMsg { - unsigned char msgType; - unsigned int unused:12; - unsigned int key; -}; - -struct dhtJoinMsg { - unsigned char msgType; - unsigned int unused:12; - struct hostData newHost; -}; +#define DHT_NO_KEY_LIMIT 0xFFFFFFFF //called by host which joins (or starts) the system -void dhtInit(); +void dhtInit(unsigned int maxKeyCapaciy); //exit system, cleanup void dhtExit(); //called by whoever performs the creation, move, deletion + +//returns 0 if successful, -1 if an error occurred int dhtInsert(unsigned int key, unsigned int val); +//returns 0 if successful, -1 if an error occurred int dhtRemove(unsigned int key); -int dhtSearch(unsigned int key); +//returns 0 if successful and copies val into *val, 1 if key not found, -1 if an error occurred +int dhtSearch(unsigned int key, unsigned int *val); #endif diff --git a/Robust/src/Runtime/DSTM/interface/testdht.c b/Robust/src/Runtime/DSTM/interface/testdht.c index 99368b10..60c2b702 100644 --- a/Robust/src/Runtime/DSTM/interface/testdht.c +++ b/Robust/src/Runtime/DSTM/interface/testdht.c @@ -1,21 +1,157 @@ #include #include "dht.h" +#include "clookup.h" + +#define NUM_ITEMS 1000 int main() { - int i; + unsigned int key; + unsigned int val; + unsigned int vals[NUM_ITEMS]; + int retval; + int error; + chashtable_t *localHash; + + dhtInit(DHT_NO_KEY_LIMIT); + localHash = chashCreate(HASH_SIZE, LOADFACTOR); + srandom(time(0)); + + for (key = 0; key < NUM_ITEMS; key++) + { + vals[key] = random(); + } + + vals[NUM_ITEMS / 2] = 0; + vals[NUM_ITEMS / 3] = 1; + vals[NUM_ITEMS / 4] = 2; + vals[NUM_ITEMS / 5] = 0xFFFFFFFF; + + printf("testing dhtInsert() and dhtSearch()\n"); + + for (key = 0; key < NUM_ITEMS; key++) + { + dhtInsert(key, vals[key]); + } + + error = 0; + for (key = 0; key < NUM_ITEMS; key++) + { + retval = dhtSearch(key, &val); + if (retval == 1) + { + printf("item not found: key = %d, expected val = %d\n", key, vals[key]); + error = 1; + } + else if (retval == -1) + { + printf("internal error: key = %d, expected val = %d\n", key, vals[key]); + error = 1; + } + else if (retval == 0) + { + if (vals[key] != val) + { + printf("unexpected value: key = %d, expected val = %d, val = %d\n", key, vals[key], val); + error = 1; + } + } + } + if (!error) + printf("test completed successfully\n"); + else + printf("one or more errors occurred\n"); + + printf("(this currently fails if key = 0 OR val = 0, due to underlying hash table)\n"); + printf("testing underlying hash table (clookup.h)\n"); - dhtInit(); - sleep(1); + for (key = 0; key < NUM_ITEMS; key++) + { + chashInsert(localHash, key, (void *)vals[key]); + } + + error = 0; + for (key = 0; key < NUM_ITEMS; key++) + { + val = (unsigned int)chashSearch(localHash, key); + if ((void *)val == NULL) + { + printf("item not found: key = %d, expected val = %d\n", key, vals[key]); + error = 1; + } + else + { + if (vals[key] != val) + { + printf("unexpected value: key = %d, expected val = %d, val = %d\n", key, vals[key], val); + error = 1; + } + } + for (key = NUM_ITEMS; key < NUM_ITEMS + 20; key++) + { + val = (unsigned int)chashSearch(localHash, key); + if ((void *)val != NULL) + { + printf("error: returned value for key that wasn't inserted: key = %d, val = %d\n", key, val); + error = 1; + } + } + } + + if (!error) + printf("test completed successfully\n"); + else + printf("one or more errors occurred\n"); + + printf("testing dhtRemove(), removing half of the keys, and verifying that the other half is still there\n"); - for(i = 0; i < 3; i++) + for (key = 0; key < NUM_ITEMS / 2; key++) { - dhtInsert(i, 10-i); - sleep(1); - dhtRemove(i); - sleep(1); - dhtSearch(i); - sleep(1); + dhtRemove(key); } + error = 0; + for (key = 0; key < NUM_ITEMS / 2; key++) + { + retval = dhtSearch(key, &val); + if (retval == 0) + { + printf("error: found removed item: key = %d, val = %d\n", key, val); + error = 1; + } + else if (retval == -1) + { + printf("internal error: key = %d, val = %d\n", key, val); + error = 1; + } + } + for (key = NUM_ITEMS / 2; key < NUM_ITEMS; key++) + { + retval = dhtSearch(key, &val); + if (retval == 1) + { + printf("item not found: key = %d, expected val = %d\n", key, vals[key]); + error = 1; + } + else if (retval == -1) + { + printf("internal error: key = %d, expected val = %d\n", key, vals[key]); + error = 1; + } + else if (retval == 0) + { + if (vals[key] != val) + { + printf("unexpected value: key = %d, expected val = %d, val = %d\n", key, vals[key], val); + error = 1; + } + } + } + + if (!error) + printf("test completed successfully\n"); + else + printf("one or more errors occurred\n"); + return 0; } + -- 2.34.1