alternate fix for fd's....
authorbdemsky <bdemsky>
Mon, 5 May 2008 07:28:51 +0000 (07:28 +0000)
committerbdemsky <bdemsky>
Mon, 5 May 2008 07:28:51 +0000 (07:28 +0000)
issue is that ==0 indicates socket is closed...but we need to close our end, not return...

Robust/src/Runtime/DSTM/interface/dstmserver.c

index a5e2016b2386fdfced7d3aebe32d57ad01454c40..d28eb808e658e54ed0810baa8cf7ff3967aea0e4 100644 (file)
@@ -16,7 +16,6 @@
 extern int classsize[];
 extern int numHostsInSystem;
 extern pthread_mutex_t notifymutex;
-extern unsigned int myIpAddr;
 
 objstr_t *mainobjstore;
 pthread_mutex_t mainobjstore_mutex;
@@ -30,92 +29,89 @@ sockPoolHashTable_t *transPResponseSocketPool;
 
 int dstmInit(void)
 {
-  mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);
-  /* Initialize attribute for mutex */
-  pthread_mutexattr_init(&mainobjstore_mutex_attr);
-  pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
-  pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr);
-  pthread_mutex_init(&lockObjHeader,NULL);
-  if (mhashCreate(HASH_SIZE, LOADFACTOR))
-    return 1; //failure
-
-  if (lhashCreate(HASH_SIZE, LOADFACTOR))
-    return 1; //failure
-
-  if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR))
-    return 1; //failure
-
-  //Initialize socket pool
-  if((transPResponseSocketPool = createSockPool(transPResponseSocketPool, 2*numHostsInSystem+1)) == NULL) {
-    printf("Error in creating new socket pool at  %s line %d\n", __FILE__, __LINE__);
-    return 0;
-  }
-  return 0;
+       mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);
+       /* Initialize attribute for mutex */
+       pthread_mutexattr_init(&mainobjstore_mutex_attr);
+       pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
+       pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr);
+       pthread_mutex_init(&lockObjHeader,NULL);
+       if (mhashCreate(HASH_SIZE, LOADFACTOR))
+               return 1; //failure
+       
+       if (lhashCreate(HASH_SIZE, LOADFACTOR))
+               return 1; //failure
+
+       if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR))
+               return 1; //failure
+
+    //Initialize socket pool
+    if((transPResponseSocketPool = createSockPool(transPResponseSocketPool, 2*numHostsInSystem+1)) == NULL) {
+        printf("Error in creating new socket pool at  %s line %d\n", __FILE__, __LINE__);
+        return 0;
+    }
+
+       return 0;
 }
 
 /* This function starts the thread to listen on a socket 
  * for tranaction calls */
 void *dstmListen()
 {
-  int listenfd, acceptfd;
-  struct sockaddr_in my_addr;
-  struct sockaddr_in client_addr;
-  socklen_t addrlength = sizeof(struct sockaddr);
-  pthread_t thread_dstm_accept;
-  int i;
-  int setsockflag=1;
-
-  listenfd = socket(AF_INET, SOCK_STREAM, 0);
-  if (listenfd == -1)
-  {
-    perror("socket");
-    exit(1);
-  }
+       int listenfd, acceptfd;
+       struct sockaddr_in my_addr;
+       struct sockaddr_in client_addr;
+       socklen_t addrlength = sizeof(struct sockaddr);
+       pthread_t thread_dstm_accept;
+       int i;
+       int setsockflag=1;
+
+       listenfd = socket(AF_INET, SOCK_STREAM, 0);
+       if (listenfd == -1)
+       {
+               perror("socket");
+               exit(1);
+       }
 
-  if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) {
-    perror("socket");
-    exit(1);
-  }
+       if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) {
+         perror("socket");
+         exit(1);
+       }
 #ifdef MAC
-  if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) {
-    perror("socket");
-    exit(1);
-  }
+       if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) {
+         perror("socket");
+         exit(1);
+       }
 #endif
 
