From 59ea1768a8dd7c4064bf1bf2d4c3fe92b7c6386d Mon Sep 17 00:00:00 2001
From: Maged Michael
Date: Thu, 8 Sep 2016 10:43:23 -0700
Subject: [PATCH] Dynamic expansion of folly MPMC queue

Summary:
This diff allows queues to start with small capacity and expand as needed
up to the specified capacity. The main additions and changes:
- Extra template parameter `Dynamic` that enables dynamic expansion
  (default `false`).
- `ClosedArray` type.
- Extra members:
-- `dstate_`: a packed 64-bit unsigned int that contains a seqlock (which
   implicitly indicates the number of expansions) and the lowest ticket
   for the current `dslots_/dcapacity_/dstride_` configuration.
-- `dcapacity_`: current dynamic capacity.
-- `dslots_`: current dynamic slots array (in anonymous union with
   `slots_`).
-- `dstride_`: current dynamic stride (in anonymous union with `stride_`).
-- `closed_`: a logarithmic-sized array of ClosedArray to hold information
   about earlier smaller queue arrays for use by lagging consumers.

Design sketch:
- Reallocate a new larger array on expansion.
- Expansion uses a seqlock. The common-case critical path includes a
  seqlock read-only section.
- Lagging consumers and lagging blocking producers use a logarithmic-sized
  array for info about closed arrays.
- Tickets are adjusted by an offset (to account for the tickets associated
  with earlier closed arrays) in order to calculate the appropriate index
  and turn.
- The syncing of `pushTicket_` with the ticket offset packed in `dstate_`
  is tricky. `pushTicket_` is accessed outside `dstate_`'s seqlock.

Reviewed By: djwatson

Differential Revision: D3462592

fbshipit-source-id: d442a7694190cca3c33753409ffac941d7463f83
---
 folly/MPMCQueue.h                 | 756 +++++++++++++++++++++++++++---
 folly/detail/MPMCPipelineDetail.h |  11 +-
 folly/test/MPMCQueueTest.cpp      | 525 +++++++++++++++------
 3 files changed, 1058 insertions(+), 234 deletions(-)

diff --git a/folly/MPMCQueue.h b/folly/MPMCQueue.h
index 08b2e1b8..2bf89c53 100644
--- a/folly/MPMCQueue.h
+++ b/folly/MPMCQueue.h
@@ -38,6 +38,9 @@ struct SingleElementQueue;
 template <typename T> class MPMCPipelineStageImpl;
 
+/// MPMCQueue base CRTP template
+template <typename> class MPMCQueueBase;
+
 } // namespace detail
 
 /// MPMCQueue<T> is a high-performance bounded concurrent queue that
@@ -93,34 +96,536 @@ template <typename T> class MPMCPipelineStageImpl;
 /// are you can enqueue one sentinel and then have each consumer requeue
 /// two sentinels after it receives it (by requeuing 2 the shutdown can
 /// complete in O(log P) time instead of O(P)).
-template<typename T,
-         template<typename> class Atom = std::atomic>
-class MPMCQueue : boost::noncopyable {
+template<typename T,
+         template<typename> class Atom = std::atomic,
+         bool Dynamic = false>
+class MPMCQueue : public detail::MPMCQueueBase<MPMCQueue<T,Atom,Dynamic>> {
+  friend class detail::MPMCPipelineStageImpl<T>;
+  using Slot = detail::SingleElementQueue<T,Atom>;
+ public:
+
+  explicit MPMCQueue(size_t queueCapacity)
+    : detail::MPMCQueueBase<MPMCQueue<T,Atom,Dynamic>>(queueCapacity)
+  {
+    this->stride_ = this->computeStride(queueCapacity);
+    this->slots_ = new Slot[queueCapacity + 2 * this->kSlotPadding];
+  }
+
+  MPMCQueue() noexcept { }
+};
+
+/// The dynamic version of MPMCQueue allows dynamic expansion of queue
+/// capacity, such that a queue may start with a smaller capacity than
+/// specified and expand only if needed. Users may optionally specify
+/// the initial capacity and the expansion multiplier.
+///
+/// The design uses a seqlock to enforce mutual exclusion among
+/// expansion attempts. Regular operations read up-to-date queue
+/// information (slots array, capacity, stride) inside read-only
+/// seqlock sections, which are unimpeded when no expansion is in
+/// progress.
+///
+/// An expansion computes a new capacity, allocates a new slots array,
+/// and updates the stride. No information needs to be copied from the
+/// current slots array to the new one. When this happens, the new
+/// slots will not have sequence numbers that match ticket numbers, so
+/// the expansion needs to compute a ticket offset that lets operations
+/// on the new array adjust their slot-index and sequence-number
+/// calculations to account for the new slots starting with sequence
+/// numbers of zero. The current ticket offset is packed with the
+/// seqlock in an atomic 64-bit integer. The initial offset is zero.
+///
+/// Lagging write and read operations with tickets lower than the
+/// ticket offset of the current slots array (i.e., the minimum ticket
+/// number that can be served by the current array) must use earlier
+/// closed arrays instead of the current one. Information about closed
+/// slots arrays (array address, capacity, stride, and offset) is
+/// maintained in a logarithmic-sized structure. Each entry in that
+/// structure never needs to be changed once set. The number of closed
+/// arrays is half the value of the seqlock (when unlocked).
+///
+/// The acquisition of the seqlock to perform an expansion does not
+/// prevent the issuing of new push and pop tickets concurrently. The
+/// expansion must set the new ticket offset to a value that couldn't
+/// have been issued to an operation that has already gone through a
+/// seqlock read-only section (and hence obtained information for
+/// older closed arrays).
+///
+/// Note that the total queue capacity can temporarily exceed the
+/// specified capacity when there are lagging consumers that haven't
+/// yet consumed all the elements in closed arrays. Users should not
+/// rely on the capacity of dynamic queues for synchronization, e.g.,
+/// they should not expect that a thread will definitely block on a
+/// call to blockingWrite() when the queue size is known to be equal
+/// to its capacity.
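+///
+/// As an illustrative sketch (using only the constructors and
+/// operations declared below), a dynamic queue specified with
+/// capacity 1000, minimum capacity 10, and expansion multiplier 10
+/// starts with 10 slots and may be reallocated at 100 and then 1000
+/// slots as demand grows:
+///
+///   folly::MPMCQueue<int, std::atomic, true> q(1000, 10, 10);
+///   q.blockingWrite(42);               // served by the small array
+///   int v;
+///   q.blockingRead(v);                 // may read from a closed array
+///   size_t a = q.allocatedCapacity();  // currently allocated slots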
+///
+/// The dynamic version is a partial specialization of MPMCQueue with
+/// Dynamic == true
+template <typename T, template<typename> class Atom>
+class MPMCQueue<T,Atom,true> :
+      public detail::MPMCQueueBase<MPMCQueue<T,Atom,true>> {
+  friend class detail::MPMCQueueBase<MPMCQueue<T,Atom,true>>;
+  using Slot = detail::SingleElementQueue<T,Atom>;
+
+  struct ClosedArray {
+    uint64_t offset_ {0};
+    Slot* slots_ {nullptr};
+    size_t capacity_ {0};
+    int stride_ {0};
+  };
+
+ public:
+
+  explicit MPMCQueue(size_t queueCapacity)
+    : detail::MPMCQueueBase<MPMCQueue<T,Atom,true>>(queueCapacity)
+  {
+    size_t cap = std::min<size_t>(kDefaultMinDynamicCapacity, queueCapacity);
+    initQueue(cap, kDefaultExpansionMultiplier);
+  }
+
+  explicit MPMCQueue(size_t queueCapacity,
+                     size_t minCapacity,
+                     size_t expansionMultiplier)
+    : detail::MPMCQueueBase<MPMCQueue<T,Atom,true>>(queueCapacity)
+  {
+    minCapacity = std::max<size_t>(1, minCapacity);
+    size_t cap = std::min<size_t>(minCapacity, queueCapacity);
+    expansionMultiplier = std::max<size_t>(2, expansionMultiplier);
+    initQueue(cap, expansionMultiplier);
+  }
+
+  MPMCQueue() noexcept {
+    dmult_ = 0;
+    closed_ = nullptr;
+  }
+
+  MPMCQueue(MPMCQueue<T,Atom,true>&& rhs) noexcept {
+    this->capacity_ = rhs.capacity_;
+    this->slots_ = rhs.slots_;
+    this->stride_ = rhs.stride_;
+    this->dstate_.store(rhs.dstate_.load(std::memory_order_relaxed),
+                        std::memory_order_relaxed);
+    this->dcapacity_.store(rhs.dcapacity_.load(std::memory_order_relaxed),
+                           std::memory_order_relaxed);
+    this->pushTicket_.store(rhs.pushTicket_.load(std::memory_order_relaxed),
+                            std::memory_order_relaxed);
+    this->popTicket_.store(rhs.popTicket_.load(std::memory_order_relaxed),
+                           std::memory_order_relaxed);
+    this->pushSpinCutoff_.store(
+      rhs.pushSpinCutoff_.load(std::memory_order_relaxed),
+      std::memory_order_relaxed);
+    this->popSpinCutoff_.store(
+      rhs.popSpinCutoff_.load(std::memory_order_relaxed),
+      std::memory_order_relaxed);
+    dmult_ = rhs.dmult_;
+    closed_ = rhs.closed_;
+
+    rhs.capacity_ = 0;
+    rhs.slots_ = nullptr;
+    rhs.stride_ = 0;
+    rhs.dstate_.store(0, std::memory_order_relaxed);
+    rhs.dcapacity_.store(0, std::memory_order_relaxed);
+    rhs.pushTicket_.store(0, std::memory_order_relaxed);
+    rhs.popTicket_.store(0, std::memory_order_relaxed);
+    rhs.pushSpinCutoff_.store(0, std::memory_order_relaxed);
+    rhs.popSpinCutoff_.store(0, std::memory_order_relaxed);
+    rhs.dmult_ = 0;
+    rhs.closed_ = nullptr;
+  }
+
+  MPMCQueue<T,Atom,true> const& operator= (MPMCQueue<T,Atom,true>&& rhs) {
+    if (this != &rhs) {
+      this->~MPMCQueue();
+      new (this) MPMCQueue(std::move(rhs));
+    }
+    return *this;
+  }
+
+  ~MPMCQueue() {
+    if (closed_ != nullptr) {
+      for (int i = getNumClosed(this->dstate_.load()) - 1; i >= 0; --i) {
+        delete[] closed_[i].slots_;
+      }
+      delete[] closed_;
+    }
+  }
+
+  size_t allocatedCapacity() const noexcept {
+    return this->dcapacity_.load(std::memory_order_relaxed);
+  }
+
+  template <typename ...Args>
+  void blockingWrite(Args&&... args) noexcept {
+    uint64_t ticket = this->pushTicket_++;
+    Slot* slots;
+    size_t cap;
+    int stride;
+    uint64_t state;
+    uint64_t offset;
+    do {
+      if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        continue;
+      }
+      offset = getOffset(state);
+      if (ticket < offset) {
+        // There was an expansion after this ticket was issued.
+        updateFromClosed(state, ticket, offset, slots, cap, stride);
+        break;
+      }
+      if (slots[this->idx((ticket-offset), cap, stride)]
+          .mayEnqueue(this->turn(ticket-offset, cap))) {
+        // A slot is ready. No need to expand.
+        break;
+      } else if (this->popTicket_.load(std::memory_order_relaxed) + cap
+                 > ticket) {
+        // May block, but a pop is in progress. No need to expand.
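+        // (The in-progress pop holds a ticket within cap of this one,
+        // so a slot in the current array will become available without
+        // expansion.)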
+        // Get seqlock read section info again in case an expansion
+        // occurred with an equal or higher ticket.
+        continue;
+      } else {
+        // May block. See if we can expand.
+        if (tryExpand(state, cap)) {
+          // This or another thread started an expansion. Get updated info.
+          continue;
+        } else {
+          // Can't expand.
+          break;
+        }
+      }
+    } while (true);
+    this->enqueueWithTicketBase(ticket-offset, slots, cap, stride,
+                                std::forward<Args>(args)...);
+  }
+
+  void blockingReadWithTicket(uint64_t& ticket, T& elem) noexcept {
+    ticket = this->popTicket_++;
+    Slot* slots;
+    size_t cap;
+    int stride;
+    uint64_t state;
+    uint64_t offset;
+    while (!trySeqlockReadSection(state, slots, cap, stride));
+    offset = getOffset(state);
+    if (ticket < offset) {
+      // There was an expansion after the corresponding push ticket
+      // was issued.
+      updateFromClosed(state, ticket, offset, slots, cap, stride);
+    }
+    this->dequeueWithTicketBase(ticket-offset, slots, cap, stride, elem);
+  }
+
+ private:
+
+  enum {
+    kSeqlockBits = 6,
+    kDefaultMinDynamicCapacity = 10,
+    kDefaultExpansionMultiplier = 10,
+  };
+
+  size_t dmult_;
+
+  // Info about closed slots arrays for use by lagging operations
+  ClosedArray* closed_;
+
+  void initQueue(const size_t cap, const size_t mult) {
+    this->stride_ = this->computeStride(cap);
+    this->slots_ = new Slot[cap + 2 * this->kSlotPadding];
+    this->dstate_.store(0);
+    this->dcapacity_.store(cap);
+    dmult_ = mult;
+    size_t maxClosed = 0;
+    for (size_t expanded = cap;
+         expanded < this->capacity_;
+         expanded *= mult) {
+      ++maxClosed;
+    }
+    closed_ = (maxClosed > 0) ? new ClosedArray[maxClosed] : nullptr;
+  }
+
+  bool tryObtainReadyPushTicket(
+    uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
+  ) noexcept {
+    uint64_t state;
+    do {
+      ticket = this->pushTicket_.load(std::memory_order_acquire); // A
+      if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        continue;
+      }
+      uint64_t offset = getOffset(state);
+      if (ticket < offset) {
+        // There was an expansion with offset greater than this ticket
+        updateFromClosed(state, ticket, offset, slots, cap, stride);
+      }
+      if (slots[this->idx((ticket-offset), cap, stride)]
+          .mayEnqueue(this->turn(ticket-offset, cap))) {
+        // A slot is ready.
+        if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
+          // Adjust ticket
+          ticket -= offset;
+          return true;
+        } else {
+          continue;
+        }
+      } else {
+        if (ticket != this->pushTicket_.load(std::memory_order_relaxed)) { // B
+          // Try again. Ticket changed.
+          continue;
+        }
+        // Likely to block.
+        // Try to expand unless the ticket is for a closed array
+        if (offset == getOffset(state)) {
+          if (tryExpand(state, cap)) {
+            // This or another thread started an expansion. Get up-to-date info.
+            continue;
+          }
+        }
+        return false;
+      }
+    } while (true);
+  }
+
+  bool tryObtainPromisedPushTicket(
+    uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
+  ) noexcept {
+    uint64_t state;
+    do {
+      ticket = this->pushTicket_.load(std::memory_order_acquire);
+      auto numPops = this->popTicket_.load(std::memory_order_acquire);
+      if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        continue;
+      }
+      int64_t n = ticket - numPops;
+      if (n >= static_cast<ssize_t>(this->capacity_)) {
+        return false;
+      }
+      if ((n >= static_cast<ssize_t>(cap))) {
+        if (tryExpand(state, cap)) {
+          // This or another thread started an expansion. Start over
+          // with a new state.
+          continue;
+        } else {
+          // Can't expand.
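+          // (tryExpand() fails only if the hard capacity limit has
+          // been reached or allocation failed, so report full rather
+          // than block.)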
+          return false;
+        }
+      }
+      uint64_t offset = getOffset(state);
+      if (ticket < offset) {
+        // There was an expansion with offset greater than this ticket
+        updateFromClosed(state, ticket, offset, slots, cap, stride);
+      }
+      if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
+        // Adjust ticket
+        ticket -= offset;
+        return true;
+      }
+    } while (true);
+  }
+
+  bool tryObtainReadyPopTicket(
+    uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
+  ) noexcept {
+    uint64_t state;
+    do {
+      ticket = this->popTicket_.load(std::memory_order_relaxed);
+      if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        continue;
+      }
+      uint64_t offset = getOffset(state);
+      if (ticket < offset) {
+        // There was an expansion after the corresponding push ticket
+        // was issued.
+        updateFromClosed(state, ticket, offset, slots, cap, stride);
+      }
+      if (slots[this->idx((ticket-offset), cap, stride)]
+          .mayDequeue(this->turn(ticket-offset, cap))) {
+        if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
+          // Adjust ticket
+          ticket -= offset;
+          return true;
+        }
+      } else {
+        return false;
+      }
+    } while (true);
+  }
+
+  bool tryObtainPromisedPopTicket(
+    uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
+  ) noexcept {
+    uint64_t state;
+    do {
+      ticket = this->popTicket_.load(std::memory_order_acquire);
+      auto numPushes = this->pushTicket_.load(std::memory_order_acquire);
+      if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        continue;
+      }
+      if (ticket >= numPushes) {
+        return false;
+      }
+      if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
+        uint64_t offset = getOffset(state);
+        if (ticket < offset) {
+          // There was an expansion after the corresponding push
+          // ticket was issued.
+          updateFromClosed(state, ticket, offset, slots, cap, stride);
+        }
+        // Adjust ticket
+        ticket -= offset;
+        return true;
+      }
+    } while (true);
+  }
+
+  /// Enqueues an element with a specific ticket number
+  template <typename ...Args>
+  void enqueueWithTicket(const uint64_t ticket, Args&&... args) noexcept {
+    Slot* slots;
+    size_t cap;
+    int stride;
+    uint64_t state;
+    uint64_t offset;
+    while (!trySeqlockReadSection(state, slots, cap, stride)) {}
+    offset = getOffset(state);
+    if (ticket < offset) {
+      // There was an expansion after this ticket was issued.
+      updateFromClosed(state, ticket, offset, slots, cap, stride);
+    }
+    this->enqueueWithTicketBase(ticket-offset, slots, cap, stride,
+                                std::forward<Args>(args)...);
+  }
+
+  uint64_t getOffset(const uint64_t state) const noexcept {
+    return state >> kSeqlockBits;
+  }
+
+  int getNumClosed(const uint64_t state) const noexcept {
+    return (state & ((1 << kSeqlockBits) - 1)) >> 1;
+  }
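+
+  // Worked example of the dstate_ packing read by getOffset() and
+  // getNumClosed() above (illustrative only): with kSeqlockBits = 6,
+  // after two expansions with current ticket offset t, an unlocked
+  // dstate_ holds (t << 6) | (2 << 1) -- the offset in the high bits,
+  // twice the number of closed arrays in the low bits, and a clear
+  // lock bit (bit 0). tryExpand() below sets bit 0 while it owns the
+  // seqlock and stores the new offset and count when releasing it.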
+
+  /// Try to expand the queue. Returns true if this expansion was
+  /// successful or a concurrent expansion is in progress. Returns
+  /// false if the queue has reached its maximum capacity or
+  /// allocation has failed.
+  bool tryExpand(const uint64_t state, const size_t cap) noexcept {
+    if (cap == this->capacity_) {
+      return false;
+    }
+    // Acquire seqlock
+    uint64_t oldval = state;
+    assert((state & 1) == 0);
+    if (this->dstate_.compare_exchange_strong(oldval, state + 1)) {
+      assert(cap == this->dcapacity_.load());
+      uint64_t ticket = 1 + std::max(this->pushTicket_.load(),
+                                     this->popTicket_.load());
+      size_t newCapacity =
+        std::min<size_t>(dmult_ * cap, this->capacity_);
+      Slot* newSlots =
+        new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding];
+      if (newSlots == nullptr) {
+        // Expansion failed. Restore the seqlock
+        this->dstate_.store(state);
+        return false;
+      }
+      // Successful expansion
+      // calculate the current ticket offset
+      uint64_t offset = getOffset(state);
+      // calculate index in closed array
+      int index = getNumClosed(state);
+      assert((index << 1) < (1 << kSeqlockBits));
+      // fill the info for the closed slots array
+      closed_[index].offset_ = offset;
+      closed_[index].slots_ = this->dslots_.load();
+      closed_[index].capacity_ = cap;
+      closed_[index].stride_ = this->dstride_.load();
+      // update the new slots array info
+      this->dslots_.store(newSlots);
+      this->dcapacity_.store(newCapacity);
+      this->dstride_.store(this->computeStride(newCapacity));
+      // Release the seqlock and record the new ticket offset
+      this->dstate_.store((ticket << kSeqlockBits) + (2 * (index + 1)));
+      return true;
+    } else { // failed to acquire seqlock
+      // Someone acquired the seqlock. Go back to the caller and get
+      // up-to-date info.
+      return true;
+    }
+  }
+
+  /// Seqlock read-only section
+  bool trySeqlockReadSection(
+    uint64_t& state, Slot*& slots, size_t& cap, int& stride
+  ) noexcept {
+    state = this->dstate_.load(std::memory_order_acquire);
+    if (state & 1) {
+      // Locked.
+      return false;
+    }
+    // Start read-only section.
+    slots = this->dslots_.load(std::memory_order_relaxed);
+    cap = this->dcapacity_.load(std::memory_order_relaxed);
+    stride = this->dstride_.load(std::memory_order_relaxed);
+    // End of read-only section. Validate seqlock.
+    std::atomic_thread_fence(std::memory_order_acquire);
+    return (state == this->dstate_.load(std::memory_order_relaxed));
+  }
+
+  /// Update local variables of a lagging operation using the
+  /// most recent closed array with offset <= ticket
+  void updateFromClosed(
+    const uint64_t state, const uint64_t ticket,
+    uint64_t& offset, Slot*& slots, size_t& cap, int& stride
+  ) noexcept {
+    for (int i = getNumClosed(state) - 1; i >= 0; --i) {
+      offset = closed_[i].offset_;
+      if (offset <= ticket) {
+        slots = closed_[i].slots_;
+        cap = closed_[i].capacity_;
+        stride = closed_[i].stride_;
+        return;
+      }
+    }
+    // A closed array with offset <= ticket should have been found
+    assert(false);
+  }
+};
+
+namespace detail {
+
+/// CRTP specialization of MPMCQueueBase
+template<
+    template<
+        typename T, template<typename> class Atom, bool Dynamic> class Derived,
+    typename T, template<typename> class Atom, bool Dynamic>
+class MPMCQueueBase<Derived<T, Atom, Dynamic>> : boost::noncopyable {
+
+// Note: Using CRTP static casts in several functions of this base
+// template instead of making called functions virtual or duplicating
+// the code of calling functions in the derived partially specialized
+// template
+
   static_assert(std::is_nothrow_constructible<T,T&&>::value ||
                 folly::IsRelocatable<T>::value,
       "T must be relocatable or have a noexcept move constructor");
 
-  friend class detail::MPMCPipelineStageImpl<T>;
  public:
   typedef T value_type;
 
-  explicit MPMCQueue(size_t queueCapacity)
+  using Slot = detail::SingleElementQueue<T,Atom>;
+
+  explicit MPMCQueueBase(size_t queueCapacity)
     : capacity_(queueCapacity)
     , pushTicket_(0)
     , popTicket_(0)
     , pushSpinCutoff_(0)
     , popSpinCutoff_(0)
   {
-    if (queueCapacity == 0)
+    if (queueCapacity == 0) {
       throw std::invalid_argument(
         "MPMCQueue with explicit capacity 0 is impossible"
+        // Stride computation in derived classes would sigfpe if capacity is 0
       );
-
-    // would sigfpe if capacity is 0
-    stride_ = computeStride(queueCapacity);
-    slots_ = new detail::SingleElementQueue<T,Atom>[queueCapacity +
-                                                    2 * kSlotPadding];
+    }
 
     // ideally this would be a static assert, but g++ doesn't allow it
     assert(alignof(MPMCQueue<T,Atom>)
@@ -132,10 +637,12 @@ class MPMCQueue : boost::noncopyable {
 
   /// A default-constructed queue is useful because a usable (non-zero
   /// capacity) queue can be moved onto it or swapped with it
-  MPMCQueue() noexcept
+  MPMCQueueBase() noexcept
     : capacity_(0)
     , slots_(nullptr)
     , stride_(0)
+    , dstate_(0)
+    , dcapacity_(0)
     , pushTicket_(0)
     , popTicket_(0)
     , pushSpinCutoff_(0)
@@ -145,10 +652,12 @@ class MPMCQueue : boost::noncopyable {
   /// IMPORTANT: The move constructor is here to make it easier to perform
   /// the initialization phase, it is not safe to use when there are any
   /// concurrent accesses (this is not checked).
-  MPMCQueue(MPMCQueue&& rhs) noexcept
+  MPMCQueueBase(MPMCQueueBase<Derived<T,Atom,Dynamic>>&& rhs) noexcept
     : capacity_(rhs.capacity_)
     , slots_(rhs.slots_)
     , stride_(rhs.stride_)
+    , dstate_(rhs.dstate_.load(std::memory_order_relaxed))
+    , dcapacity_(rhs.dcapacity_.load(std::memory_order_relaxed))
     , pushTicket_(rhs.pushTicket_.load(std::memory_order_relaxed))
     , popTicket_(rhs.popTicket_.load(std::memory_order_relaxed))
     , pushSpinCutoff_(rhs.pushSpinCutoff_.load(std::memory_order_relaxed))
@@ -161,6 +670,8 @@ class MPMCQueue : boost::noncopyable {
     rhs.capacity_ = 0;
     rhs.slots_ = nullptr;
     rhs.stride_ = 0;
+    rhs.dstate_.store(0, std::memory_order_relaxed);
+    rhs.dcapacity_.store(0, std::memory_order_relaxed);
     rhs.pushTicket_.store(0, std::memory_order_relaxed);
     rhs.popTicket_.store(0, std::memory_order_relaxed);
     rhs.pushSpinCutoff_.store(0, std::memory_order_relaxed);
@@ -170,17 +681,18 @@ class MPMCQueue : boost::noncopyable {
   /// IMPORTANT: The move operator is here to make it easier to perform
   /// the initialization phase, it is not safe to use when there are any
   /// concurrent accesses (this is not checked).
-  MPMCQueue const& operator= (MPMCQueue&& rhs) {
+  MPMCQueueBase<Derived<T,Atom,Dynamic>> const& operator=
+    (MPMCQueueBase<Derived<T,Atom,Dynamic>>&& rhs) {
     if (this != &rhs) {
-      this->~MPMCQueue();
-      new (this) MPMCQueue(std::move(rhs));
+      this->~MPMCQueueBase();
+      new (this) MPMCQueueBase(std::move(rhs));
     }
     return *this;
   }
 
   /// MPMCQueue can only be safely destroyed when there are no
   /// pending enqueuers or dequeuers (this is not checked).
-  ~MPMCQueue() {
+  ~MPMCQueueBase() {
     delete[] slots_;
   }
 
@@ -235,6 +747,11 @@ class MPMCQueue : boost::noncopyable {
     return capacity_;
   }
 
+  /// Doesn't change for non-dynamic
+  size_t allocatedCapacity() const noexcept {
+    return capacity_;
+  }
+
   /// Returns the total number of calls to blockingWrite or successful
   /// calls to write, including those blockingWrite calls that are
   /// currently blocking
@@ -256,7 +773,8 @@ class MPMCQueue : boost::noncopyable {
   /// to a T constructor.
   template <typename ...Args>
   void blockingWrite(Args&&... args) noexcept {
-    enqueueWithTicket(pushTicket_++, std::forward<Args>(args)...);
+    enqueueWithTicketBase(pushTicket_++, slots_, capacity_, stride_,
+                          std::forward<Args>(args)...);
   }
 
   /// If an item can be enqueued with no blocking, does so and returns
@@ -275,9 +793,14 @@ class MPMCQueue : boost::noncopyable {
   template <typename ...Args>
   bool write(Args&&... args) noexcept {
     uint64_t ticket;
-    if (tryObtainReadyPushTicket(ticket)) {
+    Slot* slots;
+    size_t cap;
+    int stride;
+    if (static_cast<Derived<T,Atom,Dynamic>*>(this)->
+        tryObtainReadyPushTicket(ticket, slots, cap, stride)) {
       // we have pre-validated that the ticket won't block
-      enqueueWithTicket(ticket, std::forward<Args>(args)...);
+      enqueueWithTicketBase(ticket, slots, cap, stride,
+                            std::forward<Args>(args)...);
       return true;
     } else {
       return false;
@@ -288,11 +811,15 @@ class MPMCQueue : boost::noncopyable {
   bool tryWriteUntil(const std::chrono::time_point<Clock>& when,
                      Args&&... args) noexcept {
     uint64_t ticket;
-    if (tryObtainPromisedPushTicketUntil(ticket, when)) {
-      // we have pre-validated that the ticket won't block, or rather that
-      // it won't block longer than it takes another thread to dequeue an
-      // element from the slot it identifies.
-      enqueueWithTicket(ticket, std::forward<Args>(args)...);
+    Slot* slots;
+    size_t cap;
+    int stride;
+    if (tryObtainPromisedPushTicketUntil(ticket, slots, cap, stride, when)) {
+      // we have pre-validated that the ticket won't block, or rather that
+      // it won't block longer than it takes another thread to dequeue an
+      // element from the slot it identifies.
+      enqueueWithTicketBase(ticket, slots, cap, stride,
+                            std::forward<Args>(args)...);
       return true;
     } else {
       return false;
@@ -315,10 +842,15 @@ class MPMCQueue : boost::noncopyable {
   template <typename ...Args>
   bool writeIfNotFull(Args&&... args) noexcept {
     uint64_t ticket;
-    if (tryObtainPromisedPushTicket(ticket)) {
+    Slot* slots;
+    size_t cap;
+    int stride;
+    if (static_cast<Derived<T,Atom,Dynamic>*>(this)->
+        tryObtainPromisedPushTicket(ticket, slots, cap, stride)) {
       // some other thread is already dequeuing the slot into which we
       // are going to enqueue, but we might have to wait for them to finish
-      enqueueWithTicket(ticket, std::forward<Args>(args)...);
+      enqueueWithTicketBase(ticket, slots, cap, stride,
+                            std::forward<Args>(args)...);
       return true;
     } else {
       return false;
@@ -328,16 +860,33 @@ class MPMCQueue : boost::noncopyable {
   /// Moves a dequeued element onto elem, blocking until an element
   /// is available
   void blockingRead(T& elem) noexcept {
-    dequeueWithTicket(popTicket_++, elem);
+    uint64_t ticket;
+    static_cast<Derived<T,Atom,Dynamic>*>(this)->
+      blockingReadWithTicket(ticket, elem);
+  }
+
+  /// Same as blockingRead() but also records the ticket number
+  void blockingReadWithTicket(uint64_t& ticket, T& elem) noexcept {
+    ticket = popTicket_++;
+    dequeueWithTicketBase(ticket, slots_, capacity_, stride_, elem);
   }
 
   /// If an item can be dequeued with no blocking, does so and returns
   /// true, otherwise returns false.
   bool read(T& elem) noexcept {
     uint64_t ticket;
-    if (tryObtainReadyPopTicket(ticket)) {
+    return readAndGetTicket(ticket, elem);
+  }
+
+  /// Same as read() but also records the ticket number
+  bool readAndGetTicket(uint64_t& ticket, T& elem) noexcept {
+    Slot* slots;
+    size_t cap;
+    int stride;
+    if (static_cast<Derived<T,Atom,Dynamic>*>(this)->
+        tryObtainReadyPopTicket(ticket, slots, cap, stride)) {
       // the ticket has been pre-validated to not block
-      dequeueWithTicket(ticket, elem);
+      dequeueWithTicketBase(ticket, slots, cap, stride, elem);
       return true;
     } else {
       return false;
@@ -351,16 +900,20 @@ class MPMCQueue : boost::noncopyable {
   /// prefer read.
   bool readIfNotEmpty(T& elem) noexcept {
     uint64_t ticket;
-    if (tryObtainPromisedPopTicket(ticket)) {
+    Slot* slots;
+    size_t cap;
+    int stride;
+    if (static_cast<Derived<T,Atom,Dynamic>*>(this)->
+        tryObtainPromisedPopTicket(ticket, slots, cap, stride)) {
       // the matching enqueue already has a ticket, but might not be done
-      dequeueWithTicket(ticket, elem);
+      dequeueWithTicketBase(ticket, slots, cap, stride, elem);
       return true;
     } else {
       return false;
     }
   }
 
- private:
+ protected:
   enum {
     /// Once every kAdaptationFreq we will spin longer, to try to estimate
     /// the proper spin backoff
@@ -370,21 +923,41 @@ class MPMCQueue : boost::noncopyable {
     /// allocations, we pad it with this many SingleElementQueue-s at
     /// each end
     kSlotPadding = (detail::CacheLocality::kFalseSharingRange - 1)
-        / sizeof(detail::SingleElementQueue<T,Atom>) + 1
+        / sizeof(Slot) + 1
   };
 
   /// The maximum number of items in the queue at once
   size_t FOLLY_ALIGN_TO_AVOID_FALSE_SHARING capacity_;
 
-  /// An array of capacity_ SingleElementQueue-s, each of which holds
-  /// either 0 or 1 item.  We over-allocate by 2 * kSlotPadding and don't
-  /// touch the slots at either end, to avoid false sharing
-  detail::SingleElementQueue<T,Atom>* slots_;
+  /// Anonymous union for use when Dynamic = false and true, respectively
+  union {
+    /// An array of capacity_ SingleElementQueue-s, each of which holds
+    /// either 0 or 1 item.  We over-allocate by 2 * kSlotPadding and don't
+    /// touch the slots at either end, to avoid false sharing
+    Slot* slots_;
+    /// Current dynamic slots array of dcapacity_ SingleElementQueue-s
+    Atom<Slot*> dslots_;
+  };
+
+  /// Anonymous union for use when Dynamic = false and true, respectively
+  union {
+    /// The number of slots_ indices that we advance for each ticket, to
+    /// avoid false sharing.  Ideally slots_[i] and slots_[i + stride_]
+    /// aren't on the same cache line
+    int stride_;
+    /// Current stride
+    Atom<int> dstride_;
+  };
 
-  /// The number of slots_ indices that we advance for each ticket, to
-  /// avoid false sharing.  Ideally slots_[i] and slots_[i + stride_]
-  /// aren't on the same cache line
-  int stride_;
+  /// The following two members are used by dynamic MPMCQueue.
+  /// Ideally they should be in MPMCQueue<T,Atom,true>, but we get
+  /// better cache locality if they are in the same cache line as
+  /// dslots_ and dstride_.
+  ///
+  /// Dynamic state. A packed seqlock and ticket offset
+  Atom<uint64_t> dstate_;
+  /// Dynamic capacity
+  Atom<size_t> dcapacity_;
 
   /// Enqueuers get tickets from here
   Atom<uint64_t> FOLLY_ALIGN_TO_AVOID_FALSE_SHARING pushTicket_;
@@ -405,7 +978,6 @@ class MPMCQueue : boost::noncopyable {
   char padding_[detail::CacheLocality::kFalseSharingRange -
                 sizeof(Atom<uint32_t>)];
 
-
   /// We assign tickets in increasing order, but we don't want to
   /// access neighboring elements of slots_ because that will lead to
   /// false sharing (multiple cores accessing the same cache line even
@@ -446,23 +1018,29 @@ class MPMCQueue : boost::noncopyable {
 
   /// Returns the index into slots_ that should be used when enqueuing or
   /// dequeuing with the specified ticket
-  size_t idx(uint64_t ticket) noexcept {
-    return ((ticket * stride_) % capacity_) + kSlotPadding;
+  size_t idx(uint64_t ticket, size_t cap, int stride) noexcept {
+    return ((ticket * stride) % cap) + kSlotPadding;
   }
 
   /// Maps an enqueue or dequeue ticket to the turn that should be used
   /// at the corresponding SingleElementQueue
-  uint32_t turn(uint64_t ticket) noexcept {
-    return ticket / capacity_;
+  uint32_t turn(uint64_t ticket, size_t cap) noexcept {
+    return ticket / cap;
   }
 
   /// Tries to obtain a push ticket for which SingleElementQueue::enqueue
   /// won't block.  Returns true on immediate success, false on immediate
   /// failure.
-  bool tryObtainReadyPushTicket(uint64_t& rv) noexcept {
-    auto ticket = pushTicket_.load(std::memory_order_acquire); // A
+  bool tryObtainReadyPushTicket(
+    uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
+  ) noexcept {
+    ticket = pushTicket_.load(std::memory_order_acquire); // A
+    slots = slots_;
+    cap = capacity_;
+    stride = stride_;
     while (true) {
-      if (!slots_[idx(ticket)].mayEnqueue(turn(ticket))) {
+      if (!slots[idx(ticket, cap, stride)]
+          .mayEnqueue(turn(ticket, cap))) {
         // if we call enqueue(ticket, ...) on the SingleElementQueue
         // right now it would block, but this might no longer be the next
         // ticket.  We can increase the chance of tryEnqueue success under
@@ -479,7 +1057,6 @@ class MPMCQueue : boost::noncopyable {
       // or prev failing CAS) and the following CAS.  If the CAS fails
       // it will effect a load of pushTicket_
       if (pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
-        rv = ticket;
         return true;
       }
     }
@@ -492,18 +1069,22 @@ class MPMCQueue : boost::noncopyable {
   /// ticket is filled on success AND failure.
   template <class Clock>
   bool tryObtainPromisedPushTicketUntil(
-      uint64_t& ticket, const std::chrono::time_point<Clock>& when) noexcept {
+      uint64_t& ticket, Slot*& slots, size_t& cap, int& stride,
+      const std::chrono::time_point<Clock>& when
+  ) noexcept {
     bool deadlineReached = false;
     while (!deadlineReached) {
-      if (tryObtainPromisedPushTicket(ticket)) {
+      if (static_cast<Derived<T,Atom,Dynamic>*>(this)->
+          tryObtainPromisedPushTicket(ticket, slots, cap, stride)) {
         return true;
       }
       // ticket is a blocking ticket until the preceding ticket has been
      // processed: wait until this ticket's turn arrives. We have not reserved
       // this ticket so we will have to re-attempt to get a non-blocking ticket
       // if we wake up before we time-out.
-      deadlineReached = !slots_[idx(ticket)].tryWaitForEnqueueTurnUntil(
-          turn(ticket), pushSpinCutoff_, (ticket % kAdaptationFreq) == 0, when);
+      deadlineReached = !slots[idx(ticket, cap, stride)]
+        .tryWaitForEnqueueTurnUntil(turn(ticket, cap), pushSpinCutoff_,
+                                    (ticket % kAdaptationFreq) == 0, when);
     }
     return false;
   }
@@ -513,13 +1094,18 @@ class MPMCQueue : boost::noncopyable {
   /// blocking may be required when using the returned ticket if some
   /// other thread's pop is still in progress (ticket has been granted but
   /// pop has not yet completed).
-  bool tryObtainPromisedPushTicket(uint64_t& rv) noexcept {
+  bool tryObtainPromisedPushTicket(
+    uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
+  ) noexcept {
    auto numPushes = pushTicket_.load(std::memory_order_acquire); // A
+    slots = slots_;
+    cap = capacity_;
+    stride = stride_;
     while (true) {
       auto numPops = popTicket_.load(std::memory_order_acquire); // B
       // n will be negative if pops are pending
       int64_t n = numPushes - numPops;
-      rv = numPushes;
+      ticket = numPushes;
       if (n >= static_cast<ssize_t>(capacity_)) {
         // Full, linearize at B.  We don't need to recheck the read we
         // performed at A, because if numPushes was stale at B then the
@@ -535,10 +1121,16 @@ class MPMCQueue : boost::noncopyable {
   /// Tries to obtain a pop ticket for which SingleElementQueue::dequeue
   /// won't block.  Returns true on immediate success, false on immediate
   /// failure.
-  bool tryObtainReadyPopTicket(uint64_t& rv) noexcept {
-    auto ticket = popTicket_.load(std::memory_order_acquire);
+  bool tryObtainReadyPopTicket(
+    uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
+  ) noexcept {
+    ticket = popTicket_.load(std::memory_order_acquire);
+    slots = slots_;
+    cap = capacity_;
+    stride = stride_;
     while (true) {
-      if (!slots_[idx(ticket)].mayDequeue(turn(ticket))) {
+      if (!slots[idx(ticket, cap, stride)]
+          .mayDequeue(turn(ticket, cap))) {
         auto prev = ticket;
         ticket = popTicket_.load(std::memory_order_acquire);
         if (prev == ticket) {
@@ -546,7 +1138,6 @@ class MPMCQueue : boost::noncopyable {
         }
       } else {
         if (popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
-          rv = ticket;
           return true;
         }
       }
@@ -563,7 +1154,9 @@ class MPMCQueue : boost::noncopyable {
   /// to block waiting for someone to call enqueue, although we might
   /// have to block waiting for them to finish executing code inside the
   /// MPMCQueue itself.
-  bool tryObtainPromisedPopTicket(uint64_t& rv) noexcept {
+  bool tryObtainPromisedPopTicket(
+    uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
+  ) noexcept {
     auto numPops = popTicket_.load(std::memory_order_acquire); // A
     while (true) {
       auto numPushes = pushTicket_.load(std::memory_order_acquire); // B
@@ -574,7 +1167,10 @@ class MPMCQueue : boost::noncopyable {
         return false;
       }
       if (popTicket_.compare_exchange_strong(numPops, numPops + 1)) {
-        rv = numPops;
+        ticket = numPops;
+        slots = slots_;
+        cap = capacity_;
+        stride = stride_;
         return true;
       }
     }
@@ -582,25 +1178,35 @@ class MPMCQueue : boost::noncopyable {
 
   // Given a ticket, constructs an enqueued item using args
   template <typename ...Args>
+  void enqueueWithTicketBase(
+      uint64_t ticket, Slot* slots, size_t cap, int stride, Args&&... args
+  ) noexcept {
+    slots[idx(ticket, cap, stride)]
+        .enqueue(turn(ticket, cap),
+                 pushSpinCutoff_,
+                 (ticket % kAdaptationFreq) == 0,
+                 std::forward<Args>(args)...);
+  }
+
+  // To support tracking ticket numbers in MPMCPipelineStageImpl
+  template <typename ...Args>
   void enqueueWithTicket(uint64_t ticket, Args&&... args) noexcept {
-    slots_[idx(ticket)].enqueue(turn(ticket),
-                                pushSpinCutoff_,
-                                (ticket % kAdaptationFreq) == 0,
-                                std::forward<Args>(args)...);
+    enqueueWithTicketBase(ticket, slots_, capacity_, stride_,
+                          std::forward<Args>(args)...);
   }
 
   // Given a ticket, dequeues the corresponding element
-  void dequeueWithTicket(uint64_t ticket, T& elem) noexcept {
-    slots_[idx(ticket)].dequeue(turn(ticket),
-                                popSpinCutoff_,
-                                (ticket % kAdaptationFreq) == 0,
-                                elem);
+  void dequeueWithTicketBase(
+      uint64_t ticket, Slot* slots, size_t cap, int stride, T& elem
+  ) noexcept {
+    slots[idx(ticket, cap, stride)]
+        .dequeue(turn(ticket, cap),
+                 popSpinCutoff_,
+                 (ticket % kAdaptationFreq) == 0,
+                 elem);
   }
 };
 
-
-namespace detail {
-
 /// SingleElementQueue implements a blocking queue that holds at most one
 /// item, and that requires its users to assign incrementing identifiers
 /// (turns) to each enqueue and dequeue operation.  Note that the turns
diff --git a/folly/detail/MPMCPipelineDetail.h b/folly/detail/MPMCPipelineDetail.h
index aa2d07e4..9ef7a39a 100644
--- a/folly/detail/MPMCPipelineDetail.h
+++ b/folly/detail/MPMCPipelineDetail.h
@@ -76,8 +76,8 @@ class MPMCPipelineStageImpl {
   }
 
   uint64_t blockingRead(T& elem) noexcept {
-    uint64_t ticket = queue_.popTicket_++;
-    queue_.dequeueWithTicket(ticket, elem);
+    uint64_t ticket;
+    queue_.blockingReadWithTicket(ticket, elem);
     return ticket;
   }
 
@@ -87,12 +87,7 @@ class MPMCPipelineStageImpl {
 
   template
   bool readAndGetTicket(uint64_t& ticket, T& elem) noexcept {
-    if (queue_.tryObtainReadyPopTicket(ticket)) {
-      queue_.dequeueWithTicket(ticket, elem);
-      return true;
-    } else {
-      return false;
-    }
+    return queue_.readAndGetTicket(ticket, elem);
   }
 
   // See MPMCQueue::writeCount; only works for the first stage
diff --git a/folly/test/MPMCQueueTest.cpp b/folly/test/MPMCQueueTest.cpp
index 05ff5715..f4a195a4 100644
--- a/folly/test/MPMCQueueTest.cpp
+++ b/folly/test/MPMCQueueTest.cpp
@@ -101,9 +101,9 @@ TEST(MPMCQueue, sequencer_deterministic) {
   run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
 }
 
-template <typename T>
+template <typename T, bool Dynamic = false>
 void runElementTypeTest(T&& src) {
-  MPMCQueue<T> cq(10);
+  MPMCQueue<T, std::atomic, Dynamic> cq(10);
   cq.blockingWrite(std::forward<T>(src));
   T dest;
   cq.blockingRead(dest);
@@ -153,7 +153,21 @@ TEST(MPMCQueue, lots_of_element_types) {
   EXPECT_EQ(RefCounted::active_instances, 0);
 }
 
+TEST(MPMCQueue, lots_of_element_types_dynamic) {
+  runElementTypeTest<int, true>(10);
+  runElementTypeTest<string, true>(string("abc"));
+  runElementTypeTest<std::pair<int, string>, true>(
+      std::make_pair(10, string("def")));
+  runElementTypeTest<vector<string>, true>(vector<string>{{"abc"}});
+  runElementTypeTest<std::shared_ptr<char>, true>(std::make_shared<char>('a'));
+  runElementTypeTest<std::unique_ptr<char>, true>(
+      folly::make_unique<char>('a'));
+  runElementTypeTest<boost::intrusive_ptr<RefCounted>, true>(
+      boost::intrusive_ptr<RefCounted>(new RefCounted));
+  EXPECT_EQ(RefCounted::active_instances, 0);
+}
+
 TEST(MPMCQueue, single_thread_enqdeq) {
+  // Non-dynamic version only.
+  // False positive for dynamic version. Capacity can be temporarily
+  // higher than specified.
   MPMCQueue<int> cq(10);
 
   for (int pass = 0; pass < 10; ++pass) {
@@ -184,6 +198,9 @@ TEST(MPMCQueue, single_thread_enqdeq) {
 }
 
 TEST(MPMCQueue, tryenq_capacity_test) {
+  // Non-dynamic version only.
+  // False positive for dynamic version. Capacity can be temporarily
+  // higher than specified.
   for (size_t cap = 1; cap < 100; ++cap) {
     MPMCQueue<int> cq(cap);
     for (size_t i = 0; i < cap; ++i) {
@@ -194,6 +211,9 @@ TEST(MPMCQueue, tryenq_capacity_test) {
 }
 
 TEST(MPMCQueue, enq_capacity_test) {
+  // Non-dynamic version only.
+  // False positive for dynamic version. Capacity can be temporarily
+  // higher than specified.
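+  // (With Dynamic == true, closed arrays may still hold unconsumed
+  // elements, so writes can succeed past the nominal capacity; see
+  // the note in MPMCQueue.h.)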
   for (auto cap : { 1, 100, 10000 }) {
     MPMCQueue<int> cq(cap);
     for (int i = 0; i < cap; ++i) {
@@ -214,11 +234,11 @@ TEST(MPMCQueue, enq_capacity_test) {
   }
 }
 
-template <template<typename> class Atom>
+template <template<typename> class Atom, bool Dynamic = false>
 void runTryEnqDeqThread(
     int numThreads,
     int n, /*numOps*/
-    MPMCQueue<int, Atom>& cq,
+    MPMCQueue<int, Atom, Dynamic>& cq,
     std::atomic<uint64_t>& sum,
     int t) {
   uint64_t threadSum = 0;
@@ -241,18 +261,18 @@ void runTryEnqDeqThread(
   sum += threadSum;
 }
 
-template <template<typename> class Atom>
+template <template<typename> class Atom, bool Dynamic = false>
 void runTryEnqDeqTest(int numThreads, int numOps) {
   // write and read aren't linearizable, so we don't have
   // hard guarantees on their individual behavior.  We can still test
   // correctness in aggregate
-  MPMCQueue<int,Atom> cq(numThreads);
+  MPMCQueue<int,Atom,Dynamic> cq(numThreads);
 
   uint64_t n = numOps;
   vector<std::thread> threads(numThreads);
   std::atomic<uint64_t> sum(0);
   for (int t = 0; t < numThreads; ++t) {
-    threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom>,
+    threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom, Dynamic>,
           numThreads, n, std::ref(cq), std::ref(sum), t));
   }
   for (auto& t : threads) {
@@ -271,6 +291,15 @@ TEST(MPMCQueue, mt_try_enq_deq) {
   }
 }
 
+TEST(MPMCQueue, mt_try_enq_deq_dynamic) {
+  int nts[] = { 1, 3, 100 };
+
+  int n = 100000;
+  for (int nt : nts) {
+    runTryEnqDeqTest<std::atomic, true>(nt, n);
+  }
+}
+
 TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
   int nts[] = { 1, 3, 100 };
 
@@ -280,6 +309,15 @@ TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
   }
 }
 
+TEST(MPMCQueue, mt_try_enq_deq_emulated_futex_dynamic) {
+  int nts[] = { 1, 3, 100 };
+
+  int n = 100000;
+  for (int nt : nts) {
+    runTryEnqDeqTest<EmulatedFutexAtomic, true>(nt, n);
+  }
+}
+
 TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
   int nts[] = { 3, 10 };
 
@@ -296,6 +334,14 @@ TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
       DSched sched(DSched::uniformSubset(seed, 2));
       runTryEnqDeqTest<DeterministicAtomic>(nt, n);
     }
+    {
+      DSched sched(DSched::uniform(seed));
+      runTryEnqDeqTest<DeterministicAtomic, true>(nt, n);
+    }
+    {
+      DSched sched(DSched::uniformSubset(seed, 2));
+      runTryEnqDeqTest<DeterministicAtomic, true>(nt, n);
+    }
   }
 }
 
@@ -414,10 +460,11 @@ string producerConsumerBench(Q&& queue,
   long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
       (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
   uint64_t failures = failed;
+  size_t allocated = q.allocatedCapacity();
 
   return folly::sformat(
       "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
-      "handoff, {} failures",
+      "handoff, {} failures, {} allocated",
      qName,
      numProducers,
      writer.methodName(),
@@ -425,134 +472,254 @@ string producerConsumerBench(Q&& queue,
      nanosPer,
      csw,
      n,
-      failures);
+      failures,
+      allocated);
 }
 
-TEST(MPMCQueue, mt_prod_cons_deterministic) {
+template <bool Dynamic = false>
+void runMtProdConsDeterministic(long seed) {
   // we use the Bench method, but perf results are meaningless under DSched
-  DSched sched(DSched::uniform(0));
+  DSched sched(DSched::uniform(seed));
+
+  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic,
+    Dynamic>>>> callers;
+  callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
+    DeterministicAtomic, Dynamic>>>());
+  callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
+    DeterministicAtomic, Dynamic>>>());
+  callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
+    DeterministicAtomic, Dynamic>>>());
+  callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+    DeterministicAtomic, Dynamic>>>(milliseconds(1)));
+  callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+    DeterministicAtomic, Dynamic>>>(seconds(2)));
+  size_t cap;
+
+  for (const auto& caller : callers) {
+    cap = 10;
+    LOG(INFO) <<
+      producerConsumerBench(
+        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
+        "MPMCQueue<int, DeterministicAtomic, Dynamic>("
+          + folly::to<std::string>(cap) + ")",
+        1,
+        1,
+        1000,
+        *caller);
+    cap = 100;
+    LOG(INFO) <<
+      producerConsumerBench(
+        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
+        "MPMCQueue<int, DeterministicAtomic, Dynamic>("
+          + folly::to<std::string>(cap) + ")",
+        10,
+        10,
+        1000,
+        *caller);
+    cap = 10;
+    LOG(INFO) <<
+      producerConsumerBench(
+        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
+        "MPMCQueue<int, DeterministicAtomic, Dynamic>("
+          + folly::to<std::string>(cap) + ")",
+        1,
+        1,
+        1000,
+        *caller);
+    cap = 100;
+    LOG(INFO) <<
+      producerConsumerBench(
+        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
+        "MPMCQueue<int, DeterministicAtomic, Dynamic>("
+          + folly::to<std::string>(cap) + ")",
+        10,
+        10,
+        1000,
+        *caller);
+    cap = 1;
+    LOG(INFO) <<
+      producerConsumerBench(
+        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
+        "MPMCQueue<int, DeterministicAtomic, Dynamic>("
+          + folly::to<std::string>(cap) + ")",
+        10,
+        10,
+        1000,
+        *caller);
+  }
+}
 
-  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int,DeterministicAtomic>>>>
-      callers;
-  callers.emplace_back(
-      make_unique<BlockingWriteCaller<MPMCQueue<int,DeterministicAtomic>>>());
-  callers.emplace_back(
-      make_unique<WriteIfNotFullCaller<MPMCQueue<int,DeterministicAtomic>>>());
-  callers.emplace_back(
-      make_unique<WriteCaller<MPMCQueue<int,DeterministicAtomic>>>());
-  callers.emplace_back(
-      make_unique<TryWriteUntilCaller<MPMCQueue<int,DeterministicAtomic>>>(
-          milliseconds(1)));
-  callers.emplace_back(
-      make_unique<TryWriteUntilCaller<MPMCQueue<int,DeterministicAtomic>>>(
-          seconds(2)));
+void runMtProdConsDeterministicDynamic(
+  long seed,
+  uint32_t prods,
+  uint32_t cons,
+  uint32_t numOps,
+  size_t cap,
+  size_t minCap,
+  size_t mult
+) {
+  // we use the Bench method, but perf results are meaningless under DSched
+  DSched sched(DSched::uniform(seed));
+
+  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic,
+    true>>>> callers;
+  callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
+    DeterministicAtomic, true>>>());
+  callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
+    DeterministicAtomic, true>>>());
+  callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
+    DeterministicAtomic, true>>>());
+  callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+    DeterministicAtomic, true>>>(milliseconds(1)));
+  callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+    DeterministicAtomic, true>>>(seconds(2)));
 
   for (const auto& caller : callers) {
-    LOG(INFO)
-        << producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(10),
-                                 "MPMCQueue<int,DeterministicAtomic>(10)",
-                                 1,
-                                 1,
-                                 1000,
-                                 *caller);
-    LOG(INFO)
-        << producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(100),
-                                 "MPMCQueue<int,DeterministicAtomic>(100)",
-                                 10,
-                                 10,
-                                 1000,
-                                 *caller);
-    LOG(INFO)
-        << producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(10),
-                                 "MPMCQueue<int,DeterministicAtomic>(10)",
-                                 1,
-                                 1,
-                                 1000,
-                                 *caller);
-    LOG(INFO)
-        << producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(100),
-                                 "MPMCQueue<int,DeterministicAtomic>(100)",
-                                 10,
-                                 10,
-                                 1000,
-                                 *caller);
-    LOG(INFO) << producerConsumerBench(MPMCQueue<int,DeterministicAtomic>(1),
-                                       "MPMCQueue<int,DeterministicAtomic>(1)",
-                                       10,
-                                       10,
-                                       1000,
-                                       *caller);
+    LOG(INFO) <<
+      producerConsumerBench(
+        MPMCQueue<int, DeterministicAtomic, true>(cap, minCap, mult),
+        "MPMCQueue<int, DeterministicAtomic, true>("
+          + folly::to<std::string>(cap) + ", "
+          + folly::to<std::string>(minCap) + ", "
+          + folly::to<std::string>(mult) + ")",
+        prods,
+        cons,
+        numOps,
+        *caller);
   }
 }
 
+TEST(MPMCQueue, mt_prod_cons_deterministic) {
+  runMtProdConsDeterministic(0);
+}
+
+TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic) {
+  runMtProdConsDeterministic<true>(0);
+}
+
+template <typename T>
+void setFromEnv(T& var, const char* envvar) {
+  char* str = std::getenv(envvar);
+  if (str) { var = atoi(str); }
+}
+
+TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic_with_arguments) {
+  long seed = 0;
+  uint32_t prods = 10;
+  uint32_t cons = 10;
+  uint32_t numOps = 1000;
+  size_t cap = 10000;
+  size_t minCap = 9;
+  size_t mult = 3;
+  setFromEnv(seed, "SEED");
+  setFromEnv(prods, "PRODS");
+  setFromEnv(cons, "CONS");
+  setFromEnv(numOps, "NUM_OPS");
+  setFromEnv(cap, "CAP");
+  setFromEnv(minCap, "MIN_CAP");
+  setFromEnv(mult, "MULT");
+  runMtProdConsDeterministicDynamic(
+      seed, prods, cons, numOps, cap, minCap, mult);
+}
+
 #define PC_BENCH(q, np, nc, ...) \
   producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
 
-TEST(MPMCQueue, mt_prod_cons) {
+template <bool Dynamic = false>
+void runMtProdCons() {
   int n = 100000;
-  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int>>>> callers;
-  callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int>>>());
-  callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int>>>());
-  callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int>>>());
-  callers.emplace_back(
-      make_unique<TryWriteUntilCaller<MPMCQueue<int>>>(milliseconds(1)));
-  callers.emplace_back(
-      make_unique<TryWriteUntilCaller<MPMCQueue<int>>>(seconds(2)));
+  setFromEnv(n, "NUM_OPS");
+  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, std::atomic, Dynamic>>>>
+    callers;
+  callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int, std::atomic, Dynamic>>>());
+  callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int, std::atomic, Dynamic>>>());
+  callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int, std::atomic, Dynamic>>>());
+  callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int, std::atomic, Dynamic>>>(milliseconds(1)));
+  callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int, std::atomic, Dynamic>>>(seconds(2)));
 
   for (const auto& caller : callers) {
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 1, n, *caller);
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 1, n, *caller);
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 10, n, *caller);
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 10, n, *caller);
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 1, n, *caller);
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 1, n, *caller);
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 10, n, *caller);
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 10, n, *caller);
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
+                          1, 1, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
+                          10, 1, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
+                          1, 10, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
+                          10, 10, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
+                          1, 1, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
+                          10, 1, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
+                          1, 10, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
+                          10, 10, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(100000)),
+                          32, 100, n, *caller);
   }
 }
 
-TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
+TEST(MPMCQueue, mt_prod_cons) {
+  runMtProdCons();
+}
+
+TEST(MPMCQueue, mt_prod_cons_dynamic) {
+  runMtProdCons<true>();
+}
+
+template <bool Dynamic = false>
+void runMtProdConsEmulatedFutex() {
   int n = 100000;
-  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, EmulatedFutexAtomic>>>>
-      callers;
-  callers.emplace_back(
-      make_unique<BlockingWriteCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
-  callers.emplace_back(
-      make_unique<WriteIfNotFullCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
-  callers.emplace_back(
-      make_unique<WriteCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
-  callers.emplace_back(
-      make_unique<TryWriteUntilCaller<MPMCQueue<int, EmulatedFutexAtomic>>>(
-          milliseconds(1)));
-  callers.emplace_back(
-      make_unique<TryWriteUntilCaller<MPMCQueue<int, EmulatedFutexAtomic>>>(
-          seconds(2)));
+  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, EmulatedFutexAtomic, Dynamic>>>> callers;
+  callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int, EmulatedFutexAtomic, Dynamic>>>());
+  callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int, EmulatedFutexAtomic, Dynamic>>>());
+  callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int, EmulatedFutexAtomic, Dynamic>>>());
+  callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int, EmulatedFutexAtomic, Dynamic>>>(milliseconds(1)));
+  callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int, EmulatedFutexAtomic, Dynamic>>>(seconds(2)));
 
   for (const auto& caller : callers) {
     LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(10)), 1, 1, n, *caller);
+        (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 1, 1, n, *caller);
     LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(10)), 10, 1, n, *caller);
+        (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 10, 1, n, *caller);
     LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(10)), 1, 10, n, *caller);
+        (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 1, 10, n, *caller);
     LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(10)), 10, 10, n, *caller);
+        (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 10, 10, n, *caller);
     LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 1, 1, n, *caller);
+        (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 1, 1, n, *caller);
     LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 10, 1, n, *caller);
+        (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 10, 1, n, *caller);
     LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 1, 10, n, *caller);
-    LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 10, 10, n, *caller);
-    LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(100000)), 32, 100, n, *caller);
+        (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 1, 10, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, EmulatedFutexAtomic, Dynamic>
+                           (10000)),
+                          10, 10, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, EmulatedFutexAtomic, Dynamic>
+                           (100000)), 32, 100, n, *caller);
   }
 }
 
-template