From 9ba22af14cfab4fa84ea4451d567b7ef2704a998 Mon Sep 17 00:00:00 2001 From: khizmax Date: Wed, 8 Oct 2014 17:47:11 +0400 Subject: [PATCH] RWQueue refactoring --- cds/container/msqueue.h | 8 +- cds/container/rwqueue.h | 279 ++++++++++++++------------- tests/test-hdr/queue/hdr_queue.h | 6 - tests/test-hdr/queue/hdr_queue_new.h | 17 +- tests/test-hdr/queue/hdr_rwqueue.cpp | 49 +++-- tests/unit/queue/queue_defs.h | 4 +- tests/unit/queue/queue_type.h | 25 +-- 7 files changed, 212 insertions(+), 176 deletions(-) diff --git a/cds/container/msqueue.h b/cds/container/msqueue.h index 7b467f83..42c5c5aa 100644 --- a/cds/container/msqueue.h +++ b/cds/container/msqueue.h @@ -180,7 +180,9 @@ namespace cds { namespace container { }; public: - typedef T value_type; ///< Value type stored in the queue + typedef T value_type; ///< Value type stored in the queue + typedef Traits traits; ///< Queue traits + typedef typename base_class::gc gc; ///< Garbage collector used typedef typename base_class::back_off back_off; ///< Back-off strategy used typedef typename maker::allocator_type allocator_type; ///< Allocator type used for allocate/deallocate the nodes @@ -251,7 +253,7 @@ namespace cds { namespace container { return false; } - /// Enqueues \p data to queue using a functor + /// Enqueues \p data to the queue using a functor /** \p Func is a functor called to create node. The functor \p f takes one argument - a reference to a new node of type \ref value_type : @@ -361,6 +363,8 @@ namespace cds { namespace container { } /// Returns queue's item count (see \ref intrusive::MSQueue::size for explanation) + /** \copydetails cds::intrusive::MSQueue::size() + */ size_t size() const { return base_class::size(); diff --git a/cds/container/rwqueue.h b/cds/container/rwqueue.h index 6754b6bc..16226707 100644 --- a/cds/container/rwqueue.h +++ b/cds/container/rwqueue.h @@ -3,16 +3,65 @@ #ifndef __CDS_CONTAINER_RWQUEUE_H #define __CDS_CONTAINER_RWQUEUE_H -#include -#include // ref #include // unique_lock -#include +#include #include -#include -#include -#include namespace cds { namespace container { + /// RWQueue related definitions + /** @ingroup cds_nonintrusive_helper + */ + namespace rwqueue { + /// RWQueue default type traits + struct traits + { + /// Lock policy + typedef cds::lock::Spin lock_type; + + /// Node allocator + typedef CDS_DEFAULT_ALLOCATOR allocator; + + /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting + typedef cds::atomicity::empty_item_counter item_counter; + + /// Alignment of internal queue data. Default is \p opt::cache_line_alignment + enum { alignment = opt::cache_line_alignment }; + }; + + /// Metafunction converting option list to \p rwqueue::traits + /** + Supported \p Options are: + - opt::lock_type - lock policy, default is \p cds::lock::Spin. Any type satisfied \p Mutex C++ concept may be used. + - opt::allocator - allocator (like \p std::allocator) used for allocating queue nodes. Default is \ref CDS_DEFAULT_ALLOCATOR + - opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled) + To enable item counting use \p cds::atomicity::item_counter. + - opt::alignment - the alignment for internal queue data. Default is \p opt::cache_line_alignment + - opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default) + or \p opt::v::sequential_consistent (sequentially consisnent memory model). + + Example: declare mutex-based \p %RWQueue with item counting + \code + typedef cds::container::RWQueue< Foo, + typename cds::container::rwqueue::make_traits< + cds::opt::item_counter< cds::atomicity::item_counter >, + cds::opt::lock_type< std::mutex > + >::type + > myQueue; + \endcode + */ + template + struct make_traits { +# ifdef CDS_DOXYGEN_INVOKED + typedef implementation_defined type; ///< Metafunction result +# else + typedef typename cds::opt::make_options< + typename cds::opt::find_type_traits< traits, Options... >::type + , Options... + >::type type; +# endif + }; + + } // namespace rwqueue /// Michael & Scott blocking queue with fine-grained synchronization schema /** @ingroup cds_nonintrusive_queue @@ -25,61 +74,47 @@ namespace cds { namespace container { and blocking concurrent queue algorithms" Template arguments - - \p T - type to be stored in the queue - - \p Options - options - - \p Options are: - - opt::allocator - allocator (like \p std::allocator). Default is \ref CDS_DEFAULT_ALLOCATOR - - opt::lock_type - type of lock primitive. Default is cds::lock::Spin. - - opt::item_counter - the type of item counting feature. Default is \ref atomicity::empty_item_counter - - opt::stat - the type to gather internal statistics. - Possible option value are: queue_stat, queue_dummy_stat, user-provided class that supports queue_stat interface. - Default is \ref intrusive::queue_dummy_stat. - RWQueue uses only \p onEnqueue and \p onDequeue counter. - - opt::alignment - the alignment for \p lock_type to prevent false sharing. Default is opt::cache_line_alignment - - This queue has no intrusive counterpart. + - \p T - value type to be stored in the queue + - \p Traits - queue traits, default is \p rwqueue::traits. You can use \p rwqueue::make_traits + metafunction to make your traits or just derive your traits from \p %rwqueue::traits: + \code + struct myTraits: public cds::container::rwqueue::traits { + typedef cds::atomicity::item_counter item_counter; + }; + typedef cds::container::RWQueue< Foo, myTraits > myQueue; + + // Equivalent make_traits example: + typedef cds::container::RWQueue< Foo, + typename cds::container::rwqueue::make_traits< + cds::opt::item_counter< cds::atomicity::item_counter > + >::type + > myQueue; + \endcode */ - template + template class RWQueue { - //@cond - struct default_options - { - typedef lock::Spin lock_type; - typedef CDS_DEFAULT_ALLOCATOR allocator; - typedef atomicity::empty_item_counter item_counter; - typedef intrusive::queue_dummy_stat stat; - enum { alignment = opt::cache_line_alignment }; - }; - //@endcond - - public: - //@cond - typedef typename opt::make_options< - typename cds::opt::find_type_traits< default_options, Options... >::type - ,Options... - >::type options; - //@endcond - public: /// Rebind template arguments - template + template struct rebind { - typedef RWQueue< T2, Options2...> other ; ///< Rebinding result + typedef RWQueue< T2, Traits2 > other ; ///< Rebinding result }; public: - typedef T value_type ; ///< type of value stored in the queue + typedef T value_type; ///< Type of value to be stored in the queue + typedef Traits traits; ///< Queue traits - typedef typename options::lock_type lock_type ; ///< Locking primitive used + typedef typename traits::lock_type lock_type; ///< Locking primitive + typedef typename traits::item_counter item_counter; ///< Item counting policy used + typedef typename traits::memory_model memory_model; ///< Memory ordering. See \p cds::opt::memory_model option protected: //@cond /// Node type struct node_type { - node_type * volatile m_pNext ; ///< Pointer to the next node in queue + node_type * volatile m_pNext ; ///< Pointer to the next node in the queue value_type m_value ; ///< Value stored in the node node_type( value_type const& v ) @@ -100,18 +135,15 @@ namespace cds { namespace container { //@endcond public: - typedef typename options::allocator::template rebind::other allocator_type ; ///< Allocator type used for allocate/deallocate the queue nodes - typedef typename options::item_counter item_counter ; ///< Item counting policy used - typedef typename options::stat stat ; ///< Internal statistics policy used + typedef typename traits::allocator::template rebind::other allocator_type; ///< Allocator type used for allocate/deallocate the queue nodes protected: //@cond - typedef typename opt::details::alignment_setter< lock_type, options::alignment >::type aligned_lock_type; - typedef std::unique_lock auto_lock; + typedef typename opt::details::alignment_setter< lock_type, traits::alignment >::type aligned_lock_type; + typedef std::unique_lock scoped_lock; typedef cds::details::Allocator< node_type, allocator_type > node_allocator; item_counter m_ItemCounter; - stat m_Stat; mutable aligned_lock_type m_HeadLock; node_type * m_pHead; @@ -146,12 +178,11 @@ namespace cds { namespace container { { assert( p != nullptr ); { - auto_lock lock( m_TailLock ); + scoped_lock lock( m_TailLock ); m_pTail = m_pTail->m_pNext = p; } ++m_ItemCounter; - m_Stat.onEnqueue(); return true; } @@ -185,37 +216,29 @@ namespace cds { namespace container { bool enqueue( value_type const& data ) { scoped_node_ptr p( alloc_node( data )); - if ( enqueue_node( p.get() )) { + if ( enqueue_node( p )) { p.release(); return true; } return false; } - /// Enqueues \p data to queue using copy functor + /// Enqueues \p data to the queue using a functor /** - \p Func is a functor called to copy value \p data of type \p Type - which may be differ from type \p T stored in the queue. - The functor's interface is: + \p Func is a functor called to create node. + The functor \p f takes one argument - a reference to a new node of type \ref value_type : \code - struct myFunctor { - void operator()(T& dest, Type const& data) - { - // // Code to copy \p data to \p dest - dest = data; - } - }; + cds::container::RWQueue< cds::gc::HP, Foo > myQueue; + Bar bar; + myQueue.enqueue_with( [&bar]( Foo& dest ) { dest = bar; } ); \endcode - You may use \p boost:ref construction to pass functor \p f by reference. - - Requirements The functor \p Func should not throw any exception. */ - template - bool enqueue( Type const& data, Func f ) + template + bool enqueue_with( Func f ) { - scoped_node_ptr p( alloc_node()); - f( p->m_value, data ); - if ( enqueue_node( p.get() )) { + scoped_node_ptr p( alloc_node() ); + f( p->m_value ); + if ( enqueue_node( p ) ) { p.release(); return true; } @@ -234,92 +257,83 @@ namespace cds { namespace container { return false; } - /// Dequeues a value using copy functor + /// Synonym for \p enqueue() function + bool push( value_type const& val ) + { + return enqueue( val ); + } + + /// Synonym for \p enqueue_with() function + template + bool push_with( Func f ) + { + return enqueue_with( f ); + } + + /// Dequeues a value to \p dest. + /** + If queue is empty returns \a false, \p dest can be corrupted. + If queue is not empty returns \a true, \p dest contains the value dequeued + */ + bool dequeue( value_type& dest ) + { + return dequeue( [&dest]( value_type * src ) { dest = src; } ); + } + + /// Dequeues a value using a functor /** - \p Func is a functor called to copy dequeued value to \p dest of type \p Type - which may be differ from type \p T stored in the queue. - The functor's interface is: + \p Func is a functor called to copy dequeued value. + The functor takes one argument - a reference to removed node: \code - struct myFunctor { - void operator()(Type& dest, T const& data) - { - // // Copy \p data to \p dest - dest = data; - } - }; + cds:container::RWQueue< cds::gc::HP, Foo > myQueue; + Bar bar; + myQueue.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );}); \endcode - You may use \p boost:ref construction to pass functor \p f by reference. - - Requirements The functor \p Func should not throw any exception. + The functor is called only if the queue is not empty. */ - template - bool dequeue( Type& dest, Func f ) + template + bool dequeue_with( Func f ) { node_type * pNode; { - auto_lock lock( m_HeadLock ); + scoped_lock lock( m_HeadLock ); pNode = m_pHead; node_type * pNewHead = pNode->m_pNext; if ( pNewHead == nullptr ) return false; - f( dest, pNewHead->m_value ); + f( pNewHead->m_value ); m_pHead = pNewHead; } // unlock here --m_ItemCounter; free_node( pNode ); - m_Stat.onDequeue(); return true; } - /** Dequeues a value to \p dest. - - If queue is empty returns \a false, \p dest may be corrupted. - If queue is not empty returns \a true, \p dest contains the value dequeued - */ - bool dequeue( value_type& dest ) - { - typedef cds::details::trivial_assign functor; - return dequeue( dest, functor() ); - } - - /// Synonym for \ref enqueue - bool push( value_type const& data ) - { - return enqueue( data ); - } - - /// Synonym for template version of \ref enqueue function - template - bool push( Type const& data, Func f ) - { - return enqueue( data, f ); - } - - /// Synonym for \ref dequeue - bool pop( value_type& data ) + /// Synonym for \p dequeue() function + bool pop( value_type& dest ) { - return dequeue( data ); + return dequeue( dest ); } - /// Synonym for template version of \ref dequeue function - template - bool pop( Type& dest, Func f ) + /// Synonym for \p dequeue_with() function + template + bool pop_with( Func f ) { - return dequeue( dest, f ); + return dequeue_with( f ); } /// Checks if queue is empty bool empty() const { - auto_lock lock( m_HeadLock ); + scoped_lock lock( m_HeadLock ); return m_pHead->m_pNext == nullptr; } /// Clears queue void clear() { - auto_lock lockR( m_HeadLock ); - auto_lock lockW( m_TailLock ); + scoped_lock lockR( m_HeadLock ); + scoped_lock lockW( m_TailLock ); while ( m_pHead->m_pNext != nullptr ) { node_type * pHead = m_pHead; m_pHead = m_pHead->m_pNext; @@ -329,23 +343,24 @@ namespace cds { namespace container { /// Returns queue's item count /** - The value returned depends on opt::item_counter option. For atomicity::empty_item_counter, + The value returned depends on \p rwqueue::traits::item_counter. For \p atomicity::empty_item_counter, this function always returns 0. - Warning: even if you use real item counter and it returns 0, this fact is not mean that the queue - is empty. To check queue emptyness use \ref empty() method. + @note Even if you use real item counter and it returns 0, this fact is not mean that the queue + is empty. To check queue emptyness use \p empty() method. */ size_t size() const { return m_ItemCounter.value(); } + //@cond /// Returns reference to internal statistics - stat const& statistics() const + cds::container::msqueue::empty_stat statistics() const { - return m_Stat; + return cds::container::msqueue::empty_stat(); } - + //@endcond }; }} // namespace cds::container diff --git a/tests/test-hdr/queue/hdr_queue.h b/tests/test-hdr/queue/hdr_queue.h index 728885e8..f213e771 100644 --- a/tests/test-hdr/queue/hdr_queue.h +++ b/tests/test-hdr/queue/hdr_queue.h @@ -151,9 +151,6 @@ namespace queue { void Vyukov_MPMCCyclicQueue(); void Vyukov_MPMCCyclicQueue_Counted(); - void RWQueue_(); - void RWQueue_Counted(); - CPPUNIT_TEST_SUITE(Queue_TestHeader) CPPUNIT_TEST(FCQueue_deque) CPPUNIT_TEST(FCQueue_deque_elimination) @@ -164,9 +161,6 @@ namespace queue { CPPUNIT_TEST(FCQueue_list_mutex) CPPUNIT_TEST(FCQueue_list_stat) - CPPUNIT_TEST(RWQueue_); - CPPUNIT_TEST(RWQueue_Counted); - CPPUNIT_TEST(Vyukov_MPMCCyclicQueue); CPPUNIT_TEST(Vyukov_MPMCCyclicQueue_Counted); CPPUNIT_TEST_SUITE_END(); diff --git a/tests/test-hdr/queue/hdr_queue_new.h b/tests/test-hdr/queue/hdr_queue_new.h index 4fb373ee..5c8f93a3 100644 --- a/tests/test-hdr/queue/hdr_queue_new.h +++ b/tests/test-hdr/queue/hdr_queue_new.h @@ -287,7 +287,13 @@ namespace queue { void BasketQueue_DHP_Counted_seqcst(); void BasketQueue_DHP_Counted_relax_align(); void BasketQueue_DHP_Counted_seqcst_align(); -/* + + void RWQueue_default(); + void RWQueue_mutex(); + void RWQueue_ic(); + void RWQueue_ic_mutex(); + + /* void FCQueue_deque(); void FCQueue_deque_elimination(); void FCQueue_deque_mutex(); @@ -300,8 +306,6 @@ namespace queue { void Vyukov_MPMCCyclicQueue(); void Vyukov_MPMCCyclicQueue_Counted(); - void RWQueue_(); - void RWQueue_Counted(); */ CPPUNIT_TEST_SUITE( HdrTestQueue ) @@ -402,12 +406,13 @@ namespace queue { CPPUNIT_TEST(FCQueue_list_mutex) CPPUNIT_TEST(FCQueue_list_stat) - CPPUNIT_TEST(RWQueue_); - CPPUNIT_TEST(RWQueue_Counted); - CPPUNIT_TEST(Vyukov_MPMCCyclicQueue); CPPUNIT_TEST(Vyukov_MPMCCyclicQueue_Counted); */ + CPPUNIT_TEST( RWQueue_default) + CPPUNIT_TEST( RWQueue_mutex ) + CPPUNIT_TEST( RWQueue_ic ) + CPPUNIT_TEST_SUITE_END(); }; diff --git a/tests/test-hdr/queue/hdr_rwqueue.cpp b/tests/test-hdr/queue/hdr_rwqueue.cpp index 5969d22f..bb826a08 100644 --- a/tests/test-hdr/queue/hdr_rwqueue.cpp +++ b/tests/test-hdr/queue/hdr_rwqueue.cpp @@ -1,28 +1,45 @@ //$$CDS-header$$ #include +#include -#include "queue/hdr_queue.h" +#include "queue/hdr_queue_new.h" namespace queue { - void Queue_TestHeader::RWQueue_() + void HdrTestQueue::RWQueue_default() { - testNoItemCounter< - cds::container::RWQueue< - int - ,cds::opt::lock_type< cds::SpinLock > - > - >(); + test_no_ic< cds::container::RWQueue< int > >(); } - void Queue_TestHeader::RWQueue_Counted() + void HdrTestQueue::RWQueue_mutex() { - testWithItemCounter< - cds::container::RWQueue< - int - ,cds::opt::lock_type< cds::SpinLock > - ,cds::opt::item_counter< cds::atomicity::item_counter > - > - >(); + struct queue_traits : public cds::container::rwqueue::traits + { + typedef std::mutex lock_type; + }; + + test_no_ic< cds::container::RWQueue< int, queue_traits > >(); + } + + void HdrTestQueue::RWQueue_ic() + { + struct queue_traits : public cds::container::rwqueue::traits + { + typedef cds::atomicity::item_counter item_counter; + }; + + test_ic< cds::container::RWQueue< int, queue_traits > >(); } + + void HdrTestQueue::RWQueue_ic_mutex() + { + struct queue_traits : public cds::container::rwqueue::traits + { + typedef cds::atomicity::item_counter item_counter; + typedef std::mutex lock_type; + }; + + test_ic< cds::container::RWQueue< int, queue_traits > >(); + } + } diff --git a/tests/unit/queue/queue_defs.h b/tests/unit/queue/queue_defs.h index 1731ad3e..febb7fa1 100644 --- a/tests/unit/queue/queue_defs.h +++ b/tests/unit/queue/queue_defs.h @@ -170,12 +170,12 @@ #define CDSUNIT_DECLARE_RWQueue( ITEM_TYPE ) \ TEST_CASE( RWQueue_Spin, ITEM_TYPE ) \ TEST_CASE( RWQueue_Spin_ic, ITEM_TYPE ) \ - TEST_CASE( RWQueue_Spin_stat, ITEM_TYPE ) + TEST_CASE( RWQueue_mutex, ITEM_TYPE ) #define CDSUNIT_TEST_RWQueue \ CPPUNIT_TEST(RWQueue_Spin) \ CPPUNIT_TEST(RWQueue_Spin_ic) \ - CPPUNIT_TEST(RWQueue_Spin_stat) + CPPUNIT_TEST(RWQueue_mutex) // TsigasCycleQueue #define CDSUNIT_DECLARE_TsigasCysleQueue( ITEM_TYPE ) \ diff --git a/tests/unit/queue/queue_type.h b/tests/unit/queue/queue_type.h index 1060ea11..2bb57bb0 100644 --- a/tests/unit/queue/queue_type.h +++ b/tests/unit/queue/queue_type.h @@ -371,19 +371,20 @@ namespace queue { // RWQueue - typedef cds::container::RWQueue< - Value - > RWQueue_Spin; + typedef cds::container::RWQueue< Value > RWQueue_Spin; - typedef cds::container::RWQueue< - Value - ,cds::opt::item_counter< cds::atomicity::item_counter > - > RWQueue_Spin_ic; + struct traits_RWQueue_Spin_ic : public cds::container::rwqueue::traits + { + typedef cds::atomicity::item_counter item_counter; + }; + typedef cds::container::RWQueue< Value, traits_RWQueue_Spin_ic > RWQueue_Spin_ic; - typedef cds::container::RWQueue< - Value - ,cds::opt::stat< cds::intrusive::queue_stat<> > - > RWQueue_Spin_stat; + struct traits_RWQueue_mutex : public + cds::container::rwqueue::make_traits< + cds::opt::lock_type< std::mutex > + >::type + {}; + typedef cds::container::RWQueue< Value, traits_RWQueue_mutex > traits_RWQueue_mutex; // FCQueue class traits_FCQueue_elimination: @@ -574,7 +575,7 @@ namespace std { << "\t\t fix list call: " << s.m_FixListCount.get() << "\n"; } - static inline std::ostream& operator <<( std::ostream& o, cds::intrusive::optimistic_queue::dummy_stat const& s ) + static inline std::ostream& operator <<( std::ostream& o, cds::intrusive::optimistic_queue::empty_stat const& s ) { return o; } -- 2.34.1