MPMCQueue Wrapper with priorities
authorScott Michelson <sdmich@fb.com>
Wed, 1 Feb 2017 20:03:48 +0000 (12:03 -0800)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Wed, 1 Feb 2017 20:18:48 +0000 (12:18 -0800)
Summary: In many cases where MPMCQueue would be used, it's useful to be able to prioritize requests. This implements a thin wrapper on MPMCQueue to accomplish this (albeit in a bit of a memory inefficient way)

Reviewed By: haijunz

Differential Revision: D4465498

fbshipit-source-id: 6630b80ccf3138b5c135e7f7f281133b37d82b4d

folly/Makefile.am
folly/PriorityMPMCQueue.h [new file with mode: 0644]
folly/test/PriorityMPMCQueueTest.cpp [new file with mode: 0644]

index 8fad6bc278336e34a71218d4ca3a64013c55490d..75dceafdeab0cb7f9cc37f14aa0935be82c6fa0a 100644 (file)
@@ -318,6 +318,7 @@ nobase_follyinclude_HEADERS = \
        portability/Windows.h \
        portability/Unistd.h \
        Preprocessor.h \
+       PriorityMPMCQueue.h \
        ProducerConsumerQueue.h \
        Random.h \
        Random-inl.h \
diff --git a/folly/PriorityMPMCQueue.h b/folly/PriorityMPMCQueue.h
new file mode 100644 (file)
index 0000000..b1c0a87
--- /dev/null
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <glog/logging.h>
+#include <algorithm>
+#include <vector>
+
+#include <folly/MPMCQueue.h>
+
+namespace folly {
+
+/// PriorityMPMCQueue is a thin wrapper on MPMCQueue, providing priorities
+/// by managing multiple underlying MPMCQueues. As of now, this does
+/// not implement a blocking interface. For the purposes of this
+/// class, lower number is higher priority
+
+template <class T>
+class PriorityMPMCQueue {
+ public:
+  PriorityMPMCQueue(size_t numPriorities, size_t capacity) {
+    CHECK_GT(numPriorities, 0);
+    queues_.reserve(numPriorities);
+    for (size_t i = 0; i < numPriorities; i++) {
+      queues_.emplace_back(capacity);
+    }
+  }
+
+  size_t getNumPriorities() {
+    return queues_.size();
+  }
+
+  // Add at medium priority by default
+  bool write(T&& item) {
+    return writeWithPriority(std::move(item), getNumPriorities() / 2);
+  }
+
+  bool writeWithPriority(T&& item, size_t priority) {
+    size_t queue = std::min(getNumPriorities() - 1, priority);
+    CHECK_LT(queue, queues_.size());
+    return queues_.at(queue).write(std::move(item));
+  }
+
+  bool read(T& item) {
+    for (auto& q : queues_) {
+      if (q.readIfNotEmpty(item)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  size_t size() const {
+    size_t total_size = 0;
+    for (auto& q : queues_) {
+      // MPMCQueue can have a negative size if there are pending readers.
+      // Since we don't expose a blocking interface this shouldn't happen,
+      // But just in case we put a floor at 0
+      total_size += std::max<ssize_t>(0, q.size());
+    }
+    return total_size;
+  }
+
+  size_t sizeGuess() const {
+    size_t total_size = 0;
+    for (auto& q : queues_) {
+      // MPMCQueue can have a negative size if there are pending readers.
+      // Since we don't expose a blocking interface this shouldn't happen,
+      // But just in case we put a floor at 0
+      total_size += std::max<ssize_t>(0, q.sizeGuess());
+    }
+    return total_size;
+  }
+
+  /// Returns true if there are no items available for dequeue
+  bool isEmpty() const {
+    return size() == 0;
+  }
+
+ private:
+  std::vector<folly::MPMCQueue<T>> queues_;
+};
+
+} // namespace folly
diff --git a/folly/test/PriorityMPMCQueueTest.cpp b/folly/test/PriorityMPMCQueueTest.cpp
new file mode 100644 (file)
index 0000000..a43cca0
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/PriorityMPMCQueue.h>
+#include <folly/portability/GTest.h>
+
+using namespace folly;
+
+TEST(PriorityMPMCQueue, BasicOps) {
+  // With just one priority, this should behave like a normal MPMCQueue
+  PriorityMPMCQueue<size_t> queue(1, 10);
+  EXPECT_TRUE(queue.isEmpty());
+  EXPECT_EQ(1, queue.getNumPriorities());
+
+  queue.write(9);
+  queue.write(8);
+
+  EXPECT_FALSE(queue.isEmpty());
+  EXPECT_EQ(2, queue.size());
+  EXPECT_EQ(2, queue.sizeGuess());
+
+  size_t item;
+  queue.read(item);
+  EXPECT_EQ(9, item);
+  EXPECT_FALSE(queue.isEmpty());
+  EXPECT_EQ(1, queue.size());
+  EXPECT_EQ(1, queue.sizeGuess());
+
+  queue.read(item);
+  EXPECT_EQ(8, item);
+  EXPECT_TRUE(queue.isEmpty());
+  EXPECT_EQ(0, queue.size());
+  EXPECT_EQ(0, queue.sizeGuess());
+}
+
+TEST(PriorityMPMCQueue, TestPriorities) {
+  PriorityMPMCQueue<size_t> queue(3, 10);
+  EXPECT_TRUE(queue.isEmpty());
+  EXPECT_EQ(3, queue.getNumPriorities());
+
+  // This should go to the lowpri queue, as we only
+  // have 3 priorities
+  queue.writeWithPriority(5, 50);
+  // unqualified writes should be mid-pri
+  queue.write(3);
+  queue.writeWithPriority(6, 2);
+  queue.writeWithPriority(1, 0);
+  queue.write(4);
+  queue.writeWithPriority(2, 0);
+
+  EXPECT_FALSE(queue.isEmpty());
+  EXPECT_EQ(6, queue.size());
+  EXPECT_EQ(6, queue.sizeGuess());
+
+  size_t item;
+  for (int i = 1; i <= 6; i++) {
+    queue.read(item);
+    EXPECT_EQ(i, item);
+    EXPECT_EQ(6 - i, queue.size());
+    EXPECT_EQ(6 - i, queue.sizeGuess());
+  }
+}