3 #ifndef CDSLIB_CONTAINER_FCQUEUE_H
4 #define CDSLIB_CONTAINER_FCQUEUE_H
6 #include <cds/algo/flat_combining.h>
7 #include <cds/algo/elimination_opt.h>
10 namespace cds { namespace container {
12 /// FCQueue related definitions
13 /** @ingroup cds_nonintrusive_helper
17 /// FCQueue internal statistics
18 template <typename Counter = cds::atomicity::event_counter >
19 struct stat: public cds::algo::flat_combining::stat<Counter>
21 typedef cds::algo::flat_combining::stat<Counter> flat_combining_stat; ///< Flat-combining statistics
22 typedef typename flat_combining_stat::counter_type counter_type; ///< Counter type
24 counter_type m_nEnqueue ; ///< Count of enqueue operations
25 counter_type m_nEnqMove ; ///< Count of enqueue operations with move semantics
26 counter_type m_nDequeue ; ///< Count of success dequeue operations
27 counter_type m_nFailedDeq ; ///< Count of failed dequeue operations (pop from empty queue)
28 counter_type m_nCollided ; ///< How many pairs of enqueue/dequeue were collided, if elimination is enabled
31 void onEnqueue() { ++m_nEnqueue; }
32 void onEnqMove() { ++m_nEnqMove; }
33 void onDequeue( bool bFailed ) { if ( bFailed ) ++m_nFailedDeq; else ++m_nDequeue; }
34 void onCollide() { ++m_nCollided; }
38 /// FCQueue dummy statistics, no overhead
39 struct empty_stat: public cds::algo::flat_combining::empty_stat
44 void onDequeue(bool) {}
49 /// FCQueue type traits
50 struct traits: public cds::algo::flat_combining::traits
52 typedef empty_stat stat; ///< Internal statistics
53 static CDS_CONSTEXPR const bool enable_elimination = false; ///< Enable \ref cds_elimination_description "elimination"
56 /// Metafunction converting option list to traits
59 - \p opt::lock_type - mutex type, default is \p cds::sync::spin
60 - \p opt::back_off - back-off strategy, default is \p cds::backoff::delay_of<2>
61 - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR
62 - \p opt::stat - internal statistics, possible type: \p fcqueue::stat, \p fcqueue::empty_stat (the default)
63 - \p opt::memory_model - C++ memory ordering model.
64 List of all available memory ordering see \p opt::memory_model.
65 Default is \p cds::opt::v::relaxed_ordering
66 - \p opt::enable_elimination - enable/disable operation \ref cds_elimination_description "elimination"
67 By default, the elimination is disabled. For queue, the elimination is possible if the queue
70 template <typename... Options>
72 # ifdef CDS_DOXYGEN_INVOKED
73 typedef implementation_defined type ; ///< Metafunction result
75 typedef typename cds::opt::make_options<
76 typename cds::opt::find_type_traits< traits, Options... >::type
82 } // namespace fcqueue
84 /// Flat-combining queue
86 @ingroup cds_nonintrusive_queue
87 @ingroup cds_flat_combining_container
89 \ref cds_flat_combining_description "Flat combining" sequential queue.
90 The class can be considered as a concurrent FC-based wrapper for \p std::queue.
93 - \p T - a value type stored in the queue
94 - \p Queue - sequential queue implementation, default is \p std::queue<T>
95 - \p Traits - type traits of flat combining, default is \p fcqueue::traits.
96 \p fcqueue::make_traits metafunction can be used to construct \p %fcqueue::traits specialization.
99 class Queue = std::queue<T>,
100 typename Traits = fcqueue::traits
103 #ifndef CDS_DOXYGEN_INVOKED
104 : public cds::algo::flat_combining::container
108 typedef T value_type; ///< Value type
109 typedef Queue queue_type; ///< Sequential queue class
110 typedef Traits traits; ///< Queue type traits
112 typedef typename traits::stat stat; ///< Internal statistics type
113 static CDS_CONSTEXPR const bool c_bEliminationEnabled = traits::enable_elimination; ///< \p true if elimination is enabled
117 /// Queue operation IDs
119 op_enq = cds::algo::flat_combining::req_Operation, ///< Enqueue
120 op_enq_move, ///< Enqueue (move semantics)
125 /// Flat combining publication list record
126 struct fc_record: public cds::algo::flat_combining::publication_record
129 value_type const * pValEnq; ///< Value to enqueue
130 value_type * pValDeq; ///< Dequeue destination
132 bool bEmpty; ///< \p true if the queue is empty
136 /// Flat combining kernel
137 typedef cds::algo::flat_combining::kernel< fc_record, traits > fc_kernel;
141 fc_kernel m_FlatCombining;
146 /// Initializes empty queue object
150 /// Initializes empty queue object and gives flat combining parameters
152 unsigned int nCompactFactor ///< Flat combining: publication list compacting factor
153 ,unsigned int nCombinePassCount ///< Flat combining: number of combining passes for combiner thread
155 : m_FlatCombining( nCompactFactor, nCombinePassCount )
158 /// Inserts a new element at the end of the queue
160 The content of the new element initialized to a copy of \p val.
162 The function always returns \p true
164 bool enqueue( value_type const& val )
166 fc_record * pRec = m_FlatCombining.acquire_record();
167 pRec->pValEnq = &val;
169 if ( c_bEliminationEnabled )
170 m_FlatCombining.batch_combine( op_enq, pRec, *this );
172 m_FlatCombining.combine( op_enq, pRec, *this );
174 assert( pRec->is_done() );
175 m_FlatCombining.release_record( pRec );
176 m_FlatCombining.internal_statistics().onEnqueue();
180 /// Inserts a new element at the end of the queue (a synonym for \ref enqueue)
181 bool push( value_type const& val )
183 return enqueue( val );
186 /// Inserts a new element at the end of the queue (move semantics)
188 \p val is moved to inserted element
190 bool enqueue( value_type&& val )
192 fc_record * pRec = m_FlatCombining.acquire_record();
193 pRec->pValEnq = &val;
195 if ( c_bEliminationEnabled )
196 m_FlatCombining.batch_combine( op_enq_move, pRec, *this );
198 m_FlatCombining.combine( op_enq_move, pRec, *this );
200 assert( pRec->is_done() );
201 m_FlatCombining.release_record( pRec );
203 m_FlatCombining.internal_statistics().onEnqMove();
207 /// Inserts a new element at the end of the queue (move semantics, synonym for \p enqueue)
208 bool push( value_type&& val )
210 return enqueue( val );
213 /// Removes the next element from the queue
215 \p val takes a copy of the element
217 bool dequeue( value_type& val )
219 fc_record * pRec = m_FlatCombining.acquire_record();
220 pRec->pValDeq = &val;
222 if ( c_bEliminationEnabled )
223 m_FlatCombining.batch_combine( op_deq, pRec, *this );
225 m_FlatCombining.combine( op_deq, pRec, *this );
227 assert( pRec->is_done() );
228 m_FlatCombining.release_record( pRec );
230 m_FlatCombining.internal_statistics().onDequeue( pRec->bEmpty );
231 return !pRec->bEmpty;
234 /// Removes the next element from the queue (a synonym for \ref dequeue)
235 bool pop( value_type& val )
237 return dequeue( val );
243 fc_record * pRec = m_FlatCombining.acquire_record();
245 if ( c_bEliminationEnabled )
246 m_FlatCombining.batch_combine( op_clear, pRec, *this );
248 m_FlatCombining.combine( op_clear, pRec, *this );
250 assert( pRec->is_done() );
251 m_FlatCombining.release_record( pRec );
254 /// Returns the number of elements in the queue.
256 Note that <tt>size() == 0</tt> does not mean that the queue is empty because
257 a combining record can be in process.
258 To check emptiness use \ref empty function.
262 return m_Queue.size();
265 /// Checks if the queue is empty
267 If combining is in progress, the function waits until the combining is done.
271 m_FlatCombining.wait_while_combining();
272 return m_Queue.empty();
275 /// Internal statistics
276 stat const& statistics() const
278 return m_FlatCombining.statistics();
281 public: // flat combining cooperation, not for direct use!
283 /// Flat combining supporting function. Do not call it directly!
285 The function is called by \ref cds::algo::flat_combining::kernel "flat combining kernel"
286 object if the current thread becomes a combiner. Invocation of the function means that
287 the queue should perform an action recorded in \p pRec.
289 void fc_apply( fc_record * pRec )
293 switch ( pRec->op() ) {
295 assert( pRec->pValEnq );
296 m_Queue.push( *(pRec->pValEnq ) );
299 assert( pRec->pValEnq );
300 m_Queue.push( std::move( *(pRec->pValEnq )) );
303 assert( pRec->pValDeq );
304 pRec->bEmpty = m_Queue.empty();
305 if ( !pRec->bEmpty ) {
306 *(pRec->pValDeq) = m_Queue.front();
311 while ( !m_Queue.empty() )
320 /// Batch-processing flat combining
321 void fc_process( typename fc_kernel::iterator itBegin, typename fc_kernel::iterator itEnd )
323 typedef typename fc_kernel::iterator fc_iterator;
324 for ( fc_iterator it = itBegin, itPrev = itEnd; it != itEnd; ++it ) {
325 switch ( it->op() ) {
329 if ( m_Queue.empty() ) {
330 if ( itPrev != itEnd && collide( *itPrev, *it ))
343 bool collide( fc_record& rec1, fc_record& rec2 )
345 switch ( rec1.op() ) {
347 if ( rec2.op() == op_deq ) {
348 assert(rec1.pValEnq);
349 assert(rec2.pValDeq);
350 *rec2.pValDeq = *rec1.pValEnq;
356 if ( rec2.op() == op_deq ) {
357 assert(rec1.pValEnq);
358 assert(rec2.pValDeq);
359 *rec2.pValDeq = std::move( *rec1.pValEnq );
365 switch ( rec2.op() ) {
368 return collide( rec2, rec1 );
374 m_FlatCombining.operation_done( rec1 );
375 m_FlatCombining.operation_done( rec2 );
376 m_FlatCombining.internal_statistics().onCollide();
382 }} // namespace cds::container
384 #endif // #ifndef CDSLIB_CONTAINER_FCQUEUE_H