2 * Copyright 2017-present Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/executors/CPUThreadPoolExecutor.h>
18 #include <folly/executors/task_queue/PriorityLifoSemMPMCQueue.h>
22 const size_t CPUThreadPoolExecutor::kDefaultMaxQueueSize = 1 << 14;
24 CPUThreadPoolExecutor::CPUThreadPoolExecutor(
26 std::unique_ptr<BlockingQueue<CPUTask>> taskQueue,
27 std::shared_ptr<ThreadFactory> threadFactory)
28 : ThreadPoolExecutor(numThreads, std::move(threadFactory)),
29 taskQueue_(std::move(taskQueue)) {
30 setNumThreads(numThreads);
33 CPUThreadPoolExecutor::CPUThreadPoolExecutor(
35 std::shared_ptr<ThreadFactory> threadFactory)
36 : CPUThreadPoolExecutor(
38 std::make_unique<LifoSemMPMCQueue<CPUTask>>(
39 CPUThreadPoolExecutor::kDefaultMaxQueueSize),
40 std::move(threadFactory)) {}
42 CPUThreadPoolExecutor::CPUThreadPoolExecutor(size_t numThreads)
43 : CPUThreadPoolExecutor(
45 std::make_shared<NamedThreadFactory>("CPUThreadPool")) {}
47 CPUThreadPoolExecutor::CPUThreadPoolExecutor(
50 std::shared_ptr<ThreadFactory> threadFactory)
51 : CPUThreadPoolExecutor(
53 std::make_unique<PriorityLifoSemMPMCQueue<CPUTask>>(
55 CPUThreadPoolExecutor::kDefaultMaxQueueSize),
56 std::move(threadFactory)) {}
58 CPUThreadPoolExecutor::CPUThreadPoolExecutor(
62 std::shared_ptr<ThreadFactory> threadFactory)
63 : CPUThreadPoolExecutor(
65 std::make_unique<PriorityLifoSemMPMCQueue<CPUTask>>(
68 std::move(threadFactory)) {}
70 CPUThreadPoolExecutor::~CPUThreadPoolExecutor() {
72 CHECK(threadsToStop_ == 0);
75 void CPUThreadPoolExecutor::add(Func func) {
76 add(std::move(func), std::chrono::milliseconds(0));
79 void CPUThreadPoolExecutor::add(
81 std::chrono::milliseconds expiration,
82 Func expireCallback) {
83 // TODO handle enqueue failure, here and in other add() callsites
85 CPUTask(std::move(func), expiration, std::move(expireCallback)));
88 void CPUThreadPoolExecutor::addWithPriority(Func func, int8_t priority) {
89 add(std::move(func), priority, std::chrono::milliseconds(0));
92 void CPUThreadPoolExecutor::add(
95 std::chrono::milliseconds expiration,
96 Func expireCallback) {
97 CHECK(getNumPriorities() > 0);
98 taskQueue_->addWithPriority(
99 CPUTask(std::move(func), expiration, std::move(expireCallback)),
103 uint8_t CPUThreadPoolExecutor::getNumPriorities() const {
104 return taskQueue_->getNumPriorities();
107 BlockingQueue<CPUThreadPoolExecutor::CPUTask>*
108 CPUThreadPoolExecutor::getTaskQueue() {
109 return taskQueue_.get();
112 void CPUThreadPoolExecutor::threadRun(std::shared_ptr<Thread> thread) {
113 this->threadPoolHook_.registerThread();
115 thread->startupBaton.post();
117 auto task = taskQueue_->take();
118 if (UNLIKELY(task.poison)) {
119 CHECK(threadsToStop_-- > 0);
120 for (auto& o : observers_) {
121 o->threadStopped(thread.get());
123 folly::RWSpinLock::WriteHolder w{&threadListLock_};
124 threadList_.remove(thread);
125 stoppedThreads_.add(thread);
128 runTask(thread, std::move(task));
131 if (UNLIKELY(threadsToStop_ > 0 && !isJoin_)) {
132 if (--threadsToStop_ >= 0) {
133 folly::RWSpinLock::WriteHolder w{&threadListLock_};
134 threadList_.remove(thread);
135 stoppedThreads_.add(thread);
144 void CPUThreadPoolExecutor::stopThreads(size_t n) {
146 for (size_t i = 0; i < n; i++) {
147 taskQueue_->addWithPriority(CPUTask(), Executor::LO_PRI);
151 // threadListLock_ is readlocked
152 uint64_t CPUThreadPoolExecutor::getPendingTaskCountImpl(
153 const folly::RWSpinLock::ReadHolder&) {
154 return taskQueue_->size();