#include <functional>
#include <memory>
#include <queue>
+#include <thread>
+#include <typeindex>
#include <unordered_set>
#include <vector>
#include <folly/AtomicLinkedList.h>
+#include <folly/Executor.h>
#include <folly/Likely.h>
#include <folly/IntrusiveList.h>
#include <folly/futures/Try.h>
+#include <folly/experimental/ExecutionObserver.h>
#include <folly/experimental/fibers/BoostContextCompatibility.h>
#include <folly/experimental/fibers/Fiber.h>
-#include <folly/experimental/fibers/traits.h>
-
-#ifdef USE_GUARD_ALLOCATOR
#include <folly/experimental/fibers/GuardPageAllocator.h>
-#endif
+#include <folly/experimental/fibers/traits.h>
namespace folly { namespace fibers {
class LoopController;
class TimeoutController;
+template <typename T>
+class LocalType {
+};
+
/**
* @class FiberManager
* @brief Single-threaded task execution engine.
* call. This will pause execution of this task and it will be resumed only
* when it is unblocked (via setData()).
*/
-class FiberManager {
+class FiberManager : public ::folly::Executor {
public:
struct Options {
-#ifdef FOLLY_SANITIZE_ADDRESS
- /* ASAN needs a lot of extra stack space.
- 16x is a conservative estimate, 8x also worked with tests
- where it mattered. Note that overallocating here does not necessarily
- increase RSS, since unused memory is pretty much free. */
- static constexpr size_t kDefaultStackSize{16 * 16 * 1024};
-#else
static constexpr size_t kDefaultStackSize{16 * 1024};
-#endif
+
/**
* Maximum stack size for fibers which will be used for executing all the
* tasks.
* This is fairly expensive: we fill each newly allocated stack
* with some known value and find the boundary of unused stack
* with linear search every time we surrender the stack back to fibersPool.
+ * 0 disables stack recording.
*/
- bool debugRecordStackUsed{false};
+ size_t recordStackEvery{0};
/**
* Keep at most this many free fibers in the pool.
typedef std::function<void(std::exception_ptr, std::string)>
ExceptionCallback;
+ FiberManager(const FiberManager&) = delete;
+ FiberManager& operator=(const FiberManager&) = delete;
+
/**
* Initializes, but doesn't start FiberManager loop
*
+   * @param loopController A LoopController object used to schedule and drive this FiberManager's loop.
* @param options FiberManager options
*/
explicit FiberManager(std::unique_ptr<LoopController> loopController,
Options options = Options());
+ /**
+ * Initializes, but doesn't start FiberManager loop
+ *
+   * @param loopController A LoopController object used to schedule and drive this FiberManager's loop.
+ * @param options FiberManager options
+   * @tparam LocalT only locals of this type may be stored on fibers.
+ * Locals of other types will be considered thread-locals.
+ */
+ template <typename LocalT>
+ FiberManager(LocalType<LocalT>,
+ std::unique_ptr<LoopController> loopController,
+ Options options = Options());
+
+
~FiberManager();
/**
template <typename F>
void addTaskRemote(F&& func);
+  // folly::Executor interface; forwards the task to addTaskRemote()
+ void add(std::function<void()> f) {
+ addTaskRemote(std::move(f));
+ }
+
/**
* Add a new task. When the task is complete, execute finally(Try<Result>&&)
* on the main context.
/**
* return true if running activeFiber_ is not nullptr.
*/
- bool hasActiveFiber();
+ bool hasActiveFiber() const;
/**
* @return What was the most observed fiber stack usage (in bytes).
*/
size_t stackHighWatermark() const;
+ /**
+ * Yield execution of the currently running fiber. Must only be called from a
+ * fiber executing on this FiberManager. The calling fiber will be scheduled
+ * when all other fibers have had a chance to run and the event loop is
+ * serviced.
+ */
+ void yield();
+
+ /**
+ * Setup fibers execution observation/instrumentation. Fiber locals are
+ * available to observer.
+ *
+ * @param observer Fiber's execution observer.
+ */
+ void setObserver(ExecutionObserver* observer);
+
static FiberManager& getFiberManager();
static FiberManager* getFiberManagerUnsafe();
Fiber* currentFiber_{nullptr};
FiberTailQueue readyFibers_; /**< queue of fibers ready to be executed */
+ FiberTailQueue yieldedFibers_; /**< queue of fibers which have yielded
+ execution */
FiberTailQueue fibersPool_; /**< pool of unitialized Fiber objects */
size_t fibersAllocated_{0}; /**< total number of fibers allocated */
size_t fibersPoolSize_{0}; /**< total number of fibers in the free pool */
size_t fibersActive_{0}; /**< number of running or blocked fibers */
+ size_t fiberId_{0}; /**< id of last fiber used */
FContext::ContextStruct mainContext_; /**< stores loop function context */
* Allocator used to allocate stack for Fibers in the pool.
* Allocates stack on the stack of the main context.
*/
-#ifdef USE_GUARD_ALLOCATOR
- /* This is too slow for production use; can be fixed
- if we allocated all stack storage once upfront */
GuardPageAllocator stackAllocator_;
-#else
- std::allocator<unsigned char> stackAllocator_;
-#endif
const Options options_; /**< FiberManager options */
*/
Fiber* getFiber();
+ /**
+ * Sets local data for given fiber if all conditions are met.
+ */
+ void initLocalData(Fiber& fiber);
+
/**
* Function passed to the await call.
*/
*/
std::function<void()> immediateFunc_;
+ /**
+ * Fiber's execution observer.
+ */
+ ExecutionObserver* observer_{nullptr};
+
ExceptionCallback exceptionCallback_; /**< task exception callback */
folly::AtomicLinkedList<Fiber, &Fiber::nextRemoteReady_> remoteReadyQueue_;
std::shared_ptr<TimeoutController> timeoutManager_;
+ /**
+   * Only locals of this type will be available to fibers.
+ */
+ std::type_index localType_;
+
void runReadyFiber(Fiber* fiber);
void remoteReadyInsert(Fiber* fiber);
};
return FiberManager::localThread<T>();
}
+inline void yield() {
+ auto fm = FiberManager::getFiberManagerUnsafe();
+ if (fm) {
+ fm->yield();
+ } else {
+ std::this_thread::yield();
+ }
+}
+
}}
#include "FiberManager-inl.h"