From 6e2ca7663bc8fa22199e01c92b46d892187fccaa Mon Sep 17 00:00:00 2001 From: bdemsky Date: Mon, 5 May 2008 22:01:03 +0000 Subject: [PATCH] fix annoying race condition on startup --- Robust/src/Runtime/DSTM/interface/dstm.h | 3 +- .../src/Runtime/DSTM/interface/dstmserver.c | 112 +++++++++--------- Robust/src/Runtime/DSTM/interface/trans.c | 9 +- 3 files changed, 65 insertions(+), 59 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index 6311e213..98f60f86 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -235,7 +235,8 @@ void clearObjStore(); // TODO:currently only clears the prefetch cache object st /* end object store */ /* Prototypes for server portion */ -void *dstmListen(); +void *dstmListen(void *); +int startlistening(); void *dstmAccept(void *); int readClientReq(trans_commit_data_t *, int); int processClientReq(fixed_data_t *, trans_commit_data_t *,unsigned int *, char *, void *, unsigned int *, int); diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index d28eb808..e1ec8c92 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -53,65 +53,67 @@ int dstmInit(void) return 0; } -/* This function starts the thread to listen on a socket - * for tranaction calls */ -void *dstmListen() -{ - int listenfd, acceptfd; - struct sockaddr_in my_addr; - struct sockaddr_in client_addr; - socklen_t addrlength = sizeof(struct sockaddr); - pthread_t thread_dstm_accept; - int i; - int setsockflag=1; - - listenfd = socket(AF_INET, SOCK_STREAM, 0); - if (listenfd == -1) - { - perror("socket"); - exit(1); - } - if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) { - perror("socket"); - exit(1); - } +int startlistening() { + int listenfd; + struct sockaddr_in my_addr; + socklen_t addrlength = sizeof(struct sockaddr); + int setsockflag=1; + + listenfd = socket(AF_INET, SOCK_STREAM, 0); + if (listenfd == -1) { + perror("socket"); + exit(1); + } + + if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) { + perror("socket"); + exit(1); + } #ifdef MAC - if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) { - perror("socket"); - exit(1); - } + if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) { + perror("socket"); + exit(1); + } #endif + + my_addr.sin_family = AF_INET; + my_addr.sin_port = htons(LISTEN_PORT); + my_addr.sin_addr.s_addr = INADDR_ANY; + memset(&(my_addr.sin_zero), '\0', 8); + + if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1) { + perror("bind"); + exit(1); + } + + if (listen(listenfd, BACKLOG) == -1) { + perror("listen"); + exit(1); + } + return listenfd; +} - my_addr.sin_family = AF_INET; - my_addr.sin_port = htons(LISTEN_PORT); - my_addr.sin_addr.s_addr = INADDR_ANY; - memset(&(my_addr.sin_zero), '\0', 8); - - if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1) - { - perror("bind"); - exit(1); - } - - if (listen(listenfd, BACKLOG) == -1) - { - perror("listen"); - exit(1); - } - - printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd); - while(1) - { - int retval; - int flag=1; - acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength); - setsockopt(acceptfd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag)); - do { - retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd); - } while(retval!=0); - pthread_detach(thread_dstm_accept); - } +/* This function starts the thread to listen on a socket + * for tranaction calls */ +void *dstmListen(void *lfd) { + int listenfd=(int)lfd; + int acceptfd; + struct sockaddr_in client_addr; + socklen_t addrlength = sizeof(struct sockaddr); + pthread_t thread_dstm_accept; + + printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd); + while(1) { + int retval; + int flag=1; + acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength); + setsockopt(acceptfd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag)); + do { + retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd); + } while(retval!=0); + pthread_detach(thread_dstm_accept); + } } /* This function accepts a new connection request, decodes the control message in the connection * and accordingly calls other functions to process new requests */ diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index d3064030..e862061a 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -132,6 +132,7 @@ void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short /* Set queue node values */ int len; int top=endoffsets[ntuples-1]; + *((int *)(node))=ntuples; len = sizeof(int); memcpy(node+len, oids, ntuples*sizeof(unsigned int)); @@ -147,7 +148,8 @@ int dstmStartup(const char * option) { pthread_t thread_Listen; pthread_attr_t attr; int master=option!=NULL && strcmp(option, "master")==0; - + int fd; + if (processConfigFile() != 0) return 0; //TODO: return error value, cause main program to exit #ifdef COMPILER @@ -162,13 +164,14 @@ int dstmStartup(const char * option) { dstmInit(); transInit(); + fd=startlistening(); if (master) { pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - pthread_create(&thread_Listen, &attr, dstmListen, NULL); + pthread_create(&thread_Listen, &attr, dstmListen, (void*)fd); return 1; } else { - dstmListen(); + dstmListen((void *)fd); return 0; } } -- 2.34.1