From: James Sedgwick Date: Thu, 4 Dec 2014 18:29:24 +0000 (-0800) Subject: add consumeUntilDrained API to NotificationQueue::Consumer X-Git-Tag: v0.22.0~137 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=35ed5364f1ba809d6d103c98c03ae951aab3bc05;p=folly.git add consumeUntilDrained API to NotificationQueue::Consumer Summary: ... and employ it in EventBase's destructor to stop leaking memory from unexecuted NotificationQueue-variety runInEventBaseThread() callbacks in addition to the attached task, this should also fix the root cause of the already-worked-around #5564342 Test Plan: test no longer leaks.* on the other hand, IOThreadPoolExecutor::stop() no longer actually stops, it joins. I added comments to stop() in headers indicating that it should be treated as best-effort. * hilariously, this actually isn't true - there's a new leak of a thread local inside the io pool because callbacks can outlive the pool now. i'll leave a full explanation for the upcoming patch. Reviewed By: davejwatson@fb.com Subscribers: alandau, bmatheny, mshneer, trunkagent, fugalh, njormrod, folly-diffs@ FB internal diff: D1682860 Tasks: 5336655 Signature: t1:1682860:1416347774:ac5f31fb72373992f425c93ac284a0cd27608db4 --- diff --git a/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h b/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h index 35cce4c0..0dcfd2ef 100644 --- a/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h +++ b/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h @@ -20,6 +20,8 @@ namespace folly { namespace wangle { +// N.B. For this thread pool, stop() behaves like join() because outstanding +// tasks belong to the event base and will be executed upon its destruction. class IOThreadPoolExecutor : public ThreadPoolExecutor { public: explicit IOThreadPoolExecutor( diff --git a/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h b/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h index 52c7d1f7..b6b0dd78 100644 --- a/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h +++ b/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h @@ -56,6 +56,11 @@ class ThreadPoolExecutor : public Executor { size_t numThreads(); void setNumThreads(size_t numThreads); + /* + * stop() is best effort - there is no guarantee that unexecuted tasks won't + * be executed before it returns. Specifically, IOThreadPoolExecutor's stop() + * behaves like join(). + */ void stop(); void join(); diff --git a/folly/experimental/wangle/concurrent/test/ThreadPoolExecutorTest.cpp b/folly/experimental/wangle/concurrent/test/ThreadPoolExecutorTest.cpp index 6e3782ce..2749d0cd 100644 --- a/folly/experimental/wangle/concurrent/test/ThreadPoolExecutorTest.cpp +++ b/folly/experimental/wangle/concurrent/test/ThreadPoolExecutorTest.cpp @@ -75,6 +75,24 @@ static void stop() { EXPECT_GT(1000, completed); } +// IOThreadPoolExecutor's stop() behaves like join(). Outstanding tasks belong +// to the event base, will be executed upon its destruction, and cannot be +// taken back. +template <> +void stop() { + IOThreadPoolExecutor tpe(1); + std::atomic completed(0); + auto f = [&](){ + burnMs(10)(); + completed++; + }; + for (int i = 0; i < 10; i++) { + tpe.add(f); + } + tpe.stop(); + EXPECT_EQ(10, completed); +} + TEST(ThreadPoolExecutorTest, CPUStop) { stop(); } diff --git a/folly/io/async/EventBase.cpp b/folly/io/async/EventBase.cpp index 6916ceac..7bfc2213 100644 --- a/folly/io/async/EventBase.cpp +++ b/folly/io/async/EventBase.cpp @@ -221,6 +221,10 @@ EventBase::~EventBase() { (void) runLoopCallbacks(false); + if (!fnRunner_->consumeUntilDrained()) { + LOG(ERROR) << "~EventBase(): Unable to drain notification queue"; + } + // Stop consumer before deleting NotificationQueue fnRunner_->stopConsuming(); { diff --git a/folly/io/async/EventBase.h b/folly/io/async/EventBase.h index d1846071..92de83fc 100644 --- a/folly/io/async/EventBase.h +++ b/folly/io/async/EventBase.h @@ -281,9 +281,7 @@ class EventBase : * If runInEventBaseThread() returns true the function has successfully been * scheduled to run in the loop thread. However, if the loop is terminated * (and never later restarted) before it has a chance to run the requested - * function, the function may never be run at all. The caller is responsible - * for handling this situation correctly if they may terminate the loop with - * outstanding runInEventBaseThread() calls pending. + * function, the function will be run upon the EventBase's destruction. * * If two calls to runInEventBaseThread() are made from the same thread, the * functions will always be run in the order that they were scheduled. @@ -313,10 +311,9 @@ class EventBase : * function pointer and void* argument, as it has to allocate memory to copy * the std::function object. * - * If the EventBase loop is terminated before it has a chance to run this - * function, the allocated memory will be leaked. The caller is responsible - * for ensuring that the EventBase loop is not terminated before this - * function can run. + * If the loop is terminated (and never later restarted) before it has a + * chance to run the requested function, the function will be run upon the + * EventBase's destruction. * * The function must not throw any exceptions. */ diff --git a/folly/io/async/NotificationQueue.h b/folly/io/async/NotificationQueue.h index f817fbdc..317a73ff 100644 --- a/folly/io/async/NotificationQueue.h +++ b/folly/io/async/NotificationQueue.h @@ -111,6 +111,17 @@ class NotificationQueue { */ void stopConsuming(); + /** + * Consume messages off the queue until it is empty. No messages may be + * added to the queue while it is draining, so that the process is bounded. + * To that end, putMessage/tryPutMessage will throw an std::runtime_error, + * and tryPutMessageNoThrow will return false. + * + * @returns true if the queue was drained, false otherwise. In practice, + * this will only fail if someone else is already draining the queue. + */ + bool consumeUntilDrained() noexcept; + /** * Get the NotificationQueue that this consumer is currently consuming * messages from. Returns nullptr if the consumer is not currently @@ -144,6 +155,17 @@ class NotificationQueue { virtual void handlerReady(uint16_t events) noexcept; private: + /** + * Consume messages off the the queue until + * - the queue is empty (1), or + * - until the consumer is destroyed, or + * - until the consumer is uninstalled, or + * - an exception is thrown in the course of dequeueing, or + * - unless isDrain is true, until the maxReadAtOnce_ limit is hit + * + * (1) Well, maybe. See logic/comments around "wasEmpty" in implementation. + */ + void consumeMessages(bool isDrain) noexcept; void setActive(bool active, bool shouldLock = false) { if (!queue_) { @@ -282,6 +304,8 @@ class NotificationQueue { * If the queue is full, a std::overflow_error will be thrown. The * setMaxQueueSize() function controls the maximum queue size. * + * If the queue is currently draining, an std::runtime_error will be thrown. + * * This method may contend briefly on a spinlock if many threads are * concurrently accessing the queue, but for all intents and purposes it will * immediately place the message on the queue and return. @@ -301,8 +325,9 @@ class NotificationQueue { * No-throw versions of the above. Instead returns true on success, false on * failure. * - * Only std::overflow_error is prevented from being thrown (since this is the - * common exception case), user code must still catch std::bad_alloc errors. + * Only std::overflow_error (the common exception case) and std::runtime_error + * (which indicates that the queue is being drained) are prevented from being + * thrown. User code must still catch std::bad_alloc errors. */ bool tryPutMessageNoThrow(MessageT&& message) { return putMessageImpl(std::move(message), advisoryMaxQueueSize_, false); @@ -318,8 +343,10 @@ class NotificationQueue { * and always puts the message on the queue, even if the maximum queue size * would be exceeded. * - * putMessage() may throw std::bad_alloc if memory allocation fails, and may - * throw any other exception thrown by the MessageT move/copy constructor. + * putMessage() may throw + * - std::bad_alloc if memory allocation fails, and may + * - std::runtime_error if the queue is currently draining + * - any other exception thrown by the MessageT move/copy constructor. */ void putMessage(MessageT&& message) { putMessageImpl(std::move(message), 0); @@ -414,6 +441,13 @@ class NotificationQueue { return true; } + inline bool checkDraining(bool throws=true) { + if (UNLIKELY(draining_ && throws)) { + throw std::runtime_error("queue is draining, cannot add message"); + } + return draining_; + } + inline void signalEvent(size_t numAdded = 1) const { static const uint8_t kPipeMessage[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 @@ -472,7 +506,7 @@ class NotificationQueue { bool signal = false; { folly::io::PortableSpinLockGuard g(spinlock_); - if (!checkQueueSize(maxSize, throws)) { + if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) { return false; } // We only need to signal an event if not all consumers are @@ -496,7 +530,7 @@ class NotificationQueue { bool signal = false; { folly::io::PortableSpinLockGuard g(spinlock_); - if (!checkQueueSize(maxSize, throws)) { + if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) { return false; } if (numActiveConsumers_ < numConsumers_) { @@ -518,6 +552,7 @@ class NotificationQueue { size_t numAdded = 0; { folly::io::PortableSpinLockGuard g(spinlock_); + checkDraining(); while (first != last) { queue_.push_back(std::make_pair(*first, RequestContext::saveContext())); ++first; @@ -540,6 +575,7 @@ class NotificationQueue { std::deque>> queue_; int numConsumers_{0}; std::atomic numActiveConsumers_{0}; + bool draining_{false}; }; template @@ -556,6 +592,12 @@ NotificationQueue::Consumer::~Consumer() { template void NotificationQueue::Consumer::handlerReady(uint16_t events) noexcept { + consumeMessages(false); +} + +template +void NotificationQueue::Consumer::consumeMessages( + bool isDrain) noexcept { uint32_t numProcessed = 0; bool firstRun = true; setActive(true); @@ -567,7 +609,7 @@ void NotificationQueue::Consumer::handlerReady(uint16_t events) // may not actually be an event available (another consumer may // have read it). We don't really care, we only care about // emptying the queue. - if (firstRun) { + if (!isDrain && firstRun) { queue_->tryConsumeEvent(); firstRun = false; } @@ -632,7 +674,8 @@ void NotificationQueue::Consumer::handlerReady(uint16_t events) // If we have hit maxReadAtOnce_, we are done. ++numProcessed; - if (maxReadAtOnce_ > 0 && numProcessed >= maxReadAtOnce_) { + if (!isDrain && maxReadAtOnce_ > 0 && + numProcessed >= maxReadAtOnce_) { queue_->signalEvent(1); return; } @@ -663,7 +706,9 @@ void NotificationQueue::Consumer::handlerReady(uint16_t events) // Push a notification back on the eventfd since we didn't actually // read the message off of the queue. - queue_->signalEvent(1); + if (!isDrain) { + queue_->signalEvent(1); + } } return; @@ -716,4 +761,21 @@ void NotificationQueue::Consumer::stopConsuming() { queue_ = nullptr; } +template +bool NotificationQueue::Consumer::consumeUntilDrained() noexcept { + { + folly::io::PortableSpinLockGuard g(queue_->spinlock_); + if (queue_->draining_) { + return false; + } + queue_->draining_ = true; + } + consumeMessages(true); + { + folly::io::PortableSpinLockGuard g(queue_->spinlock_); + queue_->draining_ = false; + } + return true; +} + } // folly diff --git a/folly/io/async/test/EventBaseTest.cpp b/folly/io/async/test/EventBaseTest.cpp index 59b89dba..a8ff23c7 100644 --- a/folly/io/async/test/EventBaseTest.cpp +++ b/folly/io/async/test/EventBaseTest.cpp @@ -1591,3 +1591,16 @@ TEST(EventBaseTest, StopBeforeLoop) { SUCCEED(); } + +TEST(EventBaseTest, RunCallbacksOnDestruction) { + bool ran = false; + + { + EventBase base; + base.runInEventBaseThread([&](){ + ran = true; + }); + } + + ASSERT_TRUE(ran); +}