3 #ifndef CDSLIB_URCU_DETAILS_SIG_THREADED_H
4 #define CDSLIB_URCU_DETAILS_SIG_THREADED_H
6 #include <cds/urcu/details/sh.h>
7 #ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED
9 #include <mutex> //unique_lock
11 #include <cds/urcu/dispose_thread.h>
12 #include <cds/algo/backoff_strategy.h>
13 #include <cds/container/vyukov_mpmc_cycle_queue.h>
15 namespace cds { namespace urcu {
17 /// User-space signal-handled RCU with deferred threaded reclamation
19 @headerfile cds/urcu/signal_threaded.h
21 This implementation is similar to \ref signal_buffered, but a separate thread is created
22 for deleting the retired objects. Like \p %signal_buffered, the class contains an internal buffer
23 where retired objects are accumulated. When the buffer becomes full,
24 the RCU \p synchronize function is called; it waits until all reader/updater threads have left their read-side critical sections,
25 i.e. until the RCU quiescent state is reached. After that, the "work ready" message is sent to the reclamation thread.
26 The reclamation thread frees the buffer.
27 This synchronization cycle may be run in any thread that calls the \ref retire_ptr function.
29 There is a wrapper \ref cds_urcu_signal_threaded_gc "gc<signal_threaded>" for the \p %signal_threaded class
30 that provides a unified RCU interface. You should use this wrapper class instead of \p %signal_threaded
33 - \p Buffer - buffer type with FIFO semantics. Default is \p cds::container::VyukovMPSCCycleQueue. See \ref signal_buffered
34 for a description of the buffer's interface. The buffer contains objects of the \ref epoch_retired_ptr
35 type, which contains an additional \p m_nEpoch field. This field specifies the epoch in which the object
36 was placed into the buffer. The \p %signal_threaded object has a global epoch counter
37 that is incremented on each \p synchronize() call. The epoch is used internally to prevent early deletion.
38 - \p Lock - mutex type, default is \p std::mutex
39 - \p DisposerThread - the reclamation thread class. Default is \ref cds::urcu::dispose_thread,
40 see the description of this class for required interface.
41 - \p Backoff - back-off scheme, default is \p cds::backoff::Default
44 class Buffer = cds::container::VyukovMPSCCycleQueue< epoch_retired_ptr >
45 ,class Lock = std::mutex
46 ,class DisposerThread = dispose_thread<Buffer>
47 ,class Backoff = cds::backoff::Default
49 class signal_threaded: public details::sh_singleton< signal_threaded_tag >
52 typedef details::sh_singleton< signal_threaded_tag > base_class;
55 typedef Buffer buffer_type ; ///< Buffer type
56 typedef Lock lock_type ; ///< Lock type
57 typedef Backoff back_off ; ///< Back-off scheme
58 typedef DisposerThread disposer_thread ; ///< Disposer thread type
60 typedef signal_threaded_tag rcu_tag ; ///< Thread-side RCU part
61 typedef base_class::thread_gc thread_gc ; ///< Access lock class
62 typedef typename thread_gc::scoped_lock scoped_lock ; ///< Access lock class
64 static bool const c_bBuffered = true ; ///< This RCU buffers disposed elements
68 typedef details::sh_singleton_instance< rcu_tag > singleton_ptr;
70 struct scoped_disposer {
71 void operator ()( signal_threaded * p )
81 atomics::atomic<uint64_t> m_nCurEpoch;
83 size_t const m_nCapacity;
84 disposer_thread m_DisposerThread;
88 /// Returns singleton instance
89 static signal_threaded * instance()
91 return static_cast<signal_threaded *>( base_class::instance() );
93 /// Checks if the singleton is created and ready to use
96 return singleton_ptr::s_pRCU != nullptr;
101 signal_threaded( size_t nBufferCapacity, int nSignal = SIGUSR1 )
102 : base_class( nSignal )
103 , m_Buffer( nBufferCapacity )
105 , m_nCapacity( nBufferCapacity )
108 // Return: true - synchronize has been called, false - otherwise
109 bool push_buffer( epoch_retired_ptr&& p )
111 bool bPushed = m_Buffer.push( p );
112 if ( !bPushed || m_Buffer.size() >= capacity() ) {
130 /// Creates singleton object and starts reclamation thread
132 The \p nBufferCapacity parameter defines the RCU threshold.
134 The \p nSignal parameter defines the signal number designated for RCU; the default is \p SIGUSR1.
136 static void Construct( size_t nBufferCapacity = 256, int nSignal = SIGUSR1 )
138 if ( !singleton_ptr::s_pRCU ) {
139 std::unique_ptr< signal_threaded, scoped_disposer > pRCU( new signal_threaded( nBufferCapacity, nSignal ) );
140 pRCU->m_DisposerThread.start();
142 singleton_ptr::s_pRCU = pRCU.release();
146 /// Destroys singleton object and terminates internal reclamation thread
147 static void Destruct( bool bDetachAll = false )
150 signal_threaded * pThis = instance();
152 pThis->m_ThreadList.detach_all();
154 pThis->m_DisposerThread.stop( pThis->m_Buffer, std::numeric_limits< uint64_t >::max());
157 singleton_ptr::s_pRCU = nullptr;
162 /// Retires \p p pointer
164 The method pushes the \p p pointer to the internal buffer.
165 When the buffer becomes full, the \ref synchronize function is called
166 to wait for the end of the grace period, and then
167 a message is sent to the reclamation thread.
169 virtual void retire_ptr( retired_ptr& p )
172 push_buffer( epoch_retired_ptr( p, m_nCurEpoch.load( atomics::memory_order_acquire )));
175 /// Retires the pointer chain [\p itFirst, \p itLast)
176 template <typename ForwardIterator>
177 void batch_retire( ForwardIterator itFirst, ForwardIterator itLast )
179 uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_relaxed );
180 while ( itFirst != itLast ) {
181 epoch_retired_ptr ep( *itFirst, nEpoch );
183 push_buffer( std::move(ep));
187 /// Retires the pointer chain until \p Func returns \p nullptr retired pointer
188 template <typename Func>
189 void batch_retire( Func e )
191 uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_relaxed );
192 for ( retired_ptr p{ e() }; p.m_p; ) {
193 epoch_retired_ptr ep( p, nEpoch );
195 push_buffer( std::move(ep));
200 /// Waits for the grace period to finish and then calls the disposing thread
203 synchronize( false );
207 void synchronize( bool bSync )
209 uint64_t nPrevEpoch = m_nCurEpoch.fetch_add( 1, atomics::memory_order_release );
212 std::unique_lock<lock_type> sl( m_Lock );
215 base_class::force_membar_all_threads( bkOff );
216 base_class::switch_next_epoch();
218 base_class::wait_for_quiescent_state( bkOff );
219 base_class::switch_next_epoch();
221 base_class::wait_for_quiescent_state( bkOff );
222 base_class::force_membar_all_threads( bkOff );
224 m_DisposerThread.dispose( m_Buffer, nPrevEpoch, bSync );
233 /// Returns the threshold of the internal buffer
234 size_t capacity() const
239 /// Returns the signal number stated for RCU
240 int signal_no() const
242 return base_class::signal_no();
245 }} // namespace cds::urcu
247 #endif // #ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED
248 #endif // #ifndef CDSLIB_URCU_DETAILS_SIG_THREADED_H