From 79b9cc5f830d8b92abeb29e180a30f75804259d0 Mon Sep 17 00:00:00 2001 From: James Sedgwick Date: Wed, 17 Sep 2014 02:58:06 -0700 Subject: [PATCH] ThreadPoolExecutor and its children CPUThreadPoolExecutor and IOThreadPoolExecutor Summary: Spun off from https://phabricator.fb.com/D1534506 as this seemed different enough for a new diff Similar to previous diff but attempts to reuse a common thread management process between cpu and io bound thread pools. Also sets the stage for other common functionality, e.g. stats, monitoring, timeouts, and so on Here is some output from the queue benchmark in common/concurrent with both of these pools added (changes to BM not in this diff): https://phabricator.fb.com/P16308560 Test Plan: added a unit test, ran benchmark Reviewed By: davejwatson@fb.com Subscribers: fugalh, njormrod, bmatheny FB internal diff: D1555443 Tasks: 5002392, 5002425 --- .../wangle/concurrent/BlockingQueue.h | 29 ++++ .../concurrent/CPUThreadPoolExecutor.cpp | 84 ++++++++++++ .../wangle/concurrent/CPUThreadPoolExecutor.h | 60 +++++++++ .../experimental/wangle/concurrent/Executor.h | 31 +++++ .../concurrent/IOThreadPoolExecutor.cpp | 85 ++++++++++++ .../wangle/concurrent/IOThreadPoolExecutor.h | 49 +++++++ .../wangle/concurrent/LifoSemMPMCQueue.h | 57 ++++++++ .../wangle/concurrent/NamedThreadFactory.h | 42 ++++++ .../wangle/concurrent/ThreadFactory.h | 30 +++++ .../wangle/concurrent/ThreadPoolExecutor.cpp | 110 +++++++++++++++ .../wangle/concurrent/ThreadPoolExecutor.h | 117 ++++++++++++++++ .../test/ThreadPoolExecutorTest.cpp | 126 ++++++++++++++++++ 12 files changed, 820 insertions(+) create mode 100644 folly/experimental/wangle/concurrent/BlockingQueue.h create mode 100644 folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.cpp create mode 100644 folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h create mode 100644 folly/experimental/wangle/concurrent/Executor.h create mode 100644 folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp create mode 100644 folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h create mode 100644 folly/experimental/wangle/concurrent/LifoSemMPMCQueue.h create mode 100644 folly/experimental/wangle/concurrent/NamedThreadFactory.h create mode 100644 folly/experimental/wangle/concurrent/ThreadFactory.h create mode 100644 folly/experimental/wangle/concurrent/ThreadPoolExecutor.cpp create mode 100644 folly/experimental/wangle/concurrent/ThreadPoolExecutor.h create mode 100644 folly/experimental/wangle/concurrent/test/ThreadPoolExecutorTest.cpp diff --git a/folly/experimental/wangle/concurrent/BlockingQueue.h b/folly/experimental/wangle/concurrent/BlockingQueue.h new file mode 100644 index 00000000..6a653544 --- /dev/null +++ b/folly/experimental/wangle/concurrent/BlockingQueue.h @@ -0,0 +1,29 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +namespace folly { namespace wangle { + +template +class BlockingQueue { + public: + virtual ~BlockingQueue() {} + virtual void add(T item) = 0; + virtual T take() = 0; + virtual size_t size() = 0; +}; + +}} // folly::wangle diff --git a/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.cpp b/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.cpp new file mode 100644 index 00000000..4a133fb9 --- /dev/null +++ b/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.cpp @@ -0,0 +1,84 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +namespace folly { namespace wangle { + +const size_t CPUThreadPoolExecutor::kDefaultMaxQueueSize = 1 << 18; + +CPUThreadPoolExecutor::CPUThreadPoolExecutor( + size_t numThreads, + std::unique_ptr> taskQueue, + std::unique_ptr threadFactory) + : ThreadPoolExecutor(numThreads, std::move(threadFactory)), + taskQueue_(std::move(taskQueue)) { + addThreads(numThreads); + CHECK(threadList_.get().size() == numThreads); +} + +CPUThreadPoolExecutor::~CPUThreadPoolExecutor() { + stop(); + CHECK(threadsToStop_ == 0); +} + +void CPUThreadPoolExecutor::add(Func func) { + // TODO handle enqueue failure, here and in other add() callsites + taskQueue_->add(Task(std::move(func))); +} + +void CPUThreadPoolExecutor::threadRun(std::shared_ptr thread) { + while (1) { + // TODO expiration / codel + auto t = taskQueue_->take(); + if (UNLIKELY(t.poison)) { + CHECK(threadsToStop_-- > 0); + stoppedThreads_.add(thread); + return; + } else { + thread->idle = false; + try { + t.func(); + } catch (const std::exception& e) { + LOG(ERROR) << "CPUThreadPoolExecutor: func threw unhandled " << + typeid(e).name() << " exception: " << e.what(); + } catch (...) { + LOG(ERROR) << "CPUThreadPoolExecutor: func threw unhandled non-exception " + "object"; + } + thread->idle = true; + } + + if (UNLIKELY(threadsToStop_ > 0 && !isJoin_)) { + if (--threadsToStop_ >= 0) { + stoppedThreads_.add(thread); + return; + } else { + threadsToStop_++; + } + } + } +} + +void CPUThreadPoolExecutor::stopThreads(size_t n) { + CHECK(stoppedThreads_.size() == 0); + threadsToStop_ = n; + for (int i = 0; i < n; i++) { + taskQueue_->add(Task()); + } +} + +}} // folly::wangle diff --git a/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h b/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h new file mode 100644 index 00000000..210fd673 --- /dev/null +++ b/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h @@ -0,0 +1,60 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include + +namespace folly { namespace wangle { + +class CPUThreadPoolExecutor : public ThreadPoolExecutor { + public: + struct Task; + + // TODO thread naming, perhaps a required input to ThreadFactories + explicit CPUThreadPoolExecutor( + size_t numThreads, + std::unique_ptr> taskQueue = + folly::make_unique>( + CPUThreadPoolExecutor::kDefaultMaxQueueSize), + std::unique_ptr threadFactory = + folly::make_unique("CPUThreadPool")); + + ~CPUThreadPoolExecutor(); + + void add(Func func) override; + + struct Task { + explicit Task(Func&& taskArg) : func(std::move(taskArg)), poison(false) {} + Task() : func(nullptr), poison(true) {} + Task(Task&& o) noexcept : func(std::move(o.func)), poison(o.poison) {} + Task(const Task&) = default; + Task& operator=(const Task&) = default; + Func func; + bool poison; + // TODO per-task stats, timeouts, expirations + }; + + static const size_t kDefaultMaxQueueSize; + + private: + void threadRun(ThreadPtr thread) override; + void stopThreads(size_t n) override; + + std::atomic threadsToStop_; + std::unique_ptr> taskQueue_; +}; + +}} // folly::wangle diff --git a/folly/experimental/wangle/concurrent/Executor.h b/folly/experimental/wangle/concurrent/Executor.h new file mode 100644 index 00000000..49db177e --- /dev/null +++ b/folly/experimental/wangle/concurrent/Executor.h @@ -0,0 +1,31 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace folly { namespace wangle { + +typedef std::function Func; + +class Executor { + public: + virtual ~Executor() {}; + virtual void add(Func func) = 0; +}; + +}} // folly::wangle diff --git a/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp b/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp new file mode 100644 index 00000000..cdc36ef4 --- /dev/null +++ b/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp @@ -0,0 +1,85 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include + +namespace folly { namespace wangle { + +IOThreadPoolExecutor::IOThreadPoolExecutor( + size_t numThreads, + std::unique_ptr threadFactory) + : ThreadPoolExecutor(numThreads, std::move(threadFactory)), + nextThread_(0) { + addThreads(numThreads); + CHECK(threadList_.get().size() == numThreads); +} + +IOThreadPoolExecutor::~IOThreadPoolExecutor() { + stop(); +} + +void IOThreadPoolExecutor::add(Func func) { + RWSpinLock::ReadHolder{&threadListLock_}; + if (threadList_.get().empty()) { + throw std::runtime_error("No threads available"); + } + auto thread = threadList_.get()[nextThread_++ % threadList_.get().size()]; + auto ioThread = std::static_pointer_cast(thread); + + auto moveFunc = folly::makeMoveWrapper(std::move(func)); + auto wrappedFunc = [moveFunc, ioThread] () { + (*moveFunc)(); + ioThread->outstandingTasks--; + }; + + ioThread->outstandingTasks++; + if (!ioThread->eventBase.runInEventBaseThread(std::move(wrappedFunc))) { + ioThread->outstandingTasks--; + throw std::runtime_error("Unable to run func in event base thread"); + } +} + +std::shared_ptr +IOThreadPoolExecutor::makeThread() { + return std::make_shared(); +} + +void IOThreadPoolExecutor::threadRun(ThreadPtr thread) { + const auto ioThread = std::static_pointer_cast(thread); + while (ioThread->shouldRun) { + ioThread->eventBase.loopForever(); + } + if (isJoin_) { + while (ioThread->outstandingTasks > 0) { + ioThread->eventBase.loopOnce(); + } + } + stoppedThreads_.add(ioThread); +} + +void IOThreadPoolExecutor::stopThreads(size_t n) { + for (int i = 0; i < n; i++) { + const auto ioThread = std::static_pointer_cast( + threadList_.get()[i]); + ioThread->shouldRun = false; + ioThread->eventBase.terminateLoopSoon(); + } +} + +}} // folly::wangle diff --git a/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h b/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h new file mode 100644 index 00000000..2b498bdc --- /dev/null +++ b/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h @@ -0,0 +1,49 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include +#include + +namespace folly { namespace wangle { + +class IOThreadPoolExecutor : public ThreadPoolExecutor { + public: + explicit IOThreadPoolExecutor( + size_t numThreads, + std::unique_ptr threadFactory = + folly::make_unique("IOThreadPool")); + + ~IOThreadPoolExecutor(); + + void add(Func func) override; + + private: + ThreadPtr makeThread() override; + void threadRun(ThreadPtr thread) override; + void stopThreads(size_t n) override; + + struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING IOThread : public Thread { + IOThread() : shouldRun(true), outstandingTasks(0) {}; + std::atomic shouldRun; + std::atomic outstandingTasks; + EventBase eventBase; + }; + + size_t nextThread_; +}; + +}} // folly::wangle diff --git a/folly/experimental/wangle/concurrent/LifoSemMPMCQueue.h b/folly/experimental/wangle/concurrent/LifoSemMPMCQueue.h new file mode 100644 index 00000000..45d362e9 --- /dev/null +++ b/folly/experimental/wangle/concurrent/LifoSemMPMCQueue.h @@ -0,0 +1,57 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include +#include +#include + +namespace folly { namespace wangle { + +template +class LifoSemMPMCQueue : public BlockingQueue { + public: + explicit LifoSemMPMCQueue(size_t capacity) : queue_(capacity) {} + + void add(T item) override { + if (!queue_.write(std::move(item))) { + throw std::runtime_error("LifoSemMPMCQueue full, can't add item"); + } + sem_.post(); + } + + T take() override { + T item; + while (!queue_.read(item)) { + sem_.wait(); + } + return item; + } + + size_t capacity() { + return queue_.capacity(); + } + + size_t size() override { + return queue_.size(); + } + + private: + LifoSem sem_; + MPMCQueue queue_; +}; + +}} // folly::wangle diff --git a/folly/experimental/wangle/concurrent/NamedThreadFactory.h b/folly/experimental/wangle/concurrent/NamedThreadFactory.h new file mode 100644 index 00000000..5c513c51 --- /dev/null +++ b/folly/experimental/wangle/concurrent/NamedThreadFactory.h @@ -0,0 +1,42 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include +#include +#include + +namespace folly { namespace wangle { + +class NamedThreadFactory : public ThreadFactory { + public: + explicit NamedThreadFactory(folly::StringPiece prefix) + : prefix_(prefix), suffix_(0) {} + + std::thread newThread(Func&& func) override { + auto thread = std::thread(std::move(func)); + folly::setThreadName( + thread.native_handle(), + folly::to(prefix_, suffix_++)); + return thread; + } + + private: + folly::StringPiece prefix_; + std::atomic suffix_; +}; + +}} // folly::wangle diff --git a/folly/experimental/wangle/concurrent/ThreadFactory.h b/folly/experimental/wangle/concurrent/ThreadFactory.h new file mode 100644 index 00000000..b5da0758 --- /dev/null +++ b/folly/experimental/wangle/concurrent/ThreadFactory.h @@ -0,0 +1,30 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include + +#include + +namespace folly { namespace wangle { + +class ThreadFactory { + public: + virtual ~ThreadFactory() {} + virtual std::thread newThread(Func&& func) = 0; +}; + +}} // folly::wangle diff --git a/folly/experimental/wangle/concurrent/ThreadPoolExecutor.cpp b/folly/experimental/wangle/concurrent/ThreadPoolExecutor.cpp new file mode 100644 index 00000000..4d249b04 --- /dev/null +++ b/folly/experimental/wangle/concurrent/ThreadPoolExecutor.cpp @@ -0,0 +1,110 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +namespace folly { namespace wangle { + +ThreadPoolExecutor::ThreadPoolExecutor( + size_t numThreads, + std::unique_ptr threadFactory) + : threadFactory_(std::move(threadFactory)) {} + +ThreadPoolExecutor::~ThreadPoolExecutor() { + CHECK(threadList_.get().size() == 0); +} + +size_t ThreadPoolExecutor::numThreads() { + RWSpinLock::ReadHolder{&threadListLock_}; + return threadList_.get().size(); +} + +void ThreadPoolExecutor::setNumThreads(size_t n) { + RWSpinLock::WriteHolder{&threadListLock_}; + const auto current = threadList_.get().size(); + if (n > current ) { + addThreads(n - current); + } else if (n < current) { + removeThreads(current - n, true); + } + CHECK(threadList_.get().size() == n); +} + +void ThreadPoolExecutor::addThreads(size_t n) { + for (int i = 0; i < n; i++) { + auto thread = makeThread(); + // TODO need a notion of failing to create the thread + // and then handling for that case + thread->handle = threadFactory_->newThread( + std::bind(&ThreadPoolExecutor::threadRun, this, thread)); + threadList_.add(thread); + } +} + +void ThreadPoolExecutor::removeThreads(size_t n, bool isJoin) { + CHECK(n <= threadList_.get().size()); + CHECK(stoppedThreads_.size() == 0); + isJoin_ = isJoin; + stopThreads(n); + for (int i = 0; i < n; i++) { + auto thread = stoppedThreads_.take(); + thread->handle.join(); + threadList_.remove(thread); + } + CHECK(stoppedThreads_.size() == 0); +} + +void ThreadPoolExecutor::stop() { + RWSpinLock::WriteHolder{&threadListLock_}; + removeThreads(threadList_.get().size(), false); + CHECK(threadList_.get().size() == 0); +} + +void ThreadPoolExecutor::join() { + RWSpinLock::WriteHolder{&threadListLock_}; + removeThreads(threadList_.get().size(), true); + CHECK(threadList_.get().size() == 0); +} + +std::atomic ThreadPoolExecutor::Thread::nextId(0); + +void ThreadPoolExecutor::StoppedThreadQueue::add( + ThreadPoolExecutor::ThreadPtr item) { + std::lock_guard guard(mutex_); + queue_.push(std::move(item)); + sem_.post(); +} + +ThreadPoolExecutor::ThreadPtr ThreadPoolExecutor::StoppedThreadQueue::take() { + while(1) { + { + std::lock_guard guard(mutex_); + if (queue_.size() > 0) { + auto item = std::move(queue_.front()); + queue_.pop(); + return item; + } + } + sem_.wait(); + } +} + +size_t ThreadPoolExecutor::StoppedThreadQueue::size() { + std::lock_guard guard(mutex_); + return queue_.size(); +} + +}} // folly::wangle diff --git a/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h b/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h new file mode 100644 index 00000000..f802af14 --- /dev/null +++ b/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h @@ -0,0 +1,117 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include +#include +#include +#include +#include + +#include +#include +#include + +#include + +namespace folly { namespace wangle { + +class ThreadPoolExecutor : public Executor { + public: + explicit ThreadPoolExecutor( + size_t numThreads, + std::unique_ptr threadFactory); + + ~ThreadPoolExecutor(); + + size_t numThreads(); + void setNumThreads(size_t numThreads); + void stop(); + void join(); + // TODO expose stats + + protected: + void addThreads(size_t n); + void removeThreads(size_t n, bool isJoin); + + struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING Thread { + virtual ~Thread() {} + Thread() : id(nextId++), handle(), idle(true) {}; + static std::atomic nextId; + uint64_t id; + std::thread handle; + bool idle; + // TODO per-thread stats go here + }; + + typedef std::shared_ptr ThreadPtr; + + // The function that will be bound to pool threads + virtual void threadRun(ThreadPtr thread) = 0; + // Stop n threads and put their Thread structs in the threadsStopped_ queue + virtual void stopThreads(size_t n) = 0; + // Create a suitable Thread struct + virtual ThreadPtr makeThread() { + return std::make_shared(); + } + // need a stopThread(id) for keepalive feature + + class ThreadList { + public: + void add(const ThreadPtr& state) { + auto it = std::lower_bound(vec_.begin(), vec_.end(), state, compare); + vec_.insert(it, state); + } + + void remove(const ThreadPtr& state) { + auto itPair = std::equal_range(vec_.begin(), vec_.end(), state, compare); + CHECK(itPair.first != vec_.end()); + CHECK(std::next(itPair.first) == itPair.second); + vec_.erase(itPair.first); + } + + const std::vector& get() const { + return vec_; + } + + private: + static bool compare(const ThreadPtr& ts1, const ThreadPtr& ts2) { + return ts1->id < ts2->id; + } + + std::vector vec_; + }; + + class StoppedThreadQueue : public BlockingQueue { + public: + void add(ThreadPtr item) override; + ThreadPtr take() override; + size_t size() override; + + private: + LifoSem sem_; + std::mutex mutex_; + std::queue queue_; + }; + + std::unique_ptr threadFactory_; + ThreadList threadList_; + RWSpinLock threadListLock_; + StoppedThreadQueue stoppedThreads_; + std::atomic isJoin_; // whether the current downsizing is a join +}; + +}} // folly::wangle diff --git a/folly/experimental/wangle/concurrent/test/ThreadPoolExecutorTest.cpp b/folly/experimental/wangle/concurrent/test/ThreadPoolExecutorTest.cpp new file mode 100644 index 00000000..fe7b8dcd --- /dev/null +++ b/folly/experimental/wangle/concurrent/test/ThreadPoolExecutorTest.cpp @@ -0,0 +1,126 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +using namespace folly::wangle; + +template +static void basic() { + // Create and destroy + TPE tpe(10); +} + +TEST(ThreadPoolExecutorTest, CPUBasic) { + basic(); +} + +TEST(IOThreadPoolExecutorTest, IOBasic) { + basic(); +} + +template +static void resize() { + TPE tpe(100); + EXPECT_EQ(100, tpe.numThreads()); + tpe.setNumThreads(50); + EXPECT_EQ(50, tpe.numThreads()); + tpe.setNumThreads(150); + EXPECT_EQ(150, tpe.numThreads()); +} + +TEST(ThreadPoolExecutorTest, CPUResize) { + resize(); +} + +TEST(ThreadPoolExecutorTest, IOResize) { + resize(); +} + +template +static void stop() { + TPE tpe(10); + std::atomic completed(0); + auto f = [&](){ + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + completed++; + }; + for (int i = 0; i < 1000; i++) { + tpe.add(f); + } + tpe.stop(); + EXPECT_GT(1000, completed); +} + +TEST(ThreadPoolExecutorTest, CPUStop) { + stop(); +} + +TEST(ThreadPoolExecutorTest, IOStop) { + stop(); +} + +template +static void join() { + TPE tpe(10); + std::atomic completed(0); + auto f = [&](){ + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + completed++; + }; + for (int i = 0; i < 1000; i++) { + tpe.add(f); + } + tpe.join(); + EXPECT_EQ(1000, completed); +} + +TEST(ThreadPoolExecutorTest, CPUJoin) { + join(); +} + +TEST(ThreadPoolExecutorTest, IOJoin) { + join(); +} + +template +static void resizeUnderLoad() { + TPE tpe(10); + std::atomic completed(0); + auto f = [&](){ + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + completed++; + }; + for (int i = 0; i < 1000; i++) { + tpe.add(f); + } + tpe.setNumThreads(5); + tpe.setNumThreads(15); + tpe.join(); + EXPECT_EQ(1000, completed); +} + +TEST(ThreadPoolExecutorTest, CPUResizeUnderLoad) { + resizeUnderLoad(); +} + +TEST(ThreadPoolExecutorTest, IOResizeUnderLoad) { + resizeUnderLoad(); +} -- 2.34.1