3 #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
4 #define CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
6 #include <cds/container/details/base.h>
7 #include <cds/opt/buffer.h>
8 #include <cds/opt/value_cleaner.h>
9 #include <cds/algo/atomic.h>
10 #include <cds/details/bounded_container.h>
12 namespace cds { namespace container {
14 /// VyukovMPMCCycleQueue related definitions
15 /** @ingroup cds_nonintrusive_helper
17 namespace vyukov_queue {
19 /// VyukovMPMCCycleQueue default traits
21 /// Buffer type for internal array
23 The type of element for the buffer is not important: the queue rebinds
24 buffer for required type via \p rebind metafunction.
26 For \p VyukovMPMCCycleQueue queue the buffer size should have power-of-2 size.
28 typedef cds::opt::v::dynamic_buffer< void * > buffer;
30 /// A functor to clean item dequeued.
32 The functor calls the destructor for queue item.
33 After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied.
34 If \p T is a complex type, \p value_cleaner may be the useful feature.
36 Default value is \ref opt::v::destruct_cleaner
38 typedef cds::opt::v::destruct_cleaner value_cleaner;
40 /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
41 typedef cds::atomicity::empty_item_counter item_counter;
43 /// C++ memory ordering model
45 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
46 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
48 typedef opt::v::relaxed_ordering memory_model;
50 /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
51 enum { padding = opt::cache_line_padding };
54 /// Metafunction converting option list to \p vyukov_queue::traits
56 Supported \p Options are:
57 - \p opt::buffer - the buffer type for internal cyclic array. Possible types are:
58 \p opt::v::dynamic_buffer (the default), \p opt::v::static_buffer. The type of
59 element in the buffer is not important: it will be changed via \p rebind metafunction.
60 - \p opt::value_cleaner - a functor to clean item dequeued.
61 The functor calls the destructor for queue item.
62 After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied.
63 If \p T is a complex type, \p value_cleaner can be an useful feature.
64 Default value is \ref opt::v::destruct_cleaner
65 - \p opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
66 To enable item counting use \p cds::atomicity::item_counter
67 - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
68 - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
69 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
71 Example: declare \p %VyukovMPMCCycleQueue with item counting and static iternal buffer of size 1024:
73 typedef cds::container::VyukovMPMCCycleQueue< Foo,
74 typename cds::container::vyukov_queue::make_traits<
75 cds::opt::buffer< cds::opt::v::static_buffer< void *, 1024 >,
76 cds::opt::item_counte< cds::atomicity::item_counter >
81 template <typename... Options>
83 # ifdef CDS_DOXYGEN_INVOKED
84 typedef implementation_defined type; ///< Metafunction result
86 typedef typename cds::opt::make_options<
87 typename cds::opt::find_type_traits< traits, Options... >::type
93 } //namespace vyukov_queue
95 /// Vyukov's MPMC bounded queue
96 /** @ingroup cds_nonintrusive_queue
97 This algorithm is developed by Dmitry Vyukov (see http://www.1024cores.net)
98 It's multi-producer multi-consumer (MPMC), array-based, fails on overflow, does not require GC, w/o priorities, causal FIFO,
99 blocking producers and consumers queue. The algorithm is pretty simple and fast. It's not lock-free in the official meaning,
100 just implemented by means of atomic RMW operations w/o mutexes.
102 The cost of enqueue/dequeue is 1 CAS per operation.
103 No dynamic memory allocation/management during operation. Producers and consumers are separated from each other (as in the two-lock queue),
104 i.e. do not touch the same data while queue is not empty.
107 - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
110 - \p T - type stored in queue.
111 - \p Traits - queue traits, default is \p vykov_queue::traits. You can use \p vykov_queue::make_traits
112 metafunction to make your traits or just derive your traits from \p %vykov_queue::traits:
114 struct myTraits: public cds::container::vykov_queue::traits {
115 typedef cds::atomicity::item_counter item_counter;
117 typedef cds::container::VyukovMPMCCycleQueue< Foo, myTraits > myQueue;
119 // Equivalent make_traits example:
120 typedef cds::container::VyukovMPMCCycleQueue< cds::gc::HP, Foo,
121 typename cds::container::vykov_queue::make_traits<
122 cds::opt::item_counter< cds::atomicity::item_counter >
128 Simplified BSD license by Dmitry Vyukov (http://www.1024cores.net/site/1024cores/home/code-license)
130 template <typename T, typename Traits = vyukov_queue::traits >
131 class VyukovMPMCCycleQueue : public cds::bounded_container
134 typedef T value_type; ///< Value type to be stored in the queue
135 typedef Traits traits; ///< Queue traits
136 typedef typename traits::item_counter item_counter; ///< Item counter type
137 typedef typename traits::memory_model memory_model; ///< Memory ordering. See cds::opt::memory_model option
138 typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p vyukov_queue::traits::value_cleaner
140 /// Rebind template arguments
141 template <typename T2, typename Traits2>
143 typedef VyukovMPMCCycleQueue< T2, Traits2 > other ; ///< Rebinding result
148 typedef atomics::atomic<size_t> sequence_type;
151 sequence_type sequence;
158 typedef typename traits::buffer::template rebind<cell_type>::other buffer;
164 size_t const m_nBufferMask;
165 typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad1_;
166 sequence_type m_posEnqueue;
167 typename opt::details::apply_padding< sequence_type, traits::padding >::padding_type pad2_;
168 sequence_type m_posDequeue;
169 typename opt::details::apply_padding< sequence_type, traits::padding >::padding_type pad3_;
170 item_counter m_ItemCounter;
174 /// Constructs the queue of capacity \p nCapacity
176 For \p cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
178 The buffer capacity must be the power of two.
180 VyukovMPMCCycleQueue(
183 : m_buffer( nCapacity )
184 , m_nBufferMask( m_buffer.capacity() - 1 )
186 nCapacity = m_buffer.capacity();
188 // Buffer capacity must be power of 2
189 assert( nCapacity >= 2 && (nCapacity & (nCapacity - 1)) == 0 );
191 for (size_t i = 0; i != nCapacity; ++i )
192 m_buffer[i].sequence.store(i, memory_model::memory_order_relaxed);
194 m_posEnqueue.store(0, memory_model::memory_order_relaxed);
195 m_posDequeue.store(0, memory_model::memory_order_relaxed);
198 ~VyukovMPMCCycleQueue()
203 /// Enqueues data to the queue using a functor
205 \p Func is a functor called to copy a value to the queue cell.
206 The functor \p f takes one argument - a reference to a empty cell of type \ref value_type :
208 cds::container::VyukovMPMCCycleQueue< Foo > myQueue;
210 myQueue.enqueue_with( [&bar]( Foo& dest ) { dest = std::move(bar); } );
213 template <typename Func>
214 bool enqueue_with(Func f)
217 size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
221 cell = &m_buffer[pos & m_nBufferMask];
222 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
224 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos);
227 if ( m_posEnqueue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed ))
233 pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
238 cell->sequence.store(pos + 1, memory_model::memory_order_release);
244 /// Enqueues \p val value into the queue.
246 The new queue item is created by calling placement new in free cell.
247 Returns \p true if success, \p false if the queue is full.
249 bool enqueue( value_type const& val )
251 return enqueue_with( [&val]( value_type& dest ){ new ( &dest ) value_type( val ); });
254 /// Synonym for \p enqueue()
255 bool push( value_type const& data )
257 return enqueue( data );
260 /// Synonym for \p enqueue_with()
261 template <typename Func>
262 bool push_with( Func f )
264 return enqueue_with( f );
267 /// Enqueues data of type \ref value_type constructed with <tt>std::forward<Args>(args)...</tt>
268 template <typename... Args>
269 bool emplace( Args&&... args )
271 #if (CDS_COMPILER == CDS_COMPILER_GCC) && (CDS_COMPILER_VERSION < 40900)
272 //work around unsupported feature in g++ 4.8 for forwarding parameter packs to lambda.
273 return enqueue_with ( std::bind([]( value_type& dest,Args ... args ){ new ( &dest ) value_type( std::forward<Args>(args)... );}, std::placeholders::_1 ,args...));
275 return enqueue_with( [&args ...]( value_type& dest ){ new ( &dest ) value_type( std::forward<Args>(args)... ); });
279 /// Dequeues a value using a functor
281 \p Func is a functor called to copy dequeued value.
282 The functor takes one argument - a reference to removed node:
284 cds:container::VyukovMPMCCycleQueue< Foo > myQueue;
286 myQueue.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
288 The functor is called only if the queue is not empty.
290 template <typename Func>
291 bool dequeue_with( Func f )
294 size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
298 cell = &m_buffer[pos & m_nBufferMask];
299 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
300 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
303 if ( m_posDequeue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed))
309 pos = m_posDequeue.load(memory_model::memory_order_relaxed);
313 value_cleaner()( cell->data );
314 cell->sequence.store( pos + m_nBufferMask + 1, memory_model::memory_order_release );
320 /// Dequeues a value from the queue
322 If queue is not empty, the function returns \p true, \p dest contains copy of
323 dequeued value. The assignment operator for type \ref value_type is invoked.
324 If queue is empty, the function returns \p false, \p dest is unchanged.
326 bool dequeue(value_type & dest )
328 return dequeue_with( [&dest]( value_type& src ){ dest = src; } );
331 /// Synonym for \p dequeue()
332 bool pop(value_type& data)
334 return dequeue(data);
337 /// Synonym for \p dequeue_with()
338 template <typename Func>
339 bool pop_with( Func f )
341 return dequeue_with( f );
344 /// Checks if the queue is empty
347 const cell_type * cell;
348 size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
352 cell = &m_buffer[pos & m_nBufferMask];
353 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
354 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
361 pos = m_posDequeue.load(memory_model::memory_order_relaxed);
372 /// Returns queue's item count
374 The value returned depends on \p vyukov_queue::traits::item_counter option.
375 For \p atomicity::empty_item_counter, the function always returns 0.
379 return m_ItemCounter.value();
382 /// Returns capacity of the queue
383 size_t capacity() const
385 return m_buffer.capacity();
388 }} // namespace cds::container
390 #endif // #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H