3 #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
4 #define CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
6 #include <cds/container/details/base.h>
7 #include <cds/opt/buffer.h>
8 #include <cds/opt/value_cleaner.h>
9 #include <cds/algo/atomic.h>
10 #include <cds/details/bounded_container.h>
12 namespace cds { namespace container {
14 /// VyukovMPMCCycleQueue related definitions
15 /** @ingroup cds_nonintrusive_helper
17 namespace vyukov_queue {
19 /// VyukovMPMCCycleQueue default traits
21 /// Buffer type for internal array
23 The type of element for the buffer is not important: the queue rebinds
24 buffer for required type via \p rebind metafunction.
26 For \p VyukovMPMCCycleQueue queue the buffer size should have power-of-2 size.
28 typedef cds::opt::v::dynamic_buffer< void * > buffer;
30 /// A functor to clean item dequeued.
32 The functor calls the destructor for queue item.
33 After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied.
34 If \p T is a complex type, \p value_cleaner may be the useful feature.
36 Default value is \ref opt::v::destruct_cleaner
38 typedef cds::opt::v::destruct_cleaner value_cleaner;
40 /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
41 typedef cds::atomicity::empty_item_counter item_counter;
43 /// C++ memory ordering model
45 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
46 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
48 typedef opt::v::relaxed_ordering memory_model;
50 /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
51 enum { padding = opt::cache_line_padding };
54 typedef cds::backoff::Default back_off;
57 /// Metafunction converting option list to \p vyukov_queue::traits
59 Supported \p Options are:
60 - \p opt::buffer - the buffer type for internal cyclic array. Possible types are:
61 \p opt::v::dynamic_buffer (the default), \p opt::v::static_buffer. The type of
62 element in the buffer is not important: it will be changed via \p rebind metafunction.
63 - \p opt::value_cleaner - a functor to clean item dequeued.
64 The functor calls the destructor for queue item.
65 After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied.
66 If \p T is a complex type, \p value_cleaner can be an useful feature.
67 Default value is \ref opt::v::destruct_cleaner
68 - \p opt::back_off - back-off strategy used. If the option is not specified, the \p cds::backoff::Default is used.
69 - \p opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
70 To enable item counting use \p cds::atomicity::item_counter
71 - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
72 - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
73 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
75 Example: declare \p %VyukovMPMCCycleQueue with item counting and static iternal buffer of size 1024:
77 typedef cds::container::VyukovMPMCCycleQueue< Foo,
78 typename cds::container::vyukov_queue::make_traits<
79 cds::opt::buffer< cds::opt::v::static_buffer< void *, 1024 >,
80 cds::opt::item_counte< cds::atomicity::item_counter >
85 template <typename... Options>
87 # ifdef CDS_DOXYGEN_INVOKED
88 typedef implementation_defined type; ///< Metafunction result
90 typedef typename cds::opt::make_options<
91 typename cds::opt::find_type_traits< traits, Options... >::type
97 } //namespace vyukov_queue
99 /// Vyukov's MPMC bounded queue
100 /** @ingroup cds_nonintrusive_queue
101 This algorithm is developed by Dmitry Vyukov (see http://www.1024cores.net)
102 It's multi-producer multi-consumer (MPMC), array-based, fails on overflow, does not require GC, w/o priorities, causal FIFO,
103 blocking producers and consumers queue. The algorithm is pretty simple and fast. It's not lock-free in the official meaning,
104 just implemented by means of atomic RMW operations w/o mutexes.
106 The cost of enqueue/dequeue is 1 CAS per operation.
107 No dynamic memory allocation/management during operation. Producers and consumers are separated from each other (as in the two-lock queue),
108 i.e. do not touch the same data while queue is not empty.
111 - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
114 - \p T - type stored in queue.
115 - \p Traits - queue traits, default is \p vykov_queue::traits. You can use \p vykov_queue::make_traits
116 metafunction to make your traits or just derive your traits from \p %vykov_queue::traits:
118 struct myTraits: public cds::container::vykov_queue::traits {
119 typedef cds::atomicity::item_counter item_counter;
121 typedef cds::container::VyukovMPMCCycleQueue< Foo, myTraits > myQueue;
123 // Equivalent make_traits example:
124 typedef cds::container::VyukovMPMCCycleQueue< cds::gc::HP, Foo,
125 typename cds::container::vykov_queue::make_traits<
126 cds::opt::item_counter< cds::atomicity::item_counter >
132 Simplified BSD license by Dmitry Vyukov (http://www.1024cores.net/site/1024cores/home/code-license)
134 template <typename T, typename Traits = vyukov_queue::traits >
135 class VyukovMPMCCycleQueue : public cds::bounded_container
138 typedef T value_type; ///< Value type to be stored in the queue
139 typedef Traits traits; ///< Queue traits
140 typedef typename traits::item_counter item_counter; ///< Item counter type
141 typedef typename traits::memory_model memory_model; ///< Memory ordering. See cds::opt::memory_model option
142 typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p vyukov_queue::traits::value_cleaner
143 typedef typename traits::back_off back_off; ///< back-off strategy
145 /// Rebind template arguments
146 template <typename T2, typename Traits2>
148 typedef VyukovMPMCCycleQueue< T2, Traits2 > other ; ///< Rebinding result
153 typedef atomics::atomic<size_t> sequence_type;
156 sequence_type sequence;
163 typedef typename traits::buffer::template rebind<cell_type>::other buffer;
169 size_t const m_nBufferMask;
170 typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad1_;
171 sequence_type m_posEnqueue;
172 typename opt::details::apply_padding< sequence_type, traits::padding >::padding_type pad2_;
173 sequence_type m_posDequeue;
174 typename opt::details::apply_padding< sequence_type, traits::padding >::padding_type pad3_;
175 item_counter m_ItemCounter;
179 /// Constructs the queue of capacity \p nCapacity
181 For \p cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
183 The buffer capacity must be the power of two.
185 VyukovMPMCCycleQueue(
188 : m_buffer( nCapacity )
189 , m_nBufferMask( m_buffer.capacity() - 1 )
191 nCapacity = m_buffer.capacity();
193 // Buffer capacity must be power of 2
194 assert( nCapacity >= 2 && (nCapacity & (nCapacity - 1)) == 0 );
196 for (size_t i = 0; i != nCapacity; ++i )
197 m_buffer[i].sequence.store(i, memory_model::memory_order_relaxed);
199 m_posEnqueue.store(0, memory_model::memory_order_relaxed);
200 m_posDequeue.store(0, memory_model::memory_order_relaxed);
203 ~VyukovMPMCCycleQueue()
208 /// Enqueues data to the queue using a functor
210 \p Func is a functor called to copy a value to the queue cell.
211 The functor \p f takes one argument - a reference to a empty cell of type \ref value_type :
213 cds::container::VyukovMPMCCycleQueue< Foo > myQueue;
215 myQueue.enqueue_with( [&bar]( Foo& dest ) { dest = std::move(bar); } );
218 template <typename Func>
219 bool enqueue_with(Func f)
224 size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
227 cell = &m_buffer[pos & m_nBufferMask];
228 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
230 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos);
233 if ( m_posEnqueue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed ))
238 if ( pos - m_posDequeue.load( memory_model::memory_order_relaxed ) == capacity() )
239 return false; // queue full
241 pos = m_posEnqueue.load( memory_model::memory_order_relaxed );
244 pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
249 cell->sequence.store(pos + 1, memory_model::memory_order_release);
255 /// Enqueues \p val value into the queue.
257 The new queue item is created by calling placement new in free cell.
258 Returns \p true if success, \p false if the queue is full.
260 bool enqueue( value_type const& val )
262 return enqueue_with( [&val]( value_type& dest ){ new ( &dest ) value_type( val ); });
265 /// Synonym for \p enqueue()
266 bool push( value_type const& data )
268 return enqueue( data );
271 /// Synonym for \p enqueue_with()
272 template <typename Func>
273 bool push_with( Func f )
275 return enqueue_with( f );
278 /// Enqueues data of type \ref value_type constructed with <tt>std::forward<Args>(args)...</tt>
279 template <typename... Args>
280 bool emplace( Args&&... args )
282 #if (CDS_COMPILER == CDS_COMPILER_GCC) && (CDS_COMPILER_VERSION < 40900)
283 //work around unsupported feature in g++ 4.8 for forwarding parameter packs to lambda.
284 return enqueue_with ( std::bind([]( value_type& dest,Args ... args ){ new ( &dest ) value_type( std::forward<Args>(args)... );}, std::placeholders::_1 ,args...));
286 return enqueue_with( [&args ...]( value_type& dest ){ new ( &dest ) value_type( std::forward<Args>(args)... ); });
290 /// Dequeues a value using a functor
292 \p Func is a functor called to copy dequeued value.
293 The functor takes one argument - a reference to removed node:
295 cds:container::VyukovMPMCCycleQueue< Foo > myQueue;
297 myQueue.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
299 The functor is called only if the queue is not empty.
301 template <typename Func>
302 bool dequeue_with( Func f )
307 size_t pos = m_posDequeue.load( memory_model::memory_order_relaxed );
310 cell = &m_buffer[pos & m_nBufferMask];
311 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
312 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
315 if ( m_posDequeue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed))
320 if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
321 return false; // queue empty
323 pos = m_posDequeue.load( memory_model::memory_order_relaxed );
326 pos = m_posDequeue.load(memory_model::memory_order_relaxed);
330 value_cleaner()( cell->data );
331 cell->sequence.store( pos + m_nBufferMask + 1, memory_model::memory_order_release );
337 /// Dequeues a value from the queue
339 If queue is not empty, the function returns \p true, \p dest contains copy of
340 dequeued value. The assignment operator for type \ref value_type is invoked.
341 If queue is empty, the function returns \p false, \p dest is unchanged.
343 bool dequeue(value_type & dest )
345 return dequeue_with( [&dest]( value_type& src ){ dest = src; } );
348 /// Synonym for \p dequeue()
349 bool pop(value_type& data)
351 return dequeue(data);
354 /// Synonym for \p dequeue_with()
355 template <typename Func>
356 bool pop_with( Func f )
358 return dequeue_with( f );
361 /// Checks if the queue is empty
364 const cell_type * cell;
367 size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
370 cell = &m_buffer[pos & m_nBufferMask];
371 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
372 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
377 if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
381 pos = m_posDequeue.load(memory_model::memory_order_relaxed);
392 /// Returns queue's item count
394 The value returned depends on \p vyukov_queue::traits::item_counter option.
395 For \p atomicity::empty_item_counter, the function always returns 0.
399 return m_ItemCounter.value();
402 /// Returns capacity of the queue
403 size_t capacity() const
405 return m_buffer.capacity();
408 }} // namespace cds::container
410 #endif // #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H