3 #ifndef __CDS_CONTAINER_FCQUEUE_H
4 #define __CDS_CONTAINER_FCQUEUE_H
6 #include <cds/algo/flat_combining.h>
7 #include <cds/algo/elimination_opt.h>
10 namespace cds { namespace container {
12 /// FCQueue related definitions
13 /** @ingroup cds_nonintrusive_helper
17 /// FCQueue internal statistics
18 template <typename Counter = cds::atomicity::event_counter >
19 struct stat: public cds::algo::flat_combining::stat<Counter>
21 typedef cds::algo::flat_combining::stat<Counter> flat_combining_stat; ///< Flat-combining statistics
22 typedef typename flat_combining_stat::counter_type counter_type; ///< Counter type
24 counter_type m_nEnqueue ; ///< Count of enqueue operations
25 counter_type m_nEnqMove ; ///< Count of enqueue operations with move semantics
26 counter_type m_nDequeue ; ///< Count of success dequeue operations
27 counter_type m_nFailedDeq ; ///< Count of failed dequeue operations (pop from empty queue)
28 counter_type m_nCollided ; ///< How many pairs of enqueue/dequeue were collided, if elimination is enabled
31 void onEnqueue() { ++m_nEnqueue; }
32 void onEnqMove() { ++m_nEnqMove; }
33 void onDequeue( bool bFailed ) { if ( bFailed ) ++m_nFailedDeq; else ++m_nDequeue; }
34 void onCollide() { ++m_nCollided; }
38 /// FCQueue dummy statistics, no overhead
39 struct empty_stat: public cds::algo::flat_combining::empty_stat
44 void onDequeue(bool) {}
49 /// FCQueue type traits
50 struct type_traits: public cds::algo::flat_combining::type_traits
52 typedef empty_stat stat; ///< Internal statistics
53 static CDS_CONSTEXPR_CONST bool enable_elimination = false; ///< Enable \ref cds_elimination_description "elimination"
56 /// Metafunction converting option list to traits
58 This is a wrapper for <tt> cds::opt::make_options< type_traits, Options...> </tt>
60 - \p opt::lock_type - mutex type, default is \p cds::lock::Spin
61 - \p opt::back_off - back-off strategy, default is \p cds::backoff::Default
62 - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR
63 - \p opt::stat - internal statistics, possible type: \ref stat, \ref empty_stat (the default)
64 - \p opt::memory_model - C++ memory ordering model.
65 List of all available memory ordering see opt::memory_model.
66 Default is cds::opt::v::relaxed_ordering
67 - \p opt::enable_elimination - enable/disable operation \ref cds_elimination_description "elimination"
68 By default, the elimination is disabled. For queue, the elimination is possible if the queue
71 template <CDS_DECL_OPTIONS8>
73 # ifdef CDS_DOXYGEN_INVOKED
74 typedef implementation_defined type ; ///< Metafunction result
76 typedef typename cds::opt::make_options<
77 typename cds::opt::find_type_traits< type_traits, CDS_OPTIONS8 >::type
83 } // namespace fcqueue
85 /// Flat-combining queue
87 @ingroup cds_nonintrusive_queue
88 @ingroup cds_flat_combining_container
90 \ref cds_flat_combining_description "Flat combining" sequential queue.
91 The class can be considered as a concurrent FC-based wrapper for \p std::queue.
94 - \p T - a value type stored in the queue
95 - \p Queue - sequential queue implementation, default is \p std::queue<T>
96 - \p Traits - type traits of flat combining, default is \p fcqueue::type_traits.
97 \p fcqueue::make_traits metafunction can be used to construct specialized \p %type_traits
100 class Queue = std::queue<T>,
101 typename Traits = fcqueue::type_traits
104 #ifndef CDS_DOXYGEN_INVOKED
105 : public cds::algo::flat_combining::container
109 typedef T value_type; ///< Value type
110 typedef Queue queue_type; ///< Sequential queue class
111 typedef Traits type_traits; ///< Queue type traits
113 typedef typename type_traits::stat stat; ///< Internal statistics type
114 static CDS_CONSTEXPR_CONST bool c_bEliminationEnabled = type_traits::enable_elimination; ///< \p true if elimination is enabled
118 /// Queue operation IDs
120 op_enq = cds::algo::flat_combining::req_Operation, ///< Enqueue
121 op_enq_move, ///< Enqueue (move semantics)
126 /// Flat combining publication list record
127 struct fc_record: public cds::algo::flat_combining::publication_record
130 value_type const * pValEnq; ///< Value to enqueue
131 value_type * pValDeq; ///< Dequeue destination
133 bool bEmpty; ///< \p true if the queue is empty
137 /// Flat combining kernel
138 typedef cds::algo::flat_combining::kernel< fc_record, type_traits > fc_kernel;
142 fc_kernel m_FlatCombining;
147 /// Initializes empty queue object
151 /// Initializes empty queue object and gives flat combining parameters
153 unsigned int nCompactFactor ///< Flat combining: publication list compacting factor
154 ,unsigned int nCombinePassCount ///< Flat combining: number of combining passes for combiner thread
156 : m_FlatCombining( nCompactFactor, nCombinePassCount )
159 /// Inserts a new element at the end of the queue
161 The content of the new element initialized to a copy of \p val.
163 The function always returns \p true
165 bool enqueue( value_type const& val )
167 fc_record * pRec = m_FlatCombining.acquire_record();
168 pRec->pValEnq = &val;
170 if ( c_bEliminationEnabled )
171 m_FlatCombining.batch_combine( op_enq, pRec, *this );
173 m_FlatCombining.combine( op_enq, pRec, *this );
175 assert( pRec->is_done() );
176 m_FlatCombining.release_record( pRec );
177 m_FlatCombining.internal_statistics().onEnqueue();
181 /// Inserts a new element at the end of the queue (a synonym for \ref enqueue)
182 bool push( value_type const& val )
184 return enqueue( val );
187 # ifdef CDS_MOVE_SEMANTICS_SUPPORT
188 /// Inserts a new element at the end of the queue (move semantics)
190 \p val is moved to inserted element
192 bool enqueue( value_type&& val )
194 fc_record * pRec = m_FlatCombining.acquire_record();
195 pRec->pValEnq = &val;
197 if ( c_bEliminationEnabled )
198 m_FlatCombining.batch_combine( op_enq_move, pRec, *this );
200 m_FlatCombining.combine( op_enq_move, pRec, *this );
202 assert( pRec->is_done() );
203 m_FlatCombining.release_record( pRec );
205 m_FlatCombining.internal_statistics().onEnqMove();
209 /// Inserts a new element at the end of the queue (move semantics, synonym for \p enqueue)
210 bool push( value_type&& val )
212 return enqueue( val );
216 /// Removes the next element from the queue
218 \p val takes a copy of the element
220 bool dequeue( value_type& val )
222 fc_record * pRec = m_FlatCombining.acquire_record();
223 pRec->pValDeq = &val;
225 if ( c_bEliminationEnabled )
226 m_FlatCombining.batch_combine( op_deq, pRec, *this );
228 m_FlatCombining.combine( op_deq, pRec, *this );
230 assert( pRec->is_done() );
231 m_FlatCombining.release_record( pRec );
233 m_FlatCombining.internal_statistics().onDequeue( pRec->bEmpty );
234 return !pRec->bEmpty;
237 /// Removes the next element from the queue (a synonym for \ref dequeue)
238 bool pop( value_type& val )
240 return dequeue( val );
246 fc_record * pRec = m_FlatCombining.acquire_record();
248 if ( c_bEliminationEnabled )
249 m_FlatCombining.batch_combine( op_clear, pRec, *this );
251 m_FlatCombining.combine( op_clear, pRec, *this );
253 assert( pRec->is_done() );
254 m_FlatCombining.release_record( pRec );
257 /// Returns the number of elements in the queue.
259 Note that <tt>size() == 0</tt> does not mean that the queue is empty because
260 combining can be in progress.
261 To check emptiness use \ref empty function.
265 return m_Queue.size();
268 /// Checks if the queue is empty
270 If combining is in progress, the function waits until it is done.
274 m_FlatCombining.wait_while_combining();
275 return m_Queue.empty();
278 /// Internal statistics
279 stat const& statistics() const
281 return m_FlatCombining.statistics();
284 public: // flat combining cooperation, not for direct use!
286 /// Flat combining supporting function. Do not call it directly!
288 The function is called by \ref cds::algo::flat_combining::kernel "flat combining kernel"
289 object if the current thread becomes a combiner. Invocation of the function means that
290 the queue should perform an action recorded in \p pRec.
292 void fc_apply( fc_record * pRec )
296 switch ( pRec->op() ) {
298 assert( pRec->pValEnq );
299 m_Queue.push( *(pRec->pValEnq ) );
301 # ifdef CDS_MOVE_SEMANTICS_SUPPORT
303 assert( pRec->pValEnq );
304 m_Queue.push( std::move( *(pRec->pValEnq )) );
308 assert( pRec->pValDeq );
309 pRec->bEmpty = m_Queue.empty();
310 if ( !pRec->bEmpty ) {
311 *(pRec->pValDeq) = m_Queue.front();
316 while ( !m_Queue.empty() )
325 /// Batch-processing flat combining
326 void fc_process( typename fc_kernel::iterator itBegin, typename fc_kernel::iterator itEnd )
328 typedef typename fc_kernel::iterator fc_iterator;
329 for ( fc_iterator it = itBegin, itPrev = itEnd; it != itEnd; ++it ) {
330 switch ( it->op() ) {
334 if ( m_Queue.empty() ) {
335 if ( itPrev != itEnd && collide( *itPrev, *it ))
348 bool collide( fc_record& rec1, fc_record& rec2 )
350 switch ( rec1.op() ) {
352 if ( rec2.op() == op_deq ) {
353 assert(rec1.pValEnq);
354 assert(rec2.pValDeq);
355 *rec2.pValDeq = *rec1.pValEnq;
360 # ifdef CDS_MOVE_SEMANTICS_SUPPORT
362 if ( rec2.op() == op_deq ) {
363 assert(rec1.pValEnq);
364 assert(rec2.pValDeq);
365 *rec2.pValDeq = std::move( *rec1.pValEnq );
372 switch ( rec2.op() ) {
374 # ifdef CDS_MOVE_SEMANTICS_SUPPORT
377 return collide( rec2, rec1 );
383 m_FlatCombining.operation_done( rec1 );
384 m_FlatCombining.operation_done( rec2 );
385 m_FlatCombining.internal_statistics().onCollide();
391 }} // namespace cds::container
393 #endif // #ifndef __CDS_CONTAINER_FCQUEUE_H