strip Channel from all class names
authorJames Sedgwick <jsedgwick@fb.com>
Mon, 27 Apr 2015 18:19:18 +0000 (11:19 -0700)
committerAlecs King <int@fb.com>
Mon, 27 Apr 2015 23:54:40 +0000 (16:54 -0700)
Summary: as above. Only got a little messy when components within folly::wangle typedefed things to Pipeline

Test Plan: unit tests

Reviewed By: davejwatson@fb.com

Subscribers: wormhole-diffs@, fugalh, alandau, bmatheny, folly-diffs@, jsedgwick, yfeldblum, chalfant

FB internal diff: D2022181

Tasks: 6836580

Signature: t1:2022181:1430157032:df0bdfb9ca0d76b86d52c55c4ad41ea953a18cb4

26 files changed:
folly/Makefile.am
folly/wangle/bootstrap/BootstrapTest.cpp
folly/wangle/bootstrap/ClientBootstrap.h
folly/wangle/bootstrap/ServerBootstrap-inl.h
folly/wangle/bootstrap/ServerBootstrap.cpp
folly/wangle/bootstrap/ServerBootstrap.h
folly/wangle/channel/AsyncSocketHandler.h
folly/wangle/channel/ChannelHandler.h [deleted file]
folly/wangle/channel/ChannelHandlerContext.h [deleted file]
folly/wangle/channel/ChannelPipeline.h [deleted file]
folly/wangle/channel/Handler.h [new file with mode: 0644]
folly/wangle/channel/HandlerContext.h [new file with mode: 0644]
folly/wangle/channel/OutputBufferingHandler.h
folly/wangle/channel/Pipeline.h [new file with mode: 0644]
folly/wangle/channel/test/ChannelPipelineTest.cpp [deleted file]
folly/wangle/channel/test/MockChannelHandler.h [deleted file]
folly/wangle/channel/test/MockHandler.h [new file with mode: 0644]
folly/wangle/channel/test/OutputBufferingHandlerTest.cpp
folly/wangle/channel/test/PipelineTest.cpp [new file with mode: 0644]
folly/wangle/codec/ByteToMessageCodec.h
folly/wangle/codec/CodecTest.cpp
folly/wangle/codec/StringCodec.h
folly/wangle/service/ClientDispatcher.h
folly/wangle/service/ServerDispatcher.h
folly/wangle/service/Service.h
folly/wangle/service/ServiceTest.cpp

index 0cfbed974219ab4198c7efd5a47ee59111f9290b..adba4d1e205d2625c82a031b3cdf78284768cae8 100644 (file)
@@ -273,10 +273,10 @@ nobase_follyinclude_HEADERS = \
        wangle/bootstrap/ServerSocketFactory.h \
        wangle/bootstrap/ClientBootstrap.h \
        wangle/channel/AsyncSocketHandler.h \
-       wangle/channel/ChannelHandler.h \
-       wangle/channel/ChannelHandlerContext.h \
-       wangle/channel/ChannelPipeline.h \
+       wangle/channel/Handler.h \
+       wangle/channel/HandlerContext.h \
        wangle/channel/OutputBufferingHandler.h \
+       wangle/channel/Pipeline.h \
        wangle/concurrent/BlockingQueue.h \
        wangle/concurrent/Codel.h \
        wangle/concurrent/CPUThreadPoolExecutor.h \
index d0ee7e2a4a0ca336aa98690bb34db09a884435db..9087be57977900e1d839732fc01157145e447771 100644 (file)
@@ -16,7 +16,7 @@
 
 #include "folly/wangle/bootstrap/ServerBootstrap.h"
 #include "folly/wangle/bootstrap/ClientBootstrap.h"
-#include "folly/wangle/channel/ChannelHandler.h"
+#include "folly/wangle/channel/Handler.h"
 
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 using namespace folly::wangle;
 using namespace folly;
 
-typedef ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> Pipeline;
+typedef Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> BytesPipeline;
 
-typedef ServerBootstrap<Pipeline> TestServer;
-typedef ClientBootstrap<Pipeline> TestClient;
+typedef ServerBootstrap<BytesPipeline> TestServer;
+typedef ClientBootstrap<BytesPipeline> TestClient;
 
-class TestClientPipelineFactory : public PipelineFactory<Pipeline> {
+class TestClientPipelineFactory : public PipelineFactory<BytesPipeline> {
  public:
-  Pipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
+  BytesPipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
     CHECK(sock->good());
 
     // We probably aren't connected immedately, check after a small delay
@@ -43,11 +43,11 @@ class TestClientPipelineFactory : public PipelineFactory<Pipeline> {
   }
 };
 
-class TestPipelineFactory : public PipelineFactory<Pipeline> {
+class TestPipelineFactory : public PipelineFactory<BytesPipeline> {
  public:
-  Pipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
+  BytesPipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
     pipelines++;
-    return new Pipeline();
+    return new BytesPipeline();
   }
   std::atomic<int> pipelines{0};
 };
@@ -268,7 +268,7 @@ TEST(Bootstrap, ExistingSocket) {
 std::atomic<int> connections{0};
 
 class TestHandlerPipeline
-    : public ChannelHandlerAdapter<void*,
+    : public HandlerAdapter<void*,
                                    std::exception> {
  public:
   void read(Context* ctx, void* conn) {
@@ -283,12 +283,12 @@ class TestHandlerPipeline
 
 template <typename HandlerPipeline>
 class TestHandlerPipelineFactory
-    : public PipelineFactory<ServerBootstrap<Pipeline>::AcceptPipeline> {
+    : public PipelineFactory<ServerBootstrap<BytesPipeline>::AcceptPipeline> {
  public:
-  ServerBootstrap<Pipeline>::AcceptPipeline* newPipeline(std::shared_ptr<AsyncSocket>) {
-    auto pipeline = new ServerBootstrap<Pipeline>::AcceptPipeline;
+  ServerBootstrap<BytesPipeline>::AcceptPipeline* newPipeline(std::shared_ptr<AsyncSocket>) {
+    auto pipeline = new ServerBootstrap<BytesPipeline>::AcceptPipeline;
     auto handler = std::make_shared<HandlerPipeline>();
-      pipeline->addBack(ChannelHandlerPtr<HandlerPipeline>(handler));
+      pipeline->addBack(HandlerPtr<HandlerPipeline>(handler));
     return pipeline;
   }
 };
@@ -318,7 +318,7 @@ TEST(Bootstrap, LoadBalanceHandler) {
 }
 
 class TestUDPPipeline
-    : public ChannelHandlerAdapter<void*,
+    : public HandlerAdapter<void*,
                                    std::exception> {
  public:
   void read(Context* ctx, void* conn) {
index 37179fdab9f22d3cd26549e23be338691f1618af..2b3f3be2798dab34abe97941fe93cc9abc51fad5 100644 (file)
  */
 #pragma once
 
-#include <folly/wangle/channel/ChannelPipeline.h>
+#include <folly/wangle/channel/Pipeline.h>
 
 namespace folly {
 
 /*
- * A thin wrapper around ChannelPipeline and AsyncSocket to match
+ * A thin wrapper around Pipeline and AsyncSocket to match
  * ServerBootstrap.  On connect() a new pipeline is created.
  */
 template <typename Pipeline>
index e1909265eb6e99204d35d5ad06fbc1677e23e2c8..1e0ebc75d3d827dd051186b41128f26c48d5c838 100644 (file)
 #include <folly/io/async/EventBaseManager.h>
 #include <folly/wangle/concurrent/IOThreadPoolExecutor.h>
 #include <folly/wangle/acceptor/ManagedConnection.h>
-#include <folly/wangle/channel/ChannelPipeline.h>
-#include <folly/wangle/channel/ChannelHandler.h>
+#include <folly/wangle/channel/Pipeline.h>
+#include <folly/wangle/channel/Handler.h>
 
 namespace folly {
 
 template <typename Pipeline>
 class ServerAcceptor
     : public Acceptor
-    , public folly::wangle::ChannelHandlerAdapter<void*, std::exception> {
+    , public folly::wangle::HandlerAdapter<void*, std::exception> {
   typedef std::unique_ptr<Pipeline,
                           folly::DelayedDestruction::Destructor> PipelinePtr;
 
@@ -60,7 +60,7 @@ class ServerAcceptor
  public:
   explicit ServerAcceptor(
         std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory,
-        std::shared_ptr<folly::wangle::ChannelPipeline<
+        std::shared_ptr<folly::wangle::Pipeline<
                           void*, std::exception>> acceptorPipeline,
         EventBase* base)
       : Acceptor(ServerSocketConfig())
@@ -70,7 +70,7 @@ class ServerAcceptor
     Acceptor::init(nullptr, base_);
     CHECK(acceptorPipeline_);
 
-    acceptorPipeline_->addBack(folly::wangle::ChannelHandlerPtr<ServerAcceptor, false>(this));
+    acceptorPipeline_->addBack(folly::wangle::HandlerPtr<ServerAcceptor, false>(this));
     acceptorPipeline_->finalize();
   }
 
@@ -109,7 +109,7 @@ class ServerAcceptor
   EventBase* base_;
 
   std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory_;
-  std::shared_ptr<folly::wangle::ChannelPipeline<
+  std::shared_ptr<folly::wangle::Pipeline<
     void*, std::exception>> acceptorPipeline_;
 };
 
@@ -118,13 +118,13 @@ class ServerAcceptorFactory : public AcceptorFactory {
  public:
   explicit ServerAcceptorFactory(
     std::shared_ptr<PipelineFactory<Pipeline>> factory,
-    std::shared_ptr<PipelineFactory<folly::wangle::ChannelPipeline<
+    std::shared_ptr<PipelineFactory<folly::wangle::Pipeline<
     void*, std::exception>>> pipeline)
     : factory_(factory)
     , pipeline_(pipeline) {}
 
   std::shared_ptr<Acceptor> newAcceptor(EventBase* base) {
-    std::shared_ptr<folly::wangle::ChannelPipeline<
+    std::shared_ptr<folly::wangle::Pipeline<
                       void*, std::exception>> pipeline(
                         pipeline_->newPipeline(nullptr));
     return std::make_shared<ServerAcceptor<Pipeline>>(factory_, pipeline, base);
@@ -132,7 +132,7 @@ class ServerAcceptorFactory : public AcceptorFactory {
  private:
   std::shared_ptr<PipelineFactory<Pipeline>> factory_;
   std::shared_ptr<PipelineFactory<
-    folly::wangle::ChannelPipeline<
+    folly::wangle::Pipeline<
       void*, std::exception>>> pipeline_;
 };
 
@@ -183,8 +183,8 @@ void ServerWorkerPool::forEachWorker(F&& f) const {
 }
 
 class DefaultAcceptPipelineFactory
-    : public PipelineFactory<wangle::ChannelPipeline<void*, std::exception>> {
-  typedef wangle::ChannelPipeline<
+    : public PipelineFactory<wangle::Pipeline<void*, std::exception>> {
+  typedef wangle::Pipeline<
       void*,
       std::exception> AcceptPipeline;
 
index e59ed657560102b122febdbf7fa730702e35e6cb..6b7a41018179b95788a047b93d4960c53af7a753 100644 (file)
@@ -15,7 +15,7 @@
  */
 #include <folly/wangle/bootstrap/ServerBootstrap.h>
 #include <folly/wangle/concurrent/NamedThreadFactory.h>
-#include <folly/wangle/channel/ChannelHandler.h>
+#include <folly/wangle/channel/Handler.h>
 #include <folly/io/async/EventBaseManager.h>
 
 namespace folly {
index 3520b0f51232e2d0ec62f5ea6231664a29504f88..28785a1b3f58ee02c77ac619969ae34aff2e9917 100644 (file)
 
 #include <folly/wangle/bootstrap/ServerBootstrap-inl.h>
 #include <folly/Baton.h>
-#include <folly/wangle/channel/ChannelPipeline.h>
+#include <folly/wangle/channel/Pipeline.h>
 
 namespace folly {
 
-typedef folly::wangle::ChannelPipeline<
+typedef folly::wangle::Pipeline<
   folly::IOBufQueue&, std::unique_ptr<folly::IOBuf>> DefaultPipeline;
 
 /*
@@ -30,7 +30,7 @@ typedef folly::wangle::ChannelPipeline<
  * accepting threads, any number of accepting sockets, a pool of
  * IO-worker threads, and connection pool for each IO thread for you.
  *
- * The output is given as a ChannelPipeline template: given a
+ * The output is given as a Pipeline template: given a
  * PipelineFactory, it will create a new pipeline for each connection,
  * and your server can handle the incoming bytes.
  *
@@ -52,7 +52,7 @@ class ServerBootstrap {
     join();
   }
 
-  typedef wangle::ChannelPipeline<
+  typedef wangle::Pipeline<
    void*,
    std::exception> AcceptPipeline;
   /*
index b5be966f331b37806b9930734d8a800ccc560f71..014812b8bdbefd567b54358d25c3a780bbb51919 100644 (file)
@@ -16,7 +16,7 @@
 
 #pragma once
 
-#include <folly/wangle/channel/ChannelHandler.h>
+#include <folly/wangle/channel/Handler.h>
 #include <folly/io/async/AsyncSocket.h>
 #include <folly/io/async/EventBase.h>
 #include <folly/io/async/EventBaseManager.h>
diff --git a/folly/wangle/channel/ChannelHandler.h b/folly/wangle/channel/ChannelHandler.h
deleted file mode 100644 (file)
index e1aa6d5..0000000
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Copyright 2015 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <folly/futures/Future.h>
-#include <folly/wangle/channel/ChannelPipeline.h>
-#include <folly/io/IOBuf.h>
-#include <folly/io/IOBufQueue.h>
-
-namespace folly { namespace wangle {
-
-template <class Rin, class Rout = Rin, class Win = Rout, class Wout = Rin>
-class ChannelHandler {
- public:
-  typedef Rin rin;
-  typedef Rout rout;
-  typedef Win win;
-  typedef Wout wout;
-  typedef ChannelHandlerContext<Rout, Wout> Context;
-  virtual ~ChannelHandler() {}
-
-  virtual void read(Context* ctx, Rin msg) = 0;
-  virtual void readEOF(Context* ctx) {
-    ctx->fireReadEOF();
-  }
-  virtual void readException(Context* ctx, exception_wrapper e) {
-    ctx->fireReadException(std::move(e));
-  }
-
-  virtual Future<void> write(Context* ctx, Win msg) = 0;
-  virtual Future<void> close(Context* ctx) {
-    return ctx->fireClose();
-  }
-
-  virtual void attachPipeline(Context* ctx) {}
-  virtual void attachTransport(Context* ctx) {}
-
-  virtual void detachPipeline(Context* ctx) {}
-  virtual void detachTransport(Context* ctx) {}
-
-  /*
-  // Other sorts of things we might want, all shamelessly stolen from Netty
-  // inbound
-  virtual void exceptionCaught(
-      ChannelHandlerContext* ctx,
-      exception_wrapper e) {}
-  virtual void channelRegistered(ChannelHandlerContext* ctx) {}
-  virtual void channelUnregistered(ChannelHandlerContext* ctx) {}
-  virtual void channelActive(ChannelHandlerContext* ctx) {}
-  virtual void channelInactive(ChannelHandlerContext* ctx) {}
-  virtual void channelReadComplete(ChannelHandlerContext* ctx) {}
-  virtual void userEventTriggered(ChannelHandlerContext* ctx, void* evt) {}
-  virtual void channelWritabilityChanged(ChannelHandlerContext* ctx) {}
-
-  // outbound
-  virtual Future<void> bind(
-      ChannelHandlerContext* ctx,
-      SocketAddress localAddress) {}
-  virtual Future<void> connect(
-          ChannelHandlerContext* ctx,
-          SocketAddress remoteAddress, SocketAddress localAddress) {}
-  virtual Future<void> disconnect(ChannelHandlerContext* ctx) {}
-  virtual Future<void> deregister(ChannelHandlerContext* ctx) {}
-  virtual Future<void> read(ChannelHandlerContext* ctx) {}
-  virtual void flush(ChannelHandlerContext* ctx) {}
-  */
-};
-
-template <class R, class W = R>
-class ChannelHandlerAdapter : public ChannelHandler<R, R, W, W> {
- public:
-  typedef typename ChannelHandler<R, R, W, W>::Context Context;
-
-  void read(Context* ctx, R msg) override {
-    ctx->fireRead(std::forward<R>(msg));
-  }
-
-  Future<void> write(Context* ctx, W msg) override {
-    return ctx->fireWrite(std::forward<W>(msg));
-  }
-};
-
-typedef ChannelHandlerAdapter<IOBufQueue&, std::unique_ptr<IOBuf>>
-BytesToBytesHandler;
-
-template <class Handler, bool Shared = true>
-class ChannelHandlerPtr : public ChannelHandler<
-                                   typename Handler::rin,
-                                   typename Handler::rout,
-                                   typename Handler::win,
-                                   typename Handler::wout> {
- public:
-  typedef typename std::conditional<
-    Shared,
-    std::shared_ptr<Handler>,
-    Handler*>::type
-  HandlerPtr;
-
-  typedef typename Handler::Context Context;
-
-  explicit ChannelHandlerPtr(HandlerPtr handler)
-    : handler_(std::move(handler)) {}
-
-  HandlerPtr getHandler() {
-    return handler_;
-  }
-
-  void setHandler(HandlerPtr handler) {
-    if (handler == handler_) {
-      return;
-    }
-    if (handler_ && ctx_) {
-      handler_->detachPipeline(ctx_);
-    }
-    handler_ = std::move(handler);
-    if (handler_ && ctx_) {
-      handler_->attachPipeline(ctx_);
-      if (ctx_->getTransport()) {
-        handler_->attachTransport(ctx_);
-      }
-    }
-  }
-
-  void attachPipeline(Context* ctx) override {
-    ctx_ = ctx;
-    if (handler_) {
-      handler_->attachPipeline(ctx_);
-    }
-  }
-
-  void attachTransport(Context* ctx) override {
-    ctx_ = ctx;
-    if (handler_) {
-      handler_->attachTransport(ctx_);
-    }
-  }
-
-  void detachPipeline(Context* ctx) override {
-    ctx_ = ctx;
-    if (handler_) {
-      handler_->detachPipeline(ctx_);
-    }
-  }
-
-  void detachTransport(Context* ctx) override {
-    ctx_ = ctx;
-    if (handler_) {
-      handler_->detachTransport(ctx_);
-    }
-  }
-
-  void read(Context* ctx, typename Handler::rin msg) override {
-    DCHECK(handler_);
-    handler_->read(ctx, std::forward<typename Handler::rin>(msg));
-  }
-
-  void readEOF(Context* ctx) override {
-    DCHECK(handler_);
-    handler_->readEOF(ctx);
-  }
-
-  void readException(Context* ctx, exception_wrapper e) override {
-    DCHECK(handler_);
-    handler_->readException(ctx, std::move(e));
-  }
-
-  Future<void> write(Context* ctx, typename Handler::win msg) override {
-    DCHECK(handler_);
-    return handler_->write(ctx, std::forward<typename Handler::win>(msg));
-  }
-
-  Future<void> close(Context* ctx) override {
-    DCHECK(handler_);
-    return handler_->close(ctx);
-  }
-
- private:
-  Context* ctx_;
-  HandlerPtr handler_;
-};
-
-}}
diff --git a/folly/wangle/channel/ChannelHandlerContext.h b/folly/wangle/channel/ChannelHandlerContext.h
deleted file mode 100644 (file)
index 0cb0fce..0000000
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Copyright 2015 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <folly/io/async/AsyncTransport.h>
-#include <folly/futures/Future.h>
-#include <folly/ExceptionWrapper.h>
-
-namespace folly { namespace wangle {
-
-template <class In, class Out>
-class ChannelHandlerContext {
- public:
-  virtual ~ChannelHandlerContext() {}
-
-  virtual void fireRead(In msg) = 0;
-  virtual void fireReadEOF() = 0;
-  virtual void fireReadException(exception_wrapper e) = 0;
-
-  virtual Future<void> fireWrite(Out msg) = 0;
-  virtual Future<void> fireClose() = 0;
-
-  virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
-
-  virtual void setWriteFlags(WriteFlags flags) = 0;
-  virtual WriteFlags getWriteFlags() = 0;
-
-  virtual void setReadBufferSettings(
-      uint64_t minAvailable,
-      uint64_t allocationSize) = 0;
-  virtual std::pair<uint64_t, uint64_t> getReadBufferSettings() = 0;
-
-  /* TODO
-  template <class H>
-  virtual void addHandlerBefore(H&&) {}
-  template <class H>
-  virtual void addHandlerAfter(H&&) {}
-  template <class H>
-  virtual void replaceHandler(H&&) {}
-  virtual void removeHandler() {}
-  */
-};
-
-class PipelineContext {
- public:
-  virtual ~PipelineContext() {}
-
-  virtual void attachTransport() = 0;
-  virtual void detachTransport() = 0;
-
-  void link(PipelineContext* other) {
-    setNextIn(other);
-    other->setNextOut(this);
-  }
-
- protected:
-  virtual void setNextIn(PipelineContext* ctx) = 0;
-  virtual void setNextOut(PipelineContext* ctx) = 0;
-};
-
-template <class In>
-class InboundChannelHandlerContext {
- public:
-  virtual ~InboundChannelHandlerContext() {}
-  virtual void read(In msg) = 0;
-  virtual void readEOF() = 0;
-  virtual void readException(exception_wrapper e) = 0;
-};
-
-template <class Out>
-class OutboundChannelHandlerContext {
- public:
-  virtual ~OutboundChannelHandlerContext() {}
-  virtual Future<void> write(Out msg) = 0;
-  virtual Future<void> close() = 0;
-};
-
-template <class P, class H>
-class ContextImpl : public ChannelHandlerContext<typename H::rout,
-                                                 typename H::wout>,
-                    public InboundChannelHandlerContext<typename H::rin>,
-                    public OutboundChannelHandlerContext<typename H::win>,
-                    public PipelineContext {
- public:
-  typedef typename H::rin Rin;
-  typedef typename H::rout Rout;
-  typedef typename H::win Win;
-  typedef typename H::wout Wout;
-
-  template <class HandlerArg>
-  explicit ContextImpl(P* pipeline, HandlerArg&& handlerArg)
-    : pipeline_(pipeline),
-      handler_(std::forward<HandlerArg>(handlerArg)) {
-    handler_.attachPipeline(this);
-  }
-
-  ~ContextImpl() {
-    handler_.detachPipeline(this);
-  }
-
-  H* getHandler() {
-    return &handler_;
-  }
-
-  // PipelineContext overrides
-  void setNextIn(PipelineContext* ctx) override {
-    auto nextIn = dynamic_cast<InboundChannelHandlerContext<Rout>*>(ctx);
-    if (nextIn) {
-      nextIn_ = nextIn;
-    } else {
-      throw std::invalid_argument("wrong type in setNextIn");
-    }
-  }
-
-  void setNextOut(PipelineContext* ctx) override {
-    auto nextOut = dynamic_cast<OutboundChannelHandlerContext<Wout>*>(ctx);
-    if (nextOut) {
-      nextOut_ = nextOut;
-    } else {
-      throw std::invalid_argument("wrong type in setNextOut");
-    }
-  }
-
-  void attachTransport() override {
-    typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
-    handler_.attachTransport(this);
-  }
-
-  void detachTransport() override {
-    typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
-    handler_.detachTransport(this);
-  }
-
-  // ChannelHandlerContext overrides
-  void fireRead(Rout msg) override {
-    typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
-    if (nextIn_) {
-      nextIn_->read(std::forward<Rout>(msg));
-    } else {
-      LOG(WARNING) << "read reached end of pipeline";
-    }
-  }
-
-  void fireReadEOF() override {
-    typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
-    if (nextIn_) {
-      nextIn_->readEOF();
-    } else {
-      LOG(WARNING) << "readEOF reached end of pipeline";
-    }
-  }
-
-  void fireReadException(exception_wrapper e) override {
-    typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
-    if (nextIn_) {
-      nextIn_->readException(std::move(e));
-    } else {
-      LOG(WARNING) << "readException reached end of pipeline";
-    }
-  }
-
-  Future<void> fireWrite(Wout msg) override {
-    typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
-    if (nextOut_) {
-      return nextOut_->write(std::forward<Wout>(msg));
-    } else {
-      LOG(WARNING) << "write reached end of pipeline";
-      return makeFuture();
-    }
-  }
-
-  Future<void> fireClose() override {
-    typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
-    if (nextOut_) {
-      return nextOut_->close();
-    } else {
-      LOG(WARNING) << "close reached end of pipeline";
-      return makeFuture();
-    }
-  }
-
-  std::shared_ptr<AsyncTransport> getTransport() override {
-    return pipeline_->getTransport();
-  }
-
-  void setWriteFlags(WriteFlags flags) override {
-    pipeline_->setWriteFlags(flags);
-  }
-
-  WriteFlags getWriteFlags() override {
-    return pipeline_->getWriteFlags();
-  }
-
-  void setReadBufferSettings(
-      uint64_t minAvailable,
-      uint64_t allocationSize) override {
-    pipeline_->setReadBufferSettings(minAvailable, allocationSize);
-  }
-
-  std::pair<uint64_t, uint64_t> getReadBufferSettings() override {
-    return pipeline_->getReadBufferSettings();
-  }
-
-  // InboundChannelHandlerContext overrides
-  void read(Rin msg) override {
-    typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
-    handler_.read(this, std::forward<Rin>(msg));
-  }
-
-  void readEOF() override {
-    typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
-    handler_.readEOF(this);
-  }
-
-  void readException(exception_wrapper e) override {
-    typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
-    handler_.readException(this, std::move(e));
-  }
-
-  // OutboundChannelHandlerContext overrides
-  Future<void> write(Win msg) override {
-    typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
-    return handler_.write(this, std::forward<Win>(msg));
-  }
-
-  Future<void> close() override {
-    typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
-    return handler_.close(this);
-  }
-
- private:
-  P* pipeline_;
-  H handler_;
-  InboundChannelHandlerContext<Rout>* nextIn_{nullptr};
-  OutboundChannelHandlerContext<Wout>* nextOut_{nullptr};
-};
-
-}}
diff --git a/folly/wangle/channel/ChannelPipeline.h b/folly/wangle/channel/ChannelPipeline.h
deleted file mode 100644 (file)
index f791837..0000000
+++ /dev/null
@@ -1,340 +0,0 @@
-/*
- * Copyright 2015 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <folly/wangle/channel/ChannelHandlerContext.h>
-#include <folly/futures/Future.h>
-#include <folly/io/async/AsyncTransport.h>
-#include <folly/io/async/DelayedDestruction.h>
-#include <folly/ExceptionWrapper.h>
-#include <folly/Memory.h>
-#include <glog/logging.h>
-
-namespace folly { namespace wangle {
-
-/*
- * R is the inbound type, i.e. inbound calls start with pipeline.read(R)
- * W is the outbound type, i.e. outbound calls start with pipeline.write(W)
- */
-template <class R, class W, class... Handlers>
-class ChannelPipeline;
-
-template <class R, class W>
-class ChannelPipeline<R, W> : public DelayedDestruction {
- public:
-  ChannelPipeline() {}
-  ~ChannelPipeline() {}
-
-  std::shared_ptr<AsyncTransport> getTransport() {
-    return transport_;
-  }
-
-  void setWriteFlags(WriteFlags flags) {
-    writeFlags_ = flags;
-  }
-
-  WriteFlags getWriteFlags() {
-    return writeFlags_;
-  }
-
-  void setReadBufferSettings(uint64_t minAvailable, uint64_t allocationSize) {
-    readBufferSettings_ = std::make_pair(minAvailable, allocationSize);
-  }
-
-  std::pair<uint64_t, uint64_t> getReadBufferSettings() {
-    return readBufferSettings_;
-  }
-
-  void read(R msg) {
-    front_->read(std::forward<R>(msg));
-  }
-
-  void readEOF() {
-    front_->readEOF();
-  }
-
-  void readException(exception_wrapper e) {
-    front_->readException(std::move(e));
-  }
-
-  Future<void> write(W msg) {
-    return back_->write(std::forward<W>(msg));
-  }
-
-  Future<void> close() {
-    return back_->close();
-  }
-
-  template <class H>
-  ChannelPipeline& addBack(H&& handler) {
-    ctxs_.push_back(folly::make_unique<ContextImpl<ChannelPipeline, H>>(
-        this, std::forward<H>(handler)));
-    return *this;
-  }
-
-  template <class H>
-  ChannelPipeline& addFront(H&& handler) {
-    ctxs_.insert(
-        ctxs_.begin(),
-        folly::make_unique<ContextImpl<ChannelPipeline, H>>(
-            this,
-            std::forward<H>(handler)));
-    return *this;
-  }
-
-  template <class H>
-  H* getHandler(int i) {
-    auto ctx = dynamic_cast<ContextImpl<ChannelPipeline, H>*>(ctxs_[i].get());
-    CHECK(ctx);
-    return ctx->getHandler();
-  }
-
-  void finalize() {
-    finalizeHelper();
-    InboundChannelHandlerContext<R>* front;
-    front_ = dynamic_cast<InboundChannelHandlerContext<R>*>(
-        ctxs_.front().get());
-    if (!front_) {
-      throw std::invalid_argument("wrong type for first handler");
-    }
-  }
-
- protected:
-  explicit ChannelPipeline(bool shouldFinalize) {
-    CHECK(!shouldFinalize);
-  }
-
-  void finalizeHelper() {
-    if (ctxs_.empty()) {
-      return;
-    }
-
-    for (size_t i = 0; i < ctxs_.size() - 1; i++) {
-      ctxs_[i]->link(ctxs_[i+1].get());
-    }
-
-    back_ = dynamic_cast<OutboundChannelHandlerContext<W>*>(ctxs_.back().get());
-    if (!back_) {
-      throw std::invalid_argument("wrong type for last handler");
-    }
-  }
-
-  PipelineContext* getLocalFront() {
-    return ctxs_.empty() ? nullptr : ctxs_.front().get();
-  }
-
-  static const bool is_end{true};
-
-  std::shared_ptr<AsyncTransport> transport_;
-  WriteFlags writeFlags_{WriteFlags::NONE};
-  std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
-
-  void attachPipeline() {}
-
-  void attachTransport(
-      std::shared_ptr<AsyncTransport> transport) {
-    transport_ = std::move(transport);
-  }
-
-  void detachTransport() {
-    transport_ = nullptr;
-  }
-
-  OutboundChannelHandlerContext<W>* back_{nullptr};
-
- private:
-  InboundChannelHandlerContext<R>* front_{nullptr};
-  std::vector<std::unique_ptr<PipelineContext>> ctxs_;
-};
-
-template <class R, class W, class Handler, class... Handlers>
-class ChannelPipeline<R, W, Handler, Handlers...>
-  : public ChannelPipeline<R, W, Handlers...> {
- protected:
-  template <class HandlerArg, class... HandlersArgs>
-  ChannelPipeline(
-      bool shouldFinalize,
-      HandlerArg&& handlerArg,
-      HandlersArgs&&... handlersArgs)
-    : ChannelPipeline<R, W, Handlers...>(
-          false,
-          std::forward<HandlersArgs>(handlersArgs)...),
-          ctx_(this, std::forward<HandlerArg>(handlerArg)) {
-    if (shouldFinalize) {
-      finalize();
-    }
-  }
-
- public:
-  template <class... HandlersArgs>
-  explicit ChannelPipeline(HandlersArgs&&... handlersArgs)
-    : ChannelPipeline(true, std::forward<HandlersArgs>(handlersArgs)...) {}
-
-  ~ChannelPipeline() {}
-
-  void read(R msg) {
-    typename ChannelPipeline<R, W>::DestructorGuard dg(
-        static_cast<DelayedDestruction*>(this));
-    front_->read(std::forward<R>(msg));
-  }
-
-  void readEOF() {
-    typename ChannelPipeline<R, W>::DestructorGuard dg(
-        static_cast<DelayedDestruction*>(this));
-    front_->readEOF();
-  }
-
-  void readException(exception_wrapper e) {
-    typename ChannelPipeline<R, W>::DestructorGuard dg(
-        static_cast<DelayedDestruction*>(this));
-    front_->readException(std::move(e));
-  }
-
-  Future<void> write(W msg) {
-    typename ChannelPipeline<R, W>::DestructorGuard dg(
-        static_cast<DelayedDestruction*>(this));
-    return back_->write(std::forward<W>(msg));
-  }
-
-  Future<void> close() {
-    typename ChannelPipeline<R, W>::DestructorGuard dg(
-        static_cast<DelayedDestruction*>(this));
-    return back_->close();
-  }
-
-  void attachTransport(
-      std::shared_ptr<AsyncTransport> transport) {
-    typename ChannelPipeline<R, W>::DestructorGuard dg(
-        static_cast<DelayedDestruction*>(this));
-    CHECK((!ChannelPipeline<R, W>::transport_));
-    ChannelPipeline<R, W, Handlers...>::attachTransport(std::move(transport));
-    forEachCtx([&](PipelineContext* ctx){
-      ctx->attachTransport();
-    });
-  }
-
-  void detachTransport() {
-    typename ChannelPipeline<R, W>::DestructorGuard dg(
-        static_cast<DelayedDestruction*>(this));
-    ChannelPipeline<R, W, Handlers...>::detachTransport();
-    forEachCtx([&](PipelineContext* ctx){
-      ctx->detachTransport();
-    });
-  }
-
-  std::shared_ptr<AsyncTransport> getTransport() {
-    return ChannelPipeline<R, W>::transport_;
-  }
-
-  template <class H>
-  ChannelPipeline& addBack(H&& handler) {
-    ChannelPipeline<R, W>::addBack(std::move(handler));
-    return *this;
-  }
-
-  template <class H>
-  ChannelPipeline& addFront(H&& handler) {
-    ctxs_.insert(
-        ctxs_.begin(),
-        folly::make_unique<ContextImpl<ChannelPipeline, H>>(
-            this,
-            std::move(handler)));
-    return *this;
-  }
-
-  template <class H>
-  H* getHandler(size_t i) {
-    if (i > ctxs_.size()) {
-      return ChannelPipeline<R, W, Handlers...>::template getHandler<H>(
-          i - (ctxs_.size() + 1));
-    } else {
-      auto pctx = (i == ctxs_.size()) ? &ctx_ : ctxs_[i].get();
-      auto ctx = dynamic_cast<ContextImpl<ChannelPipeline, H>*>(pctx);
-      return ctx->getHandler();
-    }
-  }
-
-  void finalize() {
-    finalizeHelper();
-    auto ctx = ctxs_.empty() ? &ctx_ : ctxs_.front().get();
-    front_ = dynamic_cast<InboundChannelHandlerContext<R>*>(ctx);
-    if (!front_) {
-      throw std::invalid_argument("wrong type for first handler");
-    }
-  }
-
- protected:
-  void finalizeHelper() {
-    ChannelPipeline<R, W, Handlers...>::finalizeHelper();
-    back_ = ChannelPipeline<R, W, Handlers...>::back_;
-    if (!back_) {
-      auto is_at_end = ChannelPipeline<R, W, Handlers...>::is_end;
-      CHECK(is_at_end);
-      back_ = dynamic_cast<OutboundChannelHandlerContext<W>*>(&ctx_);
-      if (!back_) {
-        throw std::invalid_argument("wrong type for last handler");
-      }
-    }
-
-    if (!ctxs_.empty()) {
-      for (size_t i = 0; i < ctxs_.size() - 1; i++) {
-        ctxs_[i]->link(ctxs_[i+1].get());
-      }
-      ctxs_.back()->link(&ctx_);
-    }
-
-    auto nextFront = ChannelPipeline<R, W, Handlers...>::getLocalFront();
-    if (nextFront) {
-      ctx_.link(nextFront);
-    }
-  }
-
-  PipelineContext* getLocalFront() {
-    return ctxs_.empty() ? &ctx_ : ctxs_.front().get();
-  }
-
-  static const bool is_end{false};
-  InboundChannelHandlerContext<R>* front_{nullptr};
-  OutboundChannelHandlerContext<W>* back_{nullptr};
-
- private:
-  template <class F>
-  void forEachCtx(const F& func) {
-    for (auto& ctx : ctxs_) {
-      func(ctx.get());
-    }
-    func(&ctx_);
-  }
-
-  ContextImpl<ChannelPipeline, Handler> ctx_;
-  std::vector<std::unique_ptr<PipelineContext>> ctxs_;
-};
-
-}}
-
-namespace folly {
-
-class AsyncSocket;
-
-template <typename Pipeline>
-class PipelineFactory {
- public:
-  virtual Pipeline* newPipeline(std::shared_ptr<AsyncSocket>) = 0;
-  virtual ~PipelineFactory() {}
-};
-
-}
diff --git a/folly/wangle/channel/Handler.h b/folly/wangle/channel/Handler.h
new file mode 100644 (file)
index 0000000..67219f1
--- /dev/null
@@ -0,0 +1,196 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/futures/Future.h>
+#include <folly/wangle/channel/Pipeline.h>
+#include <folly/io/IOBuf.h>
+#include <folly/io/IOBufQueue.h>
+
+namespace folly { namespace wangle {
+
+template <class Rin, class Rout = Rin, class Win = Rout, class Wout = Rin>
+class Handler {
+ public:
+  typedef Rin rin;
+  typedef Rout rout;
+  typedef Win win;
+  typedef Wout wout;
+  typedef HandlerContext<Rout, Wout> Context;
+  virtual ~Handler() {}
+
+  virtual void read(Context* ctx, Rin msg) = 0;
+  virtual void readEOF(Context* ctx) {
+    ctx->fireReadEOF();
+  }
+  virtual void readException(Context* ctx, exception_wrapper e) {
+    ctx->fireReadException(std::move(e));
+  }
+
+  virtual Future<void> write(Context* ctx, Win msg) = 0;
+  virtual Future<void> close(Context* ctx) {
+    return ctx->fireClose();
+  }
+
+  virtual void attachPipeline(Context* ctx) {}
+  virtual void attachTransport(Context* ctx) {}
+
+  virtual void detachPipeline(Context* ctx) {}
+  virtual void detachTransport(Context* ctx) {}
+
+  /*
+  // Other sorts of things we might want, all shamelessly stolen from Netty
+  // inbound
+  virtual void exceptionCaught(
+      HandlerContext* ctx,
+      exception_wrapper e) {}
+  virtual void channelRegistered(HandlerContext* ctx) {}
+  virtual void channelUnregistered(HandlerContext* ctx) {}
+  virtual void channelActive(HandlerContext* ctx) {}
+  virtual void channelInactive(HandlerContext* ctx) {}
+  virtual void channelReadComplete(HandlerContext* ctx) {}
+  virtual void userEventTriggered(HandlerContext* ctx, void* evt) {}
+  virtual void channelWritabilityChanged(HandlerContext* ctx) {}
+
+  // outbound
+  virtual Future<void> bind(
+      HandlerContext* ctx,
+      SocketAddress localAddress) {}
+  virtual Future<void> connect(
+          HandlerContext* ctx,
+          SocketAddress remoteAddress, SocketAddress localAddress) {}
+  virtual Future<void> disconnect(HandlerContext* ctx) {}
+  virtual Future<void> deregister(HandlerContext* ctx) {}
+  virtual Future<void> read(HandlerContext* ctx) {}
+  virtual void flush(HandlerContext* ctx) {}
+  */
+};
+
+template <class R, class W = R>
+class HandlerAdapter : public Handler<R, R, W, W> {
+ public:
+  typedef typename Handler<R, R, W, W>::Context Context;
+
+  void read(Context* ctx, R msg) override {
+    ctx->fireRead(std::forward<R>(msg));
+  }
+
+  Future<void> write(Context* ctx, W msg) override {
+    return ctx->fireWrite(std::forward<W>(msg));
+  }
+};
+
+typedef HandlerAdapter<IOBufQueue&, std::unique_ptr<IOBuf>>
+BytesToBytesHandler;
+
+template <class HandlerT, bool Shared = true>
+class HandlerPtr : public Handler<
+                            typename HandlerT::rin,
+                            typename HandlerT::rout,
+                            typename HandlerT::win,
+                            typename HandlerT::wout> {
+ public:
+  typedef typename std::conditional<
+    Shared,
+    std::shared_ptr<HandlerT>,
+    HandlerT*>::type
+  Ptr;
+
+  typedef typename HandlerT::Context Context;
+
+  explicit HandlerPtr(Ptr handler)
+    : handler_(std::move(handler)) {}
+
+  Ptr getHandler() {
+    return handler_;
+  }
+
+  void setHandler(Ptr handler) {
+    if (handler == handler_) {
+      return;
+    }
+    if (handler_ && ctx_) {
+      handler_->detachPipeline(ctx_);
+    }
+    handler_ = std::move(handler);
+    if (handler_ && ctx_) {
+      handler_->attachPipeline(ctx_);
+      if (ctx_->getTransport()) {
+        handler_->attachTransport(ctx_);
+      }
+    }
+  }
+
+  void attachPipeline(Context* ctx) override {
+    ctx_ = ctx;
+    if (handler_) {
+      handler_->attachPipeline(ctx_);
+    }
+  }
+
+  void attachTransport(Context* ctx) override {
+    ctx_ = ctx;
+    if (handler_) {
+      handler_->attachTransport(ctx_);
+    }
+  }
+
+  void detachPipeline(Context* ctx) override {
+    ctx_ = ctx;
+    if (handler_) {
+      handler_->detachPipeline(ctx_);
+    }
+  }
+
+  void detachTransport(Context* ctx) override {
+    ctx_ = ctx;
+    if (handler_) {
+      handler_->detachTransport(ctx_);
+    }
+  }
+
+  void read(Context* ctx, typename HandlerT::rin msg) override {
+    DCHECK(handler_);
+    handler_->read(ctx, std::forward<typename HandlerT::rin>(msg));
+  }
+
+  void readEOF(Context* ctx) override {
+    DCHECK(handler_);
+    handler_->readEOF(ctx);
+  }
+
+  void readException(Context* ctx, exception_wrapper e) override {
+    DCHECK(handler_);
+    handler_->readException(ctx, std::move(e));
+  }
+
+  Future<void> write(Context* ctx, typename HandlerT::win msg) override {
+    DCHECK(handler_);
+    return handler_->write(ctx, std::forward<typename HandlerT::win>(msg));
+  }
+
+  Future<void> close(Context* ctx) override {
+    DCHECK(handler_);
+    return handler_->close(ctx);
+  }
+
+ private:
+  Context* ctx_;
+  Ptr handler_;
+};
+
+}}
diff --git a/folly/wangle/channel/HandlerContext.h b/folly/wangle/channel/HandlerContext.h
new file mode 100644 (file)
index 0000000..809f2b1
--- /dev/null
@@ -0,0 +1,252 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/io/async/AsyncTransport.h>
+#include <folly/futures/Future.h>
+#include <folly/ExceptionWrapper.h>
+
+namespace folly { namespace wangle {
+
+template <class In, class Out>
+class HandlerContext {
+ public:
+  virtual ~HandlerContext() {}
+
+  virtual void fireRead(In msg) = 0;
+  virtual void fireReadEOF() = 0;
+  virtual void fireReadException(exception_wrapper e) = 0;
+
+  virtual Future<void> fireWrite(Out msg) = 0;
+  virtual Future<void> fireClose() = 0;
+
+  virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
+
+  virtual void setWriteFlags(WriteFlags flags) = 0;
+  virtual WriteFlags getWriteFlags() = 0;
+
+  virtual void setReadBufferSettings(
+      uint64_t minAvailable,
+      uint64_t allocationSize) = 0;
+  virtual std::pair<uint64_t, uint64_t> getReadBufferSettings() = 0;
+
+  /* TODO
+  template <class H>
+  virtual void addHandlerBefore(H&&) {}
+  template <class H>
+  virtual void addHandlerAfter(H&&) {}
+  template <class H>
+  virtual void replaceHandler(H&&) {}
+  virtual void removeHandler() {}
+  */
+};
+
+class PipelineContext {
+ public:
+  virtual ~PipelineContext() {}
+
+  virtual void attachTransport() = 0;
+  virtual void detachTransport() = 0;
+
+  void link(PipelineContext* other) {
+    setNextIn(other);
+    other->setNextOut(this);
+  }
+
+ protected:
+  virtual void setNextIn(PipelineContext* ctx) = 0;
+  virtual void setNextOut(PipelineContext* ctx) = 0;
+};
+
+template <class In>
+class InboundHandlerContext {
+ public:
+  virtual ~InboundHandlerContext() {}
+  virtual void read(In msg) = 0;
+  virtual void readEOF() = 0;
+  virtual void readException(exception_wrapper e) = 0;
+};
+
+template <class Out>
+class OutboundHandlerContext {
+ public:
+  virtual ~OutboundHandlerContext() {}
+  virtual Future<void> write(Out msg) = 0;
+  virtual Future<void> close() = 0;
+};
+
+template <class P, class H>
+class ContextImpl : public HandlerContext<typename H::rout,
+                                                 typename H::wout>,
+                    public InboundHandlerContext<typename H::rin>,
+                    public OutboundHandlerContext<typename H::win>,
+                    public PipelineContext {
+ public:
+  typedef typename H::rin Rin;
+  typedef typename H::rout Rout;
+  typedef typename H::win Win;
+  typedef typename H::wout Wout;
+
+  template <class HandlerArg>
+  explicit ContextImpl(P* pipeline, HandlerArg&& handlerArg)
+    : pipeline_(pipeline),
+      handler_(std::forward<HandlerArg>(handlerArg)) {
+    handler_.attachPipeline(this);
+  }
+
+  ~ContextImpl() {
+    handler_.detachPipeline(this);
+  }
+
+  H* getHandler() {
+    return &handler_;
+  }
+
+  // PipelineContext overrides
+  void setNextIn(PipelineContext* ctx) override {
+    auto nextIn = dynamic_cast<InboundHandlerContext<Rout>*>(ctx);
+    if (nextIn) {
+      nextIn_ = nextIn;
+    } else {
+      throw std::invalid_argument("wrong type in setNextIn");
+    }
+  }
+
+  void setNextOut(PipelineContext* ctx) override {
+    auto nextOut = dynamic_cast<OutboundHandlerContext<Wout>*>(ctx);
+    if (nextOut) {
+      nextOut_ = nextOut;
+    } else {
+      throw std::invalid_argument("wrong type in setNextOut");
+    }
+  }
+
+  void attachTransport() override {
+    typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+    handler_.attachTransport(this);
+  }
+
+  void detachTransport() override {
+    typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+    handler_.detachTransport(this);
+  }
+
+  // HandlerContext overrides
+  void fireRead(Rout msg) override {
+    typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+    if (nextIn_) {
+      nextIn_->read(std::forward<Rout>(msg));
+    } else {
+      LOG(WARNING) << "read reached end of pipeline";
+    }
+  }
+
+  void fireReadEOF() override {
+    typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+    if (nextIn_) {
+      nextIn_->readEOF();
+    } else {
+      LOG(WARNING) << "readEOF reached end of pipeline";
+    }
+  }
+
+  void fireReadException(exception_wrapper e) override {
+    typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+    if (nextIn_) {
+      nextIn_->readException(std::move(e));
+    } else {
+      LOG(WARNING) << "readException reached end of pipeline";
+    }
+  }
+
+  Future<void> fireWrite(Wout msg) override {
+    typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+    if (nextOut_) {
+      return nextOut_->write(std::forward<Wout>(msg));
+    } else {
+      LOG(WARNING) << "write reached end of pipeline";
+      return makeFuture();
+    }
+  }
+
+  Future<void> fireClose() override {
+    typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+    if (nextOut_) {
+      return nextOut_->close();
+    } else {
+      LOG(WARNING) << "close reached end of pipeline";
+      return makeFuture();
+    }
+  }
+
+  std::shared_ptr<AsyncTransport> getTransport() override {
+    return pipeline_->getTransport();
+  }
+
+  void setWriteFlags(WriteFlags flags) override {
+    pipeline_->setWriteFlags(flags);
+  }
+
+  WriteFlags getWriteFlags() override {
+    return pipeline_->getWriteFlags();
+  }
+
+  void setReadBufferSettings(
+      uint64_t minAvailable,
+      uint64_t allocationSize) override {
+    pipeline_->setReadBufferSettings(minAvailable, allocationSize);
+  }
+
+  std::pair<uint64_t, uint64_t> getReadBufferSettings() override {
+    return pipeline_->getReadBufferSettings();
+  }
+
+  // InboundHandlerContext overrides
+  void read(Rin msg) override {
+    typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+    handler_.read(this, std::forward<Rin>(msg));
+  }
+
+  void readEOF() override {
+    typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+    handler_.readEOF(this);
+  }
+
+  void readException(exception_wrapper e) override {
+    typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+    handler_.readException(this, std::move(e));
+  }
+
+  // OutboundHandlerContext overrides
+  Future<void> write(Win msg) override {
+    typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+    return handler_.write(this, std::forward<Win>(msg));
+  }
+
+  Future<void> close() override {
+    typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
+    return handler_.close(this);
+  }
+
+ private:
+  P* pipeline_;
+  H handler_;
+  InboundHandlerContext<Rout>* nextIn_{nullptr};
+  OutboundHandlerContext<Wout>* nextOut_{nullptr};
+};
+
+}}
index e5ca99aee94bbc01fe8fda8af8b970ad7751fde0..73fc06667a5edd06290a822d318cbeac5127eff7 100644 (file)
@@ -16,7 +16,7 @@
 
 #pragma once
 
-#include <folly/wangle/channel/ChannelHandler.h>
+#include <folly/wangle/channel/Handler.h>
 #include <folly/io/async/EventBase.h>
 #include <folly/io/async/EventBaseManager.h>
 #include <folly/io/IOBuf.h>
diff --git a/folly/wangle/channel/Pipeline.h b/folly/wangle/channel/Pipeline.h
new file mode 100644 (file)
index 0000000..7d4fd2a
--- /dev/null
@@ -0,0 +1,340 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/wangle/channel/HandlerContext.h>
+#include <folly/futures/Future.h>
+#include <folly/io/async/AsyncTransport.h>
+#include <folly/io/async/DelayedDestruction.h>
+#include <folly/ExceptionWrapper.h>
+#include <folly/Memory.h>
+#include <glog/logging.h>
+
+namespace folly { namespace wangle {
+
+/*
+ * R is the inbound type, i.e. inbound calls start with pipeline.read(R)
+ * W is the outbound type, i.e. outbound calls start with pipeline.write(W)
+ */
+template <class R, class W, class... Handlers>
+class Pipeline;
+
+template <class R, class W>
+class Pipeline<R, W> : public DelayedDestruction {
+ public:
+  Pipeline() {}
+  ~Pipeline() {}
+
+  std::shared_ptr<AsyncTransport> getTransport() {
+    return transport_;
+  }
+
+  void setWriteFlags(WriteFlags flags) {
+    writeFlags_ = flags;
+  }
+
+  WriteFlags getWriteFlags() {
+    return writeFlags_;
+  }
+
+  void setReadBufferSettings(uint64_t minAvailable, uint64_t allocationSize) {
+    readBufferSettings_ = std::make_pair(minAvailable, allocationSize);
+  }
+
+  std::pair<uint64_t, uint64_t> getReadBufferSettings() {
+    return readBufferSettings_;
+  }
+
+  void read(R msg) {
+    front_->read(std::forward<R>(msg));
+  }
+
+  void readEOF() {
+    front_->readEOF();
+  }
+
+  void readException(exception_wrapper e) {
+    front_->readException(std::move(e));
+  }
+
+  Future<void> write(W msg) {
+    return back_->write(std::forward<W>(msg));
+  }
+
+  Future<void> close() {
+    return back_->close();
+  }
+
+  template <class H>
+  Pipeline& addBack(H&& handler) {
+    ctxs_.push_back(folly::make_unique<ContextImpl<Pipeline, H>>(
+        this, std::forward<H>(handler)));
+    return *this;
+  }
+
+  template <class H>
+  Pipeline& addFront(H&& handler) {
+    ctxs_.insert(
+        ctxs_.begin(),
+        folly::make_unique<ContextImpl<Pipeline, H>>(
+            this,
+            std::forward<H>(handler)));
+    return *this;
+  }
+
+  template <class H>
+  H* getHandler(int i) {
+    auto ctx = dynamic_cast<ContextImpl<Pipeline, H>*>(ctxs_[i].get());
+    CHECK(ctx);
+    return ctx->getHandler();
+  }
+
+  void finalize() {
+    finalizeHelper();
+    InboundHandlerContext<R>* front;
+    front_ = dynamic_cast<InboundHandlerContext<R>*>(
+        ctxs_.front().get());
+    if (!front_) {
+      throw std::invalid_argument("wrong type for first handler");
+    }
+  }
+
+ protected:
+  explicit Pipeline(bool shouldFinalize) {
+    CHECK(!shouldFinalize);
+  }
+
+  void finalizeHelper() {
+    if (ctxs_.empty()) {
+      return;
+    }
+
+    for (size_t i = 0; i < ctxs_.size() - 1; i++) {
+      ctxs_[i]->link(ctxs_[i+1].get());
+    }
+
+    back_ = dynamic_cast<OutboundHandlerContext<W>*>(ctxs_.back().get());
+    if (!back_) {
+      throw std::invalid_argument("wrong type for last handler");
+    }
+  }
+
+  PipelineContext* getLocalFront() {
+    return ctxs_.empty() ? nullptr : ctxs_.front().get();
+  }
+
+  static const bool is_end{true};
+
+  std::shared_ptr<AsyncTransport> transport_;
+  WriteFlags writeFlags_{WriteFlags::NONE};
+  std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
+
+  void attachPipeline() {}
+
+  void attachTransport(
+      std::shared_ptr<AsyncTransport> transport) {
+    transport_ = std::move(transport);
+  }
+
+  void detachTransport() {
+    transport_ = nullptr;
+  }
+
+  OutboundHandlerContext<W>* back_{nullptr};
+
+ private:
+  InboundHandlerContext<R>* front_{nullptr};
+  std::vector<std::unique_ptr<PipelineContext>> ctxs_;
+};
+
+template <class R, class W, class Handler, class... Handlers>
+class Pipeline<R, W, Handler, Handlers...>
+  : public Pipeline<R, W, Handlers...> {
+ protected:
+  template <class HandlerArg, class... HandlersArgs>
+  Pipeline(
+      bool shouldFinalize,
+      HandlerArg&& handlerArg,
+      HandlersArgs&&... handlersArgs)
+    : Pipeline<R, W, Handlers...>(
+          false,
+          std::forward<HandlersArgs>(handlersArgs)...),
+          ctx_(this, std::forward<HandlerArg>(handlerArg)) {
+    if (shouldFinalize) {
+      finalize();
+    }
+  }
+
+ public:
+  template <class... HandlersArgs>
+  explicit Pipeline(HandlersArgs&&... handlersArgs)
+    : Pipeline(true, std::forward<HandlersArgs>(handlersArgs)...) {}
+
+  ~Pipeline() {}
+
+  void read(R msg) {
+    typename Pipeline<R, W>::DestructorGuard dg(
+        static_cast<DelayedDestruction*>(this));
+    front_->read(std::forward<R>(msg));
+  }
+
+  void readEOF() {
+    typename Pipeline<R, W>::DestructorGuard dg(
+        static_cast<DelayedDestruction*>(this));
+    front_->readEOF();
+  }
+
+  void readException(exception_wrapper e) {
+    typename Pipeline<R, W>::DestructorGuard dg(
+        static_cast<DelayedDestruction*>(this));
+    front_->readException(std::move(e));
+  }
+
+  Future<void> write(W msg) {
+    typename Pipeline<R, W>::DestructorGuard dg(
+        static_cast<DelayedDestruction*>(this));
+    return back_->write(std::forward<W>(msg));
+  }
+
+  Future<void> close() {
+    typename Pipeline<R, W>::DestructorGuard dg(
+        static_cast<DelayedDestruction*>(this));
+    return back_->close();
+  }
+
+  void attachTransport(
+      std::shared_ptr<AsyncTransport> transport) {
+    typename Pipeline<R, W>::DestructorGuard dg(
+        static_cast<DelayedDestruction*>(this));
+    CHECK((!Pipeline<R, W>::transport_));
+    Pipeline<R, W, Handlers...>::attachTransport(std::move(transport));
+    forEachCtx([&](PipelineContext* ctx){
+      ctx->attachTransport();
+    });
+  }
+
+  void detachTransport() {
+    typename Pipeline<R, W>::DestructorGuard dg(
+        static_cast<DelayedDestruction*>(this));
+    Pipeline<R, W, Handlers...>::detachTransport();
+    forEachCtx([&](PipelineContext* ctx){
+      ctx->detachTransport();
+    });
+  }
+
+  std::shared_ptr<AsyncTransport> getTransport() {
+    return Pipeline<R, W>::transport_;
+  }
+
+  template <class H>
+  Pipeline& addBack(H&& handler) {
+    Pipeline<R, W>::addBack(std::move(handler));
+    return *this;
+  }
+
+  template <class H>
+  Pipeline& addFront(H&& handler) {
+    ctxs_.insert(
+        ctxs_.begin(),
+        folly::make_unique<ContextImpl<Pipeline, H>>(
+            this,
+            std::move(handler)));
+    return *this;
+  }
+
+  template <class H>
+  H* getHandler(size_t i) {
+    if (i > ctxs_.size()) {
+      return Pipeline<R, W, Handlers...>::template getHandler<H>(
+          i - (ctxs_.size() + 1));
+    } else {
+      auto pctx = (i == ctxs_.size()) ? &ctx_ : ctxs_[i].get();
+      auto ctx = dynamic_cast<ContextImpl<Pipeline, H>*>(pctx);
+      return ctx->getHandler();
+    }
+  }
+
+  void finalize() {
+    finalizeHelper();
+    auto ctx = ctxs_.empty() ? &ctx_ : ctxs_.front().get();
+    front_ = dynamic_cast<InboundHandlerContext<R>*>(ctx);
+    if (!front_) {
+      throw std::invalid_argument("wrong type for first handler");
+    }
+  }
+
+ protected:
+  void finalizeHelper() {
+    Pipeline<R, W, Handlers...>::finalizeHelper();
+    back_ = Pipeline<R, W, Handlers...>::back_;
+    if (!back_) {
+      auto is_at_end = Pipeline<R, W, Handlers...>::is_end;
+      CHECK(is_at_end);
+      back_ = dynamic_cast<OutboundHandlerContext<W>*>(&ctx_);
+      if (!back_) {
+        throw std::invalid_argument("wrong type for last handler");
+      }
+    }
+
+    if (!ctxs_.empty()) {
+      for (size_t i = 0; i < ctxs_.size() - 1; i++) {
+        ctxs_[i]->link(ctxs_[i+1].get());
+      }
+      ctxs_.back()->link(&ctx_);
+    }
+
+    auto nextFront = Pipeline<R, W, Handlers...>::getLocalFront();
+    if (nextFront) {
+      ctx_.link(nextFront);
+    }
+  }
+
+  PipelineContext* getLocalFront() {
+    return ctxs_.empty() ? &ctx_ : ctxs_.front().get();
+  }
+
+  static const bool is_end{false};
+  InboundHandlerContext<R>* front_{nullptr};
+  OutboundHandlerContext<W>* back_{nullptr};
+
+ private:
+  template <class F>
+  void forEachCtx(const F& func) {
+    for (auto& ctx : ctxs_) {
+      func(ctx.get());
+    }
+    func(&ctx_);
+  }
+
+  ContextImpl<Pipeline, Handler> ctx_;
+  std::vector<std::unique_ptr<PipelineContext>> ctxs_;
+};
+
+}}
+
+namespace folly {
+
+class AsyncSocket;
+
+template <typename Pipeline>
+class PipelineFactory {
+ public:
+  virtual Pipeline* newPipeline(std::shared_ptr<AsyncSocket>) = 0;
+  virtual ~PipelineFactory() {}
+};
+
+}
diff --git a/folly/wangle/channel/test/ChannelPipelineTest.cpp b/folly/wangle/channel/test/ChannelPipelineTest.cpp
deleted file mode 100644 (file)
index 0be3db2..0000000
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Copyright 2015 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <folly/wangle/channel/ChannelHandler.h>
-#include <folly/wangle/channel/ChannelPipeline.h>
-#include <folly/wangle/channel/AsyncSocketHandler.h>
-#include <folly/wangle/channel/OutputBufferingHandler.h>
-#include <folly/wangle/channel/test/MockChannelHandler.h>
-#include <gmock/gmock.h>
-#include <gtest/gtest.h>
-
-using namespace folly;
-using namespace folly::wangle;
-using namespace testing;
-
-typedef StrictMock<MockChannelHandlerAdapter<int, int>> IntHandler;
-typedef ChannelHandlerPtr<IntHandler, false> IntHandlerPtr;
-
-ACTION(FireRead) {
-  arg0->fireRead(arg1);
-}
-
-ACTION(FireReadEOF) {
-  arg0->fireReadEOF();
-}
-
-ACTION(FireReadException) {
-  arg0->fireReadException(arg1);
-}
-
-ACTION(FireWrite) {
-  arg0->fireWrite(arg1);
-}
-
-ACTION(FireClose) {
-  arg0->fireClose();
-}
-
-// Test move only types, among other things
-TEST(ChannelTest, RealHandlersCompile) {
-  EventBase eb;
-  auto socket = AsyncSocket::newSocket(&eb);
-  // static
-  {
-    ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>,
-      AsyncSocketHandler,
-      OutputBufferingHandler>
-    pipeline{AsyncSocketHandler(socket), OutputBufferingHandler()};
-    EXPECT_TRUE(pipeline.getHandler<AsyncSocketHandler>(0));
-    EXPECT_TRUE(pipeline.getHandler<OutputBufferingHandler>(1));
-  }
-  // dynamic
-  {
-    ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
-    pipeline
-      .addBack(AsyncSocketHandler(socket))
-      .addBack(OutputBufferingHandler())
-      .finalize();
-    EXPECT_TRUE(pipeline.getHandler<AsyncSocketHandler>(0));
-    EXPECT_TRUE(pipeline.getHandler<OutputBufferingHandler>(1));
-  }
-}
-
-// Test that handlers correctly fire the next handler when directed
-TEST(ChannelTest, FireActions) {
-  IntHandler handler1;
-  IntHandler handler2;
-
-  EXPECT_CALL(handler1, attachPipeline(_));
-  EXPECT_CALL(handler2, attachPipeline(_));
-
-  ChannelPipeline<int, int, IntHandlerPtr, IntHandlerPtr>
-  pipeline(&handler1, &handler2);
-
-  EXPECT_CALL(handler1, read_(_, _)).WillOnce(FireRead());
-  EXPECT_CALL(handler2, read_(_, _)).Times(1);
-  pipeline.read(1);
-
-  EXPECT_CALL(handler1, readEOF(_)).WillOnce(FireReadEOF());
-  EXPECT_CALL(handler2, readEOF(_)).Times(1);
-  pipeline.readEOF();
-
-  EXPECT_CALL(handler1, readException(_, _)).WillOnce(FireReadException());
-  EXPECT_CALL(handler2, readException(_, _)).Times(1);
-  pipeline.readException(make_exception_wrapper<std::runtime_error>("blah"));
-
-  EXPECT_CALL(handler2, write_(_, _)).WillOnce(FireWrite());
-  EXPECT_CALL(handler1, write_(_, _)).Times(1);
-  EXPECT_NO_THROW(pipeline.write(1).value());
-
-  EXPECT_CALL(handler2, close_(_)).WillOnce(FireClose());
-  EXPECT_CALL(handler1, close_(_)).Times(1);
-  EXPECT_NO_THROW(pipeline.close().value());
-
-  EXPECT_CALL(handler1, detachPipeline(_));
-  EXPECT_CALL(handler2, detachPipeline(_));
-}
-
-// Test that nothing bad happens when actions reach the end of the pipeline
-// (a warning will be logged, however)
-TEST(ChannelTest, ReachEndOfPipeline) {
-  IntHandler handler;
-  EXPECT_CALL(handler, attachPipeline(_));
-  ChannelPipeline<int, int, IntHandlerPtr>
-  pipeline(&handler);
-
-  EXPECT_CALL(handler, read_(_, _)).WillOnce(FireRead());
-  pipeline.read(1);
-
-  EXPECT_CALL(handler, readEOF(_)).WillOnce(FireReadEOF());
-  pipeline.readEOF();
-
-  EXPECT_CALL(handler, readException(_, _)).WillOnce(FireReadException());
-  pipeline.readException(make_exception_wrapper<std::runtime_error>("blah"));
-
-  EXPECT_CALL(handler, write_(_, _)).WillOnce(FireWrite());
-  EXPECT_NO_THROW(pipeline.write(1).value());
-
-  EXPECT_CALL(handler, close_(_)).WillOnce(FireClose());
-  EXPECT_NO_THROW(pipeline.close().value());
-
-  EXPECT_CALL(handler, detachPipeline(_));
-}
-
-// Test having the last read handler turn around and write
-TEST(ChannelTest, TurnAround) {
-  IntHandler handler1;
-  IntHandler handler2;
-
-  EXPECT_CALL(handler1, attachPipeline(_));
-  EXPECT_CALL(handler2, attachPipeline(_));
-
-  ChannelPipeline<int, int, IntHandlerPtr, IntHandlerPtr>
-  pipeline(&handler1, &handler2);
-
-  EXPECT_CALL(handler1, read_(_, _)).WillOnce(FireRead());
-  EXPECT_CALL(handler2, read_(_, _)).WillOnce(FireWrite());
-  EXPECT_CALL(handler1, write_(_, _)).Times(1);
-  pipeline.read(1);
-
-  EXPECT_CALL(handler1, detachPipeline(_));
-  EXPECT_CALL(handler2, detachPipeline(_));
-}
-
-TEST(ChannelTest, DynamicFireActions) {
-  IntHandler handler1, handler2, handler3;
-  EXPECT_CALL(handler2, attachPipeline(_));
-  ChannelPipeline<int, int, IntHandlerPtr>
-  pipeline(&handler2);
-
-  EXPECT_CALL(handler1, attachPipeline(_));
-  EXPECT_CALL(handler3, attachPipeline(_));
-
-  pipeline
-    .addFront(IntHandlerPtr(&handler1))
-    .addBack(IntHandlerPtr(&handler3))
-    .finalize();
-
-  EXPECT_TRUE(pipeline.getHandler<IntHandlerPtr>(0));
-  EXPECT_TRUE(pipeline.getHandler<IntHandlerPtr>(1));
-  EXPECT_TRUE(pipeline.getHandler<IntHandlerPtr>(2));
-
-  EXPECT_CALL(handler1, read_(_, _)).WillOnce(FireRead());
-  EXPECT_CALL(handler2, read_(_, _)).WillOnce(FireRead());
-  EXPECT_CALL(handler3, read_(_, _)).Times(1);
-  pipeline.read(1);
-
-  EXPECT_CALL(handler3, write_(_, _)).WillOnce(FireWrite());
-  EXPECT_CALL(handler2, write_(_, _)).WillOnce(FireWrite());
-  EXPECT_CALL(handler1, write_(_, _)).Times(1);
-  EXPECT_NO_THROW(pipeline.write(1).value());
-
-  EXPECT_CALL(handler1, detachPipeline(_));
-  EXPECT_CALL(handler2, detachPipeline(_));
-  EXPECT_CALL(handler3, detachPipeline(_));
-}
-
-template <class Rin, class Rout = Rin, class Win = Rout, class Wout = Rin>
-class ConcreteChannelHandler : public ChannelHandler<Rin, Rout, Win, Wout> {
-  typedef typename ChannelHandler<Rin, Rout, Win, Wout>::Context Context;
- public:
-  void read(Context* ctx, Rin msg) {}
-  Future<void> write(Context* ctx, Win msg) { return makeFuture(); }
-};
-
-typedef ChannelHandlerAdapter<std::string, std::string> StringHandler;
-typedef ConcreteChannelHandler<int, std::string> IntToStringHandler;
-typedef ConcreteChannelHandler<std::string, int> StringToIntHandler;
-
-TEST(ChannelPipeline, DynamicConstruction) {
-  {
-    ChannelPipeline<int, int> pipeline;
-    EXPECT_THROW(
-      pipeline
-        .addBack(ChannelHandlerAdapter<std::string, std::string>{})
-        .finalize(), std::invalid_argument);
-  }
-  {
-    ChannelPipeline<int, int> pipeline;
-    EXPECT_THROW(
-      pipeline
-        .addFront(ChannelHandlerAdapter<std::string, std::string>{})
-        .finalize(),
-      std::invalid_argument);
-  }
-  {
-    ChannelPipeline<std::string, std::string, StringHandler, StringHandler>
-    pipeline{StringHandler(), StringHandler()};
-
-    // Exercise both addFront and addBack. Final pipeline is
-    // StI <-> ItS <-> StS <-> StS <-> StI <-> ItS
-    EXPECT_NO_THROW(
-      pipeline
-        .addFront(IntToStringHandler{})
-        .addFront(StringToIntHandler{})
-        .addBack(StringToIntHandler{})
-        .addBack(IntToStringHandler{})
-        .finalize());
-  }
-}
-
-TEST(ChannelPipeline, AttachTransport) {
-  IntHandler handler;
-  EXPECT_CALL(handler, attachPipeline(_));
-  ChannelPipeline<int, int, IntHandlerPtr>
-  pipeline(&handler);
-
-  EventBase eb;
-  auto socket = AsyncSocket::newSocket(&eb);
-
-  EXPECT_CALL(handler, attachTransport(_));
-  pipeline.attachTransport(socket);
-
-  EXPECT_CALL(handler, detachTransport(_));
-  pipeline.detachTransport();
-
-  EXPECT_CALL(handler, detachPipeline(_));
-}
diff --git a/folly/wangle/channel/test/MockChannelHandler.h b/folly/wangle/channel/test/MockChannelHandler.h
deleted file mode 100644 (file)
index 15b88cb..0000000
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Copyright 2015 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <folly/wangle/channel/ChannelHandler.h>
-#include <gmock/gmock.h>
-
-namespace folly { namespace wangle {
-
-template <class Rin, class Rout = Rin, class Win = Rout, class Wout = Rin>
-class MockChannelHandler : public ChannelHandler<Rin, Rout, Win, Wout> {
- public:
-  typedef typename ChannelHandler<Rin, Rout, Win, Wout>::Context Context;
-
-  MockChannelHandler() = default;
-  MockChannelHandler(MockChannelHandler&&) = default;
-
-#ifdef __clang__
-# pragma clang diagnostic push
-# if __clang_major__ > 3 || __clang_minor__ >= 6
-#  pragma clang diagnostic ignored "-Winconsistent-missing-override"
-# endif
-#endif
-
-  MOCK_METHOD2_T(read_, void(Context*, Rin&));
-  MOCK_METHOD1_T(readEOF, void(Context*));
-  MOCK_METHOD2_T(readException, void(Context*, exception_wrapper));
-
-  MOCK_METHOD2_T(write_, void(Context*, Win&));
-  MOCK_METHOD1_T(close_, void(Context*));
-
-  MOCK_METHOD1_T(attachPipeline, void(Context*));
-  MOCK_METHOD1_T(attachTransport, void(Context*));
-  MOCK_METHOD1_T(detachPipeline, void(Context*));
-  MOCK_METHOD1_T(detachTransport, void(Context*));
-
-#ifdef __clang__
-#pragma clang diagnostic pop
-#endif
-
-  void read(Context* ctx, Rin msg) override {
-    read_(ctx, msg);
-  }
-
-  Future<void> write(Context* ctx, Win msg) override {
-    return makeFutureWith([&](){
-      write_(ctx, msg);
-    });
-  }
-
-  Future<void> close(Context* ctx) override {
-    return makeFutureWith([&](){
-      close_(ctx);
-    });
-  }
-};
-
-template <class R, class W = R>
-using MockChannelHandlerAdapter = MockChannelHandler<R, R, W, W>;
-
-}}
diff --git a/folly/wangle/channel/test/MockHandler.h b/folly/wangle/channel/test/MockHandler.h
new file mode 100644 (file)
index 0000000..5a47664
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/wangle/channel/Handler.h>
+#include <gmock/gmock.h>
+
+namespace folly { namespace wangle {
+
+template <class Rin, class Rout = Rin, class Win = Rout, class Wout = Rin>
+class MockHandler : public Handler<Rin, Rout, Win, Wout> {
+ public:
+  typedef typename Handler<Rin, Rout, Win, Wout>::Context Context;
+
+  MockHandler() = default;
+  MockHandler(MockHandler&&) = default;
+
+#ifdef __clang__
+# pragma clang diagnostic push
+# if __clang_major__ > 3 || __clang_minor__ >= 6
+#  pragma clang diagnostic ignored "-Winconsistent-missing-override"
+# endif
+#endif
+
+  MOCK_METHOD2_T(read_, void(Context*, Rin&));
+  MOCK_METHOD1_T(readEOF, void(Context*));
+  MOCK_METHOD2_T(readException, void(Context*, exception_wrapper));
+
+  MOCK_METHOD2_T(write_, void(Context*, Win&));
+  MOCK_METHOD1_T(close_, void(Context*));
+
+  MOCK_METHOD1_T(attachPipeline, void(Context*));
+  MOCK_METHOD1_T(attachTransport, void(Context*));
+  MOCK_METHOD1_T(detachPipeline, void(Context*));
+  MOCK_METHOD1_T(detachTransport, void(Context*));
+
+#ifdef __clang__
+#pragma clang diagnostic pop
+#endif
+
+  void read(Context* ctx, Rin msg) override {
+    read_(ctx, msg);
+  }
+
+  Future<void> write(Context* ctx, Win msg) override {
+    return makeFutureWith([&](){
+      write_(ctx, msg);
+    });
+  }
+
+  Future<void> close(Context* ctx) override {
+    return makeFutureWith([&](){
+      close_(ctx);
+    });
+  }
+};
+
+template <class R, class W = R>
+using MockHandlerAdapter = MockHandler<R, R, W, W>;
+
+}}
index e99d43e5061179ed43fd6afc2e831a740ede0344..a08509b653461d6430a3cd522bfc2212ea180fe4 100644 (file)
@@ -14,9 +14,9 @@
  * limitations under the License.
  */
 
-#include <folly/wangle/channel/ChannelPipeline.h>
+#include <folly/wangle/channel/Pipeline.h>
 #include <folly/wangle/channel/OutputBufferingHandler.h>
-#include <folly/wangle/channel/test/MockChannelHandler.h>
+#include <folly/wangle/channel/test/MockHandler.h>
 #include <folly/io/async/AsyncSocket.h>
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
@@ -25,18 +25,18 @@ using namespace folly;
 using namespace folly::wangle;
 using namespace testing;
 
-typedef StrictMock<MockChannelHandlerAdapter<
+typedef StrictMock<MockHandlerAdapter<
   IOBufQueue&,
   std::unique_ptr<IOBuf>>>
-MockHandler;
+MockBytesHandler;
 
 MATCHER_P(IOBufContains, str, "") { return arg->moveToFbString() == str; }
 
 TEST(OutputBufferingHandlerTest, Basic) {
-  MockHandler mockHandler;
+  MockBytesHandler mockHandler;
   EXPECT_CALL(mockHandler, attachPipeline(_));
-  ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>,
-    ChannelHandlerPtr<MockHandler, false>,
+  Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>,
+    HandlerPtr<MockBytesHandler, false>,
     OutputBufferingHandler>
   pipeline(&mockHandler, OutputBufferingHandler{});
 
diff --git a/folly/wangle/channel/test/PipelineTest.cpp b/folly/wangle/channel/test/PipelineTest.cpp
new file mode 100644 (file)
index 0000000..5fa97a6
--- /dev/null
@@ -0,0 +1,251 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/wangle/channel/Handler.h>
+#include <folly/wangle/channel/Pipeline.h>
+#include <folly/wangle/channel/AsyncSocketHandler.h>
+#include <folly/wangle/channel/OutputBufferingHandler.h>
+#include <folly/wangle/channel/test/MockHandler.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+using namespace folly;
+using namespace folly::wangle;
+using namespace testing;
+
+typedef StrictMock<MockHandlerAdapter<int, int>> IntHandler;
+typedef HandlerPtr<IntHandler, false> IntHandlerPtr;
+
+ACTION(FireRead) {
+  arg0->fireRead(arg1);
+}
+
+ACTION(FireReadEOF) {
+  arg0->fireReadEOF();
+}
+
+ACTION(FireReadException) {
+  arg0->fireReadException(arg1);
+}
+
+ACTION(FireWrite) {
+  arg0->fireWrite(arg1);
+}
+
+ACTION(FireClose) {
+  arg0->fireClose();
+}
+
+// Test move only types, among other things
+TEST(PipelineTest, RealHandlersCompile) {
+  EventBase eb;
+  auto socket = AsyncSocket::newSocket(&eb);
+  // static
+  {
+    Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>,
+      AsyncSocketHandler,
+      OutputBufferingHandler>
+    pipeline{AsyncSocketHandler(socket), OutputBufferingHandler()};
+    EXPECT_TRUE(pipeline.getHandler<AsyncSocketHandler>(0));
+    EXPECT_TRUE(pipeline.getHandler<OutputBufferingHandler>(1));
+  }
+  // dynamic
+  {
+    Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+    pipeline
+      .addBack(AsyncSocketHandler(socket))
+      .addBack(OutputBufferingHandler())
+      .finalize();
+    EXPECT_TRUE(pipeline.getHandler<AsyncSocketHandler>(0));
+    EXPECT_TRUE(pipeline.getHandler<OutputBufferingHandler>(1));
+  }
+}
+
+// Test that handlers correctly fire the next handler when directed
+TEST(PipelineTest, FireActions) {
+  IntHandler handler1;
+  IntHandler handler2;
+
+  EXPECT_CALL(handler1, attachPipeline(_));
+  EXPECT_CALL(handler2, attachPipeline(_));
+
+  Pipeline<int, int, IntHandlerPtr, IntHandlerPtr>
+  pipeline(&handler1, &handler2);
+
+  EXPECT_CALL(handler1, read_(_, _)).WillOnce(FireRead());
+  EXPECT_CALL(handler2, read_(_, _)).Times(1);
+  pipeline.read(1);
+
+  EXPECT_CALL(handler1, readEOF(_)).WillOnce(FireReadEOF());
+  EXPECT_CALL(handler2, readEOF(_)).Times(1);
+  pipeline.readEOF();
+
+  EXPECT_CALL(handler1, readException(_, _)).WillOnce(FireReadException());
+  EXPECT_CALL(handler2, readException(_, _)).Times(1);
+  pipeline.readException(make_exception_wrapper<std::runtime_error>("blah"));
+
+  EXPECT_CALL(handler2, write_(_, _)).WillOnce(FireWrite());
+  EXPECT_CALL(handler1, write_(_, _)).Times(1);
+  EXPECT_NO_THROW(pipeline.write(1).value());
+
+  EXPECT_CALL(handler2, close_(_)).WillOnce(FireClose());
+  EXPECT_CALL(handler1, close_(_)).Times(1);
+  EXPECT_NO_THROW(pipeline.close().value());
+
+  EXPECT_CALL(handler1, detachPipeline(_));
+  EXPECT_CALL(handler2, detachPipeline(_));
+}
+
+// Test that nothing bad happens when actions reach the end of the pipeline
+// (a warning will be logged, however)
+TEST(PipelineTest, ReachEndOfPipeline) {
+  IntHandler handler;
+  EXPECT_CALL(handler, attachPipeline(_));
+  Pipeline<int, int, IntHandlerPtr>
+  pipeline(&handler);
+
+  EXPECT_CALL(handler, read_(_, _)).WillOnce(FireRead());
+  pipeline.read(1);
+
+  EXPECT_CALL(handler, readEOF(_)).WillOnce(FireReadEOF());
+  pipeline.readEOF();
+
+  EXPECT_CALL(handler, readException(_, _)).WillOnce(FireReadException());
+  pipeline.readException(make_exception_wrapper<std::runtime_error>("blah"));
+
+  EXPECT_CALL(handler, write_(_, _)).WillOnce(FireWrite());
+  EXPECT_NO_THROW(pipeline.write(1).value());
+
+  EXPECT_CALL(handler, close_(_)).WillOnce(FireClose());
+  EXPECT_NO_THROW(pipeline.close().value());
+
+  EXPECT_CALL(handler, detachPipeline(_));
+}
+
+// Test having the last read handler turn around and write
+TEST(PipelineTest, TurnAround) {
+  IntHandler handler1;
+  IntHandler handler2;
+
+  EXPECT_CALL(handler1, attachPipeline(_));
+  EXPECT_CALL(handler2, attachPipeline(_));
+
+  Pipeline<int, int, IntHandlerPtr, IntHandlerPtr>
+  pipeline(&handler1, &handler2);
+
+  EXPECT_CALL(handler1, read_(_, _)).WillOnce(FireRead());
+  EXPECT_CALL(handler2, read_(_, _)).WillOnce(FireWrite());
+  EXPECT_CALL(handler1, write_(_, _)).Times(1);
+  pipeline.read(1);
+
+  EXPECT_CALL(handler1, detachPipeline(_));
+  EXPECT_CALL(handler2, detachPipeline(_));
+}
+
+TEST(PipelineTest, DynamicFireActions) {
+  IntHandler handler1, handler2, handler3;
+  EXPECT_CALL(handler2, attachPipeline(_));
+  Pipeline<int, int, IntHandlerPtr>
+  pipeline(&handler2);
+
+  EXPECT_CALL(handler1, attachPipeline(_));
+  EXPECT_CALL(handler3, attachPipeline(_));
+
+  pipeline
+    .addFront(IntHandlerPtr(&handler1))
+    .addBack(IntHandlerPtr(&handler3))
+    .finalize();
+
+  EXPECT_TRUE(pipeline.getHandler<IntHandlerPtr>(0));
+  EXPECT_TRUE(pipeline.getHandler<IntHandlerPtr>(1));
+  EXPECT_TRUE(pipeline.getHandler<IntHandlerPtr>(2));
+
+  EXPECT_CALL(handler1, read_(_, _)).WillOnce(FireRead());
+  EXPECT_CALL(handler2, read_(_, _)).WillOnce(FireRead());
+  EXPECT_CALL(handler3, read_(_, _)).Times(1);
+  pipeline.read(1);
+
+  EXPECT_CALL(handler3, write_(_, _)).WillOnce(FireWrite());
+  EXPECT_CALL(handler2, write_(_, _)).WillOnce(FireWrite());
+  EXPECT_CALL(handler1, write_(_, _)).Times(1);
+  EXPECT_NO_THROW(pipeline.write(1).value());
+
+  EXPECT_CALL(handler1, detachPipeline(_));
+  EXPECT_CALL(handler2, detachPipeline(_));
+  EXPECT_CALL(handler3, detachPipeline(_));
+}
+
+template <class Rin, class Rout = Rin, class Win = Rout, class Wout = Rin>
+class ConcreteHandler : public Handler<Rin, Rout, Win, Wout> {
+  typedef typename Handler<Rin, Rout, Win, Wout>::Context Context;
+ public:
+  void read(Context* ctx, Rin msg) {}
+  Future<void> write(Context* ctx, Win msg) { return makeFuture(); }
+};
+
+typedef HandlerAdapter<std::string, std::string> StringHandler;
+typedef ConcreteHandler<int, std::string> IntToStringHandler;
+typedef ConcreteHandler<std::string, int> StringToIntHandler;
+
+TEST(Pipeline, DynamicConstruction) {
+  {
+    Pipeline<int, int> pipeline;
+    EXPECT_THROW(
+      pipeline
+        .addBack(HandlerAdapter<std::string, std::string>{})
+        .finalize(), std::invalid_argument);
+  }
+  {
+    Pipeline<int, int> pipeline;
+    EXPECT_THROW(
+      pipeline
+        .addFront(HandlerAdapter<std::string, std::string>{})
+        .finalize(),
+      std::invalid_argument);
+  }
+  {
+    Pipeline<std::string, std::string, StringHandler, StringHandler>
+    pipeline{StringHandler(), StringHandler()};
+
+    // Exercise both addFront and addBack. Final pipeline is
+    // StI <-> ItS <-> StS <-> StS <-> StI <-> ItS
+    EXPECT_NO_THROW(
+      pipeline
+        .addFront(IntToStringHandler{})
+        .addFront(StringToIntHandler{})
+        .addBack(StringToIntHandler{})
+        .addBack(IntToStringHandler{})
+        .finalize());
+  }
+}
+
+TEST(Pipeline, AttachTransport) {
+  IntHandler handler;
+  EXPECT_CALL(handler, attachPipeline(_));
+  Pipeline<int, int, IntHandlerPtr>
+  pipeline(&handler);
+
+  EventBase eb;
+  auto socket = AsyncSocket::newSocket(&eb);
+
+  EXPECT_CALL(handler, attachTransport(_));
+  pipeline.attachTransport(socket);
+
+  EXPECT_CALL(handler, detachTransport(_));
+  pipeline.detachTransport();
+
+  EXPECT_CALL(handler, detachPipeline(_));
+}
index 96680002bc54000535f7565b69c38ffabd86d0d4..20d6e7fe676dc0c5b24d4b556306cdd8d87f769c 100644 (file)
  */
 #pragma once
 
-#include <folly/wangle/channel/ChannelHandler.h>
+#include <folly/wangle/channel/Handler.h>
 
 namespace folly { namespace wangle {
 
 /**
- * A ChannelHandler which decodes bytes in a stream-like fashion from
+ * A Handler which decodes bytes in a stream-like fashion from
  * IOBufQueue to a  Message type.
  *
  * Frame detection
index 7657c715013596a42d25dc7f9599ad008a098a03..80bb83d3acdb38ff0466975c5d8b4575e9e434b8 100644 (file)
@@ -55,7 +55,7 @@ class BytesReflector
 };
 
 TEST(FixedLengthFrameDecoder, FailWhenLengthFieldEndOffset) {
-  ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+  Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
   int called = 0;
 
   pipeline
@@ -90,7 +90,7 @@ TEST(FixedLengthFrameDecoder, FailWhenLengthFieldEndOffset) {
 }
 
 TEST(LengthFieldFramePipeline, SimpleTest) {
-  ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+  Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
   int called = 0;
 
   pipeline
@@ -111,7 +111,7 @@ TEST(LengthFieldFramePipeline, SimpleTest) {
 }
 
 TEST(LengthFieldFramePipeline, LittleEndian) {
-  ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+  Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
   int called = 0;
 
   pipeline
@@ -132,7 +132,7 @@ TEST(LengthFieldFramePipeline, LittleEndian) {
 }
 
 TEST(LengthFieldFrameDecoder, Simple) {
-  ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+  Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
   int called = 0;
 
   pipeline
@@ -163,7 +163,7 @@ TEST(LengthFieldFrameDecoder, Simple) {
 }
 
 TEST(LengthFieldFrameDecoder, NoStrip) {
-  ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+  Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
   int called = 0;
 
   pipeline
@@ -194,7 +194,7 @@ TEST(LengthFieldFrameDecoder, NoStrip) {
 }
 
 TEST(LengthFieldFrameDecoder, Adjustment) {
-  ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+  Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
   int called = 0;
 
   pipeline
@@ -225,7 +225,7 @@ TEST(LengthFieldFrameDecoder, Adjustment) {
 }
 
 TEST(LengthFieldFrameDecoder, PreHeader) {
-  ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+  Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
   int called = 0;
 
   pipeline
@@ -257,7 +257,7 @@ TEST(LengthFieldFrameDecoder, PreHeader) {
 }
 
 TEST(LengthFieldFrameDecoder, PostHeader) {
-  ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+  Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
   int called = 0;
 
   pipeline
@@ -289,7 +289,7 @@ TEST(LengthFieldFrameDecoder, PostHeader) {
 }
 
 TEST(LengthFieldFrameDecoderStrip, PrePostHeader) {
-  ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+  Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
   int called = 0;
 
   pipeline
@@ -322,7 +322,7 @@ TEST(LengthFieldFrameDecoderStrip, PrePostHeader) {
 }
 
 TEST(LengthFieldFrameDecoder, StripPrePostHeaderFrameInclHeader) {
-  ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+  Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
   int called = 0;
 
   pipeline
@@ -355,7 +355,7 @@ TEST(LengthFieldFrameDecoder, StripPrePostHeaderFrameInclHeader) {
 }
 
 TEST(LengthFieldFrameDecoder, FailTestLengthFieldEndOffset) {
-  ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+  Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
   int called = 0;
 
   pipeline
@@ -380,7 +380,7 @@ TEST(LengthFieldFrameDecoder, FailTestLengthFieldEndOffset) {
 }
 
 TEST(LengthFieldFrameDecoder, FailTestLengthFieldFrameSize) {
-  ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+  Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
   int called = 0;
 
   pipeline
@@ -407,7 +407,7 @@ TEST(LengthFieldFrameDecoder, FailTestLengthFieldFrameSize) {
 }
 
 TEST(LengthFieldFrameDecoder, FailTestLengthFieldInitialBytes) {
-  ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+  Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
   int called = 0;
 
   pipeline
@@ -434,7 +434,7 @@ TEST(LengthFieldFrameDecoder, FailTestLengthFieldInitialBytes) {
 }
 
 TEST(LineBasedFrameDecoder, Simple) {
-  ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+  Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
   int called = 0;
 
   pipeline
@@ -485,7 +485,7 @@ TEST(LineBasedFrameDecoder, Simple) {
 }
 
 TEST(LineBasedFrameDecoder, SaveDelimiter) {
-  ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+  Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
   int called = 0;
 
   pipeline
@@ -534,7 +534,7 @@ TEST(LineBasedFrameDecoder, SaveDelimiter) {
 }
 
 TEST(LineBasedFrameDecoder, Fail) {
-  ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+  Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
   int called = 0;
 
   pipeline
@@ -582,7 +582,7 @@ TEST(LineBasedFrameDecoder, Fail) {
 }
 
 TEST(LineBasedFrameDecoder, NewLineOnly) {
-  ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+  Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
   int called = 0;
 
   pipeline
@@ -609,7 +609,7 @@ TEST(LineBasedFrameDecoder, NewLineOnly) {
 }
 
 TEST(LineBasedFrameDecoder, CarriageNewLineOnly) {
-  ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+  Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
   int called = 0;
 
   pipeline
index 0f8c96d4402e431ff259d8ddd710b8738abfb80a..226a3677098e3298ef737c4186754241cea3c17f 100644 (file)
 
 #pragma once
 
-#include <folly/wangle/channel/ChannelHandler.h>
+#include <folly/wangle/channel/Handler.h>
 
 namespace folly { namespace wangle {
 
 /*
  * StringCodec converts a pipeline from IOBufs to std::strings.
  */
-class StringCodec : public ChannelHandler<IOBufQueue&, std::string,
-                                          std::string, std::unique_ptr<IOBuf>> {
+class StringCodec : public Handler<IOBufQueue&, std::string,
+                                   std::string, std::unique_ptr<IOBuf>> {
  public:
-  typedef typename ChannelHandler<
+  typedef typename Handler<
    IOBufQueue&, std::string,
    std::string, std::unique_ptr<IOBuf>>::Context Context;
 
index f0a5e89ce263a2a98f21cc73cc5f4121a434f221..ac8ccc927e5f46f0f7acafa40034f68d92650063 100644 (file)
@@ -15,7 +15,7 @@
  */
 #pragma once
 
-#include <folly/wangle/channel/ChannelHandler.h>
+#include <folly/wangle/channel/Handler.h>
 #include <folly/wangle/service/Service.h>
 
 namespace folly { namespace wangle {
@@ -26,16 +26,16 @@ namespace folly { namespace wangle {
  * only one request is allowed at a time.
  */
 template <typename Pipeline, typename Req, typename Resp = Req>
-class SerialClientDispatcher : public ChannelHandlerAdapter<Req, Resp>
+class SerialClientDispatcher : public HandlerAdapter<Req, Resp>
                              , public Service<Req, Resp> {
  public:
 
-  typedef typename ChannelHandlerAdapter<Req, Resp>::Context Context;
+  typedef typename HandlerAdapter<Req, Resp>::Context Context;
 
   void setPipeline(Pipeline* pipeline) {
     pipeline_ = pipeline;
     pipeline->addBack(
-      ChannelHandlerPtr<SerialClientDispatcher<Pipeline, Req, Resp>, false>(
+      HandlerPtr<SerialClientDispatcher<Pipeline, Req, Resp>, false>(
         this));
     pipeline->finalize();
   }
index 8152dc60d861b368d7efe3f38447438ecf4662e3..0b3167e0fd2d6fee36d0148acc454e0d684a8ce1 100644 (file)
@@ -15,7 +15,7 @@
  */
 #pragma once
 
-#include <folly/wangle/channel/ChannelHandler.h>
+#include <folly/wangle/channel/Handler.h>
 #include <folly/wangle/service/Service.h>
 
 namespace folly { namespace wangle {
@@ -25,10 +25,10 @@ namespace folly { namespace wangle {
  * Concurrent requests are queued in the pipeline.
  */
 template <typename Req, typename Resp = Req>
-class SerialServerDispatcher : public ChannelHandlerAdapter<Req, Resp> {
+class SerialServerDispatcher : public HandlerAdapter<Req, Resp> {
  public:
 
-  typedef typename ChannelHandlerAdapter<Req, Resp>::Context Context;
+  typedef typename HandlerAdapter<Req, Resp>::Context Context;
 
   explicit SerialServerDispatcher(Service<Req, Resp>* service)
       : service_(service) {}
index 7d78defdc6fb17d99c22bc43c9bbb6a1d03fd442..1d9aab2412329c4c786286bfe101667118d5317d 100644 (file)
@@ -20,7 +20,7 @@
 
 #include <folly/wangle/bootstrap/ServerBootstrap.h>
 #include <folly/wangle/bootstrap/ClientBootstrap.h>
-#include <folly/wangle/channel/ChannelPipeline.h>
+#include <folly/wangle/channel/Pipeline.h>
 #include <folly/wangle/channel/AsyncSocketHandler.h>
 
 namespace folly {
index 843f80d0df1e75d1d1bd171737ce239eb62e298b..b9c6d815a54e5be3282b1484518087ea60a682e1 100644 (file)
@@ -24,7 +24,7 @@ namespace folly {
 
 using namespace wangle;
 
-typedef ChannelPipeline<IOBufQueue&, std::string> Pipeline;
+typedef Pipeline<IOBufQueue&, std::string> ServicePipeline;
 
 class EchoService : public Service<std::string, std::string> {
  public:
@@ -42,12 +42,12 @@ class EchoIntService : public Service<std::string, int> {
 
 template <typename Req, typename Resp>
 class ServerPipelineFactory
-    : public PipelineFactory<Pipeline> {
+    : public PipelineFactory<ServicePipeline> {
  public:
 
-  Pipeline* newPipeline(
+  ServicePipeline* newPipeline(
       std::shared_ptr<AsyncSocket> socket) override {
-    auto pipeline = new Pipeline();
+    auto pipeline = new ServicePipeline();
     pipeline->addBack(AsyncSocketHandler(socket));
     pipeline->addBack(StringCodec());
     pipeline->addBack(SerialServerDispatcher<Req, Resp>(&service_));
@@ -61,12 +61,12 @@ class ServerPipelineFactory
 };
 
 template <typename Req, typename Resp>
-class ClientPipelineFactory : public PipelineFactory<Pipeline> {
+class ClientPipelineFactory : public PipelineFactory<ServicePipeline> {
  public:
 
-  Pipeline* newPipeline(
+  ServicePipeline* newPipeline(
       std::shared_ptr<AsyncSocket> socket) override {
-    auto pipeline = new Pipeline();
+    auto pipeline = new ServicePipeline();
     pipeline->addBack(AsyncSocketHandler(socket));
     pipeline->addBack(StringCodec());
     pipeline->template getHandler<AsyncSocketHandler>(0)->attachReadCallback();
@@ -101,14 +101,14 @@ TEST(Wangle, ClientServerTest) {
   int port = 1234;
   // server
 
-  ServerBootstrap<Pipeline> server;
+  ServerBootstrap<ServicePipeline> server;
   server.childPipeline(
     std::make_shared<ServerPipelineFactory<std::string, std::string>>());
   server.bind(port);
 
   // client
-  auto client = std::make_shared<ClientBootstrap<Pipeline>>();
-  ClientServiceFactory<Pipeline, std::string, std::string> serviceFactory;
+  auto client = std::make_shared<ClientBootstrap<ServicePipeline>>();
+  ClientServiceFactory<ServicePipeline, std::string, std::string> serviceFactory;
   client->pipelineFactory(
     std::make_shared<ClientPipelineFactory<std::string, std::string>>());
   SocketAddress addr("127.0.0.1", port);