add consumeUntilDrained API to NotificationQueue::Consumer
authorJames Sedgwick <jsedgwick@fb.com>
Thu, 4 Dec 2014 18:29:24 +0000 (10:29 -0800)
committerDave Watson <davejwatson@fb.com>
Thu, 11 Dec 2014 16:00:25 +0000 (08:00 -0800)
Summary:
... and employ it in EventBase's destructor to stop leaking memory from unexecuted NotificationQueue-variety runInEventBaseThread() callbacks

in addition to the attached task, this should also fix the root cause of the already-worked-around #5564342

Test Plan:
test no longer leaks.* on the other hand, IOThreadPoolExecutor::stop() no longer actually stops, it joins. I added comments to stop() in headers indicating that it should be treated as best-effort.

* hilariously, this actually isn't true - there's a new leak of a thread local inside the io pool because callbacks can outlive the pool now. i'll leave a full explanation for the upcoming patch.

Reviewed By: davejwatson@fb.com

Subscribers: alandau, bmatheny, mshneer, trunkagent, fugalh, njormrod, folly-diffs@

FB internal diff: D1682860

Tasks: 5336655

Signature: t1:1682860:1416347774:ac5f31fb72373992f425c93ac284a0cd27608db4

folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h
folly/experimental/wangle/concurrent/ThreadPoolExecutor.h
folly/experimental/wangle/concurrent/test/ThreadPoolExecutorTest.cpp
folly/io/async/EventBase.cpp
folly/io/async/EventBase.h
folly/io/async/NotificationQueue.h
folly/io/async/test/EventBaseTest.cpp

