From e0de0a27de45542ce9bd05685ea398afc4e1acb4 Mon Sep 17 00:00:00 2001 From: Yedidya Feldblum Date: Thu, 30 Jul 2015 20:10:19 -0700 Subject: [PATCH] futures::retrying. Summary: [Folly] futures::retrying. Reviewed By: @fugalh Differential Revision: D2201630 --- folly/futures/Future-inl.h | 195 ++++++++++++++++++++++++++++ folly/futures/helpers.h | 68 ++++++++++ folly/futures/test/RetryingTest.cpp | 147 +++++++++++++++++++++ folly/test/Makefile.am | 1 + 4 files changed, 411 insertions(+) create mode 100644 folly/futures/test/RetryingTest.cpp diff --git a/folly/futures/Future-inl.h b/folly/futures/Future-inl.h index d4614fce..995d94b7 100644 --- a/folly/futures/Future-inl.h +++ b/folly/futures/Future-inl.h @@ -16,11 +16,15 @@ #pragma once +#include #include +#include #include #include #include +#include +#include #include #include @@ -1093,6 +1097,197 @@ namespace futures { } } +namespace futures { + +namespace detail { + +struct retrying_policy_raw_tag {}; +struct retrying_policy_fut_tag {}; + +template +struct retrying_policy_traits { + using ew = exception_wrapper; + FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator()); + template + using has_op = typename std::integral_constant::value || + has_op_call::value>; + using is_raw = has_op; + using is_fut = has_op>; + using tag = typename std::conditional< + is_raw::value, retrying_policy_raw_tag, typename std::conditional< + is_fut::value, retrying_policy_fut_tag, void>::type>::type; +}; + +template +typename std::result_of::type +retrying(size_t k, Policy&& p, FF&& ff) { + using F = typename std::result_of::type; + using T = typename F::value_type; + auto f = ff(k++); + auto pm = makeMoveWrapper(p); + auto ffm = makeMoveWrapper(ff); + return f.onError([=](exception_wrapper x) mutable { + auto q = (*pm)(k, x); + auto xm = makeMoveWrapper(std::move(x)); + return q.then([=](bool r) mutable { + return r + ? retrying(k, pm.move(), ffm.move()) + : makeFuture(xm.move()); + }); + }); +} + +template +typename std::result_of::type +retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) { + auto pm = makeMoveWrapper(std::move(p)); + auto q = [=](size_t k, exception_wrapper x) { + return makeFuture((*pm)(k, x)); + }; + return retrying(0, std::move(q), std::forward(ff)); +} + +template +typename std::result_of::type +retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) { + return retrying(0, std::forward(p), std::forward(ff)); +} + +// jittered exponential backoff, clamped to [backoff_min, backoff_max] +template +Duration retryingJitteredExponentialBackoffDur( + size_t n, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG& rng) { + using d = Duration; + auto dist = std::normal_distribution(0.0, jitter_param); + auto jitter = std::exp(dist(rng)); + auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1))); + return std::max(backoff_min, std::min(backoff_max, backoff)); +} + +template +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG rng, + Policy&& p) { + auto pm = makeMoveWrapper(std::move(p)); + auto rngp = std::make_shared(std::move(rng)); + return [=](size_t n, const exception_wrapper& ex) mutable { + if (n == max_tries) { return makeFuture(false); } + return (*pm)(n, ex).then([=](bool v) { + if (!v) { return makeFuture(false); } + auto backoff = detail::retryingJitteredExponentialBackoffDur( + n, backoff_min, backoff_max, jitter_param, *rngp); + return futures::sleep(backoff).then([] { return true; }); + }); + }; +} + +template +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG rng, + Policy&& p, + retrying_policy_raw_tag) { + auto pm = makeMoveWrapper(std::move(p)); + auto q = [=](size_t n, const exception_wrapper& e) { + return makeFuture((*pm)(n, e)); + }; + return retryingPolicyCappedJitteredExponentialBackoff( + max_tries, + backoff_min, + backoff_max, + jitter_param, + std::move(rng), + std::move(q)); +} + +template +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG rng, + Policy&& p, + retrying_policy_fut_tag) { + return retryingPolicyCappedJitteredExponentialBackoff( + max_tries, + backoff_min, + backoff_max, + jitter_param, + std::move(rng), + std::move(p)); +} + +} + +template +typename std::result_of::type +retrying(Policy&& p, FF&& ff) { + using tag = typename detail::retrying_policy_traits::tag; + return detail::retrying(std::forward(p), std::forward(ff), tag()); +} + +inline +std::function +retryingPolicyBasic( + size_t max_tries) { + return [=](size_t n, const exception_wrapper&) { return n < max_tries; }; +} + +template +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG rng, + Policy&& p) { + using tag = typename detail::retrying_policy_traits::tag; + return detail::retryingPolicyCappedJitteredExponentialBackoff( + max_tries, + backoff_min, + backoff_max, + jitter_param, + std::move(rng), + std::move(p), + tag()); +} + +inline +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param) { + auto p = [](size_t, const exception_wrapper&) { return true; }; + return retryingPolicyCappedJitteredExponentialBackoff( + max_tries, + backoff_min, + backoff_max, + jitter_param, + ThreadLocalPRNG(), + std::move(p)); +} + +} + // Instantiate the most common Future types to save compile time extern template class Future; extern template class Future; diff --git a/folly/futures/helpers.h b/folly/futures/helpers.h index 3b0ef299..ea1dc70d 100644 --- a/folly/futures/helpers.h +++ b/folly/futures/helpers.h @@ -285,4 +285,72 @@ auto unorderedReduce(Collection&& c, T&& initial, F&& func) std::forward(func)); } +namespace futures { + +/** + * retrying + * + * Given a policy and a future-factory, creates futures according to the + * policy. + * + * The policy must be moveable - retrying will move it a lot - and callable of + * either of the two forms: + * - Future(size_t, exception_wrapper) + * - bool(size_t, exception_wrapper) + * Internally, the latter is transformed into the former in the obvious way. + * The first parameter is the attempt number of the next prospective attempt; + * the second parameter is the most recent exception. The policy returns a + * Future which, when completed with true, indicates that a retry is + * desired. + * + * We provide a few generic policies: + * - Basic + * - CappedJitteredexponentialBackoff + * + * Custom policies may use the most recent try number and exception to decide + * whether to retry and optionally to do something interesting like delay + * before the retry. Users may pass inline lambda expressions as policies, or + * may define their own data types meeting the above requirements. Users are + * responsible for managing the lifetimes of anything pointed to or referred to + * from inside the policy. + * + * For example, one custom policy may try up to k times, but only if the most + * recent exception is one of a few types or has one of a few error codes + * indicating that the failure was transitory. + * + * Cancellation is not supported. + */ +template +typename std::result_of::type +retrying(Policy&& p, FF&& ff); + +/** + * generic retrying policies + */ + +inline +std::function +retryingPolicyBasic( + size_t max_tries); + +template +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG rng, + Policy&& p); + +inline +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param); + +} + } // namespace diff --git a/folly/futures/test/RetryingTest.cpp b/folly/futures/test/RetryingTest.cpp new file mode 100644 index 00000000..20602230 --- /dev/null +++ b/folly/futures/test/RetryingTest.cpp @@ -0,0 +1,147 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include + +using namespace std; +using namespace std::chrono; +using namespace folly; + +TEST(RetryingTest, has_op_call) { + using ew = exception_wrapper; + auto policy_raw = [](size_t n, const ew&) { return n < 3; }; + auto policy_fut = [](size_t n, const ew&) { return makeFuture(n < 3); }; + using namespace futures::detail; + EXPECT_TRUE(retrying_policy_traits::is_raw::value); + EXPECT_TRUE(retrying_policy_traits::is_fut::value); +} + +TEST(RetryingTest, basic) { + auto r = futures::retrying( + [](size_t n, const exception_wrapper&) { return n < 3; }, + [](size_t n) { + return n < 2 + ? makeFuture(runtime_error("ha")) + : makeFuture(n); + } + ).wait(); + EXPECT_EQ(2, r.value()); +} + +TEST(RetryingTest, policy_future) { + atomic sleeps {0}; + auto r = futures::retrying( + [&](size_t n, const exception_wrapper&) { + return n < 3 + ? makeFuture(++sleeps).then([] { return true; }) + : makeFuture(false); + }, + [](size_t n) { + return n < 2 + ? makeFuture(runtime_error("ha")) + : makeFuture(n); + } + ).wait(); + EXPECT_EQ(2, r.value()); + EXPECT_EQ(2, sleeps); +} + +TEST(RetryingTest, policy_basic) { + auto r = futures::retrying( + futures::retryingPolicyBasic(3), + [](size_t n) { + return n < 2 + ? makeFuture(runtime_error("ha")) + : makeFuture(n); + } + ).wait(); + EXPECT_EQ(2, r.value()); +} + +TEST(RetryingTest, policy_capped_jittered_exponential_backoff) { + using ms = milliseconds; + auto start = steady_clock::now(); + auto r = futures::retrying( + futures::retryingPolicyCappedJitteredExponentialBackoff( + 3, ms(100), ms(1000), 0.1, mt19937_64(0), + [](size_t, const exception_wrapper&) { return true; }), + [](size_t n) { + return n < 2 + ? makeFuture(runtime_error("ha")) + : makeFuture(n); + } + ).wait(); + auto finish = steady_clock::now(); + auto duration = duration_cast(finish - start); + EXPECT_EQ(2, r.value()); + EXPECT_NEAR( + milliseconds(300).count(), + duration.count(), + milliseconds(25).count()); +} + +TEST(RetryingTest, policy_sleep_defaults) { + // To ensure that this compiles with default params. + using ms = milliseconds; + auto start = steady_clock::now(); + auto r = futures::retrying( + futures::retryingPolicyCappedJitteredExponentialBackoff( + 3, ms(100), ms(1000), 0.1), + [](size_t n) { + return n < 2 + ? makeFuture(runtime_error("ha")) + : makeFuture(n); + } + ).wait(); + auto finish = steady_clock::now(); + auto duration = duration_cast(finish - start); + EXPECT_EQ(2, r.value()); + EXPECT_NEAR( + milliseconds(300).count(), + duration.count(), + milliseconds(100).count()); +} + +/* +TEST(RetryingTest, policy_sleep_cancel) { + mt19937_64 rng(0); + using ms = milliseconds; + auto start = steady_clock::now(); + auto r = futures::retrying( + futures::retryingPolicyCappedJitteredExponentialBackoff( + 5, ms(100), ms(1000), 0.1, rng, + [](size_t n, const exception_wrapper&) { return true; }), + [](size_t n) { + return n < 4 + ? makeFuture(runtime_error("ha")) + : makeFuture(n); + } + ); + r.cancel(); + r.wait(); + auto finish = steady_clock::now(); + auto duration = duration_cast(finish - start); + EXPECT_EQ(2, r.value()); + EXPECT_NEAR( + milliseconds(0).count(), + duration.count(), + milliseconds(10).count()); +} +*/ diff --git a/folly/test/Makefile.am b/folly/test/Makefile.am index c3a94ba7..22ee3245 100644 --- a/folly/test/Makefile.am +++ b/folly/test/Makefile.am @@ -198,6 +198,7 @@ futures_test_SOURCES = \ ../futures/test/PollTest.cpp \ ../futures/test/PromiseTest.cpp \ ../futures/test/ReduceTest.cpp \ + ../futures/test/RetryingTest.cpp \ ../futures/test/SharedPromiseTest.cpp \ ../futures/test/ThenCompileTest.cpp \ ../futures/test/ThenTest.cpp \ -- 2.34.1