--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <folly/wangle/Future.h>
+
+namespace folly { namespace wangle {
+
+template <typename ExecutorImpl>
+class FutureExecutor : public ExecutorImpl {
+ public:
+ template <typename... Args>
+ explicit FutureExecutor(Args&&... args)
+ : ExecutorImpl(std::forward<Args>(args)...) {}
+
+ /*
+ * Given a function func that returns a Future<T>, adds that function to the
+ * contained Executor and returns a Future<T> which will be fulfilled with
+ * func's result once it has been executed.
+ *
+ * For example: auto f = futureExecutor.addFuture([](){
+ * return doAsyncWorkAndReturnAFuture();
+ * });
+ */
+ template <typename F>
+ typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
+ typename std::result_of<F()>::type>::type
+ addFuture(F func) {
+ typedef typename std::result_of<F()>::type::value_type T;
+ Promise<T> promise;
+ auto future = promise.getFuture();
+ auto movePromise = folly::makeMoveWrapper(std::move(promise));
+ auto moveFunc = folly::makeMoveWrapper(std::move(func));
+ ExecutorImpl::add([movePromise, moveFunc] () mutable {
+ (*moveFunc)().then([movePromise] (Try<T>&& t) mutable {
+ movePromise->fulfilTry(std::move(t));
+ });
+ });
+ return future;
+ }
+
+ /*
+ * Similar to addFuture above, but takes a func that returns some non-Future
+ * type T.
+ *
+ * For example: auto f = futureExecutor.addFuture([]() {
+ * return 42;
+ * });
+ */
+ template <typename F>
+ typename std::enable_if<!isFuture<typename std::result_of<F()>::type>::value,
+ Future<typename std::result_of<F()>::type>>::type
+ addFuture(F func) {
+ typedef typename std::result_of<F()>::type T;
+ Promise<T> promise;
+ auto future = promise.getFuture();
+ auto movePromise = folly::makeMoveWrapper(std::move(promise));
+ auto moveFunc = folly::makeMoveWrapper(std::move(func));
+ ExecutorImpl::add([movePromise, moveFunc] () mutable {
+ movePromise->fulfil(std::move(*moveFunc));
+ });
+ return future;
+ }
+};
+
+}}
* limitations under the License.
*/
+#include <folly/experimental/wangle/concurrent/FutureExecutor.h>
#include <folly/experimental/wangle/concurrent/ThreadPoolExecutor.h>
#include <folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h>
#include <folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h>
TEST(ThreadPoolExecutorTest, IOExpiration) {
expiration<IOThreadPoolExecutor>();
}
+
+template <typename TPE>
+static void futureExecutor() {
+ FutureExecutor<TPE> fe(2);
+ int c = 0;
+ fe.addFuture([] () { return makeFuture<int>(42); }).then(
+ [&] (Try<int>&& t) {
+ c++;
+ EXPECT_EQ(42, t.value());
+ });
+ fe.addFuture([] () { return 100; }).then(
+ [&] (Try<int>&& t) {
+ c++;
+ EXPECT_EQ(100, t.value());
+ });
+ fe.addFuture([] () { return makeFuture(); }).then(
+ [&] (Try<void>&& t) {
+ c++;
+ EXPECT_NO_THROW(t.value());
+ });
+ fe.addFuture([] () { return; }).then(
+ [&] (Try<void>&& t) {
+ c++;
+ EXPECT_NO_THROW(t.value());
+ });
+ fe.addFuture([] () { throw std::runtime_error("oops"); }).then(
+ [&] (Try<void>&& t) {
+ c++;
+ EXPECT_THROW(t.value(), std::runtime_error);
+ });
+ // Test doing actual async work
+ fe.addFuture([] () {
+ auto p = std::make_shared<Promise<int>>();
+ std::thread t([p](){
+ burnMs(10)();
+ p->setValue(42);
+ });
+ t.detach();
+ return p->getFuture();
+ }).then([&] (Try<int>&& t) {
+ EXPECT_EQ(42, t.value());
+ c++;
+ });
+ burnMs(15)(); // Sleep long enough for the promise to be fulfilled
+ fe.join();
+ EXPECT_EQ(6, c);
+}
+
+TEST(ThreadPoolExecutorTest, CPUFuturePool) {
+ futureExecutor<CPUThreadPoolExecutor>();
+}
+
+TEST(ThreadPoolExecutorTest, IOFuturePool) {
+ futureExecutor<IOThreadPoolExecutor>();
+}