3 #ifndef __CDS_INTRUSIVE_TSIGAS_CYCLE_QUEUE_H
4 #define __CDS_INTRUSIVE_TSIGAS_CYCLE_QUEUE_H
6 #include <cds/intrusive/details/base.h>
7 #include <cds/algo/atomic.h>
8 #include <cds/details/bounded_container.h>
9 #include <cds/opt/buffer.h>
11 namespace cds { namespace intrusive {
13 /// TsigasCycleQueue related definitions
14 /** @ingroup cds_intrusive_helper
16 namespace tsigas_queue {
18 /// TsigasCycleQueue default traits
21 /// Buffer type for cyclic array
23 The type of element for the buffer is not important: the queue rebinds
24 buffer for required type via \p rebind metafunction.
26 For \p TsigasCycleQueue queue the buffer size should have power-of-2 size.
28 typedef cds::opt::v::dynamic_buffer< void * > buffer;
/// Back-off strategy; the default is \p cds::backoff::empty
31 typedef cds::backoff::empty back_off;
33 /// The functor used to dispose of removed items. Default is \p opt::v::empty_disposer. This option is used for dequeuing
34 typedef opt::v::empty_disposer disposer;
36 /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
37 typedef atomicity::empty_item_counter item_counter;
39 /// C++ memory ordering model
41 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
42 or \p opt::v::sequential_consistent (sequentially consistent memory model).
44 typedef opt::v::relaxed_ordering memory_model;
46 /// Alignment for internal queue data. Default is \p opt::cache_line_alignment
47 enum { alignment = opt::cache_line_alignment };
50 /// Metafunction converting option list to \p tsigas_queue::traits
52 Supported \p Options are:
53 - \p opt::buffer - the buffer type for internal cyclic array. Possible types are:
54 \p opt::v::dynamic_buffer (the default), \p opt::v::static_buffer. The type of
55 element in the buffer is not important: it will be changed via \p rebind metafunction.
56 - \p opt::back_off - back-off strategy used, default is \p cds::backoff::empty.
57 - \p opt::disposer - the functor used to dispose of removed items. Default is \p opt::v::empty_disposer. This option is used
59 - \p opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
60 To enable item counting use \p cds::atomicity::item_counter
61 - \p opt::alignment - the alignment for internal queue data. Default is \p opt::cache_line_alignment
62 - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
63 or \p opt::v::sequential_consistent (sequentially consistent memory model).
65 Example: declare \p %TsigasCycleQueue with item counting and static internal buffer of size 1024:
67 typedef cds::intrusive::TsigasCycleQueue< Foo,
68 typename cds::intrusive::tsigas_queue::make_traits<
69 cds::opt::buffer< cds::opt::v::static_buffer< void *, 1024 > >,
70 cds::opt::item_counter< cds::atomicity::item_counter >
// Accepts an arbitrary list of option types to be folded into a traits type.
75 template <typename... Options>
// When CDS_DOXYGEN_INVOKED is defined, only a placeholder typedef is exposed
// so that the generated documentation shows an opaque result type.
77 # ifdef CDS_DOXYGEN_INVOKED
78 typedef implementation_defined type; ///< Metafunction result
// The result is built by cds::opt::make_options from the type that
// cds::opt::find_type_traits yields for the default \p traits and \p Options.
// NOTE(review): the preprocessor branch separating this from the placeholder
// above is not visible here - presumably this is the non-Doxygen branch.
80 typedef typename cds::opt::make_options<
81 typename cds::opt::find_type_traits< traits, Options... >::type
88 } //namespace tsigas_queue
90 /// Non-blocking cyclic queue discovered by Philippas Tsigas and Yi Zhang
91 /** @ingroup cds_intrusive_queue
94 \li [2000] Philippas Tsigas, Yi Zhang "A Simple, Fast and Scalable Non-Blocking Concurrent FIFO Queue
95 for Shared Memory Multiprocessor Systems"
98 - \p T - value type to be stored in queue. The queue stores pointers to passed data of type \p T.
99 <b>Restriction</b>: the queue can manage at least two-byte aligned data: the least significant bit (LSB)
100 of any pointer stored in the queue must be zero since the algorithm may use LSB
101 as a flag that marks the free cell.
102 - \p Traits - queue traits, default is \p tsigas_queue::traits. You can use \p tsigas_queue::make_traits
103 metafunction to make your traits or just derive your traits from \p %tsigas_queue::traits:
105 struct myTraits: public cds::intrusive::tsigas_queue::traits {
106 typedef cds::atomicity::item_counter item_counter;
108 typedef cds::intrusive::TsigasCycleQueue< Foo, myTraits > myQueue;
110 // Equivalent make_traits example:
111 typedef cds::intrusive::TsigasCycleQueue< Foo,
112 typename cds::intrusive::tsigas_queue::make_traits<
113 cds::opt::item_counter< cds::atomicity::item_counter >
118 This queue algorithm does not require any garbage collector.
122 #include <cds/intrusive/tsigas_cycle_queue.h>
128 // Queue of Foo pointers, capacity is 1024, statically allocated buffer:
129 struct queue_traits: public cds::intrusive::tsigas_queue::traits
131 typedef cds::opt::v::static_buffer< Foo, 1024 > buffer;
133 typedef cds::intrusive::TsigasCycleQueue< Foo, queue_traits > static_queue;
134 static_queue stQueue;
136 // Queue of Foo pointers, capacity is 1024, dynamically allocated buffer, with item counting:
137 typedef cds::intrusive::TsigasCycleQueue< Foo,
138 typename cds::intrusive::tsigas_queue::make_traits<
139 cds::opt::buffer< cds::opt::v::dynamic_buffer< Foo > >,
140 cds::opt::item_counter< cds::atomicity::item_counter >
143 dynamic_queue dynQueue( 1024 );
146 template <typename T, typename Traits = tsigas_queue::traits >
147 class TsigasCycleQueue: public cds::bounded_container
150 /// Rebind template arguments
151 template <typename T2, typename Traits2>
153 typedef TsigasCycleQueue< T2, Traits2 > other ; ///< Rebinding result
157 typedef T value_type; ///< type of value to be stored in the queue
158 typedef Traits traits; ///< Queue traits
159 typedef typename traits::item_counter item_counter; ///< Item counter type
160 typedef typename traits::disposer disposer; ///< Item disposer
161 typedef typename traits::back_off back_off; ///< back-off strategy used
162 typedef typename traits::memory_model memory_model; ///< Memory ordering. See cds::opt::memory_model option
163 typedef typename traits::buffer::template rebind< atomics::atomic<value_type *> >::other buffer; ///< Internal buffer
/// The internal buffer type passed through alignment_setter with \p traits::alignment
// NOTE(review): no line visible here uses aligned_buffer; m_buffer below is
// declared with the plain \p buffer type - confirm whether that is intended.
167 typedef typename opt::details::alignment_setter< buffer, traits::alignment >::type aligned_buffer;
/// Type of the head and tail indices
168 typedef size_t index_type;
/// Atomic index passed through alignment_setter with \p traits::alignment; the type of m_nHead and m_nTail
169 typedef typename opt::details::alignment_setter< atomics::atomic<index_type>, traits::alignment >::type aligned_index;
// Each cell of m_buffer is an atomics::atomic<value_type *> (see the rebind in the \p buffer typedef).
// NOTE(review): the comment on m_buffer mentions m_nCapacity, but no member of
// that name is visible here - confirm the stated array size.
174 buffer m_buffer ; ///< array of pointer T *, array size is equal to m_nCapacity+1
175 aligned_index m_nHead ; ///< index of queue's head
176 aligned_index m_nTail ; ///< index of queue's tail
177 item_counter m_ItemCounter ; ///< item counter
// Two distinct marker values for an unoccupied cell; is_free() accepts either one.
// free1 has its least significant bit set, which is why item pointers stored in
// the queue must have a zero LSB (enqueue() asserts this).
182 static CDS_CONSTEXPR intptr_t const free0 = 0;
183 static CDS_CONSTEXPR intptr_t const free1 = 1;
// Returns true when \p p equals one of the two free-cell markers (free0 or free1)
// reinterpreted as a pointer; any other pointer value counts as an occupied cell.
185 static bool is_free( const value_type * p ) CDS_NOEXCEPT
187 return p == reinterpret_cast<value_type *>(free0) || p == reinterpret_cast<value_type *>(free1);
// Returns the capacity reported by the internal buffer (m_buffer.capacity()).
190 size_t CDS_CONSTEXPR buffer_capacity() const CDS_NOEXCEPT
192 return m_buffer.capacity();
// Returns buffer_capacity() - 1. enqueue() and dequeue() apply this value with
// bitwise AND to wrap indices, which matches the documented requirement that the
// buffer size be a power of 2.
195 index_type CDS_CONSTEXPR modulo() const CDS_NOEXCEPT
197 return buffer_capacity() - 1;
202 /// Initialize empty queue of capacity \p nCapacity
204 If internal buffer type is \p cds::opt::v::static_buffer, the \p nCapacity parameter is ignored.
206 Note that the real capacity of queue is \p nCapacity - 2.
// Forwards \p nCapacity (0 when the argument is omitted) to the constructor of m_buffer.
208 TsigasCycleQueue( size_t nCapacity = 0 )
209 : m_buffer( nCapacity )
222 /// Enqueues an item to the queue
223 /** @anchor cds_intrusive_TsigasQueue_enqueue
224 Returns \p true if success, \p false if queue is full
// Tries to publish the address of \p data in a free cell of the cyclic buffer.
// Returns false when the scan finds the queue full. The retry loop's condition
// is always true, so the loop is only left from inside its body.
226 bool enqueue( value_type& data )
228 value_type * pNewNode = &data;
// The low bit of the address must be clear: it is OR-ed into the stored pointer
// further down when the target cell currently holds the free1 marker.
// NOTE(review): ptr_atomic_t is not declared in the lines visible here, while the
// tagging cast below uses intptr_t - confirm both name the same integer type.
229 assert( (reinterpret_cast<ptr_atomic_t>( pNewNode ) & 1) == 0 );
// modulo() is buffer_capacity() - 1 and is used below as a bit mask for index wrap-around.
232 const index_type nModulo = modulo();
// Snapshot of the tail index; it is compared against fresh loads of m_nTail
// several times below to detect that another thread has moved the tail.
235 index_type te = m_nTail.load(memory_model::memory_order_acquire);
// NOTE(review): the declaration of \p ate is not among the lines visible here.
237 value_type * tt = m_buffer[ ate ].load(memory_model::memory_order_relaxed);
238 index_type temp = ( ate + 1 ) & nModulo ; // next item after tail
240 // Looking for actual tail
// Keep advancing while the inspected cell is occupied (holds neither free0 nor free1).
241 while ( !is_free( tt ) ) {
242 if ( te != m_nTail.load(memory_model::memory_order_relaxed) ) // check the tail consistency
244 if ( temp == m_nHead.load(memory_model::memory_order_acquire) ) // queue full?
246 tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed);
248 temp = (temp + 1) & nModulo;
// Re-validate the tail snapshot after the scan.
251 if ( te != m_nTail.load(memory_model::memory_order_relaxed) )
254 // Check whether queue is full
255 if ( temp == m_nHead.load(memory_model::memory_order_acquire) ) {
// Look at the cell that follows the head position: if it is occupied as well,
// the queue is reported as full.
256 ate = ( temp + 1 ) & nModulo;
257 tt = m_buffer[ ate ].load(memory_model::memory_order_relaxed);
258 if ( !is_free( tt ) ) {
259 return false; // Queue is full
262 // help the dequeue to update head
// Best-effort attempt to move the head from temp to ate; the CAS result is not inspected.
263 m_nHead.compare_exchange_strong( temp, ate, memory_model::memory_order_release, atomics::memory_order_relaxed );
// The target cell holds the free1 marker: set the low bit of the pointer that
// is about to be stored. dequeue() clears this bit before returning the item.
267 if ( tt == reinterpret_cast<value_type *>(free1) )
268 pNewNode = reinterpret_cast<value_type *>(reinterpret_cast<intptr_t>( pNewNode ) | 1);
269 if ( te != m_nTail.load(memory_model::memory_order_relaxed) )
272 // get actual tail and try to enqueue new node
// The CAS succeeds only if the cell still contains the free marker read into \p tt.
273 if ( m_buffer[ate].compare_exchange_strong( tt, pNewNode, memory_model::memory_order_release, atomics::memory_order_relaxed ) ) {
// Attempt to advance the tail index from the snapshot te to temp.
275 m_nTail.compare_exchange_strong( te, temp, memory_model::memory_order_release, atomics::memory_order_relaxed );
// Comma expression: bkoff() is invoked between attempts and the condition is always true.
280 } while ( bkoff(), true );
282 // No control path reaches this line!
286 /// Dequeues item from the queue
287 /** @anchor cds_intrusive_TsigasQueue_dequeue
288 If the queue is empty the function returns \p nullptr
290 Dequeue does not call value disposer. You may manually dispose returned value if it is needed.
// Tries to remove the item at the head of the cyclic buffer and returns a pointer
// to it with the low tag bit cleared. The retry loop's condition is always true,
// so the loop is only left from inside its body.
292 value_type * dequeue()
// modulo() is buffer_capacity() - 1 and is used below as a bit mask for index wrap-around.
296 const index_type nModulo = modulo();
// Snapshot of the head index; it is compared against fresh loads of m_nHead
// below to detect that another thread has moved the head.
298 index_type th = m_nHead.load(memory_model::memory_order_acquire);
// The scan starts at the cell that follows the head index.
299 index_type temp = ( th + 1 ) & nModulo;
300 value_type * tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed);
303 // find the actual head after this loop
// Keep advancing while the inspected cell holds one of the free markers.
304 while ( is_free( tt ) ) {
305 if ( th != m_nHead.load(memory_model::memory_order_relaxed) )
308 // two consecutive nullptr means the queue is empty
309 if ( temp == m_nTail.load(memory_model::memory_order_acquire) )
312 temp = ( temp + 1 ) & nModulo;
313 tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed);
// Re-validate the head snapshot after the scan.
316 if ( th != m_nHead.load(memory_model::memory_order_relaxed) )
319 // check whether the queue is empty
320 if ( temp == m_nTail.load(memory_model::memory_order_acquire) ) {
321 // help the enqueue to update end
// Best-effort attempt to move the tail one cell forward; the CAS result is not inspected.
322 m_nTail.compare_exchange_weak( temp, (temp + 1) & nModulo, memory_model::memory_order_release, atomics::memory_order_relaxed );
// Choose the free marker that will replace the removed item: free0 when the
// stored pointer has its low bit set, free1 otherwise.
// NOTE(review): the declaration of \p pNull is not among the lines visible here,
// and ptr_atomic_t is not declared in view either (the return below uses intptr_t).
326 pNull = reinterpret_cast<value_type *>((reinterpret_cast<ptr_atomic_t>(tt) & 1) ? free0 : free1 );
328 if ( th != m_nHead.load(memory_model::memory_order_relaxed) )
331 // Get the actual head, null means empty
// The CAS succeeds only if the cell still contains the item pointer read into \p tt.
332 if ( m_buffer[temp].compare_exchange_weak( tt, pNull, memory_model::memory_order_acquire, atomics::memory_order_relaxed )) {
// Attempt to advance the head index from the snapshot th to temp.
334 m_nHead.compare_exchange_weak( th, temp, memory_model::memory_order_release, atomics::memory_order_relaxed );
// Clear the low bit that enqueue() may have set as a tag before handing the pointer back.
336 return reinterpret_cast<value_type *>(reinterpret_cast<intptr_t>( tt ) & ~intptr_t(1));
// Comma expression: bkoff() is invoked between attempts and the condition is always true.
340 } while ( bkoff(), true );
342 // No control path reaches this line!
346 /// Synonym for \p enqueue()
// Forwards \p data to enqueue() and returns its result unchanged.
347 bool push( value_type& data )
349 return enqueue( data );
352 /// Synonym for \p dequeue()
358 /// Checks if the queue is empty
361 const index_type nModulo = modulo();
364 index_type th = m_nHead.load(memory_model::memory_order_relaxed);
365 index_type temp = ( th + 1 ) & nModulo;
366 const value_type * tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed);
368 // find the actual head after this loop
369 while ( is_free( tt ) ) {
370 if ( th != m_nHead.load(memory_model::memory_order_relaxed) )
372 // two consecutive nullptr means queue empty
373 if ( temp == m_nTail.load(memory_model::memory_order_relaxed) )
375 temp = ( temp + 1 ) & nModulo;
376 tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed);
381 /// Clears queue in lock-free manner.
383 \p f parameter is a functor to dispose removed items:
385 myQueue.clear( []( value_type * p ) { delete p; } );
// Drains the queue by calling pop() repeatedly until it returns nullptr.
// NOTE(review): the loop body is not visible here; going by the documentation
// above, each removed item is presumably handed to \p f - confirm.
388 template <typename Disposer>
389 void clear( Disposer f )
392 while ( (pv = pop()) != nullptr ) {
399 This function uses the disposer that is specified in \p Traits,
400 see \p tsigas_queue::traits::disposer.
407 /// Returns queue's item count
409 The value returned depends on \p tsigas_queue::traits::item_counter.
410 For \p atomicity::empty_item_counter, the function always returns 0.
// Reports the current value of m_ItemCounter; the buffer cells are not inspected.
412 size_t size() const CDS_NOEXCEPT
414 return m_ItemCounter.value();
417 /// Returns the queue capacity, which is the capacity of the internal cyclic buffer minus 2
418 size_t CDS_CONSTEXPR capacity() const CDS_NOEXCEPT
420 return buffer_capacity() - 2;
424 }} // namespace cds::intrusive
426 #endif // #ifndef __CDS_INTRUSIVE_TSIGAS_CYCLE_QUEUE_H