2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
18 #include <folly/wangle/acceptor/Acceptor.h>
19 #include <folly/wangle/bootstrap/ServerSocketFactory.h>
20 #include <folly/io/async/EventBaseManager.h>
21 #include <folly/wangle/concurrent/IOThreadPoolExecutor.h>
22 #include <folly/wangle/acceptor/ManagedConnection.h>
23 #include <folly/wangle/channel/Pipeline.h>
24 #include <folly/wangle/channel/Handler.h>
28 template <typename Pipeline>
31 , public folly::wangle::InboundHandler<void*> {
32 typedef std::unique_ptr<Pipeline,
33 folly::DelayedDestruction::Destructor> PipelinePtr;
35 class ServerConnection : public wangle::ManagedConnection {
37 explicit ServerConnection(PipelinePtr pipeline)
38 : pipeline_(std::move(pipeline)) {}
43 void timeoutExpired() noexcept {
46 void describe(std::ostream& os) const {}
50 void notifyPendingShutdown() {}
51 void closeWhenIdle() {}
52 void dropConnection() {
55 void dumpConnectionState(uint8_t loglevel) {}
57 PipelinePtr pipeline_;
61 explicit ServerAcceptor(
62 std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory,
63 std::shared_ptr<folly::wangle::Pipeline<
64 void*, std::exception>> acceptorPipeline,
66 : Acceptor(ServerSocketConfig())
68 , childPipelineFactory_(pipelineFactory)
69 , acceptorPipeline_(acceptorPipeline) {
70 Acceptor::init(nullptr, base_);
71 CHECK(acceptorPipeline_);
73 acceptorPipeline_->addBack(this);
74 acceptorPipeline_->finalize();
77 void read(Context* ctx, void* conn) {
78 AsyncSocket::UniquePtr transport((AsyncSocket*)conn);
79 std::unique_ptr<Pipeline,
80 folly::DelayedDestruction::Destructor>
81 pipeline(childPipelineFactory_->newPipeline(
82 std::shared_ptr<AsyncSocket>(
84 folly::DelayedDestruction::Destructor())));
85 auto connection = new ServerConnection(std::move(pipeline));
86 Acceptor::addConnection(connection);
89 /* See Acceptor::onNewConnection for details */
91 AsyncSocket::UniquePtr transport, const SocketAddress* address,
92 const std::string& nextProtocolName, const TransportInfo& tinfo) {
93 acceptorPipeline_->read(transport.release());
97 void onDataAvailable(std::shared_ptr<AsyncUDPSocket> socket,
98 const folly::SocketAddress& addr,
99 std::unique_ptr<folly::IOBuf> buf,
100 bool truncated) noexcept {
101 acceptorPipeline_->read(buf.release());
107 std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory_;
108 std::shared_ptr<folly::wangle::Pipeline<
109 void*, std::exception>> acceptorPipeline_;
112 template <typename Pipeline>
113 class ServerAcceptorFactory : public AcceptorFactory {
115 explicit ServerAcceptorFactory(
116 std::shared_ptr<PipelineFactory<Pipeline>> factory,
117 std::shared_ptr<PipelineFactory<folly::wangle::Pipeline<
118 void*, std::exception>>> pipeline)
120 , pipeline_(pipeline) {}
122 std::shared_ptr<Acceptor> newAcceptor(EventBase* base) {
123 std::shared_ptr<folly::wangle::Pipeline<
124 void*, std::exception>> pipeline(
125 pipeline_->newPipeline(nullptr));
126 return std::make_shared<ServerAcceptor<Pipeline>>(factory_, pipeline, base);
129 std::shared_ptr<PipelineFactory<Pipeline>> factory_;
130 std::shared_ptr<PipelineFactory<
131 folly::wangle::Pipeline<
132 void*, std::exception>>> pipeline_;
135 class ServerWorkerPool : public folly::wangle::ThreadPoolExecutor::Observer {
137 explicit ServerWorkerPool(
138 std::shared_ptr<AcceptorFactory> acceptorFactory,
139 folly::wangle::IOThreadPoolExecutor* exec,
140 std::shared_ptr<std::vector<std::shared_ptr<folly::AsyncSocketBase>>> sockets,
141 std::shared_ptr<ServerSocketFactory> socketFactory)
142 : acceptorFactory_(acceptorFactory)
145 , socketFactory_(socketFactory) {
149 template <typename F>
150 void forEachWorker(F&& f) const;
153 folly::wangle::ThreadPoolExecutor::ThreadHandle*);
155 folly::wangle::ThreadPoolExecutor::ThreadHandle*);
156 void threadPreviouslyStarted(
157 folly::wangle::ThreadPoolExecutor::ThreadHandle* thread) {
158 threadStarted(thread);
160 void threadNotYetStopped(
161 folly::wangle::ThreadPoolExecutor::ThreadHandle* thread) {
162 threadStopped(thread);
166 std::map<folly::wangle::ThreadPoolExecutor::ThreadHandle*,
167 std::shared_ptr<Acceptor>> workers_;
168 std::shared_ptr<AcceptorFactory> acceptorFactory_;
169 folly::wangle::IOThreadPoolExecutor* exec_{nullptr};
170 std::shared_ptr<std::vector<std::shared_ptr<folly::AsyncSocketBase>>> sockets_;
171 std::shared_ptr<ServerSocketFactory> socketFactory_;
174 template <typename F>
175 void ServerWorkerPool::forEachWorker(F&& f) const {
176 for (const auto& kv : workers_) {
181 class DefaultAcceptPipelineFactory
182 : public PipelineFactory<wangle::Pipeline<void*, std::exception>> {
183 typedef wangle::Pipeline<
185 std::exception> AcceptPipeline;
188 AcceptPipeline* newPipeline(std::shared_ptr<AsyncSocket>) {
189 return new AcceptPipeline;