/*
 * Copyright 2014 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17 #ifndef FOLLY_GEN_PARALLELMAP_H
18 #error This file may only be included from folly/gen/ParallelMap.h
24 #include <type_traits>
28 #include "folly/MPMCPipeline.h"
29 #include "folly/experimental/EventCount.h"
31 namespace folly { namespace gen { namespace detail {
/**
 * PMap - Map in parallel (using threads). For producing a sequence of
 * values by passing each value from a source collection through a
 * predicate while running the predicate in parallel in different
 * threads.
 *
 * This type is usually used through the 'pmap' helper function:
 *
 *   auto squares = seq(1, 10) | pmap(4, fibonacci) | sum;
 */
43 template<class Predicate>
44 class PMap : public Operator<PMap<Predicate>> {
50 PMap(Predicate pred, size_t nThreads)
51 : pred_(std::move(pred)),
52 nThreads_(nThreads) { }
56 class Input = typename std::decay<Value>::type,
57 class Output = typename std::decay<
58 typename std::result_of<Predicate(Value)>::type
61 public GenImpl<Output, Generator<Value, Source, Input, Output>> {
64 const size_t nThreads_;
66 class ExecutionPipeline {
67 std::vector<std::thread> workers_;
68 std::atomic<bool> done_{false};
69 const Predicate& pred_;
70 MPMCPipeline<Input, Output> pipeline_;
74 ExecutionPipeline(const Predicate& pred, size_t nThreads)
76 pipeline_(nThreads, nThreads) {
77 workers_.reserve(nThreads);
78 for (int i = 0; i < nThreads; i++) {
79 workers_.push_back(std::thread([this] { this->predApplier(); }));
83 ~ExecutionPipeline() {
84 assert(pipeline_.sizeGuess() == 0);
86 for (auto& w : workers_) { w.join(); }
90 // prevent workers from consuming more than we produce.
91 done_.store(true, std::memory_order_release);
95 bool write(Value&& value) {
96 bool wrote = pipeline_.write(std::forward<Value>(value));
103 void blockingWrite(Value&& value) {
104 pipeline_.blockingWrite(std::forward<Value>(value));
108 bool read(Output& out) {
109 return pipeline_.read(out);
112 void blockingRead(Output& out) {
113 pipeline_.blockingRead(out);
118 // Each thread takes a value from the pipeline_, runs the
119 // predicate and enqueues the result. The pipeline preserves
120 // ordering. NOTE: don't use blockingReadStage<0> to read from
121 // the pipeline_ as there may not be any: end-of-data is signaled
122 // separately using done_/wake_.
125 auto key = wake_.prepareWait();
127 typename MPMCPipeline<Input, Output>::template Ticket<0> ticket;
128 if (pipeline_.template readStage<0>(ticket, in)) {
130 Output out = pred_(std::move(in));
131 pipeline_.template blockingWriteStage<0>(ticket,
136 if (done_.load(std::memory_order_acquire)) {
141 // Not done_, but no items in the queue.
148 Generator(Source source, const Predicate& pred, size_t nThreads)
149 : source_(std::move(source)),
151 nThreads_(nThreads ?: sysconf(_SC_NPROCESSORS_ONLN)) {
155 void foreach(Body&& body) const {
156 ExecutionPipeline pipeline(pred_, nThreads_);
160 source_.foreach([&](Value value) {
161 if (pipeline.write(std::forward<Value>(value))) {
162 // input queue not yet full, saturate it before we process
163 // anything downstream
168 // input queue full; drain ready items from the queue
170 while (pipeline.read(out)) {
172 body(std::move(out));
175 // write the value we were going to write before we made room.
176 pipeline.blockingWrite(std::forward<Value>(value));
182 // flush the output queue
183 while (read < wrote) {
185 pipeline.blockingRead(out);
187 body(std::move(out));
191 template<class Handler>
192 bool apply(Handler&& handler) const {
193 ExecutionPipeline pipeline(pred_, nThreads_);
198 source_.apply([&](Value value) {
199 if (pipeline.write(std::forward<Value>(value))) {
200 // input queue not yet full, saturate it before we process
201 // anything downstream
206 // input queue full; drain ready items from the queue
208 while (pipeline.read(out)) {
210 if (!handler(std::move(out))) {
216 // write the value we were going to write before we made room.
217 pipeline.blockingWrite(std::forward<Value>(value));
224 // flush the output queue
225 while (read < wrote) {
227 pipeline.blockingRead(out);
230 more = more && handler(std::move(out));
236 static constexpr bool infinite = Source::infinite;
239 template<class Source,
241 class Gen = Generator<Value, Source>>
242 Gen compose(GenImpl<Value, Source>&& source) const {
243 return Gen(std::move(source.self()), pred_, nThreads_);
246 template<class Source,
248 class Gen = Generator<Value, Source>>
249 Gen compose(const GenImpl<Value, Source>& source) const {
250 return Gen(source.self(), pred_, nThreads_);