3 #include "cppunit/thread.h"
4 #include "queue/queue_type.h"
5 #include "queue/queue_defs.h"
7 // Multi-threaded queue test for pop operation
// Defines a function named Q, taking no arguments, whose body calls test< Types<V>::Q >().
10 #define TEST_CASE( Q, V ) void Q() { test< Types<V>::Q >(); }
// Same shape as TEST_CASE, but the generated function calls test_bounded< Types<V>::Q >().
11 #define TEST_BOUNDED( Q, V ) void Q() { test_bounded< Types<V>::Q >(); }
// Same shape as TEST_CASE, but the generated function calls test_segmented< Types<V>::Q >().
12 #define TEST_SEGMENTED( Q, V ) void Q() { test_segmented< Types<V>::Q >(); }
// Settings and the item type used by the pop test.
14 namespace ns_Queue_Pop {
// Number of threads added to the pool by the tests; reassigned in setUpParams from the "ThreadCount" setting.
15 static size_t s_nThreadCount = 8;
// Number of items the tests work with and the length of the per-item counter arrays;
// reassigned in setUpParams from the "QueueSize" setting.
16 static size_t s_nQueueSize = 20000000 ; // no more than 20 million records
// NOTE(review): the declaration enclosing the next three members is not visible in this listing;
// the constructor names indicate it is SimpleValue, with a data member nNo.
// Default constructor: initializes nNo to 0.
21 SimpleValue(): nNo(0) {}
// Constructs a value whose nNo is n.
22 SimpleValue( size_t n ): nNo(n) {}
// Returns nNo.
23 size_t getNo() const { return nNo; }
// Brings the names of ns_Queue_Pop into scope for the code that follows.
26 using namespace ns_Queue_Pop;
28 class Queue_Pop: public CppUnitMini::TestCase
// Test thread of the pop test: pops values from m_Queue until pop() returns false and
// counts, in m_arr, how many times each item number was popped.
// NOTE(review): several lines of this class (method headers, member declarations, braces)
// are not visible in this listing; comments below describe only the visible statements.
30 template <class Queue>
31 class Thread: public CppUnitMini::TestThread
// Returns a new heap-allocated Thread copy-constructed from *this.
33 virtual TestThread * clone()
35 return new Thread( *this );
// Constructs a thread bound to the given pool and queue.
44 Thread( CppUnitMini::ThreadPool& pool, Queue& q )
45 : CppUnitMini::TestThread( pool )
// Allocates the counter array with one entry per queue item.
// NOTE(review): no matching delete[] is visible in this listing — confirm m_arr is released.
48 m_arr = new long[s_nQueueSize];
// Initializer list and body of a constructor taking `src` (its signature line is not visible here):
// the base and m_Queue are initialized from src, and a fresh counter array is allocated.
51 : CppUnitMini::TestThread( src )
52 , m_Queue( src.m_Queue )
54 m_arr = new long[s_nQueueSize];
// Returns m_Pool.m_Test reinterpreted as a Queue_Pop reference.
63 return reinterpret_cast<Queue_Pop&>( m_Pool.m_Test );
// Attaches the current thread to the cds threading manager and zeroes all s_nQueueSize counters.
68 cds::threading::Manager::attachThread();
69 memset(m_arr, 0, sizeof(m_arr[0]) * s_nQueueSize );
// Detaches the current thread from the cds threading manager.
73 cds::threading::Manager::detachThread();
// Remember the timer reading before popping; the final assignment below turns it into elapsed time.
78 m_fTime = m_Timer.duration();
80 typedef typename Queue::value_type value_type;
81 value_type value = value_type();
// Pop until pop() reports failure, bumping the counter of each popped item number.
// NOTE(review): value.getNo() is used as an index without a visible bounds check —
// assumes every item number is below s_nQueueSize; confirm against the producer.
83 while ( m_Queue.pop( value ) ) {
84 ++m_arr[ value.getNo() ];
// Publish the pop count; the declaration and update of nPopCount are not visible in this listing.
87 m_nPopCount = nPopCount;
// m_fTime now holds the difference between the timer reading here and the one taken before the loop.
89 m_fTime = m_Timer.duration() - m_fTime;
// Aggregates the results of all threads in the pool and checks them:
// sums the per-thread counters and pop counts, prints the mean per-thread duration,
// checks that the total pop count equals s_nQueueSize and that testQueue is empty,
// then reports individual items.
//   pool      - thread pool whose elements are treated as Thread<Queue> objects
//   testQueue - the queue that was popped; checked with empty()
// NOTE(review): the declarations of fTime and nError, the condition guarding the error
// message, and any release of arr are not visible in this listing.
95 template <class Queue>
96 void analyze( CppUnitMini::ThreadPool& pool, Queue& testQueue )
// Zero-initialized accumulator with one counter per item number.
// NOTE(review): allocated with new[]; if CPPUNIT_ASSERT leaves the function on failure,
// arr may not be released — confirm.
98 size_t * arr = new size_t[ s_nQueueSize ];
99 memset(arr, 0, sizeof(arr[0]) * s_nQueueSize );
102 size_t nTotalPops = 0;
103 for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
// Pool elements are reinterpreted as Thread<Queue> to read their result fields.
104 Thread<Queue> * pThread = reinterpret_cast<Thread<Queue> *>(*it);
// Add this thread's per-item counters to the global ones.
105 for ( size_t i = 0; i < s_nQueueSize; ++i )
106 arr[i] += pThread->m_arr[i];
107 nTotalPops += pThread->m_nPopCount;
108 fTime += pThread->m_fTime;
// Mean of the per-thread durations.
110 CPPUNIT_MSG( " Duration=" << (fTime / s_nThreadCount) );
111 CPPUNIT_CHECK_EX( nTotalPops == s_nQueueSize, "Total pop=" << nTotalPops << ", queue size=" << s_nQueueSize);
112 CPPUNIT_CHECK( testQueue.empty() )
// Per-item pass: prints a message for an offending item; the assertion fails once more than 10 are counted.
115 for ( size_t i = 0; i < s_nQueueSize; ++i ) {
117 CPPUNIT_MSG( " ERROR: Item " << i << " has not been popped" );
118 CPPUNIT_ASSERT( ++nError <= 10 );
// Pop test driver for a queue type.
// NOTE(review): the signature line, the declaration of testQueue, the body of the fill loop
// and the statements between the "Pop test" message and analyze() are not visible in this
// listing; presumably this is the test<Queue>() invoked by TEST_CASE — confirm.
125 template <class Queue>
// Thread pool owned by this test object, holding s_nThreadCount copies of a Thread<Queue> prototype.
129 CppUnitMini::ThreadPool pool( *this );
130 pool.add( new Thread<Queue>( pool, testQueue ), s_nThreadCount );
132 CPPUNIT_MSG( " Create queue size =" << s_nQueueSize << " ...");
// The timer covers the loop below; its duration is printed right after it.
133 cds::OS::Timer timer;
// Iterates over all s_nQueueSize item numbers (loop body not visible; presumably pushes item i — confirm).
134 for ( size_t i = 0; i < s_nQueueSize; ++i )
136 CPPUNIT_MSG( " Duration=" << timer.duration() );
138 CPPUNIT_MSG( " Pop test, thread count=" << s_nThreadCount << " ...");
// Aggregate and check the per-thread results, then print the queue's statistics.
141 analyze( pool, testQueue );
143 CPPUNIT_MSG( testQueue.statistics() );
// Pop test driver for queue types constructed with a size argument.
// NOTE(review): the signature line, the body of the fill loop and the statements between the
// "Pop test" message and analyze() are not visible in this listing; presumably this is the
// test_bounded<Queue>() invoked by TEST_BOUNDED — confirm.
146 template <class Queue>
// The queue is constructed with s_nQueueSize as its constructor argument.
149 Queue testQueue( s_nQueueSize );
// Thread pool owned by this test object, holding s_nThreadCount copies of a Thread<Queue> prototype.
150 CppUnitMini::ThreadPool pool( *this );
151 pool.add( new Thread<Queue>( pool, testQueue ), s_nThreadCount );
153 CPPUNIT_MSG( " Create queue size =" << s_nQueueSize << " ...");
// The timer covers the loop below; its duration is printed right after it.
154 cds::OS::Timer timer;
// Iterates over all s_nQueueSize item numbers (loop body not visible; presumably pushes item i — confirm).
155 for ( size_t i = 0; i < s_nQueueSize; ++i )
157 CPPUNIT_MSG( " Duration=" << timer.duration() );
159 CPPUNIT_MSG( " Pop test, thread count=" << s_nThreadCount << " ...");
// Aggregate and check the per-thread results, then print the queue's statistics.
162 analyze( pool, testQueue );
164 CPPUNIT_MSG( testQueue.statistics() );
// Pop test driver for queue types constructed with a segment size.
// Repeats the whole test for segment sizes 4, 16, 64 and 256.
// NOTE(review): the body of the fill loop and the statements between the "Pop test" message
// and analyze() are not visible in this listing.
167 template <class Queue>
168 void test_segmented()
// nSegmentSize is multiplied by 4 each round, up to and including 256.
170 for ( size_t nSegmentSize = 4; nSegmentSize <= 256; nSegmentSize *= 4 ) {
171 CPPUNIT_MSG( "Segment size: " << nSegmentSize );
// A fresh queue and a fresh thread pool are created for every segment size.
173 Queue testQueue( nSegmentSize );
174 CppUnitMini::ThreadPool pool( *this );
175 pool.add( new Thread<Queue>( pool, testQueue ), s_nThreadCount );
177 CPPUNIT_MSG( " Create queue size =" << s_nQueueSize << " ...");
// The timer covers the loop below; its duration is printed right after it.
178 cds::OS::Timer timer;
// Iterates over all s_nQueueSize item numbers (loop body not visible; presumably pushes item i — confirm).
179 for ( size_t i = 0; i < s_nQueueSize; ++i )
181 CPPUNIT_MSG( " Duration=" << timer.duration() );
183 CPPUNIT_MSG( " Pop test, thread count=" << s_nThreadCount << " ...");
// Aggregate and check the per-thread results, then print the queue's statistics.
186 analyze( pool, testQueue );
188 CPPUNIT_MSG( testQueue.statistics() );
// Loads the test settings from cfg into the file-level variables s_nThreadCount and s_nQueueSize.
// The second argument of each getULong call repeats the variable's initial value
// (presumably the default used when the key is absent — confirm against TestCfg).
192 void setUpParams( const CppUnitMini::TestCfg& cfg ) {
193 s_nThreadCount = cfg.getULong("ThreadCount", 8 );
194 s_nQueueSize = cfg.getULong("QueueSize", 20000000 );
// One CDSUNIT_DECLARE_* macro per queue family, each invoked with SimpleValue as its argument.
// NOTE(review): the macro definitions are not in this file (presumably queue/queue_defs.h, where
// they would expand to TEST_CASE / TEST_BOUNDED / TEST_SEGMENTED declarations — confirm).
198 CDSUNIT_DECLARE_MoirQueue( SimpleValue )
199 CDSUNIT_DECLARE_MSQueue( SimpleValue )
200 CDSUNIT_DECLARE_OptimisticQueue( SimpleValue )
201 CDSUNIT_DECLARE_BasketQueue( SimpleValue )
202 CDSUNIT_DECLARE_FCQueue( SimpleValue )
203 CDSUNIT_DECLARE_FCDeque( SimpleValue )
204 CDSUNIT_DECLARE_SegmentedQueue( SimpleValue )
205 CDSUNIT_DECLARE_RWQueue( SimpleValue )
206 CDSUNIT_DECLARE_TsigasCysleQueue( SimpleValue )
207 CDSUNIT_DECLARE_VyukovMPMCCycleQueue( SimpleValue )
208 CDSUNIT_DECLARE_StdQueue( SimpleValue )
// Test-suite definition for Queue_Pop: one CDSUNIT_TEST_* macro per queue family,
// enclosed between CPPUNIT_TEST_SUITE and CPPUNIT_TEST_SUITE_END.
// NOTE(review): not every family declared with CDSUNIT_DECLARE_* has a visible entry here;
// some lines of this list are not shown in this listing — confirm against the full file.
210 CPPUNIT_TEST_SUITE(Queue_Pop)
211 CDSUNIT_TEST_MoirQueue
213 CDSUNIT_TEST_OptimisticQueue
214 CDSUNIT_TEST_BasketQueue
217 CDSUNIT_TEST_SegmentedQueue
219 CDSUNIT_TEST_TsigasCysleQueue
220 CDSUNIT_TEST_VyukovMPMCCycleQueue
221 CDSUNIT_TEST_StdQueue
222 CPPUNIT_TEST_SUITE_END();
// Passes queue::Queue_Pop to the test framework's suite-registration macro.
227 CPPUNIT_TEST_SUITE_REGISTRATION(queue::Queue_Pop);