3 #include "cppunit/thread.h"
4 #include "queue/queue_type.h"
5 #include "queue/queue_defs.h"
8 // Multi-threaded queue test for push operation
11 #define TEST_CASE( Q, V ) void Q() { test< Types<V>::Q >(); }
12 #define TEST_BOUNDED( Q, V ) void Q() { test_bounded< Types<V>::Q >(); }
13 #define TEST_SEGMENTED( Q, V ) void Q() { test_segmented< Types<V>::Q >(); }
15 namespace ns_Queue_Push {
16 static size_t s_nThreadCount = 8;
17 static size_t s_nQueueSize = 20000000 ; // no more than 20 million records
22 SimpleValue(): nNo(0) {}
23 SimpleValue( size_t n ): nNo(n) {}
24 size_t getNo() const { return nNo; }
27 using namespace ns_Queue_Push;
29 class Queue_Push: public CppUnitMini::TestCase
31 template <class Queue>
32 class Thread: public CppUnitMini::TestThread
34 virtual TestThread * clone()
36 return new Thread( *this );
46 Thread( CppUnitMini::ThreadPool& pool, Queue& q )
47 : CppUnitMini::TestThread( pool )
51 : CppUnitMini::TestThread( src )
52 , m_Queue( src.m_Queue )
57 return reinterpret_cast<Queue_Push&>( m_Pool.m_Test );
62 cds::threading::Manager::attachThread();
66 cds::threading::Manager::detachThread();
71 m_fTime = m_Timer.duration();
74 for ( size_t nItem = m_nStartItem; nItem < m_nEndItem; ++nItem ) {
75 if ( !m_Queue.push( nItem ))
79 m_fTime = m_Timer.duration() - m_fTime;
84 template <class Queue>
85 void analyze( CppUnitMini::ThreadPool& pool, Queue& testQueue )
87 size_t nThreadItems = s_nQueueSize / s_nThreadCount;
89 for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
90 Thread<Queue> * pThread = reinterpret_cast<Thread<Queue> *>(*it);
91 fTime += pThread->m_fTime;
92 if ( pThread->m_nPushError != 0 )
93 CPPUNIT_MSG(" ERROR: thread push error count=" << pThread->m_nPushError );
95 CPPUNIT_MSG( " Duration=" << (fTime / s_nThreadCount) );
96 CPPUNIT_CHECK( !testQueue.empty() )
98 size_t * arr = new size_t[ s_nQueueSize ];
99 memset(arr, 0, sizeof(arr[0]) * s_nQueueSize );
101 cds::OS::Timer timer;
102 CPPUNIT_MSG( " Pop (single-threaded)..." );
104 SimpleValue val = SimpleValue();
105 while ( testQueue.pop( val )) {
107 ++arr[ val.getNo() ];
109 CPPUNIT_MSG( " Duration=" << timer.duration() );
111 size_t nTotalItems = nThreadItems * s_nThreadCount;
113 for ( size_t i = 0; i < nTotalItems; ++i ) {
115 CPPUNIT_MSG( " ERROR: Item " << i << " has not been pushed" );
116 CPPUNIT_ASSERT( ++nError <= 10 );
123 template <class Queue>
128 CppUnitMini::ThreadPool pool( *this );
129 pool.add( new Thread<Queue>( pool, testQueue ), s_nThreadCount );
132 size_t nThreadItemCount = s_nQueueSize / s_nThreadCount;
133 for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
134 Thread<Queue> * pThread = reinterpret_cast<Thread<Queue> *>(*it);
135 pThread->m_nStartItem = nStart;
136 nStart += nThreadItemCount;
137 pThread->m_nEndItem = nStart;
140 CPPUNIT_MSG( " Push test, thread count=" << s_nThreadCount << " ...");
143 analyze( pool, testQueue );
145 CPPUNIT_MSG( testQueue.statistics() );
148 template <class Queue>
152 size_t nThreadItemCount = s_nQueueSize / s_nThreadCount;
154 Queue testQueue( s_nQueueSize );
156 CppUnitMini::ThreadPool pool( *this );
157 pool.add( new Thread<Queue>( pool, testQueue ), s_nThreadCount );
159 for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
160 Thread<Queue> * pThread = reinterpret_cast<Thread<Queue> *>(*it);
161 pThread->m_nStartItem = nStart;
162 nStart += nThreadItemCount;
163 pThread->m_nEndItem = nStart;
166 CPPUNIT_MSG( " Push test, thread count=" << s_nThreadCount << " ...");
169 analyze( pool, testQueue );
171 CPPUNIT_MSG( testQueue.statistics() );
174 template <class Queue>
175 void test_segmented()
177 CPPUNIT_MSG( " Push test, thread count=" << s_nThreadCount << " ...");
178 for ( size_t nSegmentSize = 4; nSegmentSize <= 256; nSegmentSize *= 4 ) {
179 CPPUNIT_MSG( "Segment size: " << nSegmentSize );
181 Queue testQueue( nSegmentSize );
183 CppUnitMini::ThreadPool pool( *this );
184 pool.add( new Thread<Queue>( pool, testQueue ), s_nThreadCount );
187 size_t nThreadItemCount = s_nQueueSize / s_nThreadCount;
188 for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
189 Thread<Queue> * pThread = reinterpret_cast<Thread<Queue> *>(*it);
190 pThread->m_nStartItem = nStart;
191 nStart += nThreadItemCount;
192 pThread->m_nEndItem = nStart;
197 analyze( pool, testQueue );
199 CPPUNIT_MSG( testQueue.statistics() );
203 void setUpParams( const CppUnitMini::TestCfg& cfg ) {
204 s_nThreadCount = cfg.getULong("ThreadCount", 8 );
205 s_nQueueSize = cfg.getULong("QueueSize", 20000000 );
209 CDSUNIT_DECLARE_MoirQueue( SimpleValue )
210 CDSUNIT_DECLARE_MSQueue( SimpleValue )
211 CDSUNIT_DECLARE_OptimisticQueue( SimpleValue )
212 CDSUNIT_DECLARE_BasketQueue( SimpleValue )
213 CDSUNIT_DECLARE_FCQueue( SimpleValue )
214 CDSUNIT_DECLARE_FCDeque( SimpleValue )
215 CDSUNIT_DECLARE_SegmentedQueue( SimpleValue )
216 CDSUNIT_DECLARE_RWQueue( SimpleValue )
217 CDSUNIT_DECLARE_TsigasCycleQueue( SimpleValue )
218 CDSUNIT_DECLARE_VyukovMPMCCycleQueue( SimpleValue )
219 CDSUNIT_DECLARE_StdQueue( SimpleValue )
221 CPPUNIT_TEST_SUITE(Queue_Push)
222 CDSUNIT_TEST_MoirQueue
224 CDSUNIT_TEST_OptimisticQueue
225 CDSUNIT_TEST_BasketQueue
228 CDSUNIT_TEST_SegmentedQueue
230 CDSUNIT_TEST_TsigasCycleQueue
231 CDSUNIT_TEST_VyukovMPMCCycleQueue
232 CDSUNIT_TEST_StdQueue
233 CPPUNIT_TEST_SUITE_END();
238 CPPUNIT_TEST_SUITE_REGISTRATION(queue::Queue_Push);