/*
 * Copyright 2017 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <atomic>
#include <chrono>
#include <memory>
#include <stdexcept>
#include <thread>

#include <folly/Baton.h>
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/executors/FutureExecutor.h>
#include <folly/executors/IOThreadPoolExecutor.h>
#include <folly/executors/ThreadPoolExecutor.h>
#include <folly/executors/task_queue/LifoSemMPMCQueue.h>
#include <folly/executors/thread_factory/PriorityThreadFactory.h>
#include <folly/portability/GTest.h>
27 using namespace folly;
28 using namespace std::chrono;
30 static Func burnMs(uint64_t ms) {
31 return [ms]() { std::this_thread::sleep_for(milliseconds(ms)); };
40 TEST(ThreadPoolExecutorTest, CPUBasic) {
41 basic<CPUThreadPoolExecutor>();
44 TEST(IOThreadPoolExecutorTest, IOBasic) {
45 basic<IOThreadPoolExecutor>();
49 static void resize() {
51 EXPECT_EQ(100, tpe.numThreads());
52 tpe.setNumThreads(50);
53 EXPECT_EQ(50, tpe.numThreads());
54 tpe.setNumThreads(150);
55 EXPECT_EQ(150, tpe.numThreads());
58 TEST(ThreadPoolExecutorTest, CPUResize) {
59 resize<CPUThreadPoolExecutor>();
62 TEST(ThreadPoolExecutorTest, IOResize) {
63 resize<IOThreadPoolExecutor>();
69 std::atomic<int> completed(0);
74 for (int i = 0; i < 1000; i++) {
78 EXPECT_GT(1000, completed);
81 // IOThreadPoolExecutor's stop() behaves like join(). Outstanding tasks belong
82 // to the event base, will be executed upon its destruction, and cannot be
85 void stop<IOThreadPoolExecutor>() {
86 IOThreadPoolExecutor tpe(1);
87 std::atomic<int> completed(0);
92 for (int i = 0; i < 10; i++) {
96 EXPECT_EQ(10, completed);
99 TEST(ThreadPoolExecutorTest, CPUStop) {
100 stop<CPUThreadPoolExecutor>();
103 TEST(ThreadPoolExecutorTest, IOStop) {
104 stop<IOThreadPoolExecutor>();
110 std::atomic<int> completed(0);
115 for (int i = 0; i < 1000; i++) {
119 EXPECT_EQ(1000, completed);
122 TEST(ThreadPoolExecutorTest, CPUJoin) {
123 join<CPUThreadPoolExecutor>();
126 TEST(ThreadPoolExecutorTest, IOJoin) {
127 join<IOThreadPoolExecutor>();
131 static void resizeUnderLoad() {
133 std::atomic<int> completed(0);
138 for (int i = 0; i < 1000; i++) {
141 tpe.setNumThreads(5);
142 tpe.setNumThreads(15);
144 EXPECT_EQ(1000, completed);
147 TEST(ThreadPoolExecutorTest, CPUResizeUnderLoad) {
148 resizeUnderLoad<CPUThreadPoolExecutor>();
151 TEST(ThreadPoolExecutorTest, IOResizeUnderLoad) {
152 resizeUnderLoad<IOThreadPoolExecutor>();
156 static void poolStats() {
157 folly::Baton<> startBaton, endBaton;
159 auto stats = tpe.getPoolStats();
160 EXPECT_EQ(1, stats.threadCount);
161 EXPECT_EQ(1, stats.idleThreadCount);
162 EXPECT_EQ(0, stats.activeThreadCount);
163 EXPECT_EQ(0, stats.pendingTaskCount);
164 EXPECT_EQ(0, tpe.getPendingTaskCount());
165 EXPECT_EQ(0, stats.totalTaskCount);
172 stats = tpe.getPoolStats();
173 EXPECT_EQ(1, stats.threadCount);
174 EXPECT_EQ(0, stats.idleThreadCount);
175 EXPECT_EQ(1, stats.activeThreadCount);
176 EXPECT_EQ(1, stats.pendingTaskCount);
177 EXPECT_EQ(1, tpe.getPendingTaskCount());
178 EXPECT_EQ(2, stats.totalTaskCount);
182 TEST(ThreadPoolExecutorTest, CPUPoolStats) {
183 poolStats<CPUThreadPoolExecutor>();
186 TEST(ThreadPoolExecutorTest, IOPoolStats) {
187 poolStats<IOThreadPoolExecutor>();
191 static void taskStats() {
193 std::atomic<int> c(0);
194 tpe.subscribeToTaskStats([&](ThreadPoolExecutor::TaskStats stats) {
196 EXPECT_LT(milliseconds(0), stats.runTime);
198 EXPECT_LT(milliseconds(0), stats.waitTime);
207 TEST(ThreadPoolExecutorTest, CPUTaskStats) {
208 taskStats<CPUThreadPoolExecutor>();
211 TEST(ThreadPoolExecutorTest, IOTaskStats) {
212 taskStats<IOThreadPoolExecutor>();
216 static void expiration() {
218 std::atomic<int> statCbCount(0);
219 tpe.subscribeToTaskStats([&](ThreadPoolExecutor::TaskStats stats) {
220 int i = statCbCount++;
222 EXPECT_FALSE(stats.expired);
224 EXPECT_TRUE(stats.expired);
229 std::atomic<int> expireCbCount(0);
230 auto expireCb = [&]() { expireCbCount++; };
231 tpe.add(burnMs(10), seconds(60), expireCb);
232 tpe.add(burnMs(10), milliseconds(10), expireCb);
234 EXPECT_EQ(2, statCbCount);
235 EXPECT_EQ(1, expireCbCount);
238 TEST(ThreadPoolExecutorTest, CPUExpiration) {
239 expiration<CPUThreadPoolExecutor>();
242 TEST(ThreadPoolExecutorTest, IOExpiration) {
243 expiration<IOThreadPoolExecutor>();
246 template <typename TPE>
247 static void futureExecutor() {
248 FutureExecutor<TPE> fe(2);
249 std::atomic<int> c{0};
250 fe.addFuture([]() { return makeFuture<int>(42); }).then([&](Try<int>&& t) {
252 EXPECT_EQ(42, t.value());
254 fe.addFuture([]() { return 100; }).then([&](Try<int>&& t) {
256 EXPECT_EQ(100, t.value());
258 fe.addFuture([]() { return makeFuture(); }).then([&](Try<Unit>&& t) {
260 EXPECT_NO_THROW(t.value());
262 fe.addFuture([]() { return; }).then([&](Try<Unit>&& t) {
264 EXPECT_NO_THROW(t.value());
266 fe.addFuture([]() { throw std::runtime_error("oops"); })
267 .then([&](Try<Unit>&& t) {
269 EXPECT_THROW(t.value(), std::runtime_error);
271 // Test doing actual async work
272 folly::Baton<> baton;
274 auto p = std::make_shared<Promise<int>>();
275 std::thread t([p]() {
280 return p->getFuture();
282 .then([&](Try<int>&& t) {
283 EXPECT_EQ(42, t.value());
292 TEST(ThreadPoolExecutorTest, CPUFuturePool) {
293 futureExecutor<CPUThreadPoolExecutor>();
296 TEST(ThreadPoolExecutorTest, IOFuturePool) {
297 futureExecutor<IOThreadPoolExecutor>();
300 TEST(ThreadPoolExecutorTest, PriorityPreemptionTest) {
301 bool tookLopri = false;
304 EXPECT_FALSE(tookLopri);
311 CPUThreadPoolExecutor pool(0, 2);
312 for (int i = 0; i < 50; i++) {
313 pool.addWithPriority(lopri, Executor::LO_PRI);
315 for (int i = 0; i < 50; i++) {
316 pool.addWithPriority(hipri, Executor::HI_PRI);
318 pool.setNumThreads(1);
320 EXPECT_EQ(100, completed);
323 class TestObserver : public ThreadPoolExecutor::Observer {
325 void threadStarted(ThreadPoolExecutor::ThreadHandle*) override {
328 void threadStopped(ThreadPoolExecutor::ThreadHandle*) override {
331 void threadPreviouslyStarted(ThreadPoolExecutor::ThreadHandle*) override {
334 void threadNotYetStopped(ThreadPoolExecutor::ThreadHandle*) override {
338 ASSERT_EQ(threads_, 0);
342 std::atomic<int> threads_{0};
345 TEST(ThreadPoolExecutorTest, IOObserver) {
346 auto observer = std::make_shared<TestObserver>();
349 IOThreadPoolExecutor exe(10);
350 exe.addObserver(observer);
351 exe.setNumThreads(3);
352 exe.setNumThreads(0);
353 exe.setNumThreads(7);
354 exe.removeObserver(observer);
355 exe.setNumThreads(10);
358 observer->checkCalls();
361 TEST(ThreadPoolExecutorTest, CPUObserver) {
362 auto observer = std::make_shared<TestObserver>();
365 CPUThreadPoolExecutor exe(10);
366 exe.addObserver(observer);
367 exe.setNumThreads(3);
368 exe.setNumThreads(0);
369 exe.setNumThreads(7);
370 exe.removeObserver(observer);
371 exe.setNumThreads(10);
374 observer->checkCalls();
377 TEST(ThreadPoolExecutorTest, AddWithPriority) {
378 std::atomic_int c{0};
379 auto f = [&] { c++; };
381 // IO exe doesn't support priorities
382 IOThreadPoolExecutor ioExe(10);
383 EXPECT_THROW(ioExe.addWithPriority(f, 0), std::runtime_error);
385 CPUThreadPoolExecutor cpuExe(10, 3);
386 cpuExe.addWithPriority(f, -1);
387 cpuExe.addWithPriority(f, 0);
388 cpuExe.addWithPriority(f, 1);
389 cpuExe.addWithPriority(f, -2); // will add at the lowest priority
390 cpuExe.addWithPriority(f, 2); // will add at the highest priority
391 cpuExe.addWithPriority(f, Executor::LO_PRI);
392 cpuExe.addWithPriority(f, Executor::HI_PRI);
398 TEST(ThreadPoolExecutorTest, BlockingQueue) {
399 std::atomic_int c{0};
404 const int kQueueCapacity = 1;
405 const int kThreads = 1;
407 auto queue = std::make_unique<LifoSemMPMCQueue<
408 CPUThreadPoolExecutor::CPUTask,
409 QueueBehaviorIfFull::BLOCK>>(kQueueCapacity);
411 CPUThreadPoolExecutor cpuExe(
414 std::make_shared<NamedThreadFactory>("CPUThreadPool"));
416 // Add `f` five times. It sleeps for 1ms every time. Calling
417 // `cppExec.add()` is *almost* guaranteed to block because there's
418 // only 1 cpu worker thread.
419 for (int i = 0; i < 5; i++) {
420 EXPECT_NO_THROW(cpuExe.add(f));
427 TEST(PriorityThreadFactoryTest, ThreadPriority) {
428 PriorityThreadFactory factory(
429 std::make_shared<NamedThreadFactory>("stuff"), 1);
430 int actualPriority = -21;
431 factory.newThread([&]() { actualPriority = getpriority(PRIO_PROCESS, 0); })
433 EXPECT_EQ(1, actualPriority);
436 class TestData : public folly::RequestData {
438 explicit TestData(int data) : data_(data) {}
439 ~TestData() override {}
443 TEST(ThreadPoolExecutorTest, RequestContext) {
444 CPUThreadPoolExecutor executor(1);
446 RequestContextScopeGuard rctx; // create new request context for this scope
447 EXPECT_EQ(nullptr, RequestContext::get()->getContextData("test"));
448 RequestContext::get()->setContextData(
449 "test", std::unique_ptr<TestData>(new TestData(42)));
450 auto data = RequestContext::get()->getContextData("test");
451 EXPECT_EQ(42, dynamic_cast<TestData*>(data)->data_);
454 auto data = RequestContext::get()->getContextData("test");
455 ASSERT_TRUE(data != nullptr);
456 EXPECT_EQ(42, dynamic_cast<TestData*>(data)->data_);
// Move-only-in-spirit payload whose move assignment can be made artificially
// slow (50ms); used to reproduce a queue ordering bug where writes complete
// out of initiation order. NOTE(review): struct header, `return *this;`, and
// the `bool slow` member were lost from the garbled dump and restored here.
struct SlowMover {
  explicit SlowMover(bool slow = false) : slow(slow) {}
  SlowMover(SlowMover&& other) noexcept {
    *this = std::move(other);
  }
  SlowMover& operator=(SlowMover&& other) noexcept {
    slow = other.slow;
    if (slow) {
      /* sleep override */ std::this_thread::sleep_for(
          std::chrono::milliseconds(50));
    }
    return *this;
  }

  bool slow;
};
476 TEST(ThreadPoolExecutorTest, BugD3527722) {
477 // Test that the queue does not get stuck if writes are completed in
478 // order opposite to how they are initiated.
479 LifoSemMPMCQueue<SlowMover> q(1024);
480 std::atomic<int> turn{};
482 std::thread consumer1([&] {
486 std::thread consumer2([&] {
491 std::thread producer1([&] {
497 q.add(SlowMover(true));
499 std::thread producer2([&] {
504 q.add(SlowMover(false));
513 template <typename TPE, typename ERR_T>
514 static void ShutdownTest() {
515 // test that adding a .then() after we have
516 // started shutting down does not deadlock
517 folly::Optional<folly::Future<int>> f;
520 f = folly::makeFuture().via(&fe).then([]() { burnMs(100)(); }).then([]() {
524 EXPECT_THROW(f->get(), ERR_T);
527 TEST(ThreadPoolExecutorTest, ShutdownTestIO) {
528 ShutdownTest<IOThreadPoolExecutor, std::runtime_error>();
531 TEST(ThreadPoolExecutorTest, ShutdownTestCPU) {
532 ShutdownTest<CPUThreadPoolExecutor, folly::FutureException>();
535 template <typename TPE>
536 static void removeThreadTest() {
537 // test that adding a .then() after we have removed some threads
538 // doesn't cause deadlock and they are executed on different threads
539 folly::Optional<folly::Future<int>> f;
540 std::thread::id id1, id2;
542 f = folly::makeFuture()
546 id1 = std::this_thread::get_id();
550 id2 = std::this_thread::get_id();
554 // future::then should be fulfilled because there is other thread available
555 EXPECT_EQ(77, f->get());
556 // two thread should be different because then part should be rescheduled to
561 TEST(ThreadPoolExecutorTest, RemoveThreadTestIO) {
562 removeThreadTest<IOThreadPoolExecutor>();
565 TEST(ThreadPoolExecutorTest, RemoveThreadTestCPU) {
566 removeThreadTest<CPUThreadPoolExecutor>();
569 template <typename TPE>
570 static void resizeThreadWhileExecutingTest() {
572 EXPECT_EQ(10, tpe.numThreads());
574 std::atomic<int> completed(0);
579 for (int i = 0; i < 1000; i++) {
582 tpe.setNumThreads(8);
583 EXPECT_EQ(8, tpe.numThreads());
584 tpe.setNumThreads(5);
585 EXPECT_EQ(5, tpe.numThreads());
586 tpe.setNumThreads(15);
587 EXPECT_EQ(15, tpe.numThreads());
589 EXPECT_EQ(1000, completed);
592 TEST(ThreadPoolExecutorTest, resizeThreadWhileExecutingTestIO) {
593 resizeThreadWhileExecutingTest<IOThreadPoolExecutor>();
596 TEST(ThreadPoolExecutorTest, resizeThreadWhileExecutingTestCPU) {
597 resizeThreadWhileExecutingTest<CPUThreadPoolExecutor>();