From d3cc50e00280b1ca5ffd4f713ac312181b1d6810 Mon Sep 17 00:00:00 2001 From: Tom Jackson Date: Fri, 4 Apr 2014 17:58:07 -0700 Subject: [PATCH] parallel(pipeline) Summary: Adding `... | parallel(my | pipe | line) | ...` for parallelizing a portion of a generator pipeline. ```lang=cpp auto factored = from(values) | parallel(filter(isEven) | map(square) | sub(count)) | sum; ``` Work is divided evenly among a fixed number of threads using a `MPMCQueue`. Test Plan: Unit tests and benchmarks testing for a variety of workloads and performance characteristics, including sub-linear (blocking) workloads, linear (mostly math) workloads, and superlinear (sleeping) workloads to simulate real-world use. Reviewed By: lucian@fb.com FB internal diff: D638551 --- folly/gen/Base-inl.h | 86 +++++- folly/gen/Base.h | 28 +- folly/gen/Core-inl.h | 8 +- folly/gen/Parallel-inl.h | 410 +++++++++++++++++++++++++++ folly/gen/Parallel.h | 110 +++++++ folly/gen/test/Bench.h | 33 +++ folly/gen/test/ParallelBenchmark.cpp | 179 ++++++++++++ folly/gen/test/ParallelTest.cpp | 109 +++++++ 8 files changed, 958 insertions(+), 5 deletions(-) create mode 100644 folly/gen/Parallel-inl.h create mode 100644 folly/gen/Parallel.h create mode 100644 folly/gen/test/Bench.h create mode 100644 folly/gen/test/ParallelBenchmark.cpp create mode 100644 folly/gen/test/ParallelTest.cpp diff --git a/folly/gen/Base-inl.h b/folly/gen/Base-inl.h index d627ed55..1c2da985 100644 --- a/folly/gen/Base-inl.h +++ b/folly/gen/Base-inl.h @@ -142,6 +142,45 @@ public: } }; +/** + * RangeSource - For producing values from a folly::Range. Useful for referring + * to a slice of some container. + * + * This type is primarily used through the 'from' function, like: + * + * auto rangeSource = from(folly::range(v.begin(), v.end())); + * auto sum = rangeSource | sum; + * + * Reminder: Be careful not to invalidate iterators when using ranges like this. + */ +template +class RangeSource : public GenImpl::reference, + RangeSource> { + Range range_; + public: + RangeSource() {} + explicit RangeSource(Range range) + : range_(std::move(range)) + {} + + template + bool apply(Handler&& handler) const { + for (auto& value : range_) { + if (!handler(value)) { + return false; + } + } + return true; + } + + template + void foreach(Body&& body) const { + for (auto& value : range_) { + body(value); + } + } +}; + /** * Sequence - For generating values from beginning value, incremented along the * way with the ++ and += operators. Iteration may continue indefinitely by @@ -256,8 +295,32 @@ class Yield : public GenImpl> { template class Empty : public GenImpl> { public: - template - bool apply(Handler&&) const { return true; } + template + bool apply(Handler&&) const { + return true; + } + + template + void foreach(Body&&) const {} +}; + +template +class Just : public GenImpl> { + static_assert(!std::is_reference::value, + "Just requires non-ref types"); + const Value value_; + public: + Just(Value value) : value_(std::forward(value)) {} + + template + bool apply(Handler&& handler) const { + return handler(value_); + } + + template + void foreach(Body&& body) const { + body(value_); + } }; /* @@ -879,6 +942,25 @@ class Distinct : public Operator> { } }; +/** + * Composer - Helper class for adapting pipelines into functors. Primarily used + * for 'mapOp'. 
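+ *
+ * For example, Composer(filter(isEven) | count), applied to a source
+ * generator, yields the same result as (source | filter(isEven) | count).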
+ */ +template +class Composer { + Operators op_; + public: + explicit Composer(Operators op) + : op_(std::move(op)) {} + + template() + .compose(std::declval()))> + Ret operator()(Source&& source) const { + return op_.compose(std::forward(source)); + } +}; + /** * Batch - For producing fixed-size batches of each value from a source. * diff --git a/folly/gen/Base.h b/folly/gen/Base.h index 3b6cb7ca..06cc0e3c 100644 --- a/folly/gen/Base.h +++ b/folly/gen/Base.h @@ -1,5 +1,5 @@ /* - * Copyright 2013 Facebook, Inc. + * Copyright 2014 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + #ifndef FOLLY_GEN_BASE_H #define FOLLY_GEN_BASE_H @@ -264,6 +265,8 @@ class Yield; template class Empty; +template +class Just; /* * Operators @@ -290,6 +293,9 @@ class Order; template class Distinct; +template +class Composer; + template class TypeAssertion; @@ -431,6 +437,11 @@ detail::Empty empty() { return {}; } +template +detail::Just just(Value value) { + return detail::Just(std::move(value)); +} + /* * Operator Factories */ @@ -446,6 +457,21 @@ Map map(Predicate pred = Predicate()) { return Map(std::move(pred)); } +/** + * mapOp - Given a generator of generators, maps the application of the given + * operator on to each inner gen. Especially useful in aggregating nested data + * structures: + * + * chunked(samples, 256) + * | mapOp(filter(sampleTest) | count) + * | sum; + */ +template>> +Map mapOp(Operator op) { + return Map(detail::Composer(std::move(op))); +} + /* * member(...) - For extracting a member from each value. * diff --git a/folly/gen/Core-inl.h b/folly/gen/Core-inl.h index 9808487f..973928a6 100644 --- a/folly/gen/Core-inl.h +++ b/folly/gen/Core-inl.h @@ -90,8 +90,10 @@ class Operator : public FBounded { protected: Operator() = default; - Operator(const Operator&) = default; Operator(Operator&&) = default; + Operator(const Operator&) = default; + Operator& operator=(Operator&&) = default; + Operator& operator=(const Operator&) = default; }; /** @@ -142,8 +144,10 @@ class GenImpl : public FBounded { protected: // To prevent slicing GenImpl() = default; - GenImpl(const GenImpl&) = default; GenImpl(GenImpl&&) = default; + GenImpl(const GenImpl&) = default; + GenImpl& operator=(GenImpl&&) = default; + GenImpl& operator=(const GenImpl&) = default; public: typedef Value ValueType; diff --git a/folly/gen/Parallel-inl.h b/folly/gen/Parallel-inl.h new file mode 100644 index 00000000..b4866a11 --- /dev/null +++ b/folly/gen/Parallel-inl.h @@ -0,0 +1,410 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef FOLLY_GEN_PARALLEL_H_ +#error This file may only be included from folly/gen/ParallelGen.h +#endif + +#include "folly/MPMCQueue.h" +#include "folly/ScopeGuard.h" +#include "folly/experimental/EventCount.h" +#include +#include +#include + +namespace folly { +namespace gen { +namespace detail { + +template +class ClosableMPMCQueue { + MPMCQueue queue_; + std::atomic producers_{0}; + std::atomic consumers_{0}; + folly::EventCount wakeProducer_; + folly::EventCount wakeConsumer_; + + public: + explicit ClosableMPMCQueue(size_t capacity) : queue_(capacity) {} + + ~ClosableMPMCQueue() { + CHECK(!producers()); + CHECK(!consumers()); + } + + void openProducer() { ++producers_; } + void openConsumer() { ++consumers_; } + + void closeInputProducer() { + int64_t producers = producers_--; + CHECK(producers); + if (producers == 1) { // last producer + wakeConsumer_.notifyAll(); + } + } + + void closeOutputConsumer() { + int64_t consumers = consumers_--; + CHECK(consumers); + if (consumers == 1) { // last consumer + wakeProducer_.notifyAll(); + } + } + + size_t producers() const { + return producers_.load(std::memory_order_acquire); + } + + size_t consumers() const { + return consumers_.load(std::memory_order_acquire); + } + + template + bool writeUnlessFull(Args&&... args) noexcept { + if (queue_.write(std::forward(args)...)) { + // wake consumers to pick up new value + wakeConsumer_.notify(); + return true; + } + return false; + } + + template + bool writeUnlessClosed(Args&&... args) { + // write if there's room + while (!queue_.writeIfNotFull(std::forward(args)...)) { + // if write fails, check if there are still consumers listening + auto key = wakeProducer_.prepareWait(); + if (!consumers()) { + // no consumers left; bail out + wakeProducer_.cancelWait(); + return false; + } + wakeProducer_.wait(key); + } + // wake consumers to pick up new value + wakeConsumer_.notify(); + return true; + } + + bool readUnlessEmpty(T& out) { + if (queue_.read(out)) { + // wake producers to fill empty space + wakeProducer_.notify(); + return true; + } + return false; + } + + bool readUnlessClosed(T& out) { + while (!queue_.readIfNotEmpty(out)) { + auto key = wakeConsumer_.prepareWait(); + if (!producers()) { + // wake producers to fill empty space + wakeProducer_.notify(); + return false; + } + wakeConsumer_.wait(key); + } + // wake writers blocked by full queue + wakeProducer_.notify(); + return true; + } +}; + +template +class Sub : public Operator> { + Sink sink_; + + public: + explicit Sub(Sink sink) : sink_(sink) {} + + template ().compose(std::declval())), + class Just = Just::type>> + Just compose(const GenImpl& source) const { + return Just(source | sink_); + } +}; + +template +class Parallel : public Operator> { + Ops ops_; + size_t threads_; + + public: + Parallel(Ops ops, size_t threads) : ops_(std::move(ops)), threads_(threads) {} + + template ::type, + class Composed = + decltype(std::declval().compose(Empty())), + class Output = typename Composed::ValueType, + class OutputDecayed = typename std::decay::type> + class Generator : public GenImpl> { + const Source source_; + const Ops ops_; + const size_t threads_; + typedef ClosableMPMCQueue InQueue; + typedef ClosableMPMCQueue OutQueue; + + class Puller : public GenImpl { + InQueue* queue_; + + public: + explicit Puller(InQueue* queue) : queue_(queue) {} + + template + bool apply(Handler&& handler) const { + InputDecayed input; + while (queue_->readUnlessClosed(input)) { + if (!handler(std::move(input))) { + return false; + } + } + return 
true; + } + + template + void foreach(Body&& body) const { + InputDecayed input; + while (queue_->readUnlessClosed(input)) { + body(std::move(input)); + } + } + }; + + template + class Pusher : public Operator> { + OutQueue* queue_; + + public: + explicit Pusher(OutQueue* queue) : queue_(queue) {} + + template + void compose(const GenImpl& source) const { + if (all) { + source.self().foreach([&](Value value) { + queue_->writeUnlessClosed(std::forward(value)); + }); + } else { + source.self().apply([&](Value value) { + return queue_->writeUnlessClosed(std::forward(value)); + }); + } + } + }; + + template + class Executor { + InQueue inQueue_; + OutQueue outQueue_; + Puller puller_; + Pusher pusher_; + std::vector workers_; + const Ops* ops_; + + void work() { + puller_ | *ops_ | pusher_; + }; + + public: + Executor(size_t threads, const Ops* ops) + : inQueue_(threads * 4), + outQueue_(threads * 4), + puller_(&inQueue_), + pusher_(&outQueue_), + ops_(ops) { + inQueue_.openProducer(); + outQueue_.openConsumer(); + for (int t = 0; t < threads; ++t) { + inQueue_.openConsumer(); + outQueue_.openProducer(); + workers_.emplace_back([this] { + SCOPE_EXIT { + inQueue_.closeOutputConsumer(); + outQueue_.closeInputProducer(); + }; + this->work(); + }); + } + } + + ~Executor() { + if (inQueue_.producers()) { + inQueue_.closeInputProducer(); + } + if (outQueue_.consumers()) { + outQueue_.closeOutputConsumer(); + } + while (!workers_.empty()) { + workers_.back().join(); + workers_.pop_back(); + } + CHECK(!inQueue_.consumers()); + CHECK(!outQueue_.producers()); + } + + void closeInputProducer() { inQueue_.closeInputProducer(); } + + void closeOutputConsumer() { outQueue_.closeOutputConsumer(); } + + bool writeUnlessClosed(Input&& input) { + return inQueue_.writeUnlessClosed(std::forward(input)); + } + + bool writeUnlessFull(Input&& input) { + return inQueue_.writeUnlessFull(std::forward(input)); + } + + bool readUnlessClosed(OutputDecayed& output) { + return outQueue_.readUnlessClosed(output); + } + + bool readUnlessEmpty(OutputDecayed& output) { + return outQueue_.readUnlessEmpty(output); + } + }; + + public: + Generator(Source source, Ops ops, size_t threads) + : source_(std::move(source)), + ops_(std::move(ops)), + threads_(threads + ?: std::max(1, sysconf(_SC_NPROCESSORS_CONF))) {} + + template + bool apply(Handler&& handler) const { + Executor executor(threads_, &ops_); + bool more = true; + source_.apply([&](Input input) { + if (executor.writeUnlessFull(std::forward(input))) { + return true; + } + OutputDecayed output; + while (executor.readUnlessEmpty(output)) { + if (!handler(std::move(output))) { + more = false; + return false; + } + } + if (!executor.writeUnlessClosed(std::forward(input))) { + return false; + } + return true; + }); + executor.closeInputProducer(); + + if (more) { + OutputDecayed output; + while (executor.readUnlessClosed(output)) { + if (!handler(std::move(output))) { + more = false; + break; + } + } + } + executor.closeOutputConsumer(); + + return more; + } + + template + void foreach(Body&& body) const { + Executor executor(threads_, &ops_); + source_.foreach([&](Input input) { + if (executor.writeUnlessFull(std::forward(input))) { + return; + } + OutputDecayed output; + while (executor.readUnlessEmpty(output)) { + body(std::move(output)); + } + CHECK(executor.writeUnlessClosed(std::forward(input))); + }); + executor.closeInputProducer(); + + OutputDecayed output; + while (executor.readUnlessClosed(output)) { + body(std::move(output)); + } + executor.closeOutputConsumer(); + } 
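+
+    // Note on apply() and foreach() above: when the input queue fills up, the
+    // caller drains whatever results are already available before doing a
+    // blocking write, so output is delivered while input is still being fed.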
+ }; + + template + Generator compose(const GenImpl& source) const { + return Generator(source.self(), ops_, threads_); + } + + template + Generator compose(GenImpl&& source) const { + return Generator(std::move(source.self()), ops_, threads_); + } +}; + +/** + * ChunkedRangeSource - For slicing up ranges into a sequence of chunks given a + * maximum chunk size. + * + * Usually used through the 'chunked' helper, like: + * + * int n + * = chunked(values) + * | parallel // each thread processes a chunk + * | concat // but can still process values one at a time + * | filter(isPrime) + * | atomic_count; + */ +template +class ChunkedRangeSource + : public GenImpl&&, ChunkedRangeSource> { + int chunkSize_; + Range range_; + + public: + ChunkedRangeSource() {} + ChunkedRangeSource(int chunkSize, Range range) + : chunkSize_(chunkSize), range_(std::move(range)) {} + + template + bool apply(Handler&& handler) const { + auto remaining = range_; + while (!remaining.empty()) { + auto chunk = remaining.subpiece(0, chunkSize_); + remaining.advance(chunk.size()); + auto gen = RangeSource(chunk); + if (!handler(std::move(gen))) { + return false; + } + } + return true; + } +}; + +} // namespace detail + +} // namespace gen +} // namespace folly diff --git a/folly/gen/Parallel.h b/folly/gen/Parallel.h new file mode 100644 index 00000000..d1b99792 --- /dev/null +++ b/folly/gen/Parallel.h @@ -0,0 +1,110 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FOLLY_GEN_PARALLEL_H_ +#define FOLLY_GEN_PARALLEL_H_ + +#include + +#include "folly/gen/Base.h" + +namespace folly { namespace gen { +namespace detail { + +template +class Parallel; + +template +class Sub; + +template +class ChunkedRangeSource; + +} + +/** + * chunked() - For producing values from a container in slices. + * + * Especially for use with 'parallel()', chunked can be used to process values + * from a persistent container in chunks larger than one value at a time. The + * values produced are generators for slices of the input container. */ +template > +Chunked chunked(const Container& container, int chunkSize = 256) { + return Chunked(chunkSize, folly::range(container.begin(), container.end())); +} + +template > +Chunked chunked(Container& container, int chunkSize = 256) { + return Chunked(chunkSize, folly::range(container.begin(), container.end())); +} + + +/** + * parallel - A parallelization operator. + * + * 'parallel(ops)' can be used with any generator to process a segment + * of the pipeline in parallel. Multiple threads are used to apply the + * operations ('ops') to the input sequence, with the resulting sequence + * interleaved to be processed on the client thread. + * + * auto scoredResults + * = from(ids) + * | parallel(map(fetchObj) | filter(isValid) | map(scoreObj)) + * | as(); + * + * Operators specified for parallel execution must yield sequences, not just + * individual values. 
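+ * For instance, map() and filter() can be passed to parallel() as-is, since
+ * each of them produces a sequence.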
If a sink function such as 'count' is desired, it must be + * wrapped in 'sub' to produce a subcount, since any such aggregation must be + * re-aggregated. + * + * auto matches + * = from(docs) + * | parallel(filter(expensiveTest) | sub(count)) + * | sum; + * + * Here, each thread counts its portion of the result, then the sub-counts are + * summed up to produce the total count. + */ +template > +Parallel parallel(Ops ops, size_t threads = 0) { + return Parallel(std::move(ops), threads); +} + +/** + * sub - For sub-summarization of a sequence. + * + * 'sub' can be used to apply a sink function to a generator, but wrap the + * single value in another generator. Note that the sink is eagerly evaluated on + * the input sequence. + * + * auto sum = from(list) | sub(count) | first; + * + * This is primarily used with 'parallel', as noted above. + */ +template > +Sub sub(Sink sink) { + return Sub(std::move(sink)); +} + +}} // !namespace folly::gen + +#include "folly/gen/Parallel-inl.h" + +#endif /* FOLLY_GEN_PARALLEL_H_ */ diff --git a/folly/gen/test/Bench.h b/folly/gen/test/Bench.h new file mode 100644 index 00000000..be91357f --- /dev/null +++ b/folly/gen/test/Bench.h @@ -0,0 +1,33 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FOLLY_GEN_BENCH_H_ +#define FOLLY_GEN_BENCH_H_ + +#include "folly/Benchmark.h" + +#define BENCH_GEN_IMPL(gen, prefix) \ +static bool FB_ANONYMOUS_VARIABLE(benchGen) = ( \ + ::folly::addBenchmark(__FILE__, prefix FB_STRINGIZE(gen), \ + [](unsigned iters){ \ + while (iters--) { \ + folly::doNotOptimizeAway(gen); \ + } \ + }), true) +#define BENCH_GEN(gen) BENCH_GEN_IMPL(gen, "") +#define BENCH_GEN_REL(gen) BENCH_GEN_IMPL(gen, "%") + +#endif diff --git a/folly/gen/test/ParallelBenchmark.cpp b/folly/gen/test/ParallelBenchmark.cpp new file mode 100644 index 00000000..afebbc80 --- /dev/null +++ b/folly/gen/test/ParallelBenchmark.cpp @@ -0,0 +1,179 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include "folly/gen/Base.h" +#include "folly/gen/Parallel.h" +#include "folly/gen/test/Bench.h" + + +DEFINE_int32(threads, + std::max(1, (int32_t) sysconf(_SC_NPROCESSORS_CONF) / 2), + "Num threads."); + +using namespace folly::gen; +using std::vector; + + +constexpr int kFib = 28; // unit of work +size_t fib(int n) { return n <= 1 ? 
1 : fib(n - 1) + fib(n - 2); } + +static auto add = [](int a, int b) { return a + b; }; +static auto mod7 = [](int i) { return i % 7; }; + +static auto isPrimeSlow = [](int n) { + if (n < 2) { + return false; + } else if (n > 2) { + for (int d = 3; d * d <= n; d += 2) { + if (0 == n % d) { + return false; + } + } + } + return true; +}; + +static auto primes = + seq(1, 1 << 20) | filter(isPrimeSlow) | as(); + +static auto isPrime = [](int n) { + return !(from(primes) + | until([&](int d) { return d * d > n; }) + | filter([&](int d) { return 0 == n % d; }) + | any); +}; + +static auto factors = [](int n) { + return from(primes) + | until([&](int d) { return d * d > n; }) + | filter([&](int d) { return 0 == n % d; }) + | count; +}; + +static auto factorsSlow = [](int n) { + return from(primes) + | filter([&](int d) { return 0 == n % d; }) + | count; +}; + +static auto sleepyWork = [](int i) { + const auto sleepyTime = std::chrono::microseconds(100); + std::this_thread::sleep_for(sleepyTime); + return i; +}; + +static auto sleepAndWork = [](int i) { + return factorsSlow(i) + sleepyWork(i); +}; + +std::mutex block; +static auto workAndBlock = [](int i) { + int r = factorsSlow(i); + { + std::lock_guard lock(block); + return sleepyWork(i) + r; + } +}; + +auto start = 1 << 20; +auto v = seq(start) | take(1 << 20) | as(); +auto small = from(v) | take(1 << 12); +auto medium = from(v) | take(1 << 14); +auto large = from(v) | take(1 << 18); +auto huge = from(v); +auto chunks = chunked(v); + +BENCH_GEN(small | map(factorsSlow) | sum); +BENCH_GEN_REL(small | parallel(map(factorsSlow)) | sum); +BENCHMARK_DRAW_LINE(); + +BENCH_GEN(small | map(factors) | sum); +BENCH_GEN_REL(small | parallel(map(factors)) | sum); +BENCHMARK_DRAW_LINE(); + +BENCH_GEN(large | map(factors) | sum); +BENCH_GEN_REL(large | parallel(map(factors)) | sum); +BENCHMARK_DRAW_LINE(); + +auto ch = chunks; +auto cat = concat; +BENCH_GEN(huge | filter(isPrime) | count); +BENCH_GEN_REL(ch | cat | filter(isPrime) | count); +BENCH_GEN_REL(ch | parallel(cat | filter(isPrime)) | count); +BENCH_GEN_REL(ch | parallel(cat | filter(isPrime) | sub(count)) | sum); +BENCHMARK_DRAW_LINE(); + +BENCH_GEN(small | map(sleepAndWork) | sum); +BENCH_GEN_REL(small | parallel(map(sleepAndWork)) | sum); +BENCHMARK_DRAW_LINE(); + +const int fibs = 1000; +BENCH_GEN(seq(1, fibs) | map([](int) { return fib(kFib); }) | sum); +BENCH_GEN_REL(seq(1, fibs) | + parallel(map([](int) { return fib(kFib); }) | sub(sum)) | sum); +BENCH_GEN_REL([] { + auto threads = seq(1, int(FLAGS_threads)) + | map([](int i) { + return std::thread([=] { + return range((i + 0) * fibs / FLAGS_threads, + (i + 1) * fibs / FLAGS_threads) | + map([](int) { return fib(kFib); }) | sum; + }); + }) + | as(); + from(threads) | [](std::thread &thread) { thread.join(); }; + return 1; +}()); +BENCHMARK_DRAW_LINE(); + +#if 0 +============================================================================ +folly/gen/test/ParallelBenchmark.cpp relative time/iter iters/s +============================================================================ +small | map(factorsSlow) | sum 4.59s 217.87m +small | parallel(map(factorsSlow)) | sum 1588.86% 288.88ms 3.46 +---------------------------------------------------------------------------- +small | map(factors) | sum 9.62ms 103.94 +small | parallel(map(factors)) | sum 89.15% 10.79ms 92.66 +---------------------------------------------------------------------------- +large | map(factors) | sum 650.52ms 1.54 +large | parallel(map(factors)) | sum 53.82% 1.21s 827.41m 
+---------------------------------------------------------------------------- +huge | filter(isPrime) | count 295.93ms 3.38 +ch | cat | filter(isPrime) | count 99.76% 296.64ms 3.37 +ch | parallel(cat | filter(isPrime)) | count 142.75% 207.31ms 4.82 +ch | parallel(cat | filter(isPrime) | sub(count 1538.50% 19.24ms 51.99 +---------------------------------------------------------------------------- +small | map(sleepAndWork) | sum 5.37s 186.18m +small | parallel(map(sleepAndWork)) | sum 1840.38% 291.85ms 3.43 +---------------------------------------------------------------------------- +seq(1, fibs) | map([](int) { return fib(kFib); 1.49s 669.53m +seq(1, fibs) | parallel(map([](int) { return fi 1698.07% 87.96ms 11.37 +[] { auto threads = seq(1, int(FLAGS_threads)) 1571.16% 95.06ms 10.52 +---------------------------------------------------------------------------- +============================================================================ +#endif +int main(int argc, char *argv[]) { + google::ParseCommandLineFlags(&argc, &argv, true); + folly::runBenchmarks(); + return 0; +} diff --git a/folly/gen/test/ParallelTest.cpp b/folly/gen/test/ParallelTest.cpp new file mode 100644 index 00000000..7eabb317 --- /dev/null +++ b/folly/gen/test/ParallelTest.cpp @@ -0,0 +1,109 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include +#include +#include +#include +#include "folly/gen/Base.h" +#include "folly/gen/Parallel.h" + +using namespace folly; +using namespace folly::gen; +using std::vector; + +const auto square = [](int i) { return i * i; }; +const auto even = [](int i) { return 0 == i % 2; }; +static auto sleepyWork = [](int i) { + const auto sleepyTime = std::chrono::microseconds(100); + std::this_thread::sleep_for(sleepyTime); + return i; +}; + +static auto isPrime = [](int n) { + if (n < 2) { + return false; + } else if (n > 2) { + for (int d = 3; d * d <= n; d += 2) { + if (0 == n % d) { + return false; + } + } + } + return true; +}; + +struct { + template + std::unique_ptr operator()(T t) const { + return std::unique_ptr(new T(std::move(t))); + } +} makeUnique; + +static auto primes = seq(1, 1 << 14) + | filter(isPrime) + | as>(); + +static auto primeFactors = [](int n) { + return from(primes) + | filter([&](int d) { return 0 == n % d; }) + | count; +}; + +TEST(ParallelTest, Serial) { + EXPECT_EQ( + seq(1,10) | map(square) | filter(even) | sum, + seq(1,10) | parallel(map(square) | filter(even)) | sum); +} + +auto heavyWork = map(primeFactors); + +TEST(ParallelTest, ComputeBound64) { + int length = 1 << 10; + EXPECT_EQ(seq(1, length) | heavyWork | sum, + seq(1, length) | parallel(heavyWork) | sum); +} + +TEST(ParallelTest, Take) { + int length = 1 << 18; + int limit = 1 << 14; + EXPECT_EQ(seq(1, length) | take(limit) | count, + seq(1, length) | parallel(heavyWork) | take(limit) | count); +} + + +TEST(ParallelTest, Unique) { + auto uniqued = from(primes) | map(makeUnique) | as(); + EXPECT_EQ(primes.size(), + from(primes) | parallel(map(makeUnique)) | + parallel(dereference | map(makeUnique)) | dereference | count); + EXPECT_EQ(2, + from(primes) | parallel(map(makeUnique)) | + parallel(dereference | map(makeUnique)) | dereference | + take(2) | count); +} + +TEST(ParallelTest, PSum) { + EXPECT_EQ(from(primes) | map(sleepyWork) | sum, + from(primes) | parallel(map(sleepyWork) | sub(sum)) | sum); +} + +int main(int argc, char *argv[]) { + testing::InitGoogleTest(&argc, argv); + google::ParseCommandLineFlags(&argc, &argv, true); + return RUN_ALL_TESTS(); +} -- 2.34.1
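
Example of how the pieces above are meant to compose (a sketch only; `workingSet` and `isInteresting` are stand-in names, not part of this patch):

```lang=cpp
#include <vector>

#include "folly/gen/Base.h"
#include "folly/gen/Parallel.h"

using namespace folly::gen;

// Count "interesting" values in a large vector: fixed-size chunks are handed
// to worker threads, each worker keeps a local partial count via sub(count),
// and the partial counts are summed on the calling thread.
size_t countInteresting(const std::vector<int>& workingSet) {
  auto isInteresting = [](int n) { return n % 7 == 0; };  // stand-in predicate
  return chunked(workingSet, 1024)         // generator of chunk generators
       | parallel(concat                   // each worker flattens its chunks
                  | filter(isInteresting)
                  | sub(count))            // one partial count per worker
       | sum;                              // re-aggregate on the caller thread
}
```

Only the per-worker partial counts cross the output queue here; the chunk values themselves never leave the worker that pulled them.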