2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
18 #include <folly/wangle/acceptor/Acceptor.h>
19 #include <folly/wangle/bootstrap/ServerSocketFactory.h>
20 #include <folly/io/async/EventBaseManager.h>
21 #include <folly/wangle/concurrent/IOThreadPoolExecutor.h>
22 #include <folly/wangle/acceptor/ManagedConnection.h>
23 #include <folly/wangle/channel/Pipeline.h>
24 #include <folly/wangle/channel/Handler.h>
28 template <typename Pipeline>
31 , public folly::wangle::InboundHandler<void*> {
32 typedef std::unique_ptr<Pipeline,
33 folly::DelayedDestruction::Destructor> PipelinePtr;
35 class ServerConnection : public wangle::ManagedConnection,
36 public wangle::PipelineManager {
38 explicit ServerConnection(PipelinePtr pipeline)
39 : pipeline_(std::move(pipeline)) {
40 pipeline_->setPipelineManager(this);
43 ~ServerConnection() {}
45 void timeoutExpired() noexcept override {
48 void describe(std::ostream& os) const override {}
49 bool isBusy() const override {
52 void notifyPendingShutdown() override {}
53 void closeWhenIdle() override {}
54 void dropConnection() override {
57 void dumpConnectionState(uint8_t loglevel) override {}
59 void deletePipeline(wangle::PipelineBase* p) override {
60 CHECK(p == pipeline_.get());
65 PipelinePtr pipeline_;
69 explicit ServerAcceptor(
70 std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory,
71 std::shared_ptr<folly::wangle::Pipeline<void*>> acceptorPipeline,
73 : Acceptor(ServerSocketConfig())
75 , childPipelineFactory_(pipelineFactory)
76 , acceptorPipeline_(acceptorPipeline) {
77 Acceptor::init(nullptr, base_);
78 CHECK(acceptorPipeline_);
80 acceptorPipeline_->addBack(this);
81 acceptorPipeline_->finalize();
84 void read(Context* ctx, void* conn) {
85 AsyncSocket::UniquePtr transport((AsyncSocket*)conn);
86 std::unique_ptr<Pipeline,
87 folly::DelayedDestruction::Destructor>
88 pipeline(childPipelineFactory_->newPipeline(
89 std::shared_ptr<AsyncSocket>(
91 folly::DelayedDestruction::Destructor())));
92 pipeline->transportActive();
93 auto connection = new ServerConnection(std::move(pipeline));
94 Acceptor::addConnection(connection);
97 /* See Acceptor::onNewConnection for details */
99 AsyncSocket::UniquePtr transport, const SocketAddress* address,
100 const std::string& nextProtocolName, const TransportInfo& tinfo) {
101 acceptorPipeline_->read(transport.release());
105 void onDataAvailable(std::shared_ptr<AsyncUDPSocket> socket,
106 const folly::SocketAddress& addr,
107 std::unique_ptr<folly::IOBuf> buf,
108 bool truncated) noexcept {
109 acceptorPipeline_->read(buf.release());
115 std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory_;
116 std::shared_ptr<folly::wangle::Pipeline<void*>> acceptorPipeline_;
119 template <typename Pipeline>
120 class ServerAcceptorFactory : public AcceptorFactory {
122 explicit ServerAcceptorFactory(
123 std::shared_ptr<PipelineFactory<Pipeline>> factory,
124 std::shared_ptr<PipelineFactory<folly::wangle::Pipeline<void*>>> pipeline)
126 , pipeline_(pipeline) {}
128 std::shared_ptr<Acceptor> newAcceptor(EventBase* base) {
129 std::shared_ptr<folly::wangle::Pipeline<void*>> pipeline(
130 pipeline_->newPipeline(nullptr));
131 return std::make_shared<ServerAcceptor<Pipeline>>(factory_, pipeline, base);
134 std::shared_ptr<PipelineFactory<Pipeline>> factory_;
135 std::shared_ptr<PipelineFactory<
136 folly::wangle::Pipeline<void*>>> pipeline_;
139 class ServerWorkerPool : public folly::wangle::ThreadPoolExecutor::Observer {
141 explicit ServerWorkerPool(
142 std::shared_ptr<AcceptorFactory> acceptorFactory,
143 folly::wangle::IOThreadPoolExecutor* exec,
144 std::shared_ptr<std::vector<std::shared_ptr<folly::AsyncSocketBase>>> sockets,
145 std::shared_ptr<ServerSocketFactory> socketFactory)
146 : acceptorFactory_(acceptorFactory)
149 , socketFactory_(socketFactory) {
153 template <typename F>
154 void forEachWorker(F&& f) const;
157 folly::wangle::ThreadPoolExecutor::ThreadHandle*);
159 folly::wangle::ThreadPoolExecutor::ThreadHandle*);
160 void threadPreviouslyStarted(
161 folly::wangle::ThreadPoolExecutor::ThreadHandle* thread) {
162 threadStarted(thread);
164 void threadNotYetStopped(
165 folly::wangle::ThreadPoolExecutor::ThreadHandle* thread) {
166 threadStopped(thread);
170 std::map<folly::wangle::ThreadPoolExecutor::ThreadHandle*,
171 std::shared_ptr<Acceptor>> workers_;
172 std::shared_ptr<AcceptorFactory> acceptorFactory_;
173 folly::wangle::IOThreadPoolExecutor* exec_{nullptr};
174 std::shared_ptr<std::vector<std::shared_ptr<folly::AsyncSocketBase>>> sockets_;
175 std::shared_ptr<ServerSocketFactory> socketFactory_;
178 template <typename F>
179 void ServerWorkerPool::forEachWorker(F&& f) const {
180 for (const auto& kv : workers_) {
185 class DefaultAcceptPipelineFactory
186 : public PipelineFactory<wangle::Pipeline<void*>> {
187 typedef wangle::Pipeline<void*> AcceptPipeline;
190 std::unique_ptr<AcceptPipeline, folly::DelayedDestruction::Destructor>
191 newPipeline(std::shared_ptr<AsyncSocket>) {
193 return std::unique_ptr<AcceptPipeline, folly::DelayedDestruction::Destructor>
194 (new AcceptPipeline);