2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
25 #include <folly/Baton.h>
26 #include <folly/Optional.h>
27 #include <folly/Random.h>
28 #include <folly/futures/Timekeeper.h>
29 #include <folly/futures/detail/Core.h>
31 #ifndef FOLLY_FUTURE_USING_FIBER
32 #if FOLLY_MOBILE || defined(__APPLE__)
33 #define FOLLY_FUTURE_USING_FIBER 0
35 #define FOLLY_FUTURE_USING_FIBER 1
36 #include <folly/fibers/Baton.h>
// Picks the baton type used by the blocking wait helpers in this file:
// folly::fibers::Baton when FOLLY_FUTURE_USING_FIBER is non-zero.
// NOTE(review): the #else / #endif lines are not visible in this excerpt; the
// second typedef is presumably the non-fiber fallback -- confirm.
46 #if FOLLY_FUTURE_USING_FIBER
47 typedef folly::fibers::Baton FutureBatonType;
49 typedef folly::Baton<> FutureBatonType;
52 } // namespace futures
// Declaration only; the definition is outside this excerpt. Future<T>::within
// calls it (as folly::detail::getTimekeeperSingleton) to obtain a Timekeeper.
55 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
60 // Guarantees that the stored functor is destructed before the stored promise
61 // may be fulfilled. Assumes the stored functor to be noexcept-destructible.
//
// Pairs a continuation functor (func_, of type F) with the Promise<T> that the
// continuation is expected to complete. The object is "before the barrier"
// while promise_ has not been fulfilled (see before_barrier()); invoke() and
// stealPromise() assert that state.
// NOTE(review): several lines of this class are not visible in this excerpt
// (access specifiers, closing braces, the func_ declaration, parts of the
// destructor and of stealPromise()).
62 template <typename T, typename F>
63 class CoreCallbackState {
// Takes over `promise` and constructs func_ from `func`. noexcept exactly when
// constructing an F from an FF is noexcept.
65 template <typename FF>
66 CoreCallbackState(Promise<T>&& promise, FF&& func) noexcept(
67 noexcept(F(std::declval<FF>())))
68 : func_(std::forward<FF>(func)), promise_(std::move(promise)) {
69 assert(before_barrier());
// Move constructor. Only when `that` is still before the barrier is func_
// placement-constructed from that.func_ and the promise taken over; otherwise
// promise_ keeps its default initializer (Promise<T>::makeEmpty()).
72 CoreCallbackState(CoreCallbackState&& that) noexcept(
73 noexcept(F(std::declval<F>()))) {
74 if (that.before_barrier()) {
75 new (&func_) F(std::move(that.func_));
76 promise_ = that.stealPromise();
// Move assignment is explicitly disabled.
80 CoreCallbackState& operator=(CoreCallbackState&&) = delete;
// Destructor: acts only while still before the barrier.
// NOTE(review): the guarded statement is not visible in this excerpt.
82 ~CoreCallbackState() {
83 if (before_barrier()) {
// Calls the stored functor as an rvalue, forwarding `args`. Must be called
// before the barrier. noexcept exactly when that call expression is.
88 template <typename... Args>
89 auto invoke(Args&&... args) noexcept(
90 noexcept(std::declval<F&&>()(std::declval<Args&&>()...))) {
91 assert(before_barrier());
92 return std::move(func_)(std::forward<Args>(args)...);
// Same call as invoke(), routed through makeTryWith; declared noexcept.
// Presumably makeTryWith captures a thrown exception in the returned Try --
// confirm against its definition.
95 template <typename... Args>
96 auto tryInvoke(Args&&... args) noexcept {
97 return makeTryWith([&] { return invoke(std::forward<Args>(args)...); });
// Takes the promise out of this state and fulfils it with `t`.
100 void setTry(Try<T>&& t) {
101 stealPromise().setTry(std::move(t));
// Takes the promise out of this state and fulfils it with the exception `ew`.
104 void setException(exception_wrapper&& ew) {
105 stealPromise().setException(std::move(ew));
// Returns the stored promise by move. Must be called before the barrier.
// NOTE(review): one statement between the assert and the return is not
// visible in this excerpt.
108 Promise<T> stealPromise() noexcept {
109 assert(before_barrier());
111 return std::move(promise_);
// True while promise_ reports that it has not been fulfilled.
115 bool before_barrier() const noexcept {
116 return !promise_.isFulfilled();
// Starts out as an empty promise; the constructors above replace it.
122 Promise<T> promise_{Promise<T>::makeEmpty()};
// Factory for CoreCallbackState that deduces its template arguments: the
// functor type stored is the decayed type of F. The promise is moved in and
// the functor is perfectly forwarded. The noexcept specification mirrors the
// constructor call it wraps.
125 template <typename T, typename F>
126 inline auto makeCoreCallbackState(Promise<T>&& p, F&& f) noexcept(
127 noexcept(CoreCallbackState<T, _t<std::decay<F>>>(
128 std::declval<Promise<T>&&>(),
129 std::declval<F&&>()))) {
130 return CoreCallbackState<T, _t<std::decay<F>>>(
131 std::move(p), std::forward<F>(f));
133 } // namespace detail
134 } // namespace futures
// makeSemiFuture(value): wraps the forwarded value in a Try of the decayed
// type and delegates to the Try overload.
// NOTE(review): the template header line is not visible in this excerpt.
137 SemiFuture<typename std::decay<T>::type> makeSemiFuture(T&& t) {
138 return makeSemiFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
// makeSemiFuture(): no-argument form, yields a SemiFuture<Unit> built from
// Unit{}.
141 inline SemiFuture<Unit> makeSemiFuture() {
142 return makeSemiFuture(Unit{});
145 // makeSemiFutureWith(SemiFuture<T>()) -> SemiFuture<T>
// Overload selected when calling F with no arguments yields a SemiFuture.
// Returns func()'s result directly; if the call throws, returns a SemiFuture
// of the same inner type holding the exception instead.
// NOTE(review): the `try {` line, the InnerType alias head and the second
// catch clause (presumably catch (...)) are not visible in this excerpt.
147 typename std::enable_if<
148 isSemiFuture<typename std::result_of<F()>::type>::value,
149 typename std::result_of<F()>::type>::type
150 makeSemiFutureWith(F&& func) {
152 typename isSemiFuture<typename std::result_of<F()>::type>::Inner;
154 return std::forward<F>(func)();
// std::exception-derived errors keep a reference to the typed exception.
155 } catch (std::exception& e) {
156 return makeSemiFuture<InnerType>(
157 exception_wrapper(std::current_exception(), e));
// Fallback path: only the exception_ptr is available.
159 return makeSemiFuture<InnerType>(
160 exception_wrapper(std::current_exception()));
164 // makeSemiFutureWith(T()) -> SemiFuture<T>
165 // makeSemiFutureWith(void()) -> SemiFuture<Unit>
// Overload selected when calling F with no arguments does NOT yield a
// SemiFuture. The call is run through makeTryWith and the resulting Try is
// turned into a SemiFuture; Unit::LiftT maps the result type (void -> Unit per
// the comment above).
167 typename std::enable_if<
168 !(isSemiFuture<typename std::result_of<F()>::type>::value),
169 SemiFuture<Unit::LiftT<typename std::result_of<F()>::type>>>::type
170 makeSemiFutureWith(F&& func) {
171 using LiftedResult = Unit::LiftT<typename std::result_of<F()>::type>;
172 return makeSemiFuture<LiftedResult>(
173 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
// makeSemiFuture overloads that start from an error or from a ready Try.
// NOTE(review): some template header lines are not visible in this excerpt.
//
// From a std::exception_ptr: wrapped in a Try<T>.
177 SemiFuture<T> makeSemiFuture(std::exception_ptr const& e) {
178 return makeSemiFuture(Try<T>(e));
// From an exception_wrapper (taken by value, then moved into the Try).
182 SemiFuture<T> makeSemiFuture(exception_wrapper ew) {
183 return makeSemiFuture(Try<T>(std::move(ew)));
// From any exception object whose type derives from std::exception; the
// object is copied into an exception_wrapper via make_exception_wrapper<E>.
186 template <class T, class E>
188 enable_if<std::is_base_of<std::exception, E>::value, SemiFuture<T>>::type
189 makeSemiFuture(E const& e) {
190 return makeSemiFuture(Try<T>(make_exception_wrapper<E>(e)));
// From a Try<T>: allocates a new Core<T> holding the moved Try. All the
// overloads above end up here.
194 SemiFuture<T> makeSemiFuture(Try<T>&& t) {
195 return SemiFuture<T>(new futures::detail::Core<T>(std::move(t)));
// SemiFuture<T>: empty factory and move operations.
// NOTE(review): template headers, closing braces and the return statements of
// the assignment operators are not visible in this excerpt.
//
// Builds a SemiFuture through the EmptyConstruct tag constructor.
199 SemiFuture<T> SemiFuture<T>::makeEmpty() {
200 return SemiFuture<T>(futures::detail::EmptyConstruct{});
// Move construction: take the core pointer and null out the source's.
204 SemiFuture<T>::SemiFuture(SemiFuture<T>&& other) noexcept : core_(other.core_) {
205 other.core_ = nullptr;
// Move assignment: the two core pointers are swapped, so `other` ends up
// holding the core this object had before.
209 SemiFuture<T>& SemiFuture<T>::operator=(SemiFuture<T>&& other) noexcept {
210 std::swap(core_, other.core_);
// Same two operations, taking a Future<T> as the source.
215 SemiFuture<T>::SemiFuture(Future<T>&& other) noexcept : core_(other.core_) {
216 other.core_ = nullptr;
220 SemiFuture<T>& SemiFuture<T>::operator=(Future<T>&& other) noexcept {
221 std::swap(core_, other.core_);
// SemiFuture<T> constructors that create a new Core<T>.
// NOTE(review): parts of these declarations (template headers, the tail of the
// enable_if on the in_place constructor, the start of its initializer list,
// and the destructor body) are not visible in this excerpt.
//
// From a value: the forwarded value is wrapped in a Try<T>.
226 template <class T2, typename>
227 SemiFuture<T>::SemiFuture(T2&& val)
228 : core_(new futures::detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
// Enabled only when T2 is Unit: the core holds a default-constructed T.
231 template <typename T2>
232 SemiFuture<T>::SemiFuture(
233 typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
234 : core_(new futures::detail::Core<T>(Try<T>(T()))) {}
// In-place form: requires T to be constructible from Args&&...; the arguments
// are forwarded to Core<T>'s in_place constructor.
239 typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
241 SemiFuture<T>::SemiFuture(in_place_t, Args&&... args)
243 new futures::detail::Core<T>(in_place, std::forward<Args>(args)...)) {
// Destructor (body not visible here).
247 SemiFuture<T>::~SemiFuture() {
// value(): returns a reference to the value inside the core's Try, via
// Try::value(). Non-const and const overloads.
// NOTE(review): the lines between each signature and its return statement are
// not visible in this excerpt.
252 typename std::add_lvalue_reference<T>::type SemiFuture<T>::value() {
255 return core_->getTry().value();
259 typename std::add_lvalue_reference<const T>::type SemiFuture<T>::value() const {
262 return core_->getTry().value();
// via(executor, priority), rvalue-qualified: sets the executor on this object
// and builds a Future<T> from the same core pointer.
// NOTE(review): the validity check, the hand-over of core_ and the return
// statement are not visible in this excerpt.
266 inline Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) && {
269 setExecutor(executor, priority);
271 auto newFuture = Future<T>(core_);
// via(executor, priority), lvalue-qualified: this object is left in place. A
// promise `p` (declared on a line not visible here) supplies a new future `f`;
// a continuation attached through thenImplementation forwards this future's
// Try into that promise, and `f` is then moved onto the executor.
277 inline Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) & {
280 auto f = p.getFuture();
281 auto func = [p = std::move(p)](Try<T>&& t) mutable {
282 p.setTry(std::move(t));
284 using R = futures::detail::callableResult<T, decltype(func)>;
285 thenImplementation<decltype(func), R>(std::move(func), typename R::Arg());
286 return std::move(f).via(executor, priority);
// Simple state queries and helpers on SemiFuture<T>.
// NOTE(review): some statements (likely validity checks) and the body of
// throwIfInvalid() are not visible in this excerpt.
//
// True when the core reports ready().
290 bool SemiFuture<T>::isReady() const {
292 return core_->ready();
// hasValue()/hasException(): delegate to the Try returned by getTry().
296 bool SemiFuture<T>::hasValue() {
297 return getTry().hasValue();
301 bool SemiFuture<T>::hasException() {
302 return getTry().hasException();
// Tells the core that the future side is detaching.
306 void SemiFuture<T>::detach() {
308 core_->detachFuture();
// Returns a reference to the Try stored in the core.
314 Try<T>& SemiFuture<T>::getTry() {
317 return core_->getTry();
321 void SemiFuture<T>::throwIfInvalid() const {
// poll(): when the core is ready, moves its Try into the Optional `o`.
// NOTE(review): the declaration of `o` and the return statement are not
// visible in this excerpt.
327 Optional<Try<T>> SemiFuture<T>::poll() {
329 if (core_->ready()) {
330 o = std::move(core_->getTry());
// raise(): hands the exception to the core's raise().
336 void SemiFuture<T>::raise(exception_wrapper exception) {
337 core_->raise(std::move(exception));
// setCallback_(): forwards the callable to the core's setCallback().
342 void SemiFuture<T>::setCallback_(F&& func) {
344 core_->setCallback(std::forward<F>(func));
// Tag constructor used by makeEmpty(); its initializer list is not visible
// in this excerpt.
348 SemiFuture<T>::SemiFuture(futures::detail::EmptyConstruct) noexcept
// Future<T>: empty factory and move operations, all delegating to the
// SemiFuture<T> base.
// NOTE(review): template headers and the return statement of operator= are
// not visible in this excerpt.
352 Future<T> Future<T>::makeEmpty() {
353 return Future<T>(futures::detail::EmptyConstruct{});
357 Future<T>::Future(Future<T>&& other) noexcept
358 : SemiFuture<T>(std::move(other)) {}
// Move assignment goes through a temporary SemiFuture<T> built from `other`.
361 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
362 SemiFuture<T>::operator=(SemiFuture<T>{std::move(other)});
// Converting construction / assignment from Future<T2> where T2 differs from
// T but T is constructible from T2&&. Each one chains a continuation on
// `other` that builds a T from the moved T2 value.
// NOTE(review): template headers and the final enable_if argument lines are
// not visible in this excerpt.
//
// Variant enabled when T2&& is also implicitly convertible to T.
369 typename std::enable_if<
370 !std::is_same<T, typename std::decay<T2>::type>::value &&
371 std::is_constructible<T, T2&&>::value &&
372 std::is_convertible<T2&&, T>::value,
374 Future<T>::Future(Future<T2>&& other)
375 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
// Variant enabled when T2&& is NOT implicitly convertible to T (presumably
// declared explicit in the class definition -- confirm).
380 typename std::enable_if<
381 !std::is_same<T, typename std::decay<T2>::type>::value &&
382 std::is_constructible<T, T2&&>::value &&
383 !std::is_convertible<T2&&, T>::value,
385 Future<T>::Future(Future<T2>&& other)
386 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
// Converting assignment; the statement that consumes the chained future
// starts on a line not visible here.
391 typename std::enable_if<
392 !std::is_same<T, typename std::decay<T2>::type>::value &&
393 std::is_constructible<T, T2&&>::value,
395 Future<T>& Future<T>::operator=(Future<T2>&& other) {
397 std::move(other).then([](T2&& v) { return T(std::move(v)); }));
400 // TODO: isSemiFuture
// Future<T> value constructors; each forwards to the matching SemiFuture<T>
// constructor where the initializer is visible.
// NOTE(review): template headers, the initializer of the Unit constructor, the
// tail of the in_place enable_if, and the destructor body are not visible in
// this excerpt.
402 template <class T2, typename>
403 Future<T>::Future(T2&& val) : SemiFuture<T>(std::forward<T2>(val)) {}
// Enabled only when T2 is Unit.
406 template <typename T2>
407 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
// In-place form: requires T to be constructible from Args&&....
413 typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
415 Future<T>::Future(in_place_t, Args&&... args)
416 : SemiFuture<T>(in_place, std::forward<Args>(args)...) {}
419 Future<T>::~Future() {
// unwrap(): enabled when F is itself a Future type. Chains a continuation that
// simply returns the inner future it receives, so the result is a Future of
// the inner type (isFuture<T>::Inner).
// NOTE(review): the template header lines are not visible in this excerpt.
426 typename std::enable_if<isFuture<F>::value,
427 Future<typename isFuture<T>::Inner>>::type
428 Future<T>::unwrap() {
429 return then([](Future<typename isFuture<T>::Inner> internal_future) {
430 return internal_future;
436 // Variant: returns a value
437 // e.g. f.then([](Try<T>&& t){ return t.value(); });
// Selected when R::ReturnsFuture::value is false. A new promise `p` is created
// (on a line not visible here); the future taken from it is what the caller
// gets back. The callback registered on this future's core owns a
// CoreCallbackState holding `p` and the user functor. When invoked with the
// upstream Try:
//   - if the functor does not take a Try (isTry is false) and the upstream
//     holds an exception, that exception is passed straight to the promise;
//   - otherwise the functor is invoked inside makeTryWith and the resulting
//     Try is set on the promise.
// NOTE(review): the template header, the `F&& func` parameter line, the
// promise declaration, the setCallback_ call line, the else keyword and the
// return statement are not visible in this excerpt.
439 template <typename F, typename R, bool isTry, typename... Args>
440 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
441 SemiFuture<T>::thenImplementation(
443 futures::detail::argResult<isTry, F, Args...>) {
444 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
445 typedef typename R::ReturnsFuture::Inner B;
447 this->throwIfInvalid();
// The new promise reuses this future's interrupt handler.
450 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
452 // grab the Future now before we lose our handle on the Promise
453 auto f = p.getFuture();
// The returned future starts with the same executor as this one.
454 f.core_->setExecutorNoLock(this->getExecutor());
456 /* This is a bit tricky.
458 We can't just close over *this in case this Future gets moved. So we
459 make a new dummy Future. We could figure out something more
460 sophisticated that avoids making a new Future object when it can, as an
461 optimization. But this is correct.
463 core_ can't be moved, it is explicitly disallowed (as is copying). But
464 if there's ever a reason to allow it, this is one place that makes that
465 assumption and would need to be fixed. We use a standard shared pointer
466 for core_ (by copying it in), which means in essence obj holds a shared
467 pointer to itself. But this shouldn't leak because Promise will not
468 outlive the continuation, because Promise will setException() with a
469 broken Promise if it is destructed before completed. We could use a
470 weak pointer but it would have to be converted to a shared pointer when
471 func is executed (because the Future returned by func may possibly
472 persist beyond the callback, if it gets moved), and so it is an
473 optimization to just make it shared from the get-go.
475 Two subtle but important points about this design. futures::detail::Core
476 has no back pointers to Future or Promise, so if Future or Promise get
477 moved (and they will be moved in performant code) we don't have to do
478 anything fancy. And because we store the continuation in the
479 futures::detail::Core, not in the Future, we can execute the continuation
480 even after the Future has gone out of scope. This is an intentional design
481 decision. It is likely we will want to be able to cancel a continuation
482 in some circumstances, but I think it should be explicit not implicit
483 in the destruction of the Future used to create it.
486 [state = futures::detail::makeCoreCallbackState(
487 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
489 if (!isTry && t.hasException()) {
490 state.setException(std::move(t.exception()));
492 state.setTry(makeTryWith(
493 [&] { return state.invoke(t.template get<isTry, Args>()...); }));
499 // Variant: returns a Future
500 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
// Selected when R::ReturnsFuture::value is true. Set-up matches the
// value-returning variant (new promise, shared interrupt handler, inherited
// executor). The registered callback:
//   - passes an upstream exception straight to the promise when the functor
//     does not take a Try;
//   - otherwise calls the functor through tryInvoke(); if that Try holds an
//     exception it is set on the promise, else a callback is installed on the
//     returned future that forwards its Try<B> into the stolen promise.
// NOTE(review): the template header, the `F&& func` parameter line, the
// promise declaration, the setCallback_ call line, else keywords and the
// return statement are not visible in this excerpt.
502 template <typename F, typename R, bool isTry, typename... Args>
503 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
504 SemiFuture<T>::thenImplementation(
506 futures::detail::argResult<isTry, F, Args...>) {
507 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
508 typedef typename R::ReturnsFuture::Inner B;
509 this->throwIfInvalid();
512 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
514 // grab the Future now before we lose our handle on the Promise
515 auto f = p.getFuture();
516 f.core_->setExecutorNoLock(this->getExecutor());
519 [state = futures::detail::makeCoreCallbackState(
520 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
521 if (!isTry && t.hasException()) {
522 state.setException(std::move(t.exception()));
524 auto tf2 = state.tryInvoke(t.template get<isTry, Args>()...);
525 if (tf2.hasException()) {
526 state.setException(std::move(tf2.exception()));
// The functor produced a future: complete our promise when it completes.
528 tf2->setCallback_([p = state.stealPromise()](Try<B> && b) mutable {
529 p.setTry(std::move(b));
// then(member-function pointer, instance): chains a continuation that calls
// (instance->*func) with the upstream result. Whether the argument is passed
// as a Try or as the plain value depends on isTry<FirstArg>, where FirstArg is
// the first parameter type with references and cv-qualifiers removed.
// `instance` is captured as a raw pointer, so it must remain valid until the
// continuation has run.
// NOTE(review): the line naming the typedef (FirstArg) is not visible in this
// excerpt.
538 template <typename T>
539 template <typename R, typename Caller, typename... Args>
540 Future<typename isFuture<R>::Inner>
541 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
542 typedef typename std::remove_cv<typename std::remove_reference<
543 typename futures::detail::ArgType<Args...>::FirstArg>::type>::type
546 return then([instance, func](Try<T>&& t){
547 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
// then(): no-argument form; chains an empty continuation and yields
// Future<Unit>.
552 Future<Unit> Future<T>::then() {
553 return then([] () {});
556 // onError where the callback returns T
// Enabled for callbacks that are NOT callable with exception_wrapper and do
// not return a Future. The exception type to match (Exn) is the callback's
// first parameter type with the reference removed. In the registered callback:
// if the upstream Try holds an exception object of type Exn, the callback is
// invoked with it (inside makeTryWith) and its result completes the promise;
// otherwise the upstream Try is passed through unchanged.
// NOTE(review): template headers, the return type line, the typedef name, the
// static_assert head, the promise declaration, the setCallback_ line, the else
// keyword and the return statement are not visible in this excerpt.
559 typename std::enable_if<
560 !futures::detail::callableWith<F, exception_wrapper>::value &&
561 !futures::detail::callableWith<F, exception_wrapper&>::value &&
562 !futures::detail::Extract<F>::ReturnsFuture::value,
564 Future<T>::onError(F&& func) {
565 typedef std::remove_reference_t<
566 typename futures::detail::Extract<F>::FirstArg>
569 std::is_same<typename futures::detail::Extract<F>::RawReturn, T>::value,
570 "Return type of onError callback must be T or Future<T>");
573 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
574 auto f = p.getFuture();
577 [state = futures::detail::makeCoreCallbackState(
578 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
579 if (auto e = t.template tryGetExceptionObject<Exn>()) {
580 state.setTry(makeTryWith([&] { return state.invoke(*e); }));
582 state.setTry(std::move(t));
589 // onError where the callback returns Future<T>
// Enabled for callbacks that are NOT callable with exception_wrapper and DO
// return a Future. In the registered callback: if the upstream Try holds an
// exception object of type Exn, the callback is run through tryInvoke(); an
// exception from that call completes the promise, otherwise the promise is
// completed when the returned future completes. A non-matching upstream Try is
// passed through unchanged.
// NOTE(review): template headers, the return type line, the static_assert
// head, the typedef name, the promise declaration, the setCallback_ line, else
// keywords and the return statement are not visible in this excerpt.
592 typename std::enable_if<
593 !futures::detail::callableWith<F, exception_wrapper>::value &&
594 !futures::detail::callableWith<F, exception_wrapper&>::value &&
595 futures::detail::Extract<F>::ReturnsFuture::value,
597 Future<T>::onError(F&& func) {
599 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
601 "Return type of onError callback must be T or Future<T>");
602 typedef std::remove_reference_t<
603 typename futures::detail::Extract<F>::FirstArg>
607 auto f = p.getFuture();
610 [state = futures::detail::makeCoreCallbackState(
611 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
612 if (auto e = t.template tryGetExceptionObject<Exn>()) {
613 auto tf2 = state.tryInvoke(*e);
614 if (tf2.hasException()) {
615 state.setException(std::move(tf2.exception()));
617 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
618 p.setTry(std::move(t3));
622 state.setTry(std::move(t));
// ensure(func): chains a continuation that receives the upstream Try and
// returns a future made from that same Try, so the upstream result is passed
// on. The functor is captured as `funcw`.
// NOTE(review): the line between the lambda head and the return (where funcw
// is presumably invoked) is not visible in this excerpt.
631 Future<T> Future<T>::ensure(F&& func) {
632 return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
634 return makeFuture(std::move(t));
// onTimeout(dur, func, tk): applies within(dur, tk) and, if the result fails
// with TimedOut, substitutes the result of calling func.
640 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
641 return within(dur, tk).onError([funcw = std::forward<F>(func)](
642 TimedOut const&) { return std::move(funcw)(); });
// onError overload for callbacks that are callable with exception_wrapper and
// return a Future. In the registered callback (which takes the Try by value):
// on an upstream exception the callback is run through tryInvoke() with the
// moved exception_wrapper; an exception from that call completes the promise,
// otherwise the promise is completed when the returned future completes. A
// successful upstream Try is passed through unchanged.
// NOTE(review): template headers, the return type line, the static_assert
// head, the promise declaration, the setCallback_ line, else keywords and the
// return statement are not visible in this excerpt.
647 typename std::enable_if<
648 futures::detail::callableWith<F, exception_wrapper>::value &&
649 futures::detail::Extract<F>::ReturnsFuture::value,
651 Future<T>::onError(F&& func) {
653 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
655 "Return type of onError callback must be T or Future<T>");
658 auto f = p.getFuture();
660 [state = futures::detail::makeCoreCallbackState(
661 std::move(p), std::forward<F>(func))](Try<T> t) mutable {
662 if (t.hasException()) {
663 auto tf2 = state.tryInvoke(std::move(t.exception()));
664 if (tf2.hasException()) {
665 state.setException(std::move(tf2.exception()));
667 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
668 p.setTry(std::move(t3));
672 state.setTry(std::move(t));
679 // onError(exception_wrapper) that returns T
// Enabled for callbacks that are callable with exception_wrapper and do not
// return a Future. In the registered callback: on an upstream exception the
// callback is invoked with the moved exception_wrapper (inside makeTryWith)
// and its result completes the promise; a successful upstream Try is passed
// through unchanged.
// NOTE(review): the static_assert below compares Extract<F>::Return with
// Future<T> even though this overload is for callbacks returning T; whether
// that is right depends on how Extract<F>::Return is defined -- confirm.
// NOTE(review): template headers, the return type line, the static_assert
// head, the promise declaration, the setCallback_ line, the else keyword and
// the return statement are not visible in this excerpt.
682 typename std::enable_if<
683 futures::detail::callableWith<F, exception_wrapper>::value &&
684 !futures::detail::Extract<F>::ReturnsFuture::value,
686 Future<T>::onError(F&& func) {
688 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
690 "Return type of onError callback must be T or Future<T>");
693 auto f = p.getFuture();
695 [state = futures::detail::makeCoreCallbackState(
696 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
697 if (t.hasException()) {
698 state.setTry(makeTryWith(
699 [&] { return state.invoke(std::move(t.exception())); }));
701 state.setTry(std::move(t));
// getTryVia(e): calls waitVia(e), then returns the Try of the result.
709 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
710 return waitVia(e).getTry();
// via(x, func): free function; starts a future on executor x and chains func
// onto it. The result type is the inner type of whatever func() returns.
713 template <class Func>
714 auto via(Executor* x, Func&& func)
715 -> Future<typename isFuture<decltype(std::declval<Func>()())>::Inner> {
716 // TODO make this actually more performant. :-P #7260175
717 return via(x).then(std::forward<Func>(func));
// Tag constructor used by Future<T>::makeEmpty(); delegates to the base.
721 Future<T>::Future(futures::detail::EmptyConstruct) noexcept
722 : SemiFuture<T>(futures::detail::EmptyConstruct{}) {}
// makeFuture(value): wraps the forwarded value in a Try of the decayed type
// and delegates to the Try overload.
// NOTE(review): the template header line is not visible in this excerpt.
727 Future<typename std::decay<T>::type> makeFuture(T&& t) {
728 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
// makeFuture(): no-argument form, yields a Future<Unit> built from Unit{}.
731 inline Future<Unit> makeFuture() {
732 return makeFuture(Unit{});
735 // makeFutureWith(Future<T>()) -> Future<T>
// Overload selected when calling F with no arguments yields a Future. Returns
// func()'s result directly; if the call throws, returns a Future of the same
// inner type holding the exception instead.
// NOTE(review): the `try {` line, the InnerType alias head and the second
// catch clause (presumably catch (...)) are not visible in this excerpt.
737 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
738 typename std::result_of<F()>::type>::type
739 makeFutureWith(F&& func) {
741 typename isFuture<typename std::result_of<F()>::type>::Inner;
743 return std::forward<F>(func)();
// std::exception-derived errors keep a reference to the typed exception.
744 } catch (std::exception& e) {
745 return makeFuture<InnerType>(
746 exception_wrapper(std::current_exception(), e));
// Fallback path: only the exception_ptr is available.
748 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
752 // makeFutureWith(T()) -> Future<T>
753 // makeFutureWith(void()) -> Future<Unit>
// Overload selected when calling F with no arguments does NOT yield a Future.
// The call is run through makeTryWith and the resulting Try is turned into a
// Future; Unit::LiftT maps the result type (void -> Unit per the comment
// above).
755 typename std::enable_if<
756 !(isFuture<typename std::result_of<F()>::type>::value),
757 Future<Unit::LiftT<typename std::result_of<F()>::type>>>::type
758 makeFutureWith(F&& func) {
759 using LiftedResult = Unit::LiftT<typename std::result_of<F()>::type>;
760 return makeFuture<LiftedResult>(
761 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
// makeFuture overloads that start from an error or from a ready Try.
// NOTE(review): some template header lines and the return type tail of the
// exception-object overload are not visible in this excerpt.
//
// From a std::exception_ptr: wrapped in a Try<T>.
765 Future<T> makeFuture(std::exception_ptr const& e) {
766 return makeFuture(Try<T>(e));
// From an exception_wrapper (taken by value, then moved into the Try).
770 Future<T> makeFuture(exception_wrapper ew) {
771 return makeFuture(Try<T>(std::move(ew)));
// From any exception object whose type derives from std::exception.
774 template <class T, class E>
775 typename std::enable_if<std::is_base_of<std::exception, E>::value,
777 makeFuture(E const& e) {
778 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
// From a Try<T>: allocates a new Core<T> holding the moved Try.
782 Future<T> makeFuture(Try<T>&& t) {
783 return Future<T>(new futures::detail::Core<T>(std::move(t)));
// via(executor, priority): a Future<Unit> from makeFuture(), moved onto the
// given executor.
787 Future<Unit> via(Executor* executor, int8_t priority) {
788 return makeFuture().via(executor, priority);
791 // mapSetCallback calls func(i, Try<T>) as each future in the range completes
// `i` is the zero-based position of the future within [first, last). `func`
// is taken by value and copied into the callback installed on each future.
793 template <class T, class InputIterator, class F>
794 void mapSetCallback(InputIterator first, InputIterator last, F func) {
795 for (size_t i = 0; first != last; ++first, ++i) {
796 first->setCallback_([func, i](Try<T>&& t) {
797 func(i, std::move(t));
802 // collectAll (variadic)
// Creates a shared CollectAllVariadicContext parameterised on the value types
// of the given futures, hands it and the forwarded futures to
// collectVariadicHelper, and returns the future of the context's promise.
804 template <typename... Fs>
805 typename futures::detail::CollectAllVariadicContext<
806 typename std::decay<Fs>::type::value_type...>::type
807 collectAll(Fs&&... fs) {
808 auto ctx = std::make_shared<futures::detail::CollectAllVariadicContext<
809 typename std::decay<Fs>::type::value_type...>>();
810 futures::detail::collectVariadicHelper<
811 futures::detail::CollectAllVariadicContext>(ctx, std::forward<Fs>(fs)...);
812 return ctx->p.getFuture();
815 // collectAll (iterator)
// Returns a future for a vector of Try<T>, one slot per input future, where T
// is the value type of the futures in the range. Each input future's callback
// stores its Try at the future's index. The promise is fulfilled from the
// context's destructor, and each installed callback holds a shared_ptr to
// that context.
// NOTE(review): the return type head, the typedef keyword line and the
// `auto ctx =` line are not visible in this excerpt.
817 template <class InputIterator>
820 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
821 collectAll(InputIterator first, InputIterator last) {
823 typename std::iterator_traits<InputIterator>::value_type::value_type T;
825 struct CollectAllContext {
// One result slot per input future.
826 CollectAllContext(size_t n) : results(n) {}
827 ~CollectAllContext() {
828 p.setValue(std::move(results));
830 Promise<std::vector<Try<T>>> p;
831 std::vector<Try<T>> results;
835 std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
836 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
837 ctx->results[i] = std::move(t);
839 return ctx->p.getFuture();
842 // collect (iterator)
// Shared state for the iterator form of collect(). Result is std::vector<T>
// unless T is void; InternalResult stores std::vector<Optional<T>> while
// results are still arriving. `threw` records that the promise has already
// been (or is being) completed.
// NOTE(review): many lines are not visible in this excerpt: the Nothing
// struct head, the void alternatives of both std::conditional aliases, the
// head of the function containing the `threw.exchange` check (presumably the
// destructor -- confirm), and the declaration of the promise `p`.
847 template <typename T>
848 struct CollectContext {
850 explicit Nothing(int /* n */) {}
853 using Result = typename std::conditional<
854 std::is_void<T>::value,
856 std::vector<T>>::type;
858 using InternalResult = typename std::conditional<
859 std::is_void<T>::value,
861 std::vector<Optional<T>>>::type;
// One (initially empty) slot per input future.
863 explicit CollectContext(size_t n) : result(n) {}
// Only the first party to flip `threw` completes the promise.
865 if (!threw.exchange(true)) {
866 // map Optional<T> -> T
867 std::vector<T> finalResult;
868 finalResult.reserve(result.size());
869 std::transform(result.begin(), result.end(),
870 std::back_inserter(finalResult),
871 [](Optional<T>& o) { return std::move(o.value()); });
872 p.setValue(std::move(finalResult));
// Stores the value of future i by moving it out of its Try.
875 inline void setPartialResult(size_t i, Try<T>& t) {
876 result[i] = std::move(t.value());
879 InternalResult result;
880 std::atomic<bool> threw {false};
883 } // namespace detail
884 } // namespace futures
// collect (iterator): returns a future for CollectContext<T>::Result, where T
// is the value type of the futures in the range. Per input future:
//   - on an exception, the first caller to flip ctx->threw sets that exception
//     on the promise;
//   - on a value, it is stored at the future's index unless threw is already
//     set.
// NOTE(review): the typedef keyword line is not visible in this excerpt. The
// success path completes the promise inside CollectContext, not here.
886 template <class InputIterator>
887 Future<typename futures::detail::CollectContext<typename std::iterator_traits<
888 InputIterator>::value_type::value_type>::Result>
889 collect(InputIterator first, InputIterator last) {
891 typename std::iterator_traits<InputIterator>::value_type::value_type T;
893 auto ctx = std::make_shared<futures::detail::CollectContext<T>>(
894 std::distance(first, last));
895 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
896 if (t.hasException()) {
897 if (!ctx->threw.exchange(true)) {
898 ctx->p.setException(std::move(t.exception()));
900 } else if (!ctx->threw) {
901 ctx->setPartialResult(i, t);
904 return ctx->p.getFuture();
907 // collect (variadic)
// Creates a shared CollectVariadicContext parameterised on the value types of
// the given futures, hands it and the forwarded futures to
// collectVariadicHelper, and returns the future of the context's promise.
909 template <typename... Fs>
910 typename futures::detail::CollectVariadicContext<
911 typename std::decay<Fs>::type::value_type...>::type
912 collect(Fs&&... fs) {
913 auto ctx = std::make_shared<futures::detail::CollectVariadicContext<
914 typename std::decay<Fs>::type::value_type...>>();
915 futures::detail::collectVariadicHelper<
916 futures::detail::CollectVariadicContext>(ctx, std::forward<Fs>(fs)...);
917 return ctx->p.getFuture();
920 // collectAny (iterator)
// Returns a future for a pair (index, Try<T>) taken from whichever input
// future's callback is first to flip the atomic `done` flag; later callbacks
// do nothing. The Try is forwarded as-is, so it may hold an exception.
// NOTE(review): the return type head and the typedef keyword line are not
// visible in this excerpt.
922 template <class InputIterator>
927 std::iterator_traits<InputIterator>::value_type::value_type>>>
928 collectAny(InputIterator first, InputIterator last) {
930 typename std::iterator_traits<InputIterator>::value_type::value_type T;
932 struct CollectAnyContext {
933 CollectAnyContext() {}
934 Promise<std::pair<size_t, Try<T>>> p;
935 std::atomic<bool> done {false};
938 auto ctx = std::make_shared<CollectAnyContext>();
939 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
940 if (!ctx->done.exchange(true)) {
941 ctx->p.setValue(std::make_pair(i, std::move(t)));
944 return ctx->p.getFuture();
947 // collectAnyWithoutException (iterator)
// Returns a future for a pair (index, value) from the first input future that
// completes without an exception. Per input future:
//   - a successful result that is first to flip `done` sets the promise value;
//   - any other completion (a failure, or a success after `done` was set)
//     increments nFulfilled, and the one that makes it equal to nTotal sets
//     that future's exception on the promise.
// nTotal is the number of futures in the range.
// NOTE(review): the return type head, the typedef keyword line and the nTotal
// member declaration are not visible in this excerpt.
949 template <class InputIterator>
952 typename std::iterator_traits<InputIterator>::value_type::value_type>>
953 collectAnyWithoutException(InputIterator first, InputIterator last) {
955 typename std::iterator_traits<InputIterator>::value_type::value_type T;
957 struct CollectAnyWithoutExceptionContext {
958 CollectAnyWithoutExceptionContext(){}
959 Promise<std::pair<size_t, T>> p;
960 std::atomic<bool> done{false};
961 std::atomic<size_t> nFulfilled{0};
965 auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
966 ctx->nTotal = size_t(std::distance(first, last));
968 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
969 if (!t.hasException() && !ctx->done.exchange(true)) {
970 ctx->p.setValue(std::make_pair(i, std::move(t.value())));
971 } else if (++ctx->nFulfilled == ctx->nTotal) {
972 ctx->p.setException(t.exception());
975 return ctx->p.getFuture();
978 // collectN (iterator)
// Returns a future for a vector of (index, Try<T>) pairs built from completed
// input futures. If the range holds fewer than n futures the promise is failed
// with std::runtime_error("Not enough futures").
// NOTE(review): the typedef keyword line, the members `v` and `p` of
// CollectNContext, the else branch head and the conditions comparing the
// completion count `c` with n are not visible in this excerpt.
// NOTE(review): ctx->v is appended to from the futures' callbacks and no lock
// is visible here; whether concurrent completion is safe should be confirmed
// against the full source.
980 template <class InputIterator>
981 Future<std::vector<std::pair<size_t, Try<typename
982 std::iterator_traits<InputIterator>::value_type::value_type>>>>
983 collectN(InputIterator first, InputIterator last, size_t n) {
985 std::iterator_traits<InputIterator>::value_type::value_type T;
986 typedef std::vector<std::pair<size_t, Try<T>>> V;
988 struct CollectNContext {
990 std::atomic<size_t> completed = {0};
993 auto ctx = std::make_shared<CollectNContext>();
995 if (size_t(std::distance(first, last)) < n) {
996 ctx->p.setException(std::runtime_error("Not enough futures"));
998 // for each completed Future, increase count and add to vector, until we
999 // have n completed futures at which point we fulfil our Promise with the
1001 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
// c is this callback's 1-based completion order.
1002 auto c = ++ctx->completed;
1004 assert(ctx->v.size() < n);
1005 ctx->v.emplace_back(i, std::move(t));
1007 ctx->p.setTry(Try<V>(std::move(ctx->v)));
1013 return ctx->p.getFuture();
1016 // reduce (iterator)
// Folds the results of the futures in [first, last) into a single Future<T>,
// starting from `initial`. An empty range yields a ready future holding
// `initial`. `func` is moved into a shared_ptr so every chained continuation
// can call it. The first future is combined with `initial`; each following
// future is paired with the running result via collectAll and combined in a
// continuation. Whether func receives ItT&& or Try<ItT>&& depends on whether
// it is callable with (T&&, Try<ItT>&&).
// NOTE(review): the alternatives of the std::conditional (the Arg typedef),
// the call expression head inside the first continuation, and the final
// return statement are not visible in this excerpt.
1018 template <class It, class T, class F>
1019 Future<T> reduce(It first, It last, T&& initial, F&& func) {
1020 if (first == last) {
1021 return makeFuture(std::move(initial));
1024 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
1025 typedef typename std::conditional<
1026 futures::detail::callableWith<F, T&&, Try<ItT>&&>::value,
1029 typedef isTry<Arg> IsTry;
1031 auto sfunc = std::make_shared<F>(std::move(func));
1033 auto f = first->then(
1034 [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
1036 std::move(minitial), head.template get<IsTry::value, Arg&&>());
1039 for (++first; first != last; ++first) {
1040 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
1041 return (*sfunc)(std::move(std::get<0>(t).value()),
1042 // Either return a ItT&& or a Try<ItT>&& depending
1043 // on the type of the argument of func.
1044 std::get<1>(t).template get<IsTry::value, Arg&&>());
1051 // window (collection)
// Applies `func` to every element of `input` and returns one Future<Result>
// per element. Only min(n, input.size()) calls are started up front; each
// completion callback fulfils the promise for its index and then calls
// spawn() again, which picks the next unprocessed index from the atomic
// counter i_.
// NOTE(review): the members input_ and func_, the constructor body, closing
// braces and the final return statement are not visible in this excerpt.
1053 template <class Collection, class F, class ItT, class Result>
1054 std::vector<Future<Result>>
1055 window(Collection input, F func, size_t n) {
1056 struct WindowContext {
// One promise per input element.
1057 WindowContext(Collection&& i, F&& fn)
1058 : input_(std::move(i)), promises_(input_.size()),
1059 func_(std::move(fn))
// Next input index to process.
1061 std::atomic<size_t> i_ {0};
1063 std::vector<Promise<Result>> promises_;
// Claims the next index; does nothing once every index has been claimed.
1066 static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
1067 size_t i = ctx->i_++;
1068 if (i < ctx->input_.size()) {
1069 // Using setCallback_ directly since we don't need the Future
1070 ctx->func_(std::move(ctx->input_[i])).setCallback_(
1071 // ctx is captured by value
1072 [ctx, i](Try<Result>&& t) {
1073 ctx->promises_[i].setTry(std::move(t));
1074 // Chain another future onto this one
1075 spawn(std::move(ctx));
1081 auto max = std::min(n, input.size());
1083 auto ctx = std::make_shared<WindowContext>(
1084 std::move(input), std::move(func));
1086 for (size_t i = 0; i < max; ++i) {
1087 // Start the first n Futures
1088 WindowContext::spawn(ctx);
// Hand back the future side of every promise, in input order.
1091 std::vector<Future<Result>> futures;
1092 futures.reserve(ctx->promises_.size());
1093 for (auto& promise : ctx->promises_) {
1094 futures.emplace_back(promise.getFuture());
// Future<T>::reduce(initial, func): the callback shown receives this future's
// value as a range `vals` and folds it left to right, starting from `initial`:
// ret = func(std::move(ret), std::move(val)) for each element.
// NOTE(review): the outer template header, the head of the expression the
// lambda belongs to, and the lambda's return statement are not visible in
// this excerpt.
1103 template <class I, class F>
1104 Future<I> Future<T>::reduce(I&& initial, F&& func) {
1106 minitial = std::forward<I>(initial),
1107 mfunc = std::forward<F>(func)
1108 ](T& vals) mutable {
1109 auto ret = std::move(minitial);
1110 for (auto& val : vals) {
1111 ret = mfunc(std::move(ret), std::move(val));
1117 // unorderedReduce (iterator)
// Folds the results of the futures in [first, last) into a Future<T> in
// completion order rather than input order. An empty range yields a ready
// future holding `initial`. The shared context keeps the running result as a
// future (memo_); each completing input chains one more continuation onto it
// under lock_. Once the number of chained continuations equals the number of
// inputs, a callback on memo_ completes promise_.
// NOTE(review): the members memo_ and func_, the constructor body, the first
// and last arguments passed to mapSetCallback, and the statement that stores
// the result of ctx->memo_.then(...) are not visible in this excerpt.
1119 template <class It, class T, class F, class ItT, class Arg>
1120 Future<T> unorderedReduce(It first, It last, T initial, F func) {
1121 if (first == last) {
1122 return makeFuture(std::move(initial));
1125 typedef isTry<Arg> IsTry;
1127 struct UnorderedReduceContext {
1128 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
1129 : lock_(), memo_(makeFuture<T>(std::move(memo))),
1130 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
1132 folly::MicroSpinLock lock_; // protects memo_ and numThens_
1135 size_t numThens_; // how many Futures completed and called .then()
1136 size_t numFutures_; // how many Futures in total
1137 Promise<T> promise_;
1140 auto ctx = std::make_shared<UnorderedReduceContext>(
1141 std::move(initial), std::move(func), std::distance(first, last));
1143 mapSetCallback<ItT>(
1146 [ctx](size_t /* i */, Try<ItT>&& t) {
1147 // Futures can be completed in any order, simultaneously.
1148 // To make this non-blocking, we create a new Future chain in
1149 // the order of completion to reduce the values.
1150 // The spinlock just protects chaining a new Future, not actually
1151 // executing the reduce, which should be really fast.
1152 folly::MSLGuard lock(ctx->lock_);
1154 ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
1155 // Either return a ItT&& or a Try<ItT>&& depending
1156 // on the type of the argument of func.
1157 return ctx->func_(std::move(v),
1158 mt.template get<IsTry::value, Arg&&>());
1160 if (++ctx->numThens_ == ctx->numFutures_) {
1161 // After reducing the value of the last Future, fulfill the Promise
1162 ctx->memo_.setCallback_(
1163 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
1167 return ctx->promise_.getFuture();
// within(dur, tk): convenience form that uses a default-constructed TimedOut
// as the timeout exception.
1173 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
1174 return within(dur, TimedOut(), tk);
// within(dur, e, tk): returns a future that completes with this future's
// result, or with an exception if the timekeeper's delay of `dur` completes
// first. A future that is already ready is returned unchanged. The atomic
// `token` in the shared Context decides which side fulfils the promise:
// whoever flips it first.
// NOTE(review): the Context struct head and its `exception` and `promise`
// members, the condition guarding the Timekeeper singleton lookup, and the
// null check on lockedCtx with its early return are not visible in this
// excerpt.
1179 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
1182 Context(E ex) : exception(std::move(ex)), promise() {}
1184 Future<Unit> thisFuture;
1186 std::atomic<bool> token {false};
1189 if (this->isReady()) {
1190 return std::move(*this);
// Keeps the singleton Timekeeper alive while `tk` points at it.
1193 std::shared_ptr<Timekeeper> tks;
1195 tks = folly::detail::getTimekeeperSingleton();
1196 tk = DCHECK_NOTNULL(tks.get());
1199 auto ctx = std::make_shared<Context>(std::move(e));
// Completion side: forwards this future's Try if the timeout has not won.
1201 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
1202 if (ctx->token.exchange(true) == false) {
1203 ctx->promise.setTry(std::move(t));
1207 // Have the timekeeper hold ctx through a weak_ptr,
1208 // so that ctx can be deallocated as soon as the future job has finished.
1209 tk->after(dur).then([weakCtx = to_weak_ptr(ctx)](Try<Unit> const& t) mutable {
1210 auto lockedCtx = weakCtx.lock();
1212 // ctx already released. "this" completed first, cancel "after"
1215 // "after" completed first, cancel "this"
1216 lockedCtx->thisFuture.raise(TimedOut());
1217 if (lockedCtx->token.exchange(true) == false) {
// A failure of the timer itself takes precedence over the caller's exception.
1218 if (t.hasException()) {
1219 lockedCtx->promise.setException(std::move(t.exception()));
1221 lockedCtx->promise.setException(std::move(lockedCtx->exception));
// The result runs on the same executor as this future.
1226 return ctx->promise.getFuture().via(this->getExecutor());
// delayed(dur, tk): combines this future with futures::sleep(dur, tk) through
// collectAll, then returns a future made from this future's Try (element 0 of
// the tuple). The sleep's own Try is not inspected.
1232 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
1233 return collectAll(*this, futures::sleep(dur, tk))
1234 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
1235 Try<T>& t = std::get<0>(tup);
1236 return makeFuture<T>(std::move(t));
// waitImpl(f): blocking wait without a timeout. Returns immediately if f is
// already ready; otherwise installs a callback that posts a local baton. The
// baton is captured by reference, so it must outlive the callback.
// NOTE(review): the statement between setCallback_ and the assert (presumably
// the baton wait) is not visible in this excerpt.
1243 template <class FutureType, typename T = typename FutureType::value_type>
1244 void waitImpl(FutureType& f) {
1245 // short-circuit if there's nothing to do
1246 if (f.isReady()) return;
1248 FutureBatonType baton;
1249 f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1251 assert(f.isReady());
// Timed variant of waitImpl: waits up to `dur` for `f` to complete.
// NOTE(review): this listing omits the short-circuit check, the declaration of
// `promise`, the statements following promise.setTry() inside the callback,
// and whatever is done with `ret` -- confirm against the full file.
1254 template <class FutureType, typename T = typename FutureType::value_type>
1255 void waitImpl(FutureType& f, Duration dur) {
1256 // short-circuit if there's nothing to do
// `ret` is the future paired with the promise that the callback fulfils.
1262 auto ret = promise.getFuture();
// The baton is held by shared_ptr and copied into the callback, so the
// callback owns a reference to it independently of this stack frame.
1263 auto baton = std::make_shared<FutureBatonType>();
1264 f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1265 promise.setTry(std::move(t));
// timed_wait() returning true is treated as "completed within dur".
1269 if (baton->timed_wait(dur)) {
1270 assert(f.isReady());
// Waits for `f` by re-binding it to the drivable executor `e` and looping
// until it is ready.
// NOTE(review): the loop body and any early-return check are elided in this
// listing; presumably the loop drives `e` -- confirm against the full file.
1275 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1276 // Set callback so to ensure that the via executor has something on it
1277 // so that once the preceding future triggers this callback, drive will
1278 // always have a callback to satisfy it
// Identity continuation: passes the value through unchanged on executor e.
1281 f = f.via(e).then([](T&& t) { return std::move(t); });
1282 while (!f.isReady()) {
1285 assert(f.isReady());
1288 } // namespace detail
1289 } // namespace futures
// Lvalue overload: waits for this SemiFuture via futures::detail::waitImpl.
// (The return statement is not shown in this listing.)
1292 SemiFuture<T>& SemiFuture<T>::wait() & {
1293 futures::detail::waitImpl(*this);
// Rvalue overload: waits via futures::detail::waitImpl, then returns *this as
// an rvalue reference.
1298 SemiFuture<T>&& SemiFuture<T>::wait() && {
1299 futures::detail::waitImpl(*this);
1300 return std::move(*this);
// Lvalue overload with a timeout: delegates to the timed waitImpl with `dur`.
// (The return statement is not shown in this listing.)
1304 SemiFuture<T>& SemiFuture<T>::wait(Duration dur) & {
1305 futures::detail::waitImpl(*this, dur);
// Rvalue overload with a timeout: delegates to the timed waitImpl with `dur`,
// then returns *this as an rvalue reference.
1310 SemiFuture<T>&& SemiFuture<T>::wait(Duration dur) && {
1311 futures::detail::waitImpl(*this, dur);
1312 return std::move(*this);
// Waits for completion via wait() and returns the result moved out of value().
1316 T SemiFuture<T>::get() {
1317 return std::move(wait().value());
// Timed get: returns the value moved out of this future when it is ready.
// NOTE(review): the call that waits for `dur` and the branch taken when the
// future is still not ready are elided in this listing -- confirm against the
// full file.
1321 T SemiFuture<T>::get(Duration dur) {
1323 if (this->isReady()) {
1324 return std::move(this->value());
// Lvalue overload: waits for this Future via futures::detail::waitImpl.
// (The return statement is not shown in this listing.)
1331 Future<T>& Future<T>::wait() & {
1332 futures::detail::waitImpl(*this);
// Rvalue overload: waits via futures::detail::waitImpl, then returns *this as
// an rvalue reference.
1337 Future<T>&& Future<T>::wait() && {
1338 futures::detail::waitImpl(*this);
1339 return std::move(*this);
// Lvalue overload with a timeout: delegates to the timed waitImpl with `dur`.
// (The return statement is not shown in this listing.)
1343 Future<T>& Future<T>::wait(Duration dur) & {
1344 futures::detail::waitImpl(*this, dur);
// Rvalue overload with a timeout: delegates to the timed waitImpl with `dur`,
// then returns *this as an rvalue reference.
1349 Future<T>&& Future<T>::wait(Duration dur) && {
1350 futures::detail::waitImpl(*this, dur);
1351 return std::move(*this);
// Lvalue overload: waits for this Future using the drivable executor `e`
// (see futures::detail::waitViaImpl). The return statement is not shown in
// this listing.
1355 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1356 futures::detail::waitViaImpl(*this, e);
// Rvalue overload: waits using the drivable executor `e`, then returns *this
// as an rvalue reference.
1361 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1362 futures::detail::waitViaImpl(*this, e);
1363 return std::move(*this);
// Waits with waitVia(e) and returns the result moved out of value().
1367 T Future<T>::getVia(DrivableExecutor* e) {
1368 return std::move(waitVia(e).value());
// Compares the values held by two Trys with operator==.
// Calls value() on both arguments; the visible caller (willEqual) checks
// hasValue() on both before calling this.
// (The enclosing struct header is not shown in this listing.)
1375 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1376 return t1.value() == t2.value();
1379 } // namespace detail
1380 } // namespace futures
// Returns a Future<bool> that resolves after both this future and `f` have
// completed (collectAll). When both hold values, the result is the comparison
// performed by futures::detail::TryEquals<T>::equals.
// NOTE(review): the branch taken when either side lacks a value is elided in
// this listing -- confirm against the full file.
1383 Future<bool> Future<T>::willEqual(Future<T>& f) {
1384 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1385 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1386 return futures::detail::TryEquals<T>::equals(
1387 std::get<0>(t), std::get<1>(t));
// Chains a continuation that tests the value against `predicate` and calls
// throwPredicateDoesNotObtain() when the predicate returns false.
// The predicate is forwarded into the closure and is invoked through a const
// reference to the value, so it cannot modify it.
// (The statement returning the value on success is not shown in this listing.)
1396 Future<T> Future<T>::filter(F&& predicate) {
1397 return this->then([p = std::forward<F>(predicate)](T val) {
1398 T const& valConstRef = val;
1399 if (!p(valConstRef)) {
1400 throwPredicateDoesNotObtain();
// Conditionally runs `thunk`: when `p` is true the thunk is invoked and its
// result converted with .unit(); otherwise the thunk is not called and the
// result of makeFuture() is returned.
1407 inline Future<Unit> when(bool p, F&& thunk) {
1408 return p ? std::forward<F>(thunk)().unit() : makeFuture();
// Asynchronous loop: invokes `thunk`, and once the future it returns
// completes, re-enters whileDo with the same predicate and thunk (both are
// forwarded into the continuation's closure). Returns makeFuture() on the
// path where no further iteration is started.
// NOTE(review): the condition that selects between the two paths is elided in
// this listing; presumably it is a call to `predicate` -- confirm.
1411 template <class P, class F>
1412 Future<Unit> whileDo(P&& predicate, F&& thunk) {
// thunk is invoked before it is forwarded into the closure below.
1414 auto future = thunk();
1415 return future.then([
1416 predicate = std::forward<P>(predicate),
1417 thunk = std::forward<F>(thunk)
1419 return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1422 return makeFuture();
// Runs `thunk` through folly::whileDo with a counting predicate: the predicate
// returns true while the number of previous predicate calls is below `n`.
// The counter is an atomic<int> held by unique_ptr inside the closure;
// fetch_add(1) returns the value before the increment.
1426 Future<Unit> times(const int n, F&& thunk) {
1427 return folly::whileDo(
1428 [ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
1429 return count->fetch_add(1) < n;
1431 std::forward<F>(thunk));
// Chains `func` onto every future in [first, last) with then() and collects
// the resulting futures, in iteration order, into a vector.
// (The return statement is not shown in this listing.)
1435 template <class It, class F, class ItT, class Result>
1436 std::vector<Future<Result>> map(It first, It last, F func) {
1437 std::vector<Future<Result>> results;
1438 for (auto it = first; it != last; it++) {
1439 results.push_back(it->then(func));
// Empty tag types used for overload selection: "raw" marks a retry policy
// returning bool, "fut" one returning Future<bool> (see
// retrying_policy_traits).
1449 struct retrying_policy_raw_tag {};
1450 struct retrying_policy_fut_tag {};
// Classifies a retry policy by the type it returns when called with
// (size_t, const exception_wrapper&).
//   result - the policy's return type.
//   is_raw - true_type when result is bool.
//   is_fut - true_type when result is Future<bool>.
//   tag    - retrying_policy_raw_tag or retrying_policy_fut_tag accordingly;
//            void when the result is neither type.
1452 template <class Policy>
1453 struct retrying_policy_traits {
1454 using result = std::result_of_t<Policy(size_t, const exception_wrapper&)>;
1455 using is_raw = std::is_same<result, bool>;
1456 using is_fut = std::is_same<result, Future<bool>>;
1457 using tag = typename std::conditional<
1458 is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1459 is_fut::value, retrying_policy_fut_tag, void>::type>::type;
// Performs one attempt of a retry loop and arranges for the next one.
//   k    - attempt index passed to ff; it is post-incremented, so later uses
//          of k in this function see the next index.
//   p    - retry policy, forwarded into the continuation as `pm`.
//   ff   - factory producing the future for an attempt, called as ff(k).
//   prom - promise fulfilled with the final value or exception.
// On success the promise receives the value. On failure a second continuation
// taking `bool shouldRetry` either recurses into retryingImpl or sets the
// exception on the promise.
// NOTE(review): this listing omits the lines that attach the continuations,
// test the Try, invoke the policy, and capture `k`/`xm`/`pm` -- confirm the
// exact flow against the full file.
1462 template <class Policy, class FF, class Prom>
1463 void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) {
1464 using F = typename std::result_of<FF(size_t)>::type;
1465 using T = typename F::value_type;
// makeFutureWith wraps the call to ff; k is captured by reference here.
1466 auto f = makeFutureWith([&] { return ff(k++); });
1469 prom = std::move(prom),
1470 pm = std::forward<Policy>(p),
1471 ffm = std::forward<FF>(ff)
1472 ](Try<T> && t) mutable {
1474 prom.setValue(std::move(t).value());
1477 auto& x = t.exception();
1481 prom = std::move(prom),
1484 ffm = std::move(ffm)
1485 ](bool shouldRetry) mutable {
// Retry: policy, factory and promise are all handed on to the next attempt.
1487 retryingImpl(k, std::move(pm), std::move(ffm), std::move(prom));
1489 prom.setException(std::move(xm));
// Entry point taking an explicit starting attempt index `k`: creates the
// promise/future pair for the overall result and forwards the policy, the
// factory and the promise onward.
// Return type is whatever ff(size_t) returns; T is that future's value_type.
// NOTE(review): the name of the callee and the return statement are elided in
// this listing; presumably retryingImpl is called and `f` returned.
1495 template <class Policy, class FF>
1496 typename std::result_of<FF(size_t)>::type
1497 retrying(size_t k, Policy&& p, FF&& ff) {
1498 using F = typename std::result_of<FF(size_t)>::type;
1499 using T = typename F::value_type;
1500 auto prom = Promise<T>();
// The future is taken before the promise is moved away below.
1501 auto f = prom.getFuture();
1503 k, std::forward<Policy>(p), std::forward<FF>(ff), std::move(prom));
// Overload selected by retrying_policy_raw_tag: wraps the policy in a lambda
// that converts its result with makeFuture<bool>, then starts retrying at
// attempt index 0.
// The wrapper takes the exception_wrapper by value and is not `mutable`, so
// the stored policy is invoked as const.
1507 template <class Policy, class FF>
1508 typename std::result_of<FF(size_t)>::type
1509 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1510 auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
1511 return makeFuture<bool>(pm(k, x));
1513 return retrying(0, std::move(q), std::forward<FF>(ff));
// Overload selected by retrying_policy_fut_tag: the policy is forwarded
// unchanged and retrying starts at attempt index 0.
1516 template <class Policy, class FF>
1517 typename std::result_of<FF(size_t)>::type
1518 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1519 return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
// Jittered exponential backoff, clamped to [backoff_min, backoff_max].
//
//   n            - attempt number; the un-jittered delay is
//                  backoff_min * 2^(n - 1), so n is expected to be >= 1.
//   backoff_min  - lower clamp and base delay.
//   backoff_max  - upper clamp.
//   jitter_param - standard deviation of the normal distribution whose sample
//                  is exponentiated to obtain the multiplicative jitter.
//   rng          - uniform random bit generator, advanced by one sample.
//
// The delay is computed in double precision. A product that is infinite, NaN
// or too large for Duration::rep saturates at the upper clamp instead of being
// converted to an integer (an out-of-range floating-to-integer conversion is
// undefined behaviour and in practice made the delay collapse to backoff_min
// after enough retries). n == 0 wraps the unsigned exponent and therefore also
// saturates at the upper clamp.
template <class URNG>
Duration retryingJitteredExponentialBackoffDur(
    size_t n,
    Duration backoff_min,
    Duration backoff_max,
    double jitter_param,
    URNG& rng) {
  using d = Duration;
  auto dist = std::normal_distribution<double>(0.0, jitter_param);
  auto jitter = std::exp(dist(rng));
  const double scaled = jitter * backoff_min.count() * std::pow(2, n - 1);
  // Negated comparison so that NaN takes the saturating path as well.
  if (!(scaled < static_cast<double>(d::max().count()))) {
    return std::max(backoff_min, backoff_max);
  }
  auto backoff = d(static_cast<d::rep>(scaled));
  return std::max(backoff_min, std::min(backoff_max, backoff));
}
// Builds a retry policy, returned as a std::function, that stops when the
// attempt number equals max_tries. Otherwise it consults the wrapped policy
// `pm`, and on the path that continues it sleeps for a jittered exponential
// backoff before yielding true.
// NOTE(review): this listing omits the max_tries/rng/p parameters, part of the
// capture list, and the inner lambda's parameter and condition -- confirm the
// exact flow against the full file.
1537 template <class Policy, class URNG>
1538 std::function<Future<bool>(size_t, const exception_wrapper&)>
1539 retryingPolicyCappedJitteredExponentialBackoff(
1541 Duration backoff_min,
1542 Duration backoff_max,
1543 double jitter_param,
1547 pm = std::forward<Policy>(p),
1552 rngp = std::forward<URNG>(rng)
1553 ](size_t n, const exception_wrapper& ex) mutable {
// Attempt budget exhausted: do not retry and do not consult pm.
1554 if (n == max_tries) {
1555 return makeFuture(false);
// NOTE(review): rngp is moved out of the outer closure on every invocation.
// For generators whose move is a copy, the outer state would never advance
// and each retry would draw the same jitter sample -- confirm this is
// intended.
1557 return pm(n, ex).then(
1558 [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
1561 return makeFuture(false);
1563 auto backoff = detail::retryingJitteredExponentialBackoffDur(
1564 n, backoff_min, backoff_max, jitter_param, rngp);
// Resolve to true only after the backoff delay has elapsed.
1565 return futures::sleep(backoff).then([] { return true; });
// Overload selected by retrying_policy_raw_tag: wraps the policy in a lambda
// that converts its result with makeFuture, then delegates to another
// retryingPolicyCappedJitteredExponentialBackoff overload.
// (Several parameter and argument lines are not shown in this listing.)
1570 template <class Policy, class URNG>
1571 std::function<Future<bool>(size_t, const exception_wrapper&)>
1572 retryingPolicyCappedJitteredExponentialBackoff(
1574 Duration backoff_min,
1575 Duration backoff_max,
1576 double jitter_param,
1579 retrying_policy_raw_tag) {
1580 auto q = [pm = std::forward<Policy>(p)](
1581 size_t n, const exception_wrapper& e) {
1582 return makeFuture(pm(n, e));
1584 return retryingPolicyCappedJitteredExponentialBackoff(
1589 std::forward<URNG>(rng),
// Overload selected by retrying_policy_fut_tag: the policy is forwarded
// unchanged to another retryingPolicyCappedJitteredExponentialBackoff
// overload.
// (Several parameter and argument lines are not shown in this listing.)
1593 template <class Policy, class URNG>
1594 std::function<Future<bool>(size_t, const exception_wrapper&)>
1595 retryingPolicyCappedJitteredExponentialBackoff(
1597 Duration backoff_min,
1598 Duration backoff_max,
1599 double jitter_param,
1602 retrying_policy_fut_tag) {
1603 return retryingPolicyCappedJitteredExponentialBackoff(
1608 std::forward<URNG>(rng),
1609 std::forward<Policy>(p));
// Public entry point for retrying: picks the detail::retrying overload by the
// tag that retrying_policy_traits derives from the policy's return type.
//   p  - retry policy callable with (size_t, const exception_wrapper&).
//   ff - factory callable with a size_t; its return type is this function's
//        return type.
1613 template <class Policy, class FF>
1614 typename std::result_of<FF(size_t)>::type
1615 retrying(Policy&& p, FF&& ff) {
1616 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1617 return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
// Returns a policy that allows a retry while the attempt number is below
// max_tries; the exception argument is ignored. max_tries is captured by copy.
// (The parameter line is not shown in this listing.)
1621 std::function<bool(size_t, const exception_wrapper&)>
1622 retryingPolicyBasic(
1624 return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
// Public entry point taking a user policy and RNG: picks the detail overload
// by the tag that retrying_policy_traits derives from the policy's return
// type.
// (Several parameter and argument lines are not shown in this listing.)
1627 template <class Policy, class URNG>
1628 std::function<Future<bool>(size_t, const exception_wrapper&)>
1629 retryingPolicyCappedJitteredExponentialBackoff(
1631 Duration backoff_min,
1632 Duration backoff_max,
1633 double jitter_param,
1636 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1637 return detail::retryingPolicyCappedJitteredExponentialBackoff(
1642 std::forward<URNG>(rng),
1643 std::forward<Policy>(p),
// Overload without a user policy: supplies a policy that always returns true
// and delegates to another retryingPolicyCappedJitteredExponentialBackoff
// overload.
// (The first parameter and the forwarded argument list are not shown in this
// listing.)
1648 std::function<Future<bool>(size_t, const exception_wrapper&)>
1649 retryingPolicyCappedJitteredExponentialBackoff(
1651 Duration backoff_min,
1652 Duration backoff_max,
1653 double jitter_param) {
1654 auto p = [](size_t, const exception_wrapper&) { return true; };
1655 return retryingPolicyCappedJitteredExponentialBackoff(
// `extern template` is an explicit instantiation declaration: it suppresses
// implicit instantiation of these specializations in translation units that
// include this header; the matching definitions are presumably provided in a
// single source file elsewhere.
1666 // Explicit instantiation declarations for the most common Future types, to save compile time
1667 extern template class Future<Unit>;
1668 extern template class Future<bool>;
1669 extern template class Future<int>;
1670 extern template class Future<int64_t>;
1671 extern template class Future<std::string>;
1672 extern template class Future<double>;
1673 } // namespace folly