2 This file is a part of libcds - Concurrent Data Structures library
4 (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
6 Source code repo: http://github.com/khizmax/libcds/
7 Download: http://sourceforge.net/projects/libcds/files/
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
12 * Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
32 #define CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
34 #include <mutex> // std::unique_lock
35 #include <cds/intrusive/details/base.h>
36 #include <cds/sync/spinlock.h>
37 #include <cds/os/thread.h>
38 #include <cds/details/bit_reverse_counter.h>
39 #include <cds/intrusive/options.h>
40 #include <cds/opt/buffer.h>
41 #include <cds/opt/compare.h>
42 #include <cds/details/bounded_container.h>
44 namespace cds { namespace intrusive {
46 /// MSPriorityQueue related definitions
47 /** @ingroup cds_intrusive_helper
49 namespace mspriority_queue {
51 /// MSPriorityQueue statistics
52 template <typename Counter = cds::atomicity::event_counter>
54 typedef Counter event_counter ; ///< Event counter type
56 event_counter m_nPushCount; ///< Count of success push operation
57 event_counter m_nPopCount; ///< Count of success pop operation
58 event_counter m_nPushFailCount; ///< Count of failed ("the queue is full") push operation
59 event_counter m_nPopFailCount; ///< Count of failed ("the queue is empty") pop operation
60 event_counter m_nPushHeapifySwapCount; ///< Count of item swapping when heapifying in push
61 event_counter m_nPopHeapifySwapCount; ///< Count of item swapping when heapifying in pop
62 event_counter m_nItemMovedTop; ///< Count of events when \p push() encountered that inserted item was moved to top by a concurrent \p pop()
63 event_counter m_nItemMovedUp; ///< Count of events when \p push() encountered that inserted item was moved upwards by a concurrent \p pop()
64 event_counter m_nPushEmptyPass; ///< Count of empty pass during heapify via concurrent operations
67 void onPushSuccess() { ++m_nPushCount ;}
68 void onPopSuccess() { ++m_nPopCount ;}
69 void onPushFailed() { ++m_nPushFailCount ;}
70 void onPopFailed() { ++m_nPopFailCount ;}
71 void onPushHeapifySwap() { ++m_nPushHeapifySwapCount ;}
72 void onPopHeapifySwap() { ++m_nPopHeapifySwapCount ;}
74 void onItemMovedTop() { ++m_nItemMovedTop ;}
75 void onItemMovedUp() { ++m_nItemMovedUp ;}
76 void onPushEmptyPass() { ++m_nPushEmptyPass ;}
80 /// MSPriorityQueue empty statistics
83 void onPushSuccess() const {}
84 void onPopSuccess() const {}
85 void onPushFailed() const {}
86 void onPopFailed() const {}
87 void onPushHeapifySwap() const {}
88 void onPopHeapifySwap() const {}
90 void onItemMovedTop() const {}
91 void onItemMovedUp() const {}
92 void onPushEmptyPass() const {}
96 /// MSPriorityQueue traits
100 The storage type for the heap array. Default is \p cds::opt::v::initialized_dynamic_buffer.
102 You may specify any type of buffer's value since at instantiation time
103 the \p buffer::rebind member metafunction is called to change type
104 of values stored in the buffer.
106 typedef opt::v::initialized_dynamic_buffer<void *> buffer;
108 /// Priority compare functor
110 No default functor is provided. If the option is not specified, the \p less is used.
112 typedef opt::none compare;
114 /// Specifies binary predicate used for priority comparing.
116 Default is \p std::less<T>.
118 typedef opt::none less;
120 /// Type of mutual-exclusion lock. The lock is not need to be recursive.
121 typedef cds::sync::spin lock_type;
123 /// Back-off strategy
124 typedef backoff::Default back_off;
126 /// Internal statistics
128 Possible types: \p mspriority_queue::empty_stat (the default, no overhead), \p mspriority_queue::stat
129 or any other with interface like \p %mspriority_queue::stat
131 typedef empty_stat stat;
134 /// Metafunction converting option list to traits
137 - \p opt::buffer - the buffer type for heap array. Possible type are: \p opt::v::initialized_static_buffer, \p opt::v::initialized_dynamic_buffer.
138 Default is \p %opt::v::initialized_dynamic_buffer.
139 You may specify any type of value for the buffer since at instantiation time
140 the \p buffer::rebind member metafunction is called to change the type of values stored in the buffer.
141 - \p opt::compare - priority compare functor. No default functor is provided.
142 If the option is not specified, the \p opt::less is used.
143 - \p opt::less - specifies binary predicate used for priority compare. Default is \p std::less<T>.
144 - \p opt::lock_type - lock type. Default is \p cds::sync::spin
145 - \p opt::back_off - back-off strategy. Default is \p cds::backoff::yield
146 - \p opt::stat - internal statistics. Available types: \p mspriority_queue::stat, \p mspriority_queue::empty_stat (the default, no overhead)
148 template <typename... Options>
150 # ifdef CDS_DOXYGEN_INVOKED
151 typedef implementation_defined type ; ///< Metafunction result
153 typedef typename cds::opt::make_options<
154 typename cds::opt::find_type_traits< traits, Options... >::type
160 } // namespace mspriority_queue
162 /// Michael & Scott array-based lock-based concurrent priority queue heap
163 /** @ingroup cds_intrusive_priority_queue
165 - [1996] G.Hunt, M.Michael, S. Parthasarathy, M.Scott
166 "An efficient algorithm for concurrent priority queue heaps"
168 \p %MSPriorityQueue augments the standard array-based heap data structure with
169 a mutual-exclusion lock on the heap's size and locks on each node in the heap.
170 Each node also has a tag that indicates whether
171 it is empty, valid, or in a transient state due to an update to the heap
172 by an inserting thread.
173 The algorithm allows concurrent insertions and deletions in opposite directions,
174 without risking deadlock and without the need for special server threads.
175 It also uses a "bit-reversal" technique to scatter accesses across the fringe
176 of the tree to reduce contention.
177 On large heaps the algorithm achieves significant performance improvements
178 over serialized single-lock algorithm, for various insertion/deletion
179 workloads. For small heaps it still performs well, but not as well as
180 single-lock algorithm.
183 - \p T - type to be stored in the queue. The priority is a part of \p T type.
184 - \p Traits - type traits. See \p mspriority_queue::traits for explanation.
185 It is possible to declare option-based queue with \p cds::container::mspriority_queue::make_traits
186 metafunction instead of \p Traits template argument.
188 template <typename T, class Traits = mspriority_queue::traits >
189 class MSPriorityQueue: public cds::bounded_container
192 typedef T value_type ; ///< Value type stored in the queue
193 typedef Traits traits ; ///< Traits template parameter
195 # ifdef CDS_DOXYGEN_INVOKED
196 typedef implementation_defined key_comparator ; ///< priority comparing functor based on opt::compare and opt::less option setter.
198 typedef typename opt::details::make_comparator< value_type, traits >::type key_comparator;
201 typedef typename traits::lock_type lock_type; ///< heap's size lock type
202 typedef typename traits::back_off back_off; ///< Back-off strategy
203 typedef typename traits::stat stat; ///< internal statistics type, see \p mspriority_queue::traits::stat
204 typedef typename cds::bitop::bit_reverse_counter<> item_counter;///< Item counter type
208 typedef cds::OS::ThreadId tag_type;
219 value_type * m_pVal ; ///< A value pointer
220 tag_type volatile m_nTag ; ///< A tag
221 mutable lock_type m_Lock ; ///< Node-level lock
223 /// Creates empty node
226 , m_nTag( tag_type(Empty))
244 typedef typename traits::buffer::template rebind<node>::other buffer_type ; ///< Heap array buffer type
247 typedef typename item_counter::counter_type counter_type;
251 item_counter m_ItemCounter ; ///< Item counter
252 mutable lock_type m_Lock ; ///< Heap's size lock
253 buffer_type m_Heap ; ///< Heap array
254 stat m_Stat ; ///< internal statistics accumulator
257 /// Constructs empty priority queue
259 For \p cds::opt::v::initialized_static_buffer the \p nCapacity parameter is ignored.
261 MSPriorityQueue( size_t nCapacity )
262 : m_Heap( nCapacity )
265 /// Clears priority queue and destructs the object
271 /// Inserts a item into priority queue
273 If the priority queue is full, the function returns \p false,
274 no item has been added.
275 Otherwise, the function inserts the pointer to \p val into the heap
278 The function does not make a copy of \p val.
280 bool push( value_type& val )
282 tag_type const curId = cds::OS::get_current_thread_id();
284 // Insert new item at bottom of the heap
286 if ( m_ItemCounter.value() >= capacity()) {
289 m_Stat.onPushFailed();
293 counter_type i = m_ItemCounter.inc();
294 assert( i < m_Heap.capacity());
296 node& refNode = m_Heap[i];
299 assert( refNode.m_nTag == tag_type( Empty ));
300 assert( refNode.m_pVal == nullptr );
301 refNode.m_pVal = &val;
302 refNode.m_nTag = curId;
305 // Move item towards top of heap while it has a higher priority than its parent
306 heapify_after_push( i, curId );
308 m_Stat.onPushSuccess();
312 /// Extracts item with high priority
314 If the priority queue is empty, the function returns \p nullptr.
315 Otherwise, it returns the item extracted.
319 node& refTop = m_Heap[1];
322 if ( m_ItemCounter.value() == 0 ) {
325 m_Stat.onPopFailed();
328 counter_type nBottom = m_ItemCounter.dec();
329 assert( nBottom < m_Heap.capacity());
330 assert( nBottom > 0 );
333 if ( nBottom == 1 ) {
334 refTop.m_nTag = tag_type( Empty );
335 value_type * pVal = refTop.m_pVal;
336 refTop.m_pVal = nullptr;
339 m_Stat.onPopSuccess();
343 node& refBottom = m_Heap[nBottom];
346 refBottom.m_nTag = tag_type(Empty);
347 value_type * pVal = refBottom.m_pVal;
348 refBottom.m_pVal = nullptr;
351 if ( refTop.m_nTag == tag_type(Empty)) {
354 m_Stat.onPopSuccess();
358 std::swap( refTop.m_pVal, pVal );
359 refTop.m_nTag = tag_type( Available );
361 // refTop will be unlocked inside heapify_after_pop
362 heapify_after_pop( &refTop );
364 m_Stat.onPopSuccess();
368 /// Clears the queue (not atomic)
370 This function is no atomic, but thread-safe
374 clear_with( []( value_type const& /*src*/ ) {} );
377 /// Clears the queue (not atomic)
379 This function is no atomic, but thread-safe.
381 For each item removed the functor \p f is called.
382 \p Func interface is:
386 void operator()( value_type& item );
389 A lambda function or a function pointer can be used as \p f.
391 template <typename Func>
392 void clear_with( Func f )
395 while (( pVal = pop()) != nullptr )
399 /// Checks is the priority queue is empty
405 /// Checks if the priority queue is full
408 return size() == capacity();
411 /// Returns current size of priority queue
414 std::unique_lock<lock_type> l( m_Lock );
415 return static_cast<size_t>( m_ItemCounter.value());
418 /// Return capacity of the priority queue
419 size_t capacity() const
421 // m_Heap[0] is not used
422 return m_Heap.capacity() - 1;
425 /// Returns const reference to internal statistics
426 stat const& statistics() const
434 void heapify_after_push( counter_type i, tag_type curId )
439 // Move item towards top of the heap while it has higher priority than parent
441 bool bProgress = true;
442 counter_type nParent = i / 2;
443 node& refParent = m_Heap[nParent];
445 node& refItem = m_Heap[i];
448 if ( refParent.m_nTag == tag_type(Available) && refItem.m_nTag == curId ) {
449 if ( cmp( *refItem.m_pVal, *refParent.m_pVal ) > 0 ) {
450 std::swap( refItem.m_nTag, refParent.m_nTag );
451 std::swap( refItem.m_pVal, refParent.m_pVal );
452 m_Stat.onPushHeapifySwap();
456 refItem.m_nTag = tag_type(Available);
460 else if ( refParent.m_nTag == tag_type( Empty )) {
461 m_Stat.onItemMovedTop();
464 else if ( refItem.m_nTag != curId ) {
465 m_Stat.onItemMovedUp();
469 m_Stat.onPushEmptyPass();
483 node& refItem = m_Heap[i];
485 if ( refItem.m_nTag == curId )
486 refItem.m_nTag = tag_type(Available);
491 void heapify_after_pop( node * pParent )
494 counter_type const nCapacity = m_Heap.capacity();
496 counter_type nParent = 1;
497 for ( counter_type nChild = nParent * 2; nChild < nCapacity; nChild *= 2 ) {
498 node* pChild = &m_Heap[ nChild ];
501 if ( pChild->m_nTag == tag_type( Empty )) {
506 counter_type const nRight = nChild + 1;
507 if ( nRight < nCapacity ) {
508 node& refRight = m_Heap[nRight];
511 if ( refRight.m_nTag != tag_type( Empty ) && cmp( *refRight.m_pVal, *pChild->m_pVal ) > 0 ) {
521 // If child has higher priority than parent then swap
523 if ( cmp( *pChild->m_pVal, *pParent->m_pVal ) > 0 ) {
524 std::swap( pParent->m_nTag, pChild->m_nTag );
525 std::swap( pParent->m_pVal, pChild->m_pVal );
527 m_Stat.onPopHeapifySwap();
541 }} // namespace cds::intrusive
543 #endif // #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H