2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 #include <folly/Memory.h>
21 #include <folly/Random.h>
22 #include <folly/futures/Future.h>
24 #include <folly/Conv.h>
25 #include <folly/fibers/AddTasks.h>
26 #include <folly/fibers/AtomicBatchDispatcher.h>
27 #include <folly/fibers/BatchDispatcher.h>
28 #include <folly/fibers/EventBaseLoopController.h>
29 #include <folly/fibers/FiberManager.h>
30 #include <folly/fibers/FiberManagerMap.h>
31 #include <folly/fibers/GenericBaton.h>
32 #include <folly/fibers/Semaphore.h>
33 #include <folly/fibers/SimpleLoopController.h>
34 #include <folly/fibers/WhenN.h>
35 #include <folly/io/async/ScopedEventBaseThread.h>
36 #include <folly/portability/GTest.h>
38 using namespace folly::fibers;
// Timeout path of Baton::timed_wait() under a SimpleLoopController.
// Two fibers wait with 230 ms and 130 ms timeouts; the visible code sleeps
// 50 ms at a time, and the fibers expect `iterations` to be 5 and 3
// respectively when they resume. Each fiber then stops the loop controller.
// NOTE(review): wall-clock based, so presumably sensitive to scheduling
// jitter on a loaded machine.
42 TEST(FiberManager, batonTimedWaitTimeout) {
43 bool taskAdded = false;
44 size_t iterations = 0;
46 FiberManager manager(folly::make_unique<SimpleLoopController>());
47 auto& loopController =
48 dynamic_cast<SimpleLoopController&>(manager.loopController());
50 auto loopFunc = [&]() {
52 manager.addTask([&]() {
55 auto res = baton.timed_wait(std::chrono::milliseconds(230));
58 EXPECT_EQ(5, iterations);
60 loopController.stop();
62 manager.addTask([&]() {
65 auto res = baton.timed_wait(std::chrono::milliseconds(130));
68 EXPECT_EQ(3, iterations);
70 loopController.stop();
74 std::this_thread::sleep_for(std::chrono::milliseconds(50));
79 loopController.loop(std::move(loopFunc));
// Post-before-timeout path of Baton::timed_wait() under a SimpleLoopController.
// The fiber waits with a 130 ms timeout and expects `iterations` to be 2 when
// it resumes. The loop code sleeps 50 ms and takes a branch once
// `iterations == 2` (presumably posting the baton; the branch body is not
// visible in this listing).
82 TEST(FiberManager, batonTimedWaitPost) {
83 bool taskAdded = false;
84 size_t iterations = 0;
87 FiberManager manager(folly::make_unique<SimpleLoopController>());
88 auto& loopController =
89 dynamic_cast<SimpleLoopController&>(manager.loopController());
91 auto loopFunc = [&]() {
93 manager.addTask([&]() {
97 auto res = baton.timed_wait(std::chrono::milliseconds(130));
100 EXPECT_EQ(2, iterations);
102 loopController.stop();
106 std::this_thread::sleep_for(std::chrono::milliseconds(50));
108 if (iterations == 2) {
114 loopController.loop(std::move(loopFunc));
// Timeout path of Baton::timed_wait() with the FiberManager attached to a
// folly::EventBase. Two fibers wait 500 ms and 250 ms; each measures the
// elapsed time with EventBaseLoopController::Clock and expects it to lie
// strictly within 50 ms of its timeout. The second task to finish calls
// evb.terminateLoopSoon(), and the test finally expects both to have completed.
117 TEST(FiberManager, batonTimedWaitTimeoutEvb) {
118 size_t tasksComplete = 0;
120 folly::EventBase evb;
122 FiberManager manager(folly::make_unique<EventBaseLoopController>());
123 dynamic_cast<EventBaseLoopController&>(manager.loopController())
124 .attachEventBase(evb);
126 auto task = [&](size_t timeout_ms) {
129 auto start = EventBaseLoopController::Clock::now();
130 auto res = baton.timed_wait(std::chrono::milliseconds(timeout_ms));
131 auto finish = EventBaseLoopController::Clock::now();
136 std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
// NOTE(review): timeout_ms is size_t, so `timeout_ms - 50` would wrap around
// for timeouts below 50; the calls in this test pass 500 and 250.
138 EXPECT_GT(duration_ms.count(), timeout_ms - 50);
139 EXPECT_LT(duration_ms.count(), timeout_ms + 50);
141 if (++tasksComplete == 2) {
142 evb.terminateLoopSoon();
146 evb.runInEventBaseThread([&]() {
147 manager.addTask([&]() { task(500); });
148 manager.addTask([&]() { task(250); });
153 EXPECT_EQ(2, tasksComplete);
// Post path of Baton::timed_wait() with the FiberManager attached to a
// folly::EventBase. The baton is posted from a callback scheduled through
// evb.tryRunAfterDelay() with a delay argument of 100 while the fiber waits
// with a 130 ms timeout; the measured wait must be strictly between 95 ms and
// 110 ms. The single task terminates the event loop when it finishes.
156 TEST(FiberManager, batonTimedWaitPostEvb) {
157 size_t tasksComplete = 0;
159 folly::EventBase evb;
161 FiberManager manager(folly::make_unique<EventBaseLoopController>());
162 dynamic_cast<EventBaseLoopController&>(manager.loopController())
163 .attachEventBase(evb);
165 evb.runInEventBaseThread([&]() {
166 manager.addTask([&]() {
169 evb.tryRunAfterDelay([&]() { baton.post(); }, 100);
171 auto start = EventBaseLoopController::Clock::now();
172 auto res = baton.timed_wait(std::chrono::milliseconds(130));
173 auto finish = EventBaseLoopController::Clock::now();
178 std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
180 EXPECT_TRUE(duration_ms.count() > 95 && duration_ms.count() < 110);
182 if (++tasksComplete == 1) {
183 evb.terminateLoopSoon();
190 EXPECT_EQ(1, tasksComplete);
// Exercises try_wait() from a fiber in two scenarios (see the inline
// comments): polling in a loop while a helper thread sleeps 300 ms
// (presumably posting the baton afterwards; that call is not visible here),
// and polling a baton that is never posted, where try_wait() must still
// report false once the bounded polling loop ends.
193 TEST(FiberManager, batonTryWait) {
194 FiberManager manager(folly::make_unique<SimpleLoopController>());
196 // Check if try_wait and post work as expected
199 manager.addTask([&]() {
200 while (!b.try_wait()) {
203 auto thr = std::thread([&]() {
204 std::this_thread::sleep_for(std::chrono::milliseconds(300));
208 manager.loopUntilNoReady();
213 // Check try_wait without post
214 manager.addTask([&]() {
216 while (cnt && !c.try_wait()) {
219 EXPECT_TRUE(!c.try_wait()); // must still hold
223 manager.loopUntilNoReady();
// Fiber-side wait (on a GenericBaton, per the test name). `fiberRunning`
// tracks whether the fiber is still in progress: it is expected to be false
// before the first loopUntilNoReady(), true after it (the fiber is parked),
// and the main thread keeps running the loop until the fiber clears the flag.
// A helper thread sleeps 300 ms first (presumably posting the baton
// afterwards; that call is not visible here).
226 TEST(FiberManager, genericBatonFiberWait) {
227 FiberManager manager(folly::make_unique<SimpleLoopController>());
230 bool fiberRunning = false;
232 manager.addTask([&]() {
233 EXPECT_EQ(manager.hasActiveFiber(), true);
236 fiberRunning = false;
239 EXPECT_FALSE(fiberRunning);
240 manager.loopUntilNoReady();
241 EXPECT_TRUE(fiberRunning); // ensure fiber still active
243 auto thr = std::thread([&]() {
244 std::this_thread::sleep_for(std::chrono::milliseconds(300));
248 while (fiberRunning) {
249 manager.loopUntilNoReady();
// Thread-side wait (on a GenericBaton, per the test name). A helper thread
// sets the atomic `threadWaiting` flag and clears it again later (presumably
// once its wait returns; the wait call is not visible here). The main thread
// spins until the flag is set, sleeps 300 ms, and then runs a fiber which
// checks that it executes as an active fiber while the thread is still
// waiting.
255 TEST(FiberManager, genericBatonThreadWait) {
256 FiberManager manager(folly::make_unique<SimpleLoopController>());
258 std::atomic<bool> threadWaiting(false);
260 auto thr = std::thread([&]() {
261 threadWaiting = true;
263 threadWaiting = false;
266 while (!threadWaiting) {
268 std::this_thread::sleep_for(std::chrono::milliseconds(300));
270 manager.addTask([&]() {
271 EXPECT_EQ(manager.hasActiveFiber(), true);
272 EXPECT_TRUE(threadWaiting);
274 while (threadWaiting) {
278 manager.loopUntilNoReady();
// addTasks() with tasks whose result type is move-only (std::unique_ptr<int>).
// Each of the three tasks parks in await(), stashing its Promise in
// `pendingFibers`, and afterwards returns a pointer to i * 2 + 1. The consumer
// fiber drains the iterator with awaitNext() and checks every value against
// getTaskID(). The loop function fulfils one stashed promise per pass, taken
// from the back of `pendingFibers`.
282 TEST(FiberManager, addTasksNoncopyable) {
283 std::vector<Promise<int>> pendingFibers;
284 bool taskAdded = false;
286 FiberManager manager(folly::make_unique<SimpleLoopController>());
287 auto& loopController =
288 dynamic_cast<SimpleLoopController&>(manager.loopController());
290 auto loopFunc = [&]() {
292 manager.addTask([&]() {
293 std::vector<std::function<std::unique_ptr<int>()>> funcs;
294 for (size_t i = 0; i < 3; ++i) {
295 funcs.push_back([i, &pendingFibers]() {
296 await([&pendingFibers](Promise<int> promise) {
297 pendingFibers.push_back(std::move(promise));
299 return folly::make_unique<int>(i * 2 + 1);
303 auto iter = addTasks(funcs.begin(), funcs.end());
306 while (iter.hasNext()) {
307 auto result = iter.awaitNext();
308 EXPECT_EQ(2 * iter.getTaskID() + 1, *result);
309 EXPECT_GE(2 - n, pendingFibers.size());
315 } else if (pendingFibers.size()) {
316 pendingFibers.back().setValue(0);
317 pendingFibers.pop_back();
319 loopController.stop();
323 loopController.loop(std::move(loopFunc));
// await() with a callback that throws ExpectedException: once directly inside
// the callback, and once after the Promise has been moved into a function
// handed to evb.runInEventBaseThread().
// NOTE(review): the surrounding expectations and the code that runs these
// fibers are not visible in this listing.
326 TEST(FiberManager, awaitThrow) {
327 folly::EventBase evb;
328 struct ExpectedException {};
332 await([](Promise<int> p) {
334 throw ExpectedException();
340 await([&](Promise<int> p) {
341 evb.runInEventBaseThread([p = std::move(p)]() mutable {
344 throw ExpectedException();
// addTasks() where tasks may throw std::runtime_error("Runtime") after they
// have been resumed from await(). The visible expectations tie odd task ids to
// the path where awaitNext() yields a value (2 * id + 1) and even task ids to
// the other path (presumably the catch handler; it is not visible here).
// The loop function fulfils one stashed promise per pass, taken from the back
// of `pendingFibers`.
351 TEST(FiberManager, addTasksThrow) {
352 std::vector<Promise<int>> pendingFibers;
353 bool taskAdded = false;
355 FiberManager manager(folly::make_unique<SimpleLoopController>());
356 auto& loopController =
357 dynamic_cast<SimpleLoopController&>(manager.loopController());
359 auto loopFunc = [&]() {
361 manager.addTask([&]() {
362 std::vector<std::function<int()>> funcs;
363 for (size_t i = 0; i < 3; ++i) {
364 funcs.push_back([i, &pendingFibers]() {
365 await([&pendingFibers](Promise<int> promise) {
366 pendingFibers.push_back(std::move(promise));
369 throw std::runtime_error("Runtime");
375 auto iter = addTasks(funcs.begin(), funcs.end());
378 while (iter.hasNext()) {
380 int result = iter.awaitNext();
381 EXPECT_EQ(1, iter.getTaskID() % 2);
382 EXPECT_EQ(2 * iter.getTaskID() + 1, result);
384 EXPECT_EQ(0, iter.getTaskID() % 2);
386 EXPECT_GE(2 - n, pendingFibers.size());
392 } else if (pendingFibers.size()) {
393 pendingFibers.back().setValue(0);
394 pendingFibers.pop_back();
396 loopController.stop();
400 loopController.loop(std::move(loopFunc));
// addTasks() with void tasks. Three tasks each park in await(), stashing their
// Promise in `pendingFibers`; the consumer fiber iterates while hasNext() and
// checks that the number of still-stashed promises never exceeds `2 - n`
// (`n` is declared on a line not visible here). The loop function fulfils one
// stashed promise per pass, taken from the back of `pendingFibers`.
403 TEST(FiberManager, addTasksVoid) {
404 std::vector<Promise<int>> pendingFibers;
405 bool taskAdded = false;
407 FiberManager manager(folly::make_unique<SimpleLoopController>());
408 auto& loopController =
409 dynamic_cast<SimpleLoopController&>(manager.loopController());
411 auto loopFunc = [&]() {
413 manager.addTask([&]() {
414 std::vector<std::function<void()>> funcs;
415 for (size_t i = 0; i < 3; ++i) {
416 funcs.push_back([i, &pendingFibers]() {
417 await([&pendingFibers](Promise<int> promise) {
418 pendingFibers.push_back(std::move(promise));
423 auto iter = addTasks(funcs.begin(), funcs.end());
426 while (iter.hasNext()) {
428 EXPECT_GE(2 - n, pendingFibers.size());
434 } else if (pendingFibers.size()) {
435 pendingFibers.back().setValue(0);
436 pendingFibers.pop_back();
438 loopController.stop();
442 loopController.loop(std::move(loopFunc));
// addTasks() with void tasks that may throw std::runtime_error("") after being
// resumed from await(). The visible expectations associate odd task ids with
// one path and even task ids with the other (presumably normal completion
// versus the catch handler; the try/catch itself is not visible here).
// The loop function fulfils one stashed promise per pass, taken from the back
// of `pendingFibers`.
445 TEST(FiberManager, addTasksVoidThrow) {
446 std::vector<Promise<int>> pendingFibers;
447 bool taskAdded = false;
449 FiberManager manager(folly::make_unique<SimpleLoopController>());
450 auto& loopController =
451 dynamic_cast<SimpleLoopController&>(manager.loopController());
453 auto loopFunc = [&]() {
455 manager.addTask([&]() {
456 std::vector<std::function<void()>> funcs;
457 for (size_t i = 0; i < 3; ++i) {
458 funcs.push_back([i, &pendingFibers]() {
459 await([&pendingFibers](Promise<int> promise) {
460 pendingFibers.push_back(std::move(promise));
463 throw std::runtime_error("");
468 auto iter = addTasks(funcs.begin(), funcs.end());
471 while (iter.hasNext()) {
474 EXPECT_EQ(1, iter.getTaskID() % 2);
476 EXPECT_EQ(0, iter.getTaskID() % 2);
478 EXPECT_GE(2 - n, pendingFibers.size());
484 } else if (pendingFibers.size()) {
485 pendingFibers.back().setValue(0);
486 pendingFibers.pop_back();
488 loopController.stop();
492 loopController.loop(std::move(loopFunc));
// Walks the iterator returned by addTasks() through four checkpoints and
// asserts (hasCompleted, hasPending, hasNext) at each one:
// (true, true, true), (true, true, true), (false, true, true),
// (false, false, false).
// NOTE(review): the calls made between checkpoints (presumably reserve() and
// awaitNext(), going by the test name) are not visible in this listing.
495 TEST(FiberManager, addTasksReserve) {
496 std::vector<Promise<int>> pendingFibers;
497 bool taskAdded = false;
499 FiberManager manager(folly::make_unique<SimpleLoopController>());
500 auto& loopController =
501 dynamic_cast<SimpleLoopController&>(manager.loopController());
503 auto loopFunc = [&]() {
505 manager.addTask([&]() {
506 std::vector<std::function<void()>> funcs;
507 for (size_t i = 0; i < 3; ++i) {
508 funcs.push_back([&pendingFibers]() {
509 await([&pendingFibers](Promise<int> promise) {
510 pendingFibers.push_back(std::move(promise));
515 auto iter = addTasks(funcs.begin(), funcs.end());
518 EXPECT_TRUE(iter.hasCompleted());
519 EXPECT_TRUE(iter.hasPending());
520 EXPECT_TRUE(iter.hasNext());
523 EXPECT_TRUE(iter.hasCompleted());
524 EXPECT_TRUE(iter.hasPending());
525 EXPECT_TRUE(iter.hasNext());
528 EXPECT_FALSE(iter.hasCompleted());
529 EXPECT_TRUE(iter.hasPending());
530 EXPECT_TRUE(iter.hasNext());
533 EXPECT_FALSE(iter.hasCompleted());
534 EXPECT_FALSE(iter.hasPending());
535 EXPECT_FALSE(iter.hasNext());
538 } else if (pendingFibers.size()) {
539 pendingFibers.back().setValue(0);
540 pendingFibers.pop_back();
542 loopController.stop();
546 loopController.loop(std::move(loopFunc));
// Tasks added to a default-constructed TaskIterator<size_t> after creation.
// Each task produced by makeTask() waits on batons[taskId]. Tasks 0 and 1 are
// added up front and task 2 is added after the first awaitNext(). The expected
// awaitNext() results are 1, 2, 0 in that order (presumably dictated by the
// order in which the batons are posted; those calls are not visible here).
549 TEST(FiberManager, addTaskDynamic) {
550 folly::EventBase evb;
554 auto makeTask = [&](size_t taskId) {
555 return [&, taskId]() -> size_t {
556 batons[taskId].wait();
562 .addTaskFuture([&]() {
563 TaskIterator<size_t> iterator;
565 iterator.addTask(makeTask(0));
566 iterator.addTask(makeTask(1));
570 EXPECT_EQ(1, iterator.awaitNext());
572 iterator.addTask(makeTask(2));
576 EXPECT_EQ(2, iterator.awaitNext());
580 EXPECT_EQ(0, iterator.awaitNext());
// forEach() over three int-returning tasks. The callback records every
// (id, result) pair; afterwards the test expects three results, no promise
// left in `pendingFibers`, and result == id * 2 + 1 for each pair.
// The loop function fulfils one stashed promise per pass, taken from the back
// of `pendingFibers`.
585 TEST(FiberManager, forEach) {
586 std::vector<Promise<int>> pendingFibers;
587 bool taskAdded = false;
589 FiberManager manager(folly::make_unique<SimpleLoopController>());
590 auto& loopController =
591 dynamic_cast<SimpleLoopController&>(manager.loopController());
593 auto loopFunc = [&]() {
595 manager.addTask([&]() {
596 std::vector<std::function<int()>> funcs;
597 for (size_t i = 0; i < 3; ++i) {
598 funcs.push_back([i, &pendingFibers]() {
599 await([&pendingFibers](Promise<int> promise) {
600 pendingFibers.push_back(std::move(promise));
606 std::vector<std::pair<size_t, int>> results;
607 forEach(funcs.begin(), funcs.end(), [&results](size_t id, int result) {
608 results.emplace_back(id, result);
610 EXPECT_EQ(3, results.size());
611 EXPECT_TRUE(pendingFibers.empty());
612 for (size_t i = 0; i < 3; ++i) {
613 EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
617 } else if (pendingFibers.size()) {
618 pendingFibers.back().setValue(0);
619 pendingFibers.pop_back();
621 loopController.stop();
625 loopController.loop(std::move(loopFunc));
// collectN(begin, end, 2) over three int-returning tasks. The test expects two
// results, exactly one promise still stashed in `pendingFibers` when collectN
// returns, and second == first * 2 + 1 for each returned pair.
// The loop function fulfils one stashed promise per pass, taken from the back
// of `pendingFibers`.
628 TEST(FiberManager, collectN) {
629 std::vector<Promise<int>> pendingFibers;
630 bool taskAdded = false;
632 FiberManager manager(folly::make_unique<SimpleLoopController>());
633 auto& loopController =
634 dynamic_cast<SimpleLoopController&>(manager.loopController());
636 auto loopFunc = [&]() {
638 manager.addTask([&]() {
639 std::vector<std::function<int()>> funcs;
640 for (size_t i = 0; i < 3; ++i) {
641 funcs.push_back([i, &pendingFibers]() {
642 await([&pendingFibers](Promise<int> promise) {
643 pendingFibers.push_back(std::move(promise));
649 auto results = collectN(funcs.begin(), funcs.end(), 2);
650 EXPECT_EQ(2, results.size());
651 EXPECT_EQ(1, pendingFibers.size());
652 for (size_t i = 0; i < 2; ++i) {
653 EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
657 } else if (pendingFibers.size()) {
658 pendingFibers.back().setValue(0);
659 pendingFibers.pop_back();
661 loopController.stop();
665 loopController.loop(std::move(loopFunc));
// collectN(begin, end, 2) where the tasks throw std::runtime_error("Runtime")
// after being resumed from await(). The test expects exactly one promise to be
// left in `pendingFibers` afterwards (the check presumably sits in a catch
// handler that is not visible in this listing).
668 TEST(FiberManager, collectNThrow) {
669 std::vector<Promise<int>> pendingFibers;
670 bool taskAdded = false;
672 FiberManager manager(folly::make_unique<SimpleLoopController>());
673 auto& loopController =
674 dynamic_cast<SimpleLoopController&>(manager.loopController());
676 auto loopFunc = [&]() {
678 manager.addTask([&]() {
679 std::vector<std::function<int()>> funcs;
680 for (size_t i = 0; i < 3; ++i) {
// NOTE(review): the lambda is declared to return size_t while `funcs` stores
// std::function<int()>; the result type is converted implicitly.
681 funcs.push_back([i, &pendingFibers]() -> size_t {
682 await([&pendingFibers](Promise<int> promise) {
683 pendingFibers.push_back(std::move(promise));
685 throw std::runtime_error("Runtime");
690 collectN(funcs.begin(), funcs.end(), 2);
692 EXPECT_EQ(1, pendingFibers.size());
696 } else if (pendingFibers.size()) {
697 pendingFibers.back().setValue(0);
698 pendingFibers.pop_back();
700 loopController.stop();
704 loopController.loop(std::move(loopFunc));
// collectN(begin, end, 2) over three void tasks. The test expects a result of
// size 2 and exactly one promise still stashed in `pendingFibers` when
// collectN returns. The loop function fulfils one stashed promise per pass,
// taken from the back of `pendingFibers`.
707 TEST(FiberManager, collectNVoid) {
708 std::vector<Promise<int>> pendingFibers;
709 bool taskAdded = false;
711 FiberManager manager(folly::make_unique<SimpleLoopController>());
712 auto& loopController =
713 dynamic_cast<SimpleLoopController&>(manager.loopController());
715 auto loopFunc = [&]() {
717 manager.addTask([&]() {
718 std::vector<std::function<void()>> funcs;
719 for (size_t i = 0; i < 3; ++i) {
720 funcs.push_back([i, &pendingFibers]() {
721 await([&pendingFibers](Promise<int> promise) {
722 pendingFibers.push_back(std::move(promise));
727 auto results = collectN(funcs.begin(), funcs.end(), 2);
728 EXPECT_EQ(2, results.size());
729 EXPECT_EQ(1, pendingFibers.size());
732 } else if (pendingFibers.size()) {
733 pendingFibers.back().setValue(0);
734 pendingFibers.pop_back();
736 loopController.stop();
740 loopController.loop(std::move(loopFunc));
// collectN(begin, end, 2) over void tasks that throw
// std::runtime_error("Runtime") after being resumed from await(). The test
// expects exactly one promise to be left in `pendingFibers` afterwards (the
// check presumably sits in a catch handler that is not visible here).
743 TEST(FiberManager, collectNVoidThrow) {
744 std::vector<Promise<int>> pendingFibers;
745 bool taskAdded = false;
747 FiberManager manager(folly::make_unique<SimpleLoopController>());
748 auto& loopController =
749 dynamic_cast<SimpleLoopController&>(manager.loopController());
751 auto loopFunc = [&]() {
753 manager.addTask([&]() {
754 std::vector<std::function<void()>> funcs;
755 for (size_t i = 0; i < 3; ++i) {
756 funcs.push_back([i, &pendingFibers]() {
757 await([&pendingFibers](Promise<int> promise) {
758 pendingFibers.push_back(std::move(promise));
760 throw std::runtime_error("Runtime");
765 collectN(funcs.begin(), funcs.end(), 2);
767 EXPECT_EQ(1, pendingFibers.size());
771 } else if (pendingFibers.size()) {
772 pendingFibers.back().setValue(0);
773 pendingFibers.pop_back();
775 loopController.stop();
779 loopController.loop(std::move(loopFunc));
// collectAll() over three int-returning tasks. When it returns, no promise may
// be left in `pendingFibers`, and results[i] must equal i * 2 + 1, i.e. the
// results are indexed by task position. The loop function fulfils one stashed
// promise per pass, taken from the back of `pendingFibers`.
782 TEST(FiberManager, collectAll) {
783 std::vector<Promise<int>> pendingFibers;
784 bool taskAdded = false;
786 FiberManager manager(folly::make_unique<SimpleLoopController>());
787 auto& loopController =
788 dynamic_cast<SimpleLoopController&>(manager.loopController());
790 auto loopFunc = [&]() {
792 manager.addTask([&]() {
793 std::vector<std::function<int()>> funcs;
794 for (size_t i = 0; i < 3; ++i) {
795 funcs.push_back([i, &pendingFibers]() {
796 await([&pendingFibers](Promise<int> promise) {
797 pendingFibers.push_back(std::move(promise));
803 auto results = collectAll(funcs.begin(), funcs.end());
804 EXPECT_TRUE(pendingFibers.empty());
805 for (size_t i = 0; i < 3; ++i) {
806 EXPECT_EQ(i * 2 + 1, results[i]);
810 } else if (pendingFibers.size()) {
811 pendingFibers.back().setValue(0);
812 pendingFibers.pop_back();
814 loopController.stop();
818 loopController.loop(std::move(loopFunc));
// collectAll() over three void tasks. When it returns, no promise may be left
// in `pendingFibers`. The loop function fulfils one stashed promise per pass,
// taken from the back of `pendingFibers`.
821 TEST(FiberManager, collectAllVoid) {
822 std::vector<Promise<int>> pendingFibers;
823 bool taskAdded = false;
825 FiberManager manager(folly::make_unique<SimpleLoopController>());
826 auto& loopController =
827 dynamic_cast<SimpleLoopController&>(manager.loopController());
829 auto loopFunc = [&]() {
831 manager.addTask([&]() {
832 std::vector<std::function<void()>> funcs;
833 for (size_t i = 0; i < 3; ++i) {
834 funcs.push_back([i, &pendingFibers]() {
835 await([&pendingFibers](Promise<int> promise) {
836 pendingFibers.push_back(std::move(promise));
841 collectAll(funcs.begin(), funcs.end());
842 EXPECT_TRUE(pendingFibers.empty());
845 } else if (pendingFibers.size()) {
846 pendingFibers.back().setValue(0);
847 pendingFibers.pop_back();
849 loopController.stop();
853 loopController.loop(std::move(loopFunc));
// collectAny() over three int-returning tasks. The test expects the winner to
// be task id 2 with value 2 * 2 + 1, with two promises still stashed in
// `pendingFibers` at that point. The loop function fulfils promises from the
// back of `pendingFibers`, so presumably the most recently parked task is the
// first to finish. The tasks also contain a throw whose message says the
// exception will be ignored; the condition guarding it is not visible here.
856 TEST(FiberManager, collectAny) {
857 std::vector<Promise<int>> pendingFibers;
858 bool taskAdded = false;
860 FiberManager manager(folly::make_unique<SimpleLoopController>());
861 auto& loopController =
862 dynamic_cast<SimpleLoopController&>(manager.loopController());
864 auto loopFunc = [&]() {
866 manager.addTask([&]() {
867 std::vector<std::function<int()>> funcs;
868 for (size_t i = 0; i < 3; ++i) {
869 funcs.push_back([i, &pendingFibers]() {
870 await([&pendingFibers](Promise<int> promise) {
871 pendingFibers.push_back(std::move(promise));
874 throw std::runtime_error("This exception will be ignored");
880 auto result = collectAny(funcs.begin(), funcs.end());
881 EXPECT_EQ(2, pendingFibers.size());
882 EXPECT_EQ(2, result.first);
883 EXPECT_EQ(2 * 2 + 1, result.second);
886 } else if (pendingFibers.size()) {
887 pendingFibers.back().setValue(0);
888 pendingFibers.pop_back();
890 loopController.stop();
894 loopController.loop(std::move(loopFunc));
898 /* Checks that this function was run from a main context,
899 by comparing an address on a stack to a known main stack address
900 and a known related fiber stack address. The assumption
901 is that fiber stack and main stack will be far enough apart,
902 while any two values on the same stack will be close. */
// Parameters: `ran` is a flag owned by the caller (presumably set to true
// here; the assignment is not visible in this listing), `mainLocation` is an
// address on the main stack, and `fiberLocation` is an address on a fiber
// stack. Callers in this file also pass nullptr for `fiberLocation`, so the
// fiber-distance check is presumably guarded by a null test.
903 void expectMainContext(bool& ran, int* mainLocation, int* fiberLocation) {
905 /* 2 pages is a good guess */
// The pointer differences below are between int pointers and therefore count
// ints, not bytes, which is why the byte distance is divided by sizeof(int).
906 constexpr ssize_t DISTANCE = 0x2000 / sizeof(int);
908 EXPECT_TRUE(std::abs(&here - fiberLocation) > DISTANCE);
911 EXPECT_TRUE(std::abs(&here - mainLocation) < DISTANCE);
// Covers runInMainContext() in two situations:
//  1. called on the manager directly, with no fiber stack address to compare
//     against (nullptr is passed as the fiber location);
//  2. called from inside a fiber, where the callback is checked against both a
//     main-stack and a fiber-stack address and its return value reaches the
//     fiber (the test expects ret.value == 42). Struct A deletes its copy
//     constructor, so the result is presumably handed back by move.
919 TEST(FiberManager, runInMainContext) {
920 FiberManager manager(folly::make_unique<SimpleLoopController>());
921 auto& loopController =
922 dynamic_cast<SimpleLoopController&>(manager.loopController());
924 bool checkRan = false;
927 manager.runInMainContext(
928 [&]() { expectMainContext(checkRan, &mainLocation, nullptr); });
929 EXPECT_TRUE(checkRan);
933 manager.addTask([&]() {
935 explicit A(int value_) : value(value_) {}
936 A(const A&) = delete;
942 auto ret = runInMainContext([&]() {
943 expectMainContext(checkRan, &mainLocation, &stackLocation);
946 EXPECT_TRUE(checkRan);
947 EXPECT_EQ(42, ret.value);
950 loopController.loop([&]() { loopController.stop(); });
952 EXPECT_TRUE(checkRan);
// addTaskFinally(): the task returns 1234, and the finally-callback receives
// it as Try<int> and is checked with expectMainContext(). `checkRan` must
// still be false right after scheduling and true once the loop has run.
955 TEST(FiberManager, addTaskFinally) {
956 FiberManager manager(folly::make_unique<SimpleLoopController>());
957 auto& loopController =
958 dynamic_cast<SimpleLoopController&>(manager.loopController());
960 bool checkRan = false;
964 manager.addTaskFinally(
965 [&]() { return 1234; },
966 [&](Try<int>&& result) {
967 EXPECT_EQ(result.value(), 1234);
969 expectMainContext(checkRan, &mainLocation, nullptr);
972 EXPECT_FALSE(checkRan);
974 loopController.loop([&]() { loopController.stop(); });
976 EXPECT_TRUE(checkRan);
// With maxFibersPoolSize = 5, runs two batches of five trivial tasks. After
// each batch the test expects fibersAllocated() == 5 and
// fibersPoolSize() == 5, i.e. the second batch does not increase the number of
// allocated fibers.
979 TEST(FiberManager, fibersPoolWithinLimit) {
980 FiberManager::Options opts;
981 opts.maxFibersPoolSize = 5;
983 FiberManager manager(folly::make_unique<SimpleLoopController>(), opts);
984 auto& loopController =
985 dynamic_cast<SimpleLoopController&>(manager.loopController());
987 size_t fibersRun = 0;
989 for (size_t i = 0; i < 5; ++i) {
990 manager.addTask([&]() { ++fibersRun; });
992 loopController.loop([&]() { loopController.stop(); });
994 EXPECT_EQ(5, fibersRun);
995 EXPECT_EQ(5, manager.fibersAllocated());
996 EXPECT_EQ(5, manager.fibersPoolSize());
998 for (size_t i = 0; i < 5; ++i) {
999 manager.addTask([&]() { ++fibersRun; });
1001 loopController.loop([&]() { loopController.stop(); });
1003 EXPECT_EQ(10, fibersRun);
1004 EXPECT_EQ(5, manager.fibersAllocated());
1005 EXPECT_EQ(5, manager.fibersPoolSize());
// With maxFibersPoolSize = 5, schedules ten trivial tasks at once. Before the
// loop runs the test expects ten fibers allocated, none pooled and none run;
// after the loop it expects all ten to have run and both fibersAllocated() and
// fibersPoolSize() to be back at 5.
1008 TEST(FiberManager, fibersPoolOverLimit) {
1009 FiberManager::Options opts;
1010 opts.maxFibersPoolSize = 5;
1012 FiberManager manager(folly::make_unique<SimpleLoopController>(), opts);
1013 auto& loopController =
1014 dynamic_cast<SimpleLoopController&>(manager.loopController());
1016 size_t fibersRun = 0;
1018 for (size_t i = 0; i < 10; ++i) {
1019 manager.addTask([&]() { ++fibersRun; });
1022 EXPECT_EQ(0, fibersRun);
1023 EXPECT_EQ(10, manager.fibersAllocated());
1024 EXPECT_EQ(0, manager.fibersPoolSize());
1026 loopController.loop([&]() { loopController.stop(); });
1028 EXPECT_EQ(10, fibersRun);
1029 EXPECT_EQ(5, manager.fibersAllocated());
1030 EXPECT_EQ(5, manager.fibersPoolSize());
// Promises fulfilled from other threads. Two fibers each hand their Promise to
// `savedPromise[i]` (presumably via await(); the call itself is not visible
// here). Two std::threads then set the values 42 and 43. The results must stay
// 0 until the manager's loop runs again, and the loop controller must report
// exactly one remote schedule for the two remote completions.
1033 TEST(FiberManager, remoteFiberBasic) {
1034 FiberManager manager(folly::make_unique<SimpleLoopController>());
1035 auto& loopController =
1036 dynamic_cast<SimpleLoopController&>(manager.loopController());
1039 result[0] = result[1] = 0;
1040 folly::Optional<Promise<int>> savedPromise[2];
1041 manager.addTask([&]() {
1043 [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1045 manager.addTask([&]() {
1047 [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1050 manager.loopUntilNoReady();
1052 EXPECT_TRUE(savedPromise[0].hasValue());
1053 EXPECT_TRUE(savedPromise[1].hasValue());
1054 EXPECT_EQ(0, result[0]);
1055 EXPECT_EQ(0, result[1]);
1057 std::thread remoteThread0{[&]() { savedPromise[0]->setValue(42); }};
1058 std::thread remoteThread1{[&]() { savedPromise[1]->setValue(43); }};
1059 remoteThread0.join();
1060 remoteThread1.join();
1061 EXPECT_EQ(0, result[0]);
1062 EXPECT_EQ(0, result[1]);
1063 /* Should only have scheduled once */
1064 EXPECT_EQ(1, loopController.remoteScheduleCalled());
1066 manager.loopUntilNoReady();
1067 EXPECT_EQ(42, result[0]);
1068 EXPECT_EQ(43, result[1]);
// Tasks scheduled from other threads with addTaskRemote(). Two std::threads
// each add a task that hands its Promise to `savedPromise[i]`; both threads
// are joined before the loop is run. The promises are then fulfilled with 42
// and 43 on the test thread, and the results must stay 0 until the next
// loopUntilNoReady().
1071 TEST(FiberManager, addTaskRemoteBasic) {
1072 FiberManager manager(folly::make_unique<SimpleLoopController>());
1075 result[0] = result[1] = 0;
1076 folly::Optional<Promise<int>> savedPromise[2];
1078 std::thread remoteThread0{[&]() {
1079 manager.addTaskRemote([&]() {
1081 [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1084 std::thread remoteThread1{[&]() {
1085 manager.addTaskRemote([&]() {
1087 [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1090 remoteThread0.join();
1091 remoteThread1.join();
1093 manager.loopUntilNoReady();
1095 EXPECT_TRUE(savedPromise[0].hasValue());
1096 EXPECT_TRUE(savedPromise[1].hasValue());
1097 EXPECT_EQ(0, result[0]);
1098 EXPECT_EQ(0, result[1]);
1100 savedPromise[0]->setValue(42);
1101 savedPromise[1]->setValue(43);
1103 EXPECT_EQ(0, result[0]);
1104 EXPECT_EQ(0, result[1]);
1106 manager.loopUntilNoReady();
1107 EXPECT_EQ(42, result[0]);
1108 EXPECT_EQ(43, result[1]);
// hasTasks() with a task added from another thread via addTaskRemote(). The
// test runs the loop for as long as hasTasks() is true, then expects
// hasTasks() to be false and the task to have incremented `counter` exactly
// once.
1111 TEST(FiberManager, remoteHasTasks) {
1113 FiberManager fm(folly::make_unique<SimpleLoopController>());
1114 std::thread remote([&]() { fm.addTaskRemote([&]() { ++counter; }); });
1118 while (fm.hasTasks()) {
1119 fm.loopUntilNoReady();
1122 EXPECT_FALSE(fm.hasTasks());
1123 EXPECT_EQ(counter, 1);
// hasTasks() across the life of a remotely added task that parks on a Promise.
// hasTasks() is expected to be true inside the task, after it was added, after
// the first loopUntilNoReady() (the task is parked), and after another thread
// fulfils the promise with 47. Only after the final loopUntilNoReady() must it
// be false, with `result` equal to 47.
1126 TEST(FiberManager, remoteHasReadyTasks) {
1128 folly::Optional<Promise<int>> savedPromise;
1129 FiberManager fm(folly::make_unique<SimpleLoopController>());
1130 std::thread remote([&]() {
1131 fm.addTaskRemote([&]() {
1133 [&](Promise<int> promise) { savedPromise = std::move(promise); });
1134 EXPECT_TRUE(fm.hasTasks());
1139 EXPECT_TRUE(fm.hasTasks());
1141 fm.loopUntilNoReady();
1142 EXPECT_TRUE(fm.hasTasks());
1144 std::thread remote2([&]() { savedPromise->setValue(47); });
1146 EXPECT_TRUE(fm.hasTasks());
1148 fm.loopUntilNoReady();
1149 EXPECT_FALSE(fm.hasTasks());
1151 EXPECT_EQ(result, 47);
// Shared body for the fiber-local tests, parameterized on the fiber-local type
// `Data` (which must expose a `value` member). Judging by the visible
// expectations:
//  - a freshly added task observes value 42 (presumably Data's default);
//  - writes made by a task remain visible to that task later on;
//  - a task spawned from inside a fiber via addTask() or fm.addTaskRemote()
//    observes the value its parent had at that point (44 and 43 here);
//  - after collectAny() over a child task that wrote 44, the parent still
//    observes 43.
// NOTE(review): several statements separating these phases are not visible in
// this listing, so the grouping above should be confirmed against the full
// file.
1154 template <typename Data>
1155 void testFiberLocal() {
1157 LocalType<Data>(), folly::make_unique<SimpleLoopController>());
1160 EXPECT_EQ(42, local<Data>().value);
1162 local<Data>().value = 43;
1165 EXPECT_EQ(43, local<Data>().value);
1167 local<Data>().value = 44;
1169 addTask([]() { EXPECT_EQ(44, local<Data>().value); });
1174 EXPECT_EQ(42, local<Data>().value);
1176 local<Data>().value = 43;
1178 fm.addTaskRemote([]() { EXPECT_EQ(43, local<Data>().value); });
1182 EXPECT_EQ(42, local<Data>().value);
1183 local<Data>().value = 43;
1186 EXPECT_EQ(43, local<Data>().value);
1187 local<Data>().value = 44;
1189 std::vector<std::function<void()>> tasks{task};
1190 collectAny(tasks.begin(), tasks.end());
1192 EXPECT_EQ(43, local<Data>().value);
1195 fm.loopUntilNoReady();
1196 EXPECT_FALSE(fm.hasTasks());
// Runs the shared testFiberLocal() body with SimpleData as the fiber-local
// type (SimpleData is defined on lines not visible in this listing).
1199 TEST(FiberManager, fiberLocal) {
1204 testFiberLocal<SimpleData>();
// Runs the shared testFiberLocal() body with LargeData as the fiber-local
// type. The 1 MiB char array below is presumably a member of LargeData, making
// it too large for inline fiber-local storage so that the heap-backed path is
// exercised (per the test name; confirm against the full definition).
1207 TEST(FiberManager, fiberLocalHeap) {
1209 char _[1024 * 1024];
1213 testFiberLocal<LargeData>();
// Fiber-local data accessed while the fiber-local object itself is being torn
// down (per the test name). The first group of statements presumably belongs
// to CrazyData's destructor: it expects `data` to be 42 and then resets it to
// 0 so the check cannot recurse forever. The task sets `data` to 41
// (presumably incremented to 42 by code not visible here), and the manager
// must have no tasks left after the loop.
1216 TEST(FiberManager, fiberLocalDestructor) {
1223 EXPECT_EQ(42, local<CrazyData>().data);
1224 // Make sure we don't have infinite loop
1225 local<CrazyData>().data = 0;
1232 LocalType<CrazyData>(), folly::make_unique<SimpleLoopController>());
1234 fm.addTask([]() { local<CrazyData>().data = 41; });
1236 fm.loopUntilNoReady();
1237 EXPECT_FALSE(fm.hasTasks());
// Yielding from a fiber (per the test name). A task is added and the loop is
// run with a loop function that stops the controller; `checkRan` must be true
// afterwards.
// NOTE(review): the yield call and the place where `checkRan` is set are not
// visible in this listing.
1240 TEST(FiberManager, yieldTest) {
1241 FiberManager manager(folly::make_unique<SimpleLoopController>());
1242 auto& loopController =
1243 dynamic_cast<SimpleLoopController&>(manager.loopController());
1245 bool checkRan = false;
1247 manager.addTask([&]() {
1252 loopController.loop([&]() {
1254 loopController.stop();
1258 EXPECT_TRUE(checkRan);
// folly::RequestContext handling across fibers. Four tasks are scheduled, each
// under its own RequestContextScopeGuard: one via addTask(), one via
// addTaskRemote(), one as a task/finally pair returning folly::Unit, and a
// fourth added after the context was explicitly reset with
// setContext(nullptr). Every task captures the context that was current when
// it was scheduled and expects RequestContext::get() to return that same
// context at each visible check point inside the task.
// The tail of the test installs yet another context on the test thread and
// checks, around each loopUntilNoReady(), that the test thread's own context
// is unchanged and that each checkRunN flag has been set in turn.
// NOTE(review): the baton waits/posts that sequence the four tasks are not
// visible in this listing.
1261 TEST(FiberManager, RequestContext) {
1262 FiberManager fm(folly::make_unique<SimpleLoopController>());
1264 bool checkRun1 = false;
1265 bool checkRun2 = false;
1266 bool checkRun3 = false;
1267 bool checkRun4 = false;
1268 folly::fibers::Baton baton1;
1269 folly::fibers::Baton baton2;
1270 folly::fibers::Baton baton3;
1271 folly::fibers::Baton baton4;
1274 folly::RequestContextScopeGuard rctx;
1275 auto rcontext1 = folly::RequestContext::get();
1276 fm.addTask([&, rcontext1]() {
1277 EXPECT_EQ(rcontext1, folly::RequestContext::get());
1279 [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
1280 EXPECT_EQ(rcontext1, folly::RequestContext::get());
1282 [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
1287 folly::RequestContextScopeGuard rctx;
1288 auto rcontext2 = folly::RequestContext::get();
1289 fm.addTaskRemote([&, rcontext2]() {
1290 EXPECT_EQ(rcontext2, folly::RequestContext::get());
1292 EXPECT_EQ(rcontext2, folly::RequestContext::get());
1297 folly::RequestContextScopeGuard rctx;
1298 auto rcontext3 = folly::RequestContext::get();
1301 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1303 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1305 return folly::Unit();
1307 [&, rcontext3](Try<folly::Unit>&& /* t */) {
1308 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1313 folly::RequestContext::setContext(nullptr);
1315 folly::RequestContextScopeGuard rctx;
1316 auto rcontext4 = folly::RequestContext::get();
1318 EXPECT_EQ(rcontext4, folly::RequestContext::get());
1323 folly::RequestContextScopeGuard rctx;
1324 auto rcontext = folly::RequestContext::get();
1326 fm.loopUntilNoReady();
1327 EXPECT_EQ(rcontext, folly::RequestContext::get());
1330 EXPECT_EQ(rcontext, folly::RequestContext::get());
1331 fm.loopUntilNoReady();
1332 EXPECT_TRUE(checkRun1);
1333 EXPECT_EQ(rcontext, folly::RequestContext::get());
1336 EXPECT_EQ(rcontext, folly::RequestContext::get());
1337 fm.loopUntilNoReady();
1338 EXPECT_TRUE(checkRun2);
1339 EXPECT_EQ(rcontext, folly::RequestContext::get());
1342 EXPECT_EQ(rcontext, folly::RequestContext::get());
1343 fm.loopUntilNoReady();
1344 EXPECT_TRUE(checkRun3);
1345 EXPECT_EQ(rcontext, folly::RequestContext::get());
1348 EXPECT_EQ(rcontext, folly::RequestContext::get());
1349 fm.loopUntilNoReady();
1350 EXPECT_TRUE(checkRun4);
1351 EXPECT_EQ(rcontext, folly::RequestContext::get());
// Periodic shrinking of the fiber pool, configured with
// fibersPoolResizePeriodMs = 300 and maxFibersPoolSize = 5 on an
// EventBase-driven manager. Thirty tasks are scheduled; the first ten
// (i < batons.size()) stay parked, per the inline comment. The expected
// (fibersAllocated, fibersPoolSize) sequence is:
//   before running: (30, 0); after running: (30, 20);
//   after one 400 ms period: (30, 20); after a second: (15, 5);
//   after releasing the ten parked fibers: (15, 15);
//   after further 400 ms periods: (10, 10) and finally (5, 5).
// NOTE(review): relies on 400 ms sleeps outlasting the 300 ms resize period,
// so it is wall-clock sensitive.
1355 TEST(FiberManager, resizePeriodically) {
1356 FiberManager::Options opts;
1357 opts.fibersPoolResizePeriodMs = 300;
1358 opts.maxFibersPoolSize = 5;
1360 FiberManager manager(folly::make_unique<EventBaseLoopController>(), opts);
1362 folly::EventBase evb;
1363 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1364 .attachEventBase(evb);
1366 std::vector<Baton> batons(10);
1368 size_t tasksRun = 0;
1369 for (size_t i = 0; i < 30; ++i) {
1370 manager.addTask([i, &batons, &tasksRun]() {
1372 // Keep some fibers active indefinitely
1373 if (i < batons.size()) {
1379 EXPECT_EQ(0, tasksRun);
1380 EXPECT_EQ(30, manager.fibersAllocated());
1381 EXPECT_EQ(0, manager.fibersPoolSize());
1384 EXPECT_EQ(30, tasksRun);
1385 EXPECT_EQ(30, manager.fibersAllocated());
1386 // Can go over maxFibersPoolSize, 10 of 30 fibers still active
1387 EXPECT_EQ(20, manager.fibersPoolSize());
1389 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1390 evb.loopOnce(); // no fibers active in this period
1391 EXPECT_EQ(30, manager.fibersAllocated());
1392 EXPECT_EQ(20, manager.fibersPoolSize());
1394 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1395 evb.loopOnce(); // should shrink fibers pool to maxFibersPoolSize
1396 EXPECT_EQ(15, manager.fibersAllocated());
1397 EXPECT_EQ(5, manager.fibersPoolSize());
1399 for (size_t i = 0; i < batons.size(); ++i) {
1403 EXPECT_EQ(15, manager.fibersAllocated());
1404 EXPECT_EQ(15, manager.fibersPoolSize());
1406 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1407 evb.loopOnce(); // 10 fibers active in last period
1408 EXPECT_EQ(10, manager.fibersAllocated());
1409 EXPECT_EQ(10, manager.fibersPoolSize());
1411 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1413 EXPECT_EQ(5, manager.fibersAllocated());
1414 EXPECT_EQ(5, manager.fibersPoolSize());
// Baton::wait() with an externally owned Baton::TimeoutHandler. A fiber waits
// on the baton with the handler attached. A 250 ms timeout is then scheduled
// on the handler and the test thread sleeps 500 ms; at that point the baton
// must still be unposted and the fiber must not have finished. After further
// event processing (presumably an event-loop pass on a line not visible here)
// and loopUntilNoReady(), the fiber is expected to have completed once.
1417 TEST(FiberManager, batonWaitTimeoutHandler) {
1418 FiberManager manager(folly::make_unique<EventBaseLoopController>());
1420 folly::EventBase evb;
1421 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1422 .attachEventBase(evb);
1424 size_t fibersRun = 0;
1426 Baton::TimeoutHandler timeoutHandler;
1428 manager.addTask([&]() {
1429 baton.wait(timeoutHandler);
1432 manager.loopUntilNoReady();
1434 EXPECT_FALSE(baton.try_wait());
1435 EXPECT_EQ(0, fibersRun);
1437 timeoutHandler.scheduleTimeout(std::chrono::milliseconds(250));
1438 std::this_thread::sleep_for(std::chrono::milliseconds(500));
1440 EXPECT_FALSE(baton.try_wait());
1441 EXPECT_EQ(0, fibersRun);
1444 manager.loopUntilNoReady();
1446 EXPECT_EQ(1, fibersRun);
// Adds kNumTimeoutTasks fibers that each wait on a baton with their own
// Baton::TimeoutHandler.
1449 TEST(FiberManager, batonWaitTimeoutMany) {
1450 FiberManager manager(folly::make_unique<EventBaseLoopController>());
1452 folly::EventBase evb;
// Attach the manager's EventBaseLoopController to the EventBase above.
1453 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1454 .attachEventBase(evb);
1456 constexpr size_t kNumTimeoutTasks = 10000;
// Remaining-task counter; decremented by each fiber after its wait returns.
1457 size_t tasksCount = kNumTimeoutTasks;
1459 // We add many tasks to hit timeout queue deallocation logic.
1460 for (size_t i = 0; i < kNumTimeoutTasks; ++i) {
1461 manager.addTask([&]() {
1463 Baton::TimeoutHandler timeoutHandler;
// A nested task schedules a 1000 ms timeout on this fiber's handler.
1465 folly::fibers::addTask([&] {
1466 timeoutHandler.scheduleTimeout(std::chrono::milliseconds(1000));
1469 baton.wait(timeoutHandler);
// The fiber that brings the counter to zero asks the EventBase loop to stop.
1470 if (--tasksCount == 0) {
1471 evb.terminateLoopSoon();
// Checks the values delivered by futures obtained from addTaskFuture() and
// addTaskRemoteFuture().
1479 TEST(FiberManager, remoteFutureTest) {
1480 FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
1481 auto& loopController =
1482 dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
// Each task simply returns one of the two test values.
1486 auto f1 = fiberManager.addTaskFuture([&]() { return testValue1; });
1487 auto f2 = fiberManager.addTaskRemoteFuture([&]() { return testValue2; });
// The loop function passed to loop() stops the controller.
1488 loopController.loop([&]() { loopController.stop(); });
// v1 / v2 are presumably read from f1 / f2 on lines not shown here.
1492 EXPECT_EQ(v1, testValue1);
1493 EXPECT_EQ(v2, testValue2);
1496 // Test that a void function produces a Future<Unit>.
// Void-returning tasks added through addTaskFuture() and addTaskRemoteFuture()
// must yield folly::Future<folly::Unit>, and both tasks must actually run.
1497 TEST(FiberManager, remoteFutureVoidUnitTest) {
1498 FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
1499 auto& loopController =
1500 dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
// Local variant: the explicit Future<Unit> type pins the return type.
1502 bool ranLocal = false;
1503 folly::Future<folly::Unit> futureLocal =
1504 fiberManager.addTaskFuture([&]() { ranLocal = true; });
// Remote variant of the same check.
1506 bool ranRemote = false;
1507 folly::Future<folly::Unit> futureRemote =
1508 fiberManager.addTaskRemoteFuture([&]() { ranRemote = true; });
// The loop function passed to loop() stops the controller.
1510 loopController.loop([&]() { loopController.stop(); });
1513 ASSERT_TRUE(ranLocal);
// Wait on the remote future before checking its flag.
1515 futureRemote.wait();
1516 ASSERT_TRUE(ranRemote);
// Runs an inner EventBase loop (with its own FiberManager) from inside a fiber
// of an outer FiberManager and looks at FiberManager::getFiberManagerUnsafe()
// at each nesting level.
1519 TEST(FiberManager, nestedFiberManagers) {
1520 folly::EventBase outerEvb;
1521 folly::EventBase innerEvb;
1523 getFiberManager(outerEvb).addTask([&]() {
// Inside the outer fiber: outer manager paired with the current manager
// (presumably the arguments of an equality check whose opener is not shown).
1525 &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
// Leave the fiber context to schedule and drive the inner manager.
1527 runInMainContext([&]() {
1528 getFiberManager(innerEvb).addTask([&]() {
// Inside the inner fiber: inner manager paired with the current manager.
1530 &getFiberManager(innerEvb), FiberManager::getFiberManagerUnsafe());
1532 innerEvb.terminateLoopSoon();
1535 innerEvb.loopForever();
// After the inner loop: outer manager paired with the current manager again.
1539 &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
1541 outerEvb.terminateLoopSoon();
1544 outerEvb.loopForever();
// Runs the same fiber workload on two threads that share a single Semaphore
// and checks that each thread's counter stays within [0, kNumTokens).
1547 TEST(FiberManager, semaphore) {
1548 constexpr size_t kTasks = 10;
1549 constexpr size_t kIterations = 10000;
1550 constexpr size_t kNumTokens = 10;
// One semaphore, captured by reference by the per-thread task below.
1552 Semaphore sem(kNumTokens);
// Per-thread body: builds its own FiberManager and EventBase.
1556 auto task = [&sem, kTasks, kIterations, kNumTokens](
1557 int& counter, folly::fibers::Baton& baton) {
1558 FiberManager manager(folly::make_unique<EventBaseLoopController>());
1559 folly::EventBase evb;
1560 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1561 .attachEventBase(evb);
// shared_ptr used purely for its deleter: once the last copy is released the
// deleter calls terminateLoopSoon() on evb. Each fiber lambda holds a copy.
1564 std::shared_ptr<folly::EventBase> completionCounter(
1565 &evb, [](folly::EventBase* evb) { evb->terminateLoopSoon(); });
// kTasks fibers, each looping kIterations times.
1567 for (size_t i = 0; i < kTasks; ++i) {
1568 manager.addTask([&, completionCounter]() {
1569 for (size_t j = 0; j < kIterations; ++j) {
// Bounds checked on the counter by the fibers.
1575 EXPECT_LT(counter, kNumTokens);
1576 EXPECT_GE(counter, 0);
// One baton, one counter and one thread per task invocation.
1586 folly::fibers::Baton batonA;
1587 folly::fibers::Baton batonB;
1588 std::thread threadA([&] { task(counterA, batonA); });
1589 std::thread threadB([&] { task(counterB, batonB); });
// Final bounds on both counters.
1596 EXPECT_LT(counterA, kNumTokens);
1597 EXPECT_LT(counterB, kNumTokens);
1598 EXPECT_GE(counterA, 0);
1599 EXPECT_GE(counterB, 0);
// Adds `index` to a thread_local BatchDispatcher<int, std::string, ExecutorT>
// and expects the dispatched result to be `index` rendered as a string.
// Because the dispatcher is thread_local, it is constructed on the first call
// per thread and per ExecutorT; its callback captures that call's batchSize.
1602 template <typename ExecutorT>
1603 void singleBatchDispatch(ExecutorT& executor, int batchSize, int index) {
1604 thread_local BatchDispatcher<int, std::string, ExecutorT> batchDispatcher(
1605 executor, [=](std::vector<int>&& batch) {
// The whole batch is expected to arrive in a single callback invocation.
1606 EXPECT_EQ(batchSize, batch.size());
1607 std::vector<std::string> results;
// Convert every batched value to its string form.
1608 for (auto& it : batch) {
1609 results.push_back(folly::to<std::string>(it));
// add() takes the value by move, so work on a copy and keep `index` for the
// comparison below.
1614 auto indexCopy = index;
1615 auto result = batchDispatcher.add(std::move(indexCopy));
1616 EXPECT_EQ(folly::to<std::string>(index), result.get());
// Drives singleBatchDispatch() from fibers of the EventBase's FiberManager,
// in two separate rounds.
1619 TEST(FiberManager, batchDispatchTest) {
1620 folly::EventBase evb;
1621 auto& executor = getFiberManager(evb);
1623 // Launch multiple fibers with a single id.
1624 executor.add([&]() {
// One dispatching task per index in [0, batchSize).
1626 for (int i = 0; i < batchSize; i++) {
1628 [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
1633 // Reuse the same BatchDispatcher to batch once again.
1634 executor.add([&]() {
1636 for (int i = 0; i < batchSize; i++) {
1638 [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
// Inner stage of the double-batch test: adds one group of ints to a
// thread_local BatchDispatcher and returns the future obtained from add().
// The dispatcher callback stringifies every element of every group and checks
// that the groups of one batch together hold totalNumberOfElements elements.
1644 template <typename ExecutorT>
1645 folly::Future<std::vector<std::string>> doubleBatchInnerDispatch(
1646 ExecutorT& executor,
1647 int totalNumberOfElements,
1648 std::vector<int> input) {
1649 thread_local BatchDispatcher<
1651 std::vector<std::string>,
1653 batchDispatcher(executor, [=](std::vector<std::vector<int>>&& batch) {
1654 std::vector<std::vector<std::string>> results;
// Running total of elements seen across all groups in this batch.
1655 int numberOfElements = 0;
1656 for (auto& unit : batch) {
1657 numberOfElements += unit.size();
// One result vector per input group.
1658 std::vector<std::string> result;
1659 for (auto& element : unit) {
1660 result.push_back(folly::to<std::string>(element));
1662 results.push_back(std::move(result));
1664 EXPECT_EQ(totalNumberOfElements, numberOfElements);
1668 return batchDispatcher.add(std::move(input));
1672 * Batch values in groups of 5, and then call inner dispatch.
// Outer stage of the double-batch test: adds `index` to a thread_local
// BatchDispatcher whose callback splits the batch into groups of five and
// forwards each group to doubleBatchInnerDispatch().
1674 template <typename ExecutorT>
1675 void doubleBatchOuterDispatch(
1676 ExecutorT& executor,
1677 int totalNumberOfElements,
1679 thread_local BatchDispatcher<int, std::string, ExecutorT>
1680 batchDispatcher(executor, [=, &executor](std::vector<int>&& batch) {
// The outer batch is expected to contain every element at once.
1681 EXPECT_EQ(totalNumberOfElements, batch.size());
1682 std::vector<std::string> results;
1683 std::vector<folly::Future<std::vector<std::string>>>
1684 innerDispatchResultFutures;
// Accumulate values; each time five are collected, hand a copy of the group
// to the inner dispatcher and keep the returned future.
1686 std::vector<int> group;
1687 for (auto unit : batch) {
1688 group.push_back(unit);
1689 if (group.size() == 5) {
1690 auto localGroup = group;
1693 innerDispatchResultFutures.push_back(doubleBatchInnerDispatch(
1694 executor, totalNumberOfElements, localGroup));
// The futures' range is passed to a call whose opening line is not shown;
// its continuation receives one Try per inner future.
1699 innerDispatchResultFutures.begin(), innerDispatchResultFutures.end())
1701 std::vector<Try<std::vector<std::string>>> innerDispatchResults) {
// Flatten the inner results into `results`.
1702 for (auto& unit : innerDispatchResults) {
1703 for (auto& element : unit.value()) {
1704 results.push_back(element);
// add() takes the value by move, so work on a copy and keep `index` for the
// comparison below.
1712 auto indexCopy = index;
1713 auto result = batchDispatcher.add(std::move(indexCopy));
1714 EXPECT_EQ(folly::to<std::string>(index), result.get());
// Drives doubleBatchOuterDispatch() from 20 tasks added to the EventBase's
// FiberManager.
1717 TEST(FiberManager, doubleBatchDispatchTest) {
1718 folly::EventBase evb;
1719 auto& executor = getFiberManager(evb);
1721 // Launch multiple fibers with a single id.
1722 executor.add([&]() {
1723 int totalNumberOfElements = 20;
// One task per index; each captures its own copy of the index and the total.
1724 for (int i = 0; i < totalNumberOfElements; i++) {
1725 executor.add([=, &executor]() {
1726 doubleBatchOuterDispatch(executor, totalNumberOfElements, i);
// Adds `i` to a thread_local BatchDispatcher whose callback always throws
// std::runtime_error, and expects get() on the resulting future to throw it.
1733 template <typename ExecutorT>
1734 void batchDispatchExceptionHandling(ExecutorT& executor, int i) {
1735 thread_local BatchDispatcher<int, int, ExecutorT> batchDispatcher(
1736 executor, [=, &executor](std::vector<int> &&) -> std::vector<int> {
// The batch callback never produces results.
1737 throw std::runtime_error("Surprise!!");
1740 EXPECT_THROW(batchDispatcher.add(i).get(), std::runtime_error);
// Runs batchDispatchExceptionHandling() for five indices on the EventBase's
// FiberManager.
1743 TEST(FiberManager, batchDispatchExceptionHandlingTest) {
1744 folly::EventBase evb;
1745 auto& executor = getFiberManager(evb);
1747 // Launch multiple fibers with a single id.
1748 executor.add([&]() {
1749 int totalNumberOfElements = 5;
// One task per index in [0, totalNumberOfElements).
1750 for (int i = 0; i < totalNumberOfElements; i++) {
1752 [=, &executor]() { batchDispatchExceptionHandling(executor, i); });
// Helpers shared by the AtomicBatchDispatcher (ABD) tests: type aliases, an
// optional trace sink, the Job type, and job creation / dispatch / validation
// routines.
1758 namespace AtomicBatchDispatcherTesting {
// Input type, result type and batch-callback type used by the ABD tests.
1760 using ValueT = size_t;
1761 using ResultT = std::string;
1762 using DispatchFunctionT =
1763 folly::Function<std::vector<ResultT>(std::vector<ValueT>&&)>;
// OUTPUT_TRACE is std::cerr when tracing is enabled; otherwise it names a
// DevNullPiper, whose operator<< overloads take their argument unnamed.
1765 #define ENABLE_TRACE_IN_TEST 0 // Set to 1 to debug issues in ABD tests
1766 #if ENABLE_TRACE_IN_TEST
1767 #define OUTPUT_TRACE std::cerr
1768 #else // ENABLE_TRACE_IN_TEST
1769 struct DevNullPiper {
// Accepts any streamed value.
1770 template <typename T>
1771 DevNullPiper& operator<<(const T&) {
// Accepts stream manipulators such as std::endl.
1775 DevNullPiper& operator<<(std::ostream& (*)(std::ostream&)) {
1779 #define OUTPUT_TRACE devNullPiper
1780 #endif // ENABLE_TRACE_IN_TEST
// Job members: a dispatcher token plus (per the constructor below) an input.
1783 AtomicBatchDispatcher<ValueT, ResultT>::Token token;
// Simulated preprocessing step; the throw below is presumably guarded by
// `die` (the guard line is not shown here).
1786 void preprocess(FiberManager& executor, bool die) {
1787 // Yield for a random duration [0, 10) ms to simulate I/O in preprocessing
1788 clock_t msecToDoIO = folly::Random::rand32() % 10;
// clock() ticks converted to milliseconds.
1789 double start = (1000.0 * clock()) / CLOCKS_PER_SEC;
1790 double endAfter = start + msecToDoIO;
// Loop until the clock() reading passes the chosen end time.
1791 while ((1000.0 * clock()) / CLOCKS_PER_SEC < endAfter) {
1795 throw std::logic_error("Simulating preprocessing failure");
// Takes ownership of the token and stores the input value.
1799 Job(AtomicBatchDispatcher<ValueT, ResultT>::Token&& t, ValueT i)
1800 : token(std::move(t)), input(i) {}
// Move construction and move assignment are defaulted.
1802 Job(Job&&) = default;
1803 Job& operator=(Job&&) = default;
// Converts one input value to its ResultT (string) form via folly::to.
1806 ResultT processSingleInput(ValueT&& input) {
1807 return folly::to<ResultT>(std::move(input));
// Batch callback used by most tests: maps processSingleInput over the inputs,
// in order, into a vector reserved to the input count.
1810 std::vector<ResultT> userDispatchFunc(std::vector<ValueT>&& inputs) {
1811 size_t expectedCount = inputs.size();
1812 std::vector<ResultT> results;
1813 results.reserve(expectedCount);
1814 for (size_t i = 0; i < expectedCount; ++i) {
1815 results.emplace_back(processSingleInput(std::move(inputs[i])));
// Job factory parameters: appends `count` jobs to `jobs`, each holding a fresh
// token from the dispatcher and its index as input.
1821 AtomicBatchDispatcher<ValueT, ResultT>& atomicBatchDispatcher,
1822 std::vector<Job>& jobs,
1825 for (size_t i = 0; i < count; ++i) {
1826 jobs.emplace_back(Job(atomicBatchDispatcher.getToken(), i));
// Failure modes that the dispatch routine below can inject; the uses below
// reference None, PreprocessThrows and DuplicateDispatch.
1830 enum class DispatchProblem {
// Dispatch routine parameters: the executor, the jobs to dispatch, the
// per-job result slots, and an optional problem to inject at problemIndex.
1837 FiberManager& executor,
1838 std::vector<Job>& jobs,
1839 std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1840 DispatchProblem dispatchProblem = DispatchProblem::None,
1841 size_t problemIndex = size_t(-1)) {
// Condition: when a problem is requested, problemIndex must refer to a job.
1843 dispatchProblem == DispatchProblem::None || problemIndex < jobs.size());
// One result slot per job, filled in by the per-job lambda.
1845 results.resize(jobs.size());
1846 for (size_t i = 0; i < jobs.size(); ++i) {
1848 [i, &executor, &jobs, &results, dispatchProblem, problemIndex]() {
// Move the job out of the shared vector into this lambda's local.
1850 Job job(std::move(jobs[i]));
// Injected failure: preprocessing of the problem job must throw.
1852 if (dispatchProblem == DispatchProblem::PreprocessThrows) {
1853 if (i == problemIndex) {
1854 EXPECT_THROW(job.preprocess(executor, true), std::logic_error);
// Normal path: preprocess, then dispatch and store the returned future.
1859 job.preprocess(executor, false);
1860 OUTPUT_TRACE << "Dispatching job #" << i << std::endl;
1861 results[i] = job.token.dispatch(job.input);
1862 OUTPUT_TRACE << "Result future filled for job #" << i << std::endl;
// Injected failure: a second dispatch on the same token must throw.
1864 if (dispatchProblem == DispatchProblem::DuplicateDispatch) {
1865 if (i == problemIndex) {
1866 EXPECT_THROW(job.token.dispatch(job.input), std::logic_error);
1870 OUTPUT_TRACE << "Preprocessing failed for job #" << i << std::endl;
// Reads results[i]->value() for tracing; a std::exception is caught and its
// message traced.
1876 void validateResult(
1877 std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1880 OUTPUT_TRACE << "results[" << i << "].value() : " << results[i]->value()
1882 } catch (std::exception& e) {
1883 OUTPUT_TRACE << "Exception : " << e.what() << std::endl;
// Failure-case validation: validateResult is expected to throw TException,
// and the number of filled results must equal expectedNumResults.
1888 template <typename TException>
1889 void validateResults(
1890 std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1891 size_t expectedNumResults) {
1892 size_t numResultsFilled = 0;
1893 for (size_t i = 0; i < results.size(); ++i) {
1898 EXPECT_THROW(validateResult(results, i), TException);
1900 EXPECT_EQ(numResultsFilled, expectedNumResults);
// Success-case validation: validateResult must not throw, each value is
// paired with processSingleInput of its index, and the number of filled
// results must equal expectedNumResults.
1903 void validateResults(
1904 std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1905 size_t expectedNumResults) {
1906 size_t numResultsFilled = 0;
1907 for (size_t i = 0; i < results.size(); ++i) {
1912 EXPECT_NO_THROW(validateResult(results, i));
// Job i was created with input i, so that is the expected source value.
1913 ValueT expectedInput = i;
1915 results[i]->value(), processSingleInput(std::move(expectedInput)));
1917 EXPECT_EQ(numResultsFilled, expectedNumResults);
1920 } // AtomicBatchDispatcherTesting
// Common prologue for the ABD tests. Declares an EventBase and its
// FiberManager (`executor`), COUNT = 11, and `jobs` / `results` vectors each
// reserved to COUNT. The macro ends with an unterminated
// `DispatchFunctionT dispatchFunc` declaration, so the use site must supply
// the terminating `;` (or an initializer).
1922 #define SET_UP_TEST_FUNC \
1923 using namespace AtomicBatchDispatcherTesting; \
1924 folly::EventBase evb; \
1925 auto& executor = getFiberManager(evb); \
1926 const size_t COUNT = 11; \
1927 std::vector<Job> jobs; \
1928 jobs.reserve(COUNT); \
1929 std::vector<folly::Optional<folly::Future<ResultT>>> results; \
1930 results.reserve(COUNT); \
1931 DispatchFunctionT dispatchFunc
// Happy path: create COUNT jobs, dispatch them all, commit, and expect COUNT
// valid results. (jobs / results / executor / COUNT / dispatchFunc presumably
// come from SET_UP_TEST_FUNC on a line not shown here.)
1933 TEST(FiberManager, ABD_Test) {
1937 // Testing AtomicBatchDispatcher with explicit call to commit()
1939 dispatchFunc = userDispatchFunc;
1940 auto atomicBatchDispatcher =
1941 createAtomicBatchDispatcher(std::move(dispatchFunc));
1942 createJobs(atomicBatchDispatcher, jobs, COUNT);
1943 dispatchJobs(executor, jobs, results);
// Explicit commit after all tokens have been handed out.
1944 atomicBatchDispatcher.commit();
1946 validateResults(results, COUNT);
// Commit is never reached: an exception is thrown after the jobs have been
// dispatched, and every result is then expected to carry std::logic_error.
1949 TEST(FiberManager, ABD_DispatcherDestroyedBeforeCallingCommit) {
1953 // Testing AtomicBatchDispatcher destroyed before calling commit.
1954 // Handles error cases for:
1955 // - User might have forgotten to add the call to commit() in the code
1956 // - An unexpected exception got thrown in user code before commit() is called
1959 dispatchFunc = userDispatchFunc;
1960 auto atomicBatchDispatcher =
1961 createAtomicBatchDispatcher(std::move(dispatchFunc));
1962 createJobs(atomicBatchDispatcher, jobs, COUNT);
1963 dispatchJobs(executor, jobs, results);
// Simulated user-code failure in place of the commit() call below.
1964 throw std::runtime_error(
1965 "Unexpected exception in user code before commit called");
1966 // atomicBatchDispatcher.commit();
1968 /* User code handles the exception and does not exit process */
// All COUNT results are expected to be filled and to throw std::logic_error.
1971 validateResults<std::logic_error>(results, COUNT);
// Job #8 fails preprocessing; after commit, COUNT - 1 results are expected to
// be filled, each throwing std::logic_error.
1974 TEST(FiberManager, ABD_PreprocessingFailureTest) {
1978 // Testing preprocessing failure on a job throws
1980 dispatchFunc = userDispatchFunc;
1981 auto atomicBatchDispatcher =
1982 createAtomicBatchDispatcher(std::move(dispatchFunc));
1983 createJobs(atomicBatchDispatcher, jobs, COUNT);
// Inject a preprocessing failure at job index 8.
1984 dispatchJobs(executor, jobs, results, DispatchProblem::PreprocessThrows, 8);
1985 atomicBatchDispatcher.commit();
1987 validateResults<std::logic_error>(results, COUNT - 1);
// Job #4 calls dispatch twice on its token; the duplicate call is expected to
// throw std::logic_error (checked inside dispatchJobs).
1990 TEST(FiberManager, ABD_MultipleDispatchOnSameTokenErrorTest) {
1994 // Testing that calling dispatch more than once on the same token throws
1996 dispatchFunc = userDispatchFunc;
1997 auto atomicBatchDispatcher =
1998 createAtomicBatchDispatcher(std::move(dispatchFunc));
1999 createJobs(atomicBatchDispatcher, jobs, COUNT);
// Inject a duplicate dispatch at job index 4.
2000 dispatchJobs(executor, jobs, results, DispatchProblem::DuplicateDispatch, 4);
2001 atomicBatchDispatcher.commit();
// getToken() must throw std::logic_error at every point after commit():
// before dispatching, after dispatching, and after the results are validated.
2005 TEST(FiberManager, ABD_GetTokenCalledAfterCommitTest) {
2009 // Testing that an exception is thrown when getToken is called after commit
2011 dispatchFunc = userDispatchFunc;
2012 auto atomicBatchDispatcher =
2013 createAtomicBatchDispatcher(std::move(dispatchFunc));
2014 createJobs(atomicBatchDispatcher, jobs, COUNT);
// Commit first; the jobs are only dispatched afterwards.
2015 atomicBatchDispatcher.commit();
2016 EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error);
2017 dispatchJobs(executor, jobs, results);
2018 EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error);
// The jobs created before commit are still expected to produce COUNT results.
2020 validateResults(results, COUNT);
2021 EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error);
// The user-supplied batch callback throws std::runtime_error; all COUNT
// results are expected to be filled and to throw that exception type.
2024 TEST(FiberManager, ABD_UserProvidedBatchDispatchThrowsTest) {
2028 // Testing that exception is set if user provided batch dispatch throws
// The callback runs the normal dispatch function, discards its output, then
// throws.
2030 dispatchFunc = [](std::vector<ValueT>&& inputs) -> std::vector<ResultT> {
2031 (void)userDispatchFunc(std::move(inputs));
2032 throw std::runtime_error("Unexpected exception in user dispatch function");
2034 auto atomicBatchDispatcher =
2035 createAtomicBatchDispatcher(std::move(dispatchFunc));
2036 createJobs(atomicBatchDispatcher, jobs, COUNT);
2037 dispatchJobs(executor, jobs, results);
2038 atomicBatchDispatcher.commit();
2040 validateResults<std::runtime_error>(results, COUNT);
// Uses fiber managers obtained for VirtualEventBase objects that are built on
// the EventBase of one ScopedEventBaseThread.
2043 TEST(FiberManager, VirtualEventBase) {
2044 folly::ScopedEventBaseThread thread;
// Two VirtualEventBase instances over the same thread's EventBase (presumably
// assigned to evb1 / evb2 on lines not shown here).
2047 folly::make_unique<folly::VirtualEventBase>(*thread.getEventBase());
2049 folly::make_unique<folly::VirtualEventBase>(*thread.getEventBase());
// Remote task on the first manager: timed wait of 100 ms on a baton.
2054 getFiberManager(*evb1).addTaskRemote([&] {
2056 baton.timed_wait(std::chrono::milliseconds{100});
// Remote task on the second manager: timed wait of 200 ms on a baton.
2061 getFiberManager(*evb2).addTaskRemote([&] {
2063 baton.timed_wait(std::chrono::milliseconds{200});
2076 * Test that we can properly track fiber stack usage.
2078 * This functionality can only be enabled when ASAN is disabled, so avoid
2079 * running this test with ASAN.
2081 #ifndef FOLLY_SANITIZE_ADDRESS
2082 TEST(FiberManager, recordStack) {
2084 folly::fibers::FiberManager::Options opts;
2085 opts.recordStackEvery = 1;
2087 FiberManager fm(folly::make_unique<SimpleLoopController>(), opts);
2088 auto& loopController =
2089 dynamic_cast<SimpleLoopController&>(fm.loopController());
2091 static constexpr size_t n = 1000;
2095 for (size_t i = 0; i < n; ++i) {
2098 for (size_t i = 0; i + 1 < n; ++i) {
2099 s += b[i] * b[i + 1];
2105 loopController.loop([&]() { loopController.stop(); });
2107 // Check that we properly accounted fiber stack usage.
2108 EXPECT_LT(n * sizeof(int), fm.stackHighWatermark());