fibers/Fiber-inl.h \
fibers/FiberManager.h \
fibers/FiberManager-inl.h \
+ fibers/FiberManagerInternal.h \
+ fibers/FiberManagerInternal-inl.h \
fibers/FiberManagerMap.h \
fibers/ForEach.h \
fibers/ForEach-inl.h \
#include <vector>
#include <folly/Optional.h>
-#include <folly/fibers/FiberManager.h>
+#include <folly/fibers/FiberManagerInternal.h>
#include <folly/fibers/Promise.h>
#include <folly/Try.h>
* limitations under the License.
*/
#include <folly/fibers/Fiber.h>
-#include <folly/fibers/FiberManager.h>
+#include <folly/fibers/FiberManagerInternal.h>
namespace folly {
namespace fibers {
#include <chrono>
#include <folly/detail/MemoryIdler.h>
-#include <folly/fibers/FiberManager.h>
+#include <folly/fibers/FiberManagerInternal.h>
#include <folly/portability/Asm.h>
namespace folly {
*/
#pragma once
-#include <folly/fibers/FiberManager.h>
+#include <folly/fibers/FiberManagerInternal.h>
#include <folly/fibers/LoopController.h>
#include <folly/io/async/EventBase.h>
#include <atomic>
#include <folly/Likely.h>
#include <folly/Portability.h>
#include <folly/fibers/BoostContextCompatibility.h>
-#include <folly/fibers/FiberManager.h>
+#include <folly/fibers/FiberManagerInternal.h>
#include <folly/portability/SysSyscall.h>
#include <folly/portability/Unistd.h>
*/
#pragma once
-#include <cassert>
-
-#include <folly/CPortability.h>
-#include <folly/Memory.h>
-#include <folly/Optional.h>
-#include <folly/Portability.h>
-#include <folly/ScopeGuard.h>
-#ifdef __APPLE__
-#include <folly/ThreadLocal.h>
-#endif
-#include <folly/fibers/Baton.h>
-#include <folly/fibers/Fiber.h>
-#include <folly/fibers/LoopController.h>
-#include <folly/fibers/Promise.h>
#include <folly/futures/Promise.h>
-#include <folly/Try.h>
namespace folly {
namespace fibers {
-namespace {
-
-inline FiberManager::Options preprocessOptions(FiberManager::Options opts) {
-#ifdef FOLLY_SANITIZE_ADDRESS
- /* ASAN needs a lot of extra stack space.
- 16x is a conservative estimate, 8x also worked with tests
- where it mattered. Note that overallocating here does not necessarily
- increase RSS, since unused memory is pretty much free. */
- opts.stackSize *= 16;
-#endif
- return opts;
-}
-
-} // anonymous
-
-inline void FiberManager::ensureLoopScheduled() {
- if (isLoopScheduled_) {
- return;
- }
-
- isLoopScheduled_ = true;
- loopController_->schedule();
-}
-
-inline intptr_t FiberManager::activateFiber(Fiber* fiber) {
- DCHECK_EQ(activeFiber_, (Fiber*)nullptr);
-
-#ifdef FOLLY_SANITIZE_ADDRESS
- registerFiberActivationWithAsan(fiber);
-#endif
-
- activeFiber_ = fiber;
- return jumpContext(&mainContext_, &fiber->fcontext_, fiber->data_);
-}
-
-inline intptr_t FiberManager::deactivateFiber(Fiber* fiber) {
- DCHECK_EQ(activeFiber_, fiber);
-
-#ifdef FOLLY_SANITIZE_ADDRESS
- registerFiberDeactivationWithAsan(fiber);
-#endif
-
- activeFiber_ = nullptr;
- return jumpContext(&fiber->fcontext_, &mainContext_, 0);
-}
-
-inline void FiberManager::runReadyFiber(Fiber* fiber) {
- SCOPE_EXIT {
- assert(currentFiber_ == nullptr);
- assert(activeFiber_ == nullptr);
- };
-
- assert(
- fiber->state_ == Fiber::NOT_STARTED ||
- fiber->state_ == Fiber::READY_TO_RUN);
- currentFiber_ = fiber;
- fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
- if (observer_) {
- observer_->starting(reinterpret_cast<uintptr_t>(fiber));
- }
-
- while (fiber->state_ == Fiber::NOT_STARTED ||
- fiber->state_ == Fiber::READY_TO_RUN) {
- activateFiber(fiber);
- if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) {
- try {
- immediateFunc_();
- } catch (...) {
- exceptionCallback_(std::current_exception(), "running immediateFunc_");
- }
- immediateFunc_ = nullptr;
- fiber->state_ = Fiber::READY_TO_RUN;
- }
- }
-
- if (fiber->state_ == Fiber::AWAITING) {
- awaitFunc_(*fiber);
- awaitFunc_ = nullptr;
- if (observer_) {
- observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
- }
- currentFiber_ = nullptr;
- fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
- } else if (fiber->state_ == Fiber::INVALID) {
- assert(fibersActive_ > 0);
- --fibersActive_;
- // Making sure that task functor is deleted once task is complete.
- // NOTE: we must do it on main context, as the fiber is not
- // running at this point.
- fiber->func_ = nullptr;
- fiber->resultFunc_ = nullptr;
- if (fiber->finallyFunc_) {
- try {
- fiber->finallyFunc_();
- } catch (...) {
- exceptionCallback_(std::current_exception(), "running finallyFunc_");
- }
- fiber->finallyFunc_ = nullptr;
- }
- // Make sure LocalData is not accessible from its destructor
- if (observer_) {
- observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
- }
- currentFiber_ = nullptr;
- fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
- fiber->localData_.reset();
- fiber->rcontext_.reset();
-
- if (fibersPoolSize_ < options_.maxFibersPoolSize ||
- options_.fibersPoolResizePeriodMs > 0) {
- fibersPool_.push_front(*fiber);
- ++fibersPoolSize_;
- } else {
- delete fiber;
- assert(fibersAllocated_ > 0);
- --fibersAllocated_;
- }
- } else if (fiber->state_ == Fiber::YIELDED) {
- if (observer_) {
- observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
- }
- currentFiber_ = nullptr;
- fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
- fiber->state_ = Fiber::READY_TO_RUN;
- yieldedFibers_.push_back(*fiber);
- }
-}
-
-inline bool FiberManager::loopUntilNoReady() {
-#ifndef _WIN32
- if (UNLIKELY(!alternateSignalStackRegistered_)) {
- registerAlternateSignalStack();
- }
-#endif
-
- // Support nested FiberManagers
- auto originalFiberManager = this;
- std::swap(currentFiberManager_, originalFiberManager);
-
- SCOPE_EXIT {
- isLoopScheduled_ = false;
- if (!readyFibers_.empty()) {
- ensureLoopScheduled();
- }
- std::swap(currentFiberManager_, originalFiberManager);
- CHECK_EQ(this, originalFiberManager);
- };
-
- bool hadRemoteFiber = true;
- while (hadRemoteFiber) {
- hadRemoteFiber = false;
-
- while (!readyFibers_.empty()) {
- auto& fiber = readyFibers_.front();
- readyFibers_.pop_front();
- runReadyFiber(&fiber);
- }
-
- remoteReadyQueue_.sweep([this, &hadRemoteFiber](Fiber* fiber) {
- runReadyFiber(fiber);
- hadRemoteFiber = true;
- });
-
- remoteTaskQueue_.sweep([this, &hadRemoteFiber](RemoteTask* taskPtr) {
- std::unique_ptr<RemoteTask> task(taskPtr);
- auto fiber = getFiber();
- if (task->localData) {
- fiber->localData_ = *task->localData;
- }
- fiber->rcontext_ = std::move(task->rcontext);
-
- fiber->setFunction(std::move(task->func));
- fiber->data_ = reinterpret_cast<intptr_t>(fiber);
- if (observer_) {
- observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
- }
- runReadyFiber(fiber);
- hadRemoteFiber = true;
- });
- }
-
- if (observer_) {
- for (auto& yielded : yieldedFibers_) {
- observer_->runnable(reinterpret_cast<uintptr_t>(&yielded));
- }
- }
- readyFibers_.splice(readyFibers_.end(), yieldedFibers_);
-
- return fibersActive_ > 0;
-}
-
-// We need this to be in a struct, not inlined in addTask, because clang crashes
-// otherwise.
-template <typename F>
-struct FiberManager::AddTaskHelper {
- class Func;
-
- static constexpr bool allocateInBuffer =
- sizeof(Func) <= Fiber::kUserBufferSize;
-
- class Func {
- public:
- Func(F&& func, FiberManager& fm) : func_(std::forward<F>(func)), fm_(fm) {}
-
- void operator()() {
- try {
- func_();
- } catch (...) {
- fm_.exceptionCallback_(
- std::current_exception(), "running Func functor");
- }
- if (allocateInBuffer) {
- this->~Func();
- } else {
- delete this;
- }
- }
-
- private:
- F func_;
- FiberManager& fm_;
- };
-};
-
-template <typename F>
-void FiberManager::addTask(F&& func) {
- typedef AddTaskHelper<F> Helper;
-
- auto fiber = getFiber();
- initLocalData(*fiber);
-
- if (Helper::allocateInBuffer) {
- auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
- new (funcLoc) typename Helper::Func(std::forward<F>(func), *this);
-
- fiber->setFunction(std::ref(*funcLoc));
- } else {
- auto funcLoc = new typename Helper::Func(std::forward<F>(func), *this);
-
- fiber->setFunction(std::ref(*funcLoc));
- }
-
- fiber->data_ = reinterpret_cast<intptr_t>(fiber);
- readyFibers_.push_back(*fiber);
- if (observer_) {
- observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
- }
-
- ensureLoopScheduled();
-}
-
template <typename F>
auto FiberManager::addTaskFuture(F&& func) -> folly::Future<
typename folly::Unit::Lift<typename std::result_of<F()>::type>::type> {
return f;
}
-template <typename F>
-void FiberManager::addTaskRemote(F&& func) {
- auto task = [&]() {
- auto currentFm = getFiberManagerUnsafe();
- if (currentFm && currentFm->currentFiber_ &&
- currentFm->localType_ == localType_) {
- return folly::make_unique<RemoteTask>(
- std::forward<F>(func), currentFm->currentFiber_->localData_);
- }
- return folly::make_unique<RemoteTask>(std::forward<F>(func));
- }();
- auto insertHead = [&]() {
- return remoteTaskQueue_.insertHead(task.release());
- };
- loopController_->scheduleThreadSafe(std::ref(insertHead));
-}
-
template <typename F>
auto FiberManager::addTaskRemoteFuture(F&& func) -> folly::Future<
typename folly::Unit::Lift<typename std::result_of<F()>::type>::type> {
});
return f;
}
-
-template <typename X>
-struct IsRvalueRefTry {
- static const bool value = false;
-};
-template <typename T>
-struct IsRvalueRefTry<folly::Try<T>&&> {
- static const bool value = true;
-};
-
-// We need this to be in a struct, not inlined in addTaskFinally, because clang
-// crashes otherwise.
-template <typename F, typename G>
-struct FiberManager::AddTaskFinallyHelper {
- class Func;
-
- typedef typename std::result_of<F()>::type Result;
-
- class Finally {
- public:
- Finally(G finally, FiberManager& fm)
- : finally_(std::move(finally)), fm_(fm) {}
-
- void operator()() {
- try {
- finally_(std::move(*result_));
- } catch (...) {
- fm_.exceptionCallback_(
- std::current_exception(), "running Finally functor");
- }
-
- if (allocateInBuffer) {
- this->~Finally();
- } else {
- delete this;
- }
- }
-
- private:
- friend class Func;
-
- G finally_;
- folly::Optional<folly::Try<Result>> result_;
- FiberManager& fm_;
- };
-
- class Func {
- public:
- Func(F func, Finally& finally)
- : func_(std::move(func)), result_(finally.result_) {}
-
- void operator()() {
- result_ = folly::makeTryWith(std::move(func_));
-
- if (allocateInBuffer) {
- this->~Func();
- } else {
- delete this;
- }
- }
-
- private:
- F func_;
- folly::Optional<folly::Try<Result>>& result_;
- };
-
- static constexpr bool allocateInBuffer =
- sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize;
-};
-
-template <typename F, typename G>
-void FiberManager::addTaskFinally(F&& func, G&& finally) {
- typedef typename std::result_of<F()>::type Result;
-
- static_assert(
- IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
- "finally(arg): arg must be Try<T>&&");
- static_assert(
- std::is_convertible<
- Result,
- typename std::remove_reference<
- typename FirstArgOf<G>::type>::type::element_type>::value,
- "finally(Try<T>&&): T must be convertible from func()'s return type");
-
- auto fiber = getFiber();
- initLocalData(*fiber);
-
- typedef AddTaskFinallyHelper<
- typename std::decay<F>::type,
- typename std::decay<G>::type>
- Helper;
-
- if (Helper::allocateInBuffer) {
- auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
- auto finallyLoc =
- static_cast<typename Helper::Finally*>(static_cast<void*>(funcLoc + 1));
-
- new (finallyLoc) typename Helper::Finally(std::forward<G>(finally), *this);
- new (funcLoc) typename Helper::Func(std::forward<F>(func), *finallyLoc);
-
- fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
- } else {
- auto finallyLoc =
- new typename Helper::Finally(std::forward<G>(finally), *this);
- auto funcLoc =
- new typename Helper::Func(std::forward<F>(func), *finallyLoc);
-
- fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
- }
-
- fiber->data_ = reinterpret_cast<intptr_t>(fiber);
- readyFibers_.push_back(*fiber);
- if (observer_) {
- observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
- }
-
- ensureLoopScheduled();
-}
-
-template <typename F>
-typename std::result_of<F()>::type FiberManager::runInMainContext(F&& func) {
- if (UNLIKELY(activeFiber_ == nullptr)) {
- return func();
- }
-
- typedef typename std::result_of<F()>::type Result;
-
- folly::Try<Result> result;
- auto f = [&func, &result]() mutable {
- result = folly::makeTryWith(std::forward<F>(func));
- };
-
- immediateFunc_ = std::ref(f);
- activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
-
- return std::move(result).value();
-}
-
-inline FiberManager& FiberManager::getFiberManager() {
- assert(currentFiberManager_ != nullptr);
- return *currentFiberManager_;
-}
-
-inline FiberManager* FiberManager::getFiberManagerUnsafe() {
- return currentFiberManager_;
-}
-
-inline bool FiberManager::hasActiveFiber() const {
- return activeFiber_ != nullptr;
-}
-
-inline void FiberManager::yield() {
- assert(currentFiberManager_ == this);
- assert(activeFiber_ != nullptr);
- assert(activeFiber_->state_ == Fiber::RUNNING);
- activeFiber_->preempt(Fiber::YIELDED);
-}
-
-template <typename T>
-T& FiberManager::local() {
- if (std::type_index(typeid(T)) == localType_ && currentFiber_) {
- return currentFiber_->localData_.get<T>();
- }
- return localThread<T>();
-}
-
-template <typename T>
-T& FiberManager::localThread() {
-#ifndef __APPLE__
- static thread_local T t;
- return t;
-#else // osx doesn't support thread_local
- static ThreadLocal<T> t;
- return *t;
-#endif
-}
-
-inline void FiberManager::initLocalData(Fiber& fiber) {
- auto fm = getFiberManagerUnsafe();
- if (fm && fm->currentFiber_ && fm->localType_ == localType_) {
- fiber.localData_ = fm->currentFiber_->localData_;
- }
- fiber.rcontext_ = RequestContext::saveContext();
-}
-
-template <typename LocalT>
-FiberManager::FiberManager(
- LocalType<LocalT>,
- std::unique_ptr<LoopController> loopController__,
- Options options)
- : loopController_(std::move(loopController__)),
- stackAllocator_(options.useGuardPages),
- options_(preprocessOptions(std::move(options))),
- exceptionCallback_([](std::exception_ptr eptr, std::string context) {
- try {
- std::rethrow_exception(eptr);
- } catch (const std::exception& e) {
- LOG(DFATAL) << "Exception " << typeid(e).name() << " with message '"
- << e.what() << "' was thrown in "
- << "FiberManager with context '" << context << "'";
- } catch (...) {
- LOG(DFATAL) << "Unknown exception was thrown in FiberManager with "
- << "context '" << context << "'";
- }
- }),
- timeoutManager_(std::make_shared<TimeoutController>(*loopController_)),
- fibersPoolResizer_(*this),
- localType_(typeid(LocalT)) {
- loopController_->setFiberManager(this);
-}
-
-template <typename F>
-typename FirstArgOf<F>::type::value_type inline await(F&& func) {
- typedef typename FirstArgOf<F>::type::value_type Result;
- typedef typename FirstArgOf<F>::type::baton_type BatonT;
-
- return Promise<Result, BatonT>::await(std::forward<F>(func));
-}
}
}
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#include "FiberManager.h"
+#include "FiberManagerInternal.h"
#include <signal.h>
*/
#pragma once
-#include <functional>
-#include <memory>
-#include <queue>
-#include <thread>
-#include <type_traits>
-#include <typeindex>
-#include <unordered_set>
-#include <vector>
-
-#include <folly/AtomicIntrusiveLinkedList.h>
-#include <folly/Executor.h>
-#include <folly/IntrusiveList.h>
-#include <folly/Likely.h>
-#include <folly/Try.h>
-#include <folly/io/async/Request.h>
-
-#include <folly/experimental/ExecutionObserver.h>
-#include <folly/fibers/BoostContextCompatibility.h>
-#include <folly/fibers/Fiber.h>
-#include <folly/fibers/GuardPageAllocator.h>
-#include <folly/fibers/TimeoutController.h>
-#include <folly/fibers/traits.h>
-
-namespace folly {
-
-template <class T>
-class Future;
-
-namespace fibers {
-
-class Baton;
-class Fiber;
-class LoopController;
-class TimeoutController;
-
-template <typename T>
-class LocalType {};
-
-class InlineFunctionRunner {
- public:
- virtual ~InlineFunctionRunner() {}
-
- /**
- * func must be executed inline and only once.
- */
- virtual void run(folly::Function<void()> func) = 0;
-};
-
-/**
- * @class FiberManager
- * @brief Single-threaded task execution engine.
- *
- * FiberManager allows semi-parallel task execution on the same thread. Each
- * task can notify FiberManager that it is blocked on something (via await())
- * call. This will pause execution of this task and it will be resumed only
- * when it is unblocked (via setData()).
- */
-class FiberManager : public ::folly::Executor {
- public:
- struct Options {
- static constexpr size_t kDefaultStackSize{16 * 1024};
-
- /**
- * Maximum stack size for fibers which will be used for executing all the
- * tasks.
- */
- size_t stackSize{kDefaultStackSize};
-
- /**
- * Record exact amount of stack used.
- *
- * This is fairly expensive: we fill each newly allocated stack
- * with some known value and find the boundary of unused stack
- * with linear search every time we surrender the stack back to fibersPool.
- * 0 disables stack recording.
- */
- size_t recordStackEvery{0};
-
- /**
- * Keep at most this many free fibers in the pool.
- * This way the total number of fibers in the system is always bounded
- * by the number of active fibers + maxFibersPoolSize.
- */
- size_t maxFibersPoolSize{1000};
-
- /**
- * Protect limited amount of fiber stacks with guard pages.
- */
- bool useGuardPages{true};
-
- /**
- * Free unnecessary fibers in the fibers pool every fibersPoolResizePeriodMs
- * milliseconds. If value is 0, periodic resizing of the fibers pool is
- * disabled.
- */
- uint32_t fibersPoolResizePeriodMs{0};
-
- constexpr Options() {}
- };
-
- using ExceptionCallback =
- folly::Function<void(std::exception_ptr, std::string)>;
-
- FiberManager(const FiberManager&) = delete;
- FiberManager& operator=(const FiberManager&) = delete;
-
- /**
- * Initializes, but doesn't start FiberManager loop
- *
- * @param loopController
- * @param options FiberManager options
- */
- explicit FiberManager(
- std::unique_ptr<LoopController> loopController,
- Options options = Options());
-
- /**
- * Initializes, but doesn't start FiberManager loop
- *
- * @param loopController
- * @param options FiberManager options
- * @tparam LocalT only local of this type may be stored on fibers.
- * Locals of other types will be considered thread-locals.
- */
- template <typename LocalT>
- FiberManager(
- LocalType<LocalT>,
- std::unique_ptr<LoopController> loopController,
- Options options = Options());
-
- ~FiberManager();
-
- /**
- * Controller access.
- */
- LoopController& loopController();
- const LoopController& loopController() const;
-
- /**
- * Keeps running ready tasks until the list of ready tasks is empty.
- *
- * @return True if there are any waiting tasks remaining.
- */
- bool loopUntilNoReady();
-
- /**
- * @return true if there are outstanding tasks.
- */
- bool hasTasks() const;
-
- /**
- * Sets exception callback which will be called if any of the tasks throws an
- * exception.
- *
- * @param ec
- */
- void setExceptionCallback(ExceptionCallback ec);
-
- /**
- * Add a new task to be executed. Must be called from FiberManager's thread.
- *
- * @param func Task functor; must have a signature of `void func()`.
- * The object will be destroyed once task execution is complete.
- */
- template <typename F>
- void addTask(F&& func);
-
- /**
- * Add a new task to be executed and return a future that will be set on
- * return from func. Must be called from FiberManager's thread.
- *
- * @param func Task functor; must have a signature of `void func()`.
- * The object will be destroyed once task execution is complete.
- */
- template <typename F>
- auto addTaskFuture(F&& func) -> folly::Future<
- typename folly::Unit::Lift<typename std::result_of<F()>::type>::type>;
- /**
- * Add a new task to be executed. Safe to call from other threads.
- *
- * @param func Task function; must have a signature of `void func()`.
- * The object will be destroyed once task execution is complete.
- */
- template <typename F>
- void addTaskRemote(F&& func);
-
- /**
- * Add a new task to be executed and return a future that will be set on
- * return from func. Safe to call from other threads.
- *
- * @param func Task function; must have a signature of `void func()`.
- * The object will be destroyed once task execution is complete.
- */
- template <typename F>
- auto addTaskRemoteFuture(F&& func) -> folly::Future<
- typename folly::Unit::Lift<typename std::result_of<F()>::type>::type>;
-
- // Executor interface calls addTaskRemote
- void add(folly::Func f) override {
- addTaskRemote(std::move(f));
- }
-
- /**
- * Add a new task. When the task is complete, execute finally(Try<Result>&&)
- * on the main context.
- *
- * @param func Task functor; must have a signature of `T func()` for some T.
- * @param finally Finally functor; must have a signature of
- * `void finally(Try<T>&&)` and will be passed
- * the result of func() (including the exception if occurred).
- */
- template <typename F, typename G>
- void addTaskFinally(F&& func, G&& finally);
-
- /**
- * If called from a fiber, immediately switches to the FiberManager's context
- * and runs func(), going back to the Fiber's context after completion.
- * Outside a fiber, just calls func() directly.
- *
- * @return value returned by func().
- */
- template <typename F>
- typename std::result_of<F()>::type runInMainContext(F&& func);
-
- /**
- * Returns a refference to a fiber-local context for given Fiber. Should be
- * always called with the same T for each fiber. Fiber-local context is lazily
- * default-constructed on first request.
- * When new task is scheduled via addTask / addTaskRemote from a fiber its
- * fiber-local context is copied into the new fiber.
- */
- template <typename T>
- T& local();
-
- template <typename T>
- static T& localThread();
-
- /**
- * @return How many fiber objects (and stacks) has this manager allocated.
- */
- size_t fibersAllocated() const;
-
- /**
- * @return How many of the allocated fiber objects are currently
- * in the free pool.
- */
- size_t fibersPoolSize() const;
-
- /**
- * return true if running activeFiber_ is not nullptr.
- */
- bool hasActiveFiber() const;
-
- /**
- * @return The currently running fiber or null if no fiber is executing.
- */
- Fiber* currentFiber() const {
- return currentFiber_;
- }
-
- /**
- * @return What was the most observed fiber stack usage (in bytes).
- */
- size_t stackHighWatermark() const;
-
- /**
- * Yield execution of the currently running fiber. Must only be called from a
- * fiber executing on this FiberManager. The calling fiber will be scheduled
- * when all other fibers have had a chance to run and the event loop is
- * serviced.
- */
- void yield();
-
- /**
- * Setup fibers execution observation/instrumentation. Fiber locals are
- * available to observer.
- *
- * @param observer Fiber's execution observer.
- */
- void setObserver(ExecutionObserver* observer);
-
- /**
- * @return Current observer for this FiberManager. Returns nullptr
- * if no observer has been set.
- */
- ExecutionObserver* getObserver();
-
- /**
- * Setup fibers preempt runner.
- */
- void setPreemptRunner(InlineFunctionRunner* preemptRunner);
-
- /**
- * Returns an estimate of the number of fibers which are waiting to run (does
- * not include fibers or tasks scheduled remotely).
- */
- size_t runQueueSize() const {
- return readyFibers_.size() + yieldedFibers_.size();
- }
-
- static FiberManager& getFiberManager();
- static FiberManager* getFiberManagerUnsafe();
-
- private:
- friend class Baton;
- friend class Fiber;
- template <typename F>
- struct AddTaskHelper;
- template <typename F, typename G>
- struct AddTaskFinallyHelper;
-
- struct RemoteTask {
- template <typename F>
- explicit RemoteTask(F&& f)
- : func(std::forward<F>(f)), rcontext(RequestContext::saveContext()) {}
- template <typename F>
- RemoteTask(F&& f, const Fiber::LocalData& localData_)
- : func(std::forward<F>(f)),
- localData(folly::make_unique<Fiber::LocalData>(localData_)),
- rcontext(RequestContext::saveContext()) {}
- folly::Function<void()> func;
- std::unique_ptr<Fiber::LocalData> localData;
- std::shared_ptr<RequestContext> rcontext;
- AtomicIntrusiveLinkedListHook<RemoteTask> nextRemoteTask;
- };
-
- intptr_t activateFiber(Fiber* fiber);
- intptr_t deactivateFiber(Fiber* fiber);
-
- typedef folly::IntrusiveList<Fiber, &Fiber::listHook_> FiberTailQueue;
- typedef folly::IntrusiveList<Fiber, &Fiber::globalListHook_>
- GlobalFiberTailQueue;
-
- Fiber* activeFiber_{nullptr}; /**< active fiber, nullptr on main context */
- /**
- * Same as active fiber, but also set for functions run from fiber on main
- * context.
- */
- Fiber* currentFiber_{nullptr};
-
- FiberTailQueue readyFibers_; /**< queue of fibers ready to be executed */
- FiberTailQueue yieldedFibers_; /**< queue of fibers which have yielded
- execution */
- FiberTailQueue fibersPool_; /**< pool of unitialized Fiber objects */
-
- GlobalFiberTailQueue allFibers_; /**< list of all Fiber objects owned */
-
- size_t fibersAllocated_{0}; /**< total number of fibers allocated */
- size_t fibersPoolSize_{0}; /**< total number of fibers in the free pool */
- size_t fibersActive_{0}; /**< number of running or blocked fibers */
- size_t fiberId_{0}; /**< id of last fiber used */
-
- /**
- * Maximum number of active fibers in the last period lasting
- * Options::fibersPoolResizePeriod milliseconds.
- */
- size_t maxFibersActiveLastPeriod_{0};
-
- FContext::ContextStruct mainContext_; /**< stores loop function context */
-
- std::unique_ptr<LoopController> loopController_;
- bool isLoopScheduled_{false}; /**< was the ready loop scheduled to run? */
-
- /**
- * When we are inside FiberManager loop this points to FiberManager. Otherwise
- * it's nullptr
- */
- static FOLLY_TLS FiberManager* currentFiberManager_;
-
- /**
- * Allocator used to allocate stack for Fibers in the pool.
- * Allocates stack on the stack of the main context.
- */
- GuardPageAllocator stackAllocator_;
-
- const Options options_; /**< FiberManager options */
-
- /**
- * Largest observed individual Fiber stack usage in bytes.
- */
- size_t stackHighWatermark_{0};
-
- /**
- * Schedules a loop with loopController (unless already scheduled before).
- */
- void ensureLoopScheduled();
-
- /**
- * @return An initialized Fiber object from the pool
- */
- Fiber* getFiber();
-
- /**
- * Sets local data for given fiber if all conditions are met.
- */
- void initLocalData(Fiber& fiber);
-
- /**
- * Function passed to the await call.
- */
- folly::Function<void(Fiber&)> awaitFunc_;
-
- /**
- * Function passed to the runInMainContext call.
- */
- folly::Function<void()> immediateFunc_;
-
- /**
- * Preempt runner.
- */
- InlineFunctionRunner* preemptRunner_{nullptr};
-
- /**
- * Fiber's execution observer.
- */
- ExecutionObserver* observer_{nullptr};
-
- ExceptionCallback exceptionCallback_; /**< task exception callback */
-
- folly::AtomicIntrusiveLinkedList<Fiber, &Fiber::nextRemoteReady_>
- remoteReadyQueue_;
-
- folly::AtomicIntrusiveLinkedList<RemoteTask, &RemoteTask::nextRemoteTask>
- remoteTaskQueue_;
-
- std::shared_ptr<TimeoutController> timeoutManager_;
-
- struct FibersPoolResizer {
- explicit FibersPoolResizer(FiberManager& fm) : fiberManager_(fm) {}
- void operator()();
-
- private:
- FiberManager& fiberManager_;
- };
-
- FibersPoolResizer fibersPoolResizer_;
- bool fibersPoolResizerScheduled_{false};
-
- void doFibersPoolResizing();
-
- /**
- * Only local of this type will be available for fibers.
- */
- std::type_index localType_;
-
- void runReadyFiber(Fiber* fiber);
- void remoteReadyInsert(Fiber* fiber);
-
-#ifdef FOLLY_SANITIZE_ADDRESS
-
- // These methods notify ASAN when a fiber is entered/exited so that ASAN can
- // find the right stack extents when it needs to poison/unpoison the stack.
-
- void registerFiberActivationWithAsan(Fiber* fiber);
- void registerFiberDeactivationWithAsan(Fiber* fiber);
- void unpoisonFiberStack(const Fiber* fiber);
-
-#endif // FOLLY_SANITIZE_ADDRESS
-
-#ifndef _WIN32
- bool alternateSignalStackRegistered_{false};
-
- void registerAlternateSignalStack();
-#endif
-};
-
-/**
- * @return true iff we are running in a fiber's context
- */
-inline bool onFiber() {
- auto fm = FiberManager::getFiberManagerUnsafe();
- return fm ? fm->hasActiveFiber() : false;
-}
-
-/**
- * Add a new task to be executed.
- *
- * @param func Task functor; must have a signature of `void func()`.
- * The object will be destroyed once task execution is complete.
- */
-template <typename F>
-inline void addTask(F&& func) {
- return FiberManager::getFiberManager().addTask(std::forward<F>(func));
-}
-
-/**
- * Add a new task. When the task is complete, execute finally(Try<Result>&&)
- * on the main context.
- * Task functor is run and destroyed on the fiber context.
- * Finally functor is run and destroyed on the main context.
- *
- * @param func Task functor; must have a signature of `T func()` for some T.
- * @param finally Finally functor; must have a signature of
- * `void finally(Try<T>&&)` and will be passed
- * the result of func() (including the exception if occurred).
- */
-template <typename F, typename G>
-inline void addTaskFinally(F&& func, G&& finally) {
- return FiberManager::getFiberManager().addTaskFinally(
- std::forward<F>(func), std::forward<G>(finally));
-}
-
-/**
- * Blocks task execution until given promise is fulfilled.
- *
- * Calls function passing in a Promise<T>, which has to be fulfilled.
- *
- * @return data which was used to fulfill the promise.
- */
-template <typename F>
-typename FirstArgOf<F>::type::value_type inline await(F&& func);
-
-/**
- * If called from a fiber, immediately switches to the FiberManager's context
- * and runs func(), going back to the Fiber's context after completion.
- * Outside a fiber, just calls func() directly.
- *
- * @return value returned by func().
- */
-template <typename F>
-typename std::result_of<F()>::type inline runInMainContext(F&& func) {
- auto fm = FiberManager::getFiberManagerUnsafe();
- if (UNLIKELY(fm == nullptr)) {
- return func();
- }
- return fm->runInMainContext(std::forward<F>(func));
-}
-
-/**
- * Returns a refference to a fiber-local context for given Fiber. Should be
- * always called with the same T for each fiber. Fiber-local context is lazily
- * default-constructed on first request.
- * When new task is scheduled via addTask / addTaskRemote from a fiber its
- * fiber-local context is copied into the new fiber.
- */
-template <typename T>
-T& local() {
- auto fm = FiberManager::getFiberManagerUnsafe();
- if (fm) {
- return fm->local<T>();
- }
- return FiberManager::localThread<T>();
-}
-
-inline void yield() {
- auto fm = FiberManager::getFiberManagerUnsafe();
- if (fm) {
- fm->yield();
- } else {
- std::this_thread::yield();
- }
-}
-}
-}
-
+#include <folly/fibers/FiberManagerInternal.h>
#include <folly/fibers/FiberManager-inl.h>
--- /dev/null
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/futures/Promise.h>
+
+namespace folly {
+namespace fibers {
+
+template <typename F>
+auto FiberManager::addTaskFuture(F&& func) -> folly::Future<
+ typename folly::Unit::Lift<typename std::result_of<F()>::type>::type> {
+ using T = typename std::result_of<F()>::type;
+ using FutureT = typename folly::Unit::Lift<T>::type;
+
+ folly::Promise<FutureT> p;
+ auto f = p.getFuture();
+ addTaskFinally(
+ [func = std::forward<F>(func)]() mutable { return func(); },
+ [p = std::move(p)](folly::Try<T> && t) mutable {
+ p.setTry(std::move(t));
+ });
+ return f;
+}
+
+template <typename F>
+auto FiberManager::addTaskRemoteFuture(F&& func) -> folly::Future<
+ typename folly::Unit::Lift<typename std::result_of<F()>::type>::type> {
+ folly::Promise<
+ typename folly::Unit::Lift<typename std::result_of<F()>::type>::type>
+ p;
+ auto f = p.getFuture();
+ addTaskRemote(
+ [ p = std::move(p), func = std::forward<F>(func), this ]() mutable {
+ auto t = folly::makeTryWith(std::forward<F>(func));
+ runInMainContext([&]() { p.setTry(std::move(t)); });
+ });
+ return f;
+}
+}
+}
--- /dev/null
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <cassert>
+
+#include <folly/CPortability.h>
+#include <folly/Memory.h>
+#include <folly/Optional.h>
+#include <folly/Portability.h>
+#include <folly/ScopeGuard.h>
+#ifdef __APPLE__
+#include <folly/ThreadLocal.h>
+#endif
+#include <folly/fibers/Baton.h>
+#include <folly/fibers/Fiber.h>
+#include <folly/fibers/LoopController.h>
+#include <folly/fibers/Promise.h>
+#include <folly/Try.h>
+
+namespace folly {
+namespace fibers {
+
+namespace {
+
+inline FiberManager::Options preprocessOptions(FiberManager::Options opts) {
+#ifdef FOLLY_SANITIZE_ADDRESS
+ /* ASAN needs a lot of extra stack space.
+ 16x is a conservative estimate, 8x also worked with tests
+ where it mattered. Note that overallocating here does not necessarily
+ increase RSS, since unused memory is pretty much free. */
+ opts.stackSize *= 16;
+#endif
+ return opts;
+}
+
+} // anonymous
+
+inline void FiberManager::ensureLoopScheduled() {
+ if (isLoopScheduled_) {
+ return;
+ }
+
+ isLoopScheduled_ = true;
+ loopController_->schedule();
+}
+
+inline intptr_t FiberManager::activateFiber(Fiber* fiber) {
+ DCHECK_EQ(activeFiber_, (Fiber*)nullptr);
+
+#ifdef FOLLY_SANITIZE_ADDRESS
+ registerFiberActivationWithAsan(fiber);
+#endif
+
+ activeFiber_ = fiber;
+ return jumpContext(&mainContext_, &fiber->fcontext_, fiber->data_);
+}
+
+inline intptr_t FiberManager::deactivateFiber(Fiber* fiber) {
+ DCHECK_EQ(activeFiber_, fiber);
+
+#ifdef FOLLY_SANITIZE_ADDRESS
+ registerFiberDeactivationWithAsan(fiber);
+#endif
+
+ activeFiber_ = nullptr;
+ return jumpContext(&fiber->fcontext_, &mainContext_, 0);
+}
+
+inline void FiberManager::runReadyFiber(Fiber* fiber) {
+ SCOPE_EXIT {
+ assert(currentFiber_ == nullptr);
+ assert(activeFiber_ == nullptr);
+ };
+
+ assert(
+ fiber->state_ == Fiber::NOT_STARTED ||
+ fiber->state_ == Fiber::READY_TO_RUN);
+ currentFiber_ = fiber;
+ fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
+ if (observer_) {
+ observer_->starting(reinterpret_cast<uintptr_t>(fiber));
+ }
+
+ while (fiber->state_ == Fiber::NOT_STARTED ||
+ fiber->state_ == Fiber::READY_TO_RUN) {
+ activateFiber(fiber);
+ if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) {
+ try {
+ immediateFunc_();
+ } catch (...) {
+ exceptionCallback_(std::current_exception(), "running immediateFunc_");
+ }
+ immediateFunc_ = nullptr;
+ fiber->state_ = Fiber::READY_TO_RUN;
+ }
+ }
+
+ if (fiber->state_ == Fiber::AWAITING) {
+ awaitFunc_(*fiber);
+ awaitFunc_ = nullptr;
+ if (observer_) {
+ observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
+ }
+ currentFiber_ = nullptr;
+ fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
+ } else if (fiber->state_ == Fiber::INVALID) {
+ assert(fibersActive_ > 0);
+ --fibersActive_;
+ // Making sure that task functor is deleted once task is complete.
+ // NOTE: we must do it on main context, as the fiber is not
+ // running at this point.
+ fiber->func_ = nullptr;
+ fiber->resultFunc_ = nullptr;
+ if (fiber->finallyFunc_) {
+ try {
+ fiber->finallyFunc_();
+ } catch (...) {
+ exceptionCallback_(std::current_exception(), "running finallyFunc_");
+ }
+ fiber->finallyFunc_ = nullptr;
+ }
+ // Make sure LocalData is not accessible from its destructor
+ if (observer_) {
+ observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
+ }
+ currentFiber_ = nullptr;
+ fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
+ fiber->localData_.reset();
+ fiber->rcontext_.reset();
+
+ if (fibersPoolSize_ < options_.maxFibersPoolSize ||
+ options_.fibersPoolResizePeriodMs > 0) {
+ fibersPool_.push_front(*fiber);
+ ++fibersPoolSize_;
+ } else {
+ delete fiber;
+ assert(fibersAllocated_ > 0);
+ --fibersAllocated_;
+ }
+ } else if (fiber->state_ == Fiber::YIELDED) {
+ if (observer_) {
+ observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
+ }
+ currentFiber_ = nullptr;
+ fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
+ fiber->state_ = Fiber::READY_TO_RUN;
+ yieldedFibers_.push_back(*fiber);
+ }
+}
+
+inline bool FiberManager::loopUntilNoReady() {
+#ifndef _WIN32
+ if (UNLIKELY(!alternateSignalStackRegistered_)) {
+ registerAlternateSignalStack();
+ }
+#endif
+
+ // Support nested FiberManagers
+ auto originalFiberManager = this;
+ std::swap(currentFiberManager_, originalFiberManager);
+
+ SCOPE_EXIT {
+ isLoopScheduled_ = false;
+ if (!readyFibers_.empty()) {
+ ensureLoopScheduled();
+ }
+ std::swap(currentFiberManager_, originalFiberManager);
+ CHECK_EQ(this, originalFiberManager);
+ };
+
+ bool hadRemoteFiber = true;
+ while (hadRemoteFiber) {
+ hadRemoteFiber = false;
+
+ while (!readyFibers_.empty()) {
+ auto& fiber = readyFibers_.front();
+ readyFibers_.pop_front();
+ runReadyFiber(&fiber);
+ }
+
+ remoteReadyQueue_.sweep([this, &hadRemoteFiber](Fiber* fiber) {
+ runReadyFiber(fiber);
+ hadRemoteFiber = true;
+ });
+
+ remoteTaskQueue_.sweep([this, &hadRemoteFiber](RemoteTask* taskPtr) {
+ std::unique_ptr<RemoteTask> task(taskPtr);
+ auto fiber = getFiber();
+ if (task->localData) {
+ fiber->localData_ = *task->localData;
+ }
+ fiber->rcontext_ = std::move(task->rcontext);
+
+ fiber->setFunction(std::move(task->func));
+ fiber->data_ = reinterpret_cast<intptr_t>(fiber);
+ if (observer_) {
+ observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
+ }
+ runReadyFiber(fiber);
+ hadRemoteFiber = true;
+ });
+ }
+
+ if (observer_) {
+ for (auto& yielded : yieldedFibers_) {
+ observer_->runnable(reinterpret_cast<uintptr_t>(&yielded));
+ }
+ }
+ readyFibers_.splice(readyFibers_.end(), yieldedFibers_);
+
+ return fibersActive_ > 0;
+}
+
+// We need this to be in a struct, not inlined in addTask, because clang crashes
+// otherwise.
+template <typename F>
+struct FiberManager::AddTaskHelper {
+ class Func;
+
+ static constexpr bool allocateInBuffer =
+ sizeof(Func) <= Fiber::kUserBufferSize;
+
+ class Func {
+ public:
+ Func(F&& func, FiberManager& fm) : func_(std::forward<F>(func)), fm_(fm) {}
+
+ void operator()() {
+ try {
+ func_();
+ } catch (...) {
+ fm_.exceptionCallback_(
+ std::current_exception(), "running Func functor");
+ }
+ if (allocateInBuffer) {
+ this->~Func();
+ } else {
+ delete this;
+ }
+ }
+
+ private:
+ F func_;
+ FiberManager& fm_;
+ };
+};
+
+template <typename F>
+void FiberManager::addTask(F&& func) {
+ typedef AddTaskHelper<F> Helper;
+
+ auto fiber = getFiber();
+ initLocalData(*fiber);
+
+ if (Helper::allocateInBuffer) {
+ auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
+ new (funcLoc) typename Helper::Func(std::forward<F>(func), *this);
+
+ fiber->setFunction(std::ref(*funcLoc));
+ } else {
+ auto funcLoc = new typename Helper::Func(std::forward<F>(func), *this);
+
+ fiber->setFunction(std::ref(*funcLoc));
+ }
+
+ fiber->data_ = reinterpret_cast<intptr_t>(fiber);
+ readyFibers_.push_back(*fiber);
+ if (observer_) {
+ observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
+ }
+
+ ensureLoopScheduled();
+}
+
+template <typename F>
+void FiberManager::addTaskRemote(F&& func) {
+ auto task = [&]() {
+ auto currentFm = getFiberManagerUnsafe();
+ if (currentFm && currentFm->currentFiber_ &&
+ currentFm->localType_ == localType_) {
+ return folly::make_unique<RemoteTask>(
+ std::forward<F>(func), currentFm->currentFiber_->localData_);
+ }
+ return folly::make_unique<RemoteTask>(std::forward<F>(func));
+ }();
+ auto insertHead = [&]() {
+ return remoteTaskQueue_.insertHead(task.release());
+ };
+ loopController_->scheduleThreadSafe(std::ref(insertHead));
+}
+
+template <typename X>
+struct IsRvalueRefTry {
+ static const bool value = false;
+};
+template <typename T>
+struct IsRvalueRefTry<folly::Try<T>&&> {
+ static const bool value = true;
+};
+
+// We need this to be in a struct, not inlined in addTaskFinally, because clang
+// crashes otherwise.
+template <typename F, typename G>
+struct FiberManager::AddTaskFinallyHelper {
+ class Func;
+
+ typedef typename std::result_of<F()>::type Result;
+
+ class Finally {
+ public:
+ Finally(G finally, FiberManager& fm)
+ : finally_(std::move(finally)), fm_(fm) {}
+
+ void operator()() {
+ try {
+ finally_(std::move(*result_));
+ } catch (...) {
+ fm_.exceptionCallback_(
+ std::current_exception(), "running Finally functor");
+ }
+
+ if (allocateInBuffer) {
+ this->~Finally();
+ } else {
+ delete this;
+ }
+ }
+
+ private:
+ friend class Func;
+
+ G finally_;
+ folly::Optional<folly::Try<Result>> result_;
+ FiberManager& fm_;
+ };
+
+ class Func {
+ public:
+ Func(F func, Finally& finally)
+ : func_(std::move(func)), result_(finally.result_) {}
+
+ void operator()() {
+ result_ = folly::makeTryWith(std::move(func_));
+
+ if (allocateInBuffer) {
+ this->~Func();
+ } else {
+ delete this;
+ }
+ }
+
+ private:
+ F func_;
+ folly::Optional<folly::Try<Result>>& result_;
+ };
+
+ static constexpr bool allocateInBuffer =
+ sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize;
+};
+
+template <typename F, typename G>
+void FiberManager::addTaskFinally(F&& func, G&& finally) {
+ typedef typename std::result_of<F()>::type Result;
+
+ static_assert(
+ IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
+ "finally(arg): arg must be Try<T>&&");
+ static_assert(
+ std::is_convertible<
+ Result,
+ typename std::remove_reference<
+ typename FirstArgOf<G>::type>::type::element_type>::value,
+ "finally(Try<T>&&): T must be convertible from func()'s return type");
+
+ auto fiber = getFiber();
+ initLocalData(*fiber);
+
+ typedef AddTaskFinallyHelper<
+ typename std::decay<F>::type,
+ typename std::decay<G>::type>
+ Helper;
+
+ if (Helper::allocateInBuffer) {
+ auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
+ auto finallyLoc =
+ static_cast<typename Helper::Finally*>(static_cast<void*>(funcLoc + 1));
+
+ new (finallyLoc) typename Helper::Finally(std::forward<G>(finally), *this);
+ new (funcLoc) typename Helper::Func(std::forward<F>(func), *finallyLoc);
+
+ fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
+ } else {
+ auto finallyLoc =
+ new typename Helper::Finally(std::forward<G>(finally), *this);
+ auto funcLoc =
+ new typename Helper::Func(std::forward<F>(func), *finallyLoc);
+
+ fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
+ }
+
+ fiber->data_ = reinterpret_cast<intptr_t>(fiber);
+ readyFibers_.push_back(*fiber);
+ if (observer_) {
+ observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
+ }
+
+ ensureLoopScheduled();
+}
+
+template <typename F>
+typename std::result_of<F()>::type FiberManager::runInMainContext(F&& func) {
+ if (UNLIKELY(activeFiber_ == nullptr)) {
+ return func();
+ }
+
+ typedef typename std::result_of<F()>::type Result;
+
+ folly::Try<Result> result;
+ auto f = [&func, &result]() mutable {
+ result = folly::makeTryWith(std::forward<F>(func));
+ };
+
+ immediateFunc_ = std::ref(f);
+ activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
+
+ return std::move(result).value();
+}
+
+inline FiberManager& FiberManager::getFiberManager() {
+ assert(currentFiberManager_ != nullptr);
+ return *currentFiberManager_;
+}
+
+inline FiberManager* FiberManager::getFiberManagerUnsafe() {
+ return currentFiberManager_;
+}
+
+inline bool FiberManager::hasActiveFiber() const {
+ return activeFiber_ != nullptr;
+}
+
+inline void FiberManager::yield() {
+ assert(currentFiberManager_ == this);
+ assert(activeFiber_ != nullptr);
+ assert(activeFiber_->state_ == Fiber::RUNNING);
+ activeFiber_->preempt(Fiber::YIELDED);
+}
+
+template <typename T>
+T& FiberManager::local() {
+ if (std::type_index(typeid(T)) == localType_ && currentFiber_) {
+ return currentFiber_->localData_.get<T>();
+ }
+ return localThread<T>();
+}
+
+template <typename T>
+T& FiberManager::localThread() {
+#ifndef __APPLE__
+ static thread_local T t;
+ return t;
+#else // osx doesn't support thread_local
+ static ThreadLocal<T> t;
+ return *t;
+#endif
+}
+
+inline void FiberManager::initLocalData(Fiber& fiber) {
+ auto fm = getFiberManagerUnsafe();
+ if (fm && fm->currentFiber_ && fm->localType_ == localType_) {
+ fiber.localData_ = fm->currentFiber_->localData_;
+ }
+ fiber.rcontext_ = RequestContext::saveContext();
+}
+
+template <typename LocalT>
+FiberManager::FiberManager(
+ LocalType<LocalT>,
+ std::unique_ptr<LoopController> loopController__,
+ Options options)
+ : loopController_(std::move(loopController__)),
+ stackAllocator_(options.useGuardPages),
+ options_(preprocessOptions(std::move(options))),
+ exceptionCallback_([](std::exception_ptr eptr, std::string context) {
+ try {
+ std::rethrow_exception(eptr);
+ } catch (const std::exception& e) {
+ LOG(DFATAL) << "Exception " << typeid(e).name() << " with message '"
+ << e.what() << "' was thrown in "
+ << "FiberManager with context '" << context << "'";
+ } catch (...) {
+ LOG(DFATAL) << "Unknown exception was thrown in FiberManager with "
+ << "context '" << context << "'";
+ }
+ }),
+ timeoutManager_(std::make_shared<TimeoutController>(*loopController_)),
+ fibersPoolResizer_(*this),
+ localType_(typeid(LocalT)) {
+ loopController_->setFiberManager(this);
+}
+
+template <typename F>
+typename FirstArgOf<F>::type::value_type inline await(F&& func) {
+ typedef typename FirstArgOf<F>::type::value_type Result;
+ typedef typename FirstArgOf<F>::type::baton_type BatonT;
+
+ return Promise<Result, BatonT>::await(std::forward<F>(func));
+}
+}
+}
--- /dev/null
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <functional>
+#include <memory>
+#include <queue>
+#include <thread>
+#include <type_traits>
+#include <typeindex>
+#include <unordered_set>
+#include <vector>
+
+#include <folly/AtomicIntrusiveLinkedList.h>
+#include <folly/Executor.h>
+#include <folly/IntrusiveList.h>
+#include <folly/Likely.h>
+#include <folly/Try.h>
+#include <folly/io/async/Request.h>
+
+#include <folly/experimental/ExecutionObserver.h>
+#include <folly/fibers/BoostContextCompatibility.h>
+#include <folly/fibers/Fiber.h>
+#include <folly/fibers/GuardPageAllocator.h>
+#include <folly/fibers/TimeoutController.h>
+#include <folly/fibers/traits.h>
+
+namespace folly {
+
+template <class T>
+class Future;
+
+namespace fibers {
+
+class Baton;
+class Fiber;
+class LoopController;
+class TimeoutController;
+
+template <typename T>
+class LocalType {};
+
+class InlineFunctionRunner {
+ public:
+ virtual ~InlineFunctionRunner() {}
+
+ /**
+ * func must be executed inline and only once.
+ */
+ virtual void run(folly::Function<void()> func) = 0;
+};
+
+/**
+ * @class FiberManager
+ * @brief Single-threaded task execution engine.
+ *
+ * FiberManager allows semi-parallel task execution on the same thread. Each
+ * task can notify FiberManager that it is blocked on something (via await())
+ * call. This will pause execution of this task and it will be resumed only
+ * when it is unblocked (via setData()).
+ */
+class FiberManager : public ::folly::Executor {
+ public:
+ struct Options {
+ static constexpr size_t kDefaultStackSize{16 * 1024};
+
+ /**
+ * Maximum stack size for fibers which will be used for executing all the
+ * tasks.
+ */
+ size_t stackSize{kDefaultStackSize};
+
+ /**
+ * Record exact amount of stack used.
+ *
+ * This is fairly expensive: we fill each newly allocated stack
+ * with some known value and find the boundary of unused stack
+ * with linear search every time we surrender the stack back to fibersPool.
+ * 0 disables stack recording.
+ */
+ size_t recordStackEvery{0};
+
+ /**
+ * Keep at most this many free fibers in the pool.
+ * This way the total number of fibers in the system is always bounded
+ * by the number of active fibers + maxFibersPoolSize.
+ */
+ size_t maxFibersPoolSize{1000};
+
+ /**
+ * Protect limited amount of fiber stacks with guard pages.
+ */
+ bool useGuardPages{true};
+
+ /**
+ * Free unnecessary fibers in the fibers pool every fibersPoolResizePeriodMs
+ * milliseconds. If value is 0, periodic resizing of the fibers pool is
+ * disabled.
+ */
+ uint32_t fibersPoolResizePeriodMs{0};
+
+ constexpr Options() {}
+ };
+
+ using ExceptionCallback =
+ folly::Function<void(std::exception_ptr, std::string)>;
+
+ FiberManager(const FiberManager&) = delete;
+ FiberManager& operator=(const FiberManager&) = delete;
+
+ /**
+ * Initializes, but doesn't start FiberManager loop
+ *
+ * @param loopController
+ * @param options FiberManager options
+ */
+ explicit FiberManager(
+ std::unique_ptr<LoopController> loopController,
+ Options options = Options());
+
+ /**
+ * Initializes, but doesn't start FiberManager loop
+ *
+ * @param loopController
+ * @param options FiberManager options
+ * @tparam LocalT only local of this type may be stored on fibers.
+ * Locals of other types will be considered thread-locals.
+ */
+ template <typename LocalT>
+ FiberManager(
+ LocalType<LocalT>,
+ std::unique_ptr<LoopController> loopController,
+ Options options = Options());
+
+ ~FiberManager();
+
+ /**
+ * Controller access.
+ */
+ LoopController& loopController();
+ const LoopController& loopController() const;
+
+ /**
+ * Keeps running ready tasks until the list of ready tasks is empty.
+ *
+ * @return True if there are any waiting tasks remaining.
+ */
+ bool loopUntilNoReady();
+
+ /**
+ * @return true if there are outstanding tasks.
+ */
+ bool hasTasks() const;
+
+ /**
+ * Sets exception callback which will be called if any of the tasks throws an
+ * exception.
+ *
+ * @param ec
+ */
+ void setExceptionCallback(ExceptionCallback ec);
+
+ /**
+ * Add a new task to be executed. Must be called from FiberManager's thread.
+ *
+ * @param func Task functor; must have a signature of `void func()`.
+ * The object will be destroyed once task execution is complete.
+ */
+ template <typename F>
+ void addTask(F&& func);
+
+ /**
+ * Add a new task to be executed and return a future that will be set on
+ * return from func. Must be called from FiberManager's thread.
+ *
+ * @param func Task functor; must have a signature of `void func()`.
+ * The object will be destroyed once task execution is complete.
+ */
+ template <typename F>
+ auto addTaskFuture(F&& func) -> folly::Future<
+ typename folly::Unit::Lift<typename std::result_of<F()>::type>::type>;
+ /**
+ * Add a new task to be executed. Safe to call from other threads.
+ *
+ * @param func Task function; must have a signature of `void func()`.
+ * The object will be destroyed once task execution is complete.
+ */
+ template <typename F>
+ void addTaskRemote(F&& func);
+
+ /**
+ * Add a new task to be executed and return a future that will be set on
+ * return from func. Safe to call from other threads.
+ *
+ * @param func Task function; must have a signature of `void func()`.
+ * The object will be destroyed once task execution is complete.
+ */
+ template <typename F>
+ auto addTaskRemoteFuture(F&& func) -> folly::Future<
+ typename folly::Unit::Lift<typename std::result_of<F()>::type>::type>;
+
+ // Executor interface calls addTaskRemote
+ void add(folly::Func f) override {
+ addTaskRemote(std::move(f));
+ }
+
+ /**
+ * Add a new task. When the task is complete, execute finally(Try<Result>&&)
+ * on the main context.
+ *
+ * @param func Task functor; must have a signature of `T func()` for some T.
+ * @param finally Finally functor; must have a signature of
+ * `void finally(Try<T>&&)` and will be passed
+ * the result of func() (including the exception if occurred).
+ */
+ template <typename F, typename G>
+ void addTaskFinally(F&& func, G&& finally);
+
+ /**
+ * If called from a fiber, immediately switches to the FiberManager's context
+ * and runs func(), going back to the Fiber's context after completion.
+ * Outside a fiber, just calls func() directly.
+ *
+ * @return value returned by func().
+ */
+ template <typename F>
+ typename std::result_of<F()>::type runInMainContext(F&& func);
+
+ /**
+ * Returns a refference to a fiber-local context for given Fiber. Should be
+ * always called with the same T for each fiber. Fiber-local context is lazily
+ * default-constructed on first request.
+ * When new task is scheduled via addTask / addTaskRemote from a fiber its
+ * fiber-local context is copied into the new fiber.
+ */
+ template <typename T>
+ T& local();
+
+ template <typename T>
+ static T& localThread();
+
+ /**
+ * @return How many fiber objects (and stacks) has this manager allocated.
+ */
+ size_t fibersAllocated() const;
+
+ /**
+ * @return How many of the allocated fiber objects are currently
+ * in the free pool.
+ */
+ size_t fibersPoolSize() const;
+
+ /**
+ * return true if running activeFiber_ is not nullptr.
+ */
+ bool hasActiveFiber() const;
+
+ /**
+ * @return The currently running fiber or null if no fiber is executing.
+ */
+ Fiber* currentFiber() const {
+ return currentFiber_;
+ }
+
+ /**
+ * @return What was the most observed fiber stack usage (in bytes).
+ */
+ size_t stackHighWatermark() const;
+
+ /**
+ * Yield execution of the currently running fiber. Must only be called from a
+ * fiber executing on this FiberManager. The calling fiber will be scheduled
+ * when all other fibers have had a chance to run and the event loop is
+ * serviced.
+ */
+ void yield();
+
+ /**
+ * Setup fibers execution observation/instrumentation. Fiber locals are
+ * available to observer.
+ *
+ * @param observer Fiber's execution observer.
+ */
+ void setObserver(ExecutionObserver* observer);
+
+ /**
+ * @return Current observer for this FiberManager. Returns nullptr
+ * if no observer has been set.
+ */
+ ExecutionObserver* getObserver();
+
+ /**
+ * Setup fibers preempt runner.
+ */
+ void setPreemptRunner(InlineFunctionRunner* preemptRunner);
+
+ /**
+ * Returns an estimate of the number of fibers which are waiting to run (does
+ * not include fibers or tasks scheduled remotely).
+ */
+ size_t runQueueSize() const {
+ return readyFibers_.size() + yieldedFibers_.size();
+ }
+
+ static FiberManager& getFiberManager();
+ static FiberManager* getFiberManagerUnsafe();
+
+ private:
+ friend class Baton;
+ friend class Fiber;
+ template <typename F>
+ struct AddTaskHelper;
+ template <typename F, typename G>
+ struct AddTaskFinallyHelper;
+
+ struct RemoteTask {
+ template <typename F>
+ explicit RemoteTask(F&& f)
+ : func(std::forward<F>(f)), rcontext(RequestContext::saveContext()) {}
+ template <typename F>
+ RemoteTask(F&& f, const Fiber::LocalData& localData_)
+ : func(std::forward<F>(f)),
+ localData(folly::make_unique<Fiber::LocalData>(localData_)),
+ rcontext(RequestContext::saveContext()) {}
+ folly::Function<void()> func;
+ std::unique_ptr<Fiber::LocalData> localData;
+ std::shared_ptr<RequestContext> rcontext;
+ AtomicIntrusiveLinkedListHook<RemoteTask> nextRemoteTask;
+ };
+
+ intptr_t activateFiber(Fiber* fiber);
+ intptr_t deactivateFiber(Fiber* fiber);
+
+ typedef folly::IntrusiveList<Fiber, &Fiber::listHook_> FiberTailQueue;
+ typedef folly::IntrusiveList<Fiber, &Fiber::globalListHook_>
+ GlobalFiberTailQueue;
+
+ Fiber* activeFiber_{nullptr}; /**< active fiber, nullptr on main context */
+ /**
+ * Same as active fiber, but also set for functions run from fiber on main
+ * context.
+ */
+ Fiber* currentFiber_{nullptr};
+
+ FiberTailQueue readyFibers_; /**< queue of fibers ready to be executed */
+ FiberTailQueue yieldedFibers_; /**< queue of fibers which have yielded
+ execution */
+ FiberTailQueue fibersPool_; /**< pool of unitialized Fiber objects */
+
+ GlobalFiberTailQueue allFibers_; /**< list of all Fiber objects owned */
+
+ size_t fibersAllocated_{0}; /**< total number of fibers allocated */
+ size_t fibersPoolSize_{0}; /**< total number of fibers in the free pool */
+ size_t fibersActive_{0}; /**< number of running or blocked fibers */
+ size_t fiberId_{0}; /**< id of last fiber used */
+
+ /**
+ * Maximum number of active fibers in the last period lasting
+ * Options::fibersPoolResizePeriod milliseconds.
+ */
+ size_t maxFibersActiveLastPeriod_{0};
+
+ FContext::ContextStruct mainContext_; /**< stores loop function context */
+
+ std::unique_ptr<LoopController> loopController_;
+ bool isLoopScheduled_{false}; /**< was the ready loop scheduled to run? */
+
+ /**
+ * When we are inside FiberManager loop this points to FiberManager. Otherwise
+ * it's nullptr
+ */
+ static FOLLY_TLS FiberManager* currentFiberManager_;
+
+ /**
+ * Allocator used to allocate stack for Fibers in the pool.
+ * Allocates stack on the stack of the main context.
+ */
+ GuardPageAllocator stackAllocator_;
+
+ const Options options_; /**< FiberManager options */
+
+ /**
+ * Largest observed individual Fiber stack usage in bytes.
+ */
+ size_t stackHighWatermark_{0};
+
+ /**
+ * Schedules a loop with loopController (unless already scheduled before).
+ */
+ void ensureLoopScheduled();
+
+ /**
+ * @return An initialized Fiber object from the pool
+ */
+ Fiber* getFiber();
+
+ /**
+ * Sets local data for given fiber if all conditions are met.
+ */
+ void initLocalData(Fiber& fiber);
+
+ /**
+ * Function passed to the await call.
+ */
+ folly::Function<void(Fiber&)> awaitFunc_;
+
+ /**
+ * Function passed to the runInMainContext call.
+ */
+ folly::Function<void()> immediateFunc_;
+
+ /**
+ * Preempt runner.
+ */
+ InlineFunctionRunner* preemptRunner_{nullptr};
+
+ /**
+ * Fiber's execution observer.
+ */
+ ExecutionObserver* observer_{nullptr};
+
+ ExceptionCallback exceptionCallback_; /**< task exception callback */
+
+ folly::AtomicIntrusiveLinkedList<Fiber, &Fiber::nextRemoteReady_>
+ remoteReadyQueue_;
+
+ folly::AtomicIntrusiveLinkedList<RemoteTask, &RemoteTask::nextRemoteTask>
+ remoteTaskQueue_;
+
+ std::shared_ptr<TimeoutController> timeoutManager_;
+
+ struct FibersPoolResizer {
+ explicit FibersPoolResizer(FiberManager& fm) : fiberManager_(fm) {}
+ void operator()();
+
+ private:
+ FiberManager& fiberManager_;
+ };
+
+ FibersPoolResizer fibersPoolResizer_;
+ bool fibersPoolResizerScheduled_{false};
+
+ void doFibersPoolResizing();
+
+ /**
+ * Only local of this type will be available for fibers.
+ */
+ std::type_index localType_;
+
+ void runReadyFiber(Fiber* fiber);
+ void remoteReadyInsert(Fiber* fiber);
+
+#ifdef FOLLY_SANITIZE_ADDRESS
+
+ // These methods notify ASAN when a fiber is entered/exited so that ASAN can
+ // find the right stack extents when it needs to poison/unpoison the stack.
+
+ void registerFiberActivationWithAsan(Fiber* fiber);
+ void registerFiberDeactivationWithAsan(Fiber* fiber);
+ void unpoisonFiberStack(const Fiber* fiber);
+
+#endif // FOLLY_SANITIZE_ADDRESS
+
+#ifndef _WIN32
+ bool alternateSignalStackRegistered_{false};
+
+ void registerAlternateSignalStack();
+#endif
+};
+
+/**
+ * @return true iff we are running in a fiber's context
+ */
+inline bool onFiber() {
+ auto fm = FiberManager::getFiberManagerUnsafe();
+ return fm ? fm->hasActiveFiber() : false;
+}
+
+/**
+ * Add a new task to be executed.
+ *
+ * @param func Task functor; must have a signature of `void func()`.
+ * The object will be destroyed once task execution is complete.
+ */
+template <typename F>
+inline void addTask(F&& func) {
+ return FiberManager::getFiberManager().addTask(std::forward<F>(func));
+}
+
+/**
+ * Add a new task. When the task is complete, execute finally(Try<Result>&&)
+ * on the main context.
+ * Task functor is run and destroyed on the fiber context.
+ * Finally functor is run and destroyed on the main context.
+ *
+ * @param func Task functor; must have a signature of `T func()` for some T.
+ * @param finally Finally functor; must have a signature of
+ * `void finally(Try<T>&&)` and will be passed
+ * the result of func() (including the exception if occurred).
+ */
+template <typename F, typename G>
+inline void addTaskFinally(F&& func, G&& finally) {
+ return FiberManager::getFiberManager().addTaskFinally(
+ std::forward<F>(func), std::forward<G>(finally));
+}
+
+/**
+ * Blocks task execution until given promise is fulfilled.
+ *
+ * Calls function passing in a Promise<T>, which has to be fulfilled.
+ *
+ * @return data which was used to fulfill the promise.
+ */
+template <typename F>
+typename FirstArgOf<F>::type::value_type inline await(F&& func);
+
+/**
+ * If called from a fiber, immediately switches to the FiberManager's context
+ * and runs func(), going back to the Fiber's context after completion.
+ * Outside a fiber, just calls func() directly.
+ *
+ * @return value returned by func().
+ */
+template <typename F>
+typename std::result_of<F()>::type inline runInMainContext(F&& func) {
+ auto fm = FiberManager::getFiberManagerUnsafe();
+ if (UNLIKELY(fm == nullptr)) {
+ return func();
+ }
+ return fm->runInMainContext(std::forward<F>(func));
+}
+
+/**
+ * Returns a refference to a fiber-local context for given Fiber. Should be
+ * always called with the same T for each fiber. Fiber-local context is lazily
+ * default-constructed on first request.
+ * When new task is scheduled via addTask / addTaskRemote from a fiber its
+ * fiber-local context is copied into the new fiber.
+ */
+template <typename T>
+T& local() {
+ auto fm = FiberManager::getFiberManagerUnsafe();
+ if (fm) {
+ return fm->local<T>();
+ }
+ return FiberManager::localThread<T>();
+}
+
+inline void yield() {
+ auto fm = FiberManager::getFiberManagerUnsafe();
+ if (fm) {
+ fm->yield();
+ } else {
+ std::this_thread::yield();
+ }
+}
+}
+}
+
+#include <folly/fibers/FiberManagerInternal-inl.h>
#pragma once
#include <folly/fibers/EventBaseLoopController.h>
-#include <folly/fibers/FiberManager.h>
+#include <folly/fibers/FiberManagerInternal.h>
namespace folly {
namespace fibers {
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#include <folly/fibers/FiberManager.h>
+#include <folly/fibers/FiberManagerInternal.h>
namespace folly {
namespace fibers {
*/
#include <folly/Optional.h>
-#include <folly/fibers/FiberManager.h>
+#include <folly/fibers/FiberManagerInternal.h>
#include <folly/fibers/ForEach.h>
namespace folly {