2 This file is a part of libcds - Concurrent Data Structures library
4 (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
6 Source code repo: http://github.com/khizmax/libcds/
7 Download: http://sourceforge.net/projects/libcds/files/
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
12 * Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
32 #define CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
34 #include <cds/container/details/base.h>
35 #include <cds/opt/buffer.h>
36 #include <cds/opt/value_cleaner.h>
37 #include <cds/algo/atomic.h>
38 #include <cds/details/bounded_container.h>
40 namespace cds { namespace container {
42 /// VyukovMPMCCycleQueue related definitions
43 /** @ingroup cds_nonintrusive_helper
45 namespace vyukov_queue {
47 /// VyukovMPMCCycleQueue default traits
49 /// Buffer type for internal array
51 The type of element for the buffer is not important: the queue rebinds
52 the buffer for required type via \p rebind metafunction.
54 For \p VyukovMPMCCycleQueue queue the buffer size should have power-of-2 size.
56 You should use only uninitialized buffer for the queue -
57 \p cds::opt::v::uninitialized_dynamic_buffer (the default),
58 \p cds::opt::v::uninitialized_static_buffer.
60 typedef cds::opt::v::uninitialized_dynamic_buffer< void * > buffer;
62 /// A functor to clean item dequeued.
64 The functor calls the destructor for queue item.
65 After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied.
66 If \p T is a complex type, \p value_cleaner may be useful feature.
68 Default value is \ref opt::v::destruct_cleaner
70 typedef cds::opt::v::destruct_cleaner value_cleaner;
72 /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
73 typedef cds::atomicity::empty_item_counter item_counter;
75 /// C++ memory ordering model
77 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
78 or \p opt::v::sequential_consistent (sequentially consistent memory model).
80 typedef opt::v::relaxed_ordering memory_model;
82 /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
83 enum { padding = opt::cache_line_padding };
86 typedef cds::backoff::Default back_off;
88 /// Single-consumer version
90 For single-consumer version of algorithm some additional functions
91 (\p front(), \p pop_front()) is available.
95 static CDS_CONSTEXPR bool const single_consumer = false;
98 /// Metafunction converting option list to \p vyukov_queue::traits
100 Supported \p Options are:
101 - \p opt::buffer - an uninitialized buffer type for internal cyclic array. Possible types are:
102 \p opt::v::uninitialized_dynamic_buffer (the default), \p opt::v::uninitialized_static_buffer. The type of
103 element in the buffer is not important: it will be changed via \p rebind metafunction.
104 - \p opt::value_cleaner - a functor to clean item dequeued.
105 The functor calls the destructor for queue item.
106 After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied.
107 If \p T is a complex type, \p value_cleaner can be an useful feature.
108 Default value is \ref opt::v::destruct_cleaner
109 - \p opt::back_off - back-off strategy used. If the option is not specified, the \p cds::backoff::Default is used.
110 - \p opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
111 To enable item counting use \p cds::atomicity::item_counter
112 - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
113 - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
114 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
116 Example: declare \p %VyukovMPMCCycleQueue with item counting and static iternal buffer of size 1024:
118 typedef cds::container::VyukovMPMCCycleQueue< Foo,
119 typename cds::container::vyukov_queue::make_traits<
120 cds::opt::buffer< cds::opt::v::uninitialized_static_buffer< void *, 1024 >,
121 cds::opt::item_counte< cds::atomicity::item_counter >
126 template <typename... Options>
128 # ifdef CDS_DOXYGEN_INVOKED
129 typedef implementation_defined type; ///< Metafunction result
131 typedef typename cds::opt::make_options<
132 typename cds::opt::find_type_traits< traits, Options... >::type
138 } //namespace vyukov_queue
140 /// Vyukov's MPMC bounded queue
141 /** @ingroup cds_nonintrusive_queue
142 This algorithm is developed by Dmitry Vyukov (see http://www.1024cores.net)
143 It's multi-producer multi-consumer (MPMC), array-based, fails on overflow, does not require GC, w/o priorities, causal FIFO,
144 blocking producers and consumers queue. The algorithm is pretty simple and fast. It's not lock-free in the official meaning,
145 just implemented by means of atomic RMW operations w/o mutexes.
147 The cost of enqueue/dequeue is 1 CAS per operation.
148 No dynamic memory allocation/management during operation. Producers and consumers are separated from each other (as in the two-lock queue),
149 i.e. do not touch the same data while queue is not empty.
151 There is multiple producer/single consumer version \p cds::container::VyukovMPSCCycleQueue
152 that supports \p front() and \p pop_front() functions.
155 - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
158 - \p T - type stored in queue.
159 - \p Traits - queue traits, default is \p vyukov_queue::traits. You can use \p vyukov_queue::make_traits
160 metafunction to make your traits or just derive your traits from \p %vyukov_queue::traits:
162 struct myTraits: public cds::container::vyukov_queue::traits {
163 typedef cds::atomicity::item_counter item_counter;
165 typedef cds::container::VyukovMPMCCycleQueue< Foo, myTraits > myQueue;
167 // Equivalent make_traits example:
168 typedef cds::container::VyukovMPMCCycleQueue< cds::gc::HP, Foo,
169 typename cds::container::vyukov_queue::make_traits<
170 cds::opt::item_counter< cds::atomicity::item_counter >
176 Simplified BSD license by Dmitry Vyukov (http://www.1024cores.net/site/1024cores/home/code-license)
178 template <typename T, typename Traits = vyukov_queue::traits >
179 class VyukovMPMCCycleQueue : public cds::bounded_container
182 typedef T value_type; ///< Value type to be stored in the queue
183 typedef Traits traits; ///< Queue traits
184 typedef typename traits::item_counter item_counter; ///< Item counter type
185 typedef typename traits::memory_model memory_model; ///< Memory ordering. See \p cds::opt::memory_model option
186 typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p vyukov_queue::traits::value_cleaner
187 typedef typename traits::back_off back_off; ///< back-off strategy
189 /// \p true for single-consumer version, \p false otherwise
190 static CDS_CONSTEXPR bool const c_single_consumer = traits::single_consumer;
192 /// Rebind template arguments
193 template <typename T2, typename Traits2>
195 typedef VyukovMPMCCycleQueue< T2, Traits2 > other ; ///< Rebinding result
200 typedef atomics::atomic<size_t> sequence_type;
203 sequence_type sequence;
210 typedef typename traits::buffer::template rebind<cell_type>::other buffer;
216 size_t const m_nBufferMask;
217 typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad1_;
218 sequence_type m_posEnqueue;
219 typename opt::details::apply_padding< sequence_type, traits::padding >::padding_type pad2_;
220 sequence_type m_posDequeue;
221 typename opt::details::apply_padding< sequence_type, traits::padding >::padding_type pad3_;
222 item_counter m_ItemCounter;
226 /// Constructs the queue of capacity \p nCapacity
228 For \p cds::opt::v::uninitialized_static_buffer the \p nCapacity parameter is ignored.
230 The buffer capacity must be the power of two.
232 VyukovMPMCCycleQueue(
235 : m_buffer( nCapacity )
236 , m_nBufferMask( m_buffer.capacity() - 1 )
238 nCapacity = m_buffer.capacity();
240 // Buffer capacity must be power of 2
241 assert( nCapacity >= 2 && (nCapacity & (nCapacity - 1)) == 0 );
243 for (size_t i = 0; i != nCapacity; ++i )
244 m_buffer[i].sequence.store(i, memory_model::memory_order_relaxed);
246 m_posEnqueue.store(0, memory_model::memory_order_relaxed);
247 m_posDequeue.store(0, memory_model::memory_order_relaxed);
250 ~VyukovMPMCCycleQueue()
255 /// Enqueues data to the queue using a functor
257 \p Func is a functor called to copy a value to the queue cell.
258 The functor \p f takes one argument - a reference to a empty cell of type \ref value_type :
260 cds::container::VyukovMPMCCycleQueue< Foo > myQueue;
262 myQueue.enqueue_with( [&bar]( Foo& dest ) { dest = std::move(bar); } );
265 template <typename Func>
266 bool enqueue_with(Func f)
271 size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
274 cell = &m_buffer[pos & m_nBufferMask];
275 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
277 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos);
280 if ( m_posEnqueue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed ))
285 if ( pos - m_posDequeue.load( memory_model::memory_order_relaxed ) == capacity())
286 return false; // queue full
288 pos = m_posEnqueue.load( memory_model::memory_order_relaxed );
291 pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
296 cell->sequence.store(pos + 1, memory_model::memory_order_release);
302 /// Enqueues \p val value into the queue.
304 The new queue item is created by calling placement new in free cell.
305 Returns \p true if success, \p false if the queue is full.
307 bool enqueue( value_type const& val )
309 return enqueue_with( [&val]( value_type& dest ){ new ( &dest ) value_type( val ); });
312 /// Enqueues \p val value into the queue, move semantics
313 bool enqueue( value_type&& val )
315 return enqueue_with( [&val]( value_type& dest ) { new (&dest) value_type( std::move( val ));});
318 /// Synonym for \p enqueue( valuetype const& )
319 bool push( value_type const& data )
321 return enqueue( data );
324 /// Synonym for \p enqueue( value_type&& )
325 bool push( value_type&& data )
327 return enqueue( std::move( data ));
330 /// Synonym for \p enqueue_with()
331 template <typename Func>
332 bool push_with( Func f )
334 return enqueue_with( f );
337 /// Enqueues data of type \ref value_type constructed with <tt>std::forward<Args>(args)...</tt>
338 template <typename... Args>
339 bool emplace( Args&&... args )
341 #if (CDS_COMPILER == CDS_COMPILER_GCC) && (CDS_COMPILER_VERSION < 40900)
342 //work around unsupported feature in g++ 4.8 for forwarding parameter packs to lambda.
343 value_type val( std::forward<Args>(args)... );
344 return enqueue_with( [&val]( value_type& dest ){ new ( &dest ) value_type( std::move( val )); });
346 return enqueue_with( [&args ...]( value_type& dest ){ new ( &dest ) value_type( std::forward<Args>( args )... ); });
350 /// Dequeues a value using a functor
352 \p Func is a functor called to copy dequeued value.
353 The functor takes one argument - a reference to removed node:
355 cds:container::VyukovMPMCCycleQueue< Foo > myQueue;
357 myQueue.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
359 The functor is called only if the queue is not empty.
361 template <typename Func>
362 bool dequeue_with( Func f )
367 size_t pos = m_posDequeue.load( memory_model::memory_order_relaxed );
370 cell = &m_buffer[pos & m_nBufferMask];
371 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
372 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
375 if ( m_posDequeue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed))
380 if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
381 return false; // queue empty
383 pos = m_posDequeue.load( memory_model::memory_order_relaxed );
386 pos = m_posDequeue.load(memory_model::memory_order_relaxed);
390 value_cleaner()( cell->data );
391 cell->sequence.store( pos + m_nBufferMask + 1, memory_model::memory_order_release );
397 /// Dequeues a value from the queue
399 If queue is not empty, the function returns \p true, \p dest contains a copy of
400 dequeued value. The assignment operator for type \ref value_type is invoked.
401 If queue is empty, the function returns \p false, \p dest is unchanged.
403 bool dequeue(value_type& dest )
405 return dequeue_with( [&dest]( value_type& src ){ dest = std::move( src );});
408 /// Synonym for \p dequeue()
409 bool pop(value_type& data)
411 return dequeue(data);
414 /// Synonym for \p dequeue_with()
415 template <typename Func>
416 bool pop_with( Func f )
418 return dequeue_with( f );
421 /// Returns a pointer to top element of the queue or \p nullptr if queue is empty (only for single-consumer version)
422 template <bool SC = c_single_consumer >
423 typename std::enable_if<SC, value_type *>::type front()
425 static_assert( c_single_consumer, "front() is enabled only if traits::single_consumer is true");
430 size_t pos = m_posDequeue.load( memory_model::memory_order_relaxed );
433 cell = &m_buffer[pos & m_nBufferMask];
434 size_t seq = cell->sequence.load( memory_model::memory_order_acquire );
435 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
439 else if ( dif < 0 ) {
441 if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
442 return nullptr; // queue empty
444 pos = m_posDequeue.load( memory_model::memory_order_relaxed );
447 pos = m_posDequeue.load( memory_model::memory_order_relaxed );
451 /// Pops top element; returns \p true if queue is not empty, \p false otherwise (only for single-consumer version)
452 template <bool SC = c_single_consumer >
453 typename std::enable_if<SC, bool>::type pop_front()
455 return dequeue_with( []( value_type& ) {} );
458 /// Checks if the queue is empty
461 const cell_type * cell;
464 size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
467 cell = &m_buffer[pos & m_nBufferMask];
468 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
469 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
474 if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
478 pos = m_posDequeue.load(memory_model::memory_order_relaxed);
489 /// Returns queue's item count
491 The value returned depends on \p vyukov_queue::traits::item_counter option.
492 For \p atomicity::empty_item_counter, the function always returns 0.
496 return m_ItemCounter.value();
499 /// Returns capacity of the queue
500 size_t capacity() const
502 return m_buffer.capacity();
507 namespace vyukov_queue {
509 template <typename Traits>
510 struct single_consumer_traits : public Traits
512 static CDS_CONSTEXPR bool const single_consumer = true;
514 } // namespace vyukov_queue
517 /// Vyukov's queue multiple producer - single consumer version
518 template <typename T, typename Traits = vyukov_queue::traits >
519 using VyukovMPSCCycleQueue = VyukovMPMCCycleQueue< T, vyukov_queue::single_consumer_traits<Traits> >;
521 }} // namespace cds::container
523 #endif // #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H