futures/detail/Core.h \
futures/detail/FSM.h \
futures/detail/Types.h \
+ futures/test/TestExecutor.h \
gen/Base.h \
gen/Base-inl.h \
gen/Combine.h \
futures/ManualExecutor.cpp \
futures/QueuedImmediateExecutor.cpp \
futures/ThreadWheelTimekeeper.cpp \
+ futures/test/TestExecutor.cpp \
detail/Futex.cpp \
detail/StaticSingletonManager.cpp \
detail/ThreadLocalDetail.cpp \
is_fut::value, retrying_policy_fut_tag, void>::type>::type;
};
+template <class Policy, class FF, class Prom>
+void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) {
+ using F = typename std::result_of<FF(size_t)>::type;
+ using T = typename F::value_type;
+ auto f = ff(k++);
+ f.then([
+ k,
+ prom = std::move(prom),
+ pm = std::forward<Policy>(p),
+ ffm = std::forward<FF>(ff)
+ ](Try<T> && t) mutable {
+ if (t.hasValue()) {
+ prom.setValue(std::move(t).value());
+ return;
+ }
+ auto& x = t.exception();
+ auto q = pm(k, x);
+ q.then([
+ k,
+ prom = std::move(prom),
+ xm = std::move(x),
+ pm = std::move(pm),
+ ffm = std::move(ffm)
+ ](bool shouldRetry) mutable {
+ if (shouldRetry) {
+ retryingImpl(k, std::move(pm), std::move(ffm), std::move(prom));
+ } else {
+ prom.setException(std::move(xm));
+ };
+ });
+ });
+}
+
template <class Policy, class FF>
typename std::result_of<FF(size_t)>::type
retrying(size_t k, Policy&& p, FF&& ff) {
using F = typename std::result_of<FF(size_t)>::type;
using T = typename F::value_type;
- auto f = ff(k++);
- return f.onError(
- [ k, pm = std::forward<Policy>(p), ffm = std::forward<FF>(ff) ](
- exception_wrapper x) mutable {
- auto q = pm(k, x);
- return q.then(
- [ k, xm = std::move(x), pm = std::move(pm), ffm = std::move(ffm) ](
- bool r) mutable {
- return r ? retrying(k, std::move(pm), std::move(ffm))
- : makeFuture<T>(std::move(xm));
- });
- });
+ auto prom = Promise<T>();
+ auto f = prom.getFuture();
+ retryingImpl(
+ k, std::forward<Policy>(p), std::forward<FF>(ff), std::move(prom));
+ return f;
}
template <class Policy, class FF>
* indicating that the failure was transitory.
*
* Cancellation is not supported.
+ *
+ * If both FF and Policy inline executes, then it is possible to hit a stack
+ * overflow due to the recursive nature of the retry implementation
*/
template <class Policy, class FF>
typename std::result_of<FF(size_t)>::type
#include <folly/futures/Future.h>
#include <folly/portability/GTest.h>
+#include <folly/portability/SysResource.h>
+#include "TestExecutor.h"
using namespace std;
using namespace std::chrono;
});
}
+TEST(RetryingTest, large_retries) {
+ rlimit oldMemLimit;
+ PCHECK(getrlimit(RLIMIT_AS, &oldMemLimit) == 0);
+
+ rlimit newMemLimit;
+ newMemLimit.rlim_cur = std::min(1UL << 30, oldMemLimit.rlim_max);
+ newMemLimit.rlim_max = oldMemLimit.rlim_max;
+ PCHECK(setrlimit(RLIMIT_AS, &newMemLimit) == 0);
+ SCOPE_EXIT {
+ PCHECK(setrlimit(RLIMIT_AS, &oldMemLimit) == 0);
+ };
+
+ TestExecutor executor;
+ // size of implicit promise is at least the size of the return.
+ using LargeReturn = array<uint64_t, 16000>;
+ auto func = [&executor](size_t retryNum) -> Future<LargeReturn> {
+ return via(&executor).then([retryNum] {
+ return retryNum < 10000
+ ? makeFuture<LargeReturn>(
+ make_exception_wrapper<std::runtime_error>("keep trying"))
+ : makeFuture<LargeReturn>(LargeReturn());
+ });
+ };
+
+ vector<Future<LargeReturn>> futures;
+ for (auto idx = 0; idx < 40; ++idx) {
+ futures.emplace_back(futures::retrying(
+ [&executor](size_t, const exception_wrapper&) {
+ return via(&executor).then([] { return true; });
+ },
+ func));
+ }
+
+ for (auto& f : futures) {
+ f.wait();
+ EXPECT_TRUE(f.hasValue());
+ }
+}
+
/*
TEST(RetryingTest, policy_sleep_cancel) {
multiAttemptExpectDurationWithin(5, milliseconds(0), milliseconds(10), []{
--- /dev/null
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TestExecutor.h"
+
+using namespace std;
+
+namespace folly {
+
+TestExecutor::TestExecutor() {
+ const auto kWorkers = std::max(1U, thread::hardware_concurrency());
+ for (auto idx = 0U; idx < kWorkers; ++idx) {
+ workers_.emplace_back([this] {
+ while (true) {
+ Func work;
+ {
+ unique_lock<mutex> lk(m_);
+ cv_.wait(lk, [this] { return !workItems_.empty(); });
+ work = std::move(workItems_.front());
+ workItems_.pop();
+ }
+ if (!work) {
+ break;
+ }
+ work();
+ }
+ });
+ }
+}
+
+TestExecutor::~TestExecutor() {
+ for (auto& worker : workers_) {
+ addImpl({});
+ }
+
+ for (auto& worker : workers_) {
+ worker.join();
+ }
+}
+
+void TestExecutor::add(Func f) {
+ if (f) {
+ addImpl(std::move(f));
+ }
+}
+
+uint32_t TestExecutor::numThreads() const {
+ return workers_.size();
+}
+
+void TestExecutor::addImpl(Func f) {
+ {
+ lock_guard<mutex> g(m_);
+ workItems_.push(std::move(f));
+ }
+ cv_.notify_one();
+}
+
+} // folly
--- /dev/null
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <condition_variable>
+#include <queue>
+#include <thread>
+
+#include <folly/Executor.h>
+
+namespace folly {
+
+/**
+ * A simple multithreaded executor for use in tests etc
+ */
+class TestExecutor : public Executor {
+ public:
+ TestExecutor();
+
+ ~TestExecutor() override;
+
+ void add(Func f) override;
+
+ uint32_t numThreads() const;
+
+ private:
+ void addImpl(Func f);
+
+ std::mutex m_;
+ std::queue<Func> workItems_;
+ std::condition_variable cv_;
+
+ std::vector<std::thread> workers_;
+};
+
+} // folly
--- /dev/null
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/portability/GTest.h>
+#include "TestExecutor.h"
+
+using namespace std;
+using namespace std::chrono;
+using namespace folly;
+
+TEST(TestExecutor, parallel_run) {
+ mutex m;
+ set<thread::id> ids;
+ auto executor = std::make_unique<TestExecutor>();
+ const auto numThreads = executor->numThreads();
+ for (auto idx = 0U; idx < numThreads * 10; ++idx) {
+ executor->add([&m, &ids]() mutable {
+ /* sleep override */ this_thread::sleep_for(milliseconds(100));
+ lock_guard<mutex> lg(m);
+ ids.insert(this_thread::get_id());
+ });
+ }
+
+ executor = nullptr;
+ EXPECT_EQ(ids.size(), numThreads);
+}
../futures/test/RetryingTest.cpp \
../futures/test/SelfDestructTest.cpp \
../futures/test/SharedPromiseTest.cpp \
+ ../futures/test/TestExecutorTest.cpp \
../futures/test/ThenCompileTest.cpp \
../futures/test/ThenTest.cpp \
../futures/test/TimekeeperTest.cpp \