unrevert "(wangle) express current Core functionality with a state machine""
[folly.git] / folly / wangle / detail / Core.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <atomic>
20 #include <mutex>
21 #include <stdexcept>
22 #include <vector>
23
24 #include <folly/Optional.h>
25 #include <folly/SmallLocks.h>
26
27 #include <folly/wangle/Try.h>
28 #include <folly/wangle/Promise.h>
29 #include <folly/wangle/Future.h>
30 #include <folly/wangle/Executor.h>
31 #include <folly/wangle/detail/FSM.h>
32
33 namespace folly { namespace wangle { namespace detail {
34
35 // As of GCC 4.8.1, the std::function in libstdc++ optimizes only for pointers
36 // to functions, using a helper avoids a call to malloc.
37 template<typename T>
38 void empty_callback(Try<T>&&) { }
39
40 enum class State {
41   Waiting,
42   Interruptible,
43   Interrupted,
44   Done,
45 };
46
47 /** The shared state object for Future and Promise. */
48 template<typename T>
49 class Core : protected FSM<State> {
50  public:
51   // This must be heap-constructed. There's probably a way to enforce that in
52   // code but since this is just internal detail code and I don't know how
53   // off-hand, I'm punting.
54   Core() : FSM<State>(State::Waiting) {}
55   ~Core() {
56     assert(calledBack_);
57     assert(detached_ == 2);
58   }
59
60   // not copyable
61   Core(Core const&) = delete;
62   Core& operator=(Core const&) = delete;
63
64   // not movable (see comment in the implementation of Future::then)
65   Core(Core&&) noexcept = delete;
66   Core& operator=(Core&&) = delete;
67
68   Try<T>& getTry() {
69     if (ready()) {
70       return *result_;
71     } else {
72       throw FutureNotReady();
73     }
74   }
75
76   template <typename F>
77   void setCallback(F func) {
78     auto setCallback_ = [&]{
79       if (callback_) {
80         throw std::logic_error("setCallback called twice");
81       }
82
83       callback_ = std::move(func);
84     };
85
86     FSM_START
87       case State::Waiting:
88       case State::Interruptible:
89       case State::Interrupted:
90         FSM_UPDATE(state, setCallback_);
91         break;
92
93       case State::Done:
94         FSM_UPDATE2(State::Done,
95           setCallback_,
96           [&]{ maybeCallback(); });
97         break;
98     FSM_END
99   }
100
101   void setResult(Try<T>&& t) {
102     FSM_START
103       case State::Waiting:
104       case State::Interruptible:
105       case State::Interrupted:
106         FSM_UPDATE2(State::Done,
107           [&]{ result_ = std::move(t); },
108           [&]{ maybeCallback(); });
109         break;
110
111       case State::Done:
112         throw std::logic_error("setResult called twice");
113     FSM_END
114   }
115
116   bool ready() const {
117     return getState() == State::Done;
118   }
119
120   // Called by a destructing Future
121   void detachFuture() {
122     if (!callback_) {
123       setCallback(empty_callback<T>);
124     }
125     activate();
126     detachOne();
127   }
128
129   // Called by a destructing Promise
130   void detachPromise() {
131     if (!ready()) {
132       setResult(Try<T>(std::make_exception_ptr(BrokenPromise())));
133     }
134     detachOne();
135   }
136
137   void deactivate() {
138     active_ = false;
139   }
140
141   void activate() {
142     active_ = true;
143     if (ready()) {
144       maybeCallback();
145     }
146   }
147
148   bool isActive() { return active_; }
149
150   void setExecutor(Executor* x) {
151     executor_ = x;
152   }
153
154   void raise(std::exception_ptr const& e) {
155     FSM_START
156       case State::Interruptible:
157         FSM_UPDATE2(State::Interrupted,
158           [&]{ interrupt_ = e; },
159           [&]{ interruptHandler_(interrupt_); });
160         break;
161
162       case State::Waiting:
163       case State::Interrupted:
164         FSM_UPDATE(State::Interrupted,
165           [&]{ interrupt_ = e; });
166         break;
167
168       case State::Done:
169         FSM_BREAK
170     FSM_END
171   }
172
173   void setInterruptHandler(std::function<void(std::exception_ptr const&)> fn) {
174     FSM_START
175       case State::Waiting:
176       case State::Interruptible:
177         FSM_UPDATE(State::Interruptible,
178           [&]{ interruptHandler_ = std::move(fn); });
179         break;
180
181       case State::Interrupted:
182         fn(interrupt_);
183         FSM_BREAK
184
185       case State::Done:
186         FSM_BREAK
187     FSM_END
188   }
189
190  private:
191   void maybeCallback() {
192     assert(ready());
193     if (!calledBack_ && isActive() && callback_) {
194       // TODO(5306911) we should probably try/catch
195       calledBack_ = true;
196       Executor* x = executor_;
197       if (x) {
198         MoveWrapper<std::function<void(Try<T>&&)>> cb(std::move(callback_));
199         MoveWrapper<folly::Optional<Try<T>>> val(std::move(result_));
200         x->add([cb, val]() mutable { (*cb)(std::move(**val)); });
201       } else {
202         callback_(std::move(*result_));
203       }
204     }
205   }
206
207   void detachOne() {
208     ++detached_;
209     assert(detached_ == 1 || detached_ == 2);
210     if (detached_ == 2) {
211       // we should have already executed the callback with the value
212       assert(calledBack_);
213       delete this;
214     }
215   }
216
217   folly::Optional<Try<T>> result_;
218   std::function<void(Try<T>&&)> callback_;
219   std::atomic<bool> calledBack_ {false};
220   std::atomic<unsigned char> detached_ {0};
221   std::atomic<bool> active_ {true};
222   std::atomic<Executor*> executor_ {nullptr};
223   std::exception_ptr interrupt_;
224   std::function<void(std::exception_ptr const&)> interruptHandler_;
225 };
226
227 template <typename... Ts>
228 struct VariadicContext {
229   VariadicContext() : total(0), count(0) {}
230   Promise<std::tuple<Try<Ts>... > > p;
231   std::tuple<Try<Ts>... > results;
232   size_t total;
233   std::atomic<size_t> count;
234   typedef Future<std::tuple<Try<Ts>...>> type;
235 };
236
237 template <typename... Ts, typename THead, typename... Fs>
238 typename std::enable_if<sizeof...(Fs) == 0, void>::type
239 whenAllVariadicHelper(VariadicContext<Ts...> *ctx, THead&& head, Fs&&... tail) {
240   head.setCallback_([ctx](Try<typename THead::value_type>&& t) {
241     std::get<sizeof...(Ts) - sizeof...(Fs) - 1>(ctx->results) = std::move(t);
242     if (++ctx->count == ctx->total) {
243       ctx->p.setValue(std::move(ctx->results));
244       delete ctx;
245     }
246   });
247 }
248
249 template <typename... Ts, typename THead, typename... Fs>
250 typename std::enable_if<sizeof...(Fs) != 0, void>::type
251 whenAllVariadicHelper(VariadicContext<Ts...> *ctx, THead&& head, Fs&&... tail) {
252   head.setCallback_([ctx](Try<typename THead::value_type>&& t) {
253     std::get<sizeof...(Ts) - sizeof...(Fs) - 1>(ctx->results) = std::move(t);
254     if (++ctx->count == ctx->total) {
255       ctx->p.setValue(std::move(ctx->results));
256       delete ctx;
257     }
258   });
259   // template tail-recursion
260   whenAllVariadicHelper(ctx, std::forward<Fs>(tail)...);
261 }
262
263 template <typename T>
264 struct WhenAllContext {
265   explicit WhenAllContext() : count(0), total(0) {}
266   Promise<std::vector<Try<T> > > p;
267   std::vector<Try<T> > results;
268   std::atomic<size_t> count;
269   size_t total;
270 };
271
272 template <typename T>
273 struct WhenAnyContext {
274   explicit WhenAnyContext(size_t n) : done(false), ref_count(n) {};
275   Promise<std::pair<size_t, Try<T>>> p;
276   std::atomic<bool> done;
277   std::atomic<size_t> ref_count;
278   void decref() {
279     if (--ref_count == 0) {
280       delete this;
281     }
282   }
283 };
284
285 template <typename T>
286 struct WhenAllLaterContext {
287   explicit WhenAllLaterContext() : count(0), total(0) {}
288   std::function<void(std::vector<Try<T>>&&)> fn;
289   std::vector<Try<T> > results;
290   std::atomic<size_t> count;
291   size_t total;
292 };
293
294 }}} // namespace