UnboundedQueue: Use hazptr_obj_batch
[folly.git] / folly / concurrency / UnboundedQueue.h
index 7459c26329be81b9e89efa7be0feeb77199cda19..1947cdae210e1b61b81b6038b33576255a2fd59b 100644 (file)
@@ -216,10 +216,18 @@ class UnboundedQueue {
   static_assert(LgSegmentSize < 32, "LgSegmentSize must be < 32");
   static_assert(LgAlign < 16, "LgAlign must be < 16");
 
-  alignas(Align) Atom<Segment*> head_;
-  Atom<Ticket> consumerTicket_;
-  alignas(Align) Atom<Segment*> tail_;
-  Atom<Ticket> producerTicket_;
+  struct Consumer {
+    Atom<Segment*> head;
+    Atom<Ticket> ticket;
+    folly::hazptr::hazptr_obj_batch batch;
+  };
+  struct Producer {
+    Atom<Segment*> tail;
+    Atom<Ticket> ticket;
+  };
+
+  alignas(Align) Consumer c_;
+  alignas(Align) Producer p_;
 
  public:
   /** constructor */
@@ -303,7 +311,7 @@ class UnboundedQueue {
       // Using hazptr_holder instead of hazptr_local because it is
       // possible that the T ctor happens to use hazard pointers.
       folly::hazptr::hazptr_holder hptr;
-      Segment* s = hptr.get_protected(tail_);
+      Segment* s = hptr.get_protected(p_.tail);
       enqueueCommon(s, std::forward<Arg>(arg));
     }
   }
@@ -338,7 +346,7 @@ class UnboundedQueue {
       // possible to call the T dtor and it may happen to use hazard
       // pointers.
       folly::hazptr::hazptr_holder hptr;
-      Segment* s = hptr.get_protected(head_);
+      Segment* s = hptr.get_protected(c_.head);
       dequeueCommon(s, item);
     }
   }
@@ -369,12 +377,12 @@ class UnboundedQueue {
       // Using hazptr_holder instead of hazptr_local because it is
       // possible to call ~T() and it may happen to use hazard pointers.
       folly::hazptr::hazptr_holder hptr;
-      Segment* s = hptr.get_protected(head_);
-      return ryDequeueUntilMC(s, item, deadline);
+      Segment* s = hptr.get_protected(c_.head);
+      return tryDequeueUntilMC(s, item, deadline);
     }
   }
 
-  /** ryDequeueUntilSC */
+  /** tryDequeueUntilSC */
   template <typename Clock, typename Duration>
   FOLLY_ALWAYS_INLINE bool tryDequeueUntilSC(
       Segment* s,
@@ -385,7 +393,7 @@ class UnboundedQueue {
     DCHECK_LT(t, (s->minTicket() + SegmentSize));
     size_t idx = index(t);
     Entry& e = s->entry(idx);
-    if (!e.tryWaitUntil(deadline)) {
+    if (UNLIKELY(!tryDequeueWaitElem(e, t, deadline))) {
       return false;
     }
     setConsumerTicket(t + 1);
@@ -398,7 +406,7 @@ class UnboundedQueue {
 
   /** tryDequeueUntilMC */
   template <typename Clock, typename Duration>
-  FOLLY_ALWAYS_INLINE bool ryDequeueUntilMC(
+  FOLLY_ALWAYS_INLINE bool tryDequeueUntilMC(
       Segment* s,
       T& item,
       const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
@@ -413,10 +421,10 @@ class UnboundedQueue {
       }
       size_t idx = index(t);
       Entry& e = s->entry(idx);
-      if (!e.tryWaitUntil(deadline)) {
+      if (UNLIKELY(!tryDequeueWaitElem(e, t, deadline))) {
         return false;
       }
-      if (!consumerTicket_.compare_exchange_weak(
+      if (!c_.ticket.compare_exchange_weak(
               t, t + 1, std::memory_order_acq_rel, std::memory_order_acquire)) {
         continue;
       }
@@ -428,6 +436,23 @@ class UnboundedQueue {
     }
   }
 
+  /** tryDequeueWaitElem */
+  template <typename Clock, typename Duration>
+  FOLLY_ALWAYS_INLINE bool tryDequeueWaitElem(
+      Entry& e,
+      Ticket t,
+      const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
+    while (true) {
+      if (LIKELY(e.tryWaitUntil(deadline))) {
+        return true;
+      }
+      if (t >= producerTicket()) {
+        return false;
+      }
+      asm_volatile_pause();
+    }
+  }
+
   /** findSegment */
   FOLLY_ALWAYS_INLINE
   Segment* findSegment(Segment* s, const Ticket t) const noexcept {
@@ -493,8 +518,29 @@ class UnboundedQueue {
     auto deadline = std::chrono::steady_clock::time_point::max();
     Segment* next = tryGetNextSegmentUntil(s, deadline);
     DCHECK(next != nullptr);
+    while (head() != s) {
+      // Wait for head to advance to the current segment first before
+      // advancing head to the next segment. Otherwise, a lagging
+      // consumer responsible for advancing head from an earlier
+      // segment may incorrectly set head back.
+      asm_volatile_pause();
+    }
+    /* ***IMPORTANT*** prepReclaimSegment() must be called after
+     * confirming that head() is up-to-date and before calling
+     * setHead() to be thread-safe. */
+    /* ***IMPORTANT*** Segment s cannot be retired before the call to
+     * setHead(s). This is why prep_retire_refcounted(), which is
+     * called by prepReclaimSegment() does not retire objects, it
+     * merely adds the object to the batch and returns a private batch
+     * structure of a list of objects that can be retired later, if
+     * there are enough objects for amortizing the cost of updating
+     * the domain structure. */
+    auto res = prepReclaimSegment(s);
     setHead(next);
-    reclaimSegment(s);
+    /* Now it is safe to retire s. */
+    /* ***IMPORTANT*** The destructor of res automatically calls
+     * retire_all(), which retires to the domain any objects moved to
+     * res from batch in the call to prepReclaimSegment(). */
   }
 
   /** reclaimSegment */
@@ -506,6 +552,17 @@ class UnboundedQueue {
     }
   }
 
+  /** prepReclaimSegment */
+  folly::hazptr::hazptr_obj_batch prepReclaimSegment(Segment* s) noexcept {
+    if (SPSC) {
+      delete s;
+      /*Return an empty result; nothing more to do for this segment */
+      return folly::hazptr::hazptr_obj_batch();
+    } else {
+      return c_.batch.prep_retire_refcounted(s);
+    }
+  }
+
   FOLLY_ALWAYS_INLINE size_t index(Ticket t) const noexcept {
     return (t * Stride) & (SegmentSize - 1);
   }
@@ -519,35 +576,35 @@ class UnboundedQueue {
   }
 
   FOLLY_ALWAYS_INLINE Segment* head() const noexcept {
-    return head_.load(std::memory_order_acquire);
+    return c_.head.load(std::memory_order_acquire);
   }
 
   FOLLY_ALWAYS_INLINE Segment* tail() const noexcept {
-    return tail_.load(std::memory_order_acquire);
+    return p_.tail.load(std::memory_order_acquire);
   }
 
   FOLLY_ALWAYS_INLINE Ticket producerTicket() const noexcept {
-    return producerTicket_.load(std::memory_order_acquire);
+    return p_.ticket.load(std::memory_order_acquire);
   }
 
   FOLLY_ALWAYS_INLINE Ticket consumerTicket() const noexcept {
-    return consumerTicket_.load(std::memory_order_acquire);
+    return c_.ticket.load(std::memory_order_acquire);
   }
 
   void setHead(Segment* s) noexcept {
-    head_.store(s, std::memory_order_release);
+    c_.head.store(s, std::memory_order_release);
   }
 
   void setTail(Segment* s) noexcept {
-    tail_.store(s, std::memory_order_release);
+    p_.tail.store(s, std::memory_order_release);
   }
 
   FOLLY_ALWAYS_INLINE void setProducerTicket(Ticket t) noexcept {
-    producerTicket_.store(t, std::memory_order_release);
+    p_.ticket.store(t, std::memory_order_release);
   }
 
   FOLLY_ALWAYS_INLINE void setConsumerTicket(Ticket t) noexcept {
-    consumerTicket_.store(t, std::memory_order_release);
+    c_.ticket.store(t, std::memory_order_release);
   }
 
   FOLLY_ALWAYS_INLINE Ticket fetchIncrementConsumerTicket() noexcept {
@@ -556,7 +613,7 @@ class UnboundedQueue {
       setConsumerTicket(oldval + 1);
       return oldval;
     } else { // MC
-      return consumerTicket_.fetch_add(1, std::memory_order_acq_rel);
+      return c_.ticket.fetch_add(1, std::memory_order_acq_rel);
     }
   }
 
@@ -566,7 +623,7 @@ class UnboundedQueue {
       setProducerTicket(oldval + 1);
       return oldval;
     } else { // MP
-      return producerTicket_.fetch_add(1, std::memory_order_acq_rel);
+      return p_.ticket.fetch_add(1, std::memory_order_acq_rel);
     }
   }
 
@@ -592,7 +649,10 @@ class UnboundedQueue {
     template <typename Clock, typename Duration>
     FOLLY_ALWAYS_INLINE bool tryWaitUntil(
         const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
-      return flag_.try_wait_until(deadline);
+      // wait-options from benchmarks on contended queues:
+      auto const opt =
+          flag_.wait_options().spin_max(std::chrono::microseconds(10));
+      return flag_.try_wait_until(deadline, opt);
     }
 
    private:
@@ -617,8 +677,7 @@ class UnboundedQueue {
     Atom<Segment*> next_;
     const Ticket min_;
     bool marked_; // used for iterative deletion
-    FOLLY_ALIGNED(Align)
-    Entry b_[SegmentSize];
+    alignas(Align) Entry b_[SegmentSize];
 
    public:
     explicit Segment(const Ticket t)