14a586ffa813f76b7d3aa94cc9577ee6c09e2d96
[folly.git] / folly / wangle / channel / Pipeline.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/wangle/channel/HandlerContext.h>
20 #include <folly/futures/Future.h>
21 #include <folly/io/async/AsyncTransport.h>
22 #include <folly/io/async/DelayedDestruction.h>
23 #include <folly/ExceptionWrapper.h>
24 #include <folly/Memory.h>
25
26 namespace folly { namespace wangle {
27
28 class PipelineManager {
29  public:
30   virtual ~PipelineManager() {}
31   virtual void deletePipeline(PipelineBase* pipeline) = 0;
32 };
33
34 class PipelineBase {
35  public:
36   void setPipelineManager(PipelineManager* manager) {
37     manager_ = manager;
38   }
39
40   void deletePipeline() {
41     if (manager_) {
42       manager_->deletePipeline(this);
43     }
44   }
45
46  private:
47   PipelineManager* manager_{nullptr};
48 };
49
50 struct Nothing{};
51
52 /*
53  * R is the inbound type, i.e. inbound calls start with pipeline.read(R)
54  * W is the outbound type, i.e. outbound calls start with pipeline.write(W)
55  *
56  * Use Nothing for one of the types if your pipeline is unidirectional.
57  * If R is Nothing, read(), readEOF(), and readException() will be disabled.
58  * If W is Nothing, write() and close() will be disabled.
59  */
60 template <class R, class W = Nothing>
61 class Pipeline : public PipelineBase, public DelayedDestruction {
62  public:
63   Pipeline();
64   ~Pipeline();
65
66   std::shared_ptr<AsyncTransport> getTransport();
67
68   void setWriteFlags(WriteFlags flags);
69   WriteFlags getWriteFlags();
70
71   void setReadBufferSettings(uint64_t minAvailable, uint64_t allocationSize);
72   std::pair<uint64_t, uint64_t> getReadBufferSettings();
73
74   template <class T = R>
75   typename std::enable_if<!std::is_same<T, Nothing>::value>::type
76   read(R msg);
77
78   template <class T = R>
79   typename std::enable_if<!std::is_same<T, Nothing>::value>::type
80   readEOF();
81
82   template <class T = R>
83   typename std::enable_if<!std::is_same<T, Nothing>::value>::type
84   readException(exception_wrapper e);
85
86   template <class T = W>
87   typename std::enable_if<!std::is_same<T, Nothing>::value, Future<void>>::type
88   write(W msg);
89
90   template <class T = W>
91   typename std::enable_if<!std::is_same<T, Nothing>::value, Future<void>>::type
92   close();
93
94   template <class H>
95   Pipeline& addBack(std::shared_ptr<H> handler);
96
97   template <class H>
98   Pipeline& addBack(H&& handler);
99
100   template <class H>
101   Pipeline& addBack(H* handler);
102
103   template <class H>
104   Pipeline& addFront(std::shared_ptr<H> handler);
105
106   template <class H>
107   Pipeline& addFront(H&& handler);
108
109   template <class H>
110   Pipeline& addFront(H* handler);
111
112   template <class H>
113   H* getHandler(int i);
114
115   void finalize();
116
117   // If one of the handlers owns the pipeline itself, use setOwner to ensure
118   // that the pipeline doesn't try to detach the handler during destruction,
119   // lest destruction ordering issues occur.
120   // See thrift/lib/cpp2/async/Cpp2Channel.cpp for an example
121   template <class H>
122   bool setOwner(H* handler);
123
124   void attachTransport(std::shared_ptr<AsyncTransport> transport);
125
126   void detachTransport();
127
128  protected:
129   explicit Pipeline(bool isStatic);
130
131   template <class Context>
132   void addContextFront(Context* ctx);
133
134   void detachHandlers();
135
136  private:
137   template <class Context>
138   Pipeline& addHelper(std::shared_ptr<Context>&& ctx, bool front);
139
140   std::shared_ptr<AsyncTransport> transport_;
141   WriteFlags writeFlags_{WriteFlags::NONE};
142   std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
143
144   bool isStatic_{false};
145   std::shared_ptr<PipelineContext> owner_;
146   std::vector<std::shared_ptr<PipelineContext>> ctxs_;
147   std::vector<PipelineContext*> inCtxs_;
148   std::vector<PipelineContext*> outCtxs_;
149   InboundLink<R>* front_{nullptr};
150   OutboundLink<W>* back_{nullptr};
151 };
152
153 }}
154
155 namespace folly {
156
157 class AsyncSocket;
158
159 template <typename Pipeline>
160 class PipelineFactory {
161  public:
162   virtual std::unique_ptr<Pipeline, folly::DelayedDestruction::Destructor>
163   newPipeline(std::shared_ptr<AsyncSocket>) = 0;
164
165   virtual ~PipelineFactory() {}
166 };
167
168 }
169
170 #include <folly/wangle/channel/Pipeline-inl.h>