a224a9f915ff25e5cf0f9871412338b602208f80
[folly.git] / folly / wangle / bootstrap / ServerBootstrap-inl.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <folly/wangle/acceptor/Acceptor.h>
19 #include <folly/io/async/EventBaseManager.h>
20 #include <folly/wangle/concurrent/IOThreadPoolExecutor.h>
21 #include <folly/wangle/acceptor/ManagedConnection.h>
22 #include <folly/wangle/channel/ChannelPipeline.h>
23
24 namespace folly {
25
26 template <typename Pipeline>
27 class ServerAcceptor : public Acceptor {
28   typedef std::unique_ptr<Pipeline,
29                           folly::DelayedDestruction::Destructor> PipelinePtr;
30
31   class ServerConnection : public wangle::ManagedConnection {
32    public:
33     explicit ServerConnection(PipelinePtr pipeline)
34         : pipeline_(std::move(pipeline)) {}
35
36     ~ServerConnection() {
37     }
38
39     void timeoutExpired() noexcept {
40     }
41
42     void describe(std::ostream& os) const {}
43     bool isBusy() const {
44       return false;
45     }
46     void notifyPendingShutdown() {}
47     void closeWhenIdle() {}
48     void dropConnection() {}
49     void dumpConnectionState(uint8_t loglevel) {}
50    private:
51     PipelinePtr pipeline_;
52   };
53
54  public:
55   explicit ServerAcceptor(
56     std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory)
57       : Acceptor(ServerSocketConfig())
58       , pipelineFactory_(pipelineFactory) {
59     Acceptor::init(nullptr, &base_);
60   }
61
62   /* See Acceptor::onNewConnection for details */
63   void onNewConnection(
64     AsyncSocket::UniquePtr transport, const SocketAddress* address,
65     const std::string& nextProtocolName, const TransportInfo& tinfo) {
66
67       std::unique_ptr<Pipeline,
68                        folly::DelayedDestruction::Destructor>
69       pipeline(pipelineFactory_->newPipeline(
70         std::shared_ptr<AsyncSocket>(
71           transport.release(),
72           folly::DelayedDestruction::Destructor())));
73     auto connection = new ServerConnection(std::move(pipeline));
74     Acceptor::addConnection(connection);
75   }
76
77   ~ServerAcceptor() {
78     Acceptor::dropAllConnections();
79   }
80
81  private:
82   EventBase base_;
83
84   std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory_;
85 };
86
87 template <typename Pipeline>
88 class ServerAcceptorFactory : public AcceptorFactory {
89  public:
90   explicit ServerAcceptorFactory(
91       std::shared_ptr<PipelineFactory<Pipeline>> factory)
92     : factory_(factory) {}
93
94   std::shared_ptr<Acceptor> newAcceptor() {
95     return std::make_shared<ServerAcceptor<Pipeline>>(factory_);
96   }
97  private:
98   std::shared_ptr<PipelineFactory<Pipeline>> factory_;
99 };
100
101 class ServerWorkerFactory : public folly::wangle::ThreadFactory {
102  public:
103   explicit ServerWorkerFactory(std::shared_ptr<AcceptorFactory> acceptorFactory)
104       : internalFactory_(
105         std::make_shared<folly::wangle::NamedThreadFactory>("BootstrapWorker"))
106       , acceptorFactory_(acceptorFactory)
107     {}
108   virtual std::thread newThread(folly::Func&& func) override;
109
110   void setInternalFactory(
111     std::shared_ptr<folly::wangle::NamedThreadFactory> internalFactory);
112   void setNamePrefix(folly::StringPiece prefix);
113
114   template <typename F>
115   void forEachWorker(F&& f);
116
117  private:
118   std::shared_ptr<folly::wangle::NamedThreadFactory> internalFactory_;
119   folly::RWSpinLock workersLock_;
120   std::map<int32_t, std::shared_ptr<Acceptor>> workers_;
121   int32_t nextWorkerId_{0};
122
123   std::shared_ptr<AcceptorFactory> acceptorFactory_;
124 };
125
126 template <typename F>
127 void ServerWorkerFactory::forEachWorker(F&& f) {
128   folly::RWSpinLock::ReadHolder guard(workersLock_);
129   for (const auto& kv : workers_) {
130     f(kv.second.get());
131   }
132 }
133
134 } // namespace