Finagle interfaces
authorDave Watson <davejwatson@fb.com>
Fri, 23 Jan 2015 17:50:35 +0000 (09:50 -0800)
committerwoo <woo@fb.com>
Mon, 2 Feb 2015 21:12:31 +0000 (13:12 -0800)
Summary: Future service interfaces similar to finagle.  Service creators for client, filters

Test Plan: Unittests included - also sets up a simple pipeline to test a full stack client/server.

Reviewed By: hans@fb.com

Subscribers: jsedgwick, trunkagent, njormrod, folly-diffs@, doug, fugalh

FB internal diff: D1573086

Tasks: 5002456

Signature: t1:1573086:1421970698:328453c4a980bb6950fc9aeed6a2b6d9819c20db

folly/wangle/bootstrap/BootstrapTest.cpp
folly/wangle/bootstrap/ClientBootstrap.h
folly/wangle/service/ClientDispatcher.h [new file with mode: 0644]
folly/wangle/service/ServerDispatcher.h [new file with mode: 0644]
folly/wangle/service/Service.h [new file with mode: 0644]
folly/wangle/service/ServiceTest.cpp [new file with mode: 0644]

index 1567978b676319dadc21d4f630c01c1756a04cc5..9f2f664e771c761890a72fd0087d60b874e1d3d4 100644 (file)
@@ -26,13 +26,11 @@ using namespace folly;
 
 typedef ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> Pipeline;
 
-class TestServer : public ServerBootstrap<Pipeline> {
-  Pipeline* newPipeline(std::shared_ptr<AsyncSocket>) {
-    return nullptr;
-  }
-};
+typedef ServerBootstrap<Pipeline> TestServer;
+typedef ClientBootstrap<Pipeline> TestClient;
 
