/// Back-off strategy
typedef cds::backoff::Default back_off;
+
+ /// Single-consumer version
+ /**
+ For the single-consumer version of the algorithm, the additional member
+ functions \p front() and \p pop_front() are available.
+
+ Default is \p false
+ */
+ static CDS_CONSTEXPR bool const single_consumer = false;
};
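For illustration, the single-consumer mode can be enabled by overriding this trait in a user-defined
traits struct (a sketch; the names my_mpsc_traits and int_queue are illustrative):

struct my_mpsc_traits: public cds::container::vyukov_queue::traits
{
    // enables front() and pop_front(); only one thread may dequeue
    static CDS_CONSTEXPR bool const single_consumer = true;
};
typedef cds::container::VyukovMPMCCycleQueue< int, my_mpsc_traits > int_queue;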
/// Metafunction converting option list to \p vyukov_queue::traits
No dynamic memory allocation/management during operation. Producers and consumers are separated from each other (as in the two-lock queue),
i.e. do not touch the same data while the queue is not empty.
+ There is a multiple producer/single consumer version, \p cds::container::VyukovMPSCCycleQueue,
+ that supports the \p front() and \p pop_front() functions.
+
Source:
- http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p vyukov_queue::traits::value_cleaner
typedef typename traits::back_off back_off; ///< back-off strategy
+ /// \p true for the single-consumer version, \p false otherwise
+ static CDS_CONSTEXPR bool const c_single_consumer = traits::single_consumer;
+
/// Rebind template arguments
template <typename T2, typename Traits2>
struct rebind {
return dequeue_with( f );
}
+ /// Returns a pointer to the top element of the queue, or \p nullptr if the queue is empty (only for the single-consumer version)
+ template <bool SC = c_single_consumer >
+ typename std::enable_if<SC, value_type *>::type front()
+ {
+ static_assert( c_single_consumer, "front() is enabled only if traits::single_consumer is true");
+
+ cell_type * cell;
+ back_off bkoff;
+
+ size_t pos = m_posDequeue.load( memory_model::memory_order_relaxed );
+ for ( ;; ) {
+ cell = &m_buffer[pos & m_nBufferMask];
+ size_t seq = cell->sequence.load( memory_model::memory_order_acquire );
+ intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
+
+ if ( dif == 0 )
+ return &cell->data;
+ else if ( dif < 0 ) {
+ // Queue empty?
+ if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
+ return nullptr; // queue empty
+ bkoff();
+ pos = m_posDequeue.load( memory_model::memory_order_relaxed );
+ }
+ else
+ pos = m_posDequeue.load( memory_model::memory_order_relaxed );
+ }
+ }
+
+ /// Pops the top element; returns \p true if the queue was not empty, \p false otherwise (only for the single-consumer version)
+ template <bool SC = c_single_consumer >
+ typename std::enable_if<SC, bool>::type pop_front()
+ {
+ return dequeue_with( []( value_type& ) {} );
+ }
+
/// Checks if the queue is empty
bool empty() const
{
return m_buffer.capacity();
}
};
+
+ //@cond
+ namespace vyukov_queue {
+ template <typename Traits>
+ struct single_consumer_traits : public Traits
+ {
+ static CDS_CONSTEXPR bool const single_consumer = true;
+ };
+ } // namespace vyukov_queue
+ //@endcond
+
+ /// Vyukov's queue multiple producer - single consumer version
+ template <typename T, typename Traits = vyukov_queue::traits >
+ using VyukovMPSCCycleQueue = VyukovMPMCCycleQueue< T, vyukov_queue::single_consumer_traits<Traits> >;
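A minimal usage sketch of the single-consumer interface (illustrative only; process() is a
hypothetical handler and 1024 is an arbitrary power-of-two capacity):

cds::container::VyukovMPSCCycleQueue< int > q( 1024 );

q.push( 42 );                   // any producer thread may enqueue

// the single consumer thread:
if ( int * p = q.front() ) {    // peek at the oldest element without removing it
    process( *p );              // hypothetical consumer-side processing
    q.pop_front();              // remove the element only after it has been handled
}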
+
}} // namespace cds::container
#endif // #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
assert( m_p );
m_funcFree( m_p );
- CDS_STRICT_DO( m_p = nullptr );
- CDS_STRICT_DO( m_funcFree = nullptr );
+ CDS_STRICT_DO( clear() );
+ }
+
+ /// Checks if the retired pointer is not empty
+ explicit operator bool() const CDS_NOEXCEPT
+ {
+ return m_p != nullptr;
+ }
+
+ /// Clears the retired pointer without calling \p free()
+ void clear()
+ {
+ m_p = nullptr;
+ m_funcFree = nullptr;
}
};
template <typename Func>
std::pair<iterator, bool> update_at_locked( position& pos, value_type& val, Func func, bool bInsert )
{
- // RCU lock should be locked!!!
+ // RCU should be locked!!!
assert( gc::is_locked() );
while ( true ) {
*/
struct epoch_retired_ptr: public retired_ptr
{
- uint64_t m_nEpoch ; ///< The epoch when the object has been retired
+ uint64_t m_nEpoch; ///< The epoch when the object has been retired
//@cond
epoch_retired_ptr()
i.e. until the RCU quiescent state comes. After that the buffer and all retired objects are freed.
This synchronization cycle may be called in any thread that calls the \p retire_ptr function.
- The \p Buffer contains items of \ref cds_urcu_retired_ptr "retired_ptr" type and it should support a queue interface with
+ The \p Buffer contains items of \ref cds_urcu_retired_ptr "epoch_retired_ptr" type and it should support a queue interface with
three functions:
- <tt> bool push( retired_ptr& p ) </tt> - places the retired pointer \p p into queue. If the function
returns \p false it means that the buffer is full and RCU synchronization cycle must be processed.
this function must return \p false
- <tt>size_t size()</tt> - returns queue's item count.
- The buffer is considered as full if \p push returns \p false or the buffer size reaches the RCU threshold.
+ The buffer is considered full if \p push() returns \p false or the buffer size reaches the RCU threshold.
There is a wrapper \ref cds_urcu_general_buffered_gc "gc<general_buffered>" for \p %general_buffered class
that provides the unified RCU interface. You should use this wrapper class instead of \p %general_buffered.
Template arguments:
- - \p Buffer - buffer type. Default is cds::container::VyukovMPMCCycleQueue
+ - \p Buffer - buffer type. Default is \p cds::container::VyukovMPMCCycleQueue
- \p Lock - mutex type, default is \p std::mutex
- \p Backoff - back-off schema, default is cds::backoff::Default
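
For example, a typical instantiation via the unified wrapper may look like this (an illustrative sketch):
\code
#include <cds/urcu/general_buffered.h>

// unified RCU interface over general_buffered with the default buffer type
typedef cds::urcu::gc< cds::urcu::general_buffered<> > rcu_type;

// the same, with the buffer type spelled out explicitly
typedef cds::urcu::gc<
    cds::urcu::general_buffered<
        cds::container::VyukovMPMCCycleQueue< cds::urcu::epoch_retired_ptr >
    >
> rcu_explicit_buffer;
\endcode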
*/
protected:
//@cond
- buffer_type m_Buffer;
- atomics::atomic<uint64_t> m_nCurEpoch;
- lock_type m_Lock;
- size_t const m_nCapacity;
+ buffer_type m_Buffer;
+ atomics::atomic<uint64_t> m_nCurEpoch;
+ lock_type m_Lock;
+ size_t const m_nCapacity;
//@endcond
public:
This implementation is similar to \ref general_buffered but a separate thread is created
for deleting the retired objects. Like \p %general_buffered, the class contains an internal buffer
where retired objects are accumulated. When the buffer becomes full,
- the RCU \p synchronize function is called that waits until all reader/updater threads end up their read-side critical sections,
- i.e. until the RCU quiescent state will come. After that the "work ready" message is sent to reclamation tread.
+ the RCU \p synchronize() function is called; it waits until all reader/updater threads have left their read-side critical sections,
+ i.e. until the RCU quiescent state comes. After that the "work ready" message is sent to the reclamation thread.
The reclamation thread frees the buffer.
- This synchronization cycle may be called in any thread that calls \ref retire_ptr function.
+ This synchronization cycle may be called in any thread that calls the \p retire_ptr() function.
There is a wrapper \ref cds_urcu_general_threaded_gc "gc<general_threaded>" for \p %general_threaded class
that provides the unified RCU interface. You should use this wrapper class instead of \p %general_threaded.
+ The \p Buffer contains items of \ref cds_urcu_retired_ptr "epoch_retired_ptr" type
+ and it should be a multiple producer/single consumer queue with the following interface:
+ - <tt> bool push( epoch_retired_ptr& p ) </tt> - places the retired pointer \p p into queue. If the function
+ returns \p false it means that the buffer is full and RCU synchronization cycle must be processed.
+ - <tt>epoch_retired_ptr * front() </tt> - returns a pointer to the top element or \p nullptr if the buffer is empty.
+ - <tt>bool pop_front() </tt> - pops the top element; returns \p false if the buffer is empty.
+ - <tt>size_t size()</tt> - returns queue's item count.
+
+ The buffer is considered full if \p push() returns \p false or the buffer size reaches the RCU threshold.
+
Template arguments:
- - \p Buffer - buffer type with FIFO semantics. Default is \p cds::container::VyukovMPMCCycleQueue. See \ref general_buffered
- for description of buffer's interface. The buffer contains the objects of \ref epoch_retired_ptr
+ - \p Buffer - MPSC (multiple producer/single consumer) buffer type with FIFO semantics.
+ Default is \p cds::container::VyukovMPSCCycleQueue. The buffer contains the objects of \ref epoch_retired_ptr
type that contains an additional \p m_nEpoch field. This field specifies the epoch when the object
has been placed into the buffer. The \p %general_threaded object has a global epoch counter
- that is incremented on each \p synchronize call. The epoch is used internally to prevent early deletion.
+ that is incremented on each \p synchronize() call. The epoch is used internally to prevent early deletion.
- \p Lock - mutex type, default is \p std::mutex
- \p DisposerThread - the reclamation thread class. Default is \ref cds::urcu::dispose_thread,
see the description of this class for required interface.
- \p Backoff - back-off schema, default is cds::backoff::Default
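
For example, a typical instantiation via the unified wrapper may look like this (an illustrative sketch):
\code
#include <cds/urcu/general_threaded.h>

// unified RCU interface over general_threaded with the default MPSC buffer
// and the default disposing thread
typedef cds::urcu::gc< cds::urcu::general_threaded<> > rcu_type;
\endcode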
*/
template <
- class Buffer = cds::container::VyukovMPMCCycleQueue< epoch_retired_ptr >
+ class Buffer = cds::container::VyukovMPSCCycleQueue< epoch_retired_ptr >
,class Lock = std::mutex
,class DisposerThread = dispose_thread<Buffer>
,class Backoff = cds::backoff::Default
bool bPushed = m_Buffer.push( p );
if ( !bPushed || m_Buffer.size() >= capacity() ) {
synchronize();
- if ( !bPushed ) {
+ if ( !bPushed )
p.free();
- }
return true;
}
return false;
}
}
-
/// Waits to finish a grace period and calls disposing thread
void synchronize()
{
void synchronize( bool bSync )
{
uint64_t nPrevEpoch = m_nCurEpoch.fetch_add( 1, atomics::memory_order_release );
-
{
std::unique_lock<lock_type> sl( m_Lock );
flip_and_wait();
flip_and_wait();
}
-
m_DisposerThread.dispose( m_Buffer, nPrevEpoch, bSync );
}
void force_dispose()
that provides the unified RCU interface. You should use this wrapper class instead of \p %signal_threaded.
Template arguments:
- - \p Buffer - buffer type with FIFO semantics. Default is cds::container::VyukovMPMCCycleQueue. See \ref signal_buffered
+ - \p Buffer - buffer type with FIFO semantics. Default is \p cds::container::VyukovMPSCCycleQueue. See \ref signal_buffered
for description of buffer's interface. The buffer contains the objects of \ref epoch_retired_ptr
type that contains an additional \p m_nEpoch field. This field specifies the epoch when the object
has been placed into the buffer. The \p %signal_threaded object has a global epoch counter
- that is incremented on each \p synchronize call. The epoch is used internally to prevent early deletion.
+ that is incremented on each \p synchronize() call. The epoch is used internally to prevent early deletion.
- \p Lock - mutex type, default is \p std::mutex
- \p DisposerThread - the reclamation thread class. Default is \ref cds::urcu::dispose_thread,
see the description of this class for required interface.
- \p Backoff - back-off schema, default is cds::backoff::Default
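
For example, a typical instantiation via the unified wrapper may look like this
(an illustrative sketch; the signal-based flavor is available only on platforms
where the library supports signal-handled RCU):
\code
#include <cds/urcu/signal_threaded.h>

typedef cds::urcu::gc< cds::urcu::signal_threaded<> > rcu_type;
\endcode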
*/
template <
- class Buffer = cds::container::VyukovMPMCCycleQueue< epoch_retired_ptr >
+ class Buffer = cds::container::VyukovMPSCCycleQueue< epoch_retired_ptr >
,class Lock = std::mutex
,class DisposerThread = dispose_thread<Buffer>
,class Backoff = cds::backoff::Default
#include <mutex>
#include <condition_variable>
#include <cds/details/aligned_type.h>
+#include <cds/algo/atomic.h>
namespace cds { namespace urcu {
typedef Buffer buffer_type ; ///< Buffer type
private:
//@cond
- typedef std::thread thread_type;
+ typedef std::thread thread_type;
typedef std::mutex mutex_type;
typedef std::condition_variable condvar_type;
typedef std::unique_lock< mutex_type > unique_lock;
condvar_type m_cvDataReady;
// Task for thread (dispose cycle)
- buffer_type * volatile m_pBuffer;
- uint64_t volatile m_nCurEpoch;
+ atomics::atomic<buffer_type *> m_pBuffer;
+ uint64_t volatile m_nCurEpoch;
// Quit flag
- bool volatile m_bQuit;
+ atomics::atomic<bool> m_bQuit;
// disposing pass sync
- condvar_type m_cvReady;
- bool volatile m_bReady;
+ condvar_type m_cvReady;
+ atomics::atomic<bool> m_bReady;
//@endcond
private: // methods called from disposing thread
uint64_t nCurEpoch;
bool bQuit = false;
- epoch_retired_ptr rest;
-
while ( !bQuit ) {
+
+ // signal that we are ready to dispose
{
unique_lock lock( m_Mutex );
+ m_bReady.store( true, atomics::memory_order_relaxed );
+ }
+ m_cvReady.notify_one();
- // signal that we are ready to dispose
- m_bReady = true;
- m_cvReady.notify_one();
-
+ {
// wait new data portion
- while ( !m_pBuffer )
+ unique_lock lock( m_Mutex );
+
+ while ( (pBuffer = m_pBuffer.load( atomics::memory_order_relaxed )) == nullptr )
m_cvDataReady.wait( lock );
// New work is ready
- m_bReady = false ; // we are busy
+ m_bReady.store( false, atomics::memory_order_relaxed ); // we are busy
- bQuit = m_bQuit;
+ bQuit = m_bQuit.load( atomics::memory_order_relaxed );
nCurEpoch = m_nCurEpoch;
- pBuffer = m_pBuffer;
- m_pBuffer = nullptr;
- }
-
- if ( rest.m_p ) {
- assert( rest.m_nEpoch <= nCurEpoch );
- rest.free();
+ m_pBuffer.store( nullptr, atomics::memory_order_relaxed );
}
if ( pBuffer )
- rest = dispose_buffer( pBuffer, nCurEpoch );
+ dispose_buffer( pBuffer, nCurEpoch );
}
}
- epoch_retired_ptr dispose_buffer( buffer_type * pBuf, uint64_t nCurEpoch )
+ void dispose_buffer( buffer_type * pBuf, uint64_t nCurEpoch )
{
- epoch_retired_ptr p;
- while ( pBuf->pop( p )) {
- if ( p.m_nEpoch <= nCurEpoch ) {
- p.free();
+ epoch_retired_ptr * p;
+ while ( ( p = pBuf->front()) != nullptr ) {
+ if ( p->m_nEpoch <= nCurEpoch ) {
+ p->free();
+ CDS_VERIFY( pBuf->pop_front() );
}
- else {
- if ( !pBuf->push( p ))
- return p;
+ else
break;
- }
}
- return epoch_retired_ptr();
}
//@endcond
unique_lock lock( m_Mutex );
// wait while retiring pass done
- while ( !m_bReady )
+ while ( !m_bReady.load( atomics::memory_order_relaxed ))
m_cvReady.wait( lock );
// give a new work and set stop flag
- m_pBuffer = &buf;
m_nCurEpoch = nCurEpoch;
- m_bQuit = true;
+ m_pBuffer.store( &buf, atomics::memory_order_relaxed );
+ m_bQuit.store( true, atomics::memory_order_relaxed );
}
m_cvDataReady.notify_one();
*/
void dispose( buffer_type& buf, uint64_t nCurEpoch, bool bSync )
{
- unique_lock lock( m_Mutex );
-
- // wait while disposing pass done
- while ( !m_bReady )
- m_cvReady.wait( lock );
-
- if ( bSync )
- m_bReady = false;
+ {
+ unique_lock lock( m_Mutex );
- // new work
- m_nCurEpoch = nCurEpoch;
- m_pBuffer = &buf;
+ // wait while disposing pass done
+ while ( !m_bReady.load( atomics::memory_order_relaxed ))
+ m_cvReady.wait( lock );
+ // new work
+ m_bReady.store( false, atomics::memory_order_relaxed );
+ m_nCurEpoch = nCurEpoch;
+ m_pBuffer.store( &buf, atomics::memory_order_relaxed );
+ }
m_cvDataReady.notify_one();
if ( bSync ) {
- while ( !m_bReady )
+ unique_lock lock( m_Mutex );
+ while ( !m_bReady.load( atomics::memory_order_relaxed ))
m_cvReady.wait( lock );
}
}
This is a wrapper around \p general_threaded class.
Template arguments:
- - \p Buffer - lock-free queue or lock-free bounded queue.
- Default is \p cds::container::VyukovMPMCCycleQueue< retired_ptr >
+ - \p Buffer - lock-free MPSC (multiple producer/single consumer) queue.
+ Default is \p cds::container::VyukovMPSCCycleQueue< epoch_retired_ptr >
- \p Lock - mutex type, default is \p std::mutex
- \p DisposerThread - reclamation thread class, default is \p cds::urcu::dispose_thread
See \ref cds::urcu::dispose_thread for class interface.
*/
template <
#ifdef CDS_DOXGEN_INVOKED
- class Buffer = cds::container::VyukovMPMCCycleQueue< epoch_retired_ptr >
+ class Buffer = cds::container::VyukovMPSCCycleQueue< epoch_retired_ptr >
,class Lock = std::mutex
,class DisposerThread = dispose_thread<Buffer>
,class Backoff = cds::backoff::Default
This is a wrapper around \p signal_threaded class.
Template arguments:
- - \p Buffer - lock-free queue or lock-free bounded queue.
- Default is \p cds::container::VyukovMPMCCycleQueue< retired_ptr >
+ - \p Buffer - lock-free MPSC (multiple producer/single consumer) queue.
+ Default is \p cds::container::VyukovMPSCCycleQueue< epoch_retired_ptr >
- \p Lock - mutex type, default is \p std::mutex
- \p DisposerThread - reclamation thread class, default is \p cds::urcu::dispose_thread
See \ref cds::urcu::dispose_thread for class interface.
*/
template <
#ifdef CDS_DOXGEN_INVOKED
- class Buffer = cds::container::VyukovMPMCCycleQueue< epoch_retired_ptr >
+ class Buffer = cds::container::VyukovMPSCCycleQueue< epoch_retired_ptr >
,class Lock = std::mutex
,class DisposerThread = dispose_thread<Buffer>
,class Backoff = cds::backoff::Default