From b261ab5d734918e0284079cf0eb9388bcb518642 Mon Sep 17 00:00:00 2001
From: khizmax
Date: Fri, 12 May 2017 17:08:59 +0300
Subject: [PATCH] Added prototype for untyped variable-sized WeakRingBuffer
 (not tested yet, to be continued)
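A minimal usage sketch of the new specialization (illustration only, not
part of the patch; it assumes the prototype API added below, one producer
thread plus one consumer thread, and byte_ring is a hypothetical alias):

    #include <cds/container/weak_ringbuffer.h>
    #include <cstring>
    #include <cstdio>

    typedef cds::container::WeakRingBuffer<void> byte_ring;

    void producer_side( byte_ring& ring )
    {
        char const msg[] = "hello";

        // two-phase variant: reserve bytes in the ring, fill them, publish
        void* buf = ring.back( sizeof msg );
        if ( buf ) {
            memcpy( buf, msg, sizeof msg );
            ring.push_back();   // publish the reserved bytes
        }

        // one-shot variant: reserve + copy + publish in a single call
        ring.push_back( msg, sizeof msg );
    }

    void consumer_side( byte_ring& ring )
    {
        // front() returns the data pointer and the original (unrounded) size
        auto el = ring.front();
        if ( el.first ) {
            printf( "%.*s\n", int( el.second ), static_cast<char const*>( el.first ));
            ring.pop_front();   // release the bytes back to the producer
        }
    }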
---
 cds/container/weak_ringbuffer.h | 273 +++++++++++++++++++++++++++++++-
 1 file changed, 265 insertions(+), 8 deletions(-)

diff --git a/cds/container/weak_ringbuffer.h b/cds/container/weak_ringbuffer.h
index 17b249b0..3ae5dd64 100644
--- a/cds/container/weak_ringbuffer.h
+++ b/cds/container/weak_ringbuffer.h
@@ -126,6 +126,10 @@ namespace cds { namespace container {
 
         Ring buffer is a bounded queue. Additionally, \p %WeakRingBuffer supports batch operations -
         you can push/pop an array of elements.
+
+        There is a specialization \ref cds_nonintrusive_WeakRingBuffer_void "WeakRingBuffer<void>"
+        that is not a queue but a "memory pool" between producer and consumer threads.
+        \p WeakRingBuffer<void> supports data of different size.
     */
     template <typename T, typename Traits = weak_ringbuffer::traits>
     class WeakRingBuffer: public cds::bounded_container
@@ -208,7 +212,7 @@ namespace cds { namespace container {
             });
             \endcode
 
-            Returns \p true if success or \p false if not enought space in the ring
+            Returns \p true if success or \p false if not enough space in the ring
         */
         template <typename Q, typename CopyFunc>
         bool push( Q* arr, size_t count, CopyFunc copy )
@@ -222,7 +226,7 @@ namespace cds { namespace container {
                 pfront_ = front_.load( memory_model::memory_order_acquire );
 
                 if ( pfront_ + capacity() - back < count ) {
-                    // not enought space
+                    // not enough space
                     return false;
                 }
             }
@@ -246,7 +250,7 @@ namespace cds { namespace container {
             The function is available only if std::is_constructible<value_type, Q>::value
             is \p true.
 
-            Returns \p true if success or \p false if not enought space in the ring
+            Returns \p true if success or \p false if not enough space in the ring
         */
         template <typename Q>
         typename std::enable_if< std::is_constructible<value_type, Q>::value, bool>::type
@@ -274,7 +278,7 @@ namespace cds { namespace container {
                 pfront_ = front_.load( memory_model::memory_order_acquire );
 
                 if ( pfront_ + capacity() - back < 1 ) {
-                    // not enought space
+                    // not enough space
                     return false;
                 }
             }
@@ -307,7 +311,7 @@ namespace cds { namespace container {
                 pfront_ = front_.load( memory_model::memory_order_acquire );
 
                 if ( pfront_ + capacity() - back < 1 ) {
-                    // not enought space
+                    // not enough space
                     return false;
                 }
             }
@@ -364,7 +368,7 @@ namespace cds { namespace container {
             void copy_func( Q& dest, value_type& elemen );
             \endcode
 
-            Returns \p true if success or \p false if not enought space in the ring
+            Returns \p true if success or \p false if not enough space in the ring
         */
         template <typename Q, typename CopyFunc>
         bool pop( Q* arr, size_t count, CopyFunc copy )
@@ -402,7 +406,7 @@ namespace cds { namespace container {
             The function is available only if std::is_assignable<Q&, value_type&>::value
             is \p true.
 
-            Returns \p true if success or \p false if not enought space in the ring
+            Returns \p true if success or \p false if not enough space in the ring
         */
         template <typename Q>
         typename std::enable_if< std::is_assignable<Q&, value_type&>::value, bool>::type
@@ -517,13 +521,245 @@ namespace cds { namespace container {
             return true;
         }
 
-        /// Clears the ring buffer
+        /// Clears the ring buffer (only consumer can call this function!)
         void clear()
        {
             value_type v;
             while ( pop( v ) );
         }
 
+        /// Checks if the ring-buffer is empty
+        bool empty() const
+        {
+            return front_.load( memory_model::memory_order_relaxed ) == back_.load( memory_model::memory_order_relaxed );
+        }
+
+        /// Checks if the ring-buffer is full
+        bool full() const
+        {
+            return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed ) >= capacity();
+        }
+
+        /// Returns the current size of ring buffer
+        size_t size() const
+        {
+            return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed );
+        }
+
+        /// Returns capacity of the ring buffer
+        size_t capacity() const
+        {
+            return buffer_.capacity();
+        }
+
+    private:
+        //@cond
+        atomics::atomic<size_t>     front_;
+        typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad1_;
+        atomics::atomic<size_t>     back_;
+        typename opt::details::apply_padding< atomics::atomic<size_t>, traits::padding >::padding_type pad2_;
+        size_t  pfront_;
+        typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad3_;
+        size_t  cback_;
+        typename opt::details::apply_padding< size_t, traits::padding >::padding_type pad4_;
+
+        buffer  buffer_;
+        //@endcond
+    };
+
+
+    /// Single-producer single-consumer ring buffer for untyped variable-sized data
+    /** @ingroup cds_nonintrusive_queue
+        @anchor cds_nonintrusive_WeakRingBuffer_void
+    */
+    template <typename Traits>
+    class WeakRingBuffer<void, Traits>: public cds::bounded_container
+    {
+    public:
+        typedef Traits      traits;     ///< Ring buffer traits
+        typedef typename traits::memory_model  memory_model;   ///< Memory ordering. See \p cds::opt::memory_model option
+
+    private:
+        //@cond
+        typedef typename traits::buffer::template rebind< uint8_t >::other buffer;
+        //@endcond
+
+    public:
+        /// Creates the ring buffer of \p capacity bytes
+        /**
+            For \p cds::opt::v::uninitialized_static_buffer the \p capacity parameter is ignored.
+
+            If the buffer capacity is a power of two, lightweight binary arithmetics is used
+            instead of modulo arithmetics.
+        */
+        WeakRingBuffer( size_t capacity = 0 )
+            : front_( 0 )
+            , pfront_( 0 )
+            , cback_( 0 )
+            , buffer_( capacity )
+        {
+            back_.store( 0, memory_model::memory_order_release );
+        }
+
+        /// [producer] Reserve \p size bytes
+        void* back( size_t size )
+        {
+            // Any data is rounded to 8-byte boundary
+            size_t real_size = calc_real_size( size );
+
+            // check if we can reserve real_size bytes
+            assert( real_size < capacity() );
+            size_t back = back_.load( memory_model::memory_order_relaxed );
+
+            assert( back - pfront_ <= capacity() );
+
+            if ( pfront_ + capacity() - back < real_size ) {
+                pfront_ = front_.load( memory_model::memory_order_acquire );
+
+                if ( pfront_ + capacity() - back < real_size ) {
+                    // not enough space
+                    return nullptr;
+                }
+            }
+
+            uint8_t* reserved = buffer_.buffer() + buffer_.mod( back );
+
+            // Check if the buffer free space is enough for storing real_size bytes
+            size_t tail_size = capacity() - buffer_.mod( back );
+            if ( tail_size < real_size ) {
+                // make unused tail
+                assert( tail_size >= sizeof( size_t ) );
+                assert( !is_tail( tail_size ) );
+
+                *reinterpret_cast<size_t*>( reserved ) = make_tail( tail_size - sizeof( size_t ));
+                back += tail_size;
+
+                // We must be in beginning of buffer
+                assert( buffer_.mod( back ) == 0 );
+
+                if ( pfront_ + capacity() - back < real_size ) {
+                    pfront_ = front_.load( memory_model::memory_order_acquire );
+
+                    if ( pfront_ + capacity() - back < real_size ) {
+                        // not enough space
+                        return nullptr;
+                    }
+                }
+
+                reserved = buffer_.buffer();
+            }
+
+            // reserve and store size
+            reserved = buffer_.buffer() + buffer_.mod( back );
+            *reinterpret_cast<size_t*>( reserved ) = size;
+
+            return reinterpret_cast<void*>( reserved + sizeof( size_t ));
+        }
+
+        /// [producer] Push reserved bytes into ring
+        void push_back()
+        {
+            size_t back = back_.load( memory_model::memory_order_relaxed );
+            uint8_t* reserved = buffer_.buffer() + buffer_.mod( back );
+
+            size_t real_size = calc_real_size( *reinterpret_cast<size_t*>( reserved ));
+            assert( real_size < capacity() );
+
+            back_.store( back + real_size, memory_model::memory_order_release );
+        }
+
+        /// [producer] Push \p data of \p size bytes into ring
+        bool push_back( void const* data, size_t size )
+        {
+            void* buf = back( size );
+            if ( buf ) {
+                memcpy( buf, data, size );
+                push_back();
+                return true;
+            }
+            return false;
+        }
+
+        /// [consumer] Get top data from the ring
+        std::pair<void*, size_t> front()
+        {
+            size_t front = front_.load( memory_model::memory_order_relaxed );
+            assert( cback_ - front < capacity() );
+
+            if ( cback_ - front < sizeof( size_t )) {
+                cback_ = back_.load( memory_model::memory_order_acquire );
+                if ( cback_ - front < sizeof( size_t ))
+                    return std::make_pair( nullptr, 0u );
+            }
+
+            uint8_t * buf = buffer_.buffer() + buffer_.mod( front );
+
+            // check alignment
+            assert( ( reinterpret_cast<uintptr_t>( buf ) & ( sizeof( uintptr_t ) - 1 )) == 0 );
+
+            size_t size = *reinterpret_cast<size_t*>( buf );
+            if ( is_tail( size )) {
+                // unused tail, skip
+                CDS_VERIFY( pop_front() );
+
+                front = front_.load( memory_model::memory_order_relaxed );
+                buf = buffer_.buffer() + buffer_.mod( front );
+                size = *reinterpret_cast<size_t*>( buf );
+
+                assert( !is_tail( size ));
+            }
+
+#ifdef _DEBUG
+            size_t real_size = calc_real_size( size );
+            if ( cback_ - front < real_size ) {
+                cback_ = back_.load( memory_model::memory_order_acquire );
+                assert( cback_ - front >= real_size );
+            }
+#endif
+
+            return std::make_pair( reinterpret_cast<void*>( buf + sizeof( size_t )), size );
+        }
+
+        /// [consumer] Pops top data
+        bool pop_front()
+        {
+            size_t front = front_.load( memory_model::memory_order_relaxed );
+            assert( cback_ - front <= capacity() );
+
+            if ( cback_ - front < sizeof( size_t )) {
+                cback_ = back_.load( memory_model::memory_order_acquire );
+                if ( cback_ - front < sizeof( size_t ))
+                    return false;
+            }
+
+            uint8_t * buf = buffer_.buffer() + buffer_.mod( front );
+
+            // check alignment
+            assert( ( reinterpret_cast<uintptr_t>( buf ) & ( sizeof( uintptr_t ) - 1 )) == 0 );
+
+            size_t size = *reinterpret_cast<size_t*>( buf );
+
+            // the top element may be an unused-tail marker; strip the tail bit
+            // so that real_size counts exactly the bytes to be skipped
+            size_t real_size = calc_real_size( size & ~( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 )));
+
+#ifdef _DEBUG
+            if ( cback_ - front < real_size ) {
+                cback_ = back_.load( memory_model::memory_order_acquire );
+                assert( cback_ - front >= real_size );
+            }
+#endif
+
+            front_.store( front + real_size, memory_model::memory_order_release );
+            return true;
+        }
+
+        /// [consumer] Clears the ring buffer
+        void clear()
+        {
+            for ( auto el = front(); el.first; el = front() )
+                pop_front();
+        }
+
         /// Checks if the ring-buffer is empty
         bool empty() const
         {
@@ -549,6 +785,27 @@ namespace cds { namespace container {
             return buffer_.capacity();
         }
 
+    private:
+        static size_t calc_real_size( size_t size )
+        {
+            size_t real_size = (( size + sizeof( uintptr_t ) - 1 ) & ~( sizeof( uintptr_t ) - 1 )) + sizeof( size_t );
+
+            assert( real_size > size );
+            assert( real_size - size >= sizeof( size_t ) );
+
+            return real_size;
+        }
+
+        static bool is_tail( size_t size )
+        {
+            return ( size & ( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 ))) != 0;
+        }
+
+        static size_t make_tail( size_t size )
+        {
+            return size | ( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 ));
+        }
+
     private:
         //@cond
         atomics::atomic<size_t>     front_;
-- 
2.34.1
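The size-header arithmetic above can be sanity-checked in isolation. The
following standalone sketch mirrors the three private helpers from the
patch (names copied from it); the concrete numbers assume a 64-bit target
where sizeof( size_t ) == sizeof( uintptr_t ) == 8:

    #include <cassert>
    #include <cstddef>
    #include <cstdint>
    #include <cstdio>

    // payload is rounded up to the pointer size, then a size header
    // of sizeof( size_t ) bytes is prepended
    static size_t calc_real_size( size_t size )
    {
        return (( size + sizeof( uintptr_t ) - 1 ) & ~( sizeof( uintptr_t ) - 1 )) + sizeof( size_t );
    }

    // the most significant bit of the stored size marks an unused tail
    // region at the end of the buffer
    static size_t make_tail( size_t size )
    {
        return size | ( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 ));
    }

    static bool is_tail( size_t size )
    {
        return ( size & ( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 ))) != 0;
    }

    int main()
    {
        assert( calc_real_size( 5 ) == 16 );    // 5 -> 8 rounded + 8 header
        assert( calc_real_size( 8 ) == 16 );    // already aligned payload
        assert( calc_real_size( 9 ) == 24 );
        assert( is_tail( make_tail( 40 )) );    // tail bit round-trips
        assert( !is_tail( 40 ) );               // plain sizes are not tails
        printf( "size-header arithmetic ok\n" );
    }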