2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
25 #include <folly/detail/Futex.h>
26 #include <folly/experimental/flat_combining/FlatCombining.h>
27 #include <glog/logging.h>
31 /// Thread-safe priority queue based on flat combining. If the
32 /// constructor parameter maxSize is greater than 0 (default = 0),
33 /// then the queue is bounded. This template provides blocking,
34 /// non-blocking, and timed variants of each of push(), pop(), and
35 /// peek() operations. The empty() and size() functions are inherently
38 /// PriorityQueue must support the interface of std::priority_queue,
39 /// specifically empty(), size(), push(), top(), and pop(). Mutex
40 /// must meet the standard Lockable requirements.
42 /// By default FlatCombining uses a dedicated combiner thread, which
43 /// yields better latency and throughput under high contention but
44 /// higher overheads under low contention. If the constructor
45 /// parameter dedicated is false, then there will be no dedicated
46 /// combiner thread and any requester may do combining of operations
47 /// requested by other threads. For more details see the comments for
52 /// FlatCombiningPriorityQueue<int> pq(1);
53 /// CHECK(pq.empty());
54 /// CHECK(pq.size() == 0);
56 /// CHECK(!tryPop(v));
57 /// CHECK(!tryPop(v, now() + seconds(1)));
58 /// CHECK(!tryPeek(v));
59 /// CHECK(!tryPeek(v, now() + seconds(1)));
61 /// CHECK(!pq.empty());
62 /// CHECK(pq.size() == 1);
63 /// CHECK(!pq.tryPush(20));
64 /// CHECK(!pq.tryPush(20), now() + seconds(1)));
67 /// CHECK(pq.size() == 1);
70 /// CHECK(pq.empty());
75 typename PriorityQueue = std::priority_queue<T>,
76 typename Mutex = std::mutex,
77 template <typename> class Atom = std::atomic>
78 class FlatCombiningPriorityQueue
79 : public folly::FlatCombining<
80 FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>,
83 using FCPQ = FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>;
84 using FC = folly::FlatCombining<FCPQ, Mutex, Atom>;
89 typename = decltype(PriorityQueue(std::declval<PQArgs>()...))>
90 explicit FlatCombiningPriorityQueue(
91 // Concurrent priority queue parameter
92 const size_t maxSize = 0,
93 // Flat combining parameters
94 const bool dedicated = true,
95 const uint32_t numRecs = 0,
96 const uint32_t maxOps = 0,
97 // (Sequential) PriorityQueue Parameters
99 : FC(dedicated, numRecs, maxOps),
101 pq_(std::forward<PQArgs>(args)...) {}
103 /// Returns true iff the priority queue is empty
106 auto fn = [&] { res = pq_.empty(); };
107 const_cast<FCPQ*>(this)->requestFC(fn);
111 /// Returns the number of items in the priority queue
112 size_t size() const {
114 auto fn = [&] { res = pq_.size(); };
115 const_cast<FCPQ*>(this)->requestFC(fn);
119 /// Non-blocking push. Succeeds if there is space in the priority
120 /// queue to insert the new item. Tries once if no time point is
121 /// provided or until the provided time_point is reached. If
122 /// successful, inserts the provided item in the priority queue
123 /// according to its priority.
124 template <class Clock = std::chrono::steady_clock>
127 const std::chrono::time_point<Clock>& when =
128 std::chrono::time_point<Clock>::min());
130 /// Non-blocking pop. Succeeds if the priority queue is
131 /// nonempty. Tries once if no time point is provided or until the
132 /// provided time_point is reached. If successful, copies the
133 /// highest priority item and removes it from the priority queue.
134 template <class Clock = std::chrono::steady_clock>
137 const std::chrono::time_point<Clock>& when =
138 std::chrono::time_point<Clock>::min());
140 /// Non-blocking peek. Succeeds if the priority queue is
141 /// nonempty. Tries once if no time point is provided or until the
142 /// provided time_point is reached. If successful, copies the
143 /// highest priority item without removing it.
144 template <class Clock = std::chrono::steady_clock>
147 const std::chrono::time_point<Clock>& when =
148 std::chrono::time_point<Clock>::min());
150 /// Blocking push. Inserts the provided item in the priority
151 /// queue. If it is full, this function blocks until there is space
152 /// for the new item.
153 void push(const T& val) {
154 tryPush(val, std::chrono::time_point<std::chrono::steady_clock>::max());
157 /// Blocking pop. Copies the highest priority item and removes
158 /// it. If the priority queue is empty, this function blocks until
161 tryPop(val, std::chrono::time_point<std::chrono::steady_clock>::max());
164 /// Blocking peek. Copies the highest priority item without
165 /// removing it. If the priority queue is empty, this function
166 /// blocks until it is nonempty.
168 tryPeek(val, std::chrono::time_point<std::chrono::steady_clock>::max());
174 detail::Futex<Atom> empty_;
175 detail::Futex<Atom> full_;
177 bool isTrue(detail::Futex<Atom>& futex) {
178 return futex.load(std::memory_order_relaxed) != 0;
181 void setFutex(detail::Futex<Atom>& futex, uint32_t val) {
182 futex.store(val, std::memory_order_relaxed);
185 bool futexSignal(detail::Futex<Atom>& futex) {
199 typename PriorityQueue,
201 template <typename> class Atom>
202 template <class Clock>
203 inline bool FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>::tryPush(
205 const std::chrono::time_point<Clock>& when) {
211 if (maxSize_ > 0 && pq_.size() == maxSize_) {
216 DCHECK(maxSize_ == 0 || pq_.size() < maxSize_);
219 wake = futexSignal(empty_);
222 } catch (const std::bad_alloc&) {
236 if (when == std::chrono::time_point<Clock>::min()) {
239 while (isTrue(full_)) {
240 if (when == std::chrono::time_point<Clock>::max()) {
243 if (Clock::now() > when) {
246 full_.futexWaitUntil(1, when);
249 } // inner while loop
250 } // outer while loop
255 typename PriorityQueue,
257 template <typename> class Atom>
258 template <class Clock>
259 inline bool FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>::tryPop(
261 const std::chrono::time_point<Clock>& when) {
271 wake = futexSignal(full_);
284 while (isTrue(empty_)) {
285 if (when == std::chrono::time_point<Clock>::max()) {
288 if (Clock::now() > when) {
291 empty_.futexWaitUntil(1, when);
294 } // inner while loop
295 } // outer while loop
300 typename PriorityQueue,
302 template <typename> class Atom>
303 template <class Clock>
304 inline bool FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>::tryPeek(
306 const std::chrono::time_point<Clock>& when) {
323 while (isTrue(empty_)) {
324 if (when == std::chrono::time_point<Clock>::max()) {
327 if (Clock::now() > when) {
330 empty_.futexWaitUntil(1, when);
333 } // inner while loop
334 } // outer while loop