rearrange Pipeline to have more functionality in PipelineBase
[folly.git] / folly / wangle / channel / Pipeline-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <glog/logging.h>
20
21 namespace folly { namespace wangle {
22
23 template <class R, class W>
24 Pipeline<R, W>::Pipeline() : isStatic_(false) {}
25
26 template <class R, class W>
27 Pipeline<R, W>::Pipeline(bool isStatic) : isStatic_(isStatic) {
28   CHECK(isStatic_);
29 }
30
31 template <class R, class W>
32 Pipeline<R, W>::~Pipeline() {
33   if (!isStatic_) {
34     detachHandlers();
35   }
36 }
37
38 template <class H>
39 PipelineBase& PipelineBase::addBack(std::shared_ptr<H> handler) {
40   typedef typename ContextType<H>::type Context;
41   return addHelper(std::make_shared<Context>(this, std::move(handler)), false);
42 }
43
44 template <class H>
45 PipelineBase& PipelineBase::addBack(H&& handler) {
46   return addBack(std::make_shared<H>(std::forward<H>(handler)));
47 }
48
49 template <class H>
50 PipelineBase& PipelineBase::addBack(H* handler) {
51   return addBack(std::shared_ptr<H>(handler, [](H*){}));
52 }
53
54 template <class H>
55 PipelineBase& PipelineBase::addFront(std::shared_ptr<H> handler) {
56   typedef typename ContextType<H>::type Context;
57   return addHelper(std::make_shared<Context>(this, std::move(handler)), true);
58 }
59
60 template <class H>
61 PipelineBase& PipelineBase::addFront(H&& handler) {
62   return addFront(std::make_shared<H>(std::forward<H>(handler)));
63 }
64
65 template <class H>
66 PipelineBase& PipelineBase::addFront(H* handler) {
67   return addFront(std::shared_ptr<H>(handler, [](H*){}));
68 }
69
70 template <class H>
71 PipelineBase& PipelineBase::removeHelper(H* handler, bool checkEqual) {
72   typedef typename ContextType<H>::type Context;
73   bool removed = false;
74   for (auto it = ctxs_.begin(); it != ctxs_.end(); it++) {
75     auto ctx = std::dynamic_pointer_cast<Context>(*it);
76     if (ctx && (!checkEqual || ctx->getHandler() == handler)) {
77       it = removeAt(it);
78       removed = true;
79       if (it == ctxs_.end()) {
80         break;
81       }
82     }
83   }
84
85   if (!removed) {
86     throw std::invalid_argument("No such handler in pipeline");
87   }
88
89   return *this;
90 }
91
92 template <class H>
93 PipelineBase& PipelineBase::remove() {
94   return removeHelper<H>(nullptr, false);
95 }
96
97 template <class H>
98 PipelineBase& PipelineBase::remove(H* handler) {
99   return removeHelper<H>(handler, true);
100 }
101
102 template <class H>
103 H* PipelineBase::getHandler(int i) {
104   typedef typename ContextType<H>::type Context;
105   auto ctx = dynamic_cast<Context*>(ctxs_[i].get());
106   CHECK(ctx);
107   return ctx->getHandler();
108 }
109
110 template <class H>
111 bool PipelineBase::setOwner(H* handler) {
112   typedef typename ContextType<H>::type Context;
113   for (auto& ctx : ctxs_) {
114     auto ctxImpl = dynamic_cast<Context*>(ctx.get());
115     if (ctxImpl && ctxImpl->getHandler() == handler) {
116       owner_ = ctx;
117       return true;
118     }
119   }
120   return false;
121 }
122
123 template <class Context>
124 void PipelineBase::addContextFront(Context* ctx) {
125   addHelper(std::shared_ptr<Context>(ctx, [](Context*){}), true);
126 }
127
128 template <class Context>
129 PipelineBase& PipelineBase::addHelper(
130     std::shared_ptr<Context>&& ctx,
131     bool front) {
132   ctxs_.insert(front ? ctxs_.begin() : ctxs_.end(), ctx);
133   if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::IN) {
134     inCtxs_.insert(front ? inCtxs_.begin() : inCtxs_.end(), ctx.get());
135   }
136   if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::OUT) {
137     outCtxs_.insert(front ? outCtxs_.begin() : outCtxs_.end(), ctx.get());
138   }
139   return *this;
140 }
141
142 namespace detail {
143
144 template <class T>
145 inline void logWarningIfNotUnit(const std::string& warning) {
146   LOG(WARNING) << warning;
147 }
148
149 template <>
150 inline void logWarningIfNotUnit<Unit>(const std::string& warning) {
151   // do nothing
152 }
153
154 } // detail
155
156 template <class R, class W>
157 template <class T>
158 typename std::enable_if<!std::is_same<T, Unit>::value>::type
159 Pipeline<R, W>::read(R msg) {
160   if (!front_) {
161     throw std::invalid_argument("read(): no inbound handler in Pipeline");
162   }
163   front_->read(std::forward<R>(msg));
164 }
165
166 template <class R, class W>
167 template <class T>
168 typename std::enable_if<!std::is_same<T, Unit>::value>::type
169 Pipeline<R, W>::readEOF() {
170   if (!front_) {
171     throw std::invalid_argument("readEOF(): no inbound handler in Pipeline");
172   }
173   front_->readEOF();
174 }
175
176 template <class R, class W>
177 template <class T>
178 typename std::enable_if<!std::is_same<T, Unit>::value>::type
179 Pipeline<R, W>::transportActive() {
180   if (front_) {
181     front_->transportActive();
182   }
183 }
184
185 template <class R, class W>
186 template <class T>
187 typename std::enable_if<!std::is_same<T, Unit>::value>::type
188 Pipeline<R, W>::transportInactive() {
189   if (front_) {
190     front_->transportInactive();
191   }
192 }
193
194 template <class R, class W>
195 template <class T>
196 typename std::enable_if<!std::is_same<T, Unit>::value>::type
197 Pipeline<R, W>::readException(exception_wrapper e) {
198   if (!front_) {
199     throw std::invalid_argument(
200         "readException(): no inbound handler in Pipeline");
201   }
202   front_->readException(std::move(e));
203 }
204
205 template <class R, class W>
206 template <class T>
207 typename std::enable_if<!std::is_same<T, Unit>::value, Future<void>>::type
208 Pipeline<R, W>::write(W msg) {
209   if (!back_) {
210     throw std::invalid_argument("write(): no outbound handler in Pipeline");
211   }
212   return back_->write(std::forward<W>(msg));
213 }
214
215 template <class R, class W>
216 template <class T>
217 typename std::enable_if<!std::is_same<T, Unit>::value, Future<void>>::type
218 Pipeline<R, W>::close() {
219   if (!back_) {
220     throw std::invalid_argument("close(): no outbound handler in Pipeline");
221   }
222   return back_->close();
223 }
224
225 // TODO Have read/write/etc check that pipeline has been finalized
226 template <class R, class W>
227 void Pipeline<R, W>::finalize() {
228   front_ = nullptr;
229   if (!inCtxs_.empty()) {
230     front_ = dynamic_cast<InboundLink<R>*>(inCtxs_.front());
231     for (size_t i = 0; i < inCtxs_.size() - 1; i++) {
232       inCtxs_[i]->setNextIn(inCtxs_[i+1]);
233     }
234     inCtxs_.back()->setNextIn(nullptr);
235   }
236
237   back_ = nullptr;
238   if (!outCtxs_.empty()) {
239     back_ = dynamic_cast<OutboundLink<W>*>(outCtxs_.back());
240     for (size_t i = outCtxs_.size() - 1; i > 0; i--) {
241       outCtxs_[i]->setNextOut(outCtxs_[i-1]);
242     }
243     outCtxs_.front()->setNextOut(nullptr);
244   }
245
246   if (!front_) {
247     detail::logWarningIfNotUnit<R>(
248         "No inbound handler in Pipeline, inbound operations will throw "
249         "std::invalid_argument");
250   }
251   if (!back_) {
252     detail::logWarningIfNotUnit<W>(
253         "No outbound handler in Pipeline, outbound operations will throw "
254         "std::invalid_argument");
255   }
256
257   for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) {
258     (*it)->attachPipeline();
259   }
260 }
261
262 }} // folly::wangle