3 #ifndef CDSLIB_URCU_DETAILS_GPT_H
4 #define CDSLIB_URCU_DETAILS_GPT_H
6 #include <mutex> //unique_lock
8 #include <cds/urcu/details/gp.h>
9 #include <cds/urcu/dispose_thread.h>
10 #include <cds/algo/backoff_strategy.h>
11 #include <cds/container/vyukov_mpmc_cycle_queue.h>
13 namespace cds { namespace urcu {
15 /// User-space general-purpose RCU with deferred threaded reclamation
17 @headerfile cds/urcu/general_threaded.h
19 This implementation is similar to \ref general_buffered but a separate thread is created
20 for deleting the retired objects. Like \p %general_buffered, the class contains an internal buffer
21 where retired objects are accumulated. When the buffer becomes full,
22 the RCU \p synchronize() function is called that waits until all reader/updater threads end their read-side critical sections,
23 i.e. until the RCU quiescent state comes. After that the "work ready" message is sent to the reclamation thread.
24 The reclamation thread frees the buffer.
25 This synchronization cycle may be called in any thread that calls \p retire_ptr() function.
27 There is a wrapper \ref cds_urcu_general_threaded_gc "gc<general_threaded>" for \p %general_threaded class
28 that provides unified RCU interface. You should use this wrapper class instead of \p %general_threaded
30 The \p Buffer contains items of \ref cds_urcu_retired_ptr "epoch_retired_ptr" type
32 and it should support a multiple producer/single consumer queue with the following interface:
33 - <tt> bool push( epoch_retired_ptr& p ) </tt> - places the retired pointer \p p into queue. If the function
34 returns \p false it means that the buffer is full and RCU synchronization cycle must be processed.
35 - <tt>epoch_retired_ptr * front() </tt> - returns a pointer to the top element or \p nullptr if the buffer is empty.
36 - <tt>bool pop_front() </tt> - pops the top element; returns \p false if the buffer is empty.
37 - <tt>size_t size()</tt> - returns queue's item count.
39 The buffer is considered as full if \p push() returns \p false or the buffer size reaches the RCU threshold.
42 - \p Buffer - MPSC (multiple producer/single consumer) buffer type with FIFO semantics.
44 Default is \p cds::container::VyukovMPSCCycleQueue. The buffer contains the objects of \ref epoch_retired_ptr
45 type that contains additional \p m_nEpoch field. This field specifies an epoch when the object
46 has been placed into the buffer. The \p %general_threaded object has a global epoch counter
47 that is incremented on each \p synchronize() call. The epoch is used internally to prevent early deletion.
48 - \p Lock - mutex type, default is \p std::mutex
49 - \p DisposerThread - the reclamation thread class. Default is \ref cds::urcu::dispose_thread,
50 see the description of this class for required interface.
51 - \p Backoff - back-off schema, default is cds::backoff::Default
// NOTE(review): this chunk is a sparse extraction — the opening "template <" line,
// the class's closing "};", and many interior body lines are not visible here.
// Comments below describe only what the visible lines establish.
54 class Buffer = cds::container::VyukovMPSCCycleQueue< epoch_retired_ptr >
55 ,class Lock = std::mutex
56 ,class DisposerThread = dispose_thread<Buffer>
57 ,class Backoff = cds::backoff::Default
59 class general_threaded: public details::gp_singleton< general_threaded_tag >
62 typedef details::gp_singleton< general_threaded_tag > base_class;
65 typedef Buffer buffer_type ; ///< Buffer type
66 typedef Lock lock_type ; ///< Lock type
67 typedef Backoff back_off ; ///< Back-off scheme
68 typedef DisposerThread disposer_thread ; ///< Disposer thread type
70 typedef general_threaded_tag rcu_tag ; ///< Thread-side RCU part
71 typedef base_class::thread_gc thread_gc ; ///< Access lock class
72 typedef typename thread_gc::scoped_lock scoped_lock ; ///< Access lock class
74 static bool const c_bBuffered = true ; ///< This RCU buffers disposed elements
// Internal singleton holder for this RCU flavor.
78 typedef details::gp_singleton_instance< rcu_tag > singleton_ptr;
// Deleter used with std::unique_ptr during Construct() so a partially
// constructed singleton is destroyed if initialization fails (see Construct()).
80 struct scoped_disposer {
81 void operator ()( general_threaded * p )
// Global epoch counter; incremented by synchronize() (see fetch_add below)
// and stamped into every retired pointer placed into the buffer.
91 atomics::atomic<uint64_t> m_nCurEpoch;
// RCU threshold: when the buffer reaches this size a synchronization
// cycle is triggered (see push_buffer()).
93 size_t const m_nCapacity;
94 disposer_thread m_DisposerThread;
98 /// Returns singleton instance
99 static general_threaded * instance()
101 return static_cast<general_threaded *>( base_class::instance() );
103 /// Checks if the singleton is created and ready to use
106 return singleton_ptr::s_pRCU != nullptr;
// Constructor is non-public: instances are created only via Construct().
111 general_threaded( size_t nBufferCapacity )
112 : m_Buffer( nBufferCapacity )
114 , m_nCapacity( nBufferCapacity )
// Grace-period wait: delegates to the general-purpose singleton's
// flip-and-wait protocol with the configured back-off.
120 base_class::flip_and_wait( bkoff );
// Places a retired pointer into the buffer; if the queue rejects the push
// (full) or the buffer size reaches the capacity() threshold, an RCU
// synchronization cycle is started (tail of this function is elided here).
123 // Return: true - synchronize has been called, false - otherwise
124 bool push_buffer( epoch_retired_ptr&& p )
126 bool bPushed = m_Buffer.push( p );
127 if ( !bPushed || m_Buffer.size() >= capacity() ) {
144 /// Creates singleton object and starts reclamation thread
146 The \p nBufferCapacity parameter defines RCU threshold.
148 static void Construct( size_t nBufferCapacity = 256 )
150 if ( !singleton_ptr::s_pRCU ) {
// scoped_disposer guards against leaks until ownership is released below.
151 std::unique_ptr< general_threaded, scoped_disposer > pRCU( new general_threaded( nBufferCapacity ) );
152 pRCU->m_DisposerThread.start();
154 singleton_ptr::s_pRCU = pRCU.release();
158 /// Destroys singleton object and terminates internal reclamation thread
159 static void Destruct( bool bDetachAll = false )
162 general_threaded * pThis = instance();
// Detaches every attached thread; presumably guarded by bDetachAll —
// the guard line is not visible in this chunk, TODO confirm.
164 pThis->m_ThreadList.detach_all();
// max() epoch forces the disposer thread to drain the whole buffer on stop.
166 pThis->m_DisposerThread.stop( pThis->m_Buffer, std::numeric_limits< uint64_t >::max());
169 singleton_ptr::s_pRCU = nullptr;
174 /// Retires \p p pointer
176 The method pushes \p p pointer to internal buffer.
177 When the buffer becomes full \ref synchronize function is called
178 to wait for the end of grace period and then
179 a message is sent to the reclamation thread.
181 virtual void retire_ptr( retired_ptr& p )
// Stamp the pointer with the current epoch before buffering.
184 push_buffer( epoch_retired_ptr( p, m_nCurEpoch.load( atomics::memory_order_acquire )));
187 /// Retires the pointer chain [\p itFirst, \p itLast)
188 template <typename ForwardIterator>
189 void batch_retire( ForwardIterator itFirst, ForwardIterator itLast )
// The epoch is read once for the whole batch; every item gets the same stamp.
191 uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_acquire );
192 while ( itFirst != itLast ) {
193 epoch_retired_ptr ep( *itFirst, nEpoch );
195 push_buffer( std::move(ep));
199 /// Retires the pointer chain until \p Func returns \p nullptr retired pointer
200 template <typename Func>
201 void batch_retire( Func e )
203 uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_acquire );
// Loop increment (re-invoking e()) is in an elided line of this chunk.
204 for ( retired_ptr p{ e() }; p.m_p; ) {
205 epoch_retired_ptr ep( p, nEpoch );
207 push_buffer( std::move(ep));
211 /// Waits to finish a grace period and calls disposing thread
// Public synchronize(): asynchronous hand-off to the disposer thread.
214 synchronize( false );
// When bSync is true, dispose() presumably waits for the reclamation to
// complete — TODO confirm against the DisposerThread interface.
218 void synchronize( bool bSync )
// Advance the global epoch; retired pointers stamped with earlier epochs
// become eligible for disposal after the grace period.
220 uint64_t nPrevEpoch = m_nCurEpoch.fetch_add( 1, atomics::memory_order_release );
// m_Lock serializes concurrent synchronization cycles.
222 std::unique_lock<lock_type> sl( m_Lock );
226 m_DisposerThread.dispose( m_Buffer, nPrevEpoch, bSync );
234 /// Returns the threshold of internal buffer
235 size_t capacity() const
240 }} // namespace cds::urcu
242 #endif // #ifndef CDSLIB_URCU_DETAILS_GPT_H