3 #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
4 #define CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
6 #include <cds/container/details/base.h>
7 #include <cds/opt/buffer.h>
8 #include <cds/opt/value_cleaner.h>
9 #include <cds/algo/atomic.h>
10 #include <cds/details/bounded_container.h>
12 namespace cds { namespace container {
14 /// VyukovMPMCCycleQueue related definitions
15 /** @ingroup cds_nonintrusive_helper
17 namespace vyukov_queue {
19 /// VyukovMPMCCycleQueue default traits
21 /// Buffer type for internal array
23 The type of element for the buffer is not important: the queue rebinds
24 buffer for required type via \p rebind metafunction.
26 For \p VyukovMPMCCycleQueue queue the buffer size should have power-of-2 size.
28 typedef cds::opt::v::dynamic_buffer< void * > buffer;
30 /// A functor to clean item dequeued.
32 The functor calls the destructor for queue item.
33 After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied.
34 If \p T is a complex type, \p value_cleaner may be the useful feature.
36 Default value is \ref opt::v::destruct_cleaner
38 typedef cds::opt::v::destruct_cleaner value_cleaner;
40 /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
41 typedef cds::atomicity::empty_item_counter item_counter;
43 /// C++ memory ordering model
45 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
46 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
48 typedef opt::v::relaxed_ordering memory_model;
50 /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
51 enum { padding = opt::cache_line_padding };
54 typedef cds::backoff::Default back_off;
56 /// Single-consumer version
58 For single-consumer version of algorithm some additional functions
59 (\p front(), \p pop_front()) is available.
63 static CDS_CONSTEXPR bool const single_consumer = false;
66 /// Metafunction converting option list to \p vyukov_queue::traits
68 Supported \p Options are:
69 - \p opt::buffer - the buffer type for internal cyclic array. Possible types are:
70 \p opt::v::dynamic_buffer (the default), \p opt::v::static_buffer. The type of
71 element in the buffer is not important: it will be changed via \p rebind metafunction.
72 - \p opt::value_cleaner - a functor to clean item dequeued.
73 The functor calls the destructor for queue item.
74 After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied.
75 If \p T is a complex type, \p value_cleaner can be an useful feature.
76 Default value is \ref opt::v::destruct_cleaner
77 - \p opt::back_off - back-off strategy used. If the option is not specified, the \p cds::backoff::Default is used.
78 - \p opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
79 To enable item counting use \p cds::atomicity::item_counter
80 - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
81 - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
82 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
84 Example: declare \p %VyukovMPMCCycleQueue with item counting and static iternal buffer of size 1024:
86 typedef cds::container::VyukovMPMCCycleQueue< Foo,
87 typename cds::container::vyukov_queue::make_traits<
88 cds::opt::buffer< cds::opt::v::static_buffer< void *, 1024 >,
89 cds::opt::item_counte< cds::atomicity::item_counter >
94 template <typename... Options>
96 # ifdef CDS_DOXYGEN_INVOKED
97 typedef implementation_defined type; ///< Metafunction result
99 typedef typename cds::opt::make_options<
100 typename cds::opt::find_type_traits< traits, Options... >::type
106 } //namespace vyukov_queue
108 /// Vyukov's MPMC bounded queue
109 /** @ingroup cds_nonintrusive_queue
110 This algorithm is developed by Dmitry Vyukov (see http://www.1024cores.net)
111 It's multi-producer multi-consumer (MPMC), array-based, fails on overflow, does not require GC, w/o priorities, causal FIFO,
112 blocking producers and consumers queue. The algorithm is pretty simple and fast. It's not lock-free in the official meaning,
113 just implemented by means of atomic RMW operations w/o mutexes.
115 The cost of enqueue/dequeue is 1 CAS per operation.
116 No dynamic memory allocation/management during operation. Producers and consumers are separated from each other (as in the two-lock queue),
117 i.e. do not touch the same data while queue is not empty.
119 There is multiple producer/single consumer version \p cds::container::VyukovMPSCCycleQueue
120 that supports \p front() and \p pop_front() functions.
123 - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
126 - \p T - type stored in queue.
127 - \p Traits - queue traits, default is \p vykov_queue::traits. You can use \p vykov_queue::make_traits
128 metafunction to make your traits or just derive your traits from \p %vykov_queue::traits:
130 struct myTraits: public cds::container::vykov_queue::traits {
131 typedef cds::atomicity::item_counter item_counter;
133 typedef cds::container::VyukovMPMCCycleQueue< Foo, myTraits > myQueue;
135 // Equivalent make_traits example:
136 typedef cds::container::VyukovMPMCCycleQueue< cds::gc::HP, Foo,
137 typename cds::container::vykov_queue::make_traits<
138 cds::opt::item_counter< cds::atomicity::item_counter >
144 Simplified BSD license by Dmitry Vyukov (http://www.1024cores.net/site/1024cores/home/code-license)
146 template <typename T, typename Traits = vyukov_queue::traits >
147 class VyukovMPMCCycleQueue : public cds::bounded_container
150 typedef T value_type; ///< Value type to be stored in the queue
151 typedef Traits traits; ///< Queue traits
152 typedef typename traits::item_counter item_counter; ///< Item counter type
153 typedef typename traits::memory_model memory_model; ///< Memory ordering. See cds::opt::memory_model option
154 typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p vyukov_queue::traits::value_cleaner
155 typedef typename traits::back_off back_off; ///< back-off strategy
157 /// \p true for single-consumer version, \p false otherwise
158 static CDS_CONSTEXPR bool const c_single_consumer = traits::single_consumer;
160 /// Rebind template arguments
161 template <typename T2, typename Traits2>
163 typedef VyukovMPMCCycleQueue< T2, Traits2 > other ; ///< Rebinding result
168 typedef atomics::atomic<size_t> sequence_type;
171 sequence_type sequence;
178 typedef typename traits::buffer::template rebind<cell_type>::other buffer;
184 size_t const m_nBufferMask;
185 typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad1_;
186 sequence_type m_posEnqueue;
187 typename opt::details::apply_padding< sequence_type, traits::padding >::padding_type pad2_;
188 sequence_type m_posDequeue;
189 typename opt::details::apply_padding< sequence_type, traits::padding >::padding_type pad3_;
190 item_counter m_ItemCounter;
194 /// Constructs the queue of capacity \p nCapacity
196 For \p cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
198 The buffer capacity must be the power of two.
200 VyukovMPMCCycleQueue(
203 : m_buffer( nCapacity )
204 , m_nBufferMask( m_buffer.capacity() - 1 )
206 nCapacity = m_buffer.capacity();
208 // Buffer capacity must be power of 2
209 assert( nCapacity >= 2 && (nCapacity & (nCapacity - 1)) == 0 );
211 for (size_t i = 0; i != nCapacity; ++i )
212 m_buffer[i].sequence.store(i, memory_model::memory_order_relaxed);
214 m_posEnqueue.store(0, memory_model::memory_order_relaxed);
215 m_posDequeue.store(0, memory_model::memory_order_relaxed);
218 ~VyukovMPMCCycleQueue()
223 /// Enqueues data to the queue using a functor
225 \p Func is a functor called to copy a value to the queue cell.
226 The functor \p f takes one argument - a reference to a empty cell of type \ref value_type :
228 cds::container::VyukovMPMCCycleQueue< Foo > myQueue;
230 myQueue.enqueue_with( [&bar]( Foo& dest ) { dest = std::move(bar); } );
233 template <typename Func>
234 bool enqueue_with(Func f)
239 size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
242 cell = &m_buffer[pos & m_nBufferMask];
243 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
245 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos);
248 if ( m_posEnqueue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed ))
253 if ( pos - m_posDequeue.load( memory_model::memory_order_relaxed ) == capacity() )
254 return false; // queue full
256 pos = m_posEnqueue.load( memory_model::memory_order_relaxed );
259 pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
264 cell->sequence.store(pos + 1, memory_model::memory_order_release);
270 /// Enqueues \p val value into the queue.
272 The new queue item is created by calling placement new in free cell.
273 Returns \p true if success, \p false if the queue is full.
275 bool enqueue( value_type const& val )
277 return enqueue_with( [&val]( value_type& dest ){ new ( &dest ) value_type( val ); });
280 /// Synonym for \p enqueue()
281 bool push( value_type const& data )
283 return enqueue( data );
286 /// Synonym for \p enqueue_with()
287 template <typename Func>
288 bool push_with( Func f )
290 return enqueue_with( f );
293 /// Enqueues data of type \ref value_type constructed with <tt>std::forward<Args>(args)...</tt>
294 template <typename... Args>
295 bool emplace( Args&&... args )
297 #if (CDS_COMPILER == CDS_COMPILER_GCC) && (CDS_COMPILER_VERSION < 40900)
298 //work around unsupported feature in g++ 4.8 for forwarding parameter packs to lambda.
299 return enqueue_with ( std::bind([]( value_type& dest,Args ... args ){ new ( &dest ) value_type( std::forward<Args>(args)... );}, std::placeholders::_1 ,args...));
301 return enqueue_with( [&args ...]( value_type& dest ){ new ( &dest ) value_type( std::forward<Args>(args)... ); });
305 /// Dequeues a value using a functor
307 \p Func is a functor called to copy dequeued value.
308 The functor takes one argument - a reference to removed node:
310 cds:container::VyukovMPMCCycleQueue< Foo > myQueue;
312 myQueue.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
314 The functor is called only if the queue is not empty.
316 template <typename Func>
317 bool dequeue_with( Func f )
322 size_t pos = m_posDequeue.load( memory_model::memory_order_relaxed );
325 cell = &m_buffer[pos & m_nBufferMask];
326 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
327 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
330 if ( m_posDequeue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed))
335 if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
336 return false; // queue empty
338 pos = m_posDequeue.load( memory_model::memory_order_relaxed );
341 pos = m_posDequeue.load(memory_model::memory_order_relaxed);
345 value_cleaner()( cell->data );
346 cell->sequence.store( pos + m_nBufferMask + 1, memory_model::memory_order_release );
352 /// Dequeues a value from the queue
354 If queue is not empty, the function returns \p true, \p dest contains copy of
355 dequeued value. The assignment operator for type \ref value_type is invoked.
356 If queue is empty, the function returns \p false, \p dest is unchanged.
358 bool dequeue(value_type & dest )
360 return dequeue_with( [&dest]( value_type& src ){ dest = src; } );
363 /// Synonym for \p dequeue()
364 bool pop(value_type& data)
366 return dequeue(data);
369 /// Synonym for \p dequeue_with()
370 template <typename Func>
371 bool pop_with( Func f )
373 return dequeue_with( f );
376 /// Returns a pointer to top element of the queue or \p nullptr if queue is empty (only for single-consumer version)
377 template <bool SC = c_single_consumer >
378 typename std::enable_if<SC, value_type *>::type front()
380 static_assert( c_single_consumer, "front() is enabled only if traits::single_consumer is true");
385 size_t pos = m_posDequeue.load( memory_model::memory_order_relaxed );
388 cell = &m_buffer[pos & m_nBufferMask];
389 size_t seq = cell->sequence.load( memory_model::memory_order_acquire );
390 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
394 else if ( dif < 0 ) {
396 if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
397 return nullptr; // queue empty
399 pos = m_posDequeue.load( memory_model::memory_order_relaxed );
402 pos = m_posDequeue.load( memory_model::memory_order_relaxed );
406 /// Pops top element; returns \p true if queue is not empty, \p false otherwise (only for single-consumer version)
407 template <bool SC = c_single_consumer >
408 typename std::enable_if<SC, bool>::type pop_front()
410 return dequeue_with( []( value_type& ) {} );
413 /// Checks if the queue is empty
416 const cell_type * cell;
419 size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
422 cell = &m_buffer[pos & m_nBufferMask];
423 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
424 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
429 if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
433 pos = m_posDequeue.load(memory_model::memory_order_relaxed);
444 /// Returns queue's item count
446 The value returned depends on \p vyukov_queue::traits::item_counter option.
447 For \p atomicity::empty_item_counter, the function always returns 0.
451 return m_ItemCounter.value();
454 /// Returns capacity of the queue
455 size_t capacity() const
457 return m_buffer.capacity();
462 namespace vyukov_queue {
463 template <typename Traits>
464 struct single_consumer_traits : public Traits
466 static CDS_CONSTEXPR bool const single_consumer = true;
468 } // namespace vyukov_queue
471 /// Vyukov's queue multiple producer - single consumer version
472 template <typename T, typename Traits = vyukov_queue::traits >
473 using VyukovMPSCCycleQueue = VyukovMPMCCycleQueue< T, vyukov_queue::single_consumer_traits<Traits> >;
475 }} // namespace cds::container
477 #endif // #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H