From 74502e3cc02daef719f6255cc55a7bebe0772a26 Mon Sep 17 00:00:00 2001 From: Dave Watson Date: Tue, 19 Dec 2017 13:17:28 -0800 Subject: [PATCH] Fix ThreadLocal races Summary: I misread the ThreadLocal docs: thread destruction functions do *not* grab the accessAllThreads_ lock unless you use *strict* mode, and even then, it is only a read lock. Easy enough to make the thread-destruction global bits atomic / use folly::Synchronized. Reviewed By: yfeldblum Differential Revision: D6592905 fbshipit-source-id: 4ae600dff4c8c04751483a452ca7c07ef3f26380 --- .../synchronization/detail/ThreadCachedInts.h | 32 ++++++++------- .../detail/ThreadCachedLists.h | 7 ++-- folly/synchronization/test/RcuTest.cpp | 39 +++++++++++-------- 3 files changed, 44 insertions(+), 34 deletions(-) diff --git a/folly/synchronization/detail/ThreadCachedInts.h b/folly/synchronization/detail/ThreadCachedInts.h index 2b954c1f..1e6b0fb8 100644 --- a/folly/synchronization/detail/ThreadCachedInts.h +++ b/folly/synchronization/detail/ThreadCachedInts.h @@ -36,9 +36,8 @@ namespace detail { template class ThreadCachedInts { - // These are only accessed under the ThreadLocal lock. 
- int64_t orphan_inc_[2]{0, 0}; - int64_t orphan_dec_[2]{0, 0}; + std::atomic orphan_inc_[2]; + std::atomic orphan_dec_[2]; folly::detail::Futex<> waiting_; class Integer { @@ -49,10 +48,16 @@ class ThreadCachedInts { std::atomic inc_[2]; std::atomic dec_[2]; ~Integer() noexcept { - ints_->orphan_inc_[0] += inc_[0].load(std::memory_order_relaxed); - ints_->orphan_inc_[1] += inc_[1].load(std::memory_order_relaxed); - ints_->orphan_dec_[0] += dec_[0].load(std::memory_order_relaxed); - ints_->orphan_dec_[1] += dec_[1].load(std::memory_order_relaxed); + // Increment counts must be set before decrement counts + ints_->orphan_inc_[0].fetch_add( + inc_[0].load(std::memory_order_relaxed), std::memory_order_relaxed); + ints_->orphan_inc_[1].fetch_add( + inc_[1].load(std::memory_order_relaxed), std::memory_order_relaxed); + folly::asymmetricLightBarrier(); // B + ints_->orphan_dec_[0].fetch_add( + dec_[0].load(std::memory_order_relaxed), std::memory_order_relaxed); + ints_->orphan_dec_[1].fetch_add( + dec_[1].load(std::memory_order_relaxed), std::memory_order_relaxed); ints_->waiting_.store(0, std::memory_order_release); ints_->waiting_.futexWake(); } @@ -99,7 +104,7 @@ class ThreadCachedInts { } int64_t readFull(uint8_t epoch) { - int64_t full = 0; + int64_t full = -orphan_dec_[epoch].load(std::memory_order_relaxed); // Matches A - ensure all threads have seen new value of version, // *and* that we see current values of counters in readFull() @@ -125,8 +130,7 @@ class ThreadCachedInts { } // orphan is read behind accessAllThreads lock - auto res = full + orphan_inc_[epoch] - orphan_dec_[epoch]; - return res; + return full + orphan_inc_[epoch].load(std::memory_order_relaxed); } void waitForZero(uint8_t phase) { @@ -158,10 +162,10 @@ class ThreadCachedInts { int_cache_->inc_[0].store(0, std::memory_order_relaxed); int_cache_->inc_[1].store(0, std::memory_order_relaxed); } - orphan_inc_[0] = 0; - orphan_inc_[1] = 0; - orphan_dec_[0] = 0; - orphan_dec_[1] = 0; + 
orphan_inc_[0].store(0, std::memory_order_relaxed); + orphan_inc_[1].store(0, std::memory_order_relaxed); + orphan_dec_[0].store(0, std::memory_order_relaxed); + orphan_dec_[1].store(0, std::memory_order_relaxed); } }; diff --git a/folly/synchronization/detail/ThreadCachedLists.h b/folly/synchronization/detail/ThreadCachedLists.h index 6161d4e1..6852bfd2 100644 --- a/folly/synchronization/detail/ThreadCachedLists.h +++ b/folly/synchronization/detail/ThreadCachedLists.h @@ -19,6 +19,7 @@ #include #include +#include #include #include @@ -81,7 +82,7 @@ class ThreadCachedLists : public ThreadCachedListsBase { // Push list to the global list. void pushGlobal(ListHead& list); - ListHead ghead_; + folly::Synchronized ghead_; struct TLHead : public AtomicListHead { ThreadCachedLists* parent_; @@ -90,7 +91,7 @@ class ThreadCachedLists : public ThreadCachedListsBase { TLHead(ThreadCachedLists* parent) : parent_(parent) {} ~TLHead() { - parent_->ghead_.splice(*this); + parent_->ghead_->splice(*this); } }; @@ -146,7 +147,7 @@ void ThreadCachedLists::collect(ListHead& list) { list.splice(thr); } - list.splice(ghead_); + list.splice(*ghead_.wlock()); } template diff --git a/folly/synchronization/test/RcuTest.cpp b/folly/synchronization/test/RcuTest.cpp index 5df5b0f3..91163997 100644 --- a/folly/synchronization/test/RcuTest.cpp +++ b/folly/synchronization/test/RcuTest.cpp @@ -24,12 +24,11 @@ #include #include #include -#include using namespace folly; DEFINE_int64(iters, 100000, "Number of iterations"); -DEFINE_int64(threads, 32, "Number of threads"); +DEFINE_uint64(threads, 32, "Number of threads"); TEST(RcuTest, Basic) { auto foo = new int(2); @@ -116,7 +115,7 @@ TEST(RcuTest, Stress) { for (uint i = 0; i < sz; i++) { ints[i].store(new int(0)); } - for (int th = 0; th < FLAGS_threads; th++) { + for (unsigned th = 0; th < FLAGS_threads; th++) { threads.push_back(std::thread([&]() { for (int i = 0; i < FLAGS_iters / 100; i++) { rcu_reader g; @@ -148,11 +147,16 @@ TEST(RcuTest, 
Stress) { } done = true; updater.join(); + // Cleanup for asan + synchronize_rcu(); + for (uint i = 0; i < sz; i++) { + delete ints[i].exchange(nullptr); + } } TEST(RcuTest, Synchronize) { std::vector threads; - for (int th = 0; th < FLAGS_threads; th++) { + for (unsigned th = 0; th < FLAGS_threads; th++) { threads.push_back(std::thread([&]() { for (int i = 0; i < 10; i++) { synchronize_rcu(); @@ -197,14 +201,11 @@ TEST(RcuTest, MoveReaderBetweenThreads) { } TEST(RcuTest, ForkTest) { - folly::Baton<> b; rcu_token epoch; std::thread t([&]() { epoch = rcu_default_domain()->lock_shared(); - b.post(); }); - t.detach(); - b.wait(); + t.join(); auto pid = fork(); if (pid) { // parent @@ -221,22 +222,21 @@ TEST(RcuTest, ForkTest) { } } -TEST(RcuTest, CoreLocalList) { +TEST(RcuTest, ThreadLocalList) { struct TTag; folly::detail::ThreadCachedLists lists; - int numthreads = 32; - std::vector threads; - std::atomic done{0}; - for (int tr = 0; tr < numthreads; tr++) { - threads.push_back(std::thread([&]() { + std::vector threads{FLAGS_threads}; + std::atomic done{FLAGS_threads}; + for (auto& tr : threads) { + tr = std::thread([&]() { for (int i = 0; i < FLAGS_iters; i++) { auto node = new folly::detail::ThreadCachedListsBase::Node; lists.push(node); } - done++; - })); + --done; + }); } - while (done.load() != numthreads) { + while (done.load() > 0) { folly::detail::ThreadCachedLists::ListHead list{}; lists.collect(list); list.forEach([](folly::detail::ThreadCachedLists::Node* node) { @@ -246,6 +246,11 @@ TEST(RcuTest, CoreLocalList) { for (auto& thread : threads) { thread.join(); } + // Run cleanup pass one more time to make ASAN happy + folly::detail::ThreadCachedLists::ListHead list{}; + lists.collect(list); + list.forEach( + [](folly::detail::ThreadCachedLists::Node* node) { delete node; }); } TEST(RcuTest, ThreadDeath) { -- 2.34.1