/*
 * Copyright 2015 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <folly/wangle/concurrent/CPUThreadPoolExecutor.h>
#include <folly/wangle/concurrent/FutureExecutor.h>
#include <folly/wangle/concurrent/IOThreadPoolExecutor.h>
#include <folly/wangle/concurrent/ThreadPoolExecutor.h>

#include <glog/logging.h>
#include <gtest/gtest.h>

#include <atomic>
#include <chrono>
#include <memory>
#include <stdexcept>
#include <thread>
24 using namespace folly;
25 using namespace folly::wangle;
26 using namespace std::chrono;
28 static folly::Func burnMs(uint64_t ms) {
29 return [ms]() { std::this_thread::sleep_for(milliseconds(ms)); };
38 TEST(ThreadPoolExecutorTest, CPUBasic) {
39 basic<CPUThreadPoolExecutor>();
42 TEST(IOThreadPoolExecutorTest, IOBasic) {
43 basic<IOThreadPoolExecutor>();
// Grows and shrinks a pool and verifies numThreads() tracks each change.
// NOTE(review): the template header and pool construction were missing from
// this truncated extract; reconstructed from the visible assertions
// (initial thread count must be 100).
template <class TPE>
static void resize() {
  TPE tpe(100);
  EXPECT_EQ(100, tpe.numThreads());
  tpe.setNumThreads(50);   // shrink
  EXPECT_EQ(50, tpe.numThreads());
  tpe.setNumThreads(150);  // grow past the original size
  EXPECT_EQ(150, tpe.numThreads());
}
56 TEST(ThreadPoolExecutorTest, CPUResize) {
57 resize<CPUThreadPoolExecutor>();
60 TEST(ThreadPoolExecutorTest, IOResize) {
61 resize<IOThreadPoolExecutor>();
67 std::atomic<int> completed(0);
72 for (int i = 0; i < 1000; i++) {
76 EXPECT_GT(1000, completed);
79 // IOThreadPoolExecutor's stop() behaves like join(). Outstanding tasks belong
80 // to the event base, will be executed upon its destruction, and cannot be
83 void stop<IOThreadPoolExecutor>() {
84 IOThreadPoolExecutor tpe(1);
85 std::atomic<int> completed(0);
90 for (int i = 0; i < 10; i++) {
94 EXPECT_EQ(10, completed);
97 TEST(ThreadPoolExecutorTest, CPUStop) {
98 stop<CPUThreadPoolExecutor>();
101 TEST(ThreadPoolExecutorTest, IOStop) {
102 stop<IOThreadPoolExecutor>();
108 std::atomic<int> completed(0);
113 for (int i = 0; i < 1000; i++) {
117 EXPECT_EQ(1000, completed);
120 TEST(ThreadPoolExecutorTest, CPUJoin) {
121 join<CPUThreadPoolExecutor>();
124 TEST(ThreadPoolExecutorTest, IOJoin) {
125 join<IOThreadPoolExecutor>();
129 static void resizeUnderLoad() {
131 std::atomic<int> completed(0);
136 for (int i = 0; i < 1000; i++) {
139 tpe.setNumThreads(5);
140 tpe.setNumThreads(15);
142 EXPECT_EQ(1000, completed);
145 TEST(ThreadPoolExecutorTest, CPUResizeUnderLoad) {
146 resizeUnderLoad<CPUThreadPoolExecutor>();
149 TEST(ThreadPoolExecutorTest, IOResizeUnderLoad) {
150 resizeUnderLoad<IOThreadPoolExecutor>();
154 static void poolStats() {
155 folly::Baton<> startBaton, endBaton;
157 auto stats = tpe.getPoolStats();
158 EXPECT_EQ(1, stats.threadCount);
159 EXPECT_EQ(1, stats.idleThreadCount);
160 EXPECT_EQ(0, stats.activeThreadCount);
161 EXPECT_EQ(0, stats.pendingTaskCount);
162 EXPECT_EQ(0, stats.totalTaskCount);
163 tpe.add([&](){ startBaton.post(); endBaton.wait(); });
166 stats = tpe.getPoolStats();
167 EXPECT_EQ(1, stats.threadCount);
168 EXPECT_EQ(0, stats.idleThreadCount);
169 EXPECT_EQ(1, stats.activeThreadCount);
170 EXPECT_EQ(1, stats.pendingTaskCount);
171 EXPECT_EQ(2, stats.totalTaskCount);
175 TEST(ThreadPoolExecutorTest, CPUPoolStats) {
176 poolStats<CPUThreadPoolExecutor>();
179 TEST(ThreadPoolExecutorTest, IOPoolStats) {
180 poolStats<IOThreadPoolExecutor>();
184 static void taskStats() {
186 std::atomic<int> c(0);
187 auto s = tpe.subscribeToTaskStats(
188 Observer<ThreadPoolExecutor::TaskStats>::create(
189 [&](ThreadPoolExecutor::TaskStats stats) {
191 EXPECT_LT(milliseconds(0), stats.runTime);
193 EXPECT_LT(milliseconds(0), stats.waitTime);
202 TEST(ThreadPoolExecutorTest, CPUTaskStats) {
203 taskStats<CPUThreadPoolExecutor>();
206 TEST(ThreadPoolExecutorTest, IOTaskStats) {
207 taskStats<IOThreadPoolExecutor>();
211 static void expiration() {
213 std::atomic<int> statCbCount(0);
214 auto s = tpe.subscribeToTaskStats(
215 Observer<ThreadPoolExecutor::TaskStats>::create(
216 [&](ThreadPoolExecutor::TaskStats stats) {
217 int i = statCbCount++;
219 EXPECT_FALSE(stats.expired);
221 EXPECT_TRUE(stats.expired);
226 std::atomic<int> expireCbCount(0);
227 auto expireCb = [&] () { expireCbCount++; };
228 tpe.add(burnMs(10), seconds(60), expireCb);
229 tpe.add(burnMs(10), milliseconds(10), expireCb);
231 EXPECT_EQ(2, statCbCount);
232 EXPECT_EQ(1, expireCbCount);
235 TEST(ThreadPoolExecutorTest, CPUExpiration) {
236 expiration<CPUThreadPoolExecutor>();
239 TEST(ThreadPoolExecutorTest, IOExpiration) {
240 expiration<IOThreadPoolExecutor>();
243 template <typename TPE>
244 static void futureExecutor() {
245 FutureExecutor<TPE> fe(2);
246 std::atomic<int> c{0};
247 fe.addFuture([] () { return makeFuture<int>(42); }).then(
250 EXPECT_EQ(42, t.value());
252 fe.addFuture([] () { return 100; }).then(
255 EXPECT_EQ(100, t.value());
257 fe.addFuture([] () { return makeFuture(); }).then(
258 [&] (Try<void>&& t) {
260 EXPECT_NO_THROW(t.value());
262 fe.addFuture([] () { return; }).then(
263 [&] (Try<void>&& t) {
265 EXPECT_NO_THROW(t.value());
267 fe.addFuture([] () { throw std::runtime_error("oops"); }).then(
268 [&] (Try<void>&& t) {
270 EXPECT_THROW(t.value(), std::runtime_error);
272 // Test doing actual async work
273 folly::Baton<> baton;
274 fe.addFuture([&] () {
275 auto p = std::make_shared<Promise<int>>();
281 return p->getFuture();
282 }).then([&] (Try<int>&& t) {
283 EXPECT_EQ(42, t.value());
292 TEST(ThreadPoolExecutorTest, CPUFuturePool) {
293 futureExecutor<CPUThreadPoolExecutor>();
296 TEST(ThreadPoolExecutorTest, IOFuturePool) {
297 futureExecutor<IOThreadPoolExecutor>();
300 TEST(ThreadPoolExecutorTest, PriorityPreemptionTest) {
301 bool tookLopri = false;
304 EXPECT_FALSE(tookLopri);
311 CPUThreadPoolExecutor pool(0, 2);
312 for (int i = 0; i < 50; i++) {
313 pool.addWithPriority(lopri, Executor::LO_PRI);
315 for (int i = 0; i < 50; i++) {
316 pool.addWithPriority(hipri, Executor::HI_PRI);
318 pool.setNumThreads(1);
320 EXPECT_EQ(100, completed);
323 class TestObserver : public ThreadPoolExecutor::Observer {
325 void threadStarted(ThreadPoolExecutor::ThreadHandle*) {
328 void threadStopped(ThreadPoolExecutor::ThreadHandle*) {
331 void threadPreviouslyStarted(ThreadPoolExecutor::ThreadHandle*) {
334 void threadNotYetStopped(ThreadPoolExecutor::ThreadHandle*) {
338 ASSERT_EQ(threads_, 0);
341 std::atomic<int> threads_{0};
344 TEST(ThreadPoolExecutorTest, IOObserver) {
345 auto observer = std::make_shared<TestObserver>();
348 IOThreadPoolExecutor exe(10);
349 exe.addObserver(observer);
350 exe.setNumThreads(3);
351 exe.setNumThreads(0);
352 exe.setNumThreads(7);
353 exe.removeObserver(observer);
354 exe.setNumThreads(10);
357 observer->checkCalls();
360 TEST(ThreadPoolExecutorTest, CPUObserver) {
361 auto observer = std::make_shared<TestObserver>();
364 CPUThreadPoolExecutor exe(10);
365 exe.addObserver(observer);
366 exe.setNumThreads(3);
367 exe.setNumThreads(0);
368 exe.setNumThreads(7);
369 exe.removeObserver(observer);
370 exe.setNumThreads(10);
373 observer->checkCalls();
// NOTE(review): this TEST's definition continues past the last line visible
// in this chunk — the join/verification tail and closing brace are not
// shown — so only comments are added here; the code bytes are untouched.
376 TEST(ThreadPoolExecutorTest, AddWithPriority) {
377 std::atomic_int c{0};
378 auto f = [&]{ c++; };
380 // IO exe doesn't support priorities
381 IOThreadPoolExecutor ioExe(10);
382 EXPECT_THROW(ioExe.addWithPriority(f, 0), std::runtime_error);
// CPU pool created with 3 priority levels; out-of-range priorities are
// expected to clamp (see the inline comments on the -2 and 2 cases below).
384 CPUThreadPoolExecutor cpuExe(10, 3);
385 cpuExe.addWithPriority(f, -1);
386 cpuExe.addWithPriority(f, 0);
387 cpuExe.addWithPriority(f, 1);
388 cpuExe.addWithPriority(f, -2); // will add at the lowest priority
389 cpuExe.addWithPriority(f, 2); // will add at the highest priority
390 cpuExe.addWithPriority(f, Executor::LO_PRI);
391 cpuExe.addWithPriority(f, Executor::HI_PRI);