From 77142203061ff77389e9890b4ea06ad16b22c9c4 Mon Sep 17 00:00:00 2001 From: khizmax Date: Sun, 29 Nov 2015 09:39:21 +0300 Subject: [PATCH] Fixed memory leaks in flat-combining algo --- cds/algo/flat_combining.h | 51 ++++++++++++++++++++++++--------------- 1 file changed, 31 insertions(+), 20 deletions(-) diff --git a/cds/algo/flat_combining.h b/cds/algo/flat_combining.h index afc71e5d..32519eee 100644 --- a/cds/algo/flat_combining.h +++ b/cds/algo/flat_combining.h @@ -199,8 +199,8 @@ namespace cds { namespace algo { - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR - \p opt::stat - internal statistics, possible type: \ref stat, \ref empty_stat (the default) - \p opt::memory_model - C++ memory ordering model. - List of all available memory ordering see opt::memory_model. - Default if cds::opt::v:relaxed_ordering + List of all available memory ordering see \p opt::memory_model. + Default is \p cds::opt::v::relaxed_ordering */ template struct make_traits { @@ -223,22 +223,22 @@ namespace cds { namespace algo { The kernel object should be a member of a container class. The container cooperates with flat combining kernel object. There are two ways to interact with the kernel: - - One-by-one processing the active records of the publication list. This mode provides \ref combine function: - the container acquires its publication record by \ref acquire_record, fills its fields and calls - \p combine function of its kernel object. If the current thread becomes a combiner, the kernel - calls \p fc_apply function of the container for each active non-empty record. Then, the container - should release its publication record by \ref release_record. Only one pass through the publication + - One-by-one processing the active records of the publication list. This mode provides by \p combine() function: + the container acquires its publication record by \p acquire_record(), fills its fields and calls + \p combine() function of its kernel object. If the current thread becomes a combiner, the kernel + calls \p fc_apply() function of the container for each active non-empty record. Then, the container + should release its publication record by \p release_record(). Only one pass through the publication list is possible. - - Batch processing (\ref batch_combine function). It this mode the container obtains access + - Batch processing - \p batch_combine() function. It this mode the container obtains access to entire publication list. This mode allows the container to perform an elimination, for example, - the stack can collide \p push and \p pop requests. The sequence of invocations is the following: - the container acquires its publication record by \ref acquire_record, fills its field and call - \p batch_combine function of its kernel object. If the current thread becomes a combiner, - the kernel calls \p fc_process function of the container passing two iterators pointing to - begin and end of publication list (see \ref iterator class). The iterators allows + the stack can collide \p push() and \p pop() requests. The sequence of invocations is the following: + the container acquires its publication record by \p acquire_record(), fills its field and call + \p batch_combine() function of its kernel object. If the current thread becomes a combiner, + the kernel calls \p fc_process() function of the container passing two iterators pointing to + the begin and the end of publication list (see \ref iterator class). The iterators allow multiple pass through active records of publication list. For each processed record the container - should call \ref operation_done function. On the end, the container should release - its record by \ref release_record. + should call \p operation_done() function. On the end, the container should release + its record by \p release_record(). */ template < typename PublicationRecord @@ -305,8 +305,14 @@ namespace cds { namespace algo { ~kernel() { // mark all publication record as detached - for ( publication_record * p = m_pHead; p; p = p->pNext.load( memory_model::memory_order_relaxed )) + for ( publication_record * p = m_pHead; p; ) { p->pOwner = nullptr; + + publication_record * pRec = p; + p = p->pNext.load( memory_model::memory_order_relaxed ); + if ( pRec->nState.load( memory_model::memory_order_relaxed ) == removed ) + free_publication_record( static_cast( pRec )); + } } /// Gets publication list record for the current thread @@ -539,18 +545,23 @@ namespace cds { namespace algo { // Thread done // pRec that is TLS data should be excluded from publication list if ( pRec ) { - if ( pRec->nState.load(memory_model::memory_order_relaxed) == active && pRec->pOwner ) { + if ( pRec->pOwner && pRec->nState.load(memory_model::memory_order_relaxed) == active ) { // record is active and kernel is alive unsigned int nState = active; pRec->nState.compare_exchange_strong( nState, removed, memory_model::memory_order_release, atomics::memory_order_relaxed ); } else { // record is not in publication list or kernel already deleted - cxx11_allocator().Delete( pRec ); + free_publication_record( pRec ); } } } + static void free_publication_record( publication_record_type * pRec ) + { + cxx11_allocator().Delete( pRec ); + } + void init() { assert( m_pThreadRec.get() == nullptr ); @@ -770,14 +781,14 @@ namespace cds { namespace algo { if ( pPrev->pNext.compare_exchange_strong( p, pNext, memory_model::memory_order_release, atomics::memory_order_relaxed )) { - cxx11_allocator().Delete( static_cast( p )); + free_publication_record( static_cast( p )); m_Stat.onDeletePubRecord(); } return pNext; } else { m_pHead = static_cast( p->pNext.load( memory_model::memory_order_acquire )); - cxx11_allocator().Delete( static_cast( p )); + free_publication_record( static_cast( p )); m_Stat.onDeletePubRecord(); return m_pHead; } -- 2.34.1