#pragma once
+#include <algorithm>
#include <chrono>
+#include <random>
#include <thread>
#include <folly/experimental/fibers/Baton.h>
#include <folly/Optional.h>
+#include <folly/Random.h>
+#include <folly/Traits.h>
#include <folly/futures/detail/Core.h>
#include <folly/futures/Timekeeper.h>
}
}
+namespace futures {
+
+namespace detail {
+
+struct retrying_policy_raw_tag {};
+struct retrying_policy_fut_tag {};
+
+template <class Policy>
+struct retrying_policy_traits {
+ using ew = exception_wrapper;
+ FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
+ template <class Ret>
+ using has_op = typename std::integral_constant<bool,
+ has_op_call<Policy, Ret(size_t, const ew&)>::value ||
+ has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
+ using is_raw = has_op<bool>;
+ using is_fut = has_op<Future<bool>>;
+ using tag = typename std::conditional<
+ is_raw::value, retrying_policy_raw_tag, typename std::conditional<
+ is_fut::value, retrying_policy_fut_tag, void>::type>::type;
+};
+
+template <class Policy, class FF>
+typename std::result_of<FF(size_t)>::type
+retrying(size_t k, Policy&& p, FF&& ff) {
+ using F = typename std::result_of<FF(size_t)>::type;
+ using T = typename F::value_type;
+ auto f = ff(k++);
+ auto pm = makeMoveWrapper(p);
+ auto ffm = makeMoveWrapper(ff);
+ return f.onError([=](exception_wrapper x) mutable {
+ auto q = (*pm)(k, x);
+ auto xm = makeMoveWrapper(std::move(x));
+ return q.then([=](bool r) mutable {
+ return r
+ ? retrying(k, pm.move(), ffm.move())
+ : makeFuture<T>(xm.move());
+ });
+ });
+}
+
+template <class Policy, class FF>
+typename std::result_of<FF(size_t)>::type
+retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
+ auto pm = makeMoveWrapper(std::move(p));
+ auto q = [=](size_t k, exception_wrapper x) {
+ return makeFuture<bool>((*pm)(k, x));
+ };
+ return retrying(0, std::move(q), std::forward<FF>(ff));
+}
+
+template <class Policy, class FF>
+typename std::result_of<FF(size_t)>::type
+retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
+ return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
+}
+
+// jittered exponential backoff, clamped to [backoff_min, backoff_max]
+template <class URNG>
+Duration retryingJitteredExponentialBackoffDur(
+ size_t n,
+ Duration backoff_min,
+ Duration backoff_max,
+ double jitter_param,
+ URNG& rng) {
+ using d = Duration;
+ auto dist = std::normal_distribution<double>(0.0, jitter_param);
+ auto jitter = std::exp(dist(rng));
+ auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
+ return std::max(backoff_min, std::min(backoff_max, backoff));
+}
+
+template <class Policy, class URNG>
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+ size_t max_tries,
+ Duration backoff_min,
+ Duration backoff_max,
+ double jitter_param,
+ URNG rng,
+ Policy&& p) {
+ auto pm = makeMoveWrapper(std::move(p));
+ auto rngp = std::make_shared<URNG>(std::move(rng));
+ return [=](size_t n, const exception_wrapper& ex) mutable {
+ if (n == max_tries) { return makeFuture(false); }
+ return (*pm)(n, ex).then([=](bool v) {
+ if (!v) { return makeFuture(false); }
+ auto backoff = detail::retryingJitteredExponentialBackoffDur(
+ n, backoff_min, backoff_max, jitter_param, *rngp);
+ return futures::sleep(backoff).then([] { return true; });
+ });
+ };
+}
+
+template <class Policy, class URNG>
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+ size_t max_tries,
+ Duration backoff_min,
+ Duration backoff_max,
+ double jitter_param,
+ URNG rng,
+ Policy&& p,
+ retrying_policy_raw_tag) {
+ auto pm = makeMoveWrapper(std::move(p));
+ auto q = [=](size_t n, const exception_wrapper& e) {
+ return makeFuture((*pm)(n, e));
+ };
+ return retryingPolicyCappedJitteredExponentialBackoff(
+ max_tries,
+ backoff_min,
+ backoff_max,
+ jitter_param,
+ std::move(rng),
+ std::move(q));
+}
+
+template <class Policy, class URNG>
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+ size_t max_tries,
+ Duration backoff_min,
+ Duration backoff_max,
+ double jitter_param,
+ URNG rng,
+ Policy&& p,
+ retrying_policy_fut_tag) {
+ return retryingPolicyCappedJitteredExponentialBackoff(
+ max_tries,
+ backoff_min,
+ backoff_max,
+ jitter_param,
+ std::move(rng),
+ std::move(p));
+}
+
+}
+
+template <class Policy, class FF>
+typename std::result_of<FF(size_t)>::type
+retrying(Policy&& p, FF&& ff) {
+ using tag = typename detail::retrying_policy_traits<Policy>::tag;
+ return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
+}
+
+inline
+std::function<bool(size_t, const exception_wrapper&)>
+retryingPolicyBasic(
+ size_t max_tries) {
+ return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
+}
+
+template <class Policy, class URNG>
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+ size_t max_tries,
+ Duration backoff_min,
+ Duration backoff_max,
+ double jitter_param,
+ URNG rng,
+ Policy&& p) {
+ using tag = typename detail::retrying_policy_traits<Policy>::tag;
+ return detail::retryingPolicyCappedJitteredExponentialBackoff(
+ max_tries,
+ backoff_min,
+ backoff_max,
+ jitter_param,
+ std::move(rng),
+ std::move(p),
+ tag());
+}
+
+inline
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+ size_t max_tries,
+ Duration backoff_min,
+ Duration backoff_max,
+ double jitter_param) {
+ auto p = [](size_t, const exception_wrapper&) { return true; };
+ return retryingPolicyCappedJitteredExponentialBackoff(
+ max_tries,
+ backoff_min,
+ backoff_max,
+ jitter_param,
+ ThreadLocalPRNG(),
+ std::move(p));
+}
+
+}
+
// Instantiate the most common Future types to save compile time
extern template class Future<Unit>;
extern template class Future<bool>;
std::forward<F>(func));
}
+namespace futures {
+
+/**
+ * retrying
+ *
+ * Given a policy and a future-factory, creates futures according to the
+ * policy.
+ *
+ * The policy must be moveable - retrying will move it a lot - and callable of
+ * either of the two forms:
+ * - Future<bool>(size_t, exception_wrapper)
+ * - bool(size_t, exception_wrapper)
+ * Internally, the latter is transformed into the former in the obvious way.
+ * The first parameter is the attempt number of the next prospective attempt;
+ * the second parameter is the most recent exception. The policy returns a
+ * Future<bool> which, when completed with true, indicates that a retry is
+ * desired.
+ *
+ * We provide a few generic policies:
+ * - Basic
+ * - CappedJitteredexponentialBackoff
+ *
+ * Custom policies may use the most recent try number and exception to decide
+ * whether to retry and optionally to do something interesting like delay
+ * before the retry. Users may pass inline lambda expressions as policies, or
+ * may define their own data types meeting the above requirements. Users are
+ * responsible for managing the lifetimes of anything pointed to or referred to
+ * from inside the policy.
+ *
+ * For example, one custom policy may try up to k times, but only if the most
+ * recent exception is one of a few types or has one of a few error codes
+ * indicating that the failure was transitory.
+ *
+ * Cancellation is not supported.
+ */
+template <class Policy, class FF>
+typename std::result_of<FF(size_t)>::type
+retrying(Policy&& p, FF&& ff);
+
+/**
+ * generic retrying policies
+ */
+
+inline
+std::function<bool(size_t, const exception_wrapper&)>
+retryingPolicyBasic(
+ size_t max_tries);
+
+template <class Policy, class URNG>
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+ size_t max_tries,
+ Duration backoff_min,
+ Duration backoff_max,
+ double jitter_param,
+ URNG rng,
+ Policy&& p);
+
+inline
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+ size_t max_tries,
+ Duration backoff_min,
+ Duration backoff_max,
+ double jitter_param);
+
+}
+
} // namespace
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <atomic>
+#include <exception>
+#include <folly/futures/Future.h>
+
+#include <gtest/gtest.h>
+
+using namespace std;
+using namespace std::chrono;
+using namespace folly;
+
+TEST(RetryingTest, has_op_call) {
+ using ew = exception_wrapper;
+ auto policy_raw = [](size_t n, const ew&) { return n < 3; };
+ auto policy_fut = [](size_t n, const ew&) { return makeFuture(n < 3); };
+ using namespace futures::detail;
+ EXPECT_TRUE(retrying_policy_traits<decltype(policy_raw)>::is_raw::value);
+ EXPECT_TRUE(retrying_policy_traits<decltype(policy_fut)>::is_fut::value);
+}
+
+TEST(RetryingTest, basic) {
+ auto r = futures::retrying(
+ [](size_t n, const exception_wrapper&) { return n < 3; },
+ [](size_t n) {
+ return n < 2
+ ? makeFuture<size_t>(runtime_error("ha"))
+ : makeFuture(n);
+ }
+ ).wait();
+ EXPECT_EQ(2, r.value());
+}
+
+TEST(RetryingTest, policy_future) {
+ atomic<size_t> sleeps {0};
+ auto r = futures::retrying(
+ [&](size_t n, const exception_wrapper&) {
+ return n < 3
+ ? makeFuture(++sleeps).then([] { return true; })
+ : makeFuture(false);
+ },
+ [](size_t n) {
+ return n < 2
+ ? makeFuture<size_t>(runtime_error("ha"))
+ : makeFuture(n);
+ }
+ ).wait();
+ EXPECT_EQ(2, r.value());
+ EXPECT_EQ(2, sleeps);
+}
+
+TEST(RetryingTest, policy_basic) {
+ auto r = futures::retrying(
+ futures::retryingPolicyBasic(3),
+ [](size_t n) {
+ return n < 2
+ ? makeFuture<size_t>(runtime_error("ha"))
+ : makeFuture(n);
+ }
+ ).wait();
+ EXPECT_EQ(2, r.value());
+}
+
+TEST(RetryingTest, policy_capped_jittered_exponential_backoff) {
+ using ms = milliseconds;
+ auto start = steady_clock::now();
+ auto r = futures::retrying(
+ futures::retryingPolicyCappedJitteredExponentialBackoff(
+ 3, ms(100), ms(1000), 0.1, mt19937_64(0),
+ [](size_t, const exception_wrapper&) { return true; }),
+ [](size_t n) {
+ return n < 2
+ ? makeFuture<size_t>(runtime_error("ha"))
+ : makeFuture(n);
+ }
+ ).wait();
+ auto finish = steady_clock::now();
+ auto duration = duration_cast<milliseconds>(finish - start);
+ EXPECT_EQ(2, r.value());
+ EXPECT_NEAR(
+ milliseconds(300).count(),
+ duration.count(),
+ milliseconds(25).count());
+}
+
+TEST(RetryingTest, policy_sleep_defaults) {
+ // To ensure that this compiles with default params.
+ using ms = milliseconds;
+ auto start = steady_clock::now();
+ auto r = futures::retrying(
+ futures::retryingPolicyCappedJitteredExponentialBackoff(
+ 3, ms(100), ms(1000), 0.1),
+ [](size_t n) {
+ return n < 2
+ ? makeFuture<size_t>(runtime_error("ha"))
+ : makeFuture(n);
+ }
+ ).wait();
+ auto finish = steady_clock::now();
+ auto duration = duration_cast<milliseconds>(finish - start);
+ EXPECT_EQ(2, r.value());
+ EXPECT_NEAR(
+ milliseconds(300).count(),
+ duration.count(),
+ milliseconds(100).count());
+}
+
+/*
+TEST(RetryingTest, policy_sleep_cancel) {
+ mt19937_64 rng(0);
+ using ms = milliseconds;
+ auto start = steady_clock::now();
+ auto r = futures::retrying(
+ futures::retryingPolicyCappedJitteredExponentialBackoff(
+ 5, ms(100), ms(1000), 0.1, rng,
+ [](size_t n, const exception_wrapper&) { return true; }),
+ [](size_t n) {
+ return n < 4
+ ? makeFuture<size_t>(runtime_error("ha"))
+ : makeFuture(n);
+ }
+ );
+ r.cancel();
+ r.wait();
+ auto finish = steady_clock::now();
+ auto duration = duration_cast<milliseconds>(finish - start);
+ EXPECT_EQ(2, r.value());
+ EXPECT_NEAR(
+ milliseconds(0).count(),
+ duration.count(),
+ milliseconds(10).count());
+}
+*/
../futures/test/PollTest.cpp \
../futures/test/PromiseTest.cpp \
../futures/test/ReduceTest.cpp \
+ ../futures/test/RetryingTest.cpp \
../futures/test/SharedPromiseTest.cpp \
../futures/test/ThenCompileTest.cpp \
../futures/test/ThenTest.cpp \