io/async/AsyncUDPSocket.h \
io/async/AsyncServerSocket.h \
io/async/AsyncSocket.h \
+ io/async/AsyncSocketBase.h \
io/async/AsyncSSLSocket.h \
io/async/AsyncSocketException.h \
io/async/DelayedDestruction.h \
wangle/acceptor/TransportInfo.h \
wangle/bootstrap/ServerBootstrap.h \
wangle/bootstrap/ServerBootstrap-inl.h \
+ wangle/bootstrap/ServerSocketFactory.h \
wangle/bootstrap/ClientBootstrap.h \
wangle/channel/AsyncSocketHandler.h \
wangle/channel/ChannelHandler.h \
#include <folly/io/async/EventBase.h>
#include <folly/io/async/NotificationQueue.h>
#include <folly/io/async/AsyncTimeout.h>
+#include <folly/io/async/AsyncSocketBase.h>
#include <folly/io/ShutdownSocketSet.h>
#include <folly/SocketAddress.h>
#include <memory>
* modify the AsyncServerSocket state may only be performed from the primary
* EventBase thread.
*/
-class AsyncServerSocket : public DelayedDestruction {
+class AsyncServerSocket : public DelayedDestruction
+ , public AsyncSocketBase {
public:
typedef std::unique_ptr<AsyncServerSocket, Destructor> UniquePtr;
// Disallow copy, move, and default construction.
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/SocketAddress.h>
+#include <folly/io/async/EventBase.h>
+
+namespace folly {
+
+class AsyncSocketBase {
+ public:
+ virtual EventBase* getEventBase() const = 0;
+ virtual ~AsyncSocketBase() = default;
+ virtual void getAddress(SocketAddress*) const = 0;
+};
+
+} // namespace
#include <sys/uio.h>
#include <folly/io/async/DelayedDestruction.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/io/async/AsyncSocketBase.h>
namespace folly {
* timeout, since most callers want to give up if the remote end stops
* responding and no further progress can be made sending the data.
*/
-class AsyncTransport : public DelayedDestruction {
+class AsyncTransport : public DelayedDestruction, public AsyncSocketBase {
public:
typedef std::unique_ptr<AsyncTransport, Destructor> UniquePtr;
*/
virtual bool isDetachable() const = 0;
- /**
- * Get the EventBase used by this transport.
- *
- * Returns nullptr if this transport is not currently attached to a
- * EventBase.
- */
- virtual EventBase* getEventBase() const = 0;
-
/**
* Set the send timeout.
*
*/
virtual void getLocalAddress(SocketAddress* address) const = 0;
+ virtual void getAddress(SocketAddress* address) const {
+ getLocalAddress(address);
+ }
+
/**
* Get the address of the remote endpoint to which this transport is
* connected.
* more than 1 packet will not work because they will end up with
* different event base to process.
*/
-class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback {
+class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback
+ , public AsyncSocketBase {
public:
class Callback {
public:
return socket_->address();
}
+ void getAddress(SocketAddress* a) const {
+ *a = address();
+ }
+
/**
* Add a listener to the round robin list
*/
socket_.reset();
}
+ EventBase* getEventBase() const {
+ return evb_;
+ }
+
private:
// AsyncUDPSocket::ReadCallback
void getReadBuffer(void** buf, size_t* len) noexcept {
#include <folly/io/IOBuf.h>
#include <folly/ScopeGuard.h>
#include <folly/io/async/AsyncSocketException.h>
+#include <folly/io/async/AsyncSocketBase.h>
#include <folly/io/async/EventHandler.h>
#include <folly/io/async/EventBase.h>
#include <folly/SocketAddress.h>
#include <event.h>
#include <folly/io/async/AsyncSSLSocket.h>
#include <folly/io/async/AsyncServerSocket.h>
+#include <folly/io/async/AsyncUDPServerSocket.h>
namespace folly { namespace wangle {
class ManagedConnection;
*/
class Acceptor :
public folly::AsyncServerSocket::AcceptCallback,
- public folly::wangle::ConnectionManager::Callback {
+ public folly::wangle::ConnectionManager::Callback,
+ public AsyncUDPServerSocket::Callback {
public:
enum class State : uint32_t {
const std::string& nextProtocolName,
const TransportInfo& tinfo) = 0;
+ void onListenStarted() noexcept {}
+ void onListenStopped() noexcept {}
+ void onDataAvailable(const SocketAddress&, std::unique_ptr<IOBuf>, bool) noexcept {}
+
virtual AsyncSocket::UniquePtr makeNewAsyncSocket(EventBase* base, int fd) {
return AsyncSocket::UniquePtr(new AsyncSocket(base, fd));
}
std::atomic<int> pipelines{0};
};
+class TestAcceptor : public Acceptor {
+EventBase base_;
+ public:
+ TestAcceptor() : Acceptor(ServerSocketConfig()) {
+ Acceptor::init(nullptr, &base_);
+ }
+ void onNewConnection(
+ AsyncSocket::UniquePtr sock,
+ const folly::SocketAddress* address,
+ const std::string& nextProtocolName,
+ const TransportInfo& tinfo) {
+ }
+};
+
+class TestAcceptorFactory : public AcceptorFactory {
+ public:
+ std::shared_ptr<Acceptor> newAcceptor(EventBase* base) {
+ return std::make_shared<TestAcceptor>();
+ }
+};
+
TEST(Bootstrap, Basic) {
TestServer server;
TestClient client;
server.stop();
}
+TEST(Bootstrap, ServerWithChildHandler) {
+ TestServer server;
+ server.childHandler(std::make_shared<TestAcceptorFactory>());
+ server.bind(0);
+ server.stop();
+}
+
TEST(Bootstrap, ClientServerTest) {
TestServer server;
auto factory = std::make_shared<TestPipelineFactory>();
folly::AsyncServerSocket::UniquePtr socket(new AsyncServerSocket);
server.bind(std::move(socket));
}
+
+std::atomic<int> connections{0};
+
+class TestHandlerPipeline
+ : public ChannelHandlerAdapter<void*,
+ std::exception> {
+ public:
+ void read(Context* ctx, void* conn) {
+ connections++;
+ return ctx->fireRead(conn);
+ }
+
+ Future<void> write(Context* ctx, std::exception e) {
+ return ctx->fireWrite(e);
+ }
+};
+
+template <typename HandlerPipeline>
+class TestHandlerPipelineFactory
+ : public PipelineFactory<ServerBootstrap<Pipeline>::AcceptPipeline> {
+ public:
+ ServerBootstrap<Pipeline>::AcceptPipeline* newPipeline(std::shared_ptr<AsyncSocket>) {
+ auto pipeline = new ServerBootstrap<Pipeline>::AcceptPipeline;
+ auto handler = std::make_shared<HandlerPipeline>();
+ pipeline->addBack(ChannelHandlerPtr<HandlerPipeline>(handler));
+ return pipeline;
+ }
+};
+
+TEST(Bootstrap, LoadBalanceHandler) {
+ TestServer server;
+ auto factory = std::make_shared<TestPipelineFactory>();
+ server.childPipeline(factory);
+
+ auto pipelinefactory =
+ std::make_shared<TestHandlerPipelineFactory<TestHandlerPipeline>>();
+ server.pipeline(pipelinefactory);
+ server.bind(0);
+ auto base = EventBaseManager::get()->getEventBase();
+
+ SocketAddress address;
+ server.getSockets()[0]->getAddress(&address);
+
+ TestClient client;
+ client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
+ client.connect(address);
+ base->loop();
+ server.stop();
+
+ CHECK(factory->pipelines == 1);
+ CHECK(connections == 1);
+}
+
+class TestUDPPipeline
+ : public ChannelHandlerAdapter<void*,
+ std::exception> {
+ public:
+ void read(Context* ctx, void* conn) {
+ connections++;
+ }
+
+ Future<void> write(Context* ctx, std::exception e) {
+ return ctx->fireWrite(e);
+ }
+};
+
+TEST(Bootstrap, UDP) {
+ TestServer server;
+ auto factory = std::make_shared<TestPipelineFactory>();
+ auto pipelinefactory =
+ std::make_shared<TestHandlerPipelineFactory<TestUDPPipeline>>();
+ server.pipeline(pipelinefactory);
+ server.channelFactory(std::make_shared<AsyncUDPServerSocketFactory>());
+ server.bind(0);
+}
+
+TEST(Bootstrap, UDPClientServerTest) {
+ connections = 0;
+
+ TestServer server;
+ auto factory = std::make_shared<TestPipelineFactory>();
+ auto pipelinefactory =
+ std::make_shared<TestHandlerPipelineFactory<TestUDPPipeline>>();
+ server.pipeline(pipelinefactory);
+ server.channelFactory(std::make_shared<AsyncUDPServerSocketFactory>());
+ server.bind(0);
+
+ auto base = EventBaseManager::get()->getEventBase();
+
+ SocketAddress address;
+ server.getSockets()[0]->getAddress(&address);
+
+ SocketAddress localhost("::1", 0);
+ AsyncUDPSocket client(base);
+ client.bind(localhost);
+ auto data = IOBuf::create(1);
+ data->append(1);
+ *(data->writableData()) = 'a';
+ client.write(address, std::move(data));
+ base->loop();
+ server.stop();
+
+ CHECK(connections == 1);
+}
#pragma once
#include <folly/wangle/acceptor/Acceptor.h>
+#include <folly/wangle/bootstrap/ServerSocketFactory.h>
#include <folly/io/async/EventBaseManager.h>
#include <folly/wangle/concurrent/IOThreadPoolExecutor.h>
#include <folly/wangle/acceptor/ManagedConnection.h>
#include <folly/wangle/channel/ChannelPipeline.h>
+#include <folly/wangle/channel/ChannelHandler.h>
namespace folly {
template <typename Pipeline>
-class ServerAcceptor : public Acceptor {
+class ServerAcceptor
+ : public Acceptor
+ , public folly::wangle::ChannelHandlerAdapter<void*, std::exception> {
typedef std::unique_ptr<Pipeline,
folly::DelayedDestruction::Destructor> PipelinePtr;
public:
explicit ServerAcceptor(
- std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory,
- EventBase* base)
+ std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory,
+ std::shared_ptr<folly::wangle::ChannelPipeline<
+ void*, std::exception>> acceptorPipeline,
+ EventBase* base)
: Acceptor(ServerSocketConfig())
- , pipelineFactory_(pipelineFactory) {
- Acceptor::init(nullptr, base);
+ , base_(base)
+ , childPipelineFactory_(pipelineFactory)
+ , acceptorPipeline_(acceptorPipeline) {
+ Acceptor::init(nullptr, base_);
+ CHECK(acceptorPipeline_);
+
+ acceptorPipeline_->addBack(folly::wangle::ChannelHandlerPtr<ServerAcceptor, false>(this));
+ acceptorPipeline_->finalize();
}
- /* See Acceptor::onNewConnection for details */
- void onNewConnection(
- AsyncSocket::UniquePtr transport, const SocketAddress* address,
- const std::string& nextProtocolName, const TransportInfo& tinfo) {
-
+ void read(Context* ctx, void* conn) {
+ AsyncSocket::UniquePtr transport((AsyncSocket*)conn);
std::unique_ptr<Pipeline,
folly::DelayedDestruction::Destructor>
- pipeline(pipelineFactory_->newPipeline(
+ pipeline(childPipelineFactory_->newPipeline(
std::shared_ptr<AsyncSocket>(
transport.release(),
folly::DelayedDestruction::Destructor())));
Acceptor::addConnection(connection);
}
+ folly::Future<void> write(Context* ctx, std::exception e) {
+ return ctx->fireWrite(e);
+ }
+
+ /* See Acceptor::onNewConnection for details */
+ void onNewConnection(
+ AsyncSocket::UniquePtr transport, const SocketAddress* address,
+ const std::string& nextProtocolName, const TransportInfo& tinfo) {
+ acceptorPipeline_->read(transport.release());
+ }
+
+ // UDP thunk
+ void onDataAvailable(const folly::SocketAddress& addr,
+ std::unique_ptr<folly::IOBuf> buf,
+ bool truncated) noexcept {
+ acceptorPipeline_->read(buf.release());
+ }
+
private:
- std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory_;
+ EventBase* base_;
+
+ std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory_;
+ std::shared_ptr<folly::wangle::ChannelPipeline<
+ void*, std::exception>> acceptorPipeline_;
};
template <typename Pipeline>
class ServerAcceptorFactory : public AcceptorFactory {
public:
explicit ServerAcceptorFactory(
- std::shared_ptr<PipelineFactory<Pipeline>> factory)
- : factory_(factory) {}
-
- std::shared_ptr<Acceptor> newAcceptor(folly::EventBase* base) {
- return std::make_shared<ServerAcceptor<Pipeline>>(factory_, base);
+ std::shared_ptr<PipelineFactory<Pipeline>> factory,
+ std::shared_ptr<PipelineFactory<folly::wangle::ChannelPipeline<
+ void*, std::exception>>> pipeline)
+ : factory_(factory)
+ , pipeline_(pipeline) {}
+
+ std::shared_ptr<Acceptor> newAcceptor(EventBase* base) {
+ std::shared_ptr<folly::wangle::ChannelPipeline<
+ void*, std::exception>> pipeline(
+ pipeline_->newPipeline(nullptr));
+ return std::make_shared<ServerAcceptor<Pipeline>>(factory_, pipeline, base);
}
private:
std::shared_ptr<PipelineFactory<Pipeline>> factory_;
+ std::shared_ptr<PipelineFactory<
+ folly::wangle::ChannelPipeline<
+ void*, std::exception>>> pipeline_;
};
class ServerWorkerPool : public folly::wangle::ThreadPoolExecutor::Observer {
explicit ServerWorkerPool(
std::shared_ptr<AcceptorFactory> acceptorFactory,
folly::wangle::IOThreadPoolExecutor* exec,
- std::vector<std::shared_ptr<folly::AsyncServerSocket>>* sockets)
+ std::vector<std::shared_ptr<folly::AsyncSocketBase>>* sockets,
+ std::shared_ptr<ServerSocketFactory> socketFactory)
: acceptorFactory_(acceptorFactory)
, exec_(exec)
- , sockets_(sockets) {
+ , sockets_(sockets)
+ , socketFactory_(socketFactory) {
CHECK(exec);
}
std::shared_ptr<Acceptor>> workers_;
std::shared_ptr<AcceptorFactory> acceptorFactory_;
folly::wangle::IOThreadPoolExecutor* exec_{nullptr};
- std::vector<std::shared_ptr<folly::AsyncServerSocket>>* sockets_;
+ std::vector<std::shared_ptr<folly::AsyncSocketBase>>* sockets_;
+ std::shared_ptr<ServerSocketFactory> socketFactory_;
};
template <typename F>
}
}
+class DefaultAcceptPipelineFactory
+ : public PipelineFactory<wangle::ChannelPipeline<void*, std::exception>> {
+ typedef wangle::ChannelPipeline<
+ void*,
+ std::exception> AcceptPipeline;
+
+ public:
+ AcceptPipeline* newPipeline(std::shared_ptr<AsyncSocket>) {
+ return new AcceptPipeline;
+ }
+};
+
} // namespace
*/
#include <folly/wangle/bootstrap/ServerBootstrap.h>
#include <folly/wangle/concurrent/NamedThreadFactory.h>
+#include <folly/wangle/channel/ChannelHandler.h>
#include <folly/io/async/EventBaseManager.h>
namespace folly {
workers_.insert({h, worker});
for(auto socket : *sockets_) {
- socket->getEventBase()->runInEventBaseThread([this, worker, socket](){
- socket->addAcceptCallback(worker.get(), worker->getEventBase());
+ socket->getEventBase()->runInEventBaseThreadAndWait([this, worker, socket](){
+ socketFactory_->addAcceptCB(
+ socket, worker.get(), worker->getEventBase());
});
}
}
for (auto& socket : *sockets_) {
folly::Baton<> barrier;
- socket->getEventBase()->runInEventBaseThread([&]() {
- socket->removeAcceptCallback(worker->second.get(), nullptr);
+ socket->getEventBase()->runInEventBaseThreadAndWait([&]() {
+ socketFactory_->removeAcceptCB(
+ socket, worker->second.get(), nullptr);
barrier.post();
});
barrier.wait();
}
- CHECK(worker->second->getEventBase() != nullptr);
- CHECK(!worker->second->getEventBase()->isInEventBaseThread());
- folly::Baton<> barrier;
- worker->second->getEventBase()->runInEventBaseThread([&]() {
- worker->second->dropAllConnections();
- barrier.post();
- });
+ if (!worker->second->getEventBase()->isInEventBaseThread()) {
+ worker->second->getEventBase()->runInEventBaseThreadAndWait([=]() {
+ worker->second->dropAllConnections();
+ });
+ } else {
+ worker->second->dropAllConnections();
+ }
- barrier.wait();
workers_.erase(worker);
}
#include <folly/wangle/bootstrap/ServerBootstrap-inl.h>
#include <folly/Baton.h>
+#include <folly/wangle/channel/ChannelPipeline.h>
namespace folly {
~ServerBootstrap() {
stop();
}
- /* TODO(davejwatson)
- *
- * If there is any work to be done BEFORE handing the work to IO
- * threads, this handler is where the pipeline to do it would be
- * set.
- *
- * This could be used for things like logging, load balancing, or
- * advanced load balancing on IO threads. Netty also provides this.
+
+ typedef wangle::ChannelPipeline<
+ void*,
+ std::exception> AcceptPipeline;
+ /*
+ * Pipeline used to add connections to event bases.
+ * This is used for UDP or for load balancing
+ * TCP connections to IO threads explicitly
*/
- ServerBootstrap* handler() {
+ ServerBootstrap* pipeline(
+ std::shared_ptr<PipelineFactory<AcceptPipeline>> factory) {
+ pipeline_ = factory;
+ return this;
+ }
+
+ ServerBootstrap* channelFactory(
+ std::shared_ptr<ServerSocketFactory> factory) {
+ socketFactory_ = factory;
return this;
}
*/
ServerBootstrap* childPipeline(
std::shared_ptr<PipelineFactory<Pipeline>> factory) {
- pipelineFactory_ = factory;
+ childPipelineFactory_ = factory;
return this;
}
32, std::make_shared<wangle::NamedThreadFactory>("IO Thread"));
}
- CHECK(acceptorFactory_ || pipelineFactory_);
+ // TODO better config checking
+ // CHECK(acceptorFactory_ || childPipelineFactory_);
+ CHECK(!(acceptorFactory_ && childPipelineFactory_));
if (acceptorFactory_) {
workerFactory_ = std::make_shared<ServerWorkerPool>(
- acceptorFactory_, io_group.get(), &sockets_);
+ acceptorFactory_, io_group.get(), &sockets_, socketFactory_);
} else {
workerFactory_ = std::make_shared<ServerWorkerPool>(
- std::make_shared<ServerAcceptorFactory<Pipeline>>(pipelineFactory_),
- io_group.get(), &sockets_);
+ std::make_shared<ServerAcceptorFactory<Pipeline>>(
+ childPipelineFactory_,
+ pipeline_),
+ io_group.get(), &sockets_, socketFactory_);
}
io_group->addObserver(workerFactory_);
// Since only a single socket is given,
// we can only accept on a single thread
CHECK(acceptor_group_->numThreads() == 1);
+
std::shared_ptr<folly::AsyncServerSocket> socket(
s.release(), DelayedDestruction::Destructor());
folly::Baton<> barrier;
acceptor_group_->add([&](){
socket->attachEventBase(EventBaseManager::get()->getEventBase());
- socket->listen(1024);
+ socket->listen(socketConfig.acceptBacklog);
socket->startAccepting();
barrier.post();
});
// Startup all the threads
workerFactory_->forEachWorker([this, socket](Acceptor* worker){
- socket->getEventBase()->runInEventBaseThread([this, worker, socket](){
- socket->addAcceptCallback(worker, worker->getEventBase());
+ socket->getEventBase()->runInEventBaseThreadAndWait(
+ [this, worker, socket](){
+ socketFactory_->addAcceptCB(socket, worker, worker->getEventBase());
});
});
}
std::mutex sock_lock;
- std::vector<std::shared_ptr<folly::AsyncServerSocket>> new_sockets;
+ std::vector<std::shared_ptr<folly::AsyncSocketBase>> new_sockets;
+
std::exception_ptr exn;
auto startupFunc = [&](std::shared_ptr<folly::Baton<>> barrier){
- auto socket = folly::AsyncServerSocket::newSocket();
- socket->setReusePortEnabled(reusePort);
- socket->attachEventBase(EventBaseManager::get()->getEventBase());
-
- try {
- if (port >= 0) {
- socket->bind(port);
- } else {
- socket->bind(address);
- port = address.getPort();
- }
-
- socket->listen(socketConfig.acceptBacklog);
- socket->startAccepting();
- } catch (...) {
- exn = std::current_exception();
- barrier->post();
-
- return;
- }
+
+ try {
+ auto socket = socketFactory_->newSocket(
+ port, address, socketConfig.acceptBacklog, reusePort, socketConfig);
sock_lock.lock();
new_sockets.push_back(socket);
}
barrier->post();
+ } catch (...) {
+ exn = std::current_exception();
+ barrier->post();
+
+ return;
+ }
+
+
+
};
auto wait0 = std::make_shared<folly::Baton<>>();
std::rethrow_exception(exn);
}
- // Startup all the threads
- for(auto socket : new_sockets) {
+ for (auto& socket : new_sockets) {
+ // Startup all the threads
workerFactory_->forEachWorker([this, socket](Acceptor* worker){
- socket->getEventBase()->runInEventBaseThread([this, worker, socket](){
- socket->addAcceptCallback(worker, worker->getEventBase());
+ socket->getEventBase()->runInEventBaseThreadAndWait([this, worker, socket](){
+ socketFactory_->addAcceptCB(socket, worker, worker->getEventBase());
});
});
- }
- for (auto& socket : new_sockets) {
sockets_.push_back(socket);
}
}
void stop() {
for (auto socket : sockets_) {
folly::Baton<> barrier;
- socket->getEventBase()->runInEventBaseThread([&barrier, socket]() {
- socket->stopAccepting();
- socket->detachEventBase();
+ socket->getEventBase()->runInEventBaseThread([&]() mutable {
+ socketFactory_->stopSocket(socket);
barrier.post();
});
barrier.wait();
/*
* Get the list of listening sockets
*/
- const std::vector<std::shared_ptr<folly::AsyncServerSocket>>&
+ const std::vector<std::shared_ptr<folly::AsyncSocketBase>>&
getSockets() const {
return sockets_;
}
std::shared_ptr<wangle::IOThreadPoolExecutor> io_group_;
std::shared_ptr<ServerWorkerPool> workerFactory_;
- std::vector<std::shared_ptr<folly::AsyncServerSocket>> sockets_;
+ std::vector<std::shared_ptr<folly::AsyncSocketBase>> sockets_;
std::shared_ptr<AcceptorFactory> acceptorFactory_;
- std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory_;
+ std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory_;
+ std::shared_ptr<PipelineFactory<AcceptPipeline>> pipeline_{
+ std::make_shared<DefaultAcceptPipelineFactory>()};
+ std::shared_ptr<ServerSocketFactory> socketFactory_{
+ std::make_shared<AsyncServerSocketFactory>()};
};
} // namespace
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/wangle/bootstrap/ServerBootstrap-inl.h>
+#include <folly/io/async/AsyncServerSocket.h>
+#include <folly/io/async/EventBaseManager.h>
+#include <folly/io/async/AsyncUDPServerSocket.h>
+
+namespace folly {
+
+class ServerSocketFactory {
+ public:
+ virtual std::shared_ptr<AsyncSocketBase> newSocket(
+ int port, SocketAddress address, int backlog,
+ bool reuse, ServerSocketConfig& config) = 0;
+
+ virtual void stopSocket(
+ std::shared_ptr<AsyncSocketBase>& socket) = 0;
+
+ virtual void removeAcceptCB(std::shared_ptr<AsyncSocketBase> sock, Acceptor *callback, EventBase* base) = 0;
+ virtual void addAcceptCB(std::shared_ptr<AsyncSocketBase> sock, Acceptor* callback, EventBase* base) = 0 ;
+ virtual ~ServerSocketFactory() = default;
+};
+
+class AsyncServerSocketFactory : public ServerSocketFactory {
+ public:
+ std::shared_ptr<AsyncSocketBase> newSocket(
+ int port, SocketAddress address, int backlog, bool reuse,
+ ServerSocketConfig& config) {
+
+ auto socket = folly::AsyncServerSocket::newSocket();
+ socket->setReusePortEnabled(reuse);
+ socket->attachEventBase(EventBaseManager::get()->getEventBase());
+ if (port >= 0) {
+ socket->bind(port);
+ } else {
+ socket->bind(address);
+ }
+
+ socket->listen(config.acceptBacklog);
+ socket->startAccepting();
+
+ return socket;
+ }
+
+ virtual void stopSocket(
+ std::shared_ptr<AsyncSocketBase>& s) {
+ auto socket = std::dynamic_pointer_cast<AsyncServerSocket>(s);
+ DCHECK(socket);
+ socket->stopAccepting();
+ socket->detachEventBase();
+ }
+
+ virtual void removeAcceptCB(std::shared_ptr<AsyncSocketBase> s,
+ Acceptor *callback, EventBase* base) {
+ auto socket = std::dynamic_pointer_cast<AsyncServerSocket>(s);
+ CHECK(socket);
+ socket->removeAcceptCallback(callback, base);
+ }
+
+ virtual void addAcceptCB(std::shared_ptr<AsyncSocketBase> s,
+ Acceptor* callback, EventBase* base) {
+ auto socket = std::dynamic_pointer_cast<AsyncServerSocket>(s);
+ CHECK(socket);
+ socket->addAcceptCallback(callback, base);
+ }
+};
+
+class AsyncUDPServerSocketFactory : public ServerSocketFactory {
+ public:
+ std::shared_ptr<AsyncSocketBase> newSocket(
+ int port, SocketAddress address, int backlog, bool reuse,
+ ServerSocketConfig& config) {
+
+ auto socket = std::make_shared<AsyncUDPServerSocket>(
+ EventBaseManager::get()->getEventBase());
+ //socket->setReusePortEnabled(reuse);
+ SocketAddress addressr("::1", port);
+ socket->bind(addressr);
+ socket->listen();
+
+ return socket;
+ }
+
+ virtual void stopSocket(
+ std::shared_ptr<AsyncSocketBase>& s) {
+ auto socket = std::dynamic_pointer_cast<AsyncUDPServerSocket>(s);
+ DCHECK(socket);
+ socket->close();
+ }
+
+ virtual void removeAcceptCB(std::shared_ptr<AsyncSocketBase> s,
+ Acceptor *callback, EventBase* base) {
+ }
+
+ virtual void addAcceptCB(std::shared_ptr<AsyncSocketBase> s,
+ Acceptor* callback, EventBase* base) {
+ auto socket = std::dynamic_pointer_cast<AsyncUDPServerSocket>(s);
+ DCHECK(socket);
+ socket->addListener(base, callback);
+ }
+};
+
+} // namespace