2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 #include <folly/Memory.h>
21 #include <folly/Random.h>
22 #include <folly/futures/Future.h>
24 #include <folly/Conv.h>
25 #include <folly/fibers/AddTasks.h>
26 #include <folly/fibers/AtomicBatchDispatcher.h>
27 #include <folly/fibers/BatchDispatcher.h>
28 #include <folly/fibers/EventBaseLoopController.h>
29 #include <folly/fibers/FiberManager.h>
30 #include <folly/fibers/FiberManagerMap.h>
31 #include <folly/fibers/GenericBaton.h>
32 #include <folly/fibers/Semaphore.h>
33 #include <folly/fibers/SimpleLoopController.h>
34 #include <folly/fibers/TimedMutex.h>
35 #include <folly/fibers/WhenN.h>
36 #include <folly/io/async/ScopedEventBaseThread.h>
37 #include <folly/portability/GTest.h>
39 using namespace folly::fibers;
43 TEST(FiberManager, batonTimedWaitTimeout) {
44 bool taskAdded = false;
45 size_t iterations = 0;
47 FiberManager manager(std::make_unique<SimpleLoopController>());
48 auto& loopController =
49 dynamic_cast<SimpleLoopController&>(manager.loopController());
51 auto loopFunc = [&]() {
53 manager.addTask([&]() {
56 auto res = baton.timed_wait(std::chrono::milliseconds(230));
59 EXPECT_EQ(5, iterations);
61 loopController.stop();
63 manager.addTask([&]() {
66 auto res = baton.timed_wait(std::chrono::milliseconds(130));
69 EXPECT_EQ(3, iterations);
71 loopController.stop();
75 std::this_thread::sleep_for(std::chrono::milliseconds(50));
80 loopController.loop(std::move(loopFunc));
83 TEST(FiberManager, batonTimedWaitPost) {
84 bool taskAdded = false;
85 size_t iterations = 0;
88 FiberManager manager(std::make_unique<SimpleLoopController>());
89 auto& loopController =
90 dynamic_cast<SimpleLoopController&>(manager.loopController());
92 auto loopFunc = [&]() {
94 manager.addTask([&]() {
98 auto res = baton.timed_wait(std::chrono::milliseconds(130));
101 EXPECT_EQ(2, iterations);
103 loopController.stop();
107 std::this_thread::sleep_for(std::chrono::milliseconds(50));
109 if (iterations == 2) {
115 loopController.loop(std::move(loopFunc));
118 TEST(FiberManager, batonTimedWaitTimeoutEvb) {
119 size_t tasksComplete = 0;
121 folly::EventBase evb;
123 FiberManager manager(std::make_unique<EventBaseLoopController>());
124 dynamic_cast<EventBaseLoopController&>(manager.loopController())
125 .attachEventBase(evb);
127 auto task = [&](size_t timeout_ms) {
130 auto start = EventBaseLoopController::Clock::now();
131 auto res = baton.timed_wait(std::chrono::milliseconds(timeout_ms));
132 auto finish = EventBaseLoopController::Clock::now();
137 std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
139 EXPECT_GT(duration_ms.count(), timeout_ms - 50);
140 EXPECT_LT(duration_ms.count(), timeout_ms + 50);
142 if (++tasksComplete == 2) {
143 evb.terminateLoopSoon();
147 evb.runInEventBaseThread([&]() {
148 manager.addTask([&]() { task(500); });
149 manager.addTask([&]() { task(250); });
154 EXPECT_EQ(2, tasksComplete);
157 TEST(FiberManager, batonTimedWaitPostEvb) {
158 size_t tasksComplete = 0;
160 folly::EventBase evb;
162 FiberManager manager(std::make_unique<EventBaseLoopController>());
163 dynamic_cast<EventBaseLoopController&>(manager.loopController())
164 .attachEventBase(evb);
166 evb.runInEventBaseThread([&]() {
167 manager.addTask([&]() {
170 evb.tryRunAfterDelay([&]() { baton.post(); }, 100);
172 auto start = EventBaseLoopController::Clock::now();
173 auto res = baton.timed_wait(std::chrono::milliseconds(130));
174 auto finish = EventBaseLoopController::Clock::now();
179 std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
181 EXPECT_TRUE(duration_ms.count() > 95 && duration_ms.count() < 110);
183 if (++tasksComplete == 1) {
184 evb.terminateLoopSoon();
191 EXPECT_EQ(1, tasksComplete);
194 TEST(FiberManager, batonTryWait) {
195 FiberManager manager(std::make_unique<SimpleLoopController>());
197 // Check if try_wait and post work as expected
200 manager.addTask([&]() {
201 while (!b.try_wait()) {
204 auto thr = std::thread([&]() {
205 std::this_thread::sleep_for(std::chrono::milliseconds(300));
209 manager.loopUntilNoReady();
214 // Check try_wait without post
215 manager.addTask([&]() {
217 while (cnt && !c.try_wait()) {
220 EXPECT_TRUE(!c.try_wait()); // must still hold
224 manager.loopUntilNoReady();
227 TEST(FiberManager, genericBatonFiberWait) {
228 FiberManager manager(std::make_unique<SimpleLoopController>());
231 bool fiberRunning = false;
233 manager.addTask([&]() {
234 EXPECT_EQ(manager.hasActiveFiber(), true);
237 fiberRunning = false;
240 EXPECT_FALSE(fiberRunning);
241 manager.loopUntilNoReady();
242 EXPECT_TRUE(fiberRunning); // ensure fiber still active
244 auto thr = std::thread([&]() {
245 std::this_thread::sleep_for(std::chrono::milliseconds(300));
249 while (fiberRunning) {
250 manager.loopUntilNoReady();
256 TEST(FiberManager, genericBatonThreadWait) {
257 FiberManager manager(std::make_unique<SimpleLoopController>());
259 std::atomic<bool> threadWaiting(false);
261 auto thr = std::thread([&]() {
262 threadWaiting = true;
264 threadWaiting = false;
267 while (!threadWaiting) {
269 std::this_thread::sleep_for(std::chrono::milliseconds(300));
271 manager.addTask([&]() {
272 EXPECT_EQ(manager.hasActiveFiber(), true);
273 EXPECT_TRUE(threadWaiting);
275 while (threadWaiting) {
279 manager.loopUntilNoReady();
283 TEST(FiberManager, addTasksNoncopyable) {
284 std::vector<Promise<int>> pendingFibers;
285 bool taskAdded = false;
287 FiberManager manager(std::make_unique<SimpleLoopController>());
288 auto& loopController =
289 dynamic_cast<SimpleLoopController&>(manager.loopController());
291 auto loopFunc = [&]() {
293 manager.addTask([&]() {
294 std::vector<std::function<std::unique_ptr<int>()>> funcs;
295 for (int i = 0; i < 3; ++i) {
296 funcs.push_back([i, &pendingFibers]() {
297 await([&pendingFibers](Promise<int> promise) {
298 pendingFibers.push_back(std::move(promise));
300 return std::make_unique<int>(i * 2 + 1);
304 auto iter = addTasks(funcs.begin(), funcs.end());
307 while (iter.hasNext()) {
308 auto result = iter.awaitNext();
309 EXPECT_EQ(2 * iter.getTaskID() + 1, *result);
310 EXPECT_GE(2 - n, pendingFibers.size());
316 } else if (pendingFibers.size()) {
317 pendingFibers.back().setValue(0);
318 pendingFibers.pop_back();
320 loopController.stop();
324 loopController.loop(std::move(loopFunc));
327 TEST(FiberManager, awaitThrow) {
328 folly::EventBase evb;
329 struct ExpectedException {};
333 await([](Promise<int> p) {
335 throw ExpectedException();
341 await([&](Promise<int> p) {
342 evb.runInEventBaseThread([p = std::move(p)]() mutable {
345 throw ExpectedException();
352 TEST(FiberManager, addTasksThrow) {
353 std::vector<Promise<int>> pendingFibers;
354 bool taskAdded = false;
356 FiberManager manager(std::make_unique<SimpleLoopController>());
357 auto& loopController =
358 dynamic_cast<SimpleLoopController&>(manager.loopController());
360 auto loopFunc = [&]() {
362 manager.addTask([&]() {
363 std::vector<std::function<int()>> funcs;
364 for (size_t i = 0; i < 3; ++i) {
365 funcs.push_back([i, &pendingFibers]() {
366 await([&pendingFibers](Promise<int> promise) {
367 pendingFibers.push_back(std::move(promise));
370 throw std::runtime_error("Runtime");
376 auto iter = addTasks(funcs.begin(), funcs.end());
379 while (iter.hasNext()) {
381 int result = iter.awaitNext();
382 EXPECT_EQ(1, iter.getTaskID() % 2);
383 EXPECT_EQ(2 * iter.getTaskID() + 1, result);
385 EXPECT_EQ(0, iter.getTaskID() % 2);
387 EXPECT_GE(2 - n, pendingFibers.size());
393 } else if (pendingFibers.size()) {
394 pendingFibers.back().setValue(0);
395 pendingFibers.pop_back();
397 loopController.stop();
401 loopController.loop(std::move(loopFunc));
404 TEST(FiberManager, addTasksVoid) {
405 std::vector<Promise<int>> pendingFibers;
406 bool taskAdded = false;
408 FiberManager manager(std::make_unique<SimpleLoopController>());
409 auto& loopController =
410 dynamic_cast<SimpleLoopController&>(manager.loopController());
412 auto loopFunc = [&]() {
414 manager.addTask([&]() {
415 std::vector<std::function<void()>> funcs;
416 for (size_t i = 0; i < 3; ++i) {
417 funcs.push_back([&pendingFibers]() {
418 await([&pendingFibers](Promise<int> promise) {
419 pendingFibers.push_back(std::move(promise));
424 auto iter = addTasks(funcs.begin(), funcs.end());
427 while (iter.hasNext()) {
429 EXPECT_GE(2 - n, pendingFibers.size());
435 } else if (pendingFibers.size()) {
436 pendingFibers.back().setValue(0);
437 pendingFibers.pop_back();
439 loopController.stop();
443 loopController.loop(std::move(loopFunc));
446 TEST(FiberManager, addTasksVoidThrow) {
447 std::vector<Promise<int>> pendingFibers;
448 bool taskAdded = false;
450 FiberManager manager(std::make_unique<SimpleLoopController>());
451 auto& loopController =
452 dynamic_cast<SimpleLoopController&>(manager.loopController());
454 auto loopFunc = [&]() {
456 manager.addTask([&]() {
457 std::vector<std::function<void()>> funcs;
458 for (size_t i = 0; i < 3; ++i) {
459 funcs.push_back([i, &pendingFibers]() {
460 await([&pendingFibers](Promise<int> promise) {
461 pendingFibers.push_back(std::move(promise));
464 throw std::runtime_error("");
469 auto iter = addTasks(funcs.begin(), funcs.end());
472 while (iter.hasNext()) {
475 EXPECT_EQ(1, iter.getTaskID() % 2);
477 EXPECT_EQ(0, iter.getTaskID() % 2);
479 EXPECT_GE(2 - n, pendingFibers.size());
485 } else if (pendingFibers.size()) {
486 pendingFibers.back().setValue(0);
487 pendingFibers.pop_back();
489 loopController.stop();
493 loopController.loop(std::move(loopFunc));
496 TEST(FiberManager, addTasksReserve) {
497 std::vector<Promise<int>> pendingFibers;
498 bool taskAdded = false;
500 FiberManager manager(std::make_unique<SimpleLoopController>());
501 auto& loopController =
502 dynamic_cast<SimpleLoopController&>(manager.loopController());
504 auto loopFunc = [&]() {
506 manager.addTask([&]() {
507 std::vector<std::function<void()>> funcs;
508 for (size_t i = 0; i < 3; ++i) {
509 funcs.push_back([&pendingFibers]() {
510 await([&pendingFibers](Promise<int> promise) {
511 pendingFibers.push_back(std::move(promise));
516 auto iter = addTasks(funcs.begin(), funcs.end());
519 EXPECT_TRUE(iter.hasCompleted());
520 EXPECT_TRUE(iter.hasPending());
521 EXPECT_TRUE(iter.hasNext());
524 EXPECT_TRUE(iter.hasCompleted());
525 EXPECT_TRUE(iter.hasPending());
526 EXPECT_TRUE(iter.hasNext());
529 EXPECT_FALSE(iter.hasCompleted());
530 EXPECT_TRUE(iter.hasPending());
531 EXPECT_TRUE(iter.hasNext());
534 EXPECT_FALSE(iter.hasCompleted());
535 EXPECT_FALSE(iter.hasPending());
536 EXPECT_FALSE(iter.hasNext());
539 } else if (pendingFibers.size()) {
540 pendingFibers.back().setValue(0);
541 pendingFibers.pop_back();
543 loopController.stop();
547 loopController.loop(std::move(loopFunc));
550 TEST(FiberManager, addTaskDynamic) {
551 folly::EventBase evb;
555 auto makeTask = [&](size_t taskId) {
556 return [&, taskId]() -> size_t {
557 batons[taskId].wait();
563 .addTaskFuture([&]() {
564 TaskIterator<size_t> iterator;
566 iterator.addTask(makeTask(0));
567 iterator.addTask(makeTask(1));
571 EXPECT_EQ(1, iterator.awaitNext());
573 iterator.addTask(makeTask(2));
577 EXPECT_EQ(2, iterator.awaitNext());
581 EXPECT_EQ(0, iterator.awaitNext());
586 TEST(FiberManager, forEach) {
587 std::vector<Promise<int>> pendingFibers;
588 bool taskAdded = false;
590 FiberManager manager(std::make_unique<SimpleLoopController>());
591 auto& loopController =
592 dynamic_cast<SimpleLoopController&>(manager.loopController());
594 auto loopFunc = [&]() {
596 manager.addTask([&]() {
597 std::vector<std::function<int()>> funcs;
598 for (size_t i = 0; i < 3; ++i) {
599 funcs.push_back([i, &pendingFibers]() {
600 await([&pendingFibers](Promise<int> promise) {
601 pendingFibers.push_back(std::move(promise));
607 std::vector<std::pair<size_t, int>> results;
608 forEach(funcs.begin(), funcs.end(), [&results](size_t id, int result) {
609 results.emplace_back(id, result);
611 EXPECT_EQ(3, results.size());
612 EXPECT_TRUE(pendingFibers.empty());
613 for (size_t i = 0; i < 3; ++i) {
614 EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
618 } else if (pendingFibers.size()) {
619 pendingFibers.back().setValue(0);
620 pendingFibers.pop_back();
622 loopController.stop();
626 loopController.loop(std::move(loopFunc));
629 TEST(FiberManager, collectN) {
630 std::vector<Promise<int>> pendingFibers;
631 bool taskAdded = false;
633 FiberManager manager(std::make_unique<SimpleLoopController>());
634 auto& loopController =
635 dynamic_cast<SimpleLoopController&>(manager.loopController());
637 auto loopFunc = [&]() {
639 manager.addTask([&]() {
640 std::vector<std::function<int()>> funcs;
641 for (size_t i = 0; i < 3; ++i) {
642 funcs.push_back([i, &pendingFibers]() {
643 await([&pendingFibers](Promise<int> promise) {
644 pendingFibers.push_back(std::move(promise));
650 auto results = collectN(funcs.begin(), funcs.end(), 2);
651 EXPECT_EQ(2, results.size());
652 EXPECT_EQ(1, pendingFibers.size());
653 for (size_t i = 0; i < 2; ++i) {
654 EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
658 } else if (pendingFibers.size()) {
659 pendingFibers.back().setValue(0);
660 pendingFibers.pop_back();
662 loopController.stop();
666 loopController.loop(std::move(loopFunc));
669 TEST(FiberManager, collectNThrow) {
670 std::vector<Promise<int>> pendingFibers;
671 bool taskAdded = false;
673 FiberManager manager(std::make_unique<SimpleLoopController>());
674 auto& loopController =
675 dynamic_cast<SimpleLoopController&>(manager.loopController());
677 auto loopFunc = [&]() {
679 manager.addTask([&]() {
680 std::vector<std::function<int()>> funcs;
681 for (size_t i = 0; i < 3; ++i) {
682 funcs.push_back([&pendingFibers]() -> size_t {
683 await([&pendingFibers](Promise<int> promise) {
684 pendingFibers.push_back(std::move(promise));
686 throw std::runtime_error("Runtime");
691 collectN(funcs.begin(), funcs.end(), 2);
693 EXPECT_EQ(1, pendingFibers.size());
697 } else if (pendingFibers.size()) {
698 pendingFibers.back().setValue(0);
699 pendingFibers.pop_back();
701 loopController.stop();
705 loopController.loop(std::move(loopFunc));
708 TEST(FiberManager, collectNVoid) {
709 std::vector<Promise<int>> pendingFibers;
710 bool taskAdded = false;
712 FiberManager manager(std::make_unique<SimpleLoopController>());
713 auto& loopController =
714 dynamic_cast<SimpleLoopController&>(manager.loopController());
716 auto loopFunc = [&]() {
718 manager.addTask([&]() {
719 std::vector<std::function<void()>> funcs;
720 for (size_t i = 0; i < 3; ++i) {
721 funcs.push_back([&pendingFibers]() {
722 await([&pendingFibers](Promise<int> promise) {
723 pendingFibers.push_back(std::move(promise));
728 auto results = collectN(funcs.begin(), funcs.end(), 2);
729 EXPECT_EQ(2, results.size());
730 EXPECT_EQ(1, pendingFibers.size());
733 } else if (pendingFibers.size()) {
734 pendingFibers.back().setValue(0);
735 pendingFibers.pop_back();
737 loopController.stop();
741 loopController.loop(std::move(loopFunc));
744 TEST(FiberManager, collectNVoidThrow) {
745 std::vector<Promise<int>> pendingFibers;
746 bool taskAdded = false;
748 FiberManager manager(std::make_unique<SimpleLoopController>());
749 auto& loopController =
750 dynamic_cast<SimpleLoopController&>(manager.loopController());
752 auto loopFunc = [&]() {
754 manager.addTask([&]() {
755 std::vector<std::function<void()>> funcs;
756 for (size_t i = 0; i < 3; ++i) {
757 funcs.push_back([&pendingFibers]() {
758 await([&pendingFibers](Promise<int> promise) {
759 pendingFibers.push_back(std::move(promise));
761 throw std::runtime_error("Runtime");
766 collectN(funcs.begin(), funcs.end(), 2);
768 EXPECT_EQ(1, pendingFibers.size());
772 } else if (pendingFibers.size()) {
773 pendingFibers.back().setValue(0);
774 pendingFibers.pop_back();
776 loopController.stop();
780 loopController.loop(std::move(loopFunc));
783 TEST(FiberManager, collectAll) {
784 std::vector<Promise<int>> pendingFibers;
785 bool taskAdded = false;
787 FiberManager manager(std::make_unique<SimpleLoopController>());
788 auto& loopController =
789 dynamic_cast<SimpleLoopController&>(manager.loopController());
791 auto loopFunc = [&]() {
793 manager.addTask([&]() {
794 std::vector<std::function<int()>> funcs;
795 for (size_t i = 0; i < 3; ++i) {
796 funcs.push_back([i, &pendingFibers]() {
797 await([&pendingFibers](Promise<int> promise) {
798 pendingFibers.push_back(std::move(promise));
804 auto results = collectAll(funcs.begin(), funcs.end());
805 EXPECT_TRUE(pendingFibers.empty());
806 for (size_t i = 0; i < 3; ++i) {
807 EXPECT_EQ(i * 2 + 1, results[i]);
811 } else if (pendingFibers.size()) {
812 pendingFibers.back().setValue(0);
813 pendingFibers.pop_back();
815 loopController.stop();
819 loopController.loop(std::move(loopFunc));
822 TEST(FiberManager, collectAllVoid) {
823 std::vector<Promise<int>> pendingFibers;
824 bool taskAdded = false;
826 FiberManager manager(std::make_unique<SimpleLoopController>());
827 auto& loopController =
828 dynamic_cast<SimpleLoopController&>(manager.loopController());
830 auto loopFunc = [&]() {
832 manager.addTask([&]() {
833 std::vector<std::function<void()>> funcs;
834 for (size_t i = 0; i < 3; ++i) {
835 funcs.push_back([&pendingFibers]() {
836 await([&pendingFibers](Promise<int> promise) {
837 pendingFibers.push_back(std::move(promise));
842 collectAll(funcs.begin(), funcs.end());
843 EXPECT_TRUE(pendingFibers.empty());
846 } else if (pendingFibers.size()) {
847 pendingFibers.back().setValue(0);
848 pendingFibers.pop_back();
850 loopController.stop();
854 loopController.loop(std::move(loopFunc));
857 TEST(FiberManager, collectAny) {
858 std::vector<Promise<int>> pendingFibers;
859 bool taskAdded = false;
861 FiberManager manager(std::make_unique<SimpleLoopController>());
862 auto& loopController =
863 dynamic_cast<SimpleLoopController&>(manager.loopController());
865 auto loopFunc = [&]() {
867 manager.addTask([&]() {
868 std::vector<std::function<int()>> funcs;
869 for (size_t i = 0; i < 3; ++i) {
870 funcs.push_back([i, &pendingFibers]() {
871 await([&pendingFibers](Promise<int> promise) {
872 pendingFibers.push_back(std::move(promise));
875 throw std::runtime_error("This exception will be ignored");
881 auto result = collectAny(funcs.begin(), funcs.end());
882 EXPECT_EQ(2, pendingFibers.size());
883 EXPECT_EQ(2, result.first);
884 EXPECT_EQ(2 * 2 + 1, result.second);
887 } else if (pendingFibers.size()) {
888 pendingFibers.back().setValue(0);
889 pendingFibers.pop_back();
891 loopController.stop();
895 loopController.loop(std::move(loopFunc));
899 /* Checks that this function was run from a main context,
900 by comparing an address on a stack to a known main stack address
901 and a known related fiber stack address. The assumption
902 is that fiber stack and main stack will be far enough apart,
903 while any two values on the same stack will be close. */
904 void expectMainContext(bool& ran, int* mainLocation, int* fiberLocation) {
906 /* 2 pages is a good guess */
907 constexpr ssize_t DISTANCE = 0x2000 / sizeof(int);
909 EXPECT_TRUE(std::abs(&here - fiberLocation) > DISTANCE);
912 EXPECT_TRUE(std::abs(&here - mainLocation) < DISTANCE);
920 TEST(FiberManager, runInMainContext) {
921 FiberManager manager(std::make_unique<SimpleLoopController>());
922 auto& loopController =
923 dynamic_cast<SimpleLoopController&>(manager.loopController());
925 bool checkRan = false;
928 manager.runInMainContext(
929 [&]() { expectMainContext(checkRan, &mainLocation, nullptr); });
930 EXPECT_TRUE(checkRan);
934 manager.addTask([&]() {
936 explicit A(int value_) : value(value_) {}
937 A(const A&) = delete;
943 auto ret = runInMainContext([&]() {
944 expectMainContext(checkRan, &mainLocation, &stackLocation);
947 EXPECT_TRUE(checkRan);
948 EXPECT_EQ(42, ret.value);
951 loopController.loop([&]() { loopController.stop(); });
953 EXPECT_TRUE(checkRan);
956 TEST(FiberManager, addTaskFinally) {
957 FiberManager manager(std::make_unique<SimpleLoopController>());
958 auto& loopController =
959 dynamic_cast<SimpleLoopController&>(manager.loopController());
961 bool checkRan = false;
965 manager.addTaskFinally(
966 [&]() { return 1234; },
967 [&](Try<int>&& result) {
968 EXPECT_EQ(result.value(), 1234);
970 expectMainContext(checkRan, &mainLocation, nullptr);
973 EXPECT_FALSE(checkRan);
975 loopController.loop([&]() { loopController.stop(); });
977 EXPECT_TRUE(checkRan);
980 TEST(FiberManager, fibersPoolWithinLimit) {
981 FiberManager::Options opts;
982 opts.maxFibersPoolSize = 5;
984 FiberManager manager(std::make_unique<SimpleLoopController>(), opts);
985 auto& loopController =
986 dynamic_cast<SimpleLoopController&>(manager.loopController());
988 size_t fibersRun = 0;
990 for (size_t i = 0; i < 5; ++i) {
991 manager.addTask([&]() { ++fibersRun; });
993 loopController.loop([&]() { loopController.stop(); });
995 EXPECT_EQ(5, fibersRun);
996 EXPECT_EQ(5, manager.fibersAllocated());
997 EXPECT_EQ(5, manager.fibersPoolSize());
999 for (size_t i = 0; i < 5; ++i) {
1000 manager.addTask([&]() { ++fibersRun; });
1002 loopController.loop([&]() { loopController.stop(); });
1004 EXPECT_EQ(10, fibersRun);
1005 EXPECT_EQ(5, manager.fibersAllocated());
1006 EXPECT_EQ(5, manager.fibersPoolSize());
1009 TEST(FiberManager, fibersPoolOverLimit) {
1010 FiberManager::Options opts;
1011 opts.maxFibersPoolSize = 5;
1013 FiberManager manager(std::make_unique<SimpleLoopController>(), opts);
1014 auto& loopController =
1015 dynamic_cast<SimpleLoopController&>(manager.loopController());
1017 size_t fibersRun = 0;
1019 for (size_t i = 0; i < 10; ++i) {
1020 manager.addTask([&]() { ++fibersRun; });
1023 EXPECT_EQ(0, fibersRun);
1024 EXPECT_EQ(10, manager.fibersAllocated());
1025 EXPECT_EQ(0, manager.fibersPoolSize());
1027 loopController.loop([&]() { loopController.stop(); });
1029 EXPECT_EQ(10, fibersRun);
1030 EXPECT_EQ(5, manager.fibersAllocated());
1031 EXPECT_EQ(5, manager.fibersPoolSize());
1034 TEST(FiberManager, remoteFiberBasic) {
1035 FiberManager manager(std::make_unique<SimpleLoopController>());
1036 auto& loopController =
1037 dynamic_cast<SimpleLoopController&>(manager.loopController());
1040 result[0] = result[1] = 0;
1041 folly::Optional<Promise<int>> savedPromise[2];
1042 manager.addTask([&]() {
1044 [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1046 manager.addTask([&]() {
1048 [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1051 manager.loopUntilNoReady();
1053 EXPECT_TRUE(savedPromise[0].hasValue());
1054 EXPECT_TRUE(savedPromise[1].hasValue());
1055 EXPECT_EQ(0, result[0]);
1056 EXPECT_EQ(0, result[1]);
1058 std::thread remoteThread0{[&]() { savedPromise[0]->setValue(42); }};
1059 std::thread remoteThread1{[&]() { savedPromise[1]->setValue(43); }};
1060 remoteThread0.join();
1061 remoteThread1.join();
1062 EXPECT_EQ(0, result[0]);
1063 EXPECT_EQ(0, result[1]);
1064 /* Should only have scheduled once */
1065 EXPECT_EQ(1, loopController.remoteScheduleCalled());
1067 manager.loopUntilNoReady();
1068 EXPECT_EQ(42, result[0]);
1069 EXPECT_EQ(43, result[1]);
1072 TEST(FiberManager, addTaskRemoteBasic) {
1073 FiberManager manager(std::make_unique<SimpleLoopController>());
1076 result[0] = result[1] = 0;
1077 folly::Optional<Promise<int>> savedPromise[2];
1079 std::thread remoteThread0{[&]() {
1080 manager.addTaskRemote([&]() {
1082 [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1085 std::thread remoteThread1{[&]() {
1086 manager.addTaskRemote([&]() {
1088 [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1091 remoteThread0.join();
1092 remoteThread1.join();
1094 manager.loopUntilNoReady();
1096 EXPECT_TRUE(savedPromise[0].hasValue());
1097 EXPECT_TRUE(savedPromise[1].hasValue());
1098 EXPECT_EQ(0, result[0]);
1099 EXPECT_EQ(0, result[1]);
1101 savedPromise[0]->setValue(42);
1102 savedPromise[1]->setValue(43);
1104 EXPECT_EQ(0, result[0]);
1105 EXPECT_EQ(0, result[1]);
1107 manager.loopUntilNoReady();
1108 EXPECT_EQ(42, result[0]);
1109 EXPECT_EQ(43, result[1]);
1112 TEST(FiberManager, remoteHasTasks) {
1114 FiberManager fm(std::make_unique<SimpleLoopController>());
1115 std::thread remote([&]() { fm.addTaskRemote([&]() { ++counter; }); });
1119 while (fm.hasTasks()) {
1120 fm.loopUntilNoReady();
1123 EXPECT_FALSE(fm.hasTasks());
1124 EXPECT_EQ(counter, 1);
1127 TEST(FiberManager, remoteHasReadyTasks) {
1129 folly::Optional<Promise<int>> savedPromise;
1130 FiberManager fm(std::make_unique<SimpleLoopController>());
1131 std::thread remote([&]() {
1132 fm.addTaskRemote([&]() {
1134 [&](Promise<int> promise) { savedPromise = std::move(promise); });
1135 EXPECT_TRUE(fm.hasTasks());
1140 EXPECT_TRUE(fm.hasTasks());
1142 fm.loopUntilNoReady();
1143 EXPECT_TRUE(fm.hasTasks());
1145 std::thread remote2([&]() { savedPromise->setValue(47); });
1147 EXPECT_TRUE(fm.hasTasks());
1149 fm.loopUntilNoReady();
1150 EXPECT_FALSE(fm.hasTasks());
1152 EXPECT_EQ(result, 47);
1155 template <typename Data>
1156 void testFiberLocal() {
1157 FiberManager fm(LocalType<Data>(), std::make_unique<SimpleLoopController>());
1160 EXPECT_EQ(42, local<Data>().value);
1162 local<Data>().value = 43;
1165 EXPECT_EQ(43, local<Data>().value);
1167 local<Data>().value = 44;
1169 addTask([]() { EXPECT_EQ(44, local<Data>().value); });
1174 EXPECT_EQ(42, local<Data>().value);
1176 local<Data>().value = 43;
1178 fm.addTaskRemote([]() { EXPECT_EQ(43, local<Data>().value); });
1182 EXPECT_EQ(42, local<Data>().value);
1183 local<Data>().value = 43;
1186 EXPECT_EQ(43, local<Data>().value);
1187 local<Data>().value = 44;
1189 std::vector<std::function<void()>> tasks{task};
1190 collectAny(tasks.begin(), tasks.end());
1192 EXPECT_EQ(43, local<Data>().value);
1195 fm.loopUntilNoReady();
1196 EXPECT_FALSE(fm.hasTasks());
1199 TEST(FiberManager, fiberLocal) {
1204 testFiberLocal<SimpleData>();
1207 TEST(FiberManager, fiberLocalHeap) {
1209 char _[1024 * 1024];
1213 testFiberLocal<LargeData>();
1216 TEST(FiberManager, fiberLocalDestructor) {
1223 EXPECT_EQ(42, local<CrazyData>().data);
1224 // Make sure we don't have infinite loop
1225 local<CrazyData>().data = 0;
1232 LocalType<CrazyData>(), std::make_unique<SimpleLoopController>());
1234 fm.addTask([]() { local<CrazyData>().data = 41; });
1236 fm.loopUntilNoReady();
1237 EXPECT_FALSE(fm.hasTasks());
1240 TEST(FiberManager, yieldTest) {
1241 FiberManager manager(std::make_unique<SimpleLoopController>());
1242 auto& loopController =
1243 dynamic_cast<SimpleLoopController&>(manager.loopController());
1245 bool checkRan = false;
1247 manager.addTask([&]() {
1252 loopController.loop([&]() {
1254 loopController.stop();
1258 EXPECT_TRUE(checkRan);
1261 TEST(FiberManager, RequestContext) {
1262 FiberManager fm(std::make_unique<SimpleLoopController>());
1264 bool checkRun1 = false;
1265 bool checkRun2 = false;
1266 bool checkRun3 = false;
1267 bool checkRun4 = false;
1268 folly::fibers::Baton baton1;
1269 folly::fibers::Baton baton2;
1270 folly::fibers::Baton baton3;
1271 folly::fibers::Baton baton4;
1274 folly::RequestContextScopeGuard rctx;
1275 auto rcontext1 = folly::RequestContext::get();
1276 fm.addTask([&, rcontext1]() {
1277 EXPECT_EQ(rcontext1, folly::RequestContext::get());
1279 [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
1280 EXPECT_EQ(rcontext1, folly::RequestContext::get());
1282 [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
1287 folly::RequestContextScopeGuard rctx;
1288 auto rcontext2 = folly::RequestContext::get();
1289 fm.addTaskRemote([&, rcontext2]() {
1290 EXPECT_EQ(rcontext2, folly::RequestContext::get());
1292 EXPECT_EQ(rcontext2, folly::RequestContext::get());
1297 folly::RequestContextScopeGuard rctx;
1298 auto rcontext3 = folly::RequestContext::get();
1301 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1303 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1305 return folly::Unit();
1307 [&, rcontext3](Try<folly::Unit>&& /* t */) {
1308 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1313 folly::RequestContext::setContext(nullptr);
1315 folly::RequestContextScopeGuard rctx;
1316 auto rcontext4 = folly::RequestContext::get();
1318 EXPECT_EQ(rcontext4, folly::RequestContext::get());
1323 folly::RequestContextScopeGuard rctx;
1324 auto rcontext = folly::RequestContext::get();
1326 fm.loopUntilNoReady();
1327 EXPECT_EQ(rcontext, folly::RequestContext::get());
1330 EXPECT_EQ(rcontext, folly::RequestContext::get());
1331 fm.loopUntilNoReady();
1332 EXPECT_TRUE(checkRun1);
1333 EXPECT_EQ(rcontext, folly::RequestContext::get());
1336 EXPECT_EQ(rcontext, folly::RequestContext::get());
1337 fm.loopUntilNoReady();
1338 EXPECT_TRUE(checkRun2);
1339 EXPECT_EQ(rcontext, folly::RequestContext::get());
1342 EXPECT_EQ(rcontext, folly::RequestContext::get());
1343 fm.loopUntilNoReady();
1344 EXPECT_TRUE(checkRun3);
1345 EXPECT_EQ(rcontext, folly::RequestContext::get());
1348 EXPECT_EQ(rcontext, folly::RequestContext::get());
1349 fm.loopUntilNoReady();
1350 EXPECT_TRUE(checkRun4);
1351 EXPECT_EQ(rcontext, folly::RequestContext::get());
1355 TEST(FiberManager, resizePeriodically) {
1356 FiberManager::Options opts;
1357 opts.fibersPoolResizePeriodMs = 300;
1358 opts.maxFibersPoolSize = 5;
1360 FiberManager manager(std::make_unique<EventBaseLoopController>(), opts);
1362 folly::EventBase evb;
1363 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1364 .attachEventBase(evb);
1366 std::vector<Baton> batons(10);
1368 size_t tasksRun = 0;
1369 for (size_t i = 0; i < 30; ++i) {
1370 manager.addTask([i, &batons, &tasksRun]() {
1372 // Keep some fibers active indefinitely
1373 if (i < batons.size()) {
1379 EXPECT_EQ(0, tasksRun);
1380 EXPECT_EQ(30, manager.fibersAllocated());
1381 EXPECT_EQ(0, manager.fibersPoolSize());
1384 EXPECT_EQ(30, tasksRun);
1385 EXPECT_EQ(30, manager.fibersAllocated());
1386 // Can go over maxFibersPoolSize, 10 of 30 fibers still active
1387 EXPECT_EQ(20, manager.fibersPoolSize());
1389 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1390 evb.loopOnce(); // no fibers active in this period
1391 EXPECT_EQ(30, manager.fibersAllocated());
1392 EXPECT_EQ(20, manager.fibersPoolSize());
1394 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1395 evb.loopOnce(); // should shrink fibers pool to maxFibersPoolSize
1396 EXPECT_EQ(15, manager.fibersAllocated());
1397 EXPECT_EQ(5, manager.fibersPoolSize());
1399 for (size_t i = 0; i < batons.size(); ++i) {
1403 EXPECT_EQ(15, manager.fibersAllocated());
1404 EXPECT_EQ(15, manager.fibersPoolSize());
1406 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1407 evb.loopOnce(); // 10 fibers active in last period
1408 EXPECT_EQ(10, manager.fibersAllocated());
1409 EXPECT_EQ(10, manager.fibersPoolSize());
1411 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1413 EXPECT_EQ(5, manager.fibersAllocated());
1414 EXPECT_EQ(5, manager.fibersPoolSize());
1417 TEST(FiberManager, batonWaitTimeoutHandler) {
1418 FiberManager manager(std::make_unique<EventBaseLoopController>());
1420 folly::EventBase evb;
1421 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1422 .attachEventBase(evb);
1424 size_t fibersRun = 0;
1426 Baton::TimeoutHandler timeoutHandler;
1428 manager.addTask([&]() {
1429 baton.wait(timeoutHandler);
1432 manager.loopUntilNoReady();
1434 EXPECT_FALSE(baton.try_wait());
1435 EXPECT_EQ(0, fibersRun);
1437 timeoutHandler.scheduleTimeout(std::chrono::milliseconds(250));
1438 std::this_thread::sleep_for(std::chrono::milliseconds(500));
1440 EXPECT_FALSE(baton.try_wait());
1441 EXPECT_EQ(0, fibersRun);
1444 manager.loopUntilNoReady();
1446 EXPECT_EQ(1, fibersRun);
1449 TEST(FiberManager, batonWaitTimeoutMany) {
1450 FiberManager manager(std::make_unique<EventBaseLoopController>());
1452 folly::EventBase evb;
1453 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1454 .attachEventBase(evb);
1456 constexpr size_t kNumTimeoutTasks = 10000;
1457 size_t tasksCount = kNumTimeoutTasks;
1459 // We add many tasks to hit timeout queue deallocation logic.
1460 for (size_t i = 0; i < kNumTimeoutTasks; ++i) {
1461 manager.addTask([&]() {
1463 Baton::TimeoutHandler timeoutHandler;
1465 folly::fibers::addTask([&] {
1466 timeoutHandler.scheduleTimeout(std::chrono::milliseconds(1000));
1469 baton.wait(timeoutHandler);
1470 if (--tasksCount == 0) {
1471 evb.terminateLoopSoon();
1479 TEST(FiberManager, remoteFutureTest) {
1480 FiberManager fiberManager(std::make_unique<SimpleLoopController>());
1481 auto& loopController =
1482 dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
1486 auto f1 = fiberManager.addTaskFuture([&]() { return testValue1; });
1487 auto f2 = fiberManager.addTaskRemoteFuture([&]() { return testValue2; });
1488 loopController.loop([&]() { loopController.stop(); });
1492 EXPECT_EQ(v1, testValue1);
1493 EXPECT_EQ(v2, testValue2);
1496 // Test that a void function produes a Future<Unit>.
1497 TEST(FiberManager, remoteFutureVoidUnitTest) {
1498 FiberManager fiberManager(std::make_unique<SimpleLoopController>());
1499 auto& loopController =
1500 dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
1502 bool ranLocal = false;
1503 folly::Future<folly::Unit> futureLocal =
1504 fiberManager.addTaskFuture([&]() { ranLocal = true; });
1506 bool ranRemote = false;
1507 folly::Future<folly::Unit> futureRemote =
1508 fiberManager.addTaskRemoteFuture([&]() { ranRemote = true; });
1510 loopController.loop([&]() { loopController.stop(); });
1513 ASSERT_TRUE(ranLocal);
1515 futureRemote.wait();
1516 ASSERT_TRUE(ranRemote);
1519 TEST(FiberManager, nestedFiberManagers) {
1520 folly::EventBase outerEvb;
1521 folly::EventBase innerEvb;
1523 getFiberManager(outerEvb).addTask([&]() {
1525 &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
1527 runInMainContext([&]() {
1528 getFiberManager(innerEvb).addTask([&]() {
1530 &getFiberManager(innerEvb), FiberManager::getFiberManagerUnsafe());
1532 innerEvb.terminateLoopSoon();
1535 innerEvb.loopForever();
1539 &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
1541 outerEvb.terminateLoopSoon();
1544 outerEvb.loopForever();
1547 TEST(FiberManager, semaphore) {
1548 static constexpr size_t kTasks = 10;
1549 static constexpr size_t kIterations = 10000;
1550 static constexpr size_t kNumTokens = 10;
1552 Semaphore sem(kNumTokens);
1556 auto task = [&sem](int& counter, folly::fibers::Baton& baton) {
1557 FiberManager manager(std::make_unique<EventBaseLoopController>());
1558 folly::EventBase evb;
1559 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1560 .attachEventBase(evb);
1563 std::shared_ptr<folly::EventBase> completionCounter(
1564 &evb, [](folly::EventBase* evb) { evb->terminateLoopSoon(); });
1566 for (size_t i = 0; i < kTasks; ++i) {
1567 manager.addTask([&, completionCounter]() {
1568 for (size_t j = 0; j < kIterations; ++j) {
1574 EXPECT_LT(counter, kNumTokens);
1575 EXPECT_GE(counter, 0);
1585 folly::fibers::Baton batonA;
1586 folly::fibers::Baton batonB;
1587 std::thread threadA([&] { task(counterA, batonA); });
1588 std::thread threadB([&] { task(counterB, batonB); });
1595 EXPECT_LT(counterA, kNumTokens);
1596 EXPECT_LT(counterB, kNumTokens);
1597 EXPECT_GE(counterA, 0);
1598 EXPECT_GE(counterB, 0);
1601 template <typename ExecutorT>
1602 void singleBatchDispatch(ExecutorT& executor, int batchSize, int index) {
1603 thread_local BatchDispatcher<int, std::string, ExecutorT> batchDispatcher(
1604 executor, [=](std::vector<int>&& batch) {
1605 EXPECT_EQ(batchSize, batch.size());
1606 std::vector<std::string> results;
1607 for (auto& it : batch) {
1608 results.push_back(folly::to<std::string>(it));
1613 auto indexCopy = index;
1614 auto result = batchDispatcher.add(std::move(indexCopy));
1615 EXPECT_EQ(folly::to<std::string>(index), result.get());
1618 TEST(FiberManager, batchDispatchTest) {
1619 folly::EventBase evb;
1620 auto& executor = getFiberManager(evb);
1622 // Launch multiple fibers with a single id.
1623 executor.add([&]() {
1625 for (int i = 0; i < batchSize; i++) {
1627 [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
1632 // Reuse the same BatchDispatcher to batch once again.
1633 executor.add([&]() {
1635 for (int i = 0; i < batchSize; i++) {
1637 [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
1643 template <typename ExecutorT>
1644 folly::Future<std::vector<std::string>> doubleBatchInnerDispatch(
1645 ExecutorT& executor,
1646 int totalNumberOfElements,
1647 std::vector<int> input) {
1648 thread_local BatchDispatcher<
1650 std::vector<std::string>,
1652 batchDispatcher(executor, [=](std::vector<std::vector<int>>&& batch) {
1653 std::vector<std::vector<std::string>> results;
1654 int numberOfElements = 0;
1655 for (auto& unit : batch) {
1656 numberOfElements += unit.size();
1657 std::vector<std::string> result;
1658 for (auto& element : unit) {
1659 result.push_back(folly::to<std::string>(element));
1661 results.push_back(std::move(result));
1663 EXPECT_EQ(totalNumberOfElements, numberOfElements);
1667 return batchDispatcher.add(std::move(input));
1671 * Batch values in groups of 5, and then call inner dispatch.
1673 template <typename ExecutorT>
1674 void doubleBatchOuterDispatch(
1675 ExecutorT& executor,
1676 int totalNumberOfElements,
1678 thread_local BatchDispatcher<int, std::string, ExecutorT>
1679 batchDispatcher(executor, [=, &executor](std::vector<int>&& batch) {
1680 EXPECT_EQ(totalNumberOfElements, batch.size());
1681 std::vector<std::string> results;
1682 std::vector<folly::Future<std::vector<std::string>>>
1683 innerDispatchResultFutures;
1685 std::vector<int> group;
1686 for (auto unit : batch) {
1687 group.push_back(unit);
1688 if (group.size() == 5) {
1689 auto localGroup = group;
1692 innerDispatchResultFutures.push_back(doubleBatchInnerDispatch(
1693 executor, totalNumberOfElements, localGroup));
1698 innerDispatchResultFutures.begin(), innerDispatchResultFutures.end())
1700 std::vector<Try<std::vector<std::string>>> innerDispatchResults) {
1701 for (auto& unit : innerDispatchResults) {
1702 for (auto& element : unit.value()) {
1703 results.push_back(element);
1711 auto indexCopy = index;
1712 auto result = batchDispatcher.add(std::move(indexCopy));
1713 EXPECT_EQ(folly::to<std::string>(index), result.get());
1716 TEST(FiberManager, doubleBatchDispatchTest) {
1717 folly::EventBase evb;
1718 auto& executor = getFiberManager(evb);
1720 // Launch multiple fibers with a single id.
1721 executor.add([&]() {
1722 int totalNumberOfElements = 20;
1723 for (int i = 0; i < totalNumberOfElements; i++) {
1724 executor.add([=, &executor]() {
1725 doubleBatchOuterDispatch(executor, totalNumberOfElements, i);
1732 template <typename ExecutorT>
1733 void batchDispatchExceptionHandling(ExecutorT& executor, int i) {
1734 thread_local BatchDispatcher<int, int, ExecutorT> batchDispatcher(
1735 executor, [](std::vector<int> &&) -> std::vector<int> {
1736 throw std::runtime_error("Surprise!!");
1739 EXPECT_THROW(batchDispatcher.add(i).get(), std::runtime_error);
1742 TEST(FiberManager, batchDispatchExceptionHandlingTest) {
1743 folly::EventBase evb;
1744 auto& executor = getFiberManager(evb);
1746 // Launch multiple fibers with a single id.
1747 executor.add([&]() {
1748 int totalNumberOfElements = 5;
1749 for (int i = 0; i < totalNumberOfElements; i++) {
1751 [=, &executor]() { batchDispatchExceptionHandling(executor, i); });
1757 namespace AtomicBatchDispatcherTesting {
1759 using ValueT = size_t;
1760 using ResultT = std::string;
1761 using DispatchFunctionT =
1762 folly::Function<std::vector<ResultT>(std::vector<ValueT>&&)>;
1764 #define ENABLE_TRACE_IN_TEST 0 // Set to 1 to debug issues in ABD tests
1765 #if ENABLE_TRACE_IN_TEST
1766 #define OUTPUT_TRACE std::cerr
1767 #else // ENABLE_TRACE_IN_TEST
1768 struct DevNullPiper {
1769 template <typename T>
1770 DevNullPiper& operator<<(const T&) {
1774 DevNullPiper& operator<<(std::ostream& (*)(std::ostream&)) {
1778 #define OUTPUT_TRACE devNullPiper
1779 #endif // ENABLE_TRACE_IN_TEST
1782 AtomicBatchDispatcher<ValueT, ResultT>::Token token;
1785 void preprocess(FiberManager& executor, bool die) {
1786 // Yield for a random duration [0, 10] ms to simulate I/O in preprocessing
1787 clock_t msecToDoIO = folly::Random::rand32() % 10;
1788 double start = (1000.0 * clock()) / CLOCKS_PER_SEC;
1789 double endAfter = start + msecToDoIO;
1790 while ((1000.0 * clock()) / CLOCKS_PER_SEC < endAfter) {
1794 throw std::logic_error("Simulating preprocessing failure");
1798 Job(AtomicBatchDispatcher<ValueT, ResultT>::Token&& t, ValueT i)
1799 : token(std::move(t)), input(i) {}
1801 Job(Job&&) = default;
1802 Job& operator=(Job&&) = default;
1805 ResultT processSingleInput(ValueT&& input) {
1806 return folly::to<ResultT>(std::move(input));
1809 std::vector<ResultT> userDispatchFunc(std::vector<ValueT>&& inputs) {
1810 size_t expectedCount = inputs.size();
1811 std::vector<ResultT> results;
1812 results.reserve(expectedCount);
1813 for (size_t i = 0; i < expectedCount; ++i) {
1814 results.emplace_back(processSingleInput(std::move(inputs[i])));
1820 AtomicBatchDispatcher<ValueT, ResultT>& atomicBatchDispatcher,
1821 std::vector<Job>& jobs,
1824 for (size_t i = 0; i < count; ++i) {
1825 jobs.emplace_back(Job(atomicBatchDispatcher.getToken(), i));
1829 enum class DispatchProblem {
1836 FiberManager& executor,
1837 std::vector<Job>& jobs,
1838 std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1839 DispatchProblem dispatchProblem = DispatchProblem::None,
1840 size_t problemIndex = size_t(-1)) {
1842 dispatchProblem == DispatchProblem::None || problemIndex < jobs.size());
1844 results.resize(jobs.size());
1845 for (size_t i = 0; i < jobs.size(); ++i) {
1847 [i, &executor, &jobs, &results, dispatchProblem, problemIndex]() {
1849 Job job(std::move(jobs[i]));
1851 if (dispatchProblem == DispatchProblem::PreprocessThrows) {
1852 if (i == problemIndex) {
1853 EXPECT_THROW(job.preprocess(executor, true), std::logic_error);
1858 job.preprocess(executor, false);
1859 OUTPUT_TRACE << "Dispatching job #" << i << std::endl;
1860 results[i] = job.token.dispatch(job.input);
1861 OUTPUT_TRACE << "Result future filled for job #" << i << std::endl;
1863 if (dispatchProblem == DispatchProblem::DuplicateDispatch) {
1864 if (i == problemIndex) {
1865 EXPECT_THROW(job.token.dispatch(job.input), ABDUsageException);
1869 OUTPUT_TRACE << "Preprocessing failed for job #" << i << std::endl;
1875 void validateResult(
1876 std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1879 OUTPUT_TRACE << "results[" << i << "].value() : " << results[i]->value()
1881 } catch (std::exception& e) {
1882 OUTPUT_TRACE << "Exception : " << e.what() << std::endl;
1887 template <typename TException>
1888 void validateResults(
1889 std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1890 size_t expectedNumResults) {
1891 size_t numResultsFilled = 0;
1892 for (size_t i = 0; i < results.size(); ++i) {
1897 EXPECT_THROW(validateResult(results, i), TException);
1899 EXPECT_EQ(numResultsFilled, expectedNumResults);
1902 void validateResults(
1903 std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1904 size_t expectedNumResults) {
1905 size_t numResultsFilled = 0;
1906 for (size_t i = 0; i < results.size(); ++i) {
1911 EXPECT_NO_THROW(validateResult(results, i));
1912 ValueT expectedInput = i;
1914 results[i]->value(), processSingleInput(std::move(expectedInput)));
1916 EXPECT_EQ(numResultsFilled, expectedNumResults);
1919 } // AtomicBatchDispatcherTesting
1921 #define SET_UP_TEST_FUNC \
1922 using namespace AtomicBatchDispatcherTesting; \
1923 folly::EventBase evb; \
1924 auto& executor = getFiberManager(evb); \
1925 const size_t COUNT = 11; \
1926 std::vector<Job> jobs; \
1927 jobs.reserve(COUNT); \
1928 std::vector<folly::Optional<folly::Future<ResultT>>> results; \
1929 results.reserve(COUNT); \
1930 DispatchFunctionT dispatchFunc
1932 TEST(FiberManager, ABD_Test) {
1936 // Testing AtomicBatchDispatcher with explicit call to commit()
1938 dispatchFunc = userDispatchFunc;
1939 auto atomicBatchDispatcher =
1940 createAtomicBatchDispatcher(std::move(dispatchFunc));
1941 createJobs(atomicBatchDispatcher, jobs, COUNT);
1942 dispatchJobs(executor, jobs, results);
1943 atomicBatchDispatcher.commit();
1945 validateResults(results, COUNT);
1948 TEST(FiberManager, ABD_DispatcherDestroyedBeforeCallingCommit) {
1952 // Testing AtomicBatchDispatcher destroyed before calling commit.
1953 // Handles error cases for:
1954 // - User might have forgotten to add the call to commit() in the code
1955 // - An unexpected exception got thrown in user code before commit() is called
1958 dispatchFunc = userDispatchFunc;
1959 auto atomicBatchDispatcher =
1960 createAtomicBatchDispatcher(std::move(dispatchFunc));
1961 createJobs(atomicBatchDispatcher, jobs, COUNT);
1962 dispatchJobs(executor, jobs, results);
1963 throw std::runtime_error(
1964 "Unexpected exception in user code before commit called");
1965 // atomicBatchDispatcher.commit();
1967 /* User code handles the exception and does not exit process */
1970 validateResults<ABDCommitNotCalledException>(results, COUNT);
1973 TEST(FiberManager, ABD_PreprocessingFailureTest) {
1977 // Testing preprocessing failure on a job throws
1979 dispatchFunc = userDispatchFunc;
1980 auto atomicBatchDispatcher =
1981 createAtomicBatchDispatcher(std::move(dispatchFunc));
1982 createJobs(atomicBatchDispatcher, jobs, COUNT);
1983 dispatchJobs(executor, jobs, results, DispatchProblem::PreprocessThrows, 8);
1984 atomicBatchDispatcher.commit();
1986 validateResults<ABDTokenNotDispatchedException>(results, COUNT - 1);
1989 TEST(FiberManager, ABD_MultipleDispatchOnSameTokenErrorTest) {
1993 // Testing that calling dispatch more than once on the same token throws
1995 dispatchFunc = userDispatchFunc;
1996 auto atomicBatchDispatcher =
1997 createAtomicBatchDispatcher(std::move(dispatchFunc));
1998 createJobs(atomicBatchDispatcher, jobs, COUNT);
1999 dispatchJobs(executor, jobs, results, DispatchProblem::DuplicateDispatch, 4);
2000 atomicBatchDispatcher.commit();
2004 TEST(FiberManager, ABD_GetTokenCalledAfterCommitTest) {
2008 // Testing that exception set on attempt to call getToken after commit called
2010 dispatchFunc = userDispatchFunc;
2011 auto atomicBatchDispatcher =
2012 createAtomicBatchDispatcher(std::move(dispatchFunc));
2013 createJobs(atomicBatchDispatcher, jobs, COUNT);
2014 atomicBatchDispatcher.commit();
2015 EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
2016 dispatchJobs(executor, jobs, results);
2017 EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
2019 validateResults(results, COUNT);
2020 EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
2023 TEST(FiberManager, ABD_UserProvidedBatchDispatchThrowsTest) {
2027 // Testing that exception is set if user provided batch dispatch throws
2029 dispatchFunc = [](std::vector<ValueT>&& inputs) -> std::vector<ResultT> {
2030 (void)userDispatchFunc(std::move(inputs));
2031 throw std::runtime_error("Unexpected exception in user dispatch function");
2033 auto atomicBatchDispatcher =
2034 createAtomicBatchDispatcher(std::move(dispatchFunc));
2035 createJobs(atomicBatchDispatcher, jobs, COUNT);
2036 dispatchJobs(executor, jobs, results);
2037 atomicBatchDispatcher.commit();
2039 validateResults<std::runtime_error>(results, COUNT);
2042 TEST(FiberManager, VirtualEventBase) {
2046 folly::ScopedEventBaseThread thread;
2049 std::make_unique<folly::VirtualEventBase>(*thread.getEventBase());
2050 auto& evb2 = thread.getEventBase()->getVirtualEventBase();
2052 getFiberManager(*evb1).addTaskRemote([&] {
2054 baton.timed_wait(std::chrono::milliseconds{100});
2059 getFiberManager(evb2).addTaskRemote([&] {
2061 baton.timed_wait(std::chrono::milliseconds{200});
2066 EXPECT_FALSE(done1);
2067 EXPECT_FALSE(done2);
2071 EXPECT_FALSE(done2);
2076 TEST(TimedMutex, ThreadFiberDeadlockOrder) {
2077 folly::EventBase evb;
2078 auto& fm = getFiberManager(evb);
2082 std::thread unlockThread([&] {
2083 /* sleep override */ std::this_thread::sleep_for(
2084 std::chrono::milliseconds{100});
2088 fm.addTask([&] { std::lock_guard<TimedMutex> lg(mutex); });
2090 runInMainContext([&] {
2091 auto locked = mutex.timed_lock(std::chrono::seconds{1});
2092 EXPECT_TRUE(locked);
2100 EXPECT_EQ(0, fm.hasTasks());
2102 unlockThread.join();
2105 TEST(TimedMutex, ThreadFiberDeadlockRace) {
2106 folly::EventBase evb;
2107 auto& fm = getFiberManager(evb);
2113 auto locked = mutex.timed_lock(std::chrono::seconds{1});
2114 EXPECT_TRUE(locked);
2121 runInMainContext([&] {
2122 auto locked = mutex.timed_lock(std::chrono::seconds{1});
2123 EXPECT_TRUE(locked);
2131 EXPECT_EQ(0, fm.hasTasks());
2135 * Test that we can properly track fiber stack usage.
2137 * This functionality can only be enabled when ASAN is disabled, so avoid
2138 * running this test with ASAN.
2140 #ifndef FOLLY_SANITIZE_ADDRESS
2141 TEST(FiberManager, recordStack) {
2143 folly::fibers::FiberManager::Options opts;
2144 opts.recordStackEvery = 1;
2146 FiberManager fm(std::make_unique<SimpleLoopController>(), opts);
2147 auto& loopController =
2148 dynamic_cast<SimpleLoopController&>(fm.loopController());
2150 static constexpr size_t n = 1000;
2154 for (size_t i = 0; i < n; ++i) {
2157 for (size_t i = 0; i + 1 < n; ++i) {
2158 s += b[i] * b[i + 1];
2164 loopController.loop([&]() { loopController.stop(); });
2166 // Check that we properly accounted fiber stack usage.
2167 EXPECT_LT(n * sizeof(int), fm.stackHighWatermark());