Removed trailing spaces
[libcds.git] / cds / algo / flat_combining / kernel.h
index cbc39b778b7bb1d9409c8cbef31507962a860e3e..5e333e157bf4daeb1205285e2f715c3af4db126d 100644 (file)
@@ -1,7 +1,7 @@
 /*
     This file is a part of libcds - Concurrent Data Structures library
 
-    (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
+    (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
 
     Source code repo: http://github.com/khizmax/libcds/
     Download: http://sourceforge.net/projects/libcds/files/
@@ -254,11 +254,12 @@ namespace cds { namespace algo {
             //@endcond
 
         protected:
-            atomics::atomic<unsigned int>  m_nCount;   ///< Total count of combining passes. Used as an age.
-            publication_record_type *   m_pHead;    ///< Head of publication list
+            atomics::atomic<unsigned int>  m_nCount;    ///< Total count of combining passes. Used as an age.
+            publication_record_type*    m_pHead;        ///< Head of active publication list
+            publication_record_type*    m_pAllocatedHead; ///< Head of allocated publication list
             boost::thread_specific_ptr< publication_record_type > m_pThreadRec;   ///< Thread-local publication record
-            mutable global_lock_type    m_Mutex;    ///< Global mutex
-            mutable stat                m_Stat;     ///< Internal statistics
+            mutable global_lock_type    m_Mutex;        ///< Global mutex
+            mutable stat                m_Stat;         ///< Internal statistics
             unsigned int const          m_nCompactFactor;    ///< Publication list compacting factor (the list will be compacted through \p %m_nCompactFactor combining passes)
             unsigned int const          m_nCombinePassCount; ///< Number of combining passes
             wait_strategy               m_waitStrategy;      ///< Wait strategy
@@ -281,20 +282,28 @@ namespace cds { namespace algo {
                 )
                 : m_nCount(0)
                 , m_pHead( nullptr )
+                , m_pAllocatedHead( nullptr )
                 , m_pThreadRec( tls_cleanup )
                 , m_nCompactFactor( (unsigned int)( cds::beans::ceil2( nCompactFactor ) - 1 ))   // binary mask
                 , m_nCombinePassCount( nCombinePassCount )
             {
-                init();
+                assert( m_pThreadRec.get() == nullptr );
+                publication_record_type* pRec = cxx11_allocator().New();
+                m_pAllocatedHead =
+                    m_pHead = pRec;
+                m_pThreadRec.reset( pRec );
+                m_Stat.onCreatePubRecord();
             }
 
-            /// Destroys the objects and mark all publication records as inactive
+            /// Destroys the object and all publication records
             ~kernel()
             {
-                // mark all publication record as detached
-                for ( publication_record* p = m_pHead; p; ) {
+                m_pThreadRec.reset();   // calls tls_cleanup()
+
+                // delete all publication records
+                for ( publication_record* p = m_pAllocatedHead; p; ) {
                     publication_record * pRec = p;
-                    p = p->pNext.load( memory_model::memory_order_relaxed );
+                    p = p->pNextAllocated.load( memory_model::memory_order_relaxed );
                     free_publication_record( static_cast<publication_record_type *>( pRec ));
                 }
             }
@@ -312,9 +321,17 @@ namespace cds { namespace algo {
                     pRec = cxx11_allocator().New();
                     m_pThreadRec.reset( pRec );
                     m_Stat.onCreatePubRecord();
-                }
 
-                if ( pRec->nState.load( memory_model::memory_order_acquire ) != active )
+                    // Insert in allocated list
+                    assert( m_pAllocatedHead != nullptr );
+                    publication_record* p = m_pAllocatedHead->pNextAllocated.load( memory_model::memory_order_acquire );
+                    do {
+                        pRec->pNextAllocated.store( p, memory_model::memory_order_relaxed );
+                    } while ( !m_pAllocatedHead->pNextAllocated.compare_exchange_weak( p, pRec, memory_model::memory_order_release, atomics::memory_order_acquire ));
+
+                    publish( pRec );
+                }
+                else if ( pRec->nState.load( memory_model::memory_order_acquire ) != active )
                     publish( pRec );
 
                 assert( pRec->op() == req_EmptyRecord );
@@ -570,36 +587,28 @@ namespace cds { namespace algo {
                 pRec->nState.store( removed, memory_model::memory_order_release );
             }
 
-            static void free_publication_record( publication_record_type* pRec )
+            void free_publication_record( publication_record_type* pRec )
             {
                 cxx11_allocator().Delete( pRec );
-            }
-
-            void init()
-            {
-                assert( m_pThreadRec.get() == nullptr );
-                publication_record_type* pRec = cxx11_allocator().New();
-                m_pHead = pRec;
-                m_pThreadRec.reset( pRec );
-                m_Stat.onCreatePubRecord();
+                m_Stat.onDeletePubRecord();
             }
 
             void publish( publication_record_type* pRec )
             {
                 assert( pRec->nState.load( memory_model::memory_order_relaxed ) == inactive );
 
-                pRec->nAge.store( m_nCount.load(memory_model::memory_order_relaxed), memory_model::memory_order_release );
-                pRec->nState.store( active, memory_model::memory_order_release );
+                pRec->nAge.store( m_nCount.load(memory_model::memory_order_relaxed), memory_model::memory_order_relaxed );
+                pRec->nState.store( active, memory_model::memory_order_relaxed );
 
                 // Insert record to publication list
                 if ( m_pHead != static_cast<publication_record *>(pRec)) {
                     publication_record * p = m_pHead->pNext.load(memory_model::memory_order_relaxed);
                     if ( p != static_cast<publication_record *>( pRec )) {
                         do {
-                            pRec->pNext = p;
+                            pRec->pNext.store( p, memory_model::memory_order_relaxed );
                             // Failed CAS changes p
                         } while ( !m_pHead->pNext.compare_exchange_weak( p, static_cast<publication_record *>(pRec),
-                            memory_model::memory_order_release, atomics::memory_order_relaxed ));
+                            memory_model::memory_order_release, atomics::memory_order_acquire ));
                         m_Stat.onActivatePubRecord();
                     }
                 }
@@ -687,39 +696,36 @@ namespace cds { namespace algo {
                 }
 
                 m_Stat.onCombining();
-                if ( (nCurAge & m_nCompactFactor) == 0 )
+                if ( ( nCurAge & m_nCompactFactor ) == 0 )
                     compact_list( nCurAge );
             }
 
             template <class Container>
             bool combining_pass( Container& owner, unsigned int nCurAge )
             {
-                publication_record* pPrev = nullptr;
                 publication_record* p = m_pHead;
                 bool bOpDone = false;
                 while ( p ) {
                     switch ( p->nState.load( memory_model::memory_order_acquire )) {
-                        case active:
-                            if ( p->op() >= req_Operation ) {
-                                p->nAge.store( nCurAge, memory_model::memory_order_release );
-                                owner.fc_apply( static_cast<publication_record_type*>(p));
-                                operation_done( *p );
-                                bOpDone = true;
-                            }
-                            break;
-                        case inactive:
-                            // Only m_pHead can be inactive in the publication list
-                            assert( p == m_pHead );
-                            break;
-                        case removed:
-                            // The record should be removed
-                            p = unlink_and_delete_record( pPrev, p );
-                            continue;
-                        default:
-                            /// ??? That is impossible
-                            assert(false);
+                    case active:
+                        if ( p->op() >= req_Operation ) {
+                            p->nAge.store( nCurAge, memory_model::memory_order_relaxed );
+                            owner.fc_apply( static_cast<publication_record_type*>( p ));
+                            operation_done( *p );
+                            bOpDone = true;
+                        }
+                        break;
+                    case inactive:
+                        // Only m_pHead can be inactive in the publication list
+                        assert( p == m_pHead );
+                        break;
+                    case removed:
+                        // Such record will be removed on compacting phase
+                        break;
+                    default:
+                        /// ??? That is impossible
+                        assert( false );
                     }
-                    pPrev = p;
                     p = p->pNext.load( memory_model::memory_order_acquire );
                 }
                 return bOpDone;
@@ -738,7 +744,7 @@ namespace cds { namespace algo {
 
                 combining_pass( owner, nCurAge );
                 m_Stat.onCombining();
-                if ( (nCurAge & m_nCompactFactor) == 0 )
+                if ( ( nCurAge & m_nCompactFactor ) == 0 )
                     compact_list( nCurAge );
             }
 
@@ -776,18 +782,21 @@ namespace cds { namespace algo {
                 return true;
             }
 
-            void compact_list( unsigned int const nCurAge )
+            void compact_list( unsigned int nCurAge )
             {
-                // Thinning publication list
-                publication_record * pPrev = nullptr;
-                for ( publication_record * p = m_pHead; p; ) {
-                    if ( p->nState.load( memory_model::memory_order_acquire ) == active
-                      && p->nAge.load( memory_model::memory_order_acquire ) + m_nCompactFactor < nCurAge )
-                    {
-                        if ( pPrev ) {
-                            publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
+                // Compacts publication list
+                // This function is called only by combiner thread
+
+            try_again:
+                publication_record * pPrev = m_pHead;
+                for ( publication_record * p = pPrev->pNext.load( memory_model::memory_order_acquire ); p; ) {
+                    switch ( p->nState.load( memory_model::memory_order_relaxed )) {
+                    case active:
+                        if ( p->nAge.load( memory_model::memory_order_relaxed ) + m_nCompactFactor < nCurAge )
+                        {
+                            publication_record * pNext = p->pNext.load( memory_model::memory_order_relaxed );
                             if ( pPrev->pNext.compare_exchange_strong( p, pNext,
-                                memory_model::memory_order_release, atomics::memory_order_relaxed ))
+                                memory_model::memory_order_acquire, atomics::memory_order_relaxed ))
                             {
                                 p->nState.store( inactive, memory_model::memory_order_release );
                                 p = pNext;
@@ -795,32 +804,41 @@ namespace cds { namespace algo {
                                 continue;
                             }
                         }
+                        break;
+
+                    case removed:
+                        publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
+                        if ( cds_likely( pPrev->pNext.compare_exchange_strong( p, pNext, memory_model::memory_order_acquire, atomics::memory_order_relaxed ))) {
+                            p = pNext;
+                            continue;
+                        }
+                        else {
+                            // CAS can be failed only in beginning of list
+                            assert( pPrev == m_pHead );
+                            goto try_again;
+                        }
                     }
                     pPrev = p;
                     p = p->pNext.load( memory_model::memory_order_acquire );
                 }
 
-                m_Stat.onCompactPublicationList();
-            }
-
-            publication_record * unlink_and_delete_record( publication_record * pPrev, publication_record * p )
-            {
-                if ( pPrev ) {
-                    publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
-                    if ( pPrev->pNext.compare_exchange_strong( p, pNext,
-                        memory_model::memory_order_release, atomics::memory_order_relaxed ))
-                    {
-                        free_publication_record( static_cast<publication_record_type *>( p ));
-                        m_Stat.onDeletePubRecord();
+                // Iterate over allocated list to find removed records
+                pPrev = m_pAllocatedHead;
+                for ( publication_record * p = pPrev->pNextAllocated.load( memory_model::memory_order_acquire ); p; ) {
+                    if ( p->nState.load( memory_model::memory_order_relaxed ) == removed ) {
+                        publication_record * pNext = p->pNextAllocated.load( memory_model::memory_order_relaxed );
+                        if ( pPrev->pNextAllocated.compare_exchange_strong( p, pNext, memory_model::memory_order_acquire, atomics::memory_order_relaxed )) {
+                            free_publication_record( static_cast<publication_record_type *>( p ));
+                            p = pNext;
+                            continue;
+                        }
                     }
-                    return pNext;
-                }
-                else {
-                    m_pHead = static_cast<publication_record_type *>( p->pNext.load( memory_model::memory_order_acquire ));
-                    free_publication_record( static_cast<publication_record_type *>( p ));
-                    m_Stat.onDeletePubRecord();
-                    return m_pHead;
+
+                    pPrev = p;
+                    p = p->pNextAllocated.load( memory_model::memory_order_relaxed );
                 }
+
+                m_Stat.onCompactPublicationList();
             }
             //@endcond
         };