2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <glog/logging.h>
21 namespace folly { namespace wangle {
// Default constructor: creates a pipeline with isStatic_ initialised to false.
23 template <class R, class W>
24 Pipeline<R, W>::Pipeline() : isStatic_(false) {}
// Constructs a pipeline and records the caller-supplied flag in isStatic_.
// NOTE(review): the constructor body is not shown here — confirm whether it
// validates the flag.
26 template <class R, class W>
27 Pipeline<R, W>::Pipeline(bool isStatic) : isStatic_(isStatic) {
// Destructor.
// NOTE(review): the cleanup performed in the body is not shown here — confirm
// against the full file.
31 template <class R, class W>
32 Pipeline<R, W>::~Pipeline() {
// Setter that receives a WriteFlags value by copy.
// NOTE(review): the body is not shown here; presumably it stores `flags` for
// later retrieval via getWriteFlags() — confirm.
38 template <class R, class W>
39 void Pipeline<R, W>::setWriteFlags(WriteFlags flags) {
// Getter that returns a WriteFlags value by copy.
// NOTE(review): the body is not shown here; presumably it returns the value
// set through setWriteFlags() — confirm.
43 template <class R, class W>
44 WriteFlags Pipeline<R, W>::getWriteFlags() {
// Stores the read-buffer configuration as a (minAvailable, allocationSize)
// pair in readBufferSettings_, replacing any previous value.
// NOTE(review): the units/meaning of the two values are not established by
// this function — presumably byte counts; confirm with the consumer.
48 template <class R, class W>
49 void Pipeline<R, W>::setReadBufferSettings(
50 uint64_t minAvailable,
51 uint64_t allocationSize) {
52 readBufferSettings_ = std::make_pair(minAvailable, allocationSize);
// Returns a copy of readBufferSettings_, i.e. the pair most recently stored by
// setReadBufferSettings() in (minAvailable, allocationSize) order.
55 template <class R, class W>
56 std::pair<uint64_t, uint64_t> Pipeline<R, W>::getReadBufferSettings() {
57 return readBufferSettings_;
// Inbound entry point: hands `msg` to front_->read().
// The enable_if return type (void when enabled) removes this overload when T
// is Unit.
// Throws std::invalid_argument("read(): no inbound handler in Pipeline").
// NOTE(review): the condition guarding the throw and the declaration of T are
// not shown here — presumably the throw fires when front_ is null; confirm.
60 template <class R, class W>
62 typename std::enable_if<!std::is_same<T, Unit>::value>::type
63 Pipeline<R, W>::read(R msg) {
65 throw std::invalid_argument("read(): no inbound handler in Pipeline");
// msg is a by-value parameter; std::forward<R> passes it on as an rvalue
// when R is not a reference type.
67 front_->read(std::forward<R>(msg));
// Inbound end-of-stream notification.
// The enable_if return type (void when enabled) removes this overload when T
// is Unit.
// Throws std::invalid_argument("readEOF(): no inbound handler in Pipeline").
// NOTE(review): neither the guard for the throw nor the forwarding call is
// shown here — presumably mirrors read() and calls into front_; confirm.
70 template <class R, class W>
72 typename std::enable_if<!std::is_same<T, Unit>::value>::type
73 Pipeline<R, W>::readEOF() {
75 throw std::invalid_argument("readEOF(): no inbound handler in Pipeline");
// Notifies the inbound chain that the transport became active by calling
// front_->transportActive().
// The enable_if return type (void when enabled) removes this overload when T
// is Unit.
// NOTE(review): any guard around the call (e.g. a null check on front_) is
// not shown here — confirm.
80 template <class R, class W>
82 typename std::enable_if<!std::is_same<T, Unit>::value>::type
83 Pipeline<R, W>::transportActive() {
85 front_->transportActive();
// Notifies the inbound chain that the transport became inactive by calling
// front_->transportInactive().
// The enable_if return type (void when enabled) removes this overload when T
// is Unit.
// NOTE(review): any guard around the call (e.g. a null check on front_) is
// not shown here — confirm.
89 template <class R, class W>
91 typename std::enable_if<!std::is_same<T, Unit>::value>::type
92 Pipeline<R, W>::transportInactive() {
94 front_->transportInactive();
// Propagates an inbound error: moves the exception_wrapper `e` into
// front_->readException().
// The enable_if return type (void when enabled) removes this overload when T
// is Unit.
// Throws std::invalid_argument
// ("readException(): no inbound handler in Pipeline").
// NOTE(review): the condition guarding the throw is not shown here —
// presumably front_ being null; confirm.
98 template <class R, class W>
100 typename std::enable_if<!std::is_same<T, Unit>::value>::type
101 Pipeline<R, W>::readException(exception_wrapper e) {
103 throw std::invalid_argument(
104 "readException(): no inbound handler in Pipeline");
106 front_->readException(std::move(e));
// Outbound entry point: hands `msg` to back_->write() and returns the
// Future<void> that call produces.
// The enable_if return type removes this overload when T is Unit.
// Throws std::invalid_argument("write(): no outbound handler in Pipeline").
// NOTE(review): the condition guarding the throw and the declaration of T are
// not shown here — presumably the throw fires when back_ is null; confirm.
109 template <class R, class W>
111 typename std::enable_if<!std::is_same<T, Unit>::value, Future<void>>::type
112 Pipeline<R, W>::write(W msg) {
114 throw std::invalid_argument("write(): no outbound handler in Pipeline");
// msg is a by-value parameter; std::forward<W> passes it on as an rvalue
// when W is not a reference type.
116 return back_->write(std::forward<W>(msg));
// Outbound close request: returns the Future<void> produced by
// back_->close().
// The enable_if return type removes this overload when T is Unit.
// Throws std::invalid_argument("close(): no outbound handler in Pipeline").
// NOTE(review): the condition guarding the throw is not shown here —
// presumably back_ being null; confirm.
119 template <class R, class W>
121 typename std::enable_if<!std::is_same<T, Unit>::value, Future<void>>::type
122 Pipeline<R, W>::close() {
124 throw std::invalid_argument("close(): no outbound handler in Pipeline");
126 return back_->close();
// Adds a shared handler at the back of the pipeline.
// Selects the context type for H via ContextType<H, Pipeline<R, W>>::type,
// creates a shared context from (this, handler) and passes it to addHelper()
// with front == false. Returns whatever addHelper() returns (a Pipeline&).
129 template <class R, class W>
131 Pipeline<R, W>& Pipeline<R, W>::addBack(std::shared_ptr<H> handler) {
132 typedef typename ContextType<H, Pipeline<R, W>>::type Context;
133 return addHelper(std::make_shared<Context>(this, std::move(handler)), false);
// Adds a handler at the back of the pipeline from a forwarded handler object:
// a new H is constructed inside a std::shared_ptr from `handler`, then the
// shared_ptr overload of addBack() is called.
136 template <class R, class W>
138 Pipeline<R, W>& Pipeline<R, W>::addBack(H&& handler) {
139 return addBack(std::make_shared<H>(std::forward<H>(handler)));
// Adds a handler at the back of the pipeline from a raw pointer.
// The pointer is wrapped in a std::shared_ptr whose deleter does nothing, so
// the pipeline never deletes *handler; the caller must keep the handler alive
// for as long as the pipeline refers to it.
142 template <class R, class W>
144 Pipeline<R, W>& Pipeline<R, W>::addBack(H* handler) {
145 return addBack(std::shared_ptr<H>(handler, [](H*){}));
// Adds a shared handler at the front of the pipeline.
// Selects the context type for H via ContextType<H, Pipeline<R, W>>::type,
// creates a shared context from (this, handler) and passes it to addHelper()
// with front == true. Returns whatever addHelper() returns (a Pipeline&).
148 template <class R, class W>
150 Pipeline<R, W>& Pipeline<R, W>::addFront(std::shared_ptr<H> handler) {
151 typedef typename ContextType<H, Pipeline<R, W>>::type Context;
152 return addHelper(std::make_shared<Context>(this, std::move(handler)), true);
// Adds a handler at the front of the pipeline from a forwarded handler object:
// a new H is constructed inside a std::shared_ptr from `handler`, then the
// shared_ptr overload of addFront() is called.
155 template <class R, class W>
157 Pipeline<R, W>& Pipeline<R, W>::addFront(H&& handler) {
158 return addFront(std::make_shared<H>(std::forward<H>(handler)));
// Adds a handler at the front of the pipeline from a raw pointer.
// The pointer is wrapped in a std::shared_ptr whose deleter does nothing, so
// the pipeline never deletes *handler; the caller must keep the handler alive
// for as long as the pipeline refers to it.
161 template <class R, class W>
163 Pipeline<R, W>& Pipeline<R, W>::addFront(H* handler) {
164 return addFront(std::shared_ptr<H>(handler, [](H*){}));
// Shared implementation behind remove<H>() and remove(H*).
// Walks ctxs_ looking for contexts whose dynamic type is the context type for
// H; when checkEqual is true the context's handler pointer must also equal
// `handler`.
// Throws std::invalid_argument("No such handler in pipeline").
// NOTE(review): the statements that perform the removal, update `removed`
// and guard the throw are not shown here — presumably matches are removed
// via removeAt() and the throw fires when nothing matched; confirm. Also
// confirm how the loop's `it++` interacts with any iterator reassignment
// inside the loop body.
167 template <class R, class W>
169 Pipeline<R, W>& Pipeline<R, W>::removeHelper(H* handler, bool checkEqual) {
170 typedef typename ContextType<H, Pipeline<R, W>>::type Context;
171 bool removed = false;
172 for (auto it = ctxs_.begin(); it != ctxs_.end(); it++) {
// The cast yields null for contexts that wrap a different handler type.
173 auto ctx = std::dynamic_pointer_cast<Context>(*it);
174 if (ctx && (!checkEqual || ctx->getHandler() == handler)) {
177 if (it == ctxs_.end()) {
184 throw std::invalid_argument("No such handler in pipeline");
// Removes by handler type only: delegates to removeHelper<H>() with a null
// handler and checkEqual == false, so handler identity is not compared.
190 template <class R, class W>
192 Pipeline<R, W>& Pipeline<R, W>::remove() {
193 return removeHelper<H>(nullptr, false);
// Removes a specific handler instance: delegates to removeHelper<H>() with
// checkEqual == true, so only contexts whose handler pointer equals `handler`
// match.
196 template <class R, class W>
198 Pipeline<R, W>& Pipeline<R, W>::remove(H* handler) {
199 return removeHelper<H>(handler, true);
// Removes the context that `it` refers to and returns the iterator produced
// by ctxs_.erase(it).
// The context is first told to detach from the pipeline; then, according to
// its direction, its raw pointer is looked up in inCtxs_ and/or outCtxs_.
// NOTE(review): the statements that erase the entry from inCtxs_ / outCtxs_
// are not shown here — presumably each lookup is followed by an erase;
// confirm.
202 template <class R, class W>
203 typename Pipeline<R, W>::ContextIterator Pipeline<R, W>::removeAt(
204 const typename Pipeline<R, W>::ContextIterator& it) {
205 (*it)->detachPipeline();
207 const auto dir = (*it)->getDirection();
208 if (dir == HandlerDir::BOTH || dir == HandlerDir::IN) {
209 auto it2 = std::find(inCtxs_.begin(), inCtxs_.end(), it->get());
// glog CHECK: terminates the process if the context is missing from inCtxs_.
210 CHECK(it2 != inCtxs_.end());
214 if (dir == HandlerDir::BOTH || dir == HandlerDir::OUT) {
215 auto it2 = std::find(outCtxs_.begin(), outCtxs_.end(), it->get());
// glog CHECK: terminates the process if the context is missing from outCtxs_.
216 CHECK(it2 != outCtxs_.end());
220 return ctxs_.erase(it);
// Removes the first context in ctxs_ via removeAt(ctxs_.begin()).
// Throws std::invalid_argument("No handlers in pipeline").
// NOTE(review): the condition guarding the throw and the return statement are
// not shown here — presumably the throw fires when ctxs_ is empty; confirm.
223 template <class R, class W>
224 Pipeline<R, W>& Pipeline<R, W>::removeFront() {
226 throw std::invalid_argument("No handlers in pipeline");
228 removeAt(ctxs_.begin());
// Removes the last context in ctxs_ via removeAt(--ctxs_.end()).
// Throws std::invalid_argument("No handlers in pipeline").
// NOTE(review): the condition guarding the throw and the return statement are
// not shown here — presumably the throw fires when ctxs_ is empty; confirm.
232 template <class R, class W>
233 Pipeline<R, W>& Pipeline<R, W>::removeBack() {
235 throw std::invalid_argument("No handlers in pipeline");
// --ctxs_.end() is only valid when ctxs_ is non-empty.
237 removeAt(--ctxs_.end());
// Returns the handler held by the i-th context in ctxs_, interpreting that
// context as the context type for H.
// NOTE(review): no range check on `i` and no null check on the dynamic_cast
// result are shown here — confirm what happens when `i` is out of range or
// the context at `i` wraps a different handler type.
241 template <class R, class W>
243 H* Pipeline<R, W>::getHandler(int i) {
244 typedef typename ContextType<H, Pipeline<R, W>>::type Context;
245 auto ctx = dynamic_cast<Context*>(ctxs_[i].get());
247 return ctx->getHandler();
// Writes `warning` to the glog WARNING log.
// NOTE(review): the template header and enclosing namespace are not shown
// here — finalize() calls this as detail::logWarningIfNotUnit<T>(); confirm.
253 inline void logWarningIfNotUnit(const std::string& warning) {
254 LOG(WARNING) << warning;
// Explicit specialisation of logWarningIfNotUnit for Unit.
// NOTE(review): the body is not shown here — presumably it logs nothing so
// that Unit-typed pipeline directions stay silent; confirm.
258 inline void logWarningIfNotUnit<Unit>(const std::string& warning) {
// Wires the registered contexts into inbound and outbound chains, then
// attaches every context to the pipeline.
//  - Inbound: front_ becomes the first entry of inCtxs_; each inbound context
//    is linked to the one after it and the last one is linked to nullptr.
//  - Outbound: back_ becomes the last entry of outCtxs_; each outbound context
//    is linked to the one before it and the first one is linked to nullptr.
// NOTE(review): the branches that issue the two warnings below are not shown
// here — presumably they run when the respective context list is empty (or
// front_/back_ ended up null); confirm.
264 // TODO Have read/write/etc check that pipeline has been finalized
265 template <class R, class W>
266 void Pipeline<R, W>::finalize() {
268 if (!inCtxs_.empty()) {
269 front_ = dynamic_cast<InboundLink<R>*>(inCtxs_.front());
// size() - 1 cannot wrap around: this branch only runs when inCtxs_ is
// non-empty.
270 for (size_t i = 0; i < inCtxs_.size() - 1; i++) {
271 inCtxs_[i]->setNextIn(inCtxs_[i+1]);
273 inCtxs_.back()->setNextIn(nullptr);
277 if (!outCtxs_.empty()) {
278 back_ = dynamic_cast<OutboundLink<W>*>(outCtxs_.back());
// Outbound links run from the back towards the front; index 0 is handled
// separately below.
279 for (size_t i = outCtxs_.size() - 1; i > 0; i--) {
280 outCtxs_[i]->setNextOut(outCtxs_[i-1]);
282 outCtxs_.front()->setNextOut(nullptr);
286 detail::logWarningIfNotUnit<R>(
287 "No inbound handler in Pipeline, inbound operations will throw "
288 "std::invalid_argument");
291 detail::logWarningIfNotUnit<W>(
292 "No outbound handler in Pipeline, outbound operations will throw "
293 "std::invalid_argument");
// Contexts are attached in reverse order, i.e. from the back of ctxs_ to the
// front.
296 for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) {
297 (*it)->attachPipeline();
// Searches ctxs_ for the context whose dynamic type is the context type for H
// and whose handler pointer equals `handler`.
// NOTE(review): what happens on a match and the bool results are not shown
// here — presumably the matching context is recorded as the owner and true is
// returned, false otherwise; confirm.
301 template <class R, class W>
303 bool Pipeline<R, W>::setOwner(H* handler) {
304 typedef typename ContextType<H, Pipeline<R, W>>::type Context;
305 for (auto& ctx : ctxs_) {
// Null when this context wraps a different handler type.
306 auto ctxImpl = dynamic_cast<Context*>(ctx.get());
307 if (ctxImpl && ctxImpl->getHandler() == handler) {
// Inserts an externally owned context at the front of the pipeline.
// `ctx` is wrapped in a std::shared_ptr whose deleter does nothing, so the
// pipeline never deletes it; the caller must keep *ctx alive for as long as
// the pipeline refers to it. The wrapped pointer goes to addHelper() with
// front == true.
315 template <class R, class W>
316 template <class Context>
317 void Pipeline<R, W>::addContextFront(Context* ctx) {
318 addHelper(std::shared_ptr<Context>(ctx, [](Context*){}), true);
// Calls detachPipeline() on contexts held in ctxs_.
// NOTE(review): a statement between the loop header and the call is not
// shown here — it may be a condition that skips some contexts; confirm.
321 template <class R, class W>
322 void Pipeline<R, W>::detachHandlers() {
323 for (auto& ctx : ctxs_) {
325 ctx->detachPipeline();
// Common insertion routine used by addBack(), addFront() and
// addContextFront().
// Inserts the shared context into ctxs_ at the beginning (when front is true)
// or at the end, and mirrors the insertion as a raw pointer into inCtxs_
// and/or outCtxs_ depending on the compile-time direction Context::dir.
// NOTE(review): the declaration of the `front` parameter and the return
// statement are not shown here — callers pass a bool and use the result as a
// Pipeline&; confirm.
330 template <class R, class W>
331 template <class Context>
332 Pipeline<R, W>& Pipeline<R, W>::addHelper(
333 std::shared_ptr<Context>&& ctx,
// ctx is a named rvalue reference, so this insert copies the shared_ptr and
// ctx.get() below is still valid.
335 ctxs_.insert(front ? ctxs_.begin() : ctxs_.end(), ctx);
336 if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::IN) {
337 inCtxs_.insert(front ? inCtxs_.begin() : inCtxs_.end(), ctx.get());
339 if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::OUT) {
340 outCtxs_.insert(front ? outCtxs_.begin() : outCtxs_.end(), ctx.get());