typedef ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> Pipeline;
-class TestServer : public ServerBootstrap<Pipeline> {
- Pipeline* newPipeline(std::shared_ptr<AsyncSocket>) {
- return nullptr;
- }
-};
+typedef ServerBootstrap<Pipeline> TestServer;
+typedef ClientBootstrap<Pipeline> TestClient;
-class TestClient : public ClientBootstrap<Pipeline> {
+class TestClientPipelineFactory : public PipelineFactory<Pipeline> {
+ public:
Pipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
CHECK(sock->good());
server.getSockets()[0]->getAddress(&address);
TestClient client;
+ client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
client.connect(address);
base->loop();
server.stop();
server.getSockets()[0]->getAddress(&address);
TestClient client;
+ client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
+
client.connect(address);
TestClient client2;
+ client2.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
client2.connect(address);
base->loop();
boost::barrier barrier(2);
auto thread = std::thread([&](){
TestClient client;
+ client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
client.connect(address);
EventBaseManager::get()->getEventBase()->loop();
barrier.wait();
server.getSockets()[0]->getAddress(&address);
TestClient client;
+ client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
+
client.connect(address);
EventBaseManager::get()->getEventBase()->loop();
server.getSockets()[0]->getAddress(&address);
TestClient client;
+ client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
client.connect(address);
TestClient client2;
+ client2.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
client2.connect(address);
TestClient client3;
+ client3.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
client3.connect(address);
TestClient client4;
+ client4.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
client4.connect(address);
TestClient client5;
+ client5.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
client5.connect(address);
EventBaseManager::get()->getEventBase()->loop();
return this;
}
ClientBootstrap* connect(SocketAddress address) {
+ DCHECK(pipelineFactory_);
pipeline_.reset(
- newPipeline(
+ pipelineFactory_->newPipeline(
AsyncSocket::newSocket(EventBaseManager::get()->getEventBase(), address)
));
return this;
}
+ ClientBootstrap* pipelineFactory(
+ std::shared_ptr<PipelineFactory<Pipeline>> factory) {
+ pipelineFactory_ = factory;
+ return this;
+ }
+
+ Pipeline* getPipeline() {
+ return pipeline_.get();
+ }
+
virtual ~ClientBootstrap() {}
protected:
int port_;
- virtual Pipeline* newPipeline(std::shared_ptr<AsyncSocket> socket) = 0;
+ std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory_;
};
} // namespace
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/wangle/channel/ChannelHandler.h>
+
+namespace folly { namespace wangle {
+
+/**
+ * Dispatch a request, satisfying Promise `p` with the response;
+ * the returned Future is satisfied when the response is received:
+ * only one request is allowed at a time.
+ */
+template <typename Pipeline, typename Req, typename Resp = Req>
+class SerialClientDispatcher : public ChannelHandlerAdapter<Req, Resp>
+ , public Service<Req, Resp> {
+ public:
+
+ typedef typename ChannelHandlerAdapter<Req, Resp>::Context Context;
+
+ void setPipeline(Pipeline* pipeline) {
+ pipeline_ = pipeline;
+ pipeline->addBack(
+ ChannelHandlerPtr<SerialClientDispatcher<Pipeline, Req, Resp>, false>(
+ this));
+ pipeline->finalize();
+ }
+
+ void read(Context* ctx, Req in) override {
+ DCHECK(p_);
+ p_->setValue(std::move(in));
+ p_ = none;
+ }
+
+ virtual Future<Resp> operator()(Req arg) override {
+ CHECK(!p_);
+ DCHECK(pipeline_);
+
+ p_ = Promise<Resp>();
+ auto f = p_->getFuture();
+ pipeline_->write(std::move(arg));
+ return f;
+ }
+
+ private:
+ Pipeline* pipeline_{nullptr};
+ folly::Optional<Promise<Resp>> p_;
+};
+
+}} // namespace
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/wangle/channel/ChannelHandler.h>
+
+namespace folly { namespace wangle {
+
+/**
+ * Dispatch requests from pipeline one at a time synchronously.
+ * Concurrent requests are queued in the pipeline.
+ */
+template <typename Req, typename Resp = Req>
+class SerialServerDispatcher : public ChannelHandlerAdapter<Req, Resp> {
+ public:
+
+ typedef typename ChannelHandlerAdapter<Req, Resp>::Context Context;
+
+ explicit SerialServerDispatcher(Service<Req, Resp>* service)
+ : service_(service) {}
+
+ void read(Context* ctx, Req in) override {
+ auto resp = (*service_)(std::move(in)).get();
+ ctx->fireWrite(std::move(resp));
+ }
+
+ private:
+
+ Service<Req, Resp>* service_;
+};
+
+}} // namespace
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/futures/Future.h>
+#include <folly/Memory.h>
+
+#include <folly/wangle/bootstrap/ServerBootstrap.h>
+#include <folly/wangle/bootstrap/ClientBootstrap.h>
+#include <folly/wangle/channel/ChannelPipeline.h>
+#include <folly/wangle/channel/AsyncSocketHandler.h>
+
+namespace folly {
+
+/**
+ * A Service is an asynchronous function from Request to
+ * Future<Response>. It is the basic unit of the RPC interface.
+ */
+template <typename Req, typename Resp = Req>
+class Service {
+ public:
+ virtual Future<Resp> operator()(Req request) = 0;
+ virtual ~Service() {}
+};
+
+/**
+ * A Filter acts as a decorator/transformer of a service. It may apply
+ * transformations to the input and output of that service:
+ *
+ * class MyService
+ *
+ * ReqA -> |
+ * | -> ReqB
+ * | <- RespB
+ * RespA <- |
+ *
+ * For example, you may have a service that takes Strings and parses
+ * them as Ints. If you want to expose this as a Network Service via
+ * Thrift, it is nice to isolate the protocol handling from the
+ * business rules. Hence you might have a Filter that converts back
+ * and forth between Thrift structs:
+ *
+ * [ThriftIn -> (String -> Int) -> ThriftOut]
+ */
+template <typename ReqA, typename RespA,
+ typename ReqB = ReqA, typename RespB = RespA>
+class Filter {
+ public:
+ virtual Future<RespA> operator()(
+ Service<ReqB, RespB>* service, ReqA request) = 0;
+ std::unique_ptr<Service<ReqA, RespA>> compose(
+ Service<ReqB, RespB>* service);
+ virtual ~Filter() {}
+};
+
+template <typename ReqA, typename RespA,
+ typename ReqB = ReqA, typename RespB = RespA>
+class ComposedService : public Service<ReqA, RespA> {
+ public:
+ ComposedService(Service<ReqB, RespB>* service,
+ Filter<ReqA, RespA, ReqB, RespB>* filter)
+ : service_(service)
+ , filter_(filter) {}
+ virtual Future<RespA> operator()(ReqA request) override {
+ return (*filter_)(service_, request);
+ }
+
+ ~ComposedService(){}
+ private:
+ Service<ReqB, RespB>* service_;
+ Filter<ReqA, RespA, ReqB, RespB>* filter_;
+};
+
+template <typename ReqA, typename RespA,
+ typename ReqB, typename RespB>
+ std::unique_ptr<Service<ReqA, RespA>>
+ Filter<ReqA, RespA, ReqB, RespB>::compose(Service<ReqB, RespB>* service) {
+ return folly::make_unique<ComposedService<ReqA, RespA, ReqB, RespB>>(
+ service, this);
+}
+
+/**
+ * A factory that creates services, given a client. This lets you
+ * make RPC calls on the Service interface over a client's pipeline.
+ *
+ * Clients can be reused after you are done using the service.
+ */
+template <typename Pipeline, typename Req, typename Resp>
+class ServiceFactory {
+ public:
+ virtual Future<Service<Req, Resp>*> operator()(
+ ClientBootstrap<Pipeline>* client) = 0;
+
+ virtual ~ServiceFactory() = default;
+};
+
+} // namespace
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <gtest/gtest.h>
+
+#include <folly/wangle/service/Service.h>
+#include <folly/wangle/service/ServerDispatcher.h>
+#include <folly/wangle/service/ClientDispatcher.h>
+
+namespace folly {
+
+using namespace wangle;
+
+typedef ChannelPipeline<IOBufQueue&, std::string> Pipeline;
+
+class EchoService : public Service<std::string, std::string> {
+ public:
+ virtual Future<std::string> operator()(std::string req) override {
+ return makeFuture<std::string>(std::move(req));
+ }
+};
+
+class EchoIntService : public Service<std::string, int> {
+ public:
+ virtual Future<int> operator()(std::string req) override {
+ return makeFuture<int>(folly::to<int>(req));
+ }
+};
+
+class StringCodec : public ChannelHandler<IOBufQueue&, std::string,
+ std::string, std::unique_ptr<IOBuf>> {
+ public:
+ typedef typename ChannelHandler<
+ IOBufQueue&, std::string,
+ std::string, std::unique_ptr<IOBuf>>::Context Context;
+
+ void read(Context* ctx, IOBufQueue& q) override {
+ auto buf = q.pop_front();
+ buf->coalesce();
+ std::string data((const char*)buf->data(), buf->length());
+
+ ctx->fireRead(data);
+ }
+
+ Future<void> write(Context* ctx, std::string msg) override {
+ auto buf = IOBuf::copyBuffer(msg.data(), msg.length());
+ return ctx->fireWrite(std::move(buf));
+ }
+};
+
+template <typename Req, typename Resp>
+class ServerPipelineFactory
+ : public PipelineFactory<Pipeline> {
+ public:
+
+ Pipeline* newPipeline(
+ std::shared_ptr<AsyncSocket> socket) override {
+ auto pipeline = new Pipeline();
+ pipeline->addBack(AsyncSocketHandler(socket));
+ pipeline->addBack(StringCodec());
+ pipeline->addBack(SerialServerDispatcher<Req, Resp>(&service_));
+ pipeline->finalize();
+ pipeline->template getHandler<AsyncSocketHandler>(0)->attachReadCallback();
+ return pipeline;
+ }
+
+ private:
+ EchoService service_;
+};
+
+template <typename Req, typename Resp>
+class ClientPipelineFactory : public PipelineFactory<Pipeline> {
+ public:
+
+ Pipeline* newPipeline(
+ std::shared_ptr<AsyncSocket> socket) override {
+ auto pipeline = new Pipeline();
+ pipeline->addBack(AsyncSocketHandler(socket));
+ pipeline->addBack(StringCodec());
+ pipeline->template getHandler<AsyncSocketHandler>(0)->attachReadCallback();
+
+ return pipeline;
+ }
+};
+
+template <typename Pipeline, typename Req, typename Resp>
+class ClientServiceFactory : public ServiceFactory<Pipeline, Req, Resp> {
+ public:
+ class ClientService : public Service<Req, Resp> {
+ public:
+ explicit ClientService(Pipeline* pipeline) {
+ dispatcher_.setPipeline(pipeline);
+ }
+ Future<Resp> operator()(Req request) override {
+ return dispatcher_(std::move(request));
+ }
+ private:
+ SerialClientDispatcher<Pipeline, Req, Resp> dispatcher_;
+ };
+
+ Future<Service<Req, Resp>*> operator()(
+ ClientBootstrap<Pipeline>* client) override {
+ return makeFuture<Service<Req, Resp>*>(
+ new ClientService(client->getPipeline()));
+ }
+};
+
+TEST(Wangle, ClientServerTest) {
+ int port = 1234;
+ // server
+
+ ServerBootstrap<Pipeline> server;
+ server.childPipeline(
+ std::make_shared<ServerPipelineFactory<std::string, std::string>>());
+ server.bind(port);
+
+ // client
+ ClientBootstrap<Pipeline> client;
+ ClientServiceFactory<Pipeline, std::string, std::string> serviceFactory;
+ client.pipelineFactory(
+ std::make_shared<ClientPipelineFactory<std::string, std::string>>());
+ SocketAddress addr("127.0.0.1", port);
+ client.connect(addr);
+ auto service = serviceFactory(&client).value();
+ auto rep = (*service)("test");
+
+ rep.then([&](std::string value) {
+ EXPECT_EQ("test", value);
+ EventBaseManager::get()->getEventBase()->terminateLoopSoon();
+
+ });
+ EventBaseManager::get()->getEventBase()->loopForever();
+ server.stop();
+}
+
+class AppendFilter : public Filter<std::string, std::string> {
+ public:
+ virtual Future<std::string> operator()(
+ Service<std::string, std::string>* service, std::string req) {
+ return (*service)(req + "\n");
+ }
+};
+
+class IntToStringFilter : public Filter<int, int, std::string, std::string> {
+ public:
+ virtual Future<int> operator()(
+ Service<std::string, std::string>* service, int req) {
+ return (*service)(folly::to<std::string>(req)).then([](std::string resp) {
+ return folly::to<int>(resp);
+ });
+ }
+};
+
+TEST(Wangle, FilterTest) {
+ auto service = folly::make_unique<EchoService>();
+ auto filter = folly::make_unique<AppendFilter>();
+ auto result = (*filter)(service.get(), "test");
+ EXPECT_EQ(result.value(), "test\n");
+
+ // Check composition
+ auto composed_service = filter->compose(service.get());
+ auto result2 = (*composed_service)("test");
+ EXPECT_EQ(result2.value(), "test\n");
+}
+
+TEST(Wangle, ComplexFilterTest) {
+ auto service = folly::make_unique<EchoService>();
+ auto filter = folly::make_unique<IntToStringFilter>();
+ auto result = (*filter)(service.get(), 1);
+ EXPECT_EQ(result.value(), 1);
+
+ // Check composition
+ auto composed_service = filter->compose(service.get());
+ auto result2 = (*composed_service)(2);
+ EXPECT_EQ(result2.value(), 2);
+}
+
+class ChangeTypeFilter : public Filter<int, std::string, std::string, int> {
+ public:
+ virtual Future<std::string> operator()(
+ Service<std::string, int>* service, int req) {
+ return (*service)(folly::to<std::string>(req)).then([](int resp) {
+ return folly::to<std::string>(resp);
+ });
+ }
+};
+
+TEST(Wangle, SuperComplexFilterTest) {
+ auto service = folly::make_unique<EchoIntService>();
+ auto filter = folly::make_unique<ChangeTypeFilter>();
+ auto result = (*filter)(service.get(), 1);
+ EXPECT_EQ(result.value(), "1");
+
+ // Check composition
+ auto composed_service = filter->compose(service.get());
+ auto result2 = (*composed_service)(2);
+ EXPECT_EQ(result2.value(), "2");
+}
+
+int main(int argc, char** argv) {
+ testing::InitGoogleTest(&argc, argv);
+ google::InitGoogleLogging(argv[0]);
+ google::ParseCommandLineFlags(&argc, &argv, true);
+
+ return RUN_ALL_TESTS();
+}
+
+} // namespace