3 #include "cppunit/thread.h"
4 #include "queue/intrusive_queue_type.h"
5 #include "queue/intrusive_queue_defs.h"
9 // Multi-threaded random queue test
// Generators for the per-queue test member functions. Each macro expands to a
// zero-argument function named Q that calls one test driver template,
// instantiated with the queue type Types< Value<...> >::Q.
// TEST_CASE -> test<>, with HOOK used as the base class of Value.
12 #define TEST_CASE( Q, HOOK ) void Q() { test< Types< Value<HOOK> >::Q >(); }
// TEST_BOUNDED -> test_bounded<>, with Value<> (default base class).
13 #define TEST_BOUNDED( Q ) void Q() { test_bounded< Types< Value<> >::Q >(); }
// TEST_FCQUEUE -> test_fcqueue<>, with HOOK used as the base class of Value.
14 #define TEST_FCQUEUE( Q, HOOK ) void Q() { test_fcqueue< Types< Value<HOOK> >::Q >(); }
// TEST_SEGMENTED -> test_segmented<>, with Value<> (default base class).
15 #define TEST_SEGMENTED( Q ) void Q() { test_segmented< Types< Value<> >::Q >(); }
// TEST_BOOST -> test_boost<>, with HOOK used as the base class of Value.
16 #define TEST_BOOST( Q, HOOK ) void Q() { test_boost< Types< Value<HOOK> >::Q >(); }
// Test parameters with their compiled-in initial values. setUpParams() overwrites
// all of them from the test configuration.
// Number of Consumer threads added to the thread pool.
19 static size_t s_nReaderThreadCount = 4;
// Number of Producer threads added to the thread pool.
20 static size_t s_nWriterThreadCount = 4;
// Total number of values allocated for a run; divided evenly among the writers.
// NOTE(review): the initial value here is 4000000, but setUpParams() falls back to
// 10000000 when "QueueSize" is not configured - confirm which default is intended.
21 static size_t s_nQueueSize = 4000000;
// Passed as the second constructor argument of the queue in the FC-queue driver.
22 static unsigned int s_nFCPassCount = 8;
// Passed as the first constructor argument of the queue in the FC-queue driver.
23 static unsigned int s_nFCCompactFactor = 64;
// Item type stored in the tested queues. It derives from Base (default: `empty`);
// the TEST_* macros pass a hook type as Base.
// The struct body is not visible in this excerpt. Code elsewhere in this file
// reads/writes the fields nNo, nWriterNo and nConsumer through Queue::value_type,
// which are presumably declared here - confirm against the full file.
27 template <typename Base = empty >
28 struct Value: public Base
// Test fixture: Producer threads push pre-allocated values into an intrusive queue
// while Consumer threads pop them; analyze() then verifies the popped data.
36 class IntrusiveQueue_ReaderWriter: public CppUnitMini::TestCase
// Writer thread: pushes the values in [m_pStart, m_pEnd) into the shared queue.
// Several lines of this class (member declarations, function headers, braces)
// are not visible in this excerpt.
38 template <class Queue>
39 class Producer: public CppUnitMini::TestThread
// Returns a heap-allocated copy of this thread object (made with the copy constructor).
41 virtual TestThread * clone()
43 return new Producer( *this );
50 // Interval in m_arrValue
// Half-open range of values this producer pushes; both ends are assigned by
// test_with() after the producers have been added to the pool.
51 typename Queue::value_type * m_pStart;
52 typename Queue::value_type * m_pEnd;
// Constructs a producer bound to thread pool `pool`; the remaining initializers
// (presumably binding m_Queue to q) are not visible here.
55 Producer( CppUnitMini::ThreadPool& pool, Queue& q )
56 : CppUnitMini::TestThread( pool )
// Copy constructor used by clone(); the copy refers to the same queue as src.
59 Producer( Producer& src )
60 : CppUnitMini::TestThread( src )
61 , m_Queue( src.m_Queue )
// Returns the owning test fixture by downcasting the pool's test reference.
64 IntrusiveQueue_ReaderWriter& getTest()
66 return static_cast<IntrusiveQueue_ReaderWriter&>( m_Pool.m_Test );
// Attach/detach the thread to/from the cds threading manager. The enclosing
// function headers are not visible - presumably the thread's init/fini hooks.
71 cds::threading::Manager::attachThread();
75 cds::threading::Manager::detachThread();
// Start of the timed section: remember the timer value at entry.
82 m_fTime = m_Timer.duration();
// The for-header has no increment expression: p is advanced in lines not shown
// here (presumably only after a successful push, so a failed push is retried).
85 for ( typename Queue::value_type * p = m_pStart; p < m_pEnd; ) {
// Tag the value with this thread's number so consumers can attribute it.
87 p->nWriterNo = m_nThreadNo;
88 if ( m_Queue.push( *p )) {
// End of the timed section: m_fTime now holds the elapsed duration.
96 m_fTime = m_Timer.duration() - m_fTime;
// Announce that this producer has finished; consumers test this counter for 0
// (together with queue emptiness) in their termination condition.
97 getTest().m_nProducerCount.fetch_sub( 1, CDS_ATOMIC::memory_order_release );
// Reader thread: pops values from the shared queue and records, per writer, the
// sequence numbers (nNo) it received. Several lines of this class (member
// declarations, function headers, braces) are not visible in this excerpt.
101 template <class Queue>
102 class Consumer: public CppUnitMini::TestThread
// Returns a heap-allocated copy of this thread object (made with the copy constructor).
104 virtual TestThread * clone()
106 return new Consumer( *this );
// Container/iterator types for the popped sequence numbers of one writer.
115 typedef std::vector<size_t> TPoppedData;
116 typedef std::vector<size_t>::iterator data_iterator;
117 typedef std::vector<size_t>::const_iterator const_data_iterator;
// m_WriterData[w] holds the nNo values this consumer popped that carry
// nWriterNo == w, in pop order.
119 std::vector<TPoppedData> m_WriterData;
// Sizes m_WriterData to one list per writer and reserves, in each list,
// capacity for every value a single writer pushes (m_nThreadPushCount).
122 void initPoppedData()
124 const size_t nWriterCount = s_nWriterThreadCount;
125 const size_t nWriterPushCount = getTest().m_nThreadPushCount;
126 m_WriterData.resize( nWriterCount );
127 for ( size_t i = 0; i < nWriterCount; ++i )
128 m_WriterData[i].reserve( nWriterPushCount );
// Constructs a consumer bound to thread pool `pool`; the remaining initializers
// and constructor body are not visible here.
132 Consumer( CppUnitMini::ThreadPool& pool, Queue& q )
133 : CppUnitMini::TestThread( pool )
// Copy constructor used by clone(); the copy refers to the same queue as src.
138 Consumer( Consumer& src )
139 : CppUnitMini::TestThread( src )
140 , m_Queue( src.m_Queue )
// Returns the owning test fixture by downcasting the pool's test reference.
145 IntrusiveQueue_ReaderWriter& getTest()
147 return static_cast<IntrusiveQueue_ReaderWriter&>( m_Pool.m_Test );
// Attach/detach the thread to/from the cds threading manager. The enclosing
// function headers are not visible - presumably the thread's init/fini hooks.
152 cds::threading::Manager::attachThread();
156 cds::threading::Manager::detachThread();
164 const size_t nTotalWriters = s_nWriterThreadCount;
// Start of the timed section: remember the timer value at entry.
166 m_fTime = m_Timer.duration();
// pop() yields a pointer to the dequeued value; the null check and the
// surrounding loop are in lines not visible here.
169 typename Queue::value_type * p = m_Queue.pop();
// Mark the value as consumed by this thread; test_with() later checks that no
// value still carries the c_nBadConsumer marker.
171 p->nConsumer = m_nThreadNo;
// Record the sequence number only when the writer index is in range, since it is
// used to index m_WriterData. The else branch is not visible (presumably it
// counts the event in m_nBadWriter, which analyze() checks).
173 if ( p->nWriterNo < nTotalWriters )
174 m_WriterData[ p->nWriterNo ].push_back( p->nNo );
// Termination condition: every producer has finished and the queue is empty.
// The statement it guards is not visible (presumably leaves the pop loop).
180 if ( getTest().m_nProducerCount.load( CDS_ATOMIC::memory_order_acquire ) == 0 && m_Queue.empty() )
// End of the timed section: m_fTime now holds the elapsed duration.
185 m_fTime = m_Timer.duration() - m_fTime;
// Small owner of an array of T allocated with new[]. The class header, the
// m_pArr declaration and the destructor are not visible in this excerpt
// (presumably the destructor releases the array with delete[] - confirm).
189 template <typename T>
// Allocates nSize default-initialized elements.
194 value_array( size_t nSize )
195 : m_pArr( new T[nSize] )
// Returns the raw pointer to the first element; ownership is not transferred.
203 T * get() const { return m_pArr; }
// Number of values each writer pushes; set by test_with() to
// s_nQueueSize / s_nWriterThreadCount.
208 size_t m_nThreadPushCount;
// Number of producers still running: initialized to the writer count by
// test_with(), decremented by each Producer when it finishes, and polled by
// Consumers in their termination condition.
209 CDS_ATOMIC::atomic<size_t> m_nProducerCount;
// Sentinel written to every value's nConsumer before a run; a value that still
// carries it afterwards was never popped by a Consumer.
210 static CDS_CONSTEXPR_CONST size_t c_nBadConsumer = 0xbadc0ffe;
// Post-run verification and reporting.
//   pool         - thread pool holding the finished Producer and Consumer threads
//   testQueue    - the queue under test; leftover items are popped from it here
//   nLeftOffset  - not referenced in the lines visible in this excerpt
//   nRightOffset - slack added to the right-hand side of the per-reader ordering check
// Many lines of this function (braces, some declarations) are not visible here.
213 template <class Queue>
214 void analyze( CppUnitMini::ThreadPool& pool, Queue& testQueue, size_t nLeftOffset, size_t nRightOffset )
216 typedef Consumer<Queue> Reader;
217 typedef Producer<Queue> Writer;
218 typedef typename Reader::const_data_iterator ReaderIterator;
// Pop whatever is still in the queue after the threads stopped. The loop body is
// not visible (presumably it increments nPostTestPops).
220 size_t nPostTestPops = 0;
221 while ( testQueue.pop() )
// Accumulators over all threads in the pool.
224 double fTimeWriter = 0;
225 double fTimeReader = 0;
226 size_t nTotalPops = 0;
227 size_t nPopFalse = 0;
228 size_t nPoppedItems = 0;
229 size_t nPushFailed = 0;
231 std::vector< Reader * > arrReaders;
// The pool contains both kinds of thread; dynamic_cast tells them apart. The
// branch on pReader is in a line not visible here.
233 for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
234 Reader * pReader = dynamic_cast<Reader *>( *it );
236 fTimeReader += pReader->m_fTime;
237 nTotalPops += pReader->m_nPopped;
238 nPopFalse += pReader->m_nPopEmpty;
239 arrReaders.push_back( pReader );
240 CPPUNIT_CHECK_EX( pReader->m_nBadWriter == 0, "reader " << pReader->m_nThreadNo << " bad writer event count=" << pReader->m_nBadWriter );
// Count the items this reader actually recorded, summed over all writers
// (nPopped is declared in a line not visible here).
243 for ( size_t n = 0; n < s_nWriterThreadCount; ++n )
244 nPopped += pReader->m_WriterData[n].size();
// Writers are added to the pool before readers (see test_with), so the writer
// count is subtracted to print a reader index - presumably zero-based; confirm.
246 CPPUNIT_MSG( " Reader " << pReader->m_nThreadNo - s_nWriterThreadCount << " popped count=" << nPopped );
247 nPoppedItems += nPopped;
// Anything that is not a Reader must be a Writer.
250 Writer * pWriter = dynamic_cast<Writer *>( *it );
251 CPPUNIT_ASSERT( pWriter != NULL );
252 fTimeWriter += pWriter->m_fTime;
253 nPushFailed += pWriter->m_nPushFailed;
// A failed push is reported as an error only for queues that do not derive
// from cds::bounded_container.
254 if ( !boost::is_base_of<cds::bounded_container, Queue>::value ) {
255 CPPUNIT_CHECK_EX( pWriter->m_nPushFailed == 0,
256 "writer " << pWriter->m_nThreadNo << " push failed count=" << pWriter->m_nPushFailed );
// The readers' pop counters must agree with the number of recorded items.
260 CPPUNIT_CHECK_EX( nTotalPops == nPoppedItems, "nTotalPops=" << nTotalPops << ", nPoppedItems=" << nPoppedItems );
// Report per-thread average durations and failure counters.
262 CPPUNIT_MSG( " Readers: duration=" << fTimeReader / s_nReaderThreadCount << ", success pop=" << nTotalPops << ", failed pops=" << nPopFalse );
263 CPPUNIT_MSG( " Writers: duration=" << fTimeWriter / s_nWriterThreadCount << ", failed push=" << nPushFailed );
// Pops made by readers plus pops made above must equal the number of pushed items.
265 size_t nQueueSize = m_nThreadPushCount * s_nWriterThreadCount;
266 CPPUNIT_CHECK_EX( nTotalPops + nPostTestPops == nQueueSize, "popped=" << nTotalPops + nPostTestPops << " must be " << nQueueSize );
267 CPPUNIT_CHECK( testQueue.empty() );
269 // Test that all items have been popped
271 CPPUNIT_MSG( " Test consistency of popped sequence..." );
// For every writer: check per-reader ordering, then merge all readers' data and
// check that the merged sequence is complete.
273 for ( size_t nWriter = 0; nWriter < s_nWriterThreadCount; ++nWriter ) {
274 std::vector<size_t> arrData;
275 arrData.reserve( m_nThreadPushCount );
277 for ( size_t nReader = 0; nReader < arrReaders.size(); ++nReader ) {
278 ReaderIterator it = arrReaders[nReader]->m_WriterData[nWriter].begin();
279 ReaderIterator itEnd = arrReaders[nReader]->m_WriterData[nWriter].end();
// Within one reader, the numbers received from one writer must be increasing,
// allowing a slack of nRightOffset. The guard against an empty range and the
// update of itPrev are in lines not visible here.
281 ReaderIterator itPrev = it;
282 for ( ++it; it != itEnd; ++it ) {
283 CPPUNIT_CHECK_EX( *itPrev < *it + nRightOffset,
284 "Reader " << nReader << ", Writer " << nWriter << ": prev=" << *itPrev << ", cur=" << *it );
// NOTE(review): no condition is visible between the check above and this
// increment, so nErrors appears to grow on every comparison rather than only on
// failures (the sorted-data loop below guards its increment) - confirm.
285 if ( ++nErrors > 10 )
// Merge this reader's numbers for the current writer into arrData.
291 for ( it = arrReaders[nReader]->m_WriterData[nWriter].begin(); it != itEnd; ++it )
292 arrData.push_back( *it );
// After sorting, consecutive entries must differ by exactly 1 (no gaps, no duplicates).
294 std::sort( arrData.begin(), arrData.end() );
296 for ( size_t i=1; i < arrData.size(); ++i ) {
297 if ( arrData[i-1] + 1 != arrData[i] ) {
298 CPPUNIT_CHECK_EX( arrData[i-1] + 1 == arrData[i], "Writer " << nWriter << ": [" << (i-1) << "]=" << arrData[i-1] << ", [" << i << "]=" << arrData[i] );
299 if ( ++nErrors > 10 )
// The merged sequence must start at 0 and end at m_nThreadPushCount - 1.
// NOTE(review): both checks index arrData without a visible emptiness guard -
// confirm arrData cannot be empty here.
304 CPPUNIT_CHECK_EX( arrData[0] == 0, "Writer " << nWriter << "[0] != 0" );
305 CPPUNIT_CHECK_EX( arrData[arrData.size() - 1] == m_nThreadPushCount - 1, "Writer " << nWriter << "[last] != " << m_nThreadPushCount - 1 );
// Runs one producer/consumer pass over testQueue using the values in arrValue,
// then verifies the result.
//   testQueue    - queue under test
//   arrValue     - pre-allocated values to push (callers allocate s_nQueueSize of them)
//   nLeftOffset, nRightOffset - forwarded unchanged to analyze()
// Some lines (braces, the pool start call, parts of loop bodies) are not visible here.
309 template <class Queue>
310 void test_with( Queue& testQueue, value_array<typename Queue::value_type>& arrValue, size_t nLeftOffset, size_t nRightOffset )
// Integer division: when s_nQueueSize is not a multiple of the writer count the
// remainder values are never pushed. s_nWriterThreadCount must be non-zero.
312 m_nThreadPushCount = s_nQueueSize / s_nWriterThreadCount;
313 CPPUNIT_MSG( " reader count=" << s_nReaderThreadCount << " writer count=" << s_nWriterThreadCount
314 << " item count=" << m_nThreadPushCount * s_nWriterThreadCount << "..." );
316 typename Queue::value_type * pValStart = arrValue.get();
317 typename Queue::value_type * pValEnd = pValStart + s_nQueueSize;
319 CppUnitMini::ThreadPool pool( *this );
// Publish the number of running producers before any thread is created.
321 m_nProducerCount.store( s_nWriterThreadCount, CDS_ATOMIC::memory_order_release );
323 // Writers must be first
324 pool.add( new Producer<Queue>( pool, testQueue ), s_nWriterThreadCount );
// Reset every allocated value; only the nConsumer reset is visible in this
// excerpt (other field initializations may be in the missing lines).
326 for ( typename Queue::value_type * it = pValStart; it != pValEnd; ++it ) {
329 it->nConsumer = c_nBadConsumer;
// Give each producer a consecutive slice of m_nThreadPushCount values. The
// static_cast to Producer is valid only because the pool contains nothing but
// producers at this point - consumers are added below.
332 typename Queue::value_type * pStart = pValStart;
333 for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
334 static_cast<Producer<Queue>* >( *it )->m_pStart = pStart;
335 pStart += m_nThreadPushCount;
336 static_cast<Producer<Queue>* >( *it )->m_pEnd = pStart;
339 pool.add( new Consumer<Queue>( pool, testQueue ), s_nReaderThreadCount );
343 // Check that all values have been dequeued
// Only the values actually handed to producers are inspected.
345 size_t nBadConsumerCount = 0;
346 size_t nQueueSize = m_nThreadPushCount * s_nWriterThreadCount;
347 typename Queue::value_type * pEnd = pValStart + nQueueSize;
348 for ( typename Queue::value_type * it = pValStart; it != pEnd; ++it ) {
// A value still carrying the sentinel was never marked by a Consumer (the
// counting statement is in a line not visible here).
349 if ( it->nConsumer == c_nBadConsumer )
352 CPPUNIT_CHECK_EX( nBadConsumerCount == 0, "nBadConsumerCount=" << nBadConsumerCount );
// Detailed per-thread verification, then print the queue's statistics.
355 analyze( pool, testQueue, nLeftOffset, nRightOffset );
356 CPPUNIT_MSG( testQueue.statistics() );
// Test driver: allocates s_nQueueSize values, runs test_with() on queue q with
// both offsets 0, then calls Queue::gc::force_dispose().
// The function header and the declaration of q are not visible in this excerpt
// (presumably this is the `test` driver used by TEST_CASE - confirm).
359 template <typename Queue>
362 value_array<typename Queue::value_type> arrValue( s_nQueueSize );
365 test_with(q, arrValue, 0, 0);
367 Queue::gc::force_dispose();
// Test driver: allocates s_nQueueSize values and runs test_with() on queue q with
// both offsets 0. No force_dispose() call is visible for this driver.
// The function header and the declaration of q are not visible in this excerpt
// (presumably one of the test_boost / test_bounded drivers - confirm).
370 template <typename Queue>
373 value_array<typename Queue::value_type> arrValue( s_nQueueSize );
376 test_with(q, arrValue, 0, 0);
// Test driver: allocates s_nQueueSize values and runs test_with() on queue q with
// both offsets 0. No force_dispose() call is visible for this driver.
// The function header and the declaration of q are not visible in this excerpt
// (presumably one of the test_boost / test_bounded drivers - confirm).
380 template <typename Queue>
383 value_array<typename Queue::value_type> arrValue( s_nQueueSize );
385 test_with(q, arrValue, 0, 0);
// Test driver for queues constructed from (compact factor, pass count): logs both
// parameters, builds the queue from s_nFCCompactFactor and s_nFCPassCount, and
// runs test_with() with both offsets 0.
// The function header is not visible in this excerpt (presumably test_fcqueue,
// the driver used by TEST_FCQUEUE - confirm).
388 template <typename Queue>
391 value_array<typename Queue::value_type> arrValue( s_nQueueSize );
392 CPPUNIT_MSG( "Combining pass count: " << s_nFCPassCount << ", compact factor: " << s_nFCCompactFactor );
393 Queue q( s_nFCCompactFactor, s_nFCPassCount );
394 test_with(q, arrValue, 0, 0);
// Test driver for queues constructed from a segment size. The value array is
// allocated once and reused for every pass.
397 template <typename Queue>
398 void test_segmented()
400 value_array<typename Queue::value_type> arrValue( s_nQueueSize );
// Segment sizes tried: 4, 16, 64, 256.
401 for ( size_t nSegmentSize = 4; nSegmentSize <= 256; nSegmentSize *= 4 ) {
402 CPPUNIT_MSG( "Segment size: " << nSegmentSize );
404 Queue q( nSegmentSize );
// Ordering slack depends on the segment size: left offset is twice the segment
// size, right offset equals the segment size.
405 test_with( q, arrValue, nSegmentSize * 2, nSegmentSize );
// Called once per segment size, after the pass.
407 Queue::gc::force_dispose();
// Test driver for queues constructed from an array size. The value array is
// allocated once and reused for every pass.
// The function header is not visible in this excerpt, so the driver's name and
// the queue family it serves cannot be confirmed from here.
411 template <typename Queue>
414 value_array<typename Queue::value_type> arrValue( s_nQueueSize );
// Array sizes tried: 2, 4, 8, 16, 32, 64.
415 for ( size_t nArraySize = 2; nArraySize <= 64; nArraySize *= 2 ) {
416 CPPUNIT_MSG( "Array size: " << nArraySize );
418 Queue q( nArraySize );
419 test_with( q, arrValue, 0, 0 );
// Called once per array size, after the pass.
421 Queue::gc::force_dispose();
// Loads the test parameters from cfg into the file-level statics. The second
// argument of each getter is the value used when the key is absent from cfg.
425 void setUpParams( const CppUnitMini::TestCfg& cfg ) {
426 s_nReaderThreadCount = cfg.getULong("ReaderCount", 4 );
427 s_nWriterThreadCount = cfg.getULong("WriterCount", 4 );
// NOTE(review): this fallback (10000000) differs from the static's initial value
// (4000000) - confirm which default is intended.
428 s_nQueueSize = cfg.getULong("QueueSize", 10000000 );
429 s_nFCPassCount = cfg.getUInt("FCPassCount", 8);
430 s_nFCCompactFactor = cfg.getUInt("FCCompactFactor", 64);
// Per-queue-family declaration macros, defined outside this excerpt. Presumably
// each expands to the test member functions for that family, built with the
// TEST_* macros at the top of this file - confirm against the macro definitions.
434 CDSUNIT_DECLARE_MSQueue
435 CDSUNIT_DECLARE_MoirQueue
436 CDSUNIT_DECLARE_OptimisticQueue
437 CDSUNIT_DECLARE_BasketQueue
438 CDSUNIT_DECLARE_MichaelDeque
439 CDSUNIT_DECLARE_FCQueue
440 CDSUNIT_DECLARE_SegmentedQueue
441 CDSUNIT_DECLARE_TsigasCycleQueue
442 CDSUNIT_DECLARE_VyukovMPMCCycleQueue
443 CDSUNIT_DECLARE_BoostSList
// Test-suite definition for this fixture: one CDSUNIT_TEST_* entry per queue family.
// NOTE(review): MSQueue and FCQueue have CDSUNIT_DECLARE_* entries, but no
// matching CDSUNIT_TEST_* entry is visible in this excerpt - confirm that both
// families are listed in the suite.
446 CPPUNIT_TEST_SUITE(IntrusiveQueue_ReaderWriter)
448 CDSUNIT_TEST_MoirQueue
449 CDSUNIT_TEST_OptimisticQueue
450 CDSUNIT_TEST_BasketQueue
451 CDSUNIT_TEST_MichaelDeque
453 CDSUNIT_TEST_SegmentedQueue
454 CDSUNIT_TEST_TsigasCycleQueue
455 CDSUNIT_TEST_VyukovMPMCCycleQueue
456 CDSUNIT_TEST_BoostSList
457 CPPUNIT_TEST_SUITE_END();
// Registers the fixture (declared in namespace queue) with the test framework.
462 CPPUNIT_TEST_SUITE_REGISTRATION(queue::IntrusiveQueue_ReaderWriter);