--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/wangle/Future.h>
+#include <folly/ExceptionWrapper.h>
+
+namespace folly { namespace wangle {
+
+template <class In, class Out>
+class ChannelHandlerContext {
+ public:
+ virtual ~ChannelHandlerContext() {}
+
+ virtual void fireRead(In msg) = 0;
+ virtual void fireReadEOF() = 0;
+ virtual void fireReadException(exception_wrapper e) = 0;
+
+ virtual Future<void> fireWrite(Out msg) = 0;
+ virtual Future<void> fireClose() = 0;
+
+ virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
+
+ virtual void setWriteFlags(WriteFlags flags) = 0;
+ virtual WriteFlags getWriteFlags() = 0;
+
+ virtual void setReadBufferSettings(
+ uint64_t minAvailable,
+ uint64_t allocationSize) = 0;
+ virtual std::pair<uint64_t, uint64_t> getReadBufferSettings() = 0;
+
+ /* TODO
+ template <class H>
+ virtual void addHandlerBefore(H&&) {}
+ template <class H>
+ virtual void addHandlerAfter(H&&) {}
+ template <class H>
+ virtual void replaceHandler(H&&) {}
+ virtual void removeHandler() {}
+ */
+};
+
+class PipelineContext {
+ public:
+ virtual ~PipelineContext() {}
+
+ virtual void attachTransport() = 0;
+ virtual void detachTransport() = 0;
+
+ void link(PipelineContext* other) {
+ setNextIn(other);
+ other->setNextOut(this);
+ }
+
+ protected:
+ virtual void setNextIn(PipelineContext* ctx) = 0;
+ virtual void setNextOut(PipelineContext* ctx) = 0;
+};
+
+template <class In>
+class InboundChannelHandlerContext {
+ public:
+ virtual ~InboundChannelHandlerContext() {}
+ virtual void read(In msg) = 0;
+ virtual void readEOF() = 0;
+ virtual void readException(exception_wrapper e) = 0;
+};
+
+template <class Out>
+class OutboundChannelHandlerContext {
+ public:
+ virtual ~OutboundChannelHandlerContext() {}
+ virtual Future<void> write(Out msg) = 0;
+ virtual Future<void> close() = 0;
+};
+
+template <class P, class H>
+class ContextImpl : public ChannelHandlerContext<typename H::rout,
+ typename H::wout>,
+ public InboundChannelHandlerContext<typename H::rin>,
+ public OutboundChannelHandlerContext<typename H::win>,
+ public PipelineContext {
+ public:
+ typedef typename H::rin Rin;
+ typedef typename H::rout Rout;
+ typedef typename H::win Win;
+ typedef typename H::wout Wout;
+
+ template <class HandlerArg>
+ explicit ContextImpl(P* pipeline, HandlerArg&& handlerArg)
+ : pipeline_(pipeline),
+ handler_(std::forward<HandlerArg>(handlerArg)) {
+ handler_.attachPipeline(this);
+ }
+
+ ~ContextImpl() {
+ handler_.detachPipeline(this);
+ }
+
+ H* getHandler() {
+ return &handler_;
+ }
+
+ // PipelineContext overrides
+ void setNextIn(PipelineContext* ctx) override {
+ auto nextIn = dynamic_cast<InboundChannelHandlerContext<Rout>*>(ctx);
+ if (nextIn) {
+ nextIn_ = nextIn;
+ } else {
+ throw std::invalid_argument("wrong type in setNextIn");
+ }
+ }
+
+ void setNextOut(PipelineContext* ctx) override {
+ auto nextOut = dynamic_cast<OutboundChannelHandlerContext<Wout>*>(ctx);
+ if (nextOut) {
+ nextOut_ = nextOut;
+ } else {
+ throw std::invalid_argument("wrong type in setNextOut");
+ }
+ }
+
+ void attachTransport() override {
+ typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+ handler_.attachTransport(this);
+ }
+
+ void detachTransport() override {
+ typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+ handler_.detachTransport(this);
+ }
+
+ // ChannelHandlerContext overrides
+ void fireRead(Rout msg) override {
+ typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+ if (nextIn_) {
+ nextIn_->read(std::forward<Rout>(msg));
+ } else {
+ LOG(WARNING) << "read reached end of pipeline";
+ }
+ }
+
+ void fireReadEOF() override {
+ typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+ if (nextIn_) {
+ nextIn_->readEOF();
+ } else {
+ LOG(WARNING) << "readEOF reached end of pipeline";
+ }
+ }
+
+ void fireReadException(exception_wrapper e) override {
+ typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+ if (nextIn_) {
+ nextIn_->readException(std::move(e));
+ } else {
+ LOG(WARNING) << "readException reached end of pipeline";
+ }
+ }
+
+ Future<void> fireWrite(Wout msg) override {
+ typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+ if (nextOut_) {
+ return nextOut_->write(std::forward<Wout>(msg));
+ } else {
+ LOG(WARNING) << "write reached end of pipeline";
+ return makeFuture();
+ }
+ }
+
+ Future<void> fireClose() override {
+ typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+ if (nextOut_) {
+ return nextOut_->close();
+ } else {
+ LOG(WARNING) << "close reached end of pipeline";
+ return makeFuture();
+ }
+ }
+
+ std::shared_ptr<AsyncTransport> getTransport() override {
+ return pipeline_->getTransport();
+ }
+
+ void setWriteFlags(WriteFlags flags) override {
+ pipeline_->setWriteFlags(flags);
+ }
+
+ WriteFlags getWriteFlags() override {
+ return pipeline_->getWriteFlags();
+ }
+
+ void setReadBufferSettings(
+ uint64_t minAvailable,
+ uint64_t allocationSize) override {
+ pipeline_->setReadBufferSettings(minAvailable, allocationSize);
+ }
+
+ std::pair<uint64_t, uint64_t> getReadBufferSettings() override {
+ return pipeline_->getReadBufferSettings();
+ }
+
+ // InboundChannelHandlerContext overrides
+ void read(Rin msg) override {
+ typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+ handler_.read(this, std::forward<Rin>(msg));
+ }
+
+ void readEOF() override {
+ typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+ handler_.readEOF(this);
+ }
+
+ void readException(exception_wrapper e) override {
+ typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+ handler_.readException(this, std::move(e));
+ }
+
+ // OutboundChannelHandlerContext overrides
+ Future<void> write(Win msg) override {
+ typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+ return handler_.write(this, std::forward<Win>(msg));
+ }
+
+ Future<void> close() override {
+ typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+ return handler_.close(this);
+ }
+
+ private:
+ P* pipeline_;
+ H handler_;
+ InboundChannelHandlerContext<Rout>* nextIn_{nullptr};
+ OutboundChannelHandlerContext<Wout>* nextOut_{nullptr};
+};
+
+}}
#pragma once
+#include <folly/experimental/wangle/channel/ChannelHandlerContext.h>
#include <folly/wangle/Future.h>
+#include <folly/io/async/AsyncTransport.h>
#include <folly/io/async/DelayedDestruction.h>
#include <folly/ExceptionWrapper.h>
+#include <folly/Memory.h>
#include <glog/logging.h>
-#include <thrift/lib/cpp/async/TAsyncTransport.h>
namespace folly { namespace wangle {
-template <class In, class Out>
-class ChannelHandlerContext {
- public:
- virtual ~ChannelHandlerContext() {}
-
- virtual void fireRead(In msg) = 0;
- virtual void fireReadEOF() = 0;
- virtual void fireReadException(exception_wrapper e) = 0;
-
- virtual Future<void> fireWrite(Out msg) = 0;
- virtual Future<void> fireClose() = 0;
-
- virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
-
- virtual void setWriteFlags(WriteFlags flags) = 0;
- virtual WriteFlags getWriteFlags() = 0;
-
- virtual void setReadBufferSettings(
- uint64_t minAvailable,
- uint64_t allocationSize) = 0;
- virtual std::pair<uint64_t, uint64_t> getReadBufferSettings() = 0;
-};
+template <class R, class W, class... Handlers>
+class ChannelPipeline;
-template <class Out>
-class OutboundChannelHandlerContext {
+template <class R, class W>
+class ChannelPipeline<R, W> : public DelayedDestruction {
public:
- virtual ~OutboundChannelHandlerContext() {}
- virtual Future<void> write(Out msg) = 0;
- virtual Future<void> close() = 0;
-};
+ ChannelPipeline() {}
+ ~ChannelPipeline() {}
-template <class... Handlers>
-class ChannelPipeline;
+ std::shared_ptr<AsyncTransport> getTransport() {
+ return transport_;
+ }
-template <>
-class ChannelPipeline<> : public DelayedDestruction {
- public:
void setWriteFlags(WriteFlags flags) {
writeFlags_ = flags;
}
return readBufferSettings_;
}
- protected:
- static const bool is_end{true};
- typedef void LastHandler;
- typedef void OutboundContext;
-
- std::shared_ptr<AsyncTransport> transport_;
- WriteFlags writeFlags_{WriteFlags::NONE};
- std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
-
- ~ChannelPipeline() {}
-
- template <class T>
- void read(T&& msg) {
- LOG(FATAL) << "impossibru";
+ void read(R msg) {
+ front_->read(std::forward<R>(msg));
}
void readEOF() {
- LOG(FATAL) << "impossibru";
+ front_->readEOF();
}
void readException(exception_wrapper e) {
- LOG(FATAL) << "impossibru";
+ front_->readException(std::move(e));
}
- template <class T>
- Future<void> write(T&& msg) {
- LOG(FATAL) << "impossibru";
- return makeFuture();
+ Future<void> write(W msg) {
+ return back_->write(std::forward<W>(msg));
}
Future<void> close() {
- LOG(FATAL) << "impossibru";
- return makeFuture();
+ return back_->close();
}
+ template <class H>
+ ChannelPipeline& addBack(H&& handler) {
+ ctxs_.push_back(folly::make_unique<ContextImpl<ChannelPipeline, H>>(
+ this, std::forward<H>(handler)));
+ return *this;
+ }
+
+ template <class H>
+ ChannelPipeline& addFront(H&& handler) {
+ ctxs_.insert(0, folly::make_unique<ContextImpl<ChannelPipeline, H>>(
+ this, std::forward<H>(handler)));
+ return *this;
+ }
+
+ template <class H>
+ H* getHandler(int i) {
+ auto ctx = dynamic_cast<ContextImpl<ChannelPipeline, H>*>(ctxs_[i].get());
+ CHECK(ctx);
+ return ctx->getHandler();
+ }
+
+ void finalize() {
+ finalizeHelper();
+ InboundChannelHandlerContext<R>* front;
+ front_ = dynamic_cast<InboundChannelHandlerContext<R>*>(
+ ctxs_.front().get());
+ if (!front_) {
+ throw std::invalid_argument("wrong type for first handler");
+ }
+ }
+
+ protected:
+ explicit ChannelPipeline(bool shouldFinalize) {
+ CHECK(!shouldFinalize);
+ }
+
+ void finalizeHelper() {
+ if (ctxs_.empty()) {
+ return;
+ }
+
+ for (int i = 0; i < ctxs_.size() - 1; i++) {
+ ctxs_[i]->link(ctxs_[i+1].get());
+ }
+
+ back_ = dynamic_cast<OutboundChannelHandlerContext<W>*>(ctxs_.back().get());
+ if (!back_) {
+ throw std::invalid_argument("wrong type for last handler");
+ }
+ }
+
+ PipelineContext* getLocalFront() {
+ return ctxs_.empty() ? nullptr : ctxs_.front().get();
+ }
+
+ static const bool is_end{true};
+
+ std::shared_ptr<AsyncTransport> transport_;
+ WriteFlags writeFlags_{WriteFlags::NONE};
+ std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
+
void attachPipeline() {}
void attachTransport(
transport_ = nullptr;
}
- template <class T>
- void setOutboundContext(T ctx) {}
+ OutboundChannelHandlerContext<W>* back_{nullptr};
- template <class H>
- H* getHandler(size_t i) {
- LOG(FATAL) << "impossibru";
- }
+ private:
+ InboundChannelHandlerContext<R>* front_{nullptr};
+ std::vector<std::unique_ptr<PipelineContext>> ctxs_;
};
-template <class Handler, class... Handlers>
-class ChannelPipeline<Handler, Handlers...>
- : public ChannelPipeline<Handlers...> {
+template <class R, class W, class Handler, class... Handlers>
+class ChannelPipeline<R, W, Handler, Handlers...>
+ : public ChannelPipeline<R, W, Handlers...> {
protected:
- typedef typename std::conditional<
- ChannelPipeline<Handlers...>::is_end,
- Handler,
- typename ChannelPipeline<Handlers...>::LastHandler>::type
- LastHandler;
-
- public:
template <class HandlerArg, class... HandlersArgs>
- ChannelPipeline(HandlerArg&& handlerArg, HandlersArgs&&... handlersArgs)
- : ChannelPipeline<Handlers...>(std::forward<HandlersArgs>(handlersArgs)...),
- handler_(std::forward<HandlerArg>(handlerArg)),
- ctx_(this) {
- handler_.attachPipeline(&ctx_);
- ChannelPipeline<Handlers...>::setOutboundContext(&ctx_);
+ ChannelPipeline(
+ bool shouldFinalize,
+ HandlerArg&& handlerArg,
+ HandlersArgs&&... handlersArgs)
+ : ChannelPipeline<R, W, Handlers...>(
+ false,
+ std::forward<HandlersArgs>(handlersArgs)...),
+ ctx_(this, std::forward<HandlerArg>(handlerArg)) {
+ if (shouldFinalize) {
+ finalize();
+ }
}
+ public:
+ template <class... HandlersArgs>
+ explicit ChannelPipeline(HandlersArgs&&... handlersArgs)
+ : ChannelPipeline(true, std::forward<HandlersArgs>(handlersArgs)...) {}
~ChannelPipeline() {}
- void destroy() override {
- handler_.detachPipeline(&ctx_);
- }
+ void destroy() override { }
- void read(typename Handler::rin msg) {
- ChannelPipeline<>::DestructorGuard dg(
+ void read(R msg) {
+ typename ChannelPipeline<R, W>::DestructorGuard dg(
static_cast<DelayedDestruction*>(this));
- handler_.read(&ctx_, std::forward<typename Handler::rin>(msg));
+ front_->read(std::forward<R>(msg));
}
void readEOF() {
- ChannelPipeline<>::DestructorGuard dg(
+ typename ChannelPipeline<R, W>::DestructorGuard dg(
static_cast<DelayedDestruction*>(this));
- handler_.readEOF(&ctx_);
+ front_->readEOF();
}
void readException(exception_wrapper e) {
- ChannelPipeline<>::DestructorGuard dg(
+ typename ChannelPipeline<R, W>::DestructorGuard dg(
static_cast<DelayedDestruction*>(this));
- handler_.readException(&ctx_, std::move(e));
+ front_->readEOF(std::move(e));
}
- Future<void> write(typename LastHandler::win msg) {
- ChannelPipeline<>::DestructorGuard dg(
+ Future<void> write(W msg) {
+ typename ChannelPipeline<R, W>::DestructorGuard dg(
static_cast<DelayedDestruction*>(this));
- return ChannelPipeline<LastHandler>::writeHere(
- std::forward<typename LastHandler::win>(msg));
+ return back_->write(std::forward<W>(msg));
}
Future<void> close() {
- ChannelPipeline<>::DestructorGuard dg(
+ typename ChannelPipeline<R, W>::DestructorGuard dg(
static_cast<DelayedDestruction*>(this));
- return ChannelPipeline<LastHandler>::closeHere();
+ return back_->close();
}
void attachTransport(
std::shared_ptr<AsyncTransport> transport) {
- ChannelPipeline<>::DestructorGuard dg(
+ typename ChannelPipeline<R, W>::DestructorGuard dg(
static_cast<DelayedDestruction*>(this));
- CHECK(!ChannelPipeline<>::transport_);
- ChannelPipeline<Handlers...>::attachTransport(std::move(transport));
- handler_.attachTransport(&ctx_);
+ CHECK((!ChannelPipeline<R, W>::transport_));
+ ChannelPipeline<R, W, Handlers...>::attachTransport(std::move(transport));
+ forEachCtx([&](PipelineContext* ctx){
+ ctx->attachTransport();
+ });
}
void detachTransport() {
- ChannelPipeline<>::DestructorGuard dg(
+ typename ChannelPipeline<R, W>::DestructorGuard dg(
static_cast<DelayedDestruction*>(this));
- ChannelPipeline<Handlers...>::detachTransport();
- handler_.detachTransport(&ctx_);
+ ChannelPipeline<R, W, Handlers...>::detachTransport();
+ forEachCtx([&](PipelineContext* ctx){
+ ctx->detachTransport();
+ });
}
std::shared_ptr<AsyncTransport> getTransport() {
- return ChannelPipeline<>::transport_;
+ return ChannelPipeline<R, W>::transport_;
}
template <class H>
- H* getHandler(size_t i) {
- if (i == 0) {
- auto ptr = dynamic_cast<H*>(&handler_);
- CHECK(ptr);
- return ptr;
- } else {
- return ChannelPipeline<Handlers...>::template getHandler<H>(i-1);
- }
- }
-
- protected:
- static const bool is_end{false};
-
- typedef OutboundChannelHandlerContext<typename Handler::wout> OutboundContext;
-
- void setOutboundContext(OutboundContext* ctx) {
- outboundCtx_ = ctx;
- }
-
- Future<void> writeHere(typename Handler::win msg) {
- return handler_.write(&ctx_, std::forward<typename Handler::win>(msg));
+ ChannelPipeline& addBack(H&& handler) {
+ ChannelPipeline<R, W>::addBack(std::move(handler));
+ return *this;
}
- Future<void> closeHere() {
- return handler_.close(&ctx_);
+ template <class H>
+ ChannelPipeline& addFront(H&& handler) {
+ ctxs_.insert(0, folly::make_unique<ContextImpl<ChannelPipeline, H>>(
+ this, std::move(handler)));
+ return *this;
}
- private:
- class Context
- : public ChannelHandlerContext<typename Handler::rout,
- typename Handler::wout>,
- public OutboundChannelHandlerContext<typename Handler::win> {
- public:
- explicit Context(ChannelPipeline* pipeline) : pipeline_(pipeline) {}
- ChannelPipeline* pipeline_;
-
- void fireRead(typename Handler::rout msg) override {
- ChannelPipeline<>::DestructorGuard dg(pipeline_);
- pipeline_->fireRead(std::forward<typename Handler::rout>(msg));
- }
-
- void fireReadEOF() override {
- ChannelPipeline<>::DestructorGuard dg(pipeline_);
- return pipeline_->fireReadEOF();
- }
-
- void fireReadException(exception_wrapper e) override {
- ChannelPipeline<>::DestructorGuard dg(pipeline_);
- return pipeline_->fireReadException(std::move(e));
- }
-
- Future<void> fireWrite(typename Handler::wout msg) override {
- ChannelPipeline<>::DestructorGuard dg(pipeline_);
- return pipeline_->fireWrite(std::forward<typename Handler::wout>(msg));
- }
-
- Future<void> write(typename Handler::win msg) override {
- ChannelPipeline<>::DestructorGuard dg(pipeline_);
- return pipeline_->writeHere(std::forward<typename Handler::win>(msg));
- }
-
- Future<void> fireClose() override {
- ChannelPipeline<>::DestructorGuard dg(pipeline_);
- return pipeline_->fireClose();
- }
-
- Future<void> close() override {
- ChannelPipeline<>::DestructorGuard dg(pipeline_);
- return pipeline_->closeHere();
- }
-
- std::shared_ptr<AsyncTransport> getTransport() override {
- return pipeline_->transport_;
- }
-
- void setWriteFlags(WriteFlags flags) override {
- pipeline_->setWriteFlags(flags);
- }
-
- WriteFlags getWriteFlags() override {
- return pipeline_->getWriteFlags();
+ template <class H>
+ H* getHandler(size_t i) {
+ if (i > ctxs_.size()) {
+ return ChannelPipeline<R, W, Handlers...>::template getHandler<H>(
+ i - (ctxs_.size() + 1));
+ } else {
+ auto pctx = (i == ctxs_.size()) ? &ctx_ : ctxs_[i].get();
+ auto ctx = dynamic_cast<ContextImpl<ChannelPipeline, H>*>(pctx);
+ return ctx->getHandler();
}
+ }
- void setReadBufferSettings(
- uint64_t minAvailable,
- uint64_t allocationSize) override {
- pipeline_->setReadBufferSettings(minAvailable, allocationSize);
+ void finalize() {
+ finalizeHelper();
+ auto ctx = ctxs_.empty() ? &ctx_ : ctxs_.front().get();
+ front_ = dynamic_cast<InboundChannelHandlerContext<R>*>(ctx);
+ if (!front_) {
+ throw std::invalid_argument("wrong type for first handler");
}
+ }
- std::pair<uint64_t, uint64_t> getReadBufferSettings() override {
- return pipeline_->getReadBufferSettings();
+ protected:
+ void finalizeHelper() {
+ ChannelPipeline<R, W, Handlers...>::finalizeHelper();
+ back_ = ChannelPipeline<R, W, Handlers...>::back_;
+ if (!back_) {
+ auto is_end = ChannelPipeline<R, W, Handlers...>::is_end;
+ CHECK(is_end);
+ back_ = dynamic_cast<OutboundChannelHandlerContext<W>*>(&ctx_);
+ if (!back_) {
+ throw std::invalid_argument("wrong type for last handler");
+ }
}
- };
- void fireRead(typename Handler::rout msg) {
- if (!ChannelPipeline<Handlers...>::is_end) {
- ChannelPipeline<Handlers...>::read(
- std::forward<typename Handler::rout>(msg));
- } else {
- LOG(WARNING) << "read() reached end of pipeline";
+ if (!ctxs_.empty()) {
+ for (int i = 0; i < ctxs_.size() - 1; i++) {
+ ctxs_[i]->link(ctxs_[i+1].get());
+ }
+ ctxs_.back()->link(&ctx_);
}
- }
- void fireReadEOF() {
- if (!ChannelPipeline<Handlers...>::is_end) {
- ChannelPipeline<Handlers...>::readEOF();
- } else {
- LOG(WARNING) << "readEOF() reached end of pipeline";
+ auto nextFront = ChannelPipeline<R, W, Handlers...>::getLocalFront();
+ if (nextFront) {
+ ctx_.link(nextFront);
}
}
- void fireReadException(exception_wrapper e) {
- if (!ChannelPipeline<Handlers...>::is_end) {
- ChannelPipeline<Handlers...>::readException(std::move(e));
- } else {
- LOG(WARNING) << "readException() reached end of pipeline";
- }
+ PipelineContext* getLocalFront() {
+ return ctxs_.empty() ? &ctx_ : ctxs_.front().get();
}
- Future<void> fireWrite(typename Handler::wout msg) {
- if (outboundCtx_) {
- return outboundCtx_->write(std::forward<typename Handler::wout>(msg));
- } else {
- LOG(WARNING) << "write() reached end of pipeline";
- return makeFuture();
- }
- }
+ static const bool is_end{false};
+ InboundChannelHandlerContext<R>* front_{nullptr};
+ OutboundChannelHandlerContext<W>* back_{nullptr};
- Future<void> fireClose() {
- if (outboundCtx_) {
- return outboundCtx_->close();
- } else {
- LOG(WARNING) << "close() reached end of pipeline";
- return makeFuture();
+ private:
+ template <class F>
+ void forEachCtx(const F& func) {
+ for (auto& ctx : ctxs_) {
+ func(ctx.get());
}
+ func(&ctx_);
}
- friend class Context;
- Handler handler_;
- Context ctx_;
- OutboundContext* outboundCtx_{nullptr};
+ ContextImpl<ChannelPipeline, Handler> ctx_;
+ std::vector<std::unique_ptr<PipelineContext>> ctxs_;
};
}}