From: James Sedgwick Date: Thu, 30 Apr 2015 01:20:45 +0000 (-0700) Subject: use inbound/outbound handlers in a few more places X-Git-Tag: v0.38.0~33 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=fdd8e84bddefb7fbf07df87b9ed83d6b748c68f3;p=folly.git use inbound/outbound handlers in a few more places Summary: Also, instead of throwing on finalize() if there's no inbound/outbound handler in the pipeline, log a warning and throw when the operations are attempted. This was necessary for CodecTest which doesn't use outbound handlers. Test Plan: unit Reviewed By: davejwatson@fb.com Subscribers: trunkagent, fugalh, folly-diffs@, jsedgwick, yfeldblum, chalfant FB internal diff: D2028176 Tasks: 6836580 Signature: t1:2028176:1430346333:fdd645a535e8158d780cfd1119e27803995b663f --- diff --git a/folly/wangle/bootstrap/ServerBootstrap-inl.h b/folly/wangle/bootstrap/ServerBootstrap-inl.h index ee3f9b22..54fde196 100644 --- a/folly/wangle/bootstrap/ServerBootstrap-inl.h +++ b/folly/wangle/bootstrap/ServerBootstrap-inl.h @@ -28,7 +28,7 @@ namespace folly { template class ServerAcceptor : public Acceptor - , public folly::wangle::HandlerAdapter { + , public folly::wangle::InboundHandler { typedef std::unique_ptr PipelinePtr; @@ -86,10 +86,6 @@ class ServerAcceptor Acceptor::addConnection(connection); } - folly::Future write(Context* ctx, std::exception e) { - return ctx->fireWrite(e); - } - /* See Acceptor::onNewConnection for details */ void onNewConnection( AsyncSocket::UniquePtr transport, const SocketAddress* address, diff --git a/folly/wangle/channel/Handler.h b/folly/wangle/channel/Handler.h index 4332878f..676c2fab 100644 --- a/folly/wangle/channel/Handler.h +++ b/folly/wangle/channel/Handler.h @@ -159,7 +159,7 @@ class HandlerAdapter : public Handler { typedef HandlerAdapter> BytesToBytesHandler; -typedef InboundHandler +typedef InboundHandler> InboundBytesToBytesHandler; typedef OutboundHandler> diff --git a/folly/wangle/channel/Pipeline.h b/folly/wangle/channel/Pipeline.h index 48b3db4d..db7a71ef 100644 --- a/folly/wangle/channel/Pipeline.h +++ b/folly/wangle/channel/Pipeline.h @@ -62,22 +62,38 @@ class Pipeline : public DelayedDestruction { } void read(R msg) { + if (!front_) { + throw std::invalid_argument("read(): no inbound handler in Pipeline"); + } front_->read(std::forward(msg)); } void readEOF() { + if (!front_) { + throw std::invalid_argument("readEOF(): no inbound handler in Pipeline"); + } front_->readEOF(); } void readException(exception_wrapper e) { + if (!front_) { + throw std::invalid_argument( + "readException(): no inbound handler in Pipeline"); + } front_->readException(std::move(e)); } Future write(W msg) { + if (!back_) { + throw std::invalid_argument("write(): no outbound handler in Pipeline"); + } return back_->write(std::forward(msg)); } Future close() { + if (!back_) { + throw std::invalid_argument("close(): no outbound handler in Pipeline"); + } return back_->close(); } @@ -138,10 +154,12 @@ class Pipeline : public DelayedDestruction { } if (!front_) { - throw std::invalid_argument("no inbound handler in Pipeline"); + LOG(WARNING) << "No inbound handler in Pipeline, " + "inbound operations will throw std::invalid_argument"; } if (!back_) { - throw std::invalid_argument("no outbound handler in Pipeline"); + LOG(WARNING) << "No outbound handler in Pipeline, " + "outbound operations will throw std::invalid_argument"; } for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) { diff --git a/folly/wangle/channel/test/PipelineTest.cpp b/folly/wangle/channel/test/PipelineTest.cpp index 46149560..4c940583 100644 --- a/folly/wangle/channel/test/PipelineTest.cpp +++ b/folly/wangle/channel/test/PipelineTest.cpp @@ -250,6 +250,13 @@ TEST(PipelineTest, HandlerInPipelineTwice) { EXPECT_CALL(handler, detachPipeline(_)).Times(2); } +TEST(PipelineTest, NoDetachOnOwner) { + IntHandler handler; + EXPECT_CALL(handler, attachPipeline(_)); + StaticPipeline pipeline(&handler); + pipeline.setOwner(&handler); +} + template class ConcreteHandler : public Handler { typedef typename Handler::Context Context; @@ -262,22 +269,21 @@ typedef HandlerAdapter StringHandler; typedef ConcreteHandler IntToStringHandler; typedef ConcreteHandler StringToIntHandler; -TEST(Pipeline, DynamicConstruction) { - { - Pipeline pipeline; - EXPECT_THROW( - pipeline - .addBack(HandlerAdapter{}) - .finalize(), std::invalid_argument); - } - { - Pipeline pipeline; - EXPECT_THROW( - pipeline - .addFront(HandlerAdapter{}) - .finalize(), +TEST(Pipeline, MissingInboundOrOutbound) { + Pipeline pipeline; + pipeline + .addBack(HandlerAdapter{}) + .finalize(); + EXPECT_THROW(pipeline.read(0), std::invalid_argument); + EXPECT_THROW(pipeline.readEOF(), std::invalid_argument); + EXPECT_THROW( + pipeline.readException(exception_wrapper(std::runtime_error("blah"))), std::invalid_argument); - } + EXPECT_THROW(pipeline.write(0), std::invalid_argument); + EXPECT_THROW(pipeline.close(), std::invalid_argument); +} + +TEST(Pipeline, DynamicConstruction) { { StaticPipeline pipeline{StringHandler(), StringHandler()}; diff --git a/folly/wangle/codec/ByteToMessageCodec.cpp b/folly/wangle/codec/ByteToMessageCodec.cpp index 52ee794f..e16183bb 100644 --- a/folly/wangle/codec/ByteToMessageCodec.cpp +++ b/folly/wangle/codec/ByteToMessageCodec.cpp @@ -23,8 +23,7 @@ void ByteToMessageCodec::read(Context* ctx, IOBufQueue& q) { while (true) { result = decode(ctx, q, needed); if (result) { - q_.append(std::move(result)); - ctx->fireRead(q_); + ctx->fireRead(std::move(result)); } else { break; } diff --git a/folly/wangle/codec/ByteToMessageCodec.h b/folly/wangle/codec/ByteToMessageCodec.h index 53ec3d8e..600bb028 100644 --- a/folly/wangle/codec/ByteToMessageCodec.h +++ b/folly/wangle/codec/ByteToMessageCodec.h @@ -47,9 +47,6 @@ class ByteToMessageCodec Context* ctx, IOBufQueue& buf, size_t&) = 0; void read(Context* ctx, IOBufQueue& q); - - private: - IOBufQueue q_; }; }} diff --git a/folly/wangle/codec/CodecTest.cpp b/folly/wangle/codec/CodecTest.cpp index c4457788..ecca824f 100644 --- a/folly/wangle/codec/CodecTest.cpp +++ b/folly/wangle/codec/CodecTest.cpp @@ -25,13 +25,13 @@ using namespace folly::wangle; using namespace folly::io; class FrameTester - : public BytesToBytesHandler { + : public InboundHandler> { public: - FrameTester(std::function)> test) + explicit FrameTester(std::function)> test) : test_(test) {} - void read(Context* ctx, IOBufQueue& q) { - test_(q.move()); + void read(Context* ctx, std::unique_ptr buf) { + test_(std::move(buf)); } void readException(Context* ctx, exception_wrapper w) { diff --git a/folly/wangle/service/ClientDispatcher.h b/folly/wangle/service/ClientDispatcher.h index b05aa9a2..820d6478 100644 --- a/folly/wangle/service/ClientDispatcher.h +++ b/folly/wangle/service/ClientDispatcher.h @@ -26,11 +26,11 @@ namespace folly { namespace wangle { * only one request is allowed at a time. */ template -class SerialClientDispatcher : public HandlerAdapter +class SerialClientDispatcher : public InboundHandler , public Service { public: - typedef typename HandlerAdapter::Context Context; + typedef typename InboundHandler::Context Context; void setPipeline(Pipeline* pipeline) { pipeline_ = pipeline;