-//$$CDS-header$$
-
-#ifndef __CDS_INTRUSIVE_MSPRIORITY_QUEUE_H
-#define __CDS_INTRUSIVE_MSPRIORITY_QUEUE_H
-
-#include <cds/intrusive/base.h>
-#include <cds/lock/spinlock.h>
+/*
+ This file is a part of libcds - Concurrent Data Structures library
+
+ (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
+
+ Source code repo: http://github.com/khizmax/libcds/
+ Download: http://sourceforge.net/projects/libcds/files/
+
+ Redistribution and use in source and binary forms, with or without
+ modification, are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright notice, this
+ list of conditions and the following disclaimer.
+
+ * Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+ FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+ OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+#ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
+#define CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
+
+#include <mutex> // std::unique_lock
+#include <cds/intrusive/details/base.h>
+#include <cds/sync/spinlock.h>
#include <cds/os/thread.h>
#include <cds/details/bit_reverse_counter.h>
#include <cds/intrusive/options.h>
#include <cds/opt/buffer.h>
#include <cds/opt/compare.h>
#include <cds/details/bounded_container.h>
-#include <cds/ref.h>
namespace cds { namespace intrusive {
struct stat {
typedef Counter event_counter ; ///< Event counter type
- event_counter m_nPushCount ; ///< Count of success push operation
- event_counter m_nPopCount ; ///< Count of success pop operation
- event_counter m_nPushFailCount ; ///< Count of failed ("the queue is full") push operation
- event_counter m_nPopFailCount ; ///< Count of failed ("the queue is empty") pop operation
- event_counter m_nPushHeapifySwapCount ; ///< Count of item swapping when heapifying in push
- event_counter m_nPopHeapifySwapCount ; ///< Count of item swapping when heapifying in pop
+ event_counter m_nPushCount; ///< Count of success push operation
+ event_counter m_nPopCount; ///< Count of success pop operation
+ event_counter m_nPushFailCount; ///< Count of failed ("the queue is full") push operation
+ event_counter m_nPopFailCount; ///< Count of failed ("the queue is empty") pop operation
+ event_counter m_nPushHeapifySwapCount; ///< Count of item swapping when heapifying in push
+ event_counter m_nPopHeapifySwapCount; ///< Count of item swapping when heapifying in pop
+ event_counter m_nItemMovedTop; ///< Count of events when \p push() encountered that inserted item was moved to top by a concurrent \p pop()
+ event_counter m_nItemMovedUp; ///< Count of events when \p push() encountered that inserted item was moved upwards by a concurrent \p pop()
+ event_counter m_nPushEmptyPass; ///< Count of empty pass during heapify via concurrent operations
//@cond
void onPushSuccess() { ++m_nPushCount ;}
void onPopFailed() { ++m_nPopFailCount ;}
void onPushHeapifySwap() { ++m_nPushHeapifySwapCount ;}
void onPopHeapifySwap() { ++m_nPopHeapifySwapCount ;}
+
+ void onItemMovedTop() { ++m_nItemMovedTop ;}
+ void onItemMovedUp() { ++m_nItemMovedUp ;}
+ void onPushEmptyPass() { ++m_nPushEmptyPass ;}
//@endcond
};
/// MSPriorityQueue empty statistics
struct empty_stat {
//@cond
- void onPushSuccess() {}
- void onPopSuccess() {}
- void onPushFailed() {}
- void onPopFailed() {}
- void onPushHeapifySwap() {}
- void onPopHeapifySwap() {}
+ void onPushSuccess() const {}
+ void onPopSuccess() const {}
+ void onPushFailed() const {}
+ void onPopFailed() const {}
+ void onPushHeapifySwap() const {}
+ void onPopHeapifySwap() const {}
+
+ void onItemMovedTop() const {}
+ void onItemMovedUp() const {}
+ void onPushEmptyPass() const {}
//@endcond
};
- /// Type traits for MSPriorityQueue
- struct type_traits {
+ /// Monotonic item counter, see \p traits::item_counter for explanation
+ class monotonic_counter
+ {
+ //@cond
+ public:
+ typedef size_t counter_type;
+
+ monotonic_counter()
+ : m_nCounter(0)
+ {}
+
+ size_t inc()
+ {
+ return ++m_nCounter;
+ }
+
+ size_t dec()
+ {
+ return m_nCounter--;
+ }
+
+ size_t value() const
+ {
+ return m_nCounter;
+ }
+
+ private:
+ size_t m_nCounter;
+ //@endcond
+ };
+
+ /// MSPriorityQueue traits
+ struct traits {
/// Storage type
/**
- The storage type for the heap array. Default is cds::opt::v::dynamic_buffer.
+ The storage type for the heap array. Default is \p cds::opt::v::initialized_dynamic_buffer.
You may specify any type of buffer's value since at instantiation time
the \p buffer::rebind member metafunction is called to change type
of values stored in the buffer.
*/
- typedef opt::v::dynamic_buffer<void *> buffer;
+ typedef opt::v::initialized_dynamic_buffer<void *> buffer;
/// Priority compare functor
/**
No default functor is provided. If the option is not specified, the \p less is used.
*/
- typedef opt::none compare;
+ typedef opt::none compare;
- /// specifies binary predicate used for priority comparing.
+ /// Specifies binary predicate used for priority comparing.
/**
Default is \p std::less<T>.
*/
- typedef opt::none less;
+ typedef opt::none less;
- /// Type of mutual-exclusion lock
- typedef lock::Spin lock_type;
+ /// Type of mutual-exclusion lock. The lock is not need to be recursive.
+ typedef cds::sync::spin lock_type;
/// Back-off strategy
typedef backoff::yield back_off;
/// Internal statistics
/**
- Possible types: mspriority_queue::empty_stat (the default), mspriority_queue::stat
+ Possible types: \p mspriority_queue::empty_stat (the default, no overhead), \p mspriority_queue::stat
or any other with interface like \p %mspriority_queue::stat
*/
typedef empty_stat stat;
+
+ /// Item counter type
+ /**
+ Two type are possible:
+ - \p cds::bitop::bit_reverse_counter - a counter described in <a href="http://www.research.ibm.com/people/m/michael/ipl-1996.pdf">original paper</a>,
+ which was developed for reducing lock contention. However, bit-reversing technigue requires more memory than classic heapifying algorithm
+ because of sparsing of elements: for priority queue of max size \p N the bit-reversing technique requires array size up to 2<sup>K</sup>
+ where \p K - the nearest power of two such that <tt>2<sup>K</sup> >= N</tt>.
+ - \p mspriority_queue::monotonic_counter - a classic monotonic item counter. This counter can lead to false sharing under high contention.
+ By the other hand, for priority queue of max size \p N it requires \p N array size.
+
+ By default, \p MSPriorityQueue uses \p %cds::bitop::bit_reverse_counter as described in original paper.
+ */
+ typedef cds::bitop::bit_reverse_counter<> item_counter;
};
/// Metafunction converting option list to traits
/**
- This is a wrapper for <tt> cds::opt::make_options< type_traits, Options...> </tt>
-
- See \ref MSPriorityQueue, \ref type_traits, \ref cds::opt::make_options.
+ \p Options:
+ - \p opt::buffer - the buffer type for heap array. Possible type are: \p opt::v::initialized_static_buffer, \p opt::v::initialized_dynamic_buffer.
+ Default is \p %opt::v::initialized_dynamic_buffer.
+ You may specify any type of value for the buffer since at instantiation time
+ the \p buffer::rebind member metafunction is called to change the type of values stored in the buffer.
+ - \p opt::compare - priority compare functor. No default functor is provided.
+ If the option is not specified, the \p opt::less is used.
+ - \p opt::less - specifies binary predicate used for priority compare. Default is \p std::less<T>.
+ - \p opt::lock_type - lock type. Default is \p cds::sync::spin
+ - \p opt::back_off - back-off strategy. Default is \p cds::backoff::yield
+ - \p opt::stat - internal statistics. Available types: \p mspriority_queue::stat, \p mspriority_queue::empty_stat (the default, no overhead)
+ - \p opt::item_counter - an item counter type for \p MSPriorityQueue.
+ Available type: \p cds::bitop::bit_reverse_counter, \p mspriority_queue::monotonic_counter. See \p traits::item_counter for details.
*/
template <typename... Options>
struct make_traits {
typedef implementation_defined type ; ///< Metafunction result
# else
typedef typename cds::opt::make_options<
- typename cds::opt::find_type_traits< type_traits, Options... >::type
+ typename cds::opt::find_type_traits< traits, Options... >::type
,Options...
>::type type;
# endif
single-lock algorithm.
Template parameters:
- - \p T - type to be stored in the list. The priority is a part of \p T type.
- - \p Traits - type traits. See mspriority_queue::type_traits for explanation.
-
- It is possible to declare option-based queue with cds::container::mspriority_queue::make_traits
- metafunction instead of \p Traits template argument.
- Template argument list \p Options of \p %cds::container::mspriority_queue::make_traits metafunction are:
- - opt::buffer - the buffer type for heap array. Possible type are: opt::v::static_buffer, opt::v::dynamic_buffer.
- Default is \p %opt::v::dynamic_buffer.
- You may specify any type of values for the buffer since at instantiation time
- the \p buffer::rebind member metafunction is called to change the type of values stored in the buffer.
- - opt::compare - priority compare functor. No default functor is provided.
- If the option is not specified, the opt::less is used.
- - opt::less - specifies binary predicate used for priority compare. Default is \p std::less<T>.
- - opt::lock_type - lock type. Default is cds::lock::Spin.
- - opt::back_off - back-off strategy. Default is cds::backoff::yield
- - opt::stat - internal statistics. Available types: mspriority_queue::stat, mspriority_queue::empty_stat (the default)
+ - \p T - type to be stored in the queue. The priority is a part of \p T type.
+ - \p Traits - type traits. See \p mspriority_queue::traits for explanation.
+ It is possible to declare option-based queue with \p cds::container::mspriority_queue::make_traits
+ metafunction instead of \p Traits template argument.
*/
- template <typename T, class Traits>
+ template <typename T, class Traits = mspriority_queue::traits >
class MSPriorityQueue: public cds::bounded_container
{
public:
typedef typename opt::details::make_comparator< value_type, traits >::type key_comparator;
# endif
- typedef typename traits::lock_type lock_type ; ///< heap's size lock type
- typedef typename traits::back_off back_off ; ///< Back-off strategy
- typedef typename traits::stat stat ; ///< internal statistics type
+ typedef typename traits::lock_type lock_type; ///< heap's size lock type
+ typedef typename traits::back_off back_off; ///< Back-off strategy
+ typedef typename traits::stat stat; ///< internal statistics type, see \p mspriority_queue::traits::stat
+ typedef typename traits::item_counter item_counter;///< Item counter type, see \p mspriority_queue::traits::item_counter
protected:
//@cond
typedef typename traits::buffer::template rebind<node>::other buffer_type ; ///< Heap array buffer type
//@cond
- typedef cds::bitop::bit_reverse_counter<> item_counter_type;
- typedef typename item_counter_type::counter_type counter_type;
+ typedef typename item_counter::counter_type counter_type;
//@endcond
protected:
- item_counter_type m_ItemCounter ; ///< Item counter
+ item_counter m_ItemCounter ; ///< Item counter
mutable lock_type m_Lock ; ///< Heap's size lock
buffer_type m_Heap ; ///< Heap array
stat m_Stat ; ///< internal statistics accumulator
public:
/// Constructs empty priority queue
/**
- For cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
+ For \p cds::opt::v::initialized_static_buffer the \p nCapacity parameter is ignored.
*/
MSPriorityQueue( size_t nCapacity )
: m_Heap( nCapacity )
/**
If the priority queue is full, the function returns \p false,
no item has been added.
- Otherwise, the function inserts the copy of \p val into the heap
+ Otherwise, the function inserts the pointer to \p val into the heap
and returns \p true.
- The function use copy constructor to create new heap item from \p val.
+ The function does not make a copy of \p val.
*/
bool push( value_type& val )
{
- tag_type const curId = cds::OS::getCurrentThreadId();
+ tag_type const curId = cds::OS::get_current_thread_id();
// Insert new item at bottom of the heap
m_Lock.lock();
node& refNode = m_Heap[i];
refNode.lock();
m_Lock.unlock();
+ assert( refNode.m_nTag == tag_type( Empty ));
+ assert( refNode.m_pVal == nullptr );
refNode.m_pVal = &val;
refNode.m_nTag = curId;
refNode.unlock();
- // Move item towards top of the heap while it has higher priority than parent
+ // Move item towards top of heap while it has a higher priority than its parent
heapify_after_push( i, curId );
m_Stat.onPushSuccess();
/**
If the priority queue is empty, the function returns \p nullptr.
Otherwise, it returns the item extracted.
-
- The item returned may be disposed immediately.
*/
value_type * pop()
{
+ node& refTop = m_Heap[1];
+
m_Lock.lock();
if ( m_ItemCounter.value() == 0 ) {
// the heap is empty
m_Stat.onPopFailed();
return nullptr;
}
- counter_type nBottom = m_ItemCounter.reversed_value();
- m_ItemCounter.dec();
- // Since m_Heap[0] is not used, capacity() returns m_Heap.capacity() - 1
- // Consequently, "<=" is here
- assert( nBottom <= capacity() );
+ counter_type nBottom = m_ItemCounter.dec();
+ assert( nBottom < m_Heap.capacity() );
assert( nBottom > 0 );
- node& refBottom = m_Heap[ nBottom ];
+ refTop.lock();
+ if ( nBottom == 1 ) {
+ refTop.m_nTag = tag_type( Empty );
+ value_type * pVal = refTop.m_pVal;
+ refTop.m_pVal = nullptr;
+ refTop.unlock();
+ m_Lock.unlock();
+ m_Stat.onPopSuccess();
+ return pVal;
+ }
+
+ node& refBottom = m_Heap[nBottom];
refBottom.lock();
m_Lock.unlock();
refBottom.m_nTag = tag_type(Empty);
refBottom.m_pVal = nullptr;
refBottom.unlock();
- node& refTop = m_Heap[ 1 ];
- refTop.lock();
if ( refTop.m_nTag == tag_type(Empty) ) {
// nBottom == nTop
refTop.unlock();
std::swap( refTop.m_pVal, pVal );
refTop.m_nTag = tag_type( Available );
- assert( nBottom > 1 );
-
// refTop will be unlocked inside heapify_after_pop
- heapify_after_pop( 1, &refTop );
+ heapify_after_pop( &refTop );
m_Stat.onPopSuccess();
return pVal;
*/
void clear()
{
- clear_with( []( value_type const& src ) {} );
+ clear_with( []( value_type const& /*src*/ ) {} );
}
/// Clears the queue (not atomic)
template <typename Func>
void clear_with( Func f )
{
- while ( !empty() ) {
- value_type * pVal = pop();
- if ( pVal )
- cds::unref(f)( *pVal );
- }
+ value_type * pVal;
+ while (( pVal = pop()) != nullptr )
+ f( *pVal );
}
/// Checks is the priority queue is empty
/// Returns current size of priority queue
size_t size() const
{
- m_Lock.lock();
- size_t nSize = (size_t) m_ItemCounter.value();
- m_Lock.unlock();
- return nSize;
+ std::unique_lock<lock_type> l( m_Lock );
+ return static_cast<size_t>( m_ItemCounter.value());
}
/// Return capacity of the priority queue
i = 0;
}
}
- else if ( refParent.m_nTag == tag_type(Empty) )
+ else if ( refParent.m_nTag == tag_type( Empty ) ) {
+ m_Stat.onItemMovedTop();
i = 0;
- else if ( refItem.m_nTag != curId )
+ }
+ else if ( refItem.m_nTag != curId ) {
+ m_Stat.onItemMovedUp();
i = nParent;
- else
+ }
+ else {
+ m_Stat.onPushEmptyPass();
bProgress = false;
+ }
refItem.unlock();
refParent.unlock();
}
}
- void heapify_after_pop( counter_type nParent, node * pParent )
+ void heapify_after_pop( node * pParent )
{
key_comparator cmp;
+ counter_type const nCapacity = m_Heap.capacity();
+
+ counter_type nParent = 1;
+ for ( counter_type nChild = nParent * 2; nChild < nCapacity; nChild *= 2 ) {
+ node* pChild = &m_Heap[ nChild ];
+ pChild->lock();
- while ( nParent < m_Heap.capacity() / 2 ) {
- counter_type nLeft = nParent * 2;
- counter_type nRight = nLeft + 1;
- node& refLeft = m_Heap[nLeft];
- node& refRight = m_Heap[nRight];
- refLeft.lock();
- refRight.lock();
-
- counter_type nChild;
- node * pChild;
- if ( refLeft.m_nTag == tag_type(Empty) ) {
- refRight.unlock();
- refLeft.unlock();
+ if ( pChild->m_nTag == tag_type( Empty )) {
+ pChild->unlock();
break;
}
- else if ( refRight.m_nTag == tag_type(Empty) || cmp( *refLeft.m_pVal, *refRight.m_pVal ) > 0 ) {
- refRight.unlock();
- nChild = nLeft;
- pChild = &refLeft;
- }
- else {
- refLeft.unlock();
- nChild = nRight;
- pChild = &refRight;
+
+ counter_type const nRight = nChild + 1;
+ if ( nRight < nCapacity ) {
+ node& refRight = m_Heap[nRight];
+ refRight.lock();
+
+ if ( refRight.m_nTag != tag_type( Empty ) && cmp( *refRight.m_pVal, *pChild->m_pVal ) > 0 ) {
+ // get right child
+ pChild->unlock();
+ nChild = nRight;
+ pChild = &refRight;
+ }
+ else
+ refRight.unlock();
}
- // If child has higher priority that parent then swap
+ // If child has higher priority than parent then swap
// Otherwise stop
if ( cmp( *pChild->m_pVal, *pParent->m_pVal ) > 0 ) {
std::swap( pParent->m_nTag, pChild->m_nTag );
}} // namespace cds::intrusive
-#endif // #ifndef __CDS_INTRUSIVE_MSPRIORITY_QUEUE_H
+#endif // #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H