From d3a23ad3cf7714787651d610dc6659a68def43e7 Mon Sep 17 00:00:00 2001 From: James Sedgwick Date: Mon, 3 Nov 2014 09:37:35 -0800 Subject: [PATCH] netty-like channels Summary: I wanted some foundational abstractions to start building out codecs on top of. I also know that Blake gets very amused when I shamelessly copy Java/Netty abstractions, and I live to amuse Blake so I did it again. So here's an implementation of something very similar to Netty's ChannelAdapters/ChannelPipelines Only read() and write() for now, everything/anything else can easily be bolted on once the design is settled (if this is at all the direction we want to go) I have a lot of thoughts about this design but I'm going to save my fingers and leave that to ad hoc discussions once folks take a look at this A couple of things, though: - It should be possible to dynamically add handlers to the chain. How I envision this working is that each original adapters keeps two lists of shared/unique ptrs to adapters of the correct type to sit next to them on either side, and dispatch to them appropriately when they're there. - I was trying to do without separate ChannelHandlerContext objects altogether (keep the interface, but have ChannelPipeline act as the context itself) but ran into issues with virtual multiple inheritance. I might have another go at this. - Only movable types are permitted. I hope this won't be too restrictive, because it would be a PITA to support both move-only and copy-only types. - Why no Rx? Seems to me that any handlers that actually needs Rx (e.g. stats fanout or something) can deal with it themselves. - If it turned out to be useful to nest these (for more flexible composition) that would also be doable. i.e. ChannelPipeline, ChannelPipeline> Test Plan: super basic test compiles and runs as expected Reviewed By: davejwatson@fb.com Subscribers: ajitb, folly-diffs@, ldbrandy, trunkagent, fugalh, njormrod FB internal diff: D1604575 Tasks: 5002299 Signature: t1:1604575:1415034767:bc3b12fae726890aa6a55ed391286917ae23e56e --- .../wangle/channel/ChannelHandler.h | 193 ++++++++++ .../wangle/channel/ChannelPipeline.h | 356 ++++++++++++++++++ .../wangle/channel/ChannelTest.cpp | 88 +++++ 3 files changed, 637 insertions(+) create mode 100644 folly/experimental/wangle/channel/ChannelHandler.h create mode 100644 folly/experimental/wangle/channel/ChannelPipeline.h create mode 100644 folly/experimental/wangle/channel/ChannelTest.cpp diff --git a/folly/experimental/wangle/channel/ChannelHandler.h b/folly/experimental/wangle/channel/ChannelHandler.h new file mode 100644 index 00000000..3ef7ae50 --- /dev/null +++ b/folly/experimental/wangle/channel/ChannelHandler.h @@ -0,0 +1,193 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +namespace folly { namespace wangle { + +template +class ChannelHandler { + public: + typedef Rin rin; + typedef Rout rout; + typedef Win win; + typedef Wout wout; + typedef ChannelHandlerContext Context; + virtual ~ChannelHandler() {} + + virtual void read(Context* ctx, Rin msg) = 0; + virtual void readEOF(Context* ctx) { + ctx->fireReadEOF(); + } + virtual void readException(Context* ctx, exception_wrapper e) { + ctx->fireReadException(std::move(e)); + } + + virtual Future write(Context* ctx, Win msg) = 0; + virtual Future close(Context* ctx) { + return ctx->fireClose(); + } + + virtual void attachPipeline(Context* ctx) {} + virtual void attachTransport(Context* ctx) {} + + virtual void detachPipeline(Context* ctx) {} + virtual void detachTransport(Context* ctx) {} + + /* + // Other sorts of things we might want, all shamelessly stolen from Netty + // inbound + virtual void exceptionCaught( + ChannelHandlerContext* ctx, + exception_wrapper e) {} + virtual void channelRegistered(ChannelHandlerContext* ctx) {} + virtual void channelUnregistered(ChannelHandlerContext* ctx) {} + virtual void channelActive(ChannelHandlerContext* ctx) {} + virtual void channelInactive(ChannelHandlerContext* ctx) {} + virtual void channelReadComplete(ChannelHandlerContext* ctx) {} + virtual void userEventTriggered(ChannelHandlerContext* ctx, void* evt) {} + virtual void channelWritabilityChanged(ChannelHandlerContext* ctx) {} + + // outbound + virtual Future bind( + ChannelHandlerContext* ctx, + SocketAddress localAddress) {} + virtual Future connect( + ChannelHandlerContext* ctx, + SocketAddress remoteAddress, SocketAddress localAddress) {} + virtual Future disconnect(ChannelHandlerContext* ctx) {} + virtual Future deregister(ChannelHandlerContext* ctx) {} + virtual Future read(ChannelHandlerContext* ctx) {} + virtual void flush(ChannelHandlerContext* ctx) {} + */ +}; + +template +class ChannelHandlerAdapter : public ChannelHandler { + public: + typedef typename ChannelHandler::Context Context; + + void read(Context* ctx, R msg) override { + ctx->fireRead(std::forward(msg)); + } + + Future write(Context* ctx, W msg) override { + return ctx->fireWrite(std::forward(msg)); + } +}; + +typedef ChannelHandlerAdapter> +BytesToBytesHandler; + +template +class ChannelHandlerPtr : public ChannelHandler< + typename Handler::rin, + typename Handler::rout, + typename Handler::win, + typename Handler::wout> { + public: + typedef typename std::conditional< + Shared, + std::shared_ptr, + Handler*>::type + HandlerPtr; + + typedef typename Handler::Context Context; + + explicit ChannelHandlerPtr(HandlerPtr handler) + : handler_(std::move(handler)) {} + + void setHandler(HandlerPtr handler) { + if (handler == handler_) { + return; + } + if (handler_ && ctx_) { + handler_->detachPipeline(ctx_); + } + handler_ = std::move(handler); + if (handler_ && ctx_) { + handler_->attachPipeline(ctx_); + if (ctx_->getTransport()) { + handler_->attachTransport(ctx_); + } + } + } + + void attachPipeline(Context* ctx) override { + ctx_ = ctx; + if (handler_) { + handler_->attachPipeline(ctx_); + } + } + + + void attachTransport(Context* ctx) override { + ctx_ = ctx; + if (handler_) { + handler_->attachTransport(ctx_); + } + } + + void detachPipeline(Context* ctx) override { + ctx_ = ctx; + if (handler_) { + handler_->detachPipeline(ctx_); + } + } + + void detachTransport(Context* ctx) override { + ctx_ = ctx; + if (handler_) { + handler_->detachTransport(ctx_); + } + } + + void read(Context* ctx, typename Handler::rin msg) override { + DCHECK(handler_); + handler_->read(ctx, std::forward(msg)); + } + + void readEOF(Context* ctx) override { + DCHECK(handler_); + handler_->readEOF(ctx); + } + + void readException(Context* ctx, exception_wrapper e) override { + DCHECK(handler_); + handler_->readException(ctx, std::move(e)); + } + + Future write(Context* ctx, typename Handler::win msg) override { + DCHECK(handler_); + return handler_->write(ctx, std::forward(msg)); + } + + Future close(Context* ctx) override { + DCHECK(handler_); + return handler_->close(ctx); + } + + private: + Context* ctx_; + HandlerPtr handler_; +}; + +}} diff --git a/folly/experimental/wangle/channel/ChannelPipeline.h b/folly/experimental/wangle/channel/ChannelPipeline.h new file mode 100644 index 00000000..89212df5 --- /dev/null +++ b/folly/experimental/wangle/channel/ChannelPipeline.h @@ -0,0 +1,356 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +namespace folly { namespace wangle { + +template +class ChannelHandlerContext { + public: + virtual ~ChannelHandlerContext() {} + + virtual void fireRead(In msg) = 0; + virtual void fireReadEOF() = 0; + virtual void fireReadException(exception_wrapper e) = 0; + + virtual Future fireWrite(Out msg) = 0; + virtual Future fireClose() = 0; + + virtual std::shared_ptr getTransport() = 0; + + virtual void setWriteFlags(WriteFlags flags) = 0; + virtual WriteFlags getWriteFlags() = 0; + + virtual void setReadBufferSettings( + uint64_t minAvailable, + uint64_t allocationSize) = 0; + virtual std::pair getReadBufferSettings() = 0; +}; + +template +class OutboundChannelHandlerContext { + public: + virtual ~OutboundChannelHandlerContext() {} + virtual Future write(Out msg) = 0; + virtual Future close() = 0; +}; + +template +class ChannelPipeline; + +template <> +class ChannelPipeline<> : public DelayedDestruction { + public: + void setWriteFlags(WriteFlags flags) { + writeFlags_ = flags; + } + + WriteFlags getWriteFlags() { + return writeFlags_; + } + + void setReadBufferSettings(uint64_t minAvailable, uint64_t allocationSize) { + readBufferSettings_ = std::make_pair(minAvailable, allocationSize); + } + + std::pair getReadBufferSettings() { + return readBufferSettings_; + } + + protected: + static const bool is_end{true}; + typedef void LastHandler; + typedef void OutboundContext; + + std::shared_ptr transport_; + WriteFlags writeFlags_{WriteFlags::NONE}; + std::pair readBufferSettings_{2048, 2048}; + + ~ChannelPipeline() {} + + template + void read(T&& msg) { + LOG(FATAL) << "impossibru"; + } + + void readEOF() { + LOG(FATAL) << "impossibru"; + } + + void readException(exception_wrapper e) { + LOG(FATAL) << "impossibru"; + } + + template + Future write(T&& msg) { + LOG(FATAL) << "impossibru"; + return makeFuture(); + } + + Future close() { + LOG(FATAL) << "impossibru"; + return makeFuture(); + } + + void attachPipeline() {} + + void attachTransport( + std::shared_ptr transport) { + transport_ = std::move(transport); + } + + void detachTransport() { + transport_ = nullptr; + } + + template + void setOutboundContext(T ctx) {} + + template + H* getHandler(size_t i) { + LOG(FATAL) << "impossibru"; + } +}; + +template +class ChannelPipeline + : public ChannelPipeline { + protected: + typedef typename std::conditional< + ChannelPipeline::is_end, + Handler, + typename ChannelPipeline::LastHandler>::type + LastHandler; + + public: + template + ChannelPipeline(HandlerArg&& handlerArg, HandlersArgs&&... handlersArgs) + : ChannelPipeline(std::forward(handlersArgs)...), + handler_(std::forward(handlerArg)), + ctx_(this) { + handler_.attachPipeline(&ctx_); + ChannelPipeline::setOutboundContext(&ctx_); + } + + ~ChannelPipeline() {} + + void destroy() override { + handler_.detachPipeline(&ctx_); + } + + void read(typename Handler::rin msg) { + ChannelPipeline<>::DestructorGuard dg( + static_cast(this)); + handler_.read(&ctx_, std::forward(msg)); + } + + void readEOF() { + ChannelPipeline<>::DestructorGuard dg( + static_cast(this)); + handler_.readEOF(&ctx_); + } + + void readException(exception_wrapper e) { + ChannelPipeline<>::DestructorGuard dg( + static_cast(this)); + handler_.readException(&ctx_, std::move(e)); + } + + Future write(typename LastHandler::win msg) { + ChannelPipeline<>::DestructorGuard dg( + static_cast(this)); + return ChannelPipeline::writeHere( + std::forward(msg)); + } + + Future close() { + ChannelPipeline<>::DestructorGuard dg( + static_cast(this)); + return ChannelPipeline::closeHere(); + } + + void attachTransport( + std::shared_ptr transport) { + ChannelPipeline<>::DestructorGuard dg( + static_cast(this)); + CHECK(!ChannelPipeline<>::transport_); + ChannelPipeline::attachTransport(std::move(transport)); + handler_.attachTransport(&ctx_); + } + + void detachTransport() { + ChannelPipeline<>::DestructorGuard dg( + static_cast(this)); + ChannelPipeline::detachTransport(); + handler_.detachTransport(&ctx_); + } + + std::shared_ptr getTransport() { + return ChannelPipeline<>::transport_; + } + + template + H* getHandler(size_t i) { + if (i == 0) { + auto ptr = dynamic_cast(&handler_); + CHECK(ptr); + return ptr; + } else { + return ChannelPipeline::template getHandler(i-1); + } + } + + protected: + static const bool is_end{false}; + + typedef OutboundChannelHandlerContext OutboundContext; + + void setOutboundContext(OutboundContext* ctx) { + outboundCtx_ = ctx; + } + + Future writeHere(typename Handler::win msg) { + return handler_.write(&ctx_, std::forward(msg)); + } + + Future closeHere() { + return handler_.close(&ctx_); + } + + private: + class Context + : public ChannelHandlerContext, + public OutboundChannelHandlerContext { + public: + explicit Context(ChannelPipeline* pipeline) : pipeline_(pipeline) {} + ChannelPipeline* pipeline_; + + void fireRead(typename Handler::rout msg) override { + ChannelPipeline<>::DestructorGuard dg(pipeline_); + pipeline_->fireRead(std::forward(msg)); + } + + void fireReadEOF() override { + ChannelPipeline<>::DestructorGuard dg(pipeline_); + return pipeline_->fireReadEOF(); + } + + void fireReadException(exception_wrapper e) override { + ChannelPipeline<>::DestructorGuard dg(pipeline_); + return pipeline_->fireReadException(std::move(e)); + } + + Future fireWrite(typename Handler::wout msg) override { + ChannelPipeline<>::DestructorGuard dg(pipeline_); + return pipeline_->fireWrite(std::forward(msg)); + } + + Future write(typename Handler::win msg) override { + ChannelPipeline<>::DestructorGuard dg(pipeline_); + return pipeline_->writeHere(std::forward(msg)); + } + + Future fireClose() override { + ChannelPipeline<>::DestructorGuard dg(pipeline_); + return pipeline_->fireClose(); + } + + Future close() override { + ChannelPipeline<>::DestructorGuard dg(pipeline_); + return pipeline_->closeHere(); + } + + std::shared_ptr getTransport() override { + return pipeline_->transport_; + } + + void setWriteFlags(WriteFlags flags) override { + pipeline_->setWriteFlags(flags); + } + + WriteFlags getWriteFlags() override { + return pipeline_->getWriteFlags(); + } + + void setReadBufferSettings( + uint64_t minAvailable, + uint64_t allocationSize) override { + pipeline_->setReadBufferSettings(minAvailable, allocationSize); + } + + std::pair getReadBufferSettings() override { + return pipeline_->getReadBufferSettings(); + } + }; + + void fireRead(typename Handler::rout msg) { + if (!ChannelPipeline::is_end) { + ChannelPipeline::read( + std::forward(msg)); + } else { + LOG(WARNING) << "read() reached end of pipeline"; + } + } + + void fireReadEOF() { + if (!ChannelPipeline::is_end) { + ChannelPipeline::readEOF(); + } else { + LOG(WARNING) << "readEOF() reached end of pipeline"; + } + } + + void fireReadException(exception_wrapper e) { + if (!ChannelPipeline::is_end) { + ChannelPipeline::readException(std::move(e)); + } else { + LOG(WARNING) << "readException() reached end of pipeline"; + } + } + + Future fireWrite(typename Handler::wout msg) { + if (outboundCtx_) { + return outboundCtx_->write(std::forward(msg)); + } else { + LOG(WARNING) << "write() reached end of pipeline"; + return makeFuture(); + } + } + + Future fireClose() { + if (outboundCtx_) { + return outboundCtx_->close(); + } else { + LOG(WARNING) << "close() reached end of pipeline"; + return makeFuture(); + } + } + + friend class Context; + Handler handler_; + Context ctx_; + OutboundContext* outboundCtx_{nullptr}; +}; + +}} diff --git a/folly/experimental/wangle/channel/ChannelTest.cpp b/folly/experimental/wangle/channel/ChannelTest.cpp new file mode 100644 index 00000000..e0151583 --- /dev/null +++ b/folly/experimental/wangle/channel/ChannelTest.cpp @@ -0,0 +1,88 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +using namespace folly; +using namespace folly::wangle; + +class ToString : public ChannelHandler { + public: + virtual ~ToString() {} + void read(Context* ctx, int msg) override { + LOG(INFO) << "ToString read"; + ctx->fireRead(folly::to(msg)); + } + Future write(Context* ctx, std::string msg) override { + LOG(INFO) << "ToString write"; + return ctx->fireWrite(folly::to(msg)); + } +}; + +class KittyPrepender : public ChannelHandlerAdapter { + public: + virtual ~KittyPrepender() {} + void read(Context* ctx, std::string msg) override { + LOG(INFO) << "KittyAppender read"; + ctx->fireRead(folly::to("kitty", msg)); + } + Future write(Context* ctx, std::string msg) override { + LOG(INFO) << "KittyAppender write"; + return ctx->fireWrite(msg.substr(5)); + } +}; + +typedef ChannelHandlerAdapter BytesPassthrough; + +class EchoService : public ChannelHandlerAdapter { + public: + virtual ~EchoService() {} + void read(Context* ctx, std::string str) override { + LOG(INFO) << "ECHO: " << str; + ctx->fireWrite(str).then([](Try&& t) { + LOG(INFO) << "done writing"; + }); + } +}; + +TEST(ChannelTest, PlzCompile) { + ChannelPipeline< + BytesPassthrough, + BytesPassthrough, + // If this were useful it wouldn't be that hard + // ChannelPipeline, + BytesPassthrough> + pipeline(BytesPassthrough(), BytesPassthrough(), BytesPassthrough); + + ChannelPipeline< + ChannelHandlerPtr, + KittyPrepender, + KittyPrepender, + EchoService> + kittyPipeline( + std::make_shared(), + KittyPrepender{}, + KittyPrepender{}, + EchoService{}); + kittyPipeline.read(5); + + auto handler = kittyPipeline.getHandler(2); + CHECK(handler); +} -- 2.34.1