Rebuilt threaded uRCU logic
[libcds.git] / cds / urcu / details / sig_threaded.h
//$$CDS-header$$
2
3 #ifndef CDSLIB_URCU_DETAILS_SIG_THREADED_H
4 #define CDSLIB_URCU_DETAILS_SIG_THREADED_H
5
6 #include <cds/urcu/details/sh.h>
7 #ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED
8
9 #include <mutex>    //unique_lock
10 #include <limits>
11 #include <cds/urcu/dispose_thread.h>
12 #include <cds/algo/backoff_strategy.h>
13 #include <cds/container/vyukov_mpmc_cycle_queue.h>
14
15 namespace cds { namespace urcu {
16
17     /// User-space signal-handled RCU with deferred threaded reclamation
18     /**
19         @headerfile cds/urcu/signal_threaded.h
20
        This implementation is similar to \ref signal_buffered but a separate thread is created
22         for deleting the retired objects. Like \p %signal_buffered, the class contains an internal buffer
23         where retired objects are accumulated. When the buffer becomes full,
24         the RCU \p synchronize function is called that waits until all reader/updater threads end up their read-side critical sections,
25         i.e. until the RCU quiescent state will come. After that the "work ready" message is sent to reclamation thread.
26         The reclamation thread frees the buffer.
27         This synchronization cycle may be called in any thread that calls \ref retire_ptr function.
28
29         There is a wrapper \ref cds_urcu_signal_threaded_gc "gc<signal_threaded>" for \p %signal_threaded class
        that provides a unified RCU interface. You should use this wrapper class instead of \p %signal_threaded
31
32         Template arguments:
33         - \p Buffer - buffer type with FIFO semantics. Default is \p cds::container::VyukovMPSCCycleQueue. See \ref signal_buffered
34             for description of buffer's interface. The buffer contains the objects of \ref epoch_retired_ptr
35             type that contains additional \p m_nEpoch field. This field specifies an epoch when the object
36             has been placed into the buffer. The \p %signal_threaded object has a global epoch counter
37             that is incremented on each \p synchronize() call. The epoch is used internally to prevent early deletion.
38         - \p Lock - mutex type, default is \p std::mutex
39         - \p DisposerThread - the reclamation thread class. Default is \ref cds::urcu::dispose_thread,
40             see the description of this class for required interface.
41         - \p Backoff - back-off schema, default is cds::backoff::Default
42     */
    template <
        class Buffer = cds::container::VyukovMPSCCycleQueue< epoch_retired_ptr >
        ,class Lock = std::mutex
        ,class DisposerThread = dispose_thread<Buffer>
        ,class Backoff = cds::backoff::Default
    >
    class signal_threaded: public details::sh_singleton< signal_threaded_tag >
    {
        //@cond
        typedef details::sh_singleton< signal_threaded_tag > base_class;
        //@endcond
    public:
        typedef Buffer          buffer_type ;   ///< Buffer type
        typedef Lock            lock_type   ;   ///< Lock type
        typedef Backoff         back_off    ;   ///< Back-off scheme
        typedef DisposerThread  disposer_thread ;   ///< Disposer thread type

        typedef signal_threaded_tag     rcu_tag ;       ///< Thread-side RCU part
        typedef base_class::thread_gc   thread_gc ;     ///< Access lock class
        typedef typename thread_gc::scoped_lock scoped_lock ; ///< Access lock class

        static bool const c_bBuffered = true ; ///< This RCU buffers disposed elements

    protected:
        //@cond
        typedef details::sh_singleton_instance< rcu_tag >    singleton_ptr;

        // Deleter used by Construct(): destroys the half-constructed singleton
        // if starting the disposer thread throws before release().
        struct scoped_disposer {
            void operator ()( signal_threaded * p )
            {
                delete p;
            }
        };
        //@endcond

    protected:
        //@cond
        buffer_type                  m_Buffer;          // FIFO buffer of retired objects awaiting reclamation
        atomics::atomic<uint64_t>    m_nCurEpoch;       // global epoch counter; incremented by each synchronize()
        lock_type                    m_Lock;            // serializes grace-period waits in synchronize(bool)
        size_t const                 m_nCapacity;       // buffer threshold that triggers synchronize()
        disposer_thread              m_DisposerThread;  // background thread that frees the buffered objects
        //@endcond

    public:
        /// Returns singleton instance
        static signal_threaded * instance()
        {
            return static_cast<signal_threaded *>( base_class::instance() );
        }
        /// Checks if the singleton is created and ready to use
        static bool isUsed()
        {
            return singleton_ptr::s_pRCU != nullptr;
        }

    protected:
        //@cond
        // Protected: instances are created only via Construct()
        signal_threaded( size_t nBufferCapacity, int nSignal = SIGUSR1 )
            : base_class( nSignal )
            , m_Buffer( nBufferCapacity )
            , m_nCurEpoch( 1 )
            , m_nCapacity( nBufferCapacity )
        {}

        // Pushes retired pointer \p p to the internal buffer, starting a
        // synchronization cycle when the buffer is full or rejects the push.
        // Return: true - synchronize has been called, false - otherwise
        bool push_buffer( epoch_retired_ptr&& p )
        {
            bool bPushed = m_Buffer.push( p );
            if ( !bPushed || m_Buffer.size() >= capacity() ) {
                // Buffer threshold reached (or push rejected): wait for a grace
                // period and hand the accumulated objects to the disposer thread.
                synchronize();
                if ( !bPushed ) {
                    // p never entered the buffer; the grace period has already
                    // elapsed inside synchronize(), so freeing it here is safe.
                    p.free();
                }
                return true;
            }
            return false;
        }

        //@endcond

    public:
        //@cond
        // Empty: the disposer thread is stopped in Destruct() before deletion
        ~signal_threaded()
        {}
        //@endcond

        /// Creates singleton object and starts reclamation thread
        /**
            The \p nBufferCapacity parameter defines RCU threshold.

            The \p nSignal parameter defines a signal number stated for RCU, default is \p SIGUSR1
        */
        static void Construct( size_t nBufferCapacity = 256, int nSignal = SIGUSR1 )
        {
            if ( !singleton_ptr::s_pRCU ) {
                // unique_ptr + scoped_disposer reclaims the instance if
                // m_DisposerThread.start() throws before release()
                std::unique_ptr< signal_threaded, scoped_disposer > pRCU( new signal_threaded( nBufferCapacity, nSignal ) );
                pRCU->m_DisposerThread.start();

                singleton_ptr::s_pRCU = pRCU.release();
            }
        }

        /// Destroys singleton object and terminates internal reclamation thread
        static void Destruct( bool bDetachAll = false )
        {
            if ( isUsed() ) {
                signal_threaded * pThis = instance();
                // NOTE(review): m_ThreadList is not declared in this class;
                // it is assumed to be inherited from base_class - verify.
                if ( bDetachAll )
                    pThis->m_ThreadList.detach_all();

                // Stop the disposer thread, flushing the buffer first; the
                // max() epoch presumably tells it to free every buffered object
                // regardless of its m_nEpoch tag - see disposer_thread::stop.
                pThis->m_DisposerThread.stop( pThis->m_Buffer, std::numeric_limits< uint64_t >::max());

                delete pThis;
                singleton_ptr::s_pRCU = nullptr;
            }
        }

    public:
        /// Retires \p p pointer
        /**
            The method pushes \p p pointer to internal buffer.
            When the buffer becomes full \ref synchronize function is called
            to wait for the end of grace period and then
            a message is sent to the reclamation thread.
        */
        virtual void retire_ptr( retired_ptr& p )
        {
            if ( p.m_p )
                // Tag the pointer with the current epoch so the disposer thread
                // can tell which synchronization cycle it belongs to
                push_buffer( epoch_retired_ptr( p, m_nCurEpoch.load( atomics::memory_order_acquire )));
        }

        /// Retires the pointer chain [\p itFirst, \p itLast)
        template <typename ForwardIterator>
        void batch_retire( ForwardIterator itFirst, ForwardIterator itLast )
        {
            // The epoch is sampled once for the whole batch
            uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_relaxed );
            while ( itFirst != itLast ) {
                epoch_retired_ptr ep( *itFirst, nEpoch );
                ++itFirst;
                push_buffer( std::move(ep));
            }
        }

        /// Retires the pointer chain until \p Func returns \p nullptr retired pointer
        template <typename Func>
        void batch_retire( Func e )
        {
            // The epoch is sampled once for the whole batch
            uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_relaxed );
            for ( retired_ptr p{ e() }; p.m_p; ) {
                epoch_retired_ptr ep( p, nEpoch );
                // Fetch the next pointer before the current one is moved away
                p = e();
                push_buffer( std::move(ep));
            }
        }


        /// Waits to finish a grace period and calls disposing thread
        void synchronize()
        {
            synchronize( false );
        }

        //@cond
        void synchronize( bool bSync )
        {
            // Open a new epoch: objects retired from now on are tagged with the
            // incremented counter and belong to the next cycle, not to this one.
            uint64_t nPrevEpoch = m_nCurEpoch.fetch_add( 1, atomics::memory_order_release );

            {
                // Only one thread at a time may drive a grace period
                std::unique_lock<lock_type> sl( m_Lock );

                back_off bkOff;
                base_class::force_membar_all_threads( bkOff );
                // Two-phase epoch flip: switch and wait twice so that every
                // reader that was inside a read-side critical section when this
                // call started has reached a quiescent state before disposal.
                base_class::switch_next_epoch();
                bkOff.reset();
                base_class::wait_for_quiescent_state( bkOff );
                base_class::switch_next_epoch();
                bkOff.reset();
                base_class::wait_for_quiescent_state( bkOff );
                base_class::force_membar_all_threads( bkOff );

                // Grace period elapsed: hand the buffer to the disposer thread
                // up to epoch nPrevEpoch. bSync presumably makes the call wait
                // until disposal completes - see disposer_thread::dispose.
                m_DisposerThread.dispose( m_Buffer, nPrevEpoch, bSync );
            }
        }
        // Synchronous flush of the buffer (bSync == true)
        void force_dispose()
        {
            synchronize( true );
        }
        //@endcond

        /// Returns the threshold of internal buffer
        size_t capacity() const
        {
            return m_nCapacity;
        }

        /// Returns the signal number stated for RCU
        int signal_no() const
        {
            return base_class::signal_no();
        }
    };
245 }} // namespace cds::urcu
246
247 #endif // #ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED
248 #endif // #ifndef CDSLIB_URCU_DETAILS_SIG_THREADED_H