Merge pull request #58 from rfw/patch-1
[libcds.git] / cds / container / vyukov_mpmc_cycle_queue.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8     
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.     
29 */
30
31 #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
32 #define CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H
33
34 #include <cds/container/details/base.h>
35 #include <cds/opt/buffer.h>
36 #include <cds/opt/value_cleaner.h>
37 #include <cds/algo/atomic.h>
38 #include <cds/details/bounded_container.h>
39
40 namespace cds { namespace container {
41
42     /// VyukovMPMCCycleQueue related definitions
43     /** @ingroup cds_nonintrusive_helper
44     */
45     namespace vyukov_queue {
46
47         /// VyukovMPMCCycleQueue default traits
48         struct traits {
49             /// Buffer type for internal array
50             /*
51                 The type of element for the buffer is not important: the queue rebinds
52                 buffer for required type via \p rebind metafunction.
53
54                 For \p VyukovMPMCCycleQueue queue the buffer size should have power-of-2 size.
55             */
56             typedef cds::opt::v::dynamic_buffer< void * > buffer;
57
58             /// A functor to clean item dequeued.
59             /**
60                 The functor  calls the destructor for queue item.
61                 After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied.
62                 If \p T is a complex type, \p value_cleaner may be the useful feature.
63
64                 Default value is \ref opt::v::destruct_cleaner
65             */
66             typedef cds::opt::v::destruct_cleaner value_cleaner;
67
68             /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
69             typedef cds::atomicity::empty_item_counter item_counter;
70
71             /// C++ memory ordering model
72             /**
73                 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
74                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
75             */
76             typedef opt::v::relaxed_ordering    memory_model;
77
78             /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
79             enum { padding = opt::cache_line_padding };
80
81             /// Back-off strategy
82             typedef cds::backoff::Default           back_off;
83
84             /// Single-consumer version
85             /**
86                 For single-consumer version of algorithm some additional functions
87
88                 (\p front(), \p pop_front()) is available.
89
90                 Default is \p false
91             */
92             static CDS_CONSTEXPR bool const single_consumer = false;
93         };
94
95         /// Metafunction converting option list to \p vyukov_queue::traits
96         /**
97             Supported \p Options are:
98             - \p opt::buffer - the buffer type for internal cyclic array. Possible types are:
99                 \p opt::v::dynamic_buffer (the default), \p opt::v::static_buffer. The type of
100                 element in the buffer is not important: it will be changed via \p rebind metafunction.
101             - \p opt::value_cleaner - a functor to clean item dequeued.
102                 The functor calls the destructor for queue item.
103                 After an item is dequeued, \p value_cleaner cleans the cell that the item has been occupied.
104                 If \p T is a complex type, \p value_cleaner can be an useful feature.
105                 Default value is \ref opt::v::destruct_cleaner
106             - \p opt::back_off - back-off strategy used. If the option is not specified, the \p cds::backoff::Default is used.
107             - \p opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
108                 To enable item counting use \p cds::atomicity::item_counter
109             - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
110             - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
111                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
112
113             Example: declare \p %VyukovMPMCCycleQueue with item counting and static iternal buffer of size 1024:
114             \code
115             typedef cds::container::VyukovMPMCCycleQueue< Foo,
116                 typename cds::container::vyukov_queue::make_traits<
117                     cds::opt::buffer< cds::opt::v::static_buffer< void *, 1024 >,
118                     cds::opt::item_counte< cds::atomicity::item_counter >
119                 >::type
120             > myQueue;
121             \endcode
122         */
123         template <typename... Options>
124         struct make_traits {
125 #   ifdef CDS_DOXYGEN_INVOKED
126             typedef implementation_defined type;   ///< Metafunction result
127 #   else
128             typedef typename cds::opt::make_options<
129                 typename cds::opt::find_type_traits< traits, Options... >::type
130                 , Options...
131             >::type type;
132 #   endif
133         };
134
135     } //namespace vyukov_queue
136
137     /// Vyukov's MPMC bounded queue
138     /** @ingroup cds_nonintrusive_queue
139         This algorithm is developed by Dmitry Vyukov (see http://www.1024cores.net)
140         It's multi-producer multi-consumer (MPMC), array-based, fails on overflow, does not require GC, w/o priorities, causal FIFO,
141         blocking producers and consumers queue. The algorithm is pretty simple and fast. It's not lock-free in the official meaning,
142         just implemented by means of atomic RMW operations w/o mutexes.
143
144         The cost of enqueue/dequeue is 1 CAS per operation.
145         No dynamic memory allocation/management during operation. Producers and consumers are separated from each other (as in the two-lock queue),
146         i.e. do not touch the same data while queue is not empty.
147
148         There is multiple producer/single consumer version \p cds::container::VyukovMPSCCycleQueue
149         that supports \p front() and \p pop_front() functions.
150
151         Source:
152             - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
153
154         Template parameters
155         - \p T - type stored in queue.
156         - \p Traits - queue traits, default is \p vykov_queue::traits. You can use \p vykov_queue::make_traits
157             metafunction to make your traits or just derive your traits from \p %vykov_queue::traits:
158             \code
159             struct myTraits: public cds::container::vykov_queue::traits {
160                 typedef cds::atomicity::item_counter    item_counter;
161             };
162             typedef cds::container::VyukovMPMCCycleQueue< Foo, myTraits > myQueue;
163
164             // Equivalent make_traits example:
165             typedef cds::container::VyukovMPMCCycleQueue< cds::gc::HP, Foo,
166                 typename cds::container::vykov_queue::make_traits<
167                     cds::opt::item_counter< cds::atomicity::item_counter >
168                 >::type
169             > myQueue;
170             \endcode
171
172         \par License
173             Simplified BSD license by Dmitry Vyukov (http://www.1024cores.net/site/1024cores/home/code-license)
174     */
175     template <typename T, typename Traits = vyukov_queue::traits >
176     class VyukovMPMCCycleQueue : public cds::bounded_container
177     {
178     public:
179         typedef T value_type;   ///< Value type to be stored in the queue
180         typedef Traits traits;  ///< Queue traits
181         typedef typename traits::item_counter  item_counter;  ///< Item counter type
182         typedef typename traits::memory_model  memory_model;  ///< Memory ordering. See cds::opt::memory_model option
183         typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p vyukov_queue::traits::value_cleaner
184         typedef typename traits::back_off  back_off;          ///< back-off strategy
185
186         /// \p true for single-consumer version, \p false otherwise
187         static CDS_CONSTEXPR bool const c_single_consumer = traits::single_consumer;
188
189         /// Rebind template arguments
190         template <typename T2, typename Traits2>
191         struct rebind {
192             typedef VyukovMPMCCycleQueue< T2, Traits2 > other   ;   ///< Rebinding result
193         };
194
195     protected:
196         //@cond
197         typedef atomics::atomic<size_t> sequence_type;
198         struct cell_type
199         {
200             sequence_type   sequence;
201             value_type      data;
202
203             cell_type()
204             {}
205         };
206
207         typedef typename traits::buffer::template rebind<cell_type>::other buffer;
208         //@endcond
209
210     protected:
211         //@cond
212         buffer          m_buffer;
213         size_t const    m_nBufferMask;
214         typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad1_;
215         sequence_type   m_posEnqueue;
216         typename opt::details::apply_padding< sequence_type, traits::padding >::padding_type pad2_;
217         sequence_type   m_posDequeue;
218         typename opt::details::apply_padding< sequence_type, traits::padding >::padding_type pad3_;
219         item_counter    m_ItemCounter;
220         //@endcond
221
222     public:
223         /// Constructs the queue of capacity \p nCapacity
224         /**
225             For \p cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
226
227             The buffer capacity must be the power of two.
228         */
229         VyukovMPMCCycleQueue(
230             size_t nCapacity = 0
231             )
232             : m_buffer( nCapacity )
233             , m_nBufferMask( m_buffer.capacity() - 1 )
234         {
235             nCapacity = m_buffer.capacity();
236
237             // Buffer capacity must be power of 2
238             assert( nCapacity >= 2 && (nCapacity & (nCapacity - 1)) == 0 );
239
240             for (size_t i = 0; i != nCapacity; ++i )
241                 m_buffer[i].sequence.store(i, memory_model::memory_order_relaxed);
242
243             m_posEnqueue.store(0, memory_model::memory_order_relaxed);
244             m_posDequeue.store(0, memory_model::memory_order_relaxed);
245         }
246
247         ~VyukovMPMCCycleQueue()
248         {
249             clear();
250         }
251
252         /// Enqueues data to the queue using a functor
253         /**
254             \p Func is a functor called to copy a value to the queue cell.
255             The functor \p f takes one argument - a reference to a empty cell of type \ref value_type :
256             \code
257             cds::container::VyukovMPMCCycleQueue< Foo > myQueue;
258             Bar bar;
259             myQueue.enqueue_with( [&bar]( Foo& dest ) { dest = std::move(bar); } );
260             \endcode
261         */
262         template <typename Func>
263         bool enqueue_with(Func f)
264         {
265             cell_type* cell;
266             back_off bkoff;
267
268             size_t pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
269             for (;;)
270             {
271                 cell = &m_buffer[pos & m_nBufferMask];
272                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
273
274                 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos);
275
276                 if (dif == 0) {
277                     if ( m_posEnqueue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed ))
278                         break;
279                 }
280                 else if (dif < 0) {
281                     // Queue full?
282                     if ( pos - m_posDequeue.load( memory_model::memory_order_relaxed ) == capacity() )
283                         return false;   // queue full
284                     bkoff();
285                     pos = m_posEnqueue.load( memory_model::memory_order_relaxed );
286                 }
287                 else
288                     pos = m_posEnqueue.load(memory_model::memory_order_relaxed);
289             }
290
291             f( cell->data );
292
293             cell->sequence.store(pos + 1, memory_model::memory_order_release);
294             ++m_ItemCounter;
295
296             return true;
297         }
298
299         /// Enqueues \p val value into the queue.
300         /**
301             The new queue item is created by calling placement new in free cell.
302             Returns \p true if success, \p false if the queue is full.
303         */
304         bool enqueue( value_type const& val )
305         {
306             return enqueue_with( [&val]( value_type& dest ){ new ( &dest ) value_type( val ); });
307         }
308
309         /// Synonym for \p enqueue()
310         bool push( value_type const& data )
311         {
312             return enqueue( data );
313         }
314
315         /// Synonym for \p enqueue_with()
316         template <typename Func>
317         bool push_with( Func f )
318         {
319             return enqueue_with( f );
320         }
321
322         /// Enqueues data of type \ref value_type constructed with <tt>std::forward<Args>(args)...</tt>
323         template <typename... Args>
324         bool emplace( Args&&... args )
325         {
326 #if (CDS_COMPILER == CDS_COMPILER_GCC) && (CDS_COMPILER_VERSION < 40900)
327             //work around unsupported feature in g++ 4.8 for forwarding parameter packs to lambda.
328             return enqueue_with ( std::bind([]( value_type& dest,Args ... args ){ new ( &dest ) value_type( std::forward<Args>(args)... );}, std::placeholders::_1 ,args...));
329 #else
330             return enqueue_with( [&args ...]( value_type& dest ){ new ( &dest ) value_type( std::forward<Args>(args)... ); });
331 #endif
332         }
333
334         /// Dequeues a value using a functor
335         /**
336             \p Func is a functor called to copy dequeued value.
337             The functor takes one argument - a reference to removed node:
338             \code
339             cds:container::VyukovMPMCCycleQueue< Foo > myQueue;
340             Bar bar;
341             myQueue.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
342             \endcode
343             The functor is called only if the queue is not empty.
344         */
345         template <typename Func>
346         bool dequeue_with( Func f )
347         {
348             cell_type * cell;
349             back_off bkoff;
350
351             size_t pos = m_posDequeue.load( memory_model::memory_order_relaxed );
352             for (;;)
353             {
354                 cell = &m_buffer[pos & m_nBufferMask];
355                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
356                 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
357
358                 if (dif == 0) {
359                     if ( m_posDequeue.compare_exchange_weak(pos, pos + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed))
360                         break;
361                 }
362                 else if (dif < 0) {
363                     // Queue empty?
364                     if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
365                         return false;   // queue empty
366                     bkoff();
367                     pos = m_posDequeue.load( memory_model::memory_order_relaxed );
368                 }
369                 else
370                     pos = m_posDequeue.load(memory_model::memory_order_relaxed);
371             }
372
373             f( cell->data );
374             value_cleaner()( cell->data );
375             cell->sequence.store( pos + m_nBufferMask + 1, memory_model::memory_order_release );
376             --m_ItemCounter;
377
378             return true;
379         }
380
381         /// Dequeues a value from the queue
382         /**
383             If queue is not empty, the function returns \p true, \p dest contains copy of
384             dequeued value. The assignment operator for type \ref value_type is invoked.
385             If queue is empty, the function returns \p false, \p dest is unchanged.
386         */
387         bool dequeue(value_type & dest )
388         {
389             return dequeue_with( [&dest]( value_type& src ){ dest = src; } );
390         }
391
392         /// Synonym for \p dequeue()
393         bool pop(value_type& data)
394         {
395             return dequeue(data);
396         }
397
398         /// Synonym for \p dequeue_with()
399         template <typename Func>
400         bool pop_with( Func f )
401         {
402             return dequeue_with( f );
403         }
404
405         /// Returns a pointer to top element of the queue or \p nullptr if queue is empty (only for single-consumer version)
406         template <bool SC = c_single_consumer >
407         typename std::enable_if<SC, value_type *>::type front()
408         {
409             static_assert( c_single_consumer, "front() is enabled only if traits::single_consumer is true");
410
411             cell_type * cell;
412             back_off bkoff;
413
414             size_t pos = m_posDequeue.load( memory_model::memory_order_relaxed );
415             for ( ;;)
416             {
417                 cell = &m_buffer[pos & m_nBufferMask];
418                 size_t seq = cell->sequence.load( memory_model::memory_order_acquire );
419                 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
420
421                 if ( dif == 0 )
422                     return &cell->data;
423                 else if ( dif < 0 ) {
424                     // Queue empty?
425                     if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
426                         return nullptr;   // queue empty
427                     bkoff();
428                     pos = m_posDequeue.load( memory_model::memory_order_relaxed );
429                 }
430                 else
431                     pos = m_posDequeue.load( memory_model::memory_order_relaxed );
432             }
433         }
434
435         /// Pops top element; returns \p true if queue is not empty, \p false otherwise (only for single-consumer version)
436         template <bool SC = c_single_consumer >
437         typename std::enable_if<SC, bool>::type pop_front()
438         {
439             return dequeue_with( []( value_type& ) {} );
440         }
441
442         /// Checks if the queue is empty
443         bool empty() const
444         {
445             const cell_type * cell;
446             back_off bkoff;
447
448             size_t pos = m_posDequeue.load(memory_model::memory_order_relaxed);
449             for (;;)
450             {
451                 cell = &m_buffer[pos & m_nBufferMask];
452                 size_t seq = cell->sequence.load(memory_model::memory_order_acquire);
453                 intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
454
455                 if (dif == 0)
456                     return false;
457                 else if (dif < 0) {
458                     if ( pos - m_posEnqueue.load( memory_model::memory_order_relaxed ) == 0 )
459                         return true;
460                 }
461                 bkoff();
462                 pos = m_posDequeue.load(memory_model::memory_order_relaxed);
463             }
464         }
465
466         /// Clears the queue
467         void clear()
468         {
469             value_type v;
470             while ( pop(v) );
471         }
472
473         /// Returns queue's item count
474         /**
475             The value returned depends on \p vyukov_queue::traits::item_counter option.
476             For \p atomicity::empty_item_counter, the function always returns 0.
477         */
478         size_t size() const
479         {
480             return m_ItemCounter.value();
481         }
482
483         /// Returns capacity of the queue
484         size_t capacity() const
485         {
486             return m_buffer.capacity();
487         }
488     };
489
490     //@cond
491     namespace vyukov_queue {
492
493         template <typename Traits>
494         struct single_consumer_traits : public Traits
495         {
496             static CDS_CONSTEXPR bool const single_consumer = true;
497         };
498     } // namespace vyukov_queue
499     //@endcond
500
501     /// Vyukov's queue multiple producer - single consumer version
502
503     template <typename T, typename Traits = vyukov_queue::traits >
504     using VyukovMPSCCycleQueue = VyukovMPMCCycleQueue< T, vyukov_queue::single_consumer_traits<Traits> >;
505
506 }}  // namespace cds::container
507
508 #endif // #ifndef CDSLIB_CONTAINER_VYUKOV_MPMC_CYCLE_QUEUE_H