2 * Copyright 2014 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include <folly/Optional.h>
25 #include <folly/SmallLocks.h>
27 #include <folly/wangle/Try.h>
28 #include <folly/wangle/Promise.h>
29 #include <folly/wangle/Future.h>
30 #include <folly/Executor.h>
31 #include <folly/wangle/detail/FSM.h>
33 #include <folly/io/async/Request.h>
35 namespace folly { namespace wangle { namespace detail {
37 // As of GCC 4.8.1, the std::function in libstdc++ optimizes only for pointers
38 // to functions, so using a free-function helper avoids a call to malloc.
// The helper accepts a Try<T> rvalue and deliberately ignores it; it is the
// no-op callback that detachFuture() passes to setCallback().
40 void empty_callback(Try<T>&&) { }
49 /**
 * The shared state object for Future and Promise.
 *
 * Inherits (protected) from FSM<State>; the default constructor puts the
 * state machine in State::Waiting.
 */
51 class Core : protected FSM<State> {
53 // This must be heap-constructed. There's probably a way to enforce that in
54 // code but since this is just internal detail code and I don't know how
55 // off-hand, I'm punting.
// Default constructor: the only visible work is initialising the FSM base
// with State::Waiting.
56 Core() : FSM<State>(State::Waiting) {}
59 assert(detached_ == 2);
// not copyable: copy construction and copy assignment are deleted
63 Core(Core const&) = delete;
64 Core& operator=(Core const&) = delete;
66 // not movable (see comment in the implementation of Future::then)
// Move construction and move assignment are deleted as well.
67 Core(Core&&) noexcept = delete;
68 Core& operator=(Core&&) = delete;
74 throw FutureNotReady();
// Installs `func` as the callback stored in callback_.
//
// Throws std::logic_error("setCallback called twice") from the shared
// action below. NOTE(review): the condition guarding that throw is outside
// the visible lines; presumably it fires when callback_ is already set.
79 void setCallback(F func) {
// Action shared by every state case: capture the caller's request context
// and take ownership of `func`. Captures by reference, so it must only be
// run while this call frame is alive.
80 auto setCallback_ = [&]{
82 throw std::logic_error("setCallback called twice");
// Save the current RequestContext; maybeCallback() restores it before the
// callback is invoked.
85 context_ = RequestContext::saveContext();
86 callback_ = std::move(func);
91 case State::Interruptible:
92 case State::Interrupted:
// Run the shared action, passing `state` itself as the target state.
// NOTE(review): `state` is not declared in the visible lines; presumably it
// is the current state supplied by the FSM macros, making this a
// self-transition — confirm.
93 FSM_UPDATE(state, setCallback_);
// Transition targeting State::Done whose final action calls
// maybeCallback(). NOTE(review): the enclosing case label and the middle
// argument are outside the visible lines.
97 FSM_UPDATE2(State::Done,
99 [&]{ maybeCallback(); });
// Stores the result `t` in result_ and targets State::Done, then calls
// maybeCallback().
//
// Throws std::logic_error("setResult called twice"). NOTE(review): the case
// label guarding the throw is outside the visible lines; presumably it is
// the already-Done case.
104 void setResult(Try<T>&& t) {
107 case State::Interruptible:
108 case State::Interrupted:
// First action moves `t` into result_; second action attempts to run the
// callback. NOTE(review): the exact ordering/locking between the two actions
// is defined by FSM_UPDATE2 (declared elsewhere) — confirm there.
109 FSM_UPDATE2(State::Done,
110 [&]{ result_ = std::move(t); },
111 [&]{ maybeCallback(); });
115 throw std::logic_error("setResult called twice");
120 return getState() == State::Done;
123 // Called by a destructing Future
// Installs the no-op empty_callback<T> via setCallback().
// NOTE(review): any guard around this call and any follow-up steps are
// outside the visible lines.
124 void detachFuture() {
126 setCallback(empty_callback<T>);
132 // Called by a destructing Promise
// Passes setResult() a Try<T> built from a BrokenPromise exception_ptr.
// NOTE(review): any guard around this call (e.g. "only if no result has been
// set yet") is outside the visible lines — confirm, since setResult() can
// throw std::logic_error("setResult called twice").
133 void detachPromise() {
135 setResult(Try<T>(std::make_exception_ptr(BrokenPromise())));
// Returns the current value of the atomic active_ flag; maybeCallback()
// checks this before running the callback.
151 bool isActive() { return active_; }
153 void setExecutor(Executor* x) {
// Records `e` in interrupt_ and targets State::Interrupted.
// NOTE(review): only the Interruptible and Interrupted case labels are
// visible here; handling of any other states is outside the visible lines.
157 void raise(std::exception_ptr const& e) {
159 case State::Interruptible:
// Store the exception, then invoke the registered interruptHandler_ with the
// stored copy.
160 FSM_UPDATE2(State::Interrupted,
161 [&]{ interrupt_ = e; },
162 [&]{ interruptHandler_(interrupt_); });
166 case State::Interrupted:
// Only store the exception; no handler is invoked on this path.
167 FSM_UPDATE(State::Interrupted,
168 [&]{ interrupt_ = e; });
// Registers `fn` as the interrupt handler (interruptHandler_), which raise()
// invokes with the stored exception.
176 void setInterruptHandler(std::function<void(std::exception_ptr const&)> fn) {
179 case State::Interruptible:
// Move `fn` into interruptHandler_ and target State::Interruptible.
180 FSM_UPDATE(State::Interruptible,
181 [&]{ interruptHandler_ = std::move(fn); });
184 case State::Interrupted:
// NOTE(review): the handling for the Interrupted case is outside the visible
// lines.
// Invokes the stored callback with the stored result, at most once.
// Called from setCallback() and setResult() after their state updates.
194 void maybeCallback() {
// Proceed only when the core is active and a callback has been installed.
196 if (isActive() && callback_) {
// exchange(true) returns the previous value, so only the first caller to
// flip calledBack_ gets past this check.
197 if (!calledBack_.exchange(true)) {
198 // TODO(5306911) we should probably try/catch
// Take a local snapshot of the atomic executor pointer.
199 Executor* x = executor_;
// Restore the RequestContext that setCallback() saved.
201 RequestContext::setContext(context_);
// Executor path: move the callback and the result into MoveWrappers so the
// closure can capture them by value, then hand the closure to the executor.
203 MoveWrapper<std::function<void(Try<T>&&)>> cb(std::move(callback_));
204 MoveWrapper<folly::Optional<Try<T>>> val(std::move(result_));
205 x->add([cb, val]() mutable { (*cb)(std::move(**val)); });
// Inline path: invoke the callback directly with the stored result.
// NOTE(review): the condition selecting between the two paths is outside the
// visible lines; presumably this one runs when no executor is set.
207 callback_(std::move(*result_));
214 auto d = ++detached_;
218 // we should have already executed the callback with the value
// The result; assigned by setResult() and consumed by maybeCallback().
224 folly::Optional<Try<T>> result_;
// The callback; assigned by setCallback() and invoked by maybeCallback().
225 std::function<void(Try<T>&&)> callback_;
// RequestContext saved in setCallback() and restored in maybeCallback().
226 std::shared_ptr<RequestContext> context_{nullptr};
// Set via exchange(true) in maybeCallback() so the callback runs only once.
227 std::atomic<bool> calledBack_ {false};
// Detach counter; incremented with ++detached_ elsewhere in this class, and
// an assert elsewhere in this class expects it to equal 2.
228 std::atomic<unsigned char> detached_ {0};
// Flag returned by isActive(); starts out true.
229 std::atomic<bool> active_ {true};
// Executor read by maybeCallback(); starts out null.
230 std::atomic<Executor*> executor_ {nullptr};
// Exception stored by raise().
231 std::exception_ptr interrupt_;
// Handler stored by setInterruptHandler() and invoked by raise().
232 std::function<void(std::exception_ptr const&)> interruptHandler_;
// Shared bookkeeping for the whenAllVariadicHelper overloads: one Try slot
// per type in Ts, a completion counter, and the promise that receives the
// whole tuple.
235 template <typename... Ts>
236 struct VariadicContext {
// Both counters start at zero. NOTE(review): the declaration of `total` is
// outside the visible lines; whoever builds the context presumably sets it
// to the number of futures — confirm.
237 VariadicContext() : total(0), count(0) {}
// Fulfilled with `results` by the helper callback that brings count up to
// total.
238 Promise<std::tuple<Try<Ts>... > > p;
// One Try per input type; each helper callback move-assigns its own slot.
239 std::tuple<Try<Ts>... > results;
// Number of callbacks that have run so far (incremented atomically).
241 std::atomic<size_t> count;
// Future type matching the promise `p`.
242 typedef Future<std::tuple<Try<Ts>...>> type;
// Base case of the recursion: enabled only when the tail pack Fs is empty,
// i.e. `head` is the last remaining argument.
245 template <typename... Ts, typename THead, typename... Fs>
246 typename std::enable_if<sizeof...(Fs) == 0, void>::type
247 whenAllVariadicHelper(VariadicContext<Ts...> *ctx, THead&& head, Fs&&... tail) {
// Register a callback on `head` that captures the raw context pointer.
// NOTE(review): who owns and frees *ctx is outside the visible lines.
248 head.setCallback_([ctx](Try<typename THead::value_type>&& t) {
// Slot index is sizeof...(Ts) - sizeof...(Fs) - 1; with Fs empty this is the
// last slot of the results tuple.
249 std::get<sizeof...(Ts) - sizeof...(Fs) - 1>(ctx->results) = std::move(t);
// The callback whose increment makes count equal total fulfils the promise
// with the whole results tuple.
250 if (++ctx->count == ctx->total) {
251 ctx->p.setValue(std::move(ctx->results));
// Recursive case: enabled only when at least one argument remains in the
// tail pack Fs after `head`.
257 template <typename... Ts, typename THead, typename... Fs>
258 typename std::enable_if<sizeof...(Fs) != 0, void>::type
259 whenAllVariadicHelper(VariadicContext<Ts...> *ctx, THead&& head, Fs&&... tail) {
// Register a callback on `head` that captures the raw context pointer.
// NOTE(review): who owns and frees *ctx is outside the visible lines.
260 head.setCallback_([ctx](Try<typename THead::value_type>&& t) {
// Slot index counts from the front: total types minus the arguments still
// in the tail, minus one.
261 std::get<sizeof...(Ts) - sizeof...(Fs) - 1>(ctx->results) = std::move(t);
// The callback whose increment makes count equal total fulfils the promise
// with the whole results tuple.
262 if (++ctx->count == ctx->total) {
263 ctx->p.setValue(std::move(ctx->results));
267 // template tail-recursion
// Process the remaining arguments; the first element of `tail` becomes the
// next `head`.
268 whenAllVariadicHelper(ctx, std::forward<Fs>(tail)...);
// Bookkeeping for collecting a homogeneous set of Try<T> results: a vector
// of results, a counter, and a promise for the whole vector.
// NOTE(review): the code that fills in and fulfils this context is outside
// the visible lines.
271 template <typename T>
272 struct WhenAllContext {
// The counter starts at zero.
273 WhenAllContext() : count(0) {}
// Promise for the full vector of results.
274 Promise<std::vector<Try<T> > > p;
// Storage for the individual results.
275 std::vector<Try<T> > results;
// Atomic counter; presumably the number of results received so far —
// confirm against the code that uses this context.
276 std::atomic<size_t> count;
279 template <typename T>
280 struct WhenAnyContext {
281 explicit WhenAnyContext(size_t n) : done(false), ref_count(n) {};
282 Promise<std::pair<size_t, Try<T>>> p;
283 std::atomic<bool> done;
284 std::atomic<size_t> ref_count;
286 if (--ref_count == 0) {