2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
22 #include <folly/Baton.h>
23 #include <folly/Optional.h>
24 #include <folly/futures/detail/Core.h>
25 #include <folly/futures/Timekeeper.h>
32 Timekeeper* getTimekeeperSingleton();
36 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
37 other.core_ = nullptr;
41 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
42 std::swap(core_, other.core_);
49 const typename std::enable_if<!std::is_void<F>::value, F>::type& val)
53 *this = p.getFuture();
59 typename std::enable_if<!std::is_void<F>::value, F>::type&& val)
62 p.setValue(std::forward<F>(val));
63 *this = p.getFuture();
68 typename std::enable_if<std::is_void<F>::value, int>::type>
69 Future<void>::Future() : core_(nullptr) {
72 *this = p.getFuture();
77 Future<T>::~Future() {
82 void Future<T>::detach() {
84 core_->detachFuture();
90 void Future<T>::throwIfInvalid() const {
97 void Future<T>::setCallback_(F&& func) {
99 core_->setCallback(std::move(func));
106 typename std::enable_if<isFuture<F>::value,
107 Future<typename isFuture<T>::Inner>>::type
108 Future<T>::unwrap() {
109 return then([](Future<typename isFuture<T>::Inner> internal_future) {
110 return internal_future;
116 // Variant: returns a value
117 // e.g. f.then([](Try<T>&& t){ return t.value(); });
119 template <typename F, typename R, bool isTry, typename... Args>
120 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
121 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
122 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
123 typedef typename R::ReturnsFuture::Inner B;
127 // wrap these so we can move them into the lambda
128 folly::MoveWrapper<Promise<B>> p;
129 folly::MoveWrapper<F> funcm(std::forward<F>(func));
131 // grab the Future now before we lose our handle on the Promise
132 auto f = p->getFuture();
134 f.setExecutor(getExecutor());
137 /* This is a bit tricky.
139 We can't just close over *this in case this Future gets moved. So we
140 make a new dummy Future. We could figure out something more
141 sophisticated that avoids making a new Future object when it can, as an
142 optimization. But this is correct.
144 core_ can't be moved, it is explicitly disallowed (as is copying). But
145 if there's ever a reason to allow it, this is one place that makes that
146 assumption and would need to be fixed. We use a standard shared pointer
147 for core_ (by copying it in), which means in essence obj holds a shared
148 pointer to itself. But this shouldn't leak because Promise will not
149 outlive the continuation, because Promise will setException() with a
150 broken Promise if it is destructed before completed. We could use a
151 weak pointer but it would have to be converted to a shared pointer when
152 func is executed (because the Future returned by func may possibly
153 persist beyond the callback, if it gets moved), and so it is an
154 optimization to just make it shared from the get-go.
156 We have to move in the Promise and func using the MoveWrapper
157 hack. (func could be copied but it's a big drag on perf).
159 Two subtle but important points about this design. detail::Core has no
160 back pointers to Future or Promise, so if Future or Promise get moved
161 (and they will be moved in performant code) we don't have to do
162 anything fancy. And because we store the continuation in the
163 detail::Core, not in the Future, we can execute the continuation even
164 after the Future has gone out of scope. This is an intentional design
165 decision. It is likely we will want to be able to cancel a continuation
166 in some circumstances, but I think it should be explicit not implicit
167 in the destruction of the Future used to create it.
170 [p, funcm](Try<T>&& t) mutable {
171 if (!isTry && t.hasException()) {
172 p->setException(std::move(t.exception()));
175 return (*funcm)(t.template get<isTry, Args>()...);
183 // Variant: returns a Future
184 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
186 template <typename F, typename R, bool isTry, typename... Args>
187 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
188 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
189 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
190 typedef typename R::ReturnsFuture::Inner B;
194 // wrap these so we can move them into the lambda
195 folly::MoveWrapper<Promise<B>> p;
196 folly::MoveWrapper<F> funcm(std::forward<F>(func));
198 // grab the Future now before we lose our handle on the Promise
199 auto f = p->getFuture();
201 f.setExecutor(getExecutor());
205 [p, funcm](Try<T>&& t) mutable {
206 if (!isTry && t.hasException()) {
207 p->setException(std::move(t.exception()));
210 auto f2 = (*funcm)(t.template get<isTry, Args>()...);
211 // that didn't throw, now we can steal p
212 f2.setCallback_([p](Try<B>&& b) mutable {
213 p->setTry(std::move(b));
215 } catch (const std::exception& e) {
216 p->setException(exception_wrapper(std::current_exception(), e));
218 p->setException(exception_wrapper(std::current_exception()));
226 template <typename T>
227 template <typename R, typename Caller, typename... Args>
228 Future<typename isFuture<R>::Inner>
229 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
230 typedef typename std::remove_cv<
231 typename std::remove_reference<
232 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
233 return then([instance, func](Try<T>&& t){
234 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
239 Future<void> Future<T>::then() {
240 return then([] (Try<T>&& t) {});
243 // onError where the callback returns T
246 typename std::enable_if<
247 !detail::callableWith<F, exception_wrapper>::value &&
248 !detail::Extract<F>::ReturnsFuture::value,
250 Future<T>::onError(F&& func) {
251 typedef typename detail::Extract<F>::FirstArg Exn;
253 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
254 "Return type of onError callback must be T or Future<T>");
257 auto f = p.getFuture();
258 auto pm = folly::makeMoveWrapper(std::move(p));
259 auto funcm = folly::makeMoveWrapper(std::move(func));
260 setCallback_([pm, funcm](Try<T>&& t) mutable {
261 if (!t.template withException<Exn>([&] (Exn& e) {
266 pm->setTry(std::move(t));
273 // onError where the callback returns Future<T>
276 typename std::enable_if<
277 !detail::callableWith<F, exception_wrapper>::value &&
278 detail::Extract<F>::ReturnsFuture::value,
280 Future<T>::onError(F&& func) {
282 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
283 "Return type of onError callback must be T or Future<T>");
284 typedef typename detail::Extract<F>::FirstArg Exn;
287 auto f = p.getFuture();
288 auto pm = folly::makeMoveWrapper(std::move(p));
289 auto funcm = folly::makeMoveWrapper(std::move(func));
290 setCallback_([pm, funcm](Try<T>&& t) mutable {
291 if (!t.template withException<Exn>([&] (Exn& e) {
293 auto f2 = (*funcm)(e);
294 f2.setCallback_([pm](Try<T>&& t2) mutable {
295 pm->setTry(std::move(t2));
297 } catch (const std::exception& e2) {
298 pm->setException(exception_wrapper(std::current_exception(), e2));
300 pm->setException(exception_wrapper(std::current_exception()));
303 pm->setTry(std::move(t));
312 Future<T> Future<T>::ensure(F func) {
313 MoveWrapper<F> funcw(std::move(func));
314 return this->then([funcw](Try<T>&& t) {
316 return makeFuture(std::move(t));
322 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
323 auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
324 return within(dur, tk)
325 .onError([funcw](TimedOut const&) { return (*funcw)(); });
330 typename std::enable_if<
331 detail::callableWith<F, exception_wrapper>::value &&
332 detail::Extract<F>::ReturnsFuture::value,
334 Future<T>::onError(F&& func) {
336 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
337 "Return type of onError callback must be T or Future<T>");
340 auto f = p.getFuture();
341 auto pm = folly::makeMoveWrapper(std::move(p));
342 auto funcm = folly::makeMoveWrapper(std::move(func));
343 setCallback_([pm, funcm](Try<T> t) mutable {
344 if (t.hasException()) {
346 auto f2 = (*funcm)(std::move(t.exception()));
347 f2.setCallback_([pm](Try<T> t2) mutable {
348 pm->setTry(std::move(t2));
350 } catch (const std::exception& e2) {
351 pm->setException(exception_wrapper(std::current_exception(), e2));
353 pm->setException(exception_wrapper(std::current_exception()));
356 pm->setTry(std::move(t));
363 // onError(exception_wrapper) that returns T
366 typename std::enable_if<
367 detail::callableWith<F, exception_wrapper>::value &&
368 !detail::Extract<F>::ReturnsFuture::value,
370 Future<T>::onError(F&& func) {
372 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
373 "Return type of onError callback must be T or Future<T>");
376 auto f = p.getFuture();
377 auto pm = folly::makeMoveWrapper(std::move(p));
378 auto funcm = folly::makeMoveWrapper(std::move(func));
379 setCallback_([pm, funcm](Try<T> t) mutable {
380 if (t.hasException()) {
382 return (*funcm)(std::move(t.exception()));
385 pm->setTry(std::move(t));
393 typename std::add_lvalue_reference<T>::type Future<T>::value() {
396 return core_->getTry().value();
400 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
403 return core_->getTry().value();
407 Try<T>& Future<T>::getTry() {
410 return core_->getTry();
414 Optional<Try<T>> Future<T>::poll() {
416 if (core_->ready()) {
417 o = std::move(core_->getTry());
423 template <typename Executor>
424 inline Future<T> Future<T>::via(Executor* executor) && {
427 setExecutor(executor);
429 return std::move(*this);
433 template <typename Executor>
434 inline Future<T> Future<T>::via(Executor* executor) & {
437 MoveWrapper<Promise<T>> p;
438 auto f = p->getFuture();
439 then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
440 return std::move(f).via(executor);
444 bool Future<T>::isReady() const {
446 return core_->ready();
450 void Future<T>::raise(exception_wrapper exception) {
451 core_->raise(std::move(exception));
457 Future<typename std::decay<T>::type> makeFuture(T&& t) {
458 Promise<typename std::decay<T>::type> p;
459 p.setValue(std::forward<T>(t));
460 return p.getFuture();
463 inline // for multiple translation units
464 Future<void> makeFuture() {
467 return p.getFuture();
473 typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
474 -> Future<decltype(func())> {
475 Promise<decltype(func())> p;
480 return p.getFuture();
484 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
486 return makeFutureTry(std::move(copy));
490 Future<T> makeFuture(std::exception_ptr const& e) {
493 return p.getFuture();
497 Future<T> makeFuture(exception_wrapper ew) {
499 p.setException(std::move(ew));
500 return p.getFuture();
503 template <class T, class E>
504 typename std::enable_if<std::is_base_of<std::exception, E>::value,
506 makeFuture(E const& e) {
508 p.setException(make_exception_wrapper<E>(e));
509 return p.getFuture();
513 Future<T> makeFuture(Try<T>&& t) {
514 Promise<typename std::decay<T>::type> p;
515 p.setTry(std::move(t));
516 return p.getFuture();
520 inline Future<void> makeFuture(Try<void>&& t) {
521 if (t.hasException()) {
522 return makeFuture<void>(std::move(t.exception()));
529 template <typename Executor>
530 Future<void> via(Executor* executor) {
531 return makeFuture().via(executor);
536 template <typename... Fs>
537 typename detail::VariadicContext<
538 typename std::decay<Fs>::type::value_type...>::type
539 whenAll(Fs&&... fs) {
541 new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
542 ctx->total = sizeof...(fs);
543 auto f_saved = ctx->p.getFuture();
544 detail::whenAllVariadicHelper(ctx,
545 std::forward<typename std::decay<Fs>::type>(fs)...);
551 template <class InputIterator>
554 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
555 whenAll(InputIterator first, InputIterator last) {
557 typename std::iterator_traits<InputIterator>::value_type::value_type T;
560 return makeFuture(std::vector<Try<T>>());
562 size_t n = std::distance(first, last);
564 auto ctx = new detail::WhenAllContext<T>();
566 ctx->results.resize(n);
568 auto f_saved = ctx->p.getFuture();
570 for (size_t i = 0; first != last; ++first, ++i) {
573 f.setCallback_([ctx, i, n](Try<T> t) {
574 ctx->results[i] = std::move(t);
575 if (++ctx->count == n) {
576 ctx->p.setValue(std::move(ctx->results));
587 template <typename T>
588 struct CollectContext {
589 explicit CollectContext(int n) : count(0), threw(false) {
592 Promise<std::vector<T>> p;
593 std::vector<T> results;
594 std::atomic<size_t> count;
595 std::atomic_bool threw;
597 typedef std::vector<T> result_type;
599 static inline Future<std::vector<T>> makeEmptyFuture() {
600 return makeFuture(std::vector<T>());
603 inline void setValue() {
604 p.setValue(std::move(results));
607 inline void addResult(int i, Try<T>& t) {
608 results[i] = std::move(t.value());
613 struct CollectContext<void> {
614 explicit CollectContext(int n) : count(0), threw(false) {}
616 std::atomic<size_t> count;
617 std::atomic_bool threw;
619 typedef void result_type;
621 static inline Future<void> makeEmptyFuture() {
625 inline void setValue() {
629 inline void addResult(int i, Try<void>& t) {
636 template <class InputIterator>
637 Future<typename detail::CollectContext<
638 typename std::iterator_traits<InputIterator>::value_type::value_type
640 collect(InputIterator first, InputIterator last) {
642 typename std::iterator_traits<InputIterator>::value_type::value_type T;
645 return detail::CollectContext<T>::makeEmptyFuture();
648 size_t n = std::distance(first, last);
649 auto ctx = new detail::CollectContext<T>(n);
650 auto f_saved = ctx->p.getFuture();
652 for (size_t i = 0; first != last; ++first, ++i) {
655 f.setCallback_([ctx, i, n](Try<T> t) {
656 auto c = ++ctx->count;
658 if (t.hasException()) {
659 if (!ctx->threw.exchange(true)) {
660 ctx->p.setException(std::move(t.exception()));
662 } else if (!ctx->threw) {
663 ctx->addResult(i, t);
678 template <class InputIterator>
683 std::iterator_traits<InputIterator>::value_type::value_type> > >
684 whenAny(InputIterator first, InputIterator last) {
686 typename std::iterator_traits<InputIterator>::value_type::value_type T;
688 auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
689 auto f_saved = ctx->p.getFuture();
691 for (size_t i = 0; first != last; first++, i++) {
693 f.setCallback_([i, ctx](Try<T>&& t) {
694 if (!ctx->done.exchange(true)) {
695 ctx->p.setValue(std::make_pair(i, std::move(t)));
704 template <class InputIterator>
705 Future<std::vector<std::pair<size_t, Try<typename
706 std::iterator_traits<InputIterator>::value_type::value_type>>>>
707 whenN(InputIterator first, InputIterator last, size_t n) {
709 std::iterator_traits<InputIterator>::value_type::value_type T;
710 typedef std::vector<std::pair<size_t, Try<T>>> V;
717 auto ctx = std::make_shared<ctx_t>();
720 // for each completed Future, increase count and add to vector, until we
721 // have n completed futures at which point we fulfill our Promise with the
726 it->then([ctx, n, i](Try<T>&& t) {
728 auto c = ++ctx->completed;
730 assert(ctx->v.size() < n);
731 v.push_back(std::make_pair(i, std::move(t)));
733 ctx->p.setTry(Try<V>(std::move(v)));
743 ctx->p.setException(std::runtime_error("Not enough futures"));
746 return ctx->p.getFuture();
749 template <class It, class T, class F, class ItT, class Arg>
750 typename std::enable_if<!isFutureResult<F, T, Arg>::value, Future<T>>::type
751 reduce(It first, It last, T initial, F func) {
753 return makeFuture(std::move(initial));
756 typedef isTry<Arg> IsTry;
758 return whenAll(first, last)
759 .then([initial, func](std::vector<Try<ItT>>& vals) mutable {
760 for (auto& val : vals) {
761 initial = func(std::move(initial),
762 // Either return an ItT&& or a Try<ItT>&& depending
763 // on the type of the argument of func.
764 val.template get<IsTry::value, Arg&&>());
770 template <class It, class T, class F, class ItT, class Arg>
771 typename std::enable_if<isFutureResult<F, T, Arg>::value, Future<T>>::type
772 reduce(It first, It last, T initial, F func) {
774 return makeFuture(std::move(initial));
777 typedef isTry<Arg> IsTry;
779 auto f = first->then([initial, func](Try<ItT>& head) mutable {
780 return func(std::move(initial),
781 head.template get<IsTry::value, Arg&&>());
784 for (++first; first != last; ++first) {
785 f = whenAll(f, *first).then([func](std::tuple<Try<T>, Try<ItT>>& t) {
786 return func(std::move(std::get<0>(t).value()),
787 // Either return an ItT&& or a Try<ItT>&& depending
788 // on the type of the argument of func.
789 std::get<1>(t).template get<IsTry::value, Arg&&>());
797 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
798 return within(dur, TimedOut(), tk);
803 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
806 Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
809 std::atomic<bool> token;
811 auto ctx = std::make_shared<Context>(std::move(e));
814 tk = folly::detail::getTimekeeperSingleton();
818 .then([ctx](Try<void> const& t) {
819 if (ctx->token.exchange(true) == false) {
820 if (t.hasException()) {
821 ctx->promise.setException(std::move(t.exception()));
823 ctx->promise.setException(std::move(ctx->exception));
828 this->then([ctx](Try<T>&& t) {
829 if (ctx->token.exchange(true) == false) {
830 ctx->promise.setTry(std::move(t));
834 return ctx->promise.getFuture();
838 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
839 return whenAll(*this, futures::sleep(dur, tk))
840 .then([](std::tuple<Try<T>, Try<void>> tup) {
841 Try<T>& t = std::get<0>(tup);
842 return makeFuture<T>(std::move(t));
849 void waitImpl(Future<T>& f) {
850 // short-circuit if there's nothing to do
851 if (f.isReady()) return;
854 f = f.then([&](Try<T> t) {
856 return makeFuture(std::move(t));
860 // There's a race between the callback returning and the future actually
861 // finishing: the baton may have been posted before the setup that marks f
862 // as done has completed, so spin until f reports ready.
863 while (!f.isReady()) {
864 std::this_thread::yield();
869 void waitImpl(Future<T>& f, Duration dur) {
870 // short-circuit if there's nothing to do
871 if (f.isReady()) return;
873 auto baton = std::make_shared<Baton<>>();
874 f = f.then([baton](Try<T> t) {
876 return makeFuture(std::move(t));
879 // Let's preserve the invariant that if we did not time out (timed_wait
880 // returns true), then the returned Future is complete when it is returned to
881 // the caller. We need to wait out the race for that Future to complete.
882 if (baton->timed_wait(std::chrono::system_clock::now() + dur)) {
883 while (!f.isReady()) {
884 std::this_thread::yield();
890 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
891 while (!f.isReady()) {
899 Future<T>& Future<T>::wait() & {
900 detail::waitImpl(*this);
905 Future<T>&& Future<T>::wait() && {
906 detail::waitImpl(*this);
907 return std::move(*this);
911 Future<T>& Future<T>::wait(Duration dur) & {
912 detail::waitImpl(*this, dur);
917 Future<T>&& Future<T>::wait(Duration dur) && {
918 detail::waitImpl(*this, dur);
919 return std::move(*this);
923 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
924 detail::waitViaImpl(*this, e);
929 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
930 detail::waitViaImpl(*this, e);
931 return std::move(*this);
936 return std::move(wait().value());
940 inline void Future<void>::get() {
945 T Future<T>::get(Duration dur) {
948 return std::move(value());
955 inline void Future<void>::get(Duration dur) {
965 T Future<T>::getVia(DrivableExecutor* e) {
966 return std::move(waitVia(e).value());
970 inline void Future<void>::getVia(DrivableExecutor* e) {
975 Future<bool> Future<T>::willEqual(Future<T>& f) {
976 return whenAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
977 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
978 return std::get<0>(t).value() == std::get<1>(t).value();
987 Future<T> Future<T>::filter(F predicate) {
988 auto p = folly::makeMoveWrapper(std::move(predicate));
989 return this->then([p](T val) {
990 T const& valConstRef = val;
991 if (!(*p)(valConstRef)) {
992 throw PredicateDoesNotObtain();
1001 Future<Z> chainHelper(Future<Z> f) {
1005 template <class Z, class F, class Fn, class... Callbacks>
1006 Future<Z> chainHelper(F f, Fn fn, Callbacks... fns) {
1007 return chainHelper<Z>(f.then(fn), fns...);
1011 template <class A, class Z, class... Callbacks>
1012 std::function<Future<Z>(Try<A>)>
1013 chain(Callbacks... fns) {
1014 MoveWrapper<Promise<A>> pw;
1015 MoveWrapper<Future<Z>> fw(chainHelper<Z>(pw->getFuture(), fns...));
1016 return [=](Try<A> t) mutable {
1017 pw->setTry(std::move(t));
1018 return std::move(*fw);
1024 } // namespace folly
1026 // I haven't included a Future<T&> specialization because I don't foresee us
1027 // using it, however it is not difficult to add when needed. Refer to
1028 // Future<void> for guidance. std::future and boost::future code would also be