2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/wangle/channel/HandlerContext.h>
20 #include <folly/futures/Future.h>
21 #include <folly/io/async/AsyncTransport.h>
22 #include <folly/io/async/DelayedDestruction.h>
23 #include <folly/ExceptionWrapper.h>
24 #include <folly/Memory.h>
26 namespace folly { namespace wangle {
28 class PipelineManager {
30 virtual ~PipelineManager() {}
31 virtual void deletePipeline(PipelineBase* pipeline) = 0;
36 virtual ~PipelineBase() {}
38 void setPipelineManager(PipelineManager* manager) {
42 void deletePipeline() {
44 manager_->deletePipeline(this);
48 void setTransport(std::shared_ptr<AsyncTransport> transport) {
49 transport_ = transport;
52 std::shared_ptr<AsyncTransport> getTransport() {
57 PipelineManager* manager_{nullptr};
58 std::shared_ptr<AsyncTransport> transport_;
64 * R is the inbound type, i.e. inbound calls start with pipeline.read(R)
65 * W is the outbound type, i.e. outbound calls start with pipeline.write(W)
67 * Use Nothing for one of the types if your pipeline is unidirectional.
68 * If R is Nothing, read(), readEOF(), and readException() will be disabled.
69 * If W is Nothing, write() and close() will be disabled.
71 template <class R, class W = Nothing>
72 class Pipeline : public PipelineBase, public DelayedDestruction {
77 void setWriteFlags(WriteFlags flags);
78 WriteFlags getWriteFlags();
80 void setReadBufferSettings(uint64_t minAvailable, uint64_t allocationSize);
81 std::pair<uint64_t, uint64_t> getReadBufferSettings();
83 template <class T = R>
84 typename std::enable_if<!std::is_same<T, Nothing>::value>::type
87 template <class T = R>
88 typename std::enable_if<!std::is_same<T, Nothing>::value>::type
91 template <class T = R>
92 typename std::enable_if<!std::is_same<T, Nothing>::value>::type
93 readException(exception_wrapper e);
95 template <class T = R>
96 typename std::enable_if<!std::is_same<T, Nothing>::value>::type
99 template <class T = R>
100 typename std::enable_if<!std::is_same<T, Nothing>::value>::type
103 template <class T = W>
104 typename std::enable_if<!std::is_same<T, Nothing>::value, Future<void>>::type
107 template <class T = W>
108 typename std::enable_if<!std::is_same<T, Nothing>::value, Future<void>>::type
112 Pipeline& addBack(std::shared_ptr<H> handler);
115 Pipeline& addBack(H&& handler);
118 Pipeline& addBack(H* handler);
121 Pipeline& addFront(std::shared_ptr<H> handler);
124 Pipeline& addFront(H&& handler);
127 Pipeline& addFront(H* handler);
130 H* getHandler(int i);
134 // If one of the handlers owns the pipeline itself, use setOwner to ensure
135 // that the pipeline doesn't try to detach the handler during destruction,
136 // lest destruction ordering issues occur.
137 // See thrift/lib/cpp2/async/Cpp2Channel.cpp for an example
139 bool setOwner(H* handler);
142 explicit Pipeline(bool isStatic);
144 template <class Context>
145 void addContextFront(Context* ctx);
147 void detachHandlers();
150 template <class Context>
151 Pipeline& addHelper(std::shared_ptr<Context>&& ctx, bool front);
153 WriteFlags writeFlags_{WriteFlags::NONE};
154 std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
156 bool isStatic_{false};
157 std::shared_ptr<PipelineContext> owner_;
158 std::vector<std::shared_ptr<PipelineContext>> ctxs_;
159 std::vector<PipelineContext*> inCtxs_;
160 std::vector<PipelineContext*> outCtxs_;
161 InboundLink<R>* front_{nullptr};
162 OutboundLink<W>* back_{nullptr};
171 template <typename Pipeline>
172 class PipelineFactory {
174 virtual std::unique_ptr<Pipeline, folly::DelayedDestruction::Destructor>
175 newPipeline(std::shared_ptr<AsyncSocket>) = 0;
177 virtual ~PipelineFactory() {}
182 #include <folly/wangle/channel/Pipeline-inl.h>