3 #ifndef __CDS_INTRUSIVE_TSIGAS_CYCLE_QUEUE_H
4 #define __CDS_INTRUSIVE_TSIGAS_CYCLE_QUEUE_H
6 #include <cds/intrusive/base.h>
7 #include <cds/cxx11_atomic.h>
8 #include <cds/details/bounded_container.h>
9 #include <cds/opt/buffer.h>
12 namespace cds { namespace intrusive {
14 /// Non-blocking cyclic queue discovered by Philippas Tsigas and Yi Zhang
15 /** @ingroup cds_intrusive_queue
18 \li [2000] Philippas Tsigas, Yi Zhang "A Simple, Fast and Scalable Non-Blocking Concurrent FIFO Queue
19 for Shared Memory Multiprocessor Systems"
22 - T - data stored in queue. The queue stores pointers to passed data of type \p T.
23 <b>Restriction</b>: the queue can manage at least two-byte aligned data: the least significant bit (LSB)
24 of any pointer stored in the queue must be zero since the algorithm may use LSB
25 as a flag that marks the free cell.
29 - opt::buffer - buffer to store items. Mandatory option, see option description for full list of possible types.
30 - opt::disposer - the functor used for dispose removed items. Default is opt::v::empty_disposer. This option is used
31 only in \ref clear function.
32 - opt::item_counter - the type of item counting feature. Default is \ref atomicity::empty_item_counter
33 - opt::back_off - back-off strategy used. If the option is not specified, the cds::backoff::empty is used.
34 - opt::alignment - the alignment for internal queue data. Default is opt::cache_line_alignment
35 - opt::memory_model - C++ memory ordering model. Can be opt::v::relaxed_ordering (relaxed memory model, the default)
36 or opt::v::sequential_consistent (sequentially consisnent memory model).
38 This queue algorithm does not require any garbage collector.
42 #include <cds/intrusive/tsigas_cycle_queue.h>
48 // Queue of Foo pointers, capacity is 1024, statically allocated buffer:
49 typedef cds::intrusive::TsigasCycleQueue<
51 ,cds::opt::buffer< cds::opt::v::static_buffer< Foo, 1024 > >
55 // Queue of Foo pointers, capacity is 1024, dynamically allocated buffer:
56 typedef cds::intrusive::TsigasCycleQueue<
58 ,cds::opt::buffer< cds::opt::v::dynamic_buffer< Foo > >
60 dynamic_queue dynQueue( 1024 );
63 template <typename T, CDS_DECL_OPTIONS7>
64 class TsigasCycleQueue: public cds::bounded_container
67 struct default_options
69 typedef cds::backoff::empty back_off;
70 typedef opt::v::empty_disposer disposer;
71 typedef atomicity::empty_item_counter item_counter;
72 typedef opt::v::relaxed_ordering memory_model;
73 enum { alignment = opt::cache_line_alignment };
79 typedef typename opt::make_options<
80 typename cds::opt::find_type_traits< default_options, CDS_OPTIONS7>::type
86 /// Rebind template arguments
87 template <typename T2, CDS_DECL_OTHER_OPTIONS7>
89 typedef TsigasCycleQueue< T2, CDS_OTHER_OPTIONS7> other ; ///< Rebinding result
93 typedef T value_type ; ///< type of value stored in the queue
94 typedef typename options::item_counter item_counter; ///< Item counter type
95 typedef typename options::disposer disposer ; ///< Item disposer
96 typedef typename options::back_off back_off ; ///< back-off strategy used
97 typedef typename options::memory_model memory_model; ///< Memory ordering. See cds::opt::memory_model option
101 typedef typename options::buffer::template rebind< CDS_ATOMIC::atomic<value_type *> >::other buffer;
102 typedef typename opt::details::alignment_setter< buffer, options::alignment >::type aligned_buffer;
103 typedef size_t index_type;
104 typedef typename opt::details::alignment_setter< CDS_ATOMIC::atomic<index_type>, options::alignment >::type aligned_index;
109 buffer m_buffer ; ///< array of pointer T *, array size is equal to m_nCapacity+1
110 aligned_index m_nHead ; ///< index of queue's head
111 aligned_index m_nTail ; ///< index of queue's tail
112 item_counter m_ItemCounter ; ///< item counter
117 static CDS_CONSTEXPR value_type * free0() CDS_NOEXCEPT
121 static CDS_CONSTEXPR value_type * free1() CDS_NOEXCEPT
123 return (value_type*) 1;
125 static bool is_free( const value_type * p ) CDS_NOEXCEPT
127 return p == free0() || p == free1();
130 size_t buffer_capacity() const CDS_NOEXCEPT
132 return m_buffer.capacity();
135 index_type modulo() const CDS_NOEXCEPT
137 return buffer_capacity() - 1;
142 /// Initialize empty queue of capacity \p nCapacity
144 For cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
146 Note that the real capacity of queue is \p nCapacity - 2.
148 TsigasCycleQueue( size_t nCapacity = 0 )
149 : m_buffer( nCapacity )
162 /// Returns queue's item count
164 The value returned depends on opt::item_counter option. For atomicity::empty_item_counter,
165 this function always returns 0.
167 size_t size() const CDS_NOEXCEPT
169 return m_ItemCounter.value();
172 /// Returns capacity of cyclic buffer
173 size_t capacity() const CDS_NOEXCEPT
175 return buffer_capacity() - 2;
178 /// Enqueues item from the queue
179 /** @anchor cds_intrusive_TsigasQueue_enqueue
180 Returns \p true if success, \p false otherwise (for example, if queue is full)
182 bool enqueue( value_type& data )
184 value_type * pNewNode = &data;
185 assert( (reinterpret_cast<ptr_atomic_t>( pNewNode ) & 1) == 0 );
188 const index_type nModulo = modulo();
191 index_type te = m_nTail.load(memory_model::memory_order_acquire);
193 value_type * tt = m_buffer[ ate ].load(memory_model::memory_order_relaxed);
194 index_type temp = ( ate + 1 ) & nModulo ; // next item after tail
196 // Looking for actual tail
197 while ( !is_free( tt ) ) {
198 if ( te != m_nTail.load(memory_model::memory_order_relaxed) ) // check the tail consistency
200 if ( temp == m_nHead.load(memory_model::memory_order_acquire) ) // queue full?
202 tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed);
204 temp = (temp + 1) & nModulo;
207 if ( te != m_nTail.load(memory_model::memory_order_relaxed) )
210 // Check whether queue is full
211 if ( temp == m_nHead.load(memory_model::memory_order_acquire) ) {
212 ate = ( temp + 1 ) & nModulo;
213 tt = m_buffer[ ate ].load(memory_model::memory_order_relaxed);
214 if ( !is_free( tt ) ) {
215 return false ; // Queue is full
218 // help the dequeue to update head
219 m_nHead.compare_exchange_strong( temp, ate, memory_model::memory_order_release, CDS_ATOMIC::memory_order_relaxed );
224 pNewNode = reinterpret_cast<value_type *>(reinterpret_cast<intptr_t>( pNewNode ) | 1);
225 if ( te != m_nTail.load(memory_model::memory_order_relaxed) )
228 // get actual tail and try to enqueue new node
229 if ( m_buffer[ate].compare_exchange_strong( tt, pNewNode, memory_model::memory_order_release, CDS_ATOMIC::memory_order_relaxed ) ) {
231 m_nTail.compare_exchange_strong( te, temp, memory_model::memory_order_release, CDS_ATOMIC::memory_order_relaxed );
236 } while ( bkoff(), true );
238 // No control path reaches this line!
242 /// Dequeues item from the queue
243 /** @anchor cds_intrusive_TsigasQueue_dequeue
244 If the queue is empty the function returns \a NULL
246 Dequeue does not call value disposer. You can manually dispose returned value if it is needed.
248 value_type * dequeue()
252 const index_type nModulo = modulo();
254 index_type th = m_nHead.load(memory_model::memory_order_acquire);
255 index_type temp = ( th + 1 ) & nModulo;
256 value_type * tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed);
259 // find the actual head after this loop
260 while ( is_free( tt ) ) {
261 if ( th != m_nHead.load(memory_model::memory_order_relaxed) )
264 // two consecutive NULL means queue empty
265 if ( temp == m_nTail.load(memory_model::memory_order_acquire) )
268 temp = ( temp + 1 ) & nModulo;
269 tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed);
272 if ( th != m_nHead.load(memory_model::memory_order_relaxed) )
275 // check whether the queue is empty
276 if ( temp == m_nTail.load(memory_model::memory_order_acquire) ) {
277 // help the enqueue to update end
278 m_nTail.compare_exchange_strong( temp, (temp + 1) & nModulo, memory_model::memory_order_release, CDS_ATOMIC::memory_order_relaxed );
282 pNull = (reinterpret_cast<ptr_atomic_t>( tt ) & 1) ? free0() : free1();
284 if ( th != m_nHead.load(memory_model::memory_order_relaxed) )
287 // Get the actual head, null means empty
288 if ( m_buffer[temp].compare_exchange_strong( tt, pNull, memory_model::memory_order_release, CDS_ATOMIC::memory_order_relaxed )) {
290 m_nHead.compare_exchange_strong( th, temp, memory_model::memory_order_release, CDS_ATOMIC::memory_order_relaxed );
292 return reinterpret_cast<value_type *>(reinterpret_cast<intptr_t>( tt ) & ~intptr_t(1));
296 } while ( bkoff(), true );
298 // No control path reaches this line!
302 /// Synonym of \ref cds_intrusive_TsigasQueue_enqueue "enqueue"
303 bool push( value_type& data )
305 return enqueue( data );
308 /// Synonym of \ref cds_intrusive_TsigasQueue_dequeue "dequeue"
314 /// Checks if the queue is empty
317 const index_type nModulo = modulo();
320 index_type th = m_nHead.load(memory_model::memory_order_relaxed);
321 index_type temp = ( th + 1 ) & nModulo;
322 const value_type * tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed);
324 // find the actual head after this loop
325 while ( is_free( tt ) ) {
326 if ( th != m_nHead.load(memory_model::memory_order_relaxed) )
328 // two consecutive NULL means queue empty
329 if ( temp == m_nTail.load(memory_model::memory_order_relaxed) )
331 temp = ( temp + 1 ) & nModulo;
332 tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed);
337 /// Clears queue in lock-free manner.
339 \p f parameter is a functor to dispose removed items.
340 The interface of \p DISPOSER is:
343 void operator ()( T * val );
346 You can pass \p disposer by reference using \p boost::ref.
347 The disposer will be called immediately for each item.
349 template <typename Disposer>
350 void clear( Disposer f )
353 while ( (pv = pop()) != nullptr ) {
360 This function uses the disposer that is specified in \p Options.
368 }} // namespace cds::intrusive
370 #endif // #ifndef __CDS_INTRUSIVE_TSIGAS_CYCLE_QUEUE_H