Fixed include directive
[libcds.git] / cds / container / fcpriority_queue.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #ifndef CDSLIB_CONTAINER_FCPRIORITY_QUEUE_H
32 #define CDSLIB_CONTAINER_FCPRIORITY_QUEUE_H
33
34 #include <cds/algo/flat_combining.h>
35 #include <cds/algo/elimination_opt.h>
36 #include <queue>
37
38 namespace cds { namespace container {
39
40     /// FCPriorityQueue related definitions
41     /** @ingroup cds_nonintrusive_helper
42     */
43     namespace fcpqueue {
44
45         /// FCPriorityQueue internal statistics
46         template <typename Counter = cds::atomicity::event_counter >
47         struct stat: public cds::algo::flat_combining::stat<Counter>
48         {
49             typedef cds::algo::flat_combining::stat<Counter>    flat_combining_stat; ///< Flat-combining statistics
50             typedef typename flat_combining_stat::counter_type  counter_type;        ///< Counter type
51
52             counter_type    m_nPush     ;  ///< Count of push operations
53             counter_type    m_nPushMove ;  ///< Count of push operations with move semantics
54             counter_type    m_nPop      ;  ///< Count of success pop operations
55             counter_type    m_nFailedPop;  ///< Count of failed pop operations (pop from empty queue)
56
57             //@cond
58             void    onPush()             { ++m_nPush; }
59             void    onPushMove()         { ++m_nPushMove; }
60             void    onPop( bool bFailed ) { if ( bFailed ) ++m_nFailedPop; else ++m_nPop;  }
61             //@endcond
62         };
63
64         /// FCPriorityQueue dummy statistics, no overhead
65         struct empty_stat: public cds::algo::flat_combining::empty_stat
66         {
67             //@cond
68             void    onPush()       {}
69             void    onPushMove()   {}
70             void    onPop(bool)    {}
71             //@endcond
72         };
73
74         /// FCPriorityQueue traits
75         struct traits: public cds::algo::flat_combining::traits
76         {
77             typedef empty_stat      stat;   ///< Internal statistics
78         };
79
80         /// Metafunction converting option list to traits
81         /**
82             \p Options are:
83             - any \p cds::algo::flat_combining::make_traits options
84             - \p opt::stat - internal statistics, possible type: \p fcpqueue::stat, \p fcpqueue::empty_stat (the default)
85         */
86         template <typename... Options>
87         struct make_traits {
88 #   ifdef CDS_DOXYGEN_INVOKED
89             typedef implementation_defined type ;   ///< Metafunction result
90 #   else
91             typedef typename cds::opt::make_options<
92                 typename cds::opt::find_type_traits< traits, Options... >::type
93                 ,Options...
94             >::type   type;
95 #   endif
96         };
97
98     } // namespace fcpqueue
99
100     /// Flat-combining priority queue
101     /**
102         @ingroup cds_nonintrusive_priority_queue
103         @ingroup cds_flat_combining_container
104
105         \ref cds_flat_combining_description "Flat combining" sequential priority queue.
106         The class can be considered as a concurrent FC-based wrapper for \p std::priority_queue.
107
108         Template parameters:
109         - \p T - a value type stored in the queue
110         - \p PriorityQueue - sequential priority queue implementation, default is \p std::priority_queue<T>
111         - \p Traits - type traits of flat combining, default is \p fcpqueue::traits.
112             \p fcpqueue::make_traits metafunction can be used to construct specialized \p %fcpqueue::traits
113     */
114     template <typename T,
115         class PriorityQueue = std::priority_queue<T>,
116         typename Traits = fcpqueue::traits
117     >
118     class FCPriorityQueue
119 #ifndef CDS_DOXYGEN_INVOKED
120         : public cds::algo::flat_combining::container
121 #endif
122     {
123     public:
124         typedef T               value_type;          ///< Value type
125         typedef PriorityQueue   priority_queue_type; ///< Sequential priority queue class
126         typedef Traits          traits;              ///< Priority queue type traits
127
128         typedef typename traits::stat  stat;    ///< Internal statistics type
129
130     protected:
131         //@cond
132         // Priority queue operation IDs
133         enum fc_operation {
134             op_push = cds::algo::flat_combining::req_Operation,
135             op_push_move,
136             op_pop,
137             op_clear
138         };
139
140         // Flat combining publication list record
141         struct fc_record: public cds::algo::flat_combining::publication_record
142         {
143             union {
144                 value_type const *  pValPush; // Value to push
145                 value_type *        pValPop;  // Pop destination
146             };
147             bool            bEmpty; // true if the queue is empty
148         };
149         //@endcond
150
151         /// Flat combining kernel
152         typedef cds::algo::flat_combining::kernel< fc_record, traits > fc_kernel;
153
154     protected:
155         //@cond
156         mutable fc_kernel   m_FlatCombining;
157         priority_queue_type m_PQueue;
158         //@endcond
159
160     public:
161         /// Initializes empty priority queue object
162         FCPriorityQueue()
163         {}
164
165         /// Initializes empty priority queue object and gives flat combining parameters
166         FCPriorityQueue(
167             unsigned int nCompactFactor     ///< Flat combining: publication list compacting factor
168             ,unsigned int nCombinePassCount ///< Flat combining: number of combining passes for combiner thread
169             )
170             : m_FlatCombining( nCompactFactor, nCombinePassCount )
171         {}
172
173         /// Inserts a new element in the priority queue
174         /**
175             The function always returns \p true
176         */
177         bool push(
178             value_type const& val ///< Value to be copied to inserted element
179         )
180         {
181             auto pRec = m_FlatCombining.acquire_record();
182             pRec->pValPush = &val;
183
184             m_FlatCombining.combine( op_push, pRec, *this );
185
186             assert( pRec->is_done());
187             m_FlatCombining.release_record( pRec );
188             m_FlatCombining.internal_statistics().onPush();
189             return true;
190         }
191
192         /// Inserts a new element in the priority queue (move semantics)
193         /**
194             The function always returns \p true
195         */
196         bool push(
197             value_type&& val ///< Value to be moved to inserted element
198         )
199         {
200             auto pRec = m_FlatCombining.acquire_record();
201             pRec->pValPush = &val;
202
203             m_FlatCombining.combine( op_push_move, pRec, *this );
204
205             assert( pRec->is_done());
206             m_FlatCombining.release_record( pRec );
207             m_FlatCombining.internal_statistics().onPushMove();
208             return true;
209         }
210
211         /// Removes the top element from priority queue
212         /**
213             The function returns \p false if the queue is empty, \p true otherwise.
214             If the queue is empty \p val is not changed.
215         */
216         bool pop(
217             value_type& val ///< Target to be received the copy of top element
218         )
219         {
220             auto pRec = m_FlatCombining.acquire_record();
221             pRec->pValPop = &val;
222
223             m_FlatCombining.combine( op_pop, pRec, *this );
224
225             assert( pRec->is_done());
226             m_FlatCombining.release_record( pRec );
227             m_FlatCombining.internal_statistics().onPop( pRec->bEmpty );
228             return !pRec->bEmpty;
229         }
230
231         /// Clears the priority queue
232         void clear()
233         {
234             auto pRec = m_FlatCombining.acquire_record();
235
236            m_FlatCombining.combine( op_clear, pRec, *this );
237
238             assert( pRec->is_done());
239             m_FlatCombining.release_record( pRec );
240         }
241
242         /// Returns the number of elements in the priority queue.
243         /**
244             Note that <tt>size() == 0</tt> does not mean that the queue is empty because
245             combining record can be in process.
246             To check emptiness use \ref empty function.
247         */
248         size_t size() const
249         {
250             return m_PQueue.size();
251         }
252
253         /// Checks if the priority queue is empty
254         /**
255             If the combining is in process the function waits while combining done.
256         */
257         bool empty()
258         {
259             bool bRet = false;
260             auto const& pq = m_PQueue;
261             m_FlatCombining.invoke_exclusive( [&pq, &bRet]() { bRet = pq.empty(); } );
262             return bRet;
263         }
264
265         /// Internal statistics
266         stat const& statistics() const
267         {
268             return m_FlatCombining.statistics();
269         }
270
271     public: // flat combining cooperation, not for direct use!
272         //@cond
273         /*
274             The function is called by \ref cds::algo::flat_combining::kernel "flat combining kernel"
275             object if the current thread becomes a combiner. Invocation of the function means that
276             the priority queue should perform an action recorded in \p pRec.
277         */
278         void fc_apply( fc_record * pRec )
279         {
280             assert( pRec );
281
282             // this function is called under FC mutex, so switch TSan off
283             //CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
284
285             switch ( pRec->op()) {
286             case op_push:
287                 assert( pRec->pValPush );
288                 m_PQueue.push( *(pRec->pValPush));
289                 break;
290             case op_push_move:
291                 assert( pRec->pValPush );
292                 m_PQueue.push( std::move( *(pRec->pValPush )));
293                 break;
294             case op_pop:
295                 assert( pRec->pValPop );
296                 pRec->bEmpty = m_PQueue.empty();
297                 if ( !pRec->bEmpty ) {
298                     *(pRec->pValPop) = std::move( m_PQueue.top());
299                     m_PQueue.pop();
300                 }
301                 break;
302             case op_clear:
303                 while ( !m_PQueue.empty())
304                     m_PQueue.pop();
305                 break;
306             default:
307                 assert(false);
308                 break;
309             }
310
311             //CDS_TSAN_ANNOTATE_IGNORE_RW_END;
312         }
313         //@endcond
314     };
315
316 }} // namespace cds::container
317
318 #endif // #ifndef CDSLIB_CONTAINER_FCPRIORITY_QUEUE_H