fixup service filter interface
authorDave Watson <davejwatson@fb.com>
Thu, 21 May 2015 17:40:26 +0000 (10:40 -0700)
committerwoo <woo@fb.com>
Tue, 26 May 2015 18:31:39 +0000 (11:31 -0700)
Summary:
Based on a more thourough reading of finagle's interface:
* adds close/isAvailable, which seem very close to thrift's interfaces
* ComposedServices are hardcoded to underlying services, to simplify the code (means extra allocs?)
* Made everything a shared_ptr
* Addd ServiceFactoryFilters

Test Plan: Updated the existing unittests and added some new ones

Reviewed By: jsedgwick@fb.com

Subscribers: doug, fugalh, folly-diffs@, jsedgwick, yfeldblum, chalfant

FB internal diff: D2037206

Signature: t1:2037206:1432147489:3464d4c12a9e434d4973febcabbf7b2b3a883257

folly/wangle/service/ClientDispatcher.h
folly/wangle/service/Service.h
folly/wangle/service/ServiceTest.cpp

index 820d6478cea6a8639118e01c1d548b835a1a3782..d6354f04f98f717ff394749bc90d087250430775 100644 (file)
@@ -26,11 +26,11 @@ namespace folly { namespace wangle {
  * only one request is allowed at a time.
  */
 template <typename Pipeline, typename Req, typename Resp = Req>
-class SerialClientDispatcher : public InboundHandler<Req>
+class SerialClientDispatcher : public HandlerAdapter<Req, Resp>
                              , public Service<Req, Resp> {
  public:
 
-  typedef typename InboundHandler<Req>::Context Context;
+  typedef typename HandlerAdapter<Req, Resp>::Context Context;
 
   void setPipeline(Pipeline* pipeline) {
     pipeline_ = pipeline;
@@ -54,6 +54,13 @@ class SerialClientDispatcher : public InboundHandler<Req>
     return f;
   }
 
+  virtual Future<void> close() override {
+    return HandlerAdapter<Req, Resp>::close(nullptr);
+  }
+
+  virtual Future<void> close(Context* ctx) override {
+    return HandlerAdapter<Req, Resp>::close(ctx);
+  }
  private:
   Pipeline* pipeline_{nullptr};
   folly::Optional<Promise<Resp>> p_;
index 1d9aab2412329c4c786286bfe101667118d5317d..7eb54241e098774368fd03e9abf618965b904b41 100644 (file)
@@ -34,6 +34,12 @@ class Service {
  public:
   virtual Future<Resp> operator()(Req request) = 0;
   virtual ~Service() {}
+  virtual Future<void> close() {
+    return makeFuture();
+  }
+  virtual bool isAvailable() {
+    return true;
+  }
 };
 
 /**
@@ -57,40 +63,23 @@ class Service {
  */
 template <typename ReqA, typename RespA,
           typename ReqB = ReqA, typename RespB = RespA>
-class Filter {
+class ServiceFilter : public Service<ReqA, RespA> {
   public:
-  virtual Future<RespA> operator()(
-    Service<ReqB, RespB>* service, ReqA request) = 0;
-  std::unique_ptr<Service<ReqA, RespA>> compose(
-    Service<ReqB, RespB>* service);
-  virtual ~Filter() {}
-};
+  explicit ServiceFilter(std::shared_ptr<Service<ReqB, RespB>> service)
+      : service_(service) {}
+  virtual ~ServiceFilter() {}
 
-template <typename ReqA, typename RespA,
-          typename ReqB = ReqA, typename RespB = RespA>
-class ComposedService : public Service<ReqA, RespA> {
-  public:
-  ComposedService(Service<ReqB, RespB>* service,
-                  Filter<ReqA, RespA, ReqB, RespB>* filter)
-    : service_(service)
-    , filter_(filter) {}
-  virtual Future<RespA> operator()(ReqA request) override {
-    return (*filter_)(service_, request);
+  virtual Future<void> close() override {
+    return service_->close();
   }
 
-  ~ComposedService(){}
-  private:
-  Service<ReqB, RespB>* service_;
-  Filter<ReqA, RespA, ReqB, RespB>* filter_;
-};
+  virtual bool isAvailable() override {
+    return service_->isAvailable();
+  }
 
-template <typename ReqA, typename RespA,
-          typename ReqB, typename RespB>
-  std::unique_ptr<Service<ReqA, RespA>>
-    Filter<ReqA, RespA, ReqB, RespB>::compose(Service<ReqB, RespB>* service) {
-  return  folly::make_unique<ComposedService<ReqA, RespA, ReqB, RespB>>(
-    service, this);
-}
+ protected:
+  std::shared_ptr<Service<ReqB, RespB>> service_;
+};
 
 /**
  * A factory that creates services, given a client.  This lets you
@@ -101,10 +90,65 @@ template <typename ReqA, typename RespA,
 template <typename Pipeline, typename Req, typename Resp>
 class ServiceFactory {
  public:
-  virtual Future<Service<Req, Resp>*> operator()(
-    ClientBootstrap<Pipeline>* client) = 0;
+  virtual Future<std::shared_ptr<Service<Req, Resp>>> operator()(
+    std::shared_ptr<ClientBootstrap<Pipeline>> client) = 0;
 
   virtual ~ServiceFactory() = default;
+
 };
 
+
+template <typename Pipeline, typename Req, typename Resp>
+class ConstFactory : public ServiceFactory<Pipeline, Req, Resp> {
+ public:
+  explicit ConstFactory(std::shared_ptr<Service<Req, Resp>> service)
+      : service_(service) {}
+
+  virtual Future<std::shared_ptr<Service<Req, Resp>>> operator()(
+    std::shared_ptr<ClientBootstrap<Pipeline>> client) {
+    return service_;
+  }
+ private:
+  std::shared_ptr<Service<Req, Resp>> service_;
+};
+
+template <typename Pipeline, typename ReqA, typename RespA,
+          typename ReqB = ReqA, typename RespB = RespA>
+class ServiceFactoryFilter : public ServiceFactory<Pipeline, ReqA, RespA> {
+ public:
+  explicit ServiceFactoryFilter(
+    std::shared_ptr<ServiceFactory<Pipeline, ReqB, RespB>> serviceFactory)
+      : serviceFactory_(std::move(serviceFactory)) {}
+
+  virtual ~ServiceFactoryFilter() = default;
+
+ protected:
+  std::shared_ptr<ServiceFactory<Pipeline, ReqB, RespB>> serviceFactory_;
+};
+
+template <typename Pipeline, typename Req, typename Resp = Req>
+class FactoryToService : public Service<Req, Resp> {
+ public:
+  explicit FactoryToService(
+    std::shared_ptr<ServiceFactory<Pipeline, Req, Resp>> factory)
+      : factory_(factory) {}
+  virtual ~FactoryToService() {}
+
+  virtual Future<Resp> operator()(Req request) override {
+    DCHECK(factory_);
+    return ((*factory_)(nullptr)).then(
+      [=](std::shared_ptr<Service<Req, Resp>> service)
+      {
+        return (*service)(std::move(request)).ensure(
+          [this]() {
+            this->close();
+          });
+      });
+  }
+
+ private:
+  std::shared_ptr<ServiceFactory<Pipeline, Req, Resp>> factory_;
+};
+
+
 } // namespace
index 297af72d9c9f1473814115cc4565c5aa11c5f37b..4bf37df8d184d34bd3f7f30f2b8efb123cee1b87 100644 (file)
@@ -38,14 +38,14 @@ class SimpleDecode : public ByteToMessageCodec {
 class EchoService : public Service<std::string, std::string> {
  public:
   virtual Future<std::string> operator()(std::string req) override {
-    return makeFuture<std::string>(std::move(req));
+    return req;
   }
 };
 
 class EchoIntService : public Service<std::string, int> {
  public:
   virtual Future<int> operator()(std::string req) override {
-    return makeFuture<int>(folly::to<int>(req));
+    return folly::to<int>(req);
   }
 };
 
@@ -101,10 +101,10 @@ class ClientServiceFactory : public ServiceFactory<Pipeline, Req, Resp> {
     SerialClientDispatcher<Pipeline, Req, Resp> dispatcher_;
   };
 
-  Future<Service<Req, Resp>*> operator()(
-      ClientBootstrap<Pipeline>* client) override {
-    return makeFuture<Service<Req, Resp>*>(
-      new ClientService(client->getPipeline()));
+  Future<std::shared_ptr<Service<Req, Resp>>> operator() (
+    std::shared_ptr<ClientBootstrap<Pipeline>> client) override {
+    return Future<std::shared_ptr<Service<Req, Resp>>>(
+      std::make_shared<ClientService>(client->getPipeline()));
   }
 };
 
@@ -124,8 +124,7 @@ TEST(Wangle, ClientServerTest) {
     std::make_shared<ClientPipelineFactory<std::string, std::string>>());
   SocketAddress addr("127.0.0.1", port);
   client->connect(addr);
-  auto service = std::shared_ptr<Service<std::string, std::string>>(
-    serviceFactory(client.get()).value());
+  auto service = serviceFactory(client).value();
   auto rep = (*service)("test");
 
   rep.then([&](std::string value) {
@@ -138,68 +137,114 @@ TEST(Wangle, ClientServerTest) {
   client.reset();
 }
 
-class AppendFilter : public Filter<std::string, std::string> {
+class AppendFilter : public ServiceFilter<std::string, std::string> {
  public:
-  virtual Future<std::string> operator()(
-    Service<std::string, std::string>* service, std::string req) {
-    return (*service)(req + "\n");
+  explicit AppendFilter(
+    std::shared_ptr<Service<std::string, std::string>> service) :
+      ServiceFilter<std::string, std::string>(service) {}
+
+  virtual Future<std::string> operator()(std::string req) {
+    return (*service_)(req + "\n");
   }
 };
 
-class IntToStringFilter : public Filter<int, int, std::string, std::string> {
+class IntToStringFilter
+    : public ServiceFilter<int, int, std::string, std::string> {
  public:
-  virtual Future<int> operator()(
-    Service<std::string, std::string>* service, int req) {
-    return (*service)(folly::to<std::string>(req)).then([](std::string resp) {
+  explicit IntToStringFilter(
+    std::shared_ptr<Service<std::string, std::string>> service) :
+      ServiceFilter<int, int, std::string, std::string>(service) {}
+
+  virtual Future<int> operator()(int req) {
+    return (*service_)(folly::to<std::string>(req)).then([](std::string resp) {
       return folly::to<int>(resp);
     });
   }
 };
 
 TEST(Wangle, FilterTest) {
-  auto service = folly::make_unique<EchoService>();
-  auto filter = folly::make_unique<AppendFilter>();
-  auto result = (*filter)(service.get(), "test");
+  auto service = std::make_shared<EchoService>();
+  auto filter = std::make_shared<AppendFilter>(service);
+  auto result = (*filter)("test");
   EXPECT_EQ(result.value(), "test\n");
-
-  // Check composition
-  auto composed_service = filter->compose(service.get());
-  auto result2 = (*composed_service)("test");
-  EXPECT_EQ(result2.value(), "test\n");
 }
 
 TEST(Wangle, ComplexFilterTest) {
-  auto service = folly::make_unique<EchoService>();
-  auto filter = folly::make_unique<IntToStringFilter>();
-  auto result = (*filter)(service.get(), 1);
+  auto service = std::make_shared<EchoService>();
+  auto filter = std::make_shared<IntToStringFilter>(service);
+  auto result = (*filter)(1);
   EXPECT_EQ(result.value(), 1);
-
-  // Check composition
-  auto composed_service = filter->compose(service.get());
-  auto result2 = (*composed_service)(2);
-  EXPECT_EQ(result2.value(), 2);
 }
 
-class ChangeTypeFilter : public Filter<int, std::string, std::string, int> {
+class ChangeTypeFilter
+    : public ServiceFilter<int, std::string, std::string, int> {
  public:
-  virtual Future<std::string> operator()(
-    Service<std::string, int>* service, int req) {
-    return (*service)(folly::to<std::string>(req)).then([](int resp) {
+  explicit ChangeTypeFilter(
+    std::shared_ptr<Service<std::string, int>> service) :
+      ServiceFilter<int, std::string, std::string, int>(service) {}
+
+  virtual Future<std::string> operator()(int req) {
+    return (*service_)(folly::to<std::string>(req)).then([](int resp) {
       return folly::to<std::string>(resp);
     });
   }
 };
 
 TEST(Wangle, SuperComplexFilterTest) {
-  auto service = folly::make_unique<EchoIntService>();
-  auto filter = folly::make_unique<ChangeTypeFilter>();
-  auto result = (*filter)(service.get(), 1);
+  auto service = std::make_shared<EchoIntService>();
+  auto filter = std::make_shared<ChangeTypeFilter>(service);
+  auto result = (*filter)(1);
   EXPECT_EQ(result.value(), "1");
+}
+
+template <typename Pipeline, typename Req, typename Resp>
+class ConnectionCountFilter : public ServiceFactoryFilter<Pipeline, Req, Resp> {
+ public:
+  explicit ConnectionCountFilter(
+    std::shared_ptr<ServiceFactory<Pipeline, Req, Resp>> factory)
+      : ServiceFactoryFilter<Pipeline, Req, Resp>(factory) {}
+
+    virtual Future<std::shared_ptr<Service<Req, Resp>>> operator()(
+      std::shared_ptr<ClientBootstrap<Pipeline>> client) {
+      connectionCount++;
+      return (*this->serviceFactory_)(client);
+    }
+
+  int connectionCount{0};
+};
+
+TEST(Wangle, ServiceFactoryFilter) {
+  auto clientFactory =
+    std::make_shared<
+    ClientServiceFactory<ServicePipeline, std::string, std::string>>();
+  auto countingFactory =
+    std::make_shared<
+    ConnectionCountFilter<ServicePipeline, std::string, std::string>>(
+      clientFactory);
+
+  auto client = std::make_shared<ClientBootstrap<ServicePipeline>>();
+
+  client->pipelineFactory(
+    std::make_shared<ClientPipelineFactory<std::string, std::string>>());
+  // It doesn't matter if connect succeds or not, but it needs to be called
+  // to create a pipeline
+  client->connect(folly::SocketAddress("::1", 8090));
+
+  auto service = (*countingFactory)(client).value();
+
+  // After the first service goes away, the client can be reused
+  service = (*countingFactory)(client).value();
+  EXPECT_EQ(2, countingFactory->connectionCount);
+}
+
+TEST(Wangle, FactoryToService) {
+  auto constfactory =
+    std::make_shared<ConstFactory<ServicePipeline, std::string, std::string>>(
+    std::make_shared<EchoService>());
+  FactoryToService<ServicePipeline, std::string, std::string> service(
+    constfactory);
 
-  // Check composition
-  auto composed_service = filter->compose(service.get());
-  auto result2 = (*composed_service)(2);
-  EXPECT_EQ(result2.value(), "2");
+  EXPECT_EQ("test", service("test").value());
 }
 
 int main(int argc, char** argv) {