Merged branch 'master' of https://github.com/Nemo1369/libcds
[libcds.git] / cds / container / vyukov_mpmc_cycle_queue.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
32 #define CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
33
34 #include <cds/container/details/base.h>
35 #include <cds/opt/buffer.h>
36 #include <cds/opt/value_cleaner.h>
37 #include <cds/algo/atomic.h>
38 #include <cds/details/bounded_container.h>
39
40 namespace cds { namespace container {
41
42     /// VyukovMPMCCycleQueue related definitions
43     /** @ingroup cds_nonintrusive_helper
44     */
45     namespace vyukov_queue {
46
47         /// VyukovMPMCCycleQueue default traits
48         struct traits {
49             /// Buffer type for internal array
50             /*
51                 The type of element for the buffer is not important: the queue rebinds
52                 the buffer for required type via \p rebind metafunction.
53
54                 For \p VyukovMPMCCycleQueue queue the buffer size should have power-of-2 size.
55
56                 You should use only uninitialized buffer for the queue -
57                 \p cds::opt::v::uninitialized_dynamic_buffer (the default),
58                 \p cds::opt::v::uninitialized_static_buffer.
59             */
60             typedef cds::opt::v::uninitialized_dynamic_buffer< void * > buffer;
61
62             /// A functor to clean item dequeued.
63             /**
64                 The functor calls the destructor for queue item.
65                 After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied.
66                 If \p T is a complex type, \p value_cleaner may be useful feature.
67
68                 Default value is \ref opt::v::destruct_cleaner
69             */
70             typedef cds::opt::v::destruct_cleaner value_cleaner;
71
72             /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
73             typedef cds::atomicity::empty_item_counter item_counter;
74
75             /// C++ memory ordering model
76             /**
77                 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
78                 or \p opt::v::sequential_consistent (sequentially consistent memory model).
79             */
80             typedef opt::v::relaxed_ordering    memory_model;
81
82             /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
83             enum { padding = opt::cache_line_padding };
84
85             /// Back-off strategy
86             typedef cds::backoff::Default           back_off;
87
88             /// Single-consumer version
89             /**
90                 For single-consumer version of algorithm some additional functions
91                 (\p front(), \p pop_front()) is available.
92
93                 Default is \p false
94             */
95             static CDS_CONSTEXPR bool const single_consumer = false;
96         };
97
98         /// Metafunction converting option list to \p vyukov_queue::traits
99         /**
100             Supported \p Options are:
101             - \p opt::buffer - an uninitialized buffer type for internal cyclic array. Possible types are:
102                 \p opt::v::uninitialized_dynamic_buffer (the default), \p opt::v::uninitialized_static_buffer. The type of
103                 element in the buffer is not important: it will be changed via \p rebind metafunction.
104             - \p opt::value_cleaner - a functor to clean item dequeued.
105                 The functor calls the destructor for queue item.
106                 After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied.
107                 If \p T is a complex type, \p value_cleaner can be an useful feature.
108                 Default value is \ref opt::v::destruct_cleaner
109             - \p opt::back_off - back-off strategy used. If the option is not specified, the \p cds::backoff::Default is used.
110             - \p opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
111                 To enable item counting use \p cds::atomicity::item_counter
112             - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
113             - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
114                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
115
116             Example: declare \p %VyukovMPMCCycleQueue with item counting and static iternal buffer of size 1024:
117             \code
118             typedef cds::container::VyukovMPMCCycleQueue< Foo,
119                 typename cds::container::vyukov_queue::make_traits<
120                     cds::opt::buffer< cds::opt::v::uninitialized_static_buffer< void *, 1024 >,
121                     cds::opt::item_counte< cds::atomicity::item_counter >
122                 >::type
123             > myQueue;
124             \endcode
125         */
126         template <typename... Options>
127         struct make_traits {
128 #   ifdef CDS_DOXYGEN_INVOKED
129             typedef implementation_defined type;   ///< Metafunction result
130 #   else
131             typedef typename cds::opt::make_options<
132                 typename cds::opt::find_type_traits< traits, Options... >::type
133                 , Options...
134             >::type type;
135 #   endif
136         };
137
138     } //namespace vyukov_queue
139
140     /// Vyukov's MPMC bounded queue
141     /** @ingroup cds_nonintrusive_queue
142         This algorithm is developed by Dmitry Vyukov (see http://www.1024cores.net)
143         It's multi-producer multi-consumer (MPMC), array-based, fails on overflow, does not require GC, w/o priorities, causal FIFO,
144         blocking producers and consumers queue. The algorithm is pretty simple and fast. It's not lock-free in the official meaning,
145         just implemented by means of atomic RMW operations w/o mutexes.
146
147         The cost of enqueue/dequeue is 1 CAS per operation.
148         No dynamic memory allocation/management during operation. Producers and consumers are separated from each other (as in the two-lock queue),
149         i.e. do not touch the same data while queue is not empty.
150
151         There is multiple producer/single consumer version \p cds::container::VyukovMPSCCycleQueue
152         that supports \p front() and \p pop_front() functions.
153
154         Source:
155             - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
156
157         Template parameters
158         - \p T - type stored in queue.
159         - \p Traits - queue traits, default is \p vyukov_queue::traits. You can use \p vyukov_queue::make_traits
160             metafunction to make your traits or just derive your traits from \p %vyukov_queue::traits:
161             \code
162             struct myTraits: public cds::container::vyukov_queue::traits {
163                 typedef cds::atomicity::item_counter    item_counter;
164             };
165             typedef cds::container::VyukovMPMCCycleQueue< Foo, myTraits > myQueue;
166
167             // Equivalent make_traits example:
168             typedef cds::container::VyukovMPMCCycleQueue< cds::gc::HP, Foo,
169                 typename cds::container::vyukov_queue::make_traits<
170                     cds::opt::item_counter< cds::atomicity::item_counter >
171                 >::type
172             > myQueue;
173             \endcode
174
175         \par License
176             Simplified BSD license by Dmitry Vyukov (http://www.1024cores.net/site/1024cores/home/code-license)
177     */
178     template <typename T, typename Traits = vyukov_queue::traits >
179     class VyukovMPMCCycleQueue : public cds::bounded_container
180     {
181     public:
182         typedef T value_type;   ///< Value type to be stored in the queue
183         typedef Traits traits;  ///< Queue traits
184         typedef typename traits::item_counter  item_counter;  ///< Item counter type
185         typedef typename traits::memory_model  memory_model;  ///< Memory ordering. See \p cds::opt::memory_model option
186         typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p vyukov_queue::traits::value_cleaner
187         typedef typename traits::back_off  back_off;          ///< back-off strategy
188
189         /// \p true for single-consumer version, \p false otherwise
190         static CDS_CONSTEXPR bool const c_single_consumer = traits::single_consumer;
191
192         /// Rebind template arguments
193         template <typename T2, typename Traits2>
194         struct rebind {
195             typedef VyukovMPMCCycleQueue< T2, Traits2 > other   ;   ///< Rebinding result
196         };
197
198     protected:
199         //@cond
200         typedef atomics::atomic<size_t> sequence_type;
201         struct cell_type
202         {
203             sequence_type   sequence;
204             value_type      data;
205
206             cell_type()
207             {}
208         };
209
210         typedef typename traits::buffer::template rebind<cell_type>::other buffer;
211         //@endcond
212
213     protected:
214         //@cond
215         buffer          m_buffer;
216         size_t const    m_nBufferMask;
217         typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad1_;
218         sequence_type   m_posEnqueue;
219         typename opt::details::apply_padding< sequence_type, traits::padding >::padding_type pad2_;
220         sequence_type   m_posDequeue;
221         typename opt::details::apply_padding< sequence_type, traits::padding >::padding_type pad3_;
222         item_counter    m_ItemCounter;
223         //@endcond
224
225     public:
226         /// Constructs the queue of capacity \p nCapacity
227         /**
228             For \p cds::opt::v::uninitialized_static_buffer the \p nCapacity parameter is ignored.
229
230             The buffer capacity must be the power of two.
231         */
232         VyukovMPMCCycleQueue(
233             size_t nCapacity = 0
234             )
235             : m_buffer( nCapacity )
236             , m_nBufferMask( m_buffer.capacity() - 1 )
237         {
238             nCapacity = m_buffer.capacity();
239
240             // Buffer capacity must be power of 2
241             assert( nCapacity >= 2 && (nCapacity & (nCapacity - 1)) == 0 );
242
243             for (size_t i = 0; i != nCapacity; ++i )
244                 m_buffer[i].sequence.store(i, memory_model::memory_order_relaxed);
245
246             m_posEnqueue.store(0, memory_model::memory_order_relaxed);
247             m_posDequeue.store(0, memory_model::memory_order_relaxed);
248         }
249
250         ~VyukovMPMCCycleQueue()
251         {
252             clear();
253         }
254
255         /// Enqueues data to the queue using a functor
256         /**
257             \p Func is a functor called to copy a value to the queue cell.
258             The functor \p f takes one argument - a reference to a empty cell of type \ref value_type :
259             \code
260             cds::container::VyukovMPMCCycleQueue< Foo > myQueue;
261             Bar bar;
262             myQueue.enqueue_with( [&bar]( Foo& dest ) { dest = std::move(bar); } );
263             \endcode
264         */
265         template <typename Func>
266         bool enqueue_with(Func f)
267         {
268             cell_type* cell;
269             back_off bkoff;
270
271             size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
272             for (;;)
273             {
274                 cell = &m_buffer[pos & m_nBufferMask];
275                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
276
277                 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos);
278
279                 if (dif == 0) {
280                     if ( m_posEnqueue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed ))
281                         break;
282                 }
283                 else if (dif < 0) {
284                     // Queue full?
285                     if ( pos - m_posDequeue.load( memory_model::memory_order_relaxed ) == capacity())
286                         return false;   // queue full
287                     bkoff();
288                     pos = m_posEnqueue.load( memory_model::memory_order_relaxed );
289                 }
290                 else
291                     pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
292             }
293
294             f( cell->data );
295
296             cell->sequence.store(pos + 1, memory_model::memory_order_release);
297             ++m_ItemCounter;
298
299             return true;
300         }
301
302         /// Enqueues \p val value into the queue.
303         /**
304             The new queue item is created by calling placement new in free cell.
305             Returns \p true if success, \p false if the queue is full.
306         */
307         bool enqueue( value_type const& val )
308         {
309             return enqueue_with( [&val]( value_type& dest ){ new ( &dest ) value_type( val ); });
310         }
311
312         /// Enqueues \p val value into the queue, move semantics
313         bool enqueue( value_type&& val )
314         {
315             return enqueue_with( [&val]( value_type& dest ) { new (&dest) value_type( std::move( val ));});
316         }
317
318         /// Synonym for \p enqueue( valuetype const& )
319         bool push( value_type const& data )
320         {
321             return enqueue( data );
322         }
323
324         /// Synonym for \p enqueue( value_type&& )
325         bool push( value_type&& data )
326         {
327             return enqueue( std::move( data ));
328         }
329
330         /// Synonym for \p enqueue_with()
331         template <typename Func>
332         bool push_with( Func f )
333         {
334             return enqueue_with( f );
335         }
336
337         /// Enqueues data of type \ref value_type constructed with <tt>std::forward<Args>(args)...</tt>
338         template <typename... Args>
339         bool emplace( Args&&... args )
340         {
341 #if (CDS_COMPILER == CDS_COMPILER_GCC) && (CDS_COMPILER_VERSION < 40900)
342             //work around unsupported feature in g++ 4.8 for forwarding parameter packs to lambda.
343             value_type val( std::forward<Args>(args)... );
344             return enqueue_with( [&val]( value_type& dest ){ new ( &dest ) value_type( std::move( val )); });
345 #else
346             return enqueue_with( [&args ...]( value_type& dest ){ new ( &dest ) value_type( std::forward<Args>( args )... ); });
347 #endif
348         }
349
350         /// Dequeues a value using a functor
351         /**
352             \p Func is a functor called to copy dequeued value.
353             The functor takes one argument - a reference to removed node:
354             \code
355             cds:container::VyukovMPMCCycleQueue< Foo > myQueue;
356             Bar bar;
357             myQueue.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
358             \endcode
359             The functor is called only if the queue is not empty.
360         */
361         template <typename Func>
362         bool dequeue_with( Func f )
363         {
364             cell_type * cell;
365             back_off bkoff;
366
367             size_t pos = m_posDequeue.load( memory_model::memory_order_relaxed );
368             for (;;)
369             {
370                 cell = &m_buffer[pos & m_nBufferMask];
371                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
372                 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
373
374                 if (dif == 0) {
375                     if ( m_posDequeue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed))
376                         break;
377                 }
378                 else if (dif < 0) {
379                     // Queue empty?
380                     if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
381                         return false;   // queue empty
382                     bkoff();
383                     pos = m_posDequeue.load( memory_model::memory_order_relaxed );
384                 }
385                 else
386                     pos = m_posDequeue.load(memory_model::memory_order_relaxed);
387             }
388
389             f( cell->data );
390             value_cleaner()( cell->data );
391             cell->sequence.store( pos + m_nBufferMask + 1, memory_model::memory_order_release );
392             --m_ItemCounter;
393
394             return true;
395         }
396
397         /// Dequeues a value from the queue
398         /**
399             If queue is not empty, the function returns \p true, \p dest contains a copy of
400             dequeued value. The assignment operator for type \ref value_type is invoked.
401             If queue is empty, the function returns \p false, \p dest is unchanged.
402         */
403         bool dequeue(value_type& dest )
404         {
405             return dequeue_with( [&dest]( value_type& src ){ dest = std::move( src );});
406         }
407
408         /// Synonym for \p dequeue()
409         bool pop(value_type& data)
410         {
411             return dequeue(data);
412         }
413
414         /// Synonym for \p dequeue_with()
415         template <typename Func>
416         bool pop_with( Func f )
417         {
418             return dequeue_with( f );
419         }
420
421         /// Returns a pointer to top element of the queue or \p nullptr if queue is empty (only for single-consumer version)
422         template <bool SC = c_single_consumer >
423         typename std::enable_if<SC, value_type *>::type front()
424         {
425             static_assert( c_single_consumer, "front() is enabled only if traits::single_consumer is true");
426
427             cell_type * cell;
428             back_off bkoff;
429
430             size_t pos = m_posDequeue.load( memory_model::memory_order_relaxed );
431             for ( ;;)
432             {
433                 cell = &m_buffer[pos & m_nBufferMask];
434                 size_t seq = cell->sequence.load( memory_model::memory_order_acquire );
435                 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
436
437                 if ( dif == 0 )
438                     return &cell->data;
439                 else if ( dif < 0 ) {
440                     // Queue empty?
441                     if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
442                         return nullptr;   // queue empty
443                     bkoff();
444                     pos = m_posDequeue.load( memory_model::memory_order_relaxed );
445                 }
446                 else
447                     pos = m_posDequeue.load( memory_model::memory_order_relaxed );
448             }
449         }
450
451         /// Pops top element; returns \p true if queue is not empty, \p false otherwise (only for single-consumer version)
452         template <bool SC = c_single_consumer >
453         typename std::enable_if<SC, bool>::type pop_front()
454         {
455             return dequeue_with( []( value_type& ) {} );
456         }
457
458         /// Checks if the queue is empty
459         bool empty() const
460         {
461             const cell_type * cell;
462             back_off bkoff;
463
464             size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
465             for (;;)
466             {
467                 cell = &m_buffer[pos & m_nBufferMask];
468                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
469                 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
470
471                 if (dif == 0)
472                     return false;
473                 else if (dif < 0) {
474                     if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
475                         return true;
476                 }
477                 bkoff();
478                 pos = m_posDequeue.load(memory_model::memory_order_relaxed);
479             }
480         }
481
482         /// Clears the queue
483         void clear()
484         {
485             value_type v;
486             while ( pop(v));
487         }
488
489         /// Returns queue's item count
490         /**
491             The value returned depends on \p vyukov_queue::traits::item_counter option.
492             For \p atomicity::empty_item_counter, the function always returns 0.
493         */
494         size_t size() const
495         {
496             return m_ItemCounter.value();
497         }
498
499         /// Returns capacity of the queue
500         size_t capacity() const
501         {
502             return m_buffer.capacity();
503         }
504     };
505
506     //@cond
507     namespace vyukov_queue {
508
509         template <typename Traits>
510         struct single_consumer_traits : public Traits
511         {
512             static CDS_CONSTEXPR bool const single_consumer = true;
513         };
514     } // namespace vyukov_queue
515     //@endcond
516
517     /// Vyukov's queue multiple producer - single consumer version
518     template <typename T, typename Traits = vyukov_queue::traits >
519     using VyukovMPSCCycleQueue = VyukovMPMCCycleQueue< T, vyukov_queue::single_consumer_traits<Traits> >;
520
521 }}  // namespace cds::container
522
523 #endif // #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H