2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
18 #include <folly/wangle/concurrent/BlockingQueue.h>
19 #include <folly/LifoSem.h>
20 #include <folly/MPMCQueue.h>
22 namespace folly { namespace wangle {
25 class PriorityLifoSemMPMCQueue : public BlockingQueue<T> {
27 explicit PriorityLifoSemMPMCQueue(uint8_t numPriorities, size_t capacity) {
28 queues_.reserve(numPriorities);
29 for (int8_t i = 0; i < numPriorities; i++) {
30 queues_.push_back(MPMCQueue<T>(capacity));
34 uint8_t getNumPriorities() override {
35 return queues_.size();
38 // Add at medium priority by default
39 void add(T item) override {
40 addWithPriority(std::move(item), Executor::MID_PRI);
43 void addWithPriority(T item, int8_t priority) override {
44 int mid = getNumPriorities() / 2;
45 size_t queue = priority < 0 ?
46 std::max(0, mid + priority) :
47 std::min(getNumPriorities() - 1, mid + priority);
48 CHECK(queue < queues_.size());
49 if (!queues_[queue].write(std::move(item))) {
50 throw std::runtime_error("LifoSemMPMCQueue full, can't add item");
58 for (auto it = queues_.rbegin(); it != queues_.rend(); it++) {
67 size_t size() override {
69 for (auto& q : queues_) {
77 std::vector<MPMCQueue<T>> queues_;