2 This file is a part of libcds - Concurrent Data Structures library
4 (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
6 Source code repo: http://github.com/khizmax/libcds/
7 Download: http://sourceforge.net/projects/libcds/files/
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
12 * Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #include "queue_type.h"
35 #include <type_traits>
37 // Multi-threaded queue push/pop test
// Default test parameters. SetUpTestCase() re-reads every size_t below from the
// "queue_push_pop" configuration section, using the value given here as the fallback.
// Number of consumer / producer threads that test_queue() adds to the thread pool.
40 static size_t s_nConsumerThreadCount = 4;
41 static size_t s_nProducerThreadCount = 4;
// Total number of items pushed during one run. The test macros further down overwrite
// it with one of the queue-specific sizes before a test body runs, and test_queue()
// rounds it down to a multiple of the producer count.
42 static size_t s_nQueueSize = 4000000;
// Queue-specific item counts, one per queue family tested in this file.
43 static size_t s_nMSQueueSize = 4000000;
44 static size_t s_nMoirQueueSize = 4000000;
45 static size_t s_nBasketQueueSize = 4000000;
46 static size_t s_nOptimisticQueueSize = 4000000;
47 static size_t s_nRWQueueSize = 4000000;
48 static size_t s_nSegmentedQueueSize = 400000;
49 static size_t s_nVyukovQueueSize = 40000;
// Handed to set_array_size() at the end of SetUpTestCase().
50 static size_t s_nHeavyValueSize = 100;
// Number of producer threads that have finished pushing: each producer increments it
// once when done, consumers compare it with the producer count, and test_queue()
// resets it to 0 before the pool is run.
52 static std::atomic<size_t> s_nProducerDone( 0 );
// Stress-test fixture for concurrent push/pop on a queue.
// Value is the element type stored in the queue under test; the producer and consumer
// code below reads and writes its nNo and nWriterNo members.
// NOTE(review): the default argument old_value is not defined in the visible code.
60 template<class Value = old_value>
61 class queue_push_pop: public cds_test::stress_fixture
64 using value_type = Value;
// Producer thread: pushes m_nPushCount values into the queue under test.
71 template <class Queue>
72 class Producer: public cds_test::thread
74 typedef cds_test::thread base_class;
// pool       - thread pool handed on to the base class
// queue      - queue under test
// nPushCount - number of values this producer has to push
77 Producer( cds_test::thread_pool& pool, Queue& queue, size_t nPushCount )
// Tags the thread as producer_thread; analyze() uses thr.type() to tell producers
// from consumers.
78 : base_class( pool, producer_thread )
81 , m_nPushCount( nPushCount )
// Copy constructor used by clone(): the copy is initialized from the source's queue
// and push count.
84 Producer( Producer& src )
86 , m_Queue( src.m_Queue )
88 , m_nPushCount( src.m_nPushCount )
// Returns a heap-allocated copy of this producer.
// NOTE(review): who deletes the clone is not visible here - presumably the thread pool.
91 virtual thread * clone()
93 return new Producer( *this );
// Push loop: runs until v.nNo reaches nPushCount.
// NOTE(review): the declaration of v and the branch bodies are not visible in this
// view; presumably a successful push advances v.nNo and a failed push is counted in
// m_nPushFailed - confirm against the full file.
98 size_t const nPushCount = m_nPushCount;
104 while ( v.nNo < nPushCount ) {
105 if ( m_Queue.push( v ))
// Report completion; consumers compare this counter with the producer count.
111 s_nProducerDone.fetch_add( 1 );
// Failed-push counter; analyze() expects it to be 0 for every producer.
116 size_t m_nPushFailed;
// Number of values to push, fixed at construction.
117 size_t const m_nPushCount;
// Consumer thread: pops values from the queue under test and records, per producer,
// the sequence numbers it received.
// NOTE(review): analyze() also reads m_nPopped, m_nPopEmpty and m_nBadWriter from this
// class; their declarations and updates are not visible in this view.
120 template <class Queue>
121 class Consumer: public cds_test::thread
123 typedef cds_test::thread base_class;
// Number of values each producer pushes; used only to pre-size the per-producer vectors.
127 size_t const m_nPushPerProducer;
132 typedef std::vector<size_t> popped_data;
133 typedef std::vector<size_t>::iterator data_iterator;
134 typedef std::vector<size_t>::const_iterator const_data_iterator;
// m_WriterData[w] holds the nNo values popped by this consumer that carry
// nWriterNo == w, in the order they were popped.
136 std::vector<popped_data> m_WriterData;
// Creates one vector per producer and reserves room for everything that producer
// pushes, so recording a popped value does not have to reallocate.
139 void initPoppedData()
141 const size_t nProducerCount = s_nProducerThreadCount;
142 m_WriterData.resize( nProducerCount );
143 for ( size_t i = 0; i < nProducerCount; ++i )
144 m_WriterData[i].reserve( m_nPushPerProducer );
// pool             - thread pool handed on to the base class
// queue            - queue under test
// nPushPerProducer - number of values each producer pushes
148 Consumer( cds_test::thread_pool& pool, Queue& queue, size_t nPushPerProducer )
// Tags the thread as consumer_thread; analyze() selects consumers by this type.
149 : base_class( pool, consumer_thread )
151 , m_nPushPerProducer( nPushPerProducer )
// Copy constructor used by clone(): the copy is initialized from the source's queue
// and per-producer push count.
158 Consumer( Consumer& src )
160 , m_Queue( src.m_Queue )
161 , m_nPushPerProducer( src.m_nPushPerProducer )
// Returns a heap-allocated copy of this consumer.
// NOTE(review): ownership of the clone is not visible here - presumably the thread pool.
169 virtual thread * clone()
171 return new Consumer( *this );
// Pop loop body. A popped value is recorded only when its writer number is a valid
// producer index.
// NOTE(review): the loop header, the handling of an out-of-range writer number and the
// exit path are not visible in this view.
179 const size_t nTotalWriters = s_nProducerThreadCount;
182 if ( m_Queue.pop( v )) {
184 if ( v.nWriterNo < nTotalWriters )
185 m_WriterData[ v.nWriterNo ].push_back( v.nNo );
// Reached when the pop above did not succeed (presumably - the else is not visible):
// once every producer has reported completion, the queue is checked for emptiness.
192 if ( s_nProducerDone.load() >= nTotalWriters ) {
193 if ( m_Queue.empty())
// Number of values each producer pushes; computed in test_queue() as
// s_nQueueSize / s_nProducerThreadCount and used by analyze() as the expected length
// of every producer's sequence.
202 size_t m_nThreadPushCount;
// Verifies the outcome of a finished run.
//   q            - the queue that was tested
//   (2nd param)  - unused; its name nLeftOffset is commented out
//   nRightOffset - tolerance added to the right-hand side of the per-consumer ordering
//                  check below (0 demands strictly increasing order)
205 template <class Queue>
206 void analyze( Queue& q, size_t /*nLeftOffset*/ = 0, size_t nRightOffset = 0 )
208 cds_test::thread_pool& pool = get_pool();
210 typedef Consumer<Queue> consumer_type;
211 typedef Producer<Queue> producer_type;
// NOTE(review): the code that updates nPostTestPops is not visible in this view;
// presumably it counts values drained from q after the threads stopped - confirm.
213 size_t nPostTestPops = 0;
// Totals accumulated over all threads in the pool.
220 size_t nTotalPops = 0;
221 size_t nPopFalse = 0;
222 size_t nPoppedItems = 0;
223 size_t nPushFailed = 0;
// Consumers found in the pool, kept for the sequence check further down.
225 std::vector< consumer_type * > arrConsumer;
227 for ( size_t i = 0; i < pool.size(); ++i ) {
228 cds_test::thread& thr = pool.get(i);
// The thread type tag set in the constructors decides which downcast is valid.
229 if ( thr.type() == consumer_thread ) {
230 consumer_type& consumer = static_cast<consumer_type&>( thr );
231 nTotalPops += consumer.m_nPopped;
232 nPopFalse += consumer.m_nPopEmpty;
233 arrConsumer.push_back( &consumer );
234 EXPECT_EQ( consumer.m_nBadWriter, 0u ) << "consumer_thread_no " << i;
// Sum the number of values this consumer recorded across all producers.
237 for ( size_t n = 0; n < s_nProducerThreadCount; ++n )
238 nPopped += consumer.m_WriterData[n].size();
240 nPoppedItems += nPopped;
// Any thread that is not a consumer must be a producer.
243 assert( thr.type() == producer_thread );
245 producer_type& producer = static_cast<producer_type&>( thr );
246 nPushFailed += producer.m_nPushFailed;
247 EXPECT_EQ( producer.m_nPushFailed, 0u ) << "producer_thread_no " << i;
// The consumers' pop counters must agree with the number of recorded values.
250 EXPECT_EQ( nTotalPops, nPoppedItems );
// Everything that was pushed must have been popped, and the queue must be empty.
252 EXPECT_EQ( nTotalPops + nPostTestPops, s_nQueueSize ) << "nTotalPops=" << nTotalPops << ", nPostTestPops=" << nPostTestPops;
253 EXPECT_TRUE( q.empty());
255 // Test consistency of popped sequence
256 for ( size_t nWriter = 0; nWriter < s_nProducerThreadCount; ++nWriter ) {
// arrData collects every value of producer nWriter from all consumers.
257 std::vector<size_t> arrData;
258 arrData.reserve( m_nThreadPushCount );
259 for ( size_t nReader = 0; nReader < arrConsumer.size(); ++nReader ) {
260 auto it = arrConsumer[nReader]->m_WriterData[nWriter].begin();
261 auto itEnd = arrConsumer[nReader]->m_WriterData[nWriter].end();
// Within one consumer the values of one producer must arrive in increasing order,
// up to the nRightOffset tolerance.
// NOTE(review): the declaration and update of itPrev are not visible in this view.
264 for ( ++it; it != itEnd; ++it ) {
265 EXPECT_LT( *itPrev, *it + nRightOffset ) << "consumer=" << nReader << ", producer=" << nWriter;
270 for ( it = arrConsumer[nReader]->m_WriterData[nWriter].begin(); it != itEnd; ++it )
271 arrData.push_back( *it );
// After sorting, the merged values must be consecutive, start at 0 and end at
// m_nThreadPushCount - 1, i.e. no value was lost or duplicated.
274 std::sort( arrData.begin(), arrData.end());
275 for ( size_t i=1; i < arrData.size(); ++i ) {
276 EXPECT_EQ( arrData[i - 1] + 1, arrData[i] ) << "producer=" << nWriter;
// NOTE(review): no emptiness check on arrData is visible before these two element
// accesses; if nothing was recorded for this producer they read out of bounds.
279 EXPECT_EQ( arrData[0], 0u ) << "producer=" << nWriter;
280 EXPECT_EQ( arrData[arrData.size() - 1], m_nThreadPushCount - 1 ) << "producer=" << nWriter;
// Runs the producer and consumer threads against q and reports the elapsed time.
// Result checking is done separately by analyze().
284 template <class Queue>
285 void test_queue( Queue& q )
// Integer division: a remainder of s_nQueueSize is dropped here.
287 m_nThreadPushCount = s_nQueueSize / s_nProducerThreadCount;
289 cds_test::thread_pool& pool = get_pool();
// NOTE(review): the second argument is the requested thread count; presumably the pool
// obtains the additional threads through clone() - confirm in cds_test::thread_pool.
290 pool.add( new Producer<Queue>( pool, q, m_nThreadPushCount ), s_nProducerThreadCount );
291 pool.add( new Consumer<Queue>( pool, q, m_nThreadPushCount ), s_nConsumerThreadCount );
293 s_nProducerDone.store( 0 );
// Make s_nQueueSize the exact number of values that will be pushed, so that the
// total-pop check in analyze() compares against the real count.
294 s_nQueueSize = m_nThreadPushCount * s_nProducerThreadCount;
296 propout() << std::make_pair( "producer_count", s_nProducerThreadCount )
297 << std::make_pair( "consumer_count", s_nConsumerThreadCount )
298 << std::make_pair( "push_count", s_nQueueSize );
300 std::chrono::milliseconds duration = pool.run();
302 propout() << std::make_pair( "duration", duration );
// Test entry point for a queue instance; writes the queue's statistics to the
// property output.
// NOTE(review): the statements preceding the statistics output are not visible in this
// view - presumably test_queue( q ) followed by analyze( q ); confirm.
305 template <class Queue>
306 void test( Queue& q )
310 propout() << q.statistics();
// Forwards size to value_type::set_array_size() when value_type provides it.
// The choice is made at compile time: fc_test::has_set_array_size<value_type>::value
// is turned into std::true_type / std::false_type to select one of the overloads below.
314 static void set_array_size( size_t size ) {
315 const bool tmp = fc_test::has_set_array_size<value_type>::value;
316 set_array_size(size, std::integral_constant<bool, tmp>());
// Selected when value_type has set_array_size: pass the size on.
319 static void set_array_size(size_t size, std::true_type){
320 value_type::set_array_size(size);
// Selected otherwise; the size argument is unnamed and no body is visible in this view.
323 static void set_array_size(size_t, std::false_type)
// Loads the test parameters from the "queue_push_pop" configuration section.
// Each cfg.get_size_t() call passes the current value of the variable as its second
// argument (presumably the default used when the key is absent).
328 static void SetUpTestCase()
330 cds_test::config const& cfg = get_config( "queue_push_pop" );
332 s_nConsumerThreadCount = cfg.get_size_t( "ConsumerCount", s_nConsumerThreadCount );
333 s_nProducerThreadCount = cfg.get_size_t( "ProducerCount", s_nProducerThreadCount );
334 s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
336 s_nMSQueueSize = cfg.get_size_t( "MSQueueSize", s_nMSQueueSize );
337 s_nMoirQueueSize = cfg.get_size_t( "MoirQueueSize", s_nMoirQueueSize );
338 s_nBasketQueueSize = cfg.get_size_t( "BasketQueueSize", s_nBasketQueueSize );
339 s_nOptimisticQueueSize = cfg.get_size_t( "OptimisticQueueSize", s_nOptimisticQueueSize );
340 s_nRWQueueSize = cfg.get_size_t( "RWQueueSize", s_nRWQueueSize );
342 s_nVyukovQueueSize = cfg.get_size_t( "VyukovQueueSize", s_nVyukovQueueSize );
343 s_nSegmentedQueueSize = cfg.get_size_t( "SegmentedQueueSize", s_nSegmentedQueueSize );
344 s_nHeavyValueSize = cfg.get_size_t( "HeavyValueSize", s_nHeavyValueSize );
// Zero is not usable for these parameters; at least one thread of each kind is needed.
346 if ( s_nConsumerThreadCount == 0u )
347 s_nConsumerThreadCount = 1;
348 if ( s_nProducerThreadCount == 0u )
349 s_nProducerThreadCount = 1;
// NOTE(review): the replacement value assigned for a zero queue size is not visible
// in this view.
350 if ( s_nQueueSize == 0u )
352 if ( s_nHeavyValueSize == 0 )
353 s_nHeavyValueSize = 1;
// Pass the heavy-value size on to value_type, if it supports it (see set_array_size).
355 set_array_size( s_nHeavyValueSize );
358 //static void TearDownTestCase();
// Fixture instantiated with fc_test::heavy_value<36000> as the queue element type.
// NOTE(review): no use of this alias is visible in this view.
361 using fc_with_heavy_value = queue_push_pop< fc_test::heavy_value<36000> >;
// Fixture with the default element type; used by the test macros below.
362 using simple_queue_push_pop = queue_push_pop<>;
// Test body for the MSQueue family: every generated TEST_F takes its queue type from
// queue::Types< value_type >::<type_name> and sets s_nQueueSize to s_nMSQueueSize.
// NOTE(review): CDSSTRESS_MSQueue is defined outside the visible code (presumably in
// queue_type.h) and presumably expands to one CDSSTRESS_Queue_F per queue variant.
364 #undef CDSSTRESS_Queue_F
365 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
366 TEST_F( test_fixture, type_name ) \
368 typedef queue::Types< value_type >::type_name queue_type; \
370 s_nQueueSize = s_nMSQueueSize; \
374 CDSSTRESS_MSQueue( simple_queue_push_pop )
// Test body for the MoirQueue family: same shape as the previous definition, but the
// item count is taken from s_nMoirQueueSize.
// NOTE(review): CDSSTRESS_MoirQueue is defined outside the visible code.
376 #undef CDSSTRESS_Queue_F
377 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
378 TEST_F( test_fixture, type_name ) \
380 typedef queue::Types< value_type >::type_name queue_type; \
382 s_nQueueSize = s_nMoirQueueSize; \
385 CDSSTRESS_MoirQueue( simple_queue_push_pop )
// Test body for the BasketQueue family: the item count is taken from
// s_nBasketQueueSize.
// NOTE(review): CDSSTRESS_BasketQueue is defined outside the visible code.
387 #undef CDSSTRESS_Queue_F
388 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
389 TEST_F( test_fixture, type_name ) \
391 typedef queue::Types< value_type >::type_name queue_type; \
393 s_nQueueSize = s_nBasketQueueSize; \
396 CDSSTRESS_BasketQueue( simple_queue_push_pop )
// Test body for the OptimisticQueue family: the item count is taken from
// s_nOptimisticQueueSize.
// NOTE(review): the invoked macro is spelled CDSSTRESS_OptimsticQueue; it is defined
// outside the visible code, so the spelling must match that definition.
398 #undef CDSSTRESS_Queue_F
399 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
400 TEST_F( test_fixture, type_name ) \
402 typedef queue::Types< value_type >::type_name queue_type; \
404 s_nQueueSize = s_nOptimisticQueueSize; \
407 CDSSTRESS_OptimsticQueue( simple_queue_push_pop )
// Test body for the RWQueue family: the item count is taken from s_nRWQueueSize.
// NOTE(review): CDSSTRESS_RWQueue is defined outside the visible code.
409 #undef CDSSTRESS_Queue_F
410 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
411 TEST_F( test_fixture, type_name ) \
413 typedef queue::Types< value_type >::type_name queue_type; \
415 s_nQueueSize = s_nRWQueueSize; \
418 CDSSTRESS_RWQueue( simple_queue_push_pop )
// Test body for the VyukovQueue family. Unlike the definitions above it saves the
// current s_nQueueSize, replaces it with s_nVyukovQueueSize, passes that value to the
// queue constructor (presumably as the queue capacity) and restores the saved value
// at the end. The invocation at the bottom is commented out, so this definition
// currently generates no tests.
420 #undef CDSSTRESS_Queue_F
421 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
422 TEST_F( test_fixture, type_name ) \
424 size_t old_queue_size = s_nQueueSize; \
425 s_nQueueSize = s_nVyukovQueueSize; \
426 typedef queue::Types< value_type >::type_name queue_type; \
427 queue_type queue( s_nQueueSize ); \
429 s_nQueueSize = old_queue_size; \
432 //CDSSTRESS_VyukovQueue( simple_queue_push_pop )
434 // ********************************************************************
435 // SegmentedQueue test
// Value-parameterized fixture for SegmentedQueue: the size_t test parameter is the
// quasi factor passed to the queue constructor.
437 class segmented_queue_push_pop
438 : public queue_push_pop<>
439 , public ::testing::WithParamInterface< size_t >
441 typedef queue_push_pop<> base_class;
// Test body: builds a Queue with the current parameter, runs the push/pop threads and
// checks the result. analyze() receives quasi_factor as nRightOffset, i.e. the
// per-consumer ordering check tolerates values that are out of order by less than the
// quasi factor; the first offset argument is not used by analyze().
// NOTE(review): the function header following this template line is not visible here.
445 template <typename Queue>
448 size_t quasi_factor = GetParam();
450 Queue q( quasi_factor );
451 propout() << std::make_pair( "quasi_factor", quasi_factor );
452 base_class::test_queue( q );
453 analyze( q, quasi_factor * 2, quasi_factor );
454 propout() << q.statistics();
// Builds the list of quasi factors to test from the "queue_push_pop" configuration:
//   SegmentedQueue_Iterate      (bool, default false)
//   SegmentedQueue_SegmentSize  (size_t, default 256)
458 static std::vector< size_t > get_test_parameters()
460 cds_test::config const& cfg = cds_test::stress_fixture::get_config( "queue_push_pop" );
461 bool bIterative = cfg.get_bool( "SegmentedQueue_Iterate", false );
462 size_t quasi_factor = cfg.get_size_t( "SegmentedQueue_SegmentSize", 256 );
464 std::vector<size_t> args;
// Iterative mode: 4, 8, 16, ... doubling while the value does not exceed quasi_factor.
465 if ( bIterative && quasi_factor > 4 ) {
466 for ( size_t qf = 4; qf <= quasi_factor; qf *= 2 )
467 args.push_back( qf );
// The configured value itself is added when it is greater than 2.
// NOTE(review): how this check is nested relative to the branch above, and what is
// added for smaller values, is not visible in this view.
469 if ( quasi_factor > 2 )
470 args.push_back( quasi_factor );
// Test body for the SegmentedQueue family: generates a value-parameterized test
// (TEST_P) that sets s_nQueueSize to s_nSegmentedQueueSize and calls the fixture's
// test< queue_type >().
// NOTE(review): CDSSTRESS_SegmentedQueue is defined outside the visible code.
479 #undef CDSSTRESS_Queue_F
480 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
481 TEST_P( test_fixture, type_name ) \
483 typedef typename queue::Types<value_type>::type_name queue_type; \
484 s_nQueueSize = s_nSegmentedQueueSize; \
485 test< queue_type >(); \
488 CDSSTRESS_SegmentedQueue( segmented_queue_push_pop )
// Instantiates the SegmentedQueue tests with the quasi factors returned by
// get_test_parameters(). When the googletest version accepts a 4th argument, a name
// generator is supplied as well.
490 #ifdef CDSTEST_GTEST_INSTANTIATE_TEST_CASE_P_HAS_4TH_ARG
// Name generator: the decimal text of the parameter value.
491 static std::string get_test_parameter_name( testing::TestParamInfo<size_t> const& p )
493 return std::to_string( p.param );
495 INSTANTIATE_TEST_CASE_P( SQ,
496 segmented_queue_push_pop,
497 ::testing::ValuesIn( segmented_queue_push_pop::get_test_parameters()), get_test_parameter_name );
// Three-argument form without a name generator.
// NOTE(review): presumably this is the #else branch of the #ifdef above; the
// #else / #endif lines are not visible in this view.
499 INSTANTIATE_TEST_CASE_P( SQ,
500 segmented_queue_push_pop,
501 ::testing::ValuesIn( segmented_queue_push_pop::get_test_parameters()));