futures::retrying.
authorYedidya Feldblum <yfeldblum@fb.com>
Fri, 31 Jul 2015 03:10:19 +0000 (20:10 -0700)
committerfacebook-github-bot-1 <folly-bot@fb.com>
Fri, 31 Jul 2015 03:22:15 +0000 (20:22 -0700)
Summary: [Folly] futures::retrying.

Reviewed By: @fugalh

Differential Revision: D2201630

folly/futures/Future-inl.h
folly/futures/helpers.h
folly/futures/test/RetryingTest.cpp [new file with mode: 0644]
folly/test/Makefile.am

index d4614fce2e8bc50dd4098d364b57ef7984369250..995d94b7e087151c79d47833e4d1210db5b0f7ac 100644 (file)
 
 #pragma once
 
+#include <algorithm>
 #include <chrono>
+#include <random>
 #include <thread>
 
 #include <folly/experimental/fibers/Baton.h>
 #include <folly/Optional.h>
+#include <folly/Random.h>
+#include <folly/Traits.h>
 #include <folly/futures/detail/Core.h>
 #include <folly/futures/Timekeeper.h>
 
@@ -1093,6 +1097,197 @@ namespace futures {
   }
 }
 
+namespace futures {
+
+namespace detail {
+
+struct retrying_policy_raw_tag {};
+struct retrying_policy_fut_tag {};
+
+template <class Policy>
+struct retrying_policy_traits {
+  using ew = exception_wrapper;
+  FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
+  template <class Ret>
+  using has_op = typename std::integral_constant<bool,
+        has_op_call<Policy, Ret(size_t, const ew&)>::value ||
+        has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
+  using is_raw = has_op<bool>;
+  using is_fut = has_op<Future<bool>>;
+  using tag = typename std::conditional<
+        is_raw::value, retrying_policy_raw_tag, typename std::conditional<
+        is_fut::value, retrying_policy_fut_tag, void>::type>::type;
+};
+
+template <class Policy, class FF>
+typename std::result_of<FF(size_t)>::type
+retrying(size_t k, Policy&& p, FF&& ff) {
+  using F = typename std::result_of<FF(size_t)>::type;
+  using T = typename F::value_type;
+  auto f = ff(k++);
+  auto pm = makeMoveWrapper(p);
+  auto ffm = makeMoveWrapper(ff);
+  return f.onError([=](exception_wrapper x) mutable {
+      auto q = (*pm)(k, x);
+      auto xm = makeMoveWrapper(std::move(x));
+      return q.then([=](bool r) mutable {
+          return r
+            ? retrying(k, pm.move(), ffm.move())
+            : makeFuture<T>(xm.move());
+      });
+  });
+}
+
+template <class Policy, class FF>
+typename std::result_of<FF(size_t)>::type
+retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
+  auto pm = makeMoveWrapper(std::move(p));
+  auto q = [=](size_t k, exception_wrapper x) {
+    return makeFuture<bool>((*pm)(k, x));
+  };
+  return retrying(0, std::move(q), std::forward<FF>(ff));
+}
+
+template <class Policy, class FF>
+typename std::result_of<FF(size_t)>::type
+retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
+  return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
+}
+
+//  jittered exponential backoff, clamped to [backoff_min, backoff_max]
+template <class URNG>
+Duration retryingJitteredExponentialBackoffDur(
+    size_t n,
+    Duration backoff_min,
+    Duration backoff_max,
+    double jitter_param,
+    URNG& rng) {
+  using d = Duration;
+  auto dist = std::normal_distribution<double>(0.0, jitter_param);
+  auto jitter = std::exp(dist(rng));
+  auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
+  return std::max(backoff_min, std::min(backoff_max, backoff));
+}
+
+template <class Policy, class URNG>
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+    size_t max_tries,
+    Duration backoff_min,
+    Duration backoff_max,
+    double jitter_param,
+    URNG rng,
+    Policy&& p) {
+  auto pm = makeMoveWrapper(std::move(p));
+  auto rngp = std::make_shared<URNG>(std::move(rng));
+  return [=](size_t n, const exception_wrapper& ex) mutable {
+    if (n == max_tries) { return makeFuture(false); }
+    return (*pm)(n, ex).then([=](bool v) {
+        if (!v) { return makeFuture(false); }
+        auto backoff = detail::retryingJitteredExponentialBackoffDur(
+            n, backoff_min, backoff_max, jitter_param, *rngp);
+        return futures::sleep(backoff).then([] { return true; });
+    });
+  };
+}
+
+template <class Policy, class URNG>
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+    size_t max_tries,
+    Duration backoff_min,
+    Duration backoff_max,
+    double jitter_param,
+    URNG rng,
+    Policy&& p,
+    retrying_policy_raw_tag) {
+  auto pm = makeMoveWrapper(std::move(p));
+  auto q = [=](size_t n, const exception_wrapper& e) {
+    return makeFuture((*pm)(n, e));
+  };
+  return retryingPolicyCappedJitteredExponentialBackoff(
+      max_tries,
+      backoff_min,
+      backoff_max,
+      jitter_param,
+      std::move(rng),
+      std::move(q));
+}
+
+template <class Policy, class URNG>
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+    size_t max_tries,
+    Duration backoff_min,
+    Duration backoff_max,
+    double jitter_param,
+    URNG rng,
+    Policy&& p,
+    retrying_policy_fut_tag) {
+  return retryingPolicyCappedJitteredExponentialBackoff(
+      max_tries,
+      backoff_min,
+      backoff_max,
+      jitter_param,
+      std::move(rng),
+      std::move(p));
+}
+
+}
+
+template <class Policy, class FF>
+typename std::result_of<FF(size_t)>::type
+retrying(Policy&& p, FF&& ff) {
+  using tag = typename detail::retrying_policy_traits<Policy>::tag;
+  return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
+}
+
+inline
+std::function<bool(size_t, const exception_wrapper&)>
+retryingPolicyBasic(
+    size_t max_tries) {
+  return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
+}
+
+template <class Policy, class URNG>
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+    size_t max_tries,
+    Duration backoff_min,
+    Duration backoff_max,
+    double jitter_param,
+    URNG rng,
+    Policy&& p) {
+  using tag = typename detail::retrying_policy_traits<Policy>::tag;
+  return detail::retryingPolicyCappedJitteredExponentialBackoff(
+      max_tries,
+      backoff_min,
+      backoff_max,
+      jitter_param,
+      std::move(rng),
+      std::move(p),
+      tag());
+}
+
+inline
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+    size_t max_tries,
+    Duration backoff_min,
+    Duration backoff_max,
+    double jitter_param) {
+  auto p = [](size_t, const exception_wrapper&) { return true; };
+  return retryingPolicyCappedJitteredExponentialBackoff(
+      max_tries,
+      backoff_min,
+      backoff_max,
+      jitter_param,
+      ThreadLocalPRNG(),
+      std::move(p));
+}
+
+}
+
 // Instantiate the most common Future types to save compile time
 extern template class Future<Unit>;
 extern template class Future<bool>;
