2 This file is a part of libcds - Concurrent Data Structures library
4 (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
6 Source code repo: http://github.com/khizmax/libcds/
7 Download: http://sourceforge.net/projects/libcds/files/
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
12 * Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #ifndef CDSLIB_INTRUSIVE_TSIGAS_CYCLE_QUEUE_H
32 #define CDSLIB_INTRUSIVE_TSIGAS_CYCLE_QUEUE_H
34 #include <cds/intrusive/details/base.h>
35 #include <cds/algo/atomic.h>
36 #include <cds/details/bounded_container.h>
37 #include <cds/opt/buffer.h>
39 namespace cds { namespace intrusive {
41 /// TsigasCycleQueue related definitions
42 /** @ingroup cds_intrusive_helper
44 namespace tsigas_queue {
46 /// TsigasCycleQueue default traits
49 /// Buffer type for cyclic array
51 The type of element for the buffer is not important: the queue rebinds
52 buffer for required type via \p rebind metafunction.
54 For \p TsigasCycleQueue queue the buffer size should have power-of-2 size.
56 You should use any initialized buffer type, see \p opt::buffer.
58 typedef cds::opt::v::initialized_dynamic_buffer< void * > buffer;
61 typedef cds::backoff::empty back_off;
63 /// The functor used to dispose of removed items. Default is \p opt::v::empty_disposer. This option is used for dequeuing
64 typedef opt::v::empty_disposer disposer;
66 /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
67 typedef atomicity::empty_item_counter item_counter;
69 /// C++ memory ordering model
71 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
72 or \p opt::v::sequential_consistent (sequentially consistent memory model).
74 typedef opt::v::relaxed_ordering memory_model;
76 /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
77 enum { padding = opt::cache_line_padding };
80 /// Metafunction converting option list to \p tsigas_queue::traits
82 Supported \p Options are:
83 - \p opt::buffer - the buffer type for internal cyclic array. Possible types are:
84 \p opt::v::initialized_dynamic_buffer (the default), \p opt::v::initialized_static_buffer. The type of
85 element in the buffer is not important: it will be changed via \p rebind metafunction.
86 - \p opt::back_off - back-off strategy used, default is \p cds::backoff::empty.
87 - \p opt::disposer - the functor used to dispose of removed items. Default is \p opt::v::empty_disposer. This option is used
89 - \p opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
90 To enable item counting use \p cds::atomicity::item_counter
91 - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
92 - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
93 or \p opt::v::sequential_consistent (sequentially consistent memory model).
95 Example: declare \p %TsigasCycleQueue with item counting and static internal buffer of size 1024:
97 typedef cds::intrusive::TsigasCycleQueue< Foo,
98 typename cds::intrusive::tsigas_queue::make_traits<
99 cds::opt::buffer< cds::opt::v::initialized_static_buffer< void *, 1024 > >,
100 cds::opt::item_counter< cds::atomicity::item_counter >
105 template <typename... Options>
107 # ifdef CDS_DOXYGEN_INVOKED
108 typedef implementation_defined type; ///< Metafunction result
110 typedef typename cds::opt::make_options<
111 typename cds::opt::find_type_traits< traits, Options... >::type
118 } //namespace tsigas_queue
120 /// Non-blocking cyclic queue discovered by Philippas Tsigas and Yi Zhang
121 /** @ingroup cds_intrusive_queue
124 \li [2000] Philippas Tsigas, Yi Zhang "A Simple, Fast and Scalable Non-Blocking Concurrent FIFO Queue
125 for Shared Memory Multiprocessor Systems"
128 - \p T - value type to be stored in queue. The queue stores pointers to passed data of type \p T.
129 <b>Restriction</b>: the queue can manage at least two-byte aligned data: the least significant bit (LSB)
130 of any pointer stored in the queue must be zero since the algorithm may use LSB
131 as a flag that marks the free cell.
132 - \p Traits - queue traits, default is \p tsigas_queue::traits. You can use \p tsigas_queue::make_traits
133 metafunction to make your traits or just derive your traits from \p %tsigas_queue::traits:
135 struct myTraits: public cds::intrusive::tsigas_queue::traits {
136 typedef cds::atomicity::item_counter item_counter;
138 typedef cds::intrusive::TsigasCycleQueue< Foo, myTraits > myQueue;
140 // Equivalent make_traits example:
141 typedef cds::intrusive::TsigasCycleQueue< Foo,
142 typename cds::intrusive::tsigas_queue::make_traits<
143 cds::opt::item_counter< cds::atomicity::item_counter >
148 This queue algorithm does not require any garbage collector.
152 #include <cds/intrusive/tsigas_cycle_queue.h>
158 // Queue of Foo pointers, capacity is 1024, statically allocated buffer:
159 struct queue_traits: public cds::intrusive::tsigas_queue::traits
161 typedef cds::opt::v::initialized_static_buffer< Foo, 1024 > buffer;
163 typedef cds::intrusive::TsigasCycleQueue< Foo, queue_traits > static_queue;
164 static_queue stQueue;
166 // Queue of Foo pointers, capacity is 1024, dynamically allocated buffer, with item counting:
167 typedef cds::intrusive::TsigasCycleQueue< Foo,
168 typename cds::intrusive::tsigas_queue::make_traits<
169 cds::opt::buffer< cds::opt::v::initialized_dynamic_buffer< Foo > >,
170 cds::opt::item_counter< cds::atomicity::item_counter >
173 dynamic_queue dynQueue( 1024 );
176 template <typename T, typename Traits = tsigas_queue::traits >
177 class TsigasCycleQueue: public cds::bounded_container
180 /// Rebind template arguments
181 template <typename T2, typename Traits2>
183 typedef TsigasCycleQueue< T2, Traits2 > other ; ///< Rebinding result
187 typedef T value_type; ///< type of value to be stored in the queue
188 typedef Traits traits; ///< Queue traits
189 typedef typename traits::item_counter item_counter; ///< Item counter type
190 typedef typename traits::disposer disposer; ///< Item disposer
191 typedef typename traits::back_off back_off; ///< back-off strategy used
192 typedef typename traits::memory_model memory_model; ///< Memory ordering. See cds::opt::memory_model option
// The buffer type taken from the traits is rebound so that every cell is an
// atomic pointer to value_type, whatever element type the traits named.
193 typedef typename traits::buffer::template rebind< atomics::atomic<value_type *> >::other buffer; ///< Internal buffer
/// Integer type used for the head and tail indices into the cyclic buffer (see m_nHead, m_nTail)
197 typedef size_t index_type;
// Queue state. The padX_ members are placed between m_buffer, m_nHead, m_nTail
// and m_ItemCounter; their type is derived from traits::padding
// (presumably to keep the contended atomics on separate cache lines - confirm).
// NOTE(review): the comment on m_buffer mentions m_nCapacity, which is not
// declared anywhere in the visible code - verify the intended array size.
202 buffer m_buffer ; ///< array of pointer T *, array size is equal to m_nCapacity+1
203 typename opt::details::apply_padding< index_type, traits::padding >::padding_type pad1_;
204 atomics::atomic<index_type> m_nHead ; ///< index of queue's head
205 typename opt::details::apply_padding< index_type, traits::padding >::padding_type pad2_;
206 atomics::atomic<index_type> m_nTail ; ///< index of queue's tail
207 typename opt::details::apply_padding< index_type, traits::padding >::padding_type pad3_;
208 item_counter m_ItemCounter; ///< item counter
/// Marker values for a cell that holds no item; they differ only in the least significant bit
213 static CDS_CONSTEXPR intptr_t const free0 = 0;
214 static CDS_CONSTEXPR intptr_t const free1 = 1;
/// Returns \p true if \p p is one of the free-cell markers (\p free0 or \p free1)
216 static bool is_free( const value_type * p ) CDS_NOEXCEPT
// Masking off bit 0 maps both markers to zero; any pointer with a higher bit set is an item.
218 return (reinterpret_cast<intptr_t>(p) & ~intptr_t(1)) == 0;
/// Returns the capacity reported by the internal buffer \p m_buffer
221 size_t CDS_CONSTEXPR buffer_capacity() const CDS_NOEXCEPT
223 return m_buffer.capacity();
/// Returns <tt>buffer_capacity() - 1</tt>, the value the queue operations AND with an index to wrap it
// Using this value as a bit mask relies on the buffer capacity being a power of two.
226 index_type CDS_CONSTEXPR modulo() const CDS_NOEXCEPT
228 return buffer_capacity() - 1;
233 /// Initialize empty queue of capacity \p nCapacity
235 If internal buffer type is \p cds::opt::v::initialized_static_buffer, the \p nCapacity parameter is ignored.
237 Note that the real capacity of queue is \p nCapacity - 2.
// \p nCapacity is passed straight to the constructor of the internal buffer.
// NOTE(review): the remainder of the member-initializer list and the constructor
// body are not visible in this listing - confirm how m_nHead/m_nTail are initialized.
239 TsigasCycleQueue( size_t nCapacity = 0 )
240 : m_buffer( nCapacity )
253 /// Enqueues an item to the queue
254 /** @anchor cds_intrusive_TsigasQueue_enqueue
255 Returns \p true if success, \p false if queue is full
// NOTE(review): this listing of enqueue() is incomplete - braces, the declarations
// of \p ate and \p bkoff, and the bodies of several \p if statements are not visible.
// The comments below describe only what the visible statements do.
257 bool enqueue( value_type& data )
259 value_type * pNewNode = &data;
// The low bit of a stored pointer is used as a flag (see the "| 1" below), so it must be clear on entry.
260 assert( (reinterpret_cast<uintptr_t>(pNewNode) & 1) == 0 );
// Mask used to wrap indices around the cyclic buffer.
263 const index_type nModulo = modulo();
// Snapshot of the tail index; it is re-read several times below to detect concurrent changes.
266 index_type te = m_nTail.load(memory_model::memory_order_acquire);
268 value_type * tt = m_buffer[ ate ].load(memory_model::memory_order_relaxed);
269 index_type temp = ( ate + 1 ) & nModulo ; // next item after tail
271 // Looking for actual tail
// Walk forward over occupied cells until a free marker is read.
272 while ( !is_free( tt )) {
273 if ( te != m_nTail.load(memory_model::memory_order_relaxed)) // check the tail consistency
275 if ( temp == m_nHead.load(memory_model::memory_order_acquire)) // queue full?
277 tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed);
279 temp = (temp + 1) & nModulo;
282 if ( te != m_nTail.load(memory_model::memory_order_acquire))
285 // Check whether queue is full
286 if ( temp == m_nHead.load(memory_model::memory_order_acquire)) {
// Inspect the cell that follows the head position.
287 ate = ( temp + 1 ) & nModulo;
288 tt = m_buffer[ ate ].load(memory_model::memory_order_relaxed);
289 if ( !is_free( tt )) {
290 return false; // Queue is full
293 // help the dequeue to update head
// That cell is free, so the head index is stale: try to advance it from temp to ate.
294 m_nHead.compare_exchange_strong( temp, ate, memory_model::memory_order_release, atomics::memory_order_relaxed );
// If the target cell held the free1 marker, the new pointer is stored with its low bit set.
298 if ( tt == reinterpret_cast<value_type *>(free1))
299 pNewNode = reinterpret_cast<value_type *>(reinterpret_cast<intptr_t>( pNewNode ) | 1);
300 if ( te != m_nTail.load(memory_model::memory_order_acquire))
303 // get actual tail and try to enqueue new node
// The store succeeds only if the cell still contains the marker value read into tt.
304 if ( m_buffer[ate].compare_exchange_strong( tt, pNewNode, memory_model::memory_order_release, atomics::memory_order_relaxed )) {
// Try to move the tail from te to temp; the result of this CAS is not checked here.
306 m_nTail.compare_exchange_strong( te, temp, memory_model::memory_order_release, atomics::memory_order_relaxed );
// The loop condition is always true: bkoff() is invoked and the attempt is repeated.
311 } while ( bkoff(), true );
313 // No control path reaches this line!
317 /// Dequeues item from the queue
318 /** @anchor cds_intrusive_TsigasQueue_dequeue
319 If the queue is empty the function returns \p nullptr
321 Dequeue does not call value disposer. You may manually dispose returned value if it is needed.
// NOTE(review): this listing of dequeue() is incomplete - braces, the declarations
// of \p pNull and \p bkoff, and the bodies of several \p if statements are not visible.
// The comments below describe only what the visible statements do.
323 value_type * dequeue()
// Mask used to wrap indices around the cyclic buffer.
327 const index_type nModulo = modulo();
// Snapshot of the head index; the scan starts at the cell after it.
329 index_type th = m_nHead.load(memory_model::memory_order_acquire);
330 index_type temp = ( th + 1 ) & nModulo;
331 value_type * tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed);
334 // find the actual head after this loop
// Walk forward over free cells until an item pointer is read.
335 while ( is_free( tt )) {
336 if ( th != m_nHead.load(memory_model::memory_order_relaxed))
339 // two consecutive nullptr means the queue is empty
340 if ( temp == m_nTail.load(memory_model::memory_order_acquire))
343 temp = ( temp + 1 ) & nModulo;
344 tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed);
347 if ( th != m_nHead.load(memory_model::memory_order_relaxed))
350 // check whether the queue is empty
351 if ( temp == m_nTail.load(memory_model::memory_order_acquire)) {
352 // help the enqueue to update end
// The tail index still points at an occupied cell: try to advance it by one position.
353 m_nTail.compare_exchange_weak( temp, (temp + 1) & nModulo, memory_model::memory_order_release, atomics::memory_order_relaxed );
// Pick the marker to leave behind: free0 if the stored pointer carried the low-bit flag, free1 otherwise.
357 pNull = reinterpret_cast<value_type *>((reinterpret_cast<uintptr_t>(tt) & 1) ? free0 : free1);
359 if ( th != m_nHead.load(memory_model::memory_order_relaxed))
362 // Get the actual head, null means empty
// The item is taken only if the cell still contains the value read into tt.
363 if ( m_buffer[temp].compare_exchange_weak( tt, pNull, memory_model::memory_order_acquire, atomics::memory_order_relaxed )) {
// Try to move the head from th to temp; the result of this CAS is not checked here.
365 m_nHead.compare_exchange_weak( th, temp, memory_model::memory_order_release, atomics::memory_order_relaxed );
// Clear the flag bit so the caller receives the pointer that was originally enqueued.
367 return reinterpret_cast<value_type *>(reinterpret_cast<intptr_t>( tt ) & ~intptr_t(1));
// The loop condition is always true: bkoff() is invoked and the attempt is repeated.
371 } while ( bkoff(), true );
373 // No control path reaches this line!
377 /// Synonym for \p enqueue()
// Forwards \p data to enqueue() and returns its result unchanged.
378 bool push( value_type& data )
380 return enqueue( data );
383 /// Synonym for \p dequeue()
389 /// Checks if the queue is empty
// NOTE(review): the signature line, braces and return statements of this function
// are not visible in this listing; the preceding doc comment identifies it as the
// emptiness check. The visible code only performs relaxed loads and no stores.
392 const index_type nModulo = modulo();
// Snapshot of the head index; the scan starts at the cell after it.
395 index_type th = m_nHead.load(memory_model::memory_order_relaxed);
396 index_type temp = ( th + 1 ) & nModulo;
397 const value_type * tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed);
399 // find the actual head after this loop
// Walk forward over free cells, comparing the position with the tail index.
400 while ( is_free( tt )) {
401 if ( th != m_nHead.load(memory_model::memory_order_relaxed))
403 // two consecutive nullptr means queue empty
404 if ( temp == m_nTail.load(memory_model::memory_order_relaxed))
406 temp = ( temp + 1 ) & nModulo;
407 tt = m_buffer[ temp ].load(memory_model::memory_order_relaxed);
412 /// Clears queue in lock-free manner.
414 \p f parameter is a functor to dispose removed items:
416 myQueue.clear( []( value_type * p ) { delete p; } );
// Drains the queue by calling pop() until it returns nullptr.
// NOTE(review): the declaration of \p pv and the loop body (presumably the call
// of \p f on each removed item) are not visible in this listing - confirm.
419 template <typename Disposer>
420 void clear( Disposer f )
423 while ( (pv = pop()) != nullptr ) {
430 This function uses the disposer that is specified in \p Traits,
431 see \p tsigas_queue::traits::disposer.
438 /// Returns queue's item count
440 The value returned depends on \p tsigas_queue::traits::item_counter.
441 For \p atomicity::empty_item_counter, the function always returns 0.
// Reports the current value of the item counter; no buffer cells are inspected.
443 size_t size() const CDS_NOEXCEPT
445 return m_ItemCounter.value();
448 /// Returns capacity of internal cyclic buffer
// The value returned is two less than the capacity of the internal buffer.
449 size_t CDS_CONSTEXPR capacity() const CDS_NOEXCEPT
451 return buffer_capacity() - 2;
455 }} // namespace cds::intrusive
457 #endif // #ifndef CDSLIB_INTRUSIVE_TSIGAS_CYCLE_QUEUE_H