--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <memory>
+#include <vector>
+
+#include <folly/experimental/fibers/FiberManager.h>
+
+namespace folly { namespace fibers {
+
+template <typename T>
+TaskIterator<T>::TaskIterator(TaskIterator&& other) noexcept
+ : context_(std::move(other.context_)),
+ id_(other.id_) {
+}
+
+template <typename T>
+TaskIterator<T>::TaskIterator(std::shared_ptr<Context> context)
+ : context_(std::move(context)),
+ id_(-1) {
+ assert(context_);
+}
+
+template <typename T>
+inline bool TaskIterator<T>::hasCompleted() const {
+ return context_->tasksConsumed < context_->results.size();
+}
+
+template <typename T>
+inline bool TaskIterator<T>::hasPending() const {
+ return !context_.unique();
+}
+
+template <typename T>
+inline bool TaskIterator<T>::hasNext() const {
+ return hasPending() || hasCompleted();
+}
+
+template <typename T>
+folly::Try<T> TaskIterator<T>::awaitNextResult() {
+ assert(hasCompleted() || hasPending());
+ reserve(1);
+
+ size_t i = context_->tasksConsumed++;
+ id_ = context_->results[i].first;
+ return std::move(context_->results[i].second);
+}
+
+template <typename T>
+inline T TaskIterator<T>::awaitNext() {
+ return std::move(awaitNextResult().value());
+}
+
+template <>
+inline void TaskIterator<void>::awaitNext() {
+ awaitNextResult().value();
+}
+
+template <typename T>
+inline void TaskIterator<T>::reserve(size_t n) {
+ size_t tasksReady = context_->results.size() - context_->tasksConsumed;
+
+ // we don't need to do anything if there are already n or more tasks complete
+ // or if we have no tasks left to execute.
+ if (!hasPending() || tasksReady >= n) {
+ return;
+ }
+
+ n -= tasksReady;
+ size_t tasksLeft = context_->totalTasks - context_->results.size();
+ n = std::min(n, tasksLeft);
+
+ await(
+ [this, n](Promise<void> promise) {
+ context_->tasksToFulfillPromise = n;
+ context_->promise.assign(std::move(promise));
+ });
+}
+
+template <typename T>
+inline size_t TaskIterator<T>::getTaskID() const {
+ assert(id_ != -1);
+ return id_;
+}
+
+template <class InputIterator>
+TaskIterator<typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type>
+addTasks(InputIterator first, InputIterator last) {
+ typedef typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type
+ ResultType;
+ typedef TaskIterator<ResultType> IteratorType;
+
+ auto context = std::make_shared<typename IteratorType::Context>();
+ context->totalTasks = std::distance(first, last);
+ context->results.reserve(context->totalTasks);
+
+ for (size_t i = 0; first != last; ++i, ++first) {
+#ifdef __clang__
+#pragma clang diagnostic push // ignore generalized lambda capture warning
+#pragma clang diagnostic ignored "-Wc++1y-extensions"
+#endif
+ addTask(
+ [i, context, f = std::move(*first)]() {
+ context->results.emplace_back(i, folly::makeTryFunction(std::move(f)));
+
+ // Check for awaiting iterator.
+ if (context->promise.hasValue()) {
+ if (--context->tasksToFulfillPromise == 0) {
+ context->promise->setValue();
+ context->promise.clear();
+ }
+ }
+ }
+ );
+#ifdef __clang__
+#pragma clang diagnostic pop
+#endif
+ }
+
+ return IteratorType(std::move(context));
+}
+
+}}
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <functional>
+#include <vector>
+
+#include <folly/Optional.h>
+#include <folly/experimental/fibers/Promise.h>
+#include <folly/futures/Try.h>
+
+namespace folly { namespace fibers {
+
+template <typename T>
+class TaskIterator;
+
+/**
+ * Schedules several tasks and immediately returns an iterator, that
+ * allow to traverse tasks in the order of their completion. All results and
+ * exptions thrown are stored alongside with the task id and are
+ * accessible via iterator.
+ *
+ * @param first Range of tasks to be scheduled
+ * @param last
+ *
+ * @return movable, non-copyable iterator
+ */
+template <class InputIterator>
+TaskIterator<
+ typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type>
+inline addTasks(InputIterator first, InputIterator last);
+
+template <typename T>
+class TaskIterator {
+ public:
+ typedef T value_type;
+
+ // not copyable
+ TaskIterator(const TaskIterator& other) = delete;
+ TaskIterator& operator=(const TaskIterator& other) = delete;
+
+ // movable
+ TaskIterator(TaskIterator&& other) noexcept;
+ TaskIterator& operator=(TaskIterator&& other) = delete;
+
+ /**
+ * @return True if there are tasks immediately available to be consumed (no
+ * need to await on them).
+ */
+ bool hasCompleted() const;
+
+ /**
+ * @return True if there are tasks pending execution (need to awaited on).
+ */
+ bool hasPending() const;
+
+ /**
+ * @return True if there are any tasks (hasCompleted() || hasPending()).
+ */
+ bool hasNext() const;
+
+ /**
+ * Await for another task to complete. Will not await if the result is
+ * already available.
+ *
+ * @return result of the task completed.
+ * @throw exception thrown by the task.
+ */
+ T awaitNext();
+
+ /**
+ * Await until the specified number of tasks completes or there are no
+ * tasks left to await for.
+ * Note: Will not await if there are already the specified number of tasks
+ * available.
+ *
+ * @param n Number of tasks to await for completition.
+ */
+ void reserve(size_t n);
+
+ /**
+ * @return id of the last task that was processed by awaitNext().
+ */
+ size_t getTaskID() const;
+
+ private:
+ template <class InputIterator>
+ friend TaskIterator<
+ typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type>
+ addTasks(InputIterator first, InputIterator last);
+
+ struct Context {
+ std::vector<std::pair<size_t, folly::Try<T>>> results;
+ folly::Optional<Promise<void>> promise;
+ size_t totalTasks{0};
+ size_t tasksConsumed{0};
+ size_t tasksToFulfillPromise{0};
+ };
+
+ std::shared_ptr<Context> context_;
+ size_t id_;
+
+ explicit TaskIterator(std::shared_ptr<Context> context);
+
+ folly::Try<T> awaitNextResult();
+};
+
+}}
+
+#include <folly/experimental/fibers/AddTasks-inl.h>
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/experimental/fibers/Fiber.h>
+#include <folly/experimental/fibers/FiberManager.h>
+
+namespace folly { namespace fibers {
+
+inline Baton::Baton() : Baton(NO_WAITER) {
+ assert(Baton(NO_WAITER).futex_.futex == static_cast<uint32_t>(NO_WAITER));
+ assert(Baton(POSTED).futex_.futex == static_cast<uint32_t>(POSTED));
+ assert(Baton(TIMEOUT).futex_.futex == static_cast<uint32_t>(TIMEOUT));
+ assert(Baton(THREAD_WAITING).futex_.futex ==
+ static_cast<uint32_t>(THREAD_WAITING));
+
+ assert(futex_.futex.is_lock_free());
+ assert(waitingFiber_.is_lock_free());
+}
+
+template <typename F>
+void Baton::wait(F&& mainContextFunc) {
+ auto fm = FiberManager::getFiberManagerUnsafe();
+ if (!fm || !fm->activeFiber_) {
+ mainContextFunc();
+ return waitThread();
+ }
+
+ return waitFiber(*fm, std::forward<F>(mainContextFunc));
+}
+
+template <typename F>
+void Baton::waitFiber(FiberManager& fm, F&& mainContextFunc) {
+ auto& waitingFiber = waitingFiber_;
+ auto f = [&mainContextFunc, &waitingFiber](Fiber& fiber) mutable {
+ auto baton_fiber = waitingFiber.load();
+ do {
+ if (LIKELY(baton_fiber == NO_WAITER)) {
+ continue;
+ } else if (baton_fiber == POSTED || baton_fiber == TIMEOUT) {
+ fiber.setData(0);
+ break;
+ } else {
+ throw std::logic_error("Some Fiber is already waiting on this Baton.");
+ }
+ } while(!waitingFiber.compare_exchange_weak(
+ baton_fiber,
+ reinterpret_cast<intptr_t>(&fiber)));
+
+ mainContextFunc();
+ };
+
+ fm.awaitFunc_ = std::ref(f);
+ fm.activeFiber_->preempt(Fiber::AWAITING);
+}
+
+template <typename F>
+bool Baton::timed_wait(TimeoutController::Duration timeout,
+ F&& mainContextFunc) {
+ auto fm = FiberManager::getFiberManagerUnsafe();
+
+ if (!fm || !fm->activeFiber_) {
+ mainContextFunc();
+ return timedWaitThread(timeout);
+ }
+
+ auto& baton = *this;
+ bool canceled = false;
+ auto timeoutFunc = [&baton, &canceled]() mutable {
+ baton.postHelper(TIMEOUT);
+ canceled = true;
+ };
+
+ auto id = fm->timeoutManager_->registerTimeout(
+ std::ref(timeoutFunc), timeout);
+
+ waitFiber(*fm, std::move(mainContextFunc));
+
+ auto posted = waitingFiber_ == POSTED;
+
+ if (!canceled) {
+ fm->timeoutManager_->cancel(id);
+ }
+
+ return posted;
+}
+
+template<typename C, typename D>
+bool Baton::timed_wait(const std::chrono::time_point<C,D>& timeout) {
+ auto now = C::now();
+
+ if (LIKELY(now <= timeout)) {
+ return timed_wait(
+ std::chrono::duration_cast<std::chrono::milliseconds>(timeout - now));
+ } else {
+ return timed_wait(TimeoutController::Duration(0));
+ }
+}
+
+
+}}
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "Baton.h"
+
+#include <folly/detail/MemoryIdler.h>
+
+namespace folly { namespace fibers {
+
+void Baton::wait() {
+ wait([](){});
+}
+
+bool Baton::timed_wait(TimeoutController::Duration timeout) {
+ return timed_wait(timeout, [](){});
+}
+
+void Baton::waitThread() {
+ if (spinWaitForEarlyPost()) {
+ assert(waitingFiber_.load(std::memory_order_acquire) == POSTED);
+ return;
+ }
+
+ auto fiber = waitingFiber_.load();
+
+ if (LIKELY(fiber == NO_WAITER &&
+ waitingFiber_.compare_exchange_strong(fiber, THREAD_WAITING))) {
+ do {
+ folly::detail::MemoryIdler::futexWait(futex_.futex, THREAD_WAITING);
+ fiber = waitingFiber_.load(std::memory_order_relaxed);
+ } while (fiber == THREAD_WAITING);
+ }
+
+ if (LIKELY(fiber == POSTED)) {
+ return;
+ }
+
+ // Handle errors
+ if (fiber == TIMEOUT) {
+ throw std::logic_error("Thread baton can't have timeout status");
+ }
+ if (fiber == THREAD_WAITING) {
+ throw std::logic_error("Other thread is already waiting on this baton");
+ }
+ throw std::logic_error("Other fiber is already waiting on this baton");
+}
+
+bool Baton::spinWaitForEarlyPost() {
+ static_assert(PreBlockAttempts > 0,
+ "isn't this assert clearer than an uninitialized variable warning?");
+ for (int i = 0; i < PreBlockAttempts; ++i) {
+ if (try_wait()) {
+ // hooray!
+ return true;
+ }
+#if FOLLY_X64
+ // The pause instruction is the polite way to spin, but it doesn't
+ // actually affect correctness to omit it if we don't have it.
+ // Pausing donates the full capabilities of the current core to
+ // its other hyperthreads for a dozen cycles or so
+ asm volatile ("pause");
+#endif
+ }
+
+ return false;
+}
+
+bool Baton::timedWaitThread(TimeoutController::Duration timeout) {
+ if (spinWaitForEarlyPost()) {
+ assert(waitingFiber_.load(std::memory_order_acquire) == POSTED);
+ return true;
+ }
+
+ auto fiber = waitingFiber_.load();
+
+ if (LIKELY(fiber == NO_WAITER &&
+ waitingFiber_.compare_exchange_strong(fiber, THREAD_WAITING))) {
+ auto deadline = TimeoutController::Clock::now() + timeout;
+ do {
+ const auto wait_rv =
+ futex_.futex.futexWaitUntil(THREAD_WAITING, deadline);
+ if (wait_rv == folly::detail::FutexResult::TIMEDOUT) {
+ return false;
+ }
+ fiber = waitingFiber_.load(std::memory_order_relaxed);
+ } while (fiber == THREAD_WAITING);
+ }
+
+ if (LIKELY(fiber == POSTED)) {
+ return true;
+ }
+
+ // Handle errors
+ if (fiber == TIMEOUT) {
+ throw std::logic_error("Thread baton can't have timeout status");
+ }
+ if (fiber == THREAD_WAITING) {
+ throw std::logic_error("Other thread is already waiting on this baton");
+ }
+ throw std::logic_error("Other fiber is already waiting on this baton");
+}
+
+void Baton::post() {
+ postHelper(POSTED);
+}
+
+void Baton::postHelper(intptr_t new_value) {
+ auto fiber = waitingFiber_.load();
+
+ do {
+ if (fiber == THREAD_WAITING) {
+ assert(new_value == POSTED);
+
+ return postThread();
+ }
+
+ if (fiber == POSTED || fiber == TIMEOUT) {
+ return;
+ }
+ } while (!waitingFiber_.compare_exchange_weak(fiber, new_value));
+
+ if (fiber != NO_WAITER) {
+ reinterpret_cast<Fiber*>(fiber)->setData(0);
+ }
+}
+
+bool Baton::try_wait() {
+ auto state = waitingFiber_.load();
+ return state == POSTED;
+}
+
+void Baton::postThread() {
+ auto expected = THREAD_WAITING;
+
+ if (!waitingFiber_.compare_exchange_strong(expected, POSTED)) {
+ return;
+ }
+
+ futex_.futex.futexWake(1);
+}
+
+void Baton::reset() {
+ waitingFiber_.store(NO_WAITER, std::memory_order_relaxed);;
+}
+
+}}
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <atomic>
+
+#include <folly/detail/Futex.h>
+#include <folly/experimental/fibers/TimeoutController.h>
+
+namespace folly { namespace fibers {
+
+class Fiber;
+
+/**
+ * @class Baton
+ *
+ * Primitive which allows to put current Fiber to sleep and wake it from another
+ * Fiber/thread.
+ */
+class Baton {
+ public:
+ Baton();
+
+ ~Baton() {}
+
+ /**
+ * Puts active fiber to sleep. Returns when post is called.
+ */
+ void wait();
+
+ /**
+ * Puts active fiber to sleep. Returns when post is called.
+ *
+ * @param mainContextFunc this function is immediately executed on the main
+ * context.
+ */
+ template <typename F>
+ void wait(F&& mainContextFunc);
+
+ /**
+ * This is here only not break tao/locks. Please don't use it, because it is
+ * inefficient when used on Fibers.
+ */
+ template<typename C, typename D = typename C::duration>
+ bool timed_wait(const std::chrono::time_point<C,D>& timeout);
+
+ /**
+ * Puts active fiber to sleep. Returns when post is called.
+ *
+ * @param timeout Baton will be automatically awaken if timeout is hit
+ *
+ * @return true if was posted, false if timeout expired
+ */
+ bool timed_wait(TimeoutController::Duration timeout);
+
+ /**
+ * Puts active fiber to sleep. Returns when post is called.
+ *
+ * @param timeout Baton will be automatically awaken if timeout is hit
+ * @param mainContextFunc this function is immediately executed on the main
+ * context.
+ *
+ * @return true if was posted, false if timeout expired
+ */
+ template <typename F>
+ bool timed_wait(TimeoutController::Duration timeout, F&& mainContextFunc);
+
+ /**
+ * Checks if the baton has been posted without blocking.
+ * @return true iff the baton has been posted.
+ */
+ bool try_wait();
+
+ /**
+ * Wakes up Fiber which was waiting on this Baton (or if no Fiber is waiting,
+ * next wait() call will return immediately).
+ */
+ void post();
+
+ /**
+ * Reset's the baton (equivalent to destroying the object and constructing
+ * another one in place).
+ * Caller is responsible for making sure no one is waiting on/posting the
+ * baton when reset() is called.
+ */
+ void reset();
+
+ private:
+ enum {
+ /**
+ * Must be positive. If multiple threads are actively using a
+ * higher-level data structure that uses batons internally, it is
+ * likely that the post() and wait() calls happen almost at the same
+ * time. In this state, we lose big 50% of the time if the wait goes
+ * to sleep immediately. On circa-2013 devbox hardware it costs about
+ * 7 usec to FUTEX_WAIT and then be awoken (half the t/iter as the
+ * posix_sem_pingpong test in BatonTests). We can improve our chances
+ * of early post by spinning for a bit, although we have to balance
+ * this against the loss if we end up sleeping any way. Spins on this
+ * hw take about 7 nanos (all but 0.5 nanos is the pause instruction).
+ * We give ourself 300 spins, which is about 2 usec of waiting. As a
+ * partial consolation, since we are using the pause instruction we
+ * are giving a speed boost to the colocated hyperthread.
+ */
+ PreBlockAttempts = 300,
+ };
+
+ explicit Baton(intptr_t state) : waitingFiber_(state) {};
+
+ void postHelper(intptr_t new_value);
+ void postThread();
+ void waitThread();
+
+ template <typename F>
+ inline void waitFiber(FiberManager& fm, F&& mainContextFunc);
+ /**
+ * Spin for "some time" (see discussion on PreBlockAttempts) waiting
+ * for a post.
+ * @return true if we received a post the spin wait, false otherwise. If the
+ * function returns true then Baton state is guaranteed to be POSTED
+ */
+ bool spinWaitForEarlyPost();
+
+ bool timedWaitThread(TimeoutController::Duration timeout);
+
+ static constexpr intptr_t NO_WAITER = 0;
+ static constexpr intptr_t POSTED = -1;
+ static constexpr intptr_t TIMEOUT = -2;
+ static constexpr intptr_t THREAD_WAITING = -3;
+
+ union {
+ std::atomic<intptr_t> waitingFiber_;
+ struct {
+ folly::detail::Futex<> futex;
+ int32_t _unused_packing;
+ } futex_;
+ };
+};
+
+}}
+
+#include <folly/experimental/fibers/Baton-inl.h>
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <boost/context/all.hpp>
+#include <boost/version.hpp>
+
+/**
+ * Wrappers for different versions of boost::context library
+ * API reference for different versions
+ * Boost 1.51: http://www.boost.org/doc/libs/1_51_0/libs/context/doc/html/context/context/boost_fcontext.html
+ * Boost 1.52: http://www.boost.org/doc/libs/1_52_0/libs/context/doc/html/context/context/boost_fcontext.html
+ * Boost 1.56: http://www.boost.org/doc/libs/1_56_0/libs/context/doc/html/context/context/boost_fcontext.html
+ */
+
+namespace folly { namespace fibers {
+
+struct FContext {
+ public:
+
+#if BOOST_VERSION >= 105200
+ using ContextStruct = boost::context::fcontext_t;
+#else
+ using ContextStruct = boost::ctx::fcontext_t;
+#endif
+
+ void* stackLimit() const {
+ return stackLimit_;
+ }
+
+ void* stackBase() const {
+ return stackBase_;
+ }
+
+ private:
+ void* stackLimit_;
+ void* stackBase_;
+
+#if BOOST_VERSION >= 105600
+ ContextStruct context_;
+#elif BOOST_VERSION >= 105200
+ ContextStruct* context_;
+#else
+ ContextStruct context_;
+#endif
+
+ friend intptr_t jumpContext(FContext* oldC, FContext::ContextStruct* newC,
+ intptr_t p);
+ friend intptr_t jumpContext(FContext::ContextStruct* oldC, FContext* newC,
+ intptr_t p);
+ friend FContext makeContext(void* stackLimit, size_t stackSize,
+ void(*fn)(intptr_t));
+};
+
+inline intptr_t jumpContext(FContext* oldC, FContext::ContextStruct* newC,
+ intptr_t p) {
+
+#if BOOST_VERSION >= 105600
+ return boost::context::jump_fcontext(&oldC->context_, *newC, p);
+#elif BOOST_VERSION >= 105200
+ return boost::context::jump_fcontext(oldC->context_, newC, p);
+#else
+ return jump_fcontext(&oldC->context_, newC, p);
+#endif
+
+}
+
+inline intptr_t jumpContext(FContext::ContextStruct* oldC, FContext* newC,
+ intptr_t p) {
+
+#if BOOST_VERSION >= 105200
+ return boost::context::jump_fcontext(oldC, newC->context_, p);
+#else
+ return jump_fcontext(oldC, &newC->context_, p);
+#endif
+
+}
+
+inline FContext makeContext(void* stackLimit, size_t stackSize,
+ void(*fn)(intptr_t)) {
+ FContext res;
+ res.stackLimit_ = stackLimit;
+ res.stackBase_ = static_cast<unsigned char*>(stackLimit) + stackSize;
+
+#if BOOST_VERSION >= 105200
+ res.context_ = boost::context::make_fcontext(res.stackBase_, stackSize, fn);
+#else
+ res.context_.fc_stack.limit = stackLimit;
+ res.context_.fc_stack.base = res.stackBase_;
+ make_fcontext(&res.context_, fn);
+#endif
+
+ return res;
+}
+
+}} // folly::fibers
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/Memory.h>
+#include <folly/experimental/fibers/EventBaseLoopController.h>
+#include <folly/experimental/fibers/FiberManager.h>
+#include <folly/io/async/EventBase.h>
+
+namespace folly { namespace fibers {
+
+class EventBaseLoopController::ControllerCallback :
+ public folly::EventBase::LoopCallback {
+ public:
+ explicit ControllerCallback(EventBaseLoopController& controller)
+ : controller_(controller) {}
+
+ void runLoopCallback() noexcept override {
+ controller_.runLoop();
+ }
+ private:
+ EventBaseLoopController& controller_;
+};
+
+inline EventBaseLoopController::EventBaseLoopController()
+ : callback_(folly::make_unique<ControllerCallback>(*this)) {
+}
+
+inline EventBaseLoopController::~EventBaseLoopController() {
+ callback_->cancelLoopCallback();
+}
+
+inline void EventBaseLoopController::attachEventBase(
+ folly::EventBase& eventBase) {
+
+ if (eventBase_ != nullptr) {
+ LOG(ERROR) << "Attempt to reattach EventBase to LoopController";
+ }
+
+ eventBase_ = &eventBase;
+
+ eventBaseAttached_ = true;
+
+ if (awaitingScheduling_) {
+ schedule();
+ }
+}
+
+inline void EventBaseLoopController::setFiberManager(FiberManager* fm) {
+ fm_ = fm;
+}
+
+inline void EventBaseLoopController::schedule() {
+ if (eventBase_ == nullptr) {
+ // In this case we need to postpone scheduling.
+ awaitingScheduling_ = true;
+ } else {
+ // Schedule it to run in current iteration.
+ eventBase_->runInLoop(callback_.get(), true);
+ awaitingScheduling_ = false;
+ }
+}
+
+inline void EventBaseLoopController::cancel() {
+ callback_->cancelLoopCallback();
+}
+
+inline void EventBaseLoopController::runLoop() {
+ fm_->loopUntilNoReady();
+}
+
+inline void EventBaseLoopController::scheduleThreadSafe() {
+ /* The only way we could end up here is if
+ 1) Fiber thread creates a fiber that awaits (which means we must
+ have already attached, fiber thread wouldn't be running).
+ 2) We move the promise to another thread (this move is a memory fence)
+ 3) We fulfill the promise from the other thread. */
+ assert(eventBaseAttached_);
+ eventBase_->runInEventBaseThread([this] () { runLoop(); });
+}
+
+inline void EventBaseLoopController::timedSchedule(std::function<void()> func,
+ TimePoint time) {
+ assert(eventBaseAttached_);
+
+ // We want upper bound for the cast, thus we just add 1
+ auto delay_ms = std::chrono::duration_cast<
+ std::chrono::milliseconds>(time - Clock::now()).count() + 1;
+ // If clock is not monotonic
+ delay_ms = std::max(delay_ms, 0L);
+ eventBase_->tryRunAfterDelay(func, delay_ms);
+}
+
+}} // folly::fibers
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+#include <folly/experimental/fibers/LoopController.h>
+
+namespace folly {
+class EventBase;
+}
+
+namespace folly { namespace fibers {
+
+class FiberManager;
+
+class EventBaseLoopController : public LoopController {
+ public:
+ explicit EventBaseLoopController();
+ ~EventBaseLoopController();
+
+ /**
+ * Attach EventBase after LoopController was created.
+ */
+ void attachEventBase(folly::EventBase& eventBase);
+
+ folly::EventBase* getEventBase() {
+ return eventBase_;
+ }
+
+ private:
+ class ControllerCallback;
+
+ bool awaitingScheduling_{false};
+ folly::EventBase* eventBase_{nullptr};
+ std::unique_ptr<ControllerCallback> callback_;
+ FiberManager* fm_{nullptr};
+ std::atomic<bool> eventBaseAttached_{false};
+
+ /* LoopController interface */
+
+ void setFiberManager(FiberManager* fm) override;
+ void schedule() override;
+ void cancel() override;
+ void runLoop();
+ void scheduleThreadSafe() override;
+ void timedSchedule(std::function<void()> func, TimePoint time) override;
+
+ friend class FiberManager;
+};
+
+}} // folly::fibers
+
+#include "EventBaseLoopController-inl.h"
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <cassert>
+
+namespace folly { namespace fibers {
+
+template <typename F>
+void Fiber::setFunction(F&& func) {
+ assert(state_ == INVALID);
+ func_ = std::move(func);
+ state_ = NOT_STARTED;
+}
+
+template <typename F, typename G>
+void Fiber::setFunctionFinally(F&& resultFunc,
+ G&& finallyFunc) {
+ assert(state_ == INVALID);
+ resultFunc_ = std::move(resultFunc);
+ finallyFunc_ = std::move(finallyFunc);
+ state_ = NOT_STARTED;
+}
+
+inline void* Fiber::getUserBuffer() {
+ return &userBuffer_;
+}
+
+template <typename G>
+void Fiber::setReadyFunction(G&& func) {
+ assert(state_ == INVALID || state_ == NOT_STARTED);
+ readyFunc_ = std::move(func);
+}
+
+}} // folly::fibers
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "Fiber.h"
+
+#include <sys/syscall.h>
+#include <unistd.h>
+
+#include <algorithm>
+#include <cassert>
+#include <cstring>
+#include <stdexcept>
+
+#include <folly/Likely.h>
+#include <folly/Portability.h>
+#include <folly/experimental/fibers/BoostContextCompatibility.h>
+#include <folly/experimental/fibers/FiberManager.h>
+
+namespace folly { namespace fibers {
+
+namespace {
+static const uint64_t kMagic8Bytes = 0xfaceb00cfaceb00c;
+
+pid_t localThreadId() {
+ static thread_local pid_t threadId = syscall(SYS_gettid);
+ return threadId;
+}
+
+static void fillMagic(const FContext& context) {
+ uint64_t* begin = static_cast<uint64_t*>(context.stackLimit());
+ uint64_t* end = static_cast<uint64_t*>(context.stackBase());
+
+ std::fill(begin, end, kMagic8Bytes);
+}
+
+/* Size of the region from p + nBytes down to the last non-magic value */
+static size_t nonMagicInBytes(const FContext& context) {
+ uint64_t* begin = static_cast<uint64_t*>(context.stackLimit());
+ uint64_t* end = static_cast<uint64_t*>(context.stackBase());
+
+ auto firstNonMagic = std::find_if(
+ begin, end,
+ [](uint64_t val) {
+ return val != kMagic8Bytes;
+ }
+ );
+
+ return (end - firstNonMagic) * sizeof(uint64_t);
+}
+
+} // anonymous namespace
+
+void Fiber::setData(intptr_t data) {
+ assert(state_ == AWAITING);
+ data_ = data;
+ state_ = READY_TO_RUN;
+
+ if (LIKELY(threadId_ == localThreadId())) {
+ fiberManager_.readyFibers_.push_back(*this);
+ fiberManager_.ensureLoopScheduled();
+ } else {
+ fiberManager_.remoteReadyInsert(this);
+ }
+}
+
+Fiber::Fiber(FiberManager& fiberManager) :
+ fiberManager_(fiberManager) {
+
+ auto size = fiberManager_.options_.stackSize;
+ auto limit = fiberManager_.stackAllocator_.allocate(size);
+
+ fcontext_ = makeContext(limit, size, &Fiber::fiberFuncHelper);
+
+ if (UNLIKELY(fiberManager_.options_.debugRecordStackUsed)) {
+ fillMagic(fcontext_);
+ }
+}
+
+Fiber::~Fiber() {
+ fiberManager_.stackAllocator_.deallocate(
+ static_cast<unsigned char*>(fcontext_.stackLimit()),
+ fiberManager_.options_.stackSize);
+}
+
+void Fiber::recordStackPosition() {
+ int stackDummy;
+ fiberManager_.stackHighWatermark_ =
+ std::max(fiberManager_.stackHighWatermark_,
+ static_cast<size_t>(
+ static_cast<unsigned char*>(fcontext_.stackBase()) -
+ static_cast<unsigned char*>(
+ static_cast<void*>(&stackDummy))));
+}
+
+void Fiber::fiberFuncHelper(intptr_t fiber) {
+ reinterpret_cast<Fiber*>(fiber)->fiberFunc();
+}
+
+/*
+ * Some weird bug in ASAN causes fiberFunc to allocate boundless amounts of
+ * memory inside __asan_handle_no_return. Work around this in ASAN builds by
+ * tricking the compiler into thinking it may, someday, return.
+ */
+#ifdef FOLLY_SANITIZE_ADDRESS
+volatile bool loopForever = true;
+#else
+static constexpr bool loopForever = true;
+#endif
+
+void Fiber::fiberFunc() {
+ while (loopForever) {
+ assert(state_ == NOT_STARTED);
+
+ threadId_ = localThreadId();
+ state_ = RUNNING;
+
+ try {
+ if (resultFunc_) {
+ assert(finallyFunc_);
+ assert(!func_);
+
+ resultFunc_();
+ } else {
+ assert(func_);
+ func_();
+ }
+ } catch (...) {
+ fiberManager_.exceptionCallback_(std::current_exception(),
+ "running Fiber func_/resultFunc_");
+ }
+
+ if (UNLIKELY(fiberManager_.options_.debugRecordStackUsed)) {
+ fiberManager_.stackHighWatermark_ =
+ std::max(fiberManager_.stackHighWatermark_,
+ nonMagicInBytes(fcontext_));
+ }
+
+ state_ = INVALID;
+
+ fiberManager_.activeFiber_ = nullptr;
+
+ auto fiber = reinterpret_cast<Fiber*>(
+ jumpContext(&fcontext_, &fiberManager_.mainContext_, 0));
+ assert(fiber == this);
+ }
+}
+
+intptr_t Fiber::preempt(State state) {
+ assert(fiberManager_.activeFiber_ == this);
+ assert(state_ == RUNNING);
+ assert(state != RUNNING);
+
+ fiberManager_.activeFiber_ = nullptr;
+ state_ = state;
+
+ recordStackPosition();
+
+ auto ret = jumpContext(&fcontext_, &fiberManager_.mainContext_, 0);
+
+ assert(fiberManager_.activeFiber_ == this);
+ assert(state_ == READY_TO_RUN);
+ state_ = RUNNING;
+
+ return ret;
+}
+
+}}
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <functional>
+
+#include <boost/context/all.hpp>
+#include <boost/version.hpp>
+#include <folly/AtomicLinkedList.h>
+#include <folly/IntrusiveList.h>
+#include <folly/experimental/fibers/BoostContextCompatibility.h>
+
+namespace folly { namespace fibers {
+
+class Baton;
+class FiberManager;
+
+/**
+ * @class Fiber
+ * @brief Fiber object used by FiberManager to execute tasks.
+ *
+ * Each Fiber object can be executing at most one task at a time. In active
+ * phase it is running the task function and keeps its context.
+ * Fiber is also used to pass data to blocked task and thus unblock it.
+ * Each Fiber may be associated with a single FiberManager.
+ */
+class Fiber {
+ public:
+ /**
+ * Sets data for the blocked task
+ *
+ * @param data this data will be returned by await() when task is resumed.
+ */
+ void setData(intptr_t data);
+
+ Fiber(const Fiber&) = delete;
+ Fiber& operator=(const Fiber&) = delete;
+
+ ~Fiber();
+ private:
+ enum State {
+ INVALID, /**< Does't have task function */
+ NOT_STARTED, /**< Has task function, not started */
+ READY_TO_RUN, /**< Was started, blocked, then unblocked */
+ RUNNING, /**< Is running right now */
+ AWAITING, /**< Is currently blocked */
+ AWAITING_IMMEDIATE, /**< Was preempted to run an immediate function,
+ and will be resumed right away */
+ };
+
+ State state_{INVALID}; /**< current Fiber state */
+
+ friend class Baton;
+ friend class FiberManager;
+
+ explicit Fiber(FiberManager& fiberManager);
+
+ template <typename F>
+ void setFunction(F&& func);
+
+ template <typename F, typename G>
+ void setFunctionFinally(F&& func, G&& finally);
+
+ template <typename G>
+ void setReadyFunction(G&& func);
+
+ static void fiberFuncHelper(intptr_t fiber);
+ void fiberFunc();
+
+ /**
+ * Switch out of fiber context into the main context,
+ * performing necessary housekeeping for the new state.
+ *
+ * @param state New state, must not be RUNNING.
+ *
+ * @return The value passed back from the main context.
+ */
+ intptr_t preempt(State state);
+
+ /**
+ * Examines how much of the stack we used at this moment and
+ * registers with the FiberManager (for monitoring).
+ */
+ void recordStackPosition();
+
+ FiberManager& fiberManager_; /**< Associated FiberManager */
+ FContext fcontext_; /**< current task execution context */
+ intptr_t data_; /**< Used to keep some data with the Fiber */
+ std::function<void()> func_; /**< task function */
+ std::function<void()> readyFunc_; /**< function to be executed before jumping
+ to this fiber */
+
+ /**
+ * Points to next fiber in remote ready list
+ */
+ folly::AtomicLinkedListHook<Fiber> nextRemoteReady_;
+
+ static constexpr size_t kUserBufferSize = 256;
+ std::aligned_storage<kUserBufferSize>::type userBuffer_;
+
+ void* getUserBuffer();
+
+ std::function<void()> resultFunc_;
+ std::function<void()> finallyFunc_;
+
+ folly::IntrusiveListHook listHook_; /**< list hook for different FiberManager
+ queues */
+ pid_t threadId_{0};
+};
+
+}}
+
+#include <folly/experimental/fibers/Fiber-inl.h>
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <cassert>
+
+#include <folly/Memory.h>
+#include <folly/Optional.h>
+#include <folly/Portability.h>
+#include <folly/ScopeGuard.h>
+#include <folly/experimental/fibers/Baton.h>
+#include <folly/experimental/fibers/Fiber.h>
+#include <folly/experimental/fibers/Promise.h>
+#include <folly/experimental/fibers/LoopController.h>
+#include <folly/futures/Try.h>
+
+namespace folly { namespace fibers {
+
+inline void FiberManager::ensureLoopScheduled() {
+ if (isLoopScheduled_) {
+ return;
+ }
+
+ isLoopScheduled_ = true;
+ loopController_->schedule();
+}
+
+inline void FiberManager::runReadyFiber(Fiber* fiber) {
+ assert(fiber->state_ == Fiber::NOT_STARTED ||
+ fiber->state_ == Fiber::READY_TO_RUN);
+
+ while (fiber->state_ == Fiber::NOT_STARTED ||
+ fiber->state_ == Fiber::READY_TO_RUN) {
+ activeFiber_ = fiber;
+ if (fiber->readyFunc_) {
+ fiber->readyFunc_();
+ }
+ jumpContext(&mainContext_, &fiber->fcontext_, fiber->data_);
+ if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) {
+ try {
+ immediateFunc_();
+ } catch (...) {
+ exceptionCallback_(std::current_exception(), "running immediateFunc_");
+ }
+ immediateFunc_ = nullptr;
+ fiber->state_ = Fiber::READY_TO_RUN;
+ }
+ }
+
+ if (fiber->state_ == Fiber::AWAITING) {
+ awaitFunc_(*fiber);
+ awaitFunc_ = nullptr;
+ } else if (fiber->state_ == Fiber::INVALID) {
+ assert(fibersActive_ > 0);
+ --fibersActive_;
+ // Making sure that task functor is deleted once task is complete.
+ // NOTE: we must do it on main context, as the fiber is not
+ // running at this point.
+ fiber->func_ = nullptr;
+ fiber->resultFunc_ = nullptr;
+ if (fiber->finallyFunc_) {
+ try {
+ fiber->finallyFunc_();
+ } catch (...) {
+ exceptionCallback_(std::current_exception(), "running finallyFunc_");
+ }
+ fiber->finallyFunc_ = nullptr;
+ }
+
+ if (fibersPoolSize_ < options_.maxFibersPoolSize) {
+ fibersPool_.push_front(*fiber);
+ ++fibersPoolSize_;
+ } else {
+ delete fiber;
+ assert(fibersAllocated_ > 0);
+ --fibersAllocated_;
+ }
+ }
+}
+
+inline bool FiberManager::loopUntilNoReady() {
+ SCOPE_EXIT {
+ isLoopScheduled_ = false;
+ currentFiberManager_ = nullptr;
+ };
+
+ currentFiberManager_ = this;
+
+ bool hadRemoteFiber = true;
+ while (hadRemoteFiber) {
+ hadRemoteFiber = false;
+
+ while (!readyFibers_.empty()) {
+ auto& fiber = readyFibers_.front();
+ readyFibers_.pop_front();
+ runReadyFiber(&fiber);
+ }
+
+ remoteReadyQueue_.sweep(
+ [this, &hadRemoteFiber] (Fiber* fiber) {
+ runReadyFiber(fiber);
+ hadRemoteFiber = true;
+ }
+ );
+
+ remoteTaskQueue_.sweep(
+ [this, &hadRemoteFiber] (RemoteTask* taskPtr) {
+ std::unique_ptr<RemoteTask> task(taskPtr);
+ auto fiber = getFiber();
+ fiber->setFunction(std::move(task->func));
+ fiber->data_ = reinterpret_cast<intptr_t>(fiber);
+ runReadyFiber(fiber);
+ hadRemoteFiber = true;
+ }
+ );
+ }
+
+ return fibersActive_ > 0;
+}
+
+// We need this to be in a struct, not inlined in addTask, because clang crashes
+// otherwise.
+template <typename F>
+struct FiberManager::AddTaskHelper {
+ class Func;
+
+ static constexpr bool allocateInBuffer =
+ sizeof(Func) <= Fiber::kUserBufferSize;
+
+ class Func {
+ public:
+ Func(F&& func, FiberManager& fm) :
+ func_(std::forward<F>(func)), fm_(fm) {}
+
+ void operator()() {
+ try {
+ func_();
+ } catch (...) {
+ fm_.exceptionCallback_(std::current_exception(),
+ "running Func functor");
+ }
+ if (allocateInBuffer) {
+ this->~Func();
+ } else {
+ delete this;
+ }
+ }
+
+ private:
+ F func_;
+ FiberManager& fm_;
+ };
+};
+
+template <typename F>
+void FiberManager::addTask(F&& func) {
+ typedef AddTaskHelper<F> Helper;
+
+ auto fiber = getFiber();
+
+ if (Helper::allocateInBuffer) {
+ auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
+ new (funcLoc) typename Helper::Func(std::forward<F>(func), *this);
+
+ fiber->setFunction(std::ref(*funcLoc));
+ } else {
+ auto funcLoc = new typename Helper::Func(std::forward<F>(func), *this);
+
+ fiber->setFunction(std::ref(*funcLoc));
+ }
+
+ fiber->data_ = reinterpret_cast<intptr_t>(fiber);
+ readyFibers_.push_back(*fiber);
+
+ ensureLoopScheduled();
+}
+
+template <typename F, typename G>
+void FiberManager::addTaskReadyFunc(F&& func, G&& readyFunc) {
+ auto fiber = getFiber();
+ fiber->setFunction(std::forward<F>(func));
+ fiber->setReadyFunction(std::forward<G>(readyFunc));
+
+ fiber->data_ = reinterpret_cast<intptr_t>(fiber);
+ readyFibers_.push_back(*fiber);
+
+ ensureLoopScheduled();
+}
+
+template <typename F>
+void FiberManager::addTaskRemote(F&& func) {
+ auto task = folly::make_unique<RemoteTask>(std::move(func));
+ if (remoteTaskQueue_.insertHead(task.release())) {
+ loopController_->scheduleThreadSafe();
+ }
+}
+
+template <typename X>
+struct IsRvalueRefTry { static const bool value = false; };
+template <typename T>
+struct IsRvalueRefTry<folly::Try<T>&&> { static const bool value = true; };
+
+// We need this to be in a struct, not inlined in addTaskFinally, because clang
+// crashes otherwise.
+template <typename F, typename G>
+struct FiberManager::AddTaskFinallyHelper {
+ class Func;
+ class Finally;
+
+ typedef typename std::result_of<F()>::type Result;
+
+ static constexpr bool allocateInBuffer =
+ sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize;
+
+ class Finally {
+ public:
+ Finally(G&& finally,
+ FiberManager& fm) :
+ finally_(std::move(finally)),
+ fm_(fm) {
+ }
+
+ void operator()() {
+ try {
+ finally_(std::move(*result_));
+ } catch (...) {
+ fm_.exceptionCallback_(std::current_exception(),
+ "running Finally functor");
+ }
+
+ if (allocateInBuffer) {
+ this->~Finally();
+ } else {
+ delete this;
+ }
+ }
+
+ private:
+ friend class Func;
+
+ G finally_;
+ folly::Optional<folly::Try<Result>> result_;
+ FiberManager& fm_;
+ };
+
+ class Func {
+ public:
+ Func(F&& func, Finally& finally) :
+ func_(std::move(func)), result_(finally.result_) {}
+
+ void operator()() {
+ result_ = folly::makeTryFunction(std::move(func_));
+
+ if (allocateInBuffer) {
+ this->~Func();
+ } else {
+ delete this;
+ }
+ }
+
+ private:
+ F func_;
+ folly::Optional<folly::Try<Result>>& result_;
+ };
+};
+
+template <typename F, typename G>
+void FiberManager::addTaskFinally(F&& func, G&& finally) {
+ typedef typename std::result_of<F()>::type Result;
+
+ static_assert(
+ IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
+ "finally(arg): arg must be Try<T>&&");
+ static_assert(
+ std::is_convertible<
+ Result,
+ typename std::remove_reference<
+ typename FirstArgOf<G>::type
+ >::type::element_type
+ >::value,
+ "finally(Try<T>&&): T must be convertible from func()'s return type");
+
+ auto fiber = getFiber();
+
+ typedef AddTaskFinallyHelper<F,G> Helper;
+
+ if (Helper::allocateInBuffer) {
+ auto funcLoc = static_cast<typename Helper::Func*>(
+ fiber->getUserBuffer());
+ auto finallyLoc = static_cast<typename Helper::Finally*>(
+ static_cast<void*>(funcLoc + 1));
+
+ new (finallyLoc) typename Helper::Finally(std::move(finally), *this);
+ new (funcLoc) typename Helper::Func(std::move(func), *finallyLoc);
+
+ fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
+ } else {
+ auto finallyLoc = new typename Helper::Finally(std::move(finally), *this);
+ auto funcLoc = new typename Helper::Func(std::move(func), *finallyLoc);
+
+ fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
+ }
+
+ fiber->data_ = reinterpret_cast<intptr_t>(fiber);
+ readyFibers_.push_back(*fiber);
+
+ ensureLoopScheduled();
+}
+
+template <typename F>
+typename std::result_of<F()>::type
+FiberManager::runInMainContext(F&& func) {
+ return runInMainContextHelper(std::forward<F>(func));
+}
+
+template <typename F>
+inline typename std::enable_if<
+ !std::is_same<typename std::result_of<F()>::type, void>::value,
+ typename std::result_of<F()>::type>::type
+FiberManager::runInMainContextHelper(F&& func) {
+ if (UNLIKELY(activeFiber_ == nullptr)) {
+ return func();
+ }
+
+ typedef typename std::result_of<F()>::type Result;
+
+ folly::Try<Result> result;
+ auto f = [&func, &result]() mutable {
+ result = folly::makeTryFunction(std::forward<F>(func));
+ };
+
+ immediateFunc_ = std::ref(f);
+ activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
+
+ return std::move(result.value());
+}
+
+template <typename F>
+inline typename std::enable_if<
+ std::is_same<typename std::result_of<F()>::type, void>::value,
+ void>::type
+FiberManager::runInMainContextHelper(F&& func) {
+ if (UNLIKELY(activeFiber_ == nullptr)) {
+ func();
+ return;
+ }
+
+ immediateFunc_ = std::ref(func);
+ activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
+}
+
+inline FiberManager& FiberManager::getFiberManager() {
+ assert(currentFiberManager_ != nullptr);
+ return *currentFiberManager_;
+}
+
+inline FiberManager* FiberManager::getFiberManagerUnsafe() {
+ return currentFiberManager_;
+}
+
+inline bool FiberManager::hasActiveFiber() {
+ return activeFiber_ != nullptr;
+}
+
+template <typename F>
+typename FirstArgOf<F>::type::value_type
+inline await(F&& func) {
+ typedef typename FirstArgOf<F>::type::value_type Result;
+
+ folly::Try<Result> result;
+
+ Baton baton;
+ baton.wait([&func, &result, &baton]() mutable {
+ func(Promise<Result>(result, baton));
+ });
+
+ return folly::moveFromTry(std::move(result));
+}
+
+}}
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "FiberManager.h"
+
+#include <sys/syscall.h>
+#include <unistd.h>
+
+#include <cassert>
+#include <stdexcept>
+
+#include <glog/logging.h>
+
+#include <folly/experimental/fibers/Fiber.h>
+#include <folly/experimental/fibers/LoopController.h>
+
+namespace folly { namespace fibers {
+
+__thread FiberManager* FiberManager::currentFiberManager_ = nullptr;
+
+FiberManager::FiberManager(std::unique_ptr<LoopController> loopController,
+ Options options) :
+ loopController_(std::move(loopController)),
+ options_(options),
+ exceptionCallback_([](std::exception_ptr e, std::string context) {
+ try {
+ std::rethrow_exception(e);
+ } catch (const std::exception& e) {
+ LOG(DFATAL) << "Exception " << typeid(e).name()
+ << " with message '" << e.what() << "' was thrown in "
+ << "FiberManager with context '" << context << "'";
+ throw;
+ } catch (...) {
+ LOG(DFATAL) << "Unknown exception was thrown in FiberManager with "
+ << "context '" << context << "'";
+ throw;
+ }
+ }),
+ timeoutManager_(std::make_shared<TimeoutController>(*loopController_)) {
+ loopController_->setFiberManager(this);
+}
+
+FiberManager::~FiberManager() {
+ if (isLoopScheduled_) {
+ loopController_->cancel();
+ }
+
+ Fiber* fiberIt;
+ Fiber* fiberItNext;
+ while (!fibersPool_.empty()) {
+ fibersPool_.pop_front_and_dispose([] (Fiber* fiber) {
+ delete fiber;
+ });
+ }
+ assert(readyFibers_.empty());
+ assert(fibersActive_ == 0);
+}
+
+LoopController& FiberManager::loopController() {
+ return *loopController_;
+}
+
+const LoopController& FiberManager::loopController() const {
+ return *loopController_;
+}
+
+bool FiberManager::hasTasks() const {
+ return fibersActive_ > 0 ||
+ !remoteReadyQueue_.empty() ||
+ !remoteTaskQueue_.empty();
+}
+
+Fiber* FiberManager::getFiber() {
+ Fiber* fiber = nullptr;
+ if (fibersPool_.empty()) {
+ fiber = new Fiber(*this);
+ ++fibersAllocated_;
+ } else {
+ fiber = &fibersPool_.front();
+ fibersPool_.pop_front();
+ assert(fibersPoolSize_ > 0);
+ --fibersPoolSize_;
+ }
+ ++fibersActive_;
+ assert(fiber);
+ return fiber;
+}
+
+void FiberManager::setExceptionCallback(FiberManager::ExceptionCallback ec) {
+ assert(ec);
+ exceptionCallback_ = std::move(ec);
+}
+
+size_t FiberManager::fibersAllocated() const {
+ return fibersAllocated_;
+}
+
+size_t FiberManager::fibersPoolSize() const {
+ return fibersPoolSize_;
+}
+
+size_t FiberManager::stackHighWatermark() const {
+ return stackHighWatermark_;
+}
+
+void FiberManager::remoteReadyInsert(Fiber* fiber) {
+ if (remoteReadyQueue_.insertHead(fiber)) {
+ loopController_->scheduleThreadSafe();
+ }
+}
+
+}}
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <functional>
+#include <memory>
+#include <queue>
+#include <unordered_set>
+#include <vector>
+
+#include <folly/AtomicLinkedList.h>
+#include <folly/Likely.h>
+#include <folly/IntrusiveList.h>
+#include <folly/futures/Try.h>
+
+#include <folly/experimental/fibers/BoostContextCompatibility.h>
+#include <folly/experimental/fibers/Fiber.h>
+#include <folly/experimental/fibers/traits.h>
+
+#ifdef USE_GUARD_ALLOCATOR
+#include <folly/experimental/fibers/GuardPageAllocator.h>
+#endif
+
+namespace folly { namespace fibers {
+
+class Baton;
+class Fiber;
+class LoopController;
+class TimeoutController;
+
+/**
+ * @class FiberManager
+ * @brief Single-threaded task execution engine.
+ *
+ * FiberManager allows semi-parallel task execution on the same thread. Each
+ * task can notify FiberManager that it is blocked on something (via await())
+ * call. This will pause execution of this task and it will be resumed only
+ * when it is unblocked (via setData()).
+ */
+class FiberManager {
+ public:
+ struct Options {
+#ifdef FOLLY_SANITIZE_ADDRESS
+ /* ASAN needs a lot of extra stack space.
+ 16x is a conservative estimate, 8x also worked with tests
+ where it mattered. Note that overallocating here does not necessarily
+ increase RSS, since unused memory is pretty much free. */
+ static constexpr size_t kDefaultStackSize{16 * 16 * 1024};
+#else
+ static constexpr size_t kDefaultStackSize{16 * 1024};
+#endif
+ /**
+ * Maximum stack size for fibers which will be used for executing all the
+ * tasks.
+ */
+ size_t stackSize{kDefaultStackSize};
+
+ /**
+ * Record exact amount of stack used.
+ *
+ * This is fairly expensive: we fill each newly allocated stack
+ * with some known value and find the boundary of unused stack
+ * with linear search every time we surrender the stack back to fibersPool.
+ */
+ bool debugRecordStackUsed{false};
+
+ /**
+ * Keep at most this many free fibers in the pool.
+ * This way the total number of fibers in the system is always bounded
+ * by the number of active fibers + maxFibersPoolSize.
+ */
+ size_t maxFibersPoolSize{1000};
+
+ constexpr Options() {}
+ };
+
+ typedef std::function<void(std::exception_ptr, std::string)>
+ ExceptionCallback;
+
+ /**
+ * Initializes, but doesn't start FiberManager loop
+ *
+ * @param options FiberManager options
+ */
+ explicit FiberManager(std::unique_ptr<LoopController> loopController,
+ Options options = Options());
+
+ ~FiberManager();
+
+ /**
+ * Controller access.
+ */
+ LoopController& loopController();
+ const LoopController& loopController() const;
+
+ /**
+ * Keeps running ready tasks until the list of ready tasks is empty.
+ *
+ * @return True if there are any waiting tasks remaining.
+ */
+ bool loopUntilNoReady();
+
+ /**
+ * @return true if there are outstanding tasks.
+ */
+ bool hasTasks() const;
+
+ /**
+ * Sets exception callback which will be called if any of the tasks throws an
+ * exception.
+ *
+ * @param ec
+ */
+ void setExceptionCallback(ExceptionCallback ec);
+
+ /**
+ * Add a new task to be executed. Must be called from FiberManager's thread.
+ *
+ * @param func Task functor; must have a signature of `void func()`.
+ * The object will be destroyed once task execution is complete.
+ */
+ template <typename F>
+ void addTask(F&& func);
+
+ /**
+ * Add a new task to be executed, along with a function readyFunc_ which needs
+ * to be executed just before jumping to the ready fiber
+ *
+ * @param func Task functor; must have a signature of `T func()` for some T.
+ * @param readyFunc functor that needs to be executed just before jumping to
+ * ready fiber on the main context. This can for example be
+ * used to set up state before starting or resuming a fiber.
+ */
+ template <typename F, typename G>
+ void addTaskReadyFunc(F&& func, G&& readyFunc);
+
+ /**
+ * Add a new task to be executed. Safe to call from other threads.
+ *
+ * @param func Task function; must have a signature of `void func()`.
+ * The object will be destroyed once task execution is complete.
+ */
+ template <typename F>
+ void addTaskRemote(F&& func);
+
+ /**
+ * Add a new task. When the task is complete, execute finally(Try<Result>&&)
+ * on the main context.
+ *
+ * @param func Task functor; must have a signature of `T func()` for some T.
+ * @param finally Finally functor; must have a signature of
+ * `void finally(Try<T>&&)` and will be passed
+ * the result of func() (including the exception if occurred).
+ */
+ template <typename F, typename G>
+ void addTaskFinally(F&& func, G&& finally);
+
+ /**
+ * If called from a fiber, immediately switches to the FiberManager's context
+ * and runs func(), going back to the Fiber's context after completion.
+ * Outside a fiber, just calls func() directly.
+ *
+ * @return value returned by func().
+ */
+ template <typename F>
+ typename std::result_of<F()>::type
+ runInMainContext(F&& func);
+
+ /**
+ * @return How many fiber objects (and stacks) has this manager allocated.
+ */
+ size_t fibersAllocated() const;
+
+ /**
+ * @return How many of the allocated fiber objects are currently
+ * in the free pool.
+ */
+ size_t fibersPoolSize() const;
+
+ /**
+ * return true if running activeFiber_ is not nullptr.
+ */
+ bool hasActiveFiber();
+
+ /**
+ * @return What was the most observed fiber stack usage (in bytes).
+ */
+ size_t stackHighWatermark() const;
+
+ static FiberManager& getFiberManager();
+ static FiberManager* getFiberManagerUnsafe();
+
+ private:
+ friend class Baton;
+ friend class Fiber;
+ template <typename F>
+ struct AddTaskHelper;
+ template <typename F, typename G>
+ struct AddTaskFinallyHelper;
+
+ struct RemoteTask {
+ template <typename F>
+ explicit RemoteTask(F&& f) : func(std::move(f)) {}
+ std::function<void()> func;
+ folly::AtomicLinkedListHook<RemoteTask> nextRemoteTask;
+ };
+
+ typedef folly::IntrusiveList<Fiber, &Fiber::listHook_> FiberTailQueue;
+
+ Fiber* activeFiber_{nullptr}; /**< active fiber, nullptr on main context */
+
+ FiberTailQueue readyFibers_; /**< queue of fibers ready to be executed */
+ FiberTailQueue fibersPool_; /**< pool of unitialized Fiber objects */
+
+ size_t fibersAllocated_{0}; /**< total number of fibers allocated */
+ size_t fibersPoolSize_{0}; /**< total number of fibers in the free pool */
+ size_t fibersActive_{0}; /**< number of running or blocked fibers */
+
+ FContext::ContextStruct mainContext_; /**< stores loop function context */
+
+ std::unique_ptr<LoopController> loopController_;
+ bool isLoopScheduled_{false}; /**< was the ready loop scheduled to run? */
+
+ /**
+ * When we are inside FiberManager loop this points to FiberManager. Otherwise
+ * it's nullptr
+ */
+ static __thread FiberManager* currentFiberManager_;
+
+ /**
+ * runInMainContext implementation for non-void functions.
+ */
+ template <typename F>
+ typename std::enable_if<
+ !std::is_same<typename std::result_of<F()>::type, void>::value,
+ typename std::result_of<F()>::type>::type
+ runInMainContextHelper(F&& func);
+
+ /**
+ * runInMainContext implementation for void functions
+ */
+ template <typename F>
+ typename std::enable_if<
+ std::is_same<typename std::result_of<F()>::type, void>::value,
+ void>::type
+ runInMainContextHelper(F&& func);
+
+ /**
+ * Allocator used to allocate stack for Fibers in the pool.
+ * Allocates stack on the stack of the main context.
+ */
+#ifdef USE_GUARD_ALLOCATOR
+ /* This is too slow for production use; can be fixed
+ if we allocated all stack storage once upfront */
+ GuardPageAllocator stackAllocator_;
+#else
+ std::allocator<unsigned char> stackAllocator_;
+#endif
+
+ const Options options_; /**< FiberManager options */
+
+ /**
+ * Largest observed individual Fiber stack usage in bytes.
+ */
+ size_t stackHighWatermark_{0};
+
+ /**
+ * Schedules a loop with loopController (unless already scheduled before).
+ */
+ void ensureLoopScheduled();
+
+ /**
+ * @return An initialized Fiber object from the pool
+ */
+ Fiber* getFiber();
+
+ /**
+ * Function passed to the await call.
+ */
+ std::function<void(Fiber&)> awaitFunc_;
+
+ /**
+ * Function passed to the runInMainContext call.
+ */
+ std::function<void()> immediateFunc_;
+
+ ExceptionCallback exceptionCallback_; /**< task exception callback */
+
+ folly::AtomicLinkedList<Fiber, &Fiber::nextRemoteReady_> remoteReadyQueue_;
+
+ folly::AtomicLinkedList<RemoteTask, &RemoteTask::nextRemoteTask>
+ remoteTaskQueue_;
+
+ std::shared_ptr<TimeoutController> timeoutManager_;
+
+ void runReadyFiber(Fiber* fiber);
+ void remoteReadyInsert(Fiber* fiber);
+};
+
+/**
+ * @return true iff we are running in a fiber's context
+ */
+inline bool onFiber() {
+ auto fm = FiberManager::getFiberManagerUnsafe();
+ return fm ? fm->hasActiveFiber() : false;
+}
+
+/**
+ * Add a new task to be executed.
+ *
+ * @param func Task functor; must have a signature of `void func()`.
+ * The object will be destroyed once task execution is complete.
+ */
+template <typename F>
+inline void addTask(F&& func) {
+ return FiberManager::getFiberManager().addTask(std::forward<F>(func));
+}
+
+/**
+ * Add a new task. When the task is complete, execute finally(Try<Result>&&)
+ * on the main context.
+ * Task functor is run and destroyed on the fiber context.
+ * Finally functor is run and destroyed on the main context.
+ *
+ * @param func Task functor; must have a signature of `T func()` for some T.
+ * @param finally Finally functor; must have a signature of
+ * `void finally(Try<T>&&)` and will be passed
+ * the result of func() (including the exception if occurred).
+ */
+template <typename F, typename G>
+inline void addTaskFinally(F&& func, G&& finally) {
+ return FiberManager::getFiberManager().addTaskFinally(
+ std::forward<F>(func), std::forward<G>(finally));
+}
+
+/**
+ * Blocks task execution until given promise is fulfilled.
+ *
+ * Calls function passing in a Promise<T>, which has to be fulfilled.
+ *
+ * @return data which was used to fulfill the promise.
+ */
+template <typename F>
+typename FirstArgOf<F>::type::value_type
+inline await(F&& func);
+
+/**
+ * If called from a fiber, immediately switches to the FiberManager's context
+ * and runs func(), going back to the Fiber's context after completion.
+ * Outside a fiber, just calls func() directly.
+ *
+ * @return value returned by func().
+ */
+template <typename F>
+typename std::result_of<F()>::type
+inline runInMainContext(F&& func) {
+ auto fm = FiberManager::getFiberManagerUnsafe();
+ if (UNLIKELY(fm == nullptr)) {
+ return func();
+ }
+ return fm->runInMainContext(std::forward<F>(func));
+}
+
+}}
+
+#include "FiberManager-inl.h"
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/experimental/fibers/FiberManagerMap.h>
+
+#include <memory>
+#include <unordered_map>
+
+namespace folly { namespace fibers {
+
+namespace detail {
+
+thread_local std::unordered_map<folly::EventBase*, FiberManager*>
+ localFiberManagerMap;
+std::unordered_map<folly::EventBase*, std::unique_ptr<FiberManager>>
+ fiberManagerMap;
+std::mutex fiberManagerMapMutex;
+
+FiberManager* getFiberManagerThreadSafe(folly::EventBase& evb,
+ const FiberManager::Options& opts) {
+ std::lock_guard<std::mutex> lg(fiberManagerMapMutex);
+
+ auto it = fiberManagerMap.find(&evb);
+ if (LIKELY(it != fiberManagerMap.end())) {
+ return it->second.get();
+ }
+
+ auto loopController = folly::make_unique<EventBaseLoopController>();
+ loopController->attachEventBase(evb);
+ auto fiberManager =
+ folly::make_unique<FiberManager>(std::move(loopController), opts);
+ auto result = fiberManagerMap.emplace(&evb, std::move(fiberManager));
+ return result.first->second.get();
+}
+
+} // detail namespace
+
+FiberManager& getFiberManager(folly::EventBase& evb,
+ const FiberManager::Options& opts) {
+ auto it = detail::localFiberManagerMap.find(&evb);
+ if (LIKELY(it != detail::localFiberManagerMap.end())) {
+ return *(it->second);
+ }
+
+ return *(detail::localFiberManagerMap[&evb] =
+ detail::getFiberManagerThreadSafe(evb, opts));
+}
+
+}}
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/experimental/fibers/FiberManager.h>
+#include <folly/experimental/fibers/EventBaseLoopController.h>
+
+namespace folly { namespace fibers {
+
+FiberManager& getFiberManager(
+ folly::EventBase& evb,
+ const FiberManager::Options& opts = FiberManager::Options());
+
+}}
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/experimental/fibers/FiberManager.h>
+
+namespace folly { namespace fibers {
+
+namespace {
+
+template <class F, class G>
+typename std::enable_if<
+ !std::is_same<typename std::result_of<F()>::type, void>::value, void>::type
+inline callFuncs(F&& f, G&& g, size_t id) {
+ g(id, f());
+}
+
+template <class F, class G>
+typename std::enable_if<
+ std::is_same<typename std::result_of<F()>::type, void>::value, void>::type
+inline callFuncs(F&& f, G&& g, size_t id) {
+ f();
+ g(id);
+}
+
+} // anonymous namespace
+
+template <class InputIterator, class F>
+inline void forEach(InputIterator first, InputIterator last, F&& f) {
+ if (first == last) {
+ return;
+ }
+
+ typedef typename std::iterator_traits<InputIterator>::value_type FuncType;
+
+ size_t tasksTodo = 1;
+ std::exception_ptr e;
+ Baton baton;
+
+#ifdef __clang__
+#pragma clang diagnostic push // ignore generalized lambda capture warning
+#pragma clang diagnostic ignored "-Wc++1y-extensions"
+#endif
+ auto taskFunc =
+ [&tasksTodo, &e, &f, &baton] (size_t id, FuncType&& func) {
+ return [id, &tasksTodo, &e, &f, &baton,
+ func_ = std::forward<FuncType>(func)]() mutable {
+ try {
+ callFuncs(std::forward<FuncType>(func_), f, id);
+ } catch (...) {
+ e = std::current_exception();
+ }
+ if (--tasksTodo == 0) {
+ baton.post();
+ }
+ };
+ };
+#ifdef __clang__
+#pragma clang diagnostic pop
+#endif
+
+ auto firstTask = first;
+ ++first;
+
+ for (size_t i = 1; first != last; ++i, ++first, ++tasksTodo) {
+ addTask(taskFunc(i, std::move(*first)));
+ }
+
+ taskFunc(0, std::move(*firstTask))();
+ baton.wait();
+
+ if (e != std::exception_ptr()) {
+ std::rethrow_exception(e);
+ }
+}
+
+}} // folly::fibers
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+namespace folly { namespace fibers {
+
+/**
+ * Schedules several tasks and blocks until all of them are completed.
+ * In the process of their successfull completion given callback would be called
+ * for each of them with the index of the task and the result it returned (if
+ * not void).
+ * If any of these n tasks throws an exception, this exception will be
+ * re-thrown, but only when all tasks are complete. If several tasks throw
+ * exceptions one of them will be re-thrown. Callback won't be called for
+ * tasks that throw exception.
+ *
+ * @param first Range of tasks to be scheduled
+ * @param last
+ * @param F callback to call for each result.
+ * In case of each task returning void it should be callable
+ * F(size_t id)
+ * otherwise should be callable
+ * F(size_t id, Result)
+ */
+template <class InputIterator, class F>
+inline void forEach(InputIterator first, InputIterator last, F&& f);
+
+}} // folly::fibers
+
+#include <folly/experimental/fibers/ForEach-inl.h>
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/Baton.h>
+
+#include <folly/experimental/fibers/Baton.h>
+
+namespace folly { namespace fibers {
+
+typedef Baton GenericBaton;
+
+}}
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <sys/mman.h>
+#include <unistd.h>
+
+#include <glog/logging.h>
+
+namespace folly { namespace fibers {
+
+namespace {
+size_t pagesize() {
+ static const size_t pagesize = sysconf(_SC_PAGESIZE);
+ return pagesize;
+}
+
+/* Returns a multiple of pagesize() enough to store size + one guard page */
+size_t allocSize(size_t size) {
+ return pagesize() * ((size + pagesize() - 1)/pagesize() + 1);
+}
+}
+
+unsigned char* GuardPageAllocator::allocate(size_t size) {
+ /* We allocate minimum number of pages required, plus a guard page.
+ Since we use this for stack storage, requested allocation is aligned
+ at the top of the allocated pages, while the guard page is at the bottom.
+
+ -- increasing addresses -->
+ Guard page Normal pages
+ |xxxxxxxxxx|..........|..........|
+ <- size -------->
+ return value -^
+ */
+ void* p = nullptr;
+ PCHECK(!::posix_memalign(&p, pagesize(), allocSize(size)));
+
+ /* Try to protect first page
+ (stack grows downwards from last allocated address), ignore errors */
+ ::mprotect(p, pagesize(), PROT_NONE);
+ /* Return pointer to top 'size' bytes in allocated storage */
+ auto up = reinterpret_cast<unsigned char*>(p) + allocSize(size) - size;
+ assert(up >= reinterpret_cast<unsigned char*>(p) + pagesize());
+ return up;
+}
+
+void GuardPageAllocator::deallocate(unsigned char* up, size_t size) {
+ /* Get allocation base */
+ auto p = up + size - allocSize(size);
+ /* Try to unprotect the page for memory allocator to re-use,
+ ignore errors (in cases we failed to protect in the first place */
+ ::mprotect(p, pagesize(), PROT_READ|PROT_WRITE);
+ ::free(p);
+}
+
+}} // folly::fibers
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+namespace folly { namespace fibers {
+
+/**
+ * Stack allocator that protects an extra memory page after
+ * the end of the stack.
+ */
+class GuardPageAllocator {
+ public:
+ inline unsigned char* allocate(size_t size);
+ inline void deallocate(unsigned char* up, size_t size);
+};
+
+}} // folly::fibers
+
+#include "GuardPageAllocator-inl.h"
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <chrono>
+#include <functional>
+
+namespace folly { namespace fibers {
+
+class FiberManager;
+
+class LoopController {
+ public:
+ typedef std::chrono::steady_clock Clock;
+ typedef std::chrono::time_point<Clock> TimePoint;
+
+ virtual ~LoopController() {}
+
+ /**
+ * Called by FiberManager to associate itself with the LoopController.
+ */
+ virtual void setFiberManager(FiberManager*) = 0;
+
+ /**
+ * Called by FiberManager to schedule the loop function run
+ * at some point in the future.
+ */
+ virtual void schedule() = 0;
+
+ /**
+ * Same as schedule(), but safe to call from any thread.
+ */
+ virtual void scheduleThreadSafe() = 0;
+
+ /**
+ * Called by FiberManager to cancel a previously scheduled
+ * loop function run.
+ */
+ virtual void cancel() = 0;
+
+ /**
+ * Called by FiberManager to schedule some function to be run at some time.
+ */
+ virtual void timedSchedule(std::function<void()> func, TimePoint time) = 0;
+};
+
+}} // folly::fibers
--- /dev/null
+SUBDIRS = test
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/experimental/fibers/Baton.h>
+
+namespace folly { namespace fibers {
+
+template <class T>
+Promise<T>::Promise(folly::Try<T>& value, Baton& baton) :
+ value_(&value), baton_(&baton)
+{}
+
+template <class T>
+Promise<T>::Promise(Promise&& other) noexcept :
+value_(other.value_), baton_(other.baton_) {
+ other.value_ = nullptr;
+ other.baton_ = nullptr;
+}
+
+template <class T>
+Promise<T>& Promise<T>::operator=(Promise&& other) {
+ std::swap(value_, other.value_);
+ std::swap(baton_, other.baton_);
+ return *this;
+}
+
+template <class T>
+void Promise<T>::throwIfFulfilled() const {
+ if (!value_) {
+ throw std::logic_error("promise already fulfilled");
+ }
+}
+
+template <class T>
+Promise<T>::~Promise() {
+ if (value_) {
+ setException(folly::make_exception_wrapper<std::logic_error>(
+ "promise not fulfilled"));
+ }
+}
+
+template <class T>
+void Promise<T>::setException(folly::exception_wrapper e) {
+ fulfilTry(folly::Try<T>(e));
+}
+
+template <class T>
+void Promise<T>::fulfilTry(folly::Try<T>&& t) {
+ throwIfFulfilled();
+
+ *value_ = std::move(t);
+ baton_->post();
+
+ value_ = nullptr;
+ baton_ = nullptr;
+}
+
+template <class T>
+template <class M>
+void Promise<T>::setValue(M&& v) {
+ static_assert(!std::is_same<T, void>::value,
+ "Use setValue() instead");
+
+ fulfilTry(folly::Try<T>(std::forward<M>(v)));
+}
+
+template <class T>
+void Promise<T>::setValue() {
+ static_assert(std::is_same<T, void>::value,
+ "Use setValue(value) instead");
+
+ fulfilTry(folly::Try<void>());
+}
+
+template <class T>
+template <class F>
+void Promise<T>::fulfil(F&& func) {
+ fulfilTry(makeTryFunction(std::forward<F>(func)));
+}
+
+}}
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/experimental/fibers/traits.h>
+#include <folly/futures/Try.h>
+
+namespace folly { namespace fibers {
+
+class Baton;
+
+template <typename F>
+typename FirstArgOf<F>::type::value_type
+inline await(F&& func);
+
+template <typename T>
+class Promise {
+ public:
+ typedef T value_type;
+
+ ~Promise();
+
+ // not copyable
+ Promise(const Promise&) = delete;
+ Promise& operator=(const Promise&) = delete;
+
+ // movable
+ Promise(Promise&&) noexcept;
+ Promise& operator=(Promise&&);
+
+ /** Fulfil this promise (only for Promise<void>) */
+ void setValue();
+
+ /** Set the value (use perfect forwarding for both move and copy) */
+ template <class M>
+ void setValue(M&& value);
+
+ /**
+ * Fulfill the promise with a given try
+ *
+ * @param t
+ */
+ void fulfilTry(folly::Try<T>&& t);
+
+ /** Fulfil this promise with the result of a function that takes no
+ arguments and returns something implicitly convertible to T.
+ Captures exceptions. e.g.
+
+ p.fulfil([] { do something that may throw; return a T; });
+ */
+ template <class F>
+ void fulfil(F&& func);
+
+ /** Fulfil the Promise with an exception_wrapper, e.g.
+ auto ew = folly::try_and_catch<std::exception>([]{ ... });
+ if (ew) {
+ p.setException(std::move(ew));
+ }
+ */
+ void setException(folly::exception_wrapper);
+
+ private:
+ template <typename F>
+ friend typename FirstArgOf<F>::type::value_type await(F&&);
+
+ Promise(folly::Try<T>& value, Baton& baton);
+ folly::Try<T>* value_;
+ Baton* baton_;
+
+ void throwIfFulfilled() const;
+
+ template <class F>
+ typename std::enable_if<
+ std::is_convertible<typename std::result_of<F()>::type, T>::value &&
+ !std::is_same<T, void>::value>::type
+ fulfilHelper(F&& func);
+
+ template <class F>
+ typename std::enable_if<
+ std::is_same<typename std::result_of<F()>::type, void>::value &&
+ std::is_same<T, void>::value>::type
+ fulfilHelper(F&& func);
+};
+
+}}
+
+#include <folly/experimental/fibers/Promise-inl.h>
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/Likely.h>
+
+#include <folly/experimental/fibers/LoopController.h>
+
+namespace folly { namespace fibers {
+
+class FiberManager;
+
+class SimpleLoopController : public LoopController {
+ public:
+ SimpleLoopController()
+ : fm_(nullptr),
+ stopRequested_(false) {
+ }
+
+ /**
+ * Run FiberManager loop; if no ready task are present,
+ * run provided function. Stops after both stop() has been called
+ * and no waiting tasks remain.
+ */
+ template <typename F>
+ void loop(F&& func) {
+ bool waiting = false;
+ stopRequested_ = false;
+
+ while (LIKELY(waiting || !stopRequested_)) {
+ func();
+
+ auto time = Clock::now();
+
+ for (size_t i=0; i<scheduledFuncs_.size(); ++i) {
+ if (scheduledFuncs_[i].first <= time) {
+ scheduledFuncs_[i].second();
+ swap(scheduledFuncs_[i], scheduledFuncs_.back());
+ scheduledFuncs_.pop_back();
+ --i;
+ }
+ }
+
+ if (scheduled_) {
+ scheduled_ = false;
+ waiting = fm_->loopUntilNoReady();
+ }
+ }
+ }
+
+ /**
+ * Requests exit from loop() as soon as all waiting tasks complete.
+ */
+ void stop() {
+ stopRequested_ = true;
+ }
+
+ int remoteScheduleCalled() const {
+ return remoteScheduleCalled_;
+ }
+
+ void schedule() override {
+ scheduled_ = true;
+ }
+
+ void timedSchedule(std::function<void()> func, TimePoint time) override {
+ scheduledFuncs_.push_back({time, std::move(func)});
+ }
+
+ private:
+ FiberManager* fm_;
+ std::atomic<bool> scheduled_{false};
+ bool stopRequested_;
+ std::atomic<int> remoteScheduleCalled_{0};
+ std::vector<std::pair<TimePoint, std::function<void()>>> scheduledFuncs_;
+
+ /* LoopController interface */
+
+ void setFiberManager(FiberManager* fm) override {
+ fm_ = fm;
+ }
+
+ void cancel() override {
+ scheduled_ = false;
+ }
+
+ void scheduleThreadSafe() override {
+ ++remoteScheduleCalled_;
+ scheduled_ = true;
+ }
+
+ friend class FiberManager;
+};
+
+}} // folly::fibers
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "TimeoutController.h"
+#include <folly/Memory.h>
+
+namespace folly { namespace fibers {
+
+TimeoutController::TimeoutController(LoopController& loopController) :
+ nextTimeout_(TimePoint::max()),
+ loopController_(loopController) {}
+
+intptr_t TimeoutController::registerTimeout(std::function<void()> f,
+ Duration duration) {
+ auto& list = [&]() -> TimeoutHandleList& {
+ for (auto& bucket : timeoutHandleBuckets_) {
+ if (bucket.first == duration) {
+ return *bucket.second;
+ }
+ }
+
+ timeoutHandleBuckets_.emplace_back(duration,
+ folly::make_unique<TimeoutHandleList>());
+ return *timeoutHandleBuckets_.back().second;
+ }();
+
+ auto timeout = Clock::now() + duration;
+ list.emplace(std::move(f), timeout, list);
+
+ if (timeout < nextTimeout_) {
+ nextTimeout_ = timeout;
+ scheduleRun();
+ }
+
+ return reinterpret_cast<intptr_t>(&list.back());
+}
+
+void TimeoutController::runTimeouts(TimePoint time) {
+ auto now = Clock::now();
+ // Make sure we don't skip some events if function was run before actual time.
+ if (time < now) {
+ time = now;
+ }
+ if (nextTimeout_ > time) {
+ return;
+ }
+
+ nextTimeout_ = TimePoint::max();
+
+ for (auto& bucket : timeoutHandleBuckets_) {
+ auto& list = *bucket.second;
+
+ while (!list.empty()) {
+ if (!list.front().canceled) {
+ if (list.front().timeout > time) {
+ nextTimeout_ = std::min(nextTimeout_, list.front().timeout);
+ break;
+ }
+
+ list.front().func();
+ }
+ list.pop();
+ }
+ }
+
+ if (nextTimeout_ != TimePoint::max()) {
+ scheduleRun();
+ }
+}
+
+void TimeoutController::scheduleRun() {
+ auto time = nextTimeout_;
+ std::weak_ptr<TimeoutController> timeoutControllerWeak = shared_from_this();
+
+ loopController_.timedSchedule([timeoutControllerWeak, time]() {
+ if (auto timeoutController = timeoutControllerWeak.lock()) {
+ timeoutController->runTimeouts(time);
+ }
+ }, time);
+}
+
+void TimeoutController::cancel(intptr_t p) {
+ auto handle = reinterpret_cast<TimeoutHandle*>(p);
+ handle->canceled = true;
+
+ auto& list = handle->list;
+
+ while (!list.empty() && list.front().canceled) {
+ list.pop();
+ }
+}
+
+}}
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <chrono>
+#include <functional>
+#include <memory>
+#include <queue>
+
+#include <boost/intrusive/list.hpp>
+
+#include <folly/Likely.h>
+
+#include <folly/experimental/fibers/LoopController.h>
+
+namespace folly { namespace fibers {
+
+class TimeoutController :
+ public std::enable_shared_from_this<TimeoutController> {
+ public:
+ typedef std::chrono::steady_clock Clock;
+ typedef std::chrono::time_point<Clock> TimePoint;
+ typedef Clock::duration Duration;
+
+ explicit TimeoutController(LoopController& loopController);
+
+ intptr_t registerTimeout(std::function<void()> f, Duration duration);
+ void cancel(intptr_t id);
+
+ void runTimeouts(TimePoint time);
+
+ private:
+ void scheduleRun();
+
+ class TimeoutHandle;
+ typedef std::queue<TimeoutHandle> TimeoutHandleList;
+ typedef std::unique_ptr<TimeoutHandleList> TimeoutHandleListPtr;
+
+ struct TimeoutHandle {
+ TimeoutHandle(std::function<void()> func_,
+ TimePoint timeout_,
+ TimeoutHandleList& list_) :
+ func(std::move(func_)), timeout(timeout_), list(list_) {}
+
+ std::function<void()> func;
+ bool canceled{false};
+ TimePoint timeout;
+ TimeoutHandleList& list;
+ };
+
+ std::vector<std::pair<Duration, TimeoutHandleListPtr>> timeoutHandleBuckets_;
+ TimePoint nextTimeout_;
+ LoopController& loopController_;
+};
+
+}}
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/Optional.h>
+
+#include <folly/experimental/fibers/FiberManager.h>
+#include <folly/experimental/fibers/ForEach.h>
+
+namespace folly { namespace fibers {
+
+template <class InputIterator>
+typename std::vector<
+ typename std::enable_if<
+ !std::is_same<
+ typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type, void
+ >::value,
+ typename std::pair<
+ size_t,
+ typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type>
+ >::type
+ >
+whenN(InputIterator first, InputIterator last, size_t n) {
+ typedef typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type Result;
+ assert(n > 0);
+ assert(n <= std::distance(first, last));
+
+ struct Context {
+ std::vector<std::pair<size_t, Result>> results;
+ size_t tasksTodo;
+ std::exception_ptr e;
+ folly::Optional<Promise<void>> promise;
+
+ Context(size_t tasksTodo_) : tasksTodo(tasksTodo_) {
+ this->results.reserve(tasksTodo_);
+ }
+ };
+ auto context = std::make_shared<Context>(n);
+
+ await(
+ [first, last, context](Promise<void> promise) mutable {
+ context->promise = std::move(promise);
+ for (size_t i = 0; first != last; ++i, ++first) {
+#ifdef __clang__
+#pragma clang diagnostic push // ignore generalized lambda capture warning
+#pragma clang diagnostic ignored "-Wc++1y-extensions"
+#endif
+ addTask(
+ [i, context, f = std::move(*first)]() {
+ try {
+ auto result = f();
+ if (context->tasksTodo == 0) {
+ return;
+ }
+ context->results.emplace_back(i, std::move(result));
+ } catch (...) {
+ if (context->tasksTodo == 0) {
+ return;
+ }
+ context->e = std::current_exception();
+ }
+ if (--context->tasksTodo == 0) {
+ context->promise->setValue();
+ }
+ });
+#ifdef __clang__
+#pragma clang diagnostic pop
+#endif
+ }
+ });
+
+ if (context->e != std::exception_ptr()) {
+ std::rethrow_exception(context->e);
+ }
+
+ return std::move(context->results);
+}
+
+template <class InputIterator>
+typename std::enable_if<
+ std::is_same<
+ typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type, void
+ >::value, std::vector<size_t>>::type
+whenN(InputIterator first, InputIterator last, size_t n) {
+ assert(n > 0);
+ assert(n <= std::distance(first, last));
+
+ struct Context {
+ std::vector<size_t> taskIndices;
+ std::exception_ptr e;
+ size_t tasksTodo;
+ folly::Optional<Promise<void>> promise;
+
+ Context(size_t tasksTodo_) : tasksTodo(tasksTodo_) {
+ this->taskIndices.reserve(tasksTodo_);
+ }
+ };
+ auto context = std::make_shared<Context>(n);
+
+ await(
+ [first, last, context](Promise<void> promise) mutable {
+ context->promise = std::move(promise);
+ for (size_t i = 0; first != last; ++i, ++first) {
+#ifdef __clang__
+#pragma clang diagnostic push // ignore generalized lambda capture warning
+#pragma clang diagnostic ignored "-Wc++1y-extensions"
+#endif
+ addTask(
+ [i, context, f = std::move(*first)]() {
+ try {
+ f();
+ if (context->tasksTodo == 0) {
+ return;
+ }
+ context->taskIndices.push_back(i);
+ } catch (...) {
+ if (context->tasksTodo == 0) {
+ return;
+ }
+ context->e = std::current_exception();
+ }
+ if (--context->tasksTodo == 0) {
+ context->promise->setValue();
+ }
+ });
+#ifdef __clang__
+#pragma clang diagnostic pop
+#endif
+ }
+ });
+
+ if (context->e != std::exception_ptr()) {
+ std::rethrow_exception(context->e);
+ }
+
+ return context->taskIndices;
+}
+
+template <class InputIterator>
+typename std::vector<
+ typename std::enable_if<
+ !std::is_same<
+ typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type, void
+ >::value,
+ typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type>::type>
+inline whenAll(InputIterator first, InputIterator last) {
+ typedef typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type Result;
+ size_t n = std::distance(first, last);
+ std::vector<Result> results;
+ std::vector<size_t> order(n);
+ results.reserve(n);
+
+ forEach(first, last,
+ [&results, &order] (size_t id, Result result) {
+ order[id] = results.size();
+ results.emplace_back(std::move(result));
+ });
+ assert(results.size() == n);
+
+ std::vector<Result> orderedResults;
+ orderedResults.reserve(n);
+
+ for (size_t i = 0; i < n; ++i) {
+ orderedResults.emplace_back(std::move(results[order[i]]));
+ }
+
+ return orderedResults;
+}
+
+template <class InputIterator>
+typename std::enable_if<
+ std::is_same<
+ typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type, void
+ >::value, void>::type
+inline whenAll(InputIterator first, InputIterator last) {
+ forEach(first, last, [] (size_t id) {});
+}
+
+template <class InputIterator>
+typename std::enable_if<
+ !std::is_same<
+ typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type, void
+ >::value,
+ typename std::pair<
+ size_t,
+ typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type>
+ >::type
+inline whenAny(InputIterator first, InputIterator last) {
+ auto result = whenN(first, last, 1);
+ assert(result.size() == 1);
+ return std::move(result[0]);
+}
+
+template <class InputIterator>
+typename std::enable_if<
+ std::is_same<
+ typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type, void
+ >::value, size_t>::type
+inline whenAny(InputIterator first, InputIterator last) {
+ auto result = whenN(first, last, 1);
+ assert(result.size() == 1);
+ return std::move(result[0]);
+}
+
+}}
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+namespace folly { namespace fibers {
+
+/**
+ * Schedules several tasks and blocks until n of these tasks are completed.
+ * If any of these n tasks throws an exception, this exception will be
+ * re-thrown, but only when n tasks are complete. If several tasks throw
+ * exceptions one of them will be re-thrown.
+ *
+ * @param first Range of tasks to be scheduled
+ * @param last
+ * @param n Number of tasks to wait for
+ *
+ * @return vector of pairs (task index, return value of task)
+ */
+template <class InputIterator>
+typename std::vector<
+ typename std::enable_if<
+ !std::is_same<
+ typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type,
+ void>::value,
+ typename std::pair<
+ size_t,
+ typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type>
+ >::type
+ >
+inline whenN(InputIterator first, InputIterator last, size_t n);
+
+/**
+ * whenN specialization for functions returning void
+ *
+ * @param first Range of tasks to be scheduled
+ * @param last
+ * @param n Number of tasks to wait for
+ *
+ * @return vector of completed task indices
+ */
+template <class InputIterator>
+typename std::enable_if<
+ std::is_same<
+ typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type, void
+ >::value, std::vector<size_t>>::type
+inline whenN(InputIterator first, InputIterator last, size_t n);
+
+/**
+ * Schedules several tasks and blocks until all of these tasks are completed.
+ * If any of the tasks throws an exception, this exception will be re-thrown,
+ * but only when all the tasks are complete. If several tasks throw exceptions
+ * one of them will be re-thrown.
+ *
+ * @param first Range of tasks to be scheduled
+ * @param last
+ *
+ * @return vector of values returned by tasks
+ */
+template <class InputIterator>
+typename std::vector<
+ typename std::enable_if<
+ !std::is_same<
+ typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type,
+ void>::value,
+ typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type>::type
+ >
+inline whenAll(InputIterator first, InputIterator last);
+
+/**
+ * whenAll specialization for functions returning void
+ *
+ * @param first Range of tasks to be scheduled
+ * @param last
+ */
+template <class InputIterator>
+typename std::enable_if<
+ std::is_same<
+ typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type, void
+ >::value, void>::type
+inline whenAll(InputIterator first, InputIterator last);
+
+/**
+ * Schedules several tasks and blocks until one of them is completed.
+ * If this task throws an exception, this exception will be re-thrown.
+ * Exceptions thrown by all other tasks will be ignored.
+ *
+ * @param first Range of tasks to be scheduled
+ * @param last
+ *
+ * @return pair of index of the first completed task and its return value
+ */
+template <class InputIterator>
+typename std::enable_if<
+ !std::is_same<
+ typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type, void
+ >::value,
+ typename std::pair<
+ size_t,
+ typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type>
+ >::type
+inline whenAny(InputIterator first, InputIterator last);
+
+/**
+ * WhenAny specialization for functions returning void.
+ *
+ * @param first Range of tasks to be scheduled
+ * @param last
+ *
+ * @return index of the first completed task
+ */
+template <class InputIterator>
+typename std::enable_if<
+ std::is_same<
+ typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type, void
+ >::value, size_t>::type
+inline whenAny(InputIterator first, InputIterator last);
+
+}}
+
+#include <folly/experimental/fibers/WhenN-inl.h>
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <atomic>
+#include <thread>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include <folly/Benchmark.h>
+#include <folly/Memory.h>
+
+#include <folly/experimental/fibers/AddTasks.h>
+#include <folly/experimental/fibers/EventBaseLoopController.h>
+#include <folly/experimental/fibers/FiberManager.h>
+#include <folly/experimental/fibers/GenericBaton.h>
+#include <folly/experimental/fibers/SimpleLoopController.h>
+#include <folly/experimental/fibers/WhenN.h>
+
+using namespace folly::fibers;
+
+using folly::Try;
+
+TEST(FiberManager, batonTimedWaitTimeout) {
+ bool taskAdded = false;
+ size_t iterations = 0;
+
+ FiberManager manager(folly::make_unique<SimpleLoopController>());
+ auto& loopController =
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
+
+ auto loopFunc = [&]() {
+ if (!taskAdded) {
+ manager.addTask(
+ [&]() {
+ Baton baton;
+
+ auto res = baton.timed_wait(std::chrono::milliseconds(230));
+
+ EXPECT_FALSE(res);
+ EXPECT_EQ(5, iterations);
+
+ loopController.stop();
+ }
+ );
+ manager.addTask(
+ [&]() {
+ Baton baton;
+
+ auto res = baton.timed_wait(std::chrono::milliseconds(130));
+
+ EXPECT_FALSE(res);
+ EXPECT_EQ(3, iterations);
+
+ loopController.stop();
+ }
+ );
+ taskAdded = true;
+ } else {
+ std::this_thread::sleep_for(std::chrono::milliseconds(50));
+ iterations ++;
+ }
+ };
+
+ loopController.loop(std::move(loopFunc));
+}
+
+TEST(FiberManager, batonTimedWaitPost) {
+ bool taskAdded = false;
+ size_t iterations = 0;
+ Baton* baton_ptr;
+
+ FiberManager manager(folly::make_unique<SimpleLoopController>());
+ auto& loopController =
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
+
+ auto loopFunc = [&]() {
+ if (!taskAdded) {
+ manager.addTask(
+ [&]() {
+ Baton baton;
+ baton_ptr = &baton;
+
+ auto res = baton.timed_wait(std::chrono::milliseconds(130));
+
+ EXPECT_TRUE(res);
+ EXPECT_EQ(2, iterations);
+
+ loopController.stop();
+ }
+ );
+ taskAdded = true;
+ } else {
+ std::this_thread::sleep_for(std::chrono::milliseconds(50));
+ iterations ++;
+ if (iterations == 2) {
+ baton_ptr->post();
+ }
+ }
+ };
+
+ loopController.loop(std::move(loopFunc));
+}
+
+TEST(FiberManager, batonTimedWaitTimeoutEvb) {
+ size_t tasksComplete = 0;
+
+ folly::EventBase evb;
+
+ FiberManager manager(folly::make_unique<EventBaseLoopController>());
+ dynamic_cast<EventBaseLoopController&>(
+ manager.loopController()).attachEventBase(evb);
+
+ auto task = [&](size_t timeout_ms) {
+ Baton baton;
+
+ auto start = EventBaseLoopController::Clock::now();
+ auto res = baton.timed_wait(std::chrono::milliseconds(timeout_ms));
+ auto finish = EventBaseLoopController::Clock::now();
+
+ EXPECT_FALSE(res);
+
+ auto duration_ms =
+ std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
+
+ EXPECT_GT(duration_ms.count(), timeout_ms - 50);
+ EXPECT_LT(duration_ms.count(), timeout_ms + 50);
+
+ if (++tasksComplete == 2) {
+ evb.terminateLoopSoon();
+ }
+ };
+
+ evb.runInEventBaseThread([&]() {
+ manager.addTask(
+ [&]() {
+ task(500);
+ }
+ );
+ manager.addTask(
+ [&]() {
+ task(250);
+ }
+ );
+ });
+
+ evb.loopForever();
+
+ EXPECT_EQ(2, tasksComplete);
+}
+
+TEST(FiberManager, batonTimedWaitPostEvb) {
+ size_t tasksComplete = 0;
+
+ folly::EventBase evb;
+
+ FiberManager manager(folly::make_unique<EventBaseLoopController>());
+ dynamic_cast<EventBaseLoopController&>(
+ manager.loopController()).attachEventBase(evb);
+
+ evb.runInEventBaseThread([&]() {
+ manager.addTask([&]() {
+ Baton baton;
+
+ evb.tryRunAfterDelay([&]() {
+ baton.post();
+ },
+ 100);
+
+ auto start = EventBaseLoopController::Clock::now();
+ auto res = baton.timed_wait(std::chrono::milliseconds(130));
+ auto finish = EventBaseLoopController::Clock::now();
+
+ EXPECT_TRUE(res);
+
+ auto duration_ms = std::chrono::duration_cast<
+ std::chrono::milliseconds>(finish - start);
+
+ EXPECT_TRUE(duration_ms.count() > 95 &&
+ duration_ms.count() < 110);
+
+ if (++tasksComplete == 1) {
+ evb.terminateLoopSoon();
+ }
+ });
+ });
+
+ evb.loopForever();
+
+ EXPECT_EQ(1, tasksComplete);
+}
+
+TEST(FiberManager, batonTryWait) {
+
+ FiberManager manager(folly::make_unique<SimpleLoopController>());
+ auto& loopController =
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
+
+ // Check if try_wait and post work as expected
+ Baton b;
+
+ manager.addTask([&](){
+ while (!b.try_wait()) {
+ }
+ });
+ auto thr = std::thread([&](){
+ std::this_thread::sleep_for(std::chrono::milliseconds(300));
+ b.post();
+ });
+
+ manager.loopUntilNoReady();
+ thr.join();
+
+ Baton c;
+
+ // Check try_wait without post
+ manager.addTask([&](){
+ int cnt = 100;
+ while (cnt && !c.try_wait()) {
+ cnt--;
+ }
+ EXPECT_TRUE(!c.try_wait()); // must still hold
+ EXPECT_EQ(cnt, 0);
+ });
+
+ manager.loopUntilNoReady();
+}
+
+TEST(FiberManager, genericBatonFiberWait) {
+ FiberManager manager(folly::make_unique<SimpleLoopController>());
+
+ GenericBaton b;
+ bool fiberRunning = false;
+
+ manager.addTask([&](){
+ EXPECT_EQ(manager.hasActiveFiber(), true);
+ fiberRunning = true;
+ b.wait();
+ fiberRunning = false;
+ });
+
+ EXPECT_FALSE(fiberRunning);
+ manager.loopUntilNoReady();
+ EXPECT_TRUE(fiberRunning); // ensure fiber still active
+
+ auto thr = std::thread([&](){
+ std::this_thread::sleep_for(std::chrono::milliseconds(300));
+ b.post();
+ });
+
+ while (fiberRunning) {
+ manager.loopUntilNoReady();
+ }
+
+ thr.join();
+}
+
+TEST(FiberManager, genericBatonThreadWait) {
+ FiberManager manager(folly::make_unique<SimpleLoopController>());
+ GenericBaton b;
+ std::atomic<bool> threadWaiting(false);
+
+ auto thr = std::thread([&](){
+ threadWaiting = true;
+ b.wait();
+ threadWaiting = false;
+ });
+
+ while (!threadWaiting) {}
+ std::this_thread::sleep_for(std::chrono::milliseconds(300));
+
+ manager.addTask([&](){
+ EXPECT_EQ(manager.hasActiveFiber(), true);
+ EXPECT_TRUE(threadWaiting);
+ b.post();
+ while(threadWaiting) {}
+ });
+
+ manager.loopUntilNoReady();
+ thr.join();
+}
+
+TEST(FiberManager, addTasksNoncopyable) {
+ std::vector<Promise<int>> pendingFibers;
+ bool taskAdded = false;
+
+ FiberManager manager(folly::make_unique<SimpleLoopController>());
+ auto& loopController =
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
+
+ auto loopFunc = [&]() {
+ if (!taskAdded) {
+ manager.addTask(
+ [&]() {
+ std::vector<std::function<std::unique_ptr<int>()>> funcs;
+ for (size_t i = 0; i < 3; ++i) {
+ funcs.push_back(
+ [i, &pendingFibers]() {
+ await([&pendingFibers](Promise<int> promise) {
+ pendingFibers.push_back(std::move(promise));
+ });
+ return folly::make_unique<int>(i*2 + 1);
+ }
+ );
+ }
+
+ auto iter = addTasks(funcs.begin(), funcs.end());
+
+ size_t n = 0;
+ while (iter.hasNext()) {
+ auto result = iter.awaitNext();
+ EXPECT_EQ(2 * iter.getTaskID() + 1, *result);
+ EXPECT_GE(2 - n, pendingFibers.size());
+ ++n;
+ }
+ EXPECT_EQ(3, n);
+ }
+ );
+ taskAdded = true;
+ } else if (pendingFibers.size()) {
+ pendingFibers.back().setValue(0);
+ pendingFibers.pop_back();
+ } else {
+ loopController.stop();
+ }
+ };
+
+ loopController.loop(std::move(loopFunc));
+}
+
+TEST(FiberManager, addTasksThrow) {
+ std::vector<Promise<int>> pendingFibers;
+ bool taskAdded = false;
+
+ FiberManager manager(folly::make_unique<SimpleLoopController>());
+ auto& loopController =
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
+
+ auto loopFunc = [&]() {
+ if (!taskAdded) {
+ manager.addTask(
+ [&]() {
+ std::vector<std::function<int()>> funcs;
+ for (size_t i = 0; i < 3; ++i) {
+ funcs.push_back(
+ [i, &pendingFibers]() {
+ await([&pendingFibers](Promise<int> promise) {
+ pendingFibers.push_back(std::move(promise));
+ });
+ if (i % 2 == 0) {
+ throw std::runtime_error("Runtime");
+ }
+ return i*2 + 1;
+ }
+ );
+ }
+
+ auto iter = addTasks(funcs.begin(), funcs.end());
+
+ size_t n = 0;
+ while (iter.hasNext()) {
+ try {
+ int result = iter.awaitNext();
+ EXPECT_EQ(1, iter.getTaskID() % 2);
+ EXPECT_EQ(2 * iter.getTaskID() + 1, result);
+ } catch (...) {
+ EXPECT_EQ(0, iter.getTaskID() % 2);
+ }
+ EXPECT_GE(2 - n, pendingFibers.size());
+ ++n;
+ }
+ EXPECT_EQ(3, n);
+ }
+ );
+ taskAdded = true;
+ } else if (pendingFibers.size()) {
+ pendingFibers.back().setValue(0);
+ pendingFibers.pop_back();
+ } else {
+ loopController.stop();
+ }
+ };
+
+ loopController.loop(std::move(loopFunc));
+}
+
+TEST(FiberManager, addTasksVoid) {
+ std::vector<Promise<int>> pendingFibers;
+ bool taskAdded = false;
+
+ FiberManager manager(folly::make_unique<SimpleLoopController>());
+ auto& loopController =
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
+
+ auto loopFunc = [&]() {
+ if (!taskAdded) {
+ manager.addTask(
+ [&]() {
+ std::vector<std::function<void()>> funcs;
+ for (size_t i = 0; i < 3; ++i) {
+ funcs.push_back(
+ [i, &pendingFibers]() {
+ await([&pendingFibers](Promise<int> promise) {
+ pendingFibers.push_back(std::move(promise));
+ });
+ }
+ );
+ }
+
+ auto iter = addTasks(funcs.begin(), funcs.end());
+
+ size_t n = 0;
+ while (iter.hasNext()) {
+ iter.awaitNext();
+ EXPECT_GE(2 - n, pendingFibers.size());
+ ++n;
+ }
+ EXPECT_EQ(3, n);
+ }
+ );
+ taskAdded = true;
+ } else if (pendingFibers.size()) {
+ pendingFibers.back().setValue(0);
+ pendingFibers.pop_back();
+ } else {
+ loopController.stop();
+ }
+ };
+
+ loopController.loop(std::move(loopFunc));
+}
+
+TEST(FiberManager, addTasksVoidThrow) {
+ std::vector<Promise<int>> pendingFibers;
+ bool taskAdded = false;
+
+ FiberManager manager(folly::make_unique<SimpleLoopController>());
+ auto& loopController =
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
+
+ auto loopFunc = [&]() {
+ if (!taskAdded) {
+ manager.addTask(
+ [&]() {
+ std::vector<std::function<void()>> funcs;
+ for (size_t i = 0; i < 3; ++i) {
+ funcs.push_back(
+ [i, &pendingFibers]() {
+ await([&pendingFibers](Promise<int> promise) {
+ pendingFibers.push_back(std::move(promise));
+ });
+ if (i % 2 == 0) {
+ throw std::runtime_error("");
+ }
+ }
+ );
+ }
+
+ auto iter = addTasks(funcs.begin(), funcs.end());
+
+ size_t n = 0;
+ while (iter.hasNext()) {
+ try {
+ iter.awaitNext();
+ EXPECT_EQ(1, iter.getTaskID() % 2);
+ } catch (...) {
+ EXPECT_EQ(0, iter.getTaskID() % 2);
+ }
+ EXPECT_GE(2 - n, pendingFibers.size());
+ ++n;
+ }
+ EXPECT_EQ(3, n);
+ }
+ );
+ taskAdded = true;
+ } else if (pendingFibers.size()) {
+ pendingFibers.back().setValue(0);
+ pendingFibers.pop_back();
+ } else {
+ loopController.stop();
+ }
+ };
+
+ loopController.loop(std::move(loopFunc));
+}
+
+TEST(FiberManager, reserve) {
+ std::vector<Promise<int>> pendingFibers;
+ bool taskAdded = false;
+
+ FiberManager manager(folly::make_unique<SimpleLoopController>());
+ auto& loopController =
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
+
+ auto loopFunc = [&]() {
+ if (!taskAdded) {
+ manager.addTask(
+ [&]() {
+ std::vector<std::function<void()>> funcs;
+ for (size_t i = 0; i < 3; ++i) {
+ funcs.push_back(
+ [&pendingFibers]() {
+ await([&pendingFibers](Promise<int> promise) {
+ pendingFibers.push_back(std::move(promise));
+ });
+ }
+ );
+ }
+
+ auto iter = addTasks(funcs.begin(), funcs.end());
+
+ iter.reserve(2);
+ EXPECT_TRUE(iter.hasCompleted());
+ EXPECT_TRUE(iter.hasPending());
+ EXPECT_TRUE(iter.hasNext());
+
+ iter.awaitNext();
+ EXPECT_TRUE(iter.hasCompleted());
+ EXPECT_TRUE(iter.hasPending());
+ EXPECT_TRUE(iter.hasNext());
+
+ iter.awaitNext();
+ EXPECT_FALSE(iter.hasCompleted());
+ EXPECT_TRUE(iter.hasPending());
+ EXPECT_TRUE(iter.hasNext());
+
+ iter.awaitNext();
+ EXPECT_FALSE(iter.hasCompleted());
+ EXPECT_FALSE(iter.hasPending());
+ EXPECT_FALSE(iter.hasNext());
+ }
+ );
+ taskAdded = true;
+ } else if (pendingFibers.size()) {
+ pendingFibers.back().setValue(0);
+ pendingFibers.pop_back();
+ } else {
+ loopController.stop();
+ }
+ };
+
+ loopController.loop(std::move(loopFunc));
+}
+
+TEST(FiberManager, forEach) {
+ std::vector<Promise<int>> pendingFibers;
+ bool taskAdded = false;
+
+ FiberManager manager(folly::make_unique<SimpleLoopController>());
+ auto& loopController =
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
+
+ auto loopFunc = [&]() {
+ if (!taskAdded) {
+ manager.addTask(
+ [&]() {
+ std::vector<std::function<int()>> funcs;
+ for (size_t i = 0; i < 3; ++i) {
+ funcs.push_back(
+ [i, &pendingFibers]() {
+ await([&pendingFibers](Promise<int> promise) {
+ pendingFibers.push_back(std::move(promise));
+ });
+ return i * 2 + 1;
+ }
+ );
+ }
+
+ std::vector<std::pair<size_t, int>> results;
+ forEach(funcs.begin(), funcs.end(),
+ [&results](size_t id, int result) {
+ results.push_back(std::make_pair(id, result));
+ });
+ EXPECT_EQ(3, results.size());
+ EXPECT_TRUE(pendingFibers.empty());
+ for (size_t i = 0; i < 3; ++i) {
+ EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
+ }
+ }
+ );
+ taskAdded = true;
+ } else if (pendingFibers.size()) {
+ pendingFibers.back().setValue(0);
+ pendingFibers.pop_back();
+ } else {
+ loopController.stop();
+ }
+ };
+
+ loopController.loop(std::move(loopFunc));
+}
+
+TEST(FiberManager, whenN) {
+ std::vector<Promise<int>> pendingFibers;
+ bool taskAdded = false;
+
+ FiberManager manager(folly::make_unique<SimpleLoopController>());
+ auto& loopController =
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
+
+ auto loopFunc = [&]() {
+ if (!taskAdded) {
+ manager.addTask(
+ [&]() {
+ std::vector<std::function<int()>> funcs;
+ for (size_t i = 0; i < 3; ++i) {
+ funcs.push_back(
+ [i, &pendingFibers]() {
+ await([&pendingFibers](Promise<int> promise) {
+ pendingFibers.push_back(std::move(promise));
+ });
+ return i*2 + 1;
+ }
+ );
+ }
+
+ auto results = whenN(funcs.begin(), funcs.end(), 2);
+ EXPECT_EQ(2, results.size());
+ EXPECT_EQ(1, pendingFibers.size());
+ for (size_t i = 0; i < 2; ++i) {
+ EXPECT_EQ(results[i].first*2 + 1, results[i].second);
+ }
+ }
+ );
+ taskAdded = true;
+ } else if (pendingFibers.size()) {
+ pendingFibers.back().setValue(0);
+ pendingFibers.pop_back();
+ } else {
+ loopController.stop();
+ }
+ };
+
+ loopController.loop(std::move(loopFunc));
+}
+
+TEST(FiberManager, whenNThrow) {
+ std::vector<Promise<int>> pendingFibers;
+ bool taskAdded = false;
+
+ FiberManager manager(folly::make_unique<SimpleLoopController>());
+ auto& loopController =
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
+
+ auto loopFunc = [&]() {
+ if (!taskAdded) {
+ manager.addTask(
+ [&]() {
+ std::vector<std::function<int()>> funcs;
+ for (size_t i = 0; i < 3; ++i) {
+ funcs.push_back(
+ [i, &pendingFibers]() {
+ await([&pendingFibers](Promise<int> promise) {
+ pendingFibers.push_back(std::move(promise));
+ });
+ throw std::runtime_error("Runtime");
+ return i*2+1;
+ }
+ );
+ }
+
+ try {
+ whenN(funcs.begin(), funcs.end(), 2);
+ } catch (...) {
+ EXPECT_EQ(1, pendingFibers.size());
+ }
+ }
+ );
+ taskAdded = true;
+ } else if (pendingFibers.size()) {
+ pendingFibers.back().setValue(0);
+ pendingFibers.pop_back();
+ } else {
+ loopController.stop();
+ }
+ };
+
+ loopController.loop(std::move(loopFunc));
+}
+
+TEST(FiberManager, whenNVoid) {
+ std::vector<Promise<int>> pendingFibers;
+ bool taskAdded = false;
+
+ FiberManager manager(folly::make_unique<SimpleLoopController>());
+ auto& loopController =
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
+
+ auto loopFunc = [&]() {
+ if (!taskAdded) {
+ manager.addTask(
+ [&]() {
+ std::vector<std::function<void()>> funcs;
+ for (size_t i = 0; i < 3; ++i) {
+ funcs.push_back(
+ [i, &pendingFibers]() {
+ await([&pendingFibers](Promise<int> promise) {
+ pendingFibers.push_back(std::move(promise));
+ });
+ }
+ );
+ }
+
+ auto results = whenN(funcs.begin(), funcs.end(), 2);
+ EXPECT_EQ(2, results.size());
+ EXPECT_EQ(1, pendingFibers.size());
+ }
+ );
+ taskAdded = true;
+ } else if (pendingFibers.size()) {
+ pendingFibers.back().setValue(0);
+ pendingFibers.pop_back();
+ } else {
+ loopController.stop();
+ }
+ };
+
+ loopController.loop(std::move(loopFunc));
+}
+
+TEST(FiberManager, whenNVoidThrow) {
+ std::vector<Promise<int>> pendingFibers;
+ bool taskAdded = false;
+
+ FiberManager manager(folly::make_unique<SimpleLoopController>());
+ auto& loopController =
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
+
+ auto loopFunc = [&]() {
+ if (!taskAdded) {
+ manager.addTask(
+ [&]() {
+ std::vector<std::function<void()>> funcs;
+ for (size_t i = 0; i < 3; ++i) {
+ funcs.push_back(
+ [i, &pendingFibers]() {
+ await([&pendingFibers](Promise<int> promise) {
+ pendingFibers.push_back(std::move(promise));
+ });
+ throw std::runtime_error("Runtime");
+ }
+ );
+ }
+
+ try {
+ whenN(funcs.begin(), funcs.end(), 2);
+ } catch (...) {
+ EXPECT_EQ(1, pendingFibers.size());
+ }
+ }
+ );
+ taskAdded = true;
+ } else if (pendingFibers.size()) {
+ pendingFibers.back().setValue(0);
+ pendingFibers.pop_back();
+ } else {
+ loopController.stop();
+ }
+ };
+
+ loopController.loop(std::move(loopFunc));
+}
+
+TEST(FiberManager, whenAll) {
+ std::vector<Promise<int>> pendingFibers;
+ bool taskAdded = false;
+
+ FiberManager manager(folly::make_unique<SimpleLoopController>());
+ auto& loopController =
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
+
+ auto loopFunc = [&]() {
+ if (!taskAdded) {
+ manager.addTask(
+ [&]() {
+ std::vector<std::function<int()>> funcs;
+ for (size_t i = 0; i < 3; ++i) {
+ funcs.push_back(
+ [i, &pendingFibers]() {
+ await([&pendingFibers](Promise<int> promise) {
+ pendingFibers.push_back(std::move(promise));
+ });
+ return i*2+1;
+ }
+ );
+ }
+
+ auto results = whenAll(funcs.begin(), funcs.end());
+ EXPECT_TRUE(pendingFibers.empty());
+ for (size_t i = 0; i < 3; ++i) {
+ EXPECT_EQ(i*2+1, results[i]);
+ }
+ }
+ );
+ taskAdded = true;
+ } else if (pendingFibers.size()) {
+ pendingFibers.back().setValue(0);
+ pendingFibers.pop_back();
+ } else {
+ loopController.stop();
+ }
+ };
+
+ loopController.loop(std::move(loopFunc));
+}
+
+TEST(FiberManager, whenAllVoid) {
+ std::vector<Promise<int>> pendingFibers;
+ bool taskAdded = false;
+
+ FiberManager manager(folly::make_unique<SimpleLoopController>());
+ auto& loopController =
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
+
+ auto loopFunc = [&]() {
+ if (!taskAdded) {
+ manager.addTask(
+ [&]() {
+ std::vector<std::function<void()>> funcs;
+ for (size_t i = 0; i < 3; ++ i) {
+ funcs.push_back(
+ [i, &pendingFibers]() {
+ await([&pendingFibers](Promise<int> promise) {
+ pendingFibers.push_back(std::move(promise));
+ });
+ }
+ );
+ }
+
+ whenAll(funcs.begin(), funcs.end());
+ EXPECT_TRUE(pendingFibers.empty());
+ }
+ );
+ taskAdded = true;
+ } else if (pendingFibers.size()) {
+ pendingFibers.back().setValue(0);
+ pendingFibers.pop_back();
+ } else {
+ loopController.stop();
+ }
+ };
+
+ loopController.loop(std::move(loopFunc));
+}
+
+TEST(FiberManager, whenAny) {
+ std::vector<Promise<int>> pendingFibers;
+ bool taskAdded = false;
+
+ FiberManager manager(folly::make_unique<SimpleLoopController>());
+ auto& loopController =
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
+
+ auto loopFunc = [&]() {
+ if (!taskAdded) {
+ manager.addTask(
+ [&]() {
+ std::vector<std::function<int()> > funcs;
+ for (size_t i = 0; i < 3; ++ i) {
+ funcs.push_back(
+ [i, &pendingFibers]() {
+ await([&pendingFibers](Promise<int> promise) {
+ pendingFibers.push_back(std::move(promise));
+ });
+ if (i == 1) {
+ throw std::runtime_error("This exception will be ignored");
+ }
+ return i*2+1;
+ }
+ );
+ }
+
+ auto result = whenAny(funcs.begin(), funcs.end());
+ EXPECT_EQ(2, pendingFibers.size());
+ EXPECT_EQ(2, result.first);
+ EXPECT_EQ(2*2+1, result.second);
+ }
+ );
+ taskAdded = true;
+ } else if (pendingFibers.size()) {
+ pendingFibers.back().setValue(0);
+ pendingFibers.pop_back();
+ } else {
+ loopController.stop();
+ }
+ };
+
+ loopController.loop(std::move(loopFunc));
+}
+
+namespace {
+/* Checks that this function was run from a main context,
+ by comparing an address on a stack to a known main stack address
+ and a known related fiber stack address. The assumption
+ is that fiber stack and main stack will be far enough apart,
+ while any two values on the same stack will be close. */
+void expectMainContext(bool& ran, int* mainLocation, int* fiberLocation) {
+ int here;
+ /* 2 pages is a good guess */
+ constexpr ssize_t DISTANCE = 0x2000 / sizeof(int);
+ if (fiberLocation) {
+ EXPECT_TRUE(std::abs(&here - fiberLocation) > DISTANCE);
+ }
+ if (mainLocation) {
+ EXPECT_TRUE(std::abs(&here - mainLocation) < DISTANCE);
+ }
+
+ EXPECT_FALSE(ran);
+ ran = true;
+}
+}
+
+TEST(FiberManager, runInMainContext) {
+ FiberManager manager(folly::make_unique<SimpleLoopController>());
+ auto& loopController =
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
+
+ bool checkRan = false;
+
+ int mainLocation;
+ manager.runInMainContext(
+ [&]() {
+ expectMainContext(checkRan, &mainLocation, nullptr);
+ });
+ EXPECT_TRUE(checkRan);
+
+ checkRan = false;
+
+ manager.addTask(
+ [&]() {
+ int stackLocation;
+ runInMainContext(
+ [&]() {
+ expectMainContext(checkRan, &mainLocation, &stackLocation);
+ });
+ EXPECT_TRUE(checkRan);
+ }
+ );
+
+ loopController.loop(
+ [&]() {
+ loopController.stop();
+ }
+ );
+
+ EXPECT_TRUE(checkRan);
+}
+
+TEST(FiberManager, addTaskFinally) {
+ FiberManager manager(folly::make_unique<SimpleLoopController>());
+ auto& loopController =
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
+
+ bool checkRan = false;
+
+ int mainLocation;
+
+ manager.addTaskFinally(
+ [&]() {
+ return 1234;
+ },
+ [&](Try<int>&& result) {
+ EXPECT_EQ(result.value(), 1234);
+
+ expectMainContext(checkRan, &mainLocation, nullptr);
+ }
+ );
+
+ EXPECT_FALSE(checkRan);
+
+ loopController.loop(
+ [&]() {
+ loopController.stop();
+ }
+ );
+
+ EXPECT_TRUE(checkRan);
+}
+
+TEST(FiberManager, fibersPoolWithinLimit) {
+ FiberManager::Options opts;
+ opts.maxFibersPoolSize = 5;
+
+ FiberManager manager(folly::make_unique<SimpleLoopController>(), opts);
+ auto& loopController =
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
+
+ size_t fibersRun = 0;
+
+ for (size_t i = 0; i < 5; ++i) {
+ manager.addTask(
+ [&]() {
+ ++fibersRun;
+ }
+ );
+ }
+ loopController.loop(
+ [&]() {
+ loopController.stop();
+ }
+ );
+
+ EXPECT_EQ(5, fibersRun);
+ EXPECT_EQ(5, manager.fibersAllocated());
+ EXPECT_EQ(5, manager.fibersPoolSize());
+
+ for (size_t i = 0; i < 5; ++i) {
+ manager.addTask(
+ [&]() {
+ ++fibersRun;
+ }
+ );
+ }
+ loopController.loop(
+ [&]() {
+ loopController.stop();
+ }
+ );
+
+ EXPECT_EQ(10, fibersRun);
+ EXPECT_EQ(5, manager.fibersAllocated());
+ EXPECT_EQ(5, manager.fibersPoolSize());
+}
+
+TEST(FiberManager, fibersPoolOverLimit) {
+ FiberManager::Options opts;
+ opts.maxFibersPoolSize = 5;
+
+ FiberManager manager(folly::make_unique<SimpleLoopController>(), opts);
+ auto& loopController =
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
+
+ size_t fibersRun = 0;
+
+ for (size_t i = 0; i < 10; ++i) {
+ manager.addTask(
+ [&]() {
+ ++fibersRun;
+ }
+ );
+ }
+
+ EXPECT_EQ(0, fibersRun);
+ EXPECT_EQ(10, manager.fibersAllocated());
+ EXPECT_EQ(0, manager.fibersPoolSize());
+
+ loopController.loop(
+ [&]() {
+ loopController.stop();
+ }
+ );
+
+ EXPECT_EQ(10, fibersRun);
+ EXPECT_EQ(5, manager.fibersAllocated());
+ EXPECT_EQ(5, manager.fibersPoolSize());
+}
+
+TEST(FiberManager, remoteFiberBasic) {
+ FiberManager manager(folly::make_unique<SimpleLoopController>());
+ auto& loopController =
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
+
+ int result[2];
+ result[0] = result[1] = 0;
+ folly::Optional<Promise<int>> savedPromise[2];
+ manager.addTask(
+ [&] () {
+ result[0] = await([&] (Promise<int> promise) {
+ savedPromise[0] = std::move(promise);
+ });
+ });
+ manager.addTask(
+ [&] () {
+ result[1] = await([&] (Promise<int> promise) {
+ savedPromise[1] = std::move(promise);
+ });
+ });
+
+ manager.loopUntilNoReady();
+
+ EXPECT_TRUE(savedPromise[0].hasValue());
+ EXPECT_TRUE(savedPromise[1].hasValue());
+ EXPECT_EQ(0, result[0]);
+ EXPECT_EQ(0, result[1]);
+
+ std::thread remoteThread0{
+ [&] () {
+ savedPromise[0]->setValue(42);
+ }
+ };
+ std::thread remoteThread1{
+ [&] () {
+ savedPromise[1]->setValue(43);
+ }
+ };
+ remoteThread0.join();
+ remoteThread1.join();
+ EXPECT_EQ(0, result[0]);
+ EXPECT_EQ(0, result[1]);
+ /* Should only have scheduled once */
+ EXPECT_EQ(1, loopController.remoteScheduleCalled());
+
+ manager.loopUntilNoReady();
+ EXPECT_EQ(42, result[0]);
+ EXPECT_EQ(43, result[1]);
+}
+
+TEST(FiberManager, addTaskRemoteBasic) {
+ FiberManager manager(folly::make_unique<SimpleLoopController>());
+
+ int result[2];
+ result[0] = result[1] = 0;
+ folly::Optional<Promise<int>> savedPromise[2];
+
+ std::thread remoteThread0{
+ [&] () {
+ manager.addTaskRemote(
+ [&] () {
+ result[0] = await([&] (Promise<int> promise) {
+ savedPromise[0] = std::move(promise);
+ });
+ });
+ }
+ };
+ std::thread remoteThread1{
+ [&] () {
+ manager.addTaskRemote(
+ [&] () {
+ result[1] = await([&] (Promise<int> promise) {
+ savedPromise[1] = std::move(promise);
+ });
+ });
+ }
+ };
+ remoteThread0.join();
+ remoteThread1.join();
+
+ manager.loopUntilNoReady();
+
+ EXPECT_TRUE(savedPromise[0].hasValue());
+ EXPECT_TRUE(savedPromise[1].hasValue());
+ EXPECT_EQ(0, result[0]);
+ EXPECT_EQ(0, result[1]);
+
+ savedPromise[0]->setValue(42);
+ savedPromise[1]->setValue(43);
+
+ EXPECT_EQ(0, result[0]);
+ EXPECT_EQ(0, result[1]);
+
+ manager.loopUntilNoReady();
+ EXPECT_EQ(42, result[0]);
+ EXPECT_EQ(43, result[1]);
+}
+
+TEST(FiberManager, remoteHasTasks) {
+ size_t counter = 0;
+ FiberManager fm(folly::make_unique<SimpleLoopController>());
+ std::thread remote([&]() {
+ fm.addTaskRemote([&]() {
+ ++counter;
+ });
+ });
+
+ remote.join();
+
+ while (fm.hasTasks()) {
+ fm.loopUntilNoReady();
+ }
+
+ EXPECT_FALSE(fm.hasTasks());
+ EXPECT_EQ(counter, 1);
+}
+
+TEST(FiberManager, remoteHasReadyTasks) {
+ int result = 0;
+ folly::Optional<Promise<int>> savedPromise;
+ FiberManager fm(folly::make_unique<SimpleLoopController>());
+ std::thread remote([&]() {
+ fm.addTaskRemote([&]() {
+ result = await([&](Promise<int> promise) {
+ savedPromise = std::move(promise);
+ });
+ EXPECT_TRUE(fm.hasTasks());
+ });
+ });
+
+ remote.join();
+ EXPECT_TRUE(fm.hasTasks());
+
+ fm.loopUntilNoReady();
+ EXPECT_TRUE(fm.hasTasks());
+
+ std::thread remote2([&](){
+ savedPromise->setValue(47);
+ });
+ remote2.join();
+ EXPECT_TRUE(fm.hasTasks());
+
+ fm.loopUntilNoReady();
+ EXPECT_FALSE(fm.hasTasks());
+
+ EXPECT_EQ(result, 47);
+}
+
+static size_t sNumAwaits;
+
+void runBenchmark(size_t numAwaits, size_t toSend) {
+ sNumAwaits = numAwaits;
+
+ FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
+ auto& loopController =
+ dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
+
+ std::queue<Promise<int>> pendingRequests;
+ static const size_t maxOutstanding = 5;
+
+ auto loop = [&fiberManager, &loopController, &pendingRequests, &toSend]() {
+ if (pendingRequests.size() == maxOutstanding || toSend == 0) {
+ if (pendingRequests.empty()) {
+ return;
+ }
+ pendingRequests.front().setValue(0);
+ pendingRequests.pop();
+ } else {
+ fiberManager.addTask([&pendingRequests]() {
+ for (size_t i = 0; i < sNumAwaits; ++i) {
+ auto result = await(
+ [&pendingRequests](Promise<int> promise) {
+ pendingRequests.push(std::move(promise));
+ });
+ assert(result == 0);
+ }
+ });
+
+ if (--toSend == 0) {
+ loopController.stop();
+ }
+ }
+ };
+
+ loopController.loop(std::move(loop));
+}
+
+BENCHMARK(FiberManagerBasicOneAwait, iters) {
+ runBenchmark(1, iters);
+}
+
+BENCHMARK(FiberManagerBasicFiveAwaits, iters) {
+ runBenchmark(5, iters);
+}
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <iostream>
+#include <queue>
+
+#include <folly/Memory.h>
+
+#include <folly/experimental/fibers/FiberManager.h>
+#include <folly/experimental/fibers/SimpleLoopController.h>
+
+using namespace folly::fibers;
+
+struct Application {
+ public:
+ Application ()
+ : fiberManager(folly::make_unique<SimpleLoopController>()),
+ toSend(20),
+ maxOutstanding(5) {
+ }
+
+ void loop() {
+ if (pendingRequests.size() == maxOutstanding || toSend == 0) {
+ if (pendingRequests.empty()) {
+ return;
+ }
+ intptr_t value = rand()%1000;
+ std::cout << "Completing request with data = " << value << std::endl;
+
+ pendingRequests.front().setValue(value);
+ pendingRequests.pop();
+ } else {
+ static size_t id_counter = 1;
+ size_t id = id_counter++;
+ std::cout << "Adding new request with id = " << id << std::endl;
+
+ fiberManager.addTask([this, id]() {
+ std::cout << "Executing fiber with id = " << id << std::endl;
+
+ auto result1 = await(
+ [this](Promise<int> fiber) {
+ pendingRequests.push(std::move(fiber));
+ });
+
+ std::cout << "Fiber id = " << id
+ << " got result1 = " << result1 << std::endl;
+
+ auto result2 = await
+ ([this](Promise<int> fiber) {
+ pendingRequests.push(std::move(fiber));
+ });
+ std::cout << "Fiber id = " << id
+ << " got result2 = " << result2 << std::endl;
+ });
+
+ if (--toSend == 0) {
+ auto& loopController =
+ dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
+ loopController.stop();
+ }
+ }
+ }
+
+ FiberManager fiberManager;
+
+ std::queue<Promise<int>> pendingRequests;
+ size_t toSend;
+ size_t maxOutstanding;
+};
+
+int main() {
+ Application app;
+
+ auto loop = [&app]() {
+ app.loop();
+ };
+
+ auto& loopController =
+ dynamic_cast<SimpleLoopController&>(app.fiberManager.loopController());
+
+ loopController.loop(std::move(loop));
+
+ return 0;
+}
--- /dev/null
+check_PROGRAMS = mcrouter_fibers_test
+
+mcrouter_fibers_test_SOURCES = \
+ FibersTest.cpp \
+ main.cpp
+
+mcrouter_fibers_test_CPPFLAGS = -I$(top_srcdir)/oss_include
+mcrouter_fibers_test_LDADD = $(top_builddir)/lib/libmcrouter.a -lgtest -lfollybenchmark
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <gtest/gtest.h>
+
+#include <folly/Benchmark.h>
+
+// for backward compatibility with gflags
+namespace gflags { }
+namespace google { using namespace gflags; }
+
+int main(int argc, char** argv) {
+ testing::InitGoogleTest(&argc, argv);
+ google::ParseCommandLineFlags(&argc, &argv, true);
+
+ auto rc = RUN_ALL_TESTS();
+ folly::runBenchmarksOnFlag();
+ return rc;
+}
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <boost/type_traits.hpp>
+
+namespace folly { namespace fibers {
+
+/**
+ * For any functor F taking >= 1 argument,
+ * FirstArgOf<F>::type is the type of F's first parameter.
+ *
+ * Rationale: we want to declare a function func(F), where F has the
+ * signature `void(X)` and func should return T<X> (T and X are some types).
+ * Solution:
+ *
+ * template <typename F>
+ * T<typename FirstArgOf<F>::type>
+ * func(F&& f);
+ */
+
+namespace detail {
+
+/**
+ * If F is a pointer-to-member, will contain a typedef type
+ * with the type of F's first parameter
+ */
+template<typename>
+struct ExtractFirstMemfn;
+
+template <typename Ret, typename T, typename First, typename... Args>
+struct ExtractFirstMemfn<Ret (T::*)(First, Args...)> {
+ typedef First type;
+};
+
+template <typename Ret, typename T, typename First, typename... Args>
+struct ExtractFirstMemfn<Ret (T::*)(First, Args...) const> {
+ typedef First type;
+};
+
+} // detail
+
+/** Default - use boost */
+template <typename F, typename Enable = void>
+struct FirstArgOf {
+ typedef typename boost::function_traits<
+ typename std::remove_pointer<F>::type>::arg1_type type;
+};
+
+/** Specialization for function objects */
+template <typename F>
+struct FirstArgOf<F, typename std::enable_if<std::is_class<F>::value>::type> {
+ typedef typename detail::ExtractFirstMemfn<
+ decltype(&F::operator())>::type type;
+};
+
+}} // folly::fibers