Fix ThreadLocal races
authorDave Watson <davejwatson@fb.com>
Tue, 19 Dec 2017 21:17:28 +0000 (13:17 -0800)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Tue, 19 Dec 2017 21:24:46 +0000 (13:24 -0800)
Summary:
I misread the ThreadLocal docs, thread destruction functions do *not* grab the accessAllTHreads_ lock
unless you use *strict* mode, and even then, it is only a read lock.

Easy enough to make the thread-destruction global bits to be atomic / use folly::Synchronized.

Reviewed By: yfeldblum

Differential Revision: D6592905

fbshipit-source-id: 4ae600dff4c8c04751483a452ca7c07ef3f26380

folly/synchronization/detail/ThreadCachedInts.h
folly/synchronization/detail/ThreadCachedLists.h
folly/synchronization/test/RcuTest.cpp

index 2b954c1f309a864c4928571b371fdeb2eabf2cd1..1e6b0fb8f077e6e3afe8a42e8ea085782a15af94 100644 (file)
@@ -36,9 +36,8 @@ namespace detail {
 
 template <typename Tag>
 class ThreadCachedInts {
-  // These are only accessed under the ThreadLocal lock.
-  int64_t orphan_inc_[2]{0, 0};
-  int64_t orphan_dec_[2]{0, 0};
+  std::atomic<int64_t> orphan_inc_[2];
+  std::atomic<int64_t> orphan_dec_[2];
   folly::detail::Futex<> waiting_;
 
   class Integer {
@@ -49,10 +48,16 @@ class ThreadCachedInts {
     std::atomic<int64_t> inc_[2];
     std::atomic<int64_t> dec_[2];
     ~Integer() noexcept {
-      ints_->orphan_inc_[0] += inc_[0].load(std::memory_order_relaxed);
-      ints_->orphan_inc_[1] += inc_[1].load(std::memory_order_relaxed);
-      ints_->orphan_dec_[0] += dec_[0].load(std::memory_order_relaxed);
-      ints_->orphan_dec_[1] += dec_[1].load(std::memory_order_relaxed);
+      // Increment counts must be set before decrement counts
+      ints_->orphan_inc_[0].fetch_add(
+          inc_[0].load(std::memory_order_relaxed), std::memory_order_relaxed);
+      ints_->orphan_inc_[1].fetch_add(
+          inc_[1].load(std::memory_order_relaxed), std::memory_order_relaxed);
+      folly::asymmetricLightBarrier(); // B
+      ints_->orphan_dec_[0].fetch_add(
+          dec_[0].load(std::memory_order_relaxed), std::memory_order_relaxed);
+      ints_->orphan_dec_[1].fetch_add(
+          dec_[1].load(std::memory_order_relaxed), std::memory_order_relaxed);
       ints_->waiting_.store(0, std::memory_order_release);
       ints_->waiting_.futexWake();
     }
@@ -99,7 +104,7 @@ class ThreadCachedInts {
   }
 
   int64_t readFull(uint8_t epoch) {
-    int64_t full = 0;
+    int64_t full = -orphan_dec_[epoch].load(std::memory_order_relaxed);
 
     // Matches A - ensure all threads have seen new value of version,
     // *and* that we see current values of counters in readFull()
@@ -125,8 +130,7 @@ class ThreadCachedInts {
     }
 
     // orphan is read behind accessAllThreads lock
-    auto res = full + orphan_inc_[epoch] - orphan_dec_[epoch];
-    return res;
+    return full + orphan_inc_[epoch].load(std::memory_order_relaxed);
   }
 
   void waitForZero(uint8_t phase) {
@@ -158,10 +162,10 @@ class ThreadCachedInts {
       int_cache_->inc_[0].store(0, std::memory_order_relaxed);
       int_cache_->inc_[1].store(0, std::memory_order_relaxed);
     }
-    orphan_inc_[0] = 0;
-    orphan_inc_[1] = 0;
-    orphan_dec_[0] = 0;
-    orphan_dec_[1] = 0;
+    orphan_inc_[0].store(0, std::memory_order_relaxed);
+    orphan_inc_[1].store(0, std::memory_order_relaxed);
+    orphan_dec_[0].store(0, std::memory_order_relaxed);
+    orphan_dec_[1].store(0, std::memory_order_relaxed);
   }
 };
 
index 6161d4e118a0ffc4b1a278dedb363fd810a6a4d9..6852bfd2240febb364fca19eb27e82c6ffb1b488 100644 (file)
@@ -19,6 +19,7 @@
 #include <atomic>
 
 #include <folly/Function.h>
+#include <folly/Synchronized.h>
 #include <folly/ThreadLocal.h>
 #include <glog/logging.h>
 
@@ -81,7 +82,7 @@ class ThreadCachedLists : public ThreadCachedListsBase {
   // Push list to the global list.
   void pushGlobal(ListHead& list);
 
-  ListHead ghead_;
+  folly::Synchronized<ListHead> ghead_;
 
   struct TLHead : public AtomicListHead {
     ThreadCachedLists* parent_;
@@ -90,7 +91,7 @@ class ThreadCachedLists : public ThreadCachedListsBase {
     TLHead(ThreadCachedLists* parent) : parent_(parent) {}
 
     ~TLHead() {
-      parent_->ghead_.splice(*this);
+      parent_->ghead_->splice(*this);
     }
   };
 
@@ -146,7 +147,7 @@ void ThreadCachedLists<Tag>::collect(ListHead& list) {
     list.splice(thr);
   }
 
-  list.splice(ghead_);
+  list.splice(*ghead_.wlock());
 }
 
 template <typename Tag>
index 5df5b0f3d3fb3ab42def4e1bd50625ad115613ac..911639972e9858e338cf2e6e30df16fe0e0c0706 100644 (file)
 #include <folly/Random.h>
 #include <folly/portability/GFlags.h>
 #include <folly/portability/GTest.h>
-#include <folly/synchronization/Baton.h>
 
 using namespace folly;
 
 DEFINE_int64(iters, 100000, "Number of iterations");
-DEFINE_int64(threads, 32, "Number of threads");
+DEFINE_uint64(threads, 32, "Number of threads");
 
 TEST(RcuTest, Basic) {
   auto foo = new int(2);
@@ -116,7 +115,7 @@ TEST(RcuTest, Stress) {
   for (uint i = 0; i < sz; i++) {
     ints[i].store(new int(0));
   }
-  for (int th = 0; th < FLAGS_threads; th++) {
+  for (unsigned th = 0; th < FLAGS_threads; th++) {
     threads.push_back(std::thread([&]() {
       for (int i = 0; i < FLAGS_iters / 100; i++) {
         rcu_reader g;
@@ -148,11 +147,16 @@ TEST(RcuTest, Stress) {
   }
   done = true;
   updater.join();
+  // Cleanup for asan
+  synchronize_rcu();
+  for (uint i = 0; i < sz; i++) {
+    delete ints[i].exchange(nullptr);
+  }
 }
 
 TEST(RcuTest, Synchronize) {
   std::vector<std::thread> threads;
-  for (int th = 0; th < FLAGS_threads; th++) {
+  for (unsigned th = 0; th < FLAGS_threads; th++) {
     threads.push_back(std::thread([&]() {
       for (int i = 0; i < 10; i++) {
         synchronize_rcu();
@@ -197,14 +201,11 @@ TEST(RcuTest, MoveReaderBetweenThreads) {
 }
 
 TEST(RcuTest, ForkTest) {
-  folly::Baton<> b;
   rcu_token epoch;
   std::thread t([&]() {
     epoch = rcu_default_domain()->lock_shared();
-    b.post();
   });
-  t.detach();
-  b.wait();
+  t.join();
   auto pid = fork();
   if (pid) {
     // parent
@@ -221,22 +222,21 @@ TEST(RcuTest, ForkTest) {
   }
 }
 
-TEST(RcuTest, CoreLocalList) {
+TEST(RcuTest, ThreadLocalList) {
   struct TTag;
   folly::detail::ThreadCachedLists<TTag> lists;
-  int numthreads = 32;
-  std::vector<std::thread> threads;
-  std::atomic<int> done{0};
-  for (int tr = 0; tr < numthreads; tr++) {
-    threads.push_back(std::thread([&]() {
+  std::vector<std::thread> threads{FLAGS_threads};
+  std::atomic<unsigned long> done{FLAGS_threads};
+  for (auto& tr : threads) {
+    tr = std::thread([&]() {
       for (int i = 0; i < FLAGS_iters; i++) {
         auto node = new folly::detail::ThreadCachedListsBase::Node;
         lists.push(node);
       }
-      done++;
-    }));
+      --done;
+    });
   }
-  while (done.load() != numthreads) {
+  while (done.load() > 0) {
     folly::detail::ThreadCachedLists<TTag>::ListHead list{};
     lists.collect(list);
     list.forEach([](folly::detail::ThreadCachedLists<TTag>::Node* node) {
@@ -246,6 +246,11 @@ TEST(RcuTest, CoreLocalList) {
   for (auto& thread : threads) {
     thread.join();
   }
+  // Run cleanup pass one more time to make ASAN happy
+  folly::detail::ThreadCachedLists<TTag>::ListHead list{};
+  lists.collect(list);
+  list.forEach(
+      [](folly::detail::ThreadCachedLists<TTag>::Node* node) { delete node; });
 }
 
 TEST(RcuTest, ThreadDeath) {