2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/executors/InlineExecutor.h>
27 #include <folly/futures/Timekeeper.h>
28 #include <folly/futures/detail/Core.h>
30 #ifndef FOLLY_FUTURE_USING_FIBER
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/fibers/Baton.h>
// Baton type used by the blocking wait helpers further down in this file:
// the fiber baton when FOLLY_FUTURE_USING_FIBER is non-zero, folly::Baton<>
// in the other preprocessor branch.
45 #if FOLLY_FUTURE_USING_FIBER
46 typedef folly::fibers::Baton FutureBatonType;
48 typedef folly::Baton<> FutureBatonType;
51 } // namespace futures
// Forward declaration only; Future<T>::within() calls this to obtain a
// Timekeeper. The definition lives outside this file section.
54 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
59 // Guarantees that the stored functor is destructed before the stored promise
60 // may be fulfilled. Assumes the stored functor to be noexcept-destructible.
//
// "Before the barrier" means the stored promise is not yet fulfilled (see
// before_barrier()). The functor is only accessed in that state.
61 template <typename T, typename F>
62 class CoreCallbackState {
// Takes ownership of `promise` and builds the functor from `func`.
// noexcept exactly when constructing F from FF is noexcept.
64 template <typename FF>
65 CoreCallbackState(Promise<T>&& promise, FF&& func) noexcept(
66 noexcept(F(std::declval<FF>())))
67 : func_(std::forward<FF>(func)), promise_(std::move(promise)) {
68 assert(before_barrier());
// Move constructor. The functor is placement-new move-constructed and the
// promise is stolen only while `that` is still before the barrier; otherwise
// promise_ keeps its in-class initializer (an empty promise).
71 CoreCallbackState(CoreCallbackState&& that) noexcept(
72 noexcept(F(std::declval<F>()))) {
73 if (that.before_barrier()) {
74 new (&func_) F(std::move(that.func_));
75 promise_ = that.stealPromise();
// Move assignment is intentionally unavailable.
79 CoreCallbackState& operator=(CoreCallbackState&&) = delete;
// NOTE(review): the destructor acts only while still before the barrier;
// presumably it disposes of the functor/promise via stealPromise() - confirm.
81 ~CoreCallbackState() {
82 if (before_barrier()) {
// Calls the stored functor as an rvalue with the forwarded arguments.
// Must only be used before the barrier (asserted).
87 template <typename... Args>
88 auto invoke(Args&&... args) noexcept(
89 noexcept(std::declval<F&&>()(std::declval<Args&&>()...))) {
90 assert(before_barrier());
91 return std::move(func_)(std::forward<Args>(args)...);
// Same as invoke(), but routed through makeTryWith so the outcome is
// returned as a Try; declared noexcept.
94 template <typename... Args>
95 auto tryInvoke(Args&&... args) noexcept {
96 return makeTryWith([&] { return invoke(std::forward<Args>(args)...); });
// Fulfils the stolen promise with the given Try.
99 void setTry(Try<T>&& t) {
100 stealPromise().setTry(std::move(t));
// Fulfils the stolen promise with the given exception.
103 void setException(exception_wrapper&& ew) {
104 stealPromise().setException(std::move(ew));
// Hands the promise to the caller; asserts that this is still before the
// barrier.
// NOTE(review): the class comment implies the functor is destroyed here
// before the promise is returned - confirm against the full source.
107 Promise<T> stealPromise() noexcept {
108 assert(before_barrier());
110 return std::move(promise_);
// True while the stored promise has not been fulfilled.
114 bool before_barrier() const noexcept {
115 return !promise_.isFulfilled();
// Starts out as an empty promise until a constructor assigns a real one.
121 Promise<T> promise_{Promise<T>::makeEmpty()};
// Factory for CoreCallbackState<T, decay_t<F>>: moves the promise in and
// forwards the functor. noexcept exactly when that construction is noexcept.
124 template <typename T, typename F>
125 inline auto makeCoreCallbackState(Promise<T>&& p, F&& f) noexcept(
126 noexcept(CoreCallbackState<T, _t<std::decay<F>>>(
127 std::declval<Promise<T>&&>(),
128 std::declval<F&&>()))) {
129 return CoreCallbackState<T, _t<std::decay<F>>>(
130 std::move(p), std::forward<F>(f));
// Move-constructs from a SemiFuture: adopts its core pointer and nulls the
// source so only one object refers to the core.
134 FutureBase<T>::FutureBase(SemiFuture<T>&& other) noexcept : core_(other.core_) {
135 other.core_ = nullptr;
// Move-constructs from a Future: adopts its core pointer and nulls the
// source so only one object refers to the core.
139 FutureBase<T>::FutureBase(Future<T>&& other) noexcept : core_(other.core_) {
140 other.core_ = nullptr;
// Constructs a future from a value: allocates a new Core holding
// Try<T>(val), with val perfectly forwarded.
144 template <class T2, typename>
145 FutureBase<T>::FutureBase(T2&& val)
146 : core_(new futures::detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
// Enabled only when T2 is Unit (the pointer parameter exists purely for
// SFINAE). Allocates a new Core holding a default-constructed T.
149 template <typename T2>
150 FutureBase<T>::FutureBase(
151 typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
152 : core_(new futures::detail::Core<T>(Try<T>(T()))) {}
// In-place constructor, enabled when T is constructible from Args&&...:
// the new Core is built with in_place and the forwarded arguments.
157 typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
159 FutureBase<T>::FutureBase(in_place_t, Args&&... args)
161 new futures::detail::Core<T>(in_place, std::forward<Args>(args)...)) {
// Exchanges the core pointers of *this and other; after the call `other`
// refers to whatever core *this held before.
165 template <class FutureType>
166 void FutureBase<T>::assign(FutureType& other) noexcept {
167 std::swap(core_, other.core_);
171 FutureBase<T>::~FutureBase() {
// Lvalue accessor: returns a reference to the value inside the core's Try.
176 T& FutureBase<T>::value() & {
179 return core_->getTry().value();
// Const lvalue accessor: returns a const reference to the value inside the
// core's Try.
183 T const& FutureBase<T>::value() const& {
186 return core_->getTry().value();
// Rvalue accessor: returns the value inside the core's Try as an rvalue
// reference so the caller can move from it.
190 T&& FutureBase<T>::value() && {
193 return std::move(core_->getTry().value());
// Const rvalue accessor: returns the value inside the core's Try as a const
// rvalue reference.
197 T const&& FutureBase<T>::value() const&& {
200 return std::move(core_->getTry().value());
// Rvalue-qualified via(): sets the executor and priority on this future and
// builds a Future<T> around the same core.
204 inline Future<T> FutureBase<T>::via(Executor* executor, int8_t priority) && {
207 setExecutor(executor, priority);
209 auto newFuture = Future<T>(core_);
// Reports whether the core is ready (delegates to Core::ready()).
215 bool FutureBase<T>::isReady() const {
217 return core_->ready();
// True when the Try returned by getTry() holds a value.
221 bool FutureBase<T>::hasValue() {
222 return getTry().hasValue();
// True when the Try returned by getTry() holds an exception.
226 bool FutureBase<T>::hasException() {
227 return getTry().hasException();
// Tells the core that this future no longer refers to it (detachFuture()).
231 void FutureBase<T>::detach() {
233 core_->detachFuture();
// Returns a reference to the Try stored in the core.
239 Try<T>& FutureBase<T>::getTry() {
242 return core_->getTry();
246 void FutureBase<T>::throwIfInvalid() const {
// Non-blocking check: when the core is ready, its Try is moved into the
// Optional `o` (so the core's Try is left moved-from).
253 Optional<Try<T>> FutureBase<T>::poll() {
255 if (core_->ready()) {
256 o = std::move(core_->getTry());
// Forwards the exception to the core's raise(); taken by value and moved on.
262 void FutureBase<T>::raise(exception_wrapper exception) {
263 core_->raise(std::move(exception));
// Installs func as the core's callback, perfectly forwarding it.
268 void FutureBase<T>::setCallback_(F&& func) {
270 core_->setCallback(std::forward<F>(func));
274 FutureBase<T>::FutureBase(futures::detail::EmptyConstruct) noexcept
279 // Variant: returns a value
280 // e.g. f.then([](Try<T>&& t){ return t.value(); });
//
// Outline: a fresh promise `p` inherits this future's interrupt handler, the
// future `f` obtained from it inherits this future's executor, and a
// continuation owning both `p` and `func` (through CoreCallbackState) is
// created. When !isTry and the incoming Try holds an exception, the exception
// is passed straight to the promise and func is not invoked; otherwise the
// promise receives the Try produced by invoking func inside makeTryWith.
282 template <typename F, typename R, bool isTry, typename... Args>
283 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
284 FutureBase<T>::thenImplementation(
286 futures::detail::argResult<isTry, F, Args...>) {
287 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
288 typedef typename R::ReturnsFuture::Inner B;
290 this->throwIfInvalid();
293 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
295 // grab the Future now before we lose our handle on the Promise
296 auto f = p.getFuture();
297 f.core_->setExecutorNoLock(this->getExecutor());
299 /* This is a bit tricky.
301 We can't just close over *this in case this Future gets moved. So we
302 make a new dummy Future. We could figure out something more
303 sophisticated that avoids making a new Future object when it can, as an
304 optimization. But this is correct.
306 core_ can't be moved, it is explicitly disallowed (as is copying). But
307 if there's ever a reason to allow it, this is one place that makes that
308 assumption and would need to be fixed. We use a standard shared pointer
309 for core_ (by copying it in), which means in essence obj holds a shared
310 pointer to itself. But this shouldn't leak because Promise will not
311 outlive the continuation, because Promise will setException() with a
312 broken Promise if it is destructed before completed. We could use a
313 weak pointer but it would have to be converted to a shared pointer when
314 func is executed (because the Future returned by func may possibly
315 persist beyond the callback, if it gets moved), and so it is an
316 optimization to just make it shared from the get-go.
318 Two subtle but important points about this design. futures::detail::Core
319 has no back pointers to Future or Promise, so if Future or Promise get
320 moved (and they will be moved in performant code) we don't have to do
321 anything fancy. And because we store the continuation in the
322 futures::detail::Core, not in the Future, we can execute the continuation
323 even after the Future has gone out of scope. This is an intentional design
324 decision. It is likely we will want to be able to cancel a continuation
325 in some circumstances, but I think it should be explicit not implicit
326 in the destruction of the Future used to create it.
329 [state = futures::detail::makeCoreCallbackState(
330 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
332 if (!isTry && t.hasException()) {
333 state.setException(std::move(t.exception()));
335 state.setTry(makeTryWith(
336 [&] { return state.invoke(t.template get<isTry, Args>()...); }));
342 // Variant: returns a Future
343 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
//
// Same setup as the value-returning variant (new promise inherits the
// interrupt handler, new future inherits the executor). The continuation
// differs: func is run through tryInvoke(); if that Try holds an exception it
// goes to the promise, otherwise the promise is stolen from `state` and
// handed to a callback on the returned future, which forwards that future's
// Try<B> into it.
345 template <typename F, typename R, bool isTry, typename... Args>
346 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
347 FutureBase<T>::thenImplementation(
349 futures::detail::argResult<isTry, F, Args...>) {
350 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
351 typedef typename R::ReturnsFuture::Inner B;
352 this->throwIfInvalid();
355 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
357 // grab the Future now before we lose our handle on the Promise
358 auto f = p.getFuture();
359 f.core_->setExecutorNoLock(this->getExecutor());
362 [state = futures::detail::makeCoreCallbackState(
363 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
// Upstream failure with a value-taking callback: skip func entirely.
364 if (!isTry && t.hasException()) {
365 state.setException(std::move(t.exception()));
367 auto tf2 = state.tryInvoke(t.template get<isTry, Args>()...);
368 if (tf2.hasException()) {
369 state.setException(std::move(tf2.exception()));
371 tf2->setCallback_([p = state.stealPromise()](Try<B> && b) mutable {
372 p.setTry(std::move(b));
380 } // namespace detail
381 } // namespace futures
// Wraps a value into a Try of the decayed type and delegates to the
// Try-taking makeSemiFuture overload.
384 SemiFuture<typename std::decay<T>::type> makeSemiFuture(T&& t) {
385 return makeSemiFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
388 // makeSemiFutureWith(SemiFuture<T>()) -> SemiFuture<T>
//
// Selected when func() itself returns a SemiFuture. func is invoked and its
// result returned as-is. A thrown std::exception is wrapped together with
// the current exception_ptr; the remaining handler wraps the exception_ptr
// alone. Either way the result is a SemiFuture of the inner type carrying
// that exception_wrapper.
390 typename std::enable_if<
391 isSemiFuture<typename std::result_of<F()>::type>::value,
392 typename std::result_of<F()>::type>::type
393 makeSemiFutureWith(F&& func) {
395 typename isSemiFuture<typename std::result_of<F()>::type>::Inner;
397 return std::forward<F>(func)();
398 } catch (std::exception& e) {
399 return makeSemiFuture<InnerType>(
400 exception_wrapper(std::current_exception(), e));
402 return makeSemiFuture<InnerType>(
403 exception_wrapper(std::current_exception()));
407 // makeSemiFutureWith(T()) -> SemiFuture<T>
408 // makeSemiFutureWith(void()) -> SemiFuture<Unit>
//
// Selected when func() does not return a SemiFuture. The result type is
// lifted through Unit::LiftT, and the call is made inside makeTryWith so the
// SemiFuture is built from the resulting Try.
410 typename std::enable_if<
411 !(isSemiFuture<typename std::result_of<F()>::type>::value),
412 SemiFuture<Unit::LiftT<typename std::result_of<F()>::type>>>::type
413 makeSemiFutureWith(F&& func) {
414 using LiftedResult = Unit::LiftT<typename std::result_of<F()>::type>;
415 return makeSemiFuture<LiftedResult>(
416 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
// Builds a SemiFuture from a Try<T> constructed with the given exception_ptr.
420 SemiFuture<T> makeSemiFuture(std::exception_ptr const& e) {
421 return makeSemiFuture(Try<T>(e));
// Builds a SemiFuture from a Try<T> that takes ownership of the
// exception_wrapper (passed by value, then moved).
425 SemiFuture<T> makeSemiFuture(exception_wrapper ew) {
426 return makeSemiFuture(Try<T>(std::move(ew)));
// Overload for exception objects: participates only when E derives from
// std::exception. A copy of e is wrapped via make_exception_wrapper<E>.
429 template <class T, class E>
431 enable_if<std::is_base_of<std::exception, E>::value, SemiFuture<T>>::type
432 makeSemiFuture(E const& e) {
433 return makeSemiFuture(Try<T>(make_exception_wrapper<E>(e)));
// Base overload the others funnel into: allocates a new Core<T> that takes
// the Try by move and wraps it in a SemiFuture.
437 SemiFuture<T> makeSemiFuture(Try<T>&& t) {
438 return SemiFuture<T>(new futures::detail::Core<T>(std::move(t)));
441 // This must be defined after the constructors to avoid a bug in MSVC
442 // https://connect.microsoft.com/VisualStudio/feedback/details/3142777/out-of-line-constructor-definition-after-implicit-reference-causes-incorrect-c2244
// Zero-argument form: a SemiFuture<Unit> built from Unit{}.
443 inline SemiFuture<Unit> makeSemiFuture() {
444 return makeSemiFuture(Unit{});
// Creates a SemiFuture through the EmptyConstruct tag constructor.
448 SemiFuture<T> SemiFuture<T>::makeEmpty() {
449 return SemiFuture<T>(futures::detail::EmptyConstruct{});
// Move constructor: delegates entirely to the FutureBase move constructor.
453 SemiFuture<T>::SemiFuture(SemiFuture<T>&& other) noexcept
454 : futures::detail::FutureBase<T>(std::move(other)) {}
// Converting move constructor from Future<T>: adopts the core through
// FutureBase, then clears the executor inherited from the Future.
457 SemiFuture<T>::SemiFuture(Future<T>&& other) noexcept
458 : futures::detail::FutureBase<T>(std::move(other)) {
459 // SemiFuture should not have an executor on construction
461 this->setExecutor(nullptr);
466 SemiFuture<T>& SemiFuture<T>::operator=(SemiFuture<T>&& other) noexcept {
// Move-assignment from a Future<T>; afterwards the executor is cleared, the
// same way the converting constructor does.
472 SemiFuture<T>& SemiFuture<T>::operator=(Future<T>&& other) noexcept {
474 // SemiFuture should not have an executor after assignment from a Future
476 this->setExecutor(nullptr);
// Creates a Future through the EmptyConstruct tag constructor.
482 Future<T> Future<T>::makeEmpty() {
483 return Future<T>(futures::detail::EmptyConstruct{});
// Move constructor: delegates entirely to the FutureBase move constructor.
487 Future<T>::Future(Future<T>&& other) noexcept
488 : futures::detail::FutureBase<T>(std::move(other)) {}
491 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
// Converting move constructor for T2 != T where T is constructible from T2&&
// AND T2&& is implicitly convertible to T. Implemented by chaining a
// continuation that builds T from the moved T2 value.
499 typename std::enable_if<
500 !std::is_same<T, typename std::decay<T2>::type>::value &&
501 std::is_constructible<T, T2&&>::value &&
502 std::is_convertible<T2&&, T>::value,
504 Future<T>::Future(Future<T2>&& other)
505 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
// Converting move constructor for T2 != T where T is constructible from T2&&
// but T2&& is NOT implicitly convertible to T. The body is identical to the
// convertible overload; only the enable_if condition differs.
510 typename std::enable_if<
511 !std::is_same<T, typename std::decay<T2>::type>::value &&
512 std::is_constructible<T, T2&&>::value &&
513 !std::is_convertible<T2&&, T>::value,
515 Future<T>::Future(Future<T2>&& other)
516 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
// Converting move assignment for T2 != T where T is constructible from T2&&.
// Uses the same conversion continuation as the converting constructors.
521 typename std::enable_if<
522 !std::is_same<T, typename std::decay<T2>::type>::value &&
523 std::is_constructible<T, T2&&>::value,
525 Future<T>& Future<T>::operator=(Future<T2>&& other) {
527 std::move(other).then([](T2&& v) { return T(std::move(v)); }));
// Enabled when F is a future type. Collapses one level of nesting by
// chaining a continuation that simply returns the inner future.
535 enable_if<isFuture<F>::value, Future<typename isFuture<T>::Inner>>::type
536 Future<T>::unwrap() {
537 return then([](Future<typename isFuture<T>::Inner> internal_future) {
538 return internal_future;
// Lvalue-qualified via(): chains a continuation on *this that forwards the
// result into a promise `p`, then returns the future obtained from `p`,
// moved onto the requested executor and priority.
543 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
544 this->throwIfInvalid();
546 auto f = p.getFuture();
// The continuation owns the promise (move-captured).
547 auto func = [p = std::move(p)](Try<T>&& t) mutable {
548 p.setTry(std::move(t));
// Calls thenImplementation directly with the deduced callable traits.
550 using R = futures::detail::callableResult<T, decltype(func)>;
551 this->template thenImplementation<decltype(func), R>(
552 std::move(func), typename R::Arg());
553 return std::move(f).via(executor, priority);
// then() overload for a pointer-to-member-function plus an instance.
// The instance is captured as a raw pointer, so the caller must keep
// *instance alive until the continuation has run.
556 template <typename T>
557 template <typename R, typename Caller, typename... Args>
558 Future<typename isFuture<R>::Inner>
559 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
// FirstArg: first parameter type of the member function with reference and
// cv-qualifiers stripped; used to decide whether it takes a Try.
560 typedef typename std::remove_cv<typename std::remove_reference<
561 typename futures::detail::ArgType<Args...>::FirstArg>::type>::type
564 return then([instance, func](Try<T>&& t){
565 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
// Argument-less then(): chains an empty lambda, producing a Future<Unit>.
570 Future<Unit> Future<T>::then() {
571 return then([] () {});
574 // onError where the callback returns T
//
// Selected when func is callable with neither exception_wrapper nor
// exception_wrapper& and does not return a future. The continuation asks the
// Try for an exception object of the handler's (reference-stripped) first
// argument type: on a match func runs inside makeTryWith and its Try
// fulfils the promise; otherwise the original Try is passed through.
577 typename std::enable_if<
578 !futures::detail::callableWith<F, exception_wrapper>::value &&
579 !futures::detail::callableWith<F, exception_wrapper&>::value &&
580 !futures::detail::Extract<F>::ReturnsFuture::value,
582 Future<T>::onError(F&& func) {
583 typedef std::remove_reference_t<
584 typename futures::detail::Extract<F>::FirstArg>
587 std::is_same<typename futures::detail::Extract<F>::RawReturn, T>::value,
588 "Return type of onError callback must be T or Future<T>");
// The new promise inherits this future's interrupt handler.
591 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
592 auto f = p.getFuture();
595 [state = futures::detail::makeCoreCallbackState(
596 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
597 if (auto e = t.template tryGetExceptionObject<Exn>()) {
598 state.setTry(makeTryWith([&] { return state.invoke(*e); }));
600 state.setTry(std::move(t));
607 // onError where the callback returns Future<T>
//
// Selected when func is callable with neither exception_wrapper nor
// exception_wrapper& and returns a future. On a matching exception type func
// is run through tryInvoke(): a failure there goes to the promise, otherwise
// the stolen promise is fulfilled from the returned future's callback. A
// non-matching Try is passed through unchanged.
610 typename std::enable_if<
611 !futures::detail::callableWith<F, exception_wrapper>::value &&
612 !futures::detail::callableWith<F, exception_wrapper&>::value &&
613 futures::detail::Extract<F>::ReturnsFuture::value,
615 Future<T>::onError(F&& func) {
617 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
619 "Return type of onError callback must be T or Future<T>");
620 typedef std::remove_reference_t<
621 typename futures::detail::Extract<F>::FirstArg>
625 auto f = p.getFuture();
628 [state = futures::detail::makeCoreCallbackState(
629 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
630 if (auto e = t.template tryGetExceptionObject<Exn>()) {
631 auto tf2 = state.tryInvoke(*e);
632 if (tf2.hasException()) {
633 state.setException(std::move(tf2.exception()));
635 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
636 p.setTry(std::move(t3));
640 state.setTry(std::move(t));
// Chains a continuation that owns func (as funcw) and re-wraps the incoming
// Try with makeFuture, so the original outcome is what the returned future
// carries.
// NOTE(review): the statement that actually calls funcw is not among the
// lines shown here - confirm it runs before the Try is forwarded.
649 Future<T> Future<T>::ensure(F&& func) {
650 return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
652 return makeFuture(std::move(t));
// Applies within(dur, tk) and attaches an onError handler for TimedOut that
// invokes func (as an rvalue) and returns its result.
658 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
659 return within(dur, tk).onError([funcw = std::forward<F>(func)](
660 TimedOut const&) { return std::move(funcw)(); });
// onError(exception_wrapper) where the callback returns Future<T>.
// Any exception triggers the handler (no type matching): func receives the
// moved exception_wrapper through tryInvoke(). A failure of func itself goes
// to the promise; otherwise the stolen promise is fulfilled from the
// returned future. A Try without an exception is passed through.
// Note the continuation takes the Try by value here.
665 typename std::enable_if<
666 futures::detail::callableWith<F, exception_wrapper>::value &&
667 futures::detail::Extract<F>::ReturnsFuture::value,
669 Future<T>::onError(F&& func) {
671 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
673 "Return type of onError callback must be T or Future<T>");
676 auto f = p.getFuture();
678 [state = futures::detail::makeCoreCallbackState(
679 std::move(p), std::forward<F>(func))](Try<T> t) mutable {
680 if (t.hasException()) {
681 auto tf2 = state.tryInvoke(std::move(t.exception()));
682 if (tf2.hasException()) {
683 state.setException(std::move(tf2.exception()));
685 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
686 p.setTry(std::move(t3));
690 state.setTry(std::move(t));
697 // onError(exception_wrapper) that returns T
//
// Any exception triggers the handler: func receives the moved
// exception_wrapper inside makeTryWith and the resulting Try fulfils the
// promise. A Try without an exception is passed through.
// NOTE(review): the static_assert compares Extract<F>::Return (not
// RawReturn) with Future<T> even though this variant returns T - presumably
// Return is the future-wrapped type; confirm against Extract's definition.
700 typename std::enable_if<
701 futures::detail::callableWith<F, exception_wrapper>::value &&
702 !futures::detail::Extract<F>::ReturnsFuture::value,
704 Future<T>::onError(F&& func) {
706 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
708 "Return type of onError callback must be T or Future<T>");
711 auto f = p.getFuture();
713 [state = futures::detail::makeCoreCallbackState(
714 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
715 if (t.hasException()) {
716 state.setTry(makeTryWith(
717 [&] { return state.invoke(std::move(t.exception())); }));
719 state.setTry(std::move(t));
// Calls waitVia(e) and then returns a reference to the resulting Try.
727 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
728 return waitVia(e).getTry();
// Convenience: starts a future on executor x via via(x) and chains func on
// it. The result type is the Inner type of whatever func() returns.
731 template <class Func>
732 auto via(Executor* x, Func&& func)
733 -> Future<typename isFuture<decltype(std::declval<Func>()())>::Inner> {
734 // TODO make this actually more performant. :-P #7260175
735 return via(x).then(std::forward<Func>(func));
// Wraps a value into a Try of the decayed type and delegates to the
// Try-taking makeFuture overload.
741 Future<typename std::decay<T>::type> makeFuture(T&& t) {
742 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
// Zero-argument form: a Future<Unit> built from Unit{}.
745 inline Future<Unit> makeFuture() {
746 return makeFuture(Unit{});
749 // makeFutureWith(Future<T>()) -> Future<T>
//
// Selected when func() itself returns a Future. func is invoked and its
// result returned as-is. A thrown std::exception is wrapped together with
// the current exception_ptr; the remaining handler wraps the exception_ptr
// alone. Either way the result is a Future of the inner type carrying that
// exception_wrapper.
751 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
752 typename std::result_of<F()>::type>::type
753 makeFutureWith(F&& func) {
755 typename isFuture<typename std::result_of<F()>::type>::Inner;
757 return std::forward<F>(func)();
758 } catch (std::exception& e) {
759 return makeFuture<InnerType>(
760 exception_wrapper(std::current_exception(), e));
762 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
766 // makeFutureWith(T()) -> Future<T>
767 // makeFutureWith(void()) -> Future<Unit>
//
// Selected when func() does not return a Future. The result type is lifted
// through Unit::LiftT, and the call is made inside makeTryWith so the Future
// is built from the resulting Try.
769 typename std::enable_if<
770 !(isFuture<typename std::result_of<F()>::type>::value),
771 Future<Unit::LiftT<typename std::result_of<F()>::type>>>::type
772 makeFutureWith(F&& func) {
773 using LiftedResult = Unit::LiftT<typename std::result_of<F()>::type>;
774 return makeFuture<LiftedResult>(
775 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
// Builds a Future from a Try<T> constructed with the given exception_ptr.
779 Future<T> makeFuture(std::exception_ptr const& e) {
780 return makeFuture(Try<T>(e));
// Builds a Future from a Try<T> that takes ownership of the
// exception_wrapper (passed by value, then moved).
784 Future<T> makeFuture(exception_wrapper ew) {
785 return makeFuture(Try<T>(std::move(ew)));
// Overload for exception objects: participates only when E derives from
// std::exception. A copy of e is wrapped via make_exception_wrapper<E>.
788 template <class T, class E>
789 typename std::enable_if<std::is_base_of<std::exception, E>::value,
791 makeFuture(E const& e) {
792 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
// Base overload the others funnel into: allocates a new Core<T> that takes
// the Try by move and wraps it in a Future.
796 Future<T> makeFuture(Try<T>&& t) {
797 return Future<T>(new futures::detail::Core<T>(std::move(t)));
// Returns a Future<Unit> created by makeFuture() and bound to the given
// executor and priority.
801 Future<Unit> via(Executor* executor, int8_t priority) {
802 return makeFuture().via(executor, priority);
805 // mapSetCallback calls func(i, Try<T>) when every future completes
//
// More precisely: one callback is installed per future in [first, last).
// Each callback holds its own copy of func plus the zero-based position i of
// its future, and calls func(i, std::move(t)) with that future's Try.
807 template <class T, class InputIterator, class F>
808 void mapSetCallback(InputIterator first, InputIterator last, F func) {
809 for (size_t i = 0; first != last; ++first, ++i) {
810 first->setCallback_([func, i](Try<T>&& t) {
811 func(i, std::move(t));
816 // collectAll (variadic)
//
// Creates a shared CollectAllVariadicContext over the value types of the
// given futures, hands it and the forwarded futures to
// collectVariadicHelper, and returns the future of the context's promise.
818 template <typename... Fs>
819 typename futures::detail::CollectAllVariadicContext<
820 typename std::decay<Fs>::type::value_type...>::type
821 collectAll(Fs&&... fs) {
822 auto ctx = std::make_shared<futures::detail::CollectAllVariadicContext<
823 typename std::decay<Fs>::type::value_type...>>();
824 futures::detail::collectVariadicHelper<
825 futures::detail::CollectAllVariadicContext>(ctx, std::forward<Fs>(fs)...);
826 return ctx->p.getFuture();
829 // collectAll (iterator)
//
// Returns a future of one Try per input future, in input order. Each
// callback writes its Try into results[i]. The promise is fulfilled from the
// context's destructor, i.e. once the last shared_ptr copy held by the
// callbacks (and by this function) has been released.
831 template <class InputIterator>
834 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
835 collectAll(InputIterator first, InputIterator last) {
837 typename std::iterator_traits<InputIterator>::value_type::value_type T;
839 struct CollectAllContext {
// Pre-sizes results so callbacks can assign by index.
840 CollectAllContext(size_t n) : results(n) {}
841 ~CollectAllContext() {
842 p.setValue(std::move(results));
844 Promise<std::vector<Try<T>>> p;
845 std::vector<Try<T>> results;
849 std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
850 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
851 ctx->results[i] = std::move(t);
853 return ctx->p.getFuture();
856 // collect (iterator)
//
// Shared state for collect(first, last). For non-void T the partial results
// are kept as Optional<T> slots and converted to a plain std::vector<T> when
// the promise is fulfilled with a value; for void T placeholder types are
// selected instead (their definitions are only partly visible here).
861 template <typename T>
862 struct CollectContext {
864 explicit Nothing(int /* n */) {}
867 using Result = typename std::conditional<
868 std::is_void<T>::value,
870 std::vector<T>>::type;
872 using InternalResult = typename std::conditional<
873 std::is_void<T>::value,
875 std::vector<Optional<T>>>::type;
877 explicit CollectContext(size_t n) : result(n) {}
// Success path: runs only if `threw` was not already set. exchange(true)
// also marks the context as finished.
// NOTE(review): presumably this is the destructor body - confirm.
879 if (!threw.exchange(true)) {
880 // map Optional<T> -> T
881 std::vector<T> finalResult;
882 finalResult.reserve(result.size());
883 std::transform(result.begin(), result.end(),
884 std::back_inserter(finalResult),
885 [](Optional<T>& o) { return std::move(o.value()); });
886 p.setValue(std::move(finalResult));
// Stores the value of future i by moving it out of its Try.
889 inline void setPartialResult(size_t i, Try<T>& t) {
890 result[i] = std::move(t.value());
893 InternalResult result;
// Set by whichever path fulfils the promise first (exception or value).
894 std::atomic<bool> threw {false};
897 } // namespace detail
898 } // namespace futures
// collect over an iterator range. The first Try that carries an exception
// wins the `threw` flag and fulfils the promise with that exception. Values
// are recorded by index for as long as no exception has been seen.
900 template <class InputIterator>
901 Future<typename futures::detail::CollectContext<typename std::iterator_traits<
902 InputIterator>::value_type::value_type>::Result>
903 collect(InputIterator first, InputIterator last) {
905 typename std::iterator_traits<InputIterator>::value_type::value_type T;
907 auto ctx = std::make_shared<futures::detail::CollectContext<T>>(
908 std::distance(first, last));
909 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
910 if (t.hasException()) {
// exchange(true) returns the previous flag, so only the first failing
// callback sets the exception.
911 if (!ctx->threw.exchange(true)) {
912 ctx->p.setException(std::move(t.exception()));
914 } else if (!ctx->threw) {
915 ctx->setPartialResult(i, t);
918 return ctx->p.getFuture();
921 // collect (variadic)
//
// Creates a shared CollectVariadicContext over the value types of the given
// futures, hands it and the forwarded futures to collectVariadicHelper, and
// returns the future of the context's promise.
923 template <typename... Fs>
924 typename futures::detail::CollectVariadicContext<
925 typename std::decay<Fs>::type::value_type...>::type
926 collect(Fs&&... fs) {
927 auto ctx = std::make_shared<futures::detail::CollectVariadicContext<
928 typename std::decay<Fs>::type::value_type...>>();
929 futures::detail::collectVariadicHelper<
930 futures::detail::CollectVariadicContext>(ctx, std::forward<Fs>(fs)...);
931 return ctx->p.getFuture();
934 // collectAny (iterator)
//
// The first future to complete (value or exception) fulfils the promise with
// the pair (its index, its Try). The atomic `done` flag makes every later
// completion a no-op.
936 template <class InputIterator>
941 std::iterator_traits<InputIterator>::value_type::value_type>>>
942 collectAny(InputIterator first, InputIterator last) {
944 typename std::iterator_traits<InputIterator>::value_type::value_type T;
946 struct CollectAnyContext {
947 CollectAnyContext() {}
948 Promise<std::pair<size_t, Try<T>>> p;
949 std::atomic<bool> done {false};
952 auto ctx = std::make_shared<CollectAnyContext>();
953 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
954 if (!ctx->done.exchange(true)) {
955 ctx->p.setValue(std::make_pair(i, std::move(t)));
958 return ctx->p.getFuture();
961 // collectAnyWithoutException (iterator)
//
// The first future to complete WITHOUT an exception fulfils the promise with
// the pair (its index, its value). Every other completion increments
// nFulfilled; when that counter reaches nTotal, the promise is given the
// exception of the Try that made it reach the total.
// The winning callback never increments nFulfilled, so the counter can only
// reach nTotal when no future produced a value.
963 template <class InputIterator>
966 typename std::iterator_traits<InputIterator>::value_type::value_type>>
967 collectAnyWithoutException(InputIterator first, InputIterator last) {
969 typename std::iterator_traits<InputIterator>::value_type::value_type T;
971 struct CollectAnyWithoutExceptionContext {
972 CollectAnyWithoutExceptionContext(){}
973 Promise<std::pair<size_t, T>> p;
974 std::atomic<bool> done{false};
975 std::atomic<size_t> nFulfilled{0};
979 auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
980 ctx->nTotal = size_t(std::distance(first, last));
982 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
983 if (!t.hasException() && !ctx->done.exchange(true)) {
984 ctx->p.setValue(std::make_pair(i, std::move(t.value())));
985 } else if (++ctx->nFulfilled == ctx->nTotal) {
986 ctx->p.setException(t.exception());
989 return ctx->p.getFuture();
992 // collectN (iterator)
//
// Produces a vector of (index, Try) pairs for completed futures. If the
// range holds fewer than n futures the promise is failed with
// std::runtime_error("Not enough futures").
// NOTE(review): the conditions guarding the emplace_back and the setTry
// below are not among the lines shown; presumably they compare the completion
// count c with n - confirm against the full source.
994 template <class InputIterator>
995 Future<std::vector<std::pair<size_t, Try<typename
996 std::iterator_traits<InputIterator>::value_type::value_type>>>>
997 collectN(InputIterator first, InputIterator last, size_t n) {
999 std::iterator_traits<InputIterator>::value_type::value_type T;
1000 typedef std::vector<std::pair<size_t, Try<T>>> V;
1002 struct CollectNContext {
1004 std::atomic<size_t> completed = {0};
1007 auto ctx = std::make_shared<CollectNContext>();
1009 if (size_t(std::distance(first, last)) < n) {
1010 ctx->p.setException(std::runtime_error("Not enough futures"));
1012 // for each completed Future, increase count and add to vector, until we
1013 // have n completed futures at which point we fulfil our Promise with the
1015 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
// c is this callback's 1-based completion rank.
1016 auto c = ++ctx->completed;
1018 assert(ctx->v.size() < n);
1019 ctx->v.emplace_back(i, std::move(t));
1021 ctx->p.setTry(Try<V>(std::move(ctx->v)));
1027 return ctx->p.getFuture();
1030 // reduce (iterator)
//
// Folds the results of the futures in [first, last) with func, starting from
// `initial`. An empty range yields a ready future holding `initial`.
// Otherwise the first future is combined with `initial`, and each further
// future is combined with the running result via collectAll(f, *first).
// func is kept in a shared_ptr so every continuation shares one functor.
//
// NOTE(review): T and F are forwarding references here. If a caller passes
// lvalues, std::move(initial) / std::move(func) would move from the caller's
// objects and std::make_shared<F> would be instantiated with a reference
// type. Consider std::forward plus std::decay_t<F> - confirm intended usage.
1032 template <class It, class T, class F>
1033 Future<T> reduce(It first, It last, T&& initial, F&& func) {
1034 if (first == last) {
1035 return makeFuture(std::move(initial));
1038 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
// Arg is chosen depending on whether func accepts (T&&, Try<ItT>&&).
1039 typedef typename std::conditional<
1040 futures::detail::callableWith<F, T&&, Try<ItT>&&>::value,
1043 typedef isTry<Arg> IsTry;
1045 auto sfunc = std::make_shared<F>(std::move(func));
1047 auto f = first->then(
1048 [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
1050 std::move(minitial), head.template get<IsTry::value, Arg&&>());
1053 for (++first; first != last; ++first) {
1054 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
1055 return (*sfunc)(std::move(std::get<0>(t).value()),
1056 // Either return a ItT&& or a Try<ItT>&& depending
1057 // on the type of the argument of func.
1058 std::get<1>(t).template get<IsTry::value, Arg&&>());
1065 // window (collection)
//
// Overload without an explicit executor: forwards to the executor-taking
// window() using the InlineExecutor singleton.
1067 template <class Collection, class F, class ItT, class Result>
1068 std::vector<Future<Result>>
1069 window(Collection input, F func, size_t n) {
1070 // Use global inline executor singleton
1071 auto executor = &InlineExecutor::instance();
1072 return window(executor, std::move(input), std::move(func), n);
// Applies func to every element of input while starting at most
// min(n, input.size()) chains. Returns one future per input element, backed
// by the promises stored in the shared WindowContext.
1075 template <class Collection, class F, class ItT, class Result>
1076 std::vector<Future<Result>>
1077 window(Executor* executor, Collection input, F func, size_t n) {
1078 struct WindowContext {
// NOTE(review): promises is sized from the member `input`, so `input` must
// be declared (and thus initialised) before `promises` - confirm the member
// order in the full source.
1079 WindowContext(Executor* executor_, Collection&& input_, F&& func_)
1080 : executor(executor_),
1081 input(std::move(input_)),
1082 promises(input.size()),
1083 func(std::move(func_)) {}
// Next input index to hand out; incremented atomically by spawn().
1084 std::atomic<size_t> i{0};
1087 std::vector<Promise<Result>> promises;
// Claims the next index. If it is in range, runs func on that element and,
// when the resulting future completes, schedules a task on the executor that
// fulfils promises[i] and then claims the following index.
1090 static inline void spawn(std::shared_ptr<WindowContext> ctx) {
1091 size_t i = ctx->i++;
1092 if (i < ctx->input.size()) {
1093 auto fut = ctx->func(std::move(ctx->input[i]));
1094 fut.setCallback_([ctx = std::move(ctx), i](Try<Result>&& t) mutable {
// Read the executor before ctx is moved into the task below.
1095 const auto executor_ = ctx->executor;
1096 executor_->add([ctx = std::move(ctx), i, t = std::move(t)]() mutable {
1097 ctx->promises[i].setTry(std::move(t));
1098 // Chain another future onto this one
1099 spawn(std::move(ctx));
// Computed before input is moved into the context.
1106 auto max = std::min(n, input.size());
1108 auto ctx = std::make_shared<WindowContext>(
1109 executor, std::move(input), std::move(func));
1111 // Start the first n Futures
1112 for (size_t i = 0; i < max; ++i) {
1113 executor->add([ctx]() mutable { WindowContext::spawn(std::move(ctx)); });
1116 std::vector<Future<Result>> futures;
1117 futures.reserve(ctx->promises.size());
1118 for (auto& promise : ctx->promises) {
1119 futures.emplace_back(promise.getFuture());
// Member reduce: the continuation receives the future's value as a range
// `vals` and folds it left to right, starting from the captured initial value
// and calling mfunc(accumulator, element) with both arguments moved.
1128 template <class I, class F>
1129 Future<I> Future<T>::reduce(I&& initial, F&& func) {
1131 minitial = std::forward<I>(initial),
1132 mfunc = std::forward<F>(func)
1133 ](T& vals) mutable {
1134 auto ret = std::move(minitial);
1135 for (auto& val : vals) {
1136 ret = mfunc(std::move(ret), std::move(val));
1142 // unorderedReduce (iterator)
//
// Reduces the futures in [first, last) in completion order rather than input
// order. An empty range yields a ready future holding `initial`.
// The context keeps a running `memo_` future; each completing input chains
// one more reduction step onto it under a spin lock. Once as many steps have
// been chained as there are inputs, a callback on memo_ fulfils the promise.
1144 template <class It, class T, class F, class ItT, class Arg>
1145 Future<T> unorderedReduce(It first, It last, T initial, F func) {
1146 if (first == last) {
1147 return makeFuture(std::move(initial));
1150 typedef isTry<Arg> IsTry;
1152 struct UnorderedReduceContext {
// memo_ starts as a ready future holding the initial value.
1153 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
1154 : lock_(), memo_(makeFuture<T>(std::move(memo))),
1155 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
1157 folly::MicroSpinLock lock_; // protects memo_ and numThens_
1160 size_t numThens_; // how many Futures completed and called .then()
1161 size_t numFutures_; // how many Futures in total
1162 Promise<T> promise_;
1165 auto ctx = std::make_shared<UnorderedReduceContext>(
1166 std::move(initial), std::move(func), std::distance(first, last));
1168 mapSetCallback<ItT>(
1171 [ctx](size_t /* i */, Try<ItT>&& t) {
1172 // Futures can be completed in any order, simultaneously.
1173 // To make this non-blocking, we create a new Future chain in
1174 // the order of completion to reduce the values.
1175 // The spinlock just protects chaining a new Future, not actually
1176 // executing the reduce, which should be really fast.
1177 folly::MSLGuard lock(ctx->lock_);
1179 ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
1180 // Either return a ItT&& or a Try<ItT>&& depending
1181 // on the type of the argument of func.
1182 return ctx->func_(std::move(v),
1183 mt.template get<IsTry::value, Arg&&>());
1185 if (++ctx->numThens_ == ctx->numFutures_) {
1186 // After reducing the value of the last Future, fulfill the Promise
1187 ctx->memo_.setCallback_(
1188 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
1192 return ctx->promise_.getFuture();
// Convenience overload: uses a default-constructed TimedOut as the
// exception delivered on timeout.
1198 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
1199 return within(dur, TimedOut(), tk);
// Returns a future that completes with this future's result or, if the
// timer set for `dur` fires first, with the exception `e`.
// A future that is already ready is returned immediately (moved out of
// *this). If no Timekeeper is available the result is a future failed with
// NoTimekeeper.
// The atomic `token` guarantees that only one of the two continuations
// (result vs. timer) fulfils the promise.
1204 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
1207 Context(E ex) : exception(std::move(ex)), promise() {}
1209 Future<Unit> thisFuture;
1211 std::atomic<bool> token {false};
1214 if (this->isReady()) {
1215 return std::move(*this);
// NOTE(review): presumably the singleton is fetched only when the caller
// passed no Timekeeper - the guarding condition is not shown here; confirm.
1218 std::shared_ptr<Timekeeper> tks;
1220 tks = folly::detail::getTimekeeperSingleton();
1224 if (UNLIKELY(!tk)) {
1225 return makeFuture<T>(NoTimekeeper());
1228 auto ctx = std::make_shared<Context>(std::move(e));
// Result path: first to flip the token forwards the Try to the promise.
1230 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
1231 if (ctx->token.exchange(true) == false) {
1232 ctx->promise.setTry(std::move(t));
1236 // Have time keeper use a weak ptr to hold ctx,
1237 // so that ctx can be deallocated as soon as the future job finished.
1238 tk->after(dur).then([weakCtx = to_weak_ptr(ctx)](Try<Unit> const& t) mutable {
1239 auto lockedCtx = weakCtx.lock();
1241 // ctx already released. "this" completed first, cancel "after"
1244 // "after" completed first, cancel "this"
// Note: the interrupt raised on the pending future is always TimedOut,
// independent of the user-supplied exception stored in the context.
1245 lockedCtx->thisFuture.raise(TimedOut());
1246 if (lockedCtx->token.exchange(true) == false) {
// A failure of the timer itself takes precedence over the user's exception.
1247 if (t.hasException()) {
1248 lockedCtx->promise.setException(std::move(t.exception()));
1250 lockedCtx->promise.setException(std::move(lockedCtx->exception));
// The returned future uses the same executor as *this.
1255 return ctx->promise.getFuture().via(this->getExecutor());
// Combines this Future with futures::sleep(dur, tk) through collectAll, then
// maps the resulting tuple back to a Future<T> built from the first element
// (this Future's Try). The sleep's Try<Unit> is not used by the continuation.
1261 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
1262 return collectAll(*this, futures::sleep(dur, tk))
1263 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
// The tuple is taken by value, so moving out of its first element is safe.
1264 Try<T>& t = std::get<0>(tup);
1265 return makeFuture<T>(std::move(t));
// Untimed wait helper: installs a callback on f that posts a local
// FutureBatonType, and asserts at the end that f is ready.
// NOTE(review): the ready short-circuit and the wait on the baton are
// presumably on lines not visible here.
1272 template <class FutureType, typename T = typename FutureType::value_type>
1273 void waitImpl(FutureType& f) {
1274 // short-circuit if there's nothing to do
1279 FutureBatonType baton;
// The callback captures baton by reference, so baton must still be alive
// when the callback runs.
1280 f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1282 assert(f.isReady());
// Timed wait helper: installs a callback on f that forwards f's Try into a
// separate promise, then waits on a baton for at most dur.
// NOTE(review): the declaration of `promise`, the ready short-circuit, and
// what happens to `ret` afterwards are on lines not visible here.
1285 template <class FutureType, typename T = typename FutureType::value_type>
1286 void waitImpl(FutureType& f, Duration dur) {
1287 // short-circuit if there's nothing to do
// ret is the Future side of the promise that the callback below fulfills.
1293 auto ret = promise.getFuture();
// Heap-allocated and captured by value below, so the callback holds its own
// reference to the baton (presumably so it stays valid after a timeout).
1294 auto baton = std::make_shared<FutureBatonType>();
1295 f.setCallback_([baton, promise = std::move(promise)](Try<T>&& t) mutable {
1296 promise.setTry(std::move(t));
// timed_wait(dur) returning true is treated as "f is ready" by the assert.
1300 if (baton->timed_wait(dur)) {
1301 assert(f.isReady());
// Re-routes f through executor e with an identity continuation, loops while
// f is not ready, and finally asserts that f is ready.
// NOTE(review): the loop body (presumably driving e) is not visible here.
1306 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1307 // Set callback so to ensure that the via executor has something on it
1308 // so that once the preceding future triggers this callback, drive will
1309 // always have a callback to satisfy it
// The lambda just passes the value through unchanged.
1313 f = f.via(e).then([](T&& t) { return std::move(t); });
1314 while (!f.isReady()) {
1317 assert(f.isReady());
1320 } // namespace detail
1321 } // namespace futures
// Lvalue overload of wait(): delegates to futures::detail::waitImpl on this
// SemiFuture. Declared to return SemiFuture<T>&.
1324 SemiFuture<T>& SemiFuture<T>::wait() & {
1325 futures::detail::waitImpl(*this);
// Rvalue overload of wait(): delegates to futures::detail::waitImpl on this
// SemiFuture, then returns *this as an rvalue reference.
1330 SemiFuture<T>&& SemiFuture<T>::wait() && {
1331 futures::detail::waitImpl(*this);
1332 return std::move(*this);
// Lvalue overload of the timed wait: delegates to the two-argument
// futures::detail::waitImpl with dur. Declared to return SemiFuture<T>&.
1336 SemiFuture<T>& SemiFuture<T>::wait(Duration dur) & {
1337 futures::detail::waitImpl(*this, dur);
// Rvalue overload of the timed wait: delegates to the two-argument
// futures::detail::waitImpl with dur, then returns *this as an rvalue
// reference.
1342 SemiFuture<T>&& SemiFuture<T>::wait(Duration dur) && {
1343 futures::detail::waitImpl(*this, dur);
1344 return std::move(*this);
// Rvalue-qualified get(): calls wait() and returns the result's value(),
// moved out.
1348 T SemiFuture<T>::get() && {
1349 return std::move(wait().value());
// Rvalue-qualified timed get(): when this SemiFuture is ready, returns its
// value(), moved out.
// NOTE(review): the wait on dur and the not-ready branch are on lines not
// visible here.
1353 T SemiFuture<T>::get(Duration dur) && {
1355 if (this->isReady()) {
1356 return std::move(this->value());
// Lvalue overload of wait(): delegates to futures::detail::waitImpl on this
// Future. Declared to return Future<T>&.
1363 Future<T>& Future<T>::wait() & {
1364 futures::detail::waitImpl(*this);
// Rvalue overload of wait(): delegates to futures::detail::waitImpl on this
// Future, then returns *this as an rvalue reference.
1369 Future<T>&& Future<T>::wait() && {
1370 futures::detail::waitImpl(*this);
1371 return std::move(*this);
// Lvalue overload of the timed wait: delegates to the two-argument
// futures::detail::waitImpl with dur. Declared to return Future<T>&.
1375 Future<T>& Future<T>::wait(Duration dur) & {
1376 futures::detail::waitImpl(*this, dur);
// Rvalue overload of the timed wait: delegates to the two-argument
// futures::detail::waitImpl with dur, then returns *this as an rvalue
// reference.
1381 Future<T>&& Future<T>::wait(Duration dur) && {
1382 futures::detail::waitImpl(*this, dur);
1383 return std::move(*this);
// Lvalue overload of waitVia(): delegates to futures::detail::waitViaImpl
// with this Future and executor e. Declared to return Future<T>&.
1387 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1388 futures::detail::waitViaImpl(*this, e);
// Rvalue overload of waitVia(): delegates to futures::detail::waitViaImpl
// with this Future and executor e, then returns *this as an rvalue reference.
1393 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1394 futures::detail::waitViaImpl(*this, e);
1395 return std::move(*this);
// Calls wait() and returns the result's value(), moved out.
1399 T Future<T>::get() {
1400 return std::move(wait().value());
// Timed get(): when this Future is ready, returns its value(), moved out.
// NOTE(review): the wait on dur and the not-ready branch are on lines not
// visible here.
1404 T Future<T>::get(Duration dur) {
1406 if (this->isReady()) {
1407 return std::move(this->value());
// Calls waitVia(e) and returns the result's value(), moved out.
1414 T Future<T>::getVia(DrivableExecutor* e) {
1415 return std::move(waitVia(e).value());
// Compares the values held by two Try<T> objects using operator==.
// Calls value() on both arguments unconditionally; the caller in willEqual
// checks hasValue() on both before calling this.
1422 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1423 return t1.value() == t2.value();
1426 } // namespace detail
1427 } // namespace futures
// Combines this Future and f with collectAll and maps the resulting pair of
// Trys to a bool. When both Trys hold a value, the result is
// TryEquals<T>::equals applied to them.
// NOTE(review): the result when either Try lacks a value is on lines not
// visible here.
1430 Future<bool> Future<T>::willEqual(Future<T>& f) {
1431 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1432 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1433 return futures::detail::TryEquals<T>::equals(
1434 std::get<0>(t), std::get<1>(t));
// Chains a continuation that checks the value against predicate. The
// predicate is forwarded into the lambda's capture; the value arrives by
// value (T val). When the predicate returns false,
// throwPredicateDoesNotObtain() is called.
// NOTE(review): the statement that produces the continuation's result is on
// a line not visible here.
1443 Future<T> Future<T>::filter(F&& predicate) {
1444 return this->then([p = std::forward<F>(predicate)](T val) {
// The predicate only ever sees the value through a const reference.
1445 T const& valConstRef = val;
1446 if (!p(valConstRef)) {
1447 throwPredicateDoesNotObtain();
// Conditionally runs thunk. If p is true, invokes the forwarded thunk and
// converts its result with .unit(); otherwise thunk is not invoked and
// makeFuture() is returned.
1454 inline Future<Unit> when(bool p, F&& thunk) {
1455 return p ? std::forward<F>(thunk)().unit() : makeFuture();
// Asynchronous loop: invokes thunk, then attaches a continuation to the
// returned future that recurses into whileDo with the same predicate and
// thunk. The trailing `return makeFuture()` is the exit that does no work.
// NOTE(review): the predicate test that chooses between the two returns is
// presumably on a line not visible here.
1458 template <class P, class F>
1459 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1461 auto future = thunk();
// predicate and thunk are forwarded into the continuation's captures so the
// next iteration owns them.
1462 return future.then([
1463 predicate = std::forward<P>(predicate),
1464 thunk = std::forward<F>(thunk)
1466 return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1469 return makeFuture();
// Builds a counting predicate and passes it, together with the forwarded
// thunk, to folly::whileDo. fetch_add(1) returns the previous count, so the
// predicate returns true while that previous count is below n.
1473 Future<Unit> times(const int n, F&& thunk) {
1474 return folly::whileDo(
// The counter lives behind a unique_ptr -- presumably because std::atomic is
// not movable while the lambda itself must be; confirm.
1475 [ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
1476 return count->fetch_add(1) < n;
1478 std::forward<F>(thunk));
// Walks [first, last), calls then(func) on each element, and appends the
// resulting Future<Result> to a vector. The same func object is passed to
// every then() call.
// NOTE(review): the return statement is on a line not visible here.
1482 template <class It, class F, class ItT, class Result>
1483 std::vector<Future<Result>> map(It first, It last, F func) {
1484 std::vector<Future<Result>> results;
1485 for (auto it = first; it != last; it++) {
1486 results.push_back(it->then(func));
1490 } // namespace futures
1492 // Instantiate the most common Future types to save compile time
// These are explicit instantiation declarations (`extern template`): they
// suppress implicit instantiation of the listed specializations in
// translation units that include this header. The matching explicit
// instantiation definitions are expected to live in a single source file
// -- TODO confirm.
1493 extern template class Future<Unit>;
1494 extern template class Future<bool>;
1495 extern template class Future<int>;
1496 extern template class Future<int64_t>;
1497 extern template class Future<std::string>;
1498 extern template class Future<double>;
1499 } // namespace folly