85d89d894fa72a0e0ebae47af234489a8bbacf99
[folly.git] / folly / wangle / channel / Pipeline.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/wangle/channel/HandlerContext.h>
20 #include <folly/futures/Future.h>
21 #include <folly/futures/Unit.h>
22 #include <folly/io/async/AsyncTransport.h>
23 #include <folly/io/async/DelayedDestruction.h>
24 #include <folly/ExceptionWrapper.h>
25 #include <folly/Memory.h>
26
27 namespace folly { namespace wangle {
28
29 class PipelineManager {
30  public:
31   virtual ~PipelineManager() = default;
32   virtual void deletePipeline(PipelineBase* pipeline) = 0;
33 };
34
35 class PipelineBase : public DelayedDestruction {
36  public:
37   virtual ~PipelineBase() = default;
38
39   void setPipelineManager(PipelineManager* manager) {
40     manager_ = manager;
41   }
42
43   void deletePipeline() {
44     if (manager_) {
45       manager_->deletePipeline(this);
46     }
47   }
48
49   void setTransport(std::shared_ptr<AsyncTransport> transport) {
50     transport_ = transport;
51   }
52
53   std::shared_ptr<AsyncTransport> getTransport() {
54     return transport_;
55   }
56
57  private:
58   PipelineManager* manager_{nullptr};
59   std::shared_ptr<AsyncTransport> transport_;
60 };
61
62 /*
63  * R is the inbound type, i.e. inbound calls start with pipeline.read(R)
64  * W is the outbound type, i.e. outbound calls start with pipeline.write(W)
65  *
66  * Use Unit for one of the types if your pipeline is unidirectional.
67  * If R is Unit, read(), readEOF(), and readException() will be disabled.
68  * If W is Unit, write() and close() will be disabled.
69  */
70 template <class R, class W = Unit>
71 class Pipeline : public PipelineBase {
72  public:
73   Pipeline();
74   ~Pipeline();
75
76   void setWriteFlags(WriteFlags flags);
77   WriteFlags getWriteFlags();
78
79   void setReadBufferSettings(uint64_t minAvailable, uint64_t allocationSize);
80   std::pair<uint64_t, uint64_t> getReadBufferSettings();
81
82   template <class T = R>
83   typename std::enable_if<!std::is_same<T, Unit>::value>::type
84   read(R msg);
85
86   template <class T = R>
87   typename std::enable_if<!std::is_same<T, Unit>::value>::type
88   readEOF();
89
90   template <class T = R>
91   typename std::enable_if<!std::is_same<T, Unit>::value>::type
92   readException(exception_wrapper e);
93
94   template <class T = R>
95   typename std::enable_if<!std::is_same<T, Unit>::value>::type
96   transportActive();
97
98   template <class T = R>
99   typename std::enable_if<!std::is_same<T, Unit>::value>::type
100   transportInactive();
101
102   template <class T = W>
103   typename std::enable_if<!std::is_same<T, Unit>::value, Future<void>>::type
104   write(W msg);
105
106   template <class T = W>
107   typename std::enable_if<!std::is_same<T, Unit>::value, Future<void>>::type
108   close();
109
110   template <class H>
111   Pipeline& addBack(std::shared_ptr<H> handler);
112
113   template <class H>
114   Pipeline& addBack(H&& handler);
115
116   template <class H>
117   Pipeline& addBack(H* handler);
118
119   template <class H>
120   Pipeline& addFront(std::shared_ptr<H> handler);
121
122   template <class H>
123   Pipeline& addFront(H&& handler);
124
125   template <class H>
126   Pipeline& addFront(H* handler);
127
128   template <class H>
129   Pipeline& remove(H* handler);
130
131   template <class H>
132   Pipeline& remove();
133
134   Pipeline& removeFront();
135
136   Pipeline& removeBack();
137
138   template <class H>
139   H* getHandler(int i);
140
141   void finalize();
142
143   // If one of the handlers owns the pipeline itself, use setOwner to ensure
144   // that the pipeline doesn't try to detach the handler during destruction,
145   // lest destruction ordering issues occur.
146   // See thrift/lib/cpp2/async/Cpp2Channel.cpp for an example
147   template <class H>
148   bool setOwner(H* handler);
149
150  protected:
151   explicit Pipeline(bool isStatic);
152
153   template <class Context>
154   void addContextFront(Context* ctx);
155
156   void detachHandlers();
157
158  private:
159   template <class Context>
160   Pipeline& addHelper(std::shared_ptr<Context>&& ctx, bool front);
161
162   template <class H>
163   Pipeline& removeHelper(H* handler, bool checkEqual);
164
165   typedef std::vector<std::shared_ptr<PipelineContext>>::iterator
166     ContextIterator;
167
168   ContextIterator removeAt(const ContextIterator& it);
169
170   WriteFlags writeFlags_{WriteFlags::NONE};
171   std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
172
173   bool isStatic_{false};
174   std::shared_ptr<PipelineContext> owner_;
175   std::vector<std::shared_ptr<PipelineContext>> ctxs_;
176   std::vector<PipelineContext*> inCtxs_;
177   std::vector<PipelineContext*> outCtxs_;
178   InboundLink<R>* front_{nullptr};
179   OutboundLink<W>* back_{nullptr};
180 };
181
182 }}
183
184 namespace folly {
185
186 class AsyncSocket;
187
188 template <typename Pipeline>
189 class PipelineFactory {
190  public:
191   virtual std::unique_ptr<Pipeline, folly::DelayedDestruction::Destructor>
192   newPipeline(std::shared_ptr<AsyncSocket>) = 0;
193
194   virtual ~PipelineFactory() = default;
195 };
196
197 }
198
199 #include <folly/wangle/channel/Pipeline-inl.h>