2 This file is a part of libcds - Concurrent Data Structures library
4 (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
6 Source code repo: http://github.com/khizmax/libcds/
7 Download: http://sourceforge.net/projects/libcds/files/
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
12 * Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #ifndef CDSLIB_CONTAINER_FCQUEUE_H
32 #define CDSLIB_CONTAINER_FCQUEUE_H
34 #include <cds/algo/flat_combining.h>
35 #include <cds/algo/elimination_opt.h>
38 namespace cds { namespace container {
40 /// FCQueue related definitions
41 /** @ingroup cds_nonintrusive_helper
45 /// FCQueue internal statistics
46 template <typename Counter = cds::atomicity::event_counter >
47 struct stat: public cds::algo::flat_combining::stat<Counter>
49 typedef cds::algo::flat_combining::stat<Counter> flat_combining_stat; ///< Flat-combining statistics
50 typedef typename flat_combining_stat::counter_type counter_type; ///< Counter type
52 counter_type m_nEnqueue ; ///< Count of enqueue operations
53 counter_type m_nEnqMove ; ///< Count of enqueue operations with move semantics
54 counter_type m_nDequeue ; ///< Count of success dequeue operations
55 counter_type m_nFailedDeq ; ///< Count of failed dequeue operations (pop from empty queue)
56 counter_type m_nCollided ; ///< How many pairs of enqueue/dequeue were collided, if elimination is enabled
59 void onEnqueue() { ++m_nEnqueue; }
60 void onEnqMove() { ++m_nEnqMove; }
61 void onDequeue( bool bFailed ) { if ( bFailed ) ++m_nFailedDeq; else ++m_nDequeue; }
62 void onCollide() { ++m_nCollided; }
66 /// FCQueue dummy statistics, no overhead
67 struct empty_stat: public cds::algo::flat_combining::empty_stat
72 void onDequeue(bool) {}
77 /// FCQueue type traits
78 struct traits: public cds::algo::flat_combining::traits
80 typedef empty_stat stat; ///< Internal statistics
81 static CDS_CONSTEXPR const bool enable_elimination = false; ///< Enable \ref cds_elimination_description "elimination"
84 /// Metafunction converting option list to traits
87 - \p opt::lock_type - mutex type, default is \p cds::sync::spin
88 - \p opt::back_off - back-off strategy, default is \p cds::backoff::delay_of<2>
89 - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR
90 - \p opt::stat - internal statistics, possible type: \p fcqueue::stat, \p fcqueue::empty_stat (the default)
91 - \p opt::memory_model - C++ memory ordering model.
92 List of all available memory ordering see \p opt::memory_model.
93 Default is \p cds::opt::v::relaxed_ordering
94 - \p opt::enable_elimination - enable/disable operation \ref cds_elimination_description "elimination"
95 By default, the elimination is disabled. For queue, the elimination is possible if the queue
98 template <typename... Options>
100 # ifdef CDS_DOXYGEN_INVOKED
101 typedef implementation_defined type ; ///< Metafunction result
103 typedef typename cds::opt::make_options<
104 typename cds::opt::find_type_traits< traits, Options... >::type
110 } // namespace fcqueue
112 /// Flat-combining queue
114 @ingroup cds_nonintrusive_queue
115 @ingroup cds_flat_combining_container
117 \ref cds_flat_combining_description "Flat combining" sequential queue.
118 The class can be considered as a concurrent FC-based wrapper for \p std::queue.
121 - \p T - a value type stored in the queue
122 - \p Queue - sequential queue implementation, default is \p std::queue<T>
123 - \p Traits - type traits of flat combining, default is \p fcqueue::traits.
124 \p fcqueue::make_traits metafunction can be used to construct \p %fcqueue::traits specialization.
126 template <typename T,
127 class Queue = std::queue<T>,
128 typename Traits = fcqueue::traits
131 #ifndef CDS_DOXYGEN_INVOKED
132 : public cds::algo::flat_combining::container
136 typedef T value_type; ///< Value type
137 typedef Queue queue_type; ///< Sequential queue class
138 typedef Traits traits; ///< Queue type traits
140 typedef typename traits::stat stat; ///< Internal statistics type
141 static CDS_CONSTEXPR const bool c_bEliminationEnabled = traits::enable_elimination; ///< \p true if elimination is enabled
145 /// Queue operation IDs
147 op_enq = cds::algo::flat_combining::req_Operation, ///< Enqueue
148 op_enq_move, ///< Enqueue (move semantics)
154 /// Flat combining publication list record
155 struct fc_record: public cds::algo::flat_combining::publication_record
158 value_type const * pValEnq; ///< Value to enqueue
159 value_type * pValDeq; ///< Dequeue destination
161 bool bEmpty; ///< \p true if the queue is empty
165 /// Flat combining kernel
166 typedef cds::algo::flat_combining::kernel< fc_record, traits > fc_kernel;
170 fc_kernel m_FlatCombining;
175 /// Initializes empty queue object
179 /// Initializes empty queue object and gives flat combining parameters
181 unsigned int nCompactFactor ///< Flat combining: publication list compacting factor
182 ,unsigned int nCombinePassCount ///< Flat combining: number of combining passes for combiner thread
184 : m_FlatCombining( nCompactFactor, nCombinePassCount )
187 /// Inserts a new element at the end of the queue
189 The content of the new element initialized to a copy of \p val.
191 The function always returns \p true
193 bool enqueue( value_type const& val )
195 fc_record * pRec = m_FlatCombining.acquire_record();
196 pRec->pValEnq = &val;
198 if ( c_bEliminationEnabled )
199 m_FlatCombining.batch_combine( op_enq, pRec, *this );
201 m_FlatCombining.combine( op_enq, pRec, *this );
203 assert( pRec->is_done() );
204 m_FlatCombining.release_record( pRec );
205 m_FlatCombining.internal_statistics().onEnqueue();
209 /// Inserts a new element at the end of the queue (a synonym for \ref enqueue)
210 bool push( value_type const& val )
212 return enqueue( val );
215 /// Inserts a new element at the end of the queue (move semantics)
217 \p val is moved to inserted element
219 bool enqueue( value_type&& val )
221 fc_record * pRec = m_FlatCombining.acquire_record();
222 pRec->pValEnq = &val;
224 if ( c_bEliminationEnabled )
225 m_FlatCombining.batch_combine( op_enq_move, pRec, *this );
227 m_FlatCombining.combine( op_enq_move, pRec, *this );
229 assert( pRec->is_done() );
230 m_FlatCombining.release_record( pRec );
232 m_FlatCombining.internal_statistics().onEnqMove();
236 /// Inserts a new element at the end of the queue (move semantics, synonym for \p enqueue)
237 bool push( value_type&& val )
239 return enqueue( val );
242 /// Removes the next element from the queue
244 \p val takes a copy of the element
246 bool dequeue( value_type& val )
248 fc_record * pRec = m_FlatCombining.acquire_record();
249 pRec->pValDeq = &val;
251 if ( c_bEliminationEnabled )
252 m_FlatCombining.batch_combine( op_deq, pRec, *this );
254 m_FlatCombining.combine( op_deq, pRec, *this );
256 assert( pRec->is_done() );
257 m_FlatCombining.release_record( pRec );
259 m_FlatCombining.internal_statistics().onDequeue( pRec->bEmpty );
260 return !pRec->bEmpty;
263 /// Removes the next element from the queue (a synonym for \ref dequeue)
264 bool pop( value_type& val )
266 return dequeue( val );
272 fc_record * pRec = m_FlatCombining.acquire_record();
274 if ( c_bEliminationEnabled )
275 m_FlatCombining.batch_combine( op_clear, pRec, *this );
277 m_FlatCombining.combine( op_clear, pRec, *this );
279 assert( pRec->is_done() );
280 m_FlatCombining.release_record( pRec );
283 /// Returns the number of elements in the queue.
285 Note that <tt>size() == 0</tt> does not mean that the queue is empty because
286 a combining record can still be in progress.
287 To check emptiness use \ref empty function.
291 return m_Queue.size();
294 /// Checks if the queue is empty
296 If combining is in progress, the function waits until the combining is done.
300 fc_record * pRec = m_FlatCombining.acquire_record();
302 if ( c_bEliminationEnabled )
303 m_FlatCombining.batch_combine( op_empty, pRec, *this );
305 m_FlatCombining.combine( op_empty, pRec, *this );
307 assert( pRec->is_done() );
308 m_FlatCombining.release_record( pRec );
312 /// Internal statistics
313 stat const& statistics() const
315 return m_FlatCombining.statistics();
318 public: // flat combining cooperation, not for direct use!
320 /// Flat combining supporting function. Do not call it directly!
322 The function is called by \ref cds::algo::flat_combining::kernel "flat combining kernel"
323 object if the current thread becomes a combiner. Invocation of the function means that
324 the queue should perform an action recorded in \p pRec.
326 void fc_apply( fc_record * pRec )
330 // this function is called under FC mutex, so switch TSan off
331 CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
333 switch ( pRec->op() ) {
335 assert( pRec->pValEnq );
336 m_Queue.push( *(pRec->pValEnq ) );
339 assert( pRec->pValEnq );
340 m_Queue.push( std::move( *(pRec->pValEnq )) );
343 assert( pRec->pValDeq );
344 pRec->bEmpty = m_Queue.empty();
345 if ( !pRec->bEmpty ) {
346 *(pRec->pValDeq) = m_Queue.front();
351 while ( !m_Queue.empty() )
355 pRec->bEmpty = m_Queue.empty();
361 CDS_TSAN_ANNOTATE_IGNORE_RW_END;
364 /// Batch-processing flat combining
365 void fc_process( typename fc_kernel::iterator itBegin, typename fc_kernel::iterator itEnd )
367 typedef typename fc_kernel::iterator fc_iterator;
369 // this function is called under FC mutex, so switch TSan off
370 CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
372 for ( fc_iterator it = itBegin, itPrev = itEnd; it != itEnd; ++it ) {
373 switch ( it->op() ) {
377 if ( m_Queue.empty() ) {
378 if ( itPrev != itEnd && collide( *itPrev, *it ))
386 CDS_TSAN_ANNOTATE_IGNORE_RW_END;
392 bool collide( fc_record& rec1, fc_record& rec2 )
394 switch ( rec1.op() ) {
396 if ( rec2.op() == op_deq ) {
397 assert(rec1.pValEnq);
398 assert(rec2.pValDeq);
399 *rec2.pValDeq = *rec1.pValEnq;
405 if ( rec2.op() == op_deq ) {
406 assert(rec1.pValEnq);
407 assert(rec2.pValDeq);
408 *rec2.pValDeq = std::move( *rec1.pValEnq );
414 switch ( rec2.op() ) {
417 return collide( rec2, rec1 );
423 m_FlatCombining.operation_done( rec1 );
424 m_FlatCombining.operation_done( rec2 );
425 m_FlatCombining.internal_statistics().onCollide();
431 }} // namespace cds::container
433 #endif // #ifndef CDSLIB_CONTAINER_FCQUEUE_H