X-Git-Url: http://demsky.eecs.uci.edu/git/?a=blobdiff_plain;f=cds%2Fcontainer%2Frwqueue.h;h=9dd83075e98978372fd9169c5f3b957a2889bcb8;hb=3b358e91223a2824954c4843fbe8cc45c71c516e;hp=4b687dc9fab0a7920717dadc5d906b5cf345606b;hpb=f9c9349624ddf418216d00cd16b303e42ce34813;p=libcds.git diff --git a/cds/container/rwqueue.h b/cds/container/rwqueue.h index 4b687dc9..9dd83075 100644 --- a/cds/container/rwqueue.h +++ b/cds/container/rwqueue.h @@ -1,17 +1,94 @@ -//$$CDS-header$$ +/* + This file is a part of libcds - Concurrent Data Structures library -#ifndef __CDS_CONTAINER_RWQUEUE_H -#define __CDS_CONTAINER_RWQUEUE_H + (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017 -#include + Source code repo: http://github.com/khizmax/libcds/ + Download: http://sourceforge.net/projects/libcds/files/ + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + +#ifndef CDSLIB_CONTAINER_RWQUEUE_H +#define CDSLIB_CONTAINER_RWQUEUE_H + +#include // unique_lock +#include #include -#include -#include #include -#include -#include namespace cds { namespace container { + /// RWQueue related definitions + /** @ingroup cds_nonintrusive_helper + */ + namespace rwqueue { + /// RWQueue default type traits + struct traits + { + /// Lock policy + typedef cds::sync::spin lock_type; + + /// Node allocator + typedef CDS_DEFAULT_ALLOCATOR allocator; + + /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting + typedef cds::atomicity::empty_item_counter item_counter; + + /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding + enum { padding = opt::cache_line_padding }; + }; + + /// Metafunction converting option list to \p rwqueue::traits + /** + Supported \p Options are: + - opt::lock_type - lock policy, default is \p cds::sync::spin. Any type satisfied \p Mutex C++ concept may be used. + - opt::allocator - allocator (like \p std::allocator) used for allocating queue nodes. Default is \ref CDS_DEFAULT_ALLOCATOR + - opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled) + To enable item counting use \p cds::atomicity::item_counter. + - \p opt::padding - padding for internal critical data. Default is \p opt::cache_line_padding + + Example: declare mutex-based \p %RWQueue with item counting + \code + typedef cds::container::RWQueue< Foo, + typename cds::container::rwqueue::make_traits< + cds::opt::item_counter< cds::atomicity::item_counter >, + cds::opt::lock_type< std::mutex > + >::type + > myQueue; + \endcode + */ + template + struct make_traits { +# ifdef CDS_DOXYGEN_INVOKED + typedef implementation_defined type; ///< Metafunction result +# else + typedef typename cds::opt::make_options< + typename cds::opt::find_type_traits< traits, Options... >::type + , Options... + >::type type; +# endif + }; + + } // namespace rwqueue /// Michael & Scott blocking queue with fine-grained synchronization schema /** @ingroup cds_nonintrusive_queue @@ -24,62 +101,47 @@ namespace cds { namespace container { and blocking concurrent queue algorithms" Template arguments - - \p T - type to be stored in the queue - - \p Options - options - - \p Options are: - - opt::allocator - allocator (like \p std::allocator). Default is \ref CDS_DEFAULT_ALLOCATOR - - opt::lock_type - type of lock primitive. Default is cds::lock::Spin. - - opt::item_counter - the type of item counting feature. Default is \ref atomicity::empty_item_counter - - opt::stat - the type to gather internal statistics. - Possible option value are: queue_stat, queue_dummy_stat, user-provided class that supports queue_stat interface. - Default is \ref intrusive::queue_dummy_stat. - RWQueue uses only \p onEnqueue and \p onDequeue counter. - - opt::alignment - the alignment for \p lock_type to prevent false sharing. Default is opt::cache_line_alignment - - This queue has no intrusive counterpart. + - \p T - value type to be stored in the queue + - \p Traits - queue traits, default is \p rwqueue::traits. You can use \p rwqueue::make_traits + metafunction to make your traits or just derive your traits from \p %rwqueue::traits: + \code + struct myTraits: public cds::container::rwqueue::traits { + typedef cds::atomicity::item_counter item_counter; + }; + typedef cds::container::RWQueue< Foo, myTraits > myQueue; + + // Equivalent make_traits example: + typedef cds::container::RWQueue< Foo, + typename cds::container::rwqueue::make_traits< + cds::opt::item_counter< cds::atomicity::item_counter > + >::type + > myQueue; + \endcode */ - template + template class RWQueue { - //@cond - struct default_options - { - typedef lock::Spin lock_type; - typedef CDS_DEFAULT_ALLOCATOR allocator; - typedef atomicity::empty_item_counter item_counter; - typedef intrusive::queue_dummy_stat stat; - enum { alignment = opt::cache_line_alignment }; - }; - //@endcond - - public: - //@cond - typedef typename opt::make_options< - typename cds::opt::find_type_traits< default_options, Options... >::type - ,Options... - >::type options; - //@endcond - public: /// Rebind template arguments - template + template struct rebind { - typedef RWQueue< T2, Options2...> other ; ///< Rebinding result + typedef RWQueue< T2, Traits2 > other ; ///< Rebinding result }; public: - typedef T value_type ; ///< type of value stored in the queue + typedef T value_type; ///< Type of value to be stored in the queue + typedef Traits traits; ///< Queue traits - typedef typename options::lock_type lock_type ; ///< Locking primitive used + typedef typename traits::lock_type lock_type; ///< Locking primitive + typedef typename traits::item_counter item_counter; ///< Item counting policy used protected: //@cond /// Node type struct node_type { - node_type * volatile m_pNext ; ///< Pointer to the next node in queue - value_type m_value ; ///< Value stored in the node + atomics::atomic< node_type *> m_pNext; ///< Pointer to the next node in the queue + value_type m_value; ///< Value stored in the node node_type( value_type const& v ) : m_pNext( nullptr ) @@ -99,23 +161,23 @@ namespace cds { namespace container { //@endcond public: - typedef typename options::allocator::template rebind::other allocator_type ; ///< Allocator type used for allocate/deallocate the queue nodes - typedef typename options::item_counter item_counter ; ///< Item counting policy used - typedef typename options::stat stat ; ///< Internal statistics policy used + typedef typename traits::allocator::template rebind::other allocator_type; ///< Allocator type used for allocate/deallocate the queue nodes protected: //@cond - typedef typename opt::details::alignment_setter< lock_type, options::alignment >::type aligned_lock_type; - typedef cds::lock::scoped_lock auto_lock; + typedef std::unique_lock scoped_lock; typedef cds::details::Allocator< node_type, allocator_type > node_allocator; - item_counter m_ItemCounter; - stat m_Stat; + struct head_type { + mutable lock_type lock; + node_type * ptr; + }; - mutable aligned_lock_type m_HeadLock; - node_type * m_pHead; - mutable aligned_lock_type m_TailLock; - node_type * m_pTail; + head_type m_Head; + typename opt::details::apply_padding< head_type, traits::padding >::padding_type pad_; + head_type m_Tail; + + item_counter m_ItemCounter; //@endcond protected: @@ -145,12 +207,11 @@ namespace cds { namespace container { { assert( p != nullptr ); { - auto_lock lock( m_TailLock ); - m_pTail = - m_pTail->m_pNext = p; + scoped_lock lock( m_Tail.lock ); + m_Tail.ptr->m_pNext.store( p, atomics::memory_order_release ); + m_Tail.ptr = p; } ++m_ItemCounter; - m_Stat.onEnqueue(); return true; } @@ -168,53 +229,56 @@ namespace cds { namespace container { RWQueue() { node_type * pNode = alloc_node(); - m_pHead = - m_pTail = pNode; + m_Head.ptr = + m_Tail.ptr = pNode; } /// Destructor clears queue ~RWQueue() { clear(); - assert( m_pHead == m_pTail ); - free_node( m_pHead ); + assert( m_Head.ptr == m_Tail.ptr ); + free_node( m_Head.ptr ); } /// Enqueues \p data. Always return \a true bool enqueue( value_type const& data ) { scoped_node_ptr p( alloc_node( data )); - if ( enqueue_node( p.get() )) { + if ( enqueue_node( p.get())) { + p.release(); + return true; + } + return false; + } + + /// Enqueues \p data, move semantics + bool enqueue( value_type&& data ) + { + scoped_node_ptr p( alloc_node_move( std::move( data ))); + if ( enqueue_node( p.get())) { p.release(); return true; } return false; } - /// Enqueues \p data to queue using copy functor + /// Enqueues \p data to the queue using a functor /** - \p Func is a functor called to copy value \p data of type \p Type - which may be differ from type \p T stored in the queue. - The functor's interface is: + \p Func is a functor called to create node. + The functor \p f takes one argument - a reference to a new node of type \ref value_type : \code - struct myFunctor { - void operator()(T& dest, Type const& data) - { - // // Code to copy \p data to \p dest - dest = data; - } - }; + cds::container::RWQueue< cds::gc::HP, Foo > myQueue; + Bar bar; + myQueue.enqueue_with( [&bar]( Foo& dest ) { dest = bar; } ); \endcode - You may use \p boost:ref construction to pass functor \p f by reference. - - Requirements The functor \p Func should not throw any exception. */ - template - bool enqueue( Type const& data, Func f ) + template + bool enqueue_with( Func f ) { scoped_node_ptr p( alloc_node()); - unref(f)( p->m_value, data ); - if ( enqueue_node( p.get() )) { + f( p->m_value ); + if ( enqueue_node( p.get())) { p.release(); return true; } @@ -226,127 +290,126 @@ namespace cds { namespace container { bool emplace( Args&&... args ) { scoped_node_ptr p( alloc_node_move( std::forward(args)... )); - if ( enqueue_node( p.get() )) { + if ( enqueue_node( p.get())) { p.release(); return true; } return false; } - /// Dequeues a value using copy functor + /// Synonym for \p enqueue( value_type const& ) function + bool push( value_type const& val ) + { + return enqueue( val ); + } + + /// Synonym for \p enqueue( value_type&& ) function + bool push( value_type&& val ) + { + return enqueue( std::move( val )); + } + + /// Synonym for \p enqueue_with() function + template + bool push_with( Func f ) + { + return enqueue_with( f ); + } + + /// Dequeues a value to \p dest. + /** + If queue is empty returns \a false, \p dest can be corrupted. + If queue is not empty returns \a true, \p dest contains the value dequeued + */ + bool dequeue( value_type& dest ) + { + return dequeue_with( [&dest]( value_type& src ) { dest = std::move( src ); }); + } + + /// Dequeues a value using a functor /** - \p Func is a functor called to copy dequeued value to \p dest of type \p Type - which may be differ from type \p T stored in the queue. - The functor's interface is: + \p Func is a functor called to copy dequeued value. + The functor takes one argument - a reference to removed node: \code - struct myFunctor { - void operator()(Type& dest, T const& data) - { - // // Copy \p data to \p dest - dest = data; - } - }; + cds:container::RWQueue< cds::gc::HP, Foo > myQueue; + Bar bar; + myQueue.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );}); \endcode - You may use \p boost:ref construction to pass functor \p f by reference. - - Requirements The functor \p Func should not throw any exception. + The functor is called only if the queue is not empty. */ - template - bool dequeue( Type& dest, Func f ) + template + bool dequeue_with( Func f ) { node_type * pNode; { - auto_lock lock( m_HeadLock ); - pNode = m_pHead; - node_type * pNewHead = pNode->m_pNext; + scoped_lock lock( m_Head.lock ); + pNode = m_Head.ptr; + node_type * pNewHead = pNode->m_pNext.load( atomics::memory_order_acquire ); if ( pNewHead == nullptr ) return false; - unref(f)( dest, pNewHead->m_value ); - m_pHead = pNewHead; + f( pNewHead->m_value ); + m_Head.ptr = pNewHead; } // unlock here --m_ItemCounter; free_node( pNode ); - m_Stat.onDequeue(); return true; } - /** Dequeues a value to \p dest. - - If queue is empty returns \a false, \p dest may be corrupted. - If queue is not empty returns \a true, \p dest contains the value dequeued - */ - bool dequeue( value_type& dest ) - { - typedef cds::details::trivial_assign functor; - return dequeue( dest, functor() ); - } - - /// Synonym for \ref enqueue - bool push( value_type const& data ) - { - return enqueue( data ); - } - - /// Synonym for template version of \ref enqueue function - template - bool push( Type const& data, Func f ) - { - return enqueue( data, f ); - } - - /// Synonym for \ref dequeue - bool pop( value_type& data ) + /// Synonym for \p dequeue() function + bool pop( value_type& dest ) { - return dequeue( data ); + return dequeue( dest ); } - /// Synonym for template version of \ref dequeue function - template - bool pop( Type& dest, Func f ) + /// Synonym for \p dequeue_with() function + template + bool pop_with( Func f ) { - return dequeue( dest, f ); + return dequeue_with( f ); } /// Checks if queue is empty bool empty() const { - auto_lock lock( m_HeadLock ); - return m_pHead->m_pNext == nullptr; + scoped_lock lock( m_Head.lock ); + return m_Head.ptr->m_pNext.load( atomics::memory_order_relaxed ) == nullptr; } /// Clears queue void clear() { - auto_lock lockR( m_HeadLock ); - auto_lock lockW( m_TailLock ); - while ( m_pHead->m_pNext != nullptr ) { - node_type * pHead = m_pHead; - m_pHead = m_pHead->m_pNext; + scoped_lock lockR( m_Head.lock ); + scoped_lock lockW( m_Tail.lock ); + while ( m_Head.ptr->m_pNext.load( atomics::memory_order_relaxed ) != nullptr ) { + node_type * pHead = m_Head.ptr; + m_Head.ptr = m_Head.ptr->m_pNext.load( atomics::memory_order_relaxed ); free_node( pHead ); } + m_ItemCounter.reset(); } /// Returns queue's item count /** - The value returned depends on opt::item_counter option. For atomicity::empty_item_counter, + The value returned depends on \p rwqueue::traits::item_counter. For \p atomicity::empty_item_counter, this function always returns 0. - Warning: even if you use real item counter and it returns 0, this fact is not mean that the queue - is empty. To check queue emptyness use \ref empty() method. + @note Even if you use real item counter and it returns 0, this fact is not mean that the queue + is empty. To check queue emptyness use \p empty() method. */ size_t size() const { return m_ItemCounter.value(); } - /// Returns reference to internal statistics - stat const& statistics() const + //@cond + /// The class has no internal statistics. For test consistency only + std::nullptr_t statistics() const { - return m_Stat; + return nullptr; } - + //@endcond }; }} // namespace cds::container -#endif // #ifndef __CDS_CONTAINER_RWQUEUE_H +#endif // #ifndef CDSLIB_CONTAINER_RWQUEUE_H