Udp Acceptor
[folly.git] / folly / wangle / bootstrap / ServerBootstrap-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <folly/wangle/acceptor/Acceptor.h>
19 #include <folly/wangle/bootstrap/ServerSocketFactory.h>
20 #include <folly/io/async/EventBaseManager.h>
21 #include <folly/wangle/concurrent/IOThreadPoolExecutor.h>
22 #include <folly/wangle/acceptor/ManagedConnection.h>
23 #include <folly/wangle/channel/ChannelPipeline.h>
24 #include <folly/wangle/channel/ChannelHandler.h>
25
26 namespace folly {
27
28 template <typename Pipeline>
29 class ServerAcceptor
30     : public Acceptor
31     , public folly::wangle::ChannelHandlerAdapter<void*, std::exception> {
32   typedef std::unique_ptr<Pipeline,
33                           folly::DelayedDestruction::Destructor> PipelinePtr;
34
35   class ServerConnection : public wangle::ManagedConnection {
36    public:
37     explicit ServerConnection(PipelinePtr pipeline)
38         : pipeline_(std::move(pipeline)) {}
39
40     ~ServerConnection() {
41     }
42
43     void timeoutExpired() noexcept {
44     }
45
46     void describe(std::ostream& os) const {}
47     bool isBusy() const {
48       return false;
49     }
50     void notifyPendingShutdown() {}
51     void closeWhenIdle() {}
52     void dropConnection() {
53       delete this;
54     }
55     void dumpConnectionState(uint8_t loglevel) {}
56    private:
57     PipelinePtr pipeline_;
58   };
59
60  public:
61   explicit ServerAcceptor(
62         std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory,
63         std::shared_ptr<folly::wangle::ChannelPipeline<
64                           void*, std::exception>> acceptorPipeline,
65         EventBase* base)
66       : Acceptor(ServerSocketConfig())
67       , base_(base)
68       , childPipelineFactory_(pipelineFactory)
69       , acceptorPipeline_(acceptorPipeline) {
70     Acceptor::init(nullptr, base_);
71     CHECK(acceptorPipeline_);
72
73     acceptorPipeline_->addBack(folly::wangle::ChannelHandlerPtr<ServerAcceptor, false>(this));
74     acceptorPipeline_->finalize();
75   }
76
77   void read(Context* ctx, void* conn) {
78     AsyncSocket::UniquePtr transport((AsyncSocket*)conn);
79       std::unique_ptr<Pipeline,
80                        folly::DelayedDestruction::Destructor>
81       pipeline(childPipelineFactory_->newPipeline(
82         std::shared_ptr<AsyncSocket>(
83           transport.release(),
84           folly::DelayedDestruction::Destructor())));
85     auto connection = new ServerConnection(std::move(pipeline));
86     Acceptor::addConnection(connection);
87   }
88
89   folly::Future<void> write(Context* ctx, std::exception e) {
90     return ctx->fireWrite(e);
91   }
92
93   /* See Acceptor::onNewConnection for details */
94   void onNewConnection(
95     AsyncSocket::UniquePtr transport, const SocketAddress* address,
96     const std::string& nextProtocolName, const TransportInfo& tinfo) {
97     acceptorPipeline_->read(transport.release());
98   }
99
100   // UDP thunk
101   void onDataAvailable(const folly::SocketAddress& addr,
102                        std::unique_ptr<folly::IOBuf> buf,
103                        bool truncated) noexcept {
104     acceptorPipeline_->read(buf.release());
105   }
106
107  private:
108   EventBase* base_;
109
110   std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory_;
111   std::shared_ptr<folly::wangle::ChannelPipeline<
112     void*, std::exception>> acceptorPipeline_;
113 };
114
115 template <typename Pipeline>
116 class ServerAcceptorFactory : public AcceptorFactory {
117  public:
118   explicit ServerAcceptorFactory(
119     std::shared_ptr<PipelineFactory<Pipeline>> factory,
120     std::shared_ptr<PipelineFactory<folly::wangle::ChannelPipeline<
121     void*, std::exception>>> pipeline)
122     : factory_(factory)
123     , pipeline_(pipeline) {}
124
125   std::shared_ptr<Acceptor> newAcceptor(EventBase* base) {
126     std::shared_ptr<folly::wangle::ChannelPipeline<
127                       void*, std::exception>> pipeline(
128                         pipeline_->newPipeline(nullptr));
129     return std::make_shared<ServerAcceptor<Pipeline>>(factory_, pipeline, base);
130   }
131  private:
132   std::shared_ptr<PipelineFactory<Pipeline>> factory_;
133   std::shared_ptr<PipelineFactory<
134     folly::wangle::ChannelPipeline<
135       void*, std::exception>>> pipeline_;
136 };
137
138 class ServerWorkerPool : public folly::wangle::ThreadPoolExecutor::Observer {
139  public:
140   explicit ServerWorkerPool(
141     std::shared_ptr<AcceptorFactory> acceptorFactory,
142     folly::wangle::IOThreadPoolExecutor* exec,
143     std::vector<std::shared_ptr<folly::AsyncSocketBase>>* sockets,
144     std::shared_ptr<ServerSocketFactory> socketFactory)
145       : acceptorFactory_(acceptorFactory)
146       , exec_(exec)
147       , sockets_(sockets)
148       , socketFactory_(socketFactory) {
149     CHECK(exec);
150   }
151
152   template <typename F>
153   void forEachWorker(F&& f) const;
154
155   void threadStarted(
156     folly::wangle::ThreadPoolExecutor::ThreadHandle*);
157   void threadStopped(
158     folly::wangle::ThreadPoolExecutor::ThreadHandle*);
159   void threadPreviouslyStarted(
160       folly::wangle::ThreadPoolExecutor::ThreadHandle* thread) {
161     threadStarted(thread);
162   }
163   void threadNotYetStopped(
164       folly::wangle::ThreadPoolExecutor::ThreadHandle* thread) {
165     threadStopped(thread);
166   }
167
168  private:
169   std::map<folly::wangle::ThreadPoolExecutor::ThreadHandle*,
170            std::shared_ptr<Acceptor>> workers_;
171   std::shared_ptr<AcceptorFactory> acceptorFactory_;
172   folly::wangle::IOThreadPoolExecutor* exec_{nullptr};
173   std::vector<std::shared_ptr<folly::AsyncSocketBase>>* sockets_;
174   std::shared_ptr<ServerSocketFactory> socketFactory_;
175 };
176
177 template <typename F>
178 void ServerWorkerPool::forEachWorker(F&& f) const {
179   for (const auto& kv : workers_) {
180     f(kv.second.get());
181   }
182 }
183
184 class DefaultAcceptPipelineFactory
185     : public PipelineFactory<wangle::ChannelPipeline<void*, std::exception>> {
186   typedef wangle::ChannelPipeline<
187       void*,
188       std::exception> AcceptPipeline;
189
190  public:
191   AcceptPipeline* newPipeline(std::shared_ptr<AsyncSocket>) {
192     return new AcceptPipeline;
193   }
194 };
195
196 } // namespace