From cf24f9855c678de382e0abbf0e6de257058581f6 Mon Sep 17 00:00:00 2001 From: Scott Michelson Date: Wed, 1 Feb 2017 12:03:48 -0800 Subject: [PATCH] MPMCQueue Wrapper with priorities Summary: In many cases where MPMCQueue would be used, it's useful to be able to prioritize requests. This implements a thin wrapper on MPMCQueue to accomplish this (albeit in a bit of a memory inefficient way) Reviewed By: haijunz Differential Revision: D4465498 fbshipit-source-id: 6630b80ccf3138b5c135e7f7f281133b37d82b4d --- folly/Makefile.am | 1 + folly/PriorityMPMCQueue.h | 97 ++++++++++++++++++++++++++++ folly/test/PriorityMPMCQueueTest.cpp | 75 +++++++++++++++++++++ 3 files changed, 173 insertions(+) create mode 100644 folly/PriorityMPMCQueue.h create mode 100644 folly/test/PriorityMPMCQueueTest.cpp diff --git a/folly/Makefile.am b/folly/Makefile.am index 8fad6bc2..75dceafd 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -318,6 +318,7 @@ nobase_follyinclude_HEADERS = \ portability/Windows.h \ portability/Unistd.h \ Preprocessor.h \ + PriorityMPMCQueue.h \ ProducerConsumerQueue.h \ Random.h \ Random-inl.h \ diff --git a/folly/PriorityMPMCQueue.h b/folly/PriorityMPMCQueue.h new file mode 100644 index 00000000..b1c0a875 --- /dev/null +++ b/folly/PriorityMPMCQueue.h @@ -0,0 +1,97 @@ +/* + * Copyright 2017 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include + +#include + +namespace folly { + +/// PriorityMPMCQueue is a thin wrapper on MPMCQueue, providing priorities +/// by managing multiple underlying MPMCQueues. As of now, this does +/// not implement a blocking interface. For the purposes of this +/// class, lower number is higher priority + +template +class PriorityMPMCQueue { + public: + PriorityMPMCQueue(size_t numPriorities, size_t capacity) { + CHECK_GT(numPriorities, 0); + queues_.reserve(numPriorities); + for (size_t i = 0; i < numPriorities; i++) { + queues_.emplace_back(capacity); + } + } + + size_t getNumPriorities() { + return queues_.size(); + } + + // Add at medium priority by default + bool write(T&& item) { + return writeWithPriority(std::move(item), getNumPriorities() / 2); + } + + bool writeWithPriority(T&& item, size_t priority) { + size_t queue = std::min(getNumPriorities() - 1, priority); + CHECK_LT(queue, queues_.size()); + return queues_.at(queue).write(std::move(item)); + } + + bool read(T& item) { + for (auto& q : queues_) { + if (q.readIfNotEmpty(item)) { + return true; + } + } + return false; + } + + size_t size() const { + size_t total_size = 0; + for (auto& q : queues_) { + // MPMCQueue can have a negative size if there are pending readers. + // Since we don't expose a blocking interface this shouldn't happen, + // But just in case we put a floor at 0 + total_size += std::max(0, q.size()); + } + return total_size; + } + + size_t sizeGuess() const { + size_t total_size = 0; + for (auto& q : queues_) { + // MPMCQueue can have a negative size if there are pending readers. + // Since we don't expose a blocking interface this shouldn't happen, + // But just in case we put a floor at 0 + total_size += std::max(0, q.sizeGuess()); + } + return total_size; + } + + /// Returns true if there are no items available for dequeue + bool isEmpty() const { + return size() == 0; + } + + private: + std::vector> queues_; +}; + +} // namespace folly diff --git a/folly/test/PriorityMPMCQueueTest.cpp b/folly/test/PriorityMPMCQueueTest.cpp new file mode 100644 index 00000000..a43cca0b --- /dev/null +++ b/folly/test/PriorityMPMCQueueTest.cpp @@ -0,0 +1,75 @@ +/* + * Copyright 2017 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +using namespace folly; + +TEST(PriorityMPMCQueue, BasicOps) { + // With just one priority, this should behave like a normal MPMCQueue + PriorityMPMCQueue queue(1, 10); + EXPECT_TRUE(queue.isEmpty()); + EXPECT_EQ(1, queue.getNumPriorities()); + + queue.write(9); + queue.write(8); + + EXPECT_FALSE(queue.isEmpty()); + EXPECT_EQ(2, queue.size()); + EXPECT_EQ(2, queue.sizeGuess()); + + size_t item; + queue.read(item); + EXPECT_EQ(9, item); + EXPECT_FALSE(queue.isEmpty()); + EXPECT_EQ(1, queue.size()); + EXPECT_EQ(1, queue.sizeGuess()); + + queue.read(item); + EXPECT_EQ(8, item); + EXPECT_TRUE(queue.isEmpty()); + EXPECT_EQ(0, queue.size()); + EXPECT_EQ(0, queue.sizeGuess()); +} + +TEST(PriorityMPMCQueue, TestPriorities) { + PriorityMPMCQueue queue(3, 10); + EXPECT_TRUE(queue.isEmpty()); + EXPECT_EQ(3, queue.getNumPriorities()); + + // This should go to the lowpri queue, as we only + // have 3 priorities + queue.writeWithPriority(5, 50); + // unqualified writes should be mid-pri + queue.write(3); + queue.writeWithPriority(6, 2); + queue.writeWithPriority(1, 0); + queue.write(4); + queue.writeWithPriority(2, 0); + + EXPECT_FALSE(queue.isEmpty()); + EXPECT_EQ(6, queue.size()); + EXPECT_EQ(6, queue.sizeGuess()); + + size_t item; + for (int i = 1; i <= 6; i++) { + queue.read(item); + EXPECT_EQ(i, item); + EXPECT_EQ(6 - i, queue.size()); + EXPECT_EQ(6 - i, queue.sizeGuess()); + } +} -- 2.34.1