wangle/bootstrap/ServerSocketFactory.h \
wangle/bootstrap/ClientBootstrap.h \
wangle/channel/AsyncSocketHandler.h \
- wangle/channel/ChannelHandler.h \
- wangle/channel/ChannelHandlerContext.h \
- wangle/channel/ChannelPipeline.h \
+ wangle/channel/Handler.h \
+ wangle/channel/HandlerContext.h \
wangle/channel/OutputBufferingHandler.h \
+ wangle/channel/Pipeline.h \
wangle/concurrent/BlockingQueue.h \
wangle/concurrent/Codel.h \
wangle/concurrent/CPUThreadPoolExecutor.h \
#include "folly/wangle/bootstrap/ServerBootstrap.h"
#include "folly/wangle/bootstrap/ClientBootstrap.h"
-#include "folly/wangle/channel/ChannelHandler.h"
+#include "folly/wangle/channel/Handler.h"
#include <glog/logging.h>
#include <gtest/gtest.h>
using namespace folly::wangle;
using namespace folly;
-typedef ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> Pipeline;
+typedef Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> BytesPipeline;
-typedef ServerBootstrap<Pipeline> TestServer;
-typedef ClientBootstrap<Pipeline> TestClient;
+typedef ServerBootstrap<BytesPipeline> TestServer;
+typedef ClientBootstrap<BytesPipeline> TestClient;
-class TestClientPipelineFactory : public PipelineFactory<Pipeline> {
+class TestClientPipelineFactory : public PipelineFactory<BytesPipeline> {
public:
- Pipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
+ BytesPipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
CHECK(sock->good());
// We probably aren't connected immedately, check after a small delay
}
};
-class TestPipelineFactory : public PipelineFactory<Pipeline> {
+class TestPipelineFactory : public PipelineFactory<BytesPipeline> {
public:
- Pipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
+ BytesPipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
pipelines++;
- return new Pipeline();
+ return new BytesPipeline();
}
std::atomic<int> pipelines{0};
};
std::atomic<int> connections{0};
class TestHandlerPipeline
- : public ChannelHandlerAdapter<void*,
+ : public HandlerAdapter<void*,
std::exception> {
public:
void read(Context* ctx, void* conn) {
template <typename HandlerPipeline>
class TestHandlerPipelineFactory
- : public PipelineFactory<ServerBootstrap<Pipeline>::AcceptPipeline> {
+ : public PipelineFactory<ServerBootstrap<BytesPipeline>::AcceptPipeline> {
public:
- ServerBootstrap<Pipeline>::AcceptPipeline* newPipeline(std::shared_ptr<AsyncSocket>) {
- auto pipeline = new ServerBootstrap<Pipeline>::AcceptPipeline;
+ ServerBootstrap<BytesPipeline>::AcceptPipeline* newPipeline(std::shared_ptr<AsyncSocket>) {
+ auto pipeline = new ServerBootstrap<BytesPipeline>::AcceptPipeline;
auto handler = std::make_shared<HandlerPipeline>();
- pipeline->addBack(ChannelHandlerPtr<HandlerPipeline>(handler));
+ pipeline->addBack(HandlerPtr<HandlerPipeline>(handler));
return pipeline;
}
};
}
class TestUDPPipeline
- : public ChannelHandlerAdapter<void*,
+ : public HandlerAdapter<void*,
std::exception> {
public:
void read(Context* ctx, void* conn) {
*/
#pragma once
-#include <folly/wangle/channel/ChannelPipeline.h>
+#include <folly/wangle/channel/Pipeline.h>
namespace folly {
/*
- * A thin wrapper around ChannelPipeline and AsyncSocket to match
+ * A thin wrapper around Pipeline and AsyncSocket to match
* ServerBootstrap. On connect() a new pipeline is created.
*/
template <typename Pipeline>
#include <folly/io/async/EventBaseManager.h>
#include <folly/wangle/concurrent/IOThreadPoolExecutor.h>
#include <folly/wangle/acceptor/ManagedConnection.h>
-#include <folly/wangle/channel/ChannelPipeline.h>
-#include <folly/wangle/channel/ChannelHandler.h>
+#include <folly/wangle/channel/Pipeline.h>
+#include <folly/wangle/channel/Handler.h>
namespace folly {
template <typename Pipeline>
class ServerAcceptor
: public Acceptor
- , public folly::wangle::ChannelHandlerAdapter<void*, std::exception> {
+ , public folly::wangle::HandlerAdapter<void*, std::exception> {
typedef std::unique_ptr<Pipeline,
folly::DelayedDestruction::Destructor> PipelinePtr;
public:
explicit ServerAcceptor(
std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory,
- std::shared_ptr<folly::wangle::ChannelPipeline<
+ std::shared_ptr<folly::wangle::Pipeline<
void*, std::exception>> acceptorPipeline,
EventBase* base)
: Acceptor(ServerSocketConfig())
Acceptor::init(nullptr, base_);
CHECK(acceptorPipeline_);
- acceptorPipeline_->addBack(folly::wangle::ChannelHandlerPtr<ServerAcceptor, false>(this));
+ acceptorPipeline_->addBack(folly::wangle::HandlerPtr<ServerAcceptor, false>(this));
acceptorPipeline_->finalize();
}
EventBase* base_;
std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory_;
- std::shared_ptr<folly::wangle::ChannelPipeline<
+ std::shared_ptr<folly::wangle::Pipeline<
void*, std::exception>> acceptorPipeline_;
};
public:
explicit ServerAcceptorFactory(
std::shared_ptr<PipelineFactory<Pipeline>> factory,
- std::shared_ptr<PipelineFactory<folly::wangle::ChannelPipeline<
+ std::shared_ptr<PipelineFactory<folly::wangle::Pipeline<
void*, std::exception>>> pipeline)
: factory_(factory)
, pipeline_(pipeline) {}
std::shared_ptr<Acceptor> newAcceptor(EventBase* base) {
- std::shared_ptr<folly::wangle::ChannelPipeline<
+ std::shared_ptr<folly::wangle::Pipeline<
void*, std::exception>> pipeline(
pipeline_->newPipeline(nullptr));
return std::make_shared<ServerAcceptor<Pipeline>>(factory_, pipeline, base);
private:
std::shared_ptr<PipelineFactory<Pipeline>> factory_;
std::shared_ptr<PipelineFactory<
- folly::wangle::ChannelPipeline<
+ folly::wangle::Pipeline<
void*, std::exception>>> pipeline_;
};
}
class DefaultAcceptPipelineFactory
- : public PipelineFactory<wangle::ChannelPipeline<void*, std::exception>> {
- typedef wangle::ChannelPipeline<
+ : public PipelineFactory<wangle::Pipeline<void*, std::exception>> {
+ typedef wangle::Pipeline<
void*,
std::exception> AcceptPipeline;
*/
#include <folly/wangle/bootstrap/ServerBootstrap.h>
#include <folly/wangle/concurrent/NamedThreadFactory.h>
-#include <folly/wangle/channel/ChannelHandler.h>
+#include <folly/wangle/channel/Handler.h>
#include <folly/io/async/EventBaseManager.h>
namespace folly {
#include <folly/wangle/bootstrap/ServerBootstrap-inl.h>
#include <folly/Baton.h>
-#include <folly/wangle/channel/ChannelPipeline.h>
+#include <folly/wangle/channel/Pipeline.h>
namespace folly {
-typedef folly::wangle::ChannelPipeline<
+typedef folly::wangle::Pipeline<
folly::IOBufQueue&, std::unique_ptr<folly::IOBuf>> DefaultPipeline;
/*
* accepting threads, any number of accepting sockets, a pool of
* IO-worker threads, and connection pool for each IO thread for you.
*
- * The output is given as a ChannelPipeline template: given a
+ * The output is given as a Pipeline template: given a
* PipelineFactory, it will create a new pipeline for each connection,
* and your server can handle the incoming bytes.
*
join();
}
- typedef wangle::ChannelPipeline<
+ typedef wangle::Pipeline<
void*,
std::exception> AcceptPipeline;
/*
#pragma once
-#include <folly/wangle/channel/ChannelHandler.h>
+#include <folly/wangle/channel/Handler.h>
#include <folly/io/async/AsyncSocket.h>
#include <folly/io/async/EventBase.h>
#include <folly/io/async/EventBaseManager.h>
+++ /dev/null
-/*
- * Copyright 2015 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <folly/futures/Future.h>
-#include <folly/wangle/channel/ChannelPipeline.h>
-#include <folly/io/IOBuf.h>
-#include <folly/io/IOBufQueue.h>
-
-namespace folly { namespace wangle {
-
-template <class Rin, class Rout = Rin, class Win = Rout, class Wout = Rin>
-class ChannelHandler {
- public:
- typedef Rin rin;
- typedef Rout rout;
- typedef Win win;
- typedef Wout wout;
- typedef ChannelHandlerContext<Rout, Wout> Context;
- virtual ~ChannelHandler() {}
-
- virtual void read(Context* ctx, Rin msg) = 0;
- virtual void readEOF(Context* ctx) {
- ctx->fireReadEOF();
- }
- virtual void readException(Context* ctx, exception_wrapper e) {
- ctx->fireReadException(std::move(e));
- }
-
- virtual Future<void> write(Context* ctx, Win msg) = 0;
- virtual Future<void> close(Context* ctx) {
- return ctx->fireClose();
- }
-
- virtual void attachPipeline(Context* ctx) {}
- virtual void attachTransport(Context* ctx) {}
-
- virtual void detachPipeline(Context* ctx) {}
- virtual void detachTransport(Context* ctx) {}
-
- /*
- // Other sorts of things we might want, all shamelessly stolen from Netty
- // inbound
- virtual void exceptionCaught(
- ChannelHandlerContext* ctx,
- exception_wrapper e) {}
- virtual void channelRegistered(ChannelHandlerContext* ctx) {}
- virtual void channelUnregistered(ChannelHandlerContext* ctx) {}
- virtual void channelActive(ChannelHandlerContext* ctx) {}
- virtual void channelInactive(ChannelHandlerContext* ctx) {}
- virtual void channelReadComplete(ChannelHandlerContext* ctx) {}
- virtual void userEventTriggered(ChannelHandlerContext* ctx, void* evt) {}
- virtual void channelWritabilityChanged(ChannelHandlerContext* ctx) {}
-
- // outbound
- virtual Future<void> bind(
- ChannelHandlerContext* ctx,
- SocketAddress localAddress) {}
- virtual Future<void> connect(
- ChannelHandlerContext* ctx,
- SocketAddress remoteAddress, SocketAddress localAddress) {}
- virtual Future<void> disconnect(ChannelHandlerContext* ctx) {}
- virtual Future<void> deregister(ChannelHandlerContext* ctx) {}
- virtual Future<void> read(ChannelHandlerContext* ctx) {}
- virtual void flush(ChannelHandlerContext* ctx) {}
- */
-};
-
-template <class R, class W = R>
-class ChannelHandlerAdapter : public ChannelHandler<R, R, W, W> {
- public:
- typedef typename ChannelHandler<R, R, W, W>::Context Context;
-
- void read(Context* ctx, R msg) override {
- ctx->fireRead(std::forward<R>(msg));
- }
-
- Future<void> write(Context* ctx, W msg) override {
- return ctx->fireWrite(std::forward<W>(msg));
- }
-};
-
-typedef ChannelHandlerAdapter<IOBufQueue&, std::unique_ptr<IOBuf>>
-BytesToBytesHandler;
-
-template <class Handler, bool Shared = true>
-class ChannelHandlerPtr : public ChannelHandler<
- typename Handler::rin,
- typename Handler::rout,
- typename Handler::win,
- typename Handler::wout> {
- public:
- typedef typename std::conditional<
- Shared,
- std::shared_ptr<Handler>,
- Handler*>::type
- HandlerPtr;
-
- typedef typename Handler::Context Context;
-
- explicit ChannelHandlerPtr(HandlerPtr handler)
- : handler_(std::move(handler)) {}
-
- HandlerPtr getHandler() {
- return handler_;
- }
-
- void setHandler(HandlerPtr handler) {
- if (handler == handler_) {
- return;
- }
- if (handler_ && ctx_) {
- handler_->detachPipeline(ctx_);
- }
- handler_ = std::move(handler);
- if (handler_ && ctx_) {
- handler_->attachPipeline(ctx_);
- if (ctx_->getTransport()) {
- handler_->attachTransport(ctx_);
- }
- }
- }
-
- void attachPipeline(Context* ctx) override {
- ctx_ = ctx;
- if (handler_) {
- handler_->attachPipeline(ctx_);
- }
- }
-
- void attachTransport(Context* ctx) override {
- ctx_ = ctx;
- if (handler_) {
- handler_->attachTransport(ctx_);
- }
- }
-
- void detachPipeline(Context* ctx) override {
- ctx_ = ctx;
- if (handler_) {
- handler_->detachPipeline(ctx_);
- }
- }
-
- void detachTransport(Context* ctx) override {
- ctx_ = ctx;
- if (handler_) {
- handler_->detachTransport(ctx_);
- }
- }
-
- void read(Context* ctx, typename Handler::rin msg) override {
- DCHECK(handler_);
- handler_->read(ctx, std::forward<typename Handler::rin>(msg));
- }
-
- void readEOF(Context* ctx) override {
- DCHECK(handler_);
- handler_->readEOF(ctx);
- }
-
- void readException(Context* ctx, exception_wrapper e) override {
- DCHECK(handler_);
- handler_->readException(ctx, std::move(e));
- }
-
- Future<void> write(Context* ctx, typename Handler::win msg) override {
- DCHECK(handler_);
- return handler_->write(ctx, std::forward<typename Handler::win>(msg));
- }
-
- Future<void> close(Context* ctx) override {
- DCHECK(handler_);
- return handler_->close(ctx);
- }
-
- private:
- Context* ctx_;
- HandlerPtr handler_;
-};
-
-}}
+++ /dev/null
-/*
- * Copyright 2015 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <folly/io/async/AsyncTransport.h>
-#include <folly/futures/Future.h>
-#include <folly/ExceptionWrapper.h>
-
-namespace folly { namespace wangle {
-
-template <class In, class Out>
-class ChannelHandlerContext {
- public:
- virtual ~ChannelHandlerContext() {}
-
- virtual void fireRead(In msg) = 0;
- virtual void fireReadEOF() = 0;
- virtual void fireReadException(exception_wrapper e) = 0;
-
- virtual Future<void> fireWrite(Out msg) = 0;
- virtual Future<void> fireClose() = 0;
-
- virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
-
- virtual void setWriteFlags(WriteFlags flags) = 0;
- virtual WriteFlags getWriteFlags() = 0;
-
- virtual void setReadBufferSettings(
- uint64_t minAvailable,
- uint64_t allocationSize) = 0;
- virtual std::pair<uint64_t, uint64_t> getReadBufferSettings() = 0;
-
- /* TODO
- template <class H>
- virtual void addHandlerBefore(H&&) {}
- template <class H>
- virtual void addHandlerAfter(H&&) {}
- template <class H>
- virtual void replaceHandler(H&&) {}
- virtual void removeHandler() {}
- */
-};
-
-class PipelineContext {
- public:
- virtual ~PipelineContext() {}
-
- virtual void attachTransport() = 0;
- virtual void detachTransport() = 0;
-
- void link(PipelineContext* other) {
- setNextIn(other);
- other->setNextOut(this);
- }
-
- protected:
- virtual void setNextIn(PipelineContext* ctx) = 0;
- virtual void setNextOut(PipelineContext* ctx) = 0;
-};
-
-template <class In>
-class InboundChannelHandlerContext {
- public:
- virtual ~InboundChannelHandlerContext() {}
- virtual void read(In msg) = 0;
- virtual void readEOF() = 0;
- virtual void readException(exception_wrapper e) = 0;
-};
-
-template <class Out>
-class OutboundChannelHandlerContext {
- public:
- virtual ~OutboundChannelHandlerContext() {}
- virtual Future<void> write(Out msg) = 0;
- virtual Future<void> close() = 0;
-};
-
-template <class P, class H>
-class ContextImpl : public ChannelHandlerContext<typename H::rout,
- typename H::wout>,
- public InboundChannelHandlerContext<typename H::rin>,
- public OutboundChannelHandlerContext<typename H::win>,
- public PipelineContext {
- public:
- typedef typename H::rin Rin;
- typedef typename H::rout Rout;
- typedef typename H::win Win;
- typedef typename H::wout Wout;
-
- template <class HandlerArg>
- explicit ContextImpl(P* pipeline, HandlerArg&& handlerArg)
- : pipeline_(pipeline),
- handler_(std::forward<HandlerArg>(handlerArg)) {
- handler_.attachPipeline(this);
- }
-
- ~ContextImpl() {
- handler_.detachPipeline(this);
- }
-
- H* getHandler() {
- return &handler_;
- }
-
- // PipelineContext overrides
- void setNextIn(PipelineContext* ctx) override {
- auto nextIn = dynamic_cast<InboundChannelHandlerContext<Rout>*>(ctx);
- if (nextIn) {
- nextIn_ = nextIn;
- } else {
- throw std::invalid_argument("wrong type in setNextIn");
- }
- }
-
- void setNextOut(PipelineContext* ctx) override {
- auto nextOut = dynamic_cast<OutboundChannelHandlerContext<Wout>*>(ctx);
- if (nextOut) {
- nextOut_ = nextOut;
- } else {
- throw std::invalid_argument("wrong type in setNextOut");
- }
- }
-
- void attachTransport() override {
- typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
- handler_.attachTransport(this);
- }
-
- void detachTransport() override {
- typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
- handler_.detachTransport(this);
- }
-
- // ChannelHandlerContext overrides
- void fireRead(Rout msg) override {
- typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
- if (nextIn_) {
- nextIn_->read(std::forward<Rout>(msg));
- } else {
- LOG(WARNING) << "read reached end of pipeline";
- }
- }
-
- void fireReadEOF() override {
- typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
- if (nextIn_) {
- nextIn_->readEOF();
- } else {
- LOG(WARNING) << "readEOF reached end of pipeline";
- }
- }
-
- void fireReadException(exception_wrapper e) override {
- typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
- if (nextIn_) {
- nextIn_->readException(std::move(e));
- } else {
- LOG(WARNING) << "readException reached end of pipeline";
- }
- }
-
- Future<void> fireWrite(Wout msg) override {
- typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
- if (nextOut_) {
- return nextOut_->write(std::forward<Wout>(msg));
- } else {
- LOG(WARNING) << "write reached end of pipeline";
- return makeFuture();
- }
- }
-
- Future<void> fireClose() override {
- typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
- if (nextOut_) {
- return nextOut_->close();
- } else {
- LOG(WARNING) << "close reached end of pipeline";
- return makeFuture();
- }
- }
-
- std::shared_ptr<AsyncTransport> getTransport() override {
- return pipeline_->getTransport();
- }
-
- void setWriteFlags(WriteFlags flags) override {
- pipeline_->setWriteFlags(flags);
- }
-
- WriteFlags getWriteFlags() override {
- return pipeline_->getWriteFlags();
- }
-
- void setReadBufferSettings(
- uint64_t minAvailable,
- uint64_t allocationSize) override {
- pipeline_->setReadBufferSettings(minAvailable, allocationSize);
- }
-
- std::pair<uint64_t, uint64_t> getReadBufferSettings() override {
- return pipeline_->getReadBufferSettings();
- }
-
- // InboundChannelHandlerContext overrides
- void read(Rin msg) override {
- typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
- handler_.read(this, std::forward<Rin>(msg));
- }
-
- void readEOF() override {
- typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
- handler_.readEOF(this);
- }
-
- void readException(exception_wrapper e) override {
- typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
- handler_.readException(this, std::move(e));
- }
-
- // OutboundChannelHandlerContext overrides
- Future<void> write(Win msg) override {
- typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
- return handler_.write(this, std::forward<Win>(msg));
- }
-
- Future<void> close() override {
- typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
- return handler_.close(this);
- }
-
- private:
- P* pipeline_;
- H handler_;
- InboundChannelHandlerContext<Rout>* nextIn_{nullptr};
- OutboundChannelHandlerContext<Wout>* nextOut_{nullptr};
-};
-
-}}
+++ /dev/null
-/*
- * Copyright 2015 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <folly/wangle/channel/ChannelHandlerContext.h>
-#include <folly/futures/Future.h>
-#include <folly/io/async/AsyncTransport.h>
-#include <folly/io/async/DelayedDestruction.h>
-#include <folly/ExceptionWrapper.h>
-#include <folly/Memory.h>
-#include <glog/logging.h>
-
-namespace folly { namespace wangle {
-
-/*
- * R is the inbound type, i.e. inbound calls start with pipeline.read(R)
- * W is the outbound type, i.e. outbound calls start with pipeline.write(W)
- */
-template <class R, class W, class... Handlers>
-class ChannelPipeline;
-
-template <class R, class W>
-class ChannelPipeline<R, W> : public DelayedDestruction {
- public:
- ChannelPipeline() {}
- ~ChannelPipeline() {}
-
- std::shared_ptr<AsyncTransport> getTransport() {
- return transport_;
- }
-
- void setWriteFlags(WriteFlags flags) {
- writeFlags_ = flags;
- }
-
- WriteFlags getWriteFlags() {
- return writeFlags_;
- }
-
- void setReadBufferSettings(uint64_t minAvailable, uint64_t allocationSize) {
- readBufferSettings_ = std::make_pair(minAvailable, allocationSize);
- }
-
- std::pair<uint64_t, uint64_t> getReadBufferSettings() {
- return readBufferSettings_;
- }
-
- void read(R msg) {
- front_->read(std::forward<R>(msg));
- }
-
- void readEOF() {
- front_->readEOF();
- }
-
- void readException(exception_wrapper e) {
- front_->readException(std::move(e));
- }
-
- Future<void> write(W msg) {
- return back_->write(std::forward<W>(msg));
- }
-
- Future<void> close() {
- return back_->close();
- }
-
- template <class H>
- ChannelPipeline& addBack(H&& handler) {
- ctxs_.push_back(folly::make_unique<ContextImpl<ChannelPipeline, H>>(
- this, std::forward<H>(handler)));
- return *this;
- }
-
- template <class H>
- ChannelPipeline& addFront(H&& handler) {
- ctxs_.insert(
- ctxs_.begin(),
- folly::make_unique<ContextImpl<ChannelPipeline, H>>(
- this,
- std::forward<H>(handler)));
- return *this;
- }
-
- template <class H>
- H* getHandler(int i) {
- auto ctx = dynamic_cast<ContextImpl<ChannelPipeline, H>*>(ctxs_[i].get());
- CHECK(ctx);
- return ctx->getHandler();
- }
-
- void finalize() {
- finalizeHelper();
- InboundChannelHandlerContext<R>* front;
- front_ = dynamic_cast<InboundChannelHandlerContext<R>*>(
- ctxs_.front().get());
- if (!front_) {
- throw std::invalid_argument("wrong type for first handler");
- }
- }
-
- protected:
- explicit ChannelPipeline(bool shouldFinalize) {
- CHECK(!shouldFinalize);
- }
-
- void finalizeHelper() {
- if (ctxs_.empty()) {
- return;
- }
-
- for (size_t i = 0; i < ctxs_.size() - 1; i++) {
- ctxs_[i]->link(ctxs_[i+1].get());
- }
-
- back_ = dynamic_cast<OutboundChannelHandlerContext<W>*>(ctxs_.back().get());
- if (!back_) {
- throw std::invalid_argument("wrong type for last handler");
- }
- }
-
- PipelineContext* getLocalFront() {
- return ctxs_.empty() ? nullptr : ctxs_.front().get();
- }
-
- static const bool is_end{true};
-
- std::shared_ptr<AsyncTransport> transport_;
- WriteFlags writeFlags_{WriteFlags::NONE};
- std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
-
- void attachPipeline() {}
-
- void attachTransport(
- std::shared_ptr<AsyncTransport> transport) {
- transport_ = std::move(transport);
- }
-
- void detachTransport() {
- transport_ = nullptr;
- }
-
- OutboundChannelHandlerContext<W>* back_{nullptr};
-
- private:
- InboundChannelHandlerContext<R>* front_{nullptr};
- std::vector<std::unique_ptr<PipelineContext>> ctxs_;
-};
-
-template <class R, class W, class Handler, class... Handlers>
-class ChannelPipeline<R, W, Handler, Handlers...>
- : public ChannelPipeline<R, W, Handlers...> {
- protected:
- template <class HandlerArg, class... HandlersArgs>
- ChannelPipeline(
- bool shouldFinalize,
- HandlerArg&& handlerArg,
- HandlersArgs&&... handlersArgs)
- : ChannelPipeline<R, W, Handlers...>(
- false,
- std::forward<HandlersArgs>(handlersArgs)...),
- ctx_(this, std::forward<HandlerArg>(handlerArg)) {
- if (shouldFinalize) {
- finalize();
- }
- }
-
- public:
- template <class... HandlersArgs>
- explicit ChannelPipeline(HandlersArgs&&... handlersArgs)
- : ChannelPipeline(true, std::forward<HandlersArgs>(handlersArgs)...) {}
-
- ~ChannelPipeline() {}
-
- void read(R msg) {
- typename ChannelPipeline<R, W>::DestructorGuard dg(
- static_cast<DelayedDestruction*>(this));
- front_->read(std::forward<R>(msg));
- }
-
- void readEOF() {
- typename ChannelPipeline<R, W>::DestructorGuard dg(
- static_cast<DelayedDestruction*>(this));
- front_->readEOF();
- }
-
- void readException(exception_wrapper e) {
- typename ChannelPipeline<R, W>::DestructorGuard dg(
- static_cast<DelayedDestruction*>(this));
- front_->readException(std::move(e));
- }
-
- Future<void> write(W msg) {
- typename ChannelPipeline<R, W>::DestructorGuard dg(
- static_cast<DelayedDestruction*>(this));
- return back_->write(std::forward<W>(msg));
- }
-
- Future<void> close() {
- typename ChannelPipeline<R, W>::DestructorGuard dg(
- static_cast<DelayedDestruction*>(this));
- return back_->close();
- }
-
- void attachTransport(
- std::shared_ptr<AsyncTransport> transport) {
- typename ChannelPipeline<R, W>::DestructorGuard dg(
- static_cast<DelayedDestruction*>(this));
- CHECK((!ChannelPipeline<R, W>::transport_));
- ChannelPipeline<R, W, Handlers...>::attachTransport(std::move(transport));
- forEachCtx([&](PipelineContext* ctx){
- ctx->attachTransport();
- });
- }
-
- void detachTransport() {
- typename ChannelPipeline<R, W>::DestructorGuard dg(
- static_cast<DelayedDestruction*>(this));
- ChannelPipeline<R, W, Handlers...>::detachTransport();
- forEachCtx([&](PipelineContext* ctx){
- ctx->detachTransport();
- });
- }
-
- std::shared_ptr<AsyncTransport> getTransport() {
- return ChannelPipeline<R, W>::transport_;
- }
-
- template <class H>
- ChannelPipeline& addBack(H&& handler) {
- ChannelPipeline<R, W>::addBack(std::move(handler));
- return *this;
- }
-
- template <class H>
- ChannelPipeline& addFront(H&& handler) {
- ctxs_.insert(
- ctxs_.begin(),
- folly::make_unique<ContextImpl<ChannelPipeline, H>>(
- this,
- std::move(handler)));
- return *this;
- }
-
- template <class H>
- H* getHandler(size_t i) {
- if (i > ctxs_.size()) {
- return ChannelPipeline<R, W, Handlers...>::template getHandler<H>(
- i - (ctxs_.size() + 1));
- } else {
- auto pctx = (i == ctxs_.size()) ? &ctx_ : ctxs_[i].get();
- auto ctx = dynamic_cast<ContextImpl<ChannelPipeline, H>*>(pctx);
- return ctx->getHandler();
- }
- }
-
- void finalize() {
- finalizeHelper();
- auto ctx = ctxs_.empty() ? &ctx_ : ctxs_.front().get();
- front_ = dynamic_cast<InboundChannelHandlerContext<R>*>(ctx);
- if (!front_) {
- throw std::invalid_argument("wrong type for first handler");
- }
- }
-
- protected:
- void finalizeHelper() {
- ChannelPipeline<R, W, Handlers...>::finalizeHelper();
- back_ = ChannelPipeline<R, W, Handlers...>::back_;
- if (!back_) {
- auto is_at_end = ChannelPipeline<R, W, Handlers...>::is_end;
- CHECK(is_at_end);
- back_ = dynamic_cast<OutboundChannelHandlerContext<W>*>(&ctx_);
- if (!back_) {
- throw std::invalid_argument("wrong type for last handler");
- }
- }
-
- if (!ctxs_.empty()) {
- for (size_t i = 0; i < ctxs_.size() - 1; i++) {
- ctxs_[i]->link(ctxs_[i+1].get());
- }
- ctxs_.back()->link(&ctx_);
- }
-
- auto nextFront = ChannelPipeline<R, W, Handlers...>::getLocalFront();
- if (nextFront) {
- ctx_.link(nextFront);
- }
- }
-
- PipelineContext* getLocalFront() {
- return ctxs_.empty() ? &ctx_ : ctxs_.front().get();
- }
-
- static const bool is_end{false};
- InboundChannelHandlerContext<R>* front_{nullptr};
- OutboundChannelHandlerContext<W>* back_{nullptr};
-
- private:
- template <class F>
- void forEachCtx(const F& func) {
- for (auto& ctx : ctxs_) {
- func(ctx.get());
- }
- func(&ctx_);
- }
-
- ContextImpl<ChannelPipeline, Handler> ctx_;
- std::vector<std::unique_ptr<PipelineContext>> ctxs_;
-};
-
-}}
-
-namespace folly {
-
-class AsyncSocket;
-
-template <typename Pipeline>
-class PipelineFactory {
- public:
- virtual Pipeline* newPipeline(std::shared_ptr<AsyncSocket>) = 0;
- virtual ~PipelineFactory() {}
-};
-
-}
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/futures/Future.h>
+#include <folly/wangle/channel/Pipeline.h>
+#include <folly/io/IOBuf.h>
+#include <folly/io/IOBufQueue.h>
+
+namespace folly { namespace wangle {
+
+template <class Rin, class Rout = Rin, class Win = Rout, class Wout = Rin>
+class Handler {
+ public:
+ typedef Rin rin;
+ typedef Rout rout;
+ typedef Win win;
+ typedef Wout wout;
+ typedef HandlerContext<Rout, Wout> Context;
+ virtual ~Handler() {}
+
+ virtual void read(Context* ctx, Rin msg) = 0;
+ virtual void readEOF(Context* ctx) {
+ ctx->fireReadEOF();
+ }
+ virtual void readException(Context* ctx, exception_wrapper e) {
+ ctx->fireReadException(std::move(e));
+ }
+
+ virtual Future<void> write(Context* ctx, Win msg) = 0;
+ virtual Future<void> close(Context* ctx) {
+ return ctx->fireClose();
+ }
+
+ virtual void attachPipeline(Context* ctx) {}
+ virtual void attachTransport(Context* ctx) {}
+
+ virtual void detachPipeline(Context* ctx) {}
+ virtual void detachTransport(Context* ctx) {}
+
+ /*
+ // Other sorts of things we might want, all shamelessly stolen from Netty
+ // inbound
+ virtual void exceptionCaught(
+ HandlerContext* ctx,
+ exception_wrapper e) {}
+ virtual void channelRegistered(HandlerContext* ctx) {}
+ virtual void channelUnregistered(HandlerContext* ctx) {}
+ virtual void channelActive(HandlerContext* ctx) {}
+ virtual void channelInactive(HandlerContext* ctx) {}
+ virtual void channelReadComplete(HandlerContext* ctx) {}
+ virtual void userEventTriggered(HandlerContext* ctx, void* evt) {}
+ virtual void channelWritabilityChanged(HandlerContext* ctx) {}
+
+ // outbound
+ virtual Future<void> bind(
+ HandlerContext* ctx,
+ SocketAddress localAddress) {}
+ virtual Future<void> connect(
+ HandlerContext* ctx,
+ SocketAddress remoteAddress, SocketAddress localAddress) {}
+ virtual Future<void> disconnect(HandlerContext* ctx) {}
+ virtual Future<void> deregister(HandlerContext* ctx) {}
+ virtual Future<void> read(HandlerContext* ctx) {}
+ virtual void flush(HandlerContext* ctx) {}
+ */
+};
+
+template <class R, class W = R>
+class HandlerAdapter : public Handler<R, R, W, W> {
+ public:
+ typedef typename Handler<R, R, W, W>::Context Context;
+
+ void read(Context* ctx, R msg) override {
+ ctx->fireRead(std::forward<R>(msg));
+ }
+
+ Future<void> write(Context* ctx, W msg) override {
+ return ctx->fireWrite(std::forward<W>(msg));
+ }
+};
+
+typedef HandlerAdapter<IOBufQueue&, std::unique_ptr<IOBuf>>
+BytesToBytesHandler;
+
+template <class HandlerT, bool Shared = true>
+class HandlerPtr : public Handler<
+ typename HandlerT::rin,
+ typename HandlerT::rout,
+ typename HandlerT::win,
+ typename HandlerT::wout> {
+ public:
+ typedef typename std::conditional<
+ Shared,
+ std::shared_ptr<HandlerT>,
+ HandlerT*>::type
+ Ptr;
+
+ typedef typename HandlerT::Context Context;
+
+ explicit HandlerPtr(Ptr handler)
+ : handler_(std::move(handler)) {}
+
+ Ptr getHandler() {
+ return handler_;
+ }
+
+ void setHandler(Ptr handler) {
+ if (handler == handler_) {
+ return;
+ }
+ if (handler_ && ctx_) {
+ handler_->detachPipeline(ctx_);
+ }
+ handler_ = std::move(handler);
+ if (handler_ && ctx_) {
+ handler_->attachPipeline(ctx_);
+ if (ctx_->getTransport()) {
+ handler_->attachTransport(ctx_);
+ }
+ }
+ }
+
+ void attachPipeline(Context* ctx) override {
+ ctx_ = ctx;
+ if (handler_) {
+ handler_->attachPipeline(ctx_);
+ }
+ }
+
+ void attachTransport(Context* ctx) override {
+ ctx_ = ctx;
+ if (handler_) {
+ handler_->attachTransport(ctx_);
+ }
+ }
+
+ void detachPipeline(Context* ctx) override {
+ ctx_ = ctx;
+ if (handler_) {
+ handler_->detachPipeline(ctx_);
+ }
+ }
+
+ void detachTransport(Context* ctx) override {
+ ctx_ = ctx;
+ if (handler_) {
+ handler_->detachTransport(ctx_);
+ }
+ }
+
+ void read(Context* ctx, typename HandlerT::rin msg) override {
+ DCHECK(handler_);
+ handler_->read(ctx, std::forward<typename HandlerT::rin>(msg));
+ }
+
+ void readEOF(Context* ctx) override {
+ DCHECK(handler_);
+ handler_->readEOF(ctx);
+ }
+
+ void readException(Context* ctx, exception_wrapper e) override {
+ DCHECK(handler_);
+ handler_->readException(ctx, std::move(e));
+ }
+
+ Future<void> write(Context* ctx, typename HandlerT::win msg) override {
+ DCHECK(handler_);
+ return handler_->write(ctx, std::forward<typename HandlerT::win>(msg));
+ }
+
+ Future<void> close(Context* ctx) override {
+ DCHECK(handler_);
+ return handler_->close(ctx);
+ }
+
+ private:
+ Context* ctx_;
+ Ptr handler_;
+};
+
+}}
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/io/async/AsyncTransport.h>
+#include <folly/futures/Future.h>
+#include <folly/ExceptionWrapper.h>
+
+namespace folly { namespace wangle {
+
+template <class In, class Out>
+class HandlerContext {
+ public:
+ virtual ~HandlerContext() {}
+
+ virtual void fireRead(In msg) = 0;
+ virtual void fireReadEOF() = 0;
+ virtual void fireReadException(exception_wrapper e) = 0;
+
+ virtual Future<void> fireWrite(Out msg) = 0;
+ virtual Future<void> fireClose() = 0;
+
+ virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
+
+ virtual void setWriteFlags(WriteFlags flags) = 0;
+ virtual WriteFlags getWriteFlags() = 0;
+
+ virtual void setReadBufferSettings(
+ uint64_t minAvailable,
+ uint64_t allocationSize) = 0;
+ virtual std::pair<uint64_t, uint64_t> getReadBufferSettings() = 0;
+
+ /* TODO
+ template <class H>
+ virtual void addHandlerBefore(H&&) {}
+ template <class H>
+ virtual void addHandlerAfter(H&&) {}
+ template <class H>
+ virtual void replaceHandler(H&&) {}
+ virtual void removeHandler() {}
+ */
+};
+
+class PipelineContext {
+ public:
+ virtual ~PipelineContext() {}
+
+ virtual void attachTransport() = 0;
+ virtual void detachTransport() = 0;
+
+ void link(PipelineContext* other) {
+ setNextIn(other);
+ other->setNextOut(this);
+ }
+
+ protected:
+ virtual void setNextIn(PipelineContext* ctx) = 0;
+ virtual void setNextOut(PipelineContext* ctx) = 0;
+};
+
+template <class In>
+class InboundHandlerContext {
+ public:
+ virtual ~InboundHandlerContext() {}
+ virtual void read(In msg) = 0;
+ virtual void readEOF() = 0;
+ virtual void readException(exception_wrapper e) = 0;
+};
+
+template <class Out>
+class OutboundHandlerContext {
+ public:
+ virtual ~OutboundHandlerContext() {}
+ virtual Future<void> write(Out msg) = 0;
+ virtual Future<void> close() = 0;
+};
+
+template <class P, class H>
+class ContextImpl : public HandlerContext<typename H::rout,
+ typename H::wout>,
+ public InboundHandlerContext<typename H::rin>,
+ public OutboundHandlerContext<typename H::win>,
+ public PipelineContext {
+ public:
+ typedef typename H::rin Rin;
+ typedef typename H::rout Rout;
+ typedef typename H::win Win;
+ typedef typename H::wout Wout;
+
+ template <class HandlerArg>
+ explicit ContextImpl(P* pipeline, HandlerArg&& handlerArg)
+ : pipeline_(pipeline),
+ handler_(std::forward<HandlerArg>(handlerArg)) {
+ handler_.attachPipeline(this);
+ }
+
+ ~ContextImpl() {
+ handler_.detachPipeline(this);
+ }
+
+ H* getHandler() {
+ return &handler_;
+ }
+
+ // PipelineContext overrides
+ void setNextIn(PipelineContext* ctx) override {
+ auto nextIn = dynamic_cast<InboundHandlerContext<Rout>*>(ctx);
+ if (nextIn) {
+ nextIn_ = nextIn;
+ } else {
+ throw std::invalid_argument("wrong type in setNextIn");
+ }
+ }
+
+ void setNextOut(PipelineContext* ctx) override {
+ auto nextOut = dynamic_cast<OutboundHandlerContext<Wout>*>(ctx);
+ if (nextOut) {
+ nextOut_ = nextOut;
+ } else {
+ throw std::invalid_argument("wrong type in setNextOut");
+ }
+ }
+
+ void attachTransport() override {
+ typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+ handler_.attachTransport(this);
+ }
+
+ void detachTransport() override {
+ typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+ handler_.detachTransport(this);
+ }
+
+ // HandlerContext overrides
+ void fireRead(Rout msg) override {
+ typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+ if (nextIn_) {
+ nextIn_->read(std::forward<Rout>(msg));
+ } else {
+ LOG(WARNING) << "read reached end of pipeline";
+ }
+ }
+
+ void fireReadEOF() override {
+ typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+ if (nextIn_) {
+ nextIn_->readEOF();
+ } else {
+ LOG(WARNING) << "readEOF reached end of pipeline";
+ }
+ }
+
+ void fireReadException(exception_wrapper e) override {
+ typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+ if (nextIn_) {
+ nextIn_->readException(std::move(e));
+ } else {
+ LOG(WARNING) << "readException reached end of pipeline";
+ }
+ }
+
+ Future<void> fireWrite(Wout msg) override {
+ typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+ if (nextOut_) {
+ return nextOut_->write(std::forward<Wout>(msg));
+ } else {
+ LOG(WARNING) << "write reached end of pipeline";
+ return makeFuture();
+ }
+ }
+
+ Future<void> fireClose() override {
+ typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+ if (nextOut_) {
+ return nextOut_->close();
+ } else {
+ LOG(WARNING) << "close reached end of pipeline";
+ return makeFuture();
+ }
+ }
+
+ std::shared_ptr<AsyncTransport> getTransport() override {
+ return pipeline_->getTransport();
+ }
+
+ void setWriteFlags(WriteFlags flags) override {
+ pipeline_->setWriteFlags(flags);
+ }
+
+ WriteFlags getWriteFlags() override {
+ return pipeline_->getWriteFlags();
+ }
+
+ void setReadBufferSettings(
+ uint64_t minAvailable,
+ uint64_t allocationSize) override {
+ pipeline_->setReadBufferSettings(minAvailable, allocationSize);
+ }
+
+ std::pair<uint64_t, uint64_t> getReadBufferSettings() override {
+ return pipeline_->getReadBufferSettings();
+ }
+
+ // InboundHandlerContext overrides
+ void read(Rin msg) override {
+ typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+ handler_.read(this, std::forward<Rin>(msg));
+ }
+
+ void readEOF() override {
+ typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+ handler_.readEOF(this);
+ }
+
+ void readException(exception_wrapper e) override {
+ typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+ handler_.readException(this, std::move(e));
+ }
+
+ // OutboundHandlerContext overrides
+ Future<void> write(Win msg) override {
+ typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+ return handler_.write(this, std::forward<Win>(msg));
+ }
+
+ Future<void> close() override {
+ typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+ return handler_.close(this);
+ }
+
+ private:
+ P* pipeline_;
+ H handler_;
+ InboundHandlerContext<Rout>* nextIn_{nullptr};
+ OutboundHandlerContext<Wout>* nextOut_{nullptr};
+};
+
+}}
#pragma once
-#include <folly/wangle/channel/ChannelHandler.h>
+#include <folly/wangle/channel/Handler.h>
#include <folly/io/async/EventBase.h>
#include <folly/io/async/EventBaseManager.h>
#include <folly/io/IOBuf.h>
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/wangle/channel/HandlerContext.h>
+#include <folly/futures/Future.h>
+#include <folly/io/async/AsyncTransport.h>
+#include <folly/io/async/DelayedDestruction.h>
+#include <folly/ExceptionWrapper.h>
+#include <folly/Memory.h>
+#include <glog/logging.h>
+
+namespace folly { namespace wangle {
+
+/*
+ * R is the inbound type, i.e. inbound calls start with pipeline.read(R)
+ * W is the outbound type, i.e. outbound calls start with pipeline.write(W)
+ */
+template <class R, class W, class... Handlers>
+class Pipeline;
+
+template <class R, class W>
+class Pipeline<R, W> : public DelayedDestruction {
+ public:
+ Pipeline() {}
+ ~Pipeline() {}
+
+ std::shared_ptr<AsyncTransport> getTransport() {
+ return transport_;
+ }
+
+ void setWriteFlags(WriteFlags flags) {
+ writeFlags_ = flags;
+ }
+
+ WriteFlags getWriteFlags() {
+ return writeFlags_;
+ }
+
+ void setReadBufferSettings(uint64_t minAvailable, uint64_t allocationSize) {
+ readBufferSettings_ = std::make_pair(minAvailable, allocationSize);
+ }
+
+ std::pair<uint64_t, uint64_t> getReadBufferSettings() {
+ return readBufferSettings_;
+ }
+
+ void read(R msg) {
+ front_->read(std::forward<R>(msg));
+ }
+
+ void readEOF() {
+ front_->readEOF();
+ }
+
+ void readException(exception_wrapper e) {
+ front_->readException(std::move(e));
+ }
+
+ Future<void> write(W msg) {
+ return back_->write(std::forward<W>(msg));
+ }
+
+ Future<void> close() {
+ return back_->close();
+ }
+
+ template <class H>
+ Pipeline& addBack(H&& handler) {
+ ctxs_.push_back(folly::make_unique<ContextImpl<Pipeline, H>>(
+ this, std::forward<H>(handler)));
+ return *this;
+ }
+
+ template <class H>
+ Pipeline& addFront(H&& handler) {
+ ctxs_.insert(
+ ctxs_.begin(),
+ folly::make_unique<ContextImpl<Pipeline, H>>(
+ this,
+ std::forward<H>(handler)));
+ return *this;
+ }
+
+ template <class H>
+ H* getHandler(int i) {
+ auto ctx = dynamic_cast<ContextImpl<Pipeline, H>*>(ctxs_[i].get());
+ CHECK(ctx);
+ return ctx->getHandler();
+ }
+
+ void finalize() {
+ finalizeHelper();
+ InboundHandlerContext<R>* front;
+ front_ = dynamic_cast<InboundHandlerContext<R>*>(
+ ctxs_.front().get());
+ if (!front_) {
+ throw std::invalid_argument("wrong type for first handler");
+ }
+ }
+
+ protected:
+ explicit Pipeline(bool shouldFinalize) {
+ CHECK(!shouldFinalize);
+ }
+
+ void finalizeHelper() {
+ if (ctxs_.empty()) {
+ return;
+ }
+
+ for (size_t i = 0; i < ctxs_.size() - 1; i++) {
+ ctxs_[i]->link(ctxs_[i+1].get());
+ }
+
+ back_ = dynamic_cast<OutboundHandlerContext<W>*>(ctxs_.back().get());
+ if (!back_) {
+ throw std::invalid_argument("wrong type for last handler");
+ }
+ }
+
+ PipelineContext* getLocalFront() {
+ return ctxs_.empty() ? nullptr : ctxs_.front().get();
+ }
+
+ static const bool is_end{true};
+
+ std::shared_ptr<AsyncTransport> transport_;
+ WriteFlags writeFlags_{WriteFlags::NONE};
+ std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
+
+ void attachPipeline() {}
+
+ void attachTransport(
+ std::shared_ptr<AsyncTransport> transport) {
+ transport_ = std::move(transport);
+ }
+
+ void detachTransport() {
+ transport_ = nullptr;
+ }
+
+ OutboundHandlerContext<W>* back_{nullptr};
+
+ private:
+ InboundHandlerContext<R>* front_{nullptr};
+ std::vector<std::unique_ptr<PipelineContext>> ctxs_;
+};
+
+template <class R, class W, class Handler, class... Handlers>
+class Pipeline<R, W, Handler, Handlers...>
+ : public Pipeline<R, W, Handlers...> {
+ protected:
+ template <class HandlerArg, class... HandlersArgs>
+ Pipeline(
+ bool shouldFinalize,
+ HandlerArg&& handlerArg,
+ HandlersArgs&&... handlersArgs)
+ : Pipeline<R, W, Handlers...>(
+ false,
+ std::forward<HandlersArgs>(handlersArgs)...),
+ ctx_(this, std::forward<HandlerArg>(handlerArg)) {
+ if (shouldFinalize) {
+ finalize();
+ }
+ }
+
+ public:
+ template <class... HandlersArgs>
+ explicit Pipeline(HandlersArgs&&... handlersArgs)
+ : Pipeline(true, std::forward<HandlersArgs>(handlersArgs)...) {}
+
+ ~Pipeline() {}
+
+ void read(R msg) {
+ typename Pipeline<R, W>::DestructorGuard dg(
+ static_cast<DelayedDestruction*>(this));
+ front_->read(std::forward<R>(msg));
+ }
+
+ void readEOF() {
+ typename Pipeline<R, W>::DestructorGuard dg(
+ static_cast<DelayedDestruction*>(this));
+ front_->readEOF();
+ }
+
+ void readException(exception_wrapper e) {
+ typename Pipeline<R, W>::DestructorGuard dg(
+ static_cast<DelayedDestruction*>(this));
+ front_->readException(std::move(e));
+ }
+
+ Future<void> write(W msg) {
+ typename Pipeline<R, W>::DestructorGuard dg(
+ static_cast<DelayedDestruction*>(this));
+ return back_->write(std::forward<W>(msg));
+ }
+
+ Future<void> close() {
+ typename Pipeline<R, W>::DestructorGuard dg(
+ static_cast<DelayedDestruction*>(this));
+ return back_->close();
+ }
+
+ void attachTransport(
+ std::shared_ptr<AsyncTransport> transport) {
+ typename Pipeline<R, W>::DestructorGuard dg(
+ static_cast<DelayedDestruction*>(this));
+ CHECK((!Pipeline<R, W>::transport_));
+ Pipeline<R, W, Handlers...>::attachTransport(std::move(transport));
+ forEachCtx([&](PipelineContext* ctx){
+ ctx->attachTransport();
+ });
+ }
+
+ void detachTransport() {
+ typename Pipeline<R, W>::DestructorGuard dg(
+ static_cast<DelayedDestruction*>(this));
+ Pipeline<R, W, Handlers...>::detachTransport();
+ forEachCtx([&](PipelineContext* ctx){
+ ctx->detachTransport();
+ });
+ }
+
+ std::shared_ptr<AsyncTransport> getTransport() {
+ return Pipeline<R, W>::transport_;
+ }
+
+ template <class H>
+ Pipeline& addBack(H&& handler) {
+ Pipeline<R, W>::addBack(std::move(handler));
+ return *this;
+ }
+
+ template <class H>
+ Pipeline& addFront(H&& handler) {
+ ctxs_.insert(
+ ctxs_.begin(),
+ folly::make_unique<ContextImpl<Pipeline, H>>(
+ this,
+ std::move(handler)));
+ return *this;
+ }
+
+ template <class H>
+ H* getHandler(size_t i) {
+ if (i > ctxs_.size()) {
+ return Pipeline<R, W, Handlers...>::template getHandler<H>(
+ i - (ctxs_.size() + 1));
+ } else {
+ auto pctx = (i == ctxs_.size()) ? &ctx_ : ctxs_[i].get();
+ auto ctx = dynamic_cast<ContextImpl<Pipeline, H>*>(pctx);
+ return ctx->getHandler();
+ }
+ }
+
+ void finalize() {
+ finalizeHelper();
+ auto ctx = ctxs_.empty() ? &ctx_ : ctxs_.front().get();
+ front_ = dynamic_cast<InboundHandlerContext<R>*>(ctx);
+ if (!front_) {
+ throw std::invalid_argument("wrong type for first handler");
+ }
+ }
+
+ protected:
+ void finalizeHelper() {
+ Pipeline<R, W, Handlers...>::finalizeHelper();
+ back_ = Pipeline<R, W, Handlers...>::back_;
+ if (!back_) {
+ auto is_at_end = Pipeline<R, W, Handlers...>::is_end;
+ CHECK(is_at_end);
+ back_ = dynamic_cast<OutboundHandlerContext<W>*>(&ctx_);
+ if (!back_) {
+ throw std::invalid_argument("wrong type for last handler");
+ }
+ }
+
+ if (!ctxs_.empty()) {
+ for (size_t i = 0; i < ctxs_.size() - 1; i++) {
+ ctxs_[i]->link(ctxs_[i+1].get());
+ }
+ ctxs_.back()->link(&ctx_);
+ }
+
+ auto nextFront = Pipeline<R, W, Handlers...>::getLocalFront();
+ if (nextFront) {
+ ctx_.link(nextFront);
+ }
+ }
+
+ PipelineContext* getLocalFront() {
+ return ctxs_.empty() ? &ctx_ : ctxs_.front().get();
+ }
+
+ static const bool is_end{false};
+ InboundHandlerContext<R>* front_{nullptr};
+ OutboundHandlerContext<W>* back_{nullptr};
+
+ private:
+ template <class F>
+ void forEachCtx(const F& func) {
+ for (auto& ctx : ctxs_) {
+ func(ctx.get());
+ }
+ func(&ctx_);
+ }
+
+ ContextImpl<Pipeline, Handler> ctx_;
+ std::vector<std::unique_ptr<PipelineContext>> ctxs_;
+};
+
+}}
+
+namespace folly {
+
+class AsyncSocket;
+
+template <typename Pipeline>
+class PipelineFactory {
+ public:
+ virtual Pipeline* newPipeline(std::shared_ptr<AsyncSocket>) = 0;
+ virtual ~PipelineFactory() {}
+};
+
+}
+++ /dev/null
-/*
- * Copyright 2015 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <folly/wangle/channel/ChannelHandler.h>
-#include <folly/wangle/channel/ChannelPipeline.h>
-#include <folly/wangle/channel/AsyncSocketHandler.h>
-#include <folly/wangle/channel/OutputBufferingHandler.h>
-#include <folly/wangle/channel/test/MockChannelHandler.h>
-#include <gmock/gmock.h>
-#include <gtest/gtest.h>
-
-using namespace folly;
-using namespace folly::wangle;
-using namespace testing;
-
-typedef StrictMock<MockChannelHandlerAdapter<int, int>> IntHandler;
-typedef ChannelHandlerPtr<IntHandler, false> IntHandlerPtr;
-
-ACTION(FireRead) {
- arg0->fireRead(arg1);
-}
-
-ACTION(FireReadEOF) {
- arg0->fireReadEOF();
-}
-
-ACTION(FireReadException) {
- arg0->fireReadException(arg1);
-}
-
-ACTION(FireWrite) {
- arg0->fireWrite(arg1);
-}
-
-ACTION(FireClose) {
- arg0->fireClose();
-}
-
-// Test move only types, among other things
-TEST(ChannelTest, RealHandlersCompile) {
- EventBase eb;
- auto socket = AsyncSocket::newSocket(&eb);
- // static
- {
- ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>,
- AsyncSocketHandler,
- OutputBufferingHandler>
- pipeline{AsyncSocketHandler(socket), OutputBufferingHandler()};
- EXPECT_TRUE(pipeline.getHandler<AsyncSocketHandler>(0));
- EXPECT_TRUE(pipeline.getHandler<OutputBufferingHandler>(1));
- }
- // dynamic
- {
- ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
- pipeline
- .addBack(AsyncSocketHandler(socket))
- .addBack(OutputBufferingHandler())
- .finalize();
- EXPECT_TRUE(pipeline.getHandler<AsyncSocketHandler>(0));
- EXPECT_TRUE(pipeline.getHandler<OutputBufferingHandler>(1));
- }
-}
-
-// Test that handlers correctly fire the next handler when directed
-TEST(ChannelTest, FireActions) {
- IntHandler handler1;
- IntHandler handler2;
-
- EXPECT_CALL(handler1, attachPipeline(_));
- EXPECT_CALL(handler2, attachPipeline(_));
-
- ChannelPipeline<int, int, IntHandlerPtr, IntHandlerPtr>
- pipeline(&handler1, &handler2);
-
- EXPECT_CALL(handler1, read_(_, _)).WillOnce(FireRead());
- EXPECT_CALL(handler2, read_(_, _)).Times(1);
- pipeline.read(1);
-
- EXPECT_CALL(handler1, readEOF(_)).WillOnce(FireReadEOF());
- EXPECT_CALL(handler2, readEOF(_)).Times(1);
- pipeline.readEOF();
-
- EXPECT_CALL(handler1, readException(_, _)).WillOnce(FireReadException());
- EXPECT_CALL(handler2, readException(_, _)).Times(1);
- pipeline.readException(make_exception_wrapper<std::runtime_error>("blah"));
-
- EXPECT_CALL(handler2, write_(_, _)).WillOnce(FireWrite());
- EXPECT_CALL(handler1, write_(_, _)).Times(1);
- EXPECT_NO_THROW(pipeline.write(1).value());
-
- EXPECT_CALL(handler2, close_(_)).WillOnce(FireClose());
- EXPECT_CALL(handler1, close_(_)).Times(1);
- EXPECT_NO_THROW(pipeline.close().value());
-
- EXPECT_CALL(handler1, detachPipeline(_));
- EXPECT_CALL(handler2, detachPipeline(_));
-}
-
-// Test that nothing bad happens when actions reach the end of the pipeline
-// (a warning will be logged, however)
-TEST(ChannelTest, ReachEndOfPipeline) {
- IntHandler handler;
- EXPECT_CALL(handler, attachPipeline(_));
- ChannelPipeline<int, int, IntHandlerPtr>
- pipeline(&handler);
-
- EXPECT_CALL(handler, read_(_, _)).WillOnce(FireRead());
- pipeline.read(1);
-
- EXPECT_CALL(handler, readEOF(_)).WillOnce(FireReadEOF());
- pipeline.readEOF();
-
- EXPECT_CALL(handler, readException(_, _)).WillOnce(FireReadException());
- pipeline.readException(make_exception_wrapper<std::runtime_error>("blah"));
-
- EXPECT_CALL(handler, write_(_, _)).WillOnce(FireWrite());
- EXPECT_NO_THROW(pipeline.write(1).value());
-
- EXPECT_CALL(handler, close_(_)).WillOnce(FireClose());
- EXPECT_NO_THROW(pipeline.close().value());
-
- EXPECT_CALL(handler, detachPipeline(_));
-}
-
-// Test having the last read handler turn around and write
-TEST(ChannelTest, TurnAround) {
- IntHandler handler1;
- IntHandler handler2;
-
- EXPECT_CALL(handler1, attachPipeline(_));
- EXPECT_CALL(handler2, attachPipeline(_));
-
- ChannelPipeline<int, int, IntHandlerPtr, IntHandlerPtr>
- pipeline(&handler1, &handler2);
-
- EXPECT_CALL(handler1, read_(_, _)).WillOnce(FireRead());
- EXPECT_CALL(handler2, read_(_, _)).WillOnce(FireWrite());
- EXPECT_CALL(handler1, write_(_, _)).Times(1);
- pipeline.read(1);
-
- EXPECT_CALL(handler1, detachPipeline(_));
- EXPECT_CALL(handler2, detachPipeline(_));
-}
-
-TEST(ChannelTest, DynamicFireActions) {
- IntHandler handler1, handler2, handler3;
- EXPECT_CALL(handler2, attachPipeline(_));
- ChannelPipeline<int, int, IntHandlerPtr>
- pipeline(&handler2);
-
- EXPECT_CALL(handler1, attachPipeline(_));
- EXPECT_CALL(handler3, attachPipeline(_));
-
- pipeline
- .addFront(IntHandlerPtr(&handler1))
- .addBack(IntHandlerPtr(&handler3))
- .finalize();
-
- EXPECT_TRUE(pipeline.getHandler<IntHandlerPtr>(0));
- EXPECT_TRUE(pipeline.getHandler<IntHandlerPtr>(1));
- EXPECT_TRUE(pipeline.getHandler<IntHandlerPtr>(2));
-
- EXPECT_CALL(handler1, read_(_, _)).WillOnce(FireRead());
- EXPECT_CALL(handler2, read_(_, _)).WillOnce(FireRead());
- EXPECT_CALL(handler3, read_(_, _)).Times(1);
- pipeline.read(1);
-
- EXPECT_CALL(handler3, write_(_, _)).WillOnce(FireWrite());
- EXPECT_CALL(handler2, write_(_, _)).WillOnce(FireWrite());
- EXPECT_CALL(handler1, write_(_, _)).Times(1);
- EXPECT_NO_THROW(pipeline.write(1).value());
-
- EXPECT_CALL(handler1, detachPipeline(_));
- EXPECT_CALL(handler2, detachPipeline(_));
- EXPECT_CALL(handler3, detachPipeline(_));
-}
-
-template <class Rin, class Rout = Rin, class Win = Rout, class Wout = Rin>
-class ConcreteChannelHandler : public ChannelHandler<Rin, Rout, Win, Wout> {
- typedef typename ChannelHandler<Rin, Rout, Win, Wout>::Context Context;
- public:
- void read(Context* ctx, Rin msg) {}
- Future<void> write(Context* ctx, Win msg) { return makeFuture(); }
-};
-
-typedef ChannelHandlerAdapter<std::string, std::string> StringHandler;
-typedef ConcreteChannelHandler<int, std::string> IntToStringHandler;
-typedef ConcreteChannelHandler<std::string, int> StringToIntHandler;
-
-TEST(ChannelPipeline, DynamicConstruction) {
- {
- ChannelPipeline<int, int> pipeline;
- EXPECT_THROW(
- pipeline
- .addBack(ChannelHandlerAdapter<std::string, std::string>{})
- .finalize(), std::invalid_argument);
- }
- {
- ChannelPipeline<int, int> pipeline;
- EXPECT_THROW(
- pipeline
- .addFront(ChannelHandlerAdapter<std::string, std::string>{})
- .finalize(),
- std::invalid_argument);
- }
- {
- ChannelPipeline<std::string, std::string, StringHandler, StringHandler>
- pipeline{StringHandler(), StringHandler()};
-
- // Exercise both addFront and addBack. Final pipeline is
- // StI <-> ItS <-> StS <-> StS <-> StI <-> ItS
- EXPECT_NO_THROW(
- pipeline
- .addFront(IntToStringHandler{})
- .addFront(StringToIntHandler{})
- .addBack(StringToIntHandler{})
- .addBack(IntToStringHandler{})
- .finalize());
- }
-}
-
-TEST(ChannelPipeline, AttachTransport) {
- IntHandler handler;
- EXPECT_CALL(handler, attachPipeline(_));
- ChannelPipeline<int, int, IntHandlerPtr>
- pipeline(&handler);
-
- EventBase eb;
- auto socket = AsyncSocket::newSocket(&eb);
-
- EXPECT_CALL(handler, attachTransport(_));
- pipeline.attachTransport(socket);
-
- EXPECT_CALL(handler, detachTransport(_));
- pipeline.detachTransport();
-
- EXPECT_CALL(handler, detachPipeline(_));
-}
+++ /dev/null
-/*
- * Copyright 2015 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <folly/wangle/channel/ChannelHandler.h>
-#include <gmock/gmock.h>
-
-namespace folly { namespace wangle {
-
-template <class Rin, class Rout = Rin, class Win = Rout, class Wout = Rin>
-class MockChannelHandler : public ChannelHandler<Rin, Rout, Win, Wout> {
- public:
- typedef typename ChannelHandler<Rin, Rout, Win, Wout>::Context Context;
-
- MockChannelHandler() = default;
- MockChannelHandler(MockChannelHandler&&) = default;
-
-#ifdef __clang__
-# pragma clang diagnostic push
-# if __clang_major__ > 3 || __clang_minor__ >= 6
-# pragma clang diagnostic ignored "-Winconsistent-missing-override"
-# endif
-#endif
-
- MOCK_METHOD2_T(read_, void(Context*, Rin&));
- MOCK_METHOD1_T(readEOF, void(Context*));
- MOCK_METHOD2_T(readException, void(Context*, exception_wrapper));
-
- MOCK_METHOD2_T(write_, void(Context*, Win&));
- MOCK_METHOD1_T(close_, void(Context*));
-
- MOCK_METHOD1_T(attachPipeline, void(Context*));
- MOCK_METHOD1_T(attachTransport, void(Context*));
- MOCK_METHOD1_T(detachPipeline, void(Context*));
- MOCK_METHOD1_T(detachTransport, void(Context*));
-
-#ifdef __clang__
-#pragma clang diagnostic pop
-#endif
-
- void read(Context* ctx, Rin msg) override {
- read_(ctx, msg);
- }
-
- Future<void> write(Context* ctx, Win msg) override {
- return makeFutureWith([&](){
- write_(ctx, msg);
- });
- }
-
- Future<void> close(Context* ctx) override {
- return makeFutureWith([&](){
- close_(ctx);
- });
- }
-};
-
-template <class R, class W = R>
-using MockChannelHandlerAdapter = MockChannelHandler<R, R, W, W>;
-
-}}
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/wangle/channel/Handler.h>
+#include <gmock/gmock.h>
+
+namespace folly { namespace wangle {
+
+template <class Rin, class Rout = Rin, class Win = Rout, class Wout = Rin>
+class MockHandler : public Handler<Rin, Rout, Win, Wout> {
+ public:
+ typedef typename Handler<Rin, Rout, Win, Wout>::Context Context;
+
+ MockHandler() = default;
+ MockHandler(MockHandler&&) = default;
+
+#ifdef __clang__
+# pragma clang diagnostic push
+# if __clang_major__ > 3 || __clang_minor__ >= 6
+# pragma clang diagnostic ignored "-Winconsistent-missing-override"
+# endif
+#endif
+
+ MOCK_METHOD2_T(read_, void(Context*, Rin&));
+ MOCK_METHOD1_T(readEOF, void(Context*));
+ MOCK_METHOD2_T(readException, void(Context*, exception_wrapper));
+
+ MOCK_METHOD2_T(write_, void(Context*, Win&));
+ MOCK_METHOD1_T(close_, void(Context*));
+
+ MOCK_METHOD1_T(attachPipeline, void(Context*));
+ MOCK_METHOD1_T(attachTransport, void(Context*));
+ MOCK_METHOD1_T(detachPipeline, void(Context*));
+ MOCK_METHOD1_T(detachTransport, void(Context*));
+
+#ifdef __clang__
+#pragma clang diagnostic pop
+#endif
+
+ void read(Context* ctx, Rin msg) override {
+ read_(ctx, msg);
+ }
+
+ Future<void> write(Context* ctx, Win msg) override {
+ return makeFutureWith([&](){
+ write_(ctx, msg);
+ });
+ }
+
+ Future<void> close(Context* ctx) override {
+ return makeFutureWith([&](){
+ close_(ctx);
+ });
+ }
+};
+
+template <class R, class W = R>
+using MockHandlerAdapter = MockHandler<R, R, W, W>;
+
+}}
* limitations under the License.
*/
-#include <folly/wangle/channel/ChannelPipeline.h>
+#include <folly/wangle/channel/Pipeline.h>
#include <folly/wangle/channel/OutputBufferingHandler.h>
-#include <folly/wangle/channel/test/MockChannelHandler.h>
+#include <folly/wangle/channel/test/MockHandler.h>
#include <folly/io/async/AsyncSocket.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
using namespace folly::wangle;
using namespace testing;
-typedef StrictMock<MockChannelHandlerAdapter<
+typedef StrictMock<MockHandlerAdapter<
IOBufQueue&,
std::unique_ptr<IOBuf>>>
-MockHandler;
+MockBytesHandler;
MATCHER_P(IOBufContains, str, "") { return arg->moveToFbString() == str; }
TEST(OutputBufferingHandlerTest, Basic) {
- MockHandler mockHandler;
+ MockBytesHandler mockHandler;
EXPECT_CALL(mockHandler, attachPipeline(_));
- ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>,
- ChannelHandlerPtr<MockHandler, false>,
+ Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>,
+ HandlerPtr<MockBytesHandler, false>,
OutputBufferingHandler>
pipeline(&mockHandler, OutputBufferingHandler{});
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/wangle/channel/Handler.h>
+#include <folly/wangle/channel/Pipeline.h>
+#include <folly/wangle/channel/AsyncSocketHandler.h>
+#include <folly/wangle/channel/OutputBufferingHandler.h>
+#include <folly/wangle/channel/test/MockHandler.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+using namespace folly;
+using namespace folly::wangle;
+using namespace testing;
+
+typedef StrictMock<MockHandlerAdapter<int, int>> IntHandler;
+typedef HandlerPtr<IntHandler, false> IntHandlerPtr;
+
+ACTION(FireRead) {
+ arg0->fireRead(arg1);
+}
+
+ACTION(FireReadEOF) {
+ arg0->fireReadEOF();
+}
+
+ACTION(FireReadException) {
+ arg0->fireReadException(arg1);
+}
+
+ACTION(FireWrite) {
+ arg0->fireWrite(arg1);
+}
+
+ACTION(FireClose) {
+ arg0->fireClose();
+}
+
+// Test move only types, among other things
+TEST(PipelineTest, RealHandlersCompile) {
+ EventBase eb;
+ auto socket = AsyncSocket::newSocket(&eb);
+ // static
+ {
+ Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>,
+ AsyncSocketHandler,
+ OutputBufferingHandler>
+ pipeline{AsyncSocketHandler(socket), OutputBufferingHandler()};
+ EXPECT_TRUE(pipeline.getHandler<AsyncSocketHandler>(0));
+ EXPECT_TRUE(pipeline.getHandler<OutputBufferingHandler>(1));
+ }
+ // dynamic
+ {
+ Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+ pipeline
+ .addBack(AsyncSocketHandler(socket))
+ .addBack(OutputBufferingHandler())
+ .finalize();
+ EXPECT_TRUE(pipeline.getHandler<AsyncSocketHandler>(0));
+ EXPECT_TRUE(pipeline.getHandler<OutputBufferingHandler>(1));
+ }
+}
+
+// Test that handlers correctly fire the next handler when directed
+TEST(PipelineTest, FireActions) {
+ IntHandler handler1;
+ IntHandler handler2;
+
+ EXPECT_CALL(handler1, attachPipeline(_));
+ EXPECT_CALL(handler2, attachPipeline(_));
+
+ Pipeline<int, int, IntHandlerPtr, IntHandlerPtr>
+ pipeline(&handler1, &handler2);
+
+ EXPECT_CALL(handler1, read_(_, _)).WillOnce(FireRead());
+ EXPECT_CALL(handler2, read_(_, _)).Times(1);
+ pipeline.read(1);
+
+ EXPECT_CALL(handler1, readEOF(_)).WillOnce(FireReadEOF());
+ EXPECT_CALL(handler2, readEOF(_)).Times(1);
+ pipeline.readEOF();
+
+ EXPECT_CALL(handler1, readException(_, _)).WillOnce(FireReadException());
+ EXPECT_CALL(handler2, readException(_, _)).Times(1);
+ pipeline.readException(make_exception_wrapper<std::runtime_error>("blah"));
+
+ EXPECT_CALL(handler2, write_(_, _)).WillOnce(FireWrite());
+ EXPECT_CALL(handler1, write_(_, _)).Times(1);
+ EXPECT_NO_THROW(pipeline.write(1).value());
+
+ EXPECT_CALL(handler2, close_(_)).WillOnce(FireClose());
+ EXPECT_CALL(handler1, close_(_)).Times(1);
+ EXPECT_NO_THROW(pipeline.close().value());
+
+ EXPECT_CALL(handler1, detachPipeline(_));
+ EXPECT_CALL(handler2, detachPipeline(_));
+}
+
+// Test that nothing bad happens when actions reach the end of the pipeline
+// (a warning will be logged, however)
+TEST(PipelineTest, ReachEndOfPipeline) {
+ IntHandler handler;
+ EXPECT_CALL(handler, attachPipeline(_));
+ Pipeline<int, int, IntHandlerPtr>
+ pipeline(&handler);
+
+ EXPECT_CALL(handler, read_(_, _)).WillOnce(FireRead());
+ pipeline.read(1);
+
+ EXPECT_CALL(handler, readEOF(_)).WillOnce(FireReadEOF());
+ pipeline.readEOF();
+
+ EXPECT_CALL(handler, readException(_, _)).WillOnce(FireReadException());
+ pipeline.readException(make_exception_wrapper<std::runtime_error>("blah"));
+
+ EXPECT_CALL(handler, write_(_, _)).WillOnce(FireWrite());
+ EXPECT_NO_THROW(pipeline.write(1).value());
+
+ EXPECT_CALL(handler, close_(_)).WillOnce(FireClose());
+ EXPECT_NO_THROW(pipeline.close().value());
+
+ EXPECT_CALL(handler, detachPipeline(_));
+}
+
+// Test having the last read handler turn around and write
+TEST(PipelineTest, TurnAround) {
+ IntHandler handler1;
+ IntHandler handler2;
+
+ EXPECT_CALL(handler1, attachPipeline(_));
+ EXPECT_CALL(handler2, attachPipeline(_));
+
+ Pipeline<int, int, IntHandlerPtr, IntHandlerPtr>
+ pipeline(&handler1, &handler2);
+
+ EXPECT_CALL(handler1, read_(_, _)).WillOnce(FireRead());
+ EXPECT_CALL(handler2, read_(_, _)).WillOnce(FireWrite());
+ EXPECT_CALL(handler1, write_(_, _)).Times(1);
+ pipeline.read(1);
+
+ EXPECT_CALL(handler1, detachPipeline(_));
+ EXPECT_CALL(handler2, detachPipeline(_));
+}
+
+TEST(PipelineTest, DynamicFireActions) {
+ IntHandler handler1, handler2, handler3;
+ EXPECT_CALL(handler2, attachPipeline(_));
+ Pipeline<int, int, IntHandlerPtr>
+ pipeline(&handler2);
+
+ EXPECT_CALL(handler1, attachPipeline(_));
+ EXPECT_CALL(handler3, attachPipeline(_));
+
+ pipeline
+ .addFront(IntHandlerPtr(&handler1))
+ .addBack(IntHandlerPtr(&handler3))
+ .finalize();
+
+ EXPECT_TRUE(pipeline.getHandler<IntHandlerPtr>(0));
+ EXPECT_TRUE(pipeline.getHandler<IntHandlerPtr>(1));
+ EXPECT_TRUE(pipeline.getHandler<IntHandlerPtr>(2));
+
+ EXPECT_CALL(handler1, read_(_, _)).WillOnce(FireRead());
+ EXPECT_CALL(handler2, read_(_, _)).WillOnce(FireRead());
+ EXPECT_CALL(handler3, read_(_, _)).Times(1);
+ pipeline.read(1);
+
+ EXPECT_CALL(handler3, write_(_, _)).WillOnce(FireWrite());
+ EXPECT_CALL(handler2, write_(_, _)).WillOnce(FireWrite());
+ EXPECT_CALL(handler1, write_(_, _)).Times(1);
+ EXPECT_NO_THROW(pipeline.write(1).value());
+
+ EXPECT_CALL(handler1, detachPipeline(_));
+ EXPECT_CALL(handler2, detachPipeline(_));
+ EXPECT_CALL(handler3, detachPipeline(_));
+}
+
+template <class Rin, class Rout = Rin, class Win = Rout, class Wout = Rin>
+class ConcreteHandler : public Handler<Rin, Rout, Win, Wout> {
+ typedef typename Handler<Rin, Rout, Win, Wout>::Context Context;
+ public:
+ void read(Context* ctx, Rin msg) {}
+ Future<void> write(Context* ctx, Win msg) { return makeFuture(); }
+};
+
+typedef HandlerAdapter<std::string, std::string> StringHandler;
+typedef ConcreteHandler<int, std::string> IntToStringHandler;
+typedef ConcreteHandler<std::string, int> StringToIntHandler;
+
+TEST(Pipeline, DynamicConstruction) {
+ {
+ Pipeline<int, int> pipeline;
+ EXPECT_THROW(
+ pipeline
+ .addBack(HandlerAdapter<std::string, std::string>{})
+ .finalize(), std::invalid_argument);
+ }
+ {
+ Pipeline<int, int> pipeline;
+ EXPECT_THROW(
+ pipeline
+ .addFront(HandlerAdapter<std::string, std::string>{})
+ .finalize(),
+ std::invalid_argument);
+ }
+ {
+ Pipeline<std::string, std::string, StringHandler, StringHandler>
+ pipeline{StringHandler(), StringHandler()};
+
+ // Exercise both addFront and addBack. Final pipeline is
+ // StI <-> ItS <-> StS <-> StS <-> StI <-> ItS
+ EXPECT_NO_THROW(
+ pipeline
+ .addFront(IntToStringHandler{})
+ .addFront(StringToIntHandler{})
+ .addBack(StringToIntHandler{})
+ .addBack(IntToStringHandler{})
+ .finalize());
+ }
+}
+
+TEST(Pipeline, AttachTransport) {
+ IntHandler handler;
+ EXPECT_CALL(handler, attachPipeline(_));
+ Pipeline<int, int, IntHandlerPtr>
+ pipeline(&handler);
+
+ EventBase eb;
+ auto socket = AsyncSocket::newSocket(&eb);
+
+ EXPECT_CALL(handler, attachTransport(_));
+ pipeline.attachTransport(socket);
+
+ EXPECT_CALL(handler, detachTransport(_));
+ pipeline.detachTransport();
+
+ EXPECT_CALL(handler, detachPipeline(_));
+}
*/
#pragma once
-#include <folly/wangle/channel/ChannelHandler.h>
+#include <folly/wangle/channel/Handler.h>
namespace folly { namespace wangle {
/**
- * A ChannelHandler which decodes bytes in a stream-like fashion from
+ * A Handler which decodes bytes in a stream-like fashion from
* IOBufQueue to a Message type.
*
* Frame detection
};
TEST(FixedLengthFrameDecoder, FailWhenLengthFieldEndOffset) {
- ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+ Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
int called = 0;
pipeline
}
TEST(LengthFieldFramePipeline, SimpleTest) {
- ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+ Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
int called = 0;
pipeline
}
TEST(LengthFieldFramePipeline, LittleEndian) {
- ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+ Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
int called = 0;
pipeline
}
TEST(LengthFieldFrameDecoder, Simple) {
- ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+ Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
int called = 0;
pipeline
}
TEST(LengthFieldFrameDecoder, NoStrip) {
- ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+ Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
int called = 0;
pipeline
}
TEST(LengthFieldFrameDecoder, Adjustment) {
- ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+ Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
int called = 0;
pipeline
}
TEST(LengthFieldFrameDecoder, PreHeader) {
- ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+ Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
int called = 0;
pipeline
}
TEST(LengthFieldFrameDecoder, PostHeader) {
- ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+ Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
int called = 0;
pipeline
}
TEST(LengthFieldFrameDecoderStrip, PrePostHeader) {
- ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+ Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
int called = 0;
pipeline
}
TEST(LengthFieldFrameDecoder, StripPrePostHeaderFrameInclHeader) {
- ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+ Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
int called = 0;
pipeline
}
TEST(LengthFieldFrameDecoder, FailTestLengthFieldEndOffset) {
- ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+ Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
int called = 0;
pipeline
}
TEST(LengthFieldFrameDecoder, FailTestLengthFieldFrameSize) {
- ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+ Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
int called = 0;
pipeline
}
TEST(LengthFieldFrameDecoder, FailTestLengthFieldInitialBytes) {
- ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+ Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
int called = 0;
pipeline
}
TEST(LineBasedFrameDecoder, Simple) {
- ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+ Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
int called = 0;
pipeline
}
TEST(LineBasedFrameDecoder, SaveDelimiter) {
- ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+ Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
int called = 0;
pipeline
}
TEST(LineBasedFrameDecoder, Fail) {
- ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+ Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
int called = 0;
pipeline
}
TEST(LineBasedFrameDecoder, NewLineOnly) {
- ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+ Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
int called = 0;
pipeline
}
TEST(LineBasedFrameDecoder, CarriageNewLineOnly) {
- ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+ Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
int called = 0;
pipeline
#pragma once
-#include <folly/wangle/channel/ChannelHandler.h>
+#include <folly/wangle/channel/Handler.h>
namespace folly { namespace wangle {
/*
* StringCodec converts a pipeline from IOBufs to std::strings.
*/
-class StringCodec : public ChannelHandler<IOBufQueue&, std::string,
- std::string, std::unique_ptr<IOBuf>> {
+class StringCodec : public Handler<IOBufQueue&, std::string,
+ std::string, std::unique_ptr<IOBuf>> {
public:
- typedef typename ChannelHandler<
+ typedef typename Handler<
IOBufQueue&, std::string,
std::string, std::unique_ptr<IOBuf>>::Context Context;
*/
#pragma once
-#include <folly/wangle/channel/ChannelHandler.h>
+#include <folly/wangle/channel/Handler.h>
#include <folly/wangle/service/Service.h>
namespace folly { namespace wangle {
* only one request is allowed at a time.
*/
template <typename Pipeline, typename Req, typename Resp = Req>
-class SerialClientDispatcher : public ChannelHandlerAdapter<Req, Resp>
+class SerialClientDispatcher : public HandlerAdapter<Req, Resp>
, public Service<Req, Resp> {
public:
- typedef typename ChannelHandlerAdapter<Req, Resp>::Context Context;
+ typedef typename HandlerAdapter<Req, Resp>::Context Context;
void setPipeline(Pipeline* pipeline) {
pipeline_ = pipeline;
pipeline->addBack(
- ChannelHandlerPtr<SerialClientDispatcher<Pipeline, Req, Resp>, false>(
+ HandlerPtr<SerialClientDispatcher<Pipeline, Req, Resp>, false>(
this));
pipeline->finalize();
}
*/
#pragma once
-#include <folly/wangle/channel/ChannelHandler.h>
+#include <folly/wangle/channel/Handler.h>
#include <folly/wangle/service/Service.h>
namespace folly { namespace wangle {
* Concurrent requests are queued in the pipeline.
*/
template <typename Req, typename Resp = Req>
-class SerialServerDispatcher : public ChannelHandlerAdapter<Req, Resp> {
+class SerialServerDispatcher : public HandlerAdapter<Req, Resp> {
public:
- typedef typename ChannelHandlerAdapter<Req, Resp>::Context Context;
+ typedef typename HandlerAdapter<Req, Resp>::Context Context;
explicit SerialServerDispatcher(Service<Req, Resp>* service)
: service_(service) {}
#include <folly/wangle/bootstrap/ServerBootstrap.h>
#include <folly/wangle/bootstrap/ClientBootstrap.h>
-#include <folly/wangle/channel/ChannelPipeline.h>
+#include <folly/wangle/channel/Pipeline.h>
#include <folly/wangle/channel/AsyncSocketHandler.h>
namespace folly {
using namespace wangle;
-typedef ChannelPipeline<IOBufQueue&, std::string> Pipeline;
+typedef Pipeline<IOBufQueue&, std::string> ServicePipeline;
class EchoService : public Service<std::string, std::string> {
public:
template <typename Req, typename Resp>
class ServerPipelineFactory
- : public PipelineFactory<Pipeline> {
+ : public PipelineFactory<ServicePipeline> {
public:
- Pipeline* newPipeline(
+ ServicePipeline* newPipeline(
std::shared_ptr<AsyncSocket> socket) override {
- auto pipeline = new Pipeline();
+ auto pipeline = new ServicePipeline();
pipeline->addBack(AsyncSocketHandler(socket));
pipeline->addBack(StringCodec());
pipeline->addBack(SerialServerDispatcher<Req, Resp>(&service_));
};
template <typename Req, typename Resp>
-class ClientPipelineFactory : public PipelineFactory<Pipeline> {
+class ClientPipelineFactory : public PipelineFactory<ServicePipeline> {
public:
- Pipeline* newPipeline(
+ ServicePipeline* newPipeline(
std::shared_ptr<AsyncSocket> socket) override {
- auto pipeline = new Pipeline();
+ auto pipeline = new ServicePipeline();
pipeline->addBack(AsyncSocketHandler(socket));
pipeline->addBack(StringCodec());
pipeline->template getHandler<AsyncSocketHandler>(0)->attachReadCallback();
int port = 1234;
// server
- ServerBootstrap<Pipeline> server;
+ ServerBootstrap<ServicePipeline> server;
server.childPipeline(
std::make_shared<ServerPipelineFactory<std::string, std::string>>());
server.bind(port);
// client
- auto client = std::make_shared<ClientBootstrap<Pipeline>>();
- ClientServiceFactory<Pipeline, std::string, std::string> serviceFactory;
+ auto client = std::make_shared<ClientBootstrap<ServicePipeline>>();
+ ClientServiceFactory<ServicePipeline, std::string, std::string> serviceFactory;
client->pipelineFactory(
std::make_shared<ClientPipelineFactory<std::string, std::string>>());
SocketAddress addr("127.0.0.1", port);