-  my_addr.sin_family = AF_INET;
-  my_addr.sin_port = htons(LISTEN_PORT);
-  my_addr.sin_addr.s_addr = INADDR_ANY;
-  memset(&(my_addr.sin_zero), '\0', 8);
-
-  if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1)
-  {
-    perror("bind");
-    exit(1);
-  }
+       my_addr.sin_family = AF_INET;
+       my_addr.sin_port = htons(LISTEN_PORT);
+       my_addr.sin_addr.s_addr = INADDR_ANY;
+       memset(&(my_addr.sin_zero), '\0', 8);
 
-  if (listen(listenfd, BACKLOG) == -1)
-  {
-    perror("listen");
-    exit(1);
-  }
+       if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1)
+       {
+               perror("bind");
+               exit(1);
+       }
+       
+       if (listen(listenfd, BACKLOG) == -1)
+       {
+               perror("listen");
+               exit(1);
+       }
 
-  printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
-  while(1)
-  {
-    int retval;
-    int flag=1;
-    acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
-    if (acceptfd < 0) {
-      perror("Error in accept: ");
-      printf("error %x", acceptfd); fflush(stdout);
-    }
-    setsockopt(acceptfd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag));
-    do {
-      retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
-    } while(retval!=0);
-    pthread_detach(thread_dstm_accept);
-  }
+       printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
+       while(1)
+       {
+         int retval;
+         int flag=1;
+         acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
+         setsockopt(acceptfd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag));
+         do {
+           retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
+         } while(retval!=0);
+         pthread_detach(thread_dstm_accept);
+       }
 }
 /* This function accepts a new connection request, decodes the control message in the connection 
  * and accordingly calls other functions to process new requests */
@@ -130,52 +126,50 @@ void *dstmAccept(void *acceptfd) {
   trans_commit_data_t transinfo;
   unsigned short objType, *versionarry, version;
   unsigned int *oidarry, numoid, mid, threadid;
-
+  
   /* Receive control messages from other machines */
-  if((retval = recv_data_errorcode((int)acceptfd, &control, sizeof(char))) <= 0) {
-    printf("%s() Error: Receiving control = %d at %s, %d\n", __func__, control, __FILE__, __LINE__);
-    pthread_exit(NULL);
-  }
-
-  switch(control) {
+  while(1) {
+    int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char));
+    if (ret==0)
+      break;
+    if (ret==-1) {
+      printf("DEBUG -> RECV Error!.. retrying\n");
+      break;
+    }
+    switch(control) {
     case READ_REQUEST:
-      do {
-        /* Read oid requested and search if available */
-        recv_data((int)acceptfd, &oid, sizeof(unsigned int));
-        if((srcObj = mhashSearch(oid)) == NULL) {
-          printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__);
-          break;
-        }
-        h = (objheader_t *) srcObj;
-        GETSIZE(size, h);
-        size += sizeof(objheader_t);
-        sockid = (int) acceptfd;
-
-        if (h == NULL) {
-          ctrl = OBJECT_NOT_FOUND;
-          send_data(sockid, &ctrl, sizeof(char));
-        } else {
-          /* Type */
-          char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
-          *((int *)&msg[1])=size;
-          send_data(sockid, &msg, sizeof(msg));
-          send_data(sockid, h, size);
-        }
-        if((retval = recv_data_errorcode((int)acceptfd, &control, sizeof(char))) <= 0)  {
-          pthread_exit(NULL);
-        }
-      } while(control == READ_REQUEST);
+      /* Read oid requested and search if available */
+      recv_data((int)acceptfd, &oid, sizeof(unsigned int));
+      if((srcObj = mhashSearch(oid)) == NULL) {
+       printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__);
+       break;
+      }
+      h = (objheader_t *) srcObj;
+      GETSIZE(size, h);
+      size += sizeof(objheader_t);
+      sockid = (int) acceptfd;
+      
+      if (h == NULL) {
+       ctrl = OBJECT_NOT_FOUND;
+       send_data(sockid, &ctrl, sizeof(char));
+      } else {
+       /* Type */
+       char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
+       *((int *)&msg[1])=size;
+       send_data(sockid, &msg, sizeof(msg));
+       send_data(sockid, h, size);
+      }
       break;
