2 This file is a part of libcds - Concurrent Data Structures library
4 (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
6 Source code repo: http://github.com/khizmax/libcds/
7 Download: http://sourceforge.net/projects/libcds/files/
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
12 * Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #include "intrusive_queue_type.h"
35 // Multi-threaded random queue test
38 static size_t s_nReaderThreadCount = 4;
39 static size_t s_nWriterThreadCount = 4;
40 static size_t s_nQueueSize = 4000000;
42 static unsigned int s_nFCPassCount = 8;
43 static unsigned int s_nFCCompactFactor = 64;
45 static atomics::atomic< size_t > s_nProducerCount(0);
46 static size_t s_nThreadPushCount;
47 static CDS_CONSTEXPR const size_t c_nBadConsumer = 0xbadc0ffe;
51 template <typename Base = empty >
52 struct value_type: public Base
59 class intrusive_queue_push_pop: public cds_test::stress_fixture
61 typedef cds_test::stress_fixture base_class;
69 template <class Queue>
70 class Producer: public cds_test::thread
72 typedef cds_test::thread base_class;
75 Producer( cds_test::thread_pool& pool, Queue& q )
76 : base_class( pool, producer_thread )
79 Producer( Producer& src )
81 , m_Queue( src.m_Queue )
84 virtual thread * clone()
86 return new Producer( *this );
92 for ( typename Queue::value_type * p = m_pStart; p < m_pEnd; ) {
95 CDS_TSAN_ANNOTATE_HAPPENS_BEFORE( &p->nWriterNo );
96 if ( m_Queue.push( *p )) {
103 s_nProducerCount.fetch_sub( 1, atomics::memory_order_release );
108 size_t m_nPushFailed = 0;
110 // Interval in m_arrValue
111 typename Queue::value_type * m_pStart;
112 typename Queue::value_type * m_pEnd;
115 template <class Queue>
116 class Consumer: public cds_test::thread
118 typedef cds_test::thread base_class;
122 size_t m_nPopEmpty = 0;
123 size_t m_nPopped = 0;
124 size_t m_nBadWriter = 0;
126 typedef std::vector<size_t> TPoppedData;
127 typedef std::vector<size_t>::iterator data_iterator;
128 typedef std::vector<size_t>::const_iterator const_data_iterator;
130 std::vector<TPoppedData> m_WriterData;
133 void initPoppedData()
135 const size_t nWriterCount = s_nWriterThreadCount;
136 const size_t nWriterPushCount = s_nThreadPushCount;
137 m_WriterData.resize( nWriterCount );
138 for ( size_t i = 0; i < nWriterCount; ++i )
139 m_WriterData[i].reserve( nWriterPushCount );
143 Consumer( cds_test::thread_pool& pool, Queue& q )
144 : base_class( pool, consumer_thread )
149 Consumer( Consumer& src )
151 , m_Queue( src.m_Queue )
156 virtual thread * clone()
158 return new Consumer( *this );
163 size_t const nTotalWriters = s_nWriterThreadCount;
166 typename Queue::value_type * p = m_Queue.pop();
170 CDS_TSAN_ANNOTATE_HAPPENS_AFTER( &p->nWriterNo );
171 if ( p->nWriterNo < nTotalWriters )
172 m_WriterData[ p->nWriterNo ].push_back( p->nNo );
178 if ( s_nProducerCount.load( atomics::memory_order_acquire ) == 0 && m_Queue.empty() )
185 template <typename T>
188 std::unique_ptr<T[]> m_pArr;
190 value_array( size_t nSize )
191 : m_pArr( new T[nSize] )
194 T * get() const { return m_pArr.get(); }
198 static void SetUpTestCase()
200 cds_test::config const& cfg = get_config( "queue_random" );
202 s_nReaderThreadCount = cfg.get_size_t( "ReaderCount", s_nReaderThreadCount );
203 s_nWriterThreadCount = cfg.get_size_t( "WriterCount", s_nWriterThreadCount );
204 s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
206 s_nFCPassCount = cfg.get_uint( "FCPassCount", s_nFCPassCount );
207 s_nFCCompactFactor = cfg.get_uint( "FCCompactFactor", s_nFCCompactFactor );
209 if ( s_nReaderThreadCount == 0u )
210 s_nReaderThreadCount = 1;
211 if ( s_nWriterThreadCount == 0u )
212 s_nWriterThreadCount = 1;
213 if ( s_nQueueSize == 0u )
217 //static void TearDownTestCase();
220 template <class Queue>
221 void analyze( Queue& testQueue, size_t /*nLeftOffset*/, size_t nRightOffset )
223 typedef Consumer<Queue> Reader;
224 typedef Producer<Queue> Writer;
225 typedef typename Reader::const_data_iterator ReaderIterator;
227 size_t nPostTestPops = 0;
228 while ( testQueue.pop() )
231 size_t nTotalPops = 0;
232 size_t nPopFalse = 0;
233 size_t nPoppedItems = 0;
234 size_t nPushFailed = 0;
236 std::vector< Reader * > arrReaders;
238 cds_test::thread_pool& pool = get_pool();
239 for ( size_t i = 0; i < pool.size(); ++i ) {
240 cds_test::thread& thr = pool.get( i );
241 if ( thr.type() == consumer_thread ) {
242 Consumer<Queue>& consumer = static_cast<Consumer<Queue>&>( thr );
243 nTotalPops += consumer.m_nPopped;
244 nPopFalse += consumer.m_nPopEmpty;
245 arrReaders.push_back( &consumer );
246 EXPECT_EQ( consumer.m_nBadWriter, 0u ) << "consumer " << (i - s_nWriterThreadCount);
249 for ( size_t n = 0; n < s_nWriterThreadCount; ++n )
250 nPopped += consumer.m_WriterData[n].size();
254 s << "consumer" << (i - s_nWriterThreadCount) << "_popped";
255 propout() << std::make_pair( s.str().c_str(), nPopped );
257 nPoppedItems += nPopped;
260 Producer<Queue>& producer = static_cast<Producer<Queue>&>( thr );
261 nPushFailed += producer.m_nPushFailed;
262 if ( !std::is_base_of<cds::bounded_container, Queue>::value ) {
263 EXPECT_EQ( producer.m_nPushFailed, 0u ) << "producer " << i;
267 EXPECT_EQ( nTotalPops, nPoppedItems );
269 propout() << std::make_pair( "success_pop", nTotalPops )
270 << std::make_pair( "empty_pop", nPopFalse )
271 << std::make_pair( "failed_push", nPushFailed );
273 size_t nQueueSize = s_nThreadPushCount * s_nWriterThreadCount;
274 EXPECT_EQ( nTotalPops + nPostTestPops, nQueueSize );
275 EXPECT_TRUE( testQueue.empty() );
277 // Test that all items have been popped
279 for ( size_t nWriter = 0; nWriter < s_nWriterThreadCount; ++nWriter ) {
280 std::vector<size_t> arrData;
281 arrData.reserve( s_nThreadPushCount );
282 for ( size_t nReader = 0; nReader < arrReaders.size(); ++nReader ) {
283 ReaderIterator it = arrReaders[nReader]->m_WriterData[nWriter].begin();
284 ReaderIterator itEnd = arrReaders[nReader]->m_WriterData[nWriter].end();
286 ReaderIterator itPrev = it;
287 for ( ++it; it != itEnd; ++it ) {
288 EXPECT_LT( *itPrev, *it + nRightOffset )
289 << "Reader " << nReader << ", Writer " << nWriter << ": prev=" << *itPrev << ", cur=" << *it;
294 for ( it = arrReaders[nReader]->m_WriterData[nWriter].begin(); it != itEnd; ++it )
295 arrData.push_back( *it );
297 std::sort( arrData.begin(), arrData.end() );
298 for ( size_t i=1; i < arrData.size(); ++i ) {
299 if ( arrData[i-1] + 1 != arrData[i] ) {
300 EXPECT_EQ( arrData[i-1] + 1, arrData[i] ) << "Writer " << nWriter << ": [" << (i-1) << "]=" << arrData[i-1]
301 << ", [" << i << "]=" << arrData[i];
305 EXPECT_EQ( arrData[0], 0u ) << "Writer " << nWriter;
306 EXPECT_EQ( arrData[arrData.size() - 1], s_nThreadPushCount - 1 ) << "Writer " << nWriter;
310 template <class Queue>
311 void test( Queue& q, value_array<typename Queue::value_type>& arrValue, size_t nLeftOffset, size_t nRightOffset )
313 s_nThreadPushCount = s_nQueueSize / s_nWriterThreadCount;
314 s_nQueueSize = s_nThreadPushCount * s_nWriterThreadCount;
315 propout() << std::make_pair( "producer_count", s_nWriterThreadCount )
316 << std::make_pair( "consumer_count", s_nReaderThreadCount )
317 << std::make_pair( "queue_size", s_nQueueSize );
319 typename Queue::value_type * pValStart = arrValue.get();
320 typename Queue::value_type * pValEnd = pValStart + s_nQueueSize;
322 cds_test::thread_pool& pool = get_pool();
323 s_nProducerCount.store( s_nWriterThreadCount, atomics::memory_order_release );
325 // Writers must be first
326 pool.add( new Producer<Queue>( pool, q ), s_nWriterThreadCount );
328 for ( typename Queue::value_type * it = pValStart; it != pValEnd; ++it ) {
331 it->nConsumer = c_nBadConsumer;
334 typename Queue::value_type * pStart = pValStart;
335 for ( size_t i = 0; i < pool.size(); ++i ) {
336 Producer<Queue>& producer = static_cast<Producer<Queue>&>( pool.get( i ));
337 producer.m_pStart = pStart;
338 pStart += s_nThreadPushCount;
339 producer.m_pEnd = pStart;
342 pool.add( new Consumer<Queue>( pool, q ), s_nReaderThreadCount );
344 std::chrono::milliseconds duration = pool.run();
345 propout() << std::make_pair( "duration", duration );
347 // Check that all values have been dequeued
349 size_t nBadConsumerCount = 0;
350 typename Queue::value_type * pEnd = pValStart + s_nQueueSize;
351 for ( typename Queue::value_type * it = pValStart; it != pEnd; ++it ) {
352 if ( it->nConsumer == c_nBadConsumer )
355 EXPECT_EQ( nBadConsumerCount, 0u );
358 analyze( q, nLeftOffset, nRightOffset );
360 propout() << q.statistics();
364 #define CDSSTRESS_QUEUE_F( QueueType, NodeType ) \
365 TEST_F( intrusive_queue_push_pop, QueueType ) \
367 typedef value_type<NodeType> node_type; \
368 typedef typename queue::Types< node_type >::QueueType queue_type; \
369 value_array<typename queue_type::value_type> arrValue( s_nQueueSize ); \
372 test( q, arrValue, 0, 0 ); \
374 queue_type::gc::force_dispose(); \
377 CDSSTRESS_QUEUE_F( MSQueue_HP, cds::intrusive::msqueue::node<cds::gc::HP> )
378 CDSSTRESS_QUEUE_F( MSQueue_HP_ic, cds::intrusive::msqueue::node<cds::gc::HP> )
379 CDSSTRESS_QUEUE_F( MSQueue_HP_stat, cds::intrusive::msqueue::node<cds::gc::HP> )
380 CDSSTRESS_QUEUE_F( MSQueue_DHP, cds::intrusive::msqueue::node<cds::gc::DHP> )
381 CDSSTRESS_QUEUE_F( MSQueue_DHP_ic, cds::intrusive::msqueue::node<cds::gc::DHP> )
382 CDSSTRESS_QUEUE_F( MSQueue_DHP_stat, cds::intrusive::msqueue::node<cds::gc::DHP> )
384 CDSSTRESS_QUEUE_F( MoirQueue_HP, cds::intrusive::msqueue::node<cds::gc::HP> )
385 CDSSTRESS_QUEUE_F( MoirQueue_HP_ic, cds::intrusive::msqueue::node<cds::gc::HP> )
386 CDSSTRESS_QUEUE_F( MoirQueue_HP_stat, cds::intrusive::msqueue::node<cds::gc::HP> )
387 CDSSTRESS_QUEUE_F( MoirQueue_DHP, cds::intrusive::msqueue::node<cds::gc::DHP> )
388 CDSSTRESS_QUEUE_F( MoirQueue_DHP_ic, cds::intrusive::msqueue::node<cds::gc::DHP> )
389 CDSSTRESS_QUEUE_F( MoirQueue_DHP_stat, cds::intrusive::msqueue::node<cds::gc::DHP> )
391 CDSSTRESS_QUEUE_F( OptimisticQueue_HP, cds::intrusive::optimistic_queue::node<cds::gc::HP> )
392 CDSSTRESS_QUEUE_F( OptimisticQueue_HP_ic, cds::intrusive::optimistic_queue::node<cds::gc::HP> )
393 CDSSTRESS_QUEUE_F( OptimisticQueue_HP_stat, cds::intrusive::optimistic_queue::node<cds::gc::HP> )
394 CDSSTRESS_QUEUE_F( OptimisticQueue_DHP, cds::intrusive::optimistic_queue::node<cds::gc::DHP> )
395 CDSSTRESS_QUEUE_F( OptimisticQueue_DHP_ic, cds::intrusive::optimistic_queue::node<cds::gc::DHP> )
396 CDSSTRESS_QUEUE_F( OptimisticQueue_DHP_stat, cds::intrusive::optimistic_queue::node<cds::gc::DHP> )
398 CDSSTRESS_QUEUE_F( BasketQueue_HP, cds::intrusive::basket_queue::node<cds::gc::HP> )
399 CDSSTRESS_QUEUE_F( BasketQueue_HP_ic, cds::intrusive::basket_queue::node<cds::gc::HP> )
400 CDSSTRESS_QUEUE_F( BasketQueue_HP_stat, cds::intrusive::basket_queue::node<cds::gc::HP> )
401 CDSSTRESS_QUEUE_F( BasketQueue_DHP, cds::intrusive::basket_queue::node<cds::gc::DHP> )
402 CDSSTRESS_QUEUE_F( BasketQueue_DHP_ic, cds::intrusive::basket_queue::node<cds::gc::DHP> )
403 CDSSTRESS_QUEUE_F( BasketQueue_DHP_stat, cds::intrusive::basket_queue::node<cds::gc::DHP> )
404 #undef CDSSTRESS_QUEUE_F
407 #define CDSSTRESS_QUEUE_F( QueueType, NodeType ) \
408 TEST_F( intrusive_queue_push_pop, QueueType ) \
410 typedef value_type<NodeType> node_type; \
411 typedef typename queue::Types< node_type >::QueueType queue_type; \
412 value_array<typename queue_type::value_type> arrValue( s_nQueueSize ); \
413 queue_type q( s_nFCCompactFactor, s_nFCPassCount ); \
414 test( q, arrValue, 0, 0 ); \
417 CDSSTRESS_QUEUE_F(FCQueue_list_delay2, boost::intrusive::list_base_hook<> )
418 CDSSTRESS_QUEUE_F(FCQueue_list_delay2_elimination, boost::intrusive::list_base_hook<> )
419 CDSSTRESS_QUEUE_F(FCQueue_list_delay2_elimination_stat, boost::intrusive::list_base_hook<> )
420 CDSSTRESS_QUEUE_F(FCQueue_list_expbackoff_elimination, boost::intrusive::list_base_hook<> )
421 CDSSTRESS_QUEUE_F(FCQueue_list_expbackoff_elimination_stat, boost::intrusive::list_base_hook<> )
422 CDSSTRESS_QUEUE_F(FCQueue_list_wait_ss, boost::intrusive::list_base_hook<> )
423 CDSSTRESS_QUEUE_F(FCQueue_list_wait_ss_stat, boost::intrusive::list_base_hook<> )
424 CDSSTRESS_QUEUE_F(FCQueue_list_wait_sm, boost::intrusive::list_base_hook<> )
425 CDSSTRESS_QUEUE_F(FCQueue_list_wait_sm_stat, boost::intrusive::list_base_hook<> )
426 CDSSTRESS_QUEUE_F(FCQueue_list_wait_mm, boost::intrusive::list_base_hook<> )
427 CDSSTRESS_QUEUE_F(FCQueue_list_wait_mm_stat, boost::intrusive::list_base_hook<> )
428 #undef CDSSTRESS_QUEUE_F
431 #define CDSSTRESS_QUEUE_F( QueueType ) \
432 TEST_F( intrusive_queue_push_pop, QueueType ) \
434 typedef typename queue::Types< value_type<> >::QueueType queue_type; \
435 value_array<typename queue_type::value_type> arrValue( s_nQueueSize ); \
436 queue_type q( s_nQueueSize ); \
437 test( q, arrValue, 0, 0 ); \
440 CDSSTRESS_QUEUE_F( TsigasCycleQueue_dyn )
441 CDSSTRESS_QUEUE_F( TsigasCycleQueue_dyn_ic )
442 CDSSTRESS_QUEUE_F( VyukovMPMCCycleQueue_dyn )
443 CDSSTRESS_QUEUE_F( VyukovMPMCCycleQueue_dyn_ic )
444 #undef CDSSTRESS_QUEUE_F
447 // ********************************************************************
448 // SegmentedQueue test
450 class intrusive_segmented_queue_push_pop
451 : public intrusive_queue_push_pop
452 , public ::testing::WithParamInterface< size_t >
454 typedef intrusive_queue_push_pop base_class;
457 template <typename Queue>
460 value_array<typename Queue::value_type> arrValue( s_nQueueSize ); \
462 size_t quasi_factor = GetParam();
464 Queue q( quasi_factor );
465 propout() << std::make_pair( "quasi_factor", quasi_factor );
467 base_class::test( q, arrValue, quasi_factor * 2, quasi_factor );
469 Queue::gc::force_dispose();
473 static std::vector< size_t > get_test_parameters()
475 cds_test::config const& cfg = cds_test::stress_fixture::get_config( "intrusive_queue_push_pop" );
476 bool bIterative = cfg.get_bool( "SegmentedQueue_Iterate", false );
477 size_t quasi_factor = cfg.get_size_t( "SegmentedQueue_SegmentSize", 256 );
479 std::vector<size_t> args;
480 if ( bIterative && quasi_factor > 4 ) {
481 for ( size_t qf = 4; qf <= quasi_factor; qf *= 2 )
482 args.push_back( qf );
485 if ( quasi_factor > 2 )
486 args.push_back( quasi_factor );
495 #define CDSSTRESS_QUEUE_F( type_name ) \
496 TEST_P( intrusive_segmented_queue_push_pop, type_name ) \
498 typedef typename queue::Types<value_type<>>::type_name queue_type; \
499 test< queue_type >(); \
502 CDSSTRESS_QUEUE_F( SegmentedQueue_HP_spin )
503 //CDSSTRESS_QUEUE_F( SegmentedQueue_HP_spin_padding )
504 CDSSTRESS_QUEUE_F( SegmentedQueue_HP_spin_stat )
505 CDSSTRESS_QUEUE_F( SegmentedQueue_HP_mutex )
506 //CDSSTRESS_QUEUE_F( SegmentedQueue_HP_mutex_padding )
507 CDSSTRESS_QUEUE_F( SegmentedQueue_HP_mutex_stat )
508 CDSSTRESS_QUEUE_F( SegmentedQueue_DHP_spin )
509 //CDSSTRESS_QUEUE_F( SegmentedQueue_DHP_spin_padding )
510 CDSSTRESS_QUEUE_F( SegmentedQueue_DHP_spin_stat )
511 CDSSTRESS_QUEUE_F( SegmentedQueue_DHP_mutex )
512 //CDSSTRESS_QUEUE_F( SegmentedQueue_DHP_mutex_padding )
513 CDSSTRESS_QUEUE_F( SegmentedQueue_DHP_mutex_stat )
515 INSTANTIATE_TEST_CASE_P( SQ,
516 intrusive_segmented_queue_push_pop,
517 ::testing::ValuesIn( intrusive_segmented_queue_push_pop::get_test_parameters() ) );