Change ThreadManager interface to Executor
[folly.git] / folly / experimental / fibers / FiberManager.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <functional>
19 #include <memory>
20 #include <queue>
21 #include <thread>
22 #include <typeindex>
23 #include <unordered_set>
24 #include <vector>
25
26 #include <folly/AtomicLinkedList.h>
27 #include <folly/Executor.h>
28 #include <folly/Likely.h>
29 #include <folly/IntrusiveList.h>
30 #include <folly/futures/Try.h>
31
32 #include <folly/experimental/fibers/BoostContextCompatibility.h>
33 #include <folly/experimental/fibers/ExecutionObserver.h>
34 #include <folly/experimental/fibers/Fiber.h>
35 #include <folly/experimental/fibers/traits.h>
36
37 #ifdef USE_GUARD_ALLOCATOR
38 #include <folly/experimental/fibers/GuardPageAllocator.h>
39 #endif
40
41 namespace folly { namespace fibers {
42
43 class Baton;
44 class Fiber;
45 class LoopController;
46 class TimeoutController;
47
48 template <typename T>
49 class LocalType {
50 };
51
52 /**
53  * @class FiberManager
54  * @brief Single-threaded task execution engine.
55  *
56  * FiberManager allows semi-parallel task execution on the same thread. Each
57  * task can notify FiberManager that it is blocked on something (via await())
58  * call. This will pause execution of this task and it will be resumed only
59  * when it is unblocked (via setData()).
60  */
61 class FiberManager : public ::folly::Executor {
62  public:
63   struct Options {
64 #ifdef FOLLY_SANITIZE_ADDRESS
65     /* ASAN needs a lot of extra stack space.
66        16x is a conservative estimate, 8x also worked with tests
67        where it mattered.  Note that overallocating here does not necessarily
68        increase RSS, since unused memory is pretty much free. */
69     static constexpr size_t kDefaultStackSize{16 * 16 * 1024};
70 #else
71     static constexpr size_t kDefaultStackSize{16 * 1024};
72 #endif
73     /**
74      * Maximum stack size for fibers which will be used for executing all the
75      * tasks.
76      */
77     size_t stackSize{kDefaultStackSize};
78
79     /**
80      * Record exact amount of stack used.
81      *
82      * This is fairly expensive: we fill each newly allocated stack
83      * with some known value and find the boundary of unused stack
84      * with linear search every time we surrender the stack back to fibersPool.
85      * 0 disables stack recording.
86      */
87     size_t recordStackEvery{0};
88
89     /**
90      * Keep at most this many free fibers in the pool.
91      * This way the total number of fibers in the system is always bounded
92      * by the number of active fibers + maxFibersPoolSize.
93      */
94     size_t maxFibersPoolSize{1000};
95
96     constexpr Options() {}
97   };
98
99   typedef std::function<void(std::exception_ptr, std::string)>
100   ExceptionCallback;
101
102   FiberManager(const FiberManager&) = delete;
103   FiberManager& operator=(const FiberManager&) = delete;
104
105   /**
106    * Initializes, but doesn't start FiberManager loop
107    *
108    * @param loopController
109    * @param options FiberManager options
110    */
111   explicit FiberManager(std::unique_ptr<LoopController> loopController,
112                         Options options = Options());
113
114   /**
115    * Initializes, but doesn't start FiberManager loop
116    *
117    * @param loopController
118    * @param options FiberManager options
119    * @tparam LocalT only local of this type may be stored on fibers.
120    *                Locals of other types will be considered thread-locals.
121    */
122   template <typename LocalT>
123   FiberManager(LocalType<LocalT>,
124                std::unique_ptr<LoopController> loopController,
125                Options options = Options());
126
127
128   ~FiberManager();
129
130   /**
131    * Controller access.
132    */
133   LoopController& loopController();
134   const LoopController& loopController() const;
135
136   /**
137    * Keeps running ready tasks until the list of ready tasks is empty.
138    *
139    * @return True if there are any waiting tasks remaining.
140    */
141   bool loopUntilNoReady();
142
143   /**
144    * @return true if there are outstanding tasks.
145    */
146   bool hasTasks() const;
147
148   /**
149    * Sets exception callback which will be called if any of the tasks throws an
150    * exception.
151    *
152    * @param ec
153    */
154   void setExceptionCallback(ExceptionCallback ec);
155
156   /**
157    * Add a new task to be executed. Must be called from FiberManager's thread.
158    *
159    * @param func Task functor; must have a signature of `void func()`.
160    *             The object will be destroyed once task execution is complete.
161    */
162   template <typename F>
163   void addTask(F&& func);
164
165   /**
166    * Add a new task to be executed. Safe to call from other threads.
167    *
168    * @param func Task function; must have a signature of `void func()`.
169    *             The object will be destroyed once task execution is complete.
170    */
171   template <typename F>
172   void addTaskRemote(F&& func);
173
174   // Executor interface calls addTaskRemote
175   void add(std::function<void()> f) {
176     addTaskRemote(std::move(f));
177   }
178
179   /**
180    * Add a new task. When the task is complete, execute finally(Try<Result>&&)
181    * on the main context.
182    *
183    * @param func Task functor; must have a signature of `T func()` for some T.
184    * @param finally Finally functor; must have a signature of
185    *                `void finally(Try<T>&&)` and will be passed
186    *                the result of func() (including the exception if occurred).
187    */
188   template <typename F, typename G>
189   void addTaskFinally(F&& func, G&& finally);
190
191   /**
192    * If called from a fiber, immediately switches to the FiberManager's context
193    * and runs func(), going back to the Fiber's context after completion.
194    * Outside a fiber, just calls func() directly.
195    *
196    * @return value returned by func().
197    */
198   template <typename F>
199   typename std::result_of<F()>::type
200   runInMainContext(F&& func);
201
202   /**
203    * Returns a refference to a fiber-local context for given Fiber. Should be
204    * always called with the same T for each fiber. Fiber-local context is lazily
205    * default-constructed on first request.
206    * When new task is scheduled via addTask / addTaskRemote from a fiber its
207    * fiber-local context is copied into the new fiber.
208    */
209   template <typename T>
210   T& local();
211
212   template <typename T>
213   static T& localThread();
214
215   /**
216    * @return How many fiber objects (and stacks) has this manager allocated.
217    */
218   size_t fibersAllocated() const;
219
220   /**
221    * @return How many of the allocated fiber objects are currently
222    * in the free pool.
223    */
224   size_t fibersPoolSize() const;
225
226   /**
227    * return     true if running activeFiber_ is not nullptr.
228    */
229   bool hasActiveFiber() const;
230
231   /**
232    * @return What was the most observed fiber stack usage (in bytes).
233    */
234   size_t stackHighWatermark() const;
235
236   /**
237    * Yield execution of the currently running fiber. Must only be called from a
238    * fiber executing on this FiberManager. The calling fiber will be scheduled
239    * when all other fibers have had a chance to run and the event loop is
240    * serviced.
241    */
242   void yield();
243
244   /**
245    * Setup fibers execution observation/instrumentation. Fiber locals are
246    * available to observer.
247    *
248    * @param observer  Fiber's execution observer.
249    */
250   void setObserver(ExecutionObserver* observer);
251
252   static FiberManager& getFiberManager();
253   static FiberManager* getFiberManagerUnsafe();
254
255  private:
256   friend class Baton;
257   friend class Fiber;
258   template <typename F>
259   struct AddTaskHelper;
260   template <typename F, typename G>
261   struct AddTaskFinallyHelper;
262
263   struct RemoteTask {
264     template <typename F>
265     explicit RemoteTask(F&& f) : func(std::forward<F>(f)) {}
266     template <typename F>
267     RemoteTask(F&& f, const Fiber::LocalData& localData_) :
268         func(std::forward<F>(f)),
269         localData(folly::make_unique<Fiber::LocalData>(localData_)) {}
270     std::function<void()> func;
271     std::unique_ptr<Fiber::LocalData> localData;
272     AtomicLinkedListHook<RemoteTask> nextRemoteTask;
273   };
274
275   typedef folly::IntrusiveList<Fiber, &Fiber::listHook_> FiberTailQueue;
276
277   Fiber* activeFiber_{nullptr}; /**< active fiber, nullptr on main context */
278   /**
279    * Same as active fiber, but also set for functions run from fiber on main
280    * context.
281    */
282   Fiber* currentFiber_{nullptr};
283
284   FiberTailQueue readyFibers_;  /**< queue of fibers ready to be executed */
285   FiberTailQueue yieldedFibers_;  /**< queue of fibers which have yielded
286                                        execution */
287   FiberTailQueue fibersPool_;   /**< pool of unitialized Fiber objects */
288
289   size_t fibersAllocated_{0};   /**< total number of fibers allocated */
290   size_t fibersPoolSize_{0};    /**< total number of fibers in the free pool */
291   size_t fibersActive_{0};      /**< number of running or blocked fibers */
292   size_t fiberId_{0};           /**< id of last fiber used */
293
294   FContext::ContextStruct mainContext_;  /**< stores loop function context */
295
296   std::unique_ptr<LoopController> loopController_;
297   bool isLoopScheduled_{false}; /**< was the ready loop scheduled to run? */
298
299   /**
300    * When we are inside FiberManager loop this points to FiberManager. Otherwise
301    * it's nullptr
302    */
303   static __thread FiberManager* currentFiberManager_;
304
305   /**
306    * runInMainContext implementation for non-void functions.
307    */
308   template <typename F>
309   typename std::enable_if<
310     !std::is_same<typename std::result_of<F()>::type, void>::value,
311     typename std::result_of<F()>::type>::type
312   runInMainContextHelper(F&& func);
313
314   /**
315    * runInMainContext implementation for void functions
316    */
317   template <typename F>
318   typename std::enable_if<
319     std::is_same<typename std::result_of<F()>::type, void>::value,
320     void>::type
321   runInMainContextHelper(F&& func);
322
323   /**
324    * Allocator used to allocate stack for Fibers in the pool.
325    * Allocates stack on the stack of the main context.
326    */
327 #ifdef USE_GUARD_ALLOCATOR
328   /* This is too slow for production use; can be fixed
329      if we allocated all stack storage once upfront */
330   GuardPageAllocator stackAllocator_;
331 #else
332   std::allocator<unsigned char> stackAllocator_;
333 #endif
334
335   const Options options_;       /**< FiberManager options */
336
337   /**
338    * Largest observed individual Fiber stack usage in bytes.
339    */
340   size_t stackHighWatermark_{0};
341
342   /**
343    * Schedules a loop with loopController (unless already scheduled before).
344    */
345   void ensureLoopScheduled();
346
347   /**
348    * @return An initialized Fiber object from the pool
349    */
350   Fiber* getFiber();
351
352   /**
353    * Sets local data for given fiber if all conditions are met.
354    */
355   void initLocalData(Fiber& fiber);
356
357   /**
358    * Function passed to the await call.
359    */
360   std::function<void(Fiber&)> awaitFunc_;
361
362   /**
363    * Function passed to the runInMainContext call.
364    */
365   std::function<void()> immediateFunc_;
366
367   /**
368    * Fiber's execution observer.
369    */
370   ExecutionObserver* observer_{nullptr};
371
372   ExceptionCallback exceptionCallback_; /**< task exception callback */
373
374   folly::AtomicLinkedList<Fiber, &Fiber::nextRemoteReady_> remoteReadyQueue_;
375
376   folly::AtomicLinkedList<RemoteTask, &RemoteTask::nextRemoteTask>
377       remoteTaskQueue_;
378
379   std::shared_ptr<TimeoutController> timeoutManager_;
380
381   /**
382    * Only local of this type will be available for fibers.
383    */
384   std::type_index localType_;
385
386   void runReadyFiber(Fiber* fiber);
387   void remoteReadyInsert(Fiber* fiber);
388 };
389
390 /**
391  * @return      true iff we are running in a fiber's context
392  */
393 inline bool onFiber() {
394   auto fm = FiberManager::getFiberManagerUnsafe();
395   return fm ? fm->hasActiveFiber() : false;
396 }
397
398 /**
399  * Add a new task to be executed.
400  *
401  * @param func Task functor; must have a signature of `void func()`.
402  *             The object will be destroyed once task execution is complete.
403  */
404 template <typename F>
405 inline void addTask(F&& func) {
406   return FiberManager::getFiberManager().addTask(std::forward<F>(func));
407 }
408
409 /**
410  * Add a new task. When the task is complete, execute finally(Try<Result>&&)
411  * on the main context.
412  * Task functor is run and destroyed on the fiber context.
413  * Finally functor is run and destroyed on the main context.
414  *
415  * @param func Task functor; must have a signature of `T func()` for some T.
416  * @param finally Finally functor; must have a signature of
417  *                `void finally(Try<T>&&)` and will be passed
418  *                the result of func() (including the exception if occurred).
419  */
420 template <typename F, typename G>
421 inline void addTaskFinally(F&& func, G&& finally) {
422   return FiberManager::getFiberManager().addTaskFinally(
423     std::forward<F>(func), std::forward<G>(finally));
424 }
425
426 /**
427  * Blocks task execution until given promise is fulfilled.
428  *
429  * Calls function passing in a Promise<T>, which has to be fulfilled.
430  *
431  * @return data which was used to fulfill the promise.
432  */
433 template <typename F>
434 typename FirstArgOf<F>::type::value_type
435 inline await(F&& func);
436
437 /**
438  * If called from a fiber, immediately switches to the FiberManager's context
439  * and runs func(), going back to the Fiber's context after completion.
440  * Outside a fiber, just calls func() directly.
441  *
442  * @return value returned by func().
443  */
444 template <typename F>
445 typename std::result_of<F()>::type
446 inline runInMainContext(F&& func) {
447   auto fm = FiberManager::getFiberManagerUnsafe();
448   if (UNLIKELY(fm == nullptr)) {
449     return func();
450   }
451   return fm->runInMainContext(std::forward<F>(func));
452 }
453
454 /**
455  * Returns a refference to a fiber-local context for given Fiber. Should be
456  * always called with the same T for each fiber. Fiber-local context is lazily
457  * default-constructed on first request.
458  * When new task is scheduled via addTask / addTaskRemote from a fiber its
459  * fiber-local context is copied into the new fiber.
460  */
461 template <typename T>
462 T& local() {
463   auto fm = FiberManager::getFiberManagerUnsafe();
464   if (fm) {
465     return fm->local<T>();
466   }
467   return FiberManager::localThread<T>();
468 }
469
470 inline void yield() {
471   auto fm = FiberManager::getFiberManagerUnsafe();
472   if (fm) {
473     fm->yield();
474   } else {
475     std::this_thread::yield();
476   }
477 }
478
479 }}
480
481 #include "FiberManager-inl.h"