X-Git-Url: http://demsky.eecs.uci.edu/git/?a=blobdiff_plain;f=cds%2Fintrusive%2Fskip_list_rcu.h;h=dfe817171bb47ee731079041cfc4013e7cfa7121;hb=40e34e6d0b104b6f5aff506ad67d43fd410e52bc;hp=e5d17ee82923d526ee1222d581d7867b508dd89d;hpb=1503681e52c392bee2329cf66c910be0f8640105;p=libcds.git diff --git a/cds/intrusive/skip_list_rcu.h b/cds/intrusive/skip_list_rcu.h index e5d17ee8..dfe81717 100644 --- a/cds/intrusive/skip_list_rcu.h +++ b/cds/intrusive/skip_list_rcu.h @@ -1,17 +1,45 @@ -//$$CDS-header$$ +/* + This file is a part of libcds - Concurrent Data Structures library -#ifndef __CDS_INTRUSIVE_SKIP_LIST_RCU_H -#define __CDS_INTRUSIVE_SKIP_LIST_RCU_H + (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017 + + Source code repo: http://github.com/khizmax/libcds/ + Download: http://sourceforge.net/projects/libcds/files/ + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + +#ifndef CDSLIB_INTRUSIVE_SKIP_LIST_RCU_H +#define CDSLIB_INTRUSIVE_SKIP_LIST_RCU_H #include #include -#include // ref #include #include #include #include #include - +#include +#include namespace cds { namespace intrusive { @@ -28,42 +56,29 @@ namespace cds { namespace intrusive { // Mark bits: // bit 0 - the item is logically deleted // bit 1 - the item is extracted (only for level 0) - typedef cds::details::marked_ptr marked_ptr ; ///< marked pointer - typedef atomics::atomic< marked_ptr > atomic_marked_ptr ; ///< atomic marked pointer - typedef atomic_marked_ptr tower_item_type; + typedef cds::details::marked_ptr marked_ptr; ///< marked pointer + typedef atomics::atomic< marked_ptr > atomic_marked_ptr; ///< atomic marked pointer + typedef atomic_marked_ptr tower_item_type; protected: - atomic_marked_ptr m_pNext ; ///< Next item in bottom-list (list at level 0) + atomic_marked_ptr m_pNext; ///< Next item in bottom-list (list at level 0) public: - node * m_pDelChain ; ///< Deleted node chain (local for a thread) -# ifdef _DEBUG - bool volatile m_bLinked; - bool volatile m_bUnlinked; -# endif + node * m_pDelChain; ///< Deleted node chain (local for a thread) protected: - unsigned int m_nHeight ; ///< Node height (size of m_arrNext array). For node at level 0 the height is 1. - atomic_marked_ptr * m_arrNext ; ///< Array of next items for levels 1 .. m_nHeight - 1. For node at level 0 \p m_arrNext is \p nullptr + unsigned int m_nHeight; ///< Node height (size of m_arrNext array). For node at level 0 the height is 1. + atomic_marked_ptr * m_arrNext; ///< Array of next items for levels 1 .. m_nHeight - 1. For node at level 0 \p m_arrNext is \p nullptr + atomics::atomic m_nUnlink; ///< Unlink helper public: /// Constructs a node of height 1 (a bottom-list node) - node() + CDS_CONSTEXPR node() : m_pNext( nullptr ) , m_pDelChain( nullptr ) -# ifdef _DEBUG - , m_bLinked( false ) - , m_bUnlinked( false ) -# endif , m_nHeight(1) , m_arrNext( nullptr ) + , m_nUnlink(1) {} -# ifdef _DEBUG - ~node() - { - assert( !m_bLinked || m_bUnlinked ); - } -# endif - /// Constructs a node of height \p nHeight void make_tower( unsigned int nHeight, atomic_marked_ptr * nextTower ) { @@ -74,6 +89,7 @@ namespace cds { namespace intrusive { m_arrNext = nextTower; m_nHeight = nHeight; + m_nUnlink.store( nHeight, atomics::memory_order_release ); } atomic_marked_ptr * release_tower() @@ -98,19 +114,35 @@ namespace cds { namespace intrusive { /// Access to element of next pointer array atomic_marked_ptr& next( unsigned int nLevel ) { - assert( nLevel < height() ); - assert( nLevel == 0 || (nLevel > 0 && m_arrNext != nullptr) ); - + assert( nLevel < height()); + assert( nLevel == 0 || (nLevel > 0 && m_arrNext != nullptr)); + +# ifdef CDS_THREAD_SANITIZER_ENABLED + // TSan false positive: m_arrNext is read-only array + CDS_TSAN_ANNOTATE_IGNORE_READS_BEGIN; + atomic_marked_ptr& r = nLevel ? m_arrNext[ nLevel - 1] : m_pNext; + CDS_TSAN_ANNOTATE_IGNORE_READS_END; + return r; +# else return nLevel ? m_arrNext[ nLevel - 1] : m_pNext; +# endif } /// Access to element of next pointer array (const version) atomic_marked_ptr const& next( unsigned int nLevel ) const { - assert( nLevel < height() ); + assert( nLevel < height()); assert( nLevel == 0 || nLevel > 0 && m_arrNext != nullptr ); +# ifdef CDS_THREAD_SANITIZER_ENABLED + // TSan false positive: m_arrNext is read-only array + CDS_TSAN_ANNOTATE_IGNORE_READS_BEGIN; + atomic_marked_ptr& r = nLevel ? m_arrNext[ nLevel - 1] : m_pNext; + CDS_TSAN_ANNOTATE_IGNORE_READS_END; + return r; +# else return nLevel ? m_arrNext[ nLevel - 1] : m_pNext; +# endif } /// Access to element of next pointer array (same as \ref next function) @@ -145,6 +177,16 @@ namespace cds { namespace intrusive { && m_arrNext == nullptr && m_nHeight <= 1; } + + bool level_unlinked( unsigned nCount = 1 ) + { + return m_nUnlink.fetch_sub( nCount, std::memory_order_relaxed ) == 1; + } + + bool is_upper_level( unsigned nLevel ) const + { + return m_nUnlink.load( atomics::memory_order_relaxed ) == nLevel + 1; + } }; } // namespace skip_list //@endcond @@ -174,13 +216,10 @@ namespace cds { namespace intrusive { protected: void next() { - // RCU should be locked before iterating!!! - assert( gc::is_locked() ); - back_off bkoff; for (;;) { - if ( m_pNode->next( m_pNode->height() - 1 ).load( atomics::memory_order_acquire ).bits() ) { + if ( m_pNode->next( m_pNode->height() - 1 ).load( atomics::memory_order_acquire ).bits()) { // Current node is marked as deleted. So, its next pointer can point to anything // In this case we interrupt our iteration and returns end() iterator. *this = iterator(); @@ -189,12 +228,12 @@ namespace cds { namespace intrusive { marked_ptr p = m_pNode->next(0).load( atomics::memory_order_relaxed ); node_type * pp = p.ptr(); - if ( p.bits() ) { + if ( p.bits()) { // p is marked as deleted. Spin waiting for physical removal bkoff(); continue; } - else if ( pp && pp->next( pp->height() - 1 ).load( atomics::memory_order_relaxed ).bits() ) { + else if ( pp && pp->next( pp->height() - 1 ).load( atomics::memory_order_relaxed ).bits()) { // p is marked as deleted. Spin waiting for physical removal bkoff(); continue; @@ -209,21 +248,18 @@ namespace cds { namespace intrusive { iterator( node_type& refHead ) : m_pNode( nullptr ) { - // RCU should be locked before iterating!!! - assert( gc::is_locked() ); - back_off bkoff; for (;;) { marked_ptr p = refHead.next(0).load( atomics::memory_order_relaxed ); - if ( !p.ptr() ) { + if ( !p.ptr()) { // empty skip-list break; } node_type * pp = p.ptr(); // Logically deleted node is marked from highest level - if ( !pp->next( pp->height() - 1 ).load( atomics::memory_order_acquire ).bits() ) { + if ( !pp->next( pp->height() - 1 ).load( atomics::memory_order_acquire ).bits()) { m_pNode = pp; break; } @@ -235,17 +271,11 @@ namespace cds { namespace intrusive { public: iterator() : m_pNode( nullptr ) - { - // RCU should be locked before iterating!!! - assert( gc::is_locked() ); - } + {} iterator( iterator const& s) : m_pNode( s.m_pNode ) - { - // RCU should be locked before iterating!!! - assert( gc::is_locked() ); - } + {} value_type * operator ->() const { @@ -313,39 +343,14 @@ namespace cds { namespace intrusive { The lock-free variant of skip-list is implemented according to book - [2008] M.Herlihy, N.Shavit "The Art of Multiprocessor Programming", chapter 14.4 "A Lock-Free Concurrent Skiplist". - \note The algorithm described in this book cannot be directly adapted for C++ (roughly speaking, - the algo contains a lot of bugs). The \b libcds implementation applies the approach discovered - by M.Michael in his \ref cds_intrusive_MichaelList_hp "lock-free linked list". Template arguments: - \p RCU - one of \ref cds_urcu_gc "RCU type" - \p T - type to be stored in the list. The type must be based on \p skip_list::node (for \p skip_list::base_hook) or it must have a member of type \p skip_list::node (for \p skip_list::member_hook). - - \p Traits - type traits. See \p skip_list::type_traits (the default) for explanation. - - It is possible to declare option-based list with \p cds::intrusive::skip_list::make_traits metafunction instead of \p Traits template - argument. - Template argument list \p Options of \p %cds::intrusive::skip_list::make_traits metafunction is: - - \p opt::hook - hook used. Possible values are: \p skip_list::base_hook, \p skip_list::member_hook, \p skip_list::traits_hook. - If the option is not specified, skip_list::base_hook<> is used. - - \p opt::compare - key comparison functor. No default functor is provided. - If the option is not specified, the \p opt::less is used. - - \p opt::less - specifies binary predicate used for key comparison. Default is \p std::less. - - \p opt::disposer - the functor used for dispose removed items. Default is \p opt::v::empty_disposer. Due the nature - of GC schema the disposer may be called asynchronously. - - \p opt::item_counter - the type of item counting feature. Default is \p atomicity::empty_item_counter that is no item counting. - - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default) - or \p opt::v::sequential_consistent (sequentially consisnent memory model). - - \p skip_list::random_level_generator - random level generator. Can be \p skip_list::xorshift, \p skip_list::turbo_pascal or - user-provided one. See \p skip_list::random_level_generator option description for explanation. - Default is \p %skip_list::turbo_pascal. - - \p opt::allocator - although the skip-list is an intrusive container, - an allocator should be provided to maintain variable randomly-calculated height of the node - since the node can contain up to 32 next pointers. The allocator option is used to allocate an array of next pointers - for nodes which height is more than 1. Default is \ref CDS_DEFAULT_ALLOCATOR. - - \p opt::back_off - back-off strategy used. If the option is not specified, the \p cds::backoff::Default is used. - - \p opt::stat - internal statistics. Available types: \p skip_list::stat, \p skip_list::empty_stat (the default) - - \p opt::rcu_check_deadlock - a deadlock checking policy. Default is \p opt::v::rcu_throw_deadlock + - \p Traits - set traits, default is \p skip_list::traits + It is possible to declare option-based list with \p cds::intrusive::skip_list::make_traits metafunction + instead of \p Traits template argument. @note Before including you should include appropriate RCU header file, see \ref cds_urcu_gc "RCU type" for list of existing RCU class and corresponding header files. @@ -378,7 +383,7 @@ namespace cds { namespace intrusive { // Traits for your skip-list. // At least, you should define cds::opt::less or cds::opt::compare for Foo struct - struct my_traits: public cds::intrusive::skip_list::type_traits + struct my_traits: public cds::intrusive::skip_list::traits { // ... }; @@ -428,8 +433,8 @@ namespace cds { namespace intrusive { How to use You should incorporate skip_list::node into your struct \p T and provide - appropriate skip_list::type_traits::hook in your \p Traits template parameters. Usually, for \p Traits you - define a struct based on \p skip_list::type_traits. + appropriate skip_list::traits::hook in your \p Traits template parameters. Usually, for \p Traits you + define a struct based on \p skip_list::traits. Example for cds::urcu::general_buffered<> RCU and base hook: \code @@ -470,9 +475,8 @@ namespace cds { namespace intrusive { } }; - - // Declare type_traits - struct my_traits: public cds::intrusive::skip_list::type_traits + // Declare traits + struct my_traits: public cds::intrusive::skip_list::traits { typedef cds::intrusive::skip_list::base_hook< cds::opt::gc< rcu_type > > hook; typedef my_data_cmp compare; @@ -511,7 +515,7 @@ namespace cds { namespace intrusive { class RCU ,typename T #ifdef CDS_DOXYGEN_INVOKED - ,typename Traits = skip_list::type_traits + ,typename Traits = skip_list::traits #else ,typename Traits #endif @@ -519,31 +523,31 @@ namespace cds { namespace intrusive { class SkipListSet< cds::urcu::gc< RCU >, T, Traits > { public: - typedef T value_type ; ///< type of value stored in the skip-list - typedef Traits options ; ///< Traits template parameter + typedef cds::urcu::gc< RCU > gc; ///< Garbage collector + typedef T value_type; ///< type of value stored in the skip-list + typedef Traits traits; ///< Traits template parameter - typedef typename options::hook hook ; ///< hook type - typedef typename hook::node_type node_type ; ///< node type + typedef typename traits::hook hook; ///< hook type + typedef typename hook::node_type node_type; ///< node type # ifdef CDS_DOXYGEN_INVOKED - typedef implementation_defined key_comparator ; ///< key comparison functor based on opt::compare and opt::less option setter. + typedef implementation_defined key_comparator ; ///< key comparison functor based on \p Traits::compare and \p Traits::less # else - typedef typename opt::details::make_comparator< value_type, options >::type key_comparator; + typedef typename opt::details::make_comparator< value_type, traits >::type key_comparator; # endif - typedef typename options::disposer disposer ; ///< disposer used - typedef typename get_node_traits< value_type, node_type, hook>::type node_traits ; ///< node traits + typedef typename traits::disposer disposer; ///< disposer + typedef typename get_node_traits< value_type, node_type, hook>::type node_traits; ///< node traits - typedef cds::urcu::gc< RCU > gc ; ///< Garbage collector - typedef typename options::item_counter item_counter; ///< Item counting policy used - typedef typename options::memory_model memory_model; ///< Memory ordering. See cds::opt::memory_model option - typedef typename options::random_level_generator random_level_generator ; ///< random level generator - typedef typename options::allocator allocator_type ; ///< allocator for maintaining array of next pointers of the node - typedef typename options::back_off back_off ; ///< Back-off trategy - typedef typename options::stat stat ; ///< internal statistics type - typedef typename options::rcu_check_deadlock rcu_check_deadlock ; ///< Deadlock checking policy - typedef typename gc::scoped_lock rcu_lock ; ///< RCU scoped lock - static CDS_CONSTEXPR_CONST bool c_bExtractLockExternal = false; ///< Group of \p extract_xxx functions does not require external locking + typedef typename traits::item_counter item_counter; ///< Item counting policy used + typedef typename traits::memory_model memory_model; ///< Memory ordering, see \p cds::opt::memory_model option + typedef typename traits::random_level_generator random_level_generator; ///< random level generator + typedef typename traits::allocator allocator_type; ///< allocator for maintaining array of next pointers of the node + typedef typename traits::back_off back_off; ///< Back-off strategy + typedef typename traits::stat stat; ///< internal statistics type + typedef typename traits::rcu_check_deadlock rcu_check_deadlock; ///< Deadlock checking policy + typedef typename gc::scoped_lock rcu_lock; ///< RCU scoped lock + static CDS_CONSTEXPR const bool c_bExtractLockExternal = false; ///< Group of \p extract_xxx functions does not require external locking /// Max node height. The actual node height should be in range [0 .. c_nMaxHeight) @@ -570,13 +574,46 @@ namespace cds { namespace intrusive { typedef skip_list::details::intrusive_node_builder< node_type, atomic_node_ptr, allocator_type > intrusive_node_builder; typedef typename std::conditional< - std::is_same< typename options::internal_node_builder, cds::opt::none >::value + std::is_same< typename traits::internal_node_builder, cds::opt::none >::value ,intrusive_node_builder - ,typename options::internal_node_builder + ,typename traits::internal_node_builder >::type node_builder; typedef std::unique_ptr< node_type, typename node_builder::node_disposer > scoped_node_ptr; + static void dispose_node( value_type * pVal ) + { + assert( pVal ); + + typename node_builder::node_disposer()( node_traits::to_node_ptr(pVal)); + disposer()( pVal ); + } + + struct node_disposer + { + void operator()( value_type * pVal ) + { + dispose_node( pVal ); + } + }; + + static void dispose_chain( node_type * pChain ) + { + if ( pChain ) { + assert( !gc::is_locked()); + + auto f = [&pChain]() -> cds::urcu::retired_ptr { + node_type * p = pChain; + if ( p ) { + pChain = p->m_pDelChain; + return cds::urcu::make_retired_ptr( node_traits::to_value_ptr( p )); + } + return cds::urcu::make_retired_ptr( static_cast(nullptr)); + }; + gc::batch_retire(std::ref(f)); + } + } + struct position { node_type * pPrev[ c_nMaxHeight ]; node_type * pSucc[ c_nMaxHeight ]; @@ -588,25 +625,33 @@ namespace cds { namespace intrusive { position() : pDelChain( nullptr ) {} -# ifdef _DEBUG + ~position() { - assert( pDelChain == nullptr ); + dispose_chain( pDelChain ); + } + + void dispose( node_type * p ) + { + assert( p != nullptr ); + assert( p->m_pDelChain == nullptr ); + + p->m_pDelChain = pDelChain; + pDelChain = p; } -# endif }; typedef cds::urcu::details::check_deadlock_policy< gc, rcu_check_deadlock> check_deadlock_policy; //@endcond protected: - skip_list::details::head_node< node_type > m_Head ; ///< head tower (max height) + skip_list::details::head_node< node_type > m_Head; ///< head tower (max height) - item_counter m_ItemCounter ; ///< item counter - random_level_generator m_RandomLevelGen ; ///< random level generator instance - atomics::atomic m_nHeight ; ///< estimated high level + item_counter m_ItemCounter; ///< item counter + random_level_generator m_RandomLevelGen; ///< random level generator instance + atomics::atomic m_nHeight; ///< estimated high level atomics::atomic m_pDeferredDelChain ; ///< Deferred deleted node chain - mutable stat m_Stat ; ///< internal statistics + mutable stat m_Stat; ///< internal statistics protected: //@cond @@ -622,475 +667,304 @@ namespace cds { namespace intrusive { { return node_builder::make_tower( v, m_RandomLevelGen ); } + //@endcond - static void dispose_node( value_type * pVal ) - { - assert( pVal ); - - typename node_builder::node_disposer()( node_traits::to_node_ptr(pVal) ); - disposer()( pVal ); - } + public: + using exempt_ptr = cds::urcu::exempt_ptr< gc, value_type, value_type, node_disposer, void >; ///< pointer to extracted node - struct node_disposer - { - void operator()( value_type * pVal ) + private: + //@cond + struct chain_disposer { + void operator()( node_type * pChain ) const { - dispose_node( pVal ); + dispose_chain( pChain ); } }; + typedef cds::intrusive::details::raw_ptr_disposer< gc, node_type, chain_disposer> raw_ptr_disposer; //@endcond public: - typedef cds::urcu::exempt_ptr< gc, value_type, value_type, node_disposer, void > exempt_ptr ; ///< pointer to extracted node - - protected: - //@cond + /// Result of \p get(), \p get_with() functions - pointer to the node found + typedef cds::urcu::raw_ptr< gc, value_type, raw_ptr_disposer > raw_ptr; - bool is_extracted( marked_node_ptr const p ) const + public: + /// Default constructor + SkipListSet() + : m_Head( c_nMaxHeight ) + , m_nHeight( c_nMinHeight ) + , m_pDeferredDelChain( nullptr ) { - return (p.bits() & 2) != 0; + static_assert( (std::is_same< gc, typename node_type::gc >::value), "GC and node_type::gc must be the same type" ); + + // Barrier for head node + atomics::atomic_thread_fence( memory_model::memory_order_release ); } - template - bool find_position( Q const& val, position& pos, Compare cmp, bool bStopIfFound ) + /// Clears and destructs the skip-list + ~SkipListSet() { - assert( gc::is_locked() ); - - node_type * pPred; - marked_node_ptr pSucc; - marked_node_ptr pCur; - int nCmp = 1; - - retry: - pPred = m_Head.head(); - - for ( int nLevel = static_cast(c_nMaxHeight - 1); nLevel >= 0; --nLevel ) { - - while ( true ) { - pCur = pPred->next( nLevel ).load( memory_model::memory_order_relaxed ); - if ( pCur.bits() ) { - // pCur.bits() means that pPred is logically deleted - goto retry; - } - - if ( pCur.ptr() == nullptr ) { - // end of the list at level nLevel - goto next level - break; - } - - // pSucc contains deletion mark for pCur - pSucc = pCur->next( nLevel ).load( memory_model::memory_order_relaxed ); + destroy(); + } - if ( pPred->next( nLevel ).load( memory_model::memory_order_relaxed ).all() != pCur.ptr() ) - goto retry; + public: + ///@name Forward iterators (thread-safe under RCU lock) + //@{ + /// Forward iterator + /** + The forward iterator has some features: + - it has no post-increment operator + - it depends on iterator of underlying \p OrderedList - if ( pSucc.bits() ) { - // pCur is marked, i.e. logically deleted. - marked_node_ptr p( pCur.ptr() ); - if ( pPred->next( nLevel ).compare_exchange_strong( p, marked_node_ptr( pSucc.ptr() ), - memory_model::memory_order_release, atomics::memory_order_relaxed )) - { - if ( nLevel == 0 ) { -# ifdef _DEBUG - pCur->m_bUnlinked = true; -# endif - - if ( !is_extracted( pSucc )) { - // We cannot free the node at this moment since RCU is locked - // Link deleted nodes to a chain to free later - link_for_remove( pos, pCur.ptr() ); - m_Stat.onEraseWhileFind(); - } - else { - m_Stat.onExtractWhileFind(); - } - } - } - goto retry; - } - else { - nCmp = cmp( *node_traits::to_value_ptr( pCur.ptr()), val ); - if ( nCmp < 0 ) - pPred = pCur.ptr(); - else if ( nCmp == 0 && bStopIfFound ) - goto found; - else - break; - } - } + You may safely use iterators in multi-threaded environment only under RCU lock. + Otherwise, a crash is possible if another thread deletes the element the iterator points to. + */ + typedef skip_list::details::iterator< gc, node_traits, back_off, false > iterator; - // Next level - pos.pPrev[ nLevel ] = pPred; - pos.pSucc[ nLevel ] = pCur.ptr(); - } + /// Const iterator type + typedef skip_list::details::iterator< gc, node_traits, back_off, true > const_iterator; - if ( nCmp != 0 ) - return false; + /// Returns a forward iterator addressing the first element in a set + iterator begin() + { + return iterator( *m_Head.head()); + } - found: - pos.pCur = pCur.ptr(); - return pCur.ptr() && nCmp == 0; + /// Returns a forward const iterator addressing the first element in a set + const_iterator begin() const + { + return const_iterator( *m_Head.head()); } - bool find_min_position( position& pos ) + /// Returns a forward const iterator addressing the first element in a set + const_iterator cbegin() const { - assert( gc::is_locked() ); + return const_iterator( *m_Head.head()); + } - node_type * pPred; - marked_node_ptr pSucc; - marked_node_ptr pCur; + /// Returns a forward iterator that addresses the location succeeding the last element in a set. + iterator end() + { + return iterator(); + } - retry: - pPred = m_Head.head(); + /// Returns a forward const iterator that addresses the location succeeding the last element in a set. + const_iterator end() const + { + return const_iterator(); + } - for ( int nLevel = static_cast(c_nMaxHeight - 1); nLevel >= 0; --nLevel ) { + /// Returns a forward const iterator that addresses the location succeeding the last element in a set. + const_iterator cend() const + { + return const_iterator(); + } + //@} - pCur = pPred->next( nLevel ).load( memory_model::memory_order_relaxed ); - // pCur.bits() means that pPred is logically deleted - // head cannot be deleted - assert( pCur.bits() == 0 ); + public: + /// Inserts new node + /** + The function inserts \p val in the set if it does not contain + an item with key equal to \p val. - if ( pCur.ptr() ) { + The function applies RCU lock internally. - // pSucc contains deletion mark for pCur - pSucc = pCur->next( nLevel ).load( memory_model::memory_order_relaxed ); + Returns \p true if \p val is placed into the set, \p false otherwise. + */ + bool insert( value_type& val ) + { + return insert( val, []( value_type& ) {} ); + } - if ( pPred->next( nLevel ).load( memory_model::memory_order_relaxed ).all() != pCur.ptr() ) - goto retry; + /// Inserts new node + /** + This function is intended for derived non-intrusive containers. - if ( pSucc.bits() ) { - // pCur is marked, i.e. logically deleted. - marked_node_ptr p( pCur.ptr() ); - if ( pPred->next( nLevel ).compare_exchange_strong( p, marked_node_ptr( pSucc.ptr() ), - memory_model::memory_order_release, atomics::memory_order_relaxed )) - { - if ( nLevel == 0 ) { -# ifdef _DEBUG - pCur->m_bUnlinked = true; -# endif - - if ( !is_extracted( pSucc )) { - // We cannot free the node at this moment since RCU is locked - // Link deleted nodes to a chain to free later - link_for_remove( pos, pCur.ptr() ); - m_Stat.onEraseWhileFind(); - } - else { - m_Stat.onExtractWhileFind(); - } - } - } - goto retry; - } - } + The function allows to split creating of new item into two part: + - create item with key only + - insert new item into the set + - if inserting is success, calls \p f functor to initialize value-field of \p val. - // Next level - pos.pPrev[ nLevel ] = pPred; - pos.pSucc[ nLevel ] = pCur.ptr(); - } - return (pos.pCur = pCur.ptr()) != nullptr; - } + The functor signature is: + \code + void func( value_type& val ); + \endcode + where \p val is the item inserted. User-defined functor \p f should guarantee that during changing + \p val no any other changes could be made on this set's item by concurrent threads. + The user-defined functor is called only if the inserting is success. - bool find_max_position( position& pos ) + RCU \p synchronize method can be called. RCU should not be locked. + */ + template + bool insert( value_type& val, Func f ) { - assert( gc::is_locked() ); + check_deadlock_policy::check(); - node_type * pPred; - marked_node_ptr pSucc; - marked_node_ptr pCur; + position pos; + bool bRet; -retry: - pPred = m_Head.head(); + { + node_type * pNode = node_traits::to_node_ptr( val ); + scoped_node_ptr scp( pNode ); + unsigned int nHeight = pNode->height(); + bool bTowerOk = nHeight > 1 && pNode->get_tower() != nullptr; + bool bTowerMade = false; - for ( int nLevel = static_cast(c_nMaxHeight - 1); nLevel >= 0; --nLevel ) { + rcu_lock rcuLock; - while ( true ) { - pCur = pPred->next( nLevel ).load( memory_model::memory_order_relaxed ); - if ( pCur.bits() ) { - // pCur.bits() means that pPred is logically deleted - goto retry; - } + while ( true ) + { + bool bFound = find_position( val, pos, key_comparator(), true ); + if ( bFound ) { + // scoped_node_ptr deletes the node tower if we create it + if ( !bTowerMade ) + scp.release(); - if ( pCur.ptr() == nullptr ) { - // end of the list at level nLevel - goto next level + m_Stat.onInsertFailed(); + bRet = false; break; } - // pSucc contains deletion mark for pCur - pSucc = pCur->next( nLevel ).load( memory_model::memory_order_relaxed ); + if ( !bTowerOk ) { + build_node( pNode ); + nHeight = pNode->height(); + bTowerMade = + bTowerOk = true; + } - if ( pPred->next( nLevel ).load( memory_model::memory_order_relaxed ).all() != pCur.ptr() ) - goto retry; + if ( !insert_at_position( val, pNode, pos, f )) { + m_Stat.onInsertRetry(); + continue; + } - if ( pSucc.bits() ) { - // pCur is marked, i.e. logically deleted. - marked_node_ptr p( pCur.ptr() ); - if ( pPred->next( nLevel ).compare_exchange_strong( p, marked_node_ptr( pSucc.ptr() ), - memory_model::memory_order_release, atomics::memory_order_relaxed )) - { - if ( nLevel == 0 ) { -# ifdef _DEBUG - pCur->m_bUnlinked = true; -# endif - - if ( !is_extracted( pSucc )) { - // We cannot free the node at this moment since RCU is locked - // Link deleted nodes to a chain to free later - link_for_remove( pos, pCur.ptr() ); - m_Stat.onEraseWhileFind(); - } - else { - m_Stat.onExtractWhileFind(); - } - } - } - goto retry; - } - else { - if ( !pSucc.ptr() ) - break; - - pPred = pCur.ptr(); - } + increase_height( nHeight ); + ++m_ItemCounter; + m_Stat.onAddNode( nHeight ); + m_Stat.onInsertSuccess(); + scp.release(); + bRet = true; + break; } - - // Next level - pos.pPrev[ nLevel ] = pPred; - pos.pSucc[ nLevel ] = pCur.ptr(); } - return (pos.pCur = pCur.ptr()) != nullptr; + return bRet; } - template - bool insert_at_position( value_type& val, node_type * pNode, position& pos, Func f ) - { - assert( gc::is_locked() ); - - unsigned int nHeight = pNode->height(); - pNode->clear_tower(); - - { - marked_node_ptr p( pos.pSucc[0] ); - pNode->next( 0 ).store( p, memory_model::memory_order_release ); - if ( !pos.pPrev[0]->next(0).compare_exchange_strong( p, marked_node_ptr(pNode), memory_model::memory_order_release, atomics::memory_order_relaxed )) { - return false; - } -# ifdef _DEBUG - pNode->m_bLinked = true; -# endif - f( val ); - } + /// Updates the node + /** + The operation performs inserting or changing data with lock-free manner. - for ( unsigned int nLevel = 1; nLevel < nHeight; ++nLevel ) { - marked_node_ptr p; - while ( true ) { - marked_node_ptr q( pos.pSucc[ nLevel ]); - if ( !pNode->next( nLevel ).compare_exchange_strong( p, q, memory_model::memory_order_acquire, atomics::memory_order_relaxed )) { - // pNode has been marked as removed while we are inserting it - // Stop inserting - assert( p.bits() ); - m_Stat.onLogicDeleteWhileInsert(); - return true; - } - p = q; - if ( pos.pPrev[nLevel]->next(nLevel).compare_exchange_strong( q, marked_node_ptr( pNode ), memory_model::memory_order_release, atomics::memory_order_relaxed ) ) - break; + If the item \p val is not found in the set, then \p val is inserted into the set + iff \p bInsert is \p true. + Otherwise, the functor \p func is called with item found. + The functor signature is: + \code + void func( bool bNew, value_type& item, value_type& val ); + \endcode + with arguments: + - \p bNew - \p true if the item has been inserted, \p false otherwise + - \p item - item of the set + - \p val - argument \p val passed into the \p %update() function + If new item has been inserted (i.e. \p bNew is \p true) then \p item and \p val arguments + refer to the same thing. - // Renew insert position - m_Stat.onRenewInsertPosition(); - if ( !find_position( val, pos, key_comparator(), false )) { - // The node has been deleted while we are inserting it - m_Stat.onNotFoundWhileInsert(); - return true; - } - } - } - return true; - } + The functor can change non-key fields of the \p item; however, \p func must guarantee + that during changing no any other modifications could be made on this item by concurrent threads. - static void link_for_remove( position& pos, node_type * pDel ) - { - assert( pDel->m_pDelChain == nullptr ); + RCU \p synchronize method can be called. RCU should not be locked. - pDel->m_pDelChain = pos.pDelChain; - pos.pDelChain = pDel; - } + Returns std::pair where \p first is \p true if operation is successful, + i.e. the node has been inserted or updated, + \p second is \p true if new item has been added or \p false if the item with \p key + already exists. + @warning See \ref cds_intrusive_item_creating "insert item troubleshooting" + */ template - bool try_remove_at( node_type * pDel, position& pos, Func f, bool bExtract ) + std::pair update( value_type& val, Func func, bool bInsert = true ) { - assert( pDel != nullptr ); - assert( gc::is_locked() ); - - marked_node_ptr pSucc; + check_deadlock_policy::check(); - // logical deletion (marking) - for ( unsigned int nLevel = pDel->height() - 1; nLevel > 0; --nLevel ) { - pSucc = pDel->next(nLevel).load( memory_model::memory_order_relaxed ); - while ( true ) { - if ( pSucc.bits() - || pDel->next(nLevel).compare_exchange_weak( pSucc, pSucc | 1, memory_model::memory_order_acquire, atomics::memory_order_relaxed )) - { - break; - } - } - } + position pos; + std::pair bRet( true, false ); - pSucc = pDel->next(0).load( memory_model::memory_order_relaxed ); - while ( true ) { - if ( pSucc.bits() ) - return false; + { + node_type * pNode = node_traits::to_node_ptr( val ); + scoped_node_ptr scp( pNode ); + unsigned int nHeight = pNode->height(); + bool bTowerOk = nHeight > 1 && pNode->get_tower() != nullptr; + bool bTowerMade = false; - int const nMask = bExtract ? 3 : 1; - if ( pDel->next(0).compare_exchange_strong( pSucc, pSucc | nMask, memory_model::memory_order_acquire, atomics::memory_order_relaxed )) + rcu_lock rcuLock; + while ( true ) { - f( *node_traits::to_value_ptr( pDel )); + bool bFound = find_position( val, pos, key_comparator(), true ); + if ( bFound ) { + // scoped_node_ptr deletes the node tower if we create it before + if ( !bTowerMade ) + scp.release(); - // physical deletion - // try fast erase - pSucc = pDel; - for ( int nLevel = static_cast( pDel->height() - 1 ); nLevel >= 0; --nLevel ) { - if ( !pos.pPrev[nLevel]->next(nLevel).compare_exchange_strong( pSucc, - marked_node_ptr( pDel->next(nLevel).load(memory_model::memory_order_relaxed).ptr() ), - memory_model::memory_order_release, atomics::memory_order_relaxed) ) - { - // Do slow erase - find_position( *node_traits::to_value_ptr(pDel), pos, key_comparator(), false ); - if ( bExtract ) - m_Stat.onSlowExtract(); - else - m_Stat.onSlowErase(); -# ifdef _DEBUG - assert( pDel->m_bUnlinked ); -# endif - return true; - } + func( false, *node_traits::to_value_ptr(pos.pCur), val ); + m_Stat.onUpdateExist(); + break; } -# ifdef _DEBUG - pDel->m_bUnlinked = true; -# endif - if ( !bExtract ) { - // We cannot free the node at this moment since RCU is locked - // Link deleted nodes to a chain to free later - link_for_remove( pos, pDel ); - m_Stat.onFastErase(); + if ( !bInsert ) { + scp.release(); + bRet.first = false; + break; } - else - m_Stat.onFastExtract(); - - return true; - } - } - } - - enum finsd_fastpath_result { - find_fastpath_found, - find_fastpath_not_found, - find_fastpath_abort - }; - template - finsd_fastpath_result find_fastpath( Q& val, Compare cmp, Func f ) const - { - node_type * pPred; - marked_node_ptr pCur; - marked_node_ptr pSucc; - marked_node_ptr pNull; - - back_off bkoff; - - pPred = m_Head.head(); - for ( int nLevel = static_cast(m_nHeight.load(memory_model::memory_order_relaxed) - 1); nLevel >= 0; --nLevel ) { - pCur = pPred->next(nLevel).load( memory_model::memory_order_acquire ); - if ( pCur == pNull ) - continue; - while ( pCur != pNull ) { - if ( pCur.bits() ) { - // Wait until pCur is removed - unsigned int nAttempt = 0; - while ( pCur.bits() && nAttempt++ < 16 ) { - bkoff(); - pCur = pPred->next(nLevel).load( memory_model::memory_order_acquire ); - } - bkoff.reset(); - - if ( pCur.bits() ) { - // Maybe, we are on deleted node sequence - // Abort searching, try slow-path - return find_fastpath_abort; - } + if ( !bTowerOk ) { + build_node( pNode ); + nHeight = pNode->height(); + bTowerMade = + bTowerOk = true; } - if ( pCur.ptr() ) { - int nCmp = cmp( *node_traits::to_value_ptr( pCur.ptr() ), val ); - if ( nCmp < 0 ) { - pPred = pCur.ptr(); - pCur = pCur->next(nLevel).load( memory_model::memory_order_acquire ); - } - else if ( nCmp == 0 ) { - // found - f( *node_traits::to_value_ptr( pCur.ptr() ), val ); - return find_fastpath_found; - } - else // pCur > val - go down - break; + if ( !insert_at_position( val, pNode, pos, [&func]( value_type& item ) { func( true, item, item ); })) { + m_Stat.onInsertRetry(); + continue; } + + increase_height( nHeight ); + ++m_ItemCounter; + scp.release(); + m_Stat.onAddNode( nHeight ); + m_Stat.onUpdateNew(); + bRet.second = true; + break; } } - return find_fastpath_not_found; + return bRet; } - - template - bool find_slowpath( Q& val, Compare cmp, Func f, position& pos ) + //@cond + template + CDS_DEPRECATED("ensure() is deprecated, use update()") + std::pair ensure( value_type& val, Func func ) { - if ( find_position( val, pos, cmp, true )) { - assert( cmp( *node_traits::to_value_ptr( pos.pCur ), val ) == 0 ); - - f( *node_traits::to_value_ptr( pos.pCur ), val ); - return true; - } - else - return false; + return update( val, func, true ); } + //@endcond - template - bool do_find_with( Q& val, Compare cmp, Func f ) - { - position pos; - bool bRet; - - rcu_lock l; - - switch ( find_fastpath( val, cmp, f )) { - case find_fastpath_found: - m_Stat.onFindFastSuccess(); - return true; - case find_fastpath_not_found: - m_Stat.onFindFastFailed(); - return false; - default: - break; - } + /// Unlinks the item \p val from the set + /** + The function searches the item \p val in the set and unlink it from the set + if it is found and is equal to \p val. - if ( find_slowpath( val, cmp, f, pos )) { - m_Stat.onFindSlowSuccess(); - bRet = true; - } - else { - m_Stat.onFindSlowFailed(); - bRet = false; - } + Difference between \p erase() and \p %unlink() functions: \p erase() finds a key + and deletes the item found. \p %unlink() searches an item by key and deletes it + only if \p val is an item of that set, i.e. the pointer to item found + is equal to &val . - defer_chain( pos ); + RCU \p synchronize method can be called. RCU should not be locked. - return bRet; - } + The \ref disposer specified in \p Traits class template parameter is called + by garbage collector \p GC asynchronously. - template - bool do_erase( Q const& val, Compare cmp, Func f ) + The function returns \p true if success and \p false otherwise. + */ + bool unlink( value_type& val ) { check_deadlock_policy::check(); @@ -1098,1010 +972,1041 @@ retry: bool bRet; { - rcu_lock rcuLock; + rcu_lock l; - if ( !find_position( val, pos, cmp, false ) ) { - m_Stat.onEraseFailed(); + if ( !find_position( val, pos, key_comparator(), false )) { + m_Stat.onUnlinkFailed(); bRet = false; } else { node_type * pDel = pos.pCur; - assert( cmp( *node_traits::to_value_ptr( pDel ), val ) == 0 ); + assert( key_comparator()( *node_traits::to_value_ptr( pDel ), val ) == 0 ); unsigned int nHeight = pDel->height(); - if ( try_remove_at( pDel, pos, f, false )) { + + if ( node_traits::to_value_ptr( pDel ) == &val && try_remove_at( pDel, pos, [](value_type const&) {}, false )) { --m_ItemCounter; m_Stat.onRemoveNode( nHeight ); - m_Stat.onEraseSuccess(); + m_Stat.onUnlinkSuccess(); bRet = true; } else { - m_Stat.onEraseFailed(); + m_Stat.onUnlinkFailed(); bRet = false; } } } - dispose_chain( pos ); return bRet; } - template - value_type * do_extract_key( Q const& key, Compare cmp ) - { - // RCU should be locked!!! - assert( gc::is_locked() ); - - position pos; - node_type * pDel; - - if ( !find_position( key, pos, cmp, false ) ) { - m_Stat.onExtractFailed(); - pDel = nullptr; - } - else { - pDel = pos.pCur; - assert( cmp( *node_traits::to_value_ptr( pDel ), key ) == 0 ); + /// Extracts the item from the set with specified \p key + /** \anchor cds_intrusive_SkipListSet_rcu_extract + The function searches an item with key equal to \p key in the set, + unlinks it from the set, and returns \ref cds::urcu::exempt_ptr "exempt_ptr" pointer to the item found. + If the item with key equal to \p key is not found the function returns an empty \p exempt_ptr. - unsigned int const nHeight = pDel->height(); + Note the compare functor should accept a parameter of type \p Q that can be not the same as \p value_type. - if ( try_remove_at( pDel, pos, [](value_type const&) {}, true )) { - --m_ItemCounter; - m_Stat.onRemoveNode( nHeight ); - m_Stat.onExtractSuccess(); - } - else { - m_Stat.onExtractFailed(); - pDel = nullptr; - } - } + RCU \p synchronize method can be called. RCU should NOT be locked. + The function does not call the disposer for the item found. + The disposer will be implicitly invoked when the returned object is destroyed or when + its \p release() member function is called. + Example: + \code + typedef cds::intrusive::SkipListSet< cds::urcu::gc< cds::urcu::general_buffered<> >, foo, my_traits > skip_list; + skip_list theList; + // ... - defer_chain( pos ); - return pDel ? node_traits::to_value_ptr( pDel ) : nullptr; + typename skip_list::exempt_ptr ep( theList.extract( 5 )); + if ( ep ) { + // Deal with ep + //... + + // Dispose returned item. + ep.release(); + } + \endcode + */ + template + exempt_ptr extract( Q const& key ) + { + return exempt_ptr( do_extract( key )); } - template - bool do_extract( ExemptPtr& result, Q const& key ) + /// Extracts the item from the set with comparing functor \p pred + /** + The function is an analog of \p extract(Q const&) but \p pred predicate is used for key comparing. + \p Less has the interface like \p std::less. + \p pred must imply the same element order as the comparator used for building the set. + */ + template + exempt_ptr extract_with( Q const& key, Less pred ) { - check_deadlock_policy::check(); + return exempt_ptr( do_extract_with( key, pred )); + } - bool bReturn; - { - rcu_lock l; - value_type * pDel = do_extract_key( key, key_comparator() ); - bReturn = pDel != nullptr; - if ( bReturn ) - result = pDel; - } + /// Extracts an item with minimal key from the list + /** + The function searches an item with minimal key, unlinks it, and returns \ref cds::urcu::exempt_ptr "exempt_ptr" pointer to the item. + If the skip-list is empty the function returns an empty \p exempt_ptr. - dispose_deferred(); - return bReturn; - } + RCU \p synchronize method can be called. RCU should NOT be locked. + The function does not call the disposer for the item found. + The disposer will be implicitly invoked when the returned object is destroyed or when + its \p release() member function is manually called. + Example: + \code + typedef cds::intrusive::SkipListSet< cds::urcu::gc< cds::urcu::general_buffered<> >, foo, my_traits > skip_list; + skip_list theList; + // ... - template - bool do_extract_with( ExemptPtr& result, Q const& key, Less pred ) - { - check_deadlock_policy::check(); + typename skip_list::exempt_ptr ep(theList.extract_min()); + if ( ep ) { + // Deal with ep + //... - bool bReturn; - { - rcu_lock l; - value_type * pDel = do_extract_key( key, cds::opt::details::make_comparator_from_less() ); - bReturn = pDel != nullptr; - if ( bReturn ) - result = pDel; + // Dispose returned item. + ep.release(); } + \endcode - dispose_deferred(); - return bReturn; - } - - node_type * do_extract_min() + @note Due the concurrent nature of the list, the function extracts nearly minimum key. + It means that the function gets leftmost item and tries to unlink it. + During unlinking, a concurrent thread may insert an item with key less than leftmost item's key. + So, the function returns the item with minimum key at the moment of list traversing. + */ + exempt_ptr extract_min() { - assert( gc::is_locked() ); + return exempt_ptr( do_extract_min()); + } - position pos; - node_type * pDel; + /// Extracts an item with maximal key from the list + /** + The function searches an item with maximal key, unlinks it, and returns \ref cds::urcu::exempt_ptr "exempt_ptr" pointer to the item. + If the skip-list is empty the function returns an empty \p exempt_ptr. - if ( !find_min_position( pos ) ) { - m_Stat.onExtractMinFailed(); - pDel = nullptr; - } - else { - pDel = pos.pCur; - unsigned int const nHeight = pDel->height(); + RCU \p synchronize method can be called. RCU should NOT be locked. + The function does not call the disposer for the item found. + The disposer will be implicitly invoked when the returned object is destroyed or when + its \p release() member function is manually called. + Example: + \code + typedef cds::intrusive::SkipListSet< cds::urcu::gc< cds::urcu::general_buffered<> >, foo, my_traits > skip_list; + skip_list theList; + // ... - if ( try_remove_at( pDel, pos, [](value_type const&) {}, true )) { - --m_ItemCounter; - m_Stat.onRemoveNode( nHeight ); - m_Stat.onExtractMinSuccess(); - } - else { - m_Stat.onExtractMinFailed(); - pDel = nullptr; - } + typename skip_list::exempt_ptr ep( theList.extract_max()); + if ( ep ) { + // Deal with ep + //... + // Dispose returned item. + ep.release(); } + \endcode - defer_chain( pos ); - return pDel; + @note Due the concurrent nature of the list, the function extracts nearly maximal key. + It means that the function gets rightmost item and tries to unlink it. + During unlinking, a concurrent thread can insert an item with key greater than rightmost item's key. + So, the function returns the item with maximum key at the moment of list traversing. + */ + exempt_ptr extract_max() + { + return exempt_ptr( do_extract_max()); } - template - bool do_extract_min( ExemptPtr& result ) - { - check_deadlock_policy::check(); + /// Deletes the item from the set + /** \anchor cds_intrusive_SkipListSet_rcu_erase + The function searches an item with key equal to \p key in the set, + unlinks it from the set, and returns \p true. + If the item with key equal to \p key is not found the function return \p false. - bool bReturn; - { - rcu_lock l; - node_type * pDel = do_extract_min(); - bReturn = pDel != nullptr; - if ( bReturn ) - result = node_traits::to_value_ptr(pDel); - } + Note the hash functor should accept a parameter of type \p Q that can be not the same as \p value_type. - dispose_deferred(); - return bReturn; + RCU \p synchronize method can be called. RCU should not be locked. + */ + template + bool erase( const Q& key ) + { + return do_erase( key, key_comparator(), [](value_type const&) {} ); } - node_type * do_extract_max() + /// Delete the item from the set with comparing functor \p pred + /** + The function is an analog of \ref cds_intrusive_SkipListSet_rcu_erase "erase(Q const&)" + but \p pred predicate is used for key comparing. + \p Less has the interface like \p std::less. + \p pred must imply the same element order as the comparator used for building the set. + */ + template + bool erase_with( const Q& key, Less pred ) { - assert( gc::is_locked() ); - - position pos; - node_type * pDel; + CDS_UNUSED( pred ); + return do_erase( key, cds::opt::details::make_comparator_from_less(), [](value_type const&) {} ); + } - if ( !find_max_position( pos ) ) { - m_Stat.onExtractMaxFailed(); - pDel = nullptr; - } - else { - pDel = pos.pCur; - unsigned int const nHeight = pDel->height(); + /// Deletes the item from the set + /** \anchor cds_intrusive_SkipListSet_rcu_erase_func + The function searches an item with key equal to \p key in the set, + call \p f functor with item found, unlinks it from the set, and returns \p true. + The \ref disposer specified in \p Traits class template parameter is called + by garbage collector \p GC asynchronously. - if ( try_remove_at( pDel, pos, [](value_type const&) {}, true )) { - --m_ItemCounter; - m_Stat.onRemoveNode( nHeight ); - m_Stat.onExtractMaxSuccess(); - } - else { - m_Stat.onExtractMaxFailed(); - pDel = nullptr; - } - } + The \p Func interface is + \code + struct functor { + void operator()( value_type const& item ); + }; + \endcode + If the item with key equal to \p key is not found the function return \p false. - defer_chain( pos ); - return pDel; - } + Note the hash functor should accept a parameter of type \p Q that can be not the same as \p value_type. - template - bool do_extract_max( ExemptPtr& result ) + RCU \p synchronize method can be called. RCU should not be locked. + */ + template + bool erase( Q const& key, Func f ) { - check_deadlock_policy::check(); - - bool bReturn; - { - rcu_lock l; - node_type * pDel = do_extract_max(); - bReturn = pDel != nullptr; - if ( bReturn ) - result = node_traits::to_value_ptr(pDel); - } - - dispose_deferred(); - return bReturn; + return do_erase( key, key_comparator(), f ); } - void increase_height( unsigned int nHeight ) + /// Delete the item from the set with comparing functor \p pred + /** + The function is an analog of \ref cds_intrusive_SkipListSet_rcu_erase_func "erase(Q const&, Func)" + but \p pred predicate is used for key comparing. + \p Less has the interface like \p std::less. + \p pred must imply the same element order as the comparator used for building the set. + */ + template + bool erase_with( Q const& key, Less pred, Func f ) { - unsigned int nCur = m_nHeight.load( memory_model::memory_order_relaxed ); - if ( nCur < nHeight ) - m_nHeight.compare_exchange_strong( nCur, nHeight, memory_model::memory_order_release, atomics::memory_order_relaxed ); + CDS_UNUSED( pred ); + return do_erase( key, cds::opt::details::make_comparator_from_less(), f ); } - class deferred_list_iterator - { - node_type * pCur; - public: - explicit deferred_list_iterator( node_type * p ) - : pCur(p) - {} - deferred_list_iterator() - : pCur( nullptr ) - {} + /// Finds \p key + /** @anchor cds_intrusive_SkipListSet_rcu_find_func + The function searches the item with key equal to \p key and calls the functor \p f for item found. + The interface of \p Func functor is: + \code + struct functor { + void operator()( value_type& item, Q& key ); + }; + \endcode + where \p item is the item found, \p key is the find function argument. - cds::urcu::retired_ptr operator *() const - { - return cds::urcu::retired_ptr( node_traits::to_value_ptr(pCur), dispose_node ); - } + The functor can change non-key fields of \p item. Note that the functor is only guarantee + that \p item cannot be disposed during functor is executing. + The functor does not serialize simultaneous access to the set \p item. If such access is + possible you must provide your own synchronization schema on item level to exclude unsafe item modifications. - void operator ++() - { - pCur = pCur->m_pDelChain; - } + The \p key argument is non-const since it can be used as \p f functor destination i.e., the functor + can modify both arguments. - bool operator ==( deferred_list_iterator const& i ) const - { - return pCur == i.pCur; - } - bool operator !=( deferred_list_iterator const& i ) const - { - return !operator ==( i ); - } - }; + The function applies RCU lock internally. - void dispose_chain( node_type * pHead ) + The function returns \p true if \p key is found, \p false otherwise. + */ + template + bool find( Q& key, Func f ) { - // RCU should NOT be locked - check_deadlock_policy::check(); - - gc::batch_retire( deferred_list_iterator( pHead ), deferred_list_iterator() ); + return do_find_with( key, key_comparator(), f ); } - - void dispose_chain( position& pos ) + //@cond + template + bool find( Q const& key, Func f ) { - // RCU should NOT be locked - check_deadlock_policy::check(); - - // Delete local chain - if ( pos.pDelChain ) { - dispose_chain( pos.pDelChain ); - pos.pDelChain = nullptr; - } - - // Delete deferred chain - dispose_deferred(); + return do_find_with( key, key_comparator(), f ); } + //@endcond - void dispose_deferred() + /// Finds the key \p key with comparing functor \p pred + /** + The function is an analog of \ref cds_intrusive_SkipListSet_rcu_find_func "find(Q&, Func)" + but \p cmp is used for key comparison. + \p Less functor has the interface like \p std::less. + \p cmp must imply the same element order as the comparator used for building the set. + */ + template + bool find_with( Q& key, Less pred, Func f ) { - dispose_chain( m_pDeferredDelChain.exchange( nullptr, memory_model::memory_order_acq_rel ) ); + CDS_UNUSED( pred ); + return do_find_with( key, cds::opt::details::make_comparator_from_less(), f ); } - - void defer_chain( position& pos ) + //@cond + template + bool find_with( Q const& key, Less pred, Func f ) { - if ( pos.pDelChain ) { - node_type * pHead = pos.pDelChain; - node_type * pTail = pHead; - while ( pTail->m_pDelChain ) - pTail = pTail->m_pDelChain; + CDS_UNUSED( pred ); + return do_find_with( key, cds::opt::details::make_comparator_from_less(), f ); + } + //@endcond - node_type * pDeferList = m_pDeferredDelChain.load( memory_model::memory_order_relaxed ); - do { - pTail->m_pDelChain = pDeferList; - } while ( !m_pDeferredDelChain.compare_exchange_weak( pDeferList, pHead, memory_model::memory_order_acq_rel, atomics::memory_order_relaxed )); + /// Checks whether the set contains \p key + /** + The function searches the item with key equal to \p key + and returns \p true if it is found, and \p false otherwise. - pos.pDelChain = nullptr; - } + The function applies RCU lock internally. + */ + template + bool contains( Q const& key ) + { + return do_find_with( key, key_comparator(), [](value_type& , Q const& ) {} ); + } + //@cond + template + CDS_DEPRECATED("deprecated, use contains()") + bool find( Q const& key ) + { + return contains( key ); } - //@endcond - public: - /// Default constructor - SkipListSet() - : m_Head( c_nMaxHeight ) - , m_nHeight( c_nMinHeight ) - , m_pDeferredDelChain( nullptr ) + /// Checks whether the set contains \p key using \p pred predicate for searching + /** + The function is similar to contains( key ) but \p pred is used for key comparing. + \p Less functor has the interface like \p std::less. + \p Less must imply the same element order as the comparator used for building the set. + */ + template + bool contains( Q const& key, Less pred ) { - static_assert( (std::is_same< gc, typename node_type::gc >::value), "GC and node_type::gc must be the same type" ); - - // Barrier for head node - atomics::atomic_thread_fence( memory_model::memory_order_release ); + CDS_UNUSED( pred ); + return do_find_with( key, cds::opt::details::make_comparator_from_less(), [](value_type& , Q const& ) {} ); } - - /// Clears and destructs the skip-list - ~SkipListSet() + //@cond + template + CDS_DEPRECATED("deprecated, use contains()") + bool find_with( Q const& key, Less pred ) { - clear(); + return contains( key, pred ); } + //@endcond - public: - /// Iterator type - typedef skip_list::details::iterator< gc, node_traits, back_off, false > iterator; + /// Finds \p key and return the item found + /** \anchor cds_intrusive_SkipListSet_rcu_get + The function searches the item with key equal to \p key and returns a \p raw_ptr object pointed to item found. + If \p key is not found it returns empty \p raw_ptr. - /// Const iterator type - typedef skip_list::details::iterator< gc, node_traits, back_off, true > const_iterator; + Note the compare functor should accept a parameter of type \p Q that can be not the same as \p value_type. - /// Returns a forward iterator addressing the first element in a set - iterator begin() + RCU should be locked before call of this function. + Returned item is valid only while RCU is locked: + \code + typedef cds::intrusive::SkipListSet< cds::urcu::gc< cds::urcu::general_buffered<> >, foo, my_traits > skip_list; + skip_list theList; + // ... + typename skip_list::raw_ptr pVal; + { + // Lock RCU + skip_list::rcu_lock lock; + + pVal = theList.get( 5 ); + if ( pVal ) { + // Deal with pVal + //... + } + } + // You can manually release pVal after RCU-locked section + pVal.release(); + \endcode + */ + template + raw_ptr get( Q const& key ) { - return iterator( *m_Head.head() ); + assert( gc::is_locked()); + + position pos; + value_type * pFound; + if ( do_find_with( key, key_comparator(), [&pFound](value_type& found, Q const& ) { pFound = &found; }, pos )) + return raw_ptr( pFound, raw_ptr_disposer( pos )); + return raw_ptr( raw_ptr_disposer( pos )); } - /// Returns a forward const iterator addressing the first element in a set - const_iterator begin() const + /// Finds \p key and return the item found + /** + The function is an analog of \ref cds_intrusive_SkipListSet_rcu_get "get(Q const&)" + but \p pred is used for comparing the keys. + + \p Less functor has the semantics like \p std::less but should take arguments of type \ref value_type and \p Q + in any order. + \p pred must imply the same element order as the comparator used for building the set. + */ + template + raw_ptr get_with( Q const& key, Less pred ) { - return const_iterator( *m_Head.head() ); + CDS_UNUSED( pred ); + assert( gc::is_locked()); + + value_type * pFound = nullptr; + position pos; + if ( do_find_with( key, cds::opt::details::make_comparator_from_less(), + [&pFound](value_type& found, Q const& ) { pFound = &found; }, pos )) + { + return raw_ptr( pFound, raw_ptr_disposer( pos )); + } + return raw_ptr( raw_ptr_disposer( pos )); } - /// Returns a forward const iterator addressing the first element in a set - const_iterator cbegin() + /// Returns item count in the set + /** + The value returned depends on item counter type provided by \p Traits template parameter. + For \p atomicity::empty_item_counter the function always returns 0. + Therefore, the function is not suitable for checking the set emptiness, use \p empty() + member function for this purpose. + */ + size_t size() const { - return const_iterator( *m_Head.head() ); + return m_ItemCounter; } - /// Returns a forward iterator that addresses the location succeeding the last element in a set. - iterator end() + /// Checks if the set is empty + bool empty() const { - return iterator(); + return m_Head.head()->next( 0 ).load( memory_model::memory_order_relaxed ) == nullptr; } - /// Returns a forward const iterator that addresses the location succeeding the last element in a set. - const_iterator end() const + /// Clears the set (not atomic) + /** + The function unlink all items from the set. + The function is not atomic, thus, in multi-threaded environment with parallel insertions + this sequence + \code + set.clear(); + assert( set.empty()); + \endcode + the assertion could be raised. + + For each item the \p disposer will be called automatically after unlinking. + */ + void clear() { - return const_iterator(); + exempt_ptr ep; + while ( (ep = extract_min())); } - /// Returns a forward const iterator that addresses the location succeeding the last element in a set. - const_iterator cend() + /// Returns maximum height of skip-list. The max height is a constant for each object and does not exceed 32. + static CDS_CONSTEXPR unsigned int max_height() CDS_NOEXCEPT { - return const_iterator(); + return c_nMaxHeight; } - public: - /// Inserts new node - /** - The function inserts \p val in the set if it does not contain - an item with key equal to \p val. + /// Returns const reference to internal statistics + stat const& statistics() const + { + return m_Stat; + } - The function applies RCU lock internally. + protected: + //@cond - Returns \p true if \p val is placed into the set, \p false otherwise. - */ - bool insert( value_type& val ) + bool is_extracted( marked_node_ptr const p ) const { - return insert( val, []( value_type& ) {} ); + return ( p.bits() & 2 ) != 0; } - /// Inserts new node - /** - This function is intended for derived non-intrusive containers. - - The function allows to split creating of new item into two part: - - create item with key only - - insert new item into the set - - if inserting is success, calls \p f functor to initialize value-field of \p val. + void help_remove( int nLevel, node_type* pPred, marked_node_ptr pCur, marked_node_ptr pSucc, position& pos ) + { + marked_node_ptr p( pCur.ptr() ); - The functor signature is: - \code - void func( value_type& val ); - \endcode - where \p val is the item inserted. User-defined functor \p f should guarantee that during changing - \p val no any other changes could be made on this set's item by concurrent threads. - The user-defined functor is called only if the inserting is success and may be passed by reference - using \p std::ref. + if ( pCur->is_upper_level( nLevel ) + && pPred->next( nLevel ).compare_exchange_strong( p, marked_node_ptr( pSucc.ptr()), + memory_model::memory_order_release, atomics::memory_order_relaxed )) + { + if ( pCur->level_unlinked()) { + if ( !is_extracted( pSucc ) ) { + // We cannot free the node at this moment because RCU is locked + // Link deleted nodes to a chain to free later + pos.dispose( pCur.ptr() ); + m_Stat.onEraseWhileFind(); + } + else + m_Stat.onExtractWhileFind(); + } + } + } - RCU \p synchronize method can be called. RCU should not be locked. - */ - template - bool insert( value_type& val, Func f ) + template + bool find_position( Q const& val, position& pos, Compare cmp, bool bStopIfFound ) { - check_deadlock_policy::check(); + assert( gc::is_locked() ); - position pos; - bool bRet; + node_type * pPred; + marked_node_ptr pSucc; + marked_node_ptr pCur; + int nCmp = 1; - { - node_type * pNode = node_traits::to_node_ptr( val ); - scoped_node_ptr scp( pNode ); - unsigned int nHeight = pNode->height(); - bool bTowerOk = nHeight > 1 && pNode->get_tower() != nullptr; - bool bTowerMade = false; + retry: + pPred = m_Head.head(); - rcu_lock rcuLock; + for ( int nLevel = static_cast( c_nMaxHeight - 1 ); nLevel >= 0; --nLevel ) { - while ( true ) - { - bool bFound = find_position( val, pos, key_comparator(), true ); - if ( bFound ) { - // scoped_node_ptr deletes the node tower if we create it - if ( !bTowerMade ) - scp.release(); + while ( true ) { + pCur = pPred->next( nLevel ).load( memory_model::memory_order_acquire ); + if ( pCur.bits() ) { + // pCur.bits() means that pPred is logically deleted + goto retry; + } - m_Stat.onInsertFailed(); - bRet = false; + if ( pCur.ptr() == nullptr ) { + // end of the list at level nLevel - goto next level break; } - if ( !bTowerOk ) { - build_node( pNode ); - nHeight = pNode->height(); - bTowerMade = - bTowerOk = true; - } + // pSucc contains deletion mark for pCur + pSucc = pCur->next( nLevel ).load( memory_model::memory_order_acquire ); - if ( !insert_at_position( val, pNode, pos, f )) { - m_Stat.onInsertRetry(); - continue; - } + if ( pPred->next( nLevel ).load( memory_model::memory_order_acquire ).all() != pCur.ptr() ) + goto retry; - increase_height( nHeight ); - ++m_ItemCounter; - m_Stat.onAddNode( nHeight ); - m_Stat.onInsertSuccess(); - scp.release(); - bRet = true; - break; + if ( pSucc.bits() ) { + // pCur is marked, i.e. logically deleted. + help_remove( nLevel, pPred, pCur, pSucc, pos ); + goto retry; + } + else { + nCmp = cmp( *node_traits::to_value_ptr( pCur.ptr() ), val ); + if ( nCmp < 0 ) + pPred = pCur.ptr(); + else if ( nCmp == 0 && bStopIfFound ) + goto found; + else + break; + } } + + // Next level + pos.pPrev[nLevel] = pPred; + pos.pSucc[nLevel] = pCur.ptr(); } - dispose_chain( pos ); + if ( nCmp != 0 ) + return false; - return bRet; + found: + pos.pCur = pCur.ptr(); + return pCur.ptr() && nCmp == 0; } - /// Ensures that the \p val exists in the set - /** - The operation performs inserting or changing data with lock-free manner. - - If the item \p val is not found in the set, then \p val is inserted into the set. - Otherwise, the functor \p func is called with item found. - The functor signature is: - \code - void func( bool bNew, value_type& item, value_type& val ); - \endcode - with arguments: - - \p bNew - \p true if the item has been inserted, \p false otherwise - - \p item - item of the set - - \p val - argument \p val passed into the \p ensure function - If new item has been inserted (i.e. \p bNew is \p true) then \p item and \p val arguments - refer to the same thing. + bool find_min_position( position& pos ) + { + assert( gc::is_locked() ); - The functor can change non-key fields of the \p item; however, \p func must guarantee - that during changing no any other modifications could be made on this item by concurrent threads. + node_type * pPred; + marked_node_ptr pSucc; + marked_node_ptr pCur; - You can pass \p func argument by value or by reference using \p std::ref. + retry: + pPred = m_Head.head(); - RCU \p synchronize method can be called. RCU should not be locked. + for ( int nLevel = static_cast( c_nMaxHeight - 1 ); nLevel >= 0; --nLevel ) { - Returns std::pair where \p first is \p true if operation is successfull, - \p second is \p true if new item has been added or \p false if the item with \p key - already is in the set. - */ - template - std::pair ensure( value_type& val, Func func ) - { - check_deadlock_policy::check(); + pCur = pPred->next( nLevel ).load( memory_model::memory_order_acquire ); + // pCur.bits() means that pPred is logically deleted + // head cannot be deleted + assert( pCur.bits() == 0 ); - position pos; - std::pair bRet( true, false ); + if ( pCur.ptr() ) { - { - node_type * pNode = node_traits::to_node_ptr( val ); - scoped_node_ptr scp( pNode ); - unsigned int nHeight = pNode->height(); - bool bTowerOk = nHeight > 1 && pNode->get_tower() != nullptr; - bool bTowerMade = false; + // pSucc contains deletion mark for pCur + pSucc = pCur->next( nLevel ).load( memory_model::memory_order_acquire ); - rcu_lock rcuLock; - while ( true ) - { - bool bFound = find_position( val, pos, key_comparator(), true ); - if ( bFound ) { - // scoped_node_ptr deletes the node tower if we create it before - if ( !bTowerMade ) - scp.release(); + if ( pPred->next( nLevel ).load( memory_model::memory_order_acquire ).all() != pCur.ptr() ) + goto retry; - func( false, *node_traits::to_value_ptr(pos.pCur), val ); - m_Stat.onEnsureExist(); - break; + if ( pSucc.bits() ) { + // pCur is marked, i.e. logically deleted. + help_remove( nLevel, pPred, pCur, pSucc, pos ); + goto retry; } - - if ( !bTowerOk ) { - build_node( pNode ); - nHeight = pNode->height(); - bTowerMade = - bTowerOk = true; - } - - if ( !insert_at_position( val, pNode, pos, [&func]( value_type& item ) { func( true, item, item ); })) { - m_Stat.onInsertRetry(); - continue; - } - - increase_height( nHeight ); - ++m_ItemCounter; - scp.release(); - m_Stat.onAddNode( nHeight ); - m_Stat.onEnsureNew(); - bRet.second = true; - break; } - } - - dispose_chain( pos ); - return bRet; + // Next level + pos.pPrev[nLevel] = pPred; + pos.pSucc[nLevel] = pCur.ptr(); + } + return ( pos.pCur = pCur.ptr() ) != nullptr; } - /// Unlinks the item \p val from the set - /** - The function searches the item \p val in the set and unlink it from the set - if it is found and is equal to \p val. - - Difference between \ref erase and \p unlink functions: \p erase finds a key - and deletes the item found. \p unlink finds an item by key and deletes it - only if \p val is an item of that set, i.e. the pointer to item found - is equal to &val . + bool find_max_position( position& pos ) + { + assert( gc::is_locked() ); - RCU \p synchronize method can be called. RCU should not be locked. + node_type * pPred; + marked_node_ptr pSucc; + marked_node_ptr pCur; - The \ref disposer specified in \p Traits class template parameter is called - by garbage collector \p GC asynchronously. + retry: + pPred = m_Head.head(); - The function returns \p true if success and \p false otherwise. - */ - bool unlink( value_type& val ) - { - check_deadlock_policy::check(); + for ( int nLevel = static_cast( c_nMaxHeight - 1 ); nLevel >= 0; --nLevel ) { - position pos; - bool bRet; + while ( true ) { + pCur = pPred->next( nLevel ).load( memory_model::memory_order_acquire ); + if ( pCur.bits() ) { + // pCur.bits() means that pPred is logically deleted + goto retry; + } - { - rcu_lock rcuLock; + if ( pCur.ptr() == nullptr ) { + // end of the list at level nLevel - goto next level + break; + } - if ( !find_position( val, pos, key_comparator(), false ) ) { - m_Stat.onUnlinkFailed(); - bRet = false; - } - else { - node_type * pDel = pos.pCur; - assert( key_comparator()( *node_traits::to_value_ptr( pDel ), val ) == 0 ); + // pSucc contains deletion mark for pCur + pSucc = pCur->next( nLevel ).load( memory_model::memory_order_acquire ); - unsigned int nHeight = pDel->height(); + if ( pPred->next( nLevel ).load( memory_model::memory_order_acquire ).all() != pCur.ptr() ) + goto retry; - if ( node_traits::to_value_ptr( pDel ) == &val && try_remove_at( pDel, pos, [](value_type const&) {}, false )) { - --m_ItemCounter; - m_Stat.onRemoveNode( nHeight ); - m_Stat.onUnlinkSuccess(); - bRet = true; + if ( pSucc.bits() ) { + // pCur is marked, i.e. logically deleted. + help_remove( nLevel, pPred, pCur, pSucc, pos ); + goto retry; } else { - m_Stat.onUnlinkFailed(); - bRet = false; + if ( !pSucc.ptr() ) + break; + + pPred = pCur.ptr(); } } - } - dispose_chain( pos ); + // Next level + pos.pPrev[nLevel] = pPred; + pos.pSucc[nLevel] = pCur.ptr(); + } - return bRet; + return ( pos.pCur = pCur.ptr() ) != nullptr; } - /// Extracts the item from the set with specified \p key - /** \anchor cds_intrusive_SkipListSet_rcu_extract - The function searches an item with key equal to \p key in the set, - unlinks it from the set, places it to \p result parameter, and returns \p true. - If the item with key equal to \p key is not found the function returns \p false. + template + bool insert_at_position( value_type& val, node_type * pNode, position& pos, Func f ) + { + assert( gc::is_locked() ); - Note the compare functor should accept a parameter of type \p Q that can be not the same as \p value_type. + unsigned int const nHeight = pNode->height(); + pNode->clear_tower(); - RCU \p synchronize method can be called. RCU should NOT be locked. - The function does not call the disposer for the item found. - The disposer will be implicitly invoked when \p result object is destroyed or when - result.release() is called, see cds::urcu::exempt_ptr for explanation. - @note Before reusing \p result object you should call its \p release() method. - Example: - \code - typedef cds::intrusive::SkipListSet< cds::urcu::gc< cds::urcu::general_buffered<> >, foo, my_traits > skip_list; - skip_list theList; - // ... + // Insert at level 0 + { + marked_node_ptr p( pos.pSucc[0] ); + pNode->next( 0 ).store( p, memory_model::memory_order_relaxed ); + if ( !pos.pPrev[0]->next( 0 ).compare_exchange_strong( p, marked_node_ptr( pNode ), memory_model::memory_order_release, atomics::memory_order_relaxed )) + return false; - typename skip_list::exempt_ptr ep; - if ( theList.extract( ep, 5 ) ) { - // Deal with ep - //... + f( val ); + } - // Dispose returned item. - ep.release(); + // Insert at level 1..max + for ( unsigned int nLevel = 1; nLevel < nHeight; ++nLevel ) { + marked_node_ptr p; + while ( true ) { + marked_node_ptr pSucc( pos.pSucc[nLevel] ); + + // Set pNode->next + // pNode->next must be null but can have a "logical deleted" flag if another thread is removing pNode right now + if ( !pNode->next( nLevel ).compare_exchange_strong( p, pSucc, + memory_model::memory_order_acq_rel, atomics::memory_order_acquire ) ) + { + // pNode has been marked as removed while we are inserting it + // Stop inserting + assert( p.bits() != 0 ); + if ( pNode->level_unlinked( nHeight - nLevel )) + pos.dispose( pNode ); + m_Stat.onLogicDeleteWhileInsert(); + return true; + } + p = pSucc; + + // Link pNode into the list at nLevel + if ( pos.pPrev[nLevel]->next( nLevel ).compare_exchange_strong( pSucc, marked_node_ptr( pNode ), + memory_model::memory_order_release, atomics::memory_order_relaxed )) + { + // go to next level + break; + } + + // Renew insert position + m_Stat.onRenewInsertPosition(); + if ( !find_position( val, pos, key_comparator(), false ) ) { + // The node has been deleted while we are inserting it + m_Stat.onNotFoundWhileInsert(); + return true; + } + } } - \endcode - */ - template - bool extract( exempt_ptr& result, Q const& key ) - { - return do_extract( result, key ); + return true; } - /// Extracts the item from the set with comparing functor \p pred - /** - The function is an analog of \ref cds_intrusive_SkipListSet_rcu_extract "extract(exempt_ptr&, Q const&)" - but \p pred predicate is used for key comparing. - \p Less has the interface like \p std::less. - \p pred must imply the same element order as the comparator used for building the set. - */ - template - bool extract_with( exempt_ptr& result, Q const& key, Less pred ) + template + bool try_remove_at( node_type * pDel, position& pos, Func f, bool bExtract ) { - return do_extract_with( result, key, pred ); - } - - /// Extracts an item with minimal key from the list - /** - The function searches an item with minimal key, unlinks it, and returns the item found in \p result parameter. - If the skip-list is empty the function returns \p false. + assert( pDel != nullptr ); + assert( gc::is_locked() ); - RCU \p synchronize method can be called. RCU should NOT be locked. - The function does not call the disposer for the item found. - The disposer will be implicitly invoked when \p result object is destroyed or when - result.release() is called, see cds::urcu::exempt_ptr for explanation. - @note Before reusing \p result object you should call its \p release() method. - Example: - \code - typedef cds::intrusive::SkipListSet< cds::urcu::gc< cds::urcu::general_buffered<> >, foo, my_traits > skip_list; - skip_list theList; - // ... + marked_node_ptr pSucc; + back_off bkoff; - typename skip_list::exempt_ptr ep; - if ( theList.extract_min(ep)) { - // Deal with ep - //... + unsigned const nMask = bExtract ? 3u : 1u; - // Dispose returned item. - ep.release(); + // logical deletion (marking) + for ( unsigned int nLevel = pDel->height() - 1; nLevel > 0; --nLevel ) { + pSucc = pDel->next( nLevel ).load( memory_model::memory_order_relaxed ); + if ( pSucc.bits() == 0 ) { + bkoff.reset(); + while ( !pDel->next( nLevel ).compare_exchange_weak( pSucc, pSucc | nMask, + memory_model::memory_order_release, atomics::memory_order_acquire )) + { + if ( pSucc.bits() == 0 ) { + bkoff(); + m_Stat.onMarkFailed(); + } + else if ( pSucc.bits() != nMask ) + return false; + } + } } - \endcode - @note Due the concurrent nature of the list, the function extracts nearly minimum key. - It means that the function gets leftmost item and tries to unlink it. - During unlinking, a concurrent thread may insert an item with key less than leftmost item's key. - So, the function returns the item with minimum key at the moment of list traversing. - */ - bool extract_min( exempt_ptr& result ) - { - return do_extract_min( result ); - } + marked_node_ptr p( pDel->next( 0 ).load( memory_model::memory_order_relaxed ).ptr() ); + while ( true ) { + if ( pDel->next( 0 ).compare_exchange_strong( p, p | nMask, memory_model::memory_order_release, atomics::memory_order_acquire )) + { + f( *node_traits::to_value_ptr( pDel ) ); - /// Extracts an item with maximal key from the list - /** - The function searches an item with maximal key, unlinks it, and returns the item found in \p result parameter. - If the skip-list is empty the function returns \p false. + // physical deletion + // try fast erase + p = pDel; + for ( int nLevel = static_cast( pDel->height() - 1 ); nLevel >= 0; --nLevel ) { - RCU \p synchronize method can be called. RCU should NOT be locked. - The function does not call the disposer for the item found. - The disposer will be implicitly invoked when \p result object is destroyed or when - result.release() is called, see cds::urcu::exempt_ptr for explanation. - @note Before reusing \p result object you should call its \p release() method. - Example: - \code - typedef cds::intrusive::SkipListSet< cds::urcu::gc< cds::urcu::general_buffered<> >, foo, my_traits > skip_list; - skip_list theList; - // ... + pSucc = pDel->next( nLevel ).load( memory_model::memory_order_relaxed ); + if ( pos.pPrev[nLevel]->next( nLevel ).compare_exchange_strong( p, marked_node_ptr( pSucc.ptr() ), + memory_model::memory_order_acq_rel, atomics::memory_order_acquire ) ) + { + pDel->level_unlinked(); + } + else { + // Make slow erase + find_position( *node_traits::to_value_ptr( pDel ), pos, key_comparator(), false ); + if ( bExtract ) + m_Stat.onSlowExtract(); + else + m_Stat.onSlowErase(); - typename skip_list::exempt_ptr ep; - if ( theList.extract_max(ep) ) { - // Deal with ep - //... - // Dispose returned item. - ep.release(); + return true; + } + } + + // Fast erasing success + if ( !bExtract ) { + // We cannot free the node at this moment since RCU is locked + // Link deleted nodes to a chain to free later + pos.dispose( pDel ); + m_Stat.onFastErase(); + } + else + m_Stat.onFastExtract(); + return true; + } + else if ( p.bits() ) { + // Another thread is deleting pDel right now + m_Stat.onEraseContention(); + return false; + } + + m_Stat.onEraseRetry(); + bkoff(); } - \endcode + } - @note Due the concurrent nature of the list, the function extracts nearly maximal key. - It means that the function gets rightmost item and tries to unlink it. - During unlinking, a concurrent thread can insert an item with key greater than rightmost item's key. - So, the function returns the item with maximum key at the moment of list traversing. - */ - bool extract_max( exempt_ptr& result ) + enum finsd_fastpath_result { + find_fastpath_found, + find_fastpath_not_found, + find_fastpath_abort + }; + template + finsd_fastpath_result find_fastpath( Q& val, Compare cmp, Func f ) const { - return do_extract_max( result ); - } + node_type * pPred; + marked_node_ptr pCur; + marked_node_ptr pSucc; + marked_node_ptr pNull; - /// Deletes the item from the set - /** \anchor cds_intrusive_SkipListSet_rcu_erase - The function searches an item with key equal to \p val in the set, - unlinks it from the set, and returns \p true. - If the item with key equal to \p val is not found the function return \p false. + back_off bkoff; + unsigned attempt = 0; - Note the hash functor should accept a parameter of type \p Q that can be not the same as \p value_type. + try_again: + pPred = m_Head.head(); + for ( int nLevel = static_cast( m_nHeight.load( memory_model::memory_order_relaxed ) - 1 ); nLevel >= 0; --nLevel ) { + pCur = pPred->next( nLevel ).load( memory_model::memory_order_acquire ); - RCU \p synchronize method can be called. RCU should not be locked. - */ - template - bool erase( const Q& val ) - { - return do_erase( val, key_comparator(), [](value_type const&) {} ); - } + while ( pCur != pNull ) { + if ( pCur.bits() ) { + // pPred is being removed + if ( ++attempt < 4 ) { + bkoff(); + goto try_again; + } - /// Delete the item from the set with comparing functor \p pred - /** - The function is an analog of \ref cds_intrusive_SkipListSet_rcu_erase "erase(Q const&)" - but \p pred predicate is used for key comparing. - \p Less has the interface like \p std::less. - \p pred must imply the same element order as the comparator used for building the set. - */ - template - bool erase_with( const Q& val, Less pred ) - { - return do_erase( val, cds::opt::details::make_comparator_from_less(), [](value_type const&) {} ); - } + return find_fastpath_abort; + } - /// Deletes the item from the set - /** \anchor cds_intrusive_SkipListSet_rcu_erase_func - The function searches an item with key equal to \p val in the set, - call \p f functor with item found, unlinks it from the set, and returns \p true. - The \ref disposer specified in \p Traits class template parameter is called - by garbage collector \p GC asynchronously. + if ( pCur.ptr() ) { + int nCmp = cmp( *node_traits::to_value_ptr( pCur.ptr() ), val ); + if ( nCmp < 0 ) { + pPred = pCur.ptr(); + pCur = pCur->next( nLevel ).load( memory_model::memory_order_acquire ); + } + else if ( nCmp == 0 ) { + // found + f( *node_traits::to_value_ptr( pCur.ptr() ), val ); + return find_fastpath_found; + } + else // pCur > val - go down + break; + } + } + } - The \p Func interface is - \code - struct functor { - void operator()( value_type const& item ); - }; - \endcode - The functor can be passed by reference with boost:ref + return find_fastpath_not_found; + } - If the item with key equal to \p val is not found the function return \p false. + template + bool find_slowpath( Q& val, Compare cmp, Func f, position& pos ) + { + if ( find_position( val, pos, cmp, true ) ) { + assert( cmp( *node_traits::to_value_ptr( pos.pCur ), val ) == 0 ); - Note the hash functor should accept a parameter of type \p Q that can be not the same as \p value_type. + f( *node_traits::to_value_ptr( pos.pCur ), val ); + return true; + } + else + return false; + } - RCU \p synchronize method can be called. RCU should not be locked. - */ - template - bool erase( Q const& val, Func f ) + template + bool do_find_with( Q& val, Compare cmp, Func f ) { - return do_erase( val, key_comparator(), f ); + position pos; + return do_find_with( val, cmp, f, pos ); } - /// Delete the item from the set with comparing functor \p pred - /** - The function is an analog of \ref cds_intrusive_SkipListSet_rcu_erase_func "erase(Q const&, Func)" - but \p pred predicate is used for key comparing. - \p Less has the interface like \p std::less. - \p pred must imply the same element order as the comparator used for building the set. - */ - template - bool erase_with( Q const& val, Less pred, Func f ) + template + bool do_find_with( Q& val, Compare cmp, Func f, position& pos ) { - return do_erase( val, cds::opt::details::make_comparator_from_less(), f ); + bool bRet; + + { + rcu_lock l; + + switch ( find_fastpath( val, cmp, f ) ) { + case find_fastpath_found: + m_Stat.onFindFastSuccess(); + return true; + case find_fastpath_not_found: + m_Stat.onFindFastFailed(); + return false; + default: + break; + } + + if ( find_slowpath( val, cmp, f, pos ) ) { + m_Stat.onFindSlowSuccess(); + bRet = true; + } + else { + m_Stat.onFindSlowFailed(); + bRet = false; + } + } + return bRet; } - /// Finds the key \p val - /** @anchor cds_intrusive_SkipListSet_rcu_find_func - The function searches the item with key equal to \p val and calls the functor \p f for item found. - The interface of \p Func functor is: - \code - struct functor { - void operator()( value_type& item, Q& val ); - }; - \endcode - where \p item is the item found, \p val is the find function argument. + template + bool do_erase( Q const& val, Compare cmp, Func f ) + { + check_deadlock_policy::check(); - You can pass \p f argument by value or by reference using \p std::ref. + position pos; + bool bRet; - The functor can change non-key fields of \p item. Note that the functor is only guarantee - that \p item cannot be disposed during functor is executing. - The functor does not serialize simultaneous access to the set \p item. If such access is - possible you must provide your own synchronization schema on item level to exclude unsafe item modifications. + { + rcu_lock rcuLock; - The \p val argument is non-const since it can be used as \p f functor destination i.e., the functor - can modify both arguments. + if ( !find_position( val, pos, cmp, false ) ) { + m_Stat.onEraseFailed(); + bRet = false; + } + else { + node_type * pDel = pos.pCur; + assert( cmp( *node_traits::to_value_ptr( pDel ), val ) == 0 ); - The function applies RCU lock internally. + unsigned int nHeight = pDel->height(); + if ( try_remove_at( pDel, pos, f, false ) ) { + --m_ItemCounter; + m_Stat.onRemoveNode( nHeight ); + m_Stat.onEraseSuccess(); + bRet = true; + } + else { + m_Stat.onEraseFailed(); + bRet = false; + } + } + } - The function returns \p true if \p val is found, \p false otherwise. - */ - template - bool find( Q& val, Func f ) - { - return do_find_with( val, key_comparator(), f ); + return bRet; } - /// Finds the key \p val with comparing functor \p pred - /** - The function is an analog of \ref cds_intrusive_SkipListSet_rcu_find_func "find(Q&, Func)" - but \p cmp is used for key comparison. - \p Less functor has the interface like \p std::less. - \p cmp must imply the same element order as the comparator used for building the set. - */ - template - bool find_with( Q& val, Less pred, Func f ) + template + value_type * do_extract_key( Q const& key, Compare cmp, position& pos ) { - return do_find_with( val, cds::opt::details::make_comparator_from_less(), f ); - } - - /// Finds the key \p val - /** @anchor cds_intrusive_SkipListSet_rcu_find_cfunc - The function searches the item with key equal to \p val and calls the functor \p f for item found. - The interface of \p Func functor is: - \code - struct functor { - void operator()( value_type& item, Q const& val ); - }; - \endcode - where \p item is the item found, \p val is the find function argument. + // RCU should be locked!!! + assert( gc::is_locked() ); - You can pass \p f argument by value or by reference using \p std::ref. + node_type * pDel; - The functor can change non-key fields of \p item. Note that the functor is only guarantee - that \p item cannot be disposed during functor is executing. - The functor does not serialize simultaneous access to the set \p item. If such access is - possible you must provide your own synchronization schema on item level to exclude unsafe item modifications. + if ( !find_position( key, pos, cmp, false ) ) { + m_Stat.onExtractFailed(); + pDel = nullptr; + } + else { + pDel = pos.pCur; + assert( cmp( *node_traits::to_value_ptr( pDel ), key ) == 0 ); - The function applies RCU lock internally. + unsigned int const nHeight = pDel->height(); - The function returns \p true if \p val is found, \p false otherwise. - */ - template - bool find( Q const& val, Func f ) - { - return do_find_with( val, key_comparator(), f ); - } + if ( try_remove_at( pDel, pos, []( value_type const& ) {}, true ) ) { + --m_ItemCounter; + m_Stat.onRemoveNode( nHeight ); + m_Stat.onExtractSuccess(); + } + else { + m_Stat.onExtractFailed(); + pDel = nullptr; + } + } - /// Finds the key \p val with comparing functor \p pred - /** - The function is an analog of \ref cds_intrusive_SkipListSet_rcu_find_cfunc "find(Q const&, Func)" - but \p cmp is used for key comparison. - \p Less functor has the interface like \p std::less. - \p cmp must imply the same element order as the comparator used for building the set. - */ - template - bool find_with( Q const& val, Less pred, Func f ) - { - return do_find_with( val, cds::opt::details::make_comparator_from_less(), f ); + return pDel ? node_traits::to_value_ptr( pDel ) : nullptr; } - /// Finds the key \p val - /** @anchor cds_intrusive_SkipListSet_rcu_find_val - The function searches the item with key equal to \p val - and returns \p true if it is found, and \p false otherwise. - - The function applies RCU lock internally. - */ template - bool find( Q const& val ) + value_type * do_extract( Q const& key ) { - return do_find_with( val, key_comparator(), [](value_type& , Q const& ) {} ); + check_deadlock_policy::check(); + value_type * pDel = nullptr; + position pos; + { + rcu_lock l; + pDel = do_extract_key( key, key_comparator(), pos ); + } + + return pDel; } - /// Finds the key \p val with comparing functor \p pred - /** - The function is an analog of \ref cds_intrusive_SkipListSet_rcu_find_val "find(Q const&)" - but \p pred is used for key compare. - \p Less functor has the interface like \p std::less. - \p pred must imply the same element order as the comparator used for building the set. - */ template - bool find_with( Q const& val, Less pred ) + value_type * do_extract_with( Q const& key, Less pred ) { - return do_find_with( val, cds::opt::details::make_comparator_from_less(), [](value_type& , Q const& ) {} ); + CDS_UNUSED( pred ); + check_deadlock_policy::check(); + value_type * pDel = nullptr; + position pos; + { + rcu_lock l; + pDel = do_extract_key( key, cds::opt::details::make_comparator_from_less(), pos ); + } + + return pDel; } - /// Finds the key \p val and return the item found - /** \anchor cds_intrusive_SkipListSet_rcu_get - The function searches the item with key equal to \p val and returns the pointer to item found. - If \p val is not found it returns \p nullptr. + value_type * do_extract_min() + { + assert( !gc::is_locked() ); - Note the compare functor should accept a parameter of type \p Q that can be not the same as \p value_type. + position pos; + node_type * pDel; - RCU should be locked before call of this function. - Returned item is valid only while RCU is locked: - \code - typedef cds::intrusive::SkipListSet< cds::urcu::gc< cds::urcu::general_buffered<> >, foo, my_traits > skip_list; - skip_list theList; - // ... { - // Lock RCU - skip_list::rcu_lock lock; + rcu_lock l; - foo * pVal = theList.get( 5 ); - if ( pVal ) { - // Deal with pVal - //... + if ( !find_min_position( pos ) ) { + m_Stat.onExtractMinFailed(); + pDel = nullptr; } - } - // Unlock RCU by rcu_lock destructor - // pVal can be retired by disposer at any time after RCU has been unlocked - \endcode + else { + pDel = pos.pCur; + unsigned int const nHeight = pDel->height(); - After RCU unlocking the \p %force_dispose member function can be called manually, - see \ref force_dispose for explanation. - */ - template - value_type * get( Q const& val ) - { - assert( gc::is_locked()); + if ( try_remove_at( pDel, pos, []( value_type const& ) {}, true ) ) { + --m_ItemCounter; + m_Stat.onRemoveNode( nHeight ); + m_Stat.onExtractMinSuccess(); + } + else { + m_Stat.onExtractMinFailed(); + pDel = nullptr; + } + } + } - value_type * pFound; - return do_find_with( val, key_comparator(), [&pFound](value_type& found, Q const& ) { pFound = &found; } ) - ? pFound : nullptr; + return pDel ? node_traits::to_value_ptr( pDel ) : nullptr; } - /// Finds the key \p val and return the item found - /** - The function is an analog of \ref cds_intrusive_SkipListSet_rcu_get "get(Q const&)" - but \p pred is used for comparing the keys. - - \p Less functor has the semantics like \p std::less but should take arguments of type \ref value_type and \p Q - in any order. - \p pred must imply the same element order as the comparator used for building the set. - */ - template - value_type * get_with( Q const& val, Less pred ) + value_type * do_extract_max() { - assert( gc::is_locked()); + assert( !gc::is_locked() ); - value_type * pFound; - return do_find_with( val, cds::opt::details::make_comparator_from_less(), - [&pFound](value_type& found, Q const& ) { pFound = &found; } ) - ? pFound : nullptr; - } + position pos; + node_type * pDel; - /// Returns item count in the set - /** - The value returned depends on item counter type provided by \p Traits template parameter. - If it is atomicity::empty_item_counter this function always returns 0. - Therefore, the function is not suitable for checking the set emptiness, use \ref empty - member function for this purpose. - */ - size_t size() const - { - return m_ItemCounter; - } + { + rcu_lock l; - /// Checks if the set is empty - bool empty() const - { - return m_Head.head()->next( 0 ).load( memory_model::memory_order_relaxed ) == nullptr; - } + if ( !find_max_position( pos ) ) { + m_Stat.onExtractMaxFailed(); + pDel = nullptr; + } + else { + pDel = pos.pCur; + unsigned int const nHeight = pDel->height(); - /// Clears the set (non-atomic) - /** - The function unlink all items from the set. - The function is not atomic, thus, in multi-threaded environment with parallel insertions - this sequence - \code - set.clear(); - assert( set.empty() ); - \endcode - the assertion could be raised. + if ( try_remove_at( pDel, pos, []( value_type const& ) {}, true ) ) { + --m_ItemCounter; + m_Stat.onRemoveNode( nHeight ); + m_Stat.onExtractMaxSuccess(); + } + else { + m_Stat.onExtractMaxFailed(); + pDel = nullptr; + } + } + } - For each item the \ref disposer will be called automatically after unlinking. - */ - void clear() - { - exempt_ptr ep; - while ( extract_min(ep) ) - ep.release(); + return pDel ? node_traits::to_value_ptr( pDel ) : nullptr; } - /// Returns maximum height of skip-list. The max height is a constant for each object and does not exceed 32. - static CDS_CONSTEXPR unsigned int max_height() CDS_NOEXCEPT + void increase_height( unsigned int nHeight ) { - return c_nMaxHeight; + unsigned int nCur = m_nHeight.load( memory_model::memory_order_relaxed ); + if ( nCur < nHeight ) + m_nHeight.compare_exchange_strong( nCur, nHeight, memory_model::memory_order_release, atomics::memory_order_relaxed ); } - /// Returns const reference to internal statistics - stat const& statistics() const + void destroy() { - return m_Stat; + node_type* p = m_Head.head()->next( 0 ).load( atomics::memory_order_relaxed ).ptr(); + while ( p ) { + node_type* pNext = p->next( 0 ).load( atomics::memory_order_relaxed ).ptr(); + dispose_node( node_traits::to_value_ptr( p ) ); + p = pNext; + } } - /// Clears internal list of ready-to-remove items passing it to RCU reclamation cycle - /** @anchor cds_intrusive_SkipListSet_rcu_force_dispose - Skip list has complex multi-step algorithm for removing an item. In fact, when you - remove the item it is just marked as removed that is enough for the success of your operation. - Actual removing can take place in the future, in another call or even in another thread. - Inside RCU lock the removed item cannot be passed to RCU reclamation cycle - since it can lead to deadlock. To solve this problem, the current skip list implementation - has internal list of items which is ready to remove but is not yet passed to RCU reclamation. - Usually, this list will be passed to RCU reclamation in the next suitable call of skip list member function. - In some cases we want to pass it to RCU reclamation immediately after RCU unlocking. - This function provides such opportunity: it checks whether the RCU is not locked and if it is true - the function passes the internal ready-to-remove list to RCU reclamation cycle. - - The RCU \p synchronize can be called. - */ - void force_dispose() - { - if ( !gc::is_locked() ) - dispose_deferred(); - } + //@endcond }; }} // namespace cds::intrusive -#endif // #ifndef __CDS_INTRUSIVE_SKIP_LIST_RCU_H +#endif // #ifndef CDSLIB_INTRUSIVE_SKIP_LIST_RCU_H