e0c7e5787c6b006ad241a47708ef0dcbac6d6582
[folly.git] / folly / experimental / fibers / FiberManager-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <cassert>
19
20 #include <folly/CPortability.h>
21 #include <folly/Memory.h>
22 #include <folly/Optional.h>
23 #include <folly/Portability.h>
24 #include <folly/ScopeGuard.h>
25 #ifdef __APPLE__
26 #include <folly/ThreadLocal.h>
27 #endif
28 #include <folly/experimental/fibers/Baton.h>
29 #include <folly/experimental/fibers/Fiber.h>
30 #include <folly/experimental/fibers/LoopController.h>
31 #include <folly/experimental/fibers/Promise.h>
32 #include <folly/futures/Try.h>
33
34 namespace folly { namespace fibers {
35
36 namespace {
37
38 inline FiberManager::Options preprocessOptions(FiberManager::Options opts) {
39 #ifdef FOLLY_SANITIZE_ADDRESS
40   /* ASAN needs a lot of extra stack space.
41      16x is a conservative estimate, 8x also worked with tests
42      where it mattered.  Note that overallocating here does not necessarily
43      increase RSS, since unused memory is pretty much free. */
44   opts.stackSize *= 16;
45 #endif
46   return opts;
47 }
48
49 }  // anonymous
50
51 inline void FiberManager::ensureLoopScheduled() {
52   if (isLoopScheduled_) {
53     return;
54   }
55
56   isLoopScheduled_ = true;
57   loopController_->schedule();
58 }
59
60 inline void FiberManager::runReadyFiber(Fiber* fiber) {
61   SCOPE_EXIT {
62     assert(currentFiber_ == nullptr);
63     assert(activeFiber_ == nullptr);
64   };
65
66   assert(fiber->state_ == Fiber::NOT_STARTED ||
67          fiber->state_ == Fiber::READY_TO_RUN);
68   currentFiber_ = fiber;
69   if (observer_) {
70     observer_->starting(reinterpret_cast<uintptr_t>(fiber));
71   }
72
73   while (fiber->state_ == Fiber::NOT_STARTED ||
74          fiber->state_ == Fiber::READY_TO_RUN) {
75     activeFiber_ = fiber;
76     jumpContext(&mainContext_, &fiber->fcontext_, fiber->data_);
77     if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) {
78       try {
79         immediateFunc_();
80       } catch (...) {
81         exceptionCallback_(std::current_exception(), "running immediateFunc_");
82       }
83       immediateFunc_ = nullptr;
84       fiber->state_ = Fiber::READY_TO_RUN;
85     }
86   }
87
88   if (fiber->state_ == Fiber::AWAITING) {
89     awaitFunc_(*fiber);
90     awaitFunc_ = nullptr;
91     if (observer_) {
92       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
93     }
94     currentFiber_ = nullptr;
95   } else if (fiber->state_ == Fiber::INVALID) {
96     assert(fibersActive_ > 0);
97     --fibersActive_;
98     // Making sure that task functor is deleted once task is complete.
99     // NOTE: we must do it on main context, as the fiber is not
100     // running at this point.
101     fiber->func_ = nullptr;
102     fiber->resultFunc_ = nullptr;
103     if (fiber->finallyFunc_) {
104       try {
105         fiber->finallyFunc_();
106       } catch (...) {
107         exceptionCallback_(std::current_exception(), "running finallyFunc_");
108       }
109       fiber->finallyFunc_ = nullptr;
110     }
111     // Make sure LocalData is not accessible from its destructor
112     if (observer_) {
113       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
114     }
115     currentFiber_ = nullptr;
116     fiber->localData_.reset();
117
118     if (fibersPoolSize_ < options_.maxFibersPoolSize) {
119       fibersPool_.push_front(*fiber);
120       ++fibersPoolSize_;
121     } else {
122       delete fiber;
123       assert(fibersAllocated_ > 0);
124       --fibersAllocated_;
125     }
126   } else if (fiber->state_ == Fiber::YIELDED) {
127     if (observer_) {
128       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
129     }
130     currentFiber_ = nullptr;
131     fiber->state_ = Fiber::READY_TO_RUN;
132     yieldedFibers_.push_back(*fiber);
133   }
134 }
135
136 inline bool FiberManager::loopUntilNoReady() {
137   SCOPE_EXIT {
138     isLoopScheduled_ = false;
139     if (!readyFibers_.empty()) {
140       ensureLoopScheduled();
141     }
142     currentFiberManager_ = nullptr;
143   };
144
145   currentFiberManager_ = this;
146
147   bool hadRemoteFiber = true;
148   while (hadRemoteFiber) {
149     hadRemoteFiber = false;
150
151     while (!readyFibers_.empty()) {
152       auto& fiber = readyFibers_.front();
153       readyFibers_.pop_front();
154       runReadyFiber(&fiber);
155     }
156
157     remoteReadyQueue_.sweep(
158       [this, &hadRemoteFiber] (Fiber* fiber) {
159         runReadyFiber(fiber);
160         hadRemoteFiber = true;
161       }
162     );
163
164     remoteTaskQueue_.sweep(
165       [this, &hadRemoteFiber] (RemoteTask* taskPtr) {
166         std::unique_ptr<RemoteTask> task(taskPtr);
167         auto fiber = getFiber();
168         if (task->localData) {
169           fiber->localData_ = *task->localData;
170         }
171
172         fiber->setFunction(std::move(task->func));
173         fiber->data_ = reinterpret_cast<intptr_t>(fiber);
174         if (observer_) {
175           observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
176         }
177         runReadyFiber(fiber);
178         hadRemoteFiber = true;
179       }
180     );
181   }
182
183   if (observer_) {
184     for (auto& yielded : yieldedFibers_) {
185       observer_->runnable(reinterpret_cast<uintptr_t>(&yielded));
186     }
187   }
188   readyFibers_.splice(readyFibers_.end(), yieldedFibers_);
189
190   return fibersActive_ > 0;
191 }
192
193 // We need this to be in a struct, not inlined in addTask, because clang crashes
194 // otherwise.
195 template <typename F>
196 struct FiberManager::AddTaskHelper {
197   class Func;
198
199   static constexpr bool allocateInBuffer =
200     sizeof(Func) <= Fiber::kUserBufferSize;
201
202   class Func {
203    public:
204     Func(F&& func, FiberManager& fm) :
205         func_(std::forward<F>(func)), fm_(fm) {}
206
207     void operator()() {
208       try {
209         func_();
210       } catch (...) {
211         fm_.exceptionCallback_(std::current_exception(),
212                                "running Func functor");
213       }
214       if (allocateInBuffer) {
215         this->~Func();
216       } else {
217         delete this;
218       }
219     }
220
221    private:
222     F func_;
223     FiberManager& fm_;
224   };
225 };
226
227 template <typename F>
228 void FiberManager::addTask(F&& func) {
229   typedef AddTaskHelper<F> Helper;
230
231   auto fiber = getFiber();
232   initLocalData(*fiber);
233
234   if (Helper::allocateInBuffer) {
235     auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
236     new (funcLoc) typename Helper::Func(std::forward<F>(func), *this);
237
238     fiber->setFunction(std::ref(*funcLoc));
239   } else {
240     auto funcLoc = new typename Helper::Func(std::forward<F>(func), *this);
241
242     fiber->setFunction(std::ref(*funcLoc));
243   }
244
245   fiber->data_ = reinterpret_cast<intptr_t>(fiber);
246   readyFibers_.push_back(*fiber);
247   if (observer_) {
248     observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
249   }
250
251   ensureLoopScheduled();
252 }
253
254 template <typename F>
255 void FiberManager::addTaskRemote(F&& func) {
256   auto task = [&]() {
257     auto currentFm = getFiberManagerUnsafe();
258     if (currentFm &&
259         currentFm->currentFiber_ &&
260         currentFm->localType_ == localType_) {
261       return folly::make_unique<RemoteTask>(
262         std::forward<F>(func),
263         currentFm->currentFiber_->localData_);
264     }
265     return folly::make_unique<RemoteTask>(std::forward<F>(func));
266   }();
267   if (remoteTaskQueue_.insertHead(task.release())) {
268     loopController_->scheduleThreadSafe();
269   }
270 }
271
272 template <typename X>
273 struct IsRvalueRefTry { static const bool value = false; };
274 template <typename T>
275 struct IsRvalueRefTry<folly::Try<T>&&> { static const bool value = true; };
276
277 // We need this to be in a struct, not inlined in addTaskFinally, because clang
278 // crashes otherwise.
279 template <typename F, typename G>
280 struct FiberManager::AddTaskFinallyHelper {
281   class Func;
282   class Finally;
283
284   typedef typename std::result_of<F()>::type Result;
285
286   static constexpr bool allocateInBuffer =
287     sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize;
288
289   class Finally {
290    public:
291     Finally(G&& finally,
292             FiberManager& fm) :
293         finally_(std::move(finally)),
294         fm_(fm) {
295     }
296
297     void operator()() {
298       try {
299         finally_(std::move(*result_));
300       } catch (...) {
301         fm_.exceptionCallback_(std::current_exception(),
302                                "running Finally functor");
303       }
304
305       if (allocateInBuffer) {
306         this->~Finally();
307       } else {
308         delete this;
309       }
310     }
311
312    private:
313     friend class Func;
314
315     G finally_;
316     folly::Optional<folly::Try<Result>> result_;
317     FiberManager& fm_;
318   };
319
320   class Func {
321    public:
322     Func(F&& func, Finally& finally) :
323         func_(std::move(func)), result_(finally.result_) {}
324
325     void operator()() {
326       result_ = folly::makeTryWith(std::move(func_));
327
328       if (allocateInBuffer) {
329         this->~Func();
330       } else {
331         delete this;
332       }
333     }
334
335    private:
336     F func_;
337     folly::Optional<folly::Try<Result>>& result_;
338   };
339 };
340
341 template <typename F, typename G>
342 void FiberManager::addTaskFinally(F&& func, G&& finally) {
343   typedef typename std::result_of<F()>::type Result;
344
345   static_assert(
346     IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
347     "finally(arg): arg must be Try<T>&&");
348   static_assert(
349     std::is_convertible<
350       Result,
351       typename std::remove_reference<
352         typename FirstArgOf<G>::type
353       >::type::element_type
354     >::value,
355     "finally(Try<T>&&): T must be convertible from func()'s return type");
356
357   auto fiber = getFiber();
358   initLocalData(*fiber);
359
360   typedef AddTaskFinallyHelper<F,G> Helper;
361
362   if (Helper::allocateInBuffer) {
363     auto funcLoc = static_cast<typename Helper::Func*>(
364       fiber->getUserBuffer());
365     auto finallyLoc = static_cast<typename Helper::Finally*>(
366       static_cast<void*>(funcLoc + 1));
367
368     new (finallyLoc) typename Helper::Finally(std::move(finally), *this);
369     new (funcLoc) typename Helper::Func(std::move(func), *finallyLoc);
370
371     fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
372   } else {
373     auto finallyLoc = new typename Helper::Finally(std::move(finally), *this);
374     auto funcLoc = new typename Helper::Func(std::move(func), *finallyLoc);
375
376     fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
377   }
378
379   fiber->data_ = reinterpret_cast<intptr_t>(fiber);
380   readyFibers_.push_back(*fiber);
381   if (observer_) {
382     observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
383   }
384
385   ensureLoopScheduled();
386 }
387
388 template <typename F>
389 typename std::result_of<F()>::type
390 FiberManager::runInMainContext(F&& func) {
391   return runInMainContextHelper(std::forward<F>(func));
392 }
393
394 template <typename F>
395 inline typename std::enable_if<
396   !std::is_same<typename std::result_of<F()>::type, void>::value,
397   typename std::result_of<F()>::type>::type
398 FiberManager::runInMainContextHelper(F&& func) {
399   if (UNLIKELY(activeFiber_ == nullptr)) {
400     return func();
401   }
402
403   typedef typename std::result_of<F()>::type Result;
404
405   folly::Try<Result> result;
406   auto f = [&func, &result]() mutable {
407     result = folly::makeTryWith(std::forward<F>(func));
408   };
409
410   immediateFunc_ = std::ref(f);
411   activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
412
413   return std::move(result.value());
414 }
415
416 template <typename F>
417 inline typename std::enable_if<
418   std::is_same<typename std::result_of<F()>::type, void>::value,
419   void>::type
420 FiberManager::runInMainContextHelper(F&& func) {
421   if (UNLIKELY(activeFiber_ == nullptr)) {
422     func();
423     return;
424   }
425
426   immediateFunc_ = std::ref(func);
427   activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
428 }
429
430 inline FiberManager& FiberManager::getFiberManager() {
431   assert(currentFiberManager_ != nullptr);
432   return *currentFiberManager_;
433 }
434
435 inline FiberManager* FiberManager::getFiberManagerUnsafe() {
436   return currentFiberManager_;
437 }
438
439 inline bool FiberManager::hasActiveFiber() const {
440   return activeFiber_ != nullptr;
441 }
442
443 inline void FiberManager::yield() {
444   assert(currentFiberManager_ == this);
445   assert(activeFiber_ != nullptr);
446   assert(activeFiber_->state_ == Fiber::RUNNING);
447   activeFiber_->preempt(Fiber::YIELDED);
448 }
449
450 template <typename T>
451 T& FiberManager::local() {
452   if (std::type_index(typeid(T)) == localType_ && currentFiber_) {
453     return currentFiber_->localData_.get<T>();
454   }
455   return localThread<T>();
456 }
457
458 template <typename T>
459 T& FiberManager::localThread() {
460 #ifndef __APPLE__
461   static thread_local T t;
462   return t;
463 #else // osx doesn't support thread_local
464   static ThreadLocal<T> t;
465   return *t;
466 #endif
467 }
468
469 inline void FiberManager::initLocalData(Fiber& fiber) {
470   auto fm = getFiberManagerUnsafe();
471   if (fm && fm->currentFiber_ && fm->localType_ == localType_) {
472     fiber.localData_ = fm->currentFiber_->localData_;
473   }
474 }
475
476 template <typename LocalT>
477 FiberManager::FiberManager(
478   LocalType<LocalT>,
479   std::unique_ptr<LoopController> loopController__,
480   Options options)  :
481     loopController_(std::move(loopController__)),
482     options_(preprocessOptions(std::move(options))),
483     exceptionCallback_([](std::exception_ptr eptr, std::string context) {
484         try {
485           std::rethrow_exception(eptr);
486         } catch (const std::exception& e) {
487           LOG(DFATAL) << "Exception " << typeid(e).name()
488                       << " with message '" << e.what() << "' was thrown in "
489                       << "FiberManager with context '" << context << "'";
490           throw;
491         } catch (...) {
492           LOG(DFATAL) << "Unknown exception was thrown in FiberManager with "
493                       << "context '" << context << "'";
494           throw;
495         }
496       }),
497     timeoutManager_(std::make_shared<TimeoutController>(*loopController_)),
498     localType_(typeid(LocalT)) {
499   loopController_->setFiberManager(this);
500 }
501
502 template <typename F>
503 typename FirstArgOf<F>::type::value_type
504 inline await(F&& func) {
505   typedef typename FirstArgOf<F>::type::value_type Result;
506
507   folly::Try<Result> result;
508
509   Baton baton;
510   baton.wait([&func, &result, &baton]() mutable {
511       func(Promise<Result>(result, baton));
512     });
513
514   return folly::moveFromTry(std::move(result));
515 }
516
517 }}