From: James Sedgwick <jsedgwick@fb.com>
Date: Mon, 27 Apr 2015 19:12:53 +0000 (-0700)
Subject: fix detachPipeline/attachPipeline ordering
X-Git-Tag: v0.37.0~12
X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=9eed30ddfdcc53b0d17fa72299ea96cde16cc52b;p=folly.git

fix detachPipeline/attachPipeline ordering

Summary:
detachPipeline always goes bottom to top
attachPipeline always goes top to bottom
now we can attachReadCallback in AsyncSocketHandler::attachPipeline()

not sure of the implications for TAsyncTransportHandler... looks like Cpp2Channel still wants to attach/detach cb manually

Test Plan: unit

Reviewed By: davejwatson@fb.com

Subscribers: fugalh, folly-diffs@, jsedgwick, yfeldblum, chalfant

FB internal diff: D2023982

Tasks: 6836580

Signature: t1:2023982:1430157500:e09a4103550a3e5721effaa1b28ac5bed071fa67
---

diff --git a/folly/wangle/channel/AsyncSocketHandler.h b/folly/wangle/channel/AsyncSocketHandler.h
index 014812b8..ad76e08e 100644
--- a/folly/wangle/channel/AsyncSocketHandler.h
+++ b/folly/wangle/channel/AsyncSocketHandler.h
@@ -67,6 +67,7 @@ class AsyncSocketHandler
   void attachPipeline(Context* ctx) override {
     CHECK(!ctx_);
     ctx_ = ctx;
+    attachReadCallback();
   }
 
   folly::Future<void> write(
diff --git a/folly/wangle/channel/HandlerContext.h b/folly/wangle/channel/HandlerContext.h
index deddb2de..aadc0168 100644
--- a/folly/wangle/channel/HandlerContext.h
+++ b/folly/wangle/channel/HandlerContext.h
@@ -59,6 +59,7 @@ class PipelineContext {
  public:
   virtual ~PipelineContext() {}
 
+  virtual void attachPipeline() = 0;
   virtual void detachPipeline() = 0;
 
   virtual void attachTransport() = 0;
@@ -107,24 +108,31 @@ class ContextImpl : public HandlerContext<typename H::rout,
     initialize(pipeline, std::move(handler));
   }
 
-  void initialize(P* pipeline, std::shared_ptr<H> handler) {
-    pipeline_ = pipeline;
-    handler_ = std::move(handler);
-    handler_->attachPipeline(this);
-  }
-
   // For StaticPipeline
   ContextImpl() {}
 
   ~ContextImpl() {}
 
+  void initialize(P* pipeline, std::shared_ptr<H> handler) {
+    pipeline_ = pipeline;
+    handler_ = std::move(handler);
+  }
+
   H* getHandler() {
     return handler_.get();
   }
 
   // PipelineContext overrides
+  void attachPipeline() override {
+    if (!attached_) {
+      handler_->attachPipeline(this);
+      attached_ = true;
+    }
+  }
+
   void detachPipeline() override {
     handler_->detachPipeline(this);
+    attached_ = false;
   }
 
   void setNextIn(PipelineContext* ctx) override {
@@ -257,6 +265,7 @@ class ContextImpl : public HandlerContext<typename H::rout,
   std::shared_ptr<H> handler_;
   InboundHandlerContext<Rout>* nextIn_{nullptr};
   OutboundHandlerContext<Wout>* nextOut_{nullptr};
+  bool attached_{false};
 };
 
 }}
diff --git a/folly/wangle/channel/Pipeline.h b/folly/wangle/channel/Pipeline.h
index 0cabfb2c..643ccdae 100644
--- a/folly/wangle/channel/Pipeline.h
+++ b/folly/wangle/channel/Pipeline.h
@@ -33,7 +33,13 @@ namespace folly { namespace wangle {
 template <class R, class W>
 class Pipeline : public DelayedDestruction {
  public:
-  Pipeline() {}
+  Pipeline() : isStatic_(false) {}
+
+  ~Pipeline() {
+    if (!isStatic_) {
+      detachHandlers();
+    }
+  }
 
   std::shared_ptr<AsyncTransport> getTransport() {
     return transport_;
@@ -136,6 +142,10 @@ class Pipeline : public DelayedDestruction {
     if (!front_) {
       throw std::invalid_argument("wrong type for first handler");
     }
+
+    for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) {
+      (*it)->attachPipeline();
+    }
   }
 
   // If one of the handlers owns the pipeline itself, use setOwner to ensure
@@ -170,6 +180,10 @@ class Pipeline : public DelayedDestruction {
   }
 
  protected:
+  explicit Pipeline(bool isStatic) : isStatic_(isStatic) {
+    CHECK(isStatic_);
+  }
+
   template <class Context>
   void addContextFront(Context* context) {
     ctxs_.insert(
@@ -190,6 +204,7 @@ class Pipeline : public DelayedDestruction {
   WriteFlags writeFlags_{WriteFlags::NONE};
   std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
 
+  bool isStatic_{false};
   InboundHandlerContext<R>* front_{nullptr};
   OutboundHandlerContext<W>* back_{nullptr};
   std::vector<std::shared_ptr<PipelineContext>> ctxs_;
diff --git a/folly/wangle/channel/StaticPipeline.h b/folly/wangle/channel/StaticPipeline.h
index 3c644eb7..6c6c5e08 100644
--- a/folly/wangle/channel/StaticPipeline.h
+++ b/folly/wangle/channel/StaticPipeline.h
@@ -47,7 +47,7 @@ class StaticPipeline;
 template <class R, class W>
 class StaticPipeline<R, W> : public Pipeline<R, W> {
  protected:
-  explicit StaticPipeline(bool) : Pipeline<R, W>() {}
+  explicit StaticPipeline(bool) : Pipeline<R, W>(true) {}
 };
 
 template <class R, class W, class Handler, class... Handlers>
diff --git a/folly/wangle/channel/test/PipelineTest.cpp b/folly/wangle/channel/test/PipelineTest.cpp
index d4d2ce7c..37f3fa9f 100644
--- a/folly/wangle/channel/test/PipelineTest.cpp
+++ b/folly/wangle/channel/test/PipelineTest.cpp
@@ -79,8 +79,11 @@ TEST(PipelineTest, FireActions) {
   IntHandler handler1;
   IntHandler handler2;
 
-  EXPECT_CALL(handler1, attachPipeline(_));
-  EXPECT_CALL(handler2, attachPipeline(_));
+  {
+    InSequence sequence;
+    EXPECT_CALL(handler2, attachPipeline(_));
+    EXPECT_CALL(handler1, attachPipeline(_));
+  }
 
   StaticPipeline<int, int, IntHandler, IntHandler>
   pipeline(&handler1, &handler2);
@@ -105,8 +108,11 @@ TEST(PipelineTest, FireActions) {
   EXPECT_CALL(handler1, close_(_)).Times(1);
   EXPECT_NO_THROW(pipeline.close().value());
 
-  EXPECT_CALL(handler1, detachPipeline(_));
-  EXPECT_CALL(handler2, detachPipeline(_));
+  {
+    InSequence sequence;
+    EXPECT_CALL(handler1, detachPipeline(_));
+    EXPECT_CALL(handler2, detachPipeline(_));
+  }
 }
 
 // Test that nothing bad happens when actions reach the end of the pipeline
@@ -140,8 +146,11 @@ TEST(PipelineTest, TurnAround) {
   IntHandler handler1;
   IntHandler handler2;
 
-  EXPECT_CALL(handler1, attachPipeline(_));
-  EXPECT_CALL(handler2, attachPipeline(_));
+  {
+    InSequence sequence;
+    EXPECT_CALL(handler2, attachPipeline(_));
+    EXPECT_CALL(handler1, attachPipeline(_));
+  }
 
   StaticPipeline<int, int, IntHandler, IntHandler>
   pipeline(&handler1, &handler2);
@@ -151,8 +160,11 @@ TEST(PipelineTest, TurnAround) {
   EXPECT_CALL(handler1, write_(_, _)).Times(1);
   pipeline.read(1);
 
-  EXPECT_CALL(handler1, detachPipeline(_));
-  EXPECT_CALL(handler2, detachPipeline(_));
+  {
+    InSequence sequence;
+    EXPECT_CALL(handler1, detachPipeline(_));
+    EXPECT_CALL(handler2, detachPipeline(_));
+  }
 }
 
 TEST(PipelineTest, DynamicFireActions) {
@@ -161,8 +173,11 @@ TEST(PipelineTest, DynamicFireActions) {
   StaticPipeline<int, int, IntHandler>
   pipeline(&handler2);
 
-  EXPECT_CALL(handler1, attachPipeline(_));
-  EXPECT_CALL(handler3, attachPipeline(_));
+  {
+    InSequence sequence;
+    EXPECT_CALL(handler3, attachPipeline(_));
+    EXPECT_CALL(handler1, attachPipeline(_));
+  }
 
   pipeline
     .addFront(&handler1)
@@ -183,9 +198,46 @@ TEST(PipelineTest, DynamicFireActions) {
   EXPECT_CALL(handler1, write_(_, _)).Times(1);
   EXPECT_NO_THROW(pipeline.write(1).value());
 
-  EXPECT_CALL(handler1, detachPipeline(_));
-  EXPECT_CALL(handler2, detachPipeline(_));
-  EXPECT_CALL(handler3, detachPipeline(_));
+  {
+    InSequence sequence;
+    EXPECT_CALL(handler1, detachPipeline(_));
+    EXPECT_CALL(handler2, detachPipeline(_));
+    EXPECT_CALL(handler3, detachPipeline(_));
+  }
+}
+
+TEST(PipelineTest, DynamicAttachDetachOrder) {
+  IntHandler handler1, handler2;
+  Pipeline<int, int> pipeline;
+  {
+    InSequence sequence;
+    EXPECT_CALL(handler2, attachPipeline(_));
+    EXPECT_CALL(handler1, attachPipeline(_));
+  }
+  pipeline
+    .addBack(&handler1)
+    .addBack(&handler2)
+    .finalize();
+  {
+    InSequence sequence;
+    EXPECT_CALL(handler1, detachPipeline(_));
+    EXPECT_CALL(handler2, detachPipeline(_));
+  }
+}
+
+TEST(PipelineTest, HandlerInMultiplePipelines) {
+  IntHandler handler;
+  EXPECT_CALL(handler, attachPipeline(_)).Times(2);
+  StaticPipeline<int, int, IntHandler> pipeline1(&handler);
+  StaticPipeline<int, int, IntHandler> pipeline2(&handler);
+  EXPECT_CALL(handler, detachPipeline(_)).Times(2);
+}
+
+TEST(PipelineTest, HandlerInPipelineTwice) {
+  IntHandler handler;
+  EXPECT_CALL(handler, attachPipeline(_)).Times(2);
+  StaticPipeline<int, int, IntHandler, IntHandler> pipeline(&handler, &handler);
+  EXPECT_CALL(handler, detachPipeline(_)).Times(2);
 }
 
 template <class Rin, class Rout = Rin, class Win = Rout, class Wout = Rin>
diff --git a/folly/wangle/service/ServiceTest.cpp b/folly/wangle/service/ServiceTest.cpp
index b9c6d815..be018049 100644
--- a/folly/wangle/service/ServiceTest.cpp
+++ b/folly/wangle/service/ServiceTest.cpp
@@ -52,7 +52,6 @@ class ServerPipelineFactory
     pipeline->addBack(StringCodec());
     pipeline->addBack(SerialServerDispatcher<Req, Resp>(&service_));
     pipeline->finalize();
-    pipeline->template getHandler<AsyncSocketHandler>(0)->attachReadCallback();
     return pipeline;
   }
 
@@ -69,8 +68,6 @@ class ClientPipelineFactory : public PipelineFactory<ServicePipeline> {
     auto pipeline = new ServicePipeline();
     pipeline->addBack(AsyncSocketHandler(socket));
     pipeline->addBack(StringCodec());
-    pipeline->template getHandler<AsyncSocketHandler>(0)->attachReadCallback();
-
     return pipeline;
    }
 };