2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 #include <folly/Memory.h>
21 #include <folly/Random.h>
22 #include <folly/futures/Future.h>
24 #include <folly/Conv.h>
25 #include <folly/fibers/AddTasks.h>
26 #include <folly/fibers/AtomicBatchDispatcher.h>
27 #include <folly/fibers/BatchDispatcher.h>
28 #include <folly/fibers/EventBaseLoopController.h>
29 #include <folly/fibers/FiberManager.h>
30 #include <folly/fibers/FiberManagerMap.h>
31 #include <folly/fibers/GenericBaton.h>
32 #include <folly/fibers/Semaphore.h>
33 #include <folly/fibers/SimpleLoopController.h>
34 #include <folly/fibers/TimedMutex.h>
35 #include <folly/fibers/WhenN.h>
36 #include <folly/io/async/Request.h>
37 #include <folly/io/async/ScopedEventBaseThread.h>
38 #include <folly/portability/GTest.h>
40 using namespace folly::fibers;
// Exercises Baton::timed_wait() on fibers driven by a SimpleLoopController.
// Two tasks wait with 230 ms and 130 ms timeouts; after each wait the task
// checks `iterations` (5 and 3 respectively) and stops the loop controller.
// NOTE(review): several original lines (baton declarations, the `iterations`
// update, checks on `res`, closing braces) are not visible in this excerpt —
// confirm details against the full file.
44 TEST(FiberManager, batonTimedWaitTimeout) {
45 bool taskAdded = false;
46 size_t iterations = 0;
48 FiberManager manager(std::make_unique<SimpleLoopController>());
49 auto& loopController =
50 dynamic_cast<SimpleLoopController&>(manager.loopController());
// Callback handed to loopController.loop() at the bottom of the test.
52 auto loopFunc = [&]() {
54 manager.addTask([&]() {
57 auto res = baton.timed_wait(std::chrono::milliseconds(230));
60 EXPECT_EQ(5, iterations);
62 loopController.stop();
64 manager.addTask([&]() {
67 auto res = baton.timed_wait(std::chrono::milliseconds(130));
70 EXPECT_EQ(3, iterations);
72 loopController.stop();
// 50 ms pause; the expected counts above (5 for 230 ms, 3 for 130 ms) are
// consistent with one `iterations` tick per pause — presumably; the increment
// itself is not visible here.
76 std::this_thread::sleep_for(std::chrono::milliseconds(50));
81 loopController.loop(std::move(loopFunc));
// Exercises Baton::timed_wait() with a 130 ms timeout on a fiber driven by a
// SimpleLoopController; after the wait the task expects `iterations` to be 2
// and stops the loop controller.
// NOTE(review): the baton declaration, the check on `res`, and the body of the
// `iterations == 2` branch are not visible in this excerpt — presumably that
// branch posts the baton; confirm against the full file.
84 TEST(FiberManager, batonTimedWaitPost) {
85 bool taskAdded = false;
86 size_t iterations = 0;
89 FiberManager manager(std::make_unique<SimpleLoopController>());
90 auto& loopController =
91 dynamic_cast<SimpleLoopController&>(manager.loopController());
// Callback handed to loopController.loop() at the bottom of the test.
93 auto loopFunc = [&]() {
95 manager.addTask([&]() {
99 auto res = baton.timed_wait(std::chrono::milliseconds(130));
102 EXPECT_EQ(2, iterations);
104 loopController.stop();
// 50 ms pause, followed by a check for the second iteration.
108 std::this_thread::sleep_for(std::chrono::milliseconds(50));
110 if (iterations == 2) {
116 loopController.loop(std::move(loopFunc));
// Exercises Baton::timed_wait() with an EventBaseLoopController attached to a
// folly::EventBase. Two tasks (500 ms and 250 ms) each measure the elapsed
// time around the wait and expect it within 50 ms of the requested timeout.
// NOTE(review): the baton declaration, the `duration_ms` declaration and the
// call that runs the event base are not visible in this excerpt.
119 TEST(FiberManager, batonTimedWaitTimeoutEvb) {
120 size_t tasksComplete = 0;
122 folly::EventBase evb;
124 FiberManager manager(std::make_unique<EventBaseLoopController>());
125 dynamic_cast<EventBaseLoopController&>(manager.loopController())
126 .attachEventBase(evb);
// Shared task body, parameterised by the timeout in milliseconds.
128 auto task = [&](size_t timeout_ms) {
131 auto start = EventBaseLoopController::Clock::now();
132 auto res = baton.timed_wait(std::chrono::milliseconds(timeout_ms));
133 auto finish = EventBaseLoopController::Clock::now();
138 std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
// NOTE(review): timeout_ms is size_t, so `timeout_ms - 50` is unsigned
// arithmetic; that is fine for the 500 and 250 values used below.
140 EXPECT_GT(duration_ms.count(), timeout_ms - 50);
141 EXPECT_LT(duration_ms.count(), timeout_ms + 50);
// The second task to get here asks the event base to terminate its loop.
143 if (++tasksComplete == 2) {
144 evb.terminateLoopSoon();
148 evb.runInEventBaseThread([&]() {
149 manager.addTask([&]() { task(500); });
150 manager.addTask([&]() { task(250); });
// Both tasks must have reached the counter increment.
155 EXPECT_EQ(2, tasksComplete);
// Exercises Baton::timed_wait() being satisfied by a post() under an
// EventBaseLoopController: the post is scheduled through the event base with a
// delay argument of 100, while the fiber waits with a 130 ms timeout.
// NOTE(review): the baton declaration, the `duration_ms` declaration and the
// call that runs the event base are not visible in this excerpt.
158 TEST(FiberManager, batonTimedWaitPostEvb) {
159 size_t tasksComplete = 0;
161 folly::EventBase evb;
163 FiberManager manager(std::make_unique<EventBaseLoopController>());
164 dynamic_cast<EventBaseLoopController&>(manager.loopController())
165 .attachEventBase(evb);
167 evb.runInEventBaseThread([&]() {
168 manager.addTask([&]() {
// Schedule the post before starting the timed wait.
171 evb.tryRunAfterDelay([&]() { baton.post(); }, 100);
173 auto start = EventBaseLoopController::Clock::now();
174 auto res = baton.timed_wait(std::chrono::milliseconds(130));
175 auto finish = EventBaseLoopController::Clock::now();
180 std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
// Elapsed time must fall strictly between 95 ms and 110 ms.
// NOTE(review): this is a tight wall-clock window and may be sensitive to
// scheduler jitter on loaded machines.
182 EXPECT_TRUE(duration_ms.count() > 95 && duration_ms.count() < 110);
184 if (++tasksComplete == 1) {
185 evb.terminateLoopSoon();
192 EXPECT_EQ(1, tasksComplete);
// Exercises Baton::try_wait() in two scenarios, each run with
// manager.loopUntilNoReady(): one involving a helper thread, one without a
// post.
// NOTE(review): the declarations of `b`, `c`, `cnt`, the loop bodies and the
// helper thread's action after its sleep are not visible in this excerpt.
195 TEST(FiberManager, batonTryWait) {
196 FiberManager manager(std::make_unique<SimpleLoopController>());
198 // Check if try_wait and post work as expected
201 manager.addTask([&]() {
// Poll until try_wait() succeeds.
202 while (!b.try_wait()) {
// Helper thread sleeps 300 ms first — presumably it then posts `b`; verify.
205 auto thr = std::thread([&]() {
206 std::this_thread::sleep_for(std::chrono::milliseconds(300));
210 manager.loopUntilNoReady();
215 // Check try_wait without post
216 manager.addTask([&]() {
// Polling loop bounded by `cnt`.
218 while (cnt && !c.try_wait()) {
221 EXPECT_TRUE(!c.try_wait()); // must still hold
225 manager.loopUntilNoReady();
// Exercises a fiber task that is still marked running after the first
// loopUntilNoReady() pass and finishes only after a helper thread has slept
// 300 ms; `fiberRunning` tracks whether the task body is still in progress.
// NOTE(review): the baton declaration, the wait/post calls and the statement
// that sets `fiberRunning = true` are not visible in this excerpt.
228 TEST(FiberManager, genericBatonFiberWait) {
229 FiberManager manager(std::make_unique<SimpleLoopController>());
232 bool fiberRunning = false;
234 manager.addTask([&]() {
235 EXPECT_EQ(manager.hasActiveFiber(), true);
// Last visible statement of the task: mark the fiber as finished.
238 fiberRunning = false;
// Adding the task alone must not run it.
241 EXPECT_FALSE(fiberRunning);
242 manager.loopUntilNoReady();
243 EXPECT_TRUE(fiberRunning); // ensure fiber still active
// Helper thread sleeps 300 ms — presumably it then releases the fiber; verify.
245 auto thr = std::thread([&]() {
246 std::this_thread::sleep_for(std::chrono::milliseconds(300));
// Keep driving the manager until the task clears the flag.
250 while (fiberRunning) {
251 manager.loopUntilNoReady();
// Counterpart of the fiber-wait test with the roles swapped: a plain thread
// sets `threadWaiting` around some operation, and a fiber task observes the
// flag while it is set.
// NOTE(review): the baton declaration and the wait/post calls are not visible
// in this excerpt — presumably the thread waits on a baton the fiber posts.
257 TEST(FiberManager, genericBatonThreadWait) {
258 FiberManager manager(std::make_unique<SimpleLoopController>());
// Atomic because it is written by `thr` and read by the test thread.
260 std::atomic<bool> threadWaiting(false);
262 auto thr = std::thread([&]() {
263 threadWaiting = true;
265 threadWaiting = false;
// Wait for the thread to raise the flag, then sleep 300 ms.
268 while (!threadWaiting) {
270 std::this_thread::sleep_for(std::chrono::milliseconds(300));
272 manager.addTask([&]() {
273 EXPECT_EQ(manager.hasActiveFiber(), true);
274 EXPECT_TRUE(threadWaiting);
// Loop until the thread clears the flag.
276 while (threadWaiting) {
280 manager.loopUntilNoReady();
// Exercises addTasks() with functions returning std::unique_ptr<int>. Each of
// the three functions parks its Promise<int> in `pendingFibers` via await()
// and then returns i * 2 + 1; the loop callback completes parked promises one
// at a time from the back of the vector.
// NOTE(review): the declaration/update of `n`, the `taskAdded` branch and the
// closing braces are not visible in this excerpt.
284 TEST(FiberManager, addTasksNoncopyable) {
285 std::vector<Promise<int>> pendingFibers;
286 bool taskAdded = false;
288 FiberManager manager(std::make_unique<SimpleLoopController>());
289 auto& loopController =
290 dynamic_cast<SimpleLoopController&>(manager.loopController());
292 auto loopFunc = [&]() {
294 manager.addTask([&]() {
295 std::vector<std::function<std::unique_ptr<int>()>> funcs;
296 for (int i = 0; i < 3; ++i) {
297 funcs.push_back([i, &pendingFibers]() {
// Hand the promise to the test so it decides when this task resumes.
298 await([&pendingFibers](Promise<int> promise) {
299 pendingFibers.push_back(std::move(promise));
301 return std::make_unique<int>(i * 2 + 1);
305 auto iter = addTasks(funcs.begin(), funcs.end());
308 while (iter.hasNext()) {
309 auto result = iter.awaitNext();
// The value must match the id of the task that produced it.
310 EXPECT_EQ(2 * iter.getTaskID() + 1, *result);
311 EXPECT_GE(2 - n, pendingFibers.size());
// Loop callback: complete one parked promise per pass; a further branch stops
// the loop.
317 } else if (pendingFibers.size()) {
318 pendingFibers.back().setValue(0);
319 pendingFibers.pop_back();
321 loopController.stop();
325 loopController.loop(std::move(loopFunc));
// Exercises await() when the callback passed to it throws. Two await() calls
// are visible: one whose callback throws directly, and one whose callback
// first moves the promise into a lambda queued on the event base.
// NOTE(review): the fiber manager setup, the surrounding expectation
// (presumably an EXPECT_THROW-style check) and the promise fulfilment are not
// visible in this excerpt.
328 TEST(FiberManager, awaitThrow) {
329 folly::EventBase evb;
// Local marker type so the test can recognise its own exception.
330 struct ExpectedException {};
334 await([](Promise<int> p) {
336 throw ExpectedException();
341 await([&](Promise<int> p) {
// The promise is moved into the queued lambda before the throw below.
342 evb.runInEventBaseThread([p = std::move(p)]() mutable {
345 throw ExpectedException();
// Exercises addTasks() when some tasks throw std::runtime_error("Runtime").
// The visible expectations pair odd task ids with a successful result of
// 2 * id + 1 and even task ids with the other branch.
// NOTE(review): the condition that selects throwing tasks, the try/catch
// around awaitNext(), and the declaration of `n` are not visible in this
// excerpt.
352 TEST(FiberManager, addTasksThrow) {
353 std::vector<Promise<int>> pendingFibers;
354 bool taskAdded = false;
356 FiberManager manager(std::make_unique<SimpleLoopController>());
357 auto& loopController =
358 dynamic_cast<SimpleLoopController&>(manager.loopController());
360 auto loopFunc = [&]() {
362 manager.addTask([&]() {
363 std::vector<std::function<int()>> funcs;
364 for (size_t i = 0; i < 3; ++i) {
365 funcs.push_back([i, &pendingFibers]() {
// Park the promise so the loop callback controls when the task resumes.
366 await([&pendingFibers](Promise<int> promise) {
367 pendingFibers.push_back(std::move(promise));
370 throw std::runtime_error("Runtime");
376 auto iter = addTasks(funcs.begin(), funcs.end());
379 while (iter.hasNext()) {
381 int result = iter.awaitNext();
// Reached when awaitNext() returned a value: the id must be odd.
382 EXPECT_EQ(1, iter.getTaskID() % 2);
383 EXPECT_EQ(2 * iter.getTaskID() + 1, result);
// Other branch (presumably the exception handler): the id must be even.
385 EXPECT_EQ(0, iter.getTaskID() % 2);
387 EXPECT_GE(2 - n, pendingFibers.size());
// Loop callback: complete one parked promise per pass; a further branch stops
// the loop.
393 } else if (pendingFibers.size()) {
394 pendingFibers.back().setValue(0);
395 pendingFibers.pop_back();
397 loopController.stop();
401 loopController.loop(std::move(loopFunc));
// Exercises addTasks() with void-returning functions. Each of the three
// functions parks its Promise<int> in `pendingFibers` via await(); the loop
// callback completes parked promises from the back of the vector.
// NOTE(review): the awaitNext() call and the declaration of `n` are not
// visible in this excerpt.
404 TEST(FiberManager, addTasksVoid) {
405 std::vector<Promise<int>> pendingFibers;
406 bool taskAdded = false;
408 FiberManager manager(std::make_unique<SimpleLoopController>());
409 auto& loopController =
410 dynamic_cast<SimpleLoopController&>(manager.loopController());
412 auto loopFunc = [&]() {
414 manager.addTask([&]() {
415 std::vector<std::function<void()>> funcs;
416 for (size_t i = 0; i < 3; ++i) {
417 funcs.push_back([&pendingFibers]() {
418 await([&pendingFibers](Promise<int> promise) {
419 pendingFibers.push_back(std::move(promise));
424 auto iter = addTasks(funcs.begin(), funcs.end());
427 while (iter.hasNext()) {
// Upper bound on the number of promises still parked at this point.
429 EXPECT_GE(2 - n, pendingFibers.size());
// Loop callback: complete one parked promise per pass; a further branch stops
// the loop.
435 } else if (pendingFibers.size()) {
436 pendingFibers.back().setValue(0);
437 pendingFibers.pop_back();
439 loopController.stop();
443 loopController.loop(std::move(loopFunc));
// Exercises addTasks() with void-returning functions where some tasks throw
// std::runtime_error(""). The visible expectations assert an odd task id in
// one branch and an even task id in the other.
// NOTE(review): the condition that selects throwing tasks, the try/catch
// around awaitNext(), and the declaration of `n` are not visible in this
// excerpt.
446 TEST(FiberManager, addTasksVoidThrow) {
447 std::vector<Promise<int>> pendingFibers;
448 bool taskAdded = false;
450 FiberManager manager(std::make_unique<SimpleLoopController>());
451 auto& loopController =
452 dynamic_cast<SimpleLoopController&>(manager.loopController());
454 auto loopFunc = [&]() {
456 manager.addTask([&]() {
457 std::vector<std::function<void()>> funcs;
458 for (size_t i = 0; i < 3; ++i) {
459 funcs.push_back([i, &pendingFibers]() {
460 await([&pendingFibers](Promise<int> promise) {
461 pendingFibers.push_back(std::move(promise));
464 throw std::runtime_error("");
469 auto iter = addTasks(funcs.begin(), funcs.end());
472 while (iter.hasNext()) {
// One branch expects an odd id, the other an even id — presumably the success
// path and the exception handler respectively; verify.
475 EXPECT_EQ(1, iter.getTaskID() % 2);
477 EXPECT_EQ(0, iter.getTaskID() % 2);
479 EXPECT_GE(2 - n, pendingFibers.size());
// Loop callback: complete one parked promise per pass; a further branch stops
// the loop.
485 } else if (pendingFibers.size()) {
486 pendingFibers.back().setValue(0);
487 pendingFibers.pop_back();
489 loopController.stop();
493 loopController.loop(std::move(loopFunc));
// Exercises the state queries of the iterator returned by addTasks():
// hasCompleted(), hasPending() and hasNext() are checked at four points.
// NOTE(review): the calls made on `iter` between the four groups of
// expectations (presumably reserve()/awaitNext()) are not visible in this
// excerpt.
496 TEST(FiberManager, addTasksReserve) {
497 std::vector<Promise<int>> pendingFibers;
498 bool taskAdded = false;
500 FiberManager manager(std::make_unique<SimpleLoopController>());
501 auto& loopController =
502 dynamic_cast<SimpleLoopController&>(manager.loopController());
504 auto loopFunc = [&]() {
506 manager.addTask([&]() {
507 std::vector<std::function<void()>> funcs;
508 for (size_t i = 0; i < 3; ++i) {
509 funcs.push_back([&pendingFibers]() {
510 await([&pendingFibers](Promise<int> promise) {
511 pendingFibers.push_back(std::move(promise));
516 auto iter = addTasks(funcs.begin(), funcs.end());
// Snapshot 1: completed results available, tasks pending, more to iterate.
519 EXPECT_TRUE(iter.hasCompleted());
520 EXPECT_TRUE(iter.hasPending());
521 EXPECT_TRUE(iter.hasNext());
// Snapshot 2: same state as snapshot 1.
524 EXPECT_TRUE(iter.hasCompleted());
525 EXPECT_TRUE(iter.hasPending());
526 EXPECT_TRUE(iter.hasNext());
// Snapshot 3: nothing completed, but tasks are still pending.
529 EXPECT_FALSE(iter.hasCompleted());
530 EXPECT_TRUE(iter.hasPending());
531 EXPECT_TRUE(iter.hasNext());
// Snapshot 4: iterator exhausted.
534 EXPECT_FALSE(iter.hasCompleted());
535 EXPECT_FALSE(iter.hasPending());
536 EXPECT_FALSE(iter.hasNext());
// Loop callback: complete one parked promise per pass; a further branch stops
// the loop.
539 } else if (pendingFibers.size()) {
540 pendingFibers.back().setValue(0);
541 pendingFibers.pop_back();
543 loopController.stop();
547 loopController.loop(std::move(loopFunc));
// Exercises TaskIterator<size_t>::addTask(): tasks 0 and 1 are added first,
// task 2 is added after the first result is consumed, and awaitNext() is
// expected to yield 1, then 2, then 0.
// NOTE(review): the `batons` declaration, the posts that release each task,
// the task's return statement and the owner of addTaskFuture() are not visible
// in this excerpt.
550 TEST(FiberManager, addTaskDynamic) {
551 folly::EventBase evb;
// Factory for a task that blocks on its own baton.
555 auto makeTask = [&](size_t taskId) {
556 return [&, taskId]() -> size_t {
557 batons[taskId].wait();
563 .addTaskFuture([&]() {
564 TaskIterator<size_t> iterator;
566 iterator.addTask(makeTask(0));
567 iterator.addTask(makeTask(1));
571 EXPECT_EQ(1, iterator.awaitNext());
// A task added while the iterator is already in use.
573 iterator.addTask(makeTask(2));
577 EXPECT_EQ(2, iterator.awaitNext());
581 EXPECT_EQ(0, iterator.awaitNext());
// Exercises forEach(): three int-returning functions are run and every
// (id, result) pair delivered to the callback is recorded. Afterwards the test
// expects three results, no parked promises, and result == id * 2 + 1.
// NOTE(review): the return statement of each function and the `taskAdded`
// branch are not visible in this excerpt.
586 TEST(FiberManager, forEach) {
587 std::vector<Promise<int>> pendingFibers;
588 bool taskAdded = false;
590 FiberManager manager(std::make_unique<SimpleLoopController>());
591 auto& loopController =
592 dynamic_cast<SimpleLoopController&>(manager.loopController());
594 auto loopFunc = [&]() {
596 manager.addTask([&]() {
597 std::vector<std::function<int()>> funcs;
598 for (size_t i = 0; i < 3; ++i) {
599 funcs.push_back([i, &pendingFibers]() {
600 await([&pendingFibers](Promise<int> promise) {
601 pendingFibers.push_back(std::move(promise));
// Collected as (task id, task result).
607 std::vector<std::pair<size_t, int>> results;
608 forEach(funcs.begin(), funcs.end(), [&results](size_t id, int result) {
609 results.emplace_back(id, result);
611 EXPECT_EQ(3, results.size());
612 EXPECT_TRUE(pendingFibers.empty());
613 for (size_t i = 0; i < 3; ++i) {
614 EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
// Loop callback: complete one parked promise per pass; a further branch stops
// the loop.
618 } else if (pendingFibers.size()) {
619 pendingFibers.back().setValue(0);
620 pendingFibers.pop_back();
622 loopController.stop();
626 loopController.loop(std::move(loopFunc));
// Exercises collectN() with n = 2 over three int-returning functions: the test
// expects two results, one promise still parked when collectN() returns, and
// result == id * 2 + 1 for each (id, result) pair.
// NOTE(review): the return statement of each function and the `taskAdded`
// branch are not visible in this excerpt.
629 TEST(FiberManager, collectN) {
630 std::vector<Promise<int>> pendingFibers;
631 bool taskAdded = false;
633 FiberManager manager(std::make_unique<SimpleLoopController>());
634 auto& loopController =
635 dynamic_cast<SimpleLoopController&>(manager.loopController());
637 auto loopFunc = [&]() {
639 manager.addTask([&]() {
640 std::vector<std::function<int()>> funcs;
641 for (size_t i = 0; i < 3; ++i) {
642 funcs.push_back([i, &pendingFibers]() {
643 await([&pendingFibers](Promise<int> promise) {
644 pendingFibers.push_back(std::move(promise));
650 auto results = collectN(funcs.begin(), funcs.end(), 2);
651 EXPECT_EQ(2, results.size());
// The third task has not been resumed yet.
652 EXPECT_EQ(1, pendingFibers.size());
653 for (size_t i = 0; i < 2; ++i) {
654 EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
// Loop callback: complete one parked promise per pass; a further branch stops
// the loop.
658 } else if (pendingFibers.size()) {
659 pendingFibers.back().setValue(0);
660 pendingFibers.pop_back();
662 loopController.stop();
666 loopController.loop(std::move(loopFunc));
// Exercises collectN() with n = 2 when the task functions throw
// std::runtime_error("Runtime") after being resumed; one promise is expected
// to still be parked afterwards.
// NOTE(review): the try/catch around collectN() is not visible in this
// excerpt.
669 TEST(FiberManager, collectNThrow) {
670 std::vector<Promise<int>> pendingFibers;
671 bool taskAdded = false;
673 FiberManager manager(std::make_unique<SimpleLoopController>());
674 auto& loopController =
675 dynamic_cast<SimpleLoopController&>(manager.loopController());
677 auto loopFunc = [&]() {
679 manager.addTask([&]() {
680 std::vector<std::function<int()>> funcs;
681 for (size_t i = 0; i < 3; ++i) {
// NOTE(review): the lambda declares `-> size_t` but is stored in a
// std::function<int()>.
682 funcs.push_back([&pendingFibers]() -> size_t {
683 await([&pendingFibers](Promise<int> promise) {
684 pendingFibers.push_back(std::move(promise));
686 throw std::runtime_error("Runtime");
691 collectN(funcs.begin(), funcs.end(), 2);
693 EXPECT_EQ(1, pendingFibers.size());
// Loop callback: complete one parked promise per pass; a further branch stops
// the loop.
697 } else if (pendingFibers.size()) {
698 pendingFibers.back().setValue(0);
699 pendingFibers.pop_back();
701 loopController.stop();
705 loopController.loop(std::move(loopFunc));
// Exercises collectN() with n = 2 over three void-returning functions: the
// test expects a result container of size 2 and one promise still parked when
// collectN() returns.
// NOTE(review): the `taskAdded` branch and closing braces are not visible in
// this excerpt.
708 TEST(FiberManager, collectNVoid) {
709 std::vector<Promise<int>> pendingFibers;
710 bool taskAdded = false;
712 FiberManager manager(std::make_unique<SimpleLoopController>());
713 auto& loopController =
714 dynamic_cast<SimpleLoopController&>(manager.loopController());
716 auto loopFunc = [&]() {
718 manager.addTask([&]() {
719 std::vector<std::function<void()>> funcs;
720 for (size_t i = 0; i < 3; ++i) {
721 funcs.push_back([&pendingFibers]() {
722 await([&pendingFibers](Promise<int> promise) {
723 pendingFibers.push_back(std::move(promise));
728 auto results = collectN(funcs.begin(), funcs.end(), 2);
729 EXPECT_EQ(2, results.size());
730 EXPECT_EQ(1, pendingFibers.size());
// Loop callback: complete one parked promise per pass; a further branch stops
// the loop.
733 } else if (pendingFibers.size()) {
734 pendingFibers.back().setValue(0);
735 pendingFibers.pop_back();
737 loopController.stop();
741 loopController.loop(std::move(loopFunc));
// Exercises collectN() with n = 2 over void-returning functions that throw
// std::runtime_error("Runtime") after being resumed; one promise is expected
// to still be parked afterwards.
// NOTE(review): the try/catch around collectN() is not visible in this
// excerpt.
744 TEST(FiberManager, collectNVoidThrow) {
745 std::vector<Promise<int>> pendingFibers;
746 bool taskAdded = false;
748 FiberManager manager(std::make_unique<SimpleLoopController>());
749 auto& loopController =
750 dynamic_cast<SimpleLoopController&>(manager.loopController());
752 auto loopFunc = [&]() {
754 manager.addTask([&]() {
755 std::vector<std::function<void()>> funcs;
756 for (size_t i = 0; i < 3; ++i) {
757 funcs.push_back([&pendingFibers]() {
758 await([&pendingFibers](Promise<int> promise) {
759 pendingFibers.push_back(std::move(promise));
761 throw std::runtime_error("Runtime");
766 collectN(funcs.begin(), funcs.end(), 2);
768 EXPECT_EQ(1, pendingFibers.size());
// Loop callback: complete one parked promise per pass; a further branch stops
// the loop.
772 } else if (pendingFibers.size()) {
773 pendingFibers.back().setValue(0);
774 pendingFibers.pop_back();
776 loopController.stop();
780 loopController.loop(std::move(loopFunc));
// Exercises collectAll() over three int-returning functions: when it returns,
// no promise may remain parked and results[i] must equal i * 2 + 1.
// NOTE(review): the return statement of each function and the `taskAdded`
// branch are not visible in this excerpt.
783 TEST(FiberManager, collectAll) {
784 std::vector<Promise<int>> pendingFibers;
785 bool taskAdded = false;
787 FiberManager manager(std::make_unique<SimpleLoopController>());
788 auto& loopController =
789 dynamic_cast<SimpleLoopController&>(manager.loopController());
791 auto loopFunc = [&]() {
793 manager.addTask([&]() {
794 std::vector<std::function<int()>> funcs;
795 for (size_t i = 0; i < 3; ++i) {
796 funcs.push_back([i, &pendingFibers]() {
797 await([&pendingFibers](Promise<int> promise) {
798 pendingFibers.push_back(std::move(promise));
804 auto results = collectAll(funcs.begin(), funcs.end());
805 EXPECT_TRUE(pendingFibers.empty());
// Results are indexed by position in `funcs`.
806 for (size_t i = 0; i < 3; ++i) {
807 EXPECT_EQ(i * 2 + 1, results[i]);
// Loop callback: complete one parked promise per pass; a further branch stops
// the loop.
811 } else if (pendingFibers.size()) {
812 pendingFibers.back().setValue(0);
813 pendingFibers.pop_back();
815 loopController.stop();
819 loopController.loop(std::move(loopFunc));
// Exercises collectAll() over three void-returning functions: when it returns,
// no promise may remain parked.
// NOTE(review): the `taskAdded` branch and closing braces are not visible in
// this excerpt.
822 TEST(FiberManager, collectAllVoid) {
823 std::vector<Promise<int>> pendingFibers;
824 bool taskAdded = false;
826 FiberManager manager(std::make_unique<SimpleLoopController>());
827 auto& loopController =
828 dynamic_cast<SimpleLoopController&>(manager.loopController());
830 auto loopFunc = [&]() {
832 manager.addTask([&]() {
833 std::vector<std::function<void()>> funcs;
834 for (size_t i = 0; i < 3; ++i) {
835 funcs.push_back([&pendingFibers]() {
836 await([&pendingFibers](Promise<int> promise) {
837 pendingFibers.push_back(std::move(promise));
842 collectAll(funcs.begin(), funcs.end());
843 EXPECT_TRUE(pendingFibers.empty());
// Loop callback: complete one parked promise per pass; a further branch stops
// the loop.
846 } else if (pendingFibers.size()) {
847 pendingFibers.back().setValue(0);
848 pendingFibers.pop_back();
850 loopController.stop();
854 loopController.loop(std::move(loopFunc));
// Exercises collectAny() over three int-returning functions: when it returns,
// two promises must still be parked and the result must be (2, 5), i.e. the
// task with id 2 and value 2 * 2 + 1.
// NOTE(review): the condition guarding the throw and the return statement of
// each function are not visible in this excerpt.
857 TEST(FiberManager, collectAny) {
858 std::vector<Promise<int>> pendingFibers;
859 bool taskAdded = false;
861 FiberManager manager(std::make_unique<SimpleLoopController>());
862 auto& loopController =
863 dynamic_cast<SimpleLoopController&>(manager.loopController());
865 auto loopFunc = [&]() {
867 manager.addTask([&]() {
868 std::vector<std::function<int()>> funcs;
869 for (size_t i = 0; i < 3; ++i) {
870 funcs.push_back([i, &pendingFibers]() {
871 await([&pendingFibers](Promise<int> promise) {
872 pendingFibers.push_back(std::move(promise));
875 throw std::runtime_error("This exception will be ignored");
881 auto result = collectAny(funcs.begin(), funcs.end());
882 EXPECT_EQ(2, pendingFibers.size());
// Promises are completed from the back of `pendingFibers` below, which is
// consistent with the last-added task (id 2) finishing first.
883 EXPECT_EQ(2, result.first);
884 EXPECT_EQ(2 * 2 + 1, result.second);
// Loop callback: complete one parked promise per pass; a further branch stops
// the loop.
887 } else if (pendingFibers.size()) {
888 pendingFibers.back().setValue(0);
889 pendingFibers.pop_back();
891 loopController.stop();
895 loopController.loop(std::move(loopFunc));
// Helper shared by the main-context tests below.
//   ran           - out-flag; presumably set to true by this helper (the
//                   assignment is not visible in this excerpt).
//   mainLocation  - address of a variable known to live on the main stack.
//   fiberLocation - address of a variable known to live on a fiber stack;
//                   some callers pass nullptr, so presumably the fiber check
//                   is guarded by a null test — verify.
899 /* Checks that this function was run from a main context,
900 by comparing an address on a stack to a known main stack address
901 and a known related fiber stack address. The assumption
902 is that fiber stack and main stack will be far enough apart,
903 while any two values on the same stack will be close. */
904 void expectMainContext(bool& ran, int* mainLocation, int* fiberLocation) {
// Threshold in units of int, because the comparisons below subtract int*
// values: 0x2000 bytes / sizeof(int).
906 /* 2 pages is a good guess */
907 constexpr ssize_t DISTANCE = 0x2000 / sizeof(int);
// `here` is a local of this function (declaration not visible in this
// excerpt); its address stands in for "the current stack".
909 EXPECT_TRUE(std::abs(&here - fiberLocation) > DISTANCE);
912 EXPECT_TRUE(std::abs(&here - mainLocation) < DISTANCE);
// Exercises runInMainContext() in two ways: called on the manager directly
// from the test body, and called from inside a fiber task, where it also
// returns a value of a local type A whose copy constructor is deleted.
// NOTE(review): the declarations of `mainLocation`, `stackLocation`, struct A
// and the lambda's return statement are not visible in this excerpt.
920 TEST(FiberManager, runInMainContext) {
921 FiberManager manager(std::make_unique<SimpleLoopController>());
922 auto& loopController =
923 dynamic_cast<SimpleLoopController&>(manager.loopController());
925 bool checkRan = false;
// Direct call: no fiber stack is involved, so nullptr is passed for it.
928 manager.runInMainContext(
929 [&]() { expectMainContext(checkRan, &mainLocation, nullptr); });
930 EXPECT_TRUE(checkRan);
934 manager.addTask([&]() {
// A is non-copyable, so returning it exercises a non-copy return path.
936 explicit A(int value_) : value(value_) {}
937 A(const A&) = delete;
// Call from inside a fiber: both stack addresses are supplied.
943 auto ret = runInMainContext([&]() {
944 expectMainContext(checkRan, &mainLocation, &stackLocation);
947 EXPECT_TRUE(checkRan);
948 EXPECT_EQ(42, ret.value);
// Run a single loop pass: the callback stops the controller immediately.
951 loopController.loop([&]() { loopController.stop(); });
953 EXPECT_TRUE(checkRan);
// Exercises addTaskFinally(): the first callable returns 1234 and the second
// receives it as Try<int>, checks the value, and calls expectMainContext().
// The finally-callback must not have run before the loop is driven.
// NOTE(review): the declaration of `mainLocation` is not visible in this
// excerpt.
956 TEST(FiberManager, addTaskFinally) {
957 FiberManager manager(std::make_unique<SimpleLoopController>());
958 auto& loopController =
959 dynamic_cast<SimpleLoopController&>(manager.loopController());
961 bool checkRan = false;
965 manager.addTaskFinally(
966 [&]() { return 1234; },
967 [&](Try<int>&& result) {
968 EXPECT_EQ(result.value(), 1234);
970 expectMainContext(checkRan, &mainLocation, nullptr);
// Nothing runs until the loop is driven.
973 EXPECT_FALSE(checkRan);
975 loopController.loop([&]() { loopController.stop(); });
977 EXPECT_TRUE(checkRan);
// Exercises the fiber pool when the number of tasks equals maxFibersPoolSize
// (5): after the first batch, 5 fibers are allocated and 5 are in the pool;
// after a second batch of 5 the same counts are expected.
980 TEST(FiberManager, fibersPoolWithinLimit) {
981 FiberManager::Options opts;
982 opts.maxFibersPoolSize = 5;
984 FiberManager manager(std::make_unique<SimpleLoopController>(), opts);
985 auto& loopController =
986 dynamic_cast<SimpleLoopController&>(manager.loopController());
988 size_t fibersRun = 0;
// First batch of five trivial tasks.
990 for (size_t i = 0; i < 5; ++i) {
991 manager.addTask([&]() { ++fibersRun; });
993 loopController.loop([&]() { loopController.stop(); });
995 EXPECT_EQ(5, fibersRun);
996 EXPECT_EQ(5, manager.fibersAllocated());
997 EXPECT_EQ(5, manager.fibersPoolSize());
// Second batch: the allocation count is expected to stay at 5.
999 for (size_t i = 0; i < 5; ++i) {
1000 manager.addTask([&]() { ++fibersRun; });
1002 loopController.loop([&]() { loopController.stop(); });
1004 EXPECT_EQ(10, fibersRun);
1005 EXPECT_EQ(5, manager.fibersAllocated());
1006 EXPECT_EQ(5, manager.fibersPoolSize());
// Exercises the fiber pool when more tasks (10) are queued than
// maxFibersPoolSize (5): 10 fibers are allocated up front with none pooled,
// and after the loop runs the counts are expected to drop to 5 allocated and
// 5 pooled.
1009 TEST(FiberManager, fibersPoolOverLimit) {
1010 FiberManager::Options opts;
1011 opts.maxFibersPoolSize = 5;
1013 FiberManager manager(std::make_unique<SimpleLoopController>(), opts);
1014 auto& loopController =
1015 dynamic_cast<SimpleLoopController&>(manager.loopController());
1017 size_t fibersRun = 0;
1019 for (size_t i = 0; i < 10; ++i) {
1020 manager.addTask([&]() { ++fibersRun; });
// Before the loop runs: tasks queued, every fiber allocated, none returned
// to the pool.
1023 EXPECT_EQ(0, fibersRun);
1024 EXPECT_EQ(10, manager.fibersAllocated());
1025 EXPECT_EQ(0, manager.fibersPoolSize());
1027 loopController.loop([&]() { loopController.stop(); });
// After the loop: all tasks ran and the counts are capped at the pool limit.
1029 EXPECT_EQ(10, fibersRun);
1030 EXPECT_EQ(5, manager.fibersAllocated());
1031 EXPECT_EQ(5, manager.fibersPoolSize());
// Exercises completing promises from other threads: two fiber tasks park their
// Promise<int> in `savedPromise`, two std::threads then set the values 42 and
// 43, and the results appear only after the manager loop is driven again.
// NOTE(review): the `result` declaration and the await() calls that assign
// into it are not visible in this excerpt.
1034 TEST(FiberManager, remoteFiberBasic) {
1035 FiberManager manager(std::make_unique<SimpleLoopController>());
1036 auto& loopController =
1037 dynamic_cast<SimpleLoopController&>(manager.loopController());
1040 result[0] = result[1] = 0;
1041 folly::Optional<Promise<int>> savedPromise[2];
1042 manager.addTask([&]() {
1044 [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1046 manager.addTask([&]() {
1048 [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
// Run both tasks up to the point where their promises are parked.
1051 manager.loopUntilNoReady();
1053 EXPECT_TRUE(savedPromise[0].hasValue());
1054 EXPECT_TRUE(savedPromise[1].hasValue());
1055 EXPECT_EQ(0, result[0]);
1056 EXPECT_EQ(0, result[1]);
// Fulfil each promise from its own thread.
1058 std::thread remoteThread0{[&]() { savedPromise[0]->setValue(42); }};
1059 std::thread remoteThread1{[&]() { savedPromise[1]->setValue(43); }};
1060 remoteThread0.join();
1061 remoteThread1.join();
// Values are set, but the fibers have not been resumed yet.
1062 EXPECT_EQ(0, result[0]);
1063 EXPECT_EQ(0, result[1]);
1064 /* Should only have scheduled once */
1065 EXPECT_EQ(1, loopController.remoteScheduleCalled());
1067 manager.loopUntilNoReady();
1068 EXPECT_EQ(42, result[0]);
1069 EXPECT_EQ(43, result[1]);
// Exercises addTaskRemote() called from two std::threads: each remote task
// parks its Promise<int> in `savedPromise`; the test thread then sets 42 and
// 43 and expects the results only after driving the manager loop again.
// NOTE(review): the `result` declaration and the await() calls that assign
// into it are not visible in this excerpt.
1072 TEST(FiberManager, addTaskRemoteBasic) {
1073 FiberManager manager(std::make_unique<SimpleLoopController>());
1076 result[0] = result[1] = 0;
1077 folly::Optional<Promise<int>> savedPromise[2];
// Each thread enqueues one task remotely.
1079 std::thread remoteThread0{[&]() {
1080 manager.addTaskRemote([&]() {
1082 [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1085 std::thread remoteThread1{[&]() {
1086 manager.addTaskRemote([&]() {
1088 [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1091 remoteThread0.join();
1092 remoteThread1.join();
// Run the remotely added tasks up to the point where promises are parked.
1094 manager.loopUntilNoReady();
1096 EXPECT_TRUE(savedPromise[0].hasValue());
1097 EXPECT_TRUE(savedPromise[1].hasValue());
1098 EXPECT_EQ(0, result[0]);
1099 EXPECT_EQ(0, result[1]);
1101 savedPromise[0]->setValue(42);
1102 savedPromise[1]->setValue(43);
// Values are set, but the fibers have not been resumed yet.
1104 EXPECT_EQ(0, result[0]);
1105 EXPECT_EQ(0, result[1]);
1107 manager.loopUntilNoReady();
1108 EXPECT_EQ(42, result[0]);
1109 EXPECT_EQ(43, result[1]);
// Exercises hasTasks() with a task added via addTaskRemote() from another
// thread: the manager is looped while hasTasks() is true, after which it must
// report no tasks and the task must have run exactly once.
// NOTE(review): the `counter` declaration and the thread join are not visible
// in this excerpt.
1112 TEST(FiberManager, remoteHasTasks) {
1114 FiberManager fm(std::make_unique<SimpleLoopController>());
1115 std::thread remote([&]() { fm.addTaskRemote([&]() { ++counter; }); });
// Drain everything the manager reports as outstanding.
1119 while (fm.hasTasks()) {
1120 fm.loopUntilNoReady();
1123 EXPECT_FALSE(fm.hasTasks());
1124 EXPECT_EQ(counter, 1);
// Exercises hasTasks() across the life of a remotely added task that parks a
// Promise<int>: hasTasks() must stay true until the promise is fulfilled (with
// 47, from a second thread) and the manager loop has run again.
// NOTE(review): the `result` declaration, the await() call that assigns into
// it and the thread joins are not visible in this excerpt.
1127 TEST(FiberManager, remoteHasReadyTasks) {
1129 folly::Optional<Promise<int>> savedPromise;
1130 FiberManager fm(std::make_unique<SimpleLoopController>());
1131 std::thread remote([&]() {
1132 fm.addTaskRemote([&]() {
1134 [&](Promise<int> promise) { savedPromise = std::move(promise); });
1135 EXPECT_TRUE(fm.hasTasks());
// Task queued remotely but not yet run.
1140 EXPECT_TRUE(fm.hasTasks());
1142 fm.loopUntilNoReady();
// Task ran and is now parked on its promise.
1143 EXPECT_TRUE(fm.hasTasks());
1145 std::thread remote2([&]() { savedPromise->setValue(47); });
// Promise fulfilled, fiber not yet resumed.
1147 EXPECT_TRUE(fm.hasTasks());
1149 fm.loopUntilNoReady();
1150 EXPECT_FALSE(fm.hasTasks());
1152 EXPECT_EQ(result, 47);
// Shared body for the fiber-local tests, parameterised on the local data type.
// Data must expose a `value` member; the expectations below rely on it
// starting at 42 in a fresh task.
// NOTE(review): the addTask()/addTaskRemote() calls that open each scenario
// and their nesting are mostly not visible in this excerpt, so which task each
// expectation runs in must be confirmed against the full file.
1155 template <typename Data>
1156 void testFiberLocal() {
// LocalType<Data> tells the manager which fiber-local type to provide.
1157 FiberManager fm(LocalType<Data>(), std::make_unique<SimpleLoopController>());
1160 EXPECT_EQ(42, local<Data>().value);
1162 local<Data>().value = 43;
1165 EXPECT_EQ(43, local<Data>().value);
1167 local<Data>().value = 44;
// A task added here observes 44, the value written just above.
1169 addTask([]() { EXPECT_EQ(44, local<Data>().value); });
1174 EXPECT_EQ(42, local<Data>().value);
1176 local<Data>().value = 43;
// A remotely added task observes 43, the value written just above.
1178 fm.addTaskRemote([]() { EXPECT_EQ(43, local<Data>().value); });
1182 EXPECT_EQ(42, local<Data>().value);
1183 local<Data>().value = 43;
1186 EXPECT_EQ(43, local<Data>().value);
1187 local<Data>().value = 44;
// Run `task` (defined on lines not visible here) through collectAny().
1189 std::vector<std::function<void()>> tasks{task};
1190 collectAny(tasks.begin(), tasks.end());
// After collectAny() the value seen here is still 43.
1192 EXPECT_EQ(43, local<Data>().value);
1195 fm.loopUntilNoReady();
1196 EXPECT_FALSE(fm.hasTasks());
// Runs the shared fiber-local scenario with SimpleData.
// NOTE(review): the definition of SimpleData is not visible in this excerpt.
1199 TEST(FiberManager, fiberLocal) {
1204 testFiberLocal<SimpleData>();
// Runs the shared fiber-local scenario with LargeData.
// NOTE(review): the enclosing struct definition is not visible in this
// excerpt; the 1 MiB char array below is presumably a LargeData member used to
// make the type large — verify.
1207 TEST(FiberManager, fiberLocalHeap) {
1209 char _[1024 * 1024];
1213 testFiberLocal<LargeData>();
// Exercises fiber-local data of type CrazyData that itself touches
// local<CrazyData>(): a task sets `data` to 41, and after the loop the manager
// must report no tasks.
// NOTE(review): the CrazyData definition is not visible in this excerpt — the
// test name suggests the EXPECT_EQ(42, ...) / `data = 0` lines below sit in
// its destructor; verify.
1216 TEST(FiberManager, fiberLocalDestructor) {
1223 EXPECT_EQ(42, local<CrazyData>().data);
1224 // Make sure we don't have infinite loop
1225 local<CrazyData>().data = 0;
// Arguments of the FiberManager `fm` constructor (its first line is not
// visible in this excerpt).
1232 LocalType<CrazyData>(), std::make_unique<SimpleLoopController>());
1234 fm.addTask([]() { local<CrazyData>().data = 41; });
1236 fm.loopUntilNoReady();
1237 EXPECT_FALSE(fm.hasTasks());
// Exercises folly::RequestContext handling across an EventBase callback, a
// fiber started from it, and an EventBase callback started from that fiber.
// The test ends by expecting four counted steps (`tasksRun`), `fibersDone`
// set, and no tasks left in the manager.
// NOTE(review): the `tasksRun` increments, the `fibersDone` assignment, the
// string comparisons around the dynamic_cast lines and the call that runs the
// event base are not visible in this excerpt.
1240 TEST(FiberManager, fiberRequestContext) {
1241 folly::EventBase evb;
1242 FiberManager fm(std::make_unique<EventBaseLoopController>());
1243 dynamic_cast<EventBaseLoopController&>(fm.loopController())
1244 .attachEventBase(evb);
// Request data carrying a string, used to tell contexts apart.
1246 struct TestContext : public folly::RequestData {
1247 explicit TestContext(std::string s) : data(std::move(s)) {}
// Loop callback that can re-queue itself with evb_.runInLoop(this); it holds
// a reference to the `fibersDone` flag and a function to run afterwards.
1251 class AfterFibersCallback : public folly::EventBase::LoopCallback {
1253 AfterFibersCallback(
1254 folly::EventBase& evb,
1255 const bool& fibersDone,
1256 folly::Function<void()> afterFibersFunc)
1258 fibersDone_(fibersDone),
1259 afterFibersFunc_(std::move(afterFibersFunc)) {}
1261 void runLoopCallback() noexcept override {
1266 evb_.runInLoop(this);
1271 folly::EventBase& evb_;
1272 const bool& fibersDone_;
1273 folly::Function<void()> afterFibersFunc_;
1276 bool fibersDone = false;
1277 size_t tasksRun = 0;
1278 evb.runInEventBaseThread([&evb, &fm, &tasksRun, &fibersDone]() {
// Tag the context this callback runs under with "evb_value".
1280 auto* const evbCtx = folly::RequestContext::get();
1281 EXPECT_NE(nullptr, evbCtx);
1282 EXPECT_EQ(nullptr, evbCtx->getContextData("key"));
1283 evbCtx->setContextData("key", std::make_unique<TestContext>("evb_value"));
1285 // This callback allows us to check that FiberManager has restored the
1286 // RequestContext provider as expected after a fiber loop.
// NOTE(review): allocated with raw `new`; the matching delete is not visible
// in this excerpt — confirm ownership.
1287 auto* afterFibersCallback =
1288 new AfterFibersCallback(evb, fibersDone, [&tasksRun, evbCtx]() {
1290 EXPECT_EQ(evbCtx, folly::RequestContext::get());
1293 dynamic_cast<TestContext*>(evbCtx->getContextData("key"))->data);
1295 evb.runInLoop(afterFibersCallback);
1297 // Launching a fiber allows us to hit FiberManager RequestContext
1298 // setup/teardown logic.
1299 fm.addTask([&evb, &tasksRun, &fibersDone, evbCtx]() {
1302 // Initially, fiber starts with same RequestContext as its parent task.
1303 EXPECT_EQ(evbCtx, folly::RequestContext::get());
1304 EXPECT_NE(nullptr, evbCtx->getContextData("key"));
1307 dynamic_cast<TestContext*>(evbCtx->getContextData("key"))->data);
1309 // Create a new RequestContext for this fiber so we can distinguish from
1310 // RequestContext first EventBase callback started with.
1311 folly::RequestContext::create();
1312 auto* const fiberCtx = folly::RequestContext::get();
1313 EXPECT_NE(nullptr, fiberCtx);
1314 EXPECT_EQ(nullptr, fiberCtx->getContextData("key"));
1315 fiberCtx->setContextData(
1316 "key", std::make_unique<TestContext>("fiber_value"));
1318 // Task launched from within fiber should share current fiber's
1320 evb.runInEventBaseThread([&tasksRun, fiberCtx]() {
1322 auto* const evbCtx2 = folly::RequestContext::get();
1323 EXPECT_EQ(fiberCtx, evbCtx2);
1324 EXPECT_NE(nullptr, evbCtx2->getContextData("key"));
1327 dynamic_cast<TestContext*>(evbCtx2->getContextData("key"))->data);
// Final checks after the event base has finished running.
1336 EXPECT_EQ(4, tasksRun);
1337 EXPECT_TRUE(fibersDone);
1338 EXPECT_FALSE(fm.hasTasks());
// Exercises a single fiber task under a SimpleLoopController; `checkRan` must
// be true once the loop has been stopped.
// NOTE(review): the task body (presumably a yield followed by setting
// `checkRan`) and the condition guarding stop() are not visible in this
// excerpt.
1341 TEST(FiberManager, yieldTest) {
1342 FiberManager manager(std::make_unique<SimpleLoopController>());
1343 auto& loopController =
1344 dynamic_cast<SimpleLoopController&>(manager.loopController());
1346 bool checkRan = false;
1348 manager.addTask([&]() {
1353 loopController.loop([&]() {
1355 loopController.stop();
1359 EXPECT_TRUE(checkRan);
// Exercises RequestContext handling for tasks added in different ways. Four
// scenarios each create a fresh context with RequestContextScopeGuard, capture
// it (rcontext1..4) and expect folly::RequestContext::get() to return that
// same context inside the task body. The final section checks that the test
// thread's own context (`rcontext`) is unchanged around every
// loopUntilNoReady() call.
// NOTE(review): the baton wait/post calls, the `checkRunN = true` assignments,
// the scope braces and the calls that add tasks 3 and 4 are not visible in
// this excerpt.
1362 TEST(FiberManager, RequestContext) {
1363 FiberManager fm(std::make_unique<SimpleLoopController>());
// One flag and one baton per scenario.
1365 bool checkRun1 = false;
1366 bool checkRun2 = false;
1367 bool checkRun3 = false;
1368 bool checkRun4 = false;
1369 folly::fibers::Baton baton1;
1370 folly::fibers::Baton baton2;
1371 folly::fibers::Baton baton3;
1372 folly::fibers::Baton baton4;
// Scenario 1: task added with addTask().
1375 folly::RequestContextScopeGuard rctx;
1376 auto rcontext1 = folly::RequestContext::get();
1377 fm.addTask([&, rcontext1]() {
1378 EXPECT_EQ(rcontext1, folly::RequestContext::get());
1380 [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
1381 EXPECT_EQ(rcontext1, folly::RequestContext::get());
1383 [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
// Scenario 2: task added with addTaskRemote().
1388 folly::RequestContextScopeGuard rctx;
1389 auto rcontext2 = folly::RequestContext::get();
1390 fm.addTaskRemote([&, rcontext2]() {
1391 EXPECT_EQ(rcontext2, folly::RequestContext::get());
1393 EXPECT_EQ(rcontext2, folly::RequestContext::get());
// Scenario 3: a task returning folly::Unit plus a callback taking
// Try<folly::Unit>; both must see rcontext3.
1398 folly::RequestContextScopeGuard rctx;
1399 auto rcontext3 = folly::RequestContext::get();
1402 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1404 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1406 return folly::Unit();
1408 [&, rcontext3](Try<folly::Unit>&& /* t */) {
1409 EXPECT_EQ(rcontext3, folly::RequestContext::get());
// Scenario 4: starts by clearing the current context.
1414 folly::RequestContext::setContext(nullptr);
1416 folly::RequestContextScopeGuard rctx;
1417 auto rcontext4 = folly::RequestContext::get();
1419 EXPECT_EQ(rcontext4, folly::RequestContext::get());
// Driver section: the test thread's own context must be the same before and
// after every loop pass.
1424 folly::RequestContextScopeGuard rctx;
1425 auto rcontext = folly::RequestContext::get();
1427 fm.loopUntilNoReady();
1428 EXPECT_EQ(rcontext, folly::RequestContext::get());
1431 EXPECT_EQ(rcontext, folly::RequestContext::get());
1432 fm.loopUntilNoReady();
1433 EXPECT_TRUE(checkRun1);
1434 EXPECT_EQ(rcontext, folly::RequestContext::get());
1437 EXPECT_EQ(rcontext, folly::RequestContext::get());
1438 fm.loopUntilNoReady();
1439 EXPECT_TRUE(checkRun2);
1440 EXPECT_EQ(rcontext, folly::RequestContext::get());
1443 EXPECT_EQ(rcontext, folly::RequestContext::get());
1444 fm.loopUntilNoReady();
1445 EXPECT_TRUE(checkRun3);
1446 EXPECT_EQ(rcontext, folly::RequestContext::get());
1449 EXPECT_EQ(rcontext, folly::RequestContext::get());
1450 fm.loopUntilNoReady();
1451 EXPECT_TRUE(checkRun4);
1452 EXPECT_EQ(rcontext, folly::RequestContext::get());
// Exercises periodic fibers-pool resizing. With fibersPoolResizePeriodMs = 300
// and maxFibersPoolSize = 5, the test checks fibersAllocated() and
// fibersPoolSize() after each 400 ms sleep (longer than one resize period).
1456 TEST(FiberManager, resizePeriodically) {
1457 FiberManager::Options opts;
1458 opts.fibersPoolResizePeriodMs = 300;
1459 opts.maxFibersPoolSize = 5;
1461 FiberManager manager(std::make_unique<EventBaseLoopController>(), opts);
1463 folly::EventBase evb;
1464 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1465 .attachEventBase(evb);
// One baton per task index below batons.size() (10 of the 30 tasks).
1467 std::vector<Baton> batons(10);
1469 size_t tasksRun = 0;
1470 for (size_t i = 0; i < 30; ++i) {
1471 manager.addTask([i, &batons, &tasksRun]() {
1473 // Keep some fibers active indefinitely
1474 if (i < batons.size()) {
// Expected state before any task has run: 30 fibers allocated, empty pool.
1480 EXPECT_EQ(0, tasksRun);
1481 EXPECT_EQ(30, manager.fibersAllocated());
1482 EXPECT_EQ(0, manager.fibersPoolSize());
// Expected state once all 30 tasks have started running.
1485 EXPECT_EQ(30, tasksRun);
1486 EXPECT_EQ(30, manager.fibersAllocated());
1487 // Can go over maxFibersPoolSize, 10 of 30 fibers still active
1488 EXPECT_EQ(20, manager.fibersPoolSize());
1490 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1491 evb.loopOnce(); // no fibers active in this period
1492 EXPECT_EQ(30, manager.fibersAllocated());
1493 EXPECT_EQ(20, manager.fibersPoolSize());
1495 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1496 evb.loopOnce(); // should shrink fibers pool to maxFibersPoolSize
1497 EXPECT_EQ(15, manager.fibersAllocated());
1498 EXPECT_EQ(5, manager.fibersPoolSize());
// Per-baton loop; afterwards all 15 remaining fibers are expected in the pool.
1500 for (size_t i = 0; i < batons.size(); ++i) {
1504 EXPECT_EQ(15, manager.fibersAllocated());
1505 EXPECT_EQ(15, manager.fibersPoolSize());
1507 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1508 evb.loopOnce(); // 10 fibers active in last period
1509 EXPECT_EQ(10, manager.fibersAllocated());
1510 EXPECT_EQ(10, manager.fibersPoolSize());
// After one more period the pool is expected to be down to maxFibersPoolSize.
1512 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1514 EXPECT_EQ(5, manager.fibersAllocated());
1515 EXPECT_EQ(5, manager.fibersPoolSize());
// Exercises Baton::wait(TimeoutHandler&) on a fiber: fibersRun is expected to
// stay 0 while the fiber is blocked and to become 1 after the final
// loopUntilNoReady().
1518 TEST(FiberManager, batonWaitTimeoutHandler) {
1519 FiberManager manager(std::make_unique<EventBaseLoopController>());
1521 folly::EventBase evb;
1522 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1523 .attachEventBase(evb);
1525 size_t fibersRun = 0;
1527 Baton::TimeoutHandler timeoutHandler;
1529 manager.addTask([&]() {
1530 baton.wait(timeoutHandler);
1533 manager.loopUntilNoReady();
// The fiber is blocked in wait(): baton not posted, fiber not finished.
1535 EXPECT_FALSE(baton.try_wait());
1536 EXPECT_EQ(0, fibersRun);
// Schedule a 250 ms timeout and sleep past it; the expectations that follow
// show the fiber has still not completed at this point.
1538 timeoutHandler.scheduleTimeout(std::chrono::milliseconds(250));
1539 std::this_thread::sleep_for(std::chrono::milliseconds(500));
1541 EXPECT_FALSE(baton.try_wait());
1542 EXPECT_EQ(0, fibersRun);
1545 manager.loopUntilNoReady();
1547 EXPECT_EQ(1, fibersRun);
// Adds kNumTimeoutTasks tasks that each wait on a baton with a TimeoutHandler
// whose 1000 ms timeout is scheduled from a nested fiber task. The task that
// brings tasksCount to zero terminates the event loop.
1550 TEST(FiberManager, batonWaitTimeoutMany) {
1551 FiberManager manager(std::make_unique<EventBaseLoopController>());
1553 folly::EventBase evb;
1554 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1555 .attachEventBase(evb);
1557 constexpr size_t kNumTimeoutTasks = 10000;
1558 size_t tasksCount = kNumTimeoutTasks;
1560 // We add many tasks to hit timeout queue deallocation logic.
1561 for (size_t i = 0; i < kNumTimeoutTasks; ++i) {
1562 manager.addTask([&]() {
1564 Baton::TimeoutHandler timeoutHandler;
1566 folly::fibers::addTask([&] {
1567 timeoutHandler.scheduleTimeout(std::chrono::milliseconds(1000));
1570 baton.wait(timeoutHandler);
// Last task to get past its wait stops the loop.
1571 if (--tasksCount == 0) {
1572 evb.terminateLoopSoon();
// Checks that addTaskFuture() and addTaskRemoteFuture() deliver the values
// returned by their tasks (v1/v2 are presumably read from f1/f2 - confirm).
1580 TEST(FiberManager, remoteFutureTest) {
1581 FiberManager fiberManager(std::make_unique<SimpleLoopController>());
1582 auto& loopController =
1583 dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
1587 auto f1 = fiberManager.addTaskFuture([&]() { return testValue1; });
1588 auto f2 = fiberManager.addTaskRemoteFuture([&]() { return testValue2; });
// The loop callback stops the controller.
1589 loopController.loop([&]() { loopController.stop(); });
1593 EXPECT_EQ(v1, testValue1);
1594 EXPECT_EQ(v2, testValue2);
1597 // Test that a void function produces a Future<Unit>.
// Both addTaskFuture() and addTaskRemoteFuture() are given void lambdas and
// their results are stored as folly::Future<folly::Unit>; each lambda sets a
// flag that is asserted afterwards.
1598 TEST(FiberManager, remoteFutureVoidUnitTest) {
1599 FiberManager fiberManager(std::make_unique<SimpleLoopController>());
1600 auto& loopController =
1601 dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
1603 bool ranLocal = false;
1604 folly::Future<folly::Unit> futureLocal =
1605 fiberManager.addTaskFuture([&]() { ranLocal = true; });
1607 bool ranRemote = false;
1608 folly::Future<folly::Unit> futureRemote =
1609 fiberManager.addTaskRemoteFuture([&]() { ranRemote = true; });
// The loop callback stops the controller.
1611 loopController.loop([&]() { loopController.stop(); });
1614 ASSERT_TRUE(ranLocal);
1616 futureRemote.wait();
1617 ASSERT_TRUE(ranRemote);
// Runs a fiber on innerEvb's FiberManager from inside a fiber of outerEvb's
// FiberManager (via runInMainContext). The visible arguments pair
// &getFiberManager(...) with FiberManager::getFiberManagerUnsafe() at each
// level, presumably as EXPECT_EQ operands - confirm.
1620 TEST(FiberManager, nestedFiberManagers) {
1621 folly::EventBase outerEvb;
1622 folly::EventBase innerEvb;
1624 getFiberManager(outerEvb).addTask([&]() {
1626 &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
1628 runInMainContext([&]() {
1629 getFiberManager(innerEvb).addTask([&]() {
1631 &getFiberManager(innerEvb), FiberManager::getFiberManagerUnsafe());
1633 innerEvb.terminateLoopSoon();
// Inner loop is driven from within the outer fiber's main-context section.
1636 innerEvb.loopForever();
1640 &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
1642 outerEvb.terminateLoopSoon();
1645 outerEvb.loopForever();
// Runs the same task on two threads sharing one Semaphore created with
// kNumTokens tokens. Each thread has its own FiberManager/EventBase and adds
// kTasks fibers that loop kIterations times; each thread's counter is expected
// to stay within [0, kNumTokens).
1648 TEST(FiberManager, semaphore) {
1649 static constexpr size_t kTasks = 10;
1650 static constexpr size_t kIterations = 10000;
1651 static constexpr size_t kNumTokens = 10;
1653 Semaphore sem(kNumTokens);
1657 auto task = [&sem](int& counter, folly::fibers::Baton& baton) {
1658 FiberManager manager(std::make_unique<EventBaseLoopController>());
1659 folly::EventBase evb;
1660 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1661 .attachEventBase(evb);
// The custom deleter calls terminateLoopSoon() instead of deleting evb. Every
// fiber lambda below captures a copy of this shared_ptr, so the deleter runs
// once the last copy is released.
1664 std::shared_ptr<folly::EventBase> completionCounter(
1665 &evb, [](folly::EventBase* evb) { evb->terminateLoopSoon(); });
1667 for (size_t i = 0; i < kTasks; ++i) {
1668 manager.addTask([&, completionCounter]() {
1669 for (size_t j = 0; j < kIterations; ++j) {
1675 EXPECT_LT(counter, kNumTokens);
1676 EXPECT_GE(counter, 0);
1686 folly::fibers::Baton batonA;
1687 folly::fibers::Baton batonB;
1688 std::thread threadA([&] { task(counterA, batonA); });
1689 std::thread threadB([&] { task(counterB, batonB); });
// Final per-thread counters must also be within [0, kNumTokens).
1696 EXPECT_LT(counterA, kNumTokens);
1697 EXPECT_LT(counterB, kNumTokens);
1698 EXPECT_GE(counterA, 0);
1699 EXPECT_GE(counterB, 0);
// Adds `index` to a thread_local BatchDispatcher<int, std::string, ExecutorT>
// and checks that the result equals the stringified index. The dispatch
// function expects each batch to contain exactly `batchSize` values and
// converts every value to a string with folly::to.
1702 template <typename ExecutorT>
1703 void singleBatchDispatch(ExecutorT& executor, int batchSize, int index) {
1704 thread_local BatchDispatcher<int, std::string, ExecutorT> batchDispatcher(
1705 executor, [=](std::vector<int>&& batch) {
1706 EXPECT_EQ(batchSize, batch.size());
1707 std::vector<std::string> results;
1708 for (auto& it : batch) {
1709 results.push_back(folly::to<std::string>(it));
// add() takes the value by rvalue here, so pass a copy and keep `index` for
// the comparison below.
1714 auto indexCopy = index;
1715 auto result = batchDispatcher.add(std::move(indexCopy));
1716 EXPECT_EQ(folly::to<std::string>(index), result.get());
// Runs singleBatchDispatch from batchSize fibers in two rounds; the second
// round reuses the same BatchDispatcher as the first.
1719 TEST(FiberManager, batchDispatchTest) {
1720 folly::EventBase evb;
1721 auto& executor = getFiberManager(evb);
1723 // Launch multiple fibers with a single id.
1724 executor.add([&]() {
1726 for (int i = 0; i < batchSize; i++) {
1728 [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
1733 // Reuse the same BatchDispatcher to batch once again.
1734 executor.add([&]() {
1736 for (int i = 0; i < batchSize; i++) {
1738 [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
// Adds `input` (a vector of ints) to a thread_local BatchDispatcher and
// returns the future produced by add(). The dispatch function stringifies
// every element of every unit in the batch and expects the total element
// count across the batch to equal totalNumberOfElements.
1744 template <typename ExecutorT>
1745 folly::Future<std::vector<std::string>> doubleBatchInnerDispatch(
1746 ExecutorT& executor,
1747 int totalNumberOfElements,
1748 std::vector<int> input) {
1749 thread_local BatchDispatcher<
1751 std::vector<std::string>,
1753 batchDispatcher(executor, [=](std::vector<std::vector<int>>&& batch) {
1754 std::vector<std::vector<std::string>> results;
1755 int numberOfElements = 0;
1756 for (auto& unit : batch) {
1757 numberOfElements += unit.size();
// One result vector per unit, in the same order as the batch.
1758 std::vector<std::string> result;
1759 for (auto& element : unit) {
1760 result.push_back(folly::to<std::string>(element));
1762 results.push_back(std::move(result));
1764 EXPECT_EQ(totalNumberOfElements, numberOfElements);
1768 return batchDispatcher.add(std::move(input));
1772 * Batch values in groups of 5, and then call inner dispatch.
// Adds `index` to a thread_local BatchDispatcher and checks that the result
// equals the stringified index. The dispatch function expects a batch of
// totalNumberOfElements values, hands them to doubleBatchInnerDispatch in
// groups of 5, and appends the elements of the inner results to `results`.
1774 template <typename ExecutorT>
1775 void doubleBatchOuterDispatch(
1776 ExecutorT& executor,
1777 int totalNumberOfElements,
1779 thread_local BatchDispatcher<int, std::string, ExecutorT> batchDispatcher(
1780 executor, [=, &executor](std::vector<int>&& batch) {
1781 EXPECT_EQ(totalNumberOfElements, batch.size());
1782 std::vector<std::string> results;
1783 std::vector<folly::Future<std::vector<std::string>>>
1784 innerDispatchResultFutures;
1786 std::vector<int> group;
1787 for (auto unit : batch) {
1788 group.push_back(unit);
// A full group of 5 is copied and passed on to the inner dispatcher.
1789 if (group.size() == 5) {
1790 auto localGroup = group;
1793 innerDispatchResultFutures.push_back(doubleBatchInnerDispatch(
1794 executor, totalNumberOfElements, localGroup));
1799 innerDispatchResultFutures.begin(),
1800 innerDispatchResultFutures.end())
// Flatten the inner results; unit.value() is called on each Try.
1801 .then([&](std::vector<Try<std::vector<std::string>>>
1802 innerDispatchResults) {
1803 for (auto& unit : innerDispatchResults) {
1804 for (auto& element : unit.value()) {
1805 results.push_back(element);
1813 auto indexCopy = index;
1814 auto result = batchDispatcher.add(std::move(indexCopy));
1815 EXPECT_EQ(folly::to<std::string>(index), result.get());
// Schedules 20 fibers on the event base's FiberManager; fiber i calls
// doubleBatchOuterDispatch(executor, 20, i).
1818 TEST(FiberManager, doubleBatchDispatchTest) {
1819 folly::EventBase evb;
1820 auto& executor = getFiberManager(evb);
1822 // Launch multiple fibers with a single id.
1823 executor.add([&]() {
1824 int totalNumberOfElements = 20;
1825 for (int i = 0; i < totalNumberOfElements; i++) {
1826 executor.add([=, &executor]() {
1827 doubleBatchOuterDispatch(executor, totalNumberOfElements, i);
// Uses a thread_local BatchDispatcher whose dispatch function always throws
// std::runtime_error, and expects get() on the future returned by add(i) to
// throw std::runtime_error.
1834 template <typename ExecutorT>
1835 void batchDispatchExceptionHandling(ExecutorT& executor, int i) {
1836 thread_local BatchDispatcher<int, int, ExecutorT> batchDispatcher(
1837 executor, [](std::vector<int> &&) -> std::vector<int> {
1838 throw std::runtime_error("Surprise!!");
1841 EXPECT_THROW(batchDispatcher.add(i).get(), std::runtime_error);
// Runs batchDispatchExceptionHandling(executor, i) for i in [0, 5), each call
// wrapped in its own lambda.
1844 TEST(FiberManager, batchDispatchExceptionHandlingTest) {
1845 folly::EventBase evb;
1846 auto& executor = getFiberManager(evb);
1848 // Launch multiple fibers with a single id.
1849 executor.add([&]() {
1850 int totalNumberOfElements = 5;
1851 for (int i = 0; i < totalNumberOfElements; i++) {
1853 [=, &executor]() { batchDispatchExceptionHandling(executor, i); });
// Helpers shared by the AtomicBatchDispatcher (ABD_*) tests.
1859 namespace AtomicBatchDispatcherTesting {
1861 using ValueT = size_t;
1862 using ResultT = std::string;
1863 using DispatchFunctionT =
1864 folly::Function<std::vector<ResultT>(std::vector<ValueT>&&)>;
// Tracing: OUTPUT_TRACE is std::cerr when ENABLE_TRACE_IN_TEST is non-zero,
// otherwise it is devNullPiper.
1866 #define ENABLE_TRACE_IN_TEST 0 // Set to 1 to debug issues in ABD tests
1867 #if ENABLE_TRACE_IN_TEST
1868 #define OUTPUT_TRACE std::cerr
1869 #else // ENABLE_TRACE_IN_TEST
// Sink type with operator<< overloads for arbitrary values and for stream
// manipulators of type std::ostream& (*)(std::ostream&), e.g. std::endl.
1870 struct DevNullPiper {
1871 template <typename T>
1872 DevNullPiper& operator<<(const T&) {
1876 DevNullPiper& operator<<(std::ostream& (*)(std::ostream&)) {
1880 #define OUTPUT_TRACE devNullPiper
1881 #endif // ENABLE_TRACE_IN_TEST
// A job pairs a dispatcher token with its input value.
1884 AtomicBatchDispatcher<ValueT, ResultT>::Token token;
// Simulated preprocessing step; can throw std::logic_error (presumably when
// `die` is true - confirm).
1887 void preprocess(FiberManager& executor, bool die) {
1888 // Yield for a random duration [0, 10) ms to simulate I/O in preprocessing
1889 clock_t msecToDoIO = folly::Random::rand32() % 10;
1890 double start = (1000.0 * clock()) / CLOCKS_PER_SEC;
1891 double endAfter = start + msecToDoIO;
1892 while ((1000.0 * clock()) / CLOCKS_PER_SEC < endAfter) {
1896 throw std::logic_error("Simulating preprocessing failure");
1900 Job(AtomicBatchDispatcher<ValueT, ResultT>::Token&& t, ValueT i)
1901 : token(std::move(t)), input(i) {}
1903 Job(Job&&) = default;
1904 Job& operator=(Job&&) = default;
// Converts one input value to ResultT using folly::to.
1907 ResultT processSingleInput(ValueT&& input) {
1908 return folly::to<ResultT>(std::move(input));
// Batch dispatch function used by the tests: builds one result per input, in
// input order, by calling processSingleInput.
1911 std::vector<ResultT> userDispatchFunc(std::vector<ValueT>&& inputs) {
1912 size_t expectedCount = inputs.size();
1913 std::vector<ResultT> results;
1914 results.reserve(expectedCount);
1915 for (size_t i = 0; i < expectedCount; ++i) {
1916 results.emplace_back(processSingleInput(std::move(inputs[i])));
// Appends `count` jobs to `jobs`; job i gets a token from
// atomicBatchDispatcher.getToken() and the input value i.
1922 AtomicBatchDispatcher<ValueT, ResultT>& atomicBatchDispatcher,
1923 std::vector<Job>& jobs,
1926 for (size_t i = 0; i < count; ++i) {
1927 jobs.emplace_back(Job(atomicBatchDispatcher.getToken(), i));
// Kinds of failure that the dispatch helper below can inject.
1931 enum class DispatchProblem {
// Runs one lambda per job (presumably scheduled on `executor` - confirm). Each
// lambda moves its job out of `jobs`, runs preprocess, dispatches the job's
// token and stores the returned future in results[i]. For the job at
// `problemIndex` the selected DispatchProblem is injected.
1938 FiberManager& executor,
1939 std::vector<Job>& jobs,
1940 std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1941 DispatchProblem dispatchProblem = DispatchProblem::None,
1942 size_t problemIndex = size_t(-1)) {
1944 dispatchProblem == DispatchProblem::None || problemIndex < jobs.size());
1946 results.resize(jobs.size());
1947 for (size_t i = 0; i < jobs.size(); ++i) {
1949 [i, &executor, &jobs, &results, dispatchProblem, problemIndex]() {
1951 Job job(std::move(jobs[i]));
// Injected failure: preprocessing of the problem job is expected to throw.
1953 if (dispatchProblem == DispatchProblem::PreprocessThrows) {
1954 if (i == problemIndex) {
1955 EXPECT_THROW(job.preprocess(executor, true), std::logic_error);
1960 job.preprocess(executor, false);
1961 OUTPUT_TRACE << "Dispatching job #" << i << std::endl;
1962 results[i] = job.token.dispatch(job.input);
1963 OUTPUT_TRACE << "Result future filled for job #" << i << std::endl;
// Injected failure: a second dispatch on the same token is expected to throw
// ABDUsageException.
1965 if (dispatchProblem == DispatchProblem::DuplicateDispatch) {
1966 if (i == problemIndex) {
1967 EXPECT_THROW(job.token.dispatch(job.input), ABDUsageException);
1971 OUTPUT_TRACE << "Preprocessing failed for job #" << i << std::endl;
// Reads results[i]->value() and traces it; the visible handler traces e.what()
// for a caught std::exception.
1977 void validateResult(
1978 std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1981 OUTPUT_TRACE << "results[" << i << "].value() : " << results[i]->value()
1983 } catch (std::exception& e) {
1984 OUTPUT_TRACE << "Exception : " << e.what() << std::endl;
// Failure-path check: validateResult is expected to throw TException, and
// numResultsFilled must equal expectedNumResults.
1989 template <typename TException>
1990 void validateResults(
1991 std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1992 size_t expectedNumResults) {
1993 size_t numResultsFilled = 0;
1994 for (size_t i = 0; i < results.size(); ++i) {
1999 EXPECT_THROW(validateResult(results, i), TException);
2001 EXPECT_EQ(numResultsFilled, expectedNumResults);
// Success-path check: validateResult must not throw, results[i] is compared
// with processSingleInput(i), and numResultsFilled must equal
// expectedNumResults.
2004 void validateResults(
2005 std::vector<folly::Optional<folly::Future<ResultT>>>& results,
2006 size_t expectedNumResults) {
2007 size_t numResultsFilled = 0;
2008 for (size_t i = 0; i < results.size(); ++i) {
2013 EXPECT_NO_THROW(validateResult(results, i));
2014 ValueT expectedInput = i;
2016 results[i]->value(), processSingleInput(std::move(expectedInput)));
2018 EXPECT_EQ(numResultsFilled, expectedNumResults);
2021 } // namespace AtomicBatchDispatcherTesting
// Declares the locals shared by the ABD_* tests: an EventBase `evb` and its
// FiberManager `executor`, COUNT (11), the `jobs` and `results` vectors (both
// reserved to COUNT), and a DispatchFunctionT named `dispatchFunc`. It also
// pulls in namespace AtomicBatchDispatcherTesting. The expansion ends without
// a semicolon.
2023 #define SET_UP_TEST_FUNC \
2024 using namespace AtomicBatchDispatcherTesting; \
2025 folly::EventBase evb; \
2026 auto& executor = getFiberManager(evb); \
2027 const size_t COUNT = 11; \
2028 std::vector<Job> jobs; \
2029 jobs.reserve(COUNT); \
2030 std::vector<folly::Optional<folly::Future<ResultT>>> results; \
2031 results.reserve(COUNT); \
2032 DispatchFunctionT dispatchFunc
// Happy path: create COUNT jobs, dispatch them, commit, and expect COUNT
// successful results.
2034 TEST(FiberManager, ABD_Test) {
2038 // Testing AtomicBatchDispatcher with explicit call to commit()
2040 dispatchFunc = userDispatchFunc;
2041 auto atomicBatchDispatcher =
2042 createAtomicBatchDispatcher(std::move(dispatchFunc));
2043 createJobs(atomicBatchDispatcher, jobs, COUNT);
2044 dispatchJobs(executor, jobs, results);
2045 atomicBatchDispatcher.commit();
// Non-template overload: every result must match processSingleInput(i).
2047 validateResults(results, COUNT);
// Jobs are created and dispatched, then an exception is thrown before
// commit() is reached. All COUNT results are expected to throw
// ABDCommitNotCalledException.
2050 TEST(FiberManager, ABD_DispatcherDestroyedBeforeCallingCommit) {
2054 // Testing AtomicBatchDispatcher destroyed before calling commit.
2055 // Handles error cases for:
2056 // - User might have forgotten to add the call to commit() in the code
2057 // - An unexpected exception got thrown in user code before commit() is called
2060 dispatchFunc = userDispatchFunc;
2061 auto atomicBatchDispatcher =
2062 createAtomicBatchDispatcher(std::move(dispatchFunc));
2063 createJobs(atomicBatchDispatcher, jobs, COUNT);
2064 dispatchJobs(executor, jobs, results);
// The throw takes the place of the commit() call, which is left commented out.
2065 throw std::runtime_error(
2066 "Unexpected exception in user code before commit called");
2067 // atomicBatchDispatcher.commit();
2069 /* User code handles the exception and does not exit process */
2072 validateResults<ABDCommitNotCalledException>(results, COUNT);
// Injects a preprocessing failure into job 8, then commits. Results are
// checked against ABDTokenNotDispatchedException with COUNT - 1 filled
// results expected.
2075 TEST(FiberManager, ABD_PreprocessingFailureTest) {
2079 // Testing preprocessing failure on a job throws
2081 dispatchFunc = userDispatchFunc;
2082 auto atomicBatchDispatcher =
2083 createAtomicBatchDispatcher(std::move(dispatchFunc));
2084 createJobs(atomicBatchDispatcher, jobs, COUNT);
2085 dispatchJobs(executor, jobs, results, DispatchProblem::PreprocessThrows, 8);
2086 atomicBatchDispatcher.commit();
2088 validateResults<ABDTokenNotDispatchedException>(results, COUNT - 1);
// Injects a duplicate dispatch on job 4's token (checked inside dispatchJobs
// with EXPECT_THROW(..., ABDUsageException)), then commits.
2091 TEST(FiberManager, ABD_MultipleDispatchOnSameTokenErrorTest) {
2095 // Testing that calling dispatch more than once on the same token throws
2097 dispatchFunc = userDispatchFunc;
2098 auto atomicBatchDispatcher =
2099 createAtomicBatchDispatcher(std::move(dispatchFunc));
2100 createJobs(atomicBatchDispatcher, jobs, COUNT);
2101 dispatchJobs(executor, jobs, results, DispatchProblem::DuplicateDispatch, 4);
2102 atomicBatchDispatcher.commit();
// commit() is called right after the jobs are created, before they are
// dispatched. getToken() is then expected to throw ABDUsageException at three
// points: after commit, after dispatchJobs, and after the results are
// validated. All COUNT results are still expected to succeed.
2106 TEST(FiberManager, ABD_GetTokenCalledAfterCommitTest) {
2110 // Testing that exception set on attempt to call getToken after commit called
2112 dispatchFunc = userDispatchFunc;
2113 auto atomicBatchDispatcher =
2114 createAtomicBatchDispatcher(std::move(dispatchFunc));
2115 createJobs(atomicBatchDispatcher, jobs, COUNT);
2116 atomicBatchDispatcher.commit();
2117 EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
2118 dispatchJobs(executor, jobs, results);
2119 EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
2121 validateResults(results, COUNT);
2122 EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
// The dispatch function runs userDispatchFunc, discards its result and then
// throws std::runtime_error. All COUNT results are expected to throw
// std::runtime_error.
2125 TEST(FiberManager, ABD_UserProvidedBatchDispatchThrowsTest) {
2129 // Testing that exception is set if user provided batch dispatch throws
2131 dispatchFunc = [](std::vector<ValueT>&& inputs) -> std::vector<ResultT> {
2132 (void)userDispatchFunc(std::move(inputs));
2133 throw std::runtime_error("Unexpected exception in user dispatch function");
2135 auto atomicBatchDispatcher =
2136 createAtomicBatchDispatcher(std::move(dispatchFunc));
2137 createJobs(atomicBatchDispatcher, jobs, COUNT);
2138 dispatchJobs(executor, jobs, results);
2139 atomicBatchDispatcher.commit();
2141 validateResults<std::runtime_error>(results, COUNT);
// Schedules remote tasks on the FiberManagers of two VirtualEventBases that
// belong to one ScopedEventBaseThread: evb1 is created explicitly and evb2 is
// obtained from getVirtualEventBase(). The tasks do baton.timed_wait() for
// 100 ms and 200 ms; the done1/done2 flags are checked afterwards.
2144 TEST(FiberManager, VirtualEventBase) {
2148 folly::ScopedEventBaseThread thread;
2151 std::make_unique<folly::VirtualEventBase>(*thread.getEventBase());
2152 auto& evb2 = thread.getEventBase()->getVirtualEventBase();
2154 getFiberManager(*evb1).addTaskRemote([&] {
2156 baton.timed_wait(std::chrono::milliseconds{100});
2161 getFiberManager(evb2).addTaskRemote([&] {
2163 baton.timed_wait(std::chrono::milliseconds{200});
// Neither flag is expected to be set at this point.
2168 EXPECT_FALSE(done1);
2169 EXPECT_FALSE(done2);
2173 EXPECT_FALSE(done2);
// One fiber task takes `mutex` through std::lock_guard while
// mutex.timed_lock(1s) is attempted inside runInMainContext. A separate
// thread (unlockThread) first sleeps 100 ms (it presumably releases the mutex
// afterwards - confirm). The timed lock is expected to succeed and no tasks
// should remain.
2178 TEST(TimedMutex, ThreadFiberDeadlockOrder) {
2179 folly::EventBase evb;
2180 auto& fm = getFiberManager(evb);
2184 std::thread unlockThread([&] {
2185 /* sleep override */ std::this_thread::sleep_for(
2186 std::chrono::milliseconds{100});
2190 fm.addTask([&] { std::lock_guard<TimedMutex> lg(mutex); });
2192 runInMainContext([&] {
2193 auto locked = mutex.timed_lock(std::chrono::seconds{1});
2194 EXPECT_TRUE(locked);
2202 EXPECT_EQ(0, fm.hasTasks());
2204 unlockThread.join();
// Acquires `mutex` with timed_lock(1s) both directly and inside
// runInMainContext. Each acquisition is expected to succeed, and no tasks
// should remain afterwards.
2207 TEST(TimedMutex, ThreadFiberDeadlockRace) {
2208 folly::EventBase evb;
2209 auto& fm = getFiberManager(evb);
2215 auto locked = mutex.timed_lock(std::chrono::seconds{1});
2216 EXPECT_TRUE(locked);
2223 runInMainContext([&] {
2224 auto locked = mutex.timed_lock(std::chrono::seconds{1});
2225 EXPECT_TRUE(locked);
2233 EXPECT_EQ(0, fm.hasTasks());
2237 * Test that we can properly track fiber stack usage.
2239 * This functionality can only be enabled when ASAN is disabled, so avoid
2240 * running this test with ASAN.
// Compiled only when FOLLY_SANITIZE_ADDRESS is not defined. With
// opts.recordStackEvery = 1, the fiber's work loops over n = 1000 elements,
// and stackHighWatermark() is expected to exceed n * sizeof(int). The test
// body `f` is run on a separate std::thread.
2242 #ifndef FOLLY_SANITIZE_ADDRESS
2243 TEST(FiberManager, recordStack) {
2245 folly::fibers::FiberManager::Options opts;
2246 opts.recordStackEvery = 1;
2248 FiberManager fm(std::make_unique<SimpleLoopController>(), opts);
2249 auto& loopController =
2250 dynamic_cast<SimpleLoopController&>(fm.loopController());
2252 static constexpr size_t n = 1000;
2256 for (size_t i = 0; i < n; ++i) {
// Accumulates products of adjacent elements of b into s.
2259 for (size_t i = 0; i + 1 < n; ++i) {
2260 s += b[i] * b[i + 1];
2266 loopController.loop([&]() { loopController.stop(); });
2268 // Check that we properly accounted fiber stack usage.
2269 EXPECT_LT(n * sizeof(int), fm.stackHighWatermark());
2271 std::thread(f).join();