From: Nathan Bronson Date: Thu, 9 Oct 2014 02:44:16 +0000 (-0700) Subject: add per node mutex for emulated futex X-Git-Tag: v0.22.0~289 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=8961aec70edc04d3e9f43fb28cc73d084896d8e1;p=folly.git add per node mutex for emulated futex Summary: The emulated futex (not used on Linux) has an optimization to defer notification until after the bucket mutex has been unlocked, to avoid lock collisions between the waiter and waker. That code will have a use-after-free problem if the waiter's condition_variable has a spurious wakeup, which is allowed by the spec. That code also doesn't do anything about contention between multiple waiters. This diff adds a mutex to each wait node, relieving the waiters from having to acquire the bucket lock on wakeup. Rather than trying to perform a racy late notification, we just make sure to release the node lock immediately after calling notify_one, which seems to have the desired effect. Test Plan: 1) existing unit tests 2) new unit tests Reviewed By: delong.j@fb.com Subscribers: boseant, njormrod FB internal diff: D1602360 --- diff --git a/folly/detail/Futex.cpp b/folly/detail/Futex.cpp index 489cec39..d1861449 100644 --- a/folly/detail/Futex.cpp +++ b/folly/detail/Futex.cpp @@ -123,17 +123,24 @@ FutexResult nativeFutexWaitImpl(void* addr, /////////////////////////////////////////////////////// // compatibility implementation using standard C++ API +// Our emulated futex uses 4096 lists of wait nodes. There are two levels +// of locking: a per-list mutex that controls access to the list and a +// per-node mutex, condvar, and bool that are used for the actual wakeups. +// The per-node mutex allows us to do precise wakeups without thundering +// herds. + struct EmulatedFutexWaitNode : public boost::intrusive::list_base_hook<> { void* const addr_; const uint32_t waitMask_; - bool hasTimeout_; + + // tricky: hold both bucket and node mutex to write, either to read bool signaled_; + std::mutex mutex_; std::condition_variable cond_; - EmulatedFutexWaitNode(void* addr, uint32_t waitMask, bool hasTimeout) + EmulatedFutexWaitNode(void* addr, uint32_t waitMask) : addr_(addr) , waitMask_(waitMask) - , hasTimeout_(hasTimeout) , signaled_(false) { } @@ -162,42 +169,24 @@ std::once_flag EmulatedFutexBucket::gBucketInit; int emulatedFutexWake(void* addr, int count, uint32_t waitMask) { auto& bucket = EmulatedFutexBucket::bucketFor(addr); + std::unique_lock bucketLock(bucket.mutex_); int numAwoken = 0; - boost::intrusive::list deferredWakeups; - - { - std::unique_lock lock(bucket.mutex_); - - for (auto iter = bucket.waiters_.begin(); - numAwoken < count && iter != bucket.waiters_.end(); ) { - auto current = iter; - auto& node = *iter++; - if (node.addr_ == addr && (node.waitMask_ & waitMask) != 0) { - // We unlink, but waiter destroys the node. We must signal timed - // waiters under the lock, to avoid a race where we release the lock, - // the waiter times out and deletes the node, and then we try to - // signal it. This problem doesn't exist for unbounded waiters, - // so for them we optimize their wakeup by releasing the lock first. - bucket.waiters_.erase(current); - if (node.hasTimeout_) { - node.signaled_ = true; - node.cond_.notify_one(); - } else { - deferredWakeups.push_back(node); - } - ++numAwoken; - } + for (auto iter = bucket.waiters_.begin(); + numAwoken < count && iter != bucket.waiters_.end(); ) { + auto current = iter; + auto& node = *iter++; + if (node.addr_ == addr && (node.waitMask_ & waitMask) != 0) { + ++numAwoken; + + // we unlink, but waiter destroys the node + bucket.waiters_.erase(current); + + std::unique_lock nodeLock(node.mutex_); + node.signaled_ = true; + node.cond_.notify_one(); } } - - while (!deferredWakeups.empty()) { - auto& node = deferredWakeups.front(); - deferredWakeups.pop_front(); - node.signaled_ = true; - node.cond_.notify_one(); - } - return numAwoken; } @@ -207,30 +196,39 @@ FutexResult emulatedFutexWaitImpl( time_point* absSystemTime, time_point* absSteadyTime, uint32_t waitMask) { - bool hasTimeout = absSystemTime != nullptr || absSteadyTime != nullptr; - EmulatedFutexWaitNode node(addr, waitMask, hasTimeout); - auto& bucket = EmulatedFutexBucket::bucketFor(addr); - std::unique_lock lock(bucket.mutex_); + EmulatedFutexWaitNode node(addr, waitMask); - uint32_t actual; - memcpy(&actual, addr, sizeof(uint32_t)); - if (actual != expected) { - return FutexResult::VALUE_CHANGED; - } + { + std::unique_lock bucketLock(bucket.mutex_); + + uint32_t actual; + memcpy(&actual, addr, sizeof(uint32_t)); + if (actual != expected) { + return FutexResult::VALUE_CHANGED; + } + + bucket.waiters_.push_back(node); + } // bucketLock scope - bucket.waiters_.push_back(node); - while (!node.signaled_) { - std::cv_status status = std::cv_status::no_timeout; - if (absSystemTime != nullptr) { - status = node.cond_.wait_until(lock, *absSystemTime); - } else if (absSteadyTime != nullptr) { - status = node.cond_.wait_until(lock, *absSteadyTime); - } else { - node.cond_.wait(lock); + std::cv_status status = std::cv_status::no_timeout; + { + std::unique_lock nodeLock(node.mutex_); + while (!node.signaled_ && status != std::cv_status::timeout) { + if (absSystemTime != nullptr) { + status = node.cond_.wait_until(nodeLock, *absSystemTime); + } else if (absSteadyTime != nullptr) { + status = node.cond_.wait_until(nodeLock, *absSteadyTime); + } else { + node.cond_.wait(nodeLock); + } } + } // nodeLock scope - if (status == std::cv_status::timeout) { + if (status == std::cv_status::timeout) { + // it's not really a timeout until we unlink the unsignaled node + std::unique_lock bucketLock(bucket.mutex_); + if (!node.signaled_) { bucket.waiters_.erase(bucket.waiters_.iterator_to(node)); return FutexResult::TIMEDOUT; } diff --git a/folly/test/FutexTest.cpp b/folly/test/FutexTest.cpp index c1063cf8..3c5c63d7 100644 --- a/folly/test/FutexTest.cpp +++ b/folly/test/FutexTest.cpp @@ -54,36 +54,36 @@ void run_basic_tests() { DSched::join(thr); } -template class Atom> -void run_wait_until_tests(); +template class Atom, typename Clock> +void liveClockWaitUntilTests() { + Futex f(0); -template -void stdAtomicWaitUntilTests() { - Futex f(0); - - auto thrA = DSched::thread([&]{ - while (true) { - typename Clock::time_point nowPlus2s = Clock::now() + seconds(2); - auto res = f.futexWaitUntil(0, nowPlus2s); - EXPECT_TRUE(res == FutexResult::TIMEDOUT || res == FutexResult::AWOKEN); - if (res == FutexResult::AWOKEN) { - break; + for (int stress = 0; stress < 1000; ++stress) { + auto fp = &f; // workaround for t5336595 + auto thrA = DSched::thread([fp,stress]{ + while (true) { + auto deadline = Clock::now() + microseconds(1 << (stress % 20)); + auto res = fp->futexWaitUntil(0, deadline); + EXPECT_TRUE(res == FutexResult::TIMEDOUT || res == FutexResult::AWOKEN); + if (res == FutexResult::AWOKEN) { + break; + } } + }); + + while (f.futexWake() != 1) { + std::this_thread::yield(); } - }); - while (f.futexWake() != 1) { - std::this_thread::yield(); + DSched::join(thrA); } - DSched::join(thrA); - auto start = Clock::now(); EXPECT_EQ(f.futexWaitUntil(0, start + milliseconds(100)), FutexResult::TIMEDOUT); LOG(INFO) << "Futex wait timed out after waiting for " << duration_cast(Clock::now() - start).count() - << "ms"; + << "ms, should be ~100ms"; } template @@ -96,10 +96,10 @@ void deterministicAtomicWaitUntilTests() { EXPECT_TRUE(res == FutexResult::TIMEDOUT || res == FutexResult::INTERRUPTED); } -template <> -void run_wait_until_tests() { - stdAtomicWaitUntilTests(); - stdAtomicWaitUntilTests(); +template class Atom> +void run_wait_until_tests() { + liveClockWaitUntilTests(); + liveClockWaitUntilTests(); } template <> @@ -177,6 +177,11 @@ TEST(Futex, basic_live) { run_wait_until_tests(); } +TEST(Futex, basic_emulated) { + run_basic_tests(); + run_wait_until_tests(); +} + TEST(Futex, basic_deterministic) { DSched sched(DSched::uniform(0)); run_basic_tests();