From 5de6b4582e52ca6c224f364bc335c6a8161704c3 Mon Sep 17 00:00:00 2001 From: Rushi Desai Date: Thu, 2 Apr 2015 13:09:03 -0700 Subject: [PATCH] Move mcrouter/lib/fibers to folly/experimental/fibers Summary: The mcrouter fibers library is quite general purpose and reusable by other projects. Hence I'm moving it into folly. Test Plan: fbconfig -r folly/experimental/fibers && fbmake runtests fbconfig -r mcrouter && fbmake runtests fbconfig -r tao && fbmake fbconfig -r ti && fbmake Reviewed By: andrii@fb.com Subscribers: vikas, zhuohuang, jmkaldor, jhunt, pavlo, int, aap, trunkagent, fredemmott, alikhtarov, folly-diffs@, jsedgwick, yfeldblum, chalfant, chip FB internal diff: D1958061 Signature: t1:1958061:1428005194:b57bfecfe9678e81c48526f57e6197270e2b5a27 --- folly/experimental/fibers/AddTasks-inl.h | 137 ++ folly/experimental/fibers/AddTasks.h | 125 ++ folly/experimental/fibers/Baton-inl.h | 112 ++ folly/experimental/fibers/Baton.cpp | 158 +++ folly/experimental/fibers/Baton.h | 155 ++ .../fibers/BoostContextCompatibility.h | 109 ++ .../fibers/EventBaseLoopController-inl.h | 105 ++ .../fibers/EventBaseLoopController.h | 66 + folly/experimental/fibers/Fiber-inl.h | 48 + folly/experimental/fibers/Fiber.cpp | 179 +++ folly/experimental/fibers/Fiber.h | 126 ++ folly/experimental/fibers/FiberManager-inl.h | 393 +++++ folly/experimental/fibers/FiberManager.cpp | 124 ++ folly/experimental/fibers/FiberManager.h | 379 +++++ folly/experimental/fibers/FiberManagerMap.cpp | 61 + folly/experimental/fibers/FiberManagerMap.h | 27 + folly/experimental/fibers/ForEach-inl.h | 88 ++ folly/experimental/fibers/ForEach.h | 43 + folly/experimental/fibers/GenericBaton.h | 26 + .../fibers/GuardPageAllocator-inl.h | 69 + .../experimental/fibers/GuardPageAllocator.h | 32 + folly/experimental/fibers/LoopController.h | 60 + folly/experimental/fibers/Makefile.am | 1 + folly/experimental/fibers/Promise-inl.h | 93 ++ folly/experimental/fibers/Promise.h | 100 ++ .../fibers/SimpleLoopController.h | 108 ++ .../experimental/fibers/TimeoutController.cpp | 105 ++ folly/experimental/fibers/TimeoutController.h | 69 + folly/experimental/fibers/WhenN-inl.h | 227 +++ folly/experimental/fibers/WhenN.h | 142 ++ folly/experimental/fibers/test/FibersTest.cpp | 1263 +++++++++++++++++ .../fibers/test/FibersTestApp.cpp | 96 ++ folly/experimental/fibers/test/Makefile.am | 8 + folly/experimental/fibers/test/main.cpp | 31 + folly/experimental/fibers/traits.h | 70 + 35 files changed, 4935 insertions(+) create mode 100644 folly/experimental/fibers/AddTasks-inl.h create mode 100644 folly/experimental/fibers/AddTasks.h create mode 100644 folly/experimental/fibers/Baton-inl.h create mode 100644 folly/experimental/fibers/Baton.cpp create mode 100644 folly/experimental/fibers/Baton.h create mode 100644 folly/experimental/fibers/BoostContextCompatibility.h create mode 100644 folly/experimental/fibers/EventBaseLoopController-inl.h create mode 100644 folly/experimental/fibers/EventBaseLoopController.h create mode 100644 folly/experimental/fibers/Fiber-inl.h create mode 100644 folly/experimental/fibers/Fiber.cpp create mode 100644 folly/experimental/fibers/Fiber.h create mode 100644 folly/experimental/fibers/FiberManager-inl.h create mode 100644 folly/experimental/fibers/FiberManager.cpp create mode 100644 folly/experimental/fibers/FiberManager.h create mode 100644 folly/experimental/fibers/FiberManagerMap.cpp create mode 100644 folly/experimental/fibers/FiberManagerMap.h create mode 100644 folly/experimental/fibers/ForEach-inl.h create mode 100644 folly/experimental/fibers/ForEach.h create mode 100644 folly/experimental/fibers/GenericBaton.h create mode 100644 folly/experimental/fibers/GuardPageAllocator-inl.h create mode 100644 folly/experimental/fibers/GuardPageAllocator.h create mode 100644 folly/experimental/fibers/LoopController.h create mode 100644 folly/experimental/fibers/Makefile.am create mode 100644 folly/experimental/fibers/Promise-inl.h create mode 100644 folly/experimental/fibers/Promise.h create mode 100644 folly/experimental/fibers/SimpleLoopController.h create mode 100644 folly/experimental/fibers/TimeoutController.cpp create mode 100644 folly/experimental/fibers/TimeoutController.h create mode 100644 folly/experimental/fibers/WhenN-inl.h create mode 100644 folly/experimental/fibers/WhenN.h create mode 100644 folly/experimental/fibers/test/FibersTest.cpp create mode 100644 folly/experimental/fibers/test/FibersTestApp.cpp create mode 100644 folly/experimental/fibers/test/Makefile.am create mode 100644 folly/experimental/fibers/test/main.cpp create mode 100644 folly/experimental/fibers/traits.h diff --git a/folly/experimental/fibers/AddTasks-inl.h b/folly/experimental/fibers/AddTasks-inl.h new file mode 100644 index 00000000..1e74f63d --- /dev/null +++ b/folly/experimental/fibers/AddTasks-inl.h @@ -0,0 +1,137 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include + +#include + +namespace folly { namespace fibers { + +template +TaskIterator::TaskIterator(TaskIterator&& other) noexcept + : context_(std::move(other.context_)), + id_(other.id_) { +} + +template +TaskIterator::TaskIterator(std::shared_ptr context) + : context_(std::move(context)), + id_(-1) { + assert(context_); +} + +template +inline bool TaskIterator::hasCompleted() const { + return context_->tasksConsumed < context_->results.size(); +} + +template +inline bool TaskIterator::hasPending() const { + return !context_.unique(); +} + +template +inline bool TaskIterator::hasNext() const { + return hasPending() || hasCompleted(); +} + +template +folly::Try TaskIterator::awaitNextResult() { + assert(hasCompleted() || hasPending()); + reserve(1); + + size_t i = context_->tasksConsumed++; + id_ = context_->results[i].first; + return std::move(context_->results[i].second); +} + +template +inline T TaskIterator::awaitNext() { + return std::move(awaitNextResult().value()); +} + +template <> +inline void TaskIterator::awaitNext() { + awaitNextResult().value(); +} + +template +inline void TaskIterator::reserve(size_t n) { + size_t tasksReady = context_->results.size() - context_->tasksConsumed; + + // we don't need to do anything if there are already n or more tasks complete + // or if we have no tasks left to execute. + if (!hasPending() || tasksReady >= n) { + return; + } + + n -= tasksReady; + size_t tasksLeft = context_->totalTasks - context_->results.size(); + n = std::min(n, tasksLeft); + + await( + [this, n](Promise promise) { + context_->tasksToFulfillPromise = n; + context_->promise.assign(std::move(promise)); + }); +} + +template +inline size_t TaskIterator::getTaskID() const { + assert(id_ != -1); + return id_; +} + +template +TaskIterator::value_type()>::type> +addTasks(InputIterator first, InputIterator last) { + typedef typename std::result_of< + typename std::iterator_traits::value_type()>::type + ResultType; + typedef TaskIterator IteratorType; + + auto context = std::make_shared(); + context->totalTasks = std::distance(first, last); + context->results.reserve(context->totalTasks); + + for (size_t i = 0; first != last; ++i, ++first) { +#ifdef __clang__ +#pragma clang diagnostic push // ignore generalized lambda capture warning +#pragma clang diagnostic ignored "-Wc++1y-extensions" +#endif + addTask( + [i, context, f = std::move(*first)]() { + context->results.emplace_back(i, folly::makeTryFunction(std::move(f))); + + // Check for awaiting iterator. + if (context->promise.hasValue()) { + if (--context->tasksToFulfillPromise == 0) { + context->promise->setValue(); + context->promise.clear(); + } + } + } + ); +#ifdef __clang__ +#pragma clang diagnostic pop +#endif + } + + return IteratorType(std::move(context)); +} + +}} diff --git a/folly/experimental/fibers/AddTasks.h b/folly/experimental/fibers/AddTasks.h new file mode 100644 index 00000000..f5a10a39 --- /dev/null +++ b/folly/experimental/fibers/AddTasks.h @@ -0,0 +1,125 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +#include +#include +#include + +namespace folly { namespace fibers { + +template +class TaskIterator; + +/** + * Schedules several tasks and immediately returns an iterator, that + * allow to traverse tasks in the order of their completion. All results and + * exptions thrown are stored alongside with the task id and are + * accessible via iterator. + * + * @param first Range of tasks to be scheduled + * @param last + * + * @return movable, non-copyable iterator + */ +template +TaskIterator< + typename std::result_of< + typename std::iterator_traits::value_type()>::type> +inline addTasks(InputIterator first, InputIterator last); + +template +class TaskIterator { + public: + typedef T value_type; + + // not copyable + TaskIterator(const TaskIterator& other) = delete; + TaskIterator& operator=(const TaskIterator& other) = delete; + + // movable + TaskIterator(TaskIterator&& other) noexcept; + TaskIterator& operator=(TaskIterator&& other) = delete; + + /** + * @return True if there are tasks immediately available to be consumed (no + * need to await on them). + */ + bool hasCompleted() const; + + /** + * @return True if there are tasks pending execution (need to awaited on). + */ + bool hasPending() const; + + /** + * @return True if there are any tasks (hasCompleted() || hasPending()). + */ + bool hasNext() const; + + /** + * Await for another task to complete. Will not await if the result is + * already available. + * + * @return result of the task completed. + * @throw exception thrown by the task. + */ + T awaitNext(); + + /** + * Await until the specified number of tasks completes or there are no + * tasks left to await for. + * Note: Will not await if there are already the specified number of tasks + * available. + * + * @param n Number of tasks to await for completition. + */ + void reserve(size_t n); + + /** + * @return id of the last task that was processed by awaitNext(). + */ + size_t getTaskID() const; + + private: + template + friend TaskIterator< + typename std::result_of< + typename std::iterator_traits::value_type()>::type> + addTasks(InputIterator first, InputIterator last); + + struct Context { + std::vector>> results; + folly::Optional> promise; + size_t totalTasks{0}; + size_t tasksConsumed{0}; + size_t tasksToFulfillPromise{0}; + }; + + std::shared_ptr context_; + size_t id_; + + explicit TaskIterator(std::shared_ptr context); + + folly::Try awaitNextResult(); +}; + +}} + +#include diff --git a/folly/experimental/fibers/Baton-inl.h b/folly/experimental/fibers/Baton-inl.h new file mode 100644 index 00000000..2cfca6d9 --- /dev/null +++ b/folly/experimental/fibers/Baton-inl.h @@ -0,0 +1,112 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include + +namespace folly { namespace fibers { + +inline Baton::Baton() : Baton(NO_WAITER) { + assert(Baton(NO_WAITER).futex_.futex == static_cast(NO_WAITER)); + assert(Baton(POSTED).futex_.futex == static_cast(POSTED)); + assert(Baton(TIMEOUT).futex_.futex == static_cast(TIMEOUT)); + assert(Baton(THREAD_WAITING).futex_.futex == + static_cast(THREAD_WAITING)); + + assert(futex_.futex.is_lock_free()); + assert(waitingFiber_.is_lock_free()); +} + +template +void Baton::wait(F&& mainContextFunc) { + auto fm = FiberManager::getFiberManagerUnsafe(); + if (!fm || !fm->activeFiber_) { + mainContextFunc(); + return waitThread(); + } + + return waitFiber(*fm, std::forward(mainContextFunc)); +} + +template +void Baton::waitFiber(FiberManager& fm, F&& mainContextFunc) { + auto& waitingFiber = waitingFiber_; + auto f = [&mainContextFunc, &waitingFiber](Fiber& fiber) mutable { + auto baton_fiber = waitingFiber.load(); + do { + if (LIKELY(baton_fiber == NO_WAITER)) { + continue; + } else if (baton_fiber == POSTED || baton_fiber == TIMEOUT) { + fiber.setData(0); + break; + } else { + throw std::logic_error("Some Fiber is already waiting on this Baton."); + } + } while(!waitingFiber.compare_exchange_weak( + baton_fiber, + reinterpret_cast(&fiber))); + + mainContextFunc(); + }; + + fm.awaitFunc_ = std::ref(f); + fm.activeFiber_->preempt(Fiber::AWAITING); +} + +template +bool Baton::timed_wait(TimeoutController::Duration timeout, + F&& mainContextFunc) { + auto fm = FiberManager::getFiberManagerUnsafe(); + + if (!fm || !fm->activeFiber_) { + mainContextFunc(); + return timedWaitThread(timeout); + } + + auto& baton = *this; + bool canceled = false; + auto timeoutFunc = [&baton, &canceled]() mutable { + baton.postHelper(TIMEOUT); + canceled = true; + }; + + auto id = fm->timeoutManager_->registerTimeout( + std::ref(timeoutFunc), timeout); + + waitFiber(*fm, std::move(mainContextFunc)); + + auto posted = waitingFiber_ == POSTED; + + if (!canceled) { + fm->timeoutManager_->cancel(id); + } + + return posted; +} + +template +bool Baton::timed_wait(const std::chrono::time_point& timeout) { + auto now = C::now(); + + if (LIKELY(now <= timeout)) { + return timed_wait( + std::chrono::duration_cast(timeout - now)); + } else { + return timed_wait(TimeoutController::Duration(0)); + } +} + + +}} diff --git a/folly/experimental/fibers/Baton.cpp b/folly/experimental/fibers/Baton.cpp new file mode 100644 index 00000000..a33f8560 --- /dev/null +++ b/folly/experimental/fibers/Baton.cpp @@ -0,0 +1,158 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "Baton.h" + +#include + +namespace folly { namespace fibers { + +void Baton::wait() { + wait([](){}); +} + +bool Baton::timed_wait(TimeoutController::Duration timeout) { + return timed_wait(timeout, [](){}); +} + +void Baton::waitThread() { + if (spinWaitForEarlyPost()) { + assert(waitingFiber_.load(std::memory_order_acquire) == POSTED); + return; + } + + auto fiber = waitingFiber_.load(); + + if (LIKELY(fiber == NO_WAITER && + waitingFiber_.compare_exchange_strong(fiber, THREAD_WAITING))) { + do { + folly::detail::MemoryIdler::futexWait(futex_.futex, THREAD_WAITING); + fiber = waitingFiber_.load(std::memory_order_relaxed); + } while (fiber == THREAD_WAITING); + } + + if (LIKELY(fiber == POSTED)) { + return; + } + + // Handle errors + if (fiber == TIMEOUT) { + throw std::logic_error("Thread baton can't have timeout status"); + } + if (fiber == THREAD_WAITING) { + throw std::logic_error("Other thread is already waiting on this baton"); + } + throw std::logic_error("Other fiber is already waiting on this baton"); +} + +bool Baton::spinWaitForEarlyPost() { + static_assert(PreBlockAttempts > 0, + "isn't this assert clearer than an uninitialized variable warning?"); + for (int i = 0; i < PreBlockAttempts; ++i) { + if (try_wait()) { + // hooray! + return true; + } +#if FOLLY_X64 + // The pause instruction is the polite way to spin, but it doesn't + // actually affect correctness to omit it if we don't have it. + // Pausing donates the full capabilities of the current core to + // its other hyperthreads for a dozen cycles or so + asm volatile ("pause"); +#endif + } + + return false; +} + +bool Baton::timedWaitThread(TimeoutController::Duration timeout) { + if (spinWaitForEarlyPost()) { + assert(waitingFiber_.load(std::memory_order_acquire) == POSTED); + return true; + } + + auto fiber = waitingFiber_.load(); + + if (LIKELY(fiber == NO_WAITER && + waitingFiber_.compare_exchange_strong(fiber, THREAD_WAITING))) { + auto deadline = TimeoutController::Clock::now() + timeout; + do { + const auto wait_rv = + futex_.futex.futexWaitUntil(THREAD_WAITING, deadline); + if (wait_rv == folly::detail::FutexResult::TIMEDOUT) { + return false; + } + fiber = waitingFiber_.load(std::memory_order_relaxed); + } while (fiber == THREAD_WAITING); + } + + if (LIKELY(fiber == POSTED)) { + return true; + } + + // Handle errors + if (fiber == TIMEOUT) { + throw std::logic_error("Thread baton can't have timeout status"); + } + if (fiber == THREAD_WAITING) { + throw std::logic_error("Other thread is already waiting on this baton"); + } + throw std::logic_error("Other fiber is already waiting on this baton"); +} + +void Baton::post() { + postHelper(POSTED); +} + +void Baton::postHelper(intptr_t new_value) { + auto fiber = waitingFiber_.load(); + + do { + if (fiber == THREAD_WAITING) { + assert(new_value == POSTED); + + return postThread(); + } + + if (fiber == POSTED || fiber == TIMEOUT) { + return; + } + } while (!waitingFiber_.compare_exchange_weak(fiber, new_value)); + + if (fiber != NO_WAITER) { + reinterpret_cast(fiber)->setData(0); + } +} + +bool Baton::try_wait() { + auto state = waitingFiber_.load(); + return state == POSTED; +} + +void Baton::postThread() { + auto expected = THREAD_WAITING; + + if (!waitingFiber_.compare_exchange_strong(expected, POSTED)) { + return; + } + + futex_.futex.futexWake(1); +} + +void Baton::reset() { + waitingFiber_.store(NO_WAITER, std::memory_order_relaxed);; +} + +}} diff --git a/folly/experimental/fibers/Baton.h b/folly/experimental/fibers/Baton.h new file mode 100644 index 00000000..49585d9e --- /dev/null +++ b/folly/experimental/fibers/Baton.h @@ -0,0 +1,155 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include +#include + +namespace folly { namespace fibers { + +class Fiber; + +/** + * @class Baton + * + * Primitive which allows to put current Fiber to sleep and wake it from another + * Fiber/thread. + */ +class Baton { + public: + Baton(); + + ~Baton() {} + + /** + * Puts active fiber to sleep. Returns when post is called. + */ + void wait(); + + /** + * Puts active fiber to sleep. Returns when post is called. + * + * @param mainContextFunc this function is immediately executed on the main + * context. + */ + template + void wait(F&& mainContextFunc); + + /** + * This is here only not break tao/locks. Please don't use it, because it is + * inefficient when used on Fibers. + */ + template + bool timed_wait(const std::chrono::time_point& timeout); + + /** + * Puts active fiber to sleep. Returns when post is called. + * + * @param timeout Baton will be automatically awaken if timeout is hit + * + * @return true if was posted, false if timeout expired + */ + bool timed_wait(TimeoutController::Duration timeout); + + /** + * Puts active fiber to sleep. Returns when post is called. + * + * @param timeout Baton will be automatically awaken if timeout is hit + * @param mainContextFunc this function is immediately executed on the main + * context. + * + * @return true if was posted, false if timeout expired + */ + template + bool timed_wait(TimeoutController::Duration timeout, F&& mainContextFunc); + + /** + * Checks if the baton has been posted without blocking. + * @return true iff the baton has been posted. + */ + bool try_wait(); + + /** + * Wakes up Fiber which was waiting on this Baton (or if no Fiber is waiting, + * next wait() call will return immediately). + */ + void post(); + + /** + * Reset's the baton (equivalent to destroying the object and constructing + * another one in place). + * Caller is responsible for making sure no one is waiting on/posting the + * baton when reset() is called. + */ + void reset(); + + private: + enum { + /** + * Must be positive. If multiple threads are actively using a + * higher-level data structure that uses batons internally, it is + * likely that the post() and wait() calls happen almost at the same + * time. In this state, we lose big 50% of the time if the wait goes + * to sleep immediately. On circa-2013 devbox hardware it costs about + * 7 usec to FUTEX_WAIT and then be awoken (half the t/iter as the + * posix_sem_pingpong test in BatonTests). We can improve our chances + * of early post by spinning for a bit, although we have to balance + * this against the loss if we end up sleeping any way. Spins on this + * hw take about 7 nanos (all but 0.5 nanos is the pause instruction). + * We give ourself 300 spins, which is about 2 usec of waiting. As a + * partial consolation, since we are using the pause instruction we + * are giving a speed boost to the colocated hyperthread. + */ + PreBlockAttempts = 300, + }; + + explicit Baton(intptr_t state) : waitingFiber_(state) {}; + + void postHelper(intptr_t new_value); + void postThread(); + void waitThread(); + + template + inline void waitFiber(FiberManager& fm, F&& mainContextFunc); + /** + * Spin for "some time" (see discussion on PreBlockAttempts) waiting + * for a post. + * @return true if we received a post the spin wait, false otherwise. If the + * function returns true then Baton state is guaranteed to be POSTED + */ + bool spinWaitForEarlyPost(); + + bool timedWaitThread(TimeoutController::Duration timeout); + + static constexpr intptr_t NO_WAITER = 0; + static constexpr intptr_t POSTED = -1; + static constexpr intptr_t TIMEOUT = -2; + static constexpr intptr_t THREAD_WAITING = -3; + + union { + std::atomic waitingFiber_; + struct { + folly::detail::Futex<> futex; + int32_t _unused_packing; + } futex_; + }; +}; + +}} + +#include diff --git a/folly/experimental/fibers/BoostContextCompatibility.h b/folly/experimental/fibers/BoostContextCompatibility.h new file mode 100644 index 00000000..2b04b089 --- /dev/null +++ b/folly/experimental/fibers/BoostContextCompatibility.h @@ -0,0 +1,109 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +/** + * Wrappers for different versions of boost::context library + * API reference for different versions + * Boost 1.51: http://www.boost.org/doc/libs/1_51_0/libs/context/doc/html/context/context/boost_fcontext.html + * Boost 1.52: http://www.boost.org/doc/libs/1_52_0/libs/context/doc/html/context/context/boost_fcontext.html + * Boost 1.56: http://www.boost.org/doc/libs/1_56_0/libs/context/doc/html/context/context/boost_fcontext.html + */ + +namespace folly { namespace fibers { + +struct FContext { + public: + +#if BOOST_VERSION >= 105200 + using ContextStruct = boost::context::fcontext_t; +#else + using ContextStruct = boost::ctx::fcontext_t; +#endif + + void* stackLimit() const { + return stackLimit_; + } + + void* stackBase() const { + return stackBase_; + } + + private: + void* stackLimit_; + void* stackBase_; + +#if BOOST_VERSION >= 105600 + ContextStruct context_; +#elif BOOST_VERSION >= 105200 + ContextStruct* context_; +#else + ContextStruct context_; +#endif + + friend intptr_t jumpContext(FContext* oldC, FContext::ContextStruct* newC, + intptr_t p); + friend intptr_t jumpContext(FContext::ContextStruct* oldC, FContext* newC, + intptr_t p); + friend FContext makeContext(void* stackLimit, size_t stackSize, + void(*fn)(intptr_t)); +}; + +inline intptr_t jumpContext(FContext* oldC, FContext::ContextStruct* newC, + intptr_t p) { + +#if BOOST_VERSION >= 105600 + return boost::context::jump_fcontext(&oldC->context_, *newC, p); +#elif BOOST_VERSION >= 105200 + return boost::context::jump_fcontext(oldC->context_, newC, p); +#else + return jump_fcontext(&oldC->context_, newC, p); +#endif + +} + +inline intptr_t jumpContext(FContext::ContextStruct* oldC, FContext* newC, + intptr_t p) { + +#if BOOST_VERSION >= 105200 + return boost::context::jump_fcontext(oldC, newC->context_, p); +#else + return jump_fcontext(oldC, &newC->context_, p); +#endif + +} + +inline FContext makeContext(void* stackLimit, size_t stackSize, + void(*fn)(intptr_t)) { + FContext res; + res.stackLimit_ = stackLimit; + res.stackBase_ = static_cast(stackLimit) + stackSize; + +#if BOOST_VERSION >= 105200 + res.context_ = boost::context::make_fcontext(res.stackBase_, stackSize, fn); +#else + res.context_.fc_stack.limit = stackLimit; + res.context_.fc_stack.base = res.stackBase_; + make_fcontext(&res.context_, fn); +#endif + + return res; +} + +}} // folly::fibers diff --git a/folly/experimental/fibers/EventBaseLoopController-inl.h b/folly/experimental/fibers/EventBaseLoopController-inl.h new file mode 100644 index 00000000..a38d3637 --- /dev/null +++ b/folly/experimental/fibers/EventBaseLoopController-inl.h @@ -0,0 +1,105 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include + +namespace folly { namespace fibers { + +class EventBaseLoopController::ControllerCallback : + public folly::EventBase::LoopCallback { + public: + explicit ControllerCallback(EventBaseLoopController& controller) + : controller_(controller) {} + + void runLoopCallback() noexcept override { + controller_.runLoop(); + } + private: + EventBaseLoopController& controller_; +}; + +inline EventBaseLoopController::EventBaseLoopController() + : callback_(folly::make_unique(*this)) { +} + +inline EventBaseLoopController::~EventBaseLoopController() { + callback_->cancelLoopCallback(); +} + +inline void EventBaseLoopController::attachEventBase( + folly::EventBase& eventBase) { + + if (eventBase_ != nullptr) { + LOG(ERROR) << "Attempt to reattach EventBase to LoopController"; + } + + eventBase_ = &eventBase; + + eventBaseAttached_ = true; + + if (awaitingScheduling_) { + schedule(); + } +} + +inline void EventBaseLoopController::setFiberManager(FiberManager* fm) { + fm_ = fm; +} + +inline void EventBaseLoopController::schedule() { + if (eventBase_ == nullptr) { + // In this case we need to postpone scheduling. + awaitingScheduling_ = true; + } else { + // Schedule it to run in current iteration. + eventBase_->runInLoop(callback_.get(), true); + awaitingScheduling_ = false; + } +} + +inline void EventBaseLoopController::cancel() { + callback_->cancelLoopCallback(); +} + +inline void EventBaseLoopController::runLoop() { + fm_->loopUntilNoReady(); +} + +inline void EventBaseLoopController::scheduleThreadSafe() { + /* The only way we could end up here is if + 1) Fiber thread creates a fiber that awaits (which means we must + have already attached, fiber thread wouldn't be running). + 2) We move the promise to another thread (this move is a memory fence) + 3) We fulfill the promise from the other thread. */ + assert(eventBaseAttached_); + eventBase_->runInEventBaseThread([this] () { runLoop(); }); +} + +inline void EventBaseLoopController::timedSchedule(std::function func, + TimePoint time) { + assert(eventBaseAttached_); + + // We want upper bound for the cast, thus we just add 1 + auto delay_ms = std::chrono::duration_cast< + std::chrono::milliseconds>(time - Clock::now()).count() + 1; + // If clock is not monotonic + delay_ms = std::max(delay_ms, 0L); + eventBase_->tryRunAfterDelay(func, delay_ms); +} + +}} // folly::fibers diff --git a/folly/experimental/fibers/EventBaseLoopController.h b/folly/experimental/fibers/EventBaseLoopController.h new file mode 100644 index 00000000..0a8f83ec --- /dev/null +++ b/folly/experimental/fibers/EventBaseLoopController.h @@ -0,0 +1,66 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +namespace folly { +class EventBase; +} + +namespace folly { namespace fibers { + +class FiberManager; + +class EventBaseLoopController : public LoopController { + public: + explicit EventBaseLoopController(); + ~EventBaseLoopController(); + + /** + * Attach EventBase after LoopController was created. + */ + void attachEventBase(folly::EventBase& eventBase); + + folly::EventBase* getEventBase() { + return eventBase_; + } + + private: + class ControllerCallback; + + bool awaitingScheduling_{false}; + folly::EventBase* eventBase_{nullptr}; + std::unique_ptr callback_; + FiberManager* fm_{nullptr}; + std::atomic eventBaseAttached_{false}; + + /* LoopController interface */ + + void setFiberManager(FiberManager* fm) override; + void schedule() override; + void cancel() override; + void runLoop(); + void scheduleThreadSafe() override; + void timedSchedule(std::function func, TimePoint time) override; + + friend class FiberManager; +}; + +}} // folly::fibers + +#include "EventBaseLoopController-inl.h" diff --git a/folly/experimental/fibers/Fiber-inl.h b/folly/experimental/fibers/Fiber-inl.h new file mode 100644 index 00000000..adb78a49 --- /dev/null +++ b/folly/experimental/fibers/Fiber-inl.h @@ -0,0 +1,48 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +namespace folly { namespace fibers { + +template +void Fiber::setFunction(F&& func) { + assert(state_ == INVALID); + func_ = std::move(func); + state_ = NOT_STARTED; +} + +template +void Fiber::setFunctionFinally(F&& resultFunc, + G&& finallyFunc) { + assert(state_ == INVALID); + resultFunc_ = std::move(resultFunc); + finallyFunc_ = std::move(finallyFunc); + state_ = NOT_STARTED; +} + +inline void* Fiber::getUserBuffer() { + return &userBuffer_; +} + +template +void Fiber::setReadyFunction(G&& func) { + assert(state_ == INVALID || state_ == NOT_STARTED); + readyFunc_ = std::move(func); +} + +}} // folly::fibers diff --git a/folly/experimental/fibers/Fiber.cpp b/folly/experimental/fibers/Fiber.cpp new file mode 100644 index 00000000..d738f67f --- /dev/null +++ b/folly/experimental/fibers/Fiber.cpp @@ -0,0 +1,179 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "Fiber.h" + +#include +#include + +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace folly { namespace fibers { + +namespace { +static const uint64_t kMagic8Bytes = 0xfaceb00cfaceb00c; + +pid_t localThreadId() { + static thread_local pid_t threadId = syscall(SYS_gettid); + return threadId; +} + +static void fillMagic(const FContext& context) { + uint64_t* begin = static_cast(context.stackLimit()); + uint64_t* end = static_cast(context.stackBase()); + + std::fill(begin, end, kMagic8Bytes); +} + +/* Size of the region from p + nBytes down to the last non-magic value */ +static size_t nonMagicInBytes(const FContext& context) { + uint64_t* begin = static_cast(context.stackLimit()); + uint64_t* end = static_cast(context.stackBase()); + + auto firstNonMagic = std::find_if( + begin, end, + [](uint64_t val) { + return val != kMagic8Bytes; + } + ); + + return (end - firstNonMagic) * sizeof(uint64_t); +} + +} // anonymous namespace + +void Fiber::setData(intptr_t data) { + assert(state_ == AWAITING); + data_ = data; + state_ = READY_TO_RUN; + + if (LIKELY(threadId_ == localThreadId())) { + fiberManager_.readyFibers_.push_back(*this); + fiberManager_.ensureLoopScheduled(); + } else { + fiberManager_.remoteReadyInsert(this); + } +} + +Fiber::Fiber(FiberManager& fiberManager) : + fiberManager_(fiberManager) { + + auto size = fiberManager_.options_.stackSize; + auto limit = fiberManager_.stackAllocator_.allocate(size); + + fcontext_ = makeContext(limit, size, &Fiber::fiberFuncHelper); + + if (UNLIKELY(fiberManager_.options_.debugRecordStackUsed)) { + fillMagic(fcontext_); + } +} + +Fiber::~Fiber() { + fiberManager_.stackAllocator_.deallocate( + static_cast(fcontext_.stackLimit()), + fiberManager_.options_.stackSize); +} + +void Fiber::recordStackPosition() { + int stackDummy; + fiberManager_.stackHighWatermark_ = + std::max(fiberManager_.stackHighWatermark_, + static_cast( + static_cast(fcontext_.stackBase()) - + static_cast( + static_cast(&stackDummy)))); +} + +void Fiber::fiberFuncHelper(intptr_t fiber) { + reinterpret_cast(fiber)->fiberFunc(); +} + +/* + * Some weird bug in ASAN causes fiberFunc to allocate boundless amounts of + * memory inside __asan_handle_no_return. Work around this in ASAN builds by + * tricking the compiler into thinking it may, someday, return. + */ +#ifdef FOLLY_SANITIZE_ADDRESS +volatile bool loopForever = true; +#else +static constexpr bool loopForever = true; +#endif + +void Fiber::fiberFunc() { + while (loopForever) { + assert(state_ == NOT_STARTED); + + threadId_ = localThreadId(); + state_ = RUNNING; + + try { + if (resultFunc_) { + assert(finallyFunc_); + assert(!func_); + + resultFunc_(); + } else { + assert(func_); + func_(); + } + } catch (...) { + fiberManager_.exceptionCallback_(std::current_exception(), + "running Fiber func_/resultFunc_"); + } + + if (UNLIKELY(fiberManager_.options_.debugRecordStackUsed)) { + fiberManager_.stackHighWatermark_ = + std::max(fiberManager_.stackHighWatermark_, + nonMagicInBytes(fcontext_)); + } + + state_ = INVALID; + + fiberManager_.activeFiber_ = nullptr; + + auto fiber = reinterpret_cast( + jumpContext(&fcontext_, &fiberManager_.mainContext_, 0)); + assert(fiber == this); + } +} + +intptr_t Fiber::preempt(State state) { + assert(fiberManager_.activeFiber_ == this); + assert(state_ == RUNNING); + assert(state != RUNNING); + + fiberManager_.activeFiber_ = nullptr; + state_ = state; + + recordStackPosition(); + + auto ret = jumpContext(&fcontext_, &fiberManager_.mainContext_, 0); + + assert(fiberManager_.activeFiber_ == this); + assert(state_ == READY_TO_RUN); + state_ = RUNNING; + + return ret; +} + +}} diff --git a/folly/experimental/fibers/Fiber.h b/folly/experimental/fibers/Fiber.h new file mode 100644 index 00000000..d18bb782 --- /dev/null +++ b/folly/experimental/fibers/Fiber.h @@ -0,0 +1,126 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include +#include +#include +#include +#include + +namespace folly { namespace fibers { + +class Baton; +class FiberManager; + +/** + * @class Fiber + * @brief Fiber object used by FiberManager to execute tasks. + * + * Each Fiber object can be executing at most one task at a time. In active + * phase it is running the task function and keeps its context. + * Fiber is also used to pass data to blocked task and thus unblock it. + * Each Fiber may be associated with a single FiberManager. + */ +class Fiber { + public: + /** + * Sets data for the blocked task + * + * @param data this data will be returned by await() when task is resumed. + */ + void setData(intptr_t data); + + Fiber(const Fiber&) = delete; + Fiber& operator=(const Fiber&) = delete; + + ~Fiber(); + private: + enum State { + INVALID, /**< Does't have task function */ + NOT_STARTED, /**< Has task function, not started */ + READY_TO_RUN, /**< Was started, blocked, then unblocked */ + RUNNING, /**< Is running right now */ + AWAITING, /**< Is currently blocked */ + AWAITING_IMMEDIATE, /**< Was preempted to run an immediate function, + and will be resumed right away */ + }; + + State state_{INVALID}; /**< current Fiber state */ + + friend class Baton; + friend class FiberManager; + + explicit Fiber(FiberManager& fiberManager); + + template + void setFunction(F&& func); + + template + void setFunctionFinally(F&& func, G&& finally); + + template + void setReadyFunction(G&& func); + + static void fiberFuncHelper(intptr_t fiber); + void fiberFunc(); + + /** + * Switch out of fiber context into the main context, + * performing necessary housekeeping for the new state. + * + * @param state New state, must not be RUNNING. + * + * @return The value passed back from the main context. + */ + intptr_t preempt(State state); + + /** + * Examines how much of the stack we used at this moment and + * registers with the FiberManager (for monitoring). + */ + void recordStackPosition(); + + FiberManager& fiberManager_; /**< Associated FiberManager */ + FContext fcontext_; /**< current task execution context */ + intptr_t data_; /**< Used to keep some data with the Fiber */ + std::function func_; /**< task function */ + std::function readyFunc_; /**< function to be executed before jumping + to this fiber */ + + /** + * Points to next fiber in remote ready list + */ + folly::AtomicLinkedListHook nextRemoteReady_; + + static constexpr size_t kUserBufferSize = 256; + std::aligned_storage::type userBuffer_; + + void* getUserBuffer(); + + std::function resultFunc_; + std::function finallyFunc_; + + folly::IntrusiveListHook listHook_; /**< list hook for different FiberManager + queues */ + pid_t threadId_{0}; +}; + +}} + +#include diff --git a/folly/experimental/fibers/FiberManager-inl.h b/folly/experimental/fibers/FiberManager-inl.h new file mode 100644 index 00000000..9bf6eb6b --- /dev/null +++ b/folly/experimental/fibers/FiberManager-inl.h @@ -0,0 +1,393 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace folly { namespace fibers { + +inline void FiberManager::ensureLoopScheduled() { + if (isLoopScheduled_) { + return; + } + + isLoopScheduled_ = true; + loopController_->schedule(); +} + +inline void FiberManager::runReadyFiber(Fiber* fiber) { + assert(fiber->state_ == Fiber::NOT_STARTED || + fiber->state_ == Fiber::READY_TO_RUN); + + while (fiber->state_ == Fiber::NOT_STARTED || + fiber->state_ == Fiber::READY_TO_RUN) { + activeFiber_ = fiber; + if (fiber->readyFunc_) { + fiber->readyFunc_(); + } + jumpContext(&mainContext_, &fiber->fcontext_, fiber->data_); + if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) { + try { + immediateFunc_(); + } catch (...) { + exceptionCallback_(std::current_exception(), "running immediateFunc_"); + } + immediateFunc_ = nullptr; + fiber->state_ = Fiber::READY_TO_RUN; + } + } + + if (fiber->state_ == Fiber::AWAITING) { + awaitFunc_(*fiber); + awaitFunc_ = nullptr; + } else if (fiber->state_ == Fiber::INVALID) { + assert(fibersActive_ > 0); + --fibersActive_; + // Making sure that task functor is deleted once task is complete. + // NOTE: we must do it on main context, as the fiber is not + // running at this point. + fiber->func_ = nullptr; + fiber->resultFunc_ = nullptr; + if (fiber->finallyFunc_) { + try { + fiber->finallyFunc_(); + } catch (...) { + exceptionCallback_(std::current_exception(), "running finallyFunc_"); + } + fiber->finallyFunc_ = nullptr; + } + + if (fibersPoolSize_ < options_.maxFibersPoolSize) { + fibersPool_.push_front(*fiber); + ++fibersPoolSize_; + } else { + delete fiber; + assert(fibersAllocated_ > 0); + --fibersAllocated_; + } + } +} + +inline bool FiberManager::loopUntilNoReady() { + SCOPE_EXIT { + isLoopScheduled_ = false; + currentFiberManager_ = nullptr; + }; + + currentFiberManager_ = this; + + bool hadRemoteFiber = true; + while (hadRemoteFiber) { + hadRemoteFiber = false; + + while (!readyFibers_.empty()) { + auto& fiber = readyFibers_.front(); + readyFibers_.pop_front(); + runReadyFiber(&fiber); + } + + remoteReadyQueue_.sweep( + [this, &hadRemoteFiber] (Fiber* fiber) { + runReadyFiber(fiber); + hadRemoteFiber = true; + } + ); + + remoteTaskQueue_.sweep( + [this, &hadRemoteFiber] (RemoteTask* taskPtr) { + std::unique_ptr task(taskPtr); + auto fiber = getFiber(); + fiber->setFunction(std::move(task->func)); + fiber->data_ = reinterpret_cast(fiber); + runReadyFiber(fiber); + hadRemoteFiber = true; + } + ); + } + + return fibersActive_ > 0; +} + +// We need this to be in a struct, not inlined in addTask, because clang crashes +// otherwise. +template +struct FiberManager::AddTaskHelper { + class Func; + + static constexpr bool allocateInBuffer = + sizeof(Func) <= Fiber::kUserBufferSize; + + class Func { + public: + Func(F&& func, FiberManager& fm) : + func_(std::forward(func)), fm_(fm) {} + + void operator()() { + try { + func_(); + } catch (...) { + fm_.exceptionCallback_(std::current_exception(), + "running Func functor"); + } + if (allocateInBuffer) { + this->~Func(); + } else { + delete this; + } + } + + private: + F func_; + FiberManager& fm_; + }; +}; + +template +void FiberManager::addTask(F&& func) { + typedef AddTaskHelper Helper; + + auto fiber = getFiber(); + + if (Helper::allocateInBuffer) { + auto funcLoc = static_cast(fiber->getUserBuffer()); + new (funcLoc) typename Helper::Func(std::forward(func), *this); + + fiber->setFunction(std::ref(*funcLoc)); + } else { + auto funcLoc = new typename Helper::Func(std::forward(func), *this); + + fiber->setFunction(std::ref(*funcLoc)); + } + + fiber->data_ = reinterpret_cast(fiber); + readyFibers_.push_back(*fiber); + + ensureLoopScheduled(); +} + +template +void FiberManager::addTaskReadyFunc(F&& func, G&& readyFunc) { + auto fiber = getFiber(); + fiber->setFunction(std::forward(func)); + fiber->setReadyFunction(std::forward(readyFunc)); + + fiber->data_ = reinterpret_cast(fiber); + readyFibers_.push_back(*fiber); + + ensureLoopScheduled(); +} + +template +void FiberManager::addTaskRemote(F&& func) { + auto task = folly::make_unique(std::move(func)); + if (remoteTaskQueue_.insertHead(task.release())) { + loopController_->scheduleThreadSafe(); + } +} + +template +struct IsRvalueRefTry { static const bool value = false; }; +template +struct IsRvalueRefTry&&> { static const bool value = true; }; + +// We need this to be in a struct, not inlined in addTaskFinally, because clang +// crashes otherwise. +template +struct FiberManager::AddTaskFinallyHelper { + class Func; + class Finally; + + typedef typename std::result_of::type Result; + + static constexpr bool allocateInBuffer = + sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize; + + class Finally { + public: + Finally(G&& finally, + FiberManager& fm) : + finally_(std::move(finally)), + fm_(fm) { + } + + void operator()() { + try { + finally_(std::move(*result_)); + } catch (...) { + fm_.exceptionCallback_(std::current_exception(), + "running Finally functor"); + } + + if (allocateInBuffer) { + this->~Finally(); + } else { + delete this; + } + } + + private: + friend class Func; + + G finally_; + folly::Optional> result_; + FiberManager& fm_; + }; + + class Func { + public: + Func(F&& func, Finally& finally) : + func_(std::move(func)), result_(finally.result_) {} + + void operator()() { + result_ = folly::makeTryFunction(std::move(func_)); + + if (allocateInBuffer) { + this->~Func(); + } else { + delete this; + } + } + + private: + F func_; + folly::Optional>& result_; + }; +}; + +template +void FiberManager::addTaskFinally(F&& func, G&& finally) { + typedef typename std::result_of::type Result; + + static_assert( + IsRvalueRefTry::type>::value, + "finally(arg): arg must be Try&&"); + static_assert( + std::is_convertible< + Result, + typename std::remove_reference< + typename FirstArgOf::type + >::type::element_type + >::value, + "finally(Try&&): T must be convertible from func()'s return type"); + + auto fiber = getFiber(); + + typedef AddTaskFinallyHelper Helper; + + if (Helper::allocateInBuffer) { + auto funcLoc = static_cast( + fiber->getUserBuffer()); + auto finallyLoc = static_cast( + static_cast(funcLoc + 1)); + + new (finallyLoc) typename Helper::Finally(std::move(finally), *this); + new (funcLoc) typename Helper::Func(std::move(func), *finallyLoc); + + fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc)); + } else { + auto finallyLoc = new typename Helper::Finally(std::move(finally), *this); + auto funcLoc = new typename Helper::Func(std::move(func), *finallyLoc); + + fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc)); + } + + fiber->data_ = reinterpret_cast(fiber); + readyFibers_.push_back(*fiber); + + ensureLoopScheduled(); +} + +template +typename std::result_of::type +FiberManager::runInMainContext(F&& func) { + return runInMainContextHelper(std::forward(func)); +} + +template +inline typename std::enable_if< + !std::is_same::type, void>::value, + typename std::result_of::type>::type +FiberManager::runInMainContextHelper(F&& func) { + if (UNLIKELY(activeFiber_ == nullptr)) { + return func(); + } + + typedef typename std::result_of::type Result; + + folly::Try result; + auto f = [&func, &result]() mutable { + result = folly::makeTryFunction(std::forward(func)); + }; + + immediateFunc_ = std::ref(f); + activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE); + + return std::move(result.value()); +} + +template +inline typename std::enable_if< + std::is_same::type, void>::value, + void>::type +FiberManager::runInMainContextHelper(F&& func) { + if (UNLIKELY(activeFiber_ == nullptr)) { + func(); + return; + } + + immediateFunc_ = std::ref(func); + activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE); +} + +inline FiberManager& FiberManager::getFiberManager() { + assert(currentFiberManager_ != nullptr); + return *currentFiberManager_; +} + +inline FiberManager* FiberManager::getFiberManagerUnsafe() { + return currentFiberManager_; +} + +inline bool FiberManager::hasActiveFiber() { + return activeFiber_ != nullptr; +} + +template +typename FirstArgOf::type::value_type +inline await(F&& func) { + typedef typename FirstArgOf::type::value_type Result; + + folly::Try result; + + Baton baton; + baton.wait([&func, &result, &baton]() mutable { + func(Promise(result, baton)); + }); + + return folly::moveFromTry(std::move(result)); +} + +}} diff --git a/folly/experimental/fibers/FiberManager.cpp b/folly/experimental/fibers/FiberManager.cpp new file mode 100644 index 00000000..41daf681 --- /dev/null +++ b/folly/experimental/fibers/FiberManager.cpp @@ -0,0 +1,124 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "FiberManager.h" + +#include +#include + +#include +#include + +#include + +#include +#include + +namespace folly { namespace fibers { + +__thread FiberManager* FiberManager::currentFiberManager_ = nullptr; + +FiberManager::FiberManager(std::unique_ptr loopController, + Options options) : + loopController_(std::move(loopController)), + options_(options), + exceptionCallback_([](std::exception_ptr e, std::string context) { + try { + std::rethrow_exception(e); + } catch (const std::exception& e) { + LOG(DFATAL) << "Exception " << typeid(e).name() + << " with message '" << e.what() << "' was thrown in " + << "FiberManager with context '" << context << "'"; + throw; + } catch (...) { + LOG(DFATAL) << "Unknown exception was thrown in FiberManager with " + << "context '" << context << "'"; + throw; + } + }), + timeoutManager_(std::make_shared(*loopController_)) { + loopController_->setFiberManager(this); +} + +FiberManager::~FiberManager() { + if (isLoopScheduled_) { + loopController_->cancel(); + } + + Fiber* fiberIt; + Fiber* fiberItNext; + while (!fibersPool_.empty()) { + fibersPool_.pop_front_and_dispose([] (Fiber* fiber) { + delete fiber; + }); + } + assert(readyFibers_.empty()); + assert(fibersActive_ == 0); +} + +LoopController& FiberManager::loopController() { + return *loopController_; +} + +const LoopController& FiberManager::loopController() const { + return *loopController_; +} + +bool FiberManager::hasTasks() const { + return fibersActive_ > 0 || + !remoteReadyQueue_.empty() || + !remoteTaskQueue_.empty(); +} + +Fiber* FiberManager::getFiber() { + Fiber* fiber = nullptr; + if (fibersPool_.empty()) { + fiber = new Fiber(*this); + ++fibersAllocated_; + } else { + fiber = &fibersPool_.front(); + fibersPool_.pop_front(); + assert(fibersPoolSize_ > 0); + --fibersPoolSize_; + } + ++fibersActive_; + assert(fiber); + return fiber; +} + +void FiberManager::setExceptionCallback(FiberManager::ExceptionCallback ec) { + assert(ec); + exceptionCallback_ = std::move(ec); +} + +size_t FiberManager::fibersAllocated() const { + return fibersAllocated_; +} + +size_t FiberManager::fibersPoolSize() const { + return fibersPoolSize_; +} + +size_t FiberManager::stackHighWatermark() const { + return stackHighWatermark_; +} + +void FiberManager::remoteReadyInsert(Fiber* fiber) { + if (remoteReadyQueue_.insertHead(fiber)) { + loopController_->scheduleThreadSafe(); + } +} + +}} diff --git a/folly/experimental/fibers/FiberManager.h b/folly/experimental/fibers/FiberManager.h new file mode 100644 index 00000000..aba56173 --- /dev/null +++ b/folly/experimental/fibers/FiberManager.h @@ -0,0 +1,379 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include +#include + +#ifdef USE_GUARD_ALLOCATOR +#include +#endif + +namespace folly { namespace fibers { + +class Baton; +class Fiber; +class LoopController; +class TimeoutController; + +/** + * @class FiberManager + * @brief Single-threaded task execution engine. + * + * FiberManager allows semi-parallel task execution on the same thread. Each + * task can notify FiberManager that it is blocked on something (via await()) + * call. This will pause execution of this task and it will be resumed only + * when it is unblocked (via setData()). + */ +class FiberManager { + public: + struct Options { +#ifdef FOLLY_SANITIZE_ADDRESS + /* ASAN needs a lot of extra stack space. + 16x is a conservative estimate, 8x also worked with tests + where it mattered. Note that overallocating here does not necessarily + increase RSS, since unused memory is pretty much free. */ + static constexpr size_t kDefaultStackSize{16 * 16 * 1024}; +#else + static constexpr size_t kDefaultStackSize{16 * 1024}; +#endif + /** + * Maximum stack size for fibers which will be used for executing all the + * tasks. + */ + size_t stackSize{kDefaultStackSize}; + + /** + * Record exact amount of stack used. + * + * This is fairly expensive: we fill each newly allocated stack + * with some known value and find the boundary of unused stack + * with linear search every time we surrender the stack back to fibersPool. + */ + bool debugRecordStackUsed{false}; + + /** + * Keep at most this many free fibers in the pool. + * This way the total number of fibers in the system is always bounded + * by the number of active fibers + maxFibersPoolSize. + */ + size_t maxFibersPoolSize{1000}; + + constexpr Options() {} + }; + + typedef std::function + ExceptionCallback; + + /** + * Initializes, but doesn't start FiberManager loop + * + * @param options FiberManager options + */ + explicit FiberManager(std::unique_ptr loopController, + Options options = Options()); + + ~FiberManager(); + + /** + * Controller access. + */ + LoopController& loopController(); + const LoopController& loopController() const; + + /** + * Keeps running ready tasks until the list of ready tasks is empty. + * + * @return True if there are any waiting tasks remaining. + */ + bool loopUntilNoReady(); + + /** + * @return true if there are outstanding tasks. + */ + bool hasTasks() const; + + /** + * Sets exception callback which will be called if any of the tasks throws an + * exception. + * + * @param ec + */ + void setExceptionCallback(ExceptionCallback ec); + + /** + * Add a new task to be executed. Must be called from FiberManager's thread. + * + * @param func Task functor; must have a signature of `void func()`. + * The object will be destroyed once task execution is complete. + */ + template + void addTask(F&& func); + + /** + * Add a new task to be executed, along with a function readyFunc_ which needs + * to be executed just before jumping to the ready fiber + * + * @param func Task functor; must have a signature of `T func()` for some T. + * @param readyFunc functor that needs to be executed just before jumping to + * ready fiber on the main context. This can for example be + * used to set up state before starting or resuming a fiber. + */ + template + void addTaskReadyFunc(F&& func, G&& readyFunc); + + /** + * Add a new task to be executed. Safe to call from other threads. + * + * @param func Task function; must have a signature of `void func()`. + * The object will be destroyed once task execution is complete. + */ + template + void addTaskRemote(F&& func); + + /** + * Add a new task. When the task is complete, execute finally(Try&&) + * on the main context. + * + * @param func Task functor; must have a signature of `T func()` for some T. + * @param finally Finally functor; must have a signature of + * `void finally(Try&&)` and will be passed + * the result of func() (including the exception if occurred). + */ + template + void addTaskFinally(F&& func, G&& finally); + + /** + * If called from a fiber, immediately switches to the FiberManager's context + * and runs func(), going back to the Fiber's context after completion. + * Outside a fiber, just calls func() directly. + * + * @return value returned by func(). + */ + template + typename std::result_of::type + runInMainContext(F&& func); + + /** + * @return How many fiber objects (and stacks) has this manager allocated. + */ + size_t fibersAllocated() const; + + /** + * @return How many of the allocated fiber objects are currently + * in the free pool. + */ + size_t fibersPoolSize() const; + + /** + * return true if running activeFiber_ is not nullptr. + */ + bool hasActiveFiber(); + + /** + * @return What was the most observed fiber stack usage (in bytes). + */ + size_t stackHighWatermark() const; + + static FiberManager& getFiberManager(); + static FiberManager* getFiberManagerUnsafe(); + + private: + friend class Baton; + friend class Fiber; + template + struct AddTaskHelper; + template + struct AddTaskFinallyHelper; + + struct RemoteTask { + template + explicit RemoteTask(F&& f) : func(std::move(f)) {} + std::function func; + folly::AtomicLinkedListHook nextRemoteTask; + }; + + typedef folly::IntrusiveList FiberTailQueue; + + Fiber* activeFiber_{nullptr}; /**< active fiber, nullptr on main context */ + + FiberTailQueue readyFibers_; /**< queue of fibers ready to be executed */ + FiberTailQueue fibersPool_; /**< pool of unitialized Fiber objects */ + + size_t fibersAllocated_{0}; /**< total number of fibers allocated */ + size_t fibersPoolSize_{0}; /**< total number of fibers in the free pool */ + size_t fibersActive_{0}; /**< number of running or blocked fibers */ + + FContext::ContextStruct mainContext_; /**< stores loop function context */ + + std::unique_ptr loopController_; + bool isLoopScheduled_{false}; /**< was the ready loop scheduled to run? */ + + /** + * When we are inside FiberManager loop this points to FiberManager. Otherwise + * it's nullptr + */ + static __thread FiberManager* currentFiberManager_; + + /** + * runInMainContext implementation for non-void functions. + */ + template + typename std::enable_if< + !std::is_same::type, void>::value, + typename std::result_of::type>::type + runInMainContextHelper(F&& func); + + /** + * runInMainContext implementation for void functions + */ + template + typename std::enable_if< + std::is_same::type, void>::value, + void>::type + runInMainContextHelper(F&& func); + + /** + * Allocator used to allocate stack for Fibers in the pool. + * Allocates stack on the stack of the main context. + */ +#ifdef USE_GUARD_ALLOCATOR + /* This is too slow for production use; can be fixed + if we allocated all stack storage once upfront */ + GuardPageAllocator stackAllocator_; +#else + std::allocator stackAllocator_; +#endif + + const Options options_; /**< FiberManager options */ + + /** + * Largest observed individual Fiber stack usage in bytes. + */ + size_t stackHighWatermark_{0}; + + /** + * Schedules a loop with loopController (unless already scheduled before). + */ + void ensureLoopScheduled(); + + /** + * @return An initialized Fiber object from the pool + */ + Fiber* getFiber(); + + /** + * Function passed to the await call. + */ + std::function awaitFunc_; + + /** + * Function passed to the runInMainContext call. + */ + std::function immediateFunc_; + + ExceptionCallback exceptionCallback_; /**< task exception callback */ + + folly::AtomicLinkedList remoteReadyQueue_; + + folly::AtomicLinkedList + remoteTaskQueue_; + + std::shared_ptr timeoutManager_; + + void runReadyFiber(Fiber* fiber); + void remoteReadyInsert(Fiber* fiber); +}; + +/** + * @return true iff we are running in a fiber's context + */ +inline bool onFiber() { + auto fm = FiberManager::getFiberManagerUnsafe(); + return fm ? fm->hasActiveFiber() : false; +} + +/** + * Add a new task to be executed. + * + * @param func Task functor; must have a signature of `void func()`. + * The object will be destroyed once task execution is complete. + */ +template +inline void addTask(F&& func) { + return FiberManager::getFiberManager().addTask(std::forward(func)); +} + +/** + * Add a new task. When the task is complete, execute finally(Try&&) + * on the main context. + * Task functor is run and destroyed on the fiber context. + * Finally functor is run and destroyed on the main context. + * + * @param func Task functor; must have a signature of `T func()` for some T. + * @param finally Finally functor; must have a signature of + * `void finally(Try&&)` and will be passed + * the result of func() (including the exception if occurred). + */ +template +inline void addTaskFinally(F&& func, G&& finally) { + return FiberManager::getFiberManager().addTaskFinally( + std::forward(func), std::forward(finally)); +} + +/** + * Blocks task execution until given promise is fulfilled. + * + * Calls function passing in a Promise, which has to be fulfilled. + * + * @return data which was used to fulfill the promise. + */ +template +typename FirstArgOf::type::value_type +inline await(F&& func); + +/** + * If called from a fiber, immediately switches to the FiberManager's context + * and runs func(), going back to the Fiber's context after completion. + * Outside a fiber, just calls func() directly. + * + * @return value returned by func(). + */ +template +typename std::result_of::type +inline runInMainContext(F&& func) { + auto fm = FiberManager::getFiberManagerUnsafe(); + if (UNLIKELY(fm == nullptr)) { + return func(); + } + return fm->runInMainContext(std::forward(func)); +} + +}} + +#include "FiberManager-inl.h" diff --git a/folly/experimental/fibers/FiberManagerMap.cpp b/folly/experimental/fibers/FiberManagerMap.cpp new file mode 100644 index 00000000..0f3e441b --- /dev/null +++ b/folly/experimental/fibers/FiberManagerMap.cpp @@ -0,0 +1,61 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include + +#include +#include + +namespace folly { namespace fibers { + +namespace detail { + +thread_local std::unordered_map + localFiberManagerMap; +std::unordered_map> + fiberManagerMap; +std::mutex fiberManagerMapMutex; + +FiberManager* getFiberManagerThreadSafe(folly::EventBase& evb, + const FiberManager::Options& opts) { + std::lock_guard lg(fiberManagerMapMutex); + + auto it = fiberManagerMap.find(&evb); + if (LIKELY(it != fiberManagerMap.end())) { + return it->second.get(); + } + + auto loopController = folly::make_unique(); + loopController->attachEventBase(evb); + auto fiberManager = + folly::make_unique(std::move(loopController), opts); + auto result = fiberManagerMap.emplace(&evb, std::move(fiberManager)); + return result.first->second.get(); +} + +} // detail namespace + +FiberManager& getFiberManager(folly::EventBase& evb, + const FiberManager::Options& opts) { + auto it = detail::localFiberManagerMap.find(&evb); + if (LIKELY(it != detail::localFiberManagerMap.end())) { + return *(it->second); + } + + return *(detail::localFiberManagerMap[&evb] = + detail::getFiberManagerThreadSafe(evb, opts)); +} + +}} diff --git a/folly/experimental/fibers/FiberManagerMap.h b/folly/experimental/fibers/FiberManagerMap.h new file mode 100644 index 00000000..fced0dee --- /dev/null +++ b/folly/experimental/fibers/FiberManagerMap.h @@ -0,0 +1,27 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +namespace folly { namespace fibers { + +FiberManager& getFiberManager( + folly::EventBase& evb, + const FiberManager::Options& opts = FiberManager::Options()); + +}} diff --git a/folly/experimental/fibers/ForEach-inl.h b/folly/experimental/fibers/ForEach-inl.h new file mode 100644 index 00000000..ed5b72d6 --- /dev/null +++ b/folly/experimental/fibers/ForEach-inl.h @@ -0,0 +1,88 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include + +namespace folly { namespace fibers { + +namespace { + +template +typename std::enable_if< + !std::is_same::type, void>::value, void>::type +inline callFuncs(F&& f, G&& g, size_t id) { + g(id, f()); +} + +template +typename std::enable_if< + std::is_same::type, void>::value, void>::type +inline callFuncs(F&& f, G&& g, size_t id) { + f(); + g(id); +} + +} // anonymous namespace + +template +inline void forEach(InputIterator first, InputIterator last, F&& f) { + if (first == last) { + return; + } + + typedef typename std::iterator_traits::value_type FuncType; + + size_t tasksTodo = 1; + std::exception_ptr e; + Baton baton; + +#ifdef __clang__ +#pragma clang diagnostic push // ignore generalized lambda capture warning +#pragma clang diagnostic ignored "-Wc++1y-extensions" +#endif + auto taskFunc = + [&tasksTodo, &e, &f, &baton] (size_t id, FuncType&& func) { + return [id, &tasksTodo, &e, &f, &baton, + func_ = std::forward(func)]() mutable { + try { + callFuncs(std::forward(func_), f, id); + } catch (...) { + e = std::current_exception(); + } + if (--tasksTodo == 0) { + baton.post(); + } + }; + }; +#ifdef __clang__ +#pragma clang diagnostic pop +#endif + + auto firstTask = first; + ++first; + + for (size_t i = 1; first != last; ++i, ++first, ++tasksTodo) { + addTask(taskFunc(i, std::move(*first))); + } + + taskFunc(0, std::move(*firstTask))(); + baton.wait(); + + if (e != std::exception_ptr()) { + std::rethrow_exception(e); + } +} + +}} // folly::fibers diff --git a/folly/experimental/fibers/ForEach.h b/folly/experimental/fibers/ForEach.h new file mode 100644 index 00000000..3568aaaf --- /dev/null +++ b/folly/experimental/fibers/ForEach.h @@ -0,0 +1,43 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +namespace folly { namespace fibers { + +/** + * Schedules several tasks and blocks until all of them are completed. + * In the process of their successfull completion given callback would be called + * for each of them with the index of the task and the result it returned (if + * not void). + * If any of these n tasks throws an exception, this exception will be + * re-thrown, but only when all tasks are complete. If several tasks throw + * exceptions one of them will be re-thrown. Callback won't be called for + * tasks that throw exception. + * + * @param first Range of tasks to be scheduled + * @param last + * @param F callback to call for each result. + * In case of each task returning void it should be callable + * F(size_t id) + * otherwise should be callable + * F(size_t id, Result) + */ +template +inline void forEach(InputIterator first, InputIterator last, F&& f); + +}} // folly::fibers + +#include diff --git a/folly/experimental/fibers/GenericBaton.h b/folly/experimental/fibers/GenericBaton.h new file mode 100644 index 00000000..3a20d641 --- /dev/null +++ b/folly/experimental/fibers/GenericBaton.h @@ -0,0 +1,26 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include + +namespace folly { namespace fibers { + +typedef Baton GenericBaton; + +}} diff --git a/folly/experimental/fibers/GuardPageAllocator-inl.h b/folly/experimental/fibers/GuardPageAllocator-inl.h new file mode 100644 index 00000000..959b5c38 --- /dev/null +++ b/folly/experimental/fibers/GuardPageAllocator-inl.h @@ -0,0 +1,69 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +#include + +namespace folly { namespace fibers { + +namespace { +size_t pagesize() { + static const size_t pagesize = sysconf(_SC_PAGESIZE); + return pagesize; +} + +/* Returns a multiple of pagesize() enough to store size + one guard page */ +size_t allocSize(size_t size) { + return pagesize() * ((size + pagesize() - 1)/pagesize() + 1); +} +} + +unsigned char* GuardPageAllocator::allocate(size_t size) { + /* We allocate minimum number of pages required, plus a guard page. + Since we use this for stack storage, requested allocation is aligned + at the top of the allocated pages, while the guard page is at the bottom. + + -- increasing addresses --> + Guard page Normal pages + |xxxxxxxxxx|..........|..........| + <- size --------> + return value -^ + */ + void* p = nullptr; + PCHECK(!::posix_memalign(&p, pagesize(), allocSize(size))); + + /* Try to protect first page + (stack grows downwards from last allocated address), ignore errors */ + ::mprotect(p, pagesize(), PROT_NONE); + /* Return pointer to top 'size' bytes in allocated storage */ + auto up = reinterpret_cast(p) + allocSize(size) - size; + assert(up >= reinterpret_cast(p) + pagesize()); + return up; +} + +void GuardPageAllocator::deallocate(unsigned char* up, size_t size) { + /* Get allocation base */ + auto p = up + size - allocSize(size); + /* Try to unprotect the page for memory allocator to re-use, + ignore errors (in cases we failed to protect in the first place */ + ::mprotect(p, pagesize(), PROT_READ|PROT_WRITE); + ::free(p); +} + +}} // folly::fibers diff --git a/folly/experimental/fibers/GuardPageAllocator.h b/folly/experimental/fibers/GuardPageAllocator.h new file mode 100644 index 00000000..67e16cf2 --- /dev/null +++ b/folly/experimental/fibers/GuardPageAllocator.h @@ -0,0 +1,32 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +namespace folly { namespace fibers { + +/** + * Stack allocator that protects an extra memory page after + * the end of the stack. + */ +class GuardPageAllocator { + public: + inline unsigned char* allocate(size_t size); + inline void deallocate(unsigned char* up, size_t size); +}; + +}} // folly::fibers + +#include "GuardPageAllocator-inl.h" diff --git a/folly/experimental/fibers/LoopController.h b/folly/experimental/fibers/LoopController.h new file mode 100644 index 00000000..a7e1220b --- /dev/null +++ b/folly/experimental/fibers/LoopController.h @@ -0,0 +1,60 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +namespace folly { namespace fibers { + +class FiberManager; + +class LoopController { + public: + typedef std::chrono::steady_clock Clock; + typedef std::chrono::time_point TimePoint; + + virtual ~LoopController() {} + + /** + * Called by FiberManager to associate itself with the LoopController. + */ + virtual void setFiberManager(FiberManager*) = 0; + + /** + * Called by FiberManager to schedule the loop function run + * at some point in the future. + */ + virtual void schedule() = 0; + + /** + * Same as schedule(), but safe to call from any thread. + */ + virtual void scheduleThreadSafe() = 0; + + /** + * Called by FiberManager to cancel a previously scheduled + * loop function run. + */ + virtual void cancel() = 0; + + /** + * Called by FiberManager to schedule some function to be run at some time. + */ + virtual void timedSchedule(std::function func, TimePoint time) = 0; +}; + +}} // folly::fibers diff --git a/folly/experimental/fibers/Makefile.am b/folly/experimental/fibers/Makefile.am new file mode 100644 index 00000000..02af5b3e --- /dev/null +++ b/folly/experimental/fibers/Makefile.am @@ -0,0 +1 @@ +SUBDIRS = test diff --git a/folly/experimental/fibers/Promise-inl.h b/folly/experimental/fibers/Promise-inl.h new file mode 100644 index 00000000..6b1aae13 --- /dev/null +++ b/folly/experimental/fibers/Promise-inl.h @@ -0,0 +1,93 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include + +namespace folly { namespace fibers { + +template +Promise::Promise(folly::Try& value, Baton& baton) : + value_(&value), baton_(&baton) +{} + +template +Promise::Promise(Promise&& other) noexcept : +value_(other.value_), baton_(other.baton_) { + other.value_ = nullptr; + other.baton_ = nullptr; +} + +template +Promise& Promise::operator=(Promise&& other) { + std::swap(value_, other.value_); + std::swap(baton_, other.baton_); + return *this; +} + +template +void Promise::throwIfFulfilled() const { + if (!value_) { + throw std::logic_error("promise already fulfilled"); + } +} + +template +Promise::~Promise() { + if (value_) { + setException(folly::make_exception_wrapper( + "promise not fulfilled")); + } +} + +template +void Promise::setException(folly::exception_wrapper e) { + fulfilTry(folly::Try(e)); +} + +template +void Promise::fulfilTry(folly::Try&& t) { + throwIfFulfilled(); + + *value_ = std::move(t); + baton_->post(); + + value_ = nullptr; + baton_ = nullptr; +} + +template +template +void Promise::setValue(M&& v) { + static_assert(!std::is_same::value, + "Use setValue() instead"); + + fulfilTry(folly::Try(std::forward(v))); +} + +template +void Promise::setValue() { + static_assert(std::is_same::value, + "Use setValue(value) instead"); + + fulfilTry(folly::Try()); +} + +template +template +void Promise::fulfil(F&& func) { + fulfilTry(makeTryFunction(std::forward(func))); +} + +}} diff --git a/folly/experimental/fibers/Promise.h b/folly/experimental/fibers/Promise.h new file mode 100644 index 00000000..7d24821d --- /dev/null +++ b/folly/experimental/fibers/Promise.h @@ -0,0 +1,100 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +namespace folly { namespace fibers { + +class Baton; + +template +typename FirstArgOf::type::value_type +inline await(F&& func); + +template +class Promise { + public: + typedef T value_type; + + ~Promise(); + + // not copyable + Promise(const Promise&) = delete; + Promise& operator=(const Promise&) = delete; + + // movable + Promise(Promise&&) noexcept; + Promise& operator=(Promise&&); + + /** Fulfil this promise (only for Promise) */ + void setValue(); + + /** Set the value (use perfect forwarding for both move and copy) */ + template + void setValue(M&& value); + + /** + * Fulfill the promise with a given try + * + * @param t + */ + void fulfilTry(folly::Try&& t); + + /** Fulfil this promise with the result of a function that takes no + arguments and returns something implicitly convertible to T. + Captures exceptions. e.g. + + p.fulfil([] { do something that may throw; return a T; }); + */ + template + void fulfil(F&& func); + + /** Fulfil the Promise with an exception_wrapper, e.g. + auto ew = folly::try_and_catch([]{ ... }); + if (ew) { + p.setException(std::move(ew)); + } + */ + void setException(folly::exception_wrapper); + + private: + template + friend typename FirstArgOf::type::value_type await(F&&); + + Promise(folly::Try& value, Baton& baton); + folly::Try* value_; + Baton* baton_; + + void throwIfFulfilled() const; + + template + typename std::enable_if< + std::is_convertible::type, T>::value && + !std::is_same::value>::type + fulfilHelper(F&& func); + + template + typename std::enable_if< + std::is_same::type, void>::value && + std::is_same::value>::type + fulfilHelper(F&& func); +}; + +}} + +#include diff --git a/folly/experimental/fibers/SimpleLoopController.h b/folly/experimental/fibers/SimpleLoopController.h new file mode 100644 index 00000000..de074bbd --- /dev/null +++ b/folly/experimental/fibers/SimpleLoopController.h @@ -0,0 +1,108 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include + +namespace folly { namespace fibers { + +class FiberManager; + +class SimpleLoopController : public LoopController { + public: + SimpleLoopController() + : fm_(nullptr), + stopRequested_(false) { + } + + /** + * Run FiberManager loop; if no ready task are present, + * run provided function. Stops after both stop() has been called + * and no waiting tasks remain. + */ + template + void loop(F&& func) { + bool waiting = false; + stopRequested_ = false; + + while (LIKELY(waiting || !stopRequested_)) { + func(); + + auto time = Clock::now(); + + for (size_t i=0; iloopUntilNoReady(); + } + } + } + + /** + * Requests exit from loop() as soon as all waiting tasks complete. + */ + void stop() { + stopRequested_ = true; + } + + int remoteScheduleCalled() const { + return remoteScheduleCalled_; + } + + void schedule() override { + scheduled_ = true; + } + + void timedSchedule(std::function func, TimePoint time) override { + scheduledFuncs_.push_back({time, std::move(func)}); + } + + private: + FiberManager* fm_; + std::atomic scheduled_{false}; + bool stopRequested_; + std::atomic remoteScheduleCalled_{0}; + std::vector>> scheduledFuncs_; + + /* LoopController interface */ + + void setFiberManager(FiberManager* fm) override { + fm_ = fm; + } + + void cancel() override { + scheduled_ = false; + } + + void scheduleThreadSafe() override { + ++remoteScheduleCalled_; + scheduled_ = true; + } + + friend class FiberManager; +}; + +}} // folly::fibers diff --git a/folly/experimental/fibers/TimeoutController.cpp b/folly/experimental/fibers/TimeoutController.cpp new file mode 100644 index 00000000..3a138aad --- /dev/null +++ b/folly/experimental/fibers/TimeoutController.cpp @@ -0,0 +1,105 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "TimeoutController.h" +#include + +namespace folly { namespace fibers { + +TimeoutController::TimeoutController(LoopController& loopController) : + nextTimeout_(TimePoint::max()), + loopController_(loopController) {} + +intptr_t TimeoutController::registerTimeout(std::function f, + Duration duration) { + auto& list = [&]() -> TimeoutHandleList& { + for (auto& bucket : timeoutHandleBuckets_) { + if (bucket.first == duration) { + return *bucket.second; + } + } + + timeoutHandleBuckets_.emplace_back(duration, + folly::make_unique()); + return *timeoutHandleBuckets_.back().second; + }(); + + auto timeout = Clock::now() + duration; + list.emplace(std::move(f), timeout, list); + + if (timeout < nextTimeout_) { + nextTimeout_ = timeout; + scheduleRun(); + } + + return reinterpret_cast(&list.back()); +} + +void TimeoutController::runTimeouts(TimePoint time) { + auto now = Clock::now(); + // Make sure we don't skip some events if function was run before actual time. + if (time < now) { + time = now; + } + if (nextTimeout_ > time) { + return; + } + + nextTimeout_ = TimePoint::max(); + + for (auto& bucket : timeoutHandleBuckets_) { + auto& list = *bucket.second; + + while (!list.empty()) { + if (!list.front().canceled) { + if (list.front().timeout > time) { + nextTimeout_ = std::min(nextTimeout_, list.front().timeout); + break; + } + + list.front().func(); + } + list.pop(); + } + } + + if (nextTimeout_ != TimePoint::max()) { + scheduleRun(); + } +} + +void TimeoutController::scheduleRun() { + auto time = nextTimeout_; + std::weak_ptr timeoutControllerWeak = shared_from_this(); + + loopController_.timedSchedule([timeoutControllerWeak, time]() { + if (auto timeoutController = timeoutControllerWeak.lock()) { + timeoutController->runTimeouts(time); + } + }, time); +} + +void TimeoutController::cancel(intptr_t p) { + auto handle = reinterpret_cast(p); + handle->canceled = true; + + auto& list = handle->list; + + while (!list.empty() && list.front().canceled) { + list.pop(); + } +} + +}} diff --git a/folly/experimental/fibers/TimeoutController.h b/folly/experimental/fibers/TimeoutController.h new file mode 100644 index 00000000..f74faeb5 --- /dev/null +++ b/folly/experimental/fibers/TimeoutController.h @@ -0,0 +1,69 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include + +#include + +#include + +#include + +namespace folly { namespace fibers { + +class TimeoutController : + public std::enable_shared_from_this { + public: + typedef std::chrono::steady_clock Clock; + typedef std::chrono::time_point TimePoint; + typedef Clock::duration Duration; + + explicit TimeoutController(LoopController& loopController); + + intptr_t registerTimeout(std::function f, Duration duration); + void cancel(intptr_t id); + + void runTimeouts(TimePoint time); + + private: + void scheduleRun(); + + class TimeoutHandle; + typedef std::queue TimeoutHandleList; + typedef std::unique_ptr TimeoutHandleListPtr; + + struct TimeoutHandle { + TimeoutHandle(std::function func_, + TimePoint timeout_, + TimeoutHandleList& list_) : + func(std::move(func_)), timeout(timeout_), list(list_) {} + + std::function func; + bool canceled{false}; + TimePoint timeout; + TimeoutHandleList& list; + }; + + std::vector> timeoutHandleBuckets_; + TimePoint nextTimeout_; + LoopController& loopController_; +}; + +}} diff --git a/folly/experimental/fibers/WhenN-inl.h b/folly/experimental/fibers/WhenN-inl.h new file mode 100644 index 00000000..137dd9f9 --- /dev/null +++ b/folly/experimental/fibers/WhenN-inl.h @@ -0,0 +1,227 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include + +#include +#include + +namespace folly { namespace fibers { + +template +typename std::vector< + typename std::enable_if< + !std::is_same< + typename std::result_of< + typename std::iterator_traits::value_type()>::type, void + >::value, + typename std::pair< + size_t, + typename std::result_of< + typename std::iterator_traits::value_type()>::type> + >::type + > +whenN(InputIterator first, InputIterator last, size_t n) { + typedef typename std::result_of< + typename std::iterator_traits::value_type()>::type Result; + assert(n > 0); + assert(n <= std::distance(first, last)); + + struct Context { + std::vector> results; + size_t tasksTodo; + std::exception_ptr e; + folly::Optional> promise; + + Context(size_t tasksTodo_) : tasksTodo(tasksTodo_) { + this->results.reserve(tasksTodo_); + } + }; + auto context = std::make_shared(n); + + await( + [first, last, context](Promise promise) mutable { + context->promise = std::move(promise); + for (size_t i = 0; first != last; ++i, ++first) { +#ifdef __clang__ +#pragma clang diagnostic push // ignore generalized lambda capture warning +#pragma clang diagnostic ignored "-Wc++1y-extensions" +#endif + addTask( + [i, context, f = std::move(*first)]() { + try { + auto result = f(); + if (context->tasksTodo == 0) { + return; + } + context->results.emplace_back(i, std::move(result)); + } catch (...) { + if (context->tasksTodo == 0) { + return; + } + context->e = std::current_exception(); + } + if (--context->tasksTodo == 0) { + context->promise->setValue(); + } + }); +#ifdef __clang__ +#pragma clang diagnostic pop +#endif + } + }); + + if (context->e != std::exception_ptr()) { + std::rethrow_exception(context->e); + } + + return std::move(context->results); +} + +template +typename std::enable_if< + std::is_same< + typename std::result_of< + typename std::iterator_traits::value_type()>::type, void + >::value, std::vector>::type +whenN(InputIterator first, InputIterator last, size_t n) { + assert(n > 0); + assert(n <= std::distance(first, last)); + + struct Context { + std::vector taskIndices; + std::exception_ptr e; + size_t tasksTodo; + folly::Optional> promise; + + Context(size_t tasksTodo_) : tasksTodo(tasksTodo_) { + this->taskIndices.reserve(tasksTodo_); + } + }; + auto context = std::make_shared(n); + + await( + [first, last, context](Promise promise) mutable { + context->promise = std::move(promise); + for (size_t i = 0; first != last; ++i, ++first) { +#ifdef __clang__ +#pragma clang diagnostic push // ignore generalized lambda capture warning +#pragma clang diagnostic ignored "-Wc++1y-extensions" +#endif + addTask( + [i, context, f = std::move(*first)]() { + try { + f(); + if (context->tasksTodo == 0) { + return; + } + context->taskIndices.push_back(i); + } catch (...) { + if (context->tasksTodo == 0) { + return; + } + context->e = std::current_exception(); + } + if (--context->tasksTodo == 0) { + context->promise->setValue(); + } + }); +#ifdef __clang__ +#pragma clang diagnostic pop +#endif + } + }); + + if (context->e != std::exception_ptr()) { + std::rethrow_exception(context->e); + } + + return context->taskIndices; +} + +template +typename std::vector< + typename std::enable_if< + !std::is_same< + typename std::result_of< + typename std::iterator_traits::value_type()>::type, void + >::value, + typename std::result_of< + typename std::iterator_traits::value_type()>::type>::type> +inline whenAll(InputIterator first, InputIterator last) { + typedef typename std::result_of< + typename std::iterator_traits::value_type()>::type Result; + size_t n = std::distance(first, last); + std::vector results; + std::vector order(n); + results.reserve(n); + + forEach(first, last, + [&results, &order] (size_t id, Result result) { + order[id] = results.size(); + results.emplace_back(std::move(result)); + }); + assert(results.size() == n); + + std::vector orderedResults; + orderedResults.reserve(n); + + for (size_t i = 0; i < n; ++i) { + orderedResults.emplace_back(std::move(results[order[i]])); + } + + return orderedResults; +} + +template +typename std::enable_if< + std::is_same< + typename std::result_of< + typename std::iterator_traits::value_type()>::type, void + >::value, void>::type +inline whenAll(InputIterator first, InputIterator last) { + forEach(first, last, [] (size_t id) {}); +} + +template +typename std::enable_if< + !std::is_same< + typename std::result_of< + typename std::iterator_traits::value_type()>::type, void + >::value, + typename std::pair< + size_t, + typename std::result_of< + typename std::iterator_traits::value_type()>::type> + >::type +inline whenAny(InputIterator first, InputIterator last) { + auto result = whenN(first, last, 1); + assert(result.size() == 1); + return std::move(result[0]); +} + +template +typename std::enable_if< + std::is_same< + typename std::result_of< + typename std::iterator_traits::value_type()>::type, void + >::value, size_t>::type +inline whenAny(InputIterator first, InputIterator last) { + auto result = whenN(first, last, 1); + assert(result.size() == 1); + return std::move(result[0]); +} + +}} diff --git a/folly/experimental/fibers/WhenN.h b/folly/experimental/fibers/WhenN.h new file mode 100644 index 00000000..8d11eb21 --- /dev/null +++ b/folly/experimental/fibers/WhenN.h @@ -0,0 +1,142 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +namespace folly { namespace fibers { + +/** + * Schedules several tasks and blocks until n of these tasks are completed. + * If any of these n tasks throws an exception, this exception will be + * re-thrown, but only when n tasks are complete. If several tasks throw + * exceptions one of them will be re-thrown. + * + * @param first Range of tasks to be scheduled + * @param last + * @param n Number of tasks to wait for + * + * @return vector of pairs (task index, return value of task) + */ +template +typename std::vector< + typename std::enable_if< + !std::is_same< + typename std::result_of< + typename std::iterator_traits::value_type()>::type, + void>::value, + typename std::pair< + size_t, + typename std::result_of< + typename std::iterator_traits::value_type()>::type> + >::type + > +inline whenN(InputIterator first, InputIterator last, size_t n); + +/** + * whenN specialization for functions returning void + * + * @param first Range of tasks to be scheduled + * @param last + * @param n Number of tasks to wait for + * + * @return vector of completed task indices + */ +template +typename std::enable_if< + std::is_same< + typename std::result_of< + typename std::iterator_traits::value_type()>::type, void + >::value, std::vector>::type +inline whenN(InputIterator first, InputIterator last, size_t n); + +/** + * Schedules several tasks and blocks until all of these tasks are completed. + * If any of the tasks throws an exception, this exception will be re-thrown, + * but only when all the tasks are complete. If several tasks throw exceptions + * one of them will be re-thrown. + * + * @param first Range of tasks to be scheduled + * @param last + * + * @return vector of values returned by tasks + */ +template +typename std::vector< + typename std::enable_if< + !std::is_same< + typename std::result_of< + typename std::iterator_traits::value_type()>::type, + void>::value, + typename std::result_of< + typename std::iterator_traits::value_type()>::type>::type + > +inline whenAll(InputIterator first, InputIterator last); + +/** + * whenAll specialization for functions returning void + * + * @param first Range of tasks to be scheduled + * @param last + */ +template +typename std::enable_if< + std::is_same< + typename std::result_of< + typename std::iterator_traits::value_type()>::type, void + >::value, void>::type +inline whenAll(InputIterator first, InputIterator last); + +/** + * Schedules several tasks and blocks until one of them is completed. + * If this task throws an exception, this exception will be re-thrown. + * Exceptions thrown by all other tasks will be ignored. + * + * @param first Range of tasks to be scheduled + * @param last + * + * @return pair of index of the first completed task and its return value + */ +template +typename std::enable_if< + !std::is_same< + typename std::result_of< + typename std::iterator_traits::value_type()>::type, void + >::value, + typename std::pair< + size_t, + typename std::result_of< + typename std::iterator_traits::value_type()>::type> + >::type +inline whenAny(InputIterator first, InputIterator last); + +/** + * WhenAny specialization for functions returning void. + * + * @param first Range of tasks to be scheduled + * @param last + * + * @return index of the first completed task + */ +template +typename std::enable_if< + std::is_same< + typename std::result_of< + typename std::iterator_traits::value_type()>::type, void + >::value, size_t>::type +inline whenAny(InputIterator first, InputIterator last); + +}} + +#include diff --git a/folly/experimental/fibers/test/FibersTest.cpp b/folly/experimental/fibers/test/FibersTest.cpp new file mode 100644 index 00000000..a0709630 --- /dev/null +++ b/folly/experimental/fibers/test/FibersTest.cpp @@ -0,0 +1,1263 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include + +#include + +#include +#include + +#include +#include +#include +#include +#include +#include + +using namespace folly::fibers; + +using folly::Try; + +TEST(FiberManager, batonTimedWaitTimeout) { + bool taskAdded = false; + size_t iterations = 0; + + FiberManager manager(folly::make_unique()); + auto& loopController = + dynamic_cast(manager.loopController()); + + auto loopFunc = [&]() { + if (!taskAdded) { + manager.addTask( + [&]() { + Baton baton; + + auto res = baton.timed_wait(std::chrono::milliseconds(230)); + + EXPECT_FALSE(res); + EXPECT_EQ(5, iterations); + + loopController.stop(); + } + ); + manager.addTask( + [&]() { + Baton baton; + + auto res = baton.timed_wait(std::chrono::milliseconds(130)); + + EXPECT_FALSE(res); + EXPECT_EQ(3, iterations); + + loopController.stop(); + } + ); + taskAdded = true; + } else { + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + iterations ++; + } + }; + + loopController.loop(std::move(loopFunc)); +} + +TEST(FiberManager, batonTimedWaitPost) { + bool taskAdded = false; + size_t iterations = 0; + Baton* baton_ptr; + + FiberManager manager(folly::make_unique()); + auto& loopController = + dynamic_cast(manager.loopController()); + + auto loopFunc = [&]() { + if (!taskAdded) { + manager.addTask( + [&]() { + Baton baton; + baton_ptr = &baton; + + auto res = baton.timed_wait(std::chrono::milliseconds(130)); + + EXPECT_TRUE(res); + EXPECT_EQ(2, iterations); + + loopController.stop(); + } + ); + taskAdded = true; + } else { + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + iterations ++; + if (iterations == 2) { + baton_ptr->post(); + } + } + }; + + loopController.loop(std::move(loopFunc)); +} + +TEST(FiberManager, batonTimedWaitTimeoutEvb) { + size_t tasksComplete = 0; + + folly::EventBase evb; + + FiberManager manager(folly::make_unique()); + dynamic_cast( + manager.loopController()).attachEventBase(evb); + + auto task = [&](size_t timeout_ms) { + Baton baton; + + auto start = EventBaseLoopController::Clock::now(); + auto res = baton.timed_wait(std::chrono::milliseconds(timeout_ms)); + auto finish = EventBaseLoopController::Clock::now(); + + EXPECT_FALSE(res); + + auto duration_ms = + std::chrono::duration_cast(finish - start); + + EXPECT_GT(duration_ms.count(), timeout_ms - 50); + EXPECT_LT(duration_ms.count(), timeout_ms + 50); + + if (++tasksComplete == 2) { + evb.terminateLoopSoon(); + } + }; + + evb.runInEventBaseThread([&]() { + manager.addTask( + [&]() { + task(500); + } + ); + manager.addTask( + [&]() { + task(250); + } + ); + }); + + evb.loopForever(); + + EXPECT_EQ(2, tasksComplete); +} + +TEST(FiberManager, batonTimedWaitPostEvb) { + size_t tasksComplete = 0; + + folly::EventBase evb; + + FiberManager manager(folly::make_unique()); + dynamic_cast( + manager.loopController()).attachEventBase(evb); + + evb.runInEventBaseThread([&]() { + manager.addTask([&]() { + Baton baton; + + evb.tryRunAfterDelay([&]() { + baton.post(); + }, + 100); + + auto start = EventBaseLoopController::Clock::now(); + auto res = baton.timed_wait(std::chrono::milliseconds(130)); + auto finish = EventBaseLoopController::Clock::now(); + + EXPECT_TRUE(res); + + auto duration_ms = std::chrono::duration_cast< + std::chrono::milliseconds>(finish - start); + + EXPECT_TRUE(duration_ms.count() > 95 && + duration_ms.count() < 110); + + if (++tasksComplete == 1) { + evb.terminateLoopSoon(); + } + }); + }); + + evb.loopForever(); + + EXPECT_EQ(1, tasksComplete); +} + +TEST(FiberManager, batonTryWait) { + + FiberManager manager(folly::make_unique()); + auto& loopController = + dynamic_cast(manager.loopController()); + + // Check if try_wait and post work as expected + Baton b; + + manager.addTask([&](){ + while (!b.try_wait()) { + } + }); + auto thr = std::thread([&](){ + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + b.post(); + }); + + manager.loopUntilNoReady(); + thr.join(); + + Baton c; + + // Check try_wait without post + manager.addTask([&](){ + int cnt = 100; + while (cnt && !c.try_wait()) { + cnt--; + } + EXPECT_TRUE(!c.try_wait()); // must still hold + EXPECT_EQ(cnt, 0); + }); + + manager.loopUntilNoReady(); +} + +TEST(FiberManager, genericBatonFiberWait) { + FiberManager manager(folly::make_unique()); + + GenericBaton b; + bool fiberRunning = false; + + manager.addTask([&](){ + EXPECT_EQ(manager.hasActiveFiber(), true); + fiberRunning = true; + b.wait(); + fiberRunning = false; + }); + + EXPECT_FALSE(fiberRunning); + manager.loopUntilNoReady(); + EXPECT_TRUE(fiberRunning); // ensure fiber still active + + auto thr = std::thread([&](){ + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + b.post(); + }); + + while (fiberRunning) { + manager.loopUntilNoReady(); + } + + thr.join(); +} + +TEST(FiberManager, genericBatonThreadWait) { + FiberManager manager(folly::make_unique()); + GenericBaton b; + std::atomic threadWaiting(false); + + auto thr = std::thread([&](){ + threadWaiting = true; + b.wait(); + threadWaiting = false; + }); + + while (!threadWaiting) {} + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + + manager.addTask([&](){ + EXPECT_EQ(manager.hasActiveFiber(), true); + EXPECT_TRUE(threadWaiting); + b.post(); + while(threadWaiting) {} + }); + + manager.loopUntilNoReady(); + thr.join(); +} + +TEST(FiberManager, addTasksNoncopyable) { + std::vector> pendingFibers; + bool taskAdded = false; + + FiberManager manager(folly::make_unique()); + auto& loopController = + dynamic_cast(manager.loopController()); + + auto loopFunc = [&]() { + if (!taskAdded) { + manager.addTask( + [&]() { + std::vector()>> funcs; + for (size_t i = 0; i < 3; ++i) { + funcs.push_back( + [i, &pendingFibers]() { + await([&pendingFibers](Promise promise) { + pendingFibers.push_back(std::move(promise)); + }); + return folly::make_unique(i*2 + 1); + } + ); + } + + auto iter = addTasks(funcs.begin(), funcs.end()); + + size_t n = 0; + while (iter.hasNext()) { + auto result = iter.awaitNext(); + EXPECT_EQ(2 * iter.getTaskID() + 1, *result); + EXPECT_GE(2 - n, pendingFibers.size()); + ++n; + } + EXPECT_EQ(3, n); + } + ); + taskAdded = true; + } else if (pendingFibers.size()) { + pendingFibers.back().setValue(0); + pendingFibers.pop_back(); + } else { + loopController.stop(); + } + }; + + loopController.loop(std::move(loopFunc)); +} + +TEST(FiberManager, addTasksThrow) { + std::vector> pendingFibers; + bool taskAdded = false; + + FiberManager manager(folly::make_unique()); + auto& loopController = + dynamic_cast(manager.loopController()); + + auto loopFunc = [&]() { + if (!taskAdded) { + manager.addTask( + [&]() { + std::vector> funcs; + for (size_t i = 0; i < 3; ++i) { + funcs.push_back( + [i, &pendingFibers]() { + await([&pendingFibers](Promise promise) { + pendingFibers.push_back(std::move(promise)); + }); + if (i % 2 == 0) { + throw std::runtime_error("Runtime"); + } + return i*2 + 1; + } + ); + } + + auto iter = addTasks(funcs.begin(), funcs.end()); + + size_t n = 0; + while (iter.hasNext()) { + try { + int result = iter.awaitNext(); + EXPECT_EQ(1, iter.getTaskID() % 2); + EXPECT_EQ(2 * iter.getTaskID() + 1, result); + } catch (...) { + EXPECT_EQ(0, iter.getTaskID() % 2); + } + EXPECT_GE(2 - n, pendingFibers.size()); + ++n; + } + EXPECT_EQ(3, n); + } + ); + taskAdded = true; + } else if (pendingFibers.size()) { + pendingFibers.back().setValue(0); + pendingFibers.pop_back(); + } else { + loopController.stop(); + } + }; + + loopController.loop(std::move(loopFunc)); +} + +TEST(FiberManager, addTasksVoid) { + std::vector> pendingFibers; + bool taskAdded = false; + + FiberManager manager(folly::make_unique()); + auto& loopController = + dynamic_cast(manager.loopController()); + + auto loopFunc = [&]() { + if (!taskAdded) { + manager.addTask( + [&]() { + std::vector> funcs; + for (size_t i = 0; i < 3; ++i) { + funcs.push_back( + [i, &pendingFibers]() { + await([&pendingFibers](Promise promise) { + pendingFibers.push_back(std::move(promise)); + }); + } + ); + } + + auto iter = addTasks(funcs.begin(), funcs.end()); + + size_t n = 0; + while (iter.hasNext()) { + iter.awaitNext(); + EXPECT_GE(2 - n, pendingFibers.size()); + ++n; + } + EXPECT_EQ(3, n); + } + ); + taskAdded = true; + } else if (pendingFibers.size()) { + pendingFibers.back().setValue(0); + pendingFibers.pop_back(); + } else { + loopController.stop(); + } + }; + + loopController.loop(std::move(loopFunc)); +} + +TEST(FiberManager, addTasksVoidThrow) { + std::vector> pendingFibers; + bool taskAdded = false; + + FiberManager manager(folly::make_unique()); + auto& loopController = + dynamic_cast(manager.loopController()); + + auto loopFunc = [&]() { + if (!taskAdded) { + manager.addTask( + [&]() { + std::vector> funcs; + for (size_t i = 0; i < 3; ++i) { + funcs.push_back( + [i, &pendingFibers]() { + await([&pendingFibers](Promise promise) { + pendingFibers.push_back(std::move(promise)); + }); + if (i % 2 == 0) { + throw std::runtime_error(""); + } + } + ); + } + + auto iter = addTasks(funcs.begin(), funcs.end()); + + size_t n = 0; + while (iter.hasNext()) { + try { + iter.awaitNext(); + EXPECT_EQ(1, iter.getTaskID() % 2); + } catch (...) { + EXPECT_EQ(0, iter.getTaskID() % 2); + } + EXPECT_GE(2 - n, pendingFibers.size()); + ++n; + } + EXPECT_EQ(3, n); + } + ); + taskAdded = true; + } else if (pendingFibers.size()) { + pendingFibers.back().setValue(0); + pendingFibers.pop_back(); + } else { + loopController.stop(); + } + }; + + loopController.loop(std::move(loopFunc)); +} + +TEST(FiberManager, reserve) { + std::vector> pendingFibers; + bool taskAdded = false; + + FiberManager manager(folly::make_unique()); + auto& loopController = + dynamic_cast(manager.loopController()); + + auto loopFunc = [&]() { + if (!taskAdded) { + manager.addTask( + [&]() { + std::vector> funcs; + for (size_t i = 0; i < 3; ++i) { + funcs.push_back( + [&pendingFibers]() { + await([&pendingFibers](Promise promise) { + pendingFibers.push_back(std::move(promise)); + }); + } + ); + } + + auto iter = addTasks(funcs.begin(), funcs.end()); + + iter.reserve(2); + EXPECT_TRUE(iter.hasCompleted()); + EXPECT_TRUE(iter.hasPending()); + EXPECT_TRUE(iter.hasNext()); + + iter.awaitNext(); + EXPECT_TRUE(iter.hasCompleted()); + EXPECT_TRUE(iter.hasPending()); + EXPECT_TRUE(iter.hasNext()); + + iter.awaitNext(); + EXPECT_FALSE(iter.hasCompleted()); + EXPECT_TRUE(iter.hasPending()); + EXPECT_TRUE(iter.hasNext()); + + iter.awaitNext(); + EXPECT_FALSE(iter.hasCompleted()); + EXPECT_FALSE(iter.hasPending()); + EXPECT_FALSE(iter.hasNext()); + } + ); + taskAdded = true; + } else if (pendingFibers.size()) { + pendingFibers.back().setValue(0); + pendingFibers.pop_back(); + } else { + loopController.stop(); + } + }; + + loopController.loop(std::move(loopFunc)); +} + +TEST(FiberManager, forEach) { + std::vector> pendingFibers; + bool taskAdded = false; + + FiberManager manager(folly::make_unique()); + auto& loopController = + dynamic_cast(manager.loopController()); + + auto loopFunc = [&]() { + if (!taskAdded) { + manager.addTask( + [&]() { + std::vector> funcs; + for (size_t i = 0; i < 3; ++i) { + funcs.push_back( + [i, &pendingFibers]() { + await([&pendingFibers](Promise promise) { + pendingFibers.push_back(std::move(promise)); + }); + return i * 2 + 1; + } + ); + } + + std::vector> results; + forEach(funcs.begin(), funcs.end(), + [&results](size_t id, int result) { + results.push_back(std::make_pair(id, result)); + }); + EXPECT_EQ(3, results.size()); + EXPECT_TRUE(pendingFibers.empty()); + for (size_t i = 0; i < 3; ++i) { + EXPECT_EQ(results[i].first * 2 + 1, results[i].second); + } + } + ); + taskAdded = true; + } else if (pendingFibers.size()) { + pendingFibers.back().setValue(0); + pendingFibers.pop_back(); + } else { + loopController.stop(); + } + }; + + loopController.loop(std::move(loopFunc)); +} + +TEST(FiberManager, whenN) { + std::vector> pendingFibers; + bool taskAdded = false; + + FiberManager manager(folly::make_unique()); + auto& loopController = + dynamic_cast(manager.loopController()); + + auto loopFunc = [&]() { + if (!taskAdded) { + manager.addTask( + [&]() { + std::vector> funcs; + for (size_t i = 0; i < 3; ++i) { + funcs.push_back( + [i, &pendingFibers]() { + await([&pendingFibers](Promise promise) { + pendingFibers.push_back(std::move(promise)); + }); + return i*2 + 1; + } + ); + } + + auto results = whenN(funcs.begin(), funcs.end(), 2); + EXPECT_EQ(2, results.size()); + EXPECT_EQ(1, pendingFibers.size()); + for (size_t i = 0; i < 2; ++i) { + EXPECT_EQ(results[i].first*2 + 1, results[i].second); + } + } + ); + taskAdded = true; + } else if (pendingFibers.size()) { + pendingFibers.back().setValue(0); + pendingFibers.pop_back(); + } else { + loopController.stop(); + } + }; + + loopController.loop(std::move(loopFunc)); +} + +TEST(FiberManager, whenNThrow) { + std::vector> pendingFibers; + bool taskAdded = false; + + FiberManager manager(folly::make_unique()); + auto& loopController = + dynamic_cast(manager.loopController()); + + auto loopFunc = [&]() { + if (!taskAdded) { + manager.addTask( + [&]() { + std::vector> funcs; + for (size_t i = 0; i < 3; ++i) { + funcs.push_back( + [i, &pendingFibers]() { + await([&pendingFibers](Promise promise) { + pendingFibers.push_back(std::move(promise)); + }); + throw std::runtime_error("Runtime"); + return i*2+1; + } + ); + } + + try { + whenN(funcs.begin(), funcs.end(), 2); + } catch (...) { + EXPECT_EQ(1, pendingFibers.size()); + } + } + ); + taskAdded = true; + } else if (pendingFibers.size()) { + pendingFibers.back().setValue(0); + pendingFibers.pop_back(); + } else { + loopController.stop(); + } + }; + + loopController.loop(std::move(loopFunc)); +} + +TEST(FiberManager, whenNVoid) { + std::vector> pendingFibers; + bool taskAdded = false; + + FiberManager manager(folly::make_unique()); + auto& loopController = + dynamic_cast(manager.loopController()); + + auto loopFunc = [&]() { + if (!taskAdded) { + manager.addTask( + [&]() { + std::vector> funcs; + for (size_t i = 0; i < 3; ++i) { + funcs.push_back( + [i, &pendingFibers]() { + await([&pendingFibers](Promise promise) { + pendingFibers.push_back(std::move(promise)); + }); + } + ); + } + + auto results = whenN(funcs.begin(), funcs.end(), 2); + EXPECT_EQ(2, results.size()); + EXPECT_EQ(1, pendingFibers.size()); + } + ); + taskAdded = true; + } else if (pendingFibers.size()) { + pendingFibers.back().setValue(0); + pendingFibers.pop_back(); + } else { + loopController.stop(); + } + }; + + loopController.loop(std::move(loopFunc)); +} + +TEST(FiberManager, whenNVoidThrow) { + std::vector> pendingFibers; + bool taskAdded = false; + + FiberManager manager(folly::make_unique()); + auto& loopController = + dynamic_cast(manager.loopController()); + + auto loopFunc = [&]() { + if (!taskAdded) { + manager.addTask( + [&]() { + std::vector> funcs; + for (size_t i = 0; i < 3; ++i) { + funcs.push_back( + [i, &pendingFibers]() { + await([&pendingFibers](Promise promise) { + pendingFibers.push_back(std::move(promise)); + }); + throw std::runtime_error("Runtime"); + } + ); + } + + try { + whenN(funcs.begin(), funcs.end(), 2); + } catch (...) { + EXPECT_EQ(1, pendingFibers.size()); + } + } + ); + taskAdded = true; + } else if (pendingFibers.size()) { + pendingFibers.back().setValue(0); + pendingFibers.pop_back(); + } else { + loopController.stop(); + } + }; + + loopController.loop(std::move(loopFunc)); +} + +TEST(FiberManager, whenAll) { + std::vector> pendingFibers; + bool taskAdded = false; + + FiberManager manager(folly::make_unique()); + auto& loopController = + dynamic_cast(manager.loopController()); + + auto loopFunc = [&]() { + if (!taskAdded) { + manager.addTask( + [&]() { + std::vector> funcs; + for (size_t i = 0; i < 3; ++i) { + funcs.push_back( + [i, &pendingFibers]() { + await([&pendingFibers](Promise promise) { + pendingFibers.push_back(std::move(promise)); + }); + return i*2+1; + } + ); + } + + auto results = whenAll(funcs.begin(), funcs.end()); + EXPECT_TRUE(pendingFibers.empty()); + for (size_t i = 0; i < 3; ++i) { + EXPECT_EQ(i*2+1, results[i]); + } + } + ); + taskAdded = true; + } else if (pendingFibers.size()) { + pendingFibers.back().setValue(0); + pendingFibers.pop_back(); + } else { + loopController.stop(); + } + }; + + loopController.loop(std::move(loopFunc)); +} + +TEST(FiberManager, whenAllVoid) { + std::vector> pendingFibers; + bool taskAdded = false; + + FiberManager manager(folly::make_unique()); + auto& loopController = + dynamic_cast(manager.loopController()); + + auto loopFunc = [&]() { + if (!taskAdded) { + manager.addTask( + [&]() { + std::vector> funcs; + for (size_t i = 0; i < 3; ++ i) { + funcs.push_back( + [i, &pendingFibers]() { + await([&pendingFibers](Promise promise) { + pendingFibers.push_back(std::move(promise)); + }); + } + ); + } + + whenAll(funcs.begin(), funcs.end()); + EXPECT_TRUE(pendingFibers.empty()); + } + ); + taskAdded = true; + } else if (pendingFibers.size()) { + pendingFibers.back().setValue(0); + pendingFibers.pop_back(); + } else { + loopController.stop(); + } + }; + + loopController.loop(std::move(loopFunc)); +} + +TEST(FiberManager, whenAny) { + std::vector> pendingFibers; + bool taskAdded = false; + + FiberManager manager(folly::make_unique()); + auto& loopController = + dynamic_cast(manager.loopController()); + + auto loopFunc = [&]() { + if (!taskAdded) { + manager.addTask( + [&]() { + std::vector > funcs; + for (size_t i = 0; i < 3; ++ i) { + funcs.push_back( + [i, &pendingFibers]() { + await([&pendingFibers](Promise promise) { + pendingFibers.push_back(std::move(promise)); + }); + if (i == 1) { + throw std::runtime_error("This exception will be ignored"); + } + return i*2+1; + } + ); + } + + auto result = whenAny(funcs.begin(), funcs.end()); + EXPECT_EQ(2, pendingFibers.size()); + EXPECT_EQ(2, result.first); + EXPECT_EQ(2*2+1, result.second); + } + ); + taskAdded = true; + } else if (pendingFibers.size()) { + pendingFibers.back().setValue(0); + pendingFibers.pop_back(); + } else { + loopController.stop(); + } + }; + + loopController.loop(std::move(loopFunc)); +} + +namespace { +/* Checks that this function was run from a main context, + by comparing an address on a stack to a known main stack address + and a known related fiber stack address. The assumption + is that fiber stack and main stack will be far enough apart, + while any two values on the same stack will be close. */ +void expectMainContext(bool& ran, int* mainLocation, int* fiberLocation) { + int here; + /* 2 pages is a good guess */ + constexpr ssize_t DISTANCE = 0x2000 / sizeof(int); + if (fiberLocation) { + EXPECT_TRUE(std::abs(&here - fiberLocation) > DISTANCE); + } + if (mainLocation) { + EXPECT_TRUE(std::abs(&here - mainLocation) < DISTANCE); + } + + EXPECT_FALSE(ran); + ran = true; +} +} + +TEST(FiberManager, runInMainContext) { + FiberManager manager(folly::make_unique()); + auto& loopController = + dynamic_cast(manager.loopController()); + + bool checkRan = false; + + int mainLocation; + manager.runInMainContext( + [&]() { + expectMainContext(checkRan, &mainLocation, nullptr); + }); + EXPECT_TRUE(checkRan); + + checkRan = false; + + manager.addTask( + [&]() { + int stackLocation; + runInMainContext( + [&]() { + expectMainContext(checkRan, &mainLocation, &stackLocation); + }); + EXPECT_TRUE(checkRan); + } + ); + + loopController.loop( + [&]() { + loopController.stop(); + } + ); + + EXPECT_TRUE(checkRan); +} + +TEST(FiberManager, addTaskFinally) { + FiberManager manager(folly::make_unique()); + auto& loopController = + dynamic_cast(manager.loopController()); + + bool checkRan = false; + + int mainLocation; + + manager.addTaskFinally( + [&]() { + return 1234; + }, + [&](Try&& result) { + EXPECT_EQ(result.value(), 1234); + + expectMainContext(checkRan, &mainLocation, nullptr); + } + ); + + EXPECT_FALSE(checkRan); + + loopController.loop( + [&]() { + loopController.stop(); + } + ); + + EXPECT_TRUE(checkRan); +} + +TEST(FiberManager, fibersPoolWithinLimit) { + FiberManager::Options opts; + opts.maxFibersPoolSize = 5; + + FiberManager manager(folly::make_unique(), opts); + auto& loopController = + dynamic_cast(manager.loopController()); + + size_t fibersRun = 0; + + for (size_t i = 0; i < 5; ++i) { + manager.addTask( + [&]() { + ++fibersRun; + } + ); + } + loopController.loop( + [&]() { + loopController.stop(); + } + ); + + EXPECT_EQ(5, fibersRun); + EXPECT_EQ(5, manager.fibersAllocated()); + EXPECT_EQ(5, manager.fibersPoolSize()); + + for (size_t i = 0; i < 5; ++i) { + manager.addTask( + [&]() { + ++fibersRun; + } + ); + } + loopController.loop( + [&]() { + loopController.stop(); + } + ); + + EXPECT_EQ(10, fibersRun); + EXPECT_EQ(5, manager.fibersAllocated()); + EXPECT_EQ(5, manager.fibersPoolSize()); +} + +TEST(FiberManager, fibersPoolOverLimit) { + FiberManager::Options opts; + opts.maxFibersPoolSize = 5; + + FiberManager manager(folly::make_unique(), opts); + auto& loopController = + dynamic_cast(manager.loopController()); + + size_t fibersRun = 0; + + for (size_t i = 0; i < 10; ++i) { + manager.addTask( + [&]() { + ++fibersRun; + } + ); + } + + EXPECT_EQ(0, fibersRun); + EXPECT_EQ(10, manager.fibersAllocated()); + EXPECT_EQ(0, manager.fibersPoolSize()); + + loopController.loop( + [&]() { + loopController.stop(); + } + ); + + EXPECT_EQ(10, fibersRun); + EXPECT_EQ(5, manager.fibersAllocated()); + EXPECT_EQ(5, manager.fibersPoolSize()); +} + +TEST(FiberManager, remoteFiberBasic) { + FiberManager manager(folly::make_unique()); + auto& loopController = + dynamic_cast(manager.loopController()); + + int result[2]; + result[0] = result[1] = 0; + folly::Optional> savedPromise[2]; + manager.addTask( + [&] () { + result[0] = await([&] (Promise promise) { + savedPromise[0] = std::move(promise); + }); + }); + manager.addTask( + [&] () { + result[1] = await([&] (Promise promise) { + savedPromise[1] = std::move(promise); + }); + }); + + manager.loopUntilNoReady(); + + EXPECT_TRUE(savedPromise[0].hasValue()); + EXPECT_TRUE(savedPromise[1].hasValue()); + EXPECT_EQ(0, result[0]); + EXPECT_EQ(0, result[1]); + + std::thread remoteThread0{ + [&] () { + savedPromise[0]->setValue(42); + } + }; + std::thread remoteThread1{ + [&] () { + savedPromise[1]->setValue(43); + } + }; + remoteThread0.join(); + remoteThread1.join(); + EXPECT_EQ(0, result[0]); + EXPECT_EQ(0, result[1]); + /* Should only have scheduled once */ + EXPECT_EQ(1, loopController.remoteScheduleCalled()); + + manager.loopUntilNoReady(); + EXPECT_EQ(42, result[0]); + EXPECT_EQ(43, result[1]); +} + +TEST(FiberManager, addTaskRemoteBasic) { + FiberManager manager(folly::make_unique()); + + int result[2]; + result[0] = result[1] = 0; + folly::Optional> savedPromise[2]; + + std::thread remoteThread0{ + [&] () { + manager.addTaskRemote( + [&] () { + result[0] = await([&] (Promise promise) { + savedPromise[0] = std::move(promise); + }); + }); + } + }; + std::thread remoteThread1{ + [&] () { + manager.addTaskRemote( + [&] () { + result[1] = await([&] (Promise promise) { + savedPromise[1] = std::move(promise); + }); + }); + } + }; + remoteThread0.join(); + remoteThread1.join(); + + manager.loopUntilNoReady(); + + EXPECT_TRUE(savedPromise[0].hasValue()); + EXPECT_TRUE(savedPromise[1].hasValue()); + EXPECT_EQ(0, result[0]); + EXPECT_EQ(0, result[1]); + + savedPromise[0]->setValue(42); + savedPromise[1]->setValue(43); + + EXPECT_EQ(0, result[0]); + EXPECT_EQ(0, result[1]); + + manager.loopUntilNoReady(); + EXPECT_EQ(42, result[0]); + EXPECT_EQ(43, result[1]); +} + +TEST(FiberManager, remoteHasTasks) { + size_t counter = 0; + FiberManager fm(folly::make_unique()); + std::thread remote([&]() { + fm.addTaskRemote([&]() { + ++counter; + }); + }); + + remote.join(); + + while (fm.hasTasks()) { + fm.loopUntilNoReady(); + } + + EXPECT_FALSE(fm.hasTasks()); + EXPECT_EQ(counter, 1); +} + +TEST(FiberManager, remoteHasReadyTasks) { + int result = 0; + folly::Optional> savedPromise; + FiberManager fm(folly::make_unique()); + std::thread remote([&]() { + fm.addTaskRemote([&]() { + result = await([&](Promise promise) { + savedPromise = std::move(promise); + }); + EXPECT_TRUE(fm.hasTasks()); + }); + }); + + remote.join(); + EXPECT_TRUE(fm.hasTasks()); + + fm.loopUntilNoReady(); + EXPECT_TRUE(fm.hasTasks()); + + std::thread remote2([&](){ + savedPromise->setValue(47); + }); + remote2.join(); + EXPECT_TRUE(fm.hasTasks()); + + fm.loopUntilNoReady(); + EXPECT_FALSE(fm.hasTasks()); + + EXPECT_EQ(result, 47); +} + +static size_t sNumAwaits; + +void runBenchmark(size_t numAwaits, size_t toSend) { + sNumAwaits = numAwaits; + + FiberManager fiberManager(folly::make_unique()); + auto& loopController = + dynamic_cast(fiberManager.loopController()); + + std::queue> pendingRequests; + static const size_t maxOutstanding = 5; + + auto loop = [&fiberManager, &loopController, &pendingRequests, &toSend]() { + if (pendingRequests.size() == maxOutstanding || toSend == 0) { + if (pendingRequests.empty()) { + return; + } + pendingRequests.front().setValue(0); + pendingRequests.pop(); + } else { + fiberManager.addTask([&pendingRequests]() { + for (size_t i = 0; i < sNumAwaits; ++i) { + auto result = await( + [&pendingRequests](Promise promise) { + pendingRequests.push(std::move(promise)); + }); + assert(result == 0); + } + }); + + if (--toSend == 0) { + loopController.stop(); + } + } + }; + + loopController.loop(std::move(loop)); +} + +BENCHMARK(FiberManagerBasicOneAwait, iters) { + runBenchmark(1, iters); +} + +BENCHMARK(FiberManagerBasicFiveAwaits, iters) { + runBenchmark(5, iters); +} diff --git a/folly/experimental/fibers/test/FibersTestApp.cpp b/folly/experimental/fibers/test/FibersTestApp.cpp new file mode 100644 index 00000000..116f7119 --- /dev/null +++ b/folly/experimental/fibers/test/FibersTestApp.cpp @@ -0,0 +1,96 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include + +#include + +#include +#include + +using namespace folly::fibers; + +struct Application { + public: + Application () + : fiberManager(folly::make_unique()), + toSend(20), + maxOutstanding(5) { + } + + void loop() { + if (pendingRequests.size() == maxOutstanding || toSend == 0) { + if (pendingRequests.empty()) { + return; + } + intptr_t value = rand()%1000; + std::cout << "Completing request with data = " << value << std::endl; + + pendingRequests.front().setValue(value); + pendingRequests.pop(); + } else { + static size_t id_counter = 1; + size_t id = id_counter++; + std::cout << "Adding new request with id = " << id << std::endl; + + fiberManager.addTask([this, id]() { + std::cout << "Executing fiber with id = " << id << std::endl; + + auto result1 = await( + [this](Promise fiber) { + pendingRequests.push(std::move(fiber)); + }); + + std::cout << "Fiber id = " << id + << " got result1 = " << result1 << std::endl; + + auto result2 = await + ([this](Promise fiber) { + pendingRequests.push(std::move(fiber)); + }); + std::cout << "Fiber id = " << id + << " got result2 = " << result2 << std::endl; + }); + + if (--toSend == 0) { + auto& loopController = + dynamic_cast(fiberManager.loopController()); + loopController.stop(); + } + } + } + + FiberManager fiberManager; + + std::queue> pendingRequests; + size_t toSend; + size_t maxOutstanding; +}; + +int main() { + Application app; + + auto loop = [&app]() { + app.loop(); + }; + + auto& loopController = + dynamic_cast(app.fiberManager.loopController()); + + loopController.loop(std::move(loop)); + + return 0; +} diff --git a/folly/experimental/fibers/test/Makefile.am b/folly/experimental/fibers/test/Makefile.am new file mode 100644 index 00000000..707ea33b --- /dev/null +++ b/folly/experimental/fibers/test/Makefile.am @@ -0,0 +1,8 @@ +check_PROGRAMS = mcrouter_fibers_test + +mcrouter_fibers_test_SOURCES = \ + FibersTest.cpp \ + main.cpp + +mcrouter_fibers_test_CPPFLAGS = -I$(top_srcdir)/oss_include +mcrouter_fibers_test_LDADD = $(top_builddir)/lib/libmcrouter.a -lgtest -lfollybenchmark diff --git a/folly/experimental/fibers/test/main.cpp b/folly/experimental/fibers/test/main.cpp new file mode 100644 index 00000000..083af7e5 --- /dev/null +++ b/folly/experimental/fibers/test/main.cpp @@ -0,0 +1,31 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include + +#include + +// for backward compatibility with gflags +namespace gflags { } +namespace google { using namespace gflags; } + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + google::ParseCommandLineFlags(&argc, &argv, true); + + auto rc = RUN_ALL_TESTS(); + folly::runBenchmarksOnFlag(); + return rc; +} diff --git a/folly/experimental/fibers/traits.h b/folly/experimental/fibers/traits.h new file mode 100644 index 00000000..f777f610 --- /dev/null +++ b/folly/experimental/fibers/traits.h @@ -0,0 +1,70 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +namespace folly { namespace fibers { + +/** + * For any functor F taking >= 1 argument, + * FirstArgOf::type is the type of F's first parameter. + * + * Rationale: we want to declare a function func(F), where F has the + * signature `void(X)` and func should return T (T and X are some types). + * Solution: + * + * template + * T::type> + * func(F&& f); + */ + +namespace detail { + +/** + * If F is a pointer-to-member, will contain a typedef type + * with the type of F's first parameter + */ +template +struct ExtractFirstMemfn; + +template +struct ExtractFirstMemfn { + typedef First type; +}; + +template +struct ExtractFirstMemfn { + typedef First type; +}; + +} // detail + +/** Default - use boost */ +template +struct FirstArgOf { + typedef typename boost::function_traits< + typename std::remove_pointer::type>::arg1_type type; +}; + +/** Specialization for function objects */ +template +struct FirstArgOf::value>::type> { + typedef typename detail::ExtractFirstMemfn< + decltype(&F::operator())>::type type; +}; + +}} // folly::fibers -- 2.34.1