added a tcp server (that serves nothing), plus random changes
authorerubow <erubow>
Sat, 5 May 2007 07:11:51 +0000 (07:11 +0000)
committererubow <erubow>
Sat, 5 May 2007 07:11:51 +0000 (07:11 +0000)
Robust/src/Runtime/DSTM/interface/dht.c
Robust/src/Runtime/DSTM/interface/testdht.c

index 9253ba1975925ac7fcd4902fb8ab8a4a57e44d9f..17b9dafd1fd65469be281a0902356bb7f30e61e4 100644 (file)
 #include "clookup.h" //this works for now, do we need anything better?
 
 #define BUFFER_SIZE 512 //maximum message size
-#define LISTEN_PORT 2157
+#define UDP_PORT 2157
+#define TCP_PORT 2157
+#define BACKLOG 10 //max pending tcp connections
 #define TIMEOUT_MS 500
 #define MAX_RETRIES 3
-#define INIT_NUM_BLOCKS 16
+#define INIT_HOST_ALLOC 16
+#define INIT_BLOCK_NUM 64
 #define DEFAULT_INTERFACE "eth0"
 
-//general commands
-#define INSERT_COMMAND 1
-#define REMOVE_COMMAND 2
-#define SEARCH_COMMAND 3
-//general responses
-#define INSERT_RESPONSE 4
-#define REMOVE_RESPONSE 5
-#define SEARCH_RESPONSE 6
-
-//#define JOIN
-//#define LEAVE
-//reserved for leader
-//#define REBUILD
+enum {
+       INSERT_COMMAND,
+       REMOVE_COMMAND,
+       SEARCH_COMMAND,
+       FIND_LEADER_COMMAND,
+       INSERT_RESPONSE,
+       REMOVE_RESPONSE,
+       SEARCH_RESPONSE,
+       FIND_LEADER_RESPONSE
+};
 
-//etc...
 
 //status codes
-#define INSERT_OK 1
-#define INSERT_ERROR 2
-#define REMOVE_OK 3
-#define REMOVE_ERROR 4
-#define KEY_FOUND 5
-#define KEY_NOT_FOUND 6
-#define NOT_KEY_OWNER 7
+enum {
+       INSERT_OK,
+       INSERT_ERROR,
+       REMOVE_OK,
+       REMOVE_ERROR,
+       KEY_FOUND,
+       KEY_NOT_FOUND,
+       NOT_KEY_OWNER
+};
 
 struct hostData {
        unsigned int ipAddr;
        unsigned int maxKeyCapacity;
-       struct hostData *next;
 };
 
 struct insertCmd {
-       unsigned char msgType;
-       unsigned int unused:12;
+       unsigned int msgType;
        unsigned int key;
        unsigned int val;
 };
 
 struct removeCmd {
-       unsigned char msgType;
-       unsigned int unused:12;
+       unsigned int msgType;
        unsigned int key;
 };
 
 struct searchCmd {
-       unsigned char msgType;
-       unsigned int unused:12;
+       unsigned int msgType;
        unsigned int key;
 };
 
 struct insertRes {
-       unsigned char msgType;
-       unsigned int status:12;
+       unsigned int msgType;
+       unsigned int status;
 };
 
 struct removeRes {
-       unsigned char msgType;
-       unsigned int status:12;
+       unsigned int msgType;
+       unsigned int status;
 };
 
 struct searchRes {
-       unsigned char msgType;
-       unsigned int status:12;
+       unsigned int msgType;
+       unsigned int status;
        unsigned int val;
 };
 
-/*struct joinMsg {
-       unsigned char msgType;
-       unsigned int unused:12;
-       struct hostData newHost;
-};*/
 
 //TODO: leave message, rebuild message...
 
+struct hostData myHostData;
 unsigned int numHosts;
-struct hostData *hostList;
-struct hostData *myHostData;
+struct hostData *hostArray;
+unsigned int hostArraySize;
 unsigned int numBlocks;
-struct hostData **blockOwner;
-
+unsigned int *blockOwnerArray;
+unsigned int blockOwnerArraySize;
 
 unsigned int getMyIpAddr();
-void *dhtListen();
+void *udpListen();
+void *tcpListen();
+void *tcpAccept(void *);
 //returns number of bytes received in resBuffer, or -1 if an error occurred
