UnboundedQueue: Fix advanceHead
[folly.git] / folly / concurrency / UnboundedQueue.h
index 1ff19ed2ffed182a64f63c8c8db06a4420218f6e..77bc5b9fa7d005dfb5f022608bffd1685fbfa115 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2017 Facebook, Inc.
+ * Copyright 2017-present Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 #include <chrono>
 #include <memory>
 
+#include <glog/logging.h>
+
 #include <folly/concurrency/CacheLocality.h>
+#include <folly/experimental/hazptr/hazptr.h>
+#include <folly/synchronization/SaturatingSemaphore.h>
 
 namespace folly {
 
@@ -39,9 +43,12 @@ namespace folly {
 /// - SingleConsumer: true if there can be only one consumer at a
 ///   time.
 /// - MayBlock: true if consumers may block, false if they only
-///   spins. A performance tuning parameter.
+///   spin. A performance tuning parameter.
 /// - LgSegmentSize (default 8): Log base 2 of number of elements per
 ///   segment. A performance tuning parameter. See below.
+/// - LgAlign (default 7): Log base 2 of alignment directive; can be
+///   used to balance scalability (avoidance of false sharing) with
+///   memory efficiency.
 ///
 /// When to use UnboundedQueue:
 /// - If a small bound may lead to deadlock or performance degradation
@@ -56,10 +63,10 @@ namespace folly {
 ///   SPSC) ProducerConsumerQueue.
 ///
 /// Template Aliases:
-///   USPSCQueue<T, MayBlock, LgSegmentSize>
-///   UMPSCQueue<T, MayBlock, LgSegmentSize>
-///   USPMCQueue<T, MayBlock, LgSegmentSize>
-///   UMPMCQueue<T, MayBlock, LgSegmentSize>
+///   USPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>
+///   UMPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>
+///   USPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>
+///   UMPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>
 ///
 /// Functions:
 ///   Producer operations never wait or fail (unless OOM)
@@ -72,16 +79,16 @@ namespace folly {
 ///         Extracts an element from the front of the queue. Waits
 ///         until an element is available if needed.
 ///     bool try_dequeue(T&);
-///         Tries to extracts an element from the front of the queue
+///         Tries to extract an element from the front of the queue
 ///         if available. Returns true if successful, false otherwise.
 ///     bool try_dequeue_until(T&, time_point& deadline);
-///         Tries to extracts an element from the front of the queue
+///         Tries to extract an element from the front of the queue
 ///         if available until the specified deadline.  Returns true
 ///         if successful, false otherwise.
 ///     bool try_dequeue_for(T&, duration&);
-///         Tries to extracts an element from the front of the queue
-///         if available for for the specified duration.  Returns true
-///         if successful, false otherwise.
+///         Tries to extract an element from the front of the queue if
+///         available for until the expiration of the specified
+///         duration.  Returns true if successful, false otherwise.
 ///
 ///   Secondary functions:
 ///     size_t size();
@@ -119,7 +126,7 @@ namespace folly {
 ///   exactly once.
 /// - Each entry is composed of a futex and a single element.
 /// - The queue contains two 64-bit ticket variables. The producer
-///   ticket counts the number of producer tickets isued so far, and
+///   ticket counts the number of producer tickets issued so far, and
 ///   the same for the consumer ticket. Each ticket number corresponds
 ///   to a specific entry in a specific segment.
 /// - The queue maintains two pointers, head and tail. Head points to
@@ -143,15 +150,17 @@ namespace folly {
 ///   one or two more segment than fits its contents.
 /// - Removed segments are not reclaimed until there are no threads,
 ///   producers or consumers, have references to them or their
-///   predessors. That is, a lagging thread may delay the reclamation
+///   predecessors. That is, a lagging thread may delay the reclamation
 ///   of a chain of removed segments.
+/// - The template parameter LgAlign can be used to reduce memory usage
+///   at the cost of increased chance of false sharing.
 ///
 /// Performance considerations:
 /// - All operations take constant time, excluding the costs of
-///   allocation, reclamation, interence from other threads, and
+///   allocation, reclamation, interference from other threads, and
 ///   waiting for actions by other threads.
 /// - In general, using the single producer and or single consumer
-///   variants yields better performance than the MP and MC
+///   variants yield better performance than the MP and MC
 ///   alternatives.
 /// - SPSC without blocking is the fastest configuration. It doesn't
 ///   include any read-modify-write atomic operations, full fences, or
@@ -160,7 +169,7 @@ namespace folly {
 /// - MC adds a fetch_add or compare_exchange to the critical path of
 ///   each consumer operation.
 /// - The possibility of consumers blocking, even if they never do,
-///   adds a compare_exchange to the crtical path of each producer
+///   adds a compare_exchange to the critical path of each producer
 ///   operation.
 /// - MPMC, SPMC, MPSC require the use of a deferred reclamation
 ///   mechanism to guarantee that segments removed from the linked
@@ -175,11 +184,11 @@ namespace folly {
 /// - Another consideration is that the queue is guaranteed to have
 ///   enough space for a number of consumers equal to 2^LgSegmentSize
 ///   for local blocking. Excess waiting consumers spin.
-/// - It is recommended to measure perforamnce with different variants
+/// - It is recommended to measure performance with different variants
 ///   when applicable, e.g., UMPMC vs UMPSC. Depending on the use
 ///   case, sometimes the variant with the higher sequential overhead
 ///   may yield better results due to, for example, more favorable
-///   producer-consumer balance or favorable timining for avoiding
+///   producer-consumer balance or favorable timing for avoiding
 ///   costly blocking.
 
 template <
@@ -188,6 +197,7 @@ template <
     bool SingleConsumer,
     bool MayBlock,
     size_t LgSegmentSize = 8,
+    size_t LgAlign = 7,
     template <typename> class Atom = std::atomic>
 class UnboundedQueue {
   using Ticket = uint64_t;
@@ -195,25 +205,47 @@ class UnboundedQueue {
   class Segment;
 
   static constexpr bool SPSC = SingleProducer && SingleConsumer;
-  static constexpr size_t SegmentSize = 1 << LgSegmentSize;
   static constexpr size_t Stride = SPSC || (LgSegmentSize <= 1) ? 1 : 27;
+  static constexpr size_t SegmentSize = 1u << LgSegmentSize;
+  static constexpr size_t Align = 1u << LgAlign;
 
   static_assert(
       std::is_nothrow_destructible<T>::value,
       "T must be nothrow_destructible");
   static_assert((Stride & 1) == 1, "Stride must be odd");
   static_assert(LgSegmentSize < 32, "LgSegmentSize must be < 32");
+  static_assert(LgAlign < 16, "LgAlign must be < 16");
+
+  struct Consumer {
+    Atom<Segment*> head;
+    Atom<Ticket> ticket;
+  };
+  struct Producer {
+    Atom<Segment*> tail;
+    Atom<Ticket> ticket;
+  };
 
-  FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
-  Atom<Segment*> head_;
-  Atom<Ticket> consumerTicket_;
-  FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
-  Atom<Segment*> tail_;
-  Atom<Ticket> producerTicket_;
+  alignas(Align) Consumer c_;
+  alignas(Align) Producer p_;
 
  public:
-  UnboundedQueue();
-  ~UnboundedQueue();
+  /** constructor */
+  UnboundedQueue() {
+    setProducerTicket(0);
+    setConsumerTicket(0);
+    Segment* s = new Segment(0);
+    setTail(s);
+    setHead(s);
+  }
+
+  /** destructor */
+  ~UnboundedQueue() {
+    Segment* next;
+    for (Segment* s = head(); s; s = next) {
+      next = s->nextSegment();
+      reclaimSegment(s);
+    }
+  }
 
   /** enqueue */
   FOLLY_ALWAYS_INLINE void enqueue(const T& arg) {
@@ -225,26 +257,32 @@ class UnboundedQueue {
   }
 
   /** dequeue */
-  void dequeue(T& item) noexcept;
+  FOLLY_ALWAYS_INLINE void dequeue(T& item) noexcept {
+    dequeueImpl(item);
+  }
 
   /** try_dequeue */
-  bool try_dequeue(T& item) noexcept {
-    return try_dequeue_until(
-        item, std::chrono::steady_clock::time_point::min());
+  FOLLY_ALWAYS_INLINE bool try_dequeue(T& item) noexcept {
+    return tryDequeueUntil(item, std::chrono::steady_clock::time_point::min());
   }
 
   /** try_dequeue_until */
   template <typename Clock, typename Duration>
-  bool try_dequeue_until(
+  FOLLY_ALWAYS_INLINE bool try_dequeue_until(
       T& item,
-      const std::chrono::time_point<Clock, Duration>& deadline) noexcept;
+      const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
+    return tryDequeueUntil(item, deadline);
+  }
 
   /** try_dequeue_for */
   template <typename Rep, typename Period>
-  bool try_dequeue_for(
+  FOLLY_ALWAYS_INLINE bool try_dequeue_for(
       T& item,
       const std::chrono::duration<Rep, Period>& duration) noexcept {
-    return try_dequeue_until(item, std::chrono::steady_clock::now() + duration);
+    if (LIKELY(try_dequeue(item))) {
+      return true;
+    }
+    return tryDequeueUntil(item, std::chrono::steady_clock::now() + duration);
   }
 
   /** size */
@@ -262,33 +300,225 @@ class UnboundedQueue {
   }
 
  private:
+  /** enqueueImpl */
   template <typename Arg>
-  void enqueueImpl(Arg&& arg);
+  FOLLY_ALWAYS_INLINE void enqueueImpl(Arg&& arg) {
+    if (SPSC) {
+      Segment* s = tail();
+      enqueueCommon(s, std::forward<Arg>(arg));
+    } else {
+      // Using hazptr_holder instead of hazptr_local because it is
+      // possible that the T ctor happens to use hazard pointers.
+      folly::hazptr::hazptr_holder hptr;
+      Segment* s = hptr.get_protected(p_.tail);
+      enqueueCommon(s, std::forward<Arg>(arg));
+    }
+  }
 
+  /** enqueueCommon */
   template <typename Arg>
-  void enqueueCommon(Segment* s, Arg&& arg);
+  FOLLY_ALWAYS_INLINE void enqueueCommon(Segment* s, Arg&& arg) {
+    Ticket t = fetchIncrementProducerTicket();
+    if (!SingleProducer) {
+      s = findSegment(s, t);
+    }
+    DCHECK_GE(t, s->minTicket());
+    DCHECK_LT(t, s->minTicket() + SegmentSize);
+    size_t idx = index(t);
+    Entry& e = s->entry(idx);
+    e.putItem(std::forward<Arg>(arg));
+    if (responsibleForAlloc(t)) {
+      allocNextSegment(s, t + SegmentSize);
+    }
+    if (responsibleForAdvance(t)) {
+      advanceTail(s);
+    }
+  }
+
+  /** dequeueImpl */
+  FOLLY_ALWAYS_INLINE void dequeueImpl(T& item) noexcept {
+    if (SPSC) {
+      Segment* s = head();
+      dequeueCommon(s, item);
+    } else {
+      // Using hazptr_holder instead of hazptr_local because it is
+      // possible to call the T dtor and it may happen to use hazard
+      // pointers.
+      folly::hazptr::hazptr_holder hptr;
+      Segment* s = hptr.get_protected(c_.head);
+      dequeueCommon(s, item);
+    }
+  }
+
+  /** dequeueCommon */
+  FOLLY_ALWAYS_INLINE void dequeueCommon(Segment* s, T& item) noexcept {
+    Ticket t = fetchIncrementConsumerTicket();
+    if (!SingleConsumer) {
+      s = findSegment(s, t);
+    }
+    size_t idx = index(t);
+    Entry& e = s->entry(idx);
+    e.takeItem(item);
+    if (responsibleForAdvance(t)) {
+      advanceHead(s);
+    }
+  }
 
-  void dequeueCommon(Segment* s, T& item) noexcept;
+  /** tryDequeueUntil */
+  template <typename Clock, typename Duration>
+  FOLLY_ALWAYS_INLINE bool tryDequeueUntil(
+      T& item,
+      const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
+    if (SingleConsumer) {
+      Segment* s = head();
+      return tryDequeueUntilSC(s, item, deadline);
+    } else {
+      // Using hazptr_holder instead of hazptr_local because it is
+      // possible to call ~T() and it may happen to use hazard pointers.
+      folly::hazptr::hazptr_holder hptr;
+      Segment* s = hptr.get_protected(c_.head);
+      return ryDequeueUntilMC(s, item, deadline);
+    }
+  }
 
+  /** ryDequeueUntilSC */
   template <typename Clock, typename Duration>
-  bool singleConsumerTryDequeueUntil(
+  FOLLY_ALWAYS_INLINE bool tryDequeueUntilSC(
       Segment* s,
       T& item,
-      const std::chrono::time_point<Clock, Duration>& deadline) noexcept;
+      const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
+    Ticket t = consumerTicket();
+    DCHECK_GE(t, s->minTicket());
+    DCHECK_LT(t, (s->minTicket() + SegmentSize));
+    size_t idx = index(t);
+    Entry& e = s->entry(idx);
+    if (!e.tryWaitUntil(deadline)) {
+      return false;
+    }
+    setConsumerTicket(t + 1);
+    e.takeItem(item);
+    if (responsibleForAdvance(t)) {
+      advanceHead(s);
+    }
+    return true;
+  }
 
+  /** tryDequeueUntilMC */
   template <typename Clock, typename Duration>
-  bool multiConsumerTryDequeueUntil(
+  FOLLY_ALWAYS_INLINE bool ryDequeueUntilMC(
       Segment* s,
       T& item,
-      const std::chrono::time_point<Clock, Duration>& deadline) noexcept;
+      const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
+    while (true) {
+      Ticket t = consumerTicket();
+      if (UNLIKELY(t >= (s->minTicket() + SegmentSize))) {
+        s = tryGetNextSegmentUntil(s, deadline);
+        if (s == nullptr) {
+          return false; // timed out
+        }
+        continue;
+      }
+      size_t idx = index(t);
+      Entry& e = s->entry(idx);
+      if (!e.tryWaitUntil(deadline)) {
+        return false;
+      }
+      if (!c_.ticket.compare_exchange_weak(
+              t, t + 1, std::memory_order_acq_rel, std::memory_order_acquire)) {
+        continue;
+      }
+      e.takeItem(item);
+      if (responsibleForAdvance(t)) {
+        advanceHead(s);
+      }
+      return true;
+    }
+  }
+
+  /** findSegment */
+  FOLLY_ALWAYS_INLINE
+  Segment* findSegment(Segment* s, const Ticket t) const noexcept {
+    while (UNLIKELY(t >= (s->minTicket() + SegmentSize))) {
+      auto deadline = std::chrono::steady_clock::time_point::max();
+      s = tryGetNextSegmentUntil(s, deadline);
+      DCHECK(s != nullptr);
+    }
+    return s;
+  }
 
-  Segment* findSegment(Segment* s, const Ticket t) const noexcept;
+  /** tryGetNextSegmentUntil */
+  template <typename Clock, typename Duration>
+  Segment* tryGetNextSegmentUntil(
+      Segment* s,
+      const std::chrono::time_point<Clock, Duration>& deadline) const noexcept {
+    // The following loop will not spin indefinitely (as long as the
+    // number of concurrently waiting consumers does not exceeds
+    // SegmentSize and the OS scheduler does not pause ready threads
+    // indefinitely). Under such conditions, the algorithm guarantees
+    // that the producer reponsible for advancing the tail pointer to
+    // the next segment has already acquired its ticket.
+    while (tail() == s) {
+      if (deadline < Clock::time_point::max() && deadline > Clock::now()) {
+        return nullptr;
+      }
+      asm_volatile_pause();
+    }
+    Segment* next = s->nextSegment();
+    DCHECK(next != nullptr);
+    return next;
+  }
 
-  void allocNextSegment(Segment* s, const Ticket t);
+  /** allocNextSegment */
+  void allocNextSegment(Segment* s, const Ticket t) {
+    Segment* next = new Segment(t);
+    if (!SPSC) {
+      next->acquire_ref_safe(); // hazptr
+    }
+    DCHECK(s->nextSegment() == nullptr);
+    s->setNextSegment(next);
+  }
+
+  /** advanceTail */
+  void advanceTail(Segment* s) noexcept {
+    Segment* next = s->nextSegment();
+    if (!SingleProducer) {
+      // The following loop will not spin indefinitely (as long as the
+      // OS scheduler does not pause ready threads indefinitely). The
+      // algorithm guarantees that the producer reponsible for setting
+      // the next pointer has already acquired its ticket.
+      while (next == nullptr) {
+        asm_volatile_pause();
+        next = s->nextSegment();
+      }
+    }
+    DCHECK(next != nullptr);
+    setTail(next);
+  }
 
-  void advanceTail(Segment* s) noexcept;
+  /** advanceHead */
+  void advanceHead(Segment* s) noexcept {
+    auto deadline = std::chrono::steady_clock::time_point::max();
+    Segment* next = tryGetNextSegmentUntil(s, deadline);
+    DCHECK(next != nullptr);
+    while (head() != s) {
+      // Wait for head to advance to the current segment first before
+      // advancing head to the next segment. Otherwise, a lagging
+      // consumer responsible for advancing head from an earlier
+      // segment may incorrectly set head back.
+      asm_volatile_pause();
+    }
+    setHead(next);
+    reclaimSegment(s);
+  }
 
-  void advanceHead(Segment* s) noexcept;
+  /** reclaimSegment */
+  void reclaimSegment(Segment* s) noexcept {
+    if (SPSC) {
+      delete s;
+    } else {
+      s->retire(); // hazptr
+    }
+  }
 
   FOLLY_ALWAYS_INLINE size_t index(Ticket t) const noexcept {
     return (t * Stride) & (SegmentSize - 1);
@@ -303,56 +533,144 @@ class UnboundedQueue {
   }
 
   FOLLY_ALWAYS_INLINE Segment* head() const noexcept {
-    return head_.load(std::memory_order_acquire);
+    return c_.head.load(std::memory_order_acquire);
   }
 
   FOLLY_ALWAYS_INLINE Segment* tail() const noexcept {
-    return tail_.load(std::memory_order_acquire);
+    return p_.tail.load(std::memory_order_acquire);
   }
 
   FOLLY_ALWAYS_INLINE Ticket producerTicket() const noexcept {
-    return producerTicket_.load(std::memory_order_acquire);
+    return p_.ticket.load(std::memory_order_acquire);
   }
 
   FOLLY_ALWAYS_INLINE Ticket consumerTicket() const noexcept {
-    return consumerTicket_.load(std::memory_order_acquire);
+    return c_.ticket.load(std::memory_order_acquire);
   }
 
   void setHead(Segment* s) noexcept {
-    head_.store(s, std::memory_order_release);
+    c_.head.store(s, std::memory_order_release);
   }
 
   void setTail(Segment* s) noexcept {
-    tail_.store(s, std::memory_order_release);
+    p_.tail.store(s, std::memory_order_release);
   }
 
   FOLLY_ALWAYS_INLINE void setProducerTicket(Ticket t) noexcept {
-    producerTicket_.store(t, std::memory_order_release);
+    p_.ticket.store(t, std::memory_order_release);
   }
 
   FOLLY_ALWAYS_INLINE void setConsumerTicket(Ticket t) noexcept {
-    consumerTicket_.store(t, std::memory_order_release);
+    c_.ticket.store(t, std::memory_order_release);
   }
 
   FOLLY_ALWAYS_INLINE Ticket fetchIncrementConsumerTicket() noexcept {
     if (SingleConsumer) {
-      auto oldval = consumerTicket();
+      Ticket oldval = consumerTicket();
       setConsumerTicket(oldval + 1);
       return oldval;
     } else { // MC
-      return consumerTicket_.fetch_add(1, std::memory_order_acq_rel);
+      return c_.ticket.fetch_add(1, std::memory_order_acq_rel);
     }
   }
 
   FOLLY_ALWAYS_INLINE Ticket fetchIncrementProducerTicket() noexcept {
     if (SingleProducer) {
-      auto oldval = producerTicket();
+      Ticket oldval = producerTicket();
       setProducerTicket(oldval + 1);
       return oldval;
     } else { // MP
-      return producerTicket_.fetch_add(1, std::memory_order_acq_rel);
+      return p_.ticket.fetch_add(1, std::memory_order_acq_rel);
     }
   }
+
+  /**
+   *  Entry
+   */
+  class Entry {
+    folly::SaturatingSemaphore<MayBlock, Atom> flag_;
+    typename std::aligned_storage<sizeof(T), alignof(T)>::type item_;
+
+   public:
+    template <typename Arg>
+    FOLLY_ALWAYS_INLINE void putItem(Arg&& arg) {
+      new (&item_) T(std::forward<Arg>(arg));
+      flag_.post();
+    }
+
+    FOLLY_ALWAYS_INLINE void takeItem(T& item) noexcept {
+      flag_.wait();
+      getItem(item);
+    }
+
+    template <typename Clock, typename Duration>
+    FOLLY_ALWAYS_INLINE bool tryWaitUntil(
+        const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
+      return flag_.try_wait_until(deadline);
+    }
+
+   private:
+    FOLLY_ALWAYS_INLINE void getItem(T& item) noexcept {
+      item = std::move(*(itemPtr()));
+      destroyItem();
+    }
+
+    FOLLY_ALWAYS_INLINE T* itemPtr() noexcept {
+      return static_cast<T*>(static_cast<void*>(&item_));
+    }
+
+    FOLLY_ALWAYS_INLINE void destroyItem() noexcept {
+      itemPtr()->~T();
+    }
+  }; // Entry
+
+  /**
+   *  Segment
+   */
+  class Segment : public folly::hazptr::hazptr_obj_base_refcounted<Segment> {
+    Atom<Segment*> next_;
+    const Ticket min_;
+    bool marked_; // used for iterative deletion
+    FOLLY_ALIGNED(Align)
+    Entry b_[SegmentSize];
+
+   public:
+    explicit Segment(const Ticket t)
+        : next_(nullptr), min_(t), marked_(false) {}
+
+    ~Segment() {
+      if (!SPSC && !marked_) {
+        Segment* next = nextSegment();
+        while (next) {
+          if (!next->release_ref()) { // hazptr
+            return;
+          }
+          Segment* s = next;
+          next = s->nextSegment();
+          s->marked_ = true;
+          delete s;
+        }
+      }
+    }
+
+    Segment* nextSegment() const noexcept {
+      return next_.load(std::memory_order_acquire);
+    }
+
+    void setNextSegment(Segment* s) noexcept {
+      next_.store(s, std::memory_order_release);
+    }
+
+    FOLLY_ALWAYS_INLINE Ticket minTicket() const noexcept {
+      DCHECK_EQ((min_ & (SegmentSize - 1)), 0);
+      return min_;
+    }
+
+    FOLLY_ALWAYS_INLINE Entry& entry(size_t index) noexcept {
+      return b_[index];
+    }
+  }; // Segment
+
 }; // UnboundedQueue
 
 /* Aliases */
@@ -361,33 +679,36 @@ template <
     typename T,
     bool MayBlock,
     size_t LgSegmentSize = 8,
+    size_t LgAlign = 7,
     template <typename> class Atom = std::atomic>
-using USPSCQueue = UnboundedQueue<T, true, true, MayBlock, LgSegmentSize, Atom>;
+using USPSCQueue =
+    UnboundedQueue<T, true, true, MayBlock, LgSegmentSize, LgAlign, Atom>;
 
 template <
     typename T,
     bool MayBlock,
     size_t LgSegmentSize = 8,
+    size_t LgAlign = 7,
     template <typename> class Atom = std::atomic>
 using UMPSCQueue =
-    UnboundedQueue<T, false, true, MayBlock, LgSegmentSize, Atom>;
+    UnboundedQueue<T, false, true, MayBlock, LgSegmentSize, LgAlign, Atom>;
 
 template <
     typename T,
     bool MayBlock,
     size_t LgSegmentSize = 8,
+    size_t LgAlign = 7,
     template <typename> class Atom = std::atomic>
 using USPMCQueue =
-    UnboundedQueue<T, true, false, MayBlock, LgSegmentSize, Atom>;
+    UnboundedQueue<T, true, false, MayBlock, LgSegmentSize, LgAlign, Atom>;
 
 template <
     typename T,
     bool MayBlock,
     size_t LgSegmentSize = 8,
+    size_t LgAlign = 7,
     template <typename> class Atom = std::atomic>
 using UMPMCQueue =
-    UnboundedQueue<T, false, false, MayBlock, LgSegmentSize, Atom>;
+    UnboundedQueue<T, false, false, MayBlock, LgSegmentSize, LgAlign, Atom>;
 
 } // namespace folly
-
-#include <folly/concurrency/UnboundedQueue-inl.h>