Rebuilt threaded uRCU logic
authorkhizmax <libcds.dev@gmail.com>
Sun, 20 Dec 2015 08:48:50 +0000 (11:48 +0300)
committerkhizmax <libcds.dev@gmail.com>
Sun, 20 Dec 2015 08:48:50 +0000 (11:48 +0300)
Added single-consumer option to Vyukov's queue

cds/container/vyukov_mpmc_cycle_queue.h
cds/gc/details/retired_ptr.h
cds/intrusive/michael_list_rcu.h
cds/urcu/details/base.h
cds/urcu/details/gpb.h
cds/urcu/details/gpt.h
cds/urcu/details/sig_threaded.h
cds/urcu/dispose_thread.h
cds/urcu/general_threaded.h
cds/urcu/signal_threaded.h

index 0238152ec6f8dd129b24f0fd7aa61bcee5592cbb..5f57d115b465c627a9de5413184507acdd5c4f6f 100644 (file)
@@ -52,6 +52,15 @@ namespace cds { namespace container {
 
             /// Back-off strategy
             typedef cds::backoff::Default           back_off;
+
+            /// Single-consumer version
+            /**
+                For single-consumer version of algorithm some additional functions 
+                (\p front(), \p pop_front()) is available.
+
+                Default is \p false
+            */
+            static CDS_CONSTEXPR bool const single_consumer = false;
         };
 
         /// Metafunction converting option list to \p vyukov_queue::traits
@@ -107,6 +116,9 @@ namespace cds { namespace container {
         No dynamic memory allocation/management during operation. Producers and consumers are separated from each other (as in the two-lock queue),
         i.e. do not touch the same data while queue is not empty.
 
+        There is multiple producer/single consumer version \p cds::container::VyukovMPSCCycleQueue
+        that supports \p front() and \p pop_front() functions.
+
         Source:
             - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
 
@@ -142,6 +154,9 @@ namespace cds { namespace container {
         typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p vyukov_queue::traits::value_cleaner
         typedef typename traits::back_off  back_off;          ///< back-off strategy
 
+        /// \p true for single-consumer version, \p false otherwise
+        static CDS_CONSTEXPR bool const c_single_consumer = traits::single_consumer;
+
         /// Rebind template arguments
         template <typename T2, typename Traits2>
         struct rebind {
@@ -358,6 +373,43 @@ namespace cds { namespace container {
             return dequeue_with( f );
         }
 
+        /// Returns a pointer to top element of the queue or \p nullptr if queue is empty (only for single-consumer version)
+        template <bool SC = c_single_consumer >
+        typename std::enable_if<SC, value_type *>::type front()
+        {
+            static_assert( c_single_consumer, "front() is enabled only if traits::single_consumer is true");
+
+            cell_type * cell;
+            back_off bkoff;
+
+            size_t pos = m_posDequeue.load( memory_model::memory_order_relaxed );
+            for ( ;;)
+            {
+                cell = &m_buffer[pos & m_nBufferMask];
+                size_t seq = cell->sequence.load( memory_model::memory_order_acquire );
+                intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
+
+                if ( dif == 0 )
+                    return &cell->data;
+                else if ( dif < 0 ) {
+                    // Queue empty?
+                    if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
+                        return nullptr;   // queue empty
+                    bkoff();
+                    pos = m_posDequeue.load( memory_model::memory_order_relaxed );
+                }
+                else
+                    pos = m_posDequeue.load( memory_model::memory_order_relaxed );
+            }
+        }
+
+        /// Pops top element; returns \p true if queue is not empty, \p false otherwise (only for single-consumer version)
+        template <bool SC = c_single_consumer >
+        typename std::enable_if<SC, bool>::type pop_front()
+        {
+            return dequeue_with( []( value_type& ) {} );
+        }
+
         /// Checks if the queue is empty
         bool empty() const
         {
@@ -405,6 +457,21 @@ namespace cds { namespace container {
             return m_buffer.capacity();
         }
     };
+
+    //@cond
+    namespace vyukov_queue { 
+        template <typename Traits>
+        struct single_consumer_traits : public Traits
+        {
+            static CDS_CONSTEXPR bool const single_consumer = true;
+        };
+    } // namespace vyukov_queue
+    //@endcond
+
+    /// Vyukov's queue multiple producer - single consumer version 
+    template <typename T, typename Traits = vyukov_queue::traits >
+    using VyukovMPSCCycleQueue = VyukovMPMCCycleQueue< T, vyukov_queue::single_consumer_traits<Traits> >;
+
 }}  // namespace cds::container
 
 #endif // #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
index 5faebf497b920b61d445f82c86d20a46f08d2b18..84d892beec97aca6b74346b16e7462a82ca61213 100644 (file)
@@ -68,8 +68,20 @@ namespace cds { namespace gc {
                 assert( m_p );
                 m_funcFree( m_p );
 
-                CDS_STRICT_DO( m_p = nullptr );
-                CDS_STRICT_DO( m_funcFree = nullptr );
+                CDS_STRICT_DO( clear() );
+            }
+
+            /// Checks if the retired pointer is not empty
+            explicit operator bool() const CDS_NOEXCEPT
+            {
+                return m_p != nullptr;
+            }
+
+            /// Clears retired pointer without \p free() call
+            void clear()
+            {
+                m_p = nullptr;
+                m_funcFree = nullptr;
             }
         };
 
index b77cc9e880f4ec02484a21301f18621974de48f0..182ab32690881424a1b42a530a7f868d686726f8 100644 (file)
@@ -1145,7 +1145,7 @@ namespace cds { namespace intrusive {
         template <typename Func>
         std::pair<iterator, bool> update_at_locked( position& pos, value_type& val, Func func, bool bInsert )
         {
-            // RCU lock should be locked!!!
+            // RCU should be locked!!!
             assert( gc::is_locked() );
 
             while ( true ) {
index 8c6e726d33b4af9329e9ccc0cbff5144b1c304b8..c8ef100d7a5cf9ffc594ca8a46bbb452ffa2ad56 100644 (file)
@@ -435,7 +435,7 @@ namespace cds {
         */
         struct epoch_retired_ptr: public retired_ptr
         {
-            uint64_t    m_nEpoch ;  ///< The epoch when the object has been retired
+            uint64_t    m_nEpoch;  ///< The epoch when the object has been retired
 
             //@cond
             epoch_retired_ptr()
index 9dd22509b12bb7d6a4d4eae515d32100d49fc4cc..db4334eb233b77ed8aff43ce57f1af22c8b84e9a 100644 (file)
@@ -21,7 +21,7 @@ namespace cds { namespace urcu {
         i.e. until the RCU quiescent state will come. After that the buffer and all retired objects are freed.
         This synchronization cycle may be called in any thread that calls \p retire_ptr function.
 
-        The \p Buffer contains items of \ref cds_urcu_retired_ptr "retired_ptr" type and it should support a queue interface with
+        The \p Buffer contains items of \ref cds_urcu_retired_ptr "epoch_retired_ptr" type and it should support a queue interface with
         three function:
         - <tt> bool push( retired_ptr& p ) </tt> - places the retired pointer \p p into queue. If the function
             returns \p false it means that the buffer is full and RCU synchronization cycle must be processed.
@@ -29,13 +29,13 @@ namespace cds { namespace urcu {
             this function must return \p false
         - <tt>size_t size()</tt> - returns queue's item count.
 
-        The buffer is considered as full if \p push returns \p false or the buffer size reaches the RCU threshold.
+        The buffer is considered as full if \p push() returns \p false or the buffer size reaches the RCU threshold.
 
         There is a wrapper \ref cds_urcu_general_buffered_gc "gc<general_buffered>" for \p %general_buffered class
         that provides unified RCU interface. You should use this wrapper class instead \p %general_buffered
 
         Template arguments:
-        - \p Buffer - buffer type. Default is cds::container::VyukovMPMCCycleQueue
+        - \p Buffer - buffer type. Default is \p cds::container::VyukovMPMCCycleQueue
         - \p Lock - mutex type, default is \p std::mutex
         - \p Backoff - back-off schema, default is cds::backoff::Default
     */
@@ -67,10 +67,10 @@ namespace cds { namespace urcu {
 
     protected:
         //@cond
-        buffer_type                     m_Buffer;
-        atomics::atomic<uint64_t>    m_nCurEpoch;
-        lock_type                       m_Lock;
-        size_t const                    m_nCapacity;
+        buffer_type                m_Buffer;
+        atomics::atomic<uint64_t>  m_nCurEpoch;
+        lock_type                  m_Lock;
+        size_t const               m_nCapacity;
         //@endcond
 
     public:
index ad38ef82828e8fefa50f7347ce5ac1110293e3d2..519a704f8e50b07b41b506b8260735fefa2f2a06 100644 (file)
@@ -19,27 +19,37 @@ namespace cds { namespace urcu {
         This implementation is similar to \ref general_buffered but separate thread is created
         for deleting the retired objects. Like \p %general_buffered, the class contains an internal buffer
         where retired objects are accumulated. When the buffer becomes full,
-        the RCU \p synchronize function is called that waits until all reader/updater threads end up their read-side critical sections,
-        i.e. until the RCU quiescent state will come. After that the "work ready" message is sent to reclamation tread.
+        the RCU \p synchronize() function is called that waits until all reader/updater threads end up their read-side critical sections,
+        i.e. until the RCU quiescent state will come. After that the "work ready" message is sent to reclamation thread.
         The reclamation thread frees the buffer.
-        This synchronization cycle may be called in any thread that calls \ref retire_ptr function.
+        This synchronization cycle may be called in any thread that calls \p retire_ptr() function.
 
         There is a wrapper \ref cds_urcu_general_threaded_gc "gc<general_threaded>" for \p %general_threaded class
         that provides unified RCU interface. You should use this wrapper class instead \p %general_threaded
 
+        The \p Buffer contains items of \ref cds_urcu_retired_ptr "epoch_retired_ptr" type 
+        and it should support a multiple producer/single consumer queue with the following interface:
+        - <tt> bool push( epoch_retired_ptr& p ) </tt> - places the retired pointer \p p into queue. If the function
+        returns \p false it means that the buffer is full and RCU synchronization cycle must be processed.
+        - <tt>epoch_retired_ptr * front() </tt> - returns a pointer to the top element or \p nullptr if the buffer is empty.
+        - <tt>bool pop_front() </tt> - pops the top element; returns \p false if the buffer is empty.
+        - <tt>size_t size()</tt> - returns queue's item count.
+
+        The buffer is considered as full if \p push() returns \p false or the buffer size reaches the RCU threshold.
+
         Template arguments:
-        - \p Buffer - buffer type with FIFO semantics. Default is \p cds::container::VyukovMPMCCycleQueue. See \ref general_buffered
-            for description of buffer's interface. The buffer contains the objects of \ref epoch_retired_ptr
+        - \p Buffer - MPSC (muliple producer/single consumer) buffer type with FIFO semantics. 
+            Default is \p cds::container::VyukovMPSCCycleQueue. The buffer contains the objects of \ref epoch_retired_ptr
             type that contains additional \p m_nEpoch field. This field specifies an epoch when the object
             has been placed into the buffer. The \p %general_threaded object has a global epoch counter
-            that is incremented on each \p synchronize call. The epoch is used internally to prevent early deletion.
+            that is incremented on each \p synchronize() call. The epoch is used internally to prevent early deletion.
         - \p Lock - mutex type, default is \p std::mutex
         - \p DisposerThread - the reclamation thread class. Default is \ref cds::urcu::dispose_thread,
             see the description of this class for required interface.
         - \p Backoff - back-off schema, default is cds::backoff::Default
     */
     template <
-        class Buffer = cds::container::VyukovMPMCCycleQueue< epoch_retired_ptr >
+        class Buffer = cds::container::VyukovMPSCCycleQueue< epoch_retired_ptr >
         ,class Lock = std::mutex
         ,class DisposerThread = dispose_thread<Buffer>
         ,class Backoff = cds::backoff::Default
@@ -114,9 +124,8 @@ namespace cds { namespace urcu {
             bool bPushed = m_Buffer.push( p );
             if ( !bPushed || m_Buffer.size() >= capacity() ) {
                 synchronize();
-                if ( !bPushed ) {
+                if ( !bPushed )
                     p.free();
-                }
                 return true;
             }
             return false;
@@ -197,7 +206,6 @@ namespace cds { namespace urcu {
             }
         }
 
-
         /// Waits to finish a grace period and calls disposing thread
         void synchronize()
         {
@@ -208,13 +216,11 @@ namespace cds { namespace urcu {
         void synchronize( bool bSync )
         {
             uint64_t nPrevEpoch = m_nCurEpoch.fetch_add( 1, atomics::memory_order_release );
-
             {
                 std::unique_lock<lock_type> sl( m_Lock );
                 flip_and_wait();
                 flip_and_wait();
             }
-
             m_DisposerThread.dispose( m_Buffer, nPrevEpoch, bSync );
         }
         void force_dispose()
index e19d536a5cc5b42a957dc45abc5b5bc826eca1b9..3039d064786e495550ba30056b410be1c6a86237 100644 (file)
@@ -30,18 +30,18 @@ namespace cds { namespace urcu {
         that provides unified RCU interface. You should use this wrapper class instead \p %signal_threaded
 
         Template arguments:
-        - \p Buffer - buffer type with FIFO semantics. Default is cds::container::VyukovMPMCCycleQueue. See \ref signal_buffered
+        - \p Buffer - buffer type with FIFO semantics. Default is \p cds::container::VyukovMPSCCycleQueue. See \ref signal_buffered
             for description of buffer's interface. The buffer contains the objects of \ref epoch_retired_ptr
             type that contains additional \p m_nEpoch field. This field specifies an epoch when the object
             has been placed into the buffer. The \p %signal_threaded object has a global epoch counter
-            that is incremented on each \p synchronize call. The epoch is used internally to prevent early deletion.
+            that is incremented on each \p synchronize() call. The epoch is used internally to prevent early deletion.
         - \p Lock - mutex type, default is \p std::mutex
         - \p DisposerThread - the reclamation thread class. Default is \ref cds::urcu::dispose_thread,
             see the description of this class for required interface.
         - \p Backoff - back-off schema, default is cds::backoff::Default
     */
     template <
-        class Buffer = cds::container::VyukovMPMCCycleQueue< epoch_retired_ptr >
+        class Buffer = cds::container::VyukovMPSCCycleQueue< epoch_retired_ptr >
         ,class Lock = std::mutex
         ,class DisposerThread = dispose_thread<Buffer>
         ,class Backoff = cds::backoff::Default
index 5e2fc622d1060411fff3678b69815281aa2b55c1..9319560350c27ea984cc0b3d5aaff26a0bbc0e04 100644 (file)
@@ -8,6 +8,7 @@
 #include <mutex>
 #include <condition_variable>
 #include <cds/details/aligned_type.h>
+#include <cds/algo/atomic.h>
 
 namespace cds { namespace urcu {
 
@@ -26,7 +27,7 @@ namespace cds { namespace urcu {
         typedef Buffer  buffer_type ;   ///< Buffer type
     private:
         //@cond
-        typedef std::thread                 thread_type;
+        typedef std::thread             thread_type;
         typedef std::mutex              mutex_type;
         typedef std::condition_variable condvar_type;
         typedef std::unique_lock< mutex_type >  unique_lock;
@@ -53,15 +54,15 @@ namespace cds { namespace urcu {
         condvar_type    m_cvDataReady;
 
         // Task for thread (dispose cycle)
-        buffer_type * volatile  m_pBuffer;
-        uint64_t volatile       m_nCurEpoch;
+        atomics::atomic<buffer_type *>  m_pBuffer;
+        uint64_t volatile      m_nCurEpoch;
 
         // Quit flag
-        bool volatile           m_bQuit;
+        atomics::atomic<bool>  m_bQuit;
 
         // disposing pass sync
-        condvar_type            m_cvReady;
-        bool volatile           m_bReady;
+        condvar_type           m_cvReady;
+        atomics::atomic<bool>  m_bReady;
         //@endcond
 
     private: // methods called from disposing thread
@@ -72,53 +73,46 @@ namespace cds { namespace urcu {
             uint64_t        nCurEpoch;
             bool            bQuit = false;
 
-            epoch_retired_ptr rest;
-
             while ( !bQuit ) {
+
+                // signal that we are ready to dispose
                 {
                     unique_lock lock( m_Mutex );
+                    m_bReady.store( true, atomics::memory_order_relaxed );
+                }
+                m_cvReady.notify_one();
 
-                    // signal that we are ready to dispose
-                    m_bReady = true;
-                    m_cvReady.notify_one();
-
+                {
                     // wait new data portion
-                    while ( !m_pBuffer )
+                    unique_lock lock( m_Mutex );
+
+                    while ( (pBuffer = m_pBuffer.load( atomics::memory_order_relaxed )) == nullptr )
                         m_cvDataReady.wait( lock );
 
                     // New work is ready
-                    m_bReady = false ;   // we are busy
+                    m_bReady.store( false, atomics::memory_order_relaxed ); // we are busy
 
-                    bQuit = m_bQuit;
+                    bQuit = m_bQuit.load( atomics::memory_order_relaxed );
                     nCurEpoch = m_nCurEpoch;
-                    pBuffer = m_pBuffer;
-                    m_pBuffer = nullptr;
-                }
-
-                if ( rest.m_p ) {
-                    assert( rest.m_nEpoch <= nCurEpoch );
-                    rest.free();
+                    m_pBuffer.store( nullptr, atomics::memory_order_relaxed );
                 }
 
                 if ( pBuffer )
-                    rest = dispose_buffer( pBuffer, nCurEpoch );
+                    dispose_buffer( pBuffer, nCurEpoch );
             }
         }
 
-        epoch_retired_ptr dispose_buffer( buffer_type * pBuf, uint64_t nCurEpoch )
+        void dispose_buffer( buffer_type * pBuf, uint64_t nCurEpoch )
         {
-            epoch_retired_ptr p;
-            while ( pBuf->pop( p )) {
-                if ( p.m_nEpoch <= nCurEpoch ) {
-                    p.free();
+            epoch_retired_ptr * p;
+            while ( ( p = pBuf->front()) != nullptr ) {
+                if ( p->m_nEpoch <= nCurEpoch ) {
+                    p->free();
+                    CDS_VERIFY( pBuf->pop_front() );
                 }
-                else {
-                    if ( !pBuf->push( p ))
-                        return p;
+                else
                     break;
-                }
             }
-            return epoch_retired_ptr();
         }
         //@endcond
 
@@ -156,13 +150,13 @@ namespace cds { namespace urcu {
                 unique_lock lock( m_Mutex );
 
                 // wait while retiring pass done
-                while ( !m_bReady )
+                while ( !m_bReady.load( atomics::memory_order_relaxed ))
                     m_cvReady.wait( lock );
 
                 // give a new work and set stop flag
-                m_pBuffer = &buf;
                 m_nCurEpoch = nCurEpoch;
-                m_bQuit = true;
+                m_pBuffer.store( &buf, atomics::memory_order_relaxed );
+                m_bQuit.store( true, atomics::memory_order_relaxed );
             }
             m_cvDataReady.notify_one();
 
@@ -182,23 +176,23 @@ namespace cds { namespace urcu {
         */
         void dispose( buffer_type& buf, uint64_t nCurEpoch, bool bSync )
         {
-            unique_lock lock( m_Mutex );
-
-            // wait while disposing pass done
-            while ( !m_bReady )
-                m_cvReady.wait( lock );
-
-            if ( bSync )
-                m_bReady = false;
+            {
+                unique_lock lock( m_Mutex );
 
-            // new work
-            m_nCurEpoch = nCurEpoch;
-            m_pBuffer = &buf;
+                // wait while disposing pass done
+                while ( !m_bReady.load( atomics::memory_order_relaxed ))
+                    m_cvReady.wait( lock );
 
+                // new work
+                m_bReady.store( false, atomics::memory_order_relaxed );
+                m_nCurEpoch = nCurEpoch;
+                m_pBuffer.store( &buf, atomics::memory_order_relaxed );
+            }
             m_cvDataReady.notify_one();
 
             if ( bSync ) {
-                while ( !m_bReady )
+                unique_lock lock( m_Mutex );
+                while ( !m_bReady.load( atomics::memory_order_relaxed ))
                     m_cvReady.wait( lock );
             }
         }
index 3cabc8fbbd08d1adad89b501606c217d26245cfe..b72d22869821c2ef9b58c91ab439a6f2039a0dea 100644 (file)
@@ -13,8 +13,8 @@ namespace cds { namespace urcu {
         This is a wrapper around \p general_threaded class.
 
         Template arguments:
-        - \p Buffer - lock-free queue or lock-free bounded queue.
-            Default is \p cds::container::VyukovMPMCCycleQueue< retired_ptr >
+        - \p Buffer - lock-free MPSC (muliple producer/single consumer) queue.
+            Default is \p cds::container::VyukovMPSCCycleQueue< retired_ptr >
         - \p Lock - mutex type, default is \p std::mutex
         - \p DisposerThread - reclamation thread class, default is \p cds::urcu::dispose_thread
             See \ref cds::urcu::dispose_thread for class interface.
@@ -23,7 +23,7 @@ namespace cds { namespace urcu {
     */
     template <
 #ifdef CDS_DOXGEN_INVOKED
-        class Buffer = cds::container::VyukovMPMCCycleQueue< epoch_retired_ptr >
+        class Buffer = cds::container::VyukovMPSCCycleQueue< epoch_retired_ptr >
         ,class Lock = std::mutex
         ,class DisposerThread = dispose_thread<Buffer>
         ,class Backoff = cds::backoff::Default
index 28eb5558cc4a092562de9ab74a42280c8e09da75..24aeb34b73062a75fbec7d2eb7b66540685ac969 100644 (file)
@@ -15,8 +15,8 @@ namespace cds { namespace urcu {
         This is a wrapper around \p signal_threaded class.
 
         Template arguments:
-        - \p Buffer - lock-free queue or lock-free bounded queue.
-            Default is \p cds::container::VyukovMPMCCycleQueue< retired_ptr >
+        - \p Buffer - lock-free MPSC (muliple producer/single consumer) queue.
+            Default is \p cds::container::VyukovMPSCCycleQueue< epoch_retired_ptr >
         - \p Lock - mutex type, default is \p std::mutex
         - \p DisposerThread - reclamation thread class, default is \p %general_threaded_dispose_thread
             See \ref cds::urcu::dispose_thread for class interface.
@@ -25,7 +25,7 @@ namespace cds { namespace urcu {
     */
     template <
 #ifdef CDS_DOXGEN_INVOKED
-        class Buffer = cds::container::VyukovMPMCCycleQueue< epoch_retired_ptr >
+        class Buffer = cds::container::VyukovMPSCCycleQueue< epoch_retired_ptr >
         ,class Lock = std::mutex
         ,class DisposerThread = dispose_thread<Buffer>
         ,class Backoff = cds::backoff::Default