-class TestClient : public ClientBootstrap<Pipeline> {
+class TestClientPipelineFactory : public PipelineFactory<Pipeline> {
+ public:
   Pipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
     CHECK(sock->good());
 
@@ -76,6 +74,7 @@ TEST(Bootstrap, ClientServerTest) {
   server.getSockets()[0]->getAddress(&address);
 
   TestClient client;
+  client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
   client.connect(address);
   base->loop();
   server.stop();
@@ -98,9 +97,12 @@ TEST(Bootstrap, ClientConnectionManagerTest) {
   server.getSockets()[0]->getAddress(&address);
 
   TestClient client;
+  client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
+
   client.connect(address);
 
   TestClient client2;
+  client2.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
   client2.connect(address);
 
   base->loop();
@@ -124,6 +126,7 @@ TEST(Bootstrap, ServerAcceptGroupTest) {
   boost::barrier barrier(2);
   auto thread = std::thread([&](){
     TestClient client;
+    client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
     client.connect(address);
     EventBaseManager::get()->getEventBase()->loop();
     barrier.wait();
@@ -162,6 +165,8 @@ TEST(Bootstrap, ServerAcceptGroup2Test) {
   server.getSockets()[0]->getAddress(&address);
 
   TestClient client;
+  client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
+
   client.connect(address);
   EventBaseManager::get()->getEventBase()->loop();
 
@@ -198,18 +203,23 @@ TEST(Bootstrap, SharedThreadPool) {
   server.getSockets()[0]->getAddress(&address);
 
   TestClient client;
+  client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
   client.connect(address);
 
   TestClient client2;
+  client2.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
   client2.connect(address);
 
   TestClient client3;
+  client3.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
   client3.connect(address);
 
   TestClient client4;
+  client4.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
   client4.connect(address);
 
   TestClient client5;
+  client5.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
   client5.connect(address);
 
   EventBaseManager::get()->getEventBase()->loop();
index 8ee8fad9a4f0b8f43532df8a217730c90fc71aa9..4010519519cb649d1ff3f128bb748c5dd510c92a 100644 (file)
@@ -33,13 +33,24 @@ class ClientBootstrap {
     return this;
   }
   ClientBootstrap* connect(SocketAddress address) {
+    DCHECK(pipelineFactory_);
     pipeline_.reset(
-      newPipeline(
+      pipelineFactory_->newPipeline(
         AsyncSocket::newSocket(EventBaseManager::get()->getEventBase(), address)
       ));
     return this;
   }
 
+  ClientBootstrap* pipelineFactory(
+      std::shared_ptr<PipelineFactory<Pipeline>> factory) {
+    pipelineFactory_ = factory;
+    return this;
+  }
+
+  Pipeline* getPipeline() {
+    return pipeline_.get();
+  }
+
   virtual ~ClientBootstrap() {}
 
  protected:
@@ -48,7 +59,7 @@ class ClientBootstrap {
 
   int port_;
 
-  virtual Pipeline* newPipeline(std::shared_ptr<AsyncSocket> socket) = 0;
+  std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory_;
 };
 
 } // namespace
diff --git a/folly/wangle/service/ClientDispatcher.h b/folly/wangle/service/ClientDispatcher.h
new file mode 100644 (file)
index 0000000..a42a74f
--- /dev/null
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/wangle/channel/ChannelHandler.h>
+
+namespace folly { namespace wangle {
+
+/**
+ * Dispatch a request, satisfying Promise `p` with the response;
+ * the returned Future is satisfied when the response is received:
+ * only one request is allowed at a time.
+ */
+template <typename Pipeline, typename Req, typename Resp = Req>
+class SerialClientDispatcher : public ChannelHandlerAdapter<Req, Resp>
+                             , public Service<Req, Resp> {
+ public:
+
+  typedef typename ChannelHandlerAdapter<Req, Resp>::Context Context;
+
+  void setPipeline(Pipeline* pipeline) {
+    pipeline_ = pipeline;
+    pipeline->addBack(
+      ChannelHandlerPtr<SerialClientDispatcher<Pipeline, Req, Resp>, false>(
+        this));
+    pipeline->finalize();
+  }
+
+  void read(Context* ctx, Req in) override {
+    DCHECK(p_);
+    p_->setValue(std::move(in));
+    p_ = none;
+  }
+
+  virtual Future<Resp> operator()(Req arg) override {
+    CHECK(!p_);
+    DCHECK(pipeline_);
+
+    p_ = Promise<Resp>();
+    auto f = p_->getFuture();
+    pipeline_->write(std::move(arg));
+    return f;
+  }
+
+ private:
+  Pipeline* pipeline_{nullptr};
+  folly::Optional<Promise<Resp>> p_;
+};
+
+}} // namespace
diff --git a/folly/wangle/service/ServerDispatcher.h b/folly/wangle/service/ServerDispatcher.h
new file mode 100644 (file)
index 0000000..7ec3a4d
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/wangle/channel/ChannelHandler.h>
+
+namespace folly { namespace wangle {
+
+/**
+ * Dispatch requests from pipeline one at a time synchronously.
+ * Concurrent requests are queued in the pipeline.
+ */
+template <typename Req, typename Resp = Req>
+class SerialServerDispatcher : public ChannelHandlerAdapter<Req, Resp> {
+ public:
+
+  typedef typename ChannelHandlerAdapter<Req, Resp>::Context Context;
+
+  explicit SerialServerDispatcher(Service<Req, Resp>* service)
+      : service_(service) {}
+
+  void read(Context* ctx, Req in) override {
+    auto resp = (*service_)(std::move(in)).get();
+    ctx->fireWrite(std::move(resp));
+  }
+
+ private:
+
+  Service<Req, Resp>* service_;
+};
+
+}} // namespace
diff --git a/folly/wangle/service/Service.h b/folly/wangle/service/Service.h
new file mode 100644 (file)
index 0000000..7d78def
--- /dev/null
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/futures/Future.h>
+#include <folly/Memory.h>
+
+#include <folly/wangle/bootstrap/ServerBootstrap.h>
+#include <folly/wangle/bootstrap/ClientBootstrap.h>
+#include <folly/wangle/channel/ChannelPipeline.h>
+#include <folly/wangle/channel/AsyncSocketHandler.h>
+
+namespace folly {
+
+/**
+ * A Service is an asynchronous function from Request to
+ * Future<Response>. It is the basic unit of the RPC interface.
+ */
+template <typename Req, typename Resp = Req>
+class Service {
+ public:
+  virtual Future<Resp> operator()(Req request) = 0;
+  virtual ~Service() {}
+};
+
+/**
+ * A Filter acts as a decorator/transformer of a service. It may apply
+ * transformations to the input and output of that service:
+ *
+ *          class MyService
+ *
+ * ReqA  -> |
+ *          | -> ReqB
+ *          | <- RespB
+ * RespA <- |
+ *
+ * For example, you may have a service that takes Strings and parses
+ * them as Ints.  If you want to expose this as a Network Service via
+ * Thrift, it is nice to isolate the protocol handling from the
+ * business rules. Hence you might have a Filter that converts back
+ * and forth between Thrift structs:
+ *
+ * [ThriftIn -> (String  ->  Int) -> ThriftOut]
+ */
+template <typename ReqA, typename RespA,
+          typename ReqB = ReqA, typename RespB = RespA>
+class Filter {
+  public:
+  virtual Future<RespA> operator()(
+    Service<ReqB, RespB>* service, ReqA request) = 0;
+  std::unique_ptr<Service<ReqA, RespA>> compose(
+    Service<ReqB, RespB>* service);
+  virtual ~Filter() {}
+};
+
+template <typename ReqA, typename RespA,
+          typename ReqB = ReqA, typename RespB = RespA>
+class ComposedService : public Service<ReqA, RespA> {
+  public:
+  ComposedService(Service<ReqB, RespB>* service,
+                  Filter<ReqA, RespA, ReqB, RespB>* filter)
+    : service_(service)
+    , filter_(filter) {}
+  virtual Future<RespA> operator()(ReqA request) override {
+    return (*filter_)(service_, request);
+  }
+
+  ~ComposedService(){}
+  private:
+  Service<ReqB, RespB>* service_;
+  Filter<ReqA, RespA, ReqB, RespB>* filter_;
+};
+
+template <typename ReqA, typename RespA,
+          typename ReqB, typename RespB>
+  std::unique_ptr<Service<ReqA, RespA>>
+    Filter<ReqA, RespA, ReqB, RespB>::compose(Service<ReqB, RespB>* service) {
+  return  folly::make_unique<ComposedService<ReqA, RespA, ReqB, RespB>>(
+    service, this);
+}
+
+/**
+ * A factory that creates services, given a client.  This lets you
+ * make RPC calls on the Service interface over a client's pipeline.
+ *
+ * Clients can be reused after you are done using the service.
+ */
+template <typename Pipeline, typename Req, typename Resp>
+class ServiceFactory {
+ public:
+  virtual Future<Service<Req, Resp>*> operator()(
+    ClientBootstrap<Pipeline>* client) = 0;
+
+  virtual ~ServiceFactory() = default;
+};
+
+} // namespace
diff --git a/folly/wangle/service/ServiceTest.cpp b/folly/wangle/service/ServiceTest.cpp
new file mode 100644 (file)
index 0000000..d54ac9e
--- /dev/null
@@ -0,0 +1,220 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <gtest/gtest.h>
+
+#include <folly/wangle/service/Service.h>
+#include <folly/wangle/service/ServerDispatcher.h>
+#include <folly/wangle/service/ClientDispatcher.h>
+
+namespace folly {
+
+using namespace wangle;
+
+typedef ChannelPipeline<IOBufQueue&, std::string> Pipeline;
+
+class EchoService : public Service<std::string, std::string> {
+ public:
+  virtual Future<std::string> operator()(std::string req) override {
+    return makeFuture<std::string>(std::move(req));
+  }
+};
+
+class EchoIntService : public Service<std::string, int> {
+ public:
+  virtual Future<int> operator()(std::string req) override {
+    return makeFuture<int>(folly::to<int>(req));
+  }
+};
+
+class StringCodec : public ChannelHandler<IOBufQueue&, std::string,
+                                          std::string, std::unique_ptr<IOBuf>> {
+ public:
+  typedef typename ChannelHandler<
+   IOBufQueue&, std::string,
+   std::string, std::unique_ptr<IOBuf>>::Context Context;
+
+  void read(Context* ctx, IOBufQueue& q) override {
+    auto buf = q.pop_front();
+    buf->coalesce();
+    std::string data((const char*)buf->data(), buf->length());
+
+    ctx->fireRead(data);
+  }
+
+  Future<void> write(Context* ctx, std::string msg) override {
+    auto buf = IOBuf::copyBuffer(msg.data(), msg.length());
+    return ctx->fireWrite(std::move(buf));
+  }
+};
+
+template <typename Req, typename Resp>
+class ServerPipelineFactory
+    : public PipelineFactory<Pipeline> {
+ public:
+
+  Pipeline* newPipeline(
+      std::shared_ptr<AsyncSocket> socket) override {
+    auto pipeline = new Pipeline();
+    pipeline->addBack(AsyncSocketHandler(socket));
+    pipeline->addBack(StringCodec());
+    pipeline->addBack(SerialServerDispatcher<Req, Resp>(&service_));
+    pipeline->finalize();
+    pipeline->template getHandler<AsyncSocketHandler>(0)->attachReadCallback();
+    return pipeline;
+  }
+
+ private:
+  EchoService service_;
+};
+
+template <typename Req, typename Resp>
+class ClientPipelineFactory : public PipelineFactory<Pipeline> {
+ public:
+
+  Pipeline* newPipeline(
+      std::shared_ptr<AsyncSocket> socket) override {
+    auto pipeline = new Pipeline();
+    pipeline->addBack(AsyncSocketHandler(socket));
+    pipeline->addBack(StringCodec());
+    pipeline->template getHandler<AsyncSocketHandler>(0)->attachReadCallback();
+
+    return pipeline;
+   }
+};
+
+template <typename Pipeline, typename Req, typename Resp>
+class ClientServiceFactory : public ServiceFactory<Pipeline, Req, Resp> {
+ public:
+  class ClientService : public Service<Req, Resp> {
+   public:
+    explicit ClientService(Pipeline* pipeline) {
+      dispatcher_.setPipeline(pipeline);
+    }
+    Future<Resp> operator()(Req request) override {
+      return dispatcher_(std::move(request));
+    }
+   private:
+    SerialClientDispatcher<Pipeline, Req, Resp> dispatcher_;
+  };
+
+  Future<Service<Req, Resp>*> operator()(
+      ClientBootstrap<Pipeline>* client) override {
+    return makeFuture<Service<Req, Resp>*>(
+      new ClientService(client->getPipeline()));
+  }
+};
+
+TEST(Wangle, ClientServerTest) {
+  int port = 1234;
+  // server
+
+  ServerBootstrap<Pipeline> server;
+  server.childPipeline(
+    std::make_shared<ServerPipelineFactory<std::string, std::string>>());
+  server.bind(port);
+
+  // client
+  ClientBootstrap<Pipeline> client;
+  ClientServiceFactory<Pipeline, std::string, std::string> serviceFactory;
+  client.pipelineFactory(
+    std::make_shared<ClientPipelineFactory<std::string, std::string>>());
+  SocketAddress addr("127.0.0.1", port);
+  client.connect(addr);
+  auto service = serviceFactory(&client).value();
+  auto rep = (*service)("test");
+
+  rep.then([&](std::string value) {
+    EXPECT_EQ("test", value);
+    EventBaseManager::get()->getEventBase()->terminateLoopSoon();
+
+  });
+  EventBaseManager::get()->getEventBase()->loopForever();
+  server.stop();
+}
+
+class AppendFilter : public Filter<std::string, std::string> {
+ public:
+  virtual Future<std::string> operator()(
+    Service<std::string, std::string>* service, std::string req) {
+    return (*service)(req + "\n");
+  }
+};
+
+class IntToStringFilter : public Filter<int, int, std::string, std::string> {
+ public:
+  virtual Future<int> operator()(
+    Service<std::string, std::string>* service, int req) {
+    return (*service)(folly::to<std::string>(req)).then([](std::string resp) {
+      return folly::to<int>(resp);
+    });
+  }
+};
+
+TEST(Wangle, FilterTest) {
+  auto service = folly::make_unique<EchoService>();
+  auto filter = folly::make_unique<AppendFilter>();
+  auto result = (*filter)(service.get(), "test");
+  EXPECT_EQ(result.value(), "test\n");
+
+  // Check composition
+  auto composed_service = filter->compose(service.get());
+  auto result2 = (*composed_service)("test");
+  EXPECT_EQ(result2.value(), "test\n");
+}
+
+TEST(Wangle, ComplexFilterTest) {
+  auto service = folly::make_unique<EchoService>();
+  auto filter = folly::make_unique<IntToStringFilter>();
+  auto result = (*filter)(service.get(), 1);
+  EXPECT_EQ(result.value(), 1);
+
+  // Check composition
+  auto composed_service = filter->compose(service.get());
+  auto result2 = (*composed_service)(2);
+  EXPECT_EQ(result2.value(), 2);
+}
+
+class ChangeTypeFilter : public Filter<int, std::string, std::string, int> {
+ public:
+  virtual Future<std::string> operator()(
+    Service<std::string, int>* service, int req) {
+    return (*service)(folly::to<std::string>(req)).then([](int resp) {
+      return folly::to<std::string>(resp);
+    });
+  }
+};
+
+TEST(Wangle, SuperComplexFilterTest) {
+  auto service = folly::make_unique<EchoIntService>();
+  auto filter = folly::make_unique<ChangeTypeFilter>();
+  auto result = (*filter)(service.get(), 1);
+  EXPECT_EQ(result.value(), "1");
+
+  // Check composition
+  auto composed_service = filter->compose(service.get());
+  auto result2 = (*composed_service)(2);
+  EXPECT_EQ(result2.value(), "2");
+}
+
+int main(int argc, char** argv) {
+  testing::InitGoogleTest(&argc, argv);
+  google::InitGoogleLogging(argv[0]);
+  google::ParseCommandLineFlags(&argc, &argv, true);
+
+  return RUN_ALL_TESTS();
+}
+
+} // namespace