2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 #include <folly/CPortability.h>
21 #include <folly/Memory.h>
22 #include <folly/MoveWrapper.h>
23 #include <folly/Optional.h>
24 #include <folly/Portability.h>
25 #include <folly/ScopeGuard.h>
27 #include <folly/ThreadLocal.h>
29 #include <folly/experimental/fibers/Baton.h>
30 #include <folly/experimental/fibers/Fiber.h>
31 #include <folly/experimental/fibers/LoopController.h>
32 #include <folly/experimental/fibers/Promise.h>
33 #include <folly/futures/Promise.h>
34 #include <folly/futures/Try.h>
36 namespace folly { namespace fibers {
// Takes the FiberManager options by value and, per the signature, hands back a
// FiberManager::Options. The only adjustment visible here is guarded by
// FOLLY_SANITIZE_ADDRESS and concerns stack space (see the comment below).
// NOTE(review): the embedded numbering skips 46-52, so the statement that
// applies the scaling, the closing #endif and the return are not visible in
// this listing — confirm against the full file.
40 inline FiberManager::Options preprocessOptions(FiberManager::Options opts) {
41 #ifdef FOLLY_SANITIZE_ADDRESS
42 /* ASAN needs a lot of extra stack space.
43 16x is a conservative estimate, 8x also worked with tests
44 where it mattered. Note that overallocating here does not necessarily
45 increase RSS, since unused memory is pretty much free. */
53 inline void FiberManager::ensureLoopScheduled() {
54 if (isLoopScheduled_) {
58 isLoopScheduled_ = true;
59 loopController_->schedule();
// Switches execution from the main context into `fiber` via jumpContext(),
// handing over fiber->data_, and returns whatever jumpContext() returns.
// Precondition (checked by DCHECK_EQ): no fiber is active yet.
// NOTE(review): the embedded numbering skips 67-69 and 71, so the #endif and
// at least one short statement before the jump are not visible here —
// presumably activeFiber_ is set to `fiber`, mirroring deactivateFiber();
// confirm against the full file.
62 inline intptr_t FiberManager::activateFiber(Fiber* fiber) {
63 DCHECK_EQ(activeFiber_, (Fiber*)nullptr);
// ASAN-only hook; presumably informs the sanitizer about the stack switch.
65 #ifdef FOLLY_SANITIZE_ADDRESS
66 registerFiberActivationWithAsan(fiber);
// Save the main context, resume the fiber's context.
70 return jumpContext(&mainContext_, &fiber->fcontext_, fiber->data_);
73 inline intptr_t FiberManager::deactivateFiber(Fiber* fiber) {
74 DCHECK_EQ(activeFiber_, fiber);
76 #ifdef FOLLY_SANITIZE_ADDRESS
77 registerFiberDeactivationWithAsan(fiber);
80 activeFiber_ = nullptr;
81 return jumpContext(&fiber->fcontext_, &mainContext_, 0);
// Runs `fiber` from the main context: activates it repeatedly while it is
// NOT_STARTED or READY_TO_RUN, then does the bookkeeping for whichever state
// it stopped in (AWAITING, INVALID or YIELDED).
// NOTE(review): the embedded numbering has gaps throughout this function
// (closing braces, the try/catch implied by the std::current_exception()
// calls, and other short lines are not visible). The comments below describe
// only the statements that are present in this listing.
84 inline void FiberManager::runReadyFiber(Fiber* fiber) {
// No fiber may be current or active. Numbering skips 85 and 88 around these
// two asserts, so the construct enclosing them is not visible here.
86 assert(currentFiber_ == nullptr);
87 assert(activeFiber_ == nullptr);
// Only a fiber that has not started yet, or is ready to run, may be passed in.
90 assert(fiber->state_ == Fiber::NOT_STARTED ||
91 fiber->state_ == Fiber::READY_TO_RUN);
// Make the fiber current and exchange its stored request context with the
// value returned by RequestContext::setContext() (presumably the previously
// installed context — TODO confirm).
92 currentFiber_ = fiber;
93 fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
// Report the start to the observer; the fiber's address serves as its id.
95 observer_->starting(reinterpret_cast<uintptr_t>(fiber));
// Re-enter the fiber for as long as it comes back in a runnable state.
98 while (fiber->state_ == Fiber::NOT_STARTED ||
99 fiber->state_ == Fiber::READY_TO_RUN) {
100 activateFiber(fiber);
// AWAITING_IMMEDIATE is the state runInMainContext() preempts with after
// storing immediateFunc_.
101 if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) {
// A failure while running immediateFunc_ is reported through
// exceptionCallback_ (the call itself sits in the numbering gap).
105 exceptionCallback_(std::current_exception(), "running immediateFunc_");
// Drop the one-shot function and mark the fiber runnable so the loop resumes it.
107 immediateFunc_ = nullptr;
108 fiber->state_ = Fiber::READY_TO_RUN;
// The fiber is waiting: release awaitFunc_, report the stop, and undo the
// current-fiber / request-context setup done above.
112 if (fiber->state_ == Fiber::AWAITING) {
114 awaitFunc_ = nullptr;
116 observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
118 currentFiber_ = nullptr;
119 fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
// INVALID is handled as "task complete" (see the existing comment below).
120 } else if (fiber->state_ == Fiber::INVALID) {
121 assert(fibersActive_ > 0);
123 // Making sure that task functor is deleted once task is complete.
124 // NOTE: we must do it on main context, as the fiber is not
125 // running at this point.
126 fiber->func_ = nullptr;
127 fiber->resultFunc_ = nullptr;
// Run the optional finally-function; a failure is reported through
// exceptionCallback_, and the function is released afterwards.
128 if (fiber->finallyFunc_) {
130 fiber->finallyFunc_();
132 exceptionCallback_(std::current_exception(), "running finallyFunc_");
134 fiber->finallyFunc_ = nullptr;
136 // Make sure LocalData is not accessible from its destructor
138 observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
// currentFiber_ is cleared before localData_ is reset, so FiberManager::local()
// no longer resolves to this fiber's data while it is being destroyed.
140 currentFiber_ = nullptr;
141 fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
142 fiber->localData_.reset();
143 fiber->rcontext_.reset();
// Put the fiber at the front of the pool while the pool is below its maximum
// size or periodic pool resizing is configured.
145 if (fibersPoolSize_ < options_.maxFibersPoolSize ||
146 options_.fibersPoolResizePeriodMs > 0) {
147 fibersPool_.push_front(*fiber);
// Alternative path (mostly in the numbering gap): the allocated-fiber count
// must still be positive at this point.
151 assert(fibersAllocated_ > 0);
// The fiber stopped with YIELDED: report the stop, undo the current-fiber /
// request-context setup, mark it ready and queue it on yieldedFibers_.
154 } else if (fiber->state_ == Fiber::YIELDED) {
156 observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
158 currentFiber_ = nullptr;
159 fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
160 fiber->state_ = Fiber::READY_TO_RUN;
161 yieldedFibers_.push_back(*fiber);
// Drains the work that is runnable right now: runs everything in readyFibers_,
// then sweeps remoteReadyQueue_ and remoteTaskQueue_, and repeats for as long
// as a sweep produced work. Fibers that yielded during the pass are moved to
// the back of readyFibers_ at the end. Returns whether fibersActive_ > 0.
165 inline bool FiberManager::loopUntilNoReady() {
166 // Support nested FiberManagers
// Install this manager as currentFiberManager_; the previous value is kept in
// originalFiberManager by the swap.
167 auto originalFiberManager = this;
168 std::swap(currentFiberManager_, originalFiberManager);
// NOTE(review): numbering skips 169-170 and 177-178 around the next six
// statements. They read as a cleanup section (clear the scheduled flag,
// reschedule if fibers are still ready, swap the previous manager back and
// check the swap returned `this`) — how that section is scoped is not visible
// in this listing; confirm against the full file.
171 isLoopScheduled_ = false;
172 if (!readyFibers_.empty()) {
173 ensureLoopScheduled();
175 std::swap(currentFiberManager_, originalFiberManager);
176 CHECK_EQ(this, originalFiberManager);
// Start as true so the loop body runs at least once; each pass resets it and
// the sweep callbacks raise it again when they ran something.
179 bool hadRemoteFiber = true;
180 while (hadRemoteFiber) {
181 hadRemoteFiber = false;
// Run local ready fibers in FIFO order; each is unlinked before it runs.
183 while (!readyFibers_.empty()) {
184 auto& fiber = readyFibers_.front();
185 readyFibers_.pop_front();
186 runReadyFiber(&fiber);
// Fibers queued on remoteReadyQueue_ are run directly from the sweep callback.
189 remoteReadyQueue_.sweep(
190 [this, &hadRemoteFiber] (Fiber* fiber) {
191 runReadyFiber(fiber);
192 hadRemoteFiber = true;
// Tasks queued on remoteTaskQueue_: take ownership of the task, obtain a fiber
// via getFiber(), transfer local data / request context / function, and run it.
196 remoteTaskQueue_.sweep(
197 [this, &hadRemoteFiber] (RemoteTask* taskPtr) {
198 std::unique_ptr<RemoteTask> task(taskPtr);
199 auto fiber = getFiber();
// Local data is copied only when the task carries some.
200 if (task->localData) {
201 fiber->localData_ = *task->localData;
203 fiber->rcontext_ = std::move(task->rcontext);
205 fiber->setFunction(std::move(task->func));
// The fiber's own address is the value handed to jumpContext() on activation.
206 fiber->data_ = reinterpret_cast<intptr_t>(fiber);
208 observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
210 runReadyFiber(fiber);
211 hadRemoteFiber = true;
// Announce every yielded fiber as runnable, then append them all to
// readyFibers_ in one splice.
217 for (auto& yielded : yieldedFibers_) {
218 observer_->runnable(reinterpret_cast<uintptr_t>(&yielded));
221 readyFibers_.splice(readyFibers_.end(), yieldedFibers_);
223 return fibersActive_ > 0;
// Helper for addTask(): wraps the user's callable F in a nested Func type and
// decides at compile time whether that wrapper fits into a fiber's user buffer.
// NOTE(review): the embedded numbering has gaps (227, 230-231, 234-236,
// 239-243, 246, 248-259), so the Func class header, its call operator and its
// members are not visible here; only the visible pieces are described.
226 // We need this to be in a struct, not inlined in addTask, because clang crashes
228 template <typename F>
229 struct FiberManager::AddTaskHelper {
// True when the wrapper is small enough for Fiber::kUserBufferSize bytes;
// addTask() uses it to choose placement-new in the fiber's buffer over `new`.
232 static constexpr bool allocateInBuffer =
233 sizeof(Func) <= Fiber::kUserBufferSize;
// Stores the forwarded callable and a reference to the owning FiberManager.
237 Func(F&& func, FiberManager& fm) :
238 func_(std::forward<F>(func)), fm_(fm) {}
// A failure while running the callable is reported through the manager's
// exceptionCallback_.
244 fm_.exceptionCallback_(std::current_exception(),
245 "running Func functor");
// Branch on where the wrapper was allocated; both arms are in the numbering
// gap (presumably destroy-in-place vs. delete — TODO confirm).
247 if (allocateInBuffer) {
// Schedules `func` to run on a fiber of this manager.
// Obtains a fiber, initialises its local data, stores the callable (wrapped in
// AddTaskHelper<F>::Func), appends the fiber to readyFibers_, notifies the
// observer and makes sure a loop run is scheduled.
// NOTE(review): numbering skips 272, 276, 280 and 282, so the else/closing
// braces — and possibly a guard around the observer call — are not visible.
260 template <typename F>
261 void FiberManager::addTask(F&& func) {
262 typedef AddTaskHelper<F> Helper;
264 auto fiber = getFiber();
265 initLocalData(*fiber);
// Small wrappers are constructed in place inside the fiber's user buffer.
267 if (Helper::allocateInBuffer) {
268 auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
269 new (funcLoc) typename Helper::Func(std::forward<F>(func), *this);
// The fiber only keeps a reference to the wrapper, not a copy.
271 fiber->setFunction(std::ref(*funcLoc));
// Larger wrappers are heap-allocated; the fiber again holds only a reference.
273 auto funcLoc = new typename Helper::Func(std::forward<F>(func), *this);
275 fiber->setFunction(std::ref(*funcLoc));
// The fiber's own address is the value handed to jumpContext() on activation.
278 fiber->data_ = reinterpret_cast<intptr_t>(fiber);
279 readyFibers_.push_back(*fiber);
281 observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
284 ensureLoopScheduled();
// Schedules `func` on a fiber and returns a folly::Future for its result.
// Built on addTaskFinally(): the first lambda runs the callable, the second
// receives the outcome as a Try<T> and fulfils the promise with it.
// NOTE(review): numbering skips 291 and 296-298, so the declaration of `p`
// (presumably a folly::Promise<T>) and the final return are not visible here.
287 template <typename F>
288 auto FiberManager::addTaskFuture(F&& func)
289 -> folly::Future<typename std::result_of<F()>::type> {
290 using T = typename std::result_of<F()>::type;
// The future is taken before `p` is moved into the completion lambda below.
292 auto f = p.getFuture();
293 addTaskFinally([func = std::forward<F>(func)]() mutable { return func(); },
294 [p = std::move(p)](folly::Try<T> && t) mutable {
295 p.setTry(std::move(t));
// Queues `func` on remoteTaskQueue_ through the loop controller's
// scheduleThreadSafe(); loopUntilNoReady() later sweeps that queue and runs
// each task on a fiber.
// NOTE(review): numbering skips 305, 308-310, 312, 317 and 319-320, so the
// wrapper lambda's body, the start of the condition below and the declarations
// of `task` and `insertHead` are not fully visible in this listing.
300 template <typename F>
301 void FiberManager::addTaskRemote(F&& func) {
302 // addTaskRemote indirectly requires wrapping the function in a
303 // std::function, which must be copyable. As move-only lambdas may be
304 // passed in we wrap it first in a move wrapper and then capture the wrapped
306 auto functionWrapper = [f = folly::makeMoveWrapper(
307 std::forward<F>(func))]() mutable {
// If the caller is itself running on a fiber of a manager with the same
// local-data type, the new task is created with that fiber's localData_.
311 auto currentFm = getFiberManagerUnsafe();
313 currentFm->currentFiber_ &&
314 currentFm->localType_ == localType_) {
315 return folly::make_unique<RemoteTask>(
316 std::move(functionWrapper), currentFm->currentFiber_->localData_);
// Otherwise the task is created without local data.
318 return folly::make_unique<RemoteTask>(std::move(functionWrapper));
// Ownership of the task passes to the queue as a raw pointer; the sweep in
// loopUntilNoReady() wraps it in a unique_ptr again.
321 [&]() { return remoteTaskQueue_.insertHead(task.release()); };
322 loopController_->scheduleThreadSafe(std::ref(insertHead));
// Like addTaskRemote(), but returns a folly::Future for the callable's result.
// The task runs `func` into a Try and then fulfils the promise from the main
// context via runInMainContext().
// NOTE(review): numbering skips 330 and 334-336, so the call that receives the
// lambda (presumably addTaskRemote) and the final return are not visible here.
325 template <typename F>
326 auto FiberManager::addTaskRemoteFuture(F&& func)
327 -> folly::Future<typename std::result_of<F()>::type> {
328 folly::Promise<typename std::result_of<F()>::type> p;
// The future is taken before `p` is moved into the lambda below.
329 auto f = p.getFuture();
331 [ p = std::move(p), func = std::forward<F>(func), this ]() mutable {
// makeTryWith captures either the value or the exception thrown by func.
332 auto t = folly::makeTryWith(std::forward<F>(func));
333 runInMainContext([&]() { p.setTry(std::move(t)); });
338 template <typename X>
339 struct IsRvalueRefTry { static const bool value = false; };
340 template <typename T>
341 struct IsRvalueRefTry<folly::Try<T>&&> { static const bool value = true; };
// Helper for addTaskFinally(): defines two cooperating wrapper types, Func
// (runs the task and records its outcome) and Finally (owns the result slot
// and passes the outcome to the user's `finally` callable), plus a
// compile-time flag telling whether both fit into a fiber's user buffer.
// NOTE(review): the embedded numbering has many gaps in this struct (class
// headers, call operators, try/catch and most members are not visible); only
// the visible pieces are described.
343 // We need this to be in a struct, not inlined in addTaskFinally, because clang
344 // crashes otherwise.
345 template <typename F, typename G>
346 struct FiberManager::AddTaskFinallyHelper {
// Type produced by calling the task with no arguments.
350 typedef typename std::result_of<F()>::type Result;
// Both wrappers must fit together, since addTaskFinally() places Finally
// directly after Func in the same buffer.
352 static constexpr bool allocateInBuffer =
353 sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize;
// Finally: stores the forwarded `finally` callable.
359 finally_(std::forward<G>(finally)),
// Invokes the user's callable with the recorded outcome, moved out of the slot.
365 finally_(std::move(*result_));
// A failure inside the user's callable is reported through exceptionCallback_.
367 fm_.exceptionCallback_(std::current_exception(),
368 "running Finally functor");
// Branch on where the wrapper was allocated; both arms are in the numbering gap.
371 if (allocateInBuffer) {
// Result slot owned by Finally; empty until Func has run.
382 folly::Optional<folly::Try<Result>> result_;
// Func: takes the task and binds to the result slot that lives inside Finally.
388 Func(F&& func, Finally& finally) :
389 func_(std::move(func)), result_(finally.result_) {}
// Runs the task, capturing its value or exception into the shared slot.
392 result_ = folly::makeTryWith(std::move(func_));
394 if (allocateInBuffer) {
// Reference into Finally::result_, so Finally must be constructed before Func
// (addTaskFinally() does construct it first).
403 folly::Optional<folly::Try<Result>>& result_;
// Schedules `func` on a fiber and arranges for `finally` to be invoked with
// the outcome as a folly::Try<T>&& (runReadyFiber() calls the fiber's
// finallyFunc_ on the main context once the task is complete).
// Compile-time requirements, per the messages below: the first parameter of
// `finally` must be Try<T>&&, and T must be convertible from func()'s result.
// NOTE(review): numbering skips 410-411, 415-416, 420, 422, 438-439, 444-445,
// 448 and 450, so the static_assert heads, the else branch header and some
// braces are not visible in this listing.
// NOTE(review): `func` is a forwarding reference but is passed on with
// std::move (lines numbered 435 and 441) while `finally` uses std::forward —
// worth checking whether std::forward<F>(func) was intended.
407 template <typename F, typename G>
408 void FiberManager::addTaskFinally(F&& func, G&& finally) {
409 typedef typename std::result_of<F()>::type Result;
412 IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
413 "finally(arg): arg must be Try<T>&&");
417 typename std::remove_reference<
418 typename FirstArgOf<G>::type
419 >::type::element_type
421 "finally(Try<T>&&): T must be convertible from func()'s return type");
423 auto fiber = getFiber();
424 initLocalData(*fiber);
426 typedef AddTaskFinallyHelper<F,G> Helper;
// In-buffer layout: Func at the start of the user buffer, Finally right after.
428 if (Helper::allocateInBuffer) {
429 auto funcLoc = static_cast<typename Helper::Func*>(
430 fiber->getUserBuffer());
431 auto finallyLoc = static_cast<typename Helper::Finally*>(
432 static_cast<void*>(funcLoc + 1));
// Finally is constructed first because Func's constructor binds a reference
// to Finally's result slot.
434 new (finallyLoc) typename Helper::Finally(std::forward<G>(finally), *this);
435 new (funcLoc) typename Helper::Func(std::move(func), *finallyLoc);
// The fiber only keeps references to the two wrappers.
437 fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
// Heap-allocated variant, same construction order.
440 new typename Helper::Finally(std::forward<G>(finally), *this);
441 auto funcLoc = new typename Helper::Func(std::move(func), *finallyLoc);
443 fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
// The fiber's own address is the value handed to jumpContext() on activation.
446 fiber->data_ = reinterpret_cast<intptr_t>(fiber);
447 readyFibers_.push_back(*fiber);
449 observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
452 ensureLoopScheduled();
// Runs `func` on the main context and returns its result to the calling fiber.
// When a fiber is active, the callable is published through immediateFunc_ and
// the fiber preempts itself with AWAITING_IMMEDIATE; runReadyFiber() handles
// that state and then makes the fiber runnable again.
// NOTE(review): numbering skips 459-461, 463, 467-468, 471 and 473, so the
// body of the no-active-fiber branch (presumably a direct call of func — TODO
// confirm) and the closing braces are not visible in this listing.
455 template <typename F>
456 typename std::result_of<F()>::type
457 FiberManager::runInMainContext(F&& func) {
// No fiber is active; this case is marked as unlikely.
458 if (UNLIKELY(activeFiber_ == nullptr)) {
462 typedef typename std::result_of<F()>::type Result;
// `result` and `func` live in this frame and are captured by reference; the
// frame stays alive because the fiber is only suspended while `f` runs.
464 folly::Try<Result> result;
465 auto f = [&func, &result]() mutable {
466 result = folly::makeTryWith(std::forward<F>(func));
// Publish the callable by reference, then hand control to the main context.
469 immediateFunc_ = std::ref(f);
470 activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
// Back on the fiber: value() on the Try (presumably rethrows a captured
// exception — TODO confirm against folly::Try).
472 return std::move(result).value();
475 inline FiberManager& FiberManager::getFiberManager() {
476 assert(currentFiberManager_ != nullptr);
477 return *currentFiberManager_;
480 inline FiberManager* FiberManager::getFiberManagerUnsafe() {
481 return currentFiberManager_;
484 inline bool FiberManager::hasActiveFiber() const {
485 return activeFiber_ != nullptr;
488 inline void FiberManager::yield() {
489 assert(currentFiberManager_ == this);
490 assert(activeFiber_ != nullptr);
491 assert(activeFiber_->state_ == Fiber::RUNNING);
492 activeFiber_->preempt(Fiber::YIELDED);
495 template <typename T>
496 T& FiberManager::local() {
497 if (std::type_index(typeid(T)) == localType_ && currentFiber_) {
498 return currentFiber_->localData_.get<T>();
500 return localThread<T>();
// Returns a per-thread instance of T; local() falls back to this.
// Two variants are selected by a preprocessor conditional: a function-local
// `static thread_local T`, and a folly ThreadLocal<T> for the platform the
// existing comment names.
// NOTE(review): numbering skips 505, 507 and 510-512, so the opening
// conditional, both return statements and the closing #endif are not visible
// in this listing.
503 template <typename T>
504 T& FiberManager::localThread() {
506 static thread_local T t;
508 #else // osx doesn't support thread_local
509 static ThreadLocal<T> t;
514 inline void FiberManager::initLocalData(Fiber& fiber) {
515 auto fm = getFiberManagerUnsafe();
516 if (fm && fm->currentFiber_ && fm->localType_ == localType_) {
517 fiber.localData_ = fm->currentFiber_->localData_;
519 fiber.rcontext_ = RequestContext::saveContext();
// Constructor template; LocalT fixes the type recorded in localType_, which
// local(), initLocalData() and addTaskRemote() compare against.
// Takes ownership of the loop controller, installs a default exception
// callback that logs at DFATAL, and registers this manager with the controller.
// NOTE(review): numbering skips 524, 526, 531, 537-538 and 541-543, so the
// first parameter, the `options` parameter declaration and parts of the
// lambda's try/catch are not visible in this listing.
// NOTE(review): `options` is read for stackAllocator_ and also passed on via
// std::move for options_; members are initialised in declaration order, which
// is not visible here — confirm the read cannot observe a moved-from value.
522 template <typename LocalT>
523 FiberManager::FiberManager(
525 std::unique_ptr<LoopController> loopController__,
527 loopController_(std::move(loopController__)),
528 stackAllocator_(options.useGuardPages),
529 options_(preprocessOptions(std::move(options))),
// Default callback: rethrow the stored exception to learn its type, then log
// it together with the context string supplied by the caller.
530 exceptionCallback_([](std::exception_ptr eptr, std::string context) {
532 std::rethrow_exception(eptr);
533 } catch (const std::exception& e) {
534 LOG(DFATAL) << "Exception " << typeid(e).name()
535 << " with message '" << e.what() << "' was thrown in "
536 << "FiberManager with context '" << context << "'";
// Fallback for exceptions that do not derive from std::exception.
539 LOG(DFATAL) << "Unknown exception was thrown in FiberManager with "
540 << "context '" << context << "'";
544 timeoutManager_(std::make_shared<TimeoutController>(*loopController_)),
545 fibersPoolResizer_(*this),
546 localType_(typeid(LocalT)) {
// Register with the loop controller, which is owned by this manager.
547 loopController_->setFiberManager(this);
// Calls `func` with a fibers Promise<Result> and returns the value delivered
// through that promise. Result is the value_type of func's first parameter
// type. The promise is built from a local Try and a baton; func is invoked
// from the callable passed to baton.wait().
// NOTE(review): numbering skips 554, 556-557 and 560-561, so the declaration
// of `baton` (presumably a fibers Baton) and the lambda's closing are not
// visible in this listing.
550 template <typename F>
551 typename FirstArgOf<F>::type::value_type
552 inline await(F&& func) {
553 typedef typename FirstArgOf<F>::type::value_type Result;
// Slot the promise fills; captured by reference, so it must outlive the wait.
555 folly::Try<Result> result;
558 baton.wait([&func, &result, &baton]() mutable {
559 func(Promise<Result>(result, baton));
// Extract the value from the Try (presumably rethrows a stored exception —
// TODO confirm against folly::moveFromTry).
562 return folly::moveFromTry(result);