From: Tom Jackson Date: Thu, 14 Sep 2017 02:01:02 +0000 (-0700) Subject: Window, mainly for futures X-Git-Tag: v2017.09.18.00~5 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=61017384228ed48f8ca6ad6514191499c3cb3e81;p=folly.git Window, mainly for futures Summary: Just a circular buffer in the middle of a pipeline. Reviewed By: yfeldblum Differential Revision: D5791551 fbshipit-source-id: 2808a53df9b8cd2a402da0678a6b4ed0e6ca6e00 --- diff --git a/folly/gen/Base-inl.h b/folly/gen/Base-inl.h index e7a8e415..6e39e03f 100644 --- a/folly/gen/Base-inl.h +++ b/folly/gen/Base-inl.h @@ -1366,6 +1366,103 @@ class Batch : public Operator { } }; +/** + * Window - For overlapping the lifetimes of pipeline values, especially with + * Futures. + * + * This type is usually used through the 'window' helper function: + * + * auto responses + * = byLine(STDIN) + * | map(makeRequestFuture) + * | window(1000) + * | map(waitFuture) + * | as(); + */ +class Window : public Operator { + size_t windowSize_; + + public: + explicit Window(size_t windowSize) : windowSize_(windowSize) { + if (windowSize_ == 0) { + throw std::invalid_argument("Window size must be non-zero!"); + } + } + + template < + class Value, + class Source, + class StorageType = typename std::decay::type> + class Generator + : public GenImpl> { + Source source_; + size_t windowSize_; + + public: + explicit Generator(Source source, size_t windowSize) + : source_(std::move(source)), windowSize_(windowSize) {} + + template + bool apply(Handler&& handler) const { + std::vector buffer; + buffer.reserve(windowSize_); + size_t readIndex = 0; + bool shouldContinue = source_.apply([&](Value value) -> bool { + if (buffer.size() < windowSize_) { + buffer.push_back(std::forward(value)); + } else { + StorageType& entry = buffer[readIndex++]; + if (readIndex == windowSize_) { + readIndex = 0; + } + if (!handler(std::move(entry))) { + return false; + } + entry = std::forward(value); + } + return true; + }); + if (!shouldContinue) { + return false; + } + if (buffer.size() < windowSize_) { + for (StorageType& entry : buffer) { + if (!handler(std::move(entry))) { + return false; + } + } + } else { + for (size_t i = readIndex;;) { + StorageType& entry = buffer[i++]; + if (!handler(std::move(entry))) { + return false; + } + if (i == windowSize_) { + i = 0; + } + if (i == readIndex) { + break; + } + } + } + return true; + } + + // Taking n-tuples of an infinite source is still infinite + static constexpr bool infinite = Source::infinite; + }; + + template > + Gen compose(GenImpl&& source) const { + return Gen(std::move(source.self()), windowSize_); + } + + template > + Gen compose(const GenImpl& source) const { + return Gen(source.self(), windowSize_); + } +}; + /** * Concat - For flattening generators of generators. * @@ -2357,6 +2454,11 @@ inline detail::Skip skip(size_t count) { return detail::Skip(count); } inline detail::Batch batch(size_t batchSize) { return detail::Batch(batchSize); } + +inline detail::Window window(size_t windowSize) { + return detail::Window(windowSize); +} + } // namespace gen } // namespace folly diff --git a/folly/gen/Base.h b/folly/gen/Base.h index 1f8a59ec..b760aef3 100644 --- a/folly/gen/Base.h +++ b/folly/gen/Base.h @@ -357,6 +357,8 @@ class Cycle; class Batch; +class Window; + class Dereference; class Indirect; diff --git a/folly/gen/test/BaseTest.cpp b/folly/gen/test/BaseTest.cpp index 8e0358c2..5d9075bd 100644 --- a/folly/gen/test/BaseTest.cpp +++ b/folly/gen/test/BaseTest.cpp @@ -1249,6 +1249,31 @@ TEST(Gen, BatchMove) { EXPECT_EQ(expected, actual); } +TEST(Gen, Window) { + auto expected = seq(0, 10) | as(); + for (size_t windowSize = 1; windowSize <= 20; ++windowSize) { + // no early stop + auto actual = seq(0, 10) | + mapped([](int i) { return std::unique_ptr(new int(i)); }) | + window(4) | dereference | as(); + EXPECT_EQ(expected, actual) << windowSize; + } + for (size_t windowSize = 1; windowSize <= 20; ++windowSize) { + // pre-window take + auto actual = seq(0) | + mapped([](int i) { return std::unique_ptr(new int(i)); }) | + take(11) | window(4) | dereference | as(); + EXPECT_EQ(expected, actual) << windowSize; + } + for (size_t windowSize = 1; windowSize <= 20; ++windowSize) { + // post-window take + auto actual = seq(0) | + mapped([](int i) { return std::unique_ptr(new int(i)); }) | + window(4) | take(11) | dereference | as(); + EXPECT_EQ(expected, actual) << windowSize; + } +} + TEST(Gen, Just) { { int x = 3;