}
};
+/**
+ * Window - For overlapping the lifetimes of pipeline values, especially with
+ * Futures.
+ *
+ * This type is usually used through the 'window' helper function:
+ *
+ * auto responses
+ * = byLine(STDIN)
+ * | map(makeRequestFuture)
+ * | window(1000)
+ * | map(waitFuture)
+ * | as<vector>();
+ */
+class Window : public Operator<Window> {
+ size_t windowSize_;
+
+ public:
+ explicit Window(size_t windowSize) : windowSize_(windowSize) {
+ if (windowSize_ == 0) {
+ throw std::invalid_argument("Window size must be non-zero!");
+ }
+ }
+
+ template <
+ class Value,
+ class Source,
+ class StorageType = typename std::decay<Value>::type>
+ class Generator
+ : public GenImpl<StorageType&&, Generator<Value, Source, StorageType>> {
+ Source source_;
+ size_t windowSize_;
+
+ public:
+ explicit Generator(Source source, size_t windowSize)
+ : source_(std::move(source)), windowSize_(windowSize) {}
+
+ template <class Handler>
+ bool apply(Handler&& handler) const {
+ std::vector<StorageType> buffer;
+ buffer.reserve(windowSize_);
+ size_t readIndex = 0;
+ bool shouldContinue = source_.apply([&](Value value) -> bool {
+ if (buffer.size() < windowSize_) {
+ buffer.push_back(std::forward<Value>(value));
+ } else {
+ StorageType& entry = buffer[readIndex++];
+ if (readIndex == windowSize_) {
+ readIndex = 0;
+ }
+ if (!handler(std::move(entry))) {
+ return false;
+ }
+ entry = std::forward<Value>(value);
+ }
+ return true;
+ });
+ if (!shouldContinue) {
+ return false;
+ }
+ if (buffer.size() < windowSize_) {
+ for (StorageType& entry : buffer) {
+ if (!handler(std::move(entry))) {
+ return false;
+ }
+ }
+ } else {
+ for (size_t i = readIndex;;) {
+ StorageType& entry = buffer[i++];
+ if (!handler(std::move(entry))) {
+ return false;
+ }
+ if (i == windowSize_) {
+ i = 0;
+ }
+ if (i == readIndex) {
+ break;
+ }
+ }
+ }
+ return true;
+ }
+
+ // Taking n-tuples of an infinite source is still infinite
+ static constexpr bool infinite = Source::infinite;
+ };
+
+ template <class Source, class Value, class Gen = Generator<Value, Source>>
+ Gen compose(GenImpl<Value, Source>&& source) const {
+ return Gen(std::move(source.self()), windowSize_);
+ }
+
+ template <class Source, class Value, class Gen = Generator<Value, Source>>
+ Gen compose(const GenImpl<Value, Source>& source) const {
+ return Gen(source.self(), windowSize_);
+ }
+};
+
/**
* Concat - For flattening generators of generators.
*
inline detail::Batch batch(size_t batchSize) {
return detail::Batch(batchSize);
}
+
+inline detail::Window window(size_t windowSize) {
+ return detail::Window(windowSize);
+}
+
} // namespace gen
} // namespace folly
EXPECT_EQ(expected, actual);
}
+TEST(Gen, Window) {
+ auto expected = seq(0, 10) | as<std::vector>();
+ for (size_t windowSize = 1; windowSize <= 20; ++windowSize) {
+ // no early stop
+ auto actual = seq(0, 10) |
+ mapped([](int i) { return std::unique_ptr<int>(new int(i)); }) |
+ window(4) | dereference | as<std::vector>();
+ EXPECT_EQ(expected, actual) << windowSize;
+ }
+ for (size_t windowSize = 1; windowSize <= 20; ++windowSize) {
+ // pre-window take
+ auto actual = seq(0) |
+ mapped([](int i) { return std::unique_ptr<int>(new int(i)); }) |
+ take(11) | window(4) | dereference | as<std::vector>();
+ EXPECT_EQ(expected, actual) << windowSize;
+ }
+ for (size_t windowSize = 1; windowSize <= 20; ++windowSize) {
+ // post-window take
+ auto actual = seq(0) |
+ mapped([](int i) { return std::unique_ptr<int>(new int(i)); }) |
+ window(4) | take(11) | dereference | as<std::vector>();
+ EXPECT_EQ(expected, actual) << windowSize;
+ }
+}
+
TEST(Gen, Just) {
{
int x = 3;