From: erubow Date: Sat, 5 May 2007 07:11:51 +0000 (+0000) Subject: added a tcp server (that serves nothing), plus random changes X-Git-Tag: preEdgeChange~596 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=68f989a37491a1c1a703eb4529a09443e9ed1c5b;p=IRC.git added a tcp server (that serves nothing), plus random changes --- diff --git a/Robust/src/Runtime/DSTM/interface/dht.c b/Robust/src/Runtime/DSTM/interface/dht.c index 9253ba19..17b9dafd 100644 --- a/Robust/src/Runtime/DSTM/interface/dht.c +++ b/Robust/src/Runtime/DSTM/interface/dht.c @@ -17,119 +17,121 @@ #include "clookup.h" //this works for now, do we need anything better? #define BUFFER_SIZE 512 //maximum message size -#define LISTEN_PORT 2157 +#define UDP_PORT 2157 +#define TCP_PORT 2157 +#define BACKLOG 10 //max pending tcp connections #define TIMEOUT_MS 500 #define MAX_RETRIES 3 -#define INIT_NUM_BLOCKS 16 +#define INIT_HOST_ALLOC 16 +#define INIT_BLOCK_NUM 64 #define DEFAULT_INTERFACE "eth0" -//general commands -#define INSERT_COMMAND 1 -#define REMOVE_COMMAND 2 -#define SEARCH_COMMAND 3 -//general responses -#define INSERT_RESPONSE 4 -#define REMOVE_RESPONSE 5 -#define SEARCH_RESPONSE 6 - -//#define JOIN -//#define LEAVE -//reserved for leader -//#define REBUILD +enum { + INSERT_COMMAND, + REMOVE_COMMAND, + SEARCH_COMMAND, + FIND_LEADER_COMMAND, + INSERT_RESPONSE, + REMOVE_RESPONSE, + SEARCH_RESPONSE, + FIND_LEADER_RESPONSE +}; -//etc... //status codes -#define INSERT_OK 1 -#define INSERT_ERROR 2 -#define REMOVE_OK 3 -#define REMOVE_ERROR 4 -#define KEY_FOUND 5 -#define KEY_NOT_FOUND 6 -#define NOT_KEY_OWNER 7 +enum { + INSERT_OK, + INSERT_ERROR, + REMOVE_OK, + REMOVE_ERROR, + KEY_FOUND, + KEY_NOT_FOUND, + NOT_KEY_OWNER +}; struct hostData { unsigned int ipAddr; unsigned int maxKeyCapacity; - struct hostData *next; }; struct insertCmd { - unsigned char msgType; - unsigned int unused:12; + unsigned int msgType; unsigned int key; unsigned int val; }; struct removeCmd { - unsigned char msgType; - unsigned int unused:12; + unsigned int msgType; unsigned int key; }; struct searchCmd { - unsigned char msgType; - unsigned int unused:12; + unsigned int msgType; unsigned int key; }; struct insertRes { - unsigned char msgType; - unsigned int status:12; + unsigned int msgType; + unsigned int status; }; struct removeRes { - unsigned char msgType; - unsigned int status:12; + unsigned int msgType; + unsigned int status; }; struct searchRes { - unsigned char msgType; - unsigned int status:12; + unsigned int msgType; + unsigned int status; unsigned int val; }; -/*struct joinMsg { - unsigned char msgType; - unsigned int unused:12; - struct hostData newHost; -};*/ //TODO: leave message, rebuild message... +struct hostData myHostData; unsigned int numHosts; -struct hostData *hostList; -struct hostData *myHostData; +struct hostData *hostArray; +unsigned int hostArraySize; unsigned int numBlocks; -struct hostData **blockOwner; - +unsigned int *blockOwnerArray; +unsigned int blockOwnerArraySize; unsigned int getMyIpAddr(); -void *dhtListen(); +void *udpListen(); +void *tcpListen(); +void *tcpAccept(void *); //returns number of bytes received in resBuffer, or -1 if an error occurred -int sendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries); +int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries); int sendNoWait(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen); unsigned int getKeyOwner(unsigned int key); unsigned int hash(unsigned int x); void dhtInit(unsigned int maxKeyCapacity) { + unsigned int myMessage; + int bytesReceived; int i; - myHostData = malloc(sizeof(struct hostData)); - myHostData->ipAddr = getMyIpAddr(); - myHostData->maxKeyCapacity; - myHostData->next = NULL; + myHostData.ipAddr = getMyIpAddr(); + myHostData.maxKeyCapacity = maxKeyCapacity; + - //announce presence (udp), get data structures from leader (leader initiates tcp transfer) - //if no response, I am the first - hostList = myHostData; - numBlocks = INIT_NUM_BLOCKS; - blockOwner = malloc(numBlocks * sizeof(struct hostData)); + //announce presence (udp broadcast), get data structures from leader (leader initiates tcp transfer) + + +//if no response, I am the first + + numHosts = 1; + hostArray = malloc(INIT_HOST_ALLOC * sizeof(struct hostData)); + hostArray[0] = myHostData; + + numBlocks = INIT_BLOCK_NUM; + blockOwnerArray = malloc(numBlocks * sizeof(unsigned short)); for (i = 0; i < numBlocks; i++) { - blockOwner[i] = myHostData; + blockOwnerArray[i] = 0; } //otherwise, scan array and choose blocks to take over @@ -137,8 +139,9 @@ void dhtInit(unsigned int maxKeyCapacity) //notify (the leader or everybody?) of ownership changes //start server (udp) - pthread_t threadListen; - pthread_create(&threadListen, NULL, dhtListen, NULL); + pthread_t threadUdpListen, threadTcpListen; + pthread_create(&threadUdpListen, NULL, udpListen, NULL); + pthread_create(&threadTcpListen, NULL, tcpListen, NULL); return; } @@ -151,7 +154,6 @@ void dhtExit() int dhtInsert(unsigned int key, unsigned int val) { unsigned int dest_ip = getKeyOwner(key); - unsigned short dest_port = LISTEN_PORT; struct insertCmd myMessage; struct insertRes response; int bytesReceived; @@ -160,7 +162,7 @@ int dhtInsert(unsigned int key, unsigned int val) myMessage.key = key; myMessage.val = val; - bytesReceived = sendWaitForResponse(dest_ip, dest_port, (void *)&myMessage, sizeof(struct insertCmd), (void *)&response, sizeof(struct insertRes), TIMEOUT_MS, MAX_RETRIES); + bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage, sizeof(struct insertCmd), (void *)&response, sizeof(struct insertRes), TIMEOUT_MS, MAX_RETRIES); if (bytesReceived == sizeof(struct insertRes)) { if (response.msgType == INSERT_RESPONSE) @@ -177,7 +179,6 @@ int dhtInsert(unsigned int key, unsigned int val) int dhtRemove(unsigned int key) { unsigned int dest_ip = getKeyOwner(key); - unsigned short dest_port = LISTEN_PORT; struct removeCmd myMessage; struct removeRes response; int bytesReceived; @@ -185,7 +186,7 @@ int dhtRemove(unsigned int key) myMessage.msgType = REMOVE_COMMAND; myMessage.key = key; - bytesReceived = sendWaitForResponse(dest_ip, dest_port, (void *)&myMessage, sizeof(struct removeCmd), (void *)&response, sizeof(struct removeRes), TIMEOUT_MS, MAX_RETRIES); + bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage, sizeof(struct removeCmd), (void *)&response, sizeof(struct removeRes), TIMEOUT_MS, MAX_RETRIES); if (bytesReceived == sizeof(struct removeRes)) { if (response.msgType == REMOVE_RESPONSE) @@ -202,7 +203,6 @@ int dhtRemove(unsigned int key) int dhtSearch(unsigned int key, unsigned int *val) { unsigned int dest_ip = getKeyOwner(key); - unsigned short dest_port = LISTEN_PORT; struct searchCmd myMessage; struct searchRes response; int bytesReceived; @@ -210,7 +210,7 @@ int dhtSearch(unsigned int key, unsigned int *val) myMessage.msgType = SEARCH_COMMAND; myMessage.key = key; - bytesReceived = sendWaitForResponse(dest_ip, dest_port, (void *)&myMessage, sizeof(struct searchCmd), (void *)&response, sizeof(struct searchRes), TIMEOUT_MS, MAX_RETRIES); + bytesReceived = udpSendWaitForResponse(dest_ip, UDP_PORT, (void *)&myMessage, sizeof(struct searchCmd), (void *)&response, sizeof(struct searchRes), TIMEOUT_MS, MAX_RETRIES); if (bytesReceived == sizeof(struct searchRes)) { if (response.msgType == SEARCH_RESPONSE) @@ -231,11 +231,13 @@ int dhtSearch(unsigned int key, unsigned int *val) return -1; //this function should be robust enough to always return 0 or 1 } -//helper functions -void *dhtListen() + + +//use UDP for messages that are frequent and short +void *udpListen() { - struct sockaddr_in my_addr; - struct sockaddr_in client_addr; + struct sockaddr_in myAddr; + struct sockaddr_in clientAddr; int sock; socklen_t socklen = sizeof(struct sockaddr_in); char buffer[BUFFER_SIZE]; @@ -251,26 +253,26 @@ void *dhtListen() chashtable_t *myHashTable = chashCreate(HASH_SIZE, LOADFACTOR); - if ((sock = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) + if ((sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) { perror("socket()"); - exit(1); + pthread_exit(NULL); } - bzero(&my_addr, socklen); - my_addr.sin_family=AF_INET; - my_addr.sin_addr.s_addr=INADDR_ANY; - my_addr.sin_port=htons(LISTEN_PORT); + bzero(&myAddr, socklen); + myAddr.sin_family=AF_INET; + myAddr.sin_addr.s_addr=INADDR_ANY; + myAddr.sin_port=htons(UDP_PORT); - if (bind(sock, (struct sockaddr *)&my_addr, socklen) == -1) + if (bind(sock, (struct sockaddr *)&myAddr, socklen) == -1) { perror("bind()"); - exit(1); + pthread_exit(NULL); } // printf("listening...\n"); while(1) { - if ((bytesReceived = recvfrom(sock, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&client_addr, &socklen)) == -1) + if ((bytesReceived = recvfrom(sock, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&clientAddr, &socklen)) == -1) { perror("recvfrom()"); break; @@ -283,7 +285,7 @@ void *dhtListen() gettimeofday(&now, NULL); // printf("message received:%ds,%dus\n", now.tv_sec, now.tv_usec); -// printf("Received %d bytes from %x:%d\n", bytesReceived, client_addr.sin_addr.s_addr, client_addr.sin_port); +// printf("Received %d bytes from %x:%d\n", bytesReceived, clientAddr.sin_addr.s_addr, clientAddr.sin_port); switch (buffer[0]) { case INSERT_COMMAND: @@ -296,7 +298,7 @@ void *dhtListen() // printf("Insert: key=%d, val=%d\n", insertCmdPtr->key, insertCmdPtr->val); insertResPtr = (struct insertRes *)replyBuffer; insertResPtr->msgType = INSERT_RESPONSE; - if (getKeyOwner(insertCmdPtr->key) == myHostData->ipAddr) + if (getKeyOwner(insertCmdPtr->key) == myHostData.ipAddr) { //note: casting val to void * in order to conform to API if(chashInsert(myHashTable, insertCmdPtr->key, (void *)insertCmdPtr->val) == 0) @@ -308,7 +310,7 @@ void *dhtListen() { insertResPtr->status = NOT_KEY_OWNER;; } - sendto(sock, (void *)insertResPtr, sizeof(struct insertRes), 0, (struct sockaddr *)&client_addr, socklen); + sendto(sock, (void *)insertResPtr, sizeof(struct insertRes), 0, (struct sockaddr *)&clientAddr, socklen); break; case REMOVE_COMMAND: if (bytesReceived != sizeof(struct removeCmd)) @@ -320,7 +322,7 @@ void *dhtListen() // printf("Remove: key=%d\n", removeCmdPtr->key); removeResPtr = (struct removeRes *)replyBuffer; removeResPtr->msgType = REMOVE_RESPONSE; - if (getKeyOwner(removeCmdPtr->key) == myHostData->ipAddr) + if (getKeyOwner(removeCmdPtr->key) == myHostData.ipAddr) { //note: casting val to void * in order to conform to API if(chashRemove(myHashTable, removeCmdPtr->key) == 0) @@ -332,7 +334,7 @@ void *dhtListen() { removeResPtr->status = NOT_KEY_OWNER; } - sendto(sock, (void *)removeResPtr, sizeof(struct removeRes), 0, (struct sockaddr *)&client_addr, socklen); + sendto(sock, (void *)removeResPtr, sizeof(struct removeRes), 0, (struct sockaddr *)&clientAddr, socklen); break; case SEARCH_COMMAND: if (bytesReceived != sizeof(struct searchCmd)) @@ -344,7 +346,7 @@ void *dhtListen() // printf("Search: key=%d\n",searchCmdPtr->key); searchResPtr = (struct searchRes *)replyBuffer; searchResPtr->msgType = SEARCH_RESPONSE; - if (getKeyOwner(searchCmdPtr->key) == myHostData->ipAddr) + if (getKeyOwner(searchCmdPtr->key) == myHostData.ipAddr) { //note: casting val to void * in order to conform to API if((searchResPtr->val = (unsigned int)chashSearch(myHashTable, searchCmdPtr->key)) == 0) @@ -356,7 +358,7 @@ void *dhtListen() { searchResPtr->status = NOT_KEY_OWNER; } - sendto(sock, (void *)searchResPtr, sizeof(struct searchRes), 0, (struct sockaddr *)&client_addr, socklen); + sendto(sock, (void *)searchResPtr, sizeof(struct searchRes), 0, (struct sockaddr *)&clientAddr, socklen); break; //just ignore anything else // default: @@ -365,7 +367,7 @@ void *dhtListen() } } -int sendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries) +int udpSendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void *msg, unsigned int msglen, void *resBuffer, unsigned int resBufferSize, unsigned int timeout, unsigned int numRetries) { struct sockaddr_in server_addr; struct sockaddr_in ack_addr; @@ -420,9 +422,67 @@ int sendWaitForResponse(unsigned int dest_ip, unsigned short dest_port, void *ms return -1; } +// use TCP for potentially large and/or important data transfer +void *tcpListen() +{ + struct sockaddr_in myAddr; + struct sockaddr_in clientAddr; + int sockListen, sockAccept; + socklen_t socklen = sizeof(struct sockaddr_in); + pthread_t threadTcpAccept; + + sockListen = socket(AF_INET, SOCK_STREAM, 0); + if (sockListen == -1) + { + perror("socket()"); + pthread_exit(NULL); + } + + myAddr.sin_family = AF_INET; + myAddr.sin_port = htons(TCP_PORT); + myAddr.sin_addr.s_addr = INADDR_ANY; + memset(&(myAddr.sin_zero), '\0', 8); + + if (bind(sockListen, (struct sockaddr *)&myAddr, socklen) == -1) + { + perror("socket()"); + pthread_exit(NULL); + } + + if (listen(sockListen, BACKLOG) == -1) + { + perror("listen()"); + pthread_exit(NULL); + } + + while(1) + { + sockAccept = accept(sockListen, (struct sockaddr *)&clientAddr, &socklen); + pthread_create(&threadTcpAccept, NULL, tcpAccept, (void *)sockAccept); + } +} + +void *tcpAccept(void *arg) +{ + int sockAccept = (int)arg; + + printf("accepted tcp connection, file descriptor: %d\n", sockAccept); + + sleep(30); + + if (close(sockAccept) == -1) + { + perror("close()"); + } + + printf("closed tcp connection, file descriptor: %d\n", sockAccept); + + pthread_exit(NULL); +} + unsigned int getKeyOwner(unsigned int key) { - return blockOwner[hash(key)]->ipAddr; + return hostArray[blockOwnerArray[hash(key)]].ipAddr; } unsigned int getMyIpAddr() @@ -435,7 +495,7 @@ unsigned int getMyIpAddr() if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - perror("socket"); + perror("socket()"); return 1; } @@ -444,7 +504,7 @@ unsigned int getMyIpAddr() if(ioctl(sock, SIOCGIFADDR, &interfaceInfo) != 0) { - perror("ioctl"); + perror("ioctl()"); return 1; } diff --git a/Robust/src/Runtime/DSTM/interface/testdht.c b/Robust/src/Runtime/DSTM/interface/testdht.c index 60c2b702..dbff2265 100644 --- a/Robust/src/Runtime/DSTM/interface/testdht.c +++ b/Robust/src/Runtime/DSTM/interface/testdht.c @@ -14,6 +14,7 @@ int main() chashtable_t *localHash; dhtInit(DHT_NO_KEY_LIMIT); + localHash = chashCreate(HASH_SIZE, LOADFACTOR); srandom(time(0)); @@ -152,6 +153,8 @@ int main() else printf("one or more errors occurred\n"); + sleep(60); + return 0; }