Fixed memory leaks in flat-combining algo
authorkhizmax <libcds.dev@gmail.com>
Sun, 29 Nov 2015 06:39:21 +0000 (09:39 +0300)
committerkhizmax <libcds.dev@gmail.com>
Sun, 29 Nov 2015 06:39:21 +0000 (09:39 +0300)
cds/algo/flat_combining.h

index afc71e5da48b1902b6ff45b487903d3943c9d4c9..32519eee58a23c6ff278b28a8d1068172a6551fa 100644 (file)
@@ -199,8 +199,8 @@ namespace cds { namespace algo {
             - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR
             - \p opt::stat - internal statistics, possible type: \ref stat, \ref empty_stat (the default)
             - \p opt::memory_model - C++ memory ordering model.
-                List of all available memory ordering see opt::memory_model.
-                Default if cds::opt::v:relaxed_ordering
+                List of all available memory ordering see \p opt::memory_model.
+                Default is \p cds::opt::v::relaxed_ordering
         */
         template <typename... Options>
         struct make_traits {
@@ -223,22 +223,22 @@ namespace cds { namespace algo {
 
             The kernel object should be a member of a container class. The container cooperates with flat combining
             kernel object. There are two ways to interact with the kernel:
-            - One-by-one processing the active records of the publication list. This mode provides \ref combine function:
-              the container acquires its publication record by \ref acquire_record, fills its fields and calls
-              \p combine function of its kernel object. If the current thread becomes a combiner, the kernel
-              calls \p fc_apply function of the container for each active non-empty record. Then, the container
-              should release its publication record by \ref release_record. Only one pass through the publication
+            - One-by-one processing the active records of the publication list. This mode provides by \p combine() function:
+              the container acquires its publication record by \p acquire_record(), fills its fields and calls
+              \p combine() function of its kernel object. If the current thread becomes a combiner, the kernel
+              calls \p fc_apply() function of the container for each active non-empty record. Then, the container
+              should release its publication record by \p release_record(). Only one pass through the publication
               list is possible.
-            - Batch processing (\ref batch_combine function). It this mode the container obtains access
+            - Batch processing - \p batch_combine() function. It this mode the container obtains access
               to entire publication list. This mode allows the container to perform an elimination, for example,
-              the stack can collide \p push and \p pop requests. The sequence of invocations is the following:
-              the container acquires its publication record by \ref acquire_record, fills its field and call
-              \p batch_combine function of its kernel object. If the current thread becomes a combiner,
-              the kernel calls \p fc_process function of the container passing two iterators pointing to
-              begin and end of publication list (see \ref iterator class). The iterators allows
+              the stack can collide \p push() and \p pop() requests. The sequence of invocations is the following:
+              the container acquires its publication record by \p acquire_record(), fills its field and call
+              \p batch_combine() function of its kernel object. If the current thread becomes a combiner,
+              the kernel calls \p fc_process() function of the container passing two iterators pointing to
+              the begin and the end of publication list (see \ref iterator class). The iterators allow
               multiple pass through active records of publication list. For each processed record the container
-              should call \ref operation_done function. On the end, the container should release
-              its record by \ref release_record.
+              should call \p operation_done() function. On the end, the container should release
+              its record by \p release_record().
         */
         template <
             typename PublicationRecord
@@ -305,8 +305,14 @@ namespace cds { namespace algo {
             ~kernel()
             {
                 // mark all publication record as detached
-                for ( publication_record * p = m_pHead; p; p = p->pNext.load( memory_model::memory_order_relaxed ))
+                for ( publication_record * p = m_pHead; p; ) {
                     p->pOwner = nullptr;
+
+                    publication_record * pRec = p;
+                    p = p->pNext.load( memory_model::memory_order_relaxed );
+                    if ( pRec->nState.load( memory_model::memory_order_relaxed ) == removed )
+                        free_publication_record( static_cast<publication_record_type *>( pRec ));
+                }
             }
 
             /// Gets publication list record for the current thread
@@ -539,18 +545,23 @@ namespace cds { namespace algo {
                 // Thread done
                 // pRec that is TLS data should be excluded from publication list
                 if ( pRec ) {
-                    if ( pRec->nState.load(memory_model::memory_order_relaxed) == active && pRec->pOwner ) {
+                    if ( pRec->pOwner && pRec->nState.load(memory_model::memory_order_relaxed) == active ) {
                         // record is active and kernel is alive
                         unsigned int nState = active;
                         pRec->nState.compare_exchange_strong( nState, removed, memory_model::memory_order_release, atomics::memory_order_relaxed );
                     }
                     else {
                         // record is not in publication list or kernel already deleted
-                        cxx11_allocator().Delete( pRec );
+                        free_publication_record( pRec );
                     }
                 }
             }
 
+            static void free_publication_record( publication_record_type * pRec )
+            {
+                cxx11_allocator().Delete( pRec );
+            }
+
             void init()
             {
                 assert( m_pThreadRec.get() == nullptr );
@@ -770,14 +781,14 @@ namespace cds { namespace algo {
                     if ( pPrev->pNext.compare_exchange_strong( p, pNext,
                         memory_model::memory_order_release, atomics::memory_order_relaxed ))
                     {
-                        cxx11_allocator().Delete( static_cast<publication_record_type *>( p ));
+                        free_publication_record( static_cast<publication_record_type *>( p ));
                         m_Stat.onDeletePubRecord();
                     }
                     return pNext;
                 }
                 else {
                     m_pHead = static_cast<publication_record_type *>( p->pNext.load( memory_model::memory_order_acquire ));
-                    cxx11_allocator().Delete( static_cast<publication_record_type *>( p ));
+                    free_publication_record( static_cast<publication_record_type *>( p ));
                     m_Stat.onDeletePubRecord();
                     return m_pHead;
                 }