2 This file is a part of libcds - Concurrent Data Structures library
4 (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
6 Source code repo: http://github.com/khizmax/libcds/
7 Download: http://sourceforge.net/projects/libcds/files/
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
12 * Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #include "queue_type.h"
36 // Multi-threaded queue push/pop test
// Test-wide settings. The initializers are defaults; SetUpTestCase() replaces
// them with the "ConsumerCount", "ProducerCount" and "QueueSize" config values.
// Number of consumer threads that test_queue() adds to the pool.
39 static size_t s_nConsumerThreadCount = 4;
// Number of producer threads that test_queue() adds to the pool; consumers also
// use it as the upper bound for a valid nWriterNo.
40 static size_t s_nProducerThreadCount = 4;
// Total number of items for one run. test_queue() divides it between the
// producers and then rewrites it as (per-producer count * producer count), so
// the stored value can shrink when the division is not exact.
41 static size_t s_nQueueSize = 4000000;
// Count of producers that have finished: every producer does fetch_add( 1 )
// after its push loop, consumers compare it with the producer count, and
// test_queue() resets it to 0 before each run.
43 static std::atomic<size_t> s_nProducerDone( 0 );
// Stress-test fixture for concurrent push/pop: Producer threads push numbered
// items into a queue, Consumer threads pop and record them, and analyze()
// checks the recorded data once the run is over.
45 class queue_push_pop: public cds_test::stress_fixture
// Producer thread: pushes items into the shared queue until the item counter
// v.nNo reaches m_nPushCount, then reports completion through s_nProducerDone.
59 template <class Queue>
60 class Producer: public cds_test::thread
62 typedef cds_test::thread base_class;
// Registers the thread in `pool` with type producer_thread.
// nPushCount is the loop bound for this producer's pushes.
65 Producer( cds_test::thread_pool& pool, Queue& queue, size_t nPushCount )
66 : base_class( pool, producer_thread )
69 , m_nPushCount( nPushCount )
// Copying constructor used by clone(): the copy refers to the same queue
// object and keeps the same push count.
72 Producer( Producer& src )
74 , m_Queue( src.m_Queue )
76 , m_nPushCount( src.m_nPushCount )
// Returns a new heap-allocated copy of this producer.
// NOTE(review): the caller presumably takes ownership (the thread pool?) - confirm.
79 virtual thread * clone()
81 return new Producer( *this );
// Local copy of the push limit, used as the loop bound below.
86 size_t const nPushCount = m_nPushCount;
// Keep pushing until v.nNo reaches the limit.
// NOTE(review): v.nNo presumably advances only when push() succeeds and
// m_nPushFailed presumably counts rejected attempts - confirm against the full
// loop body.
92 while ( v.nNo < nPushCount ) {
93 if ( m_Queue.push( v ))
// Announce that this producer is finished; consumers compare the counter with
// the producer count before they stop polling an empty queue.
99 s_nProducerDone.fetch_add( 1 );
// Read by analyze(), which sums it over all producers and expects 0 per thread.
104 size_t m_nPushFailed;
// Number of items this producer has to push (fixed at construction).
105 size_t const m_nPushCount;
// Consumer thread: pops items from the shared queue and records each item's
// sequence number (v.nNo) in a per-producer list selected by v.nWriterNo.
108 template <class Queue>
109 class Consumer: public cds_test::thread
111 typedef cds_test::thread base_class;
// Number of items every producer pushes; used only to pre-size the lists below.
115 size_t const m_nPushPerProducer;
// Sequence numbers popped from one producer, in the order this consumer saw them.
120 typedef std::vector<size_t> popped_data;
121 typedef std::vector<size_t>::iterator data_iterator;
122 typedef std::vector<size_t>::const_iterator const_data_iterator;
// m_WriterData[w] holds the nNo values this consumer popped from producer w.
124 std::vector<popped_data> m_WriterData;
// Creates one list per producer and reserves room for a producer's full output
// in each of them.
// NOTE(review): presumably this keeps push_back() from reallocating while the
// test is running - confirm the intent.
127 void initPoppedData()
129 const size_t nProducerCount = s_nProducerThreadCount;
130 m_WriterData.resize( nProducerCount );
131 for ( size_t i = 0; i < nProducerCount; ++i )
132 m_WriterData[i].reserve( m_nPushPerProducer );
// Registers the thread in `pool` with type consumer_thread and stores the
// per-producer item count.
136 Consumer( cds_test::thread_pool& pool, Queue& queue, size_t nPushPerProducer )
137 : base_class( pool, consumer_thread )
139 , m_nPushPerProducer( nPushPerProducer )
// Copying constructor used by clone(): the copy refers to the same queue
// object and keeps the same per-producer item count.
146 Consumer( Consumer& src )
148 , m_Queue( src.m_Queue )
149 , m_nPushPerProducer( src.m_nPushPerProducer )
// Returns a new heap-allocated copy of this consumer.
// NOTE(review): the caller presumably takes ownership (the thread pool?) - confirm.
157 virtual thread * clone()
159 return new Consumer( *this );
// Any nWriterNo at or above the producer count cannot index m_WriterData.
167 const size_t nTotalWriters = s_nProducerThreadCount;
// A successful pop is recorded under its producer when the writer index is valid.
// NOTE(review): an out-of-range index is presumably counted in m_nBadWriter,
// which analyze() expects to be 0 - confirm.
170 if ( m_Queue.pop( v ) ) {
172 if ( v.nWriterNo < nTotalWriters )
173 m_WriterData[ v.nWriterNo ].push_back( v.nNo );
// Stop condition: the queue is empty, every producer has reported completion,
// and the queue is still empty afterwards. The second empty() check covers
// items pushed between the first check and the load of s_nProducerDone.
// NOTE(review): the action taken when all three conditions hold is presumably
// leaving the pop loop - confirm.
180 if ( m_Queue.empty() ) {
181 if ( s_nProducerDone.load() >= nTotalWriters ) {
182 if ( m_Queue.empty() )
// Number of items each producer pushes. test_queue() sets it to
// s_nQueueSize / s_nProducerThreadCount; analyze() uses it to size its merge
// buffer and as the expected last sequence number plus one.
191 size_t m_nThreadPushCount;
// Verifies the outcome of a finished run.
//   q            - the queue that was tested; expected to be empty at the end.
//   (2nd param)  - unused; its name is commented out in the signature.
//   nRightOffset - slack added to the right-hand side of the per-consumer
//                  ordering check; 0 requires strictly increasing numbers.
// Steps: collect counters from every pool thread, compare the totals with
// s_nQueueSize, then check ordering and completeness for each producer.
194 template <class Queue>
195 void analyze( Queue& q, size_t /*nLeftOffset*/ = 0, size_t nRightOffset = 0 )
197 cds_test::thread_pool& pool = get_pool();
// Local aliases that shadow the class templates with their instantiation for
// this Queue type.
199 typedef Consumer<Queue> Consumer;
200 typedef Producer<Queue> Producer;
// NOTE(review): presumably the number of items still drained from q after the
// threads stopped - the code that updates it is not shown here; confirm.
202 size_t nPostTestPops = 0;
// Totals accumulated over all threads in the pool.
209 size_t nTotalPops = 0;
210 size_t nPopFalse = 0;
211 size_t nPoppedItems = 0;
212 size_t nPushFailed = 0;
// Consumers are remembered so their recorded data can be cross-checked below.
214 std::vector< Consumer * > arrConsumer;
// Visit every thread in the pool and dispatch on its type.
216 for ( size_t i = 0; i < pool.size(); ++i ) {
217 cds_test::thread& thr = pool.get(i);
218 if ( thr.type() == consumer_thread ) {
219 Consumer& consumer = static_cast<Consumer&>( thr );
220 nTotalPops += consumer.m_nPopped;
221 nPopFalse += consumer.m_nPopEmpty;
222 arrConsumer.push_back( &consumer );
223 EXPECT_EQ( consumer.m_nBadWriter, 0 ) << "consumer_thread_no " << i;
// Count the items this consumer actually recorded, over all producers.
226 for ( size_t n = 0; n < s_nProducerThreadCount; ++n )
227 nPopped += consumer.m_WriterData[n].size();
229 nPoppedItems += nPopped;
// Any thread that is not a consumer must be a producer.
232 assert( thr.type() == producer_thread );
234 Producer& producer = static_cast<Producer&>( thr );
235 nPushFailed += producer.m_nPushFailed;
236 EXPECT_EQ( producer.m_nPushFailed, 0 ) << "producer_thread_no " << i;
// The pop counters must agree with the number of recorded items.
239 EXPECT_EQ( nTotalPops, nPoppedItems );
// Every pushed item must have been popped, either during or after the run.
241 EXPECT_EQ( nTotalPops + nPostTestPops, s_nQueueSize ) << "nTotalPops=" << nTotalPops << ", nPostTestPops=" << nPostTestPops;
242 EXPECT_TRUE( q.empty() );
244 // Test consistency of popped sequence
245 for ( size_t nWriter = 0; nWriter < s_nProducerThreadCount; ++nWriter ) {
// Collects everything popped from this producer, merged over all consumers.
246 std::vector<size_t> arrData;
247 arrData.reserve( m_nThreadPushCount );
248 for ( size_t nReader = 0; nReader < arrConsumer.size(); ++nReader ) {
249 auto it = arrConsumer[nReader]->m_WriterData[nWriter].begin();
250 auto itEnd = arrConsumer[nReader]->m_WriterData[nWriter].end();
// Within a single consumer, each item from this producer must be smaller than
// its successor plus nRightOffset.
253 for ( ++it; it != itEnd; ++it ) {
254 EXPECT_LT( *itPrev, *it + nRightOffset ) << "consumer=" << nReader << ", producer=" << nWriter;
// Append this consumer's items to the merged list.
259 for ( it = arrConsumer[nReader]->m_WriterData[nWriter].begin(); it != itEnd; ++it )
260 arrData.push_back( *it );
// After sorting, the merged numbers must be consecutive (no gaps, no duplicates).
263 std::sort( arrData.begin(), arrData.end() );
264 for ( size_t i=1; i < arrData.size(); ++i ) {
265 EXPECT_EQ( arrData[i - 1] + 1, arrData[i] ) << "producer=" << nWriter;
// The sequence must start at 0 and end at m_nThreadPushCount - 1.
// NOTE(review): no emptiness guard is visible before arrData[0] and
// arrData[arrData.size() - 1]; confirm arrData cannot be empty here.
268 EXPECT_EQ( arrData[0], 0 ) << "producer=" << nWriter;
269 EXPECT_EQ( arrData[arrData.size() - 1], m_nThreadPushCount - 1 ) << "producer=" << nWriter;
// Runs one producer/consumer stress pass over q and reports its duration.
// The results are not checked here; analyze() does that.
273 template <class Queue>
274 void test_queue( Queue& q )
// Integer division: each producer pushes the same number of items.
276 m_nThreadPushCount = s_nQueueSize / s_nProducerThreadCount;
278 cds_test::thread_pool& pool = get_pool();
// The second argument of add() is the number of threads of that kind.
// NOTE(review): the pool presumably obtains the extra threads via clone() - confirm.
279 pool.add( new Producer<Queue>( pool, q, m_nThreadPushCount ), s_nProducerThreadCount );
280 pool.add( new Consumer<Queue>( pool, q, m_nThreadPushCount ), s_nConsumerThreadCount );
// No producer has finished yet.
282 s_nProducerDone.store( 0 );
// Store the exact number of items that will be pushed, so the total check in
// analyze() holds even when the division above truncated.
283 s_nQueueSize = m_nThreadPushCount * s_nProducerThreadCount;
// Record the run parameters in the test's property output.
285 propout() << std::make_pair( "producer_count", s_nProducerThreadCount )
286 << std::make_pair( "consumer_count", s_nConsumerThreadCount )
287 << std::make_pair( "push_count", s_nQueueSize );
// run() returns the duration of the run in milliseconds.
289 std::chrono::milliseconds duration = pool.run();
291 propout() << std::make_pair( "duration", duration );
// Test entry point for a queue instance; the visible statement writes the
// queue's statistics to the property output.
// NOTE(review): presumably test_queue( q ) and analyze( q ) run before the
// statistics are emitted, as in segmented_queue_push_pop - confirm.
294 template <class Queue>
295 void test( Queue& q )
299 propout() << q.statistics();
// Loads the test settings from the "queue_push_pop" configuration section.
// The current values of the statics act as defaults for keys that are absent.
303 static void SetUpTestCase()
\r
305 cds_test::config const& cfg = get_config( "queue_push_pop" );
\r
307 s_nConsumerThreadCount = cfg.get_size_t( "ConsumerCount", s_nConsumerThreadCount );
308 s_nProducerThreadCount = cfg.get_size_t( "ProducerCount", s_nProducerThreadCount );
309 s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
// A thread count of 0 is replaced by 1 so that at least one consumer and one
// producer exist (the producer count is also a divisor in test_queue()).
311 if ( s_nConsumerThreadCount == 0 )
312 s_nConsumerThreadCount = 1;
313 if ( s_nProducerThreadCount == 0 )
314 s_nProducerThreadCount = 1;
// NOTE(review): a queue size of 0 is presumably replaced by a non-zero
// fallback - the assignment is not shown here; confirm the value.
315 if ( s_nQueueSize == 0 )
319 //static void TearDownTestCase();
\r
// Test instantiations for the queue_push_pop fixture, one macro per queue family.
// NOTE(review): the CDSSTRESS_* macros are not defined in this file; presumably
// they come from "queue_type.h" and expand to one test per queue type - confirm.
322 CDSSTRESS_MSQueue( queue_push_pop )
323 CDSSTRESS_MoirQueue( queue_push_pop )
324 CDSSTRESS_BasketQueue( queue_push_pop )
325 CDSSTRESS_OptimsticQueue( queue_push_pop )
326 CDSSTRESS_FCQueue( queue_push_pop )
327 CDSSTRESS_FCDeque( queue_push_pop )
328 CDSSTRESS_RWQueue( queue_push_pop )
329 CDSSTRESS_StdQueue( queue_push_pop )
// Redefine the per-test macro for queues whose constructor takes an argument:
// the test body builds the queue as queue_type queue( s_nQueueSize ).
// NOTE(review): s_nQueueSize is presumably the capacity of these queues - confirm.
// The definition is removed again after the Tsigas/Vyukov instantiations below.
331 #undef CDSSTRESS_Queue_F
332 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
333 TEST_F( test_fixture, type_name ) \
335 typedef queue::Types< value_type >::type_name queue_type; \
336 queue_type queue( s_nQueueSize ); \
340 CDSSTRESS_TsigasQueue( queue_push_pop )
341 CDSSTRESS_VyukovQueue( queue_push_pop )
343 #undef CDSSTRESS_Queue_F
346 // ********************************************************************
347 // SegmentedQueue test
// Value-parameterized variant of queue_push_pop: the size_t test parameter is
// the quasi factor that is passed to the queue's constructor.
349 class segmented_queue_push_pop
350 : public queue_push_pop
351 , public ::testing::WithParamInterface< size_t >
353 typedef queue_push_pop base_class;
// Test body for one queue type: builds the queue from the current parameter,
// runs the stress pass, analyses the result and reports the statistics.
357 template <typename Queue>
360 size_t quasi_factor = GetParam();
362 Queue q( quasi_factor );
363 propout() << std::make_pair( "quasi_factor", quasi_factor );
364 base_class::test_queue( q );
// quasi_factor is passed as nRightOffset, which loosens the per-consumer
// ordering check in analyze(); the second argument is unused there.
365 analyze( q, quasi_factor * 2, quasi_factor );
366 propout() << q.statistics();
// Builds the list of quasi factors to test from the "queue_push_pop"
// configuration section:
//   "SegmentedQueue_Iterate"     (default false) - test a series of sizes
//   "SegmentedQueue_SegmentSize" (default 256)   - the (largest) quasi factor
370 static std::vector< size_t > get_test_parameters()
372 cds_test::config const& cfg = cds_test::stress_fixture::get_config( "queue_push_pop" );
373 bool bIterative = cfg.get_bool( "SegmentedQueue_Iterate", false );
374 size_t quasi_factor = cfg.get_size_t( "SegmentedQueue_SegmentSize", 256 );
376 std::vector<size_t> args;
// Iterative mode: 4, 8, 16, ... doubling while the value is <= quasi_factor.
377 if ( bIterative && quasi_factor > 4 ) {
378 for ( size_t qf = 4; qf <= quasi_factor; qf *= 2 )
379 args.push_back( qf );
// NOTE(review): presumably this is the non-iterative (else) branch; if it ran
// after the loop as well, a power-of-two quasi_factor would be added twice -
// confirm. A quasi_factor of 2 or less is not added by this statement.
382 if ( quasi_factor > 2 )
383 args.push_back( quasi_factor );
// Per-test macro for the segmented queue: a value-parameterized test (TEST_P)
// that calls the fixture's test< queue_type >() for the given queue type.
392 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
393 TEST_P( test_fixture, type_name ) \
395 typedef typename queue::Types<value_type>::type_name queue_type; \
396 test< queue_type >(); \
399 CDSSTRESS_SegmentedQueue( segmented_queue_push_pop )
// Instantiate the parameterized tests under the prefix "SQ", once for every
// quasi factor returned by get_test_parameters().
401 INSTANTIATE_TEST_CASE_P( SQ,
402 segmented_queue_push_pop,
403 ::testing::ValuesIn( segmented_queue_push_pop::get_test_parameters()));