From b7a0f26dcf38f4f632a50053e1fa9dcccb9bad0b Mon Sep 17 00:00:00 2001 From: Alexey Spiridonov Date: Fri, 28 Apr 2017 15:52:16 -0700 Subject: [PATCH] Simplify BackgroundThreads, move them to folly/experimental/ThreadedRepeatingFunctionRunner Summary: - `FunctionScheduler` and `EventBase` are great for sharing one thread for many functions, but one-function-per-thread is messy. - Both of those implementations are complicated, but `FunctionThreads` is dead-simple. - I made it even simpler by eliminating the former `incrementalSleep` in favor of `std::future::wait_for`, which allows instant interruption without a tweakable param. h/t aru777 for suggesting `std::future` instead of `std::condition_variable`. Reviewed By: yfeldblum Differential Revision: D4742134 fbshipit-source-id: b520bbcd5f218b2276200ffe8926722ae8a8d6ca --- folly/Makefile.am | 2 + folly/experimental/FunctionScheduler.h | 4 +- .../ThreadedRepeatingFunctionRunner.cpp | 91 ++++++++++ .../ThreadedRepeatingFunctionRunner.h | 160 ++++++++++++++++++ .../ThreadedRepeatingFunctionRunnerTest.cpp | 87 ++++++++++ 5 files changed, 343 insertions(+), 1 deletion(-) create mode 100644 folly/experimental/ThreadedRepeatingFunctionRunner.cpp create mode 100644 folly/experimental/ThreadedRepeatingFunctionRunner.h create mode 100644 folly/experimental/test/ThreadedRepeatingFunctionRunnerTest.cpp diff --git a/folly/Makefile.am b/folly/Makefile.am index 8af497b2..6d93decd 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -104,6 +104,7 @@ nobase_follyinclude_HEADERS = \ experimental/AtomicSharedPtr.h \ experimental/detail/AtomicSharedPtr-detail.h \ experimental/AutoTimer.h \ + experimental/ThreadedRepeatingFunctionRunner.h \ experimental/Bits.h \ experimental/BitVectorCoding.h \ experimental/DynamicParser.h \ @@ -536,6 +537,7 @@ libfolly_la_SOURCES = \ Uri.cpp \ Version.cpp \ experimental/AsymmetricMemoryBarrier.cpp \ + experimental/ThreadedRepeatingFunctionRunner.cpp \ experimental/bser/Dump.cpp \ experimental/bser/Load.cpp \ experimental/DynamicParser.cpp \ diff --git a/folly/experimental/FunctionScheduler.h b/folly/experimental/FunctionScheduler.h index ba55feec..0c590842 100644 --- a/folly/experimental/FunctionScheduler.h +++ b/folly/experimental/FunctionScheduler.h @@ -42,7 +42,9 @@ namespace folly { * * * Note: the class uses only one thread - if you want to use more than one - * thread use multiple FunctionScheduler objects + * thread, either use multiple FunctionScheduler objects, or check out + * ThreadedRepeatingFunctionRunner.h for a much simpler contract of + * "run each function periodically in its own thread". * * start() schedules the functions, while shutdown() terminates further * scheduling. diff --git a/folly/experimental/ThreadedRepeatingFunctionRunner.cpp b/folly/experimental/ThreadedRepeatingFunctionRunner.cpp new file mode 100644 index 00000000..56a15c9e --- /dev/null +++ b/folly/experimental/ThreadedRepeatingFunctionRunner.cpp @@ -0,0 +1,91 @@ +/* + * Copyright 2015-present Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "folly/experimental/ThreadedRepeatingFunctionRunner.h" + +#include +#include + +namespace folly { + +ThreadedRepeatingFunctionRunner::ThreadedRepeatingFunctionRunner() {} + +ThreadedRepeatingFunctionRunner::~ThreadedRepeatingFunctionRunner() { + stopAndWarn("ThreadedRepeatingFunctionRunner"); +} + +void ThreadedRepeatingFunctionRunner::stopAndWarn( + const std::string& class_of_destructor) { + if (stopImpl()) { + LOG(ERROR) + << "ThreadedRepeatingFunctionRunner::stop() should already have been " + << "called, since the " << class_of_destructor << " destructor is now " + << "running. This is unsafe because it means that its threads " + << "may be accessing class state that was already destroyed " + << "(e.g. derived class members, or members that were declared after " + << "the " << class_of_destructor << ") ."; + stop(); + } +} + +void ThreadedRepeatingFunctionRunner::stop() { + stopImpl(); +} + +bool ThreadedRepeatingFunctionRunner::stopImpl() { + { + std::unique_lock lock(stopMutex_); + if (stopping_) { + return false; // Do nothing if stop() is called twice. + } + stopping_ = true; + } + stopCv_.notify_all(); + for (auto& t : threads_) { + t.join(); + } + return true; +} + +void ThreadedRepeatingFunctionRunner::add( + RepeatingFn fn, + std::chrono::milliseconds initialSleep) { + threads_.emplace_back( + &ThreadedRepeatingFunctionRunner::executeInLoop, + this, + std::move(fn), + initialSleep); +} + +bool ThreadedRepeatingFunctionRunner::waitFor( + std::chrono::milliseconds duration) noexcept { + using clock = std::chrono::steady_clock; + const auto deadline = clock::now() + duration; + std::unique_lock lock(stopMutex_); + stopCv_.wait_until( + lock, deadline, [&] { return stopping_ || clock::now() > deadline; }); + return !stopping_; +} + +void ThreadedRepeatingFunctionRunner::executeInLoop( + RepeatingFn fn, + std::chrono::milliseconds initialSleep) noexcept { + auto duration = initialSleep; + while (waitFor(duration)) { + duration = fn(); + } +} + +} // namespace folly diff --git a/folly/experimental/ThreadedRepeatingFunctionRunner.h b/folly/experimental/ThreadedRepeatingFunctionRunner.h new file mode 100644 index 00000000..878da728 --- /dev/null +++ b/folly/experimental/ThreadedRepeatingFunctionRunner.h @@ -0,0 +1,160 @@ +/* + * Copyright 2015-present Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +namespace folly { + +/** + * For each function `fn` you add to this object, `fn` will be run in a loop + * in its own thread, with the thread sleeping between invocations of `fn` + * for the duration returned by `fn`'s previous run. + * + * To clean up these threads, invoke `stop()`, which will interrupt sleeping + * threads. `stop()` will wait for already-running functions to return. + * + * == Alternatives == + * + * If you want to multiplex multiple functions on the same thread, you can + * either use EventBase with AsyncTimeout objects, or FunctionScheduler for + * a slightly simpler API. + * + * == Thread-safety == + * + * This type follows the common rule that: + * (1) const member functions are safe to call concurrently with const + * member functions, but + * (2) non-const member functions are not safe to call concurrently with + * any member functions. + * + * == Pitfalls == + * + * Threads and classes don't mix well in C++, so you have to be very careful + * if you want to have ThreadedRepeatingFunctionRunner as a member of your + * class. A reasonable pattern looks like this: + * + * struct MyClass { + * // Note that threads are NOT added in the constructor, for two reasons: + * // + * // (1) If you added some, and had any subsequent initialization (e.g. + * // derived class constructors), 'this' would not be fully + * // constructed when the worker threads came up, causing + * // heisenbugs. + * // + * // (2) Also, if your constructor threw after thread creation, the + * // class destructor would not be invoked, potentially leaving the + * // threads running too long. + * // + * // It's better to have explicit two-step initialization, or to lazily + * // add threads the first time they are needed. + * MyClass() : count_(0) {} + * + * // You must stop the threads as early as possible in the destruction + * // process (or even before). In the case of a class hierarchy, the + * // final class MUST always call stop() as the first thing in its + * // destructor -- otherwise, the worker threads may access already- + * // destroyed state. + * ~MyClass() { + * // if MyClass is abstract: + * threads_.stopAndWarn("MyClass"); + * // Otherwise: + * threads_.stop(); + * } + * + * // See the constructor for why two-stage initialization is preferred. + * void init() { + * threads_.add(bind(&MyClass::incrementCount, this)); + * } + * + * std::chrono::milliseconds incrementCount() { + * ++count_; + * return 10; + * } + * + * private: + * std::atomic count_; + * // Declared last because the threads' functions access other members. + * ThreadedRepeatingFunctionRunner threads_; + * }; + */ +class ThreadedRepeatingFunctionRunner final { + public: + // Returns how long to wait before the next repetition. Must not throw. + using RepeatingFn = folly::Function; + + ThreadedRepeatingFunctionRunner(); + ~ThreadedRepeatingFunctionRunner(); + + /** + * Ideally, you will call this before initiating the destruction of the + * host object. Otherwise, this should be the first thing in the + * destruction sequence. If it comes any later, worker threads may access + * class state that had already been destroyed. + */ + void stop(); + + /** + * Must be called at the TOP of the destructor of any abstract class that + * contains ThreadedRepeatingFunctionRunner (directly or through a + * parent). Any non-abstract class destructor must instead stop() at the + * top. + */ + void stopAndWarn(const std::string& class_of_destructor); + + /** + * Run your noexcept function `f` in a background loop, sleeping between + * calls for a duration returned by `f`. Optionally waits for + * `initialSleep` before calling `f` for the first time. + * + * DANGER: If a non-final class has a ThreadedRepeatingFunctionRunner + * member (which, by the way, must be declared last in the class), then + * you must not call add() in your constructor. Otherwise, your thread + * risks accessing uninitialized data belonging to a child class. To + * avoid this design bug, prefer to use two-stage initialization to start + * your threads. + */ + void add( + RepeatingFn f, + std::chrono::milliseconds initialSleep = std::chrono::milliseconds(0)); + + size_t size() const { return threads_.size(); } + + private: + // Returns true if this is the first stop(). + bool stopImpl(); + + // Sleep for a duration, or until stop() is called. + bool waitFor(std::chrono::milliseconds duration) noexcept; + + // Noexcept allows us to get a good backtrace on crashes -- otherwise, + // std::terminate would get called **outside** of the thread function. + void executeInLoop( + RepeatingFn, + std::chrono::milliseconds initialSleep) noexcept; + + std::mutex stopMutex_; + bool stopping_{false}; // protected by stopMutex_ + std::condition_variable stopCv_; + + std::vector threads_; +}; + +} // namespace folly diff --git a/folly/experimental/test/ThreadedRepeatingFunctionRunnerTest.cpp b/folly/experimental/test/ThreadedRepeatingFunctionRunnerTest.cpp new file mode 100644 index 00000000..28c79f39 --- /dev/null +++ b/folly/experimental/test/ThreadedRepeatingFunctionRunnerTest.cpp @@ -0,0 +1,87 @@ +/* + * Copyright 2015-present Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "folly/experimental/ThreadedRepeatingFunctionRunner.h" + +#include +#include + +using namespace std; + +struct Foo { + explicit Foo(std::atomic& d) : data(d) {} + ~Foo() { + runner_.stop(); + } + + void start() { + runner_.add([this]() { + ++data; + return std::chrono::seconds(0); + }); + } + + std::atomic& data; + folly::ThreadedRepeatingFunctionRunner runner_; // Must be declared last +}; + +struct FooLongSleep { + explicit FooLongSleep(std::atomic& d) : data(d) {} + ~FooLongSleep() { + runner_.stop(); + data.store(-1); + } + + void start() { + runner_.add([this]() { + data.store(1); + return 1000h; // Test would time out if we waited + }); + } + + std::atomic& data; + folly::ThreadedRepeatingFunctionRunner runner_; // Must be declared last +}; + +TEST(TestThreadedRepeatingFunctionRunner, HandleBackgroundLoop) { + std::atomic data(0); + { + Foo f(data); + EXPECT_EQ(0, data.load()); + f.start(); // Runs increment thread in background + while (data.load() == 0) { + /* sleep override */ this_thread::sleep_for(chrono::milliseconds(10)); + } + } + // The increment thread should have been destroyed + auto prev_val = data.load(); + /* sleep override */ this_thread::sleep_for(chrono::milliseconds(100)); + EXPECT_EQ(data.load(), prev_val); +} + +TEST(TestThreadedRepeatingFunctionRunner, HandleLongSleepingThread) { + std::atomic data(0); + { + FooLongSleep f(data); + EXPECT_EQ(0, data.load()); + f.start(); + while (data.load() == 0) { + /* sleep override */ this_thread::sleep_for(chrono::milliseconds(10)); + } + EXPECT_EQ(1, data.load()); + } + // Foo should have been destroyed, which stopped the thread! + EXPECT_EQ(-1, data.load()); +} -- 2.34.1