2 This file is a part of libcds - Concurrent Data Structures library
4 (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
6 Source code repo: http://github.com/khizmax/libcds/
7 Download: http://sourceforge.net/projects/libcds/files/
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
12 * Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #include "queue_type.h"
35 // Multi-threaded queue test for random push/pop operation
// Number of worker threads added to the pool by the tests below. 16 is only the
// built-in default: SetUpTestCase() replaces it with the "ThreadCount" config value.
38 static size_t s_nThreadCount = 16;
// Total number of items pushed by all threads together. The value here is only the
// built-in default: SetUpTestCase() replaces it with the "QueueSize" config value and
// the test drivers round it down to a multiple of s_nThreadCount before running.
39 static size_t s_nQueueSize = 10000000;
// Number of worker threads that have not yet finished their push phase. The test
// drivers store pool.size() into it before pool.run(); each worker decrements it once
// after its push loop and keeps draining the queue while it is non-zero.
41 std::atomic< size_t > s_nProducerCount(0);
// Stress-test fixture: several threads perform randomly interleaved push and pop
// operations on one shared queue, after which analyze() validates the counters
// collected by every thread.
43 class queue_random: public cds_test::stress_fixture
45 typedef cds_test::stress_fixture base_class;
// value_type constructor: stores n as the item's sequence number (nNo). No other
// member is initialized by this constructor.
53 value_type( size_t n ) : nNo( n ) {}
// Worker thread that pushes m_nTotalPushCount items into the shared queue, randomly
// mixing in pop operations, and then helps to drain the queue.
56 template <class Queue>
57 class Strain: public cds_test::thread
59 typedef cds_test::thread base_class;
// pool       - thread pool this worker is created for
// q          - queue under test
// nPushCount - number of items this thread has to push (stored in m_nTotalPushCount)
// nSpread    - threshold compared in pop() against the backward gap between the last
//              sequence number read and the one just popped; defaults to 0
62 Strain( cds_test::thread_pool& pool, Queue& q, size_t nPushCount, size_t nSpread = 0 )
65 , m_nSpread( nSpread )
66 , m_nTotalPushCount( nPushCount )
// Member initializers of a second constructor that takes its state from `src`
// (its signature line is not part of this excerpt; presumably it is the constructor
// that clone() invokes). The counters are not copied here; they have in-class
// default initializers of 0.
71 , m_Queue( src.m_Queue )
72 , m_nSpread( src.m_nSpread )
73 , m_nTotalPushCount( src.m_nTotalPushCount )
// Returns a new heap-allocated Strain constructed from *this.
// NOTE(review): the caller receives a raw owning pointer - presumably the thread
// pool takes ownership; confirm against cds_test::thread_pool.
76 virtual thread * clone()
78 return new Strain( *this );
// Thread body (the enclosing function's signature line is not part of this excerpt).
// Snapshot the global thread count and this thread's push quota.
83 size_t const nThreadCount = s_nThreadCount;
84 size_t const nTotalPush = m_nTotalPushCount;
// One zero-initialized slot per thread; pop() indexes both arrays by node.nThread.
86 m_arrLastRead.resize( nThreadCount, 0 );
87 m_arrPopCountPerThread.resize( nThreadCount, 0 );
// Push phase: runs until this thread has pushed its whole quota.
91 while ( m_nPushCount < nTotalPush ) {
// Take the push branch unless the two low bits of the random value are both set.
// NOTE(review): std::rand() is called concurrently from several threads; whether
// that is thread-safe is implementation-defined - confirm this is acceptable here.
92 if ( ( std::rand() & 3) != 3 ) {
// Sequence numbers are per thread and start at 1 (pre-increment of a counter
// that starts at 0).
94 node.nNo = ++m_nPushCount;
// Push-failure handling is not visible in this excerpt.
95 if ( !m_Queue.push( node )) {
// Push phase finished for this thread: remove it from the count of active producers.
104 s_nProducerCount.fetch_sub( 1, std::memory_order_relaxed );
// Drain phase: continue while the queue still holds items or some thread is still
// in its push phase. The loop body is not visible in this excerpt.
106 while ( !m_Queue.empty() || s_nProducerCount.load( std::memory_order_relaxed ) != 0 )
// Pops one item and validates it.
// nThreadCount - number of worker threads; any popped nThread value that is not
//                below it fails the range check further down.
// The return statements are not visible in this excerpt.
110 bool pop( size_t nThreadCount )
// Pre-set nThread to a value that fails the `< nThreadCount` check below.
113 node.nThread = nThreadCount;
115 if ( m_Queue.pop( node )) {
// Only items whose nThread is a valid thread index are accounted per producer.
117 if ( node.nThread < nThreadCount ) {
118 m_arrPopCountPerThread[ node.nThread ] += 1;
// The item is older than the last one this thread read from the same producer;
// the backward gap is compared against m_nSpread. The statement guarded by the
// inner condition is not visible in this excerpt.
120 if ( m_arrLastRead[ node.nThread ] > node.nNo ) {
121 if ( m_arrLastRead[ node.nThread ] - node.nNo > m_nSpread )
// Same sequence number as the last one read from this producer; the guarded
// statement is not visible in this excerpt.
124 else if ( m_arrLastRead[ node.nThread ] == node.nNo )
// Remember the sequence number just read for this producer.
126 m_arrLastRead[ node.nThread ] = node.nNo;
// Alternative bookkeeping: only advance the remembered sequence number when the new
// one is larger. The branch that selects between the two variants is not visible
// in this excerpt.
129 if ( m_arrLastRead[ node.nThread ] < node.nNo )
130 m_arrLastRead[ node.nThread ] = node.nNo;
// Number of items pushed by this thread; also the source of its sequence numbers.
148 size_t m_nPushCount = 0;
// Pop counter, summed into nPopTotal by analyze(); its update site is not visible
// in this excerpt.
149 size_t m_nPopCount = 0;
// Its update site is not visible in this excerpt.
150 size_t m_nEmptyPop = 0;
// The three counters below are error counters: analyze() expects each of them to be 0.
152 size_t m_nUndefWriter = 0;
153 size_t m_nRepeatValue = 0;
154 size_t m_nPushError = 0;
// Indexed by producer thread: last sequence number this thread read from that producer.
156 std::vector<size_t> m_arrLastRead;
// Indexed by producer thread: number of items from that producer popped by this thread.
157 std::vector<size_t> m_arrPopCountPerThread;
// Threshold for the backward sequence-number gap checked in pop().
159 size_t const m_nSpread;
// Number of items this thread has to push.
160 size_t const m_nTotalPushCount;
// Loads the test parameters from the "queue_random" configuration section; the
// current values of the globals serve as defaults for missing keys.
164 static void SetUpTestCase()
166 cds_test::config const& cfg = get_config( "queue_random" );
168 s_nThreadCount = cfg.get_size_t( "ThreadCount", s_nThreadCount );
169 s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
// Zero values are special-cased; the replacement values are not visible in this excerpt.
171 if ( s_nThreadCount == 0u )
173 if ( s_nQueueSize == 0u )
177 //static void TearDownTestCase();
// Validates the results after the pool has run.
// q - the queue that was tested; it is expected to be empty.
180 template <class Queue>
181 void analyze( Queue& q )
183 EXPECT_TRUE( q.empty());
// Per-thread push totals, indexed by thread id.
185 std::vector< size_t > arrPushCount;
186 arrPushCount.resize( s_nThreadCount, 0 );
188 size_t nPushTotal = 0;
189 size_t nPopTotal = 0;
190 size_t nPushError = 0;
192 cds_test::thread_pool& pool = get_pool();
193 for ( size_t i = 0; i < pool.size(); ++i ) {
// Every thread in the pool is treated as a Strain<Queue>, matching what the test
// drivers add to the pool.
194 Strain<Queue>& thr = static_cast<Strain<Queue> &>( pool.get(i));
195 EXPECT_EQ( thr.m_nUndefWriter, 0u );
196 EXPECT_EQ( thr.m_nRepeatValue, 0u );
197 EXPECT_EQ( thr.m_nPushError, 0u );
198 nPushError += thr.m_nPushError;
// assumes thr.id() < s_nThreadCount - TODO confirm
200 arrPushCount[ thr.id() ] += thr.m_nPushCount;
202 nPushTotal += thr.m_nPushCount;
203 nPopTotal += thr.m_nPopCount;
// Everything that was scheduled must have been pushed and popped exactly once in total.
206 EXPECT_EQ( nPushTotal, s_nQueueSize );
207 EXPECT_EQ( nPopTotal, s_nQueueSize );
// Every thread must have pushed exactly its equal share.
209 size_t const nThreadPushCount = s_nQueueSize / s_nThreadCount;
210 for ( size_t i = 0; i < s_nThreadCount; ++i )
211 EXPECT_EQ( arrPushCount[i], nThreadPushCount ) << "thread=" << i;
// Runs the random push/pop stress test on q and reports its properties.
// q - queue under test, shared by all worker threads.
214 template <class Queue>
215 void test( Queue& q )
// Equal share of pushes per thread (integer division).
217 size_t nThreadPushCount = s_nQueueSize / s_nThreadCount;
219 cds_test::thread_pool& pool = get_pool();
// presumably the second argument is the number of worker instances to create from
// the prototype - confirm against cds_test::thread_pool::add
220 pool.add( new Strain<Queue>( pool, q, nThreadPushCount ), s_nThreadCount );
// Round the total down to a multiple of the thread count so that it equals the
// number of pushes actually scheduled; analyze() compares the totals against it.
222 s_nQueueSize = nThreadPushCount * s_nThreadCount;
223 propout() << std::make_pair( "thread_count", s_nThreadCount )
224 << std::make_pair( "push_count", s_nQueueSize );
// All threads in the pool start out as active producers.
226 s_nProducerCount.store( pool.size(), std::memory_order_release );
227 std::chrono::milliseconds duration = pool.run();
228 propout() << std::make_pair( "duration", duration );
232 propout() << q.statistics();
// Test instantiations for the queue_random fixture, one macro invocation per queue
// family. The CDSSTRESS_* macros are not defined in this excerpt (presumably they
// come from "queue_type.h").
236 CDSSTRESS_MSQueue( queue_random )
237 CDSSTRESS_MoirQueue( queue_random )
238 CDSSTRESS_BasketQueue( queue_random )
239 CDSSTRESS_OptimsticQueue( queue_random )
240 CDSSTRESS_FCQueue( queue_random )
241 CDSSTRESS_FCDeque( queue_random )
242 CDSSTRESS_RWQueue( queue_random )
243 CDSSTRESS_StdQueue( queue_random )
// Replace CDSSTRESS_Queue_F with a TEST_F variant that passes s_nQueueSize to the
// queue's constructor (presumably the capacity of a bounded queue - confirm), use it
// for the CDSSTRESS_VyukovQueue instantiation that follows, and then remove the
// definition again. The tail of the macro body is not visible in this excerpt.
245 #undef CDSSTRESS_Queue_F
246 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
247 TEST_F( test_fixture, type_name ) \
249 typedef queue::Types< value_type >::type_name queue_type; \
250 queue_type queue( s_nQueueSize ); \
254 CDSSTRESS_VyukovQueue( queue_random )
256 #undef CDSSTRESS_Queue_F
258 // ********************************************************************
259 // SegmentedQueue test
// Value-parameterized variant of the queue_random fixture. The size_t test parameter
// is read with GetParam() and used as the queue's quasi factor.
261 class segmented_queue_random
262 : public queue_random
263 , public ::testing::WithParamInterface< size_t >
265 typedef queue_random base_class;
// Test driver template; its signature line is not part of this excerpt (presumably
// it is the test<Queue>() called by the TEST_P variant of CDSSTRESS_Queue_F).
268 template <typename Queue>
271 size_t quasi_factor = GetParam();
// The queue is created locally with the parameterized quasi factor.
273 Queue q( quasi_factor );
274 propout() << std::make_pair( "quasi_factor", quasi_factor );
// Equal share of pushes per thread (integer division).
276 size_t nThreadPushCount = s_nQueueSize / s_nThreadCount;
278 cds_test::thread_pool& pool = get_pool();
// Twice the quasi factor is passed as the Strain's nSpread threshold.
279 pool.add( new Strain<Queue>( pool, q, nThreadPushCount, quasi_factor * 2 ), s_nThreadCount );
// Round the total down to a multiple of the thread count so that it equals the
// number of pushes actually scheduled.
281 s_nQueueSize = nThreadPushCount * s_nThreadCount;
282 propout() << std::make_pair( "thread_count", s_nThreadCount )
283 << std::make_pair( "push_count", s_nQueueSize );
// All threads in the pool start out as active producers.
285 s_nProducerCount.store( pool.size(), std::memory_order_release );
286 std::chrono::milliseconds duration = pool.run();
287 propout() << std::make_pair( "duration", duration );
291 propout() << q.statistics();
// Builds the list of quasi-factor values used to instantiate the parameterized tests.
// Reads "SegmentedQueue_Iterate" (default false) and "SegmentedQueue_SegmentSize"
// (default 256) from the configuration.
295 static std::vector< size_t > get_test_parameters()
// NOTE(review): this reads the "queue_push" section, whereas SetUpTestCase() of the
// base fixture reads "queue_random" - possibly a copy-paste leftover; confirm which
// section is intended.
297 cds_test::config const& cfg = cds_test::stress_fixture::get_config( "queue_push" );
298 bool bIterative = cfg.get_bool( "SegmentedQueue_Iterate", false );
299 size_t quasi_factor = cfg.get_size_t( "SegmentedQueue_SegmentSize", 256 );
301 std::vector<size_t> args;
// Iterative mode: 4, 8, 16, ... doubling while the value does not exceed the
// configured segment size.
302 if ( bIterative && quasi_factor > 4 ) {
303 for ( size_t qf = 4; qf <= quasi_factor; qf *= 2 )
304 args.push_back( qf );
// Single-value case: use the configured size when it is greater than 2. The
// surrounding branch structure and the rest of the function are not visible in
// this excerpt.
307 if ( quasi_factor > 2 )
308 args.push_back( quasi_factor );
// TEST_P flavour of CDSSTRESS_Queue_F for the value-parameterized fixture: each
// generated test calls test< queue_type >() for the given queue type. It is used by
// the CDSSTRESS_SegmentedQueue instantiation that follows. The remaining lines of
// the macro body are not visible in this excerpt.
317 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
318 TEST_P( test_fixture, type_name ) \
320 typedef typename queue::Types<value_type>::type_name queue_type; \
321 test< queue_type >(); \
324 CDSSTRESS_SegmentedQueue( segmented_queue_random )
// When the test framework's INSTANTIATE_TEST_CASE_P accepts a fourth (name generator)
// argument, supply one so that each instantiation is named after its parameter value.
326 #ifdef CDSTEST_GTEST_INSTANTIATE_TEST_CASE_P_HAS_4TH_ARG
// Name generator: returns the decimal representation of the size_t test parameter.
327 static std::string get_test_parameter_name( testing::TestParamInfo<size_t> const& p )
329 return std::to_string( p.param );
// Instantiation with explicit parameter names.
331 INSTANTIATE_TEST_CASE_P( SQ,
332 segmented_queue_random,
333 ::testing::ValuesIn( segmented_queue_random::get_test_parameters()), get_test_parameter_name );
// Three-argument instantiation without a name generator (presumably the fallback
// branch of the conditional above; the preprocessor lines separating the two
// variants are not visible in this excerpt).
335 INSTANTIATE_TEST_CASE_P( SQ,
336 segmented_queue_random,
337 ::testing::ValuesIn( segmented_queue_random::get_test_parameters()));