Dynamic expansion of folly MPMC queue
authorMaged Michael <magedmichael@fb.com>
Thu, 8 Sep 2016 17:43:23 +0000 (10:43 -0700)
committerFacebook Github Bot 4 <facebook-github-bot-4-bot@fb.com>
Thu, 8 Sep 2016 17:53:27 +0000 (10:53 -0700)
Summary:
This diff allows queues to start with small capacity and expand as needed up to the specified capacity.
The main additions and changes:
- Extra template parameter `Dynamic` that enables dynamic expansion (`default 'false').
- `ClosedArray` type.
- Extra members:
  -- `dstate_`: a packed 64 bit unsigned int that contains a seqlock (which implicitly indicates the number of expansions and the lowest ticket for the current `dslots_/dcapacity_/dstride_` configuration.
 -- `dcapacity_`: current dynamic capacity.
 -- `dslots_`: current dynamic slots array. (in anonymous union with `slots_`)
 -- `dstride_`: current dynamic stride. (in anonymous union with `stride_`)
 -- `closed_` a logarithmic-sized array of ClosedArray to hold information about earlier smaller queue arrays for use by lagging consumers.

Design sketch:
- Reallocate a new larger array on expansion
- Expansion uses a seqlock. The common case critical path includes a seqlock read-only section.
- Lagging consumers and lagging blocking producers use a logarithmic-sized array for info about closed arrays
- Tickets are adjusted by an offset (to accounts for the tickets associated with earlier closed arrays) in order to calculate appropriate index and turn.
- The synching of `pushTicket_` with the ticket offset packed in `dstate_` is tricky. `pushTicket_` is accessed outside `dstate_`'s seqlock.

Reviewed By: djwatson

Differential Revision: D3462592

fbshipit-source-id: d442a7694190cca3c33753409ffac941d7463f83

folly/MPMCQueue.h
folly/detail/MPMCPipelineDetail.h
folly/test/MPMCQueueTest.cpp

index 08b2e1b8f7e31cd86b9881c30311fcb1fe08ed7f..2bf89c5301e41b4bde6d87a5108652db3f04e419 100644 (file)
@@ -38,6 +38,9 @@ struct SingleElementQueue;
 
 template <typename T> class MPMCPipelineStageImpl;
 
+/// MPMCQueue base CRTP template
+template <typename> class MPMCQueueBase;
+
 } // namespace detail
 
 /// MPMCQueue<T> is a high-performance bounded concurrent queue that
@@ -93,34 +96,536 @@ template <typename T> class MPMCPipelineStageImpl;
 /// are you can enqueue one sentinel and then have each consumer requeue
 /// two sentinels after it receives it (by requeuing 2 the shutdown can
 /// complete in O(log P) time instead of O(P)).
-template<typename T,
-         template<typename> class Atom = std::atomic>
-class MPMCQueue : boost::noncopyable {
+template<typename T, template<typename> class Atom = std::atomic,
+         bool Dynamic = false>
+class MPMCQueue : public detail::MPMCQueueBase<MPMCQueue<T,Atom,Dynamic>> {
+  friend class detail::MPMCPipelineStageImpl<T>;
+  using Slot = detail::SingleElementQueue<T,Atom>;
+ public:
+
+  explicit MPMCQueue(size_t queueCapacity)
+    : detail::MPMCQueueBase<MPMCQueue<T,Atom,Dynamic>>(queueCapacity)
+  {
+    this->stride_ = this->computeStride(queueCapacity);
+    this->slots_ = new Slot[queueCapacity + 2 * this->kSlotPadding];
+  }
+
+  MPMCQueue() noexcept { }
+};
+
+/// The dynamic version of MPMCQueue allows dynamic expansion of queue
+/// capacity, such that a queue may start with a smaller capacity than
+/// specified and expand only if needed. Users may optionally specify
+/// the initial capacity and the expansion multiplier.
+///
+/// The design uses a seqlock to enforce mutual exclusion among
+/// expansion attempts. Regular operations read up-to-date queue
+/// information (slots array, capacity, stride) inside read-only
+/// seqlock sections, which are unimpeded when no expansion is in
+/// progress.
+///
+/// An expansion computes a new capacity, allocates a new slots array,
+/// and updates stride. No information needs to be copied from the
+/// current slots array to the new one. When this happens, new slots
+/// will not have sequence numbers that match ticket numbers. The
+/// expansion needs to compute a ticket offset such that operations
+/// that use new arrays can adjust the calculations of slot indexes
+/// and sequence numbers that take into account that the new slots
+/// start with sequence numbers of zero. The current ticket offset is
+/// packed with the seqlock in an atomic 64-bit integer. The initial
+/// offset is zero.
+///
+/// Lagging write and read operations with tickets lower than the
+/// ticket offset of the current slots array (i.e., the minimum ticket
+/// number that can be served by the current array) must use earlier
+/// closed arrays instead of the current one. Information about closed
+/// slots arrays (array address, capacity, stride, and offset) is
+/// maintained in a logarithmic-sized structure. Each entry in that
+/// structure never need to be changed once set. The number of closed
+/// arrays is half the value of the seqlock (when unlocked).
+///
+/// The acquisition of the seqlock to perform an expansion does not
+/// prevent the issuing of new push and pop tickets concurrently. The
+/// expansion must set the new ticket offset to a value that couldn't
+/// have been issued to an operation that has already gone through a
+/// seqlock read-only section (and hence obtained information for
+/// older closed arrays).
+///
+/// Note that the total queue capacity can temporarily exceed the
+/// specified capacity when there are lagging consumers that haven't
+/// yet consumed all the elements in closed arrays. Users should not
+/// rely on the capacity of dynamic queues for synchronization, e.g.,
+/// they should not expect that a thread will definitely block on a
+/// call to blockingWrite() when the queue size is known to be equal
+/// to its capacity.
+///
+/// The dynamic version is a partial specialization of MPMCQueue with
+/// Dynamic == true
+template <typename T, template<typename> class Atom>
+class MPMCQueue<T,Atom,true> :
+      public detail::MPMCQueueBase<MPMCQueue<T,Atom,true>> {
+  friend class detail::MPMCQueueBase<MPMCQueue<T,Atom,true>>;
+  using Slot = detail::SingleElementQueue<T,Atom>;
+
+  struct ClosedArray {
+    uint64_t offset_ {0};
+    Slot* slots_ {nullptr};
+    size_t capacity_ {0};
+    int stride_ {0};
+  };
+
+ public:
+
+  explicit MPMCQueue(size_t queueCapacity)
+    : detail::MPMCQueueBase<MPMCQueue<T,Atom,true>>(queueCapacity)
+  {
+    size_t cap = std::min<size_t>(kDefaultMinDynamicCapacity, queueCapacity);
+    initQueue(cap, kDefaultExpansionMultiplier);
+  }
+
+  explicit MPMCQueue(size_t queueCapacity,
+                     size_t minCapacity,
+                     size_t expansionMultiplier)
+    : detail::MPMCQueueBase<MPMCQueue<T,Atom,true>>(queueCapacity)
+  {
+    minCapacity = std::max<size_t>(1, minCapacity);
+    size_t cap = std::min<size_t>(minCapacity, queueCapacity);
+    expansionMultiplier = std::max<size_t>(2, expansionMultiplier);
+    initQueue(cap, expansionMultiplier);
+  }
+
+  MPMCQueue() noexcept {
+    dmult_ = 0;
+    closed_ = nullptr;
+  }
+
+  MPMCQueue(MPMCQueue<T,Atom,true>&& rhs) noexcept {
+    this->capacity_ = rhs.capacity_;
+    this->slots_ = rhs.slots_;
+    this->stride_ = rhs.stride_;
+    this->dstate_.store(rhs.dstate_.load(std::memory_order_relaxed),
+                        std::memory_order_relaxed);
+    this->dcapacity_.store(rhs.dcapacity_.load(std::memory_order_relaxed),
+                           std::memory_order_relaxed);
+    this->pushTicket_.store(rhs.pushTicket_.load(std::memory_order_relaxed),
+                            std::memory_order_relaxed);
+    this->popTicket_.store(rhs.popTicket_.load(std::memory_order_relaxed),
+                           std::memory_order_relaxed);
+    this->pushSpinCutoff_.store(
+      rhs.pushSpinCutoff_.load(std::memory_order_relaxed),
+      std::memory_order_relaxed);
+    this->popSpinCutoff_.store(
+      rhs.popSpinCutoff_.load(std::memory_order_relaxed),
+      std::memory_order_relaxed);
+    dmult_ = rhs.dmult_;
+    closed_ = rhs.closed_;
+
+    rhs.capacity_ = 0;
+    rhs.slots_ = nullptr;
+    rhs.stride_ = 0;
+    rhs.dstate_.store(0, std::memory_order_relaxed);
+    rhs.dcapacity_.store(0, std::memory_order_relaxed);
+    rhs.pushTicket_.store(0, std::memory_order_relaxed);
+    rhs.popTicket_.store(0, std::memory_order_relaxed);
+    rhs.pushSpinCutoff_.store(0, std::memory_order_relaxed);
+    rhs.popSpinCutoff_.store(0, std::memory_order_relaxed);
+    rhs.dmult_ = 0;
+    rhs.closed_ = nullptr;
+  }
+
+  MPMCQueue<T,Atom, true> const& operator= (MPMCQueue<T,Atom, true>&& rhs) {
+    if (this != &rhs) {
+      this->~MPMCQueue();
+      new (this) MPMCQueue(std::move(rhs));
+    }
+    return *this;
+  }
+
+  ~MPMCQueue() {
+    if (closed_ != nullptr) {
+      for (int i = getNumClosed(this->dstate_.load()) - 1; i >= 0; --i) {
+        delete[] closed_[i].slots_;
+      }
+      delete[] closed_;
+    }
+  }
+
+  size_t allocatedCapacity() const noexcept {
+    return this->dcapacity_.load(std::memory_order_relaxed);
+  }
+
+  template <typename ...Args>
+  void blockingWrite(Args&&... args) noexcept {
+    uint64_t ticket = this->pushTicket_++;
+    Slot* slots;
+    size_t cap;
+    int stride;
+    uint64_t state;
+    uint64_t offset;
+    do {
+      if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        continue;
+      }
+      offset = getOffset(state);
+      if (ticket < offset) {
+        // There was an expansion after this ticket was issued.
+        updateFromClosed(state, ticket, offset, slots, cap, stride);
+        break;
+      }
+      if (slots[this->idx((ticket-offset), cap, stride)]
+          .mayEnqueue(this->turn(ticket-offset, cap))) {
+        // A slot is ready. No need to expand.
+        break;
+      } else if (this->popTicket_.load(std::memory_order_relaxed) + cap
+                 > ticket) {
+        // May block, but a pop is in progress. No need to expand.
+        // Get seqlock read section info again in case an expansion
+        // occurred with an equal or higher ticket.
+        continue;
+      } else {
+        // May block. See if we can expand.
+        if (tryExpand(state, cap)) {
+          // This or another thread started an expansion. Get updated info.
+          continue;
+        } else {
+          // Can't expand.
+          break;
+        }
+      }
+    } while (true);
+    this->enqueueWithTicketBase(ticket-offset, slots, cap, stride,
+                                std::forward<Args>(args)...);
+  }
+
+  void blockingReadWithTicket(uint64_t& ticket, T& elem) noexcept {
+    ticket = this->popTicket_++;
+    Slot* slots;
+    size_t cap;
+    int stride;
+    uint64_t state;
+    uint64_t offset;
+    while (!trySeqlockReadSection(state, slots, cap, stride));
+    offset = getOffset(state);
+    if (ticket < offset) {
+      // There was an expansion after the corresponding push ticket
+      // was issued.
+      updateFromClosed(state, ticket, offset, slots, cap, stride);
+    }
+    this->dequeueWithTicketBase(ticket-offset, slots, cap, stride, elem);
+  }
+
+ private:
+
+  enum {
+    kSeqlockBits = 6,
+    kDefaultMinDynamicCapacity = 10,
+    kDefaultExpansionMultiplier = 10,
+  };
+
+  size_t dmult_;
+
+  //  Info about closed slots arrays for use by lagging operations
+  ClosedArray* closed_;
+
+  void initQueue(const size_t cap, const size_t mult) {
+    this->stride_ = this->computeStride(cap);
+    this->slots_ = new Slot[cap + 2 * this->kSlotPadding];
+    this->dstate_.store(0);
+    this->dcapacity_.store(cap);
+    dmult_ = mult;
+    size_t maxClosed = 0;
+    for (size_t expanded = cap;
+         expanded < this->capacity_;
+         expanded *= mult) {
+      ++maxClosed;
+    }
+    closed_ = (maxClosed > 0) ? new ClosedArray[maxClosed] : nullptr;
+  }
+
+  bool tryObtainReadyPushTicket(
+      uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
+  ) noexcept {
+    uint64_t state;
+    do {
+      ticket = this->pushTicket_.load(std::memory_order_acquire); // A
+      if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        continue;
+      }
+      uint64_t offset = getOffset(state);
+      if (ticket < offset) {
+        // There was an expansion with offset greater than this ticket
+        updateFromClosed(state, ticket, offset, slots, cap, stride);
+      }
+      if (slots[this->idx((ticket-offset), cap, stride)]
+          .mayEnqueue(this->turn(ticket-offset, cap))) {
+        // A slot is ready.
+        if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
+          // Adjust ticket
+          ticket -= offset;
+          return true;
+        } else {
+          continue;
+        }
+      } else {
+        if (ticket != this->pushTicket_.load(std::memory_order_relaxed)) { // B
+          // Try again. Ticket changed.
+          continue;
+        }
+        // Likely to block.
+        // Try to expand unless the ticket is for a closed array
+        if (offset == getOffset(state)) {
+          if (tryExpand(state, cap)) {
+            // This or another thread started an expansion. Get up-to-date info.
+            continue;
+          }
+        }
+        return false;
+      }
+    } while (true);
+  }
+
+  bool tryObtainPromisedPushTicket(
+    uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
+  ) noexcept {
+    uint64_t state;
+    do {
+      ticket = this->pushTicket_.load(std::memory_order_acquire);
+      auto numPops = this->popTicket_.load(std::memory_order_acquire);
+      if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        continue;
+      }
+      int64_t n = ticket - numPops;
+      if (n >= static_cast<ssize_t>(this->capacity_)) {
+        return false;
+      }
+      if ((n >= static_cast<ssize_t>(cap))) {
+        if (tryExpand(state, cap)) {
+          // This or another thread started an expansion. Start over
+          // with a new state.
+          continue;
+        } else {
+          // Can't expand.
+          return false;
+        }
+      }
+      uint64_t offset = getOffset(state);
+      if (ticket < offset) {
+        // There was an expansion with offset greater than this ticket
+        updateFromClosed(state, ticket, offset, slots, cap, stride);
+      }
+      if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
+        // Adjust ticket
+        ticket -= offset;
+        return true;
+      }
+    } while (true);
+  }
+
+  bool tryObtainReadyPopTicket(
+    uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
+  ) noexcept {
+    uint64_t state;
+    do {
+      ticket = this->popTicket_.load(std::memory_order_relaxed);
+      if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        continue;
+      }
+      uint64_t offset = getOffset(state);
+      if (ticket < offset) {
+        // There was an expansion after the corresponding push ticket
+        // was issued.
+        updateFromClosed(state, ticket, offset, slots, cap, stride);
+      }
+      if (slots[this->idx((ticket-offset), cap, stride)]
+          .mayDequeue(this->turn(ticket-offset, cap))) {
+        if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
+          // Adjust ticket
+          ticket -= offset;
+          return true;
+        }
+      } else {
+        return false;
+      }
+    } while (true);
+  }
+
+  bool tryObtainPromisedPopTicket(
+    uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
+  ) noexcept {
+    uint64_t state;
+    do {
+      ticket = this->popTicket_.load(std::memory_order_acquire);
+      auto numPushes = this->pushTicket_.load(std::memory_order_acquire);
+      if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        continue;
+      }
+      if (ticket >= numPushes) {
+        return false;
+      }
+      if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
+        // Adjust ticket
+        uint64_t offset = getOffset(state);
+        if (ticket < offset) {
+          // There was an expansion after the corresponding push
+          // ticket was issued.
+          updateFromClosed(state, ticket, offset, slots, cap, stride);
+        }
+        // Adjust ticket
+        ticket -= offset;
+        return true;
+      }
+    } while (true);
+  }
+
+  /// Enqueues an element with a specific ticket number
+  template <typename ...Args>
+  void enqueueWithTicket(const uint64_t ticket, Args&&... args) noexcept {
+    Slot* slots;
+    size_t cap;
+    int stride;
+    uint64_t state;
+    uint64_t offset;
+    while (!trySeqlockReadSection(state, slots, cap, stride)) {}
+    offset = getOffset(state);
+    if (ticket < offset) {
+      // There was an expansion after this ticket was issued.
+      updateFromClosed(state, ticket, offset, slots, cap, stride);
+    }
+    this->enqueueWithTicketBase(ticket-offset, slots, cap, stride,
+                                std::forward<Args>(args)...);
+  }
+
+  uint64_t getOffset(const uint64_t state) const noexcept {
+    return state >> kSeqlockBits;
+  }
+
+  int getNumClosed(const uint64_t state) const noexcept {
+    return (state & ((1 << kSeqlockBits) - 1)) >> 1;
+  }
+
+  /// Try to expand the queue. Returns true if this expansion was
+  /// successful or a concurent expansion is in progress. Returns
+  /// false if the queue has reached its maximum capacity or
+  /// allocation has failed.
+  bool tryExpand(const uint64_t state, const size_t cap) noexcept {
+    if (cap == this->capacity_) {
+      return false;
+    }
+    // Acquire seqlock
+    uint64_t oldval = state;
+    assert((state & 1) == 0);
+    if (this->dstate_.compare_exchange_strong(oldval, state + 1)) {
+      assert(cap == this->dcapacity_.load());
+      uint64_t ticket = 1 + std::max(this->pushTicket_.load(),
+                                     this->popTicket_.load());
+      size_t newCapacity =
+        std::min(dmult_ * cap, this->capacity_);
+      Slot* newSlots =
+        new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding];
+      if (newSlots == nullptr) {
+        // Expansion failed. Restore the seqlock
+        this->dstate_.store(state);
+        return false;
+      }
+      // Successful expansion
+      // calculate the current ticket offset
+      uint64_t offset = getOffset(state);
+      // calculate index in closed array
+      int index = getNumClosed(state);
+      assert((index << 1) < (1 << kSeqlockBits));
+      // fill the info for the closed slots array
+      closed_[index].offset_ = offset;
+      closed_[index].slots_ = this->dslots_.load();
+      closed_[index].capacity_ = cap;
+      closed_[index].stride_ = this->dstride_.load();
+      // update the new slots array info
+      this->dslots_.store(newSlots);
+      this->dcapacity_.store(newCapacity);
+      this->dstride_.store(this->computeStride(newCapacity));
+      // Release the seqlock and record the new ticket offset
+      this->dstate_.store((ticket << kSeqlockBits) + (2 * (index + 1)));
+      return true;
+    } else { // failed to acquire seqlock
+      // Someone acaquired the seqlock. Go back to the caller and get
+      // up-to-date info.
+      return true;
+    }
+  }
+
+  /// Seqlock read-only section
+  bool trySeqlockReadSection(
+    uint64_t& state, Slot*& slots, size_t& cap, int& stride
+  ) noexcept {
+    state = this->dstate_.load(std::memory_order_acquire);
+    if (state & 1) {
+      // Locked.
+      return false;
+    }
+    // Start read-only section.
+    slots = this->dslots_.load(std::memory_order_relaxed);
+    cap = this->dcapacity_.load(std::memory_order_relaxed);
+    stride = this->dstride_.load(std::memory_order_relaxed);
+    // End of read-only section. Validate seqlock.
+    std::atomic_thread_fence(std::memory_order_acquire);
+    return (state == this->dstate_.load(std::memory_order_relaxed));
+  }
+
+  /// Update local variables of a lagging operation using the
+  /// most recent closed array with offset <= ticket
+  void updateFromClosed(
+    const uint64_t state, const uint64_t ticket,
+    uint64_t& offset, Slot*& slots, size_t& cap, int& stride
+  ) noexcept {
+    for (int i = getNumClosed(state) - 1; i >= 0; --i) {
+      offset = closed_[i].offset_;
+      if (offset <= ticket) {
+        slots = closed_[i].slots_;
+        cap = closed_[i].capacity_;
+        stride = closed_[i].stride_;
+        return;;
+      }
+    }
+    // A closed array with offset <= ticket should have been found
+    assert(false);
+  }
+};
+
+namespace detail {
+
+/// CRTP specialization of MPMCQueueBase
+template<
+  template<
+    typename T, template<typename> class Atom, bool Dynamic> class Derived,
+  typename T, template<typename> class Atom, bool Dynamic>
+class MPMCQueueBase<Derived<T, Atom, Dynamic>> : boost::noncopyable {
+
+// Note: Using CRTP static casts in several functions of this base
+// template instead of making called functions virtual or duplicating
+// the code of calling functions in the derived partially specialized
+// template
 
   static_assert(std::is_nothrow_constructible<T,T&&>::value ||
                 folly::IsRelocatable<T>::value,
       "T must be relocatable or have a noexcept move constructor");
 
-  friend class detail::MPMCPipelineStageImpl<T>;
  public:
   typedef T value_type;
 
-  explicit MPMCQueue(size_t queueCapacity)
+  using Slot = detail::SingleElementQueue<T,Atom>;
+
+  explicit MPMCQueueBase(size_t queueCapacity)
     : capacity_(queueCapacity)
     , pushTicket_(0)
     , popTicket_(0)
     , pushSpinCutoff_(0)
     , popSpinCutoff_(0)
   {
-    if (queueCapacity == 0)
+    if (queueCapacity == 0) {
       throw std::invalid_argument(
         "MPMCQueue with explicit capacity 0 is impossible"
+        // Stride computation in derived classes would sigfpe if capacity is 0
       );
-
-    // would sigfpe if capacity is 0
-    stride_ = computeStride(queueCapacity);
-    slots_ = new detail::SingleElementQueue<T,Atom>[queueCapacity +
-                                                    2 * kSlotPadding];
+    }
 
     // ideally this would be a static assert, but g++ doesn't allow it
     assert(alignof(MPMCQueue<T,Atom>)
@@ -132,10 +637,12 @@ class MPMCQueue : boost::noncopyable {
 
   /// A default-constructed queue is useful because a usable (non-zero
   /// capacity) queue can be moved onto it or swapped with it
-  MPMCQueue() noexcept
+  MPMCQueueBase() noexcept
     : capacity_(0)
     , slots_(nullptr)
     , stride_(0)
+    , dstate_(0)
+    , dcapacity_(0)
     , pushTicket_(0)
     , popTicket_(0)
     , pushSpinCutoff_(0)
@@ -145,10 +652,12 @@ class MPMCQueue : boost::noncopyable {
   /// IMPORTANT: The move constructor is here to make it easier to perform
   /// the initialization phase, it is not safe to use when there are any
   /// concurrent accesses (this is not checked).
-  MPMCQueue(MPMCQueue<T,Atom>&& rhs) noexcept
+  MPMCQueueBase(MPMCQueueBase<Derived<T,Atom,Dynamic>>&& rhs) noexcept
     : capacity_(rhs.capacity_)
     , slots_(rhs.slots_)
     , stride_(rhs.stride_)
+    , dstate_(rhs.dstate_.load(std::memory_order_relaxed))
+    , dcapacity_(rhs.dcapacity_.load(std::memory_order_relaxed))
     , pushTicket_(rhs.pushTicket_.load(std::memory_order_relaxed))
     , popTicket_(rhs.popTicket_.load(std::memory_order_relaxed))
     , pushSpinCutoff_(rhs.pushSpinCutoff_.load(std::memory_order_relaxed))
@@ -161,6 +670,8 @@ class MPMCQueue : boost::noncopyable {
     rhs.capacity_ = 0;
     rhs.slots_ = nullptr;
     rhs.stride_ = 0;
+    rhs.dstate_.store(0, std::memory_order_relaxed);
+    rhs.dcapacity_.store(0, std::memory_order_relaxed);
     rhs.pushTicket_.store(0, std::memory_order_relaxed);
     rhs.popTicket_.store(0, std::memory_order_relaxed);
     rhs.pushSpinCutoff_.store(0, std::memory_order_relaxed);
@@ -170,17 +681,18 @@ class MPMCQueue : boost::noncopyable {
   /// IMPORTANT: The move operator is here to make it easier to perform
   /// the initialization phase, it is not safe to use when there are any
   /// concurrent accesses (this is not checked).
-  MPMCQueue<T,Atom> const& operator= (MPMCQueue<T,Atom>&& rhs) {
+  MPMCQueueBase<Derived<T,Atom,Dynamic>> const& operator=
+    (MPMCQueueBase<Derived<T,Atom,Dynamic>>&& rhs) {
     if (this != &rhs) {
-      this->~MPMCQueue();
-      new (this) MPMCQueue(std::move(rhs));
+      this->~MPMCQueueBase();
+      new (this) MPMCQueueBase(std::move(rhs));
     }
     return *this;
   }
 
   /// MPMCQueue can only be safely destroyed when there are no
   /// pending enqueuers or dequeuers (this is not checked).
-  ~MPMCQueue() {
+  ~MPMCQueueBase() {
     delete[] slots_;
   }
 
@@ -235,6 +747,11 @@ class MPMCQueue : boost::noncopyable {
     return capacity_;
   }
 
+  /// Doesn't change for non-dynamic
+  size_t allocatedCapacity() const noexcept {
+    return capacity_;
+  }
+
   /// Returns the total number of calls to blockingWrite or successful
   /// calls to write, including those blockingWrite calls that are
   /// currently blocking
@@ -256,7 +773,8 @@ class MPMCQueue : boost::noncopyable {
   /// to a T constructor.
   template <typename ...Args>
   void blockingWrite(Args&&... args) noexcept {
-    enqueueWithTicket(pushTicket_++, std::forward<Args>(args)...);
+    enqueueWithTicketBase(pushTicket_++, slots_, capacity_, stride_,
+                          std::forward<Args>(args)...);
   }
 
   /// If an item can be enqueued with no blocking, does so and returns
@@ -275,9 +793,14 @@ class MPMCQueue : boost::noncopyable {
   template <typename ...Args>
   bool write(Args&&... args) noexcept {
     uint64_t ticket;
-    if (tryObtainReadyPushTicket(ticket)) {
+    Slot* slots;
+    size_t cap;
+    int stride;
+    if (static_cast<Derived<T,Atom,Dynamic>*>(this)->
+        tryObtainReadyPushTicket(ticket, slots, cap, stride)) {
       // we have pre-validated that the ticket won't block
-      enqueueWithTicket(ticket, std::forward<Args>(args)...);
+      enqueueWithTicketBase(ticket, slots, cap, stride,
+                            std::forward<Args>(args)...);
       return true;
     } else {
       return false;
@@ -288,11 +811,15 @@ class MPMCQueue : boost::noncopyable {
   bool tryWriteUntil(const std::chrono::time_point<Clock>& when,
                      Args&&... args) noexcept {
     uint64_t ticket;
-    if (tryObtainPromisedPushTicketUntil(ticket, when)) {
-      // we have pre-validated that the ticket won't block, or rather that
-      // it won't block longer than it takes another thread to dequeue an
-      // element from the slot it identifies.
-      enqueueWithTicket(ticket, std::forward<Args>(args)...);
+    Slot* slots;
+    size_t cap;
+    int stride;
+    if (tryObtainPromisedPushTicketUntil(ticket, slots, cap, stride, when)) {
+        // we have pre-validated that the ticket won't block, or rather that
+        // it won't block longer than it takes another thread to dequeue an
+        // element from the slot it identifies.
+      enqueueWithTicketBase(ticket, slots, cap, stride,
+                            std::forward<Args>(args)...);
       return true;
     } else {
       return false;
@@ -315,10 +842,15 @@ class MPMCQueue : boost::noncopyable {
   template <typename ...Args>
   bool writeIfNotFull(Args&&... args) noexcept {
     uint64_t ticket;
-    if (tryObtainPromisedPushTicket(ticket)) {
+    Slot* slots;
+    size_t cap;
+    int stride;
+    if (static_cast<Derived<T,Atom,Dynamic>*>(this)->
+        tryObtainPromisedPushTicket(ticket, slots, cap, stride)) {
       // some other thread is already dequeuing the slot into which we
       // are going to enqueue, but we might have to wait for them to finish
-      enqueueWithTicket(ticket, std::forward<Args>(args)...);
+      enqueueWithTicketBase(ticket, slots, cap, stride,
+                            std::forward<Args>(args)...);
       return true;
     } else {
       return false;
@@ -328,16 +860,33 @@ class MPMCQueue : boost::noncopyable {
   /// Moves a dequeued element onto elem, blocking until an element
   /// is available
   void blockingRead(T& elem) noexcept {
-    dequeueWithTicket(popTicket_++, elem);
+    uint64_t ticket;
+    static_cast<Derived<T,Atom,Dynamic>*>(this)->
+      blockingReadWithTicket(ticket, elem);
+  }
+
+  /// Same as blockingRead() but also records the ticket nunmer
+  void blockingReadWithTicket(uint64_t& ticket, T& elem) noexcept {
+    ticket = popTicket_++;
+    dequeueWithTicketBase(ticket, slots_, capacity_, stride_, elem);
   }
 
   /// If an item can be dequeued with no blocking, does so and returns
   /// true, otherwise returns false.
   bool read(T& elem) noexcept {
     uint64_t ticket;
-    if (tryObtainReadyPopTicket(ticket)) {
+    return readAndGetTicket(ticket, elem);
+  }
+
+  /// Same as read() but also records the ticket nunmer
+  bool readAndGetTicket(uint64_t& ticket, T& elem) noexcept {
+    Slot* slots;
+    size_t cap;
+    int stride;
+    if (static_cast<Derived<T,Atom,Dynamic>*>(this)->
+        tryObtainReadyPopTicket(ticket, slots, cap, stride)) {
       // the ticket has been pre-validated to not block
-      dequeueWithTicket(ticket, elem);
+      dequeueWithTicketBase(ticket, slots, cap, stride, elem);
       return true;
     } else {
       return false;
@@ -351,16 +900,20 @@ class MPMCQueue : boost::noncopyable {
   /// prefer read.
   bool readIfNotEmpty(T& elem) noexcept {
     uint64_t ticket;
-    if (tryObtainPromisedPopTicket(ticket)) {
+    Slot* slots;
+    size_t cap;
+    int stride;
+    if (static_cast<Derived<T,Atom,Dynamic>*>(this)->
+        tryObtainPromisedPopTicket(ticket, slots, cap, stride)) {
       // the matching enqueue already has a ticket, but might not be done
-      dequeueWithTicket(ticket, elem);
+      dequeueWithTicketBase(ticket, slots, cap, stride, elem);
       return true;
     } else {
       return false;
     }
   }
 
- private:
+ protected:
   enum {
     /// Once every kAdaptationFreq we will spin longer, to try to estimate
     /// the proper spin backoff
@@ -370,21 +923,41 @@ class MPMCQueue : boost::noncopyable {
     /// allocations, we pad it with this many SingleElementQueue-s at
     /// each end
     kSlotPadding = (detail::CacheLocality::kFalseSharingRange - 1)
-        / sizeof(detail::SingleElementQueue<T,Atom>) + 1
+        / sizeof(Slot) + 1
   };
 
   /// The maximum number of items in the queue at once
   size_t FOLLY_ALIGN_TO_AVOID_FALSE_SHARING capacity_;
 
-  /// An array of capacity_ SingleElementQueue-s, each of which holds
-  /// either 0 or 1 item.  We over-allocate by 2 * kSlotPadding and don't
-  /// touch the slots at either end, to avoid false sharing
-  detail::SingleElementQueue<T,Atom>* slots_;
+  /// Anonymous union for use when Dynamic = false and true, respectively
+  union {
+    /// An array of capacity_ SingleElementQueue-s, each of which holds
+    /// either 0 or 1 item.  We over-allocate by 2 * kSlotPadding and don't
+    /// touch the slots at either end, to avoid false sharing
+    Slot* slots_;
+    /// Current dynamic slots array of dcapacity_ SingleElementQueue-s
+    Atom<Slot*> dslots_;
+  };
+
+  /// Anonymous union for use when Dynamic = false and true, respectively
+  union {
+    /// The number of slots_ indices that we advance for each ticket, to
+    /// avoid false sharing.  Ideally slots_[i] and slots_[i + stride_]
+    /// aren't on the same cache line
+    int stride_;
+    /// Current stride
+    Atom<int> dstride_;
+  };
 
-  /// The number of slots_ indices that we advance for each ticket, to
-  /// avoid false sharing.  Ideally slots_[i] and slots_[i + stride_]
-  /// aren't on the same cache line
-  int stride_;
+  /// The following two memebers are used by dynamic MPMCQueue.
+  /// Ideally they should be in MPMCQueue<T,Atom,true>, but we get
+  /// better cache locality if they are in the same cache line as
+  /// dslots_ and dstride_.
+  ///
+  /// Dynamic state. A packed seqlock and ticket offset
+  Atom<uint64_t> dstate_;
+  /// Dynamic capacity
+  Atom<size_t> dcapacity_;
 
   /// Enqueuers get tickets from here
   Atom<uint64_t> FOLLY_ALIGN_TO_AVOID_FALSE_SHARING pushTicket_;
@@ -405,7 +978,6 @@ class MPMCQueue : boost::noncopyable {
   char padding_[detail::CacheLocality::kFalseSharingRange -
                 sizeof(Atom<uint32_t>)];
 
-
   /// We assign tickets in increasing order, but we don't want to
   /// access neighboring elements of slots_ because that will lead to
   /// false sharing (multiple cores accessing the same cache line even
@@ -446,23 +1018,29 @@ class MPMCQueue : boost::noncopyable {
 
   /// Returns the index into slots_ that should be used when enqueuing or
   /// dequeuing with the specified ticket
-  size_t idx(uint64_t ticket) noexcept {
-    return ((ticket * stride_) % capacity_) + kSlotPadding;
+  size_t idx(uint64_t ticket, size_t cap, int stride) noexcept {
+    return ((ticket * stride) % cap) + kSlotPadding;
   }
 
   /// Maps an enqueue or dequeue ticket to the turn should be used at the
   /// corresponding SingleElementQueue
-  uint32_t turn(uint64_t ticket) noexcept {
-    return ticket / capacity_;
+  uint32_t turn(uint64_t ticket, size_t cap) noexcept {
+    return ticket / cap;
   }
 
   /// Tries to obtain a push ticket for which SingleElementQueue::enqueue
   /// won't block.  Returns true on immediate success, false on immediate
   /// failure.
-  bool tryObtainReadyPushTicket(uint64_t& rv) noexcept {
-    auto ticket = pushTicket_.load(std::memory_order_acquire); // A
+  bool tryObtainReadyPushTicket(
+    uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
+  ) noexcept {
+    ticket = pushTicket_.load(std::memory_order_acquire); // A
+    slots = slots_;
+    cap = capacity_;
+    stride = stride_;
     while (true) {
-      if (!slots_[idx(ticket)].mayEnqueue(turn(ticket))) {
+      if (!slots[idx(ticket, cap, stride)]
+          .mayEnqueue(turn(ticket, cap))) {
         // if we call enqueue(ticket, ...) on the SingleElementQueue
         // right now it would block, but this might no longer be the next
         // ticket.  We can increase the chance of tryEnqueue success under
@@ -479,7 +1057,6 @@ class MPMCQueue : boost::noncopyable {
         // or prev failing CAS) and the following CAS.  If the CAS fails
         // it will effect a load of pushTicket_
         if (pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
-          rv = ticket;
           return true;
         }
       }
@@ -492,18 +1069,22 @@ class MPMCQueue : boost::noncopyable {
   /// ticket is filled on success AND failure.
   template <class Clock>
   bool tryObtainPromisedPushTicketUntil(
-      uint64_t& ticket, const std::chrono::time_point<Clock>& when) noexcept {
+    uint64_t& ticket, Slot*& slots, size_t& cap, int& stride,
+    const std::chrono::time_point<Clock>& when
+  ) noexcept {
     bool deadlineReached = false;
     while (!deadlineReached) {
-      if (tryObtainPromisedPushTicket(ticket)) {
+      if (static_cast<Derived<T,Atom,Dynamic>*>(this)->
+          tryObtainPromisedPushTicket(ticket, slots, cap, stride)) {
         return true;
       }
       // ticket is a blocking ticket until the preceding ticket has been
       // processed: wait until this ticket's turn arrives. We have not reserved
       // this ticket so we will have to re-attempt to get a non-blocking ticket
       // if we wake up before we time-out.
-      deadlineReached = !slots_[idx(ticket)].tryWaitForEnqueueTurnUntil(
-          turn(ticket), pushSpinCutoff_, (ticket % kAdaptationFreq) == 0, when);
+      deadlineReached = !slots[idx(ticket, cap, stride)]
+        .tryWaitForEnqueueTurnUntil(turn(ticket, cap), pushSpinCutoff_,
+                                    (ticket % kAdaptationFreq) == 0, when);
     }
     return false;
   }
@@ -513,13 +1094,18 @@ class MPMCQueue : boost::noncopyable {
   /// blocking may be required when using the returned ticket if some
   /// other thread's pop is still in progress (ticket has been granted but
   /// pop has not yet completed).
-  bool tryObtainPromisedPushTicket(uint64_t& rv) noexcept {
+  bool tryObtainPromisedPushTicket(
+    uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
+  ) noexcept {
     auto numPushes = pushTicket_.load(std::memory_order_acquire); // A
+    slots = slots_;
+    cap = capacity_;
+    stride = stride_;
     while (true) {
       auto numPops = popTicket_.load(std::memory_order_acquire); // B
       // n will be negative if pops are pending
       int64_t n = numPushes - numPops;
-      rv = numPushes;
+      ticket = numPushes;
       if (n >= static_cast<ssize_t>(capacity_)) {
         // Full, linearize at B.  We don't need to recheck the read we
         // performed at A, because if numPushes was stale at B then the
@@ -535,10 +1121,16 @@ class MPMCQueue : boost::noncopyable {
   /// Tries to obtain a pop ticket for which SingleElementQueue::dequeue
   /// won't block.  Returns true on immediate success, false on immediate
   /// failure.
-  bool tryObtainReadyPopTicket(uint64_t& rv) noexcept {
-    auto ticket = popTicket_.load(std::memory_order_acquire);
+  bool tryObtainReadyPopTicket(
+    uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
+  ) noexcept {
+    ticket = popTicket_.load(std::memory_order_acquire);
+    slots = slots_;
+    cap = capacity_;
+    stride = stride_;
     while (true) {
-      if (!slots_[idx(ticket)].mayDequeue(turn(ticket))) {
+      if (!slots[idx(ticket, cap, stride)]
+          .mayDequeue(turn(ticket, cap))) {
         auto prev = ticket;
         ticket = popTicket_.load(std::memory_order_acquire);
         if (prev == ticket) {
@@ -546,7 +1138,6 @@ class MPMCQueue : boost::noncopyable {
         }
       } else {
         if (popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
-          rv = ticket;
           return true;
         }
       }
@@ -563,7 +1154,9 @@ class MPMCQueue : boost::noncopyable {
   /// to block waiting for someone to call enqueue, although we might
   /// have to block waiting for them to finish executing code inside the
   /// MPMCQueue itself.
-  bool tryObtainPromisedPopTicket(uint64_t& rv) noexcept {
+  bool tryObtainPromisedPopTicket(
+    uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
+  ) noexcept {
     auto numPops = popTicket_.load(std::memory_order_acquire); // A
     while (true) {
       auto numPushes = pushTicket_.load(std::memory_order_acquire); // B
@@ -574,7 +1167,10 @@ class MPMCQueue : boost::noncopyable {
         return false;
       }
       if (popTicket_.compare_exchange_strong(numPops, numPops + 1)) {
-        rv = numPops;
+        ticket = numPops;
+        slots = slots_;
+        cap = capacity_;
+        stride = stride_;
         return true;
       }
     }
@@ -582,25 +1178,35 @@ class MPMCQueue : boost::noncopyable {
 
   // Given a ticket, constructs an enqueued item using args
   template <typename ...Args>
+  void enqueueWithTicketBase(
+    uint64_t ticket, Slot* slots, size_t cap, int stride, Args&&... args
+  ) noexcept {
+    slots[idx(ticket, cap, stride)]
+      .enqueue(turn(ticket, cap),
+               pushSpinCutoff_,
+               (ticket % kAdaptationFreq) == 0,
+               std::forward<Args>(args)...);
+  }
+
+  // To support tracking ticket numbers in MPMCPipelineStageImpl
+  template <typename ...Args>
   void enqueueWithTicket(uint64_t ticket, Args&&... args) noexcept {
-    slots_[idx(ticket)].enqueue(turn(ticket),
-                                pushSpinCutoff_,
-                                (ticket % kAdaptationFreq) == 0,
-                                std::forward<Args>(args)...);
+    enqueueWithTicketBase(ticket, slots_, capacity_, stride_,
+                          std::forward<Args>(args)...);
   }
 
   // Given a ticket, dequeues the corresponding element
-  void dequeueWithTicket(uint64_t ticket, T& elem) noexcept {
-    slots_[idx(ticket)].dequeue(turn(ticket),
-                                popSpinCutoff_,
-                                (ticket % kAdaptationFreq) == 0,
-                                elem);
+  void dequeueWithTicketBase(
+    uint64_t ticket, Slot* slots, size_t cap, int stride, T& elem
+  ) noexcept {
+    slots[idx(ticket, cap, stride)]
+      .dequeue(turn(ticket, cap),
+               popSpinCutoff_,
+               (ticket % kAdaptationFreq) == 0,
+               elem);
   }
 };
 
-
-namespace detail {
-
 /// SingleElementQueue implements a blocking queue that holds at most one
 /// item, and that requires its users to assign incrementing identifiers
 /// (turns) to each enqueue and dequeue operation.  Note that the turns
index aa2d07e4bc9f6eefd539640270b44f04d4b2cce9..9ef7a39aaf555980f0d0516f19d29d495515e44a 100644 (file)
@@ -76,8 +76,8 @@ class MPMCPipelineStageImpl {
   }
 
   uint64_t blockingRead(T& elem) noexcept {
-    uint64_t ticket = queue_.popTicket_++;
-    queue_.dequeueWithTicket(ticket, elem);
+    uint64_t ticket;
+    queue_.blockingReadWithTicket(ticket, elem);
     return ticket;
   }
 
@@ -87,12 +87,7 @@ class MPMCPipelineStageImpl {
 
   template <class... Args>
   bool readAndGetTicket(uint64_t& ticket, T& elem) noexcept {
-    if (queue_.tryObtainReadyPopTicket(ticket)) {
-      queue_.dequeueWithTicket(ticket, elem);
-      return true;
-    } else {
-      return false;
-    }
+    return queue_.readAndGetTicket(ticket, elem);
   }
 
   // See MPMCQueue<T>::writeCount; only works for the first stage
index 05ff5715bca7af3beeea667ebad8e2477d9d1445..f4a195a492b998ce9d88f854e5d5a62bf50a191d 100644 (file)
@@ -101,9 +101,9 @@ TEST(MPMCQueue, sequencer_deterministic) {
   run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
 }
 
-template <typename T>
+template <bool Dynamic = false, typename T>
 void runElementTypeTest(T&& src) {
-  MPMCQueue<T> cq(10);
+  MPMCQueue<T, std::atomic, Dynamic> cq(10);
   cq.blockingWrite(std::forward<T>(src));
   T dest;
   cq.blockingRead(dest);
@@ -153,7 +153,21 @@ TEST(MPMCQueue, lots_of_element_types) {
   EXPECT_EQ(RefCounted::active_instances, 0);
 }
 
+TEST(MPMCQueue, lots_of_element_types_dynamic) {
+  runElementTypeTest<true>(10);
+  runElementTypeTest<true>(string("abc"));
+  runElementTypeTest<true>(std::make_pair(10, string("def")));
+  runElementTypeTest<true>(vector<string>{{"abc"}});
+  runElementTypeTest<true>(std::make_shared<char>('a'));
+  runElementTypeTest<true>(folly::make_unique<char>('a'));
+  runElementTypeTest<true>(boost::intrusive_ptr<RefCounted>(new RefCounted));
+  EXPECT_EQ(RefCounted::active_instances, 0);
+}
+
 TEST(MPMCQueue, single_thread_enqdeq) {
+  // Non-dynamic version only.
+  // False positive for dynamic version. Capacity can be temporarily
+  // higher than specified.
   MPMCQueue<int> cq(10);
 
   for (int pass = 0; pass < 10; ++pass) {
@@ -184,6 +198,9 @@ TEST(MPMCQueue, single_thread_enqdeq) {
 }
 
 TEST(MPMCQueue, tryenq_capacity_test) {
+  // Non-dynamic version only.
+  // False positive for dynamic version. Capacity can be temporarily
+  // higher than specified.
   for (size_t cap = 1; cap < 100; ++cap) {
     MPMCQueue<int> cq(cap);
     for (size_t i = 0; i < cap; ++i) {
@@ -194,6 +211,9 @@ TEST(MPMCQueue, tryenq_capacity_test) {
 }
 
 TEST(MPMCQueue, enq_capacity_test) {
+  // Non-dynamic version only.
+  // False positive for dynamic version. Capacity can be temporarily
+  // higher than specified.
   for (auto cap : { 1, 100, 10000 }) {
     MPMCQueue<int> cq(cap);
     for (int i = 0; i < cap; ++i) {
@@ -214,11 +234,11 @@ TEST(MPMCQueue, enq_capacity_test) {
   }
 }
 
-template <template<typename> class Atom>
+template <template<typename> class Atom, bool Dynamic = false>
 void runTryEnqDeqThread(
     int numThreads,
     int n, /*numOps*/
-    MPMCQueue<int, Atom>& cq,
+    MPMCQueue<int, Atom, Dynamic>& cq,
     std::atomic<uint64_t>& sum,
     int t) {
   uint64_t threadSum = 0;
@@ -241,18 +261,18 @@ void runTryEnqDeqThread(
   sum += threadSum;
 }
 
-template <template<typename> class Atom>
+template <template<typename> class Atom, bool Dynamic = false>
 void runTryEnqDeqTest(int numThreads, int numOps) {
   // write and read aren't linearizable, so we don't have
   // hard guarantees on their individual behavior.  We can still test
   // correctness in aggregate
-  MPMCQueue<int,Atom> cq(numThreads);
+  MPMCQueue<int,Atom, Dynamic> cq(numThreads);
 
   uint64_t n = numOps;
   vector<std::thread> threads(numThreads);
   std::atomic<uint64_t> sum(0);
   for (int t = 0; t < numThreads; ++t) {
-    threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom>,
+    threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom, Dynamic>,
           numThreads, n, std::ref(cq), std::ref(sum), t));
   }
   for (auto& t : threads) {
@@ -271,6 +291,15 @@ TEST(MPMCQueue, mt_try_enq_deq) {
   }
 }
 
+TEST(MPMCQueue, mt_try_enq_deq_dynamic) {
+  int nts[] = { 1, 3, 100 };
+
+  int n = 100000;
+  for (int nt : nts) {
+    runTryEnqDeqTest<std::atomic, /* Dynamic = */ true>(nt, n);
+  }
+}
+
 TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
   int nts[] = { 1, 3, 100 };
 
@@ -280,6 +309,15 @@ TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
   }
 }
 
+TEST(MPMCQueue, mt_try_enq_deq_emulated_futex_dynamic) {
+  int nts[] = { 1, 3, 100 };
+
+  int n = 100000;
+  for (int nt : nts) {
+    runTryEnqDeqTest<EmulatedFutexAtomic, /* Dynamic = */ true>(nt, n);
+  }
+}
+
 TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
   int nts[] = { 3, 10 };
 
@@ -296,6 +334,14 @@ TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
       DSched sched(DSched::uniformSubset(seed, 2));
       runTryEnqDeqTest<DeterministicAtomic>(nt, n);
     }
+    {
+      DSched sched(DSched::uniform(seed));
+      runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
+    }
+    {
+      DSched sched(DSched::uniformSubset(seed, 2));
+      runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
+    }
   }
 }
 
@@ -414,10 +460,11 @@ string producerConsumerBench(Q&& queue,
   long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
       (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
   uint64_t failures = failed;
+  size_t allocated = q.allocatedCapacity();
 
   return folly::sformat(
       "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
-      "handoff, {} failures",
+      "handoff, {} failures, {} allocated",
       qName,
       numProducers,
       writer.methodName(),
@@ -425,134 +472,254 @@ string producerConsumerBench(Q&& queue,
       nanosPer,
       csw,
       n,
-      failures);
+      failures,
+      allocated);
 }
 
-TEST(MPMCQueue, mt_prod_cons_deterministic) {
+template <bool Dynamic = false>
+void runMtProdConsDeterministic(long seed) {
   // we use the Bench method, but perf results are meaningless under DSched
-  DSched sched(DSched::uniform(0));
+  DSched sched(DSched::uniform(seed));
+
+  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic,
+                                                Dynamic>>>> callers;
+  callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
+                       DeterministicAtomic, Dynamic>>>());
+  callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
+                       DeterministicAtomic, Dynamic>>>());
+  callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
+                       DeterministicAtomic, Dynamic>>>());
+  callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+                       DeterministicAtomic, Dynamic>>>(milliseconds(1)));
+  callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+                       DeterministicAtomic, Dynamic>>>(seconds(2)));
+  size_t cap;
+
+  for (const auto& caller : callers) {
+    cap = 10;
+    LOG(INFO) <<
+      producerConsumerBench(
+        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
+        "MPMCQueue<int, DeterministicAtomic, Dynamic>("
+          + folly::to<std::string>(cap)+")",
+        1,
+        1,
+        1000,
+        *caller);
+    cap = 100;
+    LOG(INFO) <<
+      producerConsumerBench(
+        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
+        "MPMCQueue<int, DeterministicAtomic, Dynamic>("
+          + folly::to<std::string>(cap)+")",
+        10,
+        10,
+        1000,
+        *caller);
+    cap = 10;
+    LOG(INFO) <<
+      producerConsumerBench(
+        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
+        "MPMCQueue<int, DeterministicAtomic, Dynamic>("
+          + folly::to<std::string>(cap)+")",
+        1,
+        1,
+        1000,
+        *caller);
+    cap = 100;
+    LOG(INFO) <<
+      producerConsumerBench(
+        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
+        "MPMCQueue<int, DeterministicAtomic, Dynamic>("
+          + folly::to<std::string>(cap)+")",
+        10,
+        10,
+        1000,
+        *caller);
+    cap = 1;
+    LOG(INFO) <<
+      producerConsumerBench(
+        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
+        "MPMCQueue<int, DeterministicAtomic, Dynamic>("
+          + folly::to<std::string>(cap)+")",
+        10,
+        10,
+        1000,
+        *caller);
+  }
+}
 
-  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic>>>>
-      callers;
-  callers.emplace_back(
-      make_unique<BlockingWriteCaller<MPMCQueue<int, DeterministicAtomic>>>());
-  callers.emplace_back(
-      make_unique<WriteIfNotFullCaller<MPMCQueue<int, DeterministicAtomic>>>());
-  callers.emplace_back(
-      make_unique<WriteCaller<MPMCQueue<int, DeterministicAtomic>>>());
-  callers.emplace_back(
-      make_unique<TryWriteUntilCaller<MPMCQueue<int, DeterministicAtomic>>>(
-          milliseconds(1)));
-  callers.emplace_back(
-      make_unique<TryWriteUntilCaller<MPMCQueue<int, DeterministicAtomic>>>(
-          seconds(2)));
+void runMtProdConsDeterministicDynamic(
+  long seed,
+  uint32_t prods,
+  uint32_t cons,
+  uint32_t numOps,
+  size_t cap,
+  size_t minCap,
+  size_t mult
+) {
+  // we use the Bench method, but perf results are meaningless under DSched
+  DSched sched(DSched::uniform(seed));
+
+  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic,
+                                                true>>>> callers;
+  callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
+                       DeterministicAtomic, true>>>());
+  callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
+                       DeterministicAtomic, true>>>());
+  callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
+                       DeterministicAtomic, true>>>());
+  callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+                       DeterministicAtomic, true>>>(milliseconds(1)));
+  callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+                       DeterministicAtomic, true>>>(seconds(2)));
 
   for (const auto& caller : callers) {
-    LOG(INFO)
-        << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(10),
-                                 "MPMCQueue<int, DeterministicAtomic>(10)",
-                                 1,
-                                 1,
-                                 1000,
-                                 *caller);
-    LOG(INFO)
-        << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(100),
-                                 "MPMCQueue<int, DeterministicAtomic>(100)",
-                                 10,
-                                 10,
-                                 1000,
-                                 *caller);
-    LOG(INFO)
-        << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(10),
-                                 "MPMCQueue<int, DeterministicAtomic>(10)",
-                                 1,
-                                 1,
-                                 1000,
-                                 *caller);
-    LOG(INFO)
-        << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(100),
-                                 "MPMCQueue<int, DeterministicAtomic>(100)",
-                                 10,
-                                 10,
-                                 1000,
-                                 *caller);
-    LOG(INFO) << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(1),
-                                       "MPMCQueue<int, DeterministicAtomic>(1)",
-                                       10,
-                                       10,
-                                       1000,
-                                       *caller);
+    LOG(INFO) <<
+      producerConsumerBench(
+        MPMCQueue<int, DeterministicAtomic, true>(cap, minCap, mult),
+        "MPMCQueue<int, DeterministicAtomic, true>("
+          + folly::to<std::string>(cap) + ", "
+          + folly::to<std::string>(minCap) + ", "
+          + folly::to<std::string>(mult)+")",
+        prods,
+        cons,
+        numOps,
+        *caller);
   }
 }
 
+TEST(MPMCQueue, mt_prod_cons_deterministic) {
+  runMtProdConsDeterministic(0);
+}
+
+TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic) {
+  runMtProdConsDeterministic<true>(0);
+}
+
+template <typename T>
+void setFromEnv(T& var, const char* envvar) {
+  char* str = std::getenv(envvar);
+  if (str) { var = atoi(str); }
+}
+
+TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic_with_arguments) {
+  long seed = 0;
+  uint32_t prods = 10;
+  uint32_t cons = 10;
+  uint32_t numOps = 1000;
+  size_t cap = 10000;
+  size_t minCap = 9;
+  size_t mult = 3;
+  setFromEnv(seed, "SEED");
+  setFromEnv(prods, "PRODS");
+  setFromEnv(cons, "CONS");
+  setFromEnv(numOps, "NUM_OPS");
+  setFromEnv(cap, "CAP");
+  setFromEnv(minCap, "MIN_CAP");
+  setFromEnv(mult, "MULT");
+  runMtProdConsDeterministicDynamic(
+    seed, prods, cons, numOps, cap, minCap, mult);
+}
+
 #define PC_BENCH(q, np, nc, ...) \
     producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
 
-TEST(MPMCQueue, mt_prod_cons) {
+template <bool Dynamic = false>
+void runMtProdCons() {
   int n = 100000;
-  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int>>>> callers;
-  callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int>>>());
-  callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int>>>());
-  callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int>>>());
-  callers.emplace_back(
-      make_unique<TryWriteUntilCaller<MPMCQueue<int>>>(milliseconds(1)));
-  callers.emplace_back(
-      make_unique<TryWriteUntilCaller<MPMCQueue<int>>>(seconds(2)));
+  setFromEnv(n, "NUM_OPS");
+  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, std::atomic, Dynamic>>>>
+    callers;
+  callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
+                       std::atomic, Dynamic>>>());
+  callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
+                       std::atomic, Dynamic>>>());
+  callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int, std::atomic,
+                       Dynamic>>>());
+  callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+                       std::atomic, Dynamic>>>(milliseconds(1)));
+  callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+                       std::atomic, Dynamic>>>(seconds(2)));
   for (const auto& caller : callers) {
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 1, n, *caller);
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 1, n, *caller);
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 10, n, *caller);
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 10, n, *caller);
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 1, n, *caller);
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 1, n, *caller);
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 10, n, *caller);
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 10, n, *caller);
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
+                          1, 1, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
+                          10, 1, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
+                          1, 10, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
+                          10, 10, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
+                          1, 1, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
+                          10, 1, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
+                          1, 10, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
+                          10, 10, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(100000)),
+                          32, 100, n, *caller);
   }
 }
 
-TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
+TEST(MPMCQueue, mt_prod_cons) {
+  runMtProdCons();
+}
+
+TEST(MPMCQueue, mt_prod_cons_dynamic) {
+  runMtProdCons</* Dynamic = */ true>();
+}
+
+template <bool Dynamic = false>
+void runMtProdConsEmulatedFutex() {
   int n = 100000;
-  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, EmulatedFutexAtomic>>>>
-      callers;
-  callers.emplace_back(
-      make_unique<BlockingWriteCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
-  callers.emplace_back(
-      make_unique<WriteIfNotFullCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
-  callers.emplace_back(
-      make_unique<WriteCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
-  callers.emplace_back(
-      make_unique<TryWriteUntilCaller<MPMCQueue<int, EmulatedFutexAtomic>>>(
-          milliseconds(1)));
-  callers.emplace_back(
-      make_unique<TryWriteUntilCaller<MPMCQueue<int, EmulatedFutexAtomic>>>(
-          seconds(2)));
+  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, EmulatedFutexAtomic,
+                                                Dynamic>>>> callers;
+  callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
+                       EmulatedFutexAtomic, Dynamic>>>());
+  callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
+                       EmulatedFutexAtomic, Dynamic>>>());
+  callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
+                       EmulatedFutexAtomic, Dynamic>>>());
+  callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+                       EmulatedFutexAtomic, Dynamic>>>(milliseconds(1)));
+  callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+                       EmulatedFutexAtomic, Dynamic>>>(seconds(2)));
   for (const auto& caller : callers) {
     LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(10)), 1, 1, n, *caller);
+      (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 1, 1, n, *caller);
     LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(10)), 10, 1, n, *caller);
+      (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 10, 1, n, *caller);
     LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(10)), 1, 10, n, *caller);
+      (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 1, 10, n, *caller);
     LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(10)), 10, 10, n, *caller);
+      (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 10, 10, n, *caller);
     LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 1, 1, n, *caller);
+      (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 1, 1, n, *caller);
     LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 10, 1, n, *caller);
+      (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 10, 1, n, *caller);
     LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 1, 10, n, *caller);
-    LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 10, 10, n, *caller);
-    LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(100000)), 32, 100, n, *caller);
+      (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 1, 10, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, EmulatedFutexAtomic, Dynamic>
+                           (10000)), 10, 10, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, EmulatedFutexAtomic, Dynamic>
+                           (100000)), 32, 100, n, *caller);
   }
 }
 
-template <template <typename> class Atom>
+TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
+  runMtProdConsEmulatedFutex();
+}
+
+TEST(MPMCQueue, mt_prod_cons_emulated_futex_dynamic) {
+  runMtProdConsEmulatedFutex</* Dynamic = */ true>();
+}
+
+template <template <typename> class Atom, bool Dynamic = false>
 void runNeverFailThread(int numThreads,
                         int n, /*numOps*/
-                        MPMCQueue<int, Atom>& cq,
+                        MPMCQueue<int, Atom, Dynamic>& cq,
                         std::atomic<uint64_t>& sum,
                         int t) {
   uint64_t threadSum = 0;
@@ -568,10 +735,10 @@ void runNeverFailThread(int numThreads,
   sum += threadSum;
 }
 
-template <template <typename> class Atom>
+template <template <typename> class Atom, bool Dynamic = false>
 uint64_t runNeverFailTest(int numThreads, int numOps) {
   // always #enq >= #deq
-  MPMCQueue<int, Atom> cq(numThreads);
+  MPMCQueue<int, Atom, Dynamic> cq(numThreads);
 
   uint64_t n = numOps;
   auto beginMicro = nowMicro();
@@ -579,7 +746,7 @@ uint64_t runNeverFailTest(int numThreads, int numOps) {
   vector<std::thread> threads(numThreads);
   std::atomic<uint64_t> sum(0);
   for (int t = 0; t < numThreads; ++t) {
-    threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom>,
+    threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom, Dynamic>,
                                           numThreads,
                                           n,
                                           std::ref(cq),
@@ -595,51 +762,72 @@ uint64_t runNeverFailTest(int numThreads, int numOps) {
   return nowMicro() - beginMicro;
 }
 
-TEST(MPMCQueue, mt_never_fail) {
-  int nts[] = {1, 3, 100};
-
-  int n = 100000;
+template <template<typename> class Atom, bool Dynamic = false>
+void runMtNeverFail(std::vector<int>& nts, int n) {
   for (int nt : nts) {
-    uint64_t elapsed = runNeverFailTest<std::atomic>(nt, n);
+    uint64_t elapsed = runNeverFailTest<Atom, Dynamic>(nt, n);
     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
               << " threads";
   }
 }
 
-TEST(MPMCQueue, mt_never_fail_emulated_futex) {
-  int nts[] = {1, 3, 100};
+TEST(MPMCQueue, mt_never_fail) {
+  std::vector<int> nts {1, 3, 100};
+  int n = 100000;
+  runMtNeverFail<std::atomic>(nts, n);
+}
 
+TEST(MPMCQueue, mt_never_fail_dynamic) {
+  std::vector<int> nts {1, 3, 100};
   int n = 100000;
-  for (int nt : nts) {
-    uint64_t elapsed = runNeverFailTest<EmulatedFutexAtomic>(nt, n);
-    LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
-              << " threads";
-  }
+  runMtNeverFail<std::atomic, true>(nts, n);
 }
 
-TEST(MPMCQueue, mt_never_fail_deterministic) {
-  int nts[] = {3, 10};
+TEST(MPMCQueue, mt_never_fail_emulated_futex) {
+  std::vector<int> nts {1, 3, 100};
+  int n = 100000;
+  runMtNeverFail<EmulatedFutexAtomic>(nts, n);
+}
 
-  long seed = 0; // nowMicro() % 10000;
-  LOG(INFO) << "using seed " << seed;
+TEST(MPMCQueue, mt_never_fail_emulated_futex_dynamic) {
+  std::vector<int> nts {1, 3, 100};
+  int n = 100000;
+  runMtNeverFail<EmulatedFutexAtomic, true>(nts, n);
+}
 
-  int n = 1000;
+template<bool Dynamic = false>
+void runMtNeverFailDeterministic(std::vector<int>& nts, int n, long seed) {
+  LOG(INFO) << "using seed " << seed;
   for (int nt : nts) {
     {
       DSched sched(DSched::uniform(seed));
-      runNeverFailTest<DeterministicAtomic>(nt, n);
+      runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
     }
     {
       DSched sched(DSched::uniformSubset(seed, 2));
-      runNeverFailTest<DeterministicAtomic>(nt, n);
+      runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
     }
   }
 }
 
-template <class Clock, template <typename> class Atom>
+TEST(MPMCQueue, mt_never_fail_deterministic) {
+  std::vector<int> nts {3, 10};
+  long seed = 0; // nowMicro() % 10000;
+  int n = 1000;
+  runMtNeverFailDeterministic(nts, n, seed);
+}
+
+TEST(MPMCQueue, mt_never_fail_deterministic_dynamic) {
+  std::vector<int> nts {3, 10};
+  long seed = 0; // nowMicro() % 10000;
+  int n = 1000;
+  runMtNeverFailDeterministic<true>(nts, n, seed);
+}
+
+template <class Clock, template <typename> class Atom, bool Dynamic>
 void runNeverFailUntilThread(int numThreads,
                              int n, /*numOps*/
-                             MPMCQueue<int, Atom>& cq,
+                             MPMCQueue<int, Atom, Dynamic>& cq,
                              std::atomic<uint64_t>& sum,
                              int t) {
   uint64_t threadSum = 0;
@@ -656,10 +844,10 @@ void runNeverFailUntilThread(int numThreads,
   sum += threadSum;
 }
 
-template <class Clock, template <typename> class Atom>
+template <class Clock, template <typename> class Atom, bool Dynamic = false>
 uint64_t runNeverFailTest(int numThreads, int numOps) {
   // always #enq >= #deq
-  MPMCQueue<int, Atom> cq(numThreads);
+  MPMCQueue<int, Atom, Dynamic> cq(numThreads);
 
   uint64_t n = numOps;
   auto beginMicro = nowMicro();
@@ -667,12 +855,13 @@ uint64_t runNeverFailTest(int numThreads, int numOps) {
   vector<std::thread> threads(numThreads);
   std::atomic<uint64_t> sum(0);
   for (int t = 0; t < numThreads; ++t) {
-    threads[t] = DSched::thread(std::bind(runNeverFailUntilThread<Clock, Atom>,
-                                          numThreads,
-                                          n,
-                                          std::ref(cq),
-                                          std::ref(sum),
-                                          t));
+    threads[t] = DSched::thread(std::bind(
+                                  runNeverFailUntilThread<Clock, Atom, Dynamic>,
+                                  numThreads,
+                                  n,
+                                  std::ref(cq),
+                                  std::ref(sum),
+                                  t));
   }
   for (auto& t : threads) {
     DSched::join(t);
@@ -683,30 +872,50 @@ uint64_t runNeverFailTest(int numThreads, int numOps) {
   return nowMicro() - beginMicro;
 }
 
-TEST(MPMCQueue, mt_never_fail_until_system) {
-  int nts[] = {1, 3, 100};
-
-  int n = 100000;
+template <bool Dynamic = false>
+void runMtNeverFailUntilSystem(std::vector<int>& nts, int n) {
   for (int nt : nts) {
     uint64_t elapsed =
-        runNeverFailTest<std::chrono::system_clock, std::atomic>(nt, n);
+      runNeverFailTest<std::chrono::system_clock, std::atomic, Dynamic>(nt, n);
     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
               << " threads";
   }
 }
 
-TEST(MPMCQueue, mt_never_fail_until_steady) {
-  int nts[] = {1, 3, 100};
+TEST(MPMCQueue, mt_never_fail_until_system) {
+  std::vector<int> nts {1, 3, 100};
+  int n = 100000;
+  runMtNeverFailUntilSystem(nts, n);
+}
 
+TEST(MPMCQueue, mt_never_fail_until_system_dynamic) {
+  std::vector<int> nts {1, 3, 100};
   int n = 100000;
+  runMtNeverFailUntilSystem<true>(nts, n);
+}
+
+template <bool Dynamic = false>
+void runMtNeverFailUntilSteady(std::vector<int>& nts, int n) {
   for (int nt : nts) {
     uint64_t elapsed =
-        runNeverFailTest<std::chrono::steady_clock, std::atomic>(nt, n);
+      runNeverFailTest<std::chrono::steady_clock, std::atomic, Dynamic>(nt, n);
     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
               << " threads";
   }
 }
 
+TEST(MPMCQueue, mt_never_fail_until_steady) {
+  std::vector<int> nts {1, 3, 100};
+  int n = 100000;
+  runMtNeverFailUntilSteady(nts, n);
+}
+
+TEST(MPMCQueue, mt_never_fail_until_steady_dynamic) {
+  std::vector<int> nts {1, 3, 100};
+  int n = 100000;
+  runMtNeverFailUntilSteady<true>(nts, n);
+}
+
 enum LifecycleEvent {
   NOTHING = -1,
   DEFAULT_CONSTRUCTOR,
@@ -794,7 +1003,8 @@ void runPerfectForwardingTest() {
   EXPECT_EQ(lc_outstanding(), 0);
 
   {
-    MPMCQueue<Lifecycle<R>> queue(50);
+    // Non-dynamic only. False positive for dynamic.
+    MPMCQueue<Lifecycle<R>, std::atomic> queue(50);
     LIFECYCLE_STEP(NOTHING);
 
     for (int pass = 0; pass < 10; ++pass) {
@@ -870,19 +1080,21 @@ TEST(MPMCQueue, perfect_forwarding_relocatable) {
   runPerfectForwardingTest<std::true_type>();
 }
 
-TEST(MPMCQueue, queue_moving) {
+template <bool Dynamic = false>
+void run_queue_moving() {
   lc_snap();
   EXPECT_EQ(lc_outstanding(), 0);
 
   {
-    MPMCQueue<Lifecycle<std::false_type>> a(50);
+    MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> a(50);
     LIFECYCLE_STEP(NOTHING);
 
     a.blockingWrite();
     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
 
     // move constructor
-    MPMCQueue<Lifecycle<std::false_type>> b = std::move(a);
+    MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> b
+      = std::move(a);
     LIFECYCLE_STEP(NOTHING);
     EXPECT_EQ(a.capacity(), 0);
     EXPECT_EQ(a.size(), 0);
@@ -893,7 +1105,7 @@ TEST(MPMCQueue, queue_moving) {
     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
 
     // move operator
-    MPMCQueue<Lifecycle<std::false_type>> c;
+    MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> c;
     LIFECYCLE_STEP(NOTHING);
     c = std::move(b);
     LIFECYCLE_STEP(NOTHING);
@@ -908,7 +1120,7 @@ TEST(MPMCQueue, queue_moving) {
 
       {
         // swap
-        MPMCQueue<Lifecycle<std::false_type>> d(10);
+        MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> d(10);
         LIFECYCLE_STEP(NOTHING);
         std::swap(c, d);
         LIFECYCLE_STEP(NOTHING);
@@ -933,6 +1145,17 @@ TEST(MPMCQueue, queue_moving) {
   LIFECYCLE_STEP(DESTRUCTOR);
 }
 
+TEST(MPMCQueue, queue_moving) {
+  run_queue_moving();
+}
+
+TEST(MPMCQueue, queue_moving_dynamic) {
+  run_queue_moving<true>();
+}
+
 TEST(MPMCQueue, explicit_zero_capacity_fail) {
   ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);
+
+  using DynamicMPMCQueueInt = MPMCQueue<int, std::atomic, true>;
+  ASSERT_THROW(DynamicMPMCQueueInt cq(0), std::invalid_argument);
 }