3 #ifndef __CDS_CONTAINER_FCQUEUE_H
4 #define __CDS_CONTAINER_FCQUEUE_H
6 #include <cds/algo/flat_combining.h>
7 #include <cds/algo/elimination_opt.h>
10 namespace cds { namespace container {
12 /// FCQueue related definitions
13 /** @ingroup cds_nonintrusive_helper
17 /// FCQueue internal statistics
18 template <typename Counter = cds::atomicity::event_counter >
19 struct stat: public cds::algo::flat_combining::stat<Counter>
21 typedef cds::algo::flat_combining::stat<Counter> flat_combining_stat; ///< Flat-combining statistics
22 typedef typename flat_combining_stat::counter_type counter_type; ///< Counter type
24 counter_type m_nEnqueue ; ///< Count of enqueue operations
25 counter_type m_nEnqMove ; ///< Count of enqueue operations with move semantics
26 counter_type m_nDequeue ; ///< Count of success dequeue operations
27 counter_type m_nFailedDeq ; ///< Count of failed dequeue operations (pop from empty queue)
28 counter_type m_nCollided ; ///< How many pairs of enqueue/dequeue were collided, if elimination is enabled
31 void onEnqueue() { ++m_nEnqueue; }
32 void onEnqMove() { ++m_nEnqMove; }
33 void onDequeue( bool bFailed ) { if ( bFailed ) ++m_nFailedDeq; else ++m_nDequeue; }
34 void onCollide() { ++m_nCollided; }
38 /// FCQueue dummy statistics, no overhead
39 struct empty_stat: public cds::algo::flat_combining::empty_stat
44 void onDequeue(bool) {}
49 /// FCQueue type traits
50 struct type_traits: public cds::algo::flat_combining::type_traits
52 typedef empty_stat stat; ///< Internal statistics
53 static CDS_CONSTEXPR_CONST bool enable_elimination = false; ///< Enable \ref cds_elimination_description "elimination"
56 /// Metafunction converting option list to traits
58 This is a wrapper for <tt> cds::opt::make_options< type_traits, Options...> </tt>
60 - \p opt::lock_type - mutex type, default is \p cds::lock::Spin
61 - \p opt::back_off - back-off strategy, defalt is \p cds::backoff::Default
62 - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR
63 - \p opt::stat - internal statistics, possible type: \ref stat, \ref empty_stat (the default)
64 - \p opt::memory_model - C++ memory ordering model.
65 List of all available memory ordering see opt::memory_model.
66 Default if cds::opt::v:relaxed_ordering
67 - \p opt::enable_elimination - enable/disable operation \ref cds_elimination_description "elimination"
68 By default, the elimination is disabled. For queue, the elimination is possible if the queue
71 template <typename... Options>
73 # ifdef CDS_DOXYGEN_INVOKED
74 typedef implementation_defined type ; ///< Metafunction result
76 typedef typename cds::opt::make_options<
77 typename cds::opt::find_type_traits< type_traits, Options... >::type
83 } // namespace fcqueue
85 /// Flat-combining queue
87 @ingroup cds_nonintrusive_queue
88 @ingroup cds_flat_combining_container
90 \ref cds_flat_combining_description "Flat combining" sequential queue.
91 The class can be considered as a concurrent FC-based wrapper for \p std::queue.
94 - \p T - a value type stored in the queue
95 - \p Queue - sequential queue implementation, default is \p std::queue<T>
96 - \p Trats - type traits of flat combining, default is \p fcqueue::type_traits.
97 \p fcqueue::make_traits metafunction can be used to construct specialized \p %type_traits
100 class Queue = std::queue<T>,
101 typename Traits = fcqueue::type_traits
104 #ifndef CDS_DOXYGEN_INVOKED
105 : public cds::algo::flat_combining::container
109 typedef T value_type; ///< Value type
110 typedef Queue queue_type; ///< Sequential queue class
111 typedef Traits type_traits; ///< Queue type traits
113 typedef typename type_traits::stat stat; ///< Internal statistics type
114 static CDS_CONSTEXPR_CONST bool c_bEliminationEnabled = type_traits::enable_elimination; ///< \p true if elimination is enabled
118 /// Queue operation IDs
120 op_enq = cds::algo::flat_combining::req_Operation, ///< Enqueue
121 op_enq_move, ///< Enqueue (move semantics)
126 /// Flat combining publication list record
127 struct fc_record: public cds::algo::flat_combining::publication_record
130 value_type const * pValEnq; ///< Value to enqueue
131 value_type * pValDeq; ///< Dequeue destination
133 bool bEmpty; ///< \p true if the queue is empty
137 /// Flat combining kernel
138 typedef cds::algo::flat_combining::kernel< fc_record, type_traits > fc_kernel;
142 fc_kernel m_FlatCombining;
147 /// Initializes empty queue object
151 /// Initializes empty queue object and gives flat combining parameters
153 unsigned int nCompactFactor ///< Flat combining: publication list compacting factor
154 ,unsigned int nCombinePassCount ///< Flat combining: number of combining passes for combiner thread
156 : m_FlatCombining( nCompactFactor, nCombinePassCount )
159 /// Inserts a new element at the end of the queue
161 The content of the new element initialized to a copy of \p val.
163 The function always returns \p true
165 bool enqueue( value_type const& val )
167 fc_record * pRec = m_FlatCombining.acquire_record();
168 pRec->pValEnq = &val;
170 if ( c_bEliminationEnabled )
171 m_FlatCombining.batch_combine( op_enq, pRec, *this );
173 m_FlatCombining.combine( op_enq, pRec, *this );
175 assert( pRec->is_done() );
176 m_FlatCombining.release_record( pRec );
177 m_FlatCombining.internal_statistics().onEnqueue();
181 /// Inserts a new element at the end of the queue (a synonym for \ref enqueue)
182 bool push( value_type const& val )
184 return enqueue( val );
187 /// Inserts a new element at the end of the queue (move semantics)
189 \p val is moved to inserted element
191 bool enqueue( value_type&& val )
193 fc_record * pRec = m_FlatCombining.acquire_record();
194 pRec->pValEnq = &val;
196 if ( c_bEliminationEnabled )
197 m_FlatCombining.batch_combine( op_enq_move, pRec, *this );
199 m_FlatCombining.combine( op_enq_move, pRec, *this );
201 assert( pRec->is_done() );
202 m_FlatCombining.release_record( pRec );
204 m_FlatCombining.internal_statistics().onEnqMove();
208 /// Inserts a new element at the end of the queue (move semantics, synonym for \p enqueue)
209 bool push( value_type&& val )
211 return enqueue( val );
214 /// Removes the next element from the queue
216 \p val takes a copy of the element
218 bool dequeue( value_type& val )
220 fc_record * pRec = m_FlatCombining.acquire_record();
221 pRec->pValDeq = &val;
223 if ( c_bEliminationEnabled )
224 m_FlatCombining.batch_combine( op_deq, pRec, *this );
226 m_FlatCombining.combine( op_deq, pRec, *this );
228 assert( pRec->is_done() );
229 m_FlatCombining.release_record( pRec );
231 m_FlatCombining.internal_statistics().onDequeue( pRec->bEmpty );
232 return !pRec->bEmpty;
235 /// Removes the next element from the queue (a synonym for \ref dequeue)
236 bool pop( value_type& val )
238 return dequeue( val );
244 fc_record * pRec = m_FlatCombining.acquire_record();
246 if ( c_bEliminationEnabled )
247 m_FlatCombining.batch_combine( op_clear, pRec, *this );
249 m_FlatCombining.combine( op_clear, pRec, *this );
251 assert( pRec->is_done() );
252 m_FlatCombining.release_record( pRec );
255 /// Returns the number of elements in the queue.
257 Note that <tt>size() == 0</tt> is not mean that the queue is empty because
258 combining record can be in process.
259 To check emptiness use \ref empty function.
263 return m_Queue.size();
266 /// Checks if the queue is empty
268 If the combining is in process the function waits while combining done.
272 m_FlatCombining.wait_while_combining();
273 return m_Queue.empty();
276 /// Internal statistics
277 stat const& statistics() const
279 return m_FlatCombining.statistics();
282 public: // flat combining cooperation, not for direct use!
284 /// Flat combining supporting function. Do not call it directly!
286 The function is called by \ref cds::algo::flat_combining::kernel "flat combining kernel"
287 object if the current thread becomes a combiner. Invocation of the function means that
288 the queue should perform an action recorded in \p pRec.
290 void fc_apply( fc_record * pRec )
294 switch ( pRec->op() ) {
296 assert( pRec->pValEnq );
297 m_Queue.push( *(pRec->pValEnq ) );
300 assert( pRec->pValEnq );
301 m_Queue.push( std::move( *(pRec->pValEnq )) );
304 assert( pRec->pValDeq );
305 pRec->bEmpty = m_Queue.empty();
306 if ( !pRec->bEmpty ) {
307 *(pRec->pValDeq) = m_Queue.front();
312 while ( !m_Queue.empty() )
321 /// Batch-processing flat combining
322 void fc_process( typename fc_kernel::iterator itBegin, typename fc_kernel::iterator itEnd )
324 typedef typename fc_kernel::iterator fc_iterator;
325 for ( fc_iterator it = itBegin, itPrev = itEnd; it != itEnd; ++it ) {
326 switch ( it->op() ) {
330 if ( m_Queue.empty() ) {
331 if ( itPrev != itEnd && collide( *itPrev, *it ))
344 bool collide( fc_record& rec1, fc_record& rec2 )
346 switch ( rec1.op() ) {
348 if ( rec2.op() == op_deq ) {
349 assert(rec1.pValEnq);
350 assert(rec2.pValDeq);
351 *rec2.pValDeq = *rec1.pValEnq;
357 if ( rec2.op() == op_deq ) {
358 assert(rec1.pValEnq);
359 assert(rec2.pValDeq);
360 *rec2.pValDeq = std::move( *rec1.pValEnq );
366 switch ( rec2.op() ) {
369 return collide( rec2, rec1 );
375 m_FlatCombining.operation_done( rec1 );
376 m_FlatCombining.operation_done( rec2 );
377 m_FlatCombining.internal_statistics().onCollide();
383 }} // namespace cds::container
385 #endif // #ifndef __CDS_CONTAINER_FCQUEUE_H