-int sendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries);
+int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries);
 int sendNoWait(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen);
 unsigned int getKeyOwner(unsigned int key);
 unsigned int hash(unsigned int x);
 
 void dhtInit(unsigned int maxKeyCapacity)
 {
+       unsigned int myMessage;
+       int bytesReceived;
        int i;
 
-       myHostData = malloc(sizeof(struct hostData));
-       myHostData->ipAddr = getMyIpAddr();
-       myHostData->maxKeyCapacity;
-       myHostData->next = NULL;
+       myHostData.ipAddr = getMyIpAddr();
+       myHostData.maxKeyCapacity = maxKeyCapacity;
 
+       
 
-       //announce presence (udp), get data structures from leader (leader initiates tcp transfer)
-       //if no response, I am the first
-       hostList = myHostData;
-       numBlocks = INIT_NUM_BLOCKS;
-       blockOwner = malloc(numBlocks * sizeof(struct hostData));
+       //announce presence (udp broadcast), get data structures from leader (leader initiates tcp transfer)
+       
+
+//if no response, I am the first
+
+       numHosts = 1;
+       hostArray = malloc(INIT_HOST_ALLOC * sizeof(struct hostData));
+       hostArray[0] = myHostData;
+
+       numBlocks = INIT_BLOCK_NUM;
+       blockOwnerArray = malloc(numBlocks * sizeof(unsigned short));
        for (i = 0; i < numBlocks; i++)
        {
-               blockOwner[i] = myHostData;
+               blockOwnerArray[i] = 0;
        }
        
        //otherwise, scan array and choose blocks to take over
@@ -137,8 +139,9 @@ void dhtInit(unsigned int maxKeyCapacity)
        //notify (the leader or everybody?) of ownership changes
        
        //start server (udp)
-       pthread_t threadListen;
-       pthread_create(&threadListen, NULL, dhtListen, NULL);
+       pthread_t threadUdpListen, threadTcpListen;
+       pthread_create(&threadUdpListen, NULL, udpListen, NULL);
+       pthread_create(&threadTcpListen, NULL, tcpListen, NULL);
        
        return;
 }
