Move libcds 1.6.0 from SVN
[libcds.git] / cds / urcu / dispose_thread.h
1 //$$CDS-header$$1
2
3 #ifndef _CDS_URCU_DISPOSE_THREAD_H
4 #define _CDS_URCU_DISPOSE_THREAD_H
5
6 //#include <cds/backoff_strategy.h>
7 #include <cds/details/std/thread.h>
8 #include <cds/details/std/mutex.h>
9 #include <cds/details/std/condition_variable.h>
10 #include <cds/details/std/memory.h>     // unique_ptr
11 #include <cds/details/aligned_type.h>
12
13 namespace cds { namespace urcu {
14
15     /// Reclamation thread for \p general_threaded and \p signal_threaded URCU
16     /**
17         The object of this class contains a reclamation thread object and
18         necessary synchronization object(s). The object manages reclamation thread
19         and defines a set of messages (i.e. methods) to communicate with the thread.
20
21         Template argument \p Buffer defines the buffer type of \ref general_threaded (or \ref signal_threaded) URCU.
22     */
23     template <class Buffer>
24     class dispose_thread
25     {
26     public:
27         typedef Buffer  buffer_type ;   ///< Buffer type
28     private:
29         //@cond
30         typedef cds_std::thread                     thread_type;
31         typedef cds_std::mutex                      mutex_type;
32         typedef cds_std::condition_variable         condvar_type;
33         typedef cds_std::unique_lock< mutex_type >  unique_lock;
34
35         class dispose_thread_starter: public thread_type
36         {
37             static void thread_func( dispose_thread * pThis )
38             {
39                 pThis->execute();
40             }
41
42         public:
43             dispose_thread_starter( dispose_thread * pThis )
44                 : thread_type( thread_func, pThis )
45             {}
46         };
47
48         typedef char thread_placeholder[ sizeof(dispose_thread_starter) ];
49         typename cds::details::aligned_type< thread_placeholder, alignof( dispose_thread_starter ) >::type  m_threadPlaceholder;
50         dispose_thread_starter *                m_DisposeThread;
51
52         // synchronization with disposing thread
53         mutex_type      m_Mutex;
54         condvar_type    m_cvDataReady;
55
56         // Task for thread (dispose cycle)
57         buffer_type * volatile  m_pBuffer;
58         uint64_t volatile       m_nCurEpoch;
59
60         // Quit flag
61         bool volatile           m_bQuit;
62
63         // disposing pass sync
64         condvar_type            m_cvReady;
65         bool volatile           m_bReady;
66         //@endcond
67
68     private: // methods called from disposing thread
69         //@cond
70         void execute()
71         {
72             buffer_type *   pBuffer;
73             uint64_t        nCurEpoch;
74             bool            bQuit = false;
75
76             while ( !bQuit ) {
77                 {
78                     unique_lock lock( m_Mutex );
79
80                     // signal that we are ready to dispose
81                     m_bReady = true;
82                     m_cvReady.notify_one();
83
84                     // wait new data portion
85                     while ( !m_pBuffer )
86                         m_cvDataReady.wait( lock );
87
88                     // New work is ready
89                     m_bReady = false ;   // we are busy
90
91                     bQuit = m_bQuit;
92                     nCurEpoch = m_nCurEpoch;
93                     pBuffer = m_pBuffer;
94                     m_pBuffer = null_ptr<buffer_type *>();
95                 }
96
97                 if ( pBuffer )
98                     dispose_buffer( pBuffer, nCurEpoch );
99             }
100         }
101
102         void dispose_buffer( buffer_type * pBuf, uint64_t nCurEpoch )
103         {
104             epoch_retired_ptr p;
105             while ( pBuf->pop( p ) ) {
106                 if ( p.m_nEpoch <= nCurEpoch )
107                     p.free();
108                 else {
109                     pBuf->push( p );
110                     break;
111                 }
112             }
113         }
114         //@endcond
115
116     public:
117         //@cond
118         dispose_thread()
119             : m_pBuffer( null_ptr<buffer_type *>() )
120             , m_nCurEpoch(0)
121             , m_bQuit( false )
122             , m_bReady( false )
123         {}
124         //@endcond
125
126     public: // methods called from any thread
127         /// Start reclamation thread
128         /**
129             This function is called by \ref general_threaded object to start
130             internal reclamation thread.
131         */
132         void start()
133         {
134             m_DisposeThread = new (m_threadPlaceholder) dispose_thread_starter( this );
135         }
136
137         /// Stop reclamation thread
138         /**
139             This function is called by \ref general_threaded object to
140             start reclamation cycle and then to terminate reclamation thread.
141
142             \p buf buffer contains retired objects ready to free.
143         */
144         void stop( buffer_type& buf, uint64_t nCurEpoch )
145         {
146             {
147                 unique_lock lock( m_Mutex );
148
149                 // wait while retiring pass done
150                 while ( !m_bReady )
151                     m_cvReady.wait( lock );
152
153                 // give a new work and set stop flag
154                 m_pBuffer = &buf;
155                 m_nCurEpoch = nCurEpoch;
156                 m_bQuit = true;
157             }
158             m_cvDataReady.notify_one();
159
160             m_DisposeThread->join();
161         }
162
163         /// Start reclamation cycle
164         /**
165             This function is called by \ref general_threaded object
166             to notify the reclamation thread about new work.
167             \p buf buffer contains retired objects ready to free.
168             The reclamation thread should free all \p buf objects
169             \p m_nEpoch field of which is no more than \p nCurEpoch.
170
171             If \p bSync parameter is \p true the calling thread should
172             wait until disposing done.
173         */
174         void dispose( buffer_type& buf, uint64_t nCurEpoch, bool bSync )
175         {
176             unique_lock lock( m_Mutex );
177
178             // wait while disposing pass done
179             while ( !m_bReady )
180                 m_cvReady.wait( lock );
181
182             if ( bSync )
183                 m_bReady = false;
184
185             // new work
186             m_nCurEpoch = nCurEpoch;
187             m_pBuffer = &buf;
188
189             m_cvDataReady.notify_one();
190
191             if ( bSync ) {
192                 while ( !m_bReady )
193                     m_cvReady.wait( lock );
194             }
195         }
196     };
197 }} // namespace cds::urcu
198
199 #endif // #ifdef _CDS_URCU_DISPOSE_THREAD_H