2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/fibers/Baton.h>
// FutureBatonType is the baton the blocking wait helpers in this file use.
// It is the fiber baton when FOLLY_FUTURE_USING_FIBER is non-zero; the second
// typedef names the plain folly::Baton<> alternative.
43 #if FOLLY_FUTURE_USING_FIBER
44 typedef folly::fibers::Baton FutureBatonType;
46 typedef folly::Baton<> FutureBatonType;
51 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
// Move constructor: adopts other's core_ pointer and leaves other with a
// null core_.
55 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
56 other.core_ = nullptr;
// Move assignment implemented as a swap: the two futures exchange core_
// pointers, so other ends up holding whatever this future held before.
60 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
61 std::swap(core_, other.core_);
// Converting constructor: allocates a new Core holding a Try<T> built from
// the forwarded val.
66 template <class T2, typename>
67 Future<T>::Future(T2&& val)
68 : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
// Constructor enabled only when T2 is Unit: allocates a new Core holding a
// Try<T> that wraps a value-initialized T.
71 template <typename T2>
72 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
73 : core_(new detail::Core<T>(Try<T>(T()))) {}
76 Future<T>::~Future() {
// Tells the shared core that the Future side is letting go of it
// (Core::detachFuture).
81 void Future<T>::detach() {
83 core_->detachFuture();
89 void Future<T>::throwIfInvalid() const {
// Installs func as the continuation stored in the shared core; func is
// perfectly forwarded to Core::setCallback.
96 void Future<T>::setCallback_(F&& func) {
98 core_->setCallback(std::forward<F>(func));
// Flattens a Future whose value is itself a Future: chains a continuation
// that takes the inner Future and returns it as-is, so the result has type
// Future<Inner> where Inner is isFuture<T>::Inner.
105 typename std::enable_if<isFuture<F>::value,
106 Future<typename isFuture<T>::Inner>>::type
107 Future<T>::unwrap() {
108 return then([](Future<typename isFuture<T>::Inner> internal_future) {
109 return internal_future;
115 // Variant: returns a value
116 // e.g. f.then([](Try<T>&& t){ return t.value(); });
//
// Selected when func does not return a Future (R::ReturnsFuture is false).
// A promise p inherits this Future's interrupt handler, and the Future f
// taken from p inherits this Future's executor. The callback installed on
// this Future then fulfils the promise: when func does not take a Try
// (!isTry) and the incoming Try holds an exception, that exception is
// forwarded without calling func; otherwise the promise is set with the
// result of calling func.
118 template <typename F, typename R, bool isTry, typename... Args>
119 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
120 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
121 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
122 typedef typename R::ReturnsFuture::Inner B;
127 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
129 // grab the Future now before we lose our handle on the Promise
130 auto f = p.getFuture();
131 f.core_->setExecutorNoLock(getExecutor());
133 /* This is a bit tricky.
135 We can't just close over *this in case this Future gets moved. So we
136 make a new dummy Future. We could figure out something more
137 sophisticated that avoids making a new Future object when it can, as an
138 optimization. But this is correct.
140 core_ can't be moved, it is explicitly disallowed (as is copying). But
141 if there's ever a reason to allow it, this is one place that makes that
142 assumption and would need to be fixed. We use a standard shared pointer
143 for core_ (by copying it in), which means in essence obj holds a shared
144 pointer to itself. But this shouldn't leak because Promise will not
145 outlive the continuation, because Promise will setException() with a
146 broken Promise if it is destructed before completed. We could use a
147 weak pointer but it would have to be converted to a shared pointer when
148 func is executed (because the Future returned by func may possibly
149 persist beyond the callback, if it gets moved), and so it is an
150 optimization to just make it shared from the get-go.
152 Two subtle but important points about this design. detail::Core has no
153 back pointers to Future or Promise, so if Future or Promise get moved
154 (and they will be moved in performant code) we don't have to do
155 anything fancy. And because we store the continuation in the
156 detail::Core, not in the Future, we can execute the continuation even
157 after the Future has gone out of scope. This is an intentional design
158 decision. It is likely we will want to be able to cancel a continuation
159 in some circumstances, but I think it should be explicit not implicit
160 in the destruction of the Future used to create it.
162 setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
163 Try<T> && t) mutable {
164 if (!isTry && t.hasException()) {
165 pm.setException(std::move(t.exception()));
167 pm.setWith([&]() { return funcm(t.template get<isTry, Args>()...); });
174 // Variant: returns a Future
175 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
//
// Selected when func returns a Future (R::ReturnsFuture is true). Setup
// mirrors the value-returning variant: the promise inherits the interrupt
// handler and the returned Future inherits the executor. Inside the callback
// an incoming exception is returned as-is when func does not take a Try;
// otherwise func is called and, once it has returned f2 without throwing, the
// promise is moved into a callback on f2 that relays f2's Try. Anything func
// throws is captured as an exception_wrapper and set on the promise.
177 template <typename F, typename R, bool isTry, typename... Args>
178 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
179 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
180 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
181 typedef typename R::ReturnsFuture::Inner B;
186 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
188 // grab the Future now before we lose our handle on the Promise
189 auto f = p.getFuture();
190 f.core_->setExecutorNoLock(getExecutor());
192 setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
193 Try<T> && t) mutable {
195 if (!isTry && t.hasException()) {
196 return std::move(t.exception());
199 auto f2 = funcm(t.template get<isTry, Args>()...);
200 // that didn't throw, now we can steal p
201 f2.setCallback_([p = std::move(pm)](Try<B> && b) mutable {
202 p.setTry(std::move(b));
204 return exception_wrapper();
205 } catch (const std::exception& e) {
206 return exception_wrapper(std::current_exception(), e);
208 return exception_wrapper(std::current_exception());
213 pm.setException(std::move(ew));
// then() overload taking a pointer to member function and the object to call
// it on. The call is wrapped in a lambda over Try<T>; isTry<FirstArg> decides
// whether the member receives the Try itself or its value, where FirstArg is
// the member's first parameter type with cv-qualifiers and references
// stripped. instance is captured as a raw pointer.
220 template <typename T>
221 template <typename R, typename Caller, typename... Args>
222 Future<typename isFuture<R>::Inner>
223 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
224 typedef typename std::remove_cv<
225 typename std::remove_reference<
226 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
227 return then([instance, func](Try<T>&& t){
228 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
// then() overload whose first argument is an executor. The executor currently
// in effect is saved in oldX before the remaining arguments are forwarded to
// the regular then().
// NOTE(review): the statements that install x and consume oldX are not
// visible in this listing — confirm against the full source.
233 template <class Executor, class Arg, class... Args>
234 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
235 -> decltype(this->then(std::forward<Arg>(arg),
236 std::forward<Args>(args)...))
238 auto oldX = getExecutor();
240 return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
// Argument-less then(): chains an empty continuation, yielding a
// Future<Unit>.
245 Future<Unit> Future<T>::then() {
246 return then([] () {});
249 // onError where the callback returns T
//
// Chosen when func is not callable with an exception_wrapper and does not
// return a Future. Exn is func's first argument type. When the incoming Try
// holds an exception of type Exn, the promise is set with the result of
// func(e); when withException reports no match, the Try is forwarded
// unchanged.
252 typename std::enable_if<
253 !detail::callableWith<F, exception_wrapper>::value &&
254 !detail::Extract<F>::ReturnsFuture::value,
256 Future<T>::onError(F&& func) {
257 typedef typename detail::Extract<F>::FirstArg Exn;
259 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
260 "Return type of onError callback must be T or Future<T>");
263 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
264 auto f = p.getFuture();
266 setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
267 Try<T> && t) mutable {
268 if (!t.template withException<Exn>(
269 [&](Exn& e) { pm.setWith([&] { return funcm(e); }); })) {
270 pm.setTry(std::move(t));
277 // onError where the callback returns Future<T>
//
// Chosen when func is not callable with an exception_wrapper and returns a
// Future. Exn is func's first argument type. For a matching exception the
// promise is moved into a callback on the Future f2, which relays f2's Try;
// an exception raised along the way is captured as an exception_wrapper and
// set on the promise. When withException reports no match, the original Try
// is forwarded unchanged.
280 typename std::enable_if<
281 !detail::callableWith<F, exception_wrapper>::value &&
282 detail::Extract<F>::ReturnsFuture::value,
284 Future<T>::onError(F&& func) {
286 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
287 "Return type of onError callback must be T or Future<T>");
288 typedef typename detail::Extract<F>::FirstArg Exn;
291 auto f = p.getFuture();
293 setCallback_([ pm = std::move(p), funcm = std::forward<F>(func) ](
294 Try<T> && t) mutable {
295 if (!t.template withException<Exn>([&](Exn& e) {
299 f2.setCallback_([pm = std::move(pm)](Try<T> && t2) mutable {
300 pm.setTry(std::move(t2));
302 return exception_wrapper();
303 } catch (const std::exception& e2) {
304 return exception_wrapper(std::current_exception(), e2);
306 return exception_wrapper(std::current_exception());
310 pm.setException(std::move(ew));
313 pm.setTry(std::move(t));
// Chains a continuation that owns func (captured as funcw) and passes the
// incoming Try through unchanged as the resulting Future, so a value or an
// exception reaches the caller as it arrived.
322 Future<T> Future<T>::ensure(F&& func) {
323 return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
325 return makeFuture(std::move(t));
// Applies within(dur, tk) and, if that produces a TimedOut exception, replaces
// it with the result of calling func.
331 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
332 return within(dur, tk).onError([funcw = std::forward<F>(func)](
333 TimedOut const&) { return funcw(); });
// onError overload for callbacks that accept an exception_wrapper and return
// a Future. When the incoming Try holds an exception, func receives the
// wrapper (moved out of the Try) and the promise is moved into a callback on
// the resulting f2, which relays f2's Try. An exception raised along the way
// is captured as an exception_wrapper and set on the promise. A Try without
// an exception is forwarded unchanged.
338 typename std::enable_if<detail::callableWith<F, exception_wrapper>::value &&
339 detail::Extract<F>::ReturnsFuture::value,
341 Future<T>::onError(F&& func) {
343 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
344 "Return type of onError callback must be T or Future<T>");
347 auto f = p.getFuture();
349 [ pm = std::move(p), funcm = std::forward<F>(func) ](Try<T> t) mutable {
350 if (t.hasException()) {
353 auto f2 = funcm(std::move(t.exception()));
354 f2.setCallback_([pm = std::move(pm)](Try<T> t2) mutable {
355 pm.setTry(std::move(t2));
357 return exception_wrapper();
358 } catch (const std::exception& e2) {
359 return exception_wrapper(std::current_exception(), e2);
361 return exception_wrapper(std::current_exception());
365 pm.setException(std::move(ew));
368 pm.setTry(std::move(t));
375 // onError(exception_wrapper) that returns T
//
// Chosen when func accepts an exception_wrapper and does not return a
// Future. When the incoming Try holds an exception, the promise is set with
// the result of func(wrapper), the wrapper being moved out of the Try;
// otherwise the Try is forwarded unchanged.
378 typename std::enable_if<
379 detail::callableWith<F, exception_wrapper>::value &&
380 !detail::Extract<F>::ReturnsFuture::value,
382 Future<T>::onError(F&& func) {
384 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
385 "Return type of onError callback must be T or Future<T>");
388 auto f = p.getFuture();
390 [ pm = std::move(p), funcm = std::forward<F>(func) ](Try<T> t) mutable {
391 if (t.hasException()) {
392 pm.setWith([&] { return funcm(std::move(t.exception())); });
394 pm.setTry(std::move(t));
// Returns an lvalue reference to the value held by the core's Try, obtained
// through Try::value().
402 typename std::add_lvalue_reference<T>::type Future<T>::value() {
405 return core_->getTry().value();
// Const overload: returns a const lvalue reference to the value held by the
// core's Try.
409 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
412 return core_->getTry().value();
// Returns a reference to the Try stored in the shared core.
416 Try<T>& Future<T>::getTry() {
419 return core_->getTry();
// Waits for completion through waitVia(e), then returns a reference to the
// resulting Try.
423 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
424 return waitVia(e).getTry();
428 Optional<Try<T>> Future<T>::poll() {
430 if (core_->ready()) {
431 o = std::move(core_->getTry());
// Rvalue overload: sets the executor and priority on this Future and returns
// it by moving *this out.
437 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
440 setExecutor(executor, priority);
442 return std::move(*this);
// Lvalue overload: chains a continuation on this Future that relays its Try
// into a promise p, then applies the rvalue via() to the Future f taken from
// that promise.
446 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
450 auto f = p.getFuture();
451 then([p = std::move(p)](Try<T> && t) mutable { p.setTry(std::move(t)); });
452 return std::move(f).via(executor, priority);
// Convenience: equivalent to via(x).then(func). The result is a Future of
// whatever func() yields, flattened if func itself returns a Future.
455 template <class Func>
456 auto via(Executor* x, Func&& func)
457 -> Future<typename isFuture<decltype(func())>::Inner>
459 // TODO make this actually more performant. :-P #7260175
460 return via(x).then(std::forward<Func>(func));
// Reports whether the shared core is ready (Core::ready()).
464 bool Future<T>::isReady() const {
466 return core_->ready();
// True when the Try returned by getTry() holds a value.
470 bool Future<T>::hasValue() {
471 return getTry().hasValue();
// True when the Try returned by getTry() holds an exception.
475 bool Future<T>::hasException() {
476 return getTry().hasException();
// Forwards the given exception_wrapper to the shared core via Core::raise().
// NOTE(review): core_ is dereferenced without a visible validity check.
480 void Future<T>::raise(exception_wrapper exception) {
481 core_->raise(std::move(exception));
// Builds a Future of the decayed type of t by wrapping the forwarded value in
// a Try and delegating to the Try-taking makeFuture overload.
487 Future<typename std::decay<T>::type> makeFuture(T&& t) {
488 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
// Argument-less makeFuture(): yields a Future<Unit> built from Unit{}.
491 inline // for multiple translation units
492 Future<Unit> makeFuture() {
493 return makeFuture(Unit{});
496 // makeFutureWith(Future<T>()) -> Future<T>
498 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
499 typename std::result_of<F()>::type>::type
500 makeFutureWith(F&& func) {
502 typename isFuture<typename std::result_of<F()>::type>::Inner;
505 } catch (std::exception& e) {
506 return makeFuture<InnerType>(
507 exception_wrapper(std::current_exception(), e));
509 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
513 // makeFutureWith(T()) -> Future<T>
514 // makeFutureWith(void()) -> Future<Unit>
//
// Overload for callables that do not return a Future. The call is routed
// through makeTryWith and the resulting Try becomes the Future's result;
// LiftedResult is func's return type passed through Unit::Lift.
516 typename std::enable_if<
517 !(isFuture<typename std::result_of<F()>::type>::value),
518 Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
519 makeFutureWith(F&& func) {
521 typename Unit::Lift<typename std::result_of<F()>::type>::type;
522 return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
// Builds a Future<T> from a std::exception_ptr by wrapping it in a Try<T>.
528 Future<T> makeFuture(std::exception_ptr const& e) {
529 return makeFuture(Try<T>(e));
// Builds a Future<T> from an exception_wrapper, which is moved into the
// Try<T>.
533 Future<T> makeFuture(exception_wrapper ew) {
534 return makeFuture(Try<T>(std::move(ew)));
// Builds a Future<T> from an exception object. Enabled only for types
// derived from std::exception; e is copied into an exception_wrapper via
// make_exception_wrapper<E>.
537 template <class T, class E>
538 typename std::enable_if<std::is_base_of<std::exception, E>::value,
540 makeFuture(E const& e) {
541 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
// Base overload the other makeFuture variants funnel into: allocates a new
// Core that takes ownership of the given Try.
545 Future<T> makeFuture(Try<T>&& t) {
546 return Future<T>(new detail::Core<T>(std::move(t)));
// Produces a Future<Unit> from makeFuture() with via(executor, priority)
// applied to it.
550 Future<Unit> via(Executor* executor, int8_t priority) {
551 return makeFuture().via(executor, priority);
554 // mapSetCallback calls func(i, Try<T>) when every future completes
//
// Walks [first, last) and installs, on the i-th Future, a callback that calls
// a copy of func with that index and the Future's Try (moved). func is copied
// into each callback, so any state it shares must be held through the copy
// (callers in this file capture a shared_ptr).
556 template <class T, class InputIterator, class F>
557 void mapSetCallback(InputIterator first, InputIterator last, F func) {
558 for (size_t i = 0; first != last; ++first, ++i) {
559 first->setCallback_([func, i](Try<T>&& t) {
560 func(i, std::move(t));
565 // collectAll (variadic)
//
// Creates a shared CollectAllVariadicContext over the value types of the
// given futures, hands it together with the futures to
// detail::collectVariadicHelper, and returns the Future of the context's
// promise.
567 template <typename... Fs>
568 typename detail::CollectAllVariadicContext<
569 typename std::decay<Fs>::type::value_type...>::type
570 collectAll(Fs&&... fs) {
571 auto ctx = std::make_shared<detail::CollectAllVariadicContext<
572 typename std::decay<Fs>::type::value_type...>>();
573 detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
574 ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
575 return ctx->p.getFuture();
578 // collectAll (iterator)
580 template <class InputIterator>
583 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
584 collectAll(InputIterator first, InputIterator last) {
586 typename std::iterator_traits<InputIterator>::value_type::value_type T;
588 struct CollectAllContext {
589 CollectAllContext(int n) : results(n) {}
590 ~CollectAllContext() {
591 p.setValue(std::move(results));
593 Promise<std::vector<Try<T>>> p;
594 std::vector<Try<T>> results;
597 auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
598 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
599 ctx->results[i] = std::move(t);
601 return ctx->p.getFuture();
604 // collect (iterator)
608 template <typename T>
609 struct CollectContext {
611 explicit Nothing(int /* n */) {}
614 using Result = typename std::conditional<
615 std::is_void<T>::value,
617 std::vector<T>>::type;
619 using InternalResult = typename std::conditional<
620 std::is_void<T>::value,
622 std::vector<Optional<T>>>::type;
624 explicit CollectContext(int n) : result(n) {}
626 if (!threw.exchange(true)) {
627 // map Optional<T> -> T
628 std::vector<T> finalResult;
629 finalResult.reserve(result.size());
630 std::transform(result.begin(), result.end(),
631 std::back_inserter(finalResult),
632 [](Optional<T>& o) { return std::move(o.value()); });
633 p.setValue(std::move(finalResult));
636 inline void setPartialResult(size_t i, Try<T>& t) {
637 result[i] = std::move(t.value());
640 InternalResult result;
641 std::atomic<bool> threw {false};
// Iterator form of collect(). The first input that completes with an
// exception, as decided by the atomic exchange on ctx->threw, sets that
// exception on the promise. A successful input stores its value through
// setPartialResult() as long as threw has not been set.
646 template <class InputIterator>
647 Future<typename detail::CollectContext<
648 typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
649 collect(InputIterator first, InputIterator last) {
651 typename std::iterator_traits<InputIterator>::value_type::value_type T;
653 auto ctx = std::make_shared<detail::CollectContext<T>>(
654 std::distance(first, last));
655 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
656 if (t.hasException()) {
657 if (!ctx->threw.exchange(true)) {
658 ctx->p.setException(std::move(t.exception()));
660 } else if (!ctx->threw) {
661 ctx->setPartialResult(i, t);
664 return ctx->p.getFuture();
667 // collect (variadic)
//
// Same shape as the variadic collectAll, but the shared context is a
// CollectVariadicContext.
669 template <typename... Fs>
670 typename detail::CollectVariadicContext<
671 typename std::decay<Fs>::type::value_type...>::type
672 collect(Fs&&... fs) {
673 auto ctx = std::make_shared<detail::CollectVariadicContext<
674 typename std::decay<Fs>::type::value_type...>>();
675 detail::collectVariadicHelper<detail::CollectVariadicContext>(
676 ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
677 return ctx->p.getFuture();
680 // collectAny (iterator)
//
// The first callback to flip ctx->done from false to true fulfils the promise
// with the pair (index, Try) of its input; later completions find done
// already set and do nothing.
682 template <class InputIterator>
687 std::iterator_traits<InputIterator>::value_type::value_type>>>
688 collectAny(InputIterator first, InputIterator last) {
690 typename std::iterator_traits<InputIterator>::value_type::value_type T;
692 struct CollectAnyContext {
693 CollectAnyContext() {}
694 Promise<std::pair<size_t, Try<T>>> p;
695 std::atomic<bool> done {false};
698 auto ctx = std::make_shared<CollectAnyContext>();
699 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
700 if (!ctx->done.exchange(true)) {
701 ctx->p.setValue(std::make_pair(i, std::move(t)));
704 return ctx->p.getFuture();
707 // collectAnyWithoutException (iterator)
//
// The first input to complete without an exception, as decided by the
// exchange on ctx->done, fulfils the promise with (index, value). Every other
// completion increments nFulfilled; when that count reaches nTotal, the
// exception of the input that got it there is set on the promise.
709 template <class InputIterator>
712 typename std::iterator_traits<InputIterator>::value_type::value_type>>
713 collectAnyWithoutException(InputIterator first, InputIterator last) {
715 typename std::iterator_traits<InputIterator>::value_type::value_type T;
717 struct CollectAnyWithoutExceptionContext {
718 CollectAnyWithoutExceptionContext(){}
719 Promise<std::pair<size_t, T>> p;
720 std::atomic<bool> done{false};
721 std::atomic<size_t> nFulfilled{0};
725 auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
726 ctx->nTotal = std::distance(first, last);
728 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
729 if (!t.hasException() && !ctx->done.exchange(true)) {
730 ctx->p.setValue(std::make_pair(i, std::move(t.value())));
731 } else if (++ctx->nFulfilled == ctx->nTotal) {
732 ctx->p.setException(t.exception());
735 return ctx->p.getFuture();
738 // collectN (iterator)
//
// Fails the promise with std::runtime_error("Not enough futures") when the
// range holds fewer than n futures. Otherwise each completion bumps the
// atomic counter, (index, Try) pairs are appended to ctx->v, and the promise
// is fulfilled with that vector.
// NOTE(review): only the counter is atomic; if callbacks can run on different
// threads at the same time, the emplace_back calls on the shared vector are
// unsynchronized — confirm the threading model.
740 template <class InputIterator>
741 Future<std::vector<std::pair<size_t, Try<typename
742 std::iterator_traits<InputIterator>::value_type::value_type>>>>
743 collectN(InputIterator first, InputIterator last, size_t n) {
745 std::iterator_traits<InputIterator>::value_type::value_type T;
746 typedef std::vector<std::pair<size_t, Try<T>>> V;
748 struct CollectNContext {
750 std::atomic<size_t> completed = {0};
753 auto ctx = std::make_shared<CollectNContext>();
755 if (size_t(std::distance(first, last)) < n) {
756 ctx->p.setException(std::runtime_error("Not enough futures"));
758 // for each completed Future, increase count and add to vector, until we
759 // have n completed futures at which point we fulfil our Promise with the
761 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
762 auto c = ++ctx->completed;
764 assert(ctx->v.size() < n);
765 ctx->v.emplace_back(i, std::move(t));
767 ctx->p.setTry(Try<V>(std::move(ctx->v)));
773 return ctx->p.getFuture();
// Ordered reduction over the futures in [first, last). func is moved into a
// shared_ptr so every continuation can call the same instance. The first
// future is combined with initial; each following future is paired with the
// running result through collectAll and combined in turn. Arg is Try<ItT>
// when func is callable with (T&&, Try<ItT>&&) and ItT otherwise, so func
// receives either the Try or the plain value.
// NOTE: initial and func are moved from even though they are declared as
// forwarding references.
778 template <class It, class T, class F>
779 Future<T> reduce(It first, It last, T&& initial, F&& func) {
781 return makeFuture(std::move(initial));
784 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
786 typename std::conditional<detail::callableWith<F, T&&, Try<ItT>&&>::value,
789 typedef isTry<Arg> IsTry;
791 auto sfunc = std::make_shared<F>(std::move(func));
793 auto f = first->then(
794 [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
796 std::move(minitial), head.template get<IsTry::value, Arg&&>());
799 for (++first; first != last; ++first) {
800 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
801 return (*sfunc)(std::move(std::get<0>(t).value()),
802 // Either return a ItT&& or a Try<ItT>&& depending
803 // on the type of the argument of func.
804 std::get<1>(t).template get<IsTry::value, Arg&&>());
811 // window (collection)
//
// Applies func to the elements of input while starting at most
// min(n, input.size()) of the resulting futures up front. Each started
// future, on completion, fulfils the promise at its index and calls spawn()
// again, which claims the next unclaimed index through the atomic counter
// i_. The returned vector holds one Future per promise.
813 template <class Collection, class F, class ItT, class Result>
814 std::vector<Future<Result>>
815 window(Collection input, F func, size_t n) {
816 struct WindowContext {
817 WindowContext(Collection&& i, F&& fn)
818 : input_(std::move(i)), promises_(input_.size()),
821 std::atomic<size_t> i_ {0};
823 std::vector<Promise<Result>> promises_;
826 static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
827 size_t i = ctx->i_++;
828 if (i < ctx->input_.size()) {
829 // Using setCallback_ directly since we don't need the Future
830 ctx->func_(std::move(ctx->input_[i])).setCallback_(
831 // ctx is captured by value
832 [ctx, i](Try<Result>&& t) {
833 ctx->promises_[i].setTry(std::move(t));
834 // Chain another future onto this one
835 spawn(std::move(ctx));
841 auto max = std::min(n, input.size());
843 auto ctx = std::make_shared<WindowContext>(
844 std::move(input), std::move(func));
846 for (size_t i = 0; i < max; ++i) {
847 // Start the first n Futures
848 WindowContext::spawn(ctx);
851 std::vector<Future<Result>> futures;
852 futures.reserve(ctx->promises_.size());
853 for (auto& promise : ctx->promises_) {
854 futures.emplace_back(promise.getFuture());
// Member reduce: folds a range vals, starting from initial, by repeatedly
// assigning ret = func(std::move(ret), std::move(val)).
// NOTE(review): the statement that introduces vals is not visible in this
// listing — presumably the value this Future resolves to; confirm.
863 template <class I, class F>
864 Future<I> Future<T>::reduce(I&& initial, F&& func) {
866 minitial = std::forward<I>(initial),
867 mfunc = std::forward<F>(func)
869 auto ret = std::move(minitial);
870 for (auto& val : vals) {
871 ret = mfunc(std::move(ret), std::move(val));
877 // unorderedReduce (iterator)
//
// Reduces the futures in [first, last) in completion order. The context holds
// a running memo_ Future seeded with initial. Each completing input, under
// the spin lock, chains one more reduction step onto memo_. The input that
// brings numThens_ up to numFutures_ installs the callback that hands the
// final Try to promise_.
879 template <class It, class T, class F, class ItT, class Arg>
880 Future<T> unorderedReduce(It first, It last, T initial, F func) {
882 return makeFuture(std::move(initial));
885 typedef isTry<Arg> IsTry;
887 struct UnorderedReduceContext {
888 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
889 : lock_(), memo_(makeFuture<T>(std::move(memo))),
890 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
892 folly::MicroSpinLock lock_; // protects memo_ and numThens_
895 size_t numThens_; // how many Futures completed and called .then()
896 size_t numFutures_; // how many Futures in total
900 auto ctx = std::make_shared<UnorderedReduceContext>(
901 std::move(initial), std::move(func), std::distance(first, last));
906 [ctx](size_t /* i */, Try<ItT>&& t) {
907 // Futures can be completed in any order, simultaneously.
908 // To make this non-blocking, we create a new Future chain in
909 // the order of completion to reduce the values.
910 // The spinlock just protects chaining a new Future, not actually
911 // executing the reduce, which should be really fast.
912 folly::MSLGuard lock(ctx->lock_);
914 ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
915 // Either return a ItT&& or a Try<ItT>&& depending
916 // on the type of the argument of func.
917 return ctx->func_(std::move(v),
918 mt.template get<IsTry::value, Arg&&>());
920 if (++ctx->numThens_ == ctx->numFutures_) {
921 // After reducing the value of the last Future, fulfill the Promise
922 ctx->memo_.setCallback_(
923 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
927 return ctx->promise_.getFuture();
// Convenience overload: within() with a default-constructed TimedOut as the
// timeout exception.
933 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
934 return within(dur, TimedOut(), tk);
// Races this Future against a timer of length dur. The shared Context's
// atomic token decides the winner: whichever callback flips it first fulfils
// the promise, either with this Future's Try or with an exception (the
// timer's own exception if it failed, otherwise e). A Timekeeper is fetched
// from getTimekeeperSingleton() on the visible path that assigns tks.
// Note that the timer callback calls raise(TimedOut()) on thisFuture before
// it checks the token.
939 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
942 Context(E ex) : exception(std::move(ex)), promise() {}
944 Future<Unit> thisFuture;
946 std::atomic<bool> token {false};
949 std::shared_ptr<Timekeeper> tks;
951 tks = folly::detail::getTimekeeperSingleton();
952 tk = DCHECK_NOTNULL(tks.get());
955 auto ctx = std::make_shared<Context>(std::move(e));
957 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
958 // TODO: "this" completed first, cancel "after"
959 if (ctx->token.exchange(true) == false) {
960 ctx->promise.setTry(std::move(t));
964 tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
965 // "after" completed first, cancel "this"
966 ctx->thisFuture.raise(TimedOut());
967 if (ctx->token.exchange(true) == false) {
968 if (t.hasException()) {
969 ctx->promise.setException(std::move(t.exception()));
971 ctx->promise.setException(std::move(ctx->exception));
976 return ctx->promise.getFuture().via(getExecutor());
// Combines this Future with futures::sleep(dur, tk) through collectAll and
// returns this Future's Try once both have completed; the sleep's own result
// is discarded.
982 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
983 return collectAll(*this, futures::sleep(dur, tk))
984 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
985 Try<T>& t = std::get<0>(tup);
986 return makeFuture<T>(std::move(t));
// Wait helper with no timeout. Returns at once when f is already ready;
// otherwise installs a callback on f that posts a stack-allocated baton
// (captured by reference).
993 void waitImpl(Future<T>& f) {
994 // short-circuit if there's nothing to do
995 if (f.isReady()) return;
997 FutureBatonType baton;
998 f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1000 assert(f.isReady());
// Timed wait helper. The callback installed on f relays f's Try into a fresh
// promise whose Future is kept in ret. The baton is held through a shared_ptr
// that the callback also captures, so it stays alive even if the wait gives
// up first. timed_wait(dur) reports whether the baton was posted in time.
1004 void waitImpl(Future<T>& f, Duration dur) {
1005 // short-circuit if there's nothing to do
1011 auto ret = promise.getFuture();
1012 auto baton = std::make_shared<FutureBatonType>();
1013 f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1014 promise.setTry(std::move(t));
1018 if (baton->timed_wait(dur)) {
1019 assert(f.isReady());
// Wait helper for a DrivableExecutor: re-homes f onto e with a pass-through
// continuation, then loops until f reports ready.
1024 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1025 // Set callback so to ensure that the via executor has something on it
1026 // so that once the preceding future triggers this callback, drive will
1027 // always have a callback to satisfy it
1030 f = f.via(e).then([](T&& t) { return std::move(t); });
1031 while (!f.isReady()) {
1034 assert(f.isReady());
// Lvalue overload: waits on this Future through detail::waitImpl.
1040 Future<T>& Future<T>::wait() & {
1041 detail::waitImpl(*this);
// Rvalue overload: waits through detail::waitImpl, then hands *this back as
// an rvalue.
1046 Future<T>&& Future<T>::wait() && {
1047 detail::waitImpl(*this);
1048 return std::move(*this);
// Lvalue overload with a timeout: delegates to detail::waitImpl(*this, dur).
1052 Future<T>& Future<T>::wait(Duration dur) & {
1053 detail::waitImpl(*this, dur);
// Rvalue overload with a timeout: delegates to detail::waitImpl(*this, dur),
// then hands *this back as an rvalue.
1058 Future<T>&& Future<T>::wait(Duration dur) && {
1059 detail::waitImpl(*this, dur);
1060 return std::move(*this);
// Lvalue overload: waits by delegating to detail::waitViaImpl with executor e.
1064 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1065 detail::waitViaImpl(*this, e);
// Rvalue overload: delegates to detail::waitViaImpl with executor e, then
// hands *this back as an rvalue.
1070 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1071 detail::waitViaImpl(*this, e);
1072 return std::move(*this);
// Waits for completion and returns the value by moving it out of the Future.
1076 T Future<T>::get() {
1077 return std::move(wait().value());
1081 T Future<T>::get(Duration dur) {
1084 return std::move(value());
// Waits through waitVia(e) and returns the value by moving it out of the
// Future.
1091 T Future<T>::getVia(DrivableExecutor* e) {
1092 return std::move(waitVia(e).value());
// Compares the values held by two Try objects with operator==.
1098 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1099 return t1.value() == t2.value();
// Yields a Future<bool> that resolves once both this Future and f have
// completed. When both hold values, the result is the TryEquals comparison
// of the two.
1105 Future<bool> Future<T>::willEqual(Future<T>& f) {
1106 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1107 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1108 return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
// Chains a continuation that evaluates predicate on a const reference to the
// value and throws PredicateDoesNotObtain when the predicate returns false.
1117 Future<T> Future<T>::filter(F&& predicate) {
1118 return this->then([p = std::forward<F>(predicate)](T val) {
1119 T const& valConstRef = val;
1120 if (!p(valConstRef)) {
1121 throw PredicateDoesNotObtain();
// Base case of thenMulti: a single callback is simply chained with then().
1128 template <class Callback>
1129 auto Future<T>::thenMulti(Callback&& fn)
1130 -> decltype(this->then(std::forward<Callback>(fn))) {
1131 // thenMulti with one callback is just a then
1132 return then(std::forward<Callback>(fn));
// Recursive case of thenMulti: chains the first callback with then() and
// passes the remaining callbacks to thenMulti on the resulting Future.
1136 template <class Callback, class... Callbacks>
1137 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1138 -> decltype(this->then(std::forward<Callback>(fn)).
1139 thenMulti(std::forward<Callbacks>(fns)...)) {
1140 // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1141 return then(std::forward<Callback>(fn)).
1142 thenMulti(std::forward<Callbacks>(fns)...);
// Chains several callbacks and moves the final Future back onto the executor
// that was in effect on entry (saved in oldX).
// NOTE(review): the statement that switches to x is not visible in this
// listing — confirm against the full source.
1146 template <class Callback, class... Callbacks>
1147 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1149 -> decltype(this->then(std::forward<Callback>(fn)).
1150 thenMulti(std::forward<Callbacks>(fns)...)) {
1151 // thenMultiExecutor with two callbacks is
1152 // via(x).then(a).thenMulti(b, ...).via(oldX)
1153 auto oldX = getExecutor();
1155 return then(std::forward<Callback>(fn)).
1156 thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
// Single-callback case: delegates to the then() overload that takes an
// executor.
1160 template <class Callback>
1161 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1162 -> decltype(this->then(std::forward<Callback>(fn))) {
1163 // thenMulti with one callback is just a then with an executor
1164 return then(x, std::forward<Callback>(fn));
1168 inline Future<Unit> when(bool p, F&& thunk) {
1169 return p ? std::forward<F>(thunk)().unit() : makeFuture();
// Runs thunk and, when the Future it returns completes, calls whileDo again
// with the captured predicate and thunk. The final return statement yields a
// Future<Unit> from makeFuture().
// NOTE(review): the condition guarding the loop body is not visible in this
// listing — presumably a call to predicate(); confirm.
1172 template <class P, class F>
1173 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1175 auto future = thunk();
1176 return future.then([
1177 predicate = std::forward<P>(predicate),
1178 thunk = std::forward<F>(thunk)
1180 return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1183 return makeFuture();
1187 Future<Unit> times(const int n, F&& thunk) {
1188 return folly::whileDo(
1189 [ n, count = folly::make_unique<std::atomic<int>>(0) ]() mutable {
1190 return count->fetch_add(1) < n;
1192 std::forward<F>(thunk));
1196 template <class It, class F, class ItT, class Result>
1197 std::vector<Future<Result>> map(It first, It last, F func) {
1198 std::vector<Future<Result>> results;
1199 for (auto it = first; it != last; it++) {
1200 results.push_back(it->then(func));
// Dispatch tags selected by retrying_policy_traits: raw for policies whose
// call operator returns bool, fut for policies returning Future<bool>.
1210 struct retrying_policy_raw_tag {};
1211 struct retrying_policy_fut_tag {};
// Classifies a retry policy by the signature of its operator(). has_op<Ret>
// is true when Policy has a const or non-const call operator of the form
// Ret(size_t, const exception_wrapper&). tag resolves to the raw tag for Ret
// = bool, to the fut tag for Ret = Future<bool>, and to void when neither
// matches.
1213 template <class Policy>
1214 struct retrying_policy_traits {
1215 using ew = exception_wrapper;
1216 FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1217 template <class Ret>
1218 using has_op = typename std::integral_constant<bool,
1219 has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1220 has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1221 using is_raw = has_op<bool>;
1222 using is_fut = has_op<Future<bool>>;
1223 using tag = typename std::conditional<
1224 is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1225 is_fut::value, retrying_policy_fut_tag, void>::type>::type;
// Core retry loop. The error handler owns the policy and the factory (pm,
// ffm). On failure the policy is consulted; when it resolves to true,
// retrying() is called again with the same policy and factory, otherwise the
// original exception (xm) is returned as a failed Future.
// NOTE(review): each retry re-enters retrying() from inside a continuation —
// check the effect on stack depth for long retry chains.
1228 template <class Policy, class FF>
1229 typename std::result_of<FF(size_t)>::type
1230 retrying(size_t k, Policy&& p, FF&& ff) {
1231 using F = typename std::result_of<FF(size_t)>::type;
1232 using T = typename F::value_type;
1235 [ k, pm = std::forward<Policy>(p), ffm = std::forward<FF>(ff) ](
1236 exception_wrapper x) mutable {
1239 [ k, xm = std::move(x), pm = std::move(pm), ffm = std::move(ffm) ](
1241 return r ? retrying(k, std::move(pm), std::move(ffm))
1242 : makeFuture<T>(std::move(xm));
// Raw-policy entry point: adapts a bool-returning policy into one returning
// Future<bool> by wrapping its result in makeFuture<bool>, then starts the
// retry loop at attempt 0. The adapter lambda is not mutable, so the captured
// policy is called through a const path.
1247 template <class Policy, class FF>
1248 typename std::result_of<FF(size_t)>::type
1249 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1250 auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
1251 return makeFuture<bool>(pm(k, x));
1253 return retrying(0, std::move(q), std::forward<FF>(ff));
// Future-policy entry point: the policy already returns Future<bool>, so the
// retry loop is started at attempt 0 with it unchanged.
1256 template <class Policy, class FF>
1257 typename std::result_of<FF(size_t)>::type
1258 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1259 return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1262 // jittered exponential backoff, clamped to [backoff_min, backoff_max]
1263 template <class URNG>
1264 Duration retryingJitteredExponentialBackoffDur(
1266 Duration backoff_min,
1267 Duration backoff_max,
1268 double jitter_param,
1271 auto dist = std::normal_distribution<double>(0.0, jitter_param);
1272 auto jitter = std::exp(dist(rng));
1273 auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1274 return std::max(backoff_min, std::min(backoff_max, backoff));
// Builds the capped, jittered backoff policy around an inner policy that
// returns Future<bool>. The returned policy gives up (false) once n equals
// max_tries. Otherwise it asks the inner policy; the visible continuation
// either returns false or sleeps for a duration computed by
// retryingJitteredExponentialBackoffDur and then resolves to true.
1277 template <class Policy, class URNG>
1278 std::function<Future<bool>(size_t, const exception_wrapper&)>
1279 retryingPolicyCappedJitteredExponentialBackoff(
1281 Duration backoff_min,
1282 Duration backoff_max,
1283 double jitter_param,
1287 pm = std::forward<Policy>(p),
1292 rngp = std::forward<URNG>(rng)
1293 ](size_t n, const exception_wrapper& ex) mutable {
1294 if (n == max_tries) {
1295 return makeFuture(false);
1297 return pm(n, ex).then(
1298 [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
1301 return makeFuture(false);
1303 auto backoff = detail::retryingJitteredExponentialBackoffDur(
1304 n, backoff_min, backoff_max, jitter_param, rngp);
1305 return futures::sleep(backoff).then([] { return true; });
// Raw-tag overload: wraps a bool-returning policy so that it returns a
// Future (via makeFuture) and forwards to the primary overload.
1310 template <class Policy, class URNG>
1311 std::function<Future<bool>(size_t, const exception_wrapper&)>
1312 retryingPolicyCappedJitteredExponentialBackoff(
1314 Duration backoff_min,
1315 Duration backoff_max,
1316 double jitter_param,
1319 retrying_policy_raw_tag) {
1320 auto q = [pm = std::forward<Policy>(p)](
1321 size_t n, const exception_wrapper& e) {
1322 return makeFuture(pm(n, e));
1324 return retryingPolicyCappedJitteredExponentialBackoff(
1329 std::forward<URNG>(rng),
// Fut-tag overload: the policy already returns a Future, so it is forwarded
// unchanged to the primary overload.
1333 template <class Policy, class URNG>
1334 std::function<Future<bool>(size_t, const exception_wrapper&)>
1335 retryingPolicyCappedJitteredExponentialBackoff(
1337 Duration backoff_min,
1338 Duration backoff_max,
1339 double jitter_param,
1342 retrying_policy_fut_tag) {
1343 return retryingPolicyCappedJitteredExponentialBackoff(
1348 std::forward<URNG>(rng),
1349 std::forward<Policy>(p));
// Public entry point: picks the raw or fut implementation in detail:: by tag
// dispatch on retrying_policy_traits<Policy>::tag.
1353 template <class Policy, class FF>
1354 typename std::result_of<FF(size_t)>::type
1355 retrying(Policy&& p, FF&& ff) {
1356 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1357 return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1361 std::function<bool(size_t, const exception_wrapper&)>
1362 retryingPolicyBasic(
1364 return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
// Public entry point taking a caller-supplied policy and random generator:
// forwards to the detail:: overload selected by tag dispatch on
// retrying_policy_traits<Policy>::tag.
1367 template <class Policy, class URNG>
1368 std::function<Future<bool>(size_t, const exception_wrapper&)>
1369 retryingPolicyCappedJitteredExponentialBackoff(
1371 Duration backoff_min,
1372 Duration backoff_max,
1373 double jitter_param,
1376 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1377 return detail::retryingPolicyCappedJitteredExponentialBackoff(
1382 std::forward<URNG>(rng),
1383 std::forward<Policy>(p),
// Convenience overload with no caller-supplied policy: uses an inner policy
// that always returns true, so only the limits applied by the forwarded-to
// overload decide when retrying stops.
1388 std::function<Future<bool>(size_t, const exception_wrapper&)>
1389 retryingPolicyCappedJitteredExponentialBackoff(
1391 Duration backoff_min,
1392 Duration backoff_max,
1393 double jitter_param) {
1394 auto p = [](size_t, const exception_wrapper&) { return true; };
1395 return retryingPolicyCappedJitteredExponentialBackoff(
1406 // Instantiate the most common Future types to save compile time
1407 extern template class Future<Unit>;
1408 extern template class Future<bool>;
1409 extern template class Future<int>;
1410 extern template class Future<int64_t>;
1411 extern template class Future<std::string>;
1412 extern template class Future<double>;
1414 } // namespace folly