From 1ac421a539aaf2f349591d4bee36c75ff0ee052c Mon Sep 17 00:00:00 2001
From: Andrii Grynenko
Date: Wed, 27 Jan 2016 16:08:08 -0800
Subject: [PATCH] Fix EventBase destruction race in FiberManagerMap

Summary: Previously we could be reading from the thread-local
FiberManagerMap while it was being modified. This is now fixed by keeping a
per-thread list of EventBases which need to be removed from the local maps.
On the fast path no action is taken, since the list will be empty.

Reviewed By: yfeldblum

Differential Revision: D2853921

fb-gh-sync-id: f05e1924dd2b97bfb359537de1909bbe193e0cb9
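In outline, the fix looks like this (an illustrative sketch, not the exact
code; the real LocalCache/GlobalCache classes are in FiberManagerMap.cpp
below, and LocalCacheSketch/find are hypothetical names):

    #include <unordered_map>

    #include <folly/AtomicLinkedList.h>
    #include <folly/experimental/fibers/FiberManager.h>
    #include <folly/io/async/EventBase.h>

    struct LocalCacheSketch {
      // Per-thread map, only ever touched by its owning thread.
      std::unordered_map<folly::EventBase*, folly::fibers::FiberManager*> map;

      // EventBase destructors enqueue removals here instead of erasing from
      // `map` on another thread, which is the race this diff removes.
      folly::AtomicLinkedList<folly::EventBase*> removedEvbs;

      folly::fibers::FiberManager* find(folly::EventBase* evb) {
        // Fast path: a single atomic check when nothing was removed.
        if (!removedEvbs.empty()) {
          removedEvbs.sweep([&](folly::EventBase* dead) { map.erase(dead); });
        }
        auto it = map.find(evb);
        return it == map.end() ? nullptr : it->second;
      }
    };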
---
 folly/AtomicIntrusiveLinkedList.h             | 128 ++++++++++++
 folly/AtomicLinkedList.h                      | 106 +++-------
 folly/Makefile.am                             |   1 +
 folly/experimental/fibers/Fiber.h             |   4 +-
 folly/experimental/fibers/FiberManager.h      |   9 +-
 folly/experimental/fibers/FiberManagerMap.cpp | 172 ++++++++++------
 folly/test/AtomicLinkedListTest.cpp           | 184 ++++++++++++++++++
 7 files changed, 455 insertions(+), 149 deletions(-)
 create mode 100644 folly/AtomicIntrusiveLinkedList.h
 create mode 100644 folly/test/AtomicLinkedListTest.cpp

diff --git a/folly/AtomicIntrusiveLinkedList.h b/folly/AtomicIntrusiveLinkedList.h
new file mode 100644
index 00000000..87e7d5b5
--- /dev/null
+++ b/folly/AtomicIntrusiveLinkedList.h
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <atomic>
+
+#include <glog/logging.h>
+
+namespace folly {
+
+/**
+ * A very simple atomic single-linked list primitive.
+ *
+ * Usage:
+ *
+ * class MyClass {
+ *   AtomicIntrusiveLinkedListHook<MyClass> hook_;
+ * }
+ *
+ * AtomicIntrusiveLinkedList<MyClass, &MyClass::hook_> list;
+ * list.insert(&a);
+ * list.sweep([] (MyClass* c) { doSomething(c); });
+ */
+template <class T>
+struct AtomicIntrusiveLinkedListHook {
+  T* next{nullptr};
+};
+
+template <class T, AtomicIntrusiveLinkedListHook<T> T::*HookMember>
+class AtomicIntrusiveLinkedList {
+ public:
+  AtomicIntrusiveLinkedList() {}
+  AtomicIntrusiveLinkedList(const AtomicIntrusiveLinkedList&) = delete;
+  AtomicIntrusiveLinkedList& operator=(const AtomicIntrusiveLinkedList&) =
+      delete;
+  AtomicIntrusiveLinkedList(AtomicIntrusiveLinkedList&& other) noexcept {
+    *this = std::move(other);
+  }
+  AtomicIntrusiveLinkedList& operator=(
+      AtomicIntrusiveLinkedList&& other) noexcept {
+    auto tmp = other.head_.load();
+    other.head_ = head_.load();
+    head_ = tmp;
+
+    return *this;
+  }
+
+  /**
+   * Note: list must be empty on destruction.
+   */
+  ~AtomicIntrusiveLinkedList() { DCHECK(empty()); }
+
+  bool empty() const { return head_ == nullptr; }
+
+  /**
+   * Atomically insert t at the head of the list.
+   * @return True if the inserted element is the only one in the list
+   *         after the call.
+   */
+  bool insertHead(T* t) {
+    DCHECK(next(t) == nullptr);
+
+    auto oldHead = head_.load(std::memory_order_relaxed);
+    do {
+      next(t) = oldHead;
+      /* oldHead is updated by the call below.
+
+         NOTE: we don't use next(t) instead of oldHead directly due to
+         compiler bugs (GCC prior to 4.8.3 (bug 60272), clang (bug 18899),
+         MSVC (bug 819819); source:
+         http://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange */
+    } while (!head_.compare_exchange_weak(
+        oldHead, t, std::memory_order_release, std::memory_order_relaxed));
+
+    return oldHead == nullptr;
+  }
+
+  /**
+   * Repeatedly replaces the head with nullptr,
+   * and calls func() on the removed elements in the order from tail to head.
+   * Stops when the list is empty.
+   */
+  template <typename F>
+  void sweep(F&& func) {
+    while (auto head = head_.exchange(nullptr)) {
+      auto rhead = reverse(head);
+      while (rhead != nullptr) {
+        auto t = rhead;
+        rhead = next(t);
+        next(t) = nullptr;
+        func(t);
+      }
+    }
+  }
+
+ private:
+  std::atomic<T*> head_{nullptr};
+
+  static T*& next(T* t) { return (t->*HookMember).next; }
+
+  /* Reverses a linked list, returning the pointer to the new head
+     (old tail) */
+  static T* reverse(T* head) {
+    T* rhead = nullptr;
+    while (head != nullptr) {
+      auto t = head;
+      head = next(t);
+      next(t) = rhead;
+      rhead = t;
+    }
+    return rhead;
+  }
+};
+
+} // namespace folly
diff --git a/folly/AtomicLinkedList.h b/folly/AtomicLinkedList.h
index f24746a5..ee826a7b 100644
--- a/folly/AtomicLinkedList.h
+++ b/folly/AtomicLinkedList.h
@@ -13,13 +13,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-#ifndef FOLLY_ATOMIC_LINKED_LIST_H_
-#define FOLLY_ATOMIC_LINKED_LIST_H_
+#pragma once
 
 #include <atomic>
 #include <cassert>
 
+#include <folly/AtomicIntrusiveLinkedList.h>
+#include <folly/Memory.h>
+
 namespace folly {
 
 /**
@@ -27,112 +28,59 @@ namespace folly {
  *
  * Usage:
  *
- * class MyClass {
- *   AtomicLinkedListHook<MyClass> hook_;
- * }
- *
- * AtomicLinkedList<MyClass, &MyClass::hook_> list;
- * list.insert(&a);
- * list.sweep([] (MyClass* c) { doSomething(c); }
+ * AtomicLinkedList<MyClass> list;
+ * list.insert(a);
+ * list.sweep([] (MyClass& c) { doSomething(c); });
  */
-template <class T>
-struct AtomicLinkedListHook {
-  T* next{nullptr};
-};
-
-template <class T, AtomicLinkedListHook<T> T::* HookMember>
+template <class T>
 class AtomicLinkedList {
  public:
   AtomicLinkedList() {}
   AtomicLinkedList(const AtomicLinkedList&) = delete;
   AtomicLinkedList& operator=(const AtomicLinkedList&) = delete;
-  AtomicLinkedList(AtomicLinkedList&& other) noexcept {
-    auto tmp = other.head_.load();
-    other.head_ = head_.load();
-    head_ = tmp;
-  }
-  AtomicLinkedList& operator=(AtomicLinkedList&& other) noexcept {
-    auto tmp = other.head_.load();
-    other.head_ = head_.load();
-    head_ = tmp;
+  AtomicLinkedList(AtomicLinkedList&& other) noexcept = default;
+  AtomicLinkedList& operator=(AtomicLinkedList&& other) = default;
 
-    return *this;
-  }
-
-  /**
-   * Note: list must be empty on destruction.
-   */
   ~AtomicLinkedList() {
-    assert(empty());
+    sweep([](T&&) {});
   }
 
-  bool empty() const {
-    return head_ == nullptr;
-  }
+  bool empty() const { return list_.empty(); }
 
   /**
    * Atomically insert t at the head of the list.
    * @return True if the inserted element is the only one in the list
    *         after the call.
    */
-  bool insertHead(T* t) {
-    assert(next(t) == nullptr);
-
-    auto oldHead = head_.load(std::memory_order_relaxed);
-    do {
-      next(t) = oldHead;
-      /* oldHead is updated by the call below.
+  bool insertHead(T t) {
+    auto wrapper = folly::make_unique<Wrapper>(std::move(t));
 
-         NOTE: we don't use next(t) instead of oldHead directly due to
-         compiler bugs (GCC prior to 4.8.3 (bug 60272), clang (bug 18899),
-         MSVC (bug 819819); source:
-         http://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange */
-    } while (!head_.compare_exchange_weak(oldHead, t,
-                                          std::memory_order_release,
-                                          std::memory_order_relaxed));
-
-    return oldHead == nullptr;
+    return list_.insertHead(wrapper.release());
   }
 
   /**
-   * Repeatedly replaces the head with nullptr,
+   * Repeatedly pops element from head,
    * and calls func() on the removed elements in the order from tail to head.
    * Stops when the list is empty.
    */
   template <typename F>
   void sweep(F&& func) {
-    while (auto head = head_.exchange(nullptr)) {
-      auto rhead = reverse(head);
-      while (rhead != nullptr) {
-        auto t = rhead;
-        rhead = next(t);
-        next(t) = nullptr;
-        func(t);
-      }
-    }
+    list_.sweep([&](Wrapper* wrapperPtr) mutable {
+      std::unique_ptr<Wrapper> wrapper(wrapperPtr);
+
+      func(std::move(wrapper->data));
+    });
   }
 
  private:
-  std::atomic<T*> head_{nullptr};
+  struct Wrapper {
+    explicit Wrapper(T&& t) : data(std::move(t)) {}
 
-  static T*& next(T* t) {
-    return (t->*HookMember).next;
-  }
-
-  /* Reverses a linked list, returning the pointer to the new head
-     (old tail) */
-  static T* reverse(T* head) {
-    T* rhead = nullptr;
-    while (head != nullptr) {
-      auto t = head;
-      head = next(t);
-      next(t) = rhead;
-      rhead = t;
-    }
-    return rhead;
-  }
+    AtomicIntrusiveLinkedListHook<Wrapper> hook;
+    T data;
+  };
+
+  AtomicIntrusiveLinkedList<Wrapper, &Wrapper::hook> list_;
 };
 
 } // namespace folly
-
-#endif
diff --git a/folly/Makefile.am b/folly/Makefile.am
index a9704ea6..e5427199 100644
--- a/folly/Makefile.am
+++ b/folly/Makefile.am
@@ -27,6 +27,7 @@ nobase_follyinclude_HEADERS = \
 	AtomicHashArray-inl.h \
 	AtomicHashMap.h \
 	AtomicHashMap-inl.h \
+	AtomicIntrusiveLinkedList.h \
 	AtomicLinkedList.h \
 	AtomicStruct.h \
 	AtomicUnorderedMap.h \
diff --git a/folly/experimental/fibers/Fiber.h b/folly/experimental/fibers/Fiber.h
index 0732dd71..3c31e957 100644
--- a/folly/experimental/fibers/Fiber.h
+++ b/folly/experimental/fibers/Fiber.h
@@ -20,7 +20,7 @@
 #include
 #include
 
-#include <folly/AtomicLinkedList.h>
+#include <folly/AtomicIntrusiveLinkedList.h>
 #include
 #include
 #include
@@ -126,7 +126,7 @@ class Fiber {
   /**
    * Points to next fiber in remote ready list
    */
-  folly::AtomicLinkedListHook<Fiber> nextRemoteReady_;
+  folly::AtomicIntrusiveLinkedListHook<Fiber> nextRemoteReady_;
 
   static constexpr size_t kUserBufferSize = 256;
   std::aligned_storage<kUserBufferSize>::type userBuffer_;
diff --git a/folly/experimental/fibers/FiberManager.h b/folly/experimental/fibers/FiberManager.h
index 549d3f32..62c41a1e 100644
--- a/folly/experimental/fibers/FiberManager.h
+++ b/folly/experimental/fibers/FiberManager.h
@@ -23,7 +23,7 @@
 #include
 #include
 
-#include <folly/AtomicLinkedList.h>
+#include <folly/AtomicIntrusiveLinkedList.h>
 #include
 #include
 #include
@@ -307,7 +307,7 @@ class FiberManager : public ::folly::Executor {
     std::function<void()> func;
     std::unique_ptr<Fiber::LocalData> localData;
     std::shared_ptr<RequestContext> rcontext;
-    AtomicLinkedListHook<RemoteTask> nextRemoteTask;
+    AtomicIntrusiveLinkedListHook<RemoteTask> nextRemoteTask;
   };
 
   typedef folly::IntrusiveList<Fiber, &Fiber::listHook_> FiberTailQueue;
@@ -414,9 +414,10 @@
   ExceptionCallback exceptionCallback_; /**< task exception callback */
 
-  folly::AtomicLinkedList<Fiber, &Fiber::nextRemoteReady_> remoteReadyQueue_;
+  folly::AtomicIntrusiveLinkedList<Fiber, &Fiber::nextRemoteReady_>
+      remoteReadyQueue_;
 
-  folly::AtomicLinkedList<RemoteTask, &RemoteTask::nextRemoteTask>
+  folly::AtomicIntrusiveLinkedList<RemoteTask, &RemoteTask::nextRemoteTask>
       remoteTaskQueue_;
 
   std::shared_ptr<TimeoutController> timeoutManager_;
diff --git a/folly/experimental/fibers/FiberManagerMap.cpp b/folly/experimental/fibers/FiberManagerMap.cpp
index 9e63bd51..df1cce36 100644
--- a/folly/experimental/fibers/FiberManagerMap.cpp
+++ b/folly/experimental/fibers/FiberManagerMap.cpp
@@ -15,96 +15,140 @@
  */
 #include "FiberManagerMap.h"
 
-#include <cassert>
 #include <memory>
 #include <unordered_map>
 
+#include <folly/AtomicLinkedList.h>
 #include <folly/ThreadLocal.h>
 
 namespace folly { namespace fibers {
 
 namespace {
 
-// Leak these intentionally. During shutdown, we may call getFiberManager, and
-// want access to the fiber managers during that time.
-class LocalFiberManagerMapTag;
-typedef folly::ThreadLocal<
-    std::unordered_map<folly::EventBase*, FiberManager*>,
-    LocalFiberManagerMapTag>
-    LocalMapType;
-LocalMapType* localFiberManagerMap() {
-  static auto ret = new LocalMapType();
-  return ret;
-}
+class OnEventBaseDestructionCallback : public EventBase::LoopCallback {
+ public:
+  explicit OnEventBaseDestructionCallback(EventBase& evb) : evb_(evb) {}
+  void runLoopCallback() noexcept override;
 
-typedef
-    std::unordered_map<folly::EventBase*, std::unique_ptr<FiberManager>>
-    MapType;
-MapType* fiberManagerMap() {
-  static auto ret = new MapType();
-  return ret;
-}
+ private:
+  EventBase& evb_;
+};
 
-std::mutex* fiberManagerMapMutex() {
-  static auto ret = new std::mutex();
-  return ret;
-}
+class GlobalCache {
+ public:
+  static FiberManager& get(EventBase& evb, const FiberManager::Options& opts) {
+    return instance().getImpl(evb, opts);
+  }
+
+  static std::unique_ptr<FiberManager> erase(EventBase& evb) {
+    return instance().eraseImpl(evb);
+  }
 
+ private:
+  GlobalCache() {}
 
-class OnEventBaseDestructionCallback : public folly::EventBase::LoopCallback {
- public:
-  explicit OnEventBaseDestructionCallback(folly::EventBase& evb)
-      : evb_(&evb) {}
-  void runLoopCallback() noexcept override {
-    for (auto& localMap : localFiberManagerMap()->accessAllThreads()) {
-      localMap.erase(evb_);
+  // Leak this intentionally. During shutdown, we may call getFiberManager,
+  // and want access to the fiber managers during that time.
+  static GlobalCache& instance() {
+    static auto ret = new GlobalCache();
+    return *ret;
+  }
+
+  FiberManager& getImpl(EventBase& evb, const FiberManager::Options& opts) {
+    std::lock_guard<std::mutex> lg(mutex_);
+
+    auto& fmPtrRef = map_[&evb];
+
+    if (!fmPtrRef) {
+      auto loopController = make_unique<EventBaseLoopController>();
+      loopController->attachEventBase(evb);
+      evb.runOnDestruction(new OnEventBaseDestructionCallback(evb));
+
+      fmPtrRef = make_unique<FiberManager>(std::move(loopController), opts);
     }
-    std::unique_ptr<FiberManager> fm;
-    {
-      std::lock_guard<std::mutex> lg(*fiberManagerMapMutex());
-      auto it = fiberManagerMap()->find(evb_);
-      assert(it != fiberManagerMap()->end());
-      fm = std::move(it->second);
-      fiberManagerMap()->erase(it);
+
+    return *fmPtrRef;
+  }
+
+  std::unique_ptr<FiberManager> eraseImpl(EventBase& evb) {
+    std::lock_guard<std::mutex> lg(mutex_);
+
+    DCHECK(map_.find(&evb) != map_.end());
+
+    auto ret = std::move(map_[&evb]);
+    map_.erase(&evb);
+    return ret;
+  }
+
+  std::mutex mutex_;
+  std::unordered_map<EventBase*, std::unique_ptr<FiberManager>> map_;
+};
+
+class LocalCache {
+ public:
+  static FiberManager& get(EventBase& evb, const FiberManager::Options& opts) {
+    return instance()->getImpl(evb, opts);
+  }
+
+  static void erase(EventBase& evb) {
+    for (auto& localInstance : instance().accessAllThreads()) {
+      localInstance.removedEvbs_.insertHead(&evb);
     }
-    assert(fm.get() != nullptr);
-    fm->loopUntilNoReady();
-    delete this;
   }
+
  private:
-  folly::EventBase* evb_;
-};
+  LocalCache() {}
 
-FiberManager* getFiberManagerThreadSafe(folly::EventBase& evb,
-                                        const FiberManager::Options& opts) {
-  std::lock_guard<std::mutex> lg(*fiberManagerMapMutex());
+  struct LocalCacheTag {};
+  using ThreadLocalCache = ThreadLocal<LocalCache, LocalCacheTag>;
 
-  auto it = fiberManagerMap()->find(&evb);
-  if (LIKELY(it != fiberManagerMap()->end())) {
-    return it->second.get();
+  // Leak this intentionally. During shutdown, we may call getFiberManager,
+  // and want access to the fiber managers during that time.
+  static ThreadLocalCache& instance() {
+    static auto ret = new ThreadLocalCache([]() { return new LocalCache(); });
+    return *ret;
   }
 
-  auto loopController = folly::make_unique<EventBaseLoopController>();
-  loopController->attachEventBase(evb);
-  auto fiberManager =
-      folly::make_unique<FiberManager>(std::move(loopController), opts);
-  auto result = fiberManagerMap()->emplace(&evb, std::move(fiberManager));
-  evb.runOnDestruction(new OnEventBaseDestructionCallback(evb));
-  return result.first->second.get();
+  FiberManager& getImpl(EventBase& evb, const FiberManager::Options& opts) {
+    eraseImpl();
+
+    auto& fmPtrRef = map_[&evb];
+    if (!fmPtrRef) {
+      fmPtrRef = &GlobalCache::get(evb, opts);
+    }
+
+    DCHECK(fmPtrRef != nullptr);
+
+    return *fmPtrRef;
+  }
+
+  void eraseImpl() {
+    if (removedEvbs_.empty()) {
+      return;
     }
+
+    removedEvbs_.sweep([&](EventBase* evb) { map_.erase(evb); });
+  }
+
+  std::unordered_map<EventBase*, FiberManager*> map_;
+  AtomicLinkedList<EventBase*> removedEvbs_;
+};
+
+void OnEventBaseDestructionCallback::runLoopCallback() noexcept {
+  auto fm = GlobalCache::erase(evb_);
+  DCHECK(fm.get() != nullptr);
+  LocalCache::erase(evb_);
+
+  fm->loopUntilNoReady();
+
+  delete this;
 }
 
 } // namespace
 
-FiberManager& getFiberManager(folly::EventBase& evb,
+FiberManager& getFiberManager(EventBase& evb,
                               const FiberManager::Options& opts) {
-  auto it = (*localFiberManagerMap())->find(&evb);
-  if (LIKELY(it != (*localFiberManagerMap())->end())) {
-    return *(it->second);
-  }
-
-  auto fm = getFiberManagerThreadSafe(evb, opts);
-  (*localFiberManagerMap())->emplace(&evb, fm);
-  return *fm;
+  return LocalCache::get(evb, opts);
 }
 
 }}
diff --git a/folly/test/AtomicLinkedListTest.cpp b/folly/test/AtomicLinkedListTest.cpp
new file mode 100644
index 00000000..b9435f75
--- /dev/null
+++ b/folly/test/AtomicLinkedListTest.cpp
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <thread>
+
+#include <gtest/gtest.h>
+
+#include <folly/AtomicLinkedList.h>
+
+class TestIntrusiveObject {
+ public:
+  explicit TestIntrusiveObject(size_t id__) : id_(id__) {}
+  size_t id() { return id_; }
+
+ private:
+  folly::AtomicIntrusiveLinkedListHook<TestIntrusiveObject> hook_;
+  size_t id_;
+
+ public:
+  using List = folly::AtomicIntrusiveLinkedList<TestIntrusiveObject,
+                                                &TestIntrusiveObject::hook_>;
+};
+
+TEST(AtomicIntrusiveLinkedList, Basic) {
+  TestIntrusiveObject a(1), b(2), c(3);
+
+  TestIntrusiveObject::List list;
+
+  EXPECT_TRUE(list.empty());
+
+  {
+    EXPECT_TRUE(list.insertHead(&a));
+    EXPECT_FALSE(list.insertHead(&b));
+
+    EXPECT_FALSE(list.empty());
+
+    size_t id = 0;
+    list.sweep(
+        [&](TestIntrusiveObject* obj) mutable { EXPECT_EQ(++id, obj->id()); });
+
+    EXPECT_TRUE(list.empty());
+  }
+
+  // Try re-inserting the same item (b) and a new item (c)
+  {
+    EXPECT_TRUE(list.insertHead(&b));
+    EXPECT_FALSE(list.insertHead(&c));
+
+    EXPECT_FALSE(list.empty());
+
+    size_t id = 1;
+    list.sweep(
+        [&](TestIntrusiveObject* obj) mutable { EXPECT_EQ(++id, obj->id()); });
+
+    EXPECT_TRUE(list.empty());
+  }
+
+  TestIntrusiveObject::List movedList = std::move(list);
+}
+
+TEST(AtomicIntrusiveLinkedList, Move) {
+  TestIntrusiveObject a(1), b(2);
+
+  TestIntrusiveObject::List list1;
+
+  EXPECT_TRUE(list1.insertHead(&a));
+  EXPECT_FALSE(list1.insertHead(&b));
+
+  EXPECT_FALSE(list1.empty());
+
+  TestIntrusiveObject::List list2(std::move(list1));
+
+  EXPECT_TRUE(list1.empty());
+  EXPECT_FALSE(list2.empty());
+
+  TestIntrusiveObject::List list3;
+
+  EXPECT_TRUE(list3.empty());
+
+  list3 = std::move(list2);
+
+  EXPECT_TRUE(list2.empty());
+  EXPECT_FALSE(list3.empty());
+
+  size_t id = 0;
+  list3.sweep(
+      [&](TestIntrusiveObject* obj) mutable { EXPECT_EQ(++id, obj->id()); });
+}
+
+TEST(AtomicIntrusiveLinkedList, Stress) {
+  constexpr size_t kNumThreads = 32;
+  constexpr size_t kNumElements = 100000;
+
+  std::vector<TestIntrusiveObject> elements;
+  for (size_t i = 0; i < kNumThreads * kNumElements; ++i) {
+    elements.emplace_back(i);
+  }
+
+  TestIntrusiveObject::List list;
+
+  std::vector<std::thread> threads;
+  for (size_t threadId = 0; threadId < kNumThreads; ++threadId) {
+    threads.emplace_back(
+        [threadId, kNumThreads, kNumElements, &list, &elements]() {
+          for (size_t id = 0; id < kNumElements; ++id) {
+            list.insertHead(&elements[threadId + kNumThreads * id]);
+          }
+        });
+  }
+
+  std::vector<size_t> ids;
+  TestIntrusiveObject* prev{nullptr};
+
+  while (ids.size() < kNumThreads * kNumElements) {
+    list.sweep([&](TestIntrusiveObject* current) {
+      ids.push_back(current->id());
+
+      if (prev && prev->id() % kNumThreads == current->id() % kNumThreads) {
+        EXPECT_EQ(prev->id() + kNumThreads, current->id());
+      }
+
+      prev = current;
+    });
+  }
+
+  std::sort(ids.begin(), ids.end());
+
+  for (size_t i = 0; i < kNumThreads * kNumElements; ++i) {
+    EXPECT_EQ(i, ids[i]);
+  }
+
+  for (auto& thread : threads) {
+    thread.join();
+  }
+}
+
+class TestObject {
+ public:
+  TestObject(size_t id__, std::shared_ptr<void> ptr) : id_(id__), ptr_(ptr) {}
+
+  size_t id() { return id_; }
+
+ private:
+  size_t id_;
+  std::shared_ptr<void> ptr_;
+};
+
+TEST(AtomicLinkedList, Basic) {
+  constexpr size_t kNumElements = 10;
+
+  using List = folly::AtomicLinkedList<TestObject>;
+  List list;
+
+  std::shared_ptr<void> ptr = std::make_shared<int>(42);
+
+  for (size_t id = 0; id < kNumElements; ++id) {
+    list.insertHead({id, ptr});
+  }
+
+  size_t counter = 0;
+
+  list.sweep([&](TestObject object) {
+    EXPECT_EQ(counter, object.id());
+
+    EXPECT_EQ(1 + kNumElements - counter, ptr.use_count());
+
+    ++counter;
+  });
+
+  EXPECT_TRUE(ptr.unique());
+}
-- 
2.34.1
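For reference, a typical call site of the API this patch makes safe (a
sketch; it assumes the Options parameter of getFiberManager has a default
argument, as declared in FiberManagerMap.h):

    #include <folly/experimental/fibers/FiberManagerMap.h>
    #include <folly/io/async/EventBase.h>

    int main() {
      folly::EventBase evb;
      // Returns the same FiberManager for the same EventBase, from any thread.
      auto& fm = folly::fibers::getFiberManager(evb);
      fm.addTask([] { /* runs on a fiber driven by evb's loop */ });
      evb.loop();
      // When evb is destroyed, OnEventBaseDestructionCallback drains the
      // manager and enqueues evb on every thread's removal list.
      return 0;
    }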