Fixed data races found by tsan
authorkhizmax <libcds.dev@gmail.com>
Wed, 22 Apr 2015 20:12:06 +0000 (23:12 +0300)
committerkhizmax <libcds.dev@gmail.com>
Wed, 22 Apr 2015 20:12:06 +0000 (23:12 +0300)
18 files changed:
cds/compiler/clang/defs.h
cds/compiler/defs.h
cds/compiler/feature_tsan.h [new file with mode: 0644]
cds/compiler/gcc/defs.h
cds/container/lazy_kvlist_rcu.h
cds/intrusive/impl/michael_list.h
cds/intrusive/lazy_list_rcu.h
cds/intrusive/michael_list_rcu.h
cds/intrusive/split_list.h
cds/urcu/details/gpb.h
cds/urcu/details/gpi.h
cds/urcu/details/gpt.h
cds/urcu/details/sig_buffered.h
cds/urcu/details/sig_threaded.h
cds/urcu/dispose_thread.h
projects/Win/vc12/cds.vcxproj
projects/Win/vc12/cds.vcxproj.filters
tests/cppunit/thread.cpp

index 182a5b8302c7b04db750bd1e381306527f360735..ff6fb794b6da46f9aa88774e7f1b02de0970ebf4 100644 (file)
 // Inheriting constructors
 #define CDS_CXX11_INHERITING_CTOR
 
+
+// *************************************************
+// Features
+#if defined(__has_feature) && __has_feature(thread_sanitizer)
+#   define CDS_THREAD_SANITIZER_ENABLED
+#endif
+
 // *************************************************
 // Alignment macro
 
index 1b4739b47c4d4307716dd7bec56cb5a6f884e3d9..a87195bd36a462f200488aa24d4f13961d505e86 100644 (file)
@@ -1,7 +1,7 @@
 //$$CDS-header$$
 
