4ee0f5b869af83d1a024c204e3ed8bb782f538ab
[libcds.git] / cds / intrusive / basket_queue.h
1 //$$CDS-header$$
2
3 #ifndef __CDS_INTRUSIVE_BASKET_QUEUE_H
4 #define __CDS_INTRUSIVE_BASKET_QUEUE_H
5
6 #include <cds/intrusive/base.h>
7 #include <cds/details/marked_ptr.h>
8 #include <cds/intrusive/queue_stat.h>
9 #include <cds/intrusive/single_link_struct.h>
10 #include <cds/ref.h>
11 #include <cds/intrusive/details/dummy_node_holder.h>
12
13 #include <cds/details/std/type_traits.h>
14
15 namespace cds { namespace intrusive {
16
17     /// BasketQueue -related definitions
18     /** @ingroup cds_intrusive_helper
19     */
20     namespace basket_queue {
21         /// BasketQueue node
22         /**
23             Template parameters:
24             - GC - garbage collector used
25             - Tag - a tag used to distinguish between different implementation
26         */
27         template <class GC, typename Tag = opt::none>
28         struct node: public GC::container_node
29         {
30             typedef GC      gc  ;   ///< Garbage collector
31             typedef Tag     tag ;   ///< tag
32
33             typedef cds::details::marked_ptr<node, 1>   marked_ptr         ;   ///< marked pointer
34             typedef typename gc::template atomic_marked_ptr< marked_ptr>     atomic_marked_ptr   ;   ///< atomic marked pointer specific for GC
35
36             /// Rebind node for other template parameters
37             template <class GC2, typename Tag2 = tag>
38             struct rebind {
39                 typedef node<GC2, Tag2>  other ;    ///< Rebinding result
40             };
41
42             atomic_marked_ptr m_pNext ; ///< pointer to the next node in the container
43
44             node()
45                 : m_pNext( nullptr )
46             {}
47         };
48
49         //@cond
50         // Specialization for HRC GC
51         template <typename Tag>
52         struct node< gc::HRC, Tag>: public gc::HRC::container_node
53         {
54             typedef gc::HRC gc  ;   ///< Garbage collector
55             typedef Tag     tag ;   ///< tag
56
57             typedef cds::details::marked_ptr<node, 1>   marked_ptr         ;   ///< marked pointer
58             typedef typename gc::template atomic_marked_ptr< marked_ptr>     atomic_marked_ptr   ;   ///< atomic marked pointer specific for GC
59
60             atomic_marked_ptr m_pNext ; ///< pointer to the next node in the container
61
62             node()
63                 : m_pNext( nullptr )
64             {}
65
66         protected:
67             virtual void cleanUp( cds::gc::hrc::ThreadGC * pGC )
68             {
69                 assert( pGC != nullptr );
70                 typename gc::template GuardArray<2> aGuards( *pGC );
71
72                 while ( true ) {
73                     marked_ptr pNext = aGuards.protect( 0, m_pNext );
74                     if ( pNext.ptr() && pNext->m_bDeleted.load(CDS_ATOMIC::memory_order_acquire) ) {
75                         marked_ptr p = aGuards.protect( 1, pNext->m_pNext );
76                         m_pNext.compare_exchange_strong( pNext, p, CDS_ATOMIC::memory_order_acquire, CDS_ATOMIC::memory_order_relaxed );
77                         continue;
78                     }
79                     else {
80                         break;
81                     }
82                 }
83             }
84
85             virtual void terminate( cds::gc::hrc::ThreadGC * pGC, bool bConcurrent )
86             {
87                 if ( bConcurrent ) {
88                     marked_ptr pNext = m_pNext.load(CDS_ATOMIC::memory_order_relaxed);
89                     do {} while ( !m_pNext.compare_exchange_weak( pNext, marked_ptr(), CDS_ATOMIC::memory_order_release, CDS_ATOMIC::memory_order_relaxed ) );
90                 }
91                 else {
92                     m_pNext.store( marked_ptr(), CDS_ATOMIC::memory_order_relaxed );
93                 }
94             }
95         };
96         //@endcond
97
98         using single_link::default_hook;
99
100         //@cond
101         template < typename HookType, CDS_DECL_OPTIONS2>
102         struct hook
103         {
104             typedef typename opt::make_options< default_hook, CDS_OPTIONS2>::type  options;
105             typedef typename options::gc    gc;
106             typedef typename options::tag   tag;
107             typedef node<gc, tag> node_type;
108             typedef HookType     hook_type;
109         };
110         //@endcond
111
112
113         /// Base hook
114         /**
115             \p Options are:
116             - opt::gc - garbage collector used.
117             - opt::tag - tag
118         */
119         template < CDS_DECL_OPTIONS2 >
120         struct base_hook: public hook< opt::base_hook_tag, CDS_OPTIONS2 >
121         {};
122
123         /// Member hook
124         /**
125             \p MemberOffset defines offset in bytes of \ref node member into your structure.
126             Use \p offsetof macro to define \p MemberOffset
127
128             \p Options are:
129             - opt::gc - garbage collector used.
130             - opt::tag - tag
131         */
132         template < size_t MemberOffset, CDS_DECL_OPTIONS2 >
133         struct member_hook: public hook< opt::member_hook_tag, CDS_OPTIONS2 >
134         {
135             //@cond
136             static const size_t c_nMemberOffset = MemberOffset;
137             //@endcond
138         };
139
140         /// Traits hook
141         /**
142             \p NodeTraits defines type traits for node.
143             See \ref node_traits for \p NodeTraits interface description
144
145             \p Options are:
146             - opt::gc - garbage collector used.
147             - opt::tag - tag
148         */
149         template <typename NodeTraits, CDS_DECL_OPTIONS2 >
150         struct traits_hook: public hook< opt::traits_hook_tag, CDS_OPTIONS2 >
151         {
152             //@cond
153             typedef NodeTraits node_traits;
154             //@endcond
155         };
156
157
158 #if defined(CDS_CXX11_TEMPLATE_ALIAS_SUPPORT) && !defined(CDS_DOXYGEN_INVOKED)
159         template < typename Node, opt::link_check_type LinkType > using get_link_checker = single_link::get_link_checker< Node, LinkType >;
160 #else
161         /// Metafunction for selecting appropriate link checking policy
162         template < typename Node, opt::link_check_type LinkType >
163         struct get_link_checker: public single_link::get_link_checker< Node, LinkType >
164         {};
165
166 #endif
167
168         /// Basket queue internal statistics. May be used for debugging or profiling
169         /**
170             Basket queue statistics derives from cds::intrusive::queue_stat
171             and extends it by two additional fields specific for the algorithm.
172         */
173         template <typename Counter = cds::atomicity::event_counter >
174         struct stat: public cds::intrusive::queue_stat< Counter >
175         {
176             //@cond
177             typedef cds::intrusive::queue_stat< Counter >   base_class;
178             typedef typename base_class::counter_type       counter_type;
179             //@endcond
180
181             counter_type m_TryAddBasket      ;  ///< Count of attemps adding new item to a basket (only or BasketQueue, for other queue this metric is not used)
182             counter_type m_AddBasketCount    ;  ///< Count of events "Enqueue a new item into basket" (only or BasketQueue, for other queue this metric is not used)
183
184             /// Register an attempt t add new item to basket
185             void onTryAddBasket()           { ++m_TryAddBasket; }
186             /// Register event "Enqueue a new item into basket" (only or BasketQueue, for other queue this metric is not used)
187             void onAddBasket()              { ++m_AddBasketCount; }
188
189             //@cond
190             void reset()
191             {
192                 base_class::reset();
193                 m_TryAddBasket.reset();
194                 m_AddBasketCount.reset();
195             }
196
197             stat& operator +=( stat const& s )
198             {
199                 base_class::operator +=( s );
200                 m_TryAddBasket += s.m_TryAddBasket.get();
201                 m_AddBasketCount += s.m_AddBasketCount.get();
202                 return *this;
203             }
204             //@endcond
205         };
206
207         /// Dummy basket queue statistics - no counting is performed. Support interface like \ref stat
208         struct dummy_stat: public cds::intrusive::queue_dummy_stat
209         {
210             //@cond
211             void onTryAddBasket()           {}
212             void onAddBasket()              {}
213
214             void reset() {}
215             dummy_stat& operator +=( dummy_stat const& )
216             {
217                 return *this;
218             }
219             //@endcond
220         };
221
222     }   // namespace basket_queue
223
224     /// Basket lock-free queue (intrusive variant)
225     /** @ingroup cds_intrusive_queue
226         Implementation of basket queue algorithm.
227
228         \par Source:
229             [2007] Moshe Hoffman, Ori Shalev, Nir Shavit "The Baskets Queue"
230
231         <b>Key idea</b>
232
233         In the \93basket\94 approach, instead of
234         the traditional ordered list of nodes, the queue consists of an ordered list of groups
235         of nodes (logical baskets). The order of nodes in each basket need not be specified, and in
236         fact, it is easiest to maintain them in FIFO order. The baskets fulfill the following basic
237         rules:
238         - Each basket has a time interval in which all its nodes\92 enqueue operations overlap.
239         - The baskets are ordered by the order of their respective time intervals.
240         - For each basket, its nodes\92 dequeue operations occur after its time interval.
241         - The dequeue operations are performed according to the order of baskets.
242
243         Two properties define the FIFO order of nodes:
244         - The order of nodes in a basket is not specified.
245         - The order of nodes in different baskets is the FIFO-order of their respective baskets.
246
247         In algorithms such as the MS-queue or optimistic
248         queue, threads enqueue items by applying a Compare-and-swap (CAS) operation to the
249         queue\92s tail pointer, and all the threads that fail on a particular CAS operation (and also
250         the winner of that CAS) overlap in time. In particular, they share the time interval of
251         the CAS operation itself. Hence, all the threads that fail to CAS on the tail-node of
252         the queue may be inserted into the same basket. By integrating the basket-mechanism
253         as the back-off mechanism, the time usually spent on backing-off before trying to link
254         onto the new tail, can now be utilized to insert the failed operations into the basket,
255         allowing enqueues to complete sooner. In the meantime, the next successful CAS operations
256         by enqueues allow new baskets to be formed down the list, and these can be
257         filled concurrently. Moreover, the failed operations don\92t retry their link attempt on the
258         new tail, lowering the overall contention on it. This leads to a queue
259         algorithm that unlike all former concurrent queue algorithms requires virtually no tuning
260         of the backoff mechanisms to reduce contention, making the algorithm an attractive
261         out-of-the-box queue.
262
263         In order to enqueue, just as in MSQueue, a thread first tries to link the new node to
264         the last node. If it failed to do so, then another thread has already succeeded. Thus it
265         tries to insert the new node into the new basket that was created by the winner thread.
266         To dequeue a node, a thread first reads the head of the queue to obtain the
267         oldest basket. It may then dequeue any node in the oldest basket.
268
269         <b>Template arguments:</b>
270         - \p GC - garbage collector type: gc::HP, gc::HRC, gc::PTB
271         - \p T - type to be stored in the queue, should be convertible to \ref single_link::node
272         - \p Options - options
273
274         <b>Type of node</b>: \ref single_link::node
275
276         \p Options are:
277         - opt::hook - hook used. Possible values are: basket_queue::base_hook, basket_queue::member_hook, basket_queue::traits_hook.
278             If the option is not specified, <tt>basket_queue::base_hook<></tt> is used.
279             For Gidenstam's gc::HRC, only basket_queue::base_hook is supported.
280         - opt::back_off - back-off strategy used. If the option is not specified, the cds::backoff::empty is used.
281         - opt::disposer - the functor used for dispose removed items. Default is opt::v::empty_disposer. This option is used
282             in \ref dequeue function.
283         - opt::link_checker - the type of node's link fields checking. Default is \ref opt::debug_check_link
284             Note: for gc::HRC garbage collector, link checking policy is always selected as \ref opt::always_check_link.
285         - opt::item_counter - the type of item counting feature. Default is \ref atomicity::empty_item_counter (no item counting feature)
286         - opt::stat - the type to gather internal statistics.
287             Possible option value are: \ref basket_queue::stat, \ref basket_queue::dummy_stat,
288             user-provided class that supports basket_queue::stat interface.
289             Default is \ref basket_queue::dummy_stat.
290             Generic option intrusive::queue_stat and intrusive::queue_dummy_stat are acceptable too, however,
291             they will be automatically converted to basket_queue::stat and basket_queue::dummy_stat
292             respectively.
293         - opt::alignment - the alignment for internal queue data. Default is opt::cache_line_alignment
294         - opt::memory_model - C++ memory ordering model. Can be opt::v::relaxed_ordering (relaxed memory model, the default)
295             or opt::v::sequential_consistent (sequentially consisnent memory model).
296
297         Garbage collecting schema \p GC must be consistent with the basket_queue::node GC.
298
299         \par About item disposing
300         Like MSQueue, the Baskets queue algo has a key feature: even if the queue is empty it contains one item that is "dummy" one from
301         the standpoint of the algo. See \ref dequeue function doc for explanation.
302
303         \par Examples
304         \code
305         #include <cds/intrusive/basket_queue.h>
306         #include <cds/gc/hp.h>
307
308         namespace ci = cds::inrtusive;
309         typedef cds::gc::HP hp_gc;
310
311         // Basket queue with Hazard Pointer garbage collector, base hook + item disposer:
312         struct Foo: public ci::basket_queue::node< hp_gc >
313         {
314             // Your data
315             ...
316         };
317
318         // Disposer for Foo struct just deletes the object passed in
319         struct fooDisposer {
320             void operator()( Foo * p )
321             {
322                 delete p;
323             }
324         };
325
326         typedef ci::BasketQueue< hp_gc,
327             Foo
328             ,ci::opt::hook<
329                 ci::basket_queue::base_hook< ci::opt::gc<hp_gc> >
330             >
331             ,ci::opt::disposer< fooDisposer >
332         > fooQueue;
333
334         // BasketQueue with Hazard Pointer garbage collector,
335         // member hook + item disposer + item counter,
336         // without alignment of internal queue data:
337         struct Bar
338         {
339             // Your data
340             ...
341             ci::basket_queue::node< hp_gc > hMember;
342         };
343
344         typedef ci::BasketQueue< hp_gc,
345             Foo
346             ,ci::opt::hook<
347                 ci::basket_queue::member_hook<
348                     offsetof(Bar, hMember)
349                     ,ci::opt::gc<hp_gc>
350                 >
351             >
352             ,ci::opt::disposer< fooDisposer >
353             ,cds::opt::item_counter< cds::atomicity::item_counter >
354             ,cds::opt::alignment< cds::opt::no_special_alignment >
355         > barQueue;
356         \endcode
357     */
358     template <typename GC, typename T, CDS_DECL_OPTIONS9>
359     class BasketQueue
360     {
361         //@cond
362         struct default_options
363         {
364             typedef cds::backoff::empty             back_off;
365             typedef basket_queue::base_hook<>       hook;
366             typedef opt::v::empty_disposer          disposer;
367             typedef atomicity::empty_item_counter   item_counter;
368             typedef basket_queue::dummy_stat        stat;
369             typedef opt::v::relaxed_ordering        memory_model;
370             static const opt::link_check_type link_checker = opt::debug_check_link;
371             enum { alignment = opt::cache_line_alignment };
372         };
373         //@endcond
374
375     public:
376         //@cond
377         typedef typename opt::make_options<
378             typename cds::opt::find_type_traits< default_options, CDS_OPTIONS9 >::type
379             ,CDS_OPTIONS9
380         >::type   options;
381
382         typedef typename std::conditional<
383             std::is_same<typename options::stat, cds::intrusive::queue_stat<> >::value
384             ,basket_queue::stat<>
385             ,typename std::conditional<
386                 std::is_same<typename options::stat, cds::intrusive::queue_dummy_stat>::value
387                 ,basket_queue::dummy_stat
388                 ,typename options::stat
389             >::type
390         >::type stat_type_;
391
392         //@endcond
393
394     public:
395         typedef T  value_type   ;   ///< type of value stored in the queue
396         typedef typename options::hook      hook        ;   ///< hook type
397         typedef typename hook::node_type    node_type   ;   ///< node type
398         typedef typename options::disposer  disposer    ;   ///< disposer used
399         typedef typename get_node_traits< value_type, node_type, hook>::type node_traits ;    ///< node traits
400         typedef typename basket_queue::get_link_checker< node_type, options::link_checker >::type link_checker   ;   ///< link checker
401
402         typedef GC gc          ;   ///< Garbage collector
403         typedef typename options::back_off  back_off    ;   ///< back-off strategy
404         typedef typename options::item_counter item_counter ;   ///< Item counting policy used
405 #ifdef CDS_DOXYGEN_INVOKED
406         typedef typename options::stat      stat        ;   ///< Internal statistics policy used
407 #else
408         typedef stat_type_  stat;
409 #endif
410         typedef typename options::memory_model  memory_model ;   ///< Memory ordering. See cds::opt::memory_model option
411
412         /// Rebind template arguments
413         template <typename GC2, typename T2, CDS_DECL_OTHER_OPTIONS9>
414         struct rebind {
415             typedef BasketQueue< GC2, T2, CDS_OTHER_OPTIONS9> other   ;   ///< Rebinding result
416         };
417
418         static const size_t m_nHazardPtrCount = 6 ; ///< Count of hazard pointer required for the algorithm
419
420     protected:
421         //@cond
422
423         struct internal_disposer
424         {
425             void operator()( value_type * p )
426             {
427                 assert( p != nullptr );
428
429                 BasketQueue::clear_links( node_traits::to_node_ptr(p) );
430                 disposer()( p );
431             }
432         };
433
434         typedef typename node_type::marked_ptr   marked_ptr;
435         typedef typename node_type::atomic_marked_ptr atomic_marked_ptr;
436
437         typedef intrusive::node_to_value<BasketQueue> node_to_value;
438         typedef typename opt::details::alignment_setter< atomic_marked_ptr, options::alignment >::type aligned_node_ptr;
439         typedef typename opt::details::alignment_setter<
440             cds::intrusive::details::dummy_node< gc, node_type>,
441             options::alignment
442         >::type    dummy_node_type;
443
444         //@endcond
445
446         aligned_node_ptr    m_pHead ;           ///< Queue's head pointer (aligned)
447         aligned_node_ptr    m_pTail ;           ///< Queue's tail pointer (aligned)
448
449         dummy_node_type     m_Dummy ;           ///< dummy node
450         item_counter        m_ItemCounter   ;   ///< Item counter
451         stat                m_Stat  ;           ///< Internal statistics
452         //@cond
453         size_t const        m_nMaxHops;
454         //@endcond
455
456         //@cond
457
458         template <size_t Count>
459         static marked_ptr guard_node( typename gc::template GuardArray<Count>& g, size_t idx, atomic_marked_ptr const& p )
460         {
461             marked_ptr pg;
462             while ( true ) {
463                 pg = p.load( memory_model::memory_order_relaxed );
464                 g.assign( idx, node_traits::to_value_ptr( pg.ptr() ) );
465                 if ( p.load( memory_model::memory_order_acquire) == pg ) {
466                     return pg;
467                 }
468             }
469         }
470
471         static marked_ptr guard_node( typename gc::Guard& g, atomic_marked_ptr const& p )
472         {
473             marked_ptr pg;
474             while ( true ) {
475                 pg = p.load( memory_model::memory_order_relaxed );
476                 g.assign( node_traits::to_value_ptr( pg.ptr() ) );
477                 if ( p.load( memory_model::memory_order_acquire) == pg ) {
478                     return pg;
479                 }
480             }
481         }
482
483         struct dequeue_result {
484             typename gc::template GuardArray<3>  guards;
485             node_type * pNext;
486         };
487
488         bool do_dequeue( dequeue_result& res, bool bDeque )
489         {
490             // Note:
491             // If bDeque == false then the function is called from empty method and no real dequeuing operation is performed
492
493             back_off bkoff;
494
495             marked_ptr h;
496             marked_ptr t;
497             marked_ptr pNext;
498
499             while ( true ) {
500                 h = guard_node( res.guards, 0, m_pHead );
501                 t = guard_node( res.guards, 1, m_pTail );
502                 pNext = guard_node( res.guards, 2, h->m_pNext );
503
504                 if ( h == m_pHead.load( memory_model::memory_order_acquire ) ) {
505                     if ( h.ptr() == t.ptr() ) {
506                         if ( !pNext.ptr() )
507                             return false;
508
509                         {
510                             typename gc::Guard g;
511                             while ( pNext->m_pNext.load(memory_model::memory_order_relaxed).ptr() && m_pTail.load(memory_model::memory_order_relaxed) == t ) {
512                                 pNext = guard_node( g, pNext->m_pNext );
513                                 res.guards.assign( 2, g.template get<value_type>() );
514                             }
515                         }
516
517                         m_pTail.compare_exchange_weak( t, marked_ptr(pNext.ptr()), memory_model::memory_order_release, memory_model::memory_order_relaxed );
518                     }
519                     else {
520                         marked_ptr iter( h );
521                         size_t hops = 0;
522
523                         typename gc::Guard g;
524
525                         while ( pNext.ptr() && pNext.bits() && iter.ptr() != t.ptr() && m_pHead.load(memory_model::memory_order_relaxed) == h ) {
526                             iter = pNext;
527                             g.assign( res.guards.template get<value_type>(2) );
528                             pNext = guard_node( res.guards, 2, pNext->m_pNext );
529                             ++hops;
530                         }
531
532                         if ( m_pHead.load(memory_model::memory_order_relaxed) != h )
533                             continue;
534
535                         if ( iter.ptr() == t.ptr() )
536                             free_chain( h, iter );
537                         else if ( bDeque ) {
538                             res.pNext = pNext.ptr();
539
540                             if ( iter->m_pNext.compare_exchange_weak( pNext, marked_ptr( pNext.ptr(), 1 ), memory_model::memory_order_release, memory_model::memory_order_relaxed ) ) {
541                                 if ( hops >= m_nMaxHops )
542                                     free_chain( h, pNext );
543                                 break;
544                             }
545                         }
546                         else
547                             return true;
548                     }
549                 }
550
551                 if ( bDeque )
552                     m_Stat.onDequeueRace();
553                 bkoff();
554             }
555
556             if ( bDeque ) {
557                 --m_ItemCounter;
558                 m_Stat.onDequeue();
559             }
560
561             return true;
562         }
563
564         void free_chain( marked_ptr head, marked_ptr newHead )
565         {
566             // "head" and "newHead" are guarded
567
568             if ( m_pHead.compare_exchange_strong( head, marked_ptr(newHead.ptr()), memory_model::memory_order_release, memory_model::memory_order_relaxed ))
569             {
570                 typename gc::template GuardArray<2> guards;
571                 guards.assign( 0, node_traits::to_value_ptr(head.ptr()) );
572                 while ( head.ptr() != newHead.ptr() ) {
573                     marked_ptr pNext = guard_node( guards, 1, head->m_pNext );
574                     assert( pNext.bits() != 0 );
575                     dispose_node( head.ptr() );
576                     guards.assign( 0, guards.template get<value_type>(1) );
577                     head = pNext;
578                 }
579             }
580         }
581
582         static void clear_links( node_type * pNode )
583         {
584             pNode->m_pNext.store( marked_ptr( nullptr ), memory_model::memory_order_release );
585         }
586
587         void dispose_node( node_type * p )
588         {
589             if ( p != m_Dummy.get() ) {
590                 gc::template retire<internal_disposer>( node_traits::to_value_ptr(p) );
591             }
592             else
593                 m_Dummy.retire();
594         }
595         //@endcond
596
597     public:
598         /// Initializes empty queue
599         BasketQueue()
600             : m_pHead( nullptr )
601             , m_pTail( nullptr )
602             , m_nMaxHops( 3 )
603         {
604             // GC and node_type::gc must be the same
605             static_assert(( std::is_same<gc, typename node_type::gc>::value ), "GC and node_type::gc must be the same");
606
607             // For cds::gc::HRC, only one base_hook is allowed
608             static_assert((
609                 std::conditional<
610                     std::is_same<gc, cds::gc::HRC>::value,
611                     std::is_same< typename hook::hook_type, opt::base_hook_tag >,
612                     boost::true_type
613                 >::type::value
614             ), "For cds::gc::HRC, only base_hook is allowed");
615
616             // Head/tail initialization should be made via store call
617             // because of gc::HRC manages reference counting
618             m_pHead.store( marked_ptr(m_Dummy.get()), memory_model::memory_order_relaxed );
619             m_pTail.store( marked_ptr(m_Dummy.get()), memory_model::memory_order_relaxed );
620         }
621
622         /// Destructor clears the queue
623         /**
624             Since the baskets queue contains at least one item even
625             if the queue is empty, the destructor may call item disposer.
626         */
627         ~BasketQueue()
628         {
629             clear();
630
631             node_type * pHead = m_pHead.load(memory_model::memory_order_relaxed).ptr();
632             assert( pHead != nullptr );
633
634             {
635                 node_type * pNext = pHead->m_pNext.load( memory_model::memory_order_relaxed ).ptr();
636                 while ( pNext ) {
637                     node_type * p = pNext;
638                     pNext = pNext->m_pNext.load( memory_model::memory_order_relaxed ).ptr();
639                     p->m_pNext.store( marked_ptr(), memory_model::memory_order_relaxed );
640                     dispose_node( p );
641                 }
642                 pHead->m_pNext.store( marked_ptr(), memory_model::memory_order_relaxed );
643                 //m_pTail.store( marked_ptr( pHead ), memory_model::memory_order_relaxed );
644             }
645
646             m_pHead.store( marked_ptr( nullptr ), memory_model::memory_order_relaxed );
647             m_pTail.store( marked_ptr( nullptr ), memory_model::memory_order_relaxed );
648
649             dispose_node( pHead );
650         }
651
652         /// Returns queue's item count
653         /**
654             The value returned depends on opt::item_counter option. For atomicity::empty_item_counter,
655             this function always returns 0.
656
657             <b>Warning</b>: even if you use real item counter and it returns 0, this fact is not mean that the queue
658             is empty. To check queue emptyness use \ref empty() method.
659         */
660         size_t size() const
661         {
662             return m_ItemCounter.value();
663         }
664
665         /// Returns reference to internal statistics
666         const stat& statistics() const
667         {
668             return m_Stat;
669         }
670
671         /// Enqueues \p val value into the queue.
672         /** @anchor cds_intrusive_BasketQueue_enqueue
673             The function always returns \p true.
674         */
675         bool enqueue( value_type& val )
676         {
677             node_type * pNew = node_traits::to_node_ptr( val );
678             link_checker::is_empty( pNew );
679
680             typename gc::Guard guard;
681             back_off bkoff;
682
683             marked_ptr t;
684             while ( true ) {
685                 t = guard_node( guard, m_pTail );
686
687                 marked_ptr pNext = t->m_pNext.load(memory_model::memory_order_acquire );
688
689                 if ( pNext.ptr() == nullptr ) {
690                     pNew->m_pNext.store( marked_ptr(), memory_model::memory_order_release );
691                     if ( t->m_pNext.compare_exchange_weak( pNext, marked_ptr(pNew), memory_model::memory_order_release, memory_model::memory_order_relaxed ) ) {
692                         if ( !m_pTail.compare_exchange_strong( t, marked_ptr(pNew), memory_model::memory_order_release, memory_model::memory_order_relaxed ))
693                             m_Stat.onAdvanceTailFailed();
694                         break;
695                     }
696
697                     // Try adding to basket
698                     m_Stat.onTryAddBasket();
699
700                     // Reread tail next
701                     typename gc::Guard gNext;
702
703                 try_again:
704                     pNext = guard_node( gNext, t->m_pNext );
705
706                     // add to the basket
707                     if ( m_pTail.load(memory_model::memory_order_relaxed) == t
708                          && t->m_pNext.load( memory_model::memory_order_relaxed) == pNext
709                          && !pNext.bits()  )
710                     {
711                         bkoff();
712                         pNew->m_pNext.store( pNext, memory_model::memory_order_release );
713                         if ( t->m_pNext.compare_exchange_weak( pNext, marked_ptr( pNew ), memory_model::memory_order_release, memory_model::memory_order_relaxed )) {
714                             m_Stat.onAddBasket();
715                             break;
716                         }
717                         goto try_again;
718                     }
719                 }
720                 else {
721                     // Tail is misplaced, advance it
722
723                     typename gc::template GuardArray<2> g;
724                     g.assign( 0, node_traits::to_value_ptr( pNext.ptr() ) );
725                     if ( t->m_pNext.load( memory_model::memory_order_relaxed ) != pNext ) {
726                         m_Stat.onEnqueueRace();
727                         bkoff();
728                         continue;
729                     }
730
731                     marked_ptr p;
732                     bool bTailOk = true;
733                     while ( (p = pNext->m_pNext.load( memory_model::memory_order_relaxed )).ptr() != nullptr )
734                     {
735                         bTailOk = m_pTail.load( memory_model::memory_order_relaxed ) == t;
736                         if ( !bTailOk )
737                             break;
738
739                         g.assign( 1, node_traits::to_value_ptr( p.ptr() ));
740                         if ( pNext->m_pNext.load(memory_model::memory_order_relaxed) != p )
741                             continue;
742                         pNext = p;
743                         g.assign( 0, g.template get<value_type>( 1 ) );
744                     }
745                     if ( !bTailOk || !m_pTail.compare_exchange_weak( t, marked_ptr( pNext.ptr() ), memory_model::memory_order_release, memory_model::memory_order_relaxed ))
746                         m_Stat.onAdvanceTailFailed();
747
748                     m_Stat.onBadTail();
749                 }
750
751                 m_Stat.onEnqueueRace();
752             }
753
754             ++m_ItemCounter;
755             m_Stat.onEnqueue();
756
757             return true;
758         }
759
760         /// Dequeues a value from the queue
761         /** @anchor cds_intrusive_BasketQueue_dequeue
762             If the queue is empty the function returns \p NULL.
763
764             <b>Warning</b>: see MSQueue::deque note about item disposing
765         */
766         value_type * dequeue()
767         {
768             dequeue_result res;
769
770             if ( do_dequeue( res, true ))
771                 return node_traits::to_value_ptr( *res.pNext );
772             return nullptr;
773         }
774
775         /// Synonym for \ref cds_intrusive_BasketQueue_enqueue "enqueue" function
776         bool push( value_type& val )
777         {
778             return enqueue( val );
779         }
780
781         /// Synonym for \ref cds_intrusive_BasketQueue_dequeue "dequeue" function
782         value_type * pop()
783         {
784             return dequeue();
785         }
786
787         /// Checks if the queue is empty
788         /**
789             Note that this function is not \p const.
790             The function is based on \ref dequeue algorithm
791             but really does not dequeued any item.
792         */
793         bool empty()
794         {
795             dequeue_result res;
796             return !do_dequeue( res, false );
797         }
798
799         /// Clear the queue
800         /**
801             The function repeatedly calls \ref dequeue until it returns \p NULL.
802             The disposer defined in template \p Options is called for each item
803             that can be safely disposed.
804         */
805         void clear()
806         {
807             while ( dequeue() );
808         }
809     };
810
811 }} // namespace cds::intrusive
812
813 #endif // #ifndef __CDS_INTRUSIVE_BASKET_QUEUE_H