2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 namespace folly { namespace wangle {
21 class PipelineContext {
23 virtual ~PipelineContext() = default;
25 virtual void attachPipeline() = 0;
26 virtual void detachPipeline() = 0;
28 template <class H, class HandlerContext>
29 void attachContext(H* handler, HandlerContext* ctx) {
30 if (++handler->attachCount_ == 1) {
33 handler->ctx_ = nullptr;
37 virtual void setNextIn(PipelineContext* ctx) = 0;
38 virtual void setNextOut(PipelineContext* ctx) = 0;
40 virtual HandlerDir getDirection() = 0;
46 virtual ~InboundLink() = default;
47 virtual void read(In msg) = 0;
48 virtual void readEOF() = 0;
49 virtual void readException(exception_wrapper e) = 0;
50 virtual void transportActive() = 0;
51 virtual void transportInactive() = 0;
57 virtual ~OutboundLink() = default;
58 virtual Future<void> write(Out msg) = 0;
59 virtual Future<void> close() = 0;
62 template <class H, class Context>
63 class ContextImplBase : public PipelineContext {
65 ~ContextImplBase() = default;
68 return handler_.get();
71 void initialize(PipelineBase* pipeline, std::shared_ptr<H> handler) {
73 handler_ = std::move(handler);
76 // PipelineContext overrides
77 void attachPipeline() override {
79 this->attachContext(handler_.get(), impl_);
80 handler_->attachPipeline(impl_);
85 void detachPipeline() override {
86 handler_->detachPipeline(impl_);
90 void setNextIn(PipelineContext* ctx) override {
95 auto nextIn = dynamic_cast<InboundLink<typename H::rout>*>(ctx);
99 throw std::invalid_argument("inbound type mismatch");
103 void setNextOut(PipelineContext* ctx) override {
108 auto nextOut = dynamic_cast<OutboundLink<typename H::wout>*>(ctx);
112 throw std::invalid_argument("outbound type mismatch");
116 HandlerDir getDirection() override {
122 PipelineBase* pipeline_;
123 std::shared_ptr<H> handler_;
124 InboundLink<typename H::rout>* nextIn_{nullptr};
125 OutboundLink<typename H::wout>* nextOut_{nullptr};
128 bool attached_{false};
129 using DestructorGuard = typename DelayedDestruction::DestructorGuard;
134 : public HandlerContext<typename H::rout,
136 public InboundLink<typename H::rin>,
137 public OutboundLink<typename H::win>,
138 public ContextImplBase<H, HandlerContext<typename H::rout,
141 typedef typename H::rin Rin;
142 typedef typename H::rout Rout;
143 typedef typename H::win Win;
144 typedef typename H::wout Wout;
145 static const HandlerDir dir = HandlerDir::BOTH;
147 explicit ContextImpl(PipelineBase* pipeline, std::shared_ptr<H> handler) {
149 this->initialize(pipeline, std::move(handler));
152 // For StaticPipeline
157 ~ContextImpl() = default;
159 // HandlerContext overrides
160 void fireRead(Rout msg) override {
161 DestructorGuard dg(this->pipeline_);
163 this->nextIn_->read(std::forward<Rout>(msg));
165 LOG(WARNING) << "read reached end of pipeline";
169 void fireReadEOF() override {
170 DestructorGuard dg(this->pipeline_);
172 this->nextIn_->readEOF();
174 LOG(WARNING) << "readEOF reached end of pipeline";
178 void fireReadException(exception_wrapper e) override {
179 DestructorGuard dg(this->pipeline_);
181 this->nextIn_->readException(std::move(e));
183 LOG(WARNING) << "readException reached end of pipeline";
187 void fireTransportActive() override {
188 DestructorGuard dg(this->pipeline_);
190 this->nextIn_->transportActive();
194 void fireTransportInactive() override {
195 DestructorGuard dg(this->pipeline_);
197 this->nextIn_->transportInactive();
201 Future<void> fireWrite(Wout msg) override {
202 DestructorGuard dg(this->pipeline_);
203 if (this->nextOut_) {
204 return this->nextOut_->write(std::forward<Wout>(msg));
206 LOG(WARNING) << "write reached end of pipeline";
211 Future<void> fireClose() override {
212 DestructorGuard dg(this->pipeline_);
213 if (this->nextOut_) {
214 return this->nextOut_->close();
216 LOG(WARNING) << "close reached end of pipeline";
221 PipelineBase* getPipeline() override {
222 return this->pipeline_;
225 void setWriteFlags(WriteFlags flags) override {
226 this->pipeline_->setWriteFlags(flags);
229 WriteFlags getWriteFlags() override {
230 return this->pipeline_->getWriteFlags();
233 void setReadBufferSettings(
234 uint64_t minAvailable,
235 uint64_t allocationSize) override {
236 this->pipeline_->setReadBufferSettings(minAvailable, allocationSize);
239 std::pair<uint64_t, uint64_t> getReadBufferSettings() override {
240 return this->pipeline_->getReadBufferSettings();
243 // InboundLink overrides
244 void read(Rin msg) override {
245 DestructorGuard dg(this->pipeline_);
246 this->handler_->read(this, std::forward<Rin>(msg));
249 void readEOF() override {
250 DestructorGuard dg(this->pipeline_);
251 this->handler_->readEOF(this);
254 void readException(exception_wrapper e) override {
255 DestructorGuard dg(this->pipeline_);
256 this->handler_->readException(this, std::move(e));
259 void transportActive() override {
260 DestructorGuard dg(this->pipeline_);
261 this->handler_->transportActive(this);
264 void transportInactive() override {
265 DestructorGuard dg(this->pipeline_);
266 this->handler_->transportInactive(this);
269 // OutboundLink overrides
270 Future<void> write(Win msg) override {
271 DestructorGuard dg(this->pipeline_);
272 return this->handler_->write(this, std::forward<Win>(msg));
275 Future<void> close() override {
276 DestructorGuard dg(this->pipeline_);
277 return this->handler_->close(this);
281 using DestructorGuard = typename DelayedDestruction::DestructorGuard;
285 class InboundContextImpl
286 : public InboundHandlerContext<typename H::rout>,
287 public InboundLink<typename H::rin>,
288 public ContextImplBase<H, InboundHandlerContext<typename H::rout>> {
290 typedef typename H::rin Rin;
291 typedef typename H::rout Rout;
292 typedef typename H::win Win;
293 typedef typename H::wout Wout;
294 static const HandlerDir dir = HandlerDir::IN;
296 explicit InboundContextImpl(
297 PipelineBase* pipeline,
298 std::shared_ptr<H> handler) {
300 this->initialize(pipeline, std::move(handler));
303 // For StaticPipeline
304 InboundContextImpl() {
308 ~InboundContextImpl() = default;
310 // InboundHandlerContext overrides
311 void fireRead(Rout msg) override {
312 DestructorGuard dg(this->pipeline_);
314 this->nextIn_->read(std::forward<Rout>(msg));
316 LOG(WARNING) << "read reached end of pipeline";
320 void fireReadEOF() override {
321 DestructorGuard dg(this->pipeline_);
323 this->nextIn_->readEOF();
325 LOG(WARNING) << "readEOF reached end of pipeline";
329 void fireReadException(exception_wrapper e) override {
330 DestructorGuard dg(this->pipeline_);
332 this->nextIn_->readException(std::move(e));
334 LOG(WARNING) << "readException reached end of pipeline";
338 void fireTransportActive() override {
339 DestructorGuard dg(this->pipeline_);
341 this->nextIn_->transportActive();
345 void fireTransportInactive() override {
346 DestructorGuard dg(this->pipeline_);
348 this->nextIn_->transportInactive();
352 PipelineBase* getPipeline() override {
353 return this->pipeline_;
356 // InboundLink overrides
357 void read(Rin msg) override {
358 DestructorGuard dg(this->pipeline_);
359 this->handler_->read(this, std::forward<Rin>(msg));
362 void readEOF() override {
363 DestructorGuard dg(this->pipeline_);
364 this->handler_->readEOF(this);
367 void readException(exception_wrapper e) override {
368 DestructorGuard dg(this->pipeline_);
369 this->handler_->readException(this, std::move(e));
372 void transportActive() override {
373 DestructorGuard dg(this->pipeline_);
374 this->handler_->transportActive(this);
377 void transportInactive() override {
378 DestructorGuard dg(this->pipeline_);
379 this->handler_->transportInactive(this);
383 using DestructorGuard = typename DelayedDestruction::DestructorGuard;
387 class OutboundContextImpl
388 : public OutboundHandlerContext<typename H::wout>,
389 public OutboundLink<typename H::win>,
390 public ContextImplBase<H, OutboundHandlerContext<typename H::wout>> {
392 typedef typename H::rin Rin;
393 typedef typename H::rout Rout;
394 typedef typename H::win Win;
395 typedef typename H::wout Wout;
396 static const HandlerDir dir = HandlerDir::OUT;
398 explicit OutboundContextImpl(
399 PipelineBase* pipeline,
400 std::shared_ptr<H> handler) {
402 this->initialize(pipeline, std::move(handler));
405 // For StaticPipeline
406 OutboundContextImpl() {
410 ~OutboundContextImpl() = default;
412 // OutboundHandlerContext overrides
413 Future<void> fireWrite(Wout msg) override {
414 DestructorGuard dg(this->pipeline_);
415 if (this->nextOut_) {
416 return this->nextOut_->write(std::forward<Wout>(msg));
418 LOG(WARNING) << "write reached end of pipeline";
423 Future<void> fireClose() override {
424 DestructorGuard dg(this->pipeline_);
425 if (this->nextOut_) {
426 return this->nextOut_->close();
428 LOG(WARNING) << "close reached end of pipeline";
433 PipelineBase* getPipeline() override {
434 return this->pipeline_;
437 // OutboundLink overrides
438 Future<void> write(Win msg) override {
439 DestructorGuard dg(this->pipeline_);
440 return this->handler_->write(this, std::forward<Win>(msg));
443 Future<void> close() override {
444 DestructorGuard dg(this->pipeline_);
445 return this->handler_->close(this);
449 using DestructorGuard = typename DelayedDestruction::DestructorGuard;
452 template <class Handler>
454 typedef typename std::conditional<
455 Handler::dir == HandlerDir::BOTH,
456 ContextImpl<Handler>,
457 typename std::conditional<
458 Handler::dir == HandlerDir::IN,
459 InboundContextImpl<Handler>,
460 OutboundContextImpl<Handler>