///////////////////////////////////////////////////////
// compatibility implementation using standard C++ API
+// Our emulated futex maps each address onto one of 4096 buckets, each of
+// which holds a list of wait nodes. There are two levels of locking: a
+// per-bucket mutex that guards the list, and a per-node mutex, condition
+// variable, and bool that perform the actual wakeups. The per-node mutex
+// lets us wake exactly the waiters we unlink, avoiding thundering herds.
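+//
+// When both locks must be held they are acquired bucket first, then node;
+// only the waker nests them. A waiter enqueues itself under the bucket
+// lock, then sleeps holding only its own node lock, so a wakeup contends
+// on a single node rather than on the whole bucket.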
+
struct EmulatedFutexWaitNode : public boost::intrusive::list_base_hook<> {
void* const addr_;
const uint32_t waitMask_;
- bool hasTimeout_;
+
+  // tricky: signaled_ may be written only while holding both the bucket
+  // and node mutexes, and may be read while holding either one
bool signaled_;
+ std::mutex mutex_;
std::condition_variable cond_;
- EmulatedFutexWaitNode(void* addr, uint32_t waitMask, bool hasTimeout)
+ EmulatedFutexWaitNode(void* addr, uint32_t waitMask)
: addr_(addr)
, waitMask_(waitMask)
- , hasTimeout_(hasTimeout)
, signaled_(false)
{
}
int emulatedFutexWake(void* addr, int count, uint32_t waitMask) {
auto& bucket = EmulatedFutexBucket::bucketFor(addr);
+ std::unique_lock<std::mutex> bucketLock(bucket.mutex_);
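+  // the bucket lock is held for the entire scan, so no waiter can time
+  // out and unlink its node from under us while we iterate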
int numAwoken = 0;
- boost::intrusive::list<EmulatedFutexWaitNode> deferredWakeups;
-
- {
- std::unique_lock<std::mutex> lock(bucket.mutex_);
-
- for (auto iter = bucket.waiters_.begin();
- numAwoken < count && iter != bucket.waiters_.end(); ) {
- auto current = iter;
- auto& node = *iter++;
- if (node.addr_ == addr && (node.waitMask_ & waitMask) != 0) {
- // We unlink, but waiter destroys the node. We must signal timed
- // waiters under the lock, to avoid a race where we release the lock,
- // the waiter times out and deletes the node, and then we try to
- // signal it. This problem doesn't exist for unbounded waiters,
- // so for them we optimize their wakeup by releasing the lock first.
- bucket.waiters_.erase(current);
- if (node.hasTimeout_) {
- node.signaled_ = true;
- node.cond_.notify_one();
- } else {
- deferredWakeups.push_back(node);
- }
- ++numAwoken;
- }
+ for (auto iter = bucket.waiters_.begin();
+ numAwoken < count && iter != bucket.waiters_.end(); ) {
+ auto current = iter;
+ auto& node = *iter++;
+ if (node.addr_ == addr && (node.waitMask_ & waitMask) != 0) {
+ ++numAwoken;
+
+      // we unlink here, but the waiter owns the node and will destroy it
+ bucket.waiters_.erase(current);
+
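+      // set the flag and notify while holding the node lock; the waiter
+      // can't observe signaled_ and destroy the node until we release
+      // node.mutex_, so the notify never touches a destroyed node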
+ std::unique_lock<std::mutex> nodeLock(node.mutex_);
+ node.signaled_ = true;
+ node.cond_.notify_one();
}
}
-
- while (!deferredWakeups.empty()) {
- auto& node = deferredWakeups.front();
- deferredWakeups.pop_front();
- node.signaled_ = true;
- node.cond_.notify_one();
- }
-
return numAwoken;
}
time_point<system_clock>* absSystemTime,
time_point<steady_clock>* absSteadyTime,
uint32_t waitMask) {
- bool hasTimeout = absSystemTime != nullptr || absSteadyTime != nullptr;
- EmulatedFutexWaitNode node(addr, waitMask, hasTimeout);
-
auto& bucket = EmulatedFutexBucket::bucketFor(addr);
- std::unique_lock<std::mutex> lock(bucket.mutex_);
+ EmulatedFutexWaitNode node(addr, waitMask);
- uint32_t actual;
- memcpy(&actual, addr, sizeof(uint32_t));
- if (actual != expected) {
- return FutexResult::VALUE_CHANGED;
- }
+ {
+ std::unique_lock<std::mutex> bucketLock(bucket.mutex_);
+
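+    // the value check and the enqueue happen under the same bucket lock,
+    // making them atomic with respect to wakers; a futexWake that follows
+    // a store to *addr can't slip in between our check and our push_back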
+ uint32_t actual;
+ memcpy(&actual, addr, sizeof(uint32_t));
+ if (actual != expected) {
+ return FutexResult::VALUE_CHANGED;
+ }
+
+ bucket.waiters_.push_back(node);
+ } // bucketLock scope
- bucket.waiters_.push_back(node);
- while (!node.signaled_) {
- std::cv_status status = std::cv_status::no_timeout;
- if (absSystemTime != nullptr) {
- status = node.cond_.wait_until(lock, *absSystemTime);
- } else if (absSteadyTime != nullptr) {
- status = node.cond_.wait_until(lock, *absSteadyTime);
- } else {
- node.cond_.wait(lock);
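+  // sleep without the bucket lock; a waker that slipped in after we
+  // enqueued has already set signaled_, which the loop below checks
+  // before its first wait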
+ std::cv_status status = std::cv_status::no_timeout;
+ {
+ std::unique_lock<std::mutex> nodeLock(node.mutex_);
+ while (!node.signaled_ && status != std::cv_status::timeout) {
+ if (absSystemTime != nullptr) {
+ status = node.cond_.wait_until(nodeLock, *absSystemTime);
+ } else if (absSteadyTime != nullptr) {
+ status = node.cond_.wait_until(nodeLock, *absSteadyTime);
+ } else {
+ node.cond_.wait(nodeLock);
+ }
}
+ } // nodeLock scope
- if (status == std::cv_status::timeout) {
+ if (status == std::cv_status::timeout) {
+    // it only counts as a timeout if we can unlink the node while it is
+    // still unsignaled
+ std::unique_lock<std::mutex> bucketLock(bucket.mutex_);
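+    // if signaled_ is already set, the waker has unlinked us and counted
+    // this thread in its numAwoken, so the wait is treated as a wakeup
+    // rather than a timeout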
+ if (!node.signaled_) {
bucket.waiters_.erase(bucket.waiters_.iterator_to(node));
return FutexResult::TIMEDOUT;
}
DSched::join(thr);
}
-template<template<typename> class Atom>
-void run_wait_until_tests();
+template <template<typename> class Atom, typename Clock>
+void liveClockWaitUntilTests() {
+ Futex<Atom> f(0);
-template <typename Clock>
-void stdAtomicWaitUntilTests() {
- Futex<std::atomic> f(0);
-
- auto thrA = DSched::thread([&]{
- while (true) {
- typename Clock::time_point nowPlus2s = Clock::now() + seconds(2);
- auto res = f.futexWaitUntil(0, nowPlus2s);
- EXPECT_TRUE(res == FutexResult::TIMEDOUT || res == FutexResult::AWOKEN);
- if (res == FutexResult::AWOKEN) {
- break;
+ for (int stress = 0; stress < 1000; ++stress) {
+ auto fp = &f; // workaround for t5336595
+ auto thrA = DSched::thread([fp,stress]{
+ while (true) {
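+        // deadlines range from 1us up to ~0.5s (1 << 19 us), so the wake
+        // races the timeout from both sides across iterations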
+ auto deadline = Clock::now() + microseconds(1 << (stress % 20));
+ auto res = fp->futexWaitUntil(0, deadline);
+ EXPECT_TRUE(res == FutexResult::TIMEDOUT || res == FutexResult::AWOKEN);
+ if (res == FutexResult::AWOKEN) {
+ break;
+ }
}
+ });
+
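+    // the waiter may not have enqueued itself yet, so retry until the
+    // wake actually lands on it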
+ while (f.futexWake() != 1) {
+ std::this_thread::yield();
}
- });
- while (f.futexWake() != 1) {
- std::this_thread::yield();
+ DSched::join(thrA);
}
- DSched::join(thrA);
-
auto start = Clock::now();
EXPECT_EQ(f.futexWaitUntil(0, start + milliseconds(100)),
FutexResult::TIMEDOUT);
LOG(INFO) << "Futex wait timed out after waiting for "
<< duration_cast<milliseconds>(Clock::now() - start).count()
- << "ms";
+ << "ms, should be ~100ms";
}
template <typename Clock>
EXPECT_TRUE(res == FutexResult::TIMEDOUT || res == FutexResult::INTERRUPTED);
}
-template <>
-void run_wait_until_tests<std::atomic>() {
- stdAtomicWaitUntilTests<system_clock>();
- stdAtomicWaitUntilTests<steady_clock>();
+template <template<typename> class Atom>
+void run_wait_until_tests() {
+ liveClockWaitUntilTests<Atom, system_clock>();
+ liveClockWaitUntilTests<Atom, steady_clock>();
}
template <>
run_wait_until_tests<std::atomic>();
}
+TEST(Futex, basic_emulated) {
+ run_basic_tests<EmulatedFutexAtomic>();
+ run_wait_until_tests<EmulatedFutexAtomic>();
+}
+
TEST(Futex, basic_deterministic) {
DSched sched(DSched::uniform(0));
run_basic_tests<DeterministicAtomic>();