public:
virtual ~PipelineContext() {}
+ virtual void attachPipeline() = 0;
virtual void detachPipeline() = 0;
virtual void attachTransport() = 0;
initialize(pipeline, std::move(handler));
}
- void initialize(P* pipeline, std::shared_ptr<H> handler) {
- pipeline_ = pipeline;
- handler_ = std::move(handler);
- handler_->attachPipeline(this);
- }
-
// For StaticPipeline
ContextImpl() {}
~ContextImpl() {}
+ void initialize(P* pipeline, std::shared_ptr<H> handler) {
+ pipeline_ = pipeline;
+ handler_ = std::move(handler);
+ }
+
H* getHandler() {
return handler_.get();
}
// PipelineContext overrides
+ void attachPipeline() override {
+ if (!attached_) {
+ handler_->attachPipeline(this);
+ attached_ = true;
+ }
+ }
+
void detachPipeline() override {
handler_->detachPipeline(this);
+ attached_ = false;
}
void setNextIn(PipelineContext* ctx) override {
std::shared_ptr<H> handler_;
InboundHandlerContext<Rout>* nextIn_{nullptr};
OutboundHandlerContext<Wout>* nextOut_{nullptr};
+ bool attached_{false};
};
}}
template <class R, class W>
class Pipeline : public DelayedDestruction {
public:
- Pipeline() {}
+ Pipeline() : isStatic_(false) {}
+
+ ~Pipeline() {
+ if (!isStatic_) {
+ detachHandlers();
+ }
+ }
std::shared_ptr<AsyncTransport> getTransport() {
return transport_;
if (!front_) {
throw std::invalid_argument("wrong type for first handler");
}
+
+ for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) {
+ (*it)->attachPipeline();
+ }
}
// If one of the handlers owns the pipeline itself, use setOwner to ensure
}
protected:
+ explicit Pipeline(bool isStatic) : isStatic_(isStatic) {
+ CHECK(isStatic_);
+ }
+
template <class Context>
void addContextFront(Context* context) {
ctxs_.insert(
WriteFlags writeFlags_{WriteFlags::NONE};
std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
+ bool isStatic_{false};
InboundHandlerContext<R>* front_{nullptr};
OutboundHandlerContext<W>* back_{nullptr};
std::vector<std::shared_ptr<PipelineContext>> ctxs_;
IntHandler handler1;
IntHandler handler2;
- EXPECT_CALL(handler1, attachPipeline(_));
- EXPECT_CALL(handler2, attachPipeline(_));
+ {
+ InSequence sequence;
+ EXPECT_CALL(handler2, attachPipeline(_));
+ EXPECT_CALL(handler1, attachPipeline(_));
+ }
StaticPipeline<int, int, IntHandler, IntHandler>
pipeline(&handler1, &handler2);
EXPECT_CALL(handler1, close_(_)).Times(1);
EXPECT_NO_THROW(pipeline.close().value());
- EXPECT_CALL(handler1, detachPipeline(_));
- EXPECT_CALL(handler2, detachPipeline(_));
+ {
+ InSequence sequence;
+ EXPECT_CALL(handler1, detachPipeline(_));
+ EXPECT_CALL(handler2, detachPipeline(_));
+ }
}
// Test that nothing bad happens when actions reach the end of the pipeline
IntHandler handler1;
IntHandler handler2;
- EXPECT_CALL(handler1, attachPipeline(_));
- EXPECT_CALL(handler2, attachPipeline(_));
+ {
+ InSequence sequence;
+ EXPECT_CALL(handler2, attachPipeline(_));
+ EXPECT_CALL(handler1, attachPipeline(_));
+ }
StaticPipeline<int, int, IntHandler, IntHandler>
pipeline(&handler1, &handler2);
EXPECT_CALL(handler1, write_(_, _)).Times(1);
pipeline.read(1);
- EXPECT_CALL(handler1, detachPipeline(_));
- EXPECT_CALL(handler2, detachPipeline(_));
+ {
+ InSequence sequence;
+ EXPECT_CALL(handler1, detachPipeline(_));
+ EXPECT_CALL(handler2, detachPipeline(_));
+ }
}
TEST(PipelineTest, DynamicFireActions) {
StaticPipeline<int, int, IntHandler>
pipeline(&handler2);
- EXPECT_CALL(handler1, attachPipeline(_));
- EXPECT_CALL(handler3, attachPipeline(_));
+ {
+ InSequence sequence;
+ EXPECT_CALL(handler3, attachPipeline(_));
+ EXPECT_CALL(handler1, attachPipeline(_));
+ }
pipeline
.addFront(&handler1)
EXPECT_CALL(handler1, write_(_, _)).Times(1);
EXPECT_NO_THROW(pipeline.write(1).value());
- EXPECT_CALL(handler1, detachPipeline(_));
- EXPECT_CALL(handler2, detachPipeline(_));
- EXPECT_CALL(handler3, detachPipeline(_));
+ {
+ InSequence sequence;
+ EXPECT_CALL(handler1, detachPipeline(_));
+ EXPECT_CALL(handler2, detachPipeline(_));
+ EXPECT_CALL(handler3, detachPipeline(_));
+ }
+}
+
+TEST(PipelineTest, DynamicAttachDetachOrder) {
+ IntHandler handler1, handler2;
+ Pipeline<int, int> pipeline;
+ {
+ InSequence sequence;
+ EXPECT_CALL(handler2, attachPipeline(_));
+ EXPECT_CALL(handler1, attachPipeline(_));
+ }
+ pipeline
+ .addBack(&handler1)
+ .addBack(&handler2)
+ .finalize();
+ {
+ InSequence sequence;
+ EXPECT_CALL(handler1, detachPipeline(_));
+ EXPECT_CALL(handler2, detachPipeline(_));
+ }
+}
+
+TEST(PipelineTest, HandlerInMultiplePipelines) {
+ IntHandler handler;
+ EXPECT_CALL(handler, attachPipeline(_)).Times(2);
+ StaticPipeline<int, int, IntHandler> pipeline1(&handler);
+ StaticPipeline<int, int, IntHandler> pipeline2(&handler);
+ EXPECT_CALL(handler, detachPipeline(_)).Times(2);
+}
+
+TEST(PipelineTest, HandlerInPipelineTwice) {
+ IntHandler handler;
+ EXPECT_CALL(handler, attachPipeline(_)).Times(2);
+ StaticPipeline<int, int, IntHandler, IntHandler> pipeline(&handler, &handler);
+ EXPECT_CALL(handler, detachPipeline(_)).Times(2);
}
template <class Rin, class Rout = Rin, class Win = Rout, class Wout = Rin>