index 35cce4c008692d7405209ecf1885259a7724e22c..0dcfd2eff071bdd71cfac072156238fcb28545f2 100644 (file)
@@ -20,6 +20,8 @@
 
 namespace folly { namespace wangle {
 
+// N.B. For this thread pool, stop() behaves like join() because outstanding
+// tasks belong to the event base and will be executed upon its destruction.
 class IOThreadPoolExecutor : public ThreadPoolExecutor {
  public:
   explicit IOThreadPoolExecutor(
index 52c7d1f789d1f7200ac84eaf59a6893c901ac20e..b6b0dd788b7b7c019012fba321e8dc542a0e829d 100644 (file)
@@ -56,6 +56,11 @@ class ThreadPoolExecutor : public Executor {
 
   size_t numThreads();
   void setNumThreads(size_t numThreads);
+  /*
+   * stop() is best effort - there is no guarantee that unexecuted tasks won't
+   * be executed before it returns. Specifically, IOThreadPoolExecutor's stop()
+   * behaves like join().
+   */
   void stop();
   void join();
 
index 6e3782ce40724af14714978abdf7473368c2e9d8..2749d0cd583ecdb971456e6b1af22537793ef27f 100644 (file)
@@ -75,6 +75,24 @@ static void stop() {
   EXPECT_GT(1000, completed);
 }
 
+// IOThreadPoolExecutor's stop() behaves like join(). Outstanding tasks belong
+// to the event base, will be executed upon its destruction, and cannot be
+// taken back.
+template <>
+void stop<IOThreadPoolExecutor>() {
+  IOThreadPoolExecutor tpe(1);
+  std::atomic<int> completed(0);
+  auto f = [&](){
+    burnMs(10)();
+    completed++;
+  };
+  for (int i = 0; i < 10; i++) {
+    tpe.add(f);
+  }
+  tpe.stop();
+  EXPECT_EQ(10, completed);
+}
+
 TEST(ThreadPoolExecutorTest, CPUStop) {
   stop<CPUThreadPoolExecutor>();
 }
index 6916ceaca57c88c863ff49b21f6a224e92c66c87..7bfc221368921f49b0e3cae3eeae7842ec635c69 100644 (file)
@@ -221,6 +221,10 @@ EventBase::~EventBase() {
 
   (void) runLoopCallbacks(false);
 
+  if (!fnRunner_->consumeUntilDrained()) {
+    LOG(ERROR) << "~EventBase(): Unable to drain notification queue";
+  }
+
   // Stop consumer before deleting NotificationQueue
   fnRunner_->stopConsuming();
   {
index d18460715cb3efb77907448b6053a7e04ae6c2e1..92de83fc76b67035bc54c5cc99218e4eadfb503d 100644 (file)
@@ -281,9 +281,7 @@ class EventBase :
    * If runInEventBaseThread() returns true the function has successfully been
    * scheduled to run in the loop thread.  However, if the loop is terminated
    * (and never later restarted) before it has a chance to run the requested
-   * function, the function may never be run at all.  The caller is responsible
-   * for handling this situation correctly if they may terminate the loop with
-   * outstanding runInEventBaseThread() calls pending.
+   * function, the function will be run upon the EventBase's destruction.
    *
    * If two calls to runInEventBaseThread() are made from the same thread, the
    * functions will always be run in the order that they were scheduled.
@@ -313,10 +311,9 @@ class EventBase :
    * function pointer and void* argument, as it has to allocate memory to copy
    * the std::function object.
    *
-   * If the EventBase loop is terminated before it has a chance to run this
-   * function, the allocated memory will be leaked.  The caller is responsible
-   * for ensuring that the EventBase loop is not terminated before this
-   * function can run.
+   * If the loop is terminated (and never later restarted) before it has a
+   * chance to run the requested function, the function will be run upon the
+   * EventBase's destruction.
    *
    * The function must not throw any exceptions.
    */
index f817fbdc8e24bead9d9e8fc516131e4fa4f71c72..317a73ffec66f097b9948b81c6f5b0fec8c31cd7 100644 (file)
@@ -111,6 +111,17 @@ class NotificationQueue {
      */
     void stopConsuming();
 
+    /**
+     * Consume messages off the queue until it is empty. No messages may be
+     * added to the queue while it is draining, so that the process is bounded.
+     * To that end, putMessage/tryPutMessage will throw an std::runtime_error,
+     * and tryPutMessageNoThrow will return false.
+     *
+     * @returns true if the queue was drained, false otherwise. In practice,
+     * this will only fail if someone else is already draining the queue.
+     */
+    bool consumeUntilDrained() noexcept;
+
     /**
      * Get the NotificationQueue that this consumer is currently consuming
      * messages from.  Returns nullptr if the consumer is not currently
@@ -144,6 +155,17 @@ class NotificationQueue {
     virtual void handlerReady(uint16_t events) noexcept;
 
    private:
+    /**
+     * Consume messages off the the queue until
+     *   - the queue is empty (1), or
+     *   - until the consumer is destroyed, or
+     *   - until the consumer is uninstalled, or
+     *   - an exception is thrown in the course of dequeueing, or
+     *   - unless isDrain is true, until the maxReadAtOnce_ limit is hit
+     *
+     * (1) Well, maybe. See logic/comments around "wasEmpty" in implementation.
+     */
+    void consumeMessages(bool isDrain) noexcept;
 
     void setActive(bool active, bool shouldLock = false) {
       if (!queue_) {
@@ -282,6 +304,8 @@ class NotificationQueue {
    * If the queue is full, a std::overflow_error will be thrown.  The
    * setMaxQueueSize() function controls the maximum queue size.
    *
+   * If the queue is currently draining, an std::runtime_error will be thrown.
+   *
    * This method may contend briefly on a spinlock if many threads are
    * concurrently accessing the queue, but for all intents and purposes it will
    * immediately place the message on the queue and return.
@@ -301,8 +325,9 @@ class NotificationQueue {
    * No-throw versions of the above.  Instead returns true on success, false on
    * failure.
    *
-   * Only std::overflow_error is prevented from being thrown (since this is the
-   * common exception case), user code must still catch std::bad_alloc errors.
+   * Only std::overflow_error (the common exception case) and std::runtime_error
+   * (which indicates that the queue is being drained) are prevented from being
+   * thrown. User code must still catch std::bad_alloc errors.
    */
   bool tryPutMessageNoThrow(MessageT&& message) {
     return putMessageImpl(std::move(message), advisoryMaxQueueSize_, false);
@@ -318,8 +343,10 @@ class NotificationQueue {
    * and always puts the message on the queue, even if the maximum queue size
    * would be exceeded.
    *
-   * putMessage() may throw std::bad_alloc if memory allocation fails, and may
-   * throw any other exception thrown by the MessageT move/copy constructor.
+   * putMessage() may throw
+   *   - std::bad_alloc if memory allocation fails, and may
+   *   - std::runtime_error if the queue is currently draining
+   *   - any other exception thrown by the MessageT move/copy constructor.
    */
   void putMessage(MessageT&& message) {
     putMessageImpl(std::move(message), 0);
@@ -414,6 +441,13 @@ class NotificationQueue {
     return true;
   }
 
+  inline bool checkDraining(bool throws=true) {
+    if (UNLIKELY(draining_ && throws)) {
+      throw std::runtime_error("queue is draining, cannot add message");
+    }
+    return draining_;
+  }
+
   inline void signalEvent(size_t numAdded = 1) const {
     static const uint8_t kPipeMessage[] = {
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1
@@ -472,7 +506,7 @@ class NotificationQueue {
     bool signal = false;
     {
       folly::io::PortableSpinLockGuard g(spinlock_);
-      if (!checkQueueSize(maxSize, throws)) {
+      if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
         return false;
       }
       // We only need to signal an event if not all consumers are
@@ -496,7 +530,7 @@ class NotificationQueue {
     bool signal = false;
     {
       folly::io::PortableSpinLockGuard g(spinlock_);
-      if (!checkQueueSize(maxSize, throws)) {
+      if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
         return false;
       }
       if (numActiveConsumers_ < numConsumers_) {
@@ -518,6 +552,7 @@ class NotificationQueue {
     size_t numAdded = 0;
     {
       folly::io::PortableSpinLockGuard g(spinlock_);
+      checkDraining();
       while (first != last) {
         queue_.push_back(std::make_pair(*first, RequestContext::saveContext()));
         ++first;
@@ -540,6 +575,7 @@ class NotificationQueue {
   std::deque<std::pair<MessageT, std::shared_ptr<RequestContext>>> queue_;
   int numConsumers_{0};
   std::atomic<int> numActiveConsumers_{0};
+  bool draining_{false};
 };
 
 template<typename MessageT>
@@ -556,6 +592,12 @@ NotificationQueue<MessageT>::Consumer::~Consumer() {
 template<typename MessageT>
 void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t events)
     noexcept {
+  consumeMessages(false);
+}
+
+template<typename MessageT>
+void NotificationQueue<MessageT>::Consumer::consumeMessages(
+    bool isDrain) noexcept {
   uint32_t numProcessed = 0;
   bool firstRun = true;
   setActive(true);
@@ -567,7 +609,7 @@ void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t events)
     // may not actually be an event available (another consumer may
     // have read it).  We don't really care, we only care about
     // emptying the queue.
-    if (firstRun) {
+    if (!isDrain && firstRun) {
       queue_->tryConsumeEvent();
       firstRun = false;
     }
@@ -632,7 +674,8 @@ void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t events)
 
       // If we have hit maxReadAtOnce_, we are done.
       ++numProcessed;
-      if (maxReadAtOnce_ > 0 && numProcessed >= maxReadAtOnce_) {
+      if (!isDrain && maxReadAtOnce_ > 0 &&
+          numProcessed >= maxReadAtOnce_) {
         queue_->signalEvent(1);
         return;
       }
@@ -663,7 +706,9 @@ void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t events)
 
         // Push a notification back on the eventfd since we didn't actually
         // read the message off of the queue.
-        queue_->signalEvent(1);
+        if (!isDrain) {
+          queue_->signalEvent(1);
+        }
       }
 
       return;
@@ -716,4 +761,21 @@ void NotificationQueue<MessageT>::Consumer::stopConsuming() {
   queue_ = nullptr;
 }
 
+template<typename MessageT>
+bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained() noexcept {
+  {
+    folly::io::PortableSpinLockGuard g(queue_->spinlock_);
+    if (queue_->draining_) {
+      return false;
+    }
+    queue_->draining_ = true;
+  }
+  consumeMessages(true);
+  {
+    folly::io::PortableSpinLockGuard g(queue_->spinlock_);
+    queue_->draining_ = false;
+  }
+  return true;
+}
+
 } // folly
index 59b89dbaf32c7b456e08c144cd00949d4675ae88..a8ff23c7b7e77d39805f85accf44c06b8a10540f 100644 (file)
@@ -1591,3 +1591,16 @@ TEST(EventBaseTest, StopBeforeLoop) {
 
   SUCCEED();
 }
+
+TEST(EventBaseTest, RunCallbacksOnDestruction) {
+  bool ran = false;
+
+  {
+    EventBase base;
+    base.runInEventBaseThread([&](){
+      ran = true;
+    });
+  }
+
+  ASSERT_TRUE(ran);
+}