Make global executors shutdown-safe
[folly.git] / folly / executors / GlobalExecutor.cpp
index 0341e7123c3389cf3e4dcf17f32802d51fc5508c..6fd661945f602dadc62b37ca1caa077acca4d2a1 100644 (file)
  * limitations under the License.
  */
 
+#include <memory>
+#include <thread>
+
+#include <folly/Function.h>
+#include <folly/SharedMutex.h>
 #include <folly/Singleton.h>
 #include <folly/executors/IOExecutor.h>
 #include <folly/executors/IOThreadPoolExecutor.h>
@@ -23,90 +28,93 @@ using namespace folly;
 
 namespace {
 
-// lock protecting global CPU executor
-struct CPUExecutorLock {};
-Singleton<RWSpinLock, CPUExecutorLock> globalCPUExecutorLock;
-// global CPU executor
-Singleton<std::weak_ptr<Executor>> globalCPUExecutor;
-// default global CPU executor is an InlineExecutor
-Singleton<std::shared_ptr<InlineExecutor>> globalInlineExecutor([] {
-  return new std::shared_ptr<InlineExecutor>(
-      std::make_shared<InlineExecutor>());
-});
-
-// lock protecting global IO executor
-struct IOExecutorLock {};
-Singleton<RWSpinLock, IOExecutorLock> globalIOExecutorLock;
-// global IO executor
-Singleton<std::weak_ptr<IOExecutor>> globalIOExecutor;
-// default global IO executor is an IOThreadPoolExecutor
-Singleton<std::shared_ptr<IOThreadPoolExecutor>> globalIOThreadPool([] {
-  return new std::shared_ptr<IOThreadPoolExecutor>(
-      std::make_shared<IOThreadPoolExecutor>(
-          sysconf(_SC_NPROCESSORS_ONLN),
-          std::make_shared<NamedThreadFactory>("GlobalIOThreadPool")));
-});
-} // namespace
-
-namespace folly {
+template <class ExecutorBase>
+class GlobalExecutor {
+ public:
+  explicit GlobalExecutor(
+      Function<std::unique_ptr<ExecutorBase>()> constructDefault)
+      : constructDefault_(std::move(constructDefault)) {}
+
+  std::shared_ptr<ExecutorBase> get() {
+    {
+      SharedMutex::ReadHolder guard(mutex_);
+      if (auto executor = executor_.lock()) {
+        return executor; // Fast path.
+      }
+    }
 
-template <class Exe, class DefaultExe, class LockTag>
-std::shared_ptr<Exe> getExecutor(
-    Singleton<std::weak_ptr<Exe>>& sExecutor,
-    Singleton<std::shared_ptr<DefaultExe>>& sDefaultExecutor,
-    Singleton<RWSpinLock, LockTag>& sExecutorLock) {
-  std::shared_ptr<Exe> executor;
-  auto singleton = sExecutor.try_get();
-  auto lock = sExecutorLock.try_get();
-
-  {
-    RWSpinLock::ReadHolder guard(lock.get());
-    if ((executor = sExecutor.try_get()->lock())) {
+    SharedMutex::WriteHolder guard(mutex_);
+    if (auto executor = executor_.lock()) {
       return executor;
     }
+
+    if (!defaultExecutor_) {
+      defaultExecutor_ = constructDefault_();
+    }
+
+    return defaultExecutor_;
   }
 
-  RWSpinLock::WriteHolder guard(lock.get());
-  executor = singleton->lock();
-  if (!executor) {
-    std::weak_ptr<Exe> defaultExecutor = *sDefaultExecutor.try_get().get();
-    executor = defaultExecutor.lock();
-    sExecutor.try_get().get()->swap(defaultExecutor);
+  void set(std::weak_ptr<ExecutorBase> executor) {
+    SharedMutex::WriteHolder guard(mutex_);
+    executor_.swap(executor);
   }
-  return executor;
-}
 
-template <class Exe, class LockTag>
-void setExecutor(
-    std::weak_ptr<Exe> executor,
-    Singleton<std::weak_ptr<Exe>>& sExecutor,
-    Singleton<RWSpinLock, LockTag>& sExecutorLock) {
-  auto lock = sExecutorLock.try_get();
-  RWSpinLock::WriteHolder guard(*lock);
-  std::weak_ptr<Exe> executor_weak = std::move(executor);
-  sExecutor.try_get().get()->swap(executor_weak);
-}
+ private:
+  SharedMutex mutex_;
+  std::weak_ptr<ExecutorBase> executor_;
+  std::shared_ptr<ExecutorBase> defaultExecutor_;
+  Function<std::unique_ptr<ExecutorBase>()> constructDefault_;
+};
+
+Singleton<GlobalExecutor<Executor>> gGlobalCPUExecutor([] {
+  return new GlobalExecutor<Executor>(
+      // Default global CPU executor is an InlineExecutor.
+      [] { return std::make_unique<InlineExecutor>(); });
+});
+
+Singleton<GlobalExecutor<IOExecutor>> gGlobalIOExecutor([] {
+  return new GlobalExecutor<IOExecutor>(
+      // Default global IO executor is an IOThreadPoolExecutor.
+      [] {
+        return std::make_unique<IOThreadPoolExecutor>(
+            std::thread::hardware_concurrency(),
+            std::make_shared<NamedThreadFactory>("GlobalIOThreadPool"));
+      });
+});
+
+} // namespace
+
+namespace folly {
 
 std::shared_ptr<Executor> getCPUExecutor() {
-  return getExecutor(
-      globalCPUExecutor, globalInlineExecutor, globalCPUExecutorLock);
+  if (auto singleton = gGlobalCPUExecutor.try_get()) {
+    return singleton->get();
+  }
+  return nullptr;
 }
 
 void setCPUExecutor(std::weak_ptr<Executor> executor) {
-  setExecutor(std::move(executor), globalCPUExecutor, globalCPUExecutorLock);
+  if (auto singleton = gGlobalCPUExecutor.try_get()) {
+    singleton->set(std::move(executor));
+  }
 }
 
 std::shared_ptr<IOExecutor> getIOExecutor() {
-  return getExecutor(
-      globalIOExecutor, globalIOThreadPool, globalIOExecutorLock);
+  if (auto singleton = gGlobalIOExecutor.try_get()) {
+    return singleton->get();
+  }
+  return nullptr;
 }
 
-EventBase* getEventBase() {
-  return getIOExecutor()->getEventBase();
+void setIOExecutor(std::weak_ptr<IOExecutor> executor) {
+  if (auto singleton = gGlobalIOExecutor.try_get()) {
+    singleton->set(std::move(executor));
+  }
 }
 
-void setIOExecutor(std::weak_ptr<IOExecutor> executor) {
-  setExecutor(std::move(executor), globalIOExecutor, globalIOExecutorLock);
+EventBase* getEventBase() {
+  return getIOExecutor()->getEventBase();
 }
 
 } // namespace folly