readyFunc_ = std::move(func);
}
+template <typename T>
+T& Fiber::LocalData::get() {
+ if (data_) {
+ assert(*dataType_ == typeid(T));
+ return *reinterpret_cast<T*>(data_);
+ }
+
+ dataSize_ = sizeof(T);
+ dataType_ = &typeid(T);
+ if (sizeof(T) <= kBufferSize) {
+ dataDestructor_ = dataBufferDestructor<T>;
+ data_ = &buffer_;
+ } else {
+ dataDestructor_ = dataHeapDestructor<T>;
+ data_ = allocateHeapBuffer(dataSize_);
+ }
+ dataCopyConstructor_ = dataCopyConstructor<T>;
+
+ new (reinterpret_cast<T*>(data_)) T();
+
+ return *reinterpret_cast<T*>(data_);
+}
+
+template <typename T>
+void Fiber::LocalData::dataCopyConstructor(void* ptr, const void* other) {
+ new (reinterpret_cast<T*>(ptr)) T(*reinterpret_cast<const T*>(other));
+}
+
+template <typename T>
+void Fiber::LocalData::dataBufferDestructor(void* ptr) {
+ reinterpret_cast<T*>(ptr)->~T();
+}
+
+template <typename T>
+void Fiber::LocalData::dataHeapDestructor(void *ptr) {
+ reinterpret_cast<T*>(ptr)->~T();
+ freeHeapBuffer(ptr);
+}
+
}} // folly::fibers
return ret;
}
+Fiber::LocalData::LocalData(const LocalData& other) : data_(nullptr) {
+ *this = other;
+}
+
+Fiber::LocalData& Fiber::LocalData::operator=(const LocalData& other) {
+ reset();
+ if (!other.data_) {
+ return *this;
+ }
+
+ dataSize_ = other.dataSize_;
+ dataType_ = other.dataType_;
+ dataDestructor_ = other.dataDestructor_;
+ dataCopyConstructor_ = other.dataCopyConstructor_;
+
+ if (dataSize_ <= kBufferSize) {
+ data_ = &buffer_;
+ } else {
+ data_ = allocateHeapBuffer(dataSize_);
+ }
+
+ dataCopyConstructor_(data_, other.data_);
+
+ return *this;
+}
+
+void Fiber::LocalData::reset() {
+ if (!data_) {
+ return;
+ }
+
+ dataDestructor_(data_);
+ data_ = nullptr;
+}
+
+void* Fiber::LocalData::allocateHeapBuffer(size_t size) {
+ return new char[size];
+}
+
+void Fiber::LocalData::freeHeapBuffer(void* buffer) {
+ delete[] reinterpret_cast<char*>(buffer);
+}
+
}}
#pragma once
#include <functional>
+#include <typeinfo>
#include <boost/context/all.hpp>
#include <boost/version.hpp>
std::function<void()> resultFunc_;
std::function<void()> finallyFunc_;
+ class LocalData {
+ public:
+ LocalData() {}
+ LocalData(const LocalData& other);
+ LocalData& operator=(const LocalData& other);
+
+ template <typename T>
+ T& get();
+
+ void reset();
+
+ //private:
+ static void* allocateHeapBuffer(size_t size);
+ static void freeHeapBuffer(void* buffer);
+
+ template <typename T>
+ static void dataCopyConstructor(void*, const void*);
+ template <typename T>
+ static void dataBufferDestructor(void*);
+ template <typename T>
+ static void dataHeapDestructor(void*);
+
+ static constexpr size_t kBufferSize = 128;
+ std::aligned_storage<kBufferSize>::type buffer_;
+ size_t dataSize_;
+
+ const std::type_info* dataType_;
+ void (*dataDestructor_)(void*);
+ void (*dataCopyConstructor_)(void*, const void*);
+ void* data_{nullptr};
+ };
+
+ LocalData localData_;
+
folly::IntrusiveListHook listHook_; /**< list hook for different FiberManager
queues */
pid_t threadId_{0};
inline void FiberManager::runReadyFiber(Fiber* fiber) {
assert(fiber->state_ == Fiber::NOT_STARTED ||
fiber->state_ == Fiber::READY_TO_RUN);
+ currentFiber_ = fiber;
- while (fiber->state_ == Fiber::NOT_STARTED ||
+ while (fiber->state_ == Fiber::NOT_STARTED ||
fiber->state_ == Fiber::READY_TO_RUN) {
activeFiber_ = fiber;
if (fiber->readyFunc_) {
}
fiber->finallyFunc_ = nullptr;
}
+ fiber->localData_.reset();
if (fibersPoolSize_ < options_.maxFibersPoolSize) {
fibersPool_.push_front(*fiber);
--fibersAllocated_;
}
}
+ currentFiber_ = nullptr;
}
inline bool FiberManager::loopUntilNoReady() {
[this, &hadRemoteFiber] (RemoteTask* taskPtr) {
std::unique_ptr<RemoteTask> task(taskPtr);
auto fiber = getFiber();
+ if (task->localData) {
+ fiber->localData_ = *task->localData;
+ }
+
fiber->setFunction(std::move(task->func));
fiber->data_ = reinterpret_cast<intptr_t>(fiber);
runReadyFiber(fiber);
typedef AddTaskHelper<F> Helper;
auto fiber = getFiber();
+ if (currentFiber_) {
+ fiber->localData_ = currentFiber_->localData_;
+ }
if (Helper::allocateInBuffer) {
auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
template <typename F, typename G>
void FiberManager::addTaskReadyFunc(F&& func, G&& readyFunc) {
auto fiber = getFiber();
+ if (currentFiber_) {
+ fiber->localData_ = currentFiber_->localData_;
+ }
+
fiber->setFunction(std::forward<F>(func));
fiber->setReadyFunction(std::forward<G>(readyFunc));
template <typename F>
void FiberManager::addTaskRemote(F&& func) {
- auto task = folly::make_unique<RemoteTask>(std::move(func));
+ auto task = [&]() {
+ auto currentFm = getFiberManagerUnsafe();
+ if (currentFm && currentFm->currentFiber_) {
+ return folly::make_unique<RemoteTask>(
+ std::forward<F>(func),
+ currentFm->currentFiber_->localData_);
+ }
+ return folly::make_unique<RemoteTask>(std::forward<F>(func));
+ }();
if (remoteTaskQueue_.insertHead(task.release())) {
loopController_->scheduleThreadSafe();
}
"finally(Try<T>&&): T must be convertible from func()'s return type");
auto fiber = getFiber();
+ if (currentFiber_) {
+ fiber->localData_ = currentFiber_->localData_;
+ }
typedef AddTaskFinallyHelper<F,G> Helper;
return activeFiber_ != nullptr;
}
+template <typename T>
+T& FiberManager::local() {
+ assert(getFiberManager().currentFiber_ != nullptr);
+ return currentFiber_->localData_.get<T>();
+}
+
template <typename F>
typename FirstArgOf<F>::type::value_type
inline await(F&& func) {
typename std::result_of<F()>::type
runInMainContext(F&& func);
+ /**
+ * Returns a refference to a fiber-local context for given Fiber. Should be
+ * always called with the same T for each fiber. Fiber-local context is lazily
+ * default-constructed on first request.
+ * When new task is scheduled via addTask / addTaskRemote from a fiber its
+ * fiber-local context is copied into the new fiber.
+ */
+ template <typename T>
+ T& local();
+
/**
* @return How many fiber objects (and stacks) has this manager allocated.
*/
struct RemoteTask {
template <typename F>
- explicit RemoteTask(F&& f) : func(std::move(f)) {}
+ explicit RemoteTask(F&& f) : func(std::forward<F>(f)) {}
+ template <typename F>
+ RemoteTask(F&& f, const Fiber::LocalData& localData_) :
+ func(std::forward<F>(f)),
+ localData(folly::make_unique<Fiber::LocalData>(localData_)) {}
std::function<void()> func;
- folly::AtomicLinkedListHook<RemoteTask> nextRemoteTask;
+ std::unique_ptr<Fiber::LocalData> localData;
+ AtomicLinkedListHook<RemoteTask> nextRemoteTask;
};
typedef folly::IntrusiveList<Fiber, &Fiber::listHook_> FiberTailQueue;
Fiber* activeFiber_{nullptr}; /**< active fiber, nullptr on main context */
+ /**
+ * Same as active fiber, but also set for functions run from fiber on main
+ * context.
+ */
+ Fiber* currentFiber_{nullptr};
FiberTailQueue readyFibers_; /**< queue of fibers ready to be executed */
FiberTailQueue fibersPool_; /**< pool of unitialized Fiber objects */
return fm->runInMainContext(std::forward<F>(func));
}
+/**
+ * Returns a refference to a fiber-local context for given Fiber. Should be
+ * always called with the same T for each fiber. Fiber-local context is lazily
+ * default-constructed on first request.
+ * When new task is scheduled via addTask / addTaskRemote from a fiber its
+ * fiber-local context is copied into the new fiber.
+ */
+template <typename T>
+T& local() {
+ return FiberManager::getFiberManager().local<T>();
+}
+
}}
#include "FiberManager-inl.h"
EXPECT_EQ(result, 47);
}
+template <typename Data>
+void testFiberLocal() {
+ FiberManager fm(folly::make_unique<SimpleLoopController>());
+
+ fm.addTask([]() {
+ EXPECT_EQ(42, local<Data>().value);
+
+ local<Data>().value = 43;
+
+ addTask([]() {
+ EXPECT_EQ(43, local<Data>().value);
+
+ local<Data>().value = 44;
+
+ addTask([]() {
+ EXPECT_EQ(44, local<Data>().value);
+ });
+ });
+ });
+
+ fm.addTask([&]() {
+ EXPECT_EQ(42, local<Data>().value);
+
+ local<Data>().value = 43;
+
+ fm.addTaskRemote([]() {
+ EXPECT_EQ(43, local<Data>().value);
+ });
+ });
+
+ fm.addTask([]() {
+ EXPECT_EQ(42, local<Data>().value);
+ local<Data>().value = 43;
+
+ auto task = []() {
+ EXPECT_EQ(43, local<Data>().value);
+ local<Data>().value = 44;
+ };
+ std::vector<std::function<void()>> tasks{task};
+ whenAny(tasks.begin(), tasks.end());
+
+ EXPECT_EQ(43, local<Data>().value);
+ });
+
+ fm.loopUntilNoReady();
+ EXPECT_FALSE(fm.hasTasks());
+}
+
+TEST(FiberManager, fiberLocal) {
+ struct SimpleData {
+ int value{42};
+ };
+
+ testFiberLocal<SimpleData>();
+}
+
+TEST(FiberManager, fiberLocalHeap) {
+ struct LargeData {
+ char _[1024*1024];
+ int value{42};
+ };
+
+ testFiberLocal<LargeData>();
+}
+
static size_t sNumAwaits;
void runBenchmark(size_t numAwaits, size_t toSend) {