3 #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
4 #define CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
6 #include <cds/container/details/base.h>
7 #include <cds/opt/buffer.h>
8 #include <cds/opt/value_cleaner.h>
9 #include <cds/algo/atomic.h>
10 #include <cds/details/bounded_container.h>
12 namespace cds { namespace container {
14 /// VyukovMPMCCycleQueue related definitions
15 /** @ingroup cds_nonintrusive_helper
17 namespace vyukov_queue {
19 /// VyukovMPMCCycleQueue default traits
21 /// Buffer type for internal array
23 The type of element for the buffer is not important: the queue rebinds
24 buffer for required type via \p rebind metafunction.
26 For \p VyukovMPMCCycleQueue the buffer size must be a power of two.
28 typedef cds::opt::v::dynamic_buffer< void * > buffer;
30 /// A functor to clean item dequeued.
32 The functor calls the destructor for queue item.
33 After an item is dequeued, \p value_cleaner cleans the cell that the item occupied.
34 If \p T is a complex type, \p value_cleaner may be a useful feature.
36 Default value is \ref opt::v::destruct_cleaner
38 typedef cds::opt::v::destruct_cleaner value_cleaner;
40 /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
41 typedef cds::atomicity::empty_item_counter item_counter;
43 /// C++ memory ordering model
45 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
46 or \p opt::v::sequential_consistent (sequentially consistent memory model).
48 typedef opt::v::relaxed_ordering memory_model;
50 /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
51 enum { padding = opt::cache_line_padding };
54 typedef cds::backoff::Default back_off;
56 /// Single-consumer version
58 For the single-consumer version of the algorithm some additional functions
60 (\p front(), \p pop_front()) are available.
64 static CDS_CONSTEXPR bool const single_consumer = false;
67 /// Metafunction converting option list to \p vyukov_queue::traits
69 Supported \p Options are:
70 - \p opt::buffer - the buffer type for internal cyclic array. Possible types are:
71 \p opt::v::dynamic_buffer (the default), \p opt::v::static_buffer. The type of
72 element in the buffer is not important: it will be changed via \p rebind metafunction.
73 - \p opt::value_cleaner - a functor to clean item dequeued.
74 The functor calls the destructor for queue item.
75 After an item is dequeued, \p value_cleaner cleans the cell that the item occupied.
76 If \p T is a complex type, \p value_cleaner can be a useful feature.
77 Default value is \ref opt::v::destruct_cleaner
78 - \p opt::back_off - back-off strategy used. If the option is not specified, the \p cds::backoff::Default is used.
79 - \p opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
80 To enable item counting use \p cds::atomicity::item_counter
81 - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
82 - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
83 or \p opt::v::sequential_consistent (sequentially consistent memory model).
85 Example: declare \p %VyukovMPMCCycleQueue with item counting and static internal buffer of size 1024:
87 typedef cds::container::VyukovMPMCCycleQueue< Foo,
88 typename cds::container::vyukov_queue::make_traits<
89 cds::opt::buffer< cds::opt::v::static_buffer< void *, 1024 > >,
90 cds::opt::item_counter< cds::atomicity::item_counter >
95 template <typename... Options>
97 # ifdef CDS_DOXYGEN_INVOKED
98 typedef implementation_defined type; ///< Metafunction result
100 typedef typename cds::opt::make_options<
101 typename cds::opt::find_type_traits< traits, Options... >::type
107 } //namespace vyukov_queue
109 /// Vyukov's MPMC bounded queue
110 /** @ingroup cds_nonintrusive_queue
111 This algorithm is developed by Dmitry Vyukov (see http://www.1024cores.net)
112 It's multi-producer multi-consumer (MPMC), array-based, fails on overflow, does not require GC, w/o priorities, causal FIFO,
113 blocking producers and consumers queue. The algorithm is pretty simple and fast. It's not lock-free in the official meaning,
114 just implemented by means of atomic RMW operations w/o mutexes.
116 The cost of enqueue/dequeue is 1 CAS per operation.
117 No dynamic memory allocation/management during operation. Producers and consumers are separated from each other (as in the two-lock queue),
118 i.e. do not touch the same data while queue is not empty.
120 There is multiple producer/single consumer version \p cds::container::VyukovMPSCCycleQueue
121 that supports \p front() and \p pop_front() functions.
124 - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
127 - \p T - type stored in queue.
128 - \p Traits - queue traits, default is \p vyukov_queue::traits. You can use \p vyukov_queue::make_traits
129 metafunction to make your traits or just derive your traits from \p %vyukov_queue::traits:
131 struct myTraits: public cds::container::vyukov_queue::traits {
132 typedef cds::atomicity::item_counter item_counter;
134 typedef cds::container::VyukovMPMCCycleQueue< Foo, myTraits > myQueue;
136 // Equivalent make_traits example:
137 typedef cds::container::VyukovMPMCCycleQueue< Foo,
138 typename cds::container::vyukov_queue::make_traits<
139 cds::opt::item_counter< cds::atomicity::item_counter >
145 Simplified BSD license by Dmitry Vyukov (http://www.1024cores.net/site/1024cores/home/code-license)
147 template <typename T, typename Traits = vyukov_queue::traits >
148 class VyukovMPMCCycleQueue : public cds::bounded_container
151 typedef T value_type; ///< Value type to be stored in the queue
152 typedef Traits traits; ///< Queue traits
153 typedef typename traits::item_counter item_counter; ///< Item counter type
154 typedef typename traits::memory_model memory_model; ///< Memory ordering. See cds::opt::memory_model option
155 typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p vyukov_queue::traits::value_cleaner
156 typedef typename traits::back_off back_off; ///< back-off strategy
158 /// \p true for single-consumer version, \p false otherwise
159 static CDS_CONSTEXPR bool const c_single_consumer = traits::single_consumer;
161 /// Rebind template arguments
162 template <typename T2, typename Traits2>
164 typedef VyukovMPMCCycleQueue< T2, Traits2 > other ; ///< Rebinding result
169 typedef atomics::atomic<size_t> sequence_type;
172 sequence_type sequence;
179 typedef typename traits::buffer::template rebind<cell_type>::other buffer;
// Index mask: the constructor sets it to capacity() - 1 and every access maps a
// position to a cell with `pos & m_nBufferMask`.
185 size_t const m_nBufferMask;
// Padding members sized by traits::padding separate the mask, the two positions and
// the item counter - presumably so they do not share a cache line (the traits doc
// names opt::cache_line_padding as the default); TODO confirm.
186 typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad1_;
// Position advanced by producers (CAS in enqueue_with()).
187 sequence_type m_posEnqueue;
188 typename opt::details::apply_padding< sequence_type, traits::padding >::padding_type pad2_;
// Position advanced by consumers (CAS in dequeue_with()).
189 sequence_type m_posDequeue;
190 typename opt::details::apply_padding< sequence_type, traits::padding >::padding_type pad3_;
// Counter whose value() is returned as the queue's item count.
191 item_counter m_ItemCounter;
195 /// Constructs the queue of capacity \p nCapacity
197 For \p cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
199 The buffer capacity must be a power of two.
// Constructor: creates the buffer, derives the index mask from it and puts every
// cell and both positions into their initial state.
// NOTE(review): the parameter list (declaring nCapacity) is not visible in this listing.
201 VyukovMPMCCycleQueue(
204 : m_buffer( nCapacity )
// The mask is taken from the buffer's real capacity, not from the requested value.
205 , m_nBufferMask( m_buffer.capacity() - 1 )
// Re-read the capacity from the buffer: per the doc comment a static buffer ignores
// the constructor argument.
207 nCapacity = m_buffer.capacity();
209 // Buffer capacity must be power of 2
// Checked only when assertions are enabled; `pos & m_nBufferMask` indexing relies on it.
210 assert( nCapacity >= 2 && (nCapacity & (nCapacity - 1)) == 0 );
// Each cell's sequence starts at its own index - the value enqueue_with() compares
// against its position (dif = seq - pos).
212 for (size_t i = 0; i != nCapacity; ++i )
213 m_buffer[i].sequence.store(i, memory_model::memory_order_relaxed);
// Both positions start at 0.
215 m_posEnqueue.store(0, memory_model::memory_order_relaxed);
216 m_posDequeue.store(0, memory_model::memory_order_relaxed);
219 ~VyukovMPMCCycleQueue()
224 /// Enqueues data to the queue using a functor
226 \p Func is a functor called to copy a value to the queue cell.
227 The functor \p f takes one argument - a reference to an empty cell of type \ref value_type :
229 cds::container::VyukovMPMCCycleQueue< Foo > myQueue;
231 myQueue.enqueue_with( [&bar]( Foo& dest ) { dest = std::move(bar); } );
// Claims the cell at the current enqueue position so that the functor can fill it;
// returns false when the queue is full.
// NOTE(review): several lines of this function (local declarations, the loop header,
// the branch conditions on `dif`, the call of `f` and the final return) are not
// visible in this listing - the comments describe only the statements shown.
234 template <typename Func>
235 bool enqueue_with(Func f)
// Snapshot of the producers' position (relaxed load; the acquire/release pair is on
// the cell's sequence below).
240 size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
// Map the position to a cell of the cyclic buffer.
243 cell = &m_buffer[pos & m_nBufferMask];
// Acquire load; the matching release stores of `sequence` are at the end of
// enqueue_with() and dequeue_with().
244 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
// Signed distance between the cell's sequence and the position. The constructor
// stores the cell index and dequeue_with() stores pos + capacity into `sequence`.
246 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos);
// Try to claim the position by advancing m_posEnqueue from pos to pos + 1. On failure
// compare_exchange_weak writes the value it found back into `pos` (std::atomic semantics).
249 if ( m_posEnqueue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed ))
// Full when the enqueue position is exactly capacity() ahead of the dequeue position.
254 if ( pos - m_posDequeue.load( memory_model::memory_order_relaxed ) == capacity() )
255 return false; // queue full
// Refresh the position before the next attempt.
257 pos = m_posEnqueue.load( memory_model::memory_order_relaxed );
// Same refresh; NOTE(review): presumably the remaining branch of the dif test - confirm.
260 pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
// Publish the cell: pos + 1 is the value dequeue_with() compares the sequence with
// (dif = seq - (pos + 1)).
265 cell->sequence.store(pos + 1, memory_model::memory_order_release);
271 /// Enqueues \p val value into the queue.
273 The new queue item is created by calling placement new in free cell.
274 Returns \p true if success, \p false if the queue is full.
// Copy-constructs \p val directly in the cell handed over by enqueue_with(): the lambda
// uses placement new, i.e. the cell is treated as raw storage rather than assigned to.
// The result of enqueue_with() is returned unchanged.
276 bool enqueue( value_type const& val )
278 return enqueue_with( [&val]( value_type& dest ){ new ( &dest ) value_type( val ); });
281 /// Synonym for \p enqueue()
// Forwards \p data to enqueue() and returns its result.
282 bool push( value_type const& data )
284 return enqueue( data );
287 /// Synonym for \p enqueue_with()
// Forwards the functor \p f to enqueue_with() and returns its result.
288 template <typename Func>
289 bool push_with( Func f )
291 return enqueue_with( f );
294 /// Enqueues data of type \ref value_type constructed with <tt>std::forward<Args>(args)...</tt>
// Constructs a value_type in the claimed cell from \p args using placement new;
// returns the result of enqueue_with().
295 template <typename... Args>
296 bool emplace( Args&&... args )
// Two implementations are selected by compiler version (GCC older than 4.9 vs. the rest).
// NOTE(review): the #else / #endif lines are not visible in this listing.
298 #if (CDS_COMPILER == CDS_COMPILER_GCC) && (CDS_COMPILER_VERSION < 40900)
299 //work around unsupported feature in g++ 4.8 for forwarding parameter packs to lambda.
// std::bind path: `args...` are handed to std::bind (which stores copies) and the lambda
// takes them by value - NOTE(review): unlike the branch below this does not forward
// the caller's arguments; confirm this is acceptable for the affected compilers.
300 return enqueue_with ( std::bind([]( value_type& dest,Args ... args ){ new ( &dest ) value_type( std::forward<Args>(args)... );}, std::placeholders::_1 ,args...));
// Regular path: the pack is captured by reference and perfectly forwarded to the
// value_type constructor.
302 return enqueue_with( [&args ...]( value_type& dest ){ new ( &dest ) value_type( std::forward<Args>(args)... ); });
306 /// Dequeues a value using a functor
308 \p Func is a functor called to copy dequeued value.
309 The functor takes one argument - a reference to removed node:
311 cds::container::VyukovMPMCCycleQueue< Foo > myQueue;
313 myQueue.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
315 The functor is called only if the queue is not empty.
// Claims the cell at the current dequeue position, cleans it and recycles it for
// producers; returns false when the queue is empty.
// NOTE(review): several lines of this function (local declarations, the loop header,
// the branch conditions on `dif`, the call of `f` and the final return) are not
// visible in this listing - the comments describe only the statements shown.
317 template <typename Func>
318 bool dequeue_with( Func f )
// Snapshot of the consumers' position.
323 size_t pos = m_posDequeue.load( memory_model::memory_order_relaxed );
// Map the position to a cell of the cyclic buffer.
326 cell = &m_buffer[pos & m_nBufferMask];
// Acquire load; enqueue_with() publishes a filled cell with a release store of pos + 1.
327 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
// Signed distance between the cell's sequence and pos + 1, the value a producer stores
// after filling the cell at this position.
328 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
// Try to claim the position by advancing m_posDequeue from pos to pos + 1. On failure
// compare_exchange_weak writes the value it found back into `pos` (std::atomic semantics).
331 if ( m_posDequeue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed))
// Empty when the dequeue position has caught up with the enqueue position.
336 if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
337 return false; // queue empty
// Refresh the position before the next attempt.
339 pos = m_posDequeue.load( memory_model::memory_order_relaxed );
// Same refresh; NOTE(review): presumably the remaining branch of the dif test - confirm.
342 pos = m_posDequeue.load(memory_model::memory_order_relaxed);
// Clean the cell's data with a default-constructed value_cleaner.
346 value_cleaner()( cell->data );
// Recycle the cell: m_nBufferMask + 1 is the capacity (see the constructor), so the new
// sequence is the enqueue position that maps to this cell on the next lap.
347 cell->sequence.store( pos + m_nBufferMask + 1, memory_model::memory_order_release );
353 /// Dequeues a value from the queue
355 If queue is not empty, the function returns \p true and the dequeued value is moved into \p dest:
356 the move assignment operator of \ref value_type is invoked (copy assignment if the type has no move assignment).
357 If queue is empty, the function returns \p false, \p dest is unchanged.
359 bool dequeue(value_type & dest )
// The cell content is handed to value_cleaner by dequeue_with() right after this functor
// returns, so the source object is never read again: moving out of it avoids a needless
// deep copy and lets move-only value types be dequeued.
361 return dequeue_with( [&dest]( value_type& src ){ dest = std::move( src ); } );
364 /// Synonym for \p dequeue()
// Forwards to dequeue(): \p data receives the dequeued value and the result of
// dequeue() is returned.
365 bool pop(value_type& data)
367 return dequeue(data);
370 /// Synonym for \p dequeue_with()
// Forwards the functor \p f to dequeue_with() and returns its result.
371 template <typename Func>
372 bool pop_with( Func f )
374 return dequeue_with( f );
377 /// Returns a pointer to top element of the queue or \p nullptr if queue is empty (only for single-consumer version)
// Inspects the cell at the current dequeue position without claiming it: the visible
// statements only load m_posDequeue, they never modify it. Returns nullptr when the
// queue is empty.
// NOTE(review): local declarations, the loop header and the branch that returns the
// element pointer are not visible in this listing.
// The enable_if return type exposes the function only when SC (defaulted to
// c_single_consumer) is true.
378 template <bool SC = c_single_consumer >
379 typename std::enable_if<SC, value_type *>::type front()
// Repeats the constraint on the class constant with a readable message (SC itself could
// be overridden explicitly by the caller).
381 static_assert( c_single_consumer, "front() is enabled only if traits::single_consumer is true");
// Snapshot of the consumer position.
386 size_t pos = m_posDequeue.load( memory_model::memory_order_relaxed );
// Map the position to a cell of the cyclic buffer.
389 cell = &m_buffer[pos & m_nBufferMask];
// Acquire load; a filled cell is published by enqueue_with() with a release store.
390 size_t seq = cell->sequence.load( memory_model::memory_order_acquire );
// Same test as in dequeue_with(): distance to pos + 1, the sequence of a filled cell.
391 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
// Negative distance: the cell at this position has not been published yet.
395 else if ( dif < 0 ) {
// Empty when the dequeue position equals the enqueue position.
397 if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
398 return nullptr; // queue empty
// Refresh the position before the next attempt.
400 pos = m_posDequeue.load( memory_model::memory_order_relaxed );
// Same refresh; NOTE(review): presumably the remaining branch of the dif test - confirm.
403 pos = m_posDequeue.load( memory_model::memory_order_relaxed );
407 /// Pops top element; returns \p true if queue is not empty, \p false otherwise (only for single-consumer version)
// Discards the front element: dequeue_with() is called with a functor that ignores the
// value, and its result is returned. The enable_if return type exposes the function only
// when SC (defaulted to c_single_consumer) is true.
408 template <bool SC = c_single_consumer >
409 typename std::enable_if<SC, bool>::type pop_front()
411 return dequeue_with( []( value_type& ) {} );
414 /// Checks if the queue is empty
417 const cell_type * cell;
420 size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
423 cell = &m_buffer[pos & m_nBufferMask];
424 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
425 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
430 if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
434 pos = m_posDequeue.load(memory_model::memory_order_relaxed);
445 /// Returns queue's item count
447 The value returned depends on \p vyukov_queue::traits::item_counter option.
448 For \p atomicity::empty_item_counter, the function always returns 0.
452 return m_ItemCounter.value();
455 /// Returns capacity of the queue
// Reports the capacity of the internal buffer (the constructor asserts that it is a
// power of two not less than 2). enqueue_with() uses this value in its "queue full" test.
456 size_t capacity() const
458 return m_buffer.capacity();
463 namespace vyukov_queue {
// Traits adaptor: inherits every setting from \p Traits and redefines single_consumer as
// true. VyukovMPMCCycleQueue reads it as c_single_consumer, which enables front() and
// pop_front().
465 template <typename Traits>
466 struct single_consumer_traits : public Traits
468 static CDS_CONSTEXPR bool const single_consumer = true;
470 } // namespace vyukov_queue
473 /// Vyukov's queue multiple producer - single consumer version
// Alias of VyukovMPMCCycleQueue whose traits are wrapped in
// vyukov_queue::single_consumer_traits, i.e. the same queue with single_consumer == true.
475 template <typename T, typename Traits = vyukov_queue::traits >
476 using VyukovMPSCCycleQueue = VyukovMPMCCycleQueue< T, vyukov_queue::single_consumer_traits<Traits> >;
478 }} // namespace cds::container
480 #endif // #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H