index 3b0ef2995767ba05c325e3f2564a07d1d54a5b15..ea1dc70d8aec4c4225335dcf538e503031c89cce 100644 (file)
@@ -285,4 +285,72 @@ auto unorderedReduce(Collection&& c, T&& initial, F&& func)
       std::forward<F>(func));
 }
 
+namespace futures {
+
+/**
+ *  retrying
+ *
+ *  Given a policy and a future-factory, creates futures according to the
+ *  policy.
+ *
+ *  The policy must be moveable - retrying will move it a lot - and callable of
+ *  either of the two forms:
+ *  - Future<bool>(size_t, exception_wrapper)
+ *  - bool(size_t, exception_wrapper)
+ *  Internally, the latter is transformed into the former in the obvious way.
+ *  The first parameter is the attempt number of the next prospective attempt;
+ *  the second parameter is the most recent exception. The policy returns a
+ *  Future<bool> which, when completed with true, indicates that a retry is
+ *  desired.
+ *
+ *  We provide a few generic policies:
+ *  - Basic
+ *  - CappedJitteredexponentialBackoff
+ *
+ *  Custom policies may use the most recent try number and exception to decide
+ *  whether to retry and optionally to do something interesting like delay
+ *  before the retry. Users may pass inline lambda expressions as policies, or
+ *  may define their own data types meeting the above requirements. Users are
+ *  responsible for managing the lifetimes of anything pointed to or referred to
+ *  from inside the policy.
+ *
+ *  For example, one custom policy may try up to k times, but only if the most
+ *  recent exception is one of a few types or has one of a few error codes
+ *  indicating that the failure was transitory.
+ *
+ *  Cancellation is not supported.
+ */
+template <class Policy, class FF>
+typename std::result_of<FF(size_t)>::type
+retrying(Policy&& p, FF&& ff);
+
+/**
+ *  generic retrying policies
+ */
+
+inline
+std::function<bool(size_t, const exception_wrapper&)>
+retryingPolicyBasic(
+    size_t max_tries);
+
+template <class Policy, class URNG>
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+    size_t max_tries,
+    Duration backoff_min,
+    Duration backoff_max,
+    double jitter_param,
+    URNG rng,
+    Policy&& p);
+
+inline
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+    size_t max_tries,
+    Duration backoff_min,
+    Duration backoff_max,
+    double jitter_param);
+
+}
+
 } // namespace
