From: James Sedgwick Date: Mon, 27 Apr 2015 18:19:18 +0000 (-0700) Subject: strip Channel from all class names X-Git-Tag: v0.36.0~4 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=3444ffba56927ac7e682dafd92ccc37a0f50c8ad;p=folly.git strip Channel from all class names Summary: as above. Only got a little messy when components within folly::wangle typedefed things to Pipeline Test Plan: unit tests Reviewed By: davejwatson@fb.com Subscribers: wormhole-diffs@, fugalh, alandau, bmatheny, folly-diffs@, jsedgwick, yfeldblum, chalfant FB internal diff: D2022181 Tasks: 6836580 Signature: t1:2022181:1430157032:df0bdfb9ca0d76b86d52c55c4ad41ea953a18cb4 --- diff --git a/folly/Makefile.am b/folly/Makefile.am index 0cfbed97..adba4d1e 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -273,10 +273,10 @@ nobase_follyinclude_HEADERS = \ wangle/bootstrap/ServerSocketFactory.h \ wangle/bootstrap/ClientBootstrap.h \ wangle/channel/AsyncSocketHandler.h \ - wangle/channel/ChannelHandler.h \ - wangle/channel/ChannelHandlerContext.h \ - wangle/channel/ChannelPipeline.h \ + wangle/channel/Handler.h \ + wangle/channel/HandlerContext.h \ wangle/channel/OutputBufferingHandler.h \ + wangle/channel/Pipeline.h \ wangle/concurrent/BlockingQueue.h \ wangle/concurrent/Codel.h \ wangle/concurrent/CPUThreadPoolExecutor.h \ diff --git a/folly/wangle/bootstrap/BootstrapTest.cpp b/folly/wangle/bootstrap/BootstrapTest.cpp index d0ee7e2a..9087be57 100644 --- a/folly/wangle/bootstrap/BootstrapTest.cpp +++ b/folly/wangle/bootstrap/BootstrapTest.cpp @@ -16,7 +16,7 @@ #include "folly/wangle/bootstrap/ServerBootstrap.h" #include "folly/wangle/bootstrap/ClientBootstrap.h" -#include "folly/wangle/channel/ChannelHandler.h" +#include "folly/wangle/channel/Handler.h" #include #include @@ -25,14 +25,14 @@ using namespace folly::wangle; using namespace folly; -typedef ChannelPipeline> Pipeline; +typedef Pipeline> BytesPipeline; -typedef ServerBootstrap TestServer; -typedef ClientBootstrap TestClient; +typedef ServerBootstrap TestServer; +typedef ClientBootstrap TestClient; -class TestClientPipelineFactory : public PipelineFactory { +class TestClientPipelineFactory : public PipelineFactory { public: - Pipeline* newPipeline(std::shared_ptr sock) { + BytesPipeline* newPipeline(std::shared_ptr sock) { CHECK(sock->good()); // We probably aren't connected immedately, check after a small delay @@ -43,11 +43,11 @@ class TestClientPipelineFactory : public PipelineFactory { } }; -class TestPipelineFactory : public PipelineFactory { +class TestPipelineFactory : public PipelineFactory { public: - Pipeline* newPipeline(std::shared_ptr sock) { + BytesPipeline* newPipeline(std::shared_ptr sock) { pipelines++; - return new Pipeline(); + return new BytesPipeline(); } std::atomic pipelines{0}; }; @@ -268,7 +268,7 @@ TEST(Bootstrap, ExistingSocket) { std::atomic connections{0}; class TestHandlerPipeline - : public ChannelHandlerAdapter { public: void read(Context* ctx, void* conn) { @@ -283,12 +283,12 @@ class TestHandlerPipeline template class TestHandlerPipelineFactory - : public PipelineFactory::AcceptPipeline> { + : public PipelineFactory::AcceptPipeline> { public: - ServerBootstrap::AcceptPipeline* newPipeline(std::shared_ptr) { - auto pipeline = new ServerBootstrap::AcceptPipeline; + ServerBootstrap::AcceptPipeline* newPipeline(std::shared_ptr) { + auto pipeline = new ServerBootstrap::AcceptPipeline; auto handler = std::make_shared(); - pipeline->addBack(ChannelHandlerPtr(handler)); + pipeline->addBack(HandlerPtr(handler)); return pipeline; } }; @@ -318,7 +318,7 @@ TEST(Bootstrap, LoadBalanceHandler) { } class TestUDPPipeline - : public ChannelHandlerAdapter { public: void read(Context* ctx, void* conn) { diff --git a/folly/wangle/bootstrap/ClientBootstrap.h b/folly/wangle/bootstrap/ClientBootstrap.h index 37179fda..2b3f3be2 100644 --- a/folly/wangle/bootstrap/ClientBootstrap.h +++ b/folly/wangle/bootstrap/ClientBootstrap.h @@ -15,12 +15,12 @@ */ #pragma once -#include +#include namespace folly { /* - * A thin wrapper around ChannelPipeline and AsyncSocket to match + * A thin wrapper around Pipeline and AsyncSocket to match * ServerBootstrap. On connect() a new pipeline is created. */ template diff --git a/folly/wangle/bootstrap/ServerBootstrap-inl.h b/folly/wangle/bootstrap/ServerBootstrap-inl.h index e1909265..1e0ebc75 100644 --- a/folly/wangle/bootstrap/ServerBootstrap-inl.h +++ b/folly/wangle/bootstrap/ServerBootstrap-inl.h @@ -20,15 +20,15 @@ #include #include #include -#include -#include +#include +#include namespace folly { template class ServerAcceptor : public Acceptor - , public folly::wangle::ChannelHandlerAdapter { + , public folly::wangle::HandlerAdapter { typedef std::unique_ptr PipelinePtr; @@ -60,7 +60,7 @@ class ServerAcceptor public: explicit ServerAcceptor( std::shared_ptr> pipelineFactory, - std::shared_ptr> acceptorPipeline, EventBase* base) : Acceptor(ServerSocketConfig()) @@ -70,7 +70,7 @@ class ServerAcceptor Acceptor::init(nullptr, base_); CHECK(acceptorPipeline_); - acceptorPipeline_->addBack(folly::wangle::ChannelHandlerPtr(this)); + acceptorPipeline_->addBack(folly::wangle::HandlerPtr(this)); acceptorPipeline_->finalize(); } @@ -109,7 +109,7 @@ class ServerAcceptor EventBase* base_; std::shared_ptr> childPipelineFactory_; - std::shared_ptr> acceptorPipeline_; }; @@ -118,13 +118,13 @@ class ServerAcceptorFactory : public AcceptorFactory { public: explicit ServerAcceptorFactory( std::shared_ptr> factory, - std::shared_ptr>> pipeline) : factory_(factory) , pipeline_(pipeline) {} std::shared_ptr newAcceptor(EventBase* base) { - std::shared_ptr> pipeline( pipeline_->newPipeline(nullptr)); return std::make_shared>(factory_, pipeline, base); @@ -132,7 +132,7 @@ class ServerAcceptorFactory : public AcceptorFactory { private: std::shared_ptr> factory_; std::shared_ptr>> pipeline_; }; @@ -183,8 +183,8 @@ void ServerWorkerPool::forEachWorker(F&& f) const { } class DefaultAcceptPipelineFactory - : public PipelineFactory> { - typedef wangle::ChannelPipeline< + : public PipelineFactory> { + typedef wangle::Pipeline< void*, std::exception> AcceptPipeline; diff --git a/folly/wangle/bootstrap/ServerBootstrap.cpp b/folly/wangle/bootstrap/ServerBootstrap.cpp index e59ed657..6b7a4101 100644 --- a/folly/wangle/bootstrap/ServerBootstrap.cpp +++ b/folly/wangle/bootstrap/ServerBootstrap.cpp @@ -15,7 +15,7 @@ */ #include #include -#include +#include #include namespace folly { diff --git a/folly/wangle/bootstrap/ServerBootstrap.h b/folly/wangle/bootstrap/ServerBootstrap.h index 3520b0f5..28785a1b 100644 --- a/folly/wangle/bootstrap/ServerBootstrap.h +++ b/folly/wangle/bootstrap/ServerBootstrap.h @@ -17,11 +17,11 @@ #include #include -#include +#include namespace folly { -typedef folly::wangle::ChannelPipeline< +typedef folly::wangle::Pipeline< folly::IOBufQueue&, std::unique_ptr> DefaultPipeline; /* @@ -30,7 +30,7 @@ typedef folly::wangle::ChannelPipeline< * accepting threads, any number of accepting sockets, a pool of * IO-worker threads, and connection pool for each IO thread for you. * - * The output is given as a ChannelPipeline template: given a + * The output is given as a Pipeline template: given a * PipelineFactory, it will create a new pipeline for each connection, * and your server can handle the incoming bytes. * @@ -52,7 +52,7 @@ class ServerBootstrap { join(); } - typedef wangle::ChannelPipeline< + typedef wangle::Pipeline< void*, std::exception> AcceptPipeline; /* diff --git a/folly/wangle/channel/AsyncSocketHandler.h b/folly/wangle/channel/AsyncSocketHandler.h index b5be966f..014812b8 100644 --- a/folly/wangle/channel/AsyncSocketHandler.h +++ b/folly/wangle/channel/AsyncSocketHandler.h @@ -16,7 +16,7 @@ #pragma once -#include +#include #include #include #include diff --git a/folly/wangle/channel/ChannelHandler.h b/folly/wangle/channel/ChannelHandler.h deleted file mode 100644 index e1aa6d5e..00000000 --- a/folly/wangle/channel/ChannelHandler.h +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Copyright 2015 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include -#include - -namespace folly { namespace wangle { - -template -class ChannelHandler { - public: - typedef Rin rin; - typedef Rout rout; - typedef Win win; - typedef Wout wout; - typedef ChannelHandlerContext Context; - virtual ~ChannelHandler() {} - - virtual void read(Context* ctx, Rin msg) = 0; - virtual void readEOF(Context* ctx) { - ctx->fireReadEOF(); - } - virtual void readException(Context* ctx, exception_wrapper e) { - ctx->fireReadException(std::move(e)); - } - - virtual Future write(Context* ctx, Win msg) = 0; - virtual Future close(Context* ctx) { - return ctx->fireClose(); - } - - virtual void attachPipeline(Context* ctx) {} - virtual void attachTransport(Context* ctx) {} - - virtual void detachPipeline(Context* ctx) {} - virtual void detachTransport(Context* ctx) {} - - /* - // Other sorts of things we might want, all shamelessly stolen from Netty - // inbound - virtual void exceptionCaught( - ChannelHandlerContext* ctx, - exception_wrapper e) {} - virtual void channelRegistered(ChannelHandlerContext* ctx) {} - virtual void channelUnregistered(ChannelHandlerContext* ctx) {} - virtual void channelActive(ChannelHandlerContext* ctx) {} - virtual void channelInactive(ChannelHandlerContext* ctx) {} - virtual void channelReadComplete(ChannelHandlerContext* ctx) {} - virtual void userEventTriggered(ChannelHandlerContext* ctx, void* evt) {} - virtual void channelWritabilityChanged(ChannelHandlerContext* ctx) {} - - // outbound - virtual Future bind( - ChannelHandlerContext* ctx, - SocketAddress localAddress) {} - virtual Future connect( - ChannelHandlerContext* ctx, - SocketAddress remoteAddress, SocketAddress localAddress) {} - virtual Future disconnect(ChannelHandlerContext* ctx) {} - virtual Future deregister(ChannelHandlerContext* ctx) {} - virtual Future read(ChannelHandlerContext* ctx) {} - virtual void flush(ChannelHandlerContext* ctx) {} - */ -}; - -template -class ChannelHandlerAdapter : public ChannelHandler { - public: - typedef typename ChannelHandler::Context Context; - - void read(Context* ctx, R msg) override { - ctx->fireRead(std::forward(msg)); - } - - Future write(Context* ctx, W msg) override { - return ctx->fireWrite(std::forward(msg)); - } -}; - -typedef ChannelHandlerAdapter> -BytesToBytesHandler; - -template -class ChannelHandlerPtr : public ChannelHandler< - typename Handler::rin, - typename Handler::rout, - typename Handler::win, - typename Handler::wout> { - public: - typedef typename std::conditional< - Shared, - std::shared_ptr, - Handler*>::type - HandlerPtr; - - typedef typename Handler::Context Context; - - explicit ChannelHandlerPtr(HandlerPtr handler) - : handler_(std::move(handler)) {} - - HandlerPtr getHandler() { - return handler_; - } - - void setHandler(HandlerPtr handler) { - if (handler == handler_) { - return; - } - if (handler_ && ctx_) { - handler_->detachPipeline(ctx_); - } - handler_ = std::move(handler); - if (handler_ && ctx_) { - handler_->attachPipeline(ctx_); - if (ctx_->getTransport()) { - handler_->attachTransport(ctx_); - } - } - } - - void attachPipeline(Context* ctx) override { - ctx_ = ctx; - if (handler_) { - handler_->attachPipeline(ctx_); - } - } - - void attachTransport(Context* ctx) override { - ctx_ = ctx; - if (handler_) { - handler_->attachTransport(ctx_); - } - } - - void detachPipeline(Context* ctx) override { - ctx_ = ctx; - if (handler_) { - handler_->detachPipeline(ctx_); - } - } - - void detachTransport(Context* ctx) override { - ctx_ = ctx; - if (handler_) { - handler_->detachTransport(ctx_); - } - } - - void read(Context* ctx, typename Handler::rin msg) override { - DCHECK(handler_); - handler_->read(ctx, std::forward(msg)); - } - - void readEOF(Context* ctx) override { - DCHECK(handler_); - handler_->readEOF(ctx); - } - - void readException(Context* ctx, exception_wrapper e) override { - DCHECK(handler_); - handler_->readException(ctx, std::move(e)); - } - - Future write(Context* ctx, typename Handler::win msg) override { - DCHECK(handler_); - return handler_->write(ctx, std::forward(msg)); - } - - Future close(Context* ctx) override { - DCHECK(handler_); - return handler_->close(ctx); - } - - private: - Context* ctx_; - HandlerPtr handler_; -}; - -}} diff --git a/folly/wangle/channel/ChannelHandlerContext.h b/folly/wangle/channel/ChannelHandlerContext.h deleted file mode 100644 index 0cb0fceb..00000000 --- a/folly/wangle/channel/ChannelHandlerContext.h +++ /dev/null @@ -1,252 +0,0 @@ -/* - * Copyright 2015 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include - -namespace folly { namespace wangle { - -template -class ChannelHandlerContext { - public: - virtual ~ChannelHandlerContext() {} - - virtual void fireRead(In msg) = 0; - virtual void fireReadEOF() = 0; - virtual void fireReadException(exception_wrapper e) = 0; - - virtual Future fireWrite(Out msg) = 0; - virtual Future fireClose() = 0; - - virtual std::shared_ptr getTransport() = 0; - - virtual void setWriteFlags(WriteFlags flags) = 0; - virtual WriteFlags getWriteFlags() = 0; - - virtual void setReadBufferSettings( - uint64_t minAvailable, - uint64_t allocationSize) = 0; - virtual std::pair getReadBufferSettings() = 0; - - /* TODO - template - virtual void addHandlerBefore(H&&) {} - template - virtual void addHandlerAfter(H&&) {} - template - virtual void replaceHandler(H&&) {} - virtual void removeHandler() {} - */ -}; - -class PipelineContext { - public: - virtual ~PipelineContext() {} - - virtual void attachTransport() = 0; - virtual void detachTransport() = 0; - - void link(PipelineContext* other) { - setNextIn(other); - other->setNextOut(this); - } - - protected: - virtual void setNextIn(PipelineContext* ctx) = 0; - virtual void setNextOut(PipelineContext* ctx) = 0; -}; - -template -class InboundChannelHandlerContext { - public: - virtual ~InboundChannelHandlerContext() {} - virtual void read(In msg) = 0; - virtual void readEOF() = 0; - virtual void readException(exception_wrapper e) = 0; -}; - -template -class OutboundChannelHandlerContext { - public: - virtual ~OutboundChannelHandlerContext() {} - virtual Future write(Out msg) = 0; - virtual Future close() = 0; -}; - -template -class ContextImpl : public ChannelHandlerContext, - public InboundChannelHandlerContext, - public OutboundChannelHandlerContext, - public PipelineContext { - public: - typedef typename H::rin Rin; - typedef typename H::rout Rout; - typedef typename H::win Win; - typedef typename H::wout Wout; - - template - explicit ContextImpl(P* pipeline, HandlerArg&& handlerArg) - : pipeline_(pipeline), - handler_(std::forward(handlerArg)) { - handler_.attachPipeline(this); - } - - ~ContextImpl() { - handler_.detachPipeline(this); - } - - H* getHandler() { - return &handler_; - } - - // PipelineContext overrides - void setNextIn(PipelineContext* ctx) override { - auto nextIn = dynamic_cast*>(ctx); - if (nextIn) { - nextIn_ = nextIn; - } else { - throw std::invalid_argument("wrong type in setNextIn"); - } - } - - void setNextOut(PipelineContext* ctx) override { - auto nextOut = dynamic_cast*>(ctx); - if (nextOut) { - nextOut_ = nextOut; - } else { - throw std::invalid_argument("wrong type in setNextOut"); - } - } - - void attachTransport() override { - typename P::DestructorGuard dg(static_cast(pipeline_)); - handler_.attachTransport(this); - } - - void detachTransport() override { - typename P::DestructorGuard dg(static_cast(pipeline_)); - handler_.detachTransport(this); - } - - // ChannelHandlerContext overrides - void fireRead(Rout msg) override { - typename P::DestructorGuard dg(static_cast(pipeline_)); - if (nextIn_) { - nextIn_->read(std::forward(msg)); - } else { - LOG(WARNING) << "read reached end of pipeline"; - } - } - - void fireReadEOF() override { - typename P::DestructorGuard dg(static_cast(pipeline_)); - if (nextIn_) { - nextIn_->readEOF(); - } else { - LOG(WARNING) << "readEOF reached end of pipeline"; - } - } - - void fireReadException(exception_wrapper e) override { - typename P::DestructorGuard dg(static_cast(pipeline_)); - if (nextIn_) { - nextIn_->readException(std::move(e)); - } else { - LOG(WARNING) << "readException reached end of pipeline"; - } - } - - Future fireWrite(Wout msg) override { - typename P::DestructorGuard dg(static_cast(pipeline_)); - if (nextOut_) { - return nextOut_->write(std::forward(msg)); - } else { - LOG(WARNING) << "write reached end of pipeline"; - return makeFuture(); - } - } - - Future fireClose() override { - typename P::DestructorGuard dg(static_cast(pipeline_)); - if (nextOut_) { - return nextOut_->close(); - } else { - LOG(WARNING) << "close reached end of pipeline"; - return makeFuture(); - } - } - - std::shared_ptr getTransport() override { - return pipeline_->getTransport(); - } - - void setWriteFlags(WriteFlags flags) override { - pipeline_->setWriteFlags(flags); - } - - WriteFlags getWriteFlags() override { - return pipeline_->getWriteFlags(); - } - - void setReadBufferSettings( - uint64_t minAvailable, - uint64_t allocationSize) override { - pipeline_->setReadBufferSettings(minAvailable, allocationSize); - } - - std::pair getReadBufferSettings() override { - return pipeline_->getReadBufferSettings(); - } - - // InboundChannelHandlerContext overrides - void read(Rin msg) override { - typename P::DestructorGuard dg(static_cast(pipeline_)); - handler_.read(this, std::forward(msg)); - } - - void readEOF() override { - typename P::DestructorGuard dg(static_cast(pipeline_)); - handler_.readEOF(this); - } - - void readException(exception_wrapper e) override { - typename P::DestructorGuard dg(static_cast(pipeline_)); - handler_.readException(this, std::move(e)); - } - - // OutboundChannelHandlerContext overrides - Future write(Win msg) override { - typename P::DestructorGuard dg(static_cast(pipeline_)); - return handler_.write(this, std::forward(msg)); - } - - Future close() override { - typename P::DestructorGuard dg(static_cast(pipeline_)); - return handler_.close(this); - } - - private: - P* pipeline_; - H handler_; - InboundChannelHandlerContext* nextIn_{nullptr}; - OutboundChannelHandlerContext* nextOut_{nullptr}; -}; - -}} diff --git a/folly/wangle/channel/ChannelPipeline.h b/folly/wangle/channel/ChannelPipeline.h deleted file mode 100644 index f7918370..00000000 --- a/folly/wangle/channel/ChannelPipeline.h +++ /dev/null @@ -1,340 +0,0 @@ -/* - * Copyright 2015 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include -#include -#include -#include -#include - -namespace folly { namespace wangle { - -/* - * R is the inbound type, i.e. inbound calls start with pipeline.read(R) - * W is the outbound type, i.e. outbound calls start with pipeline.write(W) - */ -template -class ChannelPipeline; - -template -class ChannelPipeline : public DelayedDestruction { - public: - ChannelPipeline() {} - ~ChannelPipeline() {} - - std::shared_ptr getTransport() { - return transport_; - } - - void setWriteFlags(WriteFlags flags) { - writeFlags_ = flags; - } - - WriteFlags getWriteFlags() { - return writeFlags_; - } - - void setReadBufferSettings(uint64_t minAvailable, uint64_t allocationSize) { - readBufferSettings_ = std::make_pair(minAvailable, allocationSize); - } - - std::pair getReadBufferSettings() { - return readBufferSettings_; - } - - void read(R msg) { - front_->read(std::forward(msg)); - } - - void readEOF() { - front_->readEOF(); - } - - void readException(exception_wrapper e) { - front_->readException(std::move(e)); - } - - Future write(W msg) { - return back_->write(std::forward(msg)); - } - - Future close() { - return back_->close(); - } - - template - ChannelPipeline& addBack(H&& handler) { - ctxs_.push_back(folly::make_unique>( - this, std::forward(handler))); - return *this; - } - - template - ChannelPipeline& addFront(H&& handler) { - ctxs_.insert( - ctxs_.begin(), - folly::make_unique>( - this, - std::forward(handler))); - return *this; - } - - template - H* getHandler(int i) { - auto ctx = dynamic_cast*>(ctxs_[i].get()); - CHECK(ctx); - return ctx->getHandler(); - } - - void finalize() { - finalizeHelper(); - InboundChannelHandlerContext* front; - front_ = dynamic_cast*>( - ctxs_.front().get()); - if (!front_) { - throw std::invalid_argument("wrong type for first handler"); - } - } - - protected: - explicit ChannelPipeline(bool shouldFinalize) { - CHECK(!shouldFinalize); - } - - void finalizeHelper() { - if (ctxs_.empty()) { - return; - } - - for (size_t i = 0; i < ctxs_.size() - 1; i++) { - ctxs_[i]->link(ctxs_[i+1].get()); - } - - back_ = dynamic_cast*>(ctxs_.back().get()); - if (!back_) { - throw std::invalid_argument("wrong type for last handler"); - } - } - - PipelineContext* getLocalFront() { - return ctxs_.empty() ? nullptr : ctxs_.front().get(); - } - - static const bool is_end{true}; - - std::shared_ptr transport_; - WriteFlags writeFlags_{WriteFlags::NONE}; - std::pair readBufferSettings_{2048, 2048}; - - void attachPipeline() {} - - void attachTransport( - std::shared_ptr transport) { - transport_ = std::move(transport); - } - - void detachTransport() { - transport_ = nullptr; - } - - OutboundChannelHandlerContext* back_{nullptr}; - - private: - InboundChannelHandlerContext* front_{nullptr}; - std::vector> ctxs_; -}; - -template -class ChannelPipeline - : public ChannelPipeline { - protected: - template - ChannelPipeline( - bool shouldFinalize, - HandlerArg&& handlerArg, - HandlersArgs&&... handlersArgs) - : ChannelPipeline( - false, - std::forward(handlersArgs)...), - ctx_(this, std::forward(handlerArg)) { - if (shouldFinalize) { - finalize(); - } - } - - public: - template - explicit ChannelPipeline(HandlersArgs&&... handlersArgs) - : ChannelPipeline(true, std::forward(handlersArgs)...) {} - - ~ChannelPipeline() {} - - void read(R msg) { - typename ChannelPipeline::DestructorGuard dg( - static_cast(this)); - front_->read(std::forward(msg)); - } - - void readEOF() { - typename ChannelPipeline::DestructorGuard dg( - static_cast(this)); - front_->readEOF(); - } - - void readException(exception_wrapper e) { - typename ChannelPipeline::DestructorGuard dg( - static_cast(this)); - front_->readException(std::move(e)); - } - - Future write(W msg) { - typename ChannelPipeline::DestructorGuard dg( - static_cast(this)); - return back_->write(std::forward(msg)); - } - - Future close() { - typename ChannelPipeline::DestructorGuard dg( - static_cast(this)); - return back_->close(); - } - - void attachTransport( - std::shared_ptr transport) { - typename ChannelPipeline::DestructorGuard dg( - static_cast(this)); - CHECK((!ChannelPipeline::transport_)); - ChannelPipeline::attachTransport(std::move(transport)); - forEachCtx([&](PipelineContext* ctx){ - ctx->attachTransport(); - }); - } - - void detachTransport() { - typename ChannelPipeline::DestructorGuard dg( - static_cast(this)); - ChannelPipeline::detachTransport(); - forEachCtx([&](PipelineContext* ctx){ - ctx->detachTransport(); - }); - } - - std::shared_ptr getTransport() { - return ChannelPipeline::transport_; - } - - template - ChannelPipeline& addBack(H&& handler) { - ChannelPipeline::addBack(std::move(handler)); - return *this; - } - - template - ChannelPipeline& addFront(H&& handler) { - ctxs_.insert( - ctxs_.begin(), - folly::make_unique>( - this, - std::move(handler))); - return *this; - } - - template - H* getHandler(size_t i) { - if (i > ctxs_.size()) { - return ChannelPipeline::template getHandler( - i - (ctxs_.size() + 1)); - } else { - auto pctx = (i == ctxs_.size()) ? &ctx_ : ctxs_[i].get(); - auto ctx = dynamic_cast*>(pctx); - return ctx->getHandler(); - } - } - - void finalize() { - finalizeHelper(); - auto ctx = ctxs_.empty() ? &ctx_ : ctxs_.front().get(); - front_ = dynamic_cast*>(ctx); - if (!front_) { - throw std::invalid_argument("wrong type for first handler"); - } - } - - protected: - void finalizeHelper() { - ChannelPipeline::finalizeHelper(); - back_ = ChannelPipeline::back_; - if (!back_) { - auto is_at_end = ChannelPipeline::is_end; - CHECK(is_at_end); - back_ = dynamic_cast*>(&ctx_); - if (!back_) { - throw std::invalid_argument("wrong type for last handler"); - } - } - - if (!ctxs_.empty()) { - for (size_t i = 0; i < ctxs_.size() - 1; i++) { - ctxs_[i]->link(ctxs_[i+1].get()); - } - ctxs_.back()->link(&ctx_); - } - - auto nextFront = ChannelPipeline::getLocalFront(); - if (nextFront) { - ctx_.link(nextFront); - } - } - - PipelineContext* getLocalFront() { - return ctxs_.empty() ? &ctx_ : ctxs_.front().get(); - } - - static const bool is_end{false}; - InboundChannelHandlerContext* front_{nullptr}; - OutboundChannelHandlerContext* back_{nullptr}; - - private: - template - void forEachCtx(const F& func) { - for (auto& ctx : ctxs_) { - func(ctx.get()); - } - func(&ctx_); - } - - ContextImpl ctx_; - std::vector> ctxs_; -}; - -}} - -namespace folly { - -class AsyncSocket; - -template -class PipelineFactory { - public: - virtual Pipeline* newPipeline(std::shared_ptr) = 0; - virtual ~PipelineFactory() {} -}; - -} diff --git a/folly/wangle/channel/Handler.h b/folly/wangle/channel/Handler.h new file mode 100644 index 00000000..67219f17 --- /dev/null +++ b/folly/wangle/channel/Handler.h @@ -0,0 +1,196 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +namespace folly { namespace wangle { + +template +class Handler { + public: + typedef Rin rin; + typedef Rout rout; + typedef Win win; + typedef Wout wout; + typedef HandlerContext Context; + virtual ~Handler() {} + + virtual void read(Context* ctx, Rin msg) = 0; + virtual void readEOF(Context* ctx) { + ctx->fireReadEOF(); + } + virtual void readException(Context* ctx, exception_wrapper e) { + ctx->fireReadException(std::move(e)); + } + + virtual Future write(Context* ctx, Win msg) = 0; + virtual Future close(Context* ctx) { + return ctx->fireClose(); + } + + virtual void attachPipeline(Context* ctx) {} + virtual void attachTransport(Context* ctx) {} + + virtual void detachPipeline(Context* ctx) {} + virtual void detachTransport(Context* ctx) {} + + /* + // Other sorts of things we might want, all shamelessly stolen from Netty + // inbound + virtual void exceptionCaught( + HandlerContext* ctx, + exception_wrapper e) {} + virtual void channelRegistered(HandlerContext* ctx) {} + virtual void channelUnregistered(HandlerContext* ctx) {} + virtual void channelActive(HandlerContext* ctx) {} + virtual void channelInactive(HandlerContext* ctx) {} + virtual void channelReadComplete(HandlerContext* ctx) {} + virtual void userEventTriggered(HandlerContext* ctx, void* evt) {} + virtual void channelWritabilityChanged(HandlerContext* ctx) {} + + // outbound + virtual Future bind( + HandlerContext* ctx, + SocketAddress localAddress) {} + virtual Future connect( + HandlerContext* ctx, + SocketAddress remoteAddress, SocketAddress localAddress) {} + virtual Future disconnect(HandlerContext* ctx) {} + virtual Future deregister(HandlerContext* ctx) {} + virtual Future read(HandlerContext* ctx) {} + virtual void flush(HandlerContext* ctx) {} + */ +}; + +template +class HandlerAdapter : public Handler { + public: + typedef typename Handler::Context Context; + + void read(Context* ctx, R msg) override { + ctx->fireRead(std::forward(msg)); + } + + Future write(Context* ctx, W msg) override { + return ctx->fireWrite(std::forward(msg)); + } +}; + +typedef HandlerAdapter> +BytesToBytesHandler; + +template +class HandlerPtr : public Handler< + typename HandlerT::rin, + typename HandlerT::rout, + typename HandlerT::win, + typename HandlerT::wout> { + public: + typedef typename std::conditional< + Shared, + std::shared_ptr, + HandlerT*>::type + Ptr; + + typedef typename HandlerT::Context Context; + + explicit HandlerPtr(Ptr handler) + : handler_(std::move(handler)) {} + + Ptr getHandler() { + return handler_; + } + + void setHandler(Ptr handler) { + if (handler == handler_) { + return; + } + if (handler_ && ctx_) { + handler_->detachPipeline(ctx_); + } + handler_ = std::move(handler); + if (handler_ && ctx_) { + handler_->attachPipeline(ctx_); + if (ctx_->getTransport()) { + handler_->attachTransport(ctx_); + } + } + } + + void attachPipeline(Context* ctx) override { + ctx_ = ctx; + if (handler_) { + handler_->attachPipeline(ctx_); + } + } + + void attachTransport(Context* ctx) override { + ctx_ = ctx; + if (handler_) { + handler_->attachTransport(ctx_); + } + } + + void detachPipeline(Context* ctx) override { + ctx_ = ctx; + if (handler_) { + handler_->detachPipeline(ctx_); + } + } + + void detachTransport(Context* ctx) override { + ctx_ = ctx; + if (handler_) { + handler_->detachTransport(ctx_); + } + } + + void read(Context* ctx, typename HandlerT::rin msg) override { + DCHECK(handler_); + handler_->read(ctx, std::forward(msg)); + } + + void readEOF(Context* ctx) override { + DCHECK(handler_); + handler_->readEOF(ctx); + } + + void readException(Context* ctx, exception_wrapper e) override { + DCHECK(handler_); + handler_->readException(ctx, std::move(e)); + } + + Future write(Context* ctx, typename HandlerT::win msg) override { + DCHECK(handler_); + return handler_->write(ctx, std::forward(msg)); + } + + Future close(Context* ctx) override { + DCHECK(handler_); + return handler_->close(ctx); + } + + private: + Context* ctx_; + Ptr handler_; +}; + +}} diff --git a/folly/wangle/channel/HandlerContext.h b/folly/wangle/channel/HandlerContext.h new file mode 100644 index 00000000..809f2b1a --- /dev/null +++ b/folly/wangle/channel/HandlerContext.h @@ -0,0 +1,252 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +namespace folly { namespace wangle { + +template +class HandlerContext { + public: + virtual ~HandlerContext() {} + + virtual void fireRead(In msg) = 0; + virtual void fireReadEOF() = 0; + virtual void fireReadException(exception_wrapper e) = 0; + + virtual Future fireWrite(Out msg) = 0; + virtual Future fireClose() = 0; + + virtual std::shared_ptr getTransport() = 0; + + virtual void setWriteFlags(WriteFlags flags) = 0; + virtual WriteFlags getWriteFlags() = 0; + + virtual void setReadBufferSettings( + uint64_t minAvailable, + uint64_t allocationSize) = 0; + virtual std::pair getReadBufferSettings() = 0; + + /* TODO + template + virtual void addHandlerBefore(H&&) {} + template + virtual void addHandlerAfter(H&&) {} + template + virtual void replaceHandler(H&&) {} + virtual void removeHandler() {} + */ +}; + +class PipelineContext { + public: + virtual ~PipelineContext() {} + + virtual void attachTransport() = 0; + virtual void detachTransport() = 0; + + void link(PipelineContext* other) { + setNextIn(other); + other->setNextOut(this); + } + + protected: + virtual void setNextIn(PipelineContext* ctx) = 0; + virtual void setNextOut(PipelineContext* ctx) = 0; +}; + +template +class InboundHandlerContext { + public: + virtual ~InboundHandlerContext() {} + virtual void read(In msg) = 0; + virtual void readEOF() = 0; + virtual void readException(exception_wrapper e) = 0; +}; + +template +class OutboundHandlerContext { + public: + virtual ~OutboundHandlerContext() {} + virtual Future write(Out msg) = 0; + virtual Future close() = 0; +}; + +template +class ContextImpl : public HandlerContext, + public InboundHandlerContext, + public OutboundHandlerContext, + public PipelineContext { + public: + typedef typename H::rin Rin; + typedef typename H::rout Rout; + typedef typename H::win Win; + typedef typename H::wout Wout; + + template + explicit ContextImpl(P* pipeline, HandlerArg&& handlerArg) + : pipeline_(pipeline), + handler_(std::forward(handlerArg)) { + handler_.attachPipeline(this); + } + + ~ContextImpl() { + handler_.detachPipeline(this); + } + + H* getHandler() { + return &handler_; + } + + // PipelineContext overrides + void setNextIn(PipelineContext* ctx) override { + auto nextIn = dynamic_cast*>(ctx); + if (nextIn) { + nextIn_ = nextIn; + } else { + throw std::invalid_argument("wrong type in setNextIn"); + } + } + + void setNextOut(PipelineContext* ctx) override { + auto nextOut = dynamic_cast*>(ctx); + if (nextOut) { + nextOut_ = nextOut; + } else { + throw std::invalid_argument("wrong type in setNextOut"); + } + } + + void attachTransport() override { + typename P::DestructorGuard dg(static_cast(pipeline_)); + handler_.attachTransport(this); + } + + void detachTransport() override { + typename P::DestructorGuard dg(static_cast(pipeline_)); + handler_.detachTransport(this); + } + + // HandlerContext overrides + void fireRead(Rout msg) override { + typename P::DestructorGuard dg(static_cast(pipeline_)); + if (nextIn_) { + nextIn_->read(std::forward(msg)); + } else { + LOG(WARNING) << "read reached end of pipeline"; + } + } + + void fireReadEOF() override { + typename P::DestructorGuard dg(static_cast(pipeline_)); + if (nextIn_) { + nextIn_->readEOF(); + } else { + LOG(WARNING) << "readEOF reached end of pipeline"; + } + } + + void fireReadException(exception_wrapper e) override { + typename P::DestructorGuard dg(static_cast(pipeline_)); + if (nextIn_) { + nextIn_->readException(std::move(e)); + } else { + LOG(WARNING) << "readException reached end of pipeline"; + } + } + + Future fireWrite(Wout msg) override { + typename P::DestructorGuard dg(static_cast(pipeline_)); + if (nextOut_) { + return nextOut_->write(std::forward(msg)); + } else { + LOG(WARNING) << "write reached end of pipeline"; + return makeFuture(); + } + } + + Future fireClose() override { + typename P::DestructorGuard dg(static_cast(pipeline_)); + if (nextOut_) { + return nextOut_->close(); + } else { + LOG(WARNING) << "close reached end of pipeline"; + return makeFuture(); + } + } + + std::shared_ptr getTransport() override { + return pipeline_->getTransport(); + } + + void setWriteFlags(WriteFlags flags) override { + pipeline_->setWriteFlags(flags); + } + + WriteFlags getWriteFlags() override { + return pipeline_->getWriteFlags(); + } + + void setReadBufferSettings( + uint64_t minAvailable, + uint64_t allocationSize) override { + pipeline_->setReadBufferSettings(minAvailable, allocationSize); + } + + std::pair getReadBufferSettings() override { + return pipeline_->getReadBufferSettings(); + } + + // InboundHandlerContext overrides + void read(Rin msg) override { + typename P::DestructorGuard dg(static_cast(pipeline_)); + handler_.read(this, std::forward(msg)); + } + + void readEOF() override { + typename P::DestructorGuard dg(static_cast(pipeline_)); + handler_.readEOF(this); + } + + void readException(exception_wrapper e) override { + typename P::DestructorGuard dg(static_cast(pipeline_)); + handler_.readException(this, std::move(e)); + } + + // OutboundHandlerContext overrides + Future write(Win msg) override { + typename P::DestructorGuard dg(static_cast(pipeline_)); + return handler_.write(this, std::forward(msg)); + } + + Future close() override { + typename P::DestructorGuard dg(static_cast(pipeline_)); + return handler_.close(this); + } + + private: + P* pipeline_; + H handler_; + InboundHandlerContext* nextIn_{nullptr}; + OutboundHandlerContext* nextOut_{nullptr}; +}; + +}} diff --git a/folly/wangle/channel/OutputBufferingHandler.h b/folly/wangle/channel/OutputBufferingHandler.h index e5ca99ae..73fc0666 100644 --- a/folly/wangle/channel/OutputBufferingHandler.h +++ b/folly/wangle/channel/OutputBufferingHandler.h @@ -16,7 +16,7 @@ #pragma once -#include +#include #include #include #include diff --git a/folly/wangle/channel/Pipeline.h b/folly/wangle/channel/Pipeline.h new file mode 100644 index 00000000..7d4fd2a6 --- /dev/null +++ b/folly/wangle/channel/Pipeline.h @@ -0,0 +1,340 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace folly { namespace wangle { + +/* + * R is the inbound type, i.e. inbound calls start with pipeline.read(R) + * W is the outbound type, i.e. outbound calls start with pipeline.write(W) + */ +template +class Pipeline; + +template +class Pipeline : public DelayedDestruction { + public: + Pipeline() {} + ~Pipeline() {} + + std::shared_ptr getTransport() { + return transport_; + } + + void setWriteFlags(WriteFlags flags) { + writeFlags_ = flags; + } + + WriteFlags getWriteFlags() { + return writeFlags_; + } + + void setReadBufferSettings(uint64_t minAvailable, uint64_t allocationSize) { + readBufferSettings_ = std::make_pair(minAvailable, allocationSize); + } + + std::pair getReadBufferSettings() { + return readBufferSettings_; + } + + void read(R msg) { + front_->read(std::forward(msg)); + } + + void readEOF() { + front_->readEOF(); + } + + void readException(exception_wrapper e) { + front_->readException(std::move(e)); + } + + Future write(W msg) { + return back_->write(std::forward(msg)); + } + + Future close() { + return back_->close(); + } + + template + Pipeline& addBack(H&& handler) { + ctxs_.push_back(folly::make_unique>( + this, std::forward(handler))); + return *this; + } + + template + Pipeline& addFront(H&& handler) { + ctxs_.insert( + ctxs_.begin(), + folly::make_unique>( + this, + std::forward(handler))); + return *this; + } + + template + H* getHandler(int i) { + auto ctx = dynamic_cast*>(ctxs_[i].get()); + CHECK(ctx); + return ctx->getHandler(); + } + + void finalize() { + finalizeHelper(); + InboundHandlerContext* front; + front_ = dynamic_cast*>( + ctxs_.front().get()); + if (!front_) { + throw std::invalid_argument("wrong type for first handler"); + } + } + + protected: + explicit Pipeline(bool shouldFinalize) { + CHECK(!shouldFinalize); + } + + void finalizeHelper() { + if (ctxs_.empty()) { + return; + } + + for (size_t i = 0; i < ctxs_.size() - 1; i++) { + ctxs_[i]->link(ctxs_[i+1].get()); + } + + back_ = dynamic_cast*>(ctxs_.back().get()); + if (!back_) { + throw std::invalid_argument("wrong type for last handler"); + } + } + + PipelineContext* getLocalFront() { + return ctxs_.empty() ? nullptr : ctxs_.front().get(); + } + + static const bool is_end{true}; + + std::shared_ptr transport_; + WriteFlags writeFlags_{WriteFlags::NONE}; + std::pair readBufferSettings_{2048, 2048}; + + void attachPipeline() {} + + void attachTransport( + std::shared_ptr transport) { + transport_ = std::move(transport); + } + + void detachTransport() { + transport_ = nullptr; + } + + OutboundHandlerContext* back_{nullptr}; + + private: + InboundHandlerContext* front_{nullptr}; + std::vector> ctxs_; +}; + +template +class Pipeline + : public Pipeline { + protected: + template + Pipeline( + bool shouldFinalize, + HandlerArg&& handlerArg, + HandlersArgs&&... handlersArgs) + : Pipeline( + false, + std::forward(handlersArgs)...), + ctx_(this, std::forward(handlerArg)) { + if (shouldFinalize) { + finalize(); + } + } + + public: + template + explicit Pipeline(HandlersArgs&&... handlersArgs) + : Pipeline(true, std::forward(handlersArgs)...) {} + + ~Pipeline() {} + + void read(R msg) { + typename Pipeline::DestructorGuard dg( + static_cast(this)); + front_->read(std::forward(msg)); + } + + void readEOF() { + typename Pipeline::DestructorGuard dg( + static_cast(this)); + front_->readEOF(); + } + + void readException(exception_wrapper e) { + typename Pipeline::DestructorGuard dg( + static_cast(this)); + front_->readException(std::move(e)); + } + + Future write(W msg) { + typename Pipeline::DestructorGuard dg( + static_cast(this)); + return back_->write(std::forward(msg)); + } + + Future close() { + typename Pipeline::DestructorGuard dg( + static_cast(this)); + return back_->close(); + } + + void attachTransport( + std::shared_ptr transport) { + typename Pipeline::DestructorGuard dg( + static_cast(this)); + CHECK((!Pipeline::transport_)); + Pipeline::attachTransport(std::move(transport)); + forEachCtx([&](PipelineContext* ctx){ + ctx->attachTransport(); + }); + } + + void detachTransport() { + typename Pipeline::DestructorGuard dg( + static_cast(this)); + Pipeline::detachTransport(); + forEachCtx([&](PipelineContext* ctx){ + ctx->detachTransport(); + }); + } + + std::shared_ptr getTransport() { + return Pipeline::transport_; + } + + template + Pipeline& addBack(H&& handler) { + Pipeline::addBack(std::move(handler)); + return *this; + } + + template + Pipeline& addFront(H&& handler) { + ctxs_.insert( + ctxs_.begin(), + folly::make_unique>( + this, + std::move(handler))); + return *this; + } + + template + H* getHandler(size_t i) { + if (i > ctxs_.size()) { + return Pipeline::template getHandler( + i - (ctxs_.size() + 1)); + } else { + auto pctx = (i == ctxs_.size()) ? &ctx_ : ctxs_[i].get(); + auto ctx = dynamic_cast*>(pctx); + return ctx->getHandler(); + } + } + + void finalize() { + finalizeHelper(); + auto ctx = ctxs_.empty() ? &ctx_ : ctxs_.front().get(); + front_ = dynamic_cast*>(ctx); + if (!front_) { + throw std::invalid_argument("wrong type for first handler"); + } + } + + protected: + void finalizeHelper() { + Pipeline::finalizeHelper(); + back_ = Pipeline::back_; + if (!back_) { + auto is_at_end = Pipeline::is_end; + CHECK(is_at_end); + back_ = dynamic_cast*>(&ctx_); + if (!back_) { + throw std::invalid_argument("wrong type for last handler"); + } + } + + if (!ctxs_.empty()) { + for (size_t i = 0; i < ctxs_.size() - 1; i++) { + ctxs_[i]->link(ctxs_[i+1].get()); + } + ctxs_.back()->link(&ctx_); + } + + auto nextFront = Pipeline::getLocalFront(); + if (nextFront) { + ctx_.link(nextFront); + } + } + + PipelineContext* getLocalFront() { + return ctxs_.empty() ? &ctx_ : ctxs_.front().get(); + } + + static const bool is_end{false}; + InboundHandlerContext* front_{nullptr}; + OutboundHandlerContext* back_{nullptr}; + + private: + template + void forEachCtx(const F& func) { + for (auto& ctx : ctxs_) { + func(ctx.get()); + } + func(&ctx_); + } + + ContextImpl ctx_; + std::vector> ctxs_; +}; + +}} + +namespace folly { + +class AsyncSocket; + +template +class PipelineFactory { + public: + virtual Pipeline* newPipeline(std::shared_ptr) = 0; + virtual ~PipelineFactory() {} +}; + +} diff --git a/folly/wangle/channel/test/ChannelPipelineTest.cpp b/folly/wangle/channel/test/ChannelPipelineTest.cpp deleted file mode 100644 index 0be3db28..00000000 --- a/folly/wangle/channel/test/ChannelPipelineTest.cpp +++ /dev/null @@ -1,251 +0,0 @@ -/* - * Copyright 2015 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -#include -#include -#include -#include -#include - -using namespace folly; -using namespace folly::wangle; -using namespace testing; - -typedef StrictMock> IntHandler; -typedef ChannelHandlerPtr IntHandlerPtr; - -ACTION(FireRead) { - arg0->fireRead(arg1); -} - -ACTION(FireReadEOF) { - arg0->fireReadEOF(); -} - -ACTION(FireReadException) { - arg0->fireReadException(arg1); -} - -ACTION(FireWrite) { - arg0->fireWrite(arg1); -} - -ACTION(FireClose) { - arg0->fireClose(); -} - -// Test move only types, among other things -TEST(ChannelTest, RealHandlersCompile) { - EventBase eb; - auto socket = AsyncSocket::newSocket(&eb); - // static - { - ChannelPipeline, - AsyncSocketHandler, - OutputBufferingHandler> - pipeline{AsyncSocketHandler(socket), OutputBufferingHandler()}; - EXPECT_TRUE(pipeline.getHandler(0)); - EXPECT_TRUE(pipeline.getHandler(1)); - } - // dynamic - { - ChannelPipeline> pipeline; - pipeline - .addBack(AsyncSocketHandler(socket)) - .addBack(OutputBufferingHandler()) - .finalize(); - EXPECT_TRUE(pipeline.getHandler(0)); - EXPECT_TRUE(pipeline.getHandler(1)); - } -} - -// Test that handlers correctly fire the next handler when directed -TEST(ChannelTest, FireActions) { - IntHandler handler1; - IntHandler handler2; - - EXPECT_CALL(handler1, attachPipeline(_)); - EXPECT_CALL(handler2, attachPipeline(_)); - - ChannelPipeline - pipeline(&handler1, &handler2); - - EXPECT_CALL(handler1, read_(_, _)).WillOnce(FireRead()); - EXPECT_CALL(handler2, read_(_, _)).Times(1); - pipeline.read(1); - - EXPECT_CALL(handler1, readEOF(_)).WillOnce(FireReadEOF()); - EXPECT_CALL(handler2, readEOF(_)).Times(1); - pipeline.readEOF(); - - EXPECT_CALL(handler1, readException(_, _)).WillOnce(FireReadException()); - EXPECT_CALL(handler2, readException(_, _)).Times(1); - pipeline.readException(make_exception_wrapper("blah")); - - EXPECT_CALL(handler2, write_(_, _)).WillOnce(FireWrite()); - EXPECT_CALL(handler1, write_(_, _)).Times(1); - EXPECT_NO_THROW(pipeline.write(1).value()); - - EXPECT_CALL(handler2, close_(_)).WillOnce(FireClose()); - EXPECT_CALL(handler1, close_(_)).Times(1); - EXPECT_NO_THROW(pipeline.close().value()); - - EXPECT_CALL(handler1, detachPipeline(_)); - EXPECT_CALL(handler2, detachPipeline(_)); -} - -// Test that nothing bad happens when actions reach the end of the pipeline -// (a warning will be logged, however) -TEST(ChannelTest, ReachEndOfPipeline) { - IntHandler handler; - EXPECT_CALL(handler, attachPipeline(_)); - ChannelPipeline - pipeline(&handler); - - EXPECT_CALL(handler, read_(_, _)).WillOnce(FireRead()); - pipeline.read(1); - - EXPECT_CALL(handler, readEOF(_)).WillOnce(FireReadEOF()); - pipeline.readEOF(); - - EXPECT_CALL(handler, readException(_, _)).WillOnce(FireReadException()); - pipeline.readException(make_exception_wrapper("blah")); - - EXPECT_CALL(handler, write_(_, _)).WillOnce(FireWrite()); - EXPECT_NO_THROW(pipeline.write(1).value()); - - EXPECT_CALL(handler, close_(_)).WillOnce(FireClose()); - EXPECT_NO_THROW(pipeline.close().value()); - - EXPECT_CALL(handler, detachPipeline(_)); -} - -// Test having the last read handler turn around and write -TEST(ChannelTest, TurnAround) { - IntHandler handler1; - IntHandler handler2; - - EXPECT_CALL(handler1, attachPipeline(_)); - EXPECT_CALL(handler2, attachPipeline(_)); - - ChannelPipeline - pipeline(&handler1, &handler2); - - EXPECT_CALL(handler1, read_(_, _)).WillOnce(FireRead()); - EXPECT_CALL(handler2, read_(_, _)).WillOnce(FireWrite()); - EXPECT_CALL(handler1, write_(_, _)).Times(1); - pipeline.read(1); - - EXPECT_CALL(handler1, detachPipeline(_)); - EXPECT_CALL(handler2, detachPipeline(_)); -} - -TEST(ChannelTest, DynamicFireActions) { - IntHandler handler1, handler2, handler3; - EXPECT_CALL(handler2, attachPipeline(_)); - ChannelPipeline - pipeline(&handler2); - - EXPECT_CALL(handler1, attachPipeline(_)); - EXPECT_CALL(handler3, attachPipeline(_)); - - pipeline - .addFront(IntHandlerPtr(&handler1)) - .addBack(IntHandlerPtr(&handler3)) - .finalize(); - - EXPECT_TRUE(pipeline.getHandler(0)); - EXPECT_TRUE(pipeline.getHandler(1)); - EXPECT_TRUE(pipeline.getHandler(2)); - - EXPECT_CALL(handler1, read_(_, _)).WillOnce(FireRead()); - EXPECT_CALL(handler2, read_(_, _)).WillOnce(FireRead()); - EXPECT_CALL(handler3, read_(_, _)).Times(1); - pipeline.read(1); - - EXPECT_CALL(handler3, write_(_, _)).WillOnce(FireWrite()); - EXPECT_CALL(handler2, write_(_, _)).WillOnce(FireWrite()); - EXPECT_CALL(handler1, write_(_, _)).Times(1); - EXPECT_NO_THROW(pipeline.write(1).value()); - - EXPECT_CALL(handler1, detachPipeline(_)); - EXPECT_CALL(handler2, detachPipeline(_)); - EXPECT_CALL(handler3, detachPipeline(_)); -} - -template -class ConcreteChannelHandler : public ChannelHandler { - typedef typename ChannelHandler::Context Context; - public: - void read(Context* ctx, Rin msg) {} - Future write(Context* ctx, Win msg) { return makeFuture(); } -}; - -typedef ChannelHandlerAdapter StringHandler; -typedef ConcreteChannelHandler IntToStringHandler; -typedef ConcreteChannelHandler StringToIntHandler; - -TEST(ChannelPipeline, DynamicConstruction) { - { - ChannelPipeline pipeline; - EXPECT_THROW( - pipeline - .addBack(ChannelHandlerAdapter{}) - .finalize(), std::invalid_argument); - } - { - ChannelPipeline pipeline; - EXPECT_THROW( - pipeline - .addFront(ChannelHandlerAdapter{}) - .finalize(), - std::invalid_argument); - } - { - ChannelPipeline - pipeline{StringHandler(), StringHandler()}; - - // Exercise both addFront and addBack. Final pipeline is - // StI <-> ItS <-> StS <-> StS <-> StI <-> ItS - EXPECT_NO_THROW( - pipeline - .addFront(IntToStringHandler{}) - .addFront(StringToIntHandler{}) - .addBack(StringToIntHandler{}) - .addBack(IntToStringHandler{}) - .finalize()); - } -} - -TEST(ChannelPipeline, AttachTransport) { - IntHandler handler; - EXPECT_CALL(handler, attachPipeline(_)); - ChannelPipeline - pipeline(&handler); - - EventBase eb; - auto socket = AsyncSocket::newSocket(&eb); - - EXPECT_CALL(handler, attachTransport(_)); - pipeline.attachTransport(socket); - - EXPECT_CALL(handler, detachTransport(_)); - pipeline.detachTransport(); - - EXPECT_CALL(handler, detachPipeline(_)); -} diff --git a/folly/wangle/channel/test/MockChannelHandler.h b/folly/wangle/channel/test/MockChannelHandler.h deleted file mode 100644 index 15b88cb7..00000000 --- a/folly/wangle/channel/test/MockChannelHandler.h +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright 2015 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include - -namespace folly { namespace wangle { - -template -class MockChannelHandler : public ChannelHandler { - public: - typedef typename ChannelHandler::Context Context; - - MockChannelHandler() = default; - MockChannelHandler(MockChannelHandler&&) = default; - -#ifdef __clang__ -# pragma clang diagnostic push -# if __clang_major__ > 3 || __clang_minor__ >= 6 -# pragma clang diagnostic ignored "-Winconsistent-missing-override" -# endif -#endif - - MOCK_METHOD2_T(read_, void(Context*, Rin&)); - MOCK_METHOD1_T(readEOF, void(Context*)); - MOCK_METHOD2_T(readException, void(Context*, exception_wrapper)); - - MOCK_METHOD2_T(write_, void(Context*, Win&)); - MOCK_METHOD1_T(close_, void(Context*)); - - MOCK_METHOD1_T(attachPipeline, void(Context*)); - MOCK_METHOD1_T(attachTransport, void(Context*)); - MOCK_METHOD1_T(detachPipeline, void(Context*)); - MOCK_METHOD1_T(detachTransport, void(Context*)); - -#ifdef __clang__ -#pragma clang diagnostic pop -#endif - - void read(Context* ctx, Rin msg) override { - read_(ctx, msg); - } - - Future write(Context* ctx, Win msg) override { - return makeFutureWith([&](){ - write_(ctx, msg); - }); - } - - Future close(Context* ctx) override { - return makeFutureWith([&](){ - close_(ctx); - }); - } -}; - -template -using MockChannelHandlerAdapter = MockChannelHandler; - -}} diff --git a/folly/wangle/channel/test/MockHandler.h b/folly/wangle/channel/test/MockHandler.h new file mode 100644 index 00000000..5a476646 --- /dev/null +++ b/folly/wangle/channel/test/MockHandler.h @@ -0,0 +1,75 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +namespace folly { namespace wangle { + +template +class MockHandler : public Handler { + public: + typedef typename Handler::Context Context; + + MockHandler() = default; + MockHandler(MockHandler&&) = default; + +#ifdef __clang__ +# pragma clang diagnostic push +# if __clang_major__ > 3 || __clang_minor__ >= 6 +# pragma clang diagnostic ignored "-Winconsistent-missing-override" +# endif +#endif + + MOCK_METHOD2_T(read_, void(Context*, Rin&)); + MOCK_METHOD1_T(readEOF, void(Context*)); + MOCK_METHOD2_T(readException, void(Context*, exception_wrapper)); + + MOCK_METHOD2_T(write_, void(Context*, Win&)); + MOCK_METHOD1_T(close_, void(Context*)); + + MOCK_METHOD1_T(attachPipeline, void(Context*)); + MOCK_METHOD1_T(attachTransport, void(Context*)); + MOCK_METHOD1_T(detachPipeline, void(Context*)); + MOCK_METHOD1_T(detachTransport, void(Context*)); + +#ifdef __clang__ +#pragma clang diagnostic pop +#endif + + void read(Context* ctx, Rin msg) override { + read_(ctx, msg); + } + + Future write(Context* ctx, Win msg) override { + return makeFutureWith([&](){ + write_(ctx, msg); + }); + } + + Future close(Context* ctx) override { + return makeFutureWith([&](){ + close_(ctx); + }); + } +}; + +template +using MockHandlerAdapter = MockHandler; + +}} diff --git a/folly/wangle/channel/test/OutputBufferingHandlerTest.cpp b/folly/wangle/channel/test/OutputBufferingHandlerTest.cpp index e99d43e5..a08509b6 100644 --- a/folly/wangle/channel/test/OutputBufferingHandlerTest.cpp +++ b/folly/wangle/channel/test/OutputBufferingHandlerTest.cpp @@ -14,9 +14,9 @@ * limitations under the License. */ -#include +#include #include -#include +#include #include #include #include @@ -25,18 +25,18 @@ using namespace folly; using namespace folly::wangle; using namespace testing; -typedef StrictMock>> -MockHandler; +MockBytesHandler; MATCHER_P(IOBufContains, str, "") { return arg->moveToFbString() == str; } TEST(OutputBufferingHandlerTest, Basic) { - MockHandler mockHandler; + MockBytesHandler mockHandler; EXPECT_CALL(mockHandler, attachPipeline(_)); - ChannelPipeline, - ChannelHandlerPtr, + Pipeline, + HandlerPtr, OutputBufferingHandler> pipeline(&mockHandler, OutputBufferingHandler{}); diff --git a/folly/wangle/channel/test/PipelineTest.cpp b/folly/wangle/channel/test/PipelineTest.cpp new file mode 100644 index 00000000..5fa97a64 --- /dev/null +++ b/folly/wangle/channel/test/PipelineTest.cpp @@ -0,0 +1,251 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include + +using namespace folly; +using namespace folly::wangle; +using namespace testing; + +typedef StrictMock> IntHandler; +typedef HandlerPtr IntHandlerPtr; + +ACTION(FireRead) { + arg0->fireRead(arg1); +} + +ACTION(FireReadEOF) { + arg0->fireReadEOF(); +} + +ACTION(FireReadException) { + arg0->fireReadException(arg1); +} + +ACTION(FireWrite) { + arg0->fireWrite(arg1); +} + +ACTION(FireClose) { + arg0->fireClose(); +} + +// Test move only types, among other things +TEST(PipelineTest, RealHandlersCompile) { + EventBase eb; + auto socket = AsyncSocket::newSocket(&eb); + // static + { + Pipeline, + AsyncSocketHandler, + OutputBufferingHandler> + pipeline{AsyncSocketHandler(socket), OutputBufferingHandler()}; + EXPECT_TRUE(pipeline.getHandler(0)); + EXPECT_TRUE(pipeline.getHandler(1)); + } + // dynamic + { + Pipeline> pipeline; + pipeline + .addBack(AsyncSocketHandler(socket)) + .addBack(OutputBufferingHandler()) + .finalize(); + EXPECT_TRUE(pipeline.getHandler(0)); + EXPECT_TRUE(pipeline.getHandler(1)); + } +} + +// Test that handlers correctly fire the next handler when directed +TEST(PipelineTest, FireActions) { + IntHandler handler1; + IntHandler handler2; + + EXPECT_CALL(handler1, attachPipeline(_)); + EXPECT_CALL(handler2, attachPipeline(_)); + + Pipeline + pipeline(&handler1, &handler2); + + EXPECT_CALL(handler1, read_(_, _)).WillOnce(FireRead()); + EXPECT_CALL(handler2, read_(_, _)).Times(1); + pipeline.read(1); + + EXPECT_CALL(handler1, readEOF(_)).WillOnce(FireReadEOF()); + EXPECT_CALL(handler2, readEOF(_)).Times(1); + pipeline.readEOF(); + + EXPECT_CALL(handler1, readException(_, _)).WillOnce(FireReadException()); + EXPECT_CALL(handler2, readException(_, _)).Times(1); + pipeline.readException(make_exception_wrapper("blah")); + + EXPECT_CALL(handler2, write_(_, _)).WillOnce(FireWrite()); + EXPECT_CALL(handler1, write_(_, _)).Times(1); + EXPECT_NO_THROW(pipeline.write(1).value()); + + EXPECT_CALL(handler2, close_(_)).WillOnce(FireClose()); + EXPECT_CALL(handler1, close_(_)).Times(1); + EXPECT_NO_THROW(pipeline.close().value()); + + EXPECT_CALL(handler1, detachPipeline(_)); + EXPECT_CALL(handler2, detachPipeline(_)); +} + +// Test that nothing bad happens when actions reach the end of the pipeline +// (a warning will be logged, however) +TEST(PipelineTest, ReachEndOfPipeline) { + IntHandler handler; + EXPECT_CALL(handler, attachPipeline(_)); + Pipeline + pipeline(&handler); + + EXPECT_CALL(handler, read_(_, _)).WillOnce(FireRead()); + pipeline.read(1); + + EXPECT_CALL(handler, readEOF(_)).WillOnce(FireReadEOF()); + pipeline.readEOF(); + + EXPECT_CALL(handler, readException(_, _)).WillOnce(FireReadException()); + pipeline.readException(make_exception_wrapper("blah")); + + EXPECT_CALL(handler, write_(_, _)).WillOnce(FireWrite()); + EXPECT_NO_THROW(pipeline.write(1).value()); + + EXPECT_CALL(handler, close_(_)).WillOnce(FireClose()); + EXPECT_NO_THROW(pipeline.close().value()); + + EXPECT_CALL(handler, detachPipeline(_)); +} + +// Test having the last read handler turn around and write +TEST(PipelineTest, TurnAround) { + IntHandler handler1; + IntHandler handler2; + + EXPECT_CALL(handler1, attachPipeline(_)); + EXPECT_CALL(handler2, attachPipeline(_)); + + Pipeline + pipeline(&handler1, &handler2); + + EXPECT_CALL(handler1, read_(_, _)).WillOnce(FireRead()); + EXPECT_CALL(handler2, read_(_, _)).WillOnce(FireWrite()); + EXPECT_CALL(handler1, write_(_, _)).Times(1); + pipeline.read(1); + + EXPECT_CALL(handler1, detachPipeline(_)); + EXPECT_CALL(handler2, detachPipeline(_)); +} + +TEST(PipelineTest, DynamicFireActions) { + IntHandler handler1, handler2, handler3; + EXPECT_CALL(handler2, attachPipeline(_)); + Pipeline + pipeline(&handler2); + + EXPECT_CALL(handler1, attachPipeline(_)); + EXPECT_CALL(handler3, attachPipeline(_)); + + pipeline + .addFront(IntHandlerPtr(&handler1)) + .addBack(IntHandlerPtr(&handler3)) + .finalize(); + + EXPECT_TRUE(pipeline.getHandler(0)); + EXPECT_TRUE(pipeline.getHandler(1)); + EXPECT_TRUE(pipeline.getHandler(2)); + + EXPECT_CALL(handler1, read_(_, _)).WillOnce(FireRead()); + EXPECT_CALL(handler2, read_(_, _)).WillOnce(FireRead()); + EXPECT_CALL(handler3, read_(_, _)).Times(1); + pipeline.read(1); + + EXPECT_CALL(handler3, write_(_, _)).WillOnce(FireWrite()); + EXPECT_CALL(handler2, write_(_, _)).WillOnce(FireWrite()); + EXPECT_CALL(handler1, write_(_, _)).Times(1); + EXPECT_NO_THROW(pipeline.write(1).value()); + + EXPECT_CALL(handler1, detachPipeline(_)); + EXPECT_CALL(handler2, detachPipeline(_)); + EXPECT_CALL(handler3, detachPipeline(_)); +} + +template +class ConcreteHandler : public Handler { + typedef typename Handler::Context Context; + public: + void read(Context* ctx, Rin msg) {} + Future write(Context* ctx, Win msg) { return makeFuture(); } +}; + +typedef HandlerAdapter StringHandler; +typedef ConcreteHandler IntToStringHandler; +typedef ConcreteHandler StringToIntHandler; + +TEST(Pipeline, DynamicConstruction) { + { + Pipeline pipeline; + EXPECT_THROW( + pipeline + .addBack(HandlerAdapter{}) + .finalize(), std::invalid_argument); + } + { + Pipeline pipeline; + EXPECT_THROW( + pipeline + .addFront(HandlerAdapter{}) + .finalize(), + std::invalid_argument); + } + { + Pipeline + pipeline{StringHandler(), StringHandler()}; + + // Exercise both addFront and addBack. Final pipeline is + // StI <-> ItS <-> StS <-> StS <-> StI <-> ItS + EXPECT_NO_THROW( + pipeline + .addFront(IntToStringHandler{}) + .addFront(StringToIntHandler{}) + .addBack(StringToIntHandler{}) + .addBack(IntToStringHandler{}) + .finalize()); + } +} + +TEST(Pipeline, AttachTransport) { + IntHandler handler; + EXPECT_CALL(handler, attachPipeline(_)); + Pipeline + pipeline(&handler); + + EventBase eb; + auto socket = AsyncSocket::newSocket(&eb); + + EXPECT_CALL(handler, attachTransport(_)); + pipeline.attachTransport(socket); + + EXPECT_CALL(handler, detachTransport(_)); + pipeline.detachTransport(); + + EXPECT_CALL(handler, detachPipeline(_)); +} diff --git a/folly/wangle/codec/ByteToMessageCodec.h b/folly/wangle/codec/ByteToMessageCodec.h index 96680002..20d6e7fe 100644 --- a/folly/wangle/codec/ByteToMessageCodec.h +++ b/folly/wangle/codec/ByteToMessageCodec.h @@ -15,12 +15,12 @@ */ #pragma once -#include +#include namespace folly { namespace wangle { /** - * A ChannelHandler which decodes bytes in a stream-like fashion from + * A Handler which decodes bytes in a stream-like fashion from * IOBufQueue to a Message type. * * Frame detection diff --git a/folly/wangle/codec/CodecTest.cpp b/folly/wangle/codec/CodecTest.cpp index 7657c715..80bb83d3 100644 --- a/folly/wangle/codec/CodecTest.cpp +++ b/folly/wangle/codec/CodecTest.cpp @@ -55,7 +55,7 @@ class BytesReflector }; TEST(FixedLengthFrameDecoder, FailWhenLengthFieldEndOffset) { - ChannelPipeline> pipeline; + Pipeline> pipeline; int called = 0; pipeline @@ -90,7 +90,7 @@ TEST(FixedLengthFrameDecoder, FailWhenLengthFieldEndOffset) { } TEST(LengthFieldFramePipeline, SimpleTest) { - ChannelPipeline> pipeline; + Pipeline> pipeline; int called = 0; pipeline @@ -111,7 +111,7 @@ TEST(LengthFieldFramePipeline, SimpleTest) { } TEST(LengthFieldFramePipeline, LittleEndian) { - ChannelPipeline> pipeline; + Pipeline> pipeline; int called = 0; pipeline @@ -132,7 +132,7 @@ TEST(LengthFieldFramePipeline, LittleEndian) { } TEST(LengthFieldFrameDecoder, Simple) { - ChannelPipeline> pipeline; + Pipeline> pipeline; int called = 0; pipeline @@ -163,7 +163,7 @@ TEST(LengthFieldFrameDecoder, Simple) { } TEST(LengthFieldFrameDecoder, NoStrip) { - ChannelPipeline> pipeline; + Pipeline> pipeline; int called = 0; pipeline @@ -194,7 +194,7 @@ TEST(LengthFieldFrameDecoder, NoStrip) { } TEST(LengthFieldFrameDecoder, Adjustment) { - ChannelPipeline> pipeline; + Pipeline> pipeline; int called = 0; pipeline @@ -225,7 +225,7 @@ TEST(LengthFieldFrameDecoder, Adjustment) { } TEST(LengthFieldFrameDecoder, PreHeader) { - ChannelPipeline> pipeline; + Pipeline> pipeline; int called = 0; pipeline @@ -257,7 +257,7 @@ TEST(LengthFieldFrameDecoder, PreHeader) { } TEST(LengthFieldFrameDecoder, PostHeader) { - ChannelPipeline> pipeline; + Pipeline> pipeline; int called = 0; pipeline @@ -289,7 +289,7 @@ TEST(LengthFieldFrameDecoder, PostHeader) { } TEST(LengthFieldFrameDecoderStrip, PrePostHeader) { - ChannelPipeline> pipeline; + Pipeline> pipeline; int called = 0; pipeline @@ -322,7 +322,7 @@ TEST(LengthFieldFrameDecoderStrip, PrePostHeader) { } TEST(LengthFieldFrameDecoder, StripPrePostHeaderFrameInclHeader) { - ChannelPipeline> pipeline; + Pipeline> pipeline; int called = 0; pipeline @@ -355,7 +355,7 @@ TEST(LengthFieldFrameDecoder, StripPrePostHeaderFrameInclHeader) { } TEST(LengthFieldFrameDecoder, FailTestLengthFieldEndOffset) { - ChannelPipeline> pipeline; + Pipeline> pipeline; int called = 0; pipeline @@ -380,7 +380,7 @@ TEST(LengthFieldFrameDecoder, FailTestLengthFieldEndOffset) { } TEST(LengthFieldFrameDecoder, FailTestLengthFieldFrameSize) { - ChannelPipeline> pipeline; + Pipeline> pipeline; int called = 0; pipeline @@ -407,7 +407,7 @@ TEST(LengthFieldFrameDecoder, FailTestLengthFieldFrameSize) { } TEST(LengthFieldFrameDecoder, FailTestLengthFieldInitialBytes) { - ChannelPipeline> pipeline; + Pipeline> pipeline; int called = 0; pipeline @@ -434,7 +434,7 @@ TEST(LengthFieldFrameDecoder, FailTestLengthFieldInitialBytes) { } TEST(LineBasedFrameDecoder, Simple) { - ChannelPipeline> pipeline; + Pipeline> pipeline; int called = 0; pipeline @@ -485,7 +485,7 @@ TEST(LineBasedFrameDecoder, Simple) { } TEST(LineBasedFrameDecoder, SaveDelimiter) { - ChannelPipeline> pipeline; + Pipeline> pipeline; int called = 0; pipeline @@ -534,7 +534,7 @@ TEST(LineBasedFrameDecoder, SaveDelimiter) { } TEST(LineBasedFrameDecoder, Fail) { - ChannelPipeline> pipeline; + Pipeline> pipeline; int called = 0; pipeline @@ -582,7 +582,7 @@ TEST(LineBasedFrameDecoder, Fail) { } TEST(LineBasedFrameDecoder, NewLineOnly) { - ChannelPipeline> pipeline; + Pipeline> pipeline; int called = 0; pipeline @@ -609,7 +609,7 @@ TEST(LineBasedFrameDecoder, NewLineOnly) { } TEST(LineBasedFrameDecoder, CarriageNewLineOnly) { - ChannelPipeline> pipeline; + Pipeline> pipeline; int called = 0; pipeline diff --git a/folly/wangle/codec/StringCodec.h b/folly/wangle/codec/StringCodec.h index 0f8c96d4..226a3677 100644 --- a/folly/wangle/codec/StringCodec.h +++ b/folly/wangle/codec/StringCodec.h @@ -16,17 +16,17 @@ #pragma once -#include +#include namespace folly { namespace wangle { /* * StringCodec converts a pipeline from IOBufs to std::strings. */ -class StringCodec : public ChannelHandler> { +class StringCodec : public Handler> { public: - typedef typename ChannelHandler< + typedef typename Handler< IOBufQueue&, std::string, std::string, std::unique_ptr>::Context Context; diff --git a/folly/wangle/service/ClientDispatcher.h b/folly/wangle/service/ClientDispatcher.h index f0a5e89c..ac8ccc92 100644 --- a/folly/wangle/service/ClientDispatcher.h +++ b/folly/wangle/service/ClientDispatcher.h @@ -15,7 +15,7 @@ */ #pragma once -#include +#include #include namespace folly { namespace wangle { @@ -26,16 +26,16 @@ namespace folly { namespace wangle { * only one request is allowed at a time. */ template -class SerialClientDispatcher : public ChannelHandlerAdapter +class SerialClientDispatcher : public HandlerAdapter , public Service { public: - typedef typename ChannelHandlerAdapter::Context Context; + typedef typename HandlerAdapter::Context Context; void setPipeline(Pipeline* pipeline) { pipeline_ = pipeline; pipeline->addBack( - ChannelHandlerPtr, false>( + HandlerPtr, false>( this)); pipeline->finalize(); } diff --git a/folly/wangle/service/ServerDispatcher.h b/folly/wangle/service/ServerDispatcher.h index 8152dc60..0b3167e0 100644 --- a/folly/wangle/service/ServerDispatcher.h +++ b/folly/wangle/service/ServerDispatcher.h @@ -15,7 +15,7 @@ */ #pragma once -#include +#include #include namespace folly { namespace wangle { @@ -25,10 +25,10 @@ namespace folly { namespace wangle { * Concurrent requests are queued in the pipeline. */ template -class SerialServerDispatcher : public ChannelHandlerAdapter { +class SerialServerDispatcher : public HandlerAdapter { public: - typedef typename ChannelHandlerAdapter::Context Context; + typedef typename HandlerAdapter::Context Context; explicit SerialServerDispatcher(Service* service) : service_(service) {} diff --git a/folly/wangle/service/Service.h b/folly/wangle/service/Service.h index 7d78defd..1d9aab24 100644 --- a/folly/wangle/service/Service.h +++ b/folly/wangle/service/Service.h @@ -20,7 +20,7 @@ #include #include -#include +#include #include namespace folly { diff --git a/folly/wangle/service/ServiceTest.cpp b/folly/wangle/service/ServiceTest.cpp index 843f80d0..b9c6d815 100644 --- a/folly/wangle/service/ServiceTest.cpp +++ b/folly/wangle/service/ServiceTest.cpp @@ -24,7 +24,7 @@ namespace folly { using namespace wangle; -typedef ChannelPipeline Pipeline; +typedef Pipeline ServicePipeline; class EchoService : public Service { public: @@ -42,12 +42,12 @@ class EchoIntService : public Service { template class ServerPipelineFactory - : public PipelineFactory { + : public PipelineFactory { public: - Pipeline* newPipeline( + ServicePipeline* newPipeline( std::shared_ptr socket) override { - auto pipeline = new Pipeline(); + auto pipeline = new ServicePipeline(); pipeline->addBack(AsyncSocketHandler(socket)); pipeline->addBack(StringCodec()); pipeline->addBack(SerialServerDispatcher(&service_)); @@ -61,12 +61,12 @@ class ServerPipelineFactory }; template -class ClientPipelineFactory : public PipelineFactory { +class ClientPipelineFactory : public PipelineFactory { public: - Pipeline* newPipeline( + ServicePipeline* newPipeline( std::shared_ptr socket) override { - auto pipeline = new Pipeline(); + auto pipeline = new ServicePipeline(); pipeline->addBack(AsyncSocketHandler(socket)); pipeline->addBack(StringCodec()); pipeline->template getHandler(0)->attachReadCallback(); @@ -101,14 +101,14 @@ TEST(Wangle, ClientServerTest) { int port = 1234; // server - ServerBootstrap server; + ServerBootstrap server; server.childPipeline( std::make_shared>()); server.bind(port); // client - auto client = std::make_shared>(); - ClientServiceFactory serviceFactory; + auto client = std::make_shared>(); + ClientServiceFactory serviceFactory; client->pipelineFactory( std::make_shared>()); SocketAddress addr("127.0.0.1", port);