*/
#include "FiberManagerMap.h"
-#include <cassert>
#include <memory>
#include <unordered_map>
+#include <folly/AtomicLinkedList.h>
#include <folly/ThreadLocal.h>
+#include <folly/Synchronized.h>
namespace folly { namespace fibers {
namespace {
-// Leak these intentionally. During shutdown, we may call getFiberManager, and
-// want access to the fiber managers during that time.
-class LocalFiberManagerMapTag;
-typedef folly::ThreadLocal<
- std::unordered_map<folly::EventBase*, FiberManager*>,
- LocalFiberManagerMapTag>
- LocalMapType;
-LocalMapType* localFiberManagerMap() {
- static auto ret = new LocalMapType();
- return ret;
-}
+class EventBaseOnDestructionCallback : public EventBase::LoopCallback {
+ public:
+ explicit EventBaseOnDestructionCallback(EventBase& evb) : evb_(evb) {}
+ void runLoopCallback() noexcept override;
-typedef
- std::unordered_map<folly::EventBase*, std::unique_ptr<FiberManager>>
- MapType;
-MapType* fiberManagerMap() {
- static auto ret = new MapType();
- return ret;
-}
+ private:
+ EventBase& evb_;
+};
-std::mutex* fiberManagerMapMutex() {
- static auto ret = new std::mutex();
- return ret;
-}
+class GlobalCache {
+ public:
+ static FiberManager& get(EventBase& evb, const FiberManager::Options& opts) {
+ return instance().getImpl(evb, opts);
+ }
+ static std::unique_ptr<FiberManager> erase(EventBase& evb) {
+ return instance().eraseImpl(evb);
+ }
-class OnEventBaseDestructionCallback : public folly::EventBase::LoopCallback {
- public:
- explicit OnEventBaseDestructionCallback(folly::EventBase& evb)
- : evb_(&evb) {}
- void runLoopCallback() noexcept override {
- for (auto& localMap : localFiberManagerMap()->accessAllThreads()) {
- localMap.erase(evb_);
+ private:
+ GlobalCache() {}
+
+ // Leak this intentionally. During shutdown, we may call getFiberManager,
+ // and want access to the fiber managers during that time.
+ static GlobalCache& instance() {
+ static auto ret = new GlobalCache();
+ return *ret;
+ }
+
+ FiberManager& getImpl(EventBase& evb, const FiberManager::Options& opts) {
+ std::lock_guard<std::mutex> lg(mutex_);
+
+ auto& fmPtrRef = map_[&evb];
+
+ if (!fmPtrRef) {
+ auto loopController = make_unique<EventBaseLoopController>();
+ loopController->attachEventBase(evb);
+ evb.runOnDestruction(new EventBaseOnDestructionCallback(evb));
+
+ fmPtrRef = make_unique<FiberManager>(std::move(loopController), opts);
}
- std::unique_ptr<FiberManager> fm;
- {
- std::lock_guard<std::mutex> lg(*fiberManagerMapMutex());
- auto it = fiberManagerMap()->find(evb_);
- assert(it != fiberManagerMap()->end());
- fm = std::move(it->second);
- fiberManagerMap()->erase(it);
+
+ return *fmPtrRef;
+ }
+
+ std::unique_ptr<FiberManager> eraseImpl(EventBase& evb) {
+ std::lock_guard<std::mutex> lg(mutex_);
+
+ DCHECK_EQ(1, map_.count(&evb));
+
+ auto ret = std::move(map_[&evb]);
+ map_.erase(&evb);
+ return ret;
+ }
+
+ std::mutex mutex_;
+ std::unordered_map<EventBase*, std::unique_ptr<FiberManager>> map_;
+};
+
+constexpr size_t kEraseListMaxSize = 64;
+
+class ThreadLocalCache {
+ public:
+ static FiberManager& get(EventBase& evb, const FiberManager::Options& opts) {
+ return instance()->getImpl(evb, opts);
+ }
+
+ static void erase(EventBase& evb) {
+ for (auto& localInstance : instance().accessAllThreads()) {
+ SYNCHRONIZED(info, localInstance.eraseInfo_) {
+ if (info.eraseList.size() >= kEraseListMaxSize) {
+ info.eraseAll = true;
+ } else {
+ info.eraseList.push_back(&evb);
+ }
+ localInstance.eraseRequested_ = true;
+ }
}
- assert(fm.get() != nullptr);
- fm->loopUntilNoReady();
- delete this;
}
+
private:
- folly::EventBase* evb_;
-};
+ ThreadLocalCache() {}
-FiberManager* getFiberManagerThreadSafe(folly::EventBase& evb,
- const FiberManager::Options& opts) {
- std::lock_guard<std::mutex> lg(*fiberManagerMapMutex());
+ struct ThreadLocalCacheTag {};
+ using ThreadThreadLocalCache = ThreadLocal<ThreadLocalCache, ThreadLocalCacheTag>;
- auto it = fiberManagerMap()->find(&evb);
- if (LIKELY(it != fiberManagerMap()->end())) {
- return it->second.get();
+ // Leak this intentionally. During shutdown, we may call getFiberManager,
+ // and want access to the fiber managers during that time.
+ static ThreadThreadLocalCache& instance() {
+ static auto ret = new ThreadThreadLocalCache([]() { return new ThreadLocalCache(); });
+ return *ret;
}
- auto loopController = folly::make_unique<EventBaseLoopController>();
- loopController->attachEventBase(evb);
- auto fiberManager =
- folly::make_unique<FiberManager>(std::move(loopController), opts);
- auto result = fiberManagerMap()->emplace(&evb, std::move(fiberManager));
- evb.runOnDestruction(new OnEventBaseDestructionCallback(evb));
- return result.first->second.get();
+ FiberManager& getImpl(EventBase& evb, const FiberManager::Options& opts) {
+ eraseImpl();
+
+ auto& fmPtrRef = map_[&evb];
+ if (!fmPtrRef) {
+ fmPtrRef = &GlobalCache::get(evb, opts);
+ }
+
+ DCHECK(fmPtrRef != nullptr);
+
+ return *fmPtrRef;
+ }
+
+ void eraseImpl() {
+ if (!eraseRequested_.load()) {
+ return;
+ }
+
+ SYNCHRONIZED(info, eraseInfo_) {
+ if (info.eraseAll) {
+ map_.clear();
+ } else {
+ for (auto evbPtr : info.eraseList) {
+ map_.erase(evbPtr);
+ }
+ }
+
+ info.eraseList.clear();
+ info.eraseAll = false;
+ eraseRequested_ = false;
+ }
+ }
+
+ std::unordered_map<EventBase*, FiberManager*> map_;
+ std::atomic<bool> eraseRequested_{false};
+
+ struct EraseInfo {
+ bool eraseAll{false};
+ std::vector<EventBase*> eraseList;
+ };
+
+ folly::Synchronized<EraseInfo> eraseInfo_;
+};
+
+void EventBaseOnDestructionCallback::runLoopCallback() noexcept {
+ auto fm = GlobalCache::erase(evb_);
+ DCHECK(fm.get() != nullptr);
+ ThreadLocalCache::erase(evb_);
+
+ fm->loopUntilNoReady();
+
+ delete this;
}
} // namespace
-FiberManager& getFiberManager(folly::EventBase& evb,
+FiberManager& getFiberManager(EventBase& evb,
const FiberManager::Options& opts) {
- auto it = (*localFiberManagerMap())->find(&evb);
- if (LIKELY(it != (*localFiberManagerMap())->end())) {
- return *(it->second);
- }
-
- auto fm = getFiberManagerThreadSafe(evb, opts);
- (*localFiberManagerMap())->emplace(&evb, fm);
- return *fm;
+ return ThreadLocalCache::get(evb, opts);
}
}}