From: Maged Michael Date: Fri, 8 Dec 2017 17:25:57 +0000 (-0800) Subject: UnboundedQueue: Add LgAlign template parameter - Refactor code X-Git-Tag: v2017.12.11.00~7 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=b782340cfac92c0793644599046036678f54bb7e;p=folly.git UnboundedQueue: Add LgAlign template parameter - Refactor code Summary: - Add a template parameter, LgAlign, to control memory usage. The parameter is used in DynamicBoundedQueue. - Refactor code. Reviewed By: yfeldblum Differential Revision: D6508015 fbshipit-source-id: 6e17b1d8fd900595147dc4217e04d379a13fbdf8 --- diff --git a/folly/concurrency/UnboundedQueue-inl.h b/folly/concurrency/UnboundedQueue-inl.h deleted file mode 100644 index 101f6683..00000000 --- a/folly/concurrency/UnboundedQueue-inl.h +++ /dev/null @@ -1,568 +0,0 @@ -/* - * Copyright 2017 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include - -#include - -namespace folly { - -/* constructor */ - -template < - typename T, - bool SingleProducer, - bool SingleConsumer, - bool MayBlock, - size_t LgSegmentSize, - template class Atom> -inline UnboundedQueue< - T, - SingleProducer, - SingleConsumer, - MayBlock, - LgSegmentSize, - Atom>::UnboundedQueue() { - setProducerTicket(0); - setConsumerTicket(0); - auto s = new Segment(0); - DEBUG_PRINT(s); - setTail(s); - setHead(s); -} - -/* destructor */ - -template < - typename T, - bool SingleProducer, - bool SingleConsumer, - bool MayBlock, - size_t LgSegmentSize, - template class Atom> -inline UnboundedQueue< - T, - SingleProducer, - SingleConsumer, - MayBlock, - LgSegmentSize, - Atom>::~UnboundedQueue() { - Segment* next; - for (auto s = head(); s; s = next) { - next = s->nextSegment(); - if (SPSC) { - delete s; - } else { - s->retire(); // hazptr - } - } -} - -/* dequeue */ - -template < - typename T, - bool SingleProducer, - bool SingleConsumer, - bool MayBlock, - size_t LgSegmentSize, - template class Atom> -FOLLY_ALWAYS_INLINE void UnboundedQueue< - T, - SingleProducer, - SingleConsumer, - MayBlock, - LgSegmentSize, - Atom>::dequeue(T& item) noexcept { - if (SPSC) { - auto s = head(); - dequeueCommon(s, item); - } else { - // Using hazptr_holder instead of hazptr_local because it is - // possible to call ~T() and it may happen to use hazard pointers. - folly::hazptr::hazptr_holder hptr; - auto s = hptr.get_protected(head_); - dequeueCommon(s, item); - } -} - -/* try_dequeue_until */ - -template < - typename T, - bool SingleProducer, - bool SingleConsumer, - bool MayBlock, - size_t LgSegmentSize, - template class Atom> -template -FOLLY_ALWAYS_INLINE bool UnboundedQueue< - T, - SingleProducer, - SingleConsumer, - MayBlock, - LgSegmentSize, - Atom>:: - try_dequeue_until( - T& item, - const std::chrono::time_point& deadline) noexcept { - if (SingleConsumer) { - auto s = head(); - return singleConsumerTryDequeueUntil(s, item, deadline); - } else { - // Using hazptr_holder instead of hazptr_local because it is - // possible to call ~T() and it may happen to use hazard pointers. - folly::hazptr::hazptr_holder hptr; - auto s = hptr.get_protected(head_); - return multiConsumerTryDequeueUntil(s, item, deadline); - } -} - -/* enqueueImpl */ - -template < - typename T, - bool SingleProducer, - bool SingleConsumer, - bool MayBlock, - size_t LgSegmentSize, - template class Atom> -template -FOLLY_ALWAYS_INLINE void UnboundedQueue< - T, - SingleProducer, - SingleConsumer, - MayBlock, - LgSegmentSize, - Atom>::enqueueImpl(Arg&& arg) { - if (SPSC) { - auto s = tail(); - enqueueCommon(s, std::forward(arg)); - } else { - // Using hazptr_holder instead of hazptr_local because it is - // possible that the T construcctor happens to use hazard - // pointers. - folly::hazptr::hazptr_holder hptr; - auto s = hptr.get_protected(tail_); - enqueueCommon(s, std::forward(arg)); - } -} - -/* enqueueCommon */ - -template < - typename T, - bool SingleProducer, - bool SingleConsumer, - bool MayBlock, - size_t LgSegmentSize, - template class Atom> -template -FOLLY_ALWAYS_INLINE void UnboundedQueue< - T, - SingleProducer, - SingleConsumer, - MayBlock, - LgSegmentSize, - Atom>::enqueueCommon(Segment* s, Arg&& arg) { - auto t = fetchIncrementProducerTicket(); - if (!SingleProducer) { - s = findSegment(s, t); - } - DCHECK_GE(t, s->minTicket()); - DCHECK_LT(t, (s->minTicket() + SegmentSize)); - size_t idx = index(t); - Entry& e = s->entry(idx); - e.putItem(std::forward(arg)); - if (responsibleForAlloc(t)) { - allocNextSegment(s, t + SegmentSize); - } - if (responsibleForAdvance(t)) { - advanceTail(s); - } -} - -/* dequeueCommon */ - -template < - typename T, - bool SingleProducer, - bool SingleConsumer, - bool MayBlock, - size_t LgSegmentSize, - template class Atom> -FOLLY_ALWAYS_INLINE void UnboundedQueue< - T, - SingleProducer, - SingleConsumer, - MayBlock, - LgSegmentSize, - Atom>::dequeueCommon(Segment* s, T& item) noexcept { - auto t = fetchIncrementConsumerTicket(); - if (!SingleConsumer) { - s = findSegment(s, t); - } - size_t idx = index(t); - Entry& e = s->entry(idx); - e.takeItem(item); - if (responsibleForAdvance(t)) { - advanceHead(s); - } -} - -/* singleConsumerTryDequeueUntil */ - -template < - typename T, - bool SingleProducer, - bool SingleConsumer, - bool MayBlock, - size_t LgSegmentSize, - template class Atom> -template -FOLLY_ALWAYS_INLINE bool UnboundedQueue< - T, - SingleProducer, - SingleConsumer, - MayBlock, - LgSegmentSize, - Atom>:: - singleConsumerTryDequeueUntil( - Segment* s, - T& item, - const std::chrono::time_point& deadline) noexcept { - auto t = consumerTicket(); - DCHECK_GE(t, s->minTicket()); - DCHECK_LT(t, (s->minTicket() + SegmentSize)); - size_t idx = index(t); - Entry& e = s->entry(idx); - if (!e.tryWaitUntil(deadline)) { - return false; - } - setConsumerTicket(t + 1); - e.takeItem(item); - if (responsibleForAdvance(t)) { - advanceHead(s); - } - return true; -} - -/* multiConsumerTryDequeueUntil */ - -template < - typename T, - bool SingleProducer, - bool SingleConsumer, - bool MayBlock, - size_t LgSegmentSize, - template class Atom> -template -FOLLY_ALWAYS_INLINE bool UnboundedQueue< - T, - SingleProducer, - SingleConsumer, - MayBlock, - LgSegmentSize, - Atom>:: - multiConsumerTryDequeueUntil( - Segment* s, - T& item, - const std::chrono::time_point& deadline) noexcept { - while (true) { - auto t = consumerTicket(); - if (UNLIKELY(t >= (s->minTicket() + SegmentSize))) { - Segment* next; - // Note that the following loop will not spin indefinitely (as - // long as the number of concurrently waiting consumers is not - // greater than SegmentSize). The algorithm guarantees in such a - // case that the producer reponsible for setting the next - // pointer is already running. - while ((next = s->nextSegment()) == nullptr) { - if (Clock::now() > deadline) { - return false; - } - asm_volatile_pause(); - } - s = next; - DCHECK(s != nullptr); - continue; - } - size_t idx = index(t); - Entry& e = s->entry(idx); - if (!e.tryWaitUntil(deadline)) { - return false; - } - if (!consumerTicket_.compare_exchange_weak( - t, t + 1, std::memory_order_acq_rel, std::memory_order_acquire)) { - continue; - } - e.takeItem(item); - if (responsibleForAdvance(t)) { - advanceHead(s); - } - return true; - } -} - -/* findSegment */ - -template < - typename T, - bool SingleProducer, - bool SingleConsumer, - bool MayBlock, - size_t LgSegmentSize, - template class Atom> -inline typename UnboundedQueue< - T, - SingleProducer, - SingleConsumer, - MayBlock, - LgSegmentSize, - Atom>::Segment* -UnboundedQueue< - T, - SingleProducer, - SingleConsumer, - MayBlock, - LgSegmentSize, - Atom>::findSegment(Segment* s, const Ticket t) const noexcept { - while (t >= (s->minTicket() + SegmentSize)) { - Segment* next = s->nextSegment(); - // Note that the following loop will not spin indefinitely. The - // algorithm guarantees that the producer reponsible for setting - // the next pointer is already running. - while (next == nullptr) { - asm_volatile_pause(); - next = s->nextSegment(); - } - DCHECK(next != nullptr); - s = next; - } - return s; -} - -/* allocNextSegment */ - -template < - typename T, - bool SingleProducer, - bool SingleConsumer, - bool MayBlock, - size_t LgSegmentSize, - template class Atom> -inline void UnboundedQueue< - T, - SingleProducer, - SingleConsumer, - MayBlock, - LgSegmentSize, - Atom>::allocNextSegment(Segment* s, const Ticket t) { - auto next = new Segment(t); - if (!SPSC) { - next->acquire_ref_safe(); // hazptr - } - DEBUG_PRINT(s << " " << next); - DCHECK(s->nextSegment() == nullptr); - s->setNextSegment(next); -} - -/* advanceTail */ - -template < - typename T, - bool SingleProducer, - bool SingleConsumer, - bool MayBlock, - size_t LgSegmentSize, - template class Atom> -inline void UnboundedQueue< - T, - SingleProducer, - SingleConsumer, - MayBlock, - LgSegmentSize, - Atom>::advanceTail(Segment* s) noexcept { - Segment* next = s->nextSegment(); - if (!SingleProducer) { - // Note that the following loop will not spin indefinitely. The - // algorithm guarantees that the producer reponsible for setting - // the next pointer is already running. - while (next == nullptr) { - asm_volatile_pause(); - next = s->nextSegment(); - } - } - DCHECK(next != nullptr); - DEBUG_PRINT(s << " " << next); - setTail(next); -} - -/* advanceHead */ - -template < - typename T, - bool SingleProducer, - bool SingleConsumer, - bool MayBlock, - size_t LgSegmentSize, - template class Atom> -inline void UnboundedQueue< - T, - SingleProducer, - SingleConsumer, - MayBlock, - LgSegmentSize, - Atom>::advanceHead(Segment* s) noexcept { - // Note that the following loops will not spin indefinitely. The - // algorithm guarantees that the producers reponsible for advancing - // the tail pointer and setting the next pointer are already - // running. - while (tail() == s) { - asm_volatile_pause(); - } - auto next = s->nextSegment(); - while (next == nullptr) { - next = s->nextSegment(); - } - DEBUG_PRINT(s << " " << next); - setHead(next); - if (SPSC) { - delete s; - } else { - s->retire(); // hazptr - } -} - -/** - * Entry - */ - -template < - typename T, - bool SingleProducer, - bool SingleConsumer, - bool MayBlock, - size_t LgSegmentSize, - template class Atom> -class UnboundedQueue< - T, - SingleProducer, - SingleConsumer, - MayBlock, - LgSegmentSize, - Atom>::Entry { - folly::SaturatingSemaphore flag_; - typename std::aligned_storage::type item_; - - public: - template - FOLLY_ALWAYS_INLINE void putItem(Arg&& arg) { - new (&item_) T(std::forward(arg)); - flag_.post(); - } - - FOLLY_ALWAYS_INLINE void takeItem(T& item) noexcept { - flag_.wait(); - getItem(item); - } - - template - FOLLY_ALWAYS_INLINE bool tryWaitUntil( - const std::chrono::time_point& deadline) noexcept { - return flag_.try_wait_until(deadline); - } - - private: - FOLLY_ALWAYS_INLINE void getItem(T& item) noexcept { - item = std::move(*(folly::launder(itemPtr()))); - destroyItem(); - } - - FOLLY_ALWAYS_INLINE T* itemPtr() noexcept { - return static_cast(static_cast(&item_)); - } - - FOLLY_ALWAYS_INLINE void destroyItem() noexcept { - itemPtr()->~T(); - } -}; // UnboundedQueue::Entry - -/** - * Segment - */ - -template < - typename T, - bool SingleProducer, - bool SingleConsumer, - bool MayBlock, - size_t LgSegmentSize, - template class Atom> -class UnboundedQueue< - T, - SingleProducer, - SingleConsumer, - MayBlock, - LgSegmentSize, - Atom>::Segment : public folly::hazptr::hazptr_obj_base_refcounted { - Atom next_; - const Ticket min_; - bool marked_; // used for iterative deletion - FOLLY_ALIGN_TO_AVOID_FALSE_SHARING - Entry b_[SegmentSize]; - - public: - explicit Segment(const Ticket t) : next_(nullptr), min_(t), marked_(false) {} - - ~Segment() { - if (!SPSC && !marked_) { - auto next = nextSegment(); - while (next) { - if (!next->release_ref()) { // hazptr - return; - } - auto s = next; - next = s->nextSegment(); - s->marked_ = true; - delete s; - } - } - } - - Segment* nextSegment() const noexcept { - return next_.load(std::memory_order_acquire); - } - - void setNextSegment(Segment* s) noexcept { - next_.store(s, std::memory_order_release); - } - - FOLLY_ALWAYS_INLINE Ticket minTicket() const noexcept { - DCHECK_EQ((min_ & (SegmentSize - 1)), 0); - return min_; - } - - FOLLY_ALWAYS_INLINE Entry& entry(size_t index) noexcept { - return b_[index]; - } -}; // UnboundedQueue::Segment - -} // namespace folly diff --git a/folly/concurrency/UnboundedQueue.h b/folly/concurrency/UnboundedQueue.h index 1ff19ed2..7320b6cd 100644 --- a/folly/concurrency/UnboundedQueue.h +++ b/folly/concurrency/UnboundedQueue.h @@ -1,5 +1,5 @@ /* - * Copyright 2017 Facebook, Inc. + * Copyright 2017-present Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,7 +20,11 @@ #include #include +#include + #include +#include +#include namespace folly { @@ -42,6 +46,9 @@ namespace folly { /// spins. A performance tuning parameter. /// - LgSegmentSize (default 8): Log base 2 of number of elements per /// segment. A performance tuning parameter. See below. +/// - LgAlign (default 7): Log base 2 of alignment directive; can be +/// used to balance scalability (avoidance of false sharing) with +/// memory efficiency. /// /// When to use UnboundedQueue: /// - If a small bound may lead to deadlock or performance degradation @@ -56,10 +63,10 @@ namespace folly { /// SPSC) ProducerConsumerQueue. /// /// Template Aliases: -/// USPSCQueue -/// UMPSCQueue -/// USPMCQueue -/// UMPMCQueue +/// USPSCQueue +/// UMPSCQueue +/// USPMCQueue +/// UMPMCQueue /// /// Functions: /// Producer operations never wait or fail (unless OOM) @@ -145,6 +152,8 @@ namespace folly { /// producers or consumers, have references to them or their /// predessors. That is, a lagging thread may delay the reclamation /// of a chain of removed segments. +/// - The template parameter LgAlign can be used to reduce memory usage +/// at the cost of increased chance of false sharing. /// /// Performance considerations: /// - All operations take constant time, excluding the costs of @@ -188,6 +197,7 @@ template < bool SingleConsumer, bool MayBlock, size_t LgSegmentSize = 8, + size_t LgAlign = 7, template class Atom = std::atomic> class UnboundedQueue { using Ticket = uint64_t; @@ -195,25 +205,42 @@ class UnboundedQueue { class Segment; static constexpr bool SPSC = SingleProducer && SingleConsumer; - static constexpr size_t SegmentSize = 1 << LgSegmentSize; static constexpr size_t Stride = SPSC || (LgSegmentSize <= 1) ? 1 : 27; + static constexpr size_t SegmentSize = 1u << LgSegmentSize; + static constexpr size_t Align = 1u << LgAlign; static_assert( std::is_nothrow_destructible::value, "T must be nothrow_destructible"); static_assert((Stride & 1) == 1, "Stride must be odd"); static_assert(LgSegmentSize < 32, "LgSegmentSize must be < 32"); + static_assert(LgAlign < 16, "LgAlign must be < 16"); - FOLLY_ALIGN_TO_AVOID_FALSE_SHARING + FOLLY_ALIGNED(Align) Atom head_; Atom consumerTicket_; - FOLLY_ALIGN_TO_AVOID_FALSE_SHARING + FOLLY_ALIGNED(Align) Atom tail_; Atom producerTicket_; public: - UnboundedQueue(); - ~UnboundedQueue(); + /** constructor */ + UnboundedQueue() { + setProducerTicket(0); + setConsumerTicket(0); + Segment* s = new Segment(0); + setTail(s); + setHead(s); + } + + /** destructor */ + ~UnboundedQueue() { + Segment* next; + for (Segment* s = head(); s; s = next) { + next = s->nextSegment(); + reclaimSegment(s); + } + } /** enqueue */ FOLLY_ALWAYS_INLINE void enqueue(const T& arg) { @@ -225,26 +252,32 @@ class UnboundedQueue { } /** dequeue */ - void dequeue(T& item) noexcept; + FOLLY_ALWAYS_INLINE void dequeue(T& item) noexcept { + dequeueImpl(item); + } /** try_dequeue */ - bool try_dequeue(T& item) noexcept { - return try_dequeue_until( - item, std::chrono::steady_clock::time_point::min()); + FOLLY_ALWAYS_INLINE bool try_dequeue(T& item) noexcept { + return tryDequeueUntil(item, std::chrono::steady_clock::time_point::min()); } /** try_dequeue_until */ template - bool try_dequeue_until( + FOLLY_ALWAYS_INLINE bool try_dequeue_until( T& item, - const std::chrono::time_point& deadline) noexcept; + const std::chrono::time_point& deadline) noexcept { + return tryDequeueUntil(item, deadline); + } /** try_dequeue_for */ template - bool try_dequeue_for( + FOLLY_ALWAYS_INLINE bool try_dequeue_for( T& item, const std::chrono::duration& duration) noexcept { - return try_dequeue_until(item, std::chrono::steady_clock::now() + duration); + if (LIKELY(try_dequeue(item))) { + return true; + } + return tryDequeueUntil(item, std::chrono::steady_clock::now() + duration); } /** size */ @@ -262,33 +295,218 @@ class UnboundedQueue { } private: + /** enqueueImpl */ template - void enqueueImpl(Arg&& arg); + FOLLY_ALWAYS_INLINE void enqueueImpl(Arg&& arg) { + if (SPSC) { + Segment* s = tail(); + enqueueCommon(s, std::forward(arg)); + } else { + // Using hazptr_holder instead of hazptr_local because it is + // possible that the T ctor happens to use hazard pointers. + folly::hazptr::hazptr_holder hptr; + Segment* s = hptr.get_protected(tail_); + enqueueCommon(s, std::forward(arg)); + } + } + /** enqueueCommon */ template - void enqueueCommon(Segment* s, Arg&& arg); + FOLLY_ALWAYS_INLINE void enqueueCommon(Segment* s, Arg&& arg) { + Ticket t = fetchIncrementProducerTicket(); + if (!SingleProducer) { + s = findSegment(s, t); + } + DCHECK_GE(t, s->minTicket()); + DCHECK_LT(t, s->minTicket() + SegmentSize); + size_t idx = index(t); + Entry& e = s->entry(idx); + e.putItem(std::forward(arg)); + if (responsibleForAlloc(t)) { + allocNextSegment(s, t + SegmentSize); + } + if (responsibleForAdvance(t)) { + advanceTail(s); + } + } - void dequeueCommon(Segment* s, T& item) noexcept; + /** dequeueImpl */ + FOLLY_ALWAYS_INLINE void dequeueImpl(T& item) noexcept { + if (SPSC) { + Segment* s = head(); + dequeueCommon(s, item); + } else { + // Using hazptr_holder instead of hazptr_local because it is + // possible to call the T dtor and it may happen to use hazard + // pointers. + folly::hazptr::hazptr_holder hptr; + Segment* s = hptr.get_protected(head_); + dequeueCommon(s, item); + } + } + /** dequeueCommon */ + FOLLY_ALWAYS_INLINE void dequeueCommon(Segment* s, T& item) noexcept { + Ticket t = fetchIncrementConsumerTicket(); + if (!SingleConsumer) { + s = findSegment(s, t); + } + size_t idx = index(t); + Entry& e = s->entry(idx); + e.takeItem(item); + if (responsibleForAdvance(t)) { + advanceHead(s); + } + } + + /** tryDequeueUntil */ template - bool singleConsumerTryDequeueUntil( + FOLLY_ALWAYS_INLINE bool tryDequeueUntil( + T& item, + const std::chrono::time_point& deadline) noexcept { + if (SingleConsumer) { + Segment* s = head(); + return tryDequeueUntilSC(s, item, deadline); + } else { + // Using hazptr_holder instead of hazptr_local because it is + // possible to call ~T() and it may happen to use hazard pointers. + folly::hazptr::hazptr_holder hptr; + Segment* s = hptr.get_protected(head_); + return ryDequeueUntilMC(s, item, deadline); + } + } + + /** ryDequeueUntilSC */ + template + FOLLY_ALWAYS_INLINE bool tryDequeueUntilSC( Segment* s, T& item, - const std::chrono::time_point& deadline) noexcept; + const std::chrono::time_point& deadline) noexcept { + Ticket t = consumerTicket(); + DCHECK_GE(t, s->minTicket()); + DCHECK_LT(t, (s->minTicket() + SegmentSize)); + size_t idx = index(t); + Entry& e = s->entry(idx); + if (!e.tryWaitUntil(deadline)) { + return false; + } + setConsumerTicket(t + 1); + e.takeItem(item); + if (responsibleForAdvance(t)) { + advanceHead(s); + } + return true; + } + /** tryDequeueUntilMC */ template - bool multiConsumerTryDequeueUntil( + FOLLY_ALWAYS_INLINE bool ryDequeueUntilMC( Segment* s, T& item, - const std::chrono::time_point& deadline) noexcept; + const std::chrono::time_point& deadline) noexcept { + while (true) { + Ticket t = consumerTicket(); + if (UNLIKELY(t >= (s->minTicket() + SegmentSize))) { + s = tryGetNextSegmentUntil(s, deadline); + if (s == nullptr) { + return false; // timed out + } + continue; + } + size_t idx = index(t); + Entry& e = s->entry(idx); + if (!e.tryWaitUntil(deadline)) { + return false; + } + if (!consumerTicket_.compare_exchange_weak( + t, t + 1, std::memory_order_acq_rel, std::memory_order_acquire)) { + continue; + } + e.takeItem(item); + if (responsibleForAdvance(t)) { + advanceHead(s); + } + return true; + } + } - Segment* findSegment(Segment* s, const Ticket t) const noexcept; + /** findSegment */ + FOLLY_ALWAYS_INLINE + Segment* findSegment(Segment* s, const Ticket t) const noexcept { + while (UNLIKELY(t >= (s->minTicket() + SegmentSize))) { + auto deadline = std::chrono::steady_clock::time_point::max(); + s = tryGetNextSegmentUntil(s, deadline); + DCHECK(s != nullptr); + } + return s; + } - void allocNextSegment(Segment* s, const Ticket t); + /** tryGetNextSegmentUntil */ + template + Segment* tryGetNextSegmentUntil( + Segment* s, + const std::chrono::time_point& deadline) const noexcept { + // The following loop will not spin indefinitely (as long as the + // number of concurrently waiting consumers does not exceeds + // SegmentSize and the OS scheduler does not pause ready threads + // indefinitely). Under such conditions, the algorithm guarantees + // that the producer reponsible for advancing the tail pointer to + // the next segment has already acquired its ticket. + while (tail() == s) { + if (deadline < Clock::time_point::max() && deadline > Clock::now()) { + return nullptr; + } + asm_volatile_pause(); + } + Segment* next = s->nextSegment(); + DCHECK(next != nullptr); + return next; + } - void advanceTail(Segment* s) noexcept; + /** allocNextSegment */ + void allocNextSegment(Segment* s, const Ticket t) { + Segment* next = new Segment(t); + if (!SPSC) { + next->acquire_ref_safe(); // hazptr + } + DCHECK(s->nextSegment() == nullptr); + s->setNextSegment(next); + } - void advanceHead(Segment* s) noexcept; + /** advanceTail */ + void advanceTail(Segment* s) noexcept { + Segment* next = s->nextSegment(); + if (!SingleProducer) { + // The following loop will not spin indefinitely (as long as the + // OS scheduler does not pause ready threads indefinitely). The + // algorithm guarantees that the producer reponsible for setting + // the next pointer has already acquired its ticket. + while (next == nullptr) { + asm_volatile_pause(); + next = s->nextSegment(); + } + } + DCHECK(next != nullptr); + setTail(next); + } + + /** advanceHead */ + void advanceHead(Segment* s) noexcept { + auto deadline = std::chrono::steady_clock::time_point::max(); + Segment* next = tryGetNextSegmentUntil(s, deadline); + DCHECK(next != nullptr); + setHead(next); + reclaimSegment(s); + } + + /** reclaimSegment */ + void reclaimSegment(Segment* s) noexcept { + if (SPSC) { + delete s; + } else { + s->retire(); // hazptr + } + } FOLLY_ALWAYS_INLINE size_t index(Ticket t) const noexcept { return (t * Stride) & (SegmentSize - 1); @@ -336,7 +554,7 @@ class UnboundedQueue { FOLLY_ALWAYS_INLINE Ticket fetchIncrementConsumerTicket() noexcept { if (SingleConsumer) { - auto oldval = consumerTicket(); + Ticket oldval = consumerTicket(); setConsumerTicket(oldval + 1); return oldval; } else { // MC @@ -346,13 +564,101 @@ class UnboundedQueue { FOLLY_ALWAYS_INLINE Ticket fetchIncrementProducerTicket() noexcept { if (SingleProducer) { - auto oldval = producerTicket(); + Ticket oldval = producerTicket(); setProducerTicket(oldval + 1); return oldval; } else { // MP return producerTicket_.fetch_add(1, std::memory_order_acq_rel); } } + + /** + * Entry + */ + class Entry { + folly::SaturatingSemaphore flag_; + typename std::aligned_storage::type item_; + + public: + template + FOLLY_ALWAYS_INLINE void putItem(Arg&& arg) { + new (&item_) T(std::forward(arg)); + flag_.post(); + } + + FOLLY_ALWAYS_INLINE void takeItem(T& item) noexcept { + flag_.wait(); + getItem(item); + } + + template + FOLLY_ALWAYS_INLINE bool tryWaitUntil( + const std::chrono::time_point& deadline) noexcept { + return flag_.try_wait_until(deadline); + } + + private: + FOLLY_ALWAYS_INLINE void getItem(T& item) noexcept { + item = std::move(*(itemPtr())); + destroyItem(); + } + + FOLLY_ALWAYS_INLINE T* itemPtr() noexcept { + return static_cast(static_cast(&item_)); + } + + FOLLY_ALWAYS_INLINE void destroyItem() noexcept { + itemPtr()->~T(); + } + }; // Entry + + /** + * Segment + */ + class Segment : public folly::hazptr::hazptr_obj_base_refcounted { + Atom next_; + const Ticket min_; + bool marked_; // used for iterative deletion + FOLLY_ALIGNED(Align) + Entry b_[SegmentSize]; + + public: + explicit Segment(const Ticket t) + : next_(nullptr), min_(t), marked_(false) {} + + ~Segment() { + if (!SPSC && !marked_) { + Segment* next = nextSegment(); + while (next) { + if (!next->release_ref()) { // hazptr + return; + } + Segment* s = next; + next = s->nextSegment(); + s->marked_ = true; + delete s; + } + } + } + + Segment* nextSegment() const noexcept { + return next_.load(std::memory_order_acquire); + } + + void setNextSegment(Segment* s) noexcept { + next_.store(s, std::memory_order_release); + } + + FOLLY_ALWAYS_INLINE Ticket minTicket() const noexcept { + DCHECK_EQ((min_ & (SegmentSize - 1)), 0); + return min_; + } + + FOLLY_ALWAYS_INLINE Entry& entry(size_t index) noexcept { + return b_[index]; + } + }; // Segment + }; // UnboundedQueue /* Aliases */ @@ -361,33 +667,36 @@ template < typename T, bool MayBlock, size_t LgSegmentSize = 8, + size_t LgAlign = 7, template class Atom = std::atomic> -using USPSCQueue = UnboundedQueue; +using USPSCQueue = + UnboundedQueue; template < typename T, bool MayBlock, size_t LgSegmentSize = 8, + size_t LgAlign = 7, template class Atom = std::atomic> using UMPSCQueue = - UnboundedQueue; + UnboundedQueue; template < typename T, bool MayBlock, size_t LgSegmentSize = 8, + size_t LgAlign = 7, template class Atom = std::atomic> using USPMCQueue = - UnboundedQueue; + UnboundedQueue; template < typename T, bool MayBlock, size_t LgSegmentSize = 8, + size_t LgAlign = 7, template class Atom = std::atomic> using UMPMCQueue = - UnboundedQueue; + UnboundedQueue; } // namespace folly - -#include diff --git a/folly/concurrency/test/UnboundedQueueTest.cpp b/folly/concurrency/test/UnboundedQueueTest.cpp index 4eccea11..0139b156 100644 --- a/folly/concurrency/test/UnboundedQueueTest.cpp +++ b/folly/concurrency/test/UnboundedQueueTest.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2017 Facebook, Inc. + * Copyright 2017-present Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,6 +27,7 @@ DEFINE_bool(bench, false, "run benchmark"); DEFINE_int32(reps, 10, "number of reps"); DEFINE_int32(ops, 1000000, "number of operations per rep"); +DEFINE_int64(capacity, 256 * 1024, "capacity"); template using USPSC = folly::USPSCQueue; @@ -81,7 +82,7 @@ TEST(UnboundedQueue, basic) { template