2 This file is a part of libcds - Concurrent Data Structures library
4 (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
6 Source code repo: http://github.com/khizmax/libcds/
7 Download: http://sourceforge.net/projects/libcds/files/
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
12 * Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
32 #define CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
34 #include <cds/container/details/base.h>
35 #include <cds/opt/buffer.h>
36 #include <cds/opt/value_cleaner.h>
37 #include <cds/algo/atomic.h>
38 #include <cds/details/bounded_container.h>
40 namespace cds { namespace container {
42 /// VyukovMPMCCycleQueue related definitions
43 /** @ingroup cds_nonintrusive_helper
45 namespace vyukov_queue {
47 /// VyukovMPMCCycleQueue default traits
49 /// Buffer type for internal array
51 The type of element for the buffer is not important: the queue rebinds
52 buffer for required type via \p rebind metafunction.
54 For \p VyukovMPMCCycleQueue the buffer size must be a power of 2.
56 typedef cds::opt::v::dynamic_buffer< void * > buffer;
58 /// A functor to clean item dequeued.
60 The functor calls the destructor for queue item.
61 After an item is dequeued, \p value_cleaner cleans the cell that the item occupied.
62 If \p T is a complex type, \p value_cleaner may be a useful feature.
64 Default value is \ref opt::v::destruct_cleaner
66 typedef cds::opt::v::destruct_cleaner value_cleaner;
68 /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
69 typedef cds::atomicity::empty_item_counter item_counter;
71 /// C++ memory ordering model
73 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
74 or \p opt::v::sequential_consistent (sequentially consistent memory model).
76 typedef opt::v::relaxed_ordering memory_model;
78 /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
79 enum { padding = opt::cache_line_padding };
/// Back-off strategy; the default is \p cds::backoff::Default
82 typedef cds::backoff::Default back_off;
84 /// Single-consumer version
86 For the single-consumer version of the algorithm some additional functions
88 (\p front(), \p pop_front()) are available.
92 static CDS_CONSTEXPR bool const single_consumer = false;
95 /// Metafunction converting option list to \p vyukov_queue::traits
97 Supported \p Options are:
98 - \p opt::buffer - the buffer type for internal cyclic array. Possible types are:
99 \p opt::v::dynamic_buffer (the default), \p opt::v::static_buffer. The type of
100 element in the buffer is not important: it will be changed via \p rebind metafunction.
101 - \p opt::value_cleaner - a functor to clean item dequeued.
102 The functor calls the destructor for queue item.
103 After an item is dequeued, \p value_cleaner cleans the cell that the item occupied.
104 If \p T is a complex type, \p value_cleaner can be a useful feature.
105 Default value is \ref opt::v::destruct_cleaner
106 - \p opt::back_off - back-off strategy used. If the option is not specified, the \p cds::backoff::Default is used.
107 - \p opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
108 To enable item counting use \p cds::atomicity::item_counter
109 - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
110 - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
111 or \p opt::v::sequential_consistent (sequentially consistent memory model).
113 Example: declare \p %VyukovMPMCCycleQueue with item counting and static internal buffer of size 1024:
115 typedef cds::container::VyukovMPMCCycleQueue< Foo,
116 typename cds::container::vyukov_queue::make_traits<
117 cds::opt::buffer< cds::opt::v::static_buffer< void *, 1024 > >,
118 cds::opt::item_counter< cds::atomicity::item_counter >
123 template <typename... Options>
// Doxygen-only branch: the result type is documented as an opaque placeholder.
125 # ifdef CDS_DOXYGEN_INVOKED
126 typedef implementation_defined type; ///< Metafunction result
// Real result: cds::opt::make_options applied to the type produced by
// find_type_traits< traits, Options... >.
// NOTE(review): the remaining make_options arguments are not visible in this excerpt.
128 typedef typename cds::opt::make_options<
129 typename cds::opt::find_type_traits< traits, Options... >::type
135 } //namespace vyukov_queue
137 /// Vyukov's MPMC bounded queue
138 /** @ingroup cds_nonintrusive_queue
139 This algorithm is developed by Dmitry Vyukov (see http://www.1024cores.net)
140 It's multi-producer multi-consumer (MPMC), array-based, fails on overflow, does not require GC, w/o priorities, causal FIFO,
141 blocking producers and consumers queue. The algorithm is pretty simple and fast. It's not lock-free in the official meaning,
142 just implemented by means of atomic RMW operations w/o mutexes.
144 The cost of enqueue/dequeue is 1 CAS per operation.
145 No dynamic memory allocation/management during operation. Producers and consumers are separated from each other (as in the two-lock queue),
146 i.e. do not touch the same data while queue is not empty.
148 There is a multiple-producer/single-consumer version \p cds::container::VyukovMPSCCycleQueue
149 that supports \p front() and \p pop_front() functions.
152 - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
155 - \p T - type stored in queue.
156 - \p Traits - queue traits, default is \p vyukov_queue::traits. You can use \p vyukov_queue::make_traits
157 metafunction to make your traits or just derive your traits from \p %vyukov_queue::traits:
159 struct myTraits: public cds::container::vyukov_queue::traits {
160 typedef cds::atomicity::item_counter item_counter;
162 typedef cds::container::VyukovMPMCCycleQueue< Foo, myTraits > myQueue;
164 // Equivalent make_traits example:
165 typedef cds::container::VyukovMPMCCycleQueue< Foo,
166 typename cds::container::vyukov_queue::make_traits<
167 cds::opt::item_counter< cds::atomicity::item_counter >
173 Simplified BSD license by Dmitry Vyukov (http://www.1024cores.net/site/1024cores/home/code-license)
175 template <typename T, typename Traits = vyukov_queue::traits >
176 class VyukovMPMCCycleQueue : public cds::bounded_container
179 typedef T value_type; ///< Value type to be stored in the queue
180 typedef Traits traits; ///< Queue traits
181 typedef typename traits::item_counter item_counter; ///< Item counter type
182 typedef typename traits::memory_model memory_model; ///< Memory ordering. See cds::opt::memory_model option
183 typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p vyukov_queue::traits::value_cleaner
184 typedef typename traits::back_off back_off; ///< back-off strategy
186 /// \p true for single-consumer version, \p false otherwise
187 static CDS_CONSTEXPR bool const c_single_consumer = traits::single_consumer;
189 /// Rebind template arguments
190 template <typename T2, typename Traits2>
192 typedef VyukovMPMCCycleQueue< T2, Traits2 > other ; ///< Rebinding result
// Atomic counter type shared by the per-cell sequence number and by the
// enqueue/dequeue positions (m_posEnqueue, m_posDequeue).
197 typedef atomics::atomic<size_t> sequence_type;
// Per-cell sequence number: initialised to the cell index by the constructor,
// then compared against a position by enqueue_with()/dequeue_with().
200 sequence_type sequence;
// Buffer type taken from the traits and rebound so that its elements are cell_type.
207 typedef typename traits::buffer::template rebind<cell_type>::other buffer;
// capacity() - 1 (set in the constructor); `pos & m_nBufferMask` maps a position to a buffer index.
213 size_t const m_nBufferMask;
// Padding sized by traits::padding (default opt::cache_line_padding) placed between the fields below.
214 typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad1_;
// Position advanced by enqueue_with() via compare_exchange_weak.
215 sequence_type m_posEnqueue;
216 typename opt::details::apply_padding< sequence_type, traits::padding >::padding_type pad2_;
// Position advanced by dequeue_with() via compare_exchange_weak.
217 sequence_type m_posDequeue;
218 typename opt::details::apply_padding< sequence_type, traits::padding >::padding_type pad3_;
// Item counter selected by traits::item_counter; its value() is what size() returns.
219 item_counter m_ItemCounter;
223 /// Constructs the queue of capacity \p nCapacity
225 For \p cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
227 The buffer capacity must be a power of two.
229 VyukovMPMCCycleQueue(
232 : m_buffer( nCapacity )
// Mask is taken from the buffer's actual capacity, not from the argument.
233 , m_nBufferMask( m_buffer.capacity() - 1 )
// From here on work with the real capacity; per the documentation a static
// buffer ignores the nCapacity argument.
235 nCapacity = m_buffer.capacity();
237 // Buffer capacity must be power of 2
238 assert( nCapacity >= 2 && (nCapacity & (nCapacity - 1)) == 0 );
// Cell i starts with sequence i, so for position i enqueue_with() computes
// seq - pos == 0 on the first lap over the buffer.
240 for (size_t i = 0; i != nCapacity; ++i )
241 m_buffer[i].sequence.store(i, memory_model::memory_order_relaxed);
// Both positions start at 0.
243 m_posEnqueue.store(0, memory_model::memory_order_relaxed);
244 m_posDequeue.store(0, memory_model::memory_order_relaxed);
247 ~VyukovMPMCCycleQueue()
252 /// Enqueues data to the queue using a functor
254 \p Func is a functor called to copy a value to the queue cell.
255 The functor \p f takes one argument - a reference to an empty cell of type \ref value_type :
257 cds::container::VyukovMPMCCycleQueue< Foo > myQueue;
259 myQueue.enqueue_with( [&bar]( Foo& dest ) { dest = std::move(bar); } );
// NOTE(review): the loop header and the branch conditions on `dif` are not
// visible in this excerpt; the comments below describe only the visible statements.
262 template <typename Func>
263 bool enqueue_with(Func f)
// Snapshot of the current enqueue position.
268 size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
// Cell for this position; the mask wraps the position onto the buffer.
271 cell = &m_buffer[pos & m_nBufferMask];
272 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
// Signed distance between the cell's sequence number and the position to claim.
274 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos);
// Try to claim position pos by advancing m_posEnqueue to pos + 1.
277 if ( m_posEnqueue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed ))
// The enqueue position is a full capacity() ahead of the dequeue position:
// there is no free cell, so the call fails.
282 if ( pos - m_posDequeue.load( memory_model::memory_order_relaxed ) == capacity() )
283 return false; // queue full
// Refresh pos from m_posEnqueue.
285 pos = m_posEnqueue.load( memory_model::memory_order_relaxed );
288 pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
// Publish the cell: its sequence becomes pos + 1, which is the value
// dequeue_with() compares against. The release store pairs with the acquire
// load of `sequence` on the consumer side.
293 cell->sequence.store(pos + 1, memory_model::memory_order_release);
299 /// Enqueues \p val value into the queue.
301 The new queue item is created by calling placement new in free cell.
302 Returns \p true if success, \p false if the queue is full.
304 bool enqueue( value_type const& val )
// The functor copy-constructs val in the cell handed over by enqueue_with(),
// using placement new rather than assignment.
306 return enqueue_with( [&val]( value_type& dest ){ new ( &dest ) value_type( val ); });
309 /// Synonym for \p enqueue()
310 bool push( value_type const& data )
// Pure forwarder: returns whatever enqueue( data ) returns.
312 return enqueue( data );
315 /// Synonym for \p enqueue_with()
316 template <typename Func>
317 bool push_with( Func f )
// Pure forwarder: passes the functor on to enqueue_with() and returns its result.
319 return enqueue_with( f );
322 /// Enqueues data of type \ref value_type constructed with <tt>std::forward<Args>(args)...</tt>
323 template <typename... Args>
324 bool emplace( Args&&... args )
326 #if (CDS_COMPILER == CDS_COMPILER_GCC) && (CDS_COMPILER_VERSION < 40900)
327 //work around unsupported feature in g++ 4.8 for forwarding parameter packs to lambda.
// In this branch the arguments are passed to std::bind as lvalues and the bound
// lambda takes them by value (Args ... args), so they are copied rather than
// perfectly forwarded.
328 return enqueue_with ( std::bind([]( value_type& dest,Args ... args ){ new ( &dest ) value_type( std::forward<Args>(args)... );}, std::placeholders::_1 ,args...));
// Regular branch: the pack is captured by reference and forwarded into the
// constructor call made by placement new in the cell.
330 return enqueue_with( [&args ...]( value_type& dest ){ new ( &dest ) value_type( std::forward<Args>(args)... ); });
334 /// Dequeues a value using a functor
336 \p Func is a functor called to copy dequeued value.
337 The functor takes one argument - a reference to removed node:
339 cds::container::VyukovMPMCCycleQueue< Foo > myQueue;
341 myQueue.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
343 The functor is called only if the queue is not empty.
// NOTE(review): the loop header and the branch conditions on `dif` are not
// visible in this excerpt; the comments below describe only the visible statements.
345 template <typename Func>
346 bool dequeue_with( Func f )
// Snapshot of the current dequeue position.
351 size_t pos = m_posDequeue.load( memory_model::memory_order_relaxed );
// Cell for this position; the mask wraps the position onto the buffer.
354 cell = &m_buffer[pos & m_nBufferMask];
355 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
// Signed distance between the cell's sequence and pos + 1, the value
// enqueue_with() stores after filling the cell.
356 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
// Try to claim position pos by advancing m_posDequeue to pos + 1.
359 if ( m_posDequeue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed))
// The dequeue position equals the enqueue position: nothing to take.
364 if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
365 return false; // queue empty
// Refresh pos from m_posDequeue.
367 pos = m_posDequeue.load( memory_model::memory_order_relaxed );
370 pos = m_posDequeue.load(memory_model::memory_order_relaxed);
// Clean the vacated cell with the traits' value_cleaner.
374 value_cleaner()( cell->data );
// Hand the cell back to producers: pos + m_nBufferMask + 1 equals pos + capacity,
// i.e. the enqueue position that maps to this cell on the next lap.
375 cell->sequence.store( pos + m_nBufferMask + 1, memory_model::memory_order_release );
381 /// Dequeues a value from the queue
383 If queue is not empty, the function returns \p true, \p dest contains copy of
384 dequeued value. The assignment operator for type \ref value_type is invoked.
385 If queue is empty, the function returns \p false, \p dest is unchanged.
387 bool dequeue(value_type & dest )
// Copy-assigns the cell's value into dest.
// NOTE(review): dequeue_with() runs value_cleaner on the cell right after the
// functor, so a move-assignment might avoid a copy here - confirm before changing,
// since the documented contract says the assignment operator produces a copy.
389 return dequeue_with( [&dest]( value_type& src ){ dest = src; } );
392 /// Synonym for \p dequeue()
393 bool pop(value_type& data)
// Pure forwarder: returns whatever dequeue( data ) returns.
395 return dequeue(data);
398 /// Synonym for \p dequeue_with()
399 template <typename Func>
400 bool pop_with( Func f )
// Pure forwarder: passes the functor on to dequeue_with() and returns its result.
402 return dequeue_with( f );
405 /// Returns a pointer to top element of the queue or \p nullptr if queue is empty (only for single-consumer version)
// Enabled through std::enable_if only when SC is true; SC defaults to c_single_consumer.
// NOTE(review): the loop header, the first branch on `dif` and the successful
// return are not visible in this excerpt.
406 template <bool SC = c_single_consumer >
407 typename std::enable_if<SC, value_type *>::type front()
// The assert tests c_single_consumer itself rather than SC, so explicitly
// instantiating front<true>() on a multi-consumer queue is still rejected.
409 static_assert( c_single_consumer, "front() is enabled only if traits::single_consumer is true");
// Current dequeue position; the visible code only reads it.
414 size_t pos = m_posDequeue.load( memory_model::memory_order_relaxed );
// Cell for this position; the mask wraps the position onto the buffer.
417 cell = &m_buffer[pos & m_nBufferMask];
418 size_t seq = cell->sequence.load( memory_model::memory_order_acquire );
// Same distance computation as in dequeue_with(): sequence versus pos + 1.
419 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
423 else if ( dif < 0 ) {
// The dequeue position equals the enqueue position: no front element.
425 if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
426 return nullptr; // queue empty
// Refresh pos from m_posDequeue.
428 pos = m_posDequeue.load( memory_model::memory_order_relaxed );
431 pos = m_posDequeue.load( memory_model::memory_order_relaxed );
435 /// Pops top element; returns \p true if queue is not empty, \p false otherwise (only for single-consumer version)
// Enabled through std::enable_if only when SC is true; SC defaults to c_single_consumer.
436 template <bool SC = c_single_consumer >
437 typename std::enable_if<SC, bool>::type pop_front()
// Dequeues with a functor that ignores the value, i.e. the item is simply dropped.
439 return dequeue_with( []( value_type& ) {} );
442 /// Checks if the queue is empty
// NOTE(review): the function signature, the loop header, the branch conditions
// and the return statements are not visible in this excerpt.
445 const cell_type * cell;
// Snapshot of the current dequeue position; the visible code only reads it.
448 size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
// Cell for this position; the mask wraps the position onto the buffer.
451 cell = &m_buffer[pos & m_nBufferMask];
452 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
// Same distance computation as in dequeue_with(): sequence versus pos + 1.
453 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
// Tests whether the dequeue position equals the enqueue position.
458 if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
// Refresh pos from m_posDequeue.
462 pos = m_posDequeue.load(memory_model::memory_order_relaxed);
473 /// Returns queue's item count
475 The value returned depends on \p vyukov_queue::traits::item_counter option.
476 For \p atomicity::empty_item_counter, the function always returns 0.
480 return m_ItemCounter.value();
483 /// Returns capacity of the queue
484 size_t capacity() const
// Reported by the underlying buffer; this is the same value the constructor
// derives m_nBufferMask from.
486 return m_buffer.capacity();
491 namespace vyukov_queue {
// Traits adaptor: inherits everything from Traits and redefines
// single_consumer as true.
493 template <typename Traits>
494 struct single_consumer_traits : public Traits
496 static CDS_CONSTEXPR bool const single_consumer = true;
498 } // namespace vyukov_queue
501 /// Vyukov's queue multiple producer - single consumer version
// Same queue class with Traits wrapped in single_consumer_traits, so that
// c_single_consumer is true and front() / pop_front() are enabled.
503 template <typename T, typename Traits = vyukov_queue::traits >
504 using VyukovMPSCCycleQueue = VyukovMPMCCycleQueue< T, vyukov_queue::single_consumer_traits<Traits> >;
506 }} // namespace cds::container
508 #endif // #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H