Added possibility to remove node's value via RCU disposing cycle
[libcds.git] / cds / urcu / dispose_thread.h
1 //$$CDS-header$$1
2
3 #ifndef CDSLIB_URCU_DISPOSE_THREAD_H
4 #define CDSLIB_URCU_DISPOSE_THREAD_H
5
6 #include <memory>
7 #include <thread>
8 #include <mutex>
9 #include <condition_variable>
10 #include <cds/details/aligned_type.h>
11
12 namespace cds { namespace urcu {
13
14     /// Reclamation thread for \p general_threaded and \p signal_threaded URCU
15     /**
16         The object of this class contains a reclamation thread object and
17         necessary synchronization object(s). The object manages reclamation thread
18         and defines a set of messages (i.e. methods) to communicate with the thread.
19
20         Template argument \p Buffer defines the buffer type of \ref general_threaded (or \ref signal_threaded) URCU.
21     */
22     template <class Buffer>
23     class dispose_thread
24     {
25     public:
26         typedef Buffer  buffer_type ;   ///< Buffer type
27     private:
28         //@cond
29         typedef std::thread                 thread_type;
30         typedef std::mutex              mutex_type;
31         typedef std::condition_variable condvar_type;
32         typedef std::unique_lock< mutex_type >  unique_lock;
33
34         class dispose_thread_starter: public thread_type
35         {
36             static void thread_func( dispose_thread * pThis )
37             {
38                 pThis->execute();
39             }
40
41         public:
42             dispose_thread_starter( dispose_thread * pThis )
43                 : thread_type( thread_func, pThis )
44             {}
45         };
46
47         typedef char thread_placeholder[ sizeof(dispose_thread_starter) ];
48         typename cds::details::aligned_type< thread_placeholder, alignof( dispose_thread_starter ) >::type  m_threadPlaceholder;
49         dispose_thread_starter *                m_DisposeThread;
50
51         // synchronization with disposing thread
52         mutex_type      m_Mutex;
53         condvar_type    m_cvDataReady;
54
55         // Task for thread (dispose cycle)
56         buffer_type * volatile  m_pBuffer;
57         uint64_t volatile       m_nCurEpoch;
58
59         // Quit flag
60         bool volatile           m_bQuit;
61
62         // disposing pass sync
63         condvar_type            m_cvReady;
64         bool volatile           m_bReady;
65         //@endcond
66
67     private: // methods called from disposing thread
68         //@cond
69         void execute()
70         {
71             buffer_type *   pBuffer;
72             uint64_t        nCurEpoch;
73             bool            bQuit = false;
74
75             while ( !bQuit ) {
76                 {
77                     unique_lock lock( m_Mutex );
78
79                     // signal that we are ready to dispose
80                     m_bReady = true;
81                     m_cvReady.notify_one();
82
83                     // wait new data portion
84                     while ( !m_pBuffer )
85                         m_cvDataReady.wait( lock );
86
87                     // New work is ready
88                     m_bReady = false ;   // we are busy
89
90                     bQuit = m_bQuit;
91                     nCurEpoch = m_nCurEpoch;
92                     pBuffer = m_pBuffer;
93                     m_pBuffer = nullptr;
94                 }
95
96                 if ( pBuffer )
97                     dispose_buffer( pBuffer, nCurEpoch );
98             }
99         }
100
101         void dispose_buffer( buffer_type * pBuf, uint64_t nCurEpoch )
102         {
103             epoch_retired_ptr p;
104             while ( pBuf->pop( p ) ) {
105                 if ( p.m_nEpoch <= nCurEpoch )
106                     p.free();
107                 else {
108                     pBuf->push( p );
109                     break;
110                 }
111             }
112         }
113         //@endcond
114
115     public:
116         //@cond
117         dispose_thread()
118             : m_pBuffer( nullptr )
119             , m_nCurEpoch(0)
120             , m_bQuit( false )
121             , m_bReady( false )
122         {}
123         //@endcond
124
125     public: // methods called from any thread
126         /// Start reclamation thread
127         /**
128             This function is called by \ref general_threaded object to start
129             internal reclamation thread.
130         */
131         void start()
132         {
133             m_DisposeThread = new (m_threadPlaceholder) dispose_thread_starter( this );
134         }
135
136         /// Stop reclamation thread
137         /**
138             This function is called by \ref general_threaded object to
139             start reclamation cycle and then to terminate reclamation thread.
140
141             \p buf buffer contains retired objects ready to free.
142         */
143         void stop( buffer_type& buf, uint64_t nCurEpoch )
144         {
145             {
146                 unique_lock lock( m_Mutex );
147
148                 // wait while retiring pass done
149                 while ( !m_bReady )
150                     m_cvReady.wait( lock );
151
152                 // give a new work and set stop flag
153                 m_pBuffer = &buf;
154                 m_nCurEpoch = nCurEpoch;
155                 m_bQuit = true;
156             }
157             m_cvDataReady.notify_one();
158
159             m_DisposeThread->join();
160         }
161
162         /// Start reclamation cycle
163         /**
164             This function is called by \ref general_threaded object
165             to notify the reclamation thread about new work.
166             \p buf buffer contains retired objects ready to free.
167             The reclamation thread should free all \p buf objects
168             \p m_nEpoch field of which is no more than \p nCurEpoch.
169
170             If \p bSync parameter is \p true the calling thread should
171             wait until disposing done.
172         */
173         void dispose( buffer_type& buf, uint64_t nCurEpoch, bool bSync )
174         {
175             unique_lock lock( m_Mutex );
176
177             // wait while disposing pass done
178             while ( !m_bReady )
179                 m_cvReady.wait( lock );
180
181             if ( bSync )
182                 m_bReady = false;
183
184             // new work
185             m_nCurEpoch = nCurEpoch;
186             m_pBuffer = &buf;
187
188             m_cvDataReady.notify_one();
189
190             if ( bSync ) {
191                 while ( !m_bReady )
192                     m_cvReady.wait( lock );
193             }
194         }
195     };
196 }} // namespace cds::urcu
197
198 #endif // #ifdef CDSLIB_URCU_DISPOSE_THREAD_H