2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #ifndef FOLLY_GEN_PARALLELMAP_H_
18 #error This file may only be included from folly/gen/ParallelMap.h
24 #include <type_traits>
28 #include <folly/MPMCPipeline.h>
29 #include <folly/experimental/EventCount.h>
36 * PMap - Map in parallel (using threads). For producing a sequence of
37 * values by passing each value from a source collection through a
38 * predicate while running the predicate in parallel in different
41 * This type is usually used through the 'pmap' helper function:
43 * auto squares = seq(1, 10) | pmap(square, 4) | sum;
// PMap operator: maps each value from the source through `pred` on a
// pool of worker threads. Ordering of results matches the input order
// (the MPMCPipeline used below preserves ticket order).
45 template <class Predicate>
46 class PMap : public Operator<PMap<Predicate>> {
// Takes ownership of the predicate. `nThreads` is the requested worker
// count; a value of 0 falls back to the number of online CPUs in the
// Generator constructor further down (sysconf(_SC_NPROCESSORS_ONLN)).
52 PMap(Predicate pred, size_t nThreads)
53 : pred_(std::move(pred)),
54 nThreads_(nThreads) { }
// Generator template parameters: Input is the decayed source value
// type fed into the pipeline; Output is the decayed result type of
// invoking Predicate on a Value (std::result_of, pre-C++17 idiom).
59 class Input = typename std::decay<Value>::type,
60 class Output = typename std::decay<
61 typename std::result_of<Predicate(Value)>::type>::type>
63 public GenImpl<Output, Generator<Value, Source, Input, Output>> {
66 const size_t nThreads_;
// ExecutionPipeline: owns the worker threads and the single-stage
// MPMCPipeline<Input, Output> they service. Producers push Input
// values in; workers apply pred_ and push Output values out, with
// ordering preserved by pipeline tickets.
68 class ExecutionPipeline {
69 std::vector<std::thread> workers_;
// Set to true when no more input will be written; read with acquire
// in predApplier() so workers can shut down cleanly.
70 std::atomic<bool> done_{false};
// Held by reference: the predicate must outlive this pipeline object.
71 const Predicate& pred_;
72 MPMCPipeline<Input, Output> pipeline_;
// Spawns `nThreads` workers, each running predApplier() until done_.
// NOTE(review): the pred_(pred) mem-initializer line is elided from
// this chunk; pipeline_ is sized nThreads in and nThreads out.
76 ExecutionPipeline(const Predicate& pred, size_t nThreads)
78 pipeline_(nThreads, nThreads) {
79 workers_.reserve(nThreads);
80 for (size_t i = 0; i < nThreads; i++) {
81 workers_.push_back(std::thread([this] { this->predApplier(); }));
// Destructor: callers must have drained the pipeline (and signalled
// completion) before destruction; then we join every worker.
85 ~ExecutionPipeline() {
86 assert(pipeline_.sizeGuess() == 0);
88 for (auto& w : workers_) { w.join(); }
// NOTE(review): the enclosing method signature is elided here; these
// two lines publish end-of-input to the workers (release store paired
// with the acquire load in predApplier) — presumably followed by a
// wake-all on wake_, whose declaration is also elided. Confirm
// against the full file.
92 // prevent workers from consuming more than we produce.
93 done_.store(true, std::memory_order_release);
// Non-blocking producer write; `wrote` records whether the input
// stage accepted the value (queue not full). The notify/return lines
// are elided from this chunk.
97 bool write(Value&& value) {
98 bool wrote = pipeline_.write(std::forward<Value>(value));
// Blocking producer write: waits until the input stage has room.
105 void blockingWrite(Value&& value) {
106 pipeline_.blockingWrite(std::forward<Value>(value));
// Non-blocking consumer read of a finished Output; returns false if
// none is ready yet.
110 bool read(Output& out) {
111 return pipeline_.read(out);
// Blocking consumer read: waits for the next in-order Output.
114 void blockingRead(Output& out) {
115 pipeline_.blockingRead(out);
120 // Each thread takes a value from the pipeline_, runs the
121 // predicate and enqueues the result. The pipeline preserves
122 // ordering. NOTE: don't use blockingReadStage<0> to read from
123 // the pipeline_ as there may not be any: end-of-data is signaled
124 // separately using done_/wake_.
// Worker loop body (predApplier): EventCount wait/wake protocol —
// prepareWait obtains a ticket, then we try a non-blocking stage read.
127 auto key = wake_.prepareWait();
129 typename MPMCPipeline<Input, Output>::template Ticket<0> ticket;
130 if (pipeline_.template readStage<0>(ticket, in)) {
// Got an input: apply the predicate and write the result under the
// same ticket so output order matches input order.
132 Output out = pred_(std::move(in));
133 pipeline_.template blockingWriteStage<0>(ticket,
// No input available: if the producer has signalled completion, the
// worker exits; otherwise it sleeps on wake_ (elided lines).
138 if (done_.load(std::memory_order_acquire)) {
143 // Not done_, but no items in the queue.
// Generator ctor: captures the upstream source and a reference to the
// predicate. nThreads == 0 maps to the number of online processors.
150 Generator(Source source, const Predicate& pred, size_t nThreads)
151 : source_(std::move(source)),
153 nThreads_(nThreads ? nThreads : sysconf(_SC_NPROCESSORS_ONLN)) {
// foreach: unconditional iteration (no early termination). Feeds the
// source into the pipeline, draining ready outputs whenever the input
// queue fills, then flushes the remainder at the end.
// NOTE(review): the `read`/`wrote` counters and `out` declarations are
// on lines elided from this chunk.
156 template <class Body>
157 void foreach(Body&& body) const {
158 ExecutionPipeline pipeline(pred_, nThreads_);
162 source_.foreach([&](Value value) {
// Fast path: non-blocking write succeeded, keep saturating the input
// queue before touching the output side.
163 if (pipeline.write(std::forward<Value>(value))) {
164 // input queue not yet full, saturate it before we process
165 // anything downstream
170 // input queue full; drain ready items from the queue
172 while (pipeline.read(out)) {
174 body(std::move(out));
177 // write the value we were going to write before we made room.
178 pipeline.blockingWrite(std::forward<Value>(value));
// After the source is exhausted, every value written but not yet read
// is still in flight; block until all of them are delivered in order.
184 // flush the output queue
185 while (read < wrote) {
187 pipeline.blockingRead(out);
189 body(std::move(out));
// apply: like foreach, but the handler can stop iteration by
// returning false. Even after the handler refuses more values, the
// pipeline must still be drained (see the flush loop) so the
// ExecutionPipeline destructor's empty-pipeline assert holds.
// NOTE(review): `more`, `read`, `wrote`, `out`, and the early-return
// plumbing are declared on lines elided from this chunk.
193 template <class Handler>
194 bool apply(Handler&& handler) const {
195 ExecutionPipeline pipeline(pred_, nThreads_);
200 source_.apply([&](Value value) {
// Fast path: input queue accepted the value without blocking.
201 if (pipeline.write(std::forward<Value>(value))) {
202 // input queue not yet full, saturate it before we process
203 // anything downstream
208 // input queue full; drain ready items from the queue
210 while (pipeline.read(out)) {
// Handler asked to stop: stop delivering (elided lines record this).
212 if (!handler(std::move(out))) {
218 // write the value we were going to write before we made room.
219 pipeline.blockingWrite(std::forward<Value>(value));
226 // flush the output queue
227 while (read < wrote) {
229 pipeline.blockingRead(out);
// Short-circuit: once `more` is false, remaining outputs are read
// (to drain the pipeline) but no longer passed to the handler.
232 more = more && handler(std::move(out));
// Infinite iff the upstream source is infinite.
238 static constexpr bool infinite = Source::infinite;
// compose (rvalue source): standard gen operator plumbing — builds a
// Generator over the upstream source, moving the source in.
241 template <class Source, class Value, class Gen = Generator<Value, Source>>
242 Gen compose(GenImpl<Value, Source>&& source) const {
243 return Gen(std::move(source.self()), pred_, nThreads_);
// compose (lvalue source): same, but copies the upstream source.
246 template <class Source, class Value, class Gen = Generator<Value, Source>>
247 Gen compose(const GenImpl<Value, Source>& source) const {
248 return Gen(source.self(), pred_, nThreads_);
251 } // namespace detail