2 This file is a part of libcds - Concurrent Data Structures library
4 (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
6 Source code repo: http://github.com/khizmax/libcds/
7 Download: http://sourceforge.net/projects/libcds/files/
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
12 * Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #ifndef CDSLIB_CONTAINER_FCQUEUE_H
32 #define CDSLIB_CONTAINER_FCQUEUE_H
34 #include <cds/algo/flat_combining.h>
35 #include <cds/algo/elimination_opt.h>
38 namespace cds { namespace container {
40 /// FCQueue related definitions
41 /** @ingroup cds_nonintrusive_helper
45 /// FCQueue internal statistics
46 template <typename Counter = cds::atomicity::event_counter >
47 struct stat: public cds::algo::flat_combining::stat<Counter>
49 typedef cds::algo::flat_combining::stat<Counter> flat_combining_stat; ///< Flat-combining statistics
50 typedef typename flat_combining_stat::counter_type counter_type; ///< Counter type
52 counter_type m_nEnqueue ; ///< Count of enqueue operations
53 counter_type m_nEnqMove ; ///< Count of enqueue operations with move semantics
54 counter_type m_nDequeue ; ///< Count of success dequeue operations
55 counter_type m_nFailedDeq ; ///< Count of failed dequeue operations (pop from empty queue)
56 counter_type m_nCollided ; ///< How many pairs of enqueue/dequeue were collided, if elimination is enabled
59 void onEnqueue() { ++m_nEnqueue; }
60 void onEnqMove() { ++m_nEnqMove; }
61 void onDequeue( bool bFailed ) { if ( bFailed ) ++m_nFailedDeq; else ++m_nDequeue; }
62 void onCollide() { ++m_nCollided; }
66 /// FCQueue dummy statistics, no overhead
67 struct empty_stat: public cds::algo::flat_combining::empty_stat
72 void onDequeue(bool) {}
77 /// FCQueue type traits
78 struct traits: public cds::algo::flat_combining::traits
80 typedef empty_stat stat; ///< Internal statistics
81 static CDS_CONSTEXPR const bool enable_elimination = false; ///< Enable \ref cds_elimination_description "elimination"
84 /// Metafunction converting option list to traits
87 - any \p cds::algo::flat_combining::make_traits options
88 - \p opt::stat - internal statistics, possible type: \p fcqueue::stat, \p fcqueue::empty_stat (the default)
89 - \p opt::enable_elimination - enable/disable operation \ref cds_elimination_description "elimination"
90 By default, the elimination is disabled. For queue, the elimination is possible if the queue
93 template <typename... Options>
95 # ifdef CDS_DOXYGEN_INVOKED
96 typedef implementation_defined type ; ///< Metafunction result
98 typedef typename cds::opt::make_options<
99 typename cds::opt::find_type_traits< traits, Options... >::type
105 } // namespace fcqueue
107 /// Flat-combining queue
109 @ingroup cds_nonintrusive_queue
110 @ingroup cds_flat_combining_container
112 \ref cds_flat_combining_description "Flat combining" sequential queue.
113 The class can be considered as a concurrent FC-based wrapper for \p std::queue.
116 - \p T - a value type stored in the queue
117 - \p Queue - sequential queue implementation, default is \p std::queue<T>
118 - \p Trats - type traits of flat combining, default is \p fcqueue::traits.
119 \p fcqueue::make_traits metafunction can be used to construct \p %fcqueue::traits specialization.
121 template <typename T,
122 class Queue = std::queue<T>,
123 typename Traits = fcqueue::traits
126 #ifndef CDS_DOXYGEN_INVOKED
127 : public cds::algo::flat_combining::container
131 typedef T value_type; ///< Value type
132 typedef Queue queue_type; ///< Sequential queue class
133 typedef Traits traits; ///< Queue type traits
135 typedef typename traits::stat stat; ///< Internal statistics type
136 static CDS_CONSTEXPR const bool c_bEliminationEnabled = traits::enable_elimination; ///< \p true if elimination is enabled
140 /// Queue operation IDs
142 op_enq = cds::algo::flat_combining::req_Operation, ///< Enqueue
143 op_enq_move, ///< Enqueue (move semantics)
148 /// Flat combining publication list record
149 struct fc_record: public cds::algo::flat_combining::publication_record
152 value_type const * pValEnq; ///< Value to enqueue
153 value_type * pValDeq; ///< Dequeue destination
155 bool bEmpty; ///< \p true if the queue is empty
159 /// Flat combining kernel
160 typedef cds::algo::flat_combining::kernel< fc_record, traits > fc_kernel;
164 mutable fc_kernel m_FlatCombining;
169 /// Initializes empty queue object
173 /// Initializes empty queue object and gives flat combining parameters
175 unsigned int nCompactFactor ///< Flat combining: publication list compacting factor
176 ,unsigned int nCombinePassCount ///< Flat combining: number of combining passes for combiner thread
178 : m_FlatCombining( nCompactFactor, nCombinePassCount )
181 /// Inserts a new element at the end of the queue
183 The content of the new element initialized to a copy of \p val.
185 The function always returns \p true
187 bool enqueue( value_type const& val )
189 auto pRec = m_FlatCombining.acquire_record();
190 pRec->pValEnq = &val;
192 if ( c_bEliminationEnabled )
193 m_FlatCombining.batch_combine( op_enq, pRec, *this );
195 m_FlatCombining.combine( op_enq, pRec, *this );
197 assert( pRec->is_done());
198 m_FlatCombining.release_record( pRec );
199 m_FlatCombining.internal_statistics().onEnqueue();
203 /// Inserts a new element at the end of the queue (a synonym for \ref enqueue)
204 bool push( value_type const& val )
206 return enqueue( val );
209 /// Inserts a new element at the end of the queue (move semantics)
211 \p val is moved to inserted element
213 bool enqueue( value_type&& val )
215 auto pRec = m_FlatCombining.acquire_record();
216 pRec->pValEnq = &val;
218 if ( c_bEliminationEnabled )
219 m_FlatCombining.batch_combine( op_enq_move, pRec, *this );
221 m_FlatCombining.combine( op_enq_move, pRec, *this );
223 assert( pRec->is_done());
224 m_FlatCombining.release_record( pRec );
226 m_FlatCombining.internal_statistics().onEnqMove();
230 /// Inserts a new element at the end of the queue (move semantics, synonym for \p enqueue)
231 bool push( value_type&& val )
233 return enqueue( val );
236 /// Removes the next element from the queue
238 \p val takes a copy of the element
240 bool dequeue( value_type& val )
242 auto pRec = m_FlatCombining.acquire_record();
243 pRec->pValDeq = &val;
245 if ( c_bEliminationEnabled )
246 m_FlatCombining.batch_combine( op_deq, pRec, *this );
248 m_FlatCombining.combine( op_deq, pRec, *this );
250 assert( pRec->is_done());
251 m_FlatCombining.release_record( pRec );
253 m_FlatCombining.internal_statistics().onDequeue( pRec->bEmpty );
254 return !pRec->bEmpty;
257 /// Removes the next element from the queue (a synonym for \ref dequeue)
258 bool pop( value_type& val )
260 return dequeue( val );
266 auto pRec = m_FlatCombining.acquire_record();
268 if ( c_bEliminationEnabled )
269 m_FlatCombining.batch_combine( op_clear, pRec, *this );
271 m_FlatCombining.combine( op_clear, pRec, *this );
273 assert( pRec->is_done());
274 m_FlatCombining.release_record( pRec );
277 /// Returns the number of elements in the queue.
279 Note that <tt>size() == 0</tt> is not mean that the queue is empty because
280 combining record can be in process.
281 To check emptiness use \ref empty function.
285 return m_Queue.size();
288 /// Checks if the queue is empty
290 If the combining is in process the function waits while combining done.
295 auto const& queue = m_Queue;
296 m_FlatCombining.invoke_exclusive( [&queue, &bRet]() { bRet = queue.empty(); } );
300 /// Internal statistics
301 stat const& statistics() const
303 return m_FlatCombining.statistics();
306 public: // flat combining cooperation, not for direct use!
308 /// Flat combining supporting function. Do not call it directly!
310 The function is called by \ref cds::algo::flat_combining::kernel "flat combining kernel"
311 object if the current thread becomes a combiner. Invocation of the function means that
312 the queue should perform an action recorded in \p pRec.
314 void fc_apply( fc_record * pRec )
318 // this function is called under FC mutex, so switch TSan off
319 CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
321 switch ( pRec->op()) {
323 assert( pRec->pValEnq );
324 m_Queue.push( *(pRec->pValEnq ));
327 assert( pRec->pValEnq );
328 m_Queue.push( std::move( *(pRec->pValEnq )));
331 assert( pRec->pValDeq );
332 pRec->bEmpty = m_Queue.empty();
333 if ( !pRec->bEmpty ) {
334 *(pRec->pValDeq) = std::move( m_Queue.front());
339 while ( !m_Queue.empty())
346 CDS_TSAN_ANNOTATE_IGNORE_RW_END;
349 /// Batch-processing flat combining
350 void fc_process( typename fc_kernel::iterator itBegin, typename fc_kernel::iterator itEnd )
352 typedef typename fc_kernel::iterator fc_iterator;
354 // this function is called under FC mutex, so switch TSan off
355 CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
357 for ( fc_iterator it = itBegin, itPrev = itEnd; it != itEnd; ++it ) {
362 if ( m_Queue.empty()) {
363 if ( itPrev != itEnd && collide( *itPrev, *it ))
371 CDS_TSAN_ANNOTATE_IGNORE_RW_END;
377 bool collide( fc_record& rec1, fc_record& rec2 )
379 switch ( rec1.op()) {
381 if ( rec2.op() == op_deq ) {
382 assert(rec1.pValEnq);
383 assert(rec2.pValDeq);
384 *rec2.pValDeq = *rec1.pValEnq;
390 if ( rec2.op() == op_deq ) {
391 assert(rec1.pValEnq);
392 assert(rec2.pValDeq);
393 *rec2.pValDeq = std::move( *rec1.pValEnq );
399 switch ( rec2.op()) {
402 return collide( rec2, rec1 );
408 m_FlatCombining.operation_done( rec1 );
409 m_FlatCombining.operation_done( rec2 );
410 m_FlatCombining.internal_statistics().onCollide();
416 }} // namespace cds::container
418 #endif // #ifndef CDSLIB_CONTAINER_FCQUEUE_H