#include <algorithm>
#include <cassert>
#include <chrono>
-#include <random>
#include <thread>
#include <folly/Baton.h>
#include <folly/Optional.h>
-#include <folly/Random.h>
+#include <folly/executors/InlineExecutor.h>
#include <folly/futures/Timekeeper.h>
#include <folly/futures/detail/Core.h>
} // namespace futures
template <class T>
-Future<T> Future<T>::makeEmpty() {
- return Future<T>(futures::detail::EmptyConstruct{});
+SemiFuture<typename std::decay<T>::type> makeSemiFuture(T&& t) {
+ return makeSemiFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
+}
+
+// makeSemiFutureWith(SemiFuture<T>()) -> SemiFuture<T>
+template <class F>
+typename std::enable_if<
+ isSemiFuture<typename std::result_of<F()>::type>::value,
+ typename std::result_of<F()>::type>::type
+makeSemiFutureWith(F&& func) {
+ using InnerType =
+ typename isSemiFuture<typename std::result_of<F()>::type>::Inner;
+ try {
+ return std::forward<F>(func)();
+ } catch (std::exception& e) {
+ return makeSemiFuture<InnerType>(
+ exception_wrapper(std::current_exception(), e));
+ } catch (...) {
+ return makeSemiFuture<InnerType>(
+ exception_wrapper(std::current_exception()));
+ }
+}
+
+// makeSemiFutureWith(T()) -> SemiFuture<T>
+// makeSemiFutureWith(void()) -> SemiFuture<Unit>
+template <class F>
+typename std::enable_if<
+ !(isSemiFuture<typename std::result_of<F()>::type>::value),
+ SemiFuture<Unit::LiftT<typename std::result_of<F()>::type>>>::type
+makeSemiFutureWith(F&& func) {
+ using LiftedResult = Unit::LiftT<typename std::result_of<F()>::type>;
+ return makeSemiFuture<LiftedResult>(
+ makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
}
template <class T>
-Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
+SemiFuture<T> makeSemiFuture(std::exception_ptr const& e) {
+ return makeSemiFuture(Try<T>(e));
+}
+
+template <class T>
+SemiFuture<T> makeSemiFuture(exception_wrapper ew) {
+ return makeSemiFuture(Try<T>(std::move(ew)));
+}
+
+template <class T, class E>
+typename std::
+ enable_if<std::is_base_of<std::exception, E>::value, SemiFuture<T>>::type
+ makeSemiFuture(E const& e) {
+ return makeSemiFuture(Try<T>(make_exception_wrapper<E>(e)));
+}
+
+template <class T>
+SemiFuture<T> makeSemiFuture(Try<T>&& t) {
+ return SemiFuture<T>(new futures::detail::Core<T>(std::move(t)));
+}
+
+template <class T>
+SemiFuture<T> SemiFuture<T>::makeEmpty() {
+ return SemiFuture<T>(futures::detail::EmptyConstruct{});
+}
+
+template <class T>
+SemiFuture<T>::SemiFuture(SemiFuture<T>&& other) noexcept : core_(other.core_) {
other.core_ = nullptr;
}
template <class T>
-Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
+SemiFuture<T>& SemiFuture<T>::operator=(SemiFuture<T>&& other) noexcept {
+ std::swap(core_, other.core_);
+ return *this;
+}
+
+template <class T>
+SemiFuture<T>::SemiFuture(Future<T>&& other) noexcept : core_(other.core_) {
+ other.core_ = nullptr;
+}
+
+template <class T>
+SemiFuture<T>& SemiFuture<T>::operator=(Future<T>&& other) noexcept {
std::swap(core_, other.core_);
return *this;
}
+template <class T>
+template <class T2, typename>
+SemiFuture<T>::SemiFuture(T2&& val)
+ : core_(new futures::detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
+
+template <class T>
+template <typename T2>
+SemiFuture<T>::SemiFuture(
+ typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
+ : core_(new futures::detail::Core<T>(Try<T>(T()))) {}
+
+template <class T>
+template <
+ class... Args,
+ typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
+ type>
+SemiFuture<T>::SemiFuture(in_place_t, Args&&... args)
+ : core_(
+ new futures::detail::Core<T>(in_place, std::forward<Args>(args)...)) {
+}
+
+template <class T>
+SemiFuture<T>::~SemiFuture() {
+ detach();
+}
+
+// This must be defined after the constructors to avoid a bug in MSVC
+// https://connect.microsoft.com/VisualStudio/feedback/details/3142777/out-of-line-constructor-definition-after-implicit-reference-causes-incorrect-c2244
+inline SemiFuture<Unit> makeSemiFuture() {
+ return makeSemiFuture(Unit{});
+}
+
+template <class T>
+T& SemiFuture<T>::value() & {
+ throwIfInvalid();
+
+ return core_->getTry().value();
+}
+
+template <class T>
+T const& SemiFuture<T>::value() const& {
+ throwIfInvalid();
+
+ return core_->getTry().value();
+}
+
+template <class T>
+T&& SemiFuture<T>::value() && {
+ throwIfInvalid();
+
+ return std::move(core_->getTry().value());
+}
+
+template <class T>
+T const&& SemiFuture<T>::value() const&& {
+ throwIfInvalid();
+
+ return std::move(core_->getTry().value());
+}
+
+template <class T>
+inline Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) && {
+ throwIfInvalid();
+
+ setExecutor(executor, priority);
+
+ auto newFuture = Future<T>(core_);
+ core_ = nullptr;
+ return newFuture;
+}
+
+template <class T>
+inline Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) & {
+ throwIfInvalid();
+ Promise<T> p;
+ auto f = p.getFuture();
+ auto func = [p = std::move(p)](Try<T>&& t) mutable {
+ p.setTry(std::move(t));
+ };
+ using R = futures::detail::callableResult<T, decltype(func)>;
+ thenImplementation<decltype(func), R>(std::move(func), typename R::Arg());
+ return std::move(f).via(executor, priority);
+}
+
+template <class T>
+bool SemiFuture<T>::isReady() const {
+ throwIfInvalid();
+ return core_->ready();
+}
+
+template <class T>
+bool SemiFuture<T>::hasValue() {
+ return getTry().hasValue();
+}
+
+template <class T>
+bool SemiFuture<T>::hasException() {
+ return getTry().hasException();
+}
+
+template <class T>
+void SemiFuture<T>::detach() {
+ if (core_) {
+ core_->detachFuture();
+ core_ = nullptr;
+ }
+}
+
+template <class T>
+Try<T>& SemiFuture<T>::getTry() {
+ throwIfInvalid();
+
+ return core_->getTry();
+}
+
+template <class T>
+void SemiFuture<T>::throwIfInvalid() const {
+ if (!core_) {
+ throwNoState();
+}
+}
+
+template <class T>
+Optional<Try<T>> SemiFuture<T>::poll() {
+ Optional<Try<T>> o;
+ if (core_->ready()) {
+ o = std::move(core_->getTry());
+ }
+ return o;
+}
+
+template <class T>
+void SemiFuture<T>::raise(exception_wrapper exception) {
+ core_->raise(std::move(exception));
+}
+
+template <class T>
+template <class F>
+void SemiFuture<T>::setCallback_(F&& func) {
+ throwIfInvalid();
+ core_->setCallback(std::forward<F>(func));
+}
+
+template <class T>
+SemiFuture<T>::SemiFuture(futures::detail::EmptyConstruct) noexcept
+ : core_(nullptr) {}
+
+template <class T>
+Future<T> Future<T>::makeEmpty() {
+ return Future<T>(futures::detail::EmptyConstruct{});
+}
+
+template <class T>
+Future<T>::Future(Future<T>&& other) noexcept
+ : SemiFuture<T>(std::move(other)) {}
+
+template <class T>
+Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
+ SemiFuture<T>::operator=(SemiFuture<T>{std::move(other)});
+ return *this;
+}
+
template <class T>
template <
class T2,
std::move(other).then([](T2&& v) { return T(std::move(v)); }));
}
+// TODO: isSemiFuture
template <class T>
template <class T2, typename>
-Future<T>::Future(T2&& val)
- : core_(new futures::detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
+Future<T>::Future(T2&& val) : SemiFuture<T>(std::forward<T2>(val)) {}
template <class T>
template <typename T2>
Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
- : core_(new futures::detail::Core<T>(Try<T>(T()))) {}
+ : SemiFuture<T>() {}
template <class T>
template <
typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
type>
Future<T>::Future(in_place_t, Args&&... args)
- : core_(
- new futures::detail::Core<T>(in_place, std::forward<Args>(args)...)) {
-}
+ : SemiFuture<T>(in_place, std::forward<Args>(args)...) {}
template <class T>
Future<T>::~Future() {
- detach();
-}
-
-template <class T>
-void Future<T>::detach() {
- if (core_) {
- core_->detachFuture();
- core_ = nullptr;
- }
-}
-
-template <class T>
-void Future<T>::throwIfInvalid() const {
- if (!core_)
- throwNoState();
-}
-
-template <class T>
-template <class F>
-void Future<T>::setCallback_(F&& func) {
- throwIfInvalid();
- core_->setCallback(std::forward<F>(func));
}
// unwrap
template <class T>
template <typename F, typename R, bool isTry, typename... Args>
typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
-Future<T>::thenImplementation(
+SemiFuture<T>::thenImplementation(
F&& func,
futures::detail::argResult<isTry, F, Args...>) {
static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
typedef typename R::ReturnsFuture::Inner B;
- throwIfInvalid();
+ this->throwIfInvalid();
Promise<B> p;
- p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
+ p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
// grab the Future now before we lose our handle on the Promise
auto f = p.getFuture();
- f.core_->setExecutorNoLock(getExecutor());
+ f.core_->setExecutorNoLock(this->getExecutor());
/* This is a bit tricky.
in some circumstances, but I think it should be explicit not implicit
in the destruction of the Future used to create it.
*/
- setCallback_(
+ this->setCallback_(
[state = futures::detail::makeCoreCallbackState(
- std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
+ std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
+
if (!isTry && t.hasException()) {
state.setException(std::move(t.exception()));
} else {
[&] { return state.invoke(t.template get<isTry, Args>()...); }));
}
});
-
return f;
}
template <class T>
template <typename F, typename R, bool isTry, typename... Args>
typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
-Future<T>::thenImplementation(
+SemiFuture<T>::thenImplementation(
F&& func,
futures::detail::argResult<isTry, F, Args...>) {
static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
typedef typename R::ReturnsFuture::Inner B;
-
- throwIfInvalid();
+ this->throwIfInvalid();
Promise<B> p;
- p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
+ p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
// grab the Future now before we lose our handle on the Promise
auto f = p.getFuture();
- f.core_->setExecutorNoLock(getExecutor());
+ f.core_->setExecutorNoLock(this->getExecutor());
- setCallback_(
+ this->setCallback_(
[state = futures::detail::makeCoreCallbackState(
- std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
+ std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
if (!isTry && t.hasException()) {
state.setException(std::move(t.exception()));
} else {
typedef typename std::remove_cv<typename std::remove_reference<
typename futures::detail::ArgType<Args...>::FirstArg>::type>::type
FirstArg;
+
return then([instance, func](Try<T>&& t){
return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
});
"Return type of onError callback must be T or Future<T>");
Promise<T> p;
- p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
+ p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
auto f = p.getFuture();
- setCallback_(
+ this->setCallback_(
[state = futures::detail::makeCoreCallbackState(
- std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
+ std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
if (auto e = t.template tryGetExceptionObject<Exn>()) {
state.setTry(makeTryWith([&] { return state.invoke(*e); }));
} else {
Promise<T> p;
auto f = p.getFuture();
- setCallback_(
+ this->setCallback_(
[state = futures::detail::makeCoreCallbackState(
- std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
+ std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
if (auto e = t.template tryGetExceptionObject<Exn>()) {
auto tf2 = state.tryInvoke(*e);
if (tf2.hasException()) {
Promise<T> p;
auto f = p.getFuture();
- setCallback_(
+ this->setCallback_(
[state = futures::detail::makeCoreCallbackState(
std::move(p), std::forward<F>(func))](Try<T> t) mutable {
if (t.hasException()) {
Promise<T> p;
auto f = p.getFuture();
- setCallback_(
+ this->setCallback_(
[state = futures::detail::makeCoreCallbackState(
- std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
+ std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
if (t.hasException()) {
state.setTry(makeTryWith(
[&] { return state.invoke(std::move(t.exception())); }));
return f;
}
-template <class T>
-typename std::add_lvalue_reference<T>::type Future<T>::value() {
- throwIfInvalid();
-
- return core_->getTry().value();
-}
-
-template <class T>
-typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
- throwIfInvalid();
-
- return core_->getTry().value();
-}
-
-template <class T>
-Try<T>& Future<T>::getTry() {
- throwIfInvalid();
-
- return core_->getTry();
-}
-
template <class T>
Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
return waitVia(e).getTry();
}
-template <class T>
-Optional<Try<T>> Future<T>::poll() {
- Optional<Try<T>> o;
- if (core_->ready()) {
- o = std::move(core_->getTry());
- }
- return o;
-}
-
-template <class T>
-inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
- throwIfInvalid();
-
- setExecutor(executor, priority);
-
- return std::move(*this);
-}
-
-template <class T>
-inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
- throwIfInvalid();
-
- Promise<T> p;
- auto f = p.getFuture();
- then([p = std::move(p)](Try<T> && t) mutable { p.setTry(std::move(t)); });
- return std::move(f).via(executor, priority);
-}
-
template <class Func>
auto via(Executor* x, Func&& func)
-> Future<typename isFuture<decltype(std::declval<Func>()())>::Inner> {
}
template <class T>
-bool Future<T>::isReady() const {
- throwIfInvalid();
- return core_->ready();
-}
-
-template <class T>
-bool Future<T>::hasValue() {
- return getTry().hasValue();
-}
-
-template <class T>
-bool Future<T>::hasException() {
- return getTry().hasException();
-}
-
-template <class T>
-void Future<T>::raise(exception_wrapper exception) {
- core_->raise(std::move(exception));
-}
-
-template <class T>
-Future<T>::Future(futures::detail::EmptyConstruct) noexcept : core_(nullptr) {}
+Future<T>::Future(futures::detail::EmptyConstruct) noexcept
+ : SemiFuture<T>(futures::detail::EmptyConstruct{}) {}
// makeFuture
return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
}
-inline // for multiple translation units
-Future<Unit> makeFuture() {
+inline Future<Unit> makeFuture() {
return makeFuture(Unit{});
}
template <class F>
typename std::enable_if<
!(isFuture<typename std::result_of<F()>::type>::value),
- Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
+ Future<Unit::LiftT<typename std::result_of<F()>::type>>>::type
makeFutureWith(F&& func) {
- using LiftedResult =
- typename Unit::Lift<typename std::result_of<F()>::type>::type;
+ using LiftedResult = Unit::LiftT<typename std::result_of<F()>::type>;
return makeFuture<LiftedResult>(
makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
}
template <class Collection, class F, class ItT, class Result>
std::vector<Future<Result>>
window(Collection input, F func, size_t n) {
- struct WindowContext {
- WindowContext(Collection&& i, F&& fn)
- : input_(std::move(i)), promises_(input_.size()),
- func_(std::move(fn))
- {}
- std::atomic<size_t> i_ {0};
- Collection input_;
- std::vector<Promise<Result>> promises_;
- F func_;
+ // Use global inline executor singleton
+ auto executor = &InlineExecutor::instance();
+ return window(executor, std::move(input), std::move(func), n);
+}
- static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
- size_t i = ctx->i_++;
- if (i < ctx->input_.size()) {
- // Using setCallback_ directly since we don't need the Future
- ctx->func_(std::move(ctx->input_[i])).setCallback_(
- // ctx is captured by value
- [ctx, i](Try<Result>&& t) {
- ctx->promises_[i].setTry(std::move(t));
+template <class Collection, class F, class ItT, class Result>
+std::vector<Future<Result>>
+window(Executor* executor, Collection input, F func, size_t n) {
+ struct WindowContext {
+ WindowContext(Executor* executor_, Collection&& input_, F&& func_)
+ : executor(executor_),
+ input(std::move(input_)),
+ promises(input.size()),
+ func(std::move(func_)) {}
+ std::atomic<size_t> i{0};
+ Executor* executor;
+ Collection input;
+ std::vector<Promise<Result>> promises;
+ F func;
+
+ static inline void spawn(std::shared_ptr<WindowContext> ctx) {
+ size_t i = ctx->i++;
+ if (i < ctx->input.size()) {
+ auto fut = ctx->func(std::move(ctx->input[i]));
+ fut.setCallback_([ctx = std::move(ctx), i](Try<Result>&& t) mutable {
+ const auto executor_ = ctx->executor;
+ executor_->add([ctx = std::move(ctx), i, t = std::move(t)]() mutable {
+ ctx->promises[i].setTry(std::move(t));
// Chain another future onto this one
spawn(std::move(ctx));
});
+ });
}
}
};
auto max = std::min(n, input.size());
auto ctx = std::make_shared<WindowContext>(
- std::move(input), std::move(func));
+ executor, std::move(input), std::move(func));
+ // Start the first n Futures
for (size_t i = 0; i < max; ++i) {
- // Start the first n Futures
- WindowContext::spawn(ctx);
+ executor->add([ctx]() mutable { WindowContext::spawn(std::move(ctx)); });
}
std::vector<Future<Result>> futures;
- futures.reserve(ctx->promises_.size());
- for (auto& promise : ctx->promises_) {
+ futures.reserve(ctx->promises.size());
+ for (auto& promise : ctx->promises) {
futures.emplace_back(promise.getFuture());
}
}
std::shared_ptr<Timekeeper> tks;
- if (!tk) {
+ if (LIKELY(!tk)) {
tks = folly::detail::getTimekeeperSingleton();
- tk = DCHECK_NOTNULL(tks.get());
+ tk = tks.get();
+ }
+
+ if (UNLIKELY(!tk)) {
+ return makeFuture<T>(NoTimekeeper());
}
auto ctx = std::make_shared<Context>(std::move(e));
}
});
- return ctx->promise.getFuture().via(getExecutor());
+ return ctx->promise.getFuture().via(this->getExecutor());
}
// delayed
namespace futures {
namespace detail {
-template <class T>
-void waitImpl(Future<T>& f) {
+template <class FutureType, typename T = typename FutureType::value_type>
+void waitImpl(FutureType& f) {
// short-circuit if there's nothing to do
- if (f.isReady()) return;
+ if (f.isReady()) {
+ return;
+ }
FutureBatonType baton;
f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
assert(f.isReady());
}
-template <class T>
-void waitImpl(Future<T>& f, Duration dur) {
+template <class FutureType, typename T = typename FutureType::value_type>
+void waitImpl(FutureType& f, Duration dur) {
// short-circuit if there's nothing to do
if (f.isReady()) {
return;
// Set callback so to ensure that the via executor has something on it
// so that once the preceding future triggers this callback, drive will
// always have a callback to satisfy it
- if (f.isReady())
+ if (f.isReady()) {
return;
+ }
f = f.via(e).then([](T&& t) { return std::move(t); });
while (!f.isReady()) {
e->drive();
} // namespace detail
} // namespace futures
+template <class T>
+SemiFuture<T>& SemiFuture<T>::wait() & {
+ futures::detail::waitImpl(*this);
+ return *this;
+}
+
+template <class T>
+SemiFuture<T>&& SemiFuture<T>::wait() && {
+ futures::detail::waitImpl(*this);
+ return std::move(*this);
+}
+
+template <class T>
+SemiFuture<T>& SemiFuture<T>::wait(Duration dur) & {
+ futures::detail::waitImpl(*this, dur);
+ return *this;
+}
+
+template <class T>
+SemiFuture<T>&& SemiFuture<T>::wait(Duration dur) && {
+ futures::detail::waitImpl(*this, dur);
+ return std::move(*this);
+}
+
+template <class T>
+T SemiFuture<T>::get() {
+ return std::move(wait().value());
+}
+
+template <class T>
+T SemiFuture<T>::get(Duration dur) {
+ wait(dur);
+ if (this->isReady()) {
+ return std::move(this->value());
+ } else {
+ throwTimedOut();
+ }
+}
+
template <class T>
Future<T>& Future<T>::wait() & {
futures::detail::waitImpl(*this);
return std::move(*this);
}
-template <class T>
-T Future<T>::get() {
- return std::move(wait().value());
-}
-
-template <class T>
-T Future<T>::get(Duration dur) {
- wait(dur);
- if (isReady()) {
- return std::move(value());
- } else {
- throwTimedOut();
- }
-}
-
template <class T>
T Future<T>::getVia(DrivableExecutor* e) {
return std::move(waitVia(e).value());
}
}
-namespace futures {
-
-namespace detail {
-
-struct retrying_policy_raw_tag {};
-struct retrying_policy_fut_tag {};
-
-template <class Policy>
-struct retrying_policy_traits {
- using result = std::result_of_t<Policy(size_t, const exception_wrapper&)>;
- using is_raw = std::is_same<result, bool>;
- using is_fut = std::is_same<result, Future<bool>>;
- using tag = typename std::conditional<
- is_raw::value, retrying_policy_raw_tag, typename std::conditional<
- is_fut::value, retrying_policy_fut_tag, void>::type>::type;
-};
-
-template <class Policy, class FF, class Prom>
-void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) {
- using F = typename std::result_of<FF(size_t)>::type;
- using T = typename F::value_type;
- auto f = makeFutureWith([&] { return ff(k++); });
- f.then([
- k,
- prom = std::move(prom),
- pm = std::forward<Policy>(p),
- ffm = std::forward<FF>(ff)
- ](Try<T> && t) mutable {
- if (t.hasValue()) {
- prom.setValue(std::move(t).value());
- return;
- }
- auto& x = t.exception();
- auto q = pm(k, x);
- q.then([
- k,
- prom = std::move(prom),
- xm = std::move(x),
- pm = std::move(pm),
- ffm = std::move(ffm)
- ](bool shouldRetry) mutable {
- if (shouldRetry) {
- retryingImpl(k, std::move(pm), std::move(ffm), std::move(prom));
- } else {
- prom.setException(std::move(xm));
- };
- });
- });
-}
-
-template <class Policy, class FF>
-typename std::result_of<FF(size_t)>::type
-retrying(size_t k, Policy&& p, FF&& ff) {
- using F = typename std::result_of<FF(size_t)>::type;
- using T = typename F::value_type;
- auto prom = Promise<T>();
- auto f = prom.getFuture();
- retryingImpl(
- k, std::forward<Policy>(p), std::forward<FF>(ff), std::move(prom));
- return f;
-}
-
-template <class Policy, class FF>
-typename std::result_of<FF(size_t)>::type
-retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
- auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
- return makeFuture<bool>(pm(k, x));
- };
- return retrying(0, std::move(q), std::forward<FF>(ff));
-}
-
-template <class Policy, class FF>
-typename std::result_of<FF(size_t)>::type
-retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
- return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
-}
-
-// jittered exponential backoff, clamped to [backoff_min, backoff_max]
-template <class URNG>
-Duration retryingJitteredExponentialBackoffDur(
- size_t n,
- Duration backoff_min,
- Duration backoff_max,
- double jitter_param,
- URNG& rng) {
- using d = Duration;
- auto dist = std::normal_distribution<double>(0.0, jitter_param);
- auto jitter = std::exp(dist(rng));
- auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
- return std::max(backoff_min, std::min(backoff_max, backoff));
-}
-
-template <class Policy, class URNG>
-std::function<Future<bool>(size_t, const exception_wrapper&)>
-retryingPolicyCappedJitteredExponentialBackoff(
- size_t max_tries,
- Duration backoff_min,
- Duration backoff_max,
- double jitter_param,
- URNG&& rng,
- Policy&& p) {
- return [
- pm = std::forward<Policy>(p),
- max_tries,
- backoff_min,
- backoff_max,
- jitter_param,
- rngp = std::forward<URNG>(rng)
- ](size_t n, const exception_wrapper& ex) mutable {
- if (n == max_tries) {
- return makeFuture(false);
- }
- return pm(n, ex).then(
- [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
- bool v) mutable {
- if (!v) {
- return makeFuture(false);
- }
- auto backoff = detail::retryingJitteredExponentialBackoffDur(
- n, backoff_min, backoff_max, jitter_param, rngp);
- return futures::sleep(backoff).then([] { return true; });
- });
- };
-}
-
-template <class Policy, class URNG>
-std::function<Future<bool>(size_t, const exception_wrapper&)>
-retryingPolicyCappedJitteredExponentialBackoff(
- size_t max_tries,
- Duration backoff_min,
- Duration backoff_max,
- double jitter_param,
- URNG&& rng,
- Policy&& p,
- retrying_policy_raw_tag) {
- auto q = [pm = std::forward<Policy>(p)](
- size_t n, const exception_wrapper& e) {
- return makeFuture(pm(n, e));
- };
- return retryingPolicyCappedJitteredExponentialBackoff(
- max_tries,
- backoff_min,
- backoff_max,
- jitter_param,
- std::forward<URNG>(rng),
- std::move(q));
-}
-
-template <class Policy, class URNG>
-std::function<Future<bool>(size_t, const exception_wrapper&)>
-retryingPolicyCappedJitteredExponentialBackoff(
- size_t max_tries,
- Duration backoff_min,
- Duration backoff_max,
- double jitter_param,
- URNG&& rng,
- Policy&& p,
- retrying_policy_fut_tag) {
- return retryingPolicyCappedJitteredExponentialBackoff(
- max_tries,
- backoff_min,
- backoff_max,
- jitter_param,
- std::forward<URNG>(rng),
- std::forward<Policy>(p));
-}
-}
-
-template <class Policy, class FF>
-typename std::result_of<FF(size_t)>::type
-retrying(Policy&& p, FF&& ff) {
- using tag = typename detail::retrying_policy_traits<Policy>::tag;
- return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
-}
-
-inline
-std::function<bool(size_t, const exception_wrapper&)>
-retryingPolicyBasic(
- size_t max_tries) {
- return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
-}
-
-template <class Policy, class URNG>
-std::function<Future<bool>(size_t, const exception_wrapper&)>
-retryingPolicyCappedJitteredExponentialBackoff(
- size_t max_tries,
- Duration backoff_min,
- Duration backoff_max,
- double jitter_param,
- URNG&& rng,
- Policy&& p) {
- using tag = typename detail::retrying_policy_traits<Policy>::tag;
- return detail::retryingPolicyCappedJitteredExponentialBackoff(
- max_tries,
- backoff_min,
- backoff_max,
- jitter_param,
- std::forward<URNG>(rng),
- std::forward<Policy>(p),
- tag());
-}
-
-inline
-std::function<Future<bool>(size_t, const exception_wrapper&)>
-retryingPolicyCappedJitteredExponentialBackoff(
- size_t max_tries,
- Duration backoff_min,
- Duration backoff_max,
- double jitter_param) {
- auto p = [](size_t, const exception_wrapper&) { return true; };
- return retryingPolicyCappedJitteredExponentialBackoff(
- max_tries,
- backoff_min,
- backoff_max,
- jitter_param,
- ThreadLocalPRNG(),
- std::move(p));
-}
-
-}
-
// Instantiate the most common Future types to save compile time
extern template class Future<Unit>;
extern template class Future<bool>;
extern template class Future<int64_t>;
extern template class Future<std::string>;
extern template class Future<double>;
-
} // namespace folly