From: James Sedgwick Date: Fri, 21 Nov 2014 19:30:14 +0000 (-0800) Subject: modifiable channel pipelines X-Git-Tag: v0.22.0~151 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=4092355936775450e253d57b7f8a90c734b00cb3;p=folly.git modifiable channel pipelines Summary: Basically the same interface as before, but you must specify the read and write types for the ends of pipeline. Implementation is cleaner as well; there's fewer levels of indirection This dynamic casts shit all over the place and is less typesafe then the previous iteration, but I think with some carefully placed static_asserts, could be just as safe (in the case where you don't do any modification, anyway) Right now you can only add to the front or back of the pipeline but the way it's set up you could add any number of mutations, including ones that are triggered by handlers. But this should (might?) be enough for Tunnel, which was the motivation. Test Plan: basic test compiles, thrift2 diff still works with a one line change Reviewed By: hans@fb.com Subscribers: trunkagent, fugalh, njormrod, folly-diffs@, bmatheny FB internal diff: D1661169 Tasks: 5002299 Signature: t1:1661169:1416521727:1f126279796c0b09d1905b9f7dbc48a9e5540271 --- diff --git a/folly/experimental/wangle/channel/ChannelHandler.h b/folly/experimental/wangle/channel/ChannelHandler.h index 3ef7ae50..27a32447 100644 --- a/folly/experimental/wangle/channel/ChannelHandler.h +++ b/folly/experimental/wangle/channel/ChannelHandler.h @@ -138,7 +138,6 @@ class ChannelHandlerPtr : public ChannelHandler< } } - void attachTransport(Context* ctx) override { ctx_ = ctx; if (handler_) { diff --git a/folly/experimental/wangle/channel/ChannelHandlerContext.h b/folly/experimental/wangle/channel/ChannelHandlerContext.h new file mode 100644 index 00000000..b0f1064b --- /dev/null +++ b/folly/experimental/wangle/channel/ChannelHandlerContext.h @@ -0,0 +1,251 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +namespace folly { namespace wangle { + +template +class ChannelHandlerContext { + public: + virtual ~ChannelHandlerContext() {} + + virtual void fireRead(In msg) = 0; + virtual void fireReadEOF() = 0; + virtual void fireReadException(exception_wrapper e) = 0; + + virtual Future fireWrite(Out msg) = 0; + virtual Future fireClose() = 0; + + virtual std::shared_ptr getTransport() = 0; + + virtual void setWriteFlags(WriteFlags flags) = 0; + virtual WriteFlags getWriteFlags() = 0; + + virtual void setReadBufferSettings( + uint64_t minAvailable, + uint64_t allocationSize) = 0; + virtual std::pair getReadBufferSettings() = 0; + + /* TODO + template + virtual void addHandlerBefore(H&&) {} + template + virtual void addHandlerAfter(H&&) {} + template + virtual void replaceHandler(H&&) {} + virtual void removeHandler() {} + */ +}; + +class PipelineContext { + public: + virtual ~PipelineContext() {} + + virtual void attachTransport() = 0; + virtual void detachTransport() = 0; + + void link(PipelineContext* other) { + setNextIn(other); + other->setNextOut(this); + } + + protected: + virtual void setNextIn(PipelineContext* ctx) = 0; + virtual void setNextOut(PipelineContext* ctx) = 0; +}; + +template +class InboundChannelHandlerContext { + public: + virtual ~InboundChannelHandlerContext() {} + virtual void read(In msg) = 0; + virtual void readEOF() = 0; + virtual void readException(exception_wrapper e) = 0; +}; + +template +class OutboundChannelHandlerContext { + public: + virtual ~OutboundChannelHandlerContext() {} + virtual Future write(Out msg) = 0; + virtual Future close() = 0; +}; + +template +class ContextImpl : public ChannelHandlerContext, + public InboundChannelHandlerContext, + public OutboundChannelHandlerContext, + public PipelineContext { + public: + typedef typename H::rin Rin; + typedef typename H::rout Rout; + typedef typename H::win Win; + typedef typename H::wout Wout; + + template + explicit ContextImpl(P* pipeline, HandlerArg&& handlerArg) + : pipeline_(pipeline), + handler_(std::forward(handlerArg)) { + handler_.attachPipeline(this); + } + + ~ContextImpl() { + handler_.detachPipeline(this); + } + + H* getHandler() { + return &handler_; + } + + // PipelineContext overrides + void setNextIn(PipelineContext* ctx) override { + auto nextIn = dynamic_cast*>(ctx); + if (nextIn) { + nextIn_ = nextIn; + } else { + throw std::invalid_argument("wrong type in setNextIn"); + } + } + + void setNextOut(PipelineContext* ctx) override { + auto nextOut = dynamic_cast*>(ctx); + if (nextOut) { + nextOut_ = nextOut; + } else { + throw std::invalid_argument("wrong type in setNextOut"); + } + } + + void attachTransport() override { + typename P::DestructorGuard dg(static_cast(pipeline_)); + handler_.attachTransport(this); + } + + void detachTransport() override { + typename P::DestructorGuard dg(static_cast(pipeline_)); + handler_.detachTransport(this); + } + + // ChannelHandlerContext overrides + void fireRead(Rout msg) override { + typename P::DestructorGuard dg(static_cast(pipeline_)); + if (nextIn_) { + nextIn_->read(std::forward(msg)); + } else { + LOG(WARNING) << "read reached end of pipeline"; + } + } + + void fireReadEOF() override { + typename P::DestructorGuard dg(static_cast(pipeline_)); + if (nextIn_) { + nextIn_->readEOF(); + } else { + LOG(WARNING) << "readEOF reached end of pipeline"; + } + } + + void fireReadException(exception_wrapper e) override { + typename P::DestructorGuard dg(static_cast(pipeline_)); + if (nextIn_) { + nextIn_->readException(std::move(e)); + } else { + LOG(WARNING) << "readException reached end of pipeline"; + } + } + + Future fireWrite(Wout msg) override { + typename P::DestructorGuard dg(static_cast(pipeline_)); + if (nextOut_) { + return nextOut_->write(std::forward(msg)); + } else { + LOG(WARNING) << "write reached end of pipeline"; + return makeFuture(); + } + } + + Future fireClose() override { + typename P::DestructorGuard dg(static_cast(pipeline_)); + if (nextOut_) { + return nextOut_->close(); + } else { + LOG(WARNING) << "close reached end of pipeline"; + return makeFuture(); + } + } + + std::shared_ptr getTransport() override { + return pipeline_->getTransport(); + } + + void setWriteFlags(WriteFlags flags) override { + pipeline_->setWriteFlags(flags); + } + + WriteFlags getWriteFlags() override { + return pipeline_->getWriteFlags(); + } + + void setReadBufferSettings( + uint64_t minAvailable, + uint64_t allocationSize) override { + pipeline_->setReadBufferSettings(minAvailable, allocationSize); + } + + std::pair getReadBufferSettings() override { + return pipeline_->getReadBufferSettings(); + } + + // InboundChannelHandlerContext overrides + void read(Rin msg) override { + typename P::DestructorGuard dg(static_cast(pipeline_)); + handler_.read(this, std::forward(msg)); + } + + void readEOF() override { + typename P::DestructorGuard dg(static_cast(pipeline_)); + handler_.readEOF(this); + } + + void readException(exception_wrapper e) override { + typename P::DestructorGuard dg(static_cast(pipeline_)); + handler_.readException(this, std::move(e)); + } + + // OutboundChannelHandlerContext overrides + Future write(Win msg) override { + typename P::DestructorGuard dg(static_cast(pipeline_)); + return handler_.write(this, std::forward(msg)); + } + + Future close() override { + typename P::DestructorGuard dg(static_cast(pipeline_)); + return handler_.close(this); + } + + private: + P* pipeline_; + H handler_; + InboundChannelHandlerContext* nextIn_{nullptr}; + OutboundChannelHandlerContext* nextOut_{nullptr}; +}; + +}} diff --git a/folly/experimental/wangle/channel/ChannelPipeline.h b/folly/experimental/wangle/channel/ChannelPipeline.h index de80a856..7af4adb9 100644 --- a/folly/experimental/wangle/channel/ChannelPipeline.h +++ b/folly/experimental/wangle/channel/ChannelPipeline.h @@ -16,51 +16,29 @@ #pragma once +#include #include +#include #include #include +#include #include -#include namespace folly { namespace wangle { -template -class ChannelHandlerContext { - public: - virtual ~ChannelHandlerContext() {} - - virtual void fireRead(In msg) = 0; - virtual void fireReadEOF() = 0; - virtual void fireReadException(exception_wrapper e) = 0; - - virtual Future fireWrite(Out msg) = 0; - virtual Future fireClose() = 0; - - virtual std::shared_ptr getTransport() = 0; - - virtual void setWriteFlags(WriteFlags flags) = 0; - virtual WriteFlags getWriteFlags() = 0; - - virtual void setReadBufferSettings( - uint64_t minAvailable, - uint64_t allocationSize) = 0; - virtual std::pair getReadBufferSettings() = 0; -}; +template +class ChannelPipeline; -template -class OutboundChannelHandlerContext { +template +class ChannelPipeline : public DelayedDestruction { public: - virtual ~OutboundChannelHandlerContext() {} - virtual Future write(Out msg) = 0; - virtual Future close() = 0; -}; + ChannelPipeline() {} + ~ChannelPipeline() {} -template -class ChannelPipeline; + std::shared_ptr getTransport() { + return transport_; + } -template <> -class ChannelPipeline<> : public DelayedDestruction { - public: void setWriteFlags(WriteFlags flags) { writeFlags_ = flags; } @@ -77,41 +55,87 @@ class ChannelPipeline<> : public DelayedDestruction { return readBufferSettings_; } - protected: - static const bool is_end{true}; - typedef void LastHandler; - typedef void OutboundContext; - - std::shared_ptr transport_; - WriteFlags writeFlags_{WriteFlags::NONE}; - std::pair readBufferSettings_{2048, 2048}; - - ~ChannelPipeline() {} - - template - void read(T&& msg) { - LOG(FATAL) << "impossibru"; + void read(R msg) { + front_->read(std::forward(msg)); } void readEOF() { - LOG(FATAL) << "impossibru"; + front_->readEOF(); } void readException(exception_wrapper e) { - LOG(FATAL) << "impossibru"; + front_->readException(std::move(e)); } - template - Future write(T&& msg) { - LOG(FATAL) << "impossibru"; - return makeFuture(); + Future write(W msg) { + return back_->write(std::forward(msg)); } Future close() { - LOG(FATAL) << "impossibru"; - return makeFuture(); + return back_->close(); } + template + ChannelPipeline& addBack(H&& handler) { + ctxs_.push_back(folly::make_unique>( + this, std::forward(handler))); + return *this; + } + + template + ChannelPipeline& addFront(H&& handler) { + ctxs_.insert(0, folly::make_unique>( + this, std::forward(handler))); + return *this; + } + + template + H* getHandler(int i) { + auto ctx = dynamic_cast*>(ctxs_[i].get()); + CHECK(ctx); + return ctx->getHandler(); + } + + void finalize() { + finalizeHelper(); + InboundChannelHandlerContext* front; + front_ = dynamic_cast*>( + ctxs_.front().get()); + if (!front_) { + throw std::invalid_argument("wrong type for first handler"); + } + } + + protected: + explicit ChannelPipeline(bool shouldFinalize) { + CHECK(!shouldFinalize); + } + + void finalizeHelper() { + if (ctxs_.empty()) { + return; + } + + for (int i = 0; i < ctxs_.size() - 1; i++) { + ctxs_[i]->link(ctxs_[i+1].get()); + } + + back_ = dynamic_cast*>(ctxs_.back().get()); + if (!back_) { + throw std::invalid_argument("wrong type for last handler"); + } + } + + PipelineContext* getLocalFront() { + return ctxs_.empty() ? nullptr : ctxs_.front().get(); + } + + static const bool is_end{true}; + + std::shared_ptr transport_; + WriteFlags writeFlags_{WriteFlags::NONE}; + std::pair readBufferSettings_{2048, 2048}; + void attachPipeline() {} void attachTransport( @@ -123,234 +147,172 @@ class ChannelPipeline<> : public DelayedDestruction { transport_ = nullptr; } - template - void setOutboundContext(T ctx) {} + OutboundChannelHandlerContext* back_{nullptr}; - template - H* getHandler(size_t i) { - LOG(FATAL) << "impossibru"; - } + private: + InboundChannelHandlerContext* front_{nullptr}; + std::vector> ctxs_; }; -template -class ChannelPipeline - : public ChannelPipeline { +template +class ChannelPipeline + : public ChannelPipeline { protected: - typedef typename std::conditional< - ChannelPipeline::is_end, - Handler, - typename ChannelPipeline::LastHandler>::type - LastHandler; - - public: template - ChannelPipeline(HandlerArg&& handlerArg, HandlersArgs&&... handlersArgs) - : ChannelPipeline(std::forward(handlersArgs)...), - handler_(std::forward(handlerArg)), - ctx_(this) { - handler_.attachPipeline(&ctx_); - ChannelPipeline::setOutboundContext(&ctx_); + ChannelPipeline( + bool shouldFinalize, + HandlerArg&& handlerArg, + HandlersArgs&&... handlersArgs) + : ChannelPipeline( + false, + std::forward(handlersArgs)...), + ctx_(this, std::forward(handlerArg)) { + if (shouldFinalize) { + finalize(); + } } + public: + template + explicit ChannelPipeline(HandlersArgs&&... handlersArgs) + : ChannelPipeline(true, std::forward(handlersArgs)...) {} ~ChannelPipeline() {} - void destroy() override { - handler_.detachPipeline(&ctx_); - } + void destroy() override { } - void read(typename Handler::rin msg) { - ChannelPipeline<>::DestructorGuard dg( + void read(R msg) { + typename ChannelPipeline::DestructorGuard dg( static_cast(this)); - handler_.read(&ctx_, std::forward(msg)); + front_->read(std::forward(msg)); } void readEOF() { - ChannelPipeline<>::DestructorGuard dg( + typename ChannelPipeline::DestructorGuard dg( static_cast(this)); - handler_.readEOF(&ctx_); + front_->readEOF(); } void readException(exception_wrapper e) { - ChannelPipeline<>::DestructorGuard dg( + typename ChannelPipeline::DestructorGuard dg( static_cast(this)); - handler_.readException(&ctx_, std::move(e)); + front_->readEOF(std::move(e)); } - Future write(typename LastHandler::win msg) { - ChannelPipeline<>::DestructorGuard dg( + Future write(W msg) { + typename ChannelPipeline::DestructorGuard dg( static_cast(this)); - return ChannelPipeline::writeHere( - std::forward(msg)); + return back_->write(std::forward(msg)); } Future close() { - ChannelPipeline<>::DestructorGuard dg( + typename ChannelPipeline::DestructorGuard dg( static_cast(this)); - return ChannelPipeline::closeHere(); + return back_->close(); } void attachTransport( std::shared_ptr transport) { - ChannelPipeline<>::DestructorGuard dg( + typename ChannelPipeline::DestructorGuard dg( static_cast(this)); - CHECK(!ChannelPipeline<>::transport_); - ChannelPipeline::attachTransport(std::move(transport)); - handler_.attachTransport(&ctx_); + CHECK((!ChannelPipeline::transport_)); + ChannelPipeline::attachTransport(std::move(transport)); + forEachCtx([&](PipelineContext* ctx){ + ctx->attachTransport(); + }); } void detachTransport() { - ChannelPipeline<>::DestructorGuard dg( + typename ChannelPipeline::DestructorGuard dg( static_cast(this)); - ChannelPipeline::detachTransport(); - handler_.detachTransport(&ctx_); + ChannelPipeline::detachTransport(); + forEachCtx([&](PipelineContext* ctx){ + ctx->detachTransport(); + }); } std::shared_ptr getTransport() { - return ChannelPipeline<>::transport_; + return ChannelPipeline::transport_; } template - H* getHandler(size_t i) { - if (i == 0) { - auto ptr = dynamic_cast(&handler_); - CHECK(ptr); - return ptr; - } else { - return ChannelPipeline::template getHandler(i-1); - } - } - - protected: - static const bool is_end{false}; - - typedef OutboundChannelHandlerContext OutboundContext; - - void setOutboundContext(OutboundContext* ctx) { - outboundCtx_ = ctx; - } - - Future writeHere(typename Handler::win msg) { - return handler_.write(&ctx_, std::forward(msg)); + ChannelPipeline& addBack(H&& handler) { + ChannelPipeline::addBack(std::move(handler)); + return *this; } - Future closeHere() { - return handler_.close(&ctx_); + template + ChannelPipeline& addFront(H&& handler) { + ctxs_.insert(0, folly::make_unique>( + this, std::move(handler))); + return *this; } - private: - class Context - : public ChannelHandlerContext, - public OutboundChannelHandlerContext { - public: - explicit Context(ChannelPipeline* pipeline) : pipeline_(pipeline) {} - ChannelPipeline* pipeline_; - - void fireRead(typename Handler::rout msg) override { - ChannelPipeline<>::DestructorGuard dg(pipeline_); - pipeline_->fireRead(std::forward(msg)); - } - - void fireReadEOF() override { - ChannelPipeline<>::DestructorGuard dg(pipeline_); - return pipeline_->fireReadEOF(); - } - - void fireReadException(exception_wrapper e) override { - ChannelPipeline<>::DestructorGuard dg(pipeline_); - return pipeline_->fireReadException(std::move(e)); - } - - Future fireWrite(typename Handler::wout msg) override { - ChannelPipeline<>::DestructorGuard dg(pipeline_); - return pipeline_->fireWrite(std::forward(msg)); - } - - Future write(typename Handler::win msg) override { - ChannelPipeline<>::DestructorGuard dg(pipeline_); - return pipeline_->writeHere(std::forward(msg)); - } - - Future fireClose() override { - ChannelPipeline<>::DestructorGuard dg(pipeline_); - return pipeline_->fireClose(); - } - - Future close() override { - ChannelPipeline<>::DestructorGuard dg(pipeline_); - return pipeline_->closeHere(); - } - - std::shared_ptr getTransport() override { - return pipeline_->transport_; - } - - void setWriteFlags(WriteFlags flags) override { - pipeline_->setWriteFlags(flags); - } - - WriteFlags getWriteFlags() override { - return pipeline_->getWriteFlags(); + template + H* getHandler(size_t i) { + if (i > ctxs_.size()) { + return ChannelPipeline::template getHandler( + i - (ctxs_.size() + 1)); + } else { + auto pctx = (i == ctxs_.size()) ? &ctx_ : ctxs_[i].get(); + auto ctx = dynamic_cast*>(pctx); + return ctx->getHandler(); } + } - void setReadBufferSettings( - uint64_t minAvailable, - uint64_t allocationSize) override { - pipeline_->setReadBufferSettings(minAvailable, allocationSize); + void finalize() { + finalizeHelper(); + auto ctx = ctxs_.empty() ? &ctx_ : ctxs_.front().get(); + front_ = dynamic_cast*>(ctx); + if (!front_) { + throw std::invalid_argument("wrong type for first handler"); } + } - std::pair getReadBufferSettings() override { - return pipeline_->getReadBufferSettings(); + protected: + void finalizeHelper() { + ChannelPipeline::finalizeHelper(); + back_ = ChannelPipeline::back_; + if (!back_) { + auto is_end = ChannelPipeline::is_end; + CHECK(is_end); + back_ = dynamic_cast*>(&ctx_); + if (!back_) { + throw std::invalid_argument("wrong type for last handler"); + } } - }; - void fireRead(typename Handler::rout msg) { - if (!ChannelPipeline::is_end) { - ChannelPipeline::read( - std::forward(msg)); - } else { - LOG(WARNING) << "read() reached end of pipeline"; + if (!ctxs_.empty()) { + for (int i = 0; i < ctxs_.size() - 1; i++) { + ctxs_[i]->link(ctxs_[i+1].get()); + } + ctxs_.back()->link(&ctx_); } - } - void fireReadEOF() { - if (!ChannelPipeline::is_end) { - ChannelPipeline::readEOF(); - } else { - LOG(WARNING) << "readEOF() reached end of pipeline"; + auto nextFront = ChannelPipeline::getLocalFront(); + if (nextFront) { + ctx_.link(nextFront); } } - void fireReadException(exception_wrapper e) { - if (!ChannelPipeline::is_end) { - ChannelPipeline::readException(std::move(e)); - } else { - LOG(WARNING) << "readException() reached end of pipeline"; - } + PipelineContext* getLocalFront() { + return ctxs_.empty() ? &ctx_ : ctxs_.front().get(); } - Future fireWrite(typename Handler::wout msg) { - if (outboundCtx_) { - return outboundCtx_->write(std::forward(msg)); - } else { - LOG(WARNING) << "write() reached end of pipeline"; - return makeFuture(); - } - } + static const bool is_end{false}; + InboundChannelHandlerContext* front_{nullptr}; + OutboundChannelHandlerContext* back_{nullptr}; - Future fireClose() { - if (outboundCtx_) { - return outboundCtx_->close(); - } else { - LOG(WARNING) << "close() reached end of pipeline"; - return makeFuture(); + private: + template + void forEachCtx(const F& func) { + for (auto& ctx : ctxs_) { + func(ctx.get()); } + func(&ctx_); } - friend class Context; - Handler handler_; - Context ctx_; - OutboundContext* outboundCtx_{nullptr}; + ContextImpl ctx_; + std::vector> ctxs_; }; }} diff --git a/folly/experimental/wangle/channel/ChannelTest.cpp b/folly/experimental/wangle/channel/ChannelTest.cpp index e0151583..6b7ec897 100644 --- a/folly/experimental/wangle/channel/ChannelTest.cpp +++ b/folly/experimental/wangle/channel/ChannelTest.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -63,7 +64,7 @@ class EchoService : public ChannelHandlerAdapter { }; TEST(ChannelTest, PlzCompile) { - ChannelPipeline< + ChannelPipeline pipeline(BytesPassthrough(), BytesPassthrough(), BytesPassthrough); - ChannelPipeline< + ChannelPipeline, KittyPrepender, - KittyPrepender, - EchoService> + KittyPrepender> kittyPipeline( std::make_shared(), KittyPrepender{}, - KittyPrepender{}, - EchoService{}); + KittyPrepender{}); + kittyPipeline.addBack(KittyPrepender{}); + kittyPipeline.addBack(EchoService{}); + kittyPipeline.finalize(); kittyPipeline.read(5); auto handler = kittyPipeline.getHandler(2); CHECK(handler); + + auto p = folly::make_unique(42); + folly::Optional> foo{std::move(p)}; +} + +TEST(ChannelTest, PlzCompile2) { + EchoService echoService; + ChannelPipeline pipeline; + pipeline + .addBack(ToString()) + .addBack(KittyPrepender()) + .addBack(KittyPrepender()) + .addBack(ChannelHandlerPtr(&echoService)) + .finalize(); + pipeline.read(42); }