3 #ifndef CDSLIB_URCU_DETAILS_GPT_H
4 #define CDSLIB_URCU_DETAILS_GPT_H
6 #include <mutex> //unique_lock
8 #include <cds/urcu/details/gp.h>
9 #include <cds/urcu/dispose_thread.h>
10 #include <cds/algo/backoff_strategy.h>
11 #include <cds/container/vyukov_mpmc_cycle_queue.h>
13 namespace cds { namespace urcu {
15 /// User-space general-purpose RCU with deferred threaded reclamation
17 @headerfile cds/urcu/general_threaded.h
This implementation is similar to \ref general_buffered but a separate thread is created
20 for deleting the retired objects. Like \p %general_buffered, the class contains an internal buffer
21 where retired objects are accumulated. When the buffer becomes full,
22 the RCU \p synchronize() function is called that waits until all reader/updater threads end up their read-side critical sections,
23 i.e. until the RCU quiescent state will come. After that the "work ready" message is sent to reclamation thread.
24 The reclamation thread frees the buffer.
25 This synchronization cycle may be called in any thread that calls \p retire_ptr() function.
27 There is a wrapper \ref cds_urcu_general_threaded_gc "gc<general_threaded>" for \p %general_threaded class
that provides a unified RCU interface. You should use this wrapper class instead of \p %general_threaded
30 The \p Buffer contains items of \ref cds_urcu_retired_ptr "epoch_retired_ptr" type
31 and it should support a multiple producer/single consumer queue with the following interface:
32 - <tt> bool push( epoch_retired_ptr& p ) </tt> - places the retired pointer \p p into queue. If the function
33 returns \p false it means that the buffer is full and RCU synchronization cycle must be processed.
34 - <tt>epoch_retired_ptr * front() </tt> - returns a pointer to the top element or \p nullptr if the buffer is empty.
35 - <tt>bool pop_front() </tt> - pops the top element; returns \p false if the buffer is empty.
36 - <tt>size_t size()</tt> - returns queue's item count.
38 The buffer is considered as full if \p push() returns \p false or the buffer size reaches the RCU threshold.
- \p Buffer - MPSC (multiple producer/single consumer) buffer type with FIFO semantics.
42 Default is \p cds::container::VyukovMPSCCycleQueue. The buffer contains the objects of \ref epoch_retired_ptr
43 type that contains additional \p m_nEpoch field. This field specifies an epoch when the object
44 has been placed into the buffer. The \p %general_threaded object has a global epoch counter
45 that is incremented on each \p synchronize() call. The epoch is used internally to prevent early deletion.
46 - \p Lock - mutex type, default is \p std::mutex
47 - \p DisposerThread - the reclamation thread class. Default is \ref cds::urcu::dispose_thread,
48 see the description of this class for required interface.
49 - \p Backoff - back-off schema, default is cds::backoff::Default
class Buffer = cds::container::VyukovMPSCCycleQueue< epoch_retired_ptr >
,class Lock = std::mutex
,class DisposerThread = dispose_thread<Buffer>
,class Backoff = cds::backoff::Default
class general_threaded: public details::gp_singleton< general_threaded_tag >
typedef details::gp_singleton< general_threaded_tag > base_class;
typedef Buffer buffer_type ; ///< Buffer type
typedef Lock lock_type ; ///< Lock type
typedef Backoff back_off ; ///< Back-off scheme
typedef DisposerThread disposer_thread ; ///< Disposer thread type
typedef general_threaded_tag rcu_tag ; ///< Thread-side RCU part
typedef base_class::thread_gc thread_gc ; ///< Thread-side access-lock helper inherited from the GP singleton
typedef typename thread_gc::scoped_lock scoped_lock ; ///< RAII read-side lock over \p thread_gc
static bool const c_bBuffered = true ; ///< This RCU buffers disposed elements
typedef details::gp_singleton_instance< rcu_tag > singleton_ptr;
// Deleter for the std::unique_ptr used in Construct(): destroys a not-yet-published
// singleton if initialization fails (operator() body is not visible in this excerpt)
struct scoped_disposer {
void operator ()( general_threaded * p )
// Global epoch counter; incremented by synchronize() and sampled by retire_ptr()/batch_retire()
atomics::atomic<uint64_t> m_nCurEpoch;
// RCU threshold: push_buffer() triggers a synchronization cycle when the buffer size reaches it
size_t const m_nCapacity;
// Reclamation thread: started in Construct(), fed by synchronize(), stopped in Destruct()
disposer_thread m_DisposerThread;
/// Returns singleton instance
static general_threaded * instance()
// Downcast is safe: Construct() only ever stores a general_threaded in s_pRCU
return static_cast<general_threaded *>( base_class::instance() );
/// Checks if the singleton is created and ready to use
return singleton_ptr::s_pRCU != nullptr;
// Constructor: the internal buffer capacity and the flush threshold share one value
general_threaded( size_t nBufferCapacity )
: m_Buffer( nBufferCapacity )
, m_nCapacity( nBufferCapacity )
// NOTE(review): the call below belongs to a separate helper (grace-period wait)
// whose enclosing signature is not visible in this excerpt — confirm against full file
base_class::flip_and_wait( bkoff );
// Places a retired pointer into the buffer; runs a synchronization cycle when the buffer is full.
// Return: true - synchronize has been called, false - otherwise
bool push_buffer( epoch_retired_ptr&& p )
bool bPushed = m_Buffer.push( p );
// The buffer counts as "full" when push() fails or its size reaches the RCU threshold
if ( !bPushed || m_Buffer.size() >= capacity() ) {
/// Creates singleton object and starts reclamation thread
The \p nBufferCapacity parameter defines RCU threshold.
static void Construct( size_t nBufferCapacity = 256 )
if ( !singleton_ptr::s_pRCU ) {
// unique_ptr + scoped_disposer guard against a leak until the singleton is published
std::unique_ptr< general_threaded, scoped_disposer > pRCU( new general_threaded( nBufferCapacity ) );
pRCU->m_DisposerThread.start();
// Publish only after the disposer thread is running; release() hands off ownership
singleton_ptr::s_pRCU = pRCU.release();
/// Destroys singleton object and terminates internal reclamation thread
static void Destruct( bool bDetachAll = false )
general_threaded * pThis = instance();
// NOTE(review): the guard that checks bDetachAll before this call is not visible in this excerpt
pThis->m_ThreadList.detach_all();
// Max-epoch sentinel tells the disposer thread to free everything remaining, then stop
pThis->m_DisposerThread.stop( pThis->m_Buffer, std::numeric_limits< uint64_t >::max());
singleton_ptr::s_pRCU = nullptr;
/// Retires \p p pointer
The method pushes \p p pointer to internal buffer.
When the buffer becomes full \ref synchronize function is called
to wait for the end of grace period and then
a message is sent to the reclamation thread.
virtual void retire_ptr( retired_ptr& p )
// Tag the pointer with the current epoch so the disposer frees it no earlier than its grace period
push_buffer( epoch_retired_ptr( p, m_nCurEpoch.load( atomics::memory_order_acquire )));
/// Retires the pointer chain [\p itFirst, \p itLast)
template <typename ForwardIterator>
void batch_retire( ForwardIterator itFirst, ForwardIterator itLast )
// Sample the epoch once and tag the whole batch with it
uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_acquire );
while ( itFirst != itLast ) {
epoch_retired_ptr ep( *itFirst, nEpoch );
push_buffer( std::move(ep));
/// Retires the pointer chain until \p Func returns \p nullptr retired pointer
template <typename Func>
void batch_retire( Func e )
// Sample the epoch once for the whole batch
uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_acquire );
// e() is a generator: stop when it yields a retired_ptr whose m_p is null
for ( retired_ptr p{ e() }; p.m_p; ) {
epoch_retired_ptr ep( p, nEpoch );
push_buffer( std::move(ep));
/// Waits to finish a grace period and calls disposing thread
synchronize( false );
// Internal overload: runs the synchronization cycle and hands the buffer to the disposer
void synchronize( bool bSync )
// Open a new epoch; items tagged with nPrevEpoch become disposable after the grace period
uint64_t nPrevEpoch = m_nCurEpoch.fetch_add( 1, atomics::memory_order_release );
std::unique_lock<lock_type> sl( m_Lock );
// NOTE(review): bSync presumably makes dispose() wait for completion — confirm
// against the dispose_thread interface
m_DisposerThread.dispose( m_Buffer, nPrevEpoch, bSync );
/// Returns the threshold of internal buffer
// Body not visible in this excerpt; presumably returns m_nCapacity — confirm against full file
size_t capacity() const
238 }} // namespace cds::urcu
240 #endif // #ifndef CDSLIB_URCU_DETAILS_GPT_H