From 9ce176b1e0587a698d5930b70be86ca63a139af8 Mon Sep 17 00:00:00 2001 From: Maged Michael Date: Wed, 30 Aug 2017 12:53:16 -0700 Subject: [PATCH] Dynamic MPMCQueue: Eliminate cases of enqueue indefinite blocking and failure in the extensible version that are impossible under the default pre-allocated version Summary: Currently under the extensible version (Dynamic == true), some enqueue operations may block indefinitely or fail (return false) even though such outcomes are impossible under the default (Dynamic == false) pre-allocated version. This diff eliminates such cases by changing the algorithms for the extensible version. Some of the high-level changes: - The offset formula for an expansion guarantees that every enqueue operation left behind in a closed array has an existing dequeue operation that unblocks it. The old formula was 1 + max(head, tail). The new formula is max(head, current offset) + current capacity. - Conditional operations validate state after the success of CAS. Reviewed By: djwatson Differential Revision: D5701013 fbshipit-source-id: 4917c5b35b7e2a2fddfd2e11fb5aeb478502137c --- folly/MPMCQueue.h | 360 +++++++++++++++++++++-------------- folly/test/MPMCQueueTest.cpp | 40 +++- 2 files changed, 247 insertions(+), 153 deletions(-) diff --git a/folly/MPMCQueue.h b/folly/MPMCQueue.h index b3564126..743de144 100644 --- a/folly/MPMCQueue.h +++ b/folly/MPMCQueue.h @@ -118,8 +118,22 @@ class MPMCQueue : public detail::MPMCQueueBase> { /// The dynamic version of MPMCQueue allows dynamic expansion of queue /// capacity, such that a queue may start with a smaller capacity than -/// specified and expand only if needed. Users may optionally specify -/// the initial capacity and the expansion multiplier. +/// specified and expand if needed up to the specified capacity. +/// Shrinking is not supported at this point. +/// +/// Users may optionally specify the initial capacity and the +/// expansion multiplier. Otherwise default values are used. 
+/// +/// Operations on the dynamic version have the same semantics as for +/// the default fixed-size version, except that the total queue +/// capacity can temporarily exceed the specified capacity when there +/// are lagging consumers that haven't yet consumed all the elements +/// in closed arrays. Users should not rely on the capacity of dynamic +/// queues for synchronization, e.g., they should not expect that a +/// thread will definitely block on a call to blockingWrite() when the +/// queue size is known to be equal to its capacity. +/// +/// Design Overview: /// /// The design uses a seqlock to enforce mutual exclusion among /// expansion attempts. Regular operations read up-to-date queue @@ -144,7 +158,7 @@ class MPMCQueue : public detail::MPMCQueueBase> { /// closed arrays instead of the current one. Information about closed /// slots arrays (array address, capacity, stride, and offset) is /// maintained in a logarithmic-sized structure. Each entry in that -/// structure never need to be changed once set. The number of closed +/// structure never needs to be changed once set. The number of closed /// arrays is half the value of the seqlock (when unlocked). /// /// The acquisition of the seqlock to perform an expansion does not @@ -154,21 +168,6 @@ class MPMCQueue : public detail::MPMCQueueBase> { /// seqlock read-only section (and hence obtained information for /// older closed arrays). /// -/// Note that the total queue capacity can temporarily exceed the -/// specified capacity when there are lagging consumers that haven't -/// yet consumed all the elements in closed arrays. Users should not -/// rely on the capacity of dynamic queues for synchronization, e.g., -/// they should not expect that a thread will definitely block on a -/// call to blockingWrite() when the queue size is known to be equal -/// to its capacity. 
-/// -/// Note that some writeIfNotFull() and tryWriteUntil() operations may -/// fail even if the size of the queue is less than its maximum -/// capacity and despite the success of expansion, if the operation -/// happens to acquire a ticket that belongs to a closed array. This -/// is a transient condition. Typically, one or two ticket values may -/// be subject to such condition per expansion. -/// /// The dynamic version is a partial specialization of MPMCQueue with /// Dynamic == true template class Atom> @@ -272,36 +271,31 @@ class MPMCQueue : int stride; uint64_t state; uint64_t offset; - do { - if (!trySeqlockReadSection(state, slots, cap, stride)) { + while (true) { + while (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) { asm_volatile_pause(); - continue; } - if (maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride)) { - // There was an expansion after this ticket was issued. + maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); + if (LIKELY(slots[this->idx((ticket - offset), cap, stride)].mayEnqueue( + this->turn(ticket - offset, cap)))) { + // A slot is ready. Fast path. No need to expand. break; } - if (slots[this->idx((ticket-offset), cap, stride)] - .mayEnqueue(this->turn(ticket-offset, cap))) { - // A slot is ready. No need to expand. - break; - } else if (this->popTicket_.load(std::memory_order_relaxed) + cap - > ticket) { + // Slow path + auto head = this->popTicket_.load(std::memory_order_relaxed); + auto avail = std::max(head, offset) + cap; + if (ticket < avail) { // May block, but a pop is in progress. No need to expand. - // Get seqlock read section info again in case an expansion - // occurred with an equal or higher ticket. - continue; - } else { - // May block. See if we can expand. - if (tryExpand(state, cap)) { - // This or another thread started an expansion. Get updated info. - continue; - } else { - // Can't expand. 
- break; - } + break; + } + // Try to expand, otherwise this operation may block + // indefinitely awaiting a consumer to unblock it. + if (!tryExpand(state, cap)) { + // Can't expand. Block. + break; } - } while (true); + // Either this or another thread started an expansion. Get up-to-date info. + } this->enqueueWithTicketBase(ticket-offset, slots, cap, stride, std::forward(args)...); } @@ -313,7 +307,7 @@ class MPMCQueue : int stride; uint64_t state; uint64_t offset; - while (!trySeqlockReadSection(state, slots, cap, stride)) { + while (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) { asm_volatile_pause(); } // If there was an expansion after the corresponding push ticket @@ -323,11 +317,10 @@ class MPMCQueue : } private: - enum { - kSeqlockBits = 6, - kDefaultMinDynamicCapacity = 10, - kDefaultExpansionMultiplier = 10, + kSeqlockBits = 8, + kDefaultMinDynamicCapacity = 16, + kDefaultExpansionMultiplier = 8, }; size_t dmult_; @@ -354,69 +347,86 @@ class MPMCQueue : uint64_t& ticket, Slot*& slots, size_t& cap, int& stride ) noexcept { uint64_t state; - do { + while (true) { ticket = this->pushTicket_.load(std::memory_order_acquire); // A - if (!trySeqlockReadSection(state, slots, cap, stride)) { + if (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) { asm_volatile_pause(); continue; } - // If there was an expansion with offset greater than this ticket, // adjust accordingly uint64_t offset; maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); - - if (slots[this->idx((ticket-offset), cap, stride)] - .mayEnqueue(this->turn(ticket-offset, cap))) { + if (LIKELY(slots[this->idx((ticket - offset), cap, stride)].mayEnqueue( + this->turn(ticket - offset, cap)))) { // A slot is ready. 
- if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) { - // Adjust ticket - ticket -= offset; - return true; - } else { + if (UNLIKELY(!this->pushTicket_.compare_exchange_strong( + ticket, ticket + 1))) { continue; } - } else { - if (ticket != this->pushTicket_.load(std::memory_order_relaxed)) { // B - // Try again. Ticket changed. - continue; + // Validate that state is the same + if (LIKELY(state == this->dstate_.load(std::memory_order_acquire))) { + ticket -= offset; + return true; } - // Likely to block. - // Try to expand unless the ticket is for a closed array - if (offset == getOffset(state)) { - if (tryExpand(state, cap)) { - // This or another thread started an expansion. Get up-to-date info. - continue; + // Slow path - state changed - get up-to-date info for obtained ticket + while (true) { + state = this->dstate_.load(std::memory_order_acquire); + if (trySeqlockReadSection(state, slots, cap, stride)) { + break; } + asm_volatile_pause(); } - return false; + maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); + ticket -= offset; + return true; + } + // slow path - no ready ticket + if (ticket != this->pushTicket_.load(std::memory_order_relaxed)) { // B + // Ticket changed. Start over. + continue; + } + auto head = this->popTicket_.load(std::memory_order_acquire); + auto avail = std::max(head, offset) + cap; + if (ticket < avail) { + // a consumer is in the process of making the slot available + // don't try to expand. Spin if capacity is not + // exhausted. Otherwise return false. + if (cap == this->capacity_) { + return false; + } + asm_volatile_pause(); + continue; } - } while (true); + // Likely to block. Try to expand. + if (tryExpand(state, cap)) { + // This or another thread started an expansion. Get up-to-date info. 
+ continue; + } + // No ready ticket and cannot expand + return false; + } } bool tryObtainPromisedPushTicket( uint64_t& ticket, Slot*& slots, size_t& cap, int& stride ) noexcept { uint64_t state; - do { + while (true) { ticket = this->pushTicket_.load(std::memory_order_acquire); - auto numPops = this->popTicket_.load(std::memory_order_acquire); - if (!trySeqlockReadSection(state, slots, cap, stride)) { + auto head = this->popTicket_.load(std::memory_order_acquire); + if (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) { asm_volatile_pause(); continue; } - - const auto curCap = cap; // If there was an expansion with offset greater than this ticket, // adjust accordingly uint64_t offset; maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); - - int64_t n = ticket - numPops; - - if (n >= static_cast(cap)) { - if ((cap == curCap) && tryExpand(state, cap)) { - // This or another thread started an expansion. Start over. + auto avail = std::max(offset, head) + cap; + if (UNLIKELY(ticket >= avail)) { + if (tryExpand(state, cap)) { + // Space may be available. Start over. continue; } // Can't expand. 
@@ -424,69 +434,110 @@ class MPMCQueue : return false; } - if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) { - // Adjust ticket + if (UNLIKELY((!this->pushTicket_.compare_exchange_strong( + ticket, ticket + 1)))) { + continue; + } + // Validate that state is the same + if (LIKELY(state == this->dstate_.load(std::memory_order_acquire))) { ticket -= offset; return true; } - } while (true); + // Obtained ticket but info is out-of-date - Update info + while (true) { + state = this->dstate_.load(std::memory_order_acquire); + if (trySeqlockReadSection(state, slots, cap, stride)) { + break; + } + asm_volatile_pause(); + } + maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); + ticket -= offset; + return true; + } } bool tryObtainReadyPopTicket( uint64_t& ticket, Slot*& slots, size_t& cap, int& stride ) noexcept { uint64_t state; - do { + while (true) { ticket = this->popTicket_.load(std::memory_order_relaxed); - if (!trySeqlockReadSection(state, slots, cap, stride)) { + if (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) { asm_volatile_pause(); continue; } - // If there was an expansion after the corresponding push ticket // was issued, adjust accordingly uint64_t offset; maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); - - if (slots[this->idx((ticket-offset), cap, stride)] - .mayDequeue(this->turn(ticket-offset, cap))) { - if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) { - // Adjust ticket - ticket -= offset; - return true; - } - } else { + if (UNLIKELY(!slots[this->idx((ticket - offset), cap, stride)].mayDequeue( + this->turn(ticket - offset, cap)))) { return false; } - } while (true); + if (UNLIKELY( + !this->popTicket_.compare_exchange_strong(ticket, ticket + 1))) { + continue; + } + // Validate that state is the same + if (LIKELY(state == this->dstate_.load(std::memory_order_acquire))) { + ticket -= offset; + return true; + } + // Obtained ticket but info is out-of-date - Update info + 
while (true) { + state = this->dstate_.load(std::memory_order_acquire); + if (trySeqlockReadSection(state, slots, cap, stride)) { + break; + } + asm_volatile_pause(); + } + maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); + ticket -= offset; + return true; + } } bool tryObtainPromisedPopTicket( uint64_t& ticket, Slot*& slots, size_t& cap, int& stride ) noexcept { uint64_t state; - do { + while (true) { ticket = this->popTicket_.load(std::memory_order_acquire); auto numPushes = this->pushTicket_.load(std::memory_order_acquire); - if (!trySeqlockReadSection(state, slots, cap, stride)) { + if (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) { asm_volatile_pause(); continue; } - uint64_t offset; // If there was an expansion after the corresponding push // ticket was issued, adjust accordingly maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); - - if (ticket >= numPushes) { + if (UNLIKELY(ticket >= numPushes)) { ticket -= offset; return false; } - if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) { + if (UNLIKELY( + !this->popTicket_.compare_exchange_strong(ticket, ticket + 1))) { + continue; + } + // Validate that state is the same + if (LIKELY(state == this->dstate_.load(std::memory_order_acquire))) { ticket -= offset; return true; } - } while (true); + // Obtained ticket but info is out-of-date - Update info + while (true) { + state = this->dstate_.load(std::memory_order_acquire); + if (trySeqlockReadSection(state, slots, cap, stride)) { + break; + } + asm_volatile_pause(); + } + maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); + ticket -= offset; + return true; + } } /// Enqueues an element with a specific ticket number @@ -498,7 +549,9 @@ class MPMCQueue : uint64_t state; uint64_t offset; - while (!trySeqlockReadSection(state, slots, cap, stride)) {} + while (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) { + asm_volatile_pause(); + } // If there was an expansion after this 
ticket was issued, adjust // accordingly @@ -517,52 +570,56 @@ class MPMCQueue : } /// Try to expand the queue. Returns true if this expansion was - /// successful or a concurent expansion is in progress. Returns + /// successful or a concurrent expansion is in progress. Returns /// false if the queue has reached its maximum capacity or /// allocation has failed. bool tryExpand(const uint64_t state, const size_t cap) noexcept { - if (cap == this->capacity_) { + if (LIKELY(cap == this->capacity_)) { return false; } + return tryExpandWithSeqlock(state, cap); + } + + bool tryExpandWithSeqlock(const uint64_t state, const size_t cap) noexcept { // Acquire seqlock uint64_t oldval = state; assert((state & 1) == 0); - if (this->dstate_.compare_exchange_strong(oldval, state + 1)) { - assert(cap == this->dcapacity_.load()); - uint64_t ticket = 1 + std::max(this->pushTicket_.load(), - this->popTicket_.load()); - size_t newCapacity = - std::min(dmult_ * cap, this->capacity_); - Slot* newSlots = - new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding]; - if (newSlots == nullptr) { - // Expansion failed. 
Restore the seqlock - this->dstate_.store(state); - return false; - } - // Successful expansion - // calculate the current ticket offset - uint64_t offset = getOffset(state); - // calculate index in closed array - int index = getNumClosed(state); - assert((index << 1) < (1 << kSeqlockBits)); - // fill the info for the closed slots array - closed_[index].offset_ = offset; - closed_[index].slots_ = this->dslots_.load(); - closed_[index].capacity_ = cap; - closed_[index].stride_ = this->dstride_.load(); - // update the new slots array info - this->dslots_.store(newSlots); - this->dcapacity_.store(newCapacity); - this->dstride_.store(this->computeStride(newCapacity)); - // Release the seqlock and record the new ticket offset - this->dstate_.store((ticket << kSeqlockBits) + (2 * (index + 1))); - return true; - } else { // failed to acquire seqlock - // Someone acaquired the seqlock. Go back to the caller and get - // up-to-date info. + if (!this->dstate_.compare_exchange_strong(oldval, state + 1)) { + // Failed to acquire seqlock. Another thread acquired it. + // Go back to the caller and get up-to-date info. return true; } + // Write critical section + assert(cap == this->dcapacity_.load()); + auto head = this->popTicket_.load(std::memory_order_acquire); + auto avail = std::max(head, getOffset(state)) + cap; + uint64_t newOffset = avail; + size_t newCapacity = std::min(dmult_ * cap, this->capacity_); + Slot* newSlots = + new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding]; + if (newSlots == nullptr) { + // Expansion failed. 
Restore the seqlock + this->dstate_.store(state); + return false; + } + // Successful expansion + // calculate the current ticket offset + uint64_t offset = getOffset(state); + // calculate index in list of closed slots arrays + int index = getNumClosed(state); + assert((index << 1) < (1 << kSeqlockBits)); + // fill the info for the closed slots array + closed_[index].offset_ = offset; + closed_[index].slots_ = this->dslots_.load(); + closed_[index].capacity_ = cap; + closed_[index].stride_ = this->dstride_.load(); + // update the new slots array info + this->dslots_.store(newSlots); + this->dcapacity_.store(newCapacity); + this->dstride_.store(this->computeStride(newCapacity)); + // Release the seqlock and record the new ticket offset + this->dstate_.store((newOffset << kSeqlockBits) + (2 * (index + 1))); + return true; } /// Seqlock read-only section @@ -570,7 +627,7 @@ class MPMCQueue : uint64_t& state, Slot*& slots, size_t& cap, int& stride ) noexcept { state = this->dstate_.load(std::memory_order_acquire); - if (state & 1) { + if (UNLIKELY(state & 1)) { // Locked. return false; } @@ -580,7 +637,7 @@ class MPMCQueue : stride = this->dstride_.load(std::memory_order_relaxed); // End of read-only section. Validate seqlock. 
std::atomic_thread_fence(std::memory_order_acquire); - return (state == this->dstate_.load(std::memory_order_relaxed)); + return LIKELY(state == this->dstate_.load(std::memory_order_relaxed)); } /// If there was an expansion after ticket was issued, update local variables @@ -594,21 +651,32 @@ class MPMCQueue : size_t& cap, int& stride) noexcept { offset = getOffset(state); - if (ticket >= offset) { + if (LIKELY(ticket >= offset)) { return false; } + updateFromClosed(state, ticket, offset, slots, cap, stride); + return true; + } + + void updateFromClosed( + const uint64_t state, + const uint64_t ticket, + uint64_t& offset, + Slot*& slots, + size_t& cap, + int& stride) noexcept { for (int i = getNumClosed(state) - 1; i >= 0; --i) { offset = closed_[i].offset_; if (offset <= ticket) { slots = closed_[i].slots_; cap = closed_[i].capacity_; stride = closed_[i].stride_; - return true; + return; } } // A closed array with offset <= ticket should have been found assert(false); - return false; + return; } }; diff --git a/folly/test/MPMCQueueTest.cpp b/folly/test/MPMCQueueTest.cpp index 0874a8f2..3b1dd2c9 100644 --- a/folly/test/MPMCQueueTest.cpp +++ b/folly/test/MPMCQueueTest.cpp @@ -748,11 +748,6 @@ void runMtNeverFail(std::vector& nts, int n) { } } -// All the never_fail tests are for the non-dynamic version only. -// False positive for dynamic version. Some writeIfNotFull() and -// tryWriteUntil() operations may fail in transient conditions related -// to expansion. 
- TEST(MPMCQueue, mt_never_fail) { std::vector nts {1, 3, 100}; int n = 100000; @@ -765,6 +760,18 @@ TEST(MPMCQueue, mt_never_fail_emulated_futex) { runMtNeverFail(nts, n); } +TEST(MPMCQueue, mt_never_fail_dynamic) { + std::vector nts{1, 3, 100}; + int n = 100000; + runMtNeverFail(nts, n); +} + +TEST(MPMCQueue, mt_never_fail_emulated_futex_dynamic) { + std::vector nts{1, 3, 100}; + int n = 100000; + runMtNeverFail(nts, n); +} + template void runMtNeverFailDeterministic(std::vector& nts, int n, long seed) { LOG(INFO) << "using seed " << seed; @@ -787,6 +794,13 @@ TEST(MPMCQueue, mt_never_fail_deterministic) { runMtNeverFailDeterministic(nts, n, seed); } +TEST(MPMCQueue, mt_never_fail_deterministic_dynamic) { + std::vector nts{3, 10}; + long seed = 0; // nowMicro() % 10000; + int n = 1000; + runMtNeverFailDeterministic(nts, n, seed); +} + template class Atom, bool Dynamic> void runNeverFailUntilThread(int numThreads, int n, /*numOps*/ @@ -851,6 +865,12 @@ TEST(MPMCQueue, mt_never_fail_until_system) { runMtNeverFailUntilSystem(nts, n); } +TEST(MPMCQueue, mt_never_fail_until_system_dynamic) { + std::vector nts{1, 3, 100}; + int n = 100000; + runMtNeverFailUntilSystem(nts, n); +} + template void runMtNeverFailUntilSteady(std::vector& nts, int n) { for (int nt : nts) { @@ -867,6 +887,12 @@ TEST(MPMCQueue, mt_never_fail_until_steady) { runMtNeverFailUntilSteady(nts, n); } +TEST(MPMCQueue, mt_never_fail_until_steady_dynamic) { + std::vector nts{1, 3, 100}; + int n = 100000; + runMtNeverFailUntilSteady(nts, n); +} + enum LifecycleEvent { NOTHING = -1, DEFAULT_CONSTRUCTOR, @@ -1213,7 +1239,7 @@ TEST(MPMCQueue, try_write_until_timeout) { testTimeout(queue); } -TEST(MPMCQueue, must_fail_try_write_until_dynamic) { - folly::MPMCQueue queue(200, 1, 2); +TEST(MPMCQueue, try_write_until_timeout_dynamic) { + folly::MPMCQueue queue(1); testTimeout(queue); } -- 2.34.1