use inbound/outbound handlers in a few more places
authorJames Sedgwick <jsedgwick@fb.com>
Thu, 30 Apr 2015 01:20:45 +0000 (18:20 -0700)
committerPraveen Kumar Ramakrishnan <praveenr@fb.com>
Tue, 12 May 2015 00:01:18 +0000 (17:01 -0700)
Summary: Also, instead of throwing on finalize() if there's no inbound/outbound handler in the pipeline, log a warning and throw when the operations are attempted. This was necessary for CodecTest which doesn't use outbound handlers.

Test Plan: unit

Reviewed By: davejwatson@fb.com

Subscribers: trunkagent, fugalh, folly-diffs@, jsedgwick, yfeldblum, chalfant

FB internal diff: D2028176

Tasks: 6836580

Signature: t1:2028176:1430346333:fdd645a535e8158d780cfd1119e27803995b663f

folly/wangle/bootstrap/ServerBootstrap-inl.h
folly/wangle/channel/Handler.h
folly/wangle/channel/Pipeline.h
folly/wangle/channel/test/PipelineTest.cpp
folly/wangle/codec/ByteToMessageCodec.cpp
folly/wangle/codec/ByteToMessageCodec.h
folly/wangle/codec/CodecTest.cpp
folly/wangle/service/ClientDispatcher.h

