2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/wangle/channel/HandlerContext.h>
20 #include <folly/futures/Future.h>
21 #include <folly/io/async/AsyncTransport.h>
22 #include <folly/io/async/DelayedDestruction.h>
23 #include <folly/ExceptionWrapper.h>
24 #include <folly/Memory.h>
25 #include <glog/logging.h>
27 namespace folly { namespace wangle {
30 * R is the inbound type, i.e. inbound calls start with pipeline.read(R)
31 * W is the outbound type, i.e. outbound calls start with pipeline.write(W)
// Pipeline<R, W>: class template deriving from DelayedDestruction.
// R is the inbound message type and W the outbound message type.
33 template <class R, class W>
34 class Pipeline : public DelayedDestruction {
// Default constructor: explicitly initializes isStatic_ to false.
36 Pipeline() : isStatic_(false) {}
// Accessor returning a std::shared_ptr<AsyncTransport> by value.
// NOTE(review): the body is not visible in this excerpt; presumably it
// returns the transport_ member -- confirm.
44 std::shared_ptr<AsyncTransport> getTransport() {
// Setter taking a WriteFlags value.
// NOTE(review): the body is not visible in this excerpt; presumably it
// stores flags in the writeFlags_ member -- confirm.
48 void setWriteFlags(WriteFlags flags) {
// Accessor returning a WriteFlags value.
// NOTE(review): the body is not visible in this excerpt; presumably it
// returns the writeFlags_ member -- confirm.
52 WriteFlags getWriteFlags() {
// Overwrites readBufferSettings_ with the pair
// (minAvailable, allocationSize), in that order.
56 void setReadBufferSettings(uint64_t minAvailable, uint64_t allocationSize) {
57 readBufferSettings_ = std::make_pair(minAvailable, allocationSize);
// Returns a copy of readBufferSettings_. setReadBufferSettings stores the
// pair as (minAvailable, allocationSize), so .first / .second follow that
// order.
60 std::pair<uint64_t, uint64_t> getReadBufferSettings() {
61 return readBufferSettings_;
// Hands msg to the front_ context's read(). std::forward<R> restores the
// value category encoded in R (an rvalue when R is a non-reference type).
// NOTE(review): the enclosing function's signature is not visible in this
// excerpt.
65 front_->read(std::forward<R>(msg));
// Passes the exception wrapper to the front_ context's readException(),
// moving e instead of copying it. front_ is dereferenced unconditionally.
72 void readException(exception_wrapper e) {
73 front_->readException(std::move(e));
// Sends msg to the back_ context's write() and returns the Future<void>
// produced by that call. std::forward<W> restores the value category
// encoded in W (an rvalue when W is a non-reference type). back_ is
// dereferenced unconditionally.
76 Future<void> write(W msg) {
77 return back_->write(std::forward<W>(msg));
// Invokes close() on the back_ context and returns the Future<void> it
// produces. back_ is dereferenced unconditionally.
80 Future<void> close() {
81 return back_->close();
// Appends a newly allocated ContextImpl<Pipeline, H> to the end of ctxs_.
// NOTE(review): the constructor arguments and the return statement are not
// visible in this excerpt; the return type is Pipeline&, presumably *this to
// allow chaining -- confirm. H is presumably a template parameter declared
// on a line not shown.
85 Pipeline& addBack(std::shared_ptr<H> handler) {
86 ctxs_.push_back(std::make_shared<ContextImpl<Pipeline, H>>(
// Raw-pointer overload: wraps handler in a std::shared_ptr whose deleter is
// a no-op lambda, so that shared_ptr never deletes handler, then delegates
// to the std::shared_ptr<H> overload and returns its result.
93 Pipeline& addBack(H* handler) {
94 return addBack(std::shared_ptr<H>(handler, [](H*){}));
// By-value overload: builds a new H via std::make_shared from the forwarded
// handler, then delegates to the std::shared_ptr<H> overload and returns its
// result.
98 Pipeline& addBack(H&& handler) {
99 return addBack(std::make_shared<H>(std::forward<H>(handler)));
// Creates a ContextImpl<Pipeline, H> constructed from (this, handler), with
// handler moved into it.
// NOTE(review): the call that receives the new context and the return
// statement are not visible in this excerpt; presumably the context is
// inserted at the front of ctxs_ -- confirm.
103 Pipeline& addFront(std::shared_ptr<H> handler) {
106 std::make_shared<ContextImpl<Pipeline, H>>(this, std::move(handler)));
// Raw-pointer overload: wraps handler in a std::shared_ptr whose deleter is
// a no-op lambda, so that shared_ptr never deletes handler, then delegates
// to the std::shared_ptr<H> overload and returns its result.
111 Pipeline& addFront(H* handler) {
112 return addFront(std::shared_ptr<H>(handler, [](H*){}));
// By-value overload: builds a new H via std::make_shared from the forwarded
// handler, then delegates to the std::shared_ptr<H> overload and returns its
// result.
116 Pipeline& addFront(H&& handler) {
117 return addFront(std::make_shared<H>(std::forward<H>(handler)));
// Returns the handler held by the context at index i of ctxs_.
// ctxs_[i] is indexed without a bounds check.
121 H* getHandler(int i) {
// dynamic_cast yields nullptr when the context at i is not a
// ContextImpl<Pipeline, H>.
// NOTE(review): a line between the cast and the return is not visible in
// this excerpt; confirm it guards against a null ctx.
122 auto ctx = dynamic_cast<ContextImpl<Pipeline, H>*>(ctxs_[i].get());
124 return ctx->getHandler();
// NOTE(review): the enclosing function's signature and several lines of its
// body are not visible in this excerpt.
// Link every context to its successor in ctxs_.
// NOTE(review): ctxs_.size() - 1 is unsigned and wraps around when ctxs_ is
// empty; confirm an emptiness guard precedes this loop.
132 for (size_t i = 0; i < ctxs_.size() - 1; i++) {
133 ctxs_[i]->link(ctxs_[i+1].get());
// The last context must be usable as an OutboundHandlerContext<W>; the
// throw below reports the failure (its condition is on a line not shown,
// presumably a null check on back_).
136 back_ = dynamic_cast<OutboundHandlerContext<W>*>(ctxs_.back().get());
138 throw std::invalid_argument("wrong type for last handler");
// The first context must be usable as an InboundHandlerContext<R>; same
// pattern as above.
141 front_ = dynamic_cast<InboundHandlerContext<R>*>(ctxs_.front().get());
143 throw std::invalid_argument("wrong type for first handler");
// Call attachPipeline() on every context, iterating from the last context
// to the first.
146 for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) {
147 (*it)->attachPipeline();
151 // If one of the handlers owns the pipeline itself, use setOwner to ensure
152 // that the pipeline doesn't try to detach the handler during destruction,
153 // lest destruction ordering issues occur.
154 // See thrift/lib/cpp2/async/Cpp2Channel.cpp for an example
// Searches ctxs_ for a context that is a ContextImpl<Pipeline, H> and whose
// getHandler() equals handler.
// NOTE(review): the code executed on a match and the return statements are
// not visible in this excerpt; presumably the matching context is recorded
// in owner_ and the bool result reports whether a match was found --
// confirm.
156 bool setOwner(H* handler) {
157 for (auto& ctx : ctxs_) {
// dynamic_cast is nullptr for contexts of a different handler type; the
// ctxImpl check below skips those.
158 auto ctxImpl = dynamic_cast<ContextImpl<Pipeline, H>*>(ctx.get());
159 if (ctxImpl && ctxImpl->getHandler() == handler) {
// Stores transport in transport_ (moved, not copied), then calls
// attachTransport() on every context in ctxs_, first to last. The
// assignment happens before any context is notified.
167 void attachTransport(
168 std::shared_ptr<AsyncTransport> transport) {
169 transport_ = std::move(transport);
170 for (auto& ctx : ctxs_) {
171 ctx->attachTransport();
// Resets transport_ to nullptr, then calls detachTransport() on every
// context in ctxs_, first to last. transport_ is already null by the time
// the contexts are notified.
175 void detachTransport() {
176 transport_ = nullptr;
177 for (auto& ctx : ctxs_) {
178 ctx->detachTransport();
// Explicit constructor that initializes isStatic_ from its argument.
// NOTE(review): the constructor body is not visible in this excerpt.
183 explicit Pipeline(bool isStatic) : isStatic_(isStatic) {
// Takes a raw Context pointer and wraps it in a std::shared_ptr whose
// deleter is a no-op lambda, so that shared_ptr never deletes context.
// NOTE(review): the call that receives the shared_ptr is not visible in
// this excerpt; by the function's name it is presumably inserted at the
// front of ctxs_ -- confirm.
187 template <class Context>
188 void addContextFront(Context* context) {
191 std::shared_ptr<Context>(context, [](Context*){}));
// Walks ctxs_ and calls detachPipeline() on contexts.
// NOTE(review): a line between the loop header and the call is not visible
// in this excerpt; it may be a condition that skips some contexts (e.g.
// owner_) -- confirm.
194 void detachHandlers() {
195 for (auto& ctx : ctxs_) {
197 ctx->detachPipeline();
// Transport assigned by attachTransport() and cleared by detachTransport().
203 std::shared_ptr<AsyncTransport> transport_;
// Write flags; default is WriteFlags::NONE.
204 WriteFlags writeFlags_{WriteFlags::NONE};
// (minAvailable, allocationSize) pair, as stored by setReadBufferSettings();
// default is {2048, 2048}.
205 std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
// Set by both constructors; its use is not visible in this excerpt.
207 bool isStatic_{false};
// Context that read()/readException() calls are sent to; nullptr until
// assigned from ctxs_.front().
208 InboundHandlerContext<R>* front_{nullptr};
// Context that write()/close() calls are sent to; nullptr until assigned
// from ctxs_.back().
209 OutboundHandlerContext<W>* back_{nullptr};
// Handler contexts in pipeline order.
210 std::vector<std::shared_ptr<PipelineContext>> ctxs_;
// NOTE(review): presumably the context selected by setOwner(); the
// assignment is not visible in this excerpt -- confirm.
211 std::shared_ptr<PipelineContext> owner_;
// Abstract factory interface, parameterized on the pipeline type it
// produces.
220 template <typename Pipeline>
221 class PipelineFactory {
// Pure virtual: implementations receive a std::shared_ptr<AsyncSocket> and
// return a raw Pipeline pointer.
// NOTE(review): ownership of the returned pointer is not stated here --
// confirm against callers.
223 virtual Pipeline* newPipeline(std::shared_ptr<AsyncSocket>) = 0;
// Virtual destructor so implementations can be destroyed through a
// PipelineFactory pointer.
224 virtual ~PipelineFactory() {}