2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/futures/InlineExecutor.h>
27 #include <folly/futures/Timekeeper.h>
28 #include <folly/futures/detail/Core.h>
// Chooses a default for FOLLY_FUTURE_USING_FIBER when the macro has not been
// defined already. Two values are defined here: 0 (guarded by the
// FOLLY_MOBILE / __APPLE__ test) and 1.
30 #ifndef FOLLY_FUTURE_USING_FIBER
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
34 #define FOLLY_FUTURE_USING_FIBER 1
// NOTE(review): the fibers Baton header appears to belong to the value-1
// branch only; the branch delimiters are not visible in this excerpt.
35 #include <folly/fibers/Baton.h>
// FutureBatonType is the baton type used by the blocking waitImpl helpers
// later in this file. It is folly::fibers::Baton when
// FOLLY_FUTURE_USING_FIBER is non-zero; folly::Baton<> is the alternative.
45 #if FOLLY_FUTURE_USING_FIBER
46 typedef folly::fibers::Baton FutureBatonType;
48 typedef folly::Baton<> FutureBatonType;
51 } // namespace futures
// Declaration only (the definition is not in this file section). Returns a
// shared_ptr to a Timekeeper; Future<T>::within() below calls it as
// folly::detail::getTimekeeperSingleton().
54 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
59 // Guarantees that the stored functor is destructed before the stored promise
60 // may be fulfilled. Assumes the stored functor to be noexcept-destructible.
//
// Pairs a continuation functor (func_) with the Promise<T> that the
// continuation's outcome will fulfil. The object is "before the barrier"
// while before_barrier() returns true, i.e. while promise_ does not report
// isFulfilled(); invoke() and stealPromise() assert that state.
61 template <typename T, typename F>
62 class CoreCallbackState {
// Builds the state from a promise (moved in) and a functor (forwarded into
// F). noexcept exactly when constructing F from FF is noexcept.
64 template <typename FF>
65 CoreCallbackState(Promise<T>&& promise, FF&& func) noexcept(
66 noexcept(F(std::declval<FF>())))
67 : func_(std::forward<FF>(func)), promise_(std::move(promise)) {
68 assert(before_barrier());
// Move constructor. The functor is placement-constructed from that.func_ and
// the promise is taken via that.stealPromise() only when `that` is still
// before the barrier; otherwise promise_ keeps its default initializer
// (Promise<T>::makeEmpty()).
71 CoreCallbackState(CoreCallbackState&& that) noexcept(
72 noexcept(F(std::declval<F>()))) {
73 if (that.before_barrier()) {
74 new (&func_) F(std::move(that.func_));
75 promise_ = that.stealPromise();
// Move assignment is explicitly disabled.
79 CoreCallbackState& operator=(CoreCallbackState&&) = delete;
// Destructor: only acts while still before the barrier.
// NOTE(review): the statement inside the guard is not visible in this excerpt.
81 ~CoreCallbackState() {
82 if (before_barrier()) {
// Calls the stored functor as an rvalue with the forwarded arguments and
// returns its result. Must be called before the barrier (asserted).
87 template <typename... Args>
88 auto invoke(Args&&... args) noexcept(
89 noexcept(std::declval<F&&>()(std::declval<Args&&>()...))) {
90 assert(before_barrier());
91 return std::move(func_)(std::forward<Args>(args)...);
// Like invoke(), but the call is wrapped in makeTryWith and the wrapper's
// result is returned. Declared noexcept.
94 template <typename... Args>
95 auto tryInvoke(Args&&... args) noexcept {
96 return makeTryWith([&] { return invoke(std::forward<Args>(args)...); });
// Fulfils the promise with `t`. The promise is first taken out of this object
// through stealPromise().
99 void setTry(Try<T>&& t) {
100 stealPromise().setTry(std::move(t));
// Fulfils the promise with the exception `ew`, again via stealPromise().
103 void setException(exception_wrapper&& ew) {
104 stealPromise().setException(std::move(ew));
// Moves promise_ out and returns it. Must be called before the barrier
// (asserted).
// NOTE(review): per the class comment the functor is expected to be
// destroyed before the promise is handed out; that statement is not visible
// in this excerpt.
107 Promise<T> stealPromise() noexcept {
108 assert(before_barrier());
110 return std::move(promise_);
// True while promise_.isFulfilled() is false.
114 bool before_barrier() const noexcept {
115 return !promise_.isFulfilled();
// Starts out as an empty promise; the constructors overwrite it.
121 Promise<T> promise_{Promise<T>::makeEmpty()};
// Factory for CoreCallbackState. The functor type parameter is the decayed
// type of F, so callers do not have to spell it. The promise is moved and the
// functor forwarded into the new state object. noexcept exactly when the
// corresponding CoreCallbackState construction is noexcept.
124 template <typename T, typename F>
125 inline auto makeCoreCallbackState(Promise<T>&& p, F&& f) noexcept(
126 noexcept(CoreCallbackState<T, _t<std::decay<F>>>(
127 std::declval<Promise<T>&&>(),
128 std::declval<F&&>()))) {
129 return CoreCallbackState<T, _t<std::decay<F>>>(
130 std::move(p), std::forward<F>(f));
132 } // namespace detail
133 } // namespace futures
// Builds a SemiFuture from a value. The value type is std::decay<T>; the
// value is forwarded into a Try and passed to the Try-taking overload.
136 SemiFuture<typename std::decay<T>::type> makeSemiFuture(T&& t) {
137 return makeSemiFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
140 // makeSemiFutureWith(SemiFuture<T>()) -> SemiFuture<T>
// Overload selected when calling func() yields a SemiFuture. The callable's
// own SemiFuture is returned. If the call throws, the exception is converted
// into a SemiFuture<InnerType> holding an exception_wrapper instead of
// propagating to the caller.
142 typename std::enable_if<
143 isSemiFuture<typename std::result_of<F()>::type>::value,
144 typename std::result_of<F()>::type>::type
145 makeSemiFutureWith(F&& func) {
147 typename isSemiFuture<typename std::result_of<F()>::type>::Inner;
149 return std::forward<F>(func)();
// std::exception-derived errors: the wrapper receives both the exception_ptr
// and the caught object.
150 } catch (std::exception& e) {
151 return makeSemiFuture<InnerType>(
152 exception_wrapper(std::current_exception(), e));
// Fallback path: the wrapper is built from the exception_ptr alone.
154 return makeSemiFuture<InnerType>(
155 exception_wrapper(std::current_exception()));
159 // makeSemiFutureWith(T()) -> SemiFuture<T>
160 // makeSemiFutureWith(void()) -> SemiFuture<Unit>
// Overload selected when func() does not yield a SemiFuture. The call is run
// inside makeTryWith and the resulting Try is wrapped in a SemiFuture whose
// value type is Unit::LiftT of the callable's result type.
162 typename std::enable_if<
163 !(isSemiFuture<typename std::result_of<F()>::type>::value),
164 SemiFuture<Unit::LiftT<typename std::result_of<F()>::type>>>::type
165 makeSemiFutureWith(F&& func) {
166 using LiftedResult = Unit::LiftT<typename std::result_of<F()>::type>;
167 return makeSemiFuture<LiftedResult>(
168 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
// Builds a SemiFuture<T> from a std::exception_ptr by wrapping it in a Try<T>.
172 SemiFuture<T> makeSemiFuture(std::exception_ptr const& e) {
173 return makeSemiFuture(Try<T>(e));
// Builds a SemiFuture<T> from an exception_wrapper (taken by value and moved
// into the Try<T>).
177 SemiFuture<T> makeSemiFuture(exception_wrapper ew) {
178 return makeSemiFuture(Try<T>(std::move(ew)));
// Builds a SemiFuture<T> from an exception object. Enabled only for types
// derived from std::exception; the object is converted with
// make_exception_wrapper<E>.
181 template <class T, class E>
183 enable_if<std::is_base_of<std::exception, E>::value, SemiFuture<T>>::type
184 makeSemiFuture(E const& e) {
185 return makeSemiFuture(Try<T>(make_exception_wrapper<E>(e)));
// Base overload that the other makeSemiFuture overloads funnel into:
// allocates a new Core<T> holding the moved Try and hands the raw pointer to
// the SemiFuture constructor.
189 SemiFuture<T> makeSemiFuture(Try<T>&& t) {
190 return SemiFuture<T>(new futures::detail::Core<T>(std::move(t)));
// Returns a SemiFuture built with the EmptyConstruct tag constructor.
194 SemiFuture<T> SemiFuture<T>::makeEmpty() {
195 return SemiFuture<T>(futures::detail::EmptyConstruct{});
// Move constructor: takes over other's core_ pointer and nulls it in `other`.
199 SemiFuture<T>::SemiFuture(SemiFuture<T>&& other) noexcept : core_(other.core_) {
200 other.core_ = nullptr;
// Move assignment implemented as a pointer swap: this object's previous core_
// ends up in `other`.
204 SemiFuture<T>& SemiFuture<T>::operator=(SemiFuture<T>&& other) noexcept {
205 std::swap(core_, other.core_);
// Constructs a SemiFuture from a Future<T> rvalue: takes over its core_
// pointer and nulls it in `other`.
210 SemiFuture<T>::SemiFuture(Future<T>&& other) noexcept : core_(other.core_) {
211 other.core_ = nullptr;
// Assignment from a Future<T> rvalue, implemented by swapping core_ pointers.
215 SemiFuture<T>& SemiFuture<T>::operator=(Future<T>&& other) noexcept {
216 std::swap(core_, other.core_);
// Value constructor: allocates a new Core<T> holding Try<T>(val), with val
// forwarded.
221 template <class T2, typename>
222 SemiFuture<T>::SemiFuture(T2&& val)
223 : core_(new futures::detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
// Constructor usable only when T2 is Unit (enforced through enable_if on the
// pointer parameter). Allocates a Core<T> holding a value-initialized T.
226 template <typename T2>
227 SemiFuture<T>::SemiFuture(
228 typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
229 : core_(new futures::detail::Core<T>(Try<T>(T()))) {}
// In-place constructor: available when T is constructible from Args&&...;
// the arguments are forwarded to a new Core<T> together with the in_place tag.
234 typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
236 SemiFuture<T>::SemiFuture(in_place_t, Args&&... args)
238 new futures::detail::Core<T>(in_place, std::forward<Args>(args)...)) {
// Destructor of SemiFuture<T>.
// NOTE(review): the body is not visible in this excerpt.
242 SemiFuture<T>::~SemiFuture() {
246 // This must be defined after the constructors to avoid a bug in MSVC
247 // https://connect.microsoft.com/VisualStudio/feedback/details/3142777/out-of-line-constructor-definition-after-implicit-reference-causes-incorrect-c2244
// Zero-argument overload: returns a SemiFuture<Unit> built from Unit{}.
248 inline SemiFuture<Unit> makeSemiFuture() {
249 return makeSemiFuture(Unit{});
// Lvalue overload: returns a reference to the value held by the core's Try
// (via Try::value()).
253 T& SemiFuture<T>::value() & {
256 return core_->getTry().value();
// Const-lvalue overload: returns a const reference to the value held by the
// core's Try.
260 T const& SemiFuture<T>::value() const& {
263 return core_->getTry().value();
// Rvalue overload: returns the value held by the core's Try as an rvalue
// reference.
267 T&& SemiFuture<T>::value() && {
270 return std::move(core_->getTry().value());
// Const-rvalue overload: returns the value held by the core's Try as a const
// rvalue reference.
274 T const&& SemiFuture<T>::value() const&& {
277 return std::move(core_->getTry().value());
// Rvalue overload of via(): records the executor and priority on this future
// and builds a Future<T> from the same core_ pointer.
// NOTE(review): the remaining statements (including the return) are not
// visible in this excerpt.
281 inline Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) && {
284 setExecutor(executor, priority);
286 auto newFuture = Future<T>(core_);
// Lvalue overload of via(): instead of giving up this future's core, it
// chains a continuation that forwards this future's Try into a separate
// promise `p`, and returns that promise's future moved onto the executor.
// NOTE(review): the declaration of `p` is not visible in this excerpt.
292 inline Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) & {
295 auto f = p.getFuture();
// The promise is moved into the continuation, so `f` must be obtained first.
296 auto func = [p = std::move(p)](Try<T>&& t) mutable {
297 p.setTry(std::move(t));
299 using R = futures::detail::callableResult<T, decltype(func)>;
300 thenImplementation<decltype(func), R>(std::move(func), typename R::Arg());
301 return std::move(f).via(executor, priority);
// Returns whether the core reports ready().
305 bool SemiFuture<T>::isReady() const {
307 return core_->ready();
// Returns whether the Try obtained from getTry() holds a value.
311 bool SemiFuture<T>::hasValue() {
312 return getTry().hasValue();
// Returns whether the Try obtained from getTry() holds an exception.
316 bool SemiFuture<T>::hasException() {
317 return getTry().hasException();
// Tells the core that this future handle is detaching from it
// (core_->detachFuture()).
// NOTE(review): any guard or reset of core_ around the call is not visible in
// this excerpt.
321 void SemiFuture<T>::detach() {
323 core_->detachFuture();
// Returns a reference to the Try stored in the core.
329 Try<T>& SemiFuture<T>::getTry() {
332 return core_->getTry();
// Validity check called by the thenImplementation overloads before they touch
// core_.
// NOTE(review): the body is not visible in this excerpt.
336 void SemiFuture<T>::throwIfInvalid() const {
// Non-blocking check for a result: when the core is ready, its Try is moved
// into the Optional `o`.
// NOTE(review): the declaration of `o` and the return are not visible in this
// excerpt.
343 Optional<Try<T>> SemiFuture<T>::poll() {
345 if (core_->ready()) {
346 o = std::move(core_->getTry());
// Forwards the given exception to the core's raise().
352 void SemiFuture<T>::raise(exception_wrapper exception) {
353 core_->raise(std::move(exception));
// Installs `func` as the core's callback (forwarded to Core::setCallback).
// Used throughout this file with callables taking Try<T>&&.
358 void SemiFuture<T>::setCallback_(F&& func) {
360 core_->setCallback(std::forward<F>(func));
// Tag constructor used by makeEmpty().
// NOTE(review): the member initializer is not visible in this excerpt.
364 SemiFuture<T>::SemiFuture(futures::detail::EmptyConstruct) noexcept
// Returns a Future built with the EmptyConstruct tag constructor.
368 Future<T> Future<T>::makeEmpty() {
369 return Future<T>(futures::detail::EmptyConstruct{});
// Move constructor: delegates to the SemiFuture<T> base with `other` moved.
373 Future<T>::Future(Future<T>&& other) noexcept
374 : SemiFuture<T>(std::move(other)) {}
// Move assignment: `other` is first moved into a temporary SemiFuture<T>,
// which is then assigned through the base class's move assignment.
377 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
378 SemiFuture<T>::operator=(SemiFuture<T>{std::move(other)});
// Converting constructor from Future<T2>, enabled when T2 (decayed) differs
// from T, T is constructible from T2&&, and T2&& is implicitly convertible to
// T. The conversion is performed by chaining a continuation that constructs T
// from the moved T2 value.
385 typename std::enable_if<
386 !std::is_same<T, typename std::decay<T2>::type>::value &&
387 std::is_constructible<T, T2&&>::value &&
388 std::is_convertible<T2&&, T>::value,
390 Future<T>::Future(Future<T2>&& other)
391 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
// Converting constructor from Future<T2> for the case where T is
// constructible from T2&& but T2&& is NOT implicitly convertible to T. The
// body is the same as the implicitly-convertible variant: a continuation that
// constructs T from the moved T2 value.
396 typename std::enable_if<
397 !std::is_same<T, typename std::decay<T2>::type>::value &&
398 std::is_constructible<T, T2&&>::value &&
399 !std::is_convertible<T2&&, T>::value,
401 Future<T>::Future(Future<T2>&& other)
402 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
// Converting assignment from Future<T2>, enabled when T2 (decayed) differs
// from T and T is constructible from T2&&. The source future is chained with
// a continuation that constructs T from the moved T2 value.
// NOTE(review): the statement consuming the chained future is only partly
// visible in this excerpt.
407 typename std::enable_if<
408 !std::is_same<T, typename std::decay<T2>::type>::value &&
409 std::is_constructible<T, T2&&>::value,
411 Future<T>& Future<T>::operator=(Future<T2>&& other) {
413 std::move(other).then([](T2&& v) { return T(std::move(v)); }));
416 // TODO: isSemiFuture
// Value constructor: forwards `val` to the SemiFuture<T> value constructor.
418 template <class T2, typename>
419 Future<T>::Future(T2&& val) : SemiFuture<T>(std::forward<T2>(val)) {}
// Constructor usable only when T2 is Unit (enforced through enable_if on the
// pointer parameter).
// NOTE(review): the initializer is not visible in this excerpt.
422 template <typename T2>
423 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
// In-place constructor: available when T is constructible from Args&&...;
// forwards the tag and arguments to the SemiFuture<T> in-place constructor.
429 typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
431 Future<T>::Future(in_place_t, Args&&... args)
432 : SemiFuture<T>(in_place, std::forward<Args>(args)...) {}
// Destructor of Future<T>.
// NOTE(review): the body is not visible in this excerpt.
435 Future<T>::~Future() {
// For a Future whose value type is itself a Future: chains a continuation
// that receives the inner future and simply returns it. The result type is
// Future<Inner>, i.e. one level of nesting is removed.
442 typename std::enable_if<isFuture<F>::value,
443 Future<typename isFuture<T>::Inner>>::type
444 Future<T>::unwrap() {
445 return then([](Future<typename isFuture<T>::Inner> internal_future) {
446 return internal_future;
452 // Variant: returns a value
453 // e.g. f.then([](Try<T>&& t){ return t.value(); });
// Implements then() for callbacks whose result is not a Future
// (R::ReturnsFuture::value is false). A new promise `p` and its future `f`
// are set up, and a continuation owning both the promise and the callback is
// built around them. The callback takes at most one argument, which is
// either the Try (isTry == true) or the unwrapped value.
// NOTE(review): the declaration of `p`, the call that installs the
// continuation, and the return are not visible in this excerpt.
455 template <typename F, typename R, bool isTry, typename... Args>
456 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
457 SemiFuture<T>::thenImplementation(
459 futures::detail::argResult<isTry, F, Args...>) {
460 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
461 typedef typename R::ReturnsFuture::Inner B;
463 this->throwIfInvalid();
// The new promise inherits this future's interrupt handler.
466 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
468 // grab the Future now before we lose our handle on the Promise
469 auto f = p.getFuture();
// The new future starts out with the same executor as this one.
470 f.core_->setExecutorNoLock(this->getExecutor());
472 /* This is a bit tricky.
474 We can't just close over *this in case this Future gets moved. So we
475 make a new dummy Future. We could figure out something more
476 sophisticated that avoids making a new Future object when it can, as an
477 optimization. But this is correct.
479 core_ can't be moved, it is explicitly disallowed (as is copying). But
480 if there's ever a reason to allow it, this is one place that makes that
481 assumption and would need to be fixed. We use a standard shared pointer
482 for core_ (by copying it in), which means in essence obj holds a shared
483 pointer to itself. But this shouldn't leak because Promise will not
484 outlive the continuation, because Promise will setException() with a
485 broken Promise if it is destructed before completed. We could use a
486 weak pointer but it would have to be converted to a shared pointer when
487 func is executed (because the Future returned by func may possibly
488 persist beyond the callback, if it gets moved), and so it is an
489 optimization to just make it shared from the get-go.
491 Two subtle but important points about this design. futures::detail::Core
492 has no back pointers to Future or Promise, so if Future or Promise get
493 moved (and they will be moved in performant code) we don't have to do
494 anything fancy. And because we store the continuation in the
495 futures::detail::Core, not in the Future, we can execute the continuation
496 even after the Future has gone out of scope. This is an intentional design
497 decision. It is likely we will want to be able to cancel a continuation
498 in some circumstances, but I think it should be explicit not implicit
499 in the destruction of the Future used to create it.
// Continuation: the promise and callback travel together in a
// CoreCallbackState captured by the lambda.
502 [state = futures::detail::makeCoreCallbackState(
503 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
// A callback that takes the plain value cannot receive an exception, so the
// exception is passed straight to the promise.
505 if (!isTry && t.hasException()) {
506 state.setException(std::move(t.exception()));
// The callback runs inside makeTryWith and the resulting Try fulfils the
// promise.
508 state.setTry(makeTryWith(
509 [&] { return state.invoke(t.template get<isTry, Args>()...); }));
515 // Variant: returns a Future
516 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
// Implements then() for callbacks that themselves return a Future
// (R::ReturnsFuture::value is true). The setup mirrors the value-returning
// variant; the difference is in the continuation, which waits for the
// callback's returned future and forwards its Try<B> to the promise.
// NOTE(review): the declaration of `p`, the call that installs the
// continuation, and the return are not visible in this excerpt.
518 template <typename F, typename R, bool isTry, typename... Args>
519 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
520 SemiFuture<T>::thenImplementation(
522 futures::detail::argResult<isTry, F, Args...>) {
523 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
524 typedef typename R::ReturnsFuture::Inner B;
525 this->throwIfInvalid();
// The new promise inherits this future's interrupt handler.
528 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
530 // grab the Future now before we lose our handle on the Promise
531 auto f = p.getFuture();
// The new future starts out with the same executor as this one.
532 f.core_->setExecutorNoLock(this->getExecutor());
535 [state = futures::detail::makeCoreCallbackState(
536 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
// Value-taking callback plus exceptional input: forward the exception.
537 if (!isTry && t.hasException()) {
538 state.setException(std::move(t.exception()));
// tryInvoke wraps the call in makeTryWith, so tf2 is a Try around the future
// returned by the callback.
540 auto tf2 = state.tryInvoke(t.template get<isTry, Args>()...);
541 if (tf2.hasException()) {
542 state.setException(std::move(tf2.exception()));
// Otherwise the promise is moved out of the state and fulfilled from the
// returned future's callback.
544 tf2->setCallback_([p = state.stealPromise()](Try<B> && b) mutable {
545 p.setTry(std::move(b));
// then() overload taking a pointer-to-member-function and an object pointer.
// Both are captured by value in a lambda that invokes (instance->*func)
// with either the Try or the unwrapped value, chosen by whether the member
// function's first argument type (cv/ref stripped) is a Try.
// `instance` is captured as a raw pointer.
// NOTE(review): the end of the FirstArg typedef is not visible in this
// excerpt.
554 template <typename T>
555 template <typename R, typename Caller, typename... Args>
556 Future<typename isFuture<R>::Inner>
557 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
558 typedef typename std::remove_cv<typename std::remove_reference<
559 typename futures::detail::ArgType<Args...>::FirstArg>::type>::type
562 return then([instance, func](Try<T>&& t){
563 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
// Argument-less then(): chains an empty nullary callback, producing a
// Future<Unit>.
568 Future<Unit> Future<T>::then() {
569 return then([] () {});
572 // onError where the callback returns T
// Selected when the callback is not callable with exception_wrapper (by value
// or by lvalue reference) and does not return a Future. If this future's
// result holds an exception of the callback's argument type (Exn), the
// callback is invoked with it and its result fulfils the new promise;
// otherwise the original Try is passed through unchanged.
// NOTE(review): the declaration of `p`, the installation of the continuation
// and the return are not visible in this excerpt.
575 typename std::enable_if<
576 !futures::detail::callableWith<F, exception_wrapper>::value &&
577 !futures::detail::callableWith<F, exception_wrapper&>::value &&
578 !futures::detail::Extract<F>::ReturnsFuture::value,
580 Future<T>::onError(F&& func) {
581 typedef std::remove_reference_t<
582 typename futures::detail::Extract<F>::FirstArg>
585 std::is_same<typename futures::detail::Extract<F>::RawReturn, T>::value,
586 "Return type of onError callback must be T or Future<T>");
// The new promise inherits this future's interrupt handler.
589 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
590 auto f = p.getFuture();
593 [state = futures::detail::makeCoreCallbackState(
594 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
// tryGetExceptionObject<Exn>() yields a pointer that is non-null only for a
// matching exception.
595 if (auto e = t.template tryGetExceptionObject<Exn>()) {
596 state.setTry(makeTryWith([&] { return state.invoke(*e); }));
598 state.setTry(std::move(t));
605 // onError where the callback returns Future<T>
// Selected when the callback is not callable with exception_wrapper and does
// return a Future. On a matching exception the callback is invoked through
// tryInvoke; if the invocation itself threw, that exception fulfils the
// promise, otherwise the promise is fulfilled from the returned future's
// callback. Non-matching results are passed through unchanged.
// NOTE(review): the declaration of `p`, the installation of the continuation
// and the return are not visible in this excerpt.
608 typename std::enable_if<
609 !futures::detail::callableWith<F, exception_wrapper>::value &&
610 !futures::detail::callableWith<F, exception_wrapper&>::value &&
611 futures::detail::Extract<F>::ReturnsFuture::value,
613 Future<T>::onError(F&& func) {
615 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
617 "Return type of onError callback must be T or Future<T>");
618 typedef std::remove_reference_t<
619 typename futures::detail::Extract<F>::FirstArg>
623 auto f = p.getFuture();
626 [state = futures::detail::makeCoreCallbackState(
627 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
628 if (auto e = t.template tryGetExceptionObject<Exn>()) {
629 auto tf2 = state.tryInvoke(*e);
630 if (tf2.hasException()) {
631 state.setException(std::move(tf2.exception()));
// The promise is moved out of the state so it can outlive this continuation.
633 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
634 p.setTry(std::move(t3));
638 state.setTry(std::move(t));
// Chains a continuation that owns a copy/move of `func` (as funcw) and
// returns the incoming Try re-wrapped with makeFuture, so the original result
// passes through.
// NOTE(review): the line that runs funcw is not visible in this excerpt.
647 Future<T> Future<T>::ensure(F&& func) {
648 return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
650 return makeFuture(std::move(t));
// Applies within(dur, tk) and attaches an onError handler for TimedOut that
// invokes `func` (held by the handler as funcw) and returns its result.
656 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
657 return within(dur, tk).onError([funcw = std::forward<F>(func)](
658 TimedOut const&) { return std::move(funcw)(); });
// onError overload for callbacks that accept an exception_wrapper and return
// a Future. Any exception held by this future's result is handed to the
// callback (moved); the promise is then fulfilled either with an exception
// thrown by the invocation itself or from the returned future's callback.
// Results without an exception are passed through unchanged.
// Note that this continuation takes its Try<T> by value, unlike the sibling
// overloads which take Try<T>&&.
// NOTE(review): the declaration of `p`, the installation of the continuation
// and the return are not visible in this excerpt.
663 typename std::enable_if<
664 futures::detail::callableWith<F, exception_wrapper>::value &&
665 futures::detail::Extract<F>::ReturnsFuture::value,
667 Future<T>::onError(F&& func) {
669 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
671 "Return type of onError callback must be T or Future<T>");
674 auto f = p.getFuture();
676 [state = futures::detail::makeCoreCallbackState(
677 std::move(p), std::forward<F>(func))](Try<T> t) mutable {
678 if (t.hasException()) {
679 auto tf2 = state.tryInvoke(std::move(t.exception()));
680 if (tf2.hasException()) {
681 state.setException(std::move(tf2.exception()));
683 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
684 p.setTry(std::move(t3));
688 state.setTry(std::move(t));
695 // onError(exception_wrapper) that returns T
// Selected when the callback accepts an exception_wrapper and does not return
// a Future. Any exception held by this future's result is handed to the
// callback (moved) inside makeTryWith, and the resulting Try fulfils the
// promise; results without an exception are passed through unchanged.
// NOTE(review): the static_assert compares Extract<F>::Return with Future<T>
// even though this is the value-returning overload — presumably Return is the
// Future-wrapped type; confirm against Extract's definition.
698 typename std::enable_if<
699 futures::detail::callableWith<F, exception_wrapper>::value &&
700 !futures::detail::Extract<F>::ReturnsFuture::value,
702 Future<T>::onError(F&& func) {
704 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
706 "Return type of onError callback must be T or Future<T>");
709 auto f = p.getFuture();
711 [state = futures::detail::makeCoreCallbackState(
712 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
713 if (t.hasException()) {
714 state.setTry(makeTryWith(
715 [&] { return state.invoke(std::move(t.exception())); }));
717 state.setTry(std::move(t));
// Calls waitVia(e) and then returns a reference to the resulting Try.
725 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
726 return waitVia(e).getTry();
// Convenience: starts a future on executor `x` with via(x) and chains `func`
// onto it. The result type is Future<Inner> of whatever func() returns.
729 template <class Func>
730 auto via(Executor* x, Func&& func)
731 -> Future<typename isFuture<decltype(std::declval<Func>()())>::Inner> {
732 // TODO make this actually more performant. :-P #7260175
733 return via(x).then(std::forward<Func>(func));
// Tag constructor used by Future<T>::makeEmpty(); delegates to the
// SemiFuture<T> EmptyConstruct constructor.
737 Future<T>::Future(futures::detail::EmptyConstruct) noexcept
738 : SemiFuture<T>(futures::detail::EmptyConstruct{}) {}
// Builds a Future from a value. The value type is std::decay<T>; the value is
// forwarded into a Try and passed to the Try-taking overload.
743 Future<typename std::decay<T>::type> makeFuture(T&& t) {
744 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
// Zero-argument overload: returns a Future<Unit> built from Unit{}.
747 inline Future<Unit> makeFuture() {
748 return makeFuture(Unit{});
751 // makeFutureWith(Future<T>()) -> Future<T>
// Overload selected when calling func() yields a Future. The callable's own
// Future is returned. If the call throws, the exception is converted into a
// Future<InnerType> holding an exception_wrapper instead of propagating.
753 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
754 typename std::result_of<F()>::type>::type
755 makeFutureWith(F&& func) {
757 typename isFuture<typename std::result_of<F()>::type>::Inner;
759 return std::forward<F>(func)();
// std::exception-derived errors: the wrapper receives both the exception_ptr
// and the caught object.
760 } catch (std::exception& e) {
761 return makeFuture<InnerType>(
762 exception_wrapper(std::current_exception(), e));
// Fallback path: the wrapper is built from the exception_ptr alone.
764 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
768 // makeFutureWith(T()) -> Future<T>
769 // makeFutureWith(void()) -> Future<Unit>
// Overload selected when func() does not yield a Future. The call is run
// inside makeTryWith and the resulting Try is wrapped in a Future whose value
// type is Unit::LiftT of the callable's result type.
771 typename std::enable_if<
772 !(isFuture<typename std::result_of<F()>::type>::value),
773 Future<Unit::LiftT<typename std::result_of<F()>::type>>>::type
774 makeFutureWith(F&& func) {
775 using LiftedResult = Unit::LiftT<typename std::result_of<F()>::type>;
776 return makeFuture<LiftedResult>(
777 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
// Builds a Future<T> from a std::exception_ptr by wrapping it in a Try<T>.
781 Future<T> makeFuture(std::exception_ptr const& e) {
782 return makeFuture(Try<T>(e));
// Builds a Future<T> from an exception_wrapper (taken by value and moved into
// the Try<T>).
786 Future<T> makeFuture(exception_wrapper ew) {
787 return makeFuture(Try<T>(std::move(ew)));
// Builds a Future<T> from an exception object. Enabled only for types derived
// from std::exception; the object is converted with
// make_exception_wrapper<E>.
790 template <class T, class E>
791 typename std::enable_if<std::is_base_of<std::exception, E>::value,
793 makeFuture(E const& e) {
794 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
// Base overload that the other makeFuture overloads funnel into: allocates a
// new Core<T> holding the moved Try and hands the raw pointer to the Future
// constructor.
798 Future<T> makeFuture(Try<T>&& t) {
799 return Future<T>(new futures::detail::Core<T>(std::move(t)));
// Returns a Future<Unit> (from makeFuture()) moved onto `executor` with the
// given priority.
803 Future<Unit> via(Executor* executor, int8_t priority) {
804 return makeFuture().via(executor, priority);
807 // mapSetCallback calls func(i, Try<T>) when every future completes
// Walks [first, last) and installs a callback on each future. Each callback
// holds its own copy of `func` plus the future's zero-based position `i`, and
// calls func(i, result) with the result moved.
809 template <class T, class InputIterator, class F>
810 void mapSetCallback(InputIterator first, InputIterator last, F func) {
811 for (size_t i = 0; first != last; ++first, ++i) {
812 first->setCallback_([func, i](Try<T>&& t) {
813 func(i, std::move(t));
818 // collectAll (variadic)
// Creates a shared CollectAllVariadicContext parameterised on the value types
// of the argument futures, hands it and the forwarded futures to
// collectVariadicHelper, and returns the future of the context's promise.
820 template <typename... Fs>
821 typename futures::detail::CollectAllVariadicContext<
822 typename std::decay<Fs>::type::value_type...>::type
823 collectAll(Fs&&... fs) {
824 auto ctx = std::make_shared<futures::detail::CollectAllVariadicContext<
825 typename std::decay<Fs>::type::value_type...>>();
826 futures::detail::collectVariadicHelper<
827 futures::detail::CollectAllVariadicContext>(ctx, std::forward<Fs>(fs)...);
828 return ctx->p.getFuture();
831 // collectAll (iterator)
// Collects the Try of every future in [first, last) into a vector, in input
// order. T is the value type of the futures in the range.
833 template <class InputIterator>
836 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
837 collectAll(InputIterator first, InputIterator last) {
839 typename std::iterator_traits<InputIterator>::value_type::value_type T;
// The context's destructor fulfils the promise. The context is held by
// shared_ptr copies inside the per-future callbacks, so the promise is set
// when the context is destroyed rather than by an explicit counter.
841 struct CollectAllContext {
842 CollectAllContext(size_t n) : results(n) {}
843 ~CollectAllContext() {
844 p.setValue(std::move(results));
846 Promise<std::vector<Try<T>>> p;
847 std::vector<Try<T>> results;
// results is pre-sized to the number of input futures.
851 std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
// Each callback writes only its own slot.
852 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
853 ctx->results[i] = std::move(t);
855 return ctx->p.getFuture();
858 // collect (iterator)
// Shared state for collect(first, last). Result is the type delivered to the
// caller and InternalResult the intermediate storage; both are chosen with
// std::conditional on whether T is void, with std::vector<T> and
// std::vector<Optional<T>> as the non-void choices.
// NOTE(review): several lines of this struct (the Nothing wrapper, the void
// alternatives, the destructor header, the promise member) are not visible
// in this excerpt.
863 template <typename T>
864 struct CollectContext {
866 explicit Nothing(int /* n */) {}
869 using Result = typename std::conditional<
870 std::is_void<T>::value,
872 std::vector<T>>::type;
874 using InternalResult = typename std::conditional<
875 std::is_void<T>::value,
877 std::vector<Optional<T>>>::type;
// Pre-sizes the intermediate storage to n entries.
879 explicit CollectContext(size_t n) : result(n) {}
// Runs only if `threw` was not already set (exchange returns the previous
// value): unwraps every Optional<T> into a plain vector and fulfils the
// promise with it.
881 if (!threw.exchange(true)) {
882 // map Optional<T> -> T
883 std::vector<T> finalResult;
884 finalResult.reserve(result.size());
885 std::transform(result.begin(), result.end(),
886 std::back_inserter(finalResult),
887 [](Optional<T>& o) { return std::move(o.value()); });
888 p.setValue(std::move(finalResult));
// Stores the value of future i in its slot; the value is moved out of `t`.
891 inline void setPartialResult(size_t i, Try<T>& t) {
892 result[i] = std::move(t.value());
895 InternalResult result;
// Set once an exception has been delivered (see collect()) or the final
// result has been published.
896 std::atomic<bool> threw {false};
899 } // namespace detail
900 } // namespace futures
// collect (iterator): like collectAll but yields plain values, and the first
// exception encountered fulfils the promise immediately.
902 template <class InputIterator>
903 Future<typename futures::detail::CollectContext<typename std::iterator_traits<
904 InputIterator>::value_type::value_type>::Result>
905 collect(InputIterator first, InputIterator last) {
907 typename std::iterator_traits<InputIterator>::value_type::value_type T;
909 auto ctx = std::make_shared<futures::detail::CollectContext<T>>(
910 std::distance(first, last));
911 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
912 if (t.hasException()) {
// Only the first exception wins; exchange() returns the previous flag value.
913 if (!ctx->threw.exchange(true)) {
914 ctx->p.setException(std::move(t.exception()));
// Values are recorded only while no exception has been seen.
916 } else if (!ctx->threw) {
917 ctx->setPartialResult(i, t);
920 return ctx->p.getFuture();
923 // collect (variadic)
// Creates a shared CollectVariadicContext parameterised on the value types of
// the argument futures, hands it and the forwarded futures to
// collectVariadicHelper, and returns the future of the context's promise.
925 template <typename... Fs>
926 typename futures::detail::CollectVariadicContext<
927 typename std::decay<Fs>::type::value_type...>::type
928 collect(Fs&&... fs) {
929 auto ctx = std::make_shared<futures::detail::CollectVariadicContext<
930 typename std::decay<Fs>::type::value_type...>>();
931 futures::detail::collectVariadicHelper<
932 futures::detail::CollectVariadicContext>(ctx, std::forward<Fs>(fs)...);
933 return ctx->p.getFuture();
936 // collectAny (iterator)
// Yields the (index, Try) pair of the first future in [first, last) whose
// callback runs; the Try may hold a value or an exception.
938 template <class InputIterator>
943 std::iterator_traits<InputIterator>::value_type::value_type>>>
944 collectAny(InputIterator first, InputIterator last) {
946 typename std::iterator_traits<InputIterator>::value_type::value_type T;
948 struct CollectAnyContext {
949 CollectAnyContext() {}
950 Promise<std::pair<size_t, Try<T>>> p;
// Flipped by the first callback to run; later callbacks do nothing.
951 std::atomic<bool> done {false};
954 auto ctx = std::make_shared<CollectAnyContext>();
955 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
956 if (!ctx->done.exchange(true)) {
957 ctx->p.setValue(std::make_pair(i, std::move(t)));
960 return ctx->p.getFuture();
963 // collectAnyWithoutException (iterator)
// Yields the (index, value) pair of the first future that completes with a
// value. If the failure counter reaches the total number of futures, the
// promise is fulfilled with the exception of the callback that reached it.
// NOTE(review): the nTotal member declaration is not visible in this excerpt.
965 template <class InputIterator>
968 typename std::iterator_traits<InputIterator>::value_type::value_type>>
969 collectAnyWithoutException(InputIterator first, InputIterator last) {
971 typename std::iterator_traits<InputIterator>::value_type::value_type T;
973 struct CollectAnyWithoutExceptionContext {
974 CollectAnyWithoutExceptionContext(){}
975 Promise<std::pair<size_t, T>> p;
// Set by the first successful callback.
976 std::atomic<bool> done{false};
// Counts callbacks that did not publish a value (see the else-branch below).
977 std::atomic<size_t> nFulfilled{0};
981 auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
982 ctx->nTotal = size_t(std::distance(first, last));
984 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
// The exchange is evaluated only for successful results, so a failure never
// flips `done`.
985 if (!t.hasException() && !ctx->done.exchange(true)) {
986 ctx->p.setValue(std::make_pair(i, std::move(t.value())));
// The winning success does not increment nFulfilled, so the counter can
// reach nTotal only when no callback published a value.
987 } else if (++ctx->nFulfilled == ctx->nTotal) {
988 ctx->p.setException(t.exception());
991 return ctx->p.getFuture();
994 // collectN (iterator)
// Yields a vector of (index, Try) pairs for completed futures once enough of
// them have finished; `n` is the required count. If the range holds fewer
// than n futures the promise is failed with
// std::runtime_error("Not enough futures").
// NOTE(review): the context's `v` and `p` members and the conditions around
// emplace_back / setTry are not visible in this excerpt.
// NOTE(review): ctx->v.emplace_back is reached from per-future callbacks and
// no lock is visible here; only `completed` is atomic. Confirm that callbacks
// cannot run concurrently, or that the vector is otherwise protected.
996 template <class InputIterator>
997 Future<std::vector<std::pair<size_t, Try<typename
998 std::iterator_traits<InputIterator>::value_type::value_type>>>>
999 collectN(InputIterator first, InputIterator last, size_t n) {
1001 std::iterator_traits<InputIterator>::value_type::value_type T;
1002 typedef std::vector<std::pair<size_t, Try<T>>> V;
1004 struct CollectNContext {
// Number of callbacks that have run so far.
1006 std::atomic<size_t> completed = {0};
1009 auto ctx = std::make_shared<CollectNContext>();
1011 if (size_t(std::distance(first, last)) < n) {
1012 ctx->p.setException(std::runtime_error("Not enough futures"));
1014 // for each completed Future, increase count and add to vector, until we
1015 // have n completed futures at which point we fulfil our Promise with the
1017 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
// `c` is this callback's 1-based completion rank.
1018 auto c = ++ctx->completed;
1020 assert(ctx->v.size() < n);
1021 ctx->v.emplace_back(i, std::move(t));
1023 ctx->p.setTry(Try<V>(std::move(ctx->v)));
1029 return ctx->p.getFuture();
1032 // reduce (iterator)
// Folds the results of the futures in [first, last) into a single value,
// starting from `initial` and applying `func` in input order. An empty range
// yields a future of `initial`.
// The callback may take its second argument either as ItT&& or as
// Try<ItT>&&; Arg / IsTry record which one, based on callableWith.
// NOTE(review): part of the Arg typedef, the call inside the first
// continuation, and the final return are not visible in this excerpt.
1034 template <class It, class T, class F>
1035 Future<T> reduce(It first, It last, T&& initial, F&& func) {
1036 if (first == last) {
1037 return makeFuture(std::move(initial));
1040 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
1041 typedef typename std::conditional<
1042 futures::detail::callableWith<F, T&&, Try<ItT>&&>::value,
1045 typedef isTry<Arg> IsTry;
// The callback is shared between all continuations through a shared_ptr.
1047 auto sfunc = std::make_shared<F>(std::move(func));
// First step: combine `initial` (moved into the closure) with the first
// future's result.
1049 auto f = first->then(
1050 [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
1052 std::move(minitial), head.template get<IsTry::value, Arg&&>());
// Remaining steps: pair the running result with the next future via
// collectAll and apply the callback to both.
1055 for (++first; first != last; ++first) {
1056 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
1057 return (*sfunc)(std::move(std::get<0>(t).value()),
1058 // Either return a ItT&& or a Try<ItT>&& depending
1059 // on the type of the argument of func.
1060 std::get<1>(t).template get<IsTry::value, Arg&&>());
1067 // window (collection)
// Overload without an explicit executor: forwards to the executor-taking
// overload using the InlineExecutor singleton.
1069 template <class Collection, class F, class ItT, class Result>
1070 std::vector<Future<Result>>
1071 window(Collection input, F func, size_t n) {
1072 // Use global inline executor singleton
1073 auto executor = &InlineExecutor::instance();
1074 return window(executor, std::move(input), std::move(func), n);
// Applies `func` to every element of `input`, starting at most `n` chains at
// once, and returns one future per input element (in input order).
// NOTE(review): some WindowContext member declarations are not visible in
// this excerpt.
1077 template <class Collection, class F, class ItT, class Result>
1078 std::vector<Future<Result>>
1079 window(Executor* executor, Collection input, F func, size_t n) {
// Shared state: the executor, the input collection, one promise per input
// element, the callback, and the atomic index `i` of the next element to
// start.
1080 struct WindowContext {
1081 WindowContext(Executor* executor_, Collection&& input_, F&& func_)
1082 : executor(executor_),
1083 input(std::move(input_)),
1084 promises(input.size()),
1085 func(std::move(func_)) {}
1086 std::atomic<size_t> i{0};
1089 std::vector<Promise<Result>> promises;
// Claims the next unprocessed index. If one is left, runs func on that
// element and, when its future completes, hops to the executor to fulfil the
// matching promise and start the next element.
1092 static inline void spawn(std::shared_ptr<WindowContext> ctx) {
1093 size_t i = ctx->i++;
1094 if (i < ctx->input.size()) {
1095 auto fut = ctx->func(std::move(ctx->input[i]));
1096 fut.setCallback_([ctx = std::move(ctx), i](Try<Result>&& t) mutable {
// Read the executor before ctx is moved into the task below.
1097 const auto executor_ = ctx->executor;
1098 executor_->add([ctx = std::move(ctx), i, t = std::move(t)]() mutable {
1099 ctx->promises[i].setTry(std::move(t));
1100 // Chain another future onto this one
1101 spawn(std::move(ctx));
// Never start more chains than there are inputs. Computed before `input` is
// moved into the context.
1108 auto max = std::min(n, input.size());
1110 auto ctx = std::make_shared<WindowContext>(
1111 executor, std::move(input), std::move(func));
1113 // Start the first n Futures
1114 for (size_t i = 0; i < max; ++i) {
1115 executor->add([ctx]() mutable { WindowContext::spawn(std::move(ctx)); });
// Hand out one future per promise.
1118 std::vector<Future<Result>> futures;
1119 futures.reserve(ctx->promises.size());
1120 for (auto& promise : ctx->promises) {
1121 futures.emplace_back(promise.getFuture());
// Member reduce for a Future whose value is iterable: the continuation folds
// every element of the value into an accumulator that starts as `initial`,
// applying `func` to (accumulator, element) with both moved.
// NOTE(review): the then() call header and the return of `ret` are not
// visible in this excerpt.
1130 template <class I, class F>
1131 Future<I> Future<T>::reduce(I&& initial, F&& func) {
1133 minitial = std::forward<I>(initial),
1134 mfunc = std::forward<F>(func)
1135 ](T& vals) mutable {
1136 auto ret = std::move(minitial);
1137 for (auto& val : vals) {
1138 ret = mfunc(std::move(ret), std::move(val));
1144 // unorderedReduce (iterator)
// Like reduce, but the futures' results are folded in completion order rather
// than input order. An empty range yields a future of `initial`.
// NOTE(review): the memo_ and func_ member declarations and the assignment
// target of the memo_.then(...) call are not visible in this excerpt.
1146 template <class It, class T, class F, class ItT, class Arg>
1147 Future<T> unorderedReduce(It first, It last, T initial, F func) {
1148 if (first == last) {
1149 return makeFuture(std::move(initial));
1152 typedef isTry<Arg> IsTry;
// Shared state: memo_ starts as a future of the initial value, func_ is the
// reducer, numFutures_ the total number of inputs, and promise_ the promise
// behind the returned future.
1154 struct UnorderedReduceContext {
1155 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
1156 : lock_(), memo_(makeFuture<T>(std::move(memo))),
1157 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
1159 folly::MicroSpinLock lock_; // protects memo_ and numThens_
1162 size_t numThens_; // how many Futures completed and called .then()
1163 size_t numFutures_; // how many Futures in total
1164 Promise<T> promise_;
1167 auto ctx = std::make_shared<UnorderedReduceContext>(
1168 std::move(initial), std::move(func), std::distance(first, last));
1170 mapSetCallback<ItT>(
1173 [ctx](size_t /* i */, Try<ItT>&& t) {
1174 // Futures can be completed in any order, simultaneously.
1175 // To make this non-blocking, we create a new Future chain in
1176 // the order of completion to reduce the values.
1177 // The spinlock just protects chaining a new Future, not actually
1178 // executing the reduce, which should be really fast.
1179 folly::MSLGuard lock(ctx->lock_);
// The completed result is moved into the continuation (as mt) and combined
// with the running value `v` when memo_ completes.
1181 ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
1182 // Either return a ItT&& or a Try<ItT>&& depending
1183 // on the type of the argument of func.
1184 return ctx->func_(std::move(v),
1185 mt.template get<IsTry::value, Arg&&>());
1187 if (++ctx->numThens_ == ctx->numFutures_) {
1188 // After reducing the value of the last Future, fulfill the Promise
1189 ctx->memo_.setCallback_(
1190 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
1194 return ctx->promise_.getFuture();
// Overload that uses TimedOut() as the timeout exception.
1200 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
1201 return within(dur, TimedOut(), tk);
// Returns a future that completes with this future's result if it arrives
// before the timekeeper's after(dur) future fires, and with an exception
// otherwise. The atomic `token` decides which of the two sides gets to fulfil
// the promise.
// NOTE(review): the Context struct header, its exception/promise members, and
// the condition guarding the singleton lookup are not visible in this
// excerpt.
1206 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
1209 Context(E ex) : exception(std::move(ex)), promise() {}
1211 Future<Unit> thisFuture;
1213 std::atomic<bool> token {false};
// Nothing to time out if the result is already available.
1216 if (this->isReady()) {
1217 return std::move(*this);
// Keeps a timekeeper obtained from the singleton alive for this call.
1220 std::shared_ptr<Timekeeper> tks;
1222 tks = folly::detail::getTimekeeperSingleton();
// Without any timekeeper the result is a future holding NoTimekeeper.
1226 if (UNLIKELY(!tk)) {
1227 return makeFuture<T>(NoTimekeeper());
1230 auto ctx = std::make_shared<Context>(std::move(e));
// Side 1: this future completes. It fulfils the promise only if it wins the
// token.
1232 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
1233 if (ctx->token.exchange(true) == false) {
1234 ctx->promise.setTry(std::move(t));
1238 // Have time keeper use a weak ptr to hold ctx,
1239 // so that ctx can be deallocated as soon as the future job finished.
1240 tk->after(dur).then([weakCtx = to_weak_ptr(ctx)](Try<Unit> const& t) mutable {
1241 auto lockedCtx = weakCtx.lock();
1243 // ctx already released. "this" completed first, cancel "after"
1246 // "after" completed first, cancel "this"
1247 lockedCtx->thisFuture.raise(TimedOut());
// Side 2: the timer fired. If it wins the token, the promise gets either the
// timer's own exception or the caller-supplied one.
1248 if (lockedCtx->token.exchange(true) == false) {
1249 if (t.hasException()) {
1250 lockedCtx->promise.setException(std::move(t.exception()));
1252 lockedCtx->promise.setException(std::move(lockedCtx->exception));
// The returned future is placed on this future's executor.
1257 return ctx->promise.getFuture().via(this->getExecutor());
// Combines this future with futures::sleep(dur, tk) through collectAll and,
// in the continuation, returns a future built from the first element of the
// resulting tuple (this future's Try<T>). The sleep's Try<Unit> is not read
// in the visible lines.
1263 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
1264 return collectAll(*this, futures::sleep(dur, tk))
1265 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
1266 Try<T>& t = std::get<0>(tup);
1267 return makeFuture<T>(std::move(t));
// Helper behind the no-timeout wait() overloads.
// Installs a callback on `f` that posts a local baton, and asserts that `f`
// is ready before leaving.
// NOTE(review): the short-circuit test and the wait on the baton are not
// visible in this listing; the callback captures locals by reference ([&]),
// so the baton must outlive the callback -- confirm against the full file.
1274 template <class FutureType, typename T = typename FutureType::value_type>
1275 void waitImpl(FutureType& f) {
1276 // short-circuit if there's nothing to do
1281 FutureBatonType baton;
1282 f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1284 assert(f.isReady());
// Helper behind the wait(Duration) overloads.
// Obtains a future (`ret`) from a promise, then installs a callback on `f`
// that takes ownership of that promise and fulfils it with f's result. The
// baton is held through a shared_ptr that the callback also captures by
// value, so it does not depend on this stack frame.
// NOTE(review): the promise declaration, any baton post inside the callback
// and the use made of `ret` are not visible in this listing -- confirm.
1287 template <class FutureType, typename T = typename FutureType::value_type>
1288 void waitImpl(FutureType& f, Duration dur) {
1289 // short-circuit if there's nothing to do
1295 auto ret = promise.getFuture();
1296 auto baton = std::make_shared<FutureBatonType>();
1297 f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1298 promise.setTry(std::move(t));
// When timed_wait(dur) returns true, f is asserted to be ready.
1302 if (baton->timed_wait(dur)) {
1303 assert(f.isReady());
// Helper behind waitVia(): reassigns `f` to a continuation scheduled via the
// executor `e` that passes the value through unchanged, then loops until `f`
// reports ready and asserts readiness afterwards.
// NOTE(review): the loop body is not visible in this listing -- presumably
// it drives `e`; confirm against the full file.
1308 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1309 // Set a callback so as to ensure that the via executor has something on it
1310 // so that once the preceding future triggers this callback, drive will
1311 // always have a callback to satisfy it
1315 f = f.via(e).then([](T&& t) { return std::move(t); });
1316 while (!f.isReady()) {
1319 assert(f.isReady());
1322 } // namespace detail
1323 } // namespace futures
// Lvalue-qualified wait(): delegates to futures::detail::waitImpl on *this.
// Declared to return SemiFuture<T>&; the return statement itself is not
// visible in this listing.
1326 SemiFuture<T>& SemiFuture<T>::wait() & {
1327 futures::detail::waitImpl(*this);
// Rvalue-qualified wait(): delegates to futures::detail::waitImpl on *this,
// then returns *this as an rvalue reference.
1332 SemiFuture<T>&& SemiFuture<T>::wait() && {
1333 futures::detail::waitImpl(*this);
1334 return std::move(*this);
// Lvalue-qualified wait(Duration): delegates to the two-argument
// futures::detail::waitImpl with `dur`. Declared to return SemiFuture<T>&;
// the return statement itself is not visible in this listing.
1338 SemiFuture<T>& SemiFuture<T>::wait(Duration dur) & {
1339 futures::detail::waitImpl(*this, dur);
// Rvalue-qualified wait(Duration): delegates to the two-argument
// futures::detail::waitImpl with `dur`, then returns *this as an rvalue
// reference.
1344 SemiFuture<T>&& SemiFuture<T>::wait(Duration dur) && {
1345 futures::detail::waitImpl(*this, dur);
1346 return std::move(*this);
// Calls wait() and returns the value of the resulting future, moved out.
1350 T SemiFuture<T>::get() {
1351 return std::move(wait().value());
// Duration-taking get(): when this future is ready, returns its value, moved
// out.
// NOTE(review): how `dur` is used and what happens when the future is not
// ready are not visible in this listing -- confirm against the full file.
1355 T SemiFuture<T>::get(Duration dur) {
1357 if (this->isReady()) {
1358 return std::move(this->value());
// Lvalue-qualified wait(): delegates to futures::detail::waitImpl on *this.
// Declared to return Future<T>&; the return statement itself is not visible
// in this listing.
1365 Future<T>& Future<T>::wait() & {
1366 futures::detail::waitImpl(*this);
// Rvalue-qualified wait(): delegates to futures::detail::waitImpl on *this,
// then returns *this as an rvalue reference.
1371 Future<T>&& Future<T>::wait() && {
1372 futures::detail::waitImpl(*this);
1373 return std::move(*this);
// Lvalue-qualified wait(Duration): delegates to the two-argument
// futures::detail::waitImpl with `dur`. Declared to return Future<T>&; the
// return statement itself is not visible in this listing.
1377 Future<T>& Future<T>::wait(Duration dur) & {
1378 futures::detail::waitImpl(*this, dur);
// Rvalue-qualified wait(Duration): delegates to the two-argument
// futures::detail::waitImpl with `dur`, then returns *this as an rvalue
// reference.
1383 Future<T>&& Future<T>::wait(Duration dur) && {
1384 futures::detail::waitImpl(*this, dur);
1385 return std::move(*this);
// Lvalue-qualified waitVia(): delegates to futures::detail::waitViaImpl with
// *this and the executor `e`. Declared to return Future<T>&; the return
// statement itself is not visible in this listing.
1389 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1390 futures::detail::waitViaImpl(*this, e);
// Rvalue-qualified waitVia(): delegates to futures::detail::waitViaImpl with
// *this and the executor `e`, then returns *this as an rvalue reference.
1395 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1396 futures::detail::waitViaImpl(*this, e);
1397 return std::move(*this);
// Calls waitVia(e) and returns the value of the resulting future, moved out.
1401 T Future<T>::getVia(DrivableExecutor* e) {
1402 return std::move(waitVia(e).value());
// Compares the values held by two Try<T> objects with operator==.
// Both arguments are read through value(); the visible call site in
// willEqual only reaches this after checking hasValue() on both.
1409 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1410 return t1.value() == t2.value();
1413 } // namespace detail
1414 } // namespace futures
// Yields a Future<bool> derived from the results of this future and `f`.
// Both are gathered with collectAll; when both Try objects hold a value the
// continuation returns futures::detail::TryEquals<T>::equals on them.
// NOTE(review): the result when either side lacks a value is not visible in
// this listing -- confirm against the full file.
1417 Future<bool> Future<T>::willEqual(Future<T>& f) {
1418 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1419 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1420 return futures::detail::TryEquals<T>::equals(
1421 std::get<0>(t), std::get<1>(t));
// Chains a continuation that tests the value against `predicate`.
// The predicate is forwarded into the lambda's capture as `p`; the value is
// taken by copy and shown to `p` through a const reference. When `p` returns
// false, throwPredicateDoesNotObtain() is called.
// NOTE(review): the lambda's return on success is not visible in this
// listing -- presumably the value is passed on; confirm.
1430 Future<T> Future<T>::filter(F&& predicate) {
1431 return this->then([p = std::forward<F>(predicate)](T val) {
1432 T const& valConstRef = val;
1433 if (!p(valConstRef)) {
1434 throwPredicateDoesNotObtain();
// Conditionally runs `thunk`: when `p` is true, invokes the forwarded thunk
// and returns its result converted with unit(); otherwise returns
// makeFuture() without invoking the thunk.
1441 inline Future<Unit> when(bool p, F&& thunk) {
1442 return p ? std::forward<F>(thunk)().unit() : makeFuture();
// Asynchronous loop built by recursion: invokes `thunk`, and in a
// continuation on the resulting future calls whileDo again with the
// predicate and thunk that were forwarded into the lambda's capture.
// The final visible statement returns makeFuture().
// NOTE(review): the test of `predicate` that selects between the two return
// paths is not visible in this listing -- confirm against the full file.
1445 template <class P, class F>
1446 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1448 auto future = thunk();
1449 return future.then([
1450 predicate = std::forward<P>(predicate),
1451 thunk = std::forward<F>(thunk)
1453 return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1456 return makeFuture();
// Expresses "repeat `thunk` n times" in terms of folly::whileDo.
// The predicate lambda owns a heap-allocated std::atomic<int> counter that
// starts at 0; each call does fetch_add(1) and returns true while the
// pre-increment value is below `n`. `thunk` is forwarded unchanged.
1460 Future<Unit> times(const int n, F&& thunk) {
1461 return folly::whileDo(
1462 [ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
1463 return count->fetch_add(1) < n;
1465 std::forward<F>(thunk));
// Applies `func` as a continuation to every future in [first, last).
// Each element's then(func) result is appended to `results` in iteration
// order.
// NOTE(review): the return statement is not visible in this listing; per the
// signature the function yields a std::vector<Future<Result>>.
1469 template <class It, class F, class ItT, class Result>
1470 std::vector<Future<Result>> map(It first, It last, F func) {
1471 std::vector<Future<Result>> results;
1472 for (auto it = first; it != last; it++) {
1473 results.push_back(it->then(func));
// These are explicit instantiation declarations (`extern template`): they
// tell the compiler not to implicitly instantiate these specializations in
// translation units that include this file. The matching explicit
// instantiation definitions are expected elsewhere (not visible here).
1479 // Instantiate the most common Future types to save compile time
1480 extern template class Future<Unit>;
1481 extern template class Future<bool>;
1482 extern template class Future<int>;
1483 extern template class Future<int64_t>;
1484 extern template class Future<std::string>;
1485 extern template class Future<double>;
1486 } // namespace folly