2 This file is a part of libcds - Concurrent Data Structures library
4 (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
6 Source code repo: http://github.com/khizmax/libcds/
7 Download: http://sourceforge.net/projects/libcds/files/
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
12 * Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #include "cppunit/thread.h"
32 #include "queue/intrusive_queue_type.h"
33 #include "queue/intrusive_queue_defs.h"
37 // Multi-threaded random queue test
// Test-method generators. Each macro expands to a member function named Q that
// forwards to one of the test drivers (test, test_bounded, test_fcqueue,
// test_segmented, test_boost) instantiated with the queue type Types<...>::Q.
// TEST_CASE, TEST_FCQUEUE and TEST_BOOST parameterise Value with HOOK;
// TEST_BOUNDED and TEST_SEGMENTED use Value<> (the default base).
40 #define TEST_CASE( Q, HOOK ) void Q() { test< Types< Value<HOOK> >::Q >(); }
41 #define TEST_BOUNDED( Q ) void Q() { test_bounded< Types< Value<> >::Q >(); }
42 #define TEST_FCQUEUE( Q, HOOK ) void Q() { test_fcqueue< Types< Value<HOOK> >::Q >(); }
43 #define TEST_SEGMENTED( Q ) void Q() { test_segmented< Types< Value<> >::Q >(); }
44 #define TEST_BOOST( Q, HOOK ) void Q() { test_boost< Types< Value<HOOK> >::Q >(); }
// Test parameters. These are the compiled-in values; setUpParams() reassigns
// all five from the test configuration.
// Number of Consumer threads added to the pool.
47 static size_t s_nReaderThreadCount = 4;
// Number of Producer threads added to the pool.
48 static size_t s_nWriterThreadCount = 4;
// Size of the value array; test_with() divides it among the producers.
// NOTE(review): the initialiser here is 4000000 while setUpParams() falls back
// to 10000000 when QueueSize is not configured - confirm which default is intended.
49 static size_t s_nQueueSize = 4000000;
// Second constructor argument of the queue built in the driver that logs
// the combining pass count.
50 static unsigned int s_nFCPassCount = 8;
// First constructor argument of that same queue.
51 static unsigned int s_nFCCompactFactor = 64;
// Queue item type. Base is the class the item derives from; it defaults to
// `empty` and is otherwise supplied as HOOK by the TEST_* macros.
// NOTE(review): the struct body is not visible in this extract. The test code
// reads and writes members nNo, nWriterNo and nConsumer through
// Queue::value_type pointers, presumably declared here - confirm.
55 template <typename Base = empty >
56 struct Value: public Base
// Test fixture for intrusive queues under concurrent load: Producer threads
// push items into a shared queue, Consumer threads pop them, and analyze()
// verifies the recorded results afterwards.
64 class IntrusiveQueue_ReaderWriter: public CppUnitMini::TestCase
// Writer thread: walks its assigned range of values and pushes each one into
// the shared queue m_Queue.
66 template <class Queue>
67 class Producer: public CppUnitMini::TestThread
// Returns a heap-allocated copy of this thread object (uses the copy constructor).
69 virtual TestThread * clone()
71 return new Producer( *this );
78 // Range of values this producer pushes; the push loop runs while p < m_pEnd, so m_pEnd is exclusive
79 typename Queue::value_type * m_pStart;
80 typename Queue::value_type * m_pEnd;
// Constructs a producer bound to the pool and the queue under test.
83 Producer( CppUnitMini::ThreadPool& pool, Queue& q )
84 : CppUnitMini::TestThread( pool )
// Copy constructor used by clone(); the copy refers to the same queue as src.
87 Producer( Producer& src )
88 : CppUnitMini::TestThread( src )
89 , m_Queue( src.m_Queue )
// Returns the owning fixture by downcasting m_Pool.m_Test.
92 IntrusiveQueue_ReaderWriter& getTest()
94 return static_cast<IntrusiveQueue_ReaderWriter&>( m_Pool.m_Test );
// NOTE(review): the headers of the functions enclosing the next two calls are
// not visible in this extract; presumably the per-thread init and fini hooks.
99 cds::threading::Manager::attachThread();
103 cds::threading::Manager::detachThread();
// NOTE(review): the header of the thread body is not visible in this extract.
// First timer reading; the second reading below turns m_fTime into a difference.
110 m_fTime = m_Timer.duration();
// The for-header does not advance p.
// NOTE(review): the advance is presumably in the push-success branch (not
// visible here), which would make a failed push retry the same item - confirm.
113 for ( typename Queue::value_type * p = m_pStart; p < m_pEnd; ) {
// Tag the item with this thread number before pushing; Consumer uses the tag
// to select the per-writer vector.
115 p->nWriterNo = m_nThreadNo;
// Matching CDS_TSAN_ANNOTATE_HAPPENS_AFTER on the same address is in Consumer.
116 CDS_TSAN_ANNOTATE_HAPPENS_BEFORE( &p->nWriterNo );
117 if ( m_Queue.push( *p )) {
// m_fTime becomes the difference between the two m_Timer.duration() readings.
125 m_fTime = m_Timer.duration() - m_fTime;
// Report completion: Consumer tests this counter for 0 together with
// m_Queue.empty().
126 getTest().m_nProducerCount.fetch_sub( 1, atomics::memory_order_release );
// Reader thread: pops items from the shared queue m_Queue and records, per
// writer, the nNo value of every item it popped.
130 template <class Queue>
131 class Consumer: public CppUnitMini::TestThread
// Returns a heap-allocated copy of this thread object (uses the copy constructor).
133 virtual TestThread * clone()
135 return new Consumer( *this );
144 typedef std::vector<size_t> TPoppedData;
145 typedef std::vector<size_t>::iterator data_iterator;
146 typedef std::vector<size_t>::const_iterator const_data_iterator;
// m_WriterData[w] holds the nNo values of the popped items whose nWriterNo is w,
// in the order this consumer popped them.
148 std::vector<TPoppedData> m_WriterData;
// Sizes m_WriterData to one vector per writer and reserves m_nThreadPushCount
// entries in each, so recording pops does not have to grow the vectors beyond
// that many entries.
151 void initPoppedData()
153 const size_t nWriterCount = s_nWriterThreadCount;
154 const size_t nWriterPushCount = getTest().m_nThreadPushCount;
155 m_WriterData.resize( nWriterCount );
156 for ( size_t i = 0; i < nWriterCount; ++i )
157 m_WriterData[i].reserve( nWriterPushCount );
// Constructs a consumer bound to the pool and the queue under test.
161 Consumer( CppUnitMini::ThreadPool& pool, Queue& q )
162 : CppUnitMini::TestThread( pool )
// Copy constructor used by clone(); the copy refers to the same queue as src.
167 Consumer( Consumer& src )
168 : CppUnitMini::TestThread( src )
169 , m_Queue( src.m_Queue )
// Returns the owning fixture by downcasting m_Pool.m_Test.
174 IntrusiveQueue_ReaderWriter& getTest()
176 return static_cast<IntrusiveQueue_ReaderWriter&>( m_Pool.m_Test );
// NOTE(review): the headers of the functions enclosing the next two calls are
// not visible in this extract; presumably the per-thread init and fini hooks.
181 cds::threading::Manager::attachThread();
185 cds::threading::Manager::detachThread();
// NOTE(review): the header of the thread body and its loop statement are not
// visible in this extract.
193 const size_t nTotalWriters = s_nWriterThreadCount;
// First timer reading; the second reading below turns m_fTime into a difference.
195 m_fTime = m_Timer.duration();
198 typename Queue::value_type * p = m_Queue.pop();
// NOTE(review): p is dereferenced below; the null check on the pop result is
// presumably on a line not visible here - confirm.
// Mark the item as consumed; test_with() later looks for items still carrying
// c_nBadConsumer.
200 p->nConsumer = m_nThreadNo;
// Matching CDS_TSAN_ANNOTATE_HAPPENS_BEFORE on the same address is in Producer.
202 CDS_TSAN_ANNOTATE_HAPPENS_AFTER( &p->nWriterNo );
// Range-check the writer number before using it as an index.
// NOTE(review): the else branch is not visible; analyze() reads m_nBadWriter,
// presumably incremented there.
203 if ( p->nWriterNo < nTotalWriters )
204 m_WriterData[ p->nWriterNo ].push_back( p->nNo );
// True once every producer has decremented m_nProducerCount and the queue
// reports empty.
// NOTE(review): the action taken is not visible; presumably leaves the loop.
210 if ( getTest().m_nProducerCount.load( atomics::memory_order_acquire ) == 0 && m_Queue.empty() )
// m_fTime becomes the difference between the two m_Timer.duration() readings.
215 m_fTime = m_Timer.duration() - m_fTime;
// Holder for an array of T allocated with new[].
// NOTE(review): the class header, the m_pArr declaration and the destructor are
// not visible in this extract - confirm the array is released with delete[].
219 template <typename T>
// Allocates nSize default-initialised elements.
224 value_array( size_t nSize )
225 : m_pArr( new T[nSize] )
// Returns the pointer to the first element of the array.
233 T * get() const { return m_pArr; }
// Length of the value range given to each producer; test_with() sets it to
// s_nQueueSize / s_nWriterThreadCount.
238 size_t m_nThreadPushCount;
// Set to the writer count by test_with(); each Producer decrements it once at
// the end of its run and Consumer tests it for 0.
239 atomics::atomic<size_t> m_nProducerCount;
// Sentinel written to nConsumer of every value before the run; test_with()
// compares nConsumer against it afterwards to detect values never dequeued.
240 static CDS_CONSTEXPR const size_t c_nBadConsumer = 0xbadc0ffe;
// Post-run verification. Drains the queue, aggregates statistics from every
// thread in the pool, then checks per-writer ordering and completeness of the
// popped sequence numbers.
//   pool         - thread pool holding the Producer and Consumer objects
//   testQueue    - queue under test
//   (3rd param)  - unused, its name nLeftOffset is commented out
//   nRightOffset - slack added to the current value in the ordering check
243 template <class Queue>
244 void analyze( CppUnitMini::ThreadPool& pool, Queue& testQueue, size_t /*nLeftOffset*/, size_t nRightOffset )
246 typedef Consumer<Queue> Reader;
247 typedef Producer<Queue> Writer;
248 typedef typename Reader::const_data_iterator ReaderIterator;
// Pop whatever is still in the queue after the threads have finished.
// NOTE(review): the loop body is not visible; presumably increments nPostTestPops.
250 size_t nPostTestPops = 0;
251 while ( testQueue.pop() )
254 double fTimeWriter = 0;
255 double fTimeReader = 0;
256 size_t nTotalPops = 0;
257 size_t nPopFalse = 0;
258 size_t nPoppedItems = 0;
259 size_t nPushFailed = 0;
261 std::vector< Reader * > arrReaders;
// Each pool entry is tried as a Reader via dynamic_cast and, on the other
// branch, as a Writer.
// NOTE(review): the if/else lines separating the two branches are not visible
// in this extract.
263 for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
264 Reader * pReader = dynamic_cast<Reader *>( *it );
266 fTimeReader += pReader->m_fTime;
267 nTotalPops += pReader->m_nPopped;
268 nPopFalse += pReader->m_nPopEmpty;
269 arrReaders.push_back( pReader );
270 CPPUNIT_CHECK_EX( pReader->m_nBadWriter == 0, "reader " << pReader->m_nThreadNo << " bad writer event count=" << pReader->m_nBadWriter );
// Count the items this reader recorded, summed over all writers.
// NOTE(review): the declaration of nPopped is not visible in this extract.
273 for ( size_t n = 0; n < s_nWriterThreadCount; ++n )
274 nPopped += pReader->m_WriterData[n].size();
276 CPPUNIT_MSG( " Reader " << pReader->m_nThreadNo - s_nWriterThreadCount << " popped count=" << nPopped );
277 nPoppedItems += nPopped;
280 Writer * pWriter = dynamic_cast<Writer *>( *it );
281 CPPUNIT_ASSERT( pWriter != nullptr );
282 fTimeWriter += pWriter->m_fTime;
283 nPushFailed += pWriter->m_nPushFailed;
// A failed push is reported as an error only for queues that do not derive
// from cds::bounded_container.
284 if ( !boost::is_base_of<cds::bounded_container, Queue>::value ) {
285 CPPUNIT_CHECK_EX( pWriter->m_nPushFailed == 0,
286 "writer " << pWriter->m_nThreadNo << " push failed count=" << pWriter->m_nPushFailed );
// The pop counter kept by the readers must match the number of recorded items.
290 CPPUNIT_CHECK_EX( nTotalPops == nPoppedItems, "nTotalPops=" << nTotalPops << ", nPoppedItems=" << nPoppedItems );
// Durations are reported as the average over the threads of each kind.
292 CPPUNIT_MSG( " Readers: duration=" << fTimeReader / s_nReaderThreadCount << ", success pop=" << nTotalPops << ", failed pops=" << nPopFalse );
293 CPPUNIT_MSG( " Writers: duration=" << fTimeWriter / s_nWriterThreadCount << ", failed push=" << nPushFailed );
// Pops made by the readers plus the post-test drain must equal the number of
// values assigned to the producers.
295 size_t nQueueSize = m_nThreadPushCount * s_nWriterThreadCount;
296 CPPUNIT_CHECK_EX( nTotalPops + nPostTestPops == nQueueSize, "popped=" << nTotalPops + nPostTestPops << " must be " << nQueueSize );
297 CPPUNIT_CHECK( testQueue.empty() );
299 // Test that all items have been popped
301 CPPUNIT_MSG( " Test consistency of popped sequence..." );
// For each writer: check the order seen by every reader, then merge the data
// of all readers into arrData.
// NOTE(review): the declaration of nErrors is not visible in this extract.
303 for ( size_t nWriter = 0; nWriter < s_nWriterThreadCount; ++nWriter ) {
304 std::vector<size_t> arrData;
305 arrData.reserve( m_nThreadPushCount );
307 for ( size_t nReader = 0; nReader < arrReaders.size(); ++nReader ) {
308 ReaderIterator it = arrReaders[nReader]->m_WriterData[nWriter].begin();
309 ReaderIterator itEnd = arrReaders[nReader]->m_WriterData[nWriter].end();
// Each value is compared with itPrev and must be greater than it once
// nRightOffset is added.
// NOTE(review): the update of itPrev inside the loop and any guard against an
// empty vector before ++it are not visible in this extract - confirm.
311 ReaderIterator itPrev = it;
312 for ( ++it; it != itEnd; ++it ) {
313 CPPUNIT_CHECK_EX( *itPrev < *it + nRightOffset,
314 "Reader " << nReader << ", Writer " << nWriter << ": prev=" << *itPrev << ", cur=" << *it );
// NOTE(review): the statement controlled by this test is not visible;
// presumably stops the scan after more than 10 errors.
315 if ( ++nErrors > 10 )
// Append the values this reader recorded for nWriter to the merged list.
321 for ( it = arrReaders[nReader]->m_WriterData[nWriter].begin(); it != itEnd; ++it )
322 arrData.push_back( *it );
// After sorting, the merged values must be consecutive: a gap or a repeated
// value fails the check below.
324 std::sort( arrData.begin(), arrData.end() );
326 for ( size_t i=1; i < arrData.size(); ++i ) {
327 if ( arrData[i-1] + 1 != arrData[i] ) {
328 CPPUNIT_CHECK_EX( arrData[i-1] + 1 == arrData[i], "Writer " << nWriter << ": [" << (i-1) << "]=" << arrData[i-1] << ", [" << i << "]=" << arrData[i] );
329 if ( ++nErrors > 10 )
// The merged sequence must start at 0 and end at m_nThreadPushCount - 1.
// NOTE(review): arrData[0] and arrData[arrData.size() - 1] are evaluated with
// no emptiness check visible here - confirm arrData cannot be empty
// (for example when m_nThreadPushCount is 0 or nothing was popped).
334 CPPUNIT_CHECK_EX( arrData[0] == 0, "Writer " << nWriter << "[0] != 0" );
335 CPPUNIT_CHECK_EX( arrData[arrData.size() - 1] == m_nThreadPushCount - 1, "Writer " << nWriter << "[last] != " << m_nThreadPushCount - 1 );
// Runs one producer/consumer pass over testQueue and verifies the outcome.
//   testQueue    - queue under test
//   arrValue     - backing storage for the values; s_nQueueSize elements are
//                  initialised below
//   nLeftOffset, nRightOffset - forwarded unchanged to analyze()
339 template <class Queue>
340 void test_with( Queue& testQueue, value_array<typename Queue::value_type>& arrValue, size_t nLeftOffset, size_t nRightOffset )
// Integer division: when s_nQueueSize is not a multiple of the writer count the
// remainder values are initialised but never assigned to a producer.
// NOTE(review): no check for s_nWriterThreadCount == 0 is visible before this
// division - confirm the configuration cannot supply 0.
342 m_nThreadPushCount = s_nQueueSize / s_nWriterThreadCount;
343 CPPUNIT_MSG( " reader count=" << s_nReaderThreadCount << " writer count=" << s_nWriterThreadCount
344 << " item count=" << m_nThreadPushCount * s_nWriterThreadCount << "..." );
346 typename Queue::value_type * pValStart = arrValue.get();
347 typename Queue::value_type * pValEnd = pValStart + s_nQueueSize;
349 CppUnitMini::ThreadPool pool( *this );
// Every producer decrements this counter once when it finishes.
351 m_nProducerCount.store( s_nWriterThreadCount, atomics::memory_order_release );
353 // Writers must be first: the range-assignment loop below static_casts every pool entry to Producer
354 pool.add( new Producer<Queue>( pool, testQueue ), s_nWriterThreadCount );
// Reset the consumed marker on every value.
// NOTE(review): other statements of this loop body are not visible in this extract.
356 for ( typename Queue::value_type * it = pValStart; it != pValEnd; ++it ) {
359 it->nConsumer = c_nBadConsumer;
// Hand each producer a consecutive slice of m_nThreadPushCount values.
362 typename Queue::value_type * pStart = pValStart;
363 for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
364 static_cast<Producer<Queue>* >( *it )->m_pStart = pStart;
365 pStart += m_nThreadPushCount;
366 static_cast<Producer<Queue>* >( *it )->m_pEnd = pStart;
// Consumers are added only after the producer ranges have been assigned.
// NOTE(review): the statement that runs the pool is not visible in this extract.
369 pool.add( new Consumer<Queue>( pool, testQueue ), s_nReaderThreadCount );
373 // Check that all values have been dequeued
// Only the values actually assigned to producers are inspected.
// NOTE(review): the statement controlled by the if below is not visible;
// presumably increments nBadConsumerCount.
375 size_t nBadConsumerCount = 0;
376 size_t nQueueSize = m_nThreadPushCount * s_nWriterThreadCount;
377 typename Queue::value_type * pEnd = pValStart + nQueueSize;
378 for ( typename Queue::value_type * it = pValStart; it != pEnd; ++it ) {
379 if ( it->nConsumer == c_nBadConsumer )
382 CPPUNIT_CHECK_EX( nBadConsumerCount == 0, "nBadConsumerCount=" << nBadConsumerCount );
385 analyze( pool, testQueue, nLeftOffset, nRightOffset );
386 CPPUNIT_MSG( testQueue.statistics() );
// Test driver: allocates s_nQueueSize values, runs test_with() with both
// offsets 0, then calls Queue::gc::force_dispose().
// NOTE(review): the function-name line and the declaration of q are not visible
// in this extract; presumably this is test(), the driver used by TEST_CASE.
389 template <typename Queue>
392 value_array<typename Queue::value_type> arrValue( s_nQueueSize );
396 test_with( q, arrValue, 0, 0 );
398 Queue::gc::force_dispose();
// Test driver: allocates s_nQueueSize values and runs test_with() with both
// offsets 0; no force_dispose() call is visible here.
// NOTE(review): the function-name line and the declaration of q are not visible
// in this extract; presumably test_boost() or test_bounded() - confirm.
402 template <typename Queue>
405 value_array<typename Queue::value_type> arrValue( s_nQueueSize );
408 test_with(q, arrValue, 0, 0);
// Test driver: allocates s_nQueueSize values and runs test_with() with both
// offsets 0; no force_dispose() call is visible here.
// NOTE(review): the function-name line and the declaration of q are not visible
// in this extract; presumably test_bounded() or test_boost() - confirm.
412 template <typename Queue>
415 value_array<typename Queue::value_type> arrValue( s_nQueueSize );
417 test_with(q, arrValue, 0, 0);
// Test driver for a queue constructed from s_nFCCompactFactor and
// s_nFCPassCount (in that argument order); logs both values, then runs
// test_with() with both offsets 0.
// NOTE(review): the function-name line is not visible in this extract;
// presumably test_fcqueue(), the driver used by TEST_FCQUEUE.
420 template <typename Queue>
423 value_array<typename Queue::value_type> arrValue( s_nQueueSize );
424 CPPUNIT_MSG( "Combining pass count: " << s_nFCPassCount << ", compact factor: " << s_nFCCompactFactor );
425 Queue q( s_nFCCompactFactor, s_nFCPassCount );
426 test_with(q, arrValue, 0, 0);
// Test driver for queues constructed from a segment size. Runs test_with() for
// segment sizes 4, 16, 64 and 256, reusing one value array for all passes.
429 template <typename Queue>
430 void test_segmented()
432 value_array<typename Queue::value_type> arrValue( s_nQueueSize );
433 for ( size_t nSegmentSize = 4; nSegmentSize <= 256; nSegmentSize *= 4 ) {
434 CPPUNIT_MSG( "Segment size: " << nSegmentSize );
436 Queue q( nSegmentSize );
// Offsets scale with the segment size: left = 2 * size, right = size.
// analyze() uses the right offset as slack in its ordering check.
437 test_with( q, arrValue, nSegmentSize * 2, nSegmentSize );
439 Queue::gc::force_dispose();
// Test driver for queues constructed from an array size. Runs test_with() with
// both offsets 0 for array sizes 2, 4, 8, 16, 32 and 64, reusing one value
// array, and calls Queue::gc::force_dispose().
// NOTE(review): the function-name line is not visible in this extract, and none
// of the TEST_* macros visible here refers to a driver of this shape.
443 template <typename Queue>
446 value_array<typename Queue::value_type> arrValue( s_nQueueSize );
447 for ( size_t nArraySize = 2; nArraySize <= 64; nArraySize *= 2 ) {
448 CPPUNIT_MSG( "Array size: " << nArraySize );
450 Queue q( nArraySize );
451 test_with( q, arrValue, 0, 0 );
453 Queue::gc::force_dispose();
// Loads the test parameters from cfg into the file-level statics. The second
// argument of each getter is the value passed as fallback.
// NOTE(review): the QueueSize fallback here is 10000000 while the static
// initialiser of s_nQueueSize is 4000000 - confirm which default is intended.
457 void setUpParams( const CppUnitMini::TestCfg& cfg ) {
458 s_nReaderThreadCount = cfg.getULong("ReaderCount", 4 );
459 s_nWriterThreadCount = cfg.getULong("WriterCount", 4 );
460 s_nQueueSize = cfg.getULong("QueueSize", 10000000 );
461 s_nFCPassCount = cfg.getUInt("FCPassCount", 8);
462 s_nFCCompactFactor = cfg.getUInt("FCCompactFactor", 64);
// One declaration macro per queue family.
// NOTE(review): these macros are defined outside this extract (presumably in
// queue/intrusive_queue_defs.h) and presumably expand to test methods built
// with the TEST_* macros above - confirm.
466 CDSUNIT_DECLARE_MSQueue
467 CDSUNIT_DECLARE_MoirQueue
468 CDSUNIT_DECLARE_OptimisticQueue
469 CDSUNIT_DECLARE_BasketQueue
470 CDSUNIT_DECLARE_FCQueue
471 CDSUNIT_DECLARE_SegmentedQueue
472 CDSUNIT_DECLARE_TsigasCycleQueue
473 CDSUNIT_DECLARE_VyukovMPMCCycleQueue
474 CDSUNIT_DECLARE_BoostSList
// Test suite listing for the fixture, one CDSUNIT_TEST_ macro per queue family.
// NOTE(review): CDSUNIT_DECLARE_MSQueue and CDSUNIT_DECLARE_FCQueue appear in
// the declaration list, but no CDSUNIT_TEST_MSQueue or CDSUNIT_TEST_FCQueue
// line is visible in this extract - confirm both families are listed.
477 CPPUNIT_TEST_SUITE(IntrusiveQueue_ReaderWriter)
479 CDSUNIT_TEST_MoirQueue
480 CDSUNIT_TEST_OptimisticQueue
481 CDSUNIT_TEST_BasketQueue
483 CDSUNIT_TEST_SegmentedQueue
484 CDSUNIT_TEST_TsigasCycleQueue
485 CDSUNIT_TEST_VyukovMPMCCycleQueue
486 CDSUNIT_TEST_BoostSList
487 CPPUNIT_TEST_SUITE_END();
// Registers the fixture, referred to by its qualified name
// queue::IntrusiveQueue_ReaderWriter, with the test framework.
492 CPPUNIT_TEST_SUITE_REGISTRATION(queue::IntrusiveQueue_ReaderWriter);