2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/futures/InlineExecutor.h>
27 #include <folly/futures/Timekeeper.h>
28 #include <folly/futures/detail/Core.h>
30 #ifndef FOLLY_FUTURE_USING_FIBER
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/fibers/Baton.h>
45 #if FOLLY_FUTURE_USING_FIBER
46 typedef folly::fibers::Baton FutureBatonType;
48 typedef folly::Baton<> FutureBatonType;
51 } // namespace futures
54 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
59 // Guarantees that the stored functor is destructed before the stored promise
60 // may be fulfilled. Assumes the stored functor to be noexcept-destructible.
61 template <typename T, typename F>
62 class CoreCallbackState {
64 template <typename FF>
65 CoreCallbackState(Promise<T>&& promise, FF&& func) noexcept(
66 noexcept(F(std::declval<FF>())))
67 : func_(std::forward<FF>(func)), promise_(std::move(promise)) {
68 assert(before_barrier());
71 CoreCallbackState(CoreCallbackState&& that) noexcept(
72 noexcept(F(std::declval<F>()))) {
73 if (that.before_barrier()) {
74 new (&func_) F(std::move(that.func_));
75 promise_ = that.stealPromise();
79 CoreCallbackState& operator=(CoreCallbackState&&) = delete;
81 ~CoreCallbackState() {
82 if (before_barrier()) {
87 template <typename... Args>
88 auto invoke(Args&&... args) noexcept(
89 noexcept(std::declval<F&&>()(std::declval<Args&&>()...))) {
90 assert(before_barrier());
91 return std::move(func_)(std::forward<Args>(args)...);
94 template <typename... Args>
95 auto tryInvoke(Args&&... args) noexcept {
96 return makeTryWith([&] { return invoke(std::forward<Args>(args)...); });
99 void setTry(Try<T>&& t) {
100 stealPromise().setTry(std::move(t));
103 void setException(exception_wrapper&& ew) {
104 stealPromise().setException(std::move(ew));
107 Promise<T> stealPromise() noexcept {
108 assert(before_barrier());
110 return std::move(promise_);
114 bool before_barrier() const noexcept {
115 return !promise_.isFulfilled();
121 Promise<T> promise_{Promise<T>::makeEmpty()};
124 template <typename T, typename F>
125 inline auto makeCoreCallbackState(Promise<T>&& p, F&& f) noexcept(
126 noexcept(CoreCallbackState<T, _t<std::decay<F>>>(
127 std::declval<Promise<T>&&>(),
128 std::declval<F&&>()))) {
129 return CoreCallbackState<T, _t<std::decay<F>>>(
130 std::move(p), std::forward<F>(f));
132 } // namespace detail
133 } // namespace futures
136 SemiFuture<typename std::decay<T>::type> makeSemiFuture(T&& t) {
137 return makeSemiFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
140 // makeSemiFutureWith(SemiFuture<T>()) -> SemiFuture<T>
142 typename std::enable_if<
143 isSemiFuture<typename std::result_of<F()>::type>::value,
144 typename std::result_of<F()>::type>::type
145 makeSemiFutureWith(F&& func) {
147 typename isSemiFuture<typename std::result_of<F()>::type>::Inner;
149 return std::forward<F>(func)();
150 } catch (std::exception& e) {
151 return makeSemiFuture<InnerType>(
152 exception_wrapper(std::current_exception(), e));
154 return makeSemiFuture<InnerType>(
155 exception_wrapper(std::current_exception()));
159 // makeSemiFutureWith(T()) -> SemiFuture<T>
160 // makeSemiFutureWith(void()) -> SemiFuture<Unit>
162 typename std::enable_if<
163 !(isSemiFuture<typename std::result_of<F()>::type>::value),
164 SemiFuture<Unit::LiftT<typename std::result_of<F()>::type>>>::type
165 makeSemiFutureWith(F&& func) {
166 using LiftedResult = Unit::LiftT<typename std::result_of<F()>::type>;
167 return makeSemiFuture<LiftedResult>(
168 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
172 SemiFuture<T> makeSemiFuture(std::exception_ptr const& e) {
173 return makeSemiFuture(Try<T>(e));
177 SemiFuture<T> makeSemiFuture(exception_wrapper ew) {
178 return makeSemiFuture(Try<T>(std::move(ew)));
181 template <class T, class E>
183 enable_if<std::is_base_of<std::exception, E>::value, SemiFuture<T>>::type
184 makeSemiFuture(E const& e) {
185 return makeSemiFuture(Try<T>(make_exception_wrapper<E>(e)));
189 SemiFuture<T> makeSemiFuture(Try<T>&& t) {
190 return SemiFuture<T>(new futures::detail::Core<T>(std::move(t)));
194 SemiFuture<T> SemiFuture<T>::makeEmpty() {
195 return SemiFuture<T>(futures::detail::EmptyConstruct{});
199 SemiFuture<T>::SemiFuture(SemiFuture<T>&& other) noexcept : core_(other.core_) {
200 other.core_ = nullptr;
204 SemiFuture<T>& SemiFuture<T>::operator=(SemiFuture<T>&& other) noexcept {
205 std::swap(core_, other.core_);
210 SemiFuture<T>::SemiFuture(Future<T>&& other) noexcept : core_(other.core_) {
211 other.core_ = nullptr;
215 SemiFuture<T>& SemiFuture<T>::operator=(Future<T>&& other) noexcept {
216 std::swap(core_, other.core_);
221 template <class T2, typename>
222 SemiFuture<T>::SemiFuture(T2&& val)
223 : core_(new futures::detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
226 template <typename T2>
227 SemiFuture<T>::SemiFuture(
228 typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
229 : core_(new futures::detail::Core<T>(Try<T>(T()))) {}
234 typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
236 SemiFuture<T>::SemiFuture(in_place_t, Args&&... args)
238 new futures::detail::Core<T>(in_place, std::forward<Args>(args)...)) {
242 SemiFuture<T>::~SemiFuture() {
246 // This must be defined after the constructors to avoid a bug in MSVC
247 // https://connect.microsoft.com/VisualStudio/feedback/details/3142777/out-of-line-constructor-definition-after-implicit-reference-causes-incorrect-c2244
248 inline SemiFuture<Unit> makeSemiFuture() {
249 return makeSemiFuture(Unit{});
253 T& SemiFuture<T>::value() & {
256 return core_->getTry().value();
260 T const& SemiFuture<T>::value() const& {
263 return core_->getTry().value();
267 T&& SemiFuture<T>::value() && {
270 return std::move(core_->getTry().value());
274 T const&& SemiFuture<T>::value() const&& {
277 return std::move(core_->getTry().value());
281 inline Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) && {
284 setExecutor(executor, priority);
286 auto newFuture = Future<T>(core_);
292 inline Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) & {
295 auto f = p.getFuture();
296 auto func = [p = std::move(p)](Try<T>&& t) mutable {
297 p.setTry(std::move(t));
299 using R = futures::detail::callableResult<T, decltype(func)>;
300 thenImplementation<decltype(func), R>(std::move(func), typename R::Arg());
301 return std::move(f).via(executor, priority);
305 bool SemiFuture<T>::isReady() const {
307 return core_->ready();
311 bool SemiFuture<T>::hasValue() {
312 return getTry().hasValue();
316 bool SemiFuture<T>::hasException() {
317 return getTry().hasException();
321 void SemiFuture<T>::detach() {
323 core_->detachFuture();
329 Try<T>& SemiFuture<T>::getTry() {
332 return core_->getTry();
336 void SemiFuture<T>::throwIfInvalid() const {
342 Optional<Try<T>> SemiFuture<T>::poll() {
344 if (core_->ready()) {
345 o = std::move(core_->getTry());
351 void SemiFuture<T>::raise(exception_wrapper exception) {
352 core_->raise(std::move(exception));
357 void SemiFuture<T>::setCallback_(F&& func) {
359 core_->setCallback(std::forward<F>(func));
363 SemiFuture<T>::SemiFuture(futures::detail::EmptyConstruct) noexcept
367 Future<T> Future<T>::makeEmpty() {
368 return Future<T>(futures::detail::EmptyConstruct{});
372 Future<T>::Future(Future<T>&& other) noexcept
373 : SemiFuture<T>(std::move(other)) {}
376 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
377 SemiFuture<T>::operator=(SemiFuture<T>{std::move(other)});
384 typename std::enable_if<
385 !std::is_same<T, typename std::decay<T2>::type>::value &&
386 std::is_constructible<T, T2&&>::value &&
387 std::is_convertible<T2&&, T>::value,
389 Future<T>::Future(Future<T2>&& other)
390 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
395 typename std::enable_if<
396 !std::is_same<T, typename std::decay<T2>::type>::value &&
397 std::is_constructible<T, T2&&>::value &&
398 !std::is_convertible<T2&&, T>::value,
400 Future<T>::Future(Future<T2>&& other)
401 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
406 typename std::enable_if<
407 !std::is_same<T, typename std::decay<T2>::type>::value &&
408 std::is_constructible<T, T2&&>::value,
410 Future<T>& Future<T>::operator=(Future<T2>&& other) {
412 std::move(other).then([](T2&& v) { return T(std::move(v)); }));
415 // TODO: isSemiFuture
417 template <class T2, typename>
418 Future<T>::Future(T2&& val) : SemiFuture<T>(std::forward<T2>(val)) {}
421 template <typename T2>
422 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
428 typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
430 Future<T>::Future(in_place_t, Args&&... args)
431 : SemiFuture<T>(in_place, std::forward<Args>(args)...) {}
434 Future<T>::~Future() {
441 typename std::enable_if<isFuture<F>::value,
442 Future<typename isFuture<T>::Inner>>::type
443 Future<T>::unwrap() {
444 return then([](Future<typename isFuture<T>::Inner> internal_future) {
445 return internal_future;
451 // Variant: returns a value
452 // e.g. f.then([](Try<T>&& t){ return t.value(); });
454 template <typename F, typename R, bool isTry, typename... Args>
455 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
456 SemiFuture<T>::thenImplementation(
458 futures::detail::argResult<isTry, F, Args...>) {
459 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
460 typedef typename R::ReturnsFuture::Inner B;
462 this->throwIfInvalid();
465 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
467 // grab the Future now before we lose our handle on the Promise
468 auto f = p.getFuture();
469 f.core_->setExecutorNoLock(this->getExecutor());
471 /* This is a bit tricky.
473 We can't just close over *this in case this Future gets moved. So we
474 make a new dummy Future. We could figure out something more
475 sophisticated that avoids making a new Future object when it can, as an
476 optimization. But this is correct.
478 core_ can't be moved, it is explicitly disallowed (as is copying). But
479 if there's ever a reason to allow it, this is one place that makes that
480 assumption and would need to be fixed. We use a standard shared pointer
481 for core_ (by copying it in), which means in essence obj holds a shared
482 pointer to itself. But this shouldn't leak because Promise will not
483 outlive the continuation, because Promise will setException() with a
484 broken Promise if it is destructed before completed. We could use a
485 weak pointer but it would have to be converted to a shared pointer when
486 func is executed (because the Future returned by func may possibly
487 persist beyond the callback, if it gets moved), and so it is an
488 optimization to just make it shared from the get-go.
490 Two subtle but important points about this design. futures::detail::Core
491 has no back pointers to Future or Promise, so if Future or Promise get
492 moved (and they will be moved in performant code) we don't have to do
493 anything fancy. And because we store the continuation in the
494 futures::detail::Core, not in the Future, we can execute the continuation
495 even after the Future has gone out of scope. This is an intentional design
496 decision. It is likely we will want to be able to cancel a continuation
497 in some circumstances, but I think it should be explicit not implicit
498 in the destruction of the Future used to create it.
501 [state = futures::detail::makeCoreCallbackState(
502 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
504 if (!isTry && t.hasException()) {
505 state.setException(std::move(t.exception()));
507 state.setTry(makeTryWith(
508 [&] { return state.invoke(t.template get<isTry, Args>()...); }));
514 // Variant: returns a Future
515 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
517 template <typename F, typename R, bool isTry, typename... Args>
518 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
519 SemiFuture<T>::thenImplementation(
521 futures::detail::argResult<isTry, F, Args...>) {
522 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
523 typedef typename R::ReturnsFuture::Inner B;
524 this->throwIfInvalid();
527 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
529 // grab the Future now before we lose our handle on the Promise
530 auto f = p.getFuture();
531 f.core_->setExecutorNoLock(this->getExecutor());
534 [state = futures::detail::makeCoreCallbackState(
535 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
536 if (!isTry && t.hasException()) {
537 state.setException(std::move(t.exception()));
539 auto tf2 = state.tryInvoke(t.template get<isTry, Args>()...);
540 if (tf2.hasException()) {
541 state.setException(std::move(tf2.exception()));
543 tf2->setCallback_([p = state.stealPromise()](Try<B> && b) mutable {
544 p.setTry(std::move(b));
553 template <typename T>
554 template <typename R, typename Caller, typename... Args>
555 Future<typename isFuture<R>::Inner>
556 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
557 typedef typename std::remove_cv<typename std::remove_reference<
558 typename futures::detail::ArgType<Args...>::FirstArg>::type>::type
561 return then([instance, func](Try<T>&& t){
562 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
567 Future<Unit> Future<T>::then() {
568 return then([] () {});
571 // onError where the callback returns T
574 typename std::enable_if<
575 !futures::detail::callableWith<F, exception_wrapper>::value &&
576 !futures::detail::callableWith<F, exception_wrapper&>::value &&
577 !futures::detail::Extract<F>::ReturnsFuture::value,
579 Future<T>::onError(F&& func) {
580 typedef std::remove_reference_t<
581 typename futures::detail::Extract<F>::FirstArg>
584 std::is_same<typename futures::detail::Extract<F>::RawReturn, T>::value,
585 "Return type of onError callback must be T or Future<T>");
588 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
589 auto f = p.getFuture();
592 [state = futures::detail::makeCoreCallbackState(
593 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
594 if (auto e = t.template tryGetExceptionObject<Exn>()) {
595 state.setTry(makeTryWith([&] { return state.invoke(*e); }));
597 state.setTry(std::move(t));
604 // onError where the callback returns Future<T>
607 typename std::enable_if<
608 !futures::detail::callableWith<F, exception_wrapper>::value &&
609 !futures::detail::callableWith<F, exception_wrapper&>::value &&
610 futures::detail::Extract<F>::ReturnsFuture::value,
612 Future<T>::onError(F&& func) {
614 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
616 "Return type of onError callback must be T or Future<T>");
617 typedef std::remove_reference_t<
618 typename futures::detail::Extract<F>::FirstArg>
622 auto f = p.getFuture();
625 [state = futures::detail::makeCoreCallbackState(
626 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
627 if (auto e = t.template tryGetExceptionObject<Exn>()) {
628 auto tf2 = state.tryInvoke(*e);
629 if (tf2.hasException()) {
630 state.setException(std::move(tf2.exception()));
632 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
633 p.setTry(std::move(t3));
637 state.setTry(std::move(t));
646 Future<T> Future<T>::ensure(F&& func) {
647 return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
649 return makeFuture(std::move(t));
655 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
656 return within(dur, tk).onError([funcw = std::forward<F>(func)](
657 TimedOut const&) { return std::move(funcw)(); });
662 typename std::enable_if<
663 futures::detail::callableWith<F, exception_wrapper>::value &&
664 futures::detail::Extract<F>::ReturnsFuture::value,
666 Future<T>::onError(F&& func) {
668 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
670 "Return type of onError callback must be T or Future<T>");
673 auto f = p.getFuture();
675 [state = futures::detail::makeCoreCallbackState(
676 std::move(p), std::forward<F>(func))](Try<T> t) mutable {
677 if (t.hasException()) {
678 auto tf2 = state.tryInvoke(std::move(t.exception()));
679 if (tf2.hasException()) {
680 state.setException(std::move(tf2.exception()));
682 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
683 p.setTry(std::move(t3));
687 state.setTry(std::move(t));
694 // onError(exception_wrapper) that returns T
697 typename std::enable_if<
698 futures::detail::callableWith<F, exception_wrapper>::value &&
699 !futures::detail::Extract<F>::ReturnsFuture::value,
701 Future<T>::onError(F&& func) {
703 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
705 "Return type of onError callback must be T or Future<T>");
708 auto f = p.getFuture();
710 [state = futures::detail::makeCoreCallbackState(
711 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
712 if (t.hasException()) {
713 state.setTry(makeTryWith(
714 [&] { return state.invoke(std::move(t.exception())); }));
716 state.setTry(std::move(t));
724 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
725 return waitVia(e).getTry();
728 template <class Func>
729 auto via(Executor* x, Func&& func)
730 -> Future<typename isFuture<decltype(std::declval<Func>()())>::Inner> {
731 // TODO make this actually more performant. :-P #7260175
732 return via(x).then(std::forward<Func>(func));
736 Future<T>::Future(futures::detail::EmptyConstruct) noexcept
737 : SemiFuture<T>(futures::detail::EmptyConstruct{}) {}
742 Future<typename std::decay<T>::type> makeFuture(T&& t) {
743 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
746 inline Future<Unit> makeFuture() {
747 return makeFuture(Unit{});
750 // makeFutureWith(Future<T>()) -> Future<T>
752 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
753 typename std::result_of<F()>::type>::type
754 makeFutureWith(F&& func) {
756 typename isFuture<typename std::result_of<F()>::type>::Inner;
758 return std::forward<F>(func)();
759 } catch (std::exception& e) {
760 return makeFuture<InnerType>(
761 exception_wrapper(std::current_exception(), e));
763 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
767 // makeFutureWith(T()) -> Future<T>
768 // makeFutureWith(void()) -> Future<Unit>
770 typename std::enable_if<
771 !(isFuture<typename std::result_of<F()>::type>::value),
772 Future<Unit::LiftT<typename std::result_of<F()>::type>>>::type
773 makeFutureWith(F&& func) {
774 using LiftedResult = Unit::LiftT<typename std::result_of<F()>::type>;
775 return makeFuture<LiftedResult>(
776 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
780 Future<T> makeFuture(std::exception_ptr const& e) {
781 return makeFuture(Try<T>(e));
785 Future<T> makeFuture(exception_wrapper ew) {
786 return makeFuture(Try<T>(std::move(ew)));
789 template <class T, class E>
790 typename std::enable_if<std::is_base_of<std::exception, E>::value,
792 makeFuture(E const& e) {
793 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
797 Future<T> makeFuture(Try<T>&& t) {
798 return Future<T>(new futures::detail::Core<T>(std::move(t)));
802 Future<Unit> via(Executor* executor, int8_t priority) {
803 return makeFuture().via(executor, priority);
806 // mapSetCallback calls func(i, Try<T>) when every future completes
808 template <class T, class InputIterator, class F>
809 void mapSetCallback(InputIterator first, InputIterator last, F func) {
810 for (size_t i = 0; first != last; ++first, ++i) {
811 first->setCallback_([func, i](Try<T>&& t) {
812 func(i, std::move(t));
817 // collectAll (variadic)
819 template <typename... Fs>
820 typename futures::detail::CollectAllVariadicContext<
821 typename std::decay<Fs>::type::value_type...>::type
822 collectAll(Fs&&... fs) {
823 auto ctx = std::make_shared<futures::detail::CollectAllVariadicContext<
824 typename std::decay<Fs>::type::value_type...>>();
825 futures::detail::collectVariadicHelper<
826 futures::detail::CollectAllVariadicContext>(ctx, std::forward<Fs>(fs)...);
827 return ctx->p.getFuture();
830 // collectAll (iterator)
832 template <class InputIterator>
835 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
836 collectAll(InputIterator first, InputIterator last) {
838 typename std::iterator_traits<InputIterator>::value_type::value_type T;
840 struct CollectAllContext {
841 CollectAllContext(size_t n) : results(n) {}
842 ~CollectAllContext() {
843 p.setValue(std::move(results));
845 Promise<std::vector<Try<T>>> p;
846 std::vector<Try<T>> results;
850 std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
851 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
852 ctx->results[i] = std::move(t);
854 return ctx->p.getFuture();
857 // collect (iterator)
862 template <typename T>
863 struct CollectContext {
865 explicit Nothing(int /* n */) {}
868 using Result = typename std::conditional<
869 std::is_void<T>::value,
871 std::vector<T>>::type;
873 using InternalResult = typename std::conditional<
874 std::is_void<T>::value,
876 std::vector<Optional<T>>>::type;
878 explicit CollectContext(size_t n) : result(n) {}
880 if (!threw.exchange(true)) {
881 // map Optional<T> -> T
882 std::vector<T> finalResult;
883 finalResult.reserve(result.size());
884 std::transform(result.begin(), result.end(),
885 std::back_inserter(finalResult),
886 [](Optional<T>& o) { return std::move(o.value()); });
887 p.setValue(std::move(finalResult));
890 inline void setPartialResult(size_t i, Try<T>& t) {
891 result[i] = std::move(t.value());
894 InternalResult result;
895 std::atomic<bool> threw {false};
898 } // namespace detail
899 } // namespace futures
901 template <class InputIterator>
902 Future<typename futures::detail::CollectContext<typename std::iterator_traits<
903 InputIterator>::value_type::value_type>::Result>
904 collect(InputIterator first, InputIterator last) {
906 typename std::iterator_traits<InputIterator>::value_type::value_type T;
908 auto ctx = std::make_shared<futures::detail::CollectContext<T>>(
909 std::distance(first, last));
910 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
911 if (t.hasException()) {
912 if (!ctx->threw.exchange(true)) {
913 ctx->p.setException(std::move(t.exception()));
915 } else if (!ctx->threw) {
916 ctx->setPartialResult(i, t);
919 return ctx->p.getFuture();
922 // collect (variadic)
924 template <typename... Fs>
925 typename futures::detail::CollectVariadicContext<
926 typename std::decay<Fs>::type::value_type...>::type
927 collect(Fs&&... fs) {
928 auto ctx = std::make_shared<futures::detail::CollectVariadicContext<
929 typename std::decay<Fs>::type::value_type...>>();
930 futures::detail::collectVariadicHelper<
931 futures::detail::CollectVariadicContext>(ctx, std::forward<Fs>(fs)...);
932 return ctx->p.getFuture();
935 // collectAny (iterator)
937 template <class InputIterator>
942 std::iterator_traits<InputIterator>::value_type::value_type>>>
943 collectAny(InputIterator first, InputIterator last) {
945 typename std::iterator_traits<InputIterator>::value_type::value_type T;
947 struct CollectAnyContext {
948 CollectAnyContext() {}
949 Promise<std::pair<size_t, Try<T>>> p;
950 std::atomic<bool> done {false};
953 auto ctx = std::make_shared<CollectAnyContext>();
954 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
955 if (!ctx->done.exchange(true)) {
956 ctx->p.setValue(std::make_pair(i, std::move(t)));
959 return ctx->p.getFuture();
962 // collectAnyWithoutException (iterator)
964 template <class InputIterator>
967 typename std::iterator_traits<InputIterator>::value_type::value_type>>
968 collectAnyWithoutException(InputIterator first, InputIterator last) {
970 typename std::iterator_traits<InputIterator>::value_type::value_type T;
972 struct CollectAnyWithoutExceptionContext {
973 CollectAnyWithoutExceptionContext(){}
974 Promise<std::pair<size_t, T>> p;
975 std::atomic<bool> done{false};
976 std::atomic<size_t> nFulfilled{0};
980 auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
981 ctx->nTotal = size_t(std::distance(first, last));
983 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
984 if (!t.hasException() && !ctx->done.exchange(true)) {
985 ctx->p.setValue(std::make_pair(i, std::move(t.value())));
986 } else if (++ctx->nFulfilled == ctx->nTotal) {
987 ctx->p.setException(t.exception());
990 return ctx->p.getFuture();
993 // collectN (iterator)
995 template <class InputIterator>
996 Future<std::vector<std::pair<size_t, Try<typename
997 std::iterator_traits<InputIterator>::value_type::value_type>>>>
998 collectN(InputIterator first, InputIterator last, size_t n) {
1000 std::iterator_traits<InputIterator>::value_type::value_type T;
1001 typedef std::vector<std::pair<size_t, Try<T>>> V;
1003 struct CollectNContext {
1005 std::atomic<size_t> completed = {0};
1008 auto ctx = std::make_shared<CollectNContext>();
1010 if (size_t(std::distance(first, last)) < n) {
1011 ctx->p.setException(std::runtime_error("Not enough futures"));
1013 // for each completed Future, increase count and add to vector, until we
1014 // have n completed futures at which point we fulfil our Promise with the
1016 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
1017 auto c = ++ctx->completed;
1019 assert(ctx->v.size() < n);
1020 ctx->v.emplace_back(i, std::move(t));
1022 ctx->p.setTry(Try<V>(std::move(ctx->v)));
1028 return ctx->p.getFuture();
1031 // reduce (iterator)
1033 template <class It, class T, class F>
1034 Future<T> reduce(It first, It last, T&& initial, F&& func) {
1035 if (first == last) {
1036 return makeFuture(std::move(initial));
1039 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
1040 typedef typename std::conditional<
1041 futures::detail::callableWith<F, T&&, Try<ItT>&&>::value,
1044 typedef isTry<Arg> IsTry;
1046 auto sfunc = std::make_shared<F>(std::move(func));
1048 auto f = first->then(
1049 [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
1051 std::move(minitial), head.template get<IsTry::value, Arg&&>());
1054 for (++first; first != last; ++first) {
1055 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
1056 return (*sfunc)(std::move(std::get<0>(t).value()),
1057 // Either return a ItT&& or a Try<ItT>&& depending
1058 // on the type of the argument of func.
1059 std::get<1>(t).template get<IsTry::value, Arg&&>());
1066 // window (collection)
1068 template <class Collection, class F, class ItT, class Result>
1069 std::vector<Future<Result>>
1070 window(Collection input, F func, size_t n) {
1071 // Use global inline executor singleton
1072 auto executor = &InlineExecutor::instance();
1073 return window(executor, std::move(input), std::move(func), n);
1076 template <class Collection, class F, class ItT, class Result>
1077 std::vector<Future<Result>>
1078 window(Executor* executor, Collection input, F func, size_t n) {
1079 struct WindowContext {
1080 WindowContext(Executor* executor_, Collection&& input_, F&& func_)
1081 : executor(executor_),
1082 input(std::move(input_)),
1083 promises(input.size()),
1084 func(std::move(func_)) {}
1085 std::atomic<size_t> i{0};
1088 std::vector<Promise<Result>> promises;
1091 static inline void spawn(std::shared_ptr<WindowContext> ctx) {
1092 size_t i = ctx->i++;
1093 if (i < ctx->input.size()) {
1094 auto fut = ctx->func(std::move(ctx->input[i]));
1095 fut.setCallback_([ctx = std::move(ctx), i](Try<Result>&& t) mutable {
1096 const auto executor_ = ctx->executor;
1097 executor_->add([ctx = std::move(ctx), i, t = std::move(t)]() mutable {
1098 ctx->promises[i].setTry(std::move(t));
1099 // Chain another future onto this one
1100 spawn(std::move(ctx));
1107 auto max = std::min(n, input.size());
1109 auto ctx = std::make_shared<WindowContext>(
1110 executor, std::move(input), std::move(func));
1112 // Start the first n Futures
1113 for (size_t i = 0; i < max; ++i) {
1114 executor->add([ctx]() mutable { WindowContext::spawn(std::move(ctx)); });
1117 std::vector<Future<Result>> futures;
1118 futures.reserve(ctx->promises.size());
1119 for (auto& promise : ctx->promises) {
1120 futures.emplace_back(promise.getFuture());
1129 template <class I, class F>
1130 Future<I> Future<T>::reduce(I&& initial, F&& func) {
1132 minitial = std::forward<I>(initial),
1133 mfunc = std::forward<F>(func)
1134 ](T& vals) mutable {
1135 auto ret = std::move(minitial);
1136 for (auto& val : vals) {
1137 ret = mfunc(std::move(ret), std::move(val));
1143 // unorderedReduce (iterator)
1145 template <class It, class T, class F, class ItT, class Arg>
1146 Future<T> unorderedReduce(It first, It last, T initial, F func) {
1147 if (first == last) {
1148 return makeFuture(std::move(initial));
1151 typedef isTry<Arg> IsTry;
1153 struct UnorderedReduceContext {
1154 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
1155 : lock_(), memo_(makeFuture<T>(std::move(memo))),
1156 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
1158 folly::MicroSpinLock lock_; // protects memo_ and numThens_
1161 size_t numThens_; // how many Futures completed and called .then()
1162 size_t numFutures_; // how many Futures in total
1163 Promise<T> promise_;
1166 auto ctx = std::make_shared<UnorderedReduceContext>(
1167 std::move(initial), std::move(func), std::distance(first, last));
1169 mapSetCallback<ItT>(
1172 [ctx](size_t /* i */, Try<ItT>&& t) {
1173 // Futures can be completed in any order, simultaneously.
1174 // To make this non-blocking, we create a new Future chain in
1175 // the order of completion to reduce the values.
1176 // The spinlock just protects chaining a new Future, not actually
1177 // executing the reduce, which should be really fast.
1178 folly::MSLGuard lock(ctx->lock_);
1180 ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
1181 // Either return a ItT&& or a Try<ItT>&& depending
1182 // on the type of the argument of func.
1183 return ctx->func_(std::move(v),
1184 mt.template get<IsTry::value, Arg&&>());
1186 if (++ctx->numThens_ == ctx->numFutures_) {
1187 // After reducing the value of the last Future, fulfill the Promise
1188 ctx->memo_.setCallback_(
1189 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
1193 return ctx->promise_.getFuture();
1199 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
1200 return within(dur, TimedOut(), tk);
1205 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
1208 Context(E ex) : exception(std::move(ex)), promise() {}
1210 Future<Unit> thisFuture;
1212 std::atomic<bool> token {false};
1215 if (this->isReady()) {
1216 return std::move(*this);
1219 std::shared_ptr<Timekeeper> tks;
1221 tks = folly::detail::getTimekeeperSingleton();
1222 tk = DCHECK_NOTNULL(tks.get());
1225 auto ctx = std::make_shared<Context>(std::move(e));
1227 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
1228 if (ctx->token.exchange(true) == false) {
1229 ctx->promise.setTry(std::move(t));
1233 // Have time keeper use a weak ptr to hold ctx,
1234 // so that ctx can be deallocated as soon as the future job finished.
1235 tk->after(dur).then([weakCtx = to_weak_ptr(ctx)](Try<Unit> const& t) mutable {
1236 auto lockedCtx = weakCtx.lock();
1238 // ctx already released. "this" completed first, cancel "after"
1241 // "after" completed first, cancel "this"
1242 lockedCtx->thisFuture.raise(TimedOut());
1243 if (lockedCtx->token.exchange(true) == false) {
1244 if (t.hasException()) {
1245 lockedCtx->promise.setException(std::move(t.exception()));
1247 lockedCtx->promise.setException(std::move(lockedCtx->exception));
1252 return ctx->promise.getFuture().via(this->getExecutor());
1258 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
1259 return collectAll(*this, futures::sleep(dur, tk))
1260 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
1261 Try<T>& t = std::get<0>(tup);
1262 return makeFuture<T>(std::move(t));
1269 template <class FutureType, typename T = typename FutureType::value_type>
1270 void waitImpl(FutureType& f) {
1271 // short-circuit if there's nothing to do
1272 if (f.isReady()) return;
1274 FutureBatonType baton;
1275 f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1277 assert(f.isReady());
1280 template <class FutureType, typename T = typename FutureType::value_type>
1281 void waitImpl(FutureType& f, Duration dur) {
1282 // short-circuit if there's nothing to do
1288 auto ret = promise.getFuture();
1289 auto baton = std::make_shared<FutureBatonType>();
1290 f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1291 promise.setTry(std::move(t));
1295 if (baton->timed_wait(dur)) {
1296 assert(f.isReady());
1301 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1302 // Set callback so to ensure that the via executor has something on it
1303 // so that once the preceding future triggers this callback, drive will
1304 // always have a callback to satisfy it
1307 f = f.via(e).then([](T&& t) { return std::move(t); });
1308 while (!f.isReady()) {
1311 assert(f.isReady());
1314 } // namespace detail
1315 } // namespace futures
1318 SemiFuture<T>& SemiFuture<T>::wait() & {
1319 futures::detail::waitImpl(*this);
1324 SemiFuture<T>&& SemiFuture<T>::wait() && {
1325 futures::detail::waitImpl(*this);
1326 return std::move(*this);
1330 SemiFuture<T>& SemiFuture<T>::wait(Duration dur) & {
1331 futures::detail::waitImpl(*this, dur);
1336 SemiFuture<T>&& SemiFuture<T>::wait(Duration dur) && {
1337 futures::detail::waitImpl(*this, dur);
1338 return std::move(*this);
1342 T SemiFuture<T>::get() {
1343 return std::move(wait().value());
1347 T SemiFuture<T>::get(Duration dur) {
1349 if (this->isReady()) {
1350 return std::move(this->value());
1357 Future<T>& Future<T>::wait() & {
1358 futures::detail::waitImpl(*this);
1363 Future<T>&& Future<T>::wait() && {
1364 futures::detail::waitImpl(*this);
1365 return std::move(*this);
1369 Future<T>& Future<T>::wait(Duration dur) & {
1370 futures::detail::waitImpl(*this, dur);
1375 Future<T>&& Future<T>::wait(Duration dur) && {
1376 futures::detail::waitImpl(*this, dur);
1377 return std::move(*this);
1381 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1382 futures::detail::waitViaImpl(*this, e);
1387 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1388 futures::detail::waitViaImpl(*this, e);
1389 return std::move(*this);
1393 T Future<T>::getVia(DrivableExecutor* e) {
1394 return std::move(waitVia(e).value());
1401 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1402 return t1.value() == t2.value();
1405 } // namespace detail
1406 } // namespace futures
1409 Future<bool> Future<T>::willEqual(Future<T>& f) {
1410 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1411 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1412 return futures::detail::TryEquals<T>::equals(
1413 std::get<0>(t), std::get<1>(t));
1422 Future<T> Future<T>::filter(F&& predicate) {
1423 return this->then([p = std::forward<F>(predicate)](T val) {
1424 T const& valConstRef = val;
1425 if (!p(valConstRef)) {
1426 throwPredicateDoesNotObtain();
1433 inline Future<Unit> when(bool p, F&& thunk) {
1434 return p ? std::forward<F>(thunk)().unit() : makeFuture();
1437 template <class P, class F>
1438 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1440 auto future = thunk();
1441 return future.then([
1442 predicate = std::forward<P>(predicate),
1443 thunk = std::forward<F>(thunk)
1445 return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1448 return makeFuture();
1452 Future<Unit> times(const int n, F&& thunk) {
1453 return folly::whileDo(
1454 [ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
1455 return count->fetch_add(1) < n;
1457 std::forward<F>(thunk));
1461 template <class It, class F, class ItT, class Result>
1462 std::vector<Future<Result>> map(It first, It last, F func) {
1463 std::vector<Future<Result>> results;
1464 for (auto it = first; it != last; it++) {
1465 results.push_back(it->then(func));
1471 // Instantiate the most common Future types to save compile time
1472 extern template class Future<Unit>;
1473 extern template class Future<bool>;
1474 extern template class Future<int>;
1475 extern template class Future<int64_t>;
1476 extern template class Future<std::string>;
1477 extern template class Future<double>;
1478 } // namespace folly