2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
18 #include <folly/Baton.h>
19 #include <folly/Executor.h>
20 #include <folly/Memory.h>
21 #include <folly/RWSpinLock.h>
22 #include <folly/executors/GlobalThreadPoolList.h>
23 #include <folly/executors/task_queue/LifoSemMPMCQueue.h>
24 #include <folly/executors/thread_factory/NamedThreadFactory.h>
25 #include <folly/io/async/Request.h>
31 #include <glog/logging.h>
35 class ThreadPoolExecutor : public virtual folly::Executor {
37 explicit ThreadPoolExecutor(
39 std::shared_ptr<ThreadFactory> threadFactory,
40 bool isWaitForAll = false);
42 ~ThreadPoolExecutor() override;
44 void add(Func func) override = 0;
46 add(Func func, std::chrono::milliseconds expiration, Func expireCallback) = 0;
48 void setThreadFactory(std::shared_ptr<ThreadFactory> threadFactory) {
49 CHECK(numThreads() == 0);
50 threadFactory_ = std::move(threadFactory);
53 std::shared_ptr<ThreadFactory> getThreadFactory() {
54 return threadFactory_;
58 void setNumThreads(size_t numThreads);
60 * stop() is best effort - there is no guarantee that unexecuted tasks won't
61 * be executed before it returns. Specifically, IOThreadPoolExecutor's stop()
62 * behaves like join().
75 size_t threadCount, idleThreadCount, activeThreadCount;
76 uint64_t pendingTaskCount, totalTaskCount;
77 std::chrono::nanoseconds maxIdleTime;
80 PoolStats getPoolStats();
81 uint64_t getPendingTaskCount();
84 TaskStats() : expired(false), waitTime(0), runTime(0) {}
86 std::chrono::nanoseconds waitTime;
87 std::chrono::nanoseconds runTime;
90 using TaskStatsCallback = std::function<void(TaskStats)>;
91 void subscribeToTaskStats(TaskStatsCallback cb);
94 * Base class for threads created with ThreadPoolExecutor.
95 * Some subclasses have methods that operate on these
100 virtual ~ThreadHandle() = default;
104 * Observer interface for thread start/stop.
105 * Provides hooks so actions can be taken when
106 * threads are created
110 virtual void threadStarted(ThreadHandle*) = 0;
111 virtual void threadStopped(ThreadHandle*) = 0;
112 virtual void threadPreviouslyStarted(ThreadHandle* h) {
115 virtual void threadNotYetStopped(ThreadHandle* h) {
118 virtual ~Observer() = default;
121 void addObserver(std::shared_ptr<Observer>);
122 void removeObserver(std::shared_ptr<Observer>);
125 // Prerequisite: threadListLock_ writelocked
126 void addThreads(size_t n);
127 // Prerequisite: threadListLock_ writelocked
128 void removeThreads(size_t n, bool isJoin);
130 struct TaskStatsCallbackRegistry;
132 struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING Thread : public ThreadHandle {
133 explicit Thread(ThreadPoolExecutor* pool)
137 lastActiveTime(std::chrono::steady_clock::now()),
138 taskStatsCallbacks(pool->taskStatsCallbacks_) {}
140 ~Thread() override = default;
142 static std::atomic<uint64_t> nextId;
146 std::chrono::steady_clock::time_point lastActiveTime;
147 folly::Baton<> startupBaton;
148 std::shared_ptr<TaskStatsCallbackRegistry> taskStatsCallbacks;
151 typedef std::shared_ptr<Thread> ThreadPtr;
156 std::chrono::milliseconds expiration,
157 Func&& expireCallback);
160 std::chrono::steady_clock::time_point enqueueTime_;
161 std::chrono::milliseconds expiration_;
162 Func expireCallback_;
163 std::shared_ptr<folly::RequestContext> context_;
166 static void runTask(const ThreadPtr& thread, Task&& task);
168 // The function that will be bound to pool threads. It must call
169 // thread->startupBaton.post() when it's ready to consume work.
170 virtual void threadRun(ThreadPtr thread) = 0;
172 // Stop n threads and put their ThreadPtrs in the stoppedThreads_ queue
173 // and remove them from threadList_, either synchronize or asynchronize
174 // Prerequisite: threadListLock_ writelocked
175 virtual void stopThreads(size_t n) = 0;
177 // Join n stopped threads and remove them from waitingForJoinThreads_ queue.
178 // Should not hold a lock because joining thread operation may invoke some
179 // cleanup operations on the thread, and those cleanup operations may
180 // require a lock on ThreadPoolExecutor.
181 void joinStoppedThreads(size_t n);
183 // Create a suitable Thread struct
184 virtual ThreadPtr makeThread() {
185 return std::make_shared<Thread>(this);
188 // Prerequisite: threadListLock_ readlocked
189 virtual uint64_t getPendingTaskCountImpl(const RWSpinLock::ReadHolder&) = 0;
193 void add(const ThreadPtr& state) {
194 auto it = std::lower_bound(
198 // compare method is a static method of class
199 // and therefore cannot be inlined by compiler
200 // as a template predicate of the STL algorithm
201 // but wrapped up with the lambda function (lambda will be inlined)
202 // compiler can inline compare method as well
203 [&](const ThreadPtr& ts1, const ThreadPtr& ts2) -> bool { // inline
204 return compare(ts1, ts2);
206 vec_.insert(it, state);
209 void remove(const ThreadPtr& state) {
210 auto itPair = std::equal_range(
215 [&](const ThreadPtr& ts1, const ThreadPtr& ts2) -> bool { // inline
216 return compare(ts1, ts2);
218 CHECK(itPair.first != vec_.end());
219 CHECK(std::next(itPair.first) == itPair.second);
220 vec_.erase(itPair.first);
223 const std::vector<ThreadPtr>& get() const {
228 static bool compare(const ThreadPtr& ts1, const ThreadPtr& ts2) {
229 return ts1->id < ts2->id;
232 std::vector<ThreadPtr> vec_;
235 class StoppedThreadQueue : public BlockingQueue<ThreadPtr> {
237 void add(ThreadPtr item) override;
238 ThreadPtr take() override;
239 size_t size() override;
244 std::queue<ThreadPtr> queue_;
247 std::shared_ptr<ThreadFactory> threadFactory_;
248 const bool isWaitForAll_; // whether to wait till event base loop exits
250 ThreadList threadList_;
251 folly::RWSpinLock threadListLock_;
252 StoppedThreadQueue stoppedThreads_;
253 std::atomic<bool> isJoin_; // whether the current downsizing is a join
255 struct TaskStatsCallbackRegistry {
256 folly::ThreadLocal<bool> inCallback;
257 folly::Synchronized<std::vector<TaskStatsCallback>> callbackList;
259 std::shared_ptr<TaskStatsCallbackRegistry> taskStatsCallbacks_;
260 std::vector<std::shared_ptr<Observer>> observers_;
261 folly::ThreadPoolListHook threadPoolHook_;