2 This file is a part of libcds - Concurrent Data Structures library
4 (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
6 Source code repo: http://github.com/khizmax/libcds/
7 Download: http://sourceforge.net/projects/libcds/files/
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
12 * Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
32 #define CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
34 #include <mutex> // std::unique_lock
35 #include <cds/intrusive/details/base.h>
36 #include <cds/sync/spinlock.h>
37 #include <cds/os/thread.h>
38 #include <cds/details/bit_reverse_counter.h>
39 #include <cds/intrusive/options.h>
40 #include <cds/opt/buffer.h>
41 #include <cds/opt/compare.h>
42 #include <cds/details/bounded_container.h>
44 namespace cds { namespace intrusive {
46 /// MSPriorityQueue related definitions
47 /** @ingroup cds_intrusive_helper
49 namespace mspriority_queue {
51 /// MSPriorityQueue statistics
52 template <typename Counter = cds::atomicity::event_counter>
54 typedef Counter event_counter ; ///< Event counter type
56 event_counter m_nPushCount; ///< Count of success push operation
57 event_counter m_nPopCount; ///< Count of success pop operation
58 event_counter m_nPushFailCount; ///< Count of failed ("the queue is full") push operation
59 event_counter m_nPopFailCount; ///< Count of failed ("the queue is empty") pop operation
60 event_counter m_nPushHeapifySwapCount; ///< Count of item swapping when heapifying in push
61 event_counter m_nPopHeapifySwapCount; ///< Count of item swapping when heapifying in pop
62 event_counter m_nItemMovedTop; ///< Count of events when \p push() encountered that inserted item was moved to top by a concurrent \p pop()
63 event_counter m_nItemMovedUp; ///< Count of events when \p push() encountered that inserted item was moved upwards by a concurrent \p pop()
64 event_counter m_nPushEmptyPass; ///< Count of empty pass during heapify via concurrent operations
67 void onPushSuccess() { ++m_nPushCount ;}
68 void onPopSuccess() { ++m_nPopCount ;}
69 void onPushFailed() { ++m_nPushFailCount ;}
70 void onPopFailed() { ++m_nPopFailCount ;}
71 void onPushHeapifySwap() { ++m_nPushHeapifySwapCount ;}
72 void onPopHeapifySwap() { ++m_nPopHeapifySwapCount ;}
74 void onItemMovedTop() { ++m_nItemMovedTop ;}
75 void onItemMovedUp() { ++m_nItemMovedUp ;}
76 void onPushEmptyPass() { ++m_nPushEmptyPass ;}
80 /// MSPriorityQueue empty statistics
83 void onPushSuccess() const {}
84 void onPopSuccess() const {}
85 void onPushFailed() const {}
86 void onPopFailed() const {}
87 void onPushHeapifySwap() const {}
88 void onPopHeapifySwap() const {}
90 void onItemMovedTop() const {}
91 void onItemMovedUp() const {}
92 void onPushEmptyPass() const {}
96 /// MSPriorityQueue traits
100 The storage type for the heap array. Default is \p cds::opt::v::initialized_dynamic_buffer.
102 You may specify any type of buffer's value since at instantiation time
103 the \p buffer::rebind member metafunction is called to change type
104 of values stored in the buffer.
106 typedef opt::v::initialized_dynamic_buffer<void *> buffer;
108 /// Priority compare functor
110 No default functor is provided. If the option is not specified, the \p less is used.
112 typedef opt::none compare;
114 /// Specifies binary predicate used for priority comparing.
116 Default is \p std::less<T>.
118 typedef opt::none less;
120 /// Type of mutual-exclusion lock
121 typedef cds::sync::spin lock_type;
123 /// Back-off strategy
124 typedef backoff::yield back_off;
126 /// Internal statistics
128 Possible types: \p mspriority_queue::empty_stat (the default, no overhead), \p mspriority_queue::stat
129 or any other with interface like \p %mspriority_queue::stat
131 typedef empty_stat stat;
134 /// Metafunction converting option list to traits
137 - \p opt::buffer - the buffer type for heap array. Possible type are: \p opt::v::initialized_static_buffer, \p opt::v::initialized_dynamic_buffer.
138 Default is \p %opt::v::initialized_dynamic_buffer.
139 You may specify any type of value for the buffer since at instantiation time
140 the \p buffer::rebind member metafunction is called to change the type of values stored in the buffer.
141 - \p opt::compare - priority compare functor. No default functor is provided.
142 If the option is not specified, the \p opt::less is used.
143 - \p opt::less - specifies binary predicate used for priority compare. Default is \p std::less<T>.
144 - \p opt::lock_type - lock type. Default is \p cds::sync::spin
145 - \p opt::back_off - back-off strategy. Default is \p cds::backoff::yield
146 - \p opt::stat - internal statistics. Available types: \p mspriority_queue::stat, \p mspriority_queue::empty_stat (the default, no overhead)
148 template <typename... Options>
150 # ifdef CDS_DOXYGEN_INVOKED
151 typedef implementation_defined type ; ///< Metafunction result
153 typedef typename cds::opt::make_options<
154 typename cds::opt::find_type_traits< traits, Options... >::type
160 } // namespace mspriority_queue
162 /// Michael & Scott array-based lock-based concurrent priority queue heap
163 /** @ingroup cds_intrusive_priority_queue
165 - [1996] G.Hunt, M.Michael, S. Parthasarathy, M.Scott
166 "An efficient algorithm for concurrent priority queue heaps"
168 \p %MSPriorityQueue augments the standard array-based heap data structure with
169 a mutual-exclusion lock on the heap's size and locks on each node in the heap.
170 Each node also has a tag that indicates whether
171 it is empty, valid, or in a transient state due to an update to the heap
172 by an inserting thread.
173 The algorithm allows concurrent insertions and deletions in opposite directions,
174 without risking deadlock and without the need for special server threads.
175 It also uses a "bit-reversal" technique to scatter accesses across the fringe
176 of the tree to reduce contention.
177 On large heaps the algorithm achieves significant performance improvements
178 over serialized single-lock algorithm, for various insertion/deletion
179 workloads. For small heaps it still performs well, but not as well as
180 single-lock algorithm.
183 - \p T - type to be stored in the queue. The priority is a part of \p T type.
184 - \p Traits - type traits. See \p mspriority_queue::traits for explanation.
185 It is possible to declare option-based queue with \p cds::container::mspriority_queue::make_traits
186 metafunction instead of \p Traits template argument.
188 template <typename T, class Traits = mspriority_queue::traits >
189 class MSPriorityQueue: public cds::bounded_container
192 typedef T value_type ; ///< Value type stored in the queue
193 typedef Traits traits ; ///< Traits template parameter
195 # ifdef CDS_DOXYGEN_INVOKED
196 typedef implementation_defined key_comparator ; ///< priority comparing functor based on opt::compare and opt::less option setter.
198 typedef typename opt::details::make_comparator< value_type, traits >::type key_comparator;
201 typedef typename traits::lock_type lock_type; ///< heap's size lock type
202 typedef typename traits::back_off back_off; ///< Back-off strategy
203 typedef typename traits::stat stat; ///< internal statistics type
207 typedef cds::OS::ThreadId tag_type;
218 value_type * m_pVal ; ///< A value pointer
219 tag_type volatile m_nTag ; ///< A tag
220 mutable lock_type m_Lock ; ///< Node-level lock
222 /// Creates empty node
225 , m_nTag( tag_type(Empty) )
243 typedef typename traits::buffer::template rebind<node>::other buffer_type ; ///< Heap array buffer type
246 typedef cds::bitop::bit_reverse_counter<> item_counter_type;
247 typedef typename item_counter_type::counter_type counter_type;
251 item_counter_type m_ItemCounter ; ///< Item counter
252 mutable lock_type m_Lock ; ///< Heap's size lock
253 buffer_type m_Heap ; ///< Heap array
254 stat m_Stat ; ///< internal statistics accumulator
257 /// Constructs empty priority queue
259 For \p cds::opt::v::initialized_static_buffer the \p nCapacity parameter is ignored.
261 MSPriorityQueue( size_t nCapacity )
262 : m_Heap( nCapacity )
265 /// Clears priority queue and destructs the object
271 /// Inserts a item into priority queue
273 If the priority queue is full, the function returns \p false,
274 no item has been added.
275 Otherwise, the function inserts the pointer to \p val into the heap
278 The function does not make a copy of \p val.
280 bool push( value_type& val )
282 tag_type const curId = cds::OS::get_current_thread_id();
284 // Insert new item at bottom of the heap
286 if ( m_ItemCounter.value() >= capacity() ) {
289 m_Stat.onPushFailed();
293 counter_type i = m_ItemCounter.inc();
294 assert( i < m_Heap.capacity() );
296 node& refNode = m_Heap[i];
299 refNode.m_pVal = &val;
300 refNode.m_nTag = curId;
303 // Move item towards top of the heap while it has higher priority than parent
304 heapify_after_push( i, curId );
306 m_Stat.onPushSuccess();
310 /// Extracts item with high priority
312 If the priority queue is empty, the function returns \p nullptr.
313 Otherwise, it returns the item extracted.
318 if ( m_ItemCounter.value() == 0 ) {
321 m_Stat.onPopFailed();
324 counter_type nBottom = m_ItemCounter.reversed_value();
326 assert( nBottom < m_Heap.capacity() );
327 assert( nBottom > 0 );
329 node& refBottom = m_Heap[ nBottom ];
332 refBottom.m_nTag = tag_type(Empty);
333 value_type * pVal = refBottom.m_pVal;
334 refBottom.m_pVal = nullptr;
337 node& refTop = m_Heap[ 1 ];
339 if ( refTop.m_nTag == tag_type(Empty) ) {
342 m_Stat.onPopSuccess();
346 std::swap( refTop.m_pVal, pVal );
347 refTop.m_nTag = tag_type( Available );
349 // refTop will be unlocked inside heapify_after_pop
350 heapify_after_pop( &refTop );
352 m_Stat.onPopSuccess();
356 /// Clears the queue (not atomic)
358 This function is no atomic, but thread-safe
362 clear_with( []( value_type const& /*src*/ ) {} );
365 /// Clears the queue (not atomic)
367 This function is no atomic, but thread-safe.
369 For each item removed the functor \p f is called.
370 \p Func interface is:
374 void operator()( value_type& item );
377 A lambda function or a function pointer can be used as \p f.
379 template <typename Func>
380 void clear_with( Func f )
383 value_type * pVal = pop();
389 /// Checks is the priority queue is empty
395 /// Checks if the priority queue is full
398 return size() == capacity();
401 /// Returns current size of priority queue
404 std::unique_lock<lock_type> l( m_Lock );
405 return static_cast<size_t>( m_ItemCounter.value());
408 /// Return capacity of the priority queue
409 size_t capacity() const
411 // m_Heap[0] is not used
412 return m_Heap.capacity() - 1;
415 /// Returns const reference to internal statistics
416 stat const& statistics() const
424 void heapify_after_push( counter_type i, tag_type curId )
429 // Move item towards top of the heap while it has higher priority than parent
431 bool bProgress = true;
432 counter_type nParent = i / 2;
433 node& refParent = m_Heap[nParent];
435 node& refItem = m_Heap[i];
438 if ( refParent.m_nTag == tag_type(Available) && refItem.m_nTag == curId ) {
439 if ( cmp( *refItem.m_pVal, *refParent.m_pVal ) > 0 ) {
440 std::swap( refItem.m_nTag, refParent.m_nTag );
441 std::swap( refItem.m_pVal, refParent.m_pVal );
442 m_Stat.onPushHeapifySwap();
446 refItem.m_nTag = tag_type(Available);
450 else if ( refParent.m_nTag == tag_type( Empty ) ) {
451 m_Stat.onItemMovedTop();
454 else if ( refItem.m_nTag != curId ) {
455 m_Stat.onItemMovedUp();
459 m_Stat.onPushEmptyPass();
473 node& refItem = m_Heap[i];
475 if ( refItem.m_nTag == curId )
476 refItem.m_nTag = tag_type(Available);
481 void heapify_after_pop( node * pParent )
484 counter_type const nCapacity = m_Heap.capacity();
486 counter_type nParent = 1;
487 for ( counter_type nChild = nParent * 2; nChild < nCapacity; nChild *= 2 ) {
488 node* pChild = &m_Heap[ nChild ];
491 if ( pChild->m_nTag == tag_type( Empty )) {
496 counter_type const nRight = nChild + 1;
497 if ( nRight < nCapacity ) {
498 node& refRight = m_Heap[nRight];
501 if ( refRight.m_nTag != tag_type( Empty ) && cmp( *refRight.m_pVal, *pChild->m_pVal ) > 0 ) {
511 // If child has higher priority that parent then swap
513 if ( cmp( *pChild->m_pVal, *pParent->m_pVal ) > 0 ) {
514 std::swap( pParent->m_nTag, pChild->m_nTag );
515 std::swap( pParent->m_pVal, pChild->m_pVal );
517 m_Stat.onPopHeapifySwap();
531 }} // namespace cds::intrusive
533 #endif // #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H