X-Git-Url: http://demsky.eecs.uci.edu/git/?a=blobdiff_plain;f=cds%2Fintrusive%2Ffcqueue.h;h=5ec67cd66f6313f95063b0ace6e3bc54e6355b68;hb=4862d76f4493b907bea9820b215209a26cf851f4;hp=7cd811365846814bc9a46b2c2dd721120eadbad8;hpb=e5a7366d7fd32c9d0d540e094a7810120c31856c;p=libcds.git diff --git a/cds/intrusive/fcqueue.h b/cds/intrusive/fcqueue.h index 7cd81136..5ec67cd6 100644 --- a/cds/intrusive/fcqueue.h +++ b/cds/intrusive/fcqueue.h @@ -1,7 +1,35 @@ -//$$CDS-header$$ +/* + This file is a part of libcds - Concurrent Data Structures library -#ifndef __CDS_INTRUSIVE_FCQUEUE_H -#define __CDS_INTRUSIVE_FCQUEUE_H + (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017 + + Source code repo: http://github.com/khizmax/libcds/ + Download: http://sourceforge.net/projects/libcds/files/ + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + +#ifndef CDSLIB_INTRUSIVE_FCQUEUE_H +#define CDSLIB_INTRUSIVE_FCQUEUE_H #include #include @@ -10,10 +38,10 @@ namespace cds { namespace intrusive { - /// FCQueue related definitions + /// \p FCQueue related definitions namespace fcqueue { - /// FCQueue internal statistics + /// \p FCQueue internal statistics template struct stat: public cds::algo::flat_combining::stat { @@ -42,7 +70,7 @@ namespace cds { namespace intrusive { //@endcond }; - /// FCQueue type traits + /// \p FCQueue type traits struct traits: public cds::algo::flat_combining::traits { typedef cds::intrusive::opt::v::empty_disposer disposer ; ///< Disposer to erase removed elements. Used only in \p FCQueue::clear() function @@ -53,17 +81,12 @@ namespace cds { namespace intrusive { /// Metafunction converting option list to traits /** \p Options are: - - \p opt::lock_type - mutex type, default is \p cds::sync::spin - - \p opt::back_off - back-off strategy, defalt is \p cds::backoff::Default - - \p opt::disposer - the functor used for dispose removed items. Default is \p opt::intrusive::v::empty_disposer. + - any \p cds::algo::flat_combining::make_traits options + - \p opt::disposer - the functor used to dispose removed items. Default is \p opt::intrusive::v::empty_disposer. This option is used only in \p FCQueue::clear() function. - - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR - \p opt::stat - internal statistics, possible type: \p fcqueue::stat, \p fcqueue::empty_stat (the default) - - \p opt::memory_model - C++ memory ordering model. - List of all available memory ordering see \p opt::memory_model. - Default is \p cds::opt::v:relaxed_ordering - \p opt::enable_elimination - enable/disable operation \ref cds_elimination_description "elimination" - By default, the elimination is disabled. + By default, the elimination is disabled (\p false) */ template struct make_traits { @@ -133,8 +156,8 @@ namespace cds { namespace intrusive { protected: //@cond - fc_kernel m_FlatCombining; - container_type m_Queue; + mutable fc_kernel m_FlatCombining; + container_type m_Queue; //@endcond public: @@ -156,7 +179,7 @@ namespace cds { namespace intrusive { */ bool enqueue( value_type& val ) { - fc_record * pRec = m_FlatCombining.acquire_record(); + auto pRec = m_FlatCombining.acquire_record(); pRec->pVal = &val; if ( c_bEliminationEnabled ) @@ -164,7 +187,7 @@ namespace cds { namespace intrusive { else m_FlatCombining.combine( op_enq, pRec, *this ); - assert( pRec->is_done() ); + assert( pRec->is_done()); m_FlatCombining.release_record( pRec ); m_FlatCombining.internal_statistics().onEnqueue(); return true; @@ -182,7 +205,7 @@ namespace cds { namespace intrusive { */ value_type * dequeue() { - fc_record * pRec = m_FlatCombining.acquire_record(); + auto pRec = m_FlatCombining.acquire_record(); pRec->pVal = nullptr; if ( c_bEliminationEnabled ) @@ -190,7 +213,7 @@ namespace cds { namespace intrusive { else m_FlatCombining.combine( op_deq, pRec, *this ); - assert( pRec->is_done() ); + assert( pRec->is_done()); m_FlatCombining.release_record( pRec ); m_FlatCombining.internal_statistics().onDequeue( pRec->bEmpty ); @@ -210,14 +233,14 @@ namespace cds { namespace intrusive { */ void clear( bool bDispose = false ) { - fc_record * pRec = m_FlatCombining.acquire_record(); + auto pRec = m_FlatCombining.acquire_record(); if ( c_bEliminationEnabled ) m_FlatCombining.batch_combine( bDispose ? op_clear_and_dispose : op_clear, pRec, *this ); else m_FlatCombining.combine( bDispose ? op_clear_and_dispose : op_clear, pRec, *this ); - assert( pRec->is_done() ); + assert( pRec->is_done()); m_FlatCombining.release_record( pRec ); } @@ -238,8 +261,10 @@ namespace cds { namespace intrusive { */ bool empty() const { - m_FlatCombining.wait_while_combining(); - return m_Queue.empty(); + bool bRet = false; + auto const& queue = m_Queue; + m_FlatCombining.invoke_exclusive([&queue, &bRet]() { bRet = queue.empty(); }); + return bRet; } /// Internal statistics @@ -260,10 +285,14 @@ namespace cds { namespace intrusive { { assert( pRec ); - switch ( pRec->op() ) { + // this function is called under FC mutex, so switch TSan off + // All TSan warnings are false positive + //CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN; + + switch ( pRec->op()) { case op_enq: assert( pRec->pVal ); - m_Queue.push_back( *(pRec->pVal ) ); + m_Queue.push_back( *(pRec->pVal )); break; case op_deq: pRec->bEmpty = m_Queue.empty(); @@ -276,23 +305,28 @@ namespace cds { namespace intrusive { m_Queue.clear(); break; case op_clear_and_dispose: - m_Queue.clear_and_dispose( disposer() ); + m_Queue.clear_and_dispose( disposer()); break; default: assert(false); break; } + //CDS_TSAN_ANNOTATE_IGNORE_RW_END; } /// Batch-processing flat combining void fc_process( typename fc_kernel::iterator itBegin, typename fc_kernel::iterator itEnd ) { + // this function is called under FC mutex, so switch TSan off + // All TSan warnings are false positive + //CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN; + typedef typename fc_kernel::iterator fc_iterator; for ( fc_iterator it = itBegin, itPrev = itEnd; it != itEnd; ++it ) { - switch ( it->op() ) { + switch ( it->op( atomics::memory_order_acquire )) { case op_enq: case op_deq: - if ( m_Queue.empty() ) { + if ( m_Queue.empty()) { if ( itPrev != itEnd && collide( *itPrev, *it )) itPrev = itEnd; else @@ -301,6 +335,7 @@ namespace cds { namespace intrusive { break; } } + //CDS_TSAN_ANNOTATE_IGNORE_RW_END; } //@endcond @@ -308,9 +343,9 @@ namespace cds { namespace intrusive { //@cond bool collide( fc_record& rec1, fc_record& rec2 ) { - assert( m_Queue.empty() ); + assert( m_Queue.empty()); - switch ( rec1.op() ) { + switch ( rec1.op()) { case op_enq: if ( rec2.op() == op_deq ) { assert(rec1.pVal); @@ -341,4 +376,4 @@ namespace cds { namespace intrusive { }} // namespace cds::intrusive -#endif // #ifndef __CDS_INTRUSIVE_FCQUEUE_H +#endif // #ifndef CDSLIB_INTRUSIVE_FCQUEUE_H