-
+      
     case READ_MULT_REQUEST:
       break;
-
+      
     case MOVE_REQUEST:
       break;
-
+      
     case MOVE_MULT_REQUEST:
       break;
-
+      
     case TRANS_REQUEST:
       /* Read transaction request */
       transinfo.objlocked = NULL;
@@ -184,48 +178,38 @@ void *dstmAccept(void *acceptfd) {
       transinfo.numlocked = 0;
       transinfo.numnotfound = 0;
       if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
-        printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
-        break;
+       printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
+       pthread_exit(NULL);
       }
       break;
     case TRANS_PREFETCH:
-      do {
-        if((val = prefetchReq((int)acceptfd)) != 0) {
-          printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
-          break;
-        }
-        if((retval = recv_data_errorcode((int)acceptfd, &control, sizeof(char))) <= 0)  {
-          pthread_exit(NULL);
-        }
-      } while (control == TRANS_PREFETCH);
+      if((val = prefetchReq((int)acceptfd)) != 0) {
+       printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
+       break;
+      }
       break;
     case TRANS_PREFETCH_RESPONSE:
-      do {
-        if((val = getPrefetchResponse((int) acceptfd)) != 0) {
-          printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
-          break;
-        }
-        if((retval = recv_data_errorcode((int)acceptfd, &control, sizeof(char))) <= 0)  {
-          pthread_exit(NULL);
-        }
-      } while (control == TRANS_PREFETCH_RESPONSE);
+      if((val = getPrefetchResponse((int) acceptfd)) != 0) {
+       printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
+       break;
+      }
       break;
     case START_REMOTE_THREAD:
       recv_data((int)acceptfd, &oid, sizeof(unsigned int));
       objType = getObjType(oid);
       startDSMthread(oid, objType);
       break;
-
+      
     case THREAD_NOTIFY_REQUEST:
       recv_data((int)acceptfd, &numoid, sizeof(unsigned int));
       size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
       if((buffer = calloc(1,size)) == NULL) {
-        printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
-        pthread_exit(NULL);
+       printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
+       pthread_exit(NULL);
       }
-
+      
       recv_data((int)acceptfd, buffer, size);
-
+      
       oidarry = calloc(numoid, sizeof(unsigned int)); 
       memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
       size = sizeof(unsigned int) * numoid;
@@ -237,18 +221,18 @@ void *dstmAccept(void *acceptfd) {
       threadid = *((unsigned int *)(buffer+size));
       processReqNotify(numoid, oidarry, versionarry, mid, threadid);
       free(buffer);
-
+      
       break;
 
     case THREAD_NOTIFY_RESPONSE:
       size = sizeof(unsigned short) + 2 * sizeof(unsigned int);
       if((buffer = calloc(1,size)) == NULL) {
-        printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
-        pthread_exit(NULL);
+       printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
+       pthread_exit(NULL);
       }
-
+      
       recv_data((int)acceptfd, buffer, size);
-
+      
       oid = *((unsigned int *)buffer);
       size = sizeof(unsigned int);
       version = *((unsigned short *)(buffer+size));
@@ -263,9 +247,10 @@ void *dstmAccept(void *acceptfd) {
 
     default:
       printf("Error: dstmAccept() Unknown opcode %d at %s, %d\n", control, __FILE__, __LINE__);
+    }
   }
 
-closeconnection:
+ closeconnection:
   /* Close connection */
   if (close((int)acceptfd) == -1)
     perror("close");