2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/wangle/channel/HandlerContext.h>
20 #include <folly/futures/Future.h>
21 #include <folly/io/async/AsyncTransport.h>
22 #include <folly/io/async/DelayedDestruction.h>
23 #include <folly/ExceptionWrapper.h>
24 #include <folly/Memory.h>
26 namespace folly { namespace wangle {
28 class PipelineManager {
30 virtual ~PipelineManager() {}
31 virtual void deletePipeline(PipelineBase* pipeline) = 0;
36 void setPipelineManager(PipelineManager* manager) {
40 void deletePipeline() {
42 manager_->deletePipeline(this);
47 PipelineManager* manager_{nullptr};
53 * R is the inbound type, i.e. inbound calls start with pipeline.read(R)
54 * W is the outbound type, i.e. outbound calls start with pipeline.write(W)
56 * Use Nothing for one of the types if your pipeline is unidirectional.
57 * If R is Nothing, read(), readEOF(), and readException() will be disabled.
58 * If W is Nothing, write() and close() will be disabled.
60 template <class R, class W = Nothing>
61 class Pipeline : public PipelineBase, public DelayedDestruction {
66 std::shared_ptr<AsyncTransport> getTransport();
68 void setWriteFlags(WriteFlags flags);
69 WriteFlags getWriteFlags();
71 void setReadBufferSettings(uint64_t minAvailable, uint64_t allocationSize);
72 std::pair<uint64_t, uint64_t> getReadBufferSettings();
74 template <class T = R>
75 typename std::enable_if<!std::is_same<T, Nothing>::value>::type
78 template <class T = R>
79 typename std::enable_if<!std::is_same<T, Nothing>::value>::type
82 template <class T = R>
83 typename std::enable_if<!std::is_same<T, Nothing>::value>::type
84 readException(exception_wrapper e);
86 template <class T = W>
87 typename std::enable_if<!std::is_same<T, Nothing>::value, Future<void>>::type
90 template <class T = W>
91 typename std::enable_if<!std::is_same<T, Nothing>::value, Future<void>>::type
95 Pipeline& addBack(std::shared_ptr<H> handler);
98 Pipeline& addBack(H&& handler);
101 Pipeline& addBack(H* handler);
104 Pipeline& addFront(std::shared_ptr<H> handler);
107 Pipeline& addFront(H&& handler);
110 Pipeline& addFront(H* handler);
113 H* getHandler(int i);
117 // If one of the handlers owns the pipeline itself, use setOwner to ensure
118 // that the pipeline doesn't try to detach the handler during destruction,
119 // lest destruction ordering issues occur.
120 // See thrift/lib/cpp2/async/Cpp2Channel.cpp for an example
122 bool setOwner(H* handler);
124 void attachTransport(std::shared_ptr<AsyncTransport> transport);
126 void detachTransport();
129 explicit Pipeline(bool isStatic);
131 template <class Context>
132 void addContextFront(Context* ctx);
134 void detachHandlers();
137 template <class Context>
138 Pipeline& addHelper(std::shared_ptr<Context>&& ctx, bool front);
140 std::shared_ptr<AsyncTransport> transport_;
141 WriteFlags writeFlags_{WriteFlags::NONE};
142 std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
144 bool isStatic_{false};
145 std::shared_ptr<PipelineContext> owner_;
146 std::vector<std::shared_ptr<PipelineContext>> ctxs_;
147 std::vector<PipelineContext*> inCtxs_;
148 std::vector<PipelineContext*> outCtxs_;
149 InboundLink<R>* front_{nullptr};
150 OutboundLink<W>* back_{nullptr};
159 template <typename Pipeline>
160 class PipelineFactory {
162 virtual std::unique_ptr<Pipeline, folly::DelayedDestruction::Destructor>
163 newPipeline(std::shared_ptr<AsyncSocket>) = 0;
165 virtual ~PipelineFactory() {}
170 #include <folly/wangle/channel/Pipeline-inl.h>