Fixed a bug
[libcds.git] / cds / container / weak_ringbuffer.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #ifndef CDSLIB_CONTAINER_WEAK_RINGBUFFER_H
32 #define CDSLIB_CONTAINER_WEAK_RINGBUFFER_H
33
34 #include <cds/container/details/base.h>
35 #include <cds/opt/buffer.h>
36 #include <cds/opt/value_cleaner.h>
37 #include <cds/algo/atomic.h>
38 #include <cds/details/bounded_container.h>
39
40 namespace cds { namespace container {
41
42     /// \p WeakRingBuffer related definitions
43     /** @ingroup cds_nonintrusive_helper
44     */
45     namespace weak_ringbuffer {
46
47         /// \p WeakRingBuffer default traits
48         struct traits {
49             /// Buffer type for internal array
50             /*
51                 The type of element for the buffer is not important: \p WeakRingBuffer rebind
52                 the buffer for required type via \p rebind metafunction.
53
54                 For \p WeakRingBuffer the buffer size should have power-of-2 size.
55
56                 You should use only uninitialized buffer for the ring buffer -
57                 \p cds::opt::v::uninitialized_dynamic_buffer (the default),
58                 \p cds::opt::v::uninitialized_static_buffer.
59             */
60             typedef cds::opt::v::uninitialized_dynamic_buffer< void * > buffer;
61
62             /// A functor to clean item dequeued.
63             /**
64                 The functor calls the destructor for popped element.
65                 After a set of items is dequeued, \p value_cleaner cleans the cells that the items have been occupied.
66                 If \p T is a complex type, \p value_cleaner may be useful feature.
67                 For POD types \ref opt::v::empty_cleaner is suitable
68
69                 Default value is \ref opt::v::auto_cleaner that calls destructor only if it is not trivial.
70             */
71             typedef cds::opt::v::auto_cleaner value_cleaner;
72
73             /// C++ memory ordering model
74             /**
75                 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
76                 or \p opt::v::sequential_consistent (sequentially consistent memory model).
77             */
78             typedef opt::v::relaxed_ordering    memory_model;
79
80             /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
81             enum { padding = opt::cache_line_padding };
82         };
83
84         /// Metafunction converting option list to \p weak_ringbuffer::traits
85         /**
86             Supported \p Options are:
87             - \p opt::buffer - an uninitialized buffer type for internal cyclic array. Possible types are:
88                 \p opt::v::uninitialized_dynamic_buffer (the default), \p opt::v::uninitialized_static_buffer. The type of
89                 element in the buffer is not important: it will be changed via \p rebind metafunction.
90             - \p opt::value_cleaner - a functor to clean items dequeued.
91                 The functor calls the destructor for ring-buffer item.
92                 After a set of items is dequeued, \p value_cleaner cleans the cells that the items have been occupied.
93                 If \p T is a complex type, \p value_cleaner can be an useful feature.
94                 Default value is \ref opt::v::empty_cleaner that is suitable for POD types.
95             - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
96             - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
97                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
98
99             Example: declare \p %WeakRingBuffer with static iternal buffer of size 1024:
100             \code
101             typedef cds::container::WeakRingBuffer< Foo,
102                 typename cds::container::weak_ringbuffer::make_traits<
103                     cds::opt::buffer< cds::opt::v::uninitialized_static_buffer< void *, 1024 >
104                 >::type
105             > myRing;
106             \endcode
107         */
108         template <typename... Options>
109         struct make_traits {
110 #   ifdef CDS_DOXYGEN_INVOKED
111             typedef implementation_defined type;   ///< Metafunction result
112 #   else
113             typedef typename cds::opt::make_options<
114                 typename cds::opt::find_type_traits< traits, Options... >::type
115                 , Options...
116             >::type type;
117 #   endif
118         };
119
120     } // namespace weak_ringbuffer
121
122     /// Single-producer single-consumer ring buffer
123     /** @ingroup cds_nonintrusive_queue
124         Source: [2013] Nhat Minh Le, Adrien Guatto, Albert Cohen, Antoniu Pop. Correct and Effcient Bounded
125             FIFO Queues. [Research Report] RR-8365, INRIA. 2013. <hal-00862450>
126
127         Ring buffer is a bounded queue. Additionally, \p %WeakRingBuffer supports batch operations -
128         you can push/pop an array of elements.
129
130         There are a specialization \ref cds_nonintrusive_WeakRingBuffer_void "WeakRingBuffer<void, Traits>" 
131         that is not a queue but a "memory pool" between producer and consumer threads. 
132         \p WeakRingBuffer<void> supports data of different size.
133     */
134     template <typename T, typename Traits = weak_ringbuffer::traits>
135     class WeakRingBuffer: public cds::bounded_container
136     {
137     public:
138         typedef T value_type;   ///< Value type to be stored in the ring buffer
139         typedef Traits traits;  ///< Ring buffer traits
140         typedef typename traits::memory_model  memory_model;  ///< Memory ordering. See \p cds::opt::memory_model option
141         typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p weak_ringbuffer::traits::value_cleaner
142
143         /// Rebind template arguments
144         template <typename T2, typename Traits2>
145         struct rebind {
146             typedef WeakRingBuffer< T2, Traits2 > other;   ///< Rebinding result
147         };
148
149         //@cond
150         // Only for tests
151         typedef size_t item_counter;
152         //@endcond
153
154     private:
155         //@cond
156         typedef typename traits::buffer::template rebind< value_type >::other buffer;
157         //@endcond
158
159     public:
160
161         /// Creates the ring buffer of \p capacity
162         /**
163             For \p cds::opt::v::uninitialized_static_buffer the \p nCapacity parameter is ignored.
164
165             If the buffer capacity is a power of two, lightweight binary arithmetics is used
166             instead of modulo arithmetics.
167         */
168         WeakRingBuffer( size_t capacity = 0 )
169             : front_( 0 )
170             , pfront_( 0 )
171             , cback_( 0 )
172             , buffer_( capacity )
173         {
174             back_.store( 0, memory_model::memory_order_release );
175         }
176
177         /// Destroys the ring buffer
178         ~WeakRingBuffer()
179         {
180             value_cleaner cleaner;
181             size_t back = back_.load( memory_model::memory_order_relaxed );
182             for ( size_t front = front_.load( memory_model::memory_order_relaxed ); front != back; ++front )
183                 cleaner( buffer_[ buffer_.mod( front ) ] );
184         }
185
186         /// Batch push - push array \p arr of size \p count
187         /**
188             \p CopyFunc is a per-element copy functor: for each element of \p arr
189             <tt>copy( dest, arr[i] )</tt> is called.
190             The \p CopyFunc signature:
191             \code
192                 void copy_func( value_type& element, Q const& source );
193             \endcode
194             Here \p element is uninitialized so you should construct it using placement new
195             if needed; for example, if the element type is \p str::string and \p Q is <tt>char const*</tt>,
196             \p copy functor can be:
197             \code
198             cds::container::WeakRingBuffer<std::string> ringbuf;
199             char const* arr[10];
200             ringbuf.push( arr, 10, 
201                 []( std::string& element, char const* src ) {
202                     new( &element ) std::string( src );
203                 });
204             \endcode
205             You may use move semantics if appropriate:
206             \code
207             cds::container::WeakRingBuffer<std::string> ringbuf;
208             std::string arr[10];
209             ringbuf.push( arr, 10,
210                 []( std::string& element, std:string& src ) {
211                     new( &element ) std::string( std::move( src ));
212                 });
213             \endcode
214
215             Returns \p true if success or \p false if not enough space in the ring
216         */
217         template <typename Q, typename CopyFunc>
218         bool push( Q* arr, size_t count, CopyFunc copy )
219         {
220             assert( count < capacity() );
221             size_t back = back_.load( memory_model::memory_order_relaxed );
222
223             assert( back - pfront_ <= capacity() );
224
225             if ( pfront_ + capacity() - back < count ) {
226                 pfront_ = front_.load( memory_model::memory_order_acquire );
227
228                 if ( pfront_ + capacity() - back < count ) {
229                     // not enough space
230                     return false;
231                 }
232             }
233
234             // copy data
235             for ( size_t i = 0; i < count; ++i, ++back )
236                 copy( buffer_[buffer_.mod( back )], arr[i] );
237
238             back_.store( back, memory_model::memory_order_release );
239
240             return true;
241         }
242
243         /// Batch push - push array \p arr of size \p count with assignment as copy functor
244         /**
245             This function is equivalent for:
246             \code
247             push( arr, count, []( value_type& dest, Q const& src ) { dest = src; } );
248             \endcode
249
250             The function is available only if <tt>std::is_constructible<value_type, Q>::value</tt>
251             is \p true.
252
253             Returns \p true if success or \p false if not enough space in the ring
254         */
255         template <typename Q>
256         typename std::enable_if< std::is_constructible<value_type, Q>::value, bool>::type
257         push( Q* arr, size_t count )
258         {
259             return push( arr, count, []( value_type& dest, Q const& src ) { new( &dest ) value_type( src ); } );
260         }
261
262         /// Push one element created from \p args
263         /**
264             The function is available only if <tt>std::is_constructible<value_type, Args...>::value</tt>
265             is \p true.
266
267             Returns \p false if the ring is full or \p true otherwise.
268         */
269         template <typename... Args>
270         typename std::enable_if< std::is_constructible<value_type, Args...>::value, bool>::type
271         emplace( Args&&... args )
272         {
273             size_t back = back_.load( memory_model::memory_order_relaxed );
274
275             assert( back - pfront_ <= capacity() );
276
277             if ( pfront_ + capacity() - back < 1 ) {
278                 pfront_ = front_.load( memory_model::memory_order_acquire );
279
280                 if ( pfront_ + capacity() - back < 1 ) {
281                     // not enough space
282                     return false;
283                 }
284             }
285
286             new( &buffer_[buffer_.mod( back )] ) value_type( std::forward<Args>(args)... );
287
288             back_.store( back + 1, memory_model::memory_order_release );
289
290             return true;
291         }
292
293         /// Enqueues data to the ring using a functor
294         /**
295             \p Func is a functor called to copy a value to the ring element.
296             The functor \p f takes one argument - a reference to a empty cell of type \ref value_type :
297             \code
298             cds::container::WeakRingBuffer< Foo > myRing;
299             Bar bar;
300             myRing.enqueue_with( [&bar]( Foo& dest ) { dest = std::move(bar); } );
301             \endcode
302         */
303         template <typename Func>
304         bool enqueue_with( Func f )
305         {
306             size_t back = back_.load( memory_model::memory_order_relaxed );
307
308             assert( back - pfront_ <= capacity() );
309
310             if ( pfront_ + capacity() - back < 1 ) {
311                 pfront_ = front_.load( memory_model::memory_order_acquire );
312
313                 if ( pfront_ + capacity() - back < 1 ) {
314                     // not enough space
315                     return false;
316                 }
317             }
318
319             f( buffer_[buffer_.mod( back )] );
320
321             back_.store( back + 1, memory_model::memory_order_release );
322
323             return true;
324
325         }
326
327         /// Enqueues \p val value into the queue.
328         /**
329             The new queue item is created by calling placement new in free cell.
330             Returns \p true if success, \p false if the ring is full.
331         */
332         bool enqueue( value_type const& val )
333         {
334             return emplace( val );
335         }
336
337         /// Enqueues \p val value into the queue, move semantics
338         bool enqueue( value_type&& val )
339         {
340             return emplace( std::move( val ));
341         }
342
343         /// Synonym for \p enqueue( value_type const& )
344         bool push( value_type const& val )
345         {
346             return enqueue( val );
347         }
348
349         /// Synonym for \p enqueue( value_type&& )
350         bool push( value_type&& val )
351         {
352             return enqueue( std::move( val ));
353         }
354
355         /// Synonym for \p enqueue_with()
356         template <typename Func>
357         bool push_with( Func f )
358         {
359             return enqueue_with( f );
360         }
361
362         /// Batch pop \p count element from the ring buffer into \p arr
363         /**
364             \p CopyFunc is a per-element copy functor: for each element of \p arr
365             <tt>copy( arr[i], source )</tt> is called.
366             The \p CopyFunc signature:
367             \code
368             void copy_func( Q& dest, value_type& elemen );
369             \endcode
370
371             Returns \p true if success or \p false if not enough space in the ring
372         */
373         template <typename Q, typename CopyFunc>
374         bool pop( Q* arr, size_t count, CopyFunc copy )
375         {
376             assert( count < capacity() );
377
378             size_t front = front_.load( memory_model::memory_order_relaxed );
379             assert( cback_ - front < capacity() );
380
381             if ( cback_ - front < count ) {
382                 cback_ = back_.load( memory_model::memory_order_acquire );
383                 if ( cback_ - front < count )
384                     return false;
385             }
386
387             // copy data
388             value_cleaner cleaner;
389             for ( size_t i = 0; i < count; ++i, ++front ) {
390                 value_type& val = buffer_[buffer_.mod( front )];
391                 copy( arr[i], val );
392                 cleaner( val );
393             }
394
395             front_.store( front, memory_model::memory_order_release );
396             return true;
397         }
398
399         /// Batch pop - push array \p arr of size \p count with assignment as copy functor
400         /**
401             This function is equivalent for:
402             \code
403             pop( arr, count, []( Q& dest, value_type& src ) { dest = src; } );
404             \endcode
405
406             The function is available only if <tt>std::is_assignable<Q&, value_type const&>::value</tt>
407             is \p true.
408
409             Returns \p true if success or \p false if not enough space in the ring
410         */
411         template <typename Q>
412         typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
413         pop( Q* arr, size_t count )
414         {
415             return pop( arr, count, []( Q& dest, value_type& src ) { dest = src; } );
416         }
417
418         /// Dequeues an element from the ring to \p val
419         /**
420             The function is available only if <tt>std::is_assignable<Q&, value_type const&>::value</tt>
421             is \p true.
422
423             Returns \p false if the ring is full or \p true otherwise.
424         */
425         template <typename Q>
426         typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
427         dequeue( Q& val )
428         {
429             return pop( &val, 1 );
430         }
431
432         /// Synonym for \p dequeue( Q& )
433         template <typename Q>
434         typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
435         pop( Q& val )
436         {
437             return dequeue( val );
438         }
439
440         /// Dequeues a value using a functor
441         /**
442             \p Func is a functor called to copy dequeued value.
443             The functor takes one argument - a reference to removed node:
444             \code
445             cds:container::WeakRingBuffer< Foo > myRing;
446             Bar bar;
447             myRing.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
448             \endcode
449
450             Returns \p true if the ring is not empty, \p false otherwise.
451             The functor is called only if the ring is not empty.
452         */
453         template <typename Func>
454         bool dequeue_with( Func f )
455         {
456             size_t front = front_.load( memory_model::memory_order_relaxed );
457             assert( cback_ - front < capacity() );
458
459             if ( cback_ - front < 1 ) {
460                 cback_ = back_.load( memory_model::memory_order_acquire );
461                 if ( cback_ - front < 1 )
462                     return false;
463             }
464
465             value_type& val = buffer_[buffer_.mod( front )];
466             f( val );
467             value_cleaner()( val );
468
469             front_.store( front + 1, memory_model::memory_order_release );
470             return true;
471         }
472
473         /// Synonym for \p dequeue_with()
474         template <typename Func>
475         bool pop_with( Func f )
476         {
477             return dequeue_with( f );
478         }
479
480         /// Gets pointer to first element of ring buffer
481         /**
482             If the ring buffer is empty, returns \p nullptr
483
484             The function is thread-safe since there is only one consumer.
485             Recall, \p WeakRingBuffer is single-producer/single consumer container.
486         */
487         value_type* front()
488         {
489             size_t front = front_.load( memory_model::memory_order_relaxed );
490             assert( cback_ - front < capacity() );
491
492             if ( cback_ - front < 1 ) {
493                 cback_ = back_.load( memory_model::memory_order_acquire );
494                 if ( cback_ - front < 1 )
495                     return nullptr;
496             }
497
498             return &buffer_[buffer_.mod( front )];
499         }
500
501         /// Removes front element of ring-buffer
502         /**
503             If the ring-buffer is empty, returns \p false.
504             Otherwise, pops the first element from the ring.
505         */
506         bool pop_front()
507         {
508             size_t front = front_.load( memory_model::memory_order_relaxed );
509             assert( cback_ - front <= capacity() );
510
511             if ( cback_ - front < 1 ) {
512                 cback_ = back_.load( memory_model::memory_order_acquire );
513                 if ( cback_ - front < 1 )
514                     return false;
515             }
516
517             // clean cell
518             value_cleaner()( buffer_[buffer_.mod( front )] );
519
520             front_.store( front + 1, memory_model::memory_order_release );
521             return true;
522         }
523
524         /// Clears the ring buffer (only consumer can call this function!)
525         void clear()
526         {
527             value_type v;
528             while ( pop( v ) );
529         }
530
531         /// Checks if the ring-buffer is empty
532         bool empty() const
533         {
534             return front_.load( memory_model::memory_order_relaxed ) == back_.load( memory_model::memory_order_relaxed );
535         }
536
537         /// Checks if the ring-buffer is full
538         bool full() const
539         {
540             return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed ) >= capacity();
541         }
542
543         /// Returns the current size of ring buffer
544         size_t size() const
545         {
546             return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed );
547         }
548
549         /// Returns capacity of the ring buffer
550         size_t capacity() const
551         {
552             return buffer_.capacity();
553         }
554
555     private:
556         //@cond
557         atomics::atomic<size_t>     front_;
558         typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad1_;
559         atomics::atomic<size_t>     back_;
560         typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad2_;
561         size_t                      pfront_;
562         typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad3_;
563         size_t                      cback_;
564         typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad4_;
565
566         buffer                      buffer_;
567         //@endcond
568     };
569
570
571     /// Single-producer single-consumer ring buffer for untyped variable-sized data
572     /** @ingroup cds_nonintrusive_queue
573         @anchor cds_nonintrusive_WeakRingBuffer_void
574     */
575 #ifdef CDS_DOXYGEN_INVOKED
576     template <typename Traits = weak_ringbuffer::traits>
577 #else
578     template <typename Traits>
579 #endif
580     class WeakRingBuffer<void, Traits>: public cds::bounded_container
581     {
582     public:
583         typedef Traits      traits;         ///< Ring buffer traits
584         typedef typename    traits::memory_model  memory_model;  ///< Memory ordering. See \p cds::opt::memory_model option
585
586     private:
587         //@cond
588         typedef typename traits::buffer::template rebind< uint8_t >::other buffer;
589         //@endcond
590
591     public:
592         /// Creates the ring buffer of \p capacity bytes
593         /**
594             For \p cds::opt::v::uninitialized_static_buffer the \p nCapacity parameter is ignored.
595
596             If the buffer capacity is a power of two, lightweight binary arithmetics is used
597             instead of modulo arithmetics.
598         */
599         WeakRingBuffer( size_t capacity = 0 )
600             : front_( 0 )
601             , pfront_( 0 )
602             , cback_( 0 )
603             , buffer_( capacity )
604         {
605             back_.store( 0, memory_model::memory_order_release );
606         }
607
608         /// [producer] Reserve \p size bytes
609         void* back( size_t size )
610         {
611             // Any data is rounded to 8-byte boundary
612             size_t real_size = calc_real_size( size );
613
614             // check if we can reserve read_size bytes
615             assert( real_size < capacity() );
616             size_t back = back_.load( memory_model::memory_order_relaxed );
617
618             assert( back - pfront_ <= capacity() );
619
620             if ( pfront_ + capacity() - back < real_size ) {
621                 pfront_ = front_.load( memory_model::memory_order_acquire );
622
623                 if ( pfront_ + capacity() - back < real_size ) {
624                     // not enough space
625                     return nullptr;
626                 }
627             }
628
629             uint8_t* reserved = buffer_.buffer() + buffer_.mod( back );
630
631             // Check if the buffer free space is enough for storing real_size bytes
632             size_t tail_size = capacity() - buffer_.mod( back );
633             if ( tail_size < real_size ) {
634                 // make unused tail
635                 assert( tail_size >= sizeof( size_t ) );
636                 assert( !is_tail( tail_size ) );
637
638                 *reinterpret_cast<size_t*>( reserved ) = make_tail( tail_size - sizeof(size_t));
639                 back += tail_size;
640
641                 // We must be in beginning of buffer
642                 assert( buffer_.mod( back ) == 0 );
643
644                 if ( pfront_ + capacity() - back < real_size ) {
645                     pfront_ = front_.load( memory_model::memory_order_acquire );
646
647                     if ( pfront_ + capacity() - back < real_size ) {
648                         // not enough space
649                         return nullptr;
650                     }
651                 }
652
653                 reserved = buffer_.buffer();
654             }
655
656             // reserve and store size
657             uint8_t* reserved = buffer_.buffer() + buffer_.mod( back );
658             *reinterpret_cast<size_t*>( reserved ) = size;
659
660             return reinterpret_cast<void*>( reserved + sizeof( size_t ) );
661         }
662
663         /// [producer] Push reserved bytes into ring
664         void push_back()
665         {
666             size_t back = back_.load( memory_model::memory_order_relaxed );
667             uint8_t* reserved = buffer_.buffer() + buffer_.mod( back );
668
669             size_t real_size = calc_real_size( *reinterpret_cast<size_t*>( reserved ) );
670             assert( real_size < capacity() );
671
672             back_.store( back + real_size, memory_model::memory_order_release );
673         }
674
675         /// [producer] Push \p data of \p size bytes into ring
676         bool push_back( void const* data, size_t size )
677         {
678             void* buf = back( size );
679             if ( buf ) {
680                 memcpy( buf, data, size );
681                 push_back();
682                 return true;
683             }
684             return false;
685         }
686
687         /// [consumer] Get top data from the ring
688         std::pair<void*, size_t> front()
689         {
690             size_t front = front_.load( memory_model::memory_order_relaxed );
691             assert( cback_ - front < capacity() );
692
693             if ( cback_ - front < sizeof( size_t )) {
694                 cback_ = back_.load( memory_model::memory_order_acquire );
695                 if ( cback_ - front < sizeof( size_t ) )
696                     return std::make_pair( nullptr, 0u );
697             }
698
699             uint8_t * buf = buffer_.buffer() + buffer_.mod( front );
700
701             // check alignment
702             assert( ( reinterpret_cast<uintptr_t>( buf ) & ( sizeof( uintptr_t ) - 1 ) ) == 0 );
703
704             size_t size = *reinterpret_cast<size_t*>( buf );
705             if ( is_tail( size ) ) {
706                 // unused tail, skip
707                 CDS_VERIFY( pop_front() );
708
709                 front = front_.load( memory_model::memory_order_relaxed );
710                 buf = buffer_.buffer() + buffer_.mod( front );
711                 size = *reinterpret_cast<size_t*>( buf );
712
713                 assert( !is_tail( size ) );
714             }
715
716 #ifdef _DEBUG
717             size_t real_size = calc_real_size( size );
718             if ( cback_ - front < real_size ) {
719                 cback_ = back_.load( memory_model::memory_order_acquire );
720                 assert( cback_ - front >= real_size );
721             }
722 #endif
723
724             return std::make_pair( reinterpret_cast<void*>( buf + sizeof( size_t ) ), size );
725         }
726
727         /// [consumer] Pops top data
728         bool pop_front()
729         {
730             size_t front = front_.load( memory_model::memory_order_relaxed );
731             assert( cback_ - front <= capacity() );
732
733             if ( cback_ - front < sizeof(size_t) ) {
734                 cback_ = back_.load( memory_model::memory_order_acquire );
735                 if ( cback_ - front < sizeof( size_t ) )
736                     return false;
737             }
738
739             uint8_t * buf = buffer_.buffer() + buffer_.mod( front );
740
741             // check alignment
742             assert( ( reinterpret_cast<uintptr_t>( buf ) & ( sizeof( uintptr_t ) - 1 ) ) == 0 );
743
744             size_t size = *reinterpret_cast<size_t*>( buf );
745             assert( !is_tail( size ) );
746
747             size_t real_size = calc_real_size( size );
748
749 #ifdef _DEBUG
750             if ( cback_ - front < real_size ) {
751                 cback_ = back_.load( memory_model::memory_order_acquire );
752                 assert( cback_ - front >= real_size );
753             }
754 #endif
755
756             front_.store( front + real_size, memory_model::memory_order_release );
757             return true;
758
759         }
760
761         /// [consumer] Clears the ring buffer
762         void clear()
763         {
764             for ( auto el = front(); el.first; el = front() )
765                 pop_front();
766         }
767
768         /// Checks if the ring-buffer is empty
769         bool empty() const
770         {
771             return front_.load( memory_model::memory_order_relaxed ) == back_.load( memory_model::memory_order_relaxed );
772         }
773
774         /// Checks if the ring-buffer is full
775         bool full() const
776         {
777             return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed ) >= capacity();
778         }
779
780         /// Returns the current size of ring buffer
781         size_t size() const
782         {
783             return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed );
784         }
785
786         /// Returns capacity of the ring buffer
787         size_t capacity() const
788         {
789             return buffer_.capacity();
790         }
791
792     private:
793         static size_t calc_real_size( size_t size )
794         {
795             size_t real_size =  (( size + sizeof( uintptr_t ) - 1 ) & ~( sizeof( uintptr_t ) - 1 )) + sizeof( size_t );
796
797             assert( real_size > size );
798             assert( real_size - size >= sizeof( size_t ) );
799
800             return real_size;
801         }
802
803         static bool is_tail( size_t size )
804         {
805             return ( size & ( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 ))) != 0;
806         }
807
808         static size_t make_tail( size_t size )
809         {
810             return size | ( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 ));
811         }
812
813     private:
814         //@cond
815         atomics::atomic<size_t>     front_;
816         typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad1_;
817         atomics::atomic<size_t>     back_;
818         typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad2_;
819         size_t                      pfront_;
820         typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad3_;
821         size_t                      cback_;
822         typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad4_;
823
824         buffer                      buffer_;
825         //@endcond
826     };
827
828 }} // namespace cds::container
829
830
831 #endif // #ifndef CDSLIB_CONTAINER_WEAK_RINGBUFFER_H