3 #ifndef __CDS_CONTAINER_RWQUEUE_H
4 #define __CDS_CONTAINER_RWQUEUE_H
6 #include <mutex> // unique_lock
7 #include <cds/container/msqueue.h>
8 #include <cds/lock/spinlock.h>
10 namespace cds { namespace container {
11 /// RWQueue related definitions
12 /** @ingroup cds_nonintrusive_helper
15 /// RWQueue default type traits
/// Lock policy used for the head and tail locks; the default is \p cds::lock::Spin
19 typedef cds::lock::Spin lock_type;
/// Allocator the queue node allocator is rebound from; the default is \p CDS_DEFAULT_ALLOCATOR
22 typedef CDS_DEFAULT_ALLOCATOR allocator;
24 /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
25 typedef cds::atomicity::empty_item_counter item_counter;
27 /// Alignment of internal queue data (consumed by \p opt::details::alignment_setter for the lock members). Default is \p opt::cache_line_alignment
28 enum { alignment = opt::cache_line_alignment };
31 /// Metafunction converting option list to \p rwqueue::traits
33 Supported \p Options are:
34 - opt::lock_type - lock policy, default is \p cds::lock::Spin. Any type satisfying the \p Mutex C++ concept may be used.
35 - opt::allocator - allocator (like \p std::allocator) used for allocating queue nodes. Default is \ref CDS_DEFAULT_ALLOCATOR
36 - opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
37 To enable item counting use \p cds::atomicity::item_counter.
38 - opt::alignment - the alignment for internal queue data. Default is \p opt::cache_line_alignment
40 Example: declare mutex-based \p %RWQueue with item counting
42 typedef cds::container::RWQueue< Foo,
43 typename cds::container::rwqueue::make_traits<
44 cds::opt::item_counter< cds::atomicity::item_counter >,
45 cds::opt::lock_type< std::mutex >
// Options is a variadic pack of option types that is folded into a traits type.
50 template <typename... Options>
// Documentation-only branch: when CDS_DOXYGEN_INVOKED is defined the result is shown as an opaque type.
52 # ifdef CDS_DOXYGEN_INVOKED
53 typedef implementation_defined type; ///< Metafunction result
// Regular branch: opt::find_type_traits is instantiated with traits and Options...,
// and its nested type is passed on to opt::make_options.
55 typedef typename cds::opt::make_options<
56 typename cds::opt::find_type_traits< traits, Options... >::type
62 } // namespace rwqueue
64 /// Michael & Scott blocking queue with fine-grained synchronization schema
65 /** @ingroup cds_nonintrusive_queue
66 The queue has two different locks: one for reading and one for writing.
67 Therefore, one writer and one reader can access the queue simultaneously.
68 The queue does not require any garbage collector.
71 - [1998] Maged Michael, Michael Scott "Simple, fast, and practical non-blocking
72 and blocking concurrent queue algorithms"
74 <b>Template arguments</b>
75 - \p T - value type to be stored in the queue
76 - \p Traits - queue traits, default is \p rwqueue::traits. You can use \p rwqueue::make_traits
77 metafunction to make your traits or just derive your traits from \p %rwqueue::traits:
79 struct myTraits: public cds::container::rwqueue::traits {
80 typedef cds::atomicity::item_counter item_counter;
82 typedef cds::container::RWQueue< Foo, myTraits > myQueue;
84 // Equivalent make_traits example:
85 typedef cds::container::RWQueue< Foo,
86 typename cds::container::rwqueue::make_traits<
87 cds::opt::item_counter< cds::atomicity::item_counter >
92 template <typename T, typename Traits = rwqueue::traits >
96 /// Rebind template arguments: yields \p %RWQueue instantiated with \p T2 and \p Traits2
97 template <typename T2, typename Traits2>
99 typedef RWQueue< T2, Traits2 > other ; ///< Rebinding result
103 typedef T value_type; ///< Type of value to be stored in the queue
104 typedef Traits traits; ///< Queue traits
106 typedef typename traits::lock_type lock_type; ///< Locking primitive
107 typedef typename traits::item_counter item_counter; ///< Item counting policy used
// NOTE(review): m_pNext is written while m_TailLock is held (enqueue_node) but read while
// m_HeadLock is held (dequeue_with, empty). volatile alone does not order accesses made
// under two different locks in the C++11 memory model - confirm whether an atomic pointer is needed.
114 node_type * volatile m_pNext ; ///< Pointer to the next node in the queue
115 value_type m_value ; ///< Value stored in the node
// Constructor taking the value to store by const reference.
117 node_type( value_type const& v )
// Constructor that builds m_value in place from perfectly forwarded arguments.
126 template <typename... Args>
127 node_type( Args&&... args )
129 , m_value( std::forward<Args>(args)...)
// NOTE(review): the nested rebind member was removed from std::allocator in C++20; if
// traits::allocator may be std::allocator, std::allocator_traits<A>::rebind_alloc<node_type>
// is the portable spelling - confirm the supported language level.
135 typedef typename traits::allocator::template rebind<node_type>::other allocator_type; ///< Allocator type used to allocate/deallocate the queue nodes
// lock_type passed through opt::details::alignment_setter with the alignment taken from the traits.
139 typedef typename opt::details::alignment_setter< lock_type, traits::alignment >::type aligned_lock_type;
// Lock guard over lock_type, used as a local object in the member functions below.
140 typedef std::unique_lock<lock_type> scoped_lock;
// Node allocation helper (New / MoveNew / Delete) parameterized by node_type and allocator_type.
141 typedef cds::details::Allocator< node_type, allocator_type > node_allocator;
// Item counter policy object; its value() is what the item-count query returns.
143 item_counter m_ItemCounter;
// Lock taken by the dequeue side: dequeue_with(), empty() and clear().
145 mutable aligned_lock_type m_HeadLock;
// Lock taken by the enqueue side: enqueue_node() and clear().
147 mutable aligned_lock_type m_TailLock;
// Allocates a node through node_allocator::New() with no constructor arguments.
153 static node_type * alloc_node()
155 return node_allocator().New();
// Allocates a node through node_allocator::New(), passing data on as the constructor argument.
158 static node_type * alloc_node( T const& data )
160 return node_allocator().New( data );
// Allocates a node through node_allocator::MoveNew(), perfectly forwarding args to it.
163 template <typename... Args>
164 static node_type * alloc_node_move( Args&&... args )
166 return node_allocator().MoveNew( std::forward<Args>( args )... );
// Hands pNode back to node_allocator via Delete().
169 static void free_node( node_type * pNode )
171 node_allocator().Delete( pNode );
// Links the already allocated node p behind the current tail node.
174 bool enqueue_node( node_type * p )
// Passing a null node is treated as a programming error.
176 assert( p != nullptr );
// The tail link is modified only while m_TailLock is held.
178 scoped_lock lock( m_TailLock );
180 m_pTail->m_pNext = p;
// Functor type used as the deleter of scoped_node_ptr; it is called with the owned node pointer.
186 struct node_disposer {
187 void operator()( node_type * pNode )
// Owning node pointer: node_disposer is invoked on the node it still holds when it is destroyed.
// NOTE(review): std::unique_ptr is declared in <memory>, which is not among the includes
// visible at the top of this header - confirm that it arrives transitively.
192 typedef std::unique_ptr< node_type, node_disposer > scoped_node_ptr;
196 /// Makes empty queue
// One node is allocated with no value arguments.
// NOTE(review): presumably this is the sentinel node that m_pHead and m_pTail both start at -
// the destructor asserts m_pHead == m_pTail before freeing a last node; confirm against the full constructor.
199 node_type * pNode = alloc_node();
204 /// Destructor clears queue
// At this point head and tail must refer to the same node, i.e. exactly one node is left.
208 assert( m_pHead == m_pTail );
// That last node is released here.
209 free_node( m_pHead );
212 /// Enqueues \p data. Always returns \a true
213 bool enqueue( value_type const& data )
// The new node is built from data and owned by scoped_node_ptr while enqueue_node() runs.
215 scoped_node_ptr p( alloc_node( data ));
// Only the raw pointer is handed to enqueue_node(); p still owns the node during the call.
216 if ( enqueue_node( p.get() )) {
223 /// Enqueues \p data to the queue using a functor
225 \p Func is a functor called to create node.
226 The functor \p f takes one argument - a reference to a new node of type \ref value_type :
228 cds::container::RWQueue< Foo > myQueue;
230 myQueue.enqueue_with( [&bar]( Foo& dest ) { dest = bar; } );
// The functor f is taken by value.
233 template <typename Func>
234 bool enqueue_with( Func f )
// The node is allocated with no constructor arguments and owned by scoped_node_ptr.
236 scoped_node_ptr p( alloc_node() );
// Only the raw pointer is handed to enqueue_node(); p still owns the node during the call.
238 if ( enqueue_node( p.get() )) {
245 /// Enqueues data of type \ref value_type constructed with <tt>std::forward<Args>(args)...</tt>
246 template <typename... Args>
247 bool emplace( Args&&... args )
// The arguments are perfectly forwarded to alloc_node_move(); the resulting node is owned by scoped_node_ptr.
249 scoped_node_ptr p( alloc_node_move( std::forward<Args>(args)... ));
// Only the raw pointer is handed to enqueue_node(); p still owns the node during the call.
250 if ( enqueue_node( p.get() )) {
257 /// Synonym for \p enqueue() function
258 bool push( value_type const& val )
// Pure forwarder: the result of enqueue() is returned unchanged.
260 return enqueue( val );
263 /// Synonym for \p enqueue_with() function
264 template <typename Func>
265 bool push_with( Func f )
// Pure forwarder: f is passed on to enqueue_with() and its result is returned unchanged.
267 return enqueue_with( f );
270 /// Dequeues a value to \p dest.
272 If the queue is empty, returns \a false; \p dest can be corrupted.
273 If the queue is not empty, returns \a true; \p dest contains the dequeued value.
// Implemented on top of dequeue_with(): the lambda copy-assigns the queued value into dest.
275 bool dequeue( value_type& dest )
277 return dequeue_with( [&dest]( value_type& src ) { dest = src; } );
280 /// Dequeues a value using a functor
282 \p Func is a functor called to copy dequeued value.
283 The functor takes one argument - a reference to the dequeued value:
285 cds::container::RWQueue< Foo > myQueue;
287 myQueue.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
289 The functor is called only if the queue is not empty.
// The functor f is taken by value.
291 template <typename Func>
292 bool dequeue_with( Func f )
// The head side of the queue is touched only after m_HeadLock has been acquired.
296 scoped_lock lock( m_HeadLock );
// The successor of pNode is the node whose value is handed to the functor.
298 node_type * pNewHead = pNode->m_pNext;
// A null successor is the same condition empty() tests for.
299 if ( pNewHead == nullptr )
// The functor receives a non-const reference to the stored value, so it may move from it.
301 f( pNewHead->m_value );
309 /// Synonym for \p dequeue() function
310 bool pop( value_type& dest )
// Pure forwarder: the result of dequeue() is returned unchanged.
312 return dequeue( dest );
315 /// Synonym for \p dequeue_with() function
316 template <typename Func>
317 bool pop_with( Func f )
// Pure forwarder: f is passed on to dequeue_with() and its result is returned unchanged.
319 return dequeue_with( f );
322 /// Checks if queue is empty
// m_HeadLock is acquired before the head node is inspected.
325 scoped_lock lock( m_HeadLock );
// The queue counts as empty when the head node has no successor.
326 return m_pHead->m_pNext == nullptr;
// Both locks are acquired, head lock first and tail lock second, before any node is unlinked.
332 scoped_lock lockR( m_HeadLock );
333 scoped_lock lockW( m_TailLock );
// Walk forward until the head node has no successor.
334 while ( m_pHead->m_pNext != nullptr ) {
// pHead remembers the node being left behind while m_pHead advances to its successor.
335 node_type * pHead = m_pHead;
336 m_pHead = m_pHead->m_pNext;
341 /// Returns queue's item count
343 The value returned depends on \p rwqueue::traits::item_counter. For \p atomicity::empty_item_counter,
344 this function always returns 0.
346 @note Even if you use a real item counter and it returns 0, this does not mean that the queue
347 is empty. To check queue emptiness use the \p empty() method.
// Reports the current value held by the item counter policy object.
351 return m_ItemCounter.value();
355 /// Returns internal statistics; a default-constructed \p msqueue::empty_stat is returned by value
356 cds::container::msqueue::empty_stat statistics() const
358 return cds::container::msqueue::empty_stat();
363 }} // namespace cds::container
365 #endif // #ifndef __CDS_CONTAINER_RWQUEUE_H