2 This file is a part of libcds - Concurrent Data Structures library
4 (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
6 Source code repo: http://github.com/khizmax/libcds/
7 Download: http://sourceforge.net/projects/libcds/files/
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
12 * Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #include "cppunit/thread.h"
32 #include "queue/queue_type.h"
33 #include "queue/queue_defs.h"
38 // Multi-threaded random queue test
// Test-method generators, expanded once per queue type.
// TEST_CASE( Q, V )      defines void Q() that runs test<>() on Types<V>::Q.
// TEST_BOUNDED( Q, V )   is a plain alias of TEST_CASE.
// TEST_SEGMENTED( Q, V ) defines void Q() that runs test_segmented<>() on Types<V>::Q.
41 #define TEST_CASE( Q, V ) void Q() { test< Types<V>::Q >(); }
42 #define TEST_BOUNDED( Q, V ) TEST_CASE( Q, V )
43 #define TEST_SEGMENTED( Q, V ) void Q() { test_segmented< Types< V >::Q >(); }
// Test parameters with their compiled-in initial values. setUpParams() overwrites
// all three from the test configuration.
// NOTE(review): setUpParams() falls back to 10000000 for "QueueSize", not to the
// 4000000 used as the initializer here - confirm which default is intended.
46 static size_t s_nReaderThreadCount = 4;
47 static size_t s_nWriterThreadCount = 4;
48 static size_t s_nQueueSize = 4000000;
// Reader/writer stress test for queue containers: WriterThread instances push
// numbered items, ReaderThread instances pop them, and analyze() validates the
// popped data once the threads are done.
56 class Queue_ReaderWriter: public CppUnitMini::TestCase
// Producer thread. Pushes items tagged with this thread's number into the shared
// queue until the item counter v.nNo reaches the per-writer push count.
// NOTE(review): several lines of this class (member declarations, function headers,
// parts of the push loop) are not visible in this excerpt.
58 template <class Queue>
59 class WriterThread: public CppUnitMini::TestThread
// Returns a new heap-allocated copy of this thread object.
61 virtual TestThread * clone()
63 return new WriterThread( *this );
// Constructs a writer bound to the given pool and queue.
71 WriterThread( CppUnitMini::ThreadPool& pool, Queue& q )
72 : CppUnitMini::TestThread( pool )
// Copy constructor used by clone(); the copy refers to the same queue as src.
75 WriterThread( WriterThread& src )
76 : CppUnitMini::TestThread( src )
77 , m_Queue( src.m_Queue )
// Gives access to the owning test fixture through the thread pool.
// NOTE(review): reinterpret_cast is used for what looks like a base-to-derived
// reference conversion; static_cast would be the type-checked form - confirm the
// declared type of m_Pool.m_Test.
80 Queue_ReaderWriter& getTest()
82 return reinterpret_cast<Queue_ReaderWriter&>( m_Pool.m_Test );
// Attach the current thread to cds::threading::Manager (enclosing function header not visible).
87 cds::threading::Manager::attachThread();
// Detach the current thread from cds::threading::Manager (enclosing function header not visible).
91 cds::threading::Manager::detachThread();
// Number of items this writer has to push.
96 size_t nPushCount = getTest().m_nThreadPushCount;
// Tag the item with this thread's number so readers can attribute it to its writer.
98 v.nWriterNo = m_nThreadNo;
// Remember the timer value at loop start; turned into an elapsed time after the loop.
102 m_fTime = m_Timer.duration();
// Keep pushing until nPushCount items are counted in v.nNo. The statements that
// advance v.nNo and handle a failed push are not visible in this excerpt
// (presumably v.nNo is incremented only on a successful push - TODO confirm).
104 while ( v.nNo < nPushCount ) {
105 if ( m_Queue.push( v ))
// Elapsed time of the push loop.
111 m_fTime = m_Timer.duration() - m_fTime;
// Report completion; readers compare this counter with the writer count to decide when to stop.
112 getTest().m_nWriterDone.fetch_add( 1 );
// Consumer thread. Pops items from the shared queue and records, per writer, the
// item numbers it received, until the queue is empty and all writers have finished.
// NOTE(review): several lines of this class (member declarations, function headers,
// the loop header and its exit) are not visible in this excerpt.
116 template <class Queue>
117 class ReaderThread: public CppUnitMini::TestThread
// Returns a new heap-allocated copy of this thread object.
119 virtual TestThread * clone()
121 return new ReaderThread( *this );
// Sequence of item numbers popped from one writer, kept in pop order.
130 typedef std::vector<size_t> TPoppedData;
131 typedef std::vector<size_t>::iterator data_iterator;
132 typedef std::vector<size_t>::const_iterator const_data_iterator;
// m_WriterData[w] holds the item numbers this reader popped that were tagged with writer number w.
134 std::vector<TPoppedData> m_WriterData;
// Creates one bucket per writer and reserves capacity for every item a single
// writer is going to push.
137 void initPoppedData()
139 const size_t nWriterCount = s_nWriterThreadCount;
140 const size_t nWriterPushCount = getTest().m_nThreadPushCount;
141 m_WriterData.resize( nWriterCount );
142 for ( size_t i = 0; i < nWriterCount; ++i )
143 m_WriterData[i].reserve( nWriterPushCount );
// Constructs a reader bound to the given pool and queue.
147 ReaderThread( CppUnitMini::ThreadPool& pool, Queue& q )
148 : CppUnitMini::TestThread( pool )
// Copy constructor used by clone(); the copy refers to the same queue as src.
153 ReaderThread( ReaderThread& src )
154 : CppUnitMini::TestThread( src )
155 , m_Queue( src.m_Queue )
// Gives access to the owning test fixture through the thread pool.
// NOTE(review): reinterpret_cast is used for what looks like a base-to-derived
// reference conversion; static_cast would be the type-checked form - confirm the
// declared type of m_Pool.m_Test.
160 Queue_ReaderWriter& getTest()
162 return reinterpret_cast<Queue_ReaderWriter&>( m_Pool.m_Test );
// Attach the current thread to cds::threading::Manager (enclosing function header not visible).
167 cds::threading::Manager::attachThread();
// Detach the current thread from cds::threading::Manager (enclosing function header not visible).
171 cds::threading::Manager::detachThread();
// Writer count: upper bound for a valid writer number and the value the
// "writers done" counter must reach before the reader may stop.
179 const size_t nTotalWriters = s_nWriterThreadCount;
// Remember the timer value at loop start; turned into an elapsed time after the loop.
182 m_fTime = m_Timer.duration();
// Successful pop: file the item number under its writer, but only if the writer
// number is a valid bucket index. The lower-bound test is commented out
// (presumably because nWriterNo is unsigned - TODO confirm).
185 if ( m_Queue.pop( v ) ) {
187 if ( /*v.nWriterNo >= 0 &&*/ v.nWriterNo < nTotalWriters )
188 m_WriterData[ v.nWriterNo ].push_back( v.nNo );
// Termination check: the queue reports empty, every writer has reported completion,
// and the queue still reports empty when asked again after reading the counter.
// What follows the final check is not visible here (presumably it leaves the loop).
195 if ( m_Queue.empty() ) {
196 if ( getTest().m_nWriterDone.load() >= nTotalWriters ) {
197 if ( m_Queue.empty() )
// Elapsed time of the pop loop.
203 m_fTime = m_Timer.duration() - m_fTime;
// Number of items each writer pushes; test() and test_segmented() set it to
// s_nQueueSize / s_nWriterThreadCount.
208 size_t m_nThreadPushCount;
// Number of writer threads that have finished pushing: reset to 0 before a run,
// incremented by each WriterThread, polled by the readers.
209 atomics::atomic<size_t> m_nWriterDone;
// Post-run verification. Drains whatever is left in testQueue, aggregates the
// per-thread statistics from the pool, and checks that the popped data is complete
// and consistently ordered.
//   pool         - thread pool holding the writer and reader thread objects
//   testQueue    - the queue that was exercised
//   (1st offset) - unnamed and unused
//   nRightOffset - slack added to the right-hand side of the per-reader ordering
//                  check; test_segmented() passes the segment size
// NOTE(review): several lines of this function (guards, loop bodies, local
// declarations) are not visible in this excerpt.
212 template <class Queue>
213 void analyze( CppUnitMini::ThreadPool& pool, Queue& testQueue, size_t /*nLeftOffset*/ = 0, size_t nRightOffset = 0 )
215 typedef ReaderThread<Queue> Reader;
216 typedef WriterThread<Queue> Writer;
217 typedef typename Reader::const_data_iterator ReaderIterator;
// Items still in the queue after the threads stopped; the loop body is not
// visible (presumably it increments nPostTestPops).
219 size_t nPostTestPops = 0;
222 while ( testQueue.pop( v ))
// Accumulators over all threads.
226 double fTimeWriter = 0;
227 double fTimeReader = 0;
228 size_t nTotalPops = 0;
229 size_t nPopFalse = 0;
230 size_t nPoppedItems = 0;
231 size_t nPushFailed = 0;
233 std::vector< Reader * > arrReaders;
// The pool holds both thread kinds; dynamic_cast is used to tell a reader from a writer.
235 for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
236 Reader * pReader = dynamic_cast<Reader *>( *it );
// Reader statistics.
238 fTimeReader += pReader->m_fTime;
239 nTotalPops += pReader->m_nPopped;
240 nPopFalse += pReader->m_nPopEmpty;
241 arrReaders.push_back( pReader );
242 CPPUNIT_CHECK_EX( pReader->m_nBadWriter == 0, "reader " << pReader->m_nThreadNo << " bad writer event count=" << pReader->m_nBadWriter );
// Number of items this reader recorded, summed over all writer buckets.
245 for ( size_t n = 0; n < s_nWriterThreadCount; ++n )
246 nPopped += pReader->m_WriterData[n].size();
// Writers are added to the pool first, so subtracting the writer count presumably
// yields a zero-based reader index - TODO confirm how m_nThreadNo is assigned.
248 CPPUNIT_MSG( " Reader " << pReader->m_nThreadNo - s_nWriterThreadCount << " popped count=" << nPopped );
249 nPoppedItems += nPopped;
// Not a reader: the thread must then be a writer.
252 Writer * pWriter = dynamic_cast<Writer *>( *it );
253 CPPUNIT_ASSERT( pWriter != nullptr );
254 fTimeWriter += pWriter->m_fTime;
255 nPushFailed += pWriter->m_nPushFailed;
// Failed pushes are reported as an error only for queues that are not bounded containers.
256 if ( !boost::is_base_of<cds::bounded_container, Queue>::value ) {
257 CPPUNIT_CHECK_EX( pWriter->m_nPushFailed == 0,
258 "writer " << pWriter->m_nThreadNo << " push failed count=" << pWriter->m_nPushFailed );
// Every successful pop must have been recorded in some writer bucket.
262 CPPUNIT_CHECK_EX( nTotalPops == nPoppedItems, "nTotalPops=" << nTotalPops << ", nPoppedItems=" << nPoppedItems );
264 CPPUNIT_MSG( " Readers: duration=" << fTimeReader / s_nReaderThreadCount << ", success pop=" << nTotalPops << ", failed pops=" << nPopFalse );
265 CPPUNIT_MSG( " Writers: duration=" << fTimeWriter / s_nWriterThreadCount << ", failed push=" << nPushFailed );
// Pops by readers plus items drained above must equal the total number of pushed items.
267 size_t nQueueSize = m_nThreadPushCount * s_nWriterThreadCount;
268 CPPUNIT_CHECK_EX( nTotalPops + nPostTestPops == nQueueSize, "popped=" << nTotalPops + nPostTestPops << " must be " << nQueueSize );
269 CPPUNIT_CHECK( testQueue.empty() );
271 // Test that all items have been popped
272 CPPUNIT_MSG( " Test consistency of popped sequence..." );
// For each writer: check ordering inside every reader's bucket, then merge the
// buckets of all readers and check the merged set.
273 for ( size_t nWriter = 0; nWriter < s_nWriterThreadCount; ++nWriter ) {
274 std::vector<size_t> arrData;
275 arrData.reserve( m_nThreadPushCount );
277 for ( size_t nReader = 0; nReader < arrReaders.size(); ++nReader ) {
278 ReaderIterator it = arrReaders[nReader]->m_WriterData[nWriter].begin();
279 ReaderIterator itEnd = arrReaders[nReader]->m_WriterData[nWriter].end();
// Per-reader order check: a previous item number must be smaller than the
// following one plus nRightOffset.
// NOTE(review): "++it" in the loop initializer needs a non-empty range; any guard
// for an empty bucket is not visible in this excerpt - confirm it exists.
281 ReaderIterator itPrev = it;
282 for ( ++it; it != itEnd; ++it ) {
283 CPPUNIT_CHECK_EX( *itPrev < *it + nRightOffset, "Reader " << nReader << ", Writer " << nWriter << ": prev=" << *itPrev << ", cur=" << *it );
284 if ( ++nErrors > 10 )
// Collect this reader's items for the merged check below.
290 for ( it = arrReaders[nReader]->m_WriterData[nWriter].begin(); it != itEnd; ++it )
291 arrData.push_back( *it );
// Merged check: after sorting, neighbouring item numbers must differ by exactly 1.
294 std::sort( arrData.begin(), arrData.end() );
296 for ( size_t i=1; i < arrData.size(); ++i ) {
297 if ( arrData[i-1] + 1 != arrData[i] ) {
298 CPPUNIT_CHECK_EX( arrData[i-1] + 1 == arrData[i], "Writer " << nWriter << ": [" << (i-1) << "]=" << arrData[i-1] << ", [" << i << "]=" << arrData[i] );
299 if ( ++nErrors > 10 )
// The merged sequence must start at 0 and end at m_nThreadPushCount - 1.
// NOTE(review): both accesses require arrData to be non-empty; no guard is visible here.
304 CPPUNIT_CHECK_EX( arrData[0] == 0, "Writer " << nWriter << "[0] != 0" );
305 CPPUNIT_CHECK_EX( arrData[arrData.size() - 1] == m_nThreadPushCount - 1, "Writer " << nWriter << "[last] != " << m_nThreadPushCount - 1 );
// Runs the reader/writer scenario for one queue type and validates it with analyze().
// NOTE(review): the function header, the declaration of testQueue and the step
// that runs the pool are not visible in this excerpt.
309 template <class Queue>
// Split the total item count evenly between writers (integer division).
312 m_nThreadPushCount = s_nQueueSize / s_nWriterThreadCount;
313 CPPUNIT_MSG( " reader count=" << s_nReaderThreadCount << " writer count=" << s_nWriterThreadCount
314 << " item count=" << m_nThreadPushCount * s_nWriterThreadCount << "..." );
317 CppUnitMini::ThreadPool pool( *this );
// No writer has finished yet.
319 m_nWriterDone.store( 0 );
321 // Writers must be first
322 pool.add( new WriterThread<Queue>( pool, testQueue ), s_nWriterThreadCount );
323 pool.add( new ReaderThread<Queue>( pool, testQueue ), s_nReaderThreadCount );
325 //CPPUNIT_MSG( " Reader/Writer test, reader count=" << s_nReaderThreadCount << " writer count=" << s_nWriterThreadCount << "..." );
// Validate the collected data, then print the queue's statistics.
328 analyze( pool, testQueue );
329 CPPUNIT_MSG( testQueue.statistics() );
// Runs the reader/writer scenario for a queue type that is constructed with a
// segment size, once for each segment size 4, 16, 64 and 256.
// NOTE(review): the step that runs the pool is not visible in this excerpt.
332 template <class Queue>
333 void test_segmented()
// Split the total item count evenly between writers (integer division).
335 m_nThreadPushCount = s_nQueueSize / s_nWriterThreadCount;
336 CPPUNIT_MSG( " reader count=" << s_nReaderThreadCount << " writer count=" << s_nWriterThreadCount
337 << " item count=" << m_nThreadPushCount * s_nWriterThreadCount << "..." );
339 for ( size_t nSegmentSize = 4; nSegmentSize <= 256; nSegmentSize *= 4 ) {
340 CPPUNIT_MSG( "Segment size: " << nSegmentSize );
// Fresh queue and fresh pool for every segment size.
342 Queue q( nSegmentSize );
343 CppUnitMini::ThreadPool pool( *this );
// No writer has finished yet.
345 m_nWriterDone.store( 0 );
347 // Writers must be first
348 pool.add( new WriterThread<Queue>( pool, q ), s_nWriterThreadCount );
349 pool.add( new ReaderThread<Queue>( pool, q ), s_nReaderThreadCount );
// The segment size is passed as nRightOffset, i.e. as slack for analyze()'s
// per-reader ordering check; the first offset argument is ignored by analyze().
353 analyze( pool, q, nSegmentSize * 2, nSegmentSize );
354 CPPUNIT_MSG( q.statistics() );
// Loads the test parameters from the configuration, using the second argument of
// getULong() when a key is absent (presumably - confirm against CppUnitMini::TestCfg).
// NOTE(review): the fallback for "QueueSize" (10000000) differs from the
// initializer of s_nQueueSize (4000000) - confirm which default is intended.
358 void setUpParams( const CppUnitMini::TestCfg& cfg ) {
359 s_nReaderThreadCount = cfg.getULong("ReaderCount", 4 );
360 s_nWriterThreadCount = cfg.getULong("WriterCount", 4 );
361 s_nQueueSize = cfg.getULong("QueueSize", 10000000 );
// Per-queue-family test method declarations for the item type Value. The
// CDSUNIT_DECLARE_* macros are defined outside this file (presumably in
// queue/queue_defs.h, in terms of TEST_CASE / TEST_BOUNDED / TEST_SEGMENTED - TODO confirm).
365 CDSUNIT_DECLARE_MoirQueue( Value )
366 CDSUNIT_DECLARE_MSQueue( Value )
367 CDSUNIT_DECLARE_OptimisticQueue( Value )
368 CDSUNIT_DECLARE_BasketQueue( Value )
369 CDSUNIT_DECLARE_FCQueue( Value )
370 CDSUNIT_DECLARE_FCDeque( Value )
371 CDSUNIT_DECLARE_SegmentedQueue( Value )
372 CDSUNIT_DECLARE_RWQueue( Value )
373 CDSUNIT_DECLARE_TsigasCycleQueue( Value )
374 CDSUNIT_DECLARE_VyukovMPMCCycleQueue( Value )
375 CDSUNIT_DECLARE_StdQueue( Value )
// Test suite contents. Some entries of the original list are not visible in this
// excerpt, so the set below may be incomplete.
377 CPPUNIT_TEST_SUITE(Queue_ReaderWriter)
378 CDSUNIT_TEST_MoirQueue
380 CDSUNIT_TEST_OptimisticQueue
381 CDSUNIT_TEST_BasketQueue
384 CDSUNIT_TEST_SegmentedQueue
386 CDSUNIT_TEST_TsigasCycleQueue
387 CDSUNIT_TEST_VyukovMPMCCycleQueue
388 CDSUNIT_TEST_StdQueue
389 CPPUNIT_TEST_SUITE_END();
// Registers the queue::Queue_ReaderWriter suite with the test framework.
394 CPPUNIT_TEST_SUITE_REGISTRATION(queue::Queue_ReaderWriter);