2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/Baton.h>
20 #include <folly/MPMCQueue.h>
21 #include <folly/executors/DrivableExecutor.h>
22 #include <folly/futures/Future.h>
23 #include <folly/futures/InlineExecutor.h>
24 #include <folly/futures/ManualExecutor.h>
25 #include <folly/portability/GTest.h>
27 using namespace folly;
29 struct ManualWaiter : public DrivableExecutor {
30 explicit ManualWaiter(std::shared_ptr<ManualExecutor> ex) : ex(ex) {}
32 void add(Func f) override {
33 ex->add(std::move(f));
36 void drive() override {
41 std::shared_ptr<ManualExecutor> ex;
44 struct ViaFixture : public testing::Test {
46 westExecutor(new ManualExecutor),
47 eastExecutor(new ManualExecutor),
48 waiter(new ManualWaiter(westExecutor)),
52 ManualWaiter eastWaiter(eastExecutor);
58 ~ViaFixture() override {
60 eastExecutor->add([=]() { });
64 void addAsync(int a, int b, std::function<void(int&&)>&& cob) {
65 eastExecutor->add([=]() {
70 std::shared_ptr<ManualExecutor> westExecutor;
71 std::shared_ptr<ManualExecutor> eastExecutor;
72 std::shared_ptr<ManualWaiter> waiter;
73 InlineExecutor inlineExecutor;
74 std::atomic<bool> done;
78 TEST(Via, exceptionOnLaunch) {
79 auto future = makeFuture<int>(std::runtime_error("E"));
80 EXPECT_THROW(future.value(), std::runtime_error);
83 TEST(Via, thenValue) {
84 auto future = makeFuture(std::move(1))
85 .then([](Try<int>&& t) {
86 return t.value() == 1;
90 EXPECT_TRUE(future.value());
93 TEST(Via, thenFuture) {
94 auto future = makeFuture(1)
95 .then([](Try<int>&& t) {
96 return makeFuture(t.value() == 1);
98 EXPECT_TRUE(future.value());
101 static Future<std::string> doWorkStatic(Try<std::string>&& t) {
102 return makeFuture(t.value() + ";static");
105 TEST(Via, thenFunction) {
107 Future<std::string> doWork(Try<std::string>&& t) {
108 return makeFuture(t.value() + ";class");
110 static Future<std::string> doWorkStatic(Try<std::string>&& t) {
111 return makeFuture(t.value() + ";class-static");
115 auto f = makeFuture(std::string("start"))
117 .then(Worker::doWorkStatic)
118 .then(&Worker::doWork, &w)
121 EXPECT_EQ(f.value(), "start;static;class-static;class");
124 TEST_F(ViaFixture, threadHops) {
125 auto westThreadId = std::this_thread::get_id();
126 auto f = via(eastExecutor.get())
127 .then([=](Try<Unit>&& /* t */) {
128 EXPECT_NE(std::this_thread::get_id(), westThreadId);
129 return makeFuture<int>(1);
131 .via(westExecutor.get())
132 .then([=](Try<int>&& t) {
133 EXPECT_EQ(std::this_thread::get_id(), westThreadId);
136 EXPECT_EQ(f.getVia(waiter.get()), 1);
139 TEST_F(ViaFixture, chainVias) {
140 auto westThreadId = std::this_thread::get_id();
141 auto f = via(eastExecutor.get()).then([=]() {
142 EXPECT_NE(std::this_thread::get_id(), westThreadId);
144 }).then([=](int val) {
145 return makeFuture(val).via(westExecutor.get())
146 .then([=](int v) mutable {
147 EXPECT_EQ(std::this_thread::get_id(), westThreadId);
150 }).then([=](int val) {
151 // even though ultimately the future that triggers this one executed in
152 // the west thread, this then() inherited the executor from its
153 // predecessor, ie the eastExecutor.
154 EXPECT_NE(std::this_thread::get_id(), westThreadId);
156 }).via(westExecutor.get()).then([=](int val) {
157 // go back to west, so we can wait on it
158 EXPECT_EQ(std::this_thread::get_id(), westThreadId);
162 EXPECT_EQ(f.getVia(waiter.get()), 4);
165 TEST_F(ViaFixture, bareViaAssignment) {
166 auto f = via(eastExecutor.get());
168 TEST_F(ViaFixture, viaAssignment) {
170 auto f = makeFuture().via(eastExecutor.get());
172 auto f2 = f.via(eastExecutor.get());
178 .thenMulti([] { return 42; })
184 auto f = makeFuture().thenMulti(
185 [&]{ count++; return 3.14159; },
186 [&](double) { count++; return std::string("hello"); },
187 [&]{ count++; return makeFuture(42); });
188 EXPECT_EQ(42, f.get());
192 struct PriorityExecutor : public Executor {
193 void add(Func /* f */) override {}
195 void addWithPriority(Func f, int8_t priority) override {
196 int mid = getNumPriorities() / 2;
197 int p = priority < 0 ?
198 std::max(0, mid + priority) :
199 std::min(getNumPriorities() - 1, mid + priority);
212 uint8_t getNumPriorities() const override {
221 TEST(Via, priority) {
222 PriorityExecutor exe;
223 via(&exe, -1).then([]{});
224 via(&exe, 0).then([]{});
225 via(&exe, 1).then([]{});
226 via(&exe, 42).then([]{}); // overflow should go to max priority
227 via(&exe, -42).then([]{}); // underflow should go to min priority
228 via(&exe).then([]{}); // default to mid priority
229 via(&exe, Executor::LO_PRI).then([]{});
230 via(&exe, Executor::HI_PRI).then([]{});
231 EXPECT_EQ(3, exe.count0);
232 EXPECT_EQ(2, exe.count1);
233 EXPECT_EQ(3, exe.count2);
236 TEST_F(ViaFixture, chainX1) {
239 .thenMultiWithExecutor(eastExecutor.get(),[] { return 42; })
243 TEST_F(ViaFixture, chainX3) {
244 auto westThreadId = std::this_thread::get_id();
246 auto f = via(westExecutor.get()).thenMultiWithExecutor(
249 EXPECT_NE(std::this_thread::get_id(), westThreadId);
250 count++; return 3.14159;
252 [&](double) { count++; return std::string("hello"); },
255 EXPECT_EQ(std::this_thread::get_id(), westThreadId);
256 return makeFuture(42);
258 EXPECT_EQ(42, f.getVia(waiter.get()));
263 ManualExecutor x1, x2;
264 bool a = false, b = false, c = false;
266 .then([&]{ a = true; })
267 .then(&x2, [&]{ b = true; })
268 .then([&]{ c = true; });
286 TEST(Via, then2Variadic) {
287 struct Foo { bool a = false; void foo(Try<Unit>) { a = true; } };
290 makeFuture().then(&x, &Foo::foo, &f);
296 #ifndef __APPLE__ // TODO #7372389
297 /// Simple executor that does work in another thread
298 class ThreadExecutor : public Executor {
299 folly::MPMCQueue<Func> funcs;
300 std::atomic<bool> done {false};
302 folly::Baton<> baton;
308 while (!funcs.isEmpty()) {
309 funcs.blockingRead(fn);
316 explicit ThreadExecutor(size_t n = 1024)
318 worker = std::thread(std::bind(&ThreadExecutor::work, this));
321 ~ThreadExecutor() override {
327 void add(Func fn) override {
328 funcs.blockingWrite(std::move(fn));
331 void waitForStartup() {
336 TEST(Via, viaThenGetWasRacy) {
338 std::unique_ptr<int> val =
339 folly::via(&x).then([] { return std::make_unique<int>(42); }).get();
344 TEST(Via, callbackRace) {
348 auto promises = std::make_shared<std::vector<Promise<Unit>>>(4);
349 std::vector<Future<Unit>> futures;
351 for (auto& p : *promises) {
352 futures.emplace_back(
355 .then([](Try<Unit>&&){}));
360 for (auto& p : *promises) {
365 return collectAll(futures);
372 class DummyDrivableExecutor : public DrivableExecutor {
374 void add(Func /* f */) override {}
375 void drive() override { ran = true; }
383 auto f = via(&x).then([]{ return true; });
384 EXPECT_TRUE(f.getVia(&x));
390 auto f = via(&x).then();
395 DummyDrivableExecutor x;
396 auto f = makeFuture(true);
397 EXPECT_TRUE(f.getVia(&x));
402 TEST(Via, getTryVia) {
406 auto f = via(&x).then([] { return 23; });
407 EXPECT_FALSE(f.isReady());
408 EXPECT_EQ(23, f.getTryVia(&x).value());
414 auto f = via(&x).then();
415 EXPECT_FALSE(f.isReady());
416 auto t = f.getTryVia(&x);
417 EXPECT_TRUE(t.hasValue());
421 DummyDrivableExecutor x;
422 auto f = makeFuture(23);
423 EXPECT_EQ(23, f.getTryVia(&x).value());
431 auto f = via(&x).then();
432 EXPECT_FALSE(f.isReady());
434 EXPECT_TRUE(f.isReady());
438 // try rvalue as well
440 auto f = via(&x).then().waitVia(&x);
441 EXPECT_TRUE(f.isReady());
445 DummyDrivableExecutor x;
446 makeFuture(true).waitVia(&x);
451 TEST(Via, viaRaces) {
454 auto tid = std::this_thread::get_id();
460 .then([&](Try<Unit>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
461 .then([&](Try<Unit>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
462 .then([&](Try<Unit>&&) { done = true; });
469 while (!done) x.run();
474 TEST(Via, viaDummyExecutorFutureSetValueFirst) {
475 // The callback object will get destroyed when passed to the executor.
477 // A promise will be captured by the callback lambda so we can observe that
478 // it will be destroyed.
479 Promise<Unit> captured_promise;
480 auto captured_promise_future = captured_promise.getFuture();
482 DummyDrivableExecutor x;
483 auto future = makeFuture().via(&x).then(
484 [c = std::move(captured_promise)] { return 42; });
486 EXPECT_THROW(future.get(std::chrono::seconds(5)), BrokenPromise);
488 captured_promise_future.get(std::chrono::seconds(5)), BrokenPromise);
491 TEST(Via, viaDummyExecutorFutureSetCallbackFirst) {
492 // The callback object will get destroyed when passed to the executor.
494 // A promise will be captured by the callback lambda so we can observe that
495 // it will be destroyed.
496 Promise<Unit> captured_promise;
497 auto captured_promise_future = captured_promise.getFuture();
499 DummyDrivableExecutor x;
500 Promise<Unit> trigger;
501 auto future = trigger.getFuture().via(&x).then(
502 [c = std::move(captured_promise)] { return 42; });
505 EXPECT_THROW(future.get(std::chrono::seconds(5)), BrokenPromise);
507 captured_promise_future.get(std::chrono::seconds(5)), BrokenPromise);
510 TEST(Via, viaExecutorDiscardsTaskFutureSetValueFirst) {
511 // The callback object will get destroyed when the ManualExecutor runs out
514 // A promise will be captured by the callback lambda so we can observe that
515 // it will be destroyed.
516 Promise<Unit> captured_promise;
517 auto captured_promise_future = captured_promise.getFuture();
519 Optional<Future<int>> future;
522 future = makeFuture().via(&x).then(
523 [c = std::move(captured_promise)] { return 42; });
526 EXPECT_THROW(future->get(std::chrono::seconds(5)), BrokenPromise);
528 captured_promise_future.get(std::chrono::seconds(5)), BrokenPromise);
531 TEST(Via, viaExecutorDiscardsTaskFutureSetCallbackFirst) {
532 // The callback object will get destroyed when the ManualExecutor runs out
535 // A promise will be captured by the callback lambda so we can observe that
536 // it will be destroyed.
537 Promise<Unit> captured_promise;
538 auto captured_promise_future = captured_promise.getFuture();
540 Optional<Future<int>> future;
543 Promise<Unit> trigger;
544 future = trigger.getFuture().via(&x).then(
545 [c = std::move(captured_promise)] { return 42; });
549 EXPECT_THROW(future->get(std::chrono::seconds(5)), BrokenPromise);
551 captured_promise_future.get(std::chrono::seconds(5)), BrokenPromise);
554 TEST(ViaFunc, liftsVoid) {
557 Future<Unit> f = via(&x, [&]{ count++; });
564 TEST(ViaFunc, value) {
566 EXPECT_EQ(42, via(&x, []{ return 42; }).getVia(&x));
569 TEST(ViaFunc, exception) {
572 via(&x, []() -> int { throw std::runtime_error("expected"); })
577 TEST(ViaFunc, future) {
579 EXPECT_EQ(42, via(&x, []{ return makeFuture(42); })
583 TEST(ViaFunc, voidFuture) {
586 via(&x, [&]{ count++; }).getVia(&x);
590 TEST(ViaFunc, isSticky) {
594 auto f = via(&x, [&]{ count++; });
597 f.then([&]{ count++; });
603 TEST(ViaFunc, moveOnly) {
605 auto intp = std::make_unique<int>(42);
607 EXPECT_EQ(42, via(&x, [intp = std::move(intp)] { return *intp; }).getVia(&x));