Dynamic expansion of folly MPMC queue
[folly.git] / folly / MPMCQueue.h
index 08b2e1b8f7e31cd86b9881c30311fcb1fe08ed7f..2bf89c5301e41b4bde6d87a5108652db3f04e419 100644 (file)
@@ -38,6 +38,9 @@ struct SingleElementQueue;
 
 template <typename T> class MPMCPipelineStageImpl;
 
+/// MPMCQueue base CRTP template
+template <typename> class MPMCQueueBase;
+
 } // namespace detail
 
 /// MPMCQueue<T> is a high-performance bounded concurrent queue that
@@ -93,34 +96,536 @@ template <typename T> class MPMCPipelineStageImpl;
 /// are you can enqueue one sentinel and then have each consumer requeue
 /// two sentinels after it receives it (by requeuing 2 the shutdown can
 /// complete in O(log P) time instead of O(P)).
-template<typename T,
-         template<typename> class Atom = std::atomic>
-class MPMCQueue : boost::noncopyable {
+template<typename T, template<typename> class Atom = std::atomic,
+         bool Dynamic = false>
+class MPMCQueue : public detail::MPMCQueueBase<MPMCQueue<T,Atom,Dynamic>> {
+  friend class detail::MPMCPipelineStageImpl<T>;
+  using Slot = detail::SingleElementQueue<T,Atom>;
+ public:
+
+  explicit MPMCQueue(size_t queueCapacity)
+    : detail::MPMCQueueBase<MPMCQueue<T,Atom,Dynamic>>(queueCapacity)
+  {
+    this->stride_ = this->computeStride(queueCapacity);
+    this->slots_ = new Slot[queueCapacity + 2 * this->kSlotPadding];
+  }
+
+  MPMCQueue() noexcept { }
+};
+
+/// The dynamic version of MPMCQueue allows dynamic expansion of queue
+/// capacity, such that a queue may start with a smaller capacity than
+/// specified and expand only if needed. Users may optionally specify
+/// the initial capacity and the expansion multiplier.
+///
+/// The design uses a seqlock to enforce mutual exclusion among
+/// expansion attempts. Regular operations read up-to-date queue
+/// information (slots array, capacity, stride) inside read-only
+/// seqlock sections, which are unimpeded when no expansion is in
+/// progress.
+///
+/// An expansion computes a new capacity, allocates a new slots array,
+/// and updates stride. No information needs to be copied from the
+/// current slots array to the new one. When this happens, new slots
+/// will not have sequence numbers that match ticket numbers. The
+/// expansion needs to compute a ticket offset such that operations
+/// that use new arrays can adjust the calculations of slot indexes
+/// and sequence numbers that take into account that the new slots
+/// start with sequence numbers of zero. The current ticket offset is
+/// packed with the seqlock in an atomic 64-bit integer. The initial
+/// offset is zero.
+///
+/// Lagging write and read operations with tickets lower than the
+/// ticket offset of the current slots array (i.e., the minimum ticket
+/// number that can be served by the current array) must use earlier
+/// closed arrays instead of the current one. Information about closed
+/// slots arrays (array address, capacity, stride, and offset) is
+/// maintained in a logarithmic-sized structure. Each entry in that
+/// structure never need to be changed once set. The number of closed
+/// arrays is half the value of the seqlock (when unlocked).
+///
+/// The acquisition of the seqlock to perform an expansion does not
+/// prevent the issuing of new push and pop tickets concurrently. The
+/// expansion must set the new ticket offset to a value that couldn't
+/// have been issued to an operation that has already gone through a
+/// seqlock read-only section (and hence obtained information for
+/// older closed arrays).
+///
+/// Note that the total queue capacity can temporarily exceed the
+/// specified capacity when there are lagging consumers that haven't
+/// yet consumed all the elements in closed arrays. Users should not
+/// rely on the capacity of dynamic queues for synchronization, e.g.,
+/// they should not expect that a thread will definitely block on a
+/// call to blockingWrite() when the queue size is known to be equal
+/// to its capacity.
+///
+/// The dynamic version is a partial specialization of MPMCQueue with
+/// Dynamic == true
+template <typename T, template<typename> class Atom>
+class MPMCQueue<T,Atom,true> :
+      public detail::MPMCQueueBase<MPMCQueue<T,Atom,true>> {
+  friend class detail::MPMCQueueBase<MPMCQueue<T,Atom,true>>;
+  using Slot = detail::SingleElementQueue<T,Atom>;
+
+  struct ClosedArray {
+    uint64_t offset_ {0};
+    Slot* slots_ {nullptr};
+    size_t capacity_ {0};
+    int stride_ {0};
+  };
+
+ public:
+
+  explicit MPMCQueue(size_t queueCapacity)
+    : detail::MPMCQueueBase<MPMCQueue<T,Atom,true>>(queueCapacity)
+  {
+    size_t cap = std::min<size_t>(kDefaultMinDynamicCapacity, queueCapacity);
+    initQueue(cap, kDefaultExpansionMultiplier);
+  }
+
+  explicit MPMCQueue(size_t queueCapacity,
+                     size_t minCapacity,
+                     size_t expansionMultiplier)
+    : detail::MPMCQueueBase<MPMCQueue<T,Atom,true>>(queueCapacity)
+  {
+    minCapacity = std::max<size_t>(1, minCapacity);
+    size_t cap = std::min<size_t>(minCapacity, queueCapacity);
+    expansionMultiplier = std::max<size_t>(2, expansionMultiplier);
+    initQueue(cap, expansionMultiplier);
+  }
+
+  MPMCQueue() noexcept {
+    dmult_ = 0;
+    closed_ = nullptr;
+  }
+
+  MPMCQueue(MPMCQueue<T,Atom,true>&& rhs) noexcept {
+    this->capacity_ = rhs.capacity_;
+    this->slots_ = rhs.slots_;
+    this->stride_ = rhs.stride_;
+    this->dstate_.store(rhs.dstate_.load(std::memory_order_relaxed),
+                        std::memory_order_relaxed);
+    this->dcapacity_.store(rhs.dcapacity_.load(std::memory_order_relaxed),
+                           std::memory_order_relaxed);
+    this->pushTicket_.store(rhs.pushTicket_.load(std::memory_order_relaxed),
+                            std::memory_order_relaxed);
+    this->popTicket_.store(rhs.popTicket_.load(std::memory_order_relaxed),
+                           std::memory_order_relaxed);
+    this->pushSpinCutoff_.store(
+      rhs.pushSpinCutoff_.load(std::memory_order_relaxed),
+      std::memory_order_relaxed);
+    this->popSpinCutoff_.store(
+      rhs.popSpinCutoff_.load(std::memory_order_relaxed),
+      std::memory_order_relaxed);
+    dmult_ = rhs.dmult_;
+    closed_ = rhs.closed_;
+
+    rhs.capacity_ = 0;
+    rhs.slots_ = nullptr;
+    rhs.stride_ = 0;
+    rhs.dstate_.store(0, std::memory_order_relaxed);
+    rhs.dcapacity_.store(0, std::memory_order_relaxed);
+    rhs.pushTicket_.store(0, std::memory_order_relaxed);
+    rhs.popTicket_.store(0, std::memory_order_relaxed);
+    rhs.pushSpinCutoff_.store(0, std::memory_order_relaxed);
+    rhs.popSpinCutoff_.store(0, std::memory_order_relaxed);
+    rhs.dmult_ = 0;
+    rhs.closed_ = nullptr;
+  }
+
+  MPMCQueue<T,Atom, true> const& operator= (MPMCQueue<T,Atom, true>&& rhs) {
+    if (this != &rhs) {
+      this->~MPMCQueue();
+      new (this) MPMCQueue(std::move(rhs));
+    }
+    return *this;
+  }
+
+  ~MPMCQueue() {
+    if (closed_ != nullptr) {
+      for (int i = getNumClosed(this->dstate_.load()) - 1; i >= 0; --i) {
+        delete[] closed_[i].slots_;
+      }
+      delete[] closed_;
+    }
+  }
+
+  size_t allocatedCapacity() const noexcept {
+    return this->dcapacity_.load(std::memory_order_relaxed);
+  }
+
+  template <typename ...Args>
+  void blockingWrite(Args&&... args) noexcept {
+    uint64_t ticket = this->pushTicket_++;
+    Slot* slots;
+    size_t cap;
+    int stride;
+    uint64_t state;
+    uint64_t offset;
+    do {
+      if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        continue;
+      }
+      offset = getOffset(state);
+      if (ticket < offset) {
+        // There was an expansion after this ticket was issued.
+        updateFromClosed(state, ticket, offset, slots, cap, stride);
+        break;
+      }
+      if (slots[this->idx((ticket-offset), cap, stride)]
+          .mayEnqueue(this->turn(ticket-offset, cap))) {
+        // A slot is ready. No need to expand.
+        break;
+      } else if (this->popTicket_.load(std::memory_order_relaxed) + cap
+                 > ticket) {
+        // May block, but a pop is in progress. No need to expand.
+        // Get seqlock read section info again in case an expansion
+        // occurred with an equal or higher ticket.
+        continue;
+      } else {
+        // May block. See if we can expand.
+        if (tryExpand(state, cap)) {
+          // This or another thread started an expansion. Get updated info.
+          continue;
+        } else {
+          // Can't expand.
+          break;
+        }
+      }
+    } while (true);
+    this->enqueueWithTicketBase(ticket-offset, slots, cap, stride,
+                                std::forward<Args>(args)...);
+  }
+
+  void blockingReadWithTicket(uint64_t& ticket, T& elem) noexcept {
+    ticket = this->popTicket_++;
+    Slot* slots;
+    size_t cap;
+    int stride;
+    uint64_t state;
+    uint64_t offset;
+    while (!trySeqlockReadSection(state, slots, cap, stride));
+    offset = getOffset(state);
+    if (ticket < offset) {
+      // There was an expansion after the corresponding push ticket
+      // was issued.
+      updateFromClosed(state, ticket, offset, slots, cap, stride);
+    }
+    this->dequeueWithTicketBase(ticket-offset, slots, cap, stride, elem);
+  }
+
+ private:
+
+  enum {
+    kSeqlockBits = 6,
+    kDefaultMinDynamicCapacity = 10,
+    kDefaultExpansionMultiplier = 10,
+  };
+
+  size_t dmult_;
+
+  //  Info about closed slots arrays for use by lagging operations
+  ClosedArray* closed_;
+
+  void initQueue(const size_t cap, const size_t mult) {
+    this->stride_ = this->computeStride(cap);
+    this->slots_ = new Slot[cap + 2 * this->kSlotPadding];
+    this->dstate_.store(0);
+    this->dcapacity_.store(cap);
+    dmult_ = mult;
+    size_t maxClosed = 0;
+    for (size_t expanded = cap;
+         expanded < this->capacity_;
+         expanded *= mult) {
+      ++maxClosed;
+    }
+    closed_ = (maxClosed > 0) ? new ClosedArray[maxClosed] : nullptr;
+  }
+
+  bool tryObtainReadyPushTicket(
+      uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
+  ) noexcept {
+    uint64_t state;
+    do {
+      ticket = this->pushTicket_.load(std::memory_order_acquire); // A
+      if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        continue;
+      }
+      uint64_t offset = getOffset(state);
+      if (ticket < offset) {
+        // There was an expansion with offset greater than this ticket
+        updateFromClosed(state, ticket, offset, slots, cap, stride);
+      }
+      if (slots[this->idx((ticket-offset), cap, stride)]
+          .mayEnqueue(this->turn(ticket-offset, cap))) {
+        // A slot is ready.
+        if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
+          // Adjust ticket
+          ticket -= offset;
+          return true;
+        } else {
+          continue;
+        }
+      } else {
+        if (ticket != this->pushTicket_.load(std::memory_order_relaxed)) { // B
+          // Try again. Ticket changed.
+          continue;
+        }
+        // Likely to block.
+        // Try to expand unless the ticket is for a closed array
+        if (offset == getOffset(state)) {
+          if (tryExpand(state, cap)) {
+            // This or another thread started an expansion. Get up-to-date info.
+            continue;
+          }
+        }
+        return false;
+      }
+    } while (true);
+  }
+
+  bool tryObtainPromisedPushTicket(
+    uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
+  ) noexcept {
+    uint64_t state;
+    do {
+      ticket = this->pushTicket_.load(std::memory_order_acquire);
+      auto numPops = this->popTicket_.load(std::memory_order_acquire);
+      if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        continue;
+      }
+      int64_t n = ticket - numPops;
+      if (n >= static_cast<ssize_t>(this->capacity_)) {
+        return false;
+      }
+      if ((n >= static_cast<ssize_t>(cap))) {
+        if (tryExpand(state, cap)) {
+          // This or another thread started an expansion. Start over
+          // with a new state.
+          continue;
+        } else {
+          // Can't expand.
+          return false;
+        }
+      }
+      uint64_t offset = getOffset(state);
+      if (ticket < offset) {
+        // There was an expansion with offset greater than this ticket
+        updateFromClosed(state, ticket, offset, slots, cap, stride);
+      }
+      if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
+        // Adjust ticket
+        ticket -= offset;
+        return true;
+      }
+    } while (true);
+  }
+
+  bool tryObtainReadyPopTicket(
+    uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
+  ) noexcept {
+    uint64_t state;
+    do {
+      ticket = this->popTicket_.load(std::memory_order_relaxed);
+      if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        continue;
+      }
+      uint64_t offset = getOffset(state);
+      if (ticket < offset) {
+        // There was an expansion after the corresponding push ticket
+        // was issued.
+        updateFromClosed(state, ticket, offset, slots, cap, stride);
+      }
+      if (slots[this->idx((ticket-offset), cap, stride)]
+          .mayDequeue(this->turn(ticket-offset, cap))) {
+        if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
+          // Adjust ticket
+          ticket -= offset;
+          return true;
+        }
+      } else {
+        return false;
+      }
+    } while (true);
+  }
+
+  bool tryObtainPromisedPopTicket(
+    uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
+  ) noexcept {
+    uint64_t state;
+    do {
+      ticket = this->popTicket_.load(std::memory_order_acquire);
+      auto numPushes = this->pushTicket_.load(std::memory_order_acquire);
+      if (!trySeqlockReadSection(state, slots, cap, stride)) {
+        continue;
+      }
+      if (ticket >= numPushes) {
+        return false;
+      }
+      if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
+        // Adjust ticket
+        uint64_t offset = getOffset(state);
+        if (ticket < offset) {
+          // There was an expansion after the corresponding push
+          // ticket was issued.
+          updateFromClosed(state, ticket, offset, slots, cap, stride);
+        }
+        // Adjust ticket
+        ticket -= offset;
+        return true;
+      }
+    } while (true);
+  }
+
+  /// Enqueues an element with a specific ticket number
+  template <typename ...Args>
+  void enqueueWithTicket(const uint64_t ticket, Args&&... args) noexcept {
+    Slot* slots;
+    size_t cap;
+    int stride;
+    uint64_t state;
+    uint64_t offset;
+    while (!trySeqlockReadSection(state, slots, cap, stride)) {}
+    offset = getOffset(state);
+    if (ticket < offset) {
+      // There was an expansion after this ticket was issued.
+      updateFromClosed(state, ticket, offset, slots, cap, stride);
+    }
+    this->enqueueWithTicketBase(ticket-offset, slots, cap, stride,
+                                std::forward<Args>(args)...);
+  }
+
+  uint64_t getOffset(const uint64_t state) const noexcept {
+    return state >> kSeqlockBits;
+  }
+
+  int getNumClosed(const uint64_t state) const noexcept {
+    return (state & ((1 << kSeqlockBits) - 1)) >> 1;
+  }
+
+  /// Try to expand the queue. Returns true if this expansion was
+  /// successful or a concurent expansion is in progress. Returns
+  /// false if the queue has reached its maximum capacity or
+  /// allocation has failed.
+  bool tryExpand(const uint64_t state, const size_t cap) noexcept {
+    if (cap == this->capacity_) {
+      return false;
+    }
+    // Acquire seqlock
+    uint64_t oldval = state;
+    assert((state & 1) == 0);
+    if (this->dstate_.compare_exchange_strong(oldval, state + 1)) {
+      assert(cap == this->dcapacity_.load());
+      uint64_t ticket = 1 + std::max(this->pushTicket_.load(),
+                                     this->popTicket_.load());
+      size_t newCapacity =
+        std::min(dmult_ * cap, this->capacity_);
+      Slot* newSlots =
+        new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding];
+      if (newSlots == nullptr) {
+        // Expansion failed. Restore the seqlock
+        this->dstate_.store(state);
+        return false;
+      }
+      // Successful expansion
+      // calculate the current ticket offset
+      uint64_t offset = getOffset(state);
+      // calculate index in closed array
+      int index = getNumClosed(state);
+      assert((index << 1) < (1 << kSeqlockBits));
+      // fill the info for the closed slots array
+      closed_[index].offset_ = offset;
+      closed_[index].slots_ = this->dslots_.load();
+      closed_[index].capacity_ = cap;
+      closed_[index].stride_ = this->dstride_.load();
+      // update the new slots array info
+      this->dslots_.store(newSlots);
+      this->dcapacity_.store(newCapacity);
+      this->dstride_.store(this->computeStride(newCapacity));
+      // Release the seqlock and record the new ticket offset
+      this->dstate_.store((ticket << kSeqlockBits) + (2 * (index + 1)));
+      return true;
+    } else { // failed to acquire seqlock
+      // Someone acaquired the seqlock. Go back to the caller and get
+      // up-to-date info.
+      return true;
+    }
+  }
+
+  /// Seqlock read-only section
+  bool trySeqlockReadSection(
+    uint64_t& state, Slot*& slots, size_t& cap, int& stride
+  ) noexcept {
+    state = this->dstate_.load(std::memory_order_acquire);
+    if (state & 1) {
+      // Locked.
+      return false;
+    }
+    // Start read-only section.
+    slots = this->dslots_.load(std::memory_order_relaxed);
+    cap = this->dcapacity_.load(std::memory_order_relaxed);
+    stride = this->dstride_.load(std::memory_order_relaxed);
+    // End of read-only section. Validate seqlock.
+    std::atomic_thread_fence(std::memory_order_acquire);
+    return (state == this->dstate_.load(std::memory_order_relaxed));
+  }
+
+  /// Update local variables of a lagging operation using the
+  /// most recent closed array with offset <= ticket
+  void updateFromClosed(
+    const uint64_t state, const uint64_t ticket,
+    uint64_t& offset, Slot*& slots, size_t& cap, int& stride
+  ) noexcept {
+    for (int i = getNumClosed(state) - 1; i >= 0; --i) {
+      offset = closed_[i].offset_;
+      if (offset <= ticket) {
+        slots = closed_[i].slots_;
+        cap = closed_[i].capacity_;
+        stride = closed_[i].stride_;
+        return;;
+      }
+    }
+    // A closed array with offset <= ticket should have been found
+    assert(false);
+  }
+};
+
+namespace detail {
+
+/// CRTP specialization of MPMCQueueBase
+template<
+  template<
+    typename T, template<typename> class Atom, bool Dynamic> class Derived,
+  typename T, template<typename> class Atom, bool Dynamic>
+class MPMCQueueBase<Derived<T, Atom, Dynamic>> : boost::noncopyable {
+
+// Note: Using CRTP static casts in several functions of this base
+// template instead of making called functions virtual or duplicating
+// the code of calling functions in the derived partially specialized
+// template
 
   static_assert(std::is_nothrow_constructible<T,T&&>::value ||
                 folly::IsRelocatable<T>::value,
       "T must be relocatable or have a noexcept move constructor");
 
-  friend class detail::MPMCPipelineStageImpl<T>;
  public:
   typedef T value_type;
 
-  explicit MPMCQueue(size_t queueCapacity)
+  using Slot = detail::SingleElementQueue<T,Atom>;
+
+  explicit MPMCQueueBase(size_t queueCapacity)
     : capacity_(queueCapacity)
     , pushTicket_(0)
     , popTicket_(0)
     , pushSpinCutoff_(0)
     , popSpinCutoff_(0)
   {
-    if (queueCapacity == 0)
+    if (queueCapacity == 0) {
       throw std::invalid_argument(
         "MPMCQueue with explicit capacity 0 is impossible"
+        // Stride computation in derived classes would sigfpe if capacity is 0
       );
-
-    // would sigfpe if capacity is 0
-    stride_ = computeStride(queueCapacity);
-    slots_ = new detail::SingleElementQueue<T,Atom>[queueCapacity +
-                                                    2 * kSlotPadding];
+    }
 
     // ideally this would be a static assert, but g++ doesn't allow it
     assert(alignof(MPMCQueue<T,Atom>)
@@ -132,10 +637,12 @@ class MPMCQueue : boost::noncopyable {
 
   /// A default-constructed queue is useful because a usable (non-zero
   /// capacity) queue can be moved onto it or swapped with it
-  MPMCQueue() noexcept
+  MPMCQueueBase() noexcept
     : capacity_(0)
     , slots_(nullptr)
     , stride_(0)
+    , dstate_(0)
+    , dcapacity_(0)
     , pushTicket_(0)
     , popTicket_(0)
     , pushSpinCutoff_(0)
@@ -145,10 +652,12 @@ class MPMCQueue : boost::noncopyable {
   /// IMPORTANT: The move constructor is here to make it easier to perform
   /// the initialization phase, it is not safe to use when there are any
   /// concurrent accesses (this is not checked).
-  MPMCQueue(MPMCQueue<T,Atom>&& rhs) noexcept
+  MPMCQueueBase(MPMCQueueBase<Derived<T,Atom,Dynamic>>&& rhs) noexcept
     : capacity_(rhs.capacity_)
     , slots_(rhs.slots_)
     , stride_(rhs.stride_)
+    , dstate_(rhs.dstate_.load(std::memory_order_relaxed))
+    , dcapacity_(rhs.dcapacity_.load(std::memory_order_relaxed))
     , pushTicket_(rhs.pushTicket_.load(std::memory_order_relaxed))
     , popTicket_(rhs.popTicket_.load(std::memory_order_relaxed))
     , pushSpinCutoff_(rhs.pushSpinCutoff_.load(std::memory_order_relaxed))
@@ -161,6 +670,8 @@ class MPMCQueue : boost::noncopyable {
     rhs.capacity_ = 0;
     rhs.slots_ = nullptr;
     rhs.stride_ = 0;
+    rhs.dstate_.store(0, std::memory_order_relaxed);
+    rhs.dcapacity_.store(0, std::memory_order_relaxed);
     rhs.pushTicket_.store(0, std::memory_order_relaxed);
     rhs.popTicket_.store(0, std::memory_order_relaxed);
     rhs.pushSpinCutoff_.store(0, std::memory_order_relaxed);
@@ -170,17 +681,18 @@ class MPMCQueue : boost::noncopyable {
   /// IMPORTANT: The move operator is here to make it easier to perform
   /// the initialization phase, it is not safe to use when there are any
   /// concurrent accesses (this is not checked).
-  MPMCQueue<T,Atom> const& operator= (MPMCQueue<T,Atom>&& rhs) {
+  MPMCQueueBase<Derived<T,Atom,Dynamic>> const& operator=
+    (MPMCQueueBase<Derived<T,Atom,Dynamic>>&& rhs) {
     if (this != &rhs) {
-      this->~MPMCQueue();
-      new (this) MPMCQueue(std::move(rhs));
+      this->~MPMCQueueBase();
+      new (this) MPMCQueueBase(std::move(rhs));
     }
     return *this;
   }
 
   /// MPMCQueue can only be safely destroyed when there are no
   /// pending enqueuers or dequeuers (this is not checked).
-  ~MPMCQueue() {
+  ~MPMCQueueBase() {
     delete[] slots_;
   }
 
@@ -235,6 +747,11 @@ class MPMCQueue : boost::noncopyable {
     return capacity_;
   }
 
+  /// Doesn't change for non-dynamic
+  size_t allocatedCapacity() const noexcept {
+    return capacity_;
+  }
+
   /// Returns the total number of calls to blockingWrite or successful
   /// calls to write, including those blockingWrite calls that are
   /// currently blocking
@@ -256,7 +773,8 @@ class MPMCQueue : boost::noncopyable {
   /// to a T constructor.
   template <typename ...Args>
   void blockingWrite(Args&&... args) noexcept {
-    enqueueWithTicket(pushTicket_++, std::forward<Args>(args)...);
+    enqueueWithTicketBase(pushTicket_++, slots_, capacity_, stride_,
+                          std::forward<Args>(args)...);
   }
 
   /// If an item can be enqueued with no blocking, does so and returns
@@ -275,9 +793,14 @@ class MPMCQueue : boost::noncopyable {
   template <typename ...Args>
   bool write(Args&&... args) noexcept {
     uint64_t ticket;
-    if (tryObtainReadyPushTicket(ticket)) {
+    Slot* slots;
+    size_t cap;
+    int stride;
+    if (static_cast<Derived<T,Atom,Dynamic>*>(this)->
+        tryObtainReadyPushTicket(ticket, slots, cap, stride)) {
       // we have pre-validated that the ticket won't block
-      enqueueWithTicket(ticket, std::forward<Args>(args)...);
+      enqueueWithTicketBase(ticket, slots, cap, stride,
+                            std::forward<Args>(args)...);
       return true;
     } else {
       return false;
@@ -288,11 +811,15 @@ class MPMCQueue : boost::noncopyable {
   bool tryWriteUntil(const std::chrono::time_point<Clock>& when,
                      Args&&... args) noexcept {
     uint64_t ticket;
-    if (tryObtainPromisedPushTicketUntil(ticket, when)) {
-      // we have pre-validated that the ticket won't block, or rather that
-      // it won't block longer than it takes another thread to dequeue an
-      // element from the slot it identifies.
-      enqueueWithTicket(ticket, std::forward<Args>(args)...);
+    Slot* slots;
+    size_t cap;
+    int stride;
+    if (tryObtainPromisedPushTicketUntil(ticket, slots, cap, stride, when)) {
+        // we have pre-validated that the ticket won't block, or rather that
+        // it won't block longer than it takes another thread to dequeue an
+        // element from the slot it identifies.
+      enqueueWithTicketBase(ticket, slots, cap, stride,
+                            std::forward<Args>(args)...);
       return true;
     } else {
       return false;
@@ -315,10 +842,15 @@ class MPMCQueue : boost::noncopyable {
   template <typename ...Args>
   bool writeIfNotFull(Args&&... args) noexcept {
     uint64_t ticket;
-    if (tryObtainPromisedPushTicket(ticket)) {
+    Slot* slots;
+    size_t cap;
+    int stride;
+    if (static_cast<Derived<T,Atom,Dynamic>*>(this)->
+        tryObtainPromisedPushTicket(ticket, slots, cap, stride)) {
       // some other thread is already dequeuing the slot into which we
       // are going to enqueue, but we might have to wait for them to finish
-      enqueueWithTicket(ticket, std::forward<Args>(args)...);
+      enqueueWithTicketBase(ticket, slots, cap, stride,
+                            std::forward<Args>(args)...);
       return true;
     } else {
       return false;
@@ -328,16 +860,33 @@ class MPMCQueue : boost::noncopyable {
   /// Moves a dequeued element onto elem, blocking until an element
   /// is available
   void blockingRead(T& elem) noexcept {
-    dequeueWithTicket(popTicket_++, elem);
+    uint64_t ticket;
+    static_cast<Derived<T,Atom,Dynamic>*>(this)->
+      blockingReadWithTicket(ticket, elem);
+  }
+
+  /// Same as blockingRead() but also records the ticket nunmer
+  void blockingReadWithTicket(uint64_t& ticket, T& elem) noexcept {
+    ticket = popTicket_++;
+    dequeueWithTicketBase(ticket, slots_, capacity_, stride_, elem);
   }
 
   /// If an item can be dequeued with no blocking, does so and returns
   /// true, otherwise returns false.
   bool read(T& elem) noexcept {
     uint64_t ticket;
-    if (tryObtainReadyPopTicket(ticket)) {
+    return readAndGetTicket(ticket, elem);
+  }
+
+  /// Same as read() but also records the ticket nunmer
+  bool readAndGetTicket(uint64_t& ticket, T& elem) noexcept {
+    Slot* slots;
+    size_t cap;
+    int stride;
+    if (static_cast<Derived<T,Atom,Dynamic>*>(this)->
+        tryObtainReadyPopTicket(ticket, slots, cap, stride)) {
       // the ticket has been pre-validated to not block
-      dequeueWithTicket(ticket, elem);
+      dequeueWithTicketBase(ticket, slots, cap, stride, elem);
       return true;
     } else {
       return false;
@@ -351,16 +900,20 @@ class MPMCQueue : boost::noncopyable {
   /// prefer read.
   bool readIfNotEmpty(T& elem) noexcept {
     uint64_t ticket;
-    if (tryObtainPromisedPopTicket(ticket)) {
+    Slot* slots;
+    size_t cap;
+    int stride;
+    if (static_cast<Derived<T,Atom,Dynamic>*>(this)->
+        tryObtainPromisedPopTicket(ticket, slots, cap, stride)) {
       // the matching enqueue already has a ticket, but might not be done
-      dequeueWithTicket(ticket, elem);
+      dequeueWithTicketBase(ticket, slots, cap, stride, elem);
       return true;
     } else {
       return false;
     }
   }
 
- private:
+ protected:
   enum {
     /// Once every kAdaptationFreq we will spin longer, to try to estimate
     /// the proper spin backoff
@@ -370,21 +923,41 @@ class MPMCQueue : boost::noncopyable {
     /// allocations, we pad it with this many SingleElementQueue-s at
     /// each end
     kSlotPadding = (detail::CacheLocality::kFalseSharingRange - 1)
-        / sizeof(detail::SingleElementQueue<T,Atom>) + 1
+        / sizeof(Slot) + 1
   };
 
   /// The maximum number of items in the queue at once
   size_t FOLLY_ALIGN_TO_AVOID_FALSE_SHARING capacity_;
 
-  /// An array of capacity_ SingleElementQueue-s, each of which holds
-  /// either 0 or 1 item.  We over-allocate by 2 * kSlotPadding and don't
-  /// touch the slots at either end, to avoid false sharing
-  detail::SingleElementQueue<T,Atom>* slots_;
+  /// Anonymous union for use when Dynamic = false and true, respectively
+  union {
+    /// An array of capacity_ SingleElementQueue-s, each of which holds
+    /// either 0 or 1 item.  We over-allocate by 2 * kSlotPadding and don't
+    /// touch the slots at either end, to avoid false sharing
+    Slot* slots_;
+    /// Current dynamic slots array of dcapacity_ SingleElementQueue-s
+    Atom<Slot*> dslots_;
+  };
+
+  /// Anonymous union for use when Dynamic = false and true, respectively
+  union {
+    /// The number of slots_ indices that we advance for each ticket, to
+    /// avoid false sharing.  Ideally slots_[i] and slots_[i + stride_]
+    /// aren't on the same cache line
+    int stride_;
+    /// Current stride
+    Atom<int> dstride_;
+  };
 
-  /// The number of slots_ indices that we advance for each ticket, to
-  /// avoid false sharing.  Ideally slots_[i] and slots_[i + stride_]
-  /// aren't on the same cache line
-  int stride_;
+  /// The following two memebers are used by dynamic MPMCQueue.
+  /// Ideally they should be in MPMCQueue<T,Atom,true>, but we get
+  /// better cache locality if they are in the same cache line as
+  /// dslots_ and dstride_.
+  ///
+  /// Dynamic state. A packed seqlock and ticket offset
+  Atom<uint64_t> dstate_;
+  /// Dynamic capacity
+  Atom<size_t> dcapacity_;
 
   /// Enqueuers get tickets from here
   Atom<uint64_t> FOLLY_ALIGN_TO_AVOID_FALSE_SHARING pushTicket_;
@@ -405,7 +978,6 @@ class MPMCQueue : boost::noncopyable {
   char padding_[detail::CacheLocality::kFalseSharingRange -
                 sizeof(Atom<uint32_t>)];
 
-
   /// We assign tickets in increasing order, but we don't want to
   /// access neighboring elements of slots_ because that will lead to
   /// false sharing (multiple cores accessing the same cache line even
@@ -446,23 +1018,29 @@ class MPMCQueue : boost::noncopyable {
 
   /// Returns the index into slots_ that should be used when enqueuing or
   /// dequeuing with the specified ticket
-  size_t idx(uint64_t ticket) noexcept {
-    return ((ticket * stride_) % capacity_) + kSlotPadding;
+  size_t idx(uint64_t ticket, size_t cap, int stride) noexcept {
+    return ((ticket * stride) % cap) + kSlotPadding;
   }
 
   /// Maps an enqueue or dequeue ticket to the turn should be used at the
   /// corresponding SingleElementQueue
-  uint32_t turn(uint64_t ticket) noexcept {
-    return ticket / capacity_;
+  uint32_t turn(uint64_t ticket, size_t cap) noexcept {
+    return ticket / cap;
   }
 
   /// Tries to obtain a push ticket for which SingleElementQueue::enqueue
   /// won't block.  Returns true on immediate success, false on immediate
   /// failure.
-  bool tryObtainReadyPushTicket(uint64_t& rv) noexcept {
-    auto ticket = pushTicket_.load(std::memory_order_acquire); // A
+  bool tryObtainReadyPushTicket(
+    uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
+  ) noexcept {
+    ticket = pushTicket_.load(std::memory_order_acquire); // A
+    slots = slots_;
+    cap = capacity_;
+    stride = stride_;
     while (true) {
-      if (!slots_[idx(ticket)].mayEnqueue(turn(ticket))) {
+      if (!slots[idx(ticket, cap, stride)]
+          .mayEnqueue(turn(ticket, cap))) {
         // if we call enqueue(ticket, ...) on the SingleElementQueue
         // right now it would block, but this might no longer be the next
         // ticket.  We can increase the chance of tryEnqueue success under
@@ -479,7 +1057,6 @@ class MPMCQueue : boost::noncopyable {
         // or prev failing CAS) and the following CAS.  If the CAS fails
         // it will effect a load of pushTicket_
         if (pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
-          rv = ticket;
           return true;
         }
       }
@@ -492,18 +1069,22 @@ class MPMCQueue : boost::noncopyable {
   /// ticket is filled on success AND failure.
   template <class Clock>
   bool tryObtainPromisedPushTicketUntil(
-      uint64_t& ticket, const std::chrono::time_point<Clock>& when) noexcept {
+    uint64_t& ticket, Slot*& slots, size_t& cap, int& stride,
+    const std::chrono::time_point<Clock>& when
+  ) noexcept {
     bool deadlineReached = false;
     while (!deadlineReached) {
-      if (tryObtainPromisedPushTicket(ticket)) {
+      if (static_cast<Derived<T,Atom,Dynamic>*>(this)->
+          tryObtainPromisedPushTicket(ticket, slots, cap, stride)) {
         return true;
       }
       // ticket is a blocking ticket until the preceding ticket has been
       // processed: wait until this ticket's turn arrives. We have not reserved
       // this ticket so we will have to re-attempt to get a non-blocking ticket
       // if we wake up before we time-out.
-      deadlineReached = !slots_[idx(ticket)].tryWaitForEnqueueTurnUntil(
-          turn(ticket), pushSpinCutoff_, (ticket % kAdaptationFreq) == 0, when);
+      deadlineReached = !slots[idx(ticket, cap, stride)]
+        .tryWaitForEnqueueTurnUntil(turn(ticket, cap), pushSpinCutoff_,
+                                    (ticket % kAdaptationFreq) == 0, when);
     }
     return false;
   }
@@ -513,13 +1094,18 @@ class MPMCQueue : boost::noncopyable {
   /// blocking may be required when using the returned ticket if some
   /// other thread's pop is still in progress (ticket has been granted but
   /// pop has not yet completed).
-  bool tryObtainPromisedPushTicket(uint64_t& rv) noexcept {
+  bool tryObtainPromisedPushTicket(
+    uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
+  ) noexcept {
     auto numPushes = pushTicket_.load(std::memory_order_acquire); // A
+    slots = slots_;
+    cap = capacity_;
+    stride = stride_;
     while (true) {
       auto numPops = popTicket_.load(std::memory_order_acquire); // B
       // n will be negative if pops are pending
       int64_t n = numPushes - numPops;
-      rv = numPushes;
+      ticket = numPushes;
       if (n >= static_cast<ssize_t>(capacity_)) {
         // Full, linearize at B.  We don't need to recheck the read we
         // performed at A, because if numPushes was stale at B then the
@@ -535,10 +1121,16 @@ class MPMCQueue : boost::noncopyable {
   /// Tries to obtain a pop ticket for which SingleElementQueue::dequeue
   /// won't block.  Returns true on immediate success, false on immediate
   /// failure.
-  bool tryObtainReadyPopTicket(uint64_t& rv) noexcept {
-    auto ticket = popTicket_.load(std::memory_order_acquire);
+  bool tryObtainReadyPopTicket(
+    uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
+  ) noexcept {
+    ticket = popTicket_.load(std::memory_order_acquire);
+    slots = slots_;
+    cap = capacity_;
+    stride = stride_;
     while (true) {
-      if (!slots_[idx(ticket)].mayDequeue(turn(ticket))) {
+      if (!slots[idx(ticket, cap, stride)]
+          .mayDequeue(turn(ticket, cap))) {
         auto prev = ticket;
         ticket = popTicket_.load(std::memory_order_acquire);
         if (prev == ticket) {
@@ -546,7 +1138,6 @@ class MPMCQueue : boost::noncopyable {
         }
       } else {
         if (popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
-          rv = ticket;
           return true;
         }
       }
@@ -563,7 +1154,9 @@ class MPMCQueue : boost::noncopyable {
   /// to block waiting for someone to call enqueue, although we might
   /// have to block waiting for them to finish executing code inside the
   /// MPMCQueue itself.
-  bool tryObtainPromisedPopTicket(uint64_t& rv) noexcept {
+  bool tryObtainPromisedPopTicket(
+    uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
+  ) noexcept {
     auto numPops = popTicket_.load(std::memory_order_acquire); // A
     while (true) {
       auto numPushes = pushTicket_.load(std::memory_order_acquire); // B
@@ -574,7 +1167,10 @@ class MPMCQueue : boost::noncopyable {
         return false;
       }
       if (popTicket_.compare_exchange_strong(numPops, numPops + 1)) {
-        rv = numPops;
+        ticket = numPops;
+        slots = slots_;
+        cap = capacity_;
+        stride = stride_;
         return true;
       }
     }
@@ -582,25 +1178,35 @@ class MPMCQueue : boost::noncopyable {
 
   // Given a ticket, constructs an enqueued item using args
   template <typename ...Args>
+  void enqueueWithTicketBase(
+    uint64_t ticket, Slot* slots, size_t cap, int stride, Args&&... args
+  ) noexcept {
+    slots[idx(ticket, cap, stride)]
+      .enqueue(turn(ticket, cap),
+               pushSpinCutoff_,
+               (ticket % kAdaptationFreq) == 0,
+               std::forward<Args>(args)...);
+  }
+
+  // To support tracking ticket numbers in MPMCPipelineStageImpl
+  template <typename ...Args>
   void enqueueWithTicket(uint64_t ticket, Args&&... args) noexcept {
-    slots_[idx(ticket)].enqueue(turn(ticket),
-                                pushSpinCutoff_,
-                                (ticket % kAdaptationFreq) == 0,
-                                std::forward<Args>(args)...);
+    enqueueWithTicketBase(ticket, slots_, capacity_, stride_,
+                          std::forward<Args>(args)...);
   }
 
   // Given a ticket, dequeues the corresponding element
-  void dequeueWithTicket(uint64_t ticket, T& elem) noexcept {
-    slots_[idx(ticket)].dequeue(turn(ticket),
-                                popSpinCutoff_,
-                                (ticket % kAdaptationFreq) == 0,
-                                elem);
+  void dequeueWithTicketBase(
+    uint64_t ticket, Slot* slots, size_t cap, int stride, T& elem
+  ) noexcept {
+    slots[idx(ticket, cap, stride)]
+      .dequeue(turn(ticket, cap),
+               popSpinCutoff_,
+               (ticket % kAdaptationFreq) == 0,
+               elem);
   }
 };
 
-
-namespace detail {
-
 /// SingleElementQueue implements a blocking queue that holds at most one
 /// item, and that requires its users to assign incrementing identifiers
 /// (turns) to each enqueue and dequeue operation.  Note that the turns