From 626aa2ddb890c1eb8ba1f3226d8f8641188e1280 Mon Sep 17 00:00:00 2001 From: Andrii Grynenko Date: Thu, 2 Apr 2015 17:17:46 -0700 Subject: [PATCH] Fiber-local context Summary: This adds fiber-local context, which behaves more like static objects+fork rather than thread-locals. Test Plan: unit test Reviewed By: pavlo@fb.com, bwatling@fb.com Subscribers: rushix, alikhtarov, bwatling FB internal diff: D1958135 Signature: t1:1958135:1427999426:8e4b89f4af53a1a119b2e5a765fb549dd8442c50 --- folly/experimental/fibers/Fiber-inl.h | 39 +++++++++++ folly/experimental/fibers/Fiber.cpp | 43 ++++++++++++ folly/experimental/fibers/Fiber.h | 35 ++++++++++ folly/experimental/fibers/FiberManager-inl.h | 35 +++++++++- folly/experimental/fibers/FiberManager.h | 36 +++++++++- folly/experimental/fibers/test/FibersTest.cpp | 65 +++++++++++++++++++ 6 files changed, 249 insertions(+), 4 deletions(-) diff --git a/folly/experimental/fibers/Fiber-inl.h b/folly/experimental/fibers/Fiber-inl.h index adb78a49..8823caf6 100644 --- a/folly/experimental/fibers/Fiber-inl.h +++ b/folly/experimental/fibers/Fiber-inl.h @@ -45,4 +45,43 @@ void Fiber::setReadyFunction(G&& func) { readyFunc_ = std::move(func); } +template +T& Fiber::LocalData::get() { + if (data_) { + assert(*dataType_ == typeid(T)); + return *reinterpret_cast(data_); + } + + dataSize_ = sizeof(T); + dataType_ = &typeid(T); + if (sizeof(T) <= kBufferSize) { + dataDestructor_ = dataBufferDestructor; + data_ = &buffer_; + } else { + dataDestructor_ = dataHeapDestructor; + data_ = allocateHeapBuffer(dataSize_); + } + dataCopyConstructor_ = dataCopyConstructor; + + new (reinterpret_cast(data_)) T(); + + return *reinterpret_cast(data_); +} + +template +void Fiber::LocalData::dataCopyConstructor(void* ptr, const void* other) { + new (reinterpret_cast(ptr)) T(*reinterpret_cast(other)); +} + +template +void Fiber::LocalData::dataBufferDestructor(void* ptr) { + reinterpret_cast(ptr)->~T(); +} + +template +void Fiber::LocalData::dataHeapDestructor(void *ptr) { + reinterpret_cast(ptr)->~T(); + freeHeapBuffer(ptr); +} + }} // folly::fibers diff --git a/folly/experimental/fibers/Fiber.cpp b/folly/experimental/fibers/Fiber.cpp index d738f67f..f4958c1e 100644 --- a/folly/experimental/fibers/Fiber.cpp +++ b/folly/experimental/fibers/Fiber.cpp @@ -176,4 +176,47 @@ intptr_t Fiber::preempt(State state) { return ret; } +Fiber::LocalData::LocalData(const LocalData& other) : data_(nullptr) { + *this = other; +} + +Fiber::LocalData& Fiber::LocalData::operator=(const LocalData& other) { + reset(); + if (!other.data_) { + return *this; + } + + dataSize_ = other.dataSize_; + dataType_ = other.dataType_; + dataDestructor_ = other.dataDestructor_; + dataCopyConstructor_ = other.dataCopyConstructor_; + + if (dataSize_ <= kBufferSize) { + data_ = &buffer_; + } else { + data_ = allocateHeapBuffer(dataSize_); + } + + dataCopyConstructor_(data_, other.data_); + + return *this; +} + +void Fiber::LocalData::reset() { + if (!data_) { + return; + } + + dataDestructor_(data_); + data_ = nullptr; +} + +void* Fiber::LocalData::allocateHeapBuffer(size_t size) { + return new char[size]; +} + +void Fiber::LocalData::freeHeapBuffer(void* buffer) { + delete[] reinterpret_cast(buffer); +} + }} diff --git a/folly/experimental/fibers/Fiber.h b/folly/experimental/fibers/Fiber.h index d18bb782..e3a38d55 100644 --- a/folly/experimental/fibers/Fiber.h +++ b/folly/experimental/fibers/Fiber.h @@ -16,6 +16,7 @@ #pragma once #include +#include #include #include @@ -116,6 +117,40 @@ class Fiber { std::function resultFunc_; std::function finallyFunc_; + class LocalData { + public: + LocalData() {} + LocalData(const LocalData& other); + LocalData& operator=(const LocalData& other); + + template + T& get(); + + void reset(); + + //private: + static void* allocateHeapBuffer(size_t size); + static void freeHeapBuffer(void* buffer); + + template + static void dataCopyConstructor(void*, const void*); + template + static void dataBufferDestructor(void*); + template + static void dataHeapDestructor(void*); + + static constexpr size_t kBufferSize = 128; + std::aligned_storage::type buffer_; + size_t dataSize_; + + const std::type_info* dataType_; + void (*dataDestructor_)(void*); + void (*dataCopyConstructor_)(void*, const void*); + void* data_{nullptr}; + }; + + LocalData localData_; + folly::IntrusiveListHook listHook_; /**< list hook for different FiberManager queues */ pid_t threadId_{0}; diff --git a/folly/experimental/fibers/FiberManager-inl.h b/folly/experimental/fibers/FiberManager-inl.h index 9bf6eb6b..cf963ee0 100644 --- a/folly/experimental/fibers/FiberManager-inl.h +++ b/folly/experimental/fibers/FiberManager-inl.h @@ -41,8 +41,9 @@ inline void FiberManager::ensureLoopScheduled() { inline void FiberManager::runReadyFiber(Fiber* fiber) { assert(fiber->state_ == Fiber::NOT_STARTED || fiber->state_ == Fiber::READY_TO_RUN); + currentFiber_ = fiber; - while (fiber->state_ == Fiber::NOT_STARTED || + while (fiber->state_ == Fiber::NOT_STARTED || fiber->state_ == Fiber::READY_TO_RUN) { activeFiber_ = fiber; if (fiber->readyFunc_) { @@ -79,6 +80,7 @@ inline void FiberManager::runReadyFiber(Fiber* fiber) { } fiber->finallyFunc_ = nullptr; } + fiber->localData_.reset(); if (fibersPoolSize_ < options_.maxFibersPoolSize) { fibersPool_.push_front(*fiber); @@ -89,6 +91,7 @@ inline void FiberManager::runReadyFiber(Fiber* fiber) { --fibersAllocated_; } } + currentFiber_ = nullptr; } inline bool FiberManager::loopUntilNoReady() { @@ -120,6 +123,10 @@ inline bool FiberManager::loopUntilNoReady() { [this, &hadRemoteFiber] (RemoteTask* taskPtr) { std::unique_ptr task(taskPtr); auto fiber = getFiber(); + if (task->localData) { + fiber->localData_ = *task->localData; + } + fiber->setFunction(std::move(task->func)); fiber->data_ = reinterpret_cast(fiber); runReadyFiber(fiber); @@ -170,6 +177,9 @@ void FiberManager::addTask(F&& func) { typedef AddTaskHelper Helper; auto fiber = getFiber(); + if (currentFiber_) { + fiber->localData_ = currentFiber_->localData_; + } if (Helper::allocateInBuffer) { auto funcLoc = static_cast(fiber->getUserBuffer()); @@ -191,6 +201,10 @@ void FiberManager::addTask(F&& func) { template void FiberManager::addTaskReadyFunc(F&& func, G&& readyFunc) { auto fiber = getFiber(); + if (currentFiber_) { + fiber->localData_ = currentFiber_->localData_; + } + fiber->setFunction(std::forward(func)); fiber->setReadyFunction(std::forward(readyFunc)); @@ -202,7 +216,15 @@ void FiberManager::addTaskReadyFunc(F&& func, G&& readyFunc) { template void FiberManager::addTaskRemote(F&& func) { - auto task = folly::make_unique(std::move(func)); + auto task = [&]() { + auto currentFm = getFiberManagerUnsafe(); + if (currentFm && currentFm->currentFiber_) { + return folly::make_unique( + std::forward(func), + currentFm->currentFiber_->localData_); + } + return folly::make_unique(std::forward(func)); + }(); if (remoteTaskQueue_.insertHead(task.release())) { loopController_->scheduleThreadSafe(); } @@ -294,6 +316,9 @@ void FiberManager::addTaskFinally(F&& func, G&& finally) { "finally(Try&&): T must be convertible from func()'s return type"); auto fiber = getFiber(); + if (currentFiber_) { + fiber->localData_ = currentFiber_->localData_; + } typedef AddTaskFinallyHelper Helper; @@ -375,6 +400,12 @@ inline bool FiberManager::hasActiveFiber() { return activeFiber_ != nullptr; } +template +T& FiberManager::local() { + assert(getFiberManager().currentFiber_ != nullptr); + return currentFiber_->localData_.get(); +} + template typename FirstArgOf::type::value_type inline await(F&& func) { diff --git a/folly/experimental/fibers/FiberManager.h b/folly/experimental/fibers/FiberManager.h index aba56173..e0fbf71c 100644 --- a/folly/experimental/fibers/FiberManager.h +++ b/folly/experimental/fibers/FiberManager.h @@ -179,6 +179,16 @@ class FiberManager { typename std::result_of::type runInMainContext(F&& func); + /** + * Returns a refference to a fiber-local context for given Fiber. Should be + * always called with the same T for each fiber. Fiber-local context is lazily + * default-constructed on first request. + * When new task is scheduled via addTask / addTaskRemote from a fiber its + * fiber-local context is copied into the new fiber. + */ + template + T& local(); + /** * @return How many fiber objects (and stacks) has this manager allocated. */ @@ -213,14 +223,24 @@ class FiberManager { struct RemoteTask { template - explicit RemoteTask(F&& f) : func(std::move(f)) {} + explicit RemoteTask(F&& f) : func(std::forward(f)) {} + template + RemoteTask(F&& f, const Fiber::LocalData& localData_) : + func(std::forward(f)), + localData(folly::make_unique(localData_)) {} std::function func; - folly::AtomicLinkedListHook nextRemoteTask; + std::unique_ptr localData; + AtomicLinkedListHook nextRemoteTask; }; typedef folly::IntrusiveList FiberTailQueue; Fiber* activeFiber_{nullptr}; /**< active fiber, nullptr on main context */ + /** + * Same as active fiber, but also set for functions run from fiber on main + * context. + */ + Fiber* currentFiber_{nullptr}; FiberTailQueue readyFibers_; /**< queue of fibers ready to be executed */ FiberTailQueue fibersPool_; /**< pool of unitialized Fiber objects */ @@ -374,6 +394,18 @@ inline runInMainContext(F&& func) { return fm->runInMainContext(std::forward(func)); } +/** + * Returns a refference to a fiber-local context for given Fiber. Should be + * always called with the same T for each fiber. Fiber-local context is lazily + * default-constructed on first request. + * When new task is scheduled via addTask / addTaskRemote from a fiber its + * fiber-local context is copied into the new fiber. + */ +template +T& local() { + return FiberManager::getFiberManager().local(); +} + }} #include "FiberManager-inl.h" diff --git a/folly/experimental/fibers/test/FibersTest.cpp b/folly/experimental/fibers/test/FibersTest.cpp index a0709630..0b79cafa 100644 --- a/folly/experimental/fibers/test/FibersTest.cpp +++ b/folly/experimental/fibers/test/FibersTest.cpp @@ -1215,6 +1215,71 @@ TEST(FiberManager, remoteHasReadyTasks) { EXPECT_EQ(result, 47); } +template +void testFiberLocal() { + FiberManager fm(folly::make_unique()); + + fm.addTask([]() { + EXPECT_EQ(42, local().value); + + local().value = 43; + + addTask([]() { + EXPECT_EQ(43, local().value); + + local().value = 44; + + addTask([]() { + EXPECT_EQ(44, local().value); + }); + }); + }); + + fm.addTask([&]() { + EXPECT_EQ(42, local().value); + + local().value = 43; + + fm.addTaskRemote([]() { + EXPECT_EQ(43, local().value); + }); + }); + + fm.addTask([]() { + EXPECT_EQ(42, local().value); + local().value = 43; + + auto task = []() { + EXPECT_EQ(43, local().value); + local().value = 44; + }; + std::vector> tasks{task}; + whenAny(tasks.begin(), tasks.end()); + + EXPECT_EQ(43, local().value); + }); + + fm.loopUntilNoReady(); + EXPECT_FALSE(fm.hasTasks()); +} + +TEST(FiberManager, fiberLocal) { + struct SimpleData { + int value{42}; + }; + + testFiberLocal(); +} + +TEST(FiberManager, fiberLocalHeap) { + struct LargeData { + char _[1024*1024]; + int value{42}; + }; + + testFiberLocal(); +} + static size_t sNumAwaits; void runBenchmark(size_t numAwaits, size_t toSend) { -- 2.34.1