2 * Copyright 2014-present Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #ifndef FOLLY_GEN_PARALLEL_H_
18 #error This file may only be included from folly/gen/ParallelGen.h
21 #include <folly/MPMCQueue.h>
22 #include <folly/ScopeGuard.h>
23 #include <folly/experimental/EventCount.h>
// NOTE(review): this listing appears to be a lossy extract. The leading
// numbers are the original file's line numbers, and the gaps in them
// (e.g. 33 -> 35, 43 -> 48) show that lines are missing, including closing
// braces. The comments below describe only the lines that are present.
//
// ClosableMPMCQueue: a bounded queue (queue_, constructed with `capacity`)
// paired with counts of open producers and consumers plus two EventCounts.
// Writers that find the queue full, and readers that find it empty, can sleep
// and be woken either by activity on the other side or by the last
// producer/consumer closing its end.
// NOTE(review): the template header naming T and the declaration of queue_
// are not visible here. Presumably queue_ is a folly::MPMCQueue<T> -- confirm.
33 class ClosableMPMCQueue {
// Number of parties currently registered as writers / readers.
35 std::atomic<size_t> producers_{0};
36 std::atomic<size_t> consumers_{0};
// Writers wait on wakeProducer_ when a write fails (queue full).
// Readers wait on wakeConsumer_ when a read fails (queue empty).
37 folly::EventCount wakeProducer_;
38 folly::EventCount wakeConsumer_;
// Creates the queue with room for `capacity` elements. No producers or
// consumers are registered yet (both counters start at 0).
41 explicit ClosableMPMCQueue(size_t capacity) : queue_(capacity) {}
// NOTE(review): the destructor body is not visible in this listing.
43 ~ClosableMPMCQueue() {
// Register one more producer / consumer. The callers in this file pair each
// call with a matching closeInputProducer() / closeOutputConsumer().
48 void openProducer() { ++producers_; }
49 void openConsumer() { ++consumers_; }
// Unregister a producer. post-decrement yields the previous count, so the
// producer that takes the count from 1 to 0 wakes every waiting consumer.
// Those consumers can then observe that no more input is coming.
51 void closeInputProducer() {
52 size_t producers = producers_--;
54 if (producers == 1) { // last producer
55 wakeConsumer_.notifyAll();
// Unregister a consumer. The consumer that takes the count from 1 to 0 wakes
// every waiting producer so they can observe that nobody is reading anymore.
59 void closeOutputConsumer() {
60 size_t consumers = consumers_--;
62 if (consumers == 1) { // last consumer
63 wakeProducer_.notifyAll();
// Current number of registered producers / consumers (acquire loads).
67 size_t producers() const {
68 return producers_.load(std::memory_order_acquire);
71 size_t consumers() const {
72 return consumers_.load(std::memory_order_acquire);
// Non-blocking write: tries queue_.write(...) once and, on success, wakes one
// waiting consumer. The failure path is on lines missing from this listing.
75 template <typename... Args>
76 bool writeUnlessFull(Args&&... args) noexcept {
77 if (queue_.write(std::forward<Args>(args)...)) {
78 // wake consumers to pick up new value
79 wakeConsumer_.notify();
// Blocking write: retries writeIfNotFull(), sleeping on wakeProducer_ between
// attempts. It bails out, cancelling the prepared wait, once no consumers
// remain. After a successful write it wakes one waiting consumer.
// NOTE(review): args are re-forwarded on every loop iteration. That is only
// safe if writeIfNotFull() leaves them untouched when it fails -- confirm
// against the underlying queue's contract.
85 template <typename... Args>
86 bool writeUnlessClosed(Args&&... args) {
87 // write if there's room
88 while (!queue_.writeIfNotFull(std::forward<Args>(args)...)) {
89 // if write fails, check if there are still consumers listening
90 auto key = wakeProducer_.prepareWait();
92 // no consumers left; bail out
93 wakeProducer_.cancelWait();
96 wakeProducer_.wait(key);
98 // wake consumers to pick up new value
99 wakeConsumer_.notify();
// Non-blocking read: tries queue_.read(out) once and, on success, wakes one
// waiting producer because a slot was freed.
103 bool readUnlessEmpty(T& out) {
104 if (queue_.read(out)) {
105 // wake producers to fill empty space
106 wakeProducer_.notify();
// Blocking read: retries readIfNotEmpty(out), sleeping on wakeConsumer_
// between attempts. After a successful read it wakes one waiting producer.
// NOTE(review): lines 115 and 118-119 are missing from this listing.
// Presumably they hold the "no producers left" check and the early exit.
112 bool readUnlessClosed(T& out) {
113 while (!queue_.readIfNotEmpty(out)) {
114 auto key = wakeConsumer_.prepareWait();
116 // wake producers to fill empty space
117 wakeProducer_.notify();
120 wakeConsumer_.wait(key);
122 // wake writers blocked by full queue
123 wakeProducer_.notify();
// Sub<Sink>: an operator that applies `sink` to the entire source. The sink's
// result, decayed, is exposed as a single-value generator (SingleCopy).
// NOTE(review): lines 130-132 and 134-138 are missing from this listing.
// The declarations of sink_, Value, Source and Result live there.
128 template <class Sink>
129 class Sub : public Operator<Sub<Sink>> {
// NOTE(review): `sink` is copied into sink_ rather than moved. Consider
// std::move(sink) if Sink can be expensive to copy.
133 explicit Sub(Sink sink) : sink_(sink) {}
139 decltype(std::declval<Sink>().compose(std::declval<Source>())),
140 class Just = SingleCopy<typename std::decay<Result>::type>>
// Runs sink_ over the whole source and wraps the result in Just.
141 Just compose(const GenImpl<Value, Source>& source) const {
142 return Just(source | sink_);
// Parallel<Ops>: an operator that runs the operator pipeline `ops` on
// `threads` worker threads. Inputs reach the workers through one
// ClosableMPMCQueue (InQueue); results come back through another (OutQueue).
// NOTE(review): this listing is missing lines (see the gaps in the leading
// line numbers), including closing braces and several member declarations.
147 class Parallel : public Operator<Parallel<Ops>> {
152 Parallel(Ops ops, size_t threads) : ops_(std::move(ops)), threads_(threads) {}
// Generator produced by composing a source with Parallel. `Output` is the
// value type of `ops` composed over an rvalue of the decayed input. The
// generator yields decayed outputs as rvalues.
157 class InputDecayed = typename std::decay<Input>::type,
159 decltype(std::declval<Ops>().compose(Empty<InputDecayed&&>())),
160 class Output = typename Composed::ValueType,
161 class OutputDecayed = typename std::decay<Output>::type>
162 class Generator : public GenImpl<OutputDecayed&&,
169 const Source source_;
171 const size_t threads_;
172 typedef ClosableMPMCQueue<InputDecayed> InQueue;
173 typedef ClosableMPMCQueue<OutputDecayed> OutQueue;
// Puller: worker-side generator that yields inputs read from InQueue until
// readUnlessClosed() reports failure. apply() also checks the handler's
// result after each value; the early-exit body is on a missing line.
175 class Puller : public GenImpl<InputDecayed&&, Puller> {
179 explicit Puller(InQueue* queue) : queue_(queue) {}
181 template <class Handler>
182 bool apply(Handler&& handler) const {
184 while (queue_->readUnlessClosed(input)) {
185 if (!handler(std::move(input))) {
192 template <class Body>
193 void foreach(Body&& body) const {
195 while (queue_->readUnlessClosed(input)) {
196 body(std::move(input));
// Pusher<all>: worker-side sink that writes each value produced by the
// worker's pipeline into OutQueue.
// - The foreach path writes every value and ignores writeUnlessClosed()'s
//   result.
// - The apply path returns that result to the source, which lets the source
//   stop early.
// NOTE(review): the line choosing between the two paths (210) is missing.
// Presumably it branches on `all` -- confirm.
201 template <bool all = false>
202 class Pusher : public Operator<Pusher<all>> {
206 explicit Pusher(OutQueue* queue) : queue_(queue) {}
208 template <class Value, class InnerSource>
209 void compose(const GenImpl<Value, InnerSource>& source) const {
211 source.self().foreach([&](Value value) {
212 queue_->writeUnlessClosed(std::forward<Value>(value));
215 source.self().apply([&](Value value) {
216 return queue_->writeUnlessClosed(std::forward<Value>(value));
// Executor<all>: owns the input/output queues and the worker threads used by
// one apply() (Executor<false>) or foreach() (Executor<true>) call.
222 template <bool all = false>
228 std::vector<std::thread> workers_;
// Worker body: pull inputs, run them through *ops_, push the outputs.
232 puller_ | *ops_ | pusher_;
// Both queues get capacity threads * 4.
// - The constructing thread registers as the input producer and the output
//   consumer.
// - Each worker is registered as an input consumer and an output producer
//   before its std::thread is created.
236 Executor(size_t threads, const Ops* ops)
237 : inQueue_(threads * 4),
238 outQueue_(threads * 4),
242 inQueue_.openProducer();
243 outQueue_.openConsumer();
244 for (size_t t = 0; t < threads; ++t) {
245 inQueue_.openConsumer();
246 outQueue_.openProducer();
247 workers_.emplace_back([this] {
// The worker releases its consumer/producer registrations.
// NOTE(review): line 248 is missing. Since ScopeGuard.h is included, this is
// presumably a SCOPE_EXIT guard, so it runs even if the pipeline throws --
// confirm.
249 inQueue_.closeOutputConsumer();
250 outQueue_.closeInputProducer();
// Teardown (the enclosing signature is on a missing line):
// - close the input-producer / output-consumer registrations if any remain;
// - join the workers;
// - CHECK that no worker is still registered on either queue.
258 if (inQueue_.producers()) {
259 inQueue_.closeInputProducer();
261 if (outQueue_.consumers()) {
262 outQueue_.closeOutputConsumer();
264 while (!workers_.empty()) {
265 workers_.back().join();
268 CHECK(!inQueue_.consumers());
269 CHECK(!outQueue_.producers());
// Forwarders used by the calling thread. Writes go to the input queue; reads
// come from the output queue.
272 void closeInputProducer() { inQueue_.closeInputProducer(); }
274 void closeOutputConsumer() { outQueue_.closeOutputConsumer(); }
276 bool writeUnlessClosed(Input&& input) {
277 return inQueue_.writeUnlessClosed(std::forward<Input>(input));
280 bool writeUnlessFull(Input&& input) {
281 return inQueue_.writeUnlessFull(std::forward<Input>(input));
284 bool readUnlessClosed(OutputDecayed& output) {
285 return outQueue_.readUnlessClosed(output);
288 bool readUnlessEmpty(OutputDecayed& output) {
289 return outQueue_.readUnlessEmpty(output);
// NOTE(review): lines 297-299 are missing. The visible tail falls back to
// max(1, sysconf(_SC_NPROCESSORS_CONF)) threads, presumably when `threads`
// is 0 -- confirm.
294 Generator(Source source, Ops ops, size_t threads)
295 : source_(std::move(source)),
296 ops_(std::move(ops)),
300 : size_t(std::max<long>(1, sysconf(_SC_NPROCESSORS_CONF)))) {}
// Feeds every source value to the workers and passes results to `handler`.
// 1. Each input is first offered with writeUnlessFull().
// 2. The visible fallback drains already-available outputs to `handler`,
//    then blocks in writeUnlessClosed().
// 3. After the source finishes, the input side is closed and the remaining
//    outputs are read until the output queue closes.
// 4. Finally the output consumer is closed.
// NOTE(review): the early-exit lines after the write/handler checks are
// missing from this listing. `input` may be forwarded twice (lines 307 and
// 317), which relies on a failed writeUnlessFull() not consuming it.
302 template <class Handler>
303 bool apply(Handler&& handler) const {
304 Executor<false> executor(threads_, &ops_);
306 source_.apply([&](Input input) {
307 if (executor.writeUnlessFull(std::forward<Input>(input))) {
310 OutputDecayed output;
311 while (executor.readUnlessEmpty(output)) {
312 if (!handler(std::move(output))) {
317 if (!executor.writeUnlessClosed(std::forward<Input>(input))) {
322 executor.closeInputProducer();
325 OutputDecayed output;
326 while (executor.readUnlessClosed(output)) {
327 if (!handler(std::move(output))) {
333 executor.closeOutputConsumer();
// Same flow as apply(), but without early termination.
// - Uses Executor<true>.
// - Every output is passed to `body`.
// - The blocking write is CHECKed to succeed.
// CHECK (not DCHECK) is used, so the write is evaluated in all builds.
338 template <class Body>
339 void foreach(Body&& body) const {
340 Executor<true> executor(threads_, &ops_);
341 source_.foreach([&](Input input) {
342 if (executor.writeUnlessFull(std::forward<Input>(input))) {
345 OutputDecayed output;
346 while (executor.readUnlessEmpty(output)) {
347 body(std::move(output));
349 CHECK(executor.writeUnlessClosed(std::forward<Input>(input)));
351 executor.closeInputProducer();
353 OutputDecayed output;
354 while (executor.readUnlessClosed(output)) {
355 body(std::move(output));
357 executor.closeOutputConsumer();
// Composes this operator with a source. This overload copies the source.
361 template <class Value, class Source>
362 Generator<Value, Source> compose(const GenImpl<Value, Source>& source) const {
363 return Generator<Value, Source>(source.self(), ops_, threads_);
// Overload for an rvalue source: moves the source into the Generator.
366 template <class Value, class Source>
367 Generator<Value, Source> compose(GenImpl<Value, Source>&& source) const {
368 return Generator<Value, Source>(std::move(source.self()), ops_, threads_);
373 * ChunkedRangeSource - For slicing up ranges into a sequence of chunks given a
374 * maximum chunk size.
376 * Usually used through the 'chunked' helper, like:
380 * | parallel // each thread processes a chunk
381 * | concat // but can still process values one at a time
// Generator over the chunks of range_. Each chunk is a RangeSource over at
// most chunkSize_ consecutive elements; the last chunk may be shorter.
// NOTE(review): lines 388 (presumably the chunkSize_ member) and 404-410 are
// missing from this listing.
385 template <class Iterator>
386 class ChunkedRangeSource
387 : public GenImpl<RangeSource<Iterator>&&, ChunkedRangeSource<Iterator>> {
389 Range<Iterator> range_;
392 ChunkedRangeSource() = default;
// NOTE(review): chunkSize is an `int` and is not validated. With a chunkSize
// of 0, subpiece(0, 0) yields an empty chunk and advance(0) makes no
// progress, so apply() would loop forever (assuming subpiece(0, 0) is empty).
// A negative value would convert to a very large length when passed to
// subpiece() -- consider rejecting chunkSize <= 0.
393 ChunkedRangeSource(int chunkSize, Range<Iterator> range)
394 : chunkSize_(chunkSize), range_(std::move(range)) {}
// Walks a copy of range_, handing each chunk to `handler` as an rvalue
// RangeSource. The handler's result is checked after each chunk; the
// early-exit body is on a missing line.
396 template <class Handler>
397 bool apply(Handler&& handler) const {
398 auto remaining = range_;
399 while (!remaining.empty()) {
400 auto chunk = remaining.subpiece(0, chunkSize_);
401 remaining.advance(chunk.size());
402 auto gen = RangeSource<Iterator>(chunk);
403 if (!handler(std::move(gen))) {
411 } // namespace detail