From 89c3562ec54989b0e1fbe942bf403ccbf54fbcc7 Mon Sep 17 00:00:00 2001 From: Lucian Grijincu Date: Sat, 12 Apr 2014 23:10:11 -0700 Subject: [PATCH] folly: gen: pmap: parallel version of map Summary: same as map, but runs it's argument in parallel over a number of threads. @override-unit-failures Test Plan: added test Reviewed By: tjackson@fb.com FB internal diff: D1258364 --- folly/Optional.h | 8 +- folly/gen/ParallelMap-inl.h | 254 ++++++++++++++++++++++++ folly/gen/ParallelMap.h | 50 +++++ folly/gen/test/ParallelMapBenchmark.cpp | 91 +++++++++ folly/gen/test/ParallelMapTest.cpp | 145 ++++++++++++++ 5 files changed, 545 insertions(+), 3 deletions(-) create mode 100644 folly/gen/ParallelMap-inl.h create mode 100644 folly/gen/ParallelMap.h create mode 100644 folly/gen/test/ParallelMapBenchmark.cpp create mode 100644 folly/gen/test/ParallelMapTest.cpp diff --git a/folly/Optional.h b/folly/Optional.h index f85fbf6b..e877738d 100644 --- a/folly/Optional.h +++ b/folly/Optional.h @@ -112,15 +112,17 @@ class Optional { } } - /* implicit */ Optional(const None&) + /* implicit */ Optional(const None&) noexcept : hasValue_(false) { } - /* implicit */ Optional(Value&& newValue) { + /* implicit */ Optional(Value&& newValue) + noexcept(std::is_nothrow_move_constructible::value) { construct(std::move(newValue)); } - /* implicit */ Optional(const Value& newValue) { + /* implicit */ Optional(const Value& newValue) + noexcept(std::is_nothrow_copy_constructible::value) { construct(newValue); } diff --git a/folly/gen/ParallelMap-inl.h b/folly/gen/ParallelMap-inl.h new file mode 100644 index 00000000..7cc0321b --- /dev/null +++ b/folly/gen/ParallelMap-inl.h @@ -0,0 +1,254 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FOLLY_GEN_PARALLELMAP_H +#error This file may only be included from folly/gen/ParallelMap.h +#endif + +#include +#include +#include +#include +#include +#include + +#include "folly/MPMCPipeline.h" +#include "folly/experimental/EventCount.h" + +namespace folly { namespace gen { namespace detail { + +/** + * PMap - Map in parallel (using threads). For producing a sequence of + * values by passing each value from a source collection through a + * predicate while running the predicate in parallel in different + * threads. + * + * This type is usually used through the 'pmap' helper function: + * + * auto squares = seq(1, 10) | pmap(4, fibonacci) | sum; + */ +template +class PMap : public Operator> { + Predicate pred_; + size_t nThreads_; + public: + PMap() {} + + PMap(Predicate pred, size_t nThreads) + : pred_(std::move(pred)), + nThreads_(nThreads) { } + + template::type, + class Output = typename std::decay< + typename std::result_of::type + >::type> + class Generator : + public GenImpl> { + Source source_; + Predicate pred_; + const size_t nThreads_; + + class ExecutionPipeline { + std::vector workers_; + std::atomic done_{false}; + const Predicate& pred_; + MPMCPipeline pipeline_; + EventCount wake_; + + public: + ExecutionPipeline(const Predicate& pred, size_t nThreads) + : pred_(pred), + pipeline_(nThreads, nThreads) { + workers_.reserve(nThreads); + for (int i = 0; i < nThreads; i++) { + workers_.push_back(std::thread([this] { this->predApplier(); })); + } + } + + ~ExecutionPipeline() { + assert(pipeline_.sizeGuess() == 0); + assert(done_.load()); + for (auto& w : workers_) { w.join(); } + } + + void stop() { + // prevent workers from consuming more than we produce. + done_.store(true, std::memory_order_release); + wake_.notifyAll(); + } + + bool write(Value&& value) { + bool wrote = pipeline_.write(std::forward(value)); + if (wrote) { + wake_.notify(); + } + return wrote; + } + + void blockingWrite(Value&& value) { + pipeline_.blockingWrite(std::forward(value)); + wake_.notify(); + } + + bool read(Output& out) { + return pipeline_.read(out); + } + + void blockingRead(Output& out) { + pipeline_.blockingRead(out); + } + + private: + void predApplier() { + // Each thread takes a value from the pipeline_, runs the + // predicate and enqueues the result. The pipeline preserves + // ordering. NOTE: don't use blockingReadStage<0> to read from + // the pipeline_ as there may not be any: end-of-data is signaled + // separately using done_/wake_. + Input in; + for (;;) { + auto key = wake_.prepareWait(); + + typename MPMCPipeline::template Ticket<0> ticket; + if (pipeline_.template readStage<0>(ticket, in)) { + wake_.cancelWait(); + Output out = pred_(std::move(in)); + pipeline_.template blockingWriteStage<0>(ticket, + std::move(out)); + continue; + } + + if (done_.load(std::memory_order_acquire)) { + wake_.cancelWait(); + break; + } + + // Not done_, but no items in the queue. + wake_.wait(key); + } + } + }; + + public: + Generator(Source source, const Predicate& pred, size_t nThreads) + : source_(std::move(source)), + pred_(pred), + nThreads_(nThreads ?: sysconf(_SC_NPROCESSORS_ONLN)) { + } + + template + void foreach(Body&& body) const { + ExecutionPipeline pipeline(pred_, nThreads_); + + size_t wrote = 0; + size_t read = 0; + source_.foreach([&](Value value) { + if (pipeline.write(std::forward(value))) { + // input queue not yet full, saturate it before we process + // anything downstream + ++wrote; + return; + } + + // input queue full; drain ready items from the queue + Output out; + while (pipeline.read(out)) { + ++read; + body(std::move(out)); + } + + // write the value we were going to write before we made room. + pipeline.blockingWrite(std::forward(value)); + ++wrote; + }); + + pipeline.stop(); + + // flush the output queue + while (read < wrote) { + Output out; + pipeline.blockingRead(out); + ++read; + body(std::move(out)); + } + } + + template + bool apply(Handler&& handler) const { + ExecutionPipeline pipeline(pred_, nThreads_); + + size_t wrote = 0; + size_t read = 0; + bool more = true; + source_.apply([&](Value value) { + if (pipeline.write(std::forward(value))) { + // input queue not yet full, saturate it before we process + // anything downstream + ++wrote; + return true; + } + + // input queue full; drain ready items from the queue + Output out; + while (pipeline.read(out)) { + ++read; + if (!handler(std::move(out))) { + more = false; + return false; + } + } + + // write the value we were going to write before we made room. + pipeline.blockingWrite(std::forward(value)); + ++wrote; + return true; + }); + + pipeline.stop(); + + // flush the output queue + while (read < wrote) { + Output out; + pipeline.blockingRead(out); + ++read; + if (more) { + more = more && handler(std::move(out)); + } + } + return more; + } + + static constexpr bool infinite = Source::infinite; + }; + + template> + Gen compose(GenImpl&& source) const { + return Gen(std::move(source.self()), pred_, nThreads_); + } + + template> + Gen compose(const GenImpl& source) const { + return Gen(source.self(), pred_, nThreads_); + } +}; + +}}} // namespaces diff --git a/folly/gen/ParallelMap.h b/folly/gen/ParallelMap.h new file mode 100644 index 00000000..196dfd0b --- /dev/null +++ b/folly/gen/ParallelMap.h @@ -0,0 +1,50 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FOLLY_GEN_PARALLELMAP_H +#define FOLLY_GEN_PARALLELMAP_H + +#include "folly/gen/Core.h" + +namespace folly { namespace gen { + +namespace detail { + +template +class PMap; + +} // namespace detail + +/** + * Run `pred` in parallel in nThreads. Results are returned in the + * same order in which they were retrieved from the source generator + * (similar to map). + * + * NOTE: Only `pred` is run from separate threads; the source + * generator and the rest of the pipeline is executed in the + * caller thread. + */ +template> + PMap pmap(Predicate pred = Predicate(), size_t nThreads = 0) { + return PMap(std::move(pred), nThreads); +} + +}} // namespaces + +#include "folly/gen/ParallelMap-inl.h" + +#endif // FOLLY_GEN_PARALLELMAP_H diff --git a/folly/gen/test/ParallelMapBenchmark.cpp b/folly/gen/test/ParallelMapBenchmark.cpp new file mode 100644 index 00000000..1793b798 --- /dev/null +++ b/folly/gen/test/ParallelMapBenchmark.cpp @@ -0,0 +1,91 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include "folly/Benchmark.h" +#include "folly/gen/Base.h" +#include "folly/gen/ParallelMap.h" + +using namespace folly::gen; + +DEFINE_int32(threads, + std::max(1, (int32_t) sysconf(_SC_NPROCESSORS_CONF) / 2), + "Num threads."); + +constexpr int kFib = 35; // unit of work +size_t fib(int n) { return n <= 1 ? 1 : fib(n-1) * fib(n-2); } + +BENCHMARK(FibSumMap, n) { + auto result = + seq(1, (int) n) + | map([](int) { return fib(kFib); }) + | sum; + folly::doNotOptimizeAway(result); +} + +BENCHMARK_RELATIVE(FibSumPmap, n) { + // Schedule more work: enough so that each worker thread does the + // same amount as one FibSumMap. + const size_t kNumThreads = FLAGS_threads; + auto result = + seq(1, (int) (n * kNumThreads)) + | pmap([](int) { return fib(kFib); }, kNumThreads) + | sum; + folly::doNotOptimizeAway(result); +} + +BENCHMARK_RELATIVE(FibSumThreads, n) { + // Schedule kNumThreads to execute the same code as FibSumMap. + const size_t kNumThreads = FLAGS_threads; + std::vector workers; + workers.reserve(kNumThreads); + auto fn = [n] { + auto result = + seq(1, (int) n) + | map([](int) { return fib(kFib); }) + | sum; + folly::doNotOptimizeAway(result); + }; + for (int i = 0; i < kNumThreads; i++) { + workers.push_back(std::thread(fn)); + } + for (auto& w : workers) { w.join(); } +} + +/* + ============================================================================ + folly/gen/test/ParallelMapBenchmark.cpp relative time/iter iters/s + ============================================================================ + FibSumMap 41.64ms 24.02 + FibSumPmap 98.38% 42.32ms 23.63 + FibSumThreads 94.48% 44.07ms 22.69 + ============================================================================ + + real0m15.595s + user2m47.100s + sys0m0.016s +*/ + +int main(int argc, char *argv[]) { + google::ParseCommandLineFlags(&argc, &argv, true); + folly::runBenchmarks(); + return 0; +} diff --git a/folly/gen/test/ParallelMapTest.cpp b/folly/gen/test/ParallelMapTest.cpp new file mode 100644 index 00000000..7b29f6cd --- /dev/null +++ b/folly/gen/test/ParallelMapTest.cpp @@ -0,0 +1,145 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include + +#include "folly/Memory.h" +#include "folly/gen/Base.h" +#include "folly/gen/ParallelMap.h" + +using namespace folly; +using namespace folly::gen; + +TEST(Pmap, InfiniteEquivalent) { + // apply + { + auto mapResult + = seq(1) + | map([](int x) { return x * x; }) + | until([](int x) { return x > 1000 * 1000; }) + | as>(); + + auto pmapResult + = seq(1) + | pmap([](int x) { return x * x; }, 4) + | until([](int x) { return x > 1000 * 1000; }) + | as>(); + + EXPECT_EQ(pmapResult, mapResult); + } + + // foreach + { + auto mapResult + = seq(1, 10) + | map([](int x) { return x * x; }) + | as>(); + + auto pmapResult + = seq(1, 10) + | pmap([](int x) { return x * x; }, 4) + | as>(); + + EXPECT_EQ(pmapResult, mapResult); + } +} + +TEST(Pmap, Empty) { + // apply + { + auto mapResult + = seq(1) + | map([](int x) { return x * x; }) + | until([](int) { return true; }) + | as>(); + + auto pmapResult + = seq(1) + | pmap([](int x) { return x * x; }, 4) + | until([](int) { return true; }) + | as>(); + + EXPECT_EQ(mapResult.size(), 0); + EXPECT_EQ(pmapResult, mapResult); + } + + // foreach + { + auto mapResult + = empty() + | map([](int x) { return x * x; }) + | as>(); + + auto pmapResult + = empty() + | pmap([](int x) { return x * x; }, 4) + | as>(); + + EXPECT_EQ(mapResult.size(), 0); + EXPECT_EQ(pmapResult, mapResult); + } +} + +TEST(Pmap, Rvalues) { + // apply + { + auto mapResult + = seq(1) + | map([](int x) { return make_unique(x); }) + | map([](std::unique_ptr x) { return make_unique(*x * *x); }) + | map([](std::unique_ptr x) { return *x; }) + | take(1000) + | sum; + + auto pmapResult + = seq(1) + | pmap([](int x) { return make_unique(x); }) + | pmap([](std::unique_ptr x) { return make_unique(*x * *x); }) + | pmap([](std::unique_ptr x) { return *x; }) + | take(1000) + | sum; + + EXPECT_EQ(pmapResult, mapResult); + } + + // foreach + { + auto mapResult + = seq(1, 1000) + | map([](int x) { return make_unique(x); }) + | map([](std::unique_ptr x) { return make_unique(*x * *x); }) + | map([](std::unique_ptr x) { return *x; }) + | sum; + + auto pmapResult + = seq(1, 1000) + | pmap([](int x) { return make_unique(x); }) + | pmap([](std::unique_ptr x) { return make_unique(*x * *x); }) + | pmap([](std::unique_ptr x) { return *x; }) + | sum; + + EXPECT_EQ(pmapResult, mapResult); + } +} + +int main(int argc, char *argv[]) { + testing::InitGoogleTest(&argc, argv); + google::ParseCommandLineFlags(&argc, &argv, true); + return RUN_ALL_TESTS(); +} -- 2.34.1