From 1219e49411f20b15f0d577375891d5d20a20e254 Mon Sep 17 00:00:00 2001 From: Yedidya Feldblum Date: Mon, 2 Feb 2015 11:37:23 -0800 Subject: [PATCH] EventBase::runInEventLoopThreadAndWait. Summary: [Folly] EventBase::runInEventLoopThreadAndWait. Useful for when some code needs to be run in the event loop thread, but another thread needs to trigger the code and then wait for it to be done. Test Plan: Unit tests: * `folly/io/async/test/EventBaseTest.cpp` Reviewed By: davejwatson@fb.com Subscribers: trunkagent, folly-diffs@, brettp, dougw FB internal diff: D1810764 Signature: t1:1810764:1422900654:7ff0aa7feb2792266f620b344cf8a1110a09f7ef --- folly/io/async/EventBase.cpp | 35 ++++++++++++++++++++++ folly/io/async/EventBase.h | 22 ++++++++++++++ folly/io/async/test/EventBaseTest.cpp | 42 +++++++++++++++++++++++++++ 3 files changed, 99 insertions(+) diff --git a/folly/io/async/EventBase.cpp b/folly/io/async/EventBase.cpp index e956ca18..00aa9d80 100644 --- a/folly/io/async/EventBase.cpp +++ b/folly/io/async/EventBase.cpp @@ -20,6 +20,7 @@ #include +#include #include #include @@ -562,6 +563,40 @@ bool EventBase::runInEventBaseThread(const Cob& fn) { return true; } +bool EventBase::runInEventBaseThreadAndWait(void (*fn)(void*), void* arg) { + if (inRunningEventBaseThread()) { + LOG(ERROR) << "EventBase " << this << ": Waiting in the event loop is not " + << "allowed"; + return false; + } + + Baton<> ready; + runInEventBaseThread([&] { + fn(arg); + ready.post(); + }); + ready.wait(); + + return true; +} + +bool EventBase::runInEventBaseThreadAndWait(const Cob& fn) { + if (inRunningEventBaseThread()) { + LOG(ERROR) << "EventBase " << this << ": Waiting in the event loop is not " + << "allowed"; + return false; + } + + Baton<> ready; + runInEventBaseThread([&] { + fn(); + ready.post(); + }); + ready.wait(); + + return true; +} + bool EventBase::runAfterDelay(const Cob& cob, int milliseconds, TimeoutManager::InternalEnum in) { diff --git a/folly/io/async/EventBase.h b/folly/io/async/EventBase.h index 484977b1..3cdfd2dd 100644 --- a/folly/io/async/EventBase.h +++ b/folly/io/async/EventBase.h @@ -346,6 +346,28 @@ class EventBase : private boost::noncopyable, */ bool runInEventBaseThread(const Cob& fn); + /* + * Like runInEventBaseThread, but the caller waits for the callback to be + * executed. + */ + template + bool runInEventBaseThreadAndWait(void (*fn)(T*), T* arg) { + return runInEventBaseThreadAndWait(reinterpret_cast(fn), + reinterpret_cast(arg)); + } + + /* + * Like runInEventBaseThread, but the caller waits for the callback to be + * executed. + */ + bool runInEventBaseThreadAndWait(void (*fn)(void*), void* arg); + + /* + * Like runInEventBaseThread, but the caller waits for the callback to be + * executed. + */ + bool runInEventBaseThreadAndWait(const Cob& fn); + /** * Runs the given Cob at some time after the specified number of * milliseconds. (No guarantees exactly when.) diff --git a/folly/io/async/test/EventBaseTest.cpp b/folly/io/async/test/EventBaseTest.cpp index cacf18fe..25505499 100644 --- a/folly/io/async/test/EventBaseTest.cpp +++ b/folly/io/async/test/EventBaseTest.cpp @@ -22,14 +22,17 @@ #include #include +#include #include #include #include #include +using std::atomic; using std::deque; using std::pair; using std::vector; +using std::thread; using std::make_pair; using std::cerr; using std::endl; @@ -1171,6 +1174,45 @@ TEST(EventBaseTest, RunInThread) { } } +// This test simulates some calls, and verifies that the waiting happens by +// triggering what otherwise would be race conditions, and trying to detect +// whether any of the race conditions happened. +TEST(EventBaseTest, RunInEventLoopThreadAndWait) { + const size_t c = 256; + vector> atoms(c); + for (size_t i = 0; i < c; ++i) { + auto& atom = atoms.at(i); + atom = 0; + } + vector threads(c); + for (size_t i = 0; i < c; ++i) { + auto& atom = atoms.at(i); + auto& th = threads.at(i); + th = thread([&atom] { + EventBase eb; + auto ebth = thread([&]{ eb.loopForever(); }); + eb.waitUntilRunning(); + eb.runInEventBaseThreadAndWait([&] { + size_t x = 0; + atom.compare_exchange_weak( + x, 1, std::memory_order_release, std::memory_order_relaxed); + }); + size_t x = 0; + atom.compare_exchange_weak( + x, 2, std::memory_order_release, std::memory_order_relaxed); + eb.terminateLoopSoon(); + ebth.join(); + }); + } + for (size_t i = 0; i < c; ++i) { + auto& th = threads.at(i); + th.join(); + } + size_t sum = 0; + for (auto& atom : atoms) sum += atom; + EXPECT_EQ(c, sum); +} + /////////////////////////////////////////////////////////////////////////// // Tests for runInLoop() /////////////////////////////////////////////////////////////////////////// -- 2.34.1