2 * Copyright 2017-present Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/concurrency/CacheLocality.h>
20 #include <folly/concurrency/UnboundedQueue.h>
22 #include <glog/logging.h>
29 /// DynamicBoundedQueue supports:
30 /// - Dynamic memory usage that grows and shrinks in proportion to the
31 /// number of elements in the queue.
32 /// - Adjustable capacity that helps throttle pathological cases of
33 /// producer-consumer imbalance that may lead to excessive memory
35 /// - The adjustable capacity can also help prevent deadlock by
36 /// allowing users to temporarily increase capacity substantially to
37 /// guarantee accommodating producer requests that cannot wait.
38 /// - SPSC, SPMC, MPSC, MPMC variants.
39 /// - Blocking and spinning-only variants.
40 /// - Inter-operable non-waiting, timed until, timed for, and waiting
41 /// variants of producer and consumer operations.
42 /// - Optional variable element weights.
45 /// - Queue elements may have variable weights (calculated using a
46 /// template parameter) that are by default 1.
47 /// - Element weights count towards the queue's capacity.
48 /// - Element weights are not priorities and do not affect element
49 /// order. Queues with variable element weights follow FIFO order,
50 /// the same as default queues.
52 /// When to use DynamicBoundedQueue:
53 /// - If a small maximum capacity may lead to deadlock or performance
54 /// degradation under bursty patterns and a larger capacity is
56 /// - If the typical queue size is expected to be much lower than the
58 /// - If an unbounded queue is susceptible to growing too much.
59 /// - If support for variable element weights is needed.
61 /// When not to use DynamicBoundedQueue?
62 /// - If dynamic memory allocation is unacceptable or if the maximum
63 /// capacity needs to be small, then use fixed-size MPMCQueue or (if
64 /// non-blocking SPSC) ProducerConsumerQueue.
65 /// - If there is no risk of the queue growing too much, then use
69 /// - The general rule is to set the capacity as high as acceptable.
70 /// The queue performs best when it is not near full capacity.
71 /// - The implementation may allow extra slack in capacity (~10%) for
72 /// amortizing some costly steps. Therefore, precise capacity is not
73 /// guaranteed and cannot be relied on for synchronization; i.e.,
74 /// this queue cannot be used as a semaphore.
76 /// Performance expectations:
77 /// - As long as the queue size is below capacity in the common case,
78 /// performance is comparable to MPMCQueue and better in cases of
79 /// higher producer demand.
80 /// - Performance degrades gracefully at full capacity.
81 /// - It is recommended to measure performance with different variants
82 /// when applicable, e.g., DMPMC vs DMPSC. Depending on the use
83 /// case, sometimes the variant with the higher sequential overhead
84 /// may yield better results due to, for example, more favorable
85 /// producer-consumer balance or favorable timing for avoiding
87 /// - See DynamicBoundedQueueTest.cpp for some benchmark results.
89 /// Template parameters:
91 /// - SingleProducer: true if there can be only one producer at a
93 /// - SingleConsumer: true if there can be only one consumer at a
95 /// - MayBlock: true if producers or consumers may block.
96 /// - LgSegmentSize (default 8): Log base 2 of number of elements per
97 /// UnboundedQueue segment.
98 /// - LgAlign (default 7): Log base 2 of alignment directive; can be
99 /// used to balance scalability (avoidance of false sharing) with
100 /// memory efficiency.
101 /// - WeightFn (DefaultWeightFn<T>): A customizable weight computing type
102 /// for computing the weights of elements. The default weight is 1.
104 /// Template Aliases:
105 /// DSPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>
106 /// DMPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>
107 /// DSPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>
108 /// DMPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>
112 /// Takes a capacity value as an argument.
114 /// Producer functions:
115 /// void enqueue(const T&);
116 /// void enqueue(T&&);
117 /// Adds an element to the end of the queue. Waits until
118 /// capacity is available if necessary.
119 /// bool try_enqueue(const T&);
120 /// bool try_enqueue(T&&);
121 /// Tries to add an element to the end of the queue if
122 /// capacity allows it. Returns true if successful. Otherwise
124 /// bool try_enqueue_until(const T&, time_point& deadline);
125 /// bool try_enqueue_until(T&&, time_point& deadline);
126 /// Tries to add an element to the end of the queue if
127 /// capacity allows it until the specified deadline. Returns
128 /// true if successful, otherwise false.
129 /// bool try_enqueue_for(const T&, duration&);
130 /// bool try_enqueue_for(T&&, duration&);
131 /// Tries to add an element to the end of the queue if
132 /// capacity allows until the expiration of the specified
133 /// duration. Returns true if successful, otherwise false.
135 /// Consumer functions:
136 /// void dequeue(T&);
137 /// Extracts an element from the front of the queue. Waits
138 /// until an element is available if necessary.
139 /// bool try_dequeue(T&);
140 /// Tries to extract an element from the front of the queue
141 /// if available. Returns true if successful, otherwise false.
142 /// bool try_dequeue_until(T&, time_point& deadline);
143 /// Tries to extract an element from the front of the queue
144 /// if available until the specified deadline. Returns true
145 /// if successful. Otherwise returns false.
146 /// bool try_dequeue_for(T&, duration&);
147 /// Tries to extract an element from the front of the queue
148 /// if available until the expiration of the specified
149 /// duration. Returns true if successful. Otherwise returns
152 /// Secondary functions:
153 /// void reset_capacity(size_t capacity);
154 /// Changes the capacity of the queue. Does not affect the
155 /// current contents of the queue. Guaranteed only to affect
156 /// subsequent enqueue operations. May or may not affect
157 /// concurrent operations. Capacity must be at least 1000.
159 /// Returns an estimate of the total weight of the elements in
162 /// Returns an estimate of the total number of elements.
164 /// Returns true only if the queue was empty during the call.
165 /// Note: weight(), size(), and empty() are guaranteed to be
166 /// accurate only if there are no concurrent changes to the queue.
168 /// Usage example with default weight:
170 /// /* DMPSC, doesn't block, 1024 int elements per segment */
171 /// DMPSCQueue<int, false, 10> q(100000);
172 /// ASSERT_TRUE(q.empty());
173 /// ASSERT_EQ(q.size(), 0);
175 /// ASSERT_TRUE(q.try_enqueue(2));
176 /// ASSERT_TRUE(q.try_enqueue_until(3, deadline));
177 /// ASSERT_TRUE(q.try_enqueue_for(4, duration));
178 /// // ... enqueue more elements until capacity is full
179 /// // See above comments about imprecise capacity guarantees
180 /// ASSERT_FALSE(q.try_enqueue(100001)); // can't enqueue but can't wait
181 /// size_t sz = q.size();
182 /// ASSERT_GE(sz, 100000);
183 /// q.reset_capacity(1000000000); // set huge capacity
184 /// ASSERT_TRUE(q.try_enqueue(100001)); // now enqueue succeeds
185 /// q.reset_capacity(100000); // set capacity back to 100,000
186 /// ASSERT_FALSE(q.try_enqueue(100002));
187 /// ASSERT_EQ(q.size(), sz + 1);
191 /// ASSERT_TRUE(q.try_dequeue(v));
193 /// ASSERT_TRUE(q.try_dequeue_until(v, deadline));
195 /// ASSERT_TRUE(q.try_dequeue_for(v, duration));
197 /// ASSERT_EQ(q.size(), sz - 3);
200 /// Usage example with custom weights:
202 /// struct CustomWeightFn {
203 /// uint64_t operator()(int val) { return val / 100; }
205 /// DMPMCQueue<int, false, 10, CustomWeightFn> q(20);
206 /// ASSERT_TRUE(q.empty());
208 /// ASSERT_TRUE(q.try_enqueue(200));
209 /// ASSERT_TRUE(q.try_enqueue_until(500, now() + seconds(1)));
210 /// ASSERT_EQ(q.size(), 3);
211 /// ASSERT_EQ(q.weight(), 8);
212 /// ASSERT_FALSE(q.try_enqueue_for(1700, microseconds(1)));
213 /// q.reset_capacity(1000000); // set capacity to 1000000 instead of 20
214 /// ASSERT_TRUE(q.try_enqueue_for(1700, microseconds(1)));
215 /// q.reset_capacity(20); // set capacity to 20 again
216 /// ASSERT_FALSE(q.try_enqueue(100));
217 /// ASSERT_EQ(q.size(), 4);
218 /// ASSERT_EQ(q.weight(), 25);
221 /// ASSERT_EQ(v, 100);
222 /// ASSERT_TRUE(q.try_dequeue(v));
223 /// ASSERT_EQ(v, 200);
224 /// ASSERT_TRUE(q.try_dequeue_until(v, now() + seconds(1)));
225 /// ASSERT_EQ(v, 500);
226 /// ASSERT_EQ(q.size(), 1);
227 /// ASSERT_EQ(q.weight(), 17);
231 /// - The implementation is on top of UnboundedQueue.
232 /// - The main FIFO functionality is in UnboundedQueue.
233 /// DynamicBoundedQueue manages keeping the total queue weight
234 /// within the specified capacity.
235 /// - For the sake of scalability, the data structures are designed to
236 /// minimize interference between producers on one side and
237 /// consumers on the other.
238 /// - Producers add to a debit variable the weight of the added
239 /// element and check capacity.
240 /// - Consumers add to a credit variable the weight of the removed
242 /// - Producers, for the sake of scalability, use fetch_add to add to
243 /// the debit variable and subtract if it exceeded capacity,
244 /// rather than using compare_exchange to avoid overshooting.
245 /// - Consumers, infrequently, transfer credit to a transfer variable
246 /// and unblock any blocked producers. The transfer variable can be
247 /// used by producers to decrease their debit when needed.
248 /// - Note that a low capacity will trigger frequent credit transfer
249 /// by consumers that may degrade performance. Capacity should not
251 /// - Transfer of credit by consumers is triggered when the amount of
252 /// credit reaches a threshold (1/10 of capacity).
253 /// - The waiting of consumers is handled in UnboundedQueue.
254 /// The waiting of producers is handled in this template.
255 /// - For a producer operation, if the difference between debit and
256 /// capacity (plus some slack to account for the transfer threshold)
257 /// does not accommodate the weight of the new element, it first
258 /// tries to transfer credit that may have already been made
259 /// available by consumers. If this is insufficient and MayBlock is
260 /// true, then the producer uses a futex to block until new credit
261 /// is transferred by a consumer.
264 /// - Aside from three cache lines for managing capacity, the memory
265 /// for queue elements is managed using UnboundedQueue and grows and
266 /// shrinks dynamically with the number of elements.
267 /// - The template parameter LgAlign can be used to reduce memory usage
268 /// at the cost of increased chance of false sharing.
// Default element-weight functor: assigns every element the same unit
// weight (the header docs above state the default weight is 1),
// regardless of the argument's value or type.
// NOTE(review): the return statement and closing braces (original
// lines 274-276) fall on lines missing from this extraction.
270 template <typename T>
271 struct DefaultWeightFn {
272 template <typename Arg>
273 uint64_t operator()(Arg&&) const noexcept {
// NOTE(review): this extraction is lossy -- the embedded original line
// numbers skip (e.g., 288 -> 290, 505 -> 514), so some statements,
// returns, enumerators, and closing braces are not visible. All code
// lines below are preserved byte-for-byte; only comments were added.
283 size_t LgSegmentSize = 8,
285 typename WeightFn = DefaultWeightFn<T>,
286 template <typename> class Atom = std::atomic>
287 class DynamicBoundedQueue {
288 using Weight = uint64_t;
// Futex word states for producers blocked on full capacity; the
// enumerator definitions (WAITING / NOTWAITING values) are on lines
// missing from this view -- both names are referenced below.
290 enum WaitingState : uint32_t {
295 static constexpr bool SPSC = SingleProducer && SingleConsumer;
296 static constexpr size_t Align = 1u << LgAlign;
298 static_assert(LgAlign < 16, "LgAlign must be < 16");
// Layout: producer-side, consumer-side, and shared words each get
// their own alignas(Align) group to limit false sharing between sides.
302 // Read mostly by producers
303 alignas(Align) Atom<Weight> debit_; // written frequently only by producers
304 Atom<Weight> capacity_; // written rarely by capacity resets
306 // Read mostly by consumers
307 alignas(Align) Atom<Weight> credit_; // written frequently only by consumers
308 Atom<Weight> threshold_; // written rarely only by capacity resets
310 // Normally written and read rarely by producers and consumers
311 // May be read frequently by producers when capacity is full
312 alignas(Align) Atom<Weight> transfer_;
313 detail::Futex<Atom> waiting_;
315 // Underlying unbounded queue
// Constructor: capacity_ is padded by threshold(capacity) (~10% slack,
// matching the "imprecise capacity" caveat in the header docs). Part of
// the member-initializer list is on lines missing from this view.
328 explicit DynamicBoundedQueue(Weight capacity)
330 capacity_(capacity + threshold(capacity)), // capacity slack
332 threshold_(threshold(capacity)),
337 ~DynamicBoundedQueue() {}
339 /// Enqueue functions
// Blocking enqueue: forwards to enqueueImpl, which uses a deadline of
// time_point::max() (i.e., waits indefinitely for capacity).
342 FOLLY_ALWAYS_INLINE void enqueue(const T& v) {
346 FOLLY_ALWAYS_INLINE void enqueue(T&& v) {
347 enqueueImpl(std::move(v));
// Non-waiting enqueue: forwards to tryEnqueueImpl, which uses a
// deadline of time_point::min() (i.e., fails immediately if full).
351 FOLLY_ALWAYS_INLINE bool try_enqueue(const T& v) {
352 return tryEnqueueImpl(v);
355 FOLLY_ALWAYS_INLINE bool try_enqueue(T&& v) {
356 return tryEnqueueImpl(std::move(v));
359 /** try_enqueue_until */
360 template <typename Clock, typename Duration>
361 FOLLY_ALWAYS_INLINE bool try_enqueue_until(
363 const std::chrono::time_point<Clock, Duration>& deadline) {
364 return tryEnqueueUntilImpl(v, deadline);
367 template <typename Clock, typename Duration>
368 FOLLY_ALWAYS_INLINE bool try_enqueue_until(
370 const std::chrono::time_point<Clock, Duration>& deadline) {
371 return tryEnqueueUntilImpl(std::move(v), deadline);
374 /** try_enqueue_for */
375 template <typename Rep, typename Period>
376 FOLLY_ALWAYS_INLINE bool try_enqueue_for(
378 const std::chrono::duration<Rep, Period>& duration) {
379 return tryEnqueueForImpl(v, duration);
382 template <typename Rep, typename Period>
383 FOLLY_ALWAYS_INLINE bool try_enqueue_for(
385 const std::chrono::duration<Rep, Period>& duration) {
386 return tryEnqueueForImpl(std::move(v), duration);
389 /// Dequeue functions
// Blocking dequeue: the wait happens inside the underlying
// UnboundedQueue q_; afterwards the removed element's weight is
// credited back so producers can reclaim capacity.
392 FOLLY_ALWAYS_INLINE void dequeue(T& elem) {
394 addCredit(WeightFn()(elem));
// Each try_dequeue* variant credits the element's weight only on a
// successful extraction; the return statements sit on missing lines.
398 FOLLY_ALWAYS_INLINE bool try_dequeue(T& elem) {
399 if (q_.try_dequeue(elem)) {
400 addCredit(WeightFn()(elem));
406 /** try_dequeue_until */
407 template <typename Clock, typename Duration>
408 FOLLY_ALWAYS_INLINE bool try_dequeue_until(
410 const std::chrono::time_point<Clock, Duration>& deadline) {
411 if (q_.try_dequeue_until(elem, deadline)) {
412 addCredit(WeightFn()(elem));
418 /** try_dequeue_for */
419 template <typename Rep, typename Period>
420 FOLLY_ALWAYS_INLINE bool try_dequeue_for(
422 const std::chrono::duration<Rep, Period>& duration) {
423 if (q_.try_dequeue_for(elem, duration)) {
424 addCredit(WeightFn()(elem));
430 /// Secondary functions
432 /** reset_capacity */
// Stores the new padded capacity and threshold; per the header docs,
// only subsequent enqueues are guaranteed to observe the change.
433 void reset_capacity(Weight capacity) noexcept {
434 Weight thresh = threshold(capacity);
435 capacity_.store(capacity + thresh, std::memory_order_release);
436 threshold_.store(thresh, std::memory_order_release);
// weight(): estimate = debit - (credit + transfer), clamped at zero.
// NOTE(review): the load of debit into `d` (presumably
// `auto d = getDebit();`, original line 441) is on a missing line.
440 Weight weight() const noexcept {
442 auto c = getCredit();
443 auto t = getTransfer();
444 return d > (c + t) ? d - (c + t) : 0;
// size()/empty(): delegated bodies are on missing lines; per the
// header docs they are accurate only absent concurrent changes.
448 size_t size() const noexcept {
453 bool empty() const noexcept {
458 /// Private functions ///
460 // Calculation of threshold to move credits in bulk from consumers
462 constexpr Weight threshold(Weight capacity) const noexcept {
463 return capacity / 10;
466 // Functions called frequently by producers
// Waiting enqueue = try-until with an unbounded (max) deadline.
468 template <typename Arg>
469 FOLLY_ALWAYS_INLINE void enqueueImpl(Arg&& v) {
471 std::forward<Arg>(v), std::chrono::steady_clock::time_point::max());
// Non-waiting enqueue = try-until with an already-expired (min)
// deadline, so the slow path cannot block.
474 template <typename Arg>
475 FOLLY_ALWAYS_INLINE bool tryEnqueueImpl(Arg&& v) {
476 return tryEnqueueUntilImpl(
477 std::forward<Arg>(v), std::chrono::steady_clock::time_point::min());
// Fast path: one optimistic debit add; on success the element goes
// straight into q_. Otherwise fall through to the slow path.
480 template <typename Clock, typename Duration, typename Arg>
481 FOLLY_ALWAYS_INLINE bool tryEnqueueUntilImpl(
483 const std::chrono::time_point<Clock, Duration>& deadline) {
484 Weight weight = WeightFn()(std::forward<Arg>(v));
485 if (LIKELY(tryAddDebit(weight))) {
486 q_.enqueue(std::forward<Arg>(v));
489 return tryEnqueueUntilSlow(std::forward<Arg>(v), deadline);
// try_enqueue_for: attempt the non-waiting path first; only compute a
// deadline (a steady_clock read) if that fails.
492 template <typename Rep, typename Period, typename Arg>
493 FOLLY_ALWAYS_INLINE bool tryEnqueueForImpl(
495 const std::chrono::duration<Rep, Period>& duration) {
496 if (LIKELY(tryEnqueueImpl(std::forward<Arg>(v)))) {
499 auto deadline = std::chrono::steady_clock::now() + duration;
500 return tryEnqueueUntilSlow(std::forward<Arg>(v), deadline);
// Optimistic capacity check: fetch_add the weight first (per the
// header docs, to avoid compare_exchange), succeed if the pre-add
// debit plus weight fits under capacity. The over-capacity rollback
// path (original lines 507-513, presumably subtracting the debit
// back) is on lines missing from this view.
503 FOLLY_ALWAYS_INLINE bool tryAddDebit(Weight weight) noexcept {
504 Weight capacity = getCapacity();
505 Weight before = fetchAddDebit(weight);
506 if (LIKELY(before + weight <= capacity)) {
514 FOLLY_ALWAYS_INLINE Weight getCapacity() const noexcept {
515 return capacity_.load(std::memory_order_acquire);
// SP variant avoids the RMW: a single producer can load+store
// relaxed because only it writes debit_ (see member comment above).
518 FOLLY_ALWAYS_INLINE Weight fetchAddDebit(Weight weight) noexcept {
520 if (SingleProducer) {
522 debit_.store(before + weight, std::memory_order_relaxed);
524 before = debit_.fetch_add(weight, std::memory_order_acq_rel);
529 FOLLY_ALWAYS_INLINE Weight getDebit() const noexcept {
530 return debit_.load(std::memory_order_acquire);
533 // Functions called frequently by consumers
// Accumulate consumer credit; the threshold-crossing test fires at
// most once per crossing (before < thresh <= before + weight). The
// action taken on crossing (presumably transferCredit(), original
// line ~539) is on a missing line.
535 FOLLY_ALWAYS_INLINE void addCredit(Weight weight) noexcept {
536 Weight before = fetchAddCredit(weight);
537 Weight thresh = getThreshold();
538 if (before + weight >= thresh && before < thresh) {
// SC variant avoids the RMW, mirroring fetchAddDebit for producers.
543 FOLLY_ALWAYS_INLINE Weight fetchAddCredit(Weight weight) noexcept {
545 if (SingleConsumer) {
546 before = getCredit();
547 credit_.store(before + weight, std::memory_order_relaxed);
549 before = credit_.fetch_add(weight, std::memory_order_acq_rel);
554 FOLLY_ALWAYS_INLINE Weight getCredit() const noexcept {
555 return credit_.load(std::memory_order_acquire);
558 FOLLY_ALWAYS_INLINE Weight getThreshold() const noexcept {
559 return threshold_.load(std::memory_order_acquire);
562 /** Functions called infrequently by producers */
// Subtract weight from debit_ (used when an optimistic add must be
// undone or when consuming transferred credit); the DCHECK guards
// against underflow.
564 void subDebit(Weight weight) noexcept {
566 if (SingleProducer) {
568 debit_.store(before - weight, std::memory_order_relaxed);
570 before = debit_.fetch_sub(weight, std::memory_order_acq_rel);
572 DCHECK_GE(before, weight);
// Slow path: re-check capacity (possibly waiting) via canEnqueue,
// then enqueue. The failure return is on a missing line.
575 template <typename Clock, typename Duration, typename Arg>
576 bool tryEnqueueUntilSlow(
578 const std::chrono::time_point<Clock, Duration>& deadline) {
579 Weight weight = WeightFn()(std::forward<Arg>(v));
580 if (canEnqueue(deadline, weight)) {
581 q_.enqueue(std::forward<Arg>(v));
// canEnqueue (its signature line, original 589, is missing): loop
// that retries the debit add, gives up past the deadline, and either
// futex-blocks (when canBlock says capacity is still exceeded) or
// spins with a pause instruction.
588 template <typename Clock, typename Duration>
590 const std::chrono::time_point<Clock, Duration>& deadline,
591 Weight weight) noexcept {
592 Weight capacity = getCapacity();
595 Weight debit = getDebit();
596 if ((debit + weight <= capacity) && tryAddDebit(weight)) {
599 if (Clock::now() >= deadline) {
603 if (canBlock(weight, capacity)) {
604 waiting_.futexWaitUntil(WAITING, deadline);
607 asm_volatile_pause();
// Publish WAITING, then re-check debit after a seq_cst fence --
// appears to pair with the fence in transferCredit() so a concurrent
// credit transfer cannot miss a producer about to block (store/fence/
// load on one side vs. add/fence/store+wake on the other).
612 bool canBlock(Weight weight, Weight capacity) noexcept {
613 waiting_.store(WAITING, std::memory_order_relaxed);
614 std::atomic_thread_fence(std::memory_order_seq_cst);
616 Weight debit = getDebit();
617 return debit + weight > capacity;
// Drain transferred credit into a debit reduction; the subDebit call
// and result handling (original lines 622-627) are on missing lines.
620 bool tryReduceDebit() noexcept {
621 Weight w = takeTransfer();
// Atomically claim the transfer_ balance (exchange with 0); the SPSC
// shortcut and return (original lines 630, 632-635) are not visible.
628 Weight takeTransfer() noexcept {
629 Weight w = getTransfer();
631 w = transfer_.exchange(0, std::memory_order_acq_rel);
636 Weight getTransfer() const noexcept {
637 return transfer_.load(std::memory_order_acquire);
640 /** Functions called infrequently by consumers */
// Move accumulated consumer credit into transfer_ (visible to
// producers), then clear the wait flag and wake blocked producers.
642 void transferCredit() noexcept {
643 Weight credit = takeCredit();
644 transfer_.fetch_add(credit, std::memory_order_acq_rel);
646 std::atomic_thread_fence(std::memory_order_seq_cst);
647 waiting_.store(NOTWAITING, std::memory_order_relaxed);
648 waiting_.futexWake();
// Claim and zero credit_; the SC variant uses plain load/store since
// only one consumer writes credit_ (see member comment above).
652 Weight takeCredit() noexcept {
654 if (SingleConsumer) {
655 credit = credit_.load(std::memory_order_relaxed);
656 credit_.store(0, std::memory_order_relaxed);
658 credit = credit_.exchange(0, std::memory_order_acq_rel);
663 }; // DynamicBoundedQueue
// Alias: dynamic single-producer single-consumer queue. Per the header
// docs: DSPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>.
// NOTE(review): the leading template parameters and the trailing
// DynamicBoundedQueue<...> argument list (presumably fixing
// SingleProducer=true, SingleConsumer=true) are on missing lines.
671 size_t LgSegmentSize = 8,
673 typename WeightFn = DefaultWeightFn<T>,
674 template <typename> class Atom = std::atomic>
675 using DSPSCQueue = DynamicBoundedQueue<
// Alias: dynamic multi-producer single-consumer queue. Per the header
// docs: DMPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>.
// NOTE(review): the leading template parameters and the trailing
// DynamicBoundedQueue<...> argument list (presumably fixing
// SingleProducer=false, SingleConsumer=true) are on missing lines.
689 size_t LgSegmentSize = 8,
691 typename WeightFn = DefaultWeightFn<T>,
692 template <typename> class Atom = std::atomic>
693 using DMPSCQueue = DynamicBoundedQueue<
// Alias: dynamic single-producer multi-consumer queue. Per the header
// docs: DSPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>.
// NOTE(review): the leading template parameters and the trailing
// DynamicBoundedQueue<...> argument list (presumably fixing
// SingleProducer=true, SingleConsumer=false) are on missing lines.
707 size_t LgSegmentSize = 8,
709 typename WeightFn = DefaultWeightFn<T>,
710 template <typename> class Atom = std::atomic>
711 using DSPMCQueue = DynamicBoundedQueue<
// Alias: dynamic multi-producer multi-consumer queue. Per the header
// docs: DMPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>.
// NOTE(review): the leading template parameters and the trailing
// DynamicBoundedQueue<...> argument list (presumably fixing
// SingleProducer=false, SingleConsumer=false) are on missing lines.
725 size_t LgSegmentSize = 8,
727 typename WeightFn = DefaultWeightFn<T>,
728 template <typename> class Atom = std::atomic>
729 using DMPMCQueue = DynamicBoundedQueue<