2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/fibers/Baton.h>
43 #if FOLLY_FUTURE_USING_FIBER
44 typedef folly::fibers::Baton FutureBatonType;
46 typedef folly::Baton<> FutureBatonType;
51 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
55 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
56 other.core_ = nullptr;
60 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
61 std::swap(core_, other.core_);
66 template <typename U, typename>
67 Future<T>::Future(Future<U>&& other) noexcept
68 : core_(detail::Core<T>::convert(other.core_)) {
69 other.core_ = nullptr;
73 template <typename U, typename>
74 Future<T>& Future<T>::operator=(Future<U>&& other) noexcept {
75 std::swap(core_, detail::Core<T>::convert(other.core_));
80 template <class T2, typename>
81 Future<T>::Future(T2&& val)
82 : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
85 template <typename T2>
86 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
87 : core_(new detail::Core<T>(Try<T>(T()))) {}
90 Future<T>::~Future() {
95 void Future<T>::detach() {
97 core_->detachFuture();
103 void Future<T>::throwIfInvalid() const {
110 void Future<T>::setCallback_(F&& func) {
112 core_->setCallback(std::forward<F>(func));
119 typename std::enable_if<isFuture<F>::value,
120 Future<typename isFuture<T>::Inner>>::type
121 Future<T>::unwrap() {
122 return then([](Future<typename isFuture<T>::Inner> internal_future) {
123 return internal_future;
129 // Variant: returns a value
130 // e.g. f.then([](Try<T>&& t){ return t.value(); });
132 template <typename F, typename R, bool isTry, typename... Args>
133 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
134 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
135 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
136 typedef typename R::ReturnsFuture::Inner B;
141 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
143 // grab the Future now before we lose our handle on the Promise
144 auto f = p.getFuture();
145 f.core_->setExecutorNoLock(getExecutor());
147 /* This is a bit tricky.
149 We can't just close over *this in case this Future gets moved. So we
150 make a new dummy Future. We could figure out something more
151 sophisticated that avoids making a new Future object when it can, as an
152 optimization. But this is correct.
154 core_ can't be moved, it is explicitly disallowed (as is copying). But
155 if there's ever a reason to allow it, this is one place that makes that
156 assumption and would need to be fixed. We use a standard shared pointer
157 for core_ (by copying it in), which means in essence obj holds a shared
158 pointer to itself. But this shouldn't leak because Promise will not
159 outlive the continuation, because Promise will setException() with a
160 broken Promise if it is destructed before completed. We could use a
161 weak pointer but it would have to be converted to a shared pointer when
162 func is executed (because the Future returned by func may possibly
163 persist beyond the callback, if it gets moved), and so it is an
164 optimization to just make it shared from the get-go.
166 Two subtle but important points about this design. detail::Core has no
167 back pointers to Future or Promise, so if Future or Promise get moved
168 (and they will be moved in performant code) we don't have to do
169 anything fancy. And because we store the continuation in the
170 detail::Core, not in the Future, we can execute the continuation even
171 after the Future has gone out of scope. This is an intentional design
172 decision. It is likely we will want to be able to cancel a continuation
173 in some circumstances, but I think it should be explicit not implicit
174 in the destruction of the Future used to create it.
176 setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
177 Try<T> && t) mutable {
178 if (!isTry && t.hasException()) {
179 pm.setException(std::move(t.exception()));
181 pm.setWith([&]() { return funcm(t.template get<isTry, Args>()...); });
188 // Variant: returns a Future
189 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
191 template <typename F, typename R, bool isTry, typename... Args>
192 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
193 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
194 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
195 typedef typename R::ReturnsFuture::Inner B;
200 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
202 // grab the Future now before we lose our handle on the Promise
203 auto f = p.getFuture();
204 f.core_->setExecutorNoLock(getExecutor());
206 setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
207 Try<T> && t) mutable {
208 if (!isTry && t.hasException()) {
209 pm.setException(std::move(t.exception()));
212 auto f2 = funcm(t.template get<isTry, Args>()...);
213 // that didn't throw, now we can steal p
214 f2.setCallback_([p = std::move(pm)](Try<B> && b) mutable {
215 p.setTry(std::move(b));
217 } catch (const std::exception& e) {
218 pm.setException(exception_wrapper(std::current_exception(), e));
220 pm.setException(exception_wrapper(std::current_exception()));
228 template <typename T>
229 template <typename R, typename Caller, typename... Args>
230 Future<typename isFuture<R>::Inner>
231 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
232 typedef typename std::remove_cv<
233 typename std::remove_reference<
234 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
235 return then([instance, func](Try<T>&& t){
236 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
241 template <class Executor, class Arg, class... Args>
242 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
243 -> decltype(this->then(std::forward<Arg>(arg),
244 std::forward<Args>(args)...))
246 auto oldX = getExecutor();
248 return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
253 Future<Unit> Future<T>::then() {
254 return then([] () {});
257 // onError where the callback returns T
260 typename std::enable_if<
261 !detail::callableWith<F, exception_wrapper>::value &&
262 !detail::Extract<F>::ReturnsFuture::value,
264 Future<T>::onError(F&& func) {
265 typedef typename detail::Extract<F>::FirstArg Exn;
267 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
268 "Return type of onError callback must be T or Future<T>");
271 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
272 auto f = p.getFuture();
274 setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
275 Try<T> && t) mutable {
276 if (!t.template withException<Exn>(
277 [&](Exn& e) { pm.setWith([&] { return funcm(e); }); })) {
278 pm.setTry(std::move(t));
285 // onError where the callback returns Future<T>
288 typename std::enable_if<
289 !detail::callableWith<F, exception_wrapper>::value &&
290 detail::Extract<F>::ReturnsFuture::value,
292 Future<T>::onError(F&& func) {
294 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
295 "Return type of onError callback must be T or Future<T>");
296 typedef typename detail::Extract<F>::FirstArg Exn;
299 auto f = p.getFuture();
301 setCallback_([ pm = std::move(p), funcm = std::forward<F>(func) ](
302 Try<T> && t) mutable {
303 if (!t.template withException<Exn>([&](Exn& e) {
306 f2.setCallback_([pm = std::move(pm)](Try<T> && t2) mutable {
307 pm.setTry(std::move(t2));
309 } catch (const std::exception& e2) {
310 pm.setException(exception_wrapper(std::current_exception(), e2));
312 pm.setException(exception_wrapper(std::current_exception()));
315 pm.setTry(std::move(t));
324 Future<T> Future<T>::ensure(F&& func) {
325 return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
327 return makeFuture(std::move(t));
333 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
334 return within(dur, tk).onError([funcw = std::forward<F>(func)](
335 TimedOut const&) { return funcw(); });
340 typename std::enable_if<detail::callableWith<F, exception_wrapper>::value &&
341 detail::Extract<F>::ReturnsFuture::value,
343 Future<T>::onError(F&& func) {
345 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
346 "Return type of onError callback must be T or Future<T>");
349 auto f = p.getFuture();
351 [ pm = std::move(p), funcm = std::forward<F>(func) ](Try<T> t) mutable {
352 if (t.hasException()) {
354 auto f2 = funcm(std::move(t.exception()));
355 f2.setCallback_([pm = std::move(pm)](Try<T> t2) mutable {
356 pm.setTry(std::move(t2));
358 } catch (const std::exception& e2) {
359 pm.setException(exception_wrapper(std::current_exception(), e2));
361 pm.setException(exception_wrapper(std::current_exception()));
364 pm.setTry(std::move(t));
371 // onError(exception_wrapper) that returns T
374 typename std::enable_if<
375 detail::callableWith<F, exception_wrapper>::value &&
376 !detail::Extract<F>::ReturnsFuture::value,
378 Future<T>::onError(F&& func) {
380 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
381 "Return type of onError callback must be T or Future<T>");
384 auto f = p.getFuture();
386 [ pm = std::move(p), funcm = std::forward<F>(func) ](Try<T> t) mutable {
387 if (t.hasException()) {
388 pm.setWith([&] { return funcm(std::move(t.exception())); });
390 pm.setTry(std::move(t));
398 typename std::add_lvalue_reference<T>::type Future<T>::value() {
401 return core_->getTry().value();
405 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
408 return core_->getTry().value();
412 Try<T>& Future<T>::getTry() {
415 return core_->getTry();
419 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
420 return waitVia(e).getTry();
424 Optional<Try<T>> Future<T>::poll() {
426 if (core_->ready()) {
427 o = std::move(core_->getTry());
433 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
436 setExecutor(executor, priority);
438 return std::move(*this);
442 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
446 auto f = p.getFuture();
447 then([p = std::move(p)](Try<T> && t) mutable { p.setTry(std::move(t)); });
448 return std::move(f).via(executor, priority);
451 template <class Func>
452 auto via(Executor* x, Func&& func)
453 -> Future<typename isFuture<decltype(func())>::Inner>
455 // TODO make this actually more performant. :-P #7260175
456 return via(x).then(std::forward<Func>(func));
460 bool Future<T>::isReady() const {
462 return core_->ready();
466 bool Future<T>::hasValue() {
467 return getTry().hasValue();
471 bool Future<T>::hasException() {
472 return getTry().hasException();
476 void Future<T>::raise(exception_wrapper exception) {
477 core_->raise(std::move(exception));
483 Future<typename std::decay<T>::type> makeFuture(T&& t) {
484 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
487 inline // for multiple translation units
488 Future<Unit> makeFuture() {
489 return makeFuture(Unit{});
492 // makeFutureWith(Future<T>()) -> Future<T>
494 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
495 typename std::result_of<F()>::type>::type
496 makeFutureWith(F&& func) {
498 typename isFuture<typename std::result_of<F()>::type>::Inner;
501 } catch (std::exception& e) {
502 return makeFuture<InnerType>(
503 exception_wrapper(std::current_exception(), e));
505 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
509 // makeFutureWith(T()) -> Future<T>
510 // makeFutureWith(void()) -> Future<Unit>
512 typename std::enable_if<
513 !(isFuture<typename std::result_of<F()>::type>::value),
514 Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
515 makeFutureWith(F&& func) {
517 typename Unit::Lift<typename std::result_of<F()>::type>::type;
518 return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
524 Future<T> makeFuture(std::exception_ptr const& e) {
525 return makeFuture(Try<T>(e));
529 Future<T> makeFuture(exception_wrapper ew) {
530 return makeFuture(Try<T>(std::move(ew)));
533 template <class T, class E>
534 typename std::enable_if<std::is_base_of<std::exception, E>::value,
536 makeFuture(E const& e) {
537 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
541 Future<T> makeFuture(Try<T>&& t) {
542 return Future<T>(new detail::Core<T>(std::move(t)));
546 Future<Unit> via(Executor* executor, int8_t priority) {
547 return makeFuture().via(executor, priority);
550 // mapSetCallback calls func(i, Try<T>) when every future completes
552 template <class T, class InputIterator, class F>
553 void mapSetCallback(InputIterator first, InputIterator last, F func) {
554 for (size_t i = 0; first != last; ++first, ++i) {
555 first->setCallback_([func, i](Try<T>&& t) {
556 func(i, std::move(t));
561 // collectAll (variadic)
563 template <typename... Fs>
564 typename detail::CollectAllVariadicContext<
565 typename std::decay<Fs>::type::value_type...>::type
566 collectAll(Fs&&... fs) {
567 auto ctx = std::make_shared<detail::CollectAllVariadicContext<
568 typename std::decay<Fs>::type::value_type...>>();
569 detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
570 ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
571 return ctx->p.getFuture();
574 // collectAll (iterator)
576 template <class InputIterator>
579 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
580 collectAll(InputIterator first, InputIterator last) {
582 typename std::iterator_traits<InputIterator>::value_type::value_type T;
584 struct CollectAllContext {
585 CollectAllContext(int n) : results(n) {}
586 ~CollectAllContext() {
587 p.setValue(std::move(results));
589 Promise<std::vector<Try<T>>> p;
590 std::vector<Try<T>> results;
593 auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
594 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
595 ctx->results[i] = std::move(t);
597 return ctx->p.getFuture();
600 // collect (iterator)
604 template <typename T>
605 struct CollectContext {
607 explicit Nothing(int /* n */) {}
610 using Result = typename std::conditional<
611 std::is_void<T>::value,
613 std::vector<T>>::type;
615 using InternalResult = typename std::conditional<
616 std::is_void<T>::value,
618 std::vector<Optional<T>>>::type;
620 explicit CollectContext(int n) : result(n) {}
622 if (!threw.exchange(true)) {
623 // map Optional<T> -> T
624 std::vector<T> finalResult;
625 finalResult.reserve(result.size());
626 std::transform(result.begin(), result.end(),
627 std::back_inserter(finalResult),
628 [](Optional<T>& o) { return std::move(o.value()); });
629 p.setValue(std::move(finalResult));
632 inline void setPartialResult(size_t i, Try<T>& t) {
633 result[i] = std::move(t.value());
636 InternalResult result;
637 std::atomic<bool> threw {false};
642 template <class InputIterator>
643 Future<typename detail::CollectContext<
644 typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
645 collect(InputIterator first, InputIterator last) {
647 typename std::iterator_traits<InputIterator>::value_type::value_type T;
649 auto ctx = std::make_shared<detail::CollectContext<T>>(
650 std::distance(first, last));
651 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
652 if (t.hasException()) {
653 if (!ctx->threw.exchange(true)) {
654 ctx->p.setException(std::move(t.exception()));
656 } else if (!ctx->threw) {
657 ctx->setPartialResult(i, t);
660 return ctx->p.getFuture();
663 // collect (variadic)
665 template <typename... Fs>
666 typename detail::CollectVariadicContext<
667 typename std::decay<Fs>::type::value_type...>::type
668 collect(Fs&&... fs) {
669 auto ctx = std::make_shared<detail::CollectVariadicContext<
670 typename std::decay<Fs>::type::value_type...>>();
671 detail::collectVariadicHelper<detail::CollectVariadicContext>(
672 ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
673 return ctx->p.getFuture();
676 // collectAny (iterator)
678 template <class InputIterator>
683 std::iterator_traits<InputIterator>::value_type::value_type>>>
684 collectAny(InputIterator first, InputIterator last) {
686 typename std::iterator_traits<InputIterator>::value_type::value_type T;
688 struct CollectAnyContext {
689 CollectAnyContext() {}
690 Promise<std::pair<size_t, Try<T>>> p;
691 std::atomic<bool> done {false};
694 auto ctx = std::make_shared<CollectAnyContext>();
695 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
696 if (!ctx->done.exchange(true)) {
697 ctx->p.setValue(std::make_pair(i, std::move(t)));
700 return ctx->p.getFuture();
703 // collectAnyWithoutException (iterator)
705 template <class InputIterator>
708 typename std::iterator_traits<InputIterator>::value_type::value_type>>
709 collectAnyWithoutException(InputIterator first, InputIterator last) {
711 typename std::iterator_traits<InputIterator>::value_type::value_type T;
713 struct CollectAnyWithoutExceptionContext {
714 CollectAnyWithoutExceptionContext(){}
715 Promise<std::pair<size_t, T>> p;
716 std::atomic<bool> done{false};
717 std::atomic<size_t> nFulfilled{0};
721 auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
722 ctx->nTotal = std::distance(first, last);
724 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
725 if (!t.hasException() && !ctx->done.exchange(true)) {
726 ctx->p.setValue(std::make_pair(i, std::move(t.value())));
727 } else if (++ctx->nFulfilled == ctx->nTotal) {
728 ctx->p.setException(t.exception());
731 return ctx->p.getFuture();
734 // collectN (iterator)
736 template <class InputIterator>
737 Future<std::vector<std::pair<size_t, Try<typename
738 std::iterator_traits<InputIterator>::value_type::value_type>>>>
739 collectN(InputIterator first, InputIterator last, size_t n) {
741 std::iterator_traits<InputIterator>::value_type::value_type T;
742 typedef std::vector<std::pair<size_t, Try<T>>> V;
744 struct CollectNContext {
746 std::atomic<size_t> completed = {0};
749 auto ctx = std::make_shared<CollectNContext>();
751 if (size_t(std::distance(first, last)) < n) {
752 ctx->p.setException(std::runtime_error("Not enough futures"));
754 // for each completed Future, increase count and add to vector, until we
755 // have n completed futures at which point we fulfil our Promise with the
757 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
758 auto c = ++ctx->completed;
760 assert(ctx->v.size() < n);
761 ctx->v.emplace_back(i, std::move(t));
763 ctx->p.setTry(Try<V>(std::move(ctx->v)));
769 return ctx->p.getFuture();
774 template <class It, class T, class F>
775 Future<T> reduce(It first, It last, T&& initial, F&& func) {
777 return makeFuture(std::move(initial));
780 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
782 typename std::conditional<detail::callableWith<F, T&&, Try<ItT>&&>::value,
785 typedef isTry<Arg> IsTry;
787 auto sfunc = std::make_shared<F>(std::move(func));
789 auto f = first->then(
790 [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
792 std::move(minitial), head.template get<IsTry::value, Arg&&>());
795 for (++first; first != last; ++first) {
796 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
797 return (*sfunc)(std::move(std::get<0>(t).value()),
798 // Either return a ItT&& or a Try<ItT>&& depending
799 // on the type of the argument of func.
800 std::get<1>(t).template get<IsTry::value, Arg&&>());
807 // window (collection)
809 template <class Collection, class F, class ItT, class Result>
810 std::vector<Future<Result>>
811 window(Collection input, F func, size_t n) {
812 struct WindowContext {
813 WindowContext(Collection&& i, F&& fn)
814 : input_(std::move(i)), promises_(input_.size()),
817 std::atomic<size_t> i_ {0};
819 std::vector<Promise<Result>> promises_;
822 static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
823 size_t i = ctx->i_++;
824 if (i < ctx->input_.size()) {
825 // Using setCallback_ directly since we don't need the Future
826 ctx->func_(std::move(ctx->input_[i])).setCallback_(
827 // ctx is captured by value
828 [ctx, i](Try<Result>&& t) {
829 ctx->promises_[i].setTry(std::move(t));
830 // Chain another future onto this one
831 spawn(std::move(ctx));
837 auto max = std::min(n, input.size());
839 auto ctx = std::make_shared<WindowContext>(
840 std::move(input), std::move(func));
842 for (size_t i = 0; i < max; ++i) {
843 // Start the first n Futures
844 WindowContext::spawn(ctx);
847 std::vector<Future<Result>> futures;
848 futures.reserve(ctx->promises_.size());
849 for (auto& promise : ctx->promises_) {
850 futures.emplace_back(promise.getFuture());
859 template <class I, class F>
860 Future<I> Future<T>::reduce(I&& initial, F&& func) {
862 minitial = std::forward<I>(initial),
863 mfunc = std::forward<F>(func)
865 auto ret = std::move(minitial);
866 for (auto& val : vals) {
867 ret = mfunc(std::move(ret), std::move(val));
873 // unorderedReduce (iterator)
875 template <class It, class T, class F, class ItT, class Arg>
876 Future<T> unorderedReduce(It first, It last, T initial, F func) {
878 return makeFuture(std::move(initial));
881 typedef isTry<Arg> IsTry;
883 struct UnorderedReduceContext {
884 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
885 : lock_(), memo_(makeFuture<T>(std::move(memo))),
886 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
888 folly::MicroSpinLock lock_; // protects memo_ and numThens_
891 size_t numThens_; // how many Futures completed and called .then()
892 size_t numFutures_; // how many Futures in total
896 auto ctx = std::make_shared<UnorderedReduceContext>(
897 std::move(initial), std::move(func), std::distance(first, last));
902 [ctx](size_t /* i */, Try<ItT>&& t) {
903 // Futures can be completed in any order, simultaneously.
904 // To make this non-blocking, we create a new Future chain in
905 // the order of completion to reduce the values.
906 // The spinlock just protects chaining a new Future, not actually
907 // executing the reduce, which should be really fast.
908 folly::MSLGuard lock(ctx->lock_);
910 ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
911 // Either return a ItT&& or a Try<ItT>&& depending
912 // on the type of the argument of func.
913 return ctx->func_(std::move(v),
914 mt.template get<IsTry::value, Arg&&>());
916 if (++ctx->numThens_ == ctx->numFutures_) {
917 // After reducing the value of the last Future, fulfill the Promise
918 ctx->memo_.setCallback_(
919 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
923 return ctx->promise_.getFuture();
929 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
930 return within(dur, TimedOut(), tk);
935 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
938 Context(E ex) : exception(std::move(ex)), promise() {}
940 Future<Unit> thisFuture;
942 std::atomic<bool> token {false};
945 std::shared_ptr<Timekeeper> tks;
947 tks = folly::detail::getTimekeeperSingleton();
948 tk = DCHECK_NOTNULL(tks.get());
951 auto ctx = std::make_shared<Context>(std::move(e));
953 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
954 // TODO: "this" completed first, cancel "after"
955 if (ctx->token.exchange(true) == false) {
956 ctx->promise.setTry(std::move(t));
960 tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
961 // "after" completed first, cancel "this"
962 ctx->thisFuture.raise(TimedOut());
963 if (ctx->token.exchange(true) == false) {
964 if (t.hasException()) {
965 ctx->promise.setException(std::move(t.exception()));
967 ctx->promise.setException(std::move(ctx->exception));
972 return ctx->promise.getFuture().via(getExecutor());
978 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
979 return collectAll(*this, futures::sleep(dur, tk))
980 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
981 Try<T>& t = std::get<0>(tup);
982 return makeFuture<T>(std::move(t));
989 void waitImpl(Future<T>& f) {
990 // short-circuit if there's nothing to do
991 if (f.isReady()) return;
993 FutureBatonType baton;
994 f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1000 void waitImpl(Future<T>& f, Duration dur) {
1001 // short-circuit if there's nothing to do
1007 auto ret = promise.getFuture();
1008 auto baton = std::make_shared<FutureBatonType>();
1009 f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1010 promise.setTry(std::move(t));
1014 if (baton->timed_wait(dur)) {
1015 assert(f.isReady());
1020 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1021 // Set callback so to ensure that the via executor has something on it
1022 // so that once the preceding future triggers this callback, drive will
1023 // always have a callback to satisfy it
1026 f = f.via(e).then([](T&& t) { return std::move(t); });
1027 while (!f.isReady()) {
1030 assert(f.isReady());
1036 Future<T>& Future<T>::wait() & {
1037 detail::waitImpl(*this);
1042 Future<T>&& Future<T>::wait() && {
1043 detail::waitImpl(*this);
1044 return std::move(*this);
1048 Future<T>& Future<T>::wait(Duration dur) & {
1049 detail::waitImpl(*this, dur);
1054 Future<T>&& Future<T>::wait(Duration dur) && {
1055 detail::waitImpl(*this, dur);
1056 return std::move(*this);
1060 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1061 detail::waitViaImpl(*this, e);
1066 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1067 detail::waitViaImpl(*this, e);
1068 return std::move(*this);
1072 T Future<T>::get() {
1073 return std::move(wait().value());
1077 T Future<T>::get(Duration dur) {
1080 return std::move(value());
1087 T Future<T>::getVia(DrivableExecutor* e) {
1088 return std::move(waitVia(e).value());
1094 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1095 return t1.value() == t2.value();
1101 Future<bool> Future<T>::willEqual(Future<T>& f) {
1102 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1103 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1104 return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1113 Future<T> Future<T>::filter(F&& predicate) {
1114 return this->then([p = std::forward<F>(predicate)](T val) {
1115 T const& valConstRef = val;
1116 if (!p(valConstRef)) {
1117 throw PredicateDoesNotObtain();
1124 template <class Callback>
1125 auto Future<T>::thenMulti(Callback&& fn)
1126 -> decltype(this->then(std::forward<Callback>(fn))) {
1127 // thenMulti with one callback is just a then
1128 return then(std::forward<Callback>(fn));
1132 template <class Callback, class... Callbacks>
1133 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1134 -> decltype(this->then(std::forward<Callback>(fn)).
1135 thenMulti(std::forward<Callbacks>(fns)...)) {
1136 // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1137 return then(std::forward<Callback>(fn)).
1138 thenMulti(std::forward<Callbacks>(fns)...);
1142 template <class Callback, class... Callbacks>
1143 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1145 -> decltype(this->then(std::forward<Callback>(fn)).
1146 thenMulti(std::forward<Callbacks>(fns)...)) {
1147 // thenMultiExecutor with two callbacks is
1148 // via(x).then(a).thenMulti(b, ...).via(oldX)
1149 auto oldX = getExecutor();
1151 return then(std::forward<Callback>(fn)).
1152 thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1156 template <class Callback>
1157 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1158 -> decltype(this->then(std::forward<Callback>(fn))) {
1159 // thenMulti with one callback is just a then with an executor
1160 return then(x, std::forward<Callback>(fn));
1164 inline Future<Unit> when(bool p, F&& thunk) {
1165 return p ? std::forward<F>(thunk)().unit() : makeFuture();
1168 template <class P, class F>
1169 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1171 auto future = thunk();
1172 return future.then([
1173 predicate = std::forward<P>(predicate),
1174 thunk = std::forward<F>(thunk)
1176 return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1179 return makeFuture();
1183 Future<Unit> times(const int n, F&& thunk) {
1184 return folly::whileDo(
1185 [ n, count = folly::make_unique<std::atomic<int>>(0) ]() mutable {
1186 return count->fetch_add(1) < n;
1188 std::forward<F>(thunk));
1192 template <class It, class F, class ItT, class Result>
1193 std::vector<Future<Result>> map(It first, It last, F func) {
1194 std::vector<Future<Result>> results;
1195 for (auto it = first; it != last; it++) {
1196 results.push_back(it->then(func));
1206 struct retrying_policy_raw_tag {};
1207 struct retrying_policy_fut_tag {};
1209 template <class Policy>
1210 struct retrying_policy_traits {
1211 using ew = exception_wrapper;
1212 FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1213 template <class Ret>
1214 using has_op = typename std::integral_constant<bool,
1215 has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1216 has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1217 using is_raw = has_op<bool>;
1218 using is_fut = has_op<Future<bool>>;
1219 using tag = typename std::conditional<
1220 is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1221 is_fut::value, retrying_policy_fut_tag, void>::type>::type;
1224 template <class Policy, class FF>
1225 typename std::result_of<FF(size_t)>::type
1226 retrying(size_t k, Policy&& p, FF&& ff) {
1227 using F = typename std::result_of<FF(size_t)>::type;
1228 using T = typename F::value_type;
1231 [ k, pm = std::forward<Policy>(p), ffm = std::forward<FF>(ff) ](
1232 exception_wrapper x) mutable {
1235 [ k, xm = std::move(x), pm = std::move(pm), ffm = std::move(ffm) ](
1237 return r ? retrying(k, std::move(pm), std::move(ffm))
1238 : makeFuture<T>(std::move(xm));
1243 template <class Policy, class FF>
1244 typename std::result_of<FF(size_t)>::type
1245 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1246 auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
1247 return makeFuture<bool>(pm(k, x));
1249 return retrying(0, std::move(q), std::forward<FF>(ff));
1252 template <class Policy, class FF>
1253 typename std::result_of<FF(size_t)>::type
1254 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1255 return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1258 // jittered exponential backoff, clamped to [backoff_min, backoff_max]
1259 template <class URNG>
1260 Duration retryingJitteredExponentialBackoffDur(
1262 Duration backoff_min,
1263 Duration backoff_max,
1264 double jitter_param,
1267 auto dist = std::normal_distribution<double>(0.0, jitter_param);
1268 auto jitter = std::exp(dist(rng));
1269 auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1270 return std::max(backoff_min, std::min(backoff_max, backoff));
1273 template <class Policy, class URNG>
1274 std::function<Future<bool>(size_t, const exception_wrapper&)>
1275 retryingPolicyCappedJitteredExponentialBackoff(
1277 Duration backoff_min,
1278 Duration backoff_max,
1279 double jitter_param,
1283 pm = std::forward<Policy>(p),
1288 rngp = std::forward<URNG>(rng)
1289 ](size_t n, const exception_wrapper& ex) mutable {
1290 if (n == max_tries) {
1291 return makeFuture(false);
1293 return pm(n, ex).then(
1294 [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
1297 return makeFuture(false);
1299 auto backoff = detail::retryingJitteredExponentialBackoffDur(
1300 n, backoff_min, backoff_max, jitter_param, rngp);
1301 return futures::sleep(backoff).then([] { return true; });
1306 template <class Policy, class URNG>
1307 std::function<Future<bool>(size_t, const exception_wrapper&)>
1308 retryingPolicyCappedJitteredExponentialBackoff(
1310 Duration backoff_min,
1311 Duration backoff_max,
1312 double jitter_param,
1315 retrying_policy_raw_tag) {
1316 auto q = [pm = std::forward<Policy>(p)](
1317 size_t n, const exception_wrapper& e) {
1318 return makeFuture(pm(n, e));
1320 return retryingPolicyCappedJitteredExponentialBackoff(
1325 std::forward<URNG>(rng),
1329 template <class Policy, class URNG>
1330 std::function<Future<bool>(size_t, const exception_wrapper&)>
1331 retryingPolicyCappedJitteredExponentialBackoff(
1333 Duration backoff_min,
1334 Duration backoff_max,
1335 double jitter_param,
1338 retrying_policy_fut_tag) {
1339 return retryingPolicyCappedJitteredExponentialBackoff(
1344 std::forward<URNG>(rng),
1345 std::forward<Policy>(p));
1349 template <class Policy, class FF>
1350 typename std::result_of<FF(size_t)>::type
1351 retrying(Policy&& p, FF&& ff) {
1352 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1353 return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1357 std::function<bool(size_t, const exception_wrapper&)>
1358 retryingPolicyBasic(
1360 return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1363 template <class Policy, class URNG>
1364 std::function<Future<bool>(size_t, const exception_wrapper&)>
1365 retryingPolicyCappedJitteredExponentialBackoff(
1367 Duration backoff_min,
1368 Duration backoff_max,
1369 double jitter_param,
1372 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1373 return detail::retryingPolicyCappedJitteredExponentialBackoff(
1378 std::forward<URNG>(rng),
1379 std::forward<Policy>(p),
1384 std::function<Future<bool>(size_t, const exception_wrapper&)>
1385 retryingPolicyCappedJitteredExponentialBackoff(
1387 Duration backoff_min,
1388 Duration backoff_max,
1389 double jitter_param) {
1390 auto p = [](size_t, const exception_wrapper&) { return true; };
1391 return retryingPolicyCappedJitteredExponentialBackoff(
1402 // Instantiate the most common Future types to save compile time
1403 extern template class Future<Unit>;
1404 extern template class Future<bool>;
1405 extern template class Future<int>;
1406 extern template class Future<int64_t>;
1407 extern template class Future<std::string>;
1408 extern template class Future<double>;
1410 } // namespace folly