2 This file is a part of libcds - Concurrent Data Structures library
4 (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
6 Source code repo: http://github.com/khizmax/libcds/
7 Download: http://sourceforge.net/projects/libcds/files/
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
12 * Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #ifndef CDSLIB_URCU_DISPOSE_THREAD_H
32 #define CDSLIB_URCU_DISPOSE_THREAD_H
37 #include <condition_variable>
38 #include <cds/details/aligned_type.h>
39 #include <cds/algo/atomic.h>
41 namespace cds { namespace urcu {
43 /// Reclamation thread for \p general_threaded and \p signal_threaded URCU
45 The object of this class contains a reclamation thread object and
46 necessary synchronization object(s). The object manages reclamation thread
47 and defines a set of messages (i.e. methods) to communicate with the thread.
49 Template argument \p Buffer defines the buffer type of \ref general_threaded (or \ref signal_threaded) URCU.
51 template <class Buffer>
55 typedef Buffer buffer_type ; ///< Buffer type
58 typedef std::thread thread_type;
59 typedef std::mutex mutex_type;
60 typedef std::condition_variable condvar_type;
61 typedef std::unique_lock< mutex_type > unique_lock;
63 class dispose_thread_starter: public thread_type
65 static void thread_func( dispose_thread * pThis )
71 dispose_thread_starter( dispose_thread * pThis )
72 : thread_type( thread_func, pThis )
76 typedef char thread_placeholder[ sizeof(dispose_thread_starter) ];
77 typename cds::details::aligned_type< thread_placeholder, alignof( dispose_thread_starter ) >::type m_threadPlaceholder;
78 dispose_thread_starter * m_DisposeThread;
80 // synchronization with disposing thread
82 condvar_type m_cvDataReady;
84 // Task for thread (dispose cycle)
85 atomics::atomic<buffer_type *> m_pBuffer;
86 uint64_t volatile m_nCurEpoch;
89 atomics::atomic<bool> m_bQuit;
91 // disposing pass sync
92 condvar_type m_cvReady;
93 atomics::atomic<bool> m_bReady;
96 private: // methods called from disposing thread
100 buffer_type * pBuffer;
106 // signal that we are ready to dispose
108 unique_lock lock( m_Mutex );
109 m_bReady.store( true, atomics::memory_order_relaxed );
111 m_cvReady.notify_one();
114 // wait new data portion
115 unique_lock lock( m_Mutex );
117 while ( (pBuffer = m_pBuffer.load( atomics::memory_order_relaxed )) == nullptr )
118 m_cvDataReady.wait( lock );
121 m_bReady.store( false, atomics::memory_order_relaxed ); // we are busy
123 bQuit = m_bQuit.load( atomics::memory_order_relaxed );
124 nCurEpoch = m_nCurEpoch;
125 m_pBuffer.store( nullptr, atomics::memory_order_relaxed );
129 dispose_buffer( pBuffer, nCurEpoch );
133 void dispose_buffer( buffer_type * pBuf, uint64_t nCurEpoch )
135 epoch_retired_ptr * p;
136 while ( ( p = pBuf->front()) != nullptr ) {
137 if ( p->m_nEpoch <= nCurEpoch ) {
139 CDS_VERIFY( pBuf->pop_front() );
150 : m_pBuffer( nullptr )
157 public: // methods called from any thread
158 /// Start reclamation thread
160 This function is called by \ref general_threaded object to start
161 internal reclamation thread.
165 m_DisposeThread = new (m_threadPlaceholder) dispose_thread_starter( this );
168 /// Stop reclamation thread
170 This function is called by \ref general_threaded object to
171 start reclamation cycle and then to terminate reclamation thread.
173 \p buf buffer contains retired objects ready to free.
175 void stop( buffer_type& buf, uint64_t nCurEpoch )
178 unique_lock lock( m_Mutex );
180 // wait while retiring pass done
181 while ( !m_bReady.load( atomics::memory_order_relaxed ))
182 m_cvReady.wait( lock );
184 // give a new work and set stop flag
185 m_nCurEpoch = nCurEpoch;
186 m_pBuffer.store( &buf, atomics::memory_order_relaxed );
187 m_bQuit.store( true, atomics::memory_order_relaxed );
189 m_cvDataReady.notify_one();
191 m_DisposeThread->join();
194 /// Start reclamation cycle
196 This function is called by \ref general_threaded object
197 to notify the reclamation thread about a new work.
198 \p buf buffer contains retired objects ready to free.
199 The reclamation thread should free all \p buf objects
200 \p m_nEpoch field of which is no more than \p nCurEpoch.
202 If \p bSync parameter is \p true the calling thread should
203 wait until disposing done.
205 void dispose( buffer_type& buf, uint64_t nCurEpoch, bool bSync )
208 unique_lock lock( m_Mutex );
210 // wait while disposing pass done
211 while ( !m_bReady.load( atomics::memory_order_relaxed ))
212 m_cvReady.wait( lock );
215 m_bReady.store( false, atomics::memory_order_relaxed );
216 m_nCurEpoch = nCurEpoch;
217 m_pBuffer.store( &buf, atomics::memory_order_relaxed );
219 m_cvDataReady.notify_one();
222 unique_lock lock( m_Mutex );
223 while ( !m_bReady.load( atomics::memory_order_relaxed ))
224 m_cvReady.wait( lock );
228 }} // namespace cds::urcu
230 #endif // #ifdef CDSLIB_URCU_DISPOSE_THREAD_H