2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 #include <folly/Memory.h>
21 #include <folly/Random.h>
22 #include <folly/futures/Future.h>
24 #include <folly/Conv.h>
25 #include <folly/fibers/AddTasks.h>
26 #include <folly/fibers/AtomicBatchDispatcher.h>
27 #include <folly/fibers/BatchDispatcher.h>
28 #include <folly/fibers/EventBaseLoopController.h>
29 #include <folly/fibers/FiberManager.h>
30 #include <folly/fibers/FiberManagerMap.h>
31 #include <folly/fibers/GenericBaton.h>
32 #include <folly/fibers/Semaphore.h>
33 #include <folly/fibers/SimpleLoopController.h>
34 #include <folly/fibers/TimedMutex.h>
35 #include <folly/fibers/WhenN.h>
36 #include <folly/io/async/ScopedEventBaseThread.h>
37 #include <folly/portability/GTest.h>
39 using namespace folly::fibers;
// Timeout path of timed_wait() driven by a SimpleLoopController: one task
// waits 230 ms and expects `iterations` == 5, another waits 130 ms and
// expects `iterations` == 3. Each task stops the loop controller afterwards.
43 TEST(FiberManager, batonTimedWaitTimeout) {
44 bool taskAdded = false;
45 size_t iterations = 0;
47 FiberManager manager(std::make_unique<SimpleLoopController>());
48 auto& loopController =
49 dynamic_cast<SimpleLoopController&>(manager.loopController());
51 auto loopFunc = [&]() {
53 manager.addTask([&]() {
56 auto res = baton.timed_wait(std::chrono::milliseconds(230));
59 EXPECT_EQ(5, iterations);
61 loopController.stop();
63 manager.addTask([&]() {
66 auto res = baton.timed_wait(std::chrono::milliseconds(130));
69 EXPECT_EQ(3, iterations);
71 loopController.stop();
// 50 ms sleep; the expected counts above (5 for 230 ms, 3 for 130 ms) are
// consistent with one such sleep per counted iteration.
75 std::this_thread::sleep_for(std::chrono::milliseconds(50));
80 loopController.loop(std::move(loopFunc));
// Post path of timed_wait() driven by a SimpleLoopController: the task waits
// up to 130 ms and expects `iterations` == 2 once the wait has returned.
83 TEST(FiberManager, batonTimedWaitPost) {
84 bool taskAdded = false;
85 size_t iterations = 0;
88 FiberManager manager(std::make_unique<SimpleLoopController>());
89 auto& loopController =
90 dynamic_cast<SimpleLoopController&>(manager.loopController());
92 auto loopFunc = [&]() {
94 manager.addTask([&]() {
98 auto res = baton.timed_wait(std::chrono::milliseconds(130));
101 EXPECT_EQ(2, iterations);
103 loopController.stop();
107 std::this_thread::sleep_for(std::chrono::milliseconds(50));
// NOTE(review): the body of this branch is not visible here; presumably it
// posts the baton on the second iteration -- confirm.
109 if (iterations == 2) {
115 loopController.loop(std::move(loopFunc));
// Timeout path of timed_wait() with an EventBaseLoopController attached to a
// folly::EventBase. Two tasks (500 ms and 250 ms) each time their own wait and
// expect the elapsed time to lie within 50 ms of the requested timeout.
118 TEST(FiberManager, batonTimedWaitTimeoutEvb) {
119 size_t tasksComplete = 0;
121 folly::EventBase evb;
123 FiberManager manager(std::make_unique<EventBaseLoopController>());
124 dynamic_cast<EventBaseLoopController&>(manager.loopController())
125 .attachEventBase(evb);
127 auto task = [&](size_t timeout_ms) {
// Measure the wall time spent inside timed_wait().
130 auto start = EventBaseLoopController::Clock::now();
131 auto res = baton.timed_wait(std::chrono::milliseconds(timeout_ms));
132 auto finish = EventBaseLoopController::Clock::now();
137 std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
139 EXPECT_GT(duration_ms.count(), timeout_ms - 50);
140 EXPECT_LT(duration_ms.count(), timeout_ms + 50);
// Ask the event loop to terminate once both tasks have completed.
142 if (++tasksComplete == 2) {
143 evb.terminateLoopSoon();
147 evb.runInEventBaseThread([&]() {
148 manager.addTask([&]() { task(500); });
149 manager.addTask([&]() { task(250); });
154 EXPECT_EQ(2, tasksComplete);
// Post path of timed_wait() with an EventBaseLoopController: baton.post() is
// scheduled on the EventBase with a delay of 100, the fiber waits up to
// 130 ms, and the measured wait must be strictly between 95 and 110 ms.
157 TEST(FiberManager, batonTimedWaitPostEvb) {
158 size_t tasksComplete = 0;
160 folly::EventBase evb;
162 FiberManager manager(std::make_unique<EventBaseLoopController>());
163 dynamic_cast<EventBaseLoopController&>(manager.loopController())
164 .attachEventBase(evb);
166 evb.runInEventBaseThread([&]() {
167 manager.addTask([&]() {
170 evb.tryRunAfterDelay([&]() { baton.post(); }, 100);
172 auto start = EventBaseLoopController::Clock::now();
173 auto res = baton.timed_wait(std::chrono::milliseconds(130));
174 auto finish = EventBaseLoopController::Clock::now();
179 std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
181 EXPECT_TRUE(duration_ms.count() > 95 && duration_ms.count() < 110);
// Only one task exists, so its completion ends the event loop.
183 if (++tasksComplete == 1) {
184 evb.terminateLoopSoon();
191 EXPECT_EQ(1, tasksComplete);
// Exercises try_wait() in two scenarios: polling a baton `b` while a helper
// thread is running (first half), and polling a baton `c` that the visible
// code never posts (second half).
194 TEST(FiberManager, batonTryWait) {
195 FiberManager manager(std::make_unique<SimpleLoopController>());
197 // Check if try_wait and post work as expected
200 manager.addTask([&]() {
201 while (!b.try_wait()) {
// Helper thread sleeps 300 ms; NOTE(review): presumably it then posts `b`
// (that line is not visible here) -- confirm.
204 auto thr = std::thread([&]() {
205 std::this_thread::sleep_for(std::chrono::milliseconds(300));
209 manager.loopUntilNoReady();
214 // Check try_wait without post
215 manager.addTask([&]() {
// Bounded polling: stops when `cnt` reaches zero or try_wait() succeeds.
217 while (cnt && !c.try_wait()) {
220 EXPECT_TRUE(!c.try_wait()); // must still hold
224 manager.loopUntilNoReady();
// A fiber blocks (tracked through `fiberRunning`) while a helper thread sleeps
// 300 ms; the main thread keeps running the manager until the fiber clears
// the flag.
227 TEST(FiberManager, genericBatonFiberWait) {
228 FiberManager manager(std::make_unique<SimpleLoopController>());
231 bool fiberRunning = false;
233 manager.addTask([&]() {
234 EXPECT_EQ(manager.hasActiveFiber(), true);
237 fiberRunning = false;
240 EXPECT_FALSE(fiberRunning);
241 manager.loopUntilNoReady();
242 EXPECT_TRUE(fiberRunning); // ensure fiber still active
244 auto thr = std::thread([&]() {
245 std::this_thread::sleep_for(std::chrono::milliseconds(300));
// Drive the manager until the fiber has reset the flag.
249 while (fiberRunning) {
250 manager.loopUntilNoReady();
// Counterpart of genericBatonFiberWait with the roles swapped: a plain thread
// is the waiter (tracked through the atomic `threadWaiting`) and a fiber task
// observes that the thread is still waiting when the task runs.
256 TEST(FiberManager, genericBatonThreadWait) {
257 FiberManager manager(std::make_unique<SimpleLoopController>());
259 std::atomic<bool> threadWaiting(false);
261 auto thr = std::thread([&]() {
262 threadWaiting = true;
264 threadWaiting = false;
// Wait for the thread to announce itself, then give it 300 ms more.
267 while (!threadWaiting) {
269 std::this_thread::sleep_for(std::chrono::milliseconds(300));
271 manager.addTask([&]() {
272 EXPECT_EQ(manager.hasActiveFiber(), true);
273 EXPECT_TRUE(threadWaiting);
275 while (threadWaiting) {
279 manager.loopUntilNoReady();
// addTasks() with a move-only result type: each of the three functions parks
// its promise in `pendingFibers` via await() and returns
// std::make_unique<int>(i * 2 + 1).
283 TEST(FiberManager, addTasksNoncopyable) {
284 std::vector<Promise<int>> pendingFibers;
285 bool taskAdded = false;
287 FiberManager manager(std::make_unique<SimpleLoopController>());
288 auto& loopController =
289 dynamic_cast<SimpleLoopController&>(manager.loopController());
291 auto loopFunc = [&]() {
293 manager.addTask([&]() {
294 std::vector<std::function<std::unique_ptr<int>()>> funcs;
295 for (int i = 0; i < 3; ++i) {
296 funcs.push_back([i, &pendingFibers]() {
297 await([&pendingFibers](Promise<int> promise) {
298 pendingFibers.push_back(std::move(promise));
300 return std::make_unique<int>(i * 2 + 1);
304 auto iter = addTasks(funcs.begin(), funcs.end());
307 while (iter.hasNext()) {
308 auto result = iter.awaitNext();
// The result must match the ID of the task that produced it.
309 EXPECT_EQ(2 * iter.getTaskID() + 1, *result);
310 EXPECT_GE(2 - n, pendingFibers.size());
// Fulfil the most recently parked promise, one at a time.
316 } else if (pendingFibers.size()) {
317 pendingFibers.back().setValue(0);
318 pendingFibers.pop_back();
320 loopController.stop();
324 loopController.loop(std::move(loopFunc));
// Covers ExpectedException being thrown in connection with await(): once
// directly after an await() callback opens, and once after the promise has
// been moved into a callback queued on the EventBase.
// NOTE(review): the surrounding expectations are not visible here.
327 TEST(FiberManager, awaitThrow) {
328 folly::EventBase evb;
329 struct ExpectedException {};
333 await([](Promise<int> p) {
335 throw ExpectedException();
340 await([&](Promise<int> p) {
341 evb.runInEventBaseThread([p = std::move(p)]() mutable {
344 throw ExpectedException();
// addTasks() where task functions can throw std::runtime_error("Runtime").
// The visible expectations tie a successful awaitNext() to odd task IDs
// (value 2 * id + 1) and the other branch to even task IDs.
351 TEST(FiberManager, addTasksThrow) {
352 std::vector<Promise<int>> pendingFibers;
353 bool taskAdded = false;
355 FiberManager manager(std::make_unique<SimpleLoopController>());
356 auto& loopController =
357 dynamic_cast<SimpleLoopController&>(manager.loopController());
359 auto loopFunc = [&]() {
361 manager.addTask([&]() {
362 std::vector<std::function<int()>> funcs;
363 for (size_t i = 0; i < 3; ++i) {
364 funcs.push_back([i, &pendingFibers]() {
365 await([&pendingFibers](Promise<int> promise) {
366 pendingFibers.push_back(std::move(promise));
369 throw std::runtime_error("Runtime");
375 auto iter = addTasks(funcs.begin(), funcs.end());
378 while (iter.hasNext()) {
380 int result = iter.awaitNext();
381 EXPECT_EQ(1, iter.getTaskID() % 2);
382 EXPECT_EQ(2 * iter.getTaskID() + 1, result);
// NOTE(review): presumably reached through a catch handler for the thrown
// runtime_error -- the handler line is not visible here.
384 EXPECT_EQ(0, iter.getTaskID() % 2);
386 EXPECT_GE(2 - n, pendingFibers.size());
// Fulfil the most recently parked promise, one at a time.
392 } else if (pendingFibers.size()) {
393 pendingFibers.back().setValue(0);
394 pendingFibers.pop_back();
396 loopController.stop();
400 loopController.loop(std::move(loopFunc));
// addTasks() with void-returning functions: each function parks its promise in
// `pendingFibers` via await(), and the iterator is drained with hasNext().
403 TEST(FiberManager, addTasksVoid) {
404 std::vector<Promise<int>> pendingFibers;
405 bool taskAdded = false;
407 FiberManager manager(std::make_unique<SimpleLoopController>());
408 auto& loopController =
409 dynamic_cast<SimpleLoopController&>(manager.loopController());
411 auto loopFunc = [&]() {
413 manager.addTask([&]() {
414 std::vector<std::function<void()>> funcs;
415 for (size_t i = 0; i < 3; ++i) {
416 funcs.push_back([&pendingFibers]() {
417 await([&pendingFibers](Promise<int> promise) {
418 pendingFibers.push_back(std::move(promise));
423 auto iter = addTasks(funcs.begin(), funcs.end());
426 while (iter.hasNext()) {
428 EXPECT_GE(2 - n, pendingFibers.size());
// Fulfil the most recently parked promise, one at a time.
434 } else if (pendingFibers.size()) {
435 pendingFibers.back().setValue(0);
436 pendingFibers.pop_back();
438 loopController.stop();
442 loopController.loop(std::move(loopFunc));
// addTasks() with void-returning functions that can throw
// std::runtime_error(""). The visible expectations associate one branch with
// odd task IDs and the other with even task IDs.
445 TEST(FiberManager, addTasksVoidThrow) {
446 std::vector<Promise<int>> pendingFibers;
447 bool taskAdded = false;
449 FiberManager manager(std::make_unique<SimpleLoopController>());
450 auto& loopController =
451 dynamic_cast<SimpleLoopController&>(manager.loopController());
453 auto loopFunc = [&]() {
455 manager.addTask([&]() {
456 std::vector<std::function<void()>> funcs;
457 for (size_t i = 0; i < 3; ++i) {
458 funcs.push_back([i, &pendingFibers]() {
459 await([&pendingFibers](Promise<int> promise) {
460 pendingFibers.push_back(std::move(promise));
463 throw std::runtime_error("");
468 auto iter = addTasks(funcs.begin(), funcs.end());
471 while (iter.hasNext()) {
474 EXPECT_EQ(1, iter.getTaskID() % 2);
// NOTE(review): presumably the exception path -- the handler line is not
// visible here.
476 EXPECT_EQ(0, iter.getTaskID() % 2);
478 EXPECT_GE(2 - n, pendingFibers.size());
// Fulfil the most recently parked promise, one at a time.
484 } else if (pendingFibers.size()) {
485 pendingFibers.back().setValue(0);
486 pendingFibers.pop_back();
488 loopController.stop();
492 loopController.loop(std::move(loopFunc));
// Checks the hasCompleted() / hasPending() / hasNext() state of the iterator
// returned by addTasks() at four successive points, ending with all three
// reporting false.
// NOTE(review): the calls between the four groups of expectations are not
// visible here.
495 TEST(FiberManager, addTasksReserve) {
496 std::vector<Promise<int>> pendingFibers;
497 bool taskAdded = false;
499 FiberManager manager(std::make_unique<SimpleLoopController>());
500 auto& loopController =
501 dynamic_cast<SimpleLoopController&>(manager.loopController());
503 auto loopFunc = [&]() {
505 manager.addTask([&]() {
506 std::vector<std::function<void()>> funcs;
507 for (size_t i = 0; i < 3; ++i) {
508 funcs.push_back([&pendingFibers]() {
509 await([&pendingFibers](Promise<int> promise) {
510 pendingFibers.push_back(std::move(promise));
515 auto iter = addTasks(funcs.begin(), funcs.end());
// First checkpoint: completed and pending results both available.
518 EXPECT_TRUE(iter.hasCompleted());
519 EXPECT_TRUE(iter.hasPending());
520 EXPECT_TRUE(iter.hasNext());
// Second checkpoint: same state as the first.
523 EXPECT_TRUE(iter.hasCompleted());
524 EXPECT_TRUE(iter.hasPending());
525 EXPECT_TRUE(iter.hasNext());
// Third checkpoint: nothing completed, but work is still pending.
528 EXPECT_FALSE(iter.hasCompleted());
529 EXPECT_TRUE(iter.hasPending());
530 EXPECT_TRUE(iter.hasNext());
// Final checkpoint: iterator exhausted.
533 EXPECT_FALSE(iter.hasCompleted());
534 EXPECT_FALSE(iter.hasPending());
535 EXPECT_FALSE(iter.hasNext());
// Fulfil the most recently parked promise, one at a time.
538 } else if (pendingFibers.size()) {
539 pendingFibers.back().setValue(0);
540 pendingFibers.pop_back();
542 loopController.stop();
546 loopController.loop(std::move(loopFunc));
// Adds tasks to a TaskIterator<size_t> incrementally. Each task blocks on
// batons[taskId]; tasks 0 and 1 are added first, task 2 is added after the
// first result, and awaitNext() is expected to yield 1, then 2, then 0.
549 TEST(FiberManager, addTaskDynamic) {
550 folly::EventBase evb;
554 auto makeTask = [&](size_t taskId) {
555 return [&, taskId]() -> size_t {
556 batons[taskId].wait();
562 .addTaskFuture([&]() {
563 TaskIterator<size_t> iterator;
565 iterator.addTask(makeTask(0));
566 iterator.addTask(makeTask(1));
570 EXPECT_EQ(1, iterator.awaitNext());
// A task may be added after results have already been consumed.
572 iterator.addTask(makeTask(2));
576 EXPECT_EQ(2, iterator.awaitNext());
580 EXPECT_EQ(0, iterator.awaitNext());
// forEach() over three int-returning functions: the callback records
// (id, result) pairs, and afterwards three results are expected, no promises
// remain pending, and each result equals id * 2 + 1.
585 TEST(FiberManager, forEach) {
586 std::vector<Promise<int>> pendingFibers;
587 bool taskAdded = false;
589 FiberManager manager(std::make_unique<SimpleLoopController>());
590 auto& loopController =
591 dynamic_cast<SimpleLoopController&>(manager.loopController());
593 auto loopFunc = [&]() {
595 manager.addTask([&]() {
596 std::vector<std::function<int()>> funcs;
597 for (size_t i = 0; i < 3; ++i) {
598 funcs.push_back([i, &pendingFibers]() {
599 await([&pendingFibers](Promise<int> promise) {
600 pendingFibers.push_back(std::move(promise));
606 std::vector<std::pair<size_t, int>> results;
607 forEach(funcs.begin(), funcs.end(), [&results](size_t id, int result) {
608 results.emplace_back(id, result);
610 EXPECT_EQ(3, results.size());
611 EXPECT_TRUE(pendingFibers.empty());
612 for (size_t i = 0; i < 3; ++i) {
613 EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
// Fulfil the most recently parked promise, one at a time.
617 } else if (pendingFibers.size()) {
618 pendingFibers.back().setValue(0);
619 pendingFibers.pop_back();
621 loopController.stop();
625 loopController.loop(std::move(loopFunc));
// collectN() with n = 2 over three int-returning functions: two results are
// expected back while one promise is still pending, and each
// (id, value) pair must satisfy value == id * 2 + 1.
628 TEST(FiberManager, collectN) {
629 std::vector<Promise<int>> pendingFibers;
630 bool taskAdded = false;
632 FiberManager manager(std::make_unique<SimpleLoopController>());
633 auto& loopController =
634 dynamic_cast<SimpleLoopController&>(manager.loopController());
636 auto loopFunc = [&]() {
638 manager.addTask([&]() {
639 std::vector<std::function<int()>> funcs;
640 for (size_t i = 0; i < 3; ++i) {
641 funcs.push_back([i, &pendingFibers]() {
642 await([&pendingFibers](Promise<int> promise) {
643 pendingFibers.push_back(std::move(promise));
649 auto results = collectN(funcs.begin(), funcs.end(), 2);
650 EXPECT_EQ(2, results.size());
651 EXPECT_EQ(1, pendingFibers.size());
652 for (size_t i = 0; i < 2; ++i) {
653 EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
// Fulfil the most recently parked promise, one at a time.
657 } else if (pendingFibers.size()) {
658 pendingFibers.back().setValue(0);
659 pendingFibers.pop_back();
661 loopController.stop();
665 loopController.loop(std::move(loopFunc));
// collectN() with n = 2 where the task functions throw
// std::runtime_error("Runtime"); one promise is expected to remain pending
// afterwards.
668 TEST(FiberManager, collectNThrow) {
669 std::vector<Promise<int>> pendingFibers;
670 bool taskAdded = false;
672 FiberManager manager(std::make_unique<SimpleLoopController>());
673 auto& loopController =
674 dynamic_cast<SimpleLoopController&>(manager.loopController());
676 auto loopFunc = [&]() {
678 manager.addTask([&]() {
679 std::vector<std::function<int()>> funcs;
680 for (size_t i = 0; i < 3; ++i) {
681 funcs.push_back([&pendingFibers]() -> size_t {
682 await([&pendingFibers](Promise<int> promise) {
683 pendingFibers.push_back(std::move(promise));
685 throw std::runtime_error("Runtime");
// Return value deliberately unused.
// NOTE(review): presumably wrapped in a try block whose lines are not visible
// here -- confirm.
690 collectN(funcs.begin(), funcs.end(), 2);
692 EXPECT_EQ(1, pendingFibers.size());
// Fulfil the most recently parked promise, one at a time.
696 } else if (pendingFibers.size()) {
697 pendingFibers.back().setValue(0);
698 pendingFibers.pop_back();
700 loopController.stop();
704 loopController.loop(std::move(loopFunc));
// collectN() with n = 2 over three void-returning functions: two results are
// expected back while one promise is still pending.
707 TEST(FiberManager, collectNVoid) {
708 std::vector<Promise<int>> pendingFibers;
709 bool taskAdded = false;
711 FiberManager manager(std::make_unique<SimpleLoopController>());
712 auto& loopController =
713 dynamic_cast<SimpleLoopController&>(manager.loopController());
715 auto loopFunc = [&]() {
717 manager.addTask([&]() {
718 std::vector<std::function<void()>> funcs;
719 for (size_t i = 0; i < 3; ++i) {
720 funcs.push_back([&pendingFibers]() {
721 await([&pendingFibers](Promise<int> promise) {
722 pendingFibers.push_back(std::move(promise));
727 auto results = collectN(funcs.begin(), funcs.end(), 2);
728 EXPECT_EQ(2, results.size());
729 EXPECT_EQ(1, pendingFibers.size());
// Fulfil the most recently parked promise, one at a time.
732 } else if (pendingFibers.size()) {
733 pendingFibers.back().setValue(0);
734 pendingFibers.pop_back();
736 loopController.stop();
740 loopController.loop(std::move(loopFunc));
// collectN() with n = 2 over void-returning functions that throw
// std::runtime_error("Runtime"); one promise is expected to remain pending
// afterwards.
743 TEST(FiberManager, collectNVoidThrow) {
744 std::vector<Promise<int>> pendingFibers;
745 bool taskAdded = false;
747 FiberManager manager(std::make_unique<SimpleLoopController>());
748 auto& loopController =
749 dynamic_cast<SimpleLoopController&>(manager.loopController());
751 auto loopFunc = [&]() {
753 manager.addTask([&]() {
754 std::vector<std::function<void()>> funcs;
755 for (size_t i = 0; i < 3; ++i) {
756 funcs.push_back([&pendingFibers]() {
757 await([&pendingFibers](Promise<int> promise) {
758 pendingFibers.push_back(std::move(promise));
760 throw std::runtime_error("Runtime");
// NOTE(review): presumably wrapped in a try block whose lines are not visible
// here -- confirm.
765 collectN(funcs.begin(), funcs.end(), 2);
767 EXPECT_EQ(1, pendingFibers.size());
// Fulfil the most recently parked promise, one at a time.
771 } else if (pendingFibers.size()) {
772 pendingFibers.back().setValue(0);
773 pendingFibers.pop_back();
775 loopController.stop();
779 loopController.loop(std::move(loopFunc));
// collectAll() over three int-returning functions: afterwards no promise may
// remain pending and results[i] must equal i * 2 + 1 for every index.
782 TEST(FiberManager, collectAll) {
783 std::vector<Promise<int>> pendingFibers;
784 bool taskAdded = false;
786 FiberManager manager(std::make_unique<SimpleLoopController>());
787 auto& loopController =
788 dynamic_cast<SimpleLoopController&>(manager.loopController());
790 auto loopFunc = [&]() {
792 manager.addTask([&]() {
793 std::vector<std::function<int()>> funcs;
794 for (size_t i = 0; i < 3; ++i) {
795 funcs.push_back([i, &pendingFibers]() {
796 await([&pendingFibers](Promise<int> promise) {
797 pendingFibers.push_back(std::move(promise));
803 auto results = collectAll(funcs.begin(), funcs.end());
804 EXPECT_TRUE(pendingFibers.empty());
805 for (size_t i = 0; i < 3; ++i) {
806 EXPECT_EQ(i * 2 + 1, results[i]);
// Fulfil the most recently parked promise, one at a time.
810 } else if (pendingFibers.size()) {
811 pendingFibers.back().setValue(0);
812 pendingFibers.pop_back();
814 loopController.stop();
818 loopController.loop(std::move(loopFunc));
// collectAll() over three void-returning functions: afterwards no promise may
// remain pending.
821 TEST(FiberManager, collectAllVoid) {
822 std::vector<Promise<int>> pendingFibers;
823 bool taskAdded = false;
825 FiberManager manager(std::make_unique<SimpleLoopController>());
826 auto& loopController =
827 dynamic_cast<SimpleLoopController&>(manager.loopController());
829 auto loopFunc = [&]() {
831 manager.addTask([&]() {
832 std::vector<std::function<void()>> funcs;
833 for (size_t i = 0; i < 3; ++i) {
834 funcs.push_back([&pendingFibers]() {
835 await([&pendingFibers](Promise<int> promise) {
836 pendingFibers.push_back(std::move(promise));
841 collectAll(funcs.begin(), funcs.end());
842 EXPECT_TRUE(pendingFibers.empty());
// Fulfil the most recently parked promise, one at a time.
845 } else if (pendingFibers.size()) {
846 pendingFibers.back().setValue(0);
847 pendingFibers.pop_back();
849 loopController.stop();
853 loopController.loop(std::move(loopFunc));
// collectAny() over three int-returning functions: the winner is expected to
// be task 2 with value 2 * 2 + 1, with two promises still pending.
856 TEST(FiberManager, collectAny) {
857 std::vector<Promise<int>> pendingFibers;
858 bool taskAdded = false;
860 FiberManager manager(std::make_unique<SimpleLoopController>());
861 auto& loopController =
862 dynamic_cast<SimpleLoopController&>(manager.loopController());
864 auto loopFunc = [&]() {
866 manager.addTask([&]() {
867 std::vector<std::function<int()>> funcs;
868 for (size_t i = 0; i < 3; ++i) {
869 funcs.push_back([i, &pendingFibers]() {
870 await([&pendingFibers](Promise<int> promise) {
871 pendingFibers.push_back(std::move(promise));
// NOTE(review): the condition guarding this throw is not visible here.
874 throw std::runtime_error("This exception will be ignored");
880 auto result = collectAny(funcs.begin(), funcs.end());
881 EXPECT_EQ(2, pendingFibers.size());
882 EXPECT_EQ(2, result.first);
883 EXPECT_EQ(2 * 2 + 1, result.second);
// Fulfil the most recently parked promise, one at a time.
886 } else if (pendingFibers.size()) {
887 pendingFibers.back().setValue(0);
888 pendingFibers.pop_back();
890 loopController.stop();
894 loopController.loop(std::move(loopFunc));
898 /* Checks that this function was run from a main context,
899 by comparing an address on a stack to a known main stack address
900 and a known related fiber stack address. The assumption
901 is that fiber stack and main stack will be far enough apart,
902 while any two values on the same stack will be close. */
// `ran` is taken by non-const reference; `mainLocation` and `fiberLocation`
// are reference addresses, and some callers in this file pass nullptr for
// `fiberLocation`.
903 void expectMainContext(bool& ran, int* mainLocation, int* fiberLocation) {
905 /* 2 pages is a good guess */
// DISTANCE is expressed in units of int because it is compared against
// int* differences.
906 constexpr ssize_t DISTANCE = 0x2000 / sizeof(int);
// Far from the fiber stack address ...
908 EXPECT_TRUE(std::abs(&here - fiberLocation) > DISTANCE);
// ... and close to the main stack address.
911 EXPECT_TRUE(std::abs(&here - mainLocation) < DISTANCE);
// Exercises runInMainContext() twice: through the FiberManager member (with no
// fiber address to compare against), and through the free function called
// from inside a task, whose callback returns an object with `value` == 42.
919 TEST(FiberManager, runInMainContext) {
920 FiberManager manager(std::make_unique<SimpleLoopController>());
921 auto& loopController =
922 dynamic_cast<SimpleLoopController&>(manager.loopController());
924 bool checkRan = false;
927 manager.runInMainContext(
928 [&]() { expectMainContext(checkRan, &mainLocation, nullptr); });
929 EXPECT_TRUE(checkRan);
933 manager.addTask([&]() {
// A is non-copyable (copy constructor deleted).
935 explicit A(int value_) : value(value_) {}
936 A(const A&) = delete;
942 auto ret = runInMainContext([&]() {
943 expectMainContext(checkRan, &mainLocation, &stackLocation);
946 EXPECT_TRUE(checkRan);
947 EXPECT_EQ(42, ret.value);
950 loopController.loop([&]() { loopController.stop(); });
952 EXPECT_TRUE(checkRan);
// addTaskFinally(): the task returns 1234 and the finally callback receives
// that value in a Try<int>, then checks it is running in the main context.
// The callback must not have run before the loop is driven.
955 TEST(FiberManager, addTaskFinally) {
956 FiberManager manager(std::make_unique<SimpleLoopController>());
957 auto& loopController =
958 dynamic_cast<SimpleLoopController&>(manager.loopController());
960 bool checkRan = false;
964 manager.addTaskFinally(
965 [&]() { return 1234; },
966 [&](Try<int>&& result) {
967 EXPECT_EQ(result.value(), 1234);
969 expectMainContext(checkRan, &mainLocation, nullptr);
// Nothing has executed yet: the loop has not been run.
972 EXPECT_FALSE(checkRan);
974 loopController.loop([&]() { loopController.stop(); });
976 EXPECT_TRUE(checkRan);
// With maxFibersPoolSize = 5, runs two batches of five tasks and expects
// fibersAllocated() and fibersPoolSize() to both be 5 after each batch, i.e.
// the second batch does not raise the allocation count.
979 TEST(FiberManager, fibersPoolWithinLimit) {
980 FiberManager::Options opts;
981 opts.maxFibersPoolSize = 5;
983 FiberManager manager(std::make_unique<SimpleLoopController>(), opts);
984 auto& loopController =
985 dynamic_cast<SimpleLoopController&>(manager.loopController());
987 size_t fibersRun = 0;
// First batch.
989 for (size_t i = 0; i < 5; ++i) {
990 manager.addTask([&]() { ++fibersRun; });
992 loopController.loop([&]() { loopController.stop(); });
994 EXPECT_EQ(5, fibersRun);
995 EXPECT_EQ(5, manager.fibersAllocated());
996 EXPECT_EQ(5, manager.fibersPoolSize());
// Second batch.
998 for (size_t i = 0; i < 5; ++i) {
999 manager.addTask([&]() { ++fibersRun; });
1001 loopController.loop([&]() { loopController.stop(); });
1003 EXPECT_EQ(10, fibersRun);
1004 EXPECT_EQ(5, manager.fibersAllocated());
1005 EXPECT_EQ(5, manager.fibersPoolSize());
// With maxFibersPoolSize = 5, queues ten tasks: before the loop runs, ten
// fibers are allocated and the pool is empty; after the loop, all ten have run
// and both fibersAllocated() and fibersPoolSize() are expected to be 5.
1008 TEST(FiberManager, fibersPoolOverLimit) {
1009 FiberManager::Options opts;
1010 opts.maxFibersPoolSize = 5;
1012 FiberManager manager(std::make_unique<SimpleLoopController>(), opts);
1013 auto& loopController =
1014 dynamic_cast<SimpleLoopController&>(manager.loopController());
1016 size_t fibersRun = 0;
1018 for (size_t i = 0; i < 10; ++i) {
1019 manager.addTask([&]() { ++fibersRun; });
// State before any task has executed.
1022 EXPECT_EQ(0, fibersRun);
1023 EXPECT_EQ(10, manager.fibersAllocated());
1024 EXPECT_EQ(0, manager.fibersPoolSize());
1026 loopController.loop([&]() { loopController.stop(); });
// State after the loop has run.
1028 EXPECT_EQ(10, fibersRun);
1029 EXPECT_EQ(5, manager.fibersAllocated());
1030 EXPECT_EQ(5, manager.fibersPoolSize());
// Two local tasks park their promises in `savedPromise`; two other threads
// then fulfil them with 42 and 43. The results stay 0 until the manager loop
// is run again, and remoteScheduleCalled() is expected to be exactly 1.
1033 TEST(FiberManager, remoteFiberBasic) {
1034 FiberManager manager(std::make_unique<SimpleLoopController>());
1035 auto& loopController =
1036 dynamic_cast<SimpleLoopController&>(manager.loopController());
1039 result[0] = result[1] = 0;
1040 folly::Optional<Promise<int>> savedPromise[2];
1041 manager.addTask([&]() {
1043 [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1045 manager.addTask([&]() {
1047 [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1050 manager.loopUntilNoReady();
// Both promises were captured, but no result has been produced yet.
1052 EXPECT_TRUE(savedPromise[0].hasValue());
1053 EXPECT_TRUE(savedPromise[1].hasValue());
1054 EXPECT_EQ(0, result[0]);
1055 EXPECT_EQ(0, result[1]);
// Fulfil the promises from other threads.
1057 std::thread remoteThread0{[&]() { savedPromise[0]->setValue(42); }};
1058 std::thread remoteThread1{[&]() { savedPromise[1]->setValue(43); }};
1059 remoteThread0.join();
1060 remoteThread1.join();
// Values are not visible until the manager loop runs again.
1061 EXPECT_EQ(0, result[0]);
1062 EXPECT_EQ(0, result[1]);
1063 /* Should only have scheduled once */
1064 EXPECT_EQ(1, loopController.remoteScheduleCalled());
1066 manager.loopUntilNoReady();
1067 EXPECT_EQ(42, result[0]);
1068 EXPECT_EQ(43, result[1]);
// Two threads each add a task through addTaskRemote(); the tasks park their
// promises in `savedPromise`. The promises are then fulfilled with 42 and 43
// on the main thread, and the results appear only after the manager loop is
// run again.
1071 TEST(FiberManager, addTaskRemoteBasic) {
1072 FiberManager manager(std::make_unique<SimpleLoopController>());
1075 result[0] = result[1] = 0;
1076 folly::Optional<Promise<int>> savedPromise[2];
1078 std::thread remoteThread0{[&]() {
1079 manager.addTaskRemote([&]() {
1081 [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1084 std::thread remoteThread1{[&]() {
1085 manager.addTaskRemote([&]() {
1087 [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1090 remoteThread0.join();
1091 remoteThread1.join();
1093 manager.loopUntilNoReady();
// Both promises were captured, but no result has been produced yet.
1095 EXPECT_TRUE(savedPromise[0].hasValue());
1096 EXPECT_TRUE(savedPromise[1].hasValue());
1097 EXPECT_EQ(0, result[0]);
1098 EXPECT_EQ(0, result[1]);
1100 savedPromise[0]->setValue(42);
1101 savedPromise[1]->setValue(43);
// Values are not visible until the manager loop runs again.
1103 EXPECT_EQ(0, result[0]);
1104 EXPECT_EQ(0, result[1]);
1106 manager.loopUntilNoReady();
1107 EXPECT_EQ(42, result[0]);
1108 EXPECT_EQ(43, result[1]);
// A task added from another thread via addTaskRemote() increments `counter`;
// the manager is driven while hasTasks() is true, after which hasTasks() must
// be false and the counter must be 1.
1111 TEST(FiberManager, remoteHasTasks) {
1113 FiberManager fm(std::make_unique<SimpleLoopController>());
1114 std::thread remote([&]() { fm.addTaskRemote([&]() { ++counter; }); });
1118 while (fm.hasTasks()) {
1119 fm.loopUntilNoReady();
1122 EXPECT_FALSE(fm.hasTasks());
1123 EXPECT_EQ(counter, 1);
// Tracks hasTasks() across the life of a remotely added task that parks its
// promise: it stays true until the promise has been fulfilled (with 47, from a
// second thread) and the manager loop has run again.
1126 TEST(FiberManager, remoteHasReadyTasks) {
1128 folly::Optional<Promise<int>> savedPromise;
1129 FiberManager fm(std::make_unique<SimpleLoopController>());
1130 std::thread remote([&]() {
1131 fm.addTaskRemote([&]() {
1133 [&](Promise<int> promise) { savedPromise = std::move(promise); });
1134 EXPECT_TRUE(fm.hasTasks());
1139 EXPECT_TRUE(fm.hasTasks());
1141 fm.loopUntilNoReady();
// The task is still outstanding: its promise has not been fulfilled.
1142 EXPECT_TRUE(fm.hasTasks());
1144 std::thread remote2([&]() { savedPromise->setValue(47); });
1146 EXPECT_TRUE(fm.hasTasks());
1148 fm.loopUntilNoReady();
1149 EXPECT_FALSE(fm.hasTasks());
1151 EXPECT_EQ(result, 47);
// Shared body for the fiber-local tests, parameterised on the local data type.
// `Data` must expose a `value` member; the first read in each scenario below
// expects 42.
// NOTE(review): the task boundaries are not visible here, so which
// expectations belong to which task should be confirmed against the full
// source.
1154 template <typename Data>
1155 void testFiberLocal() {
1156 FiberManager fm(LocalType<Data>(), std::make_unique<SimpleLoopController>());
1159 EXPECT_EQ(42, local<Data>().value);
1161 local<Data>().value = 43;
1164 EXPECT_EQ(43, local<Data>().value);
1166 local<Data>().value = 44;
// A task added via the free addTask() is expected to observe 44.
1168 addTask([]() { EXPECT_EQ(44, local<Data>().value); });
1173 EXPECT_EQ(42, local<Data>().value);
1175 local<Data>().value = 43;
// A task added via addTaskRemote() is expected to observe 43.
1177 fm.addTaskRemote([]() { EXPECT_EQ(43, local<Data>().value); });
1181 EXPECT_EQ(42, local<Data>().value);
1182 local<Data>().value = 43;
1185 EXPECT_EQ(43, local<Data>().value);
1186 local<Data>().value = 44;
1188 std::vector<std::function<void()>> tasks{task};
1189 collectAny(tasks.begin(), tasks.end());
// After collectAny() the value read here is expected to be 43, not 44.
1191 EXPECT_EQ(43, local<Data>().value);
1194 fm.loopUntilNoReady();
1195 EXPECT_FALSE(fm.hasTasks());
// Runs the shared fiber-local scenario with SimpleData (its definition is not
// visible here).
1198 TEST(FiberManager, fiberLocal) {
1203 testFiberLocal<SimpleData>();
// Runs the shared fiber-local scenario with LargeData.
// NOTE(review): the 1 MiB char array below is presumably a LargeData member,
// making the type too big for in-fiber storage -- confirm.
1206 TEST(FiberManager, fiberLocalHeap) {
1208 char _[1024 * 1024];
1212 testFiberLocal<LargeData>();
// Fiber-local data of type CrazyData: a task sets `data` to 41, and elsewhere
// `data` is expected to be 42 and is then reset to 0.
// NOTE(review): given the test name, the 42 check is presumably inside
// CrazyData's destructor, but that context is not visible here.
1215 TEST(FiberManager, fiberLocalDestructor) {
1222 EXPECT_EQ(42, local<CrazyData>().data);
1223 // Make sure we don't have infinite loop
1224 local<CrazyData>().data = 0;
1231 LocalType<CrazyData>(), std::make_unique<SimpleLoopController>());
1233 fm.addTask([]() { local<CrazyData>().data = 41; });
1235 fm.loopUntilNoReady();
1236 EXPECT_FALSE(fm.hasTasks());
// Adds one task, drives the loop controller until the loop function stops it,
// and expects `checkRan` to have been set.
// NOTE(review): the task body (presumably a yield followed by setting
// `checkRan`) is not visible here.
1239 TEST(FiberManager, yieldTest) {
1240 FiberManager manager(std::make_unique<SimpleLoopController>());
1241 auto& loopController =
1242 dynamic_cast<SimpleLoopController&>(manager.loopController());
1244 bool checkRan = false;
1246 manager.addTask([&]() {
1251 loopController.loop([&]() {
1253 loopController.stop();
1257 EXPECT_TRUE(checkRan);
// Checks folly::RequestContext handling across four ways of adding work. Each
// scenario captures the context that is current when the task is added and
// expects RequestContext::get() to return that same context inside the task.
// The final section expects the caller's own context to be unchanged around
// every loopUntilNoReady() call.
1260 TEST(FiberManager, RequestContext) {
1261 FiberManager fm(std::make_unique<SimpleLoopController>());
1263 bool checkRun1 = false;
1264 bool checkRun2 = false;
1265 bool checkRun3 = false;
1266 bool checkRun4 = false;
1267 folly::fibers::Baton baton1;
1268 folly::fibers::Baton baton2;
1269 folly::fibers::Baton baton3;
1270 folly::fibers::Baton baton4;
// Scenario 1: addTask().
1273 folly::RequestContextScopeGuard rctx;
1274 auto rcontext1 = folly::RequestContext::get();
1275 fm.addTask([&, rcontext1]() {
1276 EXPECT_EQ(rcontext1, folly::RequestContext::get());
1278 [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
1279 EXPECT_EQ(rcontext1, folly::RequestContext::get());
1281 [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
// Scenario 2: addTaskRemote().
1286 folly::RequestContextScopeGuard rctx;
1287 auto rcontext2 = folly::RequestContext::get();
1288 fm.addTaskRemote([&, rcontext2]() {
1289 EXPECT_EQ(rcontext2, folly::RequestContext::get());
1291 EXPECT_EQ(rcontext2, folly::RequestContext::get());
// Scenario 3: a task returning folly::Unit followed by a callback that takes
// Try<folly::Unit>.
1296 folly::RequestContextScopeGuard rctx;
1297 auto rcontext3 = folly::RequestContext::get();
1300 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1302 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1304 return folly::Unit();
1306 [&, rcontext3](Try<folly::Unit>&& /* t */) {
1307 EXPECT_EQ(rcontext3, folly::RequestContext::get());
// Scenario 4: starts from a cleared (null) context before the new scope guard
// is created.
1312 folly::RequestContext::setContext(nullptr);
1314 folly::RequestContextScopeGuard rctx;
1315 auto rcontext4 = folly::RequestContext::get();
1317 EXPECT_EQ(rcontext4, folly::RequestContext::get());
// Driver section: the caller's context must stay the same across each loop
// run.
1322 folly::RequestContextScopeGuard rctx;
1323 auto rcontext = folly::RequestContext::get();
1325 fm.loopUntilNoReady();
1326 EXPECT_EQ(rcontext, folly::RequestContext::get());
1329 EXPECT_EQ(rcontext, folly::RequestContext::get());
1330 fm.loopUntilNoReady();
1331 EXPECT_TRUE(checkRun1);
1332 EXPECT_EQ(rcontext, folly::RequestContext::get());
1335 EXPECT_EQ(rcontext, folly::RequestContext::get());
1336 fm.loopUntilNoReady();
1337 EXPECT_TRUE(checkRun2);
1338 EXPECT_EQ(rcontext, folly::RequestContext::get());
1341 EXPECT_EQ(rcontext, folly::RequestContext::get());
1342 fm.loopUntilNoReady();
1343 EXPECT_TRUE(checkRun3);
1344 EXPECT_EQ(rcontext, folly::RequestContext::get());
1347 EXPECT_EQ(rcontext, folly::RequestContext::get());
1348 fm.loopUntilNoReady();
1349 EXPECT_TRUE(checkRun4);
1350 EXPECT_EQ(rcontext, folly::RequestContext::get());
// Periodic fiber-pool resizing with fibersPoolResizePeriodMs = 300 and
// maxFibersPoolSize = 5. Thirty tasks are added (the first ten tied to
// batons), and the expectations follow fibersAllocated() / fibersPoolSize()
// through successive 400 ms sleeps: 30/20, 30/20, 15/5, 15/15, 10/10 and
// finally 5/5.
1354 TEST(FiberManager, resizePeriodically) {
1355 FiberManager::Options opts;
1356 opts.fibersPoolResizePeriodMs = 300;
1357 opts.maxFibersPoolSize = 5;
1359 FiberManager manager(std::make_unique<EventBaseLoopController>(), opts);
1361 folly::EventBase evb;
1362 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1363 .attachEventBase(evb);
1365 std::vector<Baton> batons(10);
1367 size_t tasksRun = 0;
1368 for (size_t i = 0; i < 30; ++i) {
1369 manager.addTask([i, &batons, &tasksRun]() {
1371 // Keep some fibers active indefinitely
1372 if (i < batons.size()) {
// Nothing has run yet: all 30 fibers are allocated and none are pooled.
1378 EXPECT_EQ(0, tasksRun);
1379 EXPECT_EQ(30, manager.fibersAllocated());
1380 EXPECT_EQ(0, manager.fibersPoolSize());
1383 EXPECT_EQ(30, tasksRun);
1384 EXPECT_EQ(30, manager.fibersAllocated());
1385 // Can go over maxFibersPoolSize, 10 of 30 fibers still active
1386 EXPECT_EQ(20, manager.fibersPoolSize());
// Each 400 ms sleep is longer than the 300 ms resize period.
1388 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1389 evb.loopOnce(); // no fibers active in this period
1390 EXPECT_EQ(30, manager.fibersAllocated());
1391 EXPECT_EQ(20, manager.fibersPoolSize());
1393 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1394 evb.loopOnce(); // should shrink fibers pool to maxFibersPoolSize
1395 EXPECT_EQ(15, manager.fibersAllocated());
1396 EXPECT_EQ(5, manager.fibersPoolSize());
// NOTE(review): the loop body is not visible here; presumably it posts each
// baton to release the ten blocked fibers -- confirm.
1398 for (size_t i = 0; i < batons.size(); ++i) {
1402 EXPECT_EQ(15, manager.fibersAllocated());
1403 EXPECT_EQ(15, manager.fibersPoolSize());
1405 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1406 evb.loopOnce(); // 10 fibers active in last period
1407 EXPECT_EQ(10, manager.fibersAllocated());
1408 EXPECT_EQ(10, manager.fibersPoolSize());
1410 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1412 EXPECT_EQ(5, manager.fibersAllocated());
1413 EXPECT_EQ(5, manager.fibersPoolSize());
// A fiber waits on a baton with a Baton::TimeoutHandler. A 250 ms timeout is
// scheduled and the thread sleeps 500 ms; the fiber is still expected not to
// have run until the loop is driven again, after which `fibersRun` is 1.
1416 TEST(FiberManager, batonWaitTimeoutHandler) {
1417 FiberManager manager(std::make_unique<EventBaseLoopController>());
1419 folly::EventBase evb;
1420 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1421 .attachEventBase(evb);
1423 size_t fibersRun = 0;
1425 Baton::TimeoutHandler timeoutHandler;
1427 manager.addTask([&]() {
1428 baton.wait(timeoutHandler);
1431 manager.loopUntilNoReady();
// The fiber is blocked in wait(): the baton is not posted and nothing has run.
1433 EXPECT_FALSE(baton.try_wait());
1434 EXPECT_EQ(0, fibersRun);
1436 timeoutHandler.scheduleTimeout(std::chrono::milliseconds(250));
1437 std::this_thread::sleep_for(std::chrono::milliseconds(500));
// Sleeping past the timeout alone changes nothing observable here.
1439 EXPECT_FALSE(baton.try_wait());
1440 EXPECT_EQ(0, fibersRun);
1443 manager.loopUntilNoReady();
1445 EXPECT_EQ(1, fibersRun);
// Stress test: adds kNumTimeoutTasks tasks, each waiting on a baton with its
// own Baton::TimeoutHandler, and stops the event base loop once the last of
// them has returned from the wait.
1448 TEST(FiberManager, batonWaitTimeoutMany) {
1449 FiberManager manager(std::make_unique<EventBaseLoopController>());
1451 folly::EventBase evb;
1452 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1453 .attachEventBase(evb);
1455 constexpr size_t kNumTimeoutTasks = 10000;
// Number of tasks still outstanding; decremented by each task after its wait.
1456 size_t tasksCount = kNumTimeoutTasks;
1458 // We add many tasks to hit timeout queue deallocation logic.
1459 for (size_t i = 0; i < kNumTimeoutTasks; ++i) {
1460 manager.addTask([&]() {
1462 Baton::TimeoutHandler timeoutHandler;
// A nested task arms a 1000 ms timeout on this task's handler.
1464 folly::fibers::addTask([&] {
1465 timeoutHandler.scheduleTimeout(std::chrono::milliseconds(1000));
1468 baton.wait(timeoutHandler);
// The task that brings the counter to zero asks the event base to stop.
1469 if (--tasksCount == 0) {
1470 evb.terminateLoopSoon();
// Checks that values returned by tasks added with addTaskFuture() and
// addTaskRemoteFuture() match the values the tasks returned.
1478 TEST(FiberManager, remoteFutureTest) {
1479 FiberManager fiberManager(std::make_unique<SimpleLoopController>());
1480 auto& loopController =
1481 dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
// One task is added through the local API and one through the remote API.
1485 auto f1 = fiberManager.addTaskFuture([&]() { return testValue1; });
1486 auto f2 = fiberManager.addTaskRemoteFuture([&]() { return testValue2; });
// The loop function stops the controller as soon as it is invoked.
1487 loopController.loop([&]() { loopController.stop(); });
// NOTE(review): v1/v2 are defined on lines not visible here; presumably they
// are read from f1/f2 - confirm.
1491 EXPECT_EQ(v1, testValue1);
1492 EXPECT_EQ(v2, testValue2);
1495 // Test that a void function produces a Future<Unit>.
// Adds void-returning tasks through addTaskFuture() and addTaskRemoteFuture(),
// stores the results as folly::Future<folly::Unit>, and asserts both tasks ran.
1496 TEST(FiberManager, remoteFutureVoidUnitTest) {
1497 FiberManager fiberManager(std::make_unique<SimpleLoopController>());
1498 auto& loopController =
1499 dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
// Each task only sets its flag; the explicit Future<Unit> type on the
// left-hand side is what the test name refers to.
1501 bool ranLocal = false;
1502 folly::Future<folly::Unit> futureLocal =
1503 fiberManager.addTaskFuture([&]() { ranLocal = true; });
1505 bool ranRemote = false;
1506 folly::Future<folly::Unit> futureRemote =
1507 fiberManager.addTaskRemoteFuture([&]() { ranRemote = true; });
// The loop function stops the controller as soon as it is invoked.
1509 loopController.loop([&]() { loopController.stop(); });
1512 ASSERT_TRUE(ranLocal);
// Block on the remote future before checking the remote task's flag.
1514 futureRemote.wait();
1515 ASSERT_TRUE(ranRemote);
// Runs one event base loop (innerEvb) from inside a task of another
// (outerEvb) and compares FiberManager::getFiberManagerUnsafe() against the
// manager of the event base whose task is currently executing.
1518 TEST(FiberManager, nestedFiberManagers) {
1519 folly::EventBase outerEvb;
1520 folly::EventBase innerEvb;
// Outer task: the comparison on the next visible line pairs outerEvb's
// manager with the current manager.
1522 getFiberManager(outerEvb).addTask([&]() {
1524 &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
// From the main context, add a task to innerEvb's manager and run innerEvb's
// loop until that task asks it to terminate.
1526 runInMainContext([&]() {
1527 getFiberManager(innerEvb).addTask([&]() {
1529 &getFiberManager(innerEvb), FiberManager::getFiberManagerUnsafe());
1531 innerEvb.terminateLoopSoon();
1534 innerEvb.loopForever();
1538 &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
1540 outerEvb.terminateLoopSoon();
// Drive the outer loop until the outer task asks it to terminate.
1543 outerEvb.loopForever();
// Shares one Semaphore with kNumTokens tokens between two threads, each
// running its own FiberManager with kTasks tasks of kIterations iterations,
// and checks that each thread's counter stays within [0, kNumTokens).
1546 TEST(FiberManager, semaphore) {
1547 static constexpr size_t kTasks = 10;
1548 static constexpr size_t kIterations = 10000;
1549 static constexpr size_t kNumTokens = 10;
1551 Semaphore sem(kNumTokens);
// Per-thread body: builds a FiberManager on a local EventBase and adds the
// worker tasks to it.
1555 auto task = [&sem](int& counter, folly::fibers::Baton& baton) {
1556 FiberManager manager(std::make_unique<EventBaseLoopController>());
1557 folly::EventBase evb;
1558 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1559 .attachEventBase(evb);
// shared_ptr used purely as a completion counter: its deleter does not free
// evb (a stack object) but calls terminateLoopSoon(). Every task lambda
// captures a copy, so the deleter runs when the last copy is released.
1562 std::shared_ptr<folly::EventBase> completionCounter(
1563 &evb, [](folly::EventBase* evb) { evb->terminateLoopSoon(); });
1565 for (size_t i = 0; i < kTasks; ++i) {
1566 manager.addTask([&, completionCounter]() {
1567 for (size_t j = 0; j < kIterations; ++j) {
// Bounds checked on the counter inside the iteration loop.
1573 EXPECT_LT(counter, kNumTokens);
1574 EXPECT_GE(counter, 0);
// Run the same body on two threads, each with its own counter and baton.
1584 folly::fibers::Baton batonA;
1585 folly::fibers::Baton batonB;
1586 std::thread threadA([&] { task(counterA, batonA); });
1587 std::thread threadB([&] { task(counterB, batonB); });
// Final bounds check on both counters.
1594 EXPECT_LT(counterA, kNumTokens);
1595 EXPECT_LT(counterB, kNumTokens);
1596 EXPECT_GE(counterA, 0);
1597 EXPECT_GE(counterB, 0);
// Helper: submits `index` to a thread_local BatchDispatcher and expects the
// result to be the string form of `index`.
//   executor  - executor handed to the BatchDispatcher constructor
//   batchSize - number of values each dispatched batch is expected to contain
//   index     - value submitted by this call
1600 template <typename ExecutorT>
1601 void singleBatchDispatch(ExecutorT& executor, int batchSize, int index) {
// thread_local: the dispatcher (and the batchSize copied by [=]) is created
// on the first call in a thread and reused by later calls in that thread.
1602 thread_local BatchDispatcher<int, std::string, ExecutorT> batchDispatcher(
1603 executor, [=](std::vector<int>&& batch) {
1604 EXPECT_EQ(batchSize, batch.size());
// Convert every value of the batch to its string representation.
1605 std::vector<std::string> results;
1606 for (auto& it : batch) {
1607 results.push_back(folly::to<std::string>(it));
// add() takes the value by move, so a copy is moved and `index` stays usable
// for the comparison below.
1612 auto indexCopy = index;
1613 auto result = batchDispatcher.add(std::move(indexCopy));
1614 EXPECT_EQ(folly::to<std::string>(index), result.get());
// Runs singleBatchDispatch() from batchSize tasks, twice in a row, so the
// second round goes through the same BatchDispatcher as the first.
1617 TEST(FiberManager, batchDispatchTest) {
1618 folly::EventBase evb;
1619 auto& executor = getFiberManager(evb);
1621 // Launch multiple fibers with a single id.
1622 executor.add([&]() {
// NOTE(review): batchSize is declared on a line not visible in this view.
1624 for (int i = 0; i < batchSize; i++) {
1626 [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
1631 // Reuse the same BatchDispatcher to batch once again.
1632 executor.add([&]() {
1634 for (int i = 0; i < batchSize; i++) {
1636 [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
// Inner stage of the double-batch test: submits one group of ints to a
// thread_local BatchDispatcher whose batch function stringifies every element
// of every group and checks the total element count.
//   executor              - executor handed to the BatchDispatcher
//   totalNumberOfElements - expected sum of group sizes within one batch
//   input                 - group of values submitted by this call
// Returns the future produced by batchDispatcher.add() for `input`.
1642 template <typename ExecutorT>
1643 folly::Future<std::vector<std::string>> doubleBatchInnerDispatch(
1644 ExecutorT& executor,
1645 int totalNumberOfElements,
1646 std::vector<int> input) {
1647 thread_local BatchDispatcher<
1649 std::vector<std::string>,
1651 batchDispatcher(executor, [=](std::vector<std::vector<int>>&& batch) {
1652 std::vector<std::vector<std::string>> results;
// Running total of elements across all groups in this batch.
1653 int numberOfElements = 0;
1654 for (auto& unit : batch) {
1655 numberOfElements += unit.size();
1656 std::vector<std::string> result;
1657 for (auto& element : unit) {
1658 result.push_back(folly::to<std::string>(element));
1660 results.push_back(std::move(result));
1662 EXPECT_EQ(totalNumberOfElements, numberOfElements);
1666 return batchDispatcher.add(std::move(input));
1670 * Batch values in groups of 5, and then call inner dispatch.
// Outer stage of the double-batch test: submits `index` to a thread_local
// BatchDispatcher whose batch function regroups the batch into groups of 5,
// forwards each group to doubleBatchInnerDispatch(), and gathers the inner
// results. The call expects its own result to be the string form of `index`.
// NOTE(review): the declaration of the `index` parameter is on a line not
// visible in this view.
1672 template <typename ExecutorT>
1673 void doubleBatchOuterDispatch(
1674 ExecutorT& executor,
1675 int totalNumberOfElements,
1677 thread_local BatchDispatcher<int, std::string, ExecutorT> batchDispatcher(
1678 executor, [=, &executor](std::vector<int>&& batch) {
1679 EXPECT_EQ(totalNumberOfElements, batch.size());
1680 std::vector<std::string> results;
1681 std::vector<folly::Future<std::vector<std::string>>>
1682 innerDispatchResultFutures;
// Accumulate values into `group`; every time it reaches 5 elements a copy is
// handed to the inner dispatcher.
1684 std::vector<int> group;
1685 for (auto unit : batch) {
1686 group.push_back(unit);
1687 if (group.size() == 5) {
1688 auto localGroup = group;
1691 innerDispatchResultFutures.push_back(doubleBatchInnerDispatch(
1692 executor, totalNumberOfElements, localGroup));
1697 innerDispatchResultFutures.begin(),
1698 innerDispatchResultFutures.end())
1699 .then([&](std::vector<Try<std::vector<std::string>>>
1700 innerDispatchResults) {
// Flatten the inner results into `results`, which the continuation captured
// by reference.
1701 for (auto& unit : innerDispatchResults) {
1702 for (auto& element : unit.value()) {
1703 results.push_back(element);
// add() takes the value by move, so a copy is moved and `index` stays usable
// for the comparison below.
1711 auto indexCopy = index;
1712 auto result = batchDispatcher.add(std::move(indexCopy));
1713 EXPECT_EQ(folly::to<std::string>(index), result.get());
// Drives doubleBatchOuterDispatch() from 20 tasks, one per value in [0, 20).
1716 TEST(FiberManager, doubleBatchDispatchTest) {
1717 folly::EventBase evb;
1718 auto& executor = getFiberManager(evb);
1720 // Launch multiple fibers with a single id.
1721 executor.add([&]() {
1722 int totalNumberOfElements = 20;
1723 for (int i = 0; i < totalNumberOfElements; i++) {
// i and totalNumberOfElements are copied into each task; executor is shared.
1724 executor.add([=, &executor]() {
1725 doubleBatchOuterDispatch(executor, totalNumberOfElements, i);
// Helper: submits `i` to a thread_local BatchDispatcher whose batch function
// always throws std::runtime_error, and expects get() on the returned future
// to throw std::runtime_error.
1732 template <typename ExecutorT>
1733 void batchDispatchExceptionHandling(ExecutorT& executor, int i) {
1734 thread_local BatchDispatcher<int, int, ExecutorT> batchDispatcher(
1735 executor, [](std::vector<int> &&) -> std::vector<int> {
1736 throw std::runtime_error("Surprise!!");
1739 EXPECT_THROW(batchDispatcher.add(i).get(), std::runtime_error);
// Runs batchDispatchExceptionHandling() from 5 tasks, one per value in [0, 5).
1742 TEST(FiberManager, batchDispatchExceptionHandlingTest) {
1743 folly::EventBase evb;
1744 auto& executor = getFiberManager(evb);
1746 // Launch multiple fibers with a single id.
1747 executor.add([&]() {
1748 int totalNumberOfElements = 5;
1749 for (int i = 0; i < totalNumberOfElements; i++) {
1751 [=, &executor]() { batchDispatchExceptionHandling(executor, i); });
// Shared helpers for the AtomicBatchDispatcher (ABD) tests: type aliases, an
// optional trace sink, the Job type, and functions that create, dispatch and
// validate jobs.
1757 namespace AtomicBatchDispatcherTesting {
// Input and result types used by every ABD test.
1759 using ValueT = size_t;
1760 using ResultT = std::string;
1761 using DispatchFunctionT =
1762 folly::Function<std::vector<ResultT>(std::vector<ValueT>&&)>;
// OUTPUT_TRACE expands to std::cerr when tracing is enabled, otherwise to a
// DevNullPiper object whose operator<< overloads accept any value and stream
// manipulators such as std::endl.
1764 #define ENABLE_TRACE_IN_TEST 0 // Set to 1 to debug issues in ABD tests
1765 #if ENABLE_TRACE_IN_TEST
1766 #define OUTPUT_TRACE std::cerr
1767 #else // ENABLE_TRACE_IN_TEST
1768 struct DevNullPiper {
1769 template <typename T>
1770 DevNullPiper& operator<<(const T&) {
1774 DevNullPiper& operator<<(std::ostream& (*)(std::ostream&)) {
1778 #define OUTPUT_TRACE devNullPiper
1779 #endif // ENABLE_TRACE_IN_TEST
// NOTE(review): the lines from here through the Job constructors appear to be
// members of the Job type, whose opening line is not visible in this view.
1782 AtomicBatchDispatcher<ValueT, ResultT>::Token token;
// Simulates preprocessing work for a job; throws std::logic_error with a
// "Simulating preprocessing failure" message (presumably when `die` is set -
// the guarding condition is not visible here).
1785 void preprocess(FiberManager& executor, bool die) {
1786 // Yield for a random duration in [0, 9] ms to simulate I/O in preprocessing
1787 clock_t msecToDoIO = folly::Random::rand32() % 10;
// Elapsed time is measured with clock(), converted to milliseconds.
1788 double start = (1000.0 * clock()) / CLOCKS_PER_SEC;
1789 double endAfter = start + msecToDoIO;
1790 while ((1000.0 * clock()) / CLOCKS_PER_SEC < endAfter) {
1794 throw std::logic_error("Simulating preprocessing failure");
// A Job takes ownership of a dispatcher token and stores its input value;
// move construction and move assignment are defaulted.
1798 Job(AtomicBatchDispatcher<ValueT, ResultT>::Token&& t, ValueT i)
1799 : token(std::move(t)), input(i) {}
1801 Job(Job&&) = default;
1802 Job& operator=(Job&&) = default;
// Converts a single input value to its result via folly::to<ResultT>.
1805 ResultT processSingleInput(ValueT&& input) {
1806 return folly::to<ResultT>(std::move(input));
// Batch dispatch function used by the tests: applies processSingleInput() to
// every input, in order.
1809 std::vector<ResultT> userDispatchFunc(std::vector<ValueT>&& inputs) {
1810 size_t expectedCount = inputs.size();
1811 std::vector<ResultT> results;
1812 results.reserve(expectedCount);
1813 for (size_t i = 0; i < expectedCount; ++i) {
1814 results.emplace_back(processSingleInput(std::move(inputs[i])));
// Job-creation helper (its name line is not visible here; the tests call
// createJobs(dispatcher, jobs, COUNT)): appends `count` jobs, each with a
// fresh token from the dispatcher and its index as input.
1820 AtomicBatchDispatcher<ValueT, ResultT>& atomicBatchDispatcher,
1821 std::vector<Job>& jobs,
1824 for (size_t i = 0; i < count; ++i) {
1825 jobs.emplace_back(Job(atomicBatchDispatcher.getToken(), i));
// Kinds of failure the dispatch helper can inject; the code below uses None,
// PreprocessThrows and DuplicateDispatch.
1829 enum class DispatchProblem {
// Job-dispatch helper (its name line is not visible here; the tests call
// dispatchJobs(executor, jobs, results, ...)). For every job it preprocesses
// and dispatches the job's token, storing the future in results[i]; the
// optional dispatchProblem/problemIndex pair injects a failure at one index.
1836 FiberManager& executor,
1837 std::vector<Job>& jobs,
1838 std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1839 DispatchProblem dispatchProblem = DispatchProblem::None,
1840 size_t problemIndex = size_t(-1)) {
1842 dispatchProblem == DispatchProblem::None || problemIndex < jobs.size());
1844 results.resize(jobs.size());
1845 for (size_t i = 0; i < jobs.size(); ++i) {
1847 [i, &executor, &jobs, &results, dispatchProblem, problemIndex]() {
// Move the job out of the vector so this task owns its token.
1849 Job job(std::move(jobs[i]));
// Injected failure: preprocessing of the problem job is expected to throw.
1851 if (dispatchProblem == DispatchProblem::PreprocessThrows) {
1852 if (i == problemIndex) {
1853 EXPECT_THROW(job.preprocess(executor, true), std::logic_error);
1858 job.preprocess(executor, false);
1859 OUTPUT_TRACE << "Dispatching job #" << i << std::endl;
1860 results[i] = job.token.dispatch(job.input);
1861 OUTPUT_TRACE << "Result future filled for job #" << i << std::endl;
// Injected failure: a second dispatch on the same token is expected to throw
// ABDUsageException.
1863 if (dispatchProblem == DispatchProblem::DuplicateDispatch) {
1864 if (i == problemIndex) {
1865 EXPECT_THROW(job.token.dispatch(job.input), ABDUsageException);
1869 OUTPUT_TRACE << "Preprocessing failed for job #" << i << std::endl;
// Reads results[i]->value() and traces it; a std::exception is traced in the
// catch block. The EXPECT_THROW/EXPECT_NO_THROW callers below rely on the
// exception still reaching them (the rethrow is presumably on a line not
// visible here - confirm).
1875 void validateResult(
1876 std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1879 OUTPUT_TRACE << "results[" << i << "].value() : " << results[i]->value()
1881 } catch (std::exception& e) {
1882 OUTPUT_TRACE << "Exception : " << e.what() << std::endl;
// Failure-path validation: expects validateResult() to throw TException for
// the results it checks, and expects numResultsFilled to equal
// expectedNumResults.
1887 template <typename TException>
1888 void validateResults(
1889 std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1890 size_t expectedNumResults) {
1891 size_t numResultsFilled = 0;
1892 for (size_t i = 0; i < results.size(); ++i) {
1897 EXPECT_THROW(validateResult(results, i), TException);
1899 EXPECT_EQ(numResultsFilled, expectedNumResults);
// Success-path validation: expects validateResult() not to throw, compares
// each value against processSingleInput(i), and expects numResultsFilled to
// equal expectedNumResults.
1902 void validateResults(
1903 std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1904 size_t expectedNumResults) {
1905 size_t numResultsFilled = 0;
1906 for (size_t i = 0; i < results.size(); ++i) {
1911 EXPECT_NO_THROW(validateResult(results, i));
1912 ValueT expectedInput = i;
1914 results[i]->value(), processSingleInput(std::move(expectedInput)));
1916 EXPECT_EQ(numResultsFilled, expectedNumResults);
1919 } // namespace AtomicBatchDispatcherTesting
// Common preamble for the ABD tests. Expands to: a using-directive for
// AtomicBatchDispatcherTesting, a local EventBase `evb`, its fiber manager as
// `executor`, the job count COUNT (11), `jobs` and `results` vectors with
// capacity reserved for COUNT entries, and a declaration of `dispatchFunc`
// (left without a terminator, so the use site supplies the ';').
1921 #define SET_UP_TEST_FUNC \
1922 using namespace AtomicBatchDispatcherTesting; \
1923 folly::EventBase evb; \
1924 auto& executor = getFiberManager(evb); \
1925 const size_t COUNT = 11; \
1926 std::vector<Job> jobs; \
1927 jobs.reserve(COUNT); \
1928 std::vector<folly::Optional<folly::Future<ResultT>>> results; \
1929 results.reserve(COUNT); \
1930 DispatchFunctionT dispatchFunc
// Happy path: create COUNT jobs, dispatch them all, commit, and expect COUNT
// valid results.
// NOTE(review): evb, executor, jobs, results, COUNT and dispatchFunc are
// presumably declared by SET_UP_TEST_FUNC on a line not visible here.
1932 TEST(FiberManager, ABD_Test) {
1936 // Testing AtomicBatchDispatcher with explicit call to commit()
1938 dispatchFunc = userDispatchFunc;
1939 auto atomicBatchDispatcher =
1940 createAtomicBatchDispatcher(std::move(dispatchFunc));
1941 createJobs(atomicBatchDispatcher, jobs, COUNT);
1942 dispatchJobs(executor, jobs, results);
1943 atomicBatchDispatcher.commit();
// Success-path overload: every result must equal processSingleInput(i).
1945 validateResults(results, COUNT);
// Dispatches all jobs but throws before commit() is reached; all COUNT
// results are then expected to raise ABDCommitNotCalledException.
1948 TEST(FiberManager, ABD_DispatcherDestroyedBeforeCallingCommit) {
1952 // Testing AtomicBatchDispatcher destroyed before calling commit.
1953 // Handles error cases for:
1954 // - User might have forgotten to add the call to commit() in the code
1955 // - An unexpected exception got thrown in user code before commit() is called
1958 dispatchFunc = userDispatchFunc;
1959 auto atomicBatchDispatcher =
1960 createAtomicBatchDispatcher(std::move(dispatchFunc));
1961 createJobs(atomicBatchDispatcher, jobs, COUNT);
1962 dispatchJobs(executor, jobs, results);
// Simulated user-code failure; the commit() call below is deliberately left
// commented out.
1963 throw std::runtime_error(
1964 "Unexpected exception in user code before commit called");
1965 // atomicBatchDispatcher.commit();
1967 /* User code handles the exception and does not exit process */
1970 validateResults<ABDCommitNotCalledException>(results, COUNT);
// Makes preprocessing of job #8 throw, so that job is never dispatched; after
// commit() the COUNT - 1 filled results are expected to raise
// ABDTokenNotDispatchedException.
1973 TEST(FiberManager, ABD_PreprocessingFailureTest) {
1977 // Testing preprocessing failure on a job throws
1979 dispatchFunc = userDispatchFunc;
1980 auto atomicBatchDispatcher =
1981 createAtomicBatchDispatcher(std::move(dispatchFunc));
1982 createJobs(atomicBatchDispatcher, jobs, COUNT);
1983 dispatchJobs(executor, jobs, results, DispatchProblem::PreprocessThrows, 8);
1984 atomicBatchDispatcher.commit();
1986 validateResults<ABDTokenNotDispatchedException>(results, COUNT - 1);
// Dispatches job #4's token twice; the dispatch helper itself asserts that
// the second dispatch throws ABDUsageException.
1989 TEST(FiberManager, ABD_MultipleDispatchOnSameTokenErrorTest) {
1993 // Testing that calling dispatch more than once on the same token throws
1995 dispatchFunc = userDispatchFunc;
1996 auto atomicBatchDispatcher =
1997 createAtomicBatchDispatcher(std::move(dispatchFunc));
1998 createJobs(atomicBatchDispatcher, jobs, COUNT);
1999 dispatchJobs(executor, jobs, results, DispatchProblem::DuplicateDispatch, 4);
2000 atomicBatchDispatcher.commit();
// Commits right after creating the jobs and checks that getToken() throws
// ABDUsageException at three points afterwards, while the jobs created before
// the commit still produce COUNT valid results.
2004 TEST(FiberManager, ABD_GetTokenCalledAfterCommitTest) {
2008 // Testing that exception set on attempt to call getToken after commit called
2010 dispatchFunc = userDispatchFunc;
2011 auto atomicBatchDispatcher =
2012 createAtomicBatchDispatcher(std::move(dispatchFunc));
2013 createJobs(atomicBatchDispatcher, jobs, COUNT);
// Here commit() comes before the jobs are dispatched.
2014 atomicBatchDispatcher.commit();
2015 EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
2016 dispatchJobs(executor, jobs, results);
2017 EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
2019 validateResults(results, COUNT);
2020 EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
// Uses a batch dispatch function that throws std::runtime_error after doing
// the normal work; all COUNT results are expected to raise std::runtime_error.
2023 TEST(FiberManager, ABD_UserProvidedBatchDispatchThrowsTest) {
2027 // Testing that exception is set if user provided batch dispatch throws
// The regular dispatch function is run and its result discarded before the
// throw.
2029 dispatchFunc = [](std::vector<ValueT>&& inputs) -> std::vector<ResultT> {
2030 (void)userDispatchFunc(std::move(inputs));
2031 throw std::runtime_error("Unexpected exception in user dispatch function");
2033 auto atomicBatchDispatcher =
2034 createAtomicBatchDispatcher(std::move(dispatchFunc));
2035 createJobs(atomicBatchDispatcher, jobs, COUNT);
2036 dispatchJobs(executor, jobs, results);
2037 atomicBatchDispatcher.commit();
2039 validateResults<std::runtime_error>(results, COUNT);
// Adds remote tasks to the fiber managers of two VirtualEventBases that share
// one ScopedEventBaseThread; the tasks wait 100 ms and 200 ms on a baton, and
// the done1/done2 flags are checked at several points.
// NOTE(review): the declarations of evb1, baton, done1 and done2 and the
// steps between the expectations are not visible in this view.
2042 TEST(FiberManager, VirtualEventBase) {
2046 folly::ScopedEventBaseThread thread;
2049 std::make_unique<folly::VirtualEventBase>(*thread.getEventBase());
// evb2 is the VirtualEventBase returned by the thread's EventBase itself.
2050 auto& evb2 = thread.getEventBase()->getVirtualEventBase();
2052 getFiberManager(*evb1).addTaskRemote([&] {
2054 baton.timed_wait(std::chrono::milliseconds{100});
2059 getFiberManager(evb2).addTaskRemote([&] {
2061 baton.timed_wait(std::chrono::milliseconds{200});
// Neither task is expected to have finished at this point.
2066 EXPECT_FALSE(done1);
2067 EXPECT_FALSE(done2);
// The second task is still expected to be unfinished here.
2071 EXPECT_FALSE(done2);
// Mixes a plain thread (100 iterations) with fibers (100 outer rounds, 20
// inner iterations) that all use 1 ms baton timed waits, and finally expects
// the fiber manager to report no remaining tasks.
// NOTE(review): the mutex operations the test name refers to are on lines not
// visible in this view.
2076 TEST(TimedMutex, ThreadsAndFibersDontDeadlock) {
2077 folly::EventBase evb;
2078 auto& fm = getFiberManager(evb);
// Thread side of the test.
2080 std::thread testThread([&] {
2081 for (int i = 0; i < 100; i++) {
2086 b.timed_wait(std::chrono::milliseconds(1));
// Fiber side of the test.
2091 for (int numFibers = 0; numFibers < 100; numFibers++) {
2093 for (int i = 0; i < 20; i++) {
2097 b.timed_wait(std::chrono::milliseconds(1));
2102 b.timed_wait(std::chrono::milliseconds(1));
// hasTasks() is compared against 0, i.e. no tasks may be left.
2109 EXPECT_EQ(0, fm.hasTasks());
// Contends for a TimedMutex from a helper thread, a fiber task and the main
// context; the main-context timed_lock (1 s limit) is expected to succeed and
// no fiber tasks may remain at the end.
// NOTE(review): the mutex declaration and the lock/unlock calls made by the
// helper thread are on lines not visible in this view.
2113 TEST(TimedMutex, ThreadFiberDeadlockOrder) {
2114 folly::EventBase evb;
2115 auto& fm = getFiberManager(evb);
// Helper thread: sleeps 100 ms before continuing.
2119 std::thread unlockThread([&] {
2120 /* sleep override */ std::this_thread::sleep_for(
2121 std::chrono::milliseconds{100});
// Fiber task that takes and immediately releases the mutex via lock_guard.
2125 fm.addTask([&] { std::lock_guard<TimedMutex> lg(mutex); });
2127 runInMainContext([&] {
2128 auto locked = mutex.timed_lock(std::chrono::seconds{1});
2129 EXPECT_TRUE(locked);
2137 EXPECT_EQ(0, fm.hasTasks());
2139 unlockThread.join();
// Takes a TimedMutex with timed_lock (1 s limit) at two places, the second
// from the main context; both attempts are expected to succeed and no fiber
// tasks may remain at the end.
// NOTE(review): the mutex declaration and the context of the first
// timed_lock are on lines not visible in this view.
2142 TEST(TimedMutex, ThreadFiberDeadlockRace) {
2143 folly::EventBase evb;
2144 auto& fm = getFiberManager(evb);
2150 auto locked = mutex.timed_lock(std::chrono::seconds{1});
2151 EXPECT_TRUE(locked);
2158 runInMainContext([&] {
2159 auto locked = mutex.timed_lock(std::chrono::seconds{1});
2160 EXPECT_TRUE(locked);
2168 EXPECT_EQ(0, fm.hasTasks());
2172 * Test that we can properly track fiber stack usage.
2174 * This functionality can only be enabled when ASAN is disabled, so avoid
2175 * running this test with ASAN.
2177 #ifndef FOLLY_SANITIZE_ADDRESS
// Enables stack recording (recordStackEvery = 1), runs work that indexes an
// array `b` over n elements, and expects the manager's stack high watermark
// to exceed n * sizeof(int).
// NOTE(review): the declarations of `b`, `s` and the callable `f`, and the
// end of this test, are not visible in this view.
2178 TEST(FiberManager, recordStack) {
2180 folly::fibers::FiberManager::Options opts;
2181 opts.recordStackEvery = 1;
2183 FiberManager fm(std::make_unique<SimpleLoopController>(), opts);
2184 auto& loopController =
2185 dynamic_cast<SimpleLoopController&>(fm.loopController());
2187 static constexpr size_t n = 1000;
2191 for (size_t i = 0; i < n; ++i) {
// Sum of products of adjacent elements of b.
2194 for (size_t i = 0; i + 1 < n; ++i) {
2195 s += b[i] * b[i + 1];
// The loop function stops the controller as soon as it is invoked.
2201 loopController.loop([&]() { loopController.stop(); });
2203 // Check that we properly accounted fiber stack usage.
2204 EXPECT_LT(n * sizeof(int), fm.stackHighWatermark());
// f is run on a separate thread, which is joined immediately.
2206 std::thread(f).join();