template <typename Tag>
class ThreadCachedInts {
- // These are only accessed under the ThreadLocal lock.
- int64_t orphan_inc_[2]{0, 0};
- int64_t orphan_dec_[2]{0, 0};
+ std::atomic<int64_t> orphan_inc_[2];
+ std::atomic<int64_t> orphan_dec_[2];
folly::detail::Futex<> waiting_;
class Integer {
std::atomic<int64_t> inc_[2];
std::atomic<int64_t> dec_[2];
~Integer() noexcept {
- ints_->orphan_inc_[0] += inc_[0].load(std::memory_order_relaxed);
- ints_->orphan_inc_[1] += inc_[1].load(std::memory_order_relaxed);
- ints_->orphan_dec_[0] += dec_[0].load(std::memory_order_relaxed);
- ints_->orphan_dec_[1] += dec_[1].load(std::memory_order_relaxed);
+ // Increment counts must be set before decrement counts
+ ints_->orphan_inc_[0].fetch_add(
+ inc_[0].load(std::memory_order_relaxed), std::memory_order_relaxed);
+ ints_->orphan_inc_[1].fetch_add(
+ inc_[1].load(std::memory_order_relaxed), std::memory_order_relaxed);
+ folly::asymmetricLightBarrier(); // B
+ ints_->orphan_dec_[0].fetch_add(
+ dec_[0].load(std::memory_order_relaxed), std::memory_order_relaxed);
+ ints_->orphan_dec_[1].fetch_add(
+ dec_[1].load(std::memory_order_relaxed), std::memory_order_relaxed);
ints_->waiting_.store(0, std::memory_order_release);
ints_->waiting_.futexWake();
}
}
int64_t readFull(uint8_t epoch) {
- int64_t full = 0;
+ int64_t full = -orphan_dec_[epoch].load(std::memory_order_relaxed);
// Matches A - ensure all threads have seen new value of version,
// *and* that we see current values of counters in readFull()
}
// orphan is read behind accessAllThreads lock
- auto res = full + orphan_inc_[epoch] - orphan_dec_[epoch];
- return res;
+ return full + orphan_inc_[epoch].load(std::memory_order_relaxed);
}
void waitForZero(uint8_t phase) {
int_cache_->inc_[0].store(0, std::memory_order_relaxed);
int_cache_->inc_[1].store(0, std::memory_order_relaxed);
}
- orphan_inc_[0] = 0;
- orphan_inc_[1] = 0;
- orphan_dec_[0] = 0;
- orphan_dec_[1] = 0;
+ orphan_inc_[0].store(0, std::memory_order_relaxed);
+ orphan_inc_[1].store(0, std::memory_order_relaxed);
+ orphan_dec_[0].store(0, std::memory_order_relaxed);
+ orphan_dec_[1].store(0, std::memory_order_relaxed);
}
};
#include <atomic>
#include <folly/Function.h>
+#include <folly/Synchronized.h>
#include <folly/ThreadLocal.h>
#include <glog/logging.h>
// Push list to the global list.
void pushGlobal(ListHead& list);
- ListHead ghead_;
+ folly::Synchronized<ListHead> ghead_;
struct TLHead : public AtomicListHead {
ThreadCachedLists* parent_;
TLHead(ThreadCachedLists* parent) : parent_(parent) {}
~TLHead() {
- parent_->ghead_.splice(*this);
+ parent_->ghead_->splice(*this);
}
};
list.splice(thr);
}
- list.splice(ghead_);
+ list.splice(*ghead_.wlock());
}
template <typename Tag>
#include <folly/Random.h>
#include <folly/portability/GFlags.h>
#include <folly/portability/GTest.h>
-#include <folly/synchronization/Baton.h>
using namespace folly;
DEFINE_int64(iters, 100000, "Number of iterations");
-DEFINE_int64(threads, 32, "Number of threads");
+DEFINE_uint64(threads, 32, "Number of threads");
TEST(RcuTest, Basic) {
auto foo = new int(2);
for (uint i = 0; i < sz; i++) {
ints[i].store(new int(0));
}
- for (int th = 0; th < FLAGS_threads; th++) {
+ for (unsigned th = 0; th < FLAGS_threads; th++) {
threads.push_back(std::thread([&]() {
for (int i = 0; i < FLAGS_iters / 100; i++) {
rcu_reader g;
}
done = true;
updater.join();
+ // Cleanup for asan
+ synchronize_rcu();
+ for (uint i = 0; i < sz; i++) {
+ delete ints[i].exchange(nullptr);
+ }
}
TEST(RcuTest, Synchronize) {
std::vector<std::thread> threads;
- for (int th = 0; th < FLAGS_threads; th++) {
+ for (unsigned th = 0; th < FLAGS_threads; th++) {
threads.push_back(std::thread([&]() {
for (int i = 0; i < 10; i++) {
synchronize_rcu();
}
TEST(RcuTest, ForkTest) {
- folly::Baton<> b;
rcu_token epoch;
std::thread t([&]() {
epoch = rcu_default_domain()->lock_shared();
- b.post();
});
- t.detach();
- b.wait();
+ t.join();
auto pid = fork();
if (pid) {
// parent
}
}
-TEST(RcuTest, CoreLocalList) {
+TEST(RcuTest, ThreadLocalList) {
struct TTag;
folly::detail::ThreadCachedLists<TTag> lists;
- int numthreads = 32;
- std::vector<std::thread> threads;
- std::atomic<int> done{0};
- for (int tr = 0; tr < numthreads; tr++) {
- threads.push_back(std::thread([&]() {
+ std::vector<std::thread> threads{FLAGS_threads};
+ std::atomic<unsigned long> done{FLAGS_threads};
+ for (auto& tr : threads) {
+ tr = std::thread([&]() {
for (int i = 0; i < FLAGS_iters; i++) {
auto node = new folly::detail::ThreadCachedListsBase::Node;
lists.push(node);
}
- done++;
- }));
+ --done;
+ });
}
- while (done.load() != numthreads) {
+ while (done.load() > 0) {
folly::detail::ThreadCachedLists<TTag>::ListHead list{};
lists.collect(list);
list.forEach([](folly::detail::ThreadCachedLists<TTag>::Node* node) {
for (auto& thread : threads) {
thread.join();
}
+ // Run cleanup pass one more time to make ASAN happy
+ folly::detail::ThreadCachedLists<TTag>::ListHead list{};
+ lists.collect(list);
+ list.forEach(
+ [](folly::detail::ThreadCachedLists<TTag>::Node* node) { delete node; });
}
TEST(RcuTest, ThreadDeath) {