2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
18 #include <folly/wangle/acceptor/Acceptor.h>
19 #include <folly/io/async/EventBaseManager.h>
20 #include <folly/wangle/concurrent/IOThreadPoolExecutor.h>
21 #include <folly/wangle/acceptor/ManagedConnection.h>
22 #include <folly/wangle/channel/ChannelPipeline.h>
26 template <typename Pipeline>
27 class ServerAcceptor : public Acceptor {
28 typedef std::unique_ptr<Pipeline,
29 folly::DelayedDestruction::Destructor> PipelinePtr;
31 class ServerConnection : public wangle::ManagedConnection {
33 explicit ServerConnection(PipelinePtr pipeline)
34 : pipeline_(std::move(pipeline)) {}
39 void timeoutExpired() noexcept {
42 void describe(std::ostream& os) const {}
46 void notifyPendingShutdown() {}
47 void closeWhenIdle() {}
48 void dropConnection() {
51 void dumpConnectionState(uint8_t loglevel) {}
53 PipelinePtr pipeline_;
57 explicit ServerAcceptor(
58 std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory,
60 : Acceptor(ServerSocketConfig())
61 , pipelineFactory_(pipelineFactory) {
62 Acceptor::init(nullptr, base);
65 /* See Acceptor::onNewConnection for details */
67 AsyncSocket::UniquePtr transport, const SocketAddress* address,
68 const std::string& nextProtocolName, const TransportInfo& tinfo) {
70 std::unique_ptr<Pipeline,
71 folly::DelayedDestruction::Destructor>
72 pipeline(pipelineFactory_->newPipeline(
73 std::shared_ptr<AsyncSocket>(
75 folly::DelayedDestruction::Destructor())));
76 auto connection = new ServerConnection(std::move(pipeline));
77 Acceptor::addConnection(connection);
81 std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory_;
84 template <typename Pipeline>
85 class ServerAcceptorFactory : public AcceptorFactory {
87 explicit ServerAcceptorFactory(
88 std::shared_ptr<PipelineFactory<Pipeline>> factory)
89 : factory_(factory) {}
91 std::shared_ptr<Acceptor> newAcceptor(folly::EventBase* base) {
92 return std::make_shared<ServerAcceptor<Pipeline>>(factory_, base);
95 std::shared_ptr<PipelineFactory<Pipeline>> factory_;
98 class ServerWorkerPool : public folly::wangle::ThreadPoolExecutor::Observer {
100 explicit ServerWorkerPool(
101 std::shared_ptr<AcceptorFactory> acceptorFactory,
102 folly::wangle::IOThreadPoolExecutor* exec,
103 std::vector<std::shared_ptr<folly::AsyncServerSocket>>* sockets)
104 : acceptorFactory_(acceptorFactory)
106 , sockets_(sockets) {
110 template <typename F>
111 void forEachWorker(F&& f) const;
114 folly::wangle::ThreadPoolExecutor::ThreadHandle*);
116 folly::wangle::ThreadPoolExecutor::ThreadHandle*);
117 void threadPreviouslyStarted(
118 folly::wangle::ThreadPoolExecutor::ThreadHandle* thread) {
119 threadStarted(thread);
121 void threadNotYetStopped(
122 folly::wangle::ThreadPoolExecutor::ThreadHandle* thread) {
123 threadStopped(thread);
127 std::map<folly::wangle::ThreadPoolExecutor::ThreadHandle*,
128 std::shared_ptr<Acceptor>> workers_;
129 std::shared_ptr<AcceptorFactory> acceptorFactory_;
130 folly::wangle::IOThreadPoolExecutor* exec_{nullptr};
131 std::vector<std::shared_ptr<folly::AsyncServerSocket>>* sockets_;
134 template <typename F>
135 void ServerWorkerPool::forEachWorker(F&& f) const {
136 for (const auto& kv : workers_) {