From 31c71948e3b4b791ee462355151528a3d15ea7d2 Mon Sep 17 00:00:00 2001 From: James Sedgwick Date: Mon, 15 Jun 2015 12:12:28 -0700 Subject: [PATCH] pipeline handler removal, fix service test Summary: add remove, remove(Handler*), removeFront(), removeBack() to Pipeline employ these to fix up reusing client pipelines with client dispatchers, which in turn fixes the broken ServiceTest Reviewed By: @djwatson Differential Revision: D2152636 --- folly/wangle/channel/HandlerContext-inl.h | 14 ++++ folly/wangle/channel/Pipeline-inl.h | 78 ++++++++++++++++++++ folly/wangle/channel/Pipeline.h | 18 +++++ folly/wangle/channel/test/PipelineTest.cpp | 85 ++++++++++++++++++++++ folly/wangle/service/ClientDispatcher.h | 25 ++++++- 5 files changed, 217 insertions(+), 3 deletions(-) diff --git a/folly/wangle/channel/HandlerContext-inl.h b/folly/wangle/channel/HandlerContext-inl.h index 5f12f99e..9f111c41 100644 --- a/folly/wangle/channel/HandlerContext-inl.h +++ b/folly/wangle/channel/HandlerContext-inl.h @@ -36,6 +36,8 @@ class PipelineContext { virtual void setNextIn(PipelineContext* ctx) = 0; virtual void setNextOut(PipelineContext* ctx) = 0; + + virtual HandlerDir getDirection() = 0; }; template @@ -86,6 +88,10 @@ class ContextImplBase : public PipelineContext { } void setNextIn(PipelineContext* ctx) override { + if (!ctx) { + nextIn_ = nullptr; + return; + } auto nextIn = dynamic_cast*>(ctx); if (nextIn) { nextIn_ = nextIn; @@ -95,6 +101,10 @@ class ContextImplBase : public PipelineContext { } void setNextOut(PipelineContext* ctx) override { + if (!ctx) { + nextOut_ = nullptr; + return; + } auto nextOut = dynamic_cast*>(ctx); if (nextOut) { nextOut_ = nextOut; @@ -103,6 +113,10 @@ class ContextImplBase : public PipelineContext { } } + HandlerDir getDirection() override { + return H::dir; + } + protected: Context* impl_; P* pipeline_; diff --git a/folly/wangle/channel/Pipeline-inl.h b/folly/wangle/channel/Pipeline-inl.h index 7c1d46bc..dde1f39e 100644 --- a/folly/wangle/channel/Pipeline-inl.h +++ b/folly/wangle/channel/Pipeline-inl.h @@ -164,6 +164,80 @@ Pipeline& Pipeline::addFront(H* handler) { return addFront(std::shared_ptr(handler, [](H*){})); } +template +template +Pipeline& Pipeline::removeHelper(H* handler, bool checkEqual) { + typedef typename ContextType>::type Context; + bool removed = false; + for (auto it = ctxs_.begin(); it != ctxs_.end(); it++) { + auto ctx = std::dynamic_pointer_cast(*it); + if (ctx && (!checkEqual || ctx->getHandler() == handler)) { + it = removeAt(it); + removed = true; + if (it == ctxs_.end()) { + break; + } + } + } + + if (!removed) { + throw std::invalid_argument("No such handler in pipeline"); + } + + return *this; +} + +template +template +Pipeline& Pipeline::remove() { + return removeHelper(nullptr, false); +} + +template +template +Pipeline& Pipeline::remove(H* handler) { + return removeHelper(handler, true); +} + +template +typename Pipeline::ContextIterator Pipeline::removeAt( + const typename Pipeline::ContextIterator& it) { + (*it)->detachPipeline(); + + const auto dir = (*it)->getDirection(); + if (dir == HandlerDir::BOTH || dir == HandlerDir::IN) { + auto it2 = std::find(inCtxs_.begin(), inCtxs_.end(), it->get()); + CHECK(it2 != inCtxs_.end()); + inCtxs_.erase(it2); + } + + if (dir == HandlerDir::BOTH || dir == HandlerDir::OUT) { + auto it2 = std::find(outCtxs_.begin(), outCtxs_.end(), it->get()); + CHECK(it2 != outCtxs_.end()); + outCtxs_.erase(it2); + } + + return ctxs_.erase(it); +} + +template +Pipeline& Pipeline::removeFront() { + if (ctxs_.empty()) { + throw std::invalid_argument("No handlers in pipeline"); + } + removeAt(ctxs_.begin()); + return *this; +} + +template +Pipeline& Pipeline::removeBack() { + if (ctxs_.empty()) { + throw std::invalid_argument("No handlers in pipeline"); + } + removeAt(--ctxs_.end()); + return *this; +} + template template H* Pipeline::getHandler(int i) { @@ -190,18 +264,22 @@ inline void logWarningIfNotNothing(const std::string& warning) { // TODO Have read/write/etc check that pipeline has been finalized template void Pipeline::finalize() { + front_ = nullptr; if (!inCtxs_.empty()) { front_ = dynamic_cast*>(inCtxs_.front()); for (size_t i = 0; i < inCtxs_.size() - 1; i++) { inCtxs_[i]->setNextIn(inCtxs_[i+1]); } + inCtxs_.back()->setNextIn(nullptr); } + back_ = nullptr; if (!outCtxs_.empty()) { back_ = dynamic_cast*>(outCtxs_.back()); for (size_t i = outCtxs_.size() - 1; i > 0; i--) { outCtxs_[i]->setNextOut(outCtxs_[i-1]); } + outCtxs_.front()->setNextOut(nullptr); } if (!front_) { diff --git a/folly/wangle/channel/Pipeline.h b/folly/wangle/channel/Pipeline.h index 174c5d9f..7138ab1c 100644 --- a/folly/wangle/channel/Pipeline.h +++ b/folly/wangle/channel/Pipeline.h @@ -126,6 +126,16 @@ class Pipeline : public PipelineBase { template Pipeline& addFront(H* handler); + template + Pipeline& remove(H* handler); + + template + Pipeline& remove(); + + Pipeline& removeFront(); + + Pipeline& removeBack(); + template H* getHandler(int i); @@ -150,6 +160,14 @@ class Pipeline : public PipelineBase { template Pipeline& addHelper(std::shared_ptr&& ctx, bool front); + template + Pipeline& removeHelper(H* handler, bool checkEqual); + + typedef std::vector>::iterator + ContextIterator; + + ContextIterator removeAt(const ContextIterator& it); + WriteFlags writeFlags_{WriteFlags::NONE}; std::pair readBufferSettings_{2048, 2048}; diff --git a/folly/wangle/channel/test/PipelineTest.cpp b/folly/wangle/channel/test/PipelineTest.cpp index cdc4e980..36c4cee6 100644 --- a/folly/wangle/channel/test/PipelineTest.cpp +++ b/folly/wangle/channel/test/PipelineTest.cpp @@ -304,3 +304,88 @@ TEST(Pipeline, DynamicConstruction) { .finalize()); } } + +TEST(Pipeline, RemovePointer) { + IntHandler handler1, handler2; + EXPECT_CALL(handler1, attachPipeline(_)); + EXPECT_CALL(handler2, attachPipeline(_)); + Pipeline pipeline; + pipeline + .addBack(&handler1) + .addBack(&handler2) + .finalize(); + + EXPECT_CALL(handler1, detachPipeline(_)); + pipeline + .remove(&handler1) + .finalize(); + + EXPECT_CALL(handler2, read_(_, _)); + pipeline.read(1); + + EXPECT_CALL(handler2, detachPipeline(_)); +} + +TEST(Pipeline, RemoveFront) { + IntHandler handler1, handler2; + EXPECT_CALL(handler1, attachPipeline(_)); + EXPECT_CALL(handler2, attachPipeline(_)); + Pipeline pipeline; + pipeline + .addBack(&handler1) + .addBack(&handler2) + .finalize(); + + EXPECT_CALL(handler1, detachPipeline(_)); + pipeline + .removeFront() + .finalize(); + + EXPECT_CALL(handler2, read_(_, _)); + pipeline.read(1); + + EXPECT_CALL(handler2, detachPipeline(_)); +} + +TEST(Pipeline, RemoveBack) { + IntHandler handler1, handler2; + EXPECT_CALL(handler1, attachPipeline(_)); + EXPECT_CALL(handler2, attachPipeline(_)); + Pipeline pipeline; + pipeline + .addBack(&handler1) + .addBack(&handler2) + .finalize(); + + EXPECT_CALL(handler2, detachPipeline(_)); + pipeline + .removeBack() + .finalize(); + + EXPECT_CALL(handler1, read_(_, _)); + pipeline.read(1); + + EXPECT_CALL(handler1, detachPipeline(_)); +} + +TEST(Pipeline, RemoveType) { + IntHandler handler1; + IntHandler2 handler2; + EXPECT_CALL(handler1, attachPipeline(_)); + EXPECT_CALL(handler2, attachPipeline(_)); + Pipeline pipeline; + pipeline + .addBack(&handler1) + .addBack(&handler2) + .finalize(); + + EXPECT_CALL(handler1, detachPipeline(_)); + pipeline + .remove() + .finalize(); + + EXPECT_CALL(handler2, read_(_, _)); + pipeline.read(1); + + EXPECT_CALL(handler2, detachPipeline(_)); +} diff --git a/folly/wangle/service/ClientDispatcher.h b/folly/wangle/service/ClientDispatcher.h index d6354f04..66f24cc9 100644 --- a/folly/wangle/service/ClientDispatcher.h +++ b/folly/wangle/service/ClientDispatcher.h @@ -29,13 +29,27 @@ template class SerialClientDispatcher : public HandlerAdapter , public Service { public: - typedef typename HandlerAdapter::Context Context; + ~SerialClientDispatcher() { + if (pipeline_) { + try { + pipeline_->remove(this).finalize(); + } catch (const std::invalid_argument& e) { + // not in pipeline; this is fine + } + } + } + void setPipeline(Pipeline* pipeline) { + try { + pipeline->template remove(); + } catch (const std::invalid_argument& e) { + // no existing dispatcher; this is fine + } pipeline_ = pipeline; - pipeline->addBack(this); - pipeline->finalize(); + pipeline_->addBack(this); + pipeline_->finalize(); } void read(Context* ctx, Req in) override { @@ -61,6 +75,11 @@ class SerialClientDispatcher : public HandlerAdapter virtual Future close(Context* ctx) override { return HandlerAdapter::close(ctx); } + + void detachPipeline(Context* ctx) override { + pipeline_ = nullptr; + } + private: Pipeline* pipeline_{nullptr}; folly::Optional> p_; -- 2.34.1