3 #include "cppunit/thread.h"
4 #include "queue/queue_type.h"
5 #include "queue/queue_defs.h"
9 #include <boost/type_traits/is_base_of.hpp>
11 // Multi-threaded queue test for random push/pop operation
14 #define TEST_CASE( Q, V ) void Q() { test< Types<V>::Q >(); }
15 #define TEST_BOUNDED( Q, V ) TEST_CASE( Q, V )
16 #define TEST_SEGMENTED( Q, V ) void Q() { test_segmented< Types< V >::Q >(); }
18 namespace ns_Queue_Random {
19 static size_t s_nThreadCount = 16;
20 static size_t s_nQueueSize = 10000000;
27 SimpleValue( size_t n ): nNo(n) {}
28 size_t getNo() const { return nNo; }
32 using namespace ns_Queue_Random;
34 class Queue_Random: public CppUnitMini::TestCase
36 typedef CppUnitMini::TestCase base_class;
38 template <class Queue>
39 class Thread: public CppUnitMini::TestThread
41 virtual TestThread * clone()
43 return new Thread( *this );
53 size_t m_nUndefWriter;
54 size_t m_nRepeatValue;
55 size_t m_nPushError ; // push error count
57 std::vector<size_t> m_arrLastRead;
58 std::vector<size_t> m_arrPopCountPerThread;
60 size_t const m_nSpread;
63 Thread( CppUnitMini::ThreadPool& pool, Queue& q, size_t nSpread = 0 )
64 : CppUnitMini::TestThread( pool )
66 , m_nSpread( nSpread )
69 : CppUnitMini::TestThread( src )
70 , m_Queue( src.m_Queue )
71 , m_nSpread( src.m_nSpread )
74 Queue_Random& getTest()
76 return reinterpret_cast<Queue_Random&>( m_Pool.m_Test );
81 cds::threading::Manager::attachThread();
89 m_arrLastRead.resize( s_nThreadCount, 0 );
90 m_arrPopCountPerThread.resize( s_nThreadCount, 0 );
94 cds::threading::Manager::detachThread();
99 size_t const nThreadCount = s_nThreadCount;
100 size_t const nTotalPush = getTest().m_nThreadPushCount;
104 m_fTime = m_Timer.duration();
106 bool bNextPop = false;
107 while ( m_nPushCount < nTotalPush ) {
108 if ( !bNextPop && (rand() & 3) != 3 ) {
110 node.nThread = m_nThreadNo;
111 node.nNo = ++m_nPushCount;
112 if ( !m_Queue.push( node )) {
126 while ( !m_Queue.empty() && nPopLoop < 1000000 ) {
127 if ( pop( nThreadCount ) )
134 m_fTime = m_Timer.duration() - m_fTime;
137 bool pop( size_t nThreadCount )
142 if ( m_Queue.pop( node )) {
144 if ( node.nThread < nThreadCount ) {
145 m_arrPopCountPerThread[ node.nThread ] += 1;
147 if ( m_arrLastRead[ node.nThread ] > node.nNo ) {
148 if ( m_arrLastRead[ node.nThread ] - node.nNo > m_nSpread )
151 else if ( m_arrLastRead[ node.nThread ] == node.nNo )
153 m_arrLastRead[ node.nThread ] = node.nNo;
156 if ( m_arrLastRead[ node.nThread ] < node.nNo ) {
157 m_arrLastRead[ node.nThread ] = node.nNo;
163 //if ( node.nNo < m_Test.m_nPushCount )
164 // m_Test.m_pRead[ node.nWriter ][ node.nNo ] = node.nNo;
179 size_t m_nThreadPushCount;
182 template <class Queue>
183 void analyze( CppUnitMini::ThreadPool& pool, Queue& testQueue )
185 CPPUNIT_CHECK( testQueue.empty() );
187 std::vector< size_t > arrPushCount;
188 arrPushCount.resize( s_nThreadCount, 0 );
190 size_t nPushTotal = 0;
191 size_t nPopTotal = 0;
193 size_t nPushError = 0;
195 for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
196 Thread<Queue> * pThread = static_cast<Thread<Queue> *>( *it );
197 CPPUNIT_CHECK( pThread->m_nUndefWriter == 0 );
198 CPPUNIT_CHECK_EX( pThread->m_nRepeatValue == 0, "nRepeatValue=" << pThread->m_nRepeatValue );
199 if ( !boost::is_base_of<cds::bounded_container, Queue>::value ) {
200 CPPUNIT_CHECK( pThread->m_nPushError == 0 );
203 nPushError += pThread->m_nPushError;
205 arrPushCount[ pThread->m_nThreadNo ] += pThread->m_nPushCount;
207 nPushTotal += pThread->m_nPushCount;
208 nPopTotal += pThread->m_nPopCount;
209 fTime += pThread->m_fTime;
212 CPPUNIT_MSG( " Duration=" << (fTime /= s_nThreadCount) );
213 if ( boost::is_base_of<cds::bounded_container, Queue>::value ) {
214 CPPUNIT_MSG( " push error (when queue is full)=" << nPushError );
217 size_t nTotalItems = m_nThreadPushCount * s_nThreadCount;
219 CPPUNIT_CHECK_EX( nPushTotal == nTotalItems, "nPushTotal=" << nPushTotal << ", nTotalItems=" << nTotalItems );
220 CPPUNIT_CHECK_EX( nPopTotal == nTotalItems, "nPopTotal=" << nPopTotal << ", nTotalItems=" << nTotalItems );
222 for ( size_t i = 0; i < s_nThreadCount; ++i )
223 CPPUNIT_CHECK( arrPushCount[i] == m_nThreadPushCount );
226 template <class Queue>
229 CPPUNIT_MSG( "Random push/pop test\n thread count=" << s_nThreadCount << ", push count=" << s_nQueueSize << " ..." );
231 m_nThreadPushCount = s_nQueueSize / s_nThreadCount;
234 CppUnitMini::ThreadPool pool( *this );
235 pool.add( new Thread<Queue>( pool, testQueue ), s_nThreadCount );
239 analyze( pool, testQueue );
240 CPPUNIT_MSG( testQueue.statistics() );
243 template <class Queue>
244 void test_segmented()
246 CPPUNIT_MSG( "Random push/pop test\n thread count=" << s_nThreadCount << ", push count=" << s_nQueueSize << " ..." );
248 m_nThreadPushCount = s_nQueueSize / s_nThreadCount;
250 for ( size_t nSegmentSize = 4; nSegmentSize <= 256; nSegmentSize *= 4 ) {
251 CPPUNIT_MSG( "Segment size: " << nSegmentSize );
253 Queue testQueue( nSegmentSize );
254 CppUnitMini::ThreadPool pool( *this );
255 pool.add( new Thread<Queue>( pool, testQueue, nSegmentSize * 2 ), s_nThreadCount );
259 analyze( pool, testQueue );
260 CPPUNIT_MSG( testQueue.statistics() );
264 void setUpParams( const CppUnitMini::TestCfg& cfg ) {
265 s_nThreadCount = cfg.getULong("ThreadCount", 8 );
266 s_nQueueSize = cfg.getULong("QueueSize", 20000000 );
270 CDSUNIT_DECLARE_MoirQueue( SimpleValue )
271 CDSUNIT_DECLARE_MSQueue( SimpleValue )
272 CDSUNIT_DECLARE_OptimisticQueue( SimpleValue )
273 CDSUNIT_DECLARE_BasketQueue( SimpleValue )
274 CDSUNIT_DECLARE_FCQueue( SimpleValue )
275 CDSUNIT_DECLARE_FCDeque( SimpleValue )
276 CDSUNIT_DECLARE_SegmentedQueue( SimpleValue )
277 CDSUNIT_DECLARE_RWQueue( SimpleValue )
278 CDSUNIT_DECLARE_TsigasCysleQueue( SimpleValue )
279 CDSUNIT_DECLARE_VyukovMPMCCycleQueue( SimpleValue )
280 CDSUNIT_DECLARE_StdQueue( SimpleValue )
282 CPPUNIT_TEST_SUITE(Queue_Random)
283 CDSUNIT_TEST_MoirQueue
285 CDSUNIT_TEST_OptimisticQueue
286 CDSUNIT_TEST_BasketQueue
289 CDSUNIT_TEST_SegmentedQueue
291 CDSUNIT_TEST_TsigasCysleQueue
292 CDSUNIT_TEST_VyukovMPMCCycleQueue
293 CDSUNIT_TEST_StdQueue
294 CPPUNIT_TEST_SUITE_END();
299 CPPUNIT_TEST_SUITE_REGISTRATION(queue::Queue_Random);