@@ -151,7 +154,6 @@ void dhtExit()
 int dhtInsert(unsigned int key, unsigned int val)
 {
        unsigned int dest_ip = getKeyOwner(key);
-       unsigned short dest_port = LISTEN_PORT;
        struct insertCmd myMessage;
        struct insertRes response;
        int bytesReceived;
@@ -160,7 +162,7 @@ int dhtInsert(unsigned int key, unsigned int val)
        myMessage.key = key;
        myMessage.val = val;
        
-       bytesReceived = sendWaitForResponse(dest_ip, dest_port, (void *)&myMessage, sizeof(struct insertCmd), (void *)&response, sizeof(struct insertRes), TIMEOUT_MS, MAX_RETRIES);
+       bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage, sizeof(struct insertCmd), (void *)&response, sizeof(struct insertRes), TIMEOUT_MS, MAX_RETRIES);
        if (bytesReceived == sizeof(struct insertRes))
        {
                if (response.msgType == INSERT_RESPONSE)
@@ -177,7 +179,6 @@ int dhtInsert(unsigned int key, unsigned int val)
 int dhtRemove(unsigned int key)
 {
        unsigned int dest_ip = getKeyOwner(key);
-       unsigned short dest_port = LISTEN_PORT;
        struct removeCmd myMessage;
        struct removeRes response;
        int bytesReceived;
@@ -185,7 +186,7 @@ int dhtRemove(unsigned int key)
        myMessage.msgType = REMOVE_COMMAND;
        myMessage.key = key;
 
-       bytesReceived = sendWaitForResponse(dest_ip, dest_port, (void *)&myMessage, sizeof(struct removeCmd), (void *)&response, sizeof(struct removeRes), TIMEOUT_MS, MAX_RETRIES);
+       bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage, sizeof(struct removeCmd), (void *)&response, sizeof(struct removeRes), TIMEOUT_MS, MAX_RETRIES);
        if (bytesReceived == sizeof(struct removeRes))
        {
                if (response.msgType == REMOVE_RESPONSE)
@@ -202,7 +203,6 @@ int dhtRemove(unsigned int key)
 int dhtSearch(unsigned int key, unsigned int *val)
 {
        unsigned int dest_ip = getKeyOwner(key);
-       unsigned short dest_port = LISTEN_PORT;
        struct searchCmd myMessage;
        struct searchRes response;
        int bytesReceived;
@@ -210,7 +210,7 @@ int dhtSearch(unsigned int key, unsigned int *val)
        myMessage.msgType = SEARCH_COMMAND;
        myMessage.key = key;
 
-       bytesReceived = sendWaitForResponse(dest_ip, dest_port, (void *)&myMessage, sizeof(struct searchCmd), (void *)&response, sizeof(struct searchRes), TIMEOUT_MS, MAX_RETRIES);
+       bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage, sizeof(struct searchCmd), (void *)&response, sizeof(struct searchRes), TIMEOUT_MS, MAX_RETRIES);
        if (bytesReceived == sizeof(struct searchRes))
        {
                if (response.msgType == SEARCH_RESPONSE)
@@ -231,11 +231,13 @@ int dhtSearch(unsigned int key, unsigned int *val)
        return -1; //this function should be robust enough to always return 0 or 1
 }
 
-//helper functions
-void *dhtListen()
+
+
+//use UDP for messages that are frequent and short
+void *udpListen()
 {
-       struct sockaddr_in my_addr;
-       struct sockaddr_in client_addr;
+       struct sockaddr_in myAddr;
+       struct sockaddr_in clientAddr;
        int sock;
        socklen_t socklen = sizeof(struct sockaddr_in);
        char buffer[BUFFER_SIZE];
@@ -251,26 +253,26 @@ void *dhtListen()
 
        chashtable_t *myHashTable = chashCreate(HASH_SIZE, LOADFACTOR);
 
-       if ((sock = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
+       if ((sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
        {
                perror("socket()");
-               exit(1);
+               pthread_exit(NULL);
        }
        
-       bzero(&my_addr, socklen);
-       my_addr.sin_family=AF_INET;
-       my_addr.sin_addr.s_addr=INADDR_ANY;
-       my_addr.sin_port=htons(LISTEN_PORT);
+       bzero(&myAddr, socklen);
+       myAddr.sin_family=AF_INET;
+       myAddr.sin_addr.s_addr=INADDR_ANY;
+       myAddr.sin_port=htons(UDP_PORT);
 
-       if (bind(sock, (struct sockaddr *)&my_addr, socklen) == -1)
+       if (bind(sock, (struct sockaddr *)&myAddr, socklen) == -1)
        {
                perror("bind()");
-               exit(1);
+               pthread_exit(NULL);
        }
 //     printf("listening...\n");
        while(1)
        {
-               if ((bytesReceived = recvfrom(sock, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&client_addr, &socklen)) == -1)
+               if ((bytesReceived = recvfrom(sock, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&clientAddr, &socklen)) == -1)
                {
                        perror("recvfrom()");
                        break;
@@ -283,7 +285,7 @@ void *dhtListen()
                gettimeofday(&now, NULL);
 //             printf("message received:%ds,%dus\n", now.tv_sec, now.tv_usec);
 
-//             printf("Received %d bytes from %x:%d\n", bytesReceived, client_addr.sin_addr.s_addr, client_addr.sin_port);
+//             printf("Received %d bytes from %x:%d\n", bytesReceived, clientAddr.sin_addr.s_addr, clientAddr.sin_port);
                switch (buffer[0])
                {
                        case INSERT_COMMAND:
@@ -296,7 +298,7 @@ void *dhtListen()
 //                             printf("Insert: key=%d, val=%d\n", insertCmdPtr->key, insertCmdPtr->val);
                                insertResPtr = (struct insertRes *)replyBuffer;
                                insertResPtr->msgType = INSERT_RESPONSE;
-                               if (getKeyOwner(insertCmdPtr->key) == myHostData->ipAddr)
+                               if (getKeyOwner(insertCmdPtr->key) == myHostData.ipAddr)
                                {
                                        //note: casting val to void * in order to conform to API
                                        if(chashInsert(myHashTable, insertCmdPtr->key, (void *)insertCmdPtr->val) == 0)
@@ -308,7 +310,7 @@ void *dhtListen()
                                {
                                        insertResPtr->status = NOT_KEY_OWNER;;
                                }
-                               sendto(sock, (void *)insertResPtr, sizeof(struct insertRes), 0, (struct sockaddr *)&client_addr, socklen);
+                               sendto(sock, (void *)insertResPtr, sizeof(struct insertRes), 0, (struct sockaddr *)&clientAddr, socklen);
                                break;
                        case REMOVE_COMMAND:
                                if (bytesReceived != sizeof(struct removeCmd))
@@ -320,7 +322,7 @@ void *dhtListen()
 //                             printf("Remove: key=%d\n", removeCmdPtr->key);
                                removeResPtr = (struct removeRes *)replyBuffer;
                                removeResPtr->msgType = REMOVE_RESPONSE;
-                               if (getKeyOwner(removeCmdPtr->key) == myHostData->ipAddr)
+                               if (getKeyOwner(removeCmdPtr->key) == myHostData.ipAddr)
                                {
                                        //note: casting val to void * in order to conform to API
                                        if(chashRemove(myHashTable, removeCmdPtr->key) == 0)
@@ -332,7 +334,7 @@ void *dhtListen()
                                {
                                        removeResPtr->status = NOT_KEY_OWNER;
                                }
-                               sendto(sock, (void *)removeResPtr, sizeof(struct removeRes), 0, (struct sockaddr *)&client_addr, socklen);
+                               sendto(sock, (void *)removeResPtr, sizeof(struct removeRes), 0, (struct sockaddr *)&clientAddr, socklen);
                                break;
                        case SEARCH_COMMAND:
                                if (bytesReceived != sizeof(struct searchCmd))
@@ -344,7 +346,7 @@ void *dhtListen()
 //                             printf("Search: key=%d\n",searchCmdPtr->key);
                                searchResPtr = (struct searchRes *)replyBuffer;
                                searchResPtr->msgType = SEARCH_RESPONSE;
-                               if (getKeyOwner(searchCmdPtr->key) == myHostData->ipAddr)
+                               if (getKeyOwner(searchCmdPtr->key) == myHostData.ipAddr)
                                {
                                        //note: casting val to void * in order to conform to API
                                        if((searchResPtr->val = (unsigned int)chashSearch(myHashTable, searchCmdPtr->key)) == 0)
@@ -356,7 +358,7 @@ void *dhtListen()
                                {
                                        searchResPtr->status = NOT_KEY_OWNER;
                                }
-                               sendto(sock, (void *)searchResPtr, sizeof(struct searchRes), 0, (struct sockaddr *)&client_addr, socklen);
+                               sendto(sock, (void *)searchResPtr, sizeof(struct searchRes), 0, (struct sockaddr *)&clientAddr, socklen);
                                break;
                                //just ignore anything else
 //                     default:
@@ -365,7 +367,7 @@ void *dhtListen()
        }
 }
 
-int sendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries)
+int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries)
 {
        struct sockaddr_in server_addr;
        struct sockaddr_in ack_addr;
@@ -420,9 +422,67 @@ int sendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void *ms
        return -1;
 }
 
+// use TCP for potentially large and/or important data transfer
+void *tcpListen()
+{
+       struct sockaddr_in myAddr;
+       struct sockaddr_in clientAddr;
+       int sockListen, sockAccept;
+       socklen_t socklen = sizeof(struct sockaddr_in);
+       pthread_t threadTcpAccept;
+
+       sockListen = socket(AF_INET, SOCK_STREAM, 0);
+       if (sockListen == -1)
+       {
+               perror("socket()");
+               pthread_exit(NULL);
+       }
+
+       myAddr.sin_family = AF_INET;
+       myAddr.sin_port = htons(TCP_PORT);
+       myAddr.sin_addr.s_addr = INADDR_ANY;
+       memset(&(myAddr.sin_zero), '\0', 8);
+
+       if (bind(sockListen, (struct sockaddr *)&myAddr, socklen) == -1)
+       {
+               perror("socket()");
+               pthread_exit(NULL);
+       }
+
+       if (listen(sockListen, BACKLOG) == -1)
+       {
+               perror("listen()");
+               pthread_exit(NULL);
+       }
+
+       while(1)
+       {
+               sockAccept = accept(sockListen, (struct sockaddr *)&clientAddr, &socklen);
+               pthread_create(&threadTcpAccept, NULL, tcpAccept, (void *)sockAccept);
+       }
+}
+
+void *tcpAccept(void *arg)
+{
+       int sockAccept = (int)arg;
+       
+       printf("accepted tcp connection, file descriptor: %d\n", sockAccept);
+
+       sleep(30);
+
+       if (close(sockAccept) == -1)
+       {
+               perror("close()");
+       }
+
+       printf("closed tcp connection, file descriptor: %d\n", sockAccept);
+
+       pthread_exit(NULL);
+}
+
 unsigned int getKeyOwner(unsigned int key)
 {
-       return blockOwner[hash(key)]->ipAddr;
+       return hostArray[blockOwnerArray[hash(key)]].ipAddr;
 }
 
 unsigned int getMyIpAddr()
@@ -435,7 +495,7 @@ unsigned int getMyIpAddr()
 
        if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
        {
-               perror("socket");
+               perror("socket()");
                return 1;
        }
 
@@ -444,7 +504,7 @@ unsigned int getMyIpAddr()
        
        if(ioctl(sock, SIOCGIFADDR, &interfaceInfo) != 0)
        {
-               perror("ioctl");
+               perror("ioctl()");
                return 1;
        }
 
index 60c2b702c49d54033d1f640c72c3994365cdfe4f..dbff22659ea52e42a4a00f9064716463df716e23 100644 (file)
@@ -14,6 +14,7 @@ int main()
        chashtable_t *localHash;
 
        dhtInit(DHT_NO_KEY_LIMIT);
+
        localHash = chashCreate(HASH_SIZE, LOADFACTOR);
        srandom(time(0));
 
@@ -152,6 +153,8 @@ int main()
        else
                printf("one or more errors occurred\n");
 
+       sleep(60);
+
        return 0;
 }