From fe22c76c53069bb4f1338a5fa6ffeb6a8a2a5923 Mon Sep 17 00:00:00 2001 From: James Sedgwick Date: Mon, 17 Nov 2014 14:44:10 -0800 Subject: [PATCH] priority CPU thread pool Summary: just extend CPUThreadPoolExecutor to use a queue that is itself composed of N mpmc queues, one per priority the verbosity is starting to kill me, i had thought before of truncating Executor of all these pool types and now I'm definitely going to do that unless someone fights me. Test Plan: added unit; maybe i'm not being clever enough as i couldn't think of many ways to test this reliably so there's just a basic preemption test Reviewed By: davejwatson@fb.com Subscribers: trunkagent, fugalh, njormrod, folly-diffs@, bmatheny FB internal diff: D1676452 Tasks: 5002392 Signature: t1:1676452:1416263990:cdf5d44e4a50a6180ba547a3ed4c0c24d4ffdd8f --- .../wangle/concurrent/BlockingQueue.h | 13 ++++ .../concurrent/CPUThreadPoolExecutor.cpp | 51 ++++++++++++ .../wangle/concurrent/CPUThreadPoolExecutor.h | 30 +++++++- .../concurrent/PriorityLifoSemMPMCQueue.h | 77 +++++++++++++++++++ .../test/ThreadPoolExecutorTest.cpp | 23 ++++++ 5 files changed, 191 insertions(+), 3 deletions(-) create mode 100644 folly/experimental/wangle/concurrent/PriorityLifoSemMPMCQueue.h diff --git a/folly/experimental/wangle/concurrent/BlockingQueue.h b/folly/experimental/wangle/concurrent/BlockingQueue.h index 6a653544..08a1f703 100644 --- a/folly/experimental/wangle/concurrent/BlockingQueue.h +++ b/folly/experimental/wangle/concurrent/BlockingQueue.h @@ -15,6 +15,9 @@ */ #pragma once + +#include + namespace folly { namespace wangle { template @@ -22,6 +25,16 @@ class BlockingQueue { public: virtual ~BlockingQueue() {} virtual void add(T item) = 0; + virtual void addWithPriority(T item, uint32_t priority) { + LOG_FIRST_N(WARNING, 1) << + "add(item, priority) called on a non-priority queue"; + add(std::move(item)); + } + virtual uint32_t getNumPriorities() { + LOG_FIRST_N(WARNING, 1) << + "getNumPriorities() called on a non-priority queue"; + return 1; + } virtual T take() = 0; virtual size_t size() = 0; }; diff --git a/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.cpp b/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.cpp index 4ef20454..9caf6bee 100644 --- a/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.cpp +++ b/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.cpp @@ -15,10 +15,12 @@ */ #include +#include namespace folly { namespace wangle { const size_t CPUThreadPoolExecutor::kDefaultMaxQueueSize = 1 << 18; +const size_t CPUThreadPoolExecutor::kDefaultNumPriorities = 2; CPUThreadPoolExecutor::CPUThreadPoolExecutor( size_t numThreads, @@ -30,6 +32,31 @@ CPUThreadPoolExecutor::CPUThreadPoolExecutor( CHECK(threadList_.get().size() == numThreads); } +CPUThreadPoolExecutor::CPUThreadPoolExecutor( + size_t numThreads, + std::shared_ptr threadFactory) + : CPUThreadPoolExecutor( + numThreads, + folly::make_unique>( + CPUThreadPoolExecutor::kDefaultMaxQueueSize), + std::move(threadFactory)) {} + +CPUThreadPoolExecutor::CPUThreadPoolExecutor(size_t numThreads) + : CPUThreadPoolExecutor( + numThreads, + std::make_shared("CPUThreadPool")) {} + +CPUThreadPoolExecutor::CPUThreadPoolExecutor( + size_t numThreads, + uint32_t numPriorities, + std::shared_ptr threadFactory) + : CPUThreadPoolExecutor( + numThreads, + folly::make_unique>( + numPriorities, + CPUThreadPoolExecutor::kDefaultMaxQueueSize), + std::move(threadFactory)) {} + CPUThreadPoolExecutor::~CPUThreadPoolExecutor() { stop(); CHECK(threadsToStop_ == 0); @@ -48,6 +75,30 @@ void CPUThreadPoolExecutor::add( CPUTask(std::move(func), expiration, std::move(expireCallback))); } +void CPUThreadPoolExecutor::add(Func func, uint32_t priority) { + add(std::move(func), priority, std::chrono::milliseconds(0)); +} + +void CPUThreadPoolExecutor::add( + Func func, + uint32_t priority, + std::chrono::milliseconds expiration, + Func expireCallback) { + CHECK(priority < getNumPriorities()); + taskQueue_->addWithPriority( + CPUTask(std::move(func), expiration, std::move(expireCallback)), + priority); +} + +uint32_t CPUThreadPoolExecutor::getNumPriorities() const { + return taskQueue_->getNumPriorities(); +} + +BlockingQueue* +CPUThreadPoolExecutor::getTaskQueue() { + return taskQueue_.get(); +} + void CPUThreadPoolExecutor::threadRun(std::shared_ptr thread) { thread->startupBaton.post(); while (1) { diff --git a/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h b/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h index f331232f..b7e88685 100644 --- a/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h +++ b/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h @@ -15,6 +15,7 @@ */ #pragma once + #include namespace folly { namespace wangle { @@ -25,9 +26,19 @@ class CPUThreadPoolExecutor : public ThreadPoolExecutor { explicit CPUThreadPoolExecutor( size_t numThreads, - std::unique_ptr> taskQueue = - folly::make_unique>( - CPUThreadPoolExecutor::kDefaultMaxQueueSize), + std::unique_ptr> taskQueue, + std::shared_ptr threadFactory = + std::make_shared("CPUThreadPool")); + + explicit CPUThreadPoolExecutor(size_t numThreads); + + explicit CPUThreadPoolExecutor( + size_t numThreads, + std::shared_ptr threadFactory); + + explicit CPUThreadPoolExecutor( + size_t numThreads, + uint32_t numPriorities, std::shared_ptr threadFactory = std::make_shared("CPUThreadPool")); @@ -39,6 +50,15 @@ class CPUThreadPoolExecutor : public ThreadPoolExecutor { std::chrono::milliseconds expiration, Func expireCallback = nullptr) override; + void add(Func func, uint32_t priority); + void add( + Func func, + uint32_t priority, + std::chrono::milliseconds expiration, + Func expireCallback = nullptr); + + uint32_t getNumPriorities() const; + struct CPUTask : public ThreadPoolExecutor::Task { // Must be noexcept move constructible so it can be used in MPMCQueue explicit CPUTask( @@ -57,6 +77,10 @@ class CPUThreadPoolExecutor : public ThreadPoolExecutor { }; static const size_t kDefaultMaxQueueSize; + static const size_t kDefaultNumPriorities; + + protected: + BlockingQueue* getTaskQueue(); private: void threadRun(ThreadPtr thread) override; diff --git a/folly/experimental/wangle/concurrent/PriorityLifoSemMPMCQueue.h b/folly/experimental/wangle/concurrent/PriorityLifoSemMPMCQueue.h new file mode 100644 index 00000000..65500f58 --- /dev/null +++ b/folly/experimental/wangle/concurrent/PriorityLifoSemMPMCQueue.h @@ -0,0 +1,77 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include +#include +#include + +namespace folly { namespace wangle { + +template +class PriorityLifoSemMPMCQueue : public BlockingQueue { + public: + explicit PriorityLifoSemMPMCQueue(uint32_t numPriorities, size_t capacity) { + CHECK(numPriorities > 0); + queues_.reserve(numPriorities); + for (int i = 0; i < numPriorities; i++) { + queues_.push_back(MPMCQueue(capacity)); + } + } + + uint32_t getNumPriorities() override { + return queues_.size(); + } + + // Add at lowest priority by default + void add(T item) override { + addWithPriority(std::move(item), 0); + } + + void addWithPriority(T item, uint32_t priority) override { + CHECK(priority < queues_.size()); + if (!queues_[priority].write(std::move(item))) { + throw std::runtime_error("LifoSemMPMCQueue full, can't add item"); + } + sem_.post(); + } + + T take() override { + T item; + while (true) { + for (auto it = queues_.rbegin(); it != queues_.rend(); it++) { + if (it->read(item)) { + return item; + } + } + sem_.wait(); + } + } + + size_t size() override { + size_t size = 0; + for (auto& q : queues_) { + size += q.size(); + } + return size; + } + + private: + LifoSem sem_; + std::vector> queues_; +}; + +}} // folly::wangle diff --git a/folly/experimental/wangle/concurrent/test/ThreadPoolExecutorTest.cpp b/folly/experimental/wangle/concurrent/test/ThreadPoolExecutorTest.cpp index 471c8c6d..6e3782ce 100644 --- a/folly/experimental/wangle/concurrent/test/ThreadPoolExecutorTest.cpp +++ b/folly/experimental/wangle/concurrent/test/ThreadPoolExecutorTest.cpp @@ -277,3 +277,26 @@ TEST(ThreadPoolExecutorTest, CPUFuturePool) { TEST(ThreadPoolExecutorTest, IOFuturePool) { futureExecutor(); } + +TEST(ThreadPoolExecutorTest, PriorityPreemptionTest) { + bool tookLopri = false; + auto completed = 0; + auto hipri = [&] { + EXPECT_FALSE(tookLopri); + completed++; + }; + auto lopri = [&] { + tookLopri = true; + completed++; + }; + CPUThreadPoolExecutor pool(0, 2); + for (int i = 0; i < 50; i++) { + pool.add(lopri, 0); + } + for (int i = 0; i < 50; i++) { + pool.add(hipri, 1); + } + pool.setNumThreads(1); + pool.join(); + EXPECT_EQ(100, completed); +} -- 2.34.1