From a95dbe434014196d44fa03d0f60d5efef662fd20 Mon Sep 17 00:00:00 2001 From: Delyan Kratunov Date: Thu, 28 May 2015 17:29:46 -0700 Subject: [PATCH] Extract TurnSequencer to detail/TurnSequencer Summary: A completely mechanical transformation that moves TurnSequencer into its own header, to be used by other collections. Test Plan: Existing tests. Reviewed By: jmkaldor@fb.com Subscribers: folly-diffs@, yfeldblum, chalfant FB internal diff: D2065108 Signature: t1:2065108:1431474613:a3a9d063ebd2bedb31abb37be5fd33f0fb3eca6a --- folly/MPMCQueue.h | 203 +----------------------------- folly/Makefile.am | 1 + folly/detail/TurnSequencer.h | 231 +++++++++++++++++++++++++++++++++++ 3 files changed, 233 insertions(+), 202 deletions(-) create mode 100644 folly/detail/TurnSequencer.h diff --git a/folly/MPMCQueue.h b/folly/MPMCQueue.h index 2ab40033..a2681954 100644 --- a/folly/MPMCQueue.h +++ b/folly/MPMCQueue.h @@ -27,7 +27,7 @@ #include #include -#include +#include namespace folly { @@ -564,207 +564,6 @@ class MPMCQueue : boost::noncopyable { namespace detail { -/// A TurnSequencer allows threads to order their execution according to -/// a monotonically increasing (with wraparound) "turn" value. The two -/// operations provided are to wait for turn T, and to move to the next -/// turn. Every thread that is waiting for T must have arrived before -/// that turn is marked completed (for MPMCQueue only one thread waits -/// for any particular turn, so this is trivially true). -/// -/// TurnSequencer's state_ holds 26 bits of the current turn (shifted -/// left by 6), along with a 6 bit saturating value that records the -/// maximum waiter minus the current turn. Wraparound of the turn space -/// is expected and handled. This allows us to atomically adjust the -/// number of outstanding waiters when we perform a FUTEX_WAKE operation. 
-/// Compare this strategy to sem_t's separate num_waiters field, which -/// isn't decremented until after the waiting thread gets scheduled, -/// during which time more enqueues might have occurred and made pointless -/// FUTEX_WAKE calls. -/// -/// TurnSequencer uses futex() directly. It is optimized for the -/// case that the highest awaited turn is 32 or less higher than the -/// current turn. We use the FUTEX_WAIT_BITSET variant, which lets -/// us embed 32 separate wakeup channels in a single futex. See -/// http://locklessinc.com/articles/futex_cheat_sheet for a description. -/// -/// We only need to keep exact track of the delta between the current -/// turn and the maximum waiter for the 32 turns that follow the current -/// one, because waiters at turn t+32 will be awoken at turn t. At that -/// point they can then adjust the delta using the higher base. Since we -/// need to encode waiter deltas of 0 to 32 inclusive, we use 6 bits. -/// We actually store waiter deltas up to 63, since that might reduce -/// the number of CAS operations a tiny bit. -/// -/// To avoid some futex() calls entirely, TurnSequencer uses an adaptive -/// spin cutoff before waiting. The overheads (and convergence rate) -/// of separately tracking the spin cutoff for each TurnSequencer would -/// be prohibitive, so the actual storage is passed in as a parameter and -/// updated atomically. This also lets the caller use different adaptive -/// cutoffs for different operations (read versus write, for example). -/// To avoid contention, the spin cutoff is only updated when requested -/// by the caller. -template class Atom> -struct TurnSequencer { - explicit TurnSequencer(const uint32_t firstTurn = 0) noexcept - : state_(encode(firstTurn << kTurnShift, 0)) - {} - - /// Returns true iff a call to waitForTurn(turn, ...) 
won't block - bool isTurn(const uint32_t turn) const noexcept { - auto state = state_.load(std::memory_order_acquire); - return decodeCurrentSturn(state) == (turn << kTurnShift); - } - - // Internally we always work with shifted turn values, which makes the - // truncation and wraparound work correctly. This leaves us bits at - // the bottom to store the number of waiters. We call shifted turns - // "sturns" inside this class. - - /// Blocks the current thread until turn has arrived. If - /// updateSpinCutoff is true then this will spin for up to kMaxSpins tries - /// before blocking and will adjust spinCutoff based on the results, - /// otherwise it will spin for at most spinCutoff spins. - void waitForTurn(const uint32_t turn, - Atom& spinCutoff, - const bool updateSpinCutoff) noexcept { - uint32_t prevThresh = spinCutoff.load(std::memory_order_relaxed); - const uint32_t effectiveSpinCutoff = - updateSpinCutoff || prevThresh == 0 ? kMaxSpins : prevThresh; - - uint32_t tries; - const uint32_t sturn = turn << kTurnShift; - for (tries = 0; ; ++tries) { - uint32_t state = state_.load(std::memory_order_acquire); - uint32_t current_sturn = decodeCurrentSturn(state); - if (current_sturn == sturn) { - break; - } - - // wrap-safe version of assert(current_sturn < sturn) - assert(sturn - current_sturn < std::numeric_limits::max() / 2); - - // the first effectSpinCutoff tries are spins, after that we will - // record ourself as a waiter and block with futexWait - if (tries < effectiveSpinCutoff) { - asm volatile ("pause"); - continue; - } - - uint32_t current_max_waiter_delta = decodeMaxWaitersDelta(state); - uint32_t our_waiter_delta = (sturn - current_sturn) >> kTurnShift; - uint32_t new_state; - if (our_waiter_delta <= current_max_waiter_delta) { - // state already records us as waiters, probably because this - // isn't our first time around this loop - new_state = state; - } else { - new_state = encode(current_sturn, our_waiter_delta); - if (state != new_state && - 
!state_.compare_exchange_strong(state, new_state)) { - continue; - } - } - state_.futexWait(new_state, futexChannel(turn)); - } - - if (updateSpinCutoff || prevThresh == 0) { - // if we hit kMaxSpins then spinning was pointless, so the right - // spinCutoff is kMinSpins - uint32_t target; - if (tries >= kMaxSpins) { - target = kMinSpins; - } else { - // to account for variations, we allow ourself to spin 2*N when - // we think that N is actually required in order to succeed - target = std::min(kMaxSpins, - std::max(kMinSpins, tries * 2)); - } - - if (prevThresh == 0) { - // bootstrap - spinCutoff.store(target); - } else { - // try once, keep moving if CAS fails. Exponential moving average - // with alpha of 7/8 - // Be careful that the quantity we add to prevThresh is signed. - spinCutoff.compare_exchange_weak( - prevThresh, prevThresh + int(target - prevThresh) / 8); - } - } - } - - /// Unblocks a thread running waitForTurn(turn + 1) - void completeTurn(const uint32_t turn) noexcept { - uint32_t state = state_.load(std::memory_order_acquire); - while (true) { - assert(state == encode(turn << kTurnShift, decodeMaxWaitersDelta(state))); - uint32_t max_waiter_delta = decodeMaxWaitersDelta(state); - uint32_t new_state = encode( - (turn + 1) << kTurnShift, - max_waiter_delta == 0 ? 0 : max_waiter_delta - 1); - if (state_.compare_exchange_strong(state, new_state)) { - if (max_waiter_delta != 0) { - state_.futexWake(std::numeric_limits::max(), - futexChannel(turn + 1)); - } - break; - } - // failing compare_exchange_strong updates first arg to the value - // that caused the failure, so no need to reread state_ - } - } - - /// Returns the least-most significant byte of the current uncompleted - /// turn. The full 32 bit turn cannot be recovered. 
- uint8_t uncompletedTurnLSB() const noexcept { - return state_.load(std::memory_order_acquire) >> kTurnShift; - } - - private: - enum : uint32_t { - /// kTurnShift counts the bits that are stolen to record the delta - /// between the current turn and the maximum waiter. It needs to be big - /// enough to record wait deltas of 0 to 32 inclusive. Waiters more - /// than 32 in the future will be woken up 32*n turns early (since - /// their BITSET will hit) and will adjust the waiter count again. - /// We go a bit beyond and let the waiter count go up to 63, which - /// is free and might save us a few CAS - kTurnShift = 6, - kWaitersMask = (1 << kTurnShift) - 1, - - /// The minimum spin count that we will adaptively select - kMinSpins = 20, - - /// The maximum spin count that we will adaptively select, and the - /// spin count that will be used when probing to get a new data point - /// for the adaptation - kMaxSpins = 2000, - }; - - /// This holds both the current turn, and the highest waiting turn, - /// stored as (current_turn << 6) | min(63, max(waited_turn - current_turn)) - Futex state_; - - /// Returns the bitmask to pass futexWait or futexWake when communicating - /// about the specified turn - int futexChannel(uint32_t turn) const noexcept { - return 1 << (turn & 31); - } - - uint32_t decodeCurrentSturn(uint32_t state) const noexcept { - return state & ~kWaitersMask; - } - - uint32_t decodeMaxWaitersDelta(uint32_t state) const noexcept { - return state & kWaitersMask; - } - - uint32_t encode(uint32_t currentSturn, uint32_t maxWaiterD) const noexcept { - return currentSturn | std::min(uint32_t{ kWaitersMask }, maxWaiterD); - } -}; - - /// SingleElementQueue implements a blocking queue that holds at most one /// item, and that requires its users to assign incrementing identifiers /// (turns) to each enqueue and dequeue operation. 
Note that the turns diff --git a/folly/Makefile.am b/folly/Makefile.am index 38776bdb..f5546b56 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -59,6 +59,7 @@ nobase_follyinclude_HEADERS = \ detail/SpinLockImpl.h \ detail/Stats.h \ detail/ThreadLocalDetail.h \ + detail/TurnSequencer.h \ detail/UncaughtExceptionCounter.h \ Demangle.h \ DiscriminatedPtr.h \ diff --git a/folly/detail/TurnSequencer.h b/folly/detail/TurnSequencer.h new file mode 100644 index 00000000..aafeaa16 --- /dev/null +++ b/folly/detail/TurnSequencer.h @@ -0,0 +1,231 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include + +namespace folly { + +namespace detail { + +/// A TurnSequencer allows threads to order their execution according to +/// a monotonically increasing (with wraparound) "turn" value. The two +/// operations provided are to wait for turn T, and to move to the next +/// turn. Every thread that is waiting for T must have arrived before +/// that turn is marked completed (for MPMCQueue only one thread waits +/// for any particular turn, so this is trivially true). +/// +/// TurnSequencer's state_ holds 26 bits of the current turn (shifted +/// left by 6), along with a 6 bit saturating value that records the +/// maximum waiter minus the current turn. Wraparound of the turn space +/// is expected and handled. 
This allows us to atomically adjust the +/// number of outstanding waiters when we perform a FUTEX_WAKE operation. +/// Compare this strategy to sem_t's separate num_waiters field, which +/// isn't decremented until after the waiting thread gets scheduled, +/// during which time more enqueues might have occurred and made pointless +/// FUTEX_WAKE calls. +/// +/// TurnSequencer uses futex() directly. It is optimized for the +/// case that the highest awaited turn is at most 32 higher than the +/// current turn. We use the FUTEX_WAIT_BITSET variant, which lets +/// us embed 32 separate wakeup channels in a single futex. See +/// http://locklessinc.com/articles/futex_cheat_sheet for a description. +/// +/// We only need to keep exact track of the delta between the current +/// turn and the maximum waiter for the 32 turns that follow the current +/// one, because waiters at turn t+32 will be awoken at turn t. At that +/// point they can then adjust the delta using the higher base. Since we +/// need to encode waiter deltas of 0 to 32 inclusive, we use 6 bits. +/// We actually store waiter deltas up to 63, since that might reduce +/// the number of CAS operations a tiny bit. +/// +/// To avoid some futex() calls entirely, TurnSequencer uses an adaptive +/// spin cutoff before waiting. The overheads (and convergence rate) +/// of separately tracking the spin cutoff for each TurnSequencer would +/// be prohibitive, so the actual storage is passed in as a parameter and +/// updated atomically. This also lets the caller use different adaptive +/// cutoffs for different operations (read versus write, for example). +/// To avoid contention, the spin cutoff is only updated when requested +/// by the caller. +template class Atom> +struct TurnSequencer { + explicit TurnSequencer(const uint32_t firstTurn = 0) noexcept + : state_(encode(firstTurn << kTurnShift, 0)) + {} + + /// Returns true iff a call to waitForTurn(turn, ...) 
won't block + bool isTurn(const uint32_t turn) const noexcept { + auto state = state_.load(std::memory_order_acquire); + return decodeCurrentSturn(state) == (turn << kTurnShift); + } + + // Internally we always work with shifted turn values, which makes the + // truncation and wraparound work correctly. This leaves us bits at + // the bottom to store the number of waiters. We call shifted turns + // "sturns" inside this class. + + /// Blocks the current thread until turn has arrived. If + /// updateSpinCutoff is true then this will spin for up to kMaxSpins tries + /// before blocking and will adjust spinCutoff based on the results, + /// otherwise it will spin for at most spinCutoff spins. + void waitForTurn(const uint32_t turn, + Atom& spinCutoff, + const bool updateSpinCutoff) noexcept { + uint32_t prevThresh = spinCutoff.load(std::memory_order_relaxed); + const uint32_t effectiveSpinCutoff = + updateSpinCutoff || prevThresh == 0 ? kMaxSpins : prevThresh; + + uint32_t tries; + const uint32_t sturn = turn << kTurnShift; + for (tries = 0; ; ++tries) { + uint32_t state = state_.load(std::memory_order_acquire); + uint32_t current_sturn = decodeCurrentSturn(state); + if (current_sturn == sturn) { + break; + } + + // wrap-safe version of assert(current_sturn < sturn) + assert(sturn - current_sturn < std::numeric_limits::max() / 2); + + // the first effectSpinCutoff tries are spins, after that we will + // record ourself as a waiter and block with futexWait + if (tries < effectiveSpinCutoff) { + asm volatile ("pause"); + continue; + } + + uint32_t current_max_waiter_delta = decodeMaxWaitersDelta(state); + uint32_t our_waiter_delta = (sturn - current_sturn) >> kTurnShift; + uint32_t new_state; + if (our_waiter_delta <= current_max_waiter_delta) { + // state already records us as waiters, probably because this + // isn't our first time around this loop + new_state = state; + } else { + new_state = encode(current_sturn, our_waiter_delta); + if (state != new_state && 
!state_.compare_exchange_strong(state, new_state)) { + continue; + } + } + state_.futexWait(new_state, futexChannel(turn)); + } + + if (updateSpinCutoff || prevThresh == 0) { + // if we hit kMaxSpins then spinning was pointless, so the right + // spinCutoff is kMinSpins + uint32_t target; + if (tries >= kMaxSpins) { + target = kMinSpins; + } else { + // to account for variations, we allow ourself to spin 2*N when + // we think that N is actually required in order to succeed + target = std::min(kMaxSpins, + std::max(kMinSpins, tries * 2)); + } + + if (prevThresh == 0) { + // bootstrap + spinCutoff.store(target); + } else { + // try once, keep moving if CAS fails. Exponential moving average + // with alpha of 7/8 (7/8 previous value + 1/8 new target). + // Be careful that the quantity we add to prevThresh is signed. + spinCutoff.compare_exchange_weak( + prevThresh, prevThresh + int(target - prevThresh) / 8); + } + } + } + + /// Advances to turn + 1, unblocking any thread in waitForTurn(turn + 1) + void completeTurn(const uint32_t turn) noexcept { + uint32_t state = state_.load(std::memory_order_acquire); + while (true) { + assert(state == encode(turn << kTurnShift, decodeMaxWaitersDelta(state))); + uint32_t max_waiter_delta = decodeMaxWaitersDelta(state); + uint32_t new_state = encode( + (turn + 1) << kTurnShift, + max_waiter_delta == 0 ? 0 : max_waiter_delta - 1); + if (state_.compare_exchange_strong(state, new_state)) { + if (max_waiter_delta != 0) { + state_.futexWake(std::numeric_limits::max(), + futexChannel(turn + 1)); + } + break; + } + // failing compare_exchange_strong updates first arg to the value + // that caused the failure, so no need to reread state_ + } + } + + /// Returns the least significant byte of the current uncompleted + /// turn. The full 32 bit turn cannot be recovered. 
+ uint8_t uncompletedTurnLSB() const noexcept { + return state_.load(std::memory_order_acquire) >> kTurnShift; + } + + private: + enum : uint32_t { + /// kTurnShift counts the bits that are stolen to record the delta + /// between the current turn and the maximum waiter. It needs to be big + /// enough to record wait deltas of 0 to 32 inclusive. Waiters more + /// than 32 in the future will be woken up 32*n turns early (since + /// their BITSET will hit) and will adjust the waiter count again. + /// We go a bit beyond and let the waiter count go up to 63, which + /// is free and might save us a few CAS + kTurnShift = 6, + kWaitersMask = (1 << kTurnShift) - 1, + + /// The minimum spin count that we will adaptively select + kMinSpins = 20, + + /// The maximum spin count that we will adaptively select, and the + /// spin count that will be used when probing to get a new data point + /// for the adaptation + kMaxSpins = 2000, + }; + + /// This holds both the current turn, and the highest waiting turn, + /// stored as (current_turn << 6) | min(63, max(waited_turn - current_turn)) + Futex state_; + + /// Returns the bitmask to pass futexWait or futexWake when communicating + /// about the specified turn + int futexChannel(uint32_t turn) const noexcept { + return 1 << (turn & 31); + } + + uint32_t decodeCurrentSturn(uint32_t state) const noexcept { + return state & ~kWaitersMask; + } + + uint32_t decodeMaxWaitersDelta(uint32_t state) const noexcept { + return state & kWaitersMask; + } + + uint32_t encode(uint32_t currentSturn, uint32_t maxWaiterD) const noexcept { + return currentSturn | std::min(uint32_t{ kWaitersMask }, maxWaiterD); + } +}; + +} // namespace detail +} // namespace folly -- 2.34.1