From: Dave Watson Date: Thu, 9 Apr 2015 17:00:48 +0000 (-0700) Subject: LineBasedFrameDecoder X-Git-Tag: v0.35.0~3 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=d887f51c7edaa2d330312957662e89ec89ce4e2b;p=folly.git LineBasedFrameDecoder Summary: Copy of netty's line based decoder. Test Plan: unittests fbconfig folly/wangle/codec; fbmake runtests Reviewed By: hans@fb.com Subscribers: doug, fugalh, folly-diffs@, jsedgwick, yfeldblum, chalfant FB internal diff: D1959155 Signature: t1:1959155:1427935150:e11280c5567df9ad9964dbb656aa090267856f57 --- diff --git a/folly/wangle/codec/CodecTest.cpp b/folly/wangle/codec/CodecTest.cpp index df2c03e1..7ba02043 100644 --- a/folly/wangle/codec/CodecTest.cpp +++ b/folly/wangle/codec/CodecTest.cpp @@ -16,6 +16,7 @@ #include #include +#include #include #include @@ -32,6 +33,10 @@ class FrameTester void read(Context* ctx, IOBufQueue& q) { test_(q.move()); } + + void readException(Context* ctx, exception_wrapper w) { + test_(nullptr); + } private: std::function)> test_; }; @@ -348,3 +353,205 @@ TEST(CodecTest, LengthFieldFrameDecoderStripPrePostHeaderFrameInclHeader) { pipeline.read(q); EXPECT_EQ(called, 1); } + +TEST(CodecTest, LineBasedFrameDecoder) { + ChannelPipeline> pipeline; + int called = 0; + + pipeline + .addBack(LineBasedFrameDecoder(10)) + .addBack(FrameTester([&](std::unique_ptr buf) { + auto sz = buf->computeChainDataLength(); + called++; + EXPECT_EQ(sz, 3); + })) + .finalize(); + + auto buf = IOBuf::create(3); + buf->append(3); + + IOBufQueue q(IOBufQueue::cacheChainLength()); + + q.append(std::move(buf)); + pipeline.read(q); + EXPECT_EQ(called, 0); + + buf = IOBuf::create(1); + buf->append(1); + RWPrivateCursor c(buf.get()); + c.write('\n'); + q.append(std::move(buf)); + pipeline.read(q); + EXPECT_EQ(called, 1); + + buf = IOBuf::create(4); + buf->append(4); + RWPrivateCursor c1(buf.get()); + c1.write(' '); + c1.write(' '); + c1.write(' '); + + c1.write('\r'); + q.append(std::move(buf)); + pipeline.read(q); + EXPECT_EQ(called, 1); + + buf = IOBuf::create(1); + buf->append(1); + RWPrivateCursor c2(buf.get()); + c2.write('\n'); + q.append(std::move(buf)); + pipeline.read(q); + EXPECT_EQ(called, 2); +} + +TEST(CodecTest, LineBasedFrameDecoderSaveDelimiter) { + ChannelPipeline> pipeline; + int called = 0; + + pipeline + .addBack(LineBasedFrameDecoder(10, false)) + .addBack(FrameTester([&](std::unique_ptr buf) { + auto sz = buf->computeChainDataLength(); + called++; + EXPECT_EQ(sz, 4); + })) + .finalize(); + + auto buf = IOBuf::create(3); + buf->append(3); + + IOBufQueue q(IOBufQueue::cacheChainLength()); + + q.append(std::move(buf)); + pipeline.read(q); + EXPECT_EQ(called, 0); + + buf = IOBuf::create(1); + buf->append(1); + RWPrivateCursor c(buf.get()); + c.write('\n'); + q.append(std::move(buf)); + pipeline.read(q); + EXPECT_EQ(called, 1); + + buf = IOBuf::create(3); + buf->append(3); + RWPrivateCursor c1(buf.get()); + c1.write(' '); + c1.write(' '); + c1.write('\r'); + q.append(std::move(buf)); + pipeline.read(q); + EXPECT_EQ(called, 1); + + buf = IOBuf::create(1); + buf->append(1); + RWPrivateCursor c2(buf.get()); + c2.write('\n'); + q.append(std::move(buf)); + pipeline.read(q); + EXPECT_EQ(called, 2); +} + +TEST(CodecTest, LineBasedFrameDecoderFail) { + ChannelPipeline> pipeline; + int called = 0; + + pipeline + .addBack(LineBasedFrameDecoder(10)) + .addBack(FrameTester([&](std::unique_ptr buf) { + called++; + })) + .finalize(); + + auto buf = IOBuf::create(11); + buf->append(11); + + IOBufQueue q(IOBufQueue::cacheChainLength()); + + q.append(std::move(buf)); + pipeline.read(q); + EXPECT_EQ(called, 1); + + buf = IOBuf::create(1); + buf->append(1); + q.append(std::move(buf)); + pipeline.read(q); + EXPECT_EQ(called, 1); + + buf = IOBuf::create(2); + buf->append(2); + RWPrivateCursor c(buf.get()); + c.write(' '); + c.write('\n'); + q.append(std::move(buf)); + pipeline.read(q); + EXPECT_EQ(called, 1); + + buf = IOBuf::create(12); + buf->append(12); + RWPrivateCursor c2(buf.get()); + for (int i = 0; i < 11; i++) { + c2.write(' '); + } + c2.write('\n'); + q.append(std::move(buf)); + pipeline.read(q); + EXPECT_EQ(called, 2); +} + +TEST(CodecTest, LineBasedFrameDecoderNewLineOnly) { + ChannelPipeline> pipeline; + int called = 0; + + pipeline + .addBack(LineBasedFrameDecoder( + 10, true, LineBasedFrameDecoder::TerminatorType::NEWLINE)) + .addBack(FrameTester([&](std::unique_ptr buf) { + auto sz = buf->computeChainDataLength(); + called++; + EXPECT_EQ(sz, 1); + })) + .finalize(); + + auto buf = IOBuf::create(2); + buf->append(2); + RWPrivateCursor c(buf.get()); + c.write('\r'); + c.write('\n'); + + IOBufQueue q(IOBufQueue::cacheChainLength()); + + q.append(std::move(buf)); + pipeline.read(q); + EXPECT_EQ(called, 1); +} + +TEST(CodecTest, LineBasedFrameDecoderCarriageNewLineOnly) { + ChannelPipeline> pipeline; + int called = 0; + + pipeline + .addBack(LineBasedFrameDecoder( + 10, true, LineBasedFrameDecoder::TerminatorType::CARRIAGENEWLINE)) + .addBack(FrameTester([&](std::unique_ptr buf) { + auto sz = buf->computeChainDataLength(); + called++; + EXPECT_EQ(sz, 1); + })) + .finalize(); + + auto buf = IOBuf::create(3); + buf->append(3); + RWPrivateCursor c(buf.get()); + c.write('\n'); + c.write('\r'); + c.write('\n'); + + IOBufQueue q(IOBufQueue::cacheChainLength()); + + q.append(std::move(buf)); + pipeline.read(q); + EXPECT_EQ(called, 1); +} diff --git a/folly/wangle/codec/LineBasedFrameDecoder.cpp b/folly/wangle/codec/LineBasedFrameDecoder.cpp new file mode 100644 index 00000000..ab0bb074 --- /dev/null +++ b/folly/wangle/codec/LineBasedFrameDecoder.cpp @@ -0,0 +1,103 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include + +namespace folly { namespace wangle { + +using folly::io::Cursor; + +LineBasedFrameDecoder::LineBasedFrameDecoder(uint32_t maxLength, + bool stripDelimiter, + TerminatorType terminatorType) + : maxLength_(maxLength) + , stripDelimiter_(stripDelimiter) + , terminatorType_(terminatorType) {} + +std::unique_ptr LineBasedFrameDecoder::decode( + Context* ctx, IOBufQueue& buf, size_t&) { + int64_t eol = findEndOfLine(buf); + + if (!discarding_) { + if (eol >= 0) { + Cursor c(buf.front()); + c += eol; + auto delimLength = c.read() == '\r' ? 2 : 1; + if (eol > maxLength_) { + buf.split(eol + delimLength); + fail(ctx, folly::to(eol)); + return nullptr; + } + + std::unique_ptr frame; + + if (stripDelimiter_) { + frame = buf.split(eol); + buf.trimStart(delimLength); + } else { + frame = buf.split(eol + delimLength); + } + + return std::move(frame); + } else { + auto len = buf.chainLength(); + if (len > maxLength_) { + discardedBytes_ = len; + buf.trimStart(len); + discarding_ = true; + fail(ctx, "over " + folly::to(len)); + } + return nullptr; + } + } else { + if (eol >= 0) { + Cursor c(buf.front()); + c += eol; + auto delimLength = c.read() == '\r' ? 2 : 1; + buf.trimStart(eol + delimLength); + discardedBytes_ = 0; + discarding_ = false; + } else { + discardedBytes_ = buf.chainLength(); + buf.move(); + } + + return nullptr; + } +} + +void LineBasedFrameDecoder::fail(Context* ctx, std::string len) { + ctx->fireReadException( + folly::make_exception_wrapper( + "frame length" + len + + " exeeds max " + folly::to(maxLength_))); +} + +int64_t LineBasedFrameDecoder::findEndOfLine(IOBufQueue& buf) { + Cursor c(buf.front()); + for (uint32_t i = 0; i < maxLength_ && i < buf.chainLength(); i++) { + auto b = c.read(); + if (b == '\n' && terminatorType_ != TerminatorType::CARRIAGENEWLINE) { + return i; + } else if (terminatorType_ != TerminatorType::NEWLINE && + b == '\r' && !c.isAtEnd() && c.read() == '\n') { + return i; + } + } + + return -1; +} + +}} // namespace diff --git a/folly/wangle/codec/LineBasedFrameDecoder.h b/folly/wangle/codec/LineBasedFrameDecoder.h new file mode 100644 index 00000000..5ae9433f --- /dev/null +++ b/folly/wangle/codec/LineBasedFrameDecoder.h @@ -0,0 +1,59 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +namespace folly { namespace wangle { + +/** + * A decoder that splits the received IOBufQueue on line endings. + * + * Both "\n" and "\r\n" are handled, or optionally reqire only + * one or the other. + */ +class LineBasedFrameDecoder : public ByteToMessageCodec { + public: + enum class TerminatorType { + BOTH, + NEWLINE, + CARRIAGENEWLINE + }; + + LineBasedFrameDecoder(uint32_t maxLength = UINT_MAX, + bool stripDelimiter = true, + TerminatorType terminatorType = + TerminatorType::BOTH); + + std::unique_ptr decode(Context* ctx, IOBufQueue& buf, size_t&); + + private: + + int64_t findEndOfLine(IOBufQueue& buf); + + void fail(Context* ctx, std::string len); + + uint32_t maxLength_; + bool stripDelimiter_; + + bool discarding_{false}; + uint32_t discardedBytes_{0}; + + TerminatorType terminatorType_; +}; + +}} // namespace