/*
- * Copyright 2017 Facebook, Inc.
+ * Copyright 2017-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
#include <chrono>
#include <memory>
+#include <glog/logging.h>
+
#include <folly/concurrency/CacheLocality.h>
+#include <folly/experimental/hazptr/hazptr.h>
+#include <folly/synchronization/SaturatingSemaphore.h>
namespace folly {
/// - SingleConsumer: true if there can be only one consumer at a
/// time.
/// - MayBlock: true if consumers may block, false if they only
-/// spins. A performance tuning parameter.
+/// spin. A performance tuning parameter.
/// - LgSegmentSize (default 8): Log base 2 of number of elements per
/// segment. A performance tuning parameter. See below.
+/// - LgAlign (default 7): Log base 2 of alignment directive; can be
+/// used to balance scalability (avoidance of false sharing) with
+/// memory efficiency.
///
/// When to use UnboundedQueue:
/// - If a small bound may lead to deadlock or performance degradation
/// SPSC) ProducerConsumerQueue.
///
/// Template Aliases:
-/// USPSCQueue<T, MayBlock, LgSegmentSize>
-/// UMPSCQueue<T, MayBlock, LgSegmentSize>
-/// USPMCQueue<T, MayBlock, LgSegmentSize>
-/// UMPMCQueue<T, MayBlock, LgSegmentSize>
+/// USPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>
+/// UMPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>
+/// USPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>
+/// UMPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>
///
/// Functions:
/// Producer operations never wait or fail (unless OOM)
/// Extracts an element from the front of the queue. Waits
/// until an element is available if needed.
/// bool try_dequeue(T&);
-/// Tries to extracts an element from the front of the queue
+/// Tries to extract an element from the front of the queue
/// if available. Returns true if successful, false otherwise.
/// bool try_dequeue_until(T&, time_point& deadline);
-/// Tries to extracts an element from the front of the queue
+/// Tries to extract an element from the front of the queue
/// if available until the specified deadline. Returns true
/// if successful, false otherwise.
/// bool try_dequeue_for(T&, duration&);
-/// Tries to extracts an element from the front of the queue
-/// if available for for the specified duration. Returns true
-/// if successful, false otherwise.
+///      Tries to extract an element from the front of the queue if
+///      available until the expiration of the specified duration.
+///      Returns true if successful, false otherwise.
///
/// Secondary functions:
/// size_t size();
/// exactly once.
/// - Each entry is composed of a futex and a single element.
/// - The queue contains two 64-bit ticket variables. The producer
-/// ticket counts the number of producer tickets isued so far, and
+/// ticket counts the number of producer tickets issued so far, and
/// the same for the consumer ticket. Each ticket number corresponds
/// to a specific entry in a specific segment.
/// - The queue maintains two pointers, head and tail. Head points to
///   one or two more segments than fit its contents.
/// - Removed segments are not reclaimed until there are no threads,
/// producers or consumers, have references to them or their
-/// predessors. That is, a lagging thread may delay the reclamation
+/// predecessors. That is, a lagging thread may delay the reclamation
/// of a chain of removed segments.
+/// - The template parameter LgAlign can be used to reduce memory usage
+/// at the cost of increased chance of false sharing.
///
/// Performance considerations:
/// - All operations take constant time, excluding the costs of
-/// allocation, reclamation, interence from other threads, and
+/// allocation, reclamation, interference from other threads, and
/// waiting for actions by other threads.
///   - In general, using the single producer and/or single consumer
-/// variants yields better performance than the MP and MC
+/// variants yield better performance than the MP and MC
/// alternatives.
/// - SPSC without blocking is the fastest configuration. It doesn't
/// include any read-modify-write atomic operations, full fences, or
/// - MC adds a fetch_add or compare_exchange to the critical path of
/// each consumer operation.
/// - The possibility of consumers blocking, even if they never do,
-/// adds a compare_exchange to the crtical path of each producer
+/// adds a compare_exchange to the critical path of each producer
/// operation.
/// - MPMC, SPMC, MPSC require the use of a deferred reclamation
/// mechanism to guarantee that segments removed from the linked
/// - Another consideration is that the queue is guaranteed to have
/// enough space for a number of consumers equal to 2^LgSegmentSize
/// for local blocking. Excess waiting consumers spin.
-/// - It is recommended to measure perforamnce with different variants
+/// - It is recommended to measure performance with different variants
/// when applicable, e.g., UMPMC vs UMPSC. Depending on the use
/// case, sometimes the variant with the higher sequential overhead
/// may yield better results due to, for example, more favorable
-/// producer-consumer balance or favorable timining for avoiding
+/// producer-consumer balance or favorable timing for avoiding
/// costly blocking.
template <
bool SingleConsumer,
bool MayBlock,
size_t LgSegmentSize = 8,
+ size_t LgAlign = 7,
template <typename> class Atom = std::atomic>
class UnboundedQueue {
using Ticket = uint64_t;
class Segment;
static constexpr bool SPSC = SingleProducer && SingleConsumer;
- static constexpr size_t SegmentSize = 1 << LgSegmentSize;
static constexpr size_t Stride = SPSC || (LgSegmentSize <= 1) ? 1 : 27;
+ static constexpr size_t SegmentSize = 1u << LgSegmentSize;
+ static constexpr size_t Align = 1u << LgAlign;
static_assert(
std::is_nothrow_destructible<T>::value,
"T must be nothrow_destructible");
static_assert((Stride & 1) == 1, "Stride must be odd");
static_assert(LgSegmentSize < 32, "LgSegmentSize must be < 32");
+ static_assert(LgAlign < 16, "LgAlign must be < 16");
+
+  // Consumer-side state: the head segment pointer and the consumer
+  // ticket, grouped so they share storage only with each other.
+  struct Consumer {
+    Atom<Segment*> head;
+    Atom<Ticket> ticket;
+  };
+  // Producer-side state: the tail segment pointer and the producer
+  // ticket, grouped symmetrically with Consumer.
+  struct Producer {
+    Atom<Segment*> tail;
+    Atom<Ticket> ticket;
+  };
-  FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
-  Atom<Segment*> head_;
-  Atom<Ticket> consumerTicket_;
-  FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
-  Atom<Segment*> tail_;
-  Atom<Ticket> producerTicket_;
+  // Each side is aligned to Align (1 << LgAlign) so that producer and
+  // consumer state do not share a cache line; LgAlign trades memory
+  // for false-sharing avoidance (see class doc above).
+  alignas(Align) Consumer c_;
+  alignas(Align) Producer p_;
public:
- UnboundedQueue();
- ~UnboundedQueue();
+  /** constructor */
+  // Both tickets start at 0 and a single empty segment with minimum
+  // ticket 0 serves as both head and tail.
+  UnboundedQueue() {
+    setProducerTicket(0);
+    setConsumerTicket(0);
+    Segment* s = new Segment(0);
+    setTail(s);
+    setHead(s);
+  }
+
+  /** destructor */
+  // Walks the remaining segment chain from head and reclaims each
+  // segment (direct delete for SPSC, hazptr retire otherwise — see
+  // reclaimSegment). Assumes no concurrent producers or consumers.
+  ~UnboundedQueue() {
+    Segment* next;
+    for (Segment* s = head(); s; s = next) {
+      next = s->nextSegment();
+      reclaimSegment(s);
+    }
+  }
/** enqueue */
FOLLY_ALWAYS_INLINE void enqueue(const T& arg) {
}
/** dequeue */
-  void dequeue(T& item) noexcept;
+  // Extracts an element from the front of the queue, waiting (via the
+  // entry's flag — see Entry::takeItem) until one is available.
+  FOLLY_ALWAYS_INLINE void dequeue(T& item) noexcept {
+    dequeueImpl(item);
+  }
/** try_dequeue */
-  bool try_dequeue(T& item) noexcept {
-    return try_dequeue_until(
-        item, std::chrono::steady_clock::time_point::min());
+  // Non-blocking attempt: time_point::min() is an already-expired
+  // deadline, so the dequeue path never waits.
+  FOLLY_ALWAYS_INLINE bool try_dequeue(T& item) noexcept {
+    return tryDequeueUntil(item, std::chrono::steady_clock::time_point::min());
   }
/** try_dequeue_until */
+  // Attempts to dequeue, waiting at most until `deadline`. Returns
+  // true on success, false on timeout.
template <typename Clock, typename Duration>
-  bool try_dequeue_until(
+  FOLLY_ALWAYS_INLINE bool try_dequeue_until(
T& item,
-      const std::chrono::time_point<Clock, Duration>& deadline) noexcept;
+      const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
+    return tryDequeueUntil(item, deadline);
+  }
/** try_dequeue_for */
+  // Attempts to dequeue, waiting at most `duration`. Fast path tries
+  // a non-blocking dequeue first so the clock is read only when the
+  // queue appears empty.
template <typename Rep, typename Period>
-  bool try_dequeue_for(
+  FOLLY_ALWAYS_INLINE bool try_dequeue_for(
T& item,
const std::chrono::duration<Rep, Period>& duration) noexcept {
-    return try_dequeue_until(item, std::chrono::steady_clock::now() + duration);
+    if (LIKELY(try_dequeue(item))) {
+      return true;
+    }
+    return tryDequeueUntil(item, std::chrono::steady_clock::now() + duration);
}
/** size */
}
private:
+  /** enqueueImpl */
+  // Dispatches to enqueueCommon with the current tail segment. In the
+  // non-SPSC case the tail is read under hazard-pointer protection so
+  // it cannot be reclaimed while in use. (In SPSC, presumably the
+  // consumer cannot reclaim the tail segment while the producer still
+  // writes to it — TODO confirm against the reclamation protocol.)
template <typename Arg>
-  void enqueueImpl(Arg&& arg);
+  FOLLY_ALWAYS_INLINE void enqueueImpl(Arg&& arg) {
+    if (SPSC) {
+      Segment* s = tail();
+      enqueueCommon(s, std::forward<Arg>(arg));
+    } else {
+      // Using hazptr_holder instead of hazptr_local because it is
+      // possible that the T ctor happens to use hazard pointers.
+      folly::hazptr::hazptr_holder hptr;
+      Segment* s = hptr.get_protected(p_.tail);
+      enqueueCommon(s, std::forward<Arg>(arg));
+    }
+  }
+  /** enqueueCommon */
+  // Acquires a producer ticket, locates the entry that ticket maps to,
+  // stores the item there, and performs any segment maintenance this
+  // ticket is responsible for.
template <typename Arg>
-  void enqueueCommon(Segment* s, Arg&& arg);
+  FOLLY_ALWAYS_INLINE void enqueueCommon(Segment* s, Arg&& arg) {
+    Ticket t = fetchIncrementProducerTicket();
+    if (!SingleProducer) {
+      // With multiple producers the segment read by the caller may
+      // already be behind this ticket; advance to the owning segment.
+      s = findSegment(s, t);
+    }
+    DCHECK_GE(t, s->minTicket());
+    DCHECK_LT(t, s->minTicket() + SegmentSize);
+    size_t idx = index(t);
+    Entry& e = s->entry(idx);
+    e.putItem(std::forward<Arg>(arg));
+    // responsibleForAlloc/responsibleForAdvance are defined elsewhere;
+    // presumably they select the tickets at segment boundaries so
+    // exactly one producer allocates the next segment and one advances
+    // the tail — TODO confirm against their definitions.
+    if (responsibleForAlloc(t)) {
+      allocNextSegment(s, t + SegmentSize);
+    }
+    if (responsibleForAdvance(t)) {
+      advanceTail(s);
+    }
+  }
+
+  /** dequeueImpl */
+  // Dispatches to dequeueCommon with the current head segment; the
+  // non-SPSC case protects the head with a hazard pointer so it
+  // cannot be reclaimed while this thread uses it.
+  FOLLY_ALWAYS_INLINE void dequeueImpl(T& item) noexcept {
+    if (SPSC) {
+      Segment* s = head();
+      dequeueCommon(s, item);
+    } else {
+      // Using hazptr_holder instead of hazptr_local because it is
+      // possible to call the T dtor and it may happen to use hazard
+      // pointers.
+      folly::hazptr::hazptr_holder hptr;
+      Segment* s = hptr.get_protected(c_.head);
+      dequeueCommon(s, item);
+    }
+  }
+
+  /** dequeueCommon */
+  // Acquires a consumer ticket, locates the owning entry, waits for /
+  // takes the item (Entry::takeItem waits on the entry's flag), and
+  // advances the head when this ticket is responsible for doing so.
+  FOLLY_ALWAYS_INLINE void dequeueCommon(Segment* s, T& item) noexcept {
+    Ticket t = fetchIncrementConsumerTicket();
+    if (!SingleConsumer) {
+      // The caller's segment may be behind this ticket; advance.
+      s = findSegment(s, t);
+    }
+    size_t idx = index(t);
+    Entry& e = s->entry(idx);
+    e.takeItem(item);
+    if (responsibleForAdvance(t)) {
+      advanceHead(s);
+    }
+  }
-  void dequeueCommon(Segment* s, T& item) noexcept;
+  /** tryDequeueUntil */
+  // Attempts to dequeue before `deadline`, dispatching to the single-
+  // or multi-consumer implementation. Returns true on success, false
+  // on timeout.
+  template <typename Clock, typename Duration>
+  FOLLY_ALWAYS_INLINE bool tryDequeueUntil(
+      T& item,
+      const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
+    if (SingleConsumer) {
+      Segment* s = head();
+      return tryDequeueUntilSC(s, item, deadline);
+    } else {
+      // Using hazptr_holder instead of hazptr_local because it is
+      // possible to call ~T() and it may happen to use hazard pointers.
+      folly::hazptr::hazptr_holder hptr;
+      Segment* s = hptr.get_protected(c_.head);
+      return tryDequeueUntilMC(s, item, deadline);
+    }
+  }
+  /** tryDequeueUntilSC */
+  // Single-consumer path: the consumer ticket can be read and advanced
+  // with plain load/store — no CAS needed since no other consumer can
+  // race for the same ticket.
   template <typename Clock, typename Duration>
-  bool singleConsumerTryDequeueUntil(
+  FOLLY_ALWAYS_INLINE bool tryDequeueUntilSC(
       Segment* s,
       T& item,
-      const std::chrono::time_point<Clock, Duration>& deadline) noexcept;
+      const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
+    Ticket t = consumerTicket();
+    DCHECK_GE(t, s->minTicket());
+    DCHECK_LT(t, (s->minTicket() + SegmentSize));
+    size_t idx = index(t);
+    Entry& e = s->entry(idx);
+    // Wait for the producer to post this entry; give up at deadline.
+    if (!e.tryWaitUntil(deadline)) {
+      return false;
+    }
+    setConsumerTicket(t + 1);
+    e.takeItem(item);
+    if (responsibleForAdvance(t)) {
+      advanceHead(s);
+    }
+    return true;
+  }
+  /** tryDequeueUntilMC */
+  // Multi-consumer path: claims a ticket with CAS and retries when
+  // another consumer wins the race or the ticket has moved past the
+  // current segment.
   template <typename Clock, typename Duration>
-  bool multiConsumerTryDequeueUntil(
+  FOLLY_ALWAYS_INLINE bool tryDequeueUntilMC(
       Segment* s,
       T& item,
-      const std::chrono::time_point<Clock, Duration>& deadline) noexcept;
+      const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
+    while (true) {
+      Ticket t = consumerTicket();
+      if (UNLIKELY(t >= (s->minTicket() + SegmentSize))) {
+        // The next ticket lives in a later segment; move forward.
+        s = tryGetNextSegmentUntil(s, deadline);
+        if (s == nullptr) {
+          return false; // timed out
+        }
+        continue;
+      }
+      size_t idx = index(t);
+      Entry& e = s->entry(idx);
+      if (!e.tryWaitUntil(deadline)) {
+        return false;
+      }
+      // Claim ticket t; on failure another consumer took it — retry.
+      if (!c_.ticket.compare_exchange_weak(
+              t, t + 1, std::memory_order_acq_rel, std::memory_order_acquire)) {
+        continue;
+      }
+      e.takeItem(item);
+      if (responsibleForAdvance(t)) {
+        advanceHead(s);
+      }
+      return true;
+    }
+  }
+
+  /** findSegment */
+  // Walks forward from segment s to the segment that owns ticket t.
+  // Uses an infinite deadline (time_point::max()), so it waits for the
+  // tail to advance rather than giving up.
+  FOLLY_ALWAYS_INLINE
+  Segment* findSegment(Segment* s, const Ticket t) const noexcept {
+    while (UNLIKELY(t >= (s->minTicket() + SegmentSize))) {
+      auto deadline = std::chrono::steady_clock::time_point::max();
+      s = tryGetNextSegmentUntil(s, deadline);
+      DCHECK(s != nullptr);
+    }
+    return s;
+  }
-  Segment* findSegment(Segment* s, const Ticket t) const noexcept;
+  /** tryGetNextSegmentUntil */
+  // Waits (spinning) until the tail pointer has advanced past segment
+  // s, then returns s's successor. Returns nullptr if a finite
+  // deadline expires first; a deadline of Clock::time_point::max()
+  // means "wait forever".
+  template <typename Clock, typename Duration>
+  Segment* tryGetNextSegmentUntil(
+      Segment* s,
+      const std::chrono::time_point<Clock, Duration>& deadline) const noexcept {
+    // The following loop will not spin indefinitely (as long as the
+    // number of concurrently waiting consumers does not exceed
+    // SegmentSize and the OS scheduler does not pause ready threads
+    // indefinitely). Under such conditions, the algorithm guarantees
+    // that the producer responsible for advancing the tail pointer to
+    // the next segment has already acquired its ticket.
+    while (tail() == s) {
+      // Time out only once the finite deadline has actually passed.
+      if (deadline < Clock::time_point::max() && deadline < Clock::now()) {
+        return nullptr;
+      }
+      asm_volatile_pause();
+    }
+    Segment* next = s->nextSegment();
+    DCHECK(next != nullptr);
+    return next;
+  }
-  void allocNextSegment(Segment* s, const Ticket t);
+  /** allocNextSegment */
+  // Allocates the segment whose minimum ticket is t and links it after
+  // s. Exactly one producer reaches this for a given segment, so the
+  // DCHECK of a null next pointer holds.
+  void allocNextSegment(Segment* s, const Ticket t) {
+    Segment* next = new Segment(t);
+    if (!SPSC) {
+      // Take a reference before the segment becomes reachable,
+      // presumably so a predecessor's destruction does not free it
+      // while still linked — TODO confirm against hazptr refcounting.
+      next->acquire_ref_safe(); // hazptr
+    }
+    DCHECK(s->nextSegment() == nullptr);
+    s->setNextSegment(next);
+  }
+
+  /** advanceTail */
+  // Moves the tail pointer from segment s to its successor, waiting
+  // (MP case) for the producer that allocates the successor to link it.
+  void advanceTail(Segment* s) noexcept {
+    Segment* next = s->nextSegment();
+    if (!SingleProducer) {
+      // The following loop will not spin indefinitely (as long as the
+      // OS scheduler does not pause ready threads indefinitely). The
+      // algorithm guarantees that the producer responsible for setting
+      // the next pointer has already acquired its ticket.
+      while (next == nullptr) {
+        asm_volatile_pause();
+        next = s->nextSegment();
+      }
+    }
+    DCHECK(next != nullptr);
+    setTail(next);
+  }
-  void advanceTail(Segment* s) noexcept;
+  /** advanceHead */
+  // Moves the head pointer from segment s to its successor and
+  // reclaims s. Uses an infinite deadline, so tryGetNextSegmentUntil
+  // cannot return null here.
+  void advanceHead(Segment* s) noexcept {
+    auto deadline = std::chrono::steady_clock::time_point::max();
+    Segment* next = tryGetNextSegmentUntil(s, deadline);
+    DCHECK(next != nullptr);
+    while (head() != s) {
+      // Wait for head to advance to the current segment first before
+      // advancing head to the next segment. Otherwise, a lagging
+      // consumer responsible for advancing head from an earlier
+      // segment may incorrectly set head back.
+      asm_volatile_pause();
+    }
+    setHead(next);
+    reclaimSegment(s);
+  }
-  void advanceHead(Segment* s) noexcept;
+  /** reclaimSegment */
+  // SPSC has no lagging threads holding references, so the segment can
+  // be deleted immediately; otherwise defer reclamation via hazptr.
+  void reclaimSegment(Segment* s) noexcept {
+    if (SPSC) {
+      delete s;
+    } else {
+      s->retire(); // hazptr
+    }
+  }
+  // Maps ticket t to an entry index within its segment. Stride is odd
+  // and SegmentSize a power of two, so (t * Stride) mod SegmentSize
+  // visits every slot exactly once per segment — presumably to spread
+  // consecutively-active entries apart; see the Stride definition.
FOLLY_ALWAYS_INLINE size_t index(Ticket t) const noexcept {
return (t * Stride) & (SegmentSize - 1);
}
+  // Acquire loads pair with the release stores in the setters below.
FOLLY_ALWAYS_INLINE Segment* head() const noexcept {
-    return head_.load(std::memory_order_acquire);
+    return c_.head.load(std::memory_order_acquire);
}
FOLLY_ALWAYS_INLINE Segment* tail() const noexcept {
-    return tail_.load(std::memory_order_acquire);
+    return p_.tail.load(std::memory_order_acquire);
}
FOLLY_ALWAYS_INLINE Ticket producerTicket() const noexcept {
-    return producerTicket_.load(std::memory_order_acquire);
+    return p_.ticket.load(std::memory_order_acquire);
}
FOLLY_ALWAYS_INLINE Ticket consumerTicket() const noexcept {
-    return consumerTicket_.load(std::memory_order_acquire);
+    return c_.ticket.load(std::memory_order_acquire);
}
+  // Release stores pair with the acquire loads in the getters above.
void setHead(Segment* s) noexcept {
-    head_.store(s, std::memory_order_release);
+    c_.head.store(s, std::memory_order_release);
}
void setTail(Segment* s) noexcept {
-    tail_.store(s, std::memory_order_release);
+    p_.tail.store(s, std::memory_order_release);
}
FOLLY_ALWAYS_INLINE void setProducerTicket(Ticket t) noexcept {
-    producerTicket_.store(t, std::memory_order_release);
+    p_.ticket.store(t, std::memory_order_release);
}
FOLLY_ALWAYS_INLINE void setConsumerTicket(Ticket t) noexcept {
-    consumerTicket_.store(t, std::memory_order_release);
+    c_.ticket.store(t, std::memory_order_release);
}
+  // Returns the current consumer ticket and advances it by one. The
+  // single-consumer case avoids the read-modify-write: a plain
+  // load + store suffices since no other consumer can race.
FOLLY_ALWAYS_INLINE Ticket fetchIncrementConsumerTicket() noexcept {
if (SingleConsumer) {
-      auto oldval = consumerTicket();
+      Ticket oldval = consumerTicket();
setConsumerTicket(oldval + 1);
return oldval;
} else { // MC
+      // Multiple consumers: atomic fetch_add is required.
-      return consumerTicket_.fetch_add(1, std::memory_order_acq_rel);
+      return c_.ticket.fetch_add(1, std::memory_order_acq_rel);
}
}
+  // Producer analog of fetchIncrementConsumerTicket.
FOLLY_ALWAYS_INLINE Ticket fetchIncrementProducerTicket() noexcept {
if (SingleProducer) {
-      auto oldval = producerTicket();
+      Ticket oldval = producerTicket();
setProducerTicket(oldval + 1);
return oldval;
} else { // MP
-      return producerTicket_.fetch_add(1, std::memory_order_acq_rel);
+      return p_.ticket.fetch_add(1, std::memory_order_acq_rel);
}
}
+
+  /**
+   *  Entry
+   *
+   *  One queue slot: uninitialized storage for a T plus a semaphore
+   *  that is posted when the item has been constructed. takeItem
+   *  waits on the flag, then moves the item out and destroys it.
+   */
+  class Entry {
+    folly::SaturatingSemaphore<MayBlock, Atom> flag_;
+    // Raw, suitably aligned storage; a T is placement-new'ed into it
+    // by putItem and destroyed by destroyItem.
+    typename std::aligned_storage<sizeof(T), alignof(T)>::type item_;
+
+  public:
+    template <typename Arg>
+    FOLLY_ALWAYS_INLINE void putItem(Arg&& arg) {
+      // Construct first, then post: the post publishes the item to
+      // the consumer side.
+      new (&item_) T(std::forward<Arg>(arg));
+      flag_.post();
+    }
+
+    FOLLY_ALWAYS_INLINE void takeItem(T& item) noexcept {
+      flag_.wait();
+      getItem(item);
+    }
+
+    // Returns false if the entry is not posted by `deadline`.
+    template <typename Clock, typename Duration>
+    FOLLY_ALWAYS_INLINE bool tryWaitUntil(
+        const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
+      return flag_.try_wait_until(deadline);
+    }
+
+  private:
+    FOLLY_ALWAYS_INLINE void getItem(T& item) noexcept {
+      item = std::move(*(itemPtr()));
+      destroyItem();
+    }
+
+    FOLLY_ALWAYS_INLINE T* itemPtr() noexcept {
+      return static_cast<T*>(static_cast<void*>(&item_));
+    }
+
+    FOLLY_ALWAYS_INLINE void destroyItem() noexcept {
+      itemPtr()->~T();
+    }
+  }; // Entry
+
+  /**
+   *  Segment
+   *
+   *  A fixed-size array of SegmentSize entries covering tickets
+   *  [min_, min_ + SegmentSize), linked into a singly-linked chain.
+   */
+  class Segment : public folly::hazptr::hazptr_obj_base_refcounted<Segment> {
+    Atom<Segment*> next_;
+    const Ticket min_;
+    bool marked_; // used for iterative deletion
+    FOLLY_ALIGNED(Align)
+    Entry b_[SegmentSize];
+
+  public:
+    explicit Segment(const Ticket t)
+        : next_(nullptr), min_(t), marked_(false) {}
+
+    // Iteratively deletes the chain of successors instead of letting
+    // each destructor recurse into the next (avoids deep recursion on
+    // long chains). marked_ tells a successor's destructor that its
+    // chain is already being handled by this loop.
+    ~Segment() {
+      if (!SPSC && !marked_) {
+        Segment* next = nextSegment();
+        while (next) {
+          // release_ref() presumably returns true only when the last
+          // reference is dropped — confirm against
+          // hazptr_obj_base_refcounted; stop if others still hold refs.
+          if (!next->release_ref()) { // hazptr
+            return;
+          }
+          Segment* s = next;
+          next = s->nextSegment();
+          s->marked_ = true;
+          delete s;
+        }
+      }
+    }
+
+    Segment* nextSegment() const noexcept {
+      return next_.load(std::memory_order_acquire);
+    }
+
+    void setNextSegment(Segment* s) noexcept {
+      next_.store(s, std::memory_order_release);
+    }
+
+    // First ticket served by this segment; always a multiple of
+    // SegmentSize (checked below).
+    FOLLY_ALWAYS_INLINE Ticket minTicket() const noexcept {
+      DCHECK_EQ((min_ & (SegmentSize - 1)), 0);
+      return min_;
+    }
+
+    FOLLY_ALWAYS_INLINE Entry& entry(size_t index) noexcept {
+      return b_[index];
+    }
+  }; // Segment
+
}; // UnboundedQueue
/* Aliases */
typename T,
bool MayBlock,
size_t LgSegmentSize = 8,
+ size_t LgAlign = 7,
template <typename> class Atom = std::atomic>
-using USPSCQueue = UnboundedQueue<T, true, true, MayBlock, LgSegmentSize, Atom>;
+using USPSCQueue =
+ UnboundedQueue<T, true, true, MayBlock, LgSegmentSize, LgAlign, Atom>;
template <
typename T,
bool MayBlock,
size_t LgSegmentSize = 8,
+ size_t LgAlign = 7,
template <typename> class Atom = std::atomic>
using UMPSCQueue =
- UnboundedQueue<T, false, true, MayBlock, LgSegmentSize, Atom>;
+ UnboundedQueue<T, false, true, MayBlock, LgSegmentSize, LgAlign, Atom>;
template <
typename T,
bool MayBlock,
size_t LgSegmentSize = 8,
+ size_t LgAlign = 7,
template <typename> class Atom = std::atomic>
using USPMCQueue =
- UnboundedQueue<T, true, false, MayBlock, LgSegmentSize, Atom>;
+ UnboundedQueue<T, true, false, MayBlock, LgSegmentSize, LgAlign, Atom>;
template <
typename T,
bool MayBlock,
size_t LgSegmentSize = 8,
+ size_t LgAlign = 7,
template <typename> class Atom = std::atomic>
using UMPMCQueue =
- UnboundedQueue<T, false, false, MayBlock, LgSegmentSize, Atom>;
+ UnboundedQueue<T, false, false, MayBlock, LgSegmentSize, LgAlign, Atom>;
} // namespace folly
-
-#include <folly/concurrency/UnboundedQueue-inl.h>