From 4abb466901ce3fa3cc0b1f446f8f7a1bb67e715f Mon Sep 17 00:00:00 2001 From: Dave Watson Date: Fri, 20 Mar 2015 09:54:08 -0700 Subject: [PATCH] Udp Acceptor Summary: major changes: 1) ServerSocketFactory and AsyncSocketBase to abstract the differences between UDP and TCP async socket. Could possibly push some of this to the sockets themselves eventually 2) pipeline() is a pipeline between accept/receive of a UDP message, and before sending it to workers. Default impl for TCP is to fan out to worker threads. This is the same as Netty. Since we don't know if the data is a TCP socket or a UDP message, it's a void*, which sucks (netty uses Object msg, so it isn't any different). Test Plan: Added lots of new tests. Doesn't test any data passing yet though, just connects/simple receipt of UDP message. Reviewed By: hans@fb.com Subscribers: alandau, bmatheny, mshneer, jsedgwick, yfeldblum, trunkagent, doug, fugalh, folly-diffs@ FB internal diff: D1736670 Tasks: 5788116 Signature: t1:1736670:1424372992:e109450604ed905004bd40dfbb508b5808332c15 --- folly/Makefile.am | 2 + folly/io/async/AsyncServerSocket.h | 4 +- folly/io/async/AsyncSocketBase.h | 30 +++++ folly/io/async/AsyncTransport.h | 16 +-- folly/io/async/AsyncUDPServerSocket.h | 11 +- folly/io/async/AsyncUDPSocket.h | 1 + folly/wangle/acceptor/Acceptor.h | 8 +- folly/wangle/bootstrap/BootstrapTest.cpp | 132 +++++++++++++++++++ folly/wangle/bootstrap/ServerBootstrap-inl.h | 95 ++++++++++--- folly/wangle/bootstrap/ServerBootstrap.cpp | 26 ++-- folly/wangle/bootstrap/ServerBootstrap.h | 110 +++++++++------- folly/wangle/bootstrap/ServerSocketFactory.h | 118 +++++++++++++++++ 12 files changed, 459 insertions(+), 94 deletions(-) create mode 100644 folly/io/async/AsyncSocketBase.h create mode 100644 folly/wangle/bootstrap/ServerSocketFactory.h diff --git a/folly/Makefile.am b/folly/Makefile.am index 72272940..a5aec762 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -148,6 +148,7 @@ nobase_follyinclude_HEADERS = \ io/async/AsyncUDPSocket.h \ io/async/AsyncServerSocket.h \ io/async/AsyncSocket.h \ + io/async/AsyncSocketBase.h \ io/async/AsyncSSLSocket.h \ io/async/AsyncSocketException.h \ io/async/DelayedDestruction.h \ @@ -234,6 +235,7 @@ nobase_follyinclude_HEADERS = \ wangle/acceptor/TransportInfo.h \ wangle/bootstrap/ServerBootstrap.h \ wangle/bootstrap/ServerBootstrap-inl.h \ + wangle/bootstrap/ServerSocketFactory.h \ wangle/bootstrap/ClientBootstrap.h \ wangle/channel/AsyncSocketHandler.h \ wangle/channel/ChannelHandler.h \ diff --git a/folly/io/async/AsyncServerSocket.h b/folly/io/async/AsyncServerSocket.h index 46576aa2..ff9562fe 100644 --- a/folly/io/async/AsyncServerSocket.h +++ b/folly/io/async/AsyncServerSocket.h @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -56,7 +57,8 @@ namespace folly { * modify the AsyncServerSocket state may only be performed from the primary * EventBase thread. */ -class AsyncServerSocket : public DelayedDestruction { +class AsyncServerSocket : public DelayedDestruction + , public AsyncSocketBase { public: typedef std::unique_ptr UniquePtr; // Disallow copy, move, and default construction. diff --git a/folly/io/async/AsyncSocketBase.h b/folly/io/async/AsyncSocketBase.h new file mode 100644 index 00000000..35e8a152 --- /dev/null +++ b/folly/io/async/AsyncSocketBase.h @@ -0,0 +1,30 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +namespace folly { + +class AsyncSocketBase { + public: + virtual EventBase* getEventBase() const = 0; + virtual ~AsyncSocketBase() = default; + virtual void getAddress(SocketAddress*) const = 0; +}; + +} // namespace diff --git a/folly/io/async/AsyncTransport.h b/folly/io/async/AsyncTransport.h index 95fdd9dd..1cf4d209 100644 --- a/folly/io/async/AsyncTransport.h +++ b/folly/io/async/AsyncTransport.h @@ -20,6 +20,8 @@ #include #include +#include +#include namespace folly { @@ -111,7 +113,7 @@ inline bool isSet(WriteFlags a, WriteFlags b) { * timeout, since most callers want to give up if the remote end stops * responding and no further progress can be made sending the data. */ -class AsyncTransport : public DelayedDestruction { +class AsyncTransport : public DelayedDestruction, public AsyncSocketBase { public: typedef std::unique_ptr UniquePtr; @@ -256,14 +258,6 @@ class AsyncTransport : public DelayedDestruction { */ virtual bool isDetachable() const = 0; - /** - * Get the EventBase used by this transport. - * - * Returns nullptr if this transport is not currently attached to a - * EventBase. - */ - virtual EventBase* getEventBase() const = 0; - /** * Set the send timeout. * @@ -296,6 +290,10 @@ class AsyncTransport : public DelayedDestruction { */ virtual void getLocalAddress(SocketAddress* address) const = 0; + virtual void getAddress(SocketAddress* address) const { + getLocalAddress(address); + } + /** * Get the address of the remote endpoint to which this transport is * connected. diff --git a/folly/io/async/AsyncUDPServerSocket.h b/folly/io/async/AsyncUDPServerSocket.h index 9feb6d41..e424e83d 100644 --- a/folly/io/async/AsyncUDPServerSocket.h +++ b/folly/io/async/AsyncUDPServerSocket.h @@ -36,7 +36,8 @@ namespace folly { * more than 1 packet will not work because they will end up with * different event base to process. */ -class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback { +class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback + , public AsyncSocketBase { public: class Callback { public: @@ -93,6 +94,10 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback { return socket_->address(); } + void getAddress(SocketAddress* a) const { + *a = address(); + } + /** * Add a listener to the round robin list */ @@ -124,6 +129,10 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback { socket_.reset(); } + EventBase* getEventBase() const { + return evb_; + } + private: // AsyncUDPSocket::ReadCallback void getReadBuffer(void** buf, size_t* len) noexcept { diff --git a/folly/io/async/AsyncUDPSocket.h b/folly/io/async/AsyncUDPSocket.h index 0341b00f..a1bca318 100644 --- a/folly/io/async/AsyncUDPSocket.h +++ b/folly/io/async/AsyncUDPSocket.h @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include diff --git a/folly/wangle/acceptor/Acceptor.h b/folly/wangle/acceptor/Acceptor.h index 7d7269c5..c82f97a6 100644 --- a/folly/wangle/acceptor/Acceptor.h +++ b/folly/wangle/acceptor/Acceptor.h @@ -20,6 +20,7 @@ #include #include #include +#include namespace folly { namespace wangle { class ManagedConnection; @@ -46,7 +47,8 @@ class SSLContextManager; */ class Acceptor : public folly::AsyncServerSocket::AcceptCallback, - public folly::wangle::ConnectionManager::Callback { + public folly::wangle::ConnectionManager::Callback, + public AsyncUDPServerSocket::Callback { public: enum class State : uint32_t { @@ -229,6 +231,10 @@ class Acceptor : const std::string& nextProtocolName, const TransportInfo& tinfo) = 0; + void onListenStarted() noexcept {} + void onListenStopped() noexcept {} + void onDataAvailable(const SocketAddress&, std::unique_ptr, bool) noexcept {} + virtual AsyncSocket::UniquePtr makeNewAsyncSocket(EventBase* base, int fd) { return AsyncSocket::UniquePtr(new AsyncSocket(base, fd)); } diff --git a/folly/wangle/bootstrap/BootstrapTest.cpp b/folly/wangle/bootstrap/BootstrapTest.cpp index 4ba1880d..d0ee7e2a 100644 --- a/folly/wangle/bootstrap/BootstrapTest.cpp +++ b/folly/wangle/bootstrap/BootstrapTest.cpp @@ -52,6 +52,27 @@ class TestPipelineFactory : public PipelineFactory { std::atomic pipelines{0}; }; +class TestAcceptor : public Acceptor { +EventBase base_; + public: + TestAcceptor() : Acceptor(ServerSocketConfig()) { + Acceptor::init(nullptr, &base_); + } + void onNewConnection( + AsyncSocket::UniquePtr sock, + const folly::SocketAddress* address, + const std::string& nextProtocolName, + const TransportInfo& tinfo) { + } +}; + +class TestAcceptorFactory : public AcceptorFactory { + public: + std::shared_ptr newAcceptor(EventBase* base) { + return std::make_shared(); + } +}; + TEST(Bootstrap, Basic) { TestServer server; TestClient client; @@ -64,6 +85,13 @@ TEST(Bootstrap, ServerWithPipeline) { server.stop(); } +TEST(Bootstrap, ServerWithChildHandler) { + TestServer server; + server.childHandler(std::make_shared()); + server.bind(0); + server.stop(); +} + TEST(Bootstrap, ClientServerTest) { TestServer server; auto factory = std::make_shared(); @@ -236,3 +264,107 @@ TEST(Bootstrap, ExistingSocket) { folly::AsyncServerSocket::UniquePtr socket(new AsyncServerSocket); server.bind(std::move(socket)); } + +std::atomic connections{0}; + +class TestHandlerPipeline + : public ChannelHandlerAdapter { + public: + void read(Context* ctx, void* conn) { + connections++; + return ctx->fireRead(conn); + } + + Future write(Context* ctx, std::exception e) { + return ctx->fireWrite(e); + } +}; + +template +class TestHandlerPipelineFactory + : public PipelineFactory::AcceptPipeline> { + public: + ServerBootstrap::AcceptPipeline* newPipeline(std::shared_ptr) { + auto pipeline = new ServerBootstrap::AcceptPipeline; + auto handler = std::make_shared(); + pipeline->addBack(ChannelHandlerPtr(handler)); + return pipeline; + } +}; + +TEST(Bootstrap, LoadBalanceHandler) { + TestServer server; + auto factory = std::make_shared(); + server.childPipeline(factory); + + auto pipelinefactory = + std::make_shared>(); + server.pipeline(pipelinefactory); + server.bind(0); + auto base = EventBaseManager::get()->getEventBase(); + + SocketAddress address; + server.getSockets()[0]->getAddress(&address); + + TestClient client; + client.pipelineFactory(std::make_shared()); + client.connect(address); + base->loop(); + server.stop(); + + CHECK(factory->pipelines == 1); + CHECK(connections == 1); +} + +class TestUDPPipeline + : public ChannelHandlerAdapter { + public: + void read(Context* ctx, void* conn) { + connections++; + } + + Future write(Context* ctx, std::exception e) { + return ctx->fireWrite(e); + } +}; + +TEST(Bootstrap, UDP) { + TestServer server; + auto factory = std::make_shared(); + auto pipelinefactory = + std::make_shared>(); + server.pipeline(pipelinefactory); + server.channelFactory(std::make_shared()); + server.bind(0); +} + +TEST(Bootstrap, UDPClientServerTest) { + connections = 0; + + TestServer server; + auto factory = std::make_shared(); + auto pipelinefactory = + std::make_shared>(); + server.pipeline(pipelinefactory); + server.channelFactory(std::make_shared()); + server.bind(0); + + auto base = EventBaseManager::get()->getEventBase(); + + SocketAddress address; + server.getSockets()[0]->getAddress(&address); + + SocketAddress localhost("::1", 0); + AsyncUDPSocket client(base); + client.bind(localhost); + auto data = IOBuf::create(1); + data->append(1); + *(data->writableData()) = 'a'; + client.write(address, std::move(data)); + base->loop(); + server.stop(); + + CHECK(connections == 1); +} diff --git a/folly/wangle/bootstrap/ServerBootstrap-inl.h b/folly/wangle/bootstrap/ServerBootstrap-inl.h index ac18f314..a6ffd268 100644 --- a/folly/wangle/bootstrap/ServerBootstrap-inl.h +++ b/folly/wangle/bootstrap/ServerBootstrap-inl.h @@ -16,15 +16,19 @@ #pragma once #include +#include #include #include #include #include +#include namespace folly { template -class ServerAcceptor : public Acceptor { +class ServerAcceptor + : public Acceptor + , public folly::wangle::ChannelHandlerAdapter { typedef std::unique_ptr PipelinePtr; @@ -55,21 +59,26 @@ class ServerAcceptor : public Acceptor { public: explicit ServerAcceptor( - std::shared_ptr> pipelineFactory, - EventBase* base) + std::shared_ptr> pipelineFactory, + std::shared_ptr> acceptorPipeline, + EventBase* base) : Acceptor(ServerSocketConfig()) - , pipelineFactory_(pipelineFactory) { - Acceptor::init(nullptr, base); + , base_(base) + , childPipelineFactory_(pipelineFactory) + , acceptorPipeline_(acceptorPipeline) { + Acceptor::init(nullptr, base_); + CHECK(acceptorPipeline_); + + acceptorPipeline_->addBack(folly::wangle::ChannelHandlerPtr(this)); + acceptorPipeline_->finalize(); } - /* See Acceptor::onNewConnection for details */ - void onNewConnection( - AsyncSocket::UniquePtr transport, const SocketAddress* address, - const std::string& nextProtocolName, const TransportInfo& tinfo) { - + void read(Context* ctx, void* conn) { + AsyncSocket::UniquePtr transport((AsyncSocket*)conn); std::unique_ptr - pipeline(pipelineFactory_->newPipeline( + pipeline(childPipelineFactory_->newPipeline( std::shared_ptr( transport.release(), folly::DelayedDestruction::Destructor()))); @@ -77,22 +86,53 @@ class ServerAcceptor : public Acceptor { Acceptor::addConnection(connection); } + folly::Future write(Context* ctx, std::exception e) { + return ctx->fireWrite(e); + } + + /* See Acceptor::onNewConnection for details */ + void onNewConnection( + AsyncSocket::UniquePtr transport, const SocketAddress* address, + const std::string& nextProtocolName, const TransportInfo& tinfo) { + acceptorPipeline_->read(transport.release()); + } + + // UDP thunk + void onDataAvailable(const folly::SocketAddress& addr, + std::unique_ptr buf, + bool truncated) noexcept { + acceptorPipeline_->read(buf.release()); + } + private: - std::shared_ptr> pipelineFactory_; + EventBase* base_; + + std::shared_ptr> childPipelineFactory_; + std::shared_ptr> acceptorPipeline_; }; template class ServerAcceptorFactory : public AcceptorFactory { public: explicit ServerAcceptorFactory( - std::shared_ptr> factory) - : factory_(factory) {} - - std::shared_ptr newAcceptor(folly::EventBase* base) { - return std::make_shared>(factory_, base); + std::shared_ptr> factory, + std::shared_ptr>> pipeline) + : factory_(factory) + , pipeline_(pipeline) {} + + std::shared_ptr newAcceptor(EventBase* base) { + std::shared_ptr> pipeline( + pipeline_->newPipeline(nullptr)); + return std::make_shared>(factory_, pipeline, base); } private: std::shared_ptr> factory_; + std::shared_ptr>> pipeline_; }; class ServerWorkerPool : public folly::wangle::ThreadPoolExecutor::Observer { @@ -100,10 +140,12 @@ class ServerWorkerPool : public folly::wangle::ThreadPoolExecutor::Observer { explicit ServerWorkerPool( std::shared_ptr acceptorFactory, folly::wangle::IOThreadPoolExecutor* exec, - std::vector>* sockets) + std::vector>* sockets, + std::shared_ptr socketFactory) : acceptorFactory_(acceptorFactory) , exec_(exec) - , sockets_(sockets) { + , sockets_(sockets) + , socketFactory_(socketFactory) { CHECK(exec); } @@ -128,7 +170,8 @@ class ServerWorkerPool : public folly::wangle::ThreadPoolExecutor::Observer { std::shared_ptr> workers_; std::shared_ptr acceptorFactory_; folly::wangle::IOThreadPoolExecutor* exec_{nullptr}; - std::vector>* sockets_; + std::vector>* sockets_; + std::shared_ptr socketFactory_; }; template @@ -138,4 +181,16 @@ void ServerWorkerPool::forEachWorker(F&& f) const { } } +class DefaultAcceptPipelineFactory + : public PipelineFactory> { + typedef wangle::ChannelPipeline< + void*, + std::exception> AcceptPipeline; + + public: + AcceptPipeline* newPipeline(std::shared_ptr) { + return new AcceptPipeline; + } +}; + } // namespace diff --git a/folly/wangle/bootstrap/ServerBootstrap.cpp b/folly/wangle/bootstrap/ServerBootstrap.cpp index be2add8d..cd7a88eb 100644 --- a/folly/wangle/bootstrap/ServerBootstrap.cpp +++ b/folly/wangle/bootstrap/ServerBootstrap.cpp @@ -15,6 +15,7 @@ */ #include #include +#include #include namespace folly { @@ -25,8 +26,9 @@ void ServerWorkerPool::threadStarted( workers_.insert({h, worker}); for(auto socket : *sockets_) { - socket->getEventBase()->runInEventBaseThread([this, worker, socket](){ - socket->addAcceptCallback(worker.get(), worker->getEventBase()); + socket->getEventBase()->runInEventBaseThreadAndWait([this, worker, socket](){ + socketFactory_->addAcceptCB( + socket, worker.get(), worker->getEventBase()); }); } } @@ -38,22 +40,22 @@ void ServerWorkerPool::threadStopped( for (auto& socket : *sockets_) { folly::Baton<> barrier; - socket->getEventBase()->runInEventBaseThread([&]() { - socket->removeAcceptCallback(worker->second.get(), nullptr); + socket->getEventBase()->runInEventBaseThreadAndWait([&]() { + socketFactory_->removeAcceptCB( + socket, worker->second.get(), nullptr); barrier.post(); }); barrier.wait(); } - CHECK(worker->second->getEventBase() != nullptr); - CHECK(!worker->second->getEventBase()->isInEventBaseThread()); - folly::Baton<> barrier; - worker->second->getEventBase()->runInEventBaseThread([&]() { - worker->second->dropAllConnections(); - barrier.post(); - }); + if (!worker->second->getEventBase()->isInEventBaseThread()) { + worker->second->getEventBase()->runInEventBaseThreadAndWait([=]() { + worker->second->dropAllConnections(); + }); + } else { + worker->second->dropAllConnections(); + } - barrier.wait(); workers_.erase(worker); } diff --git a/folly/wangle/bootstrap/ServerBootstrap.h b/folly/wangle/bootstrap/ServerBootstrap.h index 5a65186d..82465988 100644 --- a/folly/wangle/bootstrap/ServerBootstrap.h +++ b/folly/wangle/bootstrap/ServerBootstrap.h @@ -17,6 +17,7 @@ #include #include +#include namespace folly { @@ -44,16 +45,24 @@ class ServerBootstrap { ~ServerBootstrap() { stop(); } - /* TODO(davejwatson) - * - * If there is any work to be done BEFORE handing the work to IO - * threads, this handler is where the pipeline to do it would be - * set. - * - * This could be used for things like logging, load balancing, or - * advanced load balancing on IO threads. Netty also provides this. + + typedef wangle::ChannelPipeline< + void*, + std::exception> AcceptPipeline; + /* + * Pipeline used to add connections to event bases. + * This is used for UDP or for load balancing + * TCP connections to IO threads explicitly */ - ServerBootstrap* handler() { + ServerBootstrap* pipeline( + std::shared_ptr> factory) { + pipeline_ = factory; + return this; + } + + ServerBootstrap* channelFactory( + std::shared_ptr factory) { + socketFactory_ = factory; return this; } @@ -75,7 +84,7 @@ class ServerBootstrap { */ ServerBootstrap* childPipeline( std::shared_ptr> factory) { - pipelineFactory_ = factory; + childPipelineFactory_ = factory; return this; } @@ -111,15 +120,19 @@ class ServerBootstrap { 32, std::make_shared("IO Thread")); } - CHECK(acceptorFactory_ || pipelineFactory_); + // TODO better config checking + // CHECK(acceptorFactory_ || childPipelineFactory_); + CHECK(!(acceptorFactory_ && childPipelineFactory_)); if (acceptorFactory_) { workerFactory_ = std::make_shared( - acceptorFactory_, io_group.get(), &sockets_); + acceptorFactory_, io_group.get(), &sockets_, socketFactory_); } else { workerFactory_ = std::make_shared( - std::make_shared>(pipelineFactory_), - io_group.get(), &sockets_); + std::make_shared>( + childPipelineFactory_, + pipeline_), + io_group.get(), &sockets_, socketFactory_); } io_group->addObserver(workerFactory_); @@ -143,13 +156,14 @@ class ServerBootstrap { // Since only a single socket is given, // we can only accept on a single thread CHECK(acceptor_group_->numThreads() == 1); + std::shared_ptr socket( s.release(), DelayedDestruction::Destructor()); folly::Baton<> barrier; acceptor_group_->add([&](){ socket->attachEventBase(EventBaseManager::get()->getEventBase()); - socket->listen(1024); + socket->listen(socketConfig.acceptBacklog); socket->startAccepting(); barrier.post(); }); @@ -157,8 +171,9 @@ class ServerBootstrap { // Startup all the threads workerFactory_->forEachWorker([this, socket](Acceptor* worker){ - socket->getEventBase()->runInEventBaseThread([this, worker, socket](){ - socket->addAcceptCallback(worker, worker->getEventBase()); + socket->getEventBase()->runInEventBaseThreadAndWait( + [this, worker, socket](){ + socketFactory_->addAcceptCB(socket, worker, worker->getEventBase()); }); }); @@ -192,31 +207,16 @@ class ServerBootstrap { } std::mutex sock_lock; - std::vector> new_sockets; + std::vector> new_sockets; + std::exception_ptr exn; auto startupFunc = [&](std::shared_ptr> barrier){ - auto socket = folly::AsyncServerSocket::newSocket(); - socket->setReusePortEnabled(reusePort); - socket->attachEventBase(EventBaseManager::get()->getEventBase()); - - try { - if (port >= 0) { - socket->bind(port); - } else { - socket->bind(address); - port = address.getPort(); - } - - socket->listen(socketConfig.acceptBacklog); - socket->startAccepting(); - } catch (...) { - exn = std::current_exception(); - barrier->post(); - - return; - } + + try { + auto socket = socketFactory_->newSocket( + port, address, socketConfig.acceptBacklog, reusePort, socketConfig); sock_lock.lock(); new_sockets.push_back(socket); @@ -228,6 +228,15 @@ class ServerBootstrap { } barrier->post(); + } catch (...) { + exn = std::current_exception(); + barrier->post(); + + return; + } + + + }; auto wait0 = std::make_shared>(); @@ -244,16 +253,14 @@ class ServerBootstrap { std::rethrow_exception(exn); } - // Startup all the threads - for(auto socket : new_sockets) { + for (auto& socket : new_sockets) { + // Startup all the threads workerFactory_->forEachWorker([this, socket](Acceptor* worker){ - socket->getEventBase()->runInEventBaseThread([this, worker, socket](){ - socket->addAcceptCallback(worker, worker->getEventBase()); + socket->getEventBase()->runInEventBaseThreadAndWait([this, worker, socket](){ + socketFactory_->addAcceptCB(socket, worker, worker->getEventBase()); }); }); - } - for (auto& socket : new_sockets) { sockets_.push_back(socket); } } @@ -264,9 +271,8 @@ class ServerBootstrap { void stop() { for (auto socket : sockets_) { folly::Baton<> barrier; - socket->getEventBase()->runInEventBaseThread([&barrier, socket]() { - socket->stopAccepting(); - socket->detachEventBase(); + socket->getEventBase()->runInEventBaseThread([&]() mutable { + socketFactory_->stopSocket(socket); barrier.post(); }); barrier.wait(); @@ -284,7 +290,7 @@ class ServerBootstrap { /* * Get the list of listening sockets */ - const std::vector>& + const std::vector>& getSockets() const { return sockets_; } @@ -305,10 +311,14 @@ class ServerBootstrap { std::shared_ptr io_group_; std::shared_ptr workerFactory_; - std::vector> sockets_; + std::vector> sockets_; std::shared_ptr acceptorFactory_; - std::shared_ptr> pipelineFactory_; + std::shared_ptr> childPipelineFactory_; + std::shared_ptr> pipeline_{ + std::make_shared()}; + std::shared_ptr socketFactory_{ + std::make_shared()}; }; } // namespace diff --git a/folly/wangle/bootstrap/ServerSocketFactory.h b/folly/wangle/bootstrap/ServerSocketFactory.h new file mode 100644 index 00000000..ca99c2ee --- /dev/null +++ b/folly/wangle/bootstrap/ServerSocketFactory.h @@ -0,0 +1,118 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include + +namespace folly { + +class ServerSocketFactory { + public: + virtual std::shared_ptr newSocket( + int port, SocketAddress address, int backlog, + bool reuse, ServerSocketConfig& config) = 0; + + virtual void stopSocket( + std::shared_ptr& socket) = 0; + + virtual void removeAcceptCB(std::shared_ptr sock, Acceptor *callback, EventBase* base) = 0; + virtual void addAcceptCB(std::shared_ptr sock, Acceptor* callback, EventBase* base) = 0 ; + virtual ~ServerSocketFactory() = default; +}; + +class AsyncServerSocketFactory : public ServerSocketFactory { + public: + std::shared_ptr newSocket( + int port, SocketAddress address, int backlog, bool reuse, + ServerSocketConfig& config) { + + auto socket = folly::AsyncServerSocket::newSocket(); + socket->setReusePortEnabled(reuse); + socket->attachEventBase(EventBaseManager::get()->getEventBase()); + if (port >= 0) { + socket->bind(port); + } else { + socket->bind(address); + } + + socket->listen(config.acceptBacklog); + socket->startAccepting(); + + return socket; + } + + virtual void stopSocket( + std::shared_ptr& s) { + auto socket = std::dynamic_pointer_cast(s); + DCHECK(socket); + socket->stopAccepting(); + socket->detachEventBase(); + } + + virtual void removeAcceptCB(std::shared_ptr s, + Acceptor *callback, EventBase* base) { + auto socket = std::dynamic_pointer_cast(s); + CHECK(socket); + socket->removeAcceptCallback(callback, base); + } + + virtual void addAcceptCB(std::shared_ptr s, + Acceptor* callback, EventBase* base) { + auto socket = std::dynamic_pointer_cast(s); + CHECK(socket); + socket->addAcceptCallback(callback, base); + } +}; + +class AsyncUDPServerSocketFactory : public ServerSocketFactory { + public: + std::shared_ptr newSocket( + int port, SocketAddress address, int backlog, bool reuse, + ServerSocketConfig& config) { + + auto socket = std::make_shared( + EventBaseManager::get()->getEventBase()); + //socket->setReusePortEnabled(reuse); + SocketAddress addressr("::1", port); + socket->bind(addressr); + socket->listen(); + + return socket; + } + + virtual void stopSocket( + std::shared_ptr& s) { + auto socket = std::dynamic_pointer_cast(s); + DCHECK(socket); + socket->close(); + } + + virtual void removeAcceptCB(std::shared_ptr s, + Acceptor *callback, EventBase* base) { + } + + virtual void addAcceptCB(std::shared_ptr s, + Acceptor* callback, EventBase* base) { + auto socket = std::dynamic_pointer_cast(s); + DCHECK(socket); + socket->addListener(base, callback); + } +}; + +} // namespace -- 2.34.1