/*
    This file is a part of libcds - Concurrent Data Structures library

    (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017

    Source code repo: http://github.com/khizmax/libcds/
    Download: http://sourceforge.net/projects/libcds/files/

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions are met:

    * Redistributions of source code must retain the above copyright notice, this
      list of conditions and the following disclaimer.

    * Redistributions in binary form must reproduce the above copyright notice,
      this list of conditions and the following disclaimer in the documentation
      and/or other materials provided with the distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
    AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
    IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
    FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
    DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
    SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
    OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
31 #ifndef CDSLIB_CONTAINER_WEAK_RINGBUFFER_H
32 #define CDSLIB_CONTAINER_WEAK_RINGBUFFER_H
#include <cassert>      // assert
#include <cstdint>      // uint8_t, uint64_t, uintptr_t
#include <cstring>      // memcpy
#include <utility>      // std::pair, std::forward, std::move

#include <cds/container/details/base.h>
#include <cds/opt/buffer.h>
#include <cds/opt/value_cleaner.h>
#include <cds/algo/atomic.h>
#include <cds/details/bounded_container.h>
40 namespace cds { namespace container {
/// \p WeakRingBuffer related definitions
/** @ingroup cds_nonintrusive_helper
*/
45 namespace weak_ringbuffer {
47 /// \p WeakRingBuffer default traits
49 /// Buffer type for internal array
51 The type of element for the buffer is not important: \p WeakRingBuffer rebind
52 the buffer for required type via \p rebind metafunction.
54 For \p WeakRingBuffer the buffer size should have power-of-2 size.
56 You should use only uninitialized buffer for the ring buffer -
57 \p cds::opt::v::uninitialized_dynamic_buffer (the default),
58 \p cds::opt::v::uninitialized_static_buffer.
60 typedef cds::opt::v::uninitialized_dynamic_buffer< void * > buffer;
62 /// A functor to clean item dequeued.
64 The functor calls the destructor for popped element.
65 After a set of items is dequeued, \p value_cleaner cleans the cells that the items have been occupied.
66 If \p T is a complex type, \p value_cleaner may be useful feature.
67 For POD types \ref opt::v::empty_cleaner is suitable
69 Default value is \ref opt::v::auto_cleaner that calls destructor only if it is not trivial.
71 typedef cds::opt::v::auto_cleaner value_cleaner;
73 /// C++ memory ordering model
75 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
76 or \p opt::v::sequential_consistent (sequentially consistent memory model).
78 typedef opt::v::relaxed_ordering memory_model;
80 /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
81 enum { padding = opt::cache_line_padding };
84 /// Metafunction converting option list to \p weak_ringbuffer::traits
86 Supported \p Options are:
87 - \p opt::buffer - an uninitialized buffer type for internal cyclic array. Possible types are:
88 \p opt::v::uninitialized_dynamic_buffer (the default), \p opt::v::uninitialized_static_buffer. The type of
89 element in the buffer is not important: it will be changed via \p rebind metafunction.
90 - \p opt::value_cleaner - a functor to clean items dequeued.
91 The functor calls the destructor for ring-buffer item.
92 After a set of items is dequeued, \p value_cleaner cleans the cells that the items have been occupied.
93 If \p T is a complex type, \p value_cleaner can be an useful feature.
94 Default value is \ref opt::v::empty_cleaner that is suitable for POD types.
95 - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
96 - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
97 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
99 Example: declare \p %WeakRingBuffer with static iternal buffer for 1024 objects:
101 typedef cds::container::WeakRingBuffer< Foo,
102 typename cds::container::weak_ringbuffer::make_traits<
103 cds::opt::buffer< cds::opt::v::uninitialized_static_buffer< void *, 1024 >
108 template <typename... Options>
110 # ifdef CDS_DOXYGEN_INVOKED
111 typedef implementation_defined type; ///< Metafunction result
113 typedef typename cds::opt::make_options<
114 typename cds::opt::find_type_traits< traits, Options... >::type
120 } // namespace weak_ringbuffer
122 /// Single-producer single-consumer ring buffer
123 /** @ingroup cds_nonintrusive_queue
124 Source: [2013] Nhat Minh Le, Adrien Guatto, Albert Cohen, Antoniu Pop. Correct and Effcient Bounded
125 FIFO Queues. [Research Report] RR-8365, INRIA. 2013. <hal-00862450>
127 Ring buffer is a bounded queue. Additionally, \p %WeakRingBuffer supports batch operations -
128 you can push/pop an array of elements.
130 There are a specialization \ref cds_nonintrusive_WeakRingBuffer_void "WeakRingBuffer<void, Traits>"
131 that is not a queue but a "memory pool" between producer and consumer threads.
132 \p WeakRingBuffer<void> supports variable-sized data.
134 @warning: \p %WeakRingBuffer is developed for 64-bit architecture.
135 32-bit platform must provide support for 64-bit atomics.
137 template <typename T, typename Traits = weak_ringbuffer::traits>
138 class WeakRingBuffer: public cds::bounded_container
141 typedef T value_type; ///< Value type to be stored in the ring buffer
142 typedef Traits traits; ///< Ring buffer traits
143 typedef typename traits::memory_model memory_model; ///< Memory ordering. See \p cds::opt::memory_model option
144 typedef typename traits::value_cleaner value_cleaner; ///< Value cleaner, see \p weak_ringbuffer::traits::value_cleaner
146 /// Rebind template arguments
147 template <typename T2, typename Traits2>
149 typedef WeakRingBuffer< T2, Traits2 > other; ///< Rebinding result
154 typedef size_t item_counter;
159 typedef typename traits::buffer::template rebind< value_type >::other buffer;
160 typedef uint64_t counter_type;
165 /// Creates the ring buffer of \p capacity
167 For \p cds::opt::v::uninitialized_static_buffer the \p nCapacity parameter is ignored.
169 If the buffer capacity is a power of two, lightweight binary arithmetics is used
170 instead of modulo arithmetics.
172 WeakRingBuffer( size_t capacity = 0 )
176 , buffer_( capacity )
178 back_.store( 0, memory_model::memory_order_release );
181 /// Destroys the ring buffer
184 value_cleaner cleaner;
185 counter_type back = back_.load( memory_model::memory_order_relaxed );
186 for ( counter_type front = front_.load( memory_model::memory_order_relaxed ); front != back; ++front )
187 cleaner( buffer_[ buffer_.mod( front ) ] );
190 /// Batch push - push array \p arr of size \p count
192 \p CopyFunc is a per-element copy functor: for each element of \p arr
193 <tt>copy( dest, arr[i] )</tt> is called.
194 The \p CopyFunc signature:
196 void copy_func( value_type& element, Q const& source );
198 Here \p element is uninitialized so you should construct it using placement new
199 if needed; for example, if the element type is \p str::string and \p Q is <tt>char const*</tt>,
200 \p copy functor can be:
202 cds::container::WeakRingBuffer<std::string> ringbuf;
204 ringbuf.push( arr, 10,
205 []( std::string& element, char const* src ) {
206 new( &element ) std::string( src );
209 You may use move semantics if appropriate:
211 cds::container::WeakRingBuffer<std::string> ringbuf;
213 ringbuf.push( arr, 10,
214 []( std::string& element, std:string& src ) {
215 new( &element ) std::string( std::move( src ));
219 Returns \p true if success or \p false if not enough space in the ring
221 template <typename Q, typename CopyFunc>
222 bool push( Q* arr, size_t count, CopyFunc copy )
224 assert( count < capacity());
225 counter_type back = back_.load( memory_model::memory_order_relaxed );
227 assert( static_cast<size_t>( back - pfront_ ) <= capacity());
229 if ( static_cast<size_t>( pfront_ + capacity() - back ) < count ) {
230 pfront_ = front_.load( memory_model::memory_order_acquire );
232 if ( static_cast<size_t>( pfront_ + capacity() - back ) < count ) {
239 for ( size_t i = 0; i < count; ++i, ++back )
240 copy( buffer_[buffer_.mod( back )], arr[i] );
242 back_.store( back, memory_model::memory_order_release );
247 /// Batch push - push array \p arr of size \p count with assignment as copy functor
249 This function is equivalent for:
251 push( arr, count, []( value_type& dest, Q const& src ) { dest = src; } );
254 The function is available only if <tt>std::is_constructible<value_type, Q>::value</tt>
257 Returns \p true if success or \p false if not enough space in the ring
259 template <typename Q>
260 typename std::enable_if< std::is_constructible<value_type, Q>::value, bool>::type
261 push( Q* arr, size_t count )
263 return push( arr, count, []( value_type& dest, Q const& src ) { new( &dest ) value_type( src ); } );
266 /// Push one element created from \p args
268 The function is available only if <tt>std::is_constructible<value_type, Args...>::value</tt>
271 Returns \p false if the ring is full or \p true otherwise.
273 template <typename... Args>
274 typename std::enable_if< std::is_constructible<value_type, Args...>::value, bool>::type
275 emplace( Args&&... args )
277 counter_type back = back_.load( memory_model::memory_order_relaxed );
279 assert( static_cast<size_t>( back - pfront_ ) <= capacity());
281 if ( pfront_ + capacity() - back < 1 ) {
282 pfront_ = front_.load( memory_model::memory_order_acquire );
284 if ( pfront_ + capacity() - back < 1 ) {
290 new( &buffer_[buffer_.mod( back )] ) value_type( std::forward<Args>(args)... );
292 back_.store( back + 1, memory_model::memory_order_release );
297 /// Enqueues data to the ring using a functor
299 \p Func is a functor called to copy a value to the ring element.
300 The functor \p f takes one argument - a reference to a empty cell of type \ref value_type :
302 cds::container::WeakRingBuffer< Foo > myRing;
304 myRing.enqueue_with( [&bar]( Foo& dest ) { dest = std::move(bar); } );
307 template <typename Func>
308 bool enqueue_with( Func f )
310 counter_type back = back_.load( memory_model::memory_order_relaxed );
312 assert( static_cast<size_t>( back - pfront_ ) <= capacity());
314 if ( pfront_ + capacity() - back < 1 ) {
315 pfront_ = front_.load( memory_model::memory_order_acquire );
317 if ( pfront_ + capacity() - back < 1 ) {
323 f( buffer_[buffer_.mod( back )] );
325 back_.store( back + 1, memory_model::memory_order_release );
331 /// Enqueues \p val value into the queue.
333 The new queue item is created by calling placement new in free cell.
334 Returns \p true if success, \p false if the ring is full.
336 bool enqueue( value_type const& val )
338 return emplace( val );
341 /// Enqueues \p val value into the queue, move semantics
342 bool enqueue( value_type&& val )
344 return emplace( std::move( val ));
347 /// Synonym for \p enqueue( value_type const& )
348 bool push( value_type const& val )
350 return enqueue( val );
353 /// Synonym for \p enqueue( value_type&& )
354 bool push( value_type&& val )
356 return enqueue( std::move( val ));
359 /// Synonym for \p enqueue_with()
360 template <typename Func>
361 bool push_with( Func f )
363 return enqueue_with( f );
366 /// Batch pop \p count element from the ring buffer into \p arr
368 \p CopyFunc is a per-element copy functor: for each element of \p arr
369 <tt>copy( arr[i], source )</tt> is called.
370 The \p CopyFunc signature:
372 void copy_func( Q& dest, value_type& elemen );
375 Returns \p true if success or \p false if not enough space in the ring
377 template <typename Q, typename CopyFunc>
378 bool pop( Q* arr, size_t count, CopyFunc copy )
380 assert( count < capacity());
382 counter_type front = front_.load( memory_model::memory_order_relaxed );
383 assert( static_cast<size_t>( cback_ - front ) < capacity());
385 if ( static_cast<size_t>( cback_ - front ) < count ) {
386 cback_ = back_.load( memory_model::memory_order_acquire );
387 if ( static_cast<size_t>( cback_ - front ) < count )
392 value_cleaner cleaner;
393 for ( size_t i = 0; i < count; ++i, ++front ) {
394 value_type& val = buffer_[buffer_.mod( front )];
399 front_.store( front, memory_model::memory_order_release );
403 /// Batch pop - push array \p arr of size \p count with assignment as copy functor
405 This function is equivalent for:
407 pop( arr, count, []( Q& dest, value_type& src ) { dest = src; } );
410 The function is available only if <tt>std::is_assignable<Q&, value_type const&>::value</tt>
413 Returns \p true if success or \p false if not enough space in the ring
415 template <typename Q>
416 typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
417 pop( Q* arr, size_t count )
419 return pop( arr, count, []( Q& dest, value_type& src ) { dest = src; } );
422 /// Dequeues an element from the ring to \p val
424 The function is available only if <tt>std::is_assignable<Q&, value_type const&>::value</tt>
427 Returns \p false if the ring is full or \p true otherwise.
429 template <typename Q>
430 typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
433 return pop( &val, 1 );
436 /// Synonym for \p dequeue( Q& )
437 template <typename Q>
438 typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
441 return dequeue( val );
444 /// Dequeues a value using a functor
446 \p Func is a functor called to copy dequeued value.
447 The functor takes one argument - a reference to removed node:
449 cds:container::WeakRingBuffer< Foo > myRing;
451 myRing.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
454 Returns \p true if the ring is not empty, \p false otherwise.
455 The functor is called only if the ring is not empty.
457 template <typename Func>
458 bool dequeue_with( Func f )
460 counter_type front = front_.load( memory_model::memory_order_relaxed );
461 assert( static_cast<size_t>( cback_ - front ) < capacity());
463 if ( cback_ - front < 1 ) {
464 cback_ = back_.load( memory_model::memory_order_acquire );
465 if ( cback_ - front < 1 )
469 value_type& val = buffer_[buffer_.mod( front )];
471 value_cleaner()( val );
473 front_.store( front + 1, memory_model::memory_order_release );
477 /// Synonym for \p dequeue_with()
478 template <typename Func>
479 bool pop_with( Func f )
481 return dequeue_with( f );
484 /// Gets pointer to first element of ring buffer
486 If the ring buffer is empty, returns \p nullptr
488 The function is thread-safe since there is only one consumer.
489 Recall, \p WeakRingBuffer is single-producer/single consumer container.
493 counter_type front = front_.load( memory_model::memory_order_relaxed );
494 assert( static_cast<size_t>( cback_ - front ) < capacity());
496 if ( cback_ - front < 1 ) {
497 cback_ = back_.load( memory_model::memory_order_acquire );
498 if ( cback_ - front < 1 )
502 return &buffer_[buffer_.mod( front )];
505 /// Removes front element of ring-buffer
507 If the ring-buffer is empty, returns \p false.
508 Otherwise, pops the first element from the ring.
512 counter_type front = front_.load( memory_model::memory_order_relaxed );
513 assert( static_cast<size_t>( cback_ - front ) <= capacity());
515 if ( cback_ - front < 1 ) {
516 cback_ = back_.load( memory_model::memory_order_acquire );
517 if ( cback_ - front < 1 )
522 value_cleaner()( buffer_[buffer_.mod( front )] );
524 front_.store( front + 1, memory_model::memory_order_release );
528 /// Clears the ring buffer (only consumer can call this function!)
535 /// Checks if the ring-buffer is empty
538 return front_.load( memory_model::memory_order_relaxed ) == back_.load( memory_model::memory_order_relaxed );
541 /// Checks if the ring-buffer is full
544 return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed ) >= capacity();
547 /// Returns the current size of ring buffer
550 return static_cast<size_t>( back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed ));
553 /// Returns capacity of the ring buffer
554 size_t capacity() const
556 return buffer_.capacity();
561 atomics::atomic<counter_type> front_;
562 typename opt::details::apply_padding< atomics::atomic<counter_type>, traits::padding >::padding_type pad1_;
563 atomics::atomic<counter_type> back_;
564 typename opt::details::apply_padding< atomics::atomic<counter_type>, traits::padding >::padding_type pad2_;
565 counter_type pfront_;
566 typename opt::details::apply_padding< counter_type, traits::padding >::padding_type pad3_;
568 typename opt::details::apply_padding< counter_type, traits::padding >::padding_type pad4_;
575 /// Single-producer single-consumer ring buffer for untyped variable-sized data
576 /** @ingroup cds_nonintrusive_queue
577 @anchor cds_nonintrusive_WeakRingBuffer_void
579 This SPSC ring-buffer is intended for data of variable size. The producer
580 allocates a buffer from ring, you fill it with data and pushes them back to ring.
581 The consumer thread reads data from front-end and then pops them:
583 // allocates 1M ring buffer
584 WeakRingBuffer<void> theRing( 1024 * 1024 );
586 void producer_thread()
588 // Get data of size N bytes
594 std::tie( data, size ) = get_data();
596 if ( data == nullptr )
599 // Allocates a buffer from the ring
600 void* buf = theRing.back( size );
602 std::cout << "The ring is full" << std::endl;
606 memcpy( buf, data, size );
608 // Push data into the ring
613 void consumer_thread()
616 auto buf = theRing.front();
618 if ( buf.first == nullptr ) {
619 std::cout << "The ring is empty" << std::endl;
624 process_data( buf.first, buf.second );
632 @warning: \p %WeakRingBuffer is developed for 64-bit architecture.
633 32-bit platform must provide support for 64-bit atomics.
635 #ifdef CDS_DOXYGEN_INVOKED
636 template <typename Traits = weak_ringbuffer::traits>
638 template <typename Traits>
640 class WeakRingBuffer<void, Traits>: public cds::bounded_container
643 typedef Traits traits; ///< Ring buffer traits
644 typedef typename traits::memory_model memory_model; ///< Memory ordering. See \p cds::opt::memory_model option
648 typedef typename traits::buffer::template rebind< uint8_t >::other buffer;
649 typedef uint64_t counter_type;
653 /// Creates the ring buffer of \p capacity bytes
655 For \p cds::opt::v::uninitialized_static_buffer the \p nCapacity parameter is ignored.
657 If the buffer capacity is a power of two, lightweight binary arithmetics is used
658 instead of modulo arithmetics.
660 WeakRingBuffer( size_t capacity = 0 )
664 , buffer_( capacity )
666 back_.store( 0, memory_model::memory_order_release );
669 /// [producer] Reserve \p size bytes
671 The function returns a pointer to reserved buffer of \p size bytes.
672 If no enough space in the ring buffer the function returns \p nullptr.
674 After successful \p %back() you should fill the buffer provided and call \p push_back():
676 // allocates 1M ring buffer
677 WeakRingBuffer<void> theRing( 1024 * 1024 );
679 void producer_thread()
681 // Get data of size N bytes
687 std::tie( data, size ) = get_data();
689 if ( data == nullptr )
692 // Allocates a buffer from the ring
693 void* buf = theRing.back( size );
695 std::cout << "The ring is full" << std::endl;
699 memcpy( buf, data, size );
701 // Push data into the ring
707 void* back( size_t size )
711 // Any data is rounded to 8-byte boundary
712 size_t real_size = calc_real_size( size );
714 // check if we can reserve read_size bytes
715 assert( real_size < capacity());
716 counter_type back = back_.load( memory_model::memory_order_relaxed );
718 assert( static_cast<size_t>( back - pfront_ ) <= capacity());
720 if ( static_cast<size_t>( pfront_ + capacity() - back ) < real_size ) {
721 pfront_ = front_.load( memory_model::memory_order_acquire );
723 if ( static_cast<size_t>( pfront_ + capacity() - back ) < real_size ) {
729 uint8_t* reserved = buffer_.buffer() + buffer_.mod( back );
731 // Check if the buffer free space is enough for storing real_size bytes
732 size_t tail_size = capacity() - static_cast<size_t>( buffer_.mod( back ));
733 if ( tail_size < real_size ) {
735 assert( tail_size >= sizeof( size_t ));
736 assert( !is_tail( tail_size ));
738 *reinterpret_cast<size_t*>( reserved ) = make_tail( tail_size - sizeof(size_t));
741 // We must be in beginning of buffer
742 assert( buffer_.mod( back ) == 0 );
744 if ( static_cast<size_t>( pfront_ + capacity() - back ) < real_size ) {
745 pfront_ = front_.load( memory_model::memory_order_acquire );
747 if ( static_cast<size_t>( pfront_ + capacity() - back ) < real_size ) {
753 back_.store( back, memory_model::memory_order_release );
754 reserved = buffer_.buffer();
757 // reserve and store size
758 *reinterpret_cast<size_t*>( reserved ) = size;
760 return reinterpret_cast<void*>( reserved + sizeof( size_t ));
763 /// [producer] Push reserved bytes into ring
765 The function pushes reserved buffer into the ring. Afte this call,
766 the buffer becomes visible by a consumer:
768 // allocates 1M ring buffer
769 WeakRingBuffer<void> theRing( 1024 * 1024 );
771 void producer_thread()
773 // Get data of size N bytes
779 std::tie( data, size ) = get_data();
781 if ( data == nullptr )
784 // Allocates a buffer from the ring
785 void* buf = theRing.back( size );
787 std::cout << "The ring is full" << std::endl;
791 memcpy( buf, data, size );
793 // Push data into the ring
801 counter_type back = back_.load( memory_model::memory_order_relaxed );
802 uint8_t* reserved = buffer_.buffer() + buffer_.mod( back );
804 size_t real_size = calc_real_size( *reinterpret_cast<size_t*>( reserved ));
805 assert( real_size < capacity());
807 back_.store( back + real_size, memory_model::memory_order_release );
810 /// [producer] Push \p data of \p size bytes into ring
812 This function invokes \p back( size ), \p memcpy( buf, data, size )
813 and \p push_back() in one call.
815 bool push_back( void const* data, size_t size )
817 void* buf = back( size );
819 memcpy( buf, data, size );
826 /// [consumer] Get top data from the ring
828 If the ring is empty, the function returns \p nullptr in \p std:pair::first.
830 std::pair<void*, size_t> front()
832 counter_type front = front_.load( memory_model::memory_order_relaxed );
833 assert( static_cast<size_t>( cback_ - front ) < capacity());
835 if ( cback_ - front < sizeof( size_t )) {
836 cback_ = back_.load( memory_model::memory_order_acquire );
837 if ( cback_ - front < sizeof( size_t ))
838 return std::make_pair( nullptr, 0u );
841 uint8_t * buf = buffer_.buffer() + buffer_.mod( front );
844 assert( ( reinterpret_cast<uintptr_t>( buf ) & ( sizeof( uintptr_t ) - 1 )) == 0 );
846 size_t size = *reinterpret_cast<size_t*>( buf );
847 if ( is_tail( size )) {
849 CDS_VERIFY( pop_front());
851 front = front_.load( memory_model::memory_order_relaxed );
853 if ( cback_ - front < sizeof( size_t )) {
854 cback_ = back_.load( memory_model::memory_order_acquire );
855 if ( cback_ - front < sizeof( size_t ) )
856 return std::make_pair( nullptr, 0u );
859 buf = buffer_.buffer() + buffer_.mod( front );
860 size = *reinterpret_cast<size_t*>( buf );
862 assert( !is_tail( size ));
863 assert( buf == buffer_.buffer());
867 size_t real_size = calc_real_size( size );
868 if ( static_cast<size_t>( cback_ - front ) < real_size ) {
869 cback_ = back_.load( memory_model::memory_order_acquire );
870 assert( static_cast<size_t>( cback_ - front ) >= real_size );
874 return std::make_pair( reinterpret_cast<void*>( buf + sizeof( size_t )), size );
877 /// [consumer] Pops top data
879 Typical consumer workloop:
881 // allocates 1M ring buffer
882 WeakRingBuffer<void> theRing( 1024 * 1024 );
884 void consumer_thread()
887 auto buf = theRing.front();
889 if ( buf.first == nullptr ) {
890 std::cout << "The ring is empty" << std::endl;
895 process_data( buf.first, buf.second );
905 counter_type front = front_.load( memory_model::memory_order_relaxed );
906 assert( static_cast<size_t>( cback_ - front ) <= capacity());
908 if ( cback_ - front < sizeof(size_t)) {
909 cback_ = back_.load( memory_model::memory_order_acquire );
910 if ( cback_ - front < sizeof( size_t ))
914 uint8_t * buf = buffer_.buffer() + buffer_.mod( front );
917 assert( ( reinterpret_cast<uintptr_t>( buf ) & ( sizeof( uintptr_t ) - 1 )) == 0 );
919 size_t size = *reinterpret_cast<size_t*>( buf );
920 size_t real_size = calc_real_size( untail( size ));
923 if ( static_cast<size_t>( cback_ - front ) < real_size ) {
924 cback_ = back_.load( memory_model::memory_order_acquire );
925 assert( static_cast<size_t>( cback_ - front ) >= real_size );
929 front_.store( front + real_size, memory_model::memory_order_release );
933 /// [consumer] Clears the ring buffer
936 for ( auto el = front(); el.first; el = front())
940 /// Checks if the ring-buffer is empty
943 return front_.load( memory_model::memory_order_relaxed ) == back_.load( memory_model::memory_order_relaxed );
946 /// Checks if the ring-buffer is full
949 return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed ) >= capacity();
952 /// Returns the current size of ring buffer
955 return static_cast<size_t>( back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed ));
958 /// Returns capacity of the ring buffer
959 size_t capacity() const
961 return buffer_.capacity();
966 static size_t calc_real_size( size_t size )
968 size_t real_size = (( size + sizeof( uintptr_t ) - 1 ) & ~( sizeof( uintptr_t ) - 1 )) + sizeof( size_t );
970 assert( real_size > size );
971 assert( real_size - size >= sizeof( size_t ));
976 static bool is_tail( size_t size )
978 return ( size & ( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 ))) != 0;
981 static size_t make_tail( size_t size )
983 return size | ( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 ));
986 static size_t untail( size_t size )
988 return size & (( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 )) - 1);
994 atomics::atomic<counter_type> front_;
995 typename opt::details::apply_padding< atomics::atomic<counter_type>, traits::padding >::padding_type pad1_;
996 atomics::atomic<counter_type> back_;
997 typename opt::details::apply_padding< atomics::atomic<counter_type>, traits::padding >::padding_type pad2_;
998 counter_type pfront_;
999 typename opt::details::apply_padding< counter_type, traits::padding >::padding_type pad3_;
1000 counter_type cback_;
1001 typename opt::details::apply_padding< counter_type, traits::padding >::padding_type pad4_;
1007 }} // namespace cds::container
1010 #endif // #ifndef CDSLIB_CONTAINER_WEAK_RINGBUFFER_H