static const uint32_t kDefaultMaxAcceptAtOnce = 30;
static const uint32_t kDefaultCallbackAcceptAtOnce = 5;
- static const uint32_t kDefaultMaxMessagesInQueue = 0;
+ static const uint32_t kDefaultMaxMessagesInQueue = 1024;
/**
* Create a new AsyncServerSocket with the specified EventBase.
*
return numDroppedConnections_;
}
+ /**
+ * Get the current number of unprocessed messages in NotificationQueue.
+ *
+ * This method must be invoked from the AsyncServerSocket's primary
+ * EventBase thread. Use EventBase::runInEventBaseThread() to schedule the
+ * operation in the correct EventBase if your code is not in the server
+ * socket's primary EventBase.
+ */
+ int64_t getNumPendingMessagesInQueue() const {
+ assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+ int64_t numMsgs = 0;
+ for (const auto& callback : callbacks_) {
+ numMsgs += callback.consumer->getQueue()->size();
+ }
+ return numMsgs;
+ }
+
/**
* Set whether or not SO_KEEPALIVE should be enabled on the server socket
* (and thus on all subsequently-accepted connections). By default, keepalive
ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
}
+
+/**
+ * Test AsyncServerSocket::getNumPendingMessagesInQueue()
+ */
+TEST(AsyncSocketTest, NumPendingMessagesInQueue) {
+ EventBase eventBase;
+
+ // Counter of how many connections have been accepted
+ int count = 0;
+
+ // Create a server socket
+ auto serverSocket(AsyncServerSocket::newSocket(&eventBase));
+ serverSocket->bind(0);
+ serverSocket->listen(16);
+ folly::SocketAddress serverAddress;
+ serverSocket->getAddress(&serverAddress);
+
+ // Add a callback to accept connections
+ TestAcceptCallback acceptCallback;
+ acceptCallback.setConnectionAcceptedFn(
+ [&](int fd, const folly::SocketAddress& addr) {
+ count++;
+ CHECK_EQ(4 - count, serverSocket->getNumPendingMessagesInQueue());
+
+ if (count == 4) {
+ // all messages are processed, remove accept callback
+ serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
+ }
+ });
+ acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
+ serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
+ });
+ serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
+ serverSocket->startAccepting();
+
+ // Connect to the server socket, 4 clients, there are 4 connections
+ auto socket1(AsyncSocket::newSocket(&eventBase, serverAddress));
+ auto socket2(AsyncSocket::newSocket(&eventBase, serverAddress));
+ auto socket3(AsyncSocket::newSocket(&eventBase, serverAddress));
+ auto socket4(AsyncSocket::newSocket(&eventBase, serverAddress));
+
+ eventBase.loop();
+}