}
}
- /* implicit */ Optional(const None&)
+ /* implicit */ Optional(const None&) noexcept
: hasValue_(false) {
}
- /* implicit */ Optional(Value&& newValue) {
+ /* implicit */ Optional(Value&& newValue)
+ noexcept(std::is_nothrow_move_constructible<Value>::value) {
construct(std::move(newValue));
}
- /* implicit */ Optional(const Value& newValue) {
+ /* implicit */ Optional(const Value& newValue)
+ noexcept(std::is_nothrow_copy_constructible<Value>::value) {
construct(newValue);
}
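+  // A minimal sketch of what the conditional noexcept enables: callers can
+  // verify the no-throw guarantee at compile time, e.g.
+  //
+  //   static_assert(noexcept(Optional<int>(folly::none)), "");
+  //   static_assert(std::is_nothrow_constructible<Optional<int>, int>::value,
+  //                 "nothrow whenever Value is nothrow-move-constructible");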
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FOLLY_GEN_PARALLELMAP_H
+#error This file may only be included from folly/gen/ParallelMap.h
+#endif
+
+#include <unistd.h>
+#include <atomic>
+#include <cassert>
+#include <thread>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include "folly/MPMCPipeline.h"
+#include "folly/experimental/EventCount.h"
+
+namespace folly { namespace gen { namespace detail {
+
+/**
+ * PMap - Map in parallel (using threads). Produces a sequence of
+ * values by passing each value from a source collection through a
+ * predicate, with the predicate invocations running in parallel on
+ * different threads.
+ *
+ * This type is usually used through the 'pmap' helper function:
+ *
+ *   auto sumOfFibs = seq(1, 10) | pmap(fibonacci, 4) | sum;
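+ *
+ * Results are produced in the same order they were read from the
+ * source; passing nThreads == 0 uses one worker per online CPU
+ * (sysconf(_SC_NPROCESSORS_ONLN)).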
+ */
+template<class Predicate>
+class PMap : public Operator<PMap<Predicate>> {
+ Predicate pred_;
+ size_t nThreads_;
+ public:
+ PMap() {}
+
+ PMap(Predicate pred, size_t nThreads)
+ : pred_(std::move(pred)),
+ nThreads_(nThreads) { }
+
+ template<class Value,
+ class Source,
+ class Input = typename std::decay<Value>::type,
+ class Output = typename std::decay<
+ typename std::result_of<Predicate(Value)>::type
+ >::type>
+ class Generator :
+ public GenImpl<Output, Generator<Value, Source, Input, Output>> {
+ Source source_;
+ Predicate pred_;
+ const size_t nThreads_;
+
+ class ExecutionPipeline {
+ std::vector<std::thread> workers_;
+ std::atomic<bool> done_{false};
+ const Predicate& pred_;
+ MPMCPipeline<Input, Output> pipeline_;
+ EventCount wake_;
+
+ public:
+ ExecutionPipeline(const Predicate& pred, size_t nThreads)
+ : pred_(pred),
+ pipeline_(nThreads, nThreads) {
+ workers_.reserve(nThreads);
+      for (size_t i = 0; i < nThreads; i++) {
+ workers_.push_back(std::thread([this] { this->predApplier(); }));
+ }
+ }
+
+ ~ExecutionPipeline() {
+ assert(pipeline_.sizeGuess() == 0);
+ assert(done_.load());
+ for (auto& w : workers_) { w.join(); }
+ }
+
+ void stop() {
+ // prevent workers from consuming more than we produce.
+ done_.store(true, std::memory_order_release);
+ wake_.notifyAll();
+ }
+
+ bool write(Value&& value) {
+ bool wrote = pipeline_.write(std::forward<Value>(value));
+ if (wrote) {
+ wake_.notify();
+ }
+ return wrote;
+ }
+
+ void blockingWrite(Value&& value) {
+ pipeline_.blockingWrite(std::forward<Value>(value));
+ wake_.notify();
+ }
+
+ bool read(Output& out) {
+ return pipeline_.read(out);
+ }
+
+ void blockingRead(Output& out) {
+ pipeline_.blockingRead(out);
+ }
+
+ private:
+ void predApplier() {
+ // Each thread takes a value from the pipeline_, runs the
+ // predicate and enqueues the result. The pipeline preserves
+ // ordering. NOTE: don't use blockingReadStage<0> to read from
+      // the pipeline_, as there may be no more input coming: end-of-data
+      // is signaled separately using done_/wake_.
+ Input in;
+ for (;;) {
+ auto key = wake_.prepareWait();
+
+ typename MPMCPipeline<Input, Output>::template Ticket<0> ticket;
+ if (pipeline_.template readStage<0>(ticket, in)) {
+ wake_.cancelWait();
+ Output out = pred_(std::move(in));
+ pipeline_.template blockingWriteStage<0>(ticket,
+ std::move(out));
+ continue;
+ }
+
+ if (done_.load(std::memory_order_acquire)) {
+ wake_.cancelWait();
+ break;
+ }
+
+ // Not done_, but no items in the queue.
+ wake_.wait(key);
+ }
+ }
+ };
+
+ public:
+ Generator(Source source, const Predicate& pred, size_t nThreads)
+ : source_(std::move(source)),
+ pred_(pred),
+ nThreads_(nThreads ?: sysconf(_SC_NPROCESSORS_ONLN)) {
+ }
+
+ template<class Body>
+ void foreach(Body&& body) const {
+ ExecutionPipeline pipeline(pred_, nThreads_);
+
+ size_t wrote = 0;
+ size_t read = 0;
+ source_.foreach([&](Value value) {
+ if (pipeline.write(std::forward<Value>(value))) {
+ // input queue not yet full, saturate it before we process
+ // anything downstream
+ ++wrote;
+ return;
+ }
+
+ // input queue full; drain ready items from the queue
+ Output out;
+ while (pipeline.read(out)) {
+ ++read;
+ body(std::move(out));
+ }
+
+      // now that we've made room, write the value we originally intended.
+ pipeline.blockingWrite(std::forward<Value>(value));
+ ++wrote;
+ });
+
+ pipeline.stop();
+
+ // flush the output queue
+ while (read < wrote) {
+ Output out;
+ pipeline.blockingRead(out);
+ ++read;
+ body(std::move(out));
+ }
+ }
+
+ template<class Handler>
+ bool apply(Handler&& handler) const {
+ ExecutionPipeline pipeline(pred_, nThreads_);
+
+ size_t wrote = 0;
+ size_t read = 0;
+ bool more = true;
+ source_.apply([&](Value value) {
+ if (pipeline.write(std::forward<Value>(value))) {
+ // input queue not yet full, saturate it before we process
+ // anything downstream
+ ++wrote;
+ return true;
+ }
+
+ // input queue full; drain ready items from the queue
+ Output out;
+ while (pipeline.read(out)) {
+ ++read;
+ if (!handler(std::move(out))) {
+ more = false;
+ return false;
+ }
+ }
+
+      // now that we've made room, write the value we originally intended.
+ pipeline.blockingWrite(std::forward<Value>(value));
+ ++wrote;
+ return true;
+ });
+
+ pipeline.stop();
+
+ // flush the output queue
+ while (read < wrote) {
+ Output out;
+ pipeline.blockingRead(out);
+ ++read;
+ if (more) {
+        more = handler(std::move(out));
+ }
+ }
+ return more;
+ }
+
+ static constexpr bool infinite = Source::infinite;
+ };
+
+ template<class Source,
+ class Value,
+ class Gen = Generator<Value, Source>>
+ Gen compose(GenImpl<Value, Source>&& source) const {
+ return Gen(std::move(source.self()), pred_, nThreads_);
+ }
+
+ template<class Source,
+ class Value,
+ class Gen = Generator<Value, Source>>
+ Gen compose(const GenImpl<Value, Source>& source) const {
+ return Gen(source.self(), pred_, nThreads_);
+ }
+};
+
+}}} // namespaces
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FOLLY_GEN_PARALLELMAP_H
+#define FOLLY_GEN_PARALLELMAP_H
+
+#include "folly/gen/Core.h"
+
+namespace folly { namespace gen {
+
+namespace detail {
+
+template<class Predicate>
+class PMap;
+
+} // namespace detail
+
+/**
+ * Run `pred` in parallel on nThreads threads. Results are returned in
+ * the same order in which they were retrieved from the source generator
+ * (similar to map).
+ *
+ * NOTE: Only `pred` is run from separate threads; the source
+ * generator and the rest of the pipeline are executed in the
+ * caller thread.
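+ *
+ * A minimal usage sketch (mirrors the unit tests; assumes
+ * folly/gen/Base.h for seq/as):
+ *
+ *   auto squares = seq(1, 10)
+ *                | pmap([](int x) { return x * x; }, 4)
+ *                | as<std::vector<int>>();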
+ */
+template<class Predicate,
+ class PMap = detail::PMap<Predicate>>
+PMap pmap(Predicate pred = Predicate(), size_t nThreads = 0) {
+ return PMap(std::move(pred), nThreads);
+}
+
+}} // namespaces
+
+#include "folly/gen/ParallelMap-inl.h"
+
+#endif // FOLLY_GEN_PARALLELMAP_H
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <unistd.h>
+#include <algorithm>
+#include <atomic>
+#include <thread>
+#include <vector>
+
+#include "folly/Benchmark.h"
+#include "folly/gen/Base.h"
+#include "folly/gen/ParallelMap.h"
+
+using namespace folly::gen;
+
+DEFINE_int32(threads,
+ std::max(1, (int32_t) sysconf(_SC_NPROCESSORS_CONF) / 2),
+ "Num threads.");
+
+constexpr int kFib = 35; // unit of work
+size_t fib(int n) { return n <= 1 ? 1 : fib(n-1) + fib(n-2); }
+
+BENCHMARK(FibSumMap, n) {
+ auto result =
+ seq(1, (int) n)
+ | map([](int) { return fib(kFib); })
+ | sum;
+ folly::doNotOptimizeAway(result);
+}
+
+BENCHMARK_RELATIVE(FibSumPmap, n) {
+ // Schedule more work: enough so that each worker thread does the
+ // same amount as one FibSumMap.
+ const size_t kNumThreads = FLAGS_threads;
+ auto result =
+ seq(1, (int) (n * kNumThreads))
+ | pmap([](int) { return fib(kFib); }, kNumThreads)
+ | sum;
+ folly::doNotOptimizeAway(result);
+}
+
+BENCHMARK_RELATIVE(FibSumThreads, n) {
+ // Schedule kNumThreads to execute the same code as FibSumMap.
+ const size_t kNumThreads = FLAGS_threads;
+ std::vector<std::thread> workers;
+ workers.reserve(kNumThreads);
+ auto fn = [n] {
+ auto result =
+ seq(1, (int) n)
+ | map([](int) { return fib(kFib); })
+ | sum;
+ folly::doNotOptimizeAway(result);
+ };
+  for (size_t i = 0; i < kNumThreads; i++) {
+ workers.push_back(std::thread(fn));
+ }
+ for (auto& w : workers) { w.join(); }
+}
+
+/*
+ ============================================================================
+ folly/gen/test/ParallelMapBenchmark.cpp relative time/iter iters/s
+ ============================================================================
+ FibSumMap 41.64ms 24.02
+ FibSumPmap 98.38% 42.32ms 23.63
+ FibSumThreads 94.48% 44.07ms 22.69
+ ============================================================================
+
+ real    0m15.595s
+ user    2m47.100s
+ sys     0m0.016s
+*/
+
+int main(int argc, char *argv[]) {
+ google::ParseCommandLineFlags(&argc, &argv, true);
+ folly::runBenchmarks();
+ return 0;
+}
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "folly/Memory.h"
+#include "folly/gen/Base.h"
+#include "folly/gen/ParallelMap.h"
+
+using namespace folly;
+using namespace folly::gen;
+
+TEST(Pmap, InfiniteEquivalent) {
+ // apply
+ {
+ auto mapResult
+ = seq(1)
+ | map([](int x) { return x * x; })
+ | until([](int x) { return x > 1000 * 1000; })
+ | as<std::vector<int>>();
+
+ auto pmapResult
+ = seq(1)
+ | pmap([](int x) { return x * x; }, 4)
+ | until([](int x) { return x > 1000 * 1000; })
+ | as<std::vector<int>>();
+
+ EXPECT_EQ(pmapResult, mapResult);
+ }
+
+ // foreach
+ {
+ auto mapResult
+ = seq(1, 10)
+ | map([](int x) { return x * x; })
+ | as<std::vector<int>>();
+
+ auto pmapResult
+ = seq(1, 10)
+ | pmap([](int x) { return x * x; }, 4)
+ | as<std::vector<int>>();
+
+ EXPECT_EQ(pmapResult, mapResult);
+ }
+}
+
+TEST(Pmap, Empty) {
+ // apply
+ {
+ auto mapResult
+ = seq(1)
+ | map([](int x) { return x * x; })
+ | until([](int) { return true; })
+ | as<std::vector<int>>();
+
+ auto pmapResult
+ = seq(1)
+ | pmap([](int x) { return x * x; }, 4)
+ | until([](int) { return true; })
+ | as<std::vector<int>>();
+
+ EXPECT_EQ(mapResult.size(), 0);
+ EXPECT_EQ(pmapResult, mapResult);
+ }
+
+ // foreach
+ {
+ auto mapResult
+ = empty<int>()
+ | map([](int x) { return x * x; })
+ | as<std::vector<int>>();
+
+ auto pmapResult
+ = empty<int>()
+ | pmap([](int x) { return x * x; }, 4)
+ | as<std::vector<int>>();
+
+ EXPECT_EQ(mapResult.size(), 0);
+ EXPECT_EQ(pmapResult, mapResult);
+ }
+}
+
+TEST(Pmap, Rvalues) {
+ // apply
+ {
+ auto mapResult
+ = seq(1)
+ | map([](int x) { return make_unique<int>(x); })
+ | map([](std::unique_ptr<int> x) { return make_unique<int>(*x * *x); })
+ | map([](std::unique_ptr<int> x) { return *x; })
+ | take(1000)
+ | sum;
+
+ auto pmapResult
+ = seq(1)
+ | pmap([](int x) { return make_unique<int>(x); })
+ | pmap([](std::unique_ptr<int> x) { return make_unique<int>(*x * *x); })
+ | pmap([](std::unique_ptr<int> x) { return *x; })
+ | take(1000)
+ | sum;
+
+ EXPECT_EQ(pmapResult, mapResult);
+ }
+
+ // foreach
+ {
+ auto mapResult
+ = seq(1, 1000)
+ | map([](int x) { return make_unique<int>(x); })
+ | map([](std::unique_ptr<int> x) { return make_unique<int>(*x * *x); })
+ | map([](std::unique_ptr<int> x) { return *x; })
+ | sum;
+
+ auto pmapResult
+ = seq(1, 1000)
+ | pmap([](int x) { return make_unique<int>(x); })
+ | pmap([](std::unique_ptr<int> x) { return make_unique<int>(*x * *x); })
+ | pmap([](std::unique_ptr<int> x) { return *x; })
+ | sum;
+
+ EXPECT_EQ(pmapResult, mapResult);
+ }
+}
+
+int main(int argc, char *argv[]) {
+ testing::InitGoogleTest(&argc, argv);
+ google::ParseCommandLineFlags(&argc, &argv, true);
+ return RUN_ALL_TESTS();
+}