#pragma once
+#include <assert.h>
+#include <errno.h>
#include <stdint.h>
#include <atomic>
-#include <errno.h>
-#include <assert.h>
+#include <thread>
#include <folly/detail/Futex.h>
#include <folly/detail/MemoryIdler.h>
namespace folly {
-/// A Baton allows a thread to block once and be awoken: it captures
-/// a single handoff. During its lifecycle (from construction/reset to
-/// destruction/reset) a baton must either be post()ed and wait()ed exactly
-/// once each, or not at all.
+/// A Baton allows a thread to block once and be awoken. The single
+/// poster version (with SinglePoster == true) captures a single
+/// handoff, and during its lifecycle (from construction/reset to
+/// destruction/reset) a baton must either be post()ed and wait()ed
+/// exactly once each, or not at all.
+///
+/// The multi-poster version (SinglePoster == false) allows multiple
+/// concurrent handoff attempts, the first of which completes the
+/// handoff and the rest if any are idempotent.
///
/// Baton includes no internal padding, and is only 4 bytes in size.
/// Any alignment or padding to avoid false sharing is up to the user.
///
-/// This is basically a stripped-down semaphore that supports only a
-/// single call to sem_post and a single call to sem_wait. The current
-/// posix semaphore sem_t isn't too bad, but this provides more a bit more
-/// speed, inlining, smaller size, a guarantee that the implementation
-/// won't change, and compatibility with DeterministicSchedule. By having
-/// a much more restrictive lifecycle we can also add a bunch of assertions
-/// that can help to catch race conditions ahead of time.
-template <template<typename> class Atom = std::atomic>
+/// This is basically a stripped-down semaphore that supports (only a
+/// single call to sem_post, when SinglePoster == true) and a single
+/// call to sem_wait.
+///
+/// The non-blocking version (Blocking == false) provides more speed
+/// by using only load acquire and store release operations in the
+/// critical path, at the cost of disallowing blocking and timing out.
+///
+/// The current posix semaphore sem_t isn't too bad, but this provides
+/// more a bit more speed, inlining, smaller size, a guarantee that
+/// the implementation won't change, and compatibility with
+/// DeterministicSchedule. By having a much more restrictive
+/// lifecycle we can also add a bunch of assertions that can help to
+/// catch race conditions ahead of time.
+template <
+ template <typename> class Atom = std::atomic,
+ bool SinglePoster = true, // single vs multiple posters
+ bool Blocking = true> // blocking vs spinning
struct Baton {
constexpr Baton() : state_(INIT) {}
}
/// Causes wait() to wake up. For each lifetime of a Baton (where a
- /// lifetime starts at construction or reset() and ends at destruction
- /// or reset()) there can be at most one call to post(). Any thread
- /// may call post().
- ///
- /// Although we could implement a more generic semaphore semantics
- /// without any extra size or CPU overhead, the single-call limitation
- /// allows us to have better assert-ions during debug builds.
+ /// lifetime starts at construction or reset() and ends at
+ /// destruction or reset()) there can be at most one call to post(),
+ /// in the single poster version. Any thread may call post().
void post() {
- uint32_t before = state_.load(std::memory_order_acquire);
-
- assert(before == INIT || before == WAITING || before == TIMED_OUT);
-
- if (before == INIT &&
- state_.compare_exchange_strong(before, EARLY_DELIVERY)) {
+ if (!Blocking) {
+ /// Non-blocking version
+ ///
+ assert([&] {
+ auto state = state_.load(std::memory_order_relaxed);
+ return (state == INIT || state == EARLY_DELIVERY);
+ }());
+ state_.store(EARLY_DELIVERY, std::memory_order_release);
return;
}
- assert(before == WAITING || before == TIMED_OUT);
+ /// Blocking versions
+ ///
+ if (SinglePoster) {
+ /// Single poster version
+ ///
+ uint32_t before = state_.load(std::memory_order_acquire);
- if (before == TIMED_OUT) {
- return;
- }
+ assert(before == INIT || before == WAITING || before == TIMED_OUT);
- assert(before == WAITING);
- state_.store(LATE_DELIVERY, std::memory_order_release);
- state_.futexWake(1);
+ if (before == INIT &&
+ state_.compare_exchange_strong(before, EARLY_DELIVERY)) {
+ return;
+ }
+
+ assert(before == WAITING || before == TIMED_OUT);
+
+ if (before == TIMED_OUT) {
+ return;
+ }
+
+ assert(before == WAITING);
+ state_.store(LATE_DELIVERY, std::memory_order_release);
+ state_.futexWake(1);
+ } else {
+ /// Multi-poster version
+ ///
+ while (true) {
+ uint32_t before = state_.load(std::memory_order_acquire);
+
+ if (before == INIT &&
+ state_.compare_exchange_strong(before, EARLY_DELIVERY)) {
+ return;
+ }
+
+ if (before == TIMED_OUT) {
+ return;
+ }
+
+ if (before == EARLY_DELIVERY || before == LATE_DELIVERY) {
+ // The reason for not simply returning (without the following
+ // atomic operation) is to avoid the following case:
+ //
+ // T1: T2: T3:
+ // local1.post(); local2.post(); global.wait();
+ // global.post(); global.post(); local1.try_wait() == true;
+ // local2.try_wait() == false;
+ //
+ if (state_.fetch_add(0) != before) {
+ continue;
+ }
+ return;
+ }
+
+ assert(before == WAITING);
+ if (!state_.compare_exchange_weak(before, LATE_DELIVERY)) {
+ continue;
+ }
+ state_.futexWake(1);
+ return;
+ }
+ }
}
/// Waits until post() has been called in the current Baton lifetime.
return;
}
+ if (!Blocking) {
+ while (!try_wait()) {
+ std::this_thread::yield();
+ }
+ return;
+ }
+
// guess we have to block :(
uint32_t expected = INIT;
if (!state_.compare_exchange_strong(expected, WAITING)) {
/// false otherwise
template <typename Clock, typename Duration = typename Clock::duration>
bool timed_wait(const std::chrono::time_point<Clock,Duration>& deadline) {
+ static_assert(Blocking, "Non-blocking Baton does not support timed wait.");
+
if (spinWaitForEarlyDelivery()) {
assert(state_.load(std::memory_order_acquire) == EARLY_DELIVERY);
return true;
/// call wait, try_wait or timed_wait on the same baton without resetting
///
/// @return true if baton has been posted, false othewise
- bool try_wait() {
+ bool try_wait() const {
auto s = state_.load(std::memory_order_acquire);
assert(s == INIT || s == EARLY_DELIVERY);
return s == EARLY_DELIVERY;
using namespace folly::test;
using folly::detail::EmulatedFutexAtomic;
-TEST(Baton, basic) {
- Baton<> b;
- b.post();
- b.wait();
+/// Basic test
+
+TEST(Baton, basic_single_poster_blocking) {
+ run_basic_test<std::atomic, true, true>();
+ run_basic_test<EmulatedFutexAtomic, true, true>();
+ run_basic_test<DeterministicAtomic, true, true>();
+}
+
+TEST(Baton, basic_single_poster_nonblocking) {
+ run_basic_test<std::atomic, true, false>();
+ run_basic_test<EmulatedFutexAtomic, true, false>();
+ run_basic_test<DeterministicAtomic, true, false>();
+}
+
+TEST(Baton, basic_multi_poster_blocking) {
+ run_basic_test<std::atomic, false, true>();
+}
+
+TEST(Baton, basic_multi_poster_nonblocking) {
+ run_basic_test<std::atomic, false, false>();
+}
+
+/// Ping pong tests
+
+TEST(Baton, pingpong_single_poster_blocking) {
+ DSched sched(DSched::uniform(0));
+
+ run_pingpong_test<DeterministicAtomic, true, true>(1000);
+}
+
+TEST(Baton, pingpong_single_poster_nonblocking) {
+ DSched sched(DSched::uniform(0));
+
+ run_pingpong_test<DeterministicAtomic, true, false>(1000);
}
-TEST(Baton, pingpong) {
+TEST(Baton, pingpong_multi_poster_blocking) {
DSched sched(DSched::uniform(0));
- run_pingpong_test<DeterministicAtomic>(1000);
+ run_pingpong_test<DeterministicAtomic, false, true>(1000);
+}
+
+TEST(Baton, pingpong_multi_poster_nonblocking) {
+ DSched sched(DSched::uniform(0));
+
+ run_pingpong_test<DeterministicAtomic, false, false>(1000);
+}
+
+/// Timed wait tests - Nonblocking Baton does not support timed_wait()
+
+// Timed wait basic system clock tests
+
+TEST(Baton, timed_wait_basic_system_clock_single_poster) {
+ run_basic_timed_wait_tests<std::atomic, std::chrono::system_clock, true>();
+ run_basic_timed_wait_tests<
+ EmulatedFutexAtomic,
+ std::chrono::system_clock,
+ true>();
+ run_basic_timed_wait_tests<
+ DeterministicAtomic,
+ std::chrono::system_clock,
+ true>();
}
-TEST(Baton, timed_wait_basic_system_clock) {
- run_basic_timed_wait_tests<std::atomic, std::chrono::system_clock>();
- run_basic_timed_wait_tests<EmulatedFutexAtomic, std::chrono::system_clock>();
- run_basic_timed_wait_tests<DeterministicAtomic, std::chrono::system_clock>();
+TEST(Baton, timed_wait_basic_system_clock_multi_poster) {
+ run_basic_timed_wait_tests<std::atomic, std::chrono::system_clock, false>();
+ run_basic_timed_wait_tests<
+ EmulatedFutexAtomic,
+ std::chrono::system_clock,
+ false>();
+ run_basic_timed_wait_tests<
+ DeterministicAtomic,
+ std::chrono::system_clock,
+ false>();
}
-TEST(Baton, timed_wait_timeout_system_clock) {
- run_timed_wait_tmo_tests<std::atomic, std::chrono::system_clock>();
- run_timed_wait_tmo_tests<EmulatedFutexAtomic, std::chrono::system_clock>();
- run_timed_wait_tmo_tests<DeterministicAtomic, std::chrono::system_clock>();
+// Timed wait timeout system clock tests
+
+TEST(Baton, timed_wait_timeout_system_clock_single_poster) {
+ run_timed_wait_tmo_tests<std::atomic, std::chrono::system_clock, true>();
+ run_timed_wait_tmo_tests<
+ EmulatedFutexAtomic,
+ std::chrono::system_clock,
+ true>();
+ run_timed_wait_tmo_tests<
+ DeterministicAtomic,
+ std::chrono::system_clock,
+ true>();
}
-TEST(Baton, timed_wait_system_clock) {
- run_timed_wait_regular_test<std::atomic, std::chrono::system_clock>();
- run_timed_wait_regular_test<EmulatedFutexAtomic, std::chrono::system_clock>();
- run_timed_wait_regular_test<DeterministicAtomic, std::chrono::system_clock>();
+TEST(Baton, timed_wait_timeout_system_clock_multi_poster) {
+ run_timed_wait_tmo_tests<std::atomic, std::chrono::system_clock, false>();
+ run_timed_wait_tmo_tests<
+ EmulatedFutexAtomic,
+ std::chrono::system_clock,
+ false>();
+ run_timed_wait_tmo_tests<
+ DeterministicAtomic,
+ std::chrono::system_clock,
+ false>();
+}
+
+// Timed wait regular system clock tests
+
+TEST(Baton, timed_wait_system_clock_single_poster) {
+ run_timed_wait_regular_test<std::atomic, std::chrono::system_clock, true>();
+ run_timed_wait_regular_test<
+ EmulatedFutexAtomic,
+ std::chrono::system_clock,
+ true>();
+ run_timed_wait_regular_test<
+ DeterministicAtomic,
+ std::chrono::system_clock,
+ true>();
}
-TEST(Baton, timed_wait_basic_steady_clock) {
- run_basic_timed_wait_tests<std::atomic, std::chrono::steady_clock>();
- run_basic_timed_wait_tests<EmulatedFutexAtomic, std::chrono::steady_clock>();
- run_basic_timed_wait_tests<DeterministicAtomic, std::chrono::steady_clock>();
+TEST(Baton, timed_wait_system_clock_multi_poster) {
+ run_timed_wait_regular_test<std::atomic, std::chrono::system_clock, false>();
+ run_timed_wait_regular_test<
+ EmulatedFutexAtomic,
+ std::chrono::system_clock,
+ false>();
+ run_timed_wait_regular_test<
+ DeterministicAtomic,
+ std::chrono::system_clock,
+ false>();
+}
+
+// Timed wait basic steady clock tests
+
+TEST(Baton, timed_wait_basic_steady_clock_single_poster) {
+ run_basic_timed_wait_tests<std::atomic, std::chrono::steady_clock, true>();
+ run_basic_timed_wait_tests<
+ EmulatedFutexAtomic,
+ std::chrono::steady_clock,
+ true>();
+ run_basic_timed_wait_tests<
+ DeterministicAtomic,
+ std::chrono::steady_clock,
+ true>();
+}
+
+TEST(Baton, timed_wait_basic_steady_clock_multi_poster) {
+ run_basic_timed_wait_tests<std::atomic, std::chrono::steady_clock, false>();
+ run_basic_timed_wait_tests<
+ EmulatedFutexAtomic,
+ std::chrono::steady_clock,
+ false>();
+ run_basic_timed_wait_tests<
+ DeterministicAtomic,
+ std::chrono::steady_clock,
+ false>();
+}
+
+// Timed wait timeout steady clock tests
+
+TEST(Baton, timed_wait_timeout_steady_clock_single_poster) {
+ run_timed_wait_tmo_tests<std::atomic, std::chrono::steady_clock, true>();
+ run_timed_wait_tmo_tests<
+ EmulatedFutexAtomic,
+ std::chrono::steady_clock,
+ true>();
+ run_timed_wait_tmo_tests<
+ DeterministicAtomic,
+ std::chrono::steady_clock,
+ true>();
+}
+
+TEST(Baton, timed_wait_timeout_steady_clock_multi_poster) {
+ run_timed_wait_tmo_tests<std::atomic, std::chrono::steady_clock, false>();
+ run_timed_wait_tmo_tests<
+ EmulatedFutexAtomic,
+ std::chrono::steady_clock,
+ false>();
+ run_timed_wait_tmo_tests<
+ DeterministicAtomic,
+ std::chrono::steady_clock,
+ false>();
+}
+
+// Timed wait regular steady clock tests
+
+TEST(Baton, timed_wait_steady_clock_single_poster) {
+ run_timed_wait_regular_test<std::atomic, std::chrono::steady_clock, true>();
+ run_timed_wait_regular_test<
+ EmulatedFutexAtomic,
+ std::chrono::steady_clock,
+ true>();
+ run_timed_wait_regular_test<
+ DeterministicAtomic,
+ std::chrono::steady_clock,
+ true>();
+}
+
+TEST(Baton, timed_wait_steady_clock_multi_poster) {
+ run_timed_wait_regular_test<std::atomic, std::chrono::steady_clock, false>();
+ run_timed_wait_regular_test<
+ EmulatedFutexAtomic,
+ std::chrono::steady_clock,
+ false>();
+ run_timed_wait_regular_test<
+ DeterministicAtomic,
+ std::chrono::steady_clock,
+ false>();
+}
+
+/// Try wait tests
+
+TEST(Baton, try_wait_single_poster_blocking) {
+ run_try_wait_tests<std::atomic, true, true>();
+ run_try_wait_tests<EmulatedFutexAtomic, true, true>();
+ run_try_wait_tests<DeterministicAtomic, true, true>();
+}
+
+TEST(Baton, try_wait_single_poster_nonblocking) {
+ run_try_wait_tests<std::atomic, true, false>();
+ run_try_wait_tests<EmulatedFutexAtomic, true, false>();
+ run_try_wait_tests<DeterministicAtomic, true, false>();
+}
+
+TEST(Baton, try_wait_multi_poster_blocking) {
+ run_try_wait_tests<std::atomic, false, true>();
+ run_try_wait_tests<EmulatedFutexAtomic, false, true>();
+ run_try_wait_tests<DeterministicAtomic, false, true>();
+}
+
+TEST(Baton, try_wait_multi_poster_nonblocking) {
+ run_try_wait_tests<std::atomic, false, false>();
+ run_try_wait_tests<EmulatedFutexAtomic, false, false>();
+ run_try_wait_tests<DeterministicAtomic, false, false>();
+}
+
+/// Multi-producer tests
+
+TEST(Baton, multi_producer_single_poster_blocking) {
+ run_try_wait_tests<std::atomic, true, true>();
+ run_try_wait_tests<EmulatedFutexAtomic, true, true>();
+ run_try_wait_tests<DeterministicAtomic, true, true>();
}
-TEST(Baton, timed_wait_timeout_steady_clock) {
- run_timed_wait_tmo_tests<std::atomic, std::chrono::steady_clock>();
- run_timed_wait_tmo_tests<EmulatedFutexAtomic, std::chrono::steady_clock>();
- run_timed_wait_tmo_tests<DeterministicAtomic, std::chrono::steady_clock>();
+TEST(Baton, multi_producer_single_poster_nonblocking) {
+ run_try_wait_tests<std::atomic, true, false>();
+ run_try_wait_tests<EmulatedFutexAtomic, true, false>();
+ run_try_wait_tests<DeterministicAtomic, true, false>();
}
-TEST(Baton, timed_wait_steady_clock) {
- run_timed_wait_regular_test<std::atomic, std::chrono::steady_clock>();
- run_timed_wait_regular_test<EmulatedFutexAtomic, std::chrono::steady_clock>();
- run_timed_wait_regular_test<DeterministicAtomic, std::chrono::steady_clock>();
+TEST(Baton, multi_producer_multi_poster_blocking) {
+ run_try_wait_tests<std::atomic, false, true>();
+ run_try_wait_tests<EmulatedFutexAtomic, false, true>();
+ run_try_wait_tests<DeterministicAtomic, false, true>();
}
-TEST(Baton, try_wait) {
- run_try_wait_tests<std::atomic>();
- run_try_wait_tests<EmulatedFutexAtomic>();
- run_try_wait_tests<DeterministicAtomic>();
+TEST(Baton, multi_producer_multi_poster_nonblocking) {
+ run_try_wait_tests<std::atomic, false, false>();
+ run_try_wait_tests<EmulatedFutexAtomic, false, false>();
+ run_try_wait_tests<DeterministicAtomic, false, false>();
}
typedef DeterministicSchedule DSched;
-template <template <typename> class Atom>
+template <template <typename> class Atom, bool SinglePoster, bool Blocking>
+void run_basic_test() {
+ Baton<Atom, SinglePoster, Blocking> b;
+ b.post();
+ b.wait();
+}
+
+template <template <typename> class Atom, bool SinglePoster, bool Blocking>
void run_pingpong_test(int numRounds) {
- Baton<Atom> batons[17];
- Baton<Atom>& a = batons[0];
- Baton<Atom>& b = batons[16]; // to get it on a different cache line
+ using B = Baton<Atom, SinglePoster, Blocking>;
+ B batons[17];
+ B& a = batons[0];
+ B& b = batons[16]; // to get it on a different cache line
auto thr = DSched::thread([&] {
for (int i = 0; i < numRounds; ++i) {
a.wait();
DSched::join(thr);
}
-template <template <typename> class Atom, typename Clock>
+template <template <typename> class Atom, typename Clock, bool SinglePoster>
void run_basic_timed_wait_tests() {
- Baton<Atom> b;
+ Baton<Atom, SinglePoster> b;
b.post();
// tests if early delivery works fine
EXPECT_TRUE(b.timed_wait(Clock::now()));
}
-template <template <typename> class Atom, typename Clock>
+template <template <typename> class Atom, typename Clock, bool SinglePoster>
void run_timed_wait_tmo_tests() {
- Baton<Atom> b;
+ Baton<Atom, SinglePoster> b;
auto thr = DSched::thread([&] {
bool rv = b.timed_wait(Clock::now() + std::chrono::milliseconds(1));
DSched::join(thr);
}
-template <template <typename> class Atom, typename Clock>
+template <template <typename> class Atom, typename Clock, bool SinglePoster>
void run_timed_wait_regular_test() {
- Baton<Atom> b;
+ Baton<Atom, SinglePoster> b;
auto thr = DSched::thread([&] {
// To wait forever we'd like to use time_point<Clock>::max, but
DSched::join(thr);
}
-template <template <typename> class Atom>
+template <template <typename> class Atom, bool SinglePoster, bool Blocking>
void run_try_wait_tests() {
- Baton<Atom> b;
+ Baton<Atom, SinglePoster, Blocking> b;
EXPECT_FALSE(b.try_wait());
b.post();
EXPECT_TRUE(b.try_wait());
}
+
+template <template <typename> class Atom, bool SinglePoster, bool Blocking>
+void run_multi_producer_tests() {
+ constexpr int NPROD = 5;
+ Baton<Atom, SinglePoster, Blocking> local_ping[NPROD];
+ Baton<Atom, SinglePoster, Blocking> local_pong[NPROD];
+ Baton<Atom, /* SingleProducer = */ false, Blocking> global;
+ Baton<Atom, SinglePoster, Blocking> shutdown;
+
+ std::thread prod[NPROD];
+ for (int i = 0; i < NPROD; ++i) {
+ prod[i] = DSched::thread([&, i] {
+ if (!std::is_same<Atom<int>, DeterministicAtomic<int>>::value) {
+ // If we are using std::atomic (or EmulatedFutexAtomic) then
+ // a variable sleep here will make it more likely that
+ // global.post()-s will span more than one global.wait() by
+ // the consumer thread and for the latter to block (if the
+ // global baton is blocking). For DeterministicAtomic, we just
+ // rely on DeterministicSchedule to do the scheduling. The
+ // test won't fail if we lose the race, we just don't get
+ // coverage.
+ for (int j = 0; j < i; ++j) {
+ std::this_thread::sleep_for(std::chrono::microseconds(1));
+ }
+ }
+ local_ping[i].post();
+ global.post();
+ local_pong[i].wait();
+ });
+ }
+
+ auto cons = DSched::thread([&] {
+ while (true) {
+ global.wait();
+ global.reset();
+ if (shutdown.try_wait()) {
+ return;
+ }
+ for (int i = 0; i < NPROD; ++i) {
+ if (local_ping.try_wait()) {
+ local_ping.reset();
+ local_pong.post();
+ }
+ }
+ }
+ });
+
+ for (auto& t : prod) {
+ DSched::join(t);
+ }
+
+ global.post();
+ shutdown.post();
+ DSched::join(cons);
}
-}
+
+} // namespace test {
+} // namespace folly {