Fixed memory leaks in threaded URCU (general_threaded, signal_threaded)
[libcds.git] / cds / urcu / details / gpt.h
1 //$$CDS-header$$1
2
3 #ifndef CDSLIB_URCU_DETAILS_GPT_H
4 #define CDSLIB_URCU_DETAILS_GPT_H
5
6 #include <mutex>    //unique_lock
7 #include <limits>
8 #include <cds/urcu/details/gp.h>
9 #include <cds/urcu/dispose_thread.h>
10 #include <cds/algo/backoff_strategy.h>
11 #include <cds/container/vyukov_mpmc_cycle_queue.h>
12
13 namespace cds { namespace urcu {
14
15     /// User-space general-purpose RCU with deferred threaded reclamation
16     /**
17         @headerfile cds/urcu/general_threaded.h
18
19         This implementation is similar to \ref general_buffered but separate thread is created
20         for deleting the retired objects. Like \p %general_buffered, the class contains an internal buffer
21         where retired objects are accumulated. When the buffer becomes full,
22         the RCU \p synchronize function is called that waits until all reader/updater threads end up their read-side critical sections,
23         i.e. until the RCU quiescent state will come. After that the "work ready" message is sent to reclamation tread.
24         The reclamation thread frees the buffer.
25         This synchronization cycle may be called in any thread that calls \ref retire_ptr function.
26
27         There is a wrapper \ref cds_urcu_general_threaded_gc "gc<general_threaded>" for \p %general_threaded class
28         that provides unified RCU interface. You should use this wrapper class instead \p %general_threaded
29
30         Template arguments:
31         - \p Buffer - buffer type with FIFO semantics. Default is cds::container::VyukovMPMCCycleQueue. See \ref general_buffered
32             for description of buffer's interface. The buffer contains the objects of \ref epoch_retired_ptr
33             type that contains additional \p m_nEpoch field. This field specifies an epoch when the object
34             has been placed into the buffer. The \p %general_threaded object has a global epoch counter
35             that is incremented on each \p synchronize call. The epoch is used internally to prevent early deletion.
36         - \p Lock - mutex type, default is \p std::mutex
37         - \p DisposerThread - the reclamation thread class. Default is \ref cds::urcu::dispose_thread,
38             see the description of this class for required interface.
39         - \p Backoff - back-off schema, default is cds::backoff::Default
40     */
41     template <
42         class Buffer = cds::container::VyukovMPMCCycleQueue< epoch_retired_ptr >
43         ,class Lock = std::mutex
44         ,class DisposerThread = dispose_thread<Buffer>
45         ,class Backoff = cds::backoff::Default
46     >
47     class general_threaded: public details::gp_singleton< general_threaded_tag >
48     {
49         //@cond
50         typedef details::gp_singleton< general_threaded_tag > base_class;
51         //@endcond
52     public:
53         typedef Buffer          buffer_type ;   ///< Buffer type
54         typedef Lock            lock_type   ;   ///< Lock type
55         typedef Backoff         back_off    ;   ///< Back-off scheme
56         typedef DisposerThread  disposer_thread ;   ///< Disposer thread type
57
58         typedef general_threaded_tag    rcu_tag ;       ///< Thread-side RCU part
59         typedef base_class::thread_gc   thread_gc ;     ///< Access lock class
60         typedef typename thread_gc::scoped_lock scoped_lock ; ///< Access lock class
61
62         static bool const c_bBuffered = true ; ///< This RCU buffers disposed elements
63
64     protected:
65         //@cond
66         typedef details::gp_singleton_instance< rcu_tag >    singleton_ptr;
67
68         struct scoped_disposer {
69             void operator ()( general_threaded * p )
70             {
71                 delete p;
72             }
73         };
74         //@endcond
75
76     protected:
77         //@cond
78         buffer_type               m_Buffer;
79         atomics::atomic<uint64_t> m_nCurEpoch;
80         lock_type                 m_Lock;
81         size_t const              m_nCapacity;
82         disposer_thread           m_DisposerThread;
83         //@endcond
84
85     public:
86         /// Returns singleton instance
87         static general_threaded * instance()
88         {
89             return static_cast<general_threaded *>( base_class::instance() );
90         }
91         /// Checks if the singleton is created and ready to use
92         static bool isUsed()
93         {
94             return singleton_ptr::s_pRCU != nullptr;
95         }
96
97     protected:
98         //@cond
99         general_threaded( size_t nBufferCapacity )
100             : m_Buffer( nBufferCapacity )
101             , m_nCurEpoch( 1 )
102             , m_nCapacity( nBufferCapacity )
103         {}
104
105         void flip_and_wait()
106         {
107             back_off bkoff;
108             base_class::flip_and_wait( bkoff );
109         }
110
111         // Return: true - synchronize has been called, false - otherwise
112         bool push_buffer( epoch_retired_ptr&& p )
113         {
114             bool bPushed = m_Buffer.push( p );
115             if ( !bPushed || m_Buffer.size() >= capacity() ) {
116                 synchronize();
117                 if ( !bPushed ) {
118                     p.free();
119                 }
120                 return true;
121             }
122             return false;
123         }
124
125         //@endcond
126
127     public:
128         //@cond
129         ~general_threaded()
130         {}
131         //@endcond
132
133         /// Creates singleton object and starts reclamation thread
134         /**
135             The \p nBufferCapacity parameter defines RCU threshold.
136         */
137         static void Construct( size_t nBufferCapacity = 256 )
138         {
139             if ( !singleton_ptr::s_pRCU ) {
140                 std::unique_ptr< general_threaded, scoped_disposer > pRCU( new general_threaded( nBufferCapacity ) );
141                 pRCU->m_DisposerThread.start();
142
143                 singleton_ptr::s_pRCU = pRCU.release();
144             }
145         }
146
147         /// Destroys singleton object and terminates internal reclamation thread
148         static void Destruct( bool bDetachAll = false )
149         {
150             if ( isUsed() ) {
151                 general_threaded * pThis = instance();
152                 if ( bDetachAll )
153                     pThis->m_ThreadList.detach_all();
154
155                 pThis->m_DisposerThread.stop( pThis->m_Buffer, std::numeric_limits< uint64_t >::max());
156
157                 delete pThis;
158                 singleton_ptr::s_pRCU = nullptr;
159             }
160         }
161
162     public:
163         /// Retires \p p pointer
164         /**
165             The method pushes \p p pointer to internal buffer.
166             When the buffer becomes full \ref synchronize function is called
167             to wait for the end of grace period and then
168             a message is sent to the reclamation thread.
169         */
170         virtual void retire_ptr( retired_ptr& p )
171         {
172             if ( p.m_p )
173                 push_buffer( epoch_retired_ptr( p, m_nCurEpoch.load( atomics::memory_order_acquire )));
174         }
175
176         /// Retires the pointer chain [\p itFirst, \p itLast)
177         template <typename ForwardIterator>
178         void batch_retire( ForwardIterator itFirst, ForwardIterator itLast )
179         {
180             uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_acquire );
181             while ( itFirst != itLast ) {
182                 epoch_retired_ptr ep( *itFirst, nEpoch );
183                 ++itFirst;
184                 push_buffer( std::move(ep));
185             }
186         }
187
188         /// Retires the pointer chain until \p Func returns \p nullptr retired pointer
189         template <typename Func>
190         void batch_retire( Func e )
191         {
192             uint64_t nEpoch = m_nCurEpoch.load( atomics::memory_order_acquire );
193             for ( retired_ptr p{ e() }; p.m_p; ) {
194                 epoch_retired_ptr ep( p, nEpoch );
195                 p = e();
196                 push_buffer( std::move(ep));
197             }
198         }
199
200
201         /// Waits to finish a grace period and calls disposing thread
202         void synchronize()
203         {
204             synchronize( false );
205         }
206
207         //@cond
208         void synchronize( bool bSync )
209         {
210             uint64_t nPrevEpoch = m_nCurEpoch.fetch_add( 1, atomics::memory_order_release );
211
212             {
213                 std::unique_lock<lock_type> sl( m_Lock );
214                 flip_and_wait();
215                 flip_and_wait();
216             }
217
218             m_DisposerThread.dispose( m_Buffer, nPrevEpoch, bSync );
219         }
220         void force_dispose()
221         {
222             synchronize( true );
223         }
224         //@endcond
225
226         /// Returns the threshold of internal buffer
227         size_t capacity() const
228         {
229             return m_nCapacity;
230         }
231     };
232 }} // namespace cds::urcu
233
234 #endif // #ifndef CDSLIB_URCU_DETAILS_GPT_H