From: James Sedgwick <jsedgwick@fb.com>
Date: Thu, 30 Apr 2015 01:20:45 +0000 (-0700)
Subject: use inbound/outbound handlers in a few more places
X-Git-Tag: v0.38.0~33
X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=fdd8e84bddefb7fbf07df87b9ed83d6b748c68f3;p=folly.git

use inbound/outbound handlers in a few more places

Summary: Also, instead of throwing on finalize() if there's no inbound/outbound handler in the pipeline, log a warning and throw when the operations are attempted. This was necessary for CodecTest which doesn't use outbound handlers.

Test Plan: unit

Reviewed By: davejwatson@fb.com

Subscribers: trunkagent, fugalh, folly-diffs@, jsedgwick, yfeldblum, chalfant

FB internal diff: D2028176

Tasks: 6836580

Signature: t1:2028176:1430346333:fdd645a535e8158d780cfd1119e27803995b663f
---

diff --git a/folly/wangle/bootstrap/ServerBootstrap-inl.h b/folly/wangle/bootstrap/ServerBootstrap-inl.h
index ee3f9b22..54fde196 100644
--- a/folly/wangle/bootstrap/ServerBootstrap-inl.h
+++ b/folly/wangle/bootstrap/ServerBootstrap-inl.h
@@ -28,7 +28,7 @@ namespace folly {
 template <typename Pipeline>
 class ServerAcceptor
     : public Acceptor
-    , public folly::wangle::HandlerAdapter<void*, std::exception> {
+    , public folly::wangle::InboundHandler<void*> {
   typedef std::unique_ptr<Pipeline,
                           folly::DelayedDestruction::Destructor> PipelinePtr;
 
@@ -86,10 +86,6 @@ class ServerAcceptor
     Acceptor::addConnection(connection);
   }
 
-  folly::Future<void> write(Context* ctx, std::exception e) {
-    return ctx->fireWrite(e);
-  }
-
   /* See Acceptor::onNewConnection for details */
   void onNewConnection(
     AsyncSocket::UniquePtr transport, const SocketAddress* address,
diff --git a/folly/wangle/channel/Handler.h b/folly/wangle/channel/Handler.h
index 4332878f..676c2fab 100644
--- a/folly/wangle/channel/Handler.h
+++ b/folly/wangle/channel/Handler.h
@@ -159,7 +159,7 @@ class HandlerAdapter : public Handler<R, R, W, W> {
 typedef HandlerAdapter<IOBufQueue&, std::unique_ptr<IOBuf>>
 BytesToBytesHandler;
 
-typedef InboundHandler<IOBufQueue&>
+typedef InboundHandler<IOBufQueue&, std::unique_ptr<IOBuf>>
 InboundBytesToBytesHandler;
 
 typedef OutboundHandler<std::unique_ptr<IOBuf>>
diff --git a/folly/wangle/channel/Pipeline.h b/folly/wangle/channel/Pipeline.h
index 48b3db4d..db7a71ef 100644
--- a/folly/wangle/channel/Pipeline.h
+++ b/folly/wangle/channel/Pipeline.h
@@ -62,22 +62,38 @@ class Pipeline : public DelayedDestruction {
   }
 
   void read(R msg) {
+    if (!front_) {
+      throw std::invalid_argument("read(): no inbound handler in Pipeline");
+    }
     front_->read(std::forward<R>(msg));
   }
 
   void readEOF() {
+    if (!front_) {
+      throw std::invalid_argument("readEOF(): no inbound handler in Pipeline");
+    }
     front_->readEOF();
   }
 
   void readException(exception_wrapper e) {
+    if (!front_) {
+      throw std::invalid_argument(
+          "readException(): no inbound handler in Pipeline");
+    }
     front_->readException(std::move(e));
   }
 
   Future<void> write(W msg) {
+    if (!back_) {
+      throw std::invalid_argument("write(): no outbound handler in Pipeline");
+    }
     return back_->write(std::forward<W>(msg));
   }
 
   Future<void> close() {
+    if (!back_) {
+      throw std::invalid_argument("close(): no outbound handler in Pipeline");
+    }
     return back_->close();
   }
 
@@ -138,10 +154,12 @@ class Pipeline : public DelayedDestruction {
     }
 
     if (!front_) {
-      throw std::invalid_argument("no inbound handler in Pipeline");
+      LOG(WARNING) << "No inbound handler in Pipeline, "
+                      "inbound operations will throw std::invalid_argument";
     }
     if (!back_) {
-      throw std::invalid_argument("no outbound handler in Pipeline");
+      LOG(WARNING) << "No outbound handler in Pipeline, "
+                      "outbound operations will throw std::invalid_argument";
     }
 
     for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) {
diff --git a/folly/wangle/channel/test/PipelineTest.cpp b/folly/wangle/channel/test/PipelineTest.cpp
index 46149560..4c940583 100644
--- a/folly/wangle/channel/test/PipelineTest.cpp
+++ b/folly/wangle/channel/test/PipelineTest.cpp
@@ -250,6 +250,13 @@ TEST(PipelineTest, HandlerInPipelineTwice) {
   EXPECT_CALL(handler, detachPipeline(_)).Times(2);
 }
 
+TEST(PipelineTest, NoDetachOnOwner) {
+  IntHandler handler;
+  EXPECT_CALL(handler, attachPipeline(_));
+  StaticPipeline<int, int, IntHandler> pipeline(&handler);
+  pipeline.setOwner(&handler);
+}
+
 template <class Rin, class Rout = Rin, class Win = Rout, class Wout = Rin>
 class ConcreteHandler : public Handler<Rin, Rout, Win, Wout> {
   typedef typename Handler<Rin, Rout, Win, Wout>::Context Context;
@@ -262,22 +269,21 @@ typedef HandlerAdapter<std::string, std::string> StringHandler;
 typedef ConcreteHandler<int, std::string> IntToStringHandler;
 typedef ConcreteHandler<std::string, int> StringToIntHandler;
 
-TEST(Pipeline, DynamicConstruction) {
-  {
-    Pipeline<int, int> pipeline;
-    EXPECT_THROW(
-      pipeline
-        .addBack(HandlerAdapter<std::string, std::string>{})
-        .finalize(), std::invalid_argument);
-  }
-  {
-    Pipeline<int, int> pipeline;
-    EXPECT_THROW(
-      pipeline
-        .addFront(HandlerAdapter<std::string, std::string>{})
-        .finalize(),
+TEST(Pipeline, MissingInboundOrOutbound) {
+  Pipeline<int, int> pipeline;
+  pipeline
+    .addBack(HandlerAdapter<std::string, std::string>{})
+    .finalize();
+  EXPECT_THROW(pipeline.read(0), std::invalid_argument);
+  EXPECT_THROW(pipeline.readEOF(), std::invalid_argument);
+  EXPECT_THROW(
+      pipeline.readException(exception_wrapper(std::runtime_error("blah"))),
       std::invalid_argument);
-  }
+  EXPECT_THROW(pipeline.write(0), std::invalid_argument);
+  EXPECT_THROW(pipeline.close(), std::invalid_argument);
+}
+
+TEST(Pipeline, DynamicConstruction) {
   {
     StaticPipeline<std::string, std::string, StringHandler, StringHandler>
     pipeline{StringHandler(), StringHandler()};
diff --git a/folly/wangle/codec/ByteToMessageCodec.cpp b/folly/wangle/codec/ByteToMessageCodec.cpp
index 52ee794f..e16183bb 100644
--- a/folly/wangle/codec/ByteToMessageCodec.cpp
+++ b/folly/wangle/codec/ByteToMessageCodec.cpp
@@ -23,8 +23,7 @@ void ByteToMessageCodec::read(Context* ctx, IOBufQueue& q) {
   while (true) {
     result = decode(ctx, q, needed);
     if (result) {
-      q_.append(std::move(result));
-      ctx->fireRead(q_);
+      ctx->fireRead(std::move(result));
     } else {
       break;
     }
diff --git a/folly/wangle/codec/ByteToMessageCodec.h b/folly/wangle/codec/ByteToMessageCodec.h
index 53ec3d8e..600bb028 100644
--- a/folly/wangle/codec/ByteToMessageCodec.h
+++ b/folly/wangle/codec/ByteToMessageCodec.h
@@ -47,9 +47,6 @@ class ByteToMessageCodec
     Context* ctx, IOBufQueue& buf, size_t&) = 0;
 
   void read(Context* ctx, IOBufQueue& q);
-
- private:
-  IOBufQueue q_;
 };
 
 }}
diff --git a/folly/wangle/codec/CodecTest.cpp b/folly/wangle/codec/CodecTest.cpp
index c4457788..ecca824f 100644
--- a/folly/wangle/codec/CodecTest.cpp
+++ b/folly/wangle/codec/CodecTest.cpp
@@ -25,13 +25,13 @@ using namespace folly::wangle;
 using namespace folly::io;
 
 class FrameTester
-    : public BytesToBytesHandler {
+    : public InboundHandler<std::unique_ptr<IOBuf>> {
  public:
-  FrameTester(std::function<void(std::unique_ptr<IOBuf>)> test)
+  explicit FrameTester(std::function<void(std::unique_ptr<IOBuf>)> test)
     : test_(test) {}
 
-  void read(Context* ctx, IOBufQueue& q) {
-    test_(q.move());
+  void read(Context* ctx, std::unique_ptr<IOBuf> buf) {
+    test_(std::move(buf));
   }
 
   void readException(Context* ctx, exception_wrapper w) {
diff --git a/folly/wangle/service/ClientDispatcher.h b/folly/wangle/service/ClientDispatcher.h
index b05aa9a2..820d6478 100644
--- a/folly/wangle/service/ClientDispatcher.h
+++ b/folly/wangle/service/ClientDispatcher.h
@@ -26,11 +26,11 @@ namespace folly { namespace wangle {
  * only one request is allowed at a time.
  */
 template <typename Pipeline, typename Req, typename Resp = Req>
-class SerialClientDispatcher : public HandlerAdapter<Req, Resp>
+class SerialClientDispatcher : public InboundHandler<Req>
                              , public Service<Req, Resp> {
  public:
 
-  typedef typename HandlerAdapter<Req, Resp>::Context Context;
+  typedef typename InboundHandler<Req>::Context Context;
 
   void setPipeline(Pipeline* pipeline) {
     pipeline_ = pipeline;