2 * Copyright 2014 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
18 #include <folly/wangle/acceptor/Acceptor.h>
19 #include <folly/io/async/EventBaseManager.h>
20 #include <folly/wangle/concurrent/IOThreadPoolExecutor.h>
21 #include <folly/wangle/acceptor/ManagedConnection.h>
22 #include <folly/wangle/channel/ChannelPipeline.h>
26 template <typename Pipeline>
27 class ServerAcceptor : public Acceptor {
28 typedef std::unique_ptr<Pipeline,
29 folly::DelayedDestruction::Destructor> PipelinePtr;
31 class ServerConnection : public wangle::ManagedConnection {
33 explicit ServerConnection(PipelinePtr pipeline)
34 : pipeline_(std::move(pipeline)) {}
39 void timeoutExpired() noexcept {
42 void describe(std::ostream& os) const {}
46 void notifyPendingShutdown() {}
47 void closeWhenIdle() {}
48 void dropConnection() {}
49 void dumpConnectionState(uint8_t loglevel) {}
51 PipelinePtr pipeline_;
55 explicit ServerAcceptor(
56 std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory)
57 : Acceptor(ServerSocketConfig())
58 , pipelineFactory_(pipelineFactory) {
59 Acceptor::init(nullptr, &base_);
62 /* See Acceptor::onNewConnection for details */
64 AsyncSocket::UniquePtr transport, const SocketAddress* address,
65 const std::string& nextProtocolName, const TransportInfo& tinfo) {
67 std::unique_ptr<Pipeline,
68 folly::DelayedDestruction::Destructor>
69 pipeline(pipelineFactory_->newPipeline(
70 std::shared_ptr<AsyncSocket>(
72 folly::DelayedDestruction::Destructor())));
73 auto connection = new ServerConnection(std::move(pipeline));
74 Acceptor::addConnection(connection);
78 Acceptor::dropAllConnections();
84 std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory_;
87 template <typename Pipeline>
88 class ServerAcceptorFactory : public AcceptorFactory {
90 explicit ServerAcceptorFactory(
91 std::shared_ptr<PipelineFactory<Pipeline>> factory)
92 : factory_(factory) {}
94 std::shared_ptr<Acceptor> newAcceptor() {
95 return std::make_shared<ServerAcceptor<Pipeline>>(factory_);
98 std::shared_ptr<PipelineFactory<Pipeline>> factory_;
101 class ServerWorkerFactory : public folly::wangle::ThreadFactory {
103 explicit ServerWorkerFactory(std::shared_ptr<AcceptorFactory> acceptorFactory)
105 std::make_shared<folly::wangle::NamedThreadFactory>("BootstrapWorker"))
106 , acceptorFactory_(acceptorFactory)
108 virtual std::thread newThread(folly::Func&& func) override;
110 void setInternalFactory(
111 std::shared_ptr<folly::wangle::NamedThreadFactory> internalFactory);
112 void setNamePrefix(folly::StringPiece prefix);
114 template <typename F>
115 void forEachWorker(F&& f);
118 std::shared_ptr<folly::wangle::NamedThreadFactory> internalFactory_;
119 folly::RWSpinLock workersLock_;
120 std::map<int32_t, std::shared_ptr<Acceptor>> workers_;
121 int32_t nextWorkerId_{0};
123 std::shared_ptr<AcceptorFactory> acceptorFactory_;
126 template <typename F>
127 void ServerWorkerFactory::forEachWorker(F&& f) {
128 folly::RWSpinLock::ReadHolder guard(workersLock_);
129 for (const auto& kv : workers_) {