RequestContext support
[folly.git] / folly / experimental / fibers / FiberManager.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <functional>
19 #include <memory>
20 #include <queue>
21 #include <thread>
22 #include <typeindex>
23 #include <unordered_set>
24 #include <vector>
25
26 #include <folly/AtomicLinkedList.h>
27 #include <folly/Executor.h>
28 #include <folly/Likely.h>
29 #include <folly/IntrusiveList.h>
30 #include <folly/io/async/Request.h>
31 #include <folly/futures/Try.h>
32
33 #include <folly/experimental/ExecutionObserver.h>
34 #include <folly/experimental/fibers/BoostContextCompatibility.h>
35 #include <folly/experimental/fibers/Fiber.h>
36 #include <folly/experimental/fibers/GuardPageAllocator.h>
37 #include <folly/experimental/fibers/traits.h>
38
39 namespace folly { namespace fibers {
40
41 class Baton;
42 class Fiber;
43 class LoopController;
44 class TimeoutController;
45
46 template <typename T>
47 class LocalType {
48 };
49
50 /**
51  * @class FiberManager
52  * @brief Single-threaded task execution engine.
53  *
54  * FiberManager allows semi-parallel task execution on the same thread. Each
55  * task can notify FiberManager that it is blocked on something (via await())
56  * call. This will pause execution of this task and it will be resumed only
57  * when it is unblocked (via setData()).
58  */
59 class FiberManager : public ::folly::Executor {
60  public:
61   struct Options {
62     static constexpr size_t kDefaultStackSize{16 * 1024};
63
64     /**
65      * Maximum stack size for fibers which will be used for executing all the
66      * tasks.
67      */
68     size_t stackSize{kDefaultStackSize};
69
70     /**
71      * Record exact amount of stack used.
72      *
73      * This is fairly expensive: we fill each newly allocated stack
74      * with some known value and find the boundary of unused stack
75      * with linear search every time we surrender the stack back to fibersPool.
76      * 0 disables stack recording.
77      */
78     size_t recordStackEvery{0};
79
80     /**
81      * Keep at most this many free fibers in the pool.
82      * This way the total number of fibers in the system is always bounded
83      * by the number of active fibers + maxFibersPoolSize.
84      */
85     size_t maxFibersPoolSize{1000};
86
87     constexpr Options() {}
88   };
89
90   typedef std::function<void(std::exception_ptr, std::string)>
91   ExceptionCallback;
92
93   FiberManager(const FiberManager&) = delete;
94   FiberManager& operator=(const FiberManager&) = delete;
95
96   /**
97    * Initializes, but doesn't start FiberManager loop
98    *
99    * @param loopController
100    * @param options FiberManager options
101    */
102   explicit FiberManager(std::unique_ptr<LoopController> loopController,
103                         Options options = Options());
104
105   /**
106    * Initializes, but doesn't start FiberManager loop
107    *
108    * @param loopController
109    * @param options FiberManager options
110    * @tparam LocalT only local of this type may be stored on fibers.
111    *                Locals of other types will be considered thread-locals.
112    */
113   template <typename LocalT>
114   FiberManager(LocalType<LocalT>,
115                std::unique_ptr<LoopController> loopController,
116                Options options = Options());
117
118
119   ~FiberManager();
120
121   /**
122    * Controller access.
123    */
124   LoopController& loopController();
125   const LoopController& loopController() const;
126
127   /**
128    * Keeps running ready tasks until the list of ready tasks is empty.
129    *
130    * @return True if there are any waiting tasks remaining.
131    */
132   bool loopUntilNoReady();
133
134   /**
135    * @return true if there are outstanding tasks.
136    */
137   bool hasTasks() const;
138
139   /**
140    * Sets exception callback which will be called if any of the tasks throws an
141    * exception.
142    *
143    * @param ec
144    */
145   void setExceptionCallback(ExceptionCallback ec);
146
147   /**
148    * Add a new task to be executed. Must be called from FiberManager's thread.
149    *
150    * @param func Task functor; must have a signature of `void func()`.
151    *             The object will be destroyed once task execution is complete.
152    */
153   template <typename F>
154   void addTask(F&& func);
155
156   /**
157    * Add a new task to be executed. Safe to call from other threads.
158    *
159    * @param func Task function; must have a signature of `void func()`.
160    *             The object will be destroyed once task execution is complete.
161    */
162   template <typename F>
163   void addTaskRemote(F&& func);
164
165   // Executor interface calls addTaskRemote
166   void add(std::function<void()> f) {
167     addTaskRemote(std::move(f));
168   }
169
170   /**
171    * Add a new task. When the task is complete, execute finally(Try<Result>&&)
172    * on the main context.
173    *
174    * @param func Task functor; must have a signature of `T func()` for some T.
175    * @param finally Finally functor; must have a signature of
176    *                `void finally(Try<T>&&)` and will be passed
177    *                the result of func() (including the exception if occurred).
178    */
179   template <typename F, typename G>
180   void addTaskFinally(F&& func, G&& finally);
181
182   /**
183    * If called from a fiber, immediately switches to the FiberManager's context
184    * and runs func(), going back to the Fiber's context after completion.
185    * Outside a fiber, just calls func() directly.
186    *
187    * @return value returned by func().
188    */
189   template <typename F>
190   typename std::result_of<F()>::type
191   runInMainContext(F&& func);
192
193   /**
194    * Returns a refference to a fiber-local context for given Fiber. Should be
195    * always called with the same T for each fiber. Fiber-local context is lazily
196    * default-constructed on first request.
197    * When new task is scheduled via addTask / addTaskRemote from a fiber its
198    * fiber-local context is copied into the new fiber.
199    */
200   template <typename T>
201   T& local();
202
203   template <typename T>
204   static T& localThread();
205
206   /**
207    * @return How many fiber objects (and stacks) has this manager allocated.
208    */
209   size_t fibersAllocated() const;
210
211   /**
212    * @return How many of the allocated fiber objects are currently
213    * in the free pool.
214    */
215   size_t fibersPoolSize() const;
216
217   /**
218    * return     true if running activeFiber_ is not nullptr.
219    */
220   bool hasActiveFiber() const;
221
222   /**
223    * @return What was the most observed fiber stack usage (in bytes).
224    */
225   size_t stackHighWatermark() const;
226
227   /**
228    * Yield execution of the currently running fiber. Must only be called from a
229    * fiber executing on this FiberManager. The calling fiber will be scheduled
230    * when all other fibers have had a chance to run and the event loop is
231    * serviced.
232    */
233   void yield();
234
235   /**
236    * Setup fibers execution observation/instrumentation. Fiber locals are
237    * available to observer.
238    *
239    * @param observer  Fiber's execution observer.
240    */
241   void setObserver(ExecutionObserver* observer);
242
243   static FiberManager& getFiberManager();
244   static FiberManager* getFiberManagerUnsafe();
245
246  private:
247   friend class Baton;
248   friend class Fiber;
249   template <typename F>
250   struct AddTaskHelper;
251   template <typename F, typename G>
252   struct AddTaskFinallyHelper;
253
254   struct RemoteTask {
255     template <typename F>
256     explicit RemoteTask(F&& f) :
257         func(std::forward<F>(f)),
258         rcontext(RequestContext::saveContext()) {}
259     template <typename F>
260     RemoteTask(F&& f, const Fiber::LocalData& localData_) :
261         func(std::forward<F>(f)),
262         localData(folly::make_unique<Fiber::LocalData>(localData_)),
263         rcontext(RequestContext::saveContext()) {}
264     std::function<void()> func;
265     std::unique_ptr<Fiber::LocalData> localData;
266     std::shared_ptr<RequestContext> rcontext;
267     AtomicLinkedListHook<RemoteTask> nextRemoteTask;
268   };
269
270   typedef folly::IntrusiveList<Fiber, &Fiber::listHook_> FiberTailQueue;
271
272   Fiber* activeFiber_{nullptr}; /**< active fiber, nullptr on main context */
273   /**
274    * Same as active fiber, but also set for functions run from fiber on main
275    * context.
276    */
277   Fiber* currentFiber_{nullptr};
278
279   FiberTailQueue readyFibers_;  /**< queue of fibers ready to be executed */
280   FiberTailQueue yieldedFibers_;  /**< queue of fibers which have yielded
281                                        execution */
282   FiberTailQueue fibersPool_;   /**< pool of unitialized Fiber objects */
283
284   size_t fibersAllocated_{0};   /**< total number of fibers allocated */
285   size_t fibersPoolSize_{0};    /**< total number of fibers in the free pool */
286   size_t fibersActive_{0};      /**< number of running or blocked fibers */
287   size_t fiberId_{0};           /**< id of last fiber used */
288
289   FContext::ContextStruct mainContext_;  /**< stores loop function context */
290
291   std::unique_ptr<LoopController> loopController_;
292   bool isLoopScheduled_{false}; /**< was the ready loop scheduled to run? */
293
294   /**
295    * When we are inside FiberManager loop this points to FiberManager. Otherwise
296    * it's nullptr
297    */
298   static __thread FiberManager* currentFiberManager_;
299
300   /**
301    * runInMainContext implementation for non-void functions.
302    */
303   template <typename F>
304   typename std::enable_if<
305     !std::is_same<typename std::result_of<F()>::type, void>::value,
306     typename std::result_of<F()>::type>::type
307   runInMainContextHelper(F&& func);
308
309   /**
310    * runInMainContext implementation for void functions
311    */
312   template <typename F>
313   typename std::enable_if<
314     std::is_same<typename std::result_of<F()>::type, void>::value,
315     void>::type
316   runInMainContextHelper(F&& func);
317
318   /**
319    * Allocator used to allocate stack for Fibers in the pool.
320    * Allocates stack on the stack of the main context.
321    */
322   GuardPageAllocator stackAllocator_;
323
324   const Options options_;       /**< FiberManager options */
325
326   /**
327    * Largest observed individual Fiber stack usage in bytes.
328    */
329   size_t stackHighWatermark_{0};
330
331   /**
332    * Schedules a loop with loopController (unless already scheduled before).
333    */
334   void ensureLoopScheduled();
335
336   /**
337    * @return An initialized Fiber object from the pool
338    */
339   Fiber* getFiber();
340
341   /**
342    * Sets local data for given fiber if all conditions are met.
343    */
344   void initLocalData(Fiber& fiber);
345
346   /**
347    * Function passed to the await call.
348    */
349   std::function<void(Fiber&)> awaitFunc_;
350
351   /**
352    * Function passed to the runInMainContext call.
353    */
354   std::function<void()> immediateFunc_;
355
356   /**
357    * Fiber's execution observer.
358    */
359   ExecutionObserver* observer_{nullptr};
360
361   ExceptionCallback exceptionCallback_; /**< task exception callback */
362
363   folly::AtomicLinkedList<Fiber, &Fiber::nextRemoteReady_> remoteReadyQueue_;
364
365   folly::AtomicLinkedList<RemoteTask, &RemoteTask::nextRemoteTask>
366       remoteTaskQueue_;
367
368   std::shared_ptr<TimeoutController> timeoutManager_;
369
370   /**
371    * Only local of this type will be available for fibers.
372    */
373   std::type_index localType_;
374
375   void runReadyFiber(Fiber* fiber);
376   void remoteReadyInsert(Fiber* fiber);
377 };
378
379 /**
380  * @return      true iff we are running in a fiber's context
381  */
382 inline bool onFiber() {
383   auto fm = FiberManager::getFiberManagerUnsafe();
384   return fm ? fm->hasActiveFiber() : false;
385 }
386
387 /**
388  * Add a new task to be executed.
389  *
390  * @param func Task functor; must have a signature of `void func()`.
391  *             The object will be destroyed once task execution is complete.
392  */
393 template <typename F>
394 inline void addTask(F&& func) {
395   return FiberManager::getFiberManager().addTask(std::forward<F>(func));
396 }
397
398 /**
399  * Add a new task. When the task is complete, execute finally(Try<Result>&&)
400  * on the main context.
401  * Task functor is run and destroyed on the fiber context.
402  * Finally functor is run and destroyed on the main context.
403  *
404  * @param func Task functor; must have a signature of `T func()` for some T.
405  * @param finally Finally functor; must have a signature of
406  *                `void finally(Try<T>&&)` and will be passed
407  *                the result of func() (including the exception if occurred).
408  */
409 template <typename F, typename G>
410 inline void addTaskFinally(F&& func, G&& finally) {
411   return FiberManager::getFiberManager().addTaskFinally(
412     std::forward<F>(func), std::forward<G>(finally));
413 }
414
415 /**
416  * Blocks task execution until given promise is fulfilled.
417  *
418  * Calls function passing in a Promise<T>, which has to be fulfilled.
419  *
420  * @return data which was used to fulfill the promise.
421  */
422 template <typename F>
423 typename FirstArgOf<F>::type::value_type
424 inline await(F&& func);
425
426 /**
427  * If called from a fiber, immediately switches to the FiberManager's context
428  * and runs func(), going back to the Fiber's context after completion.
429  * Outside a fiber, just calls func() directly.
430  *
431  * @return value returned by func().
432  */
433 template <typename F>
434 typename std::result_of<F()>::type
435 inline runInMainContext(F&& func) {
436   auto fm = FiberManager::getFiberManagerUnsafe();
437   if (UNLIKELY(fm == nullptr)) {
438     return func();
439   }
440   return fm->runInMainContext(std::forward<F>(func));
441 }
442
443 /**
444  * Returns a refference to a fiber-local context for given Fiber. Should be
445  * always called with the same T for each fiber. Fiber-local context is lazily
446  * default-constructed on first request.
447  * When new task is scheduled via addTask / addTaskRemote from a fiber its
448  * fiber-local context is copied into the new fiber.
449  */
450 template <typename T>
451 T& local() {
452   auto fm = FiberManager::getFiberManagerUnsafe();
453   if (fm) {
454     return fm->local<T>();
455   }
456   return FiberManager::localThread<T>();
457 }
458
459 inline void yield() {
460   auto fm = FiberManager::getFiberManagerUnsafe();
461   if (fm) {
462     fm->yield();
463   } else {
464     std::this_thread::yield();
465   }
466 }
467
468 }}
469
470 #include "FiberManager-inl.h"