2 This file is a part of libcds - Concurrent Data Structures library
4 (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
6 Source code repo: http://github.com/khizmax/libcds/
7 Download: http://sourceforge.net/projects/libcds/files/
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
12 * Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #include "queue_type.h"
36 // Multi-threaded queue push/pop test
39 static size_t s_nConsumerThreadCount = 4;
40 static size_t s_nProducerThreadCount = 4;
41 static size_t s_nQueueSize = 4000000;
43 static std::atomic<size_t> s_nProducerDone( 0 );
45 class queue_push_pop: public cds_test::stress_fixture
59 template <class Queue>
60 class Producer: public cds_test::thread
62 typedef cds_test::thread base_class;
65 Producer( cds_test::thread_pool& pool, Queue& queue, size_t nPushCount )
66 : base_class( pool, producer_thread )
69 , m_nPushCount( nPushCount )
72 Producer( Producer& src )
74 , m_Queue( src.m_Queue )
76 , m_nPushCount( src.m_nPushCount )
79 virtual thread * clone()
81 return new Producer( *this );
86 size_t const nPushCount = m_nPushCount;
92 while ( v.nNo < nPushCount ) {
93 if ( m_Queue.push( v ))
99 s_nProducerDone.fetch_add( 1 );
104 size_t m_nPushFailed;
105 size_t const m_nPushCount;
108 template <class Queue>
109 class Consumer: public cds_test::thread
111 typedef cds_test::thread base_class;
115 size_t const m_nPushPerProducer;
120 typedef std::vector<size_t> popped_data;
121 typedef std::vector<size_t>::iterator data_iterator;
122 typedef std::vector<size_t>::const_iterator const_data_iterator;
124 std::vector<popped_data> m_WriterData;
127 void initPoppedData()
129 const size_t nProducerCount = s_nProducerThreadCount;
130 m_WriterData.resize( nProducerCount );
131 for ( size_t i = 0; i < nProducerCount; ++i )
132 m_WriterData[i].reserve( m_nPushPerProducer );
136 Consumer( cds_test::thread_pool& pool, Queue& queue, size_t nPushPerProducer )
137 : base_class( pool, consumer_thread )
139 , m_nPushPerProducer( nPushPerProducer )
146 Consumer( Consumer& src )
148 , m_Queue( src.m_Queue )
149 , m_nPushPerProducer( src.m_nPushPerProducer )
157 virtual thread * clone()
159 return new Consumer( *this );
167 const size_t nTotalWriters = s_nProducerThreadCount;
170 if ( m_Queue.pop( v ) ) {
172 if ( v.nWriterNo < nTotalWriters )
173 m_WriterData[ v.nWriterNo ].push_back( v.nNo );
180 if ( s_nProducerDone.load() >= nTotalWriters ) {
181 if ( m_Queue.empty() )
190 size_t m_nThreadPushCount;
193 template <class Queue>
194 void analyze( Queue& q, size_t /*nLeftOffset*/ = 0, size_t nRightOffset = 0 )
196 cds_test::thread_pool& pool = get_pool();
198 typedef Consumer<Queue> Consumer;
199 typedef Producer<Queue> Producer;
201 size_t nPostTestPops = 0;
208 size_t nTotalPops = 0;
209 size_t nPopFalse = 0;
210 size_t nPoppedItems = 0;
211 size_t nPushFailed = 0;
213 std::vector< Consumer * > arrConsumer;
215 for ( size_t i = 0; i < pool.size(); ++i ) {
216 cds_test::thread& thr = pool.get(i);
217 if ( thr.type() == consumer_thread ) {
218 Consumer& consumer = static_cast<Consumer&>( thr );
219 nTotalPops += consumer.m_nPopped;
220 nPopFalse += consumer.m_nPopEmpty;
221 arrConsumer.push_back( &consumer );
222 EXPECT_EQ( consumer.m_nBadWriter, 0 ) << "consumer_thread_no " << i;
225 for ( size_t n = 0; n < s_nProducerThreadCount; ++n )
226 nPopped += consumer.m_WriterData[n].size();
228 nPoppedItems += nPopped;
231 assert( thr.type() == producer_thread );
233 Producer& producer = static_cast<Producer&>( thr );
234 nPushFailed += producer.m_nPushFailed;
235 EXPECT_EQ( producer.m_nPushFailed, 0 ) << "producer_thread_no " << i;
238 EXPECT_EQ( nTotalPops, nPoppedItems );
240 EXPECT_EQ( nTotalPops + nPostTestPops, s_nQueueSize ) << "nTotalPops=" << nTotalPops << ", nPostTestPops=" << nPostTestPops;
241 EXPECT_TRUE( q.empty() );
243 // Test consistency of popped sequence
244 for ( size_t nWriter = 0; nWriter < s_nProducerThreadCount; ++nWriter ) {
245 std::vector<size_t> arrData;
246 arrData.reserve( m_nThreadPushCount );
247 for ( size_t nReader = 0; nReader < arrConsumer.size(); ++nReader ) {
248 auto it = arrConsumer[nReader]->m_WriterData[nWriter].begin();
249 auto itEnd = arrConsumer[nReader]->m_WriterData[nWriter].end();
252 for ( ++it; it != itEnd; ++it ) {
253 EXPECT_LT( *itPrev, *it + nRightOffset ) << "consumer=" << nReader << ", producer=" << nWriter;
258 for ( it = arrConsumer[nReader]->m_WriterData[nWriter].begin(); it != itEnd; ++it )
259 arrData.push_back( *it );
262 std::sort( arrData.begin(), arrData.end() );
263 for ( size_t i=1; i < arrData.size(); ++i ) {
264 EXPECT_EQ( arrData[i - 1] + 1, arrData[i] ) << "producer=" << nWriter;
267 EXPECT_EQ( arrData[0], 0 ) << "producer=" << nWriter;
268 EXPECT_EQ( arrData[arrData.size() - 1], m_nThreadPushCount - 1 ) << "producer=" << nWriter;
272 template <class Queue>
273 void test_queue( Queue& q )
275 m_nThreadPushCount = s_nQueueSize / s_nProducerThreadCount;
277 cds_test::thread_pool& pool = get_pool();
278 pool.add( new Producer<Queue>( pool, q, m_nThreadPushCount ), s_nProducerThreadCount );
279 pool.add( new Consumer<Queue>( pool, q, m_nThreadPushCount ), s_nConsumerThreadCount );
281 s_nProducerDone.store( 0 );
282 s_nQueueSize = m_nThreadPushCount * s_nProducerThreadCount;
284 propout() << std::make_pair( "producer_count", s_nProducerThreadCount )
285 << std::make_pair( "consumer_count", s_nConsumerThreadCount )
286 << std::make_pair( "push_count", s_nQueueSize );
288 std::chrono::milliseconds duration = pool.run();
290 propout() << std::make_pair( "duration", duration );
293 template <class Queue>
294 void test( Queue& q )
298 propout() << q.statistics();
302 static void SetUpTestCase()
304 cds_test::config const& cfg = get_config( "queue_push_pop" );
306 s_nConsumerThreadCount = cfg.get_size_t( "ConsumerCount", s_nConsumerThreadCount );
307 s_nProducerThreadCount = cfg.get_size_t( "ProducerCount", s_nProducerThreadCount );
308 s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
310 if ( s_nConsumerThreadCount == 0 )
311 s_nConsumerThreadCount = 1;
312 if ( s_nProducerThreadCount == 0 )
313 s_nProducerThreadCount = 1;
314 if ( s_nQueueSize == 0 )
318 //static void TearDownTestCase();
321 CDSSTRESS_MSQueue( queue_push_pop )
322 CDSSTRESS_MoirQueue( queue_push_pop )
323 CDSSTRESS_BasketQueue( queue_push_pop )
324 CDSSTRESS_OptimsticQueue( queue_push_pop )
325 CDSSTRESS_FCQueue( queue_push_pop )
326 CDSSTRESS_FCDeque( queue_push_pop )
327 CDSSTRESS_RWQueue( queue_push_pop )
328 CDSSTRESS_StdQueue( queue_push_pop )
330 #undef CDSSTRESS_Queue_F
331 #define CDSSTRESS_Queue_F( test_fixture, type_name, level ) \
332 TEST_F( test_fixture, type_name ) \
334 if ( !check_detail_level( level )) return; \
335 typedef queue::Types< value_type >::type_name queue_type; \
336 queue_type queue( s_nQueueSize ); \
340 CDSSTRESS_TsigasQueue( queue_push_pop )
341 CDSSTRESS_VyukovQueue( queue_push_pop )
343 #undef CDSSTRESS_Queue_F
346 // ********************************************************************
347 // SegmentedQueue test
349 class segmented_queue_push_pop
350 : public queue_push_pop
351 , public ::testing::WithParamInterface< size_t >
353 typedef queue_push_pop base_class;
357 template <typename Queue>
360 size_t quasi_factor = GetParam();
362 Queue q( quasi_factor );
363 propout() << std::make_pair( "quasi_factor", quasi_factor );
364 base_class::test_queue( q );
365 analyze( q, quasi_factor * 2, quasi_factor );
366 propout() << q.statistics();
370 static std::vector< size_t > get_test_parameters()
372 cds_test::config const& cfg = cds_test::stress_fixture::get_config( "queue_push_pop" );
373 bool bIterative = cfg.get_bool( "SegmentedQueue_Iterate", false );
374 size_t quasi_factor = cfg.get_size_t( "SegmentedQueue_SegmentSize", 256 );
376 std::vector<size_t> args;
377 if ( bIterative && quasi_factor > 4 ) {
378 for ( size_t qf = 4; qf <= quasi_factor; qf *= 2 )
379 args.push_back( qf );
382 if ( quasi_factor > 2 )
383 args.push_back( quasi_factor );
392 #define CDSSTRESS_Queue_F( test_fixture, type_name, level ) \
393 TEST_P( test_fixture, type_name ) \
395 if ( !check_detail_level( level )) return; \
396 typedef typename queue::Types<value_type>::type_name queue_type; \
397 test< queue_type >(); \
400 CDSSTRESS_SegmentedQueue( segmented_queue_push_pop )
402 INSTANTIATE_TEST_CASE_P( SQ,
403 segmented_queue_push_pop,
404 ::testing::ValuesIn( segmented_queue_push_pop::get_test_parameters()));