fix annoying race condition on startup
authorbdemsky <bdemsky>
Mon, 5 May 2008 22:01:03 +0000 (22:01 +0000)
committerbdemsky <bdemsky>
Mon, 5 May 2008 22:01:03 +0000 (22:01 +0000)
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/trans.c

index 6311e213f27c6353ba221ed4a9721778ac275d9b..98f60f8637af078419b9474d1dc4224017ce4d4b 100644 (file)
@@ -235,7 +235,8 @@ void clearObjStore(); // TODO:currently only clears the prefetch cache object st
 /* end object store */
 
 /* Prototypes for server portion */
-void *dstmListen();
+void *dstmListen(void *);
+int startlistening();
 void *dstmAccept(void *);
 int readClientReq(trans_commit_data_t *, int);
 int processClientReq(fixed_data_t *, trans_commit_data_t *,unsigned int *, char *, void *, unsigned int *, int);
index d28eb808e658e54ed0810baa8cf7ff3967aea0e4..e1ec8c926cd2620ab51a761e2c2574061bd6907c 100644 (file)
@@ -53,65 +53,67 @@ int dstmInit(void)
        return 0;
 }
 
-/* This function starts the thread to listen on a socket 
- * for tranaction calls */
-void *dstmListen()
-{
-       int listenfd, acceptfd;
-       struct sockaddr_in my_addr;
-       struct sockaddr_in client_addr;
-       socklen_t addrlength = sizeof(struct sockaddr);
-       pthread_t thread_dstm_accept;
-       int i;
-       int setsockflag=1;
-
-       listenfd = socket(AF_INET, SOCK_STREAM, 0);
-       if (listenfd == -1)
-       {
-               perror("socket");
-               exit(1);
-       }
 
-       if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) {
-         perror("socket");
-         exit(1);
-       }
+int startlistening() {
+  int listenfd;
+  struct sockaddr_in my_addr;
+  socklen_t addrlength = sizeof(struct sockaddr);
+  int setsockflag=1;
+  
+  listenfd = socket(AF_INET, SOCK_STREAM, 0);
+  if (listenfd == -1) {
+    perror("socket");
+    exit(1);
+  }
+  
+  if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) {
+    perror("socket");
+    exit(1);
+  }
 #ifdef MAC
-       if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) {
-         perror("socket");
-         exit(1);
-       }
+  if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) {
+    perror("socket");
+    exit(1);
+  }
 #endif
+  
+  my_addr.sin_family = AF_INET;
+  my_addr.sin_port = htons(LISTEN_PORT);
+  my_addr.sin_addr.s_addr = INADDR_ANY;
+  memset(&(my_addr.sin_zero), '\0', 8);
+  
+  if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1) {
+    perror("bind");
+    exit(1);
+  }
+  
+  if (listen(listenfd, BACKLOG) == -1) {
+    perror("listen");
+    exit(1);
+  }
+  return listenfd;
+}
 
-       my_addr.sin_family = AF_INET;
-       my_addr.sin_port = htons(LISTEN_PORT);
-       my_addr.sin_addr.s_addr = INADDR_ANY;
-       memset(&(my_addr.sin_zero), '\0', 8);
-
-       if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1)
-       {
-               perror("bind");
-               exit(1);
-       }
-       
-       if (listen(listenfd, BACKLOG) == -1)
-       {
-               perror("listen");
-               exit(1);
-       }
-
-       printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
-       while(1)
-       {
-         int retval;
-         int flag=1;
-         acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
-         setsockopt(acceptfd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag));
-         do {
-           retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
-         } while(retval!=0);
-         pthread_detach(thread_dstm_accept);
-       }
+/* This function starts the thread to listen on a socket 
+ * for tranaction calls */
+void *dstmListen(void *lfd) {
+  int listenfd=(int)lfd;
+  int acceptfd;
+  struct sockaddr_in client_addr;
+  socklen_t addrlength = sizeof(struct sockaddr);
+  pthread_t thread_dstm_accept;
+  
+  printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
+  while(1) {
+    int retval;
+    int flag=1;
+    acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
+    setsockopt(acceptfd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag));
+    do {
+      retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
+    } while(retval!=0);
+    pthread_detach(thread_dstm_accept);
+  }
 }
 /* This function accepts a new connection request, decodes the control message in the connection 
  * and accordingly calls other functions to process new requests */
index d3064030f80d73a469272eae674af5737fdf12a2..e862061ad83b5fe6a0e0ec55698bf08881b688d7 100644 (file)
@@ -132,6 +132,7 @@ void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short
   /* Set queue node values */
   int len;
   int top=endoffsets[ntuples-1];
+
   *((int *)(node))=ntuples;
   len = sizeof(int);
   memcpy(node+len, oids, ntuples*sizeof(unsigned int));
@@ -147,7 +148,8 @@ int dstmStartup(const char * option) {
   pthread_t thread_Listen;
   pthread_attr_t attr;
   int master=option!=NULL && strcmp(option, "master")==0;
-  
+  int fd;
+
   if (processConfigFile() != 0)
     return 0; //TODO: return error value, cause main program to exit
 #ifdef COMPILER
@@ -162,13 +164,14 @@ int dstmStartup(const char * option) {
   dstmInit();
   transInit();
   
+  fd=startlistening();
   if (master) {
     pthread_attr_init(&attr);
     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
-    pthread_create(&thread_Listen, &attr, dstmListen, NULL);
+    pthread_create(&thread_Listen, &attr, dstmListen, (void*)fd);
     return 1;
   } else {
-    dstmListen();
+    dstmListen((void *)fd);
     return 0;
   }
 }