919b09651dcda9599caa3351efb42d2cb12140a0
[folly.git] / folly / experimental / fibers / FiberManager.h
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <functional>
19 #include <memory>
20 #include <queue>
21 #include <thread>
22 #include <typeindex>
23 #include <type_traits>
24 #include <unordered_set>
25 #include <vector>
26
27 #include <folly/AtomicLinkedList.h>
28 #include <folly/Executor.h>
29 #include <folly/Likely.h>
30 #include <folly/IntrusiveList.h>
31 #include <folly/io/async/Request.h>
32 #include <folly/futures/Try.h>
33
34 #include <folly/experimental/ExecutionObserver.h>
35 #include <folly/experimental/fibers/BoostContextCompatibility.h>
36 #include <folly/experimental/fibers/Fiber.h>
37 #include <folly/experimental/fibers/GuardPageAllocator.h>
38 #include <folly/experimental/fibers/TimeoutController.h>
39 #include <folly/experimental/fibers/traits.h>
40
41 namespace folly {
42
43 template <class T>
44 class Future;
45
46 namespace fibers {
47
48 class Baton;
49 class Fiber;
50 class LoopController;
51 class TimeoutController;
52
53 template <typename T>
54 class LocalType {
55 };
56
57 class InlineFunctionRunner {
58  public:
59   virtual ~InlineFunctionRunner() {}
60
61   /**
62    * func must be executed inline and only once.
63    */
64   virtual void run(std::function<void()> func) = 0;
65 };
66
67 /**
68  * @class FiberManager
69  * @brief Single-threaded task execution engine.
70  *
71  * FiberManager allows semi-parallel task execution on the same thread. Each
72  * task can notify FiberManager that it is blocked on something (via await())
73  * call. This will pause execution of this task and it will be resumed only
74  * when it is unblocked (via setData()).
75  */
76 class FiberManager : public ::folly::Executor {
77  public:
78   struct Options {
79     static constexpr size_t kDefaultStackSize{16 * 1024};
80
81     /**
82      * Maximum stack size for fibers which will be used for executing all the
83      * tasks.
84      */
85     size_t stackSize{kDefaultStackSize};
86
87     /**
88      * Record exact amount of stack used.
89      *
90      * This is fairly expensive: we fill each newly allocated stack
91      * with some known value and find the boundary of unused stack
92      * with linear search every time we surrender the stack back to fibersPool.
93      * 0 disables stack recording.
94      */
95     size_t recordStackEvery{0};
96
97     /**
98      * Keep at most this many free fibers in the pool.
99      * This way the total number of fibers in the system is always bounded
100      * by the number of active fibers + maxFibersPoolSize.
101      */
102     size_t maxFibersPoolSize{1000};
103
104     /**
105      * Protect limited amount of fiber stacks with guard pages.
106      */
107     bool useGuardPages{false};
108
109     /**
110      * Free unnecessary fibers in the fibers pool every fibersPoolResizePeriodMs
111      * milliseconds. If value is 0, periodic resizing of the fibers pool is
112      * disabled.
113      */
114     uint32_t fibersPoolResizePeriodMs{0};
115
116     constexpr Options() {}
117   };
118
119   typedef std::function<void(std::exception_ptr, std::string)>
120   ExceptionCallback;
121
122   FiberManager(const FiberManager&) = delete;
123   FiberManager& operator=(const FiberManager&) = delete;
124
125   /**
126    * Initializes, but doesn't start FiberManager loop
127    *
128    * @param loopController
129    * @param options FiberManager options
130    */
131   explicit FiberManager(std::unique_ptr<LoopController> loopController,
132                         Options options = Options());
133
134   /**
135    * Initializes, but doesn't start FiberManager loop
136    *
137    * @param loopController
138    * @param options FiberManager options
139    * @tparam LocalT only local of this type may be stored on fibers.
140    *                Locals of other types will be considered thread-locals.
141    */
142   template <typename LocalT>
143   FiberManager(LocalType<LocalT>,
144                std::unique_ptr<LoopController> loopController,
145                Options options = Options());
146
147
148   ~FiberManager();
149
150   /**
151    * Controller access.
152    */
153   LoopController& loopController();
154   const LoopController& loopController() const;
155
156   /**
157    * Keeps running ready tasks until the list of ready tasks is empty.
158    *
159    * @return True if there are any waiting tasks remaining.
160    */
161   bool loopUntilNoReady();
162
163   /**
164    * @return true if there are outstanding tasks.
165    */
166   bool hasTasks() const;
167
168   /**
169    * Sets exception callback which will be called if any of the tasks throws an
170    * exception.
171    *
172    * @param ec
173    */
174   void setExceptionCallback(ExceptionCallback ec);
175
176   /**
177    * Add a new task to be executed. Must be called from FiberManager's thread.
178    *
179    * @param func Task functor; must have a signature of `void func()`.
180    *             The object will be destroyed once task execution is complete.
181    */
182   template <typename F>
183   void addTask(F&& func);
184
185   /**
186    * Add a new task to be executed and return a future that will be set on
187    * return from func. Must be called from FiberManager's thread.
188    *
189    * @param func Task functor; must have a signature of `void func()`.
190    *             The object will be destroyed once task execution is complete.
191    */
192   template <typename F>
193   auto addTaskFuture(F&& func)
194       -> folly::Future<typename std::result_of<F()>::type>;
195   /**
196    * Add a new task to be executed. Safe to call from other threads.
197    *
198    * @param func Task function; must have a signature of `void func()`.
199    *             The object will be destroyed once task execution is complete.
200    */
201   template <typename F>
202   void addTaskRemote(F&& func);
203
204   /**
205    * Add a new task to be executed and return a future that will be set on
206    * return from func. Safe to call from other threads.
207    *
208    * @param func Task function; must have a signature of `void func()`.
209    *             The object will be destroyed once task execution is complete.
210    */
211   template <typename F>
212   auto addTaskRemoteFuture(F&& func)
213       -> folly::Future<typename std::result_of<F()>::type>;
214
215   // Executor interface calls addTaskRemote
216   void add(std::function<void()> f) {
217     addTaskRemote(std::move(f));
218   }
219
220   /**
221    * Add a new task. When the task is complete, execute finally(Try<Result>&&)
222    * on the main context.
223    *
224    * @param func Task functor; must have a signature of `T func()` for some T.
225    * @param finally Finally functor; must have a signature of
226    *                `void finally(Try<T>&&)` and will be passed
227    *                the result of func() (including the exception if occurred).
228    */
229   template <typename F, typename G>
230   void addTaskFinally(F&& func, G&& finally);
231
232   /**
233    * If called from a fiber, immediately switches to the FiberManager's context
234    * and runs func(), going back to the Fiber's context after completion.
235    * Outside a fiber, just calls func() directly.
236    *
237    * @return value returned by func().
238    */
239   template <typename F>
240   typename std::result_of<F()>::type
241   runInMainContext(F&& func);
242
243   /**
244    * Returns a refference to a fiber-local context for given Fiber. Should be
245    * always called with the same T for each fiber. Fiber-local context is lazily
246    * default-constructed on first request.
247    * When new task is scheduled via addTask / addTaskRemote from a fiber its
248    * fiber-local context is copied into the new fiber.
249    */
250   template <typename T>
251   T& local();
252
253   template <typename T>
254   static T& localThread();
255
256   /**
257    * @return How many fiber objects (and stacks) has this manager allocated.
258    */
259   size_t fibersAllocated() const;
260
261   /**
262    * @return How many of the allocated fiber objects are currently
263    * in the free pool.
264    */
265   size_t fibersPoolSize() const;
266
267   /**
268    * return     true if running activeFiber_ is not nullptr.
269    */
270   bool hasActiveFiber() const;
271
272   /**
273    * @return The currently running fiber or null if no fiber is executing.
274    */
275   Fiber* currentFiber() const {
276     return currentFiber_;
277   }
278
279   /**
280    * @return What was the most observed fiber stack usage (in bytes).
281    */
282   size_t stackHighWatermark() const;
283
284   /**
285    * Yield execution of the currently running fiber. Must only be called from a
286    * fiber executing on this FiberManager. The calling fiber will be scheduled
287    * when all other fibers have had a chance to run and the event loop is
288    * serviced.
289    */
290   void yield();
291
292   /**
293    * Setup fibers execution observation/instrumentation. Fiber locals are
294    * available to observer.
295    *
296    * @param observer  Fiber's execution observer.
297    */
298   void setObserver(ExecutionObserver* observer);
299
300   /**
301    * Setup fibers preempt runner.
302    */
303   void setPreemptRunner(InlineFunctionRunner* preemptRunner);
304
305   /**
306    * Returns an estimate of the number of fibers which are waiting to run (does
307    * not include fibers or tasks scheduled remotely).
308    */
309   size_t runQueueSize() const {
310     return readyFibers_.size() + yieldedFibers_.size();
311   }
312
313   static FiberManager& getFiberManager();
314   static FiberManager* getFiberManagerUnsafe();
315
316  private:
317   friend class Baton;
318   friend class Fiber;
319   template <typename F>
320   struct AddTaskHelper;
321   template <typename F, typename G>
322   struct AddTaskFinallyHelper;
323
324   struct RemoteTask {
325     template <typename F>
326     explicit RemoteTask(F&& f) :
327         func(std::forward<F>(f)),
328         rcontext(RequestContext::saveContext()) {}
329     template <typename F>
330     RemoteTask(F&& f, const Fiber::LocalData& localData_) :
331         func(std::forward<F>(f)),
332         localData(folly::make_unique<Fiber::LocalData>(localData_)),
333         rcontext(RequestContext::saveContext()) {}
334     std::function<void()> func;
335     std::unique_ptr<Fiber::LocalData> localData;
336     std::shared_ptr<RequestContext> rcontext;
337     AtomicLinkedListHook<RemoteTask> nextRemoteTask;
338   };
339
340   intptr_t activateFiber(Fiber* fiber);
341   intptr_t deactivateFiber(Fiber* fiber);
342
343   typedef folly::IntrusiveList<Fiber, &Fiber::listHook_> FiberTailQueue;
344
345   Fiber* activeFiber_{nullptr}; /**< active fiber, nullptr on main context */
346   /**
347    * Same as active fiber, but also set for functions run from fiber on main
348    * context.
349    */
350   Fiber* currentFiber_{nullptr};
351
352   FiberTailQueue readyFibers_;  /**< queue of fibers ready to be executed */
353   FiberTailQueue yieldedFibers_;  /**< queue of fibers which have yielded
354                                        execution */
355   FiberTailQueue fibersPool_;   /**< pool of unitialized Fiber objects */
356
357   size_t fibersAllocated_{0};   /**< total number of fibers allocated */
358   size_t fibersPoolSize_{0};    /**< total number of fibers in the free pool */
359   size_t fibersActive_{0};      /**< number of running or blocked fibers */
360   size_t fiberId_{0};           /**< id of last fiber used */
361
362   /**
363    * Maximum number of active fibers in the last period lasting
364    * Options::fibersPoolResizePeriod milliseconds.
365    */
366   size_t maxFibersActiveLastPeriod_{0};
367
368   FContext::ContextStruct mainContext_;  /**< stores loop function context */
369
370   std::unique_ptr<LoopController> loopController_;
371   bool isLoopScheduled_{false}; /**< was the ready loop scheduled to run? */
372
373   /**
374    * When we are inside FiberManager loop this points to FiberManager. Otherwise
375    * it's nullptr
376    */
377   static FOLLY_TLS FiberManager* currentFiberManager_;
378
379   /**
380    * Allocator used to allocate stack for Fibers in the pool.
381    * Allocates stack on the stack of the main context.
382    */
383   GuardPageAllocator stackAllocator_;
384
385   const Options options_;       /**< FiberManager options */
386
387   /**
388    * Largest observed individual Fiber stack usage in bytes.
389    */
390   size_t stackHighWatermark_{0};
391
392   /**
393    * Schedules a loop with loopController (unless already scheduled before).
394    */
395   void ensureLoopScheduled();
396
397   /**
398    * @return An initialized Fiber object from the pool
399    */
400   Fiber* getFiber();
401
402   /**
403    * Sets local data for given fiber if all conditions are met.
404    */
405   void initLocalData(Fiber& fiber);
406
407   /**
408    * Function passed to the await call.
409    */
410   std::function<void(Fiber&)> awaitFunc_;
411
412   /**
413    * Function passed to the runInMainContext call.
414    */
415   std::function<void()> immediateFunc_;
416
417   /**
418    * Preempt runner.
419    */
420   InlineFunctionRunner* preemptRunner_{nullptr};
421
422   /**
423    * Fiber's execution observer.
424    */
425   ExecutionObserver* observer_{nullptr};
426
427   ExceptionCallback exceptionCallback_; /**< task exception callback */
428
429   folly::AtomicLinkedList<Fiber, &Fiber::nextRemoteReady_> remoteReadyQueue_;
430
431   folly::AtomicLinkedList<RemoteTask, &RemoteTask::nextRemoteTask>
432       remoteTaskQueue_;
433
434   std::shared_ptr<TimeoutController> timeoutManager_;
435
436   struct FibersPoolResizer {
437     explicit FibersPoolResizer(FiberManager& fm) :
438       fiberManager_(fm) {}
439     void operator()();
440    private:
441     FiberManager& fiberManager_;
442   };
443
444   FibersPoolResizer fibersPoolResizer_;
445   bool fibersPoolResizerScheduled_{false};
446
447   void doFibersPoolResizing();
448
449   /**
450    * Only local of this type will be available for fibers.
451    */
452   std::type_index localType_;
453
454   void runReadyFiber(Fiber* fiber);
455   void remoteReadyInsert(Fiber* fiber);
456
457 #ifdef FOLLY_SANITIZE_ADDRESS
458
459   // These methods notify ASAN when a fiber is entered/exited so that ASAN can
460   // find the right stack extents when it needs to poison/unpoison the stack.
461
462   void registerFiberActivationWithAsan(Fiber* fiber);
463   void registerFiberDeactivationWithAsan(Fiber* fiber);
464
465 #endif // FOLLY_SANITIZE_ADDRESS
466 };
467
468 /**
469  * @return      true iff we are running in a fiber's context
470  */
471 inline bool onFiber() {
472   auto fm = FiberManager::getFiberManagerUnsafe();
473   return fm ? fm->hasActiveFiber() : false;
474 }
475
476 /**
477  * Add a new task to be executed.
478  *
479  * @param func Task functor; must have a signature of `void func()`.
480  *             The object will be destroyed once task execution is complete.
481  */
482 template <typename F>
483 inline void addTask(F&& func) {
484   return FiberManager::getFiberManager().addTask(std::forward<F>(func));
485 }
486
487 /**
488  * Add a new task. When the task is complete, execute finally(Try<Result>&&)
489  * on the main context.
490  * Task functor is run and destroyed on the fiber context.
491  * Finally functor is run and destroyed on the main context.
492  *
493  * @param func Task functor; must have a signature of `T func()` for some T.
494  * @param finally Finally functor; must have a signature of
495  *                `void finally(Try<T>&&)` and will be passed
496  *                the result of func() (including the exception if occurred).
497  */
498 template <typename F, typename G>
499 inline void addTaskFinally(F&& func, G&& finally) {
500   return FiberManager::getFiberManager().addTaskFinally(
501     std::forward<F>(func), std::forward<G>(finally));
502 }
503
504 /**
505  * Blocks task execution until given promise is fulfilled.
506  *
507  * Calls function passing in a Promise<T>, which has to be fulfilled.
508  *
509  * @return data which was used to fulfill the promise.
510  */
511 template <typename F>
512 typename FirstArgOf<F>::type::value_type
513 inline await(F&& func);
514
515 /**
516  * If called from a fiber, immediately switches to the FiberManager's context
517  * and runs func(), going back to the Fiber's context after completion.
518  * Outside a fiber, just calls func() directly.
519  *
520  * @return value returned by func().
521  */
522 template <typename F>
523 typename std::result_of<F()>::type
524 inline runInMainContext(F&& func) {
525   auto fm = FiberManager::getFiberManagerUnsafe();
526   if (UNLIKELY(fm == nullptr)) {
527     return func();
528   }
529   return fm->runInMainContext(std::forward<F>(func));
530 }
531
532 /**
533  * Returns a refference to a fiber-local context for given Fiber. Should be
534  * always called with the same T for each fiber. Fiber-local context is lazily
535  * default-constructed on first request.
536  * When new task is scheduled via addTask / addTaskRemote from a fiber its
537  * fiber-local context is copied into the new fiber.
538  */
539 template <typename T>
540 T& local() {
541   auto fm = FiberManager::getFiberManagerUnsafe();
542   if (fm) {
543     return fm->local<T>();
544   }
545   return FiberManager::localThread<T>();
546 }
547
548 inline void yield() {
549   auto fm = FiberManager::getFiberManagerUnsafe();
550   if (fm) {
551     fm->yield();
552   } else {
553     std::this_thread::yield();
554   }
555 }
556
557 }}
558
559 #include "FiberManager-inl.h"