2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 #include <gtest/gtest.h>
22 #include <folly/Benchmark.h>
23 #include <folly/Memory.h>
24 #include <folly/futures/Future.h>
26 #include <folly/experimental/fibers/AddTasks.h>
27 #include <folly/experimental/fibers/EventBaseLoopController.h>
28 #include <folly/experimental/fibers/FiberManager.h>
29 #include <folly/experimental/fibers/FiberManagerMap.h>
30 #include <folly/experimental/fibers/GenericBaton.h>
31 #include <folly/experimental/fibers/SimpleLoopController.h>
32 #include <folly/experimental/fibers/WhenN.h>
34 using namespace folly::fibers;
// Exercises baton.timed_wait() on a FiberManager driven by a
// SimpleLoopController. Two fiber tasks wait with 230 ms and 130 ms timeouts
// and afterwards expect `iterations` to equal 5 and 3 respectively; each task
// then stops the loop controller. The loop function also contains a 50 ms sleep.
// NOTE(review): the baton declaration, any check on `res`, and the code that
// advances `iterations` / sets `taskAdded` are not visible in this listing —
// confirm against the complete file.
38 TEST(FiberManager, batonTimedWaitTimeout) {
39 bool taskAdded = false;
40 size_t iterations = 0;
42 FiberManager manager(folly::make_unique<SimpleLoopController>());
43 auto& loopController =
44 dynamic_cast<SimpleLoopController&>(manager.loopController());
46 auto loopFunc = [&]() {
48 manager.addTask([&]() {
51 auto res = baton.timed_wait(std::chrono::milliseconds(230));
54 EXPECT_EQ(5, iterations);
56 loopController.stop();
58 manager.addTask([&]() {
61 auto res = baton.timed_wait(std::chrono::milliseconds(130));
64 EXPECT_EQ(3, iterations);
66 loopController.stop();
70 std::this_thread::sleep_for(std::chrono::milliseconds(50));
75 loopController.loop(std::move(loopFunc));
// Exercises baton.timed_wait() with a 130 ms timeout on a
// SimpleLoopController-driven FiberManager. The fiber task expects
// `iterations` to equal 2 once the wait returns and then stops the loop.
// The loop function sleeps 50 ms and has a branch taken when iterations == 2.
// NOTE(review): presumably that branch posts the baton so the wait ends before
// the timeout; the body of the branch is not visible here — confirm.
78 TEST(FiberManager, batonTimedWaitPost) {
79 bool taskAdded = false;
80 size_t iterations = 0;
83 FiberManager manager(folly::make_unique<SimpleLoopController>());
84 auto& loopController =
85 dynamic_cast<SimpleLoopController&>(manager.loopController());
87 auto loopFunc = [&]() {
89 manager.addTask([&]() {
93 auto res = baton.timed_wait(std::chrono::milliseconds(130));
96 EXPECT_EQ(2, iterations);
98 loopController.stop();
102 std::this_thread::sleep_for(std::chrono::milliseconds(50));
104 if (iterations == 2) {
110 loopController.loop(std::move(loopFunc));
// Timed-wait test using an EventBaseLoopController attached to a
// folly::EventBase. The `task` lambda times a baton.timed_wait(timeout_ms)
// call with EventBaseLoopController::Clock and expects the elapsed
// milliseconds to lie strictly between timeout_ms - 50 and timeout_ms + 50.
// Two tasks (500 ms and 250 ms) are scheduled from the event-base thread; the
// second one to finish calls evb.terminateLoopSoon(), and the test finally
// expects tasksComplete == 2.
// NOTE(review): the baton declaration and the call that runs the event loop
// are not visible in this listing.
113 TEST(FiberManager, batonTimedWaitTimeoutEvb) {
114 size_t tasksComplete = 0;
116 folly::EventBase evb;
118 FiberManager manager(folly::make_unique<EventBaseLoopController>());
119 dynamic_cast<EventBaseLoopController&>(manager.loopController())
120 .attachEventBase(evb);
122 auto task = [&](size_t timeout_ms) {
125 auto start = EventBaseLoopController::Clock::now();
126 auto res = baton.timed_wait(std::chrono::milliseconds(timeout_ms));
127 auto finish = EventBaseLoopController::Clock::now();
132 std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
134 EXPECT_GT(duration_ms.count(), timeout_ms - 50);
135 EXPECT_LT(duration_ms.count(), timeout_ms + 50);
137 if (++tasksComplete == 2) {
138 evb.terminateLoopSoon();
142 evb.runInEventBaseThread([&]() {
143 manager.addTask([&]() { task(500); });
144 manager.addTask([&]() { task(250); });
149 EXPECT_EQ(2, tasksComplete);
// EventBase-driven variant of the post-before-timeout test. The fiber task
// schedules baton.post() through evb.tryRunAfterDelay(..., 100), then calls
// baton.timed_wait() with a 130 ms timeout and expects the measured wait to be
// strictly between 95 ms and 110 ms. The task terminates the event loop when
// it completes, and the test expects tasksComplete == 1 at the end.
// NOTE(review): the baton declaration and the event-loop run call are not
// visible in this listing.
152 TEST(FiberManager, batonTimedWaitPostEvb) {
153 size_t tasksComplete = 0;
155 folly::EventBase evb;
157 FiberManager manager(folly::make_unique<EventBaseLoopController>());
158 dynamic_cast<EventBaseLoopController&>(manager.loopController())
159 .attachEventBase(evb);
161 evb.runInEventBaseThread([&]() {
162 manager.addTask([&]() {
165 evb.tryRunAfterDelay([&]() { baton.post(); }, 100);
167 auto start = EventBaseLoopController::Clock::now();
168 auto res = baton.timed_wait(std::chrono::milliseconds(130));
169 auto finish = EventBaseLoopController::Clock::now();
174 std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
176 EXPECT_TRUE(duration_ms.count() > 95 && duration_ms.count() < 110);
178 if (++tasksComplete == 1) {
179 evb.terminateLoopSoon();
186 EXPECT_EQ(1, tasksComplete);
// Covers try_wait() in two scenarios on a SimpleLoopController-driven manager:
//  1. a fiber task polls `b.try_wait()` in a loop while a helper thread sleeps
//     300 ms (presumably posting `b` afterwards — the post is not visible);
//  2. a fiber task polls `c.try_wait()` for a bounded number of rounds (`cnt`)
//     and then expects try_wait() to still report false.
// NOTE(review): declarations of `b`, `c`, `cnt` and the thread join are not
// visible in this listing.
189 TEST(FiberManager, batonTryWait) {
190 FiberManager manager(folly::make_unique<SimpleLoopController>());
192 // Check if try_wait and post work as expected
195 manager.addTask([&]() {
196 while (!b.try_wait()) {
199 auto thr = std::thread([&]() {
200 std::this_thread::sleep_for(std::chrono::milliseconds(300));
204 manager.loopUntilNoReady();
209 // Check try_wait without post
210 manager.addTask([&]() {
212 while (cnt && !c.try_wait()) {
215 EXPECT_TRUE(!c.try_wait()); // must still hold
219 manager.loopUntilNoReady();
// Checks a fiber-side wait on a baton. The task asserts it runs inside an
// active fiber and clears `fiberRunning` when it finishes. The test expects
// `fiberRunning` to be false before the first loopUntilNoReady() and true
// after it (the fiber is still blocked), then keeps looping the manager until
// the flag is cleared while a helper thread sleeps 300 ms.
// NOTE(review): the baton itself, the line that sets `fiberRunning = true`,
// and whatever the helper thread does after sleeping are not visible here —
// presumably it posts the baton; confirm.
222 TEST(FiberManager, genericBatonFiberWait) {
223 FiberManager manager(folly::make_unique<SimpleLoopController>());
226 bool fiberRunning = false;
228 manager.addTask([&]() {
229 EXPECT_EQ(manager.hasActiveFiber(), true);
232 fiberRunning = false;
235 EXPECT_FALSE(fiberRunning);
236 manager.loopUntilNoReady();
237 EXPECT_TRUE(fiberRunning); // ensure fiber still active
239 auto thr = std::thread([&]() {
240 std::this_thread::sleep_for(std::chrono::milliseconds(300));
244 while (fiberRunning) {
245 manager.loopUntilNoReady();
// Counterpart of the fiber-wait test with the roles swapped: a plain thread
// raises the atomic `threadWaiting` flag and lowers it again afterwards. The
// main thread spins until the flag is raised, sleeps 300 ms, and adds a fiber
// task that expects to be running in an active fiber while the thread is
// still waiting.
// NOTE(review): the baton, the thread's wait call, and the post are not
// visible in this listing; presumably the thread blocks on the baton between
// the two flag writes — confirm.
251 TEST(FiberManager, genericBatonThreadWait) {
252 FiberManager manager(folly::make_unique<SimpleLoopController>());
254 std::atomic<bool> threadWaiting(false);
256 auto thr = std::thread([&]() {
257 threadWaiting = true;
259 threadWaiting = false;
262 while (!threadWaiting) {
264 std::this_thread::sleep_for(std::chrono::milliseconds(300));
266 manager.addTask([&]() {
267 EXPECT_EQ(manager.hasActiveFiber(), true);
268 EXPECT_TRUE(threadWaiting);
270 while (threadWaiting) {
274 manager.loopUntilNoReady();
// addTasks() with a move-only result type. Three functions each park a
// Promise<int> in `pendingFibers` via await() and then return
// make_unique<int>(i * 2 + 1). The consumer drains the iterator with
// awaitNext(), expecting *result == 2 * taskID + 1 and at most 2 - n promises
// still pending. The loop function releases one parked promise per pass
// (setValue(0) on the back element) and stops the loop otherwise.
// NOTE(review): the declaration/update of `n` and the `taskAdded` guard are
// not visible in this listing.
278 TEST(FiberManager, addTasksNoncopyable) {
279 std::vector<Promise<int>> pendingFibers;
280 bool taskAdded = false;
282 FiberManager manager(folly::make_unique<SimpleLoopController>());
283 auto& loopController =
284 dynamic_cast<SimpleLoopController&>(manager.loopController());
286 auto loopFunc = [&]() {
288 manager.addTask([&]() {
289 std::vector<std::function<std::unique_ptr<int>()>> funcs;
290 for (size_t i = 0; i < 3; ++i) {
291 funcs.push_back([i, &pendingFibers]() {
292 await([&pendingFibers](Promise<int> promise) {
293 pendingFibers.push_back(std::move(promise));
295 return folly::make_unique<int>(i * 2 + 1);
299 auto iter = addTasks(funcs.begin(), funcs.end());
302 while (iter.hasNext()) {
303 auto result = iter.awaitNext();
304 EXPECT_EQ(2 * iter.getTaskID() + 1, *result);
305 EXPECT_GE(2 - n, pendingFibers.size());
311 } else if (pendingFibers.size()) {
312 pendingFibers.back().setValue(0);
313 pendingFibers.pop_back();
315 loopController.stop();
319 loopController.loop(std::move(loopFunc));
// addTasks() where some tasks throw std::runtime_error("Runtime"). Each of
// the three int-returning functions first parks a Promise<int> in
// `pendingFibers`. While draining the iterator the test expects a successful
// awaitNext() to come from an odd task ID with value 2 * taskID + 1, and
// expects an even task ID on the other path.
// NOTE(review): presumably the even-ID expectation sits in a catch block and
// even-numbered tasks are the ones that throw; the try/catch and the throwing
// condition are not visible in this listing — confirm.
322 TEST(FiberManager, addTasksThrow) {
323 std::vector<Promise<int>> pendingFibers;
324 bool taskAdded = false;
326 FiberManager manager(folly::make_unique<SimpleLoopController>());
327 auto& loopController =
328 dynamic_cast<SimpleLoopController&>(manager.loopController());
330 auto loopFunc = [&]() {
332 manager.addTask([&]() {
333 std::vector<std::function<int()>> funcs;
334 for (size_t i = 0; i < 3; ++i) {
335 funcs.push_back([i, &pendingFibers]() {
336 await([&pendingFibers](Promise<int> promise) {
337 pendingFibers.push_back(std::move(promise));
340 throw std::runtime_error("Runtime");
346 auto iter = addTasks(funcs.begin(), funcs.end());
349 while (iter.hasNext()) {
351 int result = iter.awaitNext();
352 EXPECT_EQ(1, iter.getTaskID() % 2);
353 EXPECT_EQ(2 * iter.getTaskID() + 1, result);
355 EXPECT_EQ(0, iter.getTaskID() % 2);
357 EXPECT_GE(2 - n, pendingFibers.size());
363 } else if (pendingFibers.size()) {
364 pendingFibers.back().setValue(0);
365 pendingFibers.pop_back();
367 loopController.stop();
371 loopController.loop(std::move(loopFunc));
// addTasks() with void-returning functions. Each of the three functions parks
// a Promise<int> in `pendingFibers` via await(). While the iterator has more
// results the test expects at most 2 - n promises to remain pending. The loop
// function releases one parked promise per pass and stops the loop otherwise.
// NOTE(review): the awaitNext() call and the handling of `n` are not visible
// in this listing.
374 TEST(FiberManager, addTasksVoid) {
375 std::vector<Promise<int>> pendingFibers;
376 bool taskAdded = false;
378 FiberManager manager(folly::make_unique<SimpleLoopController>());
379 auto& loopController =
380 dynamic_cast<SimpleLoopController&>(manager.loopController());
382 auto loopFunc = [&]() {
384 manager.addTask([&]() {
385 std::vector<std::function<void()>> funcs;
386 for (size_t i = 0; i < 3; ++i) {
387 funcs.push_back([i, &pendingFibers]() {
388 await([&pendingFibers](Promise<int> promise) {
389 pendingFibers.push_back(std::move(promise));
394 auto iter = addTasks(funcs.begin(), funcs.end());
397 while (iter.hasNext()) {
399 EXPECT_GE(2 - n, pendingFibers.size());
405 } else if (pendingFibers.size()) {
406 pendingFibers.back().setValue(0);
407 pendingFibers.pop_back();
409 loopController.stop();
413 loopController.loop(std::move(loopFunc));
// addTasks() with void-returning functions that may throw
// std::runtime_error(""). Each function first parks a Promise<int> in
// `pendingFibers`. While draining the iterator the test expects an odd task
// ID on one path and an even task ID on the other, and at most 2 - n pending
// promises.
// NOTE(review): presumably the odd-ID check follows a successful awaitNext()
// and the even-ID check sits in a catch block; the try/catch and the throwing
// condition are not visible in this listing — confirm.
416 TEST(FiberManager, addTasksVoidThrow) {
417 std::vector<Promise<int>> pendingFibers;
418 bool taskAdded = false;
420 FiberManager manager(folly::make_unique<SimpleLoopController>());
421 auto& loopController =
422 dynamic_cast<SimpleLoopController&>(manager.loopController());
424 auto loopFunc = [&]() {
426 manager.addTask([&]() {
427 std::vector<std::function<void()>> funcs;
428 for (size_t i = 0; i < 3; ++i) {
429 funcs.push_back([i, &pendingFibers]() {
430 await([&pendingFibers](Promise<int> promise) {
431 pendingFibers.push_back(std::move(promise));
434 throw std::runtime_error("");
439 auto iter = addTasks(funcs.begin(), funcs.end());
442 while (iter.hasNext()) {
445 EXPECT_EQ(1, iter.getTaskID() % 2);
447 EXPECT_EQ(0, iter.getTaskID() % 2);
449 EXPECT_GE(2 - n, pendingFibers.size());
455 } else if (pendingFibers.size()) {
456 pendingFibers.back().setValue(0);
457 pendingFibers.pop_back();
459 loopController.stop();
463 loopController.loop(std::move(loopFunc));
// Tracks the state reported by an addTasks() iterator over three void tasks
// that each park a Promise<int>. Four checkpoints are asserted, in order:
//   hasCompleted / hasPending / hasNext = true  / true  / true
//                                         true  / true  / true
//                                         false / true  / true
//                                         false / false / false
// NOTE(review): the iterator calls made between the checkpoints (presumably
// reserve()/awaitNext()) are not visible in this listing — confirm against
// the complete file before relying on this sequence.
466 TEST(FiberManager, addTasksReserve) {
467 std::vector<Promise<int>> pendingFibers;
468 bool taskAdded = false;
470 FiberManager manager(folly::make_unique<SimpleLoopController>());
471 auto& loopController =
472 dynamic_cast<SimpleLoopController&>(manager.loopController());
474 auto loopFunc = [&]() {
476 manager.addTask([&]() {
477 std::vector<std::function<void()>> funcs;
478 for (size_t i = 0; i < 3; ++i) {
479 funcs.push_back([&pendingFibers]() {
480 await([&pendingFibers](Promise<int> promise) {
481 pendingFibers.push_back(std::move(promise));
486 auto iter = addTasks(funcs.begin(), funcs.end());
489 EXPECT_TRUE(iter.hasCompleted());
490 EXPECT_TRUE(iter.hasPending());
491 EXPECT_TRUE(iter.hasNext());
494 EXPECT_TRUE(iter.hasCompleted());
495 EXPECT_TRUE(iter.hasPending());
496 EXPECT_TRUE(iter.hasNext());
499 EXPECT_FALSE(iter.hasCompleted());
500 EXPECT_TRUE(iter.hasPending());
501 EXPECT_TRUE(iter.hasNext());
504 EXPECT_FALSE(iter.hasCompleted());
505 EXPECT_FALSE(iter.hasPending());
506 EXPECT_FALSE(iter.hasNext());
509 } else if (pendingFibers.size()) {
510 pendingFibers.back().setValue(0);
511 pendingFibers.pop_back();
513 loopController.stop();
517 loopController.loop(std::move(loopFunc));
// Adds tasks to a TaskIterator<size_t> incrementally. makeTask(id) builds a
// task that blocks on batons[id] and yields a size_t. Tasks 0 and 1 are added
// first, task 2 is added after the first result is consumed, and the results
// are expected in the order 1, 2, 0.
// NOTE(review): the `batons` declaration, the task's return statement, and
// the posts that determine the completion order are not visible in this
// listing — confirm.
520 TEST(FiberManager, addTaskDynamic) {
521 folly::EventBase evb;
525 auto makeTask = [&](size_t taskId) {
526 return [&, taskId]() -> size_t {
527 batons[taskId].wait();
533 .addTaskFuture([&]() {
534 TaskIterator<size_t> iterator;
536 iterator.addTask(makeTask(0));
537 iterator.addTask(makeTask(1));
541 EXPECT_EQ(1, iterator.awaitNext());
543 iterator.addTask(makeTask(2));
547 EXPECT_EQ(2, iterator.awaitNext());
551 EXPECT_EQ(0, iterator.awaitNext());
// forEach() over three int-returning functions that each park a Promise<int>
// in `pendingFibers`. The callback records (id, result) pairs; afterwards the
// test expects three results, no pending promises, and
// result == id * 2 + 1 for every pair. The loop function releases one parked
// promise per pass and stops the loop otherwise.
// NOTE(review): the functions' return statement is not visible in this
// listing.
556 TEST(FiberManager, forEach) {
557 std::vector<Promise<int>> pendingFibers;
558 bool taskAdded = false;
560 FiberManager manager(folly::make_unique<SimpleLoopController>());
561 auto& loopController =
562 dynamic_cast<SimpleLoopController&>(manager.loopController());
564 auto loopFunc = [&]() {
566 manager.addTask([&]() {
567 std::vector<std::function<int()>> funcs;
568 for (size_t i = 0; i < 3; ++i) {
569 funcs.push_back([i, &pendingFibers]() {
570 await([&pendingFibers](Promise<int> promise) {
571 pendingFibers.push_back(std::move(promise));
577 std::vector<std::pair<size_t, int>> results;
578 forEach(funcs.begin(), funcs.end(), [&results](size_t id, int result) {
579 results.emplace_back(id, result);
581 EXPECT_EQ(3, results.size());
582 EXPECT_TRUE(pendingFibers.empty());
583 for (size_t i = 0; i < 3; ++i) {
584 EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
588 } else if (pendingFibers.size()) {
589 pendingFibers.back().setValue(0);
590 pendingFibers.pop_back();
592 loopController.stop();
596 loopController.loop(std::move(loopFunc));
// collectN() asking for 2 of 3 int-returning tasks, each of which parks a
// Promise<int> in `pendingFibers`. Expects exactly two (id, value) results
// with value == id * 2 + 1 and exactly one promise still pending when
// collectN() returns.
// NOTE(review): the functions' return statement is not visible in this
// listing.
599 TEST(FiberManager, collectN) {
600 std::vector<Promise<int>> pendingFibers;
601 bool taskAdded = false;
603 FiberManager manager(folly::make_unique<SimpleLoopController>());
604 auto& loopController =
605 dynamic_cast<SimpleLoopController&>(manager.loopController());
607 auto loopFunc = [&]() {
609 manager.addTask([&]() {
610 std::vector<std::function<int()>> funcs;
611 for (size_t i = 0; i < 3; ++i) {
612 funcs.push_back([i, &pendingFibers]() {
613 await([&pendingFibers](Promise<int> promise) {
614 pendingFibers.push_back(std::move(promise));
620 auto results = collectN(funcs.begin(), funcs.end(), 2);
621 EXPECT_EQ(2, results.size());
622 EXPECT_EQ(1, pendingFibers.size());
623 for (size_t i = 0; i < 2; ++i) {
624 EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
628 } else if (pendingFibers.size()) {
629 pendingFibers.back().setValue(0);
630 pendingFibers.pop_back();
632 loopController.stop();
636 loopController.loop(std::move(loopFunc));
// collectN() asking for 2 of 3 int-returning tasks that throw
// std::runtime_error("Runtime") after parking a Promise<int>. Expects one
// promise to still be pending afterwards.
// NOTE(review): presumably collectN() is wrapped in try/catch and the
// pending-count check runs in the handler; that structure is not visible in
// this listing — confirm.
639 TEST(FiberManager, collectNThrow) {
640 std::vector<Promise<int>> pendingFibers;
641 bool taskAdded = false;
643 FiberManager manager(folly::make_unique<SimpleLoopController>());
644 auto& loopController =
645 dynamic_cast<SimpleLoopController&>(manager.loopController());
647 auto loopFunc = [&]() {
649 manager.addTask([&]() {
650 std::vector<std::function<int()>> funcs;
651 for (size_t i = 0; i < 3; ++i) {
652 funcs.push_back([i, &pendingFibers]() {
653 await([&pendingFibers](Promise<int> promise) {
654 pendingFibers.push_back(std::move(promise));
656 throw std::runtime_error("Runtime");
662 collectN(funcs.begin(), funcs.end(), 2);
664 EXPECT_EQ(1, pendingFibers.size());
668 } else if (pendingFibers.size()) {
669 pendingFibers.back().setValue(0);
670 pendingFibers.pop_back();
672 loopController.stop();
676 loopController.loop(std::move(loopFunc));
// collectN() asking for 2 of 3 void-returning tasks, each of which parks a
// Promise<int> in `pendingFibers`. Expects a result container of size 2 and
// exactly one promise still pending when collectN() returns. The loop
// function releases one parked promise per pass and stops the loop otherwise.
679 TEST(FiberManager, collectNVoid) {
680 std::vector<Promise<int>> pendingFibers;
681 bool taskAdded = false;
683 FiberManager manager(folly::make_unique<SimpleLoopController>());
684 auto& loopController =
685 dynamic_cast<SimpleLoopController&>(manager.loopController());
687 auto loopFunc = [&]() {
689 manager.addTask([&]() {
690 std::vector<std::function<void()>> funcs;
691 for (size_t i = 0; i < 3; ++i) {
692 funcs.push_back([i, &pendingFibers]() {
693 await([&pendingFibers](Promise<int> promise) {
694 pendingFibers.push_back(std::move(promise));
699 auto results = collectN(funcs.begin(), funcs.end(), 2);
700 EXPECT_EQ(2, results.size());
701 EXPECT_EQ(1, pendingFibers.size());
704 } else if (pendingFibers.size()) {
705 pendingFibers.back().setValue(0);
706 pendingFibers.pop_back();
708 loopController.stop();
712 loopController.loop(std::move(loopFunc));
// collectN() asking for 2 of 3 void-returning tasks that throw
// std::runtime_error("Runtime") after parking a Promise<int>. Expects one
// promise to still be pending afterwards.
// NOTE(review): presumably collectN() is wrapped in try/catch and the
// pending-count check runs in the handler; that structure is not visible in
// this listing — confirm.
715 TEST(FiberManager, collectNVoidThrow) {
716 std::vector<Promise<int>> pendingFibers;
717 bool taskAdded = false;
719 FiberManager manager(folly::make_unique<SimpleLoopController>());
720 auto& loopController =
721 dynamic_cast<SimpleLoopController&>(manager.loopController());
723 auto loopFunc = [&]() {
725 manager.addTask([&]() {
726 std::vector<std::function<void()>> funcs;
727 for (size_t i = 0; i < 3; ++i) {
728 funcs.push_back([i, &pendingFibers]() {
729 await([&pendingFibers](Promise<int> promise) {
730 pendingFibers.push_back(std::move(promise));
732 throw std::runtime_error("Runtime");
737 collectN(funcs.begin(), funcs.end(), 2);
739 EXPECT_EQ(1, pendingFibers.size());
743 } else if (pendingFibers.size()) {
744 pendingFibers.back().setValue(0);
745 pendingFibers.pop_back();
747 loopController.stop();
751 loopController.loop(std::move(loopFunc));
// collectAll() over three int-returning tasks that each park a Promise<int>
// in `pendingFibers`. After collectAll() returns, no promise may be pending
// and results[i] must equal i * 2 + 1 (results are indexed by task position).
// NOTE(review): the functions' return statement is not visible in this
// listing.
754 TEST(FiberManager, collectAll) {
755 std::vector<Promise<int>> pendingFibers;
756 bool taskAdded = false;
758 FiberManager manager(folly::make_unique<SimpleLoopController>());
759 auto& loopController =
760 dynamic_cast<SimpleLoopController&>(manager.loopController());
762 auto loopFunc = [&]() {
764 manager.addTask([&]() {
765 std::vector<std::function<int()>> funcs;
766 for (size_t i = 0; i < 3; ++i) {
767 funcs.push_back([i, &pendingFibers]() {
768 await([&pendingFibers](Promise<int> promise) {
769 pendingFibers.push_back(std::move(promise));
775 auto results = collectAll(funcs.begin(), funcs.end());
776 EXPECT_TRUE(pendingFibers.empty());
777 for (size_t i = 0; i < 3; ++i) {
778 EXPECT_EQ(i * 2 + 1, results[i]);
782 } else if (pendingFibers.size()) {
783 pendingFibers.back().setValue(0);
784 pendingFibers.pop_back();
786 loopController.stop();
790 loopController.loop(std::move(loopFunc));
// collectAll() over three void-returning tasks that each park a Promise<int>
// in `pendingFibers`. After collectAll() returns, no promise may be pending.
// The loop function releases one parked promise per pass and stops the loop
// otherwise.
793 TEST(FiberManager, collectAllVoid) {
794 std::vector<Promise<int>> pendingFibers;
795 bool taskAdded = false;
797 FiberManager manager(folly::make_unique<SimpleLoopController>());
798 auto& loopController =
799 dynamic_cast<SimpleLoopController&>(manager.loopController());
801 auto loopFunc = [&]() {
803 manager.addTask([&]() {
804 std::vector<std::function<void()>> funcs;
805 for (size_t i = 0; i < 3; ++i) {
806 funcs.push_back([i, &pendingFibers]() {
807 await([&pendingFibers](Promise<int> promise) {
808 pendingFibers.push_back(std::move(promise));
813 collectAll(funcs.begin(), funcs.end());
814 EXPECT_TRUE(pendingFibers.empty());
817 } else if (pendingFibers.size()) {
818 pendingFibers.back().setValue(0);
819 pendingFibers.pop_back();
821 loopController.stop();
825 loopController.loop(std::move(loopFunc));
// collectAny() over three int-returning tasks that each park a Promise<int>
// in `pendingFibers`; the function body also contains a throw of
// std::runtime_error("This exception will be ignored"). When collectAny()
// returns, the test expects two promises still pending and the result to be
// task id 2 with value 2 * 2 + 1.
// NOTE(review): the condition under which a task throws and the functions'
// return statement are not visible in this listing.
828 TEST(FiberManager, collectAny) {
829 std::vector<Promise<int>> pendingFibers;
830 bool taskAdded = false;
832 FiberManager manager(folly::make_unique<SimpleLoopController>());
833 auto& loopController =
834 dynamic_cast<SimpleLoopController&>(manager.loopController());
836 auto loopFunc = [&]() {
838 manager.addTask([&]() {
839 std::vector<std::function<int()>> funcs;
840 for (size_t i = 0; i < 3; ++i) {
841 funcs.push_back([i, &pendingFibers]() {
842 await([&pendingFibers](Promise<int> promise) {
843 pendingFibers.push_back(std::move(promise));
846 throw std::runtime_error("This exception will be ignored");
852 auto result = collectAny(funcs.begin(), funcs.end());
853 EXPECT_EQ(2, pendingFibers.size());
854 EXPECT_EQ(2, result.first);
855 EXPECT_EQ(2 * 2 + 1, result.second);
858 } else if (pendingFibers.size()) {
859 pendingFibers.back().setValue(0);
860 pendingFibers.pop_back();
862 loopController.stop();
866 loopController.loop(std::move(loopFunc));
870 /* Checks that this function was run from a main context,
871 by comparing an address on a stack to a known main stack address
872 and a known related fiber stack address. The assumption
873 is that fiber stack and main stack will be far enough apart,
874 while any two values on the same stack will be close. */
// Parameters:
//   ran           - flag owned by the caller; presumably set to true here so
//                   the caller can tell the check executed (the assignment is
//                   not visible in this listing — confirm).
//   mainLocation  - address of an int living on the main stack.
//   fiberLocation - address of an int living on a fiber stack; callers in this
//                   file pass nullptr when no fiber address is available, so
//                   the fiber comparison is presumably guarded — confirm.
// Distances are pointer differences, i.e. counted in ints rather than bytes,
// which is why DISTANCE divides the byte span by sizeof(int).
875 void expectMainContext(bool& ran, int* mainLocation, int* fiberLocation) {
877 /* 2 pages is a good guess */
878 constexpr ssize_t DISTANCE = 0x2000 / sizeof(int);
880 EXPECT_TRUE(std::abs(&here - fiberLocation) > DISTANCE);
883 EXPECT_TRUE(std::abs(&here - mainLocation) < DISTANCE);
// Covers runInMainContext() in two situations:
//  1. called on the manager from outside any fiber (no fiber address is
//     available, so nullptr is passed) — the callback must have run;
//  2. called from inside a fiber task, where the callback is checked against
//     both the main-stack and the fiber-stack address and returns a value of
//     a local type `A` whose copy constructor is deleted; the test expects
//     ret.value == 42.
// NOTE(review): the declarations of `mainLocation`, `stackLocation`, the rest
// of `A`, and the callback's return statement are not visible in this listing.
891 TEST(FiberManager, runInMainContext) {
892 FiberManager manager(folly::make_unique<SimpleLoopController>());
893 auto& loopController =
894 dynamic_cast<SimpleLoopController&>(manager.loopController());
896 bool checkRan = false;
899 manager.runInMainContext(
900 [&]() { expectMainContext(checkRan, &mainLocation, nullptr); });
901 EXPECT_TRUE(checkRan);
905 manager.addTask([&]() {
907 explicit A(int value_) : value(value_) {}
908 A(const A&) = delete;
914 auto ret = runInMainContext([&]() {
915 expectMainContext(checkRan, &mainLocation, &stackLocation);
918 EXPECT_TRUE(checkRan);
919 EXPECT_EQ(42, ret.value);
922 loopController.loop([&]() { loopController.stop(); });
924 EXPECT_TRUE(checkRan);
// addTaskFinally(): the task function returns 1234 and the "finally" callback
// receives it as Try<int>, checks the value, and verifies via
// expectMainContext() that it executes on the main context. `checkRan` must
// be false before the loop runs and true afterwards.
// NOTE(review): the declaration of `mainLocation` is not visible in this
// listing.
927 TEST(FiberManager, addTaskFinally) {
928 FiberManager manager(folly::make_unique<SimpleLoopController>());
929 auto& loopController =
930 dynamic_cast<SimpleLoopController&>(manager.loopController());
932 bool checkRan = false;
936 manager.addTaskFinally(
937 [&]() { return 1234; },
938 [&](Try<int>&& result) {
939 EXPECT_EQ(result.value(), 1234);
941 expectMainContext(checkRan, &mainLocation, nullptr);
944 EXPECT_FALSE(checkRan);
946 loopController.loop([&]() { loopController.stop(); });
948 EXPECT_TRUE(checkRan);
// Fiber pool behaviour when the number of tasks equals maxFibersPoolSize (5).
// After the first batch of 5 tasks: 5 run, 5 fibers allocated, 5 in the pool.
// After a second batch of 5: 10 run in total while allocation and pool size
// both stay at 5, i.e. the second batch reuses the pooled fibers.
951 TEST(FiberManager, fibersPoolWithinLimit) {
952 FiberManager::Options opts;
953 opts.maxFibersPoolSize = 5;
955 FiberManager manager(folly::make_unique<SimpleLoopController>(), opts);
956 auto& loopController =
957 dynamic_cast<SimpleLoopController&>(manager.loopController());
959 size_t fibersRun = 0;
961 for (size_t i = 0; i < 5; ++i) {
962 manager.addTask([&]() { ++fibersRun; });
964 loopController.loop([&]() { loopController.stop(); });
966 EXPECT_EQ(5, fibersRun);
967 EXPECT_EQ(5, manager.fibersAllocated());
968 EXPECT_EQ(5, manager.fibersPoolSize());
970 for (size_t i = 0; i < 5; ++i) {
971 manager.addTask([&]() { ++fibersRun; });
973 loopController.loop([&]() { loopController.stop(); });
975 EXPECT_EQ(10, fibersRun);
976 EXPECT_EQ(5, manager.fibersAllocated());
977 EXPECT_EQ(5, manager.fibersPoolSize());
// Fiber pool behaviour when more tasks (10) are queued than
// maxFibersPoolSize (5). Before the loop runs: nothing has executed, 10 fibers
// are allocated and the pool is empty. After the loop: all 10 tasks ran, and
// both the allocated count and the pool size are back down to 5.
980 TEST(FiberManager, fibersPoolOverLimit) {
981 FiberManager::Options opts;
982 opts.maxFibersPoolSize = 5;
984 FiberManager manager(folly::make_unique<SimpleLoopController>(), opts);
985 auto& loopController =
986 dynamic_cast<SimpleLoopController&>(manager.loopController());
988 size_t fibersRun = 0;
990 for (size_t i = 0; i < 10; ++i) {
991 manager.addTask([&]() { ++fibersRun; });
994 EXPECT_EQ(0, fibersRun);
995 EXPECT_EQ(10, manager.fibersAllocated());
996 EXPECT_EQ(0, manager.fibersPoolSize());
998 loopController.loop([&]() { loopController.stop(); });
1000 EXPECT_EQ(10, fibersRun);
1001 EXPECT_EQ(5, manager.fibersAllocated());
1002 EXPECT_EQ(5, manager.fibersPoolSize());
// Fulfilling promises from other threads. Two fiber tasks each store their
// Promise<int> in `savedPromise`; after the first loopUntilNoReady() both
// promises exist and both results are still 0. Two threads then set the
// values 42 and 43. Until the manager loops again the results stay 0, and the
// loop controller must have been asked to schedule remotely exactly once.
// After the second loopUntilNoReady() the results are 42 and 43.
// NOTE(review): the declaration of `result` and the await call that assigns
// into it are not visible in this listing.
1005 TEST(FiberManager, remoteFiberBasic) {
1006 FiberManager manager(folly::make_unique<SimpleLoopController>());
1007 auto& loopController =
1008 dynamic_cast<SimpleLoopController&>(manager.loopController());
1011 result[0] = result[1] = 0;
1012 folly::Optional<Promise<int>> savedPromise[2];
1013 manager.addTask([&]() {
1015 [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1017 manager.addTask([&]() {
1019 [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1022 manager.loopUntilNoReady();
1024 EXPECT_TRUE(savedPromise[0].hasValue());
1025 EXPECT_TRUE(savedPromise[1].hasValue());
1026 EXPECT_EQ(0, result[0]);
1027 EXPECT_EQ(0, result[1]);
1029 std::thread remoteThread0{[&]() { savedPromise[0]->setValue(42); }};
1030 std::thread remoteThread1{[&]() { savedPromise[1]->setValue(43); }};
1031 remoteThread0.join();
1032 remoteThread1.join();
1033 EXPECT_EQ(0, result[0]);
1034 EXPECT_EQ(0, result[1]);
1035 /* Should only have scheduled once */
1036 EXPECT_EQ(1, loopController.remoteScheduleCalled());
1038 manager.loopUntilNoReady();
1039 EXPECT_EQ(42, result[0]);
1040 EXPECT_EQ(43, result[1]);
// addTaskRemote() from other threads. Two threads each enqueue a task that
// stores its Promise<int> in `savedPromise`. After joining the threads and
// looping the manager, both promises exist and both results are still 0. The
// promises are then fulfilled (42, 43) on the test thread; the results only
// change after the next loopUntilNoReady().
// NOTE(review): the declaration of `result` and the await call that assigns
// into it are not visible in this listing.
1043 TEST(FiberManager, addTaskRemoteBasic) {
1044 FiberManager manager(folly::make_unique<SimpleLoopController>());
1047 result[0] = result[1] = 0;
1048 folly::Optional<Promise<int>> savedPromise[2];
1050 std::thread remoteThread0{[&]() {
1051 manager.addTaskRemote([&]() {
1053 [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1056 std::thread remoteThread1{[&]() {
1057 manager.addTaskRemote([&]() {
1059 [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1062 remoteThread0.join();
1063 remoteThread1.join();
1065 manager.loopUntilNoReady();
1067 EXPECT_TRUE(savedPromise[0].hasValue());
1068 EXPECT_TRUE(savedPromise[1].hasValue());
1069 EXPECT_EQ(0, result[0]);
1070 EXPECT_EQ(0, result[1]);
1072 savedPromise[0]->setValue(42);
1073 savedPromise[1]->setValue(43);
1075 EXPECT_EQ(0, result[0]);
1076 EXPECT_EQ(0, result[1]);
1078 manager.loopUntilNoReady();
1079 EXPECT_EQ(42, result[0]);
1080 EXPECT_EQ(43, result[1]);
// hasTasks() must account for remotely added tasks: a helper thread enqueues
// a task that increments `counter`, the test loops the manager while
// hasTasks() is true, and finally expects no tasks left and counter == 1.
// NOTE(review): the declaration of `counter` and the thread join are not
// visible in this listing.
1083 TEST(FiberManager, remoteHasTasks) {
1085 FiberManager fm(folly::make_unique<SimpleLoopController>());
1086 std::thread remote([&]() { fm.addTaskRemote([&]() { ++counter; }); });
1090 while (fm.hasTasks()) {
1091 fm.loopUntilNoReady();
1094 EXPECT_FALSE(fm.hasTasks());
1095 EXPECT_EQ(counter, 1);
// hasTasks() across the life of a remotely added task that blocks on a
// promise. hasTasks() is expected to be true inside the task, after the task
// is enqueued, after the first loopUntilNoReady() (the task is parked), and
// after a second thread fulfils the promise with 47. Only after the final
// loopUntilNoReady() does hasTasks() become false, with result == 47.
// NOTE(review): the declaration of `result`, the await call, and the thread
// joins are not visible in this listing.
1098 TEST(FiberManager, remoteHasReadyTasks) {
1100 folly::Optional<Promise<int>> savedPromise;
1101 FiberManager fm(folly::make_unique<SimpleLoopController>());
1102 std::thread remote([&]() {
1103 fm.addTaskRemote([&]() {
1105 [&](Promise<int> promise) { savedPromise = std::move(promise); });
1106 EXPECT_TRUE(fm.hasTasks());
1111 EXPECT_TRUE(fm.hasTasks());
1113 fm.loopUntilNoReady();
1114 EXPECT_TRUE(fm.hasTasks());
1116 std::thread remote2([&]() { savedPromise->setValue(47); });
1118 EXPECT_TRUE(fm.hasTasks());
1120 fm.loopUntilNoReady();
1121 EXPECT_FALSE(fm.hasTasks());
1123 EXPECT_EQ(result, 47);
// Shared body for the fiber-local tests, parameterised on the fiber-local
// data type `Data` (which must expose a `value` member). The manager is
// constructed with LocalType<Data>(). The visible expectations are that a
// fresh task observes value == 42, that tasks spawned from a fiber observe
// the value the spawning fiber had written (43 / 44), that a task added via
// addTaskRemote observes 43, and that a task run through collectAny() does
// not change the caller's value (still 43 afterwards). The manager must have
// no tasks left once loopUntilNoReady() returns.
// NOTE(review): the nesting of the task lambdas is not fully visible in this
// listing; the inheritance description above is inferred from the order of
// the assertions — confirm against the complete file.
1126 template <typename Data>
1127 void testFiberLocal() {
1129 LocalType<Data>(), folly::make_unique<SimpleLoopController>());
1132 EXPECT_EQ(42, local<Data>().value);
1134 local<Data>().value = 43;
1137 EXPECT_EQ(43, local<Data>().value);
1139 local<Data>().value = 44;
1141 addTask([]() { EXPECT_EQ(44, local<Data>().value); });
1146 EXPECT_EQ(42, local<Data>().value);
1148 local<Data>().value = 43;
1150 fm.addTaskRemote([]() { EXPECT_EQ(43, local<Data>().value); });
1154 EXPECT_EQ(42, local<Data>().value);
1155 local<Data>().value = 43;
1158 EXPECT_EQ(43, local<Data>().value);
1159 local<Data>().value = 44;
1161 std::vector<std::function<void()>> tasks{task};
1162 collectAny(tasks.begin(), tasks.end());
1164 EXPECT_EQ(43, local<Data>().value);
1167 fm.loopUntilNoReady();
1168 EXPECT_FALSE(fm.hasTasks());
// Runs the shared fiber-local scenario with the SimpleData type.
// NOTE(review): the definition of SimpleData is not visible in this listing.
1171 TEST(FiberManager, fiberLocal) {
1176 testFiberLocal<SimpleData>();
// Runs the shared fiber-local scenario with the LargeData type.
// NOTE(review): the 1 MiB char array below is presumably a member of
// LargeData, making the local too large for inline storage (hence "Heap" in
// the test name); the struct definition is not visible here — confirm.
1179 TEST(FiberManager, fiberLocalHeap) {
1181 char _[1024 * 1024];
1185 testFiberLocal<LargeData>();
// Fiber-local access around destruction of the local data. The visible code
// expects local<CrazyData>().data to read 42 and then resets it to 0 to avoid
// an infinite loop; a task on a manager built with LocalType<CrazyData>()
// writes 41, and the manager must have no tasks left after looping.
// NOTE(review): the CrazyData definition is not visible in this listing;
// presumably the 42 check and the reset live in its destructor — confirm.
1188 TEST(FiberManager, fiberLocalDestructor) {
1195 EXPECT_EQ(42, local<CrazyData>().data);
1196 // Make sure we don't have infinite loop
1197 local<CrazyData>().data = 0;
1204 LocalType<CrazyData>(), folly::make_unique<SimpleLoopController>());
1206 fm.addTask([]() { local<CrazyData>().data = 41; });
1208 fm.loopUntilNoReady();
1209 EXPECT_FALSE(fm.hasTasks());
// Adds one fiber task, drives the SimpleLoopController loop (whose loop
// function stops the controller), and expects `checkRan` to be true at the
// end.
// NOTE(review): the task body — presumably a yield followed by setting
// `checkRan` — and any condition around stop() are not visible in this
// listing; confirm against the complete file.
1212 TEST(FiberManager, yieldTest) {
1213 FiberManager manager(folly::make_unique<SimpleLoopController>());
1214 auto& loopController =
1215 dynamic_cast<SimpleLoopController&>(manager.loopController());
1217 bool checkRan = false;
1219 manager.addTask([&]() {
1224 loopController.loop([&]() {
1226 loopController.stop();
1230 EXPECT_TRUE(checkRan);
// Verifies that folly::RequestContext follows fiber tasks. Four contexts
// (rcontext1..rcontext4) are created, each before scheduling one task; inside
// each task — including after baton waits, inside the callback passed to
// baton1.wait(), and inside the "finally" callback taking Try<folly::Unit> —
// RequestContext::get() must equal the context that was current when the task
// was scheduled. A fifth context (`rcontext`) is installed on the test thread
// and must still be current before and after every loopUntilNoReady() call,
// while checkRun1..checkRun4 confirm that each task ran to completion.
// NOTE(review): the task-adding calls for scenarios 1, 3 and 4, the baton
// posts, and the assignments to checkRunN are not visible in this listing.
1233 TEST(FiberManager, RequestContext) {
1234 FiberManager fm(folly::make_unique<SimpleLoopController>());
1236 bool checkRun1 = false;
1237 bool checkRun2 = false;
1238 bool checkRun3 = false;
1239 bool checkRun4 = false;
1240 folly::fibers::Baton baton1;
1241 folly::fibers::Baton baton2;
1242 folly::fibers::Baton baton3;
1243 folly::fibers::Baton baton4;
1245 folly::RequestContext::create();
1246 auto rcontext1 = folly::RequestContext::get();
1248 EXPECT_EQ(rcontext1, folly::RequestContext::get());
1249 baton1.wait([&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
1250 EXPECT_EQ(rcontext1, folly::RequestContext::get());
1252 [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
1256 folly::RequestContext::create();
1257 auto rcontext2 = folly::RequestContext::get();
1258 fm.addTaskRemote([&]() {
1259 EXPECT_EQ(rcontext2, folly::RequestContext::get());
1261 EXPECT_EQ(rcontext2, folly::RequestContext::get());
1265 folly::RequestContext::create();
1266 auto rcontext3 = folly::RequestContext::get();
1269 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1271 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1273 return folly::Unit();
1275 [&](Try<folly::Unit>&& /* t */) {
1276 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1280 folly::RequestContext::setContext(nullptr);
1282 folly::RequestContext::create();
1283 auto rcontext4 = folly::RequestContext::get();
1285 EXPECT_EQ(rcontext4, folly::RequestContext::get());
1289 folly::RequestContext::create();
1290 auto rcontext = folly::RequestContext::get();
1292 fm.loopUntilNoReady();
1293 EXPECT_EQ(rcontext, folly::RequestContext::get());
1296 EXPECT_EQ(rcontext, folly::RequestContext::get());
1297 fm.loopUntilNoReady();
1298 EXPECT_TRUE(checkRun1);
1299 EXPECT_EQ(rcontext, folly::RequestContext::get());
1302 EXPECT_EQ(rcontext, folly::RequestContext::get());
1303 fm.loopUntilNoReady();
1304 EXPECT_TRUE(checkRun2);
1305 EXPECT_EQ(rcontext, folly::RequestContext::get());
1308 EXPECT_EQ(rcontext, folly::RequestContext::get());
1309 fm.loopUntilNoReady();
1310 EXPECT_TRUE(checkRun3);
1311 EXPECT_EQ(rcontext, folly::RequestContext::get());
1314 EXPECT_EQ(rcontext, folly::RequestContext::get());
1315 fm.loopUntilNoReady();
1316 EXPECT_TRUE(checkRun4);
1317 EXPECT_EQ(rcontext, folly::RequestContext::get());
// Periodic shrinking of the fiber pool (fibersPoolResizePeriodMs = 300,
// maxFibersPoolSize = 5) on an EventBase-driven manager. 30 tasks are queued;
// the first batons.size() (10) of them take the "keep active" branch. The
// expected (allocated, pooled) counts at each checkpoint are:
//   before running anything            : 30, 0
//   after the tasks ran                : 30, 20  (10 fibers still active)
//   after 400 ms + loopOnce            : 30, 20
//   after another 400 ms + loopOnce    : 15, 5
//   after the loop over the 10 batons  : 15, 15
//   after 400 ms + loopOnce            : 10, 10
//   after the final 400 ms             : 5, 5
// NOTE(review): the baton wait/post calls, the increment of `tasksRun`, and
// several evb.loopOnce() calls are not visible in this listing. The test
// relies on wall-clock sleeps longer than the resize period.
1320 TEST(FiberManager, resizePeriodically) {
1321 FiberManager::Options opts;
1322 opts.fibersPoolResizePeriodMs = 300;
1323 opts.maxFibersPoolSize = 5;
1325 FiberManager manager(folly::make_unique<EventBaseLoopController>(), opts);
1327 folly::EventBase evb;
1328 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1329 .attachEventBase(evb);
1331 std::vector<Baton> batons(10);
1333 size_t tasksRun = 0;
1334 for (size_t i = 0; i < 30; ++i) {
1335 manager.addTask([i, &batons, &tasksRun]() {
1337 // Keep some fibers active indefinitely
1338 if (i < batons.size()) {
1344 EXPECT_EQ(0, tasksRun);
1345 EXPECT_EQ(30, manager.fibersAllocated());
1346 EXPECT_EQ(0, manager.fibersPoolSize());
1349 EXPECT_EQ(30, tasksRun);
1350 EXPECT_EQ(30, manager.fibersAllocated());
1351 // Can go over maxFibersPoolSize, 10 of 30 fibers still active
1352 EXPECT_EQ(20, manager.fibersPoolSize());
1354 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1355 evb.loopOnce(); // no fibers active in this period
1356 EXPECT_EQ(30, manager.fibersAllocated());
1357 EXPECT_EQ(20, manager.fibersPoolSize());
1359 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1360 evb.loopOnce(); // should shrink fibers pool to maxFibersPoolSize
1361 EXPECT_EQ(15, manager.fibersAllocated());
1362 EXPECT_EQ(5, manager.fibersPoolSize());
1364 for (size_t i = 0; i < batons.size(); ++i) {
1368 EXPECT_EQ(15, manager.fibersAllocated());
1369 EXPECT_EQ(15, manager.fibersPoolSize());
1371 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1372 evb.loopOnce(); // 10 fibers active in last period
1373 EXPECT_EQ(10, manager.fibersAllocated());
1374 EXPECT_EQ(10, manager.fibersPoolSize());
1376 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1378 EXPECT_EQ(5, manager.fibersAllocated());
1379 EXPECT_EQ(5, manager.fibersPoolSize());
// Baton wait driven by an explicit Baton::TimeoutHandler. A fiber task waits
// on the baton with the handler. After the first loopUntilNoReady() the baton
// is not signalled and no fiber has finished. A 250 ms timeout is then
// scheduled and the test thread sleeps 500 ms; the baton is still not
// signalled and fibersRun is still 0 at that point. After the final
// loopUntilNoReady() exactly one fiber has run.
// NOTE(review): the baton declaration, the increment of `fibersRun`, and
// whatever happens between the second check and the final loop (presumably
// running the event base so the timeout fires) are not visible here — confirm.
1382 TEST(FiberManager, batonWaitTimeoutHandler) {
1383 FiberManager manager(folly::make_unique<EventBaseLoopController>());
1385 folly::EventBase evb;
1386 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1387 .attachEventBase(evb);
1389 size_t fibersRun = 0;
1391 Baton::TimeoutHandler timeoutHandler;
1393 manager.addTask([&]() {
1394 baton.wait(timeoutHandler);
1397 manager.loopUntilNoReady();
1399 EXPECT_FALSE(baton.try_wait());
1400 EXPECT_EQ(0, fibersRun);
1402 timeoutHandler.scheduleTimeout(std::chrono::milliseconds(250));
1403 std::this_thread::sleep_for(std::chrono::milliseconds(500));
1405 EXPECT_FALSE(baton.try_wait());
1406 EXPECT_EQ(0, fibersRun);
1409 manager.loopUntilNoReady();
1411 EXPECT_EQ(1, fibersRun);
// Stress variant of the timeout-handler test: kNumTimeoutTasks (10000) fiber
// tasks each create their own Baton::TimeoutHandler, schedule a 1000 ms
// timeout from a nested fiber task, and wait on a baton with that handler.
// Each task decrements `tasksCount` after its wait returns; the one that
// brings it to zero asks the event base to terminate its loop.
// NOTE(review): the baton declaration and the call that runs the event loop
// are not visible in this listing.
1414 TEST(FiberManager, batonWaitTimeoutMany) {
1415 FiberManager manager(folly::make_unique<EventBaseLoopController>());
1417 folly::EventBase evb;
1418 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1419 .attachEventBase(evb);
1421 constexpr size_t kNumTimeoutTasks = 10000;
1422 size_t tasksCount = kNumTimeoutTasks;
1424 // We add many tasks to hit timeout queue deallocation logic.
1425 for (size_t i = 0; i < kNumTimeoutTasks; ++i) {
1426 manager.addTask([&]() {
1428 Baton::TimeoutHandler timeoutHandler;
1430 folly::fibers::addTask([&] {
1431 timeoutHandler.scheduleTimeout(std::chrono::milliseconds(1000));
1434 baton.wait(timeoutHandler);
1435 if (--tasksCount == 0) {
1436 evb.terminateLoopSoon();
1444 TEST(FiberManager, remoteFutureTest) {
1445 FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
1446 auto& loopController =
1447 dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
1451 auto f1 = fiberManager.addTaskFuture([&]() { return testValue1; });
1452 auto f2 = fiberManager.addTaskRemoteFuture([&]() { return testValue2; });
1453 loopController.loop([&]() { loopController.stop(); });
1457 EXPECT_EQ(v1, testValue1);
1458 EXPECT_EQ(v2, testValue2);
// Checks the values obtained from addTaskFuture() and addTaskRemoteFuture()
// against the values returned by the scheduled tasks.
// NOTE(review): the definitions of testValue1/testValue2 and v1/v2 are not
// visible in this listing.
1444 TEST(FiberManager, remoteFutureTest) {
1445 FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
// The manager was constructed with a SimpleLoopController above; keep a typed
// reference so loop()/stop() can be called on it.
1446 auto& loopController =
1447 dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
// Schedule one task through each API; each task returns its test value.
1451 auto f1 = fiberManager.addTaskFuture([&]() { return testValue1; });
1452 auto f2 = fiberManager.addTaskRemoteFuture([&]() { return testValue2; });
// Run the loop with a loop function that immediately requests a stop.
1453 loopController.loop([&]() { loopController.stop(); });
// v1 / v2 are presumably read from f1 / f2 in the omitted lines -- confirm.
1457 EXPECT_EQ(v1, testValue1);
1458 EXPECT_EQ(v2, testValue2);
1461 // Test that a void function produces a Future<Unit>.
// Schedules two tasks whose lambdas return nothing, one via addTaskFuture()
// and one via addTaskRemoteFuture(), stores both results as
// folly::Future<folly::Unit>, and checks that both tasks ran.
1462 TEST(FiberManager, remoteFutureVoidUnitTest) {
1463 FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
// Typed reference to the SimpleLoopController constructed above.
1464 auto& loopController =
1465 dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
// Flag set by the task added with addTaskFuture().
1467 bool ranLocal = false;
1468 folly::Future<folly::Unit> futureLocal =
1469 fiberManager.addTaskFuture([&]() { ranLocal = true; });
// Flag set by the task added with addTaskRemoteFuture().
1471 bool ranRemote = false;
1472 folly::Future<folly::Unit> futureRemote =
1473 fiberManager.addTaskRemoteFuture([&]() { ranRemote = true; });
// Run the loop with a loop function that immediately requests a stop.
1475 loopController.loop([&]() { loopController.stop(); });
// NOTE(review): a line before this assertion is not visible in this listing.
1478 ASSERT_TRUE(ranLocal);
// Block until the remote future completes before checking its flag.
1480 futureRemote.wait();
1481 ASSERT_TRUE(ranRemote);
// Uses two event bases, each with its own fiber manager obtained through
// getFiberManager(): a task on the outer manager calls runInMainContext(),
// where a task is added to the inner manager.
// NOTE(review): the opening lines of the three comparisons and all closing
// lines are not visible in this listing; only their operands can be seen.
1484 TEST(FiberManager, nestedFiberManagers) {
1485 folly::EventBase outerEvb;
1486 folly::EventBase innerEvb;
// Task scheduled on the fiber manager associated with outerEvb.
1488 getFiberManager(outerEvb).addTask([&]() {
// Operands: address of outerEvb's manager vs. getFiberManagerUnsafe().
1490 &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
// Inside runInMainContext(), schedule a task on innerEvb's fiber manager.
1492 runInMainContext([&]() {
1493 getFiberManager(innerEvb).addTask([&]() {
// Operands: address of innerEvb's manager vs. getFiberManagerUnsafe().
1495 &getFiberManager(innerEvb), FiberManager::getFiberManagerUnsafe());
// Ask the inner event base to leave the loop started by loopForever() below.
1497 innerEvb.terminateLoopSoon();
1500 innerEvb.loopForever();
// Operands: address of outerEvb's manager vs. getFiberManagerUnsafe() again.
1504 &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
// Ask the outer event base to leave the loop started by loopForever() below.
1506 outerEvb.terminateLoopSoon();
1509 outerEvb.loopForever();
// Number of await() calls each benchmark task performs. Written by
// runBenchmark() and read inside the task it schedules.
1512 static size_t sNumAwaits;
// Shared driver for the await benchmarks.
//
// numAwaits: stored in sNumAwaits; each scheduled task calls await() that
//            many times.
// toSend:    countdown captured by reference in the loop function; when it is
//            decremented to zero the loop controller is stopped.
//
// NOTE(review): some lines (closing braces and at least part of the branch
// structure inside the loop function) are not visible in this listing.
1514 void runBenchmark(size_t numAwaits, size_t toSend) {
1515 sNumAwaits = numAwaits;
1517 FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
// Typed reference to the SimpleLoopController constructed above.
1518 auto& loopController =
1519 dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
// Promises handed out by await() in the tasks, fulfilled by the loop function.
1521 std::queue<Promise<int>> pendingRequests;
// Queue size at which the loop function takes the first branch below.
1522 static const size_t maxOutstanding = 5;
1524 auto loop = [&fiberManager, &loopController, &pendingRequests, &toSend]() {
// Taken when the queue holds maxOutstanding promises or nothing is left to
// send.
1525 if (pendingRequests.size() == maxOutstanding || toSend == 0) {
1526 if (pendingRequests.empty()) {
// Fulfil the oldest pending promise with 0 and drop it from the queue.
1529 pendingRequests.front().setValue(0);
1530 pendingRequests.pop();
// Schedule a task that performs sNumAwaits awaits; each await pushes its
// promise onto pendingRequests and the result is checked to be 0.
1532 fiberManager.addTask([&pendingRequests]() {
1533 for (size_t i = 0; i < sNumAwaits; ++i) {
1534 auto result = await([&pendingRequests](Promise<int> promise) {
1535 pendingRequests.push(std::move(promise));
1537 DCHECK_EQ(result, 0);
// Count down the remaining work and stop the loop once it reaches zero.
1541 if (--toSend == 0) {
1542 loopController.stop();
// Hand the loop function to the controller and run it.
1547 loopController.loop(std::move(loop));
// Benchmark: runBenchmark() with one await per task; `iters` is passed as the
// toSend argument.
1550 BENCHMARK(FiberManagerBasicOneAwait, iters) {
1551 runBenchmark(1, iters);
// Benchmark: runBenchmark() with five awaits per task; `iters` is passed as
// the toSend argument.
1554 BENCHMARK(FiberManagerBasicFiveAwaits, iters) {
1555 runBenchmark(5, iters);
// Benchmark: on every iteration, construct a fresh EventBase, obtain its fiber
// manager through getFiberManager() and add one empty task to it.
// NOTE(review): the lines after addTask() are not visible in this listing.
1558 BENCHMARK(FiberManagerCreateDestroy, iters) {
1559 for (size_t i = 0; i < iters; ++i) {
1560 folly::EventBase evb;
1561 auto& fm = folly::fibers::getFiberManager(evb);
1562 fm.addTask([]() {});
// Benchmark: with maxFibersPoolSize set to 0, add kNumAllocations tasks per
// iteration, calling loopUntilNoReady() right after each addTask().
1567 BENCHMARK(FiberManagerAllocateDeallocatePattern, iters) {
1568 static const size_t kNumAllocations = 10000;
1570 FiberManager::Options opts;
1571 opts.maxFibersPoolSize = 0;
1573 FiberManager fiberManager(folly::make_unique<SimpleLoopController>(), opts);
1575 for (size_t iter = 0; iter < iters; ++iter) {
// The pool is expected to be empty at the start of every iteration.
1576 EXPECT_EQ(0, fiberManager.fibersPoolSize());
// Incremented once by every task that runs.
1578 size_t fibersRun = 0;
1580 for (size_t i = 0; i < kNumAllocations; ++i) {
1581 fiberManager.addTask([&fibersRun] { ++fibersRun; });
1582 fiberManager.loopUntilNoReady();
// The literal 10000 mirrors kNumAllocations; the pool is expected to be
// empty again.
1585 EXPECT_EQ(10000, fibersRun);
1586 EXPECT_EQ(0, fiberManager.fibersPoolSize());
// Benchmark: with maxFibersPoolSize set to 0, add kNumAllocations tasks per
// iteration and call loopUntilNoReady().
// NOTE(review): the lines between the addTask() loop and loopUntilNoReady()
// are not visible here; loopUntilNoReady() appears to run once after all tasks
// have been queued -- confirm.
1590 BENCHMARK(FiberManagerAllocateLargeChunk, iters) {
1591 static const size_t kNumAllocations = 10000;
1593 FiberManager::Options opts;
1594 opts.maxFibersPoolSize = 0;
1596 FiberManager fiberManager(folly::make_unique<SimpleLoopController>(), opts);
1598 for (size_t iter = 0; iter < iters; ++iter) {
// The pool is expected to be empty at the start of every iteration.
1599 EXPECT_EQ(0, fiberManager.fibersPoolSize());
// Incremented once by every task that runs.
1601 size_t fibersRun = 0;
1603 for (size_t i = 0; i < kNumAllocations; ++i) {
1604 fiberManager.addTask([&fibersRun] { ++fibersRun; });
1607 fiberManager.loopUntilNoReady();
// The literal 10000 mirrors kNumAllocations; the pool is expected to be
// empty again.
1609 EXPECT_EQ(10000, fibersRun);
1610 EXPECT_EQ(0, fiberManager.fibersPoolSize());