3 #ifndef __CDS_INTRUSIVE_MSPRIORITY_QUEUE_H
4 #define __CDS_INTRUSIVE_MSPRIORITY_QUEUE_H
6 #include <functional> // ref
7 #include <cds/intrusive/details/base.h>
8 #include <cds/lock/spinlock.h>
9 #include <cds/os/thread.h>
10 #include <cds/details/bit_reverse_counter.h>
11 #include <cds/intrusive/options.h>
12 #include <cds/opt/buffer.h>
13 #include <cds/opt/compare.h>
14 #include <cds/details/bounded_container.h>
16 namespace cds { namespace intrusive {
18 /// MSPriorityQueue related definitions
19 /** @ingroup cds_intrusive_helper
21 namespace mspriority_queue {
23 /// MSPriorityQueue statistics
24 template <typename Counter = cds::atomicity::event_counter>
26 typedef Counter event_counter ; ///< Event counter type (the \p Counter template argument)
28 event_counter m_nPushCount ; ///< Number of successful push operations
29 event_counter m_nPopCount ; ///< Number of successful pop operations
30 event_counter m_nPushFailCount ; ///< Number of failed ("the queue is full") push operations
31 event_counter m_nPopFailCount ; ///< Number of failed ("the queue is empty") pop operations
32 event_counter m_nPushHeapifySwapCount ; ///< Number of item swaps performed while heapifying in push
33 event_counter m_nPopHeapifySwapCount ; ///< Number of item swaps performed while heapifying in pop
36 void onPushSuccess() { ++m_nPushCount ;} ///< Increments \p m_nPushCount
37 void onPopSuccess() { ++m_nPopCount ;} ///< Increments \p m_nPopCount
38 void onPushFailed() { ++m_nPushFailCount ;} ///< Increments \p m_nPushFailCount
39 void onPopFailed() { ++m_nPopFailCount ;} ///< Increments \p m_nPopFailCount
40 void onPushHeapifySwap() { ++m_nPushHeapifySwapCount ;} ///< Increments \p m_nPushHeapifySwapCount
41 void onPopHeapifySwap() { ++m_nPopHeapifySwapCount ;} ///< Increments \p m_nPopHeapifySwapCount
45 /// MSPriorityQueue empty statistics
48 void onPushSuccess() {} ///< No-op: a successful push is not counted
49 void onPopSuccess() {} ///< No-op: a successful pop is not counted
50 void onPushFailed() {} ///< No-op: a failed push is not counted
52 void onPushHeapifySwap() {} ///< No-op: a swap while heapifying in push is not counted
53 void onPopHeapifySwap() {} ///< No-op: a swap while heapifying in pop is not counted
57 /// Type traits for MSPriorityQueue
61 The storage type for the heap array. Default is cds::opt::v::dynamic_buffer.
63 You may specify any value type for the buffer, since at instantiation time
64 the \p buffer::rebind member metafunction is called to change the type
65 of the values stored in the buffer.
67 typedef opt::v::dynamic_buffer<void *> buffer;
69 /// Priority compare functor
71 No default functor is provided. If the option is not specified, the \p less is used.
73 typedef opt::none compare;
75 /// specifies binary predicate used for priority comparing.
77 Default is \p std::less<T>.
79 typedef opt::none less;
81 /// Type of mutual-exclusion lock
82 typedef lock::Spin lock_type;
85 typedef backoff::yield back_off;
87 /// Internal statistics
89 Possible types: mspriority_queue::empty_stat (the default), mspriority_queue::stat
90 or any other with interface like \p %mspriority_queue::stat
92 typedef empty_stat stat;
95 /// Metafunction converting option list to traits
97 This is a wrapper for <tt> cds::opt::make_options< type_traits, Options...> </tt>
99 See \ref MSPriorityQueue, \ref type_traits, \ref cds::opt::make_options.
101 template <typename... Options>
103 # ifdef CDS_DOXYGEN_INVOKED
104 typedef implementation_defined type ; ///< Metafunction result
106 typedef typename cds::opt::make_options<
107 typename cds::opt::find_type_traits< type_traits, Options... >::type
113 } // namespace mspriority_queue
115 /// Michael & Scott array-based lock-based concurrent priority queue heap
116 /** @ingroup cds_intrusive_priority_queue
118 - [1996] G.Hunt, M.Michael, S. Parthasarathy, M.Scott
119 "An efficient algorithm for concurrent priority queue heaps"
121 \p %MSPriorityQueue augments the standard array-based heap data structure with
122 a mutual-exclusion lock on the heap's size and locks on each node in the heap.
123 Each node also has a tag that indicates whether
124 it is empty, valid, or in a transient state due to an update to the heap
125 by an inserting thread.
126 The algorithm allows concurrent insertions and deletions in opposite directions,
127 without risking deadlock and without the need for special server threads.
128 It also uses a "bit-reversal" technique to scatter accesses across the fringe
129 of the tree to reduce contention.
130 On large heaps the algorithm achieves significant performance improvements
131 over a serialized single-lock algorithm, for various insertion/deletion
132 workloads. For small heaps it still performs well, but not as well as
133 a single-lock algorithm.
136 - \p T - type to be stored in the queue. The priority is a part of \p T type.
137 - \p Traits - type traits. See mspriority_queue::type_traits for explanation.
139 It is possible to declare an option-based queue with the cds::intrusive::mspriority_queue::make_traits
140 metafunction instead of the \p Traits template argument.
141 The template arguments \p Options of the \p %cds::intrusive::mspriority_queue::make_traits metafunction are:
142 - opt::buffer - the buffer type for heap array. Possible types are: opt::v::static_buffer, opt::v::dynamic_buffer.
143 Default is \p %opt::v::dynamic_buffer.
144 You may specify any type of values for the buffer since at instantiation time
145 the \p buffer::rebind member metafunction is called to change the type of values stored in the buffer.
146 - opt::compare - priority compare functor. No default functor is provided.
147 If the option is not specified, the opt::less is used.
148 - opt::less - specifies binary predicate used for priority compare. Default is \p std::less<T>.
149 - opt::lock_type - lock type. Default is cds::lock::Spin.
150 - opt::back_off - back-off strategy. Default is cds::backoff::yield
151 - opt::stat - internal statistics. Available types: mspriority_queue::stat, mspriority_queue::empty_stat (the default)
153 template <typename T, class Traits>
154 class MSPriorityQueue: public cds::bounded_container
157 typedef T value_type ; ///< Value type stored in the queue
158 typedef Traits traits ; ///< Traits template parameter
160 # ifdef CDS_DOXYGEN_INVOKED
161 typedef implementation_defined key_comparator ; ///< priority comparing functor based on opt::compare and opt::less option setter.
163 typedef typename opt::details::make_comparator< value_type, traits >::type key_comparator;
166 typedef typename traits::lock_type lock_type ; ///< heap's size lock type
167 typedef typename traits::back_off back_off ; ///< Back-off strategy
168 typedef typename traits::stat stat ; ///< internal statistics type
172 typedef cds::OS::ThreadId tag_type;
183 value_type * m_pVal ; ///< A value pointer
184 tag_type volatile m_nTag ; ///< A tag
185 mutable lock_type m_Lock ; ///< Node-level lock
187 /// Creates empty node
190 , m_nTag( tag_type(Empty) )
208 typedef typename traits::buffer::template rebind<node>::other buffer_type ; ///< Heap array buffer type
211 typedef cds::bitop::bit_reverse_counter<> item_counter_type;
212 typedef typename item_counter_type::counter_type counter_type;
216 item_counter_type m_ItemCounter ; ///< Item counter
217 mutable lock_type m_Lock ; ///< Heap's size lock
218 buffer_type m_Heap ; ///< Heap array
219 stat m_Stat ; ///< internal statistics accumulator
222 /// Constructs empty priority queue
224 For cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
226 MSPriorityQueue( size_t nCapacity )
227 : m_Heap( nCapacity )
230 /// Clears priority queue and destructs the object
236 /// Inserts an item into the priority queue
238 If the priority queue is full, the function returns \p false,
239 no item has been added.
240 Otherwise, the function inserts a pointer to \p val into the heap
243 The queue is intrusive: \p val is not copied (only its address is stored), so it must remain valid while it is in the queue.
245 bool push( value_type& val )
247 tag_type const curId = cds::OS::getCurrentThreadId();
249 // Insert new item at bottom of the heap
251 if ( m_ItemCounter.value() >= capacity() ) {
254 m_Stat.onPushFailed();
258 counter_type i = m_ItemCounter.inc();
259 assert( i < m_Heap.capacity() );
261 node& refNode = m_Heap[i];
264 refNode.m_pVal = &val;
265 refNode.m_nTag = curId;
268 // Move item towards top of the heap while it has higher priority than parent
269 heapify_after_push( i, curId );
271 m_Stat.onPushSuccess();
275 /// Extracts the item with the highest priority
277 If the priority queue is empty, the function returns \p nullptr.
278 Otherwise, it returns the item extracted.
280 The item returned may be disposed immediately.
285 if ( m_ItemCounter.value() == 0 ) {
288 m_Stat.onPopFailed();
291 counter_type nBottom = m_ItemCounter.reversed_value();
293 // Since m_Heap[0] is not used, capacity() returns m_Heap.capacity() - 1
294 // Consequently, "<=" is here
295 assert( nBottom <= capacity() );
296 assert( nBottom > 0 );
298 node& refBottom = m_Heap[ nBottom ];
301 refBottom.m_nTag = tag_type(Empty);
302 value_type * pVal = refBottom.m_pVal;
303 refBottom.m_pVal = nullptr;
306 node& refTop = m_Heap[ 1 ];
308 if ( refTop.m_nTag == tag_type(Empty) ) {
311 m_Stat.onPopSuccess();
315 std::swap( refTop.m_pVal, pVal );
316 refTop.m_nTag = tag_type( Available );
318 assert( nBottom > 1 );
320 // refTop will be unlocked inside heapify_after_pop
321 heapify_after_pop( 1, &refTop );
323 m_Stat.onPopSuccess();
327 /// Clears the queue (not atomic)
329 This function is not atomic, but it is thread-safe
333 clear_with( []( value_type const& src ) {} );
336 /// Clears the queue (not atomic)
338 This function is not atomic, but it is thread-safe.
340 For each item removed the functor \p f is called.
341 \p Func interface is:
345 void operator()( value_type& item );
348 A lambda function or a function pointer can be used as \p f.
350 template <typename Func>
351 void clear_with( Func f )
354 value_type * pVal = pop();
360 /// Checks if the priority queue is empty
366 /// Checks if the priority queue is full
369 return size() == capacity();
372 /// Returns current size of priority queue
376 size_t nSize = (size_t) m_ItemCounter.value();
381 /// Return capacity of the priority queue
382 size_t capacity() const
384 // m_Heap[0] is not used
385 return m_Heap.capacity() - 1;
388 /// Returns const reference to internal statistics
389 stat const& statistics() const
397 void heapify_after_push( counter_type i, tag_type curId )
402 // Move item towards top of the heap while it has higher priority than parent
404 bool bProgress = true;
405 counter_type nParent = i / 2;
406 node& refParent = m_Heap[nParent];
408 node& refItem = m_Heap[i];
411 if ( refParent.m_nTag == tag_type(Available) && refItem.m_nTag == curId ) {
412 if ( cmp( *refItem.m_pVal, *refParent.m_pVal ) > 0 ) {
413 std::swap( refItem.m_nTag, refParent.m_nTag );
414 std::swap( refItem.m_pVal, refParent.m_pVal );
415 m_Stat.onPushHeapifySwap();
419 refItem.m_nTag = tag_type(Available);
423 else if ( refParent.m_nTag == tag_type(Empty) )
425 else if ( refItem.m_nTag != curId )
440 node& refItem = m_Heap[i];
442 if ( refItem.m_nTag == curId )
443 refItem.m_nTag = tag_type(Available);
448 void heapify_after_pop( counter_type nParent, node * pParent )
452 while ( nParent < m_Heap.capacity() / 2 ) {
453 counter_type nLeft = nParent * 2;
454 counter_type nRight = nLeft + 1;
455 node& refLeft = m_Heap[nLeft];
456 node& refRight = m_Heap[nRight];
462 if ( refLeft.m_nTag == tag_type(Empty) ) {
467 else if ( refRight.m_nTag == tag_type(Empty) || cmp( *refLeft.m_pVal, *refRight.m_pVal ) > 0 ) {
478 // If the child has higher priority than the parent then swap
480 if ( cmp( *pChild->m_pVal, *pParent->m_pVal ) > 0 ) {
481 std::swap( pParent->m_nTag, pChild->m_nTag );
482 std::swap( pParent->m_pVal, pChild->m_pVal );
484 m_Stat.onPopHeapifySwap();
498 }} // namespace cds::intrusive
500 #endif // #ifndef __CDS_INTRUSIVE_MSPRIORITY_QUEUE_H