#pragma once
#include <folly/wangle/acceptor/Acceptor.h>
+#include <folly/wangle/bootstrap/ServerSocketFactory.h>
#include <folly/io/async/EventBaseManager.h>
#include <folly/wangle/concurrent/IOThreadPoolExecutor.h>
#include <folly/wangle/acceptor/ManagedConnection.h>
#include <folly/wangle/channel/ChannelPipeline.h>
+#include <folly/wangle/channel/ChannelHandler.h>
namespace folly {
template <typename Pipeline>
-class ServerAcceptor : public Acceptor {
+class ServerAcceptor
+ : public Acceptor
+ , public folly::wangle::ChannelHandlerAdapter<void*, std::exception> {
typedef std::unique_ptr<Pipeline,
folly::DelayedDestruction::Destructor> PipelinePtr;
public:
explicit ServerAcceptor(
- std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory,
- EventBase* base)
+ std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory,
+ std::shared_ptr<folly::wangle::ChannelPipeline<
+ void*, std::exception>> acceptorPipeline,
+ EventBase* base)
: Acceptor(ServerSocketConfig())
- , pipelineFactory_(pipelineFactory) {
- Acceptor::init(nullptr, base);
+ , base_(base)
+ , childPipelineFactory_(pipelineFactory)
+ , acceptorPipeline_(acceptorPipeline) {
+ Acceptor::init(nullptr, base_);
+ CHECK(acceptorPipeline_);
+
+ acceptorPipeline_->addBack(folly::wangle::ChannelHandlerPtr<ServerAcceptor, false>(this));
+ acceptorPipeline_->finalize();
}
- /* See Acceptor::onNewConnection for details */
- void onNewConnection(
- AsyncSocket::UniquePtr transport, const SocketAddress* address,
- const std::string& nextProtocolName, const TransportInfo& tinfo) {
-
+ void read(Context* ctx, void* conn) {
+ AsyncSocket::UniquePtr transport((AsyncSocket*)conn);
std::unique_ptr<Pipeline,
folly::DelayedDestruction::Destructor>
- pipeline(pipelineFactory_->newPipeline(
+ pipeline(childPipelineFactory_->newPipeline(
std::shared_ptr<AsyncSocket>(
transport.release(),
folly::DelayedDestruction::Destructor())));
Acceptor::addConnection(connection);
}
+ folly::Future<void> write(Context* ctx, std::exception e) {
+ return ctx->fireWrite(e);
+ }
+
+ /* See Acceptor::onNewConnection for details */
+ void onNewConnection(
+ AsyncSocket::UniquePtr transport, const SocketAddress* address,
+ const std::string& nextProtocolName, const TransportInfo& tinfo) {
+ acceptorPipeline_->read(transport.release());
+ }
+
+ // UDP thunk
+ void onDataAvailable(const folly::SocketAddress& addr,
+ std::unique_ptr<folly::IOBuf> buf,
+ bool truncated) noexcept {
+ acceptorPipeline_->read(buf.release());
+ }
+
private:
- std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory_;
+ EventBase* base_;
+
+ std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory_;
+ std::shared_ptr<folly::wangle::ChannelPipeline<
+ void*, std::exception>> acceptorPipeline_;
};
template <typename Pipeline>
class ServerAcceptorFactory : public AcceptorFactory {
public:
explicit ServerAcceptorFactory(
- std::shared_ptr<PipelineFactory<Pipeline>> factory)
- : factory_(factory) {}
-
- std::shared_ptr<Acceptor> newAcceptor(folly::EventBase* base) {
- return std::make_shared<ServerAcceptor<Pipeline>>(factory_, base);
+ std::shared_ptr<PipelineFactory<Pipeline>> factory,
+ std::shared_ptr<PipelineFactory<folly::wangle::ChannelPipeline<
+ void*, std::exception>>> pipeline)
+ : factory_(factory)
+ , pipeline_(pipeline) {}
+
+ std::shared_ptr<Acceptor> newAcceptor(EventBase* base) {
+ std::shared_ptr<folly::wangle::ChannelPipeline<
+ void*, std::exception>> pipeline(
+ pipeline_->newPipeline(nullptr));
+ return std::make_shared<ServerAcceptor<Pipeline>>(factory_, pipeline, base);
}
private:
std::shared_ptr<PipelineFactory<Pipeline>> factory_;
+ std::shared_ptr<PipelineFactory<
+ folly::wangle::ChannelPipeline<
+ void*, std::exception>>> pipeline_;
};
class ServerWorkerPool : public folly::wangle::ThreadPoolExecutor::Observer {
explicit ServerWorkerPool(
std::shared_ptr<AcceptorFactory> acceptorFactory,
folly::wangle::IOThreadPoolExecutor* exec,
- std::vector<std::shared_ptr<folly::AsyncServerSocket>>* sockets)
+ std::vector<std::shared_ptr<folly::AsyncSocketBase>>* sockets,
+ std::shared_ptr<ServerSocketFactory> socketFactory)
: acceptorFactory_(acceptorFactory)
, exec_(exec)
- , sockets_(sockets) {
+ , sockets_(sockets)
+ , socketFactory_(socketFactory) {
CHECK(exec);
}
std::shared_ptr<Acceptor>> workers_;
std::shared_ptr<AcceptorFactory> acceptorFactory_;
folly::wangle::IOThreadPoolExecutor* exec_{nullptr};
- std::vector<std::shared_ptr<folly::AsyncServerSocket>>* sockets_;
+ std::vector<std::shared_ptr<folly::AsyncSocketBase>>* sockets_;
+ std::shared_ptr<ServerSocketFactory> socketFactory_;
};
template <typename F>
}
}
+class DefaultAcceptPipelineFactory
+ : public PipelineFactory<wangle::ChannelPipeline<void*, std::exception>> {
+ typedef wangle::ChannelPipeline<
+ void*,
+ std::exception> AcceptPipeline;
+
+ public:
+ AcceptPipeline* newPipeline(std::shared_ptr<AsyncSocket>) {
+ return new AcceptPipeline;
+ }
+};
+
} // namespace