From: James Sedgwick Date: Thu, 30 Apr 2015 01:04:41 +0000 (-0700) Subject: inbound/outbound handlers X-Git-Tag: v0.38.0~34 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=555213ca924f9e5a174cc8efd0f5da8b6c2ad4a9;p=folly.git inbound/outbound handlers Summary: Much less copypasta this time around. I wonder if the getters and setters for write flags and read buffer settings are necessary in the new handler types, or even if they belong in the bidirectional handler I'm all ears for more suggestions on reducing copypasta I'm going to reorg the code (inl headers etc) in a subsequent diff once this is in - easier to review this way Test Plan: existing unit, thinking about tests for these changes Reviewed By: davejwatson@fb.com Subscribers: fugalh, folly-diffs@, jsedgwick, yfeldblum, chalfant FB internal diff: D2026522 Tasks: 6836580 Signature: t1:2026522:1430346145:bd7f7770eddce0470e2ac72440fc001cf128df08 --- diff --git a/folly/wangle/channel/Handler.h b/folly/wangle/channel/Handler.h index 73d59a64..4332878f 100644 --- a/folly/wangle/channel/Handler.h +++ b/folly/wangle/channel/Handler.h @@ -51,6 +51,8 @@ class HandlerBase { template class Handler : public HandlerBase> { public: + static const HandlerDir dir = HandlerDir::BOTH; + typedef Rin rin; typedef Rout rout; typedef Win win; @@ -99,6 +101,47 @@ class Handler : public HandlerBase> { */ }; +struct Unit{}; + +template +class InboundHandler : public HandlerBase> { + public: + static const HandlerDir dir = HandlerDir::IN; + + typedef Rin rin; + typedef Rout rout; + typedef Unit win; + typedef Unit wout; + typedef InboundHandlerContext Context; + virtual ~InboundHandler() {} + + virtual void read(Context* ctx, Rin msg) = 0; + virtual void readEOF(Context* ctx) { + ctx->fireReadEOF(); + } + virtual void readException(Context* ctx, exception_wrapper e) { + ctx->fireReadException(std::move(e)); + } +}; + +template +class OutboundHandler : public HandlerBase> { + public: + static const HandlerDir dir = HandlerDir::OUT; + + typedef Unit rin; + typedef Unit rout; + typedef Win win; + typedef Wout wout; + typedef OutboundHandlerContext Context; + virtual ~OutboundHandler() {} + + virtual Future write(Context* ctx, Win msg) = 0; + virtual Future close(Context* ctx) { + return ctx->fireClose(); + } +}; + template class HandlerAdapter : public Handler { public: @@ -116,4 +159,10 @@ class HandlerAdapter : public Handler { typedef HandlerAdapter> BytesToBytesHandler; +typedef InboundHandler +InboundBytesToBytesHandler; + +typedef OutboundHandler> +OutboundBytesToBytesHandler; + }} diff --git a/folly/wangle/channel/HandlerContext.h b/folly/wangle/channel/HandlerContext.h index 4bd79f5f..2f899796 100644 --- a/folly/wangle/channel/HandlerContext.h +++ b/folly/wangle/channel/HandlerContext.h @@ -55,6 +55,39 @@ class HandlerContext { */ }; +template +class InboundHandlerContext { + public: + virtual ~InboundHandlerContext() {} + + virtual void fireRead(In msg) = 0; + virtual void fireReadEOF() = 0; + virtual void fireReadException(exception_wrapper e) = 0; + + virtual std::shared_ptr getTransport() = 0; + + // TODO Need get/set writeFlags, readBufferSettings? Probably not. + // Do we even really need them stored in the pipeline at all? + // Could just always delegate to the socket impl +}; + +template +class OutboundHandlerContext { + public: + virtual ~OutboundHandlerContext() {} + + virtual Future fireWrite(Out msg) = 0; + virtual Future fireClose() = 0; + + virtual std::shared_ptr getTransport() = 0; +}; + +enum class HandlerDir { + IN, + OUT, + BOTH +}; + class PipelineContext { public: virtual ~PipelineContext() {} @@ -74,12 +107,6 @@ class PipelineContext { } } - void link(PipelineContext* other) { - setNextIn(other); - other->setNextOut(this); - } - - protected: virtual void setNextIn(PipelineContext* ctx) = 0; virtual void setNextOut(PipelineContext* ctx) = 0; }; @@ -144,7 +171,7 @@ class ContextImplBase : public PipelineContext { if (nextIn) { nextIn_ = nextIn; } else { - throw std::invalid_argument("wrong type in setNextIn"); + throw std::invalid_argument("inbound type mismatch"); } } @@ -153,7 +180,7 @@ class ContextImplBase : public PipelineContext { if (nextOut) { nextOut_ = nextOut; } else { - throw std::invalid_argument("wrong type in setNextOut"); + throw std::invalid_argument("outbound type mismatch"); } } @@ -170,16 +197,19 @@ class ContextImplBase : public PipelineContext { }; template -class ContextImpl : public HandlerContext, - public InboundLink, - public OutboundLink, - public ContextImplBase> { +class ContextImpl + : public HandlerContext, + public InboundLink, + public OutboundLink, + public ContextImplBase> { public: typedef typename H::rin Rin; typedef typename H::rout Rout; typedef typename H::win Win; typedef typename H::wout Wout; + static const HandlerDir dir = HandlerDir::BOTH; explicit ContextImpl(P* pipeline, std::shared_ptr handler) { this->impl_ = this; @@ -294,4 +324,157 @@ class ContextImpl : public HandlerContext +class InboundContextImpl + : public InboundHandlerContext, + public InboundLink, + public ContextImplBase> { + public: + typedef typename H::rin Rin; + typedef typename H::rout Rout; + typedef typename H::win Win; + typedef typename H::wout Wout; + static const HandlerDir dir = HandlerDir::IN; + + explicit InboundContextImpl(P* pipeline, std::shared_ptr handler) { + this->impl_ = this; + this->initialize(pipeline, std::move(handler)); + } + + // For StaticPipeline + InboundContextImpl() { + this->impl_ = this; + } + + ~InboundContextImpl() {} + + // InboundHandlerContext overrides + void fireRead(Rout msg) override { + DestructorGuard dg(this->pipeline_); + if (this->nextIn_) { + this->nextIn_->read(std::forward(msg)); + } else { + LOG(WARNING) << "read reached end of pipeline"; + } + } + + void fireReadEOF() override { + DestructorGuard dg(this->pipeline_); + if (this->nextIn_) { + this->nextIn_->readEOF(); + } else { + LOG(WARNING) << "readEOF reached end of pipeline"; + } + } + + void fireReadException(exception_wrapper e) override { + DestructorGuard dg(this->pipeline_); + if (this->nextIn_) { + this->nextIn_->readException(std::move(e)); + } else { + LOG(WARNING) << "readException reached end of pipeline"; + } + } + + std::shared_ptr getTransport() override { + return this->pipeline_->getTransport(); + } + + // InboundLink overrides + void read(Rin msg) override { + DestructorGuard dg(this->pipeline_); + this->handler_->read(this, std::forward(msg)); + } + + void readEOF() override { + DestructorGuard dg(this->pipeline_); + this->handler_->readEOF(this); + } + + void readException(exception_wrapper e) override { + DestructorGuard dg(this->pipeline_); + this->handler_->readException(this, std::move(e)); + } + + private: + using DestructorGuard = typename P::DestructorGuard; +}; + +template +class OutboundContextImpl + : public OutboundHandlerContext, + public OutboundLink, + public ContextImplBase> { + public: + typedef typename H::rin Rin; + typedef typename H::rout Rout; + typedef typename H::win Win; + typedef typename H::wout Wout; + static const HandlerDir dir = HandlerDir::OUT; + + explicit OutboundContextImpl(P* pipeline, std::shared_ptr handler) { + this->impl_ = this; + this->initialize(pipeline, std::move(handler)); + } + + // For StaticPipeline + OutboundContextImpl() { + this->impl_ = this; + } + + ~OutboundContextImpl() {} + + // OutboundHandlerContext overrides + Future fireWrite(Wout msg) override { + DestructorGuard dg(this->pipeline_); + if (this->nextOut_) { + return this->nextOut_->write(std::forward(msg)); + } else { + LOG(WARNING) << "write reached end of pipeline"; + return makeFuture(); + } + } + + Future fireClose() override { + DestructorGuard dg(this->pipeline_); + if (this->nextOut_) { + return this->nextOut_->close(); + } else { + LOG(WARNING) << "close reached end of pipeline"; + return makeFuture(); + } + } + + std::shared_ptr getTransport() override { + return this->pipeline_->getTransport(); + } + + // OutboundLink overrides + Future write(Win msg) override { + DestructorGuard dg(this->pipeline_); + return this->handler_->write(this, std::forward(msg)); + } + + Future close() override { + DestructorGuard dg(this->pipeline_); + return this->handler_->close(this); + } + + private: + using DestructorGuard = typename P::DestructorGuard; +}; + +template +struct ContextType { + typedef typename std::conditional< + Handler::dir == HandlerDir::BOTH, + ContextImpl, + typename std::conditional< + Handler::dir == HandlerDir::IN, + InboundContextImpl, + OutboundContextImpl + >::type>::type + type; +}; + }} diff --git a/folly/wangle/channel/OutputBufferingHandler.h b/folly/wangle/channel/OutputBufferingHandler.h index eb7c4248..48d8aa51 100644 --- a/folly/wangle/channel/OutputBufferingHandler.h +++ b/folly/wangle/channel/OutputBufferingHandler.h @@ -30,7 +30,7 @@ namespace folly { namespace wangle { * * This handler may only be used in a single Pipeline. */ -class OutputBufferingHandler : public BytesToBytesHandler, +class OutputBufferingHandler : public OutboundBytesToBytesHandler, protected EventBase::LoopCallback { public: Future write(Context* ctx, std::unique_ptr buf) override { diff --git a/folly/wangle/channel/Pipeline.h b/folly/wangle/channel/Pipeline.h index 738a83f9..48b3db4d 100644 --- a/folly/wangle/channel/Pipeline.h +++ b/folly/wangle/channel/Pipeline.h @@ -83,15 +83,8 @@ class Pipeline : public DelayedDestruction { template Pipeline& addBack(std::shared_ptr handler) { - ctxs_.push_back(std::make_shared>( - this, - std::move(handler))); - return *this; - } - - template - Pipeline& addBack(H* handler) { - return addBack(std::shared_ptr(handler, [](H*){})); + typedef typename ContextType::type Context; + return addHelper(std::make_shared(this, std::move(handler)), false); } template @@ -100,16 +93,14 @@ class Pipeline : public DelayedDestruction { } template - Pipeline& addFront(std::shared_ptr handler) { - ctxs_.insert( - ctxs_.begin(), - std::make_shared>(this, std::move(handler))); - return *this; + Pipeline& addBack(H* handler) { + return addBack(std::shared_ptr(handler, [](H*){})); } template - Pipeline& addFront(H* handler) { - return addFront(std::shared_ptr(handler, [](H*){})); + Pipeline& addFront(std::shared_ptr handler) { + typedef typename ContextType::type Context; + return addHelper(std::make_shared(this, std::move(handler)), true); } template @@ -117,30 +108,40 @@ class Pipeline : public DelayedDestruction { return addFront(std::make_shared(std::forward(handler))); } + template + Pipeline& addFront(H* handler) { + return addFront(std::shared_ptr(handler, [](H*){})); + } + template H* getHandler(int i) { - auto ctx = dynamic_cast*>(ctxs_[i].get()); + typedef typename ContextType::type Context; + auto ctx = dynamic_cast(ctxs_[i].get()); CHECK(ctx); return ctx->getHandler(); } + // TODO Have read/write/etc check that pipeline has been finalized void finalize() { - if (ctxs_.empty()) { - return; - } - - for (size_t i = 0; i < ctxs_.size() - 1; i++) { - ctxs_[i]->link(ctxs_[i+1].get()); + if (!inCtxs_.empty()) { + front_ = dynamic_cast*>(inCtxs_.front()); + for (size_t i = 0; i < inCtxs_.size() - 1; i++) { + inCtxs_[i]->setNextIn(inCtxs_[i+1]); + } } - back_ = dynamic_cast*>(ctxs_.back().get()); - if (!back_) { - throw std::invalid_argument("wrong type for last handler"); + if (!outCtxs_.empty()) { + back_ = dynamic_cast*>(outCtxs_.back()); + for (size_t i = outCtxs_.size() - 1; i > 0; i--) { + outCtxs_[i]->setNextOut(outCtxs_[i-1]); + } } - front_ = dynamic_cast*>(ctxs_.front().get()); if (!front_) { - throw std::invalid_argument("wrong type for first handler"); + throw std::invalid_argument("no inbound handler in Pipeline"); + } + if (!back_) { + throw std::invalid_argument("no outbound handler in Pipeline"); } for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) { @@ -154,8 +155,9 @@ class Pipeline : public DelayedDestruction { // See thrift/lib/cpp2/async/Cpp2Channel.cpp for an example template bool setOwner(H* handler) { + typedef typename ContextType::type Context; for (auto& ctx : ctxs_) { - auto ctxImpl = dynamic_cast*>(ctx.get()); + auto ctxImpl = dynamic_cast(ctx.get()); if (ctxImpl && ctxImpl->getHandler() == handler) { owner_ = ctx; return true; @@ -185,10 +187,8 @@ class Pipeline : public DelayedDestruction { } template - void addContextFront(Context* context) { - ctxs_.insert( - ctxs_.begin(), - std::shared_ptr(context, [](Context*){})); + void addContextFront(Context* ctx) { + addHelper(std::shared_ptr(ctx, [](Context*){}), true); } void detachHandlers() { @@ -200,15 +200,29 @@ class Pipeline : public DelayedDestruction { } private: + template + Pipeline& addHelper(std::shared_ptr&& ctx, bool front) { + ctxs_.insert(front ? ctxs_.begin() : ctxs_.end(), ctx); + if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::IN) { + inCtxs_.insert(front ? inCtxs_.begin() : inCtxs_.end(), ctx.get()); + } + if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::OUT) { + outCtxs_.insert(front ? outCtxs_.begin() : outCtxs_.end(), ctx.get()); + } + return *this; + } + std::shared_ptr transport_; WriteFlags writeFlags_{WriteFlags::NONE}; std::pair readBufferSettings_{2048, 2048}; bool isStatic_{false}; + std::shared_ptr owner_; + std::vector> ctxs_; + std::vector inCtxs_; + std::vector outCtxs_; InboundLink* front_{nullptr}; OutboundLink* back_{nullptr}; - std::vector> ctxs_; - std::shared_ptr owner_; }; }} diff --git a/folly/wangle/channel/StaticPipeline.h b/folly/wangle/channel/StaticPipeline.h index 6c6c5e08..68e0d906 100644 --- a/folly/wangle/channel/StaticPipeline.h +++ b/folly/wangle/channel/StaticPipeline.h @@ -67,8 +67,6 @@ class StaticPipeline } protected: - typedef ContextImpl, Handler> Context; - template StaticPipeline( bool isFirst, @@ -119,7 +117,7 @@ class StaticPipeline bool isFirst_; folly::Optional handler_; std::shared_ptr handlerPtr_; - ContextImpl, Handler> ctx_; + typename ContextType>::type ctx_; }; }} // folly::wangle diff --git a/folly/wangle/codec/ByteToMessageCodec.h b/folly/wangle/codec/ByteToMessageCodec.h index 20d6e7fe..53ec3d8e 100644 --- a/folly/wangle/codec/ByteToMessageCodec.h +++ b/folly/wangle/codec/ByteToMessageCodec.h @@ -40,7 +40,7 @@ namespace folly { namespace wangle { * IOBufQueue.front(), without split() or pop_front(). */ class ByteToMessageCodec - : public BytesToBytesHandler { + : public InboundBytesToBytesHandler { public: virtual std::unique_ptr decode( diff --git a/folly/wangle/codec/CodecTest.cpp b/folly/wangle/codec/CodecTest.cpp index 80bb83d3..c4457788 100644 --- a/folly/wangle/codec/CodecTest.cpp +++ b/folly/wangle/codec/CodecTest.cpp @@ -95,13 +95,13 @@ TEST(LengthFieldFramePipeline, SimpleTest) { pipeline .addBack(BytesReflector()) + .addBack(LengthFieldPrepender()) .addBack(LengthFieldBasedFrameDecoder()) .addBack(FrameTester([&](std::unique_ptr buf) { auto sz = buf->computeChainDataLength(); called++; EXPECT_EQ(sz, 2); })) - .addBack(LengthFieldPrepender()) .finalize(); auto buf = IOBuf::create(2); diff --git a/folly/wangle/codec/LengthFieldPrepender.h b/folly/wangle/codec/LengthFieldPrepender.h index 72c30d81..d2e1d37b 100644 --- a/folly/wangle/codec/LengthFieldPrepender.h +++ b/folly/wangle/codec/LengthFieldPrepender.h @@ -47,7 +47,7 @@ namespace folly { namespace wangle { * */ class LengthFieldPrepender -: public BytesToBytesHandler { +: public OutboundBytesToBytesHandler { public: LengthFieldPrepender( int lengthFieldLength = 4,