UnboundedQueue: Use hazptr_obj_batch
[folly.git] / folly / concurrency / UnboundedQueue.h
index 2f2571cc8ecab14c2c9e9f2a722ed18485ea63a5..1947cdae210e1b61b81b6038b33576255a2fd59b 100644 (file)
@@ -43,7 +43,7 @@ namespace folly {
 /// - SingleConsumer: true if there can be only one consumer at a
 ///   time.
 /// - MayBlock: true if consumers may block, false if they only
-///   spins. A performance tuning parameter.
+///   spin. A performance tuning parameter.
 /// - LgSegmentSize (default 8): Log base 2 of number of elements per
 ///   segment. A performance tuning parameter. See below.
 /// - LgAlign (default 7): Log base 2 of alignment directive; can be
@@ -79,16 +79,16 @@ namespace folly {
 ///         Extracts an element from the front of the queue. Waits
 ///         until an element is available if needed.
 ///     bool try_dequeue(T&);
-///         Tries to extracts an element from the front of the queue
+///         Tries to extract an element from the front of the queue
 ///         if available. Returns true if successful, false otherwise.
 ///     bool try_dequeue_until(T&, time_point& deadline);
-///         Tries to extracts an element from the front of the queue
+///         Tries to extract an element from the front of the queue
 ///         if available until the specified deadline.  Returns true
 ///         if successful, false otherwise.
 ///     bool try_dequeue_for(T&, duration&);
-///         Tries to extracts an element from the front of the queue
-///         if available for for the specified duration.  Returns true
-///         if successful, false otherwise.
+///         Tries to extract an element from the front of the queue if
+///         available for until the expiration of the specified
+///         duration.  Returns true if successful, false otherwise.
 ///
 ///   Secondary functions:
 ///     size_t size();
@@ -126,7 +126,7 @@ namespace folly {
 ///   exactly once.
 /// - Each entry is composed of a futex and a single element.
 /// - The queue contains two 64-bit ticket variables. The producer
-///   ticket counts the number of producer tickets isued so far, and
+///   ticket counts the number of producer tickets issued so far, and
 ///   the same for the consumer ticket. Each ticket number corresponds
 ///   to a specific entry in a specific segment.
 /// - The queue maintains two pointers, head and tail. Head points to
@@ -150,17 +150,17 @@ namespace folly {
 ///   one or two more segment than fits its contents.
 /// - Removed segments are not reclaimed until there are no threads,
 ///   producers or consumers, have references to them or their
-///   predessors. That is, a lagging thread may delay the reclamation
+///   predecessors. That is, a lagging thread may delay the reclamation
 ///   of a chain of removed segments.
 /// - The template parameter LgAlign can be used to reduce memory usage
 ///   at the cost of increased chance of false sharing.
 ///
 /// Performance considerations:
 /// - All operations take constant time, excluding the costs of
-///   allocation, reclamation, interence from other threads, and
+///   allocation, reclamation, interference from other threads, and
 ///   waiting for actions by other threads.
 /// - In general, using the single producer and or single consumer
-///   variants yields better performance than the MP and MC
+///   variants yield better performance than the MP and MC
 ///   alternatives.
 /// - SPSC without blocking is the fastest configuration. It doesn't
 ///   include any read-modify-write atomic operations, full fences, or
@@ -169,7 +169,7 @@ namespace folly {
 /// - MC adds a fetch_add or compare_exchange to the critical path of
 ///   each consumer operation.
 /// - The possibility of consumers blocking, even if they never do,
-///   adds a compare_exchange to the crtical path of each producer
+///   adds a compare_exchange to the critical path of each producer
 ///   operation.
 /// - MPMC, SPMC, MPSC require the use of a deferred reclamation
 ///   mechanism to guarantee that segments removed from the linked
@@ -184,11 +184,11 @@ namespace folly {
 /// - Another consideration is that the queue is guaranteed to have
 ///   enough space for a number of consumers equal to 2^LgSegmentSize
 ///   for local blocking. Excess waiting consumers spin.
-/// - It is recommended to measure perforamnce with different variants
+/// - It is recommended to measure performance with different variants
 ///   when applicable, e.g., UMPMC vs UMPSC. Depending on the use
 ///   case, sometimes the variant with the higher sequential overhead
 ///   may yield better results due to, for example, more favorable
-///   producer-consumer balance or favorable timining for avoiding
+///   producer-consumer balance or favorable timing for avoiding
 ///   costly blocking.
 
 template <
@@ -216,10 +216,18 @@ class UnboundedQueue {
   static_assert(LgSegmentSize < 32, "LgSegmentSize must be < 32");
   static_assert(LgAlign < 16, "LgAlign must be < 16");
 
-  alignas(Align) Atom<Segment*> head_;
-  Atom<Ticket> consumerTicket_;
-  alignas(Align) Atom<Segment*> tail_;
-  Atom<Ticket> producerTicket_;
+  struct Consumer {
+    Atom<Segment*> head;
+    Atom<Ticket> ticket;
+    folly::hazptr::hazptr_obj_batch batch;
+  };
+  struct Producer {
+    Atom<Segment*> tail;
+    Atom<Ticket> ticket;
+  };
+
+  alignas(Align) Consumer c_;
+  alignas(Align) Producer p_;
 
  public:
   /** constructor */
@@ -303,7 +311,7 @@ class UnboundedQueue {
       // Using hazptr_holder instead of hazptr_local because it is
       // possible that the T ctor happens to use hazard pointers.
       folly::hazptr::hazptr_holder hptr;
-      Segment* s = hptr.get_protected(tail_);
+      Segment* s = hptr.get_protected(p_.tail);
       enqueueCommon(s, std::forward<Arg>(arg));
     }
   }
@@ -338,7 +346,7 @@ class UnboundedQueue {
       // possible to call the T dtor and it may happen to use hazard
       // pointers.
       folly::hazptr::hazptr_holder hptr;
-      Segment* s = hptr.get_protected(head_);
+      Segment* s = hptr.get_protected(c_.head);
       dequeueCommon(s, item);
     }
   }
@@ -369,12 +377,12 @@ class UnboundedQueue {
       // Using hazptr_holder instead of hazptr_local because it is
       // possible to call ~T() and it may happen to use hazard pointers.
       folly::hazptr::hazptr_holder hptr;
-      Segment* s = hptr.get_protected(head_);
-      return ryDequeueUntilMC(s, item, deadline);
+      Segment* s = hptr.get_protected(c_.head);
+      return tryDequeueUntilMC(s, item, deadline);
     }
   }
 
-  /** ryDequeueUntilSC */
+  /** tryDequeueUntilSC */
   template <typename Clock, typename Duration>
   FOLLY_ALWAYS_INLINE bool tryDequeueUntilSC(
       Segment* s,
@@ -385,7 +393,7 @@ class UnboundedQueue {
     DCHECK_LT(t, (s->minTicket() + SegmentSize));
     size_t idx = index(t);
     Entry& e = s->entry(idx);
-    if (!e.tryWaitUntil(deadline)) {
+    if (UNLIKELY(!tryDequeueWaitElem(e, t, deadline))) {
       return false;
     }
     setConsumerTicket(t + 1);
@@ -398,7 +406,7 @@ class UnboundedQueue {
 
   /** tryDequeueUntilMC */
   template <typename Clock, typename Duration>
-  FOLLY_ALWAYS_INLINE bool ryDequeueUntilMC(
+  FOLLY_ALWAYS_INLINE bool tryDequeueUntilMC(
       Segment* s,
       T& item,
       const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
@@ -413,10 +421,10 @@ class UnboundedQueue {
       }
       size_t idx = index(t);
       Entry& e = s->entry(idx);
-      if (!e.tryWaitUntil(deadline)) {
+      if (UNLIKELY(!tryDequeueWaitElem(e, t, deadline))) {
         return false;
       }
-      if (!consumerTicket_.compare_exchange_weak(
+      if (!c_.ticket.compare_exchange_weak(
               t, t + 1, std::memory_order_acq_rel, std::memory_order_acquire)) {
         continue;
       }
@@ -428,6 +436,23 @@ class UnboundedQueue {
     }
   }
 
+  /** tryDequeueWaitElem */
+  template <typename Clock, typename Duration>
+  FOLLY_ALWAYS_INLINE bool tryDequeueWaitElem(
+      Entry& e,
+      Ticket t,
+      const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
+    while (true) {
+      if (LIKELY(e.tryWaitUntil(deadline))) {
+        return true;
+      }
+      if (t >= producerTicket()) {
+        return false;
+      }
+      asm_volatile_pause();
+    }
+  }
+
   /** findSegment */
   FOLLY_ALWAYS_INLINE
   Segment* findSegment(Segment* s, const Ticket t) const noexcept {
@@ -493,8 +518,29 @@ class UnboundedQueue {
     auto deadline = std::chrono::steady_clock::time_point::max();
     Segment* next = tryGetNextSegmentUntil(s, deadline);
     DCHECK(next != nullptr);
+    while (head() != s) {
+      // Wait for head to advance to the current segment first before
+      // advancing head to the next segment. Otherwise, a lagging
+      // consumer responsible for advancing head from an earlier
+      // segment may incorrectly set head back.
+      asm_volatile_pause();
+    }
+    /* ***IMPORTANT*** prepReclaimSegment() must be called after
+     * confirming that head() is up-to-date and before calling
+     * setHead() to be thread-safe. */
+    /* ***IMPORTANT*** Segment s cannot be retired before the call to
+     * setHead(s). This is why prep_retire_refcounted(), which is
+     * called by prepReclaimSegment() does not retire objects, it
+     * merely adds the object to the batch and returns a private batch
+     * structure of a list of objects that can be retired later, if
+     * there are enough objects for amortizing the cost of updating
+     * the domain structure. */
+    auto res = prepReclaimSegment(s);
     setHead(next);
-    reclaimSegment(s);
+    /* Now it is safe to retire s. */
+    /* ***IMPORTANT*** The destructor of res automatically calls
+     * retire_all(), which retires to the domain any objects moved to
+     * res from batch in the call to prepReclaimSegment(). */
   }
 
   /** reclaimSegment */
@@ -506,6 +552,17 @@ class UnboundedQueue {
     }
   }
 
+  /** prepReclaimSegment */
+  folly::hazptr::hazptr_obj_batch prepReclaimSegment(Segment* s) noexcept {
+    if (SPSC) {
+      delete s;
+      /*Return an empty result; nothing more to do for this segment */
+      return folly::hazptr::hazptr_obj_batch();
+    } else {
+      return c_.batch.prep_retire_refcounted(s);
+    }
+  }
+
   FOLLY_ALWAYS_INLINE size_t index(Ticket t) const noexcept {
     return (t * Stride) & (SegmentSize - 1);
   }
@@ -519,35 +576,35 @@ class UnboundedQueue {
   }
 
   FOLLY_ALWAYS_INLINE Segment* head() const noexcept {
-    return head_.load(std::memory_order_acquire);
+    return c_.head.load(std::memory_order_acquire);
   }
 
   FOLLY_ALWAYS_INLINE Segment* tail() const noexcept {
-    return tail_.load(std::memory_order_acquire);
+    return p_.tail.load(std::memory_order_acquire);
   }
 
   FOLLY_ALWAYS_INLINE Ticket producerTicket() const noexcept {
-    return producerTicket_.load(std::memory_order_acquire);
+    return p_.ticket.load(std::memory_order_acquire);
   }
 
   FOLLY_ALWAYS_INLINE Ticket consumerTicket() const noexcept {
-    return consumerTicket_.load(std::memory_order_acquire);
+    return c_.ticket.load(std::memory_order_acquire);
   }
 
   void setHead(Segment* s) noexcept {
-    head_.store(s, std::memory_order_release);
+    c_.head.store(s, std::memory_order_release);
   }
 
   void setTail(Segment* s) noexcept {
-    tail_.store(s, std::memory_order_release);
+    p_.tail.store(s, std::memory_order_release);
   }
 
   FOLLY_ALWAYS_INLINE void setProducerTicket(Ticket t) noexcept {
-    producerTicket_.store(t, std::memory_order_release);
+    p_.ticket.store(t, std::memory_order_release);
   }
 
   FOLLY_ALWAYS_INLINE void setConsumerTicket(Ticket t) noexcept {
-    consumerTicket_.store(t, std::memory_order_release);
+    c_.ticket.store(t, std::memory_order_release);
   }
 
   FOLLY_ALWAYS_INLINE Ticket fetchIncrementConsumerTicket() noexcept {
@@ -556,7 +613,7 @@ class UnboundedQueue {
       setConsumerTicket(oldval + 1);
       return oldval;
     } else { // MC
-      return consumerTicket_.fetch_add(1, std::memory_order_acq_rel);
+      return c_.ticket.fetch_add(1, std::memory_order_acq_rel);
     }
   }
 
@@ -566,7 +623,7 @@ class UnboundedQueue {
       setProducerTicket(oldval + 1);
       return oldval;
     } else { // MP
-      return producerTicket_.fetch_add(1, std::memory_order_acq_rel);
+      return p_.ticket.fetch_add(1, std::memory_order_acq_rel);
     }
   }
 
@@ -592,7 +649,10 @@ class UnboundedQueue {
     template <typename Clock, typename Duration>
     FOLLY_ALWAYS_INLINE bool tryWaitUntil(
         const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
-      return flag_.try_wait_until(deadline);
+      // wait-options from benchmarks on contended queues:
+      auto const opt =
+          flag_.wait_options().spin_max(std::chrono::microseconds(10));
+      return flag_.try_wait_until(deadline, opt);
     }
 
    private:
@@ -617,8 +677,7 @@ class UnboundedQueue {
     Atom<Segment*> next_;
     const Ticket min_;
     bool marked_; // used for iterative deletion
-    FOLLY_ALIGNED(Align)
-    Entry b_[SegmentSize];
+    alignas(Align) Entry b_[SegmentSize];
 
    public:
     explicit Segment(const Ticket t)