From be26a23d4d4db7eb057b3e28f6048090eb38d3f1 Mon Sep 17 00:00:00 2001 From: khizmax Date: Wed, 28 Dec 2016 00:29:34 +0300 Subject: [PATCH] Fixed a memory leak in flat-combining algorithm --- cds/algo/flat_combining/defs.h | 4 +- cds/algo/flat_combining/kernel.h | 174 +++++++++++++++++-------------- 2 files changed, 98 insertions(+), 80 deletions(-) diff --git a/cds/algo/flat_combining/defs.h b/cds/algo/flat_combining/defs.h index a959e8ec..23470d69 100644 --- a/cds/algo/flat_combining/defs.h +++ b/cds/algo/flat_combining/defs.h @@ -60,7 +60,8 @@ namespace cds { namespace algo { namespace flat_combining { atomics::atomic nRequest; ///< Request field (depends on data structure) atomics::atomic nState; ///< Record state: inactive, active, removed atomics::atomic nAge; ///< Age of the record - atomics::atomic pNext; ///< Next record in publication list + atomics::atomic pNext; ///< Next record in active publication list + atomics::atomic pNextAllocated; ///< Next record in allocated publication list /// Initializes publication record publication_record() @@ -68,6 +69,7 @@ namespace cds { namespace algo { namespace flat_combining { , nState( inactive ) , nAge( 0 ) , pNext( nullptr ) + , pNextAllocated( nullptr ) {} /// Returns the value of \p nRequest field diff --git a/cds/algo/flat_combining/kernel.h b/cds/algo/flat_combining/kernel.h index 654992d6..a8d3ce9c 100644 --- a/cds/algo/flat_combining/kernel.h +++ b/cds/algo/flat_combining/kernel.h @@ -254,11 +254,12 @@ namespace cds { namespace algo { //@endcond protected: - atomics::atomic m_nCount; ///< Total count of combining passes. Used as an age. - publication_record_type * m_pHead; ///< Head of publication list + atomics::atomic m_nCount; ///< Total count of combining passes. Used as an age. + publication_record_type* m_pHead; ///< Head of active publication list + publication_record_type* m_pAllocatedHead; ///< Head of allocated publication list boost::thread_specific_ptr< publication_record_type > m_pThreadRec; ///< Thread-local publication record - mutable global_lock_type m_Mutex; ///< Global mutex - mutable stat m_Stat; ///< Internal statistics + mutable global_lock_type m_Mutex; ///< Global mutex + mutable stat m_Stat; ///< Internal statistics unsigned int const m_nCompactFactor; ///< Publication list compacting factor (the list will be compacted through \p %m_nCompactFactor combining passes) unsigned int const m_nCombinePassCount; ///< Number of combining passes wait_strategy m_waitStrategy; ///< Wait strategy @@ -281,24 +282,29 @@ namespace cds { namespace algo { ) : m_nCount(0) , m_pHead( nullptr ) + , m_pAllocatedHead( nullptr ) , m_pThreadRec( tls_cleanup ) , m_nCompactFactor( (unsigned int)( cds::beans::ceil2( nCompactFactor ) - 1 )) // binary mask , m_nCombinePassCount( nCombinePassCount ) { - init(); + assert( m_pThreadRec.get() == nullptr ); + publication_record_type* pRec = cxx11_allocator().New(); + m_pAllocatedHead = + m_pHead = pRec; + m_pThreadRec.reset( pRec ); + m_Stat.onCreatePubRecord(); } - /// Destroys the objects and mark all publication records as inactive + /// Destroys the object and all publication records ~kernel() { m_pThreadRec.reset(); // calls tls_cleanup() // delete all publication records - for ( publication_record* p = m_pHead; p; ) { + for ( publication_record* p = m_pAllocatedHead; p; ) { publication_record * pRec = p; - p = p->pNext.load( memory_model::memory_order_relaxed ); + p = p->pNextAllocated.load( memory_model::memory_order_relaxed ); free_publication_record( static_cast( pRec )); - m_Stat.onDeletePubRecord(); } } @@ -315,9 +321,17 @@ namespace cds { namespace algo { pRec = cxx11_allocator().New(); m_pThreadRec.reset( pRec ); m_Stat.onCreatePubRecord(); - } - if ( pRec->nState.load( memory_model::memory_order_acquire ) != active ) + // Insert in allocated list + assert( m_pAllocatedHead != nullptr ); + publication_record* p = m_pAllocatedHead->pNextAllocated.load( memory_model::memory_order_acquire ); + do { + pRec->pNextAllocated.store( p, memory_model::memory_order_relaxed ); + } while ( !m_pAllocatedHead->pNextAllocated.compare_exchange_weak( p, pRec, memory_model::memory_order_release, atomics::memory_order_acquire )); + + publish( pRec ); + } + else if ( pRec->nState.load( memory_model::memory_order_acquire ) != active ) publish( pRec ); assert( pRec->op() == req_EmptyRecord ); @@ -573,26 +587,18 @@ namespace cds { namespace algo { pRec->nState.store( removed, memory_model::memory_order_release ); } - static void free_publication_record( publication_record_type* pRec ) + void free_publication_record( publication_record_type* pRec ) { cxx11_allocator().Delete( pRec ); - } - - void init() - { - assert( m_pThreadRec.get() == nullptr ); - publication_record_type* pRec = cxx11_allocator().New(); - m_pHead = pRec; - m_pThreadRec.reset( pRec ); - m_Stat.onCreatePubRecord(); + m_Stat.onDeletePubRecord(); } void publish( publication_record_type* pRec ) { assert( pRec->nState.load( memory_model::memory_order_relaxed ) == inactive ); - pRec->nAge.store( m_nCount.load(memory_model::memory_order_relaxed), memory_model::memory_order_release ); - pRec->nState.store( active, memory_model::memory_order_release ); + pRec->nAge.store( m_nCount.load(memory_model::memory_order_relaxed), memory_model::memory_order_relaxed ); + pRec->nState.store( active, memory_model::memory_order_relaxed ); // Insert record to publication list if ( m_pHead != static_cast(pRec)) { @@ -690,42 +696,36 @@ namespace cds { namespace algo { } m_Stat.onCombining(); - if ( (nCurAge & m_nCompactFactor) == 0 ) + if ( ( nCurAge & m_nCompactFactor ) == 0 ) compact_list( nCurAge ); } template bool combining_pass( Container& owner, unsigned int nCurAge ) { - publication_record* pPrev = nullptr; publication_record* p = m_pHead; bool bOpDone = false; while ( p ) { - switch ( p->nState.load( memory_model::memory_order_acquire )) { - case active: - if ( p->op() >= req_Operation ) { - p->nAge.store( nCurAge, memory_model::memory_order_release ); - owner.fc_apply( static_cast(p)); - operation_done( *p ); - bOpDone = true; - } - break; - case inactive: - // Only m_pHead can be inactive in the publication list - assert( p == m_pHead ); - break; - case removed: - // The record should be removed (except m_pHead) - if ( pPrev ) { - p = unlink_and_delete_record( pPrev, p ); - continue; - } - break; - default: - /// ??? That is impossible - assert(false); + switch ( p->nState.load( memory_model::memory_order_acquire ) ) { + case active: + if ( p->op() >= req_Operation ) { + p->nAge.store( nCurAge, memory_model::memory_order_relaxed ); + owner.fc_apply( static_cast( p ) ); + operation_done( *p ); + bOpDone = true; + } + break; + case inactive: + // Only m_pHead can be inactive in the publication list + assert( p == m_pHead ); + break; + case removed: + // Such record will be removed on compacting phase + break; + default: + /// ??? That is impossible + assert( false ); } - pPrev = p; p = p->pNext.load( memory_model::memory_order_acquire ); } return bOpDone; @@ -735,16 +735,16 @@ namespace cds { namespace algo { void batch_combining( Container& owner ) { // The thread is a combiner - assert( !m_Mutex.try_lock()); + assert( !m_Mutex.try_lock() ); unsigned int const nCurAge = m_nCount.fetch_add( 1, memory_model::memory_order_relaxed ) + 1; for ( unsigned int nPass = 0; nPass < m_nCombinePassCount; ++nPass ) - owner.fc_process( begin(), end()); + owner.fc_process( begin(), end() ); combining_pass( owner, nCurAge ); m_Stat.onCombining(); - if ( (nCurAge & m_nCompactFactor) == 0 ) + if ( ( nCurAge & m_nCompactFactor ) == 0 ) compact_list( nCurAge ); } @@ -760,10 +760,10 @@ namespace cds { namespace algo { m_Stat.onPassiveWaitIteration(); // Wait while operation processing - if ( m_waitStrategy.wait( *this, *pRec )) + if ( m_waitStrategy.wait( *this, *pRec ) ) m_Stat.onWakeupByNotifying(); - if ( m_Mutex.try_lock()) { + if ( m_Mutex.try_lock() ) { if ( pRec->op( memory_model::memory_order_acquire ) == req_Response ) { // Operation is done m_Mutex.unlock(); @@ -782,18 +782,21 @@ namespace cds { namespace algo { return true; } - void compact_list( unsigned int const nCurAge ) + void compact_list( unsigned int nCurAge ) { - // Thinning publication list - publication_record * pPrev = nullptr; - for ( publication_record * p = m_pHead; p; ) { - if ( p->nState.load( memory_model::memory_order_acquire ) == active - && p->nAge.load( memory_model::memory_order_acquire ) + m_nCompactFactor < nCurAge ) - { - if ( pPrev ) { - publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire ); + // Compacts publication list + // This function is called only by combiner thread + + try_again: + publication_record * pPrev = m_pHead; + for ( publication_record * p = pPrev->pNext.load( memory_model::memory_order_acquire ); p; ) { + switch ( p->nState.load( memory_model::memory_order_relaxed ) ) { + case active: + if ( p->nAge.load( memory_model::memory_order_relaxed ) + m_nCompactFactor < nCurAge ) + { + publication_record * pNext = p->pNext.load( memory_model::memory_order_relaxed ); if ( pPrev->pNext.compare_exchange_strong( p, pNext, - memory_model::memory_order_release, atomics::memory_order_relaxed )) + memory_model::memory_order_acquire, atomics::memory_order_relaxed ) ) { p->nState.store( inactive, memory_model::memory_order_release ); p = pNext; @@ -801,28 +804,41 @@ namespace cds { namespace algo { continue; } } + break; + + case removed: + publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire ); + if ( cds_likely( pPrev->pNext.compare_exchange_strong( p, pNext, memory_model::memory_order_acquire, atomics::memory_order_relaxed ))) { + p = pNext; + continue; + } + else { + // CAS can be failed only in beginning of list + assert( pPrev == m_pHead ); + goto try_again; + } } pPrev = p; p = p->pNext.load( memory_model::memory_order_acquire ); } - m_Stat.onCompactPublicationList(); - } - - publication_record * unlink_and_delete_record( publication_record * pPrev, publication_record * p ) - { - // m_pHead is persistent node and cannot be deleted - assert( pPrev != nullptr ); - assert( p != m_pHead ); + // Iterate over allocated list to find removed records + pPrev = m_pAllocatedHead; + for ( publication_record * p = pPrev->pNextAllocated.load( memory_model::memory_order_acquire ); p; ) { + if ( p->nState.load( memory_model::memory_order_relaxed ) == removed ) { + publication_record * pNext = p->pNextAllocated.load( memory_model::memory_order_relaxed ); + if ( pPrev->pNextAllocated.compare_exchange_strong( p, pNext, memory_model::memory_order_acquire, atomics::memory_order_relaxed )) { + free_publication_record( static_cast( p )); + p = pNext; + continue; + } + } - publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire ); - if ( pPrev->pNext.compare_exchange_strong( p, pNext, - memory_model::memory_order_acquire, atomics::memory_order_relaxed )) - { - free_publication_record( static_cast( p )); - m_Stat.onDeletePubRecord(); + pPrev = p; + p = p->pNextAllocated.load( memory_model::memory_order_relaxed ); } - return pNext; + + m_Stat.onCompactPublicationList(); } //@endcond }; -- 2.34.1