2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/wangle/channel/HandlerContext.h>
20 #include <folly/futures/Future.h>
21 #include <folly/futures/Unit.h>
22 #include <folly/io/async/AsyncTransport.h>
23 #include <folly/io/async/DelayedDestruction.h>
24 #include <folly/ExceptionWrapper.h>
25 #include <folly/Memory.h>
27 namespace folly { namespace wangle {
// Abstract interface exposing a single pure-virtual hook, deletePipeline().
// PipelineBase::deletePipeline() calls this hook through its manager_ pointer,
// passing itself as the argument.
// NOTE(review): PipelineBase is named below before its definition appears;
// a forward declaration is presumably on a line not visible in this view.
29 class PipelineManager {
// Virtual destructor so implementations can be destroyed through this type.
31 virtual ~PipelineManager() = default;
// Receives the pipeline to delete. What "delete" entails is left entirely to
// the implementing subclass; nothing in this header constrains it.
32 virtual void deletePipeline(PipelineBase* pipeline) = 0;
// Non-template base class for pipelines. Derives from DelayedDestruction and
// holds two pieces of state: a raw PipelineManager pointer and a shared_ptr to
// an AsyncTransport.
35 class PipelineBase : public DelayedDestruction {
37 virtual ~PipelineBase() = default;
// Takes the manager that deletePipeline() will later call into.
// NOTE(review): the body is not visible in this view; presumably it stores
// `manager` in manager_ -- confirm.
39 void setPipelineManager(PipelineManager* manager) {
// Hands this pipeline to the manager's deletePipeline() hook.
// NOTE(review): manager_ defaults to nullptr and the line between the
// signature and the call below is not visible here; confirm a null check
// guards the dereference.
43 void deletePipeline() {
45 manager_->deletePipeline(this);
// Replaces the stored transport. The parameter is taken by value and then
// copy-assigned into transport_.
49 void setTransport(std::shared_ptr<AsyncTransport> transport) {
50 transport_ = transport;
// Returns a shared_ptr<AsyncTransport> by value.
// NOTE(review): the body is not visible in this view; presumably it returns
// transport_ -- confirm.
53 std::shared_ptr<AsyncTransport> getTransport() {
// Raw pointer to the manager; nullptr until one is supplied.
58 PipelineManager* manager_{nullptr};
// Transport handle; default-constructed (empty) until setTransport() is called.
59 std::shared_ptr<AsyncTransport> transport_;
63 * R is the inbound type, i.e. inbound calls start with pipeline.read(R)
64 * W is the outbound type, i.e. outbound calls start with pipeline.write(W)
66 * Use Unit for one of the types if your pipeline is unidirectional.
67 * If R is Unit, read(), readEOF(), and readException() will be disabled.
68 * If W is Unit, write() and close() will be disabled.
// Pipeline parameterized on R (inbound type) and W (outbound type, defaulting
// to Unit). This block only declares members; the trailing include of
// Pipeline-inl.h suggests the definitions live there (not visible here).
70 template <class R, class W = Unit>
71 class Pipeline : public PipelineBase {
// Setter/getter pair for a WriteFlags value.
// NOTE(review): presumably backed by writeFlags_ below -- confirm in the
// definitions.
76 void setWriteFlags(WriteFlags flags);
77 WriteFlags getWriteFlags();
// Setter/getter pair for two uint64_t read-buffer parameters; the getter
// returns them as a std::pair.
// NOTE(review): presumably the pair is ordered (minAvailable, allocationSize)
// and backed by readBufferSettings_ -- confirm.
79 void setReadBufferSettings(uint64_t minAvailable, uint64_t allocationSize);
80 std::pair<uint64_t, uint64_t> getReadBufferSettings();
// Five member templates gated on the inbound type: T defaults to R, and the
// enable_if return type resolves to void only when T is not Unit, so these
// members are unavailable when R is Unit. Only readException's name and
// parameter list are visible in this view; the other four declarators are on
// lines not shown.
82 template <class T = R>
83 typename std::enable_if<!std::is_same<T, Unit>::value>::type
86 template <class T = R>
87 typename std::enable_if<!std::is_same<T, Unit>::value>::type
90 template <class T = R>
91 typename std::enable_if<!std::is_same<T, Unit>::value>::type
// Takes the exception_wrapper by value.
92 readException(exception_wrapper e);
94 template <class T = R>
95 typename std::enable_if<!std::is_same<T, Unit>::value>::type
98 template <class T = R>
99 typename std::enable_if<!std::is_same<T, Unit>::value>::type
// Two member templates gated on the outbound type: T defaults to W, and the
// return type resolves to Future<void> only when T is not Unit. Their names
// are on lines not shown in this view.
102 template <class T = W>
103 typename std::enable_if<!std::is_same<T, Unit>::value, Future<void>>::type
106 template <class T = W>
107 typename std::enable_if<!std::is_same<T, Unit>::value, Future<void>>::type
// Handler insertion at the back, overloaded for shared_ptr<H>, H&& and H*.
// Each returns Pipeline&.
// NOTE(review): H is not declared on any visible line; presumably each
// overload is preceded by a `template <class H>` line that this view omits.
111 Pipeline& addBack(std::shared_ptr<H> handler);
114 Pipeline& addBack(H&& handler);
117 Pipeline& addBack(H* handler);
// Handler insertion at the front; same three overload shapes as addBack.
120 Pipeline& addFront(std::shared_ptr<H> handler);
123 Pipeline& addFront(H&& handler);
126 Pipeline& addFront(H* handler);
// Removal identified by handler pointer.
129 Pipeline& remove(H* handler);
// Removal by position (front / back); no arguments.
134 Pipeline& removeFront();
136 Pipeline& removeBack();
// Looks up a handler by integer index and returns it as H*.
// NOTE(review): behavior for an out-of-range index or a mismatched H is not
// determinable from this declaration.
139 H* getHandler(int i);
143 // If one of the handlers owns the pipeline itself, use setOwner to ensure
144 // that the pipeline doesn't try to detach the handler during destruction,
145 // lest destruction ordering issues occur.
146 // See thrift/lib/cpp2/async/Cpp2Channel.cpp for an example
// NOTE(review): the meaning of the bool result is not stated here; presumably
// it reports whether `handler` was found in the pipeline -- confirm.
148 bool setOwner(H* handler);
// Explicit constructor taking the isStatic flag.
// NOTE(review): access specifiers are not visible in this view, so whether
// this and the members below are protected/private cannot be told from here.
151 explicit Pipeline(bool isStatic);
// Helper taking a raw Context pointer.
153 template <class Context>
154 void addContextFront(Context* ctx);
156 void detachHandlers();
// Shared insertion helper: takes the context as an rvalue shared_ptr plus a
// flag selecting the front (true) or, presumably, the back (false).
159 template <class Context>
160 Pipeline& addHelper(std::shared_ptr<Context>&& ctx, bool front);
// Shared removal helper taking a handler pointer and a checkEqual flag.
// NOTE(review): the exact effect of checkEqual is defined elsewhere.
163 Pipeline& removeHelper(H* handler, bool checkEqual);
// Iterator type over a vector of shared_ptr<PipelineContext>.
// NOTE(review): the alias name and terminating semicolon are on a line not
// shown; the next declaration indicates the alias is ContextIterator.
165 typedef std::vector<std::shared_ptr<PipelineContext>>::iterator
// Takes an iterator and returns an iterator.
168 ContextIterator removeAt(const ContextIterator& it);
// Write flags; initialized to WriteFlags::NONE.
170 WriteFlags writeFlags_{WriteFlags::NONE};
// Read-buffer settings pair; both elements initialized to 2048.
171 std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
// Defaults to false; the constructor above takes a value of the same name.
173 bool isStatic_{false};
// Context of the owning handler, if any (see the setOwner comment above).
174 std::shared_ptr<PipelineContext> owner_;
// Contexts held via shared_ptr, alongside two vectors of raw PipelineContext
// pointers.
// NOTE(review): presumably inCtxs_/outCtxs_ are the inbound/outbound subsets
// of ctxs_ -- confirm in the definitions.
175 std::vector<std::shared_ptr<PipelineContext>> ctxs_;
176 std::vector<PipelineContext*> inCtxs_;
177 std::vector<PipelineContext*> outCtxs_;
// Raw pointers to an inbound link typed on R and an outbound link typed on W;
// both are null until assigned.
178 InboundLink<R>* front_{nullptr};
179 OutboundLink<W>* back_{nullptr};
// Abstract factory interface, parameterized on the pipeline type it produces.
// Note that the template parameter is itself named Pipeline, so inside this
// class that name refers to the parameter rather than the class template
// declared earlier in this header.
188 template <typename Pipeline>
189 class PipelineFactory {
// Pure virtual. Takes a shared_ptr<AsyncSocket> and returns a unique_ptr to
// the pipeline type whose deleter is folly::DelayedDestruction::Destructor.
// NOTE(review): AsyncSocket is not declared by any include visible in this
// view (only AsyncTransport.h is); confirm it is brought in transitively.
191 virtual std::unique_ptr<Pipeline, folly::DelayedDestruction::Destructor>
192 newPipeline(std::shared_ptr<AsyncSocket>) = 0;
// Virtual destructor so factories can be destroyed through this interface.
194 virtual ~PipelineFactory() = default;
199 #include <folly/wangle/channel/Pipeline-inl.h>