*/
#pragma once
+
+#include <glog/logging.h>
+
namespace folly { namespace wangle {
template <class T>
class BlockingQueue {
public:
virtual ~BlockingQueue() {}
virtual void add(T item) = 0;
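+ // Default implementations for queues that don't support priorities: warn
+ // once, then fall back to the single-priority behavior.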
+ virtual void addWithPriority(T item, uint32_t priority) {
+ LOG_FIRST_N(WARNING, 1) <<
+ "add(item, priority) called on a non-priority queue";
+ add(std::move(item));
+ }
+ virtual uint32_t getNumPriorities() {
+ LOG_FIRST_N(WARNING, 1) <<
+ "getNumPriorities() called on a non-priority queue";
+ return 1;
+ }
virtual T take() = 0;
virtual size_t size() = 0;
};
*/
#include <folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h>
+#include <folly/experimental/wangle/concurrent/PriorityLifoSemMPMCQueue.h>
namespace folly { namespace wangle {
const size_t CPUThreadPoolExecutor::kDefaultMaxQueueSize = 1 << 18;
+const size_t CPUThreadPoolExecutor::kDefaultNumPriorities = 2;
CPUThreadPoolExecutor::CPUThreadPoolExecutor(
size_t numThreads,
CHECK(threadList_.get().size() == numThreads);
}
+CPUThreadPoolExecutor::CPUThreadPoolExecutor(
+ size_t numThreads,
+ std::shared_ptr<ThreadFactory> threadFactory)
+ : CPUThreadPoolExecutor(
+ numThreads,
+ folly::make_unique<LifoSemMPMCQueue<CPUTask>>(
+ CPUThreadPoolExecutor::kDefaultMaxQueueSize),
+ std::move(threadFactory)) {}
+
+CPUThreadPoolExecutor::CPUThreadPoolExecutor(size_t numThreads)
+ : CPUThreadPoolExecutor(
+ numThreads,
+ std::make_shared<NamedThreadFactory>("CPUThreadPool")) {}
+
+CPUThreadPoolExecutor::CPUThreadPoolExecutor(
+ size_t numThreads,
+ uint32_t numPriorities,
+ std::shared_ptr<ThreadFactory> threadFactory)
+ : CPUThreadPoolExecutor(
+ numThreads,
+ folly::make_unique<PriorityLifoSemMPMCQueue<CPUTask>>(
+ numPriorities,
+ CPUThreadPoolExecutor::kDefaultMaxQueueSize),
+ std::move(threadFactory)) {}
+
CPUThreadPoolExecutor::~CPUThreadPoolExecutor() {
stop();
CHECK(threadsToStop_ == 0);
CPUTask(std::move(func), expiration, std::move(expireCallback)));
}
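+
+// Priority-aware overloads of add(); priority must be < getNumPriorities().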
+void CPUThreadPoolExecutor::add(Func func, uint32_t priority) {
+ add(std::move(func), priority, std::chrono::milliseconds(0));
+}
+
+void CPUThreadPoolExecutor::add(
+ Func func,
+ uint32_t priority,
+ std::chrono::milliseconds expiration,
+ Func expireCallback) {
+ CHECK(priority < getNumPriorities());
+ taskQueue_->addWithPriority(
+ CPUTask(std::move(func), expiration, std::move(expireCallback)),
+ priority);
+}
+
+uint32_t CPUThreadPoolExecutor::getNumPriorities() const {
+ return taskQueue_->getNumPriorities();
+}
+
+BlockingQueue<CPUThreadPoolExecutor::CPUTask>*
+CPUThreadPoolExecutor::getTaskQueue() {
+ return taskQueue_.get();
+}
+
void CPUThreadPoolExecutor::threadRun(std::shared_ptr<Thread> thread) {
thread->startupBaton.post();
while (1) {
*/
#pragma once
+
#include <folly/experimental/wangle/concurrent/ThreadPoolExecutor.h>
namespace folly { namespace wangle {
explicit CPUThreadPoolExecutor(
size_t numThreads,
- std::unique_ptr<BlockingQueue<CPUTask>> taskQueue =
- folly::make_unique<LifoSemMPMCQueue<CPUTask>>(
- CPUThreadPoolExecutor::kDefaultMaxQueueSize),
+ std::unique_ptr<BlockingQueue<CPUTask>> taskQueue,
+ std::shared_ptr<ThreadFactory> threadFactory =
+ std::make_shared<NamedThreadFactory>("CPUThreadPool"));
+
+ explicit CPUThreadPoolExecutor(size_t numThreads);
+
+ explicit CPUThreadPoolExecutor(
+ size_t numThreads,
+ std::shared_ptr<ThreadFactory> threadFactory);
+
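+ // Creates a pool backed by a PriorityLifoSemMPMCQueue with numPriorities
+ // levels; tasks with higher priority values are dequeued first.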
+ explicit CPUThreadPoolExecutor(
+ size_t numThreads,
+ uint32_t numPriorities,
std::shared_ptr<ThreadFactory> threadFactory =
std::make_shared<NamedThreadFactory>("CPUThreadPool"));
std::chrono::milliseconds expiration,
Func expireCallback = nullptr) override;
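+
+ // Like add(), but enqueues at the given priority
+ // (0 <= priority < getNumPriorities()).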
+ void add(Func func, uint32_t priority);
+ void add(
+ Func func,
+ uint32_t priority,
+ std::chrono::milliseconds expiration,
+ Func expireCallback = nullptr);
+
+ uint32_t getNumPriorities() const;
+
struct CPUTask : public ThreadPoolExecutor::Task {
// Must be noexcept move constructible so it can be used in MPMCQueue
explicit CPUTask(
};
static const size_t kDefaultMaxQueueSize;
+ static const size_t kDefaultNumPriorities;
+
+ protected:
+ BlockingQueue<CPUTask>* getTaskQueue();
private:
void threadRun(ThreadPtr thread) override;
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <glog/logging.h>
+
+#include <folly/experimental/wangle/concurrent/BlockingQueue.h>
+#include <folly/LifoSem.h>
+#include <folly/MPMCQueue.h>
+
+namespace folly { namespace wangle {
+
+template <class T>
+class PriorityLifoSemMPMCQueue : public BlockingQueue<T> {
+ public:
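+ // One fixed-capacity MPMCQueue per priority level.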
+ explicit PriorityLifoSemMPMCQueue(uint32_t numPriorities, size_t capacity) {
+ CHECK(numPriorities > 0);
+ queues_.reserve(numPriorities);
+ for (uint32_t i = 0; i < numPriorities; i++) {
+ queues_.push_back(MPMCQueue<T>(capacity));
+ }
+ }
+
+ uint32_t getNumPriorities() override {
+ return queues_.size();
+ }
+
+ // Add at lowest priority by default
+ void add(T item) override {
+ addWithPriority(std::move(item), 0);
+ }
+
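+ // Non-blocking write to the queue for the given priority; throws if that
+ // queue is full.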
+ void addWithPriority(T item, uint32_t priority) override {
+ CHECK(priority < queues_.size());
+ if (!queues_[priority].write(std::move(item))) {
+ throw std::runtime_error("PriorityLifoSemMPMCQueue full, can't add item");
+ }
+ sem_.post();
+ }
+
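+ // Scan from the highest priority (the back of queues_) down to the lowest;
+ // if every queue is empty, block on the semaphore and rescan after each
+ // post.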
+ T take() override {
+ T item;
+ while (true) {
+ for (auto it = queues_.rbegin(); it != queues_.rend(); ++it) {
+ if (it->read(item)) {
+ return item;
+ }
+ }
+ sem_.wait();
+ }
+ }
+
+ size_t size() override {
+ size_t size = 0;
+ for (auto& q : queues_) {
+ size += q.size();
+ }
+ return size;
+ }
+
+ private:
+ LifoSem sem_;
+ std::vector<MPMCQueue<T>> queues_;
+};
+
+}} // folly::wangle
TEST(ThreadPoolExecutorTest, IOFuturePool) {
futureExecutor<IOThreadPoolExecutor>();
}
+
+TEST(ThreadPoolExecutorTest, PriorityPreemptionTest) {
+ bool tookLopri = false;
+ auto completed = 0;
+ auto hipri = [&] {
+ EXPECT_FALSE(tookLopri);
+ completed++;
+ };
+ auto lopri = [&] {
+ tookLopri = true;
+ completed++;
+ };
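+ // Start with zero threads so all tasks queue up before any can run; the
+ // single thread added below must then drain every hipri task before it
+ // ever sees a lopri one.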
+ CPUThreadPoolExecutor pool(0, 2);
+ for (int i = 0; i < 50; i++) {
+ pool.add(lopri, 0);
+ }
+ for (int i = 0; i < 50; i++) {
+ pool.add(hipri, 1);
+ }
+ pool.setNumThreads(1);
+ pool.join();
+ EXPECT_EQ(100, completed);
+}