2 * Copyright 2014 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
22 #include <folly/Baton.h>
23 #include <folly/futures/detail/Core.h>
24 #include <folly/futures/Timekeeper.h>
31 Timekeeper* getTimekeeperSingleton();
35 Future<T>::Future(Future<T>&& other) noexcept : core_(nullptr) {
36 *this = std::move(other);
40 Future<T>& Future<T>::operator=(Future<T>&& other) {
41 std::swap(core_, other.core_);
48 const typename std::enable_if<!std::is_void<F>::value, F>::type& val)
52 *this = p.getFuture();
58 typename std::enable_if<!std::is_void<F>::value, F>::type&& val)
61 p.setValue(std::forward<F>(val));
62 *this = p.getFuture();
67 typename std::enable_if<std::is_void<F>::value, int>::type>
68 Future<void>::Future() : core_(nullptr) {
71 *this = p.getFuture();
76 Future<T>::~Future() {
81 void Future<T>::detach() {
83 core_->detachFuture();
89 void Future<T>::throwIfInvalid() const {
96 void Future<T>::setCallback_(F&& func) {
98 core_->setCallback(std::move(func));
101 // Variant: returns a value
102 // e.g. f.then([](Try<T>&& t){ return t.value(); });
104 template <typename F, typename R, bool isTry, typename... Args>
105 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
106 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
107 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
108 typedef typename R::ReturnsFuture::Inner B;
112 // wrap these so we can move them into the lambda
113 folly::MoveWrapper<Promise<B>> p;
114 folly::MoveWrapper<F> funcm(std::forward<F>(func));
116 // grab the Future now before we lose our handle on the Promise
117 auto f = p->getFuture();
119 f.setExecutor(getExecutor());
122 /* This is a bit tricky.
124 We can't just close over *this in case this Future gets moved. So we
125 make a new dummy Future. We could figure out something more
126 sophisticated that avoids making a new Future object when it can, as an
127 optimization. But this is correct.
129 core_ can't be moved, it is explicitly disallowed (as is copying). But
130 if there's ever a reason to allow it, this is one place that makes that
131 assumption and would need to be fixed. We use a standard shared pointer
132 for core_ (by copying it in), which means in essence obj holds a shared
133 pointer to itself. But this shouldn't leak because Promise will not
134 outlive the continuation, because Promise will setException() with a
135 broken Promise if it is destructed before completed. We could use a
136 weak pointer but it would have to be converted to a shared pointer when
137 func is executed (because the Future returned by func may possibly
138 persist beyond the callback, if it gets moved), and so it is an
139 optimization to just make it shared from the get-go.
141 We have to move in the Promise and func using the MoveWrapper
142 hack. (func could be copied but it's a big drag on perf).
144 Two subtle but important points about this design. detail::Core has no
145 back pointers to Future or Promise, so if Future or Promise get moved
146 (and they will be moved in performant code) we don't have to do
147 anything fancy. And because we store the continuation in the
148 detail::Core, not in the Future, we can execute the continuation even
149 after the Future has gone out of scope. This is an intentional design
150 decision. It is likely we will want to be able to cancel a continuation
151 in some circumstances, but I think it should be explicit not implicit
152 in the destruction of the Future used to create it.
155 [p, funcm](Try<T>&& t) mutable {
156 if (!isTry && t.hasException()) {
157 p->setException(std::move(t.exception()));
160 return (*funcm)(t.template get<isTry, Args>()...);
168 // Variant: returns a Future
169 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
171 template <typename F, typename R, bool isTry, typename... Args>
172 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
173 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
174 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
175 typedef typename R::ReturnsFuture::Inner B;
179 // wrap these so we can move them into the lambda
180 folly::MoveWrapper<Promise<B>> p;
181 folly::MoveWrapper<F> funcm(std::forward<F>(func));
183 // grab the Future now before we lose our handle on the Promise
184 auto f = p->getFuture();
186 f.setExecutor(getExecutor());
190 [p, funcm](Try<T>&& t) mutable {
191 if (!isTry && t.hasException()) {
192 p->setException(std::move(t.exception()));
195 auto f2 = (*funcm)(t.template get<isTry, Args>()...);
196 // that didn't throw, now we can steal p
197 f2.setCallback_([p](Try<B>&& b) mutable {
198 p->fulfilTry(std::move(b));
200 } catch (const std::exception& e) {
201 p->setException(exception_wrapper(std::current_exception(), e));
203 p->setException(exception_wrapper(std::current_exception()));
211 template <typename T>
212 template <typename R, typename Caller, typename... Args>
213 Future<typename isFuture<R>::Inner>
214 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
215 typedef typename std::remove_cv<
216 typename std::remove_reference<
217 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
218 return then([instance, func](Try<T>&& t){
219 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
224 Future<void> Future<T>::then() {
225 return then([] (Try<T>&& t) {});
228 // onError where the callback returns T
231 typename std::enable_if<
232 !detail::Extract<F>::ReturnsFuture::value,
234 Future<T>::onError(F&& func) {
235 typedef typename detail::Extract<F>::FirstArg Exn;
237 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
238 "Return type of onError callback must be T or Future<T>");
241 auto f = p.getFuture();
242 auto pm = folly::makeMoveWrapper(std::move(p));
243 auto funcm = folly::makeMoveWrapper(std::move(func));
244 setCallback_([pm, funcm](Try<T>&& t) mutable {
245 if (!t.template withException<Exn>([&] (Exn& e) {
250 pm->fulfilTry(std::move(t));
257 // onError where the callback returns Future<T>
260 typename std::enable_if<
261 detail::Extract<F>::ReturnsFuture::value,
263 Future<T>::onError(F&& func) {
265 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
266 "Return type of onError callback must be T or Future<T>");
267 typedef typename detail::Extract<F>::FirstArg Exn;
270 auto f = p.getFuture();
271 auto pm = folly::makeMoveWrapper(std::move(p));
272 auto funcm = folly::makeMoveWrapper(std::move(func));
273 setCallback_([pm, funcm](Try<T>&& t) mutable {
274 if (!t.template withException<Exn>([&] (Exn& e) {
276 auto f2 = (*funcm)(e);
277 f2.setCallback_([pm](Try<T>&& t2) mutable {
278 pm->fulfilTry(std::move(t2));
280 } catch (const std::exception& e2) {
281 pm->setException(exception_wrapper(std::current_exception(), e2));
283 pm->setException(exception_wrapper(std::current_exception()));
286 pm->fulfilTry(std::move(t));
295 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
296 auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
297 return within(dur, tk)
298 .onError([funcw](TimedOut const&) { return (*funcw)(); });
302 typename std::add_lvalue_reference<T>::type Future<T>::value() {
305 return core_->getTry().value();
309 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
312 return core_->getTry().value();
316 Try<T>& Future<T>::getTry() {
319 return core_->getTry();
323 template <typename Executor>
324 inline Future<T> Future<T>::via(Executor* executor) && {
327 setExecutor(executor);
329 return std::move(*this);
333 template <typename Executor>
334 inline Future<T> Future<T>::via(Executor* executor) & {
337 MoveWrapper<Promise<T>> p;
338 auto f = p->getFuture();
339 then([p](Try<T>&& t) mutable { p->fulfilTry(std::move(t)); });
340 return std::move(f).via(executor);
344 bool Future<T>::isReady() const {
346 return core_->ready();
350 void Future<T>::raise(exception_wrapper exception) {
351 core_->raise(std::move(exception));
357 Future<typename std::decay<T>::type> makeFuture(T&& t) {
358 Promise<typename std::decay<T>::type> p;
359 p.setValue(std::forward<T>(t));
360 return p.getFuture();
363 inline // for multiple translation units
364 Future<void> makeFuture() {
367 return p.getFuture();
373 typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
374 -> Future<decltype(func())> {
375 Promise<decltype(func())> p;
380 return p.getFuture();
384 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
386 return makeFutureTry(std::move(copy));
390 Future<T> makeFuture(std::exception_ptr const& e) {
393 return p.getFuture();
397 Future<T> makeFuture(exception_wrapper ew) {
399 p.setException(std::move(ew));
400 return p.getFuture();
403 template <class T, class E>
404 typename std::enable_if<std::is_base_of<std::exception, E>::value,
406 makeFuture(E const& e) {
408 p.setException(make_exception_wrapper<E>(e));
409 return p.getFuture();
413 Future<T> makeFuture(Try<T>&& t) {
414 Promise<typename std::decay<T>::type> p;
415 p.fulfilTry(std::move(t));
416 return p.getFuture();
420 inline Future<void> makeFuture(Try<void>&& t) {
421 if (t.hasException()) {
422 return makeFuture<void>(std::move(t.exception()));
429 template <typename Executor>
430 Future<void> via(Executor* executor) {
431 return makeFuture().via(executor);
436 template <typename... Fs>
437 typename detail::VariadicContext<
438 typename std::decay<Fs>::type::value_type...>::type
442 new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
443 ctx->total = sizeof...(fs);
444 auto f_saved = ctx->p.getFuture();
445 detail::whenAllVariadicHelper(ctx,
446 std::forward<typename std::decay<Fs>::type>(fs)...);
452 template <class InputIterator>
455 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
456 whenAll(InputIterator first, InputIterator last)
459 typename std::iterator_traits<InputIterator>::value_type::value_type T;
462 return makeFuture(std::vector<Try<T>>());
464 size_t n = std::distance(first, last);
466 auto ctx = new detail::WhenAllContext<T>();
468 ctx->results.resize(n);
470 auto f_saved = ctx->p.getFuture();
472 for (size_t i = 0; first != last; ++first, ++i) {
475 f.setCallback_([ctx, i, n](Try<T>&& t) {
476 ctx->results[i] = std::move(t);
477 if (++ctx->count == n) {
478 ctx->p.setValue(std::move(ctx->results));
487 template <class InputIterator>
492 std::iterator_traits<InputIterator>::value_type::value_type> > >
493 whenAny(InputIterator first, InputIterator last) {
495 typename std::iterator_traits<InputIterator>::value_type::value_type T;
497 auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
498 auto f_saved = ctx->p.getFuture();
500 for (size_t i = 0; first != last; first++, i++) {
502 f.setCallback_([i, ctx](Try<T>&& t) {
503 if (!ctx->done.exchange(true)) {
504 ctx->p.setValue(std::make_pair(i, std::move(t)));
513 template <class InputIterator>
514 Future<std::vector<std::pair<size_t, Try<typename
515 std::iterator_traits<InputIterator>::value_type::value_type>>>>
516 whenN(InputIterator first, InputIterator last, size_t n) {
518 std::iterator_traits<InputIterator>::value_type::value_type T;
519 typedef std::vector<std::pair<size_t, Try<T>>> V;
526 auto ctx = std::make_shared<ctx_t>();
529 // for each completed Future, increase count and add to vector, until we
530 // have n completed futures at which point we fulfil our Promise with the
535 it->then([ctx, n, i](Try<T>&& t) {
537 auto c = ++ctx->completed;
539 assert(ctx->v.size() < n);
540 v.push_back(std::make_pair(i, std::move(t)));
542 ctx->p.fulfilTry(Try<V>(std::move(v)));
552 ctx->p.setException(std::runtime_error("Not enough futures"));
555 return ctx->p.getFuture();
559 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
560 return within(dur, TimedOut(), tk);
565 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
568 Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
571 std::atomic<bool> token;
573 auto ctx = std::make_shared<Context>(std::move(e));
576 tk = folly::detail::getTimekeeperSingleton();
580 .then([ctx](Try<void> const& t) {
581 if (ctx->token.exchange(true) == false) {
582 if (t.hasException()) {
583 ctx->promise.setException(std::move(t.exception()));
585 ctx->promise.setException(std::move(ctx->exception));
590 this->then([ctx](Try<T>&& t) {
591 if (ctx->token.exchange(true) == false) {
592 ctx->promise.fulfilTry(std::move(t));
596 return ctx->promise.getFuture();
600 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
601 return whenAll(*this, futures::sleep(dur, tk))
602 .then([](std::tuple<Try<T>, Try<void>> tup) {
603 Try<T>& t = std::get<0>(tup);
604 return makeFuture<T>(std::move(t));
611 void waitImpl(Future<T>& f) {
612 // short-circuit if there's nothing to do
613 if (f.isReady()) return;
616 f = f.then([&](Try<T> t) {
618 return makeFuture(std::move(t));
622 // There's a race here between the return here and the actual finishing of
623 // the future. f is completed, but the setup may not have finished on done
624 // after the baton has posted.
625 while (!f.isReady()) {
626 std::this_thread::yield();
631 void waitImpl(Future<T>& f, Duration dur) {
632 // short-circuit if there's nothing to do
633 if (f.isReady()) return;
635 auto baton = std::make_shared<Baton<>>();
636 f = f.then([baton](Try<T> t) {
638 return makeFuture(std::move(t));
641 // Let's preserve the invariant that if we did not timeout (timed_wait returns
642 // true), then the returned Future is complete when it is returned to the
643 // caller. We need to wait out the race for that Future to complete.
644 if (baton->timed_wait(std::chrono::system_clock::now() + dur)) {
645 while (!f.isReady()) {
646 std::this_thread::yield();
652 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
653 while (!f.isReady()) {
661 Future<T>& Future<T>::wait() & {
662 detail::waitImpl(*this);
667 Future<T>&& Future<T>::wait() && {
668 detail::waitImpl(*this);
669 return std::move(*this);
673 Future<T>& Future<T>::wait(Duration dur) & {
674 detail::waitImpl(*this, dur);
679 Future<T>&& Future<T>::wait(Duration dur) && {
680 detail::waitImpl(*this, dur);
681 return std::move(*this);
685 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
686 detail::waitViaImpl(*this, e);
691 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
692 detail::waitViaImpl(*this, e);
693 return std::move(*this);
698 return std::move(wait().value());
702 inline void Future<void>::get() {
707 T Future<T>::get(Duration dur) {
710 return std::move(value());
717 inline void Future<void>::get(Duration dur) {
727 T Future<T>::getVia(DrivableExecutor* e) {
728 return std::move(waitVia(e).value());
732 inline void Future<void>::getVia(DrivableExecutor* e) {
740 Future<Z> chainHelper(Future<Z> f) {
744 template <class Z, class F, class Fn, class... Callbacks>
745 Future<Z> chainHelper(F f, Fn fn, Callbacks... fns) {
746 return chainHelper<Z>(f.then(fn), fns...);
750 template <class A, class Z, class... Callbacks>
751 std::function<Future<Z>(Try<A>)>
752 chain(Callbacks... fns) {
753 MoveWrapper<Promise<A>> pw;
754 MoveWrapper<Future<Z>> fw(chainHelper<Z>(pw->getFuture(), fns...));
755 return [=](Try<A> t) mutable {
756 pw->fulfilTry(std::move(t));
757 return std::move(*fw);
765 // I haven't included a Future<T&> specialization because I don't forsee us
766 // using it, however it is not difficult to add when needed. Refer to
767 // Future<void> for guidance. std::future and boost::future code would also be