From a865fd850a706e165a6556b91b38f19363451a02 Mon Sep 17 00:00:00 2001 From: khizmax Date: Thu, 21 May 2015 08:59:12 +0300 Subject: [PATCH] Fixed MichaelList removal bug (to be continued) --- cds/intrusive/michael_list_rcu.h | 184 +++++++++++++++++++++++-------- 1 file changed, 141 insertions(+), 43 deletions(-) diff --git a/cds/intrusive/michael_list_rcu.h b/cds/intrusive/michael_list_rcu.h index 25f9ce58..54083229 100644 --- a/cds/intrusive/michael_list_rcu.h +++ b/cds/intrusive/michael_list_rcu.h @@ -11,6 +11,30 @@ namespace cds { namespace intrusive { + //@cond + namespace michael_list { + + /// Node specialization for uRCU + template + struct node< cds::urcu::gc< RCU >, Tag > + { + typedef cds::urcu::gc< RCU > gc; ///< Garbage collector + typedef Tag tag; ///< tag + + typedef cds::details::marked_ptr marked_ptr ; ///< marked pointer + typedef typename gc::template atomic_marked_ptr< marked_ptr> atomic_marked_ptr ; ///< atomic marked pointer specific for GC + + atomic_marked_ptr m_pNext ; ///< pointer to the next node in the container + node * m_pDelChain; ///< Deleted node chain (local for a thread) + + CDS_CONSTEXPR node() CDS_NOEXCEPT + : m_pNext( nullptr ) + , m_pDelChain( nullptr ) + {} + }; + } // namespace michael_list + //@endcond + /// Michael's lock-free ordered single-linked list (template specialization for \ref cds_urcu_desc "RCU") /** @ingroup cds_intrusive_list \anchor cds_intrusive_MichaelList_rcu @@ -99,6 +123,26 @@ namespace cds { namespace intrusive { atomic_node_ptr * pPrev ; ///< Previous node node_type * pCur ; ///< Current node node_type * pNext ; ///< Next node + + atomic_node_ptr& refHead; + node_type * pDelChain; ///< Head of deleted node chain + + position( atomic_node_ptr& head ) + : refHead( head ) + , pDelChain( nullptr ) + {} + +# ifdef _DEBUG + ~position() + { + assert( pDelChain == nullptr ); + } +# endif + }; + + enum { + erase_mask = 1, + extract_mask = 3 }; typedef cds::urcu::details::check_deadlock_policy< gc, rcu_check_deadlock> check_deadlock_policy; @@ -106,6 +150,7 @@ namespace cds { namespace intrusive { static void clear_links( node_type * pNode ) { pNode->m_pNext.store( marked_node_ptr(), memory_model::memory_order_relaxed ); + pNode->m_pDelChain = nullptr; } struct clear_and_dispose { @@ -142,16 +187,47 @@ namespace cds { namespace intrusive { return pos.pPrev->compare_exchange_strong( p, marked_node_ptr(pNode), memory_model::memory_order_release, atomics::memory_order_relaxed ); } - bool unlink_node( position& pos ) + static void link_to_remove_chain( position& pos, node_type * pDel ) + { + assert( pDel->m_pDelChain == nullptr ); + + pDel->m_pDelChain = pos.pDelChain; + pos.pDelChain = pDel; + } + + static void free_node_chain( position& pos ) + { + assert( !gc::is_locked() ); + + node_type * p = pos.pDelChain; + if ( p ) { + pos.pDelChain = nullptr; + while ( p ) { + node_type * pNext = p->m_pDelChain; + dispose_node( p ); + p = pNext; + } + } + } + + bool unlink_node( position& pos, bool bExtract ) { - // Mark the node (logical deleting) + // Mark the node (logical deletion) marked_node_ptr next(pos.pNext, 0); - if ( pos.pCur->m_pNext.compare_exchange_strong( next, marked_node_ptr(pos.pNext, 1), memory_model::memory_order_release, atomics::memory_order_relaxed )) { + + int const nMask = bExtract ? extract_mask : erase_mask; + if ( pos.pCur->m_pNext.compare_exchange_strong( next, next | nMask, memory_model::memory_order_acq_rel, atomics::memory_order_relaxed )) { + + // Try physical removal marked_node_ptr cur(pos.pCur); - if ( pos.pPrev->compare_exchange_strong( cur, marked_node_ptr( pos.pNext ), memory_model::memory_order_acquire, atomics::memory_order_relaxed )) - return true; - next |= 1; - CDS_VERIFY( pos.pCur->m_pNext.compare_exchange_strong( next, next ^ 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed )); + if ( pos.pPrev->compare_exchange_strong(cur, marked_node_ptr(pos.pNext), memory_model::memory_order_acquire, atomics::memory_order_relaxed) ) { + if ( !bExtract ) + link_to_remove_chain( pos, pos.pCur ); + } + else { + search( pos.refHead, *node_traits::to_value_ptr( pos.pCur ), pos, key_comparator() ); + } + return true; } return false; } @@ -679,8 +755,8 @@ namespace cds { namespace intrusive { RCU \p synchronize method can be called. Note that depending on RCU type used the \ref disposer invocation can be deferred. - The function can throw cds::urcu::rcu_deadlock exception if an deadlock is encountered and - deadlock checking policy is opt::v::rcu_throw_deadlock. + The function can throw \p cds::urcu::rcu_deadlock exception if an deadlock is encountered and + deadlock checking policy is \p opt::v::rcu_throw_deadlock. */ void clear() { @@ -691,7 +767,7 @@ namespace cds { namespace intrusive { for (;;) { { rcu_lock l; - pHead = m_pHead.load(memory_model::memory_order_consume); + pHead = m_pHead.load(memory_model::memory_order_acquire); if ( !pHead.ptr() ) break; marked_node_ptr pNext( pHead->m_pNext.load(memory_model::memory_order_relaxed) ); @@ -745,12 +821,20 @@ namespace cds { namespace intrusive { return insert_at( refHead, *node_traits::to_value_ptr( pNode ) ); } - bool insert_at( atomic_node_ptr& refHead, value_type& val, bool bLock = true ) + bool insert_at( atomic_node_ptr& refHead, value_type& val ) { + rcu_lock l; + return insert_at_locked( refHead, val ); + } + + bool insert_at_locked( atomic_node_ptr& refHead, value_type& val ) + { + // RCU lock should be locked!!! + assert( gc::is_locked() ); + link_checker::is_empty( node_traits::to_node_ptr( val ) ); - position pos; + position pos( refHead ); - rcu_lock l( bLock ); while ( true ) { if ( search( refHead, val, pos, key_comparator() ) ) return false; @@ -769,7 +853,7 @@ namespace cds { namespace intrusive { bool insert_at( atomic_node_ptr& refHead, value_type& val, Func f ) { link_checker::is_empty( node_traits::to_node_ptr( val ) ); - position pos; + position pos( refHead ); rcu_lock l; while ( true ) { @@ -787,20 +871,29 @@ namespace cds { namespace intrusive { } } - iterator insert_at_( atomic_node_ptr& refHead, value_type& val, bool bLock = true ) + iterator insert_at_( atomic_node_ptr& refHead, value_type& val ) { - rcu_lock l( bLock ); - if ( insert_at( refHead, val, false )) + rcu_lock l; + if ( insert_at_locked( refHead, val )) return iterator( node_traits::to_node_ptr( val )); return end(); } template - std::pair ensure_at_( atomic_node_ptr& refHead, value_type& val, Func func, bool bLock = true ) + std::pair ensure_at_( atomic_node_ptr& refHead, value_type& val, Func func ) { - position pos; + rcu_lock l; + return ensure_at_locked( refHead, val, func ); + } + + template + std::pair ensure_at_locked( atomic_node_ptr& refHead, value_type& val, Func func ) + { + position pos( refHead ); + + // RCU lock should be locked!!! + assert( gc::is_locked() ); - rcu_lock l( bLock ); while ( true ) { if ( search( refHead, val, pos, key_comparator() ) ) { assert( key_comparator()( val, *node_traits::to_value_ptr( *pos.pCur ) ) == 0 ); @@ -824,16 +917,16 @@ namespace cds { namespace intrusive { } template - std::pair ensure_at( atomic_node_ptr& refHead, value_type& val, Func func, bool bLock = true ) + std::pair ensure_at( atomic_node_ptr& refHead, value_type& val, Func func ) { - rcu_lock l( bLock ); - std::pair ret = ensure_at_( refHead, val, func, false ); + rcu_lock l; + std::pair ret = ensure_at_locked( refHead, val, func ); return std::make_pair( ret.first != end(), ret.second ); } bool unlink_at( atomic_node_ptr& refHead, value_type& val ) { - position pos; + position pos( refHead ); back_off bkoff; check_deadlock_policy::check(); @@ -842,14 +935,14 @@ namespace cds { namespace intrusive { rcu_lock l; if ( !search( refHead, val, pos, key_comparator() ) || node_traits::to_value_ptr( *pos.pCur ) != &val ) return false; - if ( !unlink_node( pos )) { + if ( !unlink_node( pos, false )) { bkoff(); continue; } } --m_ItemCounter; - dispose_node( pos.pCur ); + free_node_chain( pos ); return true; } } @@ -865,7 +958,7 @@ namespace cds { namespace intrusive { rcu_lock l; if ( !search( refHead, val, pos, cmp ) ) return false; - if ( !unlink_node( pos )) { + if ( !unlink_node( pos, false )) { bkoff(); continue; } @@ -873,7 +966,7 @@ namespace cds { namespace intrusive { f( *node_traits::to_value_ptr( *pos.pCur ) ); --m_ItemCounter; - dispose_node( pos.pCur ); + free_node_chain( pos ); return true; } } @@ -881,28 +974,28 @@ namespace cds { namespace intrusive { template bool erase_at( atomic_node_ptr& refHead, Q const& val, Compare cmp, Func f ) { - position pos; + position pos( refHead ); return erase_at( refHead, val, cmp, f, pos ); } template bool erase_at( atomic_node_ptr& refHead, const Q& val, Compare cmp ) { - position pos; + position pos( refHead ); return erase_at( refHead, val, cmp, [](value_type const&){}, pos ); } template value_type * extract_at( atomic_node_ptr& refHead, Q const& val, Compare cmp ) { - position pos; + position pos( refHead ); back_off bkoff; assert( gc::is_locked() ) ; // RCU must be locked!!! for (;;) { if ( !search( refHead, val, pos, cmp ) ) return nullptr; - if ( !unlink_node( pos )) { + if ( !unlink_node( pos, true )) { bkoff(); continue; } @@ -913,11 +1006,11 @@ namespace cds { namespace intrusive { } template - bool find_at( atomic_node_ptr& refHead, Q& val, Compare cmp, Func f, bool bLock = true ) const + bool find_at( atomic_node_ptr& refHead, Q& val, Compare cmp, Func f ) const { - position pos; + position pos( refHead ); - rcu_lock l( bLock ); + rcu_lock l; if ( search( refHead, val, pos, cmp ) ) { assert( pos.pCur != nullptr ); f( *node_traits::to_value_ptr( *pos.pCur ), val ); @@ -946,7 +1039,7 @@ namespace cds { namespace intrusive { const_iterator find_at_( atomic_node_ptr& refHead, Q const& val, Compare cmp ) const { assert( gc::is_locked() ); - position pos; + position pos( refHead ); if ( search( refHead, val, pos, cmp ) ) { assert( pos.pCur != nullptr ); @@ -961,7 +1054,7 @@ namespace cds { namespace intrusive { //@cond template - bool search( atomic_node_ptr& refHead, const Q& val, position& pos, Compare cmp ) const + bool search( atomic_node_ptr& refHead, const Q& val, position& pos, Compare cmp ) { // RCU lock should be locked!!! assert( gc::is_locked() ); @@ -988,17 +1081,22 @@ namespace cds { namespace intrusive { pNext = pCur->m_pNext.load(memory_model::memory_order_acquire); if ( pPrev->load(memory_model::memory_order_relaxed) != pCur - || pNext != pCur->m_pNext.load(memory_model::memory_order_relaxed) - || pNext.bits() != 0 ) // pNext contains deletion mark for pCur + || pNext != pCur->m_pNext.load(memory_model::memory_order_relaxed)) { - // if pCur is marked as deleted (pNext.bits() != 0) - // we wait for physical removal. - // Helping technique is not suitable for RCU since it requires - // that the RCU should be in unlocking state. bkoff(); goto try_again; } + if ( pNext.bits() ) { + // pCur is marked as deleted. Try to unlink it from the list + if ( pPrev->compare_exchange_weak( pCur, marked_node_ptr( pNext.ptr() ), memory_model::memory_order_acquire, atomics::memory_order_relaxed ) ) { + if ( pNext.bits() == erase_mask ) + link_to_remove_chain( pos, pCur.ptr() ); + } + + goto try_again; + } + assert( pCur.ptr() != nullptr ); int nCmp = cmp( *node_traits::to_value_ptr( pCur.ptr() ), val ); if ( nCmp >= 0 ) { -- 2.34.1