2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
25 #include <folly/Baton.h>
26 #include <folly/Optional.h>
27 #include <folly/Random.h>
28 #include <folly/futures/Timekeeper.h>
29 #include <folly/futures/detail/Core.h>
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/fibers/Baton.h>
43 #if FOLLY_FUTURE_USING_FIBER
44 typedef folly::fibers::Baton FutureBatonType;
46 typedef folly::Baton<> FutureBatonType;
51 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
53 // Guarantees that the stored functor is destructed before the stored promise
54 // may be fulfilled. Assumes the stored functor to be noexcept-destructible.
55 template <typename T, typename F>
56 class CoreCallbackState {
58 template <typename FF>
59 CoreCallbackState(Promise<T>&& promise, FF&& func) noexcept(
60 noexcept(F(std::declval<FF>())))
61 : func_(std::forward<FF>(func)), promise_(std::move(promise)) {
62 assert(before_barrier());
65 CoreCallbackState(CoreCallbackState&& that) noexcept(
66 noexcept(F(std::declval<F>()))) {
67 if (that.before_barrier()) {
68 new (&func_) F(std::move(that.func_));
69 promise_ = that.stealPromise();
73 CoreCallbackState& operator=(CoreCallbackState&&) = delete;
75 ~CoreCallbackState() {
76 if (before_barrier()) {
81 template <typename... Args>
82 auto invoke(Args&&... args) noexcept(
83 noexcept(std::declval<F&&>()(std::declval<Args&&>()...))) {
84 assert(before_barrier());
85 return std::move(func_)(std::forward<Args>(args)...);
88 template <typename... Args>
89 auto tryInvoke(Args&&... args) noexcept {
90 return makeTryWith([&] { return invoke(std::forward<Args>(args)...); });
93 void setTry(Try<T>&& t) {
94 stealPromise().setTry(std::move(t));
97 void setException(exception_wrapper&& ew) {
98 stealPromise().setException(std::move(ew));
101 Promise<T> stealPromise() noexcept {
102 assert(before_barrier());
104 return std::move(promise_);
108 bool before_barrier() const noexcept {
109 return !promise_.isFulfilled();
115 Promise<T> promise_{Promise<T>::makeEmpty()};
118 template <typename T, typename F>
119 inline auto makeCoreCallbackState(Promise<T>&& p, F&& f) noexcept(
120 noexcept(CoreCallbackState<T, _t<std::decay<F>>>(
121 std::declval<Promise<T>&&>(),
122 std::declval<F&&>()))) {
123 return CoreCallbackState<T, _t<std::decay<F>>>(
124 std::move(p), std::forward<F>(f));
129 Future<T> Future<T>::makeEmpty() {
130 return Future<T>(detail::EmptyConstruct{});
134 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
135 other.core_ = nullptr;
139 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
140 std::swap(core_, other.core_);
147 typename std::enable_if<
148 !std::is_same<T, typename std::decay<T2>::type>::value &&
149 std::is_constructible<T, T2&&>::value &&
150 std::is_convertible<T2&&, T>::value,
152 Future<T>::Future(Future<T2>&& other)
153 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
158 typename std::enable_if<
159 !std::is_same<T, typename std::decay<T2>::type>::value &&
160 std::is_constructible<T, T2&&>::value &&
161 !std::is_convertible<T2&&, T>::value,
163 Future<T>::Future(Future<T2>&& other)
164 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
169 typename std::enable_if<
170 !std::is_same<T, typename std::decay<T2>::type>::value &&
171 std::is_constructible<T, T2&&>::value,
173 Future<T>& Future<T>::operator=(Future<T2>&& other) {
175 std::move(other).then([](T2&& v) { return T(std::move(v)); }));
179 template <class T2, typename>
180 Future<T>::Future(T2&& val)
181 : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
184 template <typename T2>
185 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
186 : core_(new detail::Core<T>(Try<T>(T()))) {}
191 typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
193 Future<T>::Future(in_place_t, Args&&... args)
194 : core_(new detail::Core<T>(in_place, std::forward<Args>(args)...)) {}
197 Future<T>::~Future() {
202 void Future<T>::detach() {
204 core_->detachFuture();
210 void Future<T>::throwIfInvalid() const {
217 void Future<T>::setCallback_(F&& func) {
219 core_->setCallback(std::forward<F>(func));
226 typename std::enable_if<isFuture<F>::value,
227 Future<typename isFuture<T>::Inner>>::type
228 Future<T>::unwrap() {
229 return then([](Future<typename isFuture<T>::Inner> internal_future) {
230 return internal_future;
236 // Variant: returns a value
237 // e.g. f.then([](Try<T>&& t){ return t.value(); });
239 template <typename F, typename R, bool isTry, typename... Args>
240 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
241 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
242 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
243 typedef typename R::ReturnsFuture::Inner B;
248 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
250 // grab the Future now before we lose our handle on the Promise
251 auto f = p.getFuture();
252 f.core_->setExecutorNoLock(getExecutor());
254 /* This is a bit tricky.
256 We can't just close over *this in case this Future gets moved. So we
257 make a new dummy Future. We could figure out something more
258 sophisticated that avoids making a new Future object when it can, as an
259 optimization. But this is correct.
261 core_ can't be moved, it is explicitly disallowed (as is copying). But
262 if there's ever a reason to allow it, this is one place that makes that
263 assumption and would need to be fixed. We use a standard shared pointer
264 for core_ (by copying it in), which means in essence obj holds a shared
265 pointer to itself. But this shouldn't leak because Promise will not
266 outlive the continuation, because Promise will setException() with a
267 broken Promise if it is destructed before completed. We could use a
268 weak pointer but it would have to be converted to a shared pointer when
269 func is executed (because the Future returned by func may possibly
270 persist beyond the callback, if it gets moved), and so it is an
271 optimization to just make it shared from the get-go.
273 Two subtle but important points about this design. detail::Core has no
274 back pointers to Future or Promise, so if Future or Promise get moved
275 (and they will be moved in performant code) we don't have to do
276 anything fancy. And because we store the continuation in the
277 detail::Core, not in the Future, we can execute the continuation even
278 after the Future has gone out of scope. This is an intentional design
279 decision. It is likely we will want to be able to cancel a continuation
280 in some circumstances, but I think it should be explicit not implicit
281 in the destruction of the Future used to create it.
284 [state = detail::makeCoreCallbackState(
285 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
286 if (!isTry && t.hasException()) {
287 state.setException(std::move(t.exception()));
289 state.setTry(makeTryWith(
290 [&] { return state.invoke(t.template get<isTry, Args>()...); }));
297 // Variant: returns a Future
298 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
300 template <typename F, typename R, bool isTry, typename... Args>
301 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
302 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
303 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
304 typedef typename R::ReturnsFuture::Inner B;
309 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
311 // grab the Future now before we lose our handle on the Promise
312 auto f = p.getFuture();
313 f.core_->setExecutorNoLock(getExecutor());
316 [state = detail::makeCoreCallbackState(
317 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
318 if (!isTry && t.hasException()) {
319 state.setException(std::move(t.exception()));
321 auto tf2 = state.tryInvoke(t.template get<isTry, Args>()...);
322 if (tf2.hasException()) {
323 state.setException(std::move(tf2.exception()));
325 tf2->setCallback_([p = state.stealPromise()](Try<B> && b) mutable {
326 p.setTry(std::move(b));
335 template <typename T>
336 template <typename R, typename Caller, typename... Args>
337 Future<typename isFuture<R>::Inner>
338 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
339 typedef typename std::remove_cv<
340 typename std::remove_reference<
341 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
342 return then([instance, func](Try<T>&& t){
343 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
348 Future<Unit> Future<T>::then() {
349 return then([] () {});
352 // onError where the callback returns T
355 typename std::enable_if<
356 !detail::callableWith<F, exception_wrapper>::value &&
357 !detail::Extract<F>::ReturnsFuture::value,
359 Future<T>::onError(F&& func) {
360 typedef std::remove_reference_t<typename detail::Extract<F>::FirstArg> Exn;
362 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
363 "Return type of onError callback must be T or Future<T>");
366 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
367 auto f = p.getFuture();
370 [state = detail::makeCoreCallbackState(
371 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
372 if (auto e = t.template tryGetExceptionObject<Exn>()) {
373 state.setTry(makeTryWith([&] { return state.invoke(*e); }));
375 state.setTry(std::move(t));
382 // onError where the callback returns Future<T>
385 typename std::enable_if<
386 !detail::callableWith<F, exception_wrapper>::value &&
387 detail::Extract<F>::ReturnsFuture::value,
389 Future<T>::onError(F&& func) {
391 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
392 "Return type of onError callback must be T or Future<T>");
393 typedef std::remove_reference_t<typename detail::Extract<F>::FirstArg> Exn;
396 auto f = p.getFuture();
399 [state = detail::makeCoreCallbackState(
400 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
401 if (auto e = t.template tryGetExceptionObject<Exn>()) {
402 auto tf2 = state.tryInvoke(*e);
403 if (tf2.hasException()) {
404 state.setException(std::move(tf2.exception()));
406 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
407 p.setTry(std::move(t3));
411 state.setTry(std::move(t));
420 Future<T> Future<T>::ensure(F&& func) {
421 return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
423 return makeFuture(std::move(t));
429 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
430 return within(dur, tk).onError([funcw = std::forward<F>(func)](
431 TimedOut const&) { return std::move(funcw)(); });
436 typename std::enable_if<detail::callableWith<F, exception_wrapper>::value &&
437 detail::Extract<F>::ReturnsFuture::value,
439 Future<T>::onError(F&& func) {
441 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
442 "Return type of onError callback must be T or Future<T>");
445 auto f = p.getFuture();
447 [state = detail::makeCoreCallbackState(
448 std::move(p), std::forward<F>(func))](Try<T> t) mutable {
449 if (t.hasException()) {
450 auto tf2 = state.tryInvoke(std::move(t.exception()));
451 if (tf2.hasException()) {
452 state.setException(std::move(tf2.exception()));
454 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
455 p.setTry(std::move(t3));
459 state.setTry(std::move(t));
466 // onError(exception_wrapper) that returns T
469 typename std::enable_if<
470 detail::callableWith<F, exception_wrapper>::value &&
471 !detail::Extract<F>::ReturnsFuture::value,
473 Future<T>::onError(F&& func) {
475 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
476 "Return type of onError callback must be T or Future<T>");
479 auto f = p.getFuture();
481 [state = detail::makeCoreCallbackState(
482 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
483 if (t.hasException()) {
484 state.setTry(makeTryWith(
485 [&] { return state.invoke(std::move(t.exception())); }));
487 state.setTry(std::move(t));
495 typename std::add_lvalue_reference<T>::type Future<T>::value() {
498 return core_->getTry().value();
502 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
505 return core_->getTry().value();
509 Try<T>& Future<T>::getTry() {
512 return core_->getTry();
516 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
517 return waitVia(e).getTry();
521 Optional<Try<T>> Future<T>::poll() {
523 if (core_->ready()) {
524 o = std::move(core_->getTry());
530 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
533 setExecutor(executor, priority);
535 return std::move(*this);
539 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
543 auto f = p.getFuture();
544 then([p = std::move(p)](Try<T> && t) mutable { p.setTry(std::move(t)); });
545 return std::move(f).via(executor, priority);
548 template <class Func>
549 auto via(Executor* x, Func&& func)
550 -> Future<typename isFuture<decltype(std::declval<Func>()())>::Inner> {
551 // TODO make this actually more performant. :-P #7260175
552 return via(x).then(std::forward<Func>(func));
556 bool Future<T>::isReady() const {
558 return core_->ready();
562 bool Future<T>::hasValue() {
563 return getTry().hasValue();
567 bool Future<T>::hasException() {
568 return getTry().hasException();
572 void Future<T>::raise(exception_wrapper exception) {
573 core_->raise(std::move(exception));
577 Future<T>::Future(detail::EmptyConstruct) noexcept
583 Future<typename std::decay<T>::type> makeFuture(T&& t) {
584 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
587 inline // for multiple translation units
588 Future<Unit> makeFuture() {
589 return makeFuture(Unit{});
592 // makeFutureWith(Future<T>()) -> Future<T>
594 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
595 typename std::result_of<F()>::type>::type
596 makeFutureWith(F&& func) {
598 typename isFuture<typename std::result_of<F()>::type>::Inner;
600 return std::forward<F>(func)();
601 } catch (std::exception& e) {
602 return makeFuture<InnerType>(
603 exception_wrapper(std::current_exception(), e));
605 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
609 // makeFutureWith(T()) -> Future<T>
610 // makeFutureWith(void()) -> Future<Unit>
612 typename std::enable_if<
613 !(isFuture<typename std::result_of<F()>::type>::value),
614 Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
615 makeFutureWith(F&& func) {
617 typename Unit::Lift<typename std::result_of<F()>::type>::type;
618 return makeFuture<LiftedResult>(
619 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
623 Future<T> makeFuture(std::exception_ptr const& e) {
624 return makeFuture(Try<T>(e));
628 Future<T> makeFuture(exception_wrapper ew) {
629 return makeFuture(Try<T>(std::move(ew)));
632 template <class T, class E>
633 typename std::enable_if<std::is_base_of<std::exception, E>::value,
635 makeFuture(E const& e) {
636 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
640 Future<T> makeFuture(Try<T>&& t) {
641 return Future<T>(new detail::Core<T>(std::move(t)));
645 Future<Unit> via(Executor* executor, int8_t priority) {
646 return makeFuture().via(executor, priority);
649 // mapSetCallback calls func(i, Try<T>) when every future completes
651 template <class T, class InputIterator, class F>
652 void mapSetCallback(InputIterator first, InputIterator last, F func) {
653 for (size_t i = 0; first != last; ++first, ++i) {
654 first->setCallback_([func, i](Try<T>&& t) {
655 func(i, std::move(t));
660 // collectAll (variadic)
662 template <typename... Fs>
663 typename detail::CollectAllVariadicContext<
664 typename std::decay<Fs>::type::value_type...>::type
665 collectAll(Fs&&... fs) {
666 auto ctx = std::make_shared<detail::CollectAllVariadicContext<
667 typename std::decay<Fs>::type::value_type...>>();
668 detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
669 ctx, std::forward<Fs>(fs)...);
670 return ctx->p.getFuture();
673 // collectAll (iterator)
675 template <class InputIterator>
678 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
679 collectAll(InputIterator first, InputIterator last) {
681 typename std::iterator_traits<InputIterator>::value_type::value_type T;
683 struct CollectAllContext {
684 CollectAllContext(size_t n) : results(n) {}
685 ~CollectAllContext() {
686 p.setValue(std::move(results));
688 Promise<std::vector<Try<T>>> p;
689 std::vector<Try<T>> results;
693 std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
694 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
695 ctx->results[i] = std::move(t);
697 return ctx->p.getFuture();
700 // collect (iterator)
704 template <typename T>
705 struct CollectContext {
707 explicit Nothing(int /* n */) {}
710 using Result = typename std::conditional<
711 std::is_void<T>::value,
713 std::vector<T>>::type;
715 using InternalResult = typename std::conditional<
716 std::is_void<T>::value,
718 std::vector<Optional<T>>>::type;
720 explicit CollectContext(size_t n) : result(n) {}
722 if (!threw.exchange(true)) {
723 // map Optional<T> -> T
724 std::vector<T> finalResult;
725 finalResult.reserve(result.size());
726 std::transform(result.begin(), result.end(),
727 std::back_inserter(finalResult),
728 [](Optional<T>& o) { return std::move(o.value()); });
729 p.setValue(std::move(finalResult));
732 inline void setPartialResult(size_t i, Try<T>& t) {
733 result[i] = std::move(t.value());
736 InternalResult result;
737 std::atomic<bool> threw {false};
742 template <class InputIterator>
743 Future<typename detail::CollectContext<
744 typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
745 collect(InputIterator first, InputIterator last) {
747 typename std::iterator_traits<InputIterator>::value_type::value_type T;
749 auto ctx = std::make_shared<detail::CollectContext<T>>(
750 std::distance(first, last));
751 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
752 if (t.hasException()) {
753 if (!ctx->threw.exchange(true)) {
754 ctx->p.setException(std::move(t.exception()));
756 } else if (!ctx->threw) {
757 ctx->setPartialResult(i, t);
760 return ctx->p.getFuture();
763 // collect (variadic)
765 template <typename... Fs>
766 typename detail::CollectVariadicContext<
767 typename std::decay<Fs>::type::value_type...>::type
768 collect(Fs&&... fs) {
769 auto ctx = std::make_shared<detail::CollectVariadicContext<
770 typename std::decay<Fs>::type::value_type...>>();
771 detail::collectVariadicHelper<detail::CollectVariadicContext>(
772 ctx, std::forward<Fs>(fs)...);
773 return ctx->p.getFuture();
776 // collectAny (iterator)
778 template <class InputIterator>
783 std::iterator_traits<InputIterator>::value_type::value_type>>>
784 collectAny(InputIterator first, InputIterator last) {
786 typename std::iterator_traits<InputIterator>::value_type::value_type T;
788 struct CollectAnyContext {
789 CollectAnyContext() {}
790 Promise<std::pair<size_t, Try<T>>> p;
791 std::atomic<bool> done {false};
794 auto ctx = std::make_shared<CollectAnyContext>();
795 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
796 if (!ctx->done.exchange(true)) {
797 ctx->p.setValue(std::make_pair(i, std::move(t)));
800 return ctx->p.getFuture();
803 // collectAnyWithoutException (iterator)
805 template <class InputIterator>
808 typename std::iterator_traits<InputIterator>::value_type::value_type>>
809 collectAnyWithoutException(InputIterator first, InputIterator last) {
811 typename std::iterator_traits<InputIterator>::value_type::value_type T;
813 struct CollectAnyWithoutExceptionContext {
814 CollectAnyWithoutExceptionContext(){}
815 Promise<std::pair<size_t, T>> p;
816 std::atomic<bool> done{false};
817 std::atomic<size_t> nFulfilled{0};
821 auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
822 ctx->nTotal = size_t(std::distance(first, last));
824 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
825 if (!t.hasException() && !ctx->done.exchange(true)) {
826 ctx->p.setValue(std::make_pair(i, std::move(t.value())));
827 } else if (++ctx->nFulfilled == ctx->nTotal) {
828 ctx->p.setException(t.exception());
831 return ctx->p.getFuture();
834 // collectN (iterator)
836 template <class InputIterator>
837 Future<std::vector<std::pair<size_t, Try<typename
838 std::iterator_traits<InputIterator>::value_type::value_type>>>>
839 collectN(InputIterator first, InputIterator last, size_t n) {
841 std::iterator_traits<InputIterator>::value_type::value_type T;
842 typedef std::vector<std::pair<size_t, Try<T>>> V;
844 struct CollectNContext {
846 std::atomic<size_t> completed = {0};
849 auto ctx = std::make_shared<CollectNContext>();
851 if (size_t(std::distance(first, last)) < n) {
852 ctx->p.setException(std::runtime_error("Not enough futures"));
854 // for each completed Future, increase count and add to vector, until we
855 // have n completed futures at which point we fulfil our Promise with the
857 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
858 auto c = ++ctx->completed;
860 assert(ctx->v.size() < n);
861 ctx->v.emplace_back(i, std::move(t));
863 ctx->p.setTry(Try<V>(std::move(ctx->v)));
869 return ctx->p.getFuture();
874 template <class It, class T, class F>
875 Future<T> reduce(It first, It last, T&& initial, F&& func) {
877 return makeFuture(std::move(initial));
880 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
882 typename std::conditional<detail::callableWith<F, T&&, Try<ItT>&&>::value,
885 typedef isTry<Arg> IsTry;
887 auto sfunc = std::make_shared<F>(std::move(func));
889 auto f = first->then(
890 [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
892 std::move(minitial), head.template get<IsTry::value, Arg&&>());
895 for (++first; first != last; ++first) {
896 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
897 return (*sfunc)(std::move(std::get<0>(t).value()),
898 // Either return a ItT&& or a Try<ItT>&& depending
899 // on the type of the argument of func.
900 std::get<1>(t).template get<IsTry::value, Arg&&>());
907 // window (collection)
909 template <class Collection, class F, class ItT, class Result>
910 std::vector<Future<Result>>
911 window(Collection input, F func, size_t n) {
912 struct WindowContext {
913 WindowContext(Collection&& i, F&& fn)
914 : input_(std::move(i)), promises_(input_.size()),
917 std::atomic<size_t> i_ {0};
919 std::vector<Promise<Result>> promises_;
922 static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
923 size_t i = ctx->i_++;
924 if (i < ctx->input_.size()) {
925 // Using setCallback_ directly since we don't need the Future
926 ctx->func_(std::move(ctx->input_[i])).setCallback_(
927 // ctx is captured by value
928 [ctx, i](Try<Result>&& t) {
929 ctx->promises_[i].setTry(std::move(t));
930 // Chain another future onto this one
931 spawn(std::move(ctx));
937 auto max = std::min(n, input.size());
939 auto ctx = std::make_shared<WindowContext>(
940 std::move(input), std::move(func));
942 for (size_t i = 0; i < max; ++i) {
943 // Start the first n Futures
944 WindowContext::spawn(ctx);
947 std::vector<Future<Result>> futures;
948 futures.reserve(ctx->promises_.size());
949 for (auto& promise : ctx->promises_) {
950 futures.emplace_back(promise.getFuture());
959 template <class I, class F>
960 Future<I> Future<T>::reduce(I&& initial, F&& func) {
962 minitial = std::forward<I>(initial),
963 mfunc = std::forward<F>(func)
965 auto ret = std::move(minitial);
966 for (auto& val : vals) {
967 ret = mfunc(std::move(ret), std::move(val));
973 // unorderedReduce (iterator)
975 template <class It, class T, class F, class ItT, class Arg>
976 Future<T> unorderedReduce(It first, It last, T initial, F func) {
978 return makeFuture(std::move(initial));
981 typedef isTry<Arg> IsTry;
983 struct UnorderedReduceContext {
984 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
985 : lock_(), memo_(makeFuture<T>(std::move(memo))),
986 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
988 folly::MicroSpinLock lock_; // protects memo_ and numThens_
991 size_t numThens_; // how many Futures completed and called .then()
992 size_t numFutures_; // how many Futures in total
996 auto ctx = std::make_shared<UnorderedReduceContext>(
997 std::move(initial), std::move(func), std::distance(first, last));
1002 [ctx](size_t /* i */, Try<ItT>&& t) {
1003 // Futures can be completed in any order, simultaneously.
1004 // To make this non-blocking, we create a new Future chain in
1005 // the order of completion to reduce the values.
1006 // The spinlock just protects chaining a new Future, not actually
1007 // executing the reduce, which should be really fast.
1008 folly::MSLGuard lock(ctx->lock_);
1010 ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
1011 // Either return a ItT&& or a Try<ItT>&& depending
1012 // on the type of the argument of func.
1013 return ctx->func_(std::move(v),
1014 mt.template get<IsTry::value, Arg&&>());
1016 if (++ctx->numThens_ == ctx->numFutures_) {
1017 // After reducing the value of the last Future, fulfill the Promise
1018 ctx->memo_.setCallback_(
1019 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
1023 return ctx->promise_.getFuture();
1029 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
1030 return within(dur, TimedOut(), tk);
1035 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
1038 Context(E ex) : exception(std::move(ex)), promise() {}
1040 Future<Unit> thisFuture;
1042 std::atomic<bool> token {false};
1045 std::shared_ptr<Timekeeper> tks;
1047 tks = folly::detail::getTimekeeperSingleton();
1048 tk = DCHECK_NOTNULL(tks.get());
1051 auto ctx = std::make_shared<Context>(std::move(e));
1053 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
1054 // TODO: "this" completed first, cancel "after"
1055 if (ctx->token.exchange(true) == false) {
1056 ctx->promise.setTry(std::move(t));
1060 tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
1061 // "after" completed first, cancel "this"
1062 ctx->thisFuture.raise(TimedOut());
1063 if (ctx->token.exchange(true) == false) {
1064 if (t.hasException()) {
1065 ctx->promise.setException(std::move(t.exception()));
1067 ctx->promise.setException(std::move(ctx->exception));
1072 return ctx->promise.getFuture().via(getExecutor());
1078 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
1079 return collectAll(*this, futures::sleep(dur, tk))
1080 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
1081 Try<T>& t = std::get<0>(tup);
1082 return makeFuture<T>(std::move(t));
1089 void waitImpl(Future<T>& f) {
1090 // short-circuit if there's nothing to do
1091 if (f.isReady()) return;
1093 FutureBatonType baton;
1094 f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1096 assert(f.isReady());
1100 void waitImpl(Future<T>& f, Duration dur) {
1101 // short-circuit if there's nothing to do
1107 auto ret = promise.getFuture();
1108 auto baton = std::make_shared<FutureBatonType>();
1109 f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1110 promise.setTry(std::move(t));
1114 if (baton->timed_wait(dur)) {
1115 assert(f.isReady());
1120 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1121 // Set callback so to ensure that the via executor has something on it
1122 // so that once the preceding future triggers this callback, drive will
1123 // always have a callback to satisfy it
1126 f = f.via(e).then([](T&& t) { return std::move(t); });
1127 while (!f.isReady()) {
1130 assert(f.isReady());
1136 Future<T>& Future<T>::wait() & {
1137 detail::waitImpl(*this);
1142 Future<T>&& Future<T>::wait() && {
1143 detail::waitImpl(*this);
1144 return std::move(*this);
1148 Future<T>& Future<T>::wait(Duration dur) & {
1149 detail::waitImpl(*this, dur);
1154 Future<T>&& Future<T>::wait(Duration dur) && {
1155 detail::waitImpl(*this, dur);
1156 return std::move(*this);
1160 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1161 detail::waitViaImpl(*this, e);
1166 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1167 detail::waitViaImpl(*this, e);
1168 return std::move(*this);
1172 T Future<T>::get() {
1173 return std::move(wait().value());
1177 T Future<T>::get(Duration dur) {
1180 return std::move(value());
1187 T Future<T>::getVia(DrivableExecutor* e) {
1188 return std::move(waitVia(e).value());
1194 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1195 return t1.value() == t2.value();
1201 Future<bool> Future<T>::willEqual(Future<T>& f) {
1202 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1203 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1204 return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1213 Future<T> Future<T>::filter(F&& predicate) {
1214 return this->then([p = std::forward<F>(predicate)](T val) {
1215 T const& valConstRef = val;
1216 if (!p(valConstRef)) {
1217 throw PredicateDoesNotObtain();
1224 inline Future<Unit> when(bool p, F&& thunk) {
1225 return p ? std::forward<F>(thunk)().unit() : makeFuture();
1228 template <class P, class F>
1229 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1231 auto future = thunk();
1232 return future.then([
1233 predicate = std::forward<P>(predicate),
1234 thunk = std::forward<F>(thunk)
1236 return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1239 return makeFuture();
1243 Future<Unit> times(const int n, F&& thunk) {
1244 return folly::whileDo(
1245 [ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
1246 return count->fetch_add(1) < n;
1248 std::forward<F>(thunk));
1252 template <class It, class F, class ItT, class Result>
1253 std::vector<Future<Result>> map(It first, It last, F func) {
1254 std::vector<Future<Result>> results;
1255 for (auto it = first; it != last; it++) {
1256 results.push_back(it->then(func));
1266 struct retrying_policy_raw_tag {};
1267 struct retrying_policy_fut_tag {};
1269 template <class Policy>
1270 struct retrying_policy_traits {
1271 using result = std::result_of_t<Policy(size_t, const exception_wrapper&)>;
1272 using is_raw = std::is_same<result, bool>;
1273 using is_fut = std::is_same<result, Future<bool>>;
1274 using tag = typename std::conditional<
1275 is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1276 is_fut::value, retrying_policy_fut_tag, void>::type>::type;
1279 template <class Policy, class FF, class Prom>
1280 void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) {
1281 using F = typename std::result_of<FF(size_t)>::type;
1282 using T = typename F::value_type;
1283 auto f = makeFutureWith([&] { return ff(k++); });
1286 prom = std::move(prom),
1287 pm = std::forward<Policy>(p),
1288 ffm = std::forward<FF>(ff)
1289 ](Try<T> && t) mutable {
1291 prom.setValue(std::move(t).value());
1294 auto& x = t.exception();
1298 prom = std::move(prom),
1301 ffm = std::move(ffm)
1302 ](bool shouldRetry) mutable {
1304 retryingImpl(k, std::move(pm), std::move(ffm), std::move(prom));
1306 prom.setException(std::move(xm));
1312 template <class Policy, class FF>
1313 typename std::result_of<FF(size_t)>::type
1314 retrying(size_t k, Policy&& p, FF&& ff) {
1315 using F = typename std::result_of<FF(size_t)>::type;
1316 using T = typename F::value_type;
1317 auto prom = Promise<T>();
1318 auto f = prom.getFuture();
1320 k, std::forward<Policy>(p), std::forward<FF>(ff), std::move(prom));
1324 template <class Policy, class FF>
1325 typename std::result_of<FF(size_t)>::type
1326 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1327 auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
1328 return makeFuture<bool>(pm(k, x));
1330 return retrying(0, std::move(q), std::forward<FF>(ff));
1333 template <class Policy, class FF>
1334 typename std::result_of<FF(size_t)>::type
1335 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1336 return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1339 // jittered exponential backoff, clamped to [backoff_min, backoff_max]
1340 template <class URNG>
1341 Duration retryingJitteredExponentialBackoffDur(
1343 Duration backoff_min,
1344 Duration backoff_max,
1345 double jitter_param,
1348 auto dist = std::normal_distribution<double>(0.0, jitter_param);
1349 auto jitter = std::exp(dist(rng));
1350 auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1351 return std::max(backoff_min, std::min(backoff_max, backoff));
1354 template <class Policy, class URNG>
1355 std::function<Future<bool>(size_t, const exception_wrapper&)>
1356 retryingPolicyCappedJitteredExponentialBackoff(
1358 Duration backoff_min,
1359 Duration backoff_max,
1360 double jitter_param,
1364 pm = std::forward<Policy>(p),
1369 rngp = std::forward<URNG>(rng)
1370 ](size_t n, const exception_wrapper& ex) mutable {
1371 if (n == max_tries) {
1372 return makeFuture(false);
1374 return pm(n, ex).then(
1375 [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
1378 return makeFuture(false);
1380 auto backoff = detail::retryingJitteredExponentialBackoffDur(
1381 n, backoff_min, backoff_max, jitter_param, rngp);
1382 return futures::sleep(backoff).then([] { return true; });
1387 template <class Policy, class URNG>
1388 std::function<Future<bool>(size_t, const exception_wrapper&)>
1389 retryingPolicyCappedJitteredExponentialBackoff(
1391 Duration backoff_min,
1392 Duration backoff_max,
1393 double jitter_param,
1396 retrying_policy_raw_tag) {
1397 auto q = [pm = std::forward<Policy>(p)](
1398 size_t n, const exception_wrapper& e) {
1399 return makeFuture(pm(n, e));
1401 return retryingPolicyCappedJitteredExponentialBackoff(
1406 std::forward<URNG>(rng),
1410 template <class Policy, class URNG>
1411 std::function<Future<bool>(size_t, const exception_wrapper&)>
1412 retryingPolicyCappedJitteredExponentialBackoff(
1414 Duration backoff_min,
1415 Duration backoff_max,
1416 double jitter_param,
1419 retrying_policy_fut_tag) {
1420 return retryingPolicyCappedJitteredExponentialBackoff(
1425 std::forward<URNG>(rng),
1426 std::forward<Policy>(p));
1430 template <class Policy, class FF>
1431 typename std::result_of<FF(size_t)>::type
1432 retrying(Policy&& p, FF&& ff) {
1433 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1434 return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1438 std::function<bool(size_t, const exception_wrapper&)>
1439 retryingPolicyBasic(
1441 return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1444 template <class Policy, class URNG>
1445 std::function<Future<bool>(size_t, const exception_wrapper&)>
1446 retryingPolicyCappedJitteredExponentialBackoff(
1448 Duration backoff_min,
1449 Duration backoff_max,
1450 double jitter_param,
1453 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1454 return detail::retryingPolicyCappedJitteredExponentialBackoff(
1459 std::forward<URNG>(rng),
1460 std::forward<Policy>(p),
1465 std::function<Future<bool>(size_t, const exception_wrapper&)>
1466 retryingPolicyCappedJitteredExponentialBackoff(
1468 Duration backoff_min,
1469 Duration backoff_max,
1470 double jitter_param) {
1471 auto p = [](size_t, const exception_wrapper&) { return true; };
1472 return retryingPolicyCappedJitteredExponentialBackoff(
1483 // Instantiate the most common Future types to save compile time
1484 extern template class Future<Unit>;
1485 extern template class Future<bool>;
1486 extern template class Future<int>;
1487 extern template class Future<int64_t>;
1488 extern template class Future<std::string>;
1489 extern template class Future<double>;
1491 } // namespace folly