2 * Copyright 2014 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #ifndef FOLLY_GEN_PARALLEL_H_
18 #error This file may only be included from folly/gen/ParallelGen.h
21 #include <folly/MPMCQueue.h>
22 #include <folly/ScopeGuard.h>
23 #include <folly/experimental/EventCount.h>
// Queue wrapper that, in addition to the underlying queue_, keeps a count of
// open producers and open consumers and owns two EventCounts used by the
// blocking read/write members to sleep and be woken.
33 class ClosableMPMCQueue {
// Number of currently open producers / consumers. Incremented by
// openProducer()/openConsumer() and decremented by the close*() members.
35 std::atomic<size_t> producers_{0};
36 std::atomic<size_t> consumers_{0};
// wakeProducer_ is waited on by blocked writers and notified by readers;
// wakeConsumer_ is waited on by blocked readers and notified by writers.
37 folly::EventCount wakeProducer_;
38 folly::EventCount wakeConsumer_;
// Forwards `capacity` to the constructor of the underlying queue_.
41 explicit ClosableMPMCQueue(size_t capacity) : queue_(capacity) {}
// NOTE(review): the destructor body is not visible in this excerpt.
43 ~ClosableMPMCQueue() {
// Register one additional producer / consumer. Each call is expected to be
// paired with closeInputProducer() / closeOutputConsumer() respectively.
48 void openProducer() { ++producers_; }
49 void openConsumer() { ++consumers_; }
// Unregisters one producer. When the producer that just closed was the last
// one, every waiting consumer is woken so it can observe that no more input
// will arrive.
51 void closeInputProducer() {
// Post-decrement: `producers` holds the count *before* this close.
// NOTE(review): the counter is size_t but is stored in an int64_t here.
52 int64_t producers = producers_--;
54 if (producers == 1) { // last producer
55 wakeConsumer_.notifyAll();
// Unregisters one consumer. When the consumer that just closed was the last
// one, every waiting producer is woken so it can observe that nobody is
// reading any more.
59 void closeOutputConsumer() {
// Post-decrement: `consumers` holds the count *before* this close.
60 int64_t consumers = consumers_--;
62 if (consumers == 1) { // last consumer
63 wakeProducer_.notifyAll();
// Returns the current number of open producers (acquire load).
67 size_t producers() const {
68 return producers_.load(std::memory_order_acquire);
// Returns the current number of open consumers (acquire load).
71 size_t consumers() const {
72 return consumers_.load(std::memory_order_acquire);
// Attempts a single queue_.write() with the forwarded arguments and, when that
// call reports success, notifies one waiter on wakeConsumer_.
// NOTE(review): the return statements are not visible in this excerpt;
// presumably the result of queue_.write() is what gets returned - confirm.
75 template <typename... Args>
76 bool writeUnlessFull(Args&&... args) noexcept {
77 if (queue_.write(std::forward<Args>(args)...)) {
78 // wake consumers to pick up new value
79 wakeConsumer_.notify();
85 template <typename... Args>
86 bool writeUnlessClosed(Args&&... args) {
87 // write if there's room
88 while (!queue_.writeIfNotFull(std::forward<Args>(args)...)) {
89 // if write fails, check if there are still consumers listening
90 auto key = wakeProducer_.prepareWait();
92 // no consumers left; bail out
93 wakeProducer_.cancelWait();
96 wakeProducer_.wait(key);
98 // wake consumers to pick up new value
99 wakeConsumer_.notify();
// Attempts a single queue_.read() into `out` and, when that call reports
// success, notifies one waiter on wakeProducer_.
// NOTE(review): the return statements are not visible in this excerpt;
// presumably the result of queue_.read() is what gets returned - confirm.
103 bool readUnlessEmpty(T& out) {
104 if (queue_.read(out)) {
105 // wake producers to fill empty space
106 wakeProducer_.notify();
112 bool readUnlessClosed(T& out) {
113 while (!queue_.readIfNotEmpty(out)) {
114 auto key = wakeConsumer_.prepareWait();
116 // wake producers to fill empty space
117 wakeProducer_.notify();
120 wakeConsumer_.wait(key);
122 // wake writers blocked by full queue
123 wakeProducer_.notify();
// Operator that wraps a sink: when composed with a source it runs the source
// through the stored sink (`source | sink_`) and wraps the outcome in a Just.
128 template <class Sink>
129 class Sub : public Operator<Sub<Sink>> {
// Stores a copy of `sink` for later use by compose().
133 explicit Sub(Sink sink) : sink_(sink) {}
// NOTE(review): part of this template parameter list is not visible in this
// excerpt. From the visible lines, Result is the type produced by composing
// Sink with Source, and Just is instantiated on its decayed form.
135 template <class Value,
138 decltype(std::declval<Sink>().compose(std::declval<Source>())),
139 class Just = Just<typename std::decay<Result>::type>>
140 Just compose(const GenImpl<Value, Source>& source) const {
141 return Just(source | sink_);
// Operator holding a pipeline of operations (`Ops`) and a requested thread
// count; its compose() overloads build a Generator from a source.
146 class Parallel : public Operator<Parallel<Ops>> {
// Takes ownership of `ops` and records the requested number of threads.
151 Parallel(Ops ops, size_t threads) : ops_(std::move(ops)), threads_(threads) {}
// Generator template parameters (some lines are not visible in this excerpt):
//   Input / InputDecayed   - value type of the upstream source and its decayed form
//   Composed               - result of composing Ops with an empty source of
//                            InputDecayed&& (used only to deduce Output)
//   Output / OutputDecayed - value type produced by Ops and its decayed form
153 template <class Input,
155 class InputDecayed = typename std::decay<Input>::type,
157 decltype(std::declval<Ops>().compose(Empty<InputDecayed&&>())),
158 class Output = typename Composed::ValueType,
159 class OutputDecayed = typename std::decay<Output>::type>
160 class Generator : public GenImpl<OutputDecayed&&,
// Upstream source the generator reads its inputs from.
167 const Source source_;
// Thread count handed to each Executor created by apply()/foreach().
169 const size_t threads_;
// Queue carrying inputs to the workers, and queue carrying their outputs back.
170 typedef ClosableMPMCQueue<InputDecayed> InQueue;
171 typedef ClosableMPMCQueue<OutputDecayed> OutQueue;
// Generator that yields values read from an InQueue until readUnlessClosed()
// reports that nothing more will arrive.
173 class Puller : public GenImpl<InputDecayed&&, Puller> {
// Keeps a non-owning pointer to the queue to read from.
177 explicit Puller(InQueue* queue) : queue_(queue) {}
// Passes each value read from the queue to `handler` (moved); the visible
// branch reacts to the handler returning false.
// NOTE(review): the declaration of `input` and the return statements are not
// visible in this excerpt.
179 template <class Handler>
180 bool apply(Handler&& handler) const {
182 while (queue_->readUnlessClosed(input)) {
183 if (!handler(std::move(input))) {
// Passes every value read from the queue to `body` (moved), ignoring any
// result of `body`.
190 template <class Body>
191 void foreach(Body&& body) const {
193 while (queue_->readUnlessClosed(input)) {
194 body(std::move(input));
// Sink operator that writes every value produced by a source into an OutQueue.
199 template <bool all = false>
200 class Pusher : public Operator<Pusher<all>> {
// Keeps a non-owning pointer to the queue to write to.
204 explicit Pusher(OutQueue* queue) : queue_(queue) {}
// Two paths are visible below:
//   - foreach(): writes each value and ignores the result of the write;
//   - apply():   returns the result of the write to the source, so iteration
//                can stop when writeUnlessClosed() reports false.
// NOTE(review): the line choosing between them is not visible in this
// excerpt; presumably it tests the `all` template parameter - confirm.
206 template <class Value, class InnerSource>
207 void compose(const GenImpl<Value, InnerSource>& source) const {
209 source.self().foreach([&](Value value) {
210 queue_->writeUnlessClosed(std::forward<Value>(value));
213 source.self().apply([&](Value value) {
214 return queue_->writeUnlessClosed(std::forward<Value>(value));
// Owns the input and output queues plus the worker threads that move values
// from one to the other through the operation pipeline.
// NOTE(review): the class header and several member declarations are not
// visible in this excerpt.
220 template <bool all = false>
// Worker threads started by the constructor and joined further below.
226 std::vector<std::thread> workers_;
// Work performed by a worker: values pulled by puller_ are piped through *ops_
// and into pusher_.
230 puller_ | *ops_ | pusher_;
// Sizes both queues at four slots per requested thread, registers one input
// producer and one output consumer, then starts `threads` workers.
234 Executor(size_t threads, const Ops* ops)
235 : inQueue_(threads * 4),
236 outQueue_(threads * 4),
240 inQueue_.openProducer();
241 outQueue_.openConsumer();
242 for (size_t t = 0; t < threads; ++t) {
// Each worker is registered as an input consumer and an output producer
// before its thread is launched.
243 inQueue_.openConsumer();
244 outQueue_.openProducer();
// The lambda captures `this`, so the Executor must outlive its workers.
245 workers_.emplace_back([this] {
// NOTE(review): these two calls appear inside the worker lambda; the
// surrounding lines (e.g. a scope guard) are not visible - confirm.
247 inQueue_.closeOutputConsumer();
248 outQueue_.closeInputProducer();
// NOTE(review): the header of the enclosing function is not visible here;
// presumably this is the destructor. The input producer and output consumer
// are closed if still open, every worker is joined, and the code then CHECKs
// that no input consumer and no output producer remains registered.
256 if (inQueue_.producers()) {
257 inQueue_.closeInputProducer();
259 if (outQueue_.consumers()) {
260 outQueue_.closeOutputConsumer();
262 while (!workers_.empty()) {
263 workers_.back().join();
266 CHECK(!inQueue_.consumers());
267 CHECK(!outQueue_.producers());
// Thin forwarders to the input queue (close/write) and the output queue
// (close/read).
270 void closeInputProducer() { inQueue_.closeInputProducer(); }
272 void closeOutputConsumer() { outQueue_.closeOutputConsumer(); }
274 bool writeUnlessClosed(Input&& input) {
275 return inQueue_.writeUnlessClosed(std::forward<Input>(input));
278 bool writeUnlessFull(Input&& input) {
279 return inQueue_.writeUnlessFull(std::forward<Input>(input));
282 bool readUnlessClosed(OutputDecayed& output) {
283 return outQueue_.readUnlessClosed(output);
286 bool readUnlessEmpty(OutputDecayed& output) {
287 return outQueue_.readUnlessEmpty(output);
292 Generator(Source source, Ops ops, size_t threads)
293 : source_(std::move(source)),
294 ops_(std::move(ops)),
296 ?: std::max<size_t>(1, sysconf(_SC_NPROCESSORS_CONF))) {}
// Runs the pipeline with early-exit support: inputs from source_ are fed to an
// Executor<false>, and outputs are handed to `handler` (moved).
// NOTE(review): several lines (returns, closing braces, any flags) are not
// visible in this excerpt, so the exact exit behavior cannot be stated here.
298 template <class Handler>
299 bool apply(Handler&& handler) const {
// The executor receives a pointer to ops_, which must stay valid for the call.
300 Executor<false> executor(threads_, &ops_);
302 source_.apply([&](Input input) {
// First try a non-blocking write of the input.
303 if (executor.writeUnlessFull(std::forward<Input>(input))) {
// Drain whatever outputs are already available without blocking, passing each
// to the handler.
306 OutputDecayed output;
307 while (executor.readUnlessEmpty(output)) {
308 if (!handler(std::move(output))) {
// Then write the input with the blocking variant; a false result means the
// write was refused.
313 if (!executor.writeUnlessClosed(std::forward<Input>(input))) {
// The source has been consumed (or iteration stopped): no more input follows.
318 executor.closeInputProducer();
// Read the remaining outputs, blocking until readUnlessClosed() reports false.
321 OutputDecayed output;
322 while (executor.readUnlessClosed(output)) {
323 if (!handler(std::move(output))) {
329 executor.closeOutputConsumer();
// Runs the pipeline over every input: inputs from source_ are fed to an
// Executor<true> and every output is handed to `body` (moved).
// NOTE(review): some lines (returns, closing braces) are not visible in this
// excerpt.
334 template <class Body>
335 void foreach(Body&& body) const {
// The executor receives a pointer to ops_, which must stay valid for the call.
336 Executor<true> executor(threads_, &ops_);
337 source_.foreach([&](Input input) {
// First try a non-blocking write of the input.
338 if (executor.writeUnlessFull(std::forward<Input>(input))) {
// Drain whatever outputs are already available without blocking.
341 OutputDecayed output;
342 while (executor.readUnlessEmpty(output)) {
343 body(std::move(output));
// Then write with the blocking variant; a refused write is treated as fatal.
// NOTE(review): the write is an argument of CHECK - confirm that CHECK always
// evaluates its argument in every build configuration.
345 CHECK(executor.writeUnlessClosed(std::forward<Input>(input)));
// All inputs have been submitted.
347 executor.closeInputProducer();
// Read the remaining outputs, blocking until readUnlessClosed() reports false.
349 OutputDecayed output;
350 while (executor.readUnlessClosed(output)) {
351 body(std::move(output));
353 executor.closeOutputConsumer();
// Builds a Generator over a copy of `source`, passing along this operator's
// ops_ and threads_.
357 template <class Value, class Source>
358 Generator<Value, Source> compose(const GenImpl<Value, Source>& source) const {
359 return Generator<Value, Source>(source.self(), ops_, threads_);
// Rvalue overload: same as above, but the source is moved into the Generator
// instead of copied.
362 template <class Value, class Source>
363 Generator<Value, Source> compose(GenImpl<Value, Source>&& source) const {
364 return Generator<Value, Source>(std::move(source.self()), ops_, threads_);
369 * ChunkedRangeSource - For slicing up ranges into a sequence of chunks given a
370 * maximum chunk size.
372 * Usually used through the 'chunked' helper, like:
376 * | parallel // each thread processes a chunk
377 * | concat // but can still process values one at a time
// Generator that yields a RangeSource for each consecutive piece of range_,
// each piece being at most chunkSize_ elements long.
381 template <class Iterator>
382 class ChunkedRangeSource
383 : public GenImpl<RangeSource<Iterator>&&, ChunkedRangeSource<Iterator>> {
// The range to be cut into chunks.
385 Range<Iterator> range_;
// NOTE(review): this constructor does not mention chunkSize_; unless its
// declaration (not visible in this excerpt) has a default initializer, the
// member is left uninitialized - confirm.
388 ChunkedRangeSource() {}
// chunkSize: maximum number of elements per chunk; range: the range to slice.
// NOTE(review): chunkSize is a signed int and is not validated here.
389 ChunkedRangeSource(int chunkSize, Range<Iterator> range)
390 : chunkSize_(chunkSize), range_(std::move(range)) {}
// Walks a copy of range_, handing one RangeSource per chunk to `handler`; the
// visible branch reacts to the handler returning false.
392 template <class Handler>
393 bool apply(Handler&& handler) const {
394 auto remaining = range_;
395 while (!remaining.empty()) {
// Take up to chunkSize_ elements from the front, then drop exactly that many.
// NOTE(review): if chunkSize_ is 0 the chunk is empty and `remaining` never
// shrinks, so this loop would not terminate - confirm callers never pass 0.
396 auto chunk = remaining.subpiece(0, chunkSize_);
397 remaining.advance(chunk.size());
398 auto gen = RangeSource<Iterator>(chunk);
399 if (!handler(std::move(gen))) {
407 } // namespace detail