#include <folly/experimental/fibers/FiberManager.h>
-namespace folly { namespace fibers {
+namespace folly {
+namespace fibers {
template <typename T>
TaskIterator<T>::TaskIterator(TaskIterator&& other) noexcept
- : context_(std::move(other.context_)),
- id_(other.id_) {
-}
+ : context_(std::move(other.context_)), id_(other.id_) {}
template <typename T>
TaskIterator<T>::TaskIterator(std::shared_ptr<Context> context)
- : context_(std::move(context)),
- id_(-1) {
+ : context_(std::move(context)), id_(-1) {
assert(context_);
}
size_t tasksLeft = context_->totalTasks - context_->results.size();
n = std::min(n, tasksLeft);
- await(
- [this, n](Promise<void> promise) {
- context_->tasksToFulfillPromise = n;
- context_->promise.assign(std::move(promise));
- });
+ await([this, n](Promise<void> promise) {
+ context_->tasksToFulfillPromise = n;
+ context_->promise.assign(std::move(promise));
+ });
}
template <typename T>
template <class InputIterator>
TaskIterator<typename std::result_of<
- typename std::iterator_traits<InputIterator>::value_type()>::type>
+ typename std::iterator_traits<InputIterator>::value_type()>::type>
addTasks(InputIterator first, InputIterator last) {
typedef typename std::result_of<
- typename std::iterator_traits<InputIterator>::value_type()>::type
+ typename std::iterator_traits<InputIterator>::value_type()>::type
ResultType;
typedef TaskIterator<ResultType> IteratorType;
#pragma clang diagnostic push // ignore generalized lambda capture warning
#pragma clang diagnostic ignored "-Wc++1y-extensions"
#endif
- addTask(
- [i, context, f = std::move(*first)]() {
- context->results.emplace_back(i, folly::makeTryWith(std::move(f)));
-
- // Check for awaiting iterator.
- if (context->promise.hasValue()) {
- if (--context->tasksToFulfillPromise == 0) {
- context->promise->setValue();
- context->promise.clear();
- }
+ addTask([ i, context, f = std::move(*first) ]() {
+ context->results.emplace_back(i, folly::makeTryWith(std::move(f)));
+
+ // Check for awaiting iterator.
+ if (context->promise.hasValue()) {
+ if (--context->tasksToFulfillPromise == 0) {
+ context->promise->setValue();
+ context->promise.clear();
}
}
- );
+ });
#ifdef __clang__
#pragma clang diagnostic pop
#endif
return IteratorType(std::move(context));
}
-
-}}
+}
+}
#include <folly/experimental/fibers/Promise.h>
#include <folly/futures/Try.h>
-namespace folly { namespace fibers {
+namespace folly {
+namespace fibers {
template <typename T>
class TaskIterator;
* @return movable, non-copyable iterator
*/
template <class InputIterator>
-TaskIterator<
- typename std::result_of<
- typename std::iterator_traits<InputIterator>::value_type()>::type>
-inline addTasks(InputIterator first, InputIterator last);
+TaskIterator<typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::
+ type> inline addTasks(InputIterator first, InputIterator last);
template <typename T>
class TaskIterator {
private:
template <class InputIterator>
- friend TaskIterator<
- typename std::result_of<
- typename std::iterator_traits<InputIterator>::value_type()>::type>
+ friend TaskIterator<typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type>
addTasks(InputIterator first, InputIterator last);
struct Context {
folly::Try<T> awaitNextResult();
};
-
-}}
+}
+}
#include <folly/experimental/fibers/AddTasks-inl.h>
#include <folly/experimental/fibers/Fiber.h>
#include <folly/experimental/fibers/FiberManager.h>
-namespace folly { namespace fibers {
+namespace folly {
+namespace fibers {
inline Baton::Baton() : Baton(NO_WAITER) {
assert(Baton(NO_WAITER).futex_.futex == static_cast<uint32_t>(NO_WAITER));
assert(Baton(POSTED).futex_.futex == static_cast<uint32_t>(POSTED));
assert(Baton(TIMEOUT).futex_.futex == static_cast<uint32_t>(TIMEOUT));
- assert(Baton(THREAD_WAITING).futex_.futex ==
- static_cast<uint32_t>(THREAD_WAITING));
+ assert(
+ Baton(THREAD_WAITING).futex_.futex ==
+ static_cast<uint32_t>(THREAD_WAITING));
assert(futex_.futex.is_lock_free());
assert(waitingFiber_.is_lock_free());
} else {
throw std::logic_error("Some Fiber is already waiting on this Baton.");
}
- } while(!waitingFiber.compare_exchange_weak(
- baton_fiber,
- reinterpret_cast<intptr_t>(&fiber)));
+ } while (!waitingFiber.compare_exchange_weak(
+ baton_fiber, reinterpret_cast<intptr_t>(&fiber)));
mainContextFunc();
};
}
template <typename F>
-bool Baton::timed_wait(TimeoutController::Duration timeout,
- F&& mainContextFunc) {
+bool Baton::timed_wait(
+ TimeoutController::Duration timeout,
+ F&& mainContextFunc) {
auto fm = FiberManager::getFiberManagerUnsafe();
if (!fm || !fm->activeFiber_) {
canceled = true;
};
- auto id = fm->timeoutManager_->registerTimeout(
- std::ref(timeoutFunc), timeout);
+ auto id =
+ fm->timeoutManager_->registerTimeout(std::ref(timeoutFunc), timeout);
waitFiber(*fm, std::move(mainContextFunc));
return posted;
}
-template<typename C, typename D>
-bool Baton::timed_wait(const std::chrono::time_point<C,D>& timeout) {
+template <typename C, typename D>
+bool Baton::timed_wait(const std::chrono::time_point<C, D>& timeout) {
auto now = C::now();
if (LIKELY(now <= timeout)) {
return timed_wait(TimeoutController::Duration(0));
}
}
-
-
-}}
+}
+}
#include <folly/experimental/fibers/FiberManager.h>
#include <folly/portability/Asm.h>
-namespace folly { namespace fibers {
+namespace folly {
+namespace fibers {
void Baton::wait() {
- wait([](){});
+ wait([]() {});
}
void Baton::wait(TimeoutHandler& timeoutHandler) {
}
bool Baton::timed_wait(TimeoutController::Duration timeout) {
- return timed_wait(timeout, [](){});
+ return timed_wait(timeout, []() {});
}
void Baton::waitThread() {
auto fiber = waitingFiber_.load();
- if (LIKELY(fiber == NO_WAITER &&
- waitingFiber_.compare_exchange_strong(fiber, THREAD_WAITING))) {
+ if (LIKELY(
+ fiber == NO_WAITER &&
+ waitingFiber_.compare_exchange_strong(fiber, THREAD_WAITING))) {
do {
folly::detail::MemoryIdler::futexWait(futex_.futex, THREAD_WAITING);
fiber = waitingFiber_.load(std::memory_order_relaxed);
}
bool Baton::spinWaitForEarlyPost() {
- static_assert(PreBlockAttempts > 0,
+ static_assert(
+ PreBlockAttempts > 0,
"isn't this assert clearer than an uninitialized variable warning?");
for (int i = 0; i < PreBlockAttempts; ++i) {
if (try_wait()) {
auto fiber = waitingFiber_.load();
- if (LIKELY(fiber == NO_WAITER &&
- waitingFiber_.compare_exchange_strong(fiber, THREAD_WAITING))) {
+ if (LIKELY(
+ fiber == NO_WAITER &&
+ waitingFiber_.compare_exchange_strong(fiber, THREAD_WAITING))) {
auto deadline = TimeoutController::Clock::now() + timeout;
do {
const auto wait_rv =
- futex_.futex.futexWaitUntil(THREAD_WAITING, deadline);
+ futex_.futex.futexWaitUntil(THREAD_WAITING, deadline);
if (wait_rv == folly::detail::FutexResult::TIMEDOUT) {
return false;
}
}
void Baton::reset() {
- waitingFiber_.store(NO_WAITER, std::memory_order_relaxed);;
+ waitingFiber_.store(NO_WAITER, std::memory_order_relaxed);
+ ;
}
void Baton::TimeoutHandler::scheduleTimeout(
assert(timeoutPtr_ == 0);
if (timeout.count() > 0) {
- timeoutPtr_ = fiberManager_->timeoutManager_->registerTimeout(
- timeoutFunc_, timeout);
+ timeoutPtr_ =
+ fiberManager_->timeoutManager_->registerTimeout(timeoutFunc_, timeout);
}
}
fiberManager_->timeoutManager_->cancel(timeoutPtr_);
}
}
-
-}}
+}
+}
#include <folly/detail/Futex.h>
#include <folly/experimental/fibers/TimeoutController.h>
-namespace folly { namespace fibers {
+namespace folly {
+namespace fibers {
class Fiber;
class FiberManager;
* This is here only not break tao/locks. Please don't use it, because it is
* inefficient when used on Fibers.
*/
- template<typename C, typename D = typename C::duration>
- bool timed_wait(const std::chrono::time_point<C,D>& timeout);
+ template <typename C, typename D = typename C::duration>
+ bool timed_wait(const std::chrono::time_point<C, D>& timeout);
/**
* Puts active fiber to sleep. Returns when post is called.
PreBlockAttempts = 300,
};
- explicit Baton(intptr_t state) : waitingFiber_(state) {};
+ explicit Baton(intptr_t state) : waitingFiber_(state){};
void postHelper(intptr_t new_value);
void postThread();
} futex_;
};
};
-
-}}
+}
+}
#include <folly/experimental/fibers/Baton-inl.h>
/**
* Wrappers for different versions of boost::context library
* API reference for different versions
- * Boost 1.51: http://www.boost.org/doc/libs/1_51_0/libs/context/doc/html/context/context/boost_fcontext.html
- * Boost 1.52: http://www.boost.org/doc/libs/1_52_0/libs/context/doc/html/context/context/boost_fcontext.html
- * Boost 1.56: http://www.boost.org/doc/libs/1_56_0/libs/context/doc/html/context/context/boost_fcontext.html
+ * Boost 1.51:
+ * http://www.boost.org/doc/libs/1_51_0/libs/context/doc/html/context/context/boost_fcontext.html
+ * Boost 1.52:
+ * http://www.boost.org/doc/libs/1_52_0/libs/context/doc/html/context/context/boost_fcontext.html
+ * Boost 1.56:
+ * http://www.boost.org/doc/libs/1_56_0/libs/context/doc/html/context/context/boost_fcontext.html
*/
-namespace folly { namespace fibers {
+namespace folly {
+namespace fibers {
struct FContext {
public:
-
#if BOOST_VERSION >= 105200
using ContextStruct = boost::context::fcontext_t;
#else
ContextStruct context_;
#endif
- friend intptr_t jumpContext(FContext* oldC, FContext::ContextStruct* newC,
- intptr_t p);
- friend intptr_t jumpContext(FContext::ContextStruct* oldC, FContext* newC,
- intptr_t p);
- friend FContext makeContext(void* stackLimit, size_t stackSize,
- void(*fn)(intptr_t));
+ friend intptr_t
+ jumpContext(FContext* oldC, FContext::ContextStruct* newC, intptr_t p);
+ friend intptr_t
+ jumpContext(FContext::ContextStruct* oldC, FContext* newC, intptr_t p);
+ friend FContext
+ makeContext(void* stackLimit, size_t stackSize, void (*fn)(intptr_t));
};
-inline intptr_t jumpContext(FContext* oldC, FContext::ContextStruct* newC,
- intptr_t p) {
-
+inline intptr_t
+jumpContext(FContext* oldC, FContext::ContextStruct* newC, intptr_t p) {
#if BOOST_VERSION >= 105600
return boost::context::jump_fcontext(&oldC->context_, *newC, p);
#elif BOOST_VERSION >= 105200
#else
return jump_fcontext(&oldC->context_, newC, p);
#endif
-
}
-inline intptr_t jumpContext(FContext::ContextStruct* oldC, FContext* newC,
- intptr_t p) {
-
+inline intptr_t
+jumpContext(FContext::ContextStruct* oldC, FContext* newC, intptr_t p) {
#if BOOST_VERSION >= 105200
return boost::context::jump_fcontext(oldC, newC->context_, p);
#else
return jump_fcontext(oldC, &newC->context_, p);
#endif
-
}
-inline FContext makeContext(void* stackLimit, size_t stackSize,
- void(*fn)(intptr_t)) {
+inline FContext
+makeContext(void* stackLimit, size_t stackSize, void (*fn)(intptr_t)) {
FContext res;
res.stackLimit_ = stackLimit;
res.stackBase_ = static_cast<unsigned char*>(stackLimit) + stackSize;
return res;
}
-
-}} // folly::fibers
+}
+} // folly::fibers
#include <folly/experimental/fibers/EventBaseLoopController.h>
#include <folly/experimental/fibers/FiberManager.h>
-namespace folly { namespace fibers {
+namespace folly {
+namespace fibers {
inline EventBaseLoopController::EventBaseLoopController()
: callback_(*this), aliveWeak_(destructionCallback_.getWeak()) {}
}
inline void EventBaseLoopController::attachEventBase(
- folly::EventBase& eventBase) {
-
+ folly::EventBase& eventBase) {
if (eventBase_ != nullptr) {
LOG(ERROR) << "Attempt to reattach EventBase to LoopController";
}
}
}
-inline void EventBaseLoopController::timedSchedule(std::function<void()> func,
- TimePoint time) {
+inline void EventBaseLoopController::timedSchedule(
+ std::function<void()> func,
+ TimePoint time) {
assert(eventBaseAttached_);
// We want upper bound for the cast, thus we just add 1
- auto delay_ms = std::chrono::duration_cast<
- std::chrono::milliseconds>(time - Clock::now()).count() + 1;
+ auto delay_ms =
+ std::chrono::duration_cast<std::chrono::milliseconds>(time - Clock::now())
+ .count() +
+ 1;
// If clock is not monotonic
delay_ms = std::max<decltype(delay_ms)>(delay_ms, 0L);
eventBase_->tryRunAfterDelay(func, delay_ms);
}
-
-}} // folly::fibers
+}
+} // folly::fibers
*/
#pragma once
-#include <memory>
-#include <atomic>
#include <folly/experimental/fibers/LoopController.h>
#include <folly/io/async/EventBase.h>
+#include <atomic>
+#include <memory>
namespace folly {
class EventBase;
}
-namespace folly { namespace fibers {
+namespace folly {
+namespace fibers {
class FiberManager;
explicit ControllerCallback(EventBaseLoopController& controller)
: controller_(controller) {}
- void runLoopCallback() noexcept override { controller_.runLoop(); }
+ void runLoopCallback() noexcept override {
+ controller_.runLoop();
+ }
private:
EventBaseLoopController& controller_;
class DestructionCallback : public folly::EventBase::LoopCallback {
public:
DestructionCallback() : alive_(new int(42)) {}
- ~DestructionCallback() { reset(); }
+ ~DestructionCallback() {
+ reset();
+ }
- void runLoopCallback() noexcept override { reset(); }
+ void runLoopCallback() noexcept override {
+ reset();
+ }
- std::weak_ptr<void> getWeak() { return {alive_}; }
+ std::weak_ptr<void> getWeak() {
+ return {alive_};
+ }
private:
void reset() {
friend class FiberManager;
};
-
-}} // folly::fibers
+}
+} // folly::fibers
#include "EventBaseLoopController-inl.h"
#include <cassert>
-namespace folly { namespace fibers {
+namespace folly {
+namespace fibers {
template <typename F>
void Fiber::setFunction(F&& func) {
}
template <typename F, typename G>
-void Fiber::setFunctionFinally(F&& resultFunc,
- G&& finallyFunc) {
+void Fiber::setFunctionFinally(F&& resultFunc, G&& finallyFunc) {
assert(state_ == INVALID);
resultFunc_ = std::forward<F>(resultFunc);
finallyFunc_ = std::forward<G>(finallyFunc);
}
template <typename T>
-void Fiber::LocalData::dataHeapDestructor(void *ptr) {
+void Fiber::LocalData::dataHeapDestructor(void* ptr) {
reinterpret_cast<T*>(ptr)->~T();
freeHeapBuffer(ptr);
}
-
-}} // folly::fibers
+}
+} // folly::fibers
#include <sys/syscall.h>
#include <unistd.h>
+#include <glog/logging.h>
#include <algorithm>
#include <cstring>
#include <stdexcept>
-#include <glog/logging.h>
#include <folly/Likely.h>
#include <folly/Portability.h>
#include <folly/experimental/fibers/FiberManager.h>
#include <folly/portability/SysSyscall.h>
-namespace folly { namespace fibers {
+namespace folly {
+namespace fibers {
namespace {
static const uint64_t kMagic8Bytes = 0xfaceb00cfaceb00c;
uint64_t* end = static_cast<uint64_t*>(context.stackBase());
auto firstNonMagic = std::find_if(
- begin, end,
- [](uint64_t val) {
- return val != kMagic8Bytes;
- }
- );
+ begin, end, [](uint64_t val) { return val != kMagic8Bytes; });
return (end - firstNonMagic) * sizeof(uint64_t);
}
-} // anonymous namespace
+} // anonymous namespace
void Fiber::setData(intptr_t data) {
DCHECK_EQ(state_, AWAITING);
}
}
-Fiber::Fiber(FiberManager& fiberManager) :
- fiberManager_(fiberManager) {
-
+Fiber::Fiber(FiberManager& fiberManager) : fiberManager_(fiberManager) {
auto size = fiberManager_.options_.stackSize;
auto limit = fiberManager_.stackAllocator_.allocate(size);
auto limit = fcontext_.stackLimit();
auto base = fcontext_.stackBase();
- std::fill(static_cast<uint64_t*>(limit),
- static_cast<uint64_t*>(base),
- kMagic8Bytes);
+ std::fill(
+ static_cast<uint64_t*>(limit),
+ static_cast<uint64_t*>(base),
+ kMagic8Bytes);
// newer versions of boost allocate context on fiber stack,
// need to create a new one
Fiber::~Fiber() {
fiberManager_.stackAllocator_.deallocate(
- static_cast<unsigned char*>(fcontext_.stackLimit()),
- fiberManager_.options_.stackSize);
+ static_cast<unsigned char*>(fcontext_.stackLimit()),
+ fiberManager_.options_.stackSize);
}
void Fiber::recordStackPosition() {
int stackDummy;
auto currentPosition = static_cast<size_t>(
- static_cast<unsigned char*>(fcontext_.stackBase()) -
- static_cast<unsigned char*>(static_cast<void*>(&stackDummy)));
+ static_cast<unsigned char*>(fcontext_.stackBase()) -
+ static_cast<unsigned char*>(static_cast<void*>(&stackDummy)));
fiberManager_.stackHighWatermark_ =
- std::max(fiberManager_.stackHighWatermark_, currentPosition);
+ std::max(fiberManager_.stackHighWatermark_, currentPosition);
VLOG(4) << "Stack usage: " << currentPosition;
}
func_();
}
} catch (...) {
- fiberManager_.exceptionCallback_(std::current_exception(),
- "running Fiber func_/resultFunc_");
+ fiberManager_.exceptionCallback_(
+ std::current_exception(), "running Fiber func_/resultFunc_");
}
if (UNLIKELY(recordStackUsed_)) {
- fiberManager_.stackHighWatermark_ =
- std::max(fiberManager_.stackHighWatermark_,
- nonMagicInBytes(fcontext_));
+ fiberManager_.stackHighWatermark_ = std::max(
+ fiberManager_.stackHighWatermark_, nonMagicInBytes(fcontext_));
VLOG(3) << "Max stack usage: " << fiberManager_.stackHighWatermark_;
- CHECK(fiberManager_.stackHighWatermark_ <
- fiberManager_.options_.stackSize - 64) << "Fiber stack overflow";
+ CHECK(
+ fiberManager_.stackHighWatermark_ <
+ fiberManager_.options_.stackSize - 64)
+ << "Fiber stack overflow";
}
state_ = INVALID;
void Fiber::LocalData::freeHeapBuffer(void* buffer) {
delete[] reinterpret_cast<char*>(buffer);
}
-
-}}
+}
+}
#include <boost/context/all.hpp>
#include <boost/version.hpp>
-namespace folly { namespace fibers {
+namespace folly {
+namespace fibers {
class Baton;
class FiberManager;
*/
std::pair<void*, size_t> getStack() const {
void* const stack =
- std::min<void*>(fcontext_.stackLimit(), fcontext_.stackBase());
+ std::min<void*>(fcontext_.stackLimit(), fcontext_.stackBase());
const size_t size = std::abs<intptr_t>(
reinterpret_cast<intptr_t>(fcontext_.stackBase()) -
reinterpret_cast<intptr_t>(fcontext_.stackLimit()));
- return { stack, size };
+ return {stack, size};
}
private:
enum State {
- INVALID, /**< Does't have task function */
- NOT_STARTED, /**< Has task function, not started */
- READY_TO_RUN, /**< Was started, blocked, then unblocked */
- RUNNING, /**< Is running right now */
- AWAITING, /**< Is currently blocked */
- AWAITING_IMMEDIATE, /**< Was preempted to run an immediate function,
- and will be resumed right away */
- YIELDED, /**< The fiber yielded execution voluntarily */
+ INVALID, /**< Does't have task function */
+ NOT_STARTED, /**< Has task function, not started */
+ READY_TO_RUN, /**< Was started, blocked, then unblocked */
+ RUNNING, /**< Is running right now */
+ AWAITING, /**< Is currently blocked */
+ AWAITING_IMMEDIATE, /**< Was preempted to run an immediate function,
+ and will be resumed right away */
+ YIELDED, /**< The fiber yielded execution voluntarily */
};
- State state_{INVALID}; /**< current Fiber state */
+ State state_{INVALID}; /**< current Fiber state */
friend class Baton;
friend class FiberManager;
*/
void recordStackPosition();
- FiberManager& fiberManager_; /**< Associated FiberManager */
- FContext fcontext_; /**< current task execution context */
- intptr_t data_; /**< Used to keep some data with the Fiber */
+ FiberManager& fiberManager_; /**< Associated FiberManager */
+ FContext fcontext_; /**< current task execution context */
+ intptr_t data_; /**< Used to keep some data with the Fiber */
std::shared_ptr<RequestContext> rcontext_; /**< current RequestContext */
- folly::Function<void()> func_; /**< task function */
+ folly::Function<void()> func_; /**< task function */
bool recordStackUsed_{false};
bool stackFilledWithMagic_{false};
void reset();
- //private:
+ // private:
template <typename T>
FOLLY_NOINLINE T& getSlow();
folly::IntrusiveListHook globalListHook_; /**< list hook for global list */
pid_t threadId_{0};
};
-
-}}
+}
+}
#include <folly/experimental/fibers/Fiber-inl.h>
#include <folly/futures/Promise.h>
#include <folly/futures/Try.h>
-namespace folly { namespace fibers {
+namespace folly {
+namespace fibers {
namespace {
return opts;
}
-} // anonymous
+} // anonymous
inline void FiberManager::ensureLoopScheduled() {
if (isLoopScheduled_) {
assert(activeFiber_ == nullptr);
};
- assert(fiber->state_ == Fiber::NOT_STARTED ||
- fiber->state_ == Fiber::READY_TO_RUN);
+ assert(
+ fiber->state_ == Fiber::NOT_STARTED ||
+ fiber->state_ == Fiber::READY_TO_RUN);
currentFiber_ = fiber;
fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
if (observer_) {
runReadyFiber(&fiber);
}
- remoteReadyQueue_.sweep(
- [this, &hadRemoteFiber] (Fiber* fiber) {
- runReadyFiber(fiber);
- hadRemoteFiber = true;
+ remoteReadyQueue_.sweep([this, &hadRemoteFiber](Fiber* fiber) {
+ runReadyFiber(fiber);
+ hadRemoteFiber = true;
+ });
+
+ remoteTaskQueue_.sweep([this, &hadRemoteFiber](RemoteTask* taskPtr) {
+ std::unique_ptr<RemoteTask> task(taskPtr);
+ auto fiber = getFiber();
+ if (task->localData) {
+ fiber->localData_ = *task->localData;
}
- );
-
- remoteTaskQueue_.sweep(
- [this, &hadRemoteFiber] (RemoteTask* taskPtr) {
- std::unique_ptr<RemoteTask> task(taskPtr);
- auto fiber = getFiber();
- if (task->localData) {
- fiber->localData_ = *task->localData;
- }
- fiber->rcontext_ = std::move(task->rcontext);
+ fiber->rcontext_ = std::move(task->rcontext);
- fiber->setFunction(std::move(task->func));
- fiber->data_ = reinterpret_cast<intptr_t>(fiber);
- if (observer_) {
- observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
- }
- runReadyFiber(fiber);
- hadRemoteFiber = true;
+ fiber->setFunction(std::move(task->func));
+ fiber->data_ = reinterpret_cast<intptr_t>(fiber);
+ if (observer_) {
+ observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
}
- );
+ runReadyFiber(fiber);
+ hadRemoteFiber = true;
+ });
}
if (observer_) {
class Func;
static constexpr bool allocateInBuffer =
- sizeof(Func) <= Fiber::kUserBufferSize;
+ sizeof(Func) <= Fiber::kUserBufferSize;
class Func {
public:
- Func(F&& func, FiberManager& fm) :
- func_(std::forward<F>(func)), fm_(fm) {}
+ Func(F&& func, FiberManager& fm) : func_(std::forward<F>(func)), fm_(fm) {}
void operator()() {
try {
func_();
} catch (...) {
- fm_.exceptionCallback_(std::current_exception(),
- "running Func functor");
+ fm_.exceptionCallback_(
+ std::current_exception(), "running Func functor");
}
if (allocateInBuffer) {
this->~Func();
using T = typename std::result_of<F()>::type;
folly::Promise<T> p;
auto f = p.getFuture();
- addTaskFinally([func = std::forward<F>(func)]() mutable { return func(); },
- [p = std::move(p)](folly::Try<T> && t) mutable {
- p.setTry(std::move(t));
- });
+ addTaskFinally(
+ [func = std::forward<F>(func)]() mutable { return func(); },
+ [p = std::move(p)](folly::Try<T> && t) mutable {
+ p.setTry(std::move(t));
+ });
return f;
}
void FiberManager::addTaskRemote(F&& func) {
auto task = [&]() {
auto currentFm = getFiberManagerUnsafe();
- if (currentFm &&
- currentFm->currentFiber_ &&
+ if (currentFm && currentFm->currentFiber_ &&
currentFm->localType_ == localType_) {
return folly::make_unique<RemoteTask>(
std::forward<F>(func), currentFm->currentFiber_->localData_);
}
return folly::make_unique<RemoteTask>(std::forward<F>(func));
}();
- auto insertHead =
- [&]() { return remoteTaskQueue_.insertHead(task.release()); };
+ auto insertHead = [&]() {
+ return remoteTaskQueue_.insertHead(task.release());
+ };
loopController_->scheduleThreadSafe(std::ref(insertHead));
}
}
template <typename X>
-struct IsRvalueRefTry { static const bool value = false; };
+struct IsRvalueRefTry {
+ static const bool value = false;
+};
template <typename T>
-struct IsRvalueRefTry<folly::Try<T>&&> { static const bool value = true; };
+struct IsRvalueRefTry<folly::Try<T>&&> {
+ static const bool value = true;
+};
// We need this to be in a struct, not inlined in addTaskFinally, because clang
// crashes otherwise.
try {
finally_(std::move(*result_));
} catch (...) {
- fm_.exceptionCallback_(std::current_exception(),
- "running Finally functor");
+ fm_.exceptionCallback_(
+ std::current_exception(), "running Finally functor");
}
if (allocateInBuffer) {
typedef typename std::result_of<F()>::type Result;
static_assert(
- IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
- "finally(arg): arg must be Try<T>&&");
+ IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
+ "finally(arg): arg must be Try<T>&&");
static_assert(
- std::is_convertible<
- Result,
- typename std::remove_reference<
- typename FirstArgOf<G>::type
- >::type::element_type
- >::value,
- "finally(Try<T>&&): T must be convertible from func()'s return type");
+ std::is_convertible<
+ Result,
+ typename std::remove_reference<
+ typename FirstArgOf<G>::type>::type::element_type>::value,
+ "finally(Try<T>&&): T must be convertible from func()'s return type");
auto fiber = getFiber();
initLocalData(*fiber);
Helper;
if (Helper::allocateInBuffer) {
- auto funcLoc = static_cast<typename Helper::Func*>(
- fiber->getUserBuffer());
- auto finallyLoc = static_cast<typename Helper::Finally*>(
- static_cast<void*>(funcLoc + 1));
+ auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
+ auto finallyLoc =
+ static_cast<typename Helper::Finally*>(static_cast<void*>(funcLoc + 1));
new (finallyLoc) typename Helper::Finally(std::forward<G>(finally), *this);
new (funcLoc) typename Helper::Func(std::forward<F>(func), *finallyLoc);
}
template <typename F>
-typename std::result_of<F()>::type
-FiberManager::runInMainContext(F&& func) {
+typename std::result_of<F()>::type FiberManager::runInMainContext(F&& func) {
if (UNLIKELY(activeFiber_ == nullptr)) {
return func();
}
template <typename LocalT>
FiberManager::FiberManager(
- LocalType<LocalT>,
- std::unique_ptr<LoopController> loopController__,
- Options options) :
- loopController_(std::move(loopController__)),
- stackAllocator_(options.useGuardPages),
- options_(preprocessOptions(std::move(options))),
- exceptionCallback_([](std::exception_ptr eptr, std::string context) {
+ LocalType<LocalT>,
+ std::unique_ptr<LoopController> loopController__,
+ Options options)
+ : loopController_(std::move(loopController__)),
+ stackAllocator_(options.useGuardPages),
+ options_(preprocessOptions(std::move(options))),
+ exceptionCallback_([](std::exception_ptr eptr, std::string context) {
try {
std::rethrow_exception(eptr);
} catch (const std::exception& e) {
- LOG(DFATAL) << "Exception " << typeid(e).name()
- << " with message '" << e.what() << "' was thrown in "
+ LOG(DFATAL) << "Exception " << typeid(e).name() << " with message '"
+ << e.what() << "' was thrown in "
<< "FiberManager with context '" << context << "'";
throw;
} catch (...) {
throw;
}
}),
- timeoutManager_(std::make_shared<TimeoutController>(*loopController_)),
- fibersPoolResizer_(*this),
- localType_(typeid(LocalT)) {
+ timeoutManager_(std::make_shared<TimeoutController>(*loopController_)),
+ fibersPoolResizer_(*this),
+ localType_(typeid(LocalT)) {
loopController_->setFiberManager(this);
}
template <typename F>
-typename FirstArgOf<F>::type::value_type
-inline await(F&& func) {
+typename FirstArgOf<F>::type::value_type inline await(F&& func) {
typedef typename FirstArgOf<F>::type::value_type Result;
folly::Try<Result> result;
Baton baton;
baton.wait([&func, &result, &baton]() mutable {
- func(Promise<Result>(result, baton));
- });
+ func(Promise<Result>(result, baton));
+ });
return folly::moveFromTry(result);
}
-
-}}
+}
+}
typedef void (*AsanEnterFiberFuncPtr)(void const*, size_t);
typedef void (*AsanExitFiberFuncPtr)();
-namespace folly { namespace fibers {
+namespace folly {
+namespace fibers {
static AsanEnterFiberFuncPtr getEnterFiberFunc();
static AsanExitFiberFuncPtr getExitFiberFunc();
-
-}}
+}
+}
#endif
-namespace folly { namespace fibers {
+namespace folly {
+namespace fibers {
FOLLY_TLS FiberManager* FiberManager::currentFiberManager_ = nullptr;
-FiberManager::FiberManager(std::unique_ptr<LoopController> loopController,
- Options options) :
- FiberManager(LocalType<void>(),
- std::move(loopController),
- std::move(options)) {}
+FiberManager::FiberManager(
+ std::unique_ptr<LoopController> loopController,
+ Options options)
+ : FiberManager(
+ LocalType<void>(),
+ std::move(loopController),
+ std::move(options)) {}
FiberManager::~FiberManager() {
if (isLoopScheduled_) {
}
while (!fibersPool_.empty()) {
- fibersPool_.pop_front_and_dispose([] (Fiber* fiber) {
- delete fiber;
- });
+ fibersPool_.pop_front_and_dispose([](Fiber* fiber) { delete fiber; });
}
assert(readyFibers_.empty());
assert(fibersActive_ == 0);
}
bool FiberManager::hasTasks() const {
- return fibersActive_ > 0 ||
- !remoteReadyQueue_.empty() ||
- !remoteTaskQueue_.empty();
+ return fibersActive_ > 0 || !remoteReadyQueue_.empty() ||
+ !remoteTaskQueue_.empty();
}
Fiber* FiberManager::getFiber() {
}
++fiberId_;
bool recordStack = (options_.recordStackEvery != 0) &&
- (fiberId_ % options_.recordStackEvery == 0);
+ (fiberId_ % options_.recordStackEvery == 0);
return fiber;
}
fiberManager_.timeoutManager_->registerTimeout(
*this,
std::chrono::milliseconds(
- fiberManager_.options_.fibersPoolResizePeriodMs));
+ fiberManager_.options_.fibersPoolResizePeriodMs));
}
#ifdef FOLLY_SANITIZE_ADDRESS
}
#endif // FOLLY_SANITIZE_ADDRESS
-}}
+}
+}
#include <memory>
#include <queue>
#include <thread>
-#include <typeindex>
#include <type_traits>
+#include <typeindex>
#include <unordered_set>
#include <vector>
class TimeoutController;
template <typename T>
-class LocalType {
-};
+class LocalType {};
class InlineFunctionRunner {
public:
* @param loopController
* @param options FiberManager options
*/
- explicit FiberManager(std::unique_ptr<LoopController> loopController,
- Options options = Options());
+ explicit FiberManager(
+ std::unique_ptr<LoopController> loopController,
+ Options options = Options());
/**
* Initializes, but doesn't start FiberManager loop
* Locals of other types will be considered thread-locals.
*/
template <typename LocalT>
- FiberManager(LocalType<LocalT>,
- std::unique_ptr<LoopController> loopController,
- Options options = Options());
-
+ FiberManager(
+ LocalType<LocalT>,
+ std::unique_ptr<LoopController> loopController,
+ Options options = Options());
~FiberManager();
* @return value returned by func().
*/
template <typename F>
- typename std::result_of<F()>::type
- runInMainContext(F&& func);
+ typename std::result_of<F()>::type runInMainContext(F&& func);
/**
* Returns a refference to a fiber-local context for given Fiber. Should be
struct RemoteTask {
template <typename F>
- explicit RemoteTask(F&& f) :
- func(std::forward<F>(f)),
- rcontext(RequestContext::saveContext()) {}
+ explicit RemoteTask(F&& f)
+ : func(std::forward<F>(f)), rcontext(RequestContext::saveContext()) {}
template <typename F>
- RemoteTask(F&& f, const Fiber::LocalData& localData_) :
- func(std::forward<F>(f)),
- localData(folly::make_unique<Fiber::LocalData>(localData_)),
- rcontext(RequestContext::saveContext()) {}
+ RemoteTask(F&& f, const Fiber::LocalData& localData_)
+ : func(std::forward<F>(f)),
+ localData(folly::make_unique<Fiber::LocalData>(localData_)),
+ rcontext(RequestContext::saveContext()) {}
folly::Function<void()> func;
std::unique_ptr<Fiber::LocalData> localData;
std::shared_ptr<RequestContext> rcontext;
*/
Fiber* currentFiber_{nullptr};
- FiberTailQueue readyFibers_; /**< queue of fibers ready to be executed */
- FiberTailQueue yieldedFibers_; /**< queue of fibers which have yielded
- execution */
- FiberTailQueue fibersPool_; /**< pool of unitialized Fiber objects */
+ FiberTailQueue readyFibers_; /**< queue of fibers ready to be executed */
+ FiberTailQueue yieldedFibers_; /**< queue of fibers which have yielded
+ execution */
+ FiberTailQueue fibersPool_; /**< pool of unitialized Fiber objects */
GlobalFiberTailQueue allFibers_; /**< list of all Fiber objects owned */
- size_t fibersAllocated_{0}; /**< total number of fibers allocated */
- size_t fibersPoolSize_{0}; /**< total number of fibers in the free pool */
- size_t fibersActive_{0}; /**< number of running or blocked fibers */
- size_t fiberId_{0}; /**< id of last fiber used */
+ size_t fibersAllocated_{0}; /**< total number of fibers allocated */
+ size_t fibersPoolSize_{0}; /**< total number of fibers in the free pool */
+ size_t fibersActive_{0}; /**< number of running or blocked fibers */
+ size_t fiberId_{0}; /**< id of last fiber used */
/**
* Maximum number of active fibers in the last period lasting
*/
size_t maxFibersActiveLastPeriod_{0};
- FContext::ContextStruct mainContext_; /**< stores loop function context */
+ FContext::ContextStruct mainContext_; /**< stores loop function context */
std::unique_ptr<LoopController> loopController_;
bool isLoopScheduled_{false}; /**< was the ready loop scheduled to run? */
*/
GuardPageAllocator stackAllocator_;
- const Options options_; /**< FiberManager options */
+ const Options options_; /**< FiberManager options */
/**
* Largest observed individual Fiber stack usage in bytes.
std::shared_ptr<TimeoutController> timeoutManager_;
struct FibersPoolResizer {
- explicit FibersPoolResizer(FiberManager& fm) :
- fiberManager_(fm) {}
+ explicit FibersPoolResizer(FiberManager& fm) : fiberManager_(fm) {}
void operator()();
+
private:
FiberManager& fiberManager_;
};
template <typename F, typename G>
inline void addTaskFinally(F&& func, G&& finally) {
return FiberManager::getFiberManager().addTaskFinally(
- std::forward<F>(func), std::forward<G>(finally));
+ std::forward<F>(func), std::forward<G>(finally));
}
/**
* @return data which was used to fulfill the promise.
*/
template <typename F>
-typename FirstArgOf<F>::type::value_type
-inline await(F&& func);
+typename FirstArgOf<F>::type::value_type inline await(F&& func);
/**
* If called from a fiber, immediately switches to the FiberManager's context
* @return value returned by func().
*/
template <typename F>
-typename std::result_of<F()>::type
-inline runInMainContext(F&& func) {
+typename std::result_of<F()>::type inline runInMainContext(F&& func) {
auto fm = FiberManager::getFiberManagerUnsafe();
if (UNLIKELY(fm == nullptr)) {
return func();
std::this_thread::yield();
}
}
-
-}}
+}
+}
#include "FiberManager-inl.h"
#include <memory>
#include <unordered_map>
-#include <folly/ThreadLocal.h>
#include <folly/Synchronized.h>
+#include <folly/ThreadLocal.h>
-namespace folly { namespace fibers {
+namespace folly {
+namespace fibers {
namespace {
ThreadLocalCache() {}
struct ThreadLocalCacheTag {};
- using ThreadThreadLocalCache = ThreadLocal<ThreadLocalCache, ThreadLocalCacheTag>;
+ using ThreadThreadLocalCache =
+ ThreadLocal<ThreadLocalCache, ThreadLocalCacheTag>;
// Leak this intentionally. During shutdown, we may call getFiberManager,
// and want access to the fiber managers during that time.
static ThreadThreadLocalCache& instance() {
- static auto ret = new ThreadThreadLocalCache([]() { return new ThreadLocalCache(); });
+ static auto ret =
+ new ThreadThreadLocalCache([]() { return new ThreadLocalCache(); });
return *ret;
}
} // namespace
-FiberManager& getFiberManager(EventBase& evb,
- const FiberManager::Options& opts) {
+FiberManager& getFiberManager(
+ EventBase& evb,
+ const FiberManager::Options& opts) {
return ThreadLocalCache::get(evb, opts);
}
-
-}}
+}
+}
*/
#pragma once
-#include <folly/experimental/fibers/FiberManager.h>
#include <folly/experimental/fibers/EventBaseLoopController.h>
+#include <folly/experimental/fibers/FiberManager.h>
-namespace folly { namespace fibers {
+namespace folly {
+namespace fibers {
FiberManager& getFiberManager(
folly::EventBase& evb,
const FiberManager::Options& opts = FiberManager::Options());
-
-}}
+}
+}
*/
#include <folly/experimental/fibers/FiberManager.h>
-namespace folly { namespace fibers {
+namespace folly {
+namespace fibers {
namespace {
template <class F, class G>
typename std::enable_if<
- !std::is_same<typename std::result_of<F()>::type, void>::value, void>::type
-inline callFuncs(F&& f, G&& g, size_t id) {
+ !std::is_same<typename std::result_of<F()>::type, void>::value,
+ void>::type inline callFuncs(F&& f, G&& g, size_t id) {
g(id, f());
}
template <class F, class G>
typename std::enable_if<
- std::is_same<typename std::result_of<F()>::type, void>::value, void>::type
-inline callFuncs(F&& f, G&& g, size_t id) {
+ std::is_same<typename std::result_of<F()>::type, void>::value,
+ void>::type inline callFuncs(F&& f, G&& g, size_t id) {
f();
g(id);
}
-} // anonymous namespace
+} // anonymous namespace
template <class InputIterator, class F>
inline void forEach(InputIterator first, InputIterator last, F&& f) {
#pragma clang diagnostic push // ignore generalized lambda capture warning
#pragma clang diagnostic ignored "-Wc++1y-extensions"
#endif
- auto taskFunc =
- [&tasksTodo, &e, &f, &baton] (size_t id, FuncType&& func) {
- return [id, &tasksTodo, &e, &f, &baton,
- func_ = std::forward<FuncType>(func)]() mutable {
- try {
- callFuncs(std::forward<FuncType>(func_), f, id);
- } catch (...) {
- e = std::current_exception();
- }
- if (--tasksTodo == 0) {
- baton.post();
- }
- };
+ auto taskFunc = [&tasksTodo, &e, &f, &baton](size_t id, FuncType&& func) {
+ return [
+ id,
+ &tasksTodo,
+ &e,
+ &f,
+ &baton,
+ func_ = std::forward<FuncType>(func)
+ ]() mutable {
+ try {
+ callFuncs(std::forward<FuncType>(func_), f, id);
+ } catch (...) {
+ e = std::current_exception();
+ }
+ if (--tasksTodo == 0) {
+ baton.post();
+ }
};
+ };
#ifdef __clang__
#pragma clang diagnostic pop
#endif
std::rethrow_exception(e);
}
}
-
-}} // folly::fibers
+}
+} // folly::fibers
*/
#pragma once
-namespace folly { namespace fibers {
+namespace folly {
+namespace fibers {
/**
* Schedules several tasks and blocks until all of them are completed.
*/
template <class InputIterator, class F>
inline void forEach(InputIterator first, InputIterator last, F&& f);
-
-}} // folly::fibers
+}
+} // folly::fibers
#include <folly/experimental/fibers/ForEach-inl.h>
#include <folly/experimental/fibers/Baton.h>
-namespace folly { namespace fibers {
+namespace folly {
+namespace fibers {
typedef Baton GenericBaton;
-
-}}
+}
+}
#include <glog/logging.h>
-namespace folly { namespace fibers {
+namespace folly {
+namespace fibers {
/**
* Each stack with a guard page creates two memory mappings.
*/
class StackCache {
public:
- explicit StackCache(size_t stackSize)
- : allocSize_(allocSize(stackSize)) {
- auto p = ::mmap(nullptr, allocSize_ * kNumGuarded,
- PROT_READ | PROT_WRITE,
- MAP_PRIVATE | MAP_ANONYMOUS,
- -1, 0);
+ explicit StackCache(size_t stackSize) : allocSize_(allocSize(stackSize)) {
+ auto p = ::mmap(
+ nullptr,
+ allocSize_ * kNumGuarded,
+ PROT_READ | PROT_WRITE,
+ MAP_PRIVATE | MAP_ANONYMOUS,
+ -1,
+ 0);
PCHECK(p != (void*)(-1));
storage_ = reinterpret_cast<unsigned char*>(p);
/* Returns a multiple of pagesize() enough to store size + one guard page */
static size_t allocSize(size_t size) {
- return pagesize() * ((size + pagesize() - 1)/pagesize() + 1);
+ return pagesize() * ((size + pagesize() - 1) / pagesize() + 1);
}
};
class StackCacheEntry {
public:
explicit StackCacheEntry(size_t stackSize)
- : stackCache_(folly::make_unique<StackCache>(stackSize)) {
- }
+ : stackCache_(folly::make_unique<StackCache>(stackSize)) {}
StackCache& cache() const noexcept {
return *stackCache_;
};
GuardPageAllocator::GuardPageAllocator(bool useGuardPages)
- : useGuardPages_(useGuardPages) {
-}
+ : useGuardPages_(useGuardPages) {}
GuardPageAllocator::~GuardPageAllocator() = default;
fallbackAllocator_.deallocate(limit, size);
}
}
-
-}} // folly::fibers
+}
+} // folly::fibers
#include <memory>
-namespace folly { namespace fibers {
+namespace folly {
+namespace fibers {
class StackCacheEntry;
std::allocator<unsigned char> fallbackAllocator_;
bool useGuardPages_{true};
};
-
-}} // folly::fibers
+}
+} // folly::fibers
#include <chrono>
#include <functional>
-namespace folly { namespace fibers {
+namespace folly {
+namespace fibers {
class FiberManager;
*/
virtual void timedSchedule(std::function<void()> func, TimePoint time) = 0;
};
-
-}} // folly::fibers
+}
+} // folly::fibers
*/
#include <folly/experimental/fibers/Baton.h>
-namespace folly { namespace fibers {
+namespace folly {
+namespace fibers {
template <class T>
-Promise<T>::Promise(folly::Try<T>& value, Baton& baton) :
- value_(&value), baton_(&baton)
-{}
+Promise<T>::Promise(folly::Try<T>& value, Baton& baton)
+ : value_(&value), baton_(&baton) {}
template <class T>
-Promise<T>::Promise(Promise&& other) noexcept :
-value_(other.value_), baton_(other.baton_) {
+Promise<T>::Promise(Promise&& other) noexcept
+ : value_(other.value_), baton_(other.baton_) {
other.value_ = nullptr;
other.baton_ = nullptr;
}
template <class T>
template <class M>
void Promise<T>::setValue(M&& v) {
- static_assert(!std::is_same<T, void>::value,
- "Use setValue() instead");
+ static_assert(!std::is_same<T, void>::value, "Use setValue() instead");
setTry(folly::Try<T>(std::forward<M>(v)));
}
template <class T>
void Promise<T>::setValue() {
- static_assert(std::is_same<T, void>::value,
- "Use setValue(value) instead");
+ static_assert(std::is_same<T, void>::value, "Use setValue(value) instead");
setTry(folly::Try<void>());
}
void Promise<T>::setWith(F&& func) {
setTry(makeTryWith(std::forward<F>(func)));
}
-
-}}
+}
+}
#include <folly/experimental/fibers/traits.h>
#include <folly/futures/Try.h>
-namespace folly { namespace fibers {
+namespace folly {
+namespace fibers {
class Baton;
template <typename F>
-typename FirstArgOf<F>::type::value_type
-inline await(F&& func);
+typename FirstArgOf<F>::type::value_type inline await(F&& func);
template <typename T>
class Promise {
template <class F>
typename std::enable_if<
- std::is_convertible<typename std::result_of<F()>::type, T>::value &&
- !std::is_same<T, void>::value>::type
+ std::is_convertible<typename std::result_of<F()>::type, T>::value &&
+ !std::is_same<T, void>::value>::type
fulfilHelper(F&& func);
template <class F>
typename std::enable_if<
- std::is_same<typename std::result_of<F()>::type, void>::value &&
- std::is_same<T, void>::value>::type
+ std::is_same<typename std::result_of<F()>::type, void>::value &&
+ std::is_same<T, void>::value>::type
fulfilHelper(F&& func);
};
-
-}}
+}
+}
#include <folly/experimental/fibers/Promise-inl.h>
#include <folly/experimental/fibers/LoopController.h>
-namespace folly { namespace fibers {
+namespace folly {
+namespace fibers {
class FiberManager;
class SimpleLoopController : public LoopController {
public:
- SimpleLoopController()
- : fm_(nullptr),
- stopRequested_(false) {
- }
+ SimpleLoopController() : fm_(nullptr), stopRequested_(false) {}
/**
* Run FiberManager loop; if no ready task are present,
auto time = Clock::now();
- for (size_t i=0; i<scheduledFuncs_.size(); ++i) {
+ for (size_t i = 0; i < scheduledFuncs_.size(); ++i) {
if (scheduledFuncs_[i].first <= time) {
scheduledFuncs_[i].second();
swap(scheduledFuncs_[i], scheduledFuncs_.back());
friend class FiberManager;
};
-
-}} // folly::fibers
+}
+} // folly::fibers
*/
#pragma once
-
-namespace folly { namespace fibers {
+namespace folly {
+namespace fibers {
//
// TimedMutex implementation
assert(state_ == State::READ_LOCKED);
return;
}
- assert((state_ == State::UNLOCKED && readers_ == 0) ||
- (state_ == State::READ_LOCKED && readers_ > 0));
+ assert(
+ (state_ == State::UNLOCKED && readers_ == 0) ||
+ (state_ == State::READ_LOCKED && readers_ > 0));
assert(read_waiters_.empty());
state_ = State::READ_LOCKED;
readers_ += 1;
}
return true;
}
- assert((state_ == State::UNLOCKED && readers_ == 0) ||
- (state_ == State::READ_LOCKED && readers_ > 0));
+ assert(
+ (state_ == State::UNLOCKED && readers_ == 0) ||
+ (state_ == State::READ_LOCKED && readers_ > 0));
assert(read_waiters_.empty());
state_ = State::READ_LOCKED;
readers_ += 1;
bool TimedRWMutex<BatonType>::try_read_lock() {
pthread_spin_lock(&lock_);
if (state_ != State::WRITE_LOCKED) {
- assert((state_ == State::UNLOCKED && readers_ == 0) ||
- (state_ == State::READ_LOCKED && readers_ > 0));
+ assert(
+ (state_ == State::UNLOCKED && readers_ == 0) ||
+ (state_ == State::READ_LOCKED && readers_ > 0));
assert(read_waiters_.empty());
state_ = State::READ_LOCKED;
readers_ += 1;
void TimedRWMutex<BatonType>::unlock() {
pthread_spin_lock(&lock_);
assert(state_ != State::UNLOCKED);
- assert((state_ == State::READ_LOCKED && readers_ > 0) ||
- (state_ == State::WRITE_LOCKED && readers_ == 0));
+ assert(
+ (state_ == State::READ_LOCKED && readers_ > 0) ||
+ (state_ == State::WRITE_LOCKED && readers_ == 0));
if (state_ == State::READ_LOCKED) {
readers_ -= 1;
}
if (!read_waiters_.empty()) {
- assert(state_ == State::WRITE_LOCKED && readers_ == 0 &&
- "read waiters can only accumulate while write locked");
+ assert(
+ state_ == State::WRITE_LOCKED && readers_ == 0 &&
+ "read waiters can only accumulate while write locked");
state_ = State::READ_LOCKED;
readers_ = read_waiters_.size();
}
pthread_spin_unlock(&lock_);
}
-
-}}
+}
+}
#include <folly/experimental/fibers/GenericBaton.h>
-namespace folly { namespace fibers {
+namespace folly {
+namespace fibers {
/**
* @class TimedMutex
//
// @return true if the mutex was locked, false otherwise
template <typename Rep, typename Period>
- bool timed_lock(
- const std::chrono::duration<Rep, Period>& duration);
+ bool timed_lock(const std::chrono::duration<Rep, Period>& duration);
// Try to obtain lock without blocking the thread or fiber
bool try_lock();
MutexWaiterHookType hook;
};
- typedef boost::intrusive::member_hook<MutexWaiter,
- MutexWaiterHookType,
- &MutexWaiter::hook> MutexWaiterHook;
+ typedef boost::intrusive::
+ member_hook<MutexWaiter, MutexWaiterHookType, &MutexWaiter::hook>
+ MutexWaiterHook;
- typedef boost::intrusive::list<MutexWaiter,
- MutexWaiterHook,
- boost::intrusive::constant_time_size<true>>
- MutexWaiterList;
+ typedef boost::intrusive::list<
+ MutexWaiter,
+ MutexWaiterHook,
+ boost::intrusive::constant_time_size<true>>
+ MutexWaiterList;
- pthread_spinlock_t lock_; //< lock to protect waiter list
- bool locked_ = false; //< is this locked by some thread?
- MutexWaiterList waiters_; //< list of waiters
+ pthread_spinlock_t lock_; //< lock to protect waiter list
+ bool locked_ = false; //< is this locked by some thread?
+ MutexWaiterList waiters_; //< list of waiters
};
/**
bool try_write_lock();
// Wrapper for write_lock() for compatibility with Mutex
- void lock() { write_lock(); }
+ void lock() {
+ write_lock();
+ }
// Realease the lock. The thread / fiber will wake up all readers if there are
// any. If there are waiting writers then only one of them will be woken up.
class ReadHolder {
public:
- explicit ReadHolder(TimedRWMutex& lock)
- : lock_(&lock) {
+ explicit ReadHolder(TimedRWMutex& lock) : lock_(&lock) {
lock_->read_lock();
}
MutexWaiterHookType hook;
};
- typedef boost::intrusive::member_hook<MutexWaiter,
- MutexWaiterHookType,
- &MutexWaiter::hook> MutexWaiterHook;
+ typedef boost::intrusive::
+ member_hook<MutexWaiter, MutexWaiterHookType, &MutexWaiter::hook>
+ MutexWaiterHook;
- typedef boost::intrusive::list<MutexWaiter,
- MutexWaiterHook,
- boost::intrusive::constant_time_size<true>>
- MutexWaiterList;
+ typedef boost::intrusive::list<
+ MutexWaiter,
+ MutexWaiterHook,
+ boost::intrusive::constant_time_size<true>>
+ MutexWaiterList;
- pthread_spinlock_t lock_; //< lock protecting the internal state
- // (state_, read_waiters_, etc.)
+ pthread_spinlock_t lock_; //< lock protecting the internal state
+ // (state_, read_waiters_, etc.)
State state_ = State::UNLOCKED;
- uint32_t readers_ = 0; //< Number of readers who have the lock
+ uint32_t readers_ = 0; //< Number of readers who have the lock
- MutexWaiterList write_waiters_; //< List of thread / fibers waiting for
- // exclusive access
+ MutexWaiterList write_waiters_; //< List of thread / fibers waiting for
+ // exclusive access
- MutexWaiterList read_waiters_; //< List of thread / fibers waiting for
- // shared access
+ MutexWaiterList read_waiters_; //< List of thread / fibers waiting for
+ // shared access
};
-
-}}
+}
+}
#include "TimedMutex-inl.h"
#include "TimeoutController.h"
#include <folly/Memory.h>
-namespace folly { namespace fibers {
+namespace folly {
+namespace fibers {
-TimeoutController::TimeoutController(LoopController& loopController) :
- nextTimeout_(TimePoint::max()),
- loopController_(loopController) {}
+TimeoutController::TimeoutController(LoopController& loopController)
+ : nextTimeout_(TimePoint::max()), loopController_(loopController) {}
-intptr_t TimeoutController::registerTimeout(std::function<void()> f,
- Duration duration) {
+intptr_t TimeoutController::registerTimeout(
+ std::function<void()> f,
+ Duration duration) {
auto& list = [&]() -> TimeoutHandleList& {
for (auto& bucket : timeoutHandleBuckets_) {
if (bucket.first == duration) {
}
}
- timeoutHandleBuckets_.emplace_back(duration,
- folly::make_unique<TimeoutHandleList>());
+ timeoutHandleBuckets_.emplace_back(
+ duration, folly::make_unique<TimeoutHandleList>());
return *timeoutHandleBuckets_.back().second;
}();
auto time = nextTimeout_;
std::weak_ptr<TimeoutController> timeoutControllerWeak = shared_from_this();
- loopController_.timedSchedule([timeoutControllerWeak, time]() {
- if (auto timeoutController = timeoutControllerWeak.lock()) {
- timeoutController->runTimeouts(time);
- }
- }, time);
+ loopController_.timedSchedule(
+ [timeoutControllerWeak, time]() {
+ if (auto timeoutController = timeoutControllerWeak.lock()) {
+ timeoutController->runTimeouts(time);
+ }
+ },
+ time);
}
void TimeoutController::cancel(intptr_t p) {
list.pop();
}
}
-
-}}
+}
+}
#include <folly/experimental/fibers/LoopController.h>
-namespace folly { namespace fibers {
+namespace folly {
+namespace fibers {
-class TimeoutController :
- public std::enable_shared_from_this<TimeoutController> {
+class TimeoutController
+ : public std::enable_shared_from_this<TimeoutController> {
public:
typedef std::chrono::steady_clock Clock;
typedef std::chrono::time_point<Clock> TimePoint;
typedef std::unique_ptr<TimeoutHandleList> TimeoutHandleListPtr;
struct TimeoutHandle {
- TimeoutHandle(std::function<void()> func_,
- TimePoint timeout_,
- TimeoutHandleList& list_) :
- func(std::move(func_)), timeout(timeout_), list(list_) {}
+ TimeoutHandle(
+ std::function<void()> func_,
+ TimePoint timeout_,
+ TimeoutHandleList& list_)
+ : func(std::move(func_)), timeout(timeout_), list(list_) {}
std::function<void()> func;
bool canceled{false};
TimePoint nextTimeout_;
LoopController& loopController_;
};
-
-}}
+}
+}
#include <folly/experimental/fibers/FiberManager.h>
#include <folly/experimental/fibers/ForEach.h>
-namespace folly { namespace fibers {
+namespace folly {
+namespace fibers {
template <class InputIterator>
-typename std::vector<
- typename std::enable_if<
+typename std::vector<typename std::enable_if<
!std::is_same<
- typename std::result_of<
- typename std::iterator_traits<InputIterator>::value_type()>::type, void
- >::value,
+ typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type,
+ void>::value,
typename std::pair<
- size_t,
- typename std::result_of<
- typename std::iterator_traits<InputIterator>::value_type()>::type>
- >::type
- >
+ size_t,
+ typename std::result_of<typename std::iterator_traits<
+ InputIterator>::value_type()>::type>>::type>
collectN(InputIterator first, InputIterator last, size_t n) {
typedef typename std::result_of<
- typename std::iterator_traits<InputIterator>::value_type()>::type Result;
+ typename std::iterator_traits<InputIterator>::value_type()>::type Result;
assert(n > 0);
assert(std::distance(first, last) >= 0);
assert(n <= static_cast<size_t>(std::distance(first, last)));
};
auto context = std::make_shared<Context>(n);
- await(
- [first, last, context](Promise<void> promise) mutable {
- context->promise = std::move(promise);
- for (size_t i = 0; first != last; ++i, ++first) {
+ await([first, last, context](Promise<void> promise) mutable {
+ context->promise = std::move(promise);
+ for (size_t i = 0; first != last; ++i, ++first) {
#ifdef __clang__
#pragma clang diagnostic push // ignore generalized lambda capture warning
#pragma clang diagnostic ignored "-Wc++1y-extensions"
#endif
- addTask(
- [i, context, f = std::move(*first)]() {
- try {
- auto result = f();
- if (context->tasksTodo == 0) {
- return;
- }
- context->results.emplace_back(i, std::move(result));
- } catch (...) {
- if (context->tasksTodo == 0) {
- return;
- }
- context->e = std::current_exception();
- }
- if (--context->tasksTodo == 0) {
- context->promise->setValue();
- }
- });
+ addTask([ i, context, f = std::move(*first) ]() {
+ try {
+ auto result = f();
+ if (context->tasksTodo == 0) {
+ return;
+ }
+ context->results.emplace_back(i, std::move(result));
+ } catch (...) {
+ if (context->tasksTodo == 0) {
+ return;
+ }
+ context->e = std::current_exception();
+ }
+ if (--context->tasksTodo == 0) {
+ context->promise->setValue();
+ }
+ });
#ifdef __clang__
#pragma clang diagnostic pop
#endif
- }
- });
+ }
+ });
if (context->e != std::exception_ptr()) {
std::rethrow_exception(context->e);
template <class InputIterator>
typename std::enable_if<
- std::is_same<
- typename std::result_of<
- typename std::iterator_traits<InputIterator>::value_type()>::type, void
- >::value, std::vector<size_t>>::type
+ std::is_same<
+ typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type,
+ void>::value,
+ std::vector<size_t>>::type
collectN(InputIterator first, InputIterator last, size_t n) {
assert(n > 0);
assert(std::distance(first, last) >= 0);
};
auto context = std::make_shared<Context>(n);
- await(
- [first, last, context](Promise<void> promise) mutable {
- context->promise = std::move(promise);
- for (size_t i = 0; first != last; ++i, ++first) {
+ await([first, last, context](Promise<void> promise) mutable {
+ context->promise = std::move(promise);
+ for (size_t i = 0; first != last; ++i, ++first) {
#ifdef __clang__
#pragma clang diagnostic push // ignore generalized lambda capture warning
#pragma clang diagnostic ignored "-Wc++1y-extensions"
#endif
- addTask(
- [i, context, f = std::move(*first)]() {
- try {
- f();
- if (context->tasksTodo == 0) {
- return;
- }
- context->taskIndices.push_back(i);
- } catch (...) {
- if (context->tasksTodo == 0) {
- return;
- }
- context->e = std::current_exception();
- }
- if (--context->tasksTodo == 0) {
- context->promise->setValue();
- }
- });
+ addTask([ i, context, f = std::move(*first) ]() {
+ try {
+ f();
+ if (context->tasksTodo == 0) {
+ return;
+ }
+ context->taskIndices.push_back(i);
+ } catch (...) {
+ if (context->tasksTodo == 0) {
+ return;
+ }
+ context->e = std::current_exception();
+ }
+ if (--context->tasksTodo == 0) {
+ context->promise->setValue();
+ }
+ });
#ifdef __clang__
#pragma clang diagnostic pop
#endif
- }
- });
+ }
+ });
if (context->e != std::exception_ptr()) {
std::rethrow_exception(context->e);
template <class InputIterator>
typename std::vector<
- typename std::enable_if<
- !std::is_same<
- typename std::result_of<
- typename std::iterator_traits<InputIterator>::value_type()>::type, void
- >::value,
- typename std::result_of<
- typename std::iterator_traits<InputIterator>::value_type()>::type>::type>
-inline collectAll(InputIterator first, InputIterator last) {
+ typename std::enable_if<
+ !std::is_same<
+ typename std::result_of<typename std::iterator_traits<
+ InputIterator>::value_type()>::type,
+ void>::value,
+ typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type>::
+ type> inline collectAll(InputIterator first, InputIterator last) {
typedef typename std::result_of<
- typename std::iterator_traits<InputIterator>::value_type()>::type Result;
+ typename std::iterator_traits<InputIterator>::value_type()>::type Result;
size_t n = std::distance(first, last);
std::vector<Result> results;
std::vector<size_t> order(n);
results.reserve(n);
- forEach(first, last,
- [&results, &order] (size_t id, Result result) {
- order[id] = results.size();
- results.emplace_back(std::move(result));
- });
+ forEach(first, last, [&results, &order](size_t id, Result result) {
+ order[id] = results.size();
+ results.emplace_back(std::move(result));
+ });
assert(results.size() == n);
std::vector<Result> orderedResults;
template <class InputIterator>
typename std::enable_if<
- std::is_same<
- typename std::result_of<
- typename std::iterator_traits<InputIterator>::value_type()>::type, void
- >::value, void>::type
-inline collectAll(InputIterator first, InputIterator last) {
+ std::is_same<
+ typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type,
+ void>::value,
+ void>::type inline collectAll(InputIterator first, InputIterator last) {
forEach(first, last, [](size_t /* id */) {});
}
template <class InputIterator>
typename std::enable_if<
- !std::is_same<
- typename std::result_of<
- typename std::iterator_traits<InputIterator>::value_type()>::type, void
- >::value,
- typename std::pair<
- size_t,
- typename std::result_of<
- typename std::iterator_traits<InputIterator>::value_type()>::type>
- >::type
-inline collectAny(InputIterator first, InputIterator last) {
+ !std::is_same<
+ typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type,
+ void>::value,
+ typename std::pair<
+ size_t,
+ typename std::result_of<typename std::iterator_traits<
+ InputIterator>::value_type()>::type>>::
+ type inline collectAny(InputIterator first, InputIterator last) {
auto result = collectN(first, last, 1);
assert(result.size() == 1);
return std::move(result[0]);
template <class InputIterator>
typename std::enable_if<
- std::is_same<
- typename std::result_of<
- typename std::iterator_traits<InputIterator>::value_type()>::type, void
- >::value, size_t>::type
-inline collectAny(InputIterator first, InputIterator last) {
+ std::is_same<
+ typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type,
+ void>::value,
+ size_t>::type inline collectAny(InputIterator first, InputIterator last) {
auto result = collectN(first, last, 1);
assert(result.size() == 1);
return std::move(result[0]);
}
-
-}}
+}
+}
*/
#pragma once
-namespace folly { namespace fibers {
+namespace folly {
+namespace fibers {
/**
* Schedules several tasks and blocks until n of these tasks are completed.
*/
template <class InputIterator>
typename std::vector<
- typename std::enable_if<
- !std::is_same<
- typename std::result_of<
- typename std::iterator_traits<InputIterator>::value_type()>::type,
- void>::value,
- typename std::pair<
- size_t,
- typename std::result_of<
- typename std::iterator_traits<InputIterator>::value_type()>::type>
- >::type
- >
-inline collectN(InputIterator first, InputIterator last, size_t n);
+ typename std::enable_if<
+ !std::is_same<
+ typename std::result_of<typename std::iterator_traits<
+ InputIterator>::value_type()>::type,
+ void>::value,
+ typename std::pair<
+ size_t,
+ typename std::result_of<typename std::iterator_traits<
+ InputIterator>::value_type()>::type>>::
+ type> inline collectN(InputIterator first, InputIterator last, size_t n);
/**
* collectN specialization for functions returning void
*/
template <class InputIterator>
typename std::enable_if<
- std::is_same<
- typename std::result_of<
- typename std::iterator_traits<InputIterator>::value_type()>::type, void
- >::value, std::vector<size_t>>::type
-inline collectN(InputIterator first, InputIterator last, size_t n);
+ std::is_same<
+ typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type,
+ void>::value,
+ std::vector<size_t>>::
+ type inline collectN(InputIterator first, InputIterator last, size_t n);
/**
* Schedules several tasks and blocks until all of these tasks are completed.
* @return vector of values returned by tasks
*/
template <class InputIterator>
-typename std::vector<
- typename std::enable_if<
+typename std::vector<typename std::enable_if<
!std::is_same<
- typename std::result_of<
- typename std::iterator_traits<InputIterator>::value_type()>::type,
- void>::value,
+ typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type,
+ void>::value,
typename std::result_of<
- typename std::iterator_traits<InputIterator>::value_type()>::type>::type
- >
-inline collectAll(InputIterator first, InputIterator last);
+ typename std::iterator_traits<InputIterator>::value_type()>::
+ type>::type> inline collectAll(InputIterator first, InputIterator last);
/**
* collectAll specialization for functions returning void
*/
template <class InputIterator>
typename std::enable_if<
- std::is_same<
- typename std::result_of<
- typename std::iterator_traits<InputIterator>::value_type()>::type, void
- >::value, void>::type
-inline collectAll(InputIterator first, InputIterator last);
+ std::is_same<
+ typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type,
+ void>::value,
+ void>::type inline collectAll(InputIterator first, InputIterator last);
/**
* Schedules several tasks and blocks until one of them is completed.
*/
template <class InputIterator>
typename std::enable_if<
- !std::is_same<
- typename std::result_of<
- typename std::iterator_traits<InputIterator>::value_type()>::type, void
- >::value,
- typename std::pair<
- size_t,
- typename std::result_of<
- typename std::iterator_traits<InputIterator>::value_type()>::type>
- >::type
-inline collectAny(InputIterator first, InputIterator last);
+ !std::is_same<
+ typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type,
+ void>::value,
+ typename std::pair<
+ size_t,
+ typename std::result_of<typename std::iterator_traits<
+ InputIterator>::value_type()>::type>>::
+ type inline collectAny(InputIterator first, InputIterator last);
/**
* WhenAny specialization for functions returning void.
*/
template <class InputIterator>
typename std::enable_if<
- std::is_same<
- typename std::result_of<
- typename std::iterator_traits<InputIterator>::value_type()>::type, void
- >::value, size_t>::type
-inline collectAny(InputIterator first, InputIterator last);
-
-}}
+ std::is_same<
+ typename std::result_of<
+ typename std::iterator_traits<InputIterator>::value_type()>::type,
+ void>::value,
+ size_t>::type inline collectAny(InputIterator first, InputIterator last);
+}
+}
#include <folly/experimental/fibers/WhenN-inl.h>
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
- dynamic_cast<SimpleLoopController&>(manager.loopController());
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
auto loopFunc = [&]() {
- if (!taskAdded) {
- manager.addTask(
- [&]() {
- Baton baton;
+ if (!taskAdded) {
+ manager.addTask([&]() {
+ Baton baton;
- auto res = baton.timed_wait(std::chrono::milliseconds(230));
+ auto res = baton.timed_wait(std::chrono::milliseconds(230));
- EXPECT_FALSE(res);
- EXPECT_EQ(5, iterations);
+ EXPECT_FALSE(res);
+ EXPECT_EQ(5, iterations);
- loopController.stop();
- }
- );
- manager.addTask(
- [&]() {
- Baton baton;
+ loopController.stop();
+ });
+ manager.addTask([&]() {
+ Baton baton;
- auto res = baton.timed_wait(std::chrono::milliseconds(130));
+ auto res = baton.timed_wait(std::chrono::milliseconds(130));
- EXPECT_FALSE(res);
- EXPECT_EQ(3, iterations);
+ EXPECT_FALSE(res);
+ EXPECT_EQ(3, iterations);
- loopController.stop();
- }
- );
+ loopController.stop();
+ });
taskAdded = true;
} else {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
- iterations ++;
+ iterations++;
}
};
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
- dynamic_cast<SimpleLoopController&>(manager.loopController());
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
auto loopFunc = [&]() {
if (!taskAdded) {
- manager.addTask(
- [&]() {
- Baton baton;
- baton_ptr = &baton;
+ manager.addTask([&]() {
+ Baton baton;
+ baton_ptr = &baton;
- auto res = baton.timed_wait(std::chrono::milliseconds(130));
+ auto res = baton.timed_wait(std::chrono::milliseconds(130));
- EXPECT_TRUE(res);
- EXPECT_EQ(2, iterations);
+ EXPECT_TRUE(res);
+ EXPECT_EQ(2, iterations);
- loopController.stop();
- }
- );
+ loopController.stop();
+ });
taskAdded = true;
} else {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
- iterations ++;
+ iterations++;
if (iterations == 2) {
baton_ptr->post();
}
folly::EventBase evb;
FiberManager manager(folly::make_unique<EventBaseLoopController>());
- dynamic_cast<EventBaseLoopController&>(
- manager.loopController()).attachEventBase(evb);
+ dynamic_cast<EventBaseLoopController&>(manager.loopController())
+ .attachEventBase(evb);
auto task = [&](size_t timeout_ms) {
Baton baton;
EXPECT_FALSE(res);
auto duration_ms =
- std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
+ std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
EXPECT_GT(duration_ms.count(), timeout_ms - 50);
EXPECT_LT(duration_ms.count(), timeout_ms + 50);
};
evb.runInEventBaseThread([&]() {
- manager.addTask(
- [&]() {
- task(500);
- }
- );
- manager.addTask(
- [&]() {
- task(250);
- }
- );
+ manager.addTask([&]() { task(500); });
+ manager.addTask([&]() { task(250); });
});
evb.loopForever();
folly::EventBase evb;
FiberManager manager(folly::make_unique<EventBaseLoopController>());
- dynamic_cast<EventBaseLoopController&>(
- manager.loopController()).attachEventBase(evb);
+ dynamic_cast<EventBaseLoopController&>(manager.loopController())
+ .attachEventBase(evb);
evb.runInEventBaseThread([&]() {
- manager.addTask([&]() {
- Baton baton;
+ manager.addTask([&]() {
+ Baton baton;
- evb.tryRunAfterDelay([&]() {
- baton.post();
- },
- 100);
+ evb.tryRunAfterDelay([&]() { baton.post(); }, 100);
- auto start = EventBaseLoopController::Clock::now();
- auto res = baton.timed_wait(std::chrono::milliseconds(130));
- auto finish = EventBaseLoopController::Clock::now();
+ auto start = EventBaseLoopController::Clock::now();
+ auto res = baton.timed_wait(std::chrono::milliseconds(130));
+ auto finish = EventBaseLoopController::Clock::now();
- EXPECT_TRUE(res);
+ EXPECT_TRUE(res);
- auto duration_ms = std::chrono::duration_cast<
- std::chrono::milliseconds>(finish - start);
+ auto duration_ms =
+ std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
- EXPECT_TRUE(duration_ms.count() > 95 &&
- duration_ms.count() < 110);
+ EXPECT_TRUE(duration_ms.count() > 95 && duration_ms.count() < 110);
- if (++tasksComplete == 1) {
- evb.terminateLoopSoon();
- }
- });
+ if (++tasksComplete == 1) {
+ evb.terminateLoopSoon();
+ }
});
+ });
evb.loopForever();
}
TEST(FiberManager, batonTryWait) {
-
FiberManager manager(folly::make_unique<SimpleLoopController>());
// Check if try_wait and post work as expected
Baton b;
- manager.addTask([&](){
+ manager.addTask([&]() {
while (!b.try_wait()) {
}
});
- auto thr = std::thread([&](){
+ auto thr = std::thread([&]() {
std::this_thread::sleep_for(std::chrono::milliseconds(300));
b.post();
});
Baton c;
// Check try_wait without post
- manager.addTask([&](){
+ manager.addTask([&]() {
int cnt = 100;
while (cnt && !c.try_wait()) {
cnt--;
GenericBaton b;
bool fiberRunning = false;
- manager.addTask([&](){
+ manager.addTask([&]() {
EXPECT_EQ(manager.hasActiveFiber(), true);
fiberRunning = true;
b.wait();
manager.loopUntilNoReady();
EXPECT_TRUE(fiberRunning); // ensure fiber still active
- auto thr = std::thread([&](){
+ auto thr = std::thread([&]() {
std::this_thread::sleep_for(std::chrono::milliseconds(300));
b.post();
});
GenericBaton b;
std::atomic<bool> threadWaiting(false);
- auto thr = std::thread([&](){
+ auto thr = std::thread([&]() {
threadWaiting = true;
b.wait();
threadWaiting = false;
});
- while (!threadWaiting) {}
+ while (!threadWaiting) {
+ }
std::this_thread::sleep_for(std::chrono::milliseconds(300));
- manager.addTask([&](){
+ manager.addTask([&]() {
EXPECT_EQ(manager.hasActiveFiber(), true);
EXPECT_TRUE(threadWaiting);
b.post();
- while(threadWaiting) {}
+ while (threadWaiting) {
+ }
});
manager.loopUntilNoReady();
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
- dynamic_cast<SimpleLoopController&>(manager.loopController());
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
auto loopFunc = [&]() {
if (!taskAdded) {
- manager.addTask(
- [&]() {
- std::vector<std::function<std::unique_ptr<int>()>> funcs;
- for (size_t i = 0; i < 3; ++i) {
- funcs.push_back(
- [i, &pendingFibers]() {
- await([&pendingFibers](Promise<int> promise) {
- pendingFibers.push_back(std::move(promise));
- });
- return folly::make_unique<int>(i*2 + 1);
- }
- );
- }
+ manager.addTask([&]() {
+ std::vector<std::function<std::unique_ptr<int>()>> funcs;
+ for (size_t i = 0; i < 3; ++i) {
+ funcs.push_back([i, &pendingFibers]() {
+ await([&pendingFibers](Promise<int> promise) {
+ pendingFibers.push_back(std::move(promise));
+ });
+ return folly::make_unique<int>(i * 2 + 1);
+ });
+ }
- auto iter = addTasks(funcs.begin(), funcs.end());
+ auto iter = addTasks(funcs.begin(), funcs.end());
- size_t n = 0;
- while (iter.hasNext()) {
- auto result = iter.awaitNext();
- EXPECT_EQ(2 * iter.getTaskID() + 1, *result);
- EXPECT_GE(2 - n, pendingFibers.size());
- ++n;
- }
- EXPECT_EQ(3, n);
+ size_t n = 0;
+ while (iter.hasNext()) {
+ auto result = iter.awaitNext();
+ EXPECT_EQ(2 * iter.getTaskID() + 1, *result);
+ EXPECT_GE(2 - n, pendingFibers.size());
+ ++n;
}
- );
+ EXPECT_EQ(3, n);
+ });
taskAdded = true;
} else if (pendingFibers.size()) {
pendingFibers.back().setValue(0);
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
- dynamic_cast<SimpleLoopController&>(manager.loopController());
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
auto loopFunc = [&]() {
if (!taskAdded) {
- manager.addTask(
- [&]() {
- std::vector<std::function<int()>> funcs;
- for (size_t i = 0; i < 3; ++i) {
- funcs.push_back(
- [i, &pendingFibers]() {
- await([&pendingFibers](Promise<int> promise) {
- pendingFibers.push_back(std::move(promise));
- });
- if (i % 2 == 0) {
- throw std::runtime_error("Runtime");
- }
- return i*2 + 1;
- }
- );
- }
+ manager.addTask([&]() {
+ std::vector<std::function<int()>> funcs;
+ for (size_t i = 0; i < 3; ++i) {
+ funcs.push_back([i, &pendingFibers]() {
+ await([&pendingFibers](Promise<int> promise) {
+ pendingFibers.push_back(std::move(promise));
+ });
+ if (i % 2 == 0) {
+ throw std::runtime_error("Runtime");
+ }
+ return i * 2 + 1;
+ });
+ }
- auto iter = addTasks(funcs.begin(), funcs.end());
+ auto iter = addTasks(funcs.begin(), funcs.end());
- size_t n = 0;
- while (iter.hasNext()) {
- try {
- int result = iter.awaitNext();
- EXPECT_EQ(1, iter.getTaskID() % 2);
- EXPECT_EQ(2 * iter.getTaskID() + 1, result);
- } catch (...) {
- EXPECT_EQ(0, iter.getTaskID() % 2);
- }
- EXPECT_GE(2 - n, pendingFibers.size());
- ++n;
+ size_t n = 0;
+ while (iter.hasNext()) {
+ try {
+ int result = iter.awaitNext();
+ EXPECT_EQ(1, iter.getTaskID() % 2);
+ EXPECT_EQ(2 * iter.getTaskID() + 1, result);
+ } catch (...) {
+ EXPECT_EQ(0, iter.getTaskID() % 2);
}
- EXPECT_EQ(3, n);
+ EXPECT_GE(2 - n, pendingFibers.size());
+ ++n;
}
- );
+ EXPECT_EQ(3, n);
+ });
taskAdded = true;
} else if (pendingFibers.size()) {
pendingFibers.back().setValue(0);
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
- dynamic_cast<SimpleLoopController&>(manager.loopController());
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
auto loopFunc = [&]() {
if (!taskAdded) {
- manager.addTask(
- [&]() {
- std::vector<std::function<void()>> funcs;
- for (size_t i = 0; i < 3; ++i) {
- funcs.push_back(
- [i, &pendingFibers]() {
- await([&pendingFibers](Promise<int> promise) {
- pendingFibers.push_back(std::move(promise));
- });
- }
- );
- }
+ manager.addTask([&]() {
+ std::vector<std::function<void()>> funcs;
+ for (size_t i = 0; i < 3; ++i) {
+ funcs.push_back([i, &pendingFibers]() {
+ await([&pendingFibers](Promise<int> promise) {
+ pendingFibers.push_back(std::move(promise));
+ });
+ });
+ }
- auto iter = addTasks(funcs.begin(), funcs.end());
+ auto iter = addTasks(funcs.begin(), funcs.end());
- size_t n = 0;
- while (iter.hasNext()) {
- iter.awaitNext();
- EXPECT_GE(2 - n, pendingFibers.size());
- ++n;
- }
- EXPECT_EQ(3, n);
+ size_t n = 0;
+ while (iter.hasNext()) {
+ iter.awaitNext();
+ EXPECT_GE(2 - n, pendingFibers.size());
+ ++n;
}
- );
+ EXPECT_EQ(3, n);
+ });
taskAdded = true;
} else if (pendingFibers.size()) {
pendingFibers.back().setValue(0);
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
- dynamic_cast<SimpleLoopController&>(manager.loopController());
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
auto loopFunc = [&]() {
if (!taskAdded) {
- manager.addTask(
- [&]() {
- std::vector<std::function<void()>> funcs;
- for (size_t i = 0; i < 3; ++i) {
- funcs.push_back(
- [i, &pendingFibers]() {
- await([&pendingFibers](Promise<int> promise) {
- pendingFibers.push_back(std::move(promise));
- });
- if (i % 2 == 0) {
- throw std::runtime_error("");
- }
- }
- );
- }
+ manager.addTask([&]() {
+ std::vector<std::function<void()>> funcs;
+ for (size_t i = 0; i < 3; ++i) {
+ funcs.push_back([i, &pendingFibers]() {
+ await([&pendingFibers](Promise<int> promise) {
+ pendingFibers.push_back(std::move(promise));
+ });
+ if (i % 2 == 0) {
+ throw std::runtime_error("");
+ }
+ });
+ }
- auto iter = addTasks(funcs.begin(), funcs.end());
+ auto iter = addTasks(funcs.begin(), funcs.end());
- size_t n = 0;
- while (iter.hasNext()) {
- try {
- iter.awaitNext();
- EXPECT_EQ(1, iter.getTaskID() % 2);
- } catch (...) {
- EXPECT_EQ(0, iter.getTaskID() % 2);
- }
- EXPECT_GE(2 - n, pendingFibers.size());
- ++n;
+ size_t n = 0;
+ while (iter.hasNext()) {
+ try {
+ iter.awaitNext();
+ EXPECT_EQ(1, iter.getTaskID() % 2);
+ } catch (...) {
+ EXPECT_EQ(0, iter.getTaskID() % 2);
}
- EXPECT_EQ(3, n);
+ EXPECT_GE(2 - n, pendingFibers.size());
+ ++n;
}
- );
+ EXPECT_EQ(3, n);
+ });
taskAdded = true;
} else if (pendingFibers.size()) {
pendingFibers.back().setValue(0);
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
- dynamic_cast<SimpleLoopController&>(manager.loopController());
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
auto loopFunc = [&]() {
if (!taskAdded) {
- manager.addTask(
- [&]() {
- std::vector<std::function<void()>> funcs;
- for (size_t i = 0; i < 3; ++i) {
- funcs.push_back(
- [&pendingFibers]() {
- await([&pendingFibers](Promise<int> promise) {
- pendingFibers.push_back(std::move(promise));
- });
- }
- );
- }
+ manager.addTask([&]() {
+ std::vector<std::function<void()>> funcs;
+ for (size_t i = 0; i < 3; ++i) {
+ funcs.push_back([&pendingFibers]() {
+ await([&pendingFibers](Promise<int> promise) {
+ pendingFibers.push_back(std::move(promise));
+ });
+ });
+ }
- auto iter = addTasks(funcs.begin(), funcs.end());
+ auto iter = addTasks(funcs.begin(), funcs.end());
- iter.reserve(2);
- EXPECT_TRUE(iter.hasCompleted());
- EXPECT_TRUE(iter.hasPending());
- EXPECT_TRUE(iter.hasNext());
+ iter.reserve(2);
+ EXPECT_TRUE(iter.hasCompleted());
+ EXPECT_TRUE(iter.hasPending());
+ EXPECT_TRUE(iter.hasNext());
- iter.awaitNext();
- EXPECT_TRUE(iter.hasCompleted());
- EXPECT_TRUE(iter.hasPending());
- EXPECT_TRUE(iter.hasNext());
+ iter.awaitNext();
+ EXPECT_TRUE(iter.hasCompleted());
+ EXPECT_TRUE(iter.hasPending());
+ EXPECT_TRUE(iter.hasNext());
- iter.awaitNext();
- EXPECT_FALSE(iter.hasCompleted());
- EXPECT_TRUE(iter.hasPending());
- EXPECT_TRUE(iter.hasNext());
+ iter.awaitNext();
+ EXPECT_FALSE(iter.hasCompleted());
+ EXPECT_TRUE(iter.hasPending());
+ EXPECT_TRUE(iter.hasNext());
- iter.awaitNext();
- EXPECT_FALSE(iter.hasCompleted());
- EXPECT_FALSE(iter.hasPending());
- EXPECT_FALSE(iter.hasNext());
- }
- );
+ iter.awaitNext();
+ EXPECT_FALSE(iter.hasCompleted());
+ EXPECT_FALSE(iter.hasPending());
+ EXPECT_FALSE(iter.hasNext());
+ });
taskAdded = true;
} else if (pendingFibers.size()) {
pendingFibers.back().setValue(0);
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
- dynamic_cast<SimpleLoopController&>(manager.loopController());
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
auto loopFunc = [&]() {
if (!taskAdded) {
- manager.addTask(
- [&]() {
- std::vector<std::function<int()>> funcs;
- for (size_t i = 0; i < 3; ++i) {
- funcs.push_back(
- [i, &pendingFibers]() {
- await([&pendingFibers](Promise<int> promise) {
- pendingFibers.push_back(std::move(promise));
- });
- return i * 2 + 1;
- }
- );
- }
-
- std::vector<std::pair<size_t, int>> results;
- forEach(funcs.begin(), funcs.end(),
- [&results](size_t id, int result) {
- results.emplace_back(id, result);
+ manager.addTask([&]() {
+ std::vector<std::function<int()>> funcs;
+ for (size_t i = 0; i < 3; ++i) {
+ funcs.push_back([i, &pendingFibers]() {
+ await([&pendingFibers](Promise<int> promise) {
+ pendingFibers.push_back(std::move(promise));
});
- EXPECT_EQ(3, results.size());
- EXPECT_TRUE(pendingFibers.empty());
- for (size_t i = 0; i < 3; ++i) {
- EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
- }
+ return i * 2 + 1;
+ });
}
- );
+
+ std::vector<std::pair<size_t, int>> results;
+ forEach(funcs.begin(), funcs.end(), [&results](size_t id, int result) {
+ results.emplace_back(id, result);
+ });
+ EXPECT_EQ(3, results.size());
+ EXPECT_TRUE(pendingFibers.empty());
+ for (size_t i = 0; i < 3; ++i) {
+ EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
+ }
+ });
taskAdded = true;
} else if (pendingFibers.size()) {
pendingFibers.back().setValue(0);
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
- dynamic_cast<SimpleLoopController&>(manager.loopController());
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
auto loopFunc = [&]() {
if (!taskAdded) {
- manager.addTask(
- [&]() {
- std::vector<std::function<int()>> funcs;
- for (size_t i = 0; i < 3; ++i) {
- funcs.push_back(
- [i, &pendingFibers]() {
- await([&pendingFibers](Promise<int> promise) {
- pendingFibers.push_back(std::move(promise));
- });
- return i*2 + 1;
- }
- );
- }
+ manager.addTask([&]() {
+ std::vector<std::function<int()>> funcs;
+ for (size_t i = 0; i < 3; ++i) {
+ funcs.push_back([i, &pendingFibers]() {
+ await([&pendingFibers](Promise<int> promise) {
+ pendingFibers.push_back(std::move(promise));
+ });
+ return i * 2 + 1;
+ });
+ }
- auto results = collectN(funcs.begin(), funcs.end(), 2);
- EXPECT_EQ(2, results.size());
- EXPECT_EQ(1, pendingFibers.size());
- for (size_t i = 0; i < 2; ++i) {
- EXPECT_EQ(results[i].first*2 + 1, results[i].second);
- }
+ auto results = collectN(funcs.begin(), funcs.end(), 2);
+ EXPECT_EQ(2, results.size());
+ EXPECT_EQ(1, pendingFibers.size());
+ for (size_t i = 0; i < 2; ++i) {
+ EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
}
- );
+ });
taskAdded = true;
} else if (pendingFibers.size()) {
pendingFibers.back().setValue(0);
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
- dynamic_cast<SimpleLoopController&>(manager.loopController());
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
auto loopFunc = [&]() {
if (!taskAdded) {
- manager.addTask(
- [&]() {
- std::vector<std::function<int()>> funcs;
- for (size_t i = 0; i < 3; ++i) {
- funcs.push_back(
- [i, &pendingFibers]() {
- await([&pendingFibers](Promise<int> promise) {
- pendingFibers.push_back(std::move(promise));
- });
- throw std::runtime_error("Runtime");
- return i*2+1;
- }
- );
- }
+ manager.addTask([&]() {
+ std::vector<std::function<int()>> funcs;
+ for (size_t i = 0; i < 3; ++i) {
+ funcs.push_back([i, &pendingFibers]() {
+ await([&pendingFibers](Promise<int> promise) {
+ pendingFibers.push_back(std::move(promise));
+ });
+ throw std::runtime_error("Runtime");
+ return i * 2 + 1;
+ });
+ }
- try {
- collectN(funcs.begin(), funcs.end(), 2);
- } catch (...) {
- EXPECT_EQ(1, pendingFibers.size());
- }
+ try {
+ collectN(funcs.begin(), funcs.end(), 2);
+ } catch (...) {
+ EXPECT_EQ(1, pendingFibers.size());
}
- );
+ });
taskAdded = true;
} else if (pendingFibers.size()) {
pendingFibers.back().setValue(0);
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
- dynamic_cast<SimpleLoopController&>(manager.loopController());
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
auto loopFunc = [&]() {
if (!taskAdded) {
- manager.addTask(
- [&]() {
- std::vector<std::function<void()>> funcs;
- for (size_t i = 0; i < 3; ++i) {
- funcs.push_back(
- [i, &pendingFibers]() {
- await([&pendingFibers](Promise<int> promise) {
- pendingFibers.push_back(std::move(promise));
- });
- }
- );
- }
-
- auto results = collectN(funcs.begin(), funcs.end(), 2);
- EXPECT_EQ(2, results.size());
- EXPECT_EQ(1, pendingFibers.size());
+ manager.addTask([&]() {
+ std::vector<std::function<void()>> funcs;
+ for (size_t i = 0; i < 3; ++i) {
+ funcs.push_back([i, &pendingFibers]() {
+ await([&pendingFibers](Promise<int> promise) {
+ pendingFibers.push_back(std::move(promise));
+ });
+ });
}
- );
+
+ auto results = collectN(funcs.begin(), funcs.end(), 2);
+ EXPECT_EQ(2, results.size());
+ EXPECT_EQ(1, pendingFibers.size());
+ });
taskAdded = true;
} else if (pendingFibers.size()) {
pendingFibers.back().setValue(0);
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
- dynamic_cast<SimpleLoopController&>(manager.loopController());
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
auto loopFunc = [&]() {
if (!taskAdded) {
- manager.addTask(
- [&]() {
- std::vector<std::function<void()>> funcs;
- for (size_t i = 0; i < 3; ++i) {
- funcs.push_back(
- [i, &pendingFibers]() {
- await([&pendingFibers](Promise<int> promise) {
- pendingFibers.push_back(std::move(promise));
- });
- throw std::runtime_error("Runtime");
- }
- );
- }
+ manager.addTask([&]() {
+ std::vector<std::function<void()>> funcs;
+ for (size_t i = 0; i < 3; ++i) {
+ funcs.push_back([i, &pendingFibers]() {
+ await([&pendingFibers](Promise<int> promise) {
+ pendingFibers.push_back(std::move(promise));
+ });
+ throw std::runtime_error("Runtime");
+ });
+ }
- try {
- collectN(funcs.begin(), funcs.end(), 2);
- } catch (...) {
- EXPECT_EQ(1, pendingFibers.size());
- }
+ try {
+ collectN(funcs.begin(), funcs.end(), 2);
+ } catch (...) {
+ EXPECT_EQ(1, pendingFibers.size());
}
- );
+ });
taskAdded = true;
} else if (pendingFibers.size()) {
pendingFibers.back().setValue(0);
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
- dynamic_cast<SimpleLoopController&>(manager.loopController());
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
auto loopFunc = [&]() {
if (!taskAdded) {
- manager.addTask(
- [&]() {
- std::vector<std::function<int()>> funcs;
- for (size_t i = 0; i < 3; ++i) {
- funcs.push_back(
- [i, &pendingFibers]() {
- await([&pendingFibers](Promise<int> promise) {
- pendingFibers.push_back(std::move(promise));
- });
- return i*2+1;
- }
- );
- }
+ manager.addTask([&]() {
+ std::vector<std::function<int()>> funcs;
+ for (size_t i = 0; i < 3; ++i) {
+ funcs.push_back([i, &pendingFibers]() {
+ await([&pendingFibers](Promise<int> promise) {
+ pendingFibers.push_back(std::move(promise));
+ });
+ return i * 2 + 1;
+ });
+ }
- auto results = collectAll(funcs.begin(), funcs.end());
- EXPECT_TRUE(pendingFibers.empty());
- for (size_t i = 0; i < 3; ++i) {
- EXPECT_EQ(i*2+1, results[i]);
- }
+ auto results = collectAll(funcs.begin(), funcs.end());
+ EXPECT_TRUE(pendingFibers.empty());
+ for (size_t i = 0; i < 3; ++i) {
+ EXPECT_EQ(i * 2 + 1, results[i]);
}
- );
+ });
taskAdded = true;
} else if (pendingFibers.size()) {
pendingFibers.back().setValue(0);
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
- dynamic_cast<SimpleLoopController&>(manager.loopController());
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
auto loopFunc = [&]() {
if (!taskAdded) {
- manager.addTask(
- [&]() {
- std::vector<std::function<void()>> funcs;
- for (size_t i = 0; i < 3; ++ i) {
- funcs.push_back(
- [i, &pendingFibers]() {
- await([&pendingFibers](Promise<int> promise) {
- pendingFibers.push_back(std::move(promise));
- });
- }
- );
- }
-
- collectAll(funcs.begin(), funcs.end());
- EXPECT_TRUE(pendingFibers.empty());
+ manager.addTask([&]() {
+ std::vector<std::function<void()>> funcs;
+ for (size_t i = 0; i < 3; ++i) {
+ funcs.push_back([i, &pendingFibers]() {
+ await([&pendingFibers](Promise<int> promise) {
+ pendingFibers.push_back(std::move(promise));
+ });
+ });
}
- );
+
+ collectAll(funcs.begin(), funcs.end());
+ EXPECT_TRUE(pendingFibers.empty());
+ });
taskAdded = true;
} else if (pendingFibers.size()) {
pendingFibers.back().setValue(0);
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
- dynamic_cast<SimpleLoopController&>(manager.loopController());
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
auto loopFunc = [&]() {
if (!taskAdded) {
- manager.addTask(
- [&]() {
- std::vector<std::function<int()> > funcs;
- for (size_t i = 0; i < 3; ++ i) {
- funcs.push_back(
- [i, &pendingFibers]() {
- await([&pendingFibers](Promise<int> promise) {
- pendingFibers.push_back(std::move(promise));
- });
- if (i == 1) {
- throw std::runtime_error("This exception will be ignored");
- }
- return i*2+1;
- }
- );
- }
-
- auto result = collectAny(funcs.begin(), funcs.end());
- EXPECT_EQ(2, pendingFibers.size());
- EXPECT_EQ(2, result.first);
- EXPECT_EQ(2*2+1, result.second);
+ manager.addTask([&]() {
+ std::vector<std::function<int()>> funcs;
+ for (size_t i = 0; i < 3; ++i) {
+ funcs.push_back([i, &pendingFibers]() {
+ await([&pendingFibers](Promise<int> promise) {
+ pendingFibers.push_back(std::move(promise));
+ });
+ if (i == 1) {
+ throw std::runtime_error("This exception will be ignored");
+ }
+ return i * 2 + 1;
+ });
}
- );
+
+ auto result = collectAny(funcs.begin(), funcs.end());
+ EXPECT_EQ(2, pendingFibers.size());
+ EXPECT_EQ(2, result.first);
+ EXPECT_EQ(2 * 2 + 1, result.second);
+ });
taskAdded = true;
} else if (pendingFibers.size()) {
pendingFibers.back().setValue(0);
TEST(FiberManager, runInMainContext) {
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
- dynamic_cast<SimpleLoopController&>(manager.loopController());
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
bool checkRan = false;
int mainLocation;
manager.runInMainContext(
- [&]() {
- expectMainContext(checkRan, &mainLocation, nullptr);
- });
+ [&]() { expectMainContext(checkRan, &mainLocation, nullptr); });
EXPECT_TRUE(checkRan);
checkRan = false;
EXPECT_EQ(42, ret.value);
});
- loopController.loop(
- [&]() {
- loopController.stop();
- }
- );
+ loopController.loop([&]() { loopController.stop(); });
EXPECT_TRUE(checkRan);
}
TEST(FiberManager, addTaskFinally) {
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
- dynamic_cast<SimpleLoopController&>(manager.loopController());
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
bool checkRan = false;
int mainLocation;
manager.addTaskFinally(
- [&]() {
- return 1234;
- },
- [&](Try<int>&& result) {
- EXPECT_EQ(result.value(), 1234);
+ [&]() { return 1234; },
+ [&](Try<int>&& result) {
+ EXPECT_EQ(result.value(), 1234);
- expectMainContext(checkRan, &mainLocation, nullptr);
- }
- );
+ expectMainContext(checkRan, &mainLocation, nullptr);
+ });
EXPECT_FALSE(checkRan);
- loopController.loop(
- [&]() {
- loopController.stop();
- }
- );
+ loopController.loop([&]() { loopController.stop(); });
EXPECT_TRUE(checkRan);
}
FiberManager manager(folly::make_unique<SimpleLoopController>(), opts);
auto& loopController =
- dynamic_cast<SimpleLoopController&>(manager.loopController());
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
size_t fibersRun = 0;
for (size_t i = 0; i < 5; ++i) {
- manager.addTask(
- [&]() {
- ++fibersRun;
- }
- );
+ manager.addTask([&]() { ++fibersRun; });
}
- loopController.loop(
- [&]() {
- loopController.stop();
- }
- );
+ loopController.loop([&]() { loopController.stop(); });
EXPECT_EQ(5, fibersRun);
EXPECT_EQ(5, manager.fibersAllocated());
EXPECT_EQ(5, manager.fibersPoolSize());
for (size_t i = 0; i < 5; ++i) {
- manager.addTask(
- [&]() {
- ++fibersRun;
- }
- );
+ manager.addTask([&]() { ++fibersRun; });
}
- loopController.loop(
- [&]() {
- loopController.stop();
- }
- );
+ loopController.loop([&]() { loopController.stop(); });
EXPECT_EQ(10, fibersRun);
EXPECT_EQ(5, manager.fibersAllocated());
FiberManager manager(folly::make_unique<SimpleLoopController>(), opts);
auto& loopController =
- dynamic_cast<SimpleLoopController&>(manager.loopController());
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
size_t fibersRun = 0;
for (size_t i = 0; i < 10; ++i) {
- manager.addTask(
- [&]() {
- ++fibersRun;
- }
- );
+ manager.addTask([&]() { ++fibersRun; });
}
EXPECT_EQ(0, fibersRun);
EXPECT_EQ(10, manager.fibersAllocated());
EXPECT_EQ(0, manager.fibersPoolSize());
- loopController.loop(
- [&]() {
- loopController.stop();
- }
- );
+ loopController.loop([&]() { loopController.stop(); });
EXPECT_EQ(10, fibersRun);
EXPECT_EQ(5, manager.fibersAllocated());
TEST(FiberManager, remoteFiberBasic) {
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
- dynamic_cast<SimpleLoopController&>(manager.loopController());
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
int result[2];
result[0] = result[1] = 0;
folly::Optional<Promise<int>> savedPromise[2];
- manager.addTask(
- [&] () {
- result[0] = await([&] (Promise<int> promise) {
- savedPromise[0] = std::move(promise);
- });
- });
- manager.addTask(
- [&] () {
- result[1] = await([&] (Promise<int> promise) {
- savedPromise[1] = std::move(promise);
- });
- });
+ manager.addTask([&]() {
+ result[0] = await(
+ [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
+ });
+ manager.addTask([&]() {
+ result[1] = await(
+ [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
+ });
manager.loopUntilNoReady();
EXPECT_EQ(0, result[0]);
EXPECT_EQ(0, result[1]);
- std::thread remoteThread0{
- [&] () {
- savedPromise[0]->setValue(42);
- }
- };
- std::thread remoteThread1{
- [&] () {
- savedPromise[1]->setValue(43);
- }
- };
+ std::thread remoteThread0{[&]() { savedPromise[0]->setValue(42); }};
+ std::thread remoteThread1{[&]() { savedPromise[1]->setValue(43); }};
remoteThread0.join();
remoteThread1.join();
EXPECT_EQ(0, result[0]);
result[0] = result[1] = 0;
folly::Optional<Promise<int>> savedPromise[2];
- std::thread remoteThread0{
- [&] () {
- manager.addTaskRemote(
- [&] () {
- result[0] = await([&] (Promise<int> promise) {
- savedPromise[0] = std::move(promise);
- });
- });
- }
- };
- std::thread remoteThread1{
- [&] () {
- manager.addTaskRemote(
- [&] () {
- result[1] = await([&] (Promise<int> promise) {
- savedPromise[1] = std::move(promise);
- });
- });
- }
- };
+ std::thread remoteThread0{[&]() {
+ manager.addTaskRemote([&]() {
+ result[0] = await(
+ [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
+ });
+ }};
+ std::thread remoteThread1{[&]() {
+ manager.addTaskRemote([&]() {
+ result[1] = await(
+ [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
+ });
+ }};
remoteThread0.join();
remoteThread1.join();
TEST(FiberManager, remoteHasTasks) {
size_t counter = 0;
FiberManager fm(folly::make_unique<SimpleLoopController>());
- std::thread remote([&]() {
- fm.addTaskRemote([&]() {
- ++counter;
- });
- });
+ std::thread remote([&]() { fm.addTaskRemote([&]() { ++counter; }); });
remote.join();
FiberManager fm(folly::make_unique<SimpleLoopController>());
std::thread remote([&]() {
fm.addTaskRemote([&]() {
- result = await([&](Promise<int> promise) {
- savedPromise = std::move(promise);
- });
+ result = await(
+ [&](Promise<int> promise) { savedPromise = std::move(promise); });
EXPECT_TRUE(fm.hasTasks());
});
});
fm.loopUntilNoReady();
EXPECT_TRUE(fm.hasTasks());
- std::thread remote2([&](){
- savedPromise->setValue(47);
- });
+ std::thread remote2([&]() { savedPromise->setValue(47); });
remote2.join();
EXPECT_TRUE(fm.hasTasks());
template <typename Data>
void testFiberLocal() {
- FiberManager fm(LocalType<Data>(),
- folly::make_unique<SimpleLoopController>());
+ FiberManager fm(
+ LocalType<Data>(), folly::make_unique<SimpleLoopController>());
fm.addTask([]() {
- EXPECT_EQ(42, local<Data>().value);
+ EXPECT_EQ(42, local<Data>().value);
- local<Data>().value = 43;
+ local<Data>().value = 43;
- addTask([]() {
- EXPECT_EQ(43, local<Data>().value);
+ addTask([]() {
+ EXPECT_EQ(43, local<Data>().value);
- local<Data>().value = 44;
+ local<Data>().value = 44;
- addTask([]() {
- EXPECT_EQ(44, local<Data>().value);
- });
- });
- });
+ addTask([]() { EXPECT_EQ(44, local<Data>().value); });
+ });
+ });
fm.addTask([&]() {
- EXPECT_EQ(42, local<Data>().value);
+ EXPECT_EQ(42, local<Data>().value);
- local<Data>().value = 43;
+ local<Data>().value = 43;
- fm.addTaskRemote([]() {
- EXPECT_EQ(43, local<Data>().value);
- });
- });
+ fm.addTaskRemote([]() { EXPECT_EQ(43, local<Data>().value); });
+ });
fm.addTask([]() {
- EXPECT_EQ(42, local<Data>().value);
- local<Data>().value = 43;
-
- auto task = []() {
- EXPECT_EQ(43, local<Data>().value);
- local<Data>().value = 44;
- };
- std::vector<std::function<void()>> tasks{task};
- collectAny(tasks.begin(), tasks.end());
+ EXPECT_EQ(42, local<Data>().value);
+ local<Data>().value = 43;
+ auto task = []() {
EXPECT_EQ(43, local<Data>().value);
- });
+ local<Data>().value = 44;
+ };
+ std::vector<std::function<void()>> tasks{task};
+ collectAny(tasks.begin(), tasks.end());
+
+ EXPECT_EQ(43, local<Data>().value);
+ });
fm.loopUntilNoReady();
EXPECT_FALSE(fm.hasTasks());
TEST(FiberManager, fiberLocalHeap) {
struct LargeData {
- char _[1024*1024];
+ char _[1024 * 1024];
int value{42};
};
~CrazyData() {
if (data == 41) {
addTask([]() {
- EXPECT_EQ(42, local<CrazyData>().data);
- // Make sure we don't have infinite loop
- local<CrazyData>().data = 0;
- });
+ EXPECT_EQ(42, local<CrazyData>().data);
+ // Make sure we don't have infinite loop
+ local<CrazyData>().data = 0;
+ });
}
}
};
- FiberManager fm(LocalType<CrazyData>(),
- folly::make_unique<SimpleLoopController>());
+ FiberManager fm(
+ LocalType<CrazyData>(), folly::make_unique<SimpleLoopController>());
- fm.addTask([]() {
- local<CrazyData>().data = 41;
- });
+ fm.addTask([]() { local<CrazyData>().data = 41; });
fm.loopUntilNoReady();
EXPECT_FALSE(fm.hasTasks());
TEST(FiberManager, yieldTest) {
FiberManager manager(folly::make_unique<SimpleLoopController>());
auto& loopController =
- dynamic_cast<SimpleLoopController&>(manager.loopController());
+ dynamic_cast<SimpleLoopController&>(manager.loopController());
bool checkRan = false;
- manager.addTask(
- [&]() {
- manager.yield();
- checkRan = true;
- }
- );
+ manager.addTask([&]() {
+ manager.yield();
+ checkRan = true;
+ });
- loopController.loop(
- [&]() {
- if (checkRan) {
- loopController.stop();
- }
+ loopController.loop([&]() {
+ if (checkRan) {
+ loopController.stop();
}
- );
+ });
EXPECT_TRUE(checkRan);
}
folly::RequestContext::create();
auto rcontext1 = folly::RequestContext::get();
fm.addTask([&]() {
- EXPECT_EQ(rcontext1, folly::RequestContext::get());
- baton1.wait([&]() {
- EXPECT_EQ(rcontext1, folly::RequestContext::get());
- });
- EXPECT_EQ(rcontext1, folly::RequestContext::get());
- runInMainContext([&]() {
- EXPECT_EQ(rcontext1, folly::RequestContext::get());
- });
- checkRun1 = true;
- });
+ EXPECT_EQ(rcontext1, folly::RequestContext::get());
+ baton1.wait([&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
+ EXPECT_EQ(rcontext1, folly::RequestContext::get());
+ runInMainContext(
+ [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
+ checkRun1 = true;
+ });
folly::RequestContext::create();
auto rcontext2 = folly::RequestContext::get();
fm.addTaskRemote([&]() {
- EXPECT_EQ(rcontext2, folly::RequestContext::get());
- baton2.wait();
- EXPECT_EQ(rcontext2, folly::RequestContext::get());
- checkRun2 = true;
- });
+ EXPECT_EQ(rcontext2, folly::RequestContext::get());
+ baton2.wait();
+ EXPECT_EQ(rcontext2, folly::RequestContext::get());
+ checkRun2 = true;
+ });
folly::RequestContext::create();
auto rcontext3 = folly::RequestContext::get();
folly::EventBase evb;
dynamic_cast<EventBaseLoopController&>(manager.loopController())
- .attachEventBase(evb);
+ .attachEventBase(evb);
std::vector<Baton> batons(10);
folly::EventBase evb;
dynamic_cast<EventBaseLoopController&>(manager.loopController())
- .attachEventBase(evb);
+ .attachEventBase(evb);
size_t fibersRun = 0;
Baton baton;
folly::EventBase evb;
dynamic_cast<EventBaseLoopController&>(manager.loopController())
- .attachEventBase(evb);
+ .attachEventBase(evb);
constexpr size_t kNumTimeoutTasks = 10000;
size_t tasksCount = kNumTimeoutTasks;
folly::EventBase innerEvb;
getFiberManager(outerEvb).addTask([&]() {
- EXPECT_EQ(&getFiberManager(outerEvb),
- FiberManager::getFiberManagerUnsafe());
+ EXPECT_EQ(
+ &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
runInMainContext([&]() {
getFiberManager(innerEvb).addTask([&]() {
- EXPECT_EQ(&getFiberManager(innerEvb),
- FiberManager::getFiberManagerUnsafe());
+ EXPECT_EQ(
+ &getFiberManager(innerEvb), FiberManager::getFiberManagerUnsafe());
innerEvb.terminateLoopSoon();
});
innerEvb.loopForever();
});
- EXPECT_EQ(&getFiberManager(outerEvb),
- FiberManager::getFiberManagerUnsafe());
+ EXPECT_EQ(
+ &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
outerEvb.terminateLoopSoon();
});
FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
auto& loopController =
- dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
+ dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
std::queue<Promise<int>> pendingRequests;
static const size_t maxOutstanding = 5;
pendingRequests.pop();
} else {
fiberManager.addTask([&pendingRequests]() {
- for (size_t i = 0; i < sNumAwaits; ++i) {
- auto result = await(
- [&pendingRequests](Promise<int> promise) {
- pendingRequests.push(std::move(promise));
- });
- DCHECK_EQ(result, 0);
- }
- });
+ for (size_t i = 0; i < sNumAwaits; ++i) {
+ auto result = await([&pendingRequests](Promise<int> promise) {
+ pendingRequests.push(std::move(promise));
+ });
+ DCHECK_EQ(result, 0);
+ }
+ });
if (--toSend == 0) {
loopController.stop();
size_t fibersRun = 0;
for (size_t i = 0; i < kNumAllocations; ++i) {
- fiberManager.addTask(
- [&fibersRun] {
- ++fibersRun;
- }
- );
+ fiberManager.addTask([&fibersRun] { ++fibersRun; });
fiberManager.loopUntilNoReady();
}
size_t fibersRun = 0;
for (size_t i = 0; i < kNumAllocations; ++i) {
- fiberManager.addTask(
- [&fibersRun] {
- ++fibersRun;
- }
- );
+ fiberManager.addTask([&fibersRun] { ++fibersRun; });
}
fiberManager.loopUntilNoReady();
struct Application {
public:
- Application ()
+ Application()
: fiberManager(folly::make_unique<SimpleLoopController>()),
toSend(20),
- maxOutstanding(5) {
- }
+ maxOutstanding(5) {}
void loop() {
if (pendingRequests.size() == maxOutstanding || toSend == 0) {
if (pendingRequests.empty()) {
return;
}
- intptr_t value = rand()%1000;
+ intptr_t value = rand() % 1000;
std::cout << "Completing request with data = " << value << std::endl;
pendingRequests.front().setValue(value);
std::cout << "Adding new request with id = " << id << std::endl;
fiberManager.addTask([this, id]() {
- std::cout << "Executing fiber with id = " << id << std::endl;
+ std::cout << "Executing fiber with id = " << id << std::endl;
- auto result1 = await(
- [this](Promise<int> fiber) {
- pendingRequests.push(std::move(fiber));
- });
+ auto result1 = await([this](Promise<int> fiber) {
+ pendingRequests.push(std::move(fiber));
+ });
- std::cout << "Fiber id = " << id
- << " got result1 = " << result1 << std::endl;
+ std::cout << "Fiber id = " << id << " got result1 = " << result1
+ << std::endl;
- auto result2 = await
- ([this](Promise<int> fiber) {
- pendingRequests.push(std::move(fiber));
- });
- std::cout << "Fiber id = " << id
- << " got result2 = " << result2 << std::endl;
- });
+ auto result2 = await([this](Promise<int> fiber) {
+ pendingRequests.push(std::move(fiber));
+ });
+ std::cout << "Fiber id = " << id << " got result2 = " << result2
+ << std::endl;
+ });
if (--toSend == 0) {
auto& loopController =
- dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
+ dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
loopController.stop();
}
}
int main() {
Application app;
- auto loop = [&app]() {
- app.loop();
- };
+ auto loop = [&app]() { app.loop(); };
auto& loopController =
- dynamic_cast<SimpleLoopController&>(app.fiberManager.loopController());
+ dynamic_cast<SimpleLoopController&>(app.fiberManager.loopController());
loopController.loop(std::move(loop));
#include <folly/Benchmark.h>
// for backward compatibility with gflags
-namespace gflags { }
-namespace google { using namespace gflags; }
+namespace gflags {}
+namespace google {
+using namespace gflags;
+}
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
#include <boost/type_traits.hpp>
-namespace folly { namespace fibers {
+namespace folly {
+namespace fibers {
/**
* For any functor F taking >= 1 argument,
* If F is a pointer-to-member, will contain a typedef type
* with the type of F's first parameter
*/
-template<typename>
+template <typename>
struct ExtractFirstMemfn;
template <typename Ret, typename T, typename First, typename... Args>
typedef First type;
};
-} // detail
+} // detail
/** Default - use boost */
template <typename F, typename Enable = void>
struct FirstArgOf {
typedef typename boost::function_traits<
- typename std::remove_pointer<F>::type>::arg1_type type;
+ typename std::remove_pointer<F>::type>::arg1_type type;
};
/** Specialization for function objects */
template <typename F>
struct FirstArgOf<F, typename std::enable_if<std::is_class<F>::value>::type> {
- typedef typename detail::ExtractFirstMemfn<
- decltype(&F::operator())>::type type;
+ typedef
+ typename detail::ExtractFirstMemfn<decltype(&F::operator())>::type type;
};
-
-}} // folly::fibers
+}
+} // folly::fibers