Make RequestContext provider overridable in order to save cost of setContext() on...
[folly.git] / folly / fibers / test / FibersTest.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <atomic>
17 #include <thread>
18 #include <vector>
19
20 #include <folly/Memory.h>
21 #include <folly/Random.h>
22 #include <folly/futures/Future.h>
23
24 #include <folly/Conv.h>
25 #include <folly/fibers/AddTasks.h>
26 #include <folly/fibers/AtomicBatchDispatcher.h>
27 #include <folly/fibers/BatchDispatcher.h>
28 #include <folly/fibers/EventBaseLoopController.h>
29 #include <folly/fibers/FiberManager.h>
30 #include <folly/fibers/FiberManagerMap.h>
31 #include <folly/fibers/GenericBaton.h>
32 #include <folly/fibers/Semaphore.h>
33 #include <folly/fibers/SimpleLoopController.h>
34 #include <folly/fibers/TimedMutex.h>
35 #include <folly/fibers/WhenN.h>
36 #include <folly/io/async/Request.h>
37 #include <folly/io/async/ScopedEventBaseThread.h>
38 #include <folly/portability/GTest.h>
39
40 using namespace folly::fibers;
41
42 using folly::Try;
43
44 TEST(FiberManager, batonTimedWaitTimeout) {
45   bool taskAdded = false;
46   size_t iterations = 0;
47
48   FiberManager manager(std::make_unique<SimpleLoopController>());
49   auto& loopController =
50       dynamic_cast<SimpleLoopController&>(manager.loopController());
51
52   auto loopFunc = [&]() {
53     if (!taskAdded) {
54       manager.addTask([&]() {
55         Baton baton;
56
57         auto res = baton.timed_wait(std::chrono::milliseconds(230));
58
59         EXPECT_FALSE(res);
60         EXPECT_EQ(5, iterations);
61
62         loopController.stop();
63       });
64       manager.addTask([&]() {
65         Baton baton;
66
67         auto res = baton.timed_wait(std::chrono::milliseconds(130));
68
69         EXPECT_FALSE(res);
70         EXPECT_EQ(3, iterations);
71
72         loopController.stop();
73       });
74       taskAdded = true;
75     } else {
76       std::this_thread::sleep_for(std::chrono::milliseconds(50));
77       iterations++;
78     }
79   };
80
81   loopController.loop(std::move(loopFunc));
82 }
83
84 TEST(FiberManager, batonTimedWaitPost) {
85   bool taskAdded = false;
86   size_t iterations = 0;
87   Baton* baton_ptr;
88
89   FiberManager manager(std::make_unique<SimpleLoopController>());
90   auto& loopController =
91       dynamic_cast<SimpleLoopController&>(manager.loopController());
92
93   auto loopFunc = [&]() {
94     if (!taskAdded) {
95       manager.addTask([&]() {
96         Baton baton;
97         baton_ptr = &baton;
98
99         auto res = baton.timed_wait(std::chrono::milliseconds(130));
100
101         EXPECT_TRUE(res);
102         EXPECT_EQ(2, iterations);
103
104         loopController.stop();
105       });
106       taskAdded = true;
107     } else {
108       std::this_thread::sleep_for(std::chrono::milliseconds(50));
109       iterations++;
110       if (iterations == 2) {
111         baton_ptr->post();
112       }
113     }
114   };
115
116   loopController.loop(std::move(loopFunc));
117 }
118
119 TEST(FiberManager, batonTimedWaitTimeoutEvb) {
120   size_t tasksComplete = 0;
121
122   folly::EventBase evb;
123
124   FiberManager manager(std::make_unique<EventBaseLoopController>());
125   dynamic_cast<EventBaseLoopController&>(manager.loopController())
126       .attachEventBase(evb);
127
128   auto task = [&](size_t timeout_ms) {
129     Baton baton;
130
131     auto start = EventBaseLoopController::Clock::now();
132     auto res = baton.timed_wait(std::chrono::milliseconds(timeout_ms));
133     auto finish = EventBaseLoopController::Clock::now();
134
135     EXPECT_FALSE(res);
136
137     auto duration_ms =
138         std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
139
140     EXPECT_GT(duration_ms.count(), timeout_ms - 50);
141     EXPECT_LT(duration_ms.count(), timeout_ms + 50);
142
143     if (++tasksComplete == 2) {
144       evb.terminateLoopSoon();
145     }
146   };
147
148   evb.runInEventBaseThread([&]() {
149     manager.addTask([&]() { task(500); });
150     manager.addTask([&]() { task(250); });
151   });
152
153   evb.loopForever();
154
155   EXPECT_EQ(2, tasksComplete);
156 }
157
158 TEST(FiberManager, batonTimedWaitPostEvb) {
159   size_t tasksComplete = 0;
160
161   folly::EventBase evb;
162
163   FiberManager manager(std::make_unique<EventBaseLoopController>());
164   dynamic_cast<EventBaseLoopController&>(manager.loopController())
165       .attachEventBase(evb);
166
167   evb.runInEventBaseThread([&]() {
168     manager.addTask([&]() {
169       Baton baton;
170
171       evb.tryRunAfterDelay([&]() { baton.post(); }, 100);
172
173       auto start = EventBaseLoopController::Clock::now();
174       auto res = baton.timed_wait(std::chrono::milliseconds(130));
175       auto finish = EventBaseLoopController::Clock::now();
176
177       EXPECT_TRUE(res);
178
179       auto duration_ms =
180           std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
181
182       EXPECT_TRUE(duration_ms.count() > 95 && duration_ms.count() < 110);
183
184       if (++tasksComplete == 1) {
185         evb.terminateLoopSoon();
186       }
187     });
188   });
189
190   evb.loopForever();
191
192   EXPECT_EQ(1, tasksComplete);
193 }
194
195 TEST(FiberManager, batonTryWait) {
196   FiberManager manager(std::make_unique<SimpleLoopController>());
197
198   // Check if try_wait and post work as expected
199   Baton b;
200
201   manager.addTask([&]() {
202     while (!b.try_wait()) {
203     }
204   });
205   auto thr = std::thread([&]() {
206     std::this_thread::sleep_for(std::chrono::milliseconds(300));
207     b.post();
208   });
209
210   manager.loopUntilNoReady();
211   thr.join();
212
213   Baton c;
214
215   // Check try_wait without post
216   manager.addTask([&]() {
217     int cnt = 100;
218     while (cnt && !c.try_wait()) {
219       cnt--;
220     }
221     EXPECT_TRUE(!c.try_wait()); // must still hold
222     EXPECT_EQ(cnt, 0);
223   });
224
225   manager.loopUntilNoReady();
226 }
227
228 TEST(FiberManager, genericBatonFiberWait) {
229   FiberManager manager(std::make_unique<SimpleLoopController>());
230
231   GenericBaton b;
232   bool fiberRunning = false;
233
234   manager.addTask([&]() {
235     EXPECT_EQ(manager.hasActiveFiber(), true);
236     fiberRunning = true;
237     b.wait();
238     fiberRunning = false;
239   });
240
241   EXPECT_FALSE(fiberRunning);
242   manager.loopUntilNoReady();
243   EXPECT_TRUE(fiberRunning); // ensure fiber still active
244
245   auto thr = std::thread([&]() {
246     std::this_thread::sleep_for(std::chrono::milliseconds(300));
247     b.post();
248   });
249
250   while (fiberRunning) {
251     manager.loopUntilNoReady();
252   }
253
254   thr.join();
255 }
256
257 TEST(FiberManager, genericBatonThreadWait) {
258   FiberManager manager(std::make_unique<SimpleLoopController>());
259   GenericBaton b;
260   std::atomic<bool> threadWaiting(false);
261
262   auto thr = std::thread([&]() {
263     threadWaiting = true;
264     b.wait();
265     threadWaiting = false;
266   });
267
268   while (!threadWaiting) {
269   }
270   std::this_thread::sleep_for(std::chrono::milliseconds(300));
271
272   manager.addTask([&]() {
273     EXPECT_EQ(manager.hasActiveFiber(), true);
274     EXPECT_TRUE(threadWaiting);
275     b.post();
276     while (threadWaiting) {
277     }
278   });
279
280   manager.loopUntilNoReady();
281   thr.join();
282 }
283
284 TEST(FiberManager, addTasksNoncopyable) {
285   std::vector<Promise<int>> pendingFibers;
286   bool taskAdded = false;
287
288   FiberManager manager(std::make_unique<SimpleLoopController>());
289   auto& loopController =
290       dynamic_cast<SimpleLoopController&>(manager.loopController());
291
292   auto loopFunc = [&]() {
293     if (!taskAdded) {
294       manager.addTask([&]() {
295         std::vector<std::function<std::unique_ptr<int>()>> funcs;
296         for (int i = 0; i < 3; ++i) {
297           funcs.push_back([i, &pendingFibers]() {
298             await([&pendingFibers](Promise<int> promise) {
299               pendingFibers.push_back(std::move(promise));
300             });
301             return std::make_unique<int>(i * 2 + 1);
302           });
303         }
304
305         auto iter = addTasks(funcs.begin(), funcs.end());
306
307         size_t n = 0;
308         while (iter.hasNext()) {
309           auto result = iter.awaitNext();
310           EXPECT_EQ(2 * iter.getTaskID() + 1, *result);
311           EXPECT_GE(2 - n, pendingFibers.size());
312           ++n;
313         }
314         EXPECT_EQ(3, n);
315       });
316       taskAdded = true;
317     } else if (pendingFibers.size()) {
318       pendingFibers.back().setValue(0);
319       pendingFibers.pop_back();
320     } else {
321       loopController.stop();
322     }
323   };
324
325   loopController.loop(std::move(loopFunc));
326 }
327
328 TEST(FiberManager, awaitThrow) {
329   folly::EventBase evb;
330   struct ExpectedException {};
331   getFiberManager(evb)
332       .addTaskFuture([&] {
333         EXPECT_THROW(
334             await([](Promise<int> p) {
335               p.setValue(42);
336               throw ExpectedException();
337             }),
338             ExpectedException);
339
340         EXPECT_THROW(
341             await([&](Promise<int> p) {
342               evb.runInEventBaseThread([p = std::move(p)]() mutable {
343                 p.setValue(42);
344               });
345               throw ExpectedException();
346             }),
347             ExpectedException);
348       })
349       .waitVia(&evb);
350 }
351
352 TEST(FiberManager, addTasksThrow) {
353   std::vector<Promise<int>> pendingFibers;
354   bool taskAdded = false;
355
356   FiberManager manager(std::make_unique<SimpleLoopController>());
357   auto& loopController =
358       dynamic_cast<SimpleLoopController&>(manager.loopController());
359
360   auto loopFunc = [&]() {
361     if (!taskAdded) {
362       manager.addTask([&]() {
363         std::vector<std::function<int()>> funcs;
364         for (size_t i = 0; i < 3; ++i) {
365           funcs.push_back([i, &pendingFibers]() {
366             await([&pendingFibers](Promise<int> promise) {
367               pendingFibers.push_back(std::move(promise));
368             });
369             if (i % 2 == 0) {
370               throw std::runtime_error("Runtime");
371             }
372             return i * 2 + 1;
373           });
374         }
375
376         auto iter = addTasks(funcs.begin(), funcs.end());
377
378         size_t n = 0;
379         while (iter.hasNext()) {
380           try {
381             int result = iter.awaitNext();
382             EXPECT_EQ(1, iter.getTaskID() % 2);
383             EXPECT_EQ(2 * iter.getTaskID() + 1, result);
384           } catch (...) {
385             EXPECT_EQ(0, iter.getTaskID() % 2);
386           }
387           EXPECT_GE(2 - n, pendingFibers.size());
388           ++n;
389         }
390         EXPECT_EQ(3, n);
391       });
392       taskAdded = true;
393     } else if (pendingFibers.size()) {
394       pendingFibers.back().setValue(0);
395       pendingFibers.pop_back();
396     } else {
397       loopController.stop();
398     }
399   };
400
401   loopController.loop(std::move(loopFunc));
402 }
403
404 TEST(FiberManager, addTasksVoid) {
405   std::vector<Promise<int>> pendingFibers;
406   bool taskAdded = false;
407
408   FiberManager manager(std::make_unique<SimpleLoopController>());
409   auto& loopController =
410       dynamic_cast<SimpleLoopController&>(manager.loopController());
411
412   auto loopFunc = [&]() {
413     if (!taskAdded) {
414       manager.addTask([&]() {
415         std::vector<std::function<void()>> funcs;
416         for (size_t i = 0; i < 3; ++i) {
417           funcs.push_back([&pendingFibers]() {
418             await([&pendingFibers](Promise<int> promise) {
419               pendingFibers.push_back(std::move(promise));
420             });
421           });
422         }
423
424         auto iter = addTasks(funcs.begin(), funcs.end());
425
426         size_t n = 0;
427         while (iter.hasNext()) {
428           iter.awaitNext();
429           EXPECT_GE(2 - n, pendingFibers.size());
430           ++n;
431         }
432         EXPECT_EQ(3, n);
433       });
434       taskAdded = true;
435     } else if (pendingFibers.size()) {
436       pendingFibers.back().setValue(0);
437       pendingFibers.pop_back();
438     } else {
439       loopController.stop();
440     }
441   };
442
443   loopController.loop(std::move(loopFunc));
444 }
445
446 TEST(FiberManager, addTasksVoidThrow) {
447   std::vector<Promise<int>> pendingFibers;
448   bool taskAdded = false;
449
450   FiberManager manager(std::make_unique<SimpleLoopController>());
451   auto& loopController =
452       dynamic_cast<SimpleLoopController&>(manager.loopController());
453
454   auto loopFunc = [&]() {
455     if (!taskAdded) {
456       manager.addTask([&]() {
457         std::vector<std::function<void()>> funcs;
458         for (size_t i = 0; i < 3; ++i) {
459           funcs.push_back([i, &pendingFibers]() {
460             await([&pendingFibers](Promise<int> promise) {
461               pendingFibers.push_back(std::move(promise));
462             });
463             if (i % 2 == 0) {
464               throw std::runtime_error("");
465             }
466           });
467         }
468
469         auto iter = addTasks(funcs.begin(), funcs.end());
470
471         size_t n = 0;
472         while (iter.hasNext()) {
473           try {
474             iter.awaitNext();
475             EXPECT_EQ(1, iter.getTaskID() % 2);
476           } catch (...) {
477             EXPECT_EQ(0, iter.getTaskID() % 2);
478           }
479           EXPECT_GE(2 - n, pendingFibers.size());
480           ++n;
481         }
482         EXPECT_EQ(3, n);
483       });
484       taskAdded = true;
485     } else if (pendingFibers.size()) {
486       pendingFibers.back().setValue(0);
487       pendingFibers.pop_back();
488     } else {
489       loopController.stop();
490     }
491   };
492
493   loopController.loop(std::move(loopFunc));
494 }
495
496 TEST(FiberManager, addTasksReserve) {
497   std::vector<Promise<int>> pendingFibers;
498   bool taskAdded = false;
499
500   FiberManager manager(std::make_unique<SimpleLoopController>());
501   auto& loopController =
502       dynamic_cast<SimpleLoopController&>(manager.loopController());
503
504   auto loopFunc = [&]() {
505     if (!taskAdded) {
506       manager.addTask([&]() {
507         std::vector<std::function<void()>> funcs;
508         for (size_t i = 0; i < 3; ++i) {
509           funcs.push_back([&pendingFibers]() {
510             await([&pendingFibers](Promise<int> promise) {
511               pendingFibers.push_back(std::move(promise));
512             });
513           });
514         }
515
516         auto iter = addTasks(funcs.begin(), funcs.end());
517
518         iter.reserve(2);
519         EXPECT_TRUE(iter.hasCompleted());
520         EXPECT_TRUE(iter.hasPending());
521         EXPECT_TRUE(iter.hasNext());
522
523         iter.awaitNext();
524         EXPECT_TRUE(iter.hasCompleted());
525         EXPECT_TRUE(iter.hasPending());
526         EXPECT_TRUE(iter.hasNext());
527
528         iter.awaitNext();
529         EXPECT_FALSE(iter.hasCompleted());
530         EXPECT_TRUE(iter.hasPending());
531         EXPECT_TRUE(iter.hasNext());
532
533         iter.awaitNext();
534         EXPECT_FALSE(iter.hasCompleted());
535         EXPECT_FALSE(iter.hasPending());
536         EXPECT_FALSE(iter.hasNext());
537       });
538       taskAdded = true;
539     } else if (pendingFibers.size()) {
540       pendingFibers.back().setValue(0);
541       pendingFibers.pop_back();
542     } else {
543       loopController.stop();
544     }
545   };
546
547   loopController.loop(std::move(loopFunc));
548 }
549
550 TEST(FiberManager, addTaskDynamic) {
551   folly::EventBase evb;
552
553   Baton batons[3];
554
555   auto makeTask = [&](size_t taskId) {
556     return [&, taskId]() -> size_t {
557       batons[taskId].wait();
558       return taskId;
559     };
560   };
561
562   getFiberManager(evb)
563       .addTaskFuture([&]() {
564         TaskIterator<size_t> iterator;
565
566         iterator.addTask(makeTask(0));
567         iterator.addTask(makeTask(1));
568
569         batons[1].post();
570
571         EXPECT_EQ(1, iterator.awaitNext());
572
573         iterator.addTask(makeTask(2));
574
575         batons[2].post();
576
577         EXPECT_EQ(2, iterator.awaitNext());
578
579         batons[0].post();
580
581         EXPECT_EQ(0, iterator.awaitNext());
582       })
583       .waitVia(&evb);
584 }
585
586 TEST(FiberManager, forEach) {
587   std::vector<Promise<int>> pendingFibers;
588   bool taskAdded = false;
589
590   FiberManager manager(std::make_unique<SimpleLoopController>());
591   auto& loopController =
592       dynamic_cast<SimpleLoopController&>(manager.loopController());
593
594   auto loopFunc = [&]() {
595     if (!taskAdded) {
596       manager.addTask([&]() {
597         std::vector<std::function<int()>> funcs;
598         for (size_t i = 0; i < 3; ++i) {
599           funcs.push_back([i, &pendingFibers]() {
600             await([&pendingFibers](Promise<int> promise) {
601               pendingFibers.push_back(std::move(promise));
602             });
603             return i * 2 + 1;
604           });
605         }
606
607         std::vector<std::pair<size_t, int>> results;
608         forEach(funcs.begin(), funcs.end(), [&results](size_t id, int result) {
609           results.emplace_back(id, result);
610         });
611         EXPECT_EQ(3, results.size());
612         EXPECT_TRUE(pendingFibers.empty());
613         for (size_t i = 0; i < 3; ++i) {
614           EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
615         }
616       });
617       taskAdded = true;
618     } else if (pendingFibers.size()) {
619       pendingFibers.back().setValue(0);
620       pendingFibers.pop_back();
621     } else {
622       loopController.stop();
623     }
624   };
625
626   loopController.loop(std::move(loopFunc));
627 }
628
629 TEST(FiberManager, collectN) {
630   std::vector<Promise<int>> pendingFibers;
631   bool taskAdded = false;
632
633   FiberManager manager(std::make_unique<SimpleLoopController>());
634   auto& loopController =
635       dynamic_cast<SimpleLoopController&>(manager.loopController());
636
637   auto loopFunc = [&]() {
638     if (!taskAdded) {
639       manager.addTask([&]() {
640         std::vector<std::function<int()>> funcs;
641         for (size_t i = 0; i < 3; ++i) {
642           funcs.push_back([i, &pendingFibers]() {
643             await([&pendingFibers](Promise<int> promise) {
644               pendingFibers.push_back(std::move(promise));
645             });
646             return i * 2 + 1;
647           });
648         }
649
650         auto results = collectN(funcs.begin(), funcs.end(), 2);
651         EXPECT_EQ(2, results.size());
652         EXPECT_EQ(1, pendingFibers.size());
653         for (size_t i = 0; i < 2; ++i) {
654           EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
655         }
656       });
657       taskAdded = true;
658     } else if (pendingFibers.size()) {
659       pendingFibers.back().setValue(0);
660       pendingFibers.pop_back();
661     } else {
662       loopController.stop();
663     }
664   };
665
666   loopController.loop(std::move(loopFunc));
667 }
668
669 TEST(FiberManager, collectNThrow) {
670   std::vector<Promise<int>> pendingFibers;
671   bool taskAdded = false;
672
673   FiberManager manager(std::make_unique<SimpleLoopController>());
674   auto& loopController =
675       dynamic_cast<SimpleLoopController&>(manager.loopController());
676
677   auto loopFunc = [&]() {
678     if (!taskAdded) {
679       manager.addTask([&]() {
680         std::vector<std::function<int()>> funcs;
681         for (size_t i = 0; i < 3; ++i) {
682           funcs.push_back([&pendingFibers]() -> size_t {
683             await([&pendingFibers](Promise<int> promise) {
684               pendingFibers.push_back(std::move(promise));
685             });
686             throw std::runtime_error("Runtime");
687           });
688         }
689
690         try {
691           collectN(funcs.begin(), funcs.end(), 2);
692         } catch (...) {
693           EXPECT_EQ(1, pendingFibers.size());
694         }
695       });
696       taskAdded = true;
697     } else if (pendingFibers.size()) {
698       pendingFibers.back().setValue(0);
699       pendingFibers.pop_back();
700     } else {
701       loopController.stop();
702     }
703   };
704
705   loopController.loop(std::move(loopFunc));
706 }
707
708 TEST(FiberManager, collectNVoid) {
709   std::vector<Promise<int>> pendingFibers;
710   bool taskAdded = false;
711
712   FiberManager manager(std::make_unique<SimpleLoopController>());
713   auto& loopController =
714       dynamic_cast<SimpleLoopController&>(manager.loopController());
715
716   auto loopFunc = [&]() {
717     if (!taskAdded) {
718       manager.addTask([&]() {
719         std::vector<std::function<void()>> funcs;
720         for (size_t i = 0; i < 3; ++i) {
721           funcs.push_back([&pendingFibers]() {
722             await([&pendingFibers](Promise<int> promise) {
723               pendingFibers.push_back(std::move(promise));
724             });
725           });
726         }
727
728         auto results = collectN(funcs.begin(), funcs.end(), 2);
729         EXPECT_EQ(2, results.size());
730         EXPECT_EQ(1, pendingFibers.size());
731       });
732       taskAdded = true;
733     } else if (pendingFibers.size()) {
734       pendingFibers.back().setValue(0);
735       pendingFibers.pop_back();
736     } else {
737       loopController.stop();
738     }
739   };
740
741   loopController.loop(std::move(loopFunc));
742 }
743
744 TEST(FiberManager, collectNVoidThrow) {
745   std::vector<Promise<int>> pendingFibers;
746   bool taskAdded = false;
747
748   FiberManager manager(std::make_unique<SimpleLoopController>());
749   auto& loopController =
750       dynamic_cast<SimpleLoopController&>(manager.loopController());
751
752   auto loopFunc = [&]() {
753     if (!taskAdded) {
754       manager.addTask([&]() {
755         std::vector<std::function<void()>> funcs;
756         for (size_t i = 0; i < 3; ++i) {
757           funcs.push_back([&pendingFibers]() {
758             await([&pendingFibers](Promise<int> promise) {
759               pendingFibers.push_back(std::move(promise));
760             });
761             throw std::runtime_error("Runtime");
762           });
763         }
764
765         try {
766           collectN(funcs.begin(), funcs.end(), 2);
767         } catch (...) {
768           EXPECT_EQ(1, pendingFibers.size());
769         }
770       });
771       taskAdded = true;
772     } else if (pendingFibers.size()) {
773       pendingFibers.back().setValue(0);
774       pendingFibers.pop_back();
775     } else {
776       loopController.stop();
777     }
778   };
779
780   loopController.loop(std::move(loopFunc));
781 }
782
783 TEST(FiberManager, collectAll) {
784   std::vector<Promise<int>> pendingFibers;
785   bool taskAdded = false;
786
787   FiberManager manager(std::make_unique<SimpleLoopController>());
788   auto& loopController =
789       dynamic_cast<SimpleLoopController&>(manager.loopController());
790
791   auto loopFunc = [&]() {
792     if (!taskAdded) {
793       manager.addTask([&]() {
794         std::vector<std::function<int()>> funcs;
795         for (size_t i = 0; i < 3; ++i) {
796           funcs.push_back([i, &pendingFibers]() {
797             await([&pendingFibers](Promise<int> promise) {
798               pendingFibers.push_back(std::move(promise));
799             });
800             return i * 2 + 1;
801           });
802         }
803
804         auto results = collectAll(funcs.begin(), funcs.end());
805         EXPECT_TRUE(pendingFibers.empty());
806         for (size_t i = 0; i < 3; ++i) {
807           EXPECT_EQ(i * 2 + 1, results[i]);
808         }
809       });
810       taskAdded = true;
811     } else if (pendingFibers.size()) {
812       pendingFibers.back().setValue(0);
813       pendingFibers.pop_back();
814     } else {
815       loopController.stop();
816     }
817   };
818
819   loopController.loop(std::move(loopFunc));
820 }
821
822 TEST(FiberManager, collectAllVoid) {
823   std::vector<Promise<int>> pendingFibers;
824   bool taskAdded = false;
825
826   FiberManager manager(std::make_unique<SimpleLoopController>());
827   auto& loopController =
828       dynamic_cast<SimpleLoopController&>(manager.loopController());
829
830   auto loopFunc = [&]() {
831     if (!taskAdded) {
832       manager.addTask([&]() {
833         std::vector<std::function<void()>> funcs;
834         for (size_t i = 0; i < 3; ++i) {
835           funcs.push_back([&pendingFibers]() {
836             await([&pendingFibers](Promise<int> promise) {
837               pendingFibers.push_back(std::move(promise));
838             });
839           });
840         }
841
842         collectAll(funcs.begin(), funcs.end());
843         EXPECT_TRUE(pendingFibers.empty());
844       });
845       taskAdded = true;
846     } else if (pendingFibers.size()) {
847       pendingFibers.back().setValue(0);
848       pendingFibers.pop_back();
849     } else {
850       loopController.stop();
851     }
852   };
853
854   loopController.loop(std::move(loopFunc));
855 }
856
857 TEST(FiberManager, collectAny) {
858   std::vector<Promise<int>> pendingFibers;
859   bool taskAdded = false;
860
861   FiberManager manager(std::make_unique<SimpleLoopController>());
862   auto& loopController =
863       dynamic_cast<SimpleLoopController&>(manager.loopController());
864
865   auto loopFunc = [&]() {
866     if (!taskAdded) {
867       manager.addTask([&]() {
868         std::vector<std::function<int()>> funcs;
869         for (size_t i = 0; i < 3; ++i) {
870           funcs.push_back([i, &pendingFibers]() {
871             await([&pendingFibers](Promise<int> promise) {
872               pendingFibers.push_back(std::move(promise));
873             });
874             if (i == 1) {
875               throw std::runtime_error("This exception will be ignored");
876             }
877             return i * 2 + 1;
878           });
879         }
880
881         auto result = collectAny(funcs.begin(), funcs.end());
882         EXPECT_EQ(2, pendingFibers.size());
883         EXPECT_EQ(2, result.first);
884         EXPECT_EQ(2 * 2 + 1, result.second);
885       });
886       taskAdded = true;
887     } else if (pendingFibers.size()) {
888       pendingFibers.back().setValue(0);
889       pendingFibers.pop_back();
890     } else {
891       loopController.stop();
892     }
893   };
894
895   loopController.loop(std::move(loopFunc));
896 }
897
898 namespace {
899 /* Checks that this function was run from a main context,
900    by comparing an address on a stack to a known main stack address
901    and a known related fiber stack address.  The assumption
902    is that fiber stack and main stack will be far enough apart,
903    while any two values on the same stack will be close. */
904 void expectMainContext(bool& ran, int* mainLocation, int* fiberLocation) {
905   int here;
906   /* 2 pages is a good guess */
907   constexpr ssize_t DISTANCE = 0x2000 / sizeof(int);
908   if (fiberLocation) {
909     EXPECT_TRUE(std::abs(&here - fiberLocation) > DISTANCE);
910   }
911   if (mainLocation) {
912     EXPECT_TRUE(std::abs(&here - mainLocation) < DISTANCE);
913   }
914
915   EXPECT_FALSE(ran);
916   ran = true;
917 }
918 }
919
920 TEST(FiberManager, runInMainContext) {
921   FiberManager manager(std::make_unique<SimpleLoopController>());
922   auto& loopController =
923       dynamic_cast<SimpleLoopController&>(manager.loopController());
924
925   bool checkRan = false;
926
927   int mainLocation;
928   manager.runInMainContext(
929       [&]() { expectMainContext(checkRan, &mainLocation, nullptr); });
930   EXPECT_TRUE(checkRan);
931
932   checkRan = false;
933
934   manager.addTask([&]() {
935     struct A {
936       explicit A(int value_) : value(value_) {}
937       A(const A&) = delete;
938       A(A&&) = default;
939
940       int value;
941     };
942     int stackLocation;
943     auto ret = runInMainContext([&]() {
944       expectMainContext(checkRan, &mainLocation, &stackLocation);
945       return A(42);
946     });
947     EXPECT_TRUE(checkRan);
948     EXPECT_EQ(42, ret.value);
949   });
950
951   loopController.loop([&]() { loopController.stop(); });
952
953   EXPECT_TRUE(checkRan);
954 }
955
956 TEST(FiberManager, addTaskFinally) {
957   FiberManager manager(std::make_unique<SimpleLoopController>());
958   auto& loopController =
959       dynamic_cast<SimpleLoopController&>(manager.loopController());
960
961   bool checkRan = false;
962
963   int mainLocation;
964
965   manager.addTaskFinally(
966       [&]() { return 1234; },
967       [&](Try<int>&& result) {
968         EXPECT_EQ(result.value(), 1234);
969
970         expectMainContext(checkRan, &mainLocation, nullptr);
971       });
972
973   EXPECT_FALSE(checkRan);
974
975   loopController.loop([&]() { loopController.stop(); });
976
977   EXPECT_TRUE(checkRan);
978 }
979
980 TEST(FiberManager, fibersPoolWithinLimit) {
981   FiberManager::Options opts;
982   opts.maxFibersPoolSize = 5;
983
984   FiberManager manager(std::make_unique<SimpleLoopController>(), opts);
985   auto& loopController =
986       dynamic_cast<SimpleLoopController&>(manager.loopController());
987
988   size_t fibersRun = 0;
989
990   for (size_t i = 0; i < 5; ++i) {
991     manager.addTask([&]() { ++fibersRun; });
992   }
993   loopController.loop([&]() { loopController.stop(); });
994
995   EXPECT_EQ(5, fibersRun);
996   EXPECT_EQ(5, manager.fibersAllocated());
997   EXPECT_EQ(5, manager.fibersPoolSize());
998
999   for (size_t i = 0; i < 5; ++i) {
1000     manager.addTask([&]() { ++fibersRun; });
1001   }
1002   loopController.loop([&]() { loopController.stop(); });
1003
1004   EXPECT_EQ(10, fibersRun);
1005   EXPECT_EQ(5, manager.fibersAllocated());
1006   EXPECT_EQ(5, manager.fibersPoolSize());
1007 }
1008
1009 TEST(FiberManager, fibersPoolOverLimit) {
1010   FiberManager::Options opts;
1011   opts.maxFibersPoolSize = 5;
1012
1013   FiberManager manager(std::make_unique<SimpleLoopController>(), opts);
1014   auto& loopController =
1015       dynamic_cast<SimpleLoopController&>(manager.loopController());
1016
1017   size_t fibersRun = 0;
1018
1019   for (size_t i = 0; i < 10; ++i) {
1020     manager.addTask([&]() { ++fibersRun; });
1021   }
1022
1023   EXPECT_EQ(0, fibersRun);
1024   EXPECT_EQ(10, manager.fibersAllocated());
1025   EXPECT_EQ(0, manager.fibersPoolSize());
1026
1027   loopController.loop([&]() { loopController.stop(); });
1028
1029   EXPECT_EQ(10, fibersRun);
1030   EXPECT_EQ(5, manager.fibersAllocated());
1031   EXPECT_EQ(5, manager.fibersPoolSize());
1032 }
1033
1034 TEST(FiberManager, remoteFiberBasic) {
1035   FiberManager manager(std::make_unique<SimpleLoopController>());
1036   auto& loopController =
1037       dynamic_cast<SimpleLoopController&>(manager.loopController());
1038
1039   int result[2];
1040   result[0] = result[1] = 0;
1041   folly::Optional<Promise<int>> savedPromise[2];
1042   manager.addTask([&]() {
1043     result[0] = await(
1044         [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1045   });
1046   manager.addTask([&]() {
1047     result[1] = await(
1048         [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1049   });
1050
1051   manager.loopUntilNoReady();
1052
1053   EXPECT_TRUE(savedPromise[0].hasValue());
1054   EXPECT_TRUE(savedPromise[1].hasValue());
1055   EXPECT_EQ(0, result[0]);
1056   EXPECT_EQ(0, result[1]);
1057
1058   std::thread remoteThread0{[&]() { savedPromise[0]->setValue(42); }};
1059   std::thread remoteThread1{[&]() { savedPromise[1]->setValue(43); }};
1060   remoteThread0.join();
1061   remoteThread1.join();
1062   EXPECT_EQ(0, result[0]);
1063   EXPECT_EQ(0, result[1]);
1064   /* Should only have scheduled once */
1065   EXPECT_EQ(1, loopController.remoteScheduleCalled());
1066
1067   manager.loopUntilNoReady();
1068   EXPECT_EQ(42, result[0]);
1069   EXPECT_EQ(43, result[1]);
1070 }
1071
1072 TEST(FiberManager, addTaskRemoteBasic) {
1073   FiberManager manager(std::make_unique<SimpleLoopController>());
1074
1075   int result[2];
1076   result[0] = result[1] = 0;
1077   folly::Optional<Promise<int>> savedPromise[2];
1078
1079   std::thread remoteThread0{[&]() {
1080     manager.addTaskRemote([&]() {
1081       result[0] = await(
1082           [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1083     });
1084   }};
1085   std::thread remoteThread1{[&]() {
1086     manager.addTaskRemote([&]() {
1087       result[1] = await(
1088           [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1089     });
1090   }};
1091   remoteThread0.join();
1092   remoteThread1.join();
1093
1094   manager.loopUntilNoReady();
1095
1096   EXPECT_TRUE(savedPromise[0].hasValue());
1097   EXPECT_TRUE(savedPromise[1].hasValue());
1098   EXPECT_EQ(0, result[0]);
1099   EXPECT_EQ(0, result[1]);
1100
1101   savedPromise[0]->setValue(42);
1102   savedPromise[1]->setValue(43);
1103
1104   EXPECT_EQ(0, result[0]);
1105   EXPECT_EQ(0, result[1]);
1106
1107   manager.loopUntilNoReady();
1108   EXPECT_EQ(42, result[0]);
1109   EXPECT_EQ(43, result[1]);
1110 }
1111
1112 TEST(FiberManager, remoteHasTasks) {
1113   size_t counter = 0;
1114   FiberManager fm(std::make_unique<SimpleLoopController>());
1115   std::thread remote([&]() { fm.addTaskRemote([&]() { ++counter; }); });
1116
1117   remote.join();
1118
1119   while (fm.hasTasks()) {
1120     fm.loopUntilNoReady();
1121   }
1122
1123   EXPECT_FALSE(fm.hasTasks());
1124   EXPECT_EQ(counter, 1);
1125 }
1126
1127 TEST(FiberManager, remoteHasReadyTasks) {
1128   int result = 0;
1129   folly::Optional<Promise<int>> savedPromise;
1130   FiberManager fm(std::make_unique<SimpleLoopController>());
1131   std::thread remote([&]() {
1132     fm.addTaskRemote([&]() {
1133       result = await(
1134           [&](Promise<int> promise) { savedPromise = std::move(promise); });
1135       EXPECT_TRUE(fm.hasTasks());
1136     });
1137   });
1138
1139   remote.join();
1140   EXPECT_TRUE(fm.hasTasks());
1141
1142   fm.loopUntilNoReady();
1143   EXPECT_TRUE(fm.hasTasks());
1144
1145   std::thread remote2([&]() { savedPromise->setValue(47); });
1146   remote2.join();
1147   EXPECT_TRUE(fm.hasTasks());
1148
1149   fm.loopUntilNoReady();
1150   EXPECT_FALSE(fm.hasTasks());
1151
1152   EXPECT_EQ(result, 47);
1153 }
1154
1155 template <typename Data>
1156 void testFiberLocal() {
1157   FiberManager fm(LocalType<Data>(), std::make_unique<SimpleLoopController>());
1158
1159   fm.addTask([]() {
1160     EXPECT_EQ(42, local<Data>().value);
1161
1162     local<Data>().value = 43;
1163
1164     addTask([]() {
1165       EXPECT_EQ(43, local<Data>().value);
1166
1167       local<Data>().value = 44;
1168
1169       addTask([]() { EXPECT_EQ(44, local<Data>().value); });
1170     });
1171   });
1172
1173   fm.addTask([&]() {
1174     EXPECT_EQ(42, local<Data>().value);
1175
1176     local<Data>().value = 43;
1177
1178     fm.addTaskRemote([]() { EXPECT_EQ(43, local<Data>().value); });
1179   });
1180
1181   fm.addTask([]() {
1182     EXPECT_EQ(42, local<Data>().value);
1183     local<Data>().value = 43;
1184
1185     auto task = []() {
1186       EXPECT_EQ(43, local<Data>().value);
1187       local<Data>().value = 44;
1188     };
1189     std::vector<std::function<void()>> tasks{task};
1190     collectAny(tasks.begin(), tasks.end());
1191
1192     EXPECT_EQ(43, local<Data>().value);
1193   });
1194
1195   fm.loopUntilNoReady();
1196   EXPECT_FALSE(fm.hasTasks());
1197 }
1198
1199 TEST(FiberManager, fiberLocal) {
1200   struct SimpleData {
1201     int value{42};
1202   };
1203
1204   testFiberLocal<SimpleData>();
1205 }
1206
1207 TEST(FiberManager, fiberLocalHeap) {
1208   struct LargeData {
1209     char _[1024 * 1024];
1210     int value{42};
1211   };
1212
1213   testFiberLocal<LargeData>();
1214 }
1215
1216 TEST(FiberManager, fiberLocalDestructor) {
1217   struct CrazyData {
1218     size_t data{42};
1219
1220     ~CrazyData() {
1221       if (data == 41) {
1222         addTask([]() {
1223           EXPECT_EQ(42, local<CrazyData>().data);
1224           // Make sure we don't have infinite loop
1225           local<CrazyData>().data = 0;
1226         });
1227       }
1228     }
1229   };
1230
1231   FiberManager fm(
1232       LocalType<CrazyData>(), std::make_unique<SimpleLoopController>());
1233
1234   fm.addTask([]() { local<CrazyData>().data = 41; });
1235
1236   fm.loopUntilNoReady();
1237   EXPECT_FALSE(fm.hasTasks());
1238 }
1239
1240 TEST(FiberManager, fiberRequestContext) {
1241   folly::EventBase evb;
1242   FiberManager fm(std::make_unique<EventBaseLoopController>());
1243   dynamic_cast<EventBaseLoopController&>(fm.loopController())
1244       .attachEventBase(evb);
1245
1246   struct TestContext : public folly::RequestData {
1247     explicit TestContext(std::string s) : data(std::move(s)) {}
1248     std::string data;
1249   };
1250
1251   class AfterFibersCallback : public folly::EventBase::LoopCallback {
1252    public:
1253     AfterFibersCallback(
1254         folly::EventBase& evb,
1255         const bool& fibersDone,
1256         folly::Function<void()> afterFibersFunc)
1257         : evb_(evb),
1258           fibersDone_(fibersDone),
1259           afterFibersFunc_(std::move(afterFibersFunc)) {}
1260
1261     void runLoopCallback() noexcept override {
1262       if (fibersDone_) {
1263         afterFibersFunc_();
1264         delete this;
1265       } else {
1266         evb_.runInLoop(this);
1267       }
1268     }
1269
1270    private:
1271     folly::EventBase& evb_;
1272     const bool& fibersDone_;
1273     folly::Function<void()> afterFibersFunc_;
1274   };
1275
1276   bool fibersDone = false;
1277   size_t tasksRun = 0;
1278   evb.runInEventBaseThread([&evb, &fm, &tasksRun, &fibersDone]() {
1279     ++tasksRun;
1280     auto* const evbCtx = folly::RequestContext::get();
1281     EXPECT_NE(nullptr, evbCtx);
1282     EXPECT_EQ(nullptr, evbCtx->getContextData("key"));
1283     evbCtx->setContextData("key", std::make_unique<TestContext>("evb_value"));
1284
1285     // This callback allows us to check that FiberManager has restored the
1286     // RequestContext provider as expected after a fiber loop.
1287     auto* afterFibersCallback =
1288         new AfterFibersCallback(evb, fibersDone, [&tasksRun, evbCtx]() {
1289           ++tasksRun;
1290           EXPECT_EQ(evbCtx, folly::RequestContext::get());
1291           EXPECT_EQ(
1292               "evb_value",
1293               dynamic_cast<TestContext*>(evbCtx->getContextData("key"))->data);
1294         });
1295     evb.runInLoop(afterFibersCallback);
1296
1297     // Launching a fiber allows us to hit FiberManager RequestContext
1298     // setup/teardown logic.
1299     fm.addTask([&evb, &tasksRun, &fibersDone, evbCtx]() {
1300       ++tasksRun;
1301
1302       // Initially, fiber starts with same RequestContext as its parent task.
1303       EXPECT_EQ(evbCtx, folly::RequestContext::get());
1304       EXPECT_NE(nullptr, evbCtx->getContextData("key"));
1305       EXPECT_EQ(
1306           "evb_value",
1307           dynamic_cast<TestContext*>(evbCtx->getContextData("key"))->data);
1308
1309       // Create a new RequestContext for this fiber so we can distinguish from
1310       // RequestContext first EventBase callback started with.
1311       folly::RequestContext::create();
1312       auto* const fiberCtx = folly::RequestContext::get();
1313       EXPECT_NE(nullptr, fiberCtx);
1314       EXPECT_EQ(nullptr, fiberCtx->getContextData("key"));
1315       fiberCtx->setContextData(
1316           "key", std::make_unique<TestContext>("fiber_value"));
1317
1318       // Task launched from within fiber should share current fiber's
1319       // RequestContext
1320       evb.runInEventBaseThread([&tasksRun, fiberCtx]() {
1321         ++tasksRun;
1322         auto* const evbCtx2 = folly::RequestContext::get();
1323         EXPECT_EQ(fiberCtx, evbCtx2);
1324         EXPECT_NE(nullptr, evbCtx2->getContextData("key"));
1325         EXPECT_EQ(
1326             "fiber_value",
1327             dynamic_cast<TestContext*>(evbCtx2->getContextData("key"))->data);
1328       });
1329
1330       fibersDone = true;
1331     });
1332   });
1333
1334   evb.loop();
1335
1336   EXPECT_EQ(4, tasksRun);
1337   EXPECT_TRUE(fibersDone);
1338   EXPECT_FALSE(fm.hasTasks());
1339 }
1340
1341 TEST(FiberManager, yieldTest) {
1342   FiberManager manager(std::make_unique<SimpleLoopController>());
1343   auto& loopController =
1344       dynamic_cast<SimpleLoopController&>(manager.loopController());
1345
1346   bool checkRan = false;
1347
1348   manager.addTask([&]() {
1349     manager.yield();
1350     checkRan = true;
1351   });
1352
1353   loopController.loop([&]() {
1354     if (checkRan) {
1355       loopController.stop();
1356     }
1357   });
1358
1359   EXPECT_TRUE(checkRan);
1360 }
1361
1362 TEST(FiberManager, RequestContext) {
1363   FiberManager fm(std::make_unique<SimpleLoopController>());
1364
1365   bool checkRun1 = false;
1366   bool checkRun2 = false;
1367   bool checkRun3 = false;
1368   bool checkRun4 = false;
1369   folly::fibers::Baton baton1;
1370   folly::fibers::Baton baton2;
1371   folly::fibers::Baton baton3;
1372   folly::fibers::Baton baton4;
1373
1374   {
1375     folly::RequestContextScopeGuard rctx;
1376     auto rcontext1 = folly::RequestContext::get();
1377     fm.addTask([&, rcontext1]() {
1378       EXPECT_EQ(rcontext1, folly::RequestContext::get());
1379       baton1.wait(
1380           [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
1381       EXPECT_EQ(rcontext1, folly::RequestContext::get());
1382       runInMainContext(
1383           [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
1384       checkRun1 = true;
1385     });
1386   }
1387   {
1388     folly::RequestContextScopeGuard rctx;
1389     auto rcontext2 = folly::RequestContext::get();
1390     fm.addTaskRemote([&, rcontext2]() {
1391       EXPECT_EQ(rcontext2, folly::RequestContext::get());
1392       baton2.wait();
1393       EXPECT_EQ(rcontext2, folly::RequestContext::get());
1394       checkRun2 = true;
1395     });
1396   }
1397   {
1398     folly::RequestContextScopeGuard rctx;
1399     auto rcontext3 = folly::RequestContext::get();
1400     fm.addTaskFinally(
1401         [&, rcontext3]() {
1402           EXPECT_EQ(rcontext3, folly::RequestContext::get());
1403           baton3.wait();
1404           EXPECT_EQ(rcontext3, folly::RequestContext::get());
1405
1406           return folly::Unit();
1407         },
1408         [&, rcontext3](Try<folly::Unit>&& /* t */) {
1409           EXPECT_EQ(rcontext3, folly::RequestContext::get());
1410           checkRun3 = true;
1411         });
1412   }
1413   {
1414     folly::RequestContext::setContext(nullptr);
1415     fm.addTask([&]() {
1416       folly::RequestContextScopeGuard rctx;
1417       auto rcontext4 = folly::RequestContext::get();
1418       baton4.wait();
1419       EXPECT_EQ(rcontext4, folly::RequestContext::get());
1420       checkRun4 = true;
1421     });
1422   }
1423   {
1424     folly::RequestContextScopeGuard rctx;
1425     auto rcontext = folly::RequestContext::get();
1426
1427     fm.loopUntilNoReady();
1428     EXPECT_EQ(rcontext, folly::RequestContext::get());
1429
1430     baton1.post();
1431     EXPECT_EQ(rcontext, folly::RequestContext::get());
1432     fm.loopUntilNoReady();
1433     EXPECT_TRUE(checkRun1);
1434     EXPECT_EQ(rcontext, folly::RequestContext::get());
1435
1436     baton2.post();
1437     EXPECT_EQ(rcontext, folly::RequestContext::get());
1438     fm.loopUntilNoReady();
1439     EXPECT_TRUE(checkRun2);
1440     EXPECT_EQ(rcontext, folly::RequestContext::get());
1441
1442     baton3.post();
1443     EXPECT_EQ(rcontext, folly::RequestContext::get());
1444     fm.loopUntilNoReady();
1445     EXPECT_TRUE(checkRun3);
1446     EXPECT_EQ(rcontext, folly::RequestContext::get());
1447
1448     baton4.post();
1449     EXPECT_EQ(rcontext, folly::RequestContext::get());
1450     fm.loopUntilNoReady();
1451     EXPECT_TRUE(checkRun4);
1452     EXPECT_EQ(rcontext, folly::RequestContext::get());
1453   }
1454 }
1455
1456 TEST(FiberManager, resizePeriodically) {
1457   FiberManager::Options opts;
1458   opts.fibersPoolResizePeriodMs = 300;
1459   opts.maxFibersPoolSize = 5;
1460
1461   FiberManager manager(std::make_unique<EventBaseLoopController>(), opts);
1462
1463   folly::EventBase evb;
1464   dynamic_cast<EventBaseLoopController&>(manager.loopController())
1465       .attachEventBase(evb);
1466
1467   std::vector<Baton> batons(10);
1468
1469   size_t tasksRun = 0;
1470   for (size_t i = 0; i < 30; ++i) {
1471     manager.addTask([i, &batons, &tasksRun]() {
1472       ++tasksRun;
1473       // Keep some fibers active indefinitely
1474       if (i < batons.size()) {
1475         batons[i].wait();
1476       }
1477     });
1478   }
1479
1480   EXPECT_EQ(0, tasksRun);
1481   EXPECT_EQ(30, manager.fibersAllocated());
1482   EXPECT_EQ(0, manager.fibersPoolSize());
1483
1484   evb.loopOnce();
1485   EXPECT_EQ(30, tasksRun);
1486   EXPECT_EQ(30, manager.fibersAllocated());
1487   // Can go over maxFibersPoolSize, 10 of 30 fibers still active
1488   EXPECT_EQ(20, manager.fibersPoolSize());
1489
1490   std::this_thread::sleep_for(std::chrono::milliseconds(400));
1491   evb.loopOnce(); // no fibers active in this period
1492   EXPECT_EQ(30, manager.fibersAllocated());
1493   EXPECT_EQ(20, manager.fibersPoolSize());
1494
1495   std::this_thread::sleep_for(std::chrono::milliseconds(400));
1496   evb.loopOnce(); // should shrink fibers pool to maxFibersPoolSize
1497   EXPECT_EQ(15, manager.fibersAllocated());
1498   EXPECT_EQ(5, manager.fibersPoolSize());
1499
1500   for (size_t i = 0; i < batons.size(); ++i) {
1501     batons[i].post();
1502   }
1503   evb.loopOnce();
1504   EXPECT_EQ(15, manager.fibersAllocated());
1505   EXPECT_EQ(15, manager.fibersPoolSize());
1506
1507   std::this_thread::sleep_for(std::chrono::milliseconds(400));
1508   evb.loopOnce(); // 10 fibers active in last period
1509   EXPECT_EQ(10, manager.fibersAllocated());
1510   EXPECT_EQ(10, manager.fibersPoolSize());
1511
1512   std::this_thread::sleep_for(std::chrono::milliseconds(400));
1513   evb.loopOnce();
1514   EXPECT_EQ(5, manager.fibersAllocated());
1515   EXPECT_EQ(5, manager.fibersPoolSize());
1516 }
1517
1518 TEST(FiberManager, batonWaitTimeoutHandler) {
1519   FiberManager manager(std::make_unique<EventBaseLoopController>());
1520
1521   folly::EventBase evb;
1522   dynamic_cast<EventBaseLoopController&>(manager.loopController())
1523       .attachEventBase(evb);
1524
1525   size_t fibersRun = 0;
1526   Baton baton;
1527   Baton::TimeoutHandler timeoutHandler;
1528
1529   manager.addTask([&]() {
1530     baton.wait(timeoutHandler);
1531     ++fibersRun;
1532   });
1533   manager.loopUntilNoReady();
1534
1535   EXPECT_FALSE(baton.try_wait());
1536   EXPECT_EQ(0, fibersRun);
1537
1538   timeoutHandler.scheduleTimeout(std::chrono::milliseconds(250));
1539   std::this_thread::sleep_for(std::chrono::milliseconds(500));
1540
1541   EXPECT_FALSE(baton.try_wait());
1542   EXPECT_EQ(0, fibersRun);
1543
1544   evb.loopOnce();
1545   manager.loopUntilNoReady();
1546
1547   EXPECT_EQ(1, fibersRun);
1548 }
1549
1550 TEST(FiberManager, batonWaitTimeoutMany) {
1551   FiberManager manager(std::make_unique<EventBaseLoopController>());
1552
1553   folly::EventBase evb;
1554   dynamic_cast<EventBaseLoopController&>(manager.loopController())
1555       .attachEventBase(evb);
1556
1557   constexpr size_t kNumTimeoutTasks = 10000;
1558   size_t tasksCount = kNumTimeoutTasks;
1559
1560   // We add many tasks to hit timeout queue deallocation logic.
1561   for (size_t i = 0; i < kNumTimeoutTasks; ++i) {
1562     manager.addTask([&]() {
1563       Baton baton;
1564       Baton::TimeoutHandler timeoutHandler;
1565
1566       folly::fibers::addTask([&] {
1567         timeoutHandler.scheduleTimeout(std::chrono::milliseconds(1000));
1568       });
1569
1570       baton.wait(timeoutHandler);
1571       if (--tasksCount == 0) {
1572         evb.terminateLoopSoon();
1573       }
1574     });
1575   }
1576
1577   evb.loopForever();
1578 }
1579
1580 TEST(FiberManager, remoteFutureTest) {
1581   FiberManager fiberManager(std::make_unique<SimpleLoopController>());
1582   auto& loopController =
1583       dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
1584
1585   int testValue1 = 5;
1586   int testValue2 = 7;
1587   auto f1 = fiberManager.addTaskFuture([&]() { return testValue1; });
1588   auto f2 = fiberManager.addTaskRemoteFuture([&]() { return testValue2; });
1589   loopController.loop([&]() { loopController.stop(); });
1590   auto v1 = f1.get();
1591   auto v2 = f2.get();
1592
1593   EXPECT_EQ(v1, testValue1);
1594   EXPECT_EQ(v2, testValue2);
1595 }
1596
1597 // Test that a void function produes a Future<Unit>.
1598 TEST(FiberManager, remoteFutureVoidUnitTest) {
1599   FiberManager fiberManager(std::make_unique<SimpleLoopController>());
1600   auto& loopController =
1601       dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
1602
1603   bool ranLocal = false;
1604   folly::Future<folly::Unit> futureLocal =
1605       fiberManager.addTaskFuture([&]() { ranLocal = true; });
1606
1607   bool ranRemote = false;
1608   folly::Future<folly::Unit> futureRemote =
1609       fiberManager.addTaskRemoteFuture([&]() { ranRemote = true; });
1610
1611   loopController.loop([&]() { loopController.stop(); });
1612
1613   futureLocal.wait();
1614   ASSERT_TRUE(ranLocal);
1615
1616   futureRemote.wait();
1617   ASSERT_TRUE(ranRemote);
1618 }
1619
1620 TEST(FiberManager, nestedFiberManagers) {
1621   folly::EventBase outerEvb;
1622   folly::EventBase innerEvb;
1623
1624   getFiberManager(outerEvb).addTask([&]() {
1625     EXPECT_EQ(
1626         &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
1627
1628     runInMainContext([&]() {
1629       getFiberManager(innerEvb).addTask([&]() {
1630         EXPECT_EQ(
1631             &getFiberManager(innerEvb), FiberManager::getFiberManagerUnsafe());
1632
1633         innerEvb.terminateLoopSoon();
1634       });
1635
1636       innerEvb.loopForever();
1637     });
1638
1639     EXPECT_EQ(
1640         &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
1641
1642     outerEvb.terminateLoopSoon();
1643   });
1644
1645   outerEvb.loopForever();
1646 }
1647
1648 TEST(FiberManager, semaphore) {
1649   static constexpr size_t kTasks = 10;
1650   static constexpr size_t kIterations = 10000;
1651   static constexpr size_t kNumTokens = 10;
1652
1653   Semaphore sem(kNumTokens);
1654   int counterA = 0;
1655   int counterB = 0;
1656
1657   auto task = [&sem](int& counter, folly::fibers::Baton& baton) {
1658     FiberManager manager(std::make_unique<EventBaseLoopController>());
1659     folly::EventBase evb;
1660     dynamic_cast<EventBaseLoopController&>(manager.loopController())
1661         .attachEventBase(evb);
1662
1663     {
1664       std::shared_ptr<folly::EventBase> completionCounter(
1665           &evb, [](folly::EventBase* evb) { evb->terminateLoopSoon(); });
1666
1667       for (size_t i = 0; i < kTasks; ++i) {
1668         manager.addTask([&, completionCounter]() {
1669           for (size_t j = 0; j < kIterations; ++j) {
1670             sem.wait();
1671             ++counter;
1672             sem.signal();
1673             --counter;
1674
1675             EXPECT_LT(counter, kNumTokens);
1676             EXPECT_GE(counter, 0);
1677           }
1678         });
1679       }
1680
1681       baton.wait();
1682     }
1683     evb.loopForever();
1684   };
1685
1686   folly::fibers::Baton batonA;
1687   folly::fibers::Baton batonB;
1688   std::thread threadA([&] { task(counterA, batonA); });
1689   std::thread threadB([&] { task(counterB, batonB); });
1690
1691   batonA.post();
1692   batonB.post();
1693   threadA.join();
1694   threadB.join();
1695
1696   EXPECT_LT(counterA, kNumTokens);
1697   EXPECT_LT(counterB, kNumTokens);
1698   EXPECT_GE(counterA, 0);
1699   EXPECT_GE(counterB, 0);
1700 }
1701
1702 template <typename ExecutorT>
1703 void singleBatchDispatch(ExecutorT& executor, int batchSize, int index) {
1704   thread_local BatchDispatcher<int, std::string, ExecutorT> batchDispatcher(
1705       executor, [=](std::vector<int>&& batch) {
1706         EXPECT_EQ(batchSize, batch.size());
1707         std::vector<std::string> results;
1708         for (auto& it : batch) {
1709           results.push_back(folly::to<std::string>(it));
1710         }
1711         return results;
1712       });
1713
1714   auto indexCopy = index;
1715   auto result = batchDispatcher.add(std::move(indexCopy));
1716   EXPECT_EQ(folly::to<std::string>(index), result.get());
1717 }
1718
1719 TEST(FiberManager, batchDispatchTest) {
1720   folly::EventBase evb;
1721   auto& executor = getFiberManager(evb);
1722
1723   // Launch multiple fibers with a single id.
1724   executor.add([&]() {
1725     int batchSize = 10;
1726     for (int i = 0; i < batchSize; i++) {
1727       executor.add(
1728           [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
1729     }
1730   });
1731   evb.loop();
1732
1733   // Reuse the same BatchDispatcher to batch once again.
1734   executor.add([&]() {
1735     int batchSize = 10;
1736     for (int i = 0; i < batchSize; i++) {
1737       executor.add(
1738           [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
1739     }
1740   });
1741   evb.loop();
1742 }
1743
1744 template <typename ExecutorT>
1745 folly::Future<std::vector<std::string>> doubleBatchInnerDispatch(
1746     ExecutorT& executor,
1747     int totalNumberOfElements,
1748     std::vector<int> input) {
1749   thread_local BatchDispatcher<
1750       std::vector<int>,
1751       std::vector<std::string>,
1752       ExecutorT>
1753       batchDispatcher(executor, [=](std::vector<std::vector<int>>&& batch) {
1754         std::vector<std::vector<std::string>> results;
1755         int numberOfElements = 0;
1756         for (auto& unit : batch) {
1757           numberOfElements += unit.size();
1758           std::vector<std::string> result;
1759           for (auto& element : unit) {
1760             result.push_back(folly::to<std::string>(element));
1761           }
1762           results.push_back(std::move(result));
1763         }
1764         EXPECT_EQ(totalNumberOfElements, numberOfElements);
1765         return results;
1766       });
1767
1768   return batchDispatcher.add(std::move(input));
1769 }
1770
1771 /**
1772  * Batch values in groups of 5, and then call inner dispatch.
1773  */
1774 template <typename ExecutorT>
1775 void doubleBatchOuterDispatch(
1776     ExecutorT& executor,
1777     int totalNumberOfElements,
1778     int index) {
1779   thread_local BatchDispatcher<int, std::string, ExecutorT> batchDispatcher(
1780       executor, [=, &executor](std::vector<int>&& batch) {
1781         EXPECT_EQ(totalNumberOfElements, batch.size());
1782         std::vector<std::string> results;
1783         std::vector<folly::Future<std::vector<std::string>>>
1784             innerDispatchResultFutures;
1785
1786         std::vector<int> group;
1787         for (auto unit : batch) {
1788           group.push_back(unit);
1789           if (group.size() == 5) {
1790             auto localGroup = group;
1791             group.clear();
1792
1793             innerDispatchResultFutures.push_back(doubleBatchInnerDispatch(
1794                 executor, totalNumberOfElements, localGroup));
1795           }
1796         }
1797
1798         folly::collectAll(
1799             innerDispatchResultFutures.begin(),
1800             innerDispatchResultFutures.end())
1801             .then([&](std::vector<Try<std::vector<std::string>>>
1802                           innerDispatchResults) {
1803               for (auto& unit : innerDispatchResults) {
1804                 for (auto& element : unit.value()) {
1805                   results.push_back(element);
1806                 }
1807               }
1808             })
1809             .get();
1810         return results;
1811       });
1812
1813   auto indexCopy = index;
1814   auto result = batchDispatcher.add(std::move(indexCopy));
1815   EXPECT_EQ(folly::to<std::string>(index), result.get());
1816 }
1817
1818 TEST(FiberManager, doubleBatchDispatchTest) {
1819   folly::EventBase evb;
1820   auto& executor = getFiberManager(evb);
1821
1822   // Launch multiple fibers with a single id.
1823   executor.add([&]() {
1824     int totalNumberOfElements = 20;
1825     for (int i = 0; i < totalNumberOfElements; i++) {
1826       executor.add([=, &executor]() {
1827         doubleBatchOuterDispatch(executor, totalNumberOfElements, i);
1828       });
1829     }
1830   });
1831   evb.loop();
1832 }
1833
1834 template <typename ExecutorT>
1835 void batchDispatchExceptionHandling(ExecutorT& executor, int i) {
1836   thread_local BatchDispatcher<int, int, ExecutorT> batchDispatcher(
1837       executor, [](std::vector<int> &&) -> std::vector<int> {
1838         throw std::runtime_error("Surprise!!");
1839       });
1840
1841   EXPECT_THROW(batchDispatcher.add(i).get(), std::runtime_error);
1842 }
1843
1844 TEST(FiberManager, batchDispatchExceptionHandlingTest) {
1845   folly::EventBase evb;
1846   auto& executor = getFiberManager(evb);
1847
1848   // Launch multiple fibers with a single id.
1849   executor.add([&]() {
1850     int totalNumberOfElements = 5;
1851     for (int i = 0; i < totalNumberOfElements; i++) {
1852       executor.add(
1853           [=, &executor]() { batchDispatchExceptionHandling(executor, i); });
1854     }
1855   });
1856   evb.loop();
1857 }
1858
1859 namespace AtomicBatchDispatcherTesting {
1860
1861 using ValueT = size_t;
1862 using ResultT = std::string;
1863 using DispatchFunctionT =
1864     folly::Function<std::vector<ResultT>(std::vector<ValueT>&&)>;
1865
1866 #define ENABLE_TRACE_IN_TEST 0 // Set to 1 to debug issues in ABD tests
1867 #if ENABLE_TRACE_IN_TEST
1868 #define OUTPUT_TRACE std::cerr
1869 #else // ENABLE_TRACE_IN_TEST
1870 struct DevNullPiper {
1871   template <typename T>
1872   DevNullPiper& operator<<(const T&) {
1873     return *this;
1874   }
1875
1876   DevNullPiper& operator<<(std::ostream& (*)(std::ostream&)) {
1877     return *this;
1878   }
1879 } devNullPiper;
1880 #define OUTPUT_TRACE devNullPiper
1881 #endif // ENABLE_TRACE_IN_TEST
1882
1883 struct Job {
1884   AtomicBatchDispatcher<ValueT, ResultT>::Token token;
1885   ValueT input;
1886
1887   void preprocess(FiberManager& executor, bool die) {
1888     // Yield for a random duration [0, 10] ms to simulate I/O in preprocessing
1889     clock_t msecToDoIO = folly::Random::rand32() % 10;
1890     double start = (1000.0 * clock()) / CLOCKS_PER_SEC;
1891     double endAfter = start + msecToDoIO;
1892     while ((1000.0 * clock()) / CLOCKS_PER_SEC < endAfter) {
1893       executor.yield();
1894     }
1895     if (die) {
1896       throw std::logic_error("Simulating preprocessing failure");
1897     }
1898   }
1899
1900   Job(AtomicBatchDispatcher<ValueT, ResultT>::Token&& t, ValueT i)
1901       : token(std::move(t)), input(i) {}
1902
1903   Job(Job&&) = default;
1904   Job& operator=(Job&&) = default;
1905 };
1906
1907 ResultT processSingleInput(ValueT&& input) {
1908   return folly::to<ResultT>(std::move(input));
1909 }
1910
1911 std::vector<ResultT> userDispatchFunc(std::vector<ValueT>&& inputs) {
1912   size_t expectedCount = inputs.size();
1913   std::vector<ResultT> results;
1914   results.reserve(expectedCount);
1915   for (size_t i = 0; i < expectedCount; ++i) {
1916     results.emplace_back(processSingleInput(std::move(inputs[i])));
1917   }
1918   return results;
1919 }
1920
1921 void createJobs(
1922     AtomicBatchDispatcher<ValueT, ResultT>& atomicBatchDispatcher,
1923     std::vector<Job>& jobs,
1924     size_t count) {
1925   jobs.clear();
1926   for (size_t i = 0; i < count; ++i) {
1927     jobs.emplace_back(Job(atomicBatchDispatcher.getToken(), i));
1928   }
1929 }
1930
1931 enum class DispatchProblem {
1932   None,
1933   PreprocessThrows,
1934   DuplicateDispatch,
1935 };
1936
1937 void dispatchJobs(
1938     FiberManager& executor,
1939     std::vector<Job>& jobs,
1940     std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1941     DispatchProblem dispatchProblem = DispatchProblem::None,
1942     size_t problemIndex = size_t(-1)) {
1943   EXPECT_TRUE(
1944       dispatchProblem == DispatchProblem::None || problemIndex < jobs.size());
1945   results.clear();
1946   results.resize(jobs.size());
1947   for (size_t i = 0; i < jobs.size(); ++i) {
1948     executor.add(
1949         [i, &executor, &jobs, &results, dispatchProblem, problemIndex]() {
1950           try {
1951             Job job(std::move(jobs[i]));
1952
1953             if (dispatchProblem == DispatchProblem::PreprocessThrows) {
1954               if (i == problemIndex) {
1955                 EXPECT_THROW(job.preprocess(executor, true), std::logic_error);
1956                 return;
1957               }
1958             }
1959
1960             job.preprocess(executor, false);
1961             OUTPUT_TRACE << "Dispatching job #" << i << std::endl;
1962             results[i] = job.token.dispatch(job.input);
1963             OUTPUT_TRACE << "Result future filled for job #" << i << std::endl;
1964
1965             if (dispatchProblem == DispatchProblem::DuplicateDispatch) {
1966               if (i == problemIndex) {
1967                 EXPECT_THROW(job.token.dispatch(job.input), ABDUsageException);
1968               }
1969             }
1970           } catch (...) {
1971             OUTPUT_TRACE << "Preprocessing failed for job #" << i << std::endl;
1972           }
1973         });
1974   }
1975 }
1976
1977 void validateResult(
1978     std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1979     size_t i) {
1980   try {
1981     OUTPUT_TRACE << "results[" << i << "].value() : " << results[i]->value()
1982                  << std::endl;
1983   } catch (std::exception& e) {
1984     OUTPUT_TRACE << "Exception : " << e.what() << std::endl;
1985     throw;
1986   }
1987 }
1988
1989 template <typename TException>
1990 void validateResults(
1991     std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1992     size_t expectedNumResults) {
1993   size_t numResultsFilled = 0;
1994   for (size_t i = 0; i < results.size(); ++i) {
1995     if (!results[i]) {
1996       continue;
1997     }
1998     ++numResultsFilled;
1999     EXPECT_THROW(validateResult(results, i), TException);
2000   }
2001   EXPECT_EQ(numResultsFilled, expectedNumResults);
2002 }
2003
2004 void validateResults(
2005     std::vector<folly::Optional<folly::Future<ResultT>>>& results,
2006     size_t expectedNumResults) {
2007   size_t numResultsFilled = 0;
2008   for (size_t i = 0; i < results.size(); ++i) {
2009     if (!results[i]) {
2010       continue;
2011     }
2012     ++numResultsFilled;
2013     EXPECT_NO_THROW(validateResult(results, i));
2014     ValueT expectedInput = i;
2015     EXPECT_EQ(
2016         results[i]->value(), processSingleInput(std::move(expectedInput)));
2017   }
2018   EXPECT_EQ(numResultsFilled, expectedNumResults);
2019 }
2020
2021 } // namespace AtomicBatchDispatcherTesting
2022
2023 #define SET_UP_TEST_FUNC                                        \
2024   using namespace AtomicBatchDispatcherTesting;                 \
2025   folly::EventBase evb;                                         \
2026   auto& executor = getFiberManager(evb);                        \
2027   const size_t COUNT = 11;                                      \
2028   std::vector<Job> jobs;                                        \
2029   jobs.reserve(COUNT);                                          \
2030   std::vector<folly::Optional<folly::Future<ResultT>>> results; \
2031   results.reserve(COUNT);                                       \
2032   DispatchFunctionT dispatchFunc
2033
2034 TEST(FiberManager, ABD_Test) {
2035   SET_UP_TEST_FUNC;
2036
2037   //
2038   // Testing AtomicBatchDispatcher with explicit call to commit()
2039   //
2040   dispatchFunc = userDispatchFunc;
2041   auto atomicBatchDispatcher =
2042       createAtomicBatchDispatcher(std::move(dispatchFunc));
2043   createJobs(atomicBatchDispatcher, jobs, COUNT);
2044   dispatchJobs(executor, jobs, results);
2045   atomicBatchDispatcher.commit();
2046   evb.loop();
2047   validateResults(results, COUNT);
2048 }
2049
2050 TEST(FiberManager, ABD_DispatcherDestroyedBeforeCallingCommit) {
2051   SET_UP_TEST_FUNC;
2052
2053   //
2054   // Testing AtomicBatchDispatcher destroyed before calling commit.
2055   // Handles error cases for:
2056   // - User might have forgotten to add the call to commit() in the code
2057   // - An unexpected exception got thrown in user code before commit() is called
2058   //
2059   try {
2060     dispatchFunc = userDispatchFunc;
2061     auto atomicBatchDispatcher =
2062         createAtomicBatchDispatcher(std::move(dispatchFunc));
2063     createJobs(atomicBatchDispatcher, jobs, COUNT);
2064     dispatchJobs(executor, jobs, results);
2065     throw std::runtime_error(
2066         "Unexpected exception in user code before commit called");
2067     // atomicBatchDispatcher.commit();
2068   } catch (...) {
2069     /* User code handles the exception and does not exit process */
2070   }
2071   evb.loop();
2072   validateResults<ABDCommitNotCalledException>(results, COUNT);
2073 }
2074
2075 TEST(FiberManager, ABD_PreprocessingFailureTest) {
2076   SET_UP_TEST_FUNC;
2077
2078   //
2079   // Testing preprocessing failure on a job throws
2080   //
2081   dispatchFunc = userDispatchFunc;
2082   auto atomicBatchDispatcher =
2083       createAtomicBatchDispatcher(std::move(dispatchFunc));
2084   createJobs(atomicBatchDispatcher, jobs, COUNT);
2085   dispatchJobs(executor, jobs, results, DispatchProblem::PreprocessThrows, 8);
2086   atomicBatchDispatcher.commit();
2087   evb.loop();
2088   validateResults<ABDTokenNotDispatchedException>(results, COUNT - 1);
2089 }
2090
2091 TEST(FiberManager, ABD_MultipleDispatchOnSameTokenErrorTest) {
2092   SET_UP_TEST_FUNC;
2093
2094   //
2095   // Testing that calling dispatch more than once on the same token throws
2096   //
2097   dispatchFunc = userDispatchFunc;
2098   auto atomicBatchDispatcher =
2099       createAtomicBatchDispatcher(std::move(dispatchFunc));
2100   createJobs(atomicBatchDispatcher, jobs, COUNT);
2101   dispatchJobs(executor, jobs, results, DispatchProblem::DuplicateDispatch, 4);
2102   atomicBatchDispatcher.commit();
2103   evb.loop();
2104 }
2105
2106 TEST(FiberManager, ABD_GetTokenCalledAfterCommitTest) {
2107   SET_UP_TEST_FUNC;
2108
2109   //
2110   // Testing that exception set on attempt to call getToken after commit called
2111   //
2112   dispatchFunc = userDispatchFunc;
2113   auto atomicBatchDispatcher =
2114       createAtomicBatchDispatcher(std::move(dispatchFunc));
2115   createJobs(atomicBatchDispatcher, jobs, COUNT);
2116   atomicBatchDispatcher.commit();
2117   EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
2118   dispatchJobs(executor, jobs, results);
2119   EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
2120   evb.loop();
2121   validateResults(results, COUNT);
2122   EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
2123 }
2124
2125 TEST(FiberManager, ABD_UserProvidedBatchDispatchThrowsTest) {
2126   SET_UP_TEST_FUNC;
2127
2128   //
2129   // Testing that exception is set if user provided batch dispatch throws
2130   //
2131   dispatchFunc = [](std::vector<ValueT>&& inputs) -> std::vector<ResultT> {
2132     (void)userDispatchFunc(std::move(inputs));
2133     throw std::runtime_error("Unexpected exception in user dispatch function");
2134   };
2135   auto atomicBatchDispatcher =
2136       createAtomicBatchDispatcher(std::move(dispatchFunc));
2137   createJobs(atomicBatchDispatcher, jobs, COUNT);
2138   dispatchJobs(executor, jobs, results);
2139   atomicBatchDispatcher.commit();
2140   evb.loop();
2141   validateResults<std::runtime_error>(results, COUNT);
2142 }
2143
2144 TEST(FiberManager, VirtualEventBase) {
2145   bool done1{false};
2146   bool done2{false};
2147   {
2148     folly::ScopedEventBaseThread thread;
2149
2150     auto evb1 =
2151         std::make_unique<folly::VirtualEventBase>(*thread.getEventBase());
2152     auto& evb2 = thread.getEventBase()->getVirtualEventBase();
2153
2154     getFiberManager(*evb1).addTaskRemote([&] {
2155       Baton baton;
2156       baton.timed_wait(std::chrono::milliseconds{100});
2157
2158       done1 = true;
2159     });
2160
2161     getFiberManager(evb2).addTaskRemote([&] {
2162       Baton baton;
2163       baton.timed_wait(std::chrono::milliseconds{200});
2164
2165       done2 = true;
2166     });
2167
2168     EXPECT_FALSE(done1);
2169     EXPECT_FALSE(done2);
2170
2171     evb1.reset();
2172     EXPECT_TRUE(done1);
2173     EXPECT_FALSE(done2);
2174   }
2175   EXPECT_TRUE(done2);
2176 }
2177
2178 TEST(TimedMutex, ThreadFiberDeadlockOrder) {
2179   folly::EventBase evb;
2180   auto& fm = getFiberManager(evb);
2181   TimedMutex mutex;
2182
2183   mutex.lock();
2184   std::thread unlockThread([&] {
2185     /* sleep override */ std::this_thread::sleep_for(
2186         std::chrono::milliseconds{100});
2187     mutex.unlock();
2188   });
2189
2190   fm.addTask([&] { std::lock_guard<TimedMutex> lg(mutex); });
2191   fm.addTask([&] {
2192     runInMainContext([&] {
2193       auto locked = mutex.timed_lock(std::chrono::seconds{1});
2194       EXPECT_TRUE(locked);
2195       if (locked) {
2196         mutex.unlock();
2197       }
2198     });
2199   });
2200
2201   evb.loopOnce();
2202   EXPECT_EQ(0, fm.hasTasks());
2203
2204   unlockThread.join();
2205 }
2206
2207 TEST(TimedMutex, ThreadFiberDeadlockRace) {
2208   folly::EventBase evb;
2209   auto& fm = getFiberManager(evb);
2210   TimedMutex mutex;
2211
2212   mutex.lock();
2213
2214   fm.addTask([&] {
2215     auto locked = mutex.timed_lock(std::chrono::seconds{1});
2216     EXPECT_TRUE(locked);
2217     if (locked) {
2218       mutex.unlock();
2219     }
2220   });
2221   fm.addTask([&] {
2222     mutex.unlock();
2223     runInMainContext([&] {
2224       auto locked = mutex.timed_lock(std::chrono::seconds{1});
2225       EXPECT_TRUE(locked);
2226       if (locked) {
2227         mutex.unlock();
2228       }
2229     });
2230   });
2231
2232   evb.loopOnce();
2233   EXPECT_EQ(0, fm.hasTasks());
2234 }
2235
2236 /**
2237  * Test that we can properly track fiber stack usage.
2238  *
2239  * This functionality can only be enabled when ASAN is disabled, so avoid
2240  * running this test with ASAN.
2241  */
2242 #ifndef FOLLY_SANITIZE_ADDRESS
2243 TEST(FiberManager, recordStack) {
2244   auto f = [] {
2245     folly::fibers::FiberManager::Options opts;
2246     opts.recordStackEvery = 1;
2247
2248     FiberManager fm(std::make_unique<SimpleLoopController>(), opts);
2249     auto& loopController =
2250         dynamic_cast<SimpleLoopController&>(fm.loopController());
2251
2252     static constexpr size_t n = 1000;
2253     int s = 0;
2254     fm.addTask([&]() {
2255       int b[n] = {0};
2256       for (size_t i = 0; i < n; ++i) {
2257         b[i] = i;
2258       }
2259       for (size_t i = 0; i + 1 < n; ++i) {
2260         s += b[i] * b[i + 1];
2261       }
2262     });
2263
2264     (void)s;
2265
2266     loopController.loop([&]() { loopController.stop(); });
2267
2268     // Check that we properly accounted fiber stack usage.
2269     EXPECT_LT(n * sizeof(int), fm.stackHighWatermark());
2270   };
2271   std::thread(f).join();
2272 }
2273 #endif