2 * Copyright 2017-present Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
23 #include <glog/logging.h>
25 #include <folly/concurrency/CacheLocality.h>
26 #include <folly/experimental/hazptr/hazptr.h>
27 #include <folly/synchronization/SaturatingSemaphore.h>
31 /// UnboundedQueue supports a variety of options for unbounded
32 /// dynamically expanding and shrinking queues, including variations of:
33 /// - Single vs. multiple producers
34 /// - Single vs. multiple consumers
35 /// - Blocking vs. spin-waiting
36 /// - Non-waiting, timed, and waiting consumer operations.
37 /// Producer operations never wait or fail (unless out-of-memory).
39 /// Template parameters:
41 /// - SingleProducer: true if there can be only one producer at a
43 /// - SingleConsumer: true if there can be only one consumer at a
45 /// - MayBlock: true if consumers may block, false if they only
46 /// spin. A performance tuning parameter.
47 /// - LgSegmentSize (default 8): Log base 2 of number of elements per
48 /// segment. A performance tuning parameter. See below.
49 /// - LgAlign (default 7): Log base 2 of alignment directive; can be
50 /// used to balance scalability (avoidance of false sharing) with
51 /// memory efficiency.
53 /// When to use UnboundedQueue:
54 /// - If a small bound may lead to deadlock or performance degradation
55 /// under bursty patterns.
56 /// - If there is no risk of the queue growing too much.
58 /// When not to use UnboundedQueue:
59 /// - If there is risk of the queue growing too much and a large bound
60 /// is acceptable, then use DynamicBoundedQueue.
61 /// - If the queue must not allocate on enqueue or it must have a
62 /// small bound, then use fixed-size MPMCQueue or (if non-blocking
63 /// SPSC) ProducerConsumerQueue.
66 /// USPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>
67 /// UMPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>
68 /// USPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>
69 /// UMPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>
72 /// Producer operations never wait or fail (unless OOM)
73 /// void enqueue(const T&);
74 /// void enqueue(T&&);
75 /// Adds an element to the end of the queue.
77 /// Consumer operations:
79 /// Extracts an element from the front of the queue. Waits
80 /// until an element is available if needed.
81 /// bool try_dequeue(T&);
82 /// Tries to extract an element from the front of the queue
83 /// if available. Returns true if successful, false otherwise.
84 /// bool try_dequeue_until(T&, time_point& deadline);
85 /// Tries to extract an element from the front of the queue
86 /// if available until the specified deadline. Returns true
87 /// if successful, false otherwise.
88 /// bool try_dequeue_for(T&, duration&);
89 /// Tries to extract an element from the front of the queue if
90 /// one becomes available before the expiration of the specified
91 /// duration. Returns true if successful, false otherwise.
93 /// Secondary functions:
95 /// Returns an estimate of the size of the queue.
97 /// Returns true only if the queue was empty during the call.
98 /// Note: size() and empty() are guaranteed to be accurate only if
99 /// the queue is not changed concurrently.
103 /// /* UMPSC, doesn't block, 1024 int elements per segment */
104 /// UMPSCQueue<int, false, 10> q;
108 /// ASSERT_FALSE(q.empty());
109 /// ASSERT_EQ(q.size(), 3);
113 /// ASSERT_TRUE(q.try_dequeue(v));
115 /// ASSERT_TRUE(q.try_dequeue_until(v, now() + seconds(1)));
117 /// ASSERT_TRUE(q.empty());
118 /// ASSERT_EQ(q.size(), 0);
119 /// ASSERT_FALSE(q.try_dequeue(v));
120 /// ASSERT_FALSE(q.try_dequeue_for(v, microseconds(100)));
124 /// - The queue is composed of one or more segments. Each segment has
125 /// a fixed size of 2^LgSegmentSize entries. Each segment is used
127 /// - Each entry is composed of a futex and a single element.
128 /// - The queue contains two 64-bit ticket variables. The producer
129 /// ticket counts the number of producer tickets issued so far, and
130 /// the same for the consumer ticket. Each ticket number corresponds
131 /// to a specific entry in a specific segment.
132 /// - The queue maintains two pointers, head and tail. Head points to
133 /// the segment that corresponds to the current consumer
134 /// ticket. Similarly, tail pointer points to the segment that
135 /// corresponds to the producer ticket.
136 /// - Segments are organized as a singly linked list.
137 /// - The producer with the first ticket in the current producer
138 /// segment is solely responsible for allocating and linking the
140 /// - The producer with the last ticket in the current producer
141 /// segment is solely responsible for advancing the tail pointer to
142 /// the next segment.
143 /// - Similarly, the consumer with the last ticket in the current
144 /// consumer segment is solely responsible for advancing the head
145 /// pointer to the next segment. It must ensure that head never
149 /// - An empty queue contains one segment. A nonempty queue contains
150 /// one or two more segments than are needed to hold its contents.
151 /// - Removed segments are not reclaimed until no threads,
152 /// producers or consumers, have references to them or their
153 /// predecessors. That is, a lagging thread may delay the reclamation
154 /// of a chain of removed segments.
155 /// - The template parameter LgAlign can be used to reduce memory usage
156 /// at the cost of increased chance of false sharing.
158 /// Performance considerations:
159 /// - All operations take constant time, excluding the costs of
160 /// allocation, reclamation, interference from other threads, and
161 /// waiting for actions by other threads.
162 /// - In general, using the single producer and/or single consumer
163 /// variants yields better performance than the MP and MC
165 /// - SPSC without blocking is the fastest configuration. It doesn't
166 /// include any read-modify-write atomic operations, full fences, or
167 /// system calls in the critical path.
168 /// - MP adds a fetch_add to the critical path of each producer operation.
169 /// - MC adds a fetch_add or compare_exchange to the critical path of
170 /// each consumer operation.
171 /// - The possibility of consumers blocking, even if they never do,
172 /// adds a compare_exchange to the critical path of each producer
174 /// - MPMC, SPMC, MPSC require the use of a deferred reclamation
175 /// mechanism to guarantee that segments removed from the linked
176 /// list, i.e., unreachable from the head pointer, are reclaimed
177 /// only after they are no longer needed by any lagging producers or
179 /// - The overheads of segment allocation and reclamation are intended
180 /// to be mostly out of the critical path of the queue's throughput.
181 /// - If the template parameter LgSegmentSize is changed, it should be
182 /// set adequately high to keep the amortized cost of allocation and
184 /// - Another consideration is that the queue is guaranteed to have
185 /// enough space for a number of consumers equal to 2^LgSegmentSize
186 /// for local blocking. Excess waiting consumers spin.
187 /// - It is recommended to measure performance with different variants
188 /// when applicable, e.g., UMPMC vs UMPSC. Depending on the use
189 /// case, sometimes the variant with the higher sequential overhead
190 /// may yield better results due to, for example, more favorable
191 /// producer-consumer balance or favorable timing for avoiding
199 size_t LgSegmentSize = 8,
201 template <typename> class Atom = std::atomic>
202 class UnboundedQueue {
203 using Ticket = uint64_t;
// True when the queue is configured with both a single producer and a
// single consumer.
207 static constexpr bool SPSC = SingleProducer && SingleConsumer;
// Multiplier that index() applies to a ticket before masking it down to an
// entry slot. It is 1 for SPSC queues and for segments of at most two
// entries, and 27 otherwise. It must be odd (checked by a static_assert
// below); an odd multiplier keeps the ticket-to-slot mapping one-to-one
// within a power-of-two sized segment.
208 static constexpr size_t Stride = SPSC || (LgSegmentSize <= 1) ? 1 : 27;
// Number of entries in each segment: 2^LgSegmentSize.
209 static constexpr size_t SegmentSize = 1u << LgSegmentSize;
// Alignment in bytes, 2^LgAlign, used by the alignas() specifiers in this
// class.
210 static constexpr size_t Align = 1u << LgAlign;
213 std::is_nothrow_destructible<T>::value,
214 "T must be nothrow_destructible");
215 static_assert((Stride & 1) == 1, "Stride must be odd");
216 static_assert(LgSegmentSize < 32, "LgSegmentSize must be < 32");
217 static_assert(LgAlign < 16, "LgAlign must be < 16");
222 folly::hazptr::hazptr_obj_batch batch;
229 alignas(Align) Consumer c_;
230 alignas(Align) Producer p_;
235 setProducerTicket(0);
236 setConsumerTicket(0);
237 Segment* s = new Segment(0);
245 for (Segment* s = head(); s; s = next) {
246 next = s->nextSegment();
252 FOLLY_ALWAYS_INLINE void enqueue(const T& arg) {
256 FOLLY_ALWAYS_INLINE void enqueue(T&& arg) {
257 enqueueImpl(std::move(arg));
261 FOLLY_ALWAYS_INLINE void dequeue(T& item) noexcept {
266 FOLLY_ALWAYS_INLINE bool try_dequeue(T& item) noexcept {
267 return tryDequeueUntil(item, std::chrono::steady_clock::time_point::min());
270 /**
 * try_dequeue_until
 *
 * Tries to extract the element at the front of the queue into `item`,
 * giving up at `deadline`. This is a thin public wrapper: both arguments
 * are passed unchanged to tryDequeueUntil() and its result is returned
 * (true if an element was extracted, false otherwise).
 */
271 template <typename Clock, typename Duration>
272 FOLLY_ALWAYS_INLINE bool try_dequeue_until(
274 const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
275 return tryDequeueUntil(item, deadline);
278 /**
 * try_dequeue_for
 *
 * Tries to extract the element at the front of the queue into `item`,
 * waiting at most `duration`. Returns true if an element was extracted,
 * false otherwise.
 */
279 template <typename Rep, typename Period>
280 FOLLY_ALWAYS_INLINE bool try_dequeue_for(
282 const std::chrono::duration<Rep, Period>& duration) noexcept {
// Start with a non-waiting attempt; this is the path marked as likely.
283 if (LIKELY(try_dequeue(item))) {
// Turn the relative duration into an absolute steady_clock deadline and
// use the deadline-based implementation.
286 return tryDequeueUntil(item, std::chrono::steady_clock::now() + duration);
290 size_t size() const noexcept {
291 auto p = producerTicket();
292 auto c = consumerTicket();
293 return p > c ? p - c : 0;
// Emptiness check based on the two ticket counters. Note the sampling
// order: the consumer ticket is read first and the producer ticket second,
// which is the reverse of the order used by size().
297 bool empty() const noexcept {
298 auto c = consumerTicket();
299 auto p = producerTicket();
305 template <typename Arg>
306 FOLLY_ALWAYS_INLINE void enqueueImpl(Arg&& arg) {
309 enqueueCommon(s, std::forward<Arg>(arg));
311 // Using hazptr_holder instead of hazptr_local because it is
312 // possible that the T ctor happens to use hazard pointers.
313 folly::hazptr::hazptr_holder hptr;
314 Segment* s = hptr.get_protected(p_.tail);
315 enqueueCommon(s, std::forward<Arg>(arg));
// Shared producer path. `s` is the segment the caller obtained from the
// tail pointer and `arg` is the value to store, forwarded as received.
320 template <typename Arg>
321 FOLLY_ALWAYS_INLINE void enqueueCommon(Segment* s, Arg&& arg) {
// Claim the next producer ticket; it identifies the entry to fill.
322 Ticket t = fetchIncrementProducerTicket();
// With multiple producers the ticket may lie beyond segment s, so walk
// forward to the segment that covers it.
323 if (!SingleProducer) {
324 s = findSegment(s, t);
// Debug-only checks that ticket t falls inside segment s.
326 DCHECK_GE(t, s->minTicket());
327 DCHECK_LT(t, s->minTicket() + SegmentSize);
// Map the ticket to its entry within the segment and construct the item
// there.
328 size_t idx = index(t);
329 Entry& e = s->entry(idx);
330 e.putItem(std::forward<Arg>(arg));
// The producer holding the first ticket of a segment allocates and links
// the following segment, whose first ticket is t + SegmentSize.
331 if (responsibleForAlloc(t)) {
332 allocNextSegment(s, t + SegmentSize);
// The producer holding the last ticket of a segment takes the
// tail-advancing branch.
334 if (responsibleForAdvance(t)) {
340 FOLLY_ALWAYS_INLINE void dequeueImpl(T& item) noexcept {
343 dequeueCommon(s, item);
345 // Using hazptr_holder instead of hazptr_local because it is
346 // possible to call the T dtor and it may happen to use hazard
348 folly::hazptr::hazptr_holder hptr;
349 Segment* s = hptr.get_protected(c_.head);
350 dequeueCommon(s, item);
// Shared consumer path for the waiting dequeue. `s` is the segment the
// caller obtained from the head pointer; `item` receives the element.
355 FOLLY_ALWAYS_INLINE void dequeueCommon(Segment* s, T& item) noexcept {
// Claim the next consumer ticket; it identifies the entry to read.
356 Ticket t = fetchIncrementConsumerTicket();
// With multiple consumers the ticket may lie beyond segment s, so walk
// forward to the segment that covers it.
357 if (!SingleConsumer) {
358 s = findSegment(s, t);
// Map the ticket to its entry within the segment.
360 size_t idx = index(t);
361 Entry& e = s->entry(idx);
// The consumer holding the last ticket of a segment takes the
// head-advancing branch.
363 if (responsibleForAdvance(t)) {
368 /** tryDequeueUntil */
369 template <typename Clock, typename Duration>
370 FOLLY_ALWAYS_INLINE bool tryDequeueUntil(
372 const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
373 if (SingleConsumer) {
375 return tryDequeueUntilSC(s, item, deadline);
377 // Using hazptr_holder instead of hazptr_local because it is
378 // possible to call ~T() and it may happen to use hazard pointers.
379 folly::hazptr::hazptr_holder hptr;
380 Segment* s = hptr.get_protected(c_.head);
381 return tryDequeueUntilMC(s, item, deadline);
385 /**
 * tryDequeueUntilSC
 *
 * Single-consumer variant of the timed dequeue. The consumer ticket is
 * read and later advanced with a plain store (no read-modify-write),
 * and only after tryDequeueWaitElem() has been given the chance to
 * report failure for the entry.
 */
386 template <typename Clock, typename Duration>
387 FOLLY_ALWAYS_INLINE bool tryDequeueUntilSC(
390 const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
391 Ticket t = consumerTicket();
// Debug-only checks that ticket t falls inside segment s.
392 DCHECK_GE(t, s->minTicket());
393 DCHECK_LT(t, (s->minTicket() + SegmentSize));
394 size_t idx = index(t);
395 Entry& e = s->entry(idx);
// Failure branch: tryDequeueWaitElem() did not find the element ready.
396 if (UNLIKELY(!tryDequeueWaitElem(e, t, deadline))) {
// Consume the ticket.
399 setConsumerTicket(t + 1);
// The last ticket of a segment takes the head-advancing branch.
401 if (responsibleForAdvance(t)) {
407 /**
 * tryDequeueUntilMC
 *
 * Multi-consumer variant of the timed dequeue. Unlike the waiting
 * dequeue path, the consumer ticket is not claimed up front: it is read,
 * the corresponding entry is waited on, and only then is the ticket
 * claimed with a compare-exchange from t to t + 1.
 */
408 template <typename Clock, typename Duration>
409 FOLLY_ALWAYS_INLINE bool tryDequeueUntilMC(
412 const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
414 Ticket t = consumerTicket();
// The ticket lies beyond segment s: move on to the following segment,
// bounded by the deadline.
415 if (UNLIKELY(t >= (s->minTicket() + SegmentSize))) {
416 s = tryGetNextSegmentUntil(s, deadline);
418 return false; // timed out
422 size_t idx = index(t);
423 Entry& e = s->entry(idx);
// Failure branch: tryDequeueWaitElem() did not find the element ready.
424 if (UNLIKELY(!tryDequeueWaitElem(e, t, deadline))) {
// Try to claim ticket t. compare_exchange_weak may fail spuriously or
// because another consumer changed the ticket; on failure it stores the
// current ticket value into t.
427 if (!c_.ticket.compare_exchange_weak(
428 t, t + 1, std::memory_order_acq_rel, std::memory_order_acquire)) {
// The last ticket of a segment takes the head-advancing branch.
432 if (responsibleForAdvance(t)) {
439 /**
 * tryDequeueWaitElem
 *
 * Waits for entry `e`, which corresponds to consumer ticket `t`, to
 * become ready, bounded by `deadline`.
 */
440 template <typename Clock, typename Duration>
441 FOLLY_ALWAYS_INLINE bool tryDequeueWaitElem(
444 const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
// Likely case: the entry's flag is observed as posted before the deadline.
446 if (LIKELY(e.tryWaitUntil(deadline))) {
// Ticket t has not been issued to any producer yet (the producer ticket
// has not moved past it).
449 if (t >= producerTicket()) {
// Otherwise a producer already holds ticket t; pause briefly.
452 asm_volatile_pause();
// Starting from segment s, follows the segment list forward for as long
// as ticket t lies at or beyond the end of the current segment's ticket
// range. Each step waits with no time limit (time_point::max()), so the
// step is expected to produce a segment, as the DCHECK asserts.
458 Segment* findSegment(Segment* s, const Ticket t) const noexcept {
459 while (UNLIKELY(t >= (s->minTicket() + SegmentSize))) {
460 auto deadline = std::chrono::steady_clock::time_point::max();
461 s = tryGetNextSegmentUntil(s, deadline);
462 DCHECK(s != nullptr);
467 /** tryGetNextSegmentUntil */
468 template <typename Clock, typename Duration>
469 Segment* tryGetNextSegmentUntil(
471 const std::chrono::time_point<Clock, Duration>& deadline) const noexcept {
472 // The following loop will not spin indefinitely (as long as the
473 // number of concurrently waiting consumers does not exceeds
474 // SegmentSize and the OS scheduler does not pause ready threads
475 // indefinitely). Under such conditions, the algorithm guarantees
476 // that the producer reponsible for advancing the tail pointer to
477 // the next segment has already acquired its ticket.
478 while (tail() == s) {
479 if (deadline < Clock::time_point::max() && deadline > Clock::now()) {
482 asm_volatile_pause();
484 Segment* next = s->nextSegment();
485 DCHECK(next != nullptr);
489 /**
 * allocNextSegment
 *
 * Allocates the segment whose first ticket is `t` and links it as the
 * successor of segment `s`. Not noexcept: the allocation may throw.
 */
490 void allocNextSegment(Segment* s, const Ticket t) {
491 Segment* next = new Segment(t);
// Take a reference on the new segment (hazptr reference counting).
493 next->acquire_ref_safe(); // hazptr
// s must not have a successor yet.
495 DCHECK(s->nextSegment() == nullptr);
// Publish the link; setNextSegment() stores with release ordering.
496 s->setNextSegment(next);
// Producer-side step taken by the producer holding the last ticket of
// segment s: obtains the successor of s, which in the multi-producer case
// may not have been linked yet by the producer responsible for allocating
// it.
500 void advanceTail(Segment* s) noexcept {
501 Segment* next = s->nextSegment();
502 if (!SingleProducer) {
503 // The following loop will not spin indefinitely (as long as the
504 // OS scheduler does not pause ready threads indefinitely). The
505 // algorithm guarantees that the producer responsible for setting
506 // the next pointer has already acquired its ticket.
507 while (next == nullptr) {
508 asm_volatile_pause();
509 next = s->nextSegment();
512 DCHECK(next != nullptr);
// Consumer-side step taken by the consumer holding the last ticket of
// segment s. It obtains the successor of s (waiting without a time limit),
// waits until head actually points at s, and prepares s for reclamation
// via prepReclaimSegment(). The ordering constraints are spelled out in
// the IMPORTANT notes below.
517 void advanceHead(Segment* s) noexcept {
518 auto deadline = std::chrono::steady_clock::time_point::max();
519 Segment* next = tryGetNextSegmentUntil(s, deadline);
520 DCHECK(next != nullptr);
521 while (head() != s) {
522 // Wait for head to advance to the current segment first before
523 // advancing head to the next segment. Otherwise, a lagging
524 // consumer responsible for advancing head from an earlier
525 // segment may incorrectly set head back.
526 asm_volatile_pause();
528 /* ***IMPORTANT*** prepReclaimSegment() must be called after
529 * confirming that head() is up-to-date and before calling
530 * setHead() to be thread-safe. */
531 /* ***IMPORTANT*** Segment s cannot be retired before the call to
532 * setHead(s). This is why prep_retire_refcounted(), which is
533 * called by prepReclaimSegment() does not retire objects, it
534 * merely adds the object to the batch and returns a private batch
535 * structure of a list of objects that can be retired later, if
536 * there are enough objects for amortizing the cost of updating
537 * the domain structure. */
538 auto res = prepReclaimSegment(s);
540 /* Now it is safe to retire s. */
541 /* ***IMPORTANT*** The destructor of res automatically calls
542 * retire_all(), which retires to the domain any objects moved to
543 * res from batch in the call to prepReclaimSegment(). */
546 /** reclaimSegment */
547 void reclaimSegment(Segment* s) noexcept {
551 s->retire(); // hazptr
555 /** prepReclaimSegment */
556 folly::hazptr::hazptr_obj_batch prepReclaimSegment(Segment* s) noexcept {
559 /*Return an empty result; nothing more to do for this segment */
560 return folly::hazptr::hazptr_obj_batch();
562 return c_.batch.prep_retire_refcounted(s);
566 FOLLY_ALWAYS_INLINE size_t index(Ticket t) const noexcept {
567 return (t * Stride) & (SegmentSize - 1);
570 FOLLY_ALWAYS_INLINE bool responsibleForAlloc(Ticket t) const noexcept {
571 return (t & (SegmentSize - 1)) == 0;
574 FOLLY_ALWAYS_INLINE bool responsibleForAdvance(Ticket t) const noexcept {
575 return (t & (SegmentSize - 1)) == (SegmentSize - 1);
578 FOLLY_ALWAYS_INLINE Segment* head() const noexcept {
579 return c_.head.load(std::memory_order_acquire);
582 FOLLY_ALWAYS_INLINE Segment* tail() const noexcept {
583 return p_.tail.load(std::memory_order_acquire);
586 FOLLY_ALWAYS_INLINE Ticket producerTicket() const noexcept {
587 return p_.ticket.load(std::memory_order_acquire);
590 FOLLY_ALWAYS_INLINE Ticket consumerTicket() const noexcept {
591 return c_.ticket.load(std::memory_order_acquire);
594 void setHead(Segment* s) noexcept {
595 c_.head.store(s, std::memory_order_release);
598 void setTail(Segment* s) noexcept {
599 p_.tail.store(s, std::memory_order_release);
602 FOLLY_ALWAYS_INLINE void setProducerTicket(Ticket t) noexcept {
603 p_.ticket.store(t, std::memory_order_release);
606 FOLLY_ALWAYS_INLINE void setConsumerTicket(Ticket t) noexcept {
607 c_.ticket.store(t, std::memory_order_release);
// Advances the consumer ticket by one.
610 FOLLY_ALWAYS_INLINE Ticket fetchIncrementConsumerTicket() noexcept {
// Single consumer: no other thread writes this counter, so a separate
// load followed by a store is used instead of an atomic read-modify-write.
611 if (SingleConsumer) {
612 Ticket oldval = consumerTicket();
613 setConsumerTicket(oldval + 1);
// Multiple consumers: atomic fetch_add, which returns the value held
// before the increment.
616 return c_.ticket.fetch_add(1, std::memory_order_acq_rel);
// Advances the producer ticket by one.
620 FOLLY_ALWAYS_INLINE Ticket fetchIncrementProducerTicket() noexcept {
// Single producer: no other thread writes this counter, so a separate
// load followed by a store is used instead of an atomic read-modify-write.
621 if (SingleProducer) {
622 Ticket oldval = producerTicket();
623 setProducerTicket(oldval + 1);
// Multiple producers: atomic fetch_add, which returns the value held
// before the increment.
626 return p_.ticket.fetch_add(1, std::memory_order_acq_rel);
634 folly::SaturatingSemaphore<MayBlock, Atom> flag_;
635 typename std::aligned_storage<sizeof(T), alignof(T)>::type item_;
638 template <typename Arg>
639 FOLLY_ALWAYS_INLINE void putItem(Arg&& arg) {
640 new (&item_) T(std::forward<Arg>(arg));
644 FOLLY_ALWAYS_INLINE void takeItem(T& item) noexcept {
// Waits on this entry's flag_ until it is posted or `deadline` is
// reached, and returns the result of flag_.try_wait_until().
649 template <typename Clock, typename Duration>
650 FOLLY_ALWAYS_INLINE bool tryWaitUntil(
651 const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
652 // wait-options from benchmarks on contended queues:
// Start from the flag's own wait options and set the maximum spin time
// to 10 microseconds.
654 flag_.wait_options().spin_max(std::chrono::microseconds(10));
655 return flag_.try_wait_until(deadline, opt);
659 FOLLY_ALWAYS_INLINE void getItem(T& item) noexcept {
660 item = std::move(*(itemPtr()));
664 FOLLY_ALWAYS_INLINE T* itemPtr() noexcept {
665 return static_cast<T*>(static_cast<void*>(&item_));
668 FOLLY_ALWAYS_INLINE void destroyItem() noexcept {
676 class Segment : public folly::hazptr::hazptr_obj_base_refcounted<Segment> {
677 Atom<Segment*> next_;
679 bool marked_; // used for iterative deletion
680 alignas(Align) Entry b_[SegmentSize];
683 explicit Segment(const Ticket t)
684 : next_(nullptr), min_(t), marked_(false) {}
687 if (!SPSC && !marked_) {
688 Segment* next = nextSegment();
690 if (!next->release_ref()) { // hazptr
694 next = s->nextSegment();
701 Segment* nextSegment() const noexcept {
702 return next_.load(std::memory_order_acquire);
705 void setNextSegment(Segment* s) noexcept {
706 next_.store(s, std::memory_order_release);
// Accessor for the first ticket covered by this segment (min_).
709 FOLLY_ALWAYS_INLINE Ticket minTicket() const noexcept {
// Debug-only check: a segment's first ticket is a multiple of SegmentSize.
710 DCHECK_EQ((min_ & (SegmentSize - 1)), 0);
714 FOLLY_ALWAYS_INLINE Entry& entry(size_t index) noexcept {
726 size_t LgSegmentSize = 8,
728 template <typename> class Atom = std::atomic>
730 UnboundedQueue<T, true, true, MayBlock, LgSegmentSize, LgAlign, Atom>;
735 size_t LgSegmentSize = 8,
737 template <typename> class Atom = std::atomic>
739 UnboundedQueue<T, false, true, MayBlock, LgSegmentSize, LgAlign, Atom>;
744 size_t LgSegmentSize = 8,
746 template <typename> class Atom = std::atomic>
748 UnboundedQueue<T, true, false, MayBlock, LgSegmentSize, LgAlign, Atom>;
753 size_t LgSegmentSize = 8,
755 template <typename> class Atom = std::atomic>
757 UnboundedQueue<T, false, false, MayBlock, LgSegmentSize, LgAlign, Atom>;