// Inheriting constructors
#define CDS_CXX11_INHERITING_CTOR
+
+// *************************************************
+// Features
+#if defined(__has_feature) && __has_feature(thread_sanitizer)
+# define CDS_THREAD_SANITIZER_ENABLED
+#endif
+
// *************************************************
// Alignment macro
//$$CDS-header$$
-#ifndef CDSLIB_ARH_COMPILER_DEFS_H
-#define CDSLIB_ARH_COMPILER_DEFS_H
+#ifndef CDSLIB_COMPILER_DEFS_H
+#define CDSLIB_COMPILER_DEFS_H
/*
Required C++11 features:
# define CDS_EXPORT_API
#endif
-#endif // #ifndef CDSLIB_ARH_COMPILER_DEFS_H
+// Features
+#include <cds/compiler/feature_tsan.h>
+
+#endif // #ifndef CDSLIB_COMPILER_DEFS_H
--- /dev/null
+//$$CDS-header$$
+
+#ifndef CDSLIB_COMPILER_FEATURE_TSAN_H
+#define CDSLIB_COMPILER_FEATURE_TSAN_H
+
+// Thread Sanitizer annotations.
+// From https://groups.google.com/d/msg/thread-sanitizer/SsrHB7FTnTk/mNTGNLQj-9cJ
+
+#ifdef CDS_THREAD_SANITIZER_ENABLED
+# define CDS_TSAN_ANNOTATE_HAPPENS_BEFORE(addr) AnnotateHappensBefore(__FILE__, __LINE__, (void*)(addr))\r
+# define CDS_TSAN_ANNOTATE_HAPPENS_AFTER(addr) AnnotateHappensAfter(__FILE__, __LINE__, (void*)(addr))\r
+\r
+# define CDS_TSAN_ANNOTATE_IGNORE_READS_BEGIN AnnotateIgnoreReadsBegin(__FILE__, __LINE__)\r
+# define CDS_TSAN_ANNOTATE_IGNORE_READS_END AnnotateIgnoreReadsEnd(__FILE__, __LINE__)\r
+# define CDS_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN AnnotateIgnoreWritesBegin(__FILE__, __LINE__)\r
+# define CDS_TSAN_ANNOTATE_IGNORE_WRITES_END AnnotateIgnoreWritesEnd(__FILE__, __LINE__)\r
+# define CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN \\r
+ CDS_TSAN_ANNOTATE_IGNORE_READS_BEGIN; \\r
+ CDS_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN\r
+# define CDS_TSAN_ANNOTATE_IGNORE_RW_END \\r
+ CDS_TSAN_ANNOTATE_IGNORE_WRITES_END;\\r
+ CDS_TSAN_ANNOTATE_IGNORE_READS_END\r
+\r
+ // provided by TSan\r
+ extern "C" {\r
+ void AnnotateHappensBefore(const char *f, int l, void *addr);\r
+ void AnnotateHappensAfter(const char *f, int l, void *addr);\r
+\r
+ void AnnotateIgnoreReadsBegin(const char *f, int l);\r
+ void AnnotateIgnoreReadsEnd(const char *f, int l);\r
+ void AnnotateIgnoreWritesBegin(const char *f, int l);\r
+ void AnnotateIgnoreWritesEnd(const char *f, int l);\r
+ }\r
+#else\r
+# define CDS_TSAN_ANNOTATE_HAPPENS_BEFORE(addr)\r
+# define CDS_TSAN_ANNOTATE_HAPPENS_AFTER(addr)
+
+# define CDS_TSAN_ANNOTATE_IGNORE_READS_BEGIN\r
+# define CDS_TSAN_ANNOTATE_IGNORE_READS_END\r
+# define CDS_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN\r
+# define CDS_TSAN_ANNOTATE_IGNORE_WRITES_END\r
+# define CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN\r
+# define CDS_TSAN_ANNOTATE_IGNORE_RW_END\r
+#endif
+
+#endif // #ifndef CDSLIB_COMPILER_FEATURE_TSAN_H
// Inheriting constructors
#define CDS_CXX11_INHERITING_CTOR
+// *************************************************
+// Features
+// If you run under Thread Sanitizer, pass -DCDS_THREAD_SANITIZER_ENABLED in compiler command line
+//#define CDS_THREAD_SANITIZER_ENABLED
+
// *************************************************
// Alignment macro
template <typename K>
static node_type * alloc_node(const K& key)
{
- return cxx_allocator().New( key );
+ CDS_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN;
+ node_type * p = cxx_allocator().New( key );
+ CDS_TSAN_ANNOTATE_IGNORE_WRITES_END;
+ return p;
}
template <typename K, typename V>
static node_type * alloc_node( const K& key, const V& val )
{
- return cxx_allocator().New( key, val );
+ CDS_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN;
+ node_type * p = cxx_allocator().New( key, val );
+ CDS_TSAN_ANNOTATE_IGNORE_WRITES_END;
+ return p;
}
template <typename... Args>
static node_type * alloc_node( Args&&... args )
{
- return cxx_allocator().MoveNew( std::forward<Args>(args)... );
+ CDS_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN;
+ node_type * p = cxx_allocator().MoveNew( std::forward<Args>(args)... );
+ CDS_TSAN_ANNOTATE_IGNORE_WRITES_END;
+ return p;
}
static void free_node( node_type * pNode )
link_checker::is_empty( pNode );
marked_node_ptr cur(pos.pCur);
- pNode->m_pNext.store( cur, memory_model::memory_order_relaxed );
+ pNode->m_pNext.store( cur, memory_model::memory_order_release );
return pos.pPrev->compare_exchange_strong( cur, marked_node_ptr(pNode), memory_model::memory_order_release, atomics::memory_order_relaxed );
}
// physical deletion may be performed by search function if it detects that a node is logically deleted (marked)
// CAS may be successful here or in other thread that searching something
marked_node_ptr cur(pos.pCur);
- if ( pos.pPrev->compare_exchange_strong( cur, marked_node_ptr( pos.pNext ), memory_model::memory_order_release, atomics::memory_order_relaxed ))
+ if ( pos.pPrev->compare_exchange_strong( cur, marked_node_ptr( pos.pNext ), memory_model::memory_order_acquire, atomics::memory_order_relaxed ))
retire_node( pos.pCur );
return true;
}
return false;
}
- pNext = pCur->m_pNext.load(memory_model::memory_order_relaxed);
+ pNext = pCur->m_pNext.load(memory_model::memory_order_acquire);
pos.guards.assign( position::guard_next_item, node_traits::to_value_ptr( pNext.ptr() ));
- if ( pCur->m_pNext.load(memory_model::memory_order_acquire).all() != pNext.all() ) {
+ if ( pCur->m_pNext.load(memory_model::memory_order_relaxed).all() != pNext.all() ) {
bkoff();
goto try_again;
}
if ( pNext.bits() == 1 ) {
// pCur marked i.e. logically deleted. Help the erase/unlink function to unlink pCur node
marked_node_ptr cur( pCur.ptr());
- if ( pPrev->compare_exchange_strong( cur, marked_node_ptr( pNext.ptr() ), memory_model::memory_order_release, atomics::memory_order_relaxed )) {
+ if ( pPrev->compare_exchange_strong( cur, marked_node_ptr( pNext.ptr() ), memory_model::memory_order_acquire, atomics::memory_order_relaxed )) {
retire_node( pCur.ptr() );
}
else {
while ( pCur.ptr() != pTail && ( pCur.ptr() == pHead || cmp( *node_traits::to_value_ptr( *pCur.ptr() ), key ) < 0 )) {
pPrev = pCur;
- pCur = pCur->m_pNext.load(memory_model::memory_order_relaxed);
+ pCur = pCur->m_pNext.load(memory_model::memory_order_acquire);
}
pos.pCur = pCur.ptr();
link_checker::is_empty( pNode );
marked_node_ptr p( pos.pCur );
- pNode->m_pNext.store( p, memory_model::memory_order_relaxed );
+ pNode->m_pNext.store( p, memory_model::memory_order_release );
return pos.pPrev->compare_exchange_strong( p, marked_node_ptr(pNode), memory_model::memory_order_release, atomics::memory_order_relaxed );
}
{
// Mark the node (logical deleting)
marked_node_ptr next(pos.pNext, 0);
- if ( pos.pCur->m_pNext.compare_exchange_strong( next, marked_node_ptr(pos.pNext, 1), memory_model::memory_order_acquire, atomics::memory_order_relaxed )) {
+ if ( pos.pCur->m_pNext.compare_exchange_strong( next, marked_node_ptr(pos.pNext, 1), memory_model::memory_order_release, atomics::memory_order_relaxed )) {
marked_node_ptr cur(pos.pCur);
- if ( pos.pPrev->compare_exchange_strong( cur, marked_node_ptr( pos.pNext ), memory_model::memory_order_release, atomics::memory_order_relaxed ))
+ if ( pos.pPrev->compare_exchange_strong( cur, marked_node_ptr( pos.pNext ), memory_model::memory_order_acquire, atomics::memory_order_relaxed ))
return true;
next |= 1;
- CDS_VERIFY( pos.pCur->m_pNext.compare_exchange_strong( next, next ^ 1, memory_model::memory_order_release, atomics::memory_order_relaxed ));
+ CDS_VERIFY( pos.pCur->m_pNext.compare_exchange_strong( next, next ^ 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed ));
}
return false;
}
while ( true ) {
if ( !pCur.ptr() ) {
pos.pPrev = pPrev;
- pos.pCur = pCur.ptr();
- pos.pNext = pNext.ptr();
+ pos.pCur = nullptr;
+ pos.pNext = nullptr;
return false;
}
- pNext = pCur->m_pNext.load(memory_model::memory_order_relaxed);
+ pNext = pCur->m_pNext.load(memory_model::memory_order_acquire);
- if ( pPrev->load(memory_model::memory_order_acquire) != pCur
- || pNext != pCur->m_pNext.load(memory_model::memory_order_acquire)
+ if ( pPrev->load(memory_model::memory_order_relaxed) != pCur
+ || pNext != pCur->m_pNext.load(memory_model::memory_order_relaxed)
|| pNext.bits() != 0 ) // pNext contains deletion mark for pCur
{
// if pCur is marked as deleted (pNext.bits() != 0)
const size_t nNewMaxCount = (nBucketCount < m_Buckets.capacity()) ? max_item_count( nBucketCount << 1, nLoadFactor )
: std::numeric_limits<size_t>::max();
- m_nMaxItemCount.compare_exchange_strong( nMaxCount, nNewMaxCount, memory_model::memory_order_relaxed,
- memory_model::memory_order_relaxed );
- m_nBucketCountLog2.compare_exchange_strong( sz, sz + 1, memory_model::memory_order_relaxed, memory_model::memory_order_relaxed );
+ m_nMaxItemCount.compare_exchange_strong( nMaxCount, nNewMaxCount, memory_model::memory_order_relaxed, atomics::memory_order_relaxed );
+ m_nBucketCountLog2.compare_exchange_strong( sz, sz + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed );
}
template <typename Q, typename Compare, typename Func>
{
epoch_retired_ptr p;
while ( m_Buffer.pop( p )) {
- if ( p.m_nEpoch <= nEpoch )
+ if ( p.m_nEpoch <= nEpoch ) {
+ CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
p.free();
+ CDS_TSAN_ANNOTATE_IGNORE_RW_END;
+ }
else {
push_buffer( p );
break;
bool bPushed = m_Buffer.push( ep );
if ( !bPushed || m_Buffer.size() >= capacity() ) {
synchronize();
- if ( !bPushed )
+ if ( !bPushed ) {
+ CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
ep.free();
+ CDS_TSAN_ANNOTATE_IGNORE_RW_END;
+ }
return true;
}
return false;
virtual void retire_ptr( retired_ptr& p )
{
synchronize();
- if ( p.m_p )
+ if ( p.m_p ) {
+ CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
p.free();
+ CDS_TSAN_ANNOTATE_IGNORE_RW_END;
+ }
}
/// Retires the pointer chain [\p itFirst, \p itLast)
bool bPushed = m_Buffer.push( p );
if ( !bPushed || m_Buffer.size() >= capacity() ) {
synchronize();
- if ( !bPushed )
+ if ( !bPushed ) {
+ CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
p.free();
+ CDS_TSAN_ANNOTATE_IGNORE_RW_END;
+ }
return true;
}
return false;
protected:
//@cond
- buffer_type m_Buffer;
- atomics::atomic<uint64_t> m_nCurEpoch;
- lock_type m_Lock;
- size_t const m_nCapacity;
+ buffer_type m_Buffer;
+ atomics::atomic<uint64_t> m_nCurEpoch;
+ lock_type m_Lock;
+ size_t const m_nCapacity;
//@endcond
public:
{
epoch_retired_ptr p;
while ( m_Buffer.pop( p )) {
- if ( p.m_nEpoch <= nEpoch )
+ if ( p.m_nEpoch <= nEpoch ) {
+ CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
p.free();
+ CDS_TSAN_ANNOTATE_IGNORE_RW_END;
+ }
else {
push_buffer( p );
break;
bool bPushed = m_Buffer.push( ep );
if ( !bPushed || m_Buffer.size() >= capacity() ) {
synchronize();
- if ( !bPushed )
+ if ( !bPushed ) {
+ CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
ep.free();
+ CDS_TSAN_ANNOTATE_IGNORE_RW_END;
+ }
return true;
}
return false;
bool bPushed = m_Buffer.push( p );
if ( !bPushed || m_Buffer.size() >= capacity() ) {
synchronize();
- if ( !bPushed )
+ if ( !bPushed ) {
+ CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
p.free();
+ CDS_TSAN_ANNOTATE_IGNORE_RW_END;
+ }
return true;
}
return false;
{
epoch_retired_ptr p;
while ( pBuf->pop( p ) ) {
- if ( p.m_nEpoch <= nCurEpoch )
+ if ( p.m_nEpoch <= nCurEpoch ) {
+ CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
p.free();
+ CDS_TSAN_ANNOTATE_IGNORE_RW_END;
+ }
else {
pBuf->push( p );
break;
<ClInclude Include="..\..\..\cds\algo\int_algo.h" />\r
<ClInclude Include="..\..\..\cds\compiler\clang\defs.h" />\r
<ClInclude Include="..\..\..\cds\compiler\cxx11_atomic.h" />\r
+ <ClInclude Include="..\..\..\cds\compiler\feature_tsan.h" />\r
<ClInclude Include="..\..\..\cds\compiler\gcc\amd64\cxx11_atomic.h" />\r
<ClInclude Include="..\..\..\cds\compiler\gcc\compiler_macro.h" />\r
<ClInclude Include="..\..\..\cds\compiler\gcc\ia64\cxx11_atomic.h" />\r
<ClInclude Include="..\..\..\cds\sync\pool_monitor.h">\r
<Filter>Header Files\cds\sync</Filter>\r
</ClInclude>\r
+ <ClInclude Include="..\..\..\cds\compiler\feature_tsan.h">\r
+ <Filter>Header Files\cds\compiler</Filter>\r
+ </ClInclude>\r
</ItemGroup>\r
</Project>
\ No newline at end of file
//$$CDS-header$$
+#include <chrono>
+#include <cds/details/defs.h> // TSan annotations
#include "cppunit/thread.h"
-#include <boost/date_time/posix_time/posix_time_types.hpp>
namespace CppUnitMini {
ThreadPool::~ThreadPool()
{
+ CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
+
delete m_pBarrierStart;
delete m_pBarrierDone;
for ( size_t i = 0; i < m_arrThreads.size(); ++i )
delete m_arrThreads[i];
m_arrThreads.resize( 0 );
+
+ CDS_TSAN_ANNOTATE_IGNORE_RW_END;
}
void ThreadPool::add( TestThread * pThread, size_t nCount )
// Wait while all threads is done
m_pBarrierDone->wait();
- boost::this_thread::sleep(boost::posix_time::milliseconds(500));
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
void ThreadPool::run( unsigned int nDuration )
for ( size_t i = 0; i < nThreadCount; ++i )
m_arrThreads[i]->create();
- boost::system_time stEnd( boost::get_system_time() + boost::posix_time::seconds( nDuration ) );
+ auto stEnd(std::chrono::steady_clock::now() + std::chrono::seconds( nDuration ));
do {
- boost::this_thread::sleep( stEnd );
- } while ( boost::get_system_time() < stEnd );
+ std::this_thread::sleep_until( stEnd );
+ } while ( std::chrono::steady_clock::now() < stEnd );
for ( size_t i = 0; i < nThreadCount; ++i )
m_arrThreads[i]->stop();
// Wait while all threads is done
m_pBarrierDone->wait();
- boost::this_thread::sleep(boost::posix_time::milliseconds(500));
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
void ThreadPool::onThreadInitDone( TestThread * pThread )