diff --git a/folly/futures/test/RetryingTest.cpp b/folly/futures/test/RetryingTest.cpp
new file mode 100644 (file)
index 0000000..2060223
--- /dev/null
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <atomic>
+#include <exception>
+#include <folly/futures/Future.h>
+
+#include <gtest/gtest.h>
+
+using namespace std;
+using namespace std::chrono;
+using namespace folly;
+
+TEST(RetryingTest, has_op_call) {
+  using ew = exception_wrapper;
+  auto policy_raw = [](size_t n, const ew&) { return n < 3; };
+  auto policy_fut = [](size_t n, const ew&) { return makeFuture(n < 3); };
+  using namespace futures::detail;
+  EXPECT_TRUE(retrying_policy_traits<decltype(policy_raw)>::is_raw::value);
+  EXPECT_TRUE(retrying_policy_traits<decltype(policy_fut)>::is_fut::value);
+}
+
+TEST(RetryingTest, basic) {
+  auto r = futures::retrying(
+      [](size_t n, const exception_wrapper&) { return n < 3; },
+      [](size_t n) {
+          return n < 2
+            ? makeFuture<size_t>(runtime_error("ha"))
+            : makeFuture(n);
+      }
+  ).wait();
+  EXPECT_EQ(2, r.value());
+}
+
+TEST(RetryingTest, policy_future) {
+  atomic<size_t> sleeps {0};
+  auto r = futures::retrying(
+      [&](size_t n, const exception_wrapper&) {
+          return n < 3
+            ? makeFuture(++sleeps).then([] { return true; })
+            : makeFuture(false);
+      },
+      [](size_t n) {
+          return n < 2
+            ? makeFuture<size_t>(runtime_error("ha"))
+            : makeFuture(n);
+      }
+  ).wait();
+  EXPECT_EQ(2, r.value());
+  EXPECT_EQ(2, sleeps);
+}
+
+TEST(RetryingTest, policy_basic) {
+  auto r = futures::retrying(
+      futures::retryingPolicyBasic(3),
+      [](size_t n) {
+          return n < 2
+            ? makeFuture<size_t>(runtime_error("ha"))
+            : makeFuture(n);
+      }
+  ).wait();
+  EXPECT_EQ(2, r.value());
+}
+
+TEST(RetryingTest, policy_capped_jittered_exponential_backoff) {
+  using ms = milliseconds;
+  auto start = steady_clock::now();
+  auto r = futures::retrying(
+      futures::retryingPolicyCappedJitteredExponentialBackoff(
+        3, ms(100), ms(1000), 0.1, mt19937_64(0),
+        [](size_t, const exception_wrapper&) { return true; }),
+      [](size_t n) {
+          return n < 2
+            ? makeFuture<size_t>(runtime_error("ha"))
+            : makeFuture(n);
+      }
+  ).wait();
+  auto finish = steady_clock::now();
+  auto duration = duration_cast<milliseconds>(finish - start);
+  EXPECT_EQ(2, r.value());
+  EXPECT_NEAR(
+      milliseconds(300).count(),
+      duration.count(),
+      milliseconds(25).count());
+}
+
+TEST(RetryingTest, policy_sleep_defaults) {
+  //  To ensure that this compiles with default params.
+  using ms = milliseconds;
+  auto start = steady_clock::now();
+  auto r = futures::retrying(
+      futures::retryingPolicyCappedJitteredExponentialBackoff(
+        3, ms(100), ms(1000), 0.1),
+      [](size_t n) {
+          return n < 2
+            ? makeFuture<size_t>(runtime_error("ha"))
+            : makeFuture(n);
+      }
+  ).wait();
+  auto finish = steady_clock::now();
+  auto duration = duration_cast<milliseconds>(finish - start);
+  EXPECT_EQ(2, r.value());
+  EXPECT_NEAR(
+      milliseconds(300).count(),
+      duration.count(),
+      milliseconds(100).count());
+}
+
+/*
+TEST(RetryingTest, policy_sleep_cancel) {
+  mt19937_64 rng(0);
+  using ms = milliseconds;
+  auto start = steady_clock::now();
+  auto r = futures::retrying(
+      futures::retryingPolicyCappedJitteredExponentialBackoff(
+        5, ms(100), ms(1000), 0.1, rng,
+        [](size_t n, const exception_wrapper&) { return true; }),
+      [](size_t n) {
+          return n < 4
+            ? makeFuture<size_t>(runtime_error("ha"))
+            : makeFuture(n);
+      }
+  );
+  r.cancel();
+  r.wait();
+  auto finish = steady_clock::now();
+  auto duration = duration_cast<milliseconds>(finish - start);
+  EXPECT_EQ(2, r.value());
+  EXPECT_NEAR(
+      milliseconds(0).count(),
+      duration.count(),
+      milliseconds(10).count());
+}
+*/
index c3a94ba76eb1e27539afaf2311f99fc67dd4c2a3..22ee324528ddaae0e8fb63c6d799e44330940566 100644 (file)
@@ -198,6 +198,7 @@ futures_test_SOURCES = \
     ../futures/test/PollTest.cpp \
     ../futures/test/PromiseTest.cpp \
     ../futures/test/ReduceTest.cpp \
+    ../futures/test/RetryingTest.cpp \
     ../futures/test/SharedPromiseTest.cpp \
     ../futures/test/ThenCompileTest.cpp \
     ../futures/test/ThenTest.cpp \