X-Git-Url: http://demsky.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2Ffutures%2FFuture-inl.h;h=1baee146817ac679e9c72c210c5ff312c8cd8df7;hb=6e7e5a64d39ea0f2ed11660f46c4ba4d926ff419;hp=18130baf7c1a5ac6a802d2e994a7568424563e0e;hpb=a95a6976020356513a4c0dc8a3c8557c0b2b4496;p=folly.git diff --git a/folly/futures/Future-inl.h b/folly/futures/Future-inl.h index 18130baf..1baee146 100644 --- a/folly/futures/Future-inl.h +++ b/folly/futures/Future-inl.h @@ -19,12 +19,11 @@ #include #include #include -#include #include #include #include -#include +#include #include #include @@ -130,106 +129,25 @@ inline auto makeCoreCallbackState(Promise&& p, F&& f) noexcept( return CoreCallbackState>>( std::move(p), std::forward(f)); } -} // namespace detail -} // namespace futures - -template -SemiFuture::type> makeSemiFuture(T&& t) { - return makeSemiFuture(Try::type>(std::forward(t))); -} - -inline SemiFuture makeSemiFuture() { - return makeSemiFuture(Unit{}); -} - -// makeSemiFutureWith(SemiFuture()) -> SemiFuture -template -typename std::enable_if< - isSemiFuture::type>::value, - typename std::result_of::type>::type -makeSemiFutureWith(F&& func) { - using InnerType = - typename isSemiFuture::type>::Inner; - try { - return std::forward(func)(); - } catch (std::exception& e) { - return makeSemiFuture( - exception_wrapper(std::current_exception(), e)); - } catch (...) { - return makeSemiFuture( - exception_wrapper(std::current_exception())); - } -} - -// makeSemiFutureWith(T()) -> SemiFuture -// makeSemiFutureWith(void()) -> SemiFuture -template -typename std::enable_if< - !(isSemiFuture::type>::value), - SemiFuture::type>>>::type -makeSemiFutureWith(F&& func) { - using LiftedResult = Unit::LiftT::type>; - return makeSemiFuture( - makeTryWith([&func]() mutable { return std::forward(func)(); })); -} - -template -SemiFuture makeSemiFuture(std::exception_ptr const& e) { - return makeSemiFuture(Try(e)); -} - -template -SemiFuture makeSemiFuture(exception_wrapper ew) { - return makeSemiFuture(Try(std::move(ew))); -} - -template -typename std:: - enable_if::value, SemiFuture>::type - makeSemiFuture(E const& e) { - return makeSemiFuture(Try(make_exception_wrapper(e))); -} - -template -SemiFuture makeSemiFuture(Try&& t) { - return SemiFuture(new futures::detail::Core(std::move(t))); -} - -template -SemiFuture SemiFuture::makeEmpty() { - return SemiFuture(futures::detail::EmptyConstruct{}); -} template -SemiFuture::SemiFuture(SemiFuture&& other) noexcept : core_(other.core_) { +FutureBase::FutureBase(SemiFuture&& other) noexcept : core_(other.core_) { other.core_ = nullptr; } template -SemiFuture& SemiFuture::operator=(SemiFuture&& other) noexcept { - std::swap(core_, other.core_); - return *this; -} - -template -SemiFuture::SemiFuture(Future&& other) noexcept : core_(other.core_) { +FutureBase::FutureBase(Future&& other) noexcept : core_(other.core_) { other.core_ = nullptr; } -template -SemiFuture& SemiFuture::operator=(Future&& other) noexcept { - std::swap(core_, other.core_); - return *this; -} - template template -SemiFuture::SemiFuture(T2&& val) +FutureBase::FutureBase(T2&& val) : core_(new futures::detail::Core(Try(std::forward(val)))) {} template template -SemiFuture::SemiFuture( +FutureBase::FutureBase( typename std::enable_if::value>::type*) : core_(new futures::detail::Core(Try(T()))) {} @@ -238,72 +156,68 @@ template < class... Args, typename std::enable_if::value, int>:: type> -SemiFuture::SemiFuture(in_place_t, Args&&... args) +FutureBase::FutureBase(in_place_t, Args&&... args) : core_( new futures::detail::Core(in_place, std::forward(args)...)) { } template -SemiFuture::~SemiFuture() { +template +void FutureBase::assign(FutureType& other) noexcept { + std::swap(core_, other.core_); +} + +template +FutureBase::~FutureBase() { detach(); } template -typename std::add_lvalue_reference::type SemiFuture::value() { +T& FutureBase::value() & { throwIfInvalid(); return core_->getTry().value(); } template -typename std::add_lvalue_reference::type SemiFuture::value() const { +T const& FutureBase::value() const& { throwIfInvalid(); return core_->getTry().value(); } template -inline Future SemiFuture::via(Executor* executor, int8_t priority) && { +T&& FutureBase::value() && { throwIfInvalid(); - setExecutor(executor, priority); - - auto newFuture = Future(core_); - core_ = nullptr; - return newFuture; + return std::move(core_->getTry().value()); } template -inline Future SemiFuture::via(Executor* executor, int8_t priority) & { +T const&& FutureBase::value() const&& { throwIfInvalid(); - Promise p; - auto f = p.getFuture(); - auto func = [p = std::move(p)](Try&& t) mutable { - p.setTry(std::move(t)); - }; - using R = futures::detail::callableResult; - thenImplementation(std::move(func), typename R::Arg()); - return std::move(f).via(executor, priority); + + return std::move(core_->getTry().value()); } template -bool SemiFuture::isReady() const { +bool FutureBase::isReady() const { throwIfInvalid(); return core_->ready(); } template -bool SemiFuture::hasValue() { +bool FutureBase::hasValue() { return getTry().hasValue(); } template -bool SemiFuture::hasException() { +bool FutureBase::hasException() { return getTry().hasException(); } template -void SemiFuture::detach() { +void FutureBase::detach() { if (core_) { core_->detachFuture(); core_ = nullptr; @@ -311,20 +225,21 @@ void SemiFuture::detach() { } template -Try& SemiFuture::getTry() { +Try& FutureBase::getTry() { throwIfInvalid(); return core_->getTry(); } template -void SemiFuture::throwIfInvalid() const { - if (!core_) +void FutureBase::throwIfInvalid() const { + if (!core_) { throwNoState(); + } } template -Optional> SemiFuture::poll() { +Optional> FutureBase::poll() { Optional> o; if (core_->ready()) { o = std::move(core_->getTry()); @@ -333,104 +248,21 @@ Optional> SemiFuture::poll() { } template -void SemiFuture::raise(exception_wrapper exception) { +void FutureBase::raise(exception_wrapper exception) { core_->raise(std::move(exception)); } template template -void SemiFuture::setCallback_(F&& func) { +void FutureBase::setCallback_(F&& func) { throwIfInvalid(); core_->setCallback(std::forward(func)); } template -SemiFuture::SemiFuture(futures::detail::EmptyConstruct) noexcept +FutureBase::FutureBase(futures::detail::EmptyConstruct) noexcept : core_(nullptr) {} -template -Future Future::makeEmpty() { - return Future(futures::detail::EmptyConstruct{}); -} - -template -Future::Future(Future&& other) noexcept - : SemiFuture(std::move(other)) {} - -template -Future& Future::operator=(Future&& other) noexcept { - SemiFuture::operator=(SemiFuture{std::move(other)}); - return *this; -} - -template -template < - class T2, - typename std::enable_if< - !std::is_same::type>::value && - std::is_constructible::value && - std::is_convertible::value, - int>::type> -Future::Future(Future&& other) - : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {} - -template -template < - class T2, - typename std::enable_if< - !std::is_same::type>::value && - std::is_constructible::value && - !std::is_convertible::value, - int>::type> -Future::Future(Future&& other) - : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {} - -template -template < - class T2, - typename std::enable_if< - !std::is_same::type>::value && - std::is_constructible::value, - int>::type> -Future& Future::operator=(Future&& other) { - return operator=( - std::move(other).then([](T2&& v) { return T(std::move(v)); })); -} - -// TODO: isSemiFuture -template -template -Future::Future(T2&& val) : SemiFuture(std::forward(val)) {} - -template -template -Future::Future(typename std::enable_if::value>::type*) - : SemiFuture() {} - -template -template < - class... Args, - typename std::enable_if::value, int>:: - type> -Future::Future(in_place_t, Args&&... args) - : SemiFuture(in_place, std::forward(args)...) {} - -template -Future::~Future() { -} - -// unwrap - -template -template -typename std::enable_if::value, - Future::Inner>>::type -Future::unwrap() { - return then([](Future::Inner> internal_future) { - return internal_future; - }); -} - // then // Variant: returns a value @@ -438,7 +270,7 @@ Future::unwrap() { template template typename std::enable_if::type -SemiFuture::thenImplementation( +FutureBase::thenImplementation( F&& func, futures::detail::argResult) { static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument"); @@ -501,7 +333,7 @@ SemiFuture::thenImplementation( template template typename std::enable_if::type -SemiFuture::thenImplementation( +FutureBase::thenImplementation( F&& func, futures::detail::argResult) { static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument"); @@ -534,6 +366,261 @@ SemiFuture::thenImplementation( return f; } +} // namespace detail +} // namespace futures + +template +SemiFuture::type> makeSemiFuture(T&& t) { + return makeSemiFuture(Try::type>(std::forward(t))); +} + +// makeSemiFutureWith(SemiFuture()) -> SemiFuture +template +typename std::enable_if< + isSemiFuture::type>::value, + typename std::result_of::type>::type +makeSemiFutureWith(F&& func) { + using InnerType = + typename isSemiFuture::type>::Inner; + try { + return std::forward(func)(); + } catch (std::exception& e) { + return makeSemiFuture( + exception_wrapper(std::current_exception(), e)); + } catch (...) { + return makeSemiFuture( + exception_wrapper(std::current_exception())); + } +} + +// makeSemiFutureWith(T()) -> SemiFuture +// makeSemiFutureWith(void()) -> SemiFuture +template +typename std::enable_if< + !(isSemiFuture::type>::value), + SemiFuture::type>>>::type +makeSemiFutureWith(F&& func) { + using LiftedResult = Unit::LiftT::type>; + return makeSemiFuture( + makeTryWith([&func]() mutable { return std::forward(func)(); })); +} + +template +SemiFuture makeSemiFuture(std::exception_ptr const& e) { + return makeSemiFuture(Try(e)); +} + +template +SemiFuture makeSemiFuture(exception_wrapper ew) { + return makeSemiFuture(Try(std::move(ew))); +} + +template +typename std:: + enable_if::value, SemiFuture>::type + makeSemiFuture(E const& e) { + return makeSemiFuture(Try(make_exception_wrapper(e))); +} + +template +SemiFuture makeSemiFuture(Try&& t) { + return SemiFuture(new futures::detail::Core(std::move(t))); +} + +// This must be defined after the constructors to avoid a bug in MSVC +// https://connect.microsoft.com/VisualStudio/feedback/details/3142777/out-of-line-constructor-definition-after-implicit-reference-causes-incorrect-c2244 +inline SemiFuture makeSemiFuture() { + return makeSemiFuture(Unit{}); +} + +template +SemiFuture SemiFuture::makeEmpty() { + return SemiFuture(futures::detail::EmptyConstruct{}); +} + +template +SemiFuture::SemiFuture(SemiFuture&& other) noexcept + : futures::detail::FutureBase(std::move(other)) {} + +template +SemiFuture::SemiFuture(Future&& other) noexcept + : futures::detail::FutureBase(std::move(other)) { + // SemiFuture should not have an executor on construction + if (this->core_) { + this->setExecutor(nullptr); + } +} + +template +SemiFuture& SemiFuture::operator=(SemiFuture&& other) noexcept { + this->assign(other); + return *this; +} + +template +SemiFuture& SemiFuture::operator=(Future&& other) noexcept { + this->assign(other); + // SemiFuture should not have an executor on construction + if (this->core_) { + this->setExecutor(nullptr); + } + return *this; +} + +template +void SemiFuture::boost_() { + // If a SemiFuture has an executor it should be deferred, so boost it + if (auto e = this->getExecutor()) { + // We know in a SemiFuture that if we have an executor it should be + // DeferredExecutor. Verify this in debug mode. + DCHECK(dynamic_cast(e)); + + auto ka = static_cast(e)->getKeepAliveToken(); + static_cast(e)->boost(); + } +} + +template +inline Future SemiFuture::via(Executor* executor, int8_t priority) && { + throwIfInvalid(); + + // If current executor is deferred, boost block to ensure that work + // progresses and is run on the new executor. + auto oldExecutor = this->getExecutor(); + if (oldExecutor && executor && (executor != oldExecutor)) { + // We know in a SemiFuture that if we have an executor it should be + // DeferredExecutor. Verify this in debug mode. + DCHECK(dynamic_cast(this->getExecutor())); + if (static_cast(oldExecutor)) { + executor->add([oldExecutorKA = oldExecutor->getKeepAliveToken()]() { + static_cast(oldExecutorKA.get())->boost(); + }); + } + } + + this->setExecutor(executor, priority); + + auto newFuture = Future(this->core_); + this->core_ = nullptr; + return newFuture; +} + +template +template +SemiFuture::Return::value_type> +SemiFuture::defer(F&& func) && { + // If we already have a deferred executor, use it, otherwise create one + auto defKeepAlive = this->getExecutor() + ? this->getExecutor()->getKeepAliveToken() + : DeferredExecutor::create(); + auto e = defKeepAlive.get(); + // We know in a SemiFuture that if we have an executor it should be + // DeferredExecutor (either it was that way before, or we just created it). + // Verify this in debug mode. + DCHECK(dynamic_cast(e)); + // Convert to a folly::future with a deferred executor + // Will be low-cost if this is not a new executor as via optimises for that + // case + auto sf = + std::move(*this) + .via(defKeepAlive.get()) + // Then add the work, with a wrapper function that captures the + // keepAlive so the executor is destroyed at the right time. + .then( + DeferredExecutor::wrap(std::move(defKeepAlive), std::move(func))) + // Finally, convert back o a folly::SemiFuture to hide the executor + .semi(); + // Carry deferred executor through chain as constructor from Future will + // nullify it + sf.setExecutor(e); + return sf; +} + +template +Future Future::makeEmpty() { + return Future(futures::detail::EmptyConstruct{}); +} + +template +Future::Future(Future&& other) noexcept + : futures::detail::FutureBase(std::move(other)) {} + +template +Future& Future::operator=(Future&& other) noexcept { + this->assign(other); + return *this; +} + +template +template < + class T2, + typename std::enable_if< + !std::is_same::type>::value && + std::is_constructible::value && + std::is_convertible::value, + int>::type> +Future::Future(Future&& other) + : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {} + +template +template < + class T2, + typename std::enable_if< + !std::is_same::type>::value && + std::is_constructible::value && + !std::is_convertible::value, + int>::type> +Future::Future(Future&& other) + : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {} + +template +template < + class T2, + typename std::enable_if< + !std::is_same::type>::value && + std::is_constructible::value, + int>::type> +Future& Future::operator=(Future&& other) { + return operator=( + std::move(other).then([](T2&& v) { return T(std::move(v)); })); +} + +// unwrap + +template +template +typename std:: + enable_if::value, Future::Inner>>::type + Future::unwrap() { + return then([](Future::Inner> internal_future) { + return internal_future; + }); +} + +template +inline Future Future::via(Executor* executor, int8_t priority) && { + this->throwIfInvalid(); + + this->setExecutor(executor, priority); + + auto newFuture = Future(this->core_); + this->core_ = nullptr; + return newFuture; +} + +template +inline Future Future::via(Executor* executor, int8_t priority) & { + this->throwIfInvalid(); + Promise p; + auto f = p.getFuture(); + auto func = [p = std::move(p)](Try&& t) mutable { + p.setTry(std::move(t)); + }; + using R = futures::detail::callableResult; + this->template thenImplementation( + std::move(func), typename R::Arg()); + return std::move(f).via(executor, priority); +} template template @@ -717,10 +804,6 @@ auto via(Executor* x, Func&& func) return via(x).then(std::forward(func)); } -template -Future::Future(futures::detail::EmptyConstruct) noexcept - : SemiFuture(futures::detail::EmptyConstruct{}) {} - // makeFuture template @@ -1053,27 +1136,38 @@ Future reduce(It first, It last, T&& initial, F&& func) { template std::vector> window(Collection input, F func, size_t n) { - struct WindowContext { - WindowContext(Collection&& i, F&& fn) - : input_(std::move(i)), promises_(input_.size()), - func_(std::move(fn)) - {} - std::atomic i_ {0}; - Collection input_; - std::vector> promises_; - F func_; + // Use global inline executor singleton + auto executor = &InlineExecutor::instance(); + return window(executor, std::move(input), std::move(func), n); +} - static inline void spawn(const std::shared_ptr& ctx) { - size_t i = ctx->i_++; - if (i < ctx->input_.size()) { - // Using setCallback_ directly since we don't need the Future - ctx->func_(std::move(ctx->input_[i])).setCallback_( - // ctx is captured by value - [ctx, i](Try&& t) { - ctx->promises_[i].setTry(std::move(t)); +template +std::vector> +window(Executor* executor, Collection input, F func, size_t n) { + struct WindowContext { + WindowContext(Executor* executor_, Collection&& input_, F&& func_) + : executor(executor_), + input(std::move(input_)), + promises(input.size()), + func(std::move(func_)) {} + std::atomic i{0}; + Executor* executor; + Collection input; + std::vector> promises; + F func; + + static inline void spawn(std::shared_ptr ctx) { + size_t i = ctx->i++; + if (i < ctx->input.size()) { + auto fut = ctx->func(std::move(ctx->input[i])); + fut.setCallback_([ctx = std::move(ctx), i](Try&& t) mutable { + const auto executor_ = ctx->executor; + executor_->add([ctx = std::move(ctx), i, t = std::move(t)]() mutable { + ctx->promises[i].setTry(std::move(t)); // Chain another future onto this one spawn(std::move(ctx)); }); + }); } } }; @@ -1081,16 +1175,16 @@ window(Collection input, F func, size_t n) { auto max = std::min(n, input.size()); auto ctx = std::make_shared( - std::move(input), std::move(func)); + executor, std::move(input), std::move(func)); + // Start the first n Futures for (size_t i = 0; i < max; ++i) { - // Start the first n Futures - WindowContext::spawn(ctx); + executor->add([ctx]() mutable { WindowContext::spawn(std::move(ctx)); }); } std::vector> futures; - futures.reserve(ctx->promises_.size()); - for (auto& promise : ctx->promises_) { + futures.reserve(ctx->promises.size()); + for (auto& promise : ctx->promises) { futures.emplace_back(promise.getFuture()); } @@ -1191,9 +1285,13 @@ Future Future::within(Duration dur, E e, Timekeeper* tk) { } std::shared_ptr tks; - if (!tk) { + if (LIKELY(!tk)) { tks = folly::detail::getTimekeeperSingleton(); - tk = DCHECK_NOTNULL(tks.get()); + tk = tks.get(); + } + + if (UNLIKELY(!tk)) { + return makeFuture(NoTimekeeper()); } auto ctx = std::make_shared(std::move(e)); @@ -1231,22 +1329,33 @@ Future Future::within(Duration dur, E e, Timekeeper* tk) { template Future Future::delayed(Duration dur, Timekeeper* tk) { return collectAll(*this, futures::sleep(dur, tk)) - .then([](std::tuple, Try> tup) { - Try& t = std::get<0>(tup); - return makeFuture(std::move(t)); - }); + .then([](std::tuple, Try> tup) { + Try& t = std::get<0>(tup); + return makeFuture(std::move(t)); + }); } namespace futures { namespace detail { +template +void doBoost(folly::Future& /* usused */) {} + +template +void doBoost(folly::SemiFuture& f) { + f.boost_(); +} + template void waitImpl(FutureType& f) { // short-circuit if there's nothing to do - if (f.isReady()) return; + if (f.isReady()) { + return; + } FutureBatonType baton; f.setCallback_([&](const Try& /* t */) { baton.post(); }); + doBoost(f); baton.wait(); assert(f.isReady()); } @@ -1261,10 +1370,11 @@ void waitImpl(FutureType& f, Duration dur) { Promise promise; auto ret = promise.getFuture(); auto baton = std::make_shared(); - f.setCallback_([ baton, promise = std::move(promise) ](Try && t) mutable { + f.setCallback_([baton, promise = std::move(promise)](Try&& t) mutable { promise.setTry(std::move(t)); baton->post(); }); + doBoost(f); f = std::move(ret); if (baton->timed_wait(dur)) { assert(f.isReady()); @@ -1276,8 +1386,9 @@ void waitViaImpl(Future& f, DrivableExecutor* e) { // Set callback so to ensure that the via executor has something on it // so that once the preceding future triggers this callback, drive will // always have a callback to satisfy it - if (f.isReady()) + if (f.isReady()) { return; + } f = f.via(e).then([](T&& t) { return std::move(t); }); while (!f.isReady()) { e->drive(); @@ -1313,12 +1424,12 @@ SemiFuture&& SemiFuture::wait(Duration dur) && { } template -T SemiFuture::get() { +T SemiFuture::get() && { return std::move(wait().value()); } template -T SemiFuture::get(Duration dur) { +T SemiFuture::get(Duration dur) && { wait(dur); if (this->isReady()) { return std::move(this->value()); @@ -1363,6 +1474,21 @@ Future&& Future::waitVia(DrivableExecutor* e) && { return std::move(*this); } +template +T Future::get() { + return std::move(wait().value()); +} + +template +T Future::get(Duration dur) { + wait(dur); + if (this->isReady()) { + return std::move(this->value()); + } else { + throwTimedOut(); + } +} + template T Future::getVia(DrivableExecutor* e) { return std::move(waitVia(e).value()); @@ -1432,236 +1558,15 @@ Future times(const int n, F&& thunk) { } namespace futures { - template - std::vector> map(It first, It last, F func) { - std::vector> results; - for (auto it = first; it != last; it++) { - results.push_back(it->then(func)); - } - return results; +template +std::vector> map(It first, It last, F func) { + std::vector> results; + for (auto it = first; it != last; it++) { + results.push_back(it->then(func)); } + return results; } - -namespace futures { - -namespace detail { - -struct retrying_policy_raw_tag {}; -struct retrying_policy_fut_tag {}; - -template -struct retrying_policy_traits { - using result = std::result_of_t; - using is_raw = std::is_same; - using is_fut = std::is_same>; - using tag = typename std::conditional< - is_raw::value, retrying_policy_raw_tag, typename std::conditional< - is_fut::value, retrying_policy_fut_tag, void>::type>::type; -}; - -template -void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) { - using F = typename std::result_of::type; - using T = typename F::value_type; - auto f = makeFutureWith([&] { return ff(k++); }); - f.then([ - k, - prom = std::move(prom), - pm = std::forward(p), - ffm = std::forward(ff) - ](Try && t) mutable { - if (t.hasValue()) { - prom.setValue(std::move(t).value()); - return; - } - auto& x = t.exception(); - auto q = pm(k, x); - q.then([ - k, - prom = std::move(prom), - xm = std::move(x), - pm = std::move(pm), - ffm = std::move(ffm) - ](bool shouldRetry) mutable { - if (shouldRetry) { - retryingImpl(k, std::move(pm), std::move(ffm), std::move(prom)); - } else { - prom.setException(std::move(xm)); - }; - }); - }); -} - -template -typename std::result_of::type -retrying(size_t k, Policy&& p, FF&& ff) { - using F = typename std::result_of::type; - using T = typename F::value_type; - auto prom = Promise(); - auto f = prom.getFuture(); - retryingImpl( - k, std::forward(p), std::forward(ff), std::move(prom)); - return f; -} - -template -typename std::result_of::type -retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) { - auto q = [pm = std::forward(p)](size_t k, exception_wrapper x) { - return makeFuture(pm(k, x)); - }; - return retrying(0, std::move(q), std::forward(ff)); -} - -template -typename std::result_of::type -retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) { - return retrying(0, std::forward(p), std::forward(ff)); -} - -// jittered exponential backoff, clamped to [backoff_min, backoff_max] -template -Duration retryingJitteredExponentialBackoffDur( - size_t n, - Duration backoff_min, - Duration backoff_max, - double jitter_param, - URNG& rng) { - using d = Duration; - auto dist = std::normal_distribution(0.0, jitter_param); - auto jitter = std::exp(dist(rng)); - auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1))); - return std::max(backoff_min, std::min(backoff_max, backoff)); -} - -template -std::function(size_t, const exception_wrapper&)> -retryingPolicyCappedJitteredExponentialBackoff( - size_t max_tries, - Duration backoff_min, - Duration backoff_max, - double jitter_param, - URNG&& rng, - Policy&& p) { - return [ - pm = std::forward(p), - max_tries, - backoff_min, - backoff_max, - jitter_param, - rngp = std::forward(rng) - ](size_t n, const exception_wrapper& ex) mutable { - if (n == max_tries) { - return makeFuture(false); - } - return pm(n, ex).then( - [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ]( - bool v) mutable { - if (!v) { - return makeFuture(false); - } - auto backoff = detail::retryingJitteredExponentialBackoffDur( - n, backoff_min, backoff_max, jitter_param, rngp); - return futures::sleep(backoff).then([] { return true; }); - }); - }; -} - -template -std::function(size_t, const exception_wrapper&)> -retryingPolicyCappedJitteredExponentialBackoff( - size_t max_tries, - Duration backoff_min, - Duration backoff_max, - double jitter_param, - URNG&& rng, - Policy&& p, - retrying_policy_raw_tag) { - auto q = [pm = std::forward(p)]( - size_t n, const exception_wrapper& e) { - return makeFuture(pm(n, e)); - }; - return retryingPolicyCappedJitteredExponentialBackoff( - max_tries, - backoff_min, - backoff_max, - jitter_param, - std::forward(rng), - std::move(q)); -} - -template -std::function(size_t, const exception_wrapper&)> -retryingPolicyCappedJitteredExponentialBackoff( - size_t max_tries, - Duration backoff_min, - Duration backoff_max, - double jitter_param, - URNG&& rng, - Policy&& p, - retrying_policy_fut_tag) { - return retryingPolicyCappedJitteredExponentialBackoff( - max_tries, - backoff_min, - backoff_max, - jitter_param, - std::forward(rng), - std::forward(p)); -} -} - -template -typename std::result_of::type -retrying(Policy&& p, FF&& ff) { - using tag = typename detail::retrying_policy_traits::tag; - return detail::retrying(std::forward(p), std::forward(ff), tag()); -} - -inline -std::function -retryingPolicyBasic( - size_t max_tries) { - return [=](size_t n, const exception_wrapper&) { return n < max_tries; }; -} - -template -std::function(size_t, const exception_wrapper&)> -retryingPolicyCappedJitteredExponentialBackoff( - size_t max_tries, - Duration backoff_min, - Duration backoff_max, - double jitter_param, - URNG&& rng, - Policy&& p) { - using tag = typename detail::retrying_policy_traits::tag; - return detail::retryingPolicyCappedJitteredExponentialBackoff( - max_tries, - backoff_min, - backoff_max, - jitter_param, - std::forward(rng), - std::forward(p), - tag()); -} - -inline -std::function(size_t, const exception_wrapper&)> -retryingPolicyCappedJitteredExponentialBackoff( - size_t max_tries, - Duration backoff_min, - Duration backoff_max, - double jitter_param) { - auto p = [](size_t, const exception_wrapper&) { return true; }; - return retryingPolicyCappedJitteredExponentialBackoff( - max_tries, - backoff_min, - backoff_max, - jitter_param, - ThreadLocalPRNG(), - std::move(p)); -} - -} +} // namespace futures // Instantiate the most common Future types to save compile time extern template class Future;