Dynamic MPMCQueue: Eliminate cases of enqueue indefinite blocking and failure in...
authorMaged Michael <magedmichael@fb.com>
Wed, 30 Aug 2017 19:53:16 +0000 (12:53 -0700)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Wed, 30 Aug 2017 20:09:17 +0000 (13:09 -0700)
Summary:
Currently under the extensible version (Dynamic == true), some enqueue operations may block indefinitely or fail (return false) even though such outcomes are impossible under the default (Dynamic == false) pre-allocated version.

This diff eliminates such cases by changing the algorithms for the extensible version. Some of the high-level changes:
- The offset formula for an expansion guarantees that no enqueue operation left behind in a closed array does not have an existing dequeue operation that unblocks it. The old formula was 1 + max(head, tail). The new formula is max(head, current offset) + current capacity.
- Conditional operations validate state after the success of CAS.

Reviewed By: djwatson

Differential Revision: D5701013

fbshipit-source-id: 4917c5b35b7e2a2fddfd2e11fb5aeb478502137c

folly/MPMCQueue.h
folly/test/MPMCQueueTest.cpp

index b3564126600dbc09dff350d62ed9a03df18b8463..743de144fe9c51282c9cac4705fbff1bdccb88a2 100644 (file)
@@ -118,8 +118,22 @@ class MPMCQueue : public detail::MPMCQueueBase<MPMCQueue<T,Atom,Dynamic>> {
 
 /// The dynamic version of MPMCQueue allows dynamic expansion of queue
 /// capacity, such that a queue may start with a smaller capacity than
-/// specified and expand only if needed. Users may optionally specify
-/// the initial capacity and the expansion multiplier.
+/// specified and expand if needed up to the specified capacity.
+/// Shrinking is not supported at this point.
+///
+/// Users may optionally specify the initial capacity and the
+/// expansion multiplier. Otherwise default values are used.
+///
+/// Operation on the dynamic version have the same semantics as for
+/// the default fixed-size version, except that the total queue
+/// capacity can temporarily exceed the specified capacity when there
+/// are lagging consumers that haven't yet consumed all the elements
+/// in closed arrays. Users should not rely on the capacity of dynamic
+/// queues for synchronization, e.g., they should not expect that a
+/// thread will definitely block on a call to blockingWrite() when the
+/// queue size is known to be equal to its capacity.
+///
+/// Design Overview:
 ///
 /// The design uses a seqlock to enforce mutual exclusion among
 /// expansion attempts. Regular operations read up-to-date queue
@@ -144,7 +158,7 @@ class MPMCQueue : public detail::MPMCQueueBase<MPMCQueue<T,Atom,Dynamic>> {
 /// closed arrays instead of the current one. Information about closed
 /// slots arrays (array address, capacity, stride, and offset) is
 /// maintained in a logarithmic-sized structure. Each entry in that
-/// structure never need to be changed once set. The number of closed
+/// structure never needs to be changed once set. The number of closed
 /// arrays is half the value of the seqlock (when unlocked).
 ///
 /// The acquisition of the seqlock to perform an expansion does not
@@ -154,21 +168,6 @@ class MPMCQueue : public detail::MPMCQueueBase<MPMCQueue<T,Atom,Dynamic>> {
 /// seqlock read-only section (and hence obtained information for
 /// older closed arrays).
 ///
-/// Note that the total queue capacity can temporarily exceed the
-/// specified capacity when there are lagging consumers that haven't
-/// yet consumed all the elements in closed arrays. Users should not
-/// rely on the capacity of dynamic queues for synchronization, e.g.,
-/// they should not expect that a thread will definitely block on a
-/// call to blockingWrite() when the queue size is known to be equal
-/// to its capacity.
-///
-/// Note that some writeIfNotFull() and tryWriteUntil() operations may
-/// fail even if the size of the queue is less than its maximum
-/// capacity and despite the success of expansion, if the operation
-/// happens to acquire a ticket that belongs to a closed array. This
-/// is a transient condition. Typically, one or two ticket values may
-/// be subject to such condition per expansion.
-///
 /// The dynamic version is a partial specialization of MPMCQueue with
 /// Dynamic == true
 template <typename T, template <typename> class Atom>
@@ -272,36 +271,31 @@ class MPMCQueue<T,Atom,true> :
     int stride;
     uint64_t state;
     uint64_t offset;
-    do {
-      if (!trySeqlockReadSection(state, slots, cap, stride)) {
+    while (true) {
+      while (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) {
         asm_volatile_pause();
-        continue;
       }
-      if (maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride)) {
-        // There was an expansion after this ticket was issued.
+      maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
+      if (LIKELY(slots[this->idx((ticket - offset), cap, stride)].mayEnqueue(
+              this->turn(ticket - offset, cap)))) {
+        // A slot is ready. Fast path. No need to expand.
         break;
       }
-      if (slots[this->idx((ticket-offset), cap, stride)]
-          .mayEnqueue(this->turn(ticket-offset, cap))) {
-        // A slot is ready. No need to expand.
-        break;
-      } else if (this->popTicket_.load(std::memory_order_relaxed) + cap
-                 > ticket) {
+      // Slow path
+      auto head = this->popTicket_.load(std::memory_order_relaxed);
+      auto avail = std::max(head, offset) + cap;
+      if (ticket < avail) {
         // May block, but a pop is in progress. No need to expand.
-        // Get seqlock read section info again in case an expansion
-        // occurred with an equal or higher ticket.
-        continue;
-      } else {
-        // May block. See if we can expand.
-        if (tryExpand(state, cap)) {
-          // This or another thread started an expansion. Get updated info.
-          continue;
-        } else {
-          // Can't expand.
-          break;
-        }
+        break;
+      }
+      // Try to expand, otherwise this operation may block
+      // indefinitely awaiting a consumer to unblock it.
+      if (!tryExpand(state, cap)) {
+        // Can't expand. Block.
+        break;
       }
-    } while (true);
+      // Either this or another thread started an expansion Get up-to-date info.
+    }
     this->enqueueWithTicketBase(ticket-offset, slots, cap, stride,
                                 std::forward<Args>(args)...);
   }
@@ -313,7 +307,7 @@ class MPMCQueue<T,Atom,true> :
     int stride;
     uint64_t state;
     uint64_t offset;
-    while (!trySeqlockReadSection(state, slots, cap, stride)) {
+    while (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) {
       asm_volatile_pause();
     }
     // If there was an expansion after the corresponding push ticket
@@ -323,11 +317,10 @@ class MPMCQueue<T,Atom,true> :
   }
 
  private:
-
   enum {
-    kSeqlockBits = 6,
-    kDefaultMinDynamicCapacity = 10,
-    kDefaultExpansionMultiplier = 10,
+    kSeqlockBits = 8,
+    kDefaultMinDynamicCapacity = 16,
+    kDefaultExpansionMultiplier = 8,
   };
 
   size_t dmult_;
@@ -354,69 +347,86 @@ class MPMCQueue<T,Atom,true> :
       uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
   ) noexcept {
     uint64_t state;
-    do {
+    while (true) {
       ticket = this->pushTicket_.load(std::memory_order_acquire); // A
-      if (!trySeqlockReadSection(state, slots, cap, stride)) {
+      if (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) {
         asm_volatile_pause();
         continue;
       }
-
       // If there was an expansion with offset greater than this ticket,
       // adjust accordingly
       uint64_t offset;
       maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
-
-      if (slots[this->idx((ticket-offset), cap, stride)]
-          .mayEnqueue(this->turn(ticket-offset, cap))) {
+      if (LIKELY(slots[this->idx((ticket - offset), cap, stride)].mayEnqueue(
+              this->turn(ticket - offset, cap)))) {
         // A slot is ready.
-        if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
-          // Adjust ticket
-          ticket -= offset;
-          return true;
-        } else {
+        if (UNLIKELY(!this->pushTicket_.compare_exchange_strong(
+                ticket, ticket + 1))) {
           continue;
         }
-      } else {
-        if (ticket != this->pushTicket_.load(std::memory_order_relaxed)) { // B
-          // Try again. Ticket changed.
-          continue;
+        // Validate that state is the same
+        if (LIKELY(state == this->dstate_.load(std::memory_order_acquire))) {
+          ticket -= offset;
+          return true;
         }
-        // Likely to block.
-        // Try to expand unless the ticket is for a closed array
-        if (offset == getOffset(state)) {
-          if (tryExpand(state, cap)) {
-            // This or another thread started an expansion. Get up-to-date info.
-            continue;
+        // Slow path - state changed - get up-to-date info for obtained ticket
+        while (true) {
+          state = this->dstate_.load(std::memory_order_acquire);
+          if (trySeqlockReadSection(state, slots, cap, stride)) {
+            break;
           }
+          asm_volatile_pause();
         }
-        return false;
+        maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
+        ticket -= offset;
+        return true;
+      }
+      // slow path - no ready ticket
+      if (ticket != this->pushTicket_.load(std::memory_order_relaxed)) { // B
+        // Ticket changed. Start over.
+        continue;
+      }
+      auto head = this->popTicket_.load(std::memory_order_acquire);
+      auto avail = std::max(head, offset) + cap;
+      if (ticket < avail) {
+        // a consumer is in the process of making the slot available
+        // don't try to expand. Spin if capacity is not
+        // exhausted. Otherwise return false.
+        if (cap == this->capacity_) {
+          return false;
+        }
+        asm_volatile_pause();
+        continue;
       }
-    } while (true);
+      // Likely to block. Try to expand.
+      if (tryExpand(state, cap)) {
+        // This or another thread started an expansion. Get up-to-date info.
+        continue;
+      }
+      // No ready ticket and cannot expand
+      return false;
+    }
   }
 
   bool tryObtainPromisedPushTicket(
     uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
   ) noexcept {
     uint64_t state;
-    do {
+    while (true) {
       ticket = this->pushTicket_.load(std::memory_order_acquire);
-      auto numPops = this->popTicket_.load(std::memory_order_acquire);
-      if (!trySeqlockReadSection(state, slots, cap, stride)) {
+      auto head = this->popTicket_.load(std::memory_order_acquire);
+      if (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) {
         asm_volatile_pause();
         continue;
       }
-
-      const auto curCap = cap;
       // If there was an expansion with offset greater than this ticket,
       // adjust accordingly
       uint64_t offset;
       maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
-
-      int64_t n = ticket - numPops;
-
-      if (n >= static_cast<ssize_t>(cap)) {
-        if ((cap == curCap) && tryExpand(state, cap)) {
-          // This or another thread started an expansion. Start over.
+      auto avail = std::max(offset, head) + cap;
+      if (UNLIKELY(ticket >= avail)) {
+        if (tryExpand(state, cap)) {
+          // Space may be available. Start over.
           continue;
         }
         // Can't expand.
@@ -424,69 +434,110 @@ class MPMCQueue<T,Atom,true> :
         return false;
       }
 
-      if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
-        // Adjust ticket
+      if (UNLIKELY((!this->pushTicket_.compare_exchange_strong(
+              ticket, ticket + 1)))) {
+        continue;
+      }
+      // Validate that state is the same
+      if (LIKELY(state == this->dstate_.load(std::memory_order_acquire))) {
         ticket -= offset;
         return true;
       }
-    } while (true);
+      // Obtained ticket but info is out-of-date - Update info
+      while (true) {
+        state = this->dstate_.load(std::memory_order_acquire);
+        if (trySeqlockReadSection(state, slots, cap, stride)) {
+          break;
+        }
+        asm_volatile_pause();
+      }
+      maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
+      ticket -= offset;
+      return true;
+    }
   }
 
   bool tryObtainReadyPopTicket(
     uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
   ) noexcept {
     uint64_t state;
-    do {
+    while (true) {
       ticket = this->popTicket_.load(std::memory_order_relaxed);
-      if (!trySeqlockReadSection(state, slots, cap, stride)) {
+      if (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) {
         asm_volatile_pause();
         continue;
       }
-
       // If there was an expansion after the corresponding push ticket
       // was issued, adjust accordingly
       uint64_t offset;
       maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
-
-      if (slots[this->idx((ticket-offset), cap, stride)]
-          .mayDequeue(this->turn(ticket-offset, cap))) {
-        if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
-          // Adjust ticket
-          ticket -= offset;
-          return true;
-        }
-      } else {
+      if (UNLIKELY(!slots[this->idx((ticket - offset), cap, stride)].mayDequeue(
+              this->turn(ticket - offset, cap)))) {
         return false;
       }
-    } while (true);
+      if (UNLIKELY(
+              !this->popTicket_.compare_exchange_strong(ticket, ticket + 1))) {
+        continue;
+      }
+      // Validate that state is the same
+      if (LIKELY(state == this->dstate_.load(std::memory_order_acquire))) {
+        ticket -= offset;
+        return true;
+      }
+      // Obtained ticket but info is out-of-date - Update info
+      while (true) {
+        state = this->dstate_.load(std::memory_order_acquire);
+        if (trySeqlockReadSection(state, slots, cap, stride)) {
+          break;
+        }
+        asm_volatile_pause();
+      }
+      maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
+      ticket -= offset;
+      return true;
+    }
   }
 
   bool tryObtainPromisedPopTicket(
     uint64_t& ticket, Slot*& slots, size_t& cap, int& stride
   ) noexcept {
     uint64_t state;
-    do {
+    while (true) {
       ticket = this->popTicket_.load(std::memory_order_acquire);
       auto numPushes = this->pushTicket_.load(std::memory_order_acquire);
-      if (!trySeqlockReadSection(state, slots, cap, stride)) {
+      if (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) {
         asm_volatile_pause();
         continue;
       }
-
       uint64_t offset;
       // If there was an expansion after the corresponding push
       // ticket was issued, adjust accordingly
       maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
-
-      if (ticket >= numPushes) {
+      if (UNLIKELY(ticket >= numPushes)) {
         ticket -= offset;
         return false;
       }
-      if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
+      if (UNLIKELY(
+              !this->popTicket_.compare_exchange_strong(ticket, ticket + 1))) {
+        continue;
+      }
+      // Validate that state is the same
+      if (LIKELY(state == this->dstate_.load(std::memory_order_acquire))) {
         ticket -= offset;
         return true;
       }
-    } while (true);
+      // Obtained ticket but info is out-of-date - Update info
+      while (true) {
+        state = this->dstate_.load(std::memory_order_acquire);
+        if (trySeqlockReadSection(state, slots, cap, stride)) {
+          break;
+        }
+        asm_volatile_pause();
+      }
+      maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
+      ticket -= offset;
+      return true;
+    }
   }
 
   /// Enqueues an element with a specific ticket number
@@ -498,7 +549,9 @@ class MPMCQueue<T,Atom,true> :
     uint64_t state;
     uint64_t offset;
 
-    while (!trySeqlockReadSection(state, slots, cap, stride)) {}
+    while (UNLIKELY(!trySeqlockReadSection(state, slots, cap, stride))) {
+      asm_volatile_pause();
+    }
 
     // If there was an expansion after this ticket was issued, adjust
     // accordingly
@@ -517,52 +570,56 @@ class MPMCQueue<T,Atom,true> :
   }
 
   /// Try to expand the queue. Returns true if this expansion was
-  /// successful or a concurent expansion is in progress. Returns
+  /// successful or a concurent expansion is in progresse. Returns
   /// false if the queue has reached its maximum capacity or
   /// allocation has failed.
   bool tryExpand(const uint64_t state, const size_t cap) noexcept {
-    if (cap == this->capacity_) {
+    if (LIKELY(cap == this->capacity_)) {
       return false;
     }
+    return tryExpandWithSeqlock(state, cap);
+  }
+
+  bool tryExpandWithSeqlock(const uint64_t state, const size_t cap) noexcept {
     // Acquire seqlock
     uint64_t oldval = state;
     assert((state & 1) == 0);
-    if (this->dstate_.compare_exchange_strong(oldval, state + 1)) {
-      assert(cap == this->dcapacity_.load());
-      uint64_t ticket = 1 + std::max(this->pushTicket_.load(),
-                                     this->popTicket_.load());
-      size_t newCapacity =
-        std::min(dmult_ * cap, this->capacity_);
-      Slot* newSlots =
-        new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding];
-      if (newSlots == nullptr) {
-        // Expansion failed. Restore the seqlock
-        this->dstate_.store(state);
-        return false;
-      }
-      // Successful expansion
-      // calculate the current ticket offset
-      uint64_t offset = getOffset(state);
-      // calculate index in closed array
-      int index = getNumClosed(state);
-      assert((index << 1) < (1 << kSeqlockBits));
-      // fill the info for the closed slots array
-      closed_[index].offset_ = offset;
-      closed_[index].slots_ = this->dslots_.load();
-      closed_[index].capacity_ = cap;
-      closed_[index].stride_ = this->dstride_.load();
-      // update the new slots array info
-      this->dslots_.store(newSlots);
-      this->dcapacity_.store(newCapacity);
-      this->dstride_.store(this->computeStride(newCapacity));
-      // Release the seqlock and record the new ticket offset
-      this->dstate_.store((ticket << kSeqlockBits) + (2 * (index + 1)));
-      return true;
-    } else { // failed to acquire seqlock
-      // Someone acaquired the seqlock. Go back to the caller and get
-      // up-to-date info.
+    if (!this->dstate_.compare_exchange_strong(oldval, state + 1)) {
+      // Failed to acquire seqlock. Another thread acaquired it.
+      // Go back to the caller and get up-to-date info.
       return true;
     }
+    // Write critical section
+    assert(cap == this->dcapacity_.load());
+    auto head = this->popTicket_.load(std::memory_order_acquire);
+    auto avail = std::max(head, getOffset(state)) + cap;
+    uint64_t newOffset = avail;
+    size_t newCapacity = std::min(dmult_ * cap, this->capacity_);
+    Slot* newSlots =
+        new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding];
+    if (newSlots == nullptr) {
+      // Expansion failed. Restore the seqlock
+      this->dstate_.store(state);
+      return false;
+    }
+    // Successful expansion
+    // calculate the current ticket offset
+    uint64_t offset = getOffset(state);
+    // calculate index in list of closed slots arrays
+    int index = getNumClosed(state);
+    assert((index << 1) < (1 << kSeqlockBits));
+    // fill the info for the closed slots array
+    closed_[index].offset_ = offset;
+    closed_[index].slots_ = this->dslots_.load();
+    closed_[index].capacity_ = cap;
+    closed_[index].stride_ = this->dstride_.load();
+    // update the new slots array info
+    this->dslots_.store(newSlots);
+    this->dcapacity_.store(newCapacity);
+    this->dstride_.store(this->computeStride(newCapacity));
+    // Release the seqlock and record the new ticket offset
+    this->dstate_.store((newOffset << kSeqlockBits) + (2 * (index + 1)));
+    return true;
   }
 
   /// Seqlock read-only section
@@ -570,7 +627,7 @@ class MPMCQueue<T,Atom,true> :
     uint64_t& state, Slot*& slots, size_t& cap, int& stride
   ) noexcept {
     state = this->dstate_.load(std::memory_order_acquire);
-    if (state & 1) {
+    if (UNLIKELY(state & 1)) {
       // Locked.
       return false;
     }
@@ -580,7 +637,7 @@ class MPMCQueue<T,Atom,true> :
     stride = this->dstride_.load(std::memory_order_relaxed);
     // End of read-only section. Validate seqlock.
     std::atomic_thread_fence(std::memory_order_acquire);
-    return (state == this->dstate_.load(std::memory_order_relaxed));
+    return LIKELY(state == this->dstate_.load(std::memory_order_relaxed));
   }
 
   /// If there was an expansion after ticket was issued, update local variables
@@ -594,21 +651,32 @@ class MPMCQueue<T,Atom,true> :
       size_t& cap,
       int& stride) noexcept {
     offset = getOffset(state);
-    if (ticket >= offset) {
+    if (LIKELY(ticket >= offset)) {
       return false;
     }
+    updateFromClosed(state, ticket, offset, slots, cap, stride);
+    return true;
+  }
+
+  void updateFromClosed(
+      const uint64_t state,
+      const uint64_t ticket,
+      uint64_t& offset,
+      Slot*& slots,
+      size_t& cap,
+      int& stride) noexcept {
     for (int i = getNumClosed(state) - 1; i >= 0; --i) {
       offset = closed_[i].offset_;
       if (offset <= ticket) {
         slots = closed_[i].slots_;
         cap = closed_[i].capacity_;
         stride = closed_[i].stride_;
-        return true;
+        return;
       }
     }
     // A closed array with offset <= ticket should have been found
     assert(false);
-    return false;
+    return;
   }
 };
 
index 0874a8f2f8e6c05ed168f8474a1f257091f4a58f..3b1dd2c9fd90a88f7ba1a6bc6865e97586d12f37 100644 (file)
@@ -748,11 +748,6 @@ void runMtNeverFail(std::vector<int>& nts, int n) {
   }
 }
 
-// All the never_fail tests are for the non-dynamic version only.
-// False positive for dynamic version. Some writeIfNotFull() and
-// tryWriteUntil() operations may fail in transient conditions related
-// to expansion.
-
 TEST(MPMCQueue, mt_never_fail) {
   std::vector<int> nts {1, 3, 100};
   int n = 100000;
@@ -765,6 +760,18 @@ TEST(MPMCQueue, mt_never_fail_emulated_futex) {
   runMtNeverFail<EmulatedFutexAtomic>(nts, n);
 }
 
+TEST(MPMCQueue, mt_never_fail_dynamic) {
+  std::vector<int> nts{1, 3, 100};
+  int n = 100000;
+  runMtNeverFail<std::atomic, true>(nts, n);
+}
+
+TEST(MPMCQueue, mt_never_fail_emulated_futex_dynamic) {
+  std::vector<int> nts{1, 3, 100};
+  int n = 100000;
+  runMtNeverFail<EmulatedFutexAtomic, true>(nts, n);
+}
+
 template <bool Dynamic = false>
 void runMtNeverFailDeterministic(std::vector<int>& nts, int n, long seed) {
   LOG(INFO) << "using seed " << seed;
@@ -787,6 +794,13 @@ TEST(MPMCQueue, mt_never_fail_deterministic) {
   runMtNeverFailDeterministic(nts, n, seed);
 }
 
+TEST(MPMCQueue, mt_never_fail_deterministic_dynamic) {
+  std::vector<int> nts{3, 10};
+  long seed = 0; // nowMicro() % 10000;
+  int n = 1000;
+  runMtNeverFailDeterministic<true>(nts, n, seed);
+}
+
 template <class Clock, template <typename> class Atom, bool Dynamic>
 void runNeverFailUntilThread(int numThreads,
                              int n, /*numOps*/
@@ -851,6 +865,12 @@ TEST(MPMCQueue, mt_never_fail_until_system) {
   runMtNeverFailUntilSystem(nts, n);
 }
 
+TEST(MPMCQueue, mt_never_fail_until_system_dynamic) {
+  std::vector<int> nts{1, 3, 100};
+  int n = 100000;
+  runMtNeverFailUntilSystem<true>(nts, n);
+}
+
 template <bool Dynamic = false>
 void runMtNeverFailUntilSteady(std::vector<int>& nts, int n) {
   for (int nt : nts) {
@@ -867,6 +887,12 @@ TEST(MPMCQueue, mt_never_fail_until_steady) {
   runMtNeverFailUntilSteady(nts, n);
 }
 
+TEST(MPMCQueue, mt_never_fail_until_steady_dynamic) {
+  std::vector<int> nts{1, 3, 100};
+  int n = 100000;
+  runMtNeverFailUntilSteady<true>(nts, n);
+}
+
 enum LifecycleEvent {
   NOTHING = -1,
   DEFAULT_CONSTRUCTOR,
@@ -1213,7 +1239,7 @@ TEST(MPMCQueue, try_write_until_timeout) {
   testTimeout<false>(queue);
 }
 
-TEST(MPMCQueue, must_fail_try_write_until_dynamic) {
-  folly::MPMCQueue<int, std::atomic, true> queue(200, 1, 2);
+TEST(MPMCQueue, try_write_until_timeout_dynamic) {
+  folly::MPMCQueue<int, std::atomic, true> queue(1);
   testTimeout<true>(queue);
 }