From caf6cdc0cba48d747b8fa9ff5e3e93d3504d2439 Mon Sep 17 00:00:00 2001 From: Dave Watson Date: Thu, 21 May 2015 10:40:26 -0700 Subject: [PATCH] fixup service filter interface Summary: Based on a more thourough reading of finagle's interface: * adds close/isAvailable, which seem very close to thrift's interfaces * ComposedServices are hardcoded to underlying services, to simplify the code (means extra allocs?) * Made everything a shared_ptr * Addd ServiceFactoryFilters Test Plan: Updated the existing unittests and added some new ones Reviewed By: jsedgwick@fb.com Subscribers: doug, fugalh, folly-diffs@, jsedgwick, yfeldblum, chalfant FB internal diff: D2037206 Signature: t1:2037206:1432147489:3464d4c12a9e434d4973febcabbf7b2b3a883257 --- folly/wangle/service/ClientDispatcher.h | 11 +- folly/wangle/service/Service.h | 106 +++++++++++++------ folly/wangle/service/ServiceTest.cpp | 131 ++++++++++++++++-------- 3 files changed, 172 insertions(+), 76 deletions(-) diff --git a/folly/wangle/service/ClientDispatcher.h b/folly/wangle/service/ClientDispatcher.h index 820d6478..d6354f04 100644 --- a/folly/wangle/service/ClientDispatcher.h +++ b/folly/wangle/service/ClientDispatcher.h @@ -26,11 +26,11 @@ namespace folly { namespace wangle { * only one request is allowed at a time. */ template -class SerialClientDispatcher : public InboundHandler +class SerialClientDispatcher : public HandlerAdapter , public Service { public: - typedef typename InboundHandler::Context Context; + typedef typename HandlerAdapter::Context Context; void setPipeline(Pipeline* pipeline) { pipeline_ = pipeline; @@ -54,6 +54,13 @@ class SerialClientDispatcher : public InboundHandler return f; } + virtual Future close() override { + return HandlerAdapter::close(nullptr); + } + + virtual Future close(Context* ctx) override { + return HandlerAdapter::close(ctx); + } private: Pipeline* pipeline_{nullptr}; folly::Optional> p_; diff --git a/folly/wangle/service/Service.h b/folly/wangle/service/Service.h index 1d9aab24..7eb54241 100644 --- a/folly/wangle/service/Service.h +++ b/folly/wangle/service/Service.h @@ -34,6 +34,12 @@ class Service { public: virtual Future operator()(Req request) = 0; virtual ~Service() {} + virtual Future close() { + return makeFuture(); + } + virtual bool isAvailable() { + return true; + } }; /** @@ -57,40 +63,23 @@ class Service { */ template -class Filter { +class ServiceFilter : public Service { public: - virtual Future operator()( - Service* service, ReqA request) = 0; - std::unique_ptr> compose( - Service* service); - virtual ~Filter() {} -}; + explicit ServiceFilter(std::shared_ptr> service) + : service_(service) {} + virtual ~ServiceFilter() {} -template -class ComposedService : public Service { - public: - ComposedService(Service* service, - Filter* filter) - : service_(service) - , filter_(filter) {} - virtual Future operator()(ReqA request) override { - return (*filter_)(service_, request); + virtual Future close() override { + return service_->close(); } - ~ComposedService(){} - private: - Service* service_; - Filter* filter_; -}; + virtual bool isAvailable() override { + return service_->isAvailable(); + } -template - std::unique_ptr> - Filter::compose(Service* service) { - return folly::make_unique>( - service, this); -} + protected: + std::shared_ptr> service_; +}; /** * A factory that creates services, given a client. This lets you @@ -101,10 +90,65 @@ template class ServiceFactory { public: - virtual Future*> operator()( - ClientBootstrap* client) = 0; + virtual Future>> operator()( + std::shared_ptr> client) = 0; virtual ~ServiceFactory() = default; + }; + +template +class ConstFactory : public ServiceFactory { + public: + explicit ConstFactory(std::shared_ptr> service) + : service_(service) {} + + virtual Future>> operator()( + std::shared_ptr> client) { + return service_; + } + private: + std::shared_ptr> service_; +}; + +template +class ServiceFactoryFilter : public ServiceFactory { + public: + explicit ServiceFactoryFilter( + std::shared_ptr> serviceFactory) + : serviceFactory_(std::move(serviceFactory)) {} + + virtual ~ServiceFactoryFilter() = default; + + protected: + std::shared_ptr> serviceFactory_; +}; + +template +class FactoryToService : public Service { + public: + explicit FactoryToService( + std::shared_ptr> factory) + : factory_(factory) {} + virtual ~FactoryToService() {} + + virtual Future operator()(Req request) override { + DCHECK(factory_); + return ((*factory_)(nullptr)).then( + [=](std::shared_ptr> service) + { + return (*service)(std::move(request)).ensure( + [this]() { + this->close(); + }); + }); + } + + private: + std::shared_ptr> factory_; +}; + + } // namespace diff --git a/folly/wangle/service/ServiceTest.cpp b/folly/wangle/service/ServiceTest.cpp index 297af72d..4bf37df8 100644 --- a/folly/wangle/service/ServiceTest.cpp +++ b/folly/wangle/service/ServiceTest.cpp @@ -38,14 +38,14 @@ class SimpleDecode : public ByteToMessageCodec { class EchoService : public Service { public: virtual Future operator()(std::string req) override { - return makeFuture(std::move(req)); + return req; } }; class EchoIntService : public Service { public: virtual Future operator()(std::string req) override { - return makeFuture(folly::to(req)); + return folly::to(req); } }; @@ -101,10 +101,10 @@ class ClientServiceFactory : public ServiceFactory { SerialClientDispatcher dispatcher_; }; - Future*> operator()( - ClientBootstrap* client) override { - return makeFuture*>( - new ClientService(client->getPipeline())); + Future>> operator() ( + std::shared_ptr> client) override { + return Future>>( + std::make_shared(client->getPipeline())); } }; @@ -124,8 +124,7 @@ TEST(Wangle, ClientServerTest) { std::make_shared>()); SocketAddress addr("127.0.0.1", port); client->connect(addr); - auto service = std::shared_ptr>( - serviceFactory(client.get()).value()); + auto service = serviceFactory(client).value(); auto rep = (*service)("test"); rep.then([&](std::string value) { @@ -138,68 +137,114 @@ TEST(Wangle, ClientServerTest) { client.reset(); } -class AppendFilter : public Filter { +class AppendFilter : public ServiceFilter { public: - virtual Future operator()( - Service* service, std::string req) { - return (*service)(req + "\n"); + explicit AppendFilter( + std::shared_ptr> service) : + ServiceFilter(service) {} + + virtual Future operator()(std::string req) { + return (*service_)(req + "\n"); } }; -class IntToStringFilter : public Filter { +class IntToStringFilter + : public ServiceFilter { public: - virtual Future operator()( - Service* service, int req) { - return (*service)(folly::to(req)).then([](std::string resp) { + explicit IntToStringFilter( + std::shared_ptr> service) : + ServiceFilter(service) {} + + virtual Future operator()(int req) { + return (*service_)(folly::to(req)).then([](std::string resp) { return folly::to(resp); }); } }; TEST(Wangle, FilterTest) { - auto service = folly::make_unique(); - auto filter = folly::make_unique(); - auto result = (*filter)(service.get(), "test"); + auto service = std::make_shared(); + auto filter = std::make_shared(service); + auto result = (*filter)("test"); EXPECT_EQ(result.value(), "test\n"); - - // Check composition - auto composed_service = filter->compose(service.get()); - auto result2 = (*composed_service)("test"); - EXPECT_EQ(result2.value(), "test\n"); } TEST(Wangle, ComplexFilterTest) { - auto service = folly::make_unique(); - auto filter = folly::make_unique(); - auto result = (*filter)(service.get(), 1); + auto service = std::make_shared(); + auto filter = std::make_shared(service); + auto result = (*filter)(1); EXPECT_EQ(result.value(), 1); - - // Check composition - auto composed_service = filter->compose(service.get()); - auto result2 = (*composed_service)(2); - EXPECT_EQ(result2.value(), 2); } -class ChangeTypeFilter : public Filter { +class ChangeTypeFilter + : public ServiceFilter { public: - virtual Future operator()( - Service* service, int req) { - return (*service)(folly::to(req)).then([](int resp) { + explicit ChangeTypeFilter( + std::shared_ptr> service) : + ServiceFilter(service) {} + + virtual Future operator()(int req) { + return (*service_)(folly::to(req)).then([](int resp) { return folly::to(resp); }); } }; TEST(Wangle, SuperComplexFilterTest) { - auto service = folly::make_unique(); - auto filter = folly::make_unique(); - auto result = (*filter)(service.get(), 1); + auto service = std::make_shared(); + auto filter = std::make_shared(service); + auto result = (*filter)(1); EXPECT_EQ(result.value(), "1"); +} + +template +class ConnectionCountFilter : public ServiceFactoryFilter { + public: + explicit ConnectionCountFilter( + std::shared_ptr> factory) + : ServiceFactoryFilter(factory) {} + + virtual Future>> operator()( + std::shared_ptr> client) { + connectionCount++; + return (*this->serviceFactory_)(client); + } + + int connectionCount{0}; +}; + +TEST(Wangle, ServiceFactoryFilter) { + auto clientFactory = + std::make_shared< + ClientServiceFactory>(); + auto countingFactory = + std::make_shared< + ConnectionCountFilter>( + clientFactory); + + auto client = std::make_shared>(); + + client->pipelineFactory( + std::make_shared>()); + // It doesn't matter if connect succeds or not, but it needs to be called + // to create a pipeline + client->connect(folly::SocketAddress("::1", 8090)); + + auto service = (*countingFactory)(client).value(); + + // After the first service goes away, the client can be reused + service = (*countingFactory)(client).value(); + EXPECT_EQ(2, countingFactory->connectionCount); +} + +TEST(Wangle, FactoryToService) { + auto constfactory = + std::make_shared>( + std::make_shared()); + FactoryToService service( + constfactory); - // Check composition - auto composed_service = filter->compose(service.get()); - auto result2 = (*composed_service)(2); - EXPECT_EQ(result2.value(), "2"); + EXPECT_EQ("test", service("test").value()); } int main(int argc, char** argv) { -- 2.34.1