Summary:
Introduce ProgressableExecutor, which is an Executor that can be driven somehow. Examples include EventBase and ManualExecutor
Then introduce Future<T>::getVia(ProgressableExecutor*) and Future<T>::waitVia(ProgressableExecutor*) that drive the given executor until the Future is complete, with the usual semantics of get and wait respectively
This is a really common pattern in tests and you can see in the various changes to other projects lends sopme nice redness and cleanliness
Some notes:
1. I don't like ProgressableExecutor::makeProgress() that much. Too verbose. Maybe DrivableExecutor::drive()? Something else? Thoughts?
2. I still need to integrate this with some stuff in Zeus (SessionFuture) and Zookeeper (ZookeeperFuture) but I'm going to do that in a separate diff because it's going to be a little more intrusive
3. These APIs take a raw ptr so that they are consistent with via()
4. See inline note on ManualExecutor
5. See inline note in added unit tests
Test Plan: add unit for new API, wait for contbuild
Reviewed By: hans@fb.com
Subscribers: trunkagent, dresende, pzq, tdimson, fbcode-common-diffs@, targeting-diff-backend@, alandau, apollo-diffs@, bmatheny, everstore-dev@, zhuohuang, laser-diffs@, mshneer, folly-diffs@, hannesr, jsedgwick
FB internal diff:
D1789122
Tasks:
5940008
Signature: t1:
1789122:
1421868315:
6ea2fc2702be1dc283c24a46d345fb5da3935b32
Format.h \
Format-inl.h \
futures/Deprecated.h \
+ futures/DrivableExecutor.h \
futures/Future-inl.h \
futures/Future.h \
futures/FutureException.h \
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/Executor.h>
+
+namespace folly {
+
+/*
+ * A DrivableExecutor can be driven via its drive() method
+ * Examples include EventBase (via loopOnce()) and ManualExecutor
+ * (via makeProgress()).
+ *
+ * This interface is most handy in conjunction with
+ * Future<T>::getVia(DrivableExecutor*) and
+ * Future<T>::waitVia(DrivableExecutor*)
+ *
+ * These call drive() * repeatedly until the Future is fulfilled.
+ * getVia() returns the value (or throws the exception) and waitVia() returns
+ * the same Future for chainability.
+ *
+ * These will be most helpful in tests, for instance if you need to pump a mock
+ * EventBase until Futures complete.
+ */
+class DrivableExecutor : public virtual Executor {
+ public:
+ virtual ~DrivableExecutor() = default;
+
+ // Make progress on this Executor's work.
+ virtual void drive() = 0;
+};
+
+} // folly
getWaitTimeoutHelper(this, dur).value();
}
+template <class T>
+T Future<T>::getVia(DrivableExecutor* e) {
+ while (!isReady()) {
+ e->drive();
+ }
+ return std::move(value());
+}
+
+template <>
+inline void Future<void>::getVia(DrivableExecutor* e) {
+ while (!isReady()) {
+ e->drive();
+ }
+ value();
+}
+
template <class T>
Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
return within(dur, TimedOut(), tk);
return done;
}
+template <class T>
+Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
+ while (!isReady()) {
+ e->drive();
+ }
+ return *this;
+}
+
+template <class T>
+Future<T> Future<T>::waitVia(DrivableExecutor* e) && {
+ while (!isReady()) {
+ e->drive();
+ }
+ return std::move(*this);
+}
+
}
// I haven't included a Future<T&> specialization because I don't forsee us
#include <folly/MoveWrapper.h>
#include <folly/futures/Deprecated.h>
+#include <folly/futures/DrivableExecutor.h>
#include <folly/futures/Promise.h>
#include <folly/futures/Try.h>
#include <folly/futures/FutureException.h>
typedef typename ArgType<Args...>::FirstArg FirstArg;
};
-
} // detail
struct Timekeeper;
/// exception).
T get(Duration dur);
+ /// Call e->drive() repeatedly until the future is fulfilled. Examples
+ /// of DrivableExecutor include EventBase and ManualExecutor. Returns the
+ /// value (moved out), or throws the exception.
+ T getVia(DrivableExecutor* e);
+
/** When this Future has completed, execute func which is a function that
takes a Try<T>&&. A Future for the return type of func is
returned. e.g.
/// depending on whether the Duration passed.
Future<T> wait(Duration);
+ /// Call e->drive() repeatedly until the future is fulfilled. Examples
+ /// of DrivableExecutor include EventBase and ManualExecutor. Returns a
+ /// reference to this Future so that you can chain calls if desired.
+ /// value (moved out), or throws the exception.
+ Future<T>& waitVia(DrivableExecutor* e) &;
+
+ /// Overload of waitVia() for rvalue Futures
+ Future<T> waitVia(DrivableExecutor* e) &&;
+
private:
typedef detail::Core<T>* corePtr;
*/
#pragma once
+#include <folly/futures/DrivableExecutor.h>
#include <folly/futures/ScheduledExecutor.h>
#include <semaphore.h>
#include <memory>
///
/// NB No attempt has been made to make anything other than add and schedule
/// threadsafe.
- class ManualExecutor : public ScheduledExecutor {
+ class ManualExecutor : public DrivableExecutor,
+ public ScheduledExecutor {
public:
ManualExecutor();
run();
}
+ /// Implements DrivableExecutor
+ void drive() override {
+ makeProgress();
+ }
+
/// makeProgress until this Future is ready.
template <class F> void waitFor(F const& f) {
// TODO(5427828)
namespace folly {
// An executor that supports timed scheduling. Like RxScheduler.
- class ScheduledExecutor : public Executor {
+ class ScheduledExecutor : public virtual Executor {
public:
// Reality is that better than millisecond resolution is very hard to
// achieve. However, we reserve the right to be incredible.
#include <folly/Executor.h>
#include <folly/futures/Future.h>
#include <folly/futures/ManualExecutor.h>
+#include <folly/futures/DrivableExecutor.h>
#include <folly/MPMCQueue.h>
#include <folly/io/async/Request.h>
}
}
+class DummyDrivableExecutor : public DrivableExecutor {
+ public:
+ void add(Func f) override {}
+ void drive() override { ran = true; }
+ bool ran{false};
+};
+
+TEST(Future, getVia) {
+ {
+ // non-void
+ ManualExecutor x;
+ auto f = via(&x).then([]{ return true; });
+ EXPECT_TRUE(f.getVia(&x));
+ }
+
+ {
+ // void
+ ManualExecutor x;
+ auto f = via(&x).then();
+ f.getVia(&x);
+ }
+
+ {
+ DummyDrivableExecutor x;
+ auto f = makeFuture(true);
+ EXPECT_TRUE(f.getVia(&x));
+ EXPECT_FALSE(x.ran);
+ }
+}
+
+TEST(Future, waitVia) {
+ {
+ ManualExecutor x;
+ auto f = via(&x).then();
+ EXPECT_FALSE(f.isReady());
+ f.waitVia(&x);
+ EXPECT_TRUE(f.isReady());
+ }
+
+ {
+ // try rvalue as well
+ ManualExecutor x;
+ auto f = via(&x).activate().then().waitVia(&x);
+ EXPECT_TRUE(f.isReady());
+ }
+
+ {
+ DummyDrivableExecutor x;
+ makeFuture(true).waitVia(&x);
+ EXPECT_FALSE(x.ran);
+ }
+}
+
TEST(Future, callbackAfterActivate) {
Promise<void> p;
auto f = p.getFuture();
#include <folly/futures/Future.h>
#include <folly/futures/InlineExecutor.h>
#include <folly/futures/ManualExecutor.h>
+#include <folly/futures/DrivableExecutor.h>
using namespace folly;
-struct ManualWaiter {
+struct ManualWaiter : public DrivableExecutor {
explicit ManualWaiter(std::shared_ptr<ManualExecutor> ex) : ex(ex) {}
- void makeProgress() {
+ void add(Func f) {
+ ex->add(f);
+ }
+
+ void drive() override {
ex->wait();
ex->run();
}
t = std::thread([=] {
ManualWaiter eastWaiter(eastExecutor);
while (!done)
- eastWaiter.makeProgress();
+ eastWaiter.drive();
});
}
EXPECT_EQ(std::this_thread::get_id(), westThreadId);
return t.value();
});
- while (!f.isReady()) {
- waiter->makeProgress();
- }
- EXPECT_EQ(f.value(), 1);
+ EXPECT_EQ(f.getVia(waiter.get()), 1);
}
TEST_F(ViaFixture, chain_vias) {
return t.value();
});
- while (!f.isReady()) {
- waiter->makeProgress();
- }
- EXPECT_EQ(f.value(), 1);
+ EXPECT_EQ(f.getVia(waiter.get()), 1);
}
TEST_F(ViaFixture, bareViaAssignment) {
#include <folly/io/async/TimeoutManager.h>
#include <folly/io/async/Request.h>
#include <folly/Executor.h>
+#include <folly/futures/DrivableExecutor.h>
#include <memory>
#include <stack>
#include <list>
* EventBase from other threads. When it is safe to call a method from
* another thread it is explicitly listed in the method comments.
*/
-class EventBase :
- private boost::noncopyable, public TimeoutManager, public Executor
-{
+class EventBase : private boost::noncopyable,
+ public TimeoutManager,
+ public DrivableExecutor {
public:
/**
* A callback interface to use with runInLoop()
runInEventBaseThread(fn);
}
+ /// Implements the DrivableExecutor interface
+ void drive() override {
+ loopOnce();
+ }
+
private:
// TimeoutManager