2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/io/async/AsyncTransport.h>
20 #include <folly/futures/Future.h>
21 #include <folly/ExceptionWrapper.h>
23 namespace folly { namespace wangle {
// Abstract interface through which events are passed on to the rest of the
// pipeline and pipeline-level settings are reached. `In` is the message type
// accepted by fireRead(); `Out` is the message type accepted by fireWrite().
// NOTE(review): parts of this class (access specifier, closing brace) are not
// visible in this view — confirm against the full header.
25 template <class In, class Out>
26 class HandlerContext {
28 virtual ~HandlerContext() {}
// Inbound notifications: a message, read-EOF, and a read error carried in an
// exception_wrapper.
30 virtual void fireRead(In msg) = 0;
31 virtual void fireReadEOF() = 0;
32 virtual void fireReadException(exception_wrapper e) = 0;
// Outbound operations; both return a Future<void>.
34 virtual Future<void> fireWrite(Out msg) = 0;
35 virtual Future<void> fireClose() = 0;
// Accessor for the AsyncTransport, returned as a shared_ptr.
37 virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
// Setter/getter for the WriteFlags value.
39 virtual void setWriteFlags(WriteFlags flags) = 0;
40 virtual WriteFlags getWriteFlags() = 0;
// Setter/getter for the read-buffer parameters. The getter returns a pair —
// presumably (minAvailable, allocationSize) in that order; TODO confirm.
42 virtual void setReadBufferSettings(
43 uint64_t minAvailable,
44 uint64_t allocationSize) = 0;
45 virtual std::pair<uint64_t, uint64_t> getReadBufferSettings() = 0;
// NOTE(review): `H` is not declared in any visible line, and these four
// members have empty bodies. They may belong to a disabled/TODO section whose
// surrounding lines are missing from this view — confirm before relying on them.
49 virtual void addHandlerBefore(H&&) {}
51 virtual void addHandlerAfter(H&&) {}
53 virtual void replaceHandler(H&&) {}
54 virtual void removeHandler() {}
// Non-template base for pipeline contexts: attach/detach notifications for the
// pipeline and the transport, plus the hooks used to chain contexts together.
58 class PipelineContext {
60 virtual ~PipelineContext() {}
// Pipeline attach/detach notifications.
62 virtual void attachPipeline() = 0;
63 virtual void detachPipeline() = 0;
// Transport attach/detach notifications.
65 virtual void attachTransport() = 0;
66 virtual void detachTransport() = 0;
// Connects this context with `other`. The visible body registers `this` as
// other's next-outbound context.
// NOTE(review): the rest of link() is not visible in this view; presumably it
// also calls setNextIn(other) on this context — confirm.
68 void link(PipelineContext* other) {
70 other->setNextOut(this);
// Hooks implemented by concrete contexts to record their inbound/outbound
// neighbour; link() invokes setNextOut() through these.
74 virtual void setNextIn(PipelineContext* ctx) = 0;
75 virtual void setNextOut(PipelineContext* ctx) = 0;
// Receiving side of the inbound direction: the interface a context exposes so
// that read events can be delivered to it.
// NOTE(review): `In` is used below but its `template <...>` header is not
// visible in this view — presumably `template <class In>`; confirm.
79 class InboundHandlerContext {
81 virtual ~InboundHandlerContext() {}
// Delivers a message of type In.
82 virtual void read(In msg) = 0;
// Delivers the read-EOF notification.
83 virtual void readEOF() = 0;
// Delivers a read error carried in an exception_wrapper.
84 virtual void readException(exception_wrapper e) = 0;
// Receiving side of the outbound direction: the interface a context exposes so
// that write/close requests can be delivered to it.
// NOTE(review): `Out` is used below but its `template <...>` header is not
// visible in this view — presumably `template <class Out>`; confirm.
88 class OutboundHandlerContext {
90 virtual ~OutboundHandlerContext() {}
// Delivers a message of type Out; returns a Future<void>.
91 virtual Future<void> write(Out msg) = 0;
// Delivers a close request; returns a Future<void>.
92 virtual Future<void> close() = 0;
// Concrete context that binds one handler of type H to a pipeline of type P.
// It derives from all four context interfaces: HandlerContext (the fire*
// methods and pipeline accessors), InboundHandlerContext and
// OutboundHandlerContext (read*/write/close, which call into handler_), and
// PipelineContext (attach/detach and neighbour linking).
//
// Requirements visible from the code: H exposes the nested types rin, rout,
// win and wout; P exposes DestructorGuard, getTransport(), set/getWriteFlags()
// and set/getReadBufferSettings(), and P* can be static_cast to
// DelayedDestruction*.
//
// Most entry points begin by constructing a P::DestructorGuard over pipeline_
// that lives for the duration of the call — presumably so the pipeline cannot
// be destroyed while handler code is running; confirm against
// DelayedDestruction.
//
// NOTE(review): several lines of this class are not visible in this view
// (the second HandlerContext template argument, access specifiers, branch
// conditions, some signatures, closing braces). The comments below describe
// only what the visible lines show and hedge the rest.
95 template <class P, class H>
96 class ContextImpl : public HandlerContext<typename H::rout,
98 public InboundHandlerContext<typename H::rin>,
99 public OutboundHandlerContext<typename H::win>,
100 public PipelineContext {
// Short aliases for the handler's four message types.
102 typedef typename H::rin Rin;
103 typedef typename H::rout Rout;
104 typedef typename H::win Win;
105 typedef typename H::wout Wout;
// Builds a context for `handler` inside `pipeline` by delegating to
// initialize().
107 explicit ContextImpl(P* pipeline, std::shared_ptr<H> handler) {
108 initialize(pipeline, std::move(handler));
111 // For StaticPipeline
// Stores the pipeline pointer and moves the handler shared_ptr into handler_.
116 void initialize(P* pipeline, std::shared_ptr<H> handler) {
117 pipeline_ = pipeline;
118 handler_ = std::move(handler);
// Returns the raw pointer held by handler_. The signature of the enclosing
// accessor is not visible in this view.
122 return handler_.get();
125 // PipelineContext overrides
// Forwards the pipeline-attach notification to the handler, passing this
// context. NOTE(review): neighbouring lines are not visible; attached_ may be
// checked/updated here — confirm.
126 void attachPipeline() override {
128 handler_->attachPipeline(this);
// Forwards the pipeline-detach notification to the handler.
133 void detachPipeline() override {
134 handler_->detachPipeline(this);
// Downcasts ctx to InboundHandlerContext<Rout>* with dynamic_cast. The throw
// of std::invalid_argument is visible, but the condition guarding it and the
// store into nextIn_ are not — presumably a failed cast throws and a
// successful one is recorded; confirm.
138 void setNextIn(PipelineContext* ctx) override {
139 auto nextIn = dynamic_cast<InboundHandlerContext<Rout>*>(ctx);
143 throw std::invalid_argument("wrong type in setNextIn");
// Outbound counterpart of setNextIn(): downcasts ctx to
// OutboundHandlerContext<Wout>*. Guard condition and store into nextOut_ are
// likewise not visible in this view.
147 void setNextOut(PipelineContext* ctx) override {
148 auto nextOut = dynamic_cast<OutboundHandlerContext<Wout>*>(ctx);
152 throw std::invalid_argument("wrong type in setNextOut");
// Forwards the transport-attach notification to the handler under a
// DestructorGuard.
156 void attachTransport() override {
157 typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
158 handler_->attachTransport(this);
// Forwards the transport-detach notification to the handler under a
// DestructorGuard.
161 void detachTransport() override {
162 typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
163 handler_->detachTransport(this);
166 // HandlerContext overrides
// Passes msg to the next inbound context via nextIn_->read(). A warning is
// logged on the other path; the branch selecting between the two is not
// visible — presumably the warning is the nextIn_ == nullptr case.
167 void fireRead(Rout msg) override {
168 typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
170 nextIn_->read(std::forward<Rout>(msg));
172 LOG(WARNING) << "read reached end of pipeline";
// Read-EOF counterpart of fireRead(). Only the guard and the end-of-pipeline
// warning are visible; the forwarding call is presumably on a line missing
// from this view — confirm.
176 void fireReadEOF() override {
177 typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
181 LOG(WARNING) << "readEOF reached end of pipeline";
// Passes the exception on via nextIn_->readException(); logs a warning on the
// other (not visible) branch.
185 void fireReadException(exception_wrapper e) override {
186 typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
188 nextIn_->readException(std::move(e));
190 LOG(WARNING) << "readException reached end of pipeline";
// Returns the Future produced by nextOut_->write(msg). On the other branch a
// warning is logged; what is returned there is not visible in this view.
194 Future<void> fireWrite(Wout msg) override {
195 typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
197 return nextOut_->write(std::forward<Wout>(msg));
199 LOG(WARNING) << "write reached end of pipeline";
// Returns the Future produced by nextOut_->close(). On the other branch a
// warning is logged; what is returned there is not visible in this view.
204 Future<void> fireClose() override {
205 typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
207 return nextOut_->close();
209 LOG(WARNING) << "close reached end of pipeline";
// The accessors below delegate directly to the same-named method on pipeline_.
214 std::shared_ptr<AsyncTransport> getTransport() override {
215 return pipeline_->getTransport();
218 void setWriteFlags(WriteFlags flags) override {
219 pipeline_->setWriteFlags(flags);
222 WriteFlags getWriteFlags() override {
223 return pipeline_->getWriteFlags();
226 void setReadBufferSettings(
227 uint64_t minAvailable,
228 uint64_t allocationSize) override {
229 pipeline_->setReadBufferSettings(minAvailable, allocationSize);
232 std::pair<uint64_t, uint64_t> getReadBufferSettings() override {
233 return pipeline_->getReadBufferSettings();
236 // InboundHandlerContext overrides
// Delivers an inbound message to the handler, passing this context as the
// first argument.
237 void read(Rin msg) override {
238 typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
239 handler_->read(this, std::forward<Rin>(msg));
// Delivers the read-EOF notification to the handler.
242 void readEOF() override {
243 typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
244 handler_->readEOF(this);
// Delivers a read error to the handler.
247 void readException(exception_wrapper e) override {
248 typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
249 handler_->readException(this, std::move(e));
252 // OutboundHandlerContext overrides
// Hands an outbound message to the handler and returns the handler's Future.
253 Future<void> write(Win msg) override {
254 typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
255 return handler_->write(this, std::forward<Win>(msg));
// Hands a close request to the handler and returns the handler's Future.
258 Future<void> close() override {
259 typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
260 return handler_->close(this);
// State. handler_ is the shared_ptr to the bound handler; nextIn_ / nextOut_
// are non-owning pointers to the neighbouring contexts and start as nullptr;
// attached_ starts false. NOTE(review): the declaration of pipeline_ (assigned
// in initialize()) is not visible in this view.
265 std::shared_ptr<H> handler_;
266 InboundHandlerContext<Rout>* nextIn_{nullptr};
267 OutboundHandlerContext<Wout>* nextOut_{nullptr};
268 bool attached_{false};