2 * Copyright 2014 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/experimental/wangle/concurrent/FutureExecutor.h>
18 #include <folly/experimental/wangle/concurrent/ThreadPoolExecutor.h>
19 #include <folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h>
20 #include <folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h>
21 #include <glog/logging.h>
22 #include <gtest/gtest.h>
24 using namespace folly::wangle;
25 using namespace std::chrono;
// Builds a task that blocks the thread running it for `ms` milliseconds.
// `ms` is captured by value, so the returned callable is self-contained.
// NOTE(review): the function's closing brace is not part of this listing.
27 static folly::Func burnMs(uint64_t ms) {
28 return [ms]() { std::this_thread::sleep_for(milliseconds(ms)); };
// Runs the `basic` helper once per executor type.
// NOTE(review): `basic` is defined on lines that are not part of this listing.
37 TEST(ThreadPoolExecutorTest, CPUBasic) {
38 basic<CPUThreadPoolExecutor>();
// NOTE(review): this test is registered under IOThreadPoolExecutorTest, while
// every other test in this file uses ThreadPoolExecutorTest — confirm whether
// the different test-case name is intentional.
41 TEST(IOThreadPoolExecutorTest, IOBasic) {
42 basic<IOThreadPoolExecutor>();
// Checks that numThreads() follows setNumThreads() when the pool is shrunk
// (to 50) and then grown (to 150).
// NOTE(review): the template header and the declaration of `tpe` are not part
// of this listing; the first expectation implies `tpe` starts with 100
// threads — confirm.
46 static void resize() {
48 EXPECT_EQ(100, tpe.numThreads());
49 tpe.setNumThreads(50);
50 EXPECT_EQ(50, tpe.numThreads());
51 tpe.setNumThreads(150);
52 EXPECT_EQ(150, tpe.numThreads());
// Runs the resize() check against the CPU-bound executor.
55 TEST(ThreadPoolExecutorTest, CPUResize) {
56 resize<CPUThreadPoolExecutor>();
// Runs the resize() check against the IO executor.
59 TEST(ThreadPoolExecutorTest, IOResize) {
60 resize<IOThreadPoolExecutor>();
// Fragment of the generic stop() helper; its signature, the executor
// declaration and the loop body are not part of this listing.
// `completed` is an atomic counter starting at 0.
66 std::atomic<int> completed(0);
// 1000 iterations; presumably each one enqueues a task that bumps
// `completed` — TODO confirm against the full file.
71 for (int i = 0; i < 1000; i++) {
// Asserts 1000 > completed, i.e. fewer than 1000 increments were observed.
75 EXPECT_GT(1000, completed);
78 // IOThreadPoolExecutor's stop() behaves like join(). Outstanding tasks belong
79 // to the event base, will be executed upon its destruction, and cannot be
// NOTE(review): the sentence above is cut off here; its final line and the
// `template <>` header are not part of this listing.
// Specialization for IOThreadPoolExecutor: uses a single-thread pool and, in
// contrast to the generic helper, expects every one of the 10 iterations to
// be reflected in `completed`.
82 void stop<IOThreadPoolExecutor>() {
83 IOThreadPoolExecutor tpe(1);
84 std::atomic<int> completed(0);
// Loop body is not part of this listing; presumably it adds a task to `tpe`
// that increments `completed` — TODO confirm.
89 for (int i = 0; i < 10; i++) {
93 EXPECT_EQ(10, completed);
// Runs the generic stop() helper against the CPU-bound executor.
96 TEST(ThreadPoolExecutorTest, CPUStop) {
97 stop<CPUThreadPoolExecutor>();
// Runs the IOThreadPoolExecutor specialization of stop().
100 TEST(ThreadPoolExecutorTest, IOStop) {
101 stop<IOThreadPoolExecutor>();
// Fragment of the join() helper; its signature, the executor declaration and
// the loop body are not part of this listing.
// `completed` is an atomic counter starting at 0.
107 std::atomic<int> completed(0);
// 1000 iterations; presumably each one enqueues a task that bumps
// `completed` — TODO confirm against the full file.
112 for (int i = 0; i < 1000; i++) {
// Unlike the generic stop() check, all 1000 increments must be observed.
116 EXPECT_EQ(1000, completed);
// Runs the join() helper against the CPU-bound executor.
119 TEST(ThreadPoolExecutorTest, CPUJoin) {
120 join<CPUThreadPoolExecutor>();
// Runs the join() helper against the IO executor.
123 TEST(ThreadPoolExecutorTest, IOJoin) {
124 join<IOThreadPoolExecutor>();
// Resizes the pool (down to 5 threads, then up to 15) after the 1000-iteration
// loop and expects `completed` to reach exactly 1000.
// NOTE(review): the template header, the declaration of `tpe`, the loop body
// and whatever waits for the tasks are not part of this listing.
128 static void resizeUnderLoad() {
130 std::atomic<int> completed(0);
// Presumably enqueues one task per iteration — TODO confirm.
135 for (int i = 0; i < 1000; i++) {
138 tpe.setNumThreads(5);
139 tpe.setNumThreads(15);
141 EXPECT_EQ(1000, completed);
// Runs resizeUnderLoad() against the CPU-bound executor.
144 TEST(ThreadPoolExecutorTest, CPUResizeUnderLoad) {
145 resizeUnderLoad<CPUThreadPoolExecutor>();
// Runs resizeUnderLoad() against the IO executor.
148 TEST(ThreadPoolExecutorTest, IOResizeUnderLoad) {
149 resizeUnderLoad<IOThreadPoolExecutor>();
// Compares getPoolStats() snapshots taken before and after work is submitted.
// NOTE(review): the template header, the declaration of `tpe`, and the lines
// between the add() call and the second snapshot are not part of this listing.
153 static void poolStats() {
154 folly::Baton<> startBaton, endBaton;
// First snapshot: one thread, idle, and no tasks pending or counted.
156 auto stats = tpe.getPoolStats();
157 EXPECT_EQ(1, stats.threadCount);
158 EXPECT_EQ(1, stats.idleThreadCount);
159 EXPECT_EQ(0, stats.activeThreadCount);
160 EXPECT_EQ(0, stats.pendingTaskCount);
161 EXPECT_EQ(0, stats.totalTaskCount);
// This task posts startBaton when it begins running and then blocks on
// endBaton, so it stays in flight until endBaton is posted.
162 tpe.add([&](){ startBaton.post(); endBaton.wait(); });
// Second snapshot: the single thread is active, one task is pending, and two
// tasks are counted in total. Presumably a second task is added and
// startBaton is waited on in the lines not shown — TODO confirm.
165 stats = tpe.getPoolStats();
166 EXPECT_EQ(1, stats.threadCount);
167 EXPECT_EQ(0, stats.idleThreadCount);
168 EXPECT_EQ(1, stats.activeThreadCount);
169 EXPECT_EQ(1, stats.pendingTaskCount);
170 EXPECT_EQ(2, stats.totalTaskCount);
// Runs poolStats() against the CPU-bound executor.
174 TEST(ThreadPoolExecutorTest, CPUPoolStats) {
175 poolStats<CPUThreadPoolExecutor>();
// Runs poolStats() against the IO executor.
178 TEST(ThreadPoolExecutorTest, IOPoolStats) {
179 poolStats<IOThreadPoolExecutor>();
// Subscribes an observer to the executor's per-task statistics and checks the
// reported timings.
// NOTE(review): the template header, the declaration of `tpe`, the rest of
// the observer body and the code that submits tasks are not part of this
// listing.
183 static void taskStats() {
185 std::atomic<int> c(0);
186 auto s = tpe.subscribeToTaskStats(
187 Observer<ThreadPoolExecutor::TaskStats>::create(
// The callback receives TaskStats by value and captures locals by reference.
188 [&](ThreadPoolExecutor::TaskStats stats) {
// Both expectations are strict (0 ms < value), so a measured duration of
// exactly zero would fail them.
// NOTE(review): possible source of flakiness depending on clock resolution —
// confirm.
190 EXPECT_LT(milliseconds(0), stats.runTime);
192 EXPECT_LT(milliseconds(0), stats.waitTime);
// Runs taskStats() against the CPU-bound executor.
201 TEST(ThreadPoolExecutorTest, CPUTaskStats) {
202 taskStats<CPUThreadPoolExecutor>();
// Runs taskStats() against the IO executor.
205 TEST(ThreadPoolExecutorTest, IOTaskStats) {
206 taskStats<IOThreadPoolExecutor>();
// Submits two tasks with different durations passed to add() and checks that
// the stats observer fires twice while the expire callback fires once.
// NOTE(review): the template header, the declaration of `tpe`, the conditions
// guarding the two `expired` expectations and the code that waits for the
// pool are not part of this listing.
210 static void expiration() {
212 std::atomic<int> statCbCount(0);
213 auto s = tpe.subscribeToTaskStats(
214 Observer<ThreadPoolExecutor::TaskStats>::create(
215 [&](ThreadPoolExecutor::TaskStats stats) {
// Post-increment: `i` holds the number of callbacks seen before this one.
216 int i = statCbCount++;
// Presumably selected on `i` so the first report is not expired and the
// second is — TODO confirm against the full file.
218 EXPECT_FALSE(stats.expired);
220 EXPECT_TRUE(stats.expired);
225 std::atomic<int> expireCbCount(0);
// Counts how many times the executor invokes the callback passed to add().
226 auto expireCb = [&] () { expireCbCount++; };
// Both tasks sleep 10 ms; they differ only in the duration argument
// (60 s vs 10 ms). Presumably that argument is the task's expiration
// deadline — confirm against ThreadPoolExecutor::add.
227 tpe.add(burnMs(10), seconds(60), expireCb);
228 tpe.add(burnMs(10), milliseconds(10), expireCb);
230 EXPECT_EQ(2, statCbCount);
231 EXPECT_EQ(1, expireCbCount);
// Runs expiration() against the CPU-bound executor.
234 TEST(ThreadPoolExecutorTest, CPUExpiration) {
235 expiration<CPUThreadPoolExecutor>();
// Runs expiration() against the IO executor.
238 TEST(ThreadPoolExecutorTest, IOExpiration) {
239 expiration<IOThreadPoolExecutor>();
// Exercises FutureExecutor<TPE>::addFuture() with callables that return a
// Future<int>, a plain int, a Future<void>, void, and one that throws, then
// checks the Try delivered to each continuation.
// NOTE(review): several continuation lines (including whatever updates `c`),
// the body of the async section and the final wait/assert lines are not part
// of this listing.
242 template <typename TPE>
243 static void futureExecutor() {
244 FutureExecutor<TPE> fe(2);
245 std::atomic<int> c{0};
// Callable returns an already-made Future<int>.
246 fe.addFuture([] () { return makeFuture<int>(42); }).then(
249 EXPECT_EQ(42, t.value());
// Callable returns a plain int.
251 fe.addFuture([] () { return 100; }).then(
254 EXPECT_EQ(100, t.value());
// Callable returns the result of makeFuture() with no arguments; the
// continuation receives a Try<void>.
256 fe.addFuture([] () { return makeFuture(); }).then(
257 [&] (Try<void>&& t) {
259 EXPECT_NO_THROW(t.value());
// Callable returns void.
261 fe.addFuture([] () { return; }).then(
262 [&] (Try<void>&& t) {
264 EXPECT_NO_THROW(t.value());
// Callable throws; the exception is expected to surface from t.value().
266 fe.addFuture([] () { throw std::runtime_error("oops"); }).then(
267 [&] (Try<void>&& t) {
269 EXPECT_THROW(t.value(), std::runtime_error);
271 // Test doing actual async work
272 folly::Baton<> baton;
273 fe.addFuture([&] () {
// The promise is heap-allocated and shared; presumably it is fulfilled with
// 42 from another thread in the lines not shown — TODO confirm.
274 auto p = std::make_shared<Promise<int>>();
280 return p->getFuture();
281 }).then([&] (Try<int>&& t) {
282 EXPECT_EQ(42, t.value());
// Runs futureExecutor() with a FutureExecutor built on the CPU-bound executor.
291 TEST(ThreadPoolExecutorTest, CPUFuturePool) {
292 futureExecutor<CPUThreadPoolExecutor>();
// Runs futureExecutor() with a FutureExecutor built on the IO executor.
295 TEST(ThreadPoolExecutorTest, IOFuturePool) {
296 futureExecutor<IOThreadPoolExecutor>();
// Submits two batches of 50 tasks to a CPUThreadPoolExecutor, then sets the
// thread count to 1 and expects all 100 to complete.
// NOTE(review): the task lambdas, the declaration of `completed`, the loop
// bodies and the final join/wait are not part of this listing.
299 TEST(ThreadPoolExecutorTest, PriorityPreemptionTest) {
// Records whether a low-priority task has run yet.
300 bool tookLopri = false;
// Presumably lives inside the high-priority task, asserting that no
// low-priority task ran before it — TODO confirm.
303 EXPECT_FALSE(tookLopri);
// NOTE(review): presumably (numThreads = 0, numPriorities = 2), so nothing
// runs until setNumThreads(1) below — confirm against the constructor.
310 CPUThreadPoolExecutor pool(0, 2);
311 for (int i = 0; i < 50; i++) {
314 for (int i = 0; i < 50; i++) {
317 pool.setNumThreads(1);
319 EXPECT_EQ(100, completed);