template <typename T> class MPMCPipelineStageImpl;
+/// MPMCQueue base CRTP template
+template <typename> class MPMCQueueBase;
} // namespace detail
/// MPMCQueue<T> is a high-performance bounded concurrent queue that
/// are you can enqueue one sentinel and then have each consumer requeue
/// two sentinels after it receives it (by requeuing 2 the shutdown can
/// complete in O(log P) time instead of O(P)).
-template<typename T,
- template<typename> class Atom = std::atomic>
-class MPMCQueue : boost::noncopyable {
+template<typename T, template<typename> class Atom = std::atomic,
+ bool Dynamic = false>
+class MPMCQueue : public detail::MPMCQueueBase<MPMCQueue<T,Atom,Dynamic>> {
+ friend class detail::MPMCPipelineStageImpl<T>;
+ using Slot = detail::SingleElementQueue<T,Atom>;
+ public:
+ explicit MPMCQueue(size_t queueCapacity)
+ : detail::MPMCQueueBase<MPMCQueue<T,Atom,Dynamic>>(queueCapacity)
+ {
+ this->stride_ = this->computeStride(queueCapacity);
+ this->slots_ = new Slot[queueCapacity + 2 * this->kSlotPadding];
+ }
+ MPMCQueue() noexcept { }
+/// The dynamic version of MPMCQueue allows dynamic expansion of queue
+/// capacity, such that a queue may start with a smaller capacity than
+/// specified and expand only if needed. Users may optionally specify
+/// the initial capacity and the expansion multiplier.
+/// The design uses a seqlock to enforce mutual exclusion among
+/// expansion attempts. Regular operations read up-to-date queue
+/// information (slots array, capacity, stride) inside read-only
+/// seqlock sections, which are unimpeded when no expansion is in
+/// progress.
+/// An expansion computes a new capacity, allocates a new slots array,
+/// and updates stride. No information needs to be copied from the
+/// current slots array to the new one. When this happens, new slots
+/// will not have sequence numbers that match ticket numbers. The
+/// expansion needs to compute a ticket offset such that operations
+/// that use new arrays can adjust the calculations of slot indexes
+/// and sequence numbers that take into account that the new slots
+/// start with sequence numbers of zero. The current ticket offset is
+/// packed with the seqlock in an atomic 64-bit integer. The initial
+/// offset is zero.
+/// Lagging write and read operations with tickets lower than the
+/// ticket offset of the current slots array (i.e., the minimum ticket
+/// number that can be served by the current array) must use earlier
+/// closed arrays instead of the current one. Information about closed
+/// slots arrays (array address, capacity, stride, and offset) is
+/// maintained in a logarithmic-sized structure. Each entry in that
+/// structure never need to be changed once set. The number of closed
+/// arrays is half the value of the seqlock (when unlocked).
+/// The acquisition of the seqlock to perform an expansion does not
+/// prevent the issuing of new push and pop tickets concurrently. The
+/// expansion must set the new ticket offset to a value that couldn't
+/// have been issued to an operation that has already gone through a
+/// seqlock read-only section (and hence obtained information for
+/// older closed arrays).
+/// Note that the total queue capacity can temporarily exceed the
+/// specified capacity when there are lagging consumers that haven't
+/// yet consumed all the elements in closed arrays. Users should not
+/// rely on the capacity of dynamic queues for synchronization, e.g.,
+/// they should not expect that a thread will definitely block on a
+/// call to blockingWrite() when the queue size is known to be equal
+/// to its capacity.
+/// The dynamic version is a partial specialization of MPMCQueue with
+/// Dynamic == true
+template <typename T, template<typename> class Atom>
+class MPMCQueue<T,Atom,true> :
+ public detail::MPMCQueueBase<MPMCQueue<T,Atom,true>> {
+ friend class detail::MPMCQueueBase<MPMCQueue<T,Atom,true>>;
+ using Slot = detail::SingleElementQueue<T,Atom>;
+ struct ClosedArray {
+ uint64_t offset_ {0};
+ Slot* slots_ {nullptr};
+ size_t capacity_ {0};
+ int stride_ {0};
+ };
+ public:
+ explicit MPMCQueue(size_t queueCapacity)
+ : detail::MPMCQueueBase<MPMCQueue<T,Atom,true>>(queueCapacity)
+ {
+ size_t cap = std::min<size_t>(kDefaultMinDynamicCapacity, queueCapacity);
+ initQueue(cap, kDefaultExpansionMultiplier);
+ }
+ explicit MPMCQueue(size_t queueCapacity,
+ size_t minCapacity,
+ size_t expansionMultiplier)
+ : detail::MPMCQueueBase<MPMCQueue<T,Atom,true>>(queueCapacity)
+ {
+ minCapacity = std::max<size_t>(1, minCapacity);
+ size_t cap = std::min<size_t>(minCapacity, queueCapacity);
+ expansionMultiplier = std::max<size_t>(2, expansionMultiplier);
+ initQueue(cap, expansionMultiplier);
+ }
+ MPMCQueue() noexcept {
+ dmult_ = 0;
+ closed_ = nullptr;
+ }
+ MPMCQueue(MPMCQueue<T,Atom,true>&& rhs) noexcept {
+ this->capacity_ = rhs.capacity_;
+ this->slots_ = rhs.slots_;
+ this->stride_ = rhs.stride_;
+ this->dstate_.store(rhs.dstate_.load(std::memory_order_relaxed),
+ std::memory_order_relaxed);
+ this->dcapacity_.store(rhs.dcapacity_.load(std::memory_order_relaxed),
+ std::memory_order_relaxed);
+ this->pushTicket_.store(rhs.pushTicket_.load(std::memory_order_relaxed),
+ std::memory_order_relaxed);
+ this->popTicket_.store(rhs.popTicket_.load(std::memory_order_relaxed),
+ std::memory_order_relaxed);
+ this->pushSpinCutoff_.store(
+ rhs.pushSpinCutoff_.load(std::memory_order_relaxed),
+ std::memory_order_relaxed);
+ this->popSpinCutoff_.store(
+ rhs.popSpinCutoff_.load(std::memory_order_relaxed),
+ std::memory_order_relaxed);
+ dmult_ = rhs.dmult_;
+ closed_ = rhs.closed_;
+ rhs.capacity_ = 0;
+ rhs.slots_ = nullptr;
+ rhs.stride_ = 0;
+ rhs.dstate_.store(0, std::memory_order_relaxed);
+ rhs.dcapacity_.store(0, std::memory_order_relaxed);
+ rhs.pushTicket_.store(0, std::memory_order_relaxed);
+ rhs.popTicket_.store(0, std::memory_order_relaxed);
+ rhs.pushSpinCutoff_.store(0, std::memory_order_relaxed);
+ rhs.popSpinCutoff_.store(0, std::memory_order_relaxed);
+ rhs.dmult_ = 0;
+ rhs.closed_ = nullptr;
+ }
+ MPMCQueue<T,Atom, true> const& operator= (MPMCQueue<T,Atom, true>&& rhs) {
+ if (this != &rhs) {
+ this->~MPMCQueue();
+ new (this) MPMCQueue(std::move(rhs));
+ }
+ return *this;
+ }
+ ~MPMCQueue() {
+ if (closed_ != nullptr) {
+ for (int i = getNumClosed(this->dstate_.load()) - 1; i >= 0; --i) {
+ delete[] closed_[i].slots_;
+ }
+ delete[] closed_;
+ }
+ }
+ size_t allocatedCapacity() const noexcept {
+ return this->dcapacity_.load(std::memory_order_relaxed);
+ }
+ template <typename ...Args>
+ void blockingWrite(Args&&... args) noexcept {
+ uint64_t ticket = this->pushTicket_++;
+ Slot* slots;
+ size_t cap;
+ int stride;
+ uint64_t state;
+ uint64_t offset;
+ do {
+ if (!trySeqlockReadSection(state, slots, cap, stride)) {
+ continue;
+ }
+ offset = getOffset(state);
+ if (ticket < offset) {
+ // There was an expansion after this ticket was issued.
+ updateFromClosed(state, ticket, offset, slots, cap, stride);
+ break;
+ }
+ if (slots[this->idx((ticket-offset), cap, stride)]
+ .mayEnqueue(this->turn(ticket-offset, cap))) {
+ // A slot is ready. No need to expand.
+ break;
+ } else if (this->popTicket_.load(std::memory_order_relaxed) + cap
+ > ticket) {
+ // May block, but a pop is in progress. No need to expand.
+ // Get seqlock read section info again in case an expansion
+ // occurred with an equal or higher ticket.
+ continue;
+ } else {
+ // May block. See if we can expand.
+ if (tryExpand(state, cap)) {
+ // This or another thread started an expansion. Get updated info.
+ continue;
+ } else {
+ // Can't expand.
+ break;
+ }
+ }
+ } while (true);
+ this->enqueueWithTicketBase(ticket-offset, slots, cap, stride,
+ std::forward<Args>(args)...);
+ }
+ void blockingReadWithTicket(uint64_t& ticket, T& elem) noexcept {
+ ticket = this->popTicket_++;
+ Slot* slots;
+ size_t cap;
+ int stride;
+ uint64_t state;
+ uint64_t offset;
+ while (!trySeqlockReadSection(state, slots, cap, stride));
+ offset = getOffset(state);
+ if (ticket < offset) {
+ // There was an expansion after the corresponding push ticket
+ // was issued.
+ updateFromClosed(state, ticket, offset, slots, cap, stride);
+ }
+ this->dequeueWithTicketBase(ticket-offset, slots, cap, stride, elem);
+ }
+ private:
+ enum {
+ kSeqlockBits = 6,
+ kDefaultMinDynamicCapacity = 10,
+ kDefaultExpansionMultiplier = 10,
+ };
+ size_t dmult_;
+ // Info about closed slots arrays for use by lagging operations
+ ClosedArray* closed_;
+ void initQueue(const size_t cap, const size_t mult) {
+ this->stride_ = this->computeStride(cap);
+ this->slots_ = new Slot[cap + 2 * this->kSlotPadding];
+ this->dstate_.store(0);
+ this->dcapacity_.store(cap);
+ dmult_ = mult;
+ size_t maxClosed = 0;
+ for (size_t expanded = cap;
+ expanded < this->capacity_;
+ expanded *= mult) {
+ ++maxClosed;
+ }
+ closed_ = (maxClosed > 0) ? new ClosedArray[maxClosed] : nullptr;
+ }
+ bool tryObtainReadyPushTicket(
+ uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
+ ) noexcept {
+ uint64_t state;
+ do {
+ ticket = this->pushTicket_.load(std::memory_order_acquire); // A
+ if (!trySeqlockReadSection(state, slots, cap, stride)) {
+ continue;
+ }
+ uint64_t offset = getOffset(state);
+ if (ticket < offset) {
+ // There was an expansion with offset greater than this ticket
+ updateFromClosed(state, ticket, offset, slots, cap, stride);
+ }
+ if (slots[this->idx((ticket-offset), cap, stride)]
+ .mayEnqueue(this->turn(ticket-offset, cap))) {
+ // A slot is ready.
+ if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
+ // Adjust ticket
+ ticket -= offset;
+ return true;
+ } else {
+ continue;
+ }
+ } else {
+ if (ticket != this->pushTicket_.load(std::memory_order_relaxed)) { // B
+ // Try again. Ticket changed.
+ continue;
+ }
+ // Likely to block.
+ // Try to expand unless the ticket is for a closed array
+ if (offset == getOffset(state)) {
+ if (tryExpand(state, cap)) {
+ // This or another thread started an expansion. Get up-to-date info.
+ continue;
+ }
+ }
+ return false;
+ }
+ } while (true);
+ }
+ bool tryObtainPromisedPushTicket(
+ uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
+ ) noexcept {
+ uint64_t state;
+ do {
+ ticket = this->pushTicket_.load(std::memory_order_acquire);
+ auto numPops = this->popTicket_.load(std::memory_order_acquire);
+ if (!trySeqlockReadSection(state, slots, cap, stride)) {
+ continue;
+ }
+ int64_t n = ticket - numPops;
+ if (n >= static_cast<ssize_t>(this->capacity_)) {
+ return false;
+ }
+ if ((n >= static_cast<ssize_t>(cap))) {
+ if (tryExpand(state, cap)) {
+ // This or another thread started an expansion. Start over
+ // with a new state.
+ continue;
+ } else {
+ // Can't expand.
+ return false;
+ }
+ }
+ uint64_t offset = getOffset(state);
+ if (ticket < offset) {
+ // There was an expansion with offset greater than this ticket
+ updateFromClosed(state, ticket, offset, slots, cap, stride);
+ }
+ if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
+ // Adjust ticket
+ ticket -= offset;
+ return true;
+ }
+ } while (true);
+ }
+ bool tryObtainReadyPopTicket(
+ uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
+ ) noexcept {
+ uint64_t state;
+ do {
+ ticket = this->popTicket_.load(std::memory_order_relaxed);
+ if (!trySeqlockReadSection(state, slots, cap, stride)) {
+ continue;
+ }
+ uint64_t offset = getOffset(state);
+ if (ticket < offset) {
+ // There was an expansion after the corresponding push ticket
+ // was issued.
+ updateFromClosed(state, ticket, offset, slots, cap, stride);
+ }
+ if (slots[this->idx((ticket-offset), cap, stride)]
+ .mayDequeue(this->turn(ticket-offset, cap))) {
+ if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
+ // Adjust ticket
+ ticket -= offset;
+ return true;
+ }
+ } else {
+ return false;
+ }
+ } while (true);
+ }
+ bool tryObtainPromisedPopTicket(
+ uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
+ ) noexcept {
+ uint64_t state;
+ do {
+ ticket = this->popTicket_.load(std::memory_order_acquire);
+ auto numPushes = this->pushTicket_.load(std::memory_order_acquire);
+ if (!trySeqlockReadSection(state, slots, cap, stride)) {
+ continue;
+ }
+ if (ticket >= numPushes) {
+ return false;
+ }
+ if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
+ // Adjust ticket
+ uint64_t offset = getOffset(state);
+ if (ticket < offset) {
+ // There was an expansion after the corresponding push
+ // ticket was issued.
+ updateFromClosed(state, ticket, offset, slots, cap, stride);
+ }
+ // Adjust ticket
+ ticket -= offset;
+ return true;
+ }
+ } while (true);
+ }
+ /// Enqueues an element with a specific ticket number
+ template <typename ...Args>
+ void enqueueWithTicket(const uint64_t ticket, Args&&... args) noexcept {
+ Slot* slots;
+ size_t cap;
+ int stride;
+ uint64_t state;
+ uint64_t offset;
+ while (!trySeqlockReadSection(state, slots, cap, stride)) {}
+ offset = getOffset(state);
+ if (ticket < offset) {
+ // There was an expansion after this ticket was issued.
+ updateFromClosed(state, ticket, offset, slots, cap, stride);
+ }
+ this->enqueueWithTicketBase(ticket-offset, slots, cap, stride,
+ std::forward<Args>(args)...);
+ }
+ uint64_t getOffset(const uint64_t state) const noexcept {
+ return state >> kSeqlockBits;
+ }
+ int getNumClosed(const uint64_t state) const noexcept {
+ return (state & ((1 << kSeqlockBits) - 1)) >> 1;
+ }
+ /// Try to expand the queue. Returns true if this expansion was
+ /// successful or a concurent expansion is in progress. Returns
+ /// false if the queue has reached its maximum capacity or
+ /// allocation has failed.
+ bool tryExpand(const uint64_t state, const size_t cap) noexcept {
+ if (cap == this->capacity_) {
+ return false;
+ }
+ // Acquire seqlock
+ uint64_t oldval = state;
+ assert((state & 1) == 0);
+ if (this->dstate_.compare_exchange_strong(oldval, state + 1)) {
+ assert(cap == this->dcapacity_.load());
+ uint64_t ticket = 1 + std::max(this->pushTicket_.load(),
+ this->popTicket_.load());
+ size_t newCapacity =
+ std::min(dmult_ * cap, this->capacity_);
+ Slot* newSlots =
+ new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding];
+ if (newSlots == nullptr) {
+ // Expansion failed. Restore the seqlock
+ this->dstate_.store(state);
+ return false;
+ }
+ // Successful expansion
+ // calculate the current ticket offset
+ uint64_t offset = getOffset(state);
+ // calculate index in closed array
+ int index = getNumClosed(state);
+ assert((index << 1) < (1 << kSeqlockBits));
+ // fill the info for the closed slots array
+ closed_[index].offset_ = offset;
+ closed_[index].slots_ = this->dslots_.load();
+ closed_[index].capacity_ = cap;
+ closed_[index].stride_ = this->dstride_.load();
+ // update the new slots array info
+ this->dslots_.store(newSlots);
+ this->dcapacity_.store(newCapacity);
+ this->dstride_.store(this->computeStride(newCapacity));
+ // Release the seqlock and record the new ticket offset
+ this->dstate_.store((ticket << kSeqlockBits) + (2 * (index + 1)));
+ return true;
+ } else { // failed to acquire seqlock
+ // Someone acaquired the seqlock. Go back to the caller and get
+ // up-to-date info.
+ return true;
+ }
+ }
+ /// Seqlock read-only section
+ bool trySeqlockReadSection(
+ uint64_t& state, Slot*& slots, size_t& cap, int& stride
+ ) noexcept {
+ state = this->dstate_.load(std::memory_order_acquire);
+ if (state & 1) {
+ // Locked.
+ return false;
+ }
+ // Start read-only section.
+ slots = this->dslots_.load(std::memory_order_relaxed);
+ cap = this->dcapacity_.load(std::memory_order_relaxed);
+ stride = this->dstride_.load(std::memory_order_relaxed);
+ // End of read-only section. Validate seqlock.
+ std::atomic_thread_fence(std::memory_order_acquire);
+ return (state == this->dstate_.load(std::memory_order_relaxed));
+ }
+ /// Update local variables of a lagging operation using the
+ /// most recent closed array with offset <= ticket
+ void updateFromClosed(
+ const uint64_t state, const uint64_t ticket,
+ uint64_t& offset, Slot*& slots, size_t& cap, int& stride
+ ) noexcept {
+ for (int i = getNumClosed(state) - 1; i >= 0; --i) {
+ offset = closed_[i].offset_;
+ if (offset <= ticket) {
+ slots = closed_[i].slots_;
+ cap = closed_[i].capacity_;
+ stride = closed_[i].stride_;
+ return;;
+ }
+ }
+ // A closed array with offset <= ticket should have been found
+ assert(false);
+ }
+namespace detail {
+/// CRTP specialization of MPMCQueueBase
+ template<
+ typename T, template<typename> class Atom, bool Dynamic> class Derived,
+ typename T, template<typename> class Atom, bool Dynamic>
+class MPMCQueueBase<Derived<T, Atom, Dynamic>> : boost::noncopyable {
+// Note: Using CRTP static casts in several functions of this base
+// template instead of making called functions virtual or duplicating
+// the code of calling functions in the derived partially specialized
+// template
static_assert(std::is_nothrow_constructible<T,T&&>::value ||
"T must be relocatable or have a noexcept move constructor");
- friend class detail::MPMCPipelineStageImpl<T>;
typedef T value_type;
- explicit MPMCQueue(size_t queueCapacity)
+ using Slot = detail::SingleElementQueue<T,Atom>;
+ explicit MPMCQueueBase(size_t queueCapacity)
: capacity_(queueCapacity)
, pushTicket_(0)
, popTicket_(0)
, pushSpinCutoff_(0)
, popSpinCutoff_(0)
- if (queueCapacity == 0)
+ if (queueCapacity == 0) {
throw std::invalid_argument(
"MPMCQueue with explicit capacity 0 is impossible"
+ // Stride computation in derived classes would sigfpe if capacity is 0
- // would sigfpe if capacity is 0
- stride_ = computeStride(queueCapacity);
- slots_ = new detail::SingleElementQueue<T,Atom>[queueCapacity +
- 2 * kSlotPadding];
+ }
// ideally this would be a static assert, but g++ doesn't allow it
/// A default-constructed queue is useful because a usable (non-zero
/// capacity) queue can be moved onto it or swapped with it
- MPMCQueue() noexcept
+ MPMCQueueBase() noexcept
: capacity_(0)
, slots_(nullptr)
, stride_(0)
+ , dstate_(0)
+ , dcapacity_(0)
, pushTicket_(0)
, popTicket_(0)
, pushSpinCutoff_(0)
/// IMPORTANT: The move constructor is here to make it easier to perform
/// the initialization phase, it is not safe to use when there are any
/// concurrent accesses (this is not checked).
- MPMCQueue(MPMCQueue<T,Atom>&& rhs) noexcept
+ MPMCQueueBase(MPMCQueueBase<Derived<T,Atom,Dynamic>>&& rhs) noexcept
: capacity_(rhs.capacity_)
, slots_(rhs.slots_)
, stride_(rhs.stride_)
+ , dstate_(rhs.dstate_.load(std::memory_order_relaxed))
+ , dcapacity_(rhs.dcapacity_.load(std::memory_order_relaxed))
, pushTicket_(rhs.pushTicket_.load(std::memory_order_relaxed))
, popTicket_(rhs.popTicket_.load(std::memory_order_relaxed))
, pushSpinCutoff_(rhs.pushSpinCutoff_.load(std::memory_order_relaxed))
rhs.capacity_ = 0;
rhs.slots_ = nullptr;
rhs.stride_ = 0;
+ rhs.dstate_.store(0, std::memory_order_relaxed);
+ rhs.dcapacity_.store(0, std::memory_order_relaxed);
rhs.pushTicket_.store(0, std::memory_order_relaxed);
rhs.popTicket_.store(0, std::memory_order_relaxed);
rhs.pushSpinCutoff_.store(0, std::memory_order_relaxed);
/// IMPORTANT: The move operator is here to make it easier to perform
/// the initialization phase, it is not safe to use when there are any
/// concurrent accesses (this is not checked).
- MPMCQueue<T,Atom> const& operator= (MPMCQueue<T,Atom>&& rhs) {
+ MPMCQueueBase<Derived<T,Atom,Dynamic>> const& operator=
+ (MPMCQueueBase<Derived<T,Atom,Dynamic>>&& rhs) {
if (this != &rhs) {
- this->~MPMCQueue();
- new (this) MPMCQueue(std::move(rhs));
+ this->~MPMCQueueBase();
+ new (this) MPMCQueueBase(std::move(rhs));
return *this;
/// MPMCQueue can only be safely destroyed when there are no
/// pending enqueuers or dequeuers (this is not checked).
- ~MPMCQueue() {
+ ~MPMCQueueBase() {
delete[] slots_;
return capacity_;
+ /// Doesn't change for non-dynamic
+ size_t allocatedCapacity() const noexcept {
+ return capacity_;
+ }
/// Returns the total number of calls to blockingWrite or successful
/// calls to write, including those blockingWrite calls that are
/// currently blocking
/// to a T constructor.
template <typename ...Args>
void blockingWrite(Args&&... args) noexcept {
- enqueueWithTicket(pushTicket_++, std::forward<Args>(args)...);
+ enqueueWithTicketBase(pushTicket_++, slots_, capacity_, stride_,
+ std::forward<Args>(args)...);
/// If an item can be enqueued with no blocking, does so and returns
template <typename ...Args>
bool write(Args&&... args) noexcept {
uint64_t ticket;
- if (tryObtainReadyPushTicket(ticket)) {
+ Slot* slots;
+ size_t cap;
+ int stride;
+ if (static_cast<Derived<T,Atom,Dynamic>*>(this)->
+ tryObtainReadyPushTicket(ticket, slots, cap, stride)) {
// we have pre-validated that the ticket won't block
- enqueueWithTicket(ticket, std::forward<Args>(args)...);
+ enqueueWithTicketBase(ticket, slots, cap, stride,
+ std::forward<Args>(args)...);
return true;
} else {
return false;
bool tryWriteUntil(const std::chrono::time_point<Clock>& when,
Args&&... args) noexcept {
uint64_t ticket;
- if (tryObtainPromisedPushTicketUntil(ticket, when)) {
- // we have pre-validated that the ticket won't block, or rather that
- // it won't block longer than it takes another thread to dequeue an
- // element from the slot it identifies.
- enqueueWithTicket(ticket, std::forward<Args>(args)...);
+ Slot* slots;
+ size_t cap;
+ int stride;
+ if (tryObtainPromisedPushTicketUntil(ticket, slots, cap, stride, when)) {
+ // we have pre-validated that the ticket won't block, or rather that
+ // it won't block longer than it takes another thread to dequeue an
+ // element from the slot it identifies.
+ enqueueWithTicketBase(ticket, slots, cap, stride,
+ std::forward<Args>(args)...);
return true;
} else {
return false;
template <typename ...Args>
bool writeIfNotFull(Args&&... args) noexcept {
uint64_t ticket;
- if (tryObtainPromisedPushTicket(ticket)) {
+ Slot* slots;
+ size_t cap;
+ int stride;
+ if (static_cast<Derived<T,Atom,Dynamic>*>(this)->
+ tryObtainPromisedPushTicket(ticket, slots, cap, stride)) {
// some other thread is already dequeuing the slot into which we
// are going to enqueue, but we might have to wait for them to finish
- enqueueWithTicket(ticket, std::forward<Args>(args)...);
+ enqueueWithTicketBase(ticket, slots, cap, stride,
+ std::forward<Args>(args)...);
return true;
} else {
return false;
/// Moves a dequeued element onto elem, blocking until an element
/// is available
void blockingRead(T& elem) noexcept {
- dequeueWithTicket(popTicket_++, elem);
+ uint64_t ticket;
+ static_cast<Derived<T,Atom,Dynamic>*>(this)->
+ blockingReadWithTicket(ticket, elem);
+ }
+ /// Same as blockingRead() but also records the ticket nunmer
+ void blockingReadWithTicket(uint64_t& ticket, T& elem) noexcept {
+ ticket = popTicket_++;
+ dequeueWithTicketBase(ticket, slots_, capacity_, stride_, elem);
/// If an item can be dequeued with no blocking, does so and returns
/// true, otherwise returns false.
bool read(T& elem) noexcept {
uint64_t ticket;
- if (tryObtainReadyPopTicket(ticket)) {
+ return readAndGetTicket(ticket, elem);
+ }
+ /// Same as read() but also records the ticket nunmer
+ bool readAndGetTicket(uint64_t& ticket, T& elem) noexcept {
+ Slot* slots;
+ size_t cap;
+ int stride;
+ if (static_cast<Derived<T,Atom,Dynamic>*>(this)->
+ tryObtainReadyPopTicket(ticket, slots, cap, stride)) {
// the ticket has been pre-validated to not block
- dequeueWithTicket(ticket, elem);
+ dequeueWithTicketBase(ticket, slots, cap, stride, elem);
return true;
} else {
return false;
/// prefer read.
bool readIfNotEmpty(T& elem) noexcept {
uint64_t ticket;
- if (tryObtainPromisedPopTicket(ticket)) {
+ Slot* slots;
+ size_t cap;
+ int stride;
+ if (static_cast<Derived<T,Atom,Dynamic>*>(this)->
+ tryObtainPromisedPopTicket(ticket, slots, cap, stride)) {
// the matching enqueue already has a ticket, but might not be done
- dequeueWithTicket(ticket, elem);
+ dequeueWithTicketBase(ticket, slots, cap, stride, elem);
return true;
} else {
return false;
- private:
+ protected:
enum {
/// Once every kAdaptationFreq we will spin longer, to try to estimate
/// the proper spin backoff
/// allocations, we pad it with this many SingleElementQueue-s at
/// each end
kSlotPadding = (detail::CacheLocality::kFalseSharingRange - 1)
- / sizeof(detail::SingleElementQueue<T,Atom>) + 1
+ / sizeof(Slot) + 1
/// The maximum number of items in the queue at once
- /// An array of capacity_ SingleElementQueue-s, each of which holds
- /// either 0 or 1 item. We over-allocate by 2 * kSlotPadding and don't
- /// touch the slots at either end, to avoid false sharing
- detail::SingleElementQueue<T,Atom>* slots_;
+ /// Anonymous union for use when Dynamic = false and true, respectively
+ union {
+ /// An array of capacity_ SingleElementQueue-s, each of which holds
+ /// either 0 or 1 item. We over-allocate by 2 * kSlotPadding and don't
+ /// touch the slots at either end, to avoid false sharing
+ Slot* slots_;
+ /// Current dynamic slots array of dcapacity_ SingleElementQueue-s
+ Atom<Slot*> dslots_;
+ };
+ /// Anonymous union for use when Dynamic = false and true, respectively
+ union {
+ /// The number of slots_ indices that we advance for each ticket, to
+ /// avoid false sharing. Ideally slots_[i] and slots_[i + stride_]
+ /// aren't on the same cache line
+ int stride_;
+ /// Current stride
+ Atom<int> dstride_;
+ };
- /// The number of slots_ indices that we advance for each ticket, to
- /// avoid false sharing. Ideally slots_[i] and slots_[i + stride_]
- /// aren't on the same cache line
- int stride_;
+ /// The following two memebers are used by dynamic MPMCQueue.
+ /// Ideally they should be in MPMCQueue<T,Atom,true>, but we get
+ /// better cache locality if they are in the same cache line as
+ /// dslots_ and dstride_.
+ ///
+ /// Dynamic state. A packed seqlock and ticket offset
+ Atom<uint64_t> dstate_;
+ /// Dynamic capacity
+ Atom<size_t> dcapacity_;
/// Enqueuers get tickets from here
Atom<uint64_t> FOLLY_ALIGN_TO_AVOID_FALSE_SHARING pushTicket_;
char padding_[detail::CacheLocality::kFalseSharingRange -
/// We assign tickets in increasing order, but we don't want to
/// access neighboring elements of slots_ because that will lead to
/// false sharing (multiple cores accessing the same cache line even
/// Returns the index into slots_ that should be used when enqueuing or
/// dequeuing with the specified ticket
- size_t idx(uint64_t ticket) noexcept {
- return ((ticket * stride_) % capacity_) + kSlotPadding;
+ size_t idx(uint64_t ticket, size_t cap, int stride) noexcept {
+ return ((ticket * stride) % cap) + kSlotPadding;
/// Maps an enqueue or dequeue ticket to the turn should be used at the
/// corresponding SingleElementQueue
- uint32_t turn(uint64_t ticket) noexcept {
- return ticket / capacity_;
+ uint32_t turn(uint64_t ticket, size_t cap) noexcept {
+ return ticket / cap;
/// Tries to obtain a push ticket for which SingleElementQueue::enqueue
/// won't block. Returns true on immediate success, false on immediate
/// failure.
- bool tryObtainReadyPushTicket(uint64_t& rv) noexcept {
- auto ticket = pushTicket_.load(std::memory_order_acquire); // A
+ bool tryObtainReadyPushTicket(
+ uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
+ ) noexcept {
+ ticket = pushTicket_.load(std::memory_order_acquire); // A
+ slots = slots_;
+ cap = capacity_;
+ stride = stride_;
while (true) {
- if (!slots_[idx(ticket)].mayEnqueue(turn(ticket))) {
+ if (!slots[idx(ticket, cap, stride)]
+ .mayEnqueue(turn(ticket, cap))) {
// if we call enqueue(ticket, ...) on the SingleElementQueue
// right now it would block, but this might no longer be the next
// ticket. We can increase the chance of tryEnqueue success under
// or prev failing CAS) and the following CAS. If the CAS fails
// it will effect a load of pushTicket_
if (pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
- rv = ticket;
return true;
/// ticket is filled on success AND failure.
template <class Clock>
bool tryObtainPromisedPushTicketUntil(
- uint64_t& ticket, const std::chrono::time_point<Clock>& when) noexcept {
+ uint64_t& ticket, Slot*& slots, size_t& cap, int& stride,
+ const std::chrono::time_point<Clock>& when
+ ) noexcept {
bool deadlineReached = false;
while (!deadlineReached) {
- if (tryObtainPromisedPushTicket(ticket)) {
+ if (static_cast<Derived<T,Atom,Dynamic>*>(this)->
+ tryObtainPromisedPushTicket(ticket, slots, cap, stride)) {
return true;
// ticket is a blocking ticket until the preceding ticket has been
// processed: wait until this ticket's turn arrives. We have not reserved
// this ticket so we will have to re-attempt to get a non-blocking ticket
// if we wake up before we time-out.
- deadlineReached = !slots_[idx(ticket)].tryWaitForEnqueueTurnUntil(
- turn(ticket), pushSpinCutoff_, (ticket % kAdaptationFreq) == 0, when);
+ deadlineReached = !slots[idx(ticket, cap, stride)]
+ .tryWaitForEnqueueTurnUntil(turn(ticket, cap), pushSpinCutoff_,
+ (ticket % kAdaptationFreq) == 0, when);
return false;
/// blocking may be required when using the returned ticket if some
/// other thread's pop is still in progress (ticket has been granted but
/// pop has not yet completed).
- bool tryObtainPromisedPushTicket(uint64_t& rv) noexcept {
+ bool tryObtainPromisedPushTicket(
+ uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
+ ) noexcept {
auto numPushes = pushTicket_.load(std::memory_order_acquire); // A
+ slots = slots_;
+ cap = capacity_;
+ stride = stride_;
while (true) {
auto numPops = popTicket_.load(std::memory_order_acquire); // B
// n will be negative if pops are pending
int64_t n = numPushes - numPops;
- rv = numPushes;
+ ticket = numPushes;
if (n >= static_cast<ssize_t>(capacity_)) {
// Full, linearize at B. We don't need to recheck the read we
// performed at A, because if numPushes was stale at B then the
/// Tries to obtain a pop ticket for which SingleElementQueue::dequeue
/// won't block. Returns true on immediate success, false on immediate
/// failure.
- bool tryObtainReadyPopTicket(uint64_t& rv) noexcept {
- auto ticket = popTicket_.load(std::memory_order_acquire);
+ bool tryObtainReadyPopTicket(
+ uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
+ ) noexcept {
+ ticket = popTicket_.load(std::memory_order_acquire);
+ slots = slots_;
+ cap = capacity_;
+ stride = stride_;
while (true) {
- if (!slots_[idx(ticket)].mayDequeue(turn(ticket))) {
+ if (!slots[idx(ticket, cap, stride)]
+ .mayDequeue(turn(ticket, cap))) {
auto prev = ticket;
ticket = popTicket_.load(std::memory_order_acquire);
if (prev == ticket) {
} else {
if (popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
- rv = ticket;
return true;
/// to block waiting for someone to call enqueue, although we might
/// have to block waiting for them to finish executing code inside the
/// MPMCQueue itself.
- bool tryObtainPromisedPopTicket(uint64_t& rv) noexcept {
+ bool tryObtainPromisedPopTicket(
+ uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
+ ) noexcept {
auto numPops = popTicket_.load(std::memory_order_acquire); // A
while (true) {
auto numPushes = pushTicket_.load(std::memory_order_acquire); // B
return false;
if (popTicket_.compare_exchange_strong(numPops, numPops + 1)) {
- rv = numPops;
+ ticket = numPops;
+ slots = slots_;
+ cap = capacity_;
+ stride = stride_;
return true;
// Given a ticket, constructs an enqueued item using args
template <typename ...Args>
+ void enqueueWithTicketBase(
+ uint64_t ticket, Slot* slots, size_t cap, int stride, Args&&... args
+ ) noexcept {
+ slots[idx(ticket, cap, stride)]
+ .enqueue(turn(ticket, cap),
+ pushSpinCutoff_,
+ (ticket % kAdaptationFreq) == 0,
+ std::forward<Args>(args)...);
+ }
+ // To support tracking ticket numbers in MPMCPipelineStageImpl
+ template <typename ...Args>
void enqueueWithTicket(uint64_t ticket, Args&&... args) noexcept {
- slots_[idx(ticket)].enqueue(turn(ticket),
- pushSpinCutoff_,
- (ticket % kAdaptationFreq) == 0,
- std::forward<Args>(args)...);
+ enqueueWithTicketBase(ticket, slots_, capacity_, stride_,
+ std::forward<Args>(args)...);
// Given a ticket, dequeues the corresponding element
- void dequeueWithTicket(uint64_t ticket, T& elem) noexcept {
- slots_[idx(ticket)].dequeue(turn(ticket),
- popSpinCutoff_,
- (ticket % kAdaptationFreq) == 0,
- elem);
+ void dequeueWithTicketBase(
+ uint64_t ticket, Slot* slots, size_t cap, int stride, T& elem
+ ) noexcept {
+ slots[idx(ticket, cap, stride)]
+ .dequeue(turn(ticket, cap),
+ popSpinCutoff_,
+ (ticket % kAdaptationFreq) == 0,
+ elem);
-namespace detail {
/// SingleElementQueue implements a blocking queue that holds at most one
/// item, and that requires its users to assign incrementing identifiers
/// (turns) to each enqueue and dequeue operation. Note that the turns
run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
-template <typename T>
+template <bool Dynamic = false, typename T>
void runElementTypeTest(T&& src) {
- MPMCQueue<T> cq(10);
+ MPMCQueue<T, std::atomic, Dynamic> cq(10);
T dest;
EXPECT_EQ(RefCounted::active_instances, 0);
+TEST(MPMCQueue, lots_of_element_types_dynamic) {
+ runElementTypeTest<true>(10);
+ runElementTypeTest<true>(string("abc"));
+ runElementTypeTest<true>(std::make_pair(10, string("def")));
+ runElementTypeTest<true>(vector<string>{{"abc"}});
+ runElementTypeTest<true>(std::make_shared<char>('a'));
+ runElementTypeTest<true>(folly::make_unique<char>('a'));
+ runElementTypeTest<true>(boost::intrusive_ptr<RefCounted>(new RefCounted));
+ EXPECT_EQ(RefCounted::active_instances, 0);
TEST(MPMCQueue, single_thread_enqdeq) {
+ // Non-dynamic version only.
+ // False positive for dynamic version. Capacity can be temporarily
+ // higher than specified.
MPMCQueue<int> cq(10);
for (int pass = 0; pass < 10; ++pass) {
TEST(MPMCQueue, tryenq_capacity_test) {
+ // Non-dynamic version only.
+ // False positive for dynamic version. Capacity can be temporarily
+ // higher than specified.
for (size_t cap = 1; cap < 100; ++cap) {
MPMCQueue<int> cq(cap);
for (size_t i = 0; i < cap; ++i) {
TEST(MPMCQueue, enq_capacity_test) {
+ // Non-dynamic version only.
+ // False positive for dynamic version. Capacity can be temporarily
+ // higher than specified.
for (auto cap : { 1, 100, 10000 }) {
MPMCQueue<int> cq(cap);
for (int i = 0; i < cap; ++i) {
-template <template<typename> class Atom>
+template <template<typename> class Atom, bool Dynamic = false>
void runTryEnqDeqThread(
int numThreads,
int n, /*numOps*/
- MPMCQueue<int, Atom>& cq,
+ MPMCQueue<int, Atom, Dynamic>& cq,
std::atomic<uint64_t>& sum,
int t) {
uint64_t threadSum = 0;
sum += threadSum;
-template <template<typename> class Atom>
+template <template<typename> class Atom, bool Dynamic = false>
void runTryEnqDeqTest(int numThreads, int numOps) {
// write and read aren't linearizable, so we don't have
// hard guarantees on their individual behavior. We can still test
// correctness in aggregate
- MPMCQueue<int,Atom> cq(numThreads);
+ MPMCQueue<int,Atom, Dynamic> cq(numThreads);
uint64_t n = numOps;
vector<std::thread> threads(numThreads);
std::atomic<uint64_t> sum(0);
for (int t = 0; t < numThreads; ++t) {
- threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom>,
+ threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom, Dynamic>,
numThreads, n, std::ref(cq), std::ref(sum), t));
for (auto& t : threads) {
+TEST(MPMCQueue, mt_try_enq_deq_dynamic) {
+ int nts[] = { 1, 3, 100 };
+ int n = 100000;
+ for (int nt : nts) {
+ runTryEnqDeqTest<std::atomic, /* Dynamic = */ true>(nt, n);
+ }
TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
int nts[] = { 1, 3, 100 };
+TEST(MPMCQueue, mt_try_enq_deq_emulated_futex_dynamic) {
+ int nts[] = { 1, 3, 100 };
+ int n = 100000;
+ for (int nt : nts) {
+ runTryEnqDeqTest<EmulatedFutexAtomic, /* Dynamic = */ true>(nt, n);
+ }
TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
int nts[] = { 3, 10 };
DSched sched(DSched::uniformSubset(seed, 2));
runTryEnqDeqTest<DeterministicAtomic>(nt, n);
+ {
+ DSched sched(DSched::uniform(seed));
+ runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
+ }
+ {
+ DSched sched(DSched::uniformSubset(seed, 2));
+ runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
+ }
long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
(beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
uint64_t failures = failed;
+ size_t allocated = q.allocatedCapacity();
return folly::sformat(
"{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
- "handoff, {} failures",
+ "handoff, {} failures, {} allocated",
- failures);
+ failures,
+ allocated);
-TEST(MPMCQueue, mt_prod_cons_deterministic) {
+template <bool Dynamic = false>
+void runMtProdConsDeterministic(long seed) {
// we use the Bench method, but perf results are meaningless under DSched
- DSched sched(DSched::uniform(0));
+ DSched sched(DSched::uniform(seed));
+ vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic,
+ Dynamic>>>> callers;
+ callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
+ DeterministicAtomic, Dynamic>>>());
+ callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
+ DeterministicAtomic, Dynamic>>>());
+ callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
+ DeterministicAtomic, Dynamic>>>());
+ callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+ DeterministicAtomic, Dynamic>>>(milliseconds(1)));
+ callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+ DeterministicAtomic, Dynamic>>>(seconds(2)));
+ size_t cap;
+ for (const auto& caller : callers) {
+ cap = 10;
+ LOG(INFO) <<
+ producerConsumerBench(
+ MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
+ "MPMCQueue<int, DeterministicAtomic, Dynamic>("
+ + folly::to<std::string>(cap)+")",
+ 1,
+ 1,
+ 1000,
+ *caller);
+ cap = 100;
+ LOG(INFO) <<
+ producerConsumerBench(
+ MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
+ "MPMCQueue<int, DeterministicAtomic, Dynamic>("
+ + folly::to<std::string>(cap)+")",
+ 10,
+ 10,
+ 1000,
+ *caller);
+ cap = 10;
+ LOG(INFO) <<
+ producerConsumerBench(
+ MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
+ "MPMCQueue<int, DeterministicAtomic, Dynamic>("
+ + folly::to<std::string>(cap)+")",
+ 1,
+ 1,
+ 1000,
+ *caller);
+ cap = 100;
+ LOG(INFO) <<
+ producerConsumerBench(
+ MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
+ "MPMCQueue<int, DeterministicAtomic, Dynamic>("
+ + folly::to<std::string>(cap)+")",
+ 10,
+ 10,
+ 1000,
+ *caller);
+ cap = 1;
+ LOG(INFO) <<
+ producerConsumerBench(
+ MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
+ "MPMCQueue<int, DeterministicAtomic, Dynamic>("
+ + folly::to<std::string>(cap)+")",
+ 10,
+ 10,
+ 1000,
+ *caller);
+ }
- vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic>>>>
- callers;
- callers.emplace_back(
- make_unique<BlockingWriteCaller<MPMCQueue<int, DeterministicAtomic>>>());
- callers.emplace_back(
- make_unique<WriteIfNotFullCaller<MPMCQueue<int, DeterministicAtomic>>>());
- callers.emplace_back(
- make_unique<WriteCaller<MPMCQueue<int, DeterministicAtomic>>>());
- callers.emplace_back(
- make_unique<TryWriteUntilCaller<MPMCQueue<int, DeterministicAtomic>>>(
- milliseconds(1)));
- callers.emplace_back(
- make_unique<TryWriteUntilCaller<MPMCQueue<int, DeterministicAtomic>>>(
- seconds(2)));
+void runMtProdConsDeterministicDynamic(
+ long seed,
+ uint32_t prods,
+ uint32_t cons,
+ uint32_t numOps,
+ size_t cap,
+ size_t minCap,
+ size_t mult
+) {
+ // we use the Bench method, but perf results are meaningless under DSched
+ DSched sched(DSched::uniform(seed));
+ vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic,
+ true>>>> callers;
+ callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
+ DeterministicAtomic, true>>>());
+ callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
+ DeterministicAtomic, true>>>());
+ callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
+ DeterministicAtomic, true>>>());
+ callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+ DeterministicAtomic, true>>>(milliseconds(1)));
+ callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+ DeterministicAtomic, true>>>(seconds(2)));
for (const auto& caller : callers) {
- << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(10),
- "MPMCQueue<int, DeterministicAtomic>(10)",
- 1,
- 1,
- 1000,
- *caller);
- << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(100),
- "MPMCQueue<int, DeterministicAtomic>(100)",
- 10,
- 10,
- 1000,
- *caller);
- << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(10),
- "MPMCQueue<int, DeterministicAtomic>(10)",
- 1,
- 1,
- 1000,
- *caller);
- << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(100),
- "MPMCQueue<int, DeterministicAtomic>(100)",
- 10,
- 10,
- 1000,
- *caller);
- LOG(INFO) << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(1),
- "MPMCQueue<int, DeterministicAtomic>(1)",
- 10,
- 10,
- 1000,
- *caller);
+ LOG(INFO) <<
+ producerConsumerBench(
+ MPMCQueue<int, DeterministicAtomic, true>(cap, minCap, mult),
+ "MPMCQueue<int, DeterministicAtomic, true>("
+ + folly::to<std::string>(cap) + ", "
+ + folly::to<std::string>(minCap) + ", "
+ + folly::to<std::string>(mult)+")",
+ prods,
+ cons,
+ numOps,
+ *caller);
+TEST(MPMCQueue, mt_prod_cons_deterministic) {
+ runMtProdConsDeterministic(0);
+TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic) {
+ runMtProdConsDeterministic<true>(0);
+template <typename T>
+void setFromEnv(T& var, const char* envvar) {
+ char* str = std::getenv(envvar);
+ if (str) { var = atoi(str); }
+TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic_with_arguments) {
+ long seed = 0;
+ uint32_t prods = 10;
+ uint32_t cons = 10;
+ uint32_t numOps = 1000;
+ size_t cap = 10000;
+ size_t minCap = 9;
+ size_t mult = 3;
+ setFromEnv(seed, "SEED");
+ setFromEnv(prods, "PRODS");
+ setFromEnv(cons, "CONS");
+ setFromEnv(numOps, "NUM_OPS");
+ setFromEnv(cap, "CAP");
+ setFromEnv(minCap, "MIN_CAP");
+ setFromEnv(mult, "MULT");
+ runMtProdConsDeterministicDynamic(
+ seed, prods, cons, numOps, cap, minCap, mult);
#define PC_BENCH(q, np, nc, ...) \
producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
-TEST(MPMCQueue, mt_prod_cons) {
+template <bool Dynamic = false>
+void runMtProdCons() {
int n = 100000;
- vector<unique_ptr<WriteMethodCaller<MPMCQueue<int>>>> callers;
- callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int>>>());
- callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int>>>());
- callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int>>>());
- callers.emplace_back(
- make_unique<TryWriteUntilCaller<MPMCQueue<int>>>(milliseconds(1)));
- callers.emplace_back(
- make_unique<TryWriteUntilCaller<MPMCQueue<int>>>(seconds(2)));
+ setFromEnv(n, "NUM_OPS");
+ vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, std::atomic, Dynamic>>>>
+ callers;
+ callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
+ std::atomic, Dynamic>>>());
+ callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
+ std::atomic, Dynamic>>>());
+ callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int, std::atomic,
+ Dynamic>>>());
+ callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+ std::atomic, Dynamic>>>(milliseconds(1)));
+ callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+ std::atomic, Dynamic>>>(seconds(2)));
for (const auto& caller : callers) {
- LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 1, n, *caller);
- LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 1, n, *caller);
- LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 10, n, *caller);
- LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 10, n, *caller);
- LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 1, n, *caller);
- LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 1, n, *caller);
- LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 10, n, *caller);
- LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 10, n, *caller);
- LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n, *caller);
+ LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
+ 1, 1, n, *caller);
+ LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
+ 10, 1, n, *caller);
+ LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
+ 1, 10, n, *caller);
+ LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
+ 10, 10, n, *caller);
+ LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
+ 1, 1, n, *caller);
+ LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
+ 10, 1, n, *caller);
+ LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
+ 1, 10, n, *caller);
+ LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
+ 10, 10, n, *caller);
+ LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(100000)),
+ 32, 100, n, *caller);
-TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
+TEST(MPMCQueue, mt_prod_cons) {
+ runMtProdCons();
+TEST(MPMCQueue, mt_prod_cons_dynamic) {
+ runMtProdCons</* Dynamic = */ true>();
+template <bool Dynamic = false>
+void runMtProdConsEmulatedFutex() {
int n = 100000;
- vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, EmulatedFutexAtomic>>>>
- callers;
- callers.emplace_back(
- make_unique<BlockingWriteCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
- callers.emplace_back(
- make_unique<WriteIfNotFullCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
- callers.emplace_back(
- make_unique<WriteCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
- callers.emplace_back(
- make_unique<TryWriteUntilCaller<MPMCQueue<int, EmulatedFutexAtomic>>>(
- milliseconds(1)));
- callers.emplace_back(
- make_unique<TryWriteUntilCaller<MPMCQueue<int, EmulatedFutexAtomic>>>(
- seconds(2)));
+ vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, EmulatedFutexAtomic,
+ Dynamic>>>> callers;
+ callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
+ EmulatedFutexAtomic, Dynamic>>>());
+ callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
+ EmulatedFutexAtomic, Dynamic>>>());
+ callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
+ EmulatedFutexAtomic, Dynamic>>>());
+ callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+ EmulatedFutexAtomic, Dynamic>>>(milliseconds(1)));
+ callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+ EmulatedFutexAtomic, Dynamic>>>(seconds(2)));
for (const auto& caller : callers) {
- (MPMCQueue<int, EmulatedFutexAtomic>(10)), 1, 1, n, *caller);
+ (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 1, 1, n, *caller);
- (MPMCQueue<int, EmulatedFutexAtomic>(10)), 10, 1, n, *caller);
+ (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 10, 1, n, *caller);
- (MPMCQueue<int, EmulatedFutexAtomic>(10)), 1, 10, n, *caller);
+ (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 1, 10, n, *caller);
- (MPMCQueue<int, EmulatedFutexAtomic>(10)), 10, 10, n, *caller);
+ (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 10, 10, n, *caller);
- (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 1, 1, n, *caller);
+ (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 1, 1, n, *caller);
- (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 10, 1, n, *caller);
+ (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 10, 1, n, *caller);
- (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 1, 10, n, *caller);
- (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 10, 10, n, *caller);
- (MPMCQueue<int, EmulatedFutexAtomic>(100000)), 32, 100, n, *caller);
+ (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 1, 10, n, *caller);
+ LOG(INFO) << PC_BENCH((MPMCQueue<int, EmulatedFutexAtomic, Dynamic>
+ (10000)), 10, 10, n, *caller);
+ LOG(INFO) << PC_BENCH((MPMCQueue<int, EmulatedFutexAtomic, Dynamic>
+ (100000)), 32, 100, n, *caller);
-template <template <typename> class Atom>
+TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
+ runMtProdConsEmulatedFutex();
+TEST(MPMCQueue, mt_prod_cons_emulated_futex_dynamic) {
+ runMtProdConsEmulatedFutex</* Dynamic = */ true>();
+template <template <typename> class Atom, bool Dynamic = false>
void runNeverFailThread(int numThreads,
int n, /*numOps*/
- MPMCQueue<int, Atom>& cq,
+ MPMCQueue<int, Atom, Dynamic>& cq,
std::atomic<uint64_t>& sum,
int t) {
uint64_t threadSum = 0;
sum += threadSum;
-template <template <typename> class Atom>
+template <template <typename> class Atom, bool Dynamic = false>
uint64_t runNeverFailTest(int numThreads, int numOps) {
// always #enq >= #deq
- MPMCQueue<int, Atom> cq(numThreads);
+ MPMCQueue<int, Atom, Dynamic> cq(numThreads);
uint64_t n = numOps;
auto beginMicro = nowMicro();
vector<std::thread> threads(numThreads);
std::atomic<uint64_t> sum(0);
for (int t = 0; t < numThreads; ++t) {
- threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom>,
+ threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom, Dynamic>,
return nowMicro() - beginMicro;
-TEST(MPMCQueue, mt_never_fail) {
- int nts[] = {1, 3, 100};
- int n = 100000;
+template <template<typename> class Atom, bool Dynamic = false>
+void runMtNeverFail(std::vector<int>& nts, int n) {
for (int nt : nts) {
- uint64_t elapsed = runNeverFailTest<std::atomic>(nt, n);
+ uint64_t elapsed = runNeverFailTest<Atom, Dynamic>(nt, n);
LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
<< " threads";
-TEST(MPMCQueue, mt_never_fail_emulated_futex) {
- int nts[] = {1, 3, 100};
+TEST(MPMCQueue, mt_never_fail) {
+ std::vector<int> nts {1, 3, 100};
+ int n = 100000;
+ runMtNeverFail<std::atomic>(nts, n);
+TEST(MPMCQueue, mt_never_fail_dynamic) {
+ std::vector<int> nts {1, 3, 100};
int n = 100000;
- for (int nt : nts) {
- uint64_t elapsed = runNeverFailTest<EmulatedFutexAtomic>(nt, n);
- LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
- << " threads";
- }
+ runMtNeverFail<std::atomic, true>(nts, n);
-TEST(MPMCQueue, mt_never_fail_deterministic) {
- int nts[] = {3, 10};
+TEST(MPMCQueue, mt_never_fail_emulated_futex) {
+ std::vector<int> nts {1, 3, 100};
+ int n = 100000;
+ runMtNeverFail<EmulatedFutexAtomic>(nts, n);
- long seed = 0; // nowMicro() % 10000;
- LOG(INFO) << "using seed " << seed;
+TEST(MPMCQueue, mt_never_fail_emulated_futex_dynamic) {
+ std::vector<int> nts {1, 3, 100};
+ int n = 100000;
+ runMtNeverFail<EmulatedFutexAtomic, true>(nts, n);
- int n = 1000;
+template<bool Dynamic = false>
+void runMtNeverFailDeterministic(std::vector<int>& nts, int n, long seed) {
+ LOG(INFO) << "using seed " << seed;
for (int nt : nts) {
DSched sched(DSched::uniform(seed));
- runNeverFailTest<DeterministicAtomic>(nt, n);
+ runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
DSched sched(DSched::uniformSubset(seed, 2));
- runNeverFailTest<DeterministicAtomic>(nt, n);
+ runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
-template <class Clock, template <typename> class Atom>
+TEST(MPMCQueue, mt_never_fail_deterministic) {
+ std::vector<int> nts {3, 10};
+ long seed = 0; // nowMicro() % 10000;
+ int n = 1000;
+ runMtNeverFailDeterministic(nts, n, seed);
+TEST(MPMCQueue, mt_never_fail_deterministic_dynamic) {
+ std::vector<int> nts {3, 10};
+ long seed = 0; // nowMicro() % 10000;
+ int n = 1000;
+ runMtNeverFailDeterministic<true>(nts, n, seed);
+template <class Clock, template <typename> class Atom, bool Dynamic>
void runNeverFailUntilThread(int numThreads,
int n, /*numOps*/
- MPMCQueue<int, Atom>& cq,
+ MPMCQueue<int, Atom, Dynamic>& cq,
std::atomic<uint64_t>& sum,
int t) {
uint64_t threadSum = 0;
sum += threadSum;
-template <class Clock, template <typename> class Atom>
+template <class Clock, template <typename> class Atom, bool Dynamic = false>
uint64_t runNeverFailTest(int numThreads, int numOps) {
// always #enq >= #deq
- MPMCQueue<int, Atom> cq(numThreads);
+ MPMCQueue<int, Atom, Dynamic> cq(numThreads);
uint64_t n = numOps;
auto beginMicro = nowMicro();
vector<std::thread> threads(numThreads);
std::atomic<uint64_t> sum(0);
for (int t = 0; t < numThreads; ++t) {
- threads[t] = DSched::thread(std::bind(runNeverFailUntilThread<Clock, Atom>,
- numThreads,
- n,
- std::ref(cq),
- std::ref(sum),
- t));
+ threads[t] = DSched::thread(std::bind(
+ runNeverFailUntilThread<Clock, Atom, Dynamic>,
+ numThreads,
+ n,
+ std::ref(cq),
+ std::ref(sum),
+ t));
for (auto& t : threads) {
return nowMicro() - beginMicro;
-TEST(MPMCQueue, mt_never_fail_until_system) {
- int nts[] = {1, 3, 100};
- int n = 100000;
+template <bool Dynamic = false>
+void runMtNeverFailUntilSystem(std::vector<int>& nts, int n) {
for (int nt : nts) {
uint64_t elapsed =
- runNeverFailTest<std::chrono::system_clock, std::atomic>(nt, n);
+ runNeverFailTest<std::chrono::system_clock, std::atomic, Dynamic>(nt, n);
LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
<< " threads";
-TEST(MPMCQueue, mt_never_fail_until_steady) {
- int nts[] = {1, 3, 100};
+TEST(MPMCQueue, mt_never_fail_until_system) {
+ std::vector<int> nts {1, 3, 100};
+ int n = 100000;
+ runMtNeverFailUntilSystem(nts, n);
+TEST(MPMCQueue, mt_never_fail_until_system_dynamic) {
+ std::vector<int> nts {1, 3, 100};
int n = 100000;
+ runMtNeverFailUntilSystem<true>(nts, n);
+template <bool Dynamic = false>
+void runMtNeverFailUntilSteady(std::vector<int>& nts, int n) {
for (int nt : nts) {
uint64_t elapsed =
- runNeverFailTest<std::chrono::steady_clock, std::atomic>(nt, n);
+ runNeverFailTest<std::chrono::steady_clock, std::atomic, Dynamic>(nt, n);
LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
<< " threads";
+TEST(MPMCQueue, mt_never_fail_until_steady) {
+ std::vector<int> nts {1, 3, 100};
+ int n = 100000;
+ runMtNeverFailUntilSteady(nts, n);
+TEST(MPMCQueue, mt_never_fail_until_steady_dynamic) {
+ std::vector<int> nts {1, 3, 100};
+ int n = 100000;
+ runMtNeverFailUntilSteady<true>(nts, n);
enum LifecycleEvent {
EXPECT_EQ(lc_outstanding(), 0);
- MPMCQueue<Lifecycle<R>> queue(50);
+ // Non-dynamic only. False positive for dynamic.
+ MPMCQueue<Lifecycle<R>, std::atomic> queue(50);
for (int pass = 0; pass < 10; ++pass) {
-TEST(MPMCQueue, queue_moving) {
+template <bool Dynamic = false>
+void run_queue_moving() {
EXPECT_EQ(lc_outstanding(), 0);
- MPMCQueue<Lifecycle<std::false_type>> a(50);
+ MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> a(50);
// move constructor
- MPMCQueue<Lifecycle<std::false_type>> b = std::move(a);
+ MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> b
+ = std::move(a);
EXPECT_EQ(a.capacity(), 0);
EXPECT_EQ(a.size(), 0);
// move operator
- MPMCQueue<Lifecycle<std::false_type>> c;
+ MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> c;
c = std::move(b);
// swap
- MPMCQueue<Lifecycle<std::false_type>> d(10);
+ MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> d(10);
std::swap(c, d);
+TEST(MPMCQueue, queue_moving) {
+ run_queue_moving();
+TEST(MPMCQueue, queue_moving_dynamic) {
+ run_queue_moving<true>();
TEST(MPMCQueue, explicit_zero_capacity_fail) {
ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);
+ using DynamicMPMCQueueInt = MPMCQueue<int, std::atomic, true>;
+ ASSERT_THROW(DynamicMPMCQueueInt cq(0), std::invalid_argument);