X-Git-Url: http://demsky.eecs.uci.edu/git/?a=blobdiff_plain;f=cds%2Fintrusive%2Fmspriority_queue.h;h=a41cd9893650fe9491810570de6b104fb84e1bab;hb=e24698da9526a66440762b361c017a34eae600d1;hp=b502fd2d79e200712192054c0daf9cbc315b14d2;hpb=696ef88ac5b9503da2300246865ba1671d87aae3;p=libcds.git diff --git a/cds/intrusive/mspriority_queue.h b/cds/intrusive/mspriority_queue.h index b502fd2d..a41cd989 100644 --- a/cds/intrusive/mspriority_queue.h +++ b/cds/intrusive/mspriority_queue.h @@ -1,17 +1,45 @@ -//$$CDS-header$$ - -#ifndef __CDS_INTRUSIVE_MSPRIORITY_QUEUE_H -#define __CDS_INTRUSIVE_MSPRIORITY_QUEUE_H - -#include -#include +/* + This file is a part of libcds - Concurrent Data Structures library + + (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017 + + Source code repo: http://github.com/khizmax/libcds/ + Download: http://sourceforge.net/projects/libcds/files/ + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + +#ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H +#define CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H + +#include // std::unique_lock +#include +#include #include #include #include #include #include #include -#include namespace cds { namespace intrusive { @@ -25,12 +53,15 @@ namespace cds { namespace intrusive { struct stat { typedef Counter event_counter ; ///< Event counter type - event_counter m_nPushCount ; ///< Count of success push operation - event_counter m_nPopCount ; ///< Count of success pop operation - event_counter m_nPushFailCount ; ///< Count of failed ("the queue is full") push operation - event_counter m_nPopFailCount ; ///< Count of failed ("the queue is empty") pop operation - event_counter m_nPushHeapifySwapCount ; ///< Count of item swapping when heapifying in push - event_counter m_nPopHeapifySwapCount ; ///< Count of item swapping when heapifying in pop + event_counter m_nPushCount; ///< Count of success push operation + event_counter m_nPopCount; ///< Count of success pop operation + event_counter m_nPushFailCount; ///< Count of failed ("the queue is full") push operation + event_counter m_nPopFailCount; ///< Count of failed ("the queue is empty") pop operation + event_counter m_nPushHeapifySwapCount; ///< Count of item swapping when heapifying in push + event_counter m_nPopHeapifySwapCount; ///< Count of item swapping when heapifying in pop + event_counter m_nItemMovedTop; ///< Count of events when \p push() encountered that inserted item was moved to top by a concurrent \p pop() + event_counter m_nItemMovedUp; ///< Count of events when \p push() encountered that inserted item was moved upwards by a concurrent \p pop() + event_counter m_nPushEmptyPass; ///< Count of empty pass during heapify via concurrent operations //@cond void onPushSuccess() { ++m_nPushCount ;} @@ -39,54 +70,62 @@ namespace cds { namespace intrusive { void onPopFailed() { ++m_nPopFailCount ;} void onPushHeapifySwap() { ++m_nPushHeapifySwapCount ;} void onPopHeapifySwap() { ++m_nPopHeapifySwapCount ;} + + void onItemMovedTop() { ++m_nItemMovedTop ;} + void onItemMovedUp() { ++m_nItemMovedUp ;} + void onPushEmptyPass() { ++m_nPushEmptyPass ;} //@endcond }; /// MSPriorityQueue empty statistics struct empty_stat { //@cond - void onPushSuccess() {} - void onPopSuccess() {} - void onPushFailed() {} - void onPopFailed() {} - void onPushHeapifySwap() {} - void onPopHeapifySwap() {} + void onPushSuccess() const {} + void onPopSuccess() const {} + void onPushFailed() const {} + void onPopFailed() const {} + void onPushHeapifySwap() const {} + void onPopHeapifySwap() const {} + + void onItemMovedTop() const {} + void onItemMovedUp() const {} + void onPushEmptyPass() const {} //@endcond }; - /// Type traits for MSPriorityQueue - struct type_traits { + /// MSPriorityQueue traits + struct traits { /// Storage type /** - The storage type for the heap array. Default is cds::opt::v::dynamic_buffer. + The storage type for the heap array. Default is \p cds::opt::v::initialized_dynamic_buffer. You may specify any type of buffer's value since at instantiation time the \p buffer::rebind member metafunction is called to change type of values stored in the buffer. */ - typedef opt::v::dynamic_buffer buffer; + typedef opt::v::initialized_dynamic_buffer buffer; /// Priority compare functor /** No default functor is provided. If the option is not specified, the \p less is used. */ - typedef opt::none compare; + typedef opt::none compare; - /// specifies binary predicate used for priority comparing. + /// Specifies binary predicate used for priority comparing. /** Default is \p std::less. */ - typedef opt::none less; + typedef opt::none less; - /// Type of mutual-exclusion lock - typedef lock::Spin lock_type; + /// Type of mutual-exclusion lock. The lock is not need to be recursive. + typedef cds::sync::spin lock_type; /// Back-off strategy - typedef backoff::yield back_off; + typedef backoff::Default back_off; /// Internal statistics /** - Possible types: mspriority_queue::empty_stat (the default), mspriority_queue::stat + Possible types: \p mspriority_queue::empty_stat (the default, no overhead), \p mspriority_queue::stat or any other with interface like \p %mspriority_queue::stat */ typedef empty_stat stat; @@ -94,18 +133,26 @@ namespace cds { namespace intrusive { /// Metafunction converting option list to traits /** - This is a wrapper for cds::opt::make_options< type_traits, Options...> - - See \ref MSPriorityQueue, \ref type_traits, \ref cds::opt::make_options. + \p Options: + - \p opt::buffer - the buffer type for heap array. Possible type are: \p opt::v::initialized_static_buffer, \p opt::v::initialized_dynamic_buffer. + Default is \p %opt::v::initialized_dynamic_buffer. + You may specify any type of value for the buffer since at instantiation time + the \p buffer::rebind member metafunction is called to change the type of values stored in the buffer. + - \p opt::compare - priority compare functor. No default functor is provided. + If the option is not specified, the \p opt::less is used. + - \p opt::less - specifies binary predicate used for priority compare. Default is \p std::less. + - \p opt::lock_type - lock type. Default is \p cds::sync::spin + - \p opt::back_off - back-off strategy. Default is \p cds::backoff::yield + - \p opt::stat - internal statistics. Available types: \p mspriority_queue::stat, \p mspriority_queue::empty_stat (the default, no overhead) */ - template + template struct make_traits { # ifdef CDS_DOXYGEN_INVOKED typedef implementation_defined type ; ///< Metafunction result # else typedef typename cds::opt::make_options< - typename cds::opt::find_type_traits< type_traits, CDS_OPTIONS7 >::type - ,CDS_OPTIONS7 + typename cds::opt::find_type_traits< traits, Options... >::type + ,Options... >::type type; # endif }; @@ -133,24 +180,12 @@ namespace cds { namespace intrusive { single-lock algorithm. Template parameters: - - \p T - type to be stored in the list. The priority is a part of \p T type. - - \p Traits - type traits. See mspriority_queue::type_traits for explanation. - - It is possible to declare option-based queue with cds::container::mspriority_queue::make_traits - metafunction instead of \p Traits template argument. - Template argument list \p Options of \p %cds::container::mspriority_queue::make_traits metafunction are: - - opt::buffer - the buffer type for heap array. Possible type are: opt::v::static_buffer, opt::v::dynamic_buffer. - Default is \p %opt::v::dynamic_buffer. - You may specify any type of values for the buffer since at instantiation time - the \p buffer::rebind member metafunction is called to change the type of values stored in the buffer. - - opt::compare - priority compare functor. No default functor is provided. - If the option is not specified, the opt::less is used. - - opt::less - specifies binary predicate used for priority compare. Default is \p std::less. - - opt::lock_type - lock type. Default is cds::lock::Spin. - - opt::back_off - back-off strategy. Default is cds::backoff::yield - - opt::stat - internal statistics. Available types: mspriority_queue::stat, mspriority_queue::empty_stat (the default) + - \p T - type to be stored in the queue. The priority is a part of \p T type. + - \p Traits - type traits. See \p mspriority_queue::traits for explanation. + It is possible to declare option-based queue with \p cds::container::mspriority_queue::make_traits + metafunction instead of \p Traits template argument. */ - template + template class MSPriorityQueue: public cds::bounded_container { public: @@ -163,13 +198,14 @@ namespace cds { namespace intrusive { typedef typename opt::details::make_comparator< value_type, traits >::type key_comparator; # endif - typedef typename traits::lock_type lock_type ; ///< heap's size lock type - typedef typename traits::back_off back_off ; ///< Back-off strategy - typedef typename traits::stat stat ; ///< internal statistics type + typedef typename traits::lock_type lock_type; ///< heap's size lock type + typedef typename traits::back_off back_off; ///< Back-off strategy + typedef typename traits::stat stat; ///< internal statistics type, see \p mspriority_queue::traits::stat + typedef typename cds::bitop::bit_reverse_counter<> item_counter;///< Item counter type protected: //@cond - typedef std::thread::id tag_type; + typedef cds::OS::ThreadId tag_type; enum tag_value { Available = -1, @@ -187,7 +223,7 @@ namespace cds { namespace intrusive { /// Creates empty node node() : m_pVal( nullptr ) - , m_nTag( tag_type(Empty) ) + , m_nTag( tag_type(Empty)) {} /// Lock the node @@ -204,27 +240,15 @@ namespace cds { namespace intrusive { }; //@endcond - protected: - //@cond -# ifndef CDS_CXX11_LAMBDA_SUPPORT - struct empty_cleaner - { - void operator()( value_type const& ) const - {} - }; -# endif - //@endcond - public: typedef typename traits::buffer::template rebind::other buffer_type ; ///< Heap array buffer type //@cond - typedef cds::bitop::bit_reverse_counter<> item_counter_type; - typedef typename item_counter_type::counter_type counter_type; + typedef typename item_counter::counter_type counter_type; //@endcond protected: - item_counter_type m_ItemCounter ; ///< Item counter + item_counter m_ItemCounter ; ///< Item counter mutable lock_type m_Lock ; ///< Heap's size lock buffer_type m_Heap ; ///< Heap array stat m_Stat ; ///< internal statistics accumulator @@ -232,7 +256,7 @@ namespace cds { namespace intrusive { public: /// Constructs empty priority queue /** - For cds::opt::v::static_buffer the \p nCapacity parameter is ignored. + For \p cds::opt::v::initialized_static_buffer the \p nCapacity parameter is ignored. */ MSPriorityQueue( size_t nCapacity ) : m_Heap( nCapacity ) @@ -248,18 +272,18 @@ namespace cds { namespace intrusive { /** If the priority queue is full, the function returns \p false, no item has been added. - Otherwise, the function inserts the copy of \p val into the heap + Otherwise, the function inserts the pointer to \p val into the heap and returns \p true. - The function use copy constructor to create new heap item from \p val. + The function does not make a copy of \p val. */ bool push( value_type& val ) { - tag_type const curId = std::this_thread::get_id(); + tag_type const curId = cds::OS::get_current_thread_id(); // Insert new item at bottom of the heap m_Lock.lock(); - if ( m_ItemCounter.value() >= capacity() ) { + if ( m_ItemCounter.value() >= capacity()) { // the heap is full m_Lock.unlock(); m_Stat.onPushFailed(); @@ -267,16 +291,18 @@ namespace cds { namespace intrusive { } counter_type i = m_ItemCounter.inc(); - assert( i < m_Heap.capacity() ); + assert( i < m_Heap.capacity()); node& refNode = m_Heap[i]; refNode.lock(); m_Lock.unlock(); + assert( refNode.m_nTag == tag_type( Empty )); + assert( refNode.m_pVal == nullptr ); refNode.m_pVal = &val; refNode.m_nTag = curId; refNode.unlock(); - // Move item towards top of the heap while it has higher priority than parent + // Move item towards top of heap while it has a higher priority than its parent heapify_after_push( i, curId ); m_Stat.onPushSuccess(); @@ -287,26 +313,34 @@ namespace cds { namespace intrusive { /** If the priority queue is empty, the function returns \p nullptr. Otherwise, it returns the item extracted. - - The item returned may be disposed immediately. */ value_type * pop() { + node& refTop = m_Heap[1]; + m_Lock.lock(); if ( m_ItemCounter.value() == 0 ) { // the heap is empty m_Lock.unlock(); m_Stat.onPopFailed(); - return false; + return nullptr; } - counter_type nBottom = m_ItemCounter.reversed_value(); - m_ItemCounter.dec(); - // Since m_Heap[0] is not used, capacity() returns m_Heap.capacity() - 1 - // Consequently, "<=" is here - assert( nBottom <= capacity() ); + counter_type nBottom = m_ItemCounter.dec(); + assert( nBottom < m_Heap.capacity()); assert( nBottom > 0 ); - node& refBottom = m_Heap[ nBottom ]; + refTop.lock(); + if ( nBottom == 1 ) { + refTop.m_nTag = tag_type( Empty ); + value_type * pVal = refTop.m_pVal; + refTop.m_pVal = nullptr; + refTop.unlock(); + m_Lock.unlock(); + m_Stat.onPopSuccess(); + return pVal; + } + + node& refBottom = m_Heap[nBottom]; refBottom.lock(); m_Lock.unlock(); refBottom.m_nTag = tag_type(Empty); @@ -314,9 +348,7 @@ namespace cds { namespace intrusive { refBottom.m_pVal = nullptr; refBottom.unlock(); - node& refTop = m_Heap[ 1 ]; - refTop.lock(); - if ( refTop.m_nTag == tag_type(Empty) ) { + if ( refTop.m_nTag == tag_type(Empty)) { // nBottom == nTop refTop.unlock(); m_Stat.onPopSuccess(); @@ -326,10 +358,8 @@ namespace cds { namespace intrusive { std::swap( refTop.m_pVal, pVal ); refTop.m_nTag = tag_type( Available ); - assert( nBottom > 1 ); - // refTop will be unlocked inside heapify_after_pop - heapify_after_pop( 1, &refTop ); + heapify_after_pop( &refTop ); m_Stat.onPopSuccess(); return pVal; @@ -341,11 +371,7 @@ namespace cds { namespace intrusive { */ void clear() { -# ifdef CDS_CXX11_LAMBDA_SUPPORT - clear_with( []( value_type const& src ) {} ); -# else - clear_with( empty_cleaner() ); -# endif + clear_with( []( value_type const& /*src*/ ) {} ); } /// Clears the queue (not atomic) @@ -365,11 +391,9 @@ namespace cds { namespace intrusive { template void clear_with( Func f ) { - while ( !empty() ) { - value_type * pVal = pop(); - if ( pVal ) - cds::unref(f)( *pVal ); - } + value_type * pVal; + while (( pVal = pop()) != nullptr ) + f( *pVal ); } /// Checks is the priority queue is empty @@ -387,10 +411,8 @@ namespace cds { namespace intrusive { /// Returns current size of priority queue size_t size() const { - m_Lock.lock(); - size_t nSize = (size_t) m_ItemCounter.value(); - m_Lock.unlock(); - return nSize; + std::unique_lock l( m_Lock ); + return static_cast( m_ItemCounter.value()); } /// Return capacity of the priority queue @@ -435,12 +457,18 @@ namespace cds { namespace intrusive { i = 0; } } - else if ( refParent.m_nTag == tag_type(Empty) ) + else if ( refParent.m_nTag == tag_type( Empty )) { + m_Stat.onItemMovedTop(); i = 0; - else if ( refItem.m_nTag != curId ) + } + else if ( refItem.m_nTag != curId ) { + m_Stat.onItemMovedUp(); i = nParent; - else + } + else { + m_Stat.onPushEmptyPass(); bProgress = false; + } refItem.unlock(); refParent.unlock(); @@ -460,37 +488,37 @@ namespace cds { namespace intrusive { } } - void heapify_after_pop( counter_type nParent, node * pParent ) + void heapify_after_pop( node * pParent ) { key_comparator cmp; + counter_type const nCapacity = m_Heap.capacity(); - while ( nParent < m_Heap.capacity() / 2 ) { - counter_type nLeft = nParent * 2; - counter_type nRight = nLeft + 1; - node& refLeft = m_Heap[nLeft]; - node& refRight = m_Heap[nRight]; - refLeft.lock(); - refRight.lock(); - - counter_type nChild; - node * pChild; - if ( refLeft.m_nTag == tag_type(Empty) ) { - refRight.unlock(); - refLeft.unlock(); + counter_type nParent = 1; + for ( counter_type nChild = nParent * 2; nChild < nCapacity; nChild *= 2 ) { + node* pChild = &m_Heap[ nChild ]; + pChild->lock(); + + if ( pChild->m_nTag == tag_type( Empty )) { + pChild->unlock(); break; } - else if ( refRight.m_nTag == tag_type(Empty) || cmp( *refLeft.m_pVal, *refRight.m_pVal ) > 0 ) { - refRight.unlock(); - nChild = nLeft; - pChild = &refLeft; - } - else { - refLeft.unlock(); - nChild = nRight; - pChild = &refRight; + + counter_type const nRight = nChild + 1; + if ( nRight < nCapacity ) { + node& refRight = m_Heap[nRight]; + refRight.lock(); + + if ( refRight.m_nTag != tag_type( Empty ) && cmp( *refRight.m_pVal, *pChild->m_pVal ) > 0 ) { + // get right child + pChild->unlock(); + nChild = nRight; + pChild = &refRight; + } + else + refRight.unlock(); } - // If child has higher priority that parent then swap + // If child has higher priority than parent then swap // Otherwise stop if ( cmp( *pChild->m_pVal, *pParent->m_pVal ) > 0 ) { std::swap( pParent->m_nTag, pChild->m_nTag ); @@ -512,4 +540,4 @@ namespace cds { namespace intrusive { }} // namespace cds::intrusive -#endif // #ifndef __CDS_INTRUSIVE_MSPRIORITY_QUEUE_H +#endif // #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H