MSPriorityQueue: revised pop()
[libcds.git] / cds / intrusive / mspriority_queue.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8     
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
32 #define CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
33
34 #include <mutex>  // std::unique_lock
35 #include <cds/intrusive/details/base.h>
36 #include <cds/sync/spinlock.h>
37 #include <cds/os/thread.h>
38 #include <cds/details/bit_reverse_counter.h>
39 #include <cds/intrusive/options.h>
40 #include <cds/opt/buffer.h>
41 #include <cds/opt/compare.h>
42 #include <cds/details/bounded_container.h>
43
44 namespace cds { namespace intrusive {
45
46     /// MSPriorityQueue related definitions
47     /** @ingroup cds_intrusive_helper
48     */
49     namespace mspriority_queue {
50
51         /// MSPriorityQueue statistics
52         template <typename Counter = cds::atomicity::event_counter>
53         struct stat {
54             typedef Counter   event_counter ; ///< Event counter type
55
56             event_counter   m_nPushCount;            ///< Count of success push operation
57             event_counter   m_nPopCount;             ///< Count of success pop operation
58             event_counter   m_nPushFailCount;        ///< Count of failed ("the queue is full") push operation
59             event_counter   m_nPopFailCount;         ///< Count of failed ("the queue is empty") pop operation
60             event_counter   m_nPushHeapifySwapCount; ///< Count of item swapping when heapifying in push
61             event_counter   m_nPopHeapifySwapCount;  ///< Count of item swapping when heapifying in pop
62             event_counter   m_nItemMovedTop;         ///< Count of events when \p push() encountered that inserted item was moved to top by a concurrent \p pop()
63             event_counter   m_nItemMovedUp;          ///< Count of events when \p push() encountered that inserted item was moved upwards by a concurrent \p pop()
64             event_counter   m_nPushEmptyPass;        ///< Count of empty pass during heapify via concurrent operations
65
66             //@cond
67             void onPushSuccess()            { ++m_nPushCount            ;}
68             void onPopSuccess()             { ++m_nPopCount             ;}
69             void onPushFailed()             { ++m_nPushFailCount        ;}
70             void onPopFailed()              { ++m_nPopFailCount         ;}
71             void onPushHeapifySwap()        { ++m_nPushHeapifySwapCount ;}
72             void onPopHeapifySwap()         { ++m_nPopHeapifySwapCount  ;}
73
74             void onItemMovedTop()           { ++m_nItemMovedTop         ;}
75             void onItemMovedUp()            { ++m_nItemMovedUp          ;}
76             void onPushEmptyPass()          { ++m_nPushEmptyPass        ;}
77             //@endcond
78         };
79
80         /// MSPriorityQueue empty statistics
81         struct empty_stat {
82             //@cond
83             void onPushSuccess()            const {}
84             void onPopSuccess()             const {}
85             void onPushFailed()             const {}
86             void onPopFailed()              const {}
87             void onPushHeapifySwap()        const {}
88             void onPopHeapifySwap()         const {}
89
90             void onItemMovedTop()           const {}
91             void onItemMovedUp()            const {}
92             void onPushEmptyPass()          const {}
93             //@endcond
94         };
95
96         /// MSPriorityQueue traits
97         struct traits {
98             /// Storage type
99             /**
100                 The storage type for the heap array. Default is \p cds::opt::v::initialized_dynamic_buffer.
101
102                 You may specify any type of buffer's value since at instantiation time
103                 the \p buffer::rebind member metafunction is called to change type
104                 of values stored in the buffer.
105             */
106             typedef opt::v::initialized_dynamic_buffer<void *>  buffer;
107
108             /// Priority compare functor
109             /**
110                 No default functor is provided. If the option is not specified, the \p less is used.
111             */
112             typedef opt::none       compare;
113
114             /// Specifies binary predicate used for priority comparing.
115             /**
116                 Default is \p std::less<T>.
117             */
118             typedef opt::none       less;
119
120             /// Type of mutual-exclusion lock
121             typedef cds::sync::spin lock_type;
122
123             /// Back-off strategy
124             typedef backoff::yield      back_off;
125
126             /// Internal statistics
127             /**
128                 Possible types: \p mspriority_queue::empty_stat (the default, no overhead), \p mspriority_queue::stat
129                 or any other with interface like \p %mspriority_queue::stat
130             */
131             typedef empty_stat      stat;
132         };
133
134         /// Metafunction converting option list to traits
135         /**
136             \p Options:
137             - \p opt::buffer - the buffer type for heap array. Possible type are: \p opt::v::initialized_static_buffer, \p opt::v::initialized_dynamic_buffer.
138                 Default is \p %opt::v::initialized_dynamic_buffer.
139                 You may specify any type of value for the buffer since at instantiation time
140                 the \p buffer::rebind member metafunction is called to change the type of values stored in the buffer.
141             - \p opt::compare - priority compare functor. No default functor is provided.
142                 If the option is not specified, the \p opt::less is used.
143             - \p opt::less - specifies binary predicate used for priority compare. Default is \p std::less<T>.
144             - \p opt::lock_type - lock type. Default is \p cds::sync::spin
145             - \p opt::back_off - back-off strategy. Default is \p cds::backoff::yield
146             - \p opt::stat - internal statistics. Available types: \p mspriority_queue::stat, \p mspriority_queue::empty_stat (the default, no overhead)
147         */
148         template <typename... Options>
149         struct make_traits {
150 #   ifdef CDS_DOXYGEN_INVOKED
151             typedef implementation_defined type ;   ///< Metafunction result
152 #   else
153             typedef typename cds::opt::make_options<
154                 typename cds::opt::find_type_traits< traits, Options... >::type
155                 ,Options...
156             >::type   type;
157 #   endif
158         };
159
160     }   // namespace mspriority_queue
161
162     /// Michael & Scott array-based lock-based concurrent priority queue heap
163     /** @ingroup cds_intrusive_priority_queue
164         Source:
165             - [1996] G.Hunt, M.Michael, S. Parthasarathy, M.Scott
166                 "An efficient algorithm for concurrent priority queue heaps"
167
168         \p %MSPriorityQueue augments the standard array-based heap data structure with
169         a mutual-exclusion lock on the heap's size and locks on each node in the heap.
170         Each node also has a tag that indicates whether
171         it is empty, valid, or in a transient state due to an update to the heap
172         by an inserting thread.
173         The algorithm allows concurrent insertions and deletions in opposite directions,
174         without risking deadlock and without the need for special server threads.
175         It also uses a "bit-reversal" technique to scatter accesses across the fringe
176         of the tree to reduce contention.
177         On large heaps the algorithm achieves significant performance improvements
178         over serialized single-lock algorithm, for various insertion/deletion
179         workloads. For small heaps it still performs well, but not as well as
180         single-lock algorithm.
181
182         Template parameters:
183         - \p T - type to be stored in the queue. The priority is a part of \p T type.
184         - \p Traits - type traits. See \p mspriority_queue::traits for explanation.
185             It is possible to declare option-based queue with \p cds::container::mspriority_queue::make_traits
186             metafunction instead of \p Traits template argument.
187     */
188     template <typename T, class Traits = mspriority_queue::traits >
189     class MSPriorityQueue: public cds::bounded_container
190     {
191     public:
192         typedef T           value_type  ;   ///< Value type stored in the queue
193         typedef Traits      traits      ;   ///< Traits template parameter
194
195 #   ifdef CDS_DOXYGEN_INVOKED
196         typedef implementation_defined key_comparator  ;    ///< priority comparing functor based on opt::compare and opt::less option setter.
197 #   else
198         typedef typename opt::details::make_comparator< value_type, traits >::type key_comparator;
199 #   endif
200
201         typedef typename traits::lock_type lock_type;   ///< heap's size lock type
202         typedef typename traits::back_off  back_off;    ///< Back-off strategy
203         typedef typename traits::stat      stat;        ///< internal statistics type
204
205     protected:
206         //@cond
207         typedef cds::OS::ThreadId   tag_type;
208
209         enum tag_value {
210             Available   = -1,
211             Empty       = 0
212         };
213         //@endcond
214
215         //@cond
216         /// Heap item type
217         struct node {
218             value_type *        m_pVal  ;   ///< A value pointer
219             tag_type volatile   m_nTag  ;   ///< A tag
220             mutable lock_type   m_Lock  ;   ///< Node-level lock
221
222             /// Creates empty node
223             node()
224                 : m_pVal( nullptr )
225                 , m_nTag( tag_type(Empty) )
226             {}
227
228             /// Lock the node
229             void lock()
230             {
231                 m_Lock.lock();
232             }
233
234             /// Unlock the node
235             void unlock()
236             {
237                 m_Lock.unlock();
238             }
239         };
240         //@endcond
241
242     public:
243         typedef typename traits::buffer::template rebind<node>::other   buffer_type ;   ///< Heap array buffer type
244
245         //@cond
246         typedef cds::bitop::bit_reverse_counter<>           item_counter_type;
247         typedef typename item_counter_type::counter_type    counter_type;
248         //@endcond
249
250     protected:
251         item_counter_type   m_ItemCounter   ;   ///< Item counter
252         mutable lock_type   m_Lock          ;   ///< Heap's size lock
253         buffer_type         m_Heap          ;   ///< Heap array
254         stat                m_Stat          ;   ///< internal statistics accumulator
255
256     public:
257         /// Constructs empty priority queue
258         /**
259             For \p cds::opt::v::initialized_static_buffer the \p nCapacity parameter is ignored.
260         */
261         MSPriorityQueue( size_t nCapacity )
262             : m_Heap( nCapacity )
263         {}
264
265         /// Clears priority queue and destructs the object
266         ~MSPriorityQueue()
267         {
268             clear();
269         }
270
271         /// Inserts a item into priority queue
272         /**
273             If the priority queue is full, the function returns \p false,
274             no item has been added.
275             Otherwise, the function inserts the pointer to \p val into the heap
276             and returns \p true.
277
278             The function does not make a copy of \p val.
279         */
280         bool push( value_type& val )
281         {
282             tag_type const curId = cds::OS::get_current_thread_id();
283
284             // Insert new item at bottom of the heap
285             m_Lock.lock();
286             if ( m_ItemCounter.value() >= capacity() ) {
287                 // the heap is full
288                 m_Lock.unlock();
289                 m_Stat.onPushFailed();
290                 return false;
291             }
292
293             counter_type i = m_ItemCounter.inc();
294             assert( i < m_Heap.capacity() );
295
296             node& refNode = m_Heap[i];
297             refNode.lock();
298             m_Lock.unlock();
299             refNode.m_pVal = &val;
300             refNode.m_nTag = curId;
301             refNode.unlock();
302
303             // Move item towards top of the heap while it has higher priority than parent
304             heapify_after_push( i, curId );
305
306             m_Stat.onPushSuccess();
307             return true;
308         }
309
310         /// Extracts item with high priority
311         /**
312             If the priority queue is empty, the function returns \p nullptr.
313             Otherwise, it returns the item extracted.
314         */
315         value_type * pop()
316         {
317             m_Lock.lock();
318             if ( m_ItemCounter.value() == 0 ) {
319                 // the heap is empty
320                 m_Lock.unlock();
321                 m_Stat.onPopFailed();
322                 return nullptr;
323             }
324             counter_type nBottom = m_ItemCounter.reversed_value();
325             m_ItemCounter.dec();
326             assert( nBottom < m_Heap.capacity() );
327             assert( nBottom > 0 );
328
329             node& refBottom = m_Heap[ nBottom ];
330             refBottom.lock();
331             m_Lock.unlock();
332             refBottom.m_nTag = tag_type(Empty);
333             value_type * pVal = refBottom.m_pVal;
334             refBottom.m_pVal = nullptr;
335             refBottom.unlock();
336
337             node& refTop = m_Heap[ 1 ];
338             refTop.lock();
339             if ( refTop.m_nTag == tag_type(Empty) ) {
340                 // nBottom == nTop
341                 refTop.unlock();
342                 m_Stat.onPopSuccess();
343                 return pVal;
344             }
345
346             std::swap( refTop.m_pVal, pVal );
347             refTop.m_nTag = tag_type( Available );
348
349             // refTop will be unlocked inside heapify_after_pop
350             heapify_after_pop( &refTop );
351
352             m_Stat.onPopSuccess();
353             return pVal;
354         }
355
356         /// Clears the queue (not atomic)
357         /**
358             This function is no atomic, but thread-safe
359         */
360         void clear()
361         {
362             clear_with( []( value_type const& /*src*/ ) {} );
363         }
364
365         /// Clears the queue (not atomic)
366         /**
367             This function is no atomic, but thread-safe.
368
369             For each item removed the functor \p f is called.
370             \p Func interface is:
371             \code
372                 struct clear_functor
373                 {
374                     void operator()( value_type& item );
375                 };
376             \endcode
377             A lambda function or a function pointer can be used as \p f.
378         */
379         template <typename Func>
380         void clear_with( Func f )
381         {
382             while ( !empty() ) {
383                 value_type * pVal = pop();
384                 if ( pVal )
385                     f( *pVal );
386             }
387         }
388
389         /// Checks is the priority queue is empty
390         bool empty() const
391         {
392             return size() == 0;
393         }
394
395         /// Checks if the priority queue is full
396         bool full() const
397         {
398             return size() == capacity();
399         }
400
401         /// Returns current size of priority queue
402         size_t size() const
403         {
404             std::unique_lock<lock_type> l( m_Lock );
405             return static_cast<size_t>( m_ItemCounter.value());
406         }
407
408         /// Return capacity of the priority queue
409         size_t capacity() const
410         {
411             // m_Heap[0] is not used
412             return m_Heap.capacity() - 1;
413         }
414
415         /// Returns const reference to internal statistics
416         stat const& statistics() const
417         {
418             return m_Stat;
419         }
420
421     protected:
422         //@cond
423
424         void heapify_after_push( counter_type i, tag_type curId )
425         {
426             key_comparator  cmp;
427             back_off        bkoff;
428
429             // Move item towards top of the heap while it has higher priority than parent
430             while ( i > 1 ) {
431                 bool bProgress = true;
432                 counter_type nParent = i / 2;
433                 node& refParent = m_Heap[nParent];
434                 refParent.lock();
435                 node& refItem = m_Heap[i];
436                 refItem.lock();
437
438                 if ( refParent.m_nTag == tag_type(Available) && refItem.m_nTag == curId ) {
439                     if ( cmp( *refItem.m_pVal, *refParent.m_pVal ) > 0 ) {
440                         std::swap( refItem.m_nTag, refParent.m_nTag );
441                         std::swap( refItem.m_pVal, refParent.m_pVal );
442                         m_Stat.onPushHeapifySwap();
443                         i = nParent;
444                     }
445                     else {
446                         refItem.m_nTag = tag_type(Available);
447                         i = 0;
448                     }
449                 }
450                 else if ( refParent.m_nTag == tag_type( Empty ) ) {
451                     m_Stat.onItemMovedTop();
452                     i = 0;
453                 }
454                 else if ( refItem.m_nTag != curId ) {
455                     m_Stat.onItemMovedUp();
456                     i = nParent;
457                 }
458                 else {
459                     m_Stat.onPushEmptyPass();
460                     bProgress = false;
461                 }
462
463                 refItem.unlock();
464                 refParent.unlock();
465
466                 if ( !bProgress )
467                     bkoff();
468                 else
469                     bkoff.reset();
470             }
471
472             if ( i == 1 ) {
473                 node& refItem = m_Heap[i];
474                 refItem.lock();
475                 if ( refItem.m_nTag == curId )
476                     refItem.m_nTag = tag_type(Available);
477                 refItem.unlock();
478             }
479         }
480
481         void heapify_after_pop( node * pParent )
482         {
483             key_comparator cmp;
484             counter_type const nCapacity = m_Heap.capacity();
485
486             counter_type nParent = 1;
487             for ( counter_type nChild = nParent * 2; nChild < nCapacity; nChild *= 2 ) {
488                 node* pChild = &m_Heap[ nChild ];
489                 pChild->lock();
490
491                 if ( pChild->m_nTag == tag_type( Empty )) {
492                     pChild->unlock();
493                     break;
494                 }
495
496                 counter_type const nRight = nChild + 1;
497                 if ( nRight < nCapacity ) {
498                     node& refRight = m_Heap[nRight];
499                     refRight.lock();
500
501                     if ( refRight.m_nTag != tag_type( Empty ) && cmp( *refRight.m_pVal, *pChild->m_pVal ) > 0 ) {
502                         // get right child
503                         pChild->unlock();
504                         nChild = nRight;
505                         pChild = &refRight;
506                     }
507                     else
508                         refRight.unlock();
509                 }
510
511                 // If child has higher priority that parent then swap
512                 // Otherwise stop
513                 if ( cmp( *pChild->m_pVal, *pParent->m_pVal ) > 0 ) {
514                     std::swap( pParent->m_nTag, pChild->m_nTag );
515                     std::swap( pParent->m_pVal, pChild->m_pVal );
516                     pParent->unlock();
517                     m_Stat.onPopHeapifySwap();
518                     nParent = nChild;
519                     pParent = pChild;
520                 }
521                 else {
522                     pChild->unlock();
523                     break;
524                 }
525             }
526             pParent->unlock();
527         }
528         //@endcond
529     };
530
531 }} // namespace cds::intrusive
532
533 #endif // #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H