wangle/Executor.h \
wangle/Future-inl.h \
wangle/Future.h \
- wangle/GenericThreadGate.h \
wangle/InlineExecutor.h \
wangle/Later-inl.h \
wangle/Later.h \
wangle/Promise.h \
wangle/QueuedImmediateExecutor.h \
wangle/ScheduledExecutor.h \
- wangle/ThreadGate.h \
wangle/Try-inl.h \
wangle/Try.h \
wangle/WangleException.h \
Version.cpp \
wangle/InlineExecutor.cpp \
wangle/ManualExecutor.cpp \
- wangle/ThreadGate.cpp \
experimental/io/FsUtil.cpp \
experimental/Singleton.cpp \
experimental/TestUtil.cpp \
+++ /dev/null
-/*
- * Copyright 2014 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-#include <folly/wangle/ThreadGate.h>
-#include <folly/wangle/Executor.h>
-#include <type_traits>
-
-namespace folly { namespace wangle {
-
-/// This generic threadgate takes two executors and an optional waiter (if you
-/// need to support waiting). Hint: use executors that inherit from Executor
-/// (in Executor.h), then you just do
-///
-/// GenericThreadGate tg(westExecutor, eastExecutor, waiter);
-template <
- class WestExecutorPtr = Executor*,
- class EastExecutorPtr = Executor*,
- class WaiterPtr = void*>
-class GenericThreadGate : public ThreadGate {
-public:
- /**
- EastExecutor and WestExecutor respond threadsafely to
- `add(std::function<void()>&&)`
-
- Waiter responds to `makeProgress()`. It may block, as long as progress
- will be made on the west front.
- */
- GenericThreadGate(WestExecutorPtr west,
- EastExecutorPtr east,
- WaiterPtr waiter = nullptr) :
- westExecutor(west),
- eastExecutor(east),
- waiter(waiter)
- {}
-
- void addWest(std::function<void()>&& fn) { westExecutor->add(std::move(fn)); }
- void addEast(std::function<void()>&& fn) { eastExecutor->add(std::move(fn)); }
-
- virtual void makeProgress() {
- makeProgress_(std::is_same<WaiterPtr, void*>());
- }
-
- WestExecutorPtr westExecutor;
- EastExecutorPtr eastExecutor;
- WaiterPtr waiter;
-
-private:
- void makeProgress_(std::true_type const&) {
- throw std::logic_error("No waiter.");
- }
-
- void makeProgress_(std::false_type const&) {
- waiter->makeProgress();
- }
-};
-
-}} // executor
later.launch(); // explicit launch
```
-The third and least flexible (but sometimes very useful) method assumes only two threads and that you want to do something in the far thread, then come back to the current thread. `ThreadGate` is an interface for a bidirectional gateway between two threads. It's usually easier to use a Later, but ThreadGate can be more efficient, and if the pattern is used often in your code it can be more convenient.
-```C++
-// Using a ThreadGate (which has two executors xe and xw)
-tg.gate(a).then(b);
-
-// Using via
-makeFuture()
- .via(xe).then(a)
- .via(xw).then(b);
-```
-
## You make me Promises, Promises
If you are wrapping an asynchronous operation, or providing an asynchronous API to users, then you will want to make Promises. Every Future has a corresponding Promise (except Futures that spring into existence already completed, with `makeFuture()`). Promises are simple, you make one, you extract the Future, and you fulfil it with a value or an exception. Example:
+++ /dev/null
-/*
- * Copyright 2014 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <folly/wangle/ThreadGate.h>
-#include <stdexcept>
-
-namespace folly { namespace wangle {
-
-void ThreadGate::makeProgress()
-{
- throw std::logic_error("This ThreadGate doesn't know how to "
- "make progress.");
-}
-
-}} // namespace
+++ /dev/null
-/*
- * Copyright 2014 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-#include <memory>
-#include <folly/wangle/Future.h>
-#include <folly/wangle/Deprecated.h>
-
-namespace folly { namespace wangle {
-
-/**
- The ThreadGate strategy encapsulates a bidirectional gate via two Executors,
- kind of like a stargate for wangle Future chains. Its implementation is
- slightly more efficient then using Future::via in both directions, and if
- the pattern is common it can be more convenient (although the overhead of
- setting up a ThreadGate is less convenient in most situations).
-
- // Using a ThreadGate (which has two executors xe and xw)
- tg.gate(a).then(b);
-
- // Using via
- makeFuture()
- .via(xe).then(a)
- .via(xw).then(b);
-
- If you're not sure whether you want a ThreadGate, you don't. Use via.
-
- There are two actors, the east thread which does the asynchronous operation
- (the server) and the west thread that wants the asynchronous operation done
- (the client).
-
- The client calls gate<T>(fn), which returns a Future<T>. Practically speaking
- the returned Future<T> is the same as the Future<T> returned by fn. But
- there are actually two futures involved - the original Future which will be
- generated by fn (called the east Future), and the Future actually returned
- by gate<T>(fn) (called the west Future).
-
- These two futures are decoupled, and although the fulfilment of the east
- Future eventually causes fulfilment of the west Future, those fulfilments
- happen in their own threads.
-
- In order to make and use a ThreadGate, you need to provide a strategy for
- executing code in the east and west threads. These strategies may be
- different. The only requirement is a threadsafe method
- `void add(function<void()>&&)`.
-
- In order for your ThreadGate to do anything, you need to drive those
- executors somehow. An event loop is a natural fit. A thread pool might be
- made to work. You could use a busy loop to make a very expensive space
- heater. 0MQ would be pleasant.
-
- Another pattern supported by the ThreadGate is the single-thread pattern. In
- this pattern, non-blocking I/O drives the asynchronous operation, and
- futures are fulfilled in an event loop callback. In this scenario,
- ThreadGate is largely superfluous, and the executors would likely just
- execute code immediately and inline (and therefore not need to be driven, or
- threadsafe). But a Waiter strategy that makes progress by driving the event
- loop one iteration would allow for gate-and-wait code which is agnostic to
- the small detail that everything happens in one thread. It would also make
- Future change toward a multithreaded architecture easier, as you need only
- change the components of the ThreadGate which your client code is already
- using.
- */
-// DEPRECATED. Just use Future::via() to accomplish the same thing. If it's
-// not obvious how, feel free to reach out.
-class DEPRECATED ThreadGate {
-public:
- virtual ~ThreadGate() {}
-
- /**
- Returns a Future that will be fulfilled after the Future that will be
- returned by fn() has been fulfilled, with the same value or exception
- (moved).
-
- There's a lot of nuance in that sentence. Let's break it down.
-
- fn kicks off the asynchronous operation (makes the east Promise), and must
- be executed in the east thread because the east thread is where the east
- Promise will be fulfilled. Since gate is being called from the west
- thread, we must gate fn using the east executor. fn is not executed
- immediately, it is queued up and will be executed by the east thread as it
- drives the executor.
-
- We create the west Promise and return its Future.
-
- When the east thread executes its task, fn is called and the resulting
- Future gets a callback that will gate another task back to the west.
-
- Sometime later, the asynchronous operation completes and the east Promise
- is fulfilled. Then the east Future executes its callback, which adds a
- task to the west executor that task is to fulfil the west Promise with the
- same Try<T>, and it will execute in the west thread.
-
- At this point, the west Future is still unfulfilled, even though the east
- Future has been fulfilled and its callback has finished executing. Only
- when the west executor is driven to execute that task, the west Future
- will be completed and its callbacks called.
-
- In summary, both east and west need to have plans to drive their
- executors, or nothing will actually happen. When the executors are driven,
- then everything flows. */
- template <class T>
- Future<T> gate(std::function<Future<T>()>&& fn) {
- Promise<T> pWest;
- Future<T> fWest = pWest.getFuture();
-
- gate(std::move(fn), std::move(pWest));
- return fWest;
- }
-
- /**
- * This version of gate is to support use cases where the calling thread is
- * not the west thread. Here is an example use case.
- *
- * Promise<T> pWest;
- * Future<T> fWest = pWest.getFuture();
- *
- * // Set up callbacks for west from a thread that is not west.
- * fWest.then(...).then(...);
- *
- * threadGate.gate(..., std::move(pWest));
- *
- * This function assumes that it is safe to call addEast from a thread that is
- * not the west thread.
- */
- template <class T>
- void gate(std::function<Future<T>()>&& fn,
- Promise<T>&& p) {
- folly::MoveWrapper<Promise<T>> pWest(std::move(p));
- folly::MoveWrapper<std::function<Future<T>()>> fnm(std::move(fn));
- this->addEast([pWest, fnm, this]() mutable {
- (*fnm)().then([pWest, this](Try<T>&& t) mutable {
- folly::MoveWrapper<Try<T>> tm(std::move(t));
- this->addWest([pWest, tm]() mutable {
- pWest->fulfilTry(std::move(*tm));
- });
- });
- });
- }
-
- /**
- If your workflow calls for synchronizing with a
- west Future, then you may call waitFor, but if your west thread is
- event-driven you will probably not need to call waitFor.
-
- In order for waitFor to behave properly, you must ensure that the Waiter's
- makeProgress method causes some progress to be made on the west thread,
- i.e. drives the west executor either directly or indirectly.
-
- (Naturally, progress needs to be made on the east thread as well. i.e. the
- east executor is driven, the asynchronous operation happens, and its
- Promise is fulfilled. It is likely that none of this concerns the consumer
- of waitFor.)
-
- This is the only function that uses the Waiter. It is never called
- internally. Therefore, if you never use waitFor you can safely provide a
- DummyWaiter.
- */
- template <class T>
- void waitFor(Future<T> const& f) {
- while (!f.isReady()) {
- this->makeProgress();
- }
- }
-
- template <class T>
- typename std::add_lvalue_reference<T>::type
- value(Future<T>& f) {
- waitFor<T>(f);
- return f.value();
- }
-
- template <class T>
- typename std::add_lvalue_reference<const T>::type
- value(Future<T> const& f) {
- waitFor<T>(f);
- return f.value();
- }
-
- virtual void addEast(std::function<void()>&&) = 0;
- virtual void addWest(std::function<void()>&&) = 0;
- virtual void makeProgress();
-};
-
-}} // namespace
+++ /dev/null
-/*
- * Copyright 2014 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <gtest/gtest.h>
-#include <thread>
-#include <future>
-
-#include <folly/wangle/Executor.h>
-#include <folly/wangle/ManualExecutor.h>
-#include <folly/wangle/ThreadGate.h>
-#include <folly/wangle/GenericThreadGate.h>
-
-using namespace folly::wangle;
-using std::make_shared;
-using std::shared_ptr;
-using std::thread;
-using std::vector;
-
-struct ManualWaiter {
- explicit ManualWaiter(shared_ptr<ManualExecutor> ex) : ex(ex) {}
-
- void makeProgress() {
- ex->wait();
- ex->run();
- }
-
- shared_ptr<ManualExecutor> ex;
-};
-
-struct GenericThreadGateFixture : public testing::Test {
- GenericThreadGateFixture() :
- westExecutor(new ManualExecutor),
- eastExecutor(new ManualExecutor),
- waiter(new ManualWaiter(westExecutor)),
- tg(westExecutor, eastExecutor, waiter),
- done(false)
- {
- t = thread([=] {
- ManualWaiter eastWaiter(eastExecutor);
- while (!done)
- eastWaiter.makeProgress();
- });
- }
-
- ~GenericThreadGateFixture() {
- done = true;
- tg.gate<void>([] { return makeFuture(); });
- t.join();
- }
-
- shared_ptr<ManualExecutor> westExecutor;
- shared_ptr<ManualExecutor> eastExecutor;
- shared_ptr<ManualWaiter> waiter;
- GenericThreadGate<
- shared_ptr<ManualExecutor>,
- shared_ptr<ManualExecutor>,
- shared_ptr<ManualWaiter>> tg;
- bool done;
- thread t;
-};
-
-TEST_F(GenericThreadGateFixture, gate_and_wait) {
- auto f = tg.gate<void>([] { return makeFuture(); });
- EXPECT_FALSE(f.isReady());
-
- tg.waitFor(f);
- EXPECT_TRUE(f.isReady());
-}
-
-TEST_F(GenericThreadGateFixture, gate_many) {
- vector<Future<void>> fs;
- int n = 10;
-
- for (int i = 0; i < n; i++)
- fs.push_back(tg.gate<void>([&] { return makeFuture(); }));
-
- for (auto& f : fs)
- EXPECT_FALSE(f.isReady());
-
- auto all = whenAll(fs.begin(), fs.end());
- tg.waitFor(all);
-}
-
-TEST_F(GenericThreadGateFixture, gate_alternating) {
- vector<Promise<void>> ps(10);
- vector<Future<void>> fs;
- size_t count = 0;
-
- for (auto& p : ps) {
- auto* pp = &p;
- auto f = tg.gate<void>([=] { return pp->getFuture(); });
-
- // abuse the thread gate to do our dirty work in the other thread
- tg.gate<void>([=] { pp->setValue(); return makeFuture(); });
-
- fs.push_back(f.then([&](Try<void>&&) { count++; }));
- }
-
- for (auto& f : fs)
- EXPECT_FALSE(f.isReady());
- EXPECT_EQ(0, count);
-
- auto all = whenAll(fs.begin(), fs.end());
- tg.waitFor(all);
-
- EXPECT_EQ(ps.size(), count);
-}
-
-TEST(GenericThreadGate, noWaiter) {
- auto west = make_shared<ManualExecutor>();
- auto east = make_shared<ManualExecutor>();
- Promise<void> p;
- auto dummyFuture = p.getFuture();
-
- GenericThreadGate<ManualExecutor*, ManualExecutor*>
- tg(west.get(), east.get());
-
- EXPECT_THROW(tg.waitFor(dummyFuture), std::logic_error);
-}
-
-TEST_F(GenericThreadGateFixture, gate_with_promise) {
- Promise<int> p;
-
- auto westId = std::this_thread::get_id();
- bool westThenCalled = false;
- auto f = p.getFuture().then(
- [westId, &westThenCalled](Try<int>&& t) {
- EXPECT_EQ(t.value(), 1);
- EXPECT_EQ(std::this_thread::get_id(), westId);
- westThenCalled = true;
- return t.value();
- });
-
- bool eastPromiseMade = false;
- auto thread = std::thread([&p, &eastPromiseMade, this]() {
- EXPECT_NE(t.get_id(), std::this_thread::get_id());
- // South thread != west thread. p gets set in west thread.
- tg.gate<int>([&p, &eastPromiseMade, this] {
- EXPECT_EQ(t.get_id(), std::this_thread::get_id());
- Promise<int> eastPromise;
- auto eastFuture = eastPromise.getFuture();
- eastPromise.setValue(1);
- eastPromiseMade = true;
- return eastFuture;
- },
- std::move(p));
- });
-
- tg.waitFor(f);
- EXPECT_TRUE(westThenCalled);
- EXPECT_TRUE(eastPromiseMade);
- EXPECT_EQ(f.value(), 1);
- thread.join();
-}