2 * Copyright 2014-present Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 #include <folly/Memory.h>
21 #include <folly/Random.h>
22 #include <folly/futures/Future.h>
24 #include <folly/Conv.h>
25 #include <folly/fibers/AddTasks.h>
26 #include <folly/fibers/AtomicBatchDispatcher.h>
27 #include <folly/fibers/BatchDispatcher.h>
28 #include <folly/fibers/EventBaseLoopController.h>
29 #include <folly/fibers/FiberManager.h>
30 #include <folly/fibers/FiberManagerMap.h>
31 #include <folly/fibers/GenericBaton.h>
32 #include <folly/fibers/Semaphore.h>
33 #include <folly/fibers/SimpleLoopController.h>
34 #include <folly/fibers/TimedMutex.h>
35 #include <folly/fibers/WhenN.h>
36 #include <folly/io/async/ScopedEventBaseThread.h>
37 #include <folly/portability/GTest.h>
39 using namespace folly::fibers;
43 TEST(FiberManager, batonTimedWaitTimeout) {
44 bool taskAdded = false;
45 size_t iterations = 0;
47 FiberManager manager(std::make_unique<SimpleLoopController>());
48 auto& loopController =
49 dynamic_cast<SimpleLoopController&>(manager.loopController());
51 auto loopFunc = [&]() {
53 manager.addTask([&]() {
56 auto res = baton.try_wait_for(std::chrono::milliseconds(230));
59 EXPECT_EQ(5, iterations);
61 loopController.stop();
63 manager.addTask([&]() {
66 auto res = baton.try_wait_for(std::chrono::milliseconds(130));
69 EXPECT_EQ(3, iterations);
71 loopController.stop();
75 std::this_thread::sleep_for(std::chrono::milliseconds(50));
80 loopController.loop(std::move(loopFunc));
83 TEST(FiberManager, batonTimedWaitPost) {
84 bool taskAdded = false;
85 size_t iterations = 0;
88 FiberManager manager(std::make_unique<SimpleLoopController>());
89 auto& loopController =
90 dynamic_cast<SimpleLoopController&>(manager.loopController());
92 auto loopFunc = [&]() {
94 manager.addTask([&]() {
98 auto res = baton.try_wait_for(std::chrono::milliseconds(130));
101 EXPECT_EQ(2, iterations);
103 loopController.stop();
107 std::this_thread::sleep_for(std::chrono::milliseconds(50));
109 if (iterations == 2) {
115 loopController.loop(std::move(loopFunc));
118 TEST(FiberManager, batonTimedWaitTimeoutEvb) {
119 size_t tasksComplete = 0;
121 folly::EventBase evb;
123 FiberManager manager(std::make_unique<EventBaseLoopController>());
124 dynamic_cast<EventBaseLoopController&>(manager.loopController())
125 .attachEventBase(evb);
127 auto task = [&](size_t timeout_ms) {
130 auto start = EventBaseLoopController::Clock::now();
131 auto res = baton.try_wait_for(std::chrono::milliseconds(timeout_ms));
132 auto finish = EventBaseLoopController::Clock::now();
137 std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
139 EXPECT_GT(duration_ms.count(), timeout_ms - 50);
140 EXPECT_LT(duration_ms.count(), timeout_ms + 50);
142 if (++tasksComplete == 2) {
143 evb.terminateLoopSoon();
147 evb.runInEventBaseThread([&]() {
148 manager.addTask([&]() { task(500); });
149 manager.addTask([&]() { task(250); });
154 EXPECT_EQ(2, tasksComplete);
157 TEST(FiberManager, batonTimedWaitPostEvb) {
158 size_t tasksComplete = 0;
160 folly::EventBase evb;
162 FiberManager manager(std::make_unique<EventBaseLoopController>());
163 dynamic_cast<EventBaseLoopController&>(manager.loopController())
164 .attachEventBase(evb);
166 evb.runInEventBaseThread([&]() {
167 manager.addTask([&]() {
170 evb.tryRunAfterDelay([&]() { baton.post(); }, 100);
172 auto start = EventBaseLoopController::Clock::now();
173 auto res = baton.try_wait_for(std::chrono::milliseconds(130));
174 auto finish = EventBaseLoopController::Clock::now();
179 std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
181 EXPECT_TRUE(duration_ms.count() > 95 && duration_ms.count() < 110);
183 if (++tasksComplete == 1) {
184 evb.terminateLoopSoon();
191 EXPECT_EQ(1, tasksComplete);
194 TEST(FiberManager, batonTryWait) {
195 FiberManager manager(std::make_unique<SimpleLoopController>());
197 // Check if try_wait and post work as expected
200 manager.addTask([&]() {
201 while (!b.try_wait()) {
204 auto thr = std::thread([&]() {
205 std::this_thread::sleep_for(std::chrono::milliseconds(300));
209 manager.loopUntilNoReady();
214 // Check try_wait without post
215 manager.addTask([&]() {
217 while (cnt && !c.try_wait()) {
220 EXPECT_TRUE(!c.try_wait()); // must still hold
224 manager.loopUntilNoReady();
227 TEST(FiberManager, genericBatonFiberWait) {
228 FiberManager manager(std::make_unique<SimpleLoopController>());
231 bool fiberRunning = false;
233 manager.addTask([&]() {
234 EXPECT_EQ(manager.hasActiveFiber(), true);
237 fiberRunning = false;
240 EXPECT_FALSE(fiberRunning);
241 manager.loopUntilNoReady();
242 EXPECT_TRUE(fiberRunning); // ensure fiber still active
244 auto thr = std::thread([&]() {
245 std::this_thread::sleep_for(std::chrono::milliseconds(300));
249 while (fiberRunning) {
250 manager.loopUntilNoReady();
256 TEST(FiberManager, genericBatonThreadWait) {
257 FiberManager manager(std::make_unique<SimpleLoopController>());
259 std::atomic<bool> threadWaiting(false);
261 auto thr = std::thread([&]() {
262 threadWaiting = true;
264 threadWaiting = false;
267 while (!threadWaiting) {
269 std::this_thread::sleep_for(std::chrono::milliseconds(300));
271 manager.addTask([&]() {
272 EXPECT_EQ(manager.hasActiveFiber(), true);
273 EXPECT_TRUE(threadWaiting);
275 while (threadWaiting) {
279 manager.loopUntilNoReady();
283 TEST(FiberManager, addTasksNoncopyable) {
284 std::vector<Promise<int>> pendingFibers;
285 bool taskAdded = false;
287 FiberManager manager(std::make_unique<SimpleLoopController>());
288 auto& loopController =
289 dynamic_cast<SimpleLoopController&>(manager.loopController());
291 auto loopFunc = [&]() {
293 manager.addTask([&]() {
294 std::vector<std::function<std::unique_ptr<int>()>> funcs;
295 for (int i = 0; i < 3; ++i) {
296 funcs.push_back([i, &pendingFibers]() {
297 await([&pendingFibers](Promise<int> promise) {
298 pendingFibers.push_back(std::move(promise));
300 return std::make_unique<int>(i * 2 + 1);
304 auto iter = addTasks(funcs.begin(), funcs.end());
307 while (iter.hasNext()) {
308 auto result = iter.awaitNext();
309 EXPECT_EQ(2 * iter.getTaskID() + 1, *result);
310 EXPECT_GE(2 - n, pendingFibers.size());
316 } else if (pendingFibers.size()) {
317 pendingFibers.back().setValue(0);
318 pendingFibers.pop_back();
320 loopController.stop();
324 loopController.loop(std::move(loopFunc));
327 TEST(FiberManager, awaitThrow) {
328 folly::EventBase evb;
329 struct ExpectedException {};
333 await([](Promise<int> p) {
335 throw ExpectedException();
340 await([&](Promise<int> p) {
341 evb.runInEventBaseThread([p = std::move(p)]() mutable {
344 throw ExpectedException();
351 TEST(FiberManager, addTasksThrow) {
352 std::vector<Promise<int>> pendingFibers;
353 bool taskAdded = false;
355 FiberManager manager(std::make_unique<SimpleLoopController>());
356 auto& loopController =
357 dynamic_cast<SimpleLoopController&>(manager.loopController());
359 auto loopFunc = [&]() {
361 manager.addTask([&]() {
362 std::vector<std::function<int()>> funcs;
363 for (size_t i = 0; i < 3; ++i) {
364 funcs.push_back([i, &pendingFibers]() {
365 await([&pendingFibers](Promise<int> promise) {
366 pendingFibers.push_back(std::move(promise));
369 throw std::runtime_error("Runtime");
375 auto iter = addTasks(funcs.begin(), funcs.end());
378 while (iter.hasNext()) {
380 int result = iter.awaitNext();
381 EXPECT_EQ(1, iter.getTaskID() % 2);
382 EXPECT_EQ(2 * iter.getTaskID() + 1, result);
384 EXPECT_EQ(0, iter.getTaskID() % 2);
386 EXPECT_GE(2 - n, pendingFibers.size());
392 } else if (pendingFibers.size()) {
393 pendingFibers.back().setValue(0);
394 pendingFibers.pop_back();
396 loopController.stop();
400 loopController.loop(std::move(loopFunc));
403 TEST(FiberManager, addTasksVoid) {
404 std::vector<Promise<int>> pendingFibers;
405 bool taskAdded = false;
407 FiberManager manager(std::make_unique<SimpleLoopController>());
408 auto& loopController =
409 dynamic_cast<SimpleLoopController&>(manager.loopController());
411 auto loopFunc = [&]() {
413 manager.addTask([&]() {
414 std::vector<std::function<void()>> funcs;
415 for (size_t i = 0; i < 3; ++i) {
416 funcs.push_back([&pendingFibers]() {
417 await([&pendingFibers](Promise<int> promise) {
418 pendingFibers.push_back(std::move(promise));
423 auto iter = addTasks(funcs.begin(), funcs.end());
426 while (iter.hasNext()) {
428 EXPECT_GE(2 - n, pendingFibers.size());
434 } else if (pendingFibers.size()) {
435 pendingFibers.back().setValue(0);
436 pendingFibers.pop_back();
438 loopController.stop();
442 loopController.loop(std::move(loopFunc));
445 TEST(FiberManager, addTasksVoidThrow) {
446 std::vector<Promise<int>> pendingFibers;
447 bool taskAdded = false;
449 FiberManager manager(std::make_unique<SimpleLoopController>());
450 auto& loopController =
451 dynamic_cast<SimpleLoopController&>(manager.loopController());
453 auto loopFunc = [&]() {
455 manager.addTask([&]() {
456 std::vector<std::function<void()>> funcs;
457 for (size_t i = 0; i < 3; ++i) {
458 funcs.push_back([i, &pendingFibers]() {
459 await([&pendingFibers](Promise<int> promise) {
460 pendingFibers.push_back(std::move(promise));
463 throw std::runtime_error("");
468 auto iter = addTasks(funcs.begin(), funcs.end());
471 while (iter.hasNext()) {
474 EXPECT_EQ(1, iter.getTaskID() % 2);
476 EXPECT_EQ(0, iter.getTaskID() % 2);
478 EXPECT_GE(2 - n, pendingFibers.size());
484 } else if (pendingFibers.size()) {
485 pendingFibers.back().setValue(0);
486 pendingFibers.pop_back();
488 loopController.stop();
492 loopController.loop(std::move(loopFunc));
495 TEST(FiberManager, addTasksReserve) {
496 std::vector<Promise<int>> pendingFibers;
497 bool taskAdded = false;
499 FiberManager manager(std::make_unique<SimpleLoopController>());
500 auto& loopController =
501 dynamic_cast<SimpleLoopController&>(manager.loopController());
503 auto loopFunc = [&]() {
505 manager.addTask([&]() {
506 std::vector<std::function<void()>> funcs;
507 for (size_t i = 0; i < 3; ++i) {
508 funcs.push_back([&pendingFibers]() {
509 await([&pendingFibers](Promise<int> promise) {
510 pendingFibers.push_back(std::move(promise));
515 auto iter = addTasks(funcs.begin(), funcs.end());
518 EXPECT_TRUE(iter.hasCompleted());
519 EXPECT_TRUE(iter.hasPending());
520 EXPECT_TRUE(iter.hasNext());
523 EXPECT_TRUE(iter.hasCompleted());
524 EXPECT_TRUE(iter.hasPending());
525 EXPECT_TRUE(iter.hasNext());
528 EXPECT_FALSE(iter.hasCompleted());
529 EXPECT_TRUE(iter.hasPending());
530 EXPECT_TRUE(iter.hasNext());
533 EXPECT_FALSE(iter.hasCompleted());
534 EXPECT_FALSE(iter.hasPending());
535 EXPECT_FALSE(iter.hasNext());
538 } else if (pendingFibers.size()) {
539 pendingFibers.back().setValue(0);
540 pendingFibers.pop_back();
542 loopController.stop();
546 loopController.loop(std::move(loopFunc));
549 TEST(FiberManager, addTaskDynamic) {
550 folly::EventBase evb;
554 auto makeTask = [&](size_t taskId) {
555 return [&, taskId]() -> size_t {
556 batons[taskId].wait();
562 .addTaskFuture([&]() {
563 TaskIterator<size_t> iterator;
565 iterator.addTask(makeTask(0));
566 iterator.addTask(makeTask(1));
570 EXPECT_EQ(1, iterator.awaitNext());
572 iterator.addTask(makeTask(2));
576 EXPECT_EQ(2, iterator.awaitNext());
580 EXPECT_EQ(0, iterator.awaitNext());
585 TEST(FiberManager, forEach) {
586 std::vector<Promise<int>> pendingFibers;
587 bool taskAdded = false;
589 FiberManager manager(std::make_unique<SimpleLoopController>());
590 auto& loopController =
591 dynamic_cast<SimpleLoopController&>(manager.loopController());
593 auto loopFunc = [&]() {
595 manager.addTask([&]() {
596 std::vector<std::function<int()>> funcs;
597 for (size_t i = 0; i < 3; ++i) {
598 funcs.push_back([i, &pendingFibers]() {
599 await([&pendingFibers](Promise<int> promise) {
600 pendingFibers.push_back(std::move(promise));
606 std::vector<std::pair<size_t, int>> results;
607 forEach(funcs.begin(), funcs.end(), [&results](size_t id, int result) {
608 results.emplace_back(id, result);
610 EXPECT_EQ(3, results.size());
611 EXPECT_TRUE(pendingFibers.empty());
612 for (size_t i = 0; i < 3; ++i) {
613 EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
617 } else if (pendingFibers.size()) {
618 pendingFibers.back().setValue(0);
619 pendingFibers.pop_back();
621 loopController.stop();
625 loopController.loop(std::move(loopFunc));
628 TEST(FiberManager, collectN) {
629 std::vector<Promise<int>> pendingFibers;
630 bool taskAdded = false;
632 FiberManager manager(std::make_unique<SimpleLoopController>());
633 auto& loopController =
634 dynamic_cast<SimpleLoopController&>(manager.loopController());
636 auto loopFunc = [&]() {
638 manager.addTask([&]() {
639 std::vector<std::function<int()>> funcs;
640 for (size_t i = 0; i < 3; ++i) {
641 funcs.push_back([i, &pendingFibers]() {
642 await([&pendingFibers](Promise<int> promise) {
643 pendingFibers.push_back(std::move(promise));
649 auto results = collectN(funcs.begin(), funcs.end(), 2);
650 EXPECT_EQ(2, results.size());
651 EXPECT_EQ(1, pendingFibers.size());
652 for (size_t i = 0; i < 2; ++i) {
653 EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
657 } else if (pendingFibers.size()) {
658 pendingFibers.back().setValue(0);
659 pendingFibers.pop_back();
661 loopController.stop();
665 loopController.loop(std::move(loopFunc));
668 TEST(FiberManager, collectNThrow) {
669 std::vector<Promise<int>> pendingFibers;
670 bool taskAdded = false;
672 FiberManager manager(std::make_unique<SimpleLoopController>());
673 auto& loopController =
674 dynamic_cast<SimpleLoopController&>(manager.loopController());
676 auto loopFunc = [&]() {
678 manager.addTask([&]() {
679 std::vector<std::function<int()>> funcs;
680 for (size_t i = 0; i < 3; ++i) {
681 funcs.push_back([&pendingFibers]() -> size_t {
682 await([&pendingFibers](Promise<int> promise) {
683 pendingFibers.push_back(std::move(promise));
685 throw std::runtime_error("Runtime");
690 collectN(funcs.begin(), funcs.end(), 2);
692 EXPECT_EQ(1, pendingFibers.size());
696 } else if (pendingFibers.size()) {
697 pendingFibers.back().setValue(0);
698 pendingFibers.pop_back();
700 loopController.stop();
704 loopController.loop(std::move(loopFunc));
707 TEST(FiberManager, collectNVoid) {
708 std::vector<Promise<int>> pendingFibers;
709 bool taskAdded = false;
711 FiberManager manager(std::make_unique<SimpleLoopController>());
712 auto& loopController =
713 dynamic_cast<SimpleLoopController&>(manager.loopController());
715 auto loopFunc = [&]() {
717 manager.addTask([&]() {
718 std::vector<std::function<void()>> funcs;
719 for (size_t i = 0; i < 3; ++i) {
720 funcs.push_back([&pendingFibers]() {
721 await([&pendingFibers](Promise<int> promise) {
722 pendingFibers.push_back(std::move(promise));
727 auto results = collectN(funcs.begin(), funcs.end(), 2);
728 EXPECT_EQ(2, results.size());
729 EXPECT_EQ(1, pendingFibers.size());
732 } else if (pendingFibers.size()) {
733 pendingFibers.back().setValue(0);
734 pendingFibers.pop_back();
736 loopController.stop();
740 loopController.loop(std::move(loopFunc));
743 TEST(FiberManager, collectNVoidThrow) {
744 std::vector<Promise<int>> pendingFibers;
745 bool taskAdded = false;
747 FiberManager manager(std::make_unique<SimpleLoopController>());
748 auto& loopController =
749 dynamic_cast<SimpleLoopController&>(manager.loopController());
751 auto loopFunc = [&]() {
753 manager.addTask([&]() {
754 std::vector<std::function<void()>> funcs;
755 for (size_t i = 0; i < 3; ++i) {
756 funcs.push_back([&pendingFibers]() {
757 await([&pendingFibers](Promise<int> promise) {
758 pendingFibers.push_back(std::move(promise));
760 throw std::runtime_error("Runtime");
765 collectN(funcs.begin(), funcs.end(), 2);
767 EXPECT_EQ(1, pendingFibers.size());
771 } else if (pendingFibers.size()) {
772 pendingFibers.back().setValue(0);
773 pendingFibers.pop_back();
775 loopController.stop();
779 loopController.loop(std::move(loopFunc));
782 TEST(FiberManager, collectAll) {
783 std::vector<Promise<int>> pendingFibers;
784 bool taskAdded = false;
786 FiberManager manager(std::make_unique<SimpleLoopController>());
787 auto& loopController =
788 dynamic_cast<SimpleLoopController&>(manager.loopController());
790 auto loopFunc = [&]() {
792 manager.addTask([&]() {
793 std::vector<std::function<int()>> funcs;
794 for (size_t i = 0; i < 3; ++i) {
795 funcs.push_back([i, &pendingFibers]() {
796 await([&pendingFibers](Promise<int> promise) {
797 pendingFibers.push_back(std::move(promise));
803 auto results = collectAll(funcs.begin(), funcs.end());
804 EXPECT_TRUE(pendingFibers.empty());
805 for (size_t i = 0; i < 3; ++i) {
806 EXPECT_EQ(i * 2 + 1, results[i]);
810 } else if (pendingFibers.size()) {
811 pendingFibers.back().setValue(0);
812 pendingFibers.pop_back();
814 loopController.stop();
818 loopController.loop(std::move(loopFunc));
821 TEST(FiberManager, collectAllVoid) {
822 std::vector<Promise<int>> pendingFibers;
823 bool taskAdded = false;
825 FiberManager manager(std::make_unique<SimpleLoopController>());
826 auto& loopController =
827 dynamic_cast<SimpleLoopController&>(manager.loopController());
829 auto loopFunc = [&]() {
831 manager.addTask([&]() {
832 std::vector<std::function<void()>> funcs;
833 for (size_t i = 0; i < 3; ++i) {
834 funcs.push_back([&pendingFibers]() {
835 await([&pendingFibers](Promise<int> promise) {
836 pendingFibers.push_back(std::move(promise));
841 collectAll(funcs.begin(), funcs.end());
842 EXPECT_TRUE(pendingFibers.empty());
845 } else if (pendingFibers.size()) {
846 pendingFibers.back().setValue(0);
847 pendingFibers.pop_back();
849 loopController.stop();
853 loopController.loop(std::move(loopFunc));
856 TEST(FiberManager, collectAny) {
857 std::vector<Promise<int>> pendingFibers;
858 bool taskAdded = false;
860 FiberManager manager(std::make_unique<SimpleLoopController>());
861 auto& loopController =
862 dynamic_cast<SimpleLoopController&>(manager.loopController());
864 auto loopFunc = [&]() {
866 manager.addTask([&]() {
867 std::vector<std::function<int()>> funcs;
868 for (size_t i = 0; i < 3; ++i) {
869 funcs.push_back([i, &pendingFibers]() {
870 await([&pendingFibers](Promise<int> promise) {
871 pendingFibers.push_back(std::move(promise));
874 throw std::runtime_error("This exception will be ignored");
880 auto result = collectAny(funcs.begin(), funcs.end());
881 EXPECT_EQ(2, pendingFibers.size());
882 EXPECT_EQ(2, result.first);
883 EXPECT_EQ(2 * 2 + 1, result.second);
886 } else if (pendingFibers.size()) {
887 pendingFibers.back().setValue(0);
888 pendingFibers.pop_back();
890 loopController.stop();
894 loopController.loop(std::move(loopFunc));
898 /* Checks that this function was run from a main context,
899 by comparing an address on a stack to a known main stack address
900 and a known related fiber stack address. The assumption
901 is that fiber stack and main stack will be far enough apart,
902 while any two values on the same stack will be close. */
903 void expectMainContext(bool& ran, int* mainLocation, int* fiberLocation) {
905 /* 2 pages is a good guess */
906 constexpr auto const kHereToFiberMaxDist = 0x2000 / sizeof(int);
908 // With ASAN's detect_stack_use_after_return=1, this must be much larger
909 // I measured 410028 on x86_64, so allow for quadruple that, just in case.
910 constexpr auto const kHereToMainMaxDist =
911 folly::kIsSanitizeAddress ? 4 * 410028 : kHereToFiberMaxDist;
914 EXPECT_GT(std::abs(&here - fiberLocation), kHereToFiberMaxDist);
917 EXPECT_LT(std::abs(&here - mainLocation), kHereToMainMaxDist);
925 TEST(FiberManager, runInMainContext) {
926 FiberManager manager(std::make_unique<SimpleLoopController>());
927 auto& loopController =
928 dynamic_cast<SimpleLoopController&>(manager.loopController());
930 bool checkRan = false;
933 manager.runInMainContext(
934 [&]() { expectMainContext(checkRan, &mainLocation, nullptr); });
935 EXPECT_TRUE(checkRan);
939 manager.addTask([&]() {
941 explicit A(int value_) : value(value_) {}
942 A(const A&) = delete;
948 auto ret = runInMainContext([&]() {
949 expectMainContext(checkRan, &mainLocation, &stackLocation);
952 EXPECT_TRUE(checkRan);
953 EXPECT_EQ(42, ret.value);
956 loopController.loop([&]() { loopController.stop(); });
958 EXPECT_TRUE(checkRan);
961 TEST(FiberManager, addTaskFinally) {
962 FiberManager manager(std::make_unique<SimpleLoopController>());
963 auto& loopController =
964 dynamic_cast<SimpleLoopController&>(manager.loopController());
966 bool checkRan = false;
970 manager.addTaskFinally(
971 [&]() { return 1234; },
972 [&](Try<int>&& result) {
973 EXPECT_EQ(result.value(), 1234);
975 expectMainContext(checkRan, &mainLocation, nullptr);
978 EXPECT_FALSE(checkRan);
980 loopController.loop([&]() { loopController.stop(); });
982 EXPECT_TRUE(checkRan);
985 TEST(FiberManager, fibersPoolWithinLimit) {
986 FiberManager::Options opts;
987 opts.maxFibersPoolSize = 5;
989 FiberManager manager(std::make_unique<SimpleLoopController>(), opts);
990 auto& loopController =
991 dynamic_cast<SimpleLoopController&>(manager.loopController());
993 size_t fibersRun = 0;
995 for (size_t i = 0; i < 5; ++i) {
996 manager.addTask([&]() { ++fibersRun; });
998 loopController.loop([&]() { loopController.stop(); });
1000 EXPECT_EQ(5, fibersRun);
1001 EXPECT_EQ(5, manager.fibersAllocated());
1002 EXPECT_EQ(5, manager.fibersPoolSize());
1004 for (size_t i = 0; i < 5; ++i) {
1005 manager.addTask([&]() { ++fibersRun; });
1007 loopController.loop([&]() { loopController.stop(); });
1009 EXPECT_EQ(10, fibersRun);
1010 EXPECT_EQ(5, manager.fibersAllocated());
1011 EXPECT_EQ(5, manager.fibersPoolSize());
1014 TEST(FiberManager, fibersPoolOverLimit) {
1015 FiberManager::Options opts;
1016 opts.maxFibersPoolSize = 5;
1018 FiberManager manager(std::make_unique<SimpleLoopController>(), opts);
1019 auto& loopController =
1020 dynamic_cast<SimpleLoopController&>(manager.loopController());
1022 size_t fibersRun = 0;
1024 for (size_t i = 0; i < 10; ++i) {
1025 manager.addTask([&]() { ++fibersRun; });
1028 EXPECT_EQ(0, fibersRun);
1029 EXPECT_EQ(10, manager.fibersAllocated());
1030 EXPECT_EQ(0, manager.fibersPoolSize());
1032 loopController.loop([&]() { loopController.stop(); });
1034 EXPECT_EQ(10, fibersRun);
1035 EXPECT_EQ(5, manager.fibersAllocated());
1036 EXPECT_EQ(5, manager.fibersPoolSize());
1039 TEST(FiberManager, remoteFiberBasic) {
1040 FiberManager manager(std::make_unique<SimpleLoopController>());
1041 auto& loopController =
1042 dynamic_cast<SimpleLoopController&>(manager.loopController());
1045 result[0] = result[1] = 0;
1046 folly::Optional<Promise<int>> savedPromise[2];
1047 manager.addTask([&]() {
1049 [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1051 manager.addTask([&]() {
1053 [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1056 manager.loopUntilNoReady();
1058 EXPECT_TRUE(savedPromise[0].hasValue());
1059 EXPECT_TRUE(savedPromise[1].hasValue());
1060 EXPECT_EQ(0, result[0]);
1061 EXPECT_EQ(0, result[1]);
1063 std::thread remoteThread0{[&]() { savedPromise[0]->setValue(42); }};
1064 std::thread remoteThread1{[&]() { savedPromise[1]->setValue(43); }};
1065 remoteThread0.join();
1066 remoteThread1.join();
1067 EXPECT_EQ(0, result[0]);
1068 EXPECT_EQ(0, result[1]);
1069 /* Should only have scheduled once */
1070 EXPECT_EQ(1, loopController.remoteScheduleCalled());
1072 manager.loopUntilNoReady();
1073 EXPECT_EQ(42, result[0]);
1074 EXPECT_EQ(43, result[1]);
1077 TEST(FiberManager, addTaskRemoteBasic) {
1078 FiberManager manager(std::make_unique<SimpleLoopController>());
1081 result[0] = result[1] = 0;
1082 folly::Optional<Promise<int>> savedPromise[2];
1084 std::thread remoteThread0{[&]() {
1085 manager.addTaskRemote([&]() {
1087 [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1090 std::thread remoteThread1{[&]() {
1091 manager.addTaskRemote([&]() {
1093 [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1096 remoteThread0.join();
1097 remoteThread1.join();
1099 manager.loopUntilNoReady();
1101 EXPECT_TRUE(savedPromise[0].hasValue());
1102 EXPECT_TRUE(savedPromise[1].hasValue());
1103 EXPECT_EQ(0, result[0]);
1104 EXPECT_EQ(0, result[1]);
1106 savedPromise[0]->setValue(42);
1107 savedPromise[1]->setValue(43);
1109 EXPECT_EQ(0, result[0]);
1110 EXPECT_EQ(0, result[1]);
1112 manager.loopUntilNoReady();
1113 EXPECT_EQ(42, result[0]);
1114 EXPECT_EQ(43, result[1]);
1117 TEST(FiberManager, remoteHasTasks) {
1119 FiberManager fm(std::make_unique<SimpleLoopController>());
1120 std::thread remote([&]() { fm.addTaskRemote([&]() { ++counter; }); });
1124 while (fm.hasTasks()) {
1125 fm.loopUntilNoReady();
1128 EXPECT_FALSE(fm.hasTasks());
1129 EXPECT_EQ(counter, 1);
1132 TEST(FiberManager, remoteHasReadyTasks) {
1134 folly::Optional<Promise<int>> savedPromise;
1135 FiberManager fm(std::make_unique<SimpleLoopController>());
1136 std::thread remote([&]() {
1137 fm.addTaskRemote([&]() {
1139 [&](Promise<int> promise) { savedPromise = std::move(promise); });
1140 EXPECT_TRUE(fm.hasTasks());
1145 EXPECT_TRUE(fm.hasTasks());
1147 fm.loopUntilNoReady();
1148 EXPECT_TRUE(fm.hasTasks());
1150 std::thread remote2([&]() { savedPromise->setValue(47); });
1152 EXPECT_TRUE(fm.hasTasks());
1154 fm.loopUntilNoReady();
1155 EXPECT_FALSE(fm.hasTasks());
1157 EXPECT_EQ(result, 47);
1160 template <typename Data>
1161 void testFiberLocal() {
1162 FiberManager fm(LocalType<Data>(), std::make_unique<SimpleLoopController>());
1165 EXPECT_EQ(42, local<Data>().value);
1167 local<Data>().value = 43;
1170 EXPECT_EQ(43, local<Data>().value);
1172 local<Data>().value = 44;
1174 addTask([]() { EXPECT_EQ(44, local<Data>().value); });
1179 EXPECT_EQ(42, local<Data>().value);
1181 local<Data>().value = 43;
1183 fm.addTaskRemote([]() { EXPECT_EQ(43, local<Data>().value); });
1187 EXPECT_EQ(42, local<Data>().value);
1188 local<Data>().value = 43;
1191 EXPECT_EQ(43, local<Data>().value);
1192 local<Data>().value = 44;
1194 std::vector<std::function<void()>> tasks{task};
1195 collectAny(tasks.begin(), tasks.end());
1197 EXPECT_EQ(43, local<Data>().value);
1200 fm.loopUntilNoReady();
1201 EXPECT_FALSE(fm.hasTasks());
1204 TEST(FiberManager, fiberLocal) {
1209 testFiberLocal<SimpleData>();
1212 TEST(FiberManager, fiberLocalHeap) {
1214 char _[1024 * 1024];
1218 testFiberLocal<LargeData>();
1221 TEST(FiberManager, fiberLocalDestructor) {
1228 EXPECT_EQ(42, local<CrazyData>().data);
1229 // Make sure we don't have infinite loop
1230 local<CrazyData>().data = 0;
1237 LocalType<CrazyData>(), std::make_unique<SimpleLoopController>());
1239 fm.addTask([]() { local<CrazyData>().data = 41; });
1241 fm.loopUntilNoReady();
1242 EXPECT_FALSE(fm.hasTasks());
1245 TEST(FiberManager, yieldTest) {
1246 FiberManager manager(std::make_unique<SimpleLoopController>());
1247 auto& loopController =
1248 dynamic_cast<SimpleLoopController&>(manager.loopController());
1250 bool checkRan = false;
1252 manager.addTask([&]() {
1257 loopController.loop([&]() {
1259 loopController.stop();
1263 EXPECT_TRUE(checkRan);
1266 TEST(FiberManager, RequestContext) {
1267 FiberManager fm(std::make_unique<SimpleLoopController>());
1269 bool checkRun1 = false;
1270 bool checkRun2 = false;
1271 bool checkRun3 = false;
1272 bool checkRun4 = false;
1273 folly::fibers::Baton baton1;
1274 folly::fibers::Baton baton2;
1275 folly::fibers::Baton baton3;
1276 folly::fibers::Baton baton4;
1279 folly::RequestContextScopeGuard rctx;
1280 auto rcontext1 = folly::RequestContext::get();
1281 fm.addTask([&, rcontext1]() {
1282 EXPECT_EQ(rcontext1, folly::RequestContext::get());
1284 [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
1285 EXPECT_EQ(rcontext1, folly::RequestContext::get());
1287 [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
1292 folly::RequestContextScopeGuard rctx;
1293 auto rcontext2 = folly::RequestContext::get();
1294 fm.addTaskRemote([&, rcontext2]() {
1295 EXPECT_EQ(rcontext2, folly::RequestContext::get());
1297 EXPECT_EQ(rcontext2, folly::RequestContext::get());
1302 folly::RequestContextScopeGuard rctx;
1303 auto rcontext3 = folly::RequestContext::get();
1306 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1308 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1310 return folly::Unit();
1312 [&, rcontext3](Try<folly::Unit>&& /* t */) {
1313 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1318 folly::RequestContext::setContext(nullptr);
1320 folly::RequestContextScopeGuard rctx;
1321 auto rcontext4 = folly::RequestContext::get();
1323 EXPECT_EQ(rcontext4, folly::RequestContext::get());
1328 folly::RequestContextScopeGuard rctx;
1329 auto rcontext = folly::RequestContext::get();
1331 fm.loopUntilNoReady();
1332 EXPECT_EQ(rcontext, folly::RequestContext::get());
1335 EXPECT_EQ(rcontext, folly::RequestContext::get());
1336 fm.loopUntilNoReady();
1337 EXPECT_TRUE(checkRun1);
1338 EXPECT_EQ(rcontext, folly::RequestContext::get());
1341 EXPECT_EQ(rcontext, folly::RequestContext::get());
1342 fm.loopUntilNoReady();
1343 EXPECT_TRUE(checkRun2);
1344 EXPECT_EQ(rcontext, folly::RequestContext::get());
1347 EXPECT_EQ(rcontext, folly::RequestContext::get());
1348 fm.loopUntilNoReady();
1349 EXPECT_TRUE(checkRun3);
1350 EXPECT_EQ(rcontext, folly::RequestContext::get());
1353 EXPECT_EQ(rcontext, folly::RequestContext::get());
1354 fm.loopUntilNoReady();
1355 EXPECT_TRUE(checkRun4);
1356 EXPECT_EQ(rcontext, folly::RequestContext::get());
1360 TEST(FiberManager, resizePeriodically) {
1361 FiberManager::Options opts;
1362 opts.fibersPoolResizePeriodMs = 300;
1363 opts.maxFibersPoolSize = 5;
1365 FiberManager manager(std::make_unique<EventBaseLoopController>(), opts);
1367 folly::EventBase evb;
1368 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1369 .attachEventBase(evb);
1371 std::vector<Baton> batons(10);
1373 size_t tasksRun = 0;
1374 for (size_t i = 0; i < 30; ++i) {
1375 manager.addTask([i, &batons, &tasksRun]() {
1377 // Keep some fibers active indefinitely
1378 if (i < batons.size()) {
1384 EXPECT_EQ(0, tasksRun);
1385 EXPECT_EQ(30, manager.fibersAllocated());
1386 EXPECT_EQ(0, manager.fibersPoolSize());
1389 EXPECT_EQ(30, tasksRun);
1390 EXPECT_EQ(30, manager.fibersAllocated());
1391 // Can go over maxFibersPoolSize, 10 of 30 fibers still active
1392 EXPECT_EQ(20, manager.fibersPoolSize());
1394 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1395 evb.loopOnce(); // no fibers active in this period
1396 EXPECT_EQ(30, manager.fibersAllocated());
1397 EXPECT_EQ(20, manager.fibersPoolSize());
1399 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1400 evb.loopOnce(); // should shrink fibers pool to maxFibersPoolSize
1401 EXPECT_EQ(15, manager.fibersAllocated());
1402 EXPECT_EQ(5, manager.fibersPoolSize());
1404 for (size_t i = 0; i < batons.size(); ++i) {
1408 EXPECT_EQ(15, manager.fibersAllocated());
1409 EXPECT_EQ(15, manager.fibersPoolSize());
1411 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1412 evb.loopOnce(); // 10 fibers active in last period
1413 EXPECT_EQ(10, manager.fibersAllocated());
1414 EXPECT_EQ(10, manager.fibersPoolSize());
1416 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1418 EXPECT_EQ(5, manager.fibersAllocated());
1419 EXPECT_EQ(5, manager.fibersPoolSize());
1422 TEST(FiberManager, batonWaitTimeoutHandler) {
1423 FiberManager manager(std::make_unique<EventBaseLoopController>());
1425 folly::EventBase evb;
1426 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1427 .attachEventBase(evb);
1429 size_t fibersRun = 0;
1431 Baton::TimeoutHandler timeoutHandler;
1433 manager.addTask([&]() {
1434 baton.wait(timeoutHandler);
1437 manager.loopUntilNoReady();
1439 EXPECT_FALSE(baton.try_wait());
1440 EXPECT_EQ(0, fibersRun);
1442 timeoutHandler.scheduleTimeout(std::chrono::milliseconds(250));
1443 std::this_thread::sleep_for(std::chrono::milliseconds(500));
1445 EXPECT_FALSE(baton.try_wait());
1446 EXPECT_EQ(0, fibersRun);
1449 manager.loopUntilNoReady();
1451 EXPECT_EQ(1, fibersRun);
1454 TEST(FiberManager, batonWaitTimeoutMany) {
1455 FiberManager manager(std::make_unique<EventBaseLoopController>());
1457 folly::EventBase evb;
1458 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1459 .attachEventBase(evb);
1461 constexpr size_t kNumTimeoutTasks = 10000;
1462 size_t tasksCount = kNumTimeoutTasks;
1464 // We add many tasks to hit timeout queue deallocation logic.
1465 for (size_t i = 0; i < kNumTimeoutTasks; ++i) {
1466 manager.addTask([&]() {
1468 Baton::TimeoutHandler timeoutHandler;
1470 folly::fibers::addTask([&] {
1471 timeoutHandler.scheduleTimeout(std::chrono::milliseconds(1000));
1474 baton.wait(timeoutHandler);
1475 if (--tasksCount == 0) {
1476 evb.terminateLoopSoon();
1484 TEST(FiberManager, remoteFutureTest) {
1485 FiberManager fiberManager(std::make_unique<SimpleLoopController>());
1486 auto& loopController =
1487 dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
1491 auto f1 = fiberManager.addTaskFuture([&]() { return testValue1; });
1492 auto f2 = fiberManager.addTaskRemoteFuture([&]() { return testValue2; });
1493 loopController.loop([&]() { loopController.stop(); });
1497 EXPECT_EQ(v1, testValue1);
1498 EXPECT_EQ(v2, testValue2);
1501 // Test that a void function produes a Future<Unit>.
1502 TEST(FiberManager, remoteFutureVoidUnitTest) {
1503 FiberManager fiberManager(std::make_unique<SimpleLoopController>());
1504 auto& loopController =
1505 dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
1507 bool ranLocal = false;
1508 folly::Future<folly::Unit> futureLocal =
1509 fiberManager.addTaskFuture([&]() { ranLocal = true; });
1511 bool ranRemote = false;
1512 folly::Future<folly::Unit> futureRemote =
1513 fiberManager.addTaskRemoteFuture([&]() { ranRemote = true; });
1515 loopController.loop([&]() { loopController.stop(); });
1518 ASSERT_TRUE(ranLocal);
1520 futureRemote.wait();
1521 ASSERT_TRUE(ranRemote);
1524 TEST(FiberManager, nestedFiberManagers) {
1525 folly::EventBase outerEvb;
1526 folly::EventBase innerEvb;
1528 getFiberManager(outerEvb).addTask([&]() {
1530 &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
1532 runInMainContext([&]() {
1533 getFiberManager(innerEvb).addTask([&]() {
1535 &getFiberManager(innerEvb), FiberManager::getFiberManagerUnsafe());
1537 innerEvb.terminateLoopSoon();
1540 innerEvb.loopForever();
1544 &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
1546 outerEvb.terminateLoopSoon();
1549 outerEvb.loopForever();
1552 TEST(FiberManager, semaphore) {
1553 static constexpr size_t kTasks = 10;
1554 static constexpr size_t kIterations = 10000;
1555 static constexpr size_t kNumTokens = 10;
1557 Semaphore sem(kNumTokens);
1561 auto task = [&sem](int& counter, folly::fibers::Baton& baton) {
1562 FiberManager manager(std::make_unique<EventBaseLoopController>());
1563 folly::EventBase evb;
1564 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1565 .attachEventBase(evb);
1568 std::shared_ptr<folly::EventBase> completionCounter(
1569 &evb, [](folly::EventBase* evb) { evb->terminateLoopSoon(); });
1571 for (size_t i = 0; i < kTasks; ++i) {
1572 manager.addTask([&, completionCounter]() {
1573 for (size_t j = 0; j < kIterations; ++j) {
1579 EXPECT_LT(counter, kNumTokens);
1580 EXPECT_GE(counter, 0);
1590 folly::fibers::Baton batonA;
1591 folly::fibers::Baton batonB;
1592 std::thread threadA([&] { task(counterA, batonA); });
1593 std::thread threadB([&] { task(counterB, batonB); });
1600 EXPECT_LT(counterA, kNumTokens);
1601 EXPECT_LT(counterB, kNumTokens);
1602 EXPECT_GE(counterA, 0);
1603 EXPECT_GE(counterB, 0);
1606 template <typename ExecutorT>
1607 void singleBatchDispatch(ExecutorT& executor, int batchSize, int index) {
1608 thread_local BatchDispatcher<int, std::string, ExecutorT> batchDispatcher(
1609 executor, [=](std::vector<int>&& batch) {
1610 EXPECT_EQ(batchSize, batch.size());
1611 std::vector<std::string> results;
1612 for (auto& it : batch) {
1613 results.push_back(folly::to<std::string>(it));
1618 auto indexCopy = index;
1619 auto result = batchDispatcher.add(std::move(indexCopy));
1620 EXPECT_EQ(folly::to<std::string>(index), result.get());
1623 TEST(FiberManager, batchDispatchTest) {
1624 folly::EventBase evb;
1625 auto& executor = getFiberManager(evb);
1627 // Launch multiple fibers with a single id.
1628 executor.add([&]() {
1630 for (int i = 0; i < batchSize; i++) {
1632 [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
1637 // Reuse the same BatchDispatcher to batch once again.
1638 executor.add([&]() {
1640 for (int i = 0; i < batchSize; i++) {
1642 [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
1648 template <typename ExecutorT>
1649 folly::Future<std::vector<std::string>> doubleBatchInnerDispatch(
1650 ExecutorT& executor,
1651 int totalNumberOfElements,
1652 std::vector<int> input) {
1653 thread_local BatchDispatcher<
1655 std::vector<std::string>,
1657 batchDispatcher(executor, [=](std::vector<std::vector<int>>&& batch) {
1658 std::vector<std::vector<std::string>> results;
1659 int numberOfElements = 0;
1660 for (auto& unit : batch) {
1661 numberOfElements += unit.size();
1662 std::vector<std::string> result;
1663 for (auto& element : unit) {
1664 result.push_back(folly::to<std::string>(element));
1666 results.push_back(std::move(result));
1668 EXPECT_EQ(totalNumberOfElements, numberOfElements);
1672 return batchDispatcher.add(std::move(input));
1676 * Batch values in groups of 5, and then call inner dispatch.
1678 template <typename ExecutorT>
1679 void doubleBatchOuterDispatch(
1680 ExecutorT& executor,
1681 int totalNumberOfElements,
1683 thread_local BatchDispatcher<int, std::string, ExecutorT> batchDispatcher(
1684 executor, [=, &executor](std::vector<int>&& batch) {
1685 EXPECT_EQ(totalNumberOfElements, batch.size());
1686 std::vector<std::string> results;
1687 std::vector<folly::Future<std::vector<std::string>>>
1688 innerDispatchResultFutures;
1690 std::vector<int> group;
1691 for (auto unit : batch) {
1692 group.push_back(unit);
1693 if (group.size() == 5) {
1694 auto localGroup = group;
1697 innerDispatchResultFutures.push_back(doubleBatchInnerDispatch(
1698 executor, totalNumberOfElements, localGroup));
1703 innerDispatchResultFutures.begin(),
1704 innerDispatchResultFutures.end())
1705 .then([&](std::vector<Try<std::vector<std::string>>>
1706 innerDispatchResults) {
1707 for (auto& unit : innerDispatchResults) {
1708 for (auto& element : unit.value()) {
1709 results.push_back(element);
1717 auto indexCopy = index;
1718 auto result = batchDispatcher.add(std::move(indexCopy));
1719 EXPECT_EQ(folly::to<std::string>(index), result.get());
1722 TEST(FiberManager, doubleBatchDispatchTest) {
1723 folly::EventBase evb;
1724 auto& executor = getFiberManager(evb);
1726 // Launch multiple fibers with a single id.
1727 executor.add([&]() {
1728 int totalNumberOfElements = 20;
1729 for (int i = 0; i < totalNumberOfElements; i++) {
1730 executor.add([=, &executor]() {
1731 doubleBatchOuterDispatch(executor, totalNumberOfElements, i);
1738 template <typename ExecutorT>
1739 void batchDispatchExceptionHandling(ExecutorT& executor, int i) {
1740 thread_local BatchDispatcher<int, int, ExecutorT> batchDispatcher(
1741 executor, [](std::vector<int> &&) -> std::vector<int> {
1742 throw std::runtime_error("Surprise!!");
1745 EXPECT_THROW(batchDispatcher.add(i).get(), std::runtime_error);
1748 TEST(FiberManager, batchDispatchExceptionHandlingTest) {
1749 folly::EventBase evb;
1750 auto& executor = getFiberManager(evb);
1752 // Launch multiple fibers with a single id.
1753 executor.add([&]() {
1754 int totalNumberOfElements = 5;
1755 for (int i = 0; i < totalNumberOfElements; i++) {
1757 [=, &executor]() { batchDispatchExceptionHandling(executor, i); });
1763 namespace AtomicBatchDispatcherTesting {
1765 using ValueT = size_t;
1766 using ResultT = std::string;
1767 using DispatchFunctionT =
1768 folly::Function<std::vector<ResultT>(std::vector<ValueT>&&)>;
1770 #define ENABLE_TRACE_IN_TEST 0 // Set to 1 to debug issues in ABD tests
1771 #if ENABLE_TRACE_IN_TEST
1772 #define OUTPUT_TRACE std::cerr
1773 #else // ENABLE_TRACE_IN_TEST
1774 struct DevNullPiper {
1775 template <typename T>
1776 DevNullPiper& operator<<(const T&) {
1780 DevNullPiper& operator<<(std::ostream& (*)(std::ostream&)) {
1784 #define OUTPUT_TRACE devNullPiper
1785 #endif // ENABLE_TRACE_IN_TEST
1788 AtomicBatchDispatcher<ValueT, ResultT>::Token token;
1791 void preprocess(FiberManager& executor, bool die) {
1792 // Yield for a random duration [0, 10] ms to simulate I/O in preprocessing
1793 clock_t msecToDoIO = folly::Random::rand32() % 10;
1794 double start = (1000.0 * clock()) / CLOCKS_PER_SEC;
1795 double endAfter = start + msecToDoIO;
1796 while ((1000.0 * clock()) / CLOCKS_PER_SEC < endAfter) {
1800 throw std::logic_error("Simulating preprocessing failure");
1804 Job(AtomicBatchDispatcher<ValueT, ResultT>::Token&& t, ValueT i)
1805 : token(std::move(t)), input(i) {}
1807 Job(Job&&) = default;
1808 Job& operator=(Job&&) = default;
1811 ResultT processSingleInput(ValueT&& input) {
1812 return folly::to<ResultT>(std::move(input));
1815 std::vector<ResultT> userDispatchFunc(std::vector<ValueT>&& inputs) {
1816 size_t expectedCount = inputs.size();
1817 std::vector<ResultT> results;
1818 results.reserve(expectedCount);
1819 for (size_t i = 0; i < expectedCount; ++i) {
1820 results.emplace_back(processSingleInput(std::move(inputs[i])));
1826 AtomicBatchDispatcher<ValueT, ResultT>& atomicBatchDispatcher,
1827 std::vector<Job>& jobs,
1830 for (size_t i = 0; i < count; ++i) {
1831 jobs.emplace_back(Job(atomicBatchDispatcher.getToken(), i));
1835 enum class DispatchProblem {
1842 FiberManager& executor,
1843 std::vector<Job>& jobs,
1844 std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1845 DispatchProblem dispatchProblem = DispatchProblem::None,
1846 size_t problemIndex = size_t(-1)) {
1848 dispatchProblem == DispatchProblem::None || problemIndex < jobs.size());
1850 results.resize(jobs.size());
1851 for (size_t i = 0; i < jobs.size(); ++i) {
1853 [i, &executor, &jobs, &results, dispatchProblem, problemIndex]() {
1855 Job job(std::move(jobs[i]));
1857 if (dispatchProblem == DispatchProblem::PreprocessThrows) {
1858 if (i == problemIndex) {
1859 EXPECT_THROW(job.preprocess(executor, true), std::logic_error);
1864 job.preprocess(executor, false);
1865 OUTPUT_TRACE << "Dispatching job #" << i << std::endl;
1866 results[i] = job.token.dispatch(job.input);
1867 OUTPUT_TRACE << "Result future filled for job #" << i << std::endl;
1869 if (dispatchProblem == DispatchProblem::DuplicateDispatch) {
1870 if (i == problemIndex) {
1871 EXPECT_THROW(job.token.dispatch(job.input), ABDUsageException);
1875 OUTPUT_TRACE << "Preprocessing failed for job #" << i << std::endl;
1881 void validateResult(
1882 std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1885 OUTPUT_TRACE << "results[" << i << "].value() : " << results[i]->value()
1887 } catch (std::exception& e) {
1888 OUTPUT_TRACE << "Exception : " << e.what() << std::endl;
1893 template <typename TException>
1894 void validateResults(
1895 std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1896 size_t expectedNumResults) {
1897 size_t numResultsFilled = 0;
1898 for (size_t i = 0; i < results.size(); ++i) {
1903 EXPECT_THROW(validateResult(results, i), TException);
1905 EXPECT_EQ(numResultsFilled, expectedNumResults);
1908 void validateResults(
1909 std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1910 size_t expectedNumResults) {
1911 size_t numResultsFilled = 0;
1912 for (size_t i = 0; i < results.size(); ++i) {
1917 EXPECT_NO_THROW(validateResult(results, i));
1918 ValueT expectedInput = i;
1920 results[i]->value(), processSingleInput(std::move(expectedInput)));
1922 EXPECT_EQ(numResultsFilled, expectedNumResults);
1925 } // namespace AtomicBatchDispatcherTesting
1927 #define SET_UP_TEST_FUNC \
1928 using namespace AtomicBatchDispatcherTesting; \
1929 folly::EventBase evb; \
1930 auto& executor = getFiberManager(evb); \
1931 const size_t COUNT = 11; \
1932 std::vector<Job> jobs; \
1933 jobs.reserve(COUNT); \
1934 std::vector<folly::Optional<folly::Future<ResultT>>> results; \
1935 results.reserve(COUNT); \
1936 DispatchFunctionT dispatchFunc
1938 TEST(FiberManager, ABD_Test) {
1942 // Testing AtomicBatchDispatcher with explicit call to commit()
1944 dispatchFunc = userDispatchFunc;
1945 auto atomicBatchDispatcher =
1946 createAtomicBatchDispatcher(std::move(dispatchFunc));
1947 createJobs(atomicBatchDispatcher, jobs, COUNT);
1948 dispatchJobs(executor, jobs, results);
1949 atomicBatchDispatcher.commit();
1951 validateResults(results, COUNT);
1954 TEST(FiberManager, ABD_DispatcherDestroyedBeforeCallingCommit) {
1958 // Testing AtomicBatchDispatcher destroyed before calling commit.
1959 // Handles error cases for:
1960 // - User might have forgotten to add the call to commit() in the code
1961 // - An unexpected exception got thrown in user code before commit() is called
1964 dispatchFunc = userDispatchFunc;
1965 auto atomicBatchDispatcher =
1966 createAtomicBatchDispatcher(std::move(dispatchFunc));
1967 createJobs(atomicBatchDispatcher, jobs, COUNT);
1968 dispatchJobs(executor, jobs, results);
1969 throw std::runtime_error(
1970 "Unexpected exception in user code before commit called");
1971 // atomicBatchDispatcher.commit();
1973 /* User code handles the exception and does not exit process */
1976 validateResults<ABDCommitNotCalledException>(results, COUNT);
1979 TEST(FiberManager, ABD_PreprocessingFailureTest) {
1983 // Testing preprocessing failure on a job throws
1985 dispatchFunc = userDispatchFunc;
1986 auto atomicBatchDispatcher =
1987 createAtomicBatchDispatcher(std::move(dispatchFunc));
1988 createJobs(atomicBatchDispatcher, jobs, COUNT);
1989 dispatchJobs(executor, jobs, results, DispatchProblem::PreprocessThrows, 8);
1990 atomicBatchDispatcher.commit();
1992 validateResults<ABDTokenNotDispatchedException>(results, COUNT - 1);
1995 TEST(FiberManager, ABD_MultipleDispatchOnSameTokenErrorTest) {
1999 // Testing that calling dispatch more than once on the same token throws
2001 dispatchFunc = userDispatchFunc;
2002 auto atomicBatchDispatcher =
2003 createAtomicBatchDispatcher(std::move(dispatchFunc));
2004 createJobs(atomicBatchDispatcher, jobs, COUNT);
2005 dispatchJobs(executor, jobs, results, DispatchProblem::DuplicateDispatch, 4);
2006 atomicBatchDispatcher.commit();
2010 TEST(FiberManager, ABD_GetTokenCalledAfterCommitTest) {
2014 // Testing that exception set on attempt to call getToken after commit called
2016 dispatchFunc = userDispatchFunc;
2017 auto atomicBatchDispatcher =
2018 createAtomicBatchDispatcher(std::move(dispatchFunc));
2019 createJobs(atomicBatchDispatcher, jobs, COUNT);
2020 atomicBatchDispatcher.commit();
2021 EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
2022 dispatchJobs(executor, jobs, results);
2023 EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
2025 validateResults(results, COUNT);
2026 EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
2029 TEST(FiberManager, ABD_UserProvidedBatchDispatchThrowsTest) {
2033 // Testing that exception is set if user provided batch dispatch throws
2035 dispatchFunc = [](std::vector<ValueT>&& inputs) -> std::vector<ResultT> {
2036 (void)userDispatchFunc(std::move(inputs));
2037 throw std::runtime_error("Unexpected exception in user dispatch function");
2039 auto atomicBatchDispatcher =
2040 createAtomicBatchDispatcher(std::move(dispatchFunc));
2041 createJobs(atomicBatchDispatcher, jobs, COUNT);
2042 dispatchJobs(executor, jobs, results);
2043 atomicBatchDispatcher.commit();
2045 validateResults<std::runtime_error>(results, COUNT);
2048 TEST(FiberManager, VirtualEventBase) {
2052 folly::ScopedEventBaseThread thread;
2055 std::make_unique<folly::VirtualEventBase>(*thread.getEventBase());
2056 auto& evb2 = thread.getEventBase()->getVirtualEventBase();
2058 getFiberManager(*evb1).addTaskRemote([&] {
2060 baton.try_wait_for(std::chrono::milliseconds{100});
2065 getFiberManager(evb2).addTaskRemote([&] {
2067 baton.try_wait_for(std::chrono::milliseconds{200});
2072 EXPECT_FALSE(done1);
2073 EXPECT_FALSE(done2);
2077 EXPECT_FALSE(done2);
2082 TEST(TimedMutex, ThreadsAndFibersDontDeadlock) {
2083 folly::EventBase evb;
2084 auto& fm = getFiberManager(evb);
2086 std::thread testThread([&] {
2087 for (int i = 0; i < 100; i++) {
2092 b.try_wait_for(std::chrono::milliseconds(1));
2097 for (int numFibers = 0; numFibers < 100; numFibers++) {
2099 for (int i = 0; i < 20; i++) {
2103 b.try_wait_for(std::chrono::milliseconds(1));
2108 b.try_wait_for(std::chrono::milliseconds(1));
2115 EXPECT_EQ(0, fm.hasTasks());
2119 TEST(TimedMutex, ThreadFiberDeadlockOrder) {
2120 folly::EventBase evb;
2121 auto& fm = getFiberManager(evb);
2125 std::thread unlockThread([&] {
2126 /* sleep override */ std::this_thread::sleep_for(
2127 std::chrono::milliseconds{100});
2131 fm.addTask([&] { std::lock_guard<TimedMutex> lg(mutex); });
2133 runInMainContext([&] {
2134 auto locked = mutex.timed_lock(std::chrono::seconds{1});
2135 EXPECT_TRUE(locked);
2143 EXPECT_EQ(0, fm.hasTasks());
2145 unlockThread.join();
2148 TEST(TimedMutex, ThreadFiberDeadlockRace) {
2149 folly::EventBase evb;
2150 auto& fm = getFiberManager(evb);
2156 auto locked = mutex.timed_lock(std::chrono::seconds{1});
2157 EXPECT_TRUE(locked);
2164 runInMainContext([&] {
2165 auto locked = mutex.timed_lock(std::chrono::seconds{1});
2166 EXPECT_TRUE(locked);
2174 EXPECT_EQ(0, fm.hasTasks());
2178 * Test that we can properly track fiber stack usage.
2180 * This functionality can only be enabled when ASAN is disabled, so avoid
2181 * running this test with ASAN.
2183 #ifndef FOLLY_SANITIZE_ADDRESS
2184 TEST(FiberManager, recordStack) {
2186 folly::fibers::FiberManager::Options opts;
2187 opts.recordStackEvery = 1;
2189 FiberManager fm(std::make_unique<SimpleLoopController>(), opts);
2190 auto& loopController =
2191 dynamic_cast<SimpleLoopController&>(fm.loopController());
2193 static constexpr size_t n = 1000;
2197 for (size_t i = 0; i < n; ++i) {
2200 for (size_t i = 0; i + 1 < n; ++i) {
2201 s += b[i] * b[i + 1];
2207 loopController.loop([&]() { loopController.stop(); });
2209 // Check that we properly accounted fiber stack usage.
2210 EXPECT_LT(n * sizeof(int), fm.stackHighWatermark());
2212 std::thread(f).join();