2 This file is a part of libcds - Concurrent Data Structures library
4 (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
6 Source code repo: http://github.com/khizmax/libcds/
7 Download: http://sourceforge.net/projects/libcds/files/
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
12 * Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
32 #define CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
34 #include <cds/container/details/base.h>
35 #include <cds/opt/buffer.h>
36 #include <cds/opt/value_cleaner.h>
37 #include <cds/algo/atomic.h>
38 #include <cds/details/bounded_container.h>
40 namespace cds { namespace container {
42 /// VyukovMPMCCycleQueue related definitions
43 /** @ingroup cds_nonintrusive_helper
45 namespace vyukov_queue {
47 /// VyukovMPMCCycleQueue default traits
49 /// Buffer type for internal array
51 The type of element for the buffer is not important: the queue rebinds
52 buffer for required type via \p rebind metafunction.
54 For \p VyukovMPMCCycleQueue the buffer size must be a power of 2.
56 typedef cds::opt::v::dynamic_buffer< void * > buffer;
58 /// A functor to clean item dequeued.
60 The functor calls the destructor for queue item.
61 After an item is dequeued, \p value_cleaner cleans the cell that the item occupied.
62 If \p T is a complex type, \p value_cleaner may be a useful feature.
64 Default value is \ref opt::v::destruct_cleaner
66 typedef cds::opt::v::destruct_cleaner value_cleaner;
68 /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
69 typedef cds::atomicity::empty_item_counter item_counter;
71 /// C++ memory ordering model
73 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
74 or \p opt::v::sequential_consistent (sequentially consistent memory model).
76 typedef opt::v::relaxed_ordering memory_model;
78 /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
79 enum { padding = opt::cache_line_padding };
/// Back-off strategy; the default is \p cds::backoff::Default
82 typedef cds::backoff::Default back_off;
84 /// Single-consumer version
86 For the single-consumer version of the algorithm some additional functions
87 (\p front(), \p pop_front()) are available.
91 static CDS_CONSTEXPR bool const single_consumer = false;
94 /// Metafunction converting option list to \p vyukov_queue::traits
96 Supported \p Options are:
97 - \p opt::buffer - the buffer type for internal cyclic array. Possible types are:
98 \p opt::v::dynamic_buffer (the default), \p opt::v::static_buffer. The type of
99 element in the buffer is not important: it will be changed via \p rebind metafunction.
100 - \p opt::value_cleaner - a functor to clean item dequeued.
101 The functor calls the destructor for queue item.
102 After an item is dequeued, \p value_cleaner cleans the cell that the item occupied.
103 If \p T is a complex type, \p value_cleaner can be a useful feature.
104 Default value is \ref opt::v::destruct_cleaner
105 - \p opt::back_off - back-off strategy used. If the option is not specified, the \p cds::backoff::Default is used.
106 - \p opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
107 To enable item counting use \p cds::atomicity::item_counter
108 - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
109 - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
110 or \p opt::v::sequential_consistent (sequentially consistent memory model).
112 Example: declare \p %VyukovMPMCCycleQueue with item counting and static internal buffer of size 1024:
114 typedef cds::container::VyukovMPMCCycleQueue< Foo,
115 typename cds::container::vyukov_queue::make_traits<
116 cds::opt::buffer< cds::opt::v::static_buffer< void *, 1024 >>,
117 cds::opt::item_counter< cds::atomicity::item_counter >
// Options is the list of cds::opt option types to apply; the result is exposed as the nested ::type.
122 template <typename... Options>
// Documentation build only: the real result type is replaced by a placeholder name.
124 # ifdef CDS_DOXYGEN_INVOKED
125 typedef implementation_defined type; ///< Metafunction result
// Regular build: the result is produced by cds::opt::make_options from the type that
// find_type_traits< traits, Options... > yields (presumably a traits type found in Options,
// falling back to the default \p traits - confirm against cds/opt).
127 typedef typename cds::opt::make_options<
128 typename cds::opt::find_type_traits< traits, Options... >::type
134 } //namespace vyukov_queue
136 /// Vyukov's MPMC bounded queue
137 /** @ingroup cds_nonintrusive_queue
138 This algorithm is developed by Dmitry Vyukov (see http://www.1024cores.net)
139 It's multi-producer multi-consumer (MPMC), array-based, fails on overflow, does not require GC, w/o priorities, causal FIFO,
140 blocking producers and consumers queue. The algorithm is pretty simple and fast. It's not lock-free in the official meaning,
141 just implemented by means of atomic RMW operations w/o mutexes.
143 The cost of enqueue/dequeue is 1 CAS per operation.
144 No dynamic memory allocation/management during operation. Producers and consumers are separated from each other (as in the two-lock queue),
145 i.e. do not touch the same data while queue is not empty.
147 There is a multiple-producer/single-consumer version \p cds::container::VyukovMPSCCycleQueue
148 that supports \p front() and \p pop_front() functions.
151 - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
154 - \p T - type stored in queue.
155 - \p Traits - queue traits, default is \p vyukov_queue::traits. You can use \p vyukov_queue::make_traits
156 metafunction to make your traits or just derive your traits from \p %vyukov_queue::traits:
158 struct myTraits: public cds::container::vyukov_queue::traits {
159 typedef cds::atomicity::item_counter item_counter;
161 typedef cds::container::VyukovMPMCCycleQueue< Foo, myTraits > myQueue;
163 // Equivalent make_traits example:
164 typedef cds::container::VyukovMPMCCycleQueue< Foo,
165 typename cds::container::vyukov_queue::make_traits<
166 cds::opt::item_counter< cds::atomicity::item_counter >
172 Simplified BSD license by Dmitry Vyukov (http://www.1024cores.net/site/1024cores/home/code-license)
// T is the element type. Traits supplies every policy the class reads below:
// item_counter, memory_model, value_cleaner, back_off, single_consumer, and
// (further down in the class) buffer and padding.
174 template <typename T, typename Traits = vyukov_queue::traits >
175 class VyukovMPMCCycleQueue : public cds::bounded_container
178 typedef T value_type; ///< Value type to be stored in the queue
179 typedef Traits traits; ///< Queue traits
180 typedef typename traits::item_counter item_counter; ///< Item counter type
181 typedef typename traits::memory_model memory_model; ///< Memory ordering. See \p cds::opt::memory_model option
182 typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p vyukov_queue::traits::value_cleaner
183 typedef typename traits::back_off back_off; ///< back-off strategy
185 /// \p true for single-consumer version, \p false otherwise
// Used as the default enable_if argument of front() and pop_front(), so those two
// members are usable only when this constant is true.
186 static CDS_CONSTEXPR bool const c_single_consumer = traits::single_consumer;
188 /// Rebind template arguments
// Maps a new (T2, Traits2) pair to the matching VyukovMPMCCycleQueue instantiation,
// published as the nested typedef \p other.
189 template <typename T2, typename Traits2>
191 typedef VyukovMPMCCycleQueue< T2, Traits2 > other ; ///< Rebinding result
// Atomic counter type shared by the per-cell sequence number and the two queue positions.
196 typedef atomics::atomic<size_t> sequence_type;
// Per-cell sequence number: set to the cell index by the constructor, to pos + 1 by
// enqueue_with() and to pos + capacity by dequeue_with().
199 sequence_type sequence;
// The traits' buffer type, rebound so that its elements are cells (cell_type).
206 typedef typename traits::buffer::template rebind<cell_type>::other buffer;
// capacity - 1 (set in the constructor); "pos & m_nBufferMask" turns a position into a cell index.
212 size_t const m_nBufferMask;
// pad1_/pad2_/pad3_: padding members of a type chosen by traits::padding, placed between
// the mask, the two positions and the item counter.
213 typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad1_;
// Position counter advanced by enqueue_with().
214 sequence_type m_posEnqueue;
215 typename opt::details::apply_padding< sequence_type, traits::padding >::padding_type pad2_;
// Position counter advanced by dequeue_with(); also read by front() and empty().
216 sequence_type m_posDequeue;
217 typename opt::details::apply_padding< sequence_type, traits::padding >::padding_type pad3_;
// Item counter of the type selected by traits::item_counter; its value() is what size() returns.
218 item_counter m_ItemCounter;
222 /// Constructs the queue of capacity \p nCapacity
224 For \p cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
226 The buffer capacity must be a power of two.
// Builds the cell buffer, then puts every cell and both positions into their initial state.
228 VyukovMPMCCycleQueue(
231 : m_buffer( nCapacity )
// The mask is derived from the capacity the buffer itself reports, not from the argument.
232 , m_nBufferMask( m_buffer.capacity() - 1 )
// From here on work with the buffer's actual capacity rather than the requested value.
234 nCapacity = m_buffer.capacity();
236 // Buffer capacity must be power of 2
// n & (n - 1) is zero only when n is a power of two (or 0, which the ">= 2" test rules out).
237 assert( nCapacity >= 2 && (nCapacity & (nCapacity - 1)) == 0 );
// Cell i starts with sequence i, so the "seq - pos" difference computed in enqueue_with()
// is 0 for position i on the first pass over the buffer.
239 for (size_t i = 0; i != nCapacity; ++i )
240 m_buffer[i].sequence.store(i, memory_model::memory_order_relaxed);
// Both positions start at 0, i.e. equal to each other (the "queue empty" condition used elsewhere).
242 m_posEnqueue.store(0, memory_model::memory_order_relaxed);
243 m_posDequeue.store(0, memory_model::memory_order_relaxed);
246 ~VyukovMPMCCycleQueue()
251 /// Enqueues data to the queue using a functor
253 \p Func is a functor called to copy a value to the queue cell.
254 The functor \p f takes one argument - a reference to an empty cell of type \ref value_type :
256 cds::container::VyukovMPMCCycleQueue< Foo > myQueue;
258 myQueue.enqueue_with( [&bar]( Foo& dest ) { dest = std::move(bar); } );
// Claims the next enqueue position and publishes the cell; returns false when the queue is full.
// NOTE(review): the retry-loop header, the branch tests on "dif" and the call of \p f are not
// visible in this excerpt; the comments below describe only the statements that are.
261 template <typename Func>
262 bool enqueue_with(Func f)
// Initial snapshot of the enqueue position.
267 size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
// Map the position onto a cell of the cyclic buffer.
270 cell = &m_buffer[pos & m_nBufferMask];
// Acquire load; the stores to "sequence" in enqueue_with()/dequeue_with() use release.
271 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
// Signed distance between the cell's sequence number and the position being tried.
273 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos);
// Try to take the position by advancing m_posEnqueue from pos to pos + 1.
276 if ( m_posEnqueue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed ))
// Full when the enqueue position is exactly capacity() ahead of the dequeue position.
281 if ( pos - m_posDequeue.load( memory_model::memory_order_relaxed ) == capacity() )
282 return false; // queue full
// Re-read the enqueue position before the next attempt.
284 pos = m_posEnqueue.load( memory_model::memory_order_relaxed );
287 pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
// Publish the cell: sequence pos + 1 is the value dequeue_with() compares against for position pos.
292 cell->sequence.store(pos + 1, memory_model::memory_order_release);
298 /// Enqueues \p val value into the queue.
300 The new queue item is created by calling placement new in free cell.
301 Returns \p true if success, \p false if the queue is full.
// Copy overload: the lambda copy-constructs \p val into the cell with placement new.
// \p val is captured by reference; the lambda is only handed to enqueue_with() and not stored here.
303 bool enqueue( value_type const& val )
305 return enqueue_with( [&val]( value_type& dest ){ new ( &dest ) value_type( val ); });
308 /// Enqueues \p val value into the queue, move semantics
// Move overload: the lambda move-constructs the cell's value from \p val with placement new.
// \p val is moved from only if enqueue_with() actually invokes the lambda.
309 bool enqueue( value_type&& val )
311 return enqueue_with( [&val]( value_type& dest ) { new (&dest) value_type( std::move( val ));});
314 /// Synonym for \p enqueue( value_type const& )
// Forwards to the copying enqueue() overload and returns its result unchanged.
315 bool push( value_type const& data )
317 return enqueue( data );
320 /// Synonym for \p enqueue( value_type&& )
// Forwards to the moving enqueue() overload; std::move keeps \p data an rvalue so that overload is selected.
321 bool push( value_type&& data )
323 return enqueue( std::move( data ));
326 /// Synonym for \p enqueue_with()
// Forwards the functor to enqueue_with() and returns its result unchanged.
327 template <typename Func>
328 bool push_with( Func f )
330 return enqueue_with( f );
333 /// Enqueues data of type \ref value_type constructed with <tt>std::forward<Args>(args)...</tt>
// Constructs a value_type from \p args and enqueues it; the result is that of enqueue_with().
// Two compile-time variants follow (the #else/#endif separating them are not visible in this excerpt).
334 template <typename... Args>
335 bool emplace( Args&&... args )
// GCC older than 4.9: build a temporary from the arguments first, then move it into the cell.
337 #if (CDS_COMPILER == CDS_COMPILER_GCC) && (CDS_COMPILER_VERSION < 40900)
338 //work around unsupported feature in g++ 4.8 for forwarding parameter packs to lambda.
339 value_type val( std::forward<Args>(args)... );
340 return enqueue_with( [&val]( value_type& dest ){ new ( &dest ) value_type( std::move( val )); });
// Other compilers: capture the pack by reference and construct the value directly in the cell.
342 return enqueue_with( [&args ...]( value_type& dest ){ new ( &dest ) value_type( std::forward<Args>( args )... ); });
346 /// Dequeues a value using a functor
348 \p Func is a functor called to copy dequeued value.
349 The functor takes one argument - a reference to removed node:
351 cds::container::VyukovMPMCCycleQueue< Foo > myQueue;
353 myQueue.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
355 The functor is called only if the queue is not empty.
// Claims the next dequeue position, cleans the cell and marks it free; returns false when the queue is empty.
// NOTE(review): the retry-loop header, the branch tests on "dif" and the call of \p f are not
// visible in this excerpt; the comments below describe only the statements that are.
357 template <typename Func>
358 bool dequeue_with( Func f )
// Initial snapshot of the dequeue position.
363 size_t pos = m_posDequeue.load( memory_model::memory_order_relaxed );
// Map the position onto a cell of the cyclic buffer.
366 cell = &m_buffer[pos & m_nBufferMask];
// Acquire load; the stores to "sequence" in enqueue_with()/dequeue_with() use release.
367 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
// Compared against pos + 1 because enqueue_with() stores pos + 1 into a cell it has filled.
368 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
// Try to take the position by advancing m_posDequeue from pos to pos + 1.
371 if ( m_posDequeue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed))
// Empty when the dequeue position has caught up with the enqueue position.
376 if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
377 return false; // queue empty
// Re-read the dequeue position before the next attempt.
379 pos = m_posDequeue.load( memory_model::memory_order_relaxed );
382 pos = m_posDequeue.load(memory_model::memory_order_relaxed);
// Apply the traits' value_cleaner to the cell's data.
386 value_cleaner()( cell->data );
// m_nBufferMask + 1 is the capacity: the cell's sequence becomes pos + capacity, which is the
// value enqueue_with() expects when it reaches this cell again one lap later.
387 cell->sequence.store( pos + m_nBufferMask + 1, memory_model::memory_order_release );
393 /// Dequeues a value from the queue
395 If queue is not empty, the function returns \p true and the dequeued value is moved into \p dest.
396 The move assignment operator for type \ref value_type is invoked (copy assignment if the type has no move assignment).
397 If queue is empty, the function returns \p false, \p dest is unchanged.
// Hands dequeue_with() a lambda that assigns std::move(src) to \p dest; the result is that of dequeue_with().
399 bool dequeue(value_type& dest )
401 return dequeue_with( [&dest]( value_type& src ){ dest = std::move( src );});
404 /// Synonym for \p dequeue()
// Forwards to dequeue() and returns its result unchanged.
405 bool pop(value_type& data)
407 return dequeue(data);
410 /// Synonym for \p dequeue_with()
// Forwards the functor to dequeue_with() and returns its result unchanged.
411 template <typename Func>
412 bool pop_with( Func f )
414 return dequeue_with( f );
417 /// Returns a pointer to top element of the queue or \p nullptr if queue is empty (only for single-consumer version)
// Reads the dequeue position and inspects the corresponding cell without advancing m_posDequeue.
// SC defaults to c_single_consumer; enable_if removes this overload when SC is false.
// NOTE(review): the loop header, the first branch on "dif" and the non-null return are not
// visible in this excerpt.
418 template <bool SC = c_single_consumer >
419 typename std::enable_if<SC, value_type *>::type front()
// Guards against callers forcing SC = true on a queue whose traits are not single-consumer.
421 static_assert( c_single_consumer, "front() is enabled only if traits::single_consumer is true");
// Snapshot of the dequeue position; nothing in the visible code modifies m_posDequeue.
426 size_t pos = m_posDequeue.load( memory_model::memory_order_relaxed );
429 cell = &m_buffer[pos & m_nBufferMask];
430 size_t seq = cell->sequence.load( memory_model::memory_order_acquire );
// Same comparison as in dequeue_with(): a filled cell at position pos carries sequence pos + 1.
431 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
435 else if ( dif < 0 ) {
// Empty when the dequeue position equals the enqueue position.
437 if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
438 return nullptr; // queue empty
// Re-read the dequeue position before the next attempt.
440 pos = m_posDequeue.load( memory_model::memory_order_relaxed );
443 pos = m_posDequeue.load( memory_model::memory_order_relaxed );
447 /// Pops top element; returns \p true if queue is not empty, \p false otherwise (only for single-consumer version)
// Runs dequeue_with() with a functor that does nothing, so the value is not copied anywhere.
// SC defaults to c_single_consumer; enable_if removes this overload when SC is false.
448 template <bool SC = c_single_consumer >
449 typename std::enable_if<SC, bool>::type pop_front()
451 return dequeue_with( []( value_type& ) {} );
454 /// Checks if the queue is empty
// Body fragment of the emptiness check: same position/sequence inspection as dequeue_with(),
// through a pointer-to-const cell.
// NOTE(review): the function header, loop header and return statements are not visible in this excerpt.
457 const cell_type * cell;
// Snapshot of the dequeue position.
460 size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
463 cell = &m_buffer[pos & m_nBufferMask];
464 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
// A filled cell at position pos carries sequence pos + 1 (stored by enqueue_with()).
465 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
// Dequeue position equal to enqueue position is the "queue empty" condition used by dequeue_with().
470 if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
// Re-read the dequeue position before the next attempt.
474 pos = m_posDequeue.load(memory_model::memory_order_relaxed);
485 /// Returns queue's item count
487 The value returned depends on \p vyukov_queue::traits::item_counter option.
488 For \p atomicity::empty_item_counter, the function always returns 0.
492 return m_ItemCounter.value();
495 /// Returns capacity of the queue
// Reports the capacity of the internal buffer; enqueue_with() uses it for its "queue full" test.
496 size_t capacity() const
498 return m_buffer.capacity();
503 namespace vyukov_queue {
// Adapter over any traits type: inherits everything from Traits and redeclares only
// single_consumer, which hides the base value and is always true.
505 template <typename Traits>
506 struct single_consumer_traits : public Traits
508 static CDS_CONSTEXPR bool const single_consumer = true;
510 } // namespace vyukov_queue
513 /// Vyukov's queue multiple producer - single consumer version
// Alias for the same queue class with Traits wrapped in single_consumer_traits, which sets
// single_consumer to true and thereby enables front() and pop_front().
514 template <typename T, typename Traits = vyukov_queue::traits >
515 using VyukovMPSCCycleQueue = VyukovMPMCCycleQueue< T, vyukov_queue::single_consumer_traits<Traits> >;
517 }} // namespace cds::container
519 #endif // #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H