From c4035d0669465fa9e7ecb20f71633f71903fdeb7 Mon Sep 17 00:00:00 2001 From: Hans Fugal Date: Wed, 5 Nov 2014 15:44:06 -0800 Subject: [PATCH] (wangle) fix a race condition in Core::maybeCallback Summary: `calledBack_` could be seen as true by both threads in this conditional. Classic rookie mistake. :-/ Test Plan: run unit tests Reviewed By: darshan@fb.com Subscribers: trunkagent, hannesr, net-systems@, fugalh, exa, njormrod, folly-diffs@ FB internal diff: D1661199 Tasks: 5542938, 5506504 Signature: t1:1661199:1415215840:fb69f56c8cf6f59beeca809724ce015b5260d9ad Blame Revision: D1636487 --- folly/wangle/detail/Core.h | 25 ++++++------ folly/wangle/test/FutureTest.cpp | 65 ++++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 12 deletions(-) diff --git a/folly/wangle/detail/Core.h b/folly/wangle/detail/Core.h index bae0c870..1113e0b2 100644 --- a/folly/wangle/detail/Core.h +++ b/folly/wangle/detail/Core.h @@ -193,18 +193,19 @@ class Core : protected FSM { private: void maybeCallback() { assert(ready()); - if (!calledBack_ && isActive() && callback_) { - // TODO(5306911) we should probably try/catch - calledBack_ = true; - Executor* x = executor_; - - RequestContext::setContext(context_); - if (x) { - MoveWrapper&&)>> cb(std::move(callback_)); - MoveWrapper>> val(std::move(result_)); - x->add([cb, val]() mutable { (*cb)(std::move(**val)); }); - } else { - callback_(std::move(*result_)); + if (isActive() && callback_) { + if (!calledBack_.exchange(true)) { + // TODO(5306911) we should probably try/catch + Executor* x = executor_; + + RequestContext::setContext(context_); + if (x) { + MoveWrapper&&)>> cb(std::move(callback_)); + MoveWrapper>> val(std::move(result_)); + x->add([cb, val]() mutable { (*cb)(std::move(**val)); }); + } else { + callback_(std::move(*result_)); + } } } } diff --git a/folly/wangle/test/FutureTest.cpp b/folly/wangle/test/FutureTest.cpp index 1ecca30c..db15993c 100644 --- a/folly/wangle/test/FutureTest.cpp +++ b/folly/wangle/test/FutureTest.cpp @@ -27,6 +27,7 @@ #include #include #include +#include #include @@ -40,6 +41,43 @@ using std::vector; #define EXPECT_TYPE(x, T) \ EXPECT_TRUE((std::is_same::value)) +/// Simple executor that does work in another thread +class ThreadExecutor : public Executor { + folly::MPMCQueue funcs; + std::atomic done {false}; + std::thread worker; + folly::Baton<> baton; + + void work() { + baton.post(); + Func fn; + while (!done) { + while (!funcs.isEmpty()) { + funcs.blockingRead(fn); + fn(); + } + } + } + + public: + ThreadExecutor(size_t n = 1024) + : funcs(n), worker(std::bind(&ThreadExecutor::work, this)) {} + + ~ThreadExecutor() { + done = true; + funcs.write([]{}); + worker.join(); + } + + void add(Func fn) override { + funcs.blockingWrite(std::move(fn)); + } + + void waitForStartup() { + baton.wait(); + } +}; + typedef WangleException eggs_t; static eggs_t eggs("eggs"); @@ -950,3 +988,30 @@ TEST(Future, context) { // Fulfil the promise p.setValue(); } + + +// This only fails about 1 in 1k times when the bug is present :( +TEST(Future, t5506504) { + ThreadExecutor x; + + auto fn = [&x]{ + auto promises = std::make_shared>>(4); + vector> futures; + + for (auto& p : *promises) { + futures.emplace_back( + p.getFuture() + .via(&x) + .then([](Try&&){})); + } + + x.waitForStartup(); + x.add([promises]{ + for (auto& p : *promises) p.setValue(); + }); + + return whenAll(futures.begin(), futures.end()); + }; + + waitWithSemaphore(fn()); +} -- 2.34.1