Increase fibers stack size if we're running with ASAN
[folly.git] / folly / experimental / fibers / FiberManager-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <cassert>
19
20 #include <folly/CPortability.h>
21 #include <folly/experimental/fibers/Baton.h>
22 #include <folly/experimental/fibers/Fiber.h>
23 #include <folly/experimental/fibers/LoopController.h>
24 #include <folly/experimental/fibers/Promise.h>
25 #include <folly/futures/Try.h>
26 #include <folly/Memory.h>
27 #include <folly/Optional.h>
28 #include <folly/Portability.h>
29 #include <folly/ScopeGuard.h>
30
31 namespace folly { namespace fibers {
32
33 namespace {
34
35 inline FiberManager::Options preprocessOptions(FiberManager::Options opts) {
36 #ifdef FOLLY_SANITIZE_ADDRESS
37   /* ASAN needs a lot of extra stack space.
38      16x is a conservative estimate, 8x also worked with tests
39      where it mattered.  Note that overallocating here does not necessarily
40      increase RSS, since unused memory is pretty much free. */
41   opts.stackSize *= 16;
42 #endif
43   return opts;
44 }
45
46 }  // anonymous
47
48 inline void FiberManager::ensureLoopScheduled() {
49   if (isLoopScheduled_) {
50     return;
51   }
52
53   isLoopScheduled_ = true;
54   loopController_->schedule();
55 }
56
57 inline void FiberManager::runReadyFiber(Fiber* fiber) {
58   SCOPE_EXIT {
59     assert(currentFiber_ == nullptr);
60     assert(activeFiber_ == nullptr);
61   };
62
63   assert(fiber->state_ == Fiber::NOT_STARTED ||
64          fiber->state_ == Fiber::READY_TO_RUN);
65   currentFiber_ = fiber;
66   if (observer_) {
67     observer_->starting();
68   }
69
70   while (fiber->state_ == Fiber::NOT_STARTED ||
71          fiber->state_ == Fiber::READY_TO_RUN) {
72     activeFiber_ = fiber;
73     jumpContext(&mainContext_, &fiber->fcontext_, fiber->data_);
74     if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) {
75       try {
76         immediateFunc_();
77       } catch (...) {
78         exceptionCallback_(std::current_exception(), "running immediateFunc_");
79       }
80       immediateFunc_ = nullptr;
81       fiber->state_ = Fiber::READY_TO_RUN;
82     }
83   }
84
85   if (fiber->state_ == Fiber::AWAITING) {
86     awaitFunc_(*fiber);
87     awaitFunc_ = nullptr;
88     if (observer_) {
89       observer_->stopped();
90     }
91     currentFiber_ = nullptr;
92   } else if (fiber->state_ == Fiber::INVALID) {
93     assert(fibersActive_ > 0);
94     --fibersActive_;
95     // Making sure that task functor is deleted once task is complete.
96     // NOTE: we must do it on main context, as the fiber is not
97     // running at this point.
98     fiber->func_ = nullptr;
99     fiber->resultFunc_ = nullptr;
100     if (fiber->finallyFunc_) {
101       try {
102         fiber->finallyFunc_();
103       } catch (...) {
104         exceptionCallback_(std::current_exception(), "running finallyFunc_");
105       }
106       fiber->finallyFunc_ = nullptr;
107     }
108     // Make sure LocalData is not accessible from its destructor
109     if (observer_) {
110       observer_->stopped();
111     }
112     currentFiber_ = nullptr;
113     fiber->localData_.reset();
114
115     if (fibersPoolSize_ < options_.maxFibersPoolSize) {
116       fibersPool_.push_front(*fiber);
117       ++fibersPoolSize_;
118     } else {
119       delete fiber;
120       assert(fibersAllocated_ > 0);
121       --fibersAllocated_;
122     }
123   } else if (fiber->state_ == Fiber::YIELDED) {
124     if (observer_) {
125       observer_->stopped();
126     }
127     currentFiber_ = nullptr;
128     fiber->state_ = Fiber::READY_TO_RUN;
129     yieldedFibers_.push_back(*fiber);
130   }
131 }
132
133 inline bool FiberManager::loopUntilNoReady() {
134   SCOPE_EXIT {
135     isLoopScheduled_ = false;
136     if (!readyFibers_.empty()) {
137       ensureLoopScheduled();
138     }
139     currentFiberManager_ = nullptr;
140   };
141
142   currentFiberManager_ = this;
143
144   bool hadRemoteFiber = true;
145   while (hadRemoteFiber) {
146     hadRemoteFiber = false;
147
148     while (!readyFibers_.empty()) {
149       auto& fiber = readyFibers_.front();
150       readyFibers_.pop_front();
151       runReadyFiber(&fiber);
152     }
153
154     remoteReadyQueue_.sweep(
155       [this, &hadRemoteFiber] (Fiber* fiber) {
156         runReadyFiber(fiber);
157         hadRemoteFiber = true;
158       }
159     );
160
161     remoteTaskQueue_.sweep(
162       [this, &hadRemoteFiber] (RemoteTask* taskPtr) {
163         std::unique_ptr<RemoteTask> task(taskPtr);
164         auto fiber = getFiber();
165         if (task->localData) {
166           fiber->localData_ = *task->localData;
167         }
168
169         fiber->setFunction(std::move(task->func));
170         fiber->data_ = reinterpret_cast<intptr_t>(fiber);
171         runReadyFiber(fiber);
172         hadRemoteFiber = true;
173       }
174     );
175   }
176
177   readyFibers_.splice(readyFibers_.end(), yieldedFibers_);
178
179   return fibersActive_ > 0;
180 }
181
182 // We need this to be in a struct, not inlined in addTask, because clang crashes
183 // otherwise.
184 template <typename F>
185 struct FiberManager::AddTaskHelper {
186   class Func;
187
188   static constexpr bool allocateInBuffer =
189     sizeof(Func) <= Fiber::kUserBufferSize;
190
191   class Func {
192    public:
193     Func(F&& func, FiberManager& fm) :
194         func_(std::forward<F>(func)), fm_(fm) {}
195
196     void operator()() {
197       try {
198         func_();
199       } catch (...) {
200         fm_.exceptionCallback_(std::current_exception(),
201                                "running Func functor");
202       }
203       if (allocateInBuffer) {
204         this->~Func();
205       } else {
206         delete this;
207       }
208     }
209
210    private:
211     F func_;
212     FiberManager& fm_;
213   };
214 };
215
216 template <typename F>
217 void FiberManager::addTask(F&& func) {
218   typedef AddTaskHelper<F> Helper;
219
220   auto fiber = getFiber();
221   initLocalData(*fiber);
222
223   if (Helper::allocateInBuffer) {
224     auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
225     new (funcLoc) typename Helper::Func(std::forward<F>(func), *this);
226
227     fiber->setFunction(std::ref(*funcLoc));
228   } else {
229     auto funcLoc = new typename Helper::Func(std::forward<F>(func), *this);
230
231     fiber->setFunction(std::ref(*funcLoc));
232   }
233
234   fiber->data_ = reinterpret_cast<intptr_t>(fiber);
235   readyFibers_.push_back(*fiber);
236
237   ensureLoopScheduled();
238 }
239
240 template <typename F>
241 void FiberManager::addTaskRemote(F&& func) {
242   auto task = [&]() {
243     auto currentFm = getFiberManagerUnsafe();
244     if (currentFm &&
245         currentFm->currentFiber_ &&
246         currentFm->localType_ == localType_) {
247       return folly::make_unique<RemoteTask>(
248         std::forward<F>(func),
249         currentFm->currentFiber_->localData_);
250     }
251     return folly::make_unique<RemoteTask>(std::forward<F>(func));
252   }();
253   if (remoteTaskQueue_.insertHead(task.release())) {
254     loopController_->scheduleThreadSafe();
255   }
256 }
257
258 template <typename X>
259 struct IsRvalueRefTry { static const bool value = false; };
260 template <typename T>
261 struct IsRvalueRefTry<folly::Try<T>&&> { static const bool value = true; };
262
263 // We need this to be in a struct, not inlined in addTaskFinally, because clang
264 // crashes otherwise.
265 template <typename F, typename G>
266 struct FiberManager::AddTaskFinallyHelper {
267   class Func;
268   class Finally;
269
270   typedef typename std::result_of<F()>::type Result;
271
272   static constexpr bool allocateInBuffer =
273     sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize;
274
275   class Finally {
276    public:
277     Finally(G&& finally,
278             FiberManager& fm) :
279         finally_(std::move(finally)),
280         fm_(fm) {
281     }
282
283     void operator()() {
284       try {
285         finally_(std::move(*result_));
286       } catch (...) {
287         fm_.exceptionCallback_(std::current_exception(),
288                                "running Finally functor");
289       }
290
291       if (allocateInBuffer) {
292         this->~Finally();
293       } else {
294         delete this;
295       }
296     }
297
298    private:
299     friend class Func;
300
301     G finally_;
302     folly::Optional<folly::Try<Result>> result_;
303     FiberManager& fm_;
304   };
305
306   class Func {
307    public:
308     Func(F&& func, Finally& finally) :
309         func_(std::move(func)), result_(finally.result_) {}
310
311     void operator()() {
312       result_ = folly::makeTryFunction(std::move(func_));
313
314       if (allocateInBuffer) {
315         this->~Func();
316       } else {
317         delete this;
318       }
319     }
320
321    private:
322     F func_;
323     folly::Optional<folly::Try<Result>>& result_;
324   };
325 };
326
327 template <typename F, typename G>
328 void FiberManager::addTaskFinally(F&& func, G&& finally) {
329   typedef typename std::result_of<F()>::type Result;
330
331   static_assert(
332     IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
333     "finally(arg): arg must be Try<T>&&");
334   static_assert(
335     std::is_convertible<
336       Result,
337       typename std::remove_reference<
338         typename FirstArgOf<G>::type
339       >::type::element_type
340     >::value,
341     "finally(Try<T>&&): T must be convertible from func()'s return type");
342
343   auto fiber = getFiber();
344   initLocalData(*fiber);
345
346   typedef AddTaskFinallyHelper<F,G> Helper;
347
348   if (Helper::allocateInBuffer) {
349     auto funcLoc = static_cast<typename Helper::Func*>(
350       fiber->getUserBuffer());
351     auto finallyLoc = static_cast<typename Helper::Finally*>(
352       static_cast<void*>(funcLoc + 1));
353
354     new (finallyLoc) typename Helper::Finally(std::move(finally), *this);
355     new (funcLoc) typename Helper::Func(std::move(func), *finallyLoc);
356
357     fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
358   } else {
359     auto finallyLoc = new typename Helper::Finally(std::move(finally), *this);
360     auto funcLoc = new typename Helper::Func(std::move(func), *finallyLoc);
361
362     fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
363   }
364
365   fiber->data_ = reinterpret_cast<intptr_t>(fiber);
366   readyFibers_.push_back(*fiber);
367
368   ensureLoopScheduled();
369 }
370
371 template <typename F>
372 typename std::result_of<F()>::type
373 FiberManager::runInMainContext(F&& func) {
374   return runInMainContextHelper(std::forward<F>(func));
375 }
376
377 template <typename F>
378 inline typename std::enable_if<
379   !std::is_same<typename std::result_of<F()>::type, void>::value,
380   typename std::result_of<F()>::type>::type
381 FiberManager::runInMainContextHelper(F&& func) {
382   if (UNLIKELY(activeFiber_ == nullptr)) {
383     return func();
384   }
385
386   typedef typename std::result_of<F()>::type Result;
387
388   folly::Try<Result> result;
389   auto f = [&func, &result]() mutable {
390     result = folly::makeTryFunction(std::forward<F>(func));
391   };
392
393   immediateFunc_ = std::ref(f);
394   activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
395
396   return std::move(result.value());
397 }
398
399 template <typename F>
400 inline typename std::enable_if<
401   std::is_same<typename std::result_of<F()>::type, void>::value,
402   void>::type
403 FiberManager::runInMainContextHelper(F&& func) {
404   if (UNLIKELY(activeFiber_ == nullptr)) {
405     func();
406     return;
407   }
408
409   immediateFunc_ = std::ref(func);
410   activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
411 }
412
413 inline FiberManager& FiberManager::getFiberManager() {
414   assert(currentFiberManager_ != nullptr);
415   return *currentFiberManager_;
416 }
417
418 inline FiberManager* FiberManager::getFiberManagerUnsafe() {
419   return currentFiberManager_;
420 }
421
422 inline bool FiberManager::hasActiveFiber() const {
423   return activeFiber_ != nullptr;
424 }
425
426 inline void FiberManager::yield() {
427   assert(currentFiberManager_ == this);
428   assert(activeFiber_ != nullptr);
429   assert(activeFiber_->state_ == Fiber::RUNNING);
430   activeFiber_->preempt(Fiber::YIELDED);
431 }
432
433 template <typename T>
434 T& FiberManager::local() {
435   if (std::type_index(typeid(T)) == localType_ && currentFiber_) {
436     return currentFiber_->localData_.get<T>();
437   }
438   return localThread<T>();
439 }
440
441 template <typename T>
442 T& FiberManager::localThread() {
443   static thread_local T t;
444   return t;
445 }
446
447 inline void FiberManager::initLocalData(Fiber& fiber) {
448   auto fm = getFiberManagerUnsafe();
449   if (fm && fm->currentFiber_ && fm->localType_ == localType_) {
450     fiber.localData_ = fm->currentFiber_->localData_;
451   }
452 }
453
454 template <typename LocalT>
455 FiberManager::FiberManager(
456   LocalType<LocalT>,
457   std::unique_ptr<LoopController> loopController__,
458   Options options)  :
459     loopController_(std::move(loopController__)),
460     options_(preprocessOptions(std::move(options))),
461     exceptionCallback_([](std::exception_ptr eptr, std::string context) {
462         try {
463           std::rethrow_exception(eptr);
464         } catch (const std::exception& e) {
465           LOG(DFATAL) << "Exception " << typeid(e).name()
466                       << " with message '" << e.what() << "' was thrown in "
467                       << "FiberManager with context '" << context << "'";
468           throw;
469         } catch (...) {
470           LOG(DFATAL) << "Unknown exception was thrown in FiberManager with "
471                       << "context '" << context << "'";
472           throw;
473         }
474       }),
475     timeoutManager_(std::make_shared<TimeoutController>(*loopController_)),
476     localType_(typeid(LocalT)) {
477   loopController_->setFiberManager(this);
478 }
479
480 template <typename F>
481 typename FirstArgOf<F>::type::value_type
482 inline await(F&& func) {
483   typedef typename FirstArgOf<F>::type::value_type Result;
484
485   folly::Try<Result> result;
486
487   Baton baton;
488   baton.wait([&func, &result, &baton]() mutable {
489       func(Promise<Result>(result, baton));
490     });
491
492   return folly::moveFromTry(std::move(result));
493 }
494
495 }}