#include <gtest/gtest.h>
#include <folly/wangle/codec/FixedLengthFrameDecoder.h>
+#include <folly/wangle/codec/LineBasedFrameDecoder.h>
#include <folly/wangle/codec/LengthFieldBasedFrameDecoder.h>
#include <folly/wangle/codec/LengthFieldPrepender.h>
void read(Context* ctx, IOBufQueue& q) {
test_(q.move());
}
+
+ void readException(Context* ctx, exception_wrapper w) {
+ test_(nullptr);
+ }
private:
std::function<void(std::unique_ptr<IOBuf>)> test_;
};
pipeline.read(q);
EXPECT_EQ(called, 1);
}
+
+TEST(CodecTest, LineBasedFrameDecoder) {
+ ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+ int called = 0;
+
+ pipeline
+ .addBack(LineBasedFrameDecoder(10))
+ .addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
+ auto sz = buf->computeChainDataLength();
+ called++;
+ EXPECT_EQ(sz, 3);
+ }))
+ .finalize();
+
+ auto buf = IOBuf::create(3);
+ buf->append(3);
+
+ IOBufQueue q(IOBufQueue::cacheChainLength());
+
+ q.append(std::move(buf));
+ pipeline.read(q);
+ EXPECT_EQ(called, 0);
+
+ buf = IOBuf::create(1);
+ buf->append(1);
+ RWPrivateCursor c(buf.get());
+ c.write<char>('\n');
+ q.append(std::move(buf));
+ pipeline.read(q);
+ EXPECT_EQ(called, 1);
+
+ buf = IOBuf::create(4);
+ buf->append(4);
+ RWPrivateCursor c1(buf.get());
+ c1.write(' ');
+ c1.write(' ');
+ c1.write(' ');
+
+ c1.write('\r');
+ q.append(std::move(buf));
+ pipeline.read(q);
+ EXPECT_EQ(called, 1);
+
+ buf = IOBuf::create(1);
+ buf->append(1);
+ RWPrivateCursor c2(buf.get());
+ c2.write('\n');
+ q.append(std::move(buf));
+ pipeline.read(q);
+ EXPECT_EQ(called, 2);
+}
+
+TEST(CodecTest, LineBasedFrameDecoderSaveDelimiter) {
+ ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+ int called = 0;
+
+ pipeline
+ .addBack(LineBasedFrameDecoder(10, false))
+ .addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
+ auto sz = buf->computeChainDataLength();
+ called++;
+ EXPECT_EQ(sz, 4);
+ }))
+ .finalize();
+
+ auto buf = IOBuf::create(3);
+ buf->append(3);
+
+ IOBufQueue q(IOBufQueue::cacheChainLength());
+
+ q.append(std::move(buf));
+ pipeline.read(q);
+ EXPECT_EQ(called, 0);
+
+ buf = IOBuf::create(1);
+ buf->append(1);
+ RWPrivateCursor c(buf.get());
+ c.write<char>('\n');
+ q.append(std::move(buf));
+ pipeline.read(q);
+ EXPECT_EQ(called, 1);
+
+ buf = IOBuf::create(3);
+ buf->append(3);
+ RWPrivateCursor c1(buf.get());
+ c1.write(' ');
+ c1.write(' ');
+ c1.write('\r');
+ q.append(std::move(buf));
+ pipeline.read(q);
+ EXPECT_EQ(called, 1);
+
+ buf = IOBuf::create(1);
+ buf->append(1);
+ RWPrivateCursor c2(buf.get());
+ c2.write('\n');
+ q.append(std::move(buf));
+ pipeline.read(q);
+ EXPECT_EQ(called, 2);
+}
+
+TEST(CodecTest, LineBasedFrameDecoderFail) {
+ ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+ int called = 0;
+
+ pipeline
+ .addBack(LineBasedFrameDecoder(10))
+ .addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
+ called++;
+ }))
+ .finalize();
+
+ auto buf = IOBuf::create(11);
+ buf->append(11);
+
+ IOBufQueue q(IOBufQueue::cacheChainLength());
+
+ q.append(std::move(buf));
+ pipeline.read(q);
+ EXPECT_EQ(called, 1);
+
+ buf = IOBuf::create(1);
+ buf->append(1);
+ q.append(std::move(buf));
+ pipeline.read(q);
+ EXPECT_EQ(called, 1);
+
+ buf = IOBuf::create(2);
+ buf->append(2);
+ RWPrivateCursor c(buf.get());
+ c.write(' ');
+ c.write<char>('\n');
+ q.append(std::move(buf));
+ pipeline.read(q);
+ EXPECT_EQ(called, 1);
+
+ buf = IOBuf::create(12);
+ buf->append(12);
+ RWPrivateCursor c2(buf.get());
+ for (int i = 0; i < 11; i++) {
+ c2.write(' ');
+ }
+ c2.write<char>('\n');
+ q.append(std::move(buf));
+ pipeline.read(q);
+ EXPECT_EQ(called, 2);
+}
+
+TEST(CodecTest, LineBasedFrameDecoderNewLineOnly) {
+ ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+ int called = 0;
+
+ pipeline
+ .addBack(LineBasedFrameDecoder(
+ 10, true, LineBasedFrameDecoder::TerminatorType::NEWLINE))
+ .addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
+ auto sz = buf->computeChainDataLength();
+ called++;
+ EXPECT_EQ(sz, 1);
+ }))
+ .finalize();
+
+ auto buf = IOBuf::create(2);
+ buf->append(2);
+ RWPrivateCursor c(buf.get());
+ c.write<char>('\r');
+ c.write<char>('\n');
+
+ IOBufQueue q(IOBufQueue::cacheChainLength());
+
+ q.append(std::move(buf));
+ pipeline.read(q);
+ EXPECT_EQ(called, 1);
+}
+
+TEST(CodecTest, LineBasedFrameDecoderCarriageNewLineOnly) {
+ ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+ int called = 0;
+
+ pipeline
+ .addBack(LineBasedFrameDecoder(
+ 10, true, LineBasedFrameDecoder::TerminatorType::CARRIAGENEWLINE))
+ .addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
+ auto sz = buf->computeChainDataLength();
+ called++;
+ EXPECT_EQ(sz, 1);
+ }))
+ .finalize();
+
+ auto buf = IOBuf::create(3);
+ buf->append(3);
+ RWPrivateCursor c(buf.get());
+ c.write<char>('\n');
+ c.write<char>('\r');
+ c.write<char>('\n');
+
+ IOBufQueue q(IOBufQueue::cacheChainLength());
+
+ q.append(std::move(buf));
+ pipeline.read(q);
+ EXPECT_EQ(called, 1);
+}
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/wangle/codec/LineBasedFrameDecoder.h>
+
+namespace folly { namespace wangle {
+
+using folly::io::Cursor;
+
+LineBasedFrameDecoder::LineBasedFrameDecoder(uint32_t maxLength,
+ bool stripDelimiter,
+ TerminatorType terminatorType)
+ : maxLength_(maxLength)
+ , stripDelimiter_(stripDelimiter)
+ , terminatorType_(terminatorType) {}
+
+std::unique_ptr<IOBuf> LineBasedFrameDecoder::decode(
+ Context* ctx, IOBufQueue& buf, size_t&) {
+ int64_t eol = findEndOfLine(buf);
+
+ if (!discarding_) {
+ if (eol >= 0) {
+ Cursor c(buf.front());
+ c += eol;
+ auto delimLength = c.read<char>() == '\r' ? 2 : 1;
+ if (eol > maxLength_) {
+ buf.split(eol + delimLength);
+ fail(ctx, folly::to<std::string>(eol));
+ return nullptr;
+ }
+
+ std::unique_ptr<folly::IOBuf> frame;
+
+ if (stripDelimiter_) {
+ frame = buf.split(eol);
+ buf.trimStart(delimLength);
+ } else {
+ frame = buf.split(eol + delimLength);
+ }
+
+ return std::move(frame);
+ } else {
+ auto len = buf.chainLength();
+ if (len > maxLength_) {
+ discardedBytes_ = len;
+ buf.trimStart(len);
+ discarding_ = true;
+ fail(ctx, "over " + folly::to<std::string>(len));
+ }
+ return nullptr;
+ }
+ } else {
+ if (eol >= 0) {
+ Cursor c(buf.front());
+ c += eol;
+ auto delimLength = c.read<char>() == '\r' ? 2 : 1;
+ buf.trimStart(eol + delimLength);
+ discardedBytes_ = 0;
+ discarding_ = false;
+ } else {
+ discardedBytes_ = buf.chainLength();
+ buf.move();
+ }
+
+ return nullptr;
+ }
+}
+
+void LineBasedFrameDecoder::fail(Context* ctx, std::string len) {
+ ctx->fireReadException(
+ folly::make_exception_wrapper<std::runtime_error>(
+ "frame length" + len +
+ " exeeds max " + folly::to<std::string>(maxLength_)));
+}
+
+int64_t LineBasedFrameDecoder::findEndOfLine(IOBufQueue& buf) {
+ Cursor c(buf.front());
+ for (uint32_t i = 0; i < maxLength_ && i < buf.chainLength(); i++) {
+ auto b = c.read<char>();
+ if (b == '\n' && terminatorType_ != TerminatorType::CARRIAGENEWLINE) {
+ return i;
+ } else if (terminatorType_ != TerminatorType::NEWLINE &&
+ b == '\r' && !c.isAtEnd() && c.read<char>() == '\n') {
+ return i;
+ }
+ }
+
+ return -1;
+}
+
+}} // namespace
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/wangle/codec/ByteToMessageCodec.h>
+#include <folly/io/Cursor.h>
+
+namespace folly { namespace wangle {
+
+/**
+ * A decoder that splits the received IOBufQueue on line endings.
+ *
+ * Both "\n" and "\r\n" are handled, or optionally reqire only
+ * one or the other.
+ */
+class LineBasedFrameDecoder : public ByteToMessageCodec {
+ public:
+ enum class TerminatorType {
+ BOTH,
+ NEWLINE,
+ CARRIAGENEWLINE
+ };
+
+ LineBasedFrameDecoder(uint32_t maxLength = UINT_MAX,
+ bool stripDelimiter = true,
+ TerminatorType terminatorType =
+ TerminatorType::BOTH);
+
+ std::unique_ptr<IOBuf> decode(Context* ctx, IOBufQueue& buf, size_t&);
+
+ private:
+
+ int64_t findEndOfLine(IOBufQueue& buf);
+
+ void fail(Context* ctx, std::string len);
+
+ uint32_t maxLength_;
+ bool stripDelimiter_;
+
+ bool discarding_{false};
+ uint32_t discardedBytes_{0};
+
+ TerminatorType terminatorType_;
+};
+
+}} // namespace