Text formatting, docfix
[libcds.git] / cds / urcu / details / gpt.h
1 //$$CDS-header$$1
2
3 #ifndef CDSLIB_URCU_DETAILS_GPT_H
4 #define CDSLIB_URCU_DETAILS_GPT_H
5
6 #include <mutex>    //unique_lock
7 #include <limits>
8 #include <cds/urcu/details/gp.h>
9 #include <cds/urcu/dispose_thread.h>
10 #include <cds/algo/backoff_strategy.h>
11 #include <cds/container/vyukov_mpmc_cycle_queue.h>
12
13 namespace cds { namespace urcu {
14
15     /// User-space general-purpose RCU with deferred threaded reclamation
16     /**
17         @headerfile cds/urcu/general_threaded.h
18
19         This implementation is similar to \ref general_buffered but separate thread is created
20         for deleting the retired objects. Like \p %general_buffered, the class contains an internal buffer
21         where retired objects are accumulated. When the buffer becomes full,
22         the RCU \p synchronize() function is called that waits until all reader/updater threads end up their read-side critical sections,
23         i.e. until the RCU quiescent state will come. After that the "work ready" message is sent to reclamation thread.
24         The reclamation thread frees the buffer.
25         This synchronization cycle may be called in any thread that calls \p retire_ptr() function.
26
27         There is a wrapper \ref cds_urcu_general_threaded_gc "gc<general_threaded>" for \p %general_threaded class
28         that provides unified RCU interface. You should use this wrapper class instead \p %general_threaded
29
30         The \p Buffer contains items of \ref cds_urcu_retired_ptr "epoch_retired_ptr" type
31
32         and it should support a multiple producer/single consumer queue with the following interface:
33         - <tt> bool push( epoch_retired_ptr& p ) </tt> - places the retired pointer \p p into queue. If the function
34         returns \p false it means that the buffer is full and RCU synchronization cycle must be processed.
35         - <tt>epoch_retired_ptr * front() </tt> - returns a pointer to the top element or \p nullptr if the buffer is empty.
36         - <tt>bool pop_front() </tt> - pops the top element; returns \p false if the buffer is empty.
37         - <tt>size_t size()</tt> - returns queue's item count.
38
39         The buffer is considered as full if \p push() returns \p false or the buffer size reaches the RCU threshold.
40
41         Template arguments:
42         - \p Buffer - MPSC (muliple producer/single consumer) buffer type with FIFO semantics.
43
44             Default is \p cds::container::VyukovMPSCCycleQueue. The buffer contains the objects of \ref epoch_retired_ptr
45             type that contains additional \p m_nEpoch field. This field specifies an epoch when the object
46             has been placed into the buffer. The \p %general_threaded object has a global epoch counter
47             that is incremented on each \p synchronize() call. The epoch is used internally to prevent early deletion.
48         - \p Lock - mutex type, default is \p std::mutex
49         - \p DisposerThread - the reclamation thread class. Default is \ref cds::urcu::dispose_thread,
50             see the description of this class for required interface.
51         - \p Backoff - back-off schema, default is cds::backoff::Default
52     */
53     template <
54         class Buffer = cds::container::VyukovMPSCCycleQueue< epoch_retired_ptr >
55         ,class Lock = std::mutex
56         ,class DisposerThread = dispose_thread<Buffer>
57         ,class Backoff = cds::backoff::Default
58     >
59     class general_threaded: public details::gp_singleton< general_threaded_tag >
60     {
61         //@cond
62         typedef details::gp_singleton< general_threaded_tag > base_class;
63         //@endcond
64     public:
65         typedef Buffer          buffer_type ;   ///< Buffer type
66         typedef Lock            lock_type   ;   ///< Lock type
67         typedef Backoff         back_off    ;   ///< Back-off scheme
68         typedef DisposerThread  disposer_thread ;   ///< Disposer thread type
69
70         typedef general_threaded_tag    rcu_tag ;       ///< Thread-side RCU part
71         typedef base_class::thread_gc   thread_gc ;     ///< Access lock class
72         typedef typename thread_gc::scoped_lock scoped_lock ; ///< Access lock class
73
74         static bool const c_bBuffered = true ; ///< This RCU buffers disposed elements
75
76     protected:
77         //@cond
78         typedef details::gp_singleton_instance< rcu_tag >    singleton_ptr;
79
80         struct scoped_disposer {
81             void operator ()( general_threaded * p )
82             {
83                 delete p;
84             }
85         };
86         //@endcond
87
88     protected:
89         //@cond
90         buffer_type               m_Buffer;
91         atomics::atomic<uint64_t> m_nCurEpoch;
92         lock_type                 m_Lock;
93         size_t const              m_nCapacity;
94         disposer_thread           m_DisposerThread;
95         //@endcond
96
97     public:
98         /// Returns singleton instance
99         static general_threaded * instance()
100         {
101             return static_cast<general_threaded *>( base_class::instance() );
102         }
103         /// Checks if the singleton is created and ready to use
104         static bool isUsed()
105         {
106             return singleton_ptr::s_pRCU != nullptr;
107         }
108
109     protected:
110         //@cond
111         general_threaded( size_t nBufferCapacity )
112             : m_Buffer( nBufferCapacity )
113             , m_nCurEpoch( 1 )
114             , m_nCapacity( nBufferCapacity )
115         {}
116
117         void flip_and_wait()
118         {
119             back_off bkoff;
120             base_class::flip_and_wait( bkoff );
121         }
122
123         // Return: true - synchronize has been called, false - otherwise
124         bool push_buffer( epoch_retired_ptr&& p )
125         {
126             bool bPushed = m_Buffer.push( p );
127             if ( !bPushed || m_Buffer.size() >= capacity() ) {
128                 synchronize();
129                 if ( !bPushed )
130                     p.free();
131                 return true;
132             }
133             return false;
134         }
135
136         //@endcond
137
138     public:
139         //@cond
140         ~general_threaded()
141         {}
142         //@endcond
143
144         /// Creates singleton object and starts reclamation thread
145         /**
146             The \p nBufferCapacity parameter defines RCU threshold.
147         */
148         static void Construct( size_t nBufferCapacity = 256 )
149         {
150             if ( !singleton_ptr::s_pRCU ) {
151                 std::unique_ptr< general_threaded, scoped_disposer > pRCU( new general_threaded( nBufferCapacity ) );
152                 pRCU->m_DisposerThread.start();
153
154                 singleton_ptr::s_pRCU = pRCU.release();
155             }
156         }
157
158         /// Destroys singleton object and terminates internal reclamation thread
159         static void Destruct( bool bDetachAll = false )
160         {
161             if ( isUsed() ) {
162                 general_threaded * pThis = instance();
163                 if ( bDetachAll )
164                     pThis->m_ThreadList.detach_all();
165
166                 pThis->m_DisposerThread.stop( pThis->m_Buffer, std::numeric_limits< uint64_t >::max());
167
168                 delete pThis;
169                 singleton_ptr::s_pRCU = nullptr;
170             }
171         }
172
173     public:
174         /// Retires \p p pointer
175         /**
176             The method pushes \p p pointer to internal buffer.
177             When the buffer becomes full \ref synchronize function is called
178             to wait for the end of grace period and then
179             a message is sent to the reclamation thread.
180         */
181         virtual void retire_ptr( retired_ptr& p )
182         {
183             if ( p.m_p )
184                 push_buffer( epoch_retired_ptr( p, m_nCurEpoch.load( atomics::memory_order_acquire )));
185         }
186
187         /// Retires the pointer chain [\p itFirst, \p itLast)
188         template <typename ForwardIterator>
189         void batch_retire( ForwardIterator itFirst, ForwardIterator itLast )
190         {
191             uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_acquire );
192             while ( itFirst != itLast ) {
193                 epoch_retired_ptr ep( *itFirst, nEpoch );
194                 ++itFirst;
195                 push_buffer( std::move(ep));
196             }
197         }
198
199         /// Retires the pointer chain until \p Func returns \p nullptr retired pointer
200         template <typename Func>
201         void batch_retire( Func e )
202         {
203             uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_acquire );
204             for ( retired_ptr p{ e() }; p.m_p; ) {
205                 epoch_retired_ptr ep( p, nEpoch );
206                 p = e();
207                 push_buffer( std::move(ep));
208             }
209         }
210
211         /// Waits to finish a grace period and calls disposing thread
212         void synchronize()
213         {
214             synchronize( false );
215         }
216
217         //@cond
218         void synchronize( bool bSync )
219         {
220             uint64_t nPrevEpoch = m_nCurEpoch.fetch_add( 1, atomics::memory_order_release );
221             {
222                 std::unique_lock<lock_type> sl( m_Lock );
223                 flip_and_wait();
224                 flip_and_wait();
225             }
226             m_DisposerThread.dispose( m_Buffer, nPrevEpoch, bSync );
227         }
228         void force_dispose()
229         {
230             synchronize( true );
231         }
232         //@endcond
233
234         /// Returns the threshold of internal buffer
235         size_t capacity() const
236         {
237             return m_nCapacity;
238         }
239     };
240 }} // namespace cds::urcu
241
242 #endif // #ifndef CDSLIB_URCU_DETAILS_GPT_H