From 1cf811d992476e3d62cef948ba4c3cb1292016b0 Mon Sep 17 00:00:00 2001 From: khizmax Date: Sun, 20 Dec 2015 11:48:50 +0300 Subject: [PATCH] Rebuilt threaded uRCU logic Added single-consumer option to Vyukov's queue --- cds/container/vyukov_mpmc_cycle_queue.h | 67 ++++++++++++++++++ cds/gc/details/retired_ptr.h | 16 ++++- cds/intrusive/michael_list_rcu.h | 2 +- cds/urcu/details/base.h | 2 +- cds/urcu/details/gpb.h | 14 ++-- cds/urcu/details/gpt.h | 30 +++++---- cds/urcu/details/sig_threaded.h | 6 +- cds/urcu/dispose_thread.h | 90 ++++++++++++------------- cds/urcu/general_threaded.h | 6 +- cds/urcu/signal_threaded.h | 6 +- 10 files changed, 159 insertions(+), 80 deletions(-) diff --git a/cds/container/vyukov_mpmc_cycle_queue.h b/cds/container/vyukov_mpmc_cycle_queue.h index 0238152e..5f57d115 100644 --- a/cds/container/vyukov_mpmc_cycle_queue.h +++ b/cds/container/vyukov_mpmc_cycle_queue.h @@ -52,6 +52,15 @@ namespace cds { namespace container { /// Back-off strategy typedef cds::backoff::Default back_off; + + /// Single-consumer version + /** + For single-consumer version of algorithm some additional functions + (\p front(), \p pop_front()) are available. + + Default is \p false + */ + static CDS_CONSTEXPR bool const single_consumer = false; }; /// Metafunction converting option list to \p vyukov_queue::traits @@ -107,6 +116,9 @@ namespace cds { namespace container { No dynamic memory allocation/management during operation. Producers and consumers are separated from each other (as in the two-lock queue), i.e. do not touch the same data while queue is not empty. + There is a multiple producer/single consumer version \p cds::container::VyukovMPSCCycleQueue + that supports \p front() and \p pop_front() functions. 
+ Source: - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue @@ -142,6 +154,9 @@ namespace cds { namespace container { typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p vyukov_queue::traits::value_cleaner typedef typename traits::back_off back_off; ///< back-off strategy + /// \p true for single-consumer version, \p false otherwise + static CDS_CONSTEXPR bool const c_single_consumer = traits::single_consumer; + /// Rebind template arguments template struct rebind { @@ -358,6 +373,43 @@ namespace cds { namespace container { return dequeue_with( f ); } + /// Returns a pointer to top element of the queue or \p nullptr if queue is empty (only for single-consumer version) + template + typename std::enable_if::type front() + { + static_assert( c_single_consumer, "front() is enabled only if traits::single_consumer is true"); + + cell_type * cell; + back_off bkoff; + + size_t pos = m_posDequeue.load( memory_model::memory_order_relaxed ); + for ( ;;) + { + cell = &m_buffer[pos & m_nBufferMask]; + size_t seq = cell->sequence.load( memory_model::memory_order_acquire ); + intptr_t dif = static_cast(seq) - static_cast(pos + 1); + + if ( dif == 0 ) + return &cell->data; + else if ( dif < 0 ) { + // Queue empty? 
+ if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 ) + return nullptr; // queue empty + bkoff(); + pos = m_posDequeue.load( memory_model::memory_order_relaxed ); + } + else + pos = m_posDequeue.load( memory_model::memory_order_relaxed ); + } + } + + /// Pops top element; returns \p true if queue is not empty, \p false otherwise (only for single-consumer version) + template + typename std::enable_if::type pop_front() + { + return dequeue_with( []( value_type& ) {} ); + } + /// Checks if the queue is empty bool empty() const { @@ -405,6 +457,21 @@ namespace cds { namespace container { return m_buffer.capacity(); } }; + + //@cond + namespace vyukov_queue { + template + struct single_consumer_traits : public Traits + { + static CDS_CONSTEXPR bool const single_consumer = true; + }; + } // namespace vyukov_queue + //@endcond + + /// Vyukov's queue multiple producer - single consumer version + template + using VyukovMPSCCycleQueue = VyukovMPMCCycleQueue< T, vyukov_queue::single_consumer_traits >; + }} // namespace cds::container #endif // #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H diff --git a/cds/gc/details/retired_ptr.h b/cds/gc/details/retired_ptr.h index 5faebf49..84d892be 100644 --- a/cds/gc/details/retired_ptr.h +++ b/cds/gc/details/retired_ptr.h @@ -68,8 +68,20 @@ namespace cds { namespace gc { assert( m_p ); m_funcFree( m_p ); - CDS_STRICT_DO( m_p = nullptr ); - CDS_STRICT_DO( m_funcFree = nullptr ); + CDS_STRICT_DO( clear() ); + } + + /// Checks if the retired pointer is not empty + explicit operator bool() const CDS_NOEXCEPT + { + return m_p != nullptr; + } + + /// Clears retired pointer without \p free() call + void clear() + { + m_p = nullptr; + m_funcFree = nullptr; } }; diff --git a/cds/intrusive/michael_list_rcu.h b/cds/intrusive/michael_list_rcu.h index b77cc9e8..182ab326 100644 --- a/cds/intrusive/michael_list_rcu.h +++ b/cds/intrusive/michael_list_rcu.h @@ -1145,7 +1145,7 @@ namespace cds { namespace intrusive { template 
std::pair update_at_locked( position& pos, value_type& val, Func func, bool bInsert ) { - // RCU lock should be locked!!! + // RCU should be locked!!! assert( gc::is_locked() ); while ( true ) { diff --git a/cds/urcu/details/base.h b/cds/urcu/details/base.h index 8c6e726d..c8ef100d 100644 --- a/cds/urcu/details/base.h +++ b/cds/urcu/details/base.h @@ -435,7 +435,7 @@ namespace cds { */ struct epoch_retired_ptr: public retired_ptr { - uint64_t m_nEpoch ; ///< The epoch when the object has been retired + uint64_t m_nEpoch; ///< The epoch when the object has been retired //@cond epoch_retired_ptr() diff --git a/cds/urcu/details/gpb.h b/cds/urcu/details/gpb.h index 9dd22509..db4334eb 100644 --- a/cds/urcu/details/gpb.h +++ b/cds/urcu/details/gpb.h @@ -21,7 +21,7 @@ namespace cds { namespace urcu { i.e. until the RCU quiescent state will come. After that the buffer and all retired objects are freed. This synchronization cycle may be called in any thread that calls \p retire_ptr function. - The \p Buffer contains items of \ref cds_urcu_retired_ptr "retired_ptr" type and it should support a queue interface with + The \p Buffer contains items of \ref cds_urcu_retired_ptr "epoch_retired_ptr" type and it should support a queue interface with three function: - bool push( retired_ptr& p ) - places the retired pointer \p p into queue. If the function returns \p false it means that the buffer is full and RCU synchronization cycle must be processed. @@ -29,13 +29,13 @@ namespace cds { namespace urcu { this function must return \p false - size_t size() - returns queue's item count. - The buffer is considered as full if \p push returns \p false or the buffer size reaches the RCU threshold. + The buffer is considered as full if \p push() returns \p false or the buffer size reaches the RCU threshold. There is a wrapper \ref cds_urcu_general_buffered_gc "gc" for \p %general_buffered class that provides unified RCU interface. 
You should use this wrapper class instead \p %general_buffered Template arguments: - - \p Buffer - buffer type. Default is cds::container::VyukovMPMCCycleQueue + - \p Buffer - buffer type. Default is \p cds::container::VyukovMPMCCycleQueue - \p Lock - mutex type, default is \p std::mutex - \p Backoff - back-off schema, default is cds::backoff::Default */ @@ -67,10 +67,10 @@ namespace cds { namespace urcu { protected: //@cond - buffer_type m_Buffer; - atomics::atomic m_nCurEpoch; - lock_type m_Lock; - size_t const m_nCapacity; + buffer_type m_Buffer; + atomics::atomic m_nCurEpoch; + lock_type m_Lock; + size_t const m_nCapacity; //@endcond public: diff --git a/cds/urcu/details/gpt.h b/cds/urcu/details/gpt.h index ad38ef82..519a704f 100644 --- a/cds/urcu/details/gpt.h +++ b/cds/urcu/details/gpt.h @@ -19,27 +19,37 @@ namespace cds { namespace urcu { This implementation is similar to \ref general_buffered but separate thread is created for deleting the retired objects. Like \p %general_buffered, the class contains an internal buffer where retired objects are accumulated. When the buffer becomes full, - the RCU \p synchronize function is called that waits until all reader/updater threads end up their read-side critical sections, - i.e. until the RCU quiescent state will come. After that the "work ready" message is sent to reclamation tread. + the RCU \p synchronize() function is called that waits until all reader/updater threads end up their read-side critical sections, + i.e. until the RCU quiescent state will come. After that the "work ready" message is sent to reclamation thread. The reclamation thread frees the buffer. - This synchronization cycle may be called in any thread that calls \ref retire_ptr function. + This synchronization cycle may be called in any thread that calls \p retire_ptr() function. There is a wrapper \ref cds_urcu_general_threaded_gc "gc" for \p %general_threaded class that provides unified RCU interface. 
You should use this wrapper class instead \p %general_threaded + The \p Buffer contains items of \ref cds_urcu_retired_ptr "epoch_retired_ptr" type + and it should support a multiple producer/single consumer queue with the following interface: + - bool push( epoch_retired_ptr& p ) - places the retired pointer \p p into queue. If the function + returns \p false it means that the buffer is full and RCU synchronization cycle must be processed. + - epoch_retired_ptr * front() - returns a pointer to the top element or \p nullptr if the buffer is empty. + - bool pop_front() - pops the top element; returns \p false if the buffer is empty. + - size_t size() - returns queue's item count. + + The buffer is considered as full if \p push() returns \p false or the buffer size reaches the RCU threshold. + Template arguments: - - \p Buffer - buffer type with FIFO semantics. Default is \p cds::container::VyukovMPMCCycleQueue. See \ref general_buffered - for description of buffer's interface. The buffer contains the objects of \ref epoch_retired_ptr + - \p Buffer - MPSC (multiple producer/single consumer) buffer type with FIFO semantics. + Default is \p cds::container::VyukovMPSCCycleQueue. The buffer contains the objects of \ref epoch_retired_ptr type that contains additional \p m_nEpoch field. This field specifies an epoch when the object has been placed into the buffer. The \p %general_threaded object has a global epoch counter - that is incremented on each \p synchronize call. The epoch is used internally to prevent early deletion. + that is incremented on each \p synchronize() call. The epoch is used internally to prevent early deletion. - \p Lock - mutex type, default is \p std::mutex - \p DisposerThread - the reclamation thread class. Default is \ref cds::urcu::dispose_thread, see the description of this class for required interface. 
- \p Backoff - back-off schema, default is cds::backoff::Default */ template < - class Buffer = cds::container::VyukovMPMCCycleQueue< epoch_retired_ptr > + class Buffer = cds::container::VyukovMPSCCycleQueue< epoch_retired_ptr > ,class Lock = std::mutex ,class DisposerThread = dispose_thread ,class Backoff = cds::backoff::Default @@ -114,9 +124,8 @@ namespace cds { namespace urcu { bool bPushed = m_Buffer.push( p ); if ( !bPushed || m_Buffer.size() >= capacity() ) { synchronize(); - if ( !bPushed ) { + if ( !bPushed ) p.free(); - } return true; } return false; @@ -197,7 +206,6 @@ namespace cds { namespace urcu { } } - /// Waits to finish a grace period and calls disposing thread void synchronize() { @@ -208,13 +216,11 @@ namespace cds { namespace urcu { void synchronize( bool bSync ) { uint64_t nPrevEpoch = m_nCurEpoch.fetch_add( 1, atomics::memory_order_release ); - { std::unique_lock sl( m_Lock ); flip_and_wait(); flip_and_wait(); } - m_DisposerThread.dispose( m_Buffer, nPrevEpoch, bSync ); } void force_dispose() diff --git a/cds/urcu/details/sig_threaded.h b/cds/urcu/details/sig_threaded.h index e19d536a..3039d064 100644 --- a/cds/urcu/details/sig_threaded.h +++ b/cds/urcu/details/sig_threaded.h @@ -30,18 +30,18 @@ namespace cds { namespace urcu { that provides unified RCU interface. You should use this wrapper class instead \p %signal_threaded Template arguments: - - \p Buffer - buffer type with FIFO semantics. Default is cds::container::VyukovMPMCCycleQueue. See \ref signal_buffered + - \p Buffer - buffer type with FIFO semantics. Default is \p cds::container::VyukovMPSCCycleQueue. See \ref signal_buffered for description of buffer's interface. The buffer contains the objects of \ref epoch_retired_ptr type that contains additional \p m_nEpoch field. This field specifies an epoch when the object has been placed into the buffer. The \p %signal_threaded object has a global epoch counter - that is incremented on each \p synchronize call. 
The epoch is used internally to prevent early deletion. + that is incremented on each \p synchronize() call. The epoch is used internally to prevent early deletion. - \p Lock - mutex type, default is \p std::mutex - \p DisposerThread - the reclamation thread class. Default is \ref cds::urcu::dispose_thread, see the description of this class for required interface. - \p Backoff - back-off schema, default is cds::backoff::Default */ template < - class Buffer = cds::container::VyukovMPMCCycleQueue< epoch_retired_ptr > + class Buffer = cds::container::VyukovMPSCCycleQueue< epoch_retired_ptr > ,class Lock = std::mutex ,class DisposerThread = dispose_thread ,class Backoff = cds::backoff::Default diff --git a/cds/urcu/dispose_thread.h b/cds/urcu/dispose_thread.h index 5e2fc622..93195603 100644 --- a/cds/urcu/dispose_thread.h +++ b/cds/urcu/dispose_thread.h @@ -8,6 +8,7 @@ #include #include #include +#include namespace cds { namespace urcu { @@ -26,7 +27,7 @@ namespace cds { namespace urcu { typedef Buffer buffer_type ; ///< Buffer type private: //@cond - typedef std::thread thread_type; + typedef std::thread thread_type; typedef std::mutex mutex_type; typedef std::condition_variable condvar_type; typedef std::unique_lock< mutex_type > unique_lock; @@ -53,15 +54,15 @@ namespace cds { namespace urcu { condvar_type m_cvDataReady; // Task for thread (dispose cycle) - buffer_type * volatile m_pBuffer; - uint64_t volatile m_nCurEpoch; + atomics::atomic m_pBuffer; + uint64_t volatile m_nCurEpoch; // Quit flag - bool volatile m_bQuit; + atomics::atomic m_bQuit; // disposing pass sync - condvar_type m_cvReady; - bool volatile m_bReady; + condvar_type m_cvReady; + atomics::atomic m_bReady; //@endcond private: // methods called from disposing thread @@ -72,53 +73,46 @@ namespace cds { namespace urcu { uint64_t nCurEpoch; bool bQuit = false; - epoch_retired_ptr rest; - while ( !bQuit ) { + + // signal that we are ready to dispose { unique_lock lock( m_Mutex ); + m_bReady.store( true, 
atomics::memory_order_relaxed ); + } + m_cvReady.notify_one(); - // signal that we are ready to dispose - m_bReady = true; - m_cvReady.notify_one(); - + { // wait new data portion - while ( !m_pBuffer ) + unique_lock lock( m_Mutex ); + + while ( (pBuffer = m_pBuffer.load( atomics::memory_order_relaxed )) == nullptr ) m_cvDataReady.wait( lock ); // New work is ready - m_bReady = false ; // we are busy + m_bReady.store( false, atomics::memory_order_relaxed ); // we are busy - bQuit = m_bQuit; + bQuit = m_bQuit.load( atomics::memory_order_relaxed ); nCurEpoch = m_nCurEpoch; - pBuffer = m_pBuffer; - m_pBuffer = nullptr; - } - - if ( rest.m_p ) { - assert( rest.m_nEpoch <= nCurEpoch ); - rest.free(); + m_pBuffer.store( nullptr, atomics::memory_order_relaxed ); } if ( pBuffer ) - rest = dispose_buffer( pBuffer, nCurEpoch ); + dispose_buffer( pBuffer, nCurEpoch ); } } - epoch_retired_ptr dispose_buffer( buffer_type * pBuf, uint64_t nCurEpoch ) + void dispose_buffer( buffer_type * pBuf, uint64_t nCurEpoch ) { - epoch_retired_ptr p; - while ( pBuf->pop( p )) { - if ( p.m_nEpoch <= nCurEpoch ) { - p.free(); + epoch_retired_ptr * p; + while ( ( p = pBuf->front()) != nullptr ) { + if ( p->m_nEpoch <= nCurEpoch ) { + p->free(); + CDS_VERIFY( pBuf->pop_front() ); } - else { - if ( !pBuf->push( p )) - return p; + else break; - } } - return epoch_retired_ptr(); } //@endcond @@ -156,13 +150,13 @@ namespace cds { namespace urcu { unique_lock lock( m_Mutex ); // wait while retiring pass done - while ( !m_bReady ) + while ( !m_bReady.load( atomics::memory_order_relaxed )) m_cvReady.wait( lock ); // give a new work and set stop flag - m_pBuffer = &buf; m_nCurEpoch = nCurEpoch; - m_bQuit = true; + m_pBuffer.store( &buf, atomics::memory_order_relaxed ); + m_bQuit.store( true, atomics::memory_order_relaxed ); } m_cvDataReady.notify_one(); @@ -182,23 +176,23 @@ namespace cds { namespace urcu { */ void dispose( buffer_type& buf, uint64_t nCurEpoch, bool bSync ) { - unique_lock lock( m_Mutex 
); - - // wait while disposing pass done - while ( !m_bReady ) - m_cvReady.wait( lock ); - - if ( bSync ) - m_bReady = false; + { + unique_lock lock( m_Mutex ); - // new work - m_nCurEpoch = nCurEpoch; - m_pBuffer = &buf; + // wait while disposing pass done + while ( !m_bReady.load( atomics::memory_order_relaxed )) + m_cvReady.wait( lock ); + // new work + m_bReady.store( false, atomics::memory_order_relaxed ); + m_nCurEpoch = nCurEpoch; + m_pBuffer.store( &buf, atomics::memory_order_relaxed ); + } m_cvDataReady.notify_one(); if ( bSync ) { - while ( !m_bReady ) + unique_lock lock( m_Mutex ); + while ( !m_bReady.load( atomics::memory_order_relaxed )) m_cvReady.wait( lock ); } } diff --git a/cds/urcu/general_threaded.h b/cds/urcu/general_threaded.h index 3cabc8fb..b72d2286 100644 --- a/cds/urcu/general_threaded.h +++ b/cds/urcu/general_threaded.h @@ -13,8 +13,8 @@ namespace cds { namespace urcu { This is a wrapper around \p general_threaded class. Template arguments: - - \p Buffer - lock-free queue or lock-free bounded queue. - Default is \p cds::container::VyukovMPMCCycleQueue< retired_ptr > + - \p Buffer - lock-free MPSC (multiple producer/single consumer) queue. + Default is \p cds::container::VyukovMPSCCycleQueue< retired_ptr > - \p Lock - mutex type, default is \p std::mutex - \p DisposerThread - reclamation thread class, default is \p cds::urcu::dispose_thread See \ref cds::urcu::dispose_thread for class interface. 
@@ -23,7 +23,7 @@ namespace cds { namespace urcu { */ template < #ifdef CDS_DOXGEN_INVOKED - class Buffer = cds::container::VyukovMPMCCycleQueue< epoch_retired_ptr > + class Buffer = cds::container::VyukovMPSCCycleQueue< epoch_retired_ptr > ,class Lock = std::mutex ,class DisposerThread = dispose_thread ,class Backoff = cds::backoff::Default diff --git a/cds/urcu/signal_threaded.h b/cds/urcu/signal_threaded.h index 28eb5558..24aeb34b 100644 --- a/cds/urcu/signal_threaded.h +++ b/cds/urcu/signal_threaded.h @@ -15,8 +15,8 @@ namespace cds { namespace urcu { This is a wrapper around \p signal_threaded class. Template arguments: - - \p Buffer - lock-free queue or lock-free bounded queue. - Default is \p cds::container::VyukovMPMCCycleQueue< retired_ptr > + - \p Buffer - lock-free MPSC (multiple producer/single consumer) queue. + Default is \p cds::container::VyukovMPSCCycleQueue< epoch_retired_ptr > - \p Lock - mutex type, default is \p std::mutex - \p DisposerThread - reclamation thread class, default is \p %general_threaded_dispose_thread See \ref cds::urcu::dispose_thread for class interface. @@ -25,7 +25,7 @@ namespace cds { namespace urcu { */ template < #ifdef CDS_DOXGEN_INVOKED - class Buffer = cds::container::VyukovMPMCCycleQueue< epoch_retired_ptr > + class Buffer = cds::container::VyukovMPSCCycleQueue< epoch_retired_ptr > ,class Lock = std::mutex ,class DisposerThread = dispose_thread ,class Backoff = cds::backoff::Default -- 2.34.1