2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
25 #include <folly/Baton.h>
26 #include <folly/Optional.h>
27 #include <folly/Random.h>
28 #include <folly/futures/Timekeeper.h>
29 #include <folly/futures/detail/Core.h>
31 #ifndef FOLLY_FUTURE_USING_FIBER
32 #if FOLLY_MOBILE || defined(__APPLE__)
33 #define FOLLY_FUTURE_USING_FIBER 0
35 #define FOLLY_FUTURE_USING_FIBER 1
36 #include <folly/fibers/Baton.h>
46 #if FOLLY_FUTURE_USING_FIBER
47 typedef folly::fibers::Baton FutureBatonType;
49 typedef folly::Baton<> FutureBatonType;
52 } // namespace futures
55 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
60 // Guarantees that the stored functor is destructed before the stored promise
61 // may be fulfilled. Assumes the stored functor to be noexcept-destructible.
62 template <typename T, typename F>
63 class CoreCallbackState {
65 template <typename FF>
66 CoreCallbackState(Promise<T>&& promise, FF&& func) noexcept(
67 noexcept(F(std::declval<FF>())))
68 : func_(std::forward<FF>(func)), promise_(std::move(promise)) {
69 assert(before_barrier());
72 CoreCallbackState(CoreCallbackState&& that) noexcept(
73 noexcept(F(std::declval<F>()))) {
74 if (that.before_barrier()) {
75 new (&func_) F(std::move(that.func_));
76 promise_ = that.stealPromise();
80 CoreCallbackState& operator=(CoreCallbackState&&) = delete;
82 ~CoreCallbackState() {
83 if (before_barrier()) {
88 template <typename... Args>
89 auto invoke(Args&&... args) noexcept(
90 noexcept(std::declval<F&&>()(std::declval<Args&&>()...))) {
91 assert(before_barrier());
92 return std::move(func_)(std::forward<Args>(args)...);
95 template <typename... Args>
96 auto tryInvoke(Args&&... args) noexcept {
97 return makeTryWith([&] { return invoke(std::forward<Args>(args)...); });
100 void setTry(Try<T>&& t) {
101 stealPromise().setTry(std::move(t));
104 void setException(exception_wrapper&& ew) {
105 stealPromise().setException(std::move(ew));
108 Promise<T> stealPromise() noexcept {
109 assert(before_barrier());
111 return std::move(promise_);
115 bool before_barrier() const noexcept {
116 return !promise_.isFulfilled();
122 Promise<T> promise_{Promise<T>::makeEmpty()};
125 template <typename T, typename F>
126 inline auto makeCoreCallbackState(Promise<T>&& p, F&& f) noexcept(
127 noexcept(CoreCallbackState<T, _t<std::decay<F>>>(
128 std::declval<Promise<T>&&>(),
129 std::declval<F&&>()))) {
130 return CoreCallbackState<T, _t<std::decay<F>>>(
131 std::move(p), std::forward<F>(f));
133 } // namespace detail
134 } // namespace futures
137 Future<T> Future<T>::makeEmpty() {
138 return Future<T>(futures::detail::EmptyConstruct{});
142 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
143 other.core_ = nullptr;
147 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
148 std::swap(core_, other.core_);
155 typename std::enable_if<
156 !std::is_same<T, typename std::decay<T2>::type>::value &&
157 std::is_constructible<T, T2&&>::value &&
158 std::is_convertible<T2&&, T>::value,
160 Future<T>::Future(Future<T2>&& other)
161 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
166 typename std::enable_if<
167 !std::is_same<T, typename std::decay<T2>::type>::value &&
168 std::is_constructible<T, T2&&>::value &&
169 !std::is_convertible<T2&&, T>::value,
171 Future<T>::Future(Future<T2>&& other)
172 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
177 typename std::enable_if<
178 !std::is_same<T, typename std::decay<T2>::type>::value &&
179 std::is_constructible<T, T2&&>::value,
181 Future<T>& Future<T>::operator=(Future<T2>&& other) {
183 std::move(other).then([](T2&& v) { return T(std::move(v)); }));
187 template <class T2, typename>
188 Future<T>::Future(T2&& val)
189 : core_(new futures::detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
192 template <typename T2>
193 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
194 : core_(new futures::detail::Core<T>(Try<T>(T()))) {}
199 typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
201 Future<T>::Future(in_place_t, Args&&... args)
203 new futures::detail::Core<T>(in_place, std::forward<Args>(args)...)) {
207 Future<T>::~Future() {
212 void Future<T>::detach() {
214 core_->detachFuture();
220 void Future<T>::throwIfInvalid() const {
227 void Future<T>::setCallback_(F&& func) {
229 core_->setCallback(std::forward<F>(func));
236 typename std::enable_if<isFuture<F>::value,
237 Future<typename isFuture<T>::Inner>>::type
238 Future<T>::unwrap() {
239 return then([](Future<typename isFuture<T>::Inner> internal_future) {
240 return internal_future;
246 // Variant: returns a value
247 // e.g. f.then([](Try<T>&& t){ return t.value(); });
249 template <typename F, typename R, bool isTry, typename... Args>
250 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
251 Future<T>::thenImplementation(
253 futures::detail::argResult<isTry, F, Args...>) {
254 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
255 typedef typename R::ReturnsFuture::Inner B;
260 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
262 // grab the Future now before we lose our handle on the Promise
263 auto f = p.getFuture();
264 f.core_->setExecutorNoLock(getExecutor());
266 /* This is a bit tricky.
268 We can't just close over *this in case this Future gets moved. So we
269 make a new dummy Future. We could figure out something more
270 sophisticated that avoids making a new Future object when it can, as an
271 optimization. But this is correct.
273 core_ can't be moved, it is explicitly disallowed (as is copying). But
274 if there's ever a reason to allow it, this is one place that makes that
275 assumption and would need to be fixed. We use a standard shared pointer
276 for core_ (by copying it in), which means in essence obj holds a shared
277 pointer to itself. But this shouldn't leak because Promise will not
278 outlive the continuation, because Promise will setException() with a
279 broken Promise if it is destructed before completed. We could use a
280 weak pointer but it would have to be converted to a shared pointer when
281 func is executed (because the Future returned by func may possibly
282 persist beyond the callback, if it gets moved), and so it is an
283 optimization to just make it shared from the get-go.
285 Two subtle but important points about this design. futures::detail::Core
286 has no back pointers to Future or Promise, so if Future or Promise get
287 moved (and they will be moved in performant code) we don't have to do
288 anything fancy. And because we store the continuation in the
289 futures::detail::Core, not in the Future, we can execute the continuation
290 even after the Future has gone out of scope. This is an intentional design
291 decision. It is likely we will want to be able to cancel a continuation
292 in some circumstances, but I think it should be explicit not implicit
293 in the destruction of the Future used to create it.
296 [state = futures::detail::makeCoreCallbackState(
297 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
298 if (!isTry && t.hasException()) {
299 state.setException(std::move(t.exception()));
301 state.setTry(makeTryWith(
302 [&] { return state.invoke(t.template get<isTry, Args>()...); }));
309 // Variant: returns a Future
310 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
312 template <typename F, typename R, bool isTry, typename... Args>
313 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
314 Future<T>::thenImplementation(
316 futures::detail::argResult<isTry, F, Args...>) {
317 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
318 typedef typename R::ReturnsFuture::Inner B;
323 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
325 // grab the Future now before we lose our handle on the Promise
326 auto f = p.getFuture();
327 f.core_->setExecutorNoLock(getExecutor());
330 [state = futures::detail::makeCoreCallbackState(
331 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
332 if (!isTry && t.hasException()) {
333 state.setException(std::move(t.exception()));
335 auto tf2 = state.tryInvoke(t.template get<isTry, Args>()...);
336 if (tf2.hasException()) {
337 state.setException(std::move(tf2.exception()));
339 tf2->setCallback_([p = state.stealPromise()](Try<B> && b) mutable {
340 p.setTry(std::move(b));
349 template <typename T>
350 template <typename R, typename Caller, typename... Args>
351 Future<typename isFuture<R>::Inner>
352 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
353 typedef typename std::remove_cv<typename std::remove_reference<
354 typename futures::detail::ArgType<Args...>::FirstArg>::type>::type
356 return then([instance, func](Try<T>&& t){
357 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
362 Future<Unit> Future<T>::then() {
363 return then([] () {});
366 // onError where the callback returns T
369 typename std::enable_if<
370 !futures::detail::callableWith<F, exception_wrapper>::value &&
371 !futures::detail::callableWith<F, exception_wrapper&>::value &&
372 !futures::detail::Extract<F>::ReturnsFuture::value,
374 Future<T>::onError(F&& func) {
375 typedef std::remove_reference_t<
376 typename futures::detail::Extract<F>::FirstArg>
379 std::is_same<typename futures::detail::Extract<F>::RawReturn, T>::value,
380 "Return type of onError callback must be T or Future<T>");
383 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
384 auto f = p.getFuture();
387 [state = futures::detail::makeCoreCallbackState(
388 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
389 if (auto e = t.template tryGetExceptionObject<Exn>()) {
390 state.setTry(makeTryWith([&] { return state.invoke(*e); }));
392 state.setTry(std::move(t));
399 // onError where the callback returns Future<T>
402 typename std::enable_if<
403 !futures::detail::callableWith<F, exception_wrapper>::value &&
404 !futures::detail::callableWith<F, exception_wrapper&>::value &&
405 futures::detail::Extract<F>::ReturnsFuture::value,
407 Future<T>::onError(F&& func) {
409 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
411 "Return type of onError callback must be T or Future<T>");
412 typedef std::remove_reference_t<
413 typename futures::detail::Extract<F>::FirstArg>
417 auto f = p.getFuture();
420 [state = futures::detail::makeCoreCallbackState(
421 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
422 if (auto e = t.template tryGetExceptionObject<Exn>()) {
423 auto tf2 = state.tryInvoke(*e);
424 if (tf2.hasException()) {
425 state.setException(std::move(tf2.exception()));
427 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
428 p.setTry(std::move(t3));
432 state.setTry(std::move(t));
441 Future<T> Future<T>::ensure(F&& func) {
442 return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
444 return makeFuture(std::move(t));
450 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
451 return within(dur, tk).onError([funcw = std::forward<F>(func)](
452 TimedOut const&) { return std::move(funcw)(); });
457 typename std::enable_if<
458 futures::detail::callableWith<F, exception_wrapper>::value &&
459 futures::detail::Extract<F>::ReturnsFuture::value,
461 Future<T>::onError(F&& func) {
463 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
465 "Return type of onError callback must be T or Future<T>");
468 auto f = p.getFuture();
470 [state = futures::detail::makeCoreCallbackState(
471 std::move(p), std::forward<F>(func))](Try<T> t) mutable {
472 if (t.hasException()) {
473 auto tf2 = state.tryInvoke(std::move(t.exception()));
474 if (tf2.hasException()) {
475 state.setException(std::move(tf2.exception()));
477 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
478 p.setTry(std::move(t3));
482 state.setTry(std::move(t));
489 // onError(exception_wrapper) that returns T
492 typename std::enable_if<
493 futures::detail::callableWith<F, exception_wrapper>::value &&
494 !futures::detail::Extract<F>::ReturnsFuture::value,
496 Future<T>::onError(F&& func) {
498 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
500 "Return type of onError callback must be T or Future<T>");
503 auto f = p.getFuture();
505 [state = futures::detail::makeCoreCallbackState(
506 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
507 if (t.hasException()) {
508 state.setTry(makeTryWith(
509 [&] { return state.invoke(std::move(t.exception())); }));
511 state.setTry(std::move(t));
519 typename std::add_lvalue_reference<T>::type Future<T>::value() {
522 return core_->getTry().value();
526 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
529 return core_->getTry().value();
533 Try<T>& Future<T>::getTry() {
536 return core_->getTry();
540 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
541 return waitVia(e).getTry();
545 Optional<Try<T>> Future<T>::poll() {
547 if (core_->ready()) {
548 o = std::move(core_->getTry());
554 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
557 setExecutor(executor, priority);
559 return std::move(*this);
563 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
567 auto f = p.getFuture();
568 then([p = std::move(p)](Try<T> && t) mutable { p.setTry(std::move(t)); });
569 return std::move(f).via(executor, priority);
572 template <class Func>
573 auto via(Executor* x, Func&& func)
574 -> Future<typename isFuture<decltype(std::declval<Func>()())>::Inner> {
575 // TODO make this actually more performant. :-P #7260175
576 return via(x).then(std::forward<Func>(func));
580 bool Future<T>::isReady() const {
582 return core_->ready();
586 bool Future<T>::hasValue() {
587 return getTry().hasValue();
591 bool Future<T>::hasException() {
592 return getTry().hasException();
596 void Future<T>::raise(exception_wrapper exception) {
597 core_->raise(std::move(exception));
601 Future<T>::Future(futures::detail::EmptyConstruct) noexcept : core_(nullptr) {}
606 Future<typename std::decay<T>::type> makeFuture(T&& t) {
607 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
610 inline // for multiple translation units
611 Future<Unit> makeFuture() {
612 return makeFuture(Unit{});
615 // makeFutureWith(Future<T>()) -> Future<T>
617 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
618 typename std::result_of<F()>::type>::type
619 makeFutureWith(F&& func) {
621 typename isFuture<typename std::result_of<F()>::type>::Inner;
623 return std::forward<F>(func)();
624 } catch (std::exception& e) {
625 return makeFuture<InnerType>(
626 exception_wrapper(std::current_exception(), e));
628 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
632 // makeFutureWith(T()) -> Future<T>
633 // makeFutureWith(void()) -> Future<Unit>
635 typename std::enable_if<
636 !(isFuture<typename std::result_of<F()>::type>::value),
637 Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
638 makeFutureWith(F&& func) {
640 typename Unit::Lift<typename std::result_of<F()>::type>::type;
641 return makeFuture<LiftedResult>(
642 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
646 Future<T> makeFuture(std::exception_ptr const& e) {
647 return makeFuture(Try<T>(e));
651 Future<T> makeFuture(exception_wrapper ew) {
652 return makeFuture(Try<T>(std::move(ew)));
655 template <class T, class E>
656 typename std::enable_if<std::is_base_of<std::exception, E>::value,
658 makeFuture(E const& e) {
659 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
663 Future<T> makeFuture(Try<T>&& t) {
664 return Future<T>(new futures::detail::Core<T>(std::move(t)));
668 Future<Unit> via(Executor* executor, int8_t priority) {
669 return makeFuture().via(executor, priority);
672 // mapSetCallback calls func(i, Try<T>) when every future completes
674 template <class T, class InputIterator, class F>
675 void mapSetCallback(InputIterator first, InputIterator last, F func) {
676 for (size_t i = 0; first != last; ++first, ++i) {
677 first->setCallback_([func, i](Try<T>&& t) {
678 func(i, std::move(t));
683 // collectAll (variadic)
685 template <typename... Fs>
686 typename futures::detail::CollectAllVariadicContext<
687 typename std::decay<Fs>::type::value_type...>::type
688 collectAll(Fs&&... fs) {
689 auto ctx = std::make_shared<futures::detail::CollectAllVariadicContext<
690 typename std::decay<Fs>::type::value_type...>>();
691 futures::detail::collectVariadicHelper<
692 futures::detail::CollectAllVariadicContext>(ctx, std::forward<Fs>(fs)...);
693 return ctx->p.getFuture();
696 // collectAll (iterator)
698 template <class InputIterator>
701 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
702 collectAll(InputIterator first, InputIterator last) {
704 typename std::iterator_traits<InputIterator>::value_type::value_type T;
706 struct CollectAllContext {
707 CollectAllContext(size_t n) : results(n) {}
708 ~CollectAllContext() {
709 p.setValue(std::move(results));
711 Promise<std::vector<Try<T>>> p;
712 std::vector<Try<T>> results;
716 std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
717 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
718 ctx->results[i] = std::move(t);
720 return ctx->p.getFuture();
723 // collect (iterator)
728 template <typename T>
729 struct CollectContext {
731 explicit Nothing(int /* n */) {}
734 using Result = typename std::conditional<
735 std::is_void<T>::value,
737 std::vector<T>>::type;
739 using InternalResult = typename std::conditional<
740 std::is_void<T>::value,
742 std::vector<Optional<T>>>::type;
744 explicit CollectContext(size_t n) : result(n) {}
746 if (!threw.exchange(true)) {
747 // map Optional<T> -> T
748 std::vector<T> finalResult;
749 finalResult.reserve(result.size());
750 std::transform(result.begin(), result.end(),
751 std::back_inserter(finalResult),
752 [](Optional<T>& o) { return std::move(o.value()); });
753 p.setValue(std::move(finalResult));
756 inline void setPartialResult(size_t i, Try<T>& t) {
757 result[i] = std::move(t.value());
760 InternalResult result;
761 std::atomic<bool> threw {false};
764 } // namespace detail
765 } // namespace futures
767 template <class InputIterator>
768 Future<typename futures::detail::CollectContext<typename std::iterator_traits<
769 InputIterator>::value_type::value_type>::Result>
770 collect(InputIterator first, InputIterator last) {
772 typename std::iterator_traits<InputIterator>::value_type::value_type T;
774 auto ctx = std::make_shared<futures::detail::CollectContext<T>>(
775 std::distance(first, last));
776 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
777 if (t.hasException()) {
778 if (!ctx->threw.exchange(true)) {
779 ctx->p.setException(std::move(t.exception()));
781 } else if (!ctx->threw) {
782 ctx->setPartialResult(i, t);
785 return ctx->p.getFuture();
788 // collect (variadic)
790 template <typename... Fs>
791 typename futures::detail::CollectVariadicContext<
792 typename std::decay<Fs>::type::value_type...>::type
793 collect(Fs&&... fs) {
794 auto ctx = std::make_shared<futures::detail::CollectVariadicContext<
795 typename std::decay<Fs>::type::value_type...>>();
796 futures::detail::collectVariadicHelper<
797 futures::detail::CollectVariadicContext>(ctx, std::forward<Fs>(fs)...);
798 return ctx->p.getFuture();
801 // collectAny (iterator)
803 template <class InputIterator>
808 std::iterator_traits<InputIterator>::value_type::value_type>>>
809 collectAny(InputIterator first, InputIterator last) {
811 typename std::iterator_traits<InputIterator>::value_type::value_type T;
813 struct CollectAnyContext {
814 CollectAnyContext() {}
815 Promise<std::pair<size_t, Try<T>>> p;
816 std::atomic<bool> done {false};
819 auto ctx = std::make_shared<CollectAnyContext>();
820 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
821 if (!ctx->done.exchange(true)) {
822 ctx->p.setValue(std::make_pair(i, std::move(t)));
825 return ctx->p.getFuture();
828 // collectAnyWithoutException (iterator)
830 template <class InputIterator>
833 typename std::iterator_traits<InputIterator>::value_type::value_type>>
834 collectAnyWithoutException(InputIterator first, InputIterator last) {
836 typename std::iterator_traits<InputIterator>::value_type::value_type T;
838 struct CollectAnyWithoutExceptionContext {
839 CollectAnyWithoutExceptionContext(){}
840 Promise<std::pair<size_t, T>> p;
841 std::atomic<bool> done{false};
842 std::atomic<size_t> nFulfilled{0};
846 auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
847 ctx->nTotal = size_t(std::distance(first, last));
849 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
850 if (!t.hasException() && !ctx->done.exchange(true)) {
851 ctx->p.setValue(std::make_pair(i, std::move(t.value())));
852 } else if (++ctx->nFulfilled == ctx->nTotal) {
853 ctx->p.setException(t.exception());
856 return ctx->p.getFuture();
859 // collectN (iterator)
861 template <class InputIterator>
862 Future<std::vector<std::pair<size_t, Try<typename
863 std::iterator_traits<InputIterator>::value_type::value_type>>>>
864 collectN(InputIterator first, InputIterator last, size_t n) {
866 std::iterator_traits<InputIterator>::value_type::value_type T;
867 typedef std::vector<std::pair<size_t, Try<T>>> V;
869 struct CollectNContext {
871 std::atomic<size_t> completed = {0};
874 auto ctx = std::make_shared<CollectNContext>();
876 if (size_t(std::distance(first, last)) < n) {
877 ctx->p.setException(std::runtime_error("Not enough futures"));
879 // for each completed Future, increase count and add to vector, until we
880 // have n completed futures at which point we fulfil our Promise with the
882 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
883 auto c = ++ctx->completed;
885 assert(ctx->v.size() < n);
886 ctx->v.emplace_back(i, std::move(t));
888 ctx->p.setTry(Try<V>(std::move(ctx->v)));
894 return ctx->p.getFuture();
899 template <class It, class T, class F>
900 Future<T> reduce(It first, It last, T&& initial, F&& func) {
902 return makeFuture(std::move(initial));
905 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
906 typedef typename std::conditional<
907 futures::detail::callableWith<F, T&&, Try<ItT>&&>::value,
910 typedef isTry<Arg> IsTry;
912 auto sfunc = std::make_shared<F>(std::move(func));
914 auto f = first->then(
915 [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
917 std::move(minitial), head.template get<IsTry::value, Arg&&>());
920 for (++first; first != last; ++first) {
921 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
922 return (*sfunc)(std::move(std::get<0>(t).value()),
923 // Either return a ItT&& or a Try<ItT>&& depending
924 // on the type of the argument of func.
925 std::get<1>(t).template get<IsTry::value, Arg&&>());
932 // window (collection)
934 template <class Collection, class F, class ItT, class Result>
935 std::vector<Future<Result>>
936 window(Collection input, F func, size_t n) {
937 struct WindowContext {
938 WindowContext(Collection&& i, F&& fn)
939 : input_(std::move(i)), promises_(input_.size()),
942 std::atomic<size_t> i_ {0};
944 std::vector<Promise<Result>> promises_;
947 static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
948 size_t i = ctx->i_++;
949 if (i < ctx->input_.size()) {
950 // Using setCallback_ directly since we don't need the Future
951 ctx->func_(std::move(ctx->input_[i])).setCallback_(
952 // ctx is captured by value
953 [ctx, i](Try<Result>&& t) {
954 ctx->promises_[i].setTry(std::move(t));
955 // Chain another future onto this one
956 spawn(std::move(ctx));
962 auto max = std::min(n, input.size());
964 auto ctx = std::make_shared<WindowContext>(
965 std::move(input), std::move(func));
967 for (size_t i = 0; i < max; ++i) {
968 // Start the first n Futures
969 WindowContext::spawn(ctx);
972 std::vector<Future<Result>> futures;
973 futures.reserve(ctx->promises_.size());
974 for (auto& promise : ctx->promises_) {
975 futures.emplace_back(promise.getFuture());
984 template <class I, class F>
985 Future<I> Future<T>::reduce(I&& initial, F&& func) {
987 minitial = std::forward<I>(initial),
988 mfunc = std::forward<F>(func)
990 auto ret = std::move(minitial);
991 for (auto& val : vals) {
992 ret = mfunc(std::move(ret), std::move(val));
998 // unorderedReduce (iterator)
1000 template <class It, class T, class F, class ItT, class Arg>
1001 Future<T> unorderedReduce(It first, It last, T initial, F func) {
1002 if (first == last) {
1003 return makeFuture(std::move(initial));
1006 typedef isTry<Arg> IsTry;
1008 struct UnorderedReduceContext {
1009 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
1010 : lock_(), memo_(makeFuture<T>(std::move(memo))),
1011 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
1013 folly::MicroSpinLock lock_; // protects memo_ and numThens_
1016 size_t numThens_; // how many Futures completed and called .then()
1017 size_t numFutures_; // how many Futures in total
1018 Promise<T> promise_;
1021 auto ctx = std::make_shared<UnorderedReduceContext>(
1022 std::move(initial), std::move(func), std::distance(first, last));
1024 mapSetCallback<ItT>(
1027 [ctx](size_t /* i */, Try<ItT>&& t) {
1028 // Futures can be completed in any order, simultaneously.
1029 // To make this non-blocking, we create a new Future chain in
1030 // the order of completion to reduce the values.
1031 // The spinlock just protects chaining a new Future, not actually
1032 // executing the reduce, which should be really fast.
1033 folly::MSLGuard lock(ctx->lock_);
1035 ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
1036 // Either return a ItT&& or a Try<ItT>&& depending
1037 // on the type of the argument of func.
1038 return ctx->func_(std::move(v),
1039 mt.template get<IsTry::value, Arg&&>());
1041 if (++ctx->numThens_ == ctx->numFutures_) {
1042 // After reducing the value of the last Future, fulfill the Promise
1043 ctx->memo_.setCallback_(
1044 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
1048 return ctx->promise_.getFuture();
1054 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
1055 return within(dur, TimedOut(), tk);
1060 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
1063 Context(E ex) : exception(std::move(ex)), promise() {}
1065 Future<Unit> thisFuture;
1067 std::atomic<bool> token {false};
1070 if (this->isReady()) {
1071 return std::move(*this);
1074 std::shared_ptr<Timekeeper> tks;
1076 tks = folly::detail::getTimekeeperSingleton();
1077 tk = DCHECK_NOTNULL(tks.get());
1080 auto ctx = std::make_shared<Context>(std::move(e));
1082 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
1083 if (ctx->token.exchange(true) == false) {
1084 ctx->promise.setTry(std::move(t));
1088 // Have time keeper use a weak ptr to hold ctx,
1089 // so that ctx can be deallocated as soon as the future job finished.
1090 tk->after(dur).then([weakCtx = to_weak_ptr(ctx)](Try<Unit> const& t) mutable {
1091 auto lockedCtx = weakCtx.lock();
1093 // ctx already released. "this" completed first, cancel "after"
1096 // "after" completed first, cancel "this"
1097 lockedCtx->thisFuture.raise(TimedOut());
1098 if (lockedCtx->token.exchange(true) == false) {
1099 if (t.hasException()) {
1100 lockedCtx->promise.setException(std::move(t.exception()));
1102 lockedCtx->promise.setException(std::move(lockedCtx->exception));
1107 return ctx->promise.getFuture().via(getExecutor());
1113 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
1114 return collectAll(*this, futures::sleep(dur, tk))
1115 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
1116 Try<T>& t = std::get<0>(tup);
1117 return makeFuture<T>(std::move(t));
1125 void waitImpl(Future<T>& f) {
1126 // short-circuit if there's nothing to do
1127 if (f.isReady()) return;
1129 FutureBatonType baton;
1130 f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1132 assert(f.isReady());
1136 void waitImpl(Future<T>& f, Duration dur) {
1137 // short-circuit if there's nothing to do
1143 auto ret = promise.getFuture();
1144 auto baton = std::make_shared<FutureBatonType>();
1145 f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1146 promise.setTry(std::move(t));
1150 if (baton->timed_wait(dur)) {
1151 assert(f.isReady());
1156 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1157 // Set callback so to ensure that the via executor has something on it
1158 // so that once the preceding future triggers this callback, drive will
1159 // always have a callback to satisfy it
1162 f = f.via(e).then([](T&& t) { return std::move(t); });
1163 while (!f.isReady()) {
1166 assert(f.isReady());
1169 } // namespace detail
1170 } // namespace futures
1173 Future<T>& Future<T>::wait() & {
1174 futures::detail::waitImpl(*this);
1179 Future<T>&& Future<T>::wait() && {
1180 futures::detail::waitImpl(*this);
1181 return std::move(*this);
1185 Future<T>& Future<T>::wait(Duration dur) & {
1186 futures::detail::waitImpl(*this, dur);
1191 Future<T>&& Future<T>::wait(Duration dur) && {
1192 futures::detail::waitImpl(*this, dur);
1193 return std::move(*this);
1197 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1198 futures::detail::waitViaImpl(*this, e);
1203 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1204 futures::detail::waitViaImpl(*this, e);
1205 return std::move(*this);
1209 T Future<T>::get() {
1210 return std::move(wait().value());
1214 T Future<T>::get(Duration dur) {
1217 return std::move(value());
1224 T Future<T>::getVia(DrivableExecutor* e) {
1225 return std::move(waitVia(e).value());
1232 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1233 return t1.value() == t2.value();
1236 } // namespace detail
1237 } // namespace futures
1240 Future<bool> Future<T>::willEqual(Future<T>& f) {
1241 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1242 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1243 return futures::detail::TryEquals<T>::equals(
1244 std::get<0>(t), std::get<1>(t));
1253 Future<T> Future<T>::filter(F&& predicate) {
1254 return this->then([p = std::forward<F>(predicate)](T val) {
1255 T const& valConstRef = val;
1256 if (!p(valConstRef)) {
1257 throwPredicateDoesNotObtain();
1264 inline Future<Unit> when(bool p, F&& thunk) {
1265 return p ? std::forward<F>(thunk)().unit() : makeFuture();
1268 template <class P, class F>
1269 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1271 auto future = thunk();
1272 return future.then([
1273 predicate = std::forward<P>(predicate),
1274 thunk = std::forward<F>(thunk)
1276 return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1279 return makeFuture();
1283 Future<Unit> times(const int n, F&& thunk) {
1284 return folly::whileDo(
1285 [ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
1286 return count->fetch_add(1) < n;
1288 std::forward<F>(thunk));
1292 template <class It, class F, class ItT, class Result>
1293 std::vector<Future<Result>> map(It first, It last, F func) {
1294 std::vector<Future<Result>> results;
1295 for (auto it = first; it != last; it++) {
1296 results.push_back(it->then(func));
1306 struct retrying_policy_raw_tag {};
1307 struct retrying_policy_fut_tag {};
// Classifies a retry policy by the type it returns when invoked with
// (size_t, const exception_wrapper&).
1309 template <class Policy>
1310 struct retrying_policy_traits {
// Return type of the policy call.
1311 using result = std::result_of_t<Policy(size_t, const exception_wrapper&)>;
// std::is_same requires an exact match: precisely bool / precisely Future<bool>.
1312 using is_raw = std::is_same<result, bool>;
1313 using is_fut = std::is_same<result, Future<bool>>;
// tag is retrying_policy_raw_tag for bool, retrying_policy_fut_tag for
// Future<bool>, and void for any other return type.
1314 using tag = typename std::conditional<
1315 is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1316 is_fut::value, retrying_policy_fut_tag, void>::type>::type;
// One attempt of the retry loop. Invokes `ff` with the attempt counter `k`,
// then (in continuations) either fulfils `prom` with the value, recurses for
// another attempt, or fails `prom` with the exception.
// NOTE(review): the conditions selecting between these outcomes and the call
// that produces `shouldRetry` are on lines not visible in this excerpt --
// presumably the policy `pm` is consulted with the exception; confirm.
1319 template <class Policy, class FF, class Prom>
1320 void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) {
// F is the future type returned by ff(size_t); T is that future's value_type.
1321 using F = typename std::result_of<FF(size_t)>::type;
1322 using T = typename F::value_type;
// k is captured by reference and post-incremented when the lambda runs, so
// ff receives the current k and later uses of k in this function see k + 1.
1323 auto f = makeFutureWith([&] { return ff(k++); });
// The promise, policy and functor are moved/forwarded into the continuation.
1326 prom = std::move(prom),
1327 pm = std::forward<Policy>(p),
1328 ffm = std::forward<FF>(ff)
1329 ](Try<T> && t) mutable {
// Value path: pass the attempt's value to the promise.
1331 prom.setValue(std::move(t).value());
// Exception path: take a reference to the exception held by the Try.
1334 auto& x = t.exception();
1338 prom = std::move(prom),
1341 ffm = std::move(ffm)
1342 ](bool shouldRetry) mutable {
// Retry: recurse with the (already incremented) k and the moved state.
1344 retryingImpl(k, std::move(pm), std::move(ffm), std::move(prom));
// Give up: fail the promise with the captured exception `xm`.
1346 prom.setException(std::move(xm));
// Entry point for a retry sequence starting at attempt index `k`. The return
// type is the future type produced by ff(size_t).
// NOTE(review): the callee receiving the argument list below and the final
// return are on lines not visible in this excerpt -- presumably retryingImpl
// is called and `f` is returned; confirm.
1352 template <class Policy, class FF>
1353 typename std::result_of<FF(size_t)>::type
1354 retrying(size_t k, Policy&& p, FF&& ff) {
1355 using F = typename std::result_of<FF(size_t)>::type;
1356 using T = typename F::value_type;
// The future is taken from the promise before the promise is moved away below.
1357 auto prom = Promise<T>();
1358 auto f = prom.getFuture();
1360 k, std::forward<Policy>(p), std::forward<FF>(ff), std::move(prom));
// Overload selected for policies returning plain bool. Wraps the policy in a
// lambda `q` that lifts its result into a Future<bool> with makeFuture<bool>,
// then starts the retry sequence at index 0 with `q` as the policy.
1364 template <class Policy, class FF>
1365 typename std::result_of<FF(size_t)>::type
1366 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
// The lambda is not `mutable`, so the captured policy `pm` is invoked as const.
1367 auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
1368 return makeFuture<bool>(pm(k, x));
1370 return retrying(0, std::move(q), std::forward<FF>(ff));
// Overload selected for policies returning Future<bool>: no adaptation is
// needed, so the policy and functor are forwarded unchanged and the retry
// sequence starts at index 0.
1373 template <class Policy, class FF>
1374 typename std::result_of<FF(size_t)>::type
1375 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1376 return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1379 // jittered exponential backoff, clamped to [backoff_min, backoff_max]
// The names `n`, `rng` and `d` used below are declared on lines not visible in
// this excerpt.
1380 template <class URNG>
1381 Duration retryingJitteredExponentialBackoffDur(
1383 Duration backoff_min,
1384 Duration backoff_max,
1385 double jitter_param,
// Jitter factor: exp of a normal sample with mean 0 and standard deviation
// jitter_param, i.e. a strictly positive multiplier.
1388 auto dist = std::normal_distribution<double>(0.0, jitter_param);
1389 auto jitter = std::exp(dist(rng));
// Unclamped backoff: jitter * backoff_min * 2^(n - 1), converted to d::rep.
1390 auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
// Clamp; because std::max is applied last, backoff_min wins if
// backoff_max < backoff_min.
1391 return std::max(backoff_min, std::min(backoff_max, backoff));
// Builds a retry policy (type-erased as std::function returning Future<bool>)
// that first consults the wrapped policy `pm` and, on the path visible below,
// sleeps for a jittered exponential backoff before answering true.
// NOTE(review): some parameters and captures (e.g. max_tries, rng, p) are on
// lines not visible in this excerpt.
1394 template <class Policy, class URNG>
1395 std::function<Future<bool>(size_t, const exception_wrapper&)>
1396 retryingPolicyCappedJitteredExponentialBackoff(
1398 Duration backoff_min,
1399 Duration backoff_max,
1400 double jitter_param,
// The policy and the random generator are forwarded into the closure.
1404 pm = std::forward<Policy>(p),
1409 rngp = std::forward<URNG>(rng)
1410 ](size_t n, const exception_wrapper& ex) mutable {
// Attempt budget exhausted: answer false without consulting pm.
1411 if (n == max_tries) {
1412 return makeFuture(false);
// Ask the wrapped policy, then decide in a continuation.
// NOTE(review): rngp is moved out of the outer closure on every call; for a
// generator type whose move alters the source, later calls would draw from a
// moved-from generator -- confirm this is intended.
1414 return pm(n, ex).then(
1415 [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
// NOTE(review): the condition guarding this early `false` is not visible;
// presumably it is the wrapped policy's negative answer.
1418 return makeFuture(false);
// Compute the delay for attempt n and resolve to true once the sleep completes.
1420 auto backoff = detail::retryingJitteredExponentialBackoffDur(
1421 n, backoff_min, backoff_max, jitter_param, rngp);
1422 return futures::sleep(backoff).then([] { return true; });
// Tag-dispatched overload for policies returning plain bool: adapts the policy
// into one returning a future (via makeFuture) and delegates to
// retryingPolicyCappedJitteredExponentialBackoff.
// NOTE(review): several parameters and most delegated arguments are on lines
// not visible in this excerpt.
1427 template <class Policy, class URNG>
1428 std::function<Future<bool>(size_t, const exception_wrapper&)>
1429 retryingPolicyCappedJitteredExponentialBackoff(
1431 Duration backoff_min,
1432 Duration backoff_max,
1433 double jitter_param,
1436 retrying_policy_raw_tag) {
// Adapter: lift the wrapped policy's result into a future.
1437 auto q = [pm = std::forward<Policy>(p)](
1438 size_t n, const exception_wrapper& e) {
1439 return makeFuture(pm(n, e));
1441 return retryingPolicyCappedJitteredExponentialBackoff(
1446 std::forward<URNG>(rng),
// Tag-dispatched overload for policies returning Future<bool>: no adapter is
// needed, so the generator and policy are forwarded unchanged to
// retryingPolicyCappedJitteredExponentialBackoff.
// NOTE(review): several parameters and the leading delegated arguments are on
// lines not visible in this excerpt.
1450 template <class Policy, class URNG>
1451 std::function<Future<bool>(size_t, const exception_wrapper&)>
1452 retryingPolicyCappedJitteredExponentialBackoff(
1454 Duration backoff_min,
1455 Duration backoff_max,
1456 double jitter_param,
1459 retrying_policy_fut_tag) {
1460 return retryingPolicyCappedJitteredExponentialBackoff(
1465 std::forward<URNG>(rng),
1466 std::forward<Policy>(p));
// Public retrying(): classifies the policy through
// detail::retrying_policy_traits and dispatches to the matching detail::retrying
// overload by passing a default-constructed tag. Returns the future type that
// ff(size_t) produces.
1470 template <class Policy, class FF>
1471 typename std::result_of<FF(size_t)>::type
1472 retrying(Policy&& p, FF&& ff) {
1473 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1474 return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
// Returns a bool-valued policy that ignores the exception and answers true
// while the attempt index n is below max_tries. The lambda captures by copy
// ([=]), so the returned std::function does not refer back to this call's
// arguments.
// NOTE(review): the parameter list (declaring max_tries) is on a line not
// visible in this excerpt.
1478 std::function<bool(size_t, const exception_wrapper&)>
1479 retryingPolicyBasic(
1481 return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
// Public overload taking a user policy and random generator: classifies the
// policy through detail::retrying_policy_traits and delegates to
// detail::retryingPolicyCappedJitteredExponentialBackoff.
// NOTE(review): several parameters and delegated arguments (presumably
// including a `tag()` argument) are on lines not visible in this excerpt.
1484 template <class Policy, class URNG>
1485 std::function<Future<bool>(size_t, const exception_wrapper&)>
1486 retryingPolicyCappedJitteredExponentialBackoff(
1488 Duration backoff_min,
1489 Duration backoff_max,
1490 double jitter_param,
1493 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1494 return detail::retryingPolicyCappedJitteredExponentialBackoff(
1499 std::forward<URNG>(rng),
1500 std::forward<Policy>(p),
// Convenience overload without a user policy: supplies a policy `p` that
// always answers true and delegates to
// retryingPolicyCappedJitteredExponentialBackoff.
// NOTE(review): the first parameter and the delegated argument list (including
// which random generator is passed) are on lines not visible in this excerpt.
1505 std::function<Future<bool>(size_t, const exception_wrapper&)>
1506 retryingPolicyCappedJitteredExponentialBackoff(
1508 Duration backoff_min,
1509 Duration backoff_max,
1510 double jitter_param) {
// Default policy: always answers true, ignoring both arguments.
1511 auto p = [](size_t, const exception_wrapper&) { return true; };
1512 return retryingPolicyCappedJitteredExponentialBackoff(
1523 // Instantiate the most common Future types to save compile time
// These are explicit instantiation *declarations* (`extern template`): they
// tell each including translation unit not to instantiate these
// specializations implicitly. The matching explicit instantiation definitions
// must exist in a source file, which is not visible here.
1524 extern template class Future<Unit>;
1525 extern template class Future<bool>;
1526 extern template class Future<int>;
1527 extern template class Future<int64_t>;
1528 extern template class Future<std::string>;
1529 extern template class Future<double>;
1531 } // namespace folly