template <class Rin, class Rout = Rin, class Win = Rout, class Wout = Rin>
class Handler : public HandlerBase<HandlerContext<Rout, Wout>> {
public:
+ static const HandlerDir dir = HandlerDir::BOTH;
+
typedef Rin rin;
typedef Rout rout;
typedef Win win;
*/
};
+struct Unit{};
+
+template <class Rin, class Rout = Rin>
+class InboundHandler : public HandlerBase<InboundHandlerContext<Rout>> {
+ public:
+ static const HandlerDir dir = HandlerDir::IN;
+
+ typedef Rin rin;
+ typedef Rout rout;
+ typedef Unit win;
+ typedef Unit wout;
+ typedef InboundHandlerContext<Rout> Context;
+ virtual ~InboundHandler() {}
+
+ virtual void read(Context* ctx, Rin msg) = 0;
+ virtual void readEOF(Context* ctx) {
+ ctx->fireReadEOF();
+ }
+ virtual void readException(Context* ctx, exception_wrapper e) {
+ ctx->fireReadException(std::move(e));
+ }
+};
+
+template <class Win, class Wout = Win>
+class OutboundHandler : public HandlerBase<OutboundHandlerContext<Wout>> {
+ public:
+ static const HandlerDir dir = HandlerDir::OUT;
+
+ typedef Unit rin;
+ typedef Unit rout;
+ typedef Win win;
+ typedef Wout wout;
+ typedef OutboundHandlerContext<Wout> Context;
+ virtual ~OutboundHandler() {}
+
+ virtual Future<void> write(Context* ctx, Win msg) = 0;
+ virtual Future<void> close(Context* ctx) {
+ return ctx->fireClose();
+ }
+};
+
template <class R, class W = R>
class HandlerAdapter : public Handler<R, R, W, W> {
public:
typedef HandlerAdapter<IOBufQueue&, std::unique_ptr<IOBuf>>
BytesToBytesHandler;
+typedef InboundHandler<IOBufQueue&>
+InboundBytesToBytesHandler;
+
+typedef OutboundHandler<std::unique_ptr<IOBuf>>
+OutboundBytesToBytesHandler;
+
}}
*/
};
+template <class In>
+class InboundHandlerContext {
+ public:
+ virtual ~InboundHandlerContext() {}
+
+ virtual void fireRead(In msg) = 0;
+ virtual void fireReadEOF() = 0;
+ virtual void fireReadException(exception_wrapper e) = 0;
+
+ virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
+
+ // TODO Need get/set writeFlags, readBufferSettings? Probably not.
+ // Do we even really need them stored in the pipeline at all?
+ // Could just always delegate to the socket impl
+};
+
+template <class Out>
+class OutboundHandlerContext {
+ public:
+ virtual ~OutboundHandlerContext() {}
+
+ virtual Future<void> fireWrite(Out msg) = 0;
+ virtual Future<void> fireClose() = 0;
+
+ virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
+};
+
+enum class HandlerDir {
+ IN,
+ OUT,
+ BOTH
+};
+
class PipelineContext {
public:
virtual ~PipelineContext() {}
}
}
- void link(PipelineContext* other) {
- setNextIn(other);
- other->setNextOut(this);
- }
-
- protected:
virtual void setNextIn(PipelineContext* ctx) = 0;
virtual void setNextOut(PipelineContext* ctx) = 0;
};
if (nextIn) {
nextIn_ = nextIn;
} else {
- throw std::invalid_argument("wrong type in setNextIn");
+ throw std::invalid_argument("inbound type mismatch");
}
}
if (nextOut) {
nextOut_ = nextOut;
} else {
- throw std::invalid_argument("wrong type in setNextOut");
+ throw std::invalid_argument("outbound type mismatch");
}
}
};
template <class P, class H>
-class ContextImpl : public HandlerContext<typename H::rout,
- typename H::wout>,
- public InboundLink<typename H::rin>,
- public OutboundLink<typename H::win>,
- public ContextImplBase<P, H, HandlerContext<typename H::rout, typename H::wout>> {
+class ContextImpl
+ : public HandlerContext<typename H::rout,
+ typename H::wout>,
+ public InboundLink<typename H::rin>,
+ public OutboundLink<typename H::win>,
+ public ContextImplBase<P, H, HandlerContext<typename H::rout,
+ typename H::wout>> {
public:
typedef typename H::rin Rin;
typedef typename H::rout Rout;
typedef typename H::win Win;
typedef typename H::wout Wout;
+ static const HandlerDir dir = HandlerDir::BOTH;
explicit ContextImpl(P* pipeline, std::shared_ptr<H> handler) {
this->impl_ = this;
using DestructorGuard = typename P::DestructorGuard;
};
+template <class P, class H>
+class InboundContextImpl
+ : public InboundHandlerContext<typename H::rout>,
+ public InboundLink<typename H::rin>,
+ public ContextImplBase<P, H, InboundHandlerContext<typename H::rout>> {
+ public:
+ typedef typename H::rin Rin;
+ typedef typename H::rout Rout;
+ typedef typename H::win Win;
+ typedef typename H::wout Wout;
+ static const HandlerDir dir = HandlerDir::IN;
+
+ explicit InboundContextImpl(P* pipeline, std::shared_ptr<H> handler) {
+ this->impl_ = this;
+ this->initialize(pipeline, std::move(handler));
+ }
+
+ // For StaticPipeline
+ InboundContextImpl() {
+ this->impl_ = this;
+ }
+
+ ~InboundContextImpl() {}
+
+ // InboundHandlerContext overrides
+ void fireRead(Rout msg) override {
+ DestructorGuard dg(this->pipeline_);
+ if (this->nextIn_) {
+ this->nextIn_->read(std::forward<Rout>(msg));
+ } else {
+ LOG(WARNING) << "read reached end of pipeline";
+ }
+ }
+
+ void fireReadEOF() override {
+ DestructorGuard dg(this->pipeline_);
+ if (this->nextIn_) {
+ this->nextIn_->readEOF();
+ } else {
+ LOG(WARNING) << "readEOF reached end of pipeline";
+ }
+ }
+
+ void fireReadException(exception_wrapper e) override {
+ DestructorGuard dg(this->pipeline_);
+ if (this->nextIn_) {
+ this->nextIn_->readException(std::move(e));
+ } else {
+ LOG(WARNING) << "readException reached end of pipeline";
+ }
+ }
+
+ std::shared_ptr<AsyncTransport> getTransport() override {
+ return this->pipeline_->getTransport();
+ }
+
+ // InboundLink overrides
+ void read(Rin msg) override {
+ DestructorGuard dg(this->pipeline_);
+ this->handler_->read(this, std::forward<Rin>(msg));
+ }
+
+ void readEOF() override {
+ DestructorGuard dg(this->pipeline_);
+ this->handler_->readEOF(this);
+ }
+
+ void readException(exception_wrapper e) override {
+ DestructorGuard dg(this->pipeline_);
+ this->handler_->readException(this, std::move(e));
+ }
+
+ private:
+ using DestructorGuard = typename P::DestructorGuard;
+};
+
+template <class P, class H>
+class OutboundContextImpl
+ : public OutboundHandlerContext<typename H::wout>,
+ public OutboundLink<typename H::win>,
+ public ContextImplBase<P, H, OutboundHandlerContext<typename H::wout>> {
+ public:
+ typedef typename H::rin Rin;
+ typedef typename H::rout Rout;
+ typedef typename H::win Win;
+ typedef typename H::wout Wout;
+ static const HandlerDir dir = HandlerDir::OUT;
+
+ explicit OutboundContextImpl(P* pipeline, std::shared_ptr<H> handler) {
+ this->impl_ = this;
+ this->initialize(pipeline, std::move(handler));
+ }
+
+ // For StaticPipeline
+ OutboundContextImpl() {
+ this->impl_ = this;
+ }
+
+ ~OutboundContextImpl() {}
+
+ // OutboundHandlerContext overrides
+ Future<void> fireWrite(Wout msg) override {
+ DestructorGuard dg(this->pipeline_);
+ if (this->nextOut_) {
+ return this->nextOut_->write(std::forward<Wout>(msg));
+ } else {
+ LOG(WARNING) << "write reached end of pipeline";
+ return makeFuture();
+ }
+ }
+
+ Future<void> fireClose() override {
+ DestructorGuard dg(this->pipeline_);
+ if (this->nextOut_) {
+ return this->nextOut_->close();
+ } else {
+ LOG(WARNING) << "close reached end of pipeline";
+ return makeFuture();
+ }
+ }
+
+ std::shared_ptr<AsyncTransport> getTransport() override {
+ return this->pipeline_->getTransport();
+ }
+
+ // OutboundLink overrides
+ Future<void> write(Win msg) override {
+ DestructorGuard dg(this->pipeline_);
+ return this->handler_->write(this, std::forward<Win>(msg));
+ }
+
+ Future<void> close() override {
+ DestructorGuard dg(this->pipeline_);
+ return this->handler_->close(this);
+ }
+
+ private:
+ using DestructorGuard = typename P::DestructorGuard;
+};
+
+template <class Handler, class Pipeline>
+struct ContextType {
+ typedef typename std::conditional<
+ Handler::dir == HandlerDir::BOTH,
+ ContextImpl<Pipeline, Handler>,
+ typename std::conditional<
+ Handler::dir == HandlerDir::IN,
+ InboundContextImpl<Pipeline, Handler>,
+ OutboundContextImpl<Pipeline, Handler>
+ >::type>::type
+ type;
+};
+
}}
*
* This handler may only be used in a single Pipeline.
*/
-class OutputBufferingHandler : public BytesToBytesHandler,
+class OutputBufferingHandler : public OutboundBytesToBytesHandler,
protected EventBase::LoopCallback {
public:
Future<void> write(Context* ctx, std::unique_ptr<IOBuf> buf) override {
template <class H>
Pipeline& addBack(std::shared_ptr<H> handler) {
- ctxs_.push_back(std::make_shared<ContextImpl<Pipeline, H>>(
- this,
- std::move(handler)));
- return *this;
- }
-
- template <class H>
- Pipeline& addBack(H* handler) {
- return addBack(std::shared_ptr<H>(handler, [](H*){}));
+ typedef typename ContextType<H, Pipeline>::type Context;
+ return addHelper(std::make_shared<Context>(this, std::move(handler)), false);
}
template <class H>
}
template <class H>
- Pipeline& addFront(std::shared_ptr<H> handler) {
- ctxs_.insert(
- ctxs_.begin(),
- std::make_shared<ContextImpl<Pipeline, H>>(this, std::move(handler)));
- return *this;
+ Pipeline& addBack(H* handler) {
+ return addBack(std::shared_ptr<H>(handler, [](H*){}));
}
template <class H>
- Pipeline& addFront(H* handler) {
- return addFront(std::shared_ptr<H>(handler, [](H*){}));
+ Pipeline& addFront(std::shared_ptr<H> handler) {
+ typedef typename ContextType<H, Pipeline>::type Context;
+ return addHelper(std::make_shared<Context>(this, std::move(handler)), true);
}
template <class H>
return addFront(std::make_shared<H>(std::forward<H>(handler)));
}
+ template <class H>
+ Pipeline& addFront(H* handler) {
+ return addFront(std::shared_ptr<H>(handler, [](H*){}));
+ }
+
template <class H>
H* getHandler(int i) {
- auto ctx = dynamic_cast<ContextImpl<Pipeline, H>*>(ctxs_[i].get());
+ typedef typename ContextType<H, Pipeline>::type Context;
+ auto ctx = dynamic_cast<Context*>(ctxs_[i].get());
CHECK(ctx);
return ctx->getHandler();
}
+ // TODO Have read/write/etc check that pipeline has been finalized
void finalize() {
- if (ctxs_.empty()) {
- return;
- }
-
- for (size_t i = 0; i < ctxs_.size() - 1; i++) {
- ctxs_[i]->link(ctxs_[i+1].get());
+ if (!inCtxs_.empty()) {
+ front_ = dynamic_cast<InboundLink<R>*>(inCtxs_.front());
+ for (size_t i = 0; i < inCtxs_.size() - 1; i++) {
+ inCtxs_[i]->setNextIn(inCtxs_[i+1]);
+ }
}
- back_ = dynamic_cast<OutboundLink<W>*>(ctxs_.back().get());
- if (!back_) {
- throw std::invalid_argument("wrong type for last handler");
+ if (!outCtxs_.empty()) {
+ back_ = dynamic_cast<OutboundLink<W>*>(outCtxs_.back());
+ for (size_t i = outCtxs_.size() - 1; i > 0; i--) {
+ outCtxs_[i]->setNextOut(outCtxs_[i-1]);
+ }
}
- front_ = dynamic_cast<InboundLink<R>*>(ctxs_.front().get());
if (!front_) {
- throw std::invalid_argument("wrong type for first handler");
+ throw std::invalid_argument("no inbound handler in Pipeline");
+ }
+ if (!back_) {
+ throw std::invalid_argument("no outbound handler in Pipeline");
}
for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) {
// See thrift/lib/cpp2/async/Cpp2Channel.cpp for an example
template <class H>
bool setOwner(H* handler) {
+ typedef typename ContextType<H, Pipeline>::type Context;
for (auto& ctx : ctxs_) {
- auto ctxImpl = dynamic_cast<ContextImpl<Pipeline, H>*>(ctx.get());
+ auto ctxImpl = dynamic_cast<Context*>(ctx.get());
if (ctxImpl && ctxImpl->getHandler() == handler) {
owner_ = ctx;
return true;
}
template <class Context>
- void addContextFront(Context* context) {
- ctxs_.insert(
- ctxs_.begin(),
- std::shared_ptr<Context>(context, [](Context*){}));
+ void addContextFront(Context* ctx) {
+ addHelper(std::shared_ptr<Context>(ctx, [](Context*){}), true);
}
void detachHandlers() {
}
private:
+ template <class Context>
+ Pipeline& addHelper(std::shared_ptr<Context>&& ctx, bool front) {
+ ctxs_.insert(front ? ctxs_.begin() : ctxs_.end(), ctx);
+ if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::IN) {
+ inCtxs_.insert(front ? inCtxs_.begin() : inCtxs_.end(), ctx.get());
+ }
+ if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::OUT) {
+ outCtxs_.insert(front ? outCtxs_.begin() : outCtxs_.end(), ctx.get());
+ }
+ return *this;
+ }
+
std::shared_ptr<AsyncTransport> transport_;
WriteFlags writeFlags_{WriteFlags::NONE};
std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
bool isStatic_{false};
+ std::shared_ptr<PipelineContext> owner_;
+ std::vector<std::shared_ptr<PipelineContext>> ctxs_;
+ std::vector<PipelineContext*> inCtxs_;
+ std::vector<PipelineContext*> outCtxs_;
InboundLink<R>* front_{nullptr};
OutboundLink<W>* back_{nullptr};
- std::vector<std::shared_ptr<PipelineContext>> ctxs_;
- std::shared_ptr<PipelineContext> owner_;
};
}}
}
protected:
- typedef ContextImpl<Pipeline<R, W>, Handler> Context;
-
template <class HandlerArg, class... HandlerArgs>
StaticPipeline(
bool isFirst,
bool isFirst_;
folly::Optional<Handler> handler_;
std::shared_ptr<Handler> handlerPtr_;
- ContextImpl<Pipeline<R, W>, Handler> ctx_;
+ typename ContextType<Handler, Pipeline<R, W>>::type ctx_;
};
}} // folly::wangle
* IOBufQueue.front(), without split() or pop_front().
*/
class ByteToMessageCodec
- : public BytesToBytesHandler {
+ : public InboundBytesToBytesHandler {
public:
virtual std::unique_ptr<IOBuf> decode(
pipeline
.addBack(BytesReflector())
+ .addBack(LengthFieldPrepender())
.addBack(LengthFieldBasedFrameDecoder())
.addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
auto sz = buf->computeChainDataLength();
called++;
EXPECT_EQ(sz, 2);
}))
- .addBack(LengthFieldPrepender())
.finalize();
auto buf = IOBuf::create(2);
*
*/
class LengthFieldPrepender
-: public BytesToBytesHandler {
+: public OutboundBytesToBytesHandler {
public:
LengthFieldPrepender(
int lengthFieldLength = 4,