3 #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
4 #define CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
6 #include <cds/container/details/base.h>
7 #include <cds/opt/buffer.h>
8 #include <cds/opt/value_cleaner.h>
9 #include <cds/algo/atomic.h>
10 #include <cds/details/bounded_container.h>
12 namespace cds { namespace container {
14 /// VyukovMPMCCycleQueue related definitions
15 /** @ingroup cds_nonintrusive_helper
17 namespace vyukov_queue {
19 /// VyukovMPMCCycleQueue default traits
21 /// Buffer type for internal array
23 The type of element for the buffer is not important: the queue rebinds
24 the buffer to the required type via the \p rebind metafunction.
26 For \p VyukovMPMCCycleQueue the buffer size must be a power of two.
28 typedef cds::opt::v::dynamic_buffer< void * > buffer;
30 /// A functor to clean item dequeued.
32 The functor calls the destructor for queue item.
33 After an item is dequeued, \p value_cleaner cleans the cell that the item occupied.
34 If \p T is a complex type, \p value_cleaner may be a useful feature.
36 Default value is \ref opt::v::destruct_cleaner
38 typedef cds::opt::v::destruct_cleaner value_cleaner;
40 /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
41 typedef cds::atomicity::empty_item_counter item_counter;
43 /// C++ memory ordering model
45 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
46 or \p opt::v::sequential_consistent (sequentially consistent memory model).
48 typedef opt::v::relaxed_ordering memory_model;
50 /// Alignment for internal queue data. Default is \p opt::cache_line_alignment
51 enum { alignment = opt::cache_line_alignment };
54 /// Metafunction converting option list to \p vyukov_queue::traits
56 Supported \p Options are:
57 - \p opt::buffer - the buffer type for internal cyclic array. Possible types are:
58 \p opt::v::dynamic_buffer (the default), \p opt::v::static_buffer. The type of
59 element in the buffer is not important: it will be changed via \p rebind metafunction.
60 - \p opt::value_cleaner - a functor to clean item dequeued.
61 The functor calls the destructor for queue item.
62 After an item is dequeued, \p value_cleaner cleans the cell that the item occupied.
63 If \p T is a complex type, \p value_cleaner can be a useful feature.
64 Default value is \ref opt::v::destruct_cleaner
65 - \p opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
66 To enable item counting use \p cds::atomicity::item_counter
67 - \p opt::alignment - the alignment for internal queue data. Default is \p opt::cache_line_alignment
68 - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
69 or \p opt::v::sequential_consistent (sequentially consistent memory model).
71 Example: declare \p %VyukovMPMCCycleQueue with item counting and static internal buffer of size 1024:
73 typedef cds::container::VyukovMPMCCycleQueue< Foo,
74 typename cds::container::vyukov_queue::make_traits<
75 cds::opt::buffer< cds::opt::v::static_buffer< void *, 1024 > >,
76 cds::opt::item_counter< cds::atomicity::item_counter >
81 template <typename... Options>
83 # ifdef CDS_DOXYGEN_INVOKED
84 typedef implementation_defined type; ///< Metafunction result
86 typedef typename cds::opt::make_options<
87 typename cds::opt::find_type_traits< traits, Options... >::type
93 } //namespace vyukov_queue
95 /// Vyukov's MPMC bounded queue
96 /** @ingroup cds_nonintrusive_queue
97 This algorithm is developed by Dmitry Vyukov (see http://www.1024cores.net)
98 It's multi-producer multi-consumer (MPMC), array-based, fails on overflow, does not require GC, w/o priorities, causal FIFO,
99 blocking producers and consumers queue. The algorithm is pretty simple and fast. It's not lock-free in the official meaning,
100 just implemented by means of atomic RMW operations w/o mutexes.
102 The cost of enqueue/dequeue is 1 CAS per operation.
103 No dynamic memory allocation/management during operation. Producers and consumers are separated from each other (as in the two-lock queue),
104 i.e. do not touch the same data while queue is not empty.
107 - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
110 - \p T - type stored in queue.
111 - \p Traits - queue traits, default is \p vyukov_queue::traits. You can use \p vyukov_queue::make_traits
112 metafunction to make your traits or just derive your traits from \p %vyukov_queue::traits:
114 struct myTraits: public cds::container::vyukov_queue::traits {
115 typedef cds::atomicity::item_counter item_counter;
117 typedef cds::container::VyukovMPMCCycleQueue< Foo, myTraits > myQueue;
119 // Equivalent make_traits example:
120 typedef cds::container::VyukovMPMCCycleQueue< Foo,
121 typename cds::container::vyukov_queue::make_traits<
122 cds::opt::item_counter< cds::atomicity::item_counter >
128 Simplified BSD license by Dmitry Vyukov (http://www.1024cores.net/site/1024cores/home/code-license)
// Class head and public type aliases (braces and access specifiers are not visible in this view).
// T is the stored value type; Traits defaults to vyukov_queue::traits and supplies
// the item counter, memory model and value cleaner types re-exported below.
130 template <typename T, typename Traits = vyukov_queue::traits >
131 class VyukovMPMCCycleQueue : public cds::bounded_container
134 typedef T value_type; ///< Value type to be stored in the queue
135 typedef Traits traits; ///< Queue traits
136 typedef typename traits::item_counter item_counter; ///< Item counter type
137 typedef typename traits::memory_model memory_model; ///< Memory ordering. See cds::opt::memory_model option
138 typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p vyukov_queue::traits::value_cleaner
140 /// Rebind template arguments
// `other` names the same queue template instantiated with T2 / Traits2.
141 template <typename T2, typename Traits2>
143 typedef VyukovMPMCCycleQueue< T2, Traits2 > other ; ///< Rebinding result
// Internal types and data members (the cell_type struct header is not visible in this view).
// sequence_type: atomic size_t used both for per-cell sequence numbers and for the
// enqueue/dequeue positions.
148 typedef atomics::atomic<size_t> sequence_type;
// Per-cell sequence number; accessed as m_buffer[i].sequence / cell->sequence.
151 sequence_type sequence;
// The traits-supplied buffer rebound so that its elements are cell_type.
158 typedef typename traits::buffer::template rebind<cell_type>::other buffer;
// Position and buffer types passed through alignment_setter with traits::alignment.
159 typedef typename opt::details::alignment_setter< sequence_type, traits::alignment >::type aligned_sequence_type;
160 typedef typename opt::details::alignment_setter< buffer, traits::alignment >::type aligned_buffer;
// Cell storage.
165 aligned_buffer m_buffer;
// Initialized to capacity - 1 by the constructor; applied as `pos & m_nBufferMask` to pick a cell.
166 size_t const m_nBufferMask;
// Enqueue and dequeue positions; each successful operation advances its position by one.
167 aligned_sequence_type m_posEnqueue;
168 aligned_sequence_type m_posDequeue;
// Item counter of the type selected by traits::item_counter.
169 item_counter m_ItemCounter;
173 /// Constructs the queue of capacity \p nCapacity
175 For \p cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
177 The buffer capacity must be a power of two.
// Constructor fragment (the parameter list and braces are not visible in this view).
// Creates the buffer from nCapacity, derives the index mask from the capacity the
// buffer actually reports, then initializes every cell sequence and both positions.
179 VyukovMPMCCycleQueue(
182 : m_buffer( nCapacity )
183 , m_nBufferMask( m_buffer.capacity() - 1 )
// From here on use the capacity reported by the buffer rather than the argument value.
185 nCapacity = m_buffer.capacity();
187 // Buffer capacity must be power of 2
// (x & (x - 1)) == 0 holds only for powers of two; this is what makes
// `pos & m_nBufferMask` a valid wrap-around index.
188 assert( nCapacity >= 2 && (nCapacity & (nCapacity - 1)) == 0 );
// Cell i starts with sequence == i, which equals the first enqueue position that maps to it.
190 for (size_t i = 0; i != nCapacity; ++i )
191 m_buffer[i].sequence.store(i, memory_model::memory_order_relaxed);
// Both positions start at 0.
193 m_posEnqueue.store(0, memory_model::memory_order_relaxed);
194 m_posDequeue.store(0, memory_model::memory_order_relaxed);
197 ~VyukovMPMCCycleQueue()
202 /// Enqueues data to the queue using a functor
204 \p Func is a functor called to copy a value to the queue cell.
205 The functor \p f takes one argument - a reference to an empty cell of type \ref value_type :
207 cds::container::VyukovMPMCCycleQueue< Foo > myQueue;
209 myQueue.enqueue_with( [&bar]( Foo& dest ) { dest = std::move(bar); } );
// enqueue_with fragment: the cell declaration, braces, branch statements, the call
// to \p f and the return statements are not visible in this view.
212 template <typename Func>
213 bool enqueue_with(Func f)
// Start from the current enqueue position.
216 size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
// Pick the cell for this position; the mask wraps the position into the buffer.
220 cell = &m_buffer[pos & m_nBufferMask];
// Read the cell sequence with the memory model's acquire ordering; the cell sequence
// is written with release ordering at the end of the enqueue and dequeue paths.
221 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
// Signed distance between the cell sequence and the position.
// NOTE(review): presumably dif == 0 means the cell is free for this position and
// dif < 0 means the queue is full - the branches are not visible here; confirm.
223 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos);
// Try to claim the position by advancing m_posEnqueue from pos to pos + 1.
226 if ( m_posEnqueue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed ))
// Refresh the position from m_posEnqueue (presumably before another attempt - confirm).
232 pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
// Publish the cell: sequence becomes pos + 1, the value dequeue_with compares against.
237 cell->sequence.store(pos + 1, memory_model::memory_order_release);
243 /// Enqueues \p val value into the queue.
245 The new queue item is created by calling placement new in a free cell.
246 Returns \p true if success, \p false if the queue is full.
// Delegates to enqueue_with(); the functor copy-constructs \p val into the cell
// storage with placement new and the result of enqueue_with() is returned as is.
248 bool enqueue( value_type const& val )
250 return enqueue_with( [&val]( value_type& dest ){ new ( &dest ) value_type( val ); });
253 /// Synonym for \p enqueue()
// Forwards \p data to enqueue() and returns its result unchanged.
254 bool push( value_type const& data )
256 return enqueue( data );
259 /// Synonym for \p enqueue_with()
// Forwards the functor to enqueue_with() and returns its result unchanged.
// \p f is taken by value and passed on by value.
260 template <typename Func>
261 bool push_with( Func f )
263 return enqueue_with( f );
266 /// Enqueues data of type \ref value_type constructed with <tt>std::forward<Args>(args)...</tt>
// emplace fragment: the cell declaration, braces, branch statements and return
// statements are not visible in this view. The visible steps mirror enqueue_with(),
// except that the value is constructed in place from the forwarded arguments.
267 template <typename... Args>
268 bool emplace( Args&&... args )
// Start from the current enqueue position.
271 size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
// Pick the cell for this position; the mask wraps the position into the buffer.
275 cell = &m_buffer[pos & m_nBufferMask];
276 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
// Signed distance between the cell sequence and the position.
// NOTE(review): the branches on dif are not visible here; presumably the same
// free / full / retry handling as in enqueue_with() - confirm.
278 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos);
// Try to claim the position by advancing m_posEnqueue from pos to pos + 1.
281 if ( m_posEnqueue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed))
// Refresh the position from m_posEnqueue.
287 pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
// Construct the value directly in the cell storage from the forwarded arguments.
290 new ( &cell->data ) value_type( std::forward<Args>(args)... );
// Publish the cell: sequence becomes pos + 1 (release), after the value was constructed.
292 cell->sequence.store(pos + 1, memory_model::memory_order_release);
298 /// Dequeues a value using a functor
300 \p Func is a functor called to copy dequeued value.
301 The functor takes one argument - a reference to removed node:
303 cds::container::VyukovMPMCCycleQueue< Foo > myQueue;
305 myQueue.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
307 The functor is called only if the queue is not empty.
// dequeue_with fragment: the cell declaration, braces, branch statements, the call
// to \p f and the return statements are not visible in this view.
309 template <typename Func>
310 bool dequeue_with( Func f )
// Start from the current dequeue position.
313 size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
// Pick the cell for this position; the mask wraps the position into the buffer.
317 cell = &m_buffer[pos & m_nBufferMask];
318 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
// Compared against pos + 1 because the enqueue path publishes a filled cell with
// sequence pos + 1.
// NOTE(review): presumably dif == 0 means the cell holds an item for this position
// and dif < 0 means the queue is empty - the branches are not visible here; confirm.
319 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
// Try to claim the position by advancing m_posDequeue from pos to pos + 1.
322 if ( m_posDequeue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed))
// Refresh the position from m_posDequeue.
328 pos = m_posDequeue.load(memory_model::memory_order_relaxed);
// Clean the cell's value with a default-constructed value_cleaner.
332 value_cleaner()( cell->data );
// m_nBufferMask + 1 is the buffer capacity, so the stored sequence equals the next
// enqueue position that maps to this cell (the enqueue path compares seq with pos).
333 cell->sequence.store( pos + m_nBufferMask + 1, memory_model::memory_order_release );
339 /// Dequeues a value from the queue
341 If queue is not empty, the function returns \p true, \p dest contains a copy of
342 the dequeued value. The assignment operator for type \ref value_type is invoked.
343 If queue is empty, the function returns \p false, \p dest is unchanged.
// Delegates to dequeue_with(); the functor copy-assigns the cell value to \p dest
// and the result of dequeue_with() is returned as is.
345 bool dequeue(value_type & dest )
347 return dequeue_with( [&dest]( value_type& src ){ dest = src; } );
350 /// Synonym for \p dequeue()
// Forwards \p data to dequeue() and returns its result unchanged.
351 bool pop(value_type& data)
353 return dequeue(data);
356 /// Synonym for \p dequeue_with()
// Forwards the functor to dequeue_with() and returns its result unchanged.
// \p f is taken by value and passed on by value.
357 template <typename Func>
358 bool pop_with( Func f )
360 return dequeue_with( f );
363 /// Checks if the queue is empty
// Body fragment of the emptiness check; the signature, braces, branches and return
// statements are not visible in this view. It inspects the cell at the current
// dequeue position without modifying it (the cell is accessed through a const pointer).
366 const cell_type * cell;
367 size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
// Pick the cell for this position; the mask wraps the position into the buffer.
371 cell = &m_buffer[pos & m_nBufferMask];
372 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
// Same comparison as in dequeue_with(): a filled cell carries sequence pos + 1.
// NOTE(review): how dif maps to the returned value is not visible here - confirm.
373 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
// Refresh the position from m_posDequeue.
380 pos = m_posDequeue.load(memory_model::memory_order_relaxed);
391 /// Returns queue's item count
393 The value returned depends on \p vyukov_queue::traits::item_counter option.
394 For \p atomicity::empty_item_counter, the function always returns 0.
398 return m_ItemCounter.value();
401 /// Returns capacity of the queue
// Returns the capacity reported by the internal buffer.
402 size_t capacity() const
404 return m_buffer.capacity();
407 }} // namespace cds::container
409 #endif // #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H