2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
25 #include <folly/Baton.h>
26 #include <folly/Optional.h>
27 #include <folly/Random.h>
28 #include <folly/futures/Timekeeper.h>
29 #include <folly/futures/detail/Core.h>
31 #ifndef FOLLY_FUTURE_USING_FIBER
32 #if FOLLY_MOBILE || defined(__APPLE__)
33 #define FOLLY_FUTURE_USING_FIBER 0
35 #define FOLLY_FUTURE_USING_FIBER 1
36 #include <folly/fibers/Baton.h>
46 #if FOLLY_FUTURE_USING_FIBER
47 typedef folly::fibers::Baton FutureBatonType;
49 typedef folly::Baton<> FutureBatonType;
52 } // namespace futures
55 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
60 // Guarantees that the stored functor is destructed before the stored promise
61 // may be fulfilled. Assumes the stored functor to be noexcept-destructible.
62 template <typename T, typename F>
63 class CoreCallbackState {
65 template <typename FF>
66 CoreCallbackState(Promise<T>&& promise, FF&& func) noexcept(
67 noexcept(F(std::declval<FF>())))
68 : func_(std::forward<FF>(func)), promise_(std::move(promise)) {
69 assert(before_barrier());
72 CoreCallbackState(CoreCallbackState&& that) noexcept(
73 noexcept(F(std::declval<F>()))) {
74 if (that.before_barrier()) {
75 new (&func_) F(std::move(that.func_));
76 promise_ = that.stealPromise();
80 CoreCallbackState& operator=(CoreCallbackState&&) = delete;
82 ~CoreCallbackState() {
83 if (before_barrier()) {
88 template <typename... Args>
89 auto invoke(Args&&... args) noexcept(
90 noexcept(std::declval<F&&>()(std::declval<Args&&>()...))) {
91 assert(before_barrier());
92 return std::move(func_)(std::forward<Args>(args)...);
95 template <typename... Args>
96 auto tryInvoke(Args&&... args) noexcept {
97 return makeTryWith([&] { return invoke(std::forward<Args>(args)...); });
100 void setTry(Try<T>&& t) {
101 stealPromise().setTry(std::move(t));
104 void setException(exception_wrapper&& ew) {
105 stealPromise().setException(std::move(ew));
108 Promise<T> stealPromise() noexcept {
109 assert(before_barrier());
111 return std::move(promise_);
115 bool before_barrier() const noexcept {
116 return !promise_.isFulfilled();
122 Promise<T> promise_{Promise<T>::makeEmpty()};
125 template <typename T, typename F>
126 inline auto makeCoreCallbackState(Promise<T>&& p, F&& f) noexcept(
127 noexcept(CoreCallbackState<T, _t<std::decay<F>>>(
128 std::declval<Promise<T>&&>(),
129 std::declval<F&&>()))) {
130 return CoreCallbackState<T, _t<std::decay<F>>>(
131 std::move(p), std::forward<F>(f));
133 } // namespace detail
134 } // namespace futures
137 Future<T> Future<T>::makeEmpty() {
138 return Future<T>(futures::detail::EmptyConstruct{});
142 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
143 other.core_ = nullptr;
147 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
148 std::swap(core_, other.core_);
155 typename std::enable_if<
156 !std::is_same<T, typename std::decay<T2>::type>::value &&
157 std::is_constructible<T, T2&&>::value &&
158 std::is_convertible<T2&&, T>::value,
160 Future<T>::Future(Future<T2>&& other)
161 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
166 typename std::enable_if<
167 !std::is_same<T, typename std::decay<T2>::type>::value &&
168 std::is_constructible<T, T2&&>::value &&
169 !std::is_convertible<T2&&, T>::value,
171 Future<T>::Future(Future<T2>&& other)
172 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
177 typename std::enable_if<
178 !std::is_same<T, typename std::decay<T2>::type>::value &&
179 std::is_constructible<T, T2&&>::value,
181 Future<T>& Future<T>::operator=(Future<T2>&& other) {
183 std::move(other).then([](T2&& v) { return T(std::move(v)); }));
187 template <class T2, typename>
188 Future<T>::Future(T2&& val)
189 : core_(new futures::detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
192 template <typename T2>
193 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
194 : core_(new futures::detail::Core<T>(Try<T>(T()))) {}
199 typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
201 Future<T>::Future(in_place_t, Args&&... args)
203 new futures::detail::Core<T>(in_place, std::forward<Args>(args)...)) {
207 Future<T>::~Future() {
212 void Future<T>::detach() {
214 core_->detachFuture();
220 void Future<T>::throwIfInvalid() const {
227 void Future<T>::setCallback_(F&& func) {
229 core_->setCallback(std::forward<F>(func));
236 typename std::enable_if<isFuture<F>::value,
237 Future<typename isFuture<T>::Inner>>::type
238 Future<T>::unwrap() {
239 return then([](Future<typename isFuture<T>::Inner> internal_future) {
240 return internal_future;
246 // Variant: returns a value
247 // e.g. f.then([](Try<T>&& t){ return t.value(); });
249 template <typename F, typename R, bool isTry, typename... Args>
250 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
251 Future<T>::thenImplementation(
253 futures::detail::argResult<isTry, F, Args...>) {
254 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
255 typedef typename R::ReturnsFuture::Inner B;
260 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
262 // grab the Future now before we lose our handle on the Promise
263 auto f = p.getFuture();
264 f.core_->setExecutorNoLock(getExecutor());
266 /* This is a bit tricky.
268 We can't just close over *this in case this Future gets moved. So we
269 make a new dummy Future. We could figure out something more
270 sophisticated that avoids making a new Future object when it can, as an
271 optimization. But this is correct.
273 core_ can't be moved, it is explicitly disallowed (as is copying). But
274 if there's ever a reason to allow it, this is one place that makes that
275 assumption and would need to be fixed. We use a standard shared pointer
276 for core_ (by copying it in), which means in essence obj holds a shared
277 pointer to itself. But this shouldn't leak because Promise will not
278 outlive the continuation, because Promise will setException() with a
279 broken Promise if it is destructed before completed. We could use a
280 weak pointer but it would have to be converted to a shared pointer when
281 func is executed (because the Future returned by func may possibly
282 persist beyond the callback, if it gets moved), and so it is an
283 optimization to just make it shared from the get-go.
285 Two subtle but important points about this design. futures::detail::Core
286 has no back pointers to Future or Promise, so if Future or Promise get
287 moved (and they will be moved in performant code) we don't have to do
288 anything fancy. And because we store the continuation in the
289 futures::detail::Core, not in the Future, we can execute the continuation
290 even after the Future has gone out of scope. This is an intentional design
291 decision. It is likely we will want to be able to cancel a continuation
292 in some circumstances, but I think it should be explicit not implicit
293 in the destruction of the Future used to create it.
296 [state = futures::detail::makeCoreCallbackState(
297 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
298 if (!isTry && t.hasException()) {
299 state.setException(std::move(t.exception()));
301 state.setTry(makeTryWith(
302 [&] { return state.invoke(t.template get<isTry, Args>()...); }));
309 // Variant: returns a Future
310 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
312 template <typename F, typename R, bool isTry, typename... Args>
313 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
314 Future<T>::thenImplementation(
316 futures::detail::argResult<isTry, F, Args...>) {
317 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
318 typedef typename R::ReturnsFuture::Inner B;
323 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
325 // grab the Future now before we lose our handle on the Promise
326 auto f = p.getFuture();
327 f.core_->setExecutorNoLock(getExecutor());
330 [state = futures::detail::makeCoreCallbackState(
331 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
332 if (!isTry && t.hasException()) {
333 state.setException(std::move(t.exception()));
335 auto tf2 = state.tryInvoke(t.template get<isTry, Args>()...);
336 if (tf2.hasException()) {
337 state.setException(std::move(tf2.exception()));
339 tf2->setCallback_([p = state.stealPromise()](Try<B> && b) mutable {
340 p.setTry(std::move(b));
349 template <typename T>
350 template <typename R, typename Caller, typename... Args>
351 Future<typename isFuture<R>::Inner>
352 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
353 typedef typename std::remove_cv<typename std::remove_reference<
354 typename futures::detail::ArgType<Args...>::FirstArg>::type>::type
356 return then([instance, func](Try<T>&& t){
357 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
362 Future<Unit> Future<T>::then() {
363 return then([] () {});
366 // onError where the callback returns T
369 typename std::enable_if<
370 !futures::detail::callableWith<F, exception_wrapper>::value &&
371 !futures::detail::callableWith<F, exception_wrapper&>::value &&
372 !futures::detail::Extract<F>::ReturnsFuture::value,
374 Future<T>::onError(F&& func) {
375 typedef std::remove_reference_t<
376 typename futures::detail::Extract<F>::FirstArg>
379 std::is_same<typename futures::detail::Extract<F>::RawReturn, T>::value,
380 "Return type of onError callback must be T or Future<T>");
383 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
384 auto f = p.getFuture();
387 [state = futures::detail::makeCoreCallbackState(
388 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
389 if (auto e = t.template tryGetExceptionObject<Exn>()) {
390 state.setTry(makeTryWith([&] { return state.invoke(*e); }));
392 state.setTry(std::move(t));
399 // onError where the callback returns Future<T>
402 typename std::enable_if<
403 !futures::detail::callableWith<F, exception_wrapper>::value &&
404 !futures::detail::callableWith<F, exception_wrapper&>::value &&
405 futures::detail::Extract<F>::ReturnsFuture::value,
407 Future<T>::onError(F&& func) {
409 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
411 "Return type of onError callback must be T or Future<T>");
412 typedef std::remove_reference_t<
413 typename futures::detail::Extract<F>::FirstArg>
417 auto f = p.getFuture();
420 [state = futures::detail::makeCoreCallbackState(
421 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
422 if (auto e = t.template tryGetExceptionObject<Exn>()) {
423 auto tf2 = state.tryInvoke(*e);
424 if (tf2.hasException()) {
425 state.setException(std::move(tf2.exception()));
427 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
428 p.setTry(std::move(t3));
432 state.setTry(std::move(t));
441 Future<T> Future<T>::ensure(F&& func) {
442 return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
444 return makeFuture(std::move(t));
450 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
451 return within(dur, tk).onError([funcw = std::forward<F>(func)](
452 TimedOut const&) { return std::move(funcw)(); });
457 typename std::enable_if<
458 futures::detail::callableWith<F, exception_wrapper>::value &&
459 futures::detail::Extract<F>::ReturnsFuture::value,
461 Future<T>::onError(F&& func) {
463 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
465 "Return type of onError callback must be T or Future<T>");
468 auto f = p.getFuture();
470 [state = futures::detail::makeCoreCallbackState(
471 std::move(p), std::forward<F>(func))](Try<T> t) mutable {
472 if (t.hasException()) {
473 auto tf2 = state.tryInvoke(std::move(t.exception()));
474 if (tf2.hasException()) {
475 state.setException(std::move(tf2.exception()));
477 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
478 p.setTry(std::move(t3));
482 state.setTry(std::move(t));
489 // onError(exception_wrapper) that returns T
492 typename std::enable_if<
493 futures::detail::callableWith<F, exception_wrapper>::value &&
494 !futures::detail::Extract<F>::ReturnsFuture::value,
496 Future<T>::onError(F&& func) {
498 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
500 "Return type of onError callback must be T or Future<T>");
503 auto f = p.getFuture();
505 [state = futures::detail::makeCoreCallbackState(
506 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
507 if (t.hasException()) {
508 state.setTry(makeTryWith(
509 [&] { return state.invoke(std::move(t.exception())); }));
511 state.setTry(std::move(t));
519 typename std::add_lvalue_reference<T>::type Future<T>::value() {
522 return core_->getTry().value();
526 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
529 return core_->getTry().value();
533 Try<T>& Future<T>::getTry() {
536 return core_->getTry();
540 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
541 return waitVia(e).getTry();
545 Optional<Try<T>> Future<T>::poll() {
547 if (core_->ready()) {
548 o = std::move(core_->getTry());
554 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
557 setExecutor(executor, priority);
559 return std::move(*this);
563 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
567 auto f = p.getFuture();
568 then([p = std::move(p)](Try<T> && t) mutable { p.setTry(std::move(t)); });
569 return std::move(f).via(executor, priority);
572 template <class Func>
573 auto via(Executor* x, Func&& func)
574 -> Future<typename isFuture<decltype(std::declval<Func>()())>::Inner> {
575 // TODO make this actually more performant. :-P #7260175
576 return via(x).then(std::forward<Func>(func));
580 bool Future<T>::isReady() const {
582 return core_->ready();
586 bool Future<T>::hasValue() {
587 return getTry().hasValue();
591 bool Future<T>::hasException() {
592 return getTry().hasException();
596 void Future<T>::raise(exception_wrapper exception) {
597 core_->raise(std::move(exception));
601 Future<T>::Future(futures::detail::EmptyConstruct) noexcept : core_(nullptr) {}
606 Future<typename std::decay<T>::type> makeFuture(T&& t) {
607 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
610 inline // for multiple translation units
611 Future<Unit> makeFuture() {
612 return makeFuture(Unit{});
615 // makeFutureWith(Future<T>()) -> Future<T>
617 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
618 typename std::result_of<F()>::type>::type
619 makeFutureWith(F&& func) {
621 typename isFuture<typename std::result_of<F()>::type>::Inner;
623 return std::forward<F>(func)();
624 } catch (std::exception& e) {
625 return makeFuture<InnerType>(
626 exception_wrapper(std::current_exception(), e));
628 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
632 // makeFutureWith(T()) -> Future<T>
633 // makeFutureWith(void()) -> Future<Unit>
635 typename std::enable_if<
636 !(isFuture<typename std::result_of<F()>::type>::value),
637 Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
638 makeFutureWith(F&& func) {
640 typename Unit::Lift<typename std::result_of<F()>::type>::type;
641 return makeFuture<LiftedResult>(
642 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
646 Future<T> makeFuture(std::exception_ptr const& e) {
647 return makeFuture(Try<T>(e));
651 Future<T> makeFuture(exception_wrapper ew) {
652 return makeFuture(Try<T>(std::move(ew)));
655 template <class T, class E>
656 typename std::enable_if<std::is_base_of<std::exception, E>::value,
658 makeFuture(E const& e) {
659 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
663 Future<T> makeFuture(Try<T>&& t) {
664 return Future<T>(new futures::detail::Core<T>(std::move(t)));
668 Future<Unit> via(Executor* executor, int8_t priority) {
669 return makeFuture().via(executor, priority);
672 // mapSetCallback calls func(i, Try<T>) when every future completes
674 template <class T, class InputIterator, class F>
675 void mapSetCallback(InputIterator first, InputIterator last, F func) {
676 for (size_t i = 0; first != last; ++first, ++i) {
677 first->setCallback_([func, i](Try<T>&& t) {
678 func(i, std::move(t));
683 // collectAll (variadic)
685 template <typename... Fs>
686 typename futures::detail::CollectAllVariadicContext<
687 typename std::decay<Fs>::type::value_type...>::type
688 collectAll(Fs&&... fs) {
689 auto ctx = std::make_shared<futures::detail::CollectAllVariadicContext<
690 typename std::decay<Fs>::type::value_type...>>();
691 futures::detail::collectVariadicHelper<
692 futures::detail::CollectAllVariadicContext>(ctx, std::forward<Fs>(fs)...);
693 return ctx->p.getFuture();
696 // collectAll (iterator)
698 template <class InputIterator>
701 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
702 collectAll(InputIterator first, InputIterator last) {
704 typename std::iterator_traits<InputIterator>::value_type::value_type T;
706 struct CollectAllContext {
707 CollectAllContext(size_t n) : results(n) {}
708 ~CollectAllContext() {
709 p.setValue(std::move(results));
711 Promise<std::vector<Try<T>>> p;
712 std::vector<Try<T>> results;
716 std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
717 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
718 ctx->results[i] = std::move(t);
720 return ctx->p.getFuture();
723 // collect (iterator)
728 template <typename T>
729 struct CollectContext {
731 explicit Nothing(int /* n */) {}
734 using Result = typename std::conditional<
735 std::is_void<T>::value,
737 std::vector<T>>::type;
739 using InternalResult = typename std::conditional<
740 std::is_void<T>::value,
742 std::vector<Optional<T>>>::type;
744 explicit CollectContext(size_t n) : result(n) {}
746 if (!threw.exchange(true)) {
747 // map Optional<T> -> T
748 std::vector<T> finalResult;
749 finalResult.reserve(result.size());
750 std::transform(result.begin(), result.end(),
751 std::back_inserter(finalResult),
752 [](Optional<T>& o) { return std::move(o.value()); });
753 p.setValue(std::move(finalResult));
756 inline void setPartialResult(size_t i, Try<T>& t) {
757 result[i] = std::move(t.value());
760 InternalResult result;
761 std::atomic<bool> threw {false};
764 } // namespace detail
765 } // namespace futures
767 template <class InputIterator>
768 Future<typename futures::detail::CollectContext<typename std::iterator_traits<
769 InputIterator>::value_type::value_type>::Result>
770 collect(InputIterator first, InputIterator last) {
772 typename std::iterator_traits<InputIterator>::value_type::value_type T;
774 auto ctx = std::make_shared<futures::detail::CollectContext<T>>(
775 std::distance(first, last));
776 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
777 if (t.hasException()) {
778 if (!ctx->threw.exchange(true)) {
779 ctx->p.setException(std::move(t.exception()));
781 } else if (!ctx->threw) {
782 ctx->setPartialResult(i, t);
785 return ctx->p.getFuture();
788 // collect (variadic)
790 template <typename... Fs>
791 typename futures::detail::CollectVariadicContext<
792 typename std::decay<Fs>::type::value_type...>::type
793 collect(Fs&&... fs) {
794 auto ctx = std::make_shared<futures::detail::CollectVariadicContext<
795 typename std::decay<Fs>::type::value_type...>>();
796 futures::detail::collectVariadicHelper<
797 futures::detail::CollectVariadicContext>(ctx, std::forward<Fs>(fs)...);
798 return ctx->p.getFuture();
801 // collectAny (iterator)
803 template <class InputIterator>
808 std::iterator_traits<InputIterator>::value_type::value_type>>>
809 collectAny(InputIterator first, InputIterator last) {
811 typename std::iterator_traits<InputIterator>::value_type::value_type T;
813 struct CollectAnyContext {
814 CollectAnyContext() {}
815 Promise<std::pair<size_t, Try<T>>> p;
816 std::atomic<bool> done {false};
819 auto ctx = std::make_shared<CollectAnyContext>();
820 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
821 if (!ctx->done.exchange(true)) {
822 ctx->p.setValue(std::make_pair(i, std::move(t)));
825 return ctx->p.getFuture();
828 // collectAnyWithoutException (iterator)
830 template <class InputIterator>
833 typename std::iterator_traits<InputIterator>::value_type::value_type>>
834 collectAnyWithoutException(InputIterator first, InputIterator last) {
836 typename std::iterator_traits<InputIterator>::value_type::value_type T;
838 struct CollectAnyWithoutExceptionContext {
839 CollectAnyWithoutExceptionContext(){}
840 Promise<std::pair<size_t, T>> p;
841 std::atomic<bool> done{false};
842 std::atomic<size_t> nFulfilled{0};
846 auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
847 ctx->nTotal = size_t(std::distance(first, last));
849 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
850 if (!t.hasException() && !ctx->done.exchange(true)) {
851 ctx->p.setValue(std::make_pair(i, std::move(t.value())));
852 } else if (++ctx->nFulfilled == ctx->nTotal) {
853 ctx->p.setException(t.exception());
856 return ctx->p.getFuture();
859 // collectN (iterator)
861 template <class InputIterator>
862 Future<std::vector<std::pair<size_t, Try<typename
863 std::iterator_traits<InputIterator>::value_type::value_type>>>>
864 collectN(InputIterator first, InputIterator last, size_t n) {
866 std::iterator_traits<InputIterator>::value_type::value_type T;
867 typedef std::vector<std::pair<size_t, Try<T>>> V;
869 struct CollectNContext {
871 std::atomic<size_t> completed = {0};
874 auto ctx = std::make_shared<CollectNContext>();
876 if (size_t(std::distance(first, last)) < n) {
877 ctx->p.setException(std::runtime_error("Not enough futures"));
879 // for each completed Future, increase count and add to vector, until we
880 // have n completed futures at which point we fulfil our Promise with the
882 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
883 auto c = ++ctx->completed;
885 assert(ctx->v.size() < n);
886 ctx->v.emplace_back(i, std::move(t));
888 ctx->p.setTry(Try<V>(std::move(ctx->v)));
894 return ctx->p.getFuture();
899 template <class It, class T, class F>
900 Future<T> reduce(It first, It last, T&& initial, F&& func) {
902 return makeFuture(std::move(initial));
905 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
906 typedef typename std::conditional<
907 futures::detail::callableWith<F, T&&, Try<ItT>&&>::value,
910 typedef isTry<Arg> IsTry;
912 auto sfunc = std::make_shared<F>(std::move(func));
914 auto f = first->then(
915 [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
917 std::move(minitial), head.template get<IsTry::value, Arg&&>());
920 for (++first; first != last; ++first) {
921 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
922 return (*sfunc)(std::move(std::get<0>(t).value()),
923 // Either return a ItT&& or a Try<ItT>&& depending
924 // on the type of the argument of func.
925 std::get<1>(t).template get<IsTry::value, Arg&&>());
932 // window (collection)
934 template <class Collection, class F, class ItT, class Result>
935 std::vector<Future<Result>>
936 window(Collection input, F func, size_t n) {
937 struct WindowContext {
938 WindowContext(Collection&& i, F&& fn)
939 : input_(std::move(i)), promises_(input_.size()),
942 std::atomic<size_t> i_ {0};
944 std::vector<Promise<Result>> promises_;
947 static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
948 size_t i = ctx->i_++;
949 if (i < ctx->input_.size()) {
950 // Using setCallback_ directly since we don't need the Future
951 ctx->func_(std::move(ctx->input_[i])).setCallback_(
952 // ctx is captured by value
953 [ctx, i](Try<Result>&& t) {
954 ctx->promises_[i].setTry(std::move(t));
955 // Chain another future onto this one
956 spawn(std::move(ctx));
962 auto max = std::min(n, input.size());
964 auto ctx = std::make_shared<WindowContext>(
965 std::move(input), std::move(func));
967 for (size_t i = 0; i < max; ++i) {
968 // Start the first n Futures
969 WindowContext::spawn(ctx);
972 std::vector<Future<Result>> futures;
973 futures.reserve(ctx->promises_.size());
974 for (auto& promise : ctx->promises_) {
975 futures.emplace_back(promise.getFuture());
984 template <class I, class F>
985 Future<I> Future<T>::reduce(I&& initial, F&& func) {
987 minitial = std::forward<I>(initial),
988 mfunc = std::forward<F>(func)
990 auto ret = std::move(minitial);
991 for (auto& val : vals) {
992 ret = mfunc(std::move(ret), std::move(val));
998 // unorderedReduce (iterator)
1000 template <class It, class T, class F, class ItT, class Arg>
1001 Future<T> unorderedReduce(It first, It last, T initial, F func) {
1002 if (first == last) {
1003 return makeFuture(std::move(initial));
1006 typedef isTry<Arg> IsTry;
1008 struct UnorderedReduceContext {
1009 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
1010 : lock_(), memo_(makeFuture<T>(std::move(memo))),
1011 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
1013 folly::MicroSpinLock lock_; // protects memo_ and numThens_
1016 size_t numThens_; // how many Futures completed and called .then()
1017 size_t numFutures_; // how many Futures in total
1018 Promise<T> promise_;
1021 auto ctx = std::make_shared<UnorderedReduceContext>(
1022 std::move(initial), std::move(func), std::distance(first, last));
1024 mapSetCallback<ItT>(
1027 [ctx](size_t /* i */, Try<ItT>&& t) {
1028 // Futures can be completed in any order, simultaneously.
1029 // To make this non-blocking, we create a new Future chain in
1030 // the order of completion to reduce the values.
1031 // The spinlock just protects chaining a new Future, not actually
1032 // executing the reduce, which should be really fast.
1033 folly::MSLGuard lock(ctx->lock_);
1035 ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
1036 // Either return a ItT&& or a Try<ItT>&& depending
1037 // on the type of the argument of func.
1038 return ctx->func_(std::move(v),
1039 mt.template get<IsTry::value, Arg&&>());
1041 if (++ctx->numThens_ == ctx->numFutures_) {
1042 // After reducing the value of the last Future, fulfill the Promise
1043 ctx->memo_.setCallback_(
1044 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
1048 return ctx->promise_.getFuture();
1054 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
1055 return within(dur, TimedOut(), tk);
1060 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
1063 Context(E ex) : exception(std::move(ex)), promise() {}
1065 Future<Unit> thisFuture;
1067 std::atomic<bool> token {false};
1070 std::shared_ptr<Timekeeper> tks;
1072 tks = folly::detail::getTimekeeperSingleton();
1073 tk = DCHECK_NOTNULL(tks.get());
1076 auto ctx = std::make_shared<Context>(std::move(e));
1078 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
1079 if (ctx->token.exchange(true) == false) {
1080 ctx->promise.setTry(std::move(t));
1084 // Have time keeper use a weak ptr to hold ctx,
1085 // so that ctx can be deallocated as soon as the future job finished.
1086 tk->after(dur).then([weakCtx = to_weak_ptr(ctx)](Try<Unit> const& t) mutable {
1087 auto lockedCtx = weakCtx.lock();
1089 // ctx already released. "this" completed first, cancel "after"
1092 // "after" completed first, cancel "this"
1093 lockedCtx->thisFuture.raise(TimedOut());
1094 if (lockedCtx->token.exchange(true) == false) {
1095 if (t.hasException()) {
1096 lockedCtx->promise.setException(std::move(t.exception()));
1098 lockedCtx->promise.setException(std::move(lockedCtx->exception));
1103 return ctx->promise.getFuture().via(getExecutor());
1109 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
1110 return collectAll(*this, futures::sleep(dur, tk))
1111 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
1112 Try<T>& t = std::get<0>(tup);
1113 return makeFuture<T>(std::move(t));
1121 void waitImpl(Future<T>& f) {
1122 // short-circuit if there's nothing to do
1123 if (f.isReady()) return;
1125 FutureBatonType baton;
1126 f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1128 assert(f.isReady());
1132 void waitImpl(Future<T>& f, Duration dur) {
1133 // short-circuit if there's nothing to do
1139 auto ret = promise.getFuture();
1140 auto baton = std::make_shared<FutureBatonType>();
1141 f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1142 promise.setTry(std::move(t));
1146 if (baton->timed_wait(dur)) {
1147 assert(f.isReady());
1152 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1153 // Set callback so to ensure that the via executor has something on it
1154 // so that once the preceding future triggers this callback, drive will
1155 // always have a callback to satisfy it
1158 f = f.via(e).then([](T&& t) { return std::move(t); });
1159 while (!f.isReady()) {
1162 assert(f.isReady());
1165 } // namespace detail
1166 } // namespace futures
1169 Future<T>& Future<T>::wait() & {
1170 futures::detail::waitImpl(*this);
1175 Future<T>&& Future<T>::wait() && {
1176 futures::detail::waitImpl(*this);
1177 return std::move(*this);
1181 Future<T>& Future<T>::wait(Duration dur) & {
1182 futures::detail::waitImpl(*this, dur);
1187 Future<T>&& Future<T>::wait(Duration dur) && {
1188 futures::detail::waitImpl(*this, dur);
1189 return std::move(*this);
1193 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1194 futures::detail::waitViaImpl(*this, e);
1199 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1200 futures::detail::waitViaImpl(*this, e);
1201 return std::move(*this);
1205 T Future<T>::get() {
1206 return std::move(wait().value());
1210 T Future<T>::get(Duration dur) {
1213 return std::move(value());
1220 T Future<T>::getVia(DrivableExecutor* e) {
1221 return std::move(waitVia(e).value());
1228 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1229 return t1.value() == t2.value();
1232 } // namespace detail
1233 } // namespace futures
1236 Future<bool> Future<T>::willEqual(Future<T>& f) {
1237 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1238 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1239 return futures::detail::TryEquals<T>::equals(
1240 std::get<0>(t), std::get<1>(t));
1249 Future<T> Future<T>::filter(F&& predicate) {
1250 return this->then([p = std::forward<F>(predicate)](T val) {
1251 T const& valConstRef = val;
1252 if (!p(valConstRef)) {
1253 throwPredicateDoesNotObtain();
1260 inline Future<Unit> when(bool p, F&& thunk) {
1261 return p ? std::forward<F>(thunk)().unit() : makeFuture();
1264 template <class P, class F>
1265 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1267 auto future = thunk();
1268 return future.then([
1269 predicate = std::forward<P>(predicate),
1270 thunk = std::forward<F>(thunk)
1272 return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1275 return makeFuture();
1279 Future<Unit> times(const int n, F&& thunk) {
1280 return folly::whileDo(
1281 [ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
1282 return count->fetch_add(1) < n;
1284 std::forward<F>(thunk));
1288 template <class It, class F, class ItT, class Result>
1289 std::vector<Future<Result>> map(It first, It last, F func) {
1290 std::vector<Future<Result>> results;
1291 for (auto it = first; it != last; it++) {
1292 results.push_back(it->then(func));
1302 struct retrying_policy_raw_tag {};
1303 struct retrying_policy_fut_tag {};
// Classifies a retry policy by the type it returns when called with
// (size_t, const exception_wrapper&).
//   result : the policy's return type for that call
//   is_raw : true when result is exactly bool
//   is_fut : true when result is exactly Future<bool>
//   tag    : retrying_policy_raw_tag if is_raw, else retrying_policy_fut_tag
//            if is_fut, else void
// NOTE(review): a void tag presumably makes tag dispatch fail to compile for
// unsupported policy return types -- confirm.
1305 template <class Policy>
1306 struct retrying_policy_traits {
1307 using result = std::result_of_t<Policy(size_t, const exception_wrapper&)>;
1308 using is_raw = std::is_same<result, bool>;
1309 using is_fut = std::is_same<result, Future<bool>>;
1310 using tag = typename std::conditional<
1311 is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1312 is_fut::value, retrying_policy_fut_tag, void>::type>::type;
// One step of the retry loop: runs an attempt and fulfils `prom` with its
// outcome or schedules another attempt.
//   k    : attempt counter; passed to `ff` and then post-incremented
//   p    : retry policy, forwarded into the continuation as `pm`
//   ff   : attempt factory called with the counter; its result type F must
//          expose value_type
//   prom : promise that eventually receives the value or the exception
1315 template <class Policy, class FF, class Prom>
1316 void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) {
1317 using F = typename std::result_of<FF(size_t)>::type;
1318 using T = typename F::value_type;
// makeFutureWith wraps the call to ff; `k` is incremented after being passed,
// so later uses of `k` see the next attempt number.
1319 auto f = makeFutureWith([&] { return ff(k++); });
// Continuation on the attempt's result: owns the promise, policy and factory.
1322 prom = std::move(prom),
1323 pm = std::forward<Policy>(p),
1324 ffm = std::forward<FF>(ff)
1325 ](Try<T> && t) mutable {
// Success path: hand the attempt's value to the promise.
1327 prom.setValue(std::move(t).value());
// Failure path: take a reference to the attempt's exception.
1330 auto& x = t.exception();
// Inner continuation on the policy's decision; ownership moves inward again.
1334 prom = std::move(prom),
1337 ffm = std::move(ffm)
1338 ](bool shouldRetry) mutable {
// NOTE(review): presumably taken when shouldRetry is true -- confirm.
1340 retryingImpl(k, std::move(pm), std::move(ffm), std::move(prom));
// Otherwise the stored exception is delivered to the promise.
1342 prom.setException(std::move(xm));
// Entry point of the retry loop for a given starting counter `k`.
// Creates a Promise<T>, where T is the value_type of the future type that
// `ff` returns, and takes its future before the promise is moved away.
// NOTE(review): the call receiving (k, p, ff, prom) is presumably
// retryingImpl, and `f` is presumably what is returned -- confirm.
1348 template <class Policy, class FF>
1349 typename std::result_of<FF(size_t)>::type
1350 retrying(size_t k, Policy&& p, FF&& ff) {
1351 using F = typename std::result_of<FF(size_t)>::type;
1352 using T = typename F::value_type;
1353 auto prom = Promise<T>();
1354 auto f = prom.getFuture();
1356 k, std::forward<Policy>(p), std::forward<FF>(ff), std::move(prom));
// Overload selected for policies that return a plain bool.
// The policy is wrapped in a lambda `q` that passes its bool result to
// makeFuture<bool>, and the counter-taking retrying overload is then called
// with a starting count of 0. `q` receives the exception_wrapper by value.
1360 template <class Policy, class FF>
1361 typename std::result_of<FF(size_t)>::type
1362 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1363 auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
1364 return makeFuture<bool>(pm(k, x));
1366 return retrying(0, std::move(q), std::forward<FF>(ff));
// Overload selected for policies that already return Future<bool>.
// No wrapping is needed: the policy and attempt factory are forwarded to the
// counter-taking retrying overload with a starting count of 0.
1369 template <class Policy, class FF>
1370 typename std::result_of<FF(size_t)>::type
1371 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1372 return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1375 // jittered exponential backoff, clamped to [backoff_min, backoff_max]
//
// Computes backoff = jitter * backoff_min * 2^(n - 1), where jitter is
// exp(X) and X is drawn from a normal distribution with mean 0.0 and
// standard deviation `jitter_param` using `rng`.
// The result is then clamped: std::min against backoff_max is applied first
// and std::max against backoff_min last, so backoff_min wins if the two
// bounds are inverted.
1376 template <class URNG>
1377 Duration retryingJitteredExponentialBackoffDur(
1379 Duration backoff_min,
1380 Duration backoff_max,
1381 double jitter_param,
1384 auto dist = std::normal_distribution<double>(0.0, jitter_param);
1385 auto jitter = std::exp(dist(rng));
// NOTE(review): the floating-point product is converted to d::rep before any
// clamping. If d::rep is an integer type, a product outside its range makes
// the conversion undefined behaviour; the product doubles with each n.
// Consider clamping in floating point first. Also, if `n` is unsigned and 0,
// `n - 1` wraps to a huge exponent -- confirm callers always pass n >= 1.
1386 auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1387 return std::max(backoff_min, std::min(backoff_max, backoff));
// Builds a retry policy, returned as a std::function yielding Future<bool>,
// that caps the number of tries and sleeps for a jittered exponential
// backoff before allowing a retry.
// The returned closure owns the inner policy as `pm` and the random
// generator as `rngp`.
1390 template <class Policy, class URNG>
1391 std::function<Future<bool>(size_t, const exception_wrapper&)>
1392 retryingPolicyCappedJitteredExponentialBackoff(
1394 Duration backoff_min,
1395 Duration backoff_max,
1396 double jitter_param,
1400 pm = std::forward<Policy>(p),
1405 rngp = std::forward<URNG>(rng)
1406 ](size_t n, const exception_wrapper& ex) mutable {
// Try budget exhausted: answer "do not retry" without consulting pm.
1407 if (n == max_tries) {
1408 return makeFuture(false);
// Otherwise ask the inner policy and continue on its answer.
// NOTE(review): rngp is moved out of the outer closure on every call. For a
// generator type with a real move constructor the outer copy is left
// moved-from for later calls -- confirm this is intended.
1410 return pm(n, ex).then(
1411 [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
// NOTE(review): presumably the branch where the inner policy declined.
1414 return makeFuture(false);
// Retry allowed: compute the delay, sleep for it, then resolve to true.
1416 auto backoff = detail::retryingJitteredExponentialBackoffDur(
1417 n, backoff_min, backoff_max, jitter_param, rngp);
1418 return futures::sleep(backoff).then([] { return true; });
// Tag-dispatched overload for inner policies that return a plain bool.
// The policy is wrapped in a lambda `q` that passes its result to makeFuture,
// and the call is then delegated to another
// retryingPolicyCappedJitteredExponentialBackoff overload with the generator
// forwarded.
1423 template <class Policy, class URNG>
1424 std::function<Future<bool>(size_t, const exception_wrapper&)>
1425 retryingPolicyCappedJitteredExponentialBackoff(
1427 Duration backoff_min,
1428 Duration backoff_max,
1429 double jitter_param,
1432 retrying_policy_raw_tag) {
1433 auto q = [pm = std::forward<Policy>(p)](
1434 size_t n, const exception_wrapper& e) {
1435 return makeFuture(pm(n, e));
1437 return retryingPolicyCappedJitteredExponentialBackoff(
1442 std::forward<URNG>(rng),
// Tag-dispatched overload for inner policies that already return
// Future<bool>.
// The generator and the policy are forwarded unchanged to another
// retryingPolicyCappedJitteredExponentialBackoff overload.
1446 template <class Policy, class URNG>
1447 std::function<Future<bool>(size_t, const exception_wrapper&)>
1448 retryingPolicyCappedJitteredExponentialBackoff(
1450 Duration backoff_min,
1451 Duration backoff_max,
1452 double jitter_param,
1455 retrying_policy_fut_tag) {
1456 return retryingPolicyCappedJitteredExponentialBackoff(
1461 std::forward<URNG>(rng),
1462 std::forward<Policy>(p));
// Public entry point for retrying `ff` under policy `p`.
// Looks up the policy's tag through detail::retrying_policy_traits and
// dispatches to the matching detail::retrying overload with a
// default-constructed tag. The return type is whatever `ff` returns when
// called with a size_t.
1466 template <class Policy, class FF>
1467 typename std::result_of<FF(size_t)>::type
1468 retrying(Policy&& p, FF&& ff) {
1469 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1470 return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
// Returns a bool-valued retry policy that ignores the exception and allows a
// retry while the attempt number `n` is below `max_tries`.
// `max_tries` is captured by copy.
1474 std::function<bool(size_t, const exception_wrapper&)>
1475 retryingPolicyBasic(
1477 return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
// Public overload taking a caller-supplied inner policy and random generator.
// Determines the inner policy's tag through detail::retrying_policy_traits
// and delegates to the detail:: implementation, forwarding the generator and
// the policy.
1480 template <class Policy, class URNG>
1481 std::function<Future<bool>(size_t, const exception_wrapper&)>
1482 retryingPolicyCappedJitteredExponentialBackoff(
1484 Duration backoff_min,
1485 Duration backoff_max,
1486 double jitter_param,
1489 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1490 return detail::retryingPolicyCappedJitteredExponentialBackoff(
1495 std::forward<URNG>(rng),
1496 std::forward<Policy>(p),
// Convenience overload with no caller-supplied inner policy.
// Defines an inner policy `p` that ignores its arguments and always returns
// true, then delegates to another
// retryingPolicyCappedJitteredExponentialBackoff overload.
// NOTE(review): the random generator passed along is not visible here --
// confirm which generator is used.
1501 std::function<Future<bool>(size_t, const exception_wrapper&)>
1502 retryingPolicyCappedJitteredExponentialBackoff(
1504 Duration backoff_min,
1505 Duration backoff_max,
1506 double jitter_param) {
1507 auto p = [](size_t, const exception_wrapper&) { return true; };
1508 return retryingPolicyCappedJitteredExponentialBackoff(
1519 // Instantiate the most common Future types to save compile time
// These are explicit instantiation declarations: they tell each including
// translation unit not to instantiate these specializations implicitly.
// NOTE(review): the matching explicit instantiation definitions are
// presumably in a single .cpp file -- confirm they exist for every type
// listed here, otherwise linking fails.
1520 extern template class Future<Unit>;
1521 extern template class Future<bool>;
1522 extern template class Future<int>;
1523 extern template class Future<int64_t>;
1524 extern template class Future<std::string>;
1525 extern template class Future<double>;
1527 } // namespace folly