#pragma once
+#include <atomic>
#include <functional>
namespace folly {
wangle/acceptor/TransportInfo.cpp \
wangle/concurrent/CPUThreadPoolExecutor.cpp \
wangle/concurrent/Codel.cpp \
- wangle/concurrent/IOExecutor.cpp \
wangle/concurrent/IOThreadPoolExecutor.cpp \
wangle/concurrent/GlobalExecutor.cpp \
wangle/concurrent/ThreadPoolExecutor.cpp \
#include <folly/experimental/Singleton.h>
#include <folly/wangle/concurrent/IOExecutor.h>
#include <folly/wangle/concurrent/IOThreadPoolExecutor.h>
+#include <folly/futures/InlineExecutor.h>
using namespace folly;
using namespace folly::wangle;
namespace {
-Singleton<IOThreadPoolExecutor> globalIOThreadPoolSingleton(
- [](){
- return new IOThreadPoolExecutor(
- sysconf(_SC_NPROCESSORS_ONLN),
- std::make_shared<NamedThreadFactory>("GlobalIOThreadPool"));
+// lock protecting global CPU executor
+struct CPUExecutorLock {};
+Singleton<RWSpinLock, CPUExecutorLock> globalCPUExecutorLock;
+// global CPU executor
+Singleton<std::weak_ptr<Executor>> globalCPUExecutor;
+// default global CPU executor is an InlineExecutor
+Singleton<std::shared_ptr<InlineExecutor>> globalInlineExecutor(
+ []{
+ return new std::shared_ptr<InlineExecutor>(
+ std::make_shared<InlineExecutor>());
+ });
+
+// lock protecting global IO executor
+struct IOExecutorLock {};
+Singleton<RWSpinLock, IOExecutorLock> globalIOExecutorLock;
+// global IO executor
+Singleton<std::weak_ptr<IOExecutor>> globalIOExecutor;
+// default global IO executor is an IOThreadPoolExecutor
+Singleton<std::shared_ptr<IOThreadPoolExecutor>> globalIOThreadPool(
+ []{
+ return new std::shared_ptr<IOThreadPoolExecutor>(
+ std::make_shared<IOThreadPoolExecutor>(
+ sysconf(_SC_NPROCESSORS_ONLN),
+ std::make_shared<NamedThreadFactory>("GlobalIOThreadPool")));
});
}
namespace folly { namespace wangle {
-IOExecutor* getIOExecutor() {
- auto singleton = IOExecutor::getSingleton();
- auto executor = singleton->load();
- while (!executor) {
- IOExecutor* nullIOExecutor = nullptr;
- singleton->compare_exchange_strong(
- nullIOExecutor,
- globalIOThreadPoolSingleton.get_fast());
- executor = singleton->load();
+template <class Exe, class DefaultExe, class LockTag>
+std::shared_ptr<Exe> getExecutor(
+ Singleton<std::weak_ptr<Exe>>& sExecutor,
+ Singleton<std::shared_ptr<DefaultExe>>& sDefaultExecutor,
+ Singleton<RWSpinLock, LockTag>& sExecutorLock) {
+ std::shared_ptr<Exe> executor;
+ auto singleton = sExecutor.get_fast();
+ auto lock = sExecutorLock.get_fast();
+
+ {
+ RWSpinLock::ReadHolder guard(lock);
+ if ((executor = sExecutor->lock())) {
+ return executor;
+ }
+ }
+
+
+ RWSpinLock::WriteHolder guard(lock);
+ executor = singleton->lock();
+ if (!executor) {
+ executor = *sDefaultExecutor.get_fast();
+ *singleton = executor;
}
return executor;
}
-void setIOExecutor(IOExecutor* executor) {
- IOExecutor::getSingleton()->store(executor);
+template <class Exe, class LockTag>
+void setExecutor(
+ std::shared_ptr<Exe> executor,
+ Singleton<std::weak_ptr<Exe>>& sExecutor,
+ Singleton<RWSpinLock, LockTag>& sExecutorLock) {
+ RWSpinLock::WriteHolder guard(sExecutorLock.get_fast());
+ *sExecutor.get_fast() = std::move(executor);
+}
+
+std::shared_ptr<Executor> getCPUExecutor() {
+ return getExecutor(
+ globalCPUExecutor,
+ globalInlineExecutor,
+ globalCPUExecutorLock);
+}
+
+void setCPUExecutor(std::shared_ptr<Executor> executor) {
+ setExecutor(
+ std::move(executor),
+ globalCPUExecutor,
+ globalCPUExecutorLock);
+}
+
+std::shared_ptr<IOExecutor> getIOExecutor() {
+ return getExecutor(
+ globalIOExecutor,
+ globalIOThreadPool,
+ globalIOExecutorLock);
+}
+
+void setIOExecutor(std::shared_ptr<IOExecutor> executor) {
+ setExecutor(
+ std::move(executor),
+ globalIOExecutor,
+ globalIOExecutorLock);
}
}} // folly::wangle
#pragma once
+#include <memory>
+
+namespace folly {
+class Executor;
+}
+
namespace folly { namespace wangle {
+// Retrieve the global Executor. If there is none, a default InlineExecutor
+// will be constructed and returned. This is named CPUExecutor to distinguish
+// it from IOExecutor below and to hint that it's intended for CPU-bound tasks.
+std::shared_ptr<Executor> getCPUExecutor();
+
+// Set an Executor to be the global Executor which will be returned by
+// subsequent calls to getCPUExecutor(). Takes a non-owning (weak) reference.
+void setCPUExecutor(std::shared_ptr<Executor> executor);
+
+// IOExecutors differ from Executors in that they drive and provide access to
+// one or more EventBases.
class IOExecutor;
// Retrieve the global IOExecutor. If there is none, a default
// IOThreadPoolExecutor will be constructed and returned.
-IOExecutor* getIOExecutor();
+std::shared_ptr<IOExecutor> getIOExecutor();
// Set an IOExecutor to be the global IOExecutor which will be returned by
-// subsequent calls to getIOExecutor(). IOExecutors will uninstall themselves
-// as global when they are destructed.
-void setIOExecutor(IOExecutor* executor);
+// subsequent calls to getIOExecutor(). Takes a non-owning (weak) reference.
+void setIOExecutor(std::shared_ptr<IOExecutor> executor);
}}
+++ /dev/null
-/*
- * Copyright 2014 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <folly/wangle/concurrent/IOExecutor.h>
-
-#include <folly/experimental/Singleton.h>
-#include <folly/wangle/concurrent/GlobalExecutor.h>
-
-using folly::Singleton;
-using folly::wangle::IOExecutor;
-
-namespace {
-
-Singleton<std::atomic<IOExecutor*>> globalIOExecutorSingleton(
- [](){
- return new std::atomic<IOExecutor*>(nullptr);
- });
-
-}
-
-namespace folly { namespace wangle {
-
-IOExecutor::~IOExecutor() {
- auto thisCopy = this;
- try {
- getSingleton()->compare_exchange_strong(thisCopy, nullptr);
- } catch (const std::runtime_error& e) {
- // The global IOExecutor singleton was already destructed so doesn't need to
- // be restored. Ignore.
- }
-}
-
-std::atomic<IOExecutor*>* IOExecutor::getSingleton() {
- return globalIOExecutorSingleton.get_fast();
-}
-
-}} // folly::wangle
// IOThreadPoolExecutor will be created and returned.
class IOExecutor : public virtual Executor {
public:
- virtual ~IOExecutor();
+ virtual ~IOExecutor() {}
virtual EventBase* getEventBase() = 0;
-
- private:
- static std::atomic<IOExecutor*>* getSingleton();
- friend IOExecutor* getIOExecutor();
- friend void setIOExecutor(IOExecutor* executor);
};
}}
using namespace folly::wangle;
+TEST(GlobalExecutorTest, GlobalCPUExecutor) {
+ class DummyExecutor : public folly::Executor {
+ public:
+ void add(folly::Func f) override {
+ f();
+ count++;
+ }
+ int count{0};
+ };
+
+ // The default CPU executor is a synchronous inline executor, lets verify
+ // that work we add is executed
+ auto count = 0;
+ auto f = [&](){ count++; };
+
+ // Don't explode, we should create the default global CPUExecutor lazily here.
+ getCPUExecutor()->add(f);
+ EXPECT_EQ(1, count);
+
+ {
+ auto dummy = std::make_shared<DummyExecutor>();
+ setCPUExecutor(dummy);
+ getCPUExecutor()->add(f);
+ // Make sure we were properly installed.
+ EXPECT_EQ(1, dummy->count);
+ EXPECT_EQ(2, count);
+ }
+
+ // Don't explode, we should restore the default global CPUExecutor because our
+ // weak reference to dummy has expired
+ getCPUExecutor()->add(f);
+ EXPECT_EQ(3, count);
+}
+
TEST(GlobalExecutorTest, GlobalIOExecutor) {
class DummyExecutor : public IOExecutor {
public:
getIOExecutor()->add(f);
{
- DummyExecutor dummy;
- setIOExecutor(&dummy);
+ auto dummy = std::make_shared<DummyExecutor>();
+ setIOExecutor(dummy);
getIOExecutor()->add(f);
// Make sure we were properly installed.
- EXPECT_EQ(1, dummy.count);
+ EXPECT_EQ(1, dummy->count);
}
- // Don't explode, we should restore the default global IOExecutor when dummy
- // is destructed.
+ // Don't explode, we should restore the default global IOExecutor because our
+ // weak reference to dummy has expired
getIOExecutor()->add(f);
}