fix for too many socket errors and race condition in locking main
authoradash <adash>
Mon, 5 May 2008 06:54:57 +0000 (06:54 +0000)
committeradash <adash>
Mon, 5 May 2008 06:54:57 +0000 (06:54 +0000)
object stores look up table

Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/mlookup.c

index 2674a911fda741bf145dc7cbb9f05e7aeee891b1..a5e2016b2386fdfced7d3aebe32d57ad01454c40 100644 (file)
@@ -16,6 +16,7 @@
 extern int classsize[];
 extern int numHostsInSystem;
 extern pthread_mutex_t notifymutex;
+extern unsigned int myIpAddr;
 
 objstr_t *mainobjstore;
 pthread_mutex_t mainobjstore_mutex;
@@ -29,89 +30,92 @@ sockPoolHashTable_t *transPResponseSocketPool;
 
 int dstmInit(void)
 {
-       mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);
-       /* Initialize attribute for mutex */
-       pthread_mutexattr_init(&mainobjstore_mutex_attr);
-       pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
-       pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr);
-       pthread_mutex_init(&lockObjHeader,NULL);
-       if (mhashCreate(HASH_SIZE, LOADFACTOR))
-               return 1; //failure
-       
-       if (lhashCreate(HASH_SIZE, LOADFACTOR))
-               return 1; //failure
-
-       if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR))
-               return 1; //failure
-
-    //Initialize socket pool
-    if((transPResponseSocketPool = createSockPool(transPResponseSocketPool, 2*numHostsInSystem+1)) == NULL) {
-        printf("Error in creating new socket pool at  %s line %d\n", __FILE__, __LINE__);
-        return 0;
-    }
-
-       return 0;
+  mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);
+  /* Initialize attribute for mutex */
+  pthread_mutexattr_init(&mainobjstore_mutex_attr);
+  pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
+  pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr);
+  pthread_mutex_init(&lockObjHeader,NULL);
+  if (mhashCreate(HASH_SIZE, LOADFACTOR))
+    return 1; //failure
+
+  if (lhashCreate(HASH_SIZE, LOADFACTOR))
+    return 1; //failure
+
+  if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR))
+    return 1; //failure
+
+  //Initialize socket pool
+  if((transPResponseSocketPool = createSockPool(transPResponseSocketPool, 2*numHostsInSystem+1)) == NULL) {
+    printf("Error in creating new socket pool at  %s line %d\n", __FILE__, __LINE__);
+    return 0;
+  }
+  return 0;
 }
 
 /* This function starts the thread to listen on a socket 
  * for tranaction calls */
 void *dstmListen()
 {
-       int listenfd, acceptfd;
-       struct sockaddr_in my_addr;
-       struct sockaddr_in client_addr;
-       socklen_t addrlength = sizeof(struct sockaddr);
-       pthread_t thread_dstm_accept;
-       int i;
-       int setsockflag=1;
-
-       listenfd = socket(AF_INET, SOCK_STREAM, 0);
-       if (listenfd == -1)
-       {
-               perror("socket");
-               exit(1);
-       }
+  int listenfd, acceptfd;
+  struct sockaddr_in my_addr;
+  struct sockaddr_in client_addr;
+  socklen_t addrlength = sizeof(struct sockaddr);
+  pthread_t thread_dstm_accept;
+  int i;
+  int setsockflag=1;
+
+  listenfd = socket(AF_INET, SOCK_STREAM, 0);
+  if (listenfd == -1)
+  {
+    perror("socket");
+    exit(1);
+  }
 
-       if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) {
-         perror("socket");
-         exit(1);
-       }
+  if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) {
+    perror("socket");
+    exit(1);
+  }
 #ifdef MAC
-       if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) {
-         perror("socket");
-         exit(1);
-       }
+  if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) {
+    perror("socket");
+    exit(1);
+  }
 #endif
 
-       my_addr.sin_family = AF_INET;
-       my_addr.sin_port = htons(LISTEN_PORT);
-       my_addr.sin_addr.s_addr = INADDR_ANY;
-       memset(&(my_addr.sin_zero), '\0', 8);
+  my_addr.sin_family = AF_INET;
+  my_addr.sin_port = htons(LISTEN_PORT);
+  my_addr.sin_addr.s_addr = INADDR_ANY;
+  memset(&(my_addr.sin_zero), '\0', 8);
 
-       if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1)
-       {
-               perror("bind");
-               exit(1);
-       }
-       
-       if (listen(listenfd, BACKLOG) == -1)
-       {
-               perror("listen");
-               exit(1);
-       }
+  if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1)
+  {
+    perror("bind");
+    exit(1);
+  }
 
-       printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
-       while(1)
-       {
-         int retval;
-         int flag=1;
-         acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
-         setsockopt(acceptfd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag));
-         do {
-           retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
-         } while(retval!=0);
-         pthread_detach(thread_dstm_accept);
-       }
+  if (listen(listenfd, BACKLOG) == -1)
+  {
+    perror("listen");
+    exit(1);
+  }
+
+  printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
+  while(1)
+  {
+    int retval;
+    int flag=1;
+    acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
+    if (acceptfd < 0) {
+      perror("Error in accept: ");
+      printf("error %x", acceptfd); fflush(stdout);
+    }
+    setsockopt(acceptfd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag));
+    do {
+      retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
+    } while(retval!=0);
+    pthread_detach(thread_dstm_accept);
+  }
 }
 /* This function accepts a new connection request, decodes the control message in the connection 
  * and accordingly calls other functions to process new requests */
@@ -126,50 +130,52 @@ void *dstmAccept(void *acceptfd) {
   trans_commit_data_t transinfo;
   unsigned short objType, *versionarry, version;
   unsigned int *oidarry, numoid, mid, threadid;
-  
+
   /* Receive control messages from other machines */
-  while(1) {
-    int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char));
-    if (ret==0)
-      return;
-    if (ret==-1) {
-      printf("DEBUG -> RECV Error!.. retrying\n");
-      break;
-    }
-    switch(control) {
+  if((retval = recv_data_errorcode((int)acceptfd, &control, sizeof(char))) <= 0) {
+    printf("%s() Error: Receiving control = %d at %s, %d\n", __func__, control, __FILE__, __LINE__);
+    pthread_exit(NULL);
+  }
+
+  switch(control) {
     case READ_REQUEST:
-      /* Read oid requested and search if available */
-      recv_data((int)acceptfd, &oid, sizeof(unsigned int));
-      if((srcObj = mhashSearch(oid)) == NULL) {
-       printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__);
-       break;
-      }
-      h = (objheader_t *) srcObj;
-      GETSIZE(size, h);
-      size += sizeof(objheader_t);
-      sockid = (int) acceptfd;
-      
-      if (h == NULL) {
-       ctrl = OBJECT_NOT_FOUND;
-       send_data(sockid, &ctrl, sizeof(char));
-      } else {
-       /* Type */
-       char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
-       *((int *)&msg[1])=size;
-       send_data(sockid, &msg, sizeof(msg));
-       send_data(sockid, h, size);
-      }
+      do {
+        /* Read oid requested and search if available */
+        recv_data((int)acceptfd, &oid, sizeof(unsigned int));
+        if((srcObj = mhashSearch(oid)) == NULL) {
+          printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__);
+          break;
+        }
+        h = (objheader_t *) srcObj;
+        GETSIZE(size, h);
+        size += sizeof(objheader_t);
+        sockid = (int) acceptfd;
+
+        if (h == NULL) {
+          ctrl = OBJECT_NOT_FOUND;
+          send_data(sockid, &ctrl, sizeof(char));
+        } else {
+          /* Type */
+          char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
+          *((int *)&msg[1])=size;
+          send_data(sockid, &msg, sizeof(msg));
+          send_data(sockid, h, size);
+        }
+        if((retval = recv_data_errorcode((int)acceptfd, &control, sizeof(char))) <= 0)  {
+          pthread_exit(NULL);
+        }
+      } while(control == READ_REQUEST);
       break;
-      
+
     case READ_MULT_REQUEST:
       break;
-      
+
     case MOVE_REQUEST:
       break;
-      
+
     case MOVE_MULT_REQUEST:
       break;
-      
+
     case TRANS_REQUEST:
       /* Read transaction request */
       transinfo.objlocked = NULL;
@@ -178,38 +184,48 @@ void *dstmAccept(void *acceptfd) {
       transinfo.numlocked = 0;
       transinfo.numnotfound = 0;
       if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
-       printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
-       pthread_exit(NULL);
+        printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
+        break;
       }
       break;
     case TRANS_PREFETCH:
-      if((val = prefetchReq((int)acceptfd)) != 0) {
-       printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
-       break;
-      }
+      do {
+        if((val = prefetchReq((int)acceptfd)) != 0) {
+          printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
+          break;
+        }
+        if((retval = recv_data_errorcode((int)acceptfd, &control, sizeof(char))) <= 0)  {
+          pthread_exit(NULL);
+        }
+      } while (control == TRANS_PREFETCH);
       break;
     case TRANS_PREFETCH_RESPONSE:
-      if((val = getPrefetchResponse((int) acceptfd)) != 0) {
-       printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
-       break;
-      }
+      do {
+        if((val = getPrefetchResponse((int) acceptfd)) != 0) {
+          printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
+          break;
+        }
+        if((retval = recv_data_errorcode((int)acceptfd, &control, sizeof(char))) <= 0)  {
+          pthread_exit(NULL);
+        }
+      } while (control == TRANS_PREFETCH_RESPONSE);
       break;
     case START_REMOTE_THREAD:
       recv_data((int)acceptfd, &oid, sizeof(unsigned int));
       objType = getObjType(oid);
       startDSMthread(oid, objType);
       break;
-      
+
     case THREAD_NOTIFY_REQUEST:
       recv_data((int)acceptfd, &numoid, sizeof(unsigned int));
       size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
       if((buffer = calloc(1,size)) == NULL) {
-       printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
-       pthread_exit(NULL);
+        printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
+        pthread_exit(NULL);
       }
-      
+
       recv_data((int)acceptfd, buffer, size);
-      
+
       oidarry = calloc(numoid, sizeof(unsigned int)); 
       memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
       size = sizeof(unsigned int) * numoid;
@@ -221,18 +237,18 @@ void *dstmAccept(void *acceptfd) {
       threadid = *((unsigned int *)(buffer+size));
       processReqNotify(numoid, oidarry, versionarry, mid, threadid);
       free(buffer);
-      
+
       break;
 
     case THREAD_NOTIFY_RESPONSE:
       size = sizeof(unsigned short) + 2 * sizeof(unsigned int);
       if((buffer = calloc(1,size)) == NULL) {
-       printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
-       pthread_exit(NULL);
+        printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
+        pthread_exit(NULL);
       }
-      
+
       recv_data((int)acceptfd, buffer, size);
-      
+
       oid = *((unsigned int *)buffer);
       size = sizeof(unsigned int);
       version = *((unsigned short *)(buffer+size));
@@ -247,10 +263,9 @@ void *dstmAccept(void *acceptfd) {
 
     default:
       printf("Error: dstmAccept() Unknown opcode %d at %s, %d\n", control, __FILE__, __LINE__);
-    }
   }
 
- closeconnection:
+closeconnection:
   /* Close connection */
   if (close((int)acceptfd) == -1)
     perror("close");
index 0629adbec6c768ac618a08c46dd9b4df792dce3f..9000b80e69519c6ee4c847501407da2b67d3e099 100644 (file)
@@ -41,11 +41,11 @@ unsigned int mhashInsert(unsigned int key, void *val) {
        ptr = mlookup.table;
        mlookup.numelements++;
        
-       index = mhashFunction(key);
 #ifdef DEBUG
        printf("DEBUG -> index = %d, key = %d, val = %x\n", index, key, val);
 #endif
        pthread_mutex_lock(&mlookup.locktable);
+       index = mhashFunction(key);
        if(ptr[index].next == NULL && ptr[index].key == 0) {    // Insert at the first position in the hashtable
                ptr[index].key = key;
                ptr[index].val = val;
@@ -69,10 +69,10 @@ void *mhashSearch(unsigned int key) {
        int index;
        mhashlistnode_t *ptr, *node;
 
+       pthread_mutex_lock(&mlookup.locktable);
        ptr = mlookup.table;    // Address of the beginning of hash table       
        index = mhashFunction(key);
        node = &ptr[index];
-       pthread_mutex_lock(&mlookup.locktable);
        while(node != NULL) {
                if(node->key == key) {
                        pthread_mutex_unlock(&mlookup.locktable);
@@ -90,11 +90,10 @@ unsigned int mhashRemove(unsigned int key) {
        mhashlistnode_t *curr, *prev;
        mhashlistnode_t *ptr, *node;
        
+       pthread_mutex_lock(&mlookup.locktable);
        ptr = mlookup.table;
        index = mhashFunction(key);
        curr = &ptr[index];
-
-       pthread_mutex_lock(&mlookup.locktable);
        for (; curr != NULL; curr = curr->next) {
                if (curr->key == key) {         // Find a match in the hash table
                        mlookup.numelements--;  // Decrement the number of elements in the global hashtable