/// Back-off strategy
typedef cds::backoff::Default back_off;
+ /// Single-consumer version
+ /**
+ For single-consumer version of algorithm some additional functions
+ (\p front(), \p pop_front()) is available.
+ Default is \p false
+ */
+ static CDS_CONSTEXPR bool const single_consumer = false;
/// Metafunction converting option list to \p vyukov_queue::traits
No dynamic memory allocation/management during operation. Producers and consumers are separated from each other (as in the two-lock queue),
i.e. do not touch the same data while queue is not empty.
+ There is multiple producer/single consumer version \p cds::container::VyukovMPSCCycleQueue
+ that supports \p front() and \p pop_front() functions.
- http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p vyukov_queue::traits::value_cleaner
typedef typename traits::back_off back_off; ///< back-off strategy
+ /// \p true for single-consumer version, \p false otherwise
+ static CDS_CONSTEXPR bool const c_single_consumer = traits::single_consumer;
/// Rebind template arguments
template <typename T2, typename Traits2>
struct rebind {
return dequeue_with( f );
+ /// Returns a pointer to top element of the queue or \p nullptr if queue is empty (only for single-consumer version)
+ template <bool SC = c_single_consumer >
+ typename std::enable_if<SC, value_type *>::type front()
+ {
+ static_assert( c_single_consumer, "front() is enabled only if traits::single_consumer is true");
+ cell_type * cell;
+ back_off bkoff;
+ size_t pos = m_posDequeue.load( memory_model::memory_order_relaxed );
+ for ( ;;)
+ {
+ cell = &m_buffer[pos & m_nBufferMask];
+ size_t seq = cell->sequence.load( memory_model::memory_order_acquire );
+ intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
+ if ( dif == 0 )
+ return &cell->data;
+ else if ( dif < 0 ) {
+ // Queue empty?
+ if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
+ return nullptr; // queue empty
+ bkoff();
+ pos = m_posDequeue.load( memory_model::memory_order_relaxed );
+ }
+ else
+ pos = m_posDequeue.load( memory_model::memory_order_relaxed );
+ }
+ }
+ /// Pops top element; returns \p true if queue is not empty, \p false otherwise (only for single-consumer version)
+ template <bool SC = c_single_consumer >
+ typename std::enable_if<SC, bool>::type pop_front()
+ {
+ return dequeue_with( []( value_type& ) {} );
+ }
/// Checks if the queue is empty
bool empty() const
return m_buffer.capacity();
+ //@cond
+ namespace vyukov_queue {
+ template <typename Traits>
+ struct single_consumer_traits : public Traits
+ {
+ static CDS_CONSTEXPR bool const single_consumer = true;
+ };
+ } // namespace vyukov_queue
+ //@endcond
+ /// Vyukov's queue multiple producer - single consumer version
+ template <typename T, typename Traits = vyukov_queue::traits >
+ using VyukovMPSCCycleQueue = VyukovMPMCCycleQueue< T, vyukov_queue::single_consumer_traits<Traits> >;
}} // namespace cds::container
assert( m_p );
m_funcFree( m_p );
- CDS_STRICT_DO( m_p = nullptr );
- CDS_STRICT_DO( m_funcFree = nullptr );
+ CDS_STRICT_DO( clear() );
+ }
+ /// Checks if the retired pointer is not empty
+ explicit operator bool() const CDS_NOEXCEPT
+ {
+ return m_p != nullptr;
+ }
+ /// Clears retired pointer without \p free() call
+ void clear()
+ {
+ m_p = nullptr;
+ m_funcFree = nullptr;
template <typename Func>
std::pair<iterator, bool> update_at_locked( position& pos, value_type& val, Func func, bool bInsert )
- // RCU lock should be locked!!!
+ // RCU should be locked!!!
assert( gc::is_locked() );
while ( true ) {
struct epoch_retired_ptr: public retired_ptr
- uint64_t m_nEpoch ; ///< The epoch when the object has been retired
+ uint64_t m_nEpoch; ///< The epoch when the object has been retired
i.e. until the RCU quiescent state will come. After that the buffer and all retired objects are freed.
This synchronization cycle may be called in any thread that calls \p retire_ptr function.
- The \p Buffer contains items of \ref cds_urcu_retired_ptr "retired_ptr" type and it should support a queue interface with
+ The \p Buffer contains items of \ref cds_urcu_retired_ptr "epoch_retired_ptr" type and it should support a queue interface with
three function:
- <tt> bool push( retired_ptr& p ) </tt> - places the retired pointer \p p into queue. If the function
returns \p false it means that the buffer is full and RCU synchronization cycle must be processed.
this function must return \p false
- <tt>size_t size()</tt> - returns queue's item count.
- The buffer is considered as full if \p push returns \p false or the buffer size reaches the RCU threshold.
+ The buffer is considered as full if \p push() returns \p false or the buffer size reaches the RCU threshold.
There is a wrapper \ref cds_urcu_general_buffered_gc "gc<general_buffered>" for \p %general_buffered class
that provides unified RCU interface. You should use this wrapper class instead \p %general_buffered
Template arguments:
- - \p Buffer - buffer type. Default is cds::container::VyukovMPMCCycleQueue
+ - \p Buffer - buffer type. Default is \p cds::container::VyukovMPMCCycleQueue
- \p Lock - mutex type, default is \p std::mutex
- \p Backoff - back-off schema, default is cds::backoff::Default
- buffer_type m_Buffer;
- atomics::atomic<uint64_t> m_nCurEpoch;
- lock_type m_Lock;
- size_t const m_nCapacity;
+ buffer_type m_Buffer;
+ atomics::atomic<uint64_t> m_nCurEpoch;
+ lock_type m_Lock;
+ size_t const m_nCapacity;
This implementation is similar to \ref general_buffered but separate thread is created
for deleting the retired objects. Like \p %general_buffered, the class contains an internal buffer
where retired objects are accumulated. When the buffer becomes full,
- the RCU \p synchronize function is called that waits until all reader/updater threads end up their read-side critical sections,
- i.e. until the RCU quiescent state will come. After that the "work ready" message is sent to reclamation tread.
+ the RCU \p synchronize() function is called that waits until all reader/updater threads end up their read-side critical sections,
+ i.e. until the RCU quiescent state will come. After that the "work ready" message is sent to reclamation thread.
The reclamation thread frees the buffer.
- This synchronization cycle may be called in any thread that calls \ref retire_ptr function.
+ This synchronization cycle may be called in any thread that calls \p retire_ptr() function.
There is a wrapper \ref cds_urcu_general_threaded_gc "gc<general_threaded>" for \p %general_threaded class
that provides unified RCU interface. You should use this wrapper class instead \p %general_threaded
+ The \p Buffer contains items of \ref cds_urcu_retired_ptr "epoch_retired_ptr" type
+ and it should support a multiple producer/single consumer queue with the following interface:
+ - <tt> bool push( epoch_retired_ptr& p ) </tt> - places the retired pointer \p p into queue. If the function
+ returns \p false it means that the buffer is full and RCU synchronization cycle must be processed.
+ - <tt>epoch_retired_ptr * front() </tt> - returns a pointer to the top element or \p nullptr if the buffer is empty.
+ - <tt>bool pop_front() </tt> - pops the top element; returns \p false if the buffer is empty.
+ - <tt>size_t size()</tt> - returns queue's item count.
+ The buffer is considered as full if \p push() returns \p false or the buffer size reaches the RCU threshold.
Template arguments:
- - \p Buffer - buffer type with FIFO semantics. Default is \p cds::container::VyukovMPMCCycleQueue. See \ref general_buffered
- for description of buffer's interface. The buffer contains the objects of \ref epoch_retired_ptr
+ - \p Buffer - MPSC (muliple producer/single consumer) buffer type with FIFO semantics.
+ Default is \p cds::container::VyukovMPSCCycleQueue. The buffer contains the objects of \ref epoch_retired_ptr
type that contains additional \p m_nEpoch field. This field specifies an epoch when the object
has been placed into the buffer. The \p %general_threaded object has a global epoch counter
- that is incremented on each \p synchronize call. The epoch is used internally to prevent early deletion.
+ that is incremented on each \p synchronize() call. The epoch is used internally to prevent early deletion.
- \p Lock - mutex type, default is \p std::mutex
- \p DisposerThread - the reclamation thread class. Default is \ref cds::urcu::dispose_thread,
see the description of this class for required interface.
- \p Backoff - back-off schema, default is cds::backoff::Default
template <
- class Buffer = cds::container::VyukovMPMCCycleQueue< epoch_retired_ptr >
+ class Buffer = cds::container::VyukovMPSCCycleQueue< epoch_retired_ptr >
,class Lock = std::mutex
,class DisposerThread = dispose_thread<Buffer>
,class Backoff = cds::backoff::Default
bool bPushed = m_Buffer.push( p );
if ( !bPushed || m_Buffer.size() >= capacity() ) {
- if ( !bPushed ) {
+ if ( !bPushed )
- }
return true;
return false;
/// Waits to finish a grace period and calls disposing thread
void synchronize()
void synchronize( bool bSync )
uint64_t nPrevEpoch = m_nCurEpoch.fetch_add( 1, atomics::memory_order_release );
std::unique_lock<lock_type> sl( m_Lock );
m_DisposerThread.dispose( m_Buffer, nPrevEpoch, bSync );
void force_dispose()
that provides unified RCU interface. You should use this wrapper class instead \p %signal_threaded
Template arguments:
- - \p Buffer - buffer type with FIFO semantics. Default is cds::container::VyukovMPMCCycleQueue. See \ref signal_buffered
+ - \p Buffer - buffer type with FIFO semantics. Default is \p cds::container::VyukovMPSCCycleQueue. See \ref signal_buffered
for description of buffer's interface. The buffer contains the objects of \ref epoch_retired_ptr
type that contains additional \p m_nEpoch field. This field specifies an epoch when the object
has been placed into the buffer. The \p %signal_threaded object has a global epoch counter
- that is incremented on each \p synchronize call. The epoch is used internally to prevent early deletion.
+ that is incremented on each \p synchronize() call. The epoch is used internally to prevent early deletion.
- \p Lock - mutex type, default is \p std::mutex
- \p DisposerThread - the reclamation thread class. Default is \ref cds::urcu::dispose_thread,
see the description of this class for required interface.
- \p Backoff - back-off schema, default is cds::backoff::Default
template <
- class Buffer = cds::container::VyukovMPMCCycleQueue< epoch_retired_ptr >
+ class Buffer = cds::container::VyukovMPSCCycleQueue< epoch_retired_ptr >
,class Lock = std::mutex
,class DisposerThread = dispose_thread<Buffer>
,class Backoff = cds::backoff::Default
#include <mutex>
#include <condition_variable>
#include <cds/details/aligned_type.h>
+#include <cds/algo/atomic.h>
namespace cds { namespace urcu {
typedef Buffer buffer_type ; ///< Buffer type
- typedef std::thread thread_type;
+ typedef std::thread thread_type;
typedef std::mutex mutex_type;
typedef std::condition_variable condvar_type;
typedef std::unique_lock< mutex_type > unique_lock;
condvar_type m_cvDataReady;
// Task for thread (dispose cycle)
- buffer_type * volatile m_pBuffer;
- uint64_t volatile m_nCurEpoch;
+ atomics::atomic<buffer_type *> m_pBuffer;
+ uint64_t volatile m_nCurEpoch;
// Quit flag
- bool volatile m_bQuit;
+ atomics::atomic<bool> m_bQuit;
// disposing pass sync
- condvar_type m_cvReady;
- bool volatile m_bReady;
+ condvar_type m_cvReady;
+ atomics::atomic<bool> m_bReady;
private: // methods called from disposing thread
uint64_t nCurEpoch;
bool bQuit = false;
- epoch_retired_ptr rest;
while ( !bQuit ) {
+ // signal that we are ready to dispose
unique_lock lock( m_Mutex );
+ m_bReady.store( true, atomics::memory_order_relaxed );
+ }
+ m_cvReady.notify_one();
- // signal that we are ready to dispose
- m_bReady = true;
- m_cvReady.notify_one();
+ {
// wait new data portion
- while ( !m_pBuffer )
+ unique_lock lock( m_Mutex );
+ while ( (pBuffer = m_pBuffer.load( atomics::memory_order_relaxed )) == nullptr )
m_cvDataReady.wait( lock );
// New work is ready
- m_bReady = false ; // we are busy
+ m_bReady.store( false, atomics::memory_order_relaxed ); // we are busy
- bQuit = m_bQuit;
+ bQuit = m_bQuit.load( atomics::memory_order_relaxed );
nCurEpoch = m_nCurEpoch;
- pBuffer = m_pBuffer;
- m_pBuffer = nullptr;
- }
- if ( rest.m_p ) {
- assert( rest.m_nEpoch <= nCurEpoch );
- rest.free();
+ m_pBuffer.store( nullptr, atomics::memory_order_relaxed );
if ( pBuffer )
- rest = dispose_buffer( pBuffer, nCurEpoch );
+ dispose_buffer( pBuffer, nCurEpoch );
- epoch_retired_ptr dispose_buffer( buffer_type * pBuf, uint64_t nCurEpoch )
+ void dispose_buffer( buffer_type * pBuf, uint64_t nCurEpoch )
- epoch_retired_ptr p;
- while ( pBuf->pop( p )) {
- if ( p.m_nEpoch <= nCurEpoch ) {
- p.free();
+ epoch_retired_ptr * p;
+ while ( ( p = pBuf->front()) != nullptr ) {
+ if ( p->m_nEpoch <= nCurEpoch ) {
+ p->free();
+ CDS_VERIFY( pBuf->pop_front() );
- else {
- if ( !pBuf->push( p ))
- return p;
+ else
- }
- return epoch_retired_ptr();
unique_lock lock( m_Mutex );
// wait while retiring pass done
- while ( !m_bReady )
+ while ( !m_bReady.load( atomics::memory_order_relaxed ))
m_cvReady.wait( lock );
// give a new work and set stop flag
- m_pBuffer = &buf;
m_nCurEpoch = nCurEpoch;
- m_bQuit = true;
+ m_pBuffer.store( &buf, atomics::memory_order_relaxed );
+ m_bQuit.store( true, atomics::memory_order_relaxed );
void dispose( buffer_type& buf, uint64_t nCurEpoch, bool bSync )
- unique_lock lock( m_Mutex );
- // wait while disposing pass done
- while ( !m_bReady )
- m_cvReady.wait( lock );
- if ( bSync )
- m_bReady = false;
+ {
+ unique_lock lock( m_Mutex );
- // new work
- m_nCurEpoch = nCurEpoch;
- m_pBuffer = &buf;
+ // wait while disposing pass done
+ while ( !m_bReady.load( atomics::memory_order_relaxed ))
+ m_cvReady.wait( lock );
+ // new work
+ m_bReady.store( false, atomics::memory_order_relaxed );
+ m_nCurEpoch = nCurEpoch;
+ m_pBuffer.store( &buf, atomics::memory_order_relaxed );
+ }
if ( bSync ) {
- while ( !m_bReady )
+ unique_lock lock( m_Mutex );
+ while ( !m_bReady.load( atomics::memory_order_relaxed ))
m_cvReady.wait( lock );
This is a wrapper around \p general_threaded class.
Template arguments:
- - \p Buffer - lock-free queue or lock-free bounded queue.
- Default is \p cds::container::VyukovMPMCCycleQueue< retired_ptr >
+ - \p Buffer - lock-free MPSC (muliple producer/single consumer) queue.
+ Default is \p cds::container::VyukovMPSCCycleQueue< retired_ptr >
- \p Lock - mutex type, default is \p std::mutex
- \p DisposerThread - reclamation thread class, default is \p cds::urcu::dispose_thread
See \ref cds::urcu::dispose_thread for class interface.
template <
- class Buffer = cds::container::VyukovMPMCCycleQueue< epoch_retired_ptr >
+ class Buffer = cds::container::VyukovMPSCCycleQueue< epoch_retired_ptr >
,class Lock = std::mutex
,class DisposerThread = dispose_thread<Buffer>
,class Backoff = cds::backoff::Default
This is a wrapper around \p signal_threaded class.
Template arguments:
- - \p Buffer - lock-free queue or lock-free bounded queue.
- Default is \p cds::container::VyukovMPMCCycleQueue< retired_ptr >
+ - \p Buffer - lock-free MPSC (muliple producer/single consumer) queue.
+ Default is \p cds::container::VyukovMPSCCycleQueue< epoch_retired_ptr >
- \p Lock - mutex type, default is \p std::mutex
- \p DisposerThread - reclamation thread class, default is \p %general_threaded_dispose_thread
See \ref cds::urcu::dispose_thread for class interface.
template <
- class Buffer = cds::container::VyukovMPMCCycleQueue< epoch_retired_ptr >
+ class Buffer = cds::container::VyukovMPSCCycleQueue< epoch_retired_ptr >
,class Lock = std::mutex
,class DisposerThread = dispose_thread<Buffer>
,class Backoff = cds::backoff::Default