From 6214f971e82863efe7c72229dde0c84fc0594dd0 Mon Sep 17 00:00:00 2001 From: James Sedgwick Date: Wed, 14 Jan 2015 07:08:50 -0800 Subject: [PATCH] global executors with weak_ptr semantics Summary: unfortunately, can't use atomics now that a weak_ptr is stored instead of a raw ptr, hence the additional singleton locks might want to make an overload of via() that takes shared_ptr to integrate more easily? or change via() to store a shared_ptr (and make it non-owning if a raw ptr is passed) Test Plan: unit Reviewed By: davejwatson@fb.com Subscribers: jsedgwick, trunkagent, fugalh, folly-diffs@ FB internal diff: D1764359 Tasks: 5002442 Signature: t1:1764359:1420845340:349ea88091d7ca4ee386b54aec599647341fadd4 --- folly/Executor.h | 1 + folly/Makefile.am | 1 - folly/wangle/concurrent/GlobalExecutor.cpp | 98 ++++++++++++++++--- folly/wangle/concurrent/GlobalExecutor.h | 24 ++++- folly/wangle/concurrent/IOExecutor.cpp | 49 ---------- folly/wangle/concurrent/IOExecutor.h | 7 +- .../concurrent/test/GlobalExecutorTest.cpp | 44 ++++++++- 7 files changed, 143 insertions(+), 81 deletions(-) delete mode 100644 folly/wangle/concurrent/IOExecutor.cpp diff --git a/folly/Executor.h b/folly/Executor.h index 5ba4844b..4bacba7b 100644 --- a/folly/Executor.h +++ b/folly/Executor.h @@ -16,6 +16,7 @@ #pragma once +#include #include namespace folly { diff --git a/folly/Makefile.am b/folly/Makefile.am index d717d5fd..c3b8ca9d 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -343,7 +343,6 @@ libfolly_la_SOURCES = \ wangle/acceptor/TransportInfo.cpp \ wangle/concurrent/CPUThreadPoolExecutor.cpp \ wangle/concurrent/Codel.cpp \ - wangle/concurrent/IOExecutor.cpp \ wangle/concurrent/IOThreadPoolExecutor.cpp \ wangle/concurrent/GlobalExecutor.cpp \ wangle/concurrent/ThreadPoolExecutor.cpp \ diff --git a/folly/wangle/concurrent/GlobalExecutor.cpp b/folly/wangle/concurrent/GlobalExecutor.cpp index 35455921..eb97f06d 100644 --- a/folly/wangle/concurrent/GlobalExecutor.cpp +++ b/folly/wangle/concurrent/GlobalExecutor.cpp @@ -17,38 +17,104 @@ #include #include #include +#include using namespace folly; using namespace folly::wangle; namespace { -Singleton globalIOThreadPoolSingleton( - [](){ - return new IOThreadPoolExecutor( - sysconf(_SC_NPROCESSORS_ONLN), - std::make_shared("GlobalIOThreadPool")); +// lock protecting global CPU executor +struct CPUExecutorLock {}; +Singleton globalCPUExecutorLock; +// global CPU executor +Singleton> globalCPUExecutor; +// default global CPU executor is an InlineExecutor +Singleton> globalInlineExecutor( + []{ + return new std::shared_ptr( + std::make_shared()); + }); + +// lock protecting global IO executor +struct IOExecutorLock {}; +Singleton globalIOExecutorLock; +// global IO executor +Singleton> globalIOExecutor; +// default global IO executor is an IOThreadPoolExecutor +Singleton> globalIOThreadPool( + []{ + return new std::shared_ptr( + std::make_shared( + sysconf(_SC_NPROCESSORS_ONLN), + std::make_shared("GlobalIOThreadPool"))); }); } namespace folly { namespace wangle { -IOExecutor* getIOExecutor() { - auto singleton = IOExecutor::getSingleton(); - auto executor = singleton->load(); - while (!executor) { - IOExecutor* nullIOExecutor = nullptr; - singleton->compare_exchange_strong( - nullIOExecutor, - globalIOThreadPoolSingleton.get_fast()); - executor = singleton->load(); +template +std::shared_ptr getExecutor( + Singleton>& sExecutor, + Singleton>& sDefaultExecutor, + Singleton& sExecutorLock) { + std::shared_ptr executor; + auto singleton = sExecutor.get_fast(); + auto lock = sExecutorLock.get_fast(); + + { + RWSpinLock::ReadHolder guard(lock); + if ((executor = sExecutor->lock())) { + return executor; + } + } + + + RWSpinLock::WriteHolder guard(lock); + executor = singleton->lock(); + if (!executor) { + executor = *sDefaultExecutor.get_fast(); + *singleton = executor; } return executor; } -void setIOExecutor(IOExecutor* executor) { - IOExecutor::getSingleton()->store(executor); +template +void setExecutor( + std::shared_ptr executor, + Singleton>& sExecutor, + Singleton& sExecutorLock) { + RWSpinLock::WriteHolder guard(sExecutorLock.get_fast()); + *sExecutor.get_fast() = std::move(executor); +} + +std::shared_ptr getCPUExecutor() { + return getExecutor( + globalCPUExecutor, + globalInlineExecutor, + globalCPUExecutorLock); +} + +void setCPUExecutor(std::shared_ptr executor) { + setExecutor( + std::move(executor), + globalCPUExecutor, + globalCPUExecutorLock); +} + +std::shared_ptr getIOExecutor() { + return getExecutor( + globalIOExecutor, + globalIOThreadPool, + globalIOExecutorLock); +} + +void setIOExecutor(std::shared_ptr executor) { + setExecutor( + std::move(executor), + globalIOExecutor, + globalIOExecutorLock); } }} // folly::wangle diff --git a/folly/wangle/concurrent/GlobalExecutor.h b/folly/wangle/concurrent/GlobalExecutor.h index cac76be8..08df1c46 100644 --- a/folly/wangle/concurrent/GlobalExecutor.h +++ b/folly/wangle/concurrent/GlobalExecutor.h @@ -16,17 +16,33 @@ #pragma once +#include + +namespace folly { +class Executor; +} + namespace folly { namespace wangle { +// Retrieve the global Executor. If there is none, a default InlineExecutor +// will be constructed and returned. This is named CPUExecutor to distinguish +// it from IOExecutor below and to hint that it's intended for CPU-bound tasks. +std::shared_ptr getCPUExecutor(); + +// Set an Executor to be the global Executor which will be returned by +// subsequent calls to getCPUExecutor(). Takes a non-owning (weak) reference. +void setCPUExecutor(std::shared_ptr executor); + +// IOExecutors differ from Executors in that they drive and provide access to +// one or more EventBases. class IOExecutor; // Retrieve the global IOExecutor. If there is none, a default // IOThreadPoolExecutor will be constructed and returned. -IOExecutor* getIOExecutor(); +std::shared_ptr getIOExecutor(); // Set an IOExecutor to be the global IOExecutor which will be returned by -// subsequent calls to getIOExecutor(). IOExecutors will uninstall themselves -// as global when they are destructed. -void setIOExecutor(IOExecutor* executor); +// subsequent calls to getIOExecutor(). Takes a non-owning (weak) reference. +void setIOExecutor(std::shared_ptr executor); }} diff --git a/folly/wangle/concurrent/IOExecutor.cpp b/folly/wangle/concurrent/IOExecutor.cpp deleted file mode 100644 index d1b3283b..00000000 --- a/folly/wangle/concurrent/IOExecutor.cpp +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright 2014 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include - -#include -#include - -using folly::Singleton; -using folly::wangle::IOExecutor; - -namespace { - -Singleton> globalIOExecutorSingleton( - [](){ - return new std::atomic(nullptr); - }); - -} - -namespace folly { namespace wangle { - -IOExecutor::~IOExecutor() { - auto thisCopy = this; - try { - getSingleton()->compare_exchange_strong(thisCopy, nullptr); - } catch (const std::runtime_error& e) { - // The global IOExecutor singleton was already destructed so doesn't need to - // be restored. Ignore. - } -} - -std::atomic* IOExecutor::getSingleton() { - return globalIOExecutorSingleton.get_fast(); -} - -}} // folly::wangle diff --git a/folly/wangle/concurrent/IOExecutor.h b/folly/wangle/concurrent/IOExecutor.h index 14eb6643..0fc4f5c7 100644 --- a/folly/wangle/concurrent/IOExecutor.h +++ b/folly/wangle/concurrent/IOExecutor.h @@ -40,13 +40,8 @@ namespace folly { namespace wangle { // IOThreadPoolExecutor will be created and returned. class IOExecutor : public virtual Executor { public: - virtual ~IOExecutor(); + virtual ~IOExecutor() {} virtual EventBase* getEventBase() = 0; - - private: - static std::atomic* getSingleton(); - friend IOExecutor* getIOExecutor(); - friend void setIOExecutor(IOExecutor* executor); }; }} diff --git a/folly/wangle/concurrent/test/GlobalExecutorTest.cpp b/folly/wangle/concurrent/test/GlobalExecutorTest.cpp index a601b0c1..4539bd26 100644 --- a/folly/wangle/concurrent/test/GlobalExecutorTest.cpp +++ b/folly/wangle/concurrent/test/GlobalExecutorTest.cpp @@ -20,6 +20,40 @@ using namespace folly::wangle; +TEST(GlobalExecutorTest, GlobalCPUExecutor) { + class DummyExecutor : public folly::Executor { + public: + void add(folly::Func f) override { + f(); + count++; + } + int count{0}; + }; + + // The default CPU executor is a synchronous inline executor, lets verify + // that work we add is executed + auto count = 0; + auto f = [&](){ count++; }; + + // Don't explode, we should create the default global CPUExecutor lazily here. + getCPUExecutor()->add(f); + EXPECT_EQ(1, count); + + { + auto dummy = std::make_shared(); + setCPUExecutor(dummy); + getCPUExecutor()->add(f); + // Make sure we were properly installed. + EXPECT_EQ(1, dummy->count); + EXPECT_EQ(2, count); + } + + // Don't explode, we should restore the default global CPUExecutor because our + // weak reference to dummy has expired + getCPUExecutor()->add(f); + EXPECT_EQ(3, count); +} + TEST(GlobalExecutorTest, GlobalIOExecutor) { class DummyExecutor : public IOExecutor { public: @@ -38,14 +72,14 @@ TEST(GlobalExecutorTest, GlobalIOExecutor) { getIOExecutor()->add(f); { - DummyExecutor dummy; - setIOExecutor(&dummy); + auto dummy = std::make_shared(); + setIOExecutor(dummy); getIOExecutor()->add(f); // Make sure we were properly installed. - EXPECT_EQ(1, dummy.count); + EXPECT_EQ(1, dummy->count); } - // Don't explode, we should restore the default global IOExecutor when dummy - // is destructed. + // Don't explode, we should restore the default global IOExecutor because our + // weak reference to dummy has expired getIOExecutor()->add(f); } -- 2.34.1