2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 #include <folly/CPortability.h>
21 #include <folly/Memory.h>
22 #include <folly/Optional.h>
23 #include <folly/Portability.h>
24 #include <folly/ScopeGuard.h>
26 #include <folly/ThreadLocal.h>
28 #include <folly/fibers/Baton.h>
29 #include <folly/fibers/Fiber.h>
30 #include <folly/fibers/LoopController.h>
31 #include <folly/fibers/Promise.h>
32 #include <folly/futures/Promise.h>
33 #include <folly/Try.h>
// Takes the caller-supplied FiberManager options by value so they can be
// adjusted before use (the FiberManager constructor passes its options
// through this function).
// Only the start of the FOLLY_SANITIZE_ADDRESS branch is visible in this
// excerpt; per the comment below it is about giving fibers extra stack space
// in ASAN builds.
// NOTE(review): the statements that modify and return `opts` are not shown
// here -- confirm against the full file.
40 inline FiberManager::Options preprocessOptions(FiberManager::Options opts) {
41 #ifdef FOLLY_SANITIZE_ADDRESS
42 /* ASAN needs a lot of extra stack space.
43 16x is a conservative estimate, 8x also worked with tests
44 where it mattered. Note that overallocating here does not necessarily
45 increase RSS, since unused memory is pretty much free. */
// Asks the loop controller to schedule a loop run.
// `isLoopScheduled_` is tested first and is set to true right before
// loopController_->schedule() is called.
// NOTE(review): the body of the `if (isLoopScheduled_)` guard is missing from
// this excerpt; presumably it returns early so the loop is scheduled at most
// once -- confirm.
53 inline void FiberManager::ensureLoopScheduled() {
54 if (isLoopScheduled_) {
58 isLoopScheduled_ = true;
59 loopController_->schedule();
// Switches execution from the main context into `fiber`.
// Debug-checks that no fiber is active, reports the activation to ASAN when
// FOLLY_SANITIZE_ADDRESS is defined, and returns the result of jumpContext()
// from mainContext_ to the fiber's context with fiber->data_ as third argument.
// NOTE(review): the statement that records `fiber` as activeFiber_ is not
// visible in this excerpt.
62 inline intptr_t FiberManager::activateFiber(Fiber* fiber) {
63 DCHECK_EQ(activeFiber_, (Fiber*)nullptr);
65 #ifdef FOLLY_SANITIZE_ADDRESS
66 registerFiberActivationWithAsan(fiber);
70 return jumpContext(&mainContext_, &fiber->fcontext_, fiber->data_);
// Switches execution from `fiber` back to the main context.
// Debug-checks that `fiber` is the active fiber, reports the deactivation to
// ASAN when FOLLY_SANITIZE_ADDRESS is defined, clears activeFiber_, and
// returns the result of jumpContext() from the fiber's context to
// mainContext_ with 0 as third argument.
73 inline intptr_t FiberManager::deactivateFiber(Fiber* fiber) {
74 DCHECK_EQ(activeFiber_, fiber);
76 #ifdef FOLLY_SANITIZE_ADDRESS
77 registerFiberDeactivationWithAsan(fiber);
80 activeFiber_ = nullptr;
81 return jumpContext(&fiber->fcontext_, &mainContext_, 0);
// Runs `fiber` from the main context until it stops being runnable, then
// performs the bookkeeping that matches the state it stopped in.
//
// Visible flow:
//  - asserts that no fiber is current or active, makes `fiber` the current
//    fiber and installs its request context;
//  - re-activates the fiber while its state is NOT_STARTED or READY_TO_RUN;
//  - afterwards handles the AWAITING, INVALID and YIELDED states.
// NOTE(review): a number of lines between the statements below are missing
// from this excerpt (condition/try/catch openers, some calls, counter
// updates, closing braces). The comments describe only what is shown.
84 inline void FiberManager::runReadyFiber(Fiber* fiber) {
86 assert(currentFiber_ == nullptr);
87 assert(activeFiber_ == nullptr);
// Tail of a state check whose opening line is not visible here.
91 fiber->state_ == Fiber::NOT_STARTED ||
92 fiber->state_ == Fiber::READY_TO_RUN);
93 currentFiber_ = fiber;
// Install the fiber's request context; the value returned by setContext() is
// kept in the fiber (presumably the previously installed context -- confirm).
94 fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
96 observer_->starting(reinterpret_cast<uintptr_t>(fiber));
// Keep switching into the fiber for as long as it remains runnable.
99 while (fiber->state_ == Fiber::NOT_STARTED ||
100 fiber->state_ == Fiber::READY_TO_RUN) {
101 activateFiber(fiber);
// AWAITING_IMMEDIATE is the state runInMainContext() preempts with after
// setting immediateFunc_. Exceptions are reported through exceptionCallback_,
// immediateFunc_ is cleared and the fiber is marked runnable again so the
// loop re-enters it. (The call to immediateFunc_ itself is not visible here.)
102 if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) {
106 exceptionCallback_(std::current_exception(), "running immediateFunc_");
108 immediateFunc_ = nullptr;
109 fiber->state_ = Fiber::READY_TO_RUN;
// Fiber stopped in AWAITING: clear awaitFunc_, tell the observer it stopped,
// unset currentFiber_ and exchange request contexts again.
113 if (fiber->state_ == Fiber::AWAITING) {
115 awaitFunc_ = nullptr;
117 observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
119 currentFiber_ = nullptr;
120 fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
// Fiber stopped in INVALID: the task is complete, so release everything it
// owned and recycle the fiber.
121 } else if (fiber->state_ == Fiber::INVALID) {
122 assert(fibersActive_ > 0);
124 // Making sure that task functor is deleted once task is complete.
125 // NOTE: we must do it on main context, as the fiber is not
126 // running at this point.
127 fiber->func_ = nullptr;
128 fiber->resultFunc_ = nullptr;
// Run the optional finally-callback; a failure is reported through
// exceptionCallback_ rather than propagated.
129 if (fiber->finallyFunc_) {
131 fiber->finallyFunc_();
133 exceptionCallback_(std::current_exception(), "running finallyFunc_");
135 fiber->finallyFunc_ = nullptr;
137 // Make sure LocalData is not accessible from its destructor
139 observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
// currentFiber_ is cleared before localData_ is reset, so local data can no
// longer be reached through the current fiber while it is being destroyed.
141 currentFiber_ = nullptr;
142 fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
143 fiber->localData_.reset();
144 fiber->rcontext_.reset();
// Put the fiber at the front of fibersPool_ when the pool is below
// options_.maxFibersPoolSize or options_.fibersPoolResizePeriodMs is positive.
// NOTE(review): the alternative path is not visible in this excerpt.
146 if (fibersPoolSize_ < options_.maxFibersPoolSize ||
147 options_.fibersPoolResizePeriodMs > 0) {
148 fibersPool_.push_front(*fiber);
152 assert(fibersAllocated_ > 0);
// Fiber stopped in YIELDED: detach it as current, mark it READY_TO_RUN and
// append it to yieldedFibers_ (loopUntilNoReady later splices that list into
// readyFibers_).
155 } else if (fiber->state_ == Fiber::YIELDED) {
157 observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
159 currentFiber_ = nullptr;
160 fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
161 fiber->state_ = Fiber::READY_TO_RUN;
162 yieldedFibers_.push_back(*fiber);
// Runs fibers until readyFibers_ is empty and a sweep of the remote queues
// picks up nothing new, then moves yielded fibers onto the ready list.
// Returns whether fibersActive_ is greater than zero.
// NOTE(review): some lines (block openers/closers) are missing from this
// excerpt; the comments describe only the statements that are shown.
166 inline bool FiberManager::loopUntilNoReady() {
// Register the alternate signal stack if that has not been done yet.
168 if (UNLIKELY(!alternateSignalStackRegistered_)) {
169 registerAlternateSignalStack();
173 // Support nested FiberManagers
// After the swap, currentFiberManager_ is `this` and originalFiberManager
// holds whatever manager was current before.
174 auto originalFiberManager = this;
175 std::swap(currentFiberManager_, originalFiberManager);
// NOTE(review): the next statements look like the body of a scope-exit block
// whose opening line is missing from this excerpt -- confirm. They clear
// isLoopScheduled_, re-schedule the loop if fibers are still ready, and swap
// the previous manager back into currentFiberManager_.
178 isLoopScheduled_ = false;
179 if (!readyFibers_.empty()) {
180 ensureLoopScheduled();
182 std::swap(currentFiberManager_, originalFiberManager);
183 CHECK_EQ(this, originalFiberManager);
// Repeat as long as the previous pass picked up at least one remote fiber or
// remote task.
186 bool hadRemoteFiber = true;
187 while (hadRemoteFiber) {
188 hadRemoteFiber = false;
// Drain the local ready list; each fiber is unlinked before it runs.
190 while (!readyFibers_.empty()) {
191 auto& fiber = readyFibers_.front();
192 readyFibers_.pop_front();
193 runReadyFiber(&fiber);
// Run every fiber found in remoteReadyQueue_.
196 remoteReadyQueue_.sweep([this, &hadRemoteFiber](Fiber* fiber) {
197 runReadyFiber(fiber);
198 hadRemoteFiber = true;
// Turn each queued RemoteTask into a fiber: take ownership of the task, copy
// its local data if present, adopt its request context and function, then
// run the fiber immediately.
201 remoteTaskQueue_.sweep([this, &hadRemoteFiber](RemoteTask* taskPtr) {
202 std::unique_ptr<RemoteTask> task(taskPtr);
203 auto fiber = getFiber();
204 if (task->localData) {
205 fiber->localData_ = *task->localData;
207 fiber->rcontext_ = std::move(task->rcontext);
209 fiber->setFunction(std::move(task->func));
210 fiber->data_ = reinterpret_cast<intptr_t>(fiber);
212 observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
214 runReadyFiber(fiber);
215 hadRemoteFiber = true;
// Report yielded fibers as runnable and move them to the back of readyFibers_.
220 for (auto& yielded : yieldedFibers_) {
221 observer_->runnable(reinterpret_cast<uintptr_t>(&yielded));
224 readyFibers_.splice(readyFibers_.end(), yieldedFibers_);
226 return fibersActive_ > 0;
// Helper that wraps the callable passed to addTask() in a functor type `Func`.
//  - allocateInBuffer: true when sizeof(Func) fits in Fiber::kUserBufferSize;
//    addTask() uses it to choose between the fiber's user buffer and the heap.
//  - Func stores the forwarded callable (func_) and a reference to the owning
//    FiberManager (fm_); exceptions are reported through
//    fm_.exceptionCallback_ with the context "running Func functor".
// NOTE(review): the Func class header, its operator() and the body of the
// `if (allocateInBuffer)` branch are not visible in this excerpt.
229 // We need this to be in a struct, not inlined in addTask, because clang crashes
231 template <typename F>
232 struct FiberManager::AddTaskHelper {
235 static constexpr bool allocateInBuffer =
236 sizeof(Func) <= Fiber::kUserBufferSize;
240 Func(F&& func, FiberManager& fm) : func_(std::forward<F>(func)), fm_(fm) {}
246 fm_.exceptionCallback_(
247 std::current_exception(), "running Func functor");
249 if (allocateInBuffer) {
// Creates a fiber that will run `func` and appends it to readyFibers_.
//
// The callable is wrapped in AddTaskHelper<F>::Func. When
// Helper::allocateInBuffer is true the wrapper is placement-new'd into the
// fiber's user buffer; the other visible path allocates it with `new`. In both
// cases the fiber receives a std::ref to the wrapper, not a copy.
// NOTE(review): the `else` line and the release of the heap-allocated wrapper
// are not visible in this excerpt.
262 template <typename F>
263 void FiberManager::addTask(F&& func) {
264 typedef AddTaskHelper<F> Helper;
266 auto fiber = getFiber();
// Inherit fiber-local data / request context from the caller (see initLocalData).
267 initLocalData(*fiber);
269 if (Helper::allocateInBuffer) {
270 auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
271 new (funcLoc) typename Helper::Func(std::forward<F>(func), *this);
273 fiber->setFunction(std::ref(*funcLoc));
275 auto funcLoc = new typename Helper::Func(std::forward<F>(func), *this);
277 fiber->setFunction(std::ref(*funcLoc));
// data_ carries the fiber's own address; activateFiber() passes it to
// jumpContext().
280 fiber->data_ = reinterpret_cast<intptr_t>(fiber);
281 readyFibers_.push_back(*fiber);
283 observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
286 ensureLoopScheduled();
// Schedules `func` and returns a folly::Future for its result.
// T is func's return type and FutureT is folly::Unit::Lift<T>::type.
// A folly::Promise<FutureT> is created and its future obtained; two lambdas
// are then built: one that invokes the moved/forwarded callable, and one that
// receives a folly::Try<T> and completes the promise with it.
// NOTE(review): the call that receives the two lambdas and the return
// statement are not visible in this excerpt.
289 template <typename F>
290 auto FiberManager::addTaskFuture(F&& func) -> folly::Future<
291 typename folly::Unit::Lift<typename std::result_of<F()>::type>::type> {
292 using T = typename std::result_of<F()>::type;
293 using FutureT = typename folly::Unit::Lift<T>::type;
295 folly::Promise<FutureT> p;
296 auto f = p.getFuture();
298 [func = std::forward<F>(func)]() mutable { return func(); },
299 [p = std::move(p)](folly::Try<T> && t) mutable {
300 p.setTry(std::move(t));
// Queues `func` as a RemoteTask on remoteTaskQueue_ and notifies the loop
// controller through scheduleThreadSafe().
//
// If the caller is running on a fiber of a manager whose localType_ equals
// this manager's, the RemoteTask is constructed with that fiber's localData_;
// otherwise it is constructed from the callable alone.
// NOTE(review): the line that opens the expression producing `task` is not
// visible in this excerpt.
305 template <typename F>
306 void FiberManager::addTaskRemote(F&& func) {
308 auto currentFm = getFiberManagerUnsafe();
309 if (currentFm && currentFm->currentFiber_ &&
310 currentFm->localType_ == localType_) {
311 return folly::make_unique<RemoteTask>(
312 std::forward<F>(func), currentFm->currentFiber_->localData_);
314 return folly::make_unique<RemoteTask>(std::forward<F>(func));
// insertHead releases ownership of the task into the queue. It is passed by
// std::ref, so scheduleThreadSafe() must invoke it before this function
// returns.
// NOTE(review): confirm that scheduleThreadSafe() calls it synchronously.
316 auto insertHead = [&]() {
317 return remoteTaskQueue_.insertHead(task.release());
319 loopController_->scheduleThreadSafe(std::ref(insertHead));
// Future-returning counterpart of addTaskRemote().
// The visible lambda captures the promise `p`, the callable and `this`. It
// runs the callable through folly::makeTryWith and then completes the promise
// from inside runInMainContext().
// NOTE(review): the declaration of `p`, the call that submits the lambda and
// the return statement are not fully visible in this excerpt.
322 template <typename F>
323 auto FiberManager::addTaskRemoteFuture(F&& func) -> folly::Future<
324 typename folly::Unit::Lift<typename std::result_of<F()>::type>::type> {
326 typename folly::Unit::Lift<typename std::result_of<F()>::type>::type>
328 auto f = p.getFuture();
330 [ p = std::move(p), func = std::forward<F>(func), this ]() mutable {
331 auto t = folly::makeTryWith(std::forward<F>(func));
332 runInMainContext([&]() { p.setTry(std::move(t)); });
// Type trait: `value` is true only when the type is `folly::Try<T>&&` for
// some T. addTaskFinally() uses it to validate the argument type of the
// `finally` callable.
// Primary template: any other type yields false.
337 template <typename X>
338 struct IsRvalueRefTry {
339 static const bool value = false;
// Partial specialization matching an rvalue reference to folly::Try<T>.
341 template <typename T>
342 struct IsRvalueRefTry<folly::Try<T>&&> {
343 static const bool value = true;
// Helper bundling the two functors used by addTaskFinally():
//  - Finally: holds the `finally` callable (finally_), a reference to the
//    FiberManager (fm_) and the result slot result_
//    (folly::Optional<folly::Try<Result>>). It invokes finally_ with the moved
//    result and reports exceptions through fm_.exceptionCallback_ with the
//    context "running Finally functor".
//  - Func: holds the task callable (func_) and a reference to Finally's result
//    slot; it stores folly::makeTryWith(std::move(func_)) into that slot.
// allocateInBuffer is true when both functors together fit in
// Fiber::kUserBufferSize.
// NOTE(review): the class headers, operator() definitions and the bodies of
// the `if (allocateInBuffer)` branches are not visible in this excerpt.
346 // We need this to be in a struct, not inlined in addTaskFinally, because clang
347 // crashes otherwise.
348 template <typename F, typename G>
349 struct FiberManager::AddTaskFinallyHelper {
352 typedef typename std::result_of<F()>::type Result;
356 Finally(G finally, FiberManager& fm)
357 : finally_(std::move(finally)), fm_(fm) {}
361 finally_(std::move(*result_));
363 fm_.exceptionCallback_(
364 std::current_exception(), "running Finally functor");
367 if (allocateInBuffer) {
// Result slot owned by Finally; Func writes into it through a reference.
378 folly::Optional<folly::Try<Result>> result_;
384 Func(F func, Finally& finally)
385 : func_(std::move(func)), result_(finally.result_) {}
388 result_ = folly::makeTryWith(std::move(func_));
390 if (allocateInBuffer) {
399 folly::Optional<folly::Try<Result>>& result_;
402 static constexpr bool allocateInBuffer =
403 sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize;
// Creates a fiber that runs `func` and then hands its outcome to `finally`,
// and appends the fiber to readyFibers_.
//
// Compile-time requirements (per the assertion messages below): the first
// argument of `finally` must be Try<T>&&, and T must be convertible from
// func()'s return type.
// NOTE(review): the static_assert openers, the `Helper` typedef name line, the
// `else` line and some declarations are not visible in this excerpt.
406 template <typename F, typename G>
407 void FiberManager::addTaskFinally(F&& func, G&& finally) {
408 typedef typename std::result_of<F()>::type Result;
411 IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
412 "finally(arg): arg must be Try<T>&&");
416 typename std::remove_reference<
417 typename FirstArgOf<G>::type>::type::element_type>::value,
418 "finally(Try<T>&&): T must be convertible from func()'s return type");
420 auto fiber = getFiber();
// Inherit fiber-local data / request context from the caller (see initLocalData).
421 initLocalData(*fiber);
423 typedef AddTaskFinallyHelper<
424 typename std::decay<F>::type,
425 typename std::decay<G>::type>
// In-buffer path: Func sits at the start of the fiber's user buffer and
// Finally directly after it (funcLoc + 1). Finally is constructed first
// because Func's constructor takes a reference to it.
// NOTE(review): placing Finally at funcLoc + 1 assumes that address is
// suitably aligned for Finally -- confirm.
428 if (Helper::allocateInBuffer) {
429 auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
431 static_cast<typename Helper::Finally*>(static_cast<void*>(funcLoc + 1));
433 new (finallyLoc) typename Helper::Finally(std::forward<G>(finally), *this);
434 new (funcLoc) typename Helper::Func(std::forward<F>(func), *finallyLoc);
436 fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
// Other path: both functors are allocated with `new`.
439 new typename Helper::Finally(std::forward<G>(finally), *this);
441 new typename Helper::Func(std::forward<F>(func), *finallyLoc);
443 fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
446 fiber->data_ = reinterpret_cast<intptr_t>(fiber);
447 readyFibers_.push_back(*fiber);
449 observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
452 ensureLoopScheduled();
// Runs `func` on the main context and returns its result.
//
// With an active fiber: `func` is wrapped in a lambda that stores its outcome
// in a folly::Try<Result>, the lambda is published through immediateFunc_ (by
// reference), and the active fiber is preempted with AWAITING_IMMEDIATE.
// runReadyFiber() handles that state on the main context; once the fiber
// resumes, the stored value is returned.
// NOTE(review): the body of the `activeFiber_ == nullptr` guard is not
// visible in this excerpt; presumably it calls `func` directly -- confirm.
455 template <typename F>
456 typename std::result_of<F()>::type FiberManager::runInMainContext(F&& func) {
457 if (UNLIKELY(activeFiber_ == nullptr)) {
461 typedef typename std::result_of<F()>::type Result;
463 folly::Try<Result> result;
// `f` captures func and result by reference; both live on this fiber's stack
// for the duration of the preempt() call below.
464 auto f = [&func, &result]() mutable {
465 result = folly::makeTryWith(std::forward<F>(func));
468 immediateFunc_ = std::ref(f);
469 activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
471 return std::move(result).value();
// Returns a reference to currentFiberManager_.
// Precondition (assert, debug builds only): currentFiberManager_ is not null.
// Use getFiberManagerUnsafe() when it may be null.
474 inline FiberManager& FiberManager::getFiberManager() {
475 assert(currentFiberManager_ != nullptr);
476 return *currentFiberManager_;
// Returns currentFiberManager_ as a raw pointer without any check; callers in
// this file test the result for null before using it.
479 inline FiberManager* FiberManager::getFiberManagerUnsafe() {
480 return currentFiberManager_;
// Returns true when activeFiber_ is non-null, i.e. a fiber of this manager is
// currently switched in.
483 inline bool FiberManager::hasActiveFiber() const {
484 return activeFiber_ != nullptr;
// Preempts the active fiber with state YIELDED; runReadyFiber() then marks it
// READY_TO_RUN and queues it on yieldedFibers_.
// Preconditions (asserts, debug builds only): this manager is the current
// one, a fiber is active, and that fiber is in the RUNNING state.
487 inline void FiberManager::yield() {
488 assert(currentFiberManager_ == this);
489 assert(activeFiber_ != nullptr);
490 assert(activeFiber_->state_ == Fiber::RUNNING);
491 activeFiber_->preempt(Fiber::YIELDED);
// Returns a reference to a local object of type T.
// If T is the manager's configured local type (localType_) and a fiber is
// current, the object comes from that fiber's localData_; in every other case
// the per-thread instance from localThread<T>() is returned.
494 template <typename T>
495 T& FiberManager::local() {
496 if (std::type_index(typeid(T)) == localType_ && currentFiber_) {
497 return currentFiber_->localData_.get<T>();
499 return localThread<T>();
// Returns a per-thread instance of T.
// One preprocessor branch uses a function-local `static thread_local T`; the
// #else branch uses a static folly ThreadLocal<T> instead.
// NOTE(review): the preprocessor condition and the return statements are not
// visible in this excerpt.
502 template <typename T>
503 T& FiberManager::localThread() {
505 static thread_local T t;
507 #else // osx doesn't support thread_local
508 static ThreadLocal<T> t;
// Initializes the inherited state of a newly obtained fiber.
// If the caller is running on a fiber of a manager with the same localType_,
// that fiber's localData_ is copied into `fiber`. The result of
// RequestContext::saveContext() is stored in fiber.rcontext_.
// NOTE(review): the closing brace of the `if` is not visible in this excerpt,
// so it is unclear whether the rcontext_ assignment is unconditional --
// confirm.
513 inline void FiberManager::initLocalData(Fiber& fiber) {
514 auto fm = getFiberManagerUnsafe();
515 if (fm && fm->currentFiber_ && fm->localType_ == localType_) {
516 fiber.localData_ = fm->currentFiber_->localData_;
518 fiber.rcontext_ = RequestContext::saveContext();
// Constructs a FiberManager whose fiber-local type is LocalT.
//
// Takes ownership of the loop controller, stores the options after passing
// them through preprocessOptions(), installs a default exceptionCallback_,
// creates the TimeoutController, and finally registers itself with the loop
// controller via setFiberManager(this).
// NOTE(review): some parameter lines (including the declaration of `options`)
// are not visible in this excerpt.
// NOTE(review): `options.useGuardPages` is read and `options` is moved from
// within the same initializer list; this is only safe if stackAllocator_ is
// declared before options_ in the class -- confirm.
521 template <typename LocalT>
522 FiberManager::FiberManager(
524 std::unique_ptr<LoopController> loopController__,
526 : loopController_(std::move(loopController__)),
527 stackAllocator_(options.useGuardPages),
528 options_(preprocessOptions(std::move(options))),
// Default exception callback: rethrows the exception to inspect it and logs
// it at DFATAL level together with the context string; std::exception
// subclasses get their type name and what() included.
529 exceptionCallback_([](std::exception_ptr eptr, std::string context) {
531 std::rethrow_exception(eptr);
532 } catch (const std::exception& e) {
533 LOG(DFATAL) << "Exception " << typeid(e).name() << " with message '"
534 << e.what() << "' was thrown in "
535 << "FiberManager with context '" << context << "'";
537 LOG(DFATAL) << "Unknown exception was thrown in FiberManager with "
538 << "context '" << context << "'";
541 timeoutManager_(std::make_shared<TimeoutController>(*loopController_)),
542 fibersPoolResizer_(*this),
543 localType_(typeid(LocalT)) {
544 loopController_->setFiberManager(this);
// Free-function convenience wrapper around Promise<Result, BatonT>::await().
// Result and BatonT are taken from the value_type and baton_type members of
// the type of `func`'s first argument; `func` is forwarded unchanged and the
// result of Promise::await() is returned.
547 template <typename F>
548 typename FirstArgOf<F>::type::value_type inline await(F&& func) {
549 typedef typename FirstArgOf<F>::type::value_type Result;
550 typedef typename FirstArgOf<F>::type::baton_type BatonT;
552 return Promise<Result, BatonT>::await(std::forward<F>(func));