2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
25 #include <folly/Baton.h>
26 #include <folly/Optional.h>
27 #include <folly/Random.h>
28 #include <folly/futures/Timekeeper.h>
29 #include <folly/futures/detail/Core.h>
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/fibers/Baton.h>
44 #if FOLLY_FUTURE_USING_FIBER
45 typedef folly::fibers::Baton FutureBatonType;
47 typedef folly::Baton<> FutureBatonType;
50 } // namespace futures
53 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
58 // Guarantees that the stored functor is destructed before the stored promise
59 // may be fulfilled. Assumes the stored functor to be noexcept-destructible.
60 template <typename T, typename F>
61 class CoreCallbackState {
63 template <typename FF>
64 CoreCallbackState(Promise<T>&& promise, FF&& func) noexcept(
65 noexcept(F(std::declval<FF>())))
66 : func_(std::forward<FF>(func)), promise_(std::move(promise)) {
67 assert(before_barrier());
70 CoreCallbackState(CoreCallbackState&& that) noexcept(
71 noexcept(F(std::declval<F>()))) {
72 if (that.before_barrier()) {
73 new (&func_) F(std::move(that.func_));
74 promise_ = that.stealPromise();
78 CoreCallbackState& operator=(CoreCallbackState&&) = delete;
80 ~CoreCallbackState() {
81 if (before_barrier()) {
86 template <typename... Args>
87 auto invoke(Args&&... args) noexcept(
88 noexcept(std::declval<F&&>()(std::declval<Args&&>()...))) {
89 assert(before_barrier());
90 return std::move(func_)(std::forward<Args>(args)...);
93 template <typename... Args>
94 auto tryInvoke(Args&&... args) noexcept {
95 return makeTryWith([&] { return invoke(std::forward<Args>(args)...); });
98 void setTry(Try<T>&& t) {
99 stealPromise().setTry(std::move(t));
102 void setException(exception_wrapper&& ew) {
103 stealPromise().setException(std::move(ew));
106 Promise<T> stealPromise() noexcept {
107 assert(before_barrier());
109 return std::move(promise_);
113 bool before_barrier() const noexcept {
114 return !promise_.isFulfilled();
120 Promise<T> promise_{Promise<T>::makeEmpty()};
123 template <typename T, typename F>
124 inline auto makeCoreCallbackState(Promise<T>&& p, F&& f) noexcept(
125 noexcept(CoreCallbackState<T, _t<std::decay<F>>>(
126 std::declval<Promise<T>&&>(),
127 std::declval<F&&>()))) {
128 return CoreCallbackState<T, _t<std::decay<F>>>(
129 std::move(p), std::forward<F>(f));
131 } // namespace detail
132 } // namespace futures
135 Future<T> Future<T>::makeEmpty() {
136 return Future<T>(futures::detail::EmptyConstruct{});
140 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
141 other.core_ = nullptr;
145 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
146 std::swap(core_, other.core_);
153 typename std::enable_if<
154 !std::is_same<T, typename std::decay<T2>::type>::value &&
155 std::is_constructible<T, T2&&>::value &&
156 std::is_convertible<T2&&, T>::value,
158 Future<T>::Future(Future<T2>&& other)
159 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
164 typename std::enable_if<
165 !std::is_same<T, typename std::decay<T2>::type>::value &&
166 std::is_constructible<T, T2&&>::value &&
167 !std::is_convertible<T2&&, T>::value,
169 Future<T>::Future(Future<T2>&& other)
170 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
175 typename std::enable_if<
176 !std::is_same<T, typename std::decay<T2>::type>::value &&
177 std::is_constructible<T, T2&&>::value,
179 Future<T>& Future<T>::operator=(Future<T2>&& other) {
181 std::move(other).then([](T2&& v) { return T(std::move(v)); }));
185 template <class T2, typename>
186 Future<T>::Future(T2&& val)
187 : core_(new futures::detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
190 template <typename T2>
191 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
192 : core_(new futures::detail::Core<T>(Try<T>(T()))) {}
197 typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
199 Future<T>::Future(in_place_t, Args&&... args)
201 new futures::detail::Core<T>(in_place, std::forward<Args>(args)...)) {
205 Future<T>::~Future() {
210 void Future<T>::detach() {
212 core_->detachFuture();
218 void Future<T>::throwIfInvalid() const {
225 void Future<T>::setCallback_(F&& func) {
227 core_->setCallback(std::forward<F>(func));
234 typename std::enable_if<isFuture<F>::value,
235 Future<typename isFuture<T>::Inner>>::type
236 Future<T>::unwrap() {
237 return then([](Future<typename isFuture<T>::Inner> internal_future) {
238 return internal_future;
244 // Variant: returns a value
245 // e.g. f.then([](Try<T>&& t){ return t.value(); });
247 template <typename F, typename R, bool isTry, typename... Args>
248 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
249 Future<T>::thenImplementation(
251 futures::detail::argResult<isTry, F, Args...>) {
252 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
253 typedef typename R::ReturnsFuture::Inner B;
258 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
260 // grab the Future now before we lose our handle on the Promise
261 auto f = p.getFuture();
262 f.core_->setExecutorNoLock(getExecutor());
264 /* This is a bit tricky.
266 We can't just close over *this in case this Future gets moved. So we
267 make a new dummy Future. We could figure out something more
268 sophisticated that avoids making a new Future object when it can, as an
269 optimization. But this is correct.
271 core_ can't be moved, it is explicitly disallowed (as is copying). But
272 if there's ever a reason to allow it, this is one place that makes that
273 assumption and would need to be fixed. We use a standard shared pointer
274 for core_ (by copying it in), which means in essence obj holds a shared
275 pointer to itself. But this shouldn't leak because Promise will not
276 outlive the continuation, because Promise will setException() with a
277 broken Promise if it is destructed before completed. We could use a
278 weak pointer but it would have to be converted to a shared pointer when
279 func is executed (because the Future returned by func may possibly
280 persist beyond the callback, if it gets moved), and so it is an
281 optimization to just make it shared from the get-go.
283 Two subtle but important points about this design. futures::detail::Core
284 has no back pointers to Future or Promise, so if Future or Promise get
285 moved (and they will be moved in performant code) we don't have to do
286 anything fancy. And because we store the continuation in the
287 futures::detail::Core, not in the Future, we can execute the continuation
288 even after the Future has gone out of scope. This is an intentional design
289 decision. It is likely we will want to be able to cancel a continuation
290 in some circumstances, but I think it should be explicit not implicit
291 in the destruction of the Future used to create it.
294 [state = futures::detail::makeCoreCallbackState(
295 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
296 if (!isTry && t.hasException()) {
297 state.setException(std::move(t.exception()));
299 state.setTry(makeTryWith(
300 [&] { return state.invoke(t.template get<isTry, Args>()...); }));
307 // Variant: returns a Future
308 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
310 template <typename F, typename R, bool isTry, typename... Args>
311 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
312 Future<T>::thenImplementation(
314 futures::detail::argResult<isTry, F, Args...>) {
315 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
316 typedef typename R::ReturnsFuture::Inner B;
321 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
323 // grab the Future now before we lose our handle on the Promise
324 auto f = p.getFuture();
325 f.core_->setExecutorNoLock(getExecutor());
328 [state = futures::detail::makeCoreCallbackState(
329 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
330 if (!isTry && t.hasException()) {
331 state.setException(std::move(t.exception()));
333 auto tf2 = state.tryInvoke(t.template get<isTry, Args>()...);
334 if (tf2.hasException()) {
335 state.setException(std::move(tf2.exception()));
337 tf2->setCallback_([p = state.stealPromise()](Try<B> && b) mutable {
338 p.setTry(std::move(b));
347 template <typename T>
348 template <typename R, typename Caller, typename... Args>
349 Future<typename isFuture<R>::Inner>
350 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
351 typedef typename std::remove_cv<typename std::remove_reference<
352 typename futures::detail::ArgType<Args...>::FirstArg>::type>::type
354 return then([instance, func](Try<T>&& t){
355 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
360 Future<Unit> Future<T>::then() {
361 return then([] () {});
364 // onError where the callback returns T
367 typename std::enable_if<
368 !futures::detail::callableWith<F, exception_wrapper>::value &&
369 !futures::detail::callableWith<F, exception_wrapper&>::value &&
370 !futures::detail::Extract<F>::ReturnsFuture::value,
372 Future<T>::onError(F&& func) {
373 typedef std::remove_reference_t<
374 typename futures::detail::Extract<F>::FirstArg>
377 std::is_same<typename futures::detail::Extract<F>::RawReturn, T>::value,
378 "Return type of onError callback must be T or Future<T>");
381 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
382 auto f = p.getFuture();
385 [state = futures::detail::makeCoreCallbackState(
386 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
387 if (auto e = t.template tryGetExceptionObject<Exn>()) {
388 state.setTry(makeTryWith([&] { return state.invoke(*e); }));
390 state.setTry(std::move(t));
397 // onError where the callback returns Future<T>
400 typename std::enable_if<
401 !futures::detail::callableWith<F, exception_wrapper>::value &&
402 !futures::detail::callableWith<F, exception_wrapper&>::value &&
403 futures::detail::Extract<F>::ReturnsFuture::value,
405 Future<T>::onError(F&& func) {
407 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
409 "Return type of onError callback must be T or Future<T>");
410 typedef std::remove_reference_t<
411 typename futures::detail::Extract<F>::FirstArg>
415 auto f = p.getFuture();
418 [state = futures::detail::makeCoreCallbackState(
419 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
420 if (auto e = t.template tryGetExceptionObject<Exn>()) {
421 auto tf2 = state.tryInvoke(*e);
422 if (tf2.hasException()) {
423 state.setException(std::move(tf2.exception()));
425 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
426 p.setTry(std::move(t3));
430 state.setTry(std::move(t));
439 Future<T> Future<T>::ensure(F&& func) {
440 return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
442 return makeFuture(std::move(t));
448 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
449 return within(dur, tk).onError([funcw = std::forward<F>(func)](
450 TimedOut const&) { return std::move(funcw)(); });
455 typename std::enable_if<
456 futures::detail::callableWith<F, exception_wrapper>::value &&
457 futures::detail::Extract<F>::ReturnsFuture::value,
459 Future<T>::onError(F&& func) {
461 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
463 "Return type of onError callback must be T or Future<T>");
466 auto f = p.getFuture();
468 [state = futures::detail::makeCoreCallbackState(
469 std::move(p), std::forward<F>(func))](Try<T> t) mutable {
470 if (t.hasException()) {
471 auto tf2 = state.tryInvoke(std::move(t.exception()));
472 if (tf2.hasException()) {
473 state.setException(std::move(tf2.exception()));
475 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
476 p.setTry(std::move(t3));
480 state.setTry(std::move(t));
487 // onError(exception_wrapper) that returns T
490 typename std::enable_if<
491 futures::detail::callableWith<F, exception_wrapper>::value &&
492 !futures::detail::Extract<F>::ReturnsFuture::value,
494 Future<T>::onError(F&& func) {
496 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
498 "Return type of onError callback must be T or Future<T>");
501 auto f = p.getFuture();
503 [state = futures::detail::makeCoreCallbackState(
504 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
505 if (t.hasException()) {
506 state.setTry(makeTryWith(
507 [&] { return state.invoke(std::move(t.exception())); }));
509 state.setTry(std::move(t));
517 typename std::add_lvalue_reference<T>::type Future<T>::value() {
520 return core_->getTry().value();
524 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
527 return core_->getTry().value();
531 Try<T>& Future<T>::getTry() {
534 return core_->getTry();
538 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
539 return waitVia(e).getTry();
543 Optional<Try<T>> Future<T>::poll() {
545 if (core_->ready()) {
546 o = std::move(core_->getTry());
552 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
555 setExecutor(executor, priority);
557 return std::move(*this);
561 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
565 auto f = p.getFuture();
566 then([p = std::move(p)](Try<T> && t) mutable { p.setTry(std::move(t)); });
567 return std::move(f).via(executor, priority);
570 template <class Func>
571 auto via(Executor* x, Func&& func)
572 -> Future<typename isFuture<decltype(std::declval<Func>()())>::Inner> {
573 // TODO make this actually more performant. :-P #7260175
574 return via(x).then(std::forward<Func>(func));
578 bool Future<T>::isReady() const {
580 return core_->ready();
584 bool Future<T>::hasValue() {
585 return getTry().hasValue();
589 bool Future<T>::hasException() {
590 return getTry().hasException();
594 void Future<T>::raise(exception_wrapper exception) {
595 core_->raise(std::move(exception));
599 Future<T>::Future(futures::detail::EmptyConstruct) noexcept : core_(nullptr) {}
604 Future<typename std::decay<T>::type> makeFuture(T&& t) {
605 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
608 inline // for multiple translation units
609 Future<Unit> makeFuture() {
610 return makeFuture(Unit{});
613 // makeFutureWith(Future<T>()) -> Future<T>
615 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
616 typename std::result_of<F()>::type>::type
617 makeFutureWith(F&& func) {
619 typename isFuture<typename std::result_of<F()>::type>::Inner;
621 return std::forward<F>(func)();
622 } catch (std::exception& e) {
623 return makeFuture<InnerType>(
624 exception_wrapper(std::current_exception(), e));
626 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
630 // makeFutureWith(T()) -> Future<T>
631 // makeFutureWith(void()) -> Future<Unit>
633 typename std::enable_if<
634 !(isFuture<typename std::result_of<F()>::type>::value),
635 Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
636 makeFutureWith(F&& func) {
638 typename Unit::Lift<typename std::result_of<F()>::type>::type;
639 return makeFuture<LiftedResult>(
640 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
644 Future<T> makeFuture(std::exception_ptr const& e) {
645 return makeFuture(Try<T>(e));
649 Future<T> makeFuture(exception_wrapper ew) {
650 return makeFuture(Try<T>(std::move(ew)));
653 template <class T, class E>
654 typename std::enable_if<std::is_base_of<std::exception, E>::value,
656 makeFuture(E const& e) {
657 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
661 Future<T> makeFuture(Try<T>&& t) {
662 return Future<T>(new futures::detail::Core<T>(std::move(t)));
666 Future<Unit> via(Executor* executor, int8_t priority) {
667 return makeFuture().via(executor, priority);
670 // mapSetCallback calls func(i, Try<T>) when every future completes
672 template <class T, class InputIterator, class F>
673 void mapSetCallback(InputIterator first, InputIterator last, F func) {
674 for (size_t i = 0; first != last; ++first, ++i) {
675 first->setCallback_([func, i](Try<T>&& t) {
676 func(i, std::move(t));
681 // collectAll (variadic)
683 template <typename... Fs>
684 typename futures::detail::CollectAllVariadicContext<
685 typename std::decay<Fs>::type::value_type...>::type
686 collectAll(Fs&&... fs) {
687 auto ctx = std::make_shared<futures::detail::CollectAllVariadicContext<
688 typename std::decay<Fs>::type::value_type...>>();
689 futures::detail::collectVariadicHelper<
690 futures::detail::CollectAllVariadicContext>(ctx, std::forward<Fs>(fs)...);
691 return ctx->p.getFuture();
694 // collectAll (iterator)
696 template <class InputIterator>
699 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
700 collectAll(InputIterator first, InputIterator last) {
702 typename std::iterator_traits<InputIterator>::value_type::value_type T;
704 struct CollectAllContext {
705 CollectAllContext(size_t n) : results(n) {}
706 ~CollectAllContext() {
707 p.setValue(std::move(results));
709 Promise<std::vector<Try<T>>> p;
710 std::vector<Try<T>> results;
714 std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
715 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
716 ctx->results[i] = std::move(t);
718 return ctx->p.getFuture();
721 // collect (iterator)
726 template <typename T>
727 struct CollectContext {
729 explicit Nothing(int /* n */) {}
732 using Result = typename std::conditional<
733 std::is_void<T>::value,
735 std::vector<T>>::type;
737 using InternalResult = typename std::conditional<
738 std::is_void<T>::value,
740 std::vector<Optional<T>>>::type;
742 explicit CollectContext(size_t n) : result(n) {}
744 if (!threw.exchange(true)) {
745 // map Optional<T> -> T
746 std::vector<T> finalResult;
747 finalResult.reserve(result.size());
748 std::transform(result.begin(), result.end(),
749 std::back_inserter(finalResult),
750 [](Optional<T>& o) { return std::move(o.value()); });
751 p.setValue(std::move(finalResult));
754 inline void setPartialResult(size_t i, Try<T>& t) {
755 result[i] = std::move(t.value());
758 InternalResult result;
759 std::atomic<bool> threw {false};
762 } // namespace detail
763 } // namespace futures
765 template <class InputIterator>
766 Future<typename futures::detail::CollectContext<typename std::iterator_traits<
767 InputIterator>::value_type::value_type>::Result>
768 collect(InputIterator first, InputIterator last) {
770 typename std::iterator_traits<InputIterator>::value_type::value_type T;
772 auto ctx = std::make_shared<futures::detail::CollectContext<T>>(
773 std::distance(first, last));
774 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
775 if (t.hasException()) {
776 if (!ctx->threw.exchange(true)) {
777 ctx->p.setException(std::move(t.exception()));
779 } else if (!ctx->threw) {
780 ctx->setPartialResult(i, t);
783 return ctx->p.getFuture();
786 // collect (variadic)
788 template <typename... Fs>
789 typename futures::detail::CollectVariadicContext<
790 typename std::decay<Fs>::type::value_type...>::type
791 collect(Fs&&... fs) {
792 auto ctx = std::make_shared<futures::detail::CollectVariadicContext<
793 typename std::decay<Fs>::type::value_type...>>();
794 futures::detail::collectVariadicHelper<
795 futures::detail::CollectVariadicContext>(ctx, std::forward<Fs>(fs)...);
796 return ctx->p.getFuture();
799 // collectAny (iterator)
801 template <class InputIterator>
806 std::iterator_traits<InputIterator>::value_type::value_type>>>
807 collectAny(InputIterator first, InputIterator last) {
809 typename std::iterator_traits<InputIterator>::value_type::value_type T;
811 struct CollectAnyContext {
812 CollectAnyContext() {}
813 Promise<std::pair<size_t, Try<T>>> p;
814 std::atomic<bool> done {false};
817 auto ctx = std::make_shared<CollectAnyContext>();
818 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
819 if (!ctx->done.exchange(true)) {
820 ctx->p.setValue(std::make_pair(i, std::move(t)));
823 return ctx->p.getFuture();
826 // collectAnyWithoutException (iterator)
828 template <class InputIterator>
831 typename std::iterator_traits<InputIterator>::value_type::value_type>>
832 collectAnyWithoutException(InputIterator first, InputIterator last) {
834 typename std::iterator_traits<InputIterator>::value_type::value_type T;
836 struct CollectAnyWithoutExceptionContext {
837 CollectAnyWithoutExceptionContext(){}
838 Promise<std::pair<size_t, T>> p;
839 std::atomic<bool> done{false};
840 std::atomic<size_t> nFulfilled{0};
844 auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
845 ctx->nTotal = size_t(std::distance(first, last));
847 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
848 if (!t.hasException() && !ctx->done.exchange(true)) {
849 ctx->p.setValue(std::make_pair(i, std::move(t.value())));
850 } else if (++ctx->nFulfilled == ctx->nTotal) {
851 ctx->p.setException(t.exception());
854 return ctx->p.getFuture();
857 // collectN (iterator)
859 template <class InputIterator>
860 Future<std::vector<std::pair<size_t, Try<typename
861 std::iterator_traits<InputIterator>::value_type::value_type>>>>
862 collectN(InputIterator first, InputIterator last, size_t n) {
864 std::iterator_traits<InputIterator>::value_type::value_type T;
865 typedef std::vector<std::pair<size_t, Try<T>>> V;
867 struct CollectNContext {
869 std::atomic<size_t> completed = {0};
872 auto ctx = std::make_shared<CollectNContext>();
874 if (size_t(std::distance(first, last)) < n) {
875 ctx->p.setException(std::runtime_error("Not enough futures"));
877 // for each completed Future, increase count and add to vector, until we
878 // have n completed futures at which point we fulfil our Promise with the
880 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
881 auto c = ++ctx->completed;
883 assert(ctx->v.size() < n);
884 ctx->v.emplace_back(i, std::move(t));
886 ctx->p.setTry(Try<V>(std::move(ctx->v)));
892 return ctx->p.getFuture();
897 template <class It, class T, class F>
898 Future<T> reduce(It first, It last, T&& initial, F&& func) {
900 return makeFuture(std::move(initial));
903 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
904 typedef typename std::conditional<
905 futures::detail::callableWith<F, T&&, Try<ItT>&&>::value,
908 typedef isTry<Arg> IsTry;
910 auto sfunc = std::make_shared<F>(std::move(func));
912 auto f = first->then(
913 [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
915 std::move(minitial), head.template get<IsTry::value, Arg&&>());
918 for (++first; first != last; ++first) {
919 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
920 return (*sfunc)(std::move(std::get<0>(t).value()),
921 // Either return a ItT&& or a Try<ItT>&& depending
922 // on the type of the argument of func.
923 std::get<1>(t).template get<IsTry::value, Arg&&>());
930 // window (collection)
932 template <class Collection, class F, class ItT, class Result>
933 std::vector<Future<Result>>
934 window(Collection input, F func, size_t n) {
935 struct WindowContext {
936 WindowContext(Collection&& i, F&& fn)
937 : input_(std::move(i)), promises_(input_.size()),
940 std::atomic<size_t> i_ {0};
942 std::vector<Promise<Result>> promises_;
945 static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
946 size_t i = ctx->i_++;
947 if (i < ctx->input_.size()) {
948 // Using setCallback_ directly since we don't need the Future
949 ctx->func_(std::move(ctx->input_[i])).setCallback_(
950 // ctx is captured by value
951 [ctx, i](Try<Result>&& t) {
952 ctx->promises_[i].setTry(std::move(t));
953 // Chain another future onto this one
954 spawn(std::move(ctx));
960 auto max = std::min(n, input.size());
962 auto ctx = std::make_shared<WindowContext>(
963 std::move(input), std::move(func));
965 for (size_t i = 0; i < max; ++i) {
966 // Start the first n Futures
967 WindowContext::spawn(ctx);
970 std::vector<Future<Result>> futures;
971 futures.reserve(ctx->promises_.size());
972 for (auto& promise : ctx->promises_) {
973 futures.emplace_back(promise.getFuture());
982 template <class I, class F>
983 Future<I> Future<T>::reduce(I&& initial, F&& func) {
985 minitial = std::forward<I>(initial),
986 mfunc = std::forward<F>(func)
988 auto ret = std::move(minitial);
989 for (auto& val : vals) {
990 ret = mfunc(std::move(ret), std::move(val));
996 // unorderedReduce (iterator)
998 template <class It, class T, class F, class ItT, class Arg>
999 Future<T> unorderedReduce(It first, It last, T initial, F func) {
1000 if (first == last) {
1001 return makeFuture(std::move(initial));
1004 typedef isTry<Arg> IsTry;
1006 struct UnorderedReduceContext {
1007 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
1008 : lock_(), memo_(makeFuture<T>(std::move(memo))),
1009 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
1011 folly::MicroSpinLock lock_; // protects memo_ and numThens_
1014 size_t numThens_; // how many Futures completed and called .then()
1015 size_t numFutures_; // how many Futures in total
1016 Promise<T> promise_;
1019 auto ctx = std::make_shared<UnorderedReduceContext>(
1020 std::move(initial), std::move(func), std::distance(first, last));
1022 mapSetCallback<ItT>(
1025 [ctx](size_t /* i */, Try<ItT>&& t) {
1026 // Futures can be completed in any order, simultaneously.
1027 // To make this non-blocking, we create a new Future chain in
1028 // the order of completion to reduce the values.
1029 // The spinlock just protects chaining a new Future, not actually
1030 // executing the reduce, which should be really fast.
1031 folly::MSLGuard lock(ctx->lock_);
1033 ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
1034 // Either return a ItT&& or a Try<ItT>&& depending
1035 // on the type of the argument of func.
1036 return ctx->func_(std::move(v),
1037 mt.template get<IsTry::value, Arg&&>());
1039 if (++ctx->numThens_ == ctx->numFutures_) {
1040 // After reducing the value of the last Future, fulfill the Promise
1041 ctx->memo_.setCallback_(
1042 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
1046 return ctx->promise_.getFuture();
1052 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
1053 return within(dur, TimedOut(), tk);
1058 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
1061 Context(E ex) : exception(std::move(ex)), promise() {}
1063 Future<Unit> thisFuture;
1065 std::atomic<bool> token {false};
1068 std::shared_ptr<Timekeeper> tks;
1070 tks = folly::detail::getTimekeeperSingleton();
1071 tk = DCHECK_NOTNULL(tks.get());
1074 auto ctx = std::make_shared<Context>(std::move(e));
1076 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
1077 // TODO: "this" completed first, cancel "after"
1078 if (ctx->token.exchange(true) == false) {
1079 ctx->promise.setTry(std::move(t));
1083 tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
1084 // "after" completed first, cancel "this"
1085 ctx->thisFuture.raise(TimedOut());
1086 if (ctx->token.exchange(true) == false) {
1087 if (t.hasException()) {
1088 ctx->promise.setException(std::move(t.exception()));
1090 ctx->promise.setException(std::move(ctx->exception));
1095 return ctx->promise.getFuture().via(getExecutor());
1101 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
1102 return collectAll(*this, futures::sleep(dur, tk))
1103 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
1104 Try<T>& t = std::get<0>(tup);
1105 return makeFuture<T>(std::move(t));
1113 void waitImpl(Future<T>& f) {
1114 // short-circuit if there's nothing to do
1115 if (f.isReady()) return;
1117 FutureBatonType baton;
1118 f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1120 assert(f.isReady());
1124 void waitImpl(Future<T>& f, Duration dur) {
1125 // short-circuit if there's nothing to do
1131 auto ret = promise.getFuture();
1132 auto baton = std::make_shared<FutureBatonType>();
1133 f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1134 promise.setTry(std::move(t));
1138 if (baton->timed_wait(dur)) {
1139 assert(f.isReady());
1144 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
// Attach a continuation that runs on the via executor, so that once the
// preceding future completes there is always a callback queued on the
// executor for drive() to run.
1150 f = f.via(e).then([](T&& t) { return std::move(t); });
1151 while (!f.isReady()) {
1154 assert(f.isReady());
1157 } // namespace detail
1158 } // namespace futures
1161 Future<T>& Future<T>::wait() & {
1162 futures::detail::waitImpl(*this);
1167 Future<T>&& Future<T>::wait() && {
1168 futures::detail::waitImpl(*this);
1169 return std::move(*this);
1173 Future<T>& Future<T>::wait(Duration dur) & {
1174 futures::detail::waitImpl(*this, dur);
1179 Future<T>&& Future<T>::wait(Duration dur) && {
1180 futures::detail::waitImpl(*this, dur);
1181 return std::move(*this);
1185 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1186 futures::detail::waitViaImpl(*this, e);
1191 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1192 futures::detail::waitViaImpl(*this, e);
1193 return std::move(*this);
1197 T Future<T>::get() {
1198 return std::move(wait().value());
1202 T Future<T>::get(Duration dur) {
1205 return std::move(value());
1212 T Future<T>::getVia(DrivableExecutor* e) {
1213 return std::move(waitVia(e).value());
1220 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1221 return t1.value() == t2.value();
1224 } // namespace detail
1225 } // namespace futures
1228 Future<bool> Future<T>::willEqual(Future<T>& f) {
1229 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1230 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1231 return futures::detail::TryEquals<T>::equals(
1232 std::get<0>(t), std::get<1>(t));
1241 Future<T> Future<T>::filter(F&& predicate) {
1242 return this->then([p = std::forward<F>(predicate)](T val) {
1243 T const& valConstRef = val;
1244 if (!p(valConstRef)) {
1245 throwPredicateDoesNotObtain();
1252 inline Future<Unit> when(bool p, F&& thunk) {
1253 return p ? std::forward<F>(thunk)().unit() : makeFuture();
1256 template <class P, class F>
1257 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1259 auto future = thunk();
1260 return future.then([
1261 predicate = std::forward<P>(predicate),
1262 thunk = std::forward<F>(thunk)
1264 return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1267 return makeFuture();
1271 Future<Unit> times(const int n, F&& thunk) {
1272 return folly::whileDo(
1273 [ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
1274 return count->fetch_add(1) < n;
1276 std::forward<F>(thunk));
1280 template <class It, class F, class ItT, class Result>
1281 std::vector<Future<Result>> map(It first, It last, F func) {
1282 std::vector<Future<Result>> results;
1283 for (auto it = first; it != last; it++) {
1284 results.push_back(it->then(func));
1294 struct retrying_policy_raw_tag {};
1295 struct retrying_policy_fut_tag {};
1297 template <class Policy>
1298 struct retrying_policy_traits {
1299 using result = std::result_of_t<Policy(size_t, const exception_wrapper&)>;
1300 using is_raw = std::is_same<result, bool>;
1301 using is_fut = std::is_same<result, Future<bool>>;
1302 using tag = typename std::conditional<
1303 is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1304 is_fut::value, retrying_policy_fut_tag, void>::type>::type;
1307 template <class Policy, class FF, class Prom>
1308 void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) {
1309 using F = typename std::result_of<FF(size_t)>::type;
1310 using T = typename F::value_type;
1311 auto f = makeFutureWith([&] { return ff(k++); });
1314 prom = std::move(prom),
1315 pm = std::forward<Policy>(p),
1316 ffm = std::forward<FF>(ff)
1317 ](Try<T> && t) mutable {
1319 prom.setValue(std::move(t).value());
1322 auto& x = t.exception();
1326 prom = std::move(prom),
1329 ffm = std::move(ffm)
1330 ](bool shouldRetry) mutable {
1332 retryingImpl(k, std::move(pm), std::move(ffm), std::move(prom));
1334 prom.setException(std::move(xm));
1340 template <class Policy, class FF>
1341 typename std::result_of<FF(size_t)>::type
1342 retrying(size_t k, Policy&& p, FF&& ff) {
1343 using F = typename std::result_of<FF(size_t)>::type;
1344 using T = typename F::value_type;
1345 auto prom = Promise<T>();
1346 auto f = prom.getFuture();
1348 k, std::forward<Policy>(p), std::forward<FF>(ff), std::move(prom));
1352 template <class Policy, class FF>
1353 typename std::result_of<FF(size_t)>::type
1354 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1355 auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
1356 return makeFuture<bool>(pm(k, x));
1358 return retrying(0, std::move(q), std::forward<FF>(ff));
1361 template <class Policy, class FF>
1362 typename std::result_of<FF(size_t)>::type
1363 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1364 return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1367 // jittered exponential backoff, clamped to [backoff_min, backoff_max]
1368 template <class URNG>
1369 Duration retryingJitteredExponentialBackoffDur(
1371 Duration backoff_min,
1372 Duration backoff_max,
1373 double jitter_param,
1376 auto dist = std::normal_distribution<double>(0.0, jitter_param);
1377 auto jitter = std::exp(dist(rng));
1378 auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1379 return std::max(backoff_min, std::min(backoff_max, backoff));
1382 template <class Policy, class URNG>
1383 std::function<Future<bool>(size_t, const exception_wrapper&)>
1384 retryingPolicyCappedJitteredExponentialBackoff(
1386 Duration backoff_min,
1387 Duration backoff_max,
1388 double jitter_param,
1392 pm = std::forward<Policy>(p),
1397 rngp = std::forward<URNG>(rng)
1398 ](size_t n, const exception_wrapper& ex) mutable {
1399 if (n == max_tries) {
1400 return makeFuture(false);
1402 return pm(n, ex).then(
1403 [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
1406 return makeFuture(false);
1408 auto backoff = detail::retryingJitteredExponentialBackoffDur(
1409 n, backoff_min, backoff_max, jitter_param, rngp);
1410 return futures::sleep(backoff).then([] { return true; });
1415 template <class Policy, class URNG>
1416 std::function<Future<bool>(size_t, const exception_wrapper&)>
1417 retryingPolicyCappedJitteredExponentialBackoff(
1419 Duration backoff_min,
1420 Duration backoff_max,
1421 double jitter_param,
1424 retrying_policy_raw_tag) {
1425 auto q = [pm = std::forward<Policy>(p)](
1426 size_t n, const exception_wrapper& e) {
1427 return makeFuture(pm(n, e));
1429 return retryingPolicyCappedJitteredExponentialBackoff(
1434 std::forward<URNG>(rng),
1438 template <class Policy, class URNG>
1439 std::function<Future<bool>(size_t, const exception_wrapper&)>
1440 retryingPolicyCappedJitteredExponentialBackoff(
1442 Duration backoff_min,
1443 Duration backoff_max,
1444 double jitter_param,
1447 retrying_policy_fut_tag) {
1448 return retryingPolicyCappedJitteredExponentialBackoff(
1453 std::forward<URNG>(rng),
1454 std::forward<Policy>(p));
1458 template <class Policy, class FF>
1459 typename std::result_of<FF(size_t)>::type
1460 retrying(Policy&& p, FF&& ff) {
1461 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1462 return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1466 std::function<bool(size_t, const exception_wrapper&)>
1467 retryingPolicyBasic(
1469 return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1472 template <class Policy, class URNG>
1473 std::function<Future<bool>(size_t, const exception_wrapper&)>
1474 retryingPolicyCappedJitteredExponentialBackoff(
1476 Duration backoff_min,
1477 Duration backoff_max,
1478 double jitter_param,
1481 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1482 return detail::retryingPolicyCappedJitteredExponentialBackoff(
1487 std::forward<URNG>(rng),
1488 std::forward<Policy>(p),
1493 std::function<Future<bool>(size_t, const exception_wrapper&)>
1494 retryingPolicyCappedJitteredExponentialBackoff(
1496 Duration backoff_min,
1497 Duration backoff_max,
1498 double jitter_param) {
1499 auto p = [](size_t, const exception_wrapper&) { return true; };
1500 return retryingPolicyCappedJitteredExponentialBackoff(
// Declare explicit instantiations of the most common Future types (the
// instantiations themselves are provided elsewhere) to save compile time
1512 extern template class Future<Unit>;
1513 extern template class Future<bool>;
1514 extern template class Future<int>;
1515 extern template class Future<int64_t>;
1516 extern template class Future<std::string>;
1517 extern template class Future<double>;
1519 } // namespace folly