2 This file is a part of libcds - Concurrent Data Structures library
4 (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
6 Source code repo: http://github.com/khizmax/libcds/
7 Download: http://sourceforge.net/projects/libcds/files/
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
12 * Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #include "intrusive_queue_type.h"
35 // Multi-threaded random queue test
38 static size_t s_nReaderThreadCount = 4;
39 static size_t s_nWriterThreadCount = 4;
40 static size_t s_nQueueSize = 4000000;
42 static unsigned int s_nFCPassCount = 8;
43 static unsigned int s_nFCCompactFactor = 64;
45 static atomics::atomic< size_t > s_nProducerCount(0);
46 static size_t s_nThreadPushCount;
47 static CDS_CONSTEXPR const size_t c_nBadConsumer = 0xbadc0ffe;
51 template <typename Base = empty >
52 struct value_type: public Base
59 class intrusive_queue_push_pop: public cds_test::stress_fixture
61 typedef cds_test::stress_fixture base_class;
69 template <class Queue>
70 class Producer: public cds_test::thread
72 typedef cds_test::thread base_class;
75 Producer( cds_test::thread_pool& pool, Queue& q )
76 : base_class( pool, producer_thread )
79 Producer( Producer& src )
81 , m_Queue( src.m_Queue )
84 virtual thread * clone()
86 return new Producer( *this );
92 for ( typename Queue::value_type * p = m_pStart; p < m_pEnd; ) {
95 CDS_TSAN_ANNOTATE_HAPPENS_BEFORE( &p->nWriterNo );
96 if ( m_Queue.push( *p )) {
103 s_nProducerCount.fetch_sub( 1, atomics::memory_order_release );
108 size_t m_nPushFailed = 0;
110 // Interval in m_arrValue
111 typename Queue::value_type * m_pStart;
112 typename Queue::value_type * m_pEnd;
115 template <class Queue>
116 class Consumer: public cds_test::thread
118 typedef cds_test::thread base_class;
122 size_t m_nPopEmpty = 0;
123 size_t m_nPopped = 0;
124 size_t m_nBadWriter = 0;
126 typedef std::vector<size_t> TPoppedData;
127 typedef std::vector<size_t>::iterator data_iterator;
128 typedef std::vector<size_t>::const_iterator const_data_iterator;
130 std::vector<TPoppedData> m_WriterData;
133 void initPoppedData()
135 const size_t nWriterCount = s_nWriterThreadCount;
136 const size_t nWriterPushCount = s_nThreadPushCount;
137 m_WriterData.resize( nWriterCount );
138 for ( size_t i = 0; i < nWriterCount; ++i )
139 m_WriterData[i].reserve( nWriterPushCount );
143 Consumer( cds_test::thread_pool& pool, Queue& q )
144 : base_class( pool, consumer_thread )
149 Consumer( Consumer& src )
151 , m_Queue( src.m_Queue )
156 virtual thread * clone()
158 return new Consumer( *this );
163 size_t const nTotalWriters = s_nWriterThreadCount;
166 typename Queue::value_type * p = m_Queue.pop();
170 CDS_TSAN_ANNOTATE_HAPPENS_AFTER( &p->nWriterNo );
171 if ( p->nWriterNo < nTotalWriters )
172 m_WriterData[ p->nWriterNo ].push_back( p->nNo );
178 if ( s_nProducerCount.load( atomics::memory_order_acquire ) == 0 && m_Queue.empty())
185 template <typename T>
188 std::unique_ptr<T[]> m_pArr;
190 value_array( size_t nSize )
191 : m_pArr( new T[nSize] )
194 T * get() const { return m_pArr.get(); }
198 static void SetUpTestCase()
200 cds_test::config const& cfg = get_config( "queue_random" );
202 s_nReaderThreadCount = cfg.get_size_t( "ReaderCount", s_nReaderThreadCount );
203 s_nWriterThreadCount = cfg.get_size_t( "WriterCount", s_nWriterThreadCount );
204 s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
206 s_nFCPassCount = cfg.get_uint( "FCPassCount", s_nFCPassCount );
207 s_nFCCompactFactor = cfg.get_uint( "FCCompactFactor", s_nFCCompactFactor );
209 if ( s_nReaderThreadCount == 0u )
210 s_nReaderThreadCount = 1;
211 if ( s_nWriterThreadCount == 0u )
212 s_nWriterThreadCount = 1;
213 if ( s_nQueueSize == 0u )
217 //static void TearDownTestCase();
220 template <class Queue>
221 void analyze( Queue& testQueue, size_t /*nLeftOffset*/, size_t nRightOffset )
223 typedef Consumer<Queue> Reader;
224 typedef typename Reader::const_data_iterator ReaderIterator;
226 size_t nPostTestPops = 0;
227 while ( testQueue.pop())
230 size_t nTotalPops = 0;
231 size_t nPopFalse = 0;
232 size_t nPoppedItems = 0;
233 size_t nPushFailed = 0;
235 std::vector< Reader * > arrReaders;
237 cds_test::thread_pool& pool = get_pool();
238 for ( size_t i = 0; i < pool.size(); ++i ) {
239 cds_test::thread& thr = pool.get( i );
240 if ( thr.type() == consumer_thread ) {
241 Consumer<Queue>& consumer = static_cast<Consumer<Queue>&>( thr );
242 nTotalPops += consumer.m_nPopped;
243 nPopFalse += consumer.m_nPopEmpty;
244 arrReaders.push_back( &consumer );
245 EXPECT_EQ( consumer.m_nBadWriter, 0u ) << "consumer " << (i - s_nWriterThreadCount);
248 for ( size_t n = 0; n < s_nWriterThreadCount; ++n )
249 nPopped += consumer.m_WriterData[n].size();
253 s << "consumer" << (i - s_nWriterThreadCount) << "_popped";
254 propout() << std::make_pair( s.str().c_str(), nPopped );
256 nPoppedItems += nPopped;
259 Producer<Queue>& producer = static_cast<Producer<Queue>&>( thr );
260 nPushFailed += producer.m_nPushFailed;
261 if ( !std::is_base_of<cds::bounded_container, Queue>::value ) {
262 EXPECT_EQ( producer.m_nPushFailed, 0u ) << "producer " << i;
266 EXPECT_EQ( nTotalPops, nPoppedItems );
268 propout() << std::make_pair( "success_pop", nTotalPops )
269 << std::make_pair( "empty_pop", nPopFalse )
270 << std::make_pair( "failed_push", nPushFailed );
272 size_t nQueueSize = s_nThreadPushCount * s_nWriterThreadCount;
273 EXPECT_EQ( nTotalPops + nPostTestPops, nQueueSize );
274 EXPECT_TRUE( testQueue.empty());
276 // Test that all items have been popped
278 for ( size_t nWriter = 0; nWriter < s_nWriterThreadCount; ++nWriter ) {
279 std::vector<size_t> arrData;
280 arrData.reserve( s_nThreadPushCount );
281 for ( size_t nReader = 0; nReader < arrReaders.size(); ++nReader ) {
282 ReaderIterator it = arrReaders[nReader]->m_WriterData[nWriter].begin();
283 ReaderIterator itEnd = arrReaders[nReader]->m_WriterData[nWriter].end();
285 ReaderIterator itPrev = it;
286 for ( ++it; it != itEnd; ++it ) {
287 EXPECT_LT( *itPrev, *it + nRightOffset )
288 << "Reader " << nReader << ", Writer " << nWriter << ": prev=" << *itPrev << ", cur=" << *it;
293 for ( it = arrReaders[nReader]->m_WriterData[nWriter].begin(); it != itEnd; ++it )
294 arrData.push_back( *it );
296 std::sort( arrData.begin(), arrData.end());
297 for ( size_t i=1; i < arrData.size(); ++i ) {
298 if ( arrData[i-1] + 1 != arrData[i] ) {
299 EXPECT_EQ( arrData[i-1] + 1, arrData[i] ) << "Writer " << nWriter << ": [" << (i-1) << "]=" << arrData[i-1]
300 << ", [" << i << "]=" << arrData[i];
304 EXPECT_EQ( arrData[0], 0u ) << "Writer " << nWriter;
305 EXPECT_EQ( arrData[arrData.size() - 1], s_nThreadPushCount - 1 ) << "Writer " << nWriter;
309 template <class Queue>
310 void test( Queue& q, value_array<typename Queue::value_type>& arrValue, size_t nLeftOffset, size_t nRightOffset )
312 s_nThreadPushCount = s_nQueueSize / s_nWriterThreadCount;
313 s_nQueueSize = s_nThreadPushCount * s_nWriterThreadCount;
314 propout() << std::make_pair( "producer_count", s_nWriterThreadCount )
315 << std::make_pair( "consumer_count", s_nReaderThreadCount )
316 << std::make_pair( "queue_size", s_nQueueSize );
318 typename Queue::value_type * pValStart = arrValue.get();
319 typename Queue::value_type * pValEnd = pValStart + s_nQueueSize;
321 cds_test::thread_pool& pool = get_pool();
322 s_nProducerCount.store( s_nWriterThreadCount, atomics::memory_order_release );
324 // Writers must be first
325 pool.add( new Producer<Queue>( pool, q ), s_nWriterThreadCount );
327 for ( typename Queue::value_type * it = pValStart; it != pValEnd; ++it ) {
330 it->nConsumer = c_nBadConsumer;
333 typename Queue::value_type * pStart = pValStart;
334 for ( size_t i = 0; i < pool.size(); ++i ) {
335 Producer<Queue>& producer = static_cast<Producer<Queue>&>( pool.get( i ));
336 producer.m_pStart = pStart;
337 pStart += s_nThreadPushCount;
338 producer.m_pEnd = pStart;
341 pool.add( new Consumer<Queue>( pool, q ), s_nReaderThreadCount );
343 std::chrono::milliseconds duration = pool.run();
344 propout() << std::make_pair( "duration", duration );
346 // Check that all values have been dequeued
348 size_t nBadConsumerCount = 0;
349 typename Queue::value_type * pEnd = pValStart + s_nQueueSize;
350 for ( typename Queue::value_type * it = pValStart; it != pEnd; ++it ) {
351 if ( it->nConsumer == c_nBadConsumer )
354 EXPECT_EQ( nBadConsumerCount, 0u );
357 analyze( q, nLeftOffset, nRightOffset );
359 propout() << q.statistics();
363 #define CDSSTRESS_QUEUE_F( QueueType, NodeType ) \
364 TEST_F( intrusive_queue_push_pop, QueueType ) \
366 typedef value_type<NodeType> node_type; \
367 typedef typename queue::Types< node_type >::QueueType queue_type; \
368 value_array<typename queue_type::value_type> arrValue( s_nQueueSize ); \
371 test( q, arrValue, 0, 0 ); \
373 queue_type::gc::force_dispose(); \
376 CDSSTRESS_QUEUE_F( MSQueue_HP, cds::intrusive::msqueue::node<cds::gc::HP> )
377 CDSSTRESS_QUEUE_F( MSQueue_HP_ic, cds::intrusive::msqueue::node<cds::gc::HP> )
378 CDSSTRESS_QUEUE_F( MSQueue_HP_stat, cds::intrusive::msqueue::node<cds::gc::HP> )
379 CDSSTRESS_QUEUE_F( MSQueue_DHP, cds::intrusive::msqueue::node<cds::gc::DHP> )
380 CDSSTRESS_QUEUE_F( MSQueue_DHP_ic, cds::intrusive::msqueue::node<cds::gc::DHP> )
381 CDSSTRESS_QUEUE_F( MSQueue_DHP_stat, cds::intrusive::msqueue::node<cds::gc::DHP> )
383 CDSSTRESS_QUEUE_F( MoirQueue_HP, cds::intrusive::msqueue::node<cds::gc::HP> )
384 CDSSTRESS_QUEUE_F( MoirQueue_HP_ic, cds::intrusive::msqueue::node<cds::gc::HP> )
385 CDSSTRESS_QUEUE_F( MoirQueue_HP_stat, cds::intrusive::msqueue::node<cds::gc::HP> )
386 CDSSTRESS_QUEUE_F( MoirQueue_DHP, cds::intrusive::msqueue::node<cds::gc::DHP> )
387 CDSSTRESS_QUEUE_F( MoirQueue_DHP_ic, cds::intrusive::msqueue::node<cds::gc::DHP> )
388 CDSSTRESS_QUEUE_F( MoirQueue_DHP_stat, cds::intrusive::msqueue::node<cds::gc::DHP> )
390 CDSSTRESS_QUEUE_F( OptimisticQueue_HP, cds::intrusive::optimistic_queue::node<cds::gc::HP> )
391 CDSSTRESS_QUEUE_F( OptimisticQueue_HP_ic, cds::intrusive::optimistic_queue::node<cds::gc::HP> )
392 CDSSTRESS_QUEUE_F( OptimisticQueue_HP_stat, cds::intrusive::optimistic_queue::node<cds::gc::HP> )
393 CDSSTRESS_QUEUE_F( OptimisticQueue_DHP, cds::intrusive::optimistic_queue::node<cds::gc::DHP> )
394 CDSSTRESS_QUEUE_F( OptimisticQueue_DHP_ic, cds::intrusive::optimistic_queue::node<cds::gc::DHP> )
395 CDSSTRESS_QUEUE_F( OptimisticQueue_DHP_stat, cds::intrusive::optimistic_queue::node<cds::gc::DHP> )
397 CDSSTRESS_QUEUE_F( BasketQueue_HP, cds::intrusive::basket_queue::node<cds::gc::HP> )
398 CDSSTRESS_QUEUE_F( BasketQueue_HP_ic, cds::intrusive::basket_queue::node<cds::gc::HP> )
399 CDSSTRESS_QUEUE_F( BasketQueue_HP_stat, cds::intrusive::basket_queue::node<cds::gc::HP> )
400 CDSSTRESS_QUEUE_F( BasketQueue_DHP, cds::intrusive::basket_queue::node<cds::gc::DHP> )
401 CDSSTRESS_QUEUE_F( BasketQueue_DHP_ic, cds::intrusive::basket_queue::node<cds::gc::DHP> )
402 CDSSTRESS_QUEUE_F( BasketQueue_DHP_stat, cds::intrusive::basket_queue::node<cds::gc::DHP> )
403 #undef CDSSTRESS_QUEUE_F
406 #define CDSSTRESS_QUEUE_F( QueueType, NodeType ) \
407 TEST_F( intrusive_queue_push_pop, QueueType ) \
409 typedef value_type<NodeType> node_type; \
410 typedef typename queue::Types< node_type >::QueueType queue_type; \
411 value_array<typename queue_type::value_type> arrValue( s_nQueueSize ); \
412 queue_type q( s_nFCCompactFactor, s_nFCPassCount ); \
413 test( q, arrValue, 0, 0 ); \
416 CDSSTRESS_QUEUE_F(FCQueue_list_delay2, boost::intrusive::list_base_hook<> )
417 CDSSTRESS_QUEUE_F(FCQueue_list_delay2_elimination, boost::intrusive::list_base_hook<> )
418 CDSSTRESS_QUEUE_F(FCQueue_list_delay2_elimination_stat, boost::intrusive::list_base_hook<> )
419 CDSSTRESS_QUEUE_F(FCQueue_list_expbackoff_elimination, boost::intrusive::list_base_hook<> )
420 CDSSTRESS_QUEUE_F(FCQueue_list_expbackoff_elimination_stat, boost::intrusive::list_base_hook<> )
421 CDSSTRESS_QUEUE_F(FCQueue_list_wait_ss, boost::intrusive::list_base_hook<> )
422 CDSSTRESS_QUEUE_F(FCQueue_list_wait_ss_stat, boost::intrusive::list_base_hook<> )
423 CDSSTRESS_QUEUE_F(FCQueue_list_wait_sm, boost::intrusive::list_base_hook<> )
424 CDSSTRESS_QUEUE_F(FCQueue_list_wait_sm_stat, boost::intrusive::list_base_hook<> )
425 CDSSTRESS_QUEUE_F(FCQueue_list_wait_mm, boost::intrusive::list_base_hook<> )
426 CDSSTRESS_QUEUE_F(FCQueue_list_wait_mm_stat, boost::intrusive::list_base_hook<> )
427 #undef CDSSTRESS_QUEUE_F
430 #define CDSSTRESS_QUEUE_F( QueueType ) \
431 TEST_F( intrusive_queue_push_pop, QueueType ) \
433 typedef typename queue::Types< value_type<> >::QueueType queue_type; \
434 value_array<typename queue_type::value_type> arrValue( s_nQueueSize ); \
435 queue_type q( s_nQueueSize ); \
436 test( q, arrValue, 0, 0 ); \
439 CDSSTRESS_QUEUE_F( VyukovMPMCCycleQueue_dyn )
440 CDSSTRESS_QUEUE_F( VyukovMPMCCycleQueue_dyn_ic )
441 #undef CDSSTRESS_QUEUE_F
444 // ********************************************************************
445 // SegmentedQueue test
447 class intrusive_segmented_queue_push_pop
448 : public intrusive_queue_push_pop
449 , public ::testing::WithParamInterface< size_t >
451 typedef intrusive_queue_push_pop base_class;
454 template <typename Queue>
457 value_array<typename Queue::value_type> arrValue( s_nQueueSize ); \
459 size_t quasi_factor = GetParam();
461 Queue q( quasi_factor );
462 propout() << std::make_pair( "quasi_factor", quasi_factor );
464 base_class::test( q, arrValue, quasi_factor * 2, quasi_factor );
466 Queue::gc::force_dispose();
470 static std::vector< size_t > get_test_parameters()
472 cds_test::config const& cfg = cds_test::stress_fixture::get_config( "intrusive_queue_push_pop" );
473 bool bIterative = cfg.get_bool( "SegmentedQueue_Iterate", false );
474 size_t quasi_factor = cfg.get_size_t( "SegmentedQueue_SegmentSize", 256 );
476 std::vector<size_t> args;
477 if ( bIterative && quasi_factor > 4 ) {
478 for ( size_t qf = 4; qf <= quasi_factor; qf *= 2 )
479 args.push_back( qf );
482 if ( quasi_factor > 2 )
483 args.push_back( quasi_factor );
492 #define CDSSTRESS_QUEUE_F( type_name ) \
493 TEST_P( intrusive_segmented_queue_push_pop, type_name ) \
495 typedef typename queue::Types<value_type<>>::type_name queue_type; \
496 test< queue_type >(); \
499 CDSSTRESS_QUEUE_F( SegmentedQueue_HP_spin )
500 //CDSSTRESS_QUEUE_F( SegmentedQueue_HP_spin_padding )
501 CDSSTRESS_QUEUE_F( SegmentedQueue_HP_spin_stat )
502 CDSSTRESS_QUEUE_F( SegmentedQueue_HP_mutex )
503 //CDSSTRESS_QUEUE_F( SegmentedQueue_HP_mutex_padding )
504 CDSSTRESS_QUEUE_F( SegmentedQueue_HP_mutex_stat )
505 CDSSTRESS_QUEUE_F( SegmentedQueue_DHP_spin )
506 //CDSSTRESS_QUEUE_F( SegmentedQueue_DHP_spin_padding )
507 CDSSTRESS_QUEUE_F( SegmentedQueue_DHP_spin_stat )
508 CDSSTRESS_QUEUE_F( SegmentedQueue_DHP_mutex )
509 //CDSSTRESS_QUEUE_F( SegmentedQueue_DHP_mutex_padding )
510 CDSSTRESS_QUEUE_F( SegmentedQueue_DHP_mutex_stat )
513 #ifdef CDSTEST_GTEST_INSTANTIATE_TEST_CASE_P_HAS_4TH_ARG
514 static std::string get_test_parameter_name( testing::TestParamInfo<size_t> const& p )
516 return std::to_string( p.param );
519 INSTANTIATE_TEST_CASE_P( SQ,
520 intrusive_segmented_queue_push_pop,
521 ::testing::ValuesIn( intrusive_segmented_queue_push_pop::get_test_parameters() ), get_test_parameter_name );
523 INSTANTIATE_TEST_CASE_P( SQ,
524 intrusive_segmented_queue_push_pop,
525 ::testing::ValuesIn( intrusive_segmented_queue_push_pop::get_test_parameters() ) );