2 This file is a part of libcds - Concurrent Data Structures library
4 (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
6 Source code repo: http://github.com/khizmax/libcds/
7 Download: http://sourceforge.net/projects/libcds/files/
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
12 * Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #ifndef CDSLIB_URCU_DISPOSE_THREAD_H
32 #define CDSLIB_URCU_DISPOSE_THREAD_H
37 #include <condition_variable>
38 #include <cds/details/aligned_type.h>
39 #include <cds/algo/atomic.h>
41 namespace cds { namespace urcu {
43 /// Reclamation thread for \p general_threaded URCU
45 The object of this class contains a reclamation thread object and
46 necessary synchronization object(s). The object manages reclamation thread
47 and defines a set of messages (i.e. methods) to communicate with the thread.
49 Template argument \p Buffer defines the buffer type of \ref general_threaded URCU.
51 template <class Buffer>
55 typedef Buffer buffer_type ; ///< Buffer type
58 typedef std::thread thread_type;
59 typedef std::mutex mutex_type;
60 typedef std::condition_variable condvar_type;
61 typedef std::unique_lock< mutex_type > unique_lock;
63 class dispose_thread_starter: public thread_type
65 static void thread_func( dispose_thread * pThis )
71 dispose_thread_starter( dispose_thread * pThis )
72 : thread_type( thread_func, pThis )
76 typedef char thread_placeholder[ sizeof(dispose_thread_starter) ];
77 typename cds::details::aligned_type< thread_placeholder, alignof( dispose_thread_starter ) >::type m_threadPlaceholder;
78 dispose_thread_starter * m_DisposeThread;
80 // synchronization with disposing thread
82 condvar_type m_cvDataReady;
84 // Task for thread (dispose cycle)
85 atomics::atomic<buffer_type *> m_pBuffer{ nullptr };
86 uint64_t m_nCurEpoch = 0;
91 // disposing pass sync
92 condvar_type m_cvReady;
93 bool m_bReady = false;
96 private: // methods called from disposing thread
100 buffer_type * pBuffer;
106 // signal that we are ready to dispose
108 unique_lock lock( m_Mutex );
111 m_cvReady.notify_one();
114 // wait new data portion
115 unique_lock lock( m_Mutex );
117 while ( (pBuffer = m_pBuffer.load( atomics::memory_order_relaxed )) == nullptr )
118 m_cvDataReady.wait( lock );
121 m_bReady = false; // we are busy
124 nCurEpoch = m_nCurEpoch;
125 m_pBuffer.store( nullptr, atomics::memory_order_relaxed );
129 dispose_buffer( pBuffer, nCurEpoch );
133 void dispose_buffer( buffer_type * pBuf, uint64_t nCurEpoch )
135 epoch_retired_ptr * p;
136 while ( ( p = pBuf->front()) != nullptr ) {
137 if ( p->m_nEpoch <= nCurEpoch ) {
139 CDS_VERIFY( pBuf->pop_front());
147 public: // methods called from any thread
148 /// Start reclamation thread
150 This function is called by \ref general_threaded object to start
151 internal reclamation thread.
155 m_DisposeThread = new (m_threadPlaceholder) dispose_thread_starter( this );
158 /// Stop reclamation thread
160 This function is called by \ref general_threaded object to
161 start reclamation cycle and then to terminate reclamation thread.
163 \p buf buffer contains retired objects ready to free.
165 void stop( buffer_type& buf, uint64_t nCurEpoch )
168 unique_lock lock( m_Mutex );
170 // wait while retiring pass done
172 m_cvReady.wait( lock );
174 // give a new work and set stop flag
175 m_nCurEpoch = nCurEpoch;
176 m_pBuffer.store( &buf, atomics::memory_order_relaxed );
179 m_cvDataReady.notify_one();
181 m_DisposeThread->join();
184 /// Start reclamation cycle
186 This function is called by \ref general_threaded object
187 to notify the reclamation thread about a new work.
188 \p buf buffer contains retired objects ready to free.
189 The reclamation thread should free all \p buf objects
190 \p m_nEpoch field of which is no more than \p nCurEpoch.
192 If \p bSync parameter is \p true the calling thread should
193 wait until disposing done.
195 void dispose( buffer_type& buf, uint64_t nCurEpoch, bool bSync )
198 unique_lock lock( m_Mutex );
200 // wait while disposing pass done
202 m_cvReady.wait( lock );
206 m_nCurEpoch = nCurEpoch;
207 m_pBuffer.store( &buf, atomics::memory_order_relaxed );
209 m_cvDataReady.notify_one();
212 unique_lock lock( m_Mutex );
214 m_cvReady.wait( lock );
218 }} // namespace cds::urcu
220 #endif // #ifdef CDSLIB_URCU_DISPOSE_THREAD_H