2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 #include <folly/Memory.h>
21 #include <folly/Random.h>
22 #include <folly/futures/Future.h>
24 #include <folly/Conv.h>
25 #include <folly/fibers/AddTasks.h>
26 #include <folly/fibers/AtomicBatchDispatcher.h>
27 #include <folly/fibers/BatchDispatcher.h>
28 #include <folly/fibers/EventBaseLoopController.h>
29 #include <folly/fibers/FiberManager.h>
30 #include <folly/fibers/FiberManagerMap.h>
31 #include <folly/fibers/GenericBaton.h>
32 #include <folly/fibers/Semaphore.h>
33 #include <folly/fibers/SimpleLoopController.h>
34 #include <folly/fibers/TimedMutex.h>
35 #include <folly/fibers/WhenN.h>
36 #include <folly/io/async/ScopedEventBaseThread.h>
37 #include <folly/portability/GTest.h>
39 using namespace folly::fibers;
43 TEST(FiberManager, batonTimedWaitTimeout) {
44 bool taskAdded = false;
45 size_t iterations = 0;
47 FiberManager manager(std::make_unique<SimpleLoopController>());
48 auto& loopController =
49 dynamic_cast<SimpleLoopController&>(manager.loopController());
51 auto loopFunc = [&]() {
53 manager.addTask([&]() {
56 auto res = baton.timed_wait(std::chrono::milliseconds(230));
59 EXPECT_EQ(5, iterations);
61 loopController.stop();
63 manager.addTask([&]() {
66 auto res = baton.timed_wait(std::chrono::milliseconds(130));
69 EXPECT_EQ(3, iterations);
71 loopController.stop();
75 std::this_thread::sleep_for(std::chrono::milliseconds(50));
80 loopController.loop(std::move(loopFunc));
83 TEST(FiberManager, batonTimedWaitPost) {
84 bool taskAdded = false;
85 size_t iterations = 0;
88 FiberManager manager(std::make_unique<SimpleLoopController>());
89 auto& loopController =
90 dynamic_cast<SimpleLoopController&>(manager.loopController());
92 auto loopFunc = [&]() {
94 manager.addTask([&]() {
98 auto res = baton.timed_wait(std::chrono::milliseconds(130));
101 EXPECT_EQ(2, iterations);
103 loopController.stop();
107 std::this_thread::sleep_for(std::chrono::milliseconds(50));
109 if (iterations == 2) {
115 loopController.loop(std::move(loopFunc));
118 TEST(FiberManager, batonTimedWaitTimeoutEvb) {
119 size_t tasksComplete = 0;
121 folly::EventBase evb;
123 FiberManager manager(std::make_unique<EventBaseLoopController>());
124 dynamic_cast<EventBaseLoopController&>(manager.loopController())
125 .attachEventBase(evb);
127 auto task = [&](size_t timeout_ms) {
130 auto start = EventBaseLoopController::Clock::now();
131 auto res = baton.timed_wait(std::chrono::milliseconds(timeout_ms));
132 auto finish = EventBaseLoopController::Clock::now();
137 std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
139 EXPECT_GT(duration_ms.count(), timeout_ms - 50);
140 EXPECT_LT(duration_ms.count(), timeout_ms + 50);
142 if (++tasksComplete == 2) {
143 evb.terminateLoopSoon();
147 evb.runInEventBaseThread([&]() {
148 manager.addTask([&]() { task(500); });
149 manager.addTask([&]() { task(250); });
154 EXPECT_EQ(2, tasksComplete);
157 TEST(FiberManager, batonTimedWaitPostEvb) {
158 size_t tasksComplete = 0;
160 folly::EventBase evb;
162 FiberManager manager(std::make_unique<EventBaseLoopController>());
163 dynamic_cast<EventBaseLoopController&>(manager.loopController())
164 .attachEventBase(evb);
166 evb.runInEventBaseThread([&]() {
167 manager.addTask([&]() {
170 evb.tryRunAfterDelay([&]() { baton.post(); }, 100);
172 auto start = EventBaseLoopController::Clock::now();
173 auto res = baton.timed_wait(std::chrono::milliseconds(130));
174 auto finish = EventBaseLoopController::Clock::now();
179 std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
181 EXPECT_TRUE(duration_ms.count() > 95 && duration_ms.count() < 110);
183 if (++tasksComplete == 1) {
184 evb.terminateLoopSoon();
191 EXPECT_EQ(1, tasksComplete);
194 TEST(FiberManager, batonTryWait) {
195 FiberManager manager(std::make_unique<SimpleLoopController>());
197 // Check if try_wait and post work as expected
200 manager.addTask([&]() {
201 while (!b.try_wait()) {
204 auto thr = std::thread([&]() {
205 std::this_thread::sleep_for(std::chrono::milliseconds(300));
209 manager.loopUntilNoReady();
214 // Check try_wait without post
215 manager.addTask([&]() {
217 while (cnt && !c.try_wait()) {
220 EXPECT_TRUE(!c.try_wait()); // must still hold
224 manager.loopUntilNoReady();
227 TEST(FiberManager, genericBatonFiberWait) {
228 FiberManager manager(std::make_unique<SimpleLoopController>());
231 bool fiberRunning = false;
233 manager.addTask([&]() {
234 EXPECT_EQ(manager.hasActiveFiber(), true);
237 fiberRunning = false;
240 EXPECT_FALSE(fiberRunning);
241 manager.loopUntilNoReady();
242 EXPECT_TRUE(fiberRunning); // ensure fiber still active
244 auto thr = std::thread([&]() {
245 std::this_thread::sleep_for(std::chrono::milliseconds(300));
249 while (fiberRunning) {
250 manager.loopUntilNoReady();
256 TEST(FiberManager, genericBatonThreadWait) {
257 FiberManager manager(std::make_unique<SimpleLoopController>());
259 std::atomic<bool> threadWaiting(false);
261 auto thr = std::thread([&]() {
262 threadWaiting = true;
264 threadWaiting = false;
267 while (!threadWaiting) {
269 std::this_thread::sleep_for(std::chrono::milliseconds(300));
271 manager.addTask([&]() {
272 EXPECT_EQ(manager.hasActiveFiber(), true);
273 EXPECT_TRUE(threadWaiting);
275 while (threadWaiting) {
279 manager.loopUntilNoReady();
283 TEST(FiberManager, addTasksNoncopyable) {
284 std::vector<Promise<int>> pendingFibers;
285 bool taskAdded = false;
287 FiberManager manager(std::make_unique<SimpleLoopController>());
288 auto& loopController =
289 dynamic_cast<SimpleLoopController&>(manager.loopController());
291 auto loopFunc = [&]() {
293 manager.addTask([&]() {
294 std::vector<std::function<std::unique_ptr<int>()>> funcs;
295 for (int i = 0; i < 3; ++i) {
296 funcs.push_back([i, &pendingFibers]() {
297 await([&pendingFibers](Promise<int> promise) {
298 pendingFibers.push_back(std::move(promise));
300 return std::make_unique<int>(i * 2 + 1);
304 auto iter = addTasks(funcs.begin(), funcs.end());
307 while (iter.hasNext()) {
308 auto result = iter.awaitNext();
309 EXPECT_EQ(2 * iter.getTaskID() + 1, *result);
310 EXPECT_GE(2 - n, pendingFibers.size());
316 } else if (pendingFibers.size()) {
317 pendingFibers.back().setValue(0);
318 pendingFibers.pop_back();
320 loopController.stop();
324 loopController.loop(std::move(loopFunc));
327 TEST(FiberManager, awaitThrow) {
328 folly::EventBase evb;
329 struct ExpectedException {};
333 await([](Promise<int> p) {
335 throw ExpectedException();
341 await([&](Promise<int> p) {
342 evb.runInEventBaseThread([p = std::move(p)]() mutable {
345 throw ExpectedException();
352 TEST(FiberManager, addTasksThrow) {
353 std::vector<Promise<int>> pendingFibers;
354 bool taskAdded = false;
356 FiberManager manager(std::make_unique<SimpleLoopController>());
357 auto& loopController =
358 dynamic_cast<SimpleLoopController&>(manager.loopController());
360 auto loopFunc = [&]() {
362 manager.addTask([&]() {
363 std::vector<std::function<int()>> funcs;
364 for (size_t i = 0; i < 3; ++i) {
365 funcs.push_back([i, &pendingFibers]() {
366 await([&pendingFibers](Promise<int> promise) {
367 pendingFibers.push_back(std::move(promise));
370 throw std::runtime_error("Runtime");
376 auto iter = addTasks(funcs.begin(), funcs.end());
379 while (iter.hasNext()) {
381 int result = iter.awaitNext();
382 EXPECT_EQ(1, iter.getTaskID() % 2);
383 EXPECT_EQ(2 * iter.getTaskID() + 1, result);
385 EXPECT_EQ(0, iter.getTaskID() % 2);
387 EXPECT_GE(2 - n, pendingFibers.size());
393 } else if (pendingFibers.size()) {
394 pendingFibers.back().setValue(0);
395 pendingFibers.pop_back();
397 loopController.stop();
401 loopController.loop(std::move(loopFunc));
404 TEST(FiberManager, addTasksVoid) {
405 std::vector<Promise<int>> pendingFibers;
406 bool taskAdded = false;
408 FiberManager manager(std::make_unique<SimpleLoopController>());
409 auto& loopController =
410 dynamic_cast<SimpleLoopController&>(manager.loopController());
412 auto loopFunc = [&]() {
414 manager.addTask([&]() {
415 std::vector<std::function<void()>> funcs;
416 for (size_t i = 0; i < 3; ++i) {
417 funcs.push_back([i, &pendingFibers]() {
418 await([&pendingFibers](Promise<int> promise) {
419 pendingFibers.push_back(std::move(promise));
424 auto iter = addTasks(funcs.begin(), funcs.end());
427 while (iter.hasNext()) {
429 EXPECT_GE(2 - n, pendingFibers.size());
435 } else if (pendingFibers.size()) {
436 pendingFibers.back().setValue(0);
437 pendingFibers.pop_back();
439 loopController.stop();
443 loopController.loop(std::move(loopFunc));
446 TEST(FiberManager, addTasksVoidThrow) {
447 std::vector<Promise<int>> pendingFibers;
448 bool taskAdded = false;
450 FiberManager manager(std::make_unique<SimpleLoopController>());
451 auto& loopController =
452 dynamic_cast<SimpleLoopController&>(manager.loopController());
454 auto loopFunc = [&]() {
456 manager.addTask([&]() {
457 std::vector<std::function<void()>> funcs;
458 for (size_t i = 0; i < 3; ++i) {
459 funcs.push_back([i, &pendingFibers]() {
460 await([&pendingFibers](Promise<int> promise) {
461 pendingFibers.push_back(std::move(promise));
464 throw std::runtime_error("");
469 auto iter = addTasks(funcs.begin(), funcs.end());
472 while (iter.hasNext()) {
475 EXPECT_EQ(1, iter.getTaskID() % 2);
477 EXPECT_EQ(0, iter.getTaskID() % 2);
479 EXPECT_GE(2 - n, pendingFibers.size());
485 } else if (pendingFibers.size()) {
486 pendingFibers.back().setValue(0);
487 pendingFibers.pop_back();
489 loopController.stop();
493 loopController.loop(std::move(loopFunc));
496 TEST(FiberManager, addTasksReserve) {
497 std::vector<Promise<int>> pendingFibers;
498 bool taskAdded = false;
500 FiberManager manager(std::make_unique<SimpleLoopController>());
501 auto& loopController =
502 dynamic_cast<SimpleLoopController&>(manager.loopController());
504 auto loopFunc = [&]() {
506 manager.addTask([&]() {
507 std::vector<std::function<void()>> funcs;
508 for (size_t i = 0; i < 3; ++i) {
509 funcs.push_back([&pendingFibers]() {
510 await([&pendingFibers](Promise<int> promise) {
511 pendingFibers.push_back(std::move(promise));
516 auto iter = addTasks(funcs.begin(), funcs.end());
519 EXPECT_TRUE(iter.hasCompleted());
520 EXPECT_TRUE(iter.hasPending());
521 EXPECT_TRUE(iter.hasNext());
524 EXPECT_TRUE(iter.hasCompleted());
525 EXPECT_TRUE(iter.hasPending());
526 EXPECT_TRUE(iter.hasNext());
529 EXPECT_FALSE(iter.hasCompleted());
530 EXPECT_TRUE(iter.hasPending());
531 EXPECT_TRUE(iter.hasNext());
534 EXPECT_FALSE(iter.hasCompleted());
535 EXPECT_FALSE(iter.hasPending());
536 EXPECT_FALSE(iter.hasNext());
539 } else if (pendingFibers.size()) {
540 pendingFibers.back().setValue(0);
541 pendingFibers.pop_back();
543 loopController.stop();
547 loopController.loop(std::move(loopFunc));
550 TEST(FiberManager, addTaskDynamic) {
551 folly::EventBase evb;
555 auto makeTask = [&](size_t taskId) {
556 return [&, taskId]() -> size_t {
557 batons[taskId].wait();
563 .addTaskFuture([&]() {
564 TaskIterator<size_t> iterator;
566 iterator.addTask(makeTask(0));
567 iterator.addTask(makeTask(1));
571 EXPECT_EQ(1, iterator.awaitNext());
573 iterator.addTask(makeTask(2));
577 EXPECT_EQ(2, iterator.awaitNext());
581 EXPECT_EQ(0, iterator.awaitNext());
586 TEST(FiberManager, forEach) {
587 std::vector<Promise<int>> pendingFibers;
588 bool taskAdded = false;
590 FiberManager manager(std::make_unique<SimpleLoopController>());
591 auto& loopController =
592 dynamic_cast<SimpleLoopController&>(manager.loopController());
594 auto loopFunc = [&]() {
596 manager.addTask([&]() {
597 std::vector<std::function<int()>> funcs;
598 for (size_t i = 0; i < 3; ++i) {
599 funcs.push_back([i, &pendingFibers]() {
600 await([&pendingFibers](Promise<int> promise) {
601 pendingFibers.push_back(std::move(promise));
607 std::vector<std::pair<size_t, int>> results;
608 forEach(funcs.begin(), funcs.end(), [&results](size_t id, int result) {
609 results.emplace_back(id, result);
611 EXPECT_EQ(3, results.size());
612 EXPECT_TRUE(pendingFibers.empty());
613 for (size_t i = 0; i < 3; ++i) {
614 EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
618 } else if (pendingFibers.size()) {
619 pendingFibers.back().setValue(0);
620 pendingFibers.pop_back();
622 loopController.stop();
626 loopController.loop(std::move(loopFunc));
629 TEST(FiberManager, collectN) {
630 std::vector<Promise<int>> pendingFibers;
631 bool taskAdded = false;
633 FiberManager manager(std::make_unique<SimpleLoopController>());
634 auto& loopController =
635 dynamic_cast<SimpleLoopController&>(manager.loopController());
637 auto loopFunc = [&]() {
639 manager.addTask([&]() {
640 std::vector<std::function<int()>> funcs;
641 for (size_t i = 0; i < 3; ++i) {
642 funcs.push_back([i, &pendingFibers]() {
643 await([&pendingFibers](Promise<int> promise) {
644 pendingFibers.push_back(std::move(promise));
650 auto results = collectN(funcs.begin(), funcs.end(), 2);
651 EXPECT_EQ(2, results.size());
652 EXPECT_EQ(1, pendingFibers.size());
653 for (size_t i = 0; i < 2; ++i) {
654 EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
658 } else if (pendingFibers.size()) {
659 pendingFibers.back().setValue(0);
660 pendingFibers.pop_back();
662 loopController.stop();
666 loopController.loop(std::move(loopFunc));
669 TEST(FiberManager, collectNThrow) {
670 std::vector<Promise<int>> pendingFibers;
671 bool taskAdded = false;
673 FiberManager manager(std::make_unique<SimpleLoopController>());
674 auto& loopController =
675 dynamic_cast<SimpleLoopController&>(manager.loopController());
677 auto loopFunc = [&]() {
679 manager.addTask([&]() {
680 std::vector<std::function<int()>> funcs;
681 for (size_t i = 0; i < 3; ++i) {
682 funcs.push_back([i, &pendingFibers]() -> size_t {
683 await([&pendingFibers](Promise<int> promise) {
684 pendingFibers.push_back(std::move(promise));
686 throw std::runtime_error("Runtime");
691 collectN(funcs.begin(), funcs.end(), 2);
693 EXPECT_EQ(1, pendingFibers.size());
697 } else if (pendingFibers.size()) {
698 pendingFibers.back().setValue(0);
699 pendingFibers.pop_back();
701 loopController.stop();
705 loopController.loop(std::move(loopFunc));
708 TEST(FiberManager, collectNVoid) {
709 std::vector<Promise<int>> pendingFibers;
710 bool taskAdded = false;
712 FiberManager manager(std::make_unique<SimpleLoopController>());
713 auto& loopController =
714 dynamic_cast<SimpleLoopController&>(manager.loopController());
716 auto loopFunc = [&]() {
718 manager.addTask([&]() {
719 std::vector<std::function<void()>> funcs;
720 for (size_t i = 0; i < 3; ++i) {
721 funcs.push_back([i, &pendingFibers]() {
722 await([&pendingFibers](Promise<int> promise) {
723 pendingFibers.push_back(std::move(promise));
728 auto results = collectN(funcs.begin(), funcs.end(), 2);
729 EXPECT_EQ(2, results.size());
730 EXPECT_EQ(1, pendingFibers.size());
733 } else if (pendingFibers.size()) {
734 pendingFibers.back().setValue(0);
735 pendingFibers.pop_back();
737 loopController.stop();
741 loopController.loop(std::move(loopFunc));
744 TEST(FiberManager, collectNVoidThrow) {
745 std::vector<Promise<int>> pendingFibers;
746 bool taskAdded = false;
748 FiberManager manager(std::make_unique<SimpleLoopController>());
749 auto& loopController =
750 dynamic_cast<SimpleLoopController&>(manager.loopController());
752 auto loopFunc = [&]() {
754 manager.addTask([&]() {
755 std::vector<std::function<void()>> funcs;
756 for (size_t i = 0; i < 3; ++i) {
757 funcs.push_back([i, &pendingFibers]() {
758 await([&pendingFibers](Promise<int> promise) {
759 pendingFibers.push_back(std::move(promise));
761 throw std::runtime_error("Runtime");
766 collectN(funcs.begin(), funcs.end(), 2);
768 EXPECT_EQ(1, pendingFibers.size());
772 } else if (pendingFibers.size()) {
773 pendingFibers.back().setValue(0);
774 pendingFibers.pop_back();
776 loopController.stop();
780 loopController.loop(std::move(loopFunc));
783 TEST(FiberManager, collectAll) {
784 std::vector<Promise<int>> pendingFibers;
785 bool taskAdded = false;
787 FiberManager manager(std::make_unique<SimpleLoopController>());
788 auto& loopController =
789 dynamic_cast<SimpleLoopController&>(manager.loopController());
791 auto loopFunc = [&]() {
793 manager.addTask([&]() {
794 std::vector<std::function<int()>> funcs;
795 for (size_t i = 0; i < 3; ++i) {
796 funcs.push_back([i, &pendingFibers]() {
797 await([&pendingFibers](Promise<int> promise) {
798 pendingFibers.push_back(std::move(promise));
804 auto results = collectAll(funcs.begin(), funcs.end());
805 EXPECT_TRUE(pendingFibers.empty());
806 for (size_t i = 0; i < 3; ++i) {
807 EXPECT_EQ(i * 2 + 1, results[i]);
811 } else if (pendingFibers.size()) {
812 pendingFibers.back().setValue(0);
813 pendingFibers.pop_back();
815 loopController.stop();
819 loopController.loop(std::move(loopFunc));
822 TEST(FiberManager, collectAllVoid) {
823 std::vector<Promise<int>> pendingFibers;
824 bool taskAdded = false;
826 FiberManager manager(std::make_unique<SimpleLoopController>());
827 auto& loopController =
828 dynamic_cast<SimpleLoopController&>(manager.loopController());
830 auto loopFunc = [&]() {
832 manager.addTask([&]() {
833 std::vector<std::function<void()>> funcs;
834 for (size_t i = 0; i < 3; ++i) {
835 funcs.push_back([i, &pendingFibers]() {
836 await([&pendingFibers](Promise<int> promise) {
837 pendingFibers.push_back(std::move(promise));
842 collectAll(funcs.begin(), funcs.end());
843 EXPECT_TRUE(pendingFibers.empty());
846 } else if (pendingFibers.size()) {
847 pendingFibers.back().setValue(0);
848 pendingFibers.pop_back();
850 loopController.stop();
854 loopController.loop(std::move(loopFunc));
857 TEST(FiberManager, collectAny) {
858 std::vector<Promise<int>> pendingFibers;
859 bool taskAdded = false;
861 FiberManager manager(std::make_unique<SimpleLoopController>());
862 auto& loopController =
863 dynamic_cast<SimpleLoopController&>(manager.loopController());
865 auto loopFunc = [&]() {
867 manager.addTask([&]() {
868 std::vector<std::function<int()>> funcs;
869 for (size_t i = 0; i < 3; ++i) {
870 funcs.push_back([i, &pendingFibers]() {
871 await([&pendingFibers](Promise<int> promise) {
872 pendingFibers.push_back(std::move(promise));
875 throw std::runtime_error("This exception will be ignored");
881 auto result = collectAny(funcs.begin(), funcs.end());
882 EXPECT_EQ(2, pendingFibers.size());
883 EXPECT_EQ(2, result.first);
884 EXPECT_EQ(2 * 2 + 1, result.second);
887 } else if (pendingFibers.size()) {
888 pendingFibers.back().setValue(0);
889 pendingFibers.pop_back();
891 loopController.stop();
895 loopController.loop(std::move(loopFunc));
899 /* Checks that this function was run from a main context,
900 by comparing an address on a stack to a known main stack address
901 and a known related fiber stack address. The assumption
902 is that fiber stack and main stack will be far enough apart,
903 while any two values on the same stack will be close. */
904 void expectMainContext(bool& ran, int* mainLocation, int* fiberLocation) {
906 /* 2 pages is a good guess */
907 constexpr ssize_t DISTANCE = 0x2000 / sizeof(int);
909 EXPECT_TRUE(std::abs(&here - fiberLocation) > DISTANCE);
912 EXPECT_TRUE(std::abs(&here - mainLocation) < DISTANCE);
920 TEST(FiberManager, runInMainContext) {
921 FiberManager manager(std::make_unique<SimpleLoopController>());
922 auto& loopController =
923 dynamic_cast<SimpleLoopController&>(manager.loopController());
925 bool checkRan = false;
928 manager.runInMainContext(
929 [&]() { expectMainContext(checkRan, &mainLocation, nullptr); });
930 EXPECT_TRUE(checkRan);
934 manager.addTask([&]() {
936 explicit A(int value_) : value(value_) {}
937 A(const A&) = delete;
943 auto ret = runInMainContext([&]() {
944 expectMainContext(checkRan, &mainLocation, &stackLocation);
947 EXPECT_TRUE(checkRan);
948 EXPECT_EQ(42, ret.value);
951 loopController.loop([&]() { loopController.stop(); });
953 EXPECT_TRUE(checkRan);
956 TEST(FiberManager, addTaskFinally) {
957 FiberManager manager(std::make_unique<SimpleLoopController>());
958 auto& loopController =
959 dynamic_cast<SimpleLoopController&>(manager.loopController());
961 bool checkRan = false;
965 manager.addTaskFinally(
966 [&]() { return 1234; },
967 [&](Try<int>&& result) {
968 EXPECT_EQ(result.value(), 1234);
970 expectMainContext(checkRan, &mainLocation, nullptr);
973 EXPECT_FALSE(checkRan);
975 loopController.loop([&]() { loopController.stop(); });
977 EXPECT_TRUE(checkRan);
980 TEST(FiberManager, fibersPoolWithinLimit) {
981 FiberManager::Options opts;
982 opts.maxFibersPoolSize = 5;
984 FiberManager manager(std::make_unique<SimpleLoopController>(), opts);
985 auto& loopController =
986 dynamic_cast<SimpleLoopController&>(manager.loopController());
988 size_t fibersRun = 0;
990 for (size_t i = 0; i < 5; ++i) {
991 manager.addTask([&]() { ++fibersRun; });
993 loopController.loop([&]() { loopController.stop(); });
995 EXPECT_EQ(5, fibersRun);
996 EXPECT_EQ(5, manager.fibersAllocated());
997 EXPECT_EQ(5, manager.fibersPoolSize());
999 for (size_t i = 0; i < 5; ++i) {
1000 manager.addTask([&]() { ++fibersRun; });
1002 loopController.loop([&]() { loopController.stop(); });
1004 EXPECT_EQ(10, fibersRun);
1005 EXPECT_EQ(5, manager.fibersAllocated());
1006 EXPECT_EQ(5, manager.fibersPoolSize());
1009 TEST(FiberManager, fibersPoolOverLimit) {
1010 FiberManager::Options opts;
1011 opts.maxFibersPoolSize = 5;
1013 FiberManager manager(std::make_unique<SimpleLoopController>(), opts);
1014 auto& loopController =
1015 dynamic_cast<SimpleLoopController&>(manager.loopController());
1017 size_t fibersRun = 0;
1019 for (size_t i = 0; i < 10; ++i) {
1020 manager.addTask([&]() { ++fibersRun; });
1023 EXPECT_EQ(0, fibersRun);
1024 EXPECT_EQ(10, manager.fibersAllocated());
1025 EXPECT_EQ(0, manager.fibersPoolSize());
1027 loopController.loop([&]() { loopController.stop(); });
1029 EXPECT_EQ(10, fibersRun);
1030 EXPECT_EQ(5, manager.fibersAllocated());
1031 EXPECT_EQ(5, manager.fibersPoolSize());
1034 TEST(FiberManager, remoteFiberBasic) {
1035 FiberManager manager(std::make_unique<SimpleLoopController>());
1036 auto& loopController =
1037 dynamic_cast<SimpleLoopController&>(manager.loopController());
1040 result[0] = result[1] = 0;
1041 folly::Optional<Promise<int>> savedPromise[2];
1042 manager.addTask([&]() {
1044 [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1046 manager.addTask([&]() {
1048 [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1051 manager.loopUntilNoReady();
1053 EXPECT_TRUE(savedPromise[0].hasValue());
1054 EXPECT_TRUE(savedPromise[1].hasValue());
1055 EXPECT_EQ(0, result[0]);
1056 EXPECT_EQ(0, result[1]);
1058 std::thread remoteThread0{[&]() { savedPromise[0]->setValue(42); }};
1059 std::thread remoteThread1{[&]() { savedPromise[1]->setValue(43); }};
1060 remoteThread0.join();
1061 remoteThread1.join();
1062 EXPECT_EQ(0, result[0]);
1063 EXPECT_EQ(0, result[1]);
1064 /* Should only have scheduled once */
1065 EXPECT_EQ(1, loopController.remoteScheduleCalled());
1067 manager.loopUntilNoReady();
1068 EXPECT_EQ(42, result[0]);
1069 EXPECT_EQ(43, result[1]);
1072 TEST(FiberManager, addTaskRemoteBasic) {
1073 FiberManager manager(std::make_unique<SimpleLoopController>());
1076 result[0] = result[1] = 0;
1077 folly::Optional<Promise<int>> savedPromise[2];
1079 std::thread remoteThread0{[&]() {
1080 manager.addTaskRemote([&]() {
1082 [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1085 std::thread remoteThread1{[&]() {
1086 manager.addTaskRemote([&]() {
1088 [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1091 remoteThread0.join();
1092 remoteThread1.join();
1094 manager.loopUntilNoReady();
1096 EXPECT_TRUE(savedPromise[0].hasValue());
1097 EXPECT_TRUE(savedPromise[1].hasValue());
1098 EXPECT_EQ(0, result[0]);
1099 EXPECT_EQ(0, result[1]);
1101 savedPromise[0]->setValue(42);
1102 savedPromise[1]->setValue(43);
1104 EXPECT_EQ(0, result[0]);
1105 EXPECT_EQ(0, result[1]);
1107 manager.loopUntilNoReady();
1108 EXPECT_EQ(42, result[0]);
1109 EXPECT_EQ(43, result[1]);
1112 TEST(FiberManager, remoteHasTasks) {
1114 FiberManager fm(std::make_unique<SimpleLoopController>());
1115 std::thread remote([&]() { fm.addTaskRemote([&]() { ++counter; }); });
1119 while (fm.hasTasks()) {
1120 fm.loopUntilNoReady();
1123 EXPECT_FALSE(fm.hasTasks());
1124 EXPECT_EQ(counter, 1);
1127 TEST(FiberManager, remoteHasReadyTasks) {
1129 folly::Optional<Promise<int>> savedPromise;
1130 FiberManager fm(std::make_unique<SimpleLoopController>());
1131 std::thread remote([&]() {
1132 fm.addTaskRemote([&]() {
1134 [&](Promise<int> promise) { savedPromise = std::move(promise); });
1135 EXPECT_TRUE(fm.hasTasks());
1140 EXPECT_TRUE(fm.hasTasks());
1142 fm.loopUntilNoReady();
1143 EXPECT_TRUE(fm.hasTasks());
1145 std::thread remote2([&]() { savedPromise->setValue(47); });
1147 EXPECT_TRUE(fm.hasTasks());
1149 fm.loopUntilNoReady();
1150 EXPECT_FALSE(fm.hasTasks());
1152 EXPECT_EQ(result, 47);
1155 template <typename Data>
1156 void testFiberLocal() {
1157 FiberManager fm(LocalType<Data>(), std::make_unique<SimpleLoopController>());
1160 EXPECT_EQ(42, local<Data>().value);
1162 local<Data>().value = 43;
1165 EXPECT_EQ(43, local<Data>().value);
1167 local<Data>().value = 44;
1169 addTask([]() { EXPECT_EQ(44, local<Data>().value); });
1174 EXPECT_EQ(42, local<Data>().value);
1176 local<Data>().value = 43;
1178 fm.addTaskRemote([]() { EXPECT_EQ(43, local<Data>().value); });
1182 EXPECT_EQ(42, local<Data>().value);
1183 local<Data>().value = 43;
1186 EXPECT_EQ(43, local<Data>().value);
1187 local<Data>().value = 44;
1189 std::vector<std::function<void()>> tasks{task};
1190 collectAny(tasks.begin(), tasks.end());
1192 EXPECT_EQ(43, local<Data>().value);
1195 fm.loopUntilNoReady();
1196 EXPECT_FALSE(fm.hasTasks());
1199 TEST(FiberManager, fiberLocal) {
1204 testFiberLocal<SimpleData>();
1207 TEST(FiberManager, fiberLocalHeap) {
1209 char _[1024 * 1024];
1213 testFiberLocal<LargeData>();
1216 TEST(FiberManager, fiberLocalDestructor) {
1223 EXPECT_EQ(42, local<CrazyData>().data);
1224 // Make sure we don't have infinite loop
1225 local<CrazyData>().data = 0;
1232 LocalType<CrazyData>(), std::make_unique<SimpleLoopController>());
1234 fm.addTask([]() { local<CrazyData>().data = 41; });
1236 fm.loopUntilNoReady();
1237 EXPECT_FALSE(fm.hasTasks());
1240 TEST(FiberManager, yieldTest) {
1241 FiberManager manager(std::make_unique<SimpleLoopController>());
1242 auto& loopController =
1243 dynamic_cast<SimpleLoopController&>(manager.loopController());
1245 bool checkRan = false;
1247 manager.addTask([&]() {
1252 loopController.loop([&]() {
1254 loopController.stop();
1258 EXPECT_TRUE(checkRan);
1261 TEST(FiberManager, RequestContext) {
1262 FiberManager fm(std::make_unique<SimpleLoopController>());
1264 bool checkRun1 = false;
1265 bool checkRun2 = false;
1266 bool checkRun3 = false;
1267 bool checkRun4 = false;
1268 folly::fibers::Baton baton1;
1269 folly::fibers::Baton baton2;
1270 folly::fibers::Baton baton3;
1271 folly::fibers::Baton baton4;
1274 folly::RequestContextScopeGuard rctx;
1275 auto rcontext1 = folly::RequestContext::get();
1276 fm.addTask([&, rcontext1]() {
1277 EXPECT_EQ(rcontext1, folly::RequestContext::get());
1279 [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
1280 EXPECT_EQ(rcontext1, folly::RequestContext::get());
1282 [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
1287 folly::RequestContextScopeGuard rctx;
1288 auto rcontext2 = folly::RequestContext::get();
1289 fm.addTaskRemote([&, rcontext2]() {
1290 EXPECT_EQ(rcontext2, folly::RequestContext::get());
1292 EXPECT_EQ(rcontext2, folly::RequestContext::get());
1297 folly::RequestContextScopeGuard rctx;
1298 auto rcontext3 = folly::RequestContext::get();
1301 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1303 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1305 return folly::Unit();
1307 [&, rcontext3](Try<folly::Unit>&& /* t */) {
1308 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1313 folly::RequestContext::setContext(nullptr);
1315 folly::RequestContextScopeGuard rctx;
1316 auto rcontext4 = folly::RequestContext::get();
1318 EXPECT_EQ(rcontext4, folly::RequestContext::get());
1323 folly::RequestContextScopeGuard rctx;
1324 auto rcontext = folly::RequestContext::get();
1326 fm.loopUntilNoReady();
1327 EXPECT_EQ(rcontext, folly::RequestContext::get());
1330 EXPECT_EQ(rcontext, folly::RequestContext::get());
1331 fm.loopUntilNoReady();
1332 EXPECT_TRUE(checkRun1);
1333 EXPECT_EQ(rcontext, folly::RequestContext::get());
1336 EXPECT_EQ(rcontext, folly::RequestContext::get());
1337 fm.loopUntilNoReady();
1338 EXPECT_TRUE(checkRun2);
1339 EXPECT_EQ(rcontext, folly::RequestContext::get());
1342 EXPECT_EQ(rcontext, folly::RequestContext::get());
1343 fm.loopUntilNoReady();
1344 EXPECT_TRUE(checkRun3);
1345 EXPECT_EQ(rcontext, folly::RequestContext::get());
1348 EXPECT_EQ(rcontext, folly::RequestContext::get());
1349 fm.loopUntilNoReady();
1350 EXPECT_TRUE(checkRun4);
1351 EXPECT_EQ(rcontext, folly::RequestContext::get());
1355 TEST(FiberManager, resizePeriodically) {
1356 FiberManager::Options opts;
1357 opts.fibersPoolResizePeriodMs = 300;
1358 opts.maxFibersPoolSize = 5;
1360 FiberManager manager(std::make_unique<EventBaseLoopController>(), opts);
1362 folly::EventBase evb;
1363 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1364 .attachEventBase(evb);
1366 std::vector<Baton> batons(10);
1368 size_t tasksRun = 0;
1369 for (size_t i = 0; i < 30; ++i) {
1370 manager.addTask([i, &batons, &tasksRun]() {
1372 // Keep some fibers active indefinitely
1373 if (i < batons.size()) {
1379 EXPECT_EQ(0, tasksRun);
1380 EXPECT_EQ(30, manager.fibersAllocated());
1381 EXPECT_EQ(0, manager.fibersPoolSize());
1384 EXPECT_EQ(30, tasksRun);
1385 EXPECT_EQ(30, manager.fibersAllocated());
1386 // Can go over maxFibersPoolSize, 10 of 30 fibers still active
1387 EXPECT_EQ(20, manager.fibersPoolSize());
1389 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1390 evb.loopOnce(); // no fibers active in this period
1391 EXPECT_EQ(30, manager.fibersAllocated());
1392 EXPECT_EQ(20, manager.fibersPoolSize());
1394 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1395 evb.loopOnce(); // should shrink fibers pool to maxFibersPoolSize
1396 EXPECT_EQ(15, manager.fibersAllocated());
1397 EXPECT_EQ(5, manager.fibersPoolSize());
1399 for (size_t i = 0; i < batons.size(); ++i) {
1403 EXPECT_EQ(15, manager.fibersAllocated());
1404 EXPECT_EQ(15, manager.fibersPoolSize());
1406 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1407 evb.loopOnce(); // 10 fibers active in last period
1408 EXPECT_EQ(10, manager.fibersAllocated());
1409 EXPECT_EQ(10, manager.fibersPoolSize());
1411 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1413 EXPECT_EQ(5, manager.fibersAllocated());
1414 EXPECT_EQ(5, manager.fibersPoolSize());
1417 TEST(FiberManager, batonWaitTimeoutHandler) {
1418 FiberManager manager(std::make_unique<EventBaseLoopController>());
1420 folly::EventBase evb;
1421 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1422 .attachEventBase(evb);
1424 size_t fibersRun = 0;
1426 Baton::TimeoutHandler timeoutHandler;
1428 manager.addTask([&]() {
1429 baton.wait(timeoutHandler);
1432 manager.loopUntilNoReady();
1434 EXPECT_FALSE(baton.try_wait());
1435 EXPECT_EQ(0, fibersRun);
1437 timeoutHandler.scheduleTimeout(std::chrono::milliseconds(250));
1438 std::this_thread::sleep_for(std::chrono::milliseconds(500));
1440 EXPECT_FALSE(baton.try_wait());
1441 EXPECT_EQ(0, fibersRun);
1444 manager.loopUntilNoReady();
1446 EXPECT_EQ(1, fibersRun);
1449 TEST(FiberManager, batonWaitTimeoutMany) {
1450 FiberManager manager(std::make_unique<EventBaseLoopController>());
1452 folly::EventBase evb;
1453 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1454 .attachEventBase(evb);
1456 constexpr size_t kNumTimeoutTasks = 10000;
1457 size_t tasksCount = kNumTimeoutTasks;
1459 // We add many tasks to hit timeout queue deallocation logic.
1460 for (size_t i = 0; i < kNumTimeoutTasks; ++i) {
1461 manager.addTask([&]() {
1463 Baton::TimeoutHandler timeoutHandler;
1465 folly::fibers::addTask([&] {
1466 timeoutHandler.scheduleTimeout(std::chrono::milliseconds(1000));
1469 baton.wait(timeoutHandler);
1470 if (--tasksCount == 0) {
1471 evb.terminateLoopSoon();
1479 TEST(FiberManager, remoteFutureTest) {
1480 FiberManager fiberManager(std::make_unique<SimpleLoopController>());
1481 auto& loopController =
1482 dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
1486 auto f1 = fiberManager.addTaskFuture([&]() { return testValue1; });
1487 auto f2 = fiberManager.addTaskRemoteFuture([&]() { return testValue2; });
1488 loopController.loop([&]() { loopController.stop(); });
1492 EXPECT_EQ(v1, testValue1);
1493 EXPECT_EQ(v2, testValue2);
1496 // Test that a void function produes a Future<Unit>.
1497 TEST(FiberManager, remoteFutureVoidUnitTest) {
1498 FiberManager fiberManager(std::make_unique<SimpleLoopController>());
1499 auto& loopController =
1500 dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
1502 bool ranLocal = false;
1503 folly::Future<folly::Unit> futureLocal =
1504 fiberManager.addTaskFuture([&]() { ranLocal = true; });
1506 bool ranRemote = false;
1507 folly::Future<folly::Unit> futureRemote =
1508 fiberManager.addTaskRemoteFuture([&]() { ranRemote = true; });
1510 loopController.loop([&]() { loopController.stop(); });
1513 ASSERT_TRUE(ranLocal);
1515 futureRemote.wait();
1516 ASSERT_TRUE(ranRemote);
1519 TEST(FiberManager, nestedFiberManagers) {
1520 folly::EventBase outerEvb;
1521 folly::EventBase innerEvb;
1523 getFiberManager(outerEvb).addTask([&]() {
1525 &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
1527 runInMainContext([&]() {
1528 getFiberManager(innerEvb).addTask([&]() {
1530 &getFiberManager(innerEvb), FiberManager::getFiberManagerUnsafe());
1532 innerEvb.terminateLoopSoon();
1535 innerEvb.loopForever();
1539 &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
1541 outerEvb.terminateLoopSoon();
1544 outerEvb.loopForever();
1547 TEST(FiberManager, semaphore) {
1548 constexpr size_t kTasks = 10;
1549 constexpr size_t kIterations = 10000;
1550 constexpr size_t kNumTokens = 10;
1552 Semaphore sem(kNumTokens);
1556 auto task = [&sem, kTasks, kIterations, kNumTokens](
1557 int& counter, folly::fibers::Baton& baton) {
1558 FiberManager manager(std::make_unique<EventBaseLoopController>());
1559 folly::EventBase evb;
1560 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1561 .attachEventBase(evb);
1564 std::shared_ptr<folly::EventBase> completionCounter(
1565 &evb, [](folly::EventBase* evb) { evb->terminateLoopSoon(); });
1567 for (size_t i = 0; i < kTasks; ++i) {
1568 manager.addTask([&, completionCounter]() {
1569 for (size_t j = 0; j < kIterations; ++j) {
1575 EXPECT_LT(counter, kNumTokens);
1576 EXPECT_GE(counter, 0);
1586 folly::fibers::Baton batonA;
1587 folly::fibers::Baton batonB;
1588 std::thread threadA([&] { task(counterA, batonA); });
1589 std::thread threadB([&] { task(counterB, batonB); });
1596 EXPECT_LT(counterA, kNumTokens);
1597 EXPECT_LT(counterB, kNumTokens);
1598 EXPECT_GE(counterA, 0);
1599 EXPECT_GE(counterB, 0);
1602 template <typename ExecutorT>
1603 void singleBatchDispatch(ExecutorT& executor, int batchSize, int index) {
1604 thread_local BatchDispatcher<int, std::string, ExecutorT> batchDispatcher(
1605 executor, [=](std::vector<int>&& batch) {
1606 EXPECT_EQ(batchSize, batch.size());
1607 std::vector<std::string> results;
1608 for (auto& it : batch) {
1609 results.push_back(folly::to<std::string>(it));
1614 auto indexCopy = index;
1615 auto result = batchDispatcher.add(std::move(indexCopy));
1616 EXPECT_EQ(folly::to<std::string>(index), result.get());
1619 TEST(FiberManager, batchDispatchTest) {
1620 folly::EventBase evb;
1621 auto& executor = getFiberManager(evb);
1623 // Launch multiple fibers with a single id.
1624 executor.add([&]() {
1626 for (int i = 0; i < batchSize; i++) {
1628 [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
1633 // Reuse the same BatchDispatcher to batch once again.
1634 executor.add([&]() {
1636 for (int i = 0; i < batchSize; i++) {
1638 [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
1644 template <typename ExecutorT>
1645 folly::Future<std::vector<std::string>> doubleBatchInnerDispatch(
1646 ExecutorT& executor,
1647 int totalNumberOfElements,
1648 std::vector<int> input) {
1649 thread_local BatchDispatcher<
1651 std::vector<std::string>,
1653 batchDispatcher(executor, [=](std::vector<std::vector<int>>&& batch) {
1654 std::vector<std::vector<std::string>> results;
1655 int numberOfElements = 0;
1656 for (auto& unit : batch) {
1657 numberOfElements += unit.size();
1658 std::vector<std::string> result;
1659 for (auto& element : unit) {
1660 result.push_back(folly::to<std::string>(element));
1662 results.push_back(std::move(result));
1664 EXPECT_EQ(totalNumberOfElements, numberOfElements);
1668 return batchDispatcher.add(std::move(input));
1672 * Batch values in groups of 5, and then call inner dispatch.
1674 template <typename ExecutorT>
1675 void doubleBatchOuterDispatch(
1676 ExecutorT& executor,
1677 int totalNumberOfElements,
1679 thread_local BatchDispatcher<int, std::string, ExecutorT>
1680 batchDispatcher(executor, [=, &executor](std::vector<int>&& batch) {
1681 EXPECT_EQ(totalNumberOfElements, batch.size());
1682 std::vector<std::string> results;
1683 std::vector<folly::Future<std::vector<std::string>>>
1684 innerDispatchResultFutures;
1686 std::vector<int> group;
1687 for (auto unit : batch) {
1688 group.push_back(unit);
1689 if (group.size() == 5) {
1690 auto localGroup = group;
1693 innerDispatchResultFutures.push_back(doubleBatchInnerDispatch(
1694 executor, totalNumberOfElements, localGroup));
1699 innerDispatchResultFutures.begin(), innerDispatchResultFutures.end())
1701 std::vector<Try<std::vector<std::string>>> innerDispatchResults) {
1702 for (auto& unit : innerDispatchResults) {
1703 for (auto& element : unit.value()) {
1704 results.push_back(element);
1712 auto indexCopy = index;
1713 auto result = batchDispatcher.add(std::move(indexCopy));
1714 EXPECT_EQ(folly::to<std::string>(index), result.get());
1717 TEST(FiberManager, doubleBatchDispatchTest) {
1718 folly::EventBase evb;
1719 auto& executor = getFiberManager(evb);
1721 // Launch multiple fibers with a single id.
1722 executor.add([&]() {
1723 int totalNumberOfElements = 20;
1724 for (int i = 0; i < totalNumberOfElements; i++) {
1725 executor.add([=, &executor]() {
1726 doubleBatchOuterDispatch(executor, totalNumberOfElements, i);
1733 template <typename ExecutorT>
1734 void batchDispatchExceptionHandling(ExecutorT& executor, int i) {
1735 thread_local BatchDispatcher<int, int, ExecutorT> batchDispatcher(
1736 executor, [=, &executor](std::vector<int> &&) -> std::vector<int> {
1737 throw std::runtime_error("Surprise!!");
1740 EXPECT_THROW(batchDispatcher.add(i).get(), std::runtime_error);
1743 TEST(FiberManager, batchDispatchExceptionHandlingTest) {
1744 folly::EventBase evb;
1745 auto& executor = getFiberManager(evb);
1747 // Launch multiple fibers with a single id.
1748 executor.add([&]() {
1749 int totalNumberOfElements = 5;
1750 for (int i = 0; i < totalNumberOfElements; i++) {
1752 [=, &executor]() { batchDispatchExceptionHandling(executor, i); });
1758 namespace AtomicBatchDispatcherTesting {
1760 using ValueT = size_t;
1761 using ResultT = std::string;
1762 using DispatchFunctionT =
1763 folly::Function<std::vector<ResultT>(std::vector<ValueT>&&)>;
1765 #define ENABLE_TRACE_IN_TEST 0 // Set to 1 to debug issues in ABD tests
1766 #if ENABLE_TRACE_IN_TEST
1767 #define OUTPUT_TRACE std::cerr
1768 #else // ENABLE_TRACE_IN_TEST
1769 struct DevNullPiper {
1770 template <typename T>
1771 DevNullPiper& operator<<(const T&) {
1775 DevNullPiper& operator<<(std::ostream& (*)(std::ostream&)) {
1779 #define OUTPUT_TRACE devNullPiper
1780 #endif // ENABLE_TRACE_IN_TEST
1783 AtomicBatchDispatcher<ValueT, ResultT>::Token token;
1786 void preprocess(FiberManager& executor, bool die) {
1787 // Yield for a random duration [0, 10] ms to simulate I/O in preprocessing
1788 clock_t msecToDoIO = folly::Random::rand32() % 10;
1789 double start = (1000.0 * clock()) / CLOCKS_PER_SEC;
1790 double endAfter = start + msecToDoIO;
1791 while ((1000.0 * clock()) / CLOCKS_PER_SEC < endAfter) {
1795 throw std::logic_error("Simulating preprocessing failure");
1799 Job(AtomicBatchDispatcher<ValueT, ResultT>::Token&& t, ValueT i)
1800 : token(std::move(t)), input(i) {}
1802 Job(Job&&) = default;
1803 Job& operator=(Job&&) = default;
1806 ResultT processSingleInput(ValueT&& input) {
1807 return folly::to<ResultT>(std::move(input));
1810 std::vector<ResultT> userDispatchFunc(std::vector<ValueT>&& inputs) {
1811 size_t expectedCount = inputs.size();
1812 std::vector<ResultT> results;
1813 results.reserve(expectedCount);
1814 for (size_t i = 0; i < expectedCount; ++i) {
1815 results.emplace_back(processSingleInput(std::move(inputs[i])));
1821 AtomicBatchDispatcher<ValueT, ResultT>& atomicBatchDispatcher,
1822 std::vector<Job>& jobs,
1825 for (size_t i = 0; i < count; ++i) {
1826 jobs.emplace_back(Job(atomicBatchDispatcher.getToken(), i));
1830 enum class DispatchProblem {
1837 FiberManager& executor,
1838 std::vector<Job>& jobs,
1839 std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1840 DispatchProblem dispatchProblem = DispatchProblem::None,
1841 size_t problemIndex = size_t(-1)) {
1843 dispatchProblem == DispatchProblem::None || problemIndex < jobs.size());
1845 results.resize(jobs.size());
1846 for (size_t i = 0; i < jobs.size(); ++i) {
1848 [i, &executor, &jobs, &results, dispatchProblem, problemIndex]() {
1850 Job job(std::move(jobs[i]));
1852 if (dispatchProblem == DispatchProblem::PreprocessThrows) {
1853 if (i == problemIndex) {
1854 EXPECT_THROW(job.preprocess(executor, true), std::logic_error);
1859 job.preprocess(executor, false);
1860 OUTPUT_TRACE << "Dispatching job #" << i << std::endl;
1861 results[i] = job.token.dispatch(job.input);
1862 OUTPUT_TRACE << "Result future filled for job #" << i << std::endl;
1864 if (dispatchProblem == DispatchProblem::DuplicateDispatch) {
1865 if (i == problemIndex) {
1866 EXPECT_THROW(job.token.dispatch(job.input), ABDUsageException);
1870 OUTPUT_TRACE << "Preprocessing failed for job #" << i << std::endl;
1876 void validateResult(
1877 std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1880 OUTPUT_TRACE << "results[" << i << "].value() : " << results[i]->value()
1882 } catch (std::exception& e) {
1883 OUTPUT_TRACE << "Exception : " << e.what() << std::endl;
1888 template <typename TException>
1889 void validateResults(
1890 std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1891 size_t expectedNumResults) {
1892 size_t numResultsFilled = 0;
1893 for (size_t i = 0; i < results.size(); ++i) {
1898 EXPECT_THROW(validateResult(results, i), TException);
1900 EXPECT_EQ(numResultsFilled, expectedNumResults);
1903 void validateResults(
1904 std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1905 size_t expectedNumResults) {
1906 size_t numResultsFilled = 0;
1907 for (size_t i = 0; i < results.size(); ++i) {
1912 EXPECT_NO_THROW(validateResult(results, i));
1913 ValueT expectedInput = i;
1915 results[i]->value(), processSingleInput(std::move(expectedInput)));
1917 EXPECT_EQ(numResultsFilled, expectedNumResults);
1920 } // AtomicBatchDispatcherTesting
1922 #define SET_UP_TEST_FUNC \
1923 using namespace AtomicBatchDispatcherTesting; \
1924 folly::EventBase evb; \
1925 auto& executor = getFiberManager(evb); \
1926 const size_t COUNT = 11; \
1927 std::vector<Job> jobs; \
1928 jobs.reserve(COUNT); \
1929 std::vector<folly::Optional<folly::Future<ResultT>>> results; \
1930 results.reserve(COUNT); \
1931 DispatchFunctionT dispatchFunc
1933 TEST(FiberManager, ABD_Test) {
1937 // Testing AtomicBatchDispatcher with explicit call to commit()
1939 dispatchFunc = userDispatchFunc;
1940 auto atomicBatchDispatcher =
1941 createAtomicBatchDispatcher(std::move(dispatchFunc));
1942 createJobs(atomicBatchDispatcher, jobs, COUNT);
1943 dispatchJobs(executor, jobs, results);
1944 atomicBatchDispatcher.commit();
1946 validateResults(results, COUNT);
1949 TEST(FiberManager, ABD_DispatcherDestroyedBeforeCallingCommit) {
1953 // Testing AtomicBatchDispatcher destroyed before calling commit.
1954 // Handles error cases for:
1955 // - User might have forgotten to add the call to commit() in the code
1956 // - An unexpected exception got thrown in user code before commit() is called
1959 dispatchFunc = userDispatchFunc;
1960 auto atomicBatchDispatcher =
1961 createAtomicBatchDispatcher(std::move(dispatchFunc));
1962 createJobs(atomicBatchDispatcher, jobs, COUNT);
1963 dispatchJobs(executor, jobs, results);
1964 throw std::runtime_error(
1965 "Unexpected exception in user code before commit called");
1966 // atomicBatchDispatcher.commit();
1968 /* User code handles the exception and does not exit process */
1971 validateResults<ABDCommitNotCalledException>(results, COUNT);
1974 TEST(FiberManager, ABD_PreprocessingFailureTest) {
1978 // Testing preprocessing failure on a job throws
1980 dispatchFunc = userDispatchFunc;
1981 auto atomicBatchDispatcher =
1982 createAtomicBatchDispatcher(std::move(dispatchFunc));
1983 createJobs(atomicBatchDispatcher, jobs, COUNT);
1984 dispatchJobs(executor, jobs, results, DispatchProblem::PreprocessThrows, 8);
1985 atomicBatchDispatcher.commit();
1987 validateResults<ABDTokenNotDispatchedException>(results, COUNT - 1);
1990 TEST(FiberManager, ABD_MultipleDispatchOnSameTokenErrorTest) {
1994 // Testing that calling dispatch more than once on the same token throws
1996 dispatchFunc = userDispatchFunc;
1997 auto atomicBatchDispatcher =
1998 createAtomicBatchDispatcher(std::move(dispatchFunc));
1999 createJobs(atomicBatchDispatcher, jobs, COUNT);
2000 dispatchJobs(executor, jobs, results, DispatchProblem::DuplicateDispatch, 4);
2001 atomicBatchDispatcher.commit();
2005 TEST(FiberManager, ABD_GetTokenCalledAfterCommitTest) {
2009 // Testing that exception set on attempt to call getToken after commit called
2011 dispatchFunc = userDispatchFunc;
2012 auto atomicBatchDispatcher =
2013 createAtomicBatchDispatcher(std::move(dispatchFunc));
2014 createJobs(atomicBatchDispatcher, jobs, COUNT);
2015 atomicBatchDispatcher.commit();
2016 EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
2017 dispatchJobs(executor, jobs, results);
2018 EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
2020 validateResults(results, COUNT);
2021 EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
2024 TEST(FiberManager, ABD_UserProvidedBatchDispatchThrowsTest) {
2028 // Testing that exception is set if user provided batch dispatch throws
2030 dispatchFunc = [](std::vector<ValueT>&& inputs) -> std::vector<ResultT> {
2031 (void)userDispatchFunc(std::move(inputs));
2032 throw std::runtime_error("Unexpected exception in user dispatch function");
2034 auto atomicBatchDispatcher =
2035 createAtomicBatchDispatcher(std::move(dispatchFunc));
2036 createJobs(atomicBatchDispatcher, jobs, COUNT);
2037 dispatchJobs(executor, jobs, results);
2038 atomicBatchDispatcher.commit();
2040 validateResults<std::runtime_error>(results, COUNT);
2043 TEST(FiberManager, VirtualEventBase) {
2047 folly::ScopedEventBaseThread thread;
2050 std::make_unique<folly::VirtualEventBase>(*thread.getEventBase());
2051 auto& evb2 = thread.getEventBase()->getVirtualEventBase();
2053 getFiberManager(*evb1).addTaskRemote([&] {
2055 baton.timed_wait(std::chrono::milliseconds{100});
2060 getFiberManager(evb2).addTaskRemote([&] {
2062 baton.timed_wait(std::chrono::milliseconds{200});
2067 EXPECT_FALSE(done1);
2068 EXPECT_FALSE(done2);
2072 EXPECT_FALSE(done2);
2077 TEST(TimedMutex, ThreadFiberDeadlockOrder) {
2078 folly::EventBase evb;
2079 auto& fm = getFiberManager(evb);
2083 std::thread unlockThread([&] {
2084 /* sleep override */ std::this_thread::sleep_for(
2085 std::chrono::milliseconds{100});
2089 fm.addTask([&] { std::lock_guard<TimedMutex> lg(mutex); });
2091 runInMainContext([&] {
2092 auto locked = mutex.timed_lock(std::chrono::seconds{1});
2093 EXPECT_TRUE(locked);
2101 EXPECT_EQ(0, fm.hasTasks());
2103 unlockThread.join();
2106 TEST(TimedMutex, ThreadFiberDeadlockRace) {
2107 folly::EventBase evb;
2108 auto& fm = getFiberManager(evb);
2114 auto locked = mutex.timed_lock(std::chrono::seconds{1});
2115 EXPECT_TRUE(locked);
2122 runInMainContext([&] {
2123 auto locked = mutex.timed_lock(std::chrono::seconds{1});
2124 EXPECT_TRUE(locked);
2132 EXPECT_EQ(0, fm.hasTasks());
2136 * Test that we can properly track fiber stack usage.
2138 * This functionality can only be enabled when ASAN is disabled, so avoid
2139 * running this test with ASAN.
2141 #ifndef FOLLY_SANITIZE_ADDRESS
2142 TEST(FiberManager, recordStack) {
2144 folly::fibers::FiberManager::Options opts;
2145 opts.recordStackEvery = 1;
2147 FiberManager fm(std::make_unique<SimpleLoopController>(), opts);
2148 auto& loopController =
2149 dynamic_cast<SimpleLoopController&>(fm.loopController());
2151 static constexpr size_t n = 1000;
2155 for (size_t i = 0; i < n; ++i) {
2158 for (size_t i = 0; i + 1 < n; ++i) {
2159 s += b[i] * b[i + 1];
2165 loopController.loop([&]() { loopController.stop(); });
2167 // Check that we properly accounted fiber stack usage.
2168 EXPECT_LT(n * sizeof(int), fm.stackHighWatermark());