-#ifndef CDSLIB_ARH_COMPILER_DEFS_H
-#define CDSLIB_ARH_COMPILER_DEFS_H
+#ifndef CDSLIB_COMPILER_DEFS_H
+#define CDSLIB_COMPILER_DEFS_H
 
 /*
     Required C++11 features:
@@ -37,4 +37,7 @@
 #   define CDS_EXPORT_API
 #endif
 
-#endif  // #ifndef CDSLIB_ARH_COMPILER_DEFS_H
+// Features
+#include <cds/compiler/feature_tsan.h>
+
+#endif  // #ifndef CDSLIB_COMPILER_DEFS_H
diff --git a/cds/compiler/feature_tsan.h b/cds/compiler/feature_tsan.h
new file mode 100644 (file)
index 0000000..926876a
--- /dev/null
@@ -0,0 +1,46 @@
+//$$CDS-header$$
+
+#ifndef CDSLIB_COMPILER_FEATURE_TSAN_H
+#define CDSLIB_COMPILER_FEATURE_TSAN_H
+
+// Thread Sanitizer annotations.
+// From https://groups.google.com/d/msg/thread-sanitizer/SsrHB7FTnTk/mNTGNLQj-9cJ
+
+#ifdef CDS_THREAD_SANITIZER_ENABLED
+#   define CDS_TSAN_ANNOTATE_HAPPENS_BEFORE(addr)   AnnotateHappensBefore(__FILE__, __LINE__, (void*)(addr))\r
+#   define CDS_TSAN_ANNOTATE_HAPPENS_AFTER(addr)    AnnotateHappensAfter(__FILE__, __LINE__, (void*)(addr))\r
+\r
+#   define CDS_TSAN_ANNOTATE_IGNORE_READS_BEGIN     AnnotateIgnoreReadsBegin(__FILE__, __LINE__)\r
+#   define CDS_TSAN_ANNOTATE_IGNORE_READS_END       AnnotateIgnoreReadsEnd(__FILE__, __LINE__)\r
+#   define CDS_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN    AnnotateIgnoreWritesBegin(__FILE__, __LINE__)\r
+#   define CDS_TSAN_ANNOTATE_IGNORE_WRITES_END      AnnotateIgnoreWritesEnd(__FILE__, __LINE__)\r
+#   define CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN        \\r
+                                                    CDS_TSAN_ANNOTATE_IGNORE_READS_BEGIN; \\r
+                                                    CDS_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN\r
+#   define CDS_TSAN_ANNOTATE_IGNORE_RW_END          \\r
+                                                    CDS_TSAN_ANNOTATE_IGNORE_WRITES_END;\\r
+                                                    CDS_TSAN_ANNOTATE_IGNORE_READS_END\r
+\r
+    // provided by TSan\r
+    extern "C" {\r
+        void AnnotateHappensBefore(const char *f, int l, void *addr);\r
+        void AnnotateHappensAfter(const char *f, int l, void *addr);\r
+\r
+        void AnnotateIgnoreReadsBegin(const char *f, int l);\r
+        void AnnotateIgnoreReadsEnd(const char *f, int l);\r
+        void AnnotateIgnoreWritesBegin(const char *f, int l);\r
+        void AnnotateIgnoreWritesEnd(const char *f, int l);\r
+    }\r
+#else\r
+#   define CDS_TSAN_ANNOTATE_HAPPENS_BEFORE(addr)\r
+#   define CDS_TSAN_ANNOTATE_HAPPENS_AFTER(addr)
+
+#   define CDS_TSAN_ANNOTATE_IGNORE_READS_BEGIN\r
+#   define CDS_TSAN_ANNOTATE_IGNORE_READS_END\r
+#   define CDS_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN\r
+#   define CDS_TSAN_ANNOTATE_IGNORE_WRITES_END\r
+#   define CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN\r
+#   define CDS_TSAN_ANNOTATE_IGNORE_RW_END\r
+#endif
+
+#endif  // #ifndef CDSLIB_COMPILER_FEATURE_TSAN_H
index 2dadc6e1edb93544ce4a79bb442d7c21d7b64c47..386c6237b2e2f03ed06683f61132d2f918d1dd9f 100644 (file)
 // Inheriting constructors
 #define CDS_CXX11_INHERITING_CTOR
 
+// *************************************************
+// Features
+// If you run under Thread Sanitizer, pass -DCDS_THREAD_SANITIZER_ENABLED in compiler command line
+//#define CDS_THREAD_SANITIZER_ENABLED
+
 // *************************************************
 // Alignment macro
 
index de24d1312dc0124e0009786aff83085b80238fd9..1ead8ccc1327f9c622aba9eccf2cd13bd84402f8 100644 (file)
@@ -130,19 +130,28 @@ namespace cds { namespace container {
         template <typename K>
         static node_type * alloc_node(const K& key)
         {
-            return cxx_allocator().New( key );
+            CDS_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN;
+            node_type * p = cxx_allocator().New( key );
+            CDS_TSAN_ANNOTATE_IGNORE_WRITES_END;
+            return p;
         }
 
         template <typename K, typename V>
         static node_type * alloc_node( const K& key, const V& val )
         {
-            return cxx_allocator().New( key, val );
+            CDS_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN;
+            node_type * p = cxx_allocator().New( key, val );
+            CDS_TSAN_ANNOTATE_IGNORE_WRITES_END;
+            return p;
         }
 
         template <typename... Args>
         static node_type * alloc_node( Args&&... args )
         {
-            return cxx_allocator().MoveNew( std::forward<Args>(args)... );
+            CDS_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN;
+            node_type * p = cxx_allocator().MoveNew( std::forward<Args>(args)... );
+            CDS_TSAN_ANNOTATE_IGNORE_WRITES_END;
+            return p;
         }
 
         static void free_node( node_type * pNode )
index e4dc790186e7f95ffe1257d65d3b5759ffd2dd82..bbe9b1341415050fe3e1bf053ade5bc5a7b8f320 100644 (file)
@@ -243,7 +243,7 @@ namespace cds { namespace intrusive {
             link_checker::is_empty( pNode );
 
             marked_node_ptr cur(pos.pCur);
-            pNode->m_pNext.store( cur, memory_model::memory_order_relaxed );
+            pNode->m_pNext.store( cur, memory_model::memory_order_release );
             return pos.pPrev->compare_exchange_strong( cur, marked_node_ptr(pNode), memory_model::memory_order_release, atomics::memory_order_relaxed );
         }
 
@@ -258,7 +258,7 @@ namespace cds { namespace intrusive {
                 // physical deletion may be performed by search function if it detects that a node is logically deleted (marked)
                 // CAS may be successful here or in other thread that searching something
                 marked_node_ptr cur(pos.pCur);
-                if ( pos.pPrev->compare_exchange_strong( cur, marked_node_ptr( pos.pNext ), memory_model::memory_order_release, atomics::memory_order_relaxed ))
+                if ( pos.pPrev->compare_exchange_strong( cur, marked_node_ptr( pos.pNext ), memory_model::memory_order_acquire, atomics::memory_order_relaxed ))
                     retire_node( pos.pCur );
                 return true;
             }
@@ -1082,9 +1082,9 @@ try_again:
                     return false;
                 }
 
-                pNext = pCur->m_pNext.load(memory_model::memory_order_relaxed);
+                pNext = pCur->m_pNext.load(memory_model::memory_order_acquire);
                 pos.guards.assign( position::guard_next_item, node_traits::to_value_ptr( pNext.ptr() ));
-                if ( pCur->m_pNext.load(memory_model::memory_order_acquire).all() != pNext.all() ) {
+                if ( pCur->m_pNext.load(memory_model::memory_order_relaxed).all() != pNext.all() ) {
                     bkoff();
                     goto try_again;
                 }
@@ -1098,7 +1098,7 @@ try_again:
                 if ( pNext.bits() == 1 ) {
                     // pCur marked i.e. logically deleted. Help the erase/unlink function to unlink pCur node
                     marked_node_ptr cur( pCur.ptr());
-                    if ( pPrev->compare_exchange_strong( cur, marked_node_ptr( pNext.ptr() ), memory_model::memory_order_release, atomics::memory_order_relaxed )) {
+                    if ( pPrev->compare_exchange_strong( cur, marked_node_ptr( pNext.ptr() ), memory_model::memory_order_acquire, atomics::memory_order_relaxed )) {
                         retire_node( pCur.ptr() );
                     }
                     else {
index ceabfa00e5a9d8a2cf39efe937143c41ec538856..a62cf157ed16c3172e210bd2172fe0107eb47abf 100644 (file)
@@ -1142,7 +1142,7 @@ namespace cds { namespace intrusive {
 
             while ( pCur.ptr() != pTail && ( pCur.ptr() == pHead || cmp( *node_traits::to_value_ptr( *pCur.ptr() ), key ) < 0 )) {
                 pPrev = pCur;
-                pCur = pCur->m_pNext.load(memory_model::memory_order_relaxed);
+                pCur = pCur->m_pNext.load(memory_model::memory_order_acquire);
             }
 
             pos.pCur = pCur.ptr();
index 2114e83a4453f38640ada053ee08b64a89ef2c3a..25f9ce58b699812d432af821cea55bff6f7bd93f 100644 (file)
@@ -138,7 +138,7 @@ namespace cds { namespace intrusive {
             link_checker::is_empty( pNode );
 
             marked_node_ptr p( pos.pCur );
-            pNode->m_pNext.store( p, memory_model::memory_order_relaxed );
+            pNode->m_pNext.store( p, memory_model::memory_order_release );
             return pos.pPrev->compare_exchange_strong( p, marked_node_ptr(pNode), memory_model::memory_order_release, atomics::memory_order_relaxed );
         }
 
@@ -146,12 +146,12 @@ namespace cds { namespace intrusive {
         {
             // Mark the node (logical deleting)
             marked_node_ptr next(pos.pNext, 0);
-            if ( pos.pCur->m_pNext.compare_exchange_strong( next, marked_node_ptr(pos.pNext, 1), memory_model::memory_order_acquire, atomics::memory_order_relaxed )) {
+            if ( pos.pCur->m_pNext.compare_exchange_strong( next, marked_node_ptr(pos.pNext, 1), memory_model::memory_order_release, atomics::memory_order_relaxed )) {
                 marked_node_ptr cur(pos.pCur);
-                if ( pos.pPrev->compare_exchange_strong( cur, marked_node_ptr( pos.pNext ), memory_model::memory_order_release, atomics::memory_order_relaxed ))
+                if ( pos.pPrev->compare_exchange_strong( cur, marked_node_ptr( pos.pNext ), memory_model::memory_order_acquire, atomics::memory_order_relaxed ))
                     return true;
                 next |= 1;
-                CDS_VERIFY( pos.pCur->m_pNext.compare_exchange_strong( next, next ^ 1, memory_model::memory_order_release, atomics::memory_order_relaxed ));
+                CDS_VERIFY( pos.pCur->m_pNext.compare_exchange_strong( next, next ^ 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed ));
             }
             return false;
         }
@@ -980,15 +980,15 @@ namespace cds { namespace intrusive {
             while ( true ) {
                 if ( !pCur.ptr() ) {
                     pos.pPrev = pPrev;
-                    pos.pCur = pCur.ptr();
-                    pos.pNext = pNext.ptr();
+                    pos.pCur = nullptr;
+                    pos.pNext = nullptr;
                     return false;
                 }
 
-                pNext = pCur->m_pNext.load(memory_model::memory_order_relaxed);
+                pNext = pCur->m_pNext.load(memory_model::memory_order_acquire);
 
-                if ( pPrev->load(memory_model::memory_order_acquire) != pCur
-                    || pNext != pCur->m_pNext.load(memory_model::memory_order_acquire)
+                if ( pPrev->load(memory_model::memory_order_relaxed) != pCur
+                    || pNext != pCur->m_pNext.load(memory_model::memory_order_relaxed)
                     || pNext.bits() != 0 )  // pNext contains deletion mark for pCur
                 {
                     // if pCur is marked as deleted (pNext.bits() != 0)
index 0288e703f5c74b9aa1b6cd4eafdaafd67c71b1b1..e28cdf936ae6f005913fd8f6aa834921e925f34e 100644 (file)
@@ -478,9 +478,8 @@ namespace cds { namespace intrusive {
 
             const size_t nNewMaxCount = (nBucketCount < m_Buckets.capacity()) ? max_item_count( nBucketCount << 1, nLoadFactor )
                                                                               : std::numeric_limits<size_t>::max();
-            m_nMaxItemCount.compare_exchange_strong( nMaxCount, nNewMaxCount, memory_model::memory_order_relaxed,
-                memory_model::memory_order_relaxed );
-            m_nBucketCountLog2.compare_exchange_strong( sz, sz + 1, memory_model::memory_order_relaxed, memory_model::memory_order_relaxed );
+            m_nMaxItemCount.compare_exchange_strong( nMaxCount, nNewMaxCount, memory_model::memory_order_relaxed, atomics::memory_order_relaxed );
+            m_nBucketCountLog2.compare_exchange_strong( sz, sz + 1, memory_model::memory_order_relaxed, atomics::memory_order_relaxed );
         }
 
         template <typename Q, typename Compare, typename Func>
index f79b6e22d2d2126f1e811d875a09498f997f7f5e..023c26387f820d1f043f2d16a236e0af9e416c49 100644 (file)
@@ -107,8 +107,11 @@ namespace cds { namespace urcu {
         {
             epoch_retired_ptr p;
             while ( m_Buffer.pop( p )) {
-                if ( p.m_nEpoch <= nEpoch )
+                if ( p.m_nEpoch <= nEpoch ) {
+                    CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
                     p.free();
+                    CDS_TSAN_ANNOTATE_IGNORE_RW_END;
+                }
                 else {
                     push_buffer( p );
                     break;
@@ -122,8 +125,11 @@ namespace cds { namespace urcu {
             bool bPushed = m_Buffer.push( ep );
             if ( !bPushed || m_Buffer.size() >= capacity() ) {
                 synchronize();
-                if ( !bPushed )
+                if ( !bPushed ) {
+                    CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
                     ep.free();
+                    CDS_TSAN_ANNOTATE_IGNORE_RW_END;
+                }
                 return true;
             }
             return false;
index b72ffec6ebd50a1a90c397455b92423ded267062..783a3064d3a8394fdfcba36b5723ef76119aabf4 100644 (file)
@@ -111,8 +111,11 @@ namespace cds { namespace urcu {
         virtual void retire_ptr( retired_ptr& p )
         {
             synchronize();
-            if ( p.m_p )
+            if ( p.m_p ) {
+                CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
                 p.free();
+                CDS_TSAN_ANNOTATE_IGNORE_RW_END;
+            }
         }
 
         /// Retires the pointer chain [\p itFirst, \p itLast)
index 9bc0d876d7305fd5d3f888f815c7fcb59f4c568a..84d2c1a83ae6ebd78d051b77fecbeb2264cfcc26 100644 (file)
@@ -113,8 +113,11 @@ namespace cds { namespace urcu {
             bool bPushed = m_Buffer.push( p );
             if ( !bPushed || m_Buffer.size() >= capacity() ) {
                 synchronize();
-                if ( !bPushed )
+                if ( !bPushed ) {
+                    CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
                     p.free();
+                    CDS_TSAN_ANNOTATE_IGNORE_RW_END;
+                }
                 return true;
             }
             return false;
index db4d88cf0eb08a1dcd6d28e3318cb104792b401b..bb0e0c5d6ba105eb36133ede27597cd82ab72232 100644 (file)
@@ -68,10 +68,10 @@ namespace cds { namespace urcu {
 
     protected:
         //@cond
-        buffer_type                     m_Buffer;
-        atomics::atomic<uint64_t>    m_nCurEpoch;
-        lock_type                       m_Lock;
-        size_t const                    m_nCapacity;
+        buffer_type               m_Buffer;
+        atomics::atomic<uint64_t> m_nCurEpoch;
+        lock_type                 m_Lock;
+        size_t const              m_nCapacity;
         //@endcond
 
     public:
@@ -104,8 +104,11 @@ namespace cds { namespace urcu {
         {
             epoch_retired_ptr p;
             while ( m_Buffer.pop( p )) {
-                if ( p.m_nEpoch <= nEpoch )
+                if ( p.m_nEpoch <= nEpoch ) {
+                    CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
                     p.free();
+                    CDS_TSAN_ANNOTATE_IGNORE_RW_END;
+                }
                 else {
                     push_buffer( p );
                     break;
@@ -118,8 +121,11 @@ namespace cds { namespace urcu {
             bool bPushed = m_Buffer.push( ep );
             if ( !bPushed || m_Buffer.size() >= capacity() ) {
                 synchronize();
-                if ( !bPushed )
+                if ( !bPushed ) {
+                    CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
                     ep.free();
+                    CDS_TSAN_ANNOTATE_IGNORE_RW_END;
+                }
                 return true;
             }
             return false;
index cf99c543f37577a3b9aea8881e5c3179302e91e9..17d0152531f1c98c149850100a3a8fbe697c9a5a 100644 (file)
@@ -110,8 +110,11 @@ namespace cds { namespace urcu {
             bool bPushed = m_Buffer.push( p );
             if ( !bPushed || m_Buffer.size() >= capacity() ) {
                 synchronize();
-                if ( !bPushed )
+                if ( !bPushed ) {
+                    CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
                     p.free();
+                    CDS_TSAN_ANNOTATE_IGNORE_RW_END;
+                }
                 return true;
             }
             return false;
index ca95603587dd1f7200e3b1cbb19a850070920b1f..46baa8fa7c67029b5b83e19daa348074eda07b05 100644 (file)
@@ -102,8 +102,11 @@ namespace cds { namespace urcu {
         {
             epoch_retired_ptr p;
             while ( pBuf->pop( p ) ) {
-                if ( p.m_nEpoch <= nCurEpoch )
+                if ( p.m_nEpoch <= nCurEpoch ) {
+                    CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
                     p.free();
+                    CDS_TSAN_ANNOTATE_IGNORE_RW_END;
+                }
                 else {
                     pBuf->push( p );
                     break;
index 09504492d19c2b7baac140887fe585fd4acabdc4..6b981b47b7ef09e891a689f4e2a83452462432e8 100644 (file)
     <ClInclude Include="..\..\..\cds\algo\int_algo.h" />\r
     <ClInclude Include="..\..\..\cds\compiler\clang\defs.h" />\r
     <ClInclude Include="..\..\..\cds\compiler\cxx11_atomic.h" />\r
+    <ClInclude Include="..\..\..\cds\compiler\feature_tsan.h" />\r
     <ClInclude Include="..\..\..\cds\compiler\gcc\amd64\cxx11_atomic.h" />\r
     <ClInclude Include="..\..\..\cds\compiler\gcc\compiler_macro.h" />\r
     <ClInclude Include="..\..\..\cds\compiler\gcc\ia64\cxx11_atomic.h" />\r
index 54bc40c49827e47e724274cec912d1453a5b50f0..e2042f83fbdc4484480b99aeb351380a40effa39 100644 (file)
     <ClInclude Include="..\..\..\cds\sync\pool_monitor.h">\r
       <Filter>Header Files\cds\sync</Filter>\r
     </ClInclude>\r
+    <ClInclude Include="..\..\..\cds\compiler\feature_tsan.h">\r
+      <Filter>Header Files\cds\compiler</Filter>\r
+    </ClInclude>\r
   </ItemGroup>\r
 </Project>
\ No newline at end of file
index a50aa37087773a6a20bf0d37cfe8ce2a6d88ccb7..bac98015296510ea890e021090ed2e35701cb7c9 100644 (file)
@@ -1,7 +1,8 @@
 //$$CDS-header$$
 
+#include <chrono>
+#include <cds/details/defs.h> // TSan annotations
 #include "cppunit/thread.h"
-#include <boost/date_time/posix_time/posix_time_types.hpp>
 
 namespace CppUnitMini {
 
@@ -42,12 +43,16 @@ namespace CppUnitMini {
 
     ThreadPool::~ThreadPool()
     {
+        CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
+
         delete m_pBarrierStart;
         delete m_pBarrierDone;
 
         for ( size_t i = 0; i < m_arrThreads.size(); ++i )
             delete m_arrThreads[i];
         m_arrThreads.resize( 0 );
+
+        CDS_TSAN_ANNOTATE_IGNORE_RW_END;
     }
 
     void    ThreadPool::add( TestThread * pThread, size_t nCount )
@@ -75,7 +80,7 @@ namespace CppUnitMini {
 
         // Wait while all threads is done
         m_pBarrierDone->wait();
-        boost::this_thread::sleep(boost::posix_time::milliseconds(500));
+        std::this_thread::sleep_for(std::chrono::milliseconds(500));
     }
 
     void ThreadPool::run( unsigned int nDuration )
@@ -87,17 +92,17 @@ namespace CppUnitMini {
         for ( size_t i = 0; i < nThreadCount; ++i )
             m_arrThreads[i]->create();
 
-        boost::system_time stEnd( boost::get_system_time() + boost::posix_time::seconds( nDuration ) );
+        auto stEnd(std::chrono::steady_clock::now() + std::chrono::seconds( nDuration ));
         do {
-            boost::this_thread::sleep( stEnd );
-        } while ( boost::get_system_time() < stEnd );
+            std::this_thread::sleep_until( stEnd );
+        } while ( std::chrono::steady_clock::now() < stEnd );
 
         for ( size_t i = 0; i < nThreadCount; ++i )
             m_arrThreads[i]->stop();
 
         // Wait while all threads is done
         m_pBarrierDone->wait();
-        boost::this_thread::sleep(boost::posix_time::milliseconds(500));
+        std::this_thread::sleep_for(std::chrono::milliseconds(500));
     }
 
     void    ThreadPool::onThreadInitDone( TestThread * pThread )