From 4af1a46f51f88e46cc09de40302cd5e52d05b044 Mon Sep 17 00:00:00 2001 From: khizmax Date: Thu, 30 Jul 2015 23:31:12 +0300 Subject: [PATCH] Intrusive MultiLevelHashSet draft done --- .../details/multilevel_hashset_base.h | 68 ++- cds/intrusive/impl/multilevel_hashset.h | 444 +++++++++++++++++- 2 files changed, 494 insertions(+), 18 deletions(-) diff --git a/cds/intrusive/details/multilevel_hashset_base.h b/cds/intrusive/details/multilevel_hashset_base.h index 45fc5102..6bb87688 100644 --- a/cds/intrusive/details/multilevel_hashset_base.h +++ b/cds/intrusive/details/multilevel_hashset_base.h @@ -64,10 +64,65 @@ namespace cds { namespace intrusive { template struct stat { typedef EventCounter event_counter ; ///< Event counter type + + event_counter m_nInsertSuccess; ///< Number of success \p insert() operations + event_counter m_nInsertFailed; ///< Number of failed \p insert() operations + event_counter m_nInsertRetry; ///< Number of attempts to insert new item + event_counter m_nUpdateNew; ///< Number of new item inserted for \p update() + event_counter m_nUpdateExisting; ///< Number of existing item updates + event_counter m_nUpdateFailed; ///< Number of failed \p update() call + event_counter m_nUpdateRetry; ///< Number of attempts to update the item + event_counter m_nEraseSuccess; ///< Number of successful \p erase(), \p unlink(), \p extract() operations + event_counter m_nEraseFailed; ///< Number of failed \p erase(), \p unlink(), \p extract() operations + event_counter m_nEraseRetry; ///< Number of attempts to \p erase() an item + event_counter m_nFindSuccess; ///< Number of successful \p find() and \p get() operations + event_counter m_nFindFailed; ///< Number of failed \p find() and \p get() operations + + event_counter m_nExpandNodeSuccess; ///< Number of succeeded attempts converting data node to array node + event_counter m_nExpandNodeFailed; ///< Number of failed attempts converting data node to array node + event_counter m_nSlotChanged; ///< Number of array node slot changing by other thread during an operation + event_counter m_nSlotConverting; ///< Number of events when we encounter a slot while it is converting to array node + + void onInsertSuccess() { ++m_nInsertSuccess; } + void onInserFailed() { ++m_nInsertFailed; } + void onInsertRetry() { ++m_nInsertRetry; } + void onUpdateNew() { ++m_nUpdateNew; } + void onUpdateExisting() { ++m_nUpdateExisting; } + void onUpdateFailed() { ++m_nUpdateFailed; } + void onUpdateRetry() { ++m_nUpdateRetry; } + void onEraseSuccess() { ++m_nEraseSuccess; } + void onEraseFailed() { ++m_nEraseFailed; } + void onEraseRetry() { ++m_nEraseRetry; } + void onFindSuccess() { ++m_nFinSuccess; } + void onFindFailed() { ++m_nFindFailed; } + + void onExpandNodeSuccess() { ++m_nExpandNodeSuccess; } + void onExpandNodeFailed() { ++m_nExpandNodeFailed; } + void onSlotChanged() { ++m_nSlotChanged; } + void onSlotConverting() { ++m_nSlotConverting; } }; /// \p MultiLevelHashSet empty internal statistics struct empty_stat { + //@cond + void onInsertSuccess() const {} + void onInsertFailed() const {} + void onInsertRetry() const {} + void onUpdateNew() const {} + void onUpdateExisting() const {} + void onUpdateFailed() const {} + void onUpdateRetry() const {} + void onEraseSuccess() const {} + void onEraseFailed() const {} + void onEraseRetry() const {} + void onFindSuccess() const {} + void onFindFailed() const {} + + void onExpandNodeSuccess() const {} + void onExpandNodeFailed() const {} + void onSlotChanged() const {} + void onSlotConverting() const {} + //@endcond }; /// MultiLevelHashSet traits @@ -87,7 +142,8 @@ namespace cds { namespace intrusive { // ... other fields }; - struct foo_hash_functor { + // Hash accessor + struct foo_hash_accessor { hash_type const& operator()( foo const& d ) const { return d.hash; @@ -234,6 +290,7 @@ namespace cds { namespace intrusive { explicit hash_splitter( hash_type const& h ) : m_ptr(reinterpret_cast( &h )) , m_pos(0) + , m_first( m_ptr ) # ifdef _DEBUG , m_last( m_ptr + c_nHashSize ) # endif @@ -297,11 +354,18 @@ namespace cds { namespace intrusive { return nBits ? cut( nBits ) : 0; } + void reset() + { + m_ptr = m_first; + m_pos = 0; + } + private: uint_type const* m_ptr; ///< current position in the hash size_t m_pos; ///< current position in bits + uint_type const* m_first; ///< first position # ifdef _DEBUG - uint_type const* m_last; + uint_type const* m_last; ///< last position # endif }; } // namespace details diff --git a/cds/intrusive/impl/multilevel_hashset.h b/cds/intrusive/impl/multilevel_hashset.h index 6a11bbe4..f0516ef0 100644 --- a/cds/intrusive/impl/multilevel_hashset.h +++ b/cds/intrusive/impl/multilevel_hashset.h @@ -3,6 +3,9 @@ #ifndef CDSLIB_INTRUSIVE_IMPL_MULTILEVEL_HASHSET_H #define CDSLIB_INTRUSIVE_IMPL_MULTILEVEL_HASHSET_H +#include // std::tie +#include // std::ref + #include #include @@ -117,15 +120,20 @@ namespace cds { namespace intrusive { typedef typename gc::template guarded_ptr< value_type > guarded_ptr; ///< Guarded pointer + /// Count of hazard pointers required + static CDS_CONSTEXPR size_t const c_nHazardPtrCount = 1; + + /// Node marked poiter + typedef cds::details::marked_ptr< value_type, 3 > node_ptr; + /// Array node element + typedef atomics::atomic< node_ptr > atomic_node_ptr; + protected: //@cond - enum cell_flags { - logically_deleted = 1, ///< the cell (data node) is logically deleted - array_converting = 2, ///< the cell is converting from data node to an array node - array_node = 4 ///< the cell is a pointer to an array node + enum node_flags { + array_converting = 1, ///< the cell is converting from data node to an array node + array_node = 2 ///< the cell is a pointer to an array node }; - typedef cds::details::marked_ptr< value_type, 7 > node_ptr; - typedef atomics::atomic< node_ptr * > atomic_node_ptr; typedef cds::details::Allocator< atomic_node_ptr, head_node_allocator > cxx_head_node_allocator; typedef cds::details::Allocator< atomic_node_ptr, array_node_allocator > cxx_array_node_allocator; @@ -138,6 +146,7 @@ namespace cds { namespace intrusive { size_t array_node_size; // power-of-two size_t array_node_size_log;// log2( array_node_size ) }; + //@endcond private: @@ -162,14 +171,14 @@ namespace cds { namespace intrusive { */ MultiLevelHashSet( size_t head_bits = 8, size_t array_bits = 4 ) : m_Metrics(make_metrics( head_bits, array_bits )) - , m_Head( cxx_head_node_allocator().NewArray( m_Metrics.head_node_size, nullptr )) + , m_Head( cxx_head_node_allocator().NewArray( head_size(), nullptr )) {} /// Destructs the set and frees all data ~MultiLevelHashSet() { destroy_tree(); - cxx_head_node_allocator().Delete( m_Head, m_Metrics.head_node_size ); + cxx_array_node_allocator().Delete( m_Head, head_size() ); } /// Inserts new node @@ -181,6 +190,7 @@ namespace cds { namespace intrusive { */ bool insert( value_type& val ) { + return insert( val, [](value_type&) {} ); } /// Inserts new node @@ -190,7 +200,7 @@ namespace cds { namespace intrusive { The function allows to split creating of new item into two part: - create item with key only - insert new item into the set - - if inserting is success, calls \p f functor to initialize \p val. + - if inserting is success, calls \p f functor to initialize \p val. The functor signature is: \code @@ -205,6 +215,73 @@ namespace cds { namespace intrusive { template bool insert( value_type& val, Func f ) { + hash_type const& hash = hash_accessor()( val ); + hash_splitter splitter( hash ); + hash_comparator cmp; + gc::Guard guard; + back_off bkoff; + + atomic_node_ptr * pArr = m_Head; + size_t nSlot = splitter.cut( m_Metrics.head_node_size_log ); + assert( nSlot < m_Metrics.head_node_size ); + + while ( true ) { + node_ptr slot = pArr[nSlot].load( memory_model::memory_order_acquire ); + if ( slot.bits() == array_node ) { + // array node, go down the tree + assert( slot.ptr() != nullptr ); + nSlot = splitter.cut( m_Metrics.array_node_size_log ); + assert( nSlot < m_Metrics.array_node_size ); + pArr = to_array( slot.ptr() ); + } + else if ( slot.bits() == array_converting ) { + // the slot is converting to array node right now + bkoff(); + m_Stat.onSlotConverting(); + } + else { + // data node + assert(slot.bits() == 0 ); + + // protect data node by hazard pointer + if ( guard.protect( pArr[nSlot], [](node_ptr p) -> value_type * { return p.ptr(); }) != slot ) { + // slot value has been changed - retry + m_Stat.onSlotChanged(); + } + else if ( slot.ptr() ) { + if ( cmp( hash, hash_accessor()( *slot.ptr() )) == 0 ) { + // the item with that hash value already exists + m_Stat.onInsertFailed(); + return false; + } + + // the slot must be expanded + atomic_node_ptr * pNewArr; + size_t nNewSlot; + std::tie( pNewArr, nNewSlot ) = expand_slot( pArr[ nSlot ], slot, splitter ); + if ( pNewArr ) { + pArr = pNewArr; + nSlot = nNewSlot; + } + } + else { + // the slot is empty, try to insert data node + node_ptr pNull; + if ( pArr[nSlot].compare_exchange_strong( pNull, node_ptr( &val ), memory_model::memory_order_release, atomics::memory_order_relaxed ) + { + // the new data node has been inserted + f( val ); + ++m_ItemCounter; + m_Stat.onInsertSuccess(); + return true; + } + + // insert failed - slot has been changed by another thread + // retry inserting + m_Stat.onInsertRetry(); + } + } + } // while } /// Updates the node @@ -225,6 +302,87 @@ namespace cds { namespace intrusive { template std::pair update( value_type& val, bool bInsert = true ) { + hash_type const& hash = hash_accessor()( val ); + hash_splitter splitter( hash ); + hash_comparator cmp; + gc::Guard guard; + back_off bkoff; + + atomic_node_ptr * pArr = m_Head; + size_t nSlot = splitter.cut( m_Metrics.head_node_size_log ); + assert( nSlot < m_Metrics.head_node_size ); + + while ( true ) { + node_ptr slot = pArr[nSlot].load( memory_model::memory_order_acquire ); + if ( slot.bits() == array_node ) { + // array node, go down the tree + assert( slot.ptr() != nullptr ); + nSlot = splitter.cut( m_Metrics.array_node_size_log ); + assert( nSlot < m_Metrics.array_node_size ); + pArr = to_array( slot.ptr() ); + } + else if ( slot.bits() == array_converting ) { + // the slot is converting to array node right now + bkoff(); + m_Stat.onSlotConverting(); + } + else { + // data node + assert(slot.bits() == 0 ); + + // protect data node by hazard pointer + if ( guard.protect( pArr[nSlot], [](node_ptr p) -> value_type * { return p.ptr(); }) != slot ) { + // slot value has been changed - retry + m_Stat.onSlotChanged(); + } + else if ( slot.ptr() ) { + if ( cmp( hash, hash_accessor()( *slot.ptr() )) == 0 ) { + // the item with that hash value already exists + // Replace it with val + if ( pArr[nSlot].compare_exchange_strong( slot, node_ptr( &val ), memory_model::memory_order_release, atomics::memory_order_relaxed ) ) { + // slot can be disposed + gc::template retire( slot.ptr() ); + m_Stat.onUpdateExisting(); + return std::make_pair( true, false ); + } + + m_Stat.onUpdateRetry(); + continue; + } + + // the slot must be expanded + atomic_node_ptr * pNewArr; + size_t nNewSlot; + std::tie( pNewArr, nNewSlot ) = expand_slot( pArr[ nSlot ], slot, splitter ); + if ( pNewArr ) { + pArr = pNewArr; + nSlot = nNewSlot; + } + } + else { + // the slot is empty, try to insert data node + if ( bInsert ) { + node_ptr pNull; + if ( pArr[nSlot].compare_exchange_strong( pNull, node_ptr( &val ), memory_model::memory_order_release, atomics::memory_order_relaxed ) + { + // the new data node has been inserted + f( val ); + ++m_ItemCounter; + m_Stat.onUpdateNew(); + return std::make_pair( true, true ); + } + } + else { + m_Stat.onUpdateFailed(); + return std:make_pair( false, false ); + } + + // insert failed - slot has been changed by another thread + // retry updating + m_Stat.onUpdateRetry(); + } + } + } // while } /// Unlinks the item \p val from the set @@ -236,6 +394,18 @@ namespace cds { namespace intrusive { */ bool unlink( value_type& val ) { + typename gc::Guard guard; + auto pred = [&val](value_type const& item) -> bool { return &item == &val; }; + value_type * p = do_erase( hash, guard, std::ref( pred )); + + // p is guarded by HP + if ( p ) { + gc::template retire( p ); + --m_ItemCounter; + m_Stat.onEraseSuccess(); + return true; + } + return false; } /// Deletes the item from the set @@ -249,6 +419,7 @@ namespace cds { namespace intrusive { */ bool erase( hash_type const& hash ) { + return erase(hash, [](value_type const&) {} ); } /// Deletes the item from the set @@ -261,7 +432,7 @@ namespace cds { namespace intrusive { The \p Func interface is \code struct functor { - void operator()( value_type const& item ); + void operator()( value_type& item ); }; \endcode @@ -270,6 +441,18 @@ namespace cds { namespace intrusive { template bool erase( hash_type const& hash, Func f ) { + typename gc::Guard guard; + value_type * p = do_erase( hash, guard, []( value_type const&) -> bool {return true; } ); + + // p is guarded by HP + if ( p ) { + gc::template retire( p ); + --m_ItemCounter; + f( *p ); + m_Stat.onEraseSuccess(); + return true; + } + return false; } /// Extracts the item with specified \p hash @@ -299,6 +482,17 @@ namespace cds { namespace intrusive { */ guarded_ptr extract( hash_type const& hash ) { + typename gc::Guard guard; + value_type * p = do_erase( hash, guard, []( value_type const&) -> bool {return true; } ); + + // p is guarded by HP + if ( p ) { + gc::template retire( p ); + --m_ItemCounter; + m_Stat.onEraseSuccess(); + return guarded_ptr(p); + } + return guarded_ptr(); } /// Finds an item by it's \p hash @@ -322,6 +516,15 @@ namespace cds { namespace intrusive { template bool find( hash_type const& hash, Func f ) { + typename gc::Guard guard; + value_type * p = search( hash, guard ); + + // p is guarded by HP + if ( p ) { + f( *p ); + return true; + } + return false; } /// Checks whether the set contains \p hash @@ -331,6 +534,7 @@ namespace cds { namespace intrusive { */ bool contains( hash_type const& hash ) { + return find( hash, [](value_type&) {} ); } /// Finds an item by it's \p hash and returns the item found @@ -358,20 +562,26 @@ namespace cds { namespace intrusive { */ guarded_ptr get( hash_type const& hash ) { + typename gc::Guard guard; + value_type * p = search( hash, guard ); + + // p is guarded by HP + if ( p ) + return guarded_ptr( p ); + return guarded_ptr(); } /// Clears the set (non-atomic) /** - The function unlink all items from the set. - The function is not atomic. It removes all data nodes and then resets the item counter to zero. - If there are a thread that performs insertion while \p %clear() is working the result is undefined in general case: - \p empty() may return \p true but the set may contain item(s). - Therefore, \p %clear() may be used only for debugging purposes. + The function unlink all data node from the set. + The function is not atomic but is thread-safe. + After \p %clear() the set may not be empty because another threads may insert items. For each item the \p disposer is called after unlinking. */ void clear() { + clear_array( m_Head, head_size() ); } /// Checks if the set is empty @@ -439,16 +649,218 @@ namespace cds { namespace intrusive { return (reinterpret_cast(p) & node_ptr::bitmask) == 0; } + atomic_node_ptr * alloc_array_node() const + { + return cxx_array_node_allocator().NewArray( array_node_size(), nullptr ) + } + + void free_array_node( atomic_node_ptr * parr ) const + { + cxx_array_node_allocator().Delete( parr, array_node_size() ); + } + void destroy_tree() { // The function is not thread-safe. For use in dtor only // Remove data node clear(); - //TODO: free all array nodes + // Destroy all array nodes + destroy_array_nodes( m_Head, head_size()); + } + + void destroy_array_nodes( atomic_node_ptr * pArr, size_t nSize ) + { + for ( atomic_node_ptr * pLast = pArr + nSize; pArr != pLast; ++pArr ) { + node_ptr slot = pArr->load( memory_model::memory_order_acquire ); + if ( slot.bits() == array_node ) { + destroy_array_nodes(to_array(slot.ptr()), array_node_size()); + free_array_node( to_array(slot.ptr())); + pArr->store(node_ptr(), memory_model::memory_order_relaxed ); + } + } + } + + void clear_array( atomic_node_ptr * pArr, size_t nSize ) + { + back_off bkoff; + + for ( atomic_node_ptr * pLast = pArr + nSize; pArr != pLast; ++pArr ) { + while ( true ) { + node_ptr slot = pArr->load( memory_model::memory_order_acquire ); + if ( slot.bits() == array_node ) { + // array node, go down the tree + assert( slot.ptr() != nullptr ); + clear_array( to_array( slot.ptr()), array_node_size() ); + break; + } + else if ( slot.bits() == array_converting ) { + // the slot is converting to array node right now + while ( (slot = pArr->load(memory_model::memory_order_acquire)).bits() == array_converting ) { + bkoff(); + m_Stat.onSlotConverting(); + } + bkoff.reset(); + + assert( slot.ptr() != nullptr ); + assert(slot.bits() == array_node ); + clear_array( to_array( slot.ptr()), array_node_size() ); + break; + } + else { + // data node + if ( pArr->compare_exchange_strong( slot, node_ptr(), memory_model::memory_order_acquire, atomics::memory_order_relaxed ) ) { + gc::template retire( p ); + --m_ItemCounter; + m_Stat.onEraseSuccess(); + break; + } + } + } + } + } + + union converter { + value_type * pData; + atomic_node_ptr * pArr; + + converter( value_type * p ) + : pData( p ) + {} + + converter( atomic_node_ptr * p ) + : pArr( p ) + {} + }; + + static atomic_node_ptr * to_array( value_type * p ) + { + return converter( p ).pArr; + } + static node_ptr * to_node( atomic_node_ptr * p ) + { + return converter( p ).pData; } + + std::pair< atomic_node_ptr *, size_t > expand_slot( atomic_node_ptr& slot, node_ptr current, hash_splitter& splitter ) + { + node_ptr cur(current.ptr()); + if ( !slot.compare_exchange_strong( cur, cur | array_converting, memory_model::memory_order_release, atomics::memory_order_relaxed )) { + m_Stat.onExpandNodeFailed(); + return std::make_pair(static_cast(nullptr), size_t(0)); + } + + atomic_node_ptr * pArr = alloc_array_node(); + size_t idx = splitter.cut( m_Metrics.array_node_size_log ); + pArr[idx].store( current, memory_model::memory_order_release ); + + cur = cur | array_converting; + CDS_VERIFY( slot.compare_exchange_strong( cur, node_ptr( to_node( pArr ), array_node ), memory_model::memory_order_release, atomics::memory_order_relaxed ))); + + return std::make_pair( pArr, idx ); + } + + value_type * search( hash_type const& hash, typename gc::Guard& guard ) + { + hash_splitter splitter( hash ); + hash_comparator cmp; + back_off bkoff; + + atomic_node_ptr * pArr = m_Head; + size_t nSlot = splitter.cut( m_Metrics.head_node_size_log ); + assert( nSlot < m_Metrics.head_node_size ); + + while ( true ) { + node_ptr slot = pArr[nSlot].load( memory_model::memory_order_acquire ); + if ( slot.bits() == array_node ) { + // array node, go down the tree + assert( slot.ptr() != nullptr ); + nSlot = splitter.cut( m_Metrics.array_node_size_log ); + assert( nSlot < m_Metrics.array_node_size ); + pArr = to_array( slot.ptr() ); + } + else if ( slot.bits() == array_converting ) { + // the slot is converting to array node right now + bkoff(); + m_Stat.onSlotConverting(); + } + else { + // data node + assert(slot.bits() == 0 ); + + // protect data node by hazard pointer + if ( guard.protect( pArr[nSlot], [](node_ptr p) -> value_type * { return p.ptr(); }) != slot ) { + // slot value has been changed - retry + m_Stat.onSlotChanged(); + } + else if ( slot.ptr() && cmp( hash, hash_accessor()( *slot.ptr() )) == 0 ) { + // item found + m_Stat.onFindSucces(); + return slot.ptr(); + } + m_Stat.onFindFailed(); + return nullptr; + } + } // while + } + + template + value_type * do_erase( hash_type const& hash, typename gc::Guard& guard, Predicate pred ) + { + hash_splitter splitter( hash ); + hash_comparator cmp; + back_off bkoff; + + atomic_node_ptr * pArr = m_Head; + size_t nSlot = splitter.cut( m_Metrics.head_node_size_log ); + assert( nSlot < m_Metrics.head_node_size ); + + while ( true ) { + node_ptr slot = pArr[nSlot].load( memory_model::memory_order_acquire ); + if ( slot.bits() == array_node ) { + // array node, go down the tree + assert( slot.ptr() != nullptr ); + nSlot = splitter.cut( m_Metrics.array_node_size_log ); + assert( nSlot < m_Metrics.array_node_size ); + pArr = to_array( slot.ptr() ); + } + else if ( slot.bits() == array_converting ) { + // the slot is converting to array node right now + bkoff(); + m_Stat.onSlotConverting(); + } + else { + // data node + assert(slot.bits() == 0 ); + + // protect data node by hazard pointer + if ( guard.protect( pArr[nSlot], [](node_ptr p) -> value_type * { return p.ptr(); }) != slot ) { + // slot value has been changed - retry + m_Stat.onSlotChanged(); + } + else if ( slot.ptr() ) { + if ( cmp( hash, hash_accessor()( *slot.ptr() )) == 0 && pred( *slot.ptr() )) { + // item found - replace it with nullptr + if ( pArr[nSlot].compare_exchange_strong(slot, node_ptr(nullptr), memory_model::memory_order_acquire, atomics::memory_order_relaxed)) + return slot.ptr(); + m_Stat.onEraseRetry(); + continue; + } + m_Stat.onEraseFailed(); + return nullptr; + } + else { + // the slot is empty + m_Stat.onEraseFailed(); + return nullptr; + } + } + } // while + } + //@endcond }; }} // namespace cds::intrusive #endif // #ifndef CDSLIB_INTRUSIVE_IMPL_MULTILEVEL_HASHSET_H +1 \ No newline at end of file -- 2.34.1