3 #ifndef __CDS_INTRUSIVE_FCQUEUE_H
4 #define __CDS_INTRUSIVE_FCQUEUE_H
6 #include <cds/algo/flat_combining.h>
7 #include <cds/algo/elimination_opt.h>
8 #include <cds/intrusive/options.h>
9 #include <boost/intrusive/list.hpp>
11 namespace cds { namespace intrusive {
13 /// FCQueue related definitions
16 /// FCQueue internal statistics
17 template <typename Counter = cds::atomicity::event_counter >
18 struct stat: public cds::algo::flat_combining::stat<Counter>
20 typedef cds::algo::flat_combining::stat<Counter> flat_combining_stat; ///< Flat-combining statistics
21 typedef typename flat_combining_stat::counter_type counter_type; ///< Counter type
23 counter_type m_nEnqueue ; ///< Count of push operations
24 counter_type m_nDequeue ; ///< Count of success pop operations
25 counter_type m_nFailedDeq ; ///< Count of failed pop operations (pop from empty queue)
26 counter_type m_nCollided ; ///< How many pairs of push/pop were collided, if elimination is enabled
29 void onEnqueue() { ++m_nEnqueue; }
30 void onDequeue( bool bFailed ) { if ( bFailed ) ++m_nFailedDeq; else ++m_nDequeue; }
31 void onCollide() { ++m_nCollided; }
35 /// FCQueue dummy statistics, no overhead
36 struct empty_stat: public cds::algo::flat_combining::empty_stat
40 void onDequeue(bool) {}
45 /// FCQueue type traits
46 struct type_traits: public cds::algo::flat_combining::type_traits
48 typedef cds::intrusive::opt::v::empty_disposer disposer ; ///< Disposer to erase removed elements. Used only in \p FCQueue::clear() function
49 typedef empty_stat stat; ///< Internal statistics
50 static CDS_CONSTEXPR_CONST bool enable_elimination = false; ///< Enable \ref cds_elimination_description "elimination"
53 /// Metafunction converting option list to traits
55 This is a wrapper for <tt> cds::opt::make_options< type_traits, Options...> </tt>
57 - \p opt::lock_type - mutex type, default is \p cds::lock::Spin
58 - \p opt::back_off - back-off strategy, defalt is \p cds::backoff::Default
59 - \p opt::disposer - the functor used for dispose removed items. Default is opt::intrusive::v::empty_disposer.
60 This option is used only in \p FCQueue::clear() function.
61 - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR
62 - \p opt::stat - internal statistics, possible type: \ref stat, \ref empty_stat (the default)
63 - \p opt::memory_model - C++ memory ordering model.
64 List of all available memory ordering see opt::memory_model.
65 Default if cds::opt::v:relaxed_ordering
66 - \p opt::enable_elimination - enable/disable operation \ref cds_elimination_description "elimination"
67 By default, the elimination is disabled.
69 template <typename... Options>
71 # ifdef CDS_DOXYGEN_INVOKED
72 typedef implementation_defined type ; ///< Metafunction result
74 typedef typename cds::opt::make_options<
75 typename cds::opt::find_type_traits< type_traits, Options... >::type
80 } // namespace fcqueue
82 /// Flat-combining intrusive queue
84 @ingroup cds_intrusive_queue
85 @ingroup cds_flat_combining_intrusive
87 \ref cds_flat_combining_description "Flat combining" sequential intrusive queue.
90 - \p T - a value type stored in the queue
91 - \p Container - sequential intrusive container with \p push_back and \p pop_front functions.
92 Default is \p boost::intrusive::list
93 - \p Traits - type traits of flat combining, default is \p fcqueue::type_traits.
94 \p fcqueue::make_traits metafunction can be used to construct specialized \p %type_traits
97 ,class Container = boost::intrusive::list<T>
98 ,typename Traits = fcqueue::type_traits
101 #ifndef CDS_DOXYGEN_INVOKED
102 : public cds::algo::flat_combining::container
106 typedef T value_type; ///< Value type
107 typedef Container container_type; ///< Sequential container type
108 typedef Traits type_traits; ///< Queue type traits
110 typedef typename type_traits::disposer disposer; ///< The disposer functor. The disposer is used only in \ref clear() function
111 typedef typename type_traits::stat stat; ///< Internal statistics type
112 static CDS_CONSTEXPR_CONST bool c_bEliminationEnabled = type_traits::enable_elimination; ///< \p true if elimination is enabled
116 /// Queue operation IDs
118 op_enq = cds::algo::flat_combining::req_Operation, ///< Enqueue
121 op_clear_and_dispose ///< Clear and dispose
124 /// Flat combining publication list record
125 struct fc_record: public cds::algo::flat_combining::publication_record
127 value_type * pVal; ///< Value to enqueue or dequeue
128 bool bEmpty; ///< \p true if the queue is empty
132 /// Flat combining kernel
133 typedef cds::algo::flat_combining::kernel< fc_record, type_traits > fc_kernel;
137 fc_kernel m_FlatCombining;
138 container_type m_Queue;
142 /// Initializes empty queue object
146 /// Initializes empty queue object and gives flat combining parameters
148 unsigned int nCompactFactor ///< Flat combining: publication list compacting factor
149 ,unsigned int nCombinePassCount ///< Flat combining: number of combining passes for combiner thread
151 : m_FlatCombining( nCompactFactor, nCombinePassCount )
154 /// Inserts a new element at the end of the queue
156 The function always returns \p true.
158 bool enqueue( value_type& val )
160 fc_record * pRec = m_FlatCombining.acquire_record();
163 if ( c_bEliminationEnabled )
164 m_FlatCombining.batch_combine( op_enq, pRec, *this );
166 m_FlatCombining.combine( op_enq, pRec, *this );
168 assert( pRec->is_done() );
169 m_FlatCombining.release_record( pRec );
170 m_FlatCombining.internal_statistics().onEnqueue();
174 /// Inserts a new element at the end of the queue (a synonym for \ref enqueue)
175 bool push( value_type& val )
177 return enqueue( val );
180 /// Removes the next element from the queue
182 If the queue is empty the function returns \p nullptr
184 value_type * dequeue()
186 fc_record * pRec = m_FlatCombining.acquire_record();
187 pRec->pVal = nullptr;
189 if ( c_bEliminationEnabled )
190 m_FlatCombining.batch_combine( op_deq, pRec, *this );
192 m_FlatCombining.combine( op_deq, pRec, *this );
194 assert( pRec->is_done() );
195 m_FlatCombining.release_record( pRec );
197 m_FlatCombining.internal_statistics().onDequeue( pRec->bEmpty );
201 /// Removes the next element from the queue (a synonym for \ref dequeue)
209 If \p bDispose is \p true, the disposer provided in \p Traits class' template parameter
210 will be called for each removed element.
212 void clear( bool bDispose = false )
214 fc_record * pRec = m_FlatCombining.acquire_record();
216 if ( c_bEliminationEnabled )
217 m_FlatCombining.batch_combine( bDispose ? op_clear_and_dispose : op_clear, pRec, *this );
219 m_FlatCombining.combine( bDispose ? op_clear_and_dispose : op_clear, pRec, *this );
221 assert( pRec->is_done() );
222 m_FlatCombining.release_record( pRec );
225 /// Returns the number of elements in the queue.
227 Note that <tt>size() == 0</tt> is not mean that the queue is empty because
228 combining record can be in process.
229 To check emptiness use \ref empty function.
233 return m_Queue.size();
236 /// Checks if the queue is empty
238 If the combining is in process the function waits while it is done.
242 m_FlatCombining.wait_while_combining();
243 return m_Queue.empty();
246 /// Internal statistics
247 stat const& statistics() const
249 return m_FlatCombining.statistics();
252 public: // flat combining cooperation, not for direct use!
254 /// Flat combining supporting function. Do not call it directly!
256 The function is called by \ref cds::algo::flat_combining::kernel "flat combining kernel"
257 object if the current thread becomes a combiner. Invocation of the function means that
258 the queue should perform an action recorded in \p pRec.
260 void fc_apply( fc_record * pRec )
264 switch ( pRec->op() ) {
266 assert( pRec->pVal );
267 m_Queue.push_back( *(pRec->pVal ) );
270 pRec->bEmpty = m_Queue.empty();
271 if ( !pRec->bEmpty ) {
272 pRec->pVal = &m_Queue.front();
279 case op_clear_and_dispose:
280 m_Queue.clear_and_dispose( disposer() );
288 /// Batch-processing flat combining
289 void fc_process( typename fc_kernel::iterator itBegin, typename fc_kernel::iterator itEnd )
291 typedef typename fc_kernel::iterator fc_iterator;
292 for ( fc_iterator it = itBegin, itPrev = itEnd; it != itEnd; ++it ) {
293 switch ( it->op() ) {
296 if ( m_Queue.empty() ) {
297 if ( itPrev != itEnd && collide( *itPrev, *it ))
310 bool collide( fc_record& rec1, fc_record& rec2 )
312 assert( m_Queue.empty() );
314 switch ( rec1.op() ) {
316 if ( rec2.op() == op_deq ) {
318 rec2.pVal = rec1.pVal;
320 m_FlatCombining.operation_done( rec1 );
321 m_FlatCombining.operation_done( rec2 );
322 m_FlatCombining.internal_statistics().onCollide();
327 if ( rec2.op() == op_enq ) {
329 rec1.pVal = rec2.pVal;
331 m_FlatCombining.operation_done( rec1 );
332 m_FlatCombining.operation_done( rec2 );
333 m_FlatCombining.internal_statistics().onCollide();
343 }} // namespace cds::intrusive
345 #endif // #ifndef __CDS_INTRUSIVE_FCQUEUE_H