2 This file is a part of libcds - Concurrent Data Structures library
4 (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
6 Source code repo: http://github.com/khizmax/libcds/
7 Download: http://sourceforge.net/projects/libcds/files/
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
12 * Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #ifndef CDSLIB_INTRUSIVE_FCQUEUE_H
32 #define CDSLIB_INTRUSIVE_FCQUEUE_H
34 #include <cds/algo/flat_combining.h>
35 #include <cds/algo/elimination_opt.h>
36 #include <cds/intrusive/options.h>
37 #include <boost/intrusive/list.hpp>
39 namespace cds { namespace intrusive {
41 /// \p FCQueue related definitions
44 /// \p FCQueue internal statistics
45 template <typename Counter = cds::atomicity::event_counter >
46 struct stat: public cds::algo::flat_combining::stat<Counter>
48 typedef cds::algo::flat_combining::stat<Counter> flat_combining_stat; ///< Flat-combining statistics
49 typedef typename flat_combining_stat::counter_type counter_type; ///< Counter type
51 counter_type m_nEnqueue ; ///< Count of push operations
52 counter_type m_nDequeue ; ///< Count of success pop operations
53 counter_type m_nFailedDeq ; ///< Count of failed pop operations (pop from empty queue)
54 counter_type m_nCollided ; ///< How many pairs of push/pop were collided, if elimination is enabled
57 void onEnqueue() { ++m_nEnqueue; }
58 void onDequeue( bool bFailed ) { if ( bFailed ) ++m_nFailedDeq; else ++m_nDequeue; }
59 void onCollide() { ++m_nCollided; }
63 /// FCQueue dummy statistics, no overhead
64 struct empty_stat: public cds::algo::flat_combining::empty_stat
68 void onDequeue(bool) {}
73 /// \p FCQueue type traits
74 struct traits: public cds::algo::flat_combining::traits
76 typedef cds::intrusive::opt::v::empty_disposer disposer ; ///< Disposer to erase removed elements. Used only in \p FCQueue::clear() function
77 typedef empty_stat stat; ///< Internal statistics
78 static CDS_CONSTEXPR const bool enable_elimination = false; ///< Enable \ref cds_elimination_description "elimination"
81 /// Metafunction converting option list to traits
84 - any \p cds::algo::flat_combining::make_traits options
85 - \p opt::disposer - the functor used to dispose removed items. Default is \p opt::intrusive::v::empty_disposer.
86 This option is used only in \p FCQueue::clear() function.
87 - \p opt::stat - internal statistics, possible type: \p fcqueue::stat, \p fcqueue::empty_stat (the default)
88 - \p opt::enable_elimination - enable/disable operation \ref cds_elimination_description "elimination"
89 By default, the elimination is disabled (\p false)
91 template <typename... Options>
93 # ifdef CDS_DOXYGEN_INVOKED
94 typedef implementation_defined type ; ///< Metafunction result
96 typedef typename cds::opt::make_options<
97 typename cds::opt::find_type_traits< traits, Options... >::type
102 } // namespace fcqueue
104 /// Flat-combining intrusive queue
106 @ingroup cds_intrusive_queue
107 @ingroup cds_flat_combining_intrusive
109 \ref cds_flat_combining_description "Flat combining" sequential intrusive queue.
112 - \p T - a value type stored in the queue
113 - \p Container - sequential intrusive container with \p push_back and \p pop_front functions.
114 Default is \p boost::intrusive::list
115 - \p Traits - type traits of flat combining, default is \p fcqueue::traits.
116 \p fcqueue::make_traits metafunction can be used to construct \p %fcqueue::traits specialization
119 ,class Container = boost::intrusive::list<T>
120 ,typename Traits = fcqueue::traits
123 #ifndef CDS_DOXYGEN_INVOKED
124 : public cds::algo::flat_combining::container
128 typedef T value_type; ///< Value type
129 typedef Container container_type; ///< Sequential container type
130 typedef Traits traits; ///< Queue traits
132 typedef typename traits::disposer disposer; ///< The disposer functor. The disposer is used only in \ref clear() function
133 typedef typename traits::stat stat; ///< Internal statistics type
134 static CDS_CONSTEXPR const bool c_bEliminationEnabled = traits::enable_elimination; ///< \p true if elimination is enabled
138 /// Queue operation IDs
140 op_enq = cds::algo::flat_combining::req_Operation, ///< Enqueue
143 op_clear_and_dispose ///< Clear and dispose
146 /// Flat combining publication list record
147 struct fc_record: public cds::algo::flat_combining::publication_record
149 value_type * pVal; ///< Value to enqueue or dequeue
150 bool bEmpty; ///< \p true if the queue is empty
154 /// Flat combining kernel
155 typedef cds::algo::flat_combining::kernel< fc_record, traits > fc_kernel;
159 mutable fc_kernel m_FlatCombining;
160 container_type m_Queue;
164 /// Initializes empty queue object
168 /// Initializes empty queue object and gives flat combining parameters
170 unsigned int nCompactFactor ///< Flat combining: publication list compacting factor
171 ,unsigned int nCombinePassCount ///< Flat combining: number of combining passes for combiner thread
173 : m_FlatCombining( nCompactFactor, nCombinePassCount )
176 /// Inserts a new element at the end of the queue
178 The function always returns \p true.
180 bool enqueue( value_type& val )
182 auto pRec = m_FlatCombining.acquire_record();
185 if ( c_bEliminationEnabled )
186 m_FlatCombining.batch_combine( op_enq, pRec, *this );
188 m_FlatCombining.combine( op_enq, pRec, *this );
190 assert( pRec->is_done() );
191 m_FlatCombining.release_record( pRec );
192 m_FlatCombining.internal_statistics().onEnqueue();
196 /// Inserts a new element at the end of the queue (a synonym for \ref enqueue)
197 bool push( value_type& val )
199 return enqueue( val );
202 /// Removes the next element from the queue
204 If the queue is empty the function returns \p nullptr
206 value_type * dequeue()
208 auto pRec = m_FlatCombining.acquire_record();
209 pRec->pVal = nullptr;
211 if ( c_bEliminationEnabled )
212 m_FlatCombining.batch_combine( op_deq, pRec, *this );
214 m_FlatCombining.combine( op_deq, pRec, *this );
216 assert( pRec->is_done() );
217 m_FlatCombining.release_record( pRec );
219 m_FlatCombining.internal_statistics().onDequeue( pRec->bEmpty );
223 /// Removes the next element from the queue (a synonym for \ref dequeue)
231 If \p bDispose is \p true, the disposer provided in \p Traits class' template parameter
232 will be called for each removed element.
234 void clear( bool bDispose = false )
236 auto pRec = m_FlatCombining.acquire_record();
238 if ( c_bEliminationEnabled )
239 m_FlatCombining.batch_combine( bDispose ? op_clear_and_dispose : op_clear, pRec, *this );
241 m_FlatCombining.combine( bDispose ? op_clear_and_dispose : op_clear, pRec, *this );
243 assert( pRec->is_done() );
244 m_FlatCombining.release_record( pRec );
247 /// Returns the number of elements in the queue.
249 Note that <tt>size() == 0</tt> is not mean that the queue is empty because
250 combining record can be in process.
251 To check emptiness use \ref empty function.
255 return m_Queue.size();
258 /// Checks if the queue is empty
260 If the combining is in process the function waits while it is done.
265 auto const& queue = m_Queue;
266 m_FlatCombining.invoke_exclusive([&queue, &bRet]() { bRet = queue.empty(); });
270 /// Internal statistics
271 stat const& statistics() const
273 return m_FlatCombining.statistics();
276 public: // flat combining cooperation, not for direct use!
278 /// Flat combining supporting function. Do not call it directly!
280 The function is called by \ref cds::algo::flat_combining::kernel "flat combining kernel"
281 object if the current thread becomes a combiner. Invocation of the function means that
282 the queue should perform an action recorded in \p pRec.
284 void fc_apply( fc_record * pRec )
288 // this function is called under FC mutex, so switch TSan off
289 // All TSan warnings are false positive
290 CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
292 switch ( pRec->op() ) {
294 assert( pRec->pVal );
295 m_Queue.push_back( *(pRec->pVal ) );
298 pRec->bEmpty = m_Queue.empty();
299 if ( !pRec->bEmpty ) {
300 pRec->pVal = &m_Queue.front();
307 case op_clear_and_dispose:
308 m_Queue.clear_and_dispose( disposer() );
314 CDS_TSAN_ANNOTATE_IGNORE_RW_END;
317 /// Batch-processing flat combining
318 void fc_process( typename fc_kernel::iterator itBegin, typename fc_kernel::iterator itEnd )
320 // this function is called under FC mutex, so switch TSan off
321 // All TSan warnings are false positive
322 CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
324 typedef typename fc_kernel::iterator fc_iterator;
325 for ( fc_iterator it = itBegin, itPrev = itEnd; it != itEnd; ++it ) {
326 switch ( it->op() ) {
329 if ( m_Queue.empty() ) {
330 if ( itPrev != itEnd && collide( *itPrev, *it ))
338 CDS_TSAN_ANNOTATE_IGNORE_RW_END;
344 bool collide( fc_record& rec1, fc_record& rec2 )
346 assert( m_Queue.empty() );
348 switch ( rec1.op() ) {
350 if ( rec2.op() == op_deq ) {
352 rec2.pVal = rec1.pVal;
354 m_FlatCombining.operation_done( rec1 );
355 m_FlatCombining.operation_done( rec2 );
356 m_FlatCombining.internal_statistics().onCollide();
361 if ( rec2.op() == op_enq ) {
363 rec1.pVal = rec2.pVal;
365 m_FlatCombining.operation_done( rec1 );
366 m_FlatCombining.operation_done( rec2 );
367 m_FlatCombining.internal_statistics().onCollide();
377 }} // namespace cds::intrusive
379 #endif // #ifndef CDSLIB_INTRUSIVE_FCQUEUE_H