--- /dev/null
+/*
+ * Copyright 2017-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/concurrency/CacheLocality.h>
+#include <folly/concurrency/UnboundedQueue.h>
+
+#include <glog/logging.h>
+
+#include <atomic>
+#include <chrono>
+
+namespace folly {
+
+/// DynamicBoundedQueue supports:
+
+/// - Dynamic memory usage that grows and shrink in proportion to the
+/// number of elements in the queue.
+/// - Adjustable capacity that helps throttle pathological cases of
+/// producer-consumer imbalance that may lead to excessive memory
+/// usage.
+/// - The adjustable capacity can also help prevent deadlock by
+/// allowing users to temporarily increase capacity substantially to
+/// guarantee accommodating producer requests that cannot wait.
+/// - SPSC, SPMC, MPSC, MPMC variants.
+/// - Blocking and spinning-only variants.
+/// - Inter-operable non-waiting, timed until, timed for, and waiting
+/// variants of producer and consumer operations.
+/// - Optional variable element weights.
+///
+/// Element Weights
+/// - Queue elements may have variable weights (calculated using a
+/// template parameter) that are by default 1.
+/// - Element weights count towards the queue's capacity.
+/// - Elements weights are not priorities and do not affect element
+/// order. Queues with variable element weights follow FIFO order,
+/// the same as default queues.
+///
+/// When to use DynamicBoundedQueue:
+/// - If a small maximum capacity may lead to deadlock or performance
+/// degradation under bursty patterns and a larger capacity is
+/// sufficient.
+/// - If the typical queue size is expected to be much lower than the
+/// maximum capacity
+/// - If an unbounded queue is susceptible to growing too much.
+/// - If support for variable element weights is needed.
+///
+/// When not to use DynamicBoundedQueue?
+/// - If dynamic memory allocation is unacceptable or if the maximum
+/// capacity needs to be small, then use fixed-size MPMCQueue or (if
+/// non-blocking SPSC) ProducerConsumerQueue.
+/// - If there is no risk of the queue growing too much, then use
+/// UnboundedQueue.
+///
+/// Setting capacity
+/// - The general rule is to set the capacity as high as acceptable.
+/// The queue performs best when it is not near full capacity.
+/// - The implementation may allow extra slack in capacity (~10%) for
+/// amortizing some costly steps. Therefore, precise capacity is not
+/// guaranteed and cannot be relied on for synchronization; i.e.,
+/// this queue cannot be used as a semaphore.
+///
+/// Performance expectations:
+/// - As long as the queue size is below capacity in the common case,
+/// performance is comparable to MPMCQueue and better in cases of
+/// higher producer demand.
+/// - Performance degrades gracefully at full capacity.
+/// - It is recommended to measure performance with different variants
+/// when applicable, e.g., DMPMC vs DMPSC. Depending on the use
+/// case, sometimes the variant with the higher sequential overhead
+/// may yield better results due to, for example, more favorable
+/// producer-consumer balance or favorable timing for avoiding
+/// costly blocking.
+/// - See DynamicBoundedQueueTest.cpp for some benchmark results.
+///
+/// Template prameters:
+/// - T: element type
+/// - SingleProducer: true if there can be only one producer at a
+/// time.
+/// - SingleConsumer: true if there can be only one consumer at a
+/// time.
+/// - MayBlock: true if producers or consumers may block.
+/// - LgSegmentSize (default 8): Log base 2 of number of elements per
+/// UnboundedQueue segment.
+/// - LgAlign (default 7): Log base 2 of alignment directive; can be
+/// used to balance scalability (avoidance of false sharing) with
+/// memory efficiency.
+/// - WeightFn (DefaultWeightFn<T>): A customizable weight computing type
+/// for computing the weights of elements. The default weight is 1.
+///
+/// Template Aliases:
+/// DSPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>
+/// DMPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>
+/// DSPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>
+/// DMPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>
+///
+/// Functions:
+/// Constructor
+/// Takes a capacity value as an argument.
+///
+/// Producer functions:
+/// void enqueue(const T&);
+/// void enqueue(T&&);
+/// Adds an element to the end of the queue. Waits until
+/// capacity is available if necessary.
+/// bool try_enqueue(const T&);
+/// bool try_enqueue(T&&);
+/// Tries to add an element to the end of the queue if
+/// capacity allows it. Returns true if successful. Otherwise
+/// Returns false.
+/// bool try_enqueue_until(const T&, time_point&);
+/// bool try_enqueue_until(T&&, time_point&);
+/// Tries to add an element to the end of the queue if
+/// capacity allows it until the specified timepoint. Returns
+/// true if successful, otherwise false.
+/// bool try_enqueue_for(const T&, duration&);
+/// bool try_enqueue_for(T&&, duration&);
+/// Tries to add an element to the end of the queue if
+/// capacity allows it until the specified timepoint. Returns
+/// true if successful, otherwise false.
+///
+/// Consumer functions:
+/// void dequeue(T&);
+/// Extracts an element from the front of the queue. Waits
+/// until an element is available if necessary.
+/// bool try_dequeue(T&);
+/// Tries to extracts an element from the front of the queue
+/// if available. Returns true if successful, otherwise false.
+/// bool try_dequeue_until(T&, time_point&);
+/// Tries to extracts an element from the front of the queue
+/// if available until the specified time_point. Returns true
+/// if successful. Otherwise Returns false.
+/// bool try_dequeue_for(T&, duration&);
+/// Tries to extracts an element from the front of the queue
+/// if available until the expiration of the specified
+/// duration. Returns true if successful. Otherwise Returns
+/// false.
+///
+/// Secondary functions:
+/// void reset_capacity(size_t capacity);
+/// Changes the capacity of the queue. Does not affect the
+/// current contents of the queue. Guaranteed only to affect
+/// subsequent enqueue operations. May or may not affect
+/// concurrent operations. Capacity must be at least 1000.
+/// Weight weight();
+/// Returns an estimate of the total weight of the elements in
+/// the queue.
+/// size_t size();
+/// Returns an estimate of the total number of elements.
+/// bool empty();
+/// Returns true only if the queue was empty during the call.
+/// Note: weight(), size(), and empty() are guaranteed to be
+/// accurate only if there are no concurrent changes to the queue.
+///
+/// Usage example with default weight:
+/// @code
+/// /* DMPSC, doesn't block, 1024 int elements per segment */
+/// DMPSCQueue<int, false, 10> q(100000);
+/// ASSERT_TRUE(q.empty());
+/// ASSERT_EQ(q.size(), 0);
+/// q.enqueue(1));
+/// ASSERT_TRUE(q.try_enqueue(2));
+/// ASSERT_TRUE(q.try_enqueue_until(3, deadline));
+/// ASSERT_TRUE(q.try_enqueue(4, duration));
+/// // ... enqueue more elements until capacity is full
+/// // See above comments about imprecise capacity guarantees
+/// ASSERT_FALSE(q.try_enqueue(100001)); // can't enqueue but can't wait
+/// size_t sz = q.size();
+/// ASSERT_GE(sz, 100000);
+/// q.reset_capacity(1000000000); // set huge capacity
+/// ASSERT_TRUE(q.try_enqueue(100001)); // now enqueue succeeds
+/// q.reset_capacity(100000); // set capacity back to 100,000
+/// ASSERT_FALSE(q.try_enqueue(100002));
+/// ASSERT_EQ(q.size(), sz + 1);
+/// int v;
+/// q.dequeue(v);
+/// ASSERT_EQ(v, 1);
+/// ASSERT_TRUE(q.try_dequeue(v));
+/// ASSERT_EQ(v, 2);
+/// ASSERT_TRUE(q.try_dequeue_until(v, deadline));
+/// ASSERT_EQ(v, 3);
+/// ASSERT_TRUE(q.try_dequeue_for(v, duration));
+/// ASSERT_EQ(v, 4);
+/// ASSERT_EQ(q.size(), sz - 3);
+/// @endcode
+///
+/// Usage example with custom weights:
+/// @code
+/// struct CustomWeightFn {
+/// uint64_t operator()(int val) { return val / 100; }
+/// };
+/// DMPMCQueue<int, false, 10, CustomWeightFn> q(20);
+/// ASSERT_TRUE(q.empty());
+/// q.enqueue(100);
+/// ASSERT_TRUE(q.try_enqueue(200));
+/// ASSERT_TRUE(q.try_enqueue_until(500, now() + seconds(1)));
+/// ASSERT_EQ(q.size(), 3);
+/// ASSERT_EQ(q.weight(), 8);
+/// ASSERT_FALSE(q.try_enqueue_for(1700, microseconds(1)));
+/// q.reset_capacity(1000000); // set capacity to 1000000 instead of 20
+/// ASSERT_TRUE(q.try_enqueue_for(1700, microseconds(1)));
+/// q.reset_capacity(20); // set capacity to 20 again
+/// ASSERT_FALSE(q.try_enqueue(100));
+/// ASSERT_EQ(q.size(), 4);
+/// ASSERT_EQ(q.weight(), 25);
+/// int v;
+/// q.dequeue(v);
+/// ASSERT_EQ(v, 100);
+/// ASSERT_TRUE(q.try_dequeue(v));
+/// ASSERT_EQ(v, 200);
+/// ASSERT_TRUE(q.try_dequeue_until(v, now() + seconds(1)));
+/// ASSERT_EQ(v, 500);
+/// ASSERT_EQ(q.size(), 1);
+/// ASSERT_EQ(q.weight(), 17);
+/// @endcode
+///
+/// Design:
+/// - The implementation is on top of UnboundedQueue.
+/// - The main FIFO functionality is in UnboundedQueue.
+/// DynamicBoundedQueue manages keeping the total queue weight
+/// within the specified capacity.
+/// - For the sake of scalability, the data structures are designed to
+/// minimize interference between producers on one side and
+/// consumers on the other.
+/// - Producers add to a debit variable the weight of the added
+/// element and check capacity.
+/// - Consumers add to a credit variable the weight of the removed
+/// element.
+/// - Producers, for the sake of scalability, use fetch_add to add to
+/// the debit variable and subtract if it exceeded capacity,
+/// rather than using compare_exchange to avoid overshooting.
+/// - Consumers, infrequently, transfer credit to a transfer variable
+/// and unblock any blocked producers. The transfer variable can be
+/// used by producers to decrease their debit when needed.
+/// - Note that a low capacity will trigger frequent credit transfer
+/// by consumers that may degrade performance. Capacity should not
+/// be set too low.
+/// - Transfer of credit by consumers is triggered when the amount of
+/// credit reaches a threshold (1/10 of capacity).
+/// - The waiting of consumers is handled in UnboundedQueue.
+/// The waiting of producers is handled in this template.
+/// - For a producer operation, if the difference between debit and
+/// capacity (plus some slack to account for the transfer threshold)
+/// does not accommodate the weight of the new element, it first
+/// tries to transfer credit that may have already been made
+/// available by consumers. If this is insufficient and MayBlock is
+/// true, then the producer uses a futex to block until new credit
+/// is transferred by a consumer.
+///
+/// Memory Usage:
+/// - Aside from three cache lines for managing capacity, the memory
+/// for queue elements is managed using UnboundedQueue and grows and
+/// shrinks dynamically with the number of elements.
+/// - The template parameter LgAlign can be used to reduce memory usage
+/// at the cost of increased chance of false sharing.
+
+template <typename T>
+struct DefaultWeightFn {
+ template <typename Arg>
+ uint64_t operator()(Arg&&) const noexcept {
+ return 1;
+ }
+};
+
+template <
+ typename T,
+ bool SingleProducer,
+ bool SingleConsumer,
+ bool MayBlock,
+ size_t LgSegmentSize = 8,
+ size_t LgAlign = 7,
+ typename WeightFn = DefaultWeightFn<T>,
+ template <typename> class Atom = std::atomic>
+class DynamicBoundedQueue {
+ using Weight = uint64_t;
+
+ enum WaitingState : uint32_t {
+ NOTWAITING = 0,
+ WAITING = 1,
+ };
+
+ static constexpr bool SPSC = SingleProducer && SingleConsumer;
+ static constexpr size_t Align = 1u << LgAlign;
+
+ static_assert(LgAlign < 16, "LgAlign must be < 16");
+
+ /// Data members
+
+ // Read mostly by producers
+ alignas(Align) Atom<Weight> debit_; // written frequently only by producers
+ Atom<Weight> capacity_; // written rarely by capacity resets
+
+ // Read mostly by consumers
+ alignas(Align) Atom<Weight> credit_; // written frequently only by consumers
+ Atom<Weight> threshold_; // written rarely only by capacity resets
+
+ // Normally written and read rarely by producers and consumers
+ // May be read frequently by producers when capacity is full
+ alignas(Align) Atom<Weight> transfer_;
+ detail::Futex<Atom> waiting_;
+
+ // Underlying unbounded queue
+ UnboundedQueue<
+ T,
+ SingleProducer,
+ SingleConsumer,
+ MayBlock,
+ LgSegmentSize,
+ LgAlign,
+ Atom>
+ q_;
+
+ public:
+ /** constructor */
+ explicit DynamicBoundedQueue(Weight capacity)
+ : debit_(0),
+ capacity_(capacity + threshold(capacity)), // capacity slack
+ credit_(0),
+ threshold_(threshold(capacity)),
+ transfer_(0) {}
+
+ /** destructor */
+ ~DynamicBoundedQueue() {}
+
+ /// Enqueue functions
+
+ /** enqueue */
+ FOLLY_ALWAYS_INLINE void enqueue(const T& v) {
+ enqueueImpl(v);
+ }
+
+ FOLLY_ALWAYS_INLINE void enqueue(T&& v) {
+ enqueueImpl(std::move(v));
+ }
+
+ /** try_enqueue */
+ FOLLY_ALWAYS_INLINE bool try_enqueue(const T& v) {
+ return tryEnqueueImpl(v);
+ }
+
+ FOLLY_ALWAYS_INLINE bool try_enqueue(T&& v) {
+ return tryEnqueueImpl(std::move(v));
+ }
+
+ /** try_enqueue_until */
+ template <typename Clock, typename Duration>
+ FOLLY_ALWAYS_INLINE bool try_enqueue_until(
+ const T& v,
+ const std::chrono::time_point<Clock, Duration>& deadline) {
+ return tryEnqueueUntilImpl(v, deadline);
+ }
+
+ template <typename Clock, typename Duration>
+ FOLLY_ALWAYS_INLINE bool try_enqueue_until(
+ T&& v,
+ const std::chrono::time_point<Clock, Duration>& deadline) {
+ return tryEnqueueUntilImpl(std::move(v), deadline);
+ }
+
+ /** try_enqueue_for */
+ template <typename Rep, typename Period>
+ FOLLY_ALWAYS_INLINE bool try_enqueue_for(
+ const T& v,
+ const std::chrono::duration<Rep, Period>& duration) {
+ return tryEnqueueForImpl(v, duration);
+ }
+
+ template <typename Rep, typename Period>
+ FOLLY_ALWAYS_INLINE bool try_enqueue_for(
+ T&& v,
+ const std::chrono::duration<Rep, Period>& duration) {
+ return tryEnqueueForImpl(std::move(v), duration);
+ }
+
+ /// Dequeue functions
+
+ /** dequeue */
+ FOLLY_ALWAYS_INLINE void dequeue(T& elem) {
+ q_.dequeue(elem);
+ addCredit(WeightFn()(elem));
+ }
+
+ /** try_dequeue */
+ FOLLY_ALWAYS_INLINE bool try_dequeue(T& elem) {
+ if (q_.try_dequeue(elem)) {
+ addCredit(WeightFn()(elem));
+ return true;
+ }
+ return false;
+ }
+
+ /** try_dequeue_until */
+ template <typename Clock, typename Duration>
+ FOLLY_ALWAYS_INLINE bool try_dequeue_until(
+ T& elem,
+ const std::chrono::time_point<Clock, Duration>& deadline) {
+ if (q_.try_dequeue_until(elem, deadline)) {
+ addCredit(WeightFn()(elem));
+ return true;
+ }
+ return false;
+ }
+
+ /** try_dequeue_for */
+ template <typename Rep, typename Period>
+ FOLLY_ALWAYS_INLINE bool try_dequeue_for(
+ T& elem,
+ const std::chrono::duration<Rep, Period>& duration) {
+ if (q_.try_dequeue_for(elem, duration)) {
+ addCredit(WeightFn()(elem));
+ return true;
+ }
+ return false;
+ }
+
+ /// Secondary functions
+
+ /** reset_capacity */
+ void reset_capacity(Weight capacity) noexcept {
+ Weight thresh = threshold(capacity);
+ capacity_.store(capacity + thresh, std::memory_order_release);
+ threshold_.store(thresh, std::memory_order_release);
+ }
+
+ /** weight */
+ Weight weight() const noexcept {
+ auto d = getDebit();
+ auto c = getCredit();
+ auto t = getTransfer();
+ return d > (c + t) ? d - (c + t) : 0;
+ }
+
+ /** size */
+ size_t size() const noexcept {
+ return q_.size();
+ }
+
+ /** empty */
+ bool empty() const noexcept {
+ return q_.empty();
+ }
+
+ private:
+ /// Private functions ///
+
+ // Calculation of threshold to move credits in bulk from consumers
+ // to producers
+ constexpr Weight threshold(Weight capacity) const noexcept {
+ return capacity / 10;
+ }
+
+ // Functions called frequently by producers
+
+ template <typename Arg>
+ FOLLY_ALWAYS_INLINE void enqueueImpl(Arg&& v) {
+ tryEnqueueUntilImpl(
+ std::forward<Arg>(v), std::chrono::steady_clock::time_point::max());
+ }
+
+ template <typename Arg>
+ FOLLY_ALWAYS_INLINE bool tryEnqueueImpl(Arg&& v) {
+ return tryEnqueueUntilImpl(
+ std::forward<Arg>(v), std::chrono::steady_clock::time_point::min());
+ }
+
+ template <typename Clock, typename Duration, typename Arg>
+ FOLLY_ALWAYS_INLINE bool tryEnqueueUntilImpl(
+ Arg&& v,
+ const std::chrono::time_point<Clock, Duration>& deadline) {
+ Weight weight = WeightFn()(std::forward<Arg>(v));
+ if (LIKELY(tryAddDebit(weight))) {
+ q_.enqueue(std::forward<Arg>(v));
+ return true;
+ }
+ return tryEnqueueUntilSlow(std::forward<Arg>(v), deadline);
+ }
+
+ template <typename Rep, typename Period, typename Arg>
+ FOLLY_ALWAYS_INLINE bool tryEnqueueForImpl(
+ Arg&& v,
+ const std::chrono::duration<Rep, Period>& duration) {
+ if (LIKELY(tryEnqueueImpl(std::forward<Arg>(v)))) {
+ return true;
+ }
+ auto deadline = std::chrono::steady_clock::now() + duration;
+ return tryEnqueueUntilSlow(std::forward<Arg>(v), deadline);
+ }
+
+ FOLLY_ALWAYS_INLINE bool tryAddDebit(Weight weight) noexcept {
+ Weight capacity = getCapacity();
+ Weight before = fetchAddDebit(weight);
+ if (LIKELY(before + weight <= capacity)) {
+ return true;
+ } else {
+ subDebit(weight);
+ return false;
+ }
+ }
+
+ FOLLY_ALWAYS_INLINE Weight getCapacity() const noexcept {
+ return capacity_.load(std::memory_order_acquire);
+ }
+
+ FOLLY_ALWAYS_INLINE Weight fetchAddDebit(Weight weight) noexcept {
+ Weight before;
+ if (SingleProducer) {
+ before = getDebit();
+ debit_.store(before + weight, std::memory_order_relaxed);
+ } else {
+ before = debit_.fetch_add(weight, std::memory_order_acq_rel);
+ }
+ return before;
+ }
+
+ FOLLY_ALWAYS_INLINE Weight getDebit() const noexcept {
+ return debit_.load(std::memory_order_acquire);
+ }
+
+ // Functions called frequently by consumers
+
+ FOLLY_ALWAYS_INLINE void addCredit(Weight weight) noexcept {
+ Weight before = fetchAddCredit(weight);
+ Weight thresh = getThreshold();
+ if (before + weight >= thresh && before < thresh) {
+ transferCredit();
+ }
+ }
+
+ FOLLY_ALWAYS_INLINE Weight fetchAddCredit(Weight weight) noexcept {
+ Weight before;
+ if (SingleConsumer) {
+ before = getCredit();
+ credit_.store(before + weight, std::memory_order_relaxed);
+ } else {
+ before = credit_.fetch_add(weight, std::memory_order_acq_rel);
+ }
+ return before;
+ }
+
+ FOLLY_ALWAYS_INLINE Weight getCredit() const noexcept {
+ return credit_.load(std::memory_order_acquire);
+ }
+
+ FOLLY_ALWAYS_INLINE Weight getThreshold() const noexcept {
+ return threshold_.load(std::memory_order_acquire);
+ }
+
+ /** Functions called infrequently by producers */
+
+ void subDebit(Weight weight) noexcept {
+ Weight before;
+ if (SingleProducer) {
+ before = getDebit();
+ debit_.store(before - weight, std::memory_order_relaxed);
+ } else {
+ before = debit_.fetch_sub(weight, std::memory_order_acq_rel);
+ }
+ DCHECK_GE(before, weight);
+ }
+
+ template <typename Clock, typename Duration, typename Arg>
+ bool tryEnqueueUntilSlow(
+ Arg&& v,
+ const std::chrono::time_point<Clock, Duration>& deadline) {
+ Weight weight = WeightFn()(std::forward<Arg>(v));
+ if (canEnqueue(deadline, weight)) {
+ q_.enqueue(std::forward<Arg>(v));
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ template <typename Clock, typename Duration>
+ bool canEnqueue(
+ const std::chrono::time_point<Clock, Duration>& deadline,
+ Weight weight) noexcept {
+ Weight capacity = getCapacity();
+ while (true) {
+ tryReduceDebit();
+ Weight debit = getDebit();
+ if ((debit + weight <= capacity) && tryAddDebit(weight)) {
+ return true;
+ }
+ if (Clock::now() >= deadline) {
+ return false;
+ }
+ if (MayBlock) {
+ if (canBlock(weight, capacity)) {
+ waiting_.futexWaitUntil(WAITING, deadline);
+ }
+ } else {
+ asm_volatile_pause();
+ }
+ }
+ }
+
+ bool canBlock(Weight weight, Weight capacity) noexcept {
+ waiting_.store(WAITING, std::memory_order_relaxed);
+ std::atomic_thread_fence(std::memory_order_seq_cst);
+ tryReduceDebit();
+ Weight debit = getDebit();
+ return debit + weight > capacity;
+ }
+
+ bool tryReduceDebit() noexcept {
+ Weight w = takeTransfer();
+ if (w > 0) {
+ subDebit(w);
+ }
+ return w > 0;
+ }
+
+ Weight takeTransfer() noexcept {
+ Weight w = getTransfer();
+ if (w > 0) {
+ w = transfer_.exchange(0, std::memory_order_acq_rel);
+ }
+ return w;
+ }
+
+ Weight getTransfer() const noexcept {
+ return transfer_.load(std::memory_order_acquire);
+ }
+
+ /** Functions called infrequently by consumers */
+
+ void transferCredit() noexcept {
+ Weight credit = takeCredit();
+ transfer_.fetch_add(credit, std::memory_order_acq_rel);
+ if (MayBlock) {
+ std::atomic_thread_fence(std::memory_order_seq_cst);
+ waiting_.store(NOTWAITING, std::memory_order_relaxed);
+ waiting_.futexWake();
+ }
+ }
+
+ Weight takeCredit() noexcept {
+ Weight credit;
+ if (SingleConsumer) {
+ credit = credit_.load(std::memory_order_relaxed);
+ credit_.store(0, std::memory_order_relaxed);
+ } else {
+ credit = credit_.exchange(0, std::memory_order_acq_rel);
+ }
+ return credit;
+ }
+
+}; // DynamicBoundedQueue
+
+/// Aliases
+
+/** DSPSCQueue */
+template <
+ typename T,
+ bool MayBlock,
+ size_t LgSegmentSize = 8,
+ size_t LgAlign = 7,
+ typename WeightFn = DefaultWeightFn<T>,
+ template <typename> class Atom = std::atomic>
+using DSPSCQueue = DynamicBoundedQueue<
+ T,
+ true,
+ true,
+ MayBlock,
+ LgSegmentSize,
+ LgAlign,
+ WeightFn,
+ Atom>;
+
+/** DMPSCQueue */
+template <
+ typename T,
+ bool MayBlock,
+ size_t LgSegmentSize = 8,
+ size_t LgAlign = 7,
+ typename WeightFn = DefaultWeightFn<T>,
+ template <typename> class Atom = std::atomic>
+using DMPSCQueue = DynamicBoundedQueue<
+ T,
+ false,
+ true,
+ MayBlock,
+ LgSegmentSize,
+ LgAlign,
+ WeightFn,
+ Atom>;
+
+/** DSPMCQueue */
+template <
+ typename T,
+ bool MayBlock,
+ size_t LgSegmentSize = 8,
+ size_t LgAlign = 7,
+ typename WeightFn = DefaultWeightFn<T>,
+ template <typename> class Atom = std::atomic>
+using DSPMCQueue = DynamicBoundedQueue<
+ T,
+ true,
+ false,
+ MayBlock,
+ LgSegmentSize,
+ LgAlign,
+ WeightFn,
+ Atom>;
+
+/** DMPMCQueue */
+template <
+ typename T,
+ bool MayBlock,
+ size_t LgSegmentSize = 8,
+ size_t LgAlign = 7,
+ typename WeightFn = DefaultWeightFn<T>,
+ template <typename> class Atom = std::atomic>
+using DMPMCQueue = DynamicBoundedQueue<
+ T,
+ false,
+ false,
+ MayBlock,
+ LgSegmentSize,
+ LgAlign,
+ WeightFn,
+ Atom>;
+
+} // namespace folly
--- /dev/null
+/*
+ * Copyright 2017-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/concurrency/DynamicBoundedQueue.h>
+#include <folly/MPMCQueue.h>
+#include <folly/ProducerConsumerQueue.h>
+#include <folly/portability/GTest.h>
+
+#include <glog/logging.h>
+
+#include <atomic>
+#include <thread>
+
+DEFINE_bool(bench, false, "run benchmark");
+DEFINE_int32(reps, 10, "number of reps");
+DEFINE_int32(ops, 1000000, "number of operations per rep");
+DEFINE_int64(capacity, 1000000, "capacity");
+
+template <typename T, bool MayBlock, typename WeightFn>
+using DSPSC = folly::DSPSCQueue<T, MayBlock, 8, 7, WeightFn>;
+
+template <typename T, bool MayBlock, typename WeightFn>
+using DMPSC = folly::DMPSCQueue<T, MayBlock, 8, 7, WeightFn>;
+
+template <typename T, bool MayBlock, typename WeightFn>
+using DSPMC = folly::DSPMCQueue<T, MayBlock, 8, 7, WeightFn>;
+
+template <typename T, bool MayBlock, typename WeightFn>
+using DMPMC = folly::DMPMCQueue<T, MayBlock, 8, 7, WeightFn>;
+
+template <template <typename, bool, typename> class Q, bool MayBlock>
+void basic_test() {
+ auto dur = std::chrono::microseconds(100);
+ auto deadline = std::chrono::steady_clock::now() + dur;
+
+ struct CustomWeightFn {
+ uint64_t operator()(int val) {
+ return val * 100;
+ }
+ };
+
+ Q<int, MayBlock, CustomWeightFn> q(10000);
+
+ ASSERT_TRUE(q.empty());
+ ASSERT_EQ(q.size(), 0);
+ int v;
+ ASSERT_FALSE(q.try_dequeue(v));
+
+ q.enqueue(1);
+ ASSERT_TRUE(q.try_enqueue(2));
+ ASSERT_TRUE(q.try_enqueue_until(3, deadline));
+ ASSERT_TRUE(q.try_enqueue_for(4, dur));
+
+ ASSERT_EQ(q.size(), 4);
+ ASSERT_EQ(q.weight(), 1000);
+ ASSERT_FALSE(q.empty());
+
+ q.dequeue(v);
+ ASSERT_EQ(v, 1);
+ ASSERT_TRUE(q.try_dequeue(v));
+ ASSERT_EQ(v, 2);
+ ASSERT_TRUE(q.try_dequeue_until(v, deadline));
+ ASSERT_EQ(v, 3);
+ ASSERT_TRUE(q.try_dequeue_for(v, dur));
+ ASSERT_EQ(v, 4);
+
+ ASSERT_TRUE(q.empty());
+ ASSERT_EQ(q.size(), 0);
+ ASSERT_EQ(q.weight(), 0);
+}
+
+TEST(DynamicBoundedQueue, basic) {
+ basic_test<DSPSC, false>();
+ basic_test<DMPSC, false>();
+ basic_test<DSPMC, false>();
+ basic_test<DMPMC, false>();
+ basic_test<DSPSC, true>();
+ basic_test<DMPSC, true>();
+ basic_test<DSPMC, true>();
+ basic_test<DMPMC, true>();
+}
+
+TEST(DynamicBoundedQueue, size) {
+ {
+ folly::DynamicBoundedQueue<int, true, true, true> q(10);
+ ASSERT_EQ(sizeof(q), 640);
+ }
+ {
+ folly::DynamicBoundedQueue<uint64_t, false, false, false, 7, 4> q(10);
+ ASSERT_EQ(sizeof(q), 80);
+ }
+}
+
+template <template <typename, bool, typename> class Q, bool MayBlock>
+void move_test() {
+ struct Foo {
+ int v_;
+ explicit Foo(int v) noexcept : v_(v) {}
+ Foo(const Foo&) = delete;
+ Foo& operator=(const Foo&) = delete;
+ Foo(Foo&& other) noexcept : v_(other.v_) {}
+ Foo& operator=(Foo&& other) noexcept {
+ v_ = other.v_;
+ return *this;
+ }
+ };
+
+ struct CustomWeightFn {
+ uint64_t operator()(Foo&&) {
+ return 10;
+ }
+ };
+
+ auto dur = std::chrono::microseconds(100);
+ auto deadline = std::chrono::steady_clock::now() + dur;
+
+ Q<Foo, MayBlock, CustomWeightFn> q(100);
+ Foo v(1);
+ q.enqueue(std::move(v));
+ ASSERT_TRUE(q.try_enqueue(std::move(v)));
+ ASSERT_TRUE(q.try_enqueue_until(std::move(v), deadline));
+ ASSERT_TRUE(q.try_enqueue_for(std::move(v), dur));
+
+ ASSERT_EQ(q.size(), 4);
+ ASSERT_EQ(q.weight(), 40);
+}
+
+TEST(DynamicBoundedQueue, move) {
+ move_test<DSPSC, false>();
+ move_test<DMPSC, false>();
+ move_test<DSPMC, false>();
+ move_test<DMPMC, false>();
+ move_test<DSPSC, true>();
+ move_test<DMPSC, true>();
+ move_test<DSPMC, true>();
+ move_test<DMPMC, true>();
+}
+
+template <template <typename, bool, typename> class Q, bool MayBlock>
+void capacity_test() {
+ struct CustomWeightFn {
+ uint64_t operator()(int val) {
+ return val;
+ }
+ };
+
+ Q<int, MayBlock, CustomWeightFn> q(1000);
+ ASSERT_EQ(q.weight(), 0);
+ int v;
+ q.enqueue(100);
+ ASSERT_EQ(q.weight(), 100);
+ q.enqueue(300);
+ ASSERT_EQ(q.weight(), 400);
+ ASSERT_FALSE(q.try_enqueue(1200));
+ q.reset_capacity(2000); // reset capacityy to 2000
+ ASSERT_TRUE(q.try_enqueue(1200));
+ ASSERT_EQ(q.weight(), 1600);
+ ASSERT_EQ(q.size(), 3);
+ q.reset_capacity(1000); // reset capacity back to 1000
+ ASSERT_FALSE(q.try_enqueue(100));
+ q.dequeue(v);
+ ASSERT_EQ(v, 100);
+ ASSERT_EQ(q.weight(), 1500);
+ q.dequeue(v);
+ ASSERT_EQ(v, 300);
+ ASSERT_EQ(q.weight(), 1200);
+}
+
+TEST(DynamicBoundedQueue, capacity) {
+ capacity_test<DSPSC, false>();
+ capacity_test<DMPSC, false>();
+ capacity_test<DSPMC, false>();
+ capacity_test<DMPMC, false>();
+ capacity_test<DSPSC, true>();
+ capacity_test<DMPSC, true>();
+ capacity_test<DSPMC, true>();
+ capacity_test<DMPMC, true>();
+}
+
+template <typename ProdFunc, typename ConsFunc, typename EndFunc>
+inline uint64_t run_once(
+ int nprod,
+ int ncons,
+ const ProdFunc& prodFn,
+ const ConsFunc& consFn,
+ const EndFunc& endFn) {
+ std::atomic<bool> start{false};
+ std::atomic<int> ready{0};
+
+ /* producers */
+ std::vector<std::thread> prodThr(nprod);
+ for (int tid = 0; tid < nprod; ++tid) {
+ prodThr[tid] = std::thread([&, tid] {
+ ++ready;
+ while (!start.load()) {
+ /* spin */;
+ }
+ prodFn(tid);
+ });
+ }
+
+ /* consumers */
+ std::vector<std::thread> consThr(ncons);
+ for (int tid = 0; tid < ncons; ++tid) {
+ consThr[tid] = std::thread([&, tid] {
+ ++ready;
+ while (!start.load()) {
+ /* spin */;
+ }
+ consFn(tid);
+ });
+ }
+
+ /* wait for all producers and consumers to be ready */
+ while (ready.load() < (nprod + ncons)) {
+ /* spin */;
+ }
+
+ /* begin time measurement */
+ auto tbegin = std::chrono::steady_clock::now();
+ start.store(true);
+
+ /* wait for completion */
+ for (int i = 0; i < nprod; ++i) {
+ prodThr[i].join();
+ }
+ for (int i = 0; i < ncons; ++i) {
+ consThr[i].join();
+ }
+
+ /* end time measurement */
+ auto tend = std::chrono::steady_clock::now();
+ endFn();
+ return std::chrono::duration_cast<std::chrono::nanoseconds>(tend - tbegin)
+ .count();
+}
+
+template <bool SingleProducer, bool SingleConsumer, bool MayBlock>
+void enq_deq_test(const int nprod, const int ncons) {
+ if (SingleProducer) {
+ ASSERT_EQ(nprod, 1);
+ }
+ if (SingleConsumer) {
+ ASSERT_EQ(ncons, 1);
+ }
+
+ int ops = 1000;
+ folly::DynamicBoundedQueue<int, SingleProducer, SingleConsumer, MayBlock, 2>
+ q(10);
+ std::atomic<uint64_t> sum(0);
+
+ auto prod = [&](int tid) {
+ for (int i = tid; i < ops; i += nprod) {
+ if ((i % 3) == 0) {
+ while (!q.try_enqueue(i)) {
+ /* keep trying */;
+ }
+ } else if ((i % 3) == 1) {
+ auto dur = std::chrono::microseconds(100);
+ while (!q.try_enqueue_for(i, dur)) {
+ /* keep trying */;
+ }
+ } else {
+ q.enqueue(i);
+ }
+ }
+ };
+
+ auto cons = [&](int tid) {
+ uint64_t mysum = 0;
+ for (int i = tid; i < ops; i += ncons) {
+ int v;
+ if ((i % 3) == 0) {
+ while (!q.try_dequeue(v)) {
+ /* keep trying */;
+ }
+ } else if ((i % 3) == 1) {
+ auto dur = std::chrono::microseconds(100);
+ while (!q.try_dequeue_for(v, dur)) {
+ /* keep trying */;
+ }
+ } else {
+ q.dequeue(v);
+ }
+ if (nprod == 1 && ncons == 1) {
+ ASSERT_EQ(v, i);
+ }
+ mysum += v;
+ }
+ sum.fetch_add(mysum);
+ };
+
+ auto endfn = [&] {
+ uint64_t expected = ops;
+ expected *= ops - 1;
+ expected /= 2;
+ ASSERT_EQ(sum.load(), expected);
+ };
+ run_once(nprod, ncons, prod, cons, endfn);
+}
+
+TEST(DynamicBoundedQueue, enq_deq) {
+ /* SPSC */
+ enq_deq_test<true, true, false>(1, 1);
+ enq_deq_test<true, true, true>(1, 1);
+ /* MPSC */
+ enq_deq_test<false, true, false>(1, 1);
+ enq_deq_test<false, true, true>(1, 1);
+ enq_deq_test<false, true, false>(2, 1);
+ enq_deq_test<false, true, true>(2, 1);
+ enq_deq_test<false, true, false>(10, 1);
+ enq_deq_test<false, true, true>(10, 1);
+ /* SPMC */
+ enq_deq_test<true, false, false>(1, 1);
+ enq_deq_test<true, false, true>(1, 1);
+ enq_deq_test<true, false, false>(1, 2);
+ enq_deq_test<true, false, true>(1, 2);
+ enq_deq_test<true, false, false>(1, 10);
+ enq_deq_test<true, false, true>(1, 10);
+ /* MPMC */
+ enq_deq_test<false, false, false>(1, 1);
+ enq_deq_test<false, false, true>(1, 1);
+ enq_deq_test<false, false, false>(2, 1);
+ enq_deq_test<false, false, true>(2, 1);
+ enq_deq_test<false, false, false>(10, 1);
+ enq_deq_test<false, false, true>(10, 1);
+ enq_deq_test<false, false, false>(1, 2);
+ enq_deq_test<false, false, true>(1, 2);
+ enq_deq_test<false, false, false>(1, 10);
+ enq_deq_test<false, false, true>(1, 10);
+ enq_deq_test<false, false, false>(2, 2);
+ enq_deq_test<false, false, true>(2, 2);
+ enq_deq_test<false, false, false>(10, 10);
+ enq_deq_test<false, false, true>(10, 10);
+}
+
+template <typename RepFunc>
+uint64_t runBench(const std::string& name, int ops, const RepFunc& repFn) {
+ int reps = FLAGS_reps;
+ uint64_t min = UINTMAX_MAX;
+ uint64_t max = 0;
+ uint64_t sum = 0;
+
+ repFn(); // sometimes first run is outlier
+ for (int r = 0; r < reps; ++r) {
+ uint64_t dur = repFn();
+ sum += dur;
+ min = std::min(min, dur);
+ max = std::max(max, dur);
+ // if each rep takes too long run at least 2 reps
+ const uint64_t minute = 60000000000ULL;
+ if (sum > minute && r >= 1) {
+ reps = r + 1;
+ break;
+ }
+ }
+
+ const std::string unit = " ns";
+ uint64_t avg = sum / reps;
+ uint64_t res = min;
+ std::cout << name;
+ std::cout << " " << std::setw(4) << max / ops << unit;
+ std::cout << " " << std::setw(4) << avg / ops << unit;
+ std::cout << " " << std::setw(4) << res / ops << unit;
+ std::cout << std::endl;
+ return res;
+}
+
+template <template <typename, bool, typename> class Q, typename T, int Op>
+uint64_t bench(const int nprod, const int ncons, const std::string& name) {
+ int ops = FLAGS_ops;
+ auto repFn = [&] {
+ Q<T, Op == 3 || Op == 4 || Op == 5, folly::DefaultWeightFn<T>> q(
+ FLAGS_capacity);
+ std::atomic<uint64_t> sum(0);
+ auto prod = [&](int tid) {
+ for (int i = tid; i < ops; i += nprod) {
+ if (Op == 0 || Op == 3) {
+ while (!q.try_enqueue(i)) {
+ /* keep trying */;
+ }
+ } else if (Op == 1 || Op == 4) {
+ while (!q.try_enqueue_for(i, std::chrono::microseconds(1000))) {
+ /* keep trying */;
+ }
+ } else {
+ q.enqueue(i);
+ }
+ }
+ };
+ auto cons = [&](int tid) {
+ uint64_t mysum = 0;
+ T v = -1;
+ for (int i = tid; i < ops; i += ncons) {
+ if (Op == 0 || Op == 3) {
+ while (!q.try_dequeue(v)) {
+ /* keep trying */;
+ }
+ } else if (Op == 1 || Op == 4) {
+ while (!q.try_dequeue_for(v, std::chrono::microseconds(1000))) {
+ /* keep trying */;
+ }
+ } else {
+ q.dequeue(v);
+ }
+ if (nprod == 1 && ncons == 1) {
+ DCHECK_EQ(int(v), i);
+ }
+ mysum += v;
+ }
+ sum.fetch_add(mysum);
+ };
+ auto endfn = [&] {
+ uint64_t expected = ops;
+ expected *= ops - 1;
+ expected /= 2;
+ ASSERT_EQ(sum.load(), expected);
+ };
+ return run_once(nprod, ncons, prod, cons, endfn);
+ };
+ return runBench(name, ops, repFn);
+}
+
+/* For performance comparison */
+template <typename T>
+class MPMC {
+ folly::MPMCQueue<T> q_;
+
+ public:
+ explicit MPMC(uint64_t capacity) : q_(capacity) {}
+
+ void enqueue(const T& v) {
+ q_.blockingWrite(v);
+ }
+
+ void enqueue(T&& v) {
+ q_.blockingWrite(std::move(v));
+ }
+
+ bool try_enqueue(const T& v) {
+ return q_.write(v);
+ }
+
+ bool try_enqueue(const T&& v) {
+ return q_.write(std::move(v));
+ }
+
+ template <typename Rep, typename Period>
+ bool try_enqueue_for(
+ const T& v,
+ const std::chrono::duration<Rep, Period>& duration) {
+ return q_.tryWriteUntil(std::chrono::steady_clock::now() + duration, v);
+ }
+
+ void dequeue(T& item) {
+ q_.blockingRead(item);
+ }
+
+ bool try_dequeue(T& item) {
+ return q_.read(item);
+ }
+
+ template <typename Rep, typename Period>
+ bool try_dequeue_for(
+ T& item,
+ const std::chrono::duration<Rep, Period>& duration) {
+ return q_.tryReadUntil(std::chrono::steady_clock::now() + duration, item);
+ }
+};
+
+template <typename T, bool, typename>
+using FMPMC = MPMC<T>;
+
+template <typename T>
+class PCQ {
+ folly::ProducerConsumerQueue<T> q_;
+
+ public:
+ explicit PCQ(uint64_t capacity) : q_(capacity) {}
+
+ void enqueue(const T&) {
+ ASSERT_TRUE(false);
+ }
+
+ bool try_enqueue(const T& v) {
+ return q_.write(v);
+ }
+
+ bool try_enqueue(T&& v) {
+ return q_.write(std::move(v));
+ }
+
+ template <typename Rep, typename Period>
+ bool try_enqueue_for(const T&, const std::chrono::duration<Rep, Period>&) {
+ return false;
+ }
+
+ void dequeue(T&) {
+ ASSERT_TRUE(false);
+ }
+
+ bool try_dequeue(T& item) {
+ return q_.read(item);
+ }
+
+ template <typename Rep, typename Period>
+ bool try_dequeue_for(T&, const std::chrono::duration<Rep, Period>&) {
+ return false;
+ }
+};
+
+template <typename T, bool, typename>
+using FPCQ = PCQ<T>;
+
+template <size_t M>
+struct IntArray {
+ int a[M];
+ IntArray() {}
+ /* implicit */ IntArray(int v) {
+ for (size_t i = 0; i < M; ++i) {
+ a[i] = v;
+ }
+ }
+ operator int() {
+ return a[0];
+ }
+};
+
+void dottedLine() {
+ std::cout << ".............................................................."
+ << std::endl;
+}
+
+template <typename T>
+void type_benches(const int np, const int nc, const std::string& name) {
+ std::cout << name
+ << "===========================================" << std::endl;
+ if (np == 1 && nc == 1) {
+ bench<DSPSC, T, 0>(1, 1, "DSPSC try spin only ");
+ bench<DSPSC, T, 1>(1, 1, "DSPSC timed spin only ");
+ bench<DSPSC, T, 2>(1, 1, "DSPSC wait spin only ");
+ bench<DSPSC, T, 3>(1, 1, "DSPSC try may block ");
+ bench<DSPSC, T, 4>(1, 1, "DSPSC timed may block ");
+ bench<DSPSC, T, 5>(1, 1, "DSPSC wait may block ");
+ dottedLine();
+ }
+ if (nc == 1) {
+ bench<DMPSC, T, 0>(np, 1, "DMPSC try spin only ");
+ bench<DMPSC, T, 1>(np, 1, "DMPSC timed spin only ");
+ bench<DMPSC, T, 2>(np, 1, "DMPSC wait spin only ");
+ bench<DMPSC, T, 3>(np, 1, "DMPSC try may block ");
+ bench<DMPSC, T, 4>(np, 1, "DMPSC timed may block ");
+ bench<DMPSC, T, 5>(np, 1, "DMPSC wait may block ");
+ dottedLine();
+ }
+ if (np == 1) {
+ bench<DSPMC, T, 0>(1, nc, "DSPMC try spin only ");
+ bench<DSPMC, T, 1>(1, nc, "DSPMC timed spin only ");
+ bench<DSPMC, T, 2>(1, nc, "DSPMC wait spin only ");
+ bench<DSPMC, T, 3>(1, nc, "DSPMC try may block ");
+ bench<DSPMC, T, 4>(1, nc, "DSPMC timed may block ");
+ bench<DSPMC, T, 5>(1, nc, "DSPMC wait may block ");
+ dottedLine();
+ }
+ bench<DMPMC, T, 0>(np, nc, "DMPMC try spin only ");
+ bench<DMPMC, T, 1>(np, nc, "DMPMC timed spin only ");
+ bench<DMPMC, T, 2>(np, nc, "DMPMC wait spin only ");
+ bench<DMPMC, T, 3>(np, nc, "DMPMC try may block ");
+ bench<DMPMC, T, 4>(np, nc, "DMPMC timed may block ");
+ bench<DMPMC, T, 5>(np, nc, "DMPMC wait may block ");
+ dottedLine();
+ if (np == 1 && nc == 1) {
+ bench<FPCQ, T, 0>(1, 1, "folly::PCQ read ");
+ dottedLine();
+ }
+ bench<FMPMC, T, 3>(np, nc, "folly::MPMC read ");
+ bench<FMPMC, T, 4>(np, nc, "folly::MPMC tryReadUntil ");
+ bench<FMPMC, T, 5>(np, nc, "folly::MPMC blockingRead ");
+ std::cout << "=============================================================="
+ << std::endl;
+}
+
+void benches(const int np, const int nc) {
+ std::cout << "====================== " << std::setw(2) << np << " prod"
+ << " " << std::setw(2) << nc << " cons"
+ << " ======================" << std::endl;
+ type_benches<uint32_t>(np, nc, "=== uint32_t ======");
+ // Benchmarks for other element sizes can be added as follows:
+ // type_benches<IntArray<4>>(np, nc, "=== IntArray<4> ===");
+}
+
+TEST(DynamicBoundedQueue, bench) {
+ if (!FLAGS_bench) {
+ return;
+ }
+ std::cout << "=============================================================="
+ << std::endl;
+ std::cout << std::setw(2) << FLAGS_reps << " reps of " << std::setw(8)
+ << FLAGS_ops << " handoffs\n";
+ dottedLine();
+ std::cout << "Using capacity " << FLAGS_capacity << " for all queues\n";
+ std::cout << "=============================================================="
+ << std::endl;
+ std::cout << "Test name Max time Avg time Min time"
+ << std::endl;
+
+ for (int np : {1, 8, 32}) {
+ for (int nc : {1, 8, 32}) {
+ benches(np, nc);
+ }
+ }
+}
+
+/*
+$ numactl -N 1 dynamic_bounded_queue_test --bench --capacity=1000000
+==============================================================
+10 reps of 1000000 handoffs
+..............................................................
+Using capacity 1000000 for all queues
+==============================================================
+Test name Max time Avg time Min time
+====================== 1 prod 1 cons ======================
+=== uint32_t =================================================
+DSPSC try spin only 7 ns 7 ns 7 ns
+DSPSC timed spin only 9 ns 9 ns 9 ns
+DSPSC wait spin only 7 ns 7 ns 7 ns
+DSPSC try may block 39 ns 36 ns 33 ns
+DSPSC timed may block 39 ns 38 ns 37 ns
+DSPSC wait may block 37 ns 34 ns 33 ns
+..............................................................
+DMPSC try spin only 54 ns 53 ns 52 ns
+DMPSC timed spin only 53 ns 52 ns 51 ns
+DMPSC wait spin only 53 ns 52 ns 51 ns
+DMPSC try may block 67 ns 65 ns 64 ns
+DMPSC timed may block 64 ns 62 ns 60 ns
+DMPSC wait may block 64 ns 62 ns 60 ns
+..............................................................
+DSPMC try spin only 25 ns 24 ns 23 ns
+DSPMC timed spin only 24 ns 23 ns 23 ns
+DSPMC wait spin only 22 ns 21 ns 21 ns
+DSPMC try may block 30 ns 26 ns 21 ns
+DSPMC timed may block 25 ns 24 ns 24 ns
+DSPMC wait may block 22 ns 22 ns 21 ns
+..............................................................
+DMPMC try spin only 48 ns 45 ns 39 ns
+DMPMC timed spin only 31 ns 30 ns 24 ns
+DMPMC wait spin only 49 ns 47 ns 43 ns
+DMPMC try may block 63 ns 62 ns 61 ns
+DMPMC timed may block 64 ns 60 ns 46 ns
+DMPMC wait may block 61 ns 60 ns 58 ns
+..............................................................
+folly::PCQ read 8 ns 7 ns 7 ns
+..............................................................
+folly::MPMC read 53 ns 51 ns 49 ns
+folly::MPMC tryReadUntil 112 ns 106 ns 103 ns
+folly::MPMC blockingRead 50 ns 47 ns 46 ns
+==============================================================
+====================== 1 prod 8 cons ======================
+=== uint32_t =================================================
+DSPMC try spin only 166 ns 159 ns 153 ns
+DSPMC timed spin only 169 ns 163 ns 156 ns
+DSPMC wait spin only 60 ns 57 ns 54 ns
+DSPMC try may block 170 ns 163 ns 153 ns
+DSPMC timed may block 165 ns 157 ns 150 ns
+DSPMC wait may block 94 ns 78 ns 52 ns
+..............................................................
+DMPMC try spin only 170 ns 161 ns 149 ns
+DMPMC timed spin only 167 ns 158 ns 149 ns
+DMPMC wait spin only 93 ns 80 ns 51 ns
+DMPMC try may block 164 ns 161 ns 154 ns
+DMPMC timed may block 163 ns 156 ns 145 ns
+DMPMC wait may block 117 ns 102 ns 87 ns
+..............................................................
+folly::MPMC read 176 ns 168 ns 159 ns
+folly::MPMC tryReadUntil 1846 ns 900 ns 521 ns
+folly::MPMC blockingRead 219 ns 193 ns 178 ns
+==============================================================
+====================== 1 prod 32 cons ======================
+=== uint32_t =================================================
+DSPMC try spin only 224 ns 213 ns 204 ns
+DSPMC timed spin only 215 ns 209 ns 199 ns
+DSPMC wait spin only 334 ns 114 ns 52 ns
+DSPMC try may block 240 ns 215 ns 202 ns
+DSPMC timed may block 245 ns 221 ns 200 ns
+DSPMC wait may block 215 ns 151 ns 98 ns
+..............................................................
+DMPMC try spin only 348 ns 252 ns 204 ns
+DMPMC timed spin only 379 ns 244 ns 198 ns
+DMPMC wait spin only 173 ns 116 ns 89 ns
+DMPMC try may block 362 ns 231 ns 173 ns
+DMPMC timed may block 282 ns 236 ns 206 ns
+DMPMC wait may block 252 ns 172 ns 134 ns
+..............................................................
+folly::MPMC read 540 ns 290 ns 186 ns
+folly::MPMC tryReadUntil 24946 ns 24280 ns 23113 ns
+folly::MPMC blockingRead 1345 ns 1297 ns 1265 ns
+==============================================================
+====================== 8 prod 1 cons ======================
+=== uint32_t =================================================
+DMPSC try spin only 68 ns 64 ns 60 ns
+DMPSC timed spin only 69 ns 66 ns 61 ns
+DMPSC wait spin only 67 ns 65 ns 62 ns
+DMPSC try may block 77 ns 73 ns 67 ns
+DMPSC timed may block 75 ns 74 ns 68 ns
+DMPSC wait may block 76 ns 73 ns 69 ns
+..............................................................
+DMPMC try spin only 76 ns 66 ns 60 ns
+DMPMC timed spin only 77 ns 68 ns 63 ns
+DMPMC wait spin only 68 ns 65 ns 63 ns
+DMPMC try may block 76 ns 72 ns 64 ns
+DMPMC timed may block 82 ns 74 ns 67 ns
+DMPMC wait may block 77 ns 72 ns 68 ns
+..............................................................
+folly::MPMC read 170 ns 166 ns 161 ns
+folly::MPMC tryReadUntil 184 ns 179 ns 173 ns
+folly::MPMC blockingRead 79 ns 73 ns 53 ns
+==============================================================
+====================== 8 prod 8 cons ======================
+=== uint32_t =================================================
+DMPMC try spin only 181 ns 169 ns 133 ns
+DMPMC timed spin only 179 ns 168 ns 148 ns
+DMPMC wait spin only 77 ns 76 ns 71 ns
+DMPMC try may block 180 ns 179 ns 176 ns
+DMPMC timed may block 174 ns 166 ns 153 ns
+DMPMC wait may block 79 ns 78 ns 75 ns
+..............................................................
+folly::MPMC read 219 ns 206 ns 183 ns
+folly::MPMC tryReadUntil 262 ns 244 ns 213 ns
+folly::MPMC blockingRead 61 ns 58 ns 54 ns
+==============================================================
+====================== 8 prod 32 cons ======================
+=== uint32_t =================================================
+DMPMC try spin only 265 ns 217 ns 203 ns
+DMPMC timed spin only 236 ns 215 ns 202 ns
+DMPMC wait spin only 93 ns 83 ns 77 ns
+DMPMC try may block 325 ns 234 ns 200 ns
+DMPMC timed may block 206 ns 202 ns 193 ns
+DMPMC wait may block 139 ns 93 ns 76 ns
+..............................................................
+folly::MPMC read 259 ns 214 ns 201 ns
+folly::MPMC tryReadUntil 281 ns 274 ns 267 ns
+folly::MPMC blockingRead 62 ns 59 ns 57 ns
+==============================================================
+====================== 32 prod 1 cons ======================
+=== uint32_t =================================================
+DMPSC try spin only 95 ns 57 ns 45 ns
+DMPSC timed spin only 94 ns 52 ns 46 ns
+DMPSC wait spin only 104 ns 54 ns 43 ns
+DMPSC try may block 59 ns 54 ns 51 ns
+DMPSC timed may block 86 ns 58 ns 52 ns
+DMPSC wait may block 76 ns 57 ns 53 ns
+..............................................................
+DMPMC try spin only 68 ns 64 ns 60 ns
+DMPMC timed spin only 137 ns 73 ns 61 ns
+DMPMC wait spin only 86 ns 65 ns 58 ns
+DMPMC try may block 89 ns 71 ns 65 ns
+DMPMC timed may block 82 ns 69 ns 65 ns
+DMPMC wait may block 84 ns 71 ns 66 ns
+..............................................................
+folly::MPMC read 222 ns 203 ns 192 ns
+folly::MPMC tryReadUntil 239 ns 232 ns 191 ns
+folly::MPMC blockingRead 78 ns 68 ns 64 ns
+==============================================================
+====================== 32 prod 8 cons ======================
+=== uint32_t =================================================
+DMPMC try spin only 183 ns 138 ns 107 ns
+DMPMC timed spin only 237 ns 158 ns 98 ns
+DMPMC wait spin only 87 ns 70 ns 58 ns
+DMPMC try may block 169 ns 132 ns 92 ns
+DMPMC timed may block 172 ns 133 ns 79 ns
+DMPMC wait may block 166 ns 89 ns 66 ns
+..............................................................
+folly::MPMC read 221 ns 194 ns 183 ns
+folly::MPMC tryReadUntil 258 ns 244 ns 230 ns
+folly::MPMC blockingRead 60 ns 54 ns 47 ns
+==============================================================
+====================== 32 prod 32 cons ======================
+=== uint32_t =================================================
+DMPMC try spin only 419 ns 252 ns 181 ns
+DMPMC timed spin only 252 ns 212 ns 179 ns
+DMPMC wait spin only 153 ns 79 ns 62 ns
+DMPMC try may block 302 ns 236 ns 182 ns
+DMPMC timed may block 266 ns 213 ns 170 ns
+DMPMC wait may block 262 ns 120 ns 64 ns
+..............................................................
+folly::MPMC read 269 ns 245 ns 199 ns
+folly::MPMC tryReadUntil 257 ns 245 ns 235 ns
+folly::MPMC blockingRead 53 ns 48 ns 45 ns
+==============================================================
+
+$ numactl -N 1 dynamic_bounded_queue_test --bench --capacity=1000
+==============================================================
+10 reps of 1000000 handoffs
+..............................................................
+Using capacity 1000 for all queues
+==============================================================
+Test name Max time Avg time Min time
+====================== 1 prod 1 cons ======================
+=== uint32_t =================================================
+DSPSC try spin only 7 ns 7 ns 7 ns
+DSPSC timed spin only 9 ns 9 ns 9 ns
+DSPSC wait spin only 7 ns 7 ns 7 ns
+DSPSC try may block 34 ns 33 ns 31 ns
+DSPSC timed may block 34 ns 34 ns 33 ns
+DSPSC wait may block 30 ns 30 ns 29 ns
+..............................................................
+DMPSC try spin only 60 ns 57 ns 55 ns
+DMPSC timed spin only 55 ns 52 ns 51 ns
+DMPSC wait spin only 57 ns 54 ns 52 ns
+DMPSC try may block 66 ns 62 ns 39 ns
+DMPSC timed may block 67 ns 64 ns 62 ns
+DMPSC wait may block 67 ns 65 ns 64 ns
+..............................................................
+DSPMC try spin only 27 ns 25 ns 24 ns
+DSPMC timed spin only 25 ns 25 ns 24 ns
+DSPMC wait spin only 23 ns 23 ns 22 ns
+DSPMC try may block 31 ns 26 ns 24 ns
+DSPMC timed may block 33 ns 30 ns 30 ns
+DSPMC wait may block 37 ns 29 ns 28 ns
+..............................................................
+DMPMC try spin only 55 ns 53 ns 51 ns
+DMPMC timed spin only 36 ns 31 ns 26 ns
+DMPMC wait spin only 54 ns 53 ns 51 ns
+DMPMC try may block 68 ns 64 ns 51 ns
+DMPMC timed may block 66 ns 63 ns 60 ns
+DMPMC wait may block 68 ns 63 ns 60 ns
+..............................................................
+folly::PCQ read 15 ns 13 ns 11 ns
+..............................................................
+folly::MPMC read 60 ns 56 ns 51 ns
+folly::MPMC tryReadUntil 134 ns 112 ns 102 ns
+folly::MPMC blockingRead 57 ns 51 ns 48 ns
+==============================================================
+====================== 1 prod 8 cons ======================
+=== uint32_t =================================================
+DSPMC try spin only 169 ns 162 ns 151 ns
+DSPMC timed spin only 178 ns 166 ns 149 ns
+DSPMC wait spin only 59 ns 55 ns 54 ns
+DSPMC try may block 173 ns 163 ns 153 ns
+DSPMC timed may block 171 ns 166 ns 156 ns
+DSPMC wait may block 71 ns 57 ns 51 ns
+..............................................................
+DMPMC try spin only 172 ns 164 ns 158 ns
+DMPMC timed spin only 173 ns 164 ns 156 ns
+DMPMC wait spin only 77 ns 62 ns 53 ns
+DMPMC try may block 181 ns 163 ns 152 ns
+DMPMC timed may block 174 ns 165 ns 151 ns
+DMPMC wait may block 91 ns 72 ns 52 ns
+..............................................................
+folly::MPMC read 178 ns 167 ns 161 ns
+folly::MPMC tryReadUntil 991 ns 676 ns 423 ns
+folly::MPMC blockingRead 154 ns 129 ns 96 ns
+==============================================================
+====================== 1 prod 32 cons ======================
+=== uint32_t =================================================
+DSPMC try spin only 462 ns 288 ns 201 ns
+DSPMC timed spin only 514 ns 283 ns 201 ns
+DSPMC wait spin only 100 ns 60 ns 45 ns
+DSPMC try may block 531 ns 318 ns 203 ns
+DSPMC timed may block 1379 ns 891 ns 460 ns
+DSPMC wait may block 148 ns 111 ns 82 ns
+..............................................................
+DMPMC try spin only 404 ns 312 ns 205 ns
+DMPMC timed spin only 337 ns 253 ns 219 ns
+DMPMC wait spin only 130 ns 97 ns 72 ns
+DMPMC try may block 532 ns 265 ns 201 ns
+DMPMC timed may block 846 ns 606 ns 412 ns
+DMPMC wait may block 158 ns 112 ns 87 ns
+..............................................................
+folly::MPMC read 880 ns 419 ns 284 ns
+folly::MPMC tryReadUntil 23432 ns 23184 ns 23007 ns
+folly::MPMC blockingRead 1353 ns 1308 ns 1279 ns
+==============================================================
+====================== 8 prod 1 cons ======================
+=== uint32_t =================================================
+DMPSC try spin only 67 ns 63 ns 51 ns
+DMPSC timed spin only 69 ns 65 ns 63 ns
+DMPSC wait spin only 67 ns 65 ns 61 ns
+DMPSC try may block 73 ns 69 ns 63 ns
+DMPSC timed may block 72 ns 69 ns 64 ns
+DMPSC wait may block 71 ns 70 ns 68 ns
+..............................................................
+DMPMC try spin only 70 ns 64 ns 59 ns
+DMPMC timed spin only 76 ns 66 ns 53 ns
+DMPMC wait spin only 68 ns 66 ns 64 ns
+DMPMC try may block 71 ns 68 ns 66 ns
+DMPMC timed may block 72 ns 70 ns 67 ns
+DMPMC wait may block 73 ns 70 ns 67 ns
+..............................................................
+folly::MPMC read 193 ns 167 ns 153 ns
+folly::MPMC tryReadUntil 497 ns 415 ns 348 ns
+folly::MPMC blockingRead 163 ns 134 ns 115 ns
+==============================================================
+====================== 8 prod 8 cons ======================
+=== uint32_t =================================================
+DMPMC try spin only 216 ns 203 ns 196 ns
+DMPMC timed spin only 199 ns 186 ns 178 ns
+DMPMC wait spin only 63 ns 60 ns 58 ns
+DMPMC try may block 212 ns 198 ns 183 ns
+DMPMC timed may block 180 ns 170 ns 162 ns
+DMPMC wait may block 72 ns 68 ns 65 ns
+..............................................................
+folly::MPMC read 225 ns 201 ns 188 ns
+folly::MPMC tryReadUntil 255 ns 248 ns 232 ns
+folly::MPMC blockingRead 52 ns 48 ns 42 ns
+==============================================================
+====================== 8 prod 32 cons ======================
+=== uint32_t =================================================
+DMPMC try spin only 360 ns 302 ns 195 ns
+DMPMC timed spin only 350 ns 272 ns 218 ns
+DMPMC wait spin only 92 ns 72 ns 61 ns
+DMPMC try may block 352 ns 263 ns 223 ns
+DMPMC timed may block 218 ns 213 ns 209 ns
+DMPMC wait may block 98 ns 77 ns 70 ns
+..............................................................
+folly::MPMC read 611 ns 461 ns 339 ns
+folly::MPMC tryReadUntil 270 ns 260 ns 253 ns
+folly::MPMC blockingRead 89 ns 84 ns 80 ns
+==============================================================
+====================== 32 prod 1 cons ======================
+=== uint32_t =================================================
+DMPSC try spin only 389 ns 248 ns 149 ns
+DMPSC timed spin only 356 ns 235 ns 120 ns
+DMPSC wait spin only 343 ns 242 ns 125 ns
+DMPSC try may block 412 ns 294 ns 168 ns
+DMPSC timed may block 332 ns 271 ns 189 ns
+DMPSC wait may block 280 ns 252 ns 199 ns
+..............................................................
+DMPMC try spin only 393 ns 269 ns 105 ns
+DMPMC timed spin only 328 ns 240 ns 112 ns
+DMPMC wait spin only 502 ns 266 ns 107 ns
+DMPMC try may block 514 ns 346 ns 192 ns
+DMPMC timed may block 339 ns 318 ns 278 ns
+DMPMC wait may block 319 ns 307 ns 292 ns
+..............................................................
+folly::MPMC read 948 ns 517 ns 232 ns
+folly::MPMC tryReadUntil 9649 ns 7567 ns 4140 ns
+folly::MPMC blockingRead 1365 ns 1316 ns 1131 ns
+==============================================================
+====================== 32 prod 8 cons ======================
+=== uint32_t =================================================
+DMPMC try spin only 436 ns 257 ns 115 ns
+DMPMC timed spin only 402 ns 272 ns 121 ns
+DMPMC wait spin only 136 ns 78 ns 55 ns
+DMPMC try may block 454 ns 227 ns 78 ns
+DMPMC timed may block 155 ns 137 ns 116 ns
+DMPMC wait may block 62 ns 59 ns 57 ns
+..............................................................
+folly::MPMC read 677 ns 497 ns 336 ns
+folly::MPMC tryReadUntil 268 ns 262 ns 258 ns
+folly::MPMC blockingRead 87 ns 85 ns 82 ns
+==============================================================
+====================== 32 prod 32 cons ======================
+=== uint32_t =================================================
+DMPMC try spin only 786 ns 381 ns 142 ns
+DMPMC timed spin only 795 ns 346 ns 126 ns
+DMPMC wait spin only 334 ns 107 ns 55 ns
+DMPMC try may block 535 ns 317 ns 144 ns
+DMPMC timed may block 197 ns 192 ns 183 ns
+DMPMC wait may block 189 ns 75 ns 60 ns
+..............................................................
+folly::MPMC read 1110 ns 919 ns 732 ns
+folly::MPMC tryReadUntil 214 ns 210 ns 206 ns
+folly::MPMC blockingRead 53 ns 52 ns 51 ns
+==============================================================
+
+$ lscpu
+Architecture: x86_64
+CPU op-mode(s): 32-bit, 64-bit
+Byte Order: Little Endian
+CPU(s): 32
+On-line CPU(s) list: 0-31
+Thread(s) per core: 2
+Core(s) per socket: 8
+Socket(s): 2
+NUMA node(s): 2
+Vendor ID: GenuineIntel
+CPU family: 6
+Model: 45
+Model name: Intel(R) Xeon(R) CPU E5-2660 0 @ 2.20GHz
+Stepping: 6
+CPU MHz: 2200.000
+CPU max MHz: 2200.0000
+CPU min MHz: 1200.0000
+BogoMIPS: 4399.92
+Virtualization: VT-x
+L1d cache: 32K
+L1i cache: 32K
+L2 cache: 256K
+L3 cache: 20480K
+NUMA node0 CPU(s): 0-7,16-23
+NUMA node1 CPU(s): 8-15,24-31
+
+Flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr
+ pge mca cmov pat pse36 clflush dts acpi mmx fxsr
+ sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp
+ lm constant_tsc arch_perfmon pebs bts rep_good
+ nopl xtopology nonstop_tsc aperfmperf eagerfpu
+ pni pclmulqdq dtes64 monitor ds_cpl vmx smx est
+ tm2 ssse3 cx16 xtpr pdcm pcid dca sse4_1 sse4_2
+ x2apic popcnt tsc_deadline_timer aes xsave avx
+ lahf_lm epb tpr_shadow vnmi flexpriority ept vpid
+ xsaveopt dtherm arat pln pts
+ */