2 This file is a part of libcds - Concurrent Data Structures library
4 (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
6 Source code repo: http://github.com/khizmax/libcds/
7 Download: http://sourceforge.net/projects/libcds/files/
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
12 * Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #include "queue_type.h"
35 #include <type_traits>
37 // Multi-threaded queue push/pop test
40 static size_t s_nConsumerThreadCount = 4;
41 static size_t s_nProducerThreadCount = 4;
42 static size_t s_nQueueSize = 4000000;
43 static size_t s_nHeavyValueSize = 100;
45 static std::atomic<size_t> s_nProducerDone( 0 );
53 template<class Value = old_value>
54 class queue_push_pop: public cds_test::stress_fixture
57 using value_type = Value;
64 template <class Queue>
65 class Producer: public cds_test::thread
67 typedef cds_test::thread base_class;
70 Producer( cds_test::thread_pool& pool, Queue& queue, size_t nPushCount )
71 : base_class( pool, producer_thread )
74 , m_nPushCount( nPushCount )
77 Producer( Producer& src )
79 , m_Queue( src.m_Queue )
81 , m_nPushCount( src.m_nPushCount )
84 virtual thread * clone()
86 return new Producer( *this );
91 size_t const nPushCount = m_nPushCount;
97 while ( v.nNo < nPushCount ) {
98 if ( m_Queue.push( v ))
104 s_nProducerDone.fetch_add( 1 );
109 size_t m_nPushFailed;
110 size_t const m_nPushCount;
113 template <class Queue>
114 class Consumer: public cds_test::thread
116 typedef cds_test::thread base_class;
120 size_t const m_nPushPerProducer;
125 typedef std::vector<size_t> popped_data;
126 typedef std::vector<size_t>::iterator data_iterator;
127 typedef std::vector<size_t>::const_iterator const_data_iterator;
129 std::vector<popped_data> m_WriterData;
132 void initPoppedData()
134 const size_t nProducerCount = s_nProducerThreadCount;
135 m_WriterData.resize( nProducerCount );
136 for ( size_t i = 0; i < nProducerCount; ++i )
137 m_WriterData[i].reserve( m_nPushPerProducer );
141 Consumer( cds_test::thread_pool& pool, Queue& queue, size_t nPushPerProducer )
142 : base_class( pool, consumer_thread )
144 , m_nPushPerProducer( nPushPerProducer )
151 Consumer( Consumer& src )
153 , m_Queue( src.m_Queue )
154 , m_nPushPerProducer( src.m_nPushPerProducer )
162 virtual thread * clone()
164 return new Consumer( *this );
172 const size_t nTotalWriters = s_nProducerThreadCount;
175 if ( m_Queue.pop( v )) {
177 if ( v.nWriterNo < nTotalWriters )
178 m_WriterData[ v.nWriterNo ].push_back( v.nNo );
185 if ( s_nProducerDone.load() >= nTotalWriters ) {
186 if ( m_Queue.empty())
195 size_t m_nThreadPushCount;
198 template <class Queue>
199 void analyze( Queue& q, size_t /*nLeftOffset*/ = 0, size_t nRightOffset = 0 )
201 cds_test::thread_pool& pool = get_pool();
203 typedef Consumer<Queue> Consumer;
204 typedef Producer<Queue> Producer;
206 size_t nPostTestPops = 0;
213 size_t nTotalPops = 0;
214 size_t nPopFalse = 0;
215 size_t nPoppedItems = 0;
216 size_t nPushFailed = 0;
218 std::vector< Consumer * > arrConsumer;
220 for ( size_t i = 0; i < pool.size(); ++i ) {
221 cds_test::thread& thr = pool.get(i);
222 if ( thr.type() == consumer_thread ) {
223 Consumer& consumer = static_cast<Consumer&>( thr );
224 nTotalPops += consumer.m_nPopped;
225 nPopFalse += consumer.m_nPopEmpty;
226 arrConsumer.push_back( &consumer );
227 EXPECT_EQ( consumer.m_nBadWriter, 0u ) << "consumer_thread_no " << i;
230 for ( size_t n = 0; n < s_nProducerThreadCount; ++n )
231 nPopped += consumer.m_WriterData[n].size();
233 nPoppedItems += nPopped;
236 assert( thr.type() == producer_thread );
238 Producer& producer = static_cast<Producer&>( thr );
239 nPushFailed += producer.m_nPushFailed;
240 EXPECT_EQ( producer.m_nPushFailed, 0u ) << "producer_thread_no " << i;
243 EXPECT_EQ( nTotalPops, nPoppedItems );
245 EXPECT_EQ( nTotalPops + nPostTestPops, s_nQueueSize ) << "nTotalPops=" << nTotalPops << ", nPostTestPops=" << nPostTestPops;
246 EXPECT_TRUE( q.empty());
248 // Test consistency of popped sequence
249 for ( size_t nWriter = 0; nWriter < s_nProducerThreadCount; ++nWriter ) {
250 std::vector<size_t> arrData;
251 arrData.reserve( m_nThreadPushCount );
252 for ( size_t nReader = 0; nReader < arrConsumer.size(); ++nReader ) {
253 auto it = arrConsumer[nReader]->m_WriterData[nWriter].begin();
254 auto itEnd = arrConsumer[nReader]->m_WriterData[nWriter].end();
257 for ( ++it; it != itEnd; ++it ) {
258 EXPECT_LT( *itPrev, *it + nRightOffset ) << "consumer=" << nReader << ", producer=" << nWriter;
263 for ( it = arrConsumer[nReader]->m_WriterData[nWriter].begin(); it != itEnd; ++it )
264 arrData.push_back( *it );
267 std::sort( arrData.begin(), arrData.end());
268 for ( size_t i=1; i < arrData.size(); ++i ) {
269 EXPECT_EQ( arrData[i - 1] + 1, arrData[i] ) << "producer=" << nWriter;
272 EXPECT_EQ( arrData[0], 0u ) << "producer=" << nWriter;
273 EXPECT_EQ( arrData[arrData.size() - 1], m_nThreadPushCount - 1 ) << "producer=" << nWriter;
277 template <class Queue>
278 void test_queue( Queue& q )
280 m_nThreadPushCount = s_nQueueSize / s_nProducerThreadCount;
282 cds_test::thread_pool& pool = get_pool();
283 pool.add( new Producer<Queue>( pool, q, m_nThreadPushCount ), s_nProducerThreadCount );
284 pool.add( new Consumer<Queue>( pool, q, m_nThreadPushCount ), s_nConsumerThreadCount );
286 s_nProducerDone.store( 0 );
287 s_nQueueSize = m_nThreadPushCount * s_nProducerThreadCount;
289 propout() << std::make_pair( "producer_count", s_nProducerThreadCount )
290 << std::make_pair( "consumer_count", s_nConsumerThreadCount )
291 << std::make_pair( "push_count", s_nQueueSize );
293 std::chrono::milliseconds duration = pool.run();
295 propout() << std::make_pair( "duration", duration );
298 template <class Queue>
299 void test( Queue& q )
303 propout() << q.statistics();
307 static void set_array_size( size_t size ) {
308 const bool tmp = fc_test::has_set_array_size<value_type>::value;
309 set_array_size(size, std::integral_constant<bool, tmp>());
312 static void set_array_size(size_t size, std::true_type){
313 value_type::set_array_size(size);
316 static void set_array_size(size_t, std::false_type)
321 static void SetUpTestCase()
323 cds_test::config const& cfg = get_config( "queue_push_pop" );
325 s_nConsumerThreadCount = cfg.get_size_t( "ConsumerCount", s_nConsumerThreadCount );
326 s_nProducerThreadCount = cfg.get_size_t( "ProducerCount", s_nProducerThreadCount );
327 s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
328 s_nHeavyValueSize = cfg.get_size_t( "HeavyValueSize", s_nHeavyValueSize );
330 if ( s_nConsumerThreadCount == 0u )
331 s_nConsumerThreadCount = 1;
332 if ( s_nProducerThreadCount == 0u )
333 s_nProducerThreadCount = 1;
334 if ( s_nQueueSize == 0u )
336 if ( s_nHeavyValueSize == 0 )
337 s_nHeavyValueSize = 1;
339 set_array_size( s_nHeavyValueSize );
342 //static void TearDownTestCase();
345 using fc_with_heavy_value = queue_push_pop< fc_test::heavy_value<36000> >;
346 using simple_queue_push_pop = queue_push_pop<>;
348 CDSSTRESS_MSQueue( simple_queue_push_pop )
349 CDSSTRESS_MoirQueue( simple_queue_push_pop )
350 CDSSTRESS_BasketQueue( simple_queue_push_pop )
351 CDSSTRESS_OptimsticQueue( simple_queue_push_pop )
352 CDSSTRESS_FCQueue( simple_queue_push_pop )
353 CDSSTRESS_FCDeque( simple_queue_push_pop )
354 CDSSTRESS_FCDeque_HeavyValue( fc_with_heavy_value )
355 CDSSTRESS_RWQueue( simple_queue_push_pop )
356 CDSSTRESS_StdQueue( simple_queue_push_pop )
358 #undef CDSSTRESS_Queue_F
359 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
360 TEST_F( test_fixture, type_name ) \
362 typedef queue::Types< value_type >::type_name queue_type; \
363 queue_type queue( s_nQueueSize ); \
367 CDSSTRESS_VyukovQueue( simple_queue_push_pop )
369 #undef CDSSTRESS_Queue_F
372 // ********************************************************************
373 // SegmentedQueue test
375 class segmented_queue_push_pop
376 : public queue_push_pop<>
377 , public ::testing::WithParamInterface< size_t >
379 typedef queue_push_pop<> base_class;
383 template <typename Queue>
386 size_t quasi_factor = GetParam();
388 Queue q( quasi_factor );
389 propout() << std::make_pair( "quasi_factor", quasi_factor );
390 base_class::test_queue( q );
391 analyze( q, quasi_factor * 2, quasi_factor );
392 propout() << q.statistics();
396 static std::vector< size_t > get_test_parameters()
398 cds_test::config const& cfg = cds_test::stress_fixture::get_config( "queue_push_pop" );
399 bool bIterative = cfg.get_bool( "SegmentedQueue_Iterate", false );
400 size_t quasi_factor = cfg.get_size_t( "SegmentedQueue_SegmentSize", 256 );
402 std::vector<size_t> args;
403 if ( bIterative && quasi_factor > 4 ) {
404 for ( size_t qf = 4; qf <= quasi_factor; qf *= 2 )
405 args.push_back( qf );
407 if ( quasi_factor > 2 )
408 args.push_back( quasi_factor );
417 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
418 TEST_P( test_fixture, type_name ) \
420 typedef typename queue::Types<value_type>::type_name queue_type; \
421 test< queue_type >(); \
424 CDSSTRESS_SegmentedQueue( segmented_queue_push_pop )
426 INSTANTIATE_TEST_CASE_P( SQ,
427 segmented_queue_push_pop,
428 ::testing::ValuesIn( segmented_queue_push_pop::get_test_parameters()));