/*
    This file is a part of libcds - Concurrent Data Structures library

    (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017

    Source code repo: http://github.com/khizmax/libcds/
    Download: http://sourceforge.net/projects/libcds/files/

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions are met:

    * Redistributions of source code must retain the above copyright notice, this
      list of conditions and the following disclaimer.

    * Redistributions in binary form must reproduce the above copyright notice,
      this list of conditions and the following disclaimer in the documentation
      and/or other materials provided with the distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
    AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
    IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
    FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
    DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
    SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
    OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <cds/gc/dhp_smr.h>
#include <cds/os/thread.h>

#include <algorithm>    // std::sort, std::binary_search
#include <vector>       // std::vector for the hazard pointer list
namespace cds { namespace gc { namespace dhp {

    namespace {
        void* default_alloc_memory( size_t size )
        {
            return new uintptr_t[( size + sizeof( uintptr_t ) - 1 ) / sizeof( uintptr_t )];
        }

        void default_free_memory( void* p )
        {
            delete[] reinterpret_cast<uintptr_t*>( p );
        }

        struct defaults {
            static size_t const c_extended_guard_block_size = 16;
        };

        void* ( *s_alloc_memory )( size_t size ) = default_alloc_memory;
        void ( *s_free_memory )( void* p ) = default_free_memory;
        // Adapts s_alloc_memory/s_free_memory to the std::allocator interface;
        // used by the internal hazard-pointer vector in scan() below.
        template <typename T>
        class allocator
        {
        public:
            typedef T value_type;

            allocator() {}
            allocator( allocator const& ) {}
            template <class U>
            explicit allocator( allocator<U> const& ) {}

            static T* allocate( size_t nCount )
            {
                return reinterpret_cast<T*>( s_alloc_memory( sizeof( value_type ) * nCount ));
            }

            static void deallocate( T* p, size_t /*nCount*/ )
            {
                s_free_memory( reinterpret_cast<void*>( p ));
            }
        };
    } // namespace
    /*static*/ CDS_EXPORT_API smr* smr::instance_ = nullptr;
    thread_local thread_data* tls_ = nullptr;
    CDS_EXPORT_API hp_allocator::~hp_allocator()
    {
        while ( guard_block* gp = static_cast<guard_block*>( free_list_.get())) {
            gp->~guard_block();
            s_free_memory( gp );
        }
    }
    CDS_EXPORT_API guard_block* hp_allocator::alloc()
    {
        guard_block* gb;
        auto block = free_list_.get();
        if ( block )
            gb = static_cast<guard_block*>( block );
        else {
            // allocate new block of guards
            gb = new( s_alloc_memory( sizeof( guard_block ) + sizeof( guard ) * defaults::c_extended_guard_block_size )) guard_block;
            new ( gb->first() ) guard[defaults::c_extended_guard_block_size];
        }

        // link the guards in the block into a free list
        guard* p = gb->first();
        for ( guard* last = p + defaults::c_extended_guard_block_size - 1; p != last; ++p ) {
            p->clear( atomics::memory_order_relaxed );
            p->next_ = p + 1;
        }
        p->next_ = nullptr;
        p->clear();

        return gb;
    }
    CDS_EXPORT_API retired_allocator::~retired_allocator()
    {
        while ( retired_block* rb = static_cast<retired_block*>( free_list_.get())) {
            rb->~retired_block();
            s_free_memory( rb );
        }
    }
    CDS_EXPORT_API retired_block* retired_allocator::alloc()
    {
        retired_block* rb;
        auto block = free_list_.get();
        if ( block )
            rb = static_cast<retired_block*>( block );
        else {
            // allocate new block
            rb = new( s_alloc_memory( sizeof( retired_block ) + sizeof( retired_ptr ) * retired_block::c_capacity )) retired_block;
            new ( rb->first()) retired_ptr[retired_block::c_capacity];
        }

        rb->next_ = nullptr;
        return rb;
    }
    struct smr::thread_record: thread_data
    {
        atomics::atomic<thread_record*>    m_pNextNode; ///< next hazard ptr record in list
        atomics::atomic<cds::OS::ThreadId> m_idOwner;   ///< owner thread id; 0 - the record is free (not owned)
        atomics::atomic<bool>              m_bFree;     ///< true if the record is free (not owned)

        thread_record( guard* guards, size_t guard_count )
            : thread_data( guards, guard_count )
            , m_bFree( false )
        {}
    };
    /*static*/ CDS_EXPORT_API thread_data* smr::tls()
    {
        assert( tls_ != nullptr );
        return tls_;
    }
    /*static*/ CDS_EXPORT_API void smr::set_memory_allocator(
        void* ( *alloc_func )( size_t size ),
        void ( *free_func )( void* p )
    )
    {
        // The memory allocation functions may be set only BEFORE the DHP SMR is initialized
        assert( instance_ == nullptr );
        if ( instance_ == nullptr ) {
            s_alloc_memory = alloc_func;
            s_free_memory = free_func;
        }
    }
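
    // Usage sketch (illustrative, not part of this translation unit): a custom
    // allocator must be installed before the DHP singleton is constructed.
    // my_alloc/my_free below are hypothetical user-provided functions.
    //
    //      void* my_alloc( size_t size ) { return std::malloc( size ); }   // needs <cstdlib>
    //      void  my_free( void* p )      { std::free( p ); }
    //
    //      cds::gc::dhp::smr::set_memory_allocator( my_alloc, my_free );
    //      // ... only then initialize libcds and construct the DHP GC ...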
    /*static*/ CDS_EXPORT_API void smr::construct( size_t nInitialHazardPtrCount )
    {
        assert( instance_ == nullptr );
        instance_ = new( s_alloc_memory( sizeof( smr ))) smr( nInitialHazardPtrCount );
    }
    /*static*/ CDS_EXPORT_API void smr::destruct( bool bDetachAll )
    {
        if ( instance_ != nullptr ) {
            if ( bDetachAll )
                instance_->detach_all_thread();

            instance_->~smr();
            s_free_memory( instance_ );
            instance_ = nullptr;
        }
    }
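
    // Typical lifecycle sketch (illustrative; assumes the usual libcds entry
    // points cds::Initialize()/cds::Terminate() and the cds::gc::DHP wrapper
    // whose constructor/destructor call construct()/destruct()):
    //
    //      cds::Initialize();
    //      {
    //          cds::gc::DHP dhpGC;               // -> smr::construct( ... )
    //          // ... create and use DHP-based containers ...
    //      }                                     // ~DHP() -> smr::destruct( ... )
    //      cds::Terminate();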
    CDS_EXPORT_API smr::smr( size_t nInitialHazardPtrCount )
        : thread_list_( nullptr )
        , initial_hazard_count_( nInitialHazardPtrCount < 4 ? 16 : nInitialHazardPtrCount )
        , last_plist_size_( initial_hazard_count_ * 64 )
    {}
    CDS_EXPORT_API smr::~smr()
    {
        CDS_DEBUG_ONLY( const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId; )
        CDS_DEBUG_ONLY( const cds::OS::ThreadId mainThreadId = cds::OS::get_current_thread_id(); )

        thread_record* pHead = thread_list_.load( atomics::memory_order_relaxed );
        thread_list_.store( nullptr, atomics::memory_order_relaxed );

        thread_record* pNext = nullptr;
        for ( thread_record* hprec = pHead; hprec; hprec = pNext ) {
            assert( hprec->m_idOwner.load( atomics::memory_order_relaxed ) == nullThreadId
                || hprec->m_idOwner.load( atomics::memory_order_relaxed ) == mainThreadId
                || !cds::OS::is_thread_alive( hprec->m_idOwner.load( atomics::memory_order_relaxed ))
            );

            retired_array& retired = hprec->retired_;

            // delete retired data
            for ( retired_block* block = retired.list_head_; block && block != retired.current_block_; block = block->next_ ) {
                for ( retired_ptr* p = block->first(); p != block->last(); ++p )
                    p->free();
            }
            if ( retired.current_block_ ) {
                for ( retired_ptr* p = retired.current_block_->first(); p != retired.current_cell_; ++p )
                    p->free();
            }
            hprec->retired_.fini();
            hprec->hazards_.clear();

            pNext = hprec->m_pNextNode.load( atomics::memory_order_relaxed );
            hprec->m_bFree.store( true, atomics::memory_order_relaxed );
            destroy_thread_data( hprec );
        }
    }
    /*static*/ CDS_EXPORT_API void smr::attach_thread()
    {
        if ( tls_ == nullptr )
            tls_ = instance().alloc_thread_data();
    }
    /*static*/ CDS_EXPORT_API void smr::detach_thread()
    {
        thread_data* rec = tls_;
        if ( rec != nullptr ) {
            tls_ = nullptr;
            instance().free_thread_data( static_cast<thread_record*>( rec ));
        }
    }
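
    // Per-thread usage sketch (illustrative; in libcds the threading manager
    // routes its attachThread()/detachThread() calls to the functions above):
    //
    //      std::thread worker( []{
    //          cds::threading::Manager::attachThread();   // -> smr::attach_thread()
    //          // ... operate on DHP-based containers ...
    //          cds::threading::Manager::detachThread();   // -> smr::detach_thread()
    //      });
    //      worker.join();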
    CDS_EXPORT_API void smr::detach_all_thread()
    {
        thread_record* pNext = nullptr;
        const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId;

        for ( thread_record* hprec = thread_list_.load( atomics::memory_order_relaxed ); hprec; hprec = pNext ) {
            pNext = hprec->m_pNextNode.load( atomics::memory_order_relaxed );
            if ( hprec->m_idOwner.load( atomics::memory_order_relaxed ) != nullThreadId )
                free_thread_data( hprec );
        }
    }
    CDS_EXPORT_API smr::thread_record* smr::create_thread_data()
    {
        size_t const guard_array_size = sizeof( guard ) * initial_hazard_count_;

        /*
            The memory is allocated as one contiguous block:

            +------------------------------+
            | thread_record                |
            |   (hazards_, retired_, ...)  |
            +------------------------------+
            | guard[initial_hazard_count_] |
            |   initial hazard ptr array   |
            +------------------------------+
        */
        char* mem = reinterpret_cast<char*>( s_alloc_memory( sizeof( thread_record ) + guard_array_size ));
        return new( mem ) thread_record(
            reinterpret_cast<guard*>( mem + sizeof( thread_record )), initial_hazard_count_
        );
    }
    /*static*/ CDS_EXPORT_API void smr::destroy_thread_data( thread_record* pRec )
    {
        // all retired pointers must be freed by now
        pRec->~thread_record();
        s_free_memory( pRec );
    }
    CDS_EXPORT_API smr::thread_record* smr::alloc_thread_data()
    {
        thread_record* hprec = nullptr;
        const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId;
        const cds::OS::ThreadId curThreadId = cds::OS::get_current_thread_id();

        // First, try to reuse a free (non-active) DHP record
        for ( hprec = thread_list_.load( atomics::memory_order_acquire ); hprec; hprec = hprec->m_pNextNode.load( atomics::memory_order_relaxed )) {
            cds::OS::ThreadId thId = nullThreadId;
            if ( !hprec->m_idOwner.compare_exchange_strong( thId, curThreadId, atomics::memory_order_relaxed, atomics::memory_order_relaxed ))
                continue;
            hprec->m_bFree.store( false, atomics::memory_order_release );
            break;
        }

        if ( hprec == nullptr ) {
            // No records are available for reuse - allocate a new one
            // and push it to the front of thread_list_ with a CAS loop
            hprec = create_thread_data();
            hprec->m_idOwner.store( curThreadId, atomics::memory_order_relaxed );

            thread_record* pOldHead = thread_list_.load( atomics::memory_order_relaxed );
            do {
                hprec->m_pNextNode.store( pOldHead, atomics::memory_order_relaxed );
            } while ( !thread_list_.compare_exchange_weak( pOldHead, hprec, atomics::memory_order_release, atomics::memory_order_acquire ));
        }

        hprec->hazards_.init();
        hprec->retired_.init();

        return hprec;
    }
    CDS_EXPORT_API void smr::free_thread_data( thread_record* pRec )
    {
        assert( pRec != nullptr );
        //CDS_HAZARDPTR_STATISTIC( ++m_Stat.m_RetireHPRec )

        pRec->hazards_.clear();
        scan( pRec );
        help_scan( pRec );

        if ( pRec->retired_.empty() ) {
            pRec->retired_.fini();
            pRec->m_bFree.store( true, std::memory_order_release );
        }
        else {
            // Free all empty blocks after current_block_
            retired_block* free_block = pRec->retired_.current_block_->next_;
            if ( free_block ) {
                pRec->retired_.current_block_->next_ = nullptr;
                while ( free_block ) {
                    retired_block* next = free_block->next_;
                    retired_allocator_.free( free_block );
                    free_block = next;
                    --pRec->retired_.block_count_;
                }
            }
        }

        pRec->m_idOwner.store( cds::OS::c_NullThreadId, atomics::memory_order_release );
    }
    namespace {
        typedef std::vector<void*, allocator<void*>> hp_vector;

        // Collects non-null hazard pointers from the guard array arr into vect
        inline void copy_hazards( hp_vector& vect, guard const* arr, size_t size )
        {
            for ( guard const* end = arr + size; arr != end; ++arr ) {
                void* hp = arr->get();
                if ( hp != nullptr )
                    vect.push_back( hp );
            }
        }

        // Frees every retired pointer in block that is not found in plist (which
        // must be sorted); hazardous pointers are pushed back into stg.
        // Returns the number of pointers freed.
        inline size_t retire_data( hp_vector const& plist, retired_array& stg, retired_block* block, size_t block_size )
        {
            auto hp_begin = plist.begin();
            auto hp_end = plist.end();
            size_t count = 0;

            for ( retired_ptr* p = block->first(), *end = p + block_size; p != end; ++p ) {
                if ( cds_unlikely( std::binary_search( hp_begin, hp_end, p->m_p )))
                    stg.safe_push( p );
                else {
                    p->free();
                    ++count;
                }
            }

            return count;
        }
    } // namespace
    CDS_EXPORT_API void smr::scan( thread_data* pThreadRec )
    {
        thread_record* pRec = static_cast<thread_record*>( pThreadRec );

        hp_vector plist;
        size_t plist_size = last_plist_size_.load( std::memory_order_relaxed );
        plist.reserve( plist_size );

        // Stage 1: scan the thread list and collect all non-null hazard pointers into plist
        thread_record* pNode = thread_list_.load( atomics::memory_order_acquire );
        while ( pNode ) {
            if ( pNode->m_idOwner.load( std::memory_order_relaxed ) != cds::OS::c_NullThreadId ) {
                copy_hazards( plist, pNode->hazards_.array_, pNode->hazards_.initial_capacity_ );

                for ( guard_block* block = pNode->hazards_.extended_list_; block; block = block->next_ )
                    copy_hazards( plist, block->first(), defaults::c_extended_guard_block_size );
            }

            pNode = pNode->m_pNextNode.load( atomics::memory_order_relaxed );
        }

        // Store plist size for the next scan() call (vector reallocation optimization)
        if ( plist.size() > plist_size )
            last_plist_size_.compare_exchange_weak( plist_size, plist.size(), std::memory_order_relaxed, std::memory_order_relaxed );

        // Sort plist so retire_data() can use binary search
        std::sort( plist.begin(), plist.end() );

        // Stage 2: check each retired pointer against plist
        size_t free_count = 0;
        retired_block* last_block = pRec->retired_.current_block_;
        retired_ptr* last_block_cell = pRec->retired_.current_cell_;

        pRec->retired_.current_block_ = pRec->retired_.list_head_;
        pRec->retired_.current_cell_ = pRec->retired_.current_block_->first();

        for ( retired_block* block = pRec->retired_.list_head_; block; block = block->next_ ) {
            bool const end_block = block == last_block;
            size_t const size = end_block ? last_block_cell - block->first() : retired_block::c_capacity;

            free_count += retire_data( plist, pRec->retired_, block, size );

            if ( end_block )
                break;
        }

        // If no retired pointer was freed and the retired array is full, extend it
        if ( free_count == 0 && last_block == pRec->retired_.list_tail_ && last_block_cell == last_block->last() )
            pRec->retired_.extend();
    }
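
    // Worked example of scan() (illustrative): suppose the sorted hazard list is
    // plist = { 0x10, 0x30 } and a retired block holds { 0x10, 0x20, 0x30, 0x40 }.
    // Stage 2 frees 0x20 and 0x40 immediately, while 0x10 and 0x30 are found in
    // plist by std::binary_search and are pushed back into the retired array to
    // be re-examined by the next scan() call.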
    CDS_EXPORT_API void smr::help_scan( thread_data* pThis )
    {
        assert( static_cast<thread_record*>( pThis )->m_idOwner.load( atomics::memory_order_relaxed ) == cds::OS::get_current_thread_id() );

        const cds::OS::ThreadId nullThreadId = cds::OS::c_NullThreadId;
        const cds::OS::ThreadId curThreadId = cds::OS::get_current_thread_id();
        for ( thread_record* hprec = thread_list_.load( atomics::memory_order_acquire ); hprec; hprec = hprec->m_pNextNode.load( atomics::memory_order_relaxed ))
        {
            // If m_bFree == true then hprec->retired_ is empty - no need to scan it
            if ( hprec->m_bFree.load( atomics::memory_order_acquire )) {
                assert( hprec->retired_.empty() );
                continue;
            }

            // Try to own hprec if it is not owned or its owner thread is dead.
            // Several threads may attempt this concurrently, so only atomic operations are used.
            {
                cds::OS::ThreadId curOwner = hprec->m_idOwner.load( atomics::memory_order_relaxed );
                if ( curOwner == nullThreadId || !cds::OS::is_thread_alive( curOwner )) {
                    if ( !hprec->m_idOwner.compare_exchange_strong( curOwner, curThreadId, atomics::memory_order_acquire, atomics::memory_order_relaxed ))
                        continue;
                }
                else
                    continue;
            }

            // The thread record is owned now. If it has retired pointers,
            // move them to pThis, which is private to the current thread.
            retired_array& src = hprec->retired_;
            retired_array& dest = pThis->retired_;

            for ( retired_block* block = src.list_head_; block; block = block->next_ ) {
                retired_ptr* last = block == src.current_block_ ? src.current_cell_ : block->last();
                for ( retired_ptr* p = block->first(); p != last; ++p ) {
                    if ( !dest.push( *p ))
                        scan( pThis );  // dest is full - scan to reclaim space
                }

                if ( block == src.current_block_ )
                    break;
            }

            src.fini();
            hprec->m_bFree.store( true, atomics::memory_order_relaxed );
            hprec->m_idOwner.store( nullThreadId, atomics::memory_order_release );

            scan( pThis );
        }
    }

}}} // namespace cds::gc::dhp