2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 #include <gtest/gtest.h>
22 #include <folly/Benchmark.h>
23 #include <folly/Memory.h>
24 #include <folly/futures/Future.h>
26 #include <folly/fibers/AddTasks.h>
27 #include <folly/fibers/EventBaseLoopController.h>
28 #include <folly/fibers/FiberManager.h>
29 #include <folly/fibers/FiberManagerMap.h>
30 #include <folly/fibers/GenericBaton.h>
31 #include <folly/fibers/SimpleLoopController.h>
32 #include <folly/fibers/WhenN.h>
34 using namespace folly::fibers;
38 TEST(FiberManager, batonTimedWaitTimeout) {
39 bool taskAdded = false;
40 size_t iterations = 0;
42 FiberManager manager(folly::make_unique<SimpleLoopController>());
43 auto& loopController =
44 dynamic_cast<SimpleLoopController&>(manager.loopController());
46 auto loopFunc = [&]() {
48 manager.addTask([&]() {
51 auto res = baton.timed_wait(std::chrono::milliseconds(230));
54 EXPECT_EQ(5, iterations);
56 loopController.stop();
58 manager.addTask([&]() {
61 auto res = baton.timed_wait(std::chrono::milliseconds(130));
64 EXPECT_EQ(3, iterations);
66 loopController.stop();
70 std::this_thread::sleep_for(std::chrono::milliseconds(50));
75 loopController.loop(std::move(loopFunc));
78 TEST(FiberManager, batonTimedWaitPost) {
79 bool taskAdded = false;
80 size_t iterations = 0;
83 FiberManager manager(folly::make_unique<SimpleLoopController>());
84 auto& loopController =
85 dynamic_cast<SimpleLoopController&>(manager.loopController());
87 auto loopFunc = [&]() {
89 manager.addTask([&]() {
93 auto res = baton.timed_wait(std::chrono::milliseconds(130));
96 EXPECT_EQ(2, iterations);
98 loopController.stop();
102 std::this_thread::sleep_for(std::chrono::milliseconds(50));
104 if (iterations == 2) {
110 loopController.loop(std::move(loopFunc));
113 TEST(FiberManager, batonTimedWaitTimeoutEvb) {
114 size_t tasksComplete = 0;
116 folly::EventBase evb;
118 FiberManager manager(folly::make_unique<EventBaseLoopController>());
119 dynamic_cast<EventBaseLoopController&>(manager.loopController())
120 .attachEventBase(evb);
122 auto task = [&](size_t timeout_ms) {
125 auto start = EventBaseLoopController::Clock::now();
126 auto res = baton.timed_wait(std::chrono::milliseconds(timeout_ms));
127 auto finish = EventBaseLoopController::Clock::now();
132 std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
134 EXPECT_GT(duration_ms.count(), timeout_ms - 50);
135 EXPECT_LT(duration_ms.count(), timeout_ms + 50);
137 if (++tasksComplete == 2) {
138 evb.terminateLoopSoon();
142 evb.runInEventBaseThread([&]() {
143 manager.addTask([&]() { task(500); });
144 manager.addTask([&]() { task(250); });
149 EXPECT_EQ(2, tasksComplete);
152 TEST(FiberManager, batonTimedWaitPostEvb) {
153 size_t tasksComplete = 0;
155 folly::EventBase evb;
157 FiberManager manager(folly::make_unique<EventBaseLoopController>());
158 dynamic_cast<EventBaseLoopController&>(manager.loopController())
159 .attachEventBase(evb);
161 evb.runInEventBaseThread([&]() {
162 manager.addTask([&]() {
165 evb.tryRunAfterDelay([&]() { baton.post(); }, 100);
167 auto start = EventBaseLoopController::Clock::now();
168 auto res = baton.timed_wait(std::chrono::milliseconds(130));
169 auto finish = EventBaseLoopController::Clock::now();
174 std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
176 EXPECT_TRUE(duration_ms.count() > 95 && duration_ms.count() < 110);
178 if (++tasksComplete == 1) {
179 evb.terminateLoopSoon();
186 EXPECT_EQ(1, tasksComplete);
189 TEST(FiberManager, batonTryWait) {
190 FiberManager manager(folly::make_unique<SimpleLoopController>());
192 // Check if try_wait and post work as expected
195 manager.addTask([&]() {
196 while (!b.try_wait()) {
199 auto thr = std::thread([&]() {
200 std::this_thread::sleep_for(std::chrono::milliseconds(300));
204 manager.loopUntilNoReady();
209 // Check try_wait without post
210 manager.addTask([&]() {
212 while (cnt && !c.try_wait()) {
215 EXPECT_TRUE(!c.try_wait()); // must still hold
219 manager.loopUntilNoReady();
222 TEST(FiberManager, genericBatonFiberWait) {
223 FiberManager manager(folly::make_unique<SimpleLoopController>());
226 bool fiberRunning = false;
228 manager.addTask([&]() {
229 EXPECT_EQ(manager.hasActiveFiber(), true);
232 fiberRunning = false;
235 EXPECT_FALSE(fiberRunning);
236 manager.loopUntilNoReady();
237 EXPECT_TRUE(fiberRunning); // ensure fiber still active
239 auto thr = std::thread([&]() {
240 std::this_thread::sleep_for(std::chrono::milliseconds(300));
244 while (fiberRunning) {
245 manager.loopUntilNoReady();
251 TEST(FiberManager, genericBatonThreadWait) {
252 FiberManager manager(folly::make_unique<SimpleLoopController>());
254 std::atomic<bool> threadWaiting(false);
256 auto thr = std::thread([&]() {
257 threadWaiting = true;
259 threadWaiting = false;
262 while (!threadWaiting) {
264 std::this_thread::sleep_for(std::chrono::milliseconds(300));
266 manager.addTask([&]() {
267 EXPECT_EQ(manager.hasActiveFiber(), true);
268 EXPECT_TRUE(threadWaiting);
270 while (threadWaiting) {
274 manager.loopUntilNoReady();
278 TEST(FiberManager, addTasksNoncopyable) {
279 std::vector<Promise<int>> pendingFibers;
280 bool taskAdded = false;
282 FiberManager manager(folly::make_unique<SimpleLoopController>());
283 auto& loopController =
284 dynamic_cast<SimpleLoopController&>(manager.loopController());
286 auto loopFunc = [&]() {
288 manager.addTask([&]() {
289 std::vector<std::function<std::unique_ptr<int>()>> funcs;
290 for (size_t i = 0; i < 3; ++i) {
291 funcs.push_back([i, &pendingFibers]() {
292 await([&pendingFibers](Promise<int> promise) {
293 pendingFibers.push_back(std::move(promise));
295 return folly::make_unique<int>(i * 2 + 1);
299 auto iter = addTasks(funcs.begin(), funcs.end());
302 while (iter.hasNext()) {
303 auto result = iter.awaitNext();
304 EXPECT_EQ(2 * iter.getTaskID() + 1, *result);
305 EXPECT_GE(2 - n, pendingFibers.size());
311 } else if (pendingFibers.size()) {
312 pendingFibers.back().setValue(0);
313 pendingFibers.pop_back();
315 loopController.stop();
319 loopController.loop(std::move(loopFunc));
322 TEST(FiberManager, awaitThrow) {
323 folly::EventBase evb;
324 struct ExpectedException {};
328 await([](Promise<int> p) {
330 throw ExpectedException();
336 await([&](Promise<int> p) {
337 evb.runInEventBaseThread([p = std::move(p)]() mutable {
340 throw ExpectedException();
347 TEST(FiberManager, addTasksThrow) {
348 std::vector<Promise<int>> pendingFibers;
349 bool taskAdded = false;
351 FiberManager manager(folly::make_unique<SimpleLoopController>());
352 auto& loopController =
353 dynamic_cast<SimpleLoopController&>(manager.loopController());
355 auto loopFunc = [&]() {
357 manager.addTask([&]() {
358 std::vector<std::function<int()>> funcs;
359 for (size_t i = 0; i < 3; ++i) {
360 funcs.push_back([i, &pendingFibers]() {
361 await([&pendingFibers](Promise<int> promise) {
362 pendingFibers.push_back(std::move(promise));
365 throw std::runtime_error("Runtime");
371 auto iter = addTasks(funcs.begin(), funcs.end());
374 while (iter.hasNext()) {
376 int result = iter.awaitNext();
377 EXPECT_EQ(1, iter.getTaskID() % 2);
378 EXPECT_EQ(2 * iter.getTaskID() + 1, result);
380 EXPECT_EQ(0, iter.getTaskID() % 2);
382 EXPECT_GE(2 - n, pendingFibers.size());
388 } else if (pendingFibers.size()) {
389 pendingFibers.back().setValue(0);
390 pendingFibers.pop_back();
392 loopController.stop();
396 loopController.loop(std::move(loopFunc));
399 TEST(FiberManager, addTasksVoid) {
400 std::vector<Promise<int>> pendingFibers;
401 bool taskAdded = false;
403 FiberManager manager(folly::make_unique<SimpleLoopController>());
404 auto& loopController =
405 dynamic_cast<SimpleLoopController&>(manager.loopController());
407 auto loopFunc = [&]() {
409 manager.addTask([&]() {
410 std::vector<std::function<void()>> funcs;
411 for (size_t i = 0; i < 3; ++i) {
412 funcs.push_back([i, &pendingFibers]() {
413 await([&pendingFibers](Promise<int> promise) {
414 pendingFibers.push_back(std::move(promise));
419 auto iter = addTasks(funcs.begin(), funcs.end());
422 while (iter.hasNext()) {
424 EXPECT_GE(2 - n, pendingFibers.size());
430 } else if (pendingFibers.size()) {
431 pendingFibers.back().setValue(0);
432 pendingFibers.pop_back();
434 loopController.stop();
438 loopController.loop(std::move(loopFunc));
441 TEST(FiberManager, addTasksVoidThrow) {
442 std::vector<Promise<int>> pendingFibers;
443 bool taskAdded = false;
445 FiberManager manager(folly::make_unique<SimpleLoopController>());
446 auto& loopController =
447 dynamic_cast<SimpleLoopController&>(manager.loopController());
449 auto loopFunc = [&]() {
451 manager.addTask([&]() {
452 std::vector<std::function<void()>> funcs;
453 for (size_t i = 0; i < 3; ++i) {
454 funcs.push_back([i, &pendingFibers]() {
455 await([&pendingFibers](Promise<int> promise) {
456 pendingFibers.push_back(std::move(promise));
459 throw std::runtime_error("");
464 auto iter = addTasks(funcs.begin(), funcs.end());
467 while (iter.hasNext()) {
470 EXPECT_EQ(1, iter.getTaskID() % 2);
472 EXPECT_EQ(0, iter.getTaskID() % 2);
474 EXPECT_GE(2 - n, pendingFibers.size());
480 } else if (pendingFibers.size()) {
481 pendingFibers.back().setValue(0);
482 pendingFibers.pop_back();
484 loopController.stop();
488 loopController.loop(std::move(loopFunc));
491 TEST(FiberManager, addTasksReserve) {
492 std::vector<Promise<int>> pendingFibers;
493 bool taskAdded = false;
495 FiberManager manager(folly::make_unique<SimpleLoopController>());
496 auto& loopController =
497 dynamic_cast<SimpleLoopController&>(manager.loopController());
499 auto loopFunc = [&]() {
501 manager.addTask([&]() {
502 std::vector<std::function<void()>> funcs;
503 for (size_t i = 0; i < 3; ++i) {
504 funcs.push_back([&pendingFibers]() {
505 await([&pendingFibers](Promise<int> promise) {
506 pendingFibers.push_back(std::move(promise));
511 auto iter = addTasks(funcs.begin(), funcs.end());
514 EXPECT_TRUE(iter.hasCompleted());
515 EXPECT_TRUE(iter.hasPending());
516 EXPECT_TRUE(iter.hasNext());
519 EXPECT_TRUE(iter.hasCompleted());
520 EXPECT_TRUE(iter.hasPending());
521 EXPECT_TRUE(iter.hasNext());
524 EXPECT_FALSE(iter.hasCompleted());
525 EXPECT_TRUE(iter.hasPending());
526 EXPECT_TRUE(iter.hasNext());
529 EXPECT_FALSE(iter.hasCompleted());
530 EXPECT_FALSE(iter.hasPending());
531 EXPECT_FALSE(iter.hasNext());
534 } else if (pendingFibers.size()) {
535 pendingFibers.back().setValue(0);
536 pendingFibers.pop_back();
538 loopController.stop();
542 loopController.loop(std::move(loopFunc));
545 TEST(FiberManager, addTaskDynamic) {
546 folly::EventBase evb;
550 auto makeTask = [&](size_t taskId) {
551 return [&, taskId]() -> size_t {
552 batons[taskId].wait();
558 .addTaskFuture([&]() {
559 TaskIterator<size_t> iterator;
561 iterator.addTask(makeTask(0));
562 iterator.addTask(makeTask(1));
566 EXPECT_EQ(1, iterator.awaitNext());
568 iterator.addTask(makeTask(2));
572 EXPECT_EQ(2, iterator.awaitNext());
576 EXPECT_EQ(0, iterator.awaitNext());
581 TEST(FiberManager, forEach) {
582 std::vector<Promise<int>> pendingFibers;
583 bool taskAdded = false;
585 FiberManager manager(folly::make_unique<SimpleLoopController>());
586 auto& loopController =
587 dynamic_cast<SimpleLoopController&>(manager.loopController());
589 auto loopFunc = [&]() {
591 manager.addTask([&]() {
592 std::vector<std::function<int()>> funcs;
593 for (size_t i = 0; i < 3; ++i) {
594 funcs.push_back([i, &pendingFibers]() {
595 await([&pendingFibers](Promise<int> promise) {
596 pendingFibers.push_back(std::move(promise));
602 std::vector<std::pair<size_t, int>> results;
603 forEach(funcs.begin(), funcs.end(), [&results](size_t id, int result) {
604 results.emplace_back(id, result);
606 EXPECT_EQ(3, results.size());
607 EXPECT_TRUE(pendingFibers.empty());
608 for (size_t i = 0; i < 3; ++i) {
609 EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
613 } else if (pendingFibers.size()) {
614 pendingFibers.back().setValue(0);
615 pendingFibers.pop_back();
617 loopController.stop();
621 loopController.loop(std::move(loopFunc));
624 TEST(FiberManager, collectN) {
625 std::vector<Promise<int>> pendingFibers;
626 bool taskAdded = false;
628 FiberManager manager(folly::make_unique<SimpleLoopController>());
629 auto& loopController =
630 dynamic_cast<SimpleLoopController&>(manager.loopController());
632 auto loopFunc = [&]() {
634 manager.addTask([&]() {
635 std::vector<std::function<int()>> funcs;
636 for (size_t i = 0; i < 3; ++i) {
637 funcs.push_back([i, &pendingFibers]() {
638 await([&pendingFibers](Promise<int> promise) {
639 pendingFibers.push_back(std::move(promise));
645 auto results = collectN(funcs.begin(), funcs.end(), 2);
646 EXPECT_EQ(2, results.size());
647 EXPECT_EQ(1, pendingFibers.size());
648 for (size_t i = 0; i < 2; ++i) {
649 EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
653 } else if (pendingFibers.size()) {
654 pendingFibers.back().setValue(0);
655 pendingFibers.pop_back();
657 loopController.stop();
661 loopController.loop(std::move(loopFunc));
664 TEST(FiberManager, collectNThrow) {
665 std::vector<Promise<int>> pendingFibers;
666 bool taskAdded = false;
668 FiberManager manager(folly::make_unique<SimpleLoopController>());
669 auto& loopController =
670 dynamic_cast<SimpleLoopController&>(manager.loopController());
672 auto loopFunc = [&]() {
674 manager.addTask([&]() {
675 std::vector<std::function<int()>> funcs;
676 for (size_t i = 0; i < 3; ++i) {
677 funcs.push_back([i, &pendingFibers]() {
678 await([&pendingFibers](Promise<int> promise) {
679 pendingFibers.push_back(std::move(promise));
681 throw std::runtime_error("Runtime");
687 collectN(funcs.begin(), funcs.end(), 2);
689 EXPECT_EQ(1, pendingFibers.size());
693 } else if (pendingFibers.size()) {
694 pendingFibers.back().setValue(0);
695 pendingFibers.pop_back();
697 loopController.stop();
701 loopController.loop(std::move(loopFunc));
704 TEST(FiberManager, collectNVoid) {
705 std::vector<Promise<int>> pendingFibers;
706 bool taskAdded = false;
708 FiberManager manager(folly::make_unique<SimpleLoopController>());
709 auto& loopController =
710 dynamic_cast<SimpleLoopController&>(manager.loopController());
712 auto loopFunc = [&]() {
714 manager.addTask([&]() {
715 std::vector<std::function<void()>> funcs;
716 for (size_t i = 0; i < 3; ++i) {
717 funcs.push_back([i, &pendingFibers]() {
718 await([&pendingFibers](Promise<int> promise) {
719 pendingFibers.push_back(std::move(promise));
724 auto results = collectN(funcs.begin(), funcs.end(), 2);
725 EXPECT_EQ(2, results.size());
726 EXPECT_EQ(1, pendingFibers.size());
729 } else if (pendingFibers.size()) {
730 pendingFibers.back().setValue(0);
731 pendingFibers.pop_back();
733 loopController.stop();
737 loopController.loop(std::move(loopFunc));
740 TEST(FiberManager, collectNVoidThrow) {
741 std::vector<Promise<int>> pendingFibers;
742 bool taskAdded = false;
744 FiberManager manager(folly::make_unique<SimpleLoopController>());
745 auto& loopController =
746 dynamic_cast<SimpleLoopController&>(manager.loopController());
748 auto loopFunc = [&]() {
750 manager.addTask([&]() {
751 std::vector<std::function<void()>> funcs;
752 for (size_t i = 0; i < 3; ++i) {
753 funcs.push_back([i, &pendingFibers]() {
754 await([&pendingFibers](Promise<int> promise) {
755 pendingFibers.push_back(std::move(promise));
757 throw std::runtime_error("Runtime");
762 collectN(funcs.begin(), funcs.end(), 2);
764 EXPECT_EQ(1, pendingFibers.size());
768 } else if (pendingFibers.size()) {
769 pendingFibers.back().setValue(0);
770 pendingFibers.pop_back();
772 loopController.stop();
776 loopController.loop(std::move(loopFunc));
779 TEST(FiberManager, collectAll) {
780 std::vector<Promise<int>> pendingFibers;
781 bool taskAdded = false;
783 FiberManager manager(folly::make_unique<SimpleLoopController>());
784 auto& loopController =
785 dynamic_cast<SimpleLoopController&>(manager.loopController());
787 auto loopFunc = [&]() {
789 manager.addTask([&]() {
790 std::vector<std::function<int()>> funcs;
791 for (size_t i = 0; i < 3; ++i) {
792 funcs.push_back([i, &pendingFibers]() {
793 await([&pendingFibers](Promise<int> promise) {
794 pendingFibers.push_back(std::move(promise));
800 auto results = collectAll(funcs.begin(), funcs.end());
801 EXPECT_TRUE(pendingFibers.empty());
802 for (size_t i = 0; i < 3; ++i) {
803 EXPECT_EQ(i * 2 + 1, results[i]);
807 } else if (pendingFibers.size()) {
808 pendingFibers.back().setValue(0);
809 pendingFibers.pop_back();
811 loopController.stop();
815 loopController.loop(std::move(loopFunc));
818 TEST(FiberManager, collectAllVoid) {
819 std::vector<Promise<int>> pendingFibers;
820 bool taskAdded = false;
822 FiberManager manager(folly::make_unique<SimpleLoopController>());
823 auto& loopController =
824 dynamic_cast<SimpleLoopController&>(manager.loopController());
826 auto loopFunc = [&]() {
828 manager.addTask([&]() {
829 std::vector<std::function<void()>> funcs;
830 for (size_t i = 0; i < 3; ++i) {
831 funcs.push_back([i, &pendingFibers]() {
832 await([&pendingFibers](Promise<int> promise) {
833 pendingFibers.push_back(std::move(promise));
838 collectAll(funcs.begin(), funcs.end());
839 EXPECT_TRUE(pendingFibers.empty());
842 } else if (pendingFibers.size()) {
843 pendingFibers.back().setValue(0);
844 pendingFibers.pop_back();
846 loopController.stop();
850 loopController.loop(std::move(loopFunc));
853 TEST(FiberManager, collectAny) {
854 std::vector<Promise<int>> pendingFibers;
855 bool taskAdded = false;
857 FiberManager manager(folly::make_unique<SimpleLoopController>());
858 auto& loopController =
859 dynamic_cast<SimpleLoopController&>(manager.loopController());
861 auto loopFunc = [&]() {
863 manager.addTask([&]() {
864 std::vector<std::function<int()>> funcs;
865 for (size_t i = 0; i < 3; ++i) {
866 funcs.push_back([i, &pendingFibers]() {
867 await([&pendingFibers](Promise<int> promise) {
868 pendingFibers.push_back(std::move(promise));
871 throw std::runtime_error("This exception will be ignored");
877 auto result = collectAny(funcs.begin(), funcs.end());
878 EXPECT_EQ(2, pendingFibers.size());
879 EXPECT_EQ(2, result.first);
880 EXPECT_EQ(2 * 2 + 1, result.second);
883 } else if (pendingFibers.size()) {
884 pendingFibers.back().setValue(0);
885 pendingFibers.pop_back();
887 loopController.stop();
891 loopController.loop(std::move(loopFunc));
895 /* Checks that this function was run from a main context,
896 by comparing an address on a stack to a known main stack address
897 and a known related fiber stack address. The assumption
898 is that fiber stack and main stack will be far enough apart,
899 while any two values on the same stack will be close. */
900 void expectMainContext(bool& ran, int* mainLocation, int* fiberLocation) {
902 /* 2 pages is a good guess */
903 constexpr ssize_t DISTANCE = 0x2000 / sizeof(int);
905 EXPECT_TRUE(std::abs(&here - fiberLocation) > DISTANCE);
908 EXPECT_TRUE(std::abs(&here - mainLocation) < DISTANCE);
916 TEST(FiberManager, runInMainContext) {
917 FiberManager manager(folly::make_unique<SimpleLoopController>());
918 auto& loopController =
919 dynamic_cast<SimpleLoopController&>(manager.loopController());
921 bool checkRan = false;
924 manager.runInMainContext(
925 [&]() { expectMainContext(checkRan, &mainLocation, nullptr); });
926 EXPECT_TRUE(checkRan);
930 manager.addTask([&]() {
932 explicit A(int value_) : value(value_) {}
933 A(const A&) = delete;
939 auto ret = runInMainContext([&]() {
940 expectMainContext(checkRan, &mainLocation, &stackLocation);
943 EXPECT_TRUE(checkRan);
944 EXPECT_EQ(42, ret.value);
947 loopController.loop([&]() { loopController.stop(); });
949 EXPECT_TRUE(checkRan);
952 TEST(FiberManager, addTaskFinally) {
953 FiberManager manager(folly::make_unique<SimpleLoopController>());
954 auto& loopController =
955 dynamic_cast<SimpleLoopController&>(manager.loopController());
957 bool checkRan = false;
961 manager.addTaskFinally(
962 [&]() { return 1234; },
963 [&](Try<int>&& result) {
964 EXPECT_EQ(result.value(), 1234);
966 expectMainContext(checkRan, &mainLocation, nullptr);
969 EXPECT_FALSE(checkRan);
971 loopController.loop([&]() { loopController.stop(); });
973 EXPECT_TRUE(checkRan);
976 TEST(FiberManager, fibersPoolWithinLimit) {
977 FiberManager::Options opts;
978 opts.maxFibersPoolSize = 5;
980 FiberManager manager(folly::make_unique<SimpleLoopController>(), opts);
981 auto& loopController =
982 dynamic_cast<SimpleLoopController&>(manager.loopController());
984 size_t fibersRun = 0;
986 for (size_t i = 0; i < 5; ++i) {
987 manager.addTask([&]() { ++fibersRun; });
989 loopController.loop([&]() { loopController.stop(); });
991 EXPECT_EQ(5, fibersRun);
992 EXPECT_EQ(5, manager.fibersAllocated());
993 EXPECT_EQ(5, manager.fibersPoolSize());
995 for (size_t i = 0; i < 5; ++i) {
996 manager.addTask([&]() { ++fibersRun; });
998 loopController.loop([&]() { loopController.stop(); });
1000 EXPECT_EQ(10, fibersRun);
1001 EXPECT_EQ(5, manager.fibersAllocated());
1002 EXPECT_EQ(5, manager.fibersPoolSize());
1005 TEST(FiberManager, fibersPoolOverLimit) {
1006 FiberManager::Options opts;
1007 opts.maxFibersPoolSize = 5;
1009 FiberManager manager(folly::make_unique<SimpleLoopController>(), opts);
1010 auto& loopController =
1011 dynamic_cast<SimpleLoopController&>(manager.loopController());
1013 size_t fibersRun = 0;
1015 for (size_t i = 0; i < 10; ++i) {
1016 manager.addTask([&]() { ++fibersRun; });
1019 EXPECT_EQ(0, fibersRun);
1020 EXPECT_EQ(10, manager.fibersAllocated());
1021 EXPECT_EQ(0, manager.fibersPoolSize());
1023 loopController.loop([&]() { loopController.stop(); });
1025 EXPECT_EQ(10, fibersRun);
1026 EXPECT_EQ(5, manager.fibersAllocated());
1027 EXPECT_EQ(5, manager.fibersPoolSize());
1030 TEST(FiberManager, remoteFiberBasic) {
1031 FiberManager manager(folly::make_unique<SimpleLoopController>());
1032 auto& loopController =
1033 dynamic_cast<SimpleLoopController&>(manager.loopController());
1036 result[0] = result[1] = 0;
1037 folly::Optional<Promise<int>> savedPromise[2];
1038 manager.addTask([&]() {
1040 [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1042 manager.addTask([&]() {
1044 [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1047 manager.loopUntilNoReady();
1049 EXPECT_TRUE(savedPromise[0].hasValue());
1050 EXPECT_TRUE(savedPromise[1].hasValue());
1051 EXPECT_EQ(0, result[0]);
1052 EXPECT_EQ(0, result[1]);
1054 std::thread remoteThread0{[&]() { savedPromise[0]->setValue(42); }};
1055 std::thread remoteThread1{[&]() { savedPromise[1]->setValue(43); }};
1056 remoteThread0.join();
1057 remoteThread1.join();
1058 EXPECT_EQ(0, result[0]);
1059 EXPECT_EQ(0, result[1]);
1060 /* Should only have scheduled once */
1061 EXPECT_EQ(1, loopController.remoteScheduleCalled());
1063 manager.loopUntilNoReady();
1064 EXPECT_EQ(42, result[0]);
1065 EXPECT_EQ(43, result[1]);
1068 TEST(FiberManager, addTaskRemoteBasic) {
1069 FiberManager manager(folly::make_unique<SimpleLoopController>());
1072 result[0] = result[1] = 0;
1073 folly::Optional<Promise<int>> savedPromise[2];
1075 std::thread remoteThread0{[&]() {
1076 manager.addTaskRemote([&]() {
1078 [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1081 std::thread remoteThread1{[&]() {
1082 manager.addTaskRemote([&]() {
1084 [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1087 remoteThread0.join();
1088 remoteThread1.join();
1090 manager.loopUntilNoReady();
1092 EXPECT_TRUE(savedPromise[0].hasValue());
1093 EXPECT_TRUE(savedPromise[1].hasValue());
1094 EXPECT_EQ(0, result[0]);
1095 EXPECT_EQ(0, result[1]);
1097 savedPromise[0]->setValue(42);
1098 savedPromise[1]->setValue(43);
1100 EXPECT_EQ(0, result[0]);
1101 EXPECT_EQ(0, result[1]);
1103 manager.loopUntilNoReady();
1104 EXPECT_EQ(42, result[0]);
1105 EXPECT_EQ(43, result[1]);
1108 TEST(FiberManager, remoteHasTasks) {
1110 FiberManager fm(folly::make_unique<SimpleLoopController>());
1111 std::thread remote([&]() { fm.addTaskRemote([&]() { ++counter; }); });
1115 while (fm.hasTasks()) {
1116 fm.loopUntilNoReady();
1119 EXPECT_FALSE(fm.hasTasks());
1120 EXPECT_EQ(counter, 1);
1123 TEST(FiberManager, remoteHasReadyTasks) {
1125 folly::Optional<Promise<int>> savedPromise;
1126 FiberManager fm(folly::make_unique<SimpleLoopController>());
1127 std::thread remote([&]() {
1128 fm.addTaskRemote([&]() {
1130 [&](Promise<int> promise) { savedPromise = std::move(promise); });
1131 EXPECT_TRUE(fm.hasTasks());
1136 EXPECT_TRUE(fm.hasTasks());
1138 fm.loopUntilNoReady();
1139 EXPECT_TRUE(fm.hasTasks());
1141 std::thread remote2([&]() { savedPromise->setValue(47); });
1143 EXPECT_TRUE(fm.hasTasks());
1145 fm.loopUntilNoReady();
1146 EXPECT_FALSE(fm.hasTasks());
1148 EXPECT_EQ(result, 47);
1151 template <typename Data>
1152 void testFiberLocal() {
1154 LocalType<Data>(), folly::make_unique<SimpleLoopController>());
1157 EXPECT_EQ(42, local<Data>().value);
1159 local<Data>().value = 43;
1162 EXPECT_EQ(43, local<Data>().value);
1164 local<Data>().value = 44;
1166 addTask([]() { EXPECT_EQ(44, local<Data>().value); });
1171 EXPECT_EQ(42, local<Data>().value);
1173 local<Data>().value = 43;
1175 fm.addTaskRemote([]() { EXPECT_EQ(43, local<Data>().value); });
1179 EXPECT_EQ(42, local<Data>().value);
1180 local<Data>().value = 43;
1183 EXPECT_EQ(43, local<Data>().value);
1184 local<Data>().value = 44;
1186 std::vector<std::function<void()>> tasks{task};
1187 collectAny(tasks.begin(), tasks.end());
1189 EXPECT_EQ(43, local<Data>().value);
1192 fm.loopUntilNoReady();
1193 EXPECT_FALSE(fm.hasTasks());
1196 TEST(FiberManager, fiberLocal) {
1201 testFiberLocal<SimpleData>();
1204 TEST(FiberManager, fiberLocalHeap) {
1206 char _[1024 * 1024];
1210 testFiberLocal<LargeData>();
1213 TEST(FiberManager, fiberLocalDestructor) {
1220 EXPECT_EQ(42, local<CrazyData>().data);
1221 // Make sure we don't have infinite loop
1222 local<CrazyData>().data = 0;
1229 LocalType<CrazyData>(), folly::make_unique<SimpleLoopController>());
1231 fm.addTask([]() { local<CrazyData>().data = 41; });
1233 fm.loopUntilNoReady();
1234 EXPECT_FALSE(fm.hasTasks());
1237 TEST(FiberManager, yieldTest) {
1238 FiberManager manager(folly::make_unique<SimpleLoopController>());
1239 auto& loopController =
1240 dynamic_cast<SimpleLoopController&>(manager.loopController());
1242 bool checkRan = false;
1244 manager.addTask([&]() {
1249 loopController.loop([&]() {
1251 loopController.stop();
1255 EXPECT_TRUE(checkRan);
1258 TEST(FiberManager, RequestContext) {
1259 FiberManager fm(folly::make_unique<SimpleLoopController>());
1261 bool checkRun1 = false;
1262 bool checkRun2 = false;
1263 bool checkRun3 = false;
1264 bool checkRun4 = false;
1265 folly::fibers::Baton baton1;
1266 folly::fibers::Baton baton2;
1267 folly::fibers::Baton baton3;
1268 folly::fibers::Baton baton4;
1271 folly::RequestContextScopeGuard rctx;
1272 auto rcontext1 = folly::RequestContext::get();
1273 fm.addTask([&, rcontext1]() {
1274 EXPECT_EQ(rcontext1, folly::RequestContext::get());
1276 [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
1277 EXPECT_EQ(rcontext1, folly::RequestContext::get());
1279 [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
1284 folly::RequestContextScopeGuard rctx;
1285 auto rcontext2 = folly::RequestContext::get();
1286 fm.addTaskRemote([&, rcontext2]() {
1287 EXPECT_EQ(rcontext2, folly::RequestContext::get());
1289 EXPECT_EQ(rcontext2, folly::RequestContext::get());
1294 folly::RequestContextScopeGuard rctx;
1295 auto rcontext3 = folly::RequestContext::get();
1298 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1300 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1302 return folly::Unit();
1304 [&, rcontext3](Try<folly::Unit>&& /* t */) {
1305 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1310 folly::RequestContext::setContext(nullptr);
1312 folly::RequestContextScopeGuard rctx;
1313 auto rcontext4 = folly::RequestContext::get();
1315 EXPECT_EQ(rcontext4, folly::RequestContext::get());
1320 folly::RequestContextScopeGuard rctx;
1321 auto rcontext = folly::RequestContext::get();
1323 fm.loopUntilNoReady();
1324 EXPECT_EQ(rcontext, folly::RequestContext::get());
1327 EXPECT_EQ(rcontext, folly::RequestContext::get());
1328 fm.loopUntilNoReady();
1329 EXPECT_TRUE(checkRun1);
1330 EXPECT_EQ(rcontext, folly::RequestContext::get());
1333 EXPECT_EQ(rcontext, folly::RequestContext::get());
1334 fm.loopUntilNoReady();
1335 EXPECT_TRUE(checkRun2);
1336 EXPECT_EQ(rcontext, folly::RequestContext::get());
1339 EXPECT_EQ(rcontext, folly::RequestContext::get());
1340 fm.loopUntilNoReady();
1341 EXPECT_TRUE(checkRun3);
1342 EXPECT_EQ(rcontext, folly::RequestContext::get());
1345 EXPECT_EQ(rcontext, folly::RequestContext::get());
1346 fm.loopUntilNoReady();
1347 EXPECT_TRUE(checkRun4);
1348 EXPECT_EQ(rcontext, folly::RequestContext::get());
1352 TEST(FiberManager, resizePeriodically) {
1353 FiberManager::Options opts;
1354 opts.fibersPoolResizePeriodMs = 300;
1355 opts.maxFibersPoolSize = 5;
1357 FiberManager manager(folly::make_unique<EventBaseLoopController>(), opts);
1359 folly::EventBase evb;
1360 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1361 .attachEventBase(evb);
1363 std::vector<Baton> batons(10);
1365 size_t tasksRun = 0;
1366 for (size_t i = 0; i < 30; ++i) {
1367 manager.addTask([i, &batons, &tasksRun]() {
1369 // Keep some fibers active indefinitely
1370 if (i < batons.size()) {
1376 EXPECT_EQ(0, tasksRun);
1377 EXPECT_EQ(30, manager.fibersAllocated());
1378 EXPECT_EQ(0, manager.fibersPoolSize());
1381 EXPECT_EQ(30, tasksRun);
1382 EXPECT_EQ(30, manager.fibersAllocated());
1383 // Can go over maxFibersPoolSize, 10 of 30 fibers still active
1384 EXPECT_EQ(20, manager.fibersPoolSize());
1386 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1387 evb.loopOnce(); // no fibers active in this period
1388 EXPECT_EQ(30, manager.fibersAllocated());
1389 EXPECT_EQ(20, manager.fibersPoolSize());
1391 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1392 evb.loopOnce(); // should shrink fibers pool to maxFibersPoolSize
1393 EXPECT_EQ(15, manager.fibersAllocated());
1394 EXPECT_EQ(5, manager.fibersPoolSize());
1396 for (size_t i = 0; i < batons.size(); ++i) {
1400 EXPECT_EQ(15, manager.fibersAllocated());
1401 EXPECT_EQ(15, manager.fibersPoolSize());
1403 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1404 evb.loopOnce(); // 10 fibers active in last period
1405 EXPECT_EQ(10, manager.fibersAllocated());
1406 EXPECT_EQ(10, manager.fibersPoolSize());
1408 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1410 EXPECT_EQ(5, manager.fibersAllocated());
1411 EXPECT_EQ(5, manager.fibersPoolSize());
1414 TEST(FiberManager, batonWaitTimeoutHandler) {
1415 FiberManager manager(folly::make_unique<EventBaseLoopController>());
1417 folly::EventBase evb;
1418 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1419 .attachEventBase(evb);
1421 size_t fibersRun = 0;
1423 Baton::TimeoutHandler timeoutHandler;
1425 manager.addTask([&]() {
1426 baton.wait(timeoutHandler);
1429 manager.loopUntilNoReady();
1431 EXPECT_FALSE(baton.try_wait());
1432 EXPECT_EQ(0, fibersRun);
1434 timeoutHandler.scheduleTimeout(std::chrono::milliseconds(250));
1435 std::this_thread::sleep_for(std::chrono::milliseconds(500));
1437 EXPECT_FALSE(baton.try_wait());
1438 EXPECT_EQ(0, fibersRun);
1441 manager.loopUntilNoReady();
1443 EXPECT_EQ(1, fibersRun);
1446 TEST(FiberManager, batonWaitTimeoutMany) {
1447 FiberManager manager(folly::make_unique<EventBaseLoopController>());
1449 folly::EventBase evb;
1450 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1451 .attachEventBase(evb);
1453 constexpr size_t kNumTimeoutTasks = 10000;
1454 size_t tasksCount = kNumTimeoutTasks;
1456 // We add many tasks to hit timeout queue deallocation logic.
1457 for (size_t i = 0; i < kNumTimeoutTasks; ++i) {
1458 manager.addTask([&]() {
1460 Baton::TimeoutHandler timeoutHandler;
1462 folly::fibers::addTask([&] {
1463 timeoutHandler.scheduleTimeout(std::chrono::milliseconds(1000));
1466 baton.wait(timeoutHandler);
1467 if (--tasksCount == 0) {
1468 evb.terminateLoopSoon();
1476 TEST(FiberManager, remoteFutureTest) {
1477 FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
1478 auto& loopController =
1479 dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
1483 auto f1 = fiberManager.addTaskFuture([&]() { return testValue1; });
1484 auto f2 = fiberManager.addTaskRemoteFuture([&]() { return testValue2; });
1485 loopController.loop([&]() { loopController.stop(); });
1489 EXPECT_EQ(v1, testValue1);
1490 EXPECT_EQ(v2, testValue2);
1493 // Test that a void function produes a Future<Unit>.
1494 TEST(FiberManager, remoteFutureVoidUnitTest) {
1495 FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
1496 auto& loopController =
1497 dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
1499 bool ranLocal = false;
1500 folly::Future<folly::Unit> futureLocal =
1501 fiberManager.addTaskFuture([&]() { ranLocal = true; });
1503 bool ranRemote = false;
1504 folly::Future<folly::Unit> futureRemote =
1505 fiberManager.addTaskRemoteFuture([&]() { ranRemote = true; });
1507 loopController.loop([&]() { loopController.stop(); });
1510 ASSERT_TRUE(ranLocal);
1512 futureRemote.wait();
1513 ASSERT_TRUE(ranRemote);
1516 TEST(FiberManager, nestedFiberManagers) {
1517 folly::EventBase outerEvb;
1518 folly::EventBase innerEvb;
1520 getFiberManager(outerEvb).addTask([&]() {
1522 &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
1524 runInMainContext([&]() {
1525 getFiberManager(innerEvb).addTask([&]() {
1527 &getFiberManager(innerEvb), FiberManager::getFiberManagerUnsafe());
1529 innerEvb.terminateLoopSoon();
1532 innerEvb.loopForever();
1536 &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
1538 outerEvb.terminateLoopSoon();
1541 outerEvb.loopForever();
1544 static size_t sNumAwaits;
1546 void runBenchmark(size_t numAwaits, size_t toSend) {
1547 sNumAwaits = numAwaits;
1549 FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
1550 auto& loopController =
1551 dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
1553 std::queue<Promise<int>> pendingRequests;
1554 static const size_t maxOutstanding = 5;
1556 auto loop = [&fiberManager, &loopController, &pendingRequests, &toSend]() {
1557 if (pendingRequests.size() == maxOutstanding || toSend == 0) {
1558 if (pendingRequests.empty()) {
1561 pendingRequests.front().setValue(0);
1562 pendingRequests.pop();
1564 fiberManager.addTask([&pendingRequests]() {
1565 for (size_t i = 0; i < sNumAwaits; ++i) {
1566 auto result = await([&pendingRequests](Promise<int> promise) {
1567 pendingRequests.push(std::move(promise));
1569 DCHECK_EQ(result, 0);
1573 if (--toSend == 0) {
1574 loopController.stop();
1579 loopController.loop(std::move(loop));
1582 BENCHMARK(FiberManagerBasicOneAwait, iters) {
1583 runBenchmark(1, iters);
1586 BENCHMARK(FiberManagerBasicFiveAwaits, iters) {
1587 runBenchmark(5, iters);
1590 BENCHMARK(FiberManagerCreateDestroy, iters) {
1591 for (size_t i = 0; i < iters; ++i) {
1592 folly::EventBase evb;
1593 auto& fm = folly::fibers::getFiberManager(evb);
1594 fm.addTask([]() {});
1599 BENCHMARK(FiberManagerAllocateDeallocatePattern, iters) {
1600 static const size_t kNumAllocations = 10000;
1602 FiberManager::Options opts;
1603 opts.maxFibersPoolSize = 0;
1605 FiberManager fiberManager(folly::make_unique<SimpleLoopController>(), opts);
1607 for (size_t iter = 0; iter < iters; ++iter) {
1608 EXPECT_EQ(0, fiberManager.fibersPoolSize());
1610 size_t fibersRun = 0;
1612 for (size_t i = 0; i < kNumAllocations; ++i) {
1613 fiberManager.addTask([&fibersRun] { ++fibersRun; });
1614 fiberManager.loopUntilNoReady();
1617 EXPECT_EQ(10000, fibersRun);
1618 EXPECT_EQ(0, fiberManager.fibersPoolSize());
1622 BENCHMARK(FiberManagerAllocateLargeChunk, iters) {
1623 static const size_t kNumAllocations = 10000;
1625 FiberManager::Options opts;
1626 opts.maxFibersPoolSize = 0;
1628 FiberManager fiberManager(folly::make_unique<SimpleLoopController>(), opts);
1630 for (size_t iter = 0; iter < iters; ++iter) {
1631 EXPECT_EQ(0, fiberManager.fibersPoolSize());
1633 size_t fibersRun = 0;
1635 for (size_t i = 0; i < kNumAllocations; ++i) {
1636 fiberManager.addTask([&fibersRun] { ++fibersRun; });
1639 fiberManager.loopUntilNoReady();
1641 EXPECT_EQ(10000, fibersRun);
1642 EXPECT_EQ(0, fiberManager.fibersPoolSize());