Text formatting, docfix
[libcds.git] / cds / intrusive / basket_queue.h
1 //$$CDS-header$$
2
3 #ifndef CDSLIB_INTRUSIVE_BASKET_QUEUE_H
4 #define CDSLIB_INTRUSIVE_BASKET_QUEUE_H
5
6 #include <type_traits>
7 #include <cds/intrusive/details/single_link_struct.h>
8 #include <cds/details/marked_ptr.h>
9
10 namespace cds { namespace intrusive {
11
12     /// BasketQueue -related definitions
13     /** @ingroup cds_intrusive_helper
14     */
15     namespace basket_queue {
16         /// BasketQueue node
17         /**
18             Template parameters:
19             Template parameters:
20             - GC - garbage collector used
21             - Tag - a \ref cds_intrusive_hook_tag "tag"
22             */
23         template <class GC, typename Tag = opt::none>
24         struct node
25         {
26             typedef GC      gc  ;   ///< Garbage collector
27             typedef Tag     tag ;   ///< tag
28
29             typedef cds::details::marked_ptr<node, 1>                    marked_ptr;        ///< marked pointer
30             typedef typename gc::template atomic_marked_ptr< marked_ptr> atomic_marked_ptr; ///< atomic marked pointer specific for GC
31
32             /// Rebind node for other template parameters
33             template <class GC2, typename Tag2 = tag>
34             struct rebind {
35                 typedef node<GC2, Tag2>  other ;    ///< Rebinding result
36             };
37
38             atomic_marked_ptr m_pNext ; ///< pointer to the next node in the container
39
40             node()
41                 : m_pNext( nullptr )
42             {}
43         };
44
45         using cds::intrusive::single_link::default_hook;
46
47         //@cond
48         template < typename HookType, typename... Options>
49         struct hook
50         {
51             typedef typename opt::make_options< default_hook, Options...>::type  options;
52             typedef typename options::gc    gc;
53             typedef typename options::tag   tag;
54             typedef node<gc, tag> node_type;
55             typedef HookType     hook_type;
56         };
57         //@endcond
58
59
60         /// Base hook
61         /**
62             \p Options are:
63             - opt::gc - garbage collector used.
64             - opt::tag - a \ref cds_intrusive_hook_tag "tag"
65         */
66         template < typename... Options >
67         struct base_hook: public hook< opt::base_hook_tag, Options... >
68         {};
69
70         /// Member hook
71         /**
72             \p MemberOffset defines offset in bytes of \ref node member into your structure.
73             Use \p offsetof macro to define \p MemberOffset
74
75             \p Options are:
76             - opt::gc - garbage collector used.
77             - opt::tag - a \ref cds_intrusive_hook_tag "tag"
78         */
79         template < size_t MemberOffset, typename... Options >
80         struct member_hook: public hook< opt::member_hook_tag, Options... >
81         {
82             //@cond
83             static const size_t c_nMemberOffset = MemberOffset;
84             //@endcond
85         };
86
87         /// Traits hook
88         /**
89             \p NodeTraits defines type traits for node.
90             See \ref node_traits for \p NodeTraits interface description
91
92             \p Options are:
93             - opt::gc - garbage collector used.
94             - opt::tag - a \ref cds_intrusive_hook_tag "tag"
95         */
96         template <typename NodeTraits, typename... Options >
97         struct traits_hook: public hook< opt::traits_hook_tag, Options... >
98         {
99             //@cond
100             typedef NodeTraits node_traits;
101             //@endcond
102         };
103
104         /// BasketQueue internal statistics. May be used for debugging or profiling
105         /**
106             Template argument \p Counter defines type of counter.
107             Default is \p cds::atomicity::event_counter, that is weak, i.e. it is not guaranteed
108             strict event counting.
109             You may use stronger type of counter like as \p cds::atomicity::item_counter,
110             or even integral type, for example, \p int.
111         */
112         template <typename Counter = cds::atomicity::event_counter >
113         struct stat
114         {
115             typedef Counter counter_type;   ///< Counter type
116
117             counter_type m_EnqueueCount;    ///< Enqueue call count
118             counter_type m_DequeueCount;    ///< Dequeue call count
119             counter_type m_EnqueueRace;     ///< Count of enqueue race conditions encountered
120             counter_type m_DequeueRace;     ///< Count of dequeue race conditions encountered
121             counter_type m_AdvanceTailError;///< Count of "advance tail failed" events
122             counter_type m_BadTail;         ///< Count of events "Tail is not pointed to the last item in the queue"
123             counter_type m_TryAddBasket;    ///< Count of attemps adding new item to a basket (only or BasketQueue, for other queue this metric is not used)
124             counter_type m_AddBasketCount;  ///< Count of events "Enqueue a new item into basket" (only or BasketQueue, for other queue this metric is not used)
125             counter_type m_EmptyDequeue;    ///< Count of dequeue from empty queue
126
127             /// Register enqueue call
128             void onEnqueue()                { ++m_EnqueueCount; }
129             /// Register dequeue call
130             void onDequeue()                { ++m_DequeueCount; }
131             /// Register enqueue race event
132             void onEnqueueRace()            { ++m_EnqueueRace; }
133             /// Register dequeue race event
134             void onDequeueRace()            { ++m_DequeueRace; }
135             /// Register "advance tail failed" event
136             void onAdvanceTailFailed()      { ++m_AdvanceTailError; }
137             /// Register event "Tail is not pointed to last item in the queue"
138             void onBadTail()                { ++m_BadTail; }
139             /// Register an attempt t add new item to basket
140             void onTryAddBasket()           { ++m_TryAddBasket; }
141             /// Register event "Enqueue a new item into basket" (only or BasketQueue, for other queue this metric is not used)
142             void onAddBasket()              { ++m_AddBasketCount; }
143             /// Register dequeuing from empty queue
144             void onEmptyDequeue()           { ++m_EmptyDequeue; }
145
146
147             //@cond
148             void reset()
149             {
150                 m_EnqueueCount.reset();
151                 m_DequeueCount.reset();
152                 m_EnqueueRace.reset();
153                 m_DequeueRace.reset();
154                 m_AdvanceTailError.reset();
155                 m_BadTail.reset();
156                 m_TryAddBasket.reset();
157                 m_AddBasketCount.reset();
158                 m_EmptyDequeue.reset();
159             }
160
161             stat& operator +=( stat const& s )
162             {
163                 m_EnqueueCount  += s.m_EnqueueCount.get();
164                 m_DequeueCount  += s.m_DequeueCount.get();
165                 m_EnqueueRace   += s.m_EnqueueRace.get();
166                 m_DequeueRace   += s.m_DequeueRace.get();
167                 m_AdvanceTailError += s.m_AdvanceTailError.get();
168                 m_BadTail       += s.m_BadTail.get();
169                 m_TryAddBasket  += s.m_TryAddBasket.get();
170                 m_AddBasketCount += s.m_AddBasketCount.get();
171                 m_EmptyDequeue  += s.m_EmptyDequeue.get();
172                 return *this;
173             }
174             //@endcond
175         };
176
177         /// Dummy BasketQueue statistics - no counting is performed, no overhead. Support interface like \p basket_queue::stat
178         struct empty_stat
179         {
180             //@cond
181             void onEnqueue()            const {}
182             void onDequeue()            const {}
183             void onEnqueueRace()        const {}
184             void onDequeueRace()        const {}
185             void onAdvanceTailFailed()  const {}
186             void onBadTail()            const {}
187             void onTryAddBasket()       const {}
188             void onAddBasket()          const {}
189             void onEmptyDequeue()       const {}
190
191             void reset() {}
192             empty_stat& operator +=( empty_stat const& )
193             {
194                 return *this;
195             }
196             //@endcond
197         };
198
199         /// BasketQueue default type traits
200         struct traits
201         {
202             /// Back-off strategy
203             typedef cds::backoff::empty             back_off;
204
205             /// Hook, possible types are \p basket_queue::base_hook, \p basket_queue::member_hook, \p basket_queue::traits_hook
206             typedef basket_queue::base_hook<>       hook;
207
208             /// The functor used for dispose removed items. Default is \p opt::v::empty_disposer. This option is used for dequeuing
209             typedef opt::v::empty_disposer          disposer;
210
211             /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
212             typedef atomicity::empty_item_counter   item_counter;
213
214             /// Internal statistics (by default, disabled)
215             /**
216                 Possible option value are: \p basket_queue::stat, \p basket_queue::empty_stat (the default),
217                 user-provided class that supports \p %basket_queue::stat interface.
218             */
219             typedef basket_queue::empty_stat        stat;
220
221             /// C++ memory ordering model
222             /**
223                 Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
224                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
225             */
226             typedef opt::v::relaxed_ordering        memory_model;
227
228             /// Link checking, see \p cds::opt::link_checker
229             static CDS_CONSTEXPR const opt::link_check_type link_checker = opt::debug_check_link;
230
231             /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
232             enum { padding = opt::cache_line_padding };
233         };
234
235
236         /// Metafunction converting option list to \p basket_queue::traits
237         /**
238             Supported \p Options are:
239
240             - opt::hook - hook used. Possible hooks are: \p basket_queue::base_hook, \p basket_queue::member_hook, \p basket_queue::traits_hook.
241                 If the option is not specified, \p %basket_queue::base_hook<> is used.
242             - opt::back_off - back-off strategy used, default is \p cds::backoff::empty.
243             - opt::disposer - the functor used for dispose removed items. Default is \p opt::v::empty_disposer. This option is used
244                 when dequeuing.
245             - opt::link_checker - the type of node's link fields checking. Default is \p opt::debug_check_link
246             - opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
247                 To enable item counting use \p cds::atomicity::item_counter
248             - opt::stat - the type to gather internal statistics.
249                 Possible statistics types are: \p basket_queue::stat, \p basket_queue::empty_stat, user-provided class that supports \p %basket_queue::stat interface.
250                 Default is \p %basket_queue::empty_stat (internal statistics disabled).
251             - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
252             - opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
253                 or \p opt::v::sequential_consistent (sequentially consisnent memory model).
254
255             Example: declare \p %BasketQueue with item counting and internal statistics
256             \code
257             typedef cds::intrusive::BasketQueue< cds::gc::HP, Foo,
258                 typename cds::intrusive::basket_queue::make_traits<
259                     cds::intrusive::opt:hook< cds::intrusive::basket_queue::base_hook< cds::opt::gc<cds:gc::HP> >>,
260                     cds::opt::item_counte< cds::atomicity::item_counter >,
261                     cds::opt::stat< cds::intrusive::basket_queue::stat<> >
262                 >::type
263             > myQueue;
264             \endcode
265         */
266         template <typename... Options>
267         struct make_traits {
268 #   ifdef CDS_DOXYGEN_INVOKED
269             typedef implementation_defined type;   ///< Metafunction result
270 #   else
271             typedef typename cds::opt::make_options<
272                 typename cds::opt::find_type_traits< traits, Options... >::type
273                 , Options...
274             >::type type;
275 #   endif
276         };
277     }   // namespace basket_queue
278
279     /// Basket lock-free queue (intrusive variant)
280     /** @ingroup cds_intrusive_queue
281         Implementation of basket queue algorithm.
282
283         \par Source:
284             [2007] Moshe Hoffman, Ori Shalev, Nir Shavit "The Baskets Queue"
285
286         <b>Key idea</b>
287
288         In the 'basket' approach, instead of
289         the traditional ordered list of nodes, the queue consists of an ordered list of groups
290         of nodes (logical baskets). The order of nodes in each basket need not be specified, and in
291         fact, it is easiest to maintain them in FIFO order. The baskets fulfill the following basic
292         rules:
293         - Each basket has a time interval in which all its nodes' enqueue operations overlap.
294         - The baskets are ordered by the order of their respective time intervals.
295         - For each basket, its nodes' dequeue operations occur after its time interval.
296         - The dequeue operations are performed according to the order of baskets.
297
298         Two properties define the FIFO order of nodes:
299         - The order of nodes in a basket is not specified.
300         - The order of nodes in different baskets is the FIFO-order of their respective baskets.
301
302         In algorithms such as the MS-queue or optimistic
303         queue, threads enqueue items by applying a Compare-and-swap (CAS) operation to the
304         queue's tail pointer, and all the threads that fail on a particular CAS operation (and also
305         the winner of that CAS) overlap in time. In particular, they share the time interval of
306         the CAS operation itself. Hence, all the threads that fail to CAS on the tail-node of
307         the queue may be inserted into the same basket. By integrating the basket-mechanism
308         as the back-off mechanism, the time usually spent on backing-off before trying to link
309         onto the new tail, can now be utilized to insert the failed operations into the basket,
310         allowing enqueues to complete sooner. In the meantime, the next successful CAS operations
311         by enqueues allow new baskets to be formed down the list, and these can be
312         filled concurrently. Moreover, the failed operations don't retry their link attempt on the
313         new tail, lowering the overall contention on it. This leads to a queue
314         algorithm that unlike all former concurrent queue algorithms requires virtually no tuning
315         of the backoff mechanisms to reduce contention, making the algorithm an attractive
316         out-of-the-box queue.
317
318         In order to enqueue, just as in MSQueue, a thread first tries to link the new node to
319         the last node. If it failed to do so, then another thread has already succeeded. Thus it
320         tries to insert the new node into the new basket that was created by the winner thread.
321         To dequeue a node, a thread first reads the head of the queue to obtain the
322         oldest basket. It may then dequeue any node in the oldest basket.
323
324         <b>Template arguments:</b>
325         - \p GC - garbage collector type: \p gc::HP, \p gc::DHP
326         - \p T - type of value to be stored in the queue
327         - \p Traits - queue traits, default is \p basket_queue::traits. You can use \p basket_queue::make_traits
328             metafunction to make your traits or just derive your traits from \p %basket_queue::traits:
329             \code
330             struct myTraits: public cds::intrusive::basket_queue::traits {
331                 typedef cds::intrusive::basket_queue::stat<> stat;
332                 typedef cds::atomicity::item_counter    item_counter;
333             };
334             typedef cds::intrusive::BasketQueue< cds::gc::HP, Foo, myTraits > myQueue;
335
336             // Equivalent make_traits example:
337             typedef cds::intrusive::BasketQueue< cds::gc::HP, Foo,
338                 typename cds::intrusive::basket_queue::make_traits<
339                     cds::opt::stat< cds::intrusive::basket_queue::stat<> >,
340                     cds::opt::item_counter< cds::atomicity::item_counter >
341                 >::type
342             > myQueue;
343             \endcode
344
345         Garbage collecting schema \p GC must be consistent with the \p basket_queue::node GC.
346
347         \par About item disposing
348         Like \p MSQueue, the Baskets queue algo has a key feature: even if the queue is empty it contains one item that is "dummy" one from
349         the standpoint of the algo. See \p dequeue() function doc for explanation.
350
351         \par Examples
352         \code
353         #include <cds/intrusive/basket_queue.h>
354         #include <cds/gc/hp.h>
355
356         namespace ci = cds::inrtusive;
357         typedef cds::gc::HP hp_gc;
358
359         // Basket queue with Hazard Pointer garbage collector, base hook + item disposer:
360         struct Foo: public ci::basket_queue::node< hp_gc >
361         {
362             // Your data
363             ...
364         };
365
366         // Disposer for Foo struct just deletes the object passed in
367         struct fooDisposer {
368             void operator()( Foo * p )
369             {
370                 delete p;
371             }
372         };
373
374         struct fooTraits: public ci::basket_queue::traits {
375             typedef ci::basket_queue::base_hook< ci::opt::gc<hp_gc> > hook;
376             typedef fooDisposer disposer;
377         };
378         typedef ci::BasketQueue< hp_gc, Foo, fooTraits > fooQueue;
379
380         // BasketQueue with Hazard Pointer garbage collector,
381         // member hook + item disposer + item counter,
382         // without padding of internal queue data:
383         struct Bar
384         {
385             // Your data
386             ...
387             ci::basket_queue::node< hp_gc > hMember;
388         };
389
390         struct barTraits: public
391             ci::basket_queue::make_traits<
392                 ci::opt::hook<
393                     ci::basket_queue::member_hook<
394                         offsetof(Bar, hMember)
395                         ,ci::opt::gc<hp_gc>
396                     >
397                 >
398                 ,ci::opt::disposer< fooDisposer >
399                 ,cds::opt::item_counter< cds::atomicity::item_counter >
400                 ,cds::opt::padding< cds::opt::no_special_padding >
401             >::type
402         {};
403         typedef ci::BasketQueue< hp_gc, Bar, barTraits > barQueue;
404         \endcode
405     */
406     template <typename GC, typename T, typename Traits = basket_queue::traits >
407     class BasketQueue
408     {
409     public:
410         typedef GC gc;          ///< Garbage collector
411         typedef T  value_type;  ///< type of value stored in the queue
412         typedef Traits traits;  ///< Queue traits
413         typedef typename traits::hook       hook;       ///< hook type
414         typedef typename hook::node_type    node_type;  ///< node type
415         typedef typename traits::disposer   disposer;   ///< disposer used
416         typedef typename get_node_traits< value_type, node_type, hook>::type node_traits;   ///< node traits
417         typedef typename single_link::get_link_checker< node_type, traits::link_checker >::type link_checker;   ///< link checker
418
419         typedef typename traits::back_off       back_off;     ///< back-off strategy
420         typedef typename traits::item_counter   item_counter; ///< Item counting policy used
421         typedef typename traits::stat           stat;         ///< Internal statistics policy used
422         typedef typename traits::memory_model   memory_model; ///< Memory ordering. See cds::opt::memory_model option
423
424         /// Rebind template arguments
425         template <typename GC2, typename T2, typename Traits2>
426         struct rebind {
427             typedef BasketQueue< GC2, T2, Traits2> other   ;   ///< Rebinding result
428         };
429
430         static CDS_CONSTEXPR const size_t c_nHazardPtrCount = 6 ; ///< Count of hazard pointer required for the algorithm
431
432     protected:
433         //@cond
434         typedef typename node_type::marked_ptr   marked_ptr;
435         typedef typename node_type::atomic_marked_ptr atomic_marked_ptr;
436
437         // GC and node_type::gc must be the same
438         static_assert( std::is_same<gc, typename node_type::gc>::value, "GC and node_type::gc must be the same");
439         //@endcond
440
441         atomic_marked_ptr    m_pHead ;           ///< Queue's head pointer (aligned)
442         //@cond
443         typename opt::details::apply_padding< atomic_marked_ptr, traits::padding >::padding_type pad1_;
444         //@endcond
445         atomic_marked_ptr    m_pTail ;           ///< Queue's tail pointer (aligned)
446         //@cond
447         typename opt::details::apply_padding< atomic_marked_ptr, traits::padding >::padding_type pad2_;
448         //@endcond
449         node_type           m_Dummy ;           ///< dummy node
450         //@cond
451         typename opt::details::apply_padding< node_type, traits::padding >::padding_type pad3_;
452         //@endcond
453         item_counter        m_ItemCounter   ;   ///< Item counter
454         stat                m_Stat  ;           ///< Internal statistics
455         //@cond
456         size_t const        m_nMaxHops;
457         //@endcond
458
459         //@cond
460
461         struct dequeue_result {
462             typename gc::template GuardArray<3>  guards;
463             node_type * pNext;
464         };
465
466         bool do_dequeue( dequeue_result& res, bool bDeque )
467         {
468             // Note:
469             // If bDeque == false then the function is called from empty method and no real dequeuing operation is performed
470
471             back_off bkoff;
472
473             marked_ptr h;
474             marked_ptr t;
475             marked_ptr pNext;
476
477             while ( true ) {
478                 h = res.guards.protect( 0, m_pHead, []( marked_ptr p ) -> value_type * { return node_traits::to_value_ptr( p.ptr());});
479                 t = res.guards.protect( 1, m_pTail, []( marked_ptr p ) -> value_type * { return node_traits::to_value_ptr( p.ptr());});
480                 pNext = res.guards.protect( 2, h->m_pNext, []( marked_ptr p ) -> value_type * { return node_traits::to_value_ptr( p.ptr());});
481
482                 if ( h == m_pHead.load( memory_model::memory_order_acquire )) {
483                     if ( h.ptr() == t.ptr()) {
484                         if ( !pNext.ptr()) {
485                             m_Stat.onEmptyDequeue();
486                             return false;
487                         }
488
489                         {
490                             typename gc::Guard g;
491                             while ( pNext->m_pNext.load(memory_model::memory_order_relaxed).ptr() && m_pTail.load(memory_model::memory_order_relaxed) == t ) {
492                                 pNext = g.protect( pNext->m_pNext, []( marked_ptr p ) -> value_type * { return node_traits::to_value_ptr( p.ptr());});
493                                 res.guards.copy( 2, g );
494                             }
495                         }
496
497                         m_pTail.compare_exchange_weak( t, marked_ptr(pNext.ptr()), memory_model::memory_order_acquire, atomics::memory_order_relaxed );
498                     }
499                     else {
500                         marked_ptr iter( h );
501                         size_t hops = 0;
502
503                         typename gc::Guard g;
504
505                         while ( pNext.ptr() && pNext.bits() && iter.ptr() != t.ptr() && m_pHead.load(memory_model::memory_order_relaxed) == h ) {
506                             iter = pNext;
507                             g.assign( res.guards.template get<value_type>(2));
508                             pNext = res.guards.protect( 2, pNext->m_pNext, []( marked_ptr p ) -> value_type * { return node_traits::to_value_ptr( p.ptr());});
509                             ++hops;
510                         }
511
512                         if ( m_pHead.load(memory_model::memory_order_relaxed) != h )
513                             continue;
514
515                         if ( iter.ptr() == t.ptr())
516                             free_chain( h, iter );
517                         else if ( bDeque ) {
518                             res.pNext = pNext.ptr();
519
520                             if ( iter->m_pNext.compare_exchange_weak( pNext, marked_ptr( pNext.ptr(), 1 ), memory_model::memory_order_acquire, atomics::memory_order_relaxed )) {
521                                 if ( hops >= m_nMaxHops )
522                                     free_chain( h, pNext );
523                                 break;
524                             }
525                         }
526                         else
527                             return true;
528                     }
529                 }
530
531                 if ( bDeque )
532                     m_Stat.onDequeueRace();
533                 bkoff();
534             }
535
536             if ( bDeque ) {
537                 --m_ItemCounter;
538                 m_Stat.onDequeue();
539             }
540
541             return true;
542         }
543
544         void free_chain( marked_ptr head, marked_ptr newHead )
545         {
546             // "head" and "newHead" are guarded
547
548             if ( m_pHead.compare_exchange_strong( head, marked_ptr(newHead.ptr()), memory_model::memory_order_release, atomics::memory_order_relaxed ))
549             {
550                 typename gc::template GuardArray<2> guards;
551                 guards.assign( 0, node_traits::to_value_ptr(head.ptr()));
552                 while ( head.ptr() != newHead.ptr()) {
553                     marked_ptr pNext = guards.protect( 1, head->m_pNext, []( marked_ptr p ) -> value_type * { return node_traits::to_value_ptr( p.ptr());});
554                     assert( pNext.bits() != 0 );
555                     dispose_node( head.ptr());
556                     guards.copy( 0, 1 );
557                     head = pNext;
558                 }
559             }
560         }
561
562         static void clear_links( node_type * pNode )
563         {
564             pNode->m_pNext.store( marked_ptr( nullptr ), memory_model::memory_order_release );
565         }
566
567         void dispose_node( node_type * p )
568         {
569             if ( p != &m_Dummy ) {
570                 struct internal_disposer
571                 {
572                     void operator()( value_type * p )
573                     {
574                         assert( p != nullptr );
575                         BasketQueue::clear_links( node_traits::to_node_ptr( p ));
576                         disposer()(p);
577                     }
578                 };
579                 gc::template retire<internal_disposer>( node_traits::to_value_ptr(p));
580             }
581         }
582         //@endcond
583
584     public:
585         /// Initializes empty queue
586         BasketQueue()
587             : m_pHead( &m_Dummy )
588             , m_pTail( &m_Dummy )
589             , m_nMaxHops( 3 )
590         {}
591
592         /// Destructor clears the queue
593         /**
594             Since the baskets queue contains at least one item even
595             if the queue is empty, the destructor may call item disposer.
596         */
597         ~BasketQueue()
598         {
599             clear();
600
601             node_type * pHead = m_pHead.load(memory_model::memory_order_relaxed).ptr();
602             assert( pHead != nullptr );
603
604             {
605                 node_type * pNext = pHead->m_pNext.load( memory_model::memory_order_relaxed ).ptr();
606                 while ( pNext ) {
607                     node_type * p = pNext;
608                     pNext = pNext->m_pNext.load( memory_model::memory_order_relaxed ).ptr();
609                     p->m_pNext.store( marked_ptr(), memory_model::memory_order_relaxed );
610                     dispose_node( p );
611                 }
612                 pHead->m_pNext.store( marked_ptr(), memory_model::memory_order_relaxed );
613                 //m_pTail.store( marked_ptr( pHead ), memory_model::memory_order_relaxed );
614             }
615
616             m_pHead.store( marked_ptr( nullptr ), memory_model::memory_order_relaxed );
617             m_pTail.store( marked_ptr( nullptr ), memory_model::memory_order_relaxed );
618
619             dispose_node( pHead );
620         }
621
622         /// Enqueues \p val value into the queue.
623         /** @anchor cds_intrusive_BasketQueue_enqueue
624             The function always returns \p true.
625         */
626         bool enqueue( value_type& val )
627         {
628             node_type * pNew = node_traits::to_node_ptr( val );
629             link_checker::is_empty( pNew );
630
631             typename gc::Guard guard;
632             back_off bkoff;
633
634             marked_ptr t;
635             while ( true ) {
636                 t = guard.protect( m_pTail, []( marked_ptr p ) -> value_type * { return node_traits::to_value_ptr( p.ptr());});
637
638                 marked_ptr pNext = t->m_pNext.load(memory_model::memory_order_acquire );
639
640                 if ( pNext.ptr() == nullptr ) {
641                     pNew->m_pNext.store( marked_ptr(), memory_model::memory_order_release );
642                     if ( t->m_pNext.compare_exchange_weak( pNext, marked_ptr(pNew), memory_model::memory_order_release, atomics::memory_order_relaxed )) {
643                         if ( !m_pTail.compare_exchange_strong( t, marked_ptr(pNew), memory_model::memory_order_release, atomics::memory_order_relaxed ))
644                             m_Stat.onAdvanceTailFailed();
645                         break;
646                     }
647
648                     // Try adding to basket
649                     m_Stat.onTryAddBasket();
650
651                     // Reread tail next
652                     typename gc::Guard gNext;
653
654                 try_again:
655                     pNext = gNext.protect( t->m_pNext, []( marked_ptr p ) -> value_type * { return node_traits::to_value_ptr( p.ptr());});
656
657                     // add to the basket
658                     if ( m_pTail.load(memory_model::memory_order_acquire) == t
659                          && t->m_pNext.load( memory_model::memory_order_relaxed) == pNext
660                          && !pNext.bits())
661                     {
662                         bkoff();
663                         pNew->m_pNext.store( pNext, memory_model::memory_order_relaxed );
664                         if ( t->m_pNext.compare_exchange_weak( pNext, marked_ptr( pNew ), memory_model::memory_order_release, atomics::memory_order_relaxed )) {
665                             m_Stat.onAddBasket();
666                             break;
667                         }
668                         goto try_again;
669                     }
670                 }
671                 else {
672                     // Tail is misplaced, advance it
673
674                     typename gc::template GuardArray<2> g;
675                     g.assign( 0, node_traits::to_value_ptr( pNext.ptr()));
676                     if ( m_pTail.load( memory_model::memory_order_acquire ) != t
677
678                       || t->m_pNext.load( memory_model::memory_order_relaxed ) != pNext )
679
680                     {
681                         m_Stat.onEnqueueRace();
682                         bkoff();
683                         continue;
684                     }
685
686                     marked_ptr p;
687                     bool bTailOk = true;
688                     while ( (p = pNext->m_pNext.load( memory_model::memory_order_relaxed )).ptr() != nullptr )
689                     {
690                         bTailOk = m_pTail.load( memory_model::memory_order_acquire ) == t;
691                         if ( !bTailOk )
692                             break;
693
694                         g.assign( 1, node_traits::to_value_ptr( p.ptr()));
695                         if ( pNext->m_pNext.load(memory_model::memory_order_acquire) != p )
696                             continue;
697                         pNext = p;
698                         g.assign( 0, g.template get<value_type>( 1 ));
699                     }
700                     if ( !bTailOk || !m_pTail.compare_exchange_weak( t, marked_ptr( pNext.ptr()), memory_model::memory_order_release, atomics::memory_order_relaxed ))
701                         m_Stat.onAdvanceTailFailed();
702
703                     m_Stat.onBadTail();
704                 }
705
706                 m_Stat.onEnqueueRace();
707             }
708
709             ++m_ItemCounter;
710             m_Stat.onEnqueue();
711
712             return true;
713         }
714
715         /// Synonym for \p enqueue() function
716         bool push( value_type& val )
717         {
718             return enqueue( val );
719         }
720
721         /// Dequeues a value from the queue
722         /** @anchor cds_intrusive_BasketQueue_dequeue
723             If the queue is empty the function returns \p nullptr.
724
725             @note See \p MSQueue::dequeue() note about item disposing
726         */
727         value_type * dequeue()
728         {
729             dequeue_result res;
730
731             if ( do_dequeue( res, true ))
732                 return node_traits::to_value_ptr( *res.pNext );
733             return nullptr;
734         }
735
736         /// Synonym for \p dequeue() function
737         value_type * pop()
738         {
739             return dequeue();
740         }
741
742         /// Checks if the queue is empty
743         /**
744             Note that this function is not \p const.
745             The function is based on \p dequeue() algorithm
746             but really it does not dequeue any item.
747         */
748         bool empty()
749         {
750             dequeue_result res;
751             return !do_dequeue( res, false );
752         }
753
754         /// Clear the queue
755         /**
756             The function repeatedly calls \p dequeue() until it returns \p nullptr.
757             The disposer defined in template \p Traits is called for each item
758             that can be safely disposed.
759         */
760         void clear()
761         {
762             while ( dequeue());
763         }
764
765         /// Returns queue's item count
766         /**
767             The value returned depends on \p Traits (see basket_queue::traits::item_counter). For \p atomicity::empty_item_counter,
768             this function always returns 0.
769
770             @note Even if you use real item counter and it returns 0, this fact is not mean that the queue
771             is empty. To check queue emptyness use \p empty() method.
772         */
773         size_t size() const
774         {
775             return m_ItemCounter.value();
776         }
777
778         /// Returns reference to internal statistics
779         const stat& statistics() const
780         {
781             return m_Stat;
782         }
783     };
784
785 }} // namespace cds::intrusive
786
787 #endif // #ifndef CDSLIB_INTRUSIVE_BASKET_QUEUE_H