/// The dynamic version of MPMCQueue allows dynamic expansion of queue
/// capacity, such that a queue may start with a smaller capacity than
-/// specified and expand only if needed. Users may optionally specify
-/// the initial capacity and the expansion multiplier.
+/// specified and expand if needed up to the specified capacity.
+/// Shrinking is not supported at this point.
+///
+/// Users may optionally specify the initial capacity and the
+/// expansion multiplier. Otherwise default values are used.
+///
+/// Operations on the dynamic version have the same semantics as for
+/// the default fixed-size version, except that the total queue
+/// capacity can temporarily exceed the specified capacity when there
+/// are lagging consumers that haven't yet consumed all the elements
+/// in closed arrays. Users should not rely on the capacity of dynamic
+/// queues for synchronization, e.g., they should not expect that a
+/// thread will definitely block on a call to blockingWrite() when the
+/// queue size is known to be equal to its capacity.
+///
+/// Design Overview:
///
/// The design uses a seqlock to enforce mutual exclusion among
/// expansion attempts. Regular operations read up-to-date queue
/// closed arrays instead of the current one. Information about closed
/// slots arrays (array address, capacity, stride, and offset) is
/// maintained in a logarithmic-sized structure. Each entry in that
-/// structure never need to be changed once set. The number of closed
+/// structure never needs to be changed once set. The number of closed
/// arrays is half the value of the seqlock (when unlocked).
///
/// The acquisition of the seqlock to perform an expansion does not
/// seqlock read-only section (and hence obtained information for
/// older closed arrays).
///
-/// Note that the total queue capacity can temporarily exceed the
-/// specified capacity when there are lagging consumers that haven't
-/// yet consumed all the elements in closed arrays. Users should not
-/// rely on the capacity of dynamic queues for synchronization, e.g.,
-/// they should not expect that a thread will definitely block on a
-/// call to blockingWrite() when the queue size is known to be equal
-/// to its capacity.
-///
-/// Note that some writeIfNotFull() and tryWriteUntil() operations may
-/// fail even if the size of the queue is less than its maximum
-/// capacity and despite the success of expansion, if the operation
-/// happens to acquire a ticket that belongs to a closed array. This
-/// is a transient condition. Typically, one or two ticket values may
-/// be subject to such condition per expansion.
-///
/// The dynamic version is a partial specialization of MPMCQueue with
/// Dynamic == true
template <typename T, template <typename> class Atom>
int stride;
uint64_t state;
uint64_t offset;
- do {
- if (!trySeqlockReadSection(state, slots, cap, stride)) {
+ while (true) {
+ while (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) {
asm_volatile_pause();
- continue;
}
- if (maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride)) {
- // There was an expansion after this ticket was issued.
+ maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
+ if (LIKELY(slots[this->idx((ticket - offset), cap, stride)].mayEnqueue(
+ this->turn(ticket - offset, cap)))) {
+ // A slot is ready. Fast path. No need to expand.
break;
}
- if (slots[this->idx((ticket-offset), cap, stride)]
- .mayEnqueue(this->turn(ticket-offset, cap))) {
- // A slot is ready. No need to expand.
- break;
- } else if (this->popTicket_.load(std::memory_order_relaxed) + cap
- > ticket) {
+ // Slow path
+ auto head = this->popTicket_.load(std::memory_order_relaxed);
+ auto avail = std::max(head, offset) + cap;
+ if (ticket < avail) {
// May block, but a pop is in progress. No need to expand.
- // Get seqlock read section info again in case an expansion
- // occurred with an equal or higher ticket.
- continue;
- } else {
- // May block. See if we can expand.
- if (tryExpand(state, cap)) {
- // This or another thread started an expansion. Get updated info.
- continue;
- } else {
- // Can't expand.
- break;
- }
+ break;
+ }
+ // Try to expand, otherwise this operation may block
+ // indefinitely awaiting a consumer to unblock it.
+ if (!tryExpand(state, cap)) {
+ // Can't expand. Block.
+ break;
}
- } while (true);
+ // Either this or another thread started an expansion. Get up-to-date info.
+ }
this->enqueueWithTicketBase(ticket-offset, slots, cap, stride,
std::forward<Args>(args)...);
}
int stride;
uint64_t state;
uint64_t offset;
- while (!trySeqlockReadSection(state, slots, cap, stride)) {
+ while (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) {
asm_volatile_pause();
}
// If there was an expansion after the corresponding push ticket
}
private:
-
enum {
- kSeqlockBits = 6,
- kDefaultMinDynamicCapacity = 10,
- kDefaultExpansionMultiplier = 10,
+ kSeqlockBits = 8,
+ kDefaultMinDynamicCapacity = 16,
+ kDefaultExpansionMultiplier = 8,
};
size_t dmult_;
uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
) noexcept {
uint64_t state;
- do {
+ while (true) {
ticket = this->pushTicket_.load(std::memory_order_acquire); // A
- if (!trySeqlockReadSection(state, slots, cap, stride)) {
+ if (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) {
asm_volatile_pause();
continue;
}
-
// If there was an expansion with offset greater than this ticket,
// adjust accordingly
uint64_t offset;
maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
-
- if (slots[this->idx((ticket-offset), cap, stride)]
- .mayEnqueue(this->turn(ticket-offset, cap))) {
+ if (LIKELY(slots[this->idx((ticket - offset), cap, stride)].mayEnqueue(
+ this->turn(ticket - offset, cap)))) {
// A slot is ready.
- if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
- // Adjust ticket
- ticket -= offset;
- return true;
- } else {
+ if (UNLIKELY(!this->pushTicket_.compare_exchange_strong(
+ ticket, ticket + 1))) {
continue;
}
- } else {
- if (ticket != this->pushTicket_.load(std::memory_order_relaxed)) { // B
- // Try again. Ticket changed.
- continue;
+ // Validate that state is the same
+ if (LIKELY(state == this->dstate_.load(std::memory_order_acquire))) {
+ ticket -= offset;
+ return true;
}
- // Likely to block.
- // Try to expand unless the ticket is for a closed array
- if (offset == getOffset(state)) {
- if (tryExpand(state, cap)) {
- // This or another thread started an expansion. Get up-to-date info.
- continue;
+ // Slow path - state changed - get up-to-date info for obtained ticket
+ while (true) {
+ state = this->dstate_.load(std::memory_order_acquire);
+ if (trySeqlockReadSection(state, slots, cap, stride)) {
+ break;
}
+ asm_volatile_pause();
}
- return false;
+ maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
+ ticket -= offset;
+ return true;
+ }
+ // slow path - no ready ticket
+ if (ticket != this->pushTicket_.load(std::memory_order_relaxed)) { // B
+ // Ticket changed. Start over.
+ continue;
+ }
+ auto head = this->popTicket_.load(std::memory_order_acquire);
+ auto avail = std::max(head, offset) + cap;
+ if (ticket < avail) {
+ // A consumer is in the process of making the slot available.
+ // Don't try to expand. Spin if capacity is not
+ // exhausted. Otherwise return false.
+ if (cap == this->capacity_) {
+ return false;
+ }
+ asm_volatile_pause();
+ continue;
}
- } while (true);
+ // Likely to block. Try to expand.
+ if (tryExpand(state, cap)) {
+ // This or another thread started an expansion. Get up-to-date info.
+ continue;
+ }
+ // No ready ticket and cannot expand
+ return false;
+ }
}
bool tryObtainPromisedPushTicket(
uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
) noexcept {
uint64_t state;
- do {
+ while (true) {
ticket = this->pushTicket_.load(std::memory_order_acquire);
- auto numPops = this->popTicket_.load(std::memory_order_acquire);
- if (!trySeqlockReadSection(state, slots, cap, stride)) {
+ auto head = this->popTicket_.load(std::memory_order_acquire);
+ if (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) {
asm_volatile_pause();
continue;
}
-
- const auto curCap = cap;
// If there was an expansion with offset greater than this ticket,
// adjust accordingly
uint64_t offset;
maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
-
- int64_t n = ticket - numPops;
-
- if (n >= static_cast<ssize_t>(cap)) {
- if ((cap == curCap) && tryExpand(state, cap)) {
- // This or another thread started an expansion. Start over.
+ auto avail = std::max(offset, head) + cap;
+ if (UNLIKELY(ticket >= avail)) {
+ if (tryExpand(state, cap)) {
+ // Space may be available. Start over.
continue;
}
// Can't expand.
return false;
}
- if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
- // Adjust ticket
+ if (UNLIKELY((!this->pushTicket_.compare_exchange_strong(
+ ticket, ticket + 1)))) {
+ continue;
+ }
+ // Validate that state is the same
+ if (LIKELY(state == this->dstate_.load(std::memory_order_acquire))) {
ticket -= offset;
return true;
}
- } while (true);
+ // Obtained ticket but info is out-of-date - Update info
+ while (true) {
+ state = this->dstate_.load(std::memory_order_acquire);
+ if (trySeqlockReadSection(state, slots, cap, stride)) {
+ break;
+ }
+ asm_volatile_pause();
+ }
+ maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
+ ticket -= offset;
+ return true;
+ }
}
bool tryObtainReadyPopTicket(
uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
) noexcept {
uint64_t state;
- do {
+ while (true) {
ticket = this->popTicket_.load(std::memory_order_relaxed);
- if (!trySeqlockReadSection(state, slots, cap, stride)) {
+ if (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) {
asm_volatile_pause();
continue;
}
-
// If there was an expansion after the corresponding push ticket
// was issued, adjust accordingly
uint64_t offset;
maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
-
- if (slots[this->idx((ticket-offset), cap, stride)]
- .mayDequeue(this->turn(ticket-offset, cap))) {
- if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
- // Adjust ticket
- ticket -= offset;
- return true;
- }
- } else {
+ if (UNLIKELY(!slots[this->idx((ticket - offset), cap, stride)].mayDequeue(
+ this->turn(ticket - offset, cap)))) {
return false;
}
- } while (true);
+ if (UNLIKELY(
+ !this->popTicket_.compare_exchange_strong(ticket, ticket + 1))) {
+ continue;
+ }
+ // Validate that state is the same
+ if (LIKELY(state == this->dstate_.load(std::memory_order_acquire))) {
+ ticket -= offset;
+ return true;
+ }
+ // Obtained ticket but info is out-of-date - Update info
+ while (true) {
+ state = this->dstate_.load(std::memory_order_acquire);
+ if (trySeqlockReadSection(state, slots, cap, stride)) {
+ break;
+ }
+ asm_volatile_pause();
+ }
+ maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
+ ticket -= offset;
+ return true;
+ }
}
bool tryObtainPromisedPopTicket(
uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
) noexcept {
uint64_t state;
- do {
+ while (true) {
ticket = this->popTicket_.load(std::memory_order_acquire);
auto numPushes = this->pushTicket_.load(std::memory_order_acquire);
- if (!trySeqlockReadSection(state, slots, cap, stride)) {
+ if (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) {
asm_volatile_pause();
continue;
}
-
uint64_t offset;
// If there was an expansion after the corresponding push
// ticket was issued, adjust accordingly
maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
-
- if (ticket >= numPushes) {
+ if (UNLIKELY(ticket >= numPushes)) {
ticket -= offset;
return false;
}
- if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
+ if (UNLIKELY(
+ !this->popTicket_.compare_exchange_strong(ticket, ticket + 1))) {
+ continue;
+ }
+ // Validate that state is the same
+ if (LIKELY(state == this->dstate_.load(std::memory_order_acquire))) {
ticket -= offset;
return true;
}
- } while (true);
+ // Obtained ticket but info is out-of-date - Update info
+ while (true) {
+ state = this->dstate_.load(std::memory_order_acquire);
+ if (trySeqlockReadSection(state, slots, cap, stride)) {
+ break;
+ }
+ asm_volatile_pause();
+ }
+ maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
+ ticket -= offset;
+ return true;
+ }
}
/// Enqueues an element with a specific ticket number
uint64_t state;
uint64_t offset;
- while (!trySeqlockReadSection(state, slots, cap, stride)) {}
+ while (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) {
+ asm_volatile_pause();
+ }
// If there was an expansion after this ticket was issued, adjust
// accordingly
}
/// Try to expand the queue. Returns true if this expansion was
- /// successful or a concurent expansion is in progress. Returns
+ /// successful or a concurrent expansion is in progress. Returns
/// false if the queue has reached its maximum capacity or
/// allocation has failed.
bool tryExpand(const uint64_t state, const size_t cap) noexcept {
- if (cap == this->capacity_) {
+ if (LIKELY(cap == this->capacity_)) {
return false;
}
+ return tryExpandWithSeqlock(state, cap);
+ }
+
+ bool tryExpandWithSeqlock(const uint64_t state, const size_t cap) noexcept {
// Acquire seqlock
uint64_t oldval = state;
assert((state & 1) == 0);
- if (this->dstate_.compare_exchange_strong(oldval, state + 1)) {
- assert(cap == this->dcapacity_.load());
- uint64_t ticket = 1 + std::max(this->pushTicket_.load(),
- this->popTicket_.load());
- size_t newCapacity =
- std::min(dmult_ * cap, this->capacity_);
- Slot* newSlots =
- new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding];
- if (newSlots == nullptr) {
- // Expansion failed. Restore the seqlock
- this->dstate_.store(state);
- return false;
- }
- // Successful expansion
- // calculate the current ticket offset
- uint64_t offset = getOffset(state);
- // calculate index in closed array
- int index = getNumClosed(state);
- assert((index << 1) < (1 << kSeqlockBits));
- // fill the info for the closed slots array
- closed_[index].offset_ = offset;
- closed_[index].slots_ = this->dslots_.load();
- closed_[index].capacity_ = cap;
- closed_[index].stride_ = this->dstride_.load();
- // update the new slots array info
- this->dslots_.store(newSlots);
- this->dcapacity_.store(newCapacity);
- this->dstride_.store(this->computeStride(newCapacity));
- // Release the seqlock and record the new ticket offset
- this->dstate_.store((ticket << kSeqlockBits) + (2 * (index + 1)));
- return true;
- } else { // failed to acquire seqlock
- // Someone acaquired the seqlock. Go back to the caller and get
- // up-to-date info.
+ if (!this->dstate_.compare_exchange_strong(oldval, state + 1)) {
+ // Failed to acquire seqlock. Another thread acquired it.
+ // Go back to the caller and get up-to-date info.
return true;
}
+ // Write critical section
+ assert(cap == this->dcapacity_.load());
+ auto head = this->popTicket_.load(std::memory_order_acquire);
+ auto avail = std::max(head, getOffset(state)) + cap;
+ uint64_t newOffset = avail;
+ size_t newCapacity = std::min(dmult_ * cap, this->capacity_);
+ Slot* newSlots =
+ new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding];
+ if (newSlots == nullptr) {
+ // Expansion failed. Restore the seqlock
+ this->dstate_.store(state);
+ return false;
+ }
+ // Successful expansion
+ // calculate the current ticket offset
+ uint64_t offset = getOffset(state);
+ // calculate index in list of closed slots arrays
+ int index = getNumClosed(state);
+ assert((index << 1) < (1 << kSeqlockBits));
+ // fill the info for the closed slots array
+ closed_[index].offset_ = offset;
+ closed_[index].slots_ = this->dslots_.load();
+ closed_[index].capacity_ = cap;
+ closed_[index].stride_ = this->dstride_.load();
+ // update the new slots array info
+ this->dslots_.store(newSlots);
+ this->dcapacity_.store(newCapacity);
+ this->dstride_.store(this->computeStride(newCapacity));
+ // Release the seqlock and record the new ticket offset
+ this->dstate_.store((newOffset << kSeqlockBits) + (2 * (index + 1)));
+ return true;
}
/// Seqlock read-only section
uint64_t& state, Slot*& slots, size_t& cap, int& stride
) noexcept {
state = this->dstate_.load(std::memory_order_acquire);
- if (state & 1) {
+ if (UNLIKELY(state & 1)) {
// Locked.
return false;
}
stride = this->dstride_.load(std::memory_order_relaxed);
// End of read-only section. Validate seqlock.
std::atomic_thread_fence(std::memory_order_acquire);
- return (state == this->dstate_.load(std::memory_order_relaxed));
+ return LIKELY(state == this->dstate_.load(std::memory_order_relaxed));
}
/// If there was an expansion after ticket was issued, update local variables
size_t& cap,
int& stride) noexcept {
offset = getOffset(state);
- if (ticket >= offset) {
+ if (LIKELY(ticket >= offset)) {
return false;
}
+ updateFromClosed(state, ticket, offset, slots, cap, stride);
+ return true;
+ }
+
+ void updateFromClosed(
+ const uint64_t state,
+ const uint64_t ticket,
+ uint64_t& offset,
+ Slot*& slots,
+ size_t& cap,
+ int& stride) noexcept {
for (int i = getNumClosed(state) - 1; i >= 0; --i) {
offset = closed_[i].offset_;
if (offset <= ticket) {
slots = closed_[i].slots_;
cap = closed_[i].capacity_;
stride = closed_[i].stride_;
- return true;
+ return;
}
}
// A closed array with offset <= ticket should have been found
assert(false);
- return false;
+ return;
}
};