portability/Windows.h \
portability/Unistd.h \
Preprocessor.h \
+ PriorityMPMCQueue.h \
ProducerConsumerQueue.h \
Random.h \
Random-inl.h \
--- /dev/null
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <glog/logging.h>
+#include <algorithm>
+#include <vector>
+
+#include <folly/MPMCQueue.h>
+
+namespace folly {
+
+/// PriorityMPMCQueue is a thin wrapper on MPMCQueue, providing priorities
+/// by managing multiple underlying MPMCQueues. As of now, this does
+/// not implement a blocking interface. For the purposes of this
+/// class, lower number is higher priority
+
+template <class T>
+class PriorityMPMCQueue {
+ public:
+ PriorityMPMCQueue(size_t numPriorities, size_t capacity) {
+ CHECK_GT(numPriorities, 0);
+ queues_.reserve(numPriorities);
+ for (size_t i = 0; i < numPriorities; i++) {
+ queues_.emplace_back(capacity);
+ }
+ }
+
+ size_t getNumPriorities() {
+ return queues_.size();
+ }
+
+ // Add at medium priority by default
+ bool write(T&& item) {
+ return writeWithPriority(std::move(item), getNumPriorities() / 2);
+ }
+
+ bool writeWithPriority(T&& item, size_t priority) {
+ size_t queue = std::min(getNumPriorities() - 1, priority);
+ CHECK_LT(queue, queues_.size());
+ return queues_.at(queue).write(std::move(item));
+ }
+
+ bool read(T& item) {
+ for (auto& q : queues_) {
+ if (q.readIfNotEmpty(item)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ size_t size() const {
+ size_t total_size = 0;
+ for (auto& q : queues_) {
+ // MPMCQueue can have a negative size if there are pending readers.
+ // Since we don't expose a blocking interface this shouldn't happen,
+ // But just in case we put a floor at 0
+ total_size += std::max<ssize_t>(0, q.size());
+ }
+ return total_size;
+ }
+
+ size_t sizeGuess() const {
+ size_t total_size = 0;
+ for (auto& q : queues_) {
+ // MPMCQueue can have a negative size if there are pending readers.
+ // Since we don't expose a blocking interface this shouldn't happen,
+ // But just in case we put a floor at 0
+ total_size += std::max<ssize_t>(0, q.sizeGuess());
+ }
+ return total_size;
+ }
+
+ /// Returns true if there are no items available for dequeue
+ bool isEmpty() const {
+ return size() == 0;
+ }
+
+ private:
+ std::vector<folly::MPMCQueue<T>> queues_;
+};
+
+} // namespace folly
--- /dev/null
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/PriorityMPMCQueue.h>
+#include <folly/portability/GTest.h>
+
+using namespace folly;
+
+TEST(PriorityMPMCQueue, BasicOps) {
+ // With just one priority, this should behave like a normal MPMCQueue
+ PriorityMPMCQueue<size_t> queue(1, 10);
+ EXPECT_TRUE(queue.isEmpty());
+ EXPECT_EQ(1, queue.getNumPriorities());
+
+ queue.write(9);
+ queue.write(8);
+
+ EXPECT_FALSE(queue.isEmpty());
+ EXPECT_EQ(2, queue.size());
+ EXPECT_EQ(2, queue.sizeGuess());
+
+ size_t item;
+ queue.read(item);
+ EXPECT_EQ(9, item);
+ EXPECT_FALSE(queue.isEmpty());
+ EXPECT_EQ(1, queue.size());
+ EXPECT_EQ(1, queue.sizeGuess());
+
+ queue.read(item);
+ EXPECT_EQ(8, item);
+ EXPECT_TRUE(queue.isEmpty());
+ EXPECT_EQ(0, queue.size());
+ EXPECT_EQ(0, queue.sizeGuess());
+}
+
+TEST(PriorityMPMCQueue, TestPriorities) {
+ PriorityMPMCQueue<size_t> queue(3, 10);
+ EXPECT_TRUE(queue.isEmpty());
+ EXPECT_EQ(3, queue.getNumPriorities());
+
+ // This should go to the lowpri queue, as we only
+ // have 3 priorities
+ queue.writeWithPriority(5, 50);
+ // unqualified writes should be mid-pri
+ queue.write(3);
+ queue.writeWithPriority(6, 2);
+ queue.writeWithPriority(1, 0);
+ queue.write(4);
+ queue.writeWithPriority(2, 0);
+
+ EXPECT_FALSE(queue.isEmpty());
+ EXPECT_EQ(6, queue.size());
+ EXPECT_EQ(6, queue.sizeGuess());
+
+ size_t item;
+ for (int i = 1; i <= 6; i++) {
+ queue.read(item);
+ EXPECT_EQ(i, item);
+ EXPECT_EQ(6 - i, queue.size());
+ EXPECT_EQ(6 - i, queue.sizeGuess());
+ }
+}