downstreamConnectionManager_ = ConnectionManager::makeUnique(
eventBase, accConfig_.connectionIdleTimeout, this);
- serverSocket->addAcceptCallback(this, eventBase);
- // SO_KEEPALIVE is the only setting that is inherited by accepted
- // connections so only apply this setting
- for (const auto& option: socketOptions_) {
- if (option.first.level == SOL_SOCKET &&
- option.first.optname == SO_KEEPALIVE && option.second == 1) {
- serverSocket->setKeepAliveEnabled(true);
- break;
+ if (serverSocket) {
+ serverSocket->addAcceptCallback(this, eventBase);
+ // SO_KEEPALIVE is the only setting that is inherited by accepted
+ // connections so only apply this setting
+ for (const auto& option: socketOptions_) {
+ if (option.first.level == SOL_SOCKET &&
+ option.first.optname == SO_KEEPALIVE && option.second == 1) {
+ serverSocket->setKeepAliveEnabled(true);
+ break;
+ }
}
}
}
/**
* Access the Acceptor's event base.
*/
- EventBase* getEventBase() { return base_; }
+ virtual EventBase* getEventBase() const { return base_; }
/**
* Access the Acceptor's downstream (client-side) ConnectionManager
std::chrono::steady_clock::time_point acceptTime
) noexcept;
+ /**
+ * Drains all open connections of their outstanding transactions. When
+ * a connection's transaction count reaches zero, the connection closes.
+ */
+ void drainAllConnections();
+
protected:
friend class AcceptorHandshakeHelper;
*/
void dropAllConnections();
- /**
- * Drains all open connections of their outstanding transactions. When
- * a connection's transaction count reaches zero, the connection closes.
- */
- void drainAllConnections();
+ protected:
/**
* onConnectionsDrained() will be called once all connections have been
std::shared_ptr<SSLCacheProvider> cacheProvider_;
};
+class AcceptorFactory {
+ public:
+ virtual std::shared_ptr<Acceptor> newAcceptor() = 0;
+ virtual ~AcceptorFactory() = default;
+};
+
} // namespace
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "folly/experimental/wangle/bootstrap/ServerBootstrap.h"
+#include "folly/experimental/wangle/bootstrap/ClientBootstrap.h"
+#include "folly/experimental/wangle/channel/ChannelHandler.h"
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+using namespace folly::wangle;
+using namespace folly;
+
+typedef ChannelHandlerAdapter<IOBuf> BytesPassthrough;
+typedef ChannelPipeline<BytesPassthrough> Pipeline;
+
+class TestServer : public ServerBootstrap<Pipeline> {
+ Pipeline* newPipeline(std::shared_ptr<AsyncSocket>) {
+ return nullptr;
+ }
+};
+
+class TestClient : public ClientBootstrap<Pipeline> {
+ Pipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
+ CHECK(sock->good());
+
+ // We probably aren't connected immedately, check after a small delay
+ EventBaseManager::get()->getEventBase()->runAfterDelay([sock](){
+ CHECK(sock->readable());
+ }, 100);
+ return nullptr;
+ }
+};
+
+class TestPipelineFactory : public PipelineFactory<Pipeline> {
+ public:
+ Pipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
+ pipelines++;
+ return new Pipeline(BytesPassthrough());
+ }
+ std::atomic<int> pipelines{0};
+};
+
+TEST(Bootstrap, Basic) {
+ TestServer server;
+ TestClient client;
+}
+
+TEST(Bootstrap, ServerWithPipeline) {
+ TestServer server;
+ server.childPipeline(std::make_shared<TestPipelineFactory>());
+ server.bind(0);
+ auto base = EventBaseManager::get()->getEventBase();
+ base->runAfterDelay([&](){
+ server.stop();
+ }, 500);
+ base->loop();
+}
+
+TEST(Bootstrap, ClientServerTest) {
+ TestServer server;
+ auto factory = std::make_shared<TestPipelineFactory>();
+ server.childPipeline(factory);
+ server.bind(0);
+ auto base = EventBaseManager::get()->getEventBase();
+
+ SocketAddress address;
+ server.getSockets()[0]->getAddress(&address);
+
+ TestClient client;
+ client.connect(address);
+ base->runAfterDelay([&](){
+ server.stop();
+ }, 500);
+ base->loop();
+
+ CHECK(factory->pipelines == 1);
+}
+
+TEST(Bootstrap, ClientConnectionManagerTest) {
+ // Create a single IO thread, and verify that
+ // client connections are pooled properly
+
+ TestServer server;
+ auto factory = std::make_shared<TestPipelineFactory>();
+ server.childPipeline(factory);
+ server.group(std::make_shared<IOThreadPoolExecutor>(1));
+ server.bind(0);
+ auto base = EventBaseManager::get()->getEventBase();
+
+ SocketAddress address;
+ server.getSockets()[0]->getAddress(&address);
+
+ TestClient client;
+ client.connect(address);
+
+ TestClient client2;
+ client2.connect(address);
+
+ base->runAfterDelay([&](){
+ server.stop();
+ }, 500);
+
+ base->loop();
+
+ CHECK(factory->pipelines == 2);
+}
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/experimental/wangle/channel/ChannelPipeline.h>
+
+namespace folly {
+
+/*
+ * A thin wrapper around ChannelPipeline and AsyncSocket to match
+ * ServerBootstrap. On connect() a new pipeline is created.
+ */
+template <typename Pipeline>
+class ClientBootstrap {
+ public:
+ ClientBootstrap() {
+ }
+ ClientBootstrap* bind(int port) {
+ port_ = port;
+ return this;
+ }
+ ClientBootstrap* connect(SocketAddress address) {
+ pipeline_.reset(
+ newPipeline(
+ AsyncSocket::newSocket(EventBaseManager::get()->getEventBase(), address)
+ ));
+ return this;
+ }
+
+ virtual ~ClientBootstrap() {}
+
+ protected:
+ std::unique_ptr<Pipeline,
+ folly::DelayedDestruction::Destructor> pipeline_;
+
+ int port_;
+
+ virtual Pipeline* newPipeline(std::shared_ptr<AsyncSocket> socket) = 0;
+};
+
+} // namespace
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/experimental/wangle/acceptor/Acceptor.h>
+#include <folly/io/async/EventBaseManager.h>
+#include <folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h>
+#include <folly/experimental/wangle/ManagedConnection.h>
+#include <folly/experimental/wangle/channel/ChannelPipeline.h>
+
+namespace folly {
+
+template <typename Pipeline>
+class ServerAcceptor : public Acceptor {
+ typedef std::unique_ptr<Pipeline,
+ folly::DelayedDestruction::Destructor> PipelinePtr;
+
+ class ServerConnection : public wangle::ManagedConnection {
+ public:
+ explicit ServerConnection(PipelinePtr pipeline)
+ : pipeline_(std::move(pipeline)) {}
+
+ ~ServerConnection() {
+ }
+
+ void timeoutExpired() noexcept {
+ }
+
+ void describe(std::ostream& os) const {}
+ bool isBusy() const {
+ return false;
+ }
+ void notifyPendingShutdown() {}
+ void closeWhenIdle() {}
+ void dropConnection() {}
+ void dumpConnectionState(uint8_t loglevel) {}
+ private:
+ PipelinePtr pipeline_;
+ };
+
+ public:
+ explicit ServerAcceptor(
+ std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory)
+ : Acceptor(ServerSocketConfig())
+ , pipelineFactory_(pipelineFactory) {
+ Acceptor::init(nullptr, &base_);
+ }
+
+ /* See Acceptor::onNewConnection for details */
+ void onNewConnection(
+ AsyncSocket::UniquePtr transport, const SocketAddress* address,
+ const std::string& nextProtocolName, const TransportInfo& tinfo) {
+
+ std::unique_ptr<Pipeline,
+ folly::DelayedDestruction::Destructor>
+ pipeline(pipelineFactory_->newPipeline(
+ std::shared_ptr<AsyncSocket>(
+ transport.release(),
+ folly::DelayedDestruction::Destructor())));
+ auto connection = new ServerConnection(std::move(pipeline));
+ Acceptor::addConnection(connection);
+ }
+
+ ~ServerAcceptor() {
+ Acceptor::dropAllConnections();
+ }
+
+ private:
+ EventBase base_;
+
+ std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory_;
+};
+
+template <typename Pipeline>
+class ServerAcceptorFactory : public AcceptorFactory {
+ public:
+ explicit ServerAcceptorFactory(
+ std::shared_ptr<PipelineFactory<Pipeline>> factory)
+ : factory_(factory) {}
+
+ std::shared_ptr<Acceptor> newAcceptor() {
+ return std::make_shared<ServerAcceptor<Pipeline>>(factory_);
+ }
+ private:
+ std::shared_ptr<PipelineFactory<Pipeline>> factory_;
+};
+
+class ServerWorkerFactory : public folly::wangle::ThreadFactory {
+ public:
+ explicit ServerWorkerFactory(std::shared_ptr<AcceptorFactory> acceptorFactory)
+ : internalFactory_(
+ std::make_shared<folly::wangle::NamedThreadFactory>("BootstrapWorker"))
+ , acceptorFactory_(acceptorFactory)
+ {}
+ virtual std::thread newThread(folly::wangle::Func&& func) override;
+
+ void setInternalFactory(
+ std::shared_ptr<folly::wangle::NamedThreadFactory> internalFactory);
+ void setNamePrefix(folly::StringPiece prefix);
+
+ template <typename F>
+ void forEachWorker(F&& f);
+
+ private:
+ std::shared_ptr<folly::wangle::NamedThreadFactory> internalFactory_;
+ folly::RWSpinLock workersLock_;
+ std::map<int32_t, std::shared_ptr<Acceptor>> workers_;
+ int32_t nextWorkerId_{0};
+
+ std::shared_ptr<AcceptorFactory> acceptorFactory_;
+};
+
+template <typename F>
+void ServerWorkerFactory::forEachWorker(F&& f) {
+ folly::RWSpinLock::ReadHolder guard(workersLock_);
+ for (const auto& kv : workers_) {
+ f(kv.second.get());
+ }
+}
+
+} // namespace
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/experimental/wangle/bootstrap/ServerBootstrap.h>
+#include <folly/experimental/wangle/concurrent/NamedThreadFactory.h>
+#include <folly/io/async/EventBaseManager.h>
+
+namespace folly {
+
+std::thread ServerWorkerFactory::newThread(
+ folly::wangle::Func&& func) {
+ return internalFactory_->newThread([=](){
+ auto id = nextWorkerId_++;
+ auto worker = acceptorFactory_->newAcceptor();
+ {
+ folly::RWSpinLock::WriteHolder guard(workersLock_);
+ workers_.insert({id, worker});
+ }
+ EventBaseManager::get()->setEventBase(worker->getEventBase(), false);
+ func();
+ EventBaseManager::get()->clearEventBase();
+
+ worker->drainAllConnections();
+ {
+ folly::RWSpinLock::WriteHolder guard(workersLock_);
+ workers_.erase(id);
+ }
+ });
+}
+
+void ServerWorkerFactory::setInternalFactory(
+ std::shared_ptr<wangle::NamedThreadFactory> internalFactory) {
+ CHECK(workers_.empty());
+ internalFactory_ = internalFactory;
+}
+
+void ServerWorkerFactory::setNamePrefix(folly::StringPiece prefix) {
+ CHECK(workers_.empty());
+ internalFactory_->setNamePrefix(prefix);
+}
+
+} // namespace
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/experimental/wangle/bootstrap/ServerBootstrap-inl.h>
+
+namespace folly {
+
+/*
+ * ServerBootstrap is a parent class intended to set up a
+ * high-performance TCP accepting server. It will manage a pool of
+ * accepting threads, any number of accepting sockets, a pool of
+ * IO-worker threads, and connection pool for each IO thread for you.
+ *
+ * The output is given as a ChannelPipeline template: given a
+ * PipelineFactory, it will create a new pipeline for each connection,
+ * and your server can handle the incoming bytes.
+ *
+ * BACKWARDS COMPATIBLITY: for servers already taking a pool of
+ * Acceptor objects, an AcceptorFactory can be given directly instead
+ * of a pipeline factory.
+ */
+template <typename Pipeline>
+class ServerBootstrap {
+ public:
+ /* TODO(davejwatson)
+ *
+ * If there is any work to be done BEFORE handing the work to IO
+ * threads, this handler is where the pipeline to do it would be
+ * set.
+ *
+ * This could be used for things like logging, load balancing, or
+ * advanced load balancing on IO threads. Netty also provides this.
+ */
+ ServerBootstrap* handler() {
+ return this;
+ }
+
+ /*
+ * BACKWARDS COMPATIBILITY - an acceptor factory can be set. Your
+ * Acceptor is responsible for managing the connection pool.
+ *
+ * @param childHandler - acceptor factory to call for each IO thread
+ */
+ ServerBootstrap* childHandler(std::shared_ptr<AcceptorFactory> childHandler) {
+ acceptorFactory_ = childHandler;
+ return this;
+ }
+
+ /*
+ * Set a pipeline factory that will be called for each new connection
+ *
+ * @param factory pipeline factory to use for each new connection
+ */
+ ServerBootstrap* childPipeline(
+ std::shared_ptr<PipelineFactory<Pipeline>> factory) {
+ pipelineFactory_ = factory;
+ return this;
+ }
+
+ /*
+ * Set the IO executor. If not set, a default one will be created
+ * with one thread per core.
+ *
+ * @param io_group - io executor to use for IO threads.
+ */
+ ServerBootstrap* group(
+ std::shared_ptr<folly::wangle::IOThreadPoolExecutor> io_group) {
+ return group(nullptr, io_group);
+ }
+
+ /*
+ * Set the acceptor executor, and IO executor.
+ *
+ * If no acceptor executor is set, a single thread will be created for accepts
+ * If no IO executor is set, a default of one thread per core will be created
+ *
+ * @param group - acceptor executor to use for acceptor threads.
+ * @param io_group - io executor to use for IO threads.
+ */
+ ServerBootstrap* group(
+ std::shared_ptr<folly::wangle::IOThreadPoolExecutor> accept_group,
+ std::shared_ptr<wangle::IOThreadPoolExecutor> io_group) {
+ if (!accept_group) {
+ accept_group = std::make_shared<folly::wangle::IOThreadPoolExecutor>(
+ 1, std::make_shared<wangle::NamedThreadFactory>("Acceptor Thread"));
+ }
+ if (!io_group) {
+ io_group = std::make_shared<folly::wangle::IOThreadPoolExecutor>(
+ 32, std::make_shared<wangle::NamedThreadFactory>("IO Thread"));
+ }
+ auto factory = io_group->getThreadFactory();
+
+ //CHECK(factory == nullptr); // TODO
+
+ CHECK(acceptorFactory_ || pipelineFactory_);
+
+ if (acceptorFactory_) {
+ workerFactory_ = std::make_shared<ServerWorkerFactory>(
+ acceptorFactory_);
+ } else {
+ workerFactory_ = std::make_shared<ServerWorkerFactory>(
+ std::make_shared<ServerAcceptorFactory<Pipeline>>(pipelineFactory_));
+ }
+
+ acceptor_group_ = accept_group;
+ io_group_ = io_group;
+
+ auto numThreads = io_group_->numThreads();
+ io_group_->setNumThreads(0);
+ io_group_->setThreadFactory(workerFactory_);
+ io_group_->setNumThreads(numThreads);
+
+ return this;
+ }
+
+ /*
+ * Bind to a port and start listening.
+ * One of childPipeline or childHandler must be called before bind
+ *
+ * @param port Port to listen on
+ */
+ void bind(int port) {
+ // TODO bind to v4 and v6
+ // TODO take existing socket
+ // TODO use the acceptor thread
+ auto socket = folly::AsyncServerSocket::newSocket(
+ EventBaseManager::get()->getEventBase());
+ sockets_.push_back(socket);
+ socket->bind(port);
+
+ // TODO Take ServerSocketConfig
+ socket->listen(1024);
+
+ if (!workerFactory_) {
+ group(nullptr);
+ }
+
+ // Startup all the threads
+ workerFactory_->forEachWorker([this, socket](Acceptor* worker){
+ socket->getEventBase()->runInEventBaseThread([this, worker, socket](){
+ socket->addAcceptCallback(worker, worker->getEventBase());
+ });
+ });
+ socket->startAccepting();
+ }
+
+ /*
+ * Stop listening on all sockets.
+ */
+ void stop() {
+ for (auto& socket : sockets_) {
+ socket->stopAccepting();
+ }
+ acceptor_group_->join();
+ io_group_->join();
+ }
+
+ /*
+ * Get the list of listening sockets
+ */
+ std::vector<std::shared_ptr<folly::AsyncServerSocket>>&
+ getSockets() {
+ return sockets_;
+ }
+
+ private:
+ std::shared_ptr<wangle::IOThreadPoolExecutor> acceptor_group_;
+ std::shared_ptr<wangle::IOThreadPoolExecutor> io_group_;
+
+ std::shared_ptr<ServerWorkerFactory> workerFactory_;
+ std::vector<std::shared_ptr<folly::AsyncServerSocket>> sockets_;
+
+ std::shared_ptr<AcceptorFactory> acceptorFactory_;
+ std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory_;
+};
+
+} // namespace
};
}}
+
+namespace folly {
+
+class AsyncSocket;
+
+template <typename Pipeline>
+class PipelineFactory {
+ public:
+ virtual Pipeline* newPipeline(std::shared_ptr<AsyncSocket>) = 0;
+ virtual ~PipelineFactory() {}
+};
+
+}
threadFactory_ = std::move(threadFactory);
}
+ std::shared_ptr<ThreadFactory> getThreadFactory(void) {
+ return threadFactory_;
+ }
+
size_t numThreads();
void setNumThreads(size_t numThreads);
void stop();