folly/futures: replace MoveWrappers with generalised lambda capture
[folly.git] / folly / experimental / fibers / FiberManager-inl.h
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <cassert>
19
20 #include <folly/CPortability.h>
21 #include <folly/Memory.h>
22 #include <folly/MoveWrapper.h>
23 #include <folly/Optional.h>
24 #include <folly/Portability.h>
25 #include <folly/ScopeGuard.h>
26 #ifdef __APPLE__
27 #include <folly/ThreadLocal.h>
28 #endif
29 #include <folly/experimental/fibers/Baton.h>
30 #include <folly/experimental/fibers/Fiber.h>
31 #include <folly/experimental/fibers/LoopController.h>
32 #include <folly/experimental/fibers/Promise.h>
33 #include <folly/futures/Promise.h>
34 #include <folly/futures/Try.h>
35
36 namespace folly { namespace fibers {
37
38 namespace {
39
40 inline FiberManager::Options preprocessOptions(FiberManager::Options opts) {
41 #ifdef FOLLY_SANITIZE_ADDRESS
42   /* ASAN needs a lot of extra stack space.
43      16x is a conservative estimate, 8x also worked with tests
44      where it mattered.  Note that overallocating here does not necessarily
45      increase RSS, since unused memory is pretty much free. */
46   opts.stackSize *= 16;
47 #endif
48   return opts;
49 }
50
51 }  // anonymous
52
53 inline void FiberManager::ensureLoopScheduled() {
54   if (isLoopScheduled_) {
55     return;
56   }
57
58   isLoopScheduled_ = true;
59   loopController_->schedule();
60 }
61
62 inline intptr_t FiberManager::activateFiber(Fiber* fiber) {
63   DCHECK_EQ(activeFiber_, (Fiber*)nullptr);
64
65 #ifdef FOLLY_SANITIZE_ADDRESS
66   registerFiberActivationWithAsan(fiber);
67 #endif
68
69   activeFiber_ = fiber;
70   return jumpContext(&mainContext_, &fiber->fcontext_, fiber->data_);
71 }
72
73 inline intptr_t FiberManager::deactivateFiber(Fiber* fiber) {
74   DCHECK_EQ(activeFiber_, fiber);
75
76 #ifdef FOLLY_SANITIZE_ADDRESS
77   registerFiberDeactivationWithAsan(fiber);
78 #endif
79
80   activeFiber_ = nullptr;
81   return jumpContext(&fiber->fcontext_, &mainContext_, 0);
82 }
83
84 inline void FiberManager::runReadyFiber(Fiber* fiber) {
85   SCOPE_EXIT {
86     assert(currentFiber_ == nullptr);
87     assert(activeFiber_ == nullptr);
88   };
89
90   assert(fiber->state_ == Fiber::NOT_STARTED ||
91          fiber->state_ == Fiber::READY_TO_RUN);
92   currentFiber_ = fiber;
93   fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
94   if (observer_) {
95     observer_->starting(reinterpret_cast<uintptr_t>(fiber));
96   }
97
98   while (fiber->state_ == Fiber::NOT_STARTED ||
99          fiber->state_ == Fiber::READY_TO_RUN) {
100     activateFiber(fiber);
101     if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) {
102       try {
103         immediateFunc_();
104       } catch (...) {
105         exceptionCallback_(std::current_exception(), "running immediateFunc_");
106       }
107       immediateFunc_ = nullptr;
108       fiber->state_ = Fiber::READY_TO_RUN;
109     }
110   }
111
112   if (fiber->state_ == Fiber::AWAITING) {
113     awaitFunc_(*fiber);
114     awaitFunc_ = nullptr;
115     if (observer_) {
116       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
117     }
118     currentFiber_ = nullptr;
119     fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
120   } else if (fiber->state_ == Fiber::INVALID) {
121     assert(fibersActive_ > 0);
122     --fibersActive_;
123     // Making sure that task functor is deleted once task is complete.
124     // NOTE: we must do it on main context, as the fiber is not
125     // running at this point.
126     fiber->func_ = nullptr;
127     fiber->resultFunc_ = nullptr;
128     if (fiber->finallyFunc_) {
129       try {
130         fiber->finallyFunc_();
131       } catch (...) {
132         exceptionCallback_(std::current_exception(), "running finallyFunc_");
133       }
134       fiber->finallyFunc_ = nullptr;
135     }
136     // Make sure LocalData is not accessible from its destructor
137     if (observer_) {
138       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
139     }
140     currentFiber_ = nullptr;
141     fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
142     fiber->localData_.reset();
143     fiber->rcontext_.reset();
144
145     if (fibersPoolSize_ < options_.maxFibersPoolSize ||
146         options_.fibersPoolResizePeriodMs > 0) {
147       fibersPool_.push_front(*fiber);
148       ++fibersPoolSize_;
149     } else {
150       delete fiber;
151       assert(fibersAllocated_ > 0);
152       --fibersAllocated_;
153     }
154   } else if (fiber->state_ == Fiber::YIELDED) {
155     if (observer_) {
156       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
157     }
158     currentFiber_ = nullptr;
159     fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
160     fiber->state_ = Fiber::READY_TO_RUN;
161     yieldedFibers_.push_back(*fiber);
162   }
163 }
164
165 inline bool FiberManager::loopUntilNoReady() {
166   // Support nested FiberManagers
167   auto originalFiberManager = this;
168   std::swap(currentFiberManager_, originalFiberManager);
169
170   SCOPE_EXIT {
171     isLoopScheduled_ = false;
172     if (!readyFibers_.empty()) {
173       ensureLoopScheduled();
174     }
175     std::swap(currentFiberManager_, originalFiberManager);
176     CHECK_EQ(this, originalFiberManager);
177   };
178
179   bool hadRemoteFiber = true;
180   while (hadRemoteFiber) {
181     hadRemoteFiber = false;
182
183     while (!readyFibers_.empty()) {
184       auto& fiber = readyFibers_.front();
185       readyFibers_.pop_front();
186       runReadyFiber(&fiber);
187     }
188
189     remoteReadyQueue_.sweep(
190       [this, &hadRemoteFiber] (Fiber* fiber) {
191         runReadyFiber(fiber);
192         hadRemoteFiber = true;
193       }
194     );
195
196     remoteTaskQueue_.sweep(
197       [this, &hadRemoteFiber] (RemoteTask* taskPtr) {
198         std::unique_ptr<RemoteTask> task(taskPtr);
199         auto fiber = getFiber();
200         if (task->localData) {
201           fiber->localData_ = *task->localData;
202         }
203         fiber->rcontext_ = std::move(task->rcontext);
204
205         fiber->setFunction(std::move(task->func));
206         fiber->data_ = reinterpret_cast<intptr_t>(fiber);
207         if (observer_) {
208           observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
209         }
210         runReadyFiber(fiber);
211         hadRemoteFiber = true;
212       }
213     );
214   }
215
216   if (observer_) {
217     for (auto& yielded : yieldedFibers_) {
218       observer_->runnable(reinterpret_cast<uintptr_t>(&yielded));
219     }
220   }
221   readyFibers_.splice(readyFibers_.end(), yieldedFibers_);
222
223   return fibersActive_ > 0;
224 }
225
226 // We need this to be in a struct, not inlined in addTask, because clang crashes
227 // otherwise.
228 template <typename F>
229 struct FiberManager::AddTaskHelper {
230   class Func;
231
232   static constexpr bool allocateInBuffer =
233     sizeof(Func) <= Fiber::kUserBufferSize;
234
235   class Func {
236    public:
237     Func(F&& func, FiberManager& fm) :
238         func_(std::forward<F>(func)), fm_(fm) {}
239
240     void operator()() {
241       try {
242         func_();
243       } catch (...) {
244         fm_.exceptionCallback_(std::current_exception(),
245                                "running Func functor");
246       }
247       if (allocateInBuffer) {
248         this->~Func();
249       } else {
250         delete this;
251       }
252     }
253
254    private:
255     F func_;
256     FiberManager& fm_;
257   };
258 };
259
260 template <typename F>
261 void FiberManager::addTask(F&& func) {
262   typedef AddTaskHelper<F> Helper;
263
264   auto fiber = getFiber();
265   initLocalData(*fiber);
266
267   if (Helper::allocateInBuffer) {
268     auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
269     new (funcLoc) typename Helper::Func(std::forward<F>(func), *this);
270
271     fiber->setFunction(std::ref(*funcLoc));
272   } else {
273     auto funcLoc = new typename Helper::Func(std::forward<F>(func), *this);
274
275     fiber->setFunction(std::ref(*funcLoc));
276   }
277
278   fiber->data_ = reinterpret_cast<intptr_t>(fiber);
279   readyFibers_.push_back(*fiber);
280   if (observer_) {
281     observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
282   }
283
284   ensureLoopScheduled();
285 }
286
287 template <typename F>
288 auto FiberManager::addTaskFuture(F&& func)
289     -> folly::Future<typename std::result_of<F()>::type> {
290   using T = typename std::result_of<F()>::type;
291   folly::Promise<T> p;
292   auto f = p.getFuture();
293   addTaskFinally([func = std::forward<F>(func)]() mutable { return func(); },
294                  [p = std::move(p)](folly::Try<T> && t) mutable {
295                    p.setTry(std::move(t));
296                  });
297   return f;
298 }
299
300 template <typename F>
301 void FiberManager::addTaskRemote(F&& func) {
302   // addTaskRemote indirectly requires wrapping the function in a
303   // std::function, which must be copyable. As move-only lambdas may be
304   // passed in we wrap it first in a move wrapper and then capture the wrapped
305   // version.
306   auto functionWrapper = [f = folly::makeMoveWrapper(
307                               std::forward<F>(func))]() mutable {
308     return (*f)();
309   };
310   auto task = [&]() {
311     auto currentFm = getFiberManagerUnsafe();
312     if (currentFm &&
313         currentFm->currentFiber_ &&
314         currentFm->localType_ == localType_) {
315       return folly::make_unique<RemoteTask>(
316           std::move(functionWrapper), currentFm->currentFiber_->localData_);
317     }
318     return folly::make_unique<RemoteTask>(std::move(functionWrapper));
319   }();
320   auto insertHead =
321       [&]() { return remoteTaskQueue_.insertHead(task.release()); };
322   loopController_->scheduleThreadSafe(std::ref(insertHead));
323 }
324
325 template <typename F>
326 auto FiberManager::addTaskRemoteFuture(F&& func)
327     -> folly::Future<typename std::result_of<F()>::type> {
328   folly::Promise<typename std::result_of<F()>::type> p;
329   auto f = p.getFuture();
330   addTaskRemote(
331       [ p = std::move(p), func = std::forward<F>(func), this ]() mutable {
332         auto t = folly::makeTryWith(std::forward<F>(func));
333         runInMainContext([&]() { p.setTry(std::move(t)); });
334       });
335   return f;
336 }
337
338 template <typename X>
339 struct IsRvalueRefTry { static const bool value = false; };
340 template <typename T>
341 struct IsRvalueRefTry<folly::Try<T>&&> { static const bool value = true; };
342
343 // We need this to be in a struct, not inlined in addTaskFinally, because clang
344 // crashes otherwise.
345 template <typename F, typename G>
346 struct FiberManager::AddTaskFinallyHelper {
347   class Func;
348   class Finally;
349
350   typedef typename std::result_of<F()>::type Result;
351
352   static constexpr bool allocateInBuffer =
353     sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize;
354
355   class Finally {
356    public:
357     Finally(G&& finally,
358             FiberManager& fm) :
359         finally_(std::forward<G>(finally)),
360         fm_(fm) {
361     }
362
363     void operator()() {
364       try {
365         finally_(std::move(*result_));
366       } catch (...) {
367         fm_.exceptionCallback_(std::current_exception(),
368                                "running Finally functor");
369       }
370
371       if (allocateInBuffer) {
372         this->~Finally();
373       } else {
374         delete this;
375       }
376     }
377
378    private:
379     friend class Func;
380
381     G finally_;
382     folly::Optional<folly::Try<Result>> result_;
383     FiberManager& fm_;
384   };
385
386   class Func {
387    public:
388     Func(F&& func, Finally& finally) :
389         func_(std::move(func)), result_(finally.result_) {}
390
391     void operator()() {
392       result_ = folly::makeTryWith(std::move(func_));
393
394       if (allocateInBuffer) {
395         this->~Func();
396       } else {
397         delete this;
398       }
399     }
400
401    private:
402     F func_;
403     folly::Optional<folly::Try<Result>>& result_;
404   };
405 };
406
407 template <typename F, typename G>
408 void FiberManager::addTaskFinally(F&& func, G&& finally) {
409   typedef typename std::result_of<F()>::type Result;
410
411   static_assert(
412     IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
413     "finally(arg): arg must be Try<T>&&");
414   static_assert(
415     std::is_convertible<
416       Result,
417       typename std::remove_reference<
418         typename FirstArgOf<G>::type
419       >::type::element_type
420     >::value,
421     "finally(Try<T>&&): T must be convertible from func()'s return type");
422
423   auto fiber = getFiber();
424   initLocalData(*fiber);
425
426   typedef AddTaskFinallyHelper<F,G> Helper;
427
428   if (Helper::allocateInBuffer) {
429     auto funcLoc = static_cast<typename Helper::Func*>(
430       fiber->getUserBuffer());
431     auto finallyLoc = static_cast<typename Helper::Finally*>(
432       static_cast<void*>(funcLoc + 1));
433
434     new (finallyLoc) typename Helper::Finally(std::forward<G>(finally), *this);
435     new (funcLoc) typename Helper::Func(std::move(func), *finallyLoc);
436
437     fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
438   } else {
439     auto finallyLoc =
440         new typename Helper::Finally(std::forward<G>(finally), *this);
441     auto funcLoc = new typename Helper::Func(std::move(func), *finallyLoc);
442
443     fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
444   }
445
446   fiber->data_ = reinterpret_cast<intptr_t>(fiber);
447   readyFibers_.push_back(*fiber);
448   if (observer_) {
449     observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
450   }
451
452   ensureLoopScheduled();
453 }
454
455 template <typename F>
456 typename std::result_of<F()>::type
457 FiberManager::runInMainContext(F&& func) {
458   if (UNLIKELY(activeFiber_ == nullptr)) {
459     return func();
460   }
461
462   typedef typename std::result_of<F()>::type Result;
463
464   folly::Try<Result> result;
465   auto f = [&func, &result]() mutable {
466     result = folly::makeTryWith(std::forward<F>(func));
467   };
468
469   immediateFunc_ = std::ref(f);
470   activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
471
472   return std::move(result).value();
473 }
474
475 inline FiberManager& FiberManager::getFiberManager() {
476   assert(currentFiberManager_ != nullptr);
477   return *currentFiberManager_;
478 }
479
480 inline FiberManager* FiberManager::getFiberManagerUnsafe() {
481   return currentFiberManager_;
482 }
483
484 inline bool FiberManager::hasActiveFiber() const {
485   return activeFiber_ != nullptr;
486 }
487
488 inline void FiberManager::yield() {
489   assert(currentFiberManager_ == this);
490   assert(activeFiber_ != nullptr);
491   assert(activeFiber_->state_ == Fiber::RUNNING);
492   activeFiber_->preempt(Fiber::YIELDED);
493 }
494
495 template <typename T>
496 T& FiberManager::local() {
497   if (std::type_index(typeid(T)) == localType_ && currentFiber_) {
498     return currentFiber_->localData_.get<T>();
499   }
500   return localThread<T>();
501 }
502
503 template <typename T>
504 T& FiberManager::localThread() {
505 #ifndef __APPLE__
506   static thread_local T t;
507   return t;
508 #else // osx doesn't support thread_local
509   static ThreadLocal<T> t;
510   return *t;
511 #endif
512 }
513
514 inline void FiberManager::initLocalData(Fiber& fiber) {
515   auto fm = getFiberManagerUnsafe();
516   if (fm && fm->currentFiber_ && fm->localType_ == localType_) {
517     fiber.localData_ = fm->currentFiber_->localData_;
518   }
519   fiber.rcontext_ = RequestContext::saveContext();
520 }
521
522 template <typename LocalT>
523 FiberManager::FiberManager(
524   LocalType<LocalT>,
525   std::unique_ptr<LoopController> loopController__,
526   Options options)  :
527     loopController_(std::move(loopController__)),
528     stackAllocator_(options.useGuardPages),
529     options_(preprocessOptions(std::move(options))),
530     exceptionCallback_([](std::exception_ptr eptr, std::string context) {
531         try {
532           std::rethrow_exception(eptr);
533         } catch (const std::exception& e) {
534           LOG(DFATAL) << "Exception " << typeid(e).name()
535                       << " with message '" << e.what() << "' was thrown in "
536                       << "FiberManager with context '" << context << "'";
537           throw;
538         } catch (...) {
539           LOG(DFATAL) << "Unknown exception was thrown in FiberManager with "
540                       << "context '" << context << "'";
541           throw;
542         }
543       }),
544     timeoutManager_(std::make_shared<TimeoutController>(*loopController_)),
545     fibersPoolResizer_(*this),
546     localType_(typeid(LocalT)) {
547   loopController_->setFiberManager(this);
548 }
549
550 template <typename F>
551 typename FirstArgOf<F>::type::value_type
552 inline await(F&& func) {
553   typedef typename FirstArgOf<F>::type::value_type Result;
554
555   folly::Try<Result> result;
556
557   Baton baton;
558   baton.wait([&func, &result, &baton]() mutable {
559       func(Promise<Result>(result, baton));
560     });
561
562   return folly::moveFromTry(result);
563 }
564
565 }}