Support nested FiberManagers
[folly.git] / folly / experimental / fibers / FiberManager-inl.h
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <cassert>
19
20 #include <folly/CPortability.h>
21 #include <folly/Memory.h>
22 #include <folly/Optional.h>
23 #include <folly/Portability.h>
24 #include <folly/ScopeGuard.h>
25 #ifdef __APPLE__
26 #include <folly/ThreadLocal.h>
27 #endif
28 #include <folly/experimental/fibers/Baton.h>
29 #include <folly/experimental/fibers/Fiber.h>
30 #include <folly/experimental/fibers/LoopController.h>
31 #include <folly/experimental/fibers/Promise.h>
32 #include <folly/futures/Promise.h>
33 #include <folly/futures/Try.h>
34
35 namespace folly { namespace fibers {
36
37 namespace {
38
39 inline FiberManager::Options preprocessOptions(FiberManager::Options opts) {
40 #ifdef FOLLY_SANITIZE_ADDRESS
41   /* ASAN needs a lot of extra stack space.
42      16x is a conservative estimate, 8x also worked with tests
43      where it mattered.  Note that overallocating here does not necessarily
44      increase RSS, since unused memory is pretty much free. */
45   opts.stackSize *= 16;
46 #endif
47   return opts;
48 }
49
50 }  // anonymous
51
52 inline void FiberManager::ensureLoopScheduled() {
53   if (isLoopScheduled_) {
54     return;
55   }
56
57   isLoopScheduled_ = true;
58   loopController_->schedule();
59 }
60
61 inline void FiberManager::runReadyFiber(Fiber* fiber) {
62   SCOPE_EXIT {
63     assert(currentFiber_ == nullptr);
64     assert(activeFiber_ == nullptr);
65   };
66
67   assert(fiber->state_ == Fiber::NOT_STARTED ||
68          fiber->state_ == Fiber::READY_TO_RUN);
69   currentFiber_ = fiber;
70   fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
71   if (observer_) {
72     observer_->starting(reinterpret_cast<uintptr_t>(fiber));
73   }
74
75   while (fiber->state_ == Fiber::NOT_STARTED ||
76          fiber->state_ == Fiber::READY_TO_RUN) {
77     activeFiber_ = fiber;
78     jumpContext(&mainContext_, &fiber->fcontext_, fiber->data_);
79     if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) {
80       try {
81         immediateFunc_();
82       } catch (...) {
83         exceptionCallback_(std::current_exception(), "running immediateFunc_");
84       }
85       immediateFunc_ = nullptr;
86       fiber->state_ = Fiber::READY_TO_RUN;
87     }
88   }
89
90   if (fiber->state_ == Fiber::AWAITING) {
91     awaitFunc_(*fiber);
92     awaitFunc_ = nullptr;
93     if (observer_) {
94       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
95     }
96     currentFiber_ = nullptr;
97     fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
98   } else if (fiber->state_ == Fiber::INVALID) {
99     assert(fibersActive_ > 0);
100     --fibersActive_;
101     // Making sure that task functor is deleted once task is complete.
102     // NOTE: we must do it on main context, as the fiber is not
103     // running at this point.
104     fiber->func_ = nullptr;
105     fiber->resultFunc_ = nullptr;
106     if (fiber->finallyFunc_) {
107       try {
108         fiber->finallyFunc_();
109       } catch (...) {
110         exceptionCallback_(std::current_exception(), "running finallyFunc_");
111       }
112       fiber->finallyFunc_ = nullptr;
113     }
114     // Make sure LocalData is not accessible from its destructor
115     if (observer_) {
116       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
117     }
118     currentFiber_ = nullptr;
119     fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
120     fiber->localData_.reset();
121     fiber->rcontext_.reset();
122
123     if (fibersPoolSize_ < options_.maxFibersPoolSize ||
124         options_.fibersPoolResizePeriodMs > 0) {
125       fibersPool_.push_front(*fiber);
126       ++fibersPoolSize_;
127     } else {
128       delete fiber;
129       assert(fibersAllocated_ > 0);
130       --fibersAllocated_;
131     }
132   } else if (fiber->state_ == Fiber::YIELDED) {
133     if (observer_) {
134       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
135     }
136     currentFiber_ = nullptr;
137     fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
138     fiber->state_ = Fiber::READY_TO_RUN;
139     yieldedFibers_.push_back(*fiber);
140   }
141 }
142
143 inline bool FiberManager::loopUntilNoReady() {
144   // Support nested FiberManagers
145   auto originalFiberManager = this;
146   std::swap(currentFiberManager_, originalFiberManager);
147
148   SCOPE_EXIT {
149     isLoopScheduled_ = false;
150     if (!readyFibers_.empty()) {
151       ensureLoopScheduled();
152     }
153     std::swap(currentFiberManager_, originalFiberManager);
154     CHECK_EQ(this, originalFiberManager);
155   };
156
157   bool hadRemoteFiber = true;
158   while (hadRemoteFiber) {
159     hadRemoteFiber = false;
160
161     while (!readyFibers_.empty()) {
162       auto& fiber = readyFibers_.front();
163       readyFibers_.pop_front();
164       runReadyFiber(&fiber);
165     }
166
167     remoteReadyQueue_.sweep(
168       [this, &hadRemoteFiber] (Fiber* fiber) {
169         runReadyFiber(fiber);
170         hadRemoteFiber = true;
171       }
172     );
173
174     remoteTaskQueue_.sweep(
175       [this, &hadRemoteFiber] (RemoteTask* taskPtr) {
176         std::unique_ptr<RemoteTask> task(taskPtr);
177         auto fiber = getFiber();
178         if (task->localData) {
179           fiber->localData_ = *task->localData;
180         }
181         fiber->rcontext_ = std::move(task->rcontext);
182
183         fiber->setFunction(std::move(task->func));
184         fiber->data_ = reinterpret_cast<intptr_t>(fiber);
185         if (observer_) {
186           observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
187         }
188         runReadyFiber(fiber);
189         hadRemoteFiber = true;
190       }
191     );
192   }
193
194   if (observer_) {
195     for (auto& yielded : yieldedFibers_) {
196       observer_->runnable(reinterpret_cast<uintptr_t>(&yielded));
197     }
198   }
199   readyFibers_.splice(readyFibers_.end(), yieldedFibers_);
200
201   return fibersActive_ > 0;
202 }
203
204 // We need this to be in a struct, not inlined in addTask, because clang crashes
205 // otherwise.
206 template <typename F>
207 struct FiberManager::AddTaskHelper {
208   class Func;
209
210   static constexpr bool allocateInBuffer =
211     sizeof(Func) <= Fiber::kUserBufferSize;
212
213   class Func {
214    public:
215     Func(F&& func, FiberManager& fm) :
216         func_(std::forward<F>(func)), fm_(fm) {}
217
218     void operator()() {
219       try {
220         func_();
221       } catch (...) {
222         fm_.exceptionCallback_(std::current_exception(),
223                                "running Func functor");
224       }
225       if (allocateInBuffer) {
226         this->~Func();
227       } else {
228         delete this;
229       }
230     }
231
232    private:
233     F func_;
234     FiberManager& fm_;
235   };
236 };
237
238 template <typename F>
239 void FiberManager::addTask(F&& func) {
240   typedef AddTaskHelper<F> Helper;
241
242   auto fiber = getFiber();
243   initLocalData(*fiber);
244
245   if (Helper::allocateInBuffer) {
246     auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
247     new (funcLoc) typename Helper::Func(std::forward<F>(func), *this);
248
249     fiber->setFunction(std::ref(*funcLoc));
250   } else {
251     auto funcLoc = new typename Helper::Func(std::forward<F>(func), *this);
252
253     fiber->setFunction(std::ref(*funcLoc));
254   }
255
256   fiber->data_ = reinterpret_cast<intptr_t>(fiber);
257   readyFibers_.push_back(*fiber);
258   if (observer_) {
259     observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
260   }
261
262   ensureLoopScheduled();
263 }
264
265 template <typename F>
266 auto FiberManager::addTaskFuture(F&& func)
267     -> folly::Future<typename std::result_of<F()>::type> {
268   using T = typename std::result_of<F()>::type;
269   folly::Promise<T> p;
270   auto f = p.getFuture();
271   addTaskFinally([func = std::forward<F>(func)]() mutable { return func(); },
272                  [p = std::move(p)](folly::Try<T> && t) mutable {
273                    p.setTry(std::move(t));
274                  });
275   return f;
276 }
277
278 template <typename F>
279 void FiberManager::addTaskRemote(F&& func) {
280   // addTaskRemote indirectly requires wrapping the function in a
281   // std::function, which must be copyable. As move-only lambdas may be
282   // passed in we wrap it first in a move wrapper and then capture the wrapped
283   // version.
284   auto functionWrapper = [f = folly::makeMoveWrapper(
285                               std::forward<F>(func))]() mutable {
286     return (*f)();
287   };
288   auto task = [&]() {
289     auto currentFm = getFiberManagerUnsafe();
290     if (currentFm &&
291         currentFm->currentFiber_ &&
292         currentFm->localType_ == localType_) {
293       return folly::make_unique<RemoteTask>(
294           std::move(functionWrapper), currentFm->currentFiber_->localData_);
295     }
296     return folly::make_unique<RemoteTask>(std::move(functionWrapper));
297   }();
298   auto insertHead =
299       [&]() { return remoteTaskQueue_.insertHead(task.release()); };
300   loopController_->scheduleThreadSafe(std::ref(insertHead));
301 }
302
303 template <typename F>
304 auto FiberManager::addTaskRemoteFuture(F&& func)
305     -> folly::Future<typename std::result_of<F()>::type> {
306   folly::Promise<typename std::result_of<F()>::type> p;
307   auto f = p.getFuture();
308   addTaskRemote(
309       [ p = std::move(p), func = std::forward<F>(func), this ]() mutable {
310         auto t = folly::makeTryWith(std::forward<F>(func));
311         runInMainContext([&]() { p.setTry(std::move(t)); });
312       });
313   return f;
314 }
315
316 template <typename X>
317 struct IsRvalueRefTry { static const bool value = false; };
318 template <typename T>
319 struct IsRvalueRefTry<folly::Try<T>&&> { static const bool value = true; };
320
321 // We need this to be in a struct, not inlined in addTaskFinally, because clang
322 // crashes otherwise.
323 template <typename F, typename G>
324 struct FiberManager::AddTaskFinallyHelper {
325   class Func;
326   class Finally;
327
328   typedef typename std::result_of<F()>::type Result;
329
330   static constexpr bool allocateInBuffer =
331     sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize;
332
333   class Finally {
334    public:
335     Finally(G&& finally,
336             FiberManager& fm) :
337         finally_(std::move(finally)),
338         fm_(fm) {
339     }
340
341     void operator()() {
342       try {
343         finally_(std::move(*result_));
344       } catch (...) {
345         fm_.exceptionCallback_(std::current_exception(),
346                                "running Finally functor");
347       }
348
349       if (allocateInBuffer) {
350         this->~Finally();
351       } else {
352         delete this;
353       }
354     }
355
356    private:
357     friend class Func;
358
359     G finally_;
360     folly::Optional<folly::Try<Result>> result_;
361     FiberManager& fm_;
362   };
363
364   class Func {
365    public:
366     Func(F&& func, Finally& finally) :
367         func_(std::move(func)), result_(finally.result_) {}
368
369     void operator()() {
370       result_ = folly::makeTryWith(std::move(func_));
371
372       if (allocateInBuffer) {
373         this->~Func();
374       } else {
375         delete this;
376       }
377     }
378
379    private:
380     F func_;
381     folly::Optional<folly::Try<Result>>& result_;
382   };
383 };
384
385 template <typename F, typename G>
386 void FiberManager::addTaskFinally(F&& func, G&& finally) {
387   typedef typename std::result_of<F()>::type Result;
388
389   static_assert(
390     IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
391     "finally(arg): arg must be Try<T>&&");
392   static_assert(
393     std::is_convertible<
394       Result,
395       typename std::remove_reference<
396         typename FirstArgOf<G>::type
397       >::type::element_type
398     >::value,
399     "finally(Try<T>&&): T must be convertible from func()'s return type");
400
401   auto fiber = getFiber();
402   initLocalData(*fiber);
403
404   typedef AddTaskFinallyHelper<F,G> Helper;
405
406   if (Helper::allocateInBuffer) {
407     auto funcLoc = static_cast<typename Helper::Func*>(
408       fiber->getUserBuffer());
409     auto finallyLoc = static_cast<typename Helper::Finally*>(
410       static_cast<void*>(funcLoc + 1));
411
412     new (finallyLoc) typename Helper::Finally(std::move(finally), *this);
413     new (funcLoc) typename Helper::Func(std::move(func), *finallyLoc);
414
415     fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
416   } else {
417     auto finallyLoc = new typename Helper::Finally(std::move(finally), *this);
418     auto funcLoc = new typename Helper::Func(std::move(func), *finallyLoc);
419
420     fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
421   }
422
423   fiber->data_ = reinterpret_cast<intptr_t>(fiber);
424   readyFibers_.push_back(*fiber);
425   if (observer_) {
426     observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
427   }
428
429   ensureLoopScheduled();
430 }
431
432 template <typename F>
433 typename std::result_of<F()>::type
434 FiberManager::runInMainContext(F&& func) {
435   if (UNLIKELY(activeFiber_ == nullptr)) {
436     return func();
437   }
438
439   typedef typename std::result_of<F()>::type Result;
440
441   folly::Try<Result> result;
442   auto f = [&func, &result]() mutable {
443     result = folly::makeTryWith(std::forward<F>(func));
444   };
445
446   immediateFunc_ = std::ref(f);
447   activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
448
449   return std::move(result).value();
450 }
451
452 inline FiberManager& FiberManager::getFiberManager() {
453   assert(currentFiberManager_ != nullptr);
454   return *currentFiberManager_;
455 }
456
457 inline FiberManager* FiberManager::getFiberManagerUnsafe() {
458   return currentFiberManager_;
459 }
460
461 inline bool FiberManager::hasActiveFiber() const {
462   return activeFiber_ != nullptr;
463 }
464
465 inline void FiberManager::yield() {
466   assert(currentFiberManager_ == this);
467   assert(activeFiber_ != nullptr);
468   assert(activeFiber_->state_ == Fiber::RUNNING);
469   activeFiber_->preempt(Fiber::YIELDED);
470 }
471
472 template <typename T>
473 T& FiberManager::local() {
474   if (std::type_index(typeid(T)) == localType_ && currentFiber_) {
475     return currentFiber_->localData_.get<T>();
476   }
477   return localThread<T>();
478 }
479
480 template <typename T>
481 T& FiberManager::localThread() {
482 #ifndef __APPLE__
483   static thread_local T t;
484   return t;
485 #else // osx doesn't support thread_local
486   static ThreadLocal<T> t;
487   return *t;
488 #endif
489 }
490
491 inline void FiberManager::initLocalData(Fiber& fiber) {
492   auto fm = getFiberManagerUnsafe();
493   if (fm && fm->currentFiber_ && fm->localType_ == localType_) {
494     fiber.localData_ = fm->currentFiber_->localData_;
495   }
496   fiber.rcontext_ = RequestContext::saveContext();
497 }
498
499 template <typename LocalT>
500 FiberManager::FiberManager(
501   LocalType<LocalT>,
502   std::unique_ptr<LoopController> loopController__,
503   Options options)  :
504     loopController_(std::move(loopController__)),
505     stackAllocator_(options.useGuardPages),
506     options_(preprocessOptions(std::move(options))),
507     exceptionCallback_([](std::exception_ptr eptr, std::string context) {
508         try {
509           std::rethrow_exception(eptr);
510         } catch (const std::exception& e) {
511           LOG(DFATAL) << "Exception " << typeid(e).name()
512                       << " with message '" << e.what() << "' was thrown in "
513                       << "FiberManager with context '" << context << "'";
514           throw;
515         } catch (...) {
516           LOG(DFATAL) << "Unknown exception was thrown in FiberManager with "
517                       << "context '" << context << "'";
518           throw;
519         }
520       }),
521     timeoutManager_(std::make_shared<TimeoutController>(*loopController_)),
522     fibersPoolResizer_(*this),
523     localType_(typeid(LocalT)) {
524   loopController_->setFiberManager(this);
525 }
526
527 template <typename F>
528 typename FirstArgOf<F>::type::value_type
529 inline await(F&& func) {
530   typedef typename FirstArgOf<F>::type::value_type Result;
531
532   folly::Try<Result> result;
533
534   Baton baton;
535   baton.wait([&func, &result, &baton]() mutable {
536       func(Promise<Result>(result, baton));
537     });
538
539   return folly::moveFromTry(result);
540 }
541
542 }}