wangle/channel/HandlerContext.h \
wangle/channel/OutputBufferingHandler.h \
wangle/channel/Pipeline.h \
+ wangle/channel/StaticPipeline.h \
wangle/concurrent/BlockingQueue.h \
wangle/concurrent/Codel.h \
wangle/concurrent/CPUThreadPoolExecutor.h \
public:
ServerBootstrap<BytesPipeline>::AcceptPipeline* newPipeline(std::shared_ptr<AsyncSocket>) {
auto pipeline = new ServerBootstrap<BytesPipeline>::AcceptPipeline;
- auto handler = std::make_shared<HandlerPipeline>();
- pipeline->addBack(HandlerPtr<HandlerPipeline>(handler));
+ pipeline->addBack(HandlerPipeline());
return pipeline;
}
};
Acceptor::init(nullptr, base_);
CHECK(acceptorPipeline_);
- acceptorPipeline_->addBack(folly::wangle::HandlerPtr<ServerAcceptor, false>(this));
+ acceptorPipeline_->addBack(this);
acceptorPipeline_->finalize();
}
typedef HandlerAdapter<IOBufQueue&, std::unique_ptr<IOBuf>>
BytesToBytesHandler;
-template <class HandlerT, bool Shared = true>
-class HandlerPtr : public Handler<
- typename HandlerT::rin,
- typename HandlerT::rout,
- typename HandlerT::win,
- typename HandlerT::wout> {
- public:
- typedef typename std::conditional<
- Shared,
- std::shared_ptr<HandlerT>,
- HandlerT*>::type
- Ptr;
-
- typedef typename HandlerT::Context Context;
-
- explicit HandlerPtr(Ptr handler)
- : handler_(std::move(handler)) {}
-
- Ptr getHandler() {
- return handler_;
- }
-
- void setHandler(Ptr handler) {
- if (handler == handler_) {
- return;
- }
- if (handler_ && ctx_) {
- handler_->detachPipeline(ctx_);
- }
- handler_ = std::move(handler);
- if (handler_ && ctx_) {
- handler_->attachPipeline(ctx_);
- if (ctx_->getTransport()) {
- handler_->attachTransport(ctx_);
- }
- }
- }
-
- void attachPipeline(Context* ctx) override {
- ctx_ = ctx;
- if (handler_) {
- handler_->attachPipeline(ctx_);
- }
- }
-
- void attachTransport(Context* ctx) override {
- ctx_ = ctx;
- if (handler_) {
- handler_->attachTransport(ctx_);
- }
- }
-
- void detachPipeline(Context* ctx) override {
- ctx_ = ctx;
- if (handler_) {
- handler_->detachPipeline(ctx_);
- }
- }
-
- void detachTransport(Context* ctx) override {
- ctx_ = ctx;
- if (handler_) {
- handler_->detachTransport(ctx_);
- }
- }
-
- void read(Context* ctx, typename HandlerT::rin msg) override {
- DCHECK(handler_);
- handler_->read(ctx, std::forward<typename HandlerT::rin>(msg));
- }
-
- void readEOF(Context* ctx) override {
- DCHECK(handler_);
- handler_->readEOF(ctx);
- }
-
- void readException(Context* ctx, exception_wrapper e) override {
- DCHECK(handler_);
- handler_->readException(ctx, std::move(e));
- }
-
- Future<void> write(Context* ctx, typename HandlerT::win msg) override {
- DCHECK(handler_);
- return handler_->write(ctx, std::forward<typename HandlerT::win>(msg));
- }
-
- Future<void> close(Context* ctx) override {
- DCHECK(handler_);
- return handler_->close(ctx);
- }
-
- private:
- Context* ctx_;
- Ptr handler_;
-};
-
}}
public:
virtual ~PipelineContext() {}
+ virtual void detachPipeline() = 0;
+
virtual void attachTransport() = 0;
virtual void detachTransport() = 0;
typedef typename H::win Win;
typedef typename H::wout Wout;
- template <class HandlerArg>
- explicit ContextImpl(P* pipeline, HandlerArg&& handlerArg)
- : pipeline_(pipeline),
- handler_(std::forward<HandlerArg>(handlerArg)) {
- handler_.attachPipeline(this);
+ explicit ContextImpl(P* pipeline, std::shared_ptr<H> handler) {
+ initialize(pipeline, std::move(handler));
}
- ~ContextImpl() {
- handler_.detachPipeline(this);
+ void initialize(P* pipeline, std::shared_ptr<H> handler) {
+ pipeline_ = pipeline;
+ handler_ = std::move(handler);
+ handler_->attachPipeline(this);
}
+ // For StaticPipeline
+ ContextImpl() {}
+
+ ~ContextImpl() {}
+
H* getHandler() {
- return &handler_;
+ return handler_.get();
}
// PipelineContext overrides
+ void detachPipeline() override {
+ handler_->detachPipeline(this);
+ }
+
void setNextIn(PipelineContext* ctx) override {
auto nextIn = dynamic_cast<InboundHandlerContext<Rout>*>(ctx);
if (nextIn) {
void attachTransport() override {
typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
- handler_.attachTransport(this);
+ handler_->attachTransport(this);
}
void detachTransport() override {
typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
- handler_.detachTransport(this);
+ handler_->detachTransport(this);
}
// HandlerContext overrides
// InboundHandlerContext overrides
void read(Rin msg) override {
typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
- handler_.read(this, std::forward<Rin>(msg));
+ handler_->read(this, std::forward<Rin>(msg));
}
void readEOF() override {
typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
- handler_.readEOF(this);
+ handler_->readEOF(this);
}
void readException(exception_wrapper e) override {
typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
- handler_.readException(this, std::move(e));
+ handler_->readException(this, std::move(e));
}
// OutboundHandlerContext overrides
Future<void> write(Win msg) override {
typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
- return handler_.write(this, std::forward<Win>(msg));
+ return handler_->write(this, std::forward<Win>(msg));
}
Future<void> close() override {
typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
- return handler_.close(this);
+ return handler_->close(this);
}
private:
P* pipeline_;
- H handler_;
+ std::shared_ptr<H> handler_;
InboundHandlerContext<Rout>* nextIn_{nullptr};
OutboundHandlerContext<Wout>* nextOut_{nullptr};
};
* R is the inbound type, i.e. inbound calls start with pipeline.read(R)
* W is the outbound type, i.e. outbound calls start with pipeline.write(W)
*/
-template <class R, class W, class... Handlers>
-class Pipeline;
-
template <class R, class W>
-class Pipeline<R, W> : public DelayedDestruction {
+class Pipeline : public DelayedDestruction {
public:
Pipeline() {}
- ~Pipeline() {}
std::shared_ptr<AsyncTransport> getTransport() {
return transport_;
}
template <class H>
- Pipeline& addBack(H&& handler) {
- ctxs_.push_back(folly::make_unique<ContextImpl<Pipeline, H>>(
- this, std::forward<H>(handler)));
+ Pipeline& addBack(std::shared_ptr<H> handler) {
+ ctxs_.push_back(std::make_shared<ContextImpl<Pipeline, H>>(
+ this,
+ std::move(handler)));
return *this;
}
template <class H>
- Pipeline& addFront(H&& handler) {
+ Pipeline& addBack(H* handler) {
+ return addBack(std::shared_ptr<H>(handler, [](H*){}));
+ }
+
+ template <class H>
+ Pipeline& addBack(H&& handler) {
+ return addBack(std::make_shared<H>(std::forward<H>(handler)));
+ }
+
+ template <class H>
+ Pipeline& addFront(std::shared_ptr<H> handler) {
ctxs_.insert(
ctxs_.begin(),
- folly::make_unique<ContextImpl<Pipeline, H>>(
- this,
- std::forward<H>(handler)));
+ std::make_shared<ContextImpl<Pipeline, H>>(this, std::move(handler)));
return *this;
}
+ template <class H>
+ Pipeline& addFront(H* handler) {
+ return addFront(std::shared_ptr<H>(handler, [](H*){}));
+ }
+
+ template <class H>
+ Pipeline& addFront(H&& handler) {
+ return addFront(std::make_shared<H>(std::forward<H>(handler)));
+ }
+
template <class H>
H* getHandler(int i) {
auto ctx = dynamic_cast<ContextImpl<Pipeline, H>*>(ctxs_[i].get());
}
void finalize() {
- finalizeHelper();
- InboundHandlerContext<R>* front;
- front_ = dynamic_cast<InboundHandlerContext<R>*>(
- ctxs_.front().get());
- if (!front_) {
- throw std::invalid_argument("wrong type for first handler");
- }
- }
-
- protected:
- explicit Pipeline(bool shouldFinalize) {
- CHECK(!shouldFinalize);
- }
-
- void finalizeHelper() {
if (ctxs_.empty()) {
return;
}
if (!back_) {
throw std::invalid_argument("wrong type for last handler");
}
- }
-
- PipelineContext* getLocalFront() {
- return ctxs_.empty() ? nullptr : ctxs_.front().get();
- }
-
- static const bool is_end{true};
-
- std::shared_ptr<AsyncTransport> transport_;
- WriteFlags writeFlags_{WriteFlags::NONE};
- std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
-
- void attachPipeline() {}
-
- void attachTransport(
- std::shared_ptr<AsyncTransport> transport) {
- transport_ = std::move(transport);
- }
- void detachTransport() {
- transport_ = nullptr;
- }
-
- OutboundHandlerContext<W>* back_{nullptr};
-
- private:
- InboundHandlerContext<R>* front_{nullptr};
- std::vector<std::unique_ptr<PipelineContext>> ctxs_;
-};
-
-template <class R, class W, class Handler, class... Handlers>
-class Pipeline<R, W, Handler, Handlers...>
- : public Pipeline<R, W, Handlers...> {
- protected:
- template <class HandlerArg, class... HandlersArgs>
- Pipeline(
- bool shouldFinalize,
- HandlerArg&& handlerArg,
- HandlersArgs&&... handlersArgs)
- : Pipeline<R, W, Handlers...>(
- false,
- std::forward<HandlersArgs>(handlersArgs)...),
- ctx_(this, std::forward<HandlerArg>(handlerArg)) {
- if (shouldFinalize) {
- finalize();
+ front_ = dynamic_cast<InboundHandlerContext<R>*>(ctxs_.front().get());
+ if (!front_) {
+ throw std::invalid_argument("wrong type for first handler");
}
}
- public:
- template <class... HandlersArgs>
- explicit Pipeline(HandlersArgs&&... handlersArgs)
- : Pipeline(true, std::forward<HandlersArgs>(handlersArgs)...) {}
-
- ~Pipeline() {}
-
- void read(R msg) {
- typename Pipeline<R, W>::DestructorGuard dg(
- static_cast<DelayedDestruction*>(this));
- front_->read(std::forward<R>(msg));
- }
-
- void readEOF() {
- typename Pipeline<R, W>::DestructorGuard dg(
- static_cast<DelayedDestruction*>(this));
- front_->readEOF();
- }
-
- void readException(exception_wrapper e) {
- typename Pipeline<R, W>::DestructorGuard dg(
- static_cast<DelayedDestruction*>(this));
- front_->readException(std::move(e));
- }
-
- Future<void> write(W msg) {
- typename Pipeline<R, W>::DestructorGuard dg(
- static_cast<DelayedDestruction*>(this));
- return back_->write(std::forward<W>(msg));
- }
-
- Future<void> close() {
- typename Pipeline<R, W>::DestructorGuard dg(
- static_cast<DelayedDestruction*>(this));
- return back_->close();
+ // If one of the handlers owns the pipeline itself, use setOwner to ensure
+ // that the pipeline doesn't try to detach the handler during destruction,
+ // lest destruction ordering issues occur.
+ // See thrift/lib/cpp2/async/Cpp2Channel.cpp for an example
+ template <class H>
+ bool setOwner(H* handler) {
+ for (auto& ctx : ctxs_) {
+ auto ctxImpl = dynamic_cast<ContextImpl<Pipeline, H>*>(ctx.get());
+ if (ctxImpl && ctxImpl->getHandler() == handler) {
+ owner_ = ctx;
+ return true;
+ }
+ }
+ return false;
}
void attachTransport(
std::shared_ptr<AsyncTransport> transport) {
- typename Pipeline<R, W>::DestructorGuard dg(
- static_cast<DelayedDestruction*>(this));
- CHECK((!Pipeline<R, W>::transport_));
- Pipeline<R, W, Handlers...>::attachTransport(std::move(transport));
- forEachCtx([&](PipelineContext* ctx){
+ transport_ = std::move(transport);
+ for (auto& ctx : ctxs_) {
ctx->attachTransport();
- });
+ }
}
void detachTransport() {
- typename Pipeline<R, W>::DestructorGuard dg(
- static_cast<DelayedDestruction*>(this));
- Pipeline<R, W, Handlers...>::detachTransport();
- forEachCtx([&](PipelineContext* ctx){
+ transport_ = nullptr;
+ for (auto& ctx : ctxs_) {
ctx->detachTransport();
- });
- }
-
- std::shared_ptr<AsyncTransport> getTransport() {
- return Pipeline<R, W>::transport_;
- }
-
- template <class H>
- Pipeline& addBack(H&& handler) {
- Pipeline<R, W>::addBack(std::move(handler));
- return *this;
+ }
}
- template <class H>
- Pipeline& addFront(H&& handler) {
+ protected:
+ template <class Context>
+ void addContextFront(Context* context) {
ctxs_.insert(
ctxs_.begin(),
- folly::make_unique<ContextImpl<Pipeline, H>>(
- this,
- std::move(handler)));
- return *this;
- }
-
- template <class H>
- H* getHandler(size_t i) {
- if (i > ctxs_.size()) {
- return Pipeline<R, W, Handlers...>::template getHandler<H>(
- i - (ctxs_.size() + 1));
- } else {
- auto pctx = (i == ctxs_.size()) ? &ctx_ : ctxs_[i].get();
- auto ctx = dynamic_cast<ContextImpl<Pipeline, H>*>(pctx);
- return ctx->getHandler();
- }
- }
-
- void finalize() {
- finalizeHelper();
- auto ctx = ctxs_.empty() ? &ctx_ : ctxs_.front().get();
- front_ = dynamic_cast<InboundHandlerContext<R>*>(ctx);
- if (!front_) {
- throw std::invalid_argument("wrong type for first handler");
- }
+ std::shared_ptr<Context>(context, [](Context*){}));
}
- protected:
- void finalizeHelper() {
- Pipeline<R, W, Handlers...>::finalizeHelper();
- back_ = Pipeline<R, W, Handlers...>::back_;
- if (!back_) {
- auto is_at_end = Pipeline<R, W, Handlers...>::is_end;
- CHECK(is_at_end);
- back_ = dynamic_cast<OutboundHandlerContext<W>*>(&ctx_);
- if (!back_) {
- throw std::invalid_argument("wrong type for last handler");
- }
- }
-
- if (!ctxs_.empty()) {
- for (size_t i = 0; i < ctxs_.size() - 1; i++) {
- ctxs_[i]->link(ctxs_[i+1].get());
+ void detachHandlers() {
+ for (auto& ctx : ctxs_) {
+ if (ctx != owner_) {
+ ctx->detachPipeline();
}
- ctxs_.back()->link(&ctx_);
- }
-
- auto nextFront = Pipeline<R, W, Handlers...>::getLocalFront();
- if (nextFront) {
- ctx_.link(nextFront);
}
}
- PipelineContext* getLocalFront() {
- return ctxs_.empty() ? &ctx_ : ctxs_.front().get();
- }
+ private:
+ std::shared_ptr<AsyncTransport> transport_;
+ WriteFlags writeFlags_{WriteFlags::NONE};
+ std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
- static const bool is_end{false};
InboundHandlerContext<R>* front_{nullptr};
OutboundHandlerContext<W>* back_{nullptr};
-
- private:
- template <class F>
- void forEachCtx(const F& func) {
- for (auto& ctx : ctxs_) {
- func(ctx.get());
- }
- func(&ctx_);
- }
-
- ContextImpl<Pipeline, Handler> ctx_;
- std::vector<std::unique_ptr<PipelineContext>> ctxs_;
+ std::vector<std::shared_ptr<PipelineContext>> ctxs_;
+ std::shared_ptr<PipelineContext> owner_;
};
}}
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/wangle/channel/Pipeline.h>
+
+namespace folly { namespace wangle {
+
+/*
+ * StaticPipeline allows you to create a Pipeline with minimal allocations.
+ * Specify your handlers after the input/output types of your Pipeline in order
+ * from front to back, and construct with either H&&, H*, or std::shared_ptr<H>
+ * for each handler. The pipeline will be finalized for you at the end of
+ * construction. For example:
+ *
+ * StringToStringHandler stringHandler1;
+ * auto stringHandler2 = std::make_shared<StringToStringHandler>();
+ *
+ * StaticPipeline<int, std::string,
+ * IntToStringHandler,
+ * StringToStringHandler,
+ * StringToStringHandler>(
+ * IntToStringHandler(), // H&&
+ * &stringHandler1, // H*
+ * stringHandler2) // std::shared_ptr<H>
+ * pipeline;
+ *
+ * You can then use pipeline just like any Pipeline. See Pipeline.h.
+ */
+template <class R, class W, class... Handlers>
+class StaticPipeline;
+
+template <class R, class W>
+class StaticPipeline<R, W> : public Pipeline<R, W> {
+ protected:
+ explicit StaticPipeline(bool) : Pipeline<R, W>() {}
+};
+
+template <class R, class W, class Handler, class... Handlers>
+class StaticPipeline<R, W, Handler, Handlers...>
+ : public StaticPipeline<R, W, Handlers...> {
+ public:
+ template <class... HandlerArgs>
+ explicit StaticPipeline(HandlerArgs&&... handlers)
+ : StaticPipeline(true, std::forward<HandlerArgs>(handlers)...) {
+ isFirst_ = true;
+ }
+
+ ~StaticPipeline() {
+ if (isFirst_) {
+ Pipeline<R, W>::detachHandlers();
+ }
+ }
+
+ protected:
+ typedef ContextImpl<Pipeline<R, W>, Handler> Context;
+
+ template <class HandlerArg, class... HandlerArgs>
+ StaticPipeline(
+ bool isFirst,
+ HandlerArg&& handler,
+ HandlerArgs&&... handlers)
+ : StaticPipeline<R, W, Handlers...>(
+ false,
+ std::forward<HandlerArgs>(handlers)...) {
+ isFirst_ = isFirst;
+ setHandler(std::forward<HandlerArg>(handler));
+ CHECK(handlerPtr_);
+ ctx_.initialize(this, handlerPtr_);
+ Pipeline<R, W>::addContextFront(&ctx_);
+ if (isFirst_) {
+ Pipeline<R, W>::finalize();
+ }
+ }
+
+ private:
+ template <class HandlerArg>
+ typename std::enable_if<std::is_same<
+ typename std::remove_reference<HandlerArg>::type,
+ Handler
+ >::value>::type
+ setHandler(HandlerArg&& arg) {
+ handler_.emplace(std::forward<HandlerArg>(arg));
+ handlerPtr_ = std::shared_ptr<Handler>(&(*handler_), [](Handler*){});
+ }
+
+ template <class HandlerArg>
+ typename std::enable_if<std::is_same<
+ typename std::decay<HandlerArg>::type,
+ std::shared_ptr<Handler>
+ >::value>::type
+ setHandler(HandlerArg&& arg) {
+ handlerPtr_ = std::forward<HandlerArg>(arg);
+ }
+
+ template <class HandlerArg>
+ typename std::enable_if<std::is_same<
+ typename std::decay<HandlerArg>::type,
+ Handler*
+ >::value>::type
+ setHandler(HandlerArg&& arg) {
+ handlerPtr_ = std::shared_ptr<Handler>(arg, [](Handler*){});
+ }
+
+ bool isFirst_;
+ folly::Optional<Handler> handler_;
+ std::shared_ptr<Handler> handlerPtr_;
+ ContextImpl<Pipeline<R, W>, Handler> ctx_;
+};
+
+}} // folly::wangle
* limitations under the License.
*/
-#include <folly/wangle/channel/Pipeline.h>
+#include <folly/wangle/channel/StaticPipeline.h>
#include <folly/wangle/channel/OutputBufferingHandler.h>
#include <folly/wangle/channel/test/MockHandler.h>
#include <folly/io/async/AsyncSocket.h>
TEST(OutputBufferingHandlerTest, Basic) {
MockBytesHandler mockHandler;
EXPECT_CALL(mockHandler, attachPipeline(_));
- Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>,
- HandlerPtr<MockBytesHandler, false>,
+ StaticPipeline<IOBufQueue&, std::unique_ptr<IOBuf>,
+ MockBytesHandler,
OutputBufferingHandler>
pipeline(&mockHandler, OutputBufferingHandler{});
#include <folly/wangle/channel/Handler.h>
#include <folly/wangle/channel/Pipeline.h>
+#include <folly/wangle/channel/StaticPipeline.h>
#include <folly/wangle/channel/AsyncSocketHandler.h>
#include <folly/wangle/channel/OutputBufferingHandler.h>
#include <folly/wangle/channel/test/MockHandler.h>
using namespace testing;
typedef StrictMock<MockHandlerAdapter<int, int>> IntHandler;
-typedef HandlerPtr<IntHandler, false> IntHandlerPtr;
ACTION(FireRead) {
arg0->fireRead(arg1);
auto socket = AsyncSocket::newSocket(&eb);
// static
{
- Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>,
+ StaticPipeline<IOBufQueue&, std::unique_ptr<IOBuf>,
AsyncSocketHandler,
OutputBufferingHandler>
pipeline{AsyncSocketHandler(socket), OutputBufferingHandler()};
EXPECT_CALL(handler1, attachPipeline(_));
EXPECT_CALL(handler2, attachPipeline(_));
- Pipeline<int, int, IntHandlerPtr, IntHandlerPtr>
+ StaticPipeline<int, int, IntHandler, IntHandler>
pipeline(&handler1, &handler2);
EXPECT_CALL(handler1, read_(_, _)).WillOnce(FireRead());
TEST(PipelineTest, ReachEndOfPipeline) {
IntHandler handler;
EXPECT_CALL(handler, attachPipeline(_));
- Pipeline<int, int, IntHandlerPtr>
+ StaticPipeline<int, int, IntHandler>
pipeline(&handler);
EXPECT_CALL(handler, read_(_, _)).WillOnce(FireRead());
EXPECT_CALL(handler1, attachPipeline(_));
EXPECT_CALL(handler2, attachPipeline(_));
- Pipeline<int, int, IntHandlerPtr, IntHandlerPtr>
+ StaticPipeline<int, int, IntHandler, IntHandler>
pipeline(&handler1, &handler2);
EXPECT_CALL(handler1, read_(_, _)).WillOnce(FireRead());
TEST(PipelineTest, DynamicFireActions) {
IntHandler handler1, handler2, handler3;
EXPECT_CALL(handler2, attachPipeline(_));
- Pipeline<int, int, IntHandlerPtr>
+ StaticPipeline<int, int, IntHandler>
pipeline(&handler2);
EXPECT_CALL(handler1, attachPipeline(_));
EXPECT_CALL(handler3, attachPipeline(_));
pipeline
- .addFront(IntHandlerPtr(&handler1))
- .addBack(IntHandlerPtr(&handler3))
+ .addFront(&handler1)
+ .addBack(&handler3)
.finalize();
- EXPECT_TRUE(pipeline.getHandler<IntHandlerPtr>(0));
- EXPECT_TRUE(pipeline.getHandler<IntHandlerPtr>(1));
- EXPECT_TRUE(pipeline.getHandler<IntHandlerPtr>(2));
+ EXPECT_TRUE(pipeline.getHandler<IntHandler>(0));
+ EXPECT_TRUE(pipeline.getHandler<IntHandler>(1));
+ EXPECT_TRUE(pipeline.getHandler<IntHandler>(2));
EXPECT_CALL(handler1, read_(_, _)).WillOnce(FireRead());
EXPECT_CALL(handler2, read_(_, _)).WillOnce(FireRead());
std::invalid_argument);
}
{
- Pipeline<std::string, std::string, StringHandler, StringHandler>
+ StaticPipeline<std::string, std::string, StringHandler, StringHandler>
pipeline{StringHandler(), StringHandler()};
// Exercise both addFront and addBack. Final pipeline is
TEST(Pipeline, AttachTransport) {
IntHandler handler;
EXPECT_CALL(handler, attachPipeline(_));
- Pipeline<int, int, IntHandlerPtr>
+ StaticPipeline<int, int, IntHandler>
pipeline(&handler);
EventBase eb;
void setPipeline(Pipeline* pipeline) {
pipeline_ = pipeline;
- pipeline->addBack(
- HandlerPtr<SerialClientDispatcher<Pipeline, Req, Resp>, false>(
- this));
+ pipeline->addBack(this);
pipeline->finalize();
}