Rearranged GCC 4.8 workaround, docfix
[libcds.git] / cds / container / vyukov_mpmc_cycle_queue.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8     
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.     
29 */
30
31 #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
32 #define CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
33
34 #include <cds/container/details/base.h>
35 #include <cds/opt/buffer.h>
36 #include <cds/opt/value_cleaner.h>
37 #include <cds/algo/atomic.h>
38 #include <cds/details/bounded_container.h>
39
40 namespace cds { namespace container {
41
42     /// VyukovMPMCCycleQueue related definitions
43     /** @ingroup cds_nonintrusive_helper
44     */
45     namespace vyukov_queue {
46
47         /// VyukovMPMCCycleQueue default traits
48         struct traits {
49             /// Buffer type for internal array
50             /*
51                 The type of element for the buffer is not important: the queue rebinds
52                 buffer for required type via \p rebind metafunction.
53
54                 For \p VyukovMPMCCycleQueue queue the buffer size should have power-of-2 size.
55             */
56             typedef cds::opt::v::dynamic_buffer< void * > buffer;
57
58             /// A functor to clean item dequeued.
59             /**
60                 The functor calls the destructor for queue item.
61                 After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied.
62                 If \p T is a complex type, \p value_cleaner may be useful feature.
63
64                 Default value is \ref opt::v::destruct_cleaner
65             */
66             typedef cds::opt::v::destruct_cleaner value_cleaner;
67
68             /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
69             typedef cds::atomicity::empty_item_counter item_counter;
70
71             /// C++ memory ordering model
72             /**
73                 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
74                 or \p opt::v::sequential_consistent (sequentially consistent memory model).
75             */
76             typedef opt::v::relaxed_ordering    memory_model;
77
78             /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
79             enum { padding = opt::cache_line_padding };
80
81             /// Back-off strategy
82             typedef cds::backoff::Default           back_off;
83
84             /// Single-consumer version
85             /**
86                 For single-consumer version of algorithm some additional functions
87                 (\p front(), \p pop_front()) is available.
88
89                 Default is \p false
90             */
91             static CDS_CONSTEXPR bool const single_consumer = false;
92         };
93
94         /// Metafunction converting option list to \p vyukov_queue::traits
95         /**
96             Supported \p Options are:
97             - \p opt::buffer - the buffer type for internal cyclic array. Possible types are:
98                 \p opt::v::dynamic_buffer (the default), \p opt::v::static_buffer. The type of
99                 element in the buffer is not important: it will be changed via \p rebind metafunction.
100             - \p opt::value_cleaner - a functor to clean item dequeued.
101                 The functor calls the destructor for queue item.
102                 After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied.
103                 If \p T is a complex type, \p value_cleaner can be an useful feature.
104                 Default value is \ref opt::v::destruct_cleaner
105             - \p opt::back_off - back-off strategy used. If the option is not specified, the \p cds::backoff::Default is used.
106             - \p opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
107                 To enable item counting use \p cds::atomicity::item_counter
108             - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
109             - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
110                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
111
112             Example: declare \p %VyukovMPMCCycleQueue with item counting and static iternal buffer of size 1024:
113             \code
114             typedef cds::container::VyukovMPMCCycleQueue< Foo,
115                 typename cds::container::vyukov_queue::make_traits<
116                     cds::opt::buffer< cds::opt::v::static_buffer< void *, 1024 >,
117                     cds::opt::item_counte< cds::atomicity::item_counter >
118                 >::type
119             > myQueue;
120             \endcode
121         */
122         template <typename... Options>
123         struct make_traits {
124 #   ifdef CDS_DOXYGEN_INVOKED
125             typedef implementation_defined type;   ///< Metafunction result
126 #   else
127             typedef typename cds::opt::make_options<
128                 typename cds::opt::find_type_traits< traits, Options... >::type
129                 , Options...
130             >::type type;
131 #   endif
132         };
133
134     } //namespace vyukov_queue
135
136     /// Vyukov's MPMC bounded queue
137     /** @ingroup cds_nonintrusive_queue
138         This algorithm is developed by Dmitry Vyukov (see http://www.1024cores.net)
139         It's multi-producer multi-consumer (MPMC), array-based, fails on overflow, does not require GC, w/o priorities, causal FIFO,
140         blocking producers and consumers queue. The algorithm is pretty simple and fast. It's not lock-free in the official meaning,
141         just implemented by means of atomic RMW operations w/o mutexes.
142
143         The cost of enqueue/dequeue is 1 CAS per operation.
144         No dynamic memory allocation/management during operation. Producers and consumers are separated from each other (as in the two-lock queue),
145         i.e. do not touch the same data while queue is not empty.
146
147         There is multiple producer/single consumer version \p cds::container::VyukovMPSCCycleQueue
148         that supports \p front() and \p pop_front() functions.
149
150         Source:
151             - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
152
153         Template parameters
154         - \p T - type stored in queue.
155         - \p Traits - queue traits, default is \p vyukov_queue::traits. You can use \p vyukov_queue::make_traits
156             metafunction to make your traits or just derive your traits from \p %vyukov_queue::traits:
157             \code
158             struct myTraits: public cds::container::vyukov_queue::traits {
159                 typedef cds::atomicity::item_counter    item_counter;
160             };
161             typedef cds::container::VyukovMPMCCycleQueue< Foo, myTraits > myQueue;
162
163             // Equivalent make_traits example:
164             typedef cds::container::VyukovMPMCCycleQueue< cds::gc::HP, Foo,
165                 typename cds::container::vyukov_queue::make_traits<
166                     cds::opt::item_counter< cds::atomicity::item_counter >
167                 >::type
168             > myQueue;
169             \endcode
170
171         \par License
172             Simplified BSD license by Dmitry Vyukov (http://www.1024cores.net/site/1024cores/home/code-license)
173     */
174     template <typename T, typename Traits = vyukov_queue::traits >
175     class VyukovMPMCCycleQueue : public cds::bounded_container
176     {
177     public:
178         typedef T value_type;   ///< Value type to be stored in the queue
179         typedef Traits traits;  ///< Queue traits
180         typedef typename traits::item_counter  item_counter;  ///< Item counter type
181         typedef typename traits::memory_model  memory_model;  ///< Memory ordering. See \p cds::opt::memory_model option
182         typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p vyukov_queue::traits::value_cleaner
183         typedef typename traits::back_off  back_off;          ///< back-off strategy
184
185         /// \p true for single-consumer version, \p false otherwise
186         static CDS_CONSTEXPR bool const c_single_consumer = traits::single_consumer;
187
188         /// Rebind template arguments
189         template <typename T2, typename Traits2>
190         struct rebind {
191             typedef VyukovMPMCCycleQueue< T2, Traits2 > other   ;   ///< Rebinding result
192         };
193
194     protected:
195         //@cond
196         typedef atomics::atomic<size_t> sequence_type;
197         struct cell_type
198         {
199             sequence_type   sequence;
200             value_type      data;
201
202             cell_type()
203             {}
204         };
205
206         typedef typename traits::buffer::template rebind<cell_type>::other buffer;
207         //@endcond
208
209     protected:
210         //@cond
211         buffer          m_buffer;
212         size_t const    m_nBufferMask;
213         typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad1_;
214         sequence_type   m_posEnqueue;
215         typename opt::details::apply_padding< sequence_type, traits::padding >::padding_type pad2_;
216         sequence_type   m_posDequeue;
217         typename opt::details::apply_padding< sequence_type, traits::padding >::padding_type pad3_;
218         item_counter    m_ItemCounter;
219         //@endcond
220
221     public:
222         /// Constructs the queue of capacity \p nCapacity
223         /**
224             For \p cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
225
226             The buffer capacity must be the power of two.
227         */
228         VyukovMPMCCycleQueue(
229             size_t nCapacity = 0
230             )
231             : m_buffer( nCapacity )
232             , m_nBufferMask( m_buffer.capacity() - 1 )
233         {
234             nCapacity = m_buffer.capacity();
235
236             // Buffer capacity must be power of 2
237             assert( nCapacity >= 2 && (nCapacity & (nCapacity - 1)) == 0 );
238
239             for (size_t i = 0; i != nCapacity; ++i )
240                 m_buffer[i].sequence.store(i, memory_model::memory_order_relaxed);
241
242             m_posEnqueue.store(0, memory_model::memory_order_relaxed);
243             m_posDequeue.store(0, memory_model::memory_order_relaxed);
244         }
245
246         ~VyukovMPMCCycleQueue()
247         {
248             clear();
249         }
250
251         /// Enqueues data to the queue using a functor
252         /**
253             \p Func is a functor called to copy a value to the queue cell.
254             The functor \p f takes one argument - a reference to a empty cell of type \ref value_type :
255             \code
256             cds::container::VyukovMPMCCycleQueue< Foo > myQueue;
257             Bar bar;
258             myQueue.enqueue_with( [&bar]( Foo& dest ) { dest = std::move(bar); } );
259             \endcode
260         */
261         template <typename Func>
262         bool enqueue_with(Func f)
263         {
264             cell_type* cell;
265             back_off bkoff;
266
267             size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
268             for (;;)
269             {
270                 cell = &m_buffer[pos & m_nBufferMask];
271                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
272
273                 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos);
274
275                 if (dif == 0) {
276                     if ( m_posEnqueue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed ))
277                         break;
278                 }
279                 else if (dif < 0) {
280                     // Queue full?
281                     if ( pos - m_posDequeue.load( memory_model::memory_order_relaxed ) == capacity() )
282                         return false;   // queue full
283                     bkoff();
284                     pos = m_posEnqueue.load( memory_model::memory_order_relaxed );
285                 }
286                 else
287                     pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
288             }
289
290             f( cell->data );
291
292             cell->sequence.store(pos + 1, memory_model::memory_order_release);
293             ++m_ItemCounter;
294
295             return true;
296         }
297
298         /// Enqueues \p val value into the queue.
299         /**
300             The new queue item is created by calling placement new in free cell.
301             Returns \p true if success, \p false if the queue is full.
302         */
303         bool enqueue( value_type const& val )
304         {
305             return enqueue_with( [&val]( value_type& dest ){ new ( &dest ) value_type( val ); });
306         }
307
308         /// Enqueues \p val value into the queue, move semantics
309         bool enqueue( value_type&& val )
310         {
311             return enqueue_with( [&val]( value_type& dest ) { new (&dest) value_type( std::move( val ));});
312         }
313
314         /// Synonym for \p enqueue( valuetype const& )
315         bool push( value_type const& data )
316         {
317             return enqueue( data );
318         }
319
320         /// Synonym for \p enqueue( value_type&& )
321         bool push( value_type&& data )
322         {
323             return enqueue( std::move( data ));
324         }
325
326         /// Synonym for \p enqueue_with()
327         template <typename Func>
328         bool push_with( Func f )
329         {
330             return enqueue_with( f );
331         }
332
333         /// Enqueues data of type \ref value_type constructed with <tt>std::forward<Args>(args)...</tt>
334         template <typename... Args>
335         bool emplace( Args&&... args )
336         {
337 #if (CDS_COMPILER == CDS_COMPILER_GCC) && (CDS_COMPILER_VERSION < 40900)
338             //work around unsupported feature in g++ 4.8 for forwarding parameter packs to lambda.
339             value_type val( std::forward<Args>(args)... );
340             return enqueue_with( [&val]( value_type& dest ){ new ( &dest ) value_type( std::move( val )); });
341 #else
342             return enqueue_with( [&args ...]( value_type& dest ){ new ( &dest ) value_type( std::forward<Args>( args )... ); });
343 #endif
344         }
345
346         /// Dequeues a value using a functor
347         /**
348             \p Func is a functor called to copy dequeued value.
349             The functor takes one argument - a reference to removed node:
350             \code
351             cds:container::VyukovMPMCCycleQueue< Foo > myQueue;
352             Bar bar;
353             myQueue.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
354             \endcode
355             The functor is called only if the queue is not empty.
356         */
357         template <typename Func>
358         bool dequeue_with( Func f )
359         {
360             cell_type * cell;
361             back_off bkoff;
362
363             size_t pos = m_posDequeue.load( memory_model::memory_order_relaxed );
364             for (;;)
365             {
366                 cell = &m_buffer[pos & m_nBufferMask];
367                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
368                 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
369
370                 if (dif == 0) {
371                     if ( m_posDequeue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed))
372                         break;
373                 }
374                 else if (dif < 0) {
375                     // Queue empty?
376                     if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
377                         return false;   // queue empty
378                     bkoff();
379                     pos = m_posDequeue.load( memory_model::memory_order_relaxed );
380                 }
381                 else
382                     pos = m_posDequeue.load(memory_model::memory_order_relaxed);
383             }
384
385             f( cell->data );
386             value_cleaner()( cell->data );
387             cell->sequence.store( pos + m_nBufferMask + 1, memory_model::memory_order_release );
388             --m_ItemCounter;
389
390             return true;
391         }
392
393         /// Dequeues a value from the queue
394         /**
395             If queue is not empty, the function returns \p true, \p dest contains a copy of
396             dequeued value. The assignment operator for type \ref value_type is invoked.
397             If queue is empty, the function returns \p false, \p dest is unchanged.
398         */
399         bool dequeue(value_type& dest )
400         {
401             return dequeue_with( [&dest]( value_type& src ){ dest = std::move( src );});
402         }
403
404         /// Synonym for \p dequeue()
405         bool pop(value_type& data)
406         {
407             return dequeue(data);
408         }
409
410         /// Synonym for \p dequeue_with()
411         template <typename Func>
412         bool pop_with( Func f )
413         {
414             return dequeue_with( f );
415         }
416
417         /// Returns a pointer to top element of the queue or \p nullptr if queue is empty (only for single-consumer version)
418         template <bool SC = c_single_consumer >
419         typename std::enable_if<SC, value_type *>::type front()
420         {
421             static_assert( c_single_consumer, "front() is enabled only if traits::single_consumer is true");
422
423             cell_type * cell;
424             back_off bkoff;
425
426             size_t pos = m_posDequeue.load( memory_model::memory_order_relaxed );
427             for ( ;;)
428             {
429                 cell = &m_buffer[pos & m_nBufferMask];
430                 size_t seq = cell->sequence.load( memory_model::memory_order_acquire );
431                 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
432
433                 if ( dif == 0 )
434                     return &cell->data;
435                 else if ( dif < 0 ) {
436                     // Queue empty?
437                     if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
438                         return nullptr;   // queue empty
439                     bkoff();
440                     pos = m_posDequeue.load( memory_model::memory_order_relaxed );
441                 }
442                 else
443                     pos = m_posDequeue.load( memory_model::memory_order_relaxed );
444             }
445         }
446
447         /// Pops top element; returns \p true if queue is not empty, \p false otherwise (only for single-consumer version)
448         template <bool SC = c_single_consumer >
449         typename std::enable_if<SC, bool>::type pop_front()
450         {
451             return dequeue_with( []( value_type& ) {} );
452         }
453
454         /// Checks if the queue is empty
455         bool empty() const
456         {
457             const cell_type * cell;
458             back_off bkoff;
459
460             size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
461             for (;;)
462             {
463                 cell = &m_buffer[pos & m_nBufferMask];
464                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
465                 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
466
467                 if (dif == 0)
468                     return false;
469                 else if (dif < 0) {
470                     if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
471                         return true;
472                 }
473                 bkoff();
474                 pos = m_posDequeue.load(memory_model::memory_order_relaxed);
475             }
476         }
477
478         /// Clears the queue
479         void clear()
480         {
481             value_type v;
482             while ( pop(v) );
483         }
484
485         /// Returns queue's item count
486         /**
487             The value returned depends on \p vyukov_queue::traits::item_counter option.
488             For \p atomicity::empty_item_counter, the function always returns 0.
489         */
490         size_t size() const
491         {
492             return m_ItemCounter.value();
493         }
494
495         /// Returns capacity of the queue
496         size_t capacity() const
497         {
498             return m_buffer.capacity();
499         }
500     };
501
502     //@cond
503     namespace vyukov_queue {
504
505         template <typename Traits>
506         struct single_consumer_traits : public Traits
507         {
508             static CDS_CONSTEXPR bool const single_consumer = true;
509         };
510     } // namespace vyukov_queue
511     //@endcond
512
513     /// Vyukov's queue multiple producer - single consumer version
514     template <typename T, typename Traits = vyukov_queue::traits >
515     using VyukovMPSCCycleQueue = VyukovMPMCCycleQueue< T, vyukov_queue::single_consumer_traits<Traits> >;
516
517 }}  // namespace cds::container
518
519 #endif // #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H