}
counter_type nBottom = m_ItemCounter.reversed_value();
m_ItemCounter.dec();
- // Since m_Heap[0] is not used, capacity() returns m_Heap.capacity() - 1
- // Consequently, "<=" is here
- assert( nBottom <= capacity() );
+ assert( nBottom < m_Heap.capacity() );
assert( nBottom > 0 );
node& refBottom = m_Heap[ nBottom ];
refTop.m_nTag = tag_type( Available );
// refTop will be unlocked inside heapify_after_pop
- heapify_after_pop( 1, &refTop );
+ heapify_after_pop( &refTop );
m_Stat.onPopSuccess();
return pVal;
}
}
- void heapify_after_pop( counter_type nParent, node * pParent )
+ void heapify_after_pop( node * pParent )
{
key_comparator cmp;
+ counter_type const nCapacity = m_Heap.capacity();
- while ( nParent < m_Heap.capacity() / 2 ) {
- counter_type nLeft = nParent * 2;
- counter_type nRight = nLeft + 1;
- node& refLeft = m_Heap[nLeft];
- node& refRight = m_Heap[nRight];
- refLeft.lock();
- refRight.lock();
-
- counter_type nChild;
- node * pChild;
- if ( refLeft.m_nTag == tag_type(Empty) ) {
- refRight.unlock();
- refLeft.unlock();
+ counter_type nParent = 1;
+ for ( counter_type nChild = nParent * 2; nChild < nCapacity; nChild *= 2 ) {
+ node* pChild = &m_Heap[ nChild ];
+ pChild->lock();
+
+ if ( pChild->m_nTag == tag_type( Empty )) {
+ pChild->unlock();
break;
}
- else if ( refRight.m_nTag == tag_type(Empty) || cmp( *refLeft.m_pVal, *refRight.m_pVal ) > 0 ) {
- refRight.unlock();
- nChild = nLeft;
- pChild = &refLeft;
- }
- else {
- refLeft.unlock();
- nChild = nRight;
- pChild = &refRight;
+
+ counter_type const nRight = nChild + 1;
+ if ( nRight < nCapacity ) {
+ node& refRight = m_Heap[nRight];
+ refRight.lock();
+
+ if ( refRight.m_nTag != tag_type( Empty ) && cmp( *refRight.m_pVal, *pChild->m_pVal ) > 0 ) {
+ // get right child
+ pChild->unlock();
+ nChild = nRight;
+ pChild = &refRight;
+ }
+ else
+ refRight.unlock();
}
// If child has higher priority that parent then swap
typedef cds_test::stress_fixture base_class;
protected:
- template <class PQueue>
- class Producer: public cds_test::thread
- {
- typedef cds_test::thread base_class;
-
- public:
- Producer( cds_test::thread_pool& pool, PQueue& queue )
- : base_class( pool )
- , m_Queue( queue )
- {}
-
- Producer( Producer& src )
- : base_class( src )
- , m_Queue( src.m_Queue )
- {}
-
- virtual thread * clone()
- {
- return new Producer( *this );
- }
-
- virtual void test()
- {
- typedef typename PQueue::value_type value_type;
- for ( array_type::const_iterator it = m_arr.begin(); it != m_arr.end(); ++it ) {
- if ( !m_Queue.push( value_type( *it ) ))
- ++m_nPushError;
- }
- }
-
- void prepare( size_t nStart, size_t nEnd )
- {
- m_arr.reserve( nEnd - nStart );
- for ( size_t i = nStart; i < nEnd; ++i )
- m_arr.push_back( i );
- shuffle( m_arr.begin(), m_arr.end() );
- }
-
- public:
- PQueue& m_Queue;
- size_t m_nPushError = 0;
-
- typedef std::vector<size_t> array_type;
- array_type m_arr;
- };
-
template <class PQueue>
class Consumer: public cds_test::thread
{
template <class PQueue>
void test( PQueue& q )
{
- size_t const nThreadItemCount = s_nQueueSize / s_nThreadCount;
- s_nQueueSize = nThreadItemCount * s_nThreadCount;
-
cds_test::thread_pool& pool = get_pool();
propout() << std::make_pair( "thread_count", s_nThreadCount )
// push
{
- pool.add( new Producer<PQueue>( pool, q ), s_nThreadCount );
+ std::vector< size_t > arr;
+ arr.reserve( s_nQueueSize );
+ for ( size_t i = 0; i < s_nQueueSize; ++i )
+ arr.push_back( i );
+ shuffle( arr.begin(), arr.end() );
- size_t nStart = 0;
- for ( size_t i = 0; i < pool.size(); ++i ) {
- static_cast<Producer<PQueue>&>( pool.get(i) ).prepare( nStart, nStart + nThreadItemCount );
- nStart += nThreadItemCount;
- }
-
- std::chrono::milliseconds duration = pool.run();
- propout() << std::make_pair( "producer_duration", duration );
+ typedef typename PQueue::value_type value_type;
+ for ( auto it = arr.begin(); it != arr.end(); ++it )
+ q.push( value_type( *it ));
}
// pop
{
- pool.clear();
pool.add( new Consumer<PQueue>( pool, q ), s_nThreadCount );
std::chrono::milliseconds duration = pool.run();
TEST_F( fixture_t, pqueue_t ) \
{ \
typedef pqueue::Types<pqueue::simple_value>::pqueue_t pqueue_type; \
- pqueue_type pq( s_nQueueSize ); \
+ pqueue_type pq( s_nQueueSize + 1 ); \
test( pq ); \
}
CDSSTRESS_MSPriorityQueue( pqueue_pop, MSPriorityQueue_dyn_less )