+++ /dev/null
-/*
- * Copyright 2017 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <folly/experimental/hazptr/hazptr.h>
-#include <folly/lang/Launder.h>
-#include <folly/synchronization/SaturatingSemaphore.h>
-
-#include <glog/logging.h>
-
-namespace folly {
-
-/* constructor */
-
-template <
- typename T,
- bool SingleProducer,
- bool SingleConsumer,
- bool MayBlock,
- size_t LgSegmentSize,
- template <typename> class Atom>
-inline UnboundedQueue<
- T,
- SingleProducer,
- SingleConsumer,
- MayBlock,
- LgSegmentSize,
- Atom>::UnboundedQueue() {
- setProducerTicket(0);
- setConsumerTicket(0);
- auto s = new Segment(0);
- DEBUG_PRINT(s);
- setTail(s);
- setHead(s);
-}
-
-/* destructor */
-
-template <
- typename T,
- bool SingleProducer,
- bool SingleConsumer,
- bool MayBlock,
- size_t LgSegmentSize,
- template <typename> class Atom>
-inline UnboundedQueue<
- T,
- SingleProducer,
- SingleConsumer,
- MayBlock,
- LgSegmentSize,
- Atom>::~UnboundedQueue() {
- Segment* next;
- for (auto s = head(); s; s = next) {
- next = s->nextSegment();
- if (SPSC) {
- delete s;
- } else {
- s->retire(); // hazptr
- }
- }
-}
-
-/* dequeue */
-
-template <
- typename T,
- bool SingleProducer,
- bool SingleConsumer,
- bool MayBlock,
- size_t LgSegmentSize,
- template <typename> class Atom>
-FOLLY_ALWAYS_INLINE void UnboundedQueue<
- T,
- SingleProducer,
- SingleConsumer,
- MayBlock,
- LgSegmentSize,
- Atom>::dequeue(T& item) noexcept {
- if (SPSC) {
- auto s = head();
- dequeueCommon(s, item);
- } else {
- // Using hazptr_holder instead of hazptr_local because it is
- // possible to call ~T() and it may happen to use hazard pointers.
- folly::hazptr::hazptr_holder hptr;
- auto s = hptr.get_protected(head_);
- dequeueCommon(s, item);
- }
-}
-
-/* try_dequeue_until */
-
-template <
- typename T,
- bool SingleProducer,
- bool SingleConsumer,
- bool MayBlock,
- size_t LgSegmentSize,
- template <typename> class Atom>
-template <typename Clock, typename Duration>
-FOLLY_ALWAYS_INLINE bool UnboundedQueue<
- T,
- SingleProducer,
- SingleConsumer,
- MayBlock,
- LgSegmentSize,
- Atom>::
- try_dequeue_until(
- T& item,
- const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
- if (SingleConsumer) {
- auto s = head();
- return singleConsumerTryDequeueUntil(s, item, deadline);
- } else {
- // Using hazptr_holder instead of hazptr_local because it is
- // possible to call ~T() and it may happen to use hazard pointers.
- folly::hazptr::hazptr_holder hptr;
- auto s = hptr.get_protected(head_);
- return multiConsumerTryDequeueUntil(s, item, deadline);
- }
-}
-
-/* enqueueImpl */
-
-template <
- typename T,
- bool SingleProducer,
- bool SingleConsumer,
- bool MayBlock,
- size_t LgSegmentSize,
- template <typename> class Atom>
-template <typename Arg>
-FOLLY_ALWAYS_INLINE void UnboundedQueue<
- T,
- SingleProducer,
- SingleConsumer,
- MayBlock,
- LgSegmentSize,
- Atom>::enqueueImpl(Arg&& arg) {
- if (SPSC) {
- auto s = tail();
- enqueueCommon(s, std::forward<Arg>(arg));
- } else {
- // Using hazptr_holder instead of hazptr_local because it is
- // possible that the T construcctor happens to use hazard
- // pointers.
- folly::hazptr::hazptr_holder hptr;
- auto s = hptr.get_protected(tail_);
- enqueueCommon(s, std::forward<Arg>(arg));
- }
-}
-
-/* enqueueCommon */
-
-template <
- typename T,
- bool SingleProducer,
- bool SingleConsumer,
- bool MayBlock,
- size_t LgSegmentSize,
- template <typename> class Atom>
-template <typename Arg>
-FOLLY_ALWAYS_INLINE void UnboundedQueue<
- T,
- SingleProducer,
- SingleConsumer,
- MayBlock,
- LgSegmentSize,
- Atom>::enqueueCommon(Segment* s, Arg&& arg) {
- auto t = fetchIncrementProducerTicket();
- if (!SingleProducer) {
- s = findSegment(s, t);
- }
- DCHECK_GE(t, s->minTicket());
- DCHECK_LT(t, (s->minTicket() + SegmentSize));
- size_t idx = index(t);
- Entry& e = s->entry(idx);
- e.putItem(std::forward<Arg>(arg));
- if (responsibleForAlloc(t)) {
- allocNextSegment(s, t + SegmentSize);
- }
- if (responsibleForAdvance(t)) {
- advanceTail(s);
- }
-}
-
-/* dequeueCommon */
-
-template <
- typename T,
- bool SingleProducer,
- bool SingleConsumer,
- bool MayBlock,
- size_t LgSegmentSize,
- template <typename> class Atom>
-FOLLY_ALWAYS_INLINE void UnboundedQueue<
- T,
- SingleProducer,
- SingleConsumer,
- MayBlock,
- LgSegmentSize,
- Atom>::dequeueCommon(Segment* s, T& item) noexcept {
- auto t = fetchIncrementConsumerTicket();
- if (!SingleConsumer) {
- s = findSegment(s, t);
- }
- size_t idx = index(t);
- Entry& e = s->entry(idx);
- e.takeItem(item);
- if (responsibleForAdvance(t)) {
- advanceHead(s);
- }
-}
-
-/* singleConsumerTryDequeueUntil */
-
-template <
- typename T,
- bool SingleProducer,
- bool SingleConsumer,
- bool MayBlock,
- size_t LgSegmentSize,
- template <typename> class Atom>
-template <typename Clock, typename Duration>
-FOLLY_ALWAYS_INLINE bool UnboundedQueue<
- T,
- SingleProducer,
- SingleConsumer,
- MayBlock,
- LgSegmentSize,
- Atom>::
- singleConsumerTryDequeueUntil(
- Segment* s,
- T& item,
- const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
- auto t = consumerTicket();
- DCHECK_GE(t, s->minTicket());
- DCHECK_LT(t, (s->minTicket() + SegmentSize));
- size_t idx = index(t);
- Entry& e = s->entry(idx);
- if (!e.tryWaitUntil(deadline)) {
- return false;
- }
- setConsumerTicket(t + 1);
- e.takeItem(item);
- if (responsibleForAdvance(t)) {
- advanceHead(s);
- }
- return true;
-}
-
-/* multiConsumerTryDequeueUntil */
-
-template <
- typename T,
- bool SingleProducer,
- bool SingleConsumer,
- bool MayBlock,
- size_t LgSegmentSize,
- template <typename> class Atom>
-template <typename Clock, typename Duration>
-FOLLY_ALWAYS_INLINE bool UnboundedQueue<
- T,
- SingleProducer,
- SingleConsumer,
- MayBlock,
- LgSegmentSize,
- Atom>::
- multiConsumerTryDequeueUntil(
- Segment* s,
- T& item,
- const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
- while (true) {
- auto t = consumerTicket();
- if (UNLIKELY(t >= (s->minTicket() + SegmentSize))) {
- Segment* next;
- // Note that the following loop will not spin indefinitely (as
- // long as the number of concurrently waiting consumers is not
- // greater than SegmentSize). The algorithm guarantees in such a
- // case that the producer reponsible for setting the next
- // pointer is already running.
- while ((next = s->nextSegment()) == nullptr) {
- if (Clock::now() > deadline) {
- return false;
- }
- asm_volatile_pause();
- }
- s = next;
- DCHECK(s != nullptr);
- continue;
- }
- size_t idx = index(t);
- Entry& e = s->entry(idx);
- if (!e.tryWaitUntil(deadline)) {
- return false;
- }
- if (!consumerTicket_.compare_exchange_weak(
- t, t + 1, std::memory_order_acq_rel, std::memory_order_acquire)) {
- continue;
- }
- e.takeItem(item);
- if (responsibleForAdvance(t)) {
- advanceHead(s);
- }
- return true;
- }
-}
-
-/* findSegment */
-
-template <
- typename T,
- bool SingleProducer,
- bool SingleConsumer,
- bool MayBlock,
- size_t LgSegmentSize,
- template <typename> class Atom>
-inline typename UnboundedQueue<
- T,
- SingleProducer,
- SingleConsumer,
- MayBlock,
- LgSegmentSize,
- Atom>::Segment*
-UnboundedQueue<
- T,
- SingleProducer,
- SingleConsumer,
- MayBlock,
- LgSegmentSize,
- Atom>::findSegment(Segment* s, const Ticket t) const noexcept {
- while (t >= (s->minTicket() + SegmentSize)) {
- Segment* next = s->nextSegment();
- // Note that the following loop will not spin indefinitely. The
- // algorithm guarantees that the producer reponsible for setting
- // the next pointer is already running.
- while (next == nullptr) {
- asm_volatile_pause();
- next = s->nextSegment();
- }
- DCHECK(next != nullptr);
- s = next;
- }
- return s;
-}
-
-/* allocNextSegment */
-
-template <
- typename T,
- bool SingleProducer,
- bool SingleConsumer,
- bool MayBlock,
- size_t LgSegmentSize,
- template <typename> class Atom>
-inline void UnboundedQueue<
- T,
- SingleProducer,
- SingleConsumer,
- MayBlock,
- LgSegmentSize,
- Atom>::allocNextSegment(Segment* s, const Ticket t) {
- auto next = new Segment(t);
- if (!SPSC) {
- next->acquire_ref_safe(); // hazptr
- }
- DEBUG_PRINT(s << " " << next);
- DCHECK(s->nextSegment() == nullptr);
- s->setNextSegment(next);
-}
-
-/* advanceTail */
-
-template <
- typename T,
- bool SingleProducer,
- bool SingleConsumer,
- bool MayBlock,
- size_t LgSegmentSize,
- template <typename> class Atom>
-inline void UnboundedQueue<
- T,
- SingleProducer,
- SingleConsumer,
- MayBlock,
- LgSegmentSize,
- Atom>::advanceTail(Segment* s) noexcept {
- Segment* next = s->nextSegment();
- if (!SingleProducer) {
- // Note that the following loop will not spin indefinitely. The
- // algorithm guarantees that the producer reponsible for setting
- // the next pointer is already running.
- while (next == nullptr) {
- asm_volatile_pause();
- next = s->nextSegment();
- }
- }
- DCHECK(next != nullptr);
- DEBUG_PRINT(s << " " << next);
- setTail(next);
-}
-
-/* advanceHead */
-
-template <
- typename T,
- bool SingleProducer,
- bool SingleConsumer,
- bool MayBlock,
- size_t LgSegmentSize,
- template <typename> class Atom>
-inline void UnboundedQueue<
- T,
- SingleProducer,
- SingleConsumer,
- MayBlock,
- LgSegmentSize,
- Atom>::advanceHead(Segment* s) noexcept {
- // Note that the following loops will not spin indefinitely. The
- // algorithm guarantees that the producers reponsible for advancing
- // the tail pointer and setting the next pointer are already
- // running.
- while (tail() == s) {
- asm_volatile_pause();
- }
- auto next = s->nextSegment();
- while (next == nullptr) {
- next = s->nextSegment();
- }
- DEBUG_PRINT(s << " " << next);
- setHead(next);
- if (SPSC) {
- delete s;
- } else {
- s->retire(); // hazptr
- }
-}
-
-/**
- * Entry
- */
-
-template <
- typename T,
- bool SingleProducer,
- bool SingleConsumer,
- bool MayBlock,
- size_t LgSegmentSize,
- template <typename> class Atom>
-class UnboundedQueue<
- T,
- SingleProducer,
- SingleConsumer,
- MayBlock,
- LgSegmentSize,
- Atom>::Entry {
- folly::SaturatingSemaphore<MayBlock, Atom> flag_;
- typename std::aligned_storage<sizeof(T), alignof(T)>::type item_;
-
- public:
- template <typename Arg>
- FOLLY_ALWAYS_INLINE void putItem(Arg&& arg) {
- new (&item_) T(std::forward<Arg>(arg));
- flag_.post();
- }
-
- FOLLY_ALWAYS_INLINE void takeItem(T& item) noexcept {
- flag_.wait();
- getItem(item);
- }
-
- template <typename Clock, typename Duration>
- FOLLY_ALWAYS_INLINE bool tryWaitUntil(
- const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
- return flag_.try_wait_until(deadline);
- }
-
- private:
- FOLLY_ALWAYS_INLINE void getItem(T& item) noexcept {
- item = std::move(*(folly::launder(itemPtr())));
- destroyItem();
- }
-
- FOLLY_ALWAYS_INLINE T* itemPtr() noexcept {
- return static_cast<T*>(static_cast<void*>(&item_));
- }
-
- FOLLY_ALWAYS_INLINE void destroyItem() noexcept {
- itemPtr()->~T();
- }
-}; // UnboundedQueue::Entry
-
-/**
- * Segment
- */
-
-template <
- typename T,
- bool SingleProducer,
- bool SingleConsumer,
- bool MayBlock,
- size_t LgSegmentSize,
- template <typename> class Atom>
-class UnboundedQueue<
- T,
- SingleProducer,
- SingleConsumer,
- MayBlock,
- LgSegmentSize,
- Atom>::Segment : public folly::hazptr::hazptr_obj_base_refcounted<Segment> {
- Atom<Segment*> next_;
- const Ticket min_;
- bool marked_; // used for iterative deletion
- FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
- Entry b_[SegmentSize];
-
- public:
- explicit Segment(const Ticket t) : next_(nullptr), min_(t), marked_(false) {}
-
- ~Segment() {
- if (!SPSC && !marked_) {
- auto next = nextSegment();
- while (next) {
- if (!next->release_ref()) { // hazptr
- return;
- }
- auto s = next;
- next = s->nextSegment();
- s->marked_ = true;
- delete s;
- }
- }
- }
-
- Segment* nextSegment() const noexcept {
- return next_.load(std::memory_order_acquire);
- }
-
- void setNextSegment(Segment* s) noexcept {
- next_.store(s, std::memory_order_release);
- }
-
- FOLLY_ALWAYS_INLINE Ticket minTicket() const noexcept {
- DCHECK_EQ((min_ & (SegmentSize - 1)), 0);
- return min_;
- }
-
- FOLLY_ALWAYS_INLINE Entry& entry(size_t index) noexcept {
- return b_[index];
- }
-}; // UnboundedQueue::Segment
-
-} // namespace folly
/*
- * Copyright 2017 Facebook, Inc.
+ * Copyright 2017-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
#include <chrono>
#include <memory>
+#include <glog/logging.h>
+
#include <folly/concurrency/CacheLocality.h>
+#include <folly/experimental/hazptr/hazptr.h>
+#include <folly/synchronization/SaturatingSemaphore.h>
namespace folly {
/// spins. A performance tuning parameter.
/// - LgSegmentSize (default 8): Log base 2 of number of elements per
/// segment. A performance tuning parameter. See below.
+/// - LgAlign (default 7): Log base 2 of alignment directive; can be
+/// used to balance scalability (avoidance of false sharing) with
+/// memory efficiency.
///
/// When to use UnboundedQueue:
/// - If a small bound may lead to deadlock or performance degradation
/// SPSC) ProducerConsumerQueue.
///
/// Template Aliases:
-/// USPSCQueue<T, MayBlock, LgSegmentSize>
-/// UMPSCQueue<T, MayBlock, LgSegmentSize>
-/// USPMCQueue<T, MayBlock, LgSegmentSize>
-/// UMPMCQueue<T, MayBlock, LgSegmentSize>
+/// USPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>
+/// UMPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>
+/// USPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>
+/// UMPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>
///
/// Functions:
/// Producer operations never wait or fail (unless OOM)
/// producers or consumers, have references to them or their
/// predessors. That is, a lagging thread may delay the reclamation
/// of a chain of removed segments.
+/// - The template parameter LgAlign can be used to reduce memory usage
+/// at the cost of increased chance of false sharing.
///
/// Performance considerations:
/// - All operations take constant time, excluding the costs of
bool SingleConsumer,
bool MayBlock,
size_t LgSegmentSize = 8,
+ size_t LgAlign = 7,
template <typename> class Atom = std::atomic>
class UnboundedQueue {
using Ticket = uint64_t;
class Segment;
static constexpr bool SPSC = SingleProducer && SingleConsumer;
- static constexpr size_t SegmentSize = 1 << LgSegmentSize;
static constexpr size_t Stride = SPSC || (LgSegmentSize <= 1) ? 1 : 27;
+ static constexpr size_t SegmentSize = 1u << LgSegmentSize;
+ static constexpr size_t Align = 1u << LgAlign;
static_assert(
std::is_nothrow_destructible<T>::value,
"T must be nothrow_destructible");
static_assert((Stride & 1) == 1, "Stride must be odd");
static_assert(LgSegmentSize < 32, "LgSegmentSize must be < 32");
+ static_assert(LgAlign < 16, "LgAlign must be < 16");
- FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
+ FOLLY_ALIGNED(Align)
Atom<Segment*> head_;
Atom<Ticket> consumerTicket_;
- FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
+ FOLLY_ALIGNED(Align)
Atom<Segment*> tail_;
Atom<Ticket> producerTicket_;
public:
- UnboundedQueue();
- ~UnboundedQueue();
+ /** constructor */
+ UnboundedQueue() {
+ setProducerTicket(0);
+ setConsumerTicket(0);
+ Segment* s = new Segment(0);
+ setTail(s);
+ setHead(s);
+ }
+
+ /** destructor */
+ ~UnboundedQueue() {
+ Segment* next;
+ for (Segment* s = head(); s; s = next) {
+ next = s->nextSegment();
+ reclaimSegment(s);
+ }
+ }
/** enqueue */
FOLLY_ALWAYS_INLINE void enqueue(const T& arg) {
}
/** dequeue */
- void dequeue(T& item) noexcept;
+ FOLLY_ALWAYS_INLINE void dequeue(T& item) noexcept {
+ dequeueImpl(item);
+ }
/** try_dequeue */
- bool try_dequeue(T& item) noexcept {
- return try_dequeue_until(
- item, std::chrono::steady_clock::time_point::min());
+ FOLLY_ALWAYS_INLINE bool try_dequeue(T& item) noexcept {
+ return tryDequeueUntil(item, std::chrono::steady_clock::time_point::min());
}
/** try_dequeue_until */
template <typename Clock, typename Duration>
- bool try_dequeue_until(
+ FOLLY_ALWAYS_INLINE bool try_dequeue_until(
T& item,
- const std::chrono::time_point<Clock, Duration>& deadline) noexcept;
+ const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
+ return tryDequeueUntil(item, deadline);
+ }
/** try_dequeue_for */
template <typename Rep, typename Period>
- bool try_dequeue_for(
+ FOLLY_ALWAYS_INLINE bool try_dequeue_for(
T& item,
const std::chrono::duration<Rep, Period>& duration) noexcept {
- return try_dequeue_until(item, std::chrono::steady_clock::now() + duration);
+ if (LIKELY(try_dequeue(item))) {
+ return true;
+ }
+ return tryDequeueUntil(item, std::chrono::steady_clock::now() + duration);
}
/** size */
}
private:
+ /** enqueueImpl */
template <typename Arg>
- void enqueueImpl(Arg&& arg);
+ FOLLY_ALWAYS_INLINE void enqueueImpl(Arg&& arg) {
+ if (SPSC) {
+ Segment* s = tail();
+ enqueueCommon(s, std::forward<Arg>(arg));
+ } else {
+ // Using hazptr_holder instead of hazptr_local because it is
+ // possible that the T ctor happens to use hazard pointers.
+ folly::hazptr::hazptr_holder hptr;
+ Segment* s = hptr.get_protected(tail_);
+ enqueueCommon(s, std::forward<Arg>(arg));
+ }
+ }
+ /** enqueueCommon */
template <typename Arg>
- void enqueueCommon(Segment* s, Arg&& arg);
+ FOLLY_ALWAYS_INLINE void enqueueCommon(Segment* s, Arg&& arg) {
+ Ticket t = fetchIncrementProducerTicket();
+ if (!SingleProducer) {
+ s = findSegment(s, t);
+ }
+ DCHECK_GE(t, s->minTicket());
+ DCHECK_LT(t, s->minTicket() + SegmentSize);
+ size_t idx = index(t);
+ Entry& e = s->entry(idx);
+ e.putItem(std::forward<Arg>(arg));
+ if (responsibleForAlloc(t)) {
+ allocNextSegment(s, t + SegmentSize);
+ }
+ if (responsibleForAdvance(t)) {
+ advanceTail(s);
+ }
+ }
- void dequeueCommon(Segment* s, T& item) noexcept;
+ /** dequeueImpl */
+ FOLLY_ALWAYS_INLINE void dequeueImpl(T& item) noexcept {
+ if (SPSC) {
+ Segment* s = head();
+ dequeueCommon(s, item);
+ } else {
+ // Using hazptr_holder instead of hazptr_local because it is
+ // possible to call the T dtor and it may happen to use hazard
+ // pointers.
+ folly::hazptr::hazptr_holder hptr;
+ Segment* s = hptr.get_protected(head_);
+ dequeueCommon(s, item);
+ }
+ }
+ /** dequeueCommon */
+ FOLLY_ALWAYS_INLINE void dequeueCommon(Segment* s, T& item) noexcept {
+ Ticket t = fetchIncrementConsumerTicket();
+ if (!SingleConsumer) {
+ s = findSegment(s, t);
+ }
+ size_t idx = index(t);
+ Entry& e = s->entry(idx);
+ e.takeItem(item);
+ if (responsibleForAdvance(t)) {
+ advanceHead(s);
+ }
+ }
+
+ /** tryDequeueUntil */
template <typename Clock, typename Duration>
- bool singleConsumerTryDequeueUntil(
+ FOLLY_ALWAYS_INLINE bool tryDequeueUntil(
+ T& item,
+ const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
+ if (SingleConsumer) {
+ Segment* s = head();
+ return tryDequeueUntilSC(s, item, deadline);
+ } else {
+ // Using hazptr_holder instead of hazptr_local because it is
+ // possible to call ~T() and it may happen to use hazard pointers.
+ folly::hazptr::hazptr_holder hptr;
+ Segment* s = hptr.get_protected(head_);
+ return ryDequeueUntilMC(s, item, deadline);
+ }
+ }
+
+ /** ryDequeueUntilSC */
+ template <typename Clock, typename Duration>
+ FOLLY_ALWAYS_INLINE bool tryDequeueUntilSC(
Segment* s,
T& item,
- const std::chrono::time_point<Clock, Duration>& deadline) noexcept;
+ const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
+ Ticket t = consumerTicket();
+ DCHECK_GE(t, s->minTicket());
+ DCHECK_LT(t, (s->minTicket() + SegmentSize));
+ size_t idx = index(t);
+ Entry& e = s->entry(idx);
+ if (!e.tryWaitUntil(deadline)) {
+ return false;
+ }
+ setConsumerTicket(t + 1);
+ e.takeItem(item);
+ if (responsibleForAdvance(t)) {
+ advanceHead(s);
+ }
+ return true;
+ }
+ /** tryDequeueUntilMC */
template <typename Clock, typename Duration>
- bool multiConsumerTryDequeueUntil(
+ FOLLY_ALWAYS_INLINE bool ryDequeueUntilMC(
Segment* s,
T& item,
- const std::chrono::time_point<Clock, Duration>& deadline) noexcept;
+ const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
+ while (true) {
+ Ticket t = consumerTicket();
+ if (UNLIKELY(t >= (s->minTicket() + SegmentSize))) {
+ s = tryGetNextSegmentUntil(s, deadline);
+ if (s == nullptr) {
+ return false; // timed out
+ }
+ continue;
+ }
+ size_t idx = index(t);
+ Entry& e = s->entry(idx);
+ if (!e.tryWaitUntil(deadline)) {
+ return false;
+ }
+ if (!consumerTicket_.compare_exchange_weak(
+ t, t + 1, std::memory_order_acq_rel, std::memory_order_acquire)) {
+ continue;
+ }
+ e.takeItem(item);
+ if (responsibleForAdvance(t)) {
+ advanceHead(s);
+ }
+ return true;
+ }
+ }
- Segment* findSegment(Segment* s, const Ticket t) const noexcept;
+ /** findSegment */
+ FOLLY_ALWAYS_INLINE
+ Segment* findSegment(Segment* s, const Ticket t) const noexcept {
+ while (UNLIKELY(t >= (s->minTicket() + SegmentSize))) {
+ auto deadline = std::chrono::steady_clock::time_point::max();
+ s = tryGetNextSegmentUntil(s, deadline);
+ DCHECK(s != nullptr);
+ }
+ return s;
+ }
- void allocNextSegment(Segment* s, const Ticket t);
+ /** tryGetNextSegmentUntil */
+ template <typename Clock, typename Duration>
+ Segment* tryGetNextSegmentUntil(
+ Segment* s,
+ const std::chrono::time_point<Clock, Duration>& deadline) const noexcept {
+ // The following loop will not spin indefinitely (as long as the
+ // number of concurrently waiting consumers does not exceeds
+ // SegmentSize and the OS scheduler does not pause ready threads
+ // indefinitely). Under such conditions, the algorithm guarantees
+ // that the producer reponsible for advancing the tail pointer to
+ // the next segment has already acquired its ticket.
+ while (tail() == s) {
+ if (deadline < Clock::time_point::max() && deadline > Clock::now()) {
+ return nullptr;
+ }
+ asm_volatile_pause();
+ }
+ Segment* next = s->nextSegment();
+ DCHECK(next != nullptr);
+ return next;
+ }
- void advanceTail(Segment* s) noexcept;
+ /** allocNextSegment */
+ void allocNextSegment(Segment* s, const Ticket t) {
+ Segment* next = new Segment(t);
+ if (!SPSC) {
+ next->acquire_ref_safe(); // hazptr
+ }
+ DCHECK(s->nextSegment() == nullptr);
+ s->setNextSegment(next);
+ }
- void advanceHead(Segment* s) noexcept;
+ /** advanceTail */
+ void advanceTail(Segment* s) noexcept {
+ Segment* next = s->nextSegment();
+ if (!SingleProducer) {
+ // The following loop will not spin indefinitely (as long as the
+ // OS scheduler does not pause ready threads indefinitely). The
+ // algorithm guarantees that the producer reponsible for setting
+ // the next pointer has already acquired its ticket.
+ while (next == nullptr) {
+ asm_volatile_pause();
+ next = s->nextSegment();
+ }
+ }
+ DCHECK(next != nullptr);
+ setTail(next);
+ }
+
+ /** advanceHead */
+ void advanceHead(Segment* s) noexcept {
+ auto deadline = std::chrono::steady_clock::time_point::max();
+ Segment* next = tryGetNextSegmentUntil(s, deadline);
+ DCHECK(next != nullptr);
+ setHead(next);
+ reclaimSegment(s);
+ }
+
+ /** reclaimSegment */
+ void reclaimSegment(Segment* s) noexcept {
+ if (SPSC) {
+ delete s;
+ } else {
+ s->retire(); // hazptr
+ }
+ }
FOLLY_ALWAYS_INLINE size_t index(Ticket t) const noexcept {
return (t * Stride) & (SegmentSize - 1);
FOLLY_ALWAYS_INLINE Ticket fetchIncrementConsumerTicket() noexcept {
if (SingleConsumer) {
- auto oldval = consumerTicket();
+ Ticket oldval = consumerTicket();
setConsumerTicket(oldval + 1);
return oldval;
} else { // MC
FOLLY_ALWAYS_INLINE Ticket fetchIncrementProducerTicket() noexcept {
if (SingleProducer) {
- auto oldval = producerTicket();
+ Ticket oldval = producerTicket();
setProducerTicket(oldval + 1);
return oldval;
} else { // MP
return producerTicket_.fetch_add(1, std::memory_order_acq_rel);
}
}
+
+ /**
+ * Entry
+ */
+ class Entry {
+ folly::SaturatingSemaphore<MayBlock, Atom> flag_;
+ typename std::aligned_storage<sizeof(T), alignof(T)>::type item_;
+
+ public:
+ template <typename Arg>
+ FOLLY_ALWAYS_INLINE void putItem(Arg&& arg) {
+ new (&item_) T(std::forward<Arg>(arg));
+ flag_.post();
+ }
+
+ FOLLY_ALWAYS_INLINE void takeItem(T& item) noexcept {
+ flag_.wait();
+ getItem(item);
+ }
+
+ template <typename Clock, typename Duration>
+ FOLLY_ALWAYS_INLINE bool tryWaitUntil(
+ const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
+ return flag_.try_wait_until(deadline);
+ }
+
+ private:
+ FOLLY_ALWAYS_INLINE void getItem(T& item) noexcept {
+ item = std::move(*(itemPtr()));
+ destroyItem();
+ }
+
+ FOLLY_ALWAYS_INLINE T* itemPtr() noexcept {
+ return static_cast<T*>(static_cast<void*>(&item_));
+ }
+
+ FOLLY_ALWAYS_INLINE void destroyItem() noexcept {
+ itemPtr()->~T();
+ }
+ }; // Entry
+
+ /**
+ * Segment
+ */
+ class Segment : public folly::hazptr::hazptr_obj_base_refcounted<Segment> {
+ Atom<Segment*> next_;
+ const Ticket min_;
+ bool marked_; // used for iterative deletion
+ FOLLY_ALIGNED(Align)
+ Entry b_[SegmentSize];
+
+ public:
+ explicit Segment(const Ticket t)
+ : next_(nullptr), min_(t), marked_(false) {}
+
+ ~Segment() {
+ if (!SPSC && !marked_) {
+ Segment* next = nextSegment();
+ while (next) {
+ if (!next->release_ref()) { // hazptr
+ return;
+ }
+ Segment* s = next;
+ next = s->nextSegment();
+ s->marked_ = true;
+ delete s;
+ }
+ }
+ }
+
+ Segment* nextSegment() const noexcept {
+ return next_.load(std::memory_order_acquire);
+ }
+
+ void setNextSegment(Segment* s) noexcept {
+ next_.store(s, std::memory_order_release);
+ }
+
+ FOLLY_ALWAYS_INLINE Ticket minTicket() const noexcept {
+ DCHECK_EQ((min_ & (SegmentSize - 1)), 0);
+ return min_;
+ }
+
+ FOLLY_ALWAYS_INLINE Entry& entry(size_t index) noexcept {
+ return b_[index];
+ }
+ }; // Segment
+
}; // UnboundedQueue
/* Aliases */
typename T,
bool MayBlock,
size_t LgSegmentSize = 8,
+ size_t LgAlign = 7,
template <typename> class Atom = std::atomic>
-using USPSCQueue = UnboundedQueue<T, true, true, MayBlock, LgSegmentSize, Atom>;
+using USPSCQueue =
+ UnboundedQueue<T, true, true, MayBlock, LgSegmentSize, LgAlign, Atom>;
template <
typename T,
bool MayBlock,
size_t LgSegmentSize = 8,
+ size_t LgAlign = 7,
template <typename> class Atom = std::atomic>
using UMPSCQueue =
- UnboundedQueue<T, false, true, MayBlock, LgSegmentSize, Atom>;
+ UnboundedQueue<T, false, true, MayBlock, LgSegmentSize, LgAlign, Atom>;
template <
typename T,
bool MayBlock,
size_t LgSegmentSize = 8,
+ size_t LgAlign = 7,
template <typename> class Atom = std::atomic>
using USPMCQueue =
- UnboundedQueue<T, true, false, MayBlock, LgSegmentSize, Atom>;
+ UnboundedQueue<T, true, false, MayBlock, LgSegmentSize, LgAlign, Atom>;
template <
typename T,
bool MayBlock,
size_t LgSegmentSize = 8,
+ size_t LgAlign = 7,
template <typename> class Atom = std::atomic>
using UMPMCQueue =
- UnboundedQueue<T, false, false, MayBlock, LgSegmentSize, Atom>;
+ UnboundedQueue<T, false, false, MayBlock, LgSegmentSize, LgAlign, Atom>;
} // namespace folly
-
-#include <folly/concurrency/UnboundedQueue-inl.h>
/*
- * Copyright 2017 Facebook, Inc.
+ * Copyright 2017-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
DEFINE_bool(bench, false, "run benchmark");
DEFINE_int32(reps, 10, "number of reps");
DEFINE_int32(ops, 1000000, "number of operations per rep");
+DEFINE_int64(capacity, 256 * 1024, "capacity");
template <typename T, bool MayBlock>
using USPSC = folly::USPSCQueue<T, MayBlock>;
template <template <typename, bool> class Q, bool MayBlock>
void timeout_test() {
Q<int, MayBlock> q;
- int v = -1;
+ int v;
ASSERT_FALSE(q.try_dequeue_until(
v, std::chrono::steady_clock::now() + std::chrono::microseconds(1)));
ASSERT_FALSE(q.try_dequeue_for(v, std::chrono::microseconds(1)));
/* keep trying */;
}
} else if ((i % 3) == 1) {
- std::chrono::steady_clock::time_point deadline =
- std::chrono::steady_clock::now() + std::chrono::milliseconds(1);
- while (!q.try_dequeue_until(v, deadline)) {
+ auto duration = std::chrono::milliseconds(1);
+ while (!q.try_dequeue_for(v, duration)) {
/* keep trying */;
}
} else {
}
};
auto cons = [&](int tid) {
- std::chrono::steady_clock::time_point deadline =
- std::chrono::steady_clock::now() + std::chrono::hours(24);
uint64_t mysum = 0;
for (int i = tid; i < ops; i += ncons) {
T v;
/* keep trying */;
}
} else if (Op == 1 || Op == 4) {
- while (UNLIKELY(!q.try_dequeue_until(v, deadline))) {
+ auto duration = std::chrono::microseconds(1000);
+ while (UNLIKELY(!q.try_dequeue_for(v, duration))) {
/* keep trying */;
}
} else {
}
/* For performance comparison */
-template <typename T, size_t capacity>
+template <typename T>
class MPMC {
folly::MPMCQueue<T> q_;
public:
- MPMC() : q_(capacity) {}
+ MPMC() : q_(FLAGS_capacity) {}
template <typename... Args>
void enqueue(Args&&... args) {
return q_.read(item);
}
- template <typename Clock, typename Duration>
- bool try_dequeue_until(
+ template <typename Rep, typename Period>
+ bool try_dequeue_for(
T& item,
- const std::chrono::time_point<Clock, Duration>& deadline) {
+ const std::chrono::duration<Rep, Period>& duration) noexcept {
+ auto deadline = std::chrono::steady_clock::now() + duration;
return q_.tryReadUntil(deadline, item);
}
};
template <typename T, bool ignore>
-using FMPMC = MPMC<T, 256 * 1024>;
+using FMPMC = MPMC<T>;
-template <typename T, size_t capacity>
+template <typename T>
class PCQ {
folly::ProducerConsumerQueue<T> q_;
public:
- PCQ() : q_(capacity) {}
+ PCQ() : q_(FLAGS_capacity) {}
template <typename... Args>
void enqueue(Args&&... args) {
return q_.read(item);
}
- template <typename Clock, typename Duration>
- bool try_dequeue_until(T&, const std::chrono::time_point<Clock, Duration>&) {
+ template <typename Rep, typename Period>
+ bool try_dequeue_for(T&, const std::chrono::duration<Rep, Period>&) noexcept {
return false;
}
};
template <typename T, bool ignore>
-using FPCQ = PCQ<T, 256 * 1024>;
+using FPCQ = PCQ<T>;
template <size_t M>
struct IntArray {
dottedLine();
std::cout << "$ numactl -N 1 $dir/unbounded_queue_test --bench\n";
dottedLine();
- std::cout << "Using a capacity of 256K for folly::ProducerConsumerQueue\n"
- << "and folly::MPMCQueue\n";
+ std::cout << "Using capacity " << FLAGS_capacity
+ << " for folly::ProducerConsumerQueue and\n"
+ << "folly::MPMCQueue\n";
std::cout << "=============================================================="
<< std::endl;
std::cout << "Test name Max time Avg time Min time"
..............................................................
$ numactl -N 1 $dir/unbounded_queue_test --bench
..............................................................
-Using a capacity of 256K for folly::ProducerConsumerQueue
-and folly::MPMCQueue
+Using capacity 262144 for folly::ProducerConsumerQueue and
+folly::MPMCQueue
==============================================================
Test name Max time Avg time Min time
====================== 1 prod 1 cons ======================
Unbounded SPMC wait spin only 36 ns 36 ns 35 ns
Unbounded SPMC try may block 202 ns 195 ns 190 ns
Unbounded SPMC timed may block 208 ns 197 ns 190 ns
-Unbounded SPMC wait may block 1645 ns 1427 ns 36 ns
+Unbounded SPMC wait may block 96 ns 77 ns 64 ns
..............................................................
Unbounded MPMC try spin only 204 ns 198 ns 194 ns
Unbounded MPMC timed spin only 202 ns 195 ns 190 ns
Unbounded MPMC wait spin only 61 ns 59 ns 57 ns
Unbounded MPMC try may block 206 ns 196 ns 191 ns
Unbounded MPMC timed may block 204 ns 198 ns 192 ns
-Unbounded MPMC wait may block 1658 ns 1293 ns 70 ns
+Unbounded MPMC wait may block 100 ns 88 ns 84 ns
..............................................................
folly::MPMC read 210 ns 191 ns 182 ns
folly::MPMC tryReadUntil 574 ns 248 ns 192 ns
Unbounded SPMC wait spin only 175 ns 51 ns 33 ns
Unbounded SPMC try may block 215 ns 203 ns 186 ns
Unbounded SPMC timed may block 453 ns 334 ns 204 ns
-Unbounded SPMC wait may block 1601 ns 1514 ns 1373 ns
+Unbounded SPMC wait may block 110 ns 87 ns 55 ns
..............................................................
Unbounded MPMC try spin only 328 ns 218 ns 197 ns
Unbounded MPMC timed spin only 217 ns 206 ns 200 ns
Unbounded MPMC wait spin only 147 ns 85 ns 58 ns
Unbounded MPMC try may block 310 ns 223 ns 199 ns
Unbounded MPMC timed may block 461 ns 275 ns 196 ns
-Unbounded MPMC wait may block 1623 ns 1526 ns 888 ns
+Unbounded MPMC wait may block 148 ns 111 ns 78 ns
..............................................................
folly::MPMC read 280 ns 215 ns 194 ns
folly::MPMC tryReadUntil 28740 ns 13508 ns 212 ns