From 44ecd1efe287e2a4c7ba4c6618dced8f4919c629 Mon Sep 17 00:00:00 2001 From: Marc Celani Date: Mon, 3 Mar 2014 11:36:05 -0800 Subject: [PATCH] Make it easy to wrap pre-existing cob-style async apis Summary: Tao neesd a way to wrap its c-style async apis with Later. Although my comments suggest that you can do this, it turns out I never got around to implementing it. Well, here is the implementation. Basically, we supply the callback to the pre-existing api, and that callback will fulfill a promise that is used internally within Later. This is thread safe because the async call is not made until the starter is fired, and we can use the future immediately for chaining then() calls. Test Plan: unit test Reviewed By: hannesr@fb.com FB internal diff: D1197721 --- folly/wangle/Later-inl.h | 12 ++++++++++++ folly/wangle/Later.h | 31 ++++++++++++++++++++++++++++++- folly/wangle/test/LaterTest.cpp | 25 +++++++++++++++++++++++++ 3 files changed, 67 insertions(+), 1 deletion(-) diff --git a/folly/wangle/Later-inl.h b/folly/wangle/Later-inl.h index e17f2d7e..e20b6f6f 100644 --- a/folly/wangle/Later-inl.h +++ b/folly/wangle/Later-inl.h @@ -68,6 +68,18 @@ Later::Later(U&& input) { }); } +template +template +Later::Later(std::function&&)>&& fn) { + folly::MoveWrapper> promise; + future_ = promise->getFuture(); + starter_.getFuture().then([=](Try&& t) mutable { + fn([=](U&& output) mutable { + promise->setValue(std::move(output)); + }); + }); +} + template template typename std::enable_if< diff --git a/folly/wangle/Later.h b/folly/wangle/Later.h index 49f719c1..0d2088cf 100644 --- a/folly/wangle/Later.h +++ b/folly/wangle/Later.h @@ -32,7 +32,8 @@ template struct isLater; * threadsafe manner. * * The interface to add additional work is the same as future: a then() method - * that can take either a type T, a Future, or a Later + * that takes a function that can return either a type T, a Future, or a + * Later * * Thread transitions are done by using executors and calling the via() method. * @@ -62,16 +63,44 @@ class Later { public: typedef T value_type; + /* + * This default constructor is used to build an asynchronous workflow that + * takes no input. + */ template ::value>::type, class = typename std::enable_if::value>::type> Later(); + /* + * This constructor is used to build an asynchronous workflow that takes a + * value as input, and that value is passed in. + */ template ::value>::type, class = typename std::enable_if::value>::type> explicit Later(U&& input); + /* + * This constructor is used to wrap a pre-existing cob-style asynchronous api + * so that it can be used in wangle in a threadsafe manner. wangle provides + * the callback to this pre-existing api, and this callback will fulfill a + * promise so as to incorporate this api into the workflow. + * + * Example usage: + * + * // This adds two ints asynchronously. cob is called in another thread. + * void addAsync(int a, int b, std::function&& cob); + * + * Later asyncWrapper([=](std::function&& fn) { + * addAsync(1, 2, std::move(fn)); + * }); + */ + template ::value>::type, + class = typename std::enable_if::value>::type> + explicit Later(std::function&&)>&& fn); + /* * then() adds additional work to the end of the workflow. If the lambda * provided to then() returns a future, that future must be fulfilled in the diff --git a/folly/wangle/test/LaterTest.cpp b/folly/wangle/test/LaterTest.cpp index 37d0a4bf..a8243f5d 100644 --- a/folly/wangle/test/LaterTest.cpp +++ b/folly/wangle/test/LaterTest.cpp @@ -54,6 +54,12 @@ struct LaterFixture : public testing::Test { t.join(); } + void addAsync(int a, int b, std::function&& cob) { + eastExecutor->add([=]() { + cob(a + b); + }); + } + Later later; std::shared_ptr westExecutor; std::shared_ptr eastExecutor; @@ -112,6 +118,25 @@ TEST_F(LaterFixture, thread_hops) { EXPECT_EQ(future.value(), 1); } +TEST_F(LaterFixture, wrapping_preexisting_async_modules) { + auto westThreadId = std::this_thread::get_id(); + std::function&&)> wrapper = + [=](std::function&& fn) { + addAsync(2, 2, std::move(fn)); + }; + auto future = Later(std::move(wrapper)) + .via(westExecutor.get()) + .then([=](Try&& t) { + EXPECT_EQ(std::this_thread::get_id(), westThreadId); + return t.value(); + }) + .launch(); + while (!future.isReady()) { + waiter->makeProgress(); + } + EXPECT_EQ(future.value(), 4); +} + TEST_F(LaterFixture, chain_laters) { auto westThreadId = std::this_thread::get_id(); auto future = later.via(eastExecutor.get()).then([=](Try&& t) { -- 2.34.1