2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
18 #include <glog/logging.h>
22 #include <folly/MPMCQueue.h>
26 /// PriorityMPMCQueue is a thin wrapper on MPMCQueue, providing priorities
27 /// by managing multiple underlying MPMCQueues. As of now, this does
28 /// not implement a blocking interface. For the purposes of this
29 /// class, a lower number means a higher priority.
33 template <typename> class Atom = std::atomic,
35 class PriorityMPMCQueue {
37 PriorityMPMCQueue(size_t numPriorities, size_t capacity) {
38 CHECK_GT(numPriorities, 0);
39 queues_.reserve(numPriorities);
40 for (size_t i = 0; i < numPriorities; i++) {
41 queues_.emplace_back(capacity);
45 size_t getNumPriorities() {
46 return queues_.size();
49 // Add at medium priority by default
50 bool write(T&& item) {
51 return writeWithPriority(std::move(item), getNumPriorities() / 2);
54 bool writeWithPriority(T&& item, size_t priority) {
55 size_t queue = std::min(getNumPriorities() - 1, priority);
56 CHECK_LT(queue, queues_.size());
57 return queues_.at(queue).write(std::move(item));
// Visit the underlying queues in vector order, so lower-numbered (higher
// priority) queues are tried first, and attempt readIfNotEmpty() on each.
61 for (auto& q : queues_) {
62 if (q.readIfNotEmpty(item)) {
69 bool readWithPriority(T& item, size_t priority) {
70 return queues_[priority].readIfNotEmpty(item);
// Accumulate the size() of every underlying queue into one total.
74 size_t total_size = 0;
75 for (auto& q : queues_) {
76 // MPMCQueue can have a negative size if there are pending readers.
77 // Since we don't expose a blocking interface this shouldn't happen,
78 // but just in case, each queue's contribution is floored at 0.
79 total_size += std::max<ssize_t>(0, q.size());
/// Sums sizeGuess() over every underlying queue, counting a negative
/// per-queue value as 0.
84 size_t sizeGuess() const {
85 size_t total_size = 0;
86 for (auto& q : queues_) {
87 // MPMCQueue can have a negative size if there are pending readers.
88 // Since we don't expose a blocking interface this shouldn't happen,
89 // but just in case, each queue's contribution is floored at 0.
90 total_size += std::max<ssize_t>(0, q.sizeGuess());
95 /// Returns true if there are no items available for dequeue.
96 bool isEmpty() const {
// One underlying MPMCQueue per priority level, indexed by priority
// (index 0 is the highest priority).
101 std::vector<folly::MPMCQueue<T, Atom, Dynamic>> queues_;