2 This file is a part of libcds - Concurrent Data Structures library
4 (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
6 Source code repo: http://github.com/khizmax/libcds/
7 Download: http://sourceforge.net/projects/libcds/files/
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
12 * Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #include "queue_type.h"
35 // Multi-threaded queue test for random push/pop operation
// Test parameters. The values below are defaults; SetUpTestCase() re-reads both
// from the "queue_random" config section, passing these as the fallback values.
38 static size_t s_nThreadCount = 16;
39 static size_t s_nQueueSize = 10000000;
// Counter shared by all workers: test() stores the pool size into it before the
// run, and each worker decrements it (fetch_sub) after its push loop. The drain
// loop in the worker keeps going while this counter is non-zero.
41 std::atomic< size_t > s_nProducerCount(0);
// Stress-test fixture for random push/pop operations on a queue shared by
// several worker threads.
43 class queue_random: public cds_test::stress_fixture
45 typedef cds_test::stress_fixture base_class;
// Builds a queue item whose sequence number nNo is n.
53 value_type( size_t n ) : nNo( n ) {}
// Worker thread of the random push/pop test, parameterized by the queue type
// under test.
56 template <class Queue>
57 class Strain: public cds_test::thread
59 typedef cds_test::thread base_class;
// nPushCount is stored in m_nTotalPushCount (the number of items this worker
// has to push); nSpread is stored in m_nSpread and defaults to 0.
62 Strain( cds_test::thread_pool& pool, Queue& q, size_t nPushCount, size_t nSpread = 0 )
65 , m_nSpread( nSpread )
66 , m_nTotalPushCount( nPushCount )
// Copy construction: m_Queue, m_nSpread and m_nTotalPushCount are initialized
// from the corresponding members of src.
71 , m_Queue( src.m_Queue )
72 , m_nSpread( src.m_nSpread )
73 , m_nTotalPushCount( src.m_nTotalPushCount )
// Returns a heap-allocated copy of this worker made with the copy constructor.
// NOTE(review): ownership of the returned pointer presumably passes to the
// thread pool - confirm against cds_test::thread_pool.
76 virtual thread * clone()
78 return new Strain( *this );
83 size_t const nThreadCount = s_nThreadCount;
84 size_t const nTotalPush = m_nTotalPushCount;
// One zero-initialized bookkeeping slot per worker thread.
86 m_arrLastRead.resize( nThreadCount, 0 );
87 m_arrPopCountPerThread.resize( nThreadCount, 0 );
// Push phase: runs until this worker has pushed nTotalPush items. An iteration
// takes the push branch when the two low bits of std::rand() are not both set,
// i.e. for three of the four possible values of (rand() & 3).
91 while ( m_nPushCount < nTotalPush ) {
92 if ( ( std::rand() & 3) != 3 ) {
// Pre-increment: sequence numbers of a worker start at 1.
94 node.nNo = ++m_nPushCount;
95 if ( !m_Queue.push( node )) {
// Push phase finished for this worker: decrement the shared counter.
104 s_nProducerCount.fetch_sub( 1, std::memory_order_relaxed );
// Drain phase: keep going while the queue still has items or the shared
// counter shows that some worker has not finished its push phase.
106 while ( !m_Queue.empty() || s_nProducerCount.load( std::memory_order_relaxed ) != 0 )
// Tries to pop one item and checks it against the per-producer bookkeeping.
// nThreadCount is presumably the number of worker threads (TODO confirm at the
// call sites). It is written into node.nThread before the pop, so an item
// that comes back without a valid producer id fails the range check below.
110 bool pop( size_t nThreadCount )
113 node.nThread = nThreadCount;
115 if ( m_Queue.pop( node )) {
// Only items with an in-range producer id are counted per producer.
117 if ( node.nThread < nThreadCount ) {
118 m_arrPopCountPerThread[ node.nThread ] += 1;
// The popped sequence number is lower than the last one read from the same
// producer: the lag is compared with the allowed spread.
120 if ( m_arrLastRead[ node.nThread ] > node.nNo ) {
121 if ( m_arrLastRead[ node.nThread ] - node.nNo > m_nSpread )
// The same sequence number was read twice from one producer.
124 else if ( m_arrLastRead[ node.nThread ] == node.nNo )
126 m_arrLastRead[ node.nThread ] = node.nNo;
// Move the last-read mark of this producer forward.
129 if ( m_arrLastRead[ node.nThread ] < node.nNo )
130 m_arrLastRead[ node.nThread ] = node.nNo;
// Per-worker counters; analyze() reads m_nPushCount, m_nPopCount,
// m_nUndefWriter, m_nRepeatValue and m_nPushError after the run.
148 size_t m_nPushCount = 0;
149 size_t m_nPopCount = 0;
150 size_t m_nEmptyPop = 0;
152 size_t m_nUndefWriter = 0;
153 size_t m_nRepeatValue = 0;
154 size_t m_nPushError = 0;
// Indexed by the producer id stored in a popped item (node.nThread):
// last sequence number read from that producer, and number of items popped
// that were pushed by that producer.
156 std::vector<size_t> m_arrLastRead;
157 std::vector<size_t> m_arrPopCountPerThread;
// m_nSpread: threshold that pop() compares against the lag of an out-of-order
// item. m_nTotalPushCount: number of items this worker pushes.
159 size_t const m_nSpread;
160 size_t const m_nTotalPushCount;
// Loads the test parameters (ThreadCount, QueueSize) from the "queue_random"
// config section into s_nThreadCount and s_nQueueSize.
164 static void SetUpTestCase()
\r
166 cds_test::config const& cfg = get_config( "queue_random" );
\r
// The current values of the statics are passed as the second argument of
// get_size_t(), presumably as the fallback when the key is absent - confirm
// against cds_test::config.
168 s_nThreadCount = cfg.get_size_t( "ThreadCount", s_nThreadCount );
169 s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
// Zero values are special-cased for both parameters.
171 if ( s_nThreadCount == 0 )
173 if ( s_nQueueSize == 0 )
177 //static void TearDownTestCase();
\r
// Post-run validation of queue q and of the per-worker counters:
//  - the queue must be empty;
//  - no worker may have recorded an undefined writer, a repeated value or a
//    push error;
//  - the total number of pushes and of pops must both equal s_nQueueSize;
//  - every thread id must account for s_nQueueSize / s_nThreadCount pushes.
180 template <class Queue>
181 void analyze( Queue& q )
183 EXPECT_TRUE( q.empty() );
// Pushes accumulated per thread id (thr.id()).
185 std::vector< size_t > arrPushCount;
186 arrPushCount.resize( s_nThreadCount, 0 );
188 size_t nPushTotal = 0;
189 size_t nPopTotal = 0;
190 size_t nPushError = 0;
192 cds_test::thread_pool& pool = get_pool();
193 for ( size_t i = 0; i < pool.size(); ++i ) {
// The pool stores threads by base type; the workers were added as
// Strain<Queue>, hence the downcast.
194 Strain<Queue>& thr = static_cast<Strain<Queue> &>( pool.get(i));
195 EXPECT_EQ( thr.m_nUndefWriter, 0 );
196 EXPECT_EQ( thr.m_nRepeatValue, 0 );
197 EXPECT_EQ( thr.m_nPushError, 0 );
198 nPushError += thr.m_nPushError;
200 arrPushCount[ thr.id() ] += thr.m_nPushCount;
202 nPushTotal += thr.m_nPushCount;
203 nPopTotal += thr.m_nPopCount;
206 EXPECT_EQ( nPushTotal, s_nQueueSize );
207 EXPECT_EQ( nPopTotal, s_nQueueSize );
// Each worker is expected to have pushed an equal share.
209 size_t const nThreadPushCount = s_nQueueSize / s_nThreadCount;
210 for ( size_t i = 0; i < s_nThreadCount; ++i )
211 EXPECT_EQ( arrPushCount[i], nThreadPushCount ) << "thread=" << i;
// Runs the random push/pop stress test on queue q: adds s_nThreadCount Strain
// workers to the pool, runs the pool, and reports the thread count, push count,
// run duration and the queue statistics through propout().
214 template <class Queue>
215 void test( Queue& q )
// Integer division: every worker pushes the same number of items.
217 size_t nThreadPushCount = s_nQueueSize / s_nThreadCount;
219 cds_test::thread_pool& pool = get_pool();
220 pool.add( new Strain<Queue>( pool, q, nThreadPushCount ), s_nThreadCount );
// Round s_nQueueSize down to the number of items that will actually be pushed,
// so that the totals checked in analyze() match.
222 s_nQueueSize = nThreadPushCount * s_nThreadCount;
223 propout() << std::make_pair( "thread_count", s_nThreadCount )
224 << std::make_pair( "push_count", s_nQueueSize );
// Every worker counts as an active producer until it finishes its push phase.
226 s_nProducerCount.store( pool.size(), std::memory_order_release );
227 std::chrono::milliseconds duration = pool.run();
228 propout() << std::make_pair( "duration", duration );
232 propout() << q.statistics();
// Instantiate the queue_random tests for the queue families covered by each
// CDSSTRESS_* macro. NOTE(review): these macros are defined outside this file
// (presumably in queue_type.h) and presumably expand through
// CDSSTRESS_Queue_F - confirm.
236 CDSSTRESS_MSQueue( queue_random )
237 CDSSTRESS_MoirQueue( queue_random )
238 CDSSTRESS_BasketQueue( queue_random )
239 CDSSTRESS_OptimsticQueue( queue_random )
240 CDSSTRESS_FCQueue( queue_random )
241 CDSSTRESS_FCDeque( queue_random )
242 CDSSTRESS_RWQueue( queue_random )
243 CDSSTRESS_StdQueue( queue_random )
// Redefine CDSSTRESS_Queue_F for the queue types below: here the queue object
// is constructed with s_nQueueSize as its constructor argument. The macro is
// undefined again after these instantiations.
245 #undef CDSSTRESS_Queue_F
246 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
247 TEST_F( test_fixture, type_name ) \
249 typedef queue::Types< value_type >::type_name queue_type; \
250 queue_type queue( s_nQueueSize ); \
254 CDSSTRESS_TsigasQueue( queue_random )
255 CDSSTRESS_VyukovQueue( queue_random )
257 #undef CDSSTRESS_Queue_F
259 // ********************************************************************
260 // SegmentedQueue test
// Value-parameterized variant of the queue_random fixture; the size_t test
// parameter is used by test() as the quasi factor passed to the queue
// constructor.
262 class segmented_queue_random
263 : public queue_random
264 , public ::testing::WithParamInterface< size_t >
266 typedef queue_random base_class;
// Runs the random push/pop stress test on a Queue constructed with the current
// test parameter (quasi factor). Mirrors queue_random::test(), except that the
// queue is created here and the workers get an allowed spread of
// quasi_factor * 2.
269 template <typename Queue>
272 size_t quasi_factor = GetParam();
274 Queue q( quasi_factor );
275 propout() << std::make_pair( "quasi_factor", quasi_factor );
// Integer division: every worker pushes the same number of items.
277 size_t nThreadPushCount = s_nQueueSize / s_nThreadCount;
279 cds_test::thread_pool& pool = get_pool();
// The last argument is the worker's nSpread, i.e. the out-of-order lag that
// Strain::pop() compares against.
280 pool.add( new Strain<Queue>( pool, q, nThreadPushCount, quasi_factor * 2 ), s_nThreadCount );
// Round s_nQueueSize down to the number of items that will actually be pushed.
282 s_nQueueSize = nThreadPushCount * s_nThreadCount;
283 propout() << std::make_pair( "thread_count", s_nThreadCount )
284 << std::make_pair( "push_count", s_nQueueSize );
// Every worker counts as an active producer until it finishes its push phase.
286 s_nProducerCount.store( pool.size(), std::memory_order_release );
287 std::chrono::milliseconds duration = pool.run();
288 propout() << std::make_pair( "duration", duration );
292 propout() << q.statistics();
// Builds the list of quasi-factor values used to instantiate the
// segmented_queue_random tests, from the SegmentedQueue_Iterate and
// SegmentedQueue_SegmentSize config keys (defaults: false and 256).
296 static std::vector< size_t > get_test_parameters()
// NOTE(review): this reads the "queue_push" config section, while
// queue_random::SetUpTestCase() reads "queue_random" - confirm that using the
// other test's section is intended and not a copy-paste slip.
298 cds_test::config const& cfg = cds_test::stress_fixture::get_config( "queue_push" );
299 bool bIterative = cfg.get_bool( "SegmentedQueue_Iterate", false );
300 size_t quasi_factor = cfg.get_size_t( "SegmentedQueue_SegmentSize", 256 );
302 std::vector<size_t> args;
// Iterative mode: one parameter per doubling step 4, 8, 16, ... while the
// value does not exceed the configured segment size.
303 if ( bIterative && quasi_factor > 4 ) {
304 for ( size_t qf = 4; qf <= quasi_factor; qf *= 2 )
305 args.push_back( qf );
// Otherwise the configured segment size itself is used when it is above 2.
308 if ( quasi_factor > 2 )
309 args.push_back( quasi_factor );
// Value-parameterized form of CDSSTRESS_Queue_F: each generated TEST_P resolves
// the queue type from queue::Types<value_type> and calls the fixture's
// test<queue_type>(), which constructs the queue itself.
318 #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
319 TEST_P( test_fixture, type_name ) \
321 typedef typename queue::Types<value_type>::type_name queue_type; \
322 test< queue_type >(); \
325 CDSSTRESS_SegmentedQueue( segmented_queue_random )
// Instantiates the segmented_queue_random tests under the "SQ" prefix, once per
// value returned by get_test_parameters().
327 INSTANTIATE_TEST_CASE_P( SQ,
328 segmented_queue_random,
329 ::testing::ValuesIn( segmented_queue_random::get_test_parameters()));