2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <glog/logging.h>
21 namespace folly { namespace wangle {
23 template <class R, class W>
24 Pipeline<R, W>::Pipeline() : isStatic_(false) {}
// Constructs a pipeline and stores the caller-supplied isStatic flag in
// isStatic_.
// NOTE(review): the constructor body is not visible in this excerpt.
26 template <class R, class W>
27 Pipeline<R, W>::Pipeline(bool isStatic) : isStatic_(isStatic) {
// Destructor for Pipeline<R, W>.
// NOTE(review): the destructor body is not visible in this excerpt, so any
// teardown it performs cannot be described here.
31 template <class R, class W>
32 Pipeline<R, W>::~Pipeline() {
// Adds a handler held by shared_ptr: builds the handler's context type
// (ContextType<H>::type) from this pipeline and the handler, then hands it to
// addHelper with `false` as the second argument (presumably the `front` flag,
// i.e. insert at the back).
// Returns whatever addHelper returns (a PipelineBase&).
39 PipelineBase& PipelineBase::addBack(std::shared_ptr<H> handler) {
40 typedef typename ContextType<H>::type Context;
41 return addHelper(std::make_shared<Context>(this, std::move(handler)), false);
// Adds a handler passed by forwarding reference: constructs a new H inside a
// shared_ptr from the forwarded argument and delegates to the shared_ptr
// overload of addBack.
// NOTE(review): if H deduces to an lvalue reference, make_shared<H> would not
// be well-formed; presumably callers pass rvalues - confirm.
45 PipelineBase& PipelineBase::addBack(H&& handler) {
46 return addBack(std::make_shared<H>(std::forward<H>(handler)));
// Adds a handler passed by raw pointer. The pointer is wrapped in a shared_ptr
// with a no-op deleter, so that shared_ptr never deletes the handler; the
// wrapped pointer is then forwarded to the shared_ptr overload of addBack.
50 PipelineBase& PipelineBase::addBack(H* handler) {
51 return addBack(std::shared_ptr<H>(handler, [](H*){}));
// Adds a handler held by shared_ptr: builds the handler's context type
// (ContextType<H>::type) from this pipeline and the handler, then hands it to
// addHelper with `true` as the second argument (presumably the `front` flag,
// i.e. insert at the front).
// Returns whatever addHelper returns (a PipelineBase&).
55 PipelineBase& PipelineBase::addFront(std::shared_ptr<H> handler) {
56 typedef typename ContextType<H>::type Context;
57 return addHelper(std::make_shared<Context>(this, std::move(handler)), true);
// Adds a handler passed by forwarding reference: constructs a new H inside a
// shared_ptr from the forwarded argument and delegates to the shared_ptr
// overload of addFront.
// NOTE(review): if H deduces to an lvalue reference, make_shared<H> would not
// be well-formed; presumably callers pass rvalues - confirm.
61 PipelineBase& PipelineBase::addFront(H&& handler) {
62 return addFront(std::make_shared<H>(std::forward<H>(handler)));
// Adds a handler passed by raw pointer. The pointer is wrapped in a shared_ptr
// with a no-op deleter, so that shared_ptr never deletes the handler; the
// wrapped pointer is then forwarded to the shared_ptr overload of addFront.
66 PipelineBase& PipelineBase::addFront(H* handler) {
67 return addFront(std::shared_ptr<H>(handler, [](H*){}));
// Shared implementation behind remove(): scans ctxs_ for contexts of type
// ContextType<H>::type.
//   handler    - handler pointer to compare against when checkEqual is true.
//   checkEqual - when false, any context of the right type matches; when true,
//                the context's getHandler() must also equal `handler`.
// Throws std::invalid_argument("No such handler in pipeline").
// NOTE(review): the code executed on a match, the condition guarding the throw,
// and the return statement are not visible in this excerpt.
71 PipelineBase& PipelineBase::removeHelper(H* handler, bool checkEqual) {
72 typedef typename ContextType<H>::type Context;
74 for (auto it = ctxs_.begin(); it != ctxs_.end(); it++) {
// The cast yields null for contexts that are not of this handler's context type.
75 auto ctx = std::dynamic_pointer_cast<Context>(*it);
76 if (ctx && (!checkEqual || ctx->getHandler() == handler)) {
79 if (it == ctxs_.end()) {
86 throw std::invalid_argument("No such handler in pipeline");
// Removes by handler type only: calls removeHelper<H> with checkEqual=false, so
// the null handler pointer passed here is never compared.
93 PipelineBase& PipelineBase::remove() {
94 return removeHelper<H>(nullptr, false);
// Removes a specific handler: calls removeHelper<H> with checkEqual=true, so a
// context matches only if its getHandler() equals `handler`.
98 PipelineBase& PipelineBase::remove(H* handler) {
99 return removeHelper<H>(handler, true);
// Returns the handler of the context stored at index i of ctxs_, after casting
// that context to ContextType<H>::type.
// NOTE(review): no bounds check on i is visible here, and the dynamic_cast
// result is dereferenced; a line between the cast and the return is missing
// from this excerpt and may validate the cast - confirm.
103 H* PipelineBase::getHandler(int i) {
104 typedef typename ContextType<H>::type Context;
105 auto ctx = dynamic_cast<Context*>(ctxs_[i].get());
107 return ctx->getHandler();
// Searches ctxs_ for a context of type ContextType<H>::type whose getHandler()
// equals `handler`.
// NOTE(review): what happens on a match, and the bool results returned, are not
// visible in this excerpt.
111 bool PipelineBase::setOwner(H* handler) {
112 typedef typename ContextType<H>::type Context;
113 for (auto& ctx : ctxs_) {
// Null when this context is not of the handler's context type.
114 auto ctxImpl = dynamic_cast<Context*>(ctx.get());
115 if (ctxImpl && ctxImpl->getHandler() == handler) {
// Adds an existing context object by raw pointer. The pointer is wrapped in a
// shared_ptr with a no-op deleter, so that shared_ptr never deletes the
// context, and passed to addHelper with `true` as the second argument. The
// value returned by addHelper is discarded.
123 template <class Context>
124 void PipelineBase::addContextFront(Context* ctx) {
125 addHelper(std::shared_ptr<Context>(ctx, [](Context*){}), true);
// Registers a context with the pipeline's bookkeeping containers.
// The context is inserted into ctxs_ at the beginning when `front` is true,
// otherwise at the end. Depending on Context::dir its raw pointer is also
// inserted, at the same end, into inCtxs_ (IN or BOTH) and/or outCtxs_ (OUT or
// BOTH).
// NOTE(review): the declaration of `front` (presumably the second parameter)
// and the return statement are not visible in this excerpt.
128 template <class Context>
129 PipelineBase& PipelineBase::addHelper(
130 std::shared_ptr<Context>&& ctx,
// `ctx` is a named variable here, so this insert copies the shared_ptr.
132 ctxs_.insert(front ? ctxs_.begin() : ctxs_.end(), ctx);
133 if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::IN) {
134 inCtxs_.insert(front ? inCtxs_.begin() : inCtxs_.end(), ctx.get());
136 if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::OUT) {
137 outCtxs_.insert(front ? outCtxs_.begin() : outCtxs_.end(), ctx.get());
// Emits `warning` through glog at WARNING severity.
// NOTE(review): the template header is not visible in this excerpt; judging by
// the name and the <Unit> specialization that follows, this is presumably the
// primary template used for every type other than Unit.
145 inline void logWarningIfNotUnit(const std::string& warning) {
146 LOG(WARNING) << warning;
// Specialization of logWarningIfNotUnit for Unit.
// NOTE(review): the body is not visible in this excerpt; presumably it does
// nothing so that Unit-typed pipelines stay silent - confirm.
150 inline void logWarningIfNotUnit<Unit>(const std::string& warning) {
// Passes an inbound message to front_->read().
// The enable_if return type makes this overload available only when T is not
// Unit (the declaration of T is not visible in this excerpt).
// Throws std::invalid_argument("read(): no inbound handler in Pipeline").
// NOTE(review): the condition guarding the throw is not visible; presumably it
// checks that front_ is unset.
156 template <class R, class W>
158 typename std::enable_if<!std::is_same<T, Unit>::value>::type
159 Pipeline<R, W>::read(R msg) {
161 throw std::invalid_argument("read(): no inbound handler in Pipeline");
163 front_->read(std::forward<R>(msg));
// Inbound end-of-file entry point.
// The enable_if return type makes this overload available only when T is not
// Unit (the declaration of T is not visible in this excerpt).
// Throws std::invalid_argument("readEOF(): no inbound handler in Pipeline").
// NOTE(review): the condition guarding the throw and the call made on the
// non-throwing path are not visible in this excerpt.
166 template <class R, class W>
168 typename std::enable_if<!std::is_same<T, Unit>::value>::type
169 Pipeline<R, W>::readEOF() {
171 throw std::invalid_argument("readEOF(): no inbound handler in Pipeline");
// Calls transportActive() on front_.
// The enable_if return type makes this overload available only when T is not
// Unit (the declaration of T is not visible in this excerpt).
// NOTE(review): a line before the call is missing from this excerpt; it may
// guard against front_ being unset - confirm.
176 template <class R, class W>
178 typename std::enable_if<!std::is_same<T, Unit>::value>::type
179 Pipeline<R, W>::transportActive() {
181 front_->transportActive();
// Calls transportInactive() on front_.
// The enable_if return type makes this overload available only when T is not
// Unit (the declaration of T is not visible in this excerpt).
// NOTE(review): a line before the call is missing from this excerpt; it may
// guard against front_ being unset - confirm.
185 template <class R, class W>
187 typename std::enable_if<!std::is_same<T, Unit>::value>::type
188 Pipeline<R, W>::transportInactive() {
190 front_->transportInactive();
// Passes an inbound exception to front_->readException(), moving the
// exception_wrapper into the call.
// The enable_if return type makes this overload available only when T is not
// Unit (the declaration of T is not visible in this excerpt).
// Throws std::invalid_argument(
//     "readException(): no inbound handler in Pipeline").
// NOTE(review): the condition guarding the throw is not visible; presumably it
// checks that front_ is unset.
194 template <class R, class W>
196 typename std::enable_if<!std::is_same<T, Unit>::value>::type
197 Pipeline<R, W>::readException(exception_wrapper e) {
199 throw std::invalid_argument(
200 "readException(): no inbound handler in Pipeline");
202 front_->readException(std::move(e));
// Passes an outbound message to back_->write() and returns the Future<Unit>
// that call produces.
// The enable_if return type makes this overload available only when T is not
// Unit (the declaration of T is not visible in this excerpt).
// Throws std::invalid_argument("write(): no outbound handler in Pipeline").
// NOTE(review): the condition guarding the throw is not visible; presumably it
// checks that back_ is unset.
205 template <class R, class W>
207 typename std::enable_if<!std::is_same<T, Unit>::value, Future<Unit>>::type
208 Pipeline<R, W>::write(W msg) {
210 throw std::invalid_argument("write(): no outbound handler in Pipeline");
212 return back_->write(std::forward<W>(msg));
// Calls close() on back_ and returns the Future<Unit> that call produces.
// The enable_if return type makes this overload available only when T is not
// Unit (the declaration of T is not visible in this excerpt).
// Throws std::invalid_argument("close(): no outbound handler in Pipeline").
// NOTE(review): the condition guarding the throw is not visible; presumably it
// checks that back_ is unset.
215 template <class R, class W>
217 typename std::enable_if<!std::is_same<T, Unit>::value, Future<Unit>>::type
218 Pipeline<R, W>::close() {
220 throw std::invalid_argument("close(): no outbound handler in Pipeline");
222 return back_->close();
225 // TODO Have read/write/etc check that pipeline has been finalized
// Wires the registered contexts into inbound and outbound chains, sets the
// front_ and back_ entry points, logs warnings about missing handlers, and
// finally calls attachPipeline() on every context.
// NOTE(review): several lines of this function (including the guards around
// the two warnings and the closing braces) are not visible in this excerpt.
226 template <class R, class W>
227 void Pipeline<R, W>::finalize() {
// Inbound chain: front_ is the first inbound context; each context's next-in is
// the following element, and the last one gets nullptr.
229 if (!inCtxs_.empty()) {
230 front_ = dynamic_cast<InboundLink<R>*>(inCtxs_.front());
// size() - 1 cannot underflow here because the vector is known to be non-empty.
231 for (size_t i = 0; i < inCtxs_.size() - 1; i++) {
232 inCtxs_[i]->setNextIn(inCtxs_[i+1]);
234 inCtxs_.back()->setNextIn(nullptr);
// Outbound chain runs the opposite way: back_ is the last outbound context;
// each context's next-out is the preceding element, and the first gets nullptr.
238 if (!outCtxs_.empty()) {
239 back_ = dynamic_cast<OutboundLink<W>*>(outCtxs_.back());
240 for (size_t i = outCtxs_.size() - 1; i > 0; i--) {
241 outCtxs_[i]->setNextOut(outCtxs_[i-1]);
243 outCtxs_.front()->setNextOut(nullptr);
// Warnings go through logWarningIfNotUnit, keyed on R for the inbound message
// and W for the outbound one. Presumably each is guarded by a check that
// front_ / back_ is unset - the guards are not visible here.
247 detail::logWarningIfNotUnit<R>(
248 "No inbound handler in Pipeline, inbound operations will throw "
249 "std::invalid_argument");
252 detail::logWarningIfNotUnit<W>(
253 "No outbound handler in Pipeline, outbound operations will throw "
254 "std::invalid_argument");
// Contexts are attached in reverse order of ctxs_ (last element first).
257 for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) {
258 (*it)->attachPipeline();