2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/futures/Timekeeper.h>
27 #include <folly/futures/detail/Core.h>
29 #ifndef FOLLY_FUTURE_USING_FIBER
30 #if FOLLY_MOBILE || defined(__APPLE__)
31 #define FOLLY_FUTURE_USING_FIBER 0
33 #define FOLLY_FUTURE_USING_FIBER 1
34 #include <folly/fibers/Baton.h>
44 #if FOLLY_FUTURE_USING_FIBER
45 typedef folly::fibers::Baton FutureBatonType;
47 typedef folly::Baton<> FutureBatonType;
50 } // namespace futures
53 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
58 // Guarantees that the stored functor is destructed before the stored promise
59 // may be fulfilled. Assumes the stored functor to be noexcept-destructible.
60 template <typename T, typename F>
61 class CoreCallbackState {
63 template <typename FF>
64 CoreCallbackState(Promise<T>&& promise, FF&& func) noexcept(
65 noexcept(F(std::declval<FF>())))
66 : func_(std::forward<FF>(func)), promise_(std::move(promise)) {
67 assert(before_barrier());
70 CoreCallbackState(CoreCallbackState&& that) noexcept(
71 noexcept(F(std::declval<F>()))) {
72 if (that.before_barrier()) {
73 new (&func_) F(std::move(that.func_));
74 promise_ = that.stealPromise();
78 CoreCallbackState& operator=(CoreCallbackState&&) = delete;
80 ~CoreCallbackState() {
81 if (before_barrier()) {
86 template <typename... Args>
87 auto invoke(Args&&... args) noexcept(
88 noexcept(std::declval<F&&>()(std::declval<Args&&>()...))) {
89 assert(before_barrier());
90 return std::move(func_)(std::forward<Args>(args)...);
93 template <typename... Args>
94 auto tryInvoke(Args&&... args) noexcept {
95 return makeTryWith([&] { return invoke(std::forward<Args>(args)...); });
98 void setTry(Try<T>&& t) {
99 stealPromise().setTry(std::move(t));
102 void setException(exception_wrapper&& ew) {
103 stealPromise().setException(std::move(ew));
106 Promise<T> stealPromise() noexcept {
107 assert(before_barrier());
109 return std::move(promise_);
113 bool before_barrier() const noexcept {
114 return !promise_.isFulfilled();
120 Promise<T> promise_{Promise<T>::makeEmpty()};
123 template <typename T, typename F>
124 inline auto makeCoreCallbackState(Promise<T>&& p, F&& f) noexcept(
125 noexcept(CoreCallbackState<T, _t<std::decay<F>>>(
126 std::declval<Promise<T>&&>(),
127 std::declval<F&&>()))) {
128 return CoreCallbackState<T, _t<std::decay<F>>>(
129 std::move(p), std::forward<F>(f));
131 } // namespace detail
132 } // namespace futures
135 SemiFuture<typename std::decay<T>::type> makeSemiFuture(T&& t) {
136 return makeSemiFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
139 // makeSemiFutureWith(SemiFuture<T>()) -> SemiFuture<T>
141 typename std::enable_if<
142 isSemiFuture<typename std::result_of<F()>::type>::value,
143 typename std::result_of<F()>::type>::type
144 makeSemiFutureWith(F&& func) {
146 typename isSemiFuture<typename std::result_of<F()>::type>::Inner;
148 return std::forward<F>(func)();
149 } catch (std::exception& e) {
150 return makeSemiFuture<InnerType>(
151 exception_wrapper(std::current_exception(), e));
153 return makeSemiFuture<InnerType>(
154 exception_wrapper(std::current_exception()));
158 // makeSemiFutureWith(T()) -> SemiFuture<T>
159 // makeSemiFutureWith(void()) -> SemiFuture<Unit>
161 typename std::enable_if<
162 !(isSemiFuture<typename std::result_of<F()>::type>::value),
163 SemiFuture<Unit::LiftT<typename std::result_of<F()>::type>>>::type
164 makeSemiFutureWith(F&& func) {
165 using LiftedResult = Unit::LiftT<typename std::result_of<F()>::type>;
166 return makeSemiFuture<LiftedResult>(
167 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
171 SemiFuture<T> makeSemiFuture(std::exception_ptr const& e) {
172 return makeSemiFuture(Try<T>(e));
176 SemiFuture<T> makeSemiFuture(exception_wrapper ew) {
177 return makeSemiFuture(Try<T>(std::move(ew)));
180 template <class T, class E>
182 enable_if<std::is_base_of<std::exception, E>::value, SemiFuture<T>>::type
183 makeSemiFuture(E const& e) {
184 return makeSemiFuture(Try<T>(make_exception_wrapper<E>(e)));
188 SemiFuture<T> makeSemiFuture(Try<T>&& t) {
189 return SemiFuture<T>(new futures::detail::Core<T>(std::move(t)));
193 SemiFuture<T> SemiFuture<T>::makeEmpty() {
194 return SemiFuture<T>(futures::detail::EmptyConstruct{});
198 SemiFuture<T>::SemiFuture(SemiFuture<T>&& other) noexcept : core_(other.core_) {
199 other.core_ = nullptr;
203 SemiFuture<T>& SemiFuture<T>::operator=(SemiFuture<T>&& other) noexcept {
204 std::swap(core_, other.core_);
209 SemiFuture<T>::SemiFuture(Future<T>&& other) noexcept : core_(other.core_) {
210 other.core_ = nullptr;
214 SemiFuture<T>& SemiFuture<T>::operator=(Future<T>&& other) noexcept {
215 std::swap(core_, other.core_);
220 template <class T2, typename>
221 SemiFuture<T>::SemiFuture(T2&& val)
222 : core_(new futures::detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
225 template <typename T2>
226 SemiFuture<T>::SemiFuture(
227 typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
228 : core_(new futures::detail::Core<T>(Try<T>(T()))) {}
233 typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
235 SemiFuture<T>::SemiFuture(in_place_t, Args&&... args)
237 new futures::detail::Core<T>(in_place, std::forward<Args>(args)...)) {
241 SemiFuture<T>::~SemiFuture() {
245 // This must be defined after the constructors to avoid a bug in MSVC
246 // https://connect.microsoft.com/VisualStudio/feedback/details/3142777/out-of-line-constructor-definition-after-implicit-reference-causes-incorrect-c2244
247 inline SemiFuture<Unit> makeSemiFuture() {
248 return makeSemiFuture(Unit{});
252 T& SemiFuture<T>::value() & {
255 return core_->getTry().value();
259 T const& SemiFuture<T>::value() const& {
262 return core_->getTry().value();
266 T&& SemiFuture<T>::value() && {
269 return std::move(core_->getTry().value());
273 T const&& SemiFuture<T>::value() const&& {
276 return std::move(core_->getTry().value());
280 inline Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) && {
283 setExecutor(executor, priority);
285 auto newFuture = Future<T>(core_);
291 inline Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) & {
294 auto f = p.getFuture();
295 auto func = [p = std::move(p)](Try<T>&& t) mutable {
296 p.setTry(std::move(t));
298 using R = futures::detail::callableResult<T, decltype(func)>;
299 thenImplementation<decltype(func), R>(std::move(func), typename R::Arg());
300 return std::move(f).via(executor, priority);
304 bool SemiFuture<T>::isReady() const {
306 return core_->ready();
310 bool SemiFuture<T>::hasValue() {
311 return getTry().hasValue();
315 bool SemiFuture<T>::hasException() {
316 return getTry().hasException();
320 void SemiFuture<T>::detach() {
322 core_->detachFuture();
328 Try<T>& SemiFuture<T>::getTry() {
331 return core_->getTry();
335 void SemiFuture<T>::throwIfInvalid() const {
341 Optional<Try<T>> SemiFuture<T>::poll() {
343 if (core_->ready()) {
344 o = std::move(core_->getTry());
350 void SemiFuture<T>::raise(exception_wrapper exception) {
351 core_->raise(std::move(exception));
356 void SemiFuture<T>::setCallback_(F&& func) {
358 core_->setCallback(std::forward<F>(func));
362 SemiFuture<T>::SemiFuture(futures::detail::EmptyConstruct) noexcept
366 Future<T> Future<T>::makeEmpty() {
367 return Future<T>(futures::detail::EmptyConstruct{});
371 Future<T>::Future(Future<T>&& other) noexcept
372 : SemiFuture<T>(std::move(other)) {}
375 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
376 SemiFuture<T>::operator=(SemiFuture<T>{std::move(other)});
383 typename std::enable_if<
384 !std::is_same<T, typename std::decay<T2>::type>::value &&
385 std::is_constructible<T, T2&&>::value &&
386 std::is_convertible<T2&&, T>::value,
388 Future<T>::Future(Future<T2>&& other)
389 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
394 typename std::enable_if<
395 !std::is_same<T, typename std::decay<T2>::type>::value &&
396 std::is_constructible<T, T2&&>::value &&
397 !std::is_convertible<T2&&, T>::value,
399 Future<T>::Future(Future<T2>&& other)
400 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
405 typename std::enable_if<
406 !std::is_same<T, typename std::decay<T2>::type>::value &&
407 std::is_constructible<T, T2&&>::value,
409 Future<T>& Future<T>::operator=(Future<T2>&& other) {
411 std::move(other).then([](T2&& v) { return T(std::move(v)); }));
414 // TODO: isSemiFuture
416 template <class T2, typename>
417 Future<T>::Future(T2&& val) : SemiFuture<T>(std::forward<T2>(val)) {}
420 template <typename T2>
421 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
427 typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
429 Future<T>::Future(in_place_t, Args&&... args)
430 : SemiFuture<T>(in_place, std::forward<Args>(args)...) {}
433 Future<T>::~Future() {
440 typename std::enable_if<isFuture<F>::value,
441 Future<typename isFuture<T>::Inner>>::type
442 Future<T>::unwrap() {
443 return then([](Future<typename isFuture<T>::Inner> internal_future) {
444 return internal_future;
450 // Variant: returns a value
451 // e.g. f.then([](Try<T>&& t){ return t.value(); });
453 template <typename F, typename R, bool isTry, typename... Args>
454 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
455 SemiFuture<T>::thenImplementation(
457 futures::detail::argResult<isTry, F, Args...>) {
458 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
459 typedef typename R::ReturnsFuture::Inner B;
461 this->throwIfInvalid();
464 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
466 // grab the Future now before we lose our handle on the Promise
467 auto f = p.getFuture();
468 f.core_->setExecutorNoLock(this->getExecutor());
470 /* This is a bit tricky.
472 We can't just close over *this in case this Future gets moved. So we
473 make a new dummy Future. We could figure out something more
474 sophisticated that avoids making a new Future object when it can, as an
475 optimization. But this is correct.
477 core_ can't be moved, it is explicitly disallowed (as is copying). But
478 if there's ever a reason to allow it, this is one place that makes that
479 assumption and would need to be fixed. We use a standard shared pointer
480 for core_ (by copying it in), which means in essence obj holds a shared
481 pointer to itself. But this shouldn't leak because Promise will not
482 outlive the continuation, because Promise will setException() with a
483 broken Promise if it is destructed before completed. We could use a
484 weak pointer but it would have to be converted to a shared pointer when
485 func is executed (because the Future returned by func may possibly
486 persist beyond the callback, if it gets moved), and so it is an
487 optimization to just make it shared from the get-go.
489 Two subtle but important points about this design. futures::detail::Core
490 has no back pointers to Future or Promise, so if Future or Promise get
491 moved (and they will be moved in performant code) we don't have to do
492 anything fancy. And because we store the continuation in the
493 futures::detail::Core, not in the Future, we can execute the continuation
494 even after the Future has gone out of scope. This is an intentional design
495 decision. It is likely we will want to be able to cancel a continuation
496 in some circumstances, but I think it should be explicit not implicit
497 in the destruction of the Future used to create it.
500 [state = futures::detail::makeCoreCallbackState(
501 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
503 if (!isTry && t.hasException()) {
504 state.setException(std::move(t.exception()));
506 state.setTry(makeTryWith(
507 [&] { return state.invoke(t.template get<isTry, Args>()...); }));
513 // Variant: returns a Future
514 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
516 template <typename F, typename R, bool isTry, typename... Args>
517 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
518 SemiFuture<T>::thenImplementation(
520 futures::detail::argResult<isTry, F, Args...>) {
521 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
522 typedef typename R::ReturnsFuture::Inner B;
523 this->throwIfInvalid();
526 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
528 // grab the Future now before we lose our handle on the Promise
529 auto f = p.getFuture();
530 f.core_->setExecutorNoLock(this->getExecutor());
533 [state = futures::detail::makeCoreCallbackState(
534 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
535 if (!isTry && t.hasException()) {
536 state.setException(std::move(t.exception()));
538 auto tf2 = state.tryInvoke(t.template get<isTry, Args>()...);
539 if (tf2.hasException()) {
540 state.setException(std::move(tf2.exception()));
542 tf2->setCallback_([p = state.stealPromise()](Try<B> && b) mutable {
543 p.setTry(std::move(b));
552 template <typename T>
553 template <typename R, typename Caller, typename... Args>
554 Future<typename isFuture<R>::Inner>
555 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
556 typedef typename std::remove_cv<typename std::remove_reference<
557 typename futures::detail::ArgType<Args...>::FirstArg>::type>::type
560 return then([instance, func](Try<T>&& t){
561 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
566 Future<Unit> Future<T>::then() {
567 return then([] () {});
570 // onError where the callback returns T
573 typename std::enable_if<
574 !futures::detail::callableWith<F, exception_wrapper>::value &&
575 !futures::detail::callableWith<F, exception_wrapper&>::value &&
576 !futures::detail::Extract<F>::ReturnsFuture::value,
578 Future<T>::onError(F&& func) {
579 typedef std::remove_reference_t<
580 typename futures::detail::Extract<F>::FirstArg>
583 std::is_same<typename futures::detail::Extract<F>::RawReturn, T>::value,
584 "Return type of onError callback must be T or Future<T>");
587 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
588 auto f = p.getFuture();
591 [state = futures::detail::makeCoreCallbackState(
592 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
593 if (auto e = t.template tryGetExceptionObject<Exn>()) {
594 state.setTry(makeTryWith([&] { return state.invoke(*e); }));
596 state.setTry(std::move(t));
603 // onError where the callback returns Future<T>
606 typename std::enable_if<
607 !futures::detail::callableWith<F, exception_wrapper>::value &&
608 !futures::detail::callableWith<F, exception_wrapper&>::value &&
609 futures::detail::Extract<F>::ReturnsFuture::value,
611 Future<T>::onError(F&& func) {
613 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
615 "Return type of onError callback must be T or Future<T>");
616 typedef std::remove_reference_t<
617 typename futures::detail::Extract<F>::FirstArg>
621 auto f = p.getFuture();
624 [state = futures::detail::makeCoreCallbackState(
625 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
626 if (auto e = t.template tryGetExceptionObject<Exn>()) {
627 auto tf2 = state.tryInvoke(*e);
628 if (tf2.hasException()) {
629 state.setException(std::move(tf2.exception()));
631 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
632 p.setTry(std::move(t3));
636 state.setTry(std::move(t));
645 Future<T> Future<T>::ensure(F&& func) {
646 return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
648 return makeFuture(std::move(t));
654 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
655 return within(dur, tk).onError([funcw = std::forward<F>(func)](
656 TimedOut const&) { return std::move(funcw)(); });
661 typename std::enable_if<
662 futures::detail::callableWith<F, exception_wrapper>::value &&
663 futures::detail::Extract<F>::ReturnsFuture::value,
665 Future<T>::onError(F&& func) {
667 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
669 "Return type of onError callback must be T or Future<T>");
672 auto f = p.getFuture();
674 [state = futures::detail::makeCoreCallbackState(
675 std::move(p), std::forward<F>(func))](Try<T> t) mutable {
676 if (t.hasException()) {
677 auto tf2 = state.tryInvoke(std::move(t.exception()));
678 if (tf2.hasException()) {
679 state.setException(std::move(tf2.exception()));
681 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
682 p.setTry(std::move(t3));
686 state.setTry(std::move(t));
693 // onError(exception_wrapper) that returns T
696 typename std::enable_if<
697 futures::detail::callableWith<F, exception_wrapper>::value &&
698 !futures::detail::Extract<F>::ReturnsFuture::value,
700 Future<T>::onError(F&& func) {
702 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
704 "Return type of onError callback must be T or Future<T>");
707 auto f = p.getFuture();
709 [state = futures::detail::makeCoreCallbackState(
710 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
711 if (t.hasException()) {
712 state.setTry(makeTryWith(
713 [&] { return state.invoke(std::move(t.exception())); }));
715 state.setTry(std::move(t));
723 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
724 return waitVia(e).getTry();
727 template <class Func>
728 auto via(Executor* x, Func&& func)
729 -> Future<typename isFuture<decltype(std::declval<Func>()())>::Inner> {
730 // TODO make this actually more performant. :-P #7260175
731 return via(x).then(std::forward<Func>(func));
735 Future<T>::Future(futures::detail::EmptyConstruct) noexcept
736 : SemiFuture<T>(futures::detail::EmptyConstruct{}) {}
741 Future<typename std::decay<T>::type> makeFuture(T&& t) {
742 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
745 inline Future<Unit> makeFuture() {
746 return makeFuture(Unit{});
749 // makeFutureWith(Future<T>()) -> Future<T>
751 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
752 typename std::result_of<F()>::type>::type
753 makeFutureWith(F&& func) {
755 typename isFuture<typename std::result_of<F()>::type>::Inner;
757 return std::forward<F>(func)();
758 } catch (std::exception& e) {
759 return makeFuture<InnerType>(
760 exception_wrapper(std::current_exception(), e));
762 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
766 // makeFutureWith(T()) -> Future<T>
767 // makeFutureWith(void()) -> Future<Unit>
769 typename std::enable_if<
770 !(isFuture<typename std::result_of<F()>::type>::value),
771 Future<Unit::LiftT<typename std::result_of<F()>::type>>>::type
772 makeFutureWith(F&& func) {
773 using LiftedResult = Unit::LiftT<typename std::result_of<F()>::type>;
774 return makeFuture<LiftedResult>(
775 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
779 Future<T> makeFuture(std::exception_ptr const& e) {
780 return makeFuture(Try<T>(e));
784 Future<T> makeFuture(exception_wrapper ew) {
785 return makeFuture(Try<T>(std::move(ew)));
788 template <class T, class E>
789 typename std::enable_if<std::is_base_of<std::exception, E>::value,
791 makeFuture(E const& e) {
792 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
796 Future<T> makeFuture(Try<T>&& t) {
797 return Future<T>(new futures::detail::Core<T>(std::move(t)));
801 Future<Unit> via(Executor* executor, int8_t priority) {
802 return makeFuture().via(executor, priority);
// mapSetCallback calls func(i, Try<T>) as each future completes
807 template <class T, class InputIterator, class F>
808 void mapSetCallback(InputIterator first, InputIterator last, F func) {
809 for (size_t i = 0; first != last; ++first, ++i) {
810 first->setCallback_([func, i](Try<T>&& t) {
811 func(i, std::move(t));
816 // collectAll (variadic)
818 template <typename... Fs>
819 typename futures::detail::CollectAllVariadicContext<
820 typename std::decay<Fs>::type::value_type...>::type
821 collectAll(Fs&&... fs) {
822 auto ctx = std::make_shared<futures::detail::CollectAllVariadicContext<
823 typename std::decay<Fs>::type::value_type...>>();
824 futures::detail::collectVariadicHelper<
825 futures::detail::CollectAllVariadicContext>(ctx, std::forward<Fs>(fs)...);
826 return ctx->p.getFuture();
829 // collectAll (iterator)
831 template <class InputIterator>
834 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
835 collectAll(InputIterator first, InputIterator last) {
837 typename std::iterator_traits<InputIterator>::value_type::value_type T;
839 struct CollectAllContext {
840 CollectAllContext(size_t n) : results(n) {}
841 ~CollectAllContext() {
842 p.setValue(std::move(results));
844 Promise<std::vector<Try<T>>> p;
845 std::vector<Try<T>> results;
849 std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
850 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
851 ctx->results[i] = std::move(t);
853 return ctx->p.getFuture();
856 // collect (iterator)
861 template <typename T>
862 struct CollectContext {
864 explicit Nothing(int /* n */) {}
867 using Result = typename std::conditional<
868 std::is_void<T>::value,
870 std::vector<T>>::type;
872 using InternalResult = typename std::conditional<
873 std::is_void<T>::value,
875 std::vector<Optional<T>>>::type;
877 explicit CollectContext(size_t n) : result(n) {}
879 if (!threw.exchange(true)) {
880 // map Optional<T> -> T
881 std::vector<T> finalResult;
882 finalResult.reserve(result.size());
883 std::transform(result.begin(), result.end(),
884 std::back_inserter(finalResult),
885 [](Optional<T>& o) { return std::move(o.value()); });
886 p.setValue(std::move(finalResult));
889 inline void setPartialResult(size_t i, Try<T>& t) {
890 result[i] = std::move(t.value());
893 InternalResult result;
894 std::atomic<bool> threw {false};
897 } // namespace detail
898 } // namespace futures
900 template <class InputIterator>
901 Future<typename futures::detail::CollectContext<typename std::iterator_traits<
902 InputIterator>::value_type::value_type>::Result>
903 collect(InputIterator first, InputIterator last) {
905 typename std::iterator_traits<InputIterator>::value_type::value_type T;
907 auto ctx = std::make_shared<futures::detail::CollectContext<T>>(
908 std::distance(first, last));
909 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
910 if (t.hasException()) {
911 if (!ctx->threw.exchange(true)) {
912 ctx->p.setException(std::move(t.exception()));
914 } else if (!ctx->threw) {
915 ctx->setPartialResult(i, t);
918 return ctx->p.getFuture();
921 // collect (variadic)
923 template <typename... Fs>
924 typename futures::detail::CollectVariadicContext<
925 typename std::decay<Fs>::type::value_type...>::type
926 collect(Fs&&... fs) {
927 auto ctx = std::make_shared<futures::detail::CollectVariadicContext<
928 typename std::decay<Fs>::type::value_type...>>();
929 futures::detail::collectVariadicHelper<
930 futures::detail::CollectVariadicContext>(ctx, std::forward<Fs>(fs)...);
931 return ctx->p.getFuture();
934 // collectAny (iterator)
936 template <class InputIterator>
941 std::iterator_traits<InputIterator>::value_type::value_type>>>
942 collectAny(InputIterator first, InputIterator last) {
944 typename std::iterator_traits<InputIterator>::value_type::value_type T;
946 struct CollectAnyContext {
947 CollectAnyContext() {}
948 Promise<std::pair<size_t, Try<T>>> p;
949 std::atomic<bool> done {false};
952 auto ctx = std::make_shared<CollectAnyContext>();
953 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
954 if (!ctx->done.exchange(true)) {
955 ctx->p.setValue(std::make_pair(i, std::move(t)));
958 return ctx->p.getFuture();
961 // collectAnyWithoutException (iterator)
963 template <class InputIterator>
966 typename std::iterator_traits<InputIterator>::value_type::value_type>>
967 collectAnyWithoutException(InputIterator first, InputIterator last) {
969 typename std::iterator_traits<InputIterator>::value_type::value_type T;
971 struct CollectAnyWithoutExceptionContext {
972 CollectAnyWithoutExceptionContext(){}
973 Promise<std::pair<size_t, T>> p;
974 std::atomic<bool> done{false};
975 std::atomic<size_t> nFulfilled{0};
979 auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
980 ctx->nTotal = size_t(std::distance(first, last));
982 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
983 if (!t.hasException() && !ctx->done.exchange(true)) {
984 ctx->p.setValue(std::make_pair(i, std::move(t.value())));
985 } else if (++ctx->nFulfilled == ctx->nTotal) {
986 ctx->p.setException(t.exception());
989 return ctx->p.getFuture();
992 // collectN (iterator)
994 template <class InputIterator>
995 Future<std::vector<std::pair<size_t, Try<typename
996 std::iterator_traits<InputIterator>::value_type::value_type>>>>
997 collectN(InputIterator first, InputIterator last, size_t n) {
999 std::iterator_traits<InputIterator>::value_type::value_type T;
1000 typedef std::vector<std::pair<size_t, Try<T>>> V;
1002 struct CollectNContext {
1004 std::atomic<size_t> completed = {0};
1007 auto ctx = std::make_shared<CollectNContext>();
1009 if (size_t(std::distance(first, last)) < n) {
1010 ctx->p.setException(std::runtime_error("Not enough futures"));
1012 // for each completed Future, increase count and add to vector, until we
1013 // have n completed futures at which point we fulfil our Promise with the
1015 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
1016 auto c = ++ctx->completed;
1018 assert(ctx->v.size() < n);
1019 ctx->v.emplace_back(i, std::move(t));
1021 ctx->p.setTry(Try<V>(std::move(ctx->v)));
1027 return ctx->p.getFuture();
1030 // reduce (iterator)
1032 template <class It, class T, class F>
1033 Future<T> reduce(It first, It last, T&& initial, F&& func) {
1034 if (first == last) {
1035 return makeFuture(std::move(initial));
1038 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
1039 typedef typename std::conditional<
1040 futures::detail::callableWith<F, T&&, Try<ItT>&&>::value,
1043 typedef isTry<Arg> IsTry;
1045 auto sfunc = std::make_shared<F>(std::move(func));
1047 auto f = first->then(
1048 [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
1050 std::move(minitial), head.template get<IsTry::value, Arg&&>());
1053 for (++first; first != last; ++first) {
1054 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
1055 return (*sfunc)(std::move(std::get<0>(t).value()),
1056 // Either return a ItT&& or a Try<ItT>&& depending
1057 // on the type of the argument of func.
1058 std::get<1>(t).template get<IsTry::value, Arg&&>());
1065 // window (collection)
1067 template <class Collection, class F, class ItT, class Result>
1068 std::vector<Future<Result>>
1069 window(Collection input, F func, size_t n) {
1070 struct WindowContext {
1071 WindowContext(Collection&& i, F&& fn)
1072 : input_(std::move(i)), promises_(input_.size()),
1073 func_(std::move(fn))
1075 std::atomic<size_t> i_ {0};
1077 std::vector<Promise<Result>> promises_;
1080 static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
1081 size_t i = ctx->i_++;
1082 if (i < ctx->input_.size()) {
1083 // Using setCallback_ directly since we don't need the Future
1084 ctx->func_(std::move(ctx->input_[i])).setCallback_(
1085 // ctx is captured by value
1086 [ctx, i](Try<Result>&& t) {
1087 ctx->promises_[i].setTry(std::move(t));
1088 // Chain another future onto this one
1089 spawn(std::move(ctx));
1095 auto max = std::min(n, input.size());
1097 auto ctx = std::make_shared<WindowContext>(
1098 std::move(input), std::move(func));
1100 for (size_t i = 0; i < max; ++i) {
1101 // Start the first n Futures
1102 WindowContext::spawn(ctx);
1105 std::vector<Future<Result>> futures;
1106 futures.reserve(ctx->promises_.size());
1107 for (auto& promise : ctx->promises_) {
1108 futures.emplace_back(promise.getFuture());
1117 template <class I, class F>
1118 Future<I> Future<T>::reduce(I&& initial, F&& func) {
1120 minitial = std::forward<I>(initial),
1121 mfunc = std::forward<F>(func)
1122 ](T& vals) mutable {
1123 auto ret = std::move(minitial);
1124 for (auto& val : vals) {
1125 ret = mfunc(std::move(ret), std::move(val));
1131 // unorderedReduce (iterator)
1133 template <class It, class T, class F, class ItT, class Arg>
1134 Future<T> unorderedReduce(It first, It last, T initial, F func) {
1135 if (first == last) {
1136 return makeFuture(std::move(initial));
1139 typedef isTry<Arg> IsTry;
1141 struct UnorderedReduceContext {
1142 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
1143 : lock_(), memo_(makeFuture<T>(std::move(memo))),
1144 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
1146 folly::MicroSpinLock lock_; // protects memo_ and numThens_
1149 size_t numThens_; // how many Futures completed and called .then()
1150 size_t numFutures_; // how many Futures in total
1151 Promise<T> promise_;
1154 auto ctx = std::make_shared<UnorderedReduceContext>(
1155 std::move(initial), std::move(func), std::distance(first, last));
1157 mapSetCallback<ItT>(
1160 [ctx](size_t /* i */, Try<ItT>&& t) {
1161 // Futures can be completed in any order, simultaneously.
1162 // To make this non-blocking, we create a new Future chain in
1163 // the order of completion to reduce the values.
1164 // The spinlock just protects chaining a new Future, not actually
1165 // executing the reduce, which should be really fast.
1166 folly::MSLGuard lock(ctx->lock_);
1168 ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
1169 // Either return a ItT&& or a Try<ItT>&& depending
1170 // on the type of the argument of func.
1171 return ctx->func_(std::move(v),
1172 mt.template get<IsTry::value, Arg&&>());
1174 if (++ctx->numThens_ == ctx->numFutures_) {
1175 // After reducing the value of the last Future, fulfill the Promise
1176 ctx->memo_.setCallback_(
1177 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
1181 return ctx->promise_.getFuture();
1187 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
1188 return within(dur, TimedOut(), tk);
1193 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
1196 Context(E ex) : exception(std::move(ex)), promise() {}
1198 Future<Unit> thisFuture;
1200 std::atomic<bool> token {false};
1203 if (this->isReady()) {
1204 return std::move(*this);
1207 std::shared_ptr<Timekeeper> tks;
1209 tks = folly::detail::getTimekeeperSingleton();
1210 tk = DCHECK_NOTNULL(tks.get());
1213 auto ctx = std::make_shared<Context>(std::move(e));
1215 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
1216 if (ctx->token.exchange(true) == false) {
1217 ctx->promise.setTry(std::move(t));
// Have the timekeeper hold ctx through a weak ptr,
// so that ctx can be deallocated as soon as the future job has finished.
1223 tk->after(dur).then([weakCtx = to_weak_ptr(ctx)](Try<Unit> const& t) mutable {
1224 auto lockedCtx = weakCtx.lock();
1226 // ctx already released. "this" completed first, cancel "after"
1229 // "after" completed first, cancel "this"
1230 lockedCtx->thisFuture.raise(TimedOut());
1231 if (lockedCtx->token.exchange(true) == false) {
1232 if (t.hasException()) {
1233 lockedCtx->promise.setException(std::move(t.exception()));
1235 lockedCtx->promise.setException(std::move(lockedCtx->exception));
1240 return ctx->promise.getFuture().via(this->getExecutor());
// Combines *this with futures::sleep(dur, tk) through collectAll, then
// re-wraps the first tuple element (the Try<T> produced by this future) with
// makeFuture<T>. The Try<Unit> from the sleep future is not read.
1246 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
1247 return collectAll(*this, futures::sleep(dur, tk))
1248 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
1249 Try<T>& t = std::get<0>(tup);
1250 return makeFuture<T>(std::move(t));
// Untimed wait helper, generic over the future type (T defaults to
// FutureType::value_type).
//
// Returns at once when f is already ready. Otherwise it installs a callback
// on f that posts a local baton; the callback captures the baton by
// reference. The trailing assert documents that f is expected to be ready
// by the end of the function.
1257 template <class FutureType, typename T = typename FutureType::value_type>
1258 void waitImpl(FutureType& f) {
1259 // short-circuit if there's nothing to do
1260 if (f.isReady()) return;
1262 FutureBatonType baton;
1263 f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1265 assert(f.isReady());
// Timed wait helper, generic over the future type (T defaults to
// FutureType::value_type).
//
// A callback installed on f forwards f's result into a separate promise,
// whose future is obtained as `ret`. The baton is heap-allocated and copied
// into the callback, so the callback holds its own reference to it rather
// than referring to this stack frame. When timed_wait(dur) reports success,
// f is asserted to be ready.
1268 template <class FutureType, typename T = typename FutureType::value_type>
1269 void waitImpl(FutureType& f, Duration dur) {
1270 // short-circuit if there's nothing to do
1276 auto ret = promise.getFuture();
1277 auto baton = std::make_shared<FutureBatonType>();
1278 f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1279 promise.setTry(std::move(t));
1283 if (baton->timed_wait(dur)) {
1284 assert(f.isReady());
// Wait helper for DrivableExecutor: rebinds f to an identity continuation
// scheduled via e, then loops until f reports ready. f is asserted to be
// ready on exit.
1289 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1290 // Set a callback to ensure that the via executor has something on it,
1291 // so that once the preceding future triggers this callback, drive will
1292 // always have a callback to satisfy it
1295 f = f.via(e).then([](T&& t) { return std::move(t); });
1296 while (!f.isReady()) {
1299 assert(f.isReady());
1302 } // namespace detail
1303 } // namespace futures
// Lvalue-qualified untimed wait: delegates to futures::detail::waitImpl
// on *this.
1306 SemiFuture<T>& SemiFuture<T>::wait() & {
1307 futures::detail::waitImpl(*this);
// Rvalue-qualified untimed wait: delegates to futures::detail::waitImpl
// on *this, then returns *this as an rvalue so the call can be chained on a
// temporary.
1312 SemiFuture<T>&& SemiFuture<T>::wait() && {
1313 futures::detail::waitImpl(*this);
1314 return std::move(*this);
// Lvalue-qualified timed wait: delegates to the (future, duration) overload
// of futures::detail::waitImpl on *this.
1318 SemiFuture<T>& SemiFuture<T>::wait(Duration dur) & {
1319 futures::detail::waitImpl(*this, dur);
// Rvalue-qualified timed wait: delegates to the (future, duration) overload
// of futures::detail::waitImpl on *this, then returns *this as an rvalue.
1324 SemiFuture<T>&& SemiFuture<T>::wait(Duration dur) && {
1325 futures::detail::waitImpl(*this, dur);
1326 return std::move(*this);
// Calls wait() and then moves the stored value out of the future and
// returns it by value.
1330 T SemiFuture<T>::get() {
1331 return std::move(wait().value());
// Timed get: when the future is ready, its value is moved out and returned
// by value.
1335 T SemiFuture<T>::get(Duration dur) {
1337 if (this->isReady()) {
1338 return std::move(this->value());
// Lvalue-qualified untimed wait: delegates to futures::detail::waitImpl
// on *this.
1345 Future<T>& Future<T>::wait() & {
1346 futures::detail::waitImpl(*this);
// Rvalue-qualified untimed wait: delegates to futures::detail::waitImpl
// on *this, then returns *this as an rvalue so the call can be chained on a
// temporary.
1351 Future<T>&& Future<T>::wait() && {
1352 futures::detail::waitImpl(*this);
1353 return std::move(*this);
// Lvalue-qualified timed wait: delegates to the (future, duration) overload
// of futures::detail::waitImpl on *this.
1357 Future<T>& Future<T>::wait(Duration dur) & {
1358 futures::detail::waitImpl(*this, dur);
// Rvalue-qualified timed wait: delegates to the (future, duration) overload
// of futures::detail::waitImpl on *this, then returns *this as an rvalue.
1363 Future<T>&& Future<T>::wait(Duration dur) && {
1364 futures::detail::waitImpl(*this, dur);
1365 return std::move(*this);
// Lvalue-qualified waitVia: delegates to futures::detail::waitViaImpl with
// *this and the drivable executor e.
1369 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1370 futures::detail::waitViaImpl(*this, e);
// Rvalue-qualified waitVia: delegates to futures::detail::waitViaImpl with
// *this and the drivable executor e, then returns *this as an rvalue.
1375 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1376 futures::detail::waitViaImpl(*this, e);
1377 return std::move(*this);
// Calls waitVia(e) and then moves the stored value out of the future and
// returns it by value.
1381 T Future<T>::getVia(DrivableExecutor* e) {
1382 return std::move(waitVia(e).value());
// Compares the values held by two Try<T> objects using T's operator==.
// value() is called on both arguments unconditionally; the caller in
// willEqual checks hasValue() on both before calling this.
1389 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1390 return t1.value() == t2.value();
1393 } // namespace detail
1394 } // namespace futures
// Produces a Future<bool> from the results of *this and f, gathered with
// collectAll. When both Try results hold a value, the continuation returns
// the outcome of futures::detail::TryEquals<T>::equals on them.
1397 Future<bool> Future<T>::willEqual(Future<T>& f) {
1398 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1399 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1400 return futures::detail::TryEquals<T>::equals(
1401 std::get<0>(t), std::get<1>(t));
// Chains a continuation that receives the value by copy and evaluates the
// captured predicate on a const reference to it. When the predicate returns
// false, throwPredicateDoesNotObtain() is called.
1410 Future<T> Future<T>::filter(F&& predicate) {
1411 return this->then([p = std::forward<F>(predicate)](T val) {
// The predicate only ever sees the value through a const reference.
1412 T const& valConstRef = val;
1413 if (!p(valConstRef)) {
1414 throwPredicateDoesNotObtain();
// Conditionally runs thunk: when p is true, thunk is invoked and its result
// is converted with .unit(); when p is false, thunk is not invoked and
// makeFuture() is returned instead.
1421 inline Future<Unit> when(bool p, F&& thunk) {
1422 return p ? std::forward<F>(thunk)().unit() : makeFuture();
// Asynchronous loop built by recursion: thunk is invoked, and a continuation
// on the future it returns calls whileDo again with the predicate and thunk
// that were captured into the lambda. The trailing return yields
// makeFuture().
1425 template <class P, class F>
1426 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1428 auto future = thunk();
1429 return future.then([
1430 predicate = std::forward<P>(predicate),
1431 thunk = std::forward<F>(thunk)
1433 return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1436 return makeFuture();
// Runs thunk through folly::whileDo with a counting predicate. The predicate
// owns a heap-allocated atomic counter starting at 0 and returns true while
// the counter's value before the increment is less than n.
1440 Future<Unit> times(const int n, F&& thunk) {
1441 return folly::whileDo(
// The counter lives behind a unique_ptr, presumably so the lambda stays
// movable (std::atomic itself is neither copyable nor movable).
1442 [ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
1443 return count->fetch_add(1) < n;
1445 std::forward<F>(thunk));
// For each element in [first, last), calls it->then(func) and appends the
// resulting future to `results`. func is taken by value and passed to every
// then() call.
1449 template <class It, class F, class ItT, class Result>
1450 std::vector<Future<Result>> map(It first, It last, F func) {
1451 std::vector<Future<Result>> results;
1452 for (auto it = first; it != last; it++) {
1453 results.push_back(it->then(func));
1459 // Instantiate the most common Future types to save compile time
// These are explicit instantiation declarations: translation units that
// include this header do not implicitly instantiate the listed
// specializations, so matching explicit instantiation definitions must exist
// elsewhere in the build.
1460 extern template class Future<Unit>;
1461 extern template class Future<bool>;
1462 extern template class Future<int>;
1463 extern template class Future<int64_t>;
1464 extern template class Future<std::string>;
1465 extern template class Future<double>;
1466 } // namespace folly