--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/wangle/codec/ByteToMessageCodec.h>
+
+namespace folly { namespace wangle {
+
+void ByteToMessageCodec::read(Context* ctx, IOBufQueue& q) {
+ size_t needed = 0;
+ std::unique_ptr<IOBuf> result;
+ while (true) {
+ result = decode(ctx, q, needed);
+ if (result) {
+ q_.append(std::move(result));
+ ctx->fireRead(q_);
+ } else {
+ break;
+ }
+ }
+}
+
+}} // namespace
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/wangle/channel/ChannelHandler.h>
+
+namespace folly { namespace wangle {
+
+/**
+ * A ChannelHandler which decodes bytes in a stream-like fashion from
+ * IOBufQueue to a Message type.
+ *
+ * Frame detection
+ *
+ * Generally frame detection should be handled earlier in the pipeline
+ * by adding a DelimiterBasedFrameDecoder, FixedLengthFrameDecoder,
+ * LengthFieldBasedFrameDecoder, LineBasedFrameDecoder.
+ *
+ * If a custom frame decoder is required, then one needs to be careful
+ * when implementing one with {@link ByteToMessageDecoder}. Ensure
+ * there are enough bytes in the buffer for a complete frame by
+ * checking {@link ByteBuf#readableBytes()}. If there are not enough
+ * bytes for a complete frame, return without modify the reader index
+ * to allow more bytes to arrive.
+ *
+ * To check for complete frames without modify the reader index, use
+ * IOBufQueue.front(), without split() or pop_front().
+ */
+class ByteToMessageCodec
+ : public BytesToBytesHandler {
+ public:
+
+ virtual std::unique_ptr<IOBuf> decode(
+ Context* ctx, IOBufQueue& buf, size_t&) = 0;
+
+ void read(Context* ctx, IOBufQueue& q);
+
+ private:
+ IOBufQueue q_;
+};
+
+}}
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <gtest/gtest.h>
+
+#include <folly/wangle/codec/FixedLengthFrameDecoder.h>
+#include <folly/wangle/codec/LengthFieldBasedFrameDecoder.h>
+#include <folly/wangle/codec/LengthFieldPrepender.h>
+
+using namespace folly;
+using namespace folly::wangle;
+using namespace folly::io;
+
+class FrameTester
+ : public BytesToBytesHandler {
+ public:
+ FrameTester(std::function<void(std::unique_ptr<IOBuf>)> test)
+ : test_(test) {}
+
+ void read(Context* ctx, IOBufQueue& q) {
+ test_(q.move());
+ }
+ private:
+ std::function<void(std::unique_ptr<IOBuf>)> test_;
+};
+
+class BytesReflector
+ : public BytesToBytesHandler {
+ public:
+
+ Future<void> write(Context* ctx, std::unique_ptr<IOBuf> buf) {
+ IOBufQueue q_(IOBufQueue::cacheChainLength());
+ q_.append(std::move(buf));
+ ctx->fireRead(q_);
+
+ return makeFuture();
+ }
+};
+
+TEST(CodecTest, FixedLengthFrameDecoder) {
+ ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+ int called = 0;
+
+ pipeline
+ .addBack(FixedLengthFrameDecoder(10))
+ .addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
+ auto sz = buf->computeChainDataLength();
+ called++;
+ EXPECT_EQ(sz, 10);
+ }))
+ .finalize();
+
+ auto buf3 = IOBuf::create(3);
+ buf3->append(3);
+ auto buf11 = IOBuf::create(11);
+ buf11->append(11);
+ auto buf16 = IOBuf::create(16);
+ buf16->append(16);
+
+ IOBufQueue q(IOBufQueue::cacheChainLength());
+
+ q.append(std::move(buf3));
+ pipeline.read(q);
+ EXPECT_EQ(called, 0);
+
+ q.append(std::move(buf11));
+ pipeline.read(q);
+ EXPECT_EQ(called, 1);
+
+ q.append(std::move(buf16));
+ pipeline.read(q);
+ EXPECT_EQ(called, 3);
+}
+
+TEST(CodecTest, LengthFieldFramePipeline) {
+ ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+ int called = 0;
+
+ pipeline
+ .addBack(BytesReflector())
+ .addBack(LengthFieldBasedFrameDecoder())
+ .addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
+ auto sz = buf->computeChainDataLength();
+ called++;
+ EXPECT_EQ(sz, 2);
+ }))
+ .addBack(LengthFieldPrepender())
+ .finalize();
+
+ auto buf = IOBuf::create(2);
+ buf->append(2);
+ pipeline.write(std::move(buf));
+ EXPECT_EQ(called, 1);
+}
+
+TEST(CodecTest, LengthFieldFramePipelineLittleEndian) {
+ ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+ int called = 0;
+
+ pipeline
+ .addBack(BytesReflector())
+ .addBack(LengthFieldBasedFrameDecoder(4, 100, 0, 0, 4, false))
+ .addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
+ auto sz = buf->computeChainDataLength();
+ called++;
+ EXPECT_EQ(sz, 1);
+ }))
+ .addBack(LengthFieldPrepender(4, 0, false, false))
+ .finalize();
+
+ auto buf = IOBuf::create(1);
+ buf->append(1);
+ pipeline.write(std::move(buf));
+ EXPECT_EQ(called, 1);
+}
+
+TEST(CodecTest, LengthFieldFrameDecoderSimple) {
+ ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+ int called = 0;
+
+ pipeline
+ .addBack(LengthFieldBasedFrameDecoder())
+ .addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
+ auto sz = buf->computeChainDataLength();
+ called++;
+ EXPECT_EQ(sz, 1);
+ }))
+ .finalize();
+
+ auto bufFrame = IOBuf::create(4);
+ bufFrame->append(4);
+ RWPrivateCursor c(bufFrame.get());
+ c.writeBE((uint32_t)1);
+ auto bufData = IOBuf::create(1);
+ bufData->append(1);
+
+ IOBufQueue q(IOBufQueue::cacheChainLength());
+
+ q.append(std::move(bufFrame));
+ pipeline.read(q);
+ EXPECT_EQ(called, 0);
+
+ q.append(std::move(bufData));
+ pipeline.read(q);
+ EXPECT_EQ(called, 1);
+}
+
+TEST(CodecTest, LengthFieldFrameDecoderNoStrip) {
+ ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+ int called = 0;
+
+ pipeline
+ .addBack(LengthFieldBasedFrameDecoder(2, 10, 0, 0, 0))
+ .addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
+ auto sz = buf->computeChainDataLength();
+ called++;
+ EXPECT_EQ(sz, 3);
+ }))
+ .finalize();
+
+ auto bufFrame = IOBuf::create(2);
+ bufFrame->append(2);
+ RWPrivateCursor c(bufFrame.get());
+ c.writeBE((uint16_t)1);
+ auto bufData = IOBuf::create(1);
+ bufData->append(1);
+
+ IOBufQueue q(IOBufQueue::cacheChainLength());
+
+ q.append(std::move(bufFrame));
+ pipeline.read(q);
+ EXPECT_EQ(called, 0);
+
+ q.append(std::move(bufData));
+ pipeline.read(q);
+ EXPECT_EQ(called, 1);
+}
+
+TEST(CodecTest, LengthFieldFrameDecoderAdjustment) {
+ ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+ int called = 0;
+
+ pipeline
+ .addBack(LengthFieldBasedFrameDecoder(2, 10, 0, -2, 0))
+ .addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
+ auto sz = buf->computeChainDataLength();
+ called++;
+ EXPECT_EQ(sz, 3);
+ }))
+ .finalize();
+
+ auto bufFrame = IOBuf::create(2);
+ bufFrame->append(2);
+ RWPrivateCursor c(bufFrame.get());
+ c.writeBE((uint16_t)3); // includes frame size
+ auto bufData = IOBuf::create(1);
+ bufData->append(1);
+
+ IOBufQueue q(IOBufQueue::cacheChainLength());
+
+ q.append(std::move(bufFrame));
+ pipeline.read(q);
+ EXPECT_EQ(called, 0);
+
+ q.append(std::move(bufData));
+ pipeline.read(q);
+ EXPECT_EQ(called, 1);
+}
+
+TEST(CodecTest, LengthFieldFrameDecoderPreHeader) {
+ ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+ int called = 0;
+
+ pipeline
+ .addBack(LengthFieldBasedFrameDecoder(2, 10, 2, 0, 0))
+ .addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
+ auto sz = buf->computeChainDataLength();
+ called++;
+ EXPECT_EQ(sz, 5);
+ }))
+ .finalize();
+
+ auto bufFrame = IOBuf::create(4);
+ bufFrame->append(4);
+ RWPrivateCursor c(bufFrame.get());
+ c.write((uint16_t)100); // header
+ c.writeBE((uint16_t)1); // frame size
+ auto bufData = IOBuf::create(1);
+ bufData->append(1);
+
+ IOBufQueue q(IOBufQueue::cacheChainLength());
+
+ q.append(std::move(bufFrame));
+ pipeline.read(q);
+ EXPECT_EQ(called, 0);
+
+ q.append(std::move(bufData));
+ pipeline.read(q);
+ EXPECT_EQ(called, 1);
+}
+
+TEST(CodecTest, LengthFieldFrameDecoderPostHeader) {
+ ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+ int called = 0;
+
+ pipeline
+ .addBack(LengthFieldBasedFrameDecoder(2, 10, 0, 2, 0))
+ .addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
+ auto sz = buf->computeChainDataLength();
+ called++;
+ EXPECT_EQ(sz, 5);
+ }))
+ .finalize();
+
+ auto bufFrame = IOBuf::create(4);
+ bufFrame->append(4);
+ RWPrivateCursor c(bufFrame.get());
+ c.writeBE((uint16_t)1); // frame size
+ c.write((uint16_t)100); // header
+ auto bufData = IOBuf::create(1);
+ bufData->append(1);
+
+ IOBufQueue q(IOBufQueue::cacheChainLength());
+
+ q.append(std::move(bufFrame));
+ pipeline.read(q);
+ EXPECT_EQ(called, 0);
+
+ q.append(std::move(bufData));
+ pipeline.read(q);
+ EXPECT_EQ(called, 1);
+}
+
+TEST(CodecTest, LengthFieldFrameDecoderStripPrePostHeader) {
+ ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+ int called = 0;
+
+ pipeline
+ .addBack(LengthFieldBasedFrameDecoder(2, 10, 2, 2, 4))
+ .addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
+ auto sz = buf->computeChainDataLength();
+ called++;
+ EXPECT_EQ(sz, 3);
+ }))
+ .finalize();
+
+ auto bufFrame = IOBuf::create(6);
+ bufFrame->append(6);
+ RWPrivateCursor c(bufFrame.get());
+ c.write((uint16_t)100); // pre header
+ c.writeBE((uint16_t)1); // frame size
+ c.write((uint16_t)100); // post header
+ auto bufData = IOBuf::create(1);
+ bufData->append(1);
+
+ IOBufQueue q(IOBufQueue::cacheChainLength());
+
+ q.append(std::move(bufFrame));
+ pipeline.read(q);
+ EXPECT_EQ(called, 0);
+
+ q.append(std::move(bufData));
+ pipeline.read(q);
+ EXPECT_EQ(called, 1);
+}
+
+TEST(CodecTest, LengthFieldFrameDecoderStripPrePostHeaderFrameInclHeader) {
+ ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+ int called = 0;
+
+ pipeline
+ .addBack(LengthFieldBasedFrameDecoder(2, 10, 2, -2, 4))
+ .addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
+ auto sz = buf->computeChainDataLength();
+ called++;
+ EXPECT_EQ(sz, 3);
+ }))
+ .finalize();
+
+ auto bufFrame = IOBuf::create(6);
+ bufFrame->append(6);
+ RWPrivateCursor c(bufFrame.get());
+ c.write((uint16_t)100); // pre header
+ c.writeBE((uint16_t)5); // frame size
+ c.write((uint16_t)100); // post header
+ auto bufData = IOBuf::create(1);
+ bufData->append(1);
+
+ IOBufQueue q(IOBufQueue::cacheChainLength());
+
+ q.append(std::move(bufFrame));
+ pipeline.read(q);
+ EXPECT_EQ(called, 0);
+
+ q.append(std::move(bufData));
+ pipeline.read(q);
+ EXPECT_EQ(called, 1);
+}
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/wangle/codec/ByteToMessageCodec.h>
+
+namespace folly {namespace wangle {
+
+/**
+ * A decoder that splits the received IOBufs by the fixed number
+ * of bytes. For example, if you received the following four
+ * fragmented packets:
+ *
+ * +---+----+------+----+
+ * | A | BC | DEFG | HI |
+ * +---+----+------+----+
+ *
+ * A FixedLengthFrameDecoder will decode them into the following three
+ * packets with the fixed length:
+ *
+ * +-----+-----+-----+
+ * | ABC | DEF | GHI |
+ * +-----+-----+-----+
+ *
+ */
+class FixedLengthFrameDecoder
+ : public ByteToMessageCodec {
+ public:
+
+ FixedLengthFrameDecoder(size_t length)
+ : length_(length) {}
+
+ std::unique_ptr<IOBuf> decode(Context* ctx, IOBufQueue& q, size_t& needed) {
+ if (q.chainLength() < length_) {
+ needed = length_ - q.chainLength();
+ return nullptr;
+ }
+
+ return q.split(length_);
+ }
+
+ private:
+ size_t length_;
+};
+
+}} // Namespace
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/wangle/codec/LengthFieldBasedFrameDecoder.h>
+
+namespace folly { namespace wangle {
+
+LengthFieldBasedFrameDecoder::LengthFieldBasedFrameDecoder(
+ uint32_t lengthFieldLength,
+ uint32_t maxFrameLength,
+ uint32_t lengthFieldOffset,
+ uint32_t lengthAdjustment,
+ uint32_t initialBytesToStrip,
+ bool networkByteOrder)
+ : lengthFieldLength_(lengthFieldLength)
+ , maxFrameLength_(maxFrameLength)
+ , lengthFieldOffset_(lengthFieldOffset)
+ , lengthAdjustment_(lengthAdjustment)
+ , initialBytesToStrip_(initialBytesToStrip)
+ , networkByteOrder_(networkByteOrder)
+ , lengthFieldEndOffset_(lengthFieldOffset + lengthFieldLength) {
+ CHECK(maxFrameLength > 0);
+ CHECK(lengthFieldOffset <= maxFrameLength - lengthFieldLength);
+}
+
+std::unique_ptr<IOBuf> LengthFieldBasedFrameDecoder::decode(
+ Context* ctx, IOBufQueue& buf, size_t&) {
+ // discarding too long frame
+ if (buf.chainLength() <= lengthFieldEndOffset_) {
+ return nullptr;
+ }
+
+ uint64_t frameLength = getUnadjustedFrameLength(
+ buf, lengthFieldOffset_, lengthFieldLength_, networkByteOrder_);
+
+ frameLength += lengthAdjustment_ + lengthFieldEndOffset_;
+
+ if (frameLength < lengthFieldEndOffset_) {
+ throw std::runtime_error("Frame too small");
+ }
+
+ if (frameLength > maxFrameLength_) {
+ throw std::runtime_error("Frame larger than " +
+ folly::to<std::string>(maxFrameLength_));
+ }
+
+ if (buf.chainLength() < frameLength) {
+ return nullptr;
+ }
+
+ if (initialBytesToStrip_ > frameLength) {
+ throw std::runtime_error("InitialBytesToSkip larger than frame");
+ }
+
+ buf.trimStart(initialBytesToStrip_);
+ int actualFrameLength = frameLength - initialBytesToStrip_;
+ return buf.split(actualFrameLength);
+}
+
+uint64_t LengthFieldBasedFrameDecoder::getUnadjustedFrameLength(
+ IOBufQueue& buf, int offset, int length, bool networkByteOrder) {
+ folly::io::Cursor c(buf.front());
+ uint64_t frameLength;
+
+ c.skip(offset);
+
+ switch(length) {
+ case 1:{
+ if (networkByteOrder) {
+ frameLength = c.readBE<uint8_t>();
+ } else {
+ frameLength = c.readLE<uint8_t>();
+ }
+ break;
+ }
+ case 2:{
+ if (networkByteOrder) {
+ frameLength = c.readBE<uint16_t>();
+ } else {
+ frameLength = c.readLE<uint16_t>();
+ }
+ break;
+ }
+ case 4:{
+ if (networkByteOrder) {
+ frameLength = c.readBE<uint32_t>();
+ } else {
+ frameLength = c.readLE<uint32_t>();
+ }
+ break;
+ }
+ case 8:{
+ if (networkByteOrder) {
+ frameLength = c.readBE<uint64_t>();
+ } else {
+ frameLength = c.readLE<uint64_t>();
+ }
+ break;
+ }
+ }
+
+ return frameLength;
+}
+
+
+}} // namespace
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/wangle/codec/ByteToMessageCodec.h>
+#include <folly/io/Cursor.h>
+
+namespace folly { namespace wangle {
+
+/**
+ * A decoder that splits the received IOBufs dynamically by the
+ * value of the length field in the message. It is particularly useful when you
+ * decode a binary message which has an integer header field that represents the
+ * length of the message body or the whole message.
+ *
+ * LengthFieldBasedFrameDecoder has many configuration parameters so
+ * that it can decode any message with a length field, which is often seen in
+ * proprietary client-server protocols. Here are some example that will give
+ * you the basic idea on which option does what.
+ *
+ * 2 bytes length field at offset 0, do not strip header
+ *
+ * The value of the length field in this example is 12 (0x0C) which
+ * represents the length of "HELLO, WORLD". By default, the decoder assumes
+ * that the length field represents the number of the bytes that follows the
+ * length field. Therefore, it can be decoded with the simplistic parameter
+ * combination.
+ *
+ * lengthFieldOffset = 0
+ * lengthFieldLength = 2
+ * lengthAdjustment = 0
+ * initialBytesToStrip = 0 (= do not strip header)
+ *
+ * BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
+ * +--------+----------------+ +--------+----------------+
+ * | Length | Actual Content |----->| Length | Actual Content |
+ * | 0x000C | "HELLO, WORLD" | | 0x000C | "HELLO, WORLD" |
+ * +--------+----------------+ +--------+----------------+
+ *
+ *
+ * 2 bytes length field at offset 0, strip header
+ *
+ * Because we can get the length of the content by calling
+ * ioBuf->computeChainDataLength(), you might want to strip the length
+ * field by specifying initialBytesToStrip. In this example, we
+ * specified 2, that is same with the length of the length field, to
+ * strip the first two bytes.
+ *
+ * lengthFieldOffset = 0
+ * lengthFieldLength = 2
+ * lengthAdjustment = 0
+ * initialBytesToStrip = 2 (= the length of the Length field)
+ *
+ * BEFORE DECODE (14 bytes) AFTER DECODE (12 bytes)
+ * +--------+----------------+ +----------------+
+ * | Length | Actual Content |----->| Actual Content |
+ * | 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" |
+ * +--------+----------------+ +----------------+
+ *
+ *
+ * 2 bytes length field at offset 0, do not strip header, the length field
+ * represents the length of the whole message
+ *
+ * In most cases, the length field represents the length of the message body
+ * only, as shown in the previous examples. However, in some protocols, the
+ * length field represents the length of the whole message, including the
+ * message header. In such a case, we specify a non-zero
+ * lengthAdjustment. Because the length value in this example message
+ * is always greater than the body length by 2, we specify -2
+ * as lengthAdjustment for compensation.
+ *
+ * lengthFieldOffset = 0
+ * lengthFieldLength = 2
+ * lengthAdjustment = -2 (= the length of the Length field)
+ * initialBytesToStrip = 0
+ *
+ * BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
+ * +--------+----------------+ +--------+----------------+
+ * | Length | Actual Content |----->| Length | Actual Content |
+ * | 0x000E | "HELLO, WORLD" | | 0x000E | "HELLO, WORLD" |
+ * +--------+----------------+ +--------+----------------+
+ *
+ *
+ * 3 bytes length field at the end of 5 bytes header, do not strip header
+ *
+ * The following message is a simple variation of the first example. An extra
+ * header value is prepended to the message. lengthAdjustment is zero
+ * again because the decoder always takes the length of the prepended data into
+ * account during frame length calculation.
+ *
+ * lengthFieldOffset = 2 (= the length of Header 1)
+ * lengthFieldLength = 3
+ * lengthAdjustment = 0
+ * initialBytesToStrip = 0
+ *
+ * BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
+ * +----------+----------+----------------+ +----------+----------+----------------+
+ * | Header 1 | Length | Actual Content |----->| Header 1 | Length | Actual Content |
+ * | 0xCAFE | 0x00000C | "HELLO, WORLD" | | 0xCAFE | 0x00000C | "HELLO, WORLD" |
+ * +----------+----------+----------------+ +----------+----------+----------------+
+ *
+ *
+ * 3 bytes length field at the beginning of 5 bytes header, do not strip header
+ *
+ * This is an advanced example that shows the case where there is an extra
+ * header between the length field and the message body. You have to specify a
+ * positive lengthAdjustment so that the decoder counts the extra
+ * header into the frame length calculation.
+ *
+ * lengthFieldOffset = 0
+ * lengthFieldLength = 3
+ * lengthAdjustment = 2 (= the length of Header 1)
+ * initialBytesToStrip = 0
+ *
+ * BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
+ * +----------+----------+----------------+ +----------+----------+----------------+
+ * | Length | Header 1 | Actual Content |----->| Length | Header 1 | Actual Content |
+ * | 0x00000C | 0xCAFE | "HELLO, WORLD" | | 0x00000C | 0xCAFE | "HELLO, WORLD" |
+ * +----------+----------+----------------+ +----------+----------+----------------+
+ *
+ *
+ * 2 bytes length field at offset 1 in the middle of 4 bytes header,
+ * strip the first header field and the length field
+ *
+ * This is a combination of all the examples above. There are the prepended
+ * header before the length field and the extra header after the length field.
+ * The prepended header affects the lengthFieldOffset and the extra
+ * header affects the lengthAdjustment. We also specified a non-zero
+ * initialBytesToStrip to strip the length field and the prepended
+ * header from the frame. If you don't want to strip the prepended header, you
+ * could specify 0 for initialBytesToSkip.
+ *
+ * lengthFieldOffset = 1 (= the length of HDR1)
+ * lengthFieldLength = 2
+ * lengthAdjustment = 1 (= the length of HDR2)
+ * initialBytesToStrip = 3 (= the length of HDR1 + LEN)
+ *
+ * BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes)
+ * +------+--------+------+----------------+ +------+----------------+
+ * | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
+ * | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
+ * +------+--------+------+----------------+ +------+----------------+
+ *
+ *
+ * 2 bytes length field at offset 1 in the middle of 4 bytes header,
+ * strip the first header field and the length field, the length field
+ * represents the length of the whole message
+ *
+ * Let's give another twist to the previous example. The only difference from
+ * the previous example is that the length field represents the length of the
+ * whole message instead of the message body, just like the third example.
+ * We have to count the length of HDR1 and Length into lengthAdjustment.
+ * Please note that we don't need to take the length of HDR2 into account
+ * because the length field already includes the whole header length.
+ *
+ * lengthFieldOffset = 1
+ * lengthFieldLength = 2
+ * lengthAdjustment = -3 (= the length of HDR1 + LEN, negative)
+ * initialBytesToStrip = 3
+ *
+ * BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes)
+ * +------+--------+------+----------------+ +------+----------------+
+ * | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
+ * | 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
+ * +------+--------+------+----------------+ +------+----------------+
+ *
+ * @see LengthFieldPrepender
+ */
+class LengthFieldBasedFrameDecoder : public ByteToMessageCodec {
+ public:
+ LengthFieldBasedFrameDecoder(
+ uint32_t lengthFieldLength = 4,
+ uint32_t maxFrameLength = UINT_MAX,
+ uint32_t lengthFieldOffset = 0,
+ uint32_t lengthAdjustment = 0,
+ uint32_t initialBytesToStrip = 4,
+ bool networkByteOrder = true);
+
+ std::unique_ptr<IOBuf> decode(Context* ctx, IOBufQueue& buf, size_t&);
+
+ private:
+
+ uint64_t getUnadjustedFrameLength(
+ IOBufQueue& buf, int offset, int length, bool networkByteOrder);
+
+ uint32_t lengthFieldLength_;
+ uint32_t maxFrameLength_;
+ uint32_t lengthFieldOffset_;
+ uint32_t lengthAdjustment_;
+ uint32_t initialBytesToStrip_;
+ bool networkByteOrder_;
+
+ uint32_t lengthFieldEndOffset_;
+};
+
+}} // namespace
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/wangle/codec/LengthFieldPrepender.h>
+
+namespace folly { namespace wangle {
+
+LengthFieldPrepender::LengthFieldPrepender(
+ int lengthFieldLength,
+ int lengthAdjustment,
+ bool lengthIncludesLengthField,
+ bool networkByteOrder)
+ : lengthFieldLength_(lengthFieldLength)
+ , lengthAdjustment_(lengthAdjustment)
+ , lengthIncludesLengthField_(lengthIncludesLengthField)
+ , networkByteOrder_(networkByteOrder) {
+ CHECK(lengthFieldLength == 1 ||
+ lengthFieldLength == 2 ||
+ lengthFieldLength == 4 ||
+ lengthFieldLength == 8 );
+ }
+
+Future<void> LengthFieldPrepender::write(
+ Context* ctx, std::unique_ptr<IOBuf> buf) {
+ int length = lengthAdjustment_ + buf->computeChainDataLength();
+ if (lengthIncludesLengthField_) {
+ length += lengthFieldLength_;
+ }
+
+ if (length < 0) {
+ throw std::runtime_error("Length field < 0");
+ }
+
+ auto len = IOBuf::create(lengthFieldLength_);
+ len->append(lengthFieldLength_);
+ folly::io::RWPrivateCursor c(len.get());
+
+ switch (lengthFieldLength_) {
+ case 1: {
+ if (length >= 256) {
+ throw std::runtime_error("length does not fit byte");
+ }
+ if (networkByteOrder_) {
+ c.writeBE((uint8_t)length);
+ } else {
+ c.writeLE((uint8_t)length);
+ }
+ break;
+ }
+ case 2: {
+ if (length >= 65536) {
+ throw std::runtime_error("length does not fit byte");
+ }
+ if (networkByteOrder_) {
+ c.writeBE((uint16_t)length);
+ } else {
+ c.writeLE((uint16_t)length);
+ }
+ break;
+ }
+ case 4: {
+ if (networkByteOrder_) {
+ c.writeBE((uint32_t)length);
+ } else {
+ c.writeLE((uint32_t)length);
+ }
+ break;
+ }
+ case 8: {
+ if (networkByteOrder_) {
+ c.writeBE((uint64_t)length);
+ } else {
+ c.writeLE((uint64_t)length);
+ }
+ break;
+ }
+ default: {
+ throw std::runtime_error("Invalid lengthFieldLength");
+ }
+ }
+
+ len->prependChain(std::move(buf));
+ return ctx->fireWrite(std::move(len));
+}
+
+
+}} // Namespace
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/wangle/codec/ByteToMessageCodec.h>
+#include <folly/io/Cursor.h>
+
+namespace folly { namespace wangle {
+
+/**
+ * An encoder that prepends the length of the message. The length value is
+ * prepended as a binary form.
+ *
+ * For example, LengthFieldPrepender(2)will encode the
+ * following 12-bytes string:
+ *
+ * +----------------+
+ * | "HELLO, WORLD" |
+ * +----------------+
+ *
+ * into the following:
+ *
+ * +--------+----------------+
+ * + 0x000C | "HELLO, WORLD" |
+ * +--------+----------------+
+ *
+ * If you turned on the lengthIncludesLengthFieldLength flag in the
+ * constructor, the encoded data would look like the following
+ * (12 (original data) + 2 (prepended data) = 14 (0xE)):
+ *
+ * +--------+----------------+
+ * + 0x000E | "HELLO, WORLD" |
+ * +--------+----------------+
+ *
+ */
+class LengthFieldPrepender
+: public BytesToBytesHandler {
+ public:
+ LengthFieldPrepender(
+ int lengthFieldLength = 4,
+ int lengthAdjustment = 0,
+ bool lengthIncludesLengthField = false,
+ bool networkByteOrder = true);
+
+ Future<void> write(Context* ctx, std::unique_ptr<IOBuf> buf);
+
+ private:
+ int lengthFieldLength_;
+ int lengthAdjustment_;
+ bool lengthIncludesLengthField_;
+ bool networkByteOrder_;
+};
+
+}} // namespace
--- /dev/null
+Codecs are modeled after netty's codecs:
+
+https://github.com/netty/netty/tree/master/codec/src/main/java/io/netty/handler/codec
+
+Most of the changes are due to differing memory allocation strategies.
\ No newline at end of file