From: Jun Li Date: Tue, 27 Oct 2015 00:40:17 +0000 (-0700) Subject: Expose pending messages in queue stats in AsyncServerSocket X-Git-Tag: deprecate-dynamic-initializer~293 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=28c5ca589c26815b2393b35e6a2fc582f37f6453;p=folly.git Expose pending messages in queue stats in AsyncServerSocket Summary: Expose pending messages in accept queue in AsyncServerSocket. Set default accept message queue size to 1024. Reviewed By: djwatson Differential Revision: D2525161 fb-gh-sync-id: a69ea0ee77729e4a8300bde3e3c07840f2d5d3cb --- diff --git a/folly/io/async/AsyncServerSocket.h b/folly/io/async/AsyncServerSocket.h index 9b82ebd5..d423d5f3 100644 --- a/folly/io/async/AsyncServerSocket.h +++ b/folly/io/async/AsyncServerSocket.h @@ -200,7 +200,7 @@ class AsyncServerSocket : public DelayedDestruction static const uint32_t kDefaultMaxAcceptAtOnce = 30; static const uint32_t kDefaultCallbackAcceptAtOnce = 5; - static const uint32_t kDefaultMaxMessagesInQueue = 0; + static const uint32_t kDefaultMaxMessagesInQueue = 1024; /** * Create a new AsyncServerSocket with the specified EventBase. * @@ -564,6 +564,23 @@ class AsyncServerSocket : public DelayedDestruction return numDroppedConnections_; } + /** + * Get the current number of unprocessed messages in NotificationQueue. + * + * This method must be invoked from the AsyncServerSocket's primary + * EventBase thread. Use EventBase::runInEventBaseThread() to schedule the + * operation in the correct EventBase if your code is not in the server + * socket's primary EventBase. + */ + int64_t getNumPendingMessagesInQueue() const { + assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread()); + int64_t numMsgs = 0; + for (const auto& callback : callbacks_) { + numMsgs += callback.consumer->getQueue()->size(); + } + return numMsgs; + } + /** * Set whether or not SO_KEEPALIVE should be enabled on the server socket * (and thus on all subsequently-accepted connections). By default, keepalive diff --git a/folly/io/async/test/AsyncSocketTest2.cpp b/folly/io/async/test/AsyncSocketTest2.cpp index 73295ca3..1a5ebeb8 100644 --- a/folly/io/async/test/AsyncSocketTest2.cpp +++ b/folly/io/async/test/AsyncSocketTest2.cpp @@ -2195,3 +2195,46 @@ TEST(AsyncSocketTest, ConnectionEventCallbackDefault) { ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0); ASSERT_EQ(connectionEventCallback.getBackoffError(), 0); } + +/** + * Test AsyncServerSocket::getNumPendingMessagesInQueue() + */ +TEST(AsyncSocketTest, NumPendingMessagesInQueue) { + EventBase eventBase; + + // Counter of how many connections have been accepted + int count = 0; + + // Create a server socket + auto serverSocket(AsyncServerSocket::newSocket(&eventBase)); + serverSocket->bind(0); + serverSocket->listen(16); + folly::SocketAddress serverAddress; + serverSocket->getAddress(&serverAddress); + + // Add a callback to accept connections + TestAcceptCallback acceptCallback; + acceptCallback.setConnectionAcceptedFn( + [&](int fd, const folly::SocketAddress& addr) { + count++; + CHECK_EQ(4 - count, serverSocket->getNumPendingMessagesInQueue()); + + if (count == 4) { + // all messages are processed, remove accept callback + serverSocket->removeAcceptCallback(&acceptCallback, &eventBase); + } + }); + acceptCallback.setAcceptErrorFn([&](const std::exception& ex) { + serverSocket->removeAcceptCallback(&acceptCallback, &eventBase); + }); + serverSocket->addAcceptCallback(&acceptCallback, &eventBase); + serverSocket->startAccepting(); + + // Connect to the server socket, 4 clients, there are 4 connections + auto socket1(AsyncSocket::newSocket(&eventBase, serverAddress)); + auto socket2(AsyncSocket::newSocket(&eventBase, serverAddress)); + auto socket3(AsyncSocket::newSocket(&eventBase, serverAddress)); + auto socket4(AsyncSocket::newSocket(&eventBase, serverAddress)); + + eventBase.loop(); +}