From 9a8218df9c4f07c4390bd039a2605e33ec2eaf47 Mon Sep 17 00:00:00 2001 From: Maged Michael Date: Mon, 4 Dec 2017 11:07:30 -0800 Subject: [PATCH] Unbounded queue Summary: UnboundedQueue supports: - SPSC, MPSC, SPMC, MPMC - Non-waiting, waiting, and timed consumer operations. - Producers never wait or fail (unless out-of-memory). - Memory usage grows and shrinks dynamically ``` /// UnboundedQueue supports a variety of options for unbounded /// dynamically expanding and shrinking queues, including variations of: /// - Single vs. multiple producers /// - Single vs. multiple consumers /// - Blocking vs. spin-waiting /// - Non-waiting, timed, and waiting consumer operations. /// Producer operations never wait or fail (unless out-of-memory). /// /// Template parameters: /// - T: element type /// - SingleProducer: true if there can be only one producer at a /// time. /// - SingleConsumer: true if there can be only one consumer at a /// time. /// - MayBlock: true if consumers may block, false if they only /// spin. A performance tuning parameter. /// - LgSegmentSize (default 8): Log base 2 of number of elements per /// segment. A performance tuning parameter. See below. /// /// When to use UnboundedQueue: /// - If a small bound may lead to deadlock or performance degradation /// under bursty patterns. /// - If there is no risk of the queue growing too much. /// /// When not to use UnboundedQueue: /// - If there is risk of the queue growing too much and a large bound /// is acceptable, then use DynamicBoundedQueue. /// - If the queue must not allocate on enqueue or it must have a /// small bound, then use fixed-size MPMCQueue or (if non-blocking /// SPSC) ProducerConsumerQueue. /// /// Template Aliases: /// USPSCQueue /// UMPSCQueue /// USPMCQueue /// UMPMCQueue /// /// Functions: /// Producer operations never wait or fail (unless OOM) /// void enqueue(const T&); /// void enqueue(T&&); /// Adds an element to the end of the queue.
/// /// Consumer operations: /// void dequeue(T&); /// Extracts an element from the front of the queue. Waits /// until an element is available if needed. /// bool try_dequeue(T&); /// Tries to extract an element from the front of the queue /// if available. Returns true if successful, false otherwise. /// bool try_dequeue_until(T&, time_point& deadline); /// Tries to extract an element from the front of the queue /// if available until the specified deadline. Returns true /// if successful, false otherwise. /// bool try_dequeue_for(T&, duration&); /// Tries to extract an element from the front of the queue /// if available for the specified duration. Returns true /// if successful, false otherwise. /// /// Secondary functions: /// size_t size(); /// Returns an estimate of the size of the queue. /// bool empty(); /// Returns true only if the queue was empty during the call. /// Note: size() and empty() are guaranteed to be accurate only if /// the queue is not changed concurrently. ``` Reviewed By: djwatson Differential Revision: D6157613 fbshipit-source-id: db423f86d1d0604d22f6b9c71ea0ed08be32e2a1 --- folly/Makefile.am | 1 + folly/concurrency/UnboundedQueue-inl.h | 568 ++++++++ folly/concurrency/UnboundedQueue.h | 393 ++++++ folly/concurrency/test/UnboundedQueueTest.cpp | 1151 +++++++++++++++++ 4 files changed, 2113 insertions(+) create mode 100644 folly/concurrency/UnboundedQueue-inl.h create mode 100644 folly/concurrency/UnboundedQueue.h create mode 100644 folly/concurrency/test/UnboundedQueueTest.cpp diff --git a/folly/Makefile.am b/folly/Makefile.am index be999f3c..4ca4388d 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -57,6 +57,7 @@ nobase_follyinclude_HEADERS = \ concurrency/ConcurrentHashMap.h \ concurrency/CoreCachedSharedPtr.h \ concurrency/detail/ConcurrentHashMap-detail.h \ + concurrency/UnboundedQueue.h \ container/Access.h \ container/Array.h \ container/Iterator.h \ diff --git a/folly/concurrency/UnboundedQueue-inl.h b/folly/concurrency/UnboundedQueue-inl.h new file mode
100644 index 00000000..101f6683 --- /dev/null +++ b/folly/concurrency/UnboundedQueue-inl.h @@ -0,0 +1,568 @@ +/* + * Copyright 2017 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include + +namespace folly { + +/* constructor */ + +template < + typename T, + bool SingleProducer, + bool SingleConsumer, + bool MayBlock, + size_t LgSegmentSize, + template class Atom> +inline UnboundedQueue< + T, + SingleProducer, + SingleConsumer, + MayBlock, + LgSegmentSize, + Atom>::UnboundedQueue() { + setProducerTicket(0); + setConsumerTicket(0); + auto s = new Segment(0); + DEBUG_PRINT(s); + setTail(s); + setHead(s); +} + +/* destructor */ + +template < + typename T, + bool SingleProducer, + bool SingleConsumer, + bool MayBlock, + size_t LgSegmentSize, + template class Atom> +inline UnboundedQueue< + T, + SingleProducer, + SingleConsumer, + MayBlock, + LgSegmentSize, + Atom>::~UnboundedQueue() { + Segment* next; + for (auto s = head(); s; s = next) { + next = s->nextSegment(); + if (SPSC) { + delete s; + } else { + s->retire(); // hazptr + } + } +} + +/* dequeue */ + +template < + typename T, + bool SingleProducer, + bool SingleConsumer, + bool MayBlock, + size_t LgSegmentSize, + template class Atom> +FOLLY_ALWAYS_INLINE void UnboundedQueue< + T, + SingleProducer, + SingleConsumer, + MayBlock, + LgSegmentSize, + Atom>::dequeue(T& item) noexcept { + if (SPSC) { + auto s = head(); + dequeueCommon(s, 
item); + } else { + // Using hazptr_holder instead of hazptr_local because it is + // possible to call ~T() and it may happen to use hazard pointers. + folly::hazptr::hazptr_holder hptr; + auto s = hptr.get_protected(head_); + dequeueCommon(s, item); + } +} + +/* try_dequeue_until */ + +template < + typename T, + bool SingleProducer, + bool SingleConsumer, + bool MayBlock, + size_t LgSegmentSize, + template class Atom> +template +FOLLY_ALWAYS_INLINE bool UnboundedQueue< + T, + SingleProducer, + SingleConsumer, + MayBlock, + LgSegmentSize, + Atom>:: + try_dequeue_until( + T& item, + const std::chrono::time_point& deadline) noexcept { + if (SingleConsumer) { + auto s = head(); + return singleConsumerTryDequeueUntil(s, item, deadline); + } else { + // Using hazptr_holder instead of hazptr_local because it is + // possible to call ~T() and it may happen to use hazard pointers. + folly::hazptr::hazptr_holder hptr; + auto s = hptr.get_protected(head_); + return multiConsumerTryDequeueUntil(s, item, deadline); + } +} + +/* enqueueImpl */ + +template < + typename T, + bool SingleProducer, + bool SingleConsumer, + bool MayBlock, + size_t LgSegmentSize, + template class Atom> +template +FOLLY_ALWAYS_INLINE void UnboundedQueue< + T, + SingleProducer, + SingleConsumer, + MayBlock, + LgSegmentSize, + Atom>::enqueueImpl(Arg&& arg) { + if (SPSC) { + auto s = tail(); + enqueueCommon(s, std::forward(arg)); + } else { + // Using hazptr_holder instead of hazptr_local because it is + // possible that the T constructor happens to use hazard + // pointers.
+ folly::hazptr::hazptr_holder hptr; + auto s = hptr.get_protected(tail_); + enqueueCommon(s, std::forward(arg)); + } +} + +/* enqueueCommon */ + +template < + typename T, + bool SingleProducer, + bool SingleConsumer, + bool MayBlock, + size_t LgSegmentSize, + template class Atom> +template +FOLLY_ALWAYS_INLINE void UnboundedQueue< + T, + SingleProducer, + SingleConsumer, + MayBlock, + LgSegmentSize, + Atom>::enqueueCommon(Segment* s, Arg&& arg) { + auto t = fetchIncrementProducerTicket(); + if (!SingleProducer) { + s = findSegment(s, t); + } + DCHECK_GE(t, s->minTicket()); + DCHECK_LT(t, (s->minTicket() + SegmentSize)); + size_t idx = index(t); + Entry& e = s->entry(idx); + e.putItem(std::forward(arg)); + if (responsibleForAlloc(t)) { + allocNextSegment(s, t + SegmentSize); + } + if (responsibleForAdvance(t)) { + advanceTail(s); + } +} + +/* dequeueCommon */ + +template < + typename T, + bool SingleProducer, + bool SingleConsumer, + bool MayBlock, + size_t LgSegmentSize, + template class Atom> +FOLLY_ALWAYS_INLINE void UnboundedQueue< + T, + SingleProducer, + SingleConsumer, + MayBlock, + LgSegmentSize, + Atom>::dequeueCommon(Segment* s, T& item) noexcept { + auto t = fetchIncrementConsumerTicket(); + if (!SingleConsumer) { + s = findSegment(s, t); + } + size_t idx = index(t); + Entry& e = s->entry(idx); + e.takeItem(item); + if (responsibleForAdvance(t)) { + advanceHead(s); + } +} + +/* singleConsumerTryDequeueUntil */ + +template < + typename T, + bool SingleProducer, + bool SingleConsumer, + bool MayBlock, + size_t LgSegmentSize, + template class Atom> +template +FOLLY_ALWAYS_INLINE bool UnboundedQueue< + T, + SingleProducer, + SingleConsumer, + MayBlock, + LgSegmentSize, + Atom>:: + singleConsumerTryDequeueUntil( + Segment* s, + T& item, + const std::chrono::time_point& deadline) noexcept { + auto t = consumerTicket(); + DCHECK_GE(t, s->minTicket()); + DCHECK_LT(t, (s->minTicket() + SegmentSize)); + size_t idx = index(t); + Entry& e = s->entry(idx); + if 
(!e.tryWaitUntil(deadline)) { + return false; + } + setConsumerTicket(t + 1); + e.takeItem(item); + if (responsibleForAdvance(t)) { + advanceHead(s); + } + return true; +} + +/* multiConsumerTryDequeueUntil */ + +template < + typename T, + bool SingleProducer, + bool SingleConsumer, + bool MayBlock, + size_t LgSegmentSize, + template class Atom> +template +FOLLY_ALWAYS_INLINE bool UnboundedQueue< + T, + SingleProducer, + SingleConsumer, + MayBlock, + LgSegmentSize, + Atom>:: + multiConsumerTryDequeueUntil( + Segment* s, + T& item, + const std::chrono::time_point& deadline) noexcept { + while (true) { + auto t = consumerTicket(); + if (UNLIKELY(t >= (s->minTicket() + SegmentSize))) { + Segment* next; + // Note that the following loop will not spin indefinitely (as + // long as the number of concurrently waiting consumers is not + // greater than SegmentSize). The algorithm guarantees in such a + // case that the producer responsible for setting the next + // pointer is already running. + while ((next = s->nextSegment()) == nullptr) { + if (Clock::now() > deadline) { + return false; + } + asm_volatile_pause(); + } + s = next; + DCHECK(s != nullptr); + continue; + } + size_t idx = index(t); + Entry& e = s->entry(idx); + if (!e.tryWaitUntil(deadline)) { + return false; + } + if (!consumerTicket_.compare_exchange_weak( + t, t + 1, std::memory_order_acq_rel, std::memory_order_acquire)) { + continue; + } + e.takeItem(item); + if (responsibleForAdvance(t)) { + advanceHead(s); + } + return true; + } +} + +/* findSegment */ + +template < + typename T, + bool SingleProducer, + bool SingleConsumer, + bool MayBlock, + size_t LgSegmentSize, + template class Atom> +inline typename UnboundedQueue< + T, + SingleProducer, + SingleConsumer, + MayBlock, + LgSegmentSize, + Atom>::Segment* +UnboundedQueue< + T, + SingleProducer, + SingleConsumer, + MayBlock, + LgSegmentSize, + Atom>::findSegment(Segment* s, const Ticket t) const noexcept { + while (t >= (s->minTicket() + SegmentSize)) {
+ Segment* next = s->nextSegment(); + // Note that the following loop will not spin indefinitely. The + // algorithm guarantees that the producer responsible for setting + // the next pointer is already running. + while (next == nullptr) { + asm_volatile_pause(); + next = s->nextSegment(); + } + DCHECK(next != nullptr); + s = next; + } + return s; +} + +/* allocNextSegment */ + +template < + typename T, + bool SingleProducer, + bool SingleConsumer, + bool MayBlock, + size_t LgSegmentSize, + template class Atom> +inline void UnboundedQueue< + T, + SingleProducer, + SingleConsumer, + MayBlock, + LgSegmentSize, + Atom>::allocNextSegment(Segment* s, const Ticket t) { + auto next = new Segment(t); + if (!SPSC) { + next->acquire_ref_safe(); // hazptr + } + DEBUG_PRINT(s << " " << next); + DCHECK(s->nextSegment() == nullptr); + s->setNextSegment(next); +} + +/* advanceTail */ + +template < + typename T, + bool SingleProducer, + bool SingleConsumer, + bool MayBlock, + size_t LgSegmentSize, + template class Atom> +inline void UnboundedQueue< + T, + SingleProducer, + SingleConsumer, + MayBlock, + LgSegmentSize, + Atom>::advanceTail(Segment* s) noexcept { + Segment* next = s->nextSegment(); + if (!SingleProducer) { + // Note that the following loop will not spin indefinitely. The + // algorithm guarantees that the producer responsible for setting + // the next pointer is already running. + while (next == nullptr) { + asm_volatile_pause(); + next = s->nextSegment(); + } + } + DCHECK(next != nullptr); + DEBUG_PRINT(s << " " << next); + setTail(next); +} + +/* advanceHead */ + +template < + typename T, + bool SingleProducer, + bool SingleConsumer, + bool MayBlock, + size_t LgSegmentSize, + template class Atom> +inline void UnboundedQueue< + T, + SingleProducer, + SingleConsumer, + MayBlock, + LgSegmentSize, + Atom>::advanceHead(Segment* s) noexcept { + // Note that the following loops will not spin indefinitely.
The + // algorithm guarantees that the producers responsible for advancing + // the tail pointer and setting the next pointer are already + // running. + while (tail() == s) { + asm_volatile_pause(); + } + auto next = s->nextSegment(); + while (next == nullptr) { + next = s->nextSegment(); + } + DEBUG_PRINT(s << " " << next); + setHead(next); + if (SPSC) { + delete s; + } else { + s->retire(); // hazptr + } +} + +/** + * Entry + */ + +template < + typename T, + bool SingleProducer, + bool SingleConsumer, + bool MayBlock, + size_t LgSegmentSize, + template class Atom> +class UnboundedQueue< + T, + SingleProducer, + SingleConsumer, + MayBlock, + LgSegmentSize, + Atom>::Entry { + folly::SaturatingSemaphore flag_; + typename std::aligned_storage::type item_; + + public: + template + FOLLY_ALWAYS_INLINE void putItem(Arg&& arg) { + new (&item_) T(std::forward(arg)); + flag_.post(); + } + + FOLLY_ALWAYS_INLINE void takeItem(T& item) noexcept { + flag_.wait(); + getItem(item); + } + + template + FOLLY_ALWAYS_INLINE bool tryWaitUntil( + const std::chrono::time_point& deadline) noexcept { + return flag_.try_wait_until(deadline); + } + + private: + FOLLY_ALWAYS_INLINE void getItem(T& item) noexcept { + item = std::move(*(folly::launder(itemPtr()))); + destroyItem(); + } + + FOLLY_ALWAYS_INLINE T* itemPtr() noexcept { + return static_cast(static_cast(&item_)); + } + + FOLLY_ALWAYS_INLINE void destroyItem() noexcept { + itemPtr()->~T(); + } +}; // UnboundedQueue::Entry + +/** + * Segment + */ + +template < + typename T, + bool SingleProducer, + bool SingleConsumer, + bool MayBlock, + size_t LgSegmentSize, + template class Atom> +class UnboundedQueue< + T, + SingleProducer, + SingleConsumer, + MayBlock, + LgSegmentSize, + Atom>::Segment : public folly::hazptr::hazptr_obj_base_refcounted { + Atom next_; + const Ticket min_; + bool marked_; // used for iterative deletion + FOLLY_ALIGN_TO_AVOID_FALSE_SHARING + Entry b_[SegmentSize]; + + public: + explicit Segment(const Ticket t) :
next_(nullptr), min_(t), marked_(false) {} + + ~Segment() { + if (!SPSC && !marked_) { + auto next = nextSegment(); + while (next) { + if (!next->release_ref()) { // hazptr + return; + } + auto s = next; + next = s->nextSegment(); + s->marked_ = true; + delete s; + } + } + } + + Segment* nextSegment() const noexcept { + return next_.load(std::memory_order_acquire); + } + + void setNextSegment(Segment* s) noexcept { + next_.store(s, std::memory_order_release); + } + + FOLLY_ALWAYS_INLINE Ticket minTicket() const noexcept { + DCHECK_EQ((min_ & (SegmentSize - 1)), 0); + return min_; + } + + FOLLY_ALWAYS_INLINE Entry& entry(size_t index) noexcept { + return b_[index]; + } +}; // UnboundedQueue::Segment + +} // namespace folly diff --git a/folly/concurrency/UnboundedQueue.h b/folly/concurrency/UnboundedQueue.h new file mode 100644 index 00000000..1ff19ed2 --- /dev/null +++ b/folly/concurrency/UnboundedQueue.h @@ -0,0 +1,393 @@ +/* + * Copyright 2017 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include + +namespace folly { + +/// UnboundedQueue supports a variety of options for unbounded +/// dynamically expanding and shrinking queues, including variations of: +/// - Single vs. multiple producers +/// - Single vs. multiple consumers +/// - Blocking vs. spin-waiting +/// - Non-waiting, timed, and waiting consumer operations. +/// Producer operations never wait or fail (unless out-of-memory).
+/// +/// Template parameters: +/// - T: element type +/// - SingleProducer: true if there can be only one producer at a +/// time. +/// - SingleConsumer: true if there can be only one consumer at a +/// time. +/// - MayBlock: true if consumers may block, false if they only +/// spin. A performance tuning parameter. +/// - LgSegmentSize (default 8): Log base 2 of number of elements per +/// segment. A performance tuning parameter. See below. +/// +/// When to use UnboundedQueue: +/// - If a small bound may lead to deadlock or performance degradation +/// under bursty patterns. +/// - If there is no risk of the queue growing too much. +/// +/// When not to use UnboundedQueue: +/// - If there is risk of the queue growing too much and a large bound +/// is acceptable, then use DynamicBoundedQueue. +/// - If the queue must not allocate on enqueue or it must have a +/// small bound, then use fixed-size MPMCQueue or (if non-blocking +/// SPSC) ProducerConsumerQueue. +/// +/// Template Aliases: +/// USPSCQueue +/// UMPSCQueue +/// USPMCQueue +/// UMPMCQueue +/// +/// Functions: +/// Producer operations never wait or fail (unless OOM) +/// void enqueue(const T&); +/// void enqueue(T&&); +/// Adds an element to the end of the queue. +/// +/// Consumer operations: +/// void dequeue(T&); +/// Extracts an element from the front of the queue. Waits +/// until an element is available if needed. +/// bool try_dequeue(T&); +/// Tries to extract an element from the front of the queue +/// if available. Returns true if successful, false otherwise. +/// bool try_dequeue_until(T&, time_point& deadline); +/// Tries to extract an element from the front of the queue +/// if available until the specified deadline. Returns true +/// if successful, false otherwise. +/// bool try_dequeue_for(T&, duration&); +/// Tries to extract an element from the front of the queue +/// if available for the specified duration. Returns true +/// if successful, false otherwise.
+/// +/// Secondary functions: +/// size_t size(); +/// Returns an estimate of the size of the queue. +/// bool empty(); +/// Returns true only if the queue was empty during the call. +/// Note: size() and empty() are guaranteed to be accurate only if +/// the queue is not changed concurrently. +/// +/// Usage examples: +/// @code +/// /* UMPSC, doesn't block, 1024 int elements per segment */ +/// UMPSCQueue<int, false, 10> q; +/// q.enqueue(1); +/// q.enqueue(2); +/// q.enqueue(3); +/// ASSERT_FALSE(q.empty()); +/// ASSERT_EQ(q.size(), 3); +/// int v; +/// q.dequeue(v); +/// ASSERT_EQ(v, 1); +/// ASSERT_TRUE(q.try_dequeue(v)); +/// ASSERT_EQ(v, 2); +/// ASSERT_TRUE(q.try_dequeue_until(v, now() + seconds(1))); +/// ASSERT_EQ(v, 3); +/// ASSERT_TRUE(q.empty()); +/// ASSERT_EQ(q.size(), 0); +/// ASSERT_FALSE(q.try_dequeue(v)); +/// ASSERT_FALSE(q.try_dequeue_for(v, microseconds(100))); +/// @endcode +/// +/// Design: +/// - The queue is composed of one or more segments. Each segment has +/// a fixed size of 2^LgSegmentSize entries. Each segment is used +/// exactly once. +/// - Each entry is composed of a futex and a single element. +/// - The queue contains two 64-bit ticket variables. The producer +/// ticket counts the number of producer tickets issued so far, and +/// the same for the consumer ticket. Each ticket number corresponds +/// to a specific entry in a specific segment. +/// - The queue maintains two pointers, head and tail. Head points to +/// the segment that corresponds to the current consumer +/// ticket. Similarly, tail pointer points to the segment that +/// corresponds to the producer ticket. +/// - Segments are organized as a singly linked list. +/// - The producer with the first ticket in the current producer +/// segment is solely responsible for allocating and linking the +/// next segment. +/// - The producer with the last ticket in the current producer +/// segment is solely responsible for advancing the tail pointer to +/// the next segment.
+/// - Similarly, the consumer with the last ticket in the current +/// consumer segment is solely responsible for advancing the head +/// pointer to the next segment. It must ensure that head never +/// overtakes tail. +/// +/// Memory Usage: +/// - An empty queue contains one segment. A nonempty queue contains +/// one or two more segments than fit its contents. +/// - Removed segments are not reclaimed until no threads, +/// producers or consumers, have references to them or their +/// predecessors. That is, a lagging thread may delay the reclamation +/// of a chain of removed segments. +/// +/// Performance considerations: +/// - All operations take constant time, excluding the costs of +/// allocation, reclamation, interference from other threads, and +/// waiting for actions by other threads. +/// - In general, using the single producer and/or single consumer +/// variants yields better performance than the MP and MC +/// alternatives. +/// - SPSC without blocking is the fastest configuration. It doesn't +/// include any read-modify-write atomic operations, full fences, or +/// system calls in the critical path. +/// - MP adds a fetch_add to the critical path of each producer operation. +/// - MC adds a fetch_add or compare_exchange to the critical path of +/// each consumer operation. +/// - The possibility of consumers blocking, even if they never do, +/// adds a compare_exchange to the critical path of each producer +/// operation. +/// - MPMC, SPMC, MPSC require the use of a deferred reclamation +/// mechanism to guarantee that segments removed from the linked +/// list, i.e., unreachable from the head pointer, are reclaimed +/// only after they are no longer needed by any lagging producers or +/// consumers. +/// - The overheads of segment allocation and reclamation are intended +/// to be mostly out of the critical path of the queue's throughput.
+/// - If the template parameter LgSegmentSize is changed, it should be +/// set adequately high to keep the amortized cost of allocation and +/// reclamation low. +/// - Another consideration is that the queue is guaranteed to have +/// enough space for a number of consumers equal to 2^LgSegmentSize +/// for local blocking. Excess waiting consumers spin. +/// - It is recommended to measure performance with different variants +/// when applicable, e.g., UMPMC vs UMPSC. Depending on the use +/// case, sometimes the variant with the higher sequential overhead +/// may yield better results due to, for example, more favorable +/// producer-consumer balance or favorable timing for avoiding +/// costly blocking. + +template < + typename T, + bool SingleProducer, + bool SingleConsumer, + bool MayBlock, + size_t LgSegmentSize = 8, + template class Atom = std::atomic> +class UnboundedQueue { + using Ticket = uint64_t; + class Entry; + class Segment; + + static constexpr bool SPSC = SingleProducer && SingleConsumer; + static constexpr size_t SegmentSize = 1 << LgSegmentSize; + static constexpr size_t Stride = SPSC || (LgSegmentSize <= 1) ?
1 : 27; + + static_assert( + std::is_nothrow_destructible::value, + "T must be nothrow_destructible"); + static_assert((Stride & 1) == 1, "Stride must be odd"); + static_assert(LgSegmentSize < 32, "LgSegmentSize must be < 32"); + + FOLLY_ALIGN_TO_AVOID_FALSE_SHARING + Atom head_; + Atom consumerTicket_; + FOLLY_ALIGN_TO_AVOID_FALSE_SHARING + Atom tail_; + Atom producerTicket_; + + public: + UnboundedQueue(); + ~UnboundedQueue(); + + /** enqueue */ + FOLLY_ALWAYS_INLINE void enqueue(const T& arg) { + enqueueImpl(arg); + } + + FOLLY_ALWAYS_INLINE void enqueue(T&& arg) { + enqueueImpl(std::move(arg)); + } + + /** dequeue */ + void dequeue(T& item) noexcept; + + /** try_dequeue */ + bool try_dequeue(T& item) noexcept { + return try_dequeue_until( + item, std::chrono::steady_clock::time_point::min()); + } + + /** try_dequeue_until */ + template + bool try_dequeue_until( + T& item, + const std::chrono::time_point& deadline) noexcept; + + /** try_dequeue_for */ + template + bool try_dequeue_for( + T& item, + const std::chrono::duration& duration) noexcept { + return try_dequeue_until(item, std::chrono::steady_clock::now() + duration); + } + + /** size */ + size_t size() const noexcept { + auto p = producerTicket(); + auto c = consumerTicket(); + return p > c ? 
p - c : 0; + } + + /** empty */ + bool empty() const noexcept { + auto c = consumerTicket(); + auto p = producerTicket(); + return p <= c; + } + + private: + template + void enqueueImpl(Arg&& arg); + + template + void enqueueCommon(Segment* s, Arg&& arg); + + void dequeueCommon(Segment* s, T& item) noexcept; + + template + bool singleConsumerTryDequeueUntil( + Segment* s, + T& item, + const std::chrono::time_point& deadline) noexcept; + + template + bool multiConsumerTryDequeueUntil( + Segment* s, + T& item, + const std::chrono::time_point& deadline) noexcept; + + Segment* findSegment(Segment* s, const Ticket t) const noexcept; + + void allocNextSegment(Segment* s, const Ticket t); + + void advanceTail(Segment* s) noexcept; + + void advanceHead(Segment* s) noexcept; + + FOLLY_ALWAYS_INLINE size_t index(Ticket t) const noexcept { + return (t * Stride) & (SegmentSize - 1); + } + + FOLLY_ALWAYS_INLINE bool responsibleForAlloc(Ticket t) const noexcept { + return (t & (SegmentSize - 1)) == 0; + } + + FOLLY_ALWAYS_INLINE bool responsibleForAdvance(Ticket t) const noexcept { + return (t & (SegmentSize - 1)) == (SegmentSize - 1); + } + + FOLLY_ALWAYS_INLINE Segment* head() const noexcept { + return head_.load(std::memory_order_acquire); + } + + FOLLY_ALWAYS_INLINE Segment* tail() const noexcept { + return tail_.load(std::memory_order_acquire); + } + + FOLLY_ALWAYS_INLINE Ticket producerTicket() const noexcept { + return producerTicket_.load(std::memory_order_acquire); + } + + FOLLY_ALWAYS_INLINE Ticket consumerTicket() const noexcept { + return consumerTicket_.load(std::memory_order_acquire); + } + + void setHead(Segment* s) noexcept { + head_.store(s, std::memory_order_release); + } + + void setTail(Segment* s) noexcept { + tail_.store(s, std::memory_order_release); + } + + FOLLY_ALWAYS_INLINE void setProducerTicket(Ticket t) noexcept { + producerTicket_.store(t, std::memory_order_release); + } + + FOLLY_ALWAYS_INLINE void setConsumerTicket(Ticket t) noexcept { + 
consumerTicket_.store(t, std::memory_order_release); + } + + FOLLY_ALWAYS_INLINE Ticket fetchIncrementConsumerTicket() noexcept { + if (SingleConsumer) { + auto oldval = consumerTicket(); + setConsumerTicket(oldval + 1); + return oldval; + } else { // MC + return consumerTicket_.fetch_add(1, std::memory_order_acq_rel); + } + } + + FOLLY_ALWAYS_INLINE Ticket fetchIncrementProducerTicket() noexcept { + if (SingleProducer) { + auto oldval = producerTicket(); + setProducerTicket(oldval + 1); + return oldval; + } else { // MP + return producerTicket_.fetch_add(1, std::memory_order_acq_rel); + } + } +}; // UnboundedQueue + +/* Aliases */ + +template < + typename T, + bool MayBlock, + size_t LgSegmentSize = 8, + template class Atom = std::atomic> +using USPSCQueue = UnboundedQueue; + +template < + typename T, + bool MayBlock, + size_t LgSegmentSize = 8, + template class Atom = std::atomic> +using UMPSCQueue = + UnboundedQueue; + +template < + typename T, + bool MayBlock, + size_t LgSegmentSize = 8, + template class Atom = std::atomic> +using USPMCQueue = + UnboundedQueue; + +template < + typename T, + bool MayBlock, + size_t LgSegmentSize = 8, + template class Atom = std::atomic> +using UMPMCQueue = + UnboundedQueue; + +} // namespace folly + +#include diff --git a/folly/concurrency/test/UnboundedQueueTest.cpp b/folly/concurrency/test/UnboundedQueueTest.cpp new file mode 100644 index 00000000..4eccea11 --- /dev/null +++ b/folly/concurrency/test/UnboundedQueueTest.cpp @@ -0,0 +1,1151 @@ +/* + * Copyright 2017 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include + +#include + +#include +#include + +DEFINE_bool(bench, false, "run benchmark"); +DEFINE_int32(reps, 10, "number of reps"); +DEFINE_int32(ops, 1000000, "number of operations per rep"); + +template +using USPSC = folly::USPSCQueue; + +template +using UMPSC = folly::UMPSCQueue; + +template +using USPMC = folly::USPMCQueue; + +template +using UMPMC = folly::UMPMCQueue; + +template