2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 namespace folly { namespace wangle {
// Abstract base for a handler's slot in a pipeline. Declares pure-virtual
// hooks for attaching/detaching the pipeline and the transport, and for
// receiving the neighbouring inbound/outbound contexts.
// NOTE(review): this listing skips some original lines (the embedded line
// numbers are not contiguous), so access specifiers, some statements and the
// closing braces are not visible here.
21 class PipelineContext {
23 virtual ~PipelineContext() {}
// Pipeline attach/detach hooks; subclasses supply the behavior.
25 virtual void attachPipeline() = 0;
26 virtual void detachPipeline() = 0;
// Transport attach/detach hooks; subclasses supply the behavior.
28 virtual void attachTransport() = 0;
29 virtual void detachTransport() = 0;
// Pre-increments handler->attachCount_ and then updates handler->ctx_.
// NOTE(review): the statement run when the count becomes exactly 1 (embedded
// lines 34-35) is elided from this listing; the visible `ctx_ = nullptr` is
// presumably the branch taken when the handler is attached more than once --
// confirm against the full file before relying on this.
31 template <class H, class HandlerContext>
32 void attachContext(H* handler, HandlerContext* ctx) {
33 if (++handler->attachCount_ == 1) {
36 handler->ctx_ = nullptr;
// Receive the neighbouring context for the inbound / outbound direction;
// implementations decide how it is validated and stored.
40 virtual void setNextIn(PipelineContext* ctx) = 0;
41 virtual void setNextOut(PipelineContext* ctx) = 0;
// Pure-virtual inbound interface: one entry point per inbound event.
// NOTE(review): the class header is elided from this listing; judging by the
// destructor name and uses such as InboundLink<typename H::rin>, this is
// presumably `template <class In> class InboundLink` -- confirm.
47 virtual ~InboundLink() {}
// Deliver an inbound message; `In` is taken by value.
48 virtual void read(In msg) = 0;
// Signal end-of-file on the inbound side.
49 virtual void readEOF() = 0;
// Deliver an inbound error wrapped in an exception_wrapper.
50 virtual void readException(exception_wrapper e) = 0;
// Pure-virtual outbound interface; both operations return Future<void>.
// NOTE(review): the class header is elided from this listing; judging by the
// destructor name and uses such as OutboundLink<typename H::win>, this is
// presumably `template <class Out> class OutboundLink` -- confirm.
56 virtual ~OutboundLink() {}
// Send an outbound message; `Out` is taken by value.
57 virtual Future<void> write(Out msg) = 0;
// Request a close on the outbound side.
58 virtual Future<void> close() = 0;
// Shared implementation of PipelineContext for a handler of type H living in
// a pipeline of type P. Owns the handler through a shared_ptr and keeps raw
// pointers to the next inbound/outbound links.
// NOTE(review): this listing skips original lines (embedded numbers are not
// contiguous); several signatures, guards and closing braces are not visible.
61 template <class P, class H, class Context>
62 class ContextImplBase : public PipelineContext {
// Body of an accessor whose signature is elided: returns the raw handler
// pointer held by handler_.
67 return handler_.get();
// Takes ownership of `handler` by moving it into handler_.
// NOTE(review): embedded line 71 is elided; presumably it stores `pipeline`
// into pipeline_ -- confirm.
70 void initialize(P* pipeline, std::shared_ptr<H> handler) {
72 handler_ = std::move(handler);
75 // PipelineContext overrides
// Registers impl_ with the handler via attachContext, then notifies the
// handler that it was attached to the pipeline.
// NOTE(review): the surrounding guard is elided; presumably it is keyed on
// attached_ so the calls run once -- confirm.
76 void attachPipeline() override {
78 this->attachContext(handler_.get(), impl_);
79 handler_->attachPipeline(impl_);
// Notifies the handler that it is being detached from the pipeline.
84 void detachPipeline() override {
85 handler_->detachPipeline(impl_);
// Forwards the transport-attached event to the handler while a
// DestructorGuard on pipeline_ is in scope (presumably to keep the pipeline
// alive for the duration of the callback -- confirm against P::DestructorGuard).
89 void attachTransport() override {
90 DestructorGuard dg(pipeline_);
91 handler_->attachTransport(impl_);
// Forwards the transport-detached event to the handler, with the same guard.
94 void detachTransport() override {
95 DestructorGuard dg(pipeline_);
96 handler_->detachTransport(impl_);
// dynamic_casts ctx to the inbound link type matching this handler's read
// output (H::rout). Can throw std::invalid_argument("inbound type mismatch").
// NOTE(review): the condition guarding the throw and the assignment to
// nextIn_ are elided; presumably the throw fires when the cast yields null.
99 void setNextIn(PipelineContext* ctx) override {
100 auto nextIn = dynamic_cast<InboundLink<typename H::rout>*>(ctx);
104 throw std::invalid_argument("inbound type mismatch");
// Outbound counterpart of setNextIn: casts to OutboundLink<H::wout>* and can
// throw std::invalid_argument("outbound type mismatch"). Same elisions apply.
108 void setNextOut(PipelineContext* ctx) override {
109 auto nextOut = dynamic_cast<OutboundLink<typename H::wout>*>(ctx);
113 throw std::invalid_argument("outbound type mismatch");
// Handler owned by this context.
120 std::shared_ptr<H> handler_;
// Neighbouring links; null until set.
121 InboundLink<typename H::rout>* nextIn_{nullptr};
122 OutboundLink<typename H::wout>* nextOut_{nullptr};
// Starts false; the code that flips it is not visible in this listing.
125 bool attached_{false};
// Guard type supplied by the pipeline type P.
126 using DestructorGuard = typename P::DestructorGuard;
// Context for a bidirectional handler (dir == HandlerDir::BOTH). It is both
// the HandlerContext handed to the handler and the InboundLink/OutboundLink
// that neighbouring contexts call into.
// NOTE(review): this listing skips original lines (embedded numbers are not
// contiguous); the class-name line, part of the base list, several branch
// conditions and all closing braces are not visible.
129 template <class P, class H>
131 : public HandlerContext<typename H::rout,
133 public InboundLink<typename H::rin>,
134 public OutboundLink<typename H::win>,
135 public ContextImplBase<P, H, HandlerContext<typename H::rout,
// Short aliases for the handler's read-in/read-out/write-in/write-out types.
138 typedef typename H::rin Rin;
139 typedef typename H::rout Rout;
140 typedef typename H::win Win;
141 typedef typename H::wout Wout;
142 static const HandlerDir dir = HandlerDir::BOTH;
// Forwards pipeline and handler to initialize(); any other constructor
// statements (embedded line 145) are elided.
144 explicit ContextImpl(P* pipeline, std::shared_ptr<H> handler) {
146 this->initialize(pipeline, std::move(handler));
149 // For StaticPipeline
156 // HandlerContext overrides
// Passes msg on to nextIn_->read(), or logs "read reached end of pipeline".
// NOTE(review): the condition choosing between the two is elided; presumably
// a null check on nextIn_, mirroring the visible check in fireWrite.
157 void fireRead(Rout msg) override {
158 DestructorGuard dg(this->pipeline_);
160 this->nextIn_->read(std::forward<Rout>(msg));
162 LOG(WARNING) << "read reached end of pipeline";
// Passes EOF on to nextIn_, or logs a warning (condition elided, as above).
166 void fireReadEOF() override {
167 DestructorGuard dg(this->pipeline_);
169 this->nextIn_->readEOF();
171 LOG(WARNING) << "readEOF reached end of pipeline";
// Passes the exception on to nextIn_, or logs a warning (condition elided).
175 void fireReadException(exception_wrapper e) override {
176 DestructorGuard dg(this->pipeline_);
178 this->nextIn_->readException(std::move(e));
180 LOG(WARNING) << "readException reached end of pipeline";
// When nextOut_ is non-null, returns the future from nextOut_->write(msg).
// Otherwise a warning is logged; the value returned on that path (embedded
// line 190) is elided from this listing.
184 Future<void> fireWrite(Wout msg) override {
185 DestructorGuard dg(this->pipeline_);
186 if (this->nextOut_) {
187 return this->nextOut_->write(std::forward<Wout>(msg));
189 LOG(WARNING) << "write reached end of pipeline";
// When nextOut_ is non-null, returns the future from nextOut_->close().
// Otherwise a warning is logged; the fall-back return value is elided.
194 Future<void> fireClose() override {
195 DestructorGuard dg(this->pipeline_);
196 if (this->nextOut_) {
197 return this->nextOut_->close();
199 LOG(WARNING) << "close reached end of pipeline";
// The remaining HandlerContext overrides delegate straight to pipeline_.
204 PipelineBase* getPipeline() override {
205 return this->pipeline_;
208 std::shared_ptr<AsyncTransport> getTransport() override {
209 return this->pipeline_->getTransport();
212 void setWriteFlags(WriteFlags flags) override {
213 this->pipeline_->setWriteFlags(flags);
216 WriteFlags getWriteFlags() override {
217 return this->pipeline_->getWriteFlags();
220 void setReadBufferSettings(
221 uint64_t minAvailable,
222 uint64_t allocationSize) override {
223 this->pipeline_->setReadBufferSettings(minAvailable, allocationSize);
226 std::pair<uint64_t, uint64_t> getReadBufferSettings() override {
227 return this->pipeline_->getReadBufferSettings();
230 // InboundLink overrides
// Each InboundLink entry point hands the event to the handler, passing this
// context as the first argument, with a DestructorGuard on pipeline_ in scope.
231 void read(Rin msg) override {
232 DestructorGuard dg(this->pipeline_);
233 this->handler_->read(this, std::forward<Rin>(msg));
236 void readEOF() override {
237 DestructorGuard dg(this->pipeline_);
238 this->handler_->readEOF(this);
241 void readException(exception_wrapper e) override {
242 DestructorGuard dg(this->pipeline_);
243 this->handler_->readException(this, std::move(e));
246 // OutboundLink overrides
// Each OutboundLink entry point returns whatever future the handler returns.
247 Future<void> write(Win msg) override {
248 DestructorGuard dg(this->pipeline_);
249 return this->handler_->write(this, std::forward<Win>(msg));
252 Future<void> close() override {
253 DestructorGuard dg(this->pipeline_);
254 return this->handler_->close(this);
// Guard type supplied by the pipeline type P.
258 using DestructorGuard = typename P::DestructorGuard;
// Context for an inbound-only handler (dir == HandlerDir::IN). It is both the
// InboundHandlerContext handed to the handler and the InboundLink that the
// previous context calls into.
// NOTE(review): this listing skips original lines (embedded numbers are not
// contiguous); constructor bodies, branch conditions and closing braces are
// not visible.
261 template <class P, class H>
262 class InboundContextImpl
263 : public InboundHandlerContext<typename H::rout>,
264 public InboundLink<typename H::rin>,
265 public ContextImplBase<P, H, InboundHandlerContext<typename H::rout>> {
// Short aliases for the handler's read-in/read-out/write-in/write-out types.
267 typedef typename H::rin Rin;
268 typedef typename H::rout Rout;
269 typedef typename H::win Win;
270 typedef typename H::wout Wout;
271 static const HandlerDir dir = HandlerDir::IN;
// Forwards pipeline and handler to initialize(); any other constructor
// statements (embedded line 274) are elided.
273 explicit InboundContextImpl(P* pipeline, std::shared_ptr<H> handler) {
275 this->initialize(pipeline, std::move(handler));
278 // For StaticPipeline
// Default constructor; its body (embedded lines 280-281) is elided.
279 InboundContextImpl() {
283 ~InboundContextImpl() {}
285 // InboundHandlerContext overrides
// Passes msg on to nextIn_->read(), or logs "read reached end of pipeline".
// NOTE(review): the condition choosing between the two is elided; presumably
// a null check on nextIn_ -- confirm.
286 void fireRead(Rout msg) override {
287 DestructorGuard dg(this->pipeline_);
289 this->nextIn_->read(std::forward<Rout>(msg));
291 LOG(WARNING) << "read reached end of pipeline";
// Passes EOF on to nextIn_, or logs a warning (condition elided, as above).
295 void fireReadEOF() override {
296 DestructorGuard dg(this->pipeline_);
298 this->nextIn_->readEOF();
300 LOG(WARNING) << "readEOF reached end of pipeline";
// Passes the exception on to nextIn_, or logs a warning (condition elided).
304 void fireReadException(exception_wrapper e) override {
305 DestructorGuard dg(this->pipeline_);
307 this->nextIn_->readException(std::move(e));
309 LOG(WARNING) << "readException reached end of pipeline";
// Accessors that delegate to pipeline_.
313 PipelineBase* getPipeline() override {
314 return this->pipeline_;
317 std::shared_ptr<AsyncTransport> getTransport() override {
318 return this->pipeline_->getTransport();
321 // InboundLink overrides
// Each InboundLink entry point hands the event to the handler, passing this
// context as the first argument, with a DestructorGuard on pipeline_ in scope.
322 void read(Rin msg) override {
323 DestructorGuard dg(this->pipeline_);
324 this->handler_->read(this, std::forward<Rin>(msg));
327 void readEOF() override {
328 DestructorGuard dg(this->pipeline_);
329 this->handler_->readEOF(this);
332 void readException(exception_wrapper e) override {
333 DestructorGuard dg(this->pipeline_);
334 this->handler_->readException(this, std::move(e));
// Guard type supplied by the pipeline type P.
338 using DestructorGuard = typename P::DestructorGuard;
// Context for an outbound-only handler (dir == HandlerDir::OUT). It is both
// the OutboundHandlerContext handed to the handler and the OutboundLink that
// the previous outbound context calls into.
// NOTE(review): this listing skips original lines (embedded numbers are not
// contiguous); constructor bodies, else-branches and closing braces are not
// visible.
341 template <class P, class H>
342 class OutboundContextImpl
343 : public OutboundHandlerContext<typename H::wout>,
344 public OutboundLink<typename H::win>,
345 public ContextImplBase<P, H, OutboundHandlerContext<typename H::wout>> {
// Short aliases for the handler's read-in/read-out/write-in/write-out types.
347 typedef typename H::rin Rin;
348 typedef typename H::rout Rout;
349 typedef typename H::win Win;
350 typedef typename H::wout Wout;
351 static const HandlerDir dir = HandlerDir::OUT;
// Forwards pipeline and handler to initialize(); any other constructor
// statements (embedded line 354) are elided.
353 explicit OutboundContextImpl(P* pipeline, std::shared_ptr<H> handler) {
355 this->initialize(pipeline, std::move(handler));
358 // For StaticPipeline
// Default constructor; its body (embedded lines 360-361) is elided.
359 OutboundContextImpl() {
363 ~OutboundContextImpl() {}
365 // OutboundHandlerContext overrides
// When nextOut_ is non-null, returns the future from nextOut_->write(msg).
// Otherwise a warning is logged; the value returned on that path (embedded
// line 372) is elided from this listing.
366 Future<void> fireWrite(Wout msg) override {
367 DestructorGuard dg(this->pipeline_);
368 if (this->nextOut_) {
369 return this->nextOut_->write(std::forward<Wout>(msg));
371 LOG(WARNING) << "write reached end of pipeline";
// When nextOut_ is non-null, returns the future from nextOut_->close().
// Otherwise a warning is logged; the fall-back return value is elided.
376 Future<void> fireClose() override {
377 DestructorGuard dg(this->pipeline_);
378 if (this->nextOut_) {
379 return this->nextOut_->close();
381 LOG(WARNING) << "close reached end of pipeline";
// Accessors that delegate to pipeline_.
386 PipelineBase* getPipeline() override {
387 return this->pipeline_;
390 std::shared_ptr<AsyncTransport> getTransport() override {
391 return this->pipeline_->getTransport();
394 // OutboundLink overrides
// Each OutboundLink entry point returns whatever future the handler returns,
// passing this context as the first argument.
395 Future<void> write(Win msg) override {
396 DestructorGuard dg(this->pipeline_);
397 return this->handler_->write(this, std::forward<Win>(msg));
400 Future<void> close() override {
401 DestructorGuard dg(this->pipeline_);
402 return this->handler_->close(this);
// Guard type supplied by the pipeline type P.
406 using DestructorGuard = typename P::DestructorGuard;
409 template <class Handler, class Pipeline>
411 typedef typename std::conditional<
412 Handler::dir == HandlerDir::BOTH,
413 ContextImpl<Pipeline, Handler>,
414 typename std::conditional<
415 Handler::dir == HandlerDir::IN,
416 InboundContextImpl<Pipeline, Handler>,
417 OutboundContextImpl<Pipeline, Handler>