fix detachPipeline/attachPipeline ordering
authorJames Sedgwick <jsedgwick@fb.com>
Mon, 27 Apr 2015 19:12:53 +0000 (12:12 -0700)
committerAndrii Grynenko <andrii@fb.com>
Wed, 29 Apr 2015 22:56:41 +0000 (15:56 -0700)
Summary:
detachPipeline always goes bottom to top
attachPipeline always goes top to bottom
now we can attachReadCallback in AsyncSocketHandler::attachPipeline()

not sure of the implications for TAsyncTransportHandler... looks like Cpp2Channel still wants to attach/detach cb manually

Test Plan: unit

Reviewed By: davejwatson@fb.com

Subscribers: fugalh, folly-diffs@, jsedgwick, yfeldblum, chalfant

FB internal diff: D2023982

Tasks: 6836580

Signature: t1:2023982:1430157500:e09a4103550a3e5721effaa1b28ac5bed071fa67

folly/wangle/channel/AsyncSocketHandler.h
folly/wangle/channel/HandlerContext.h
folly/wangle/channel/Pipeline.h
folly/wangle/channel/StaticPipeline.h
folly/wangle/channel/test/PipelineTest.cpp
folly/wangle/service/ServiceTest.cpp

index 014812b8bdbefd567b54358d25c3a780bbb51919..ad76e08ea4584902c70ff2663c583a5e338fb166 100644 (file)
@@ -67,6 +67,7 @@ class AsyncSocketHandler
   void attachPipeline(Context* ctx) override {
     CHECK(!ctx_);
     ctx_ = ctx;
+    attachReadCallback();
   }
 
   folly::Future<void> write(
index deddb2de012e27ea2ae40fff27c427ad75053b4a..aadc01681afaeb930f81c483311c07cb773f1abf 100644 (file)
@@ -59,6 +59,7 @@ class PipelineContext {
  public:
   virtual ~PipelineContext() {}
 
+  virtual void attachPipeline() = 0;
   virtual void detachPipeline() = 0;
 
   virtual void attachTransport() = 0;
@@ -107,24 +108,31 @@ class ContextImpl : public HandlerContext<typename H::rout,
     initialize(pipeline, std::move(handler));
   }
 
-  void initialize(P* pipeline, std::shared_ptr<H> handler) {
-    pipeline_ = pipeline;
-    handler_ = std::move(handler);
-    handler_->attachPipeline(this);
-  }
-
   // For StaticPipeline
   ContextImpl() {}
 
   ~ContextImpl() {}
 
+  void initialize(P* pipeline, std::shared_ptr<H> handler) {
+    pipeline_ = pipeline;
+    handler_ = std::move(handler);
+  }
+
   H* getHandler() {
     return handler_.get();
   }
 
   // PipelineContext overrides
+  void attachPipeline() override {
+    if (!attached_) {
+      handler_->attachPipeline(this);
+      attached_ = true;
+    }
+  }
+
   void detachPipeline() override {
     handler_->detachPipeline(this);
+    attached_ = false;
   }
 
   void setNextIn(PipelineContext* ctx) override {
@@ -257,6 +265,7 @@ class ContextImpl : public HandlerContext<typename H::rout,
   std::shared_ptr<H> handler_;
   InboundHandlerContext<Rout>* nextIn_{nullptr};
   OutboundHandlerContext<Wout>* nextOut_{nullptr};
+  bool attached_{false};
 };
 
 }}
index 0cabfb2c6c4146f5aa13cde3e7045cf23dd64d0a..643ccdaeec81d415b36561ce9c14b3295dc7bbab 100644 (file)
@@ -33,7 +33,13 @@ namespace folly { namespace wangle {
 template <class R, class W>
 class Pipeline : public DelayedDestruction {
  public:
-  Pipeline() {}
+  Pipeline() : isStatic_(false) {}
+
+  ~Pipeline() {
+    if (!isStatic_) {
+      detachHandlers();
+    }
+  }
 
   std::shared_ptr<AsyncTransport> getTransport() {
     return transport_;
@@ -136,6 +142,10 @@ class Pipeline : public DelayedDestruction {
     if (!front_) {
       throw std::invalid_argument("wrong type for first handler");
     }
+
+    for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) {
+      (*it)->attachPipeline();
+    }
   }
 
   // If one of the handlers owns the pipeline itself, use setOwner to ensure
@@ -170,6 +180,10 @@ class Pipeline : public DelayedDestruction {
   }
 
  protected:
+  explicit Pipeline(bool isStatic) : isStatic_(isStatic) {
+    CHECK(isStatic_);
+  }
+
   template <class Context>
   void addContextFront(Context* context) {
     ctxs_.insert(
@@ -190,6 +204,7 @@ class Pipeline : public DelayedDestruction {
   WriteFlags writeFlags_{WriteFlags::NONE};
   std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
 
+  bool isStatic_{false};
   InboundHandlerContext<R>* front_{nullptr};
   OutboundHandlerContext<W>* back_{nullptr};
   std::vector<std::shared_ptr<PipelineContext>> ctxs_;
index 3c644eb747c97c8c35f95fa94fb69427fffff67e..6c6c5e0868062269825052730f3513adb3b01c05 100644 (file)
@@ -47,7 +47,7 @@ class StaticPipeline;
 template <class R, class W>
 class StaticPipeline<R, W> : public Pipeline<R, W> {
  protected:
-  explicit StaticPipeline(bool) : Pipeline<R, W>() {}
+  explicit StaticPipeline(bool) : Pipeline<R, W>(true) {}
 };
 
 template <class R, class W, class Handler, class... Handlers>
index d4d2ce7ce67f06f0943dd35dad2b6287252cb37d..37f3fa9ff8c30846614d84c31d9b12eefb4d4daf 100644 (file)
@@ -79,8 +79,11 @@ TEST(PipelineTest, FireActions) {
   IntHandler handler1;
   IntHandler handler2;
 
-  EXPECT_CALL(handler1, attachPipeline(_));
-  EXPECT_CALL(handler2, attachPipeline(_));
+  {
+    InSequence sequence;
+    EXPECT_CALL(handler2, attachPipeline(_));
+    EXPECT_CALL(handler1, attachPipeline(_));
+  }
 
   StaticPipeline<int, int, IntHandler, IntHandler>
   pipeline(&handler1, &handler2);
@@ -105,8 +108,11 @@ TEST(PipelineTest, FireActions) {
   EXPECT_CALL(handler1, close_(_)).Times(1);
   EXPECT_NO_THROW(pipeline.close().value());
 
-  EXPECT_CALL(handler1, detachPipeline(_));
-  EXPECT_CALL(handler2, detachPipeline(_));
+  {
+    InSequence sequence;
+    EXPECT_CALL(handler1, detachPipeline(_));
+    EXPECT_CALL(handler2, detachPipeline(_));
+  }
 }
 
 // Test that nothing bad happens when actions reach the end of the pipeline
@@ -140,8 +146,11 @@ TEST(PipelineTest, TurnAround) {
   IntHandler handler1;
   IntHandler handler2;
 
-  EXPECT_CALL(handler1, attachPipeline(_));
-  EXPECT_CALL(handler2, attachPipeline(_));
+  {
+    InSequence sequence;
+    EXPECT_CALL(handler2, attachPipeline(_));
+    EXPECT_CALL(handler1, attachPipeline(_));
+  }
 
   StaticPipeline<int, int, IntHandler, IntHandler>
   pipeline(&handler1, &handler2);
@@ -151,8 +160,11 @@ TEST(PipelineTest, TurnAround) {
   EXPECT_CALL(handler1, write_(_, _)).Times(1);
   pipeline.read(1);
 
-  EXPECT_CALL(handler1, detachPipeline(_));
-  EXPECT_CALL(handler2, detachPipeline(_));
+  {
+    InSequence sequence;
+    EXPECT_CALL(handler1, detachPipeline(_));
+    EXPECT_CALL(handler2, detachPipeline(_));
+  }
 }
 
 TEST(PipelineTest, DynamicFireActions) {
@@ -161,8 +173,11 @@ TEST(PipelineTest, DynamicFireActions) {
   StaticPipeline<int, int, IntHandler>
   pipeline(&handler2);
 
-  EXPECT_CALL(handler1, attachPipeline(_));
-  EXPECT_CALL(handler3, attachPipeline(_));
+  {
+    InSequence sequence;
+    EXPECT_CALL(handler3, attachPipeline(_));
+    EXPECT_CALL(handler1, attachPipeline(_));
+  }
 
   pipeline
     .addFront(&handler1)
@@ -183,9 +198,46 @@ TEST(PipelineTest, DynamicFireActions) {
   EXPECT_CALL(handler1, write_(_, _)).Times(1);
   EXPECT_NO_THROW(pipeline.write(1).value());
 
-  EXPECT_CALL(handler1, detachPipeline(_));
-  EXPECT_CALL(handler2, detachPipeline(_));
-  EXPECT_CALL(handler3, detachPipeline(_));
+  {
+    InSequence sequence;
+    EXPECT_CALL(handler1, detachPipeline(_));
+    EXPECT_CALL(handler2, detachPipeline(_));
+    EXPECT_CALL(handler3, detachPipeline(_));
+  }
+}
+
+TEST(PipelineTest, DynamicAttachDetachOrder) {
+  IntHandler handler1, handler2;
+  Pipeline<int, int> pipeline;
+  {
+    InSequence sequence;
+    EXPECT_CALL(handler2, attachPipeline(_));
+    EXPECT_CALL(handler1, attachPipeline(_));
+  }
+  pipeline
+    .addBack(&handler1)
+    .addBack(&handler2)
+    .finalize();
+  {
+    InSequence sequence;
+    EXPECT_CALL(handler1, detachPipeline(_));
+    EXPECT_CALL(handler2, detachPipeline(_));
+  }
+}
+
+TEST(PipelineTest, HandlerInMultiplePipelines) {
+  IntHandler handler;
+  EXPECT_CALL(handler, attachPipeline(_)).Times(2);
+  StaticPipeline<int, int, IntHandler> pipeline1(&handler);
+  StaticPipeline<int, int, IntHandler> pipeline2(&handler);
+  EXPECT_CALL(handler, detachPipeline(_)).Times(2);
+}
+
+TEST(PipelineTest, HandlerInPipelineTwice) {
+  IntHandler handler;
+  EXPECT_CALL(handler, attachPipeline(_)).Times(2);
+  StaticPipeline<int, int, IntHandler, IntHandler> pipeline(&handler, &handler);
+  EXPECT_CALL(handler, detachPipeline(_)).Times(2);
 }
 
 template <class Rin, class Rout = Rin, class Win = Rout, class Wout = Rin>
index b9c6d815a54e5be3282b1484518087ea60a682e1..be01804995772f316c9aa4ca133f7bf9e8f75a31 100644 (file)
@@ -52,7 +52,6 @@ class ServerPipelineFactory
     pipeline->addBack(StringCodec());
     pipeline->addBack(SerialServerDispatcher<Req, Resp>(&service_));
     pipeline->finalize();
-    pipeline->template getHandler<AsyncSocketHandler>(0)->attachReadCallback();
     return pipeline;
   }
 
@@ -69,8 +68,6 @@ class ClientPipelineFactory : public PipelineFactory<ServicePipeline> {
     auto pipeline = new ServicePipeline();
     pipeline->addBack(AsyncSocketHandler(socket));
     pipeline->addBack(StringCodec());
-    pipeline->template getHandler<AsyncSocketHandler>(0)->attachReadCallback();
-
     return pipeline;
    }
 };