Non-intrusive AtomicLinkedList
[folly.git] / folly / experimental / fibers / FiberManagerMap.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "FiberManagerMap.h"
17
18 #include <memory>
19 #include <unordered_map>
20
21 #include <folly/ThreadLocal.h>
22 #include <folly/Synchronized.h>
23
24 namespace folly { namespace fibers {
25
26 namespace {
27
28 class EventBaseOnDestructionCallback : public EventBase::LoopCallback {
29  public:
30   explicit EventBaseOnDestructionCallback(EventBase& evb) : evb_(evb) {}
31   void runLoopCallback() noexcept override;
32
33  private:
34   EventBase& evb_;
35 };
36
37 class GlobalCache {
38  public:
39   static FiberManager& get(EventBase& evb, const FiberManager::Options& opts) {
40     return instance().getImpl(evb, opts);
41   }
42
43   static std::unique_ptr<FiberManager> erase(EventBase& evb) {
44     return instance().eraseImpl(evb);
45   }
46
47  private:
48   GlobalCache() {}
49
50   // Leak this intentionally. During shutdown, we may call getFiberManager,
51   // and want access to the fiber managers during that time.
52   static GlobalCache& instance() {
53     static auto ret = new GlobalCache();
54     return *ret;
55   }
56
57   FiberManager& getImpl(EventBase& evb, const FiberManager::Options& opts) {
58     std::lock_guard<std::mutex> lg(mutex_);
59
60     auto& fmPtrRef = map_[&evb];
61
62     if (!fmPtrRef) {
63       auto loopController = make_unique<EventBaseLoopController>();
64       loopController->attachEventBase(evb);
65       evb.runOnDestruction(new EventBaseOnDestructionCallback(evb));
66
67       fmPtrRef = make_unique<FiberManager>(std::move(loopController), opts);
68     }
69
70     return *fmPtrRef;
71   }
72
73   std::unique_ptr<FiberManager> eraseImpl(EventBase& evb) {
74     std::lock_guard<std::mutex> lg(mutex_);
75
76     DCHECK_EQ(1, map_.count(&evb));
77
78     auto ret = std::move(map_[&evb]);
79     map_.erase(&evb);
80     return ret;
81   }
82
83   std::mutex mutex_;
84   std::unordered_map<EventBase*, std::unique_ptr<FiberManager>> map_;
85 };
86
87 constexpr size_t kEraseListMaxSize = 64;
88
89 class ThreadLocalCache {
90  public:
91   static FiberManager& get(EventBase& evb, const FiberManager::Options& opts) {
92     return instance()->getImpl(evb, opts);
93   }
94
95   static void erase(EventBase& evb) {
96     for (auto& localInstance : instance().accessAllThreads()) {
97       SYNCHRONIZED(info, localInstance.eraseInfo_) {
98         if (info.eraseList.size() >= kEraseListMaxSize) {
99           info.eraseAll = true;
100         } else {
101           info.eraseList.push_back(&evb);
102         }
103         localInstance.eraseRequested_ = true;
104       }
105     }
106   }
107
108  private:
109   ThreadLocalCache() {}
110
111   struct ThreadLocalCacheTag {};
112   using ThreadThreadLocalCache = ThreadLocal<ThreadLocalCache, ThreadLocalCacheTag>;
113
114   // Leak this intentionally. During shutdown, we may call getFiberManager,
115   // and want access to the fiber managers during that time.
116   static ThreadThreadLocalCache& instance() {
117     static auto ret = new ThreadThreadLocalCache([]() { return new ThreadLocalCache(); });
118     return *ret;
119   }
120
121   FiberManager& getImpl(EventBase& evb, const FiberManager::Options& opts) {
122     eraseImpl();
123
124     auto& fmPtrRef = map_[&evb];
125     if (!fmPtrRef) {
126       fmPtrRef = &GlobalCache::get(evb, opts);
127     }
128
129     DCHECK(fmPtrRef != nullptr);
130
131     return *fmPtrRef;
132   }
133
134   void eraseImpl() {
135     if (!eraseRequested_.load()) {
136       return;
137     }
138
139     SYNCHRONIZED(info, eraseInfo_) {
140       if (info.eraseAll) {
141         map_.clear();
142       } else {
143         for (auto evbPtr : info.eraseList) {
144           map_.erase(evbPtr);
145         }
146       }
147
148       info.eraseList.clear();
149       info.eraseAll = false;
150       eraseRequested_ = false;
151     }
152   }
153
154   std::unordered_map<EventBase*, FiberManager*> map_;
155   std::atomic<bool> eraseRequested_{false};
156
157   struct EraseInfo {
158     bool eraseAll{false};
159     std::vector<EventBase*> eraseList;
160   };
161
162   folly::Synchronized<EraseInfo> eraseInfo_;
163 };
164
165 void EventBaseOnDestructionCallback::runLoopCallback() noexcept {
166   auto fm = GlobalCache::erase(evb_);
167   DCHECK(fm.get() != nullptr);
168   ThreadLocalCache::erase(evb_);
169
170   while (fm->hasTasks()) {
171     fm->loopUntilNoReady();
172     evb_.loopOnce();
173   }
174
175   delete this;
176 }
177
178 } // namespace
179
180 FiberManager& getFiberManager(EventBase& evb,
181                               const FiberManager::Options& opts) {
182   return ThreadLocalCache::get(evb, opts);
183 }
184
185 }}