2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 namespace folly { namespace wangle {
// Abstract base for the per-handler contexts in a pipeline. It declares the
// attach/detach hooks and the setters that link a context to its inbound and
// outbound neighbours; ContextImplBase supplies the overrides.
// NOTE(review): this view omits some original lines (access specifiers,
// closing braces), so only the statements shown here are described.
21 class PipelineContext {
23 virtual ~PipelineContext() = default;
// Attach/detach hooks, implemented by the concrete contexts.
25 virtual void attachPipeline() = 0;
26 virtual void detachPipeline() = 0;
// Increments handler->attachCount_ and updates handler->ctx_ based on whether
// the incremented count equals 1 (i.e. this is the handler's first attachment).
28 template <class H, class HandlerContext>
29 void attachContext(H* handler, HandlerContext* ctx) {
30 if (++handler->attachCount_ == 1) {
// NOTE(review): the statements between the `if` above and the reset below are
// not visible in this view; presumably the first-attach branch stores `ctx`
// and an `else` guards the reset - confirm against the full file.
33 handler->ctx_ = nullptr;
// Link setters: each takes the neighbouring context as a PipelineContext*.
37 virtual void setNextIn(PipelineContext* ctx) = 0;
38 virtual void setNextOut(PipelineContext* ctx) = 0;
// Pure-virtual interface for the inbound (read-side) events a pipeline element
// can receive. NOTE(review): the class header and the declaration of the `In`
// type parameter are not visible in this view.
44 virtual ~InboundLink() = default;
// Receives one inbound message, passed by value.
45 virtual void read(In msg) = 0;
// Read-side end-of-stream and error notifications.
46 virtual void readEOF() = 0;
47 virtual void readException(exception_wrapper e) = 0;
// Transport state change notifications.
48 virtual void transportActive() = 0;
49 virtual void transportInactive() = 0;
// Pure-virtual interface for the outbound (write-side) operations a pipeline
// element can receive; both operations report completion through a
// Future<void>. NOTE(review): the class header and the declaration of the
// `Out` type parameter are not visible in this view.
55 virtual ~OutboundLink() = default;
// Accepts one outbound message, passed by value.
56 virtual Future<void> write(Out msg) = 0;
57 virtual Future<void> close() = 0;
// Common base of ContextImpl, InboundContextImpl and OutboundContextImpl.
// It holds the handler (shared ownership), the raw pointers to the next
// inbound/outbound links, and an `attached_` flag, and it implements the
// PipelineContext hooks in terms of those members.
//   P       - pipeline type; must expose a nested DestructorGuard type.
//   H       - handler type; must expose nested `rout` and `wout` types.
//   Context - type of the `impl_` pointer handed to the handler.
// NOTE(review): some original lines (access specifiers, braces, a few
// conditionals and member declarations such as impl_/pipeline_) are not
// visible in this view.
60 template <class P, class H, class Context>
61 class ContextImplBase : public PipelineContext {
63 ~ContextImplBase() = default;
// Body of the handler accessor: returns the raw pointer held by the
// shared_ptr, without transferring ownership. (Signature line not visible.)
66 return handler_.get();
// Stores the handler by moving the shared_ptr into handler_.
// NOTE(review): the statement that records `pipeline` is not visible here.
69 void initialize(P* pipeline, std::shared_ptr<H> handler) {
71 handler_ = std::move(handler);
74 // PipelineContext overrides
// Registers impl_ with the handler through attachContext(), then invokes the
// handler's own attachPipeline callback with the same context.
// NOTE(review): the surrounding lines are not visible; presumably these calls
// are guarded by, and then set, attached_ - confirm against the full file.
75 void attachPipeline() override {
77 this->attachContext(handler_.get(), impl_);
78 handler_->attachPipeline(impl_);
// Invokes the handler's detachPipeline callback with this context.
83 void detachPipeline() override {
84 handler_->detachPipeline(impl_);
// Checks at run time that `ctx` is an InboundLink for this handler's `rout`
// type; dynamic_cast yields nullptr when it is not.
// NOTE(review): the branch lines are not visible; presumably the pointer is
// stored in nextIn_ on success and the throw below is the failure path.
88 void setNextIn(PipelineContext* ctx) override {
89 auto nextIn = dynamic_cast<InboundLink<typename H::rout>*>(ctx);
93 throw std::invalid_argument("inbound type mismatch");
// Outbound counterpart of setNextIn: the cast targets OutboundLink for this
// handler's `wout` type. The same caveat about missing branch lines applies.
97 void setNextOut(PipelineContext* ctx) override {
98 auto nextOut = dynamic_cast<OutboundLink<typename H::wout>*>(ctx);
102 throw std::invalid_argument("outbound type mismatch");
// Handler shared with whoever passed it to initialize().
109 std::shared_ptr<H> handler_;
// Non-owning links to the neighbouring contexts; nullptr until set.
110 InboundLink<typename H::rout>* nextIn_{nullptr};
111 OutboundLink<typename H::wout>* nextOut_{nullptr};
// Starts out false; see the note on attachPipeline() for its presumed role.
114 bool attached_{false};
115 using DestructorGuard = typename P::DestructorGuard;
// Context for handlers whose direction is HandlerDir::BOTH. It is the
// HandlerContext through which the handler fires events, and also the
// InboundLink/OutboundLink that neighbouring contexts call into.
// NOTE(review): several original lines (the class-name line, braces, access
// specifiers, some conditionals and returns) are not visible in this view;
// the comments below describe only the statements that are shown.
118 template <class P, class H>
120 : public HandlerContext<typename H::rout,
122 public InboundLink<typename H::rin>,
123 public OutboundLink<typename H::win>,
124 public ContextImplBase<P, H, HandlerContext<typename H::rout,
// Aliases for the handler's message types: Rin is what read() receives, Rout
// is what fireRead() passes on, Win is what write() receives and Wout is what
// fireWrite() passes on.
127 typedef typename H::rin Rin;
128 typedef typename H::rout Rout;
129 typedef typename H::win Win;
130 typedef typename H::wout Wout;
131 static const HandlerDir dir = HandlerDir::BOTH;
// Constructs the context and hands pipeline and handler to the base-class
// initialize().
133 explicit ContextImpl(P* pipeline, std::shared_ptr<H> handler) {
135 this->initialize(pipeline, std::move(handler));
138 // For StaticPipeline
143 ~ContextImpl() = default;
145 // HandlerContext overrides
// Each fire* method first constructs a P::DestructorGuard on the pipeline that
// lives until the method returns. NOTE(review): presumably this keeps the
// pipeline alive while downstream code runs - confirm against P.
//
// Passes the message to the next inbound link. std::forward<Rout> acts as a
// move when Rout is a value type and preserves the reference when it is a
// reference type.
// NOTE(review): the condition selecting between the call and the warning is
// not visible here; presumably it tests nextIn_ for null.
146 void fireRead(Rout msg) override {
147 DestructorGuard dg(this->pipeline_);
149 this->nextIn_->read(std::forward<Rout>(msg));
151 LOG(WARNING) << "read reached end of pipeline";
// Propagates end-of-stream to the next inbound link (same caveat as above).
155 void fireReadEOF() override {
156 DestructorGuard dg(this->pipeline_);
158 this->nextIn_->readEOF();
160 LOG(WARNING) << "readEOF reached end of pipeline";
// Propagates a read error to the next inbound link, moving the wrapper.
164 void fireReadException(exception_wrapper e) override {
165 DestructorGuard dg(this->pipeline_);
167 this->nextIn_->readException(std::move(e));
169 LOG(WARNING) << "readException reached end of pipeline";
// Transport state notifications are relayed to the next inbound link; no
// warning line is visible for these two.
173 void fireTransportActive() override {
174 DestructorGuard dg(this->pipeline_);
176 this->nextIn_->transportActive();
180 void fireTransportInactive() override {
181 DestructorGuard dg(this->pipeline_);
183 this->nextIn_->transportInactive();
// When a next outbound link is set, forwards the message to it and returns
// its future. The warning line follows the `if` body.
// NOTE(review): the else/return lines after the warning are not visible.
187 Future<void> fireWrite(Wout msg) override {
188 DestructorGuard dg(this->pipeline_);
189 if (this->nextOut_) {
190 return this->nextOut_->write(std::forward<Wout>(msg));
192 LOG(WARNING) << "write reached end of pipeline";
// Same shape as fireWrite, for close().
197 Future<void> fireClose() override {
198 DestructorGuard dg(this->pipeline_);
199 if (this->nextOut_) {
200 return this->nextOut_->close();
202 LOG(WARNING) << "close reached end of pipeline";
// Returns the stored pipeline_ pointer as a PipelineBase*.
207 PipelineBase* getPipeline() override {
208 return this->pipeline_;
// The write-flag and read-buffer accessors below are forwarded unchanged to
// the pipeline.
211 void setWriteFlags(WriteFlags flags) override {
212 this->pipeline_->setWriteFlags(flags);
215 WriteFlags getWriteFlags() override {
216 return this->pipeline_->getWriteFlags();
219 void setReadBufferSettings(
220 uint64_t minAvailable,
221 uint64_t allocationSize) override {
222 this->pipeline_->setReadBufferSettings(minAvailable, allocationSize);
225 std::pair<uint64_t, uint64_t> getReadBufferSettings() override {
226 return this->pipeline_->getReadBufferSettings();
229 // InboundLink overrides
// Each override below takes a DestructorGuard on the pipeline and then calls
// the matching handler callback, passing this context as the first argument.
230 void read(Rin msg) override {
231 DestructorGuard dg(this->pipeline_);
232 this->handler_->read(this, std::forward<Rin>(msg));
235 void readEOF() override {
236 DestructorGuard dg(this->pipeline_);
237 this->handler_->readEOF(this);
240 void readException(exception_wrapper e) override {
241 DestructorGuard dg(this->pipeline_);
242 this->handler_->readException(this, std::move(e));
245 void transportActive() override {
246 DestructorGuard dg(this->pipeline_);
247 this->handler_->transportActive(this);
250 void transportInactive() override {
251 DestructorGuard dg(this->pipeline_);
252 this->handler_->transportInactive(this);
255 // OutboundLink overrides
// The handler's returned future is passed straight back to the caller.
256 Future<void> write(Win msg) override {
257 DestructorGuard dg(this->pipeline_);
258 return this->handler_->write(this, std::forward<Win>(msg));
261 Future<void> close() override {
262 DestructorGuard dg(this->pipeline_);
263 return this->handler_->close(this);
// Re-declared here because the base-class alias is not usable by unqualified
// name from this template (dependent base).
267 using DestructorGuard = typename P::DestructorGuard;
// Context for handlers whose direction is HandlerDir::IN. It is the
// InboundHandlerContext through which the handler fires read-side events, and
// the InboundLink that the previous context calls into. No OutboundLink base
// and no write/close methods appear in this class.
// NOTE(review): braces, access specifiers and some conditionals are not
// visible in this view; comments describe only the statements shown.
270 template <class P, class H>
271 class InboundContextImpl
272 : public InboundHandlerContext<typename H::rout>,
273 public InboundLink<typename H::rin>,
274 public ContextImplBase<P, H, InboundHandlerContext<typename H::rout>> {
// Aliases for the handler's message types; only Rin and Rout are used by the
// methods shown in this class.
276 typedef typename H::rin Rin;
277 typedef typename H::rout Rout;
278 typedef typename H::win Win;
279 typedef typename H::wout Wout;
280 static const HandlerDir dir = HandlerDir::IN;
// Constructs the context and hands pipeline and handler to the base-class
// initialize().
282 explicit InboundContextImpl(P* pipeline, std::shared_ptr<H> handler) {
284 this->initialize(pipeline, std::move(handler));
287 // For StaticPipeline
// Default constructor; its body is not visible in this view.
288 InboundContextImpl() {
292 ~InboundContextImpl() = default;
294 // InboundHandlerContext overrides
// Each fire* method constructs a P::DestructorGuard on the pipeline that lives
// until the method returns, then relays the event to the next inbound link.
// NOTE(review): the condition selecting between the relay and the warning is
// not visible here; presumably it tests nextIn_ for null.
295 void fireRead(Rout msg) override {
296 DestructorGuard dg(this->pipeline_);
298 this->nextIn_->read(std::forward<Rout>(msg));
300 LOG(WARNING) << "read reached end of pipeline";
304 void fireReadEOF() override {
305 DestructorGuard dg(this->pipeline_);
307 this->nextIn_->readEOF();
309 LOG(WARNING) << "readEOF reached end of pipeline";
313 void fireReadException(exception_wrapper e) override {
314 DestructorGuard dg(this->pipeline_);
316 this->nextIn_->readException(std::move(e));
318 LOG(WARNING) << "readException reached end of pipeline";
// No warning line is visible for the two transport notifications.
322 void fireTransportActive() override {
323 DestructorGuard dg(this->pipeline_);
325 this->nextIn_->transportActive();
329 void fireTransportInactive() override {
330 DestructorGuard dg(this->pipeline_);
332 this->nextIn_->transportInactive();
// Returns the stored pipeline_ pointer as a PipelineBase*.
336 PipelineBase* getPipeline() override {
337 return this->pipeline_;
340 // InboundLink overrides
// Each override takes a DestructorGuard on the pipeline and then calls the
// matching handler callback, passing this context as the first argument.
341 void read(Rin msg) override {
342 DestructorGuard dg(this->pipeline_);
343 this->handler_->read(this, std::forward<Rin>(msg));
346 void readEOF() override {
347 DestructorGuard dg(this->pipeline_);
348 this->handler_->readEOF(this);
351 void readException(exception_wrapper e) override {
352 DestructorGuard dg(this->pipeline_);
353 this->handler_->readException(this, std::move(e));
356 void transportActive() override {
357 DestructorGuard dg(this->pipeline_);
358 this->handler_->transportActive(this);
361 void transportInactive() override {
362 DestructorGuard dg(this->pipeline_);
363 this->handler_->transportInactive(this);
// Local alias for the pipeline's guard type used by every method above.
367 using DestructorGuard = typename P::DestructorGuard;
// Context for handlers whose direction is HandlerDir::OUT. It is the
// OutboundHandlerContext through which the handler fires write-side
// operations, and the OutboundLink that the neighbouring context calls into.
// No InboundLink base and no read-side methods appear in this class.
// NOTE(review): braces, access specifiers and the else/return lines after the
// warnings are not visible in this view.
370 template <class P, class H>
371 class OutboundContextImpl
372 : public OutboundHandlerContext<typename H::wout>,
373 public OutboundLink<typename H::win>,
374 public ContextImplBase<P, H, OutboundHandlerContext<typename H::wout>> {
// Aliases for the handler's message types; only Win and Wout are used by the
// methods shown in this class.
376 typedef typename H::rin Rin;
377 typedef typename H::rout Rout;
378 typedef typename H::win Win;
379 typedef typename H::wout Wout;
380 static const HandlerDir dir = HandlerDir::OUT;
// Constructs the context and hands pipeline and handler to the base-class
// initialize().
382 explicit OutboundContextImpl(P* pipeline, std::shared_ptr<H> handler) {
384 this->initialize(pipeline, std::move(handler));
387 // For StaticPipeline
// Default constructor; its body is not visible in this view.
388 OutboundContextImpl() {
392 ~OutboundContextImpl() = default;
394 // OutboundHandlerContext overrides
// When a next outbound link is set, forwards the message to it and returns
// its future; the warning line follows the `if` body. A P::DestructorGuard on
// the pipeline is held for the duration of the call.
395 Future<void> fireWrite(Wout msg) override {
396 DestructorGuard dg(this->pipeline_);
397 if (this->nextOut_) {
398 return this->nextOut_->write(std::forward<Wout>(msg));
400 LOG(WARNING) << "write reached end of pipeline";
// Same shape as fireWrite, for close().
405 Future<void> fireClose() override {
406 DestructorGuard dg(this->pipeline_);
407 if (this->nextOut_) {
408 return this->nextOut_->close();
410 LOG(WARNING) << "close reached end of pipeline";
// Returns the stored pipeline_ pointer as a PipelineBase*.
415 PipelineBase* getPipeline() override {
416 return this->pipeline_;
419 // OutboundLink overrides
// Each override takes a DestructorGuard on the pipeline, calls the matching
// handler callback with this context as the first argument, and returns the
// handler's future unchanged.
420 Future<void> write(Win msg) override {
421 DestructorGuard dg(this->pipeline_);
422 return this->handler_->write(this, std::forward<Win>(msg));
425 Future<void> close() override {
426 DestructorGuard dg(this->pipeline_);
427 return this->handler_->close(this);
// Local alias for the pipeline's guard type used by every method above.
431 using DestructorGuard = typename P::DestructorGuard;
434 template <class Handler, class Pipeline>
436 typedef typename std::conditional<
437 Handler::dir == HandlerDir::BOTH,
438 ContextImpl<Pipeline, Handler>,
439 typename std::conditional<
440 Handler::dir == HandlerDir::IN,
441 InboundContextImpl<Pipeline, Handler>,
442 OutboundContextImpl<Pipeline, Handler>