Removed trailing spaces
[libcds.git] / cds / algo / flat_combining / kernel.h
index f98437592b32dd6c5253e816c015cf29c4e0b2c3..5e333e157bf4daeb1205285e2f715c3af4db126d 100644 (file)
@@ -1,11 +1,11 @@
 /*
     This file is a part of libcds - Concurrent Data Structures library
 
-    (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
+    (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
 
     Source code repo: http://github.com/khizmax/libcds/
     Download: http://sourceforge.net/projects/libcds/files/
-    
+
     Redistribution and use in source and binary forms, with or without
     modification, are permitted provided that the following conditions are met:
 
@@ -25,7 +25,7 @@
     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.     
+    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
 
 #ifndef CDSLIB_ALGO_FLAT_COMBINING_KERNEL_H
@@ -55,10 +55,10 @@ namespace cds { namespace algo {
         of a <i>publication list</i>. The publication list is a list of thread-local records
         of a size proportional to the number of threads that are concurrently accessing the shared object.
 
-        Each thread \p t accessing the structure to perform an invocation of some method \p m
+        Each thread \p t accessing the structure to perform an invocation of some method \p f()
         on the shared object executes the following sequence of steps:
         <ol>
-        <li>Write the invocation opcode and parameters (if any) of the method \p m to be applied
+        <li>Write the invocation opcode and parameters (if any) of the method \p f() to be applied
         sequentially to the shared object in the <i>request</i> field of your thread local publication
         record (there is no need to use a load-store memory barrier). The <i>request</i> field will later
         be used to receive the response. If your thread local publication record is marked as active
@@ -66,15 +66,15 @@ namespace cds { namespace algo {
         <li>Check if the global lock is taken. If so (another thread is an active combiner), spin on the <i>request</i>
         field waiting for a response to the invocation (one can add a yield at this point to allow other threads
        on the same core to run). Once in a while, while spinning, check if the lock is still taken and that your
-        record is active. If your record is inactive proceed to step 5. Once the response is available,
-        reset the request field to null and return the response.</li>
+        record is active (any \p wait_strategy may be used instead of bare spinning). If your record is inactive,
+        proceed to step 5. Once the response is available, reset the request field to null and return the response.</li>
         <li>If the lock is not taken, attempt to acquire it and become a combiner. If you fail,
         return to spinning in step 2.</li>
         <li>Otherwise, you hold the lock and are a combiner.
         <ul>
             <li>Increment the combining pass count by one.</li>
             <li>Execute a \p fc_apply() by traversing the publication list from the head,
-            combining all nonnull method call invocations, setting the <i>age</i> of each of these records
+            combining all non-null method call invocations, setting the <i>age</i> of each of these records
             to the current <i>count</i>, applying the combined method calls to the structure D, and returning
             responses to all the invocations. This traversal is guaranteed to be wait-free.</li>
             <li>If the <i>count</i> is such that a cleanup needs to be performed, traverse the publication
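To make steps 1-4 above concrete, here is a minimal stand-alone sketch of the protocol (illustrative only: Record, g_head, g_lock and the opcode encoding are assumptions, not libcds identifiers, and publication-list insertion and compaction are omitted):

    #include <atomic>
    #include <mutex>
    #include <thread>

    // Illustrative encoding: 0 = empty, >0 = pending opcode, -1 = response ready
    struct Record {
        std::atomic<int>     request{ 0 };
        std::atomic<Record*> next{ nullptr };
    };

    std::mutex           g_lock;              // global combiner lock
    std::atomic<Record*> g_head{ nullptr };   // publication list head

    // rec is assumed to be already linked into the g_head list (publication)
    void invoke( Record* rec, int opcode )
    {
        rec->request.store( opcode, std::memory_order_release );            // step 1
        while ( rec->request.load( std::memory_order_acquire ) != -1 ) {
            if ( g_lock.try_lock()) {                                       // step 3
                // step 4: the combiner applies every pending request, ours included
                for ( Record* p = g_head.load( std::memory_order_acquire ); p;
                      p = p->next.load( std::memory_order_acquire ))
                {
                    int op = p->request.load( std::memory_order_acquire );
                    if ( op > 0 ) {
                        // ... apply op to the shared structure here ...
                        p->request.store( -1, std::memory_order_release );  // post the response
                    }
                }
                g_lock.unlock();
            }
            else
                std::this_thread::yield();    // step 2: wait for a response
        }
        rec->request.store( 0, std::memory_order_relaxed );                 // reset for reuse
    }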
@@ -143,7 +143,7 @@ namespace cds { namespace algo {
             void    onInvokeExclusive()         { ++m_nInvokeExclusive;         }
             void    onWakeupByNotifying()       { ++m_nWakeupByNotifying;       }
             void    onPassiveToCombiner()       { ++m_nPassiveToCombiner;       }
-            
+
             //@endcond
         };
 
@@ -254,11 +254,12 @@ namespace cds { namespace algo {
             //@endcond
 
         protected:
-            atomics::atomic<unsigned int>  m_nCount;   ///< Total count of combining passes. Used as an age.
-            publication_record_type *   m_pHead;    ///< Head of publication list
+            atomics::atomic<unsigned int>  m_nCount;    ///< Total count of combining passes. Used as an age.
+            publication_record_type*    m_pHead;        ///< Head of active publication list
+            publication_record_type*    m_pAllocatedHead; ///< Head of allocated publication list
             boost::thread_specific_ptr< publication_record_type > m_pThreadRec;   ///< Thread-local publication record
-            mutable global_lock_type    m_Mutex;    ///< Global mutex
-            mutable stat                m_Stat;     ///< Internal statistics
+            mutable global_lock_type    m_Mutex;        ///< Global mutex
+            mutable stat                m_Stat;         ///< Internal statistics
             unsigned int const          m_nCompactFactor;    ///< Publication list compacting factor (the list will be compacted through \p %m_nCompactFactor combining passes)
             unsigned int const          m_nCombinePassCount; ///< Number of combining passes
             wait_strategy               m_waitStrategy;      ///< Wait strategy
@@ -281,24 +282,29 @@ namespace cds { namespace algo {
                 )
                 : m_nCount(0)
                 , m_pHead( nullptr )
+                , m_pAllocatedHead( nullptr )
                 , m_pThreadRec( tls_cleanup )
                 , m_nCompactFactor( (unsigned int)( cds::beans::ceil2( nCompactFactor ) - 1 ))   // binary mask
                 , m_nCombinePassCount( nCombinePassCount )
             {
-                init();
+                assert( m_pThreadRec.get() == nullptr );
+                publication_record_type* pRec = cxx11_allocator().New();
+                m_pAllocatedHead =
+                    m_pHead = pRec;
+                m_pThreadRec.reset( pRec );
+                m_Stat.onCreatePubRecord();
             }
 
-            /// Destroys the objects and mark all publication records as inactive
+            /// Destroys the object and all publication records
             ~kernel()
             {
-                // mark all publication record as detached
-                for ( publication_record* p = m_pHead; p; ) {
-                    p->pOwner = nullptr;
+                m_pThreadRec.reset();   // calls tls_cleanup()
 
+                // delete all publication records
+                for ( publication_record* p = m_pAllocatedHead; p; ) {
                     publication_record * pRec = p;
-                    p = p->pNext.load( memory_model::memory_order_relaxed );
-                    if ( pRec->nState.load( memory_model::memory_order_acquire ) == removed )
-                        free_publication_record( static_cast<publication_record_type *>( pRec ));
+                    p = p->pNextAllocated.load( memory_model::memory_order_relaxed );
+                    free_publication_record( static_cast<publication_record_type *>( pRec ));
                 }
             }
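The rewritten destructor relies on the second list this patch introduces: every record is linked once into an allocated list via pNextAllocated, in addition to the active publication list reachable from m_pHead, so the kernel can reclaim every record it ever created without tracking per-record ownership. A simplified view of the layout (a sketch; the real publication_record carries more fields):

    #include <atomic>

    struct publication_record_sketch {
        std::atomic<unsigned>                   nState;          // inactive / active / removed
        std::atomic<publication_record_sketch*> pNext;           // active publication list, from m_pHead
        std::atomic<publication_record_sketch*> pNextAllocated;  // allocated list, from m_pAllocatedHead
    };
    // m_pHead          --pNext-->          records the combiner traverses
    // m_pAllocatedHead --pNextAllocated--> every record ever created; traversed
    //                                      by ~kernel() and the sweep in compact_list()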
 
@@ -313,12 +319,19 @@ namespace cds { namespace algo {
                 if ( !pRec ) {
                     // Allocate new publication record
                     pRec = cxx11_allocator().New();
-                    pRec->pOwner = reinterpret_cast<void *>( this );
                     m_pThreadRec.reset( pRec );
                     m_Stat.onCreatePubRecord();
-                }
 
-                if ( pRec->nState.load( memory_model::memory_order_acquire ) != active )
+                    // Insert into the allocated list
+                    assert( m_pAllocatedHead != nullptr );
+                    publication_record* p = m_pAllocatedHead->pNextAllocated.load( memory_model::memory_order_acquire );
+                    do {
+                        pRec->pNextAllocated.store( p, memory_model::memory_order_relaxed );
+                    } while ( !m_pAllocatedHead->pNextAllocated.compare_exchange_weak( p, pRec, memory_model::memory_order_release, atomics::memory_order_acquire ));
+
+                    publish( pRec );
+                }
+                else if ( pRec->nState.load( memory_model::memory_order_acquire ) != active )
                     publish( pRec );
 
                 assert( pRec->op() == req_EmptyRecord );
@@ -329,7 +342,7 @@ namespace cds { namespace algo {
             /// Marks the publication record for the current thread as empty
             void release_record( publication_record_type * pRec )
             {
-                assert( pRec->is_done() );
+                assert( pRec->is_done());
                 pRec->nRequest.store( req_EmptyRecord, memory_model::memory_order_release );
             }
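For context, a flat-combining container typically drives the kernel with an acquire/combine/release triple around each operation. A hedged sketch modeled on the libcds fc containers (my_record, op_enqueue and the pValue field are illustrative assumptions):

    // Illustrative container method built on the kernel:
    bool enqueue( value_type const& val )
    {
        my_record* pRec = m_FlatCombining.acquire_record();  // publish this thread's record
        pRec->pValue = &val;                                 // pass the operation argument
        m_FlatCombining.combine( op_enqueue, pRec, *this );  // may also serve other threads' requests
        assert( pRec->is_done());
        m_FlatCombining.release_record( pRec );              // mark the record empty again
        return true;
    }
    // While combining, the kernel calls back into the container's fc_apply()
    // to execute each pending request sequentially under the combiner lock.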
 
@@ -391,7 +404,7 @@ namespace cds { namespace algo {
                Some operations in flat combining containers should be called in exclusive mode,
                i.e. the current thread should become the combiner to process the operation.
                The typical example is the \p empty() function.
-                
+
                \p %invoke_exclusive() allows one to do that: the current thread becomes the combiner
                and invokes \p f exclusively, but unlike typical usage the thread does not process any pending requests.
                Instead, after the \p f call completes, the current thread wakes up a pending thread, if any.
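A hedged usage sketch of \p %invoke_exclusive() (the container shape and the m_List member are illustrative assumptions):

    // Illustrative empty(): the lambda runs while the calling thread holds the
    // combiner lock, without processing other threads' pending requests.
    bool empty()
    {
        bool bRet = false;
        m_FlatCombining.invoke_exclusive( [&]() { bRet = m_List.empty(); } );
        return bRet;
    }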
@@ -410,7 +423,7 @@ namespace cds { namespace algo {
             /// Marks \p rec as executed
             /**
                This function should be called by the container if \p batch_combine mode is used.
-                For usual combining (see \p combine() ) this function is excess.
+                For usual combining (see \p combine()) this function is redundant.
             */
             void operation_done( publication_record& rec )
             {
@@ -571,49 +584,31 @@ namespace cds { namespace algo {
             {
                // Thread exit
                // pRec, which lives in thread-local storage, must be excluded from the publication list
-                if ( pRec ) {
-                    if ( pRec->pOwner ) {
-                        // kernel is alive
-                        pRec->nState.store( removed, memory_model::memory_order_release );
-                    }
-                    else {
-                        // kernel already deleted
-                        free_publication_record( pRec );
-                    }
-                }
+                pRec->nState.store( removed, memory_model::memory_order_release );
             }
 
-            static void free_publication_record( publication_record_type* pRec )
+            void free_publication_record( publication_record_type* pRec )
             {
                 cxx11_allocator().Delete( pRec );
-            }
-
-            void init()
-            {
-                assert( m_pThreadRec.get() == nullptr );
-                publication_record_type* pRec = cxx11_allocator().New();
-                m_pHead = pRec;
-                pRec->pOwner = this;
-                m_pThreadRec.reset( pRec );
-                m_Stat.onCreatePubRecord();
+                m_Stat.onDeletePubRecord();
             }
 
             void publish( publication_record_type* pRec )
             {
                 assert( pRec->nState.load( memory_model::memory_order_relaxed ) == inactive );
 
-                pRec->nAge.store( m_nCount.load(memory_model::memory_order_relaxed), memory_model::memory_order_release );
-                pRec->nState.store( active, memory_model::memory_order_release );
+                pRec->nAge.store( m_nCount.load(memory_model::memory_order_relaxed), memory_model::memory_order_relaxed );
+                pRec->nState.store( active, memory_model::memory_order_relaxed );
 
                 // Insert record to publication list
-                if ( m_pHead != static_cast<publication_record *>(pRec) ) {
+                if ( m_pHead != static_cast<publication_record *>(pRec)) {
                     publication_record * p = m_pHead->pNext.load(memory_model::memory_order_relaxed);
                     if ( p != static_cast<publication_record *>( pRec )) {
                         do {
-                            pRec->pNext = p;
+                            pRec->pNext.store( p, memory_model::memory_order_relaxed );
                            // a failed CAS reloads p with the head's current successor
                         } while ( !m_pHead->pNext.compare_exchange_weak( p, static_cast<publication_record *>(pRec),
-                            memory_model::memory_order_release, atomics::memory_order_relaxed ));
+                            memory_model::memory_order_release, atomics::memory_order_acquire ));
                         m_Stat.onActivatePubRecord();
                     }
                 }
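A note on the insertion above: new records are linked immediately behind the head record, which is created in the constructor and never unlinked, so concurrent publishers contend only on m_pHead->pNext. The same push pattern in stand-alone form (node and field names are illustrative):

    #include <atomic>

    struct node { std::atomic<node*> next{ nullptr }; };

    // Push rec right behind a fixed head node:
    void push_behind_head( node* head, node* rec )
    {
        node* p = head->next.load( std::memory_order_acquire );
        do {
            rec->next.store( p, std::memory_order_relaxed );  // link ahead of the current second node
            // on CAS failure p is reloaded (with acquire ordering) and we retry
        } while ( !head->next.compare_exchange_weak( p, rec,
                    std::memory_order_release, std::memory_order_acquire ));
    }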
@@ -630,9 +625,9 @@ namespace cds { namespace algo {
             template <class Container>
             void try_combining( Container& owner, publication_record_type* pRec )
             {
-                if ( m_Mutex.try_lock() ) {
+                if ( m_Mutex.try_lock()) {
                     // The thread becomes a combiner
-                    lock_guard l( m_Mutex, std::adopt_lock_t() );
+                    lock_guard l( m_Mutex, std::adopt_lock_t());
 
                    // The record pRec may have been excluded from the publication list. Re-publish it
                     republish( pRec );
@@ -642,9 +637,9 @@ namespace cds { namespace algo {
                 }
                 else {
                     // There is another combiner, wait while it executes our request
-                    if ( !wait_for_combining( pRec ) ) {
+                    if ( !wait_for_combining( pRec )) {
                         // The thread becomes a combiner
-                        lock_guard l( m_Mutex, std::adopt_lock_t() );
+                        lock_guard l( m_Mutex, std::adopt_lock_t());
 
                    // The record pRec may have been excluded from the publication list. Re-publish it
                         republish( pRec );
@@ -658,9 +653,9 @@ namespace cds { namespace algo {
             template <class Container>
             void try_batch_combining( Container& owner, publication_record_type * pRec )
             {
-                if ( m_Mutex.try_lock() ) {
+                if ( m_Mutex.try_lock()) {
                     // The thread becomes a combiner
-                    lock_guard l( m_Mutex, std::adopt_lock_t() );
+                    lock_guard l( m_Mutex, std::adopt_lock_t());
 
                    // The record pRec may have been excluded from the publication list. Re-publish it
                     republish( pRec );
@@ -670,9 +665,9 @@ namespace cds { namespace algo {
                 }
                 else {
                     // There is another combiner, wait while it executes our request
-                    if ( !wait_for_combining( pRec ) ) {
+                    if ( !wait_for_combining( pRec )) {
                         // The thread becomes a combiner
-                        lock_guard l( m_Mutex, std::adopt_lock_t() );
+                        lock_guard l( m_Mutex, std::adopt_lock_t());
 
                    // The record pRec may have been excluded from the publication list. Re-publish it
                         republish( pRec );
@@ -687,7 +682,7 @@ namespace cds { namespace algo {
             void combining( Container& owner )
             {
                 // The thread is a combiner
-                assert( !m_Mutex.try_lock() );
+                assert( !m_Mutex.try_lock());
 
                 unsigned int const nCurAge = m_nCount.fetch_add( 1, memory_model::memory_order_relaxed ) + 1;
 
@@ -701,39 +696,36 @@ namespace cds { namespace algo {
                 }
 
                 m_Stat.onCombining();
-                if ( (nCurAge & m_nCompactFactor) == 0 )
+                if ( ( nCurAge & m_nCompactFactor ) == 0 )
                     compact_list( nCurAge );
             }
 
             template <class Container>
             bool combining_pass( Container& owner, unsigned int nCurAge )
             {
-                publication_record* pPrev = nullptr;
                 publication_record* p = m_pHead;
                 bool bOpDone = false;
                 while ( p ) {
                     switch ( p->nState.load( memory_model::memory_order_acquire )) {
-                        case active:
-                            if ( p->op() >= req_Operation ) {
-                                p->nAge.store( nCurAge, memory_model::memory_order_release );
-                                owner.fc_apply( static_cast<publication_record_type*>(p) );
-                                operation_done( *p );
-                                bOpDone = true;
-                            }
-                            break;
-                        case inactive:
-                            // Only m_pHead can be inactive in the publication list
-                            assert( p == m_pHead );
-                            break;
-                        case removed:
-                            // The record should be removed
-                            p = unlink_and_delete_record( pPrev, p );
-                            continue;
-                        default:
-                            /// ??? That is impossible
-                            assert(false);
+                    case active:
+                        if ( p->op() >= req_Operation ) {
+                            p->nAge.store( nCurAge, memory_model::memory_order_relaxed );
+                            owner.fc_apply( static_cast<publication_record_type*>( p ));
+                            operation_done( *p );
+                            bOpDone = true;
+                        }
+                        break;
+                    case inactive:
+                        // Only m_pHead can be inactive in the publication list
+                        assert( p == m_pHead );
+                        break;
+                    case removed:
+                        // Such a record will be removed during the compaction phase
+                        break;
+                    default:
+                        // unreachable: a publication record has no other state
+                        assert( false );
                     }
-                    pPrev = p;
                     p = p->pNext.load( memory_model::memory_order_acquire );
                 }
                 return bOpDone;
@@ -743,16 +735,16 @@ namespace cds { namespace algo {
             void batch_combining( Container& owner )
             {
                 // The thread is a combiner
-                assert( !m_Mutex.try_lock() );
+                assert( !m_Mutex.try_lock());
 
                 unsigned int const nCurAge = m_nCount.fetch_add( 1, memory_model::memory_order_relaxed ) + 1;
 
                 for ( unsigned int nPass = 0; nPass < m_nCombinePassCount; ++nPass )
-                    owner.fc_process( begin(), end() );
+                    owner.fc_process( begin(), end());
 
                 combining_pass( owner, nCurAge );
                 m_Stat.onCombining();
-                if ( (nCurAge & m_nCompactFactor) == 0 )
+                if ( ( nCurAge & m_nCompactFactor ) == 0 )
                     compact_list( nCurAge );
             }
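Both combining paths trigger compaction with the same mask test: the constructor stores cds::beans::ceil2( nCompactFactor ) - 1, a power-of-two mask, so ( nCurAge & m_nCompactFactor ) == 0 holds once every ceil2( nCompactFactor ) passes. A worked example (stand-alone, with an illustrative factor):

    #include <cassert>

    int main()
    {
        // nCompactFactor = 64 -> the constructor stores mask = ceil2( 64 ) - 1 = 63
        unsigned const mask = 63;
        unsigned compactions = 0;
        for ( unsigned age = 1; age <= 256; ++age )
            if (( age & mask ) == 0 )
                ++compactions;   // fires at ages 64, 128, 192, 256
        assert( compactions == 4 );
    }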
 
@@ -771,7 +763,7 @@ namespace cds { namespace algo {
                     if ( m_waitStrategy.wait( *this, *pRec ))
                         m_Stat.onWakeupByNotifying();
 
-                    if ( m_Mutex.try_lock() ) {
+                    if ( m_Mutex.try_lock()) {
                         if ( pRec->op( memory_model::memory_order_acquire ) == req_Response ) {
                             // Operation is done
                             m_Mutex.unlock();
@@ -790,18 +782,21 @@ namespace cds { namespace algo {
                 return true;
             }
 
-            void compact_list( unsigned int const nCurAge )
+            void compact_list( unsigned int nCurAge )
             {
-                // Thinning publication list
-                publication_record * pPrev = nullptr;
-                for ( publication_record * p = m_pHead; p; ) {
-                    if ( p->nState.load( memory_model::memory_order_acquire ) == active
-                      && p->nAge.load( memory_model::memory_order_acquire ) + m_nCompactFactor < nCurAge )
-                    {
-                        if ( pPrev ) {
-                            publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
+                // Compacts the publication list.
+                // Called only by the combiner thread, so records are unlinked by a single thread at a time
+
+            try_again:
+                publication_record * pPrev = m_pHead;
+                for ( publication_record * p = pPrev->pNext.load( memory_model::memory_order_acquire ); p; ) {
+                    switch ( p->nState.load( memory_model::memory_order_relaxed )) {
+                    case active:
+                        if ( p->nAge.load( memory_model::memory_order_relaxed ) + m_nCompactFactor < nCurAge )
+                        {
+                            publication_record * pNext = p->pNext.load( memory_model::memory_order_relaxed );
                             if ( pPrev->pNext.compare_exchange_strong( p, pNext,
-                                memory_model::memory_order_release, atomics::memory_order_relaxed ))
+                                memory_model::memory_order_acquire, atomics::memory_order_relaxed ))
                             {
                                 p->nState.store( inactive, memory_model::memory_order_release );
                                 p = pNext;
@@ -809,32 +804,41 @@ namespace cds { namespace algo {
                                 continue;
                             }
                         }
+                        break;
+
+                    case removed:
+                        publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
+                        if ( cds_likely( pPrev->pNext.compare_exchange_strong( p, pNext, memory_model::memory_order_acquire, atomics::memory_order_relaxed ))) {
+                            p = pNext;
+                            continue;
+                        }
+                        else {
+                            // the CAS can fail only at the beginning of the list,
+                            // where publish() concurrently inserts new records after m_pHead
+                            assert( pPrev == m_pHead );
+                            goto try_again;
+                        }
                     }
                     pPrev = p;
                     p = p->pNext.load( memory_model::memory_order_acquire );
                 }
 
-                m_Stat.onCompactPublicationList();
-            }
-
-            publication_record * unlink_and_delete_record( publication_record * pPrev, publication_record * p )
-            {
-                if ( pPrev ) {
-                    publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
-                    if ( pPrev->pNext.compare_exchange_strong( p, pNext,
-                        memory_model::memory_order_release, atomics::memory_order_relaxed ))
-                    {
-                        free_publication_record( static_cast<publication_record_type *>( p ));
-                        m_Stat.onDeletePubRecord();
+                // Iterate over the allocated list and free records marked as removed
+                pPrev = m_pAllocatedHead;
+                for ( publication_record * p = pPrev->pNextAllocated.load( memory_model::memory_order_acquire ); p; ) {
+                    if ( p->nState.load( memory_model::memory_order_relaxed ) == removed ) {
+                        publication_record * pNext = p->pNextAllocated.load( memory_model::memory_order_relaxed );
+                        if ( pPrev->pNextAllocated.compare_exchange_strong( p, pNext, memory_model::memory_order_acquire, atomics::memory_order_relaxed )) {
+                            free_publication_record( static_cast<publication_record_type *>( p ));
+                            p = pNext;
+                            continue;
+                        }
                     }
-                    return pNext;
-                }
-                else {
-                    m_pHead = static_cast<publication_record_type *>( p->pNext.load( memory_model::memory_order_acquire ));
-                    free_publication_record( static_cast<publication_record_type *>( p ));
-                    m_Stat.onDeletePubRecord();
-                    return m_pHead;
+
+                    pPrev = p;
+                    p = p->pNextAllocated.load( memory_model::memory_order_relaxed );
                 }
+
+                m_Stat.onCompactPublicationList();
             }
             //@endcond
         };
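Finally, a note on the wait strategy used by wait_for_combining(): wait() returns true when the thread was woken by a notification (counted by onWakeupByNotifying()), and false when the caller should keep spinning. A minimal pass-through strategy sketch, loosely modeled on the empty strategy; treat the exact member set as an assumption rather than the authoritative interface:

    // A do-nothing wait strategy: wait() always returns false ("not woken by
    // notify"), so the kernel falls back to its own spin loop.
    struct my_spin_strategy {
        template <typename PublicationRecord>
        struct make_publication_record {
            typedef PublicationRecord type;   // no extra per-record fields needed
        };

        template <typename PublicationRecord>
        void prepare( PublicationRecord& ) {}   // called before an operation starts

        template <typename FCKernel, typename PublicationRecord>
        bool wait( FCKernel&, PublicationRecord& ) { return false; }

        template <typename FCKernel, typename PublicationRecord>
        void notify( FCKernel&, PublicationRecord& ) {}   // nothing to wake

        template <typename FCKernel>
        void wakeup( FCKernel& ) {}   // broadcast hook invoked when the combiner finishes
    };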