X-Git-Url: http://demsky.eecs.uci.edu/git/?a=blobdiff_plain;f=gdax-orderbook-hpp%2Fdemo%2Fdependencies%2Flibcds-2.3.2%2Fcds%2Fintrusive%2Fmspriority_queue.h;fp=gdax-orderbook-hpp%2Fdemo%2Fdependencies%2Flibcds-2.3.2%2Fcds%2Fintrusive%2Fmspriority_queue.h;h=a41cd9893650fe9491810570de6b104fb84e1bab;hb=4223430d9168223029c7639149025c79e69b4f37;hp=0000000000000000000000000000000000000000;hpb=7ea7751a31c0388bf888052517be181a2989b113;p=c11concurrency-benchmarks.git diff --git a/gdax-orderbook-hpp/demo/dependencies/libcds-2.3.2/cds/intrusive/mspriority_queue.h b/gdax-orderbook-hpp/demo/dependencies/libcds-2.3.2/cds/intrusive/mspriority_queue.h new file mode 100644 index 0000000..a41cd98 --- /dev/null +++ b/gdax-orderbook-hpp/demo/dependencies/libcds-2.3.2/cds/intrusive/mspriority_queue.h @@ -0,0 +1,543 @@ +/* + This file is a part of libcds - Concurrent Data Structures library + + (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017 + + Source code repo: http://github.com/khizmax/libcds/ + Download: http://sourceforge.net/projects/libcds/files/ + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + +#ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H +#define CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H + +#include // std::unique_lock +#include +#include +#include +#include +#include +#include +#include +#include + +namespace cds { namespace intrusive { + + /// MSPriorityQueue related definitions + /** @ingroup cds_intrusive_helper + */ + namespace mspriority_queue { + + /// MSPriorityQueue statistics + template + struct stat { + typedef Counter event_counter ; ///< Event counter type + + event_counter m_nPushCount; ///< Count of success push operation + event_counter m_nPopCount; ///< Count of success pop operation + event_counter m_nPushFailCount; ///< Count of failed ("the queue is full") push operation + event_counter m_nPopFailCount; ///< Count of failed ("the queue is empty") pop operation + event_counter m_nPushHeapifySwapCount; ///< Count of item swapping when heapifying in push + event_counter m_nPopHeapifySwapCount; ///< Count of item swapping when heapifying in pop + event_counter m_nItemMovedTop; ///< Count of events when \p push() encountered that inserted item was moved to top by a concurrent \p pop() + event_counter m_nItemMovedUp; ///< Count of events when \p push() encountered that inserted item was moved upwards by a concurrent \p pop() + event_counter m_nPushEmptyPass; ///< Count of empty pass during heapify via concurrent operations + + //@cond + void onPushSuccess() { ++m_nPushCount ;} + void onPopSuccess() { ++m_nPopCount ;} + void onPushFailed() { ++m_nPushFailCount ;} + void onPopFailed() { ++m_nPopFailCount ;} + void onPushHeapifySwap() { ++m_nPushHeapifySwapCount ;} + void onPopHeapifySwap() { ++m_nPopHeapifySwapCount ;} + + void onItemMovedTop() { ++m_nItemMovedTop ;} + void onItemMovedUp() { ++m_nItemMovedUp ;} + void onPushEmptyPass() { ++m_nPushEmptyPass ;} + //@endcond + }; + + /// MSPriorityQueue empty statistics + struct empty_stat { + //@cond + void onPushSuccess() const {} + void onPopSuccess() const {} + void onPushFailed() const {} + void onPopFailed() const {} + void onPushHeapifySwap() const {} + void onPopHeapifySwap() const {} + + void onItemMovedTop() const {} + void onItemMovedUp() const {} + void onPushEmptyPass() const {} + //@endcond + }; + + /// MSPriorityQueue traits + struct traits { + /// Storage type + /** + The storage type for the heap array. Default is \p cds::opt::v::initialized_dynamic_buffer. + + You may specify any type of buffer's value since at instantiation time + the \p buffer::rebind member metafunction is called to change type + of values stored in the buffer. + */ + typedef opt::v::initialized_dynamic_buffer buffer; + + /// Priority compare functor + /** + No default functor is provided. If the option is not specified, the \p less is used. + */ + typedef opt::none compare; + + /// Specifies binary predicate used for priority comparing. + /** + Default is \p std::less. + */ + typedef opt::none less; + + /// Type of mutual-exclusion lock. The lock is not need to be recursive. + typedef cds::sync::spin lock_type; + + /// Back-off strategy + typedef backoff::Default back_off; + + /// Internal statistics + /** + Possible types: \p mspriority_queue::empty_stat (the default, no overhead), \p mspriority_queue::stat + or any other with interface like \p %mspriority_queue::stat + */ + typedef empty_stat stat; + }; + + /// Metafunction converting option list to traits + /** + \p Options: + - \p opt::buffer - the buffer type for heap array. Possible type are: \p opt::v::initialized_static_buffer, \p opt::v::initialized_dynamic_buffer. + Default is \p %opt::v::initialized_dynamic_buffer. + You may specify any type of value for the buffer since at instantiation time + the \p buffer::rebind member metafunction is called to change the type of values stored in the buffer. + - \p opt::compare - priority compare functor. No default functor is provided. + If the option is not specified, the \p opt::less is used. + - \p opt::less - specifies binary predicate used for priority compare. Default is \p std::less. + - \p opt::lock_type - lock type. Default is \p cds::sync::spin + - \p opt::back_off - back-off strategy. Default is \p cds::backoff::yield + - \p opt::stat - internal statistics. Available types: \p mspriority_queue::stat, \p mspriority_queue::empty_stat (the default, no overhead) + */ + template + struct make_traits { +# ifdef CDS_DOXYGEN_INVOKED + typedef implementation_defined type ; ///< Metafunction result +# else + typedef typename cds::opt::make_options< + typename cds::opt::find_type_traits< traits, Options... >::type + ,Options... + >::type type; +# endif + }; + + } // namespace mspriority_queue + + /// Michael & Scott array-based lock-based concurrent priority queue heap + /** @ingroup cds_intrusive_priority_queue + Source: + - [1996] G.Hunt, M.Michael, S. Parthasarathy, M.Scott + "An efficient algorithm for concurrent priority queue heaps" + + \p %MSPriorityQueue augments the standard array-based heap data structure with + a mutual-exclusion lock on the heap's size and locks on each node in the heap. + Each node also has a tag that indicates whether + it is empty, valid, or in a transient state due to an update to the heap + by an inserting thread. + The algorithm allows concurrent insertions and deletions in opposite directions, + without risking deadlock and without the need for special server threads. + It also uses a "bit-reversal" technique to scatter accesses across the fringe + of the tree to reduce contention. + On large heaps the algorithm achieves significant performance improvements + over serialized single-lock algorithm, for various insertion/deletion + workloads. For small heaps it still performs well, but not as well as + single-lock algorithm. + + Template parameters: + - \p T - type to be stored in the queue. The priority is a part of \p T type. + - \p Traits - type traits. See \p mspriority_queue::traits for explanation. + It is possible to declare option-based queue with \p cds::container::mspriority_queue::make_traits + metafunction instead of \p Traits template argument. + */ + template + class MSPriorityQueue: public cds::bounded_container + { + public: + typedef T value_type ; ///< Value type stored in the queue + typedef Traits traits ; ///< Traits template parameter + +# ifdef CDS_DOXYGEN_INVOKED + typedef implementation_defined key_comparator ; ///< priority comparing functor based on opt::compare and opt::less option setter. +# else + typedef typename opt::details::make_comparator< value_type, traits >::type key_comparator; +# endif + + typedef typename traits::lock_type lock_type; ///< heap's size lock type + typedef typename traits::back_off back_off; ///< Back-off strategy + typedef typename traits::stat stat; ///< internal statistics type, see \p mspriority_queue::traits::stat + typedef typename cds::bitop::bit_reverse_counter<> item_counter;///< Item counter type + + protected: + //@cond + typedef cds::OS::ThreadId tag_type; + + enum tag_value { + Available = -1, + Empty = 0 + }; + //@endcond + + //@cond + /// Heap item type + struct node { + value_type * m_pVal ; ///< A value pointer + tag_type volatile m_nTag ; ///< A tag + mutable lock_type m_Lock ; ///< Node-level lock + + /// Creates empty node + node() + : m_pVal( nullptr ) + , m_nTag( tag_type(Empty)) + {} + + /// Lock the node + void lock() + { + m_Lock.lock(); + } + + /// Unlock the node + void unlock() + { + m_Lock.unlock(); + } + }; + //@endcond + + public: + typedef typename traits::buffer::template rebind::other buffer_type ; ///< Heap array buffer type + + //@cond + typedef typename item_counter::counter_type counter_type; + //@endcond + + protected: + item_counter m_ItemCounter ; ///< Item counter + mutable lock_type m_Lock ; ///< Heap's size lock + buffer_type m_Heap ; ///< Heap array + stat m_Stat ; ///< internal statistics accumulator + + public: + /// Constructs empty priority queue + /** + For \p cds::opt::v::initialized_static_buffer the \p nCapacity parameter is ignored. + */ + MSPriorityQueue( size_t nCapacity ) + : m_Heap( nCapacity ) + {} + + /// Clears priority queue and destructs the object + ~MSPriorityQueue() + { + clear(); + } + + /// Inserts a item into priority queue + /** + If the priority queue is full, the function returns \p false, + no item has been added. + Otherwise, the function inserts the pointer to \p val into the heap + and returns \p true. + + The function does not make a copy of \p val. + */ + bool push( value_type& val ) + { + tag_type const curId = cds::OS::get_current_thread_id(); + + // Insert new item at bottom of the heap + m_Lock.lock(); + if ( m_ItemCounter.value() >= capacity()) { + // the heap is full + m_Lock.unlock(); + m_Stat.onPushFailed(); + return false; + } + + counter_type i = m_ItemCounter.inc(); + assert( i < m_Heap.capacity()); + + node& refNode = m_Heap[i]; + refNode.lock(); + m_Lock.unlock(); + assert( refNode.m_nTag == tag_type( Empty )); + assert( refNode.m_pVal == nullptr ); + refNode.m_pVal = &val; + refNode.m_nTag = curId; + refNode.unlock(); + + // Move item towards top of heap while it has a higher priority than its parent + heapify_after_push( i, curId ); + + m_Stat.onPushSuccess(); + return true; + } + + /// Extracts item with high priority + /** + If the priority queue is empty, the function returns \p nullptr. + Otherwise, it returns the item extracted. + */ + value_type * pop() + { + node& refTop = m_Heap[1]; + + m_Lock.lock(); + if ( m_ItemCounter.value() == 0 ) { + // the heap is empty + m_Lock.unlock(); + m_Stat.onPopFailed(); + return nullptr; + } + counter_type nBottom = m_ItemCounter.dec(); + assert( nBottom < m_Heap.capacity()); + assert( nBottom > 0 ); + + refTop.lock(); + if ( nBottom == 1 ) { + refTop.m_nTag = tag_type( Empty ); + value_type * pVal = refTop.m_pVal; + refTop.m_pVal = nullptr; + refTop.unlock(); + m_Lock.unlock(); + m_Stat.onPopSuccess(); + return pVal; + } + + node& refBottom = m_Heap[nBottom]; + refBottom.lock(); + m_Lock.unlock(); + refBottom.m_nTag = tag_type(Empty); + value_type * pVal = refBottom.m_pVal; + refBottom.m_pVal = nullptr; + refBottom.unlock(); + + if ( refTop.m_nTag == tag_type(Empty)) { + // nBottom == nTop + refTop.unlock(); + m_Stat.onPopSuccess(); + return pVal; + } + + std::swap( refTop.m_pVal, pVal ); + refTop.m_nTag = tag_type( Available ); + + // refTop will be unlocked inside heapify_after_pop + heapify_after_pop( &refTop ); + + m_Stat.onPopSuccess(); + return pVal; + } + + /// Clears the queue (not atomic) + /** + This function is no atomic, but thread-safe + */ + void clear() + { + clear_with( []( value_type const& /*src*/ ) {} ); + } + + /// Clears the queue (not atomic) + /** + This function is no atomic, but thread-safe. + + For each item removed the functor \p f is called. + \p Func interface is: + \code + struct clear_functor + { + void operator()( value_type& item ); + }; + \endcode + A lambda function or a function pointer can be used as \p f. + */ + template + void clear_with( Func f ) + { + value_type * pVal; + while (( pVal = pop()) != nullptr ) + f( *pVal ); + } + + /// Checks is the priority queue is empty + bool empty() const + { + return size() == 0; + } + + /// Checks if the priority queue is full + bool full() const + { + return size() == capacity(); + } + + /// Returns current size of priority queue + size_t size() const + { + std::unique_lock l( m_Lock ); + return static_cast( m_ItemCounter.value()); + } + + /// Return capacity of the priority queue + size_t capacity() const + { + // m_Heap[0] is not used + return m_Heap.capacity() - 1; + } + + /// Returns const reference to internal statistics + stat const& statistics() const + { + return m_Stat; + } + + protected: + //@cond + + void heapify_after_push( counter_type i, tag_type curId ) + { + key_comparator cmp; + back_off bkoff; + + // Move item towards top of the heap while it has higher priority than parent + while ( i > 1 ) { + bool bProgress = true; + counter_type nParent = i / 2; + node& refParent = m_Heap[nParent]; + refParent.lock(); + node& refItem = m_Heap[i]; + refItem.lock(); + + if ( refParent.m_nTag == tag_type(Available) && refItem.m_nTag == curId ) { + if ( cmp( *refItem.m_pVal, *refParent.m_pVal ) > 0 ) { + std::swap( refItem.m_nTag, refParent.m_nTag ); + std::swap( refItem.m_pVal, refParent.m_pVal ); + m_Stat.onPushHeapifySwap(); + i = nParent; + } + else { + refItem.m_nTag = tag_type(Available); + i = 0; + } + } + else if ( refParent.m_nTag == tag_type( Empty )) { + m_Stat.onItemMovedTop(); + i = 0; + } + else if ( refItem.m_nTag != curId ) { + m_Stat.onItemMovedUp(); + i = nParent; + } + else { + m_Stat.onPushEmptyPass(); + bProgress = false; + } + + refItem.unlock(); + refParent.unlock(); + + if ( !bProgress ) + bkoff(); + else + bkoff.reset(); + } + + if ( i == 1 ) { + node& refItem = m_Heap[i]; + refItem.lock(); + if ( refItem.m_nTag == curId ) + refItem.m_nTag = tag_type(Available); + refItem.unlock(); + } + } + + void heapify_after_pop( node * pParent ) + { + key_comparator cmp; + counter_type const nCapacity = m_Heap.capacity(); + + counter_type nParent = 1; + for ( counter_type nChild = nParent * 2; nChild < nCapacity; nChild *= 2 ) { + node* pChild = &m_Heap[ nChild ]; + pChild->lock(); + + if ( pChild->m_nTag == tag_type( Empty )) { + pChild->unlock(); + break; + } + + counter_type const nRight = nChild + 1; + if ( nRight < nCapacity ) { + node& refRight = m_Heap[nRight]; + refRight.lock(); + + if ( refRight.m_nTag != tag_type( Empty ) && cmp( *refRight.m_pVal, *pChild->m_pVal ) > 0 ) { + // get right child + pChild->unlock(); + nChild = nRight; + pChild = &refRight; + } + else + refRight.unlock(); + } + + // If child has higher priority than parent then swap + // Otherwise stop + if ( cmp( *pChild->m_pVal, *pParent->m_pVal ) > 0 ) { + std::swap( pParent->m_nTag, pChild->m_nTag ); + std::swap( pParent->m_pVal, pChild->m_pVal ); + pParent->unlock(); + m_Stat.onPopHeapifySwap(); + nParent = nChild; + pParent = pChild; + } + else { + pChild->unlock(); + break; + } + } + pParent->unlock(); + } + //@endcond + }; + +}} // namespace cds::intrusive + +#endif // #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H