index ee3f9b22799a6771bef6270dae93517580e1279d..54fde1969f18fd7dc73c00f0cc87f3eea50c7985 100644 (file)
@@ -28,7 +28,7 @@ namespace folly {
 template <typename Pipeline>
 class ServerAcceptor
     : public Acceptor
-    , public folly::wangle::HandlerAdapter<void*, std::exception> {
+    , public folly::wangle::InboundHandler<void*> {
   typedef std::unique_ptr<Pipeline,
                           folly::DelayedDestruction::Destructor> PipelinePtr;
 
@@ -86,10 +86,6 @@ class ServerAcceptor
     Acceptor::addConnection(connection);
   }
 
-  folly::Future<void> write(Context* ctx, std::exception e) {
-    return ctx->fireWrite(e);
-  }
-
   /* See Acceptor::onNewConnection for details */
   void onNewConnection(
     AsyncSocket::UniquePtr transport, const SocketAddress* address,
index 4332878fcd6347d59f38550b03b5c6903a925c21..676c2fabf3783e4ee14e6d643cb3d7b8255ab672 100644 (file)
@@ -159,7 +159,7 @@ class HandlerAdapter : public Handler<R, R, W, W> {
 typedef HandlerAdapter<IOBufQueue&, std::unique_ptr<IOBuf>>
 BytesToBytesHandler;
 
-typedef InboundHandler<IOBufQueue&>
+typedef InboundHandler<IOBufQueue&, std::unique_ptr<IOBuf>>
 InboundBytesToBytesHandler;
 
 typedef OutboundHandler<std::unique_ptr<IOBuf>>
index 48b3db4d3213ec8d3b22ae41f2e08405fda80bbe..db7a71ef3571d7f1d2da2be26ada7a9bc65c67e6 100644 (file)
@@ -62,22 +62,38 @@ class Pipeline : public DelayedDestruction {
   }
 
   void read(R msg) {
+    if (!front_) {
+      throw std::invalid_argument("read(): no inbound handler in Pipeline");
+    }
     front_->read(std::forward<R>(msg));
   }
 
   void readEOF() {
+    if (!front_) {
+      throw std::invalid_argument("readEOF(): no inbound handler in Pipeline");
+    }
     front_->readEOF();
   }
 
   void readException(exception_wrapper e) {
+    if (!front_) {
+      throw std::invalid_argument(
+          "readException(): no inbound handler in Pipeline");
+    }
     front_->readException(std::move(e));
   }
 
   Future<void> write(W msg) {
+    if (!back_) {
+      throw std::invalid_argument("write(): no outbound handler in Pipeline");
+    }
     return back_->write(std::forward<W>(msg));
   }
 
   Future<void> close() {
+    if (!back_) {
+      throw std::invalid_argument("close(): no outbound handler in Pipeline");
+    }
     return back_->close();
   }
 
@@ -138,10 +154,12 @@ class Pipeline : public DelayedDestruction {
     }
 
     if (!front_) {
-      throw std::invalid_argument("no inbound handler in Pipeline");
+      LOG(WARNING) << "No inbound handler in Pipeline, "
+                      "inbound operations will throw std::invalid_argument";
     }
     if (!back_) {
-      throw std::invalid_argument("no outbound handler in Pipeline");
+      LOG(WARNING) << "No outbound handler in Pipeline, "
+                      "outbound operations will throw std::invalid_argument";
     }
 
     for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) {
index 46149560d087bcc9061e7055e5875534d21a0176..4c9405837fb29295934b88d5b5ad0a37dc2ded31 100644 (file)
@@ -250,6 +250,13 @@ TEST(PipelineTest, HandlerInPipelineTwice) {
   EXPECT_CALL(handler, detachPipeline(_)).Times(2);
 }
 
+TEST(PipelineTest, NoDetachOnOwner) {
+  IntHandler handler;
+  EXPECT_CALL(handler, attachPipeline(_));
+  StaticPipeline<int, int, IntHandler> pipeline(&handler);
+  pipeline.setOwner(&handler);
+}
+
 template <class Rin, class Rout = Rin, class Win = Rout, class Wout = Rin>
 class ConcreteHandler : public Handler<Rin, Rout, Win, Wout> {
   typedef typename Handler<Rin, Rout, Win, Wout>::Context Context;
@@ -262,22 +269,21 @@ typedef HandlerAdapter<std::string, std::string> StringHandler;
 typedef ConcreteHandler<int, std::string> IntToStringHandler;
 typedef ConcreteHandler<std::string, int> StringToIntHandler;
 
-TEST(Pipeline, DynamicConstruction) {
-  {
-    Pipeline<int, int> pipeline;
-    EXPECT_THROW(
-      pipeline
-        .addBack(HandlerAdapter<std::string, std::string>{})
-        .finalize(), std::invalid_argument);
-  }
-  {
-    Pipeline<int, int> pipeline;
-    EXPECT_THROW(
-      pipeline
-        .addFront(HandlerAdapter<std::string, std::string>{})
-        .finalize(),
+TEST(Pipeline, MissingInboundOrOutbound) {
+  Pipeline<int, int> pipeline;
+  pipeline
+    .addBack(HandlerAdapter<std::string, std::string>{})
+    .finalize();
+  EXPECT_THROW(pipeline.read(0), std::invalid_argument);
+  EXPECT_THROW(pipeline.readEOF(), std::invalid_argument);
+  EXPECT_THROW(
+      pipeline.readException(exception_wrapper(std::runtime_error("blah"))),
       std::invalid_argument);
-  }
+  EXPECT_THROW(pipeline.write(0), std::invalid_argument);
+  EXPECT_THROW(pipeline.close(), std::invalid_argument);
+}
+
+TEST(Pipeline, DynamicConstruction) {
   {
     StaticPipeline<std::string, std::string, StringHandler, StringHandler>
     pipeline{StringHandler(), StringHandler()};
index 52ee794ffffe1a220750332a308d9e8679369f80..e16183bbc3e164addb5e752a36901149a29d64ed 100644 (file)
@@ -23,8 +23,7 @@ void ByteToMessageCodec::read(Context* ctx, IOBufQueue& q) {
   while (true) {
     result = decode(ctx, q, needed);
     if (result) {
-      q_.append(std::move(result));
-      ctx->fireRead(q_);
+      ctx->fireRead(std::move(result));
     } else {
       break;
     }
index 53ec3d8e4201beca16dc04a43829fa052c2db081..600bb02846cb064fccf8d9b05929f7351dd4ac06 100644 (file)
@@ -47,9 +47,6 @@ class ByteToMessageCodec
     Context* ctx, IOBufQueue& buf, size_t&) = 0;
 
   void read(Context* ctx, IOBufQueue& q);
-
- private:
-  IOBufQueue q_;
 };
 
 }}
index c44577888f4cd7794485540c368c83bbdb3ad75f..ecca824fe82e62d8013bbe0481cede0404e9f18f 100644 (file)
@@ -25,13 +25,13 @@ using namespace folly::wangle;
 using namespace folly::io;
 
 class FrameTester
-    : public BytesToBytesHandler {
+    : public InboundHandler<std::unique_ptr<IOBuf>> {
  public:
-  FrameTester(std::function<void(std::unique_ptr<IOBuf>)> test)
+  explicit FrameTester(std::function<void(std::unique_ptr<IOBuf>)> test)
     : test_(test) {}
 
-  void read(Context* ctx, IOBufQueue& q) {
-    test_(q.move());
+  void read(Context* ctx, std::unique_ptr<IOBuf> buf) {
+    test_(std::move(buf));
   }
 
   void readException(Context* ctx, exception_wrapper w) {
index b05aa9a26373831f6a77a54b5e99b06920e8e1a8..820d6478cea6a8639118e01c1d548b835a1a3782 100644 (file)
@@ -26,11 +26,11 @@ namespace folly { namespace wangle {
  * only one request is allowed at a time.
  */
 template <typename Pipeline, typename Req, typename Resp = Req>
-class SerialClientDispatcher : public HandlerAdapter<Req, Resp>
+class SerialClientDispatcher : public InboundHandler<Req>
                              , public Service<Req, Resp> {
  public:
 
-  typedef typename HandlerAdapter<Req, Resp>::Context Context;
+  typedef typename InboundHandler<Req>::Context Context;
 
   void setPipeline(Pipeline* pipeline) {
     pipeline_ = pipeline;