3 #ifndef __CDS_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
4 #define __CDS_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
6 #include <cds/container/base.h>
7 #include <cds/opt/buffer.h>
8 #include <cds/opt/value_cleaner.h>
9 #include <cds/cxx11_atomic.h>
11 #include <cds/details/trivial_assign.h>
12 #include <cds/details/bounded_container.h>
14 namespace cds { namespace container {
16 /// Vyukov's MPMC bounded queue
17 /** @ingroup cds_nonintrusive_queue
18 This algorithm is developed by Dmitry Vyukov (see http://www.1024cores.net)
19 It's multi-producer multi-consumer (MPMC), array-based, fails on overflow, does not require GC, w/o priorities, causal FIFO,
20 blocking producers and consumers queue. The algorithm is pretty simple and fast. It's not lock-free in the official meaning,
21 just implemented by means of atomic RMW operations w/o mutexes.
23 The cost of enqueue/dequeue is 1 CAS per operation.
24 No dynamic memory allocation/management during operation. Producers and consumers are separated from each other (as in the two-lock queue),
25 i.e. do not touch the same data while queue is not empty.
28 http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
30 \par Template parameters
31 \li \p T - type stored in queue.
32 \li \p Options - queue's options
34 Options \p Options are:
35 - opt::buffer - buffer to store items. Mandatory option, see option description for full list of possible types.
36 - opt::item_counter - the type of item counting feature. Default is \ref atomicity::empty_item_counter
37 - opt::value_cleaner - a functor to clean item dequeued. Default value is \ref opt::v::destruct_cleaner
38 that calls the destructor of type \p T.
39 After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied. If \p T
40 is a complex type, \p value_cleaner may be the useful feature.
41 - opt::alignment - the alignment for internal queue data. Default is opt::cache_line_alignment
42 - opt::memory_model - C++ memory ordering model. Can be opt::v::relaxed_ordering (relaxed memory model, the default)
43 or opt::v::sequential_consistent (sequentially consisnent memory model).
46 Simplified BSD license by Dmitry Vyukov (http://www.1024cores.net/site/1024cores/home/code-license)
50 #include <cds/container/vyukov_mpmc_cycle_queue.h>
52 // // Queue with 1024 item static buffer
53 cds::container::vyukov_mpmc_bounded<
55 ,cds::opt::buffer< cds::opt::v::static_buffer<int, 1024> >
59 template <typename T, CDS_DECL_OPTIONS6>
60 class VyukovMPMCCycleQueue
61 : public cds::bounded_container
65 struct default_options
67 typedef cds::opt::v::destruct_cleaner value_cleaner;
68 typedef atomicity::empty_item_counter item_counter;
69 typedef opt::v::empty_disposer disposer ; // for intrusive version only
70 typedef opt::v::relaxed_ordering memory_model;
71 enum { alignment = opt::cache_line_alignment };
77 typedef typename opt::make_options<
78 typename cds::opt::find_type_traits< default_options, CDS_OPTIONS6 >::type
85 typedef typename options::value_cleaner value_cleaner;
89 typedef T value_type ; ///< @anchor cds_container_VyukovMPMCCycleQueue_value_type type of value stored in the queue
90 typedef typename options::item_counter item_counter ; ///< Item counter type
91 typedef typename options::memory_model memory_model ; ///< Memory ordering. See cds::opt::memory_model option
93 /// Rebind template arguments
94 template <typename T2, CDS_DECL_OTHER_OPTIONS6>
96 typedef VyukovMPMCCycleQueue< T2, CDS_OTHER_OPTIONS6> other ; ///< Rebinding result
101 typedef atomics::atomic<size_t> sequence_type;
104 sequence_type sequence;
111 # ifndef CDS_CXX11_LAMBDA_SUPPORT
112 struct copy_construct {
113 void operator()( value_type& dest, value_type const& src )
115 new ( &dest ) value_type( src );
120 typedef cds::details::trivial_assign< value_type, value_type > copy_assign;
122 typedef typename options::buffer::template rebind<cell_type>::other buffer;
123 typedef typename opt::details::alignment_setter< sequence_type, options::alignment >::type aligned_sequence_type;
124 typedef typename opt::details::alignment_setter< buffer, options::alignment >::type aligned_buffer;
129 aligned_buffer m_buffer;
130 size_t const m_nBufferMask;
131 aligned_sequence_type m_posEnqueue;
132 aligned_sequence_type m_posDequeue;
133 item_counter m_ItemCounter;
137 /// Constructs the queue of capacity \p nCapacity
139 For cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
141 VyukovMPMCCycleQueue(
144 : m_buffer( nCapacity )
145 , m_nBufferMask( m_buffer.capacity() - 1 )
147 nCapacity = m_buffer.capacity();
149 // Buffer capacity must be power of 2
150 assert( nCapacity >= 2 && (nCapacity & (nCapacity - 1)) == 0 );
152 for (size_t i = 0; i != nCapacity; i += 1)
153 m_buffer[i].sequence.store(i, memory_model::memory_order_relaxed);
155 m_posEnqueue.store(0, memory_model::memory_order_relaxed);
156 m_posDequeue.store(0, memory_model::memory_order_relaxed);
159 ~VyukovMPMCCycleQueue()
164 /// Enqueues \p data to queue using copy functor
165 /** @anchor cds_container_VyukovMPMCCycleQueue_enqueue_func
166 \p Func is a functor called to copy value \p data of type \p Source
167 which may be differ from type \p T stored in the queue.
168 The functor's interface is:
171 void operator()(T& dest, Source const& data)
173 // // Code to copy \p data to \p dest
178 You may use \p boost:ref construction to pass functor \p f by reference.
180 <b>Requirements</b> The functor \p Func should not throw any exception.
182 template <typename Source, typename Func>
183 bool enqueue(Source const& data, Func func)
186 size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
190 cell = &m_buffer[pos & m_nBufferMask];
191 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
193 intptr_t dif = (intptr_t)seq - (intptr_t)pos;
197 if ( m_posEnqueue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed) )
203 pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
206 unref(func)( cell->data, data );
208 cell->sequence.store(pos + 1, memory_model::memory_order_release);
214 /// @anchor cds_container_VyukovMPMCCycleQueue_enqueue Enqueues \p data to queue
215 bool enqueue(value_type const& data )
217 # ifdef CDS_CXX11_LAMBDA_SUPPORT
218 return enqueue( data, [](value_type& dest, value_type const& src){ new ( &dest ) value_type( src ); });
220 return enqueue( data, copy_construct() );
224 # ifdef CDS_EMPLACE_SUPPORT
225 /// Enqueues data of type \ref cds_container_VyukovMPMCCycleQueue_value_type "value_type" constructed with <tt>std::forward<Args>(args)...</tt>
227 This function is available only for compiler that supports
228 variadic template and move semantics
230 template <typename... Args>
231 bool emplace( Args&&... args )
234 size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
238 cell = &m_buffer[pos & m_nBufferMask];
239 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
241 intptr_t dif = (intptr_t)seq - (intptr_t)pos;
245 if ( m_posEnqueue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed) )
251 pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
254 new ( &cell->data ) value_type( std::forward<Args>(args)... );
256 cell->sequence.store(pos + 1, memory_model::memory_order_release);
264 /// Dequeues an item from queue
265 /** @anchor cds_container_VyukovMPMCCycleQueue_dequeue_func
266 \p Func is a functor called to copy dequeued value of type \p T to \p dest of type \p Dest.
267 The functor's interface is:
270 void operator()(Dest& dest, T const& data)
272 // // Code to copy \p data to \p dest
277 You may use \p boost:ref construction to pass functor \p func by reference.
279 <b>Requirements</b> The functor \p Func should not throw any exception.
281 template <typename Dest, typename Func>
282 bool dequeue( Dest& data, Func func )
285 size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
289 cell = &m_buffer[pos & m_nBufferMask];
290 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
291 intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
294 if ( m_posDequeue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed))
300 pos = m_posDequeue.load(memory_model::memory_order_relaxed);
303 unref(func)( data, cell->data );
304 value_cleaner()( cell->data );
306 cell->sequence.store( pos + m_nBufferMask + 1, memory_model::memory_order_release );
311 /// Dequeues an item from queue to \p data
312 /** @anchor cds_container_VyukovMPMCCycleQueue_dequeue
313 If queue is empty, returns \p false, \p data is unchanged.
315 bool dequeue(value_type & data )
317 return dequeue( data, copy_assign() );
320 /// Synonym of \ref cds_container_VyukovMPMCCycleQueue_enqueue "enqueue"
321 bool push(value_type const& data)
323 return enqueue(data);
326 /// Synonym for template version of \ref cds_container_VyukovMPMCCycleQueue_enqueue_func "enqueue" function
327 template <typename Source, typename Func>
328 bool push( const Source& data, Func f )
330 return enqueue( data, f );
333 /// Synonym of \ref cds_container_VyukovMPMCCycleQueue_dequeue "dequeue"
334 bool pop(value_type& data)
336 return dequeue(data);
339 /// Synonym for template version of \ref cds_container_VyukovMPMCCycleQueue_dequeue_func "dequeue" function
340 template <typename Type, typename Func>
341 bool pop( Type& dest, Func f )
343 return dequeue( dest, f );
346 /// Checks if the queue is empty
349 const cell_type * cell;
350 size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
354 cell = &m_buffer[pos & m_nBufferMask];
355 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
356 intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
363 pos = m_posDequeue.load(memory_model::memory_order_relaxed);
374 /// Returns queue's item count
376 The value returned depends on opt::item_counter option. For atomicity::empty_item_counter,
377 this function always returns 0.
381 return m_ItemCounter.value();
384 /// Returns capacity of cyclic buffer
385 size_t capacity() const
387 return m_buffer.capacity();
390 }} // namespace cds::container
392 #endif // #ifndef __CDS_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H