2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
22 #include <folly/Baton.h>
23 #include <folly/Optional.h>
24 #include <folly/futures/detail/Core.h>
25 #include <folly/futures/Timekeeper.h>
32 Timekeeper* getTimekeeperSingleton();
36 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
37 other.core_ = nullptr;
41 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
42 std::swap(core_, other.core_);
49 const typename std::enable_if<!std::is_void<F>::value, F>::type& val)
53 *this = p.getFuture();
59 typename std::enable_if<!std::is_void<F>::value, F>::type&& val)
62 p.setValue(std::forward<F>(val));
63 *this = p.getFuture();
68 typename std::enable_if<std::is_void<F>::value, int>::type>
69 Future<void>::Future() : core_(nullptr) {
72 *this = p.getFuture();
77 Future<T>::~Future() {
82 void Future<T>::detach() {
84 core_->detachFuture();
90 void Future<T>::throwIfInvalid() const {
97 void Future<T>::setCallback_(F&& func) {
99 core_->setCallback(std::move(func));
106 typename std::enable_if<isFuture<F>::value,
107 Future<typename isFuture<T>::Inner>>::type
108 Future<T>::unwrap() {
109 return then([](Future<typename isFuture<T>::Inner> internal_future) {
110 return internal_future;
116 // Variant: returns a value
117 // e.g. f.then([](Try<T>&& t){ return t.value(); });
119 template <typename F, typename R, bool isTry, typename... Args>
120 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
121 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
122 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
123 typedef typename R::ReturnsFuture::Inner B;
127 // wrap these so we can move them into the lambda
128 folly::MoveWrapper<Promise<B>> p;
129 folly::MoveWrapper<F> funcm(std::forward<F>(func));
131 // grab the Future now before we lose our handle on the Promise
132 auto f = p->getFuture();
134 f.setExecutor(getExecutor());
137 /* This is a bit tricky.
139 We can't just close over *this in case this Future gets moved. So we
140 make a new dummy Future. We could figure out something more
141 sophisticated that avoids making a new Future object when it can, as an
142 optimization. But this is correct.
144 core_ can't be moved, it is explicitly disallowed (as is copying). But
145 if there's ever a reason to allow it, this is one place that makes that
146 assumption and would need to be fixed. We use a standard shared pointer
147 for core_ (by copying it in), which means in essence obj holds a shared
148 pointer to itself. But this shouldn't leak because Promise will not
149 outlive the continuation, because Promise will setException() with a
150 broken Promise if it is destructed before completed. We could use a
151 weak pointer but it would have to be converted to a shared pointer when
152 func is executed (because the Future returned by func may possibly
153 persist beyond the callback, if it gets moved), and so it is an
154 optimization to just make it shared from the get-go.
156 We have to move in the Promise and func using the MoveWrapper
157 hack. (func could be copied but it's a big drag on perf).
159 Two subtle but important points about this design. detail::Core has no
160 back pointers to Future or Promise, so if Future or Promise get moved
161 (and they will be moved in performant code) we don't have to do
162 anything fancy. And because we store the continuation in the
163 detail::Core, not in the Future, we can execute the continuation even
164 after the Future has gone out of scope. This is an intentional design
165 decision. It is likely we will want to be able to cancel a continuation
166 in some circumstances, but I think it should be explicit not implicit
167 in the destruction of the Future used to create it.
170 [p, funcm](Try<T>&& t) mutable {
171 if (!isTry && t.hasException()) {
172 p->setException(std::move(t.exception()));
175 return (*funcm)(t.template get<isTry, Args>()...);
183 // Variant: returns a Future
184 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
186 template <typename F, typename R, bool isTry, typename... Args>
187 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
188 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
189 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
190 typedef typename R::ReturnsFuture::Inner B;
194 // wrap these so we can move them into the lambda
195 folly::MoveWrapper<Promise<B>> p;
196 folly::MoveWrapper<F> funcm(std::forward<F>(func));
198 // grab the Future now before we lose our handle on the Promise
199 auto f = p->getFuture();
201 f.setExecutor(getExecutor());
205 [p, funcm](Try<T>&& t) mutable {
206 if (!isTry && t.hasException()) {
207 p->setException(std::move(t.exception()));
210 auto f2 = (*funcm)(t.template get<isTry, Args>()...);
211 // that didn't throw, now we can steal p
212 f2.setCallback_([p](Try<B>&& b) mutable {
213 p->fulfilTry(std::move(b));
215 } catch (const std::exception& e) {
216 p->setException(exception_wrapper(std::current_exception(), e));
218 p->setException(exception_wrapper(std::current_exception()));
226 template <typename T>
227 template <typename R, typename Caller, typename... Args>
228 Future<typename isFuture<R>::Inner>
229 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
230 typedef typename std::remove_cv<
231 typename std::remove_reference<
232 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
233 return then([instance, func](Try<T>&& t){
234 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
239 Future<void> Future<T>::then() {
240 return then([] (Try<T>&& t) {});
243 // onError where the callback returns T
246 typename std::enable_if<
247 !detail::callableWith<F, exception_wrapper>::value &&
248 !detail::Extract<F>::ReturnsFuture::value,
250 Future<T>::onError(F&& func) {
251 typedef typename detail::Extract<F>::FirstArg Exn;
253 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
254 "Return type of onError callback must be T or Future<T>");
257 auto f = p.getFuture();
258 auto pm = folly::makeMoveWrapper(std::move(p));
259 auto funcm = folly::makeMoveWrapper(std::move(func));
260 setCallback_([pm, funcm](Try<T>&& t) mutable {
261 if (!t.template withException<Exn>([&] (Exn& e) {
266 pm->fulfilTry(std::move(t));
273 // onError where the callback returns Future<T>
276 typename std::enable_if<
277 !detail::callableWith<F, exception_wrapper>::value &&
278 detail::Extract<F>::ReturnsFuture::value,
280 Future<T>::onError(F&& func) {
282 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
283 "Return type of onError callback must be T or Future<T>");
284 typedef typename detail::Extract<F>::FirstArg Exn;
287 auto f = p.getFuture();
288 auto pm = folly::makeMoveWrapper(std::move(p));
289 auto funcm = folly::makeMoveWrapper(std::move(func));
290 setCallback_([pm, funcm](Try<T>&& t) mutable {
291 if (!t.template withException<Exn>([&] (Exn& e) {
293 auto f2 = (*funcm)(e);
294 f2.setCallback_([pm](Try<T>&& t2) mutable {
295 pm->fulfilTry(std::move(t2));
297 } catch (const std::exception& e2) {
298 pm->setException(exception_wrapper(std::current_exception(), e2));
300 pm->setException(exception_wrapper(std::current_exception()));
303 pm->fulfilTry(std::move(t));
312 Future<T> Future<T>::ensure(F func) {
313 MoveWrapper<F> funcw(std::move(func));
314 return this->then([funcw](Try<T>&& t) {
316 return makeFuture(std::move(t));
322 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
323 auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
324 return within(dur, tk)
325 .onError([funcw](TimedOut const&) { return (*funcw)(); });
330 typename std::enable_if<
331 detail::callableWith<F, exception_wrapper>::value &&
332 detail::Extract<F>::ReturnsFuture::value,
334 Future<T>::onError(F&& func) {
336 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
337 "Return type of onError callback must be T or Future<T>");
340 auto f = p.getFuture();
341 auto pm = folly::makeMoveWrapper(std::move(p));
342 auto funcm = folly::makeMoveWrapper(std::move(func));
343 setCallback_([pm, funcm](Try<T> t) mutable {
344 if (t.hasException()) {
346 auto f2 = (*funcm)(std::move(t.exception()));
347 f2.setCallback_([pm](Try<T> t2) mutable {
348 pm->fulfilTry(std::move(t2));
350 } catch (const std::exception& e2) {
351 pm->setException(exception_wrapper(std::current_exception(), e2));
353 pm->setException(exception_wrapper(std::current_exception()));
356 pm->fulfilTry(std::move(t));
363 // onError(exception_wrapper) that returns T
366 typename std::enable_if<
367 detail::callableWith<F, exception_wrapper>::value &&
368 !detail::Extract<F>::ReturnsFuture::value,
370 Future<T>::onError(F&& func) {
372 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
373 "Return type of onError callback must be T or Future<T>");
376 auto f = p.getFuture();
377 auto pm = folly::makeMoveWrapper(std::move(p));
378 auto funcm = folly::makeMoveWrapper(std::move(func));
379 setCallback_([pm, funcm](Try<T> t) mutable {
380 if (t.hasException()) {
382 return (*funcm)(std::move(t.exception()));
385 pm->fulfilTry(std::move(t));
393 typename std::add_lvalue_reference<T>::type Future<T>::value() {
396 return core_->getTry().value();
400 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
403 return core_->getTry().value();
407 Try<T>& Future<T>::getTry() {
410 return core_->getTry();
414 Optional<Try<T>> Future<T>::poll() {
416 if (core_->ready()) {
417 o = std::move(core_->getTry());
423 template <typename Executor>
424 inline Future<T> Future<T>::via(Executor* executor) && {
427 setExecutor(executor);
429 return std::move(*this);
433 template <typename Executor>
434 inline Future<T> Future<T>::via(Executor* executor) & {
437 MoveWrapper<Promise<T>> p;
438 auto f = p->getFuture();
439 then([p](Try<T>&& t) mutable { p->fulfilTry(std::move(t)); });
440 return std::move(f).via(executor);
444 bool Future<T>::isReady() const {
446 return core_->ready();
450 void Future<T>::raise(exception_wrapper exception) {
451 core_->raise(std::move(exception));
457 Future<typename std::decay<T>::type> makeFuture(T&& t) {
458 Promise<typename std::decay<T>::type> p;
459 p.setValue(std::forward<T>(t));
460 return p.getFuture();
463 inline // for multiple translation units
464 Future<void> makeFuture() {
467 return p.getFuture();
473 typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
474 -> Future<decltype(func())> {
475 Promise<decltype(func())> p;
480 return p.getFuture();
484 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
486 return makeFutureTry(std::move(copy));
490 Future<T> makeFuture(std::exception_ptr const& e) {
493 return p.getFuture();
497 Future<T> makeFuture(exception_wrapper ew) {
499 p.setException(std::move(ew));
500 return p.getFuture();
503 template <class T, class E>
504 typename std::enable_if<std::is_base_of<std::exception, E>::value,
506 makeFuture(E const& e) {
508 p.setException(make_exception_wrapper<E>(e));
509 return p.getFuture();
513 Future<T> makeFuture(Try<T>&& t) {
514 Promise<typename std::decay<T>::type> p;
515 p.fulfilTry(std::move(t));
516 return p.getFuture();
520 inline Future<void> makeFuture(Try<void>&& t) {
521 if (t.hasException()) {
522 return makeFuture<void>(std::move(t.exception()));
529 template <typename Executor>
530 Future<void> via(Executor* executor) {
531 return makeFuture().via(executor);
536 template <typename... Fs>
537 typename detail::VariadicContext<
538 typename std::decay<Fs>::type::value_type...>::type
542 new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
543 ctx->total = sizeof...(fs);
544 auto f_saved = ctx->p.getFuture();
545 detail::whenAllVariadicHelper(ctx,
546 std::forward<typename std::decay<Fs>::type>(fs)...);
552 template <class InputIterator>
555 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
556 whenAll(InputIterator first, InputIterator last)
559 typename std::iterator_traits<InputIterator>::value_type::value_type T;
562 return makeFuture(std::vector<Try<T>>());
564 size_t n = std::distance(first, last);
566 auto ctx = new detail::WhenAllContext<T>();
568 ctx->results.resize(n);
570 auto f_saved = ctx->p.getFuture();
572 for (size_t i = 0; first != last; ++first, ++i) {
575 f.setCallback_([ctx, i, n](Try<T>&& t) {
576 ctx->results[i] = std::move(t);
577 if (++ctx->count == n) {
578 ctx->p.setValue(std::move(ctx->results));
587 template <class InputIterator>
592 std::iterator_traits<InputIterator>::value_type::value_type> > >
593 whenAny(InputIterator first, InputIterator last) {
595 typename std::iterator_traits<InputIterator>::value_type::value_type T;
597 auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
598 auto f_saved = ctx->p.getFuture();
600 for (size_t i = 0; first != last; first++, i++) {
602 f.setCallback_([i, ctx](Try<T>&& t) {
603 if (!ctx->done.exchange(true)) {
604 ctx->p.setValue(std::make_pair(i, std::move(t)));
613 template <class InputIterator>
614 Future<std::vector<std::pair<size_t, Try<typename
615 std::iterator_traits<InputIterator>::value_type::value_type>>>>
616 whenN(InputIterator first, InputIterator last, size_t n) {
618 std::iterator_traits<InputIterator>::value_type::value_type T;
619 typedef std::vector<std::pair<size_t, Try<T>>> V;
626 auto ctx = std::make_shared<ctx_t>();
629 // for each completed Future, increase count and add to vector, until we
630 // have n completed futures at which point we fulfil our Promise with the
635 it->then([ctx, n, i](Try<T>&& t) {
637 auto c = ++ctx->completed;
639 assert(ctx->v.size() < n);
640 v.push_back(std::make_pair(i, std::move(t)));
642 ctx->p.fulfilTry(Try<V>(std::move(v)));
652 ctx->p.setException(std::runtime_error("Not enough futures"));
655 return ctx->p.getFuture();
658 template <class It, class T, class F, class ItT, class Arg>
659 typename std::enable_if<!isFutureResult<F, T, Arg>::value, Future<T>>::type
660 reduce(It first, It last, T initial, F func) {
662 return makeFuture(std::move(initial));
665 typedef isTry<Arg> IsTry;
667 return whenAll(first, last)
668 .then([initial, func](std::vector<Try<ItT>>& vals) mutable {
669 for (auto& val : vals) {
670 initial = func(std::move(initial),
671 // Either return a ItT&& or a Try<ItT>&& depending
672 // on the type of the argument of func.
673 val.template get<IsTry::value, Arg&&>());
679 template <class It, class T, class F, class ItT, class Arg>
680 typename std::enable_if<isFutureResult<F, T, Arg>::value, Future<T>>::type
681 reduce(It first, It last, T initial, F func) {
683 return makeFuture(std::move(initial));
686 typedef isTry<Arg> IsTry;
688 auto f = first->then([initial, func](Try<ItT>& head) mutable {
689 return func(std::move(initial),
690 head.template get<IsTry::value, Arg&&>());
693 for (++first; first != last; ++first) {
694 f = whenAll(f, *first).then([func](std::tuple<Try<T>, Try<ItT>>& t) {
695 return func(std::move(std::get<0>(t).value()),
696 // Either return a ItT&& or a Try<ItT>&& depending
697 // on the type of the argument of func.
698 std::get<1>(t).template get<IsTry::value, Arg&&>());
706 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
707 return within(dur, TimedOut(), tk);
712 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
715 Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
718 std::atomic<bool> token;
720 auto ctx = std::make_shared<Context>(std::move(e));
723 tk = folly::detail::getTimekeeperSingleton();
727 .then([ctx](Try<void> const& t) {
728 if (ctx->token.exchange(true) == false) {
729 if (t.hasException()) {
730 ctx->promise.setException(std::move(t.exception()));
732 ctx->promise.setException(std::move(ctx->exception));
737 this->then([ctx](Try<T>&& t) {
738 if (ctx->token.exchange(true) == false) {
739 ctx->promise.fulfilTry(std::move(t));
743 return ctx->promise.getFuture();
747 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
748 return whenAll(*this, futures::sleep(dur, tk))
749 .then([](std::tuple<Try<T>, Try<void>> tup) {
750 Try<T>& t = std::get<0>(tup);
751 return makeFuture<T>(std::move(t));
758 void waitImpl(Future<T>& f) {
759 // short-circuit if there's nothing to do
760 if (f.isReady()) return;
763 f = f.then([&](Try<T> t) {
765 return makeFuture(std::move(t));
769 // There's a race here between the return here and the actual finishing of
770 // the future. f is completed, but the setup may not have finished on done
771 // after the baton has posted.
772 while (!f.isReady()) {
773 std::this_thread::yield();
778 void waitImpl(Future<T>& f, Duration dur) {
779 // short-circuit if there's nothing to do
780 if (f.isReady()) return;
782 auto baton = std::make_shared<Baton<>>();
783 f = f.then([baton](Try<T> t) {
785 return makeFuture(std::move(t));
788 // Let's preserve the invariant that if we did not timeout (timed_wait returns
789 // true), then the returned Future is complete when it is returned to the
790 // caller. We need to wait out the race for that Future to complete.
791 if (baton->timed_wait(std::chrono::system_clock::now() + dur)) {
792 while (!f.isReady()) {
793 std::this_thread::yield();
799 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
800 while (!f.isReady()) {
808 Future<T>& Future<T>::wait() & {
809 detail::waitImpl(*this);
814 Future<T>&& Future<T>::wait() && {
815 detail::waitImpl(*this);
816 return std::move(*this);
820 Future<T>& Future<T>::wait(Duration dur) & {
821 detail::waitImpl(*this, dur);
826 Future<T>&& Future<T>::wait(Duration dur) && {
827 detail::waitImpl(*this, dur);
828 return std::move(*this);
832 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
833 detail::waitViaImpl(*this, e);
838 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
839 detail::waitViaImpl(*this, e);
840 return std::move(*this);
845 return std::move(wait().value());
849 inline void Future<void>::get() {
854 T Future<T>::get(Duration dur) {
857 return std::move(value());
864 inline void Future<void>::get(Duration dur) {
874 T Future<T>::getVia(DrivableExecutor* e) {
875 return std::move(waitVia(e).value());
879 inline void Future<void>::getVia(DrivableExecutor* e) {
884 Future<bool> Future<T>::willEqual(Future<T>& f) {
885 return whenAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
886 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
887 return std::get<0>(t).value() == std::get<1>(t).value();
896 Future<T> Future<T>::filter(F predicate) {
897 auto p = folly::makeMoveWrapper(std::move(predicate));
898 return this->then([p](T val) {
899 T const& valConstRef = val;
900 if (!(*p)(valConstRef)) {
901 throw PredicateDoesNotObtain();
910 Future<Z> chainHelper(Future<Z> f) {
914 template <class Z, class F, class Fn, class... Callbacks>
915 Future<Z> chainHelper(F f, Fn fn, Callbacks... fns) {
916 return chainHelper<Z>(f.then(fn), fns...);
920 template <class A, class Z, class... Callbacks>
921 std::function<Future<Z>(Try<A>)>
922 chain(Callbacks... fns) {
923 MoveWrapper<Promise<A>> pw;
924 MoveWrapper<Future<Z>> fw(chainHelper<Z>(pw->getFuture(), fns...));
925 return [=](Try<A> t) mutable {
926 pw->fulfilTry(std::move(t));
927 return std::move(*fw);
935 // I haven't included a Future<T&> specialization because I don't forsee us
936 // using it, however it is not difficult to add when needed. Refer to
937 // Future<void> for guidance. std::future and boost::future code would also be