From 1673619a96338ad2bbf953e85c2d539867735352 Mon Sep 17 00:00:00 2001
From: Dave Watson <davejwatson@fb.com>
Date: Wed, 30 Sep 2015 07:42:24 -0700
Subject: [PATCH] ReadMostlySharedPtr in folly
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

Summary: This diff adds a ReadMostlySharedPtr class to folly. It is an
efficient RCU-style holder for a shared_ptr. See the comment at the top of
ReadMostlySharedPtr.h for a more detailed description and benchmark results.

All the added files are copied from logdevice code with (almost) no changes
(`logdevice/common/` on branch `origin/logdevice-latest`). We've been using
them in logdevice for a few months. D1821723 introduced it.

Reviewed By: @bmaurer

Differential Revision: D1919702
---
 folly/Makefile.am                           |   1 +
 folly/ReadMostlySharedPtr.h                 | 187 ++++++++++
 folly/test/ReadMostlySharedPtrBenchmark.cpp | 319 ++++++++++++++++
 folly/test/ReadMostlySharedPtrTest.cpp      | 382 ++++++++++++++++++++
 4 files changed, 889 insertions(+)
 create mode 100644 folly/ReadMostlySharedPtr.h
 create mode 100644 folly/test/ReadMostlySharedPtrBenchmark.cpp
 create mode 100644 folly/test/ReadMostlySharedPtrTest.cpp

diff --git a/folly/Makefile.am b/folly/Makefile.am
index 37bfde33..25e6db47 100644
--- a/folly/Makefile.am
+++ b/folly/Makefile.am
@@ -247,6 +247,7 @@ nobase_follyinclude_HEADERS = \
 	Random.h \
 	Random-inl.h \
 	Range.h \
+	ReadMostlySharedPtr.h \
 	RWSpinLock.h \
 	ScopeGuard.h \
 	SharedMutex.h \
diff --git a/folly/ReadMostlySharedPtr.h b/folly/ReadMostlySharedPtr.h
new file mode 100644
index 00000000..9f714899
--- /dev/null
+++ b/folly/ReadMostlySharedPtr.h
@@ -0,0 +1,187 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/* -*- Mode: C++; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
+#pragma once
+
+#include <memory>
+#include <mutex>
+
+#include <folly/Optional.h>
+#include <folly/SpinLock.h>
+#include <folly/ThreadLocal.h>
+
+namespace folly {
+
+/**
+ * @file ReadMostlySharedPtr is a smart pointer that allows for high
+ * performance shared ownership of an object. In order to provide
+ * this, ReadMostlySharedPtr may potentially delay the destruction of
+ * a shared object for longer than a std::shared_ptr would, and
+ * depending on the implementation, may have slower updates.
+ *
+ * The load() method allows a reader to acquire a ReadPtr that
+ * maintains a reference to a single version of the object. Even if a
+ * writer calls store(), the ReadPtr will point to the version of the
+ * object that was in use at the time of the read. The old version of
+ * the object will only be destroyed after all outstanding ReadPtrs to
+ * that version have been destroyed.
+ */
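+
+/*
+ * Example usage (an illustrative sketch only; `Widget` and `doSomething()`
+ * are placeholders, not part of this diff):
+ *
+ *   folly::ReadMostlySharedPtr<Widget> ptr(folly::make_unique<Widget>());
+ *
+ *   // Reader threads: the fast path hits a thread-local cache and does
+ *   // not contend with other readers.
+ *   auto reader = ptr.load();  // ReadPtr pins this version of the object
+ *   reader->doSomething();
+ *
+ *   // Writer thread: swaps in a new version. Existing ReadPtrs keep the
+ *   // old version alive until they are destroyed.
+ *   ptr.store(folly::make_unique<Widget>());
+ */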
+template<typename T, typename Tag = void>
+class ReadMostlySharedPtr {
+ public:
+  constexpr explicit ReadMostlySharedPtr(std::unique_ptr<T>&& ptr = nullptr)
+      : masterPtr_(std::move(ptr)) {}
+
+  /**
+   * Replaces the managed object.
+   */
+  void store(std::unique_ptr<T>&& uptr) {
+    {
+      std::shared_ptr<T> ptr(std::move(uptr));
+      std::lock_guard<std::mutex> lock(mutex_);
+      // Swap to avoid calling ~T() under the lock
+      std::swap(masterPtr_, ptr);
+    }
+
+    {
+      // This also holds a lock that prevents destruction of thread cache
+      // entries, but not creation. If creation of a thread cache entry for
+      // a new thread happens during iteration, the entry is not guaranteed
+      // to be seen. That's fine for us: if load() created a new cache entry
+      // after we got the accessor, it will already see the updated pointer,
+      // so we don't need to clear the cache.
+      auto accessor = threadLocalCache_.accessAllThreads();
+
+      for (CachedPointer& local : accessor) {
+        std::lock_guard<folly::SpinLock> local_lock(local.lock);
+        // We could instead assign masterPtr_ to local.ptr here, but it's
+        // better if the reading thread allocates the cached copy for
+        // itself: the allocator is more likely to place its reference
+        // counter in a region optimal for access from that thread.
+        local.ptr.clear();
+      }
+    }
+  }
+
+  class ReadPtr {
+    friend class ReadMostlySharedPtr;
+   public:
+    ReadPtr() {}
+    void reset() {
+      ref_ = nullptr;
+      ptr_.reset();
+    }
+    explicit operator bool() const {
+      return (ref_ != nullptr);
+    }
+    bool operator==(T* ptr) const {
+      return ref_ == ptr;
+    }
+    bool operator==(std::nullptr_t) const {
+      return ref_ == nullptr;
+    }
+    T* operator->() const { return ref_; }
+    T& operator*() const { return *ref_; }
+    T* get() const { return ref_; }
+   private:
+    explicit ReadPtr(std::shared_ptr<T>& ptr)
+        : ptr_(ptr), ref_(ptr.get()) {}
+    std::shared_ptr<T> ptr_;
+    T* ref_{nullptr};
+  };
+
+  /**
+   * Returns a ReadPtr pointing to the managed object.
+   */
+  ReadPtr load() const {
+    auto& local = *threadLocalCache_;
+
+    std::lock_guard<folly::SpinLock> local_lock(local.lock);
+
+    if (!local.ptr.hasValue()) {
+      std::lock_guard<std::mutex> lock(mutex_);
+      if (!masterPtr_) {
+        local.ptr.emplace(nullptr);
+      } else {
+        // Create a thread-local copy of masterPtr_ whose reference counter
+        // is normally only touched by this thread; see makeCachedCopy()
+        // below for how this avoids cache line bouncing.
+        local.ptr = makeCachedCopy(masterPtr_);
+      }
+    }
+
+    // The return statement makes the copy before destroying local variables,
+    // so local.ptr is only accessed under local.lock here.
+    return ReadPtr(local.ptr.value());
+  }
+
+ private:
+  // non-copyable
+  ReadMostlySharedPtr(const ReadMostlySharedPtr&) = delete;
+  ReadMostlySharedPtr& operator=(const ReadMostlySharedPtr&) = delete;
+
+  struct CachedPointer {
+    folly::Optional<std::shared_ptr<T>> ptr;
+    folly::SpinLock lock;
+  };
+
+  std::shared_ptr<T> masterPtr_;
+
+  // Instead of using Tag as the tag for ThreadLocal, effectively use the
+  // pair (T, Tag), which is more granular.
+  struct ThreadLocalTag {};
+
+  mutable folly::ThreadLocal<CachedPointer, ThreadLocalTag> threadLocalCache_;
+
+  // Ensures safety between concurrent store() and load() calls
+  mutable std::mutex mutex_;
+
+  std::shared_ptr<T>
+  makeCachedCopy(const std::shared_ptr<T>& ptr) const {
+    // Wrap a copy of the shared_ptr in another shared_ptr, to avoid cache
+    // line bouncing on the reference counter.
+    //
+    // The following expression is tricky. It creates a
+    // shared_ptr<shared_ptr<T>> that points to a copy of masterPtr_. The
+    // reference counter of this shared_ptr<shared_ptr<T>> will normally
+    // only be modified from this thread, which avoids cache line bouncing.
+    // (Though the caller is free to pass the pointer to other threads and
+    // bump the reference counter from there.)
+    //
+    // Then this shared_ptr<shared_ptr<T>> is turned into a shared_ptr<T>
+    // via the aliasing constructor: the returned shared_ptr<T> internally
+    // points to the control block of the shared_ptr<shared_ptr<T>>, but
+    // dereferences to T, not shared_ptr<T>.
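+    //
+    // For instance (an illustrative aside): given std::shared_ptr<int> p,
+    //   auto outer = std::make_shared<std::shared_ptr<int>>(p);
+    //   std::shared_ptr<int> alias(outer, p.get());
+    // `alias` dereferences to the int but shares outer's control block, so
+    // copying `alias` only touches a counter allocated by this thread.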
+    return std::shared_ptr<T>(
+        std::make_shared<std::shared_ptr<T>>(ptr), ptr.get());
+  }
+};
+
+} // namespace folly
diff --git a/folly/test/ReadMostlySharedPtrBenchmark.cpp b/folly/test/ReadMostlySharedPtrBenchmark.cpp
new file mode 100644
index 00000000..e135b6c2
--- /dev/null
+++ b/folly/test/ReadMostlySharedPtrBenchmark.cpp
@@ -0,0 +1,319 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/* -*- Mode: C++; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
+
+#include <atomic>
+#include <cstdint>
+#include <mutex>
+#include <thread>
+#include <vector>
+
+#include <boost/noncopyable.hpp>
+#include <gflags/gflags.h>
+
+#include <folly/Benchmark.h>
+#include <folly/Memory.h>
+#include <folly/ReadMostlySharedPtr.h>
+#include <folly/ThreadLocal.h>
+
+/**
+ * @file Benchmark comparing three implementations of ReadMostlySharedPtr.
+ *
+ * Run with something like --bm_min_usec=100000.
+ */
+
+namespace slow {
+
+// An implementation with a thread-local cache of shared_ptrs.
+template<typename T>
+class ReadMostlySharedPtr : boost::noncopyable {
+ public:
+  explicit ReadMostlySharedPtr(std::shared_ptr<T> ptr = nullptr) {
+    master_.ptr = std::move(ptr);
+    master_.version.store(1);
+  }
+
+  std::shared_ptr<T> store(std::shared_ptr<T> ptr) {
+    std::lock_guard<std::mutex> guard(mutex_);
+    std::swap(master_.ptr, ptr);
+    master_.version.fetch_add(1);
+    return ptr;
+  }
+
+  std::shared_ptr<T> load() const {
+    // We are the only thread accessing threadLocalCache_->version, so it is
+    // fine to use memory_order_relaxed
+    auto local_version =
+        threadLocalCache_->version.load(std::memory_order_relaxed);
+    if (local_version != master_.version.load()) {
+      std::lock_guard<std::mutex> guard(mutex_);
+      threadLocalCache_->ptr = master_.ptr;
+      threadLocalCache_->version.store(master_.version.load(),
+                                       std::memory_order_relaxed);
+    }
+    return threadLocalCache_->ptr;
+  }
+
+ private:
+  struct VersionedPointer : boost::noncopyable {
+    VersionedPointer() : version(0) {}
+    std::shared_ptr<T> ptr;
+    std::atomic<uint64_t> version;
+  };
+
+  folly::ThreadLocal<VersionedPointer> threadLocalCache_;
+  VersionedPointer master_;
+
+  // Ensures safety between concurrent store() and load() calls
+  mutable std::mutex mutex_;
+};
+
+}
+
+
+/**
+ * At the moment the fastest implementation in this benchmark.
+ * A real RCU implementation would most likely be significantly better.
+ */
+namespace fast {
+
+/**
+ * Contains a version number and a shared_ptr that points to the most recent
+ * object. The load() method uses thread-local storage to efficiently return
+ * the current pointer without locking when the pointer has not changed.
+ * The version of the pointer in the thread-local cache is compared to the
+ * master version. If the master is found to be newer, it is copied into
+ * the thread-local cache under a lock. The store() method grabs the lock,
+ * updates the master pointer and bumps the version number.
+ *
+ * The downside is that store() doesn't clear or update the thread-local
+ * caches. This means that old instances of T can stay alive in a thread's
+ * cache indefinitely if that thread never calls load() again.
+ */
+template<typename T>
+class ReadMostlySharedPtr : boost::noncopyable {
+ public:
+  explicit ReadMostlySharedPtr(std::shared_ptr<T> ptr = nullptr) {
+    masterPtr_ = std::move(ptr);
+    masterVersion_.store(1);
+  }
+
+  /**
+   * Replaces the managed object.
+   */
+  void store(std::shared_ptr<T> ptr) {
+    {
+      std::lock_guard<std::mutex> guard(mutex_);
+      // Swap to avoid calling ~T() under the lock
+      std::swap(masterPtr_, ptr);
+      masterVersion_.fetch_add(1);
+    }
+  }
+
+  /**
+   * Returns a shared_ptr to the managed object.
+   */
+  std::shared_ptr<T> load() const {
+    auto& local = *threadLocalCache_;
+    if (local.version != masterVersion_.load()) {
+      std::lock_guard<std::mutex> guard(mutex_);
+
+      if (!masterPtr_) {
+        local.ptr = nullptr;
+      } else {
+        // Create a thread-local copy of masterPtr_ via the shared_ptr
+        // aliasing constructor, so the copy's reference counter is normally
+        // only modified from this thread and doesn't bounce between caches.
+        // See makeCachedCopy() in folly/ReadMostlySharedPtr.h for a
+        // detailed explanation of this expression.
+        local.ptr = std::shared_ptr<T>(
+            std::make_shared<std::shared_ptr<T>>(masterPtr_),
+            masterPtr_.get());
+      }
+
+      local.version = masterVersion_.load();
+    }
+    return local.ptr;
+  }
+
+ private:
+  struct VersionedPointer : boost::noncopyable {
+    VersionedPointer() {}
+    std::shared_ptr<T> ptr;
+    uint64_t version = 0;
+  };
+
+  folly::ThreadLocal<VersionedPointer> threadLocalCache_;
+
+  std::shared_ptr<T> masterPtr_;
+  std::atomic<uint64_t> masterVersion_;
+
+  // Ensures safety between concurrent store() and load() calls
+  mutable std::mutex mutex_;
+};
+
+}
+
+
+template<class PtrInt>
+void benchReads(int n) {
+  PtrInt ptr(folly::make_unique<int>(42));
+  for (int i = 0; i < n; ++i) {
+    auto val = ptr.load();
+    folly::doNotOptimizeAway(val.get());
+  }
+}
+
+template<class PtrInt>
+void benchWrites(int n) {
+  PtrInt ptr;
+  for (int i = 0; i < n; ++i) {
+    ptr.store(folly::make_unique<int>(3));
+  }
+}
+
+template<class PtrInt>
+void benchReadsWhenWriting(int n) {
+  PtrInt ptr;
+  std::atomic<bool> shutdown{false};
+  std::thread writing_thread;
+
+  BENCHMARK_SUSPEND {
+    writing_thread = std::thread([&] {
+      while (!shutdown.load()) {
+        ptr.store(folly::make_unique<int>(3));
+      }
+    });
+  }
+
+  for (int i = 0; i < n; ++i) {
+    auto val = ptr.load();
+    folly::doNotOptimizeAway(val.get());
+  }
+
+  BENCHMARK_SUSPEND {
+    shutdown.store(true);
+    writing_thread.join();
+  }
+}
+
+template<class PtrInt>
+void benchWritesWhenReading(int n) {
+  PtrInt ptr;
+  std::atomic<bool> shutdown{false};
+  std::thread reading_thread;
+
+  BENCHMARK_SUSPEND {
+    reading_thread = std::thread([&] {
+      while (!shutdown.load()) {
+        auto val = ptr.load();
+        folly::doNotOptimizeAway(val.get());
+      }
+    });
+  }
+
+  for (int i = 0; i < n; ++i) {
+    ptr.store(folly::make_unique<int>(3));
+  }
+
+  BENCHMARK_SUSPEND {
+    shutdown.store(true);
+    reading_thread.join();
+  }
+}
+
+template<class PtrInt>
+void benchReadsIn10Threads(int n) {
+  PtrInt ptr(folly::make_unique<int>(27));
+  std::vector<std::thread> threads(10);
+  int n_per_thread = n;
+
+  for (std::thread& t : threads) {
+    t = std::thread([&] {
+      for (int i = 0; i < n_per_thread; ++i) {
+        auto val = ptr.load();
+        folly::doNotOptimizeAway(val.get());
+      }
+    });
+  }
+
+  for (std::thread& t : threads) {
+    t.join();
+  }
+}
+
+
+#define BENCH(name)                                       \
+  BENCHMARK(name ## _Slow, n) {                           \
+    bench ## name <slow::ReadMostlySharedPtr<int>>(n);    \
+  }                                                       \
+  BENCHMARK(name ## _ReadMostlySharedPtr, n) {            \
+    bench ## name <folly::ReadMostlySharedPtr<int>>(n);   \
+  }                                                       \
+  BENCHMARK(name ## _FastReadMostlySharedPtr, n) {        \
+    bench ## name <fast::ReadMostlySharedPtr<int>>(n);    \
+  }                                                       \
+  BENCHMARK_DRAW_LINE();
+
+
+BENCH(Reads)
+BENCH(Writes)
+BENCH(ReadsWhenWriting)
+BENCH(WritesWhenReading)
+BENCH(ReadsIn10Threads)
+
+int main(int argc, char** argv) {
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+  gflags::SetCommandLineOptionWithMode(
+      "bm_min_usec", "100000", gflags::SET_FLAG_IF_DEFAULT);
+
+  folly::runBenchmarks();
+
+  return 0;
+}
+
+/*
+============================================================================
+folly/test/ReadMostlySharedPtrBenchmark.cpp    relative  time/iter  iters/s
+============================================================================
+Reads_Slow                                                 21.05ns   47.52M
+Reads_ReadMostlySharedPtr                                  30.57ns   32.71M
+Reads_FastReadMostlySharedPtr                              21.24ns   47.09M
+----------------------------------------------------------------------------
+Writes_Slow                                               117.52ns    8.51M
+Writes_ReadMostlySharedPtr                                145.26ns    6.88M
+Writes_FastReadMostlySharedPtr                            116.26ns    8.60M
+----------------------------------------------------------------------------
+ReadsWhenWriting_Slow                                      56.18ns   17.80M
+ReadsWhenWriting_ReadMostlySharedPtr                      141.32ns    7.08M
+ReadsWhenWriting_FastReadMostlySharedPtr                   51.82ns   19.30M
+----------------------------------------------------------------------------
+WritesWhenReading_Slow                                    828.32ns    1.21M
+WritesWhenReading_ReadMostlySharedPtr                       3.00us  333.63K
+WritesWhenReading_FastReadMostlySharedPtr                 677.28ns    1.48M
+----------------------------------------------------------------------------
+ReadsIn10Threads_Slow                                     509.37ns    1.96M
+ReadsIn10Threads_ReadMostlySharedPtr                       34.33ns   29.13M
+ReadsIn10Threads_FastReadMostlySharedPtr                   26.31ns   38.00M
+----------------------------------------------------------------------------
+============================================================================
+*/
diff --git a/folly/test/ReadMostlySharedPtrTest.cpp b/folly/test/ReadMostlySharedPtrTest.cpp
new file mode 100644
index 00000000..2aeb4fca
--- /dev/null
+++ b/folly/test/ReadMostlySharedPtrTest.cpp
@@ -0,0 +1,382 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/* -*- Mode: C++; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
+
+#include <atomic>
+#include <cassert>
+#include <chrono>
+#include <condition_variable>
+#include <mutex>
+#include <random>
+#include <thread>
+#include <vector>
+
+#include <pthread.h>
+#include <unistd.h>
+
+#include <folly/Conv.h>
+#include <folly/Memory.h>
+#include <folly/ReadMostlySharedPtr.h>
+
+#include <gtest/gtest.h>
+
+using folly::ReadMostlySharedPtr;
+
+// send SIGALRM to the test process after this many seconds
+const unsigned int TEST_TIMEOUT = 10;
+
+class ReadMostlySharedPtrTest : public ::testing::Test {
+ public:
+  ReadMostlySharedPtrTest() {
+    alarm(TEST_TIMEOUT);
+  }
+};
+
+struct TestObject {
+  int value;
+  std::atomic<int>& counter;
+
+  TestObject(int value, std::atomic<int>& counter)
+      : value(value), counter(counter) {
+    ++counter;
+  }
+
+  ~TestObject() {
+    assert(counter.load() > 0);
+    --counter;
+  }
+};
+
+// One side calls requestAndWait(), the other side calls waitForRequest(),
+// does something, and then calls completed().
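+//
+// For example (an illustrative sketch; doWork() is a placeholder):
+//
+//   Coordinator c;
+//   std::thread t([&] { c.waitForRequest(); doWork(); c.completed(); });
+//   c.requestAndWait();  // returns only after doWork() has finished
+//   t.join();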
+class Coordinator {
+ public:
+  void requestAndWait() {
+    {
+      std::lock_guard<std::mutex> lock(mutex);
+      assert(!is_requested);
+      assert(!is_completed);
+      is_requested = true;
+    }
+    cv.notify_all();
+    {
+      std::unique_lock<std::mutex> lock(mutex);
+      cv.wait(lock, [&] { return is_completed; });
+    }
+  }
+
+  void waitForRequest() {
+    std::unique_lock<std::mutex> lock(mutex);
+    assert(!is_completed);
+    cv.wait(lock, [&] { return is_requested; });
+  }
+
+  void completed() {
+    {
+      std::lock_guard<std::mutex> lock(mutex);
+      assert(is_requested);
+      is_completed = true;
+    }
+    cv.notify_all();
+  }
+
+ private:
+  bool is_requested = false;
+  bool is_completed = false;
+  std::condition_variable cv;
+  std::mutex mutex;
+};
+
+TEST_F(ReadMostlySharedPtrTest, BasicStores) {
+  ReadMostlySharedPtr<TestObject> ptr;
+
+  // Store 1.
+  std::atomic<int> cnt1{0};
+  ptr.store(folly::make_unique<TestObject>(1, cnt1));
+  EXPECT_EQ(1, cnt1.load());
+
+  // Store 2, check that 1 is destroyed.
+  std::atomic<int> cnt2{0};
+  ptr.store(folly::make_unique<TestObject>(2, cnt2));
+  EXPECT_EQ(1, cnt2.load());
+  EXPECT_EQ(0, cnt1.load());
+
+  // Store nullptr, check that 2 is destroyed.
+  ptr.store(nullptr);
+  EXPECT_EQ(0, cnt2.load());
+}
+
+TEST_F(ReadMostlySharedPtrTest, BasicLoads) {
+  std::atomic<int> cnt2{0};
+  ReadMostlySharedPtr<TestObject>::ReadPtr x;
+
+  {
+    ReadMostlySharedPtr<TestObject> ptr;
+
+    // Check that ptr is initially nullptr.
+    EXPECT_EQ(ptr.load(), nullptr);
+
+    std::atomic<int> cnt1{0};
+    ptr.store(folly::make_unique<TestObject>(1, cnt1));
+    EXPECT_EQ(1, cnt1.load());
+
+    x = ptr.load();
+    EXPECT_EQ(1, x->value);
+
+    ptr.store(folly::make_unique<TestObject>(2, cnt2));
+    EXPECT_EQ(1, cnt2.load());
+    EXPECT_EQ(1, cnt1.load());
+
+    x = ptr.load();
+    EXPECT_EQ(2, x->value);
+    EXPECT_EQ(0, cnt1.load());
+
+    ptr.store(nullptr);
+    EXPECT_EQ(1, cnt2.load());
+  }
+
+  EXPECT_EQ(1, cnt2.load());
+
+  x.reset();
+  EXPECT_EQ(0, cnt2.load());
+}
+
+TEST_F(ReadMostlySharedPtrTest, LoadsFromThreads) {
+  std::atomic<int> cnt{0};
+
+  {
+    ReadMostlySharedPtr<TestObject> ptr;
+    Coordinator loads[7];
+
+    std::thread t1([&] {
+      loads[0].waitForRequest();
+      EXPECT_EQ(ptr.load(), nullptr);
+      loads[0].completed();
+
+      loads[3].waitForRequest();
+      EXPECT_EQ(2, ptr.load()->value);
+      loads[3].completed();
+
+      loads[4].waitForRequest();
+      EXPECT_EQ(4, ptr.load()->value);
+      loads[4].completed();
+
+      loads[5].waitForRequest();
+      EXPECT_EQ(5, ptr.load()->value);
+      loads[5].completed();
+    });
+
+    std::thread t2([&] {
+      loads[1].waitForRequest();
+      EXPECT_EQ(1, ptr.load()->value);
+      loads[1].completed();
+
+      loads[2].waitForRequest();
+      EXPECT_EQ(2, ptr.load()->value);
+      loads[2].completed();
+
+      loads[6].waitForRequest();
+      EXPECT_EQ(5, ptr.load()->value);
+      loads[6].completed();
+    });
+
+    loads[0].requestAndWait();
+
+    ptr.store(folly::make_unique<TestObject>(1, cnt));
+    loads[1].requestAndWait();
+
+    ptr.store(folly::make_unique<TestObject>(2, cnt));
+    loads[2].requestAndWait();
+    loads[3].requestAndWait();
+
+    ptr.store(folly::make_unique<TestObject>(3, cnt));
+    ptr.store(folly::make_unique<TestObject>(4, cnt));
+    loads[4].requestAndWait();
+
+    ptr.store(folly::make_unique<TestObject>(5, cnt));
+    loads[5].requestAndWait();
+    loads[6].requestAndWait();
+
+    EXPECT_EQ(1, cnt.load());
+
+    t1.join();
+    t2.join();
+  }
+
+  EXPECT_EQ(0, cnt.load());
+}
+
+TEST_F(ReadMostlySharedPtrTest, Ctor) {
+  std::atomic<int> cnt1{0};
+  {
+    ReadMostlySharedPtr<TestObject> ptr(
+        folly::make_unique<TestObject>(1, cnt1));
+
+    EXPECT_EQ(1, ptr.load()->value);
+  }
+
+  EXPECT_EQ(0, cnt1.load());
+}
+
+TEST_F(ReadMostlySharedPtrTest, ClearingCache) {
+  ReadMostlySharedPtr<TestObject> ptr;
+
+  // Store 1.
+  std::atomic<int> cnt1{0};
+  ptr.store(folly::make_unique<TestObject>(1, cnt1));
+
+  Coordinator c;
+
+  std::thread t([&] {
+    // Cache the pointer for this thread.
+    ptr.load();
+    c.requestAndWait();
+  });
+
+  // Wait for the thread to cache the pointer.
+  c.waitForRequest();
+  EXPECT_EQ(1, cnt1.load());
+
+  // Store 2 and check that 1 is destroyed.
+  std::atomic<int> cnt2{0};
+  ptr.store(folly::make_unique<TestObject>(2, cnt2));
+  EXPECT_EQ(0, cnt1.load());
+
+  // Unblock the thread.
+  c.completed();
+  t.join();
+}
+
+TEST_F(ReadMostlySharedPtrTest, SlowDestructor) {
+  struct Thingy {
+    Coordinator* dtor;
+
+    Thingy(Coordinator* dtor = nullptr) : dtor(dtor) {}
+
+    ~Thingy() {
+      if (dtor) {
+        dtor->requestAndWait();
+      }
+    }
+  };
+
+  Coordinator dtor;
+
+  ReadMostlySharedPtr<Thingy> ptr;
+  ptr.store(folly::make_unique<Thingy>(&dtor));
+
+  std::thread t([&] {
+    // This will block in ~Thingy().
+    ptr.store(folly::make_unique<Thingy>());
+  });
+
+  // Wait until store() in the thread calls ~Thingy().
+  dtor.waitForRequest();
+  // Do a store while the other store() is stuck in ~Thingy().
+  ptr.store(folly::make_unique<Thingy>());
+  // Let the other store() go.
+  dtor.completed();
+
+  t.join();
+}
+
+TEST_F(ReadMostlySharedPtrTest, StressTest) {
+  const int ptr_count = 2;
+  const int thread_count = 5;
+  const std::chrono::milliseconds duration(100);
+  const std::chrono::milliseconds upd_delay(1);
+  const std::chrono::milliseconds respawn_delay(1);
+
+  struct Instance {
+    std::atomic<int> value{0};
+    std::atomic<int> prev_value{0};
+    ReadMostlySharedPtr<TestObject> ptr;
+  };
+
+  struct Thread {
+    std::thread t;
+    std::atomic<bool> shutdown{false};
+  };
+
+  std::atomic<int> counter(0);
+  std::vector<Instance> instances(ptr_count);
+  std::vector<Thread> threads(thread_count);
+  std::atomic<int> seed(0);
+
+  // Threads that call load() and check the value.
+  auto thread_func = [&](int t) {
+    pthread_setname_np(pthread_self(),
+                       ("load" + folly::to<std::string>(t)).c_str());
+    std::mt19937 rnd(++seed);
+    while (!threads[t].shutdown.load()) {
+      Instance& instance = instances[rnd() % instances.size()];
+      // The writer bumps instance.value before storing a new object and
+      // bumps instance.prev_value after, so whatever version we observe
+      // here must satisfy val1 <= val <= val2.
+      int val1 = instance.prev_value.load();
+      auto p = instance.ptr.load();
+      int val = p ? p->value : 0;
+      int val2 = instance.value.load();
+      EXPECT_LE(val1, val);
+      EXPECT_LE(val, val2);
+    }
+  };
+
+  for (size_t t = 0; t < threads.size(); ++t) {
+    threads[t].t = std::thread(thread_func, t);
+  }
+
+  std::atomic<bool> shutdown(false);
+
+  // Thread that calls store() occasionally.
+  std::thread update_thread([&] {
+    pthread_setname_np(pthread_self(), "store");
+    std::mt19937 rnd(++seed);
+    while (!shutdown.load()) {
+      Instance& instance = instances[rnd() % instances.size()];
+      int val = ++instance.value;
+      instance.ptr.store(folly::make_unique<TestObject>(val, counter));
+      ++instance.prev_value;
+      /* sleep override */
+      std::this_thread::sleep_for(upd_delay);
+    }
+  });
+
+  // Thread that joins and respawns load() threads occasionally.
+  std::thread respawn_thread([&] {
+    pthread_setname_np(pthread_self(), "respawn");
+    std::mt19937 rnd(++seed);
+    while (!shutdown.load()) {
+      int t = rnd() % threads.size();
+      threads[t].shutdown.store(true);
+      threads[t].t.join();
+      threads[t].shutdown.store(false);
+      threads[t].t = std::thread(thread_func, t);
+
+      /* sleep override */
+      std::this_thread::sleep_for(respawn_delay);
+    }
+  });
+
+  // Let all of this run for some time.
+  /* sleep override */
+  std::this_thread::sleep_for(duration);
+
+  // Shut all of this down.
+  shutdown.store(true);
+
+  update_thread.join();
+  respawn_thread.join();
+  for (auto& t : threads) {
+    t.shutdown.store(true);
+    t.t.join();
+  }
+
+  for (auto& instance : instances) {
+    instance.ptr.store(nullptr);
+    EXPECT_EQ(instance.value.load(), instance.prev_value.load());
+  }
+
+  EXPECT_EQ(0, counter.load());
+}
-- 
2.34.1