2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
// Picks the baton type used by the blocking wait helpers later in this file.
// FOLLY_FUTURE_USING_FIBER is defined as 0 for mobile / Apple builds; the
// other definition (1) comes together with the fibers baton header.
// NOTE(review): the #else / #endif lines of both conditionals are not part
// of this listing.
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/fibers/Baton.h>
43 #if FOLLY_FUTURE_USING_FIBER
44 typedef folly::fibers::Baton FutureBatonType;
46 typedef folly::Baton<> FutureBatonType;
51 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
// Move constructor: takes over other's core_ pointer and resets other.core_
// to nullptr so the moved-from Future no longer refers to that core.
55 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
56 other.core_ = nullptr;
// Move assignment: exchanges core_ pointers with other, so other ends up
// holding whatever core this Future referred to before the assignment.
60 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
61 std::swap(core_, other.core_);
// Converting move constructor from Future<U>: core_ becomes whatever
// detail::Core<T>::convert() yields for other's core, and other is left with
// a null core_.
66 template <typename U, typename>
67 Future<T>::Future(Future<U>&& other) noexcept
68 : core_(detail::Core<T>::convert(other.core_)) {
69 other.core_ = nullptr;
// Converting move assignment from Future<U>.
// NOTE(review): std::swap takes both operands by non-const lvalue reference.
// Unless detail::Core<T>::convert() returns an lvalue reference, the second
// argument cannot bind once this template is instantiated. Also, unlike the
// converting constructor, other.core_ is not cleared here. convert()'s
// signature is not visible in this listing -- confirm and fix.
73 template <typename U, typename>
74 Future<T>& Future<T>::operator=(Future<U>&& other) noexcept {
75 std::swap(core_, detail::Core<T>::convert(other.core_));
// Value constructor: allocates a new Core seeded with a Try<T> built from the
// perfectly-forwarded val.
80 template <class T2, typename>
81 Future<T>::Future(T2&& val)
82 : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
// Constructor that only participates when T2 is Unit (the enable_if pointer
// parameter is otherwise ill-formed). The new Core is seeded with a Try<T>
// holding a value-initialised T.
85 template <typename T2>
86 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
87 : core_(new detail::Core<T>(Try<T>(T()))) {}
// Destructor. NOTE(review): the body is not part of this listing.
90 Future<T>::~Future() {
// Tells the core that this Future handle is letting go of it by calling
// detachFuture(). NOTE(review): any guard or reset around the call is not
// part of this listing.
95 void Future<T>::detach() {
97 core_->detachFuture();
// Validity check helper. NOTE(review): the body is not part of this listing.
103 void Future<T>::throwIfInvalid() const {
// Installs func as the core's callback, forwarding it so rvalue callables are
// moved rather than copied.
110 void Future<T>::setCallback_(F&& func) {
112 core_->setCallback(std::forward<F>(func));
// Flattens a nested future: chains a continuation that receives the inner
// Future and returns it unchanged, so the result type is
// Future<isFuture<T>::Inner>.
119 typename std::enable_if<isFuture<F>::value,
120 Future<typename isFuture<T>::Inner>>::type
121 Future<T>::unwrap() {
122 return then([](Future<typename isFuture<T>::Inner> internal_future) {
123 return internal_future;
// Chains func onto this Future for callbacks that return a plain value.
// A promise `p` (its declaration is not part of this listing) inherits this
// core's interrupt handler, and the Future obtained from it inherits this
// Future's executor. The callback installed on this core then fulfils the
// promise.
129 // Variant: returns a value
130 // e.g. f.then([](Try<T>&& t){ return t.value(); });
132 template <typename F, typename R, bool isTry, typename... Args>
133 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
134 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
135 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
136 typedef typename R::ReturnsFuture::Inner B;
141 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
143 // grab the Future now before we lose our handle on the Promise
144 auto f = p.getFuture();
145 f.core_->setExecutorNoLock(getExecutor());
147 /* This is a bit tricky.
149 We can't just close over *this in case this Future gets moved. So we
150 make a new dummy Future. We could figure out something more
151 sophisticated that avoids making a new Future object when it can, as an
152 optimization. But this is correct.
154 core_ can't be moved, it is explicitly disallowed (as is copying). But
155 if there's ever a reason to allow it, this is one place that makes that
156 assumption and would need to be fixed. We use a standard shared pointer
157 for core_ (by copying it in), which means in essence obj holds a shared
158 pointer to itself. But this shouldn't leak because Promise will not
159 outlive the continuation, because Promise will setException() with a
160 broken Promise if it is destructed before completed. We could use a
161 weak pointer but it would have to be converted to a shared pointer when
162 func is executed (because the Future returned by func may possibly
163 persist beyond the callback, if it gets moved), and so it is an
164 optimization to just make it shared from the get-go.
166 Two subtle but important points about this design. detail::Core has no
167 back pointers to Future or Promise, so if Future or Promise get moved
168 (and they will be moved in performant code) we don't have to do
169 anything fancy. And because we store the continuation in the
170 detail::Core, not in the Future, we can execute the continuation even
171 after the Future has gone out of scope. This is an intentional design
172 decision. It is likely we will want to be able to cancel a continuation
173 in some circumstances, but I think it should be explicit not implicit
174 in the destruction of the Future used to create it.
// The callable and the promise are moved into the closure, so the
// continuation owns both.
176 setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
177 Try<T> && t) mutable {
// A callback that takes a value (not a Try) cannot observe an error, so the
// upstream exception is passed straight to the promise without calling it.
178 if (!isTry && t.hasException()) {
179 pm.setException(std::move(t.exception()));
// setWith runs the callable and stores its outcome in the promise.
181 pm.setWith([&]() { return funcm(t.template get<isTry, Args>()...); });
// Chains func onto this Future for callbacks that themselves return a
// Future. Setup mirrors the value-returning variant: promise `p` (declared on
// a line not part of this listing) inherits the interrupt handler, and its
// Future inherits the executor.
188 // Variant: returns a Future
189 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
191 template <typename F, typename R, bool isTry, typename... Args>
192 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
193 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
194 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
195 typedef typename R::ReturnsFuture::Inner B;
200 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
202 // grab the Future now before we lose our handle on the Promise
203 auto f = p.getFuture();
204 f.core_->setExecutorNoLock(getExecutor());
206 setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
207 Try<T> && t) mutable {
// Value-taking callbacks are skipped on error; the exception is propagated.
208 if (!isTry && t.hasException()) {
209 pm.setException(std::move(t.exception()));
212 auto f2 = funcm(t.template get<isTry, Args>()...);
213 // that didn't throw, now we can steal p
// The inner future's result, once available, is forwarded into the promise.
214 f2.setCallback_([p = std::move(pm)](Try<B> && b) mutable {
215 p.setTry(std::move(b));
// Anything thrown by the callable itself is captured into the promise; the
// std::exception overload keeps the typed reference alongside the
// exception_ptr.
217 } catch (const std::exception& e) {
218 pm.setException(exception_wrapper(std::current_exception(), e));
220 pm.setException(exception_wrapper(std::current_exception()));
// then() overload for a pointer-to-member-function plus an instance pointer.
// FirstArg is the member's first parameter type with reference and
// cv-qualifiers stripped; isTry<FirstArg> selects whether the member receives
// the Try itself or the unwrapped value.
// The lambda stores the raw `instance` pointer, so the pointee has to outlive
// the continuation.
228 template <typename T>
229 template <typename R, typename Caller, typename... Args>
230 Future<typename isFuture<R>::Inner>
231 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
232 typedef typename std::remove_cv<
233 typename std::remove_reference<
234 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
235 return then([instance, func](Try<T>&& t){
236 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
// then() overload that takes an Executor: the current executor is remembered
// in oldX before arg/args are chained through the ordinary then().
// NOTE(review): the statement that installs x, and the call chained after the
// trailing '.', are not part of this listing.
241 template <class Executor, class Arg, class... Args>
242 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
243 -> decltype(this->then(std::forward<Arg>(arg),
244 std::forward<Args>(args)...))
246 auto oldX = getExecutor();
248 return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
// Argument-less then(): chains an empty lambda, producing a Future<Unit> that
// carries no value from this Future.
253 Future<Unit> Future<T>::then() {
254 return then([] () {});
// Exn is the callback's first parameter type. Promise `p` is declared on a
// line not part of this listing; it inherits this core's interrupt handler.
257 // onError where the callback returns T
260 typename std::enable_if<
261 !detail::callableWith<F, exception_wrapper>::value &&
262 !detail::Extract<F>::ReturnsFuture::value,
264 Future<T>::onError(F&& func) {
265 typedef typename detail::Extract<F>::FirstArg Exn;
267 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
268 "Return type of onError callback must be T or Future<T>");
271 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
272 auto f = p.getFuture();
274 setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
275 Try<T> && t) mutable {
// withException<Exn> runs the inner lambda only for a matching exception, in
// which case the promise receives funcm(e) via setWith. When it reports
// false, the original Try is forwarded unchanged.
276 if (!t.template withException<Exn>(
277 [&](Exn& e) { pm.setWith([&] { return funcm(e); }); })) {
278 pm.setTry(std::move(t));
// Same matching rule as the T-returning overload, but the handler yields a
// Future<T> whose eventual Try is forwarded into the promise.
// NOTE(review): the declaration of `p` and the call that produces `f2` are
// not part of this listing.
285 // onError where the callback returns Future<T>
288 typename std::enable_if<
289 !detail::callableWith<F, exception_wrapper>::value &&
290 detail::Extract<F>::ReturnsFuture::value,
292 Future<T>::onError(F&& func) {
294 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
295 "Return type of onError callback must be T or Future<T>");
296 typedef typename detail::Extract<F>::FirstArg Exn;
299 auto f = p.getFuture();
301 setCallback_([ pm = std::move(p), funcm = std::forward<F>(func) ](
302 Try<T> && t) mutable {
303 if (!t.template withException<Exn>([&](Exn& e) {
// The promise is moved into the inner callback, which completes it with the
// handler future's result.
306 f2.setCallback_([pm = std::move(pm)](Try<T> && t2) mutable {
307 pm.setTry(std::move(t2));
// Exceptions escaping the handler are stored in the promise instead.
309 } catch (const std::exception& e2) {
310 pm.setException(exception_wrapper(std::current_exception(), e2));
312 pm.setException(exception_wrapper(std::current_exception()));
// No matching exception: pass the original result through.
315 pm.setTry(std::move(t));
// Chains a Try-taking continuation that owns func (as funcw) and re-emits the
// incoming Try through makeFuture, so the original result or exception passes
// through. NOTE(review): the line that invokes funcw is not part of this
// listing.
324 Future<T> Future<T>::ensure(F&& func) {
325 return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
327 return makeFuture(std::move(t));
// Applies within(dur, tk) and installs an onError handler for TimedOut that
// returns funcw()'s result in place of the timeout.
333 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
334 return within(dur, tk).onError([funcw = std::forward<F>(func)](
335 TimedOut const&) { return funcw(); });
// onError overload for handlers callable with an exception_wrapper that
// return a Future<T>. NOTE(review): the declaration of `p` and the
// setCallback_ call line are not part of this listing.
340 typename std::enable_if<detail::callableWith<F, exception_wrapper>::value &&
341 detail::Extract<F>::ReturnsFuture::value,
343 Future<T>::onError(F&& func) {
345 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
346 "Return type of onError callback must be T or Future<T>");
349 auto f = p.getFuture();
351 [ pm = std::move(p), funcm = std::forward<F>(func) ](Try<T> t) mutable {
// Any exception triggers the handler; its wrapper is moved into the call.
352 if (t.hasException()) {
354 auto f2 = funcm(std::move(t.exception()));
// The handler future's result completes the promise.
355 f2.setCallback_([pm = std::move(pm)](Try<T> t2) mutable {
356 pm.setTry(std::move(t2));
// A throwing handler completes the promise with what it threw.
358 } catch (const std::exception& e2) {
359 pm.setException(exception_wrapper(std::current_exception(), e2));
361 pm.setException(exception_wrapper(std::current_exception()));
// Success path: forward the value unchanged.
364 pm.setTry(std::move(t));
// NOTE(review): this static_assert compares Extract<F>::Return with Future<T>,
// whereas the typed T-returning overload compares RawReturn with T. Confirm
// that Extract<F>::Return is the Future-lifted type for a T-returning
// callable.
// NOTE(review): the declaration of `p` and the setCallback_ call line are not
// part of this listing.
371 // onError(exception_wrapper) that returns T
374 typename std::enable_if<
375 detail::callableWith<F, exception_wrapper>::value &&
376 !detail::Extract<F>::ReturnsFuture::value,
378 Future<T>::onError(F&& func) {
380 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
381 "Return type of onError callback must be T or Future<T>");
384 auto f = p.getFuture();
386 [ pm = std::move(p), funcm = std::forward<F>(func) ](Try<T> t) mutable {
// On error the promise is completed with the handler's return value (or with
// whatever the handler throws, via setWith). Otherwise the Try is forwarded.
387 if (t.hasException()) {
388 pm.setWith([&] { return funcm(std::move(t.exception())); });
390 pm.setTry(std::move(t));
// Mutable and const accessors for the stored result: both return a reference
// obtained from the core's Try via value().
// NOTE(review): the statements preceding each return are not part of this
// listing.
398 typename std::add_lvalue_reference<T>::type Future<T>::value() {
401 return core_->getTry().value();
405 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
408 return core_->getTry().value();
// Returns a reference to the Try held by the core.
// NOTE(review): the statements preceding the return are not part of this
// listing.
412 Try<T>& Future<T>::getTry() {
415 return core_->getTry();
// If the core reports ready, the Try is moved out of the core into `o`
// (declared on a line not part of this listing), leaving the core's Try
// moved-from.
419 Optional<Try<T>> Future<T>::poll() {
421 if (core_->ready()) {
422 o = std::move(core_->getTry());
// Rvalue overload: sets the executor and priority on this Future, then
// returns it by move.
428 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
431 setExecutor(executor, priority);
433 return std::move(*this);
// Lvalue overload: leaves this Future's executor alone. A promise `p`
// (declared on a line not part of this listing) receives this Future's Try
// through a then() continuation, and the promise's Future is returned routed
// through the rvalue via().
437 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
441 auto f = p.getFuture();
442 then([p = std::move(p)](Try<T> && t) mutable { p.setTry(std::move(t)); });
443 return std::move(f).via(executor, priority);
// Free function: starts from via(x) and chains func onto it. The result is a
// Future of func's return type with any Future layer removed.
446 template <class Func>
447 auto via(Executor* x, Func&& func)
448 -> Future<typename isFuture<decltype(func())>::Inner>
450 // TODO make this actually more performant. :-P #7260175
451 return via(x).then(std::forward<Func>(func));
// Reports whether the core already holds a result.
// NOTE(review): the statement preceding the return is not part of this
// listing.
455 bool Future<T>::isReady() const {
457 return core_->ready();
// Both predicates delegate to the Try returned by getTry().
461 bool Future<T>::hasValue() {
462 return getTry().hasValue();
466 bool Future<T>::hasException() {
467 return getTry().hasException();
// Hands the exception (moved) to the core's raise().
471 void Future<T>::raise(exception_wrapper exception) {
472 core_->raise(std::move(exception));
// Wraps a value in a Future of its decayed type by building a Try from the
// forwarded argument and delegating to the Try overload.
478 Future<typename std::decay<T>::type> makeFuture(T&& t) {
479 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
// Argument-less makeFuture: yields a Future<Unit> built from Unit{}.
482 inline // for multiple translation units
483 Future<Unit> makeFuture() {
484 return makeFuture(Unit{});
// Overload selected when F() returns a Future. If an exception is caught, it
// is turned into a Future of the inner type holding that exception.
// NOTE(review): the try block and the call to func are not part of this
// listing.
487 // makeFutureWith(Future<T>()) -> Future<T>
489 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
490 typename std::result_of<F()>::type>::type
491 makeFutureWith(F&& func) {
493 typename isFuture<typename std::result_of<F()>::type>::Inner;
496 } catch (std::exception& e) {
497 return makeFuture<InnerType>(
498 exception_wrapper(std::current_exception(), e));
500 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
// Overload selected when F() does not return a Future. The result type is
// passed through Unit::Lift (per the comment below, void becomes Unit) and
// the Future is built from makeTryWith over a lambda that holds func by
// reference.
// NOTE(review): the lambda body is not part of this listing.
504 // makeFutureWith(T()) -> Future<T>
505 // makeFutureWith(void()) -> Future<Unit>
507 typename std::enable_if<
508 !(isFuture<typename std::result_of<F()>::type>::value),
509 Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
510 makeFutureWith(F&& func) {
512 typename Unit::Lift<typename std::result_of<F()>::type>::type;
513 return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
// Three ways to build a failed Future<T>: from an exception_ptr, from an
// exception_wrapper (moved in), and from any type derived from
// std::exception (wrapped with make_exception_wrapper). Each constructs a
// Try<T> and delegates to the Try overload.
519 Future<T> makeFuture(std::exception_ptr const& e) {
520 return makeFuture(Try<T>(e));
524 Future<T> makeFuture(exception_wrapper ew) {
525 return makeFuture(Try<T>(std::move(ew)));
528 template <class T, class E>
529 typename std::enable_if<std::is_base_of<std::exception, E>::value,
531 makeFuture(E const& e) {
532 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
// Base overload used by the other makeFuture variants: allocates a Core
// seeded with the moved Try and wraps it in a Future.
536 Future<T> makeFuture(Try<T>&& t) {
537 return Future<T>(new detail::Core<T>(std::move(t)));
// Free function: a Future<Unit> from makeFuture(), routed via the given
// executor and priority.
541 Future<Unit> via(Executor* executor, int8_t priority) {
542 return makeFuture().via(executor, priority);
// Installs one callback per future in [first, last). Each callback holds its
// own copy of func plus the future's zero-based position i, and invokes
// func(i, Try) with that future's result.
545 // mapSetCallback calls func(i, Try<T>) when every future completes
547 template <class T, class InputIterator, class F>
548 void mapSetCallback(InputIterator first, InputIterator last, F func) {
549 for (size_t i = 0; first != last; ++first, ++i) {
550 first->setCallback_([func, i](Try<T>&& t) {
551 func(i, std::move(t));
// Creates a shared CollectAllVariadicContext over the value types of the
// given futures, lets collectVariadicHelper attach the futures to it, and
// returns the Future of the context's promise.
556 // collectAll (variadic)
558 template <typename... Fs>
559 typename detail::CollectAllVariadicContext<
560 typename std::decay<Fs>::type::value_type...>::type
561 collectAll(Fs&&... fs) {
562 auto ctx = std::make_shared<detail::CollectAllVariadicContext<
563 typename std::decay<Fs>::type::value_type...>>();
564 detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
565 ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
566 return ctx->p.getFuture();
// Collects the Try of every future in [first, last) into one vector.
569 // collectAll (iterator)
571 template <class InputIterator>
574 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
575 collectAll(InputIterator first, InputIterator last) {
577 typename std::iterator_traits<InputIterator>::value_type::value_type T;
// The shared context fulfils its promise from its destructor, i.e. when the
// last callback holding a copy of `ctx` (plus the local below) releases it.
// NOTE(review): the constructor takes int while the caller passes
// std::distance's result; the narrowing is implicit.
579 struct CollectAllContext {
580 CollectAllContext(int n) : results(n) {}
581 ~CollectAllContext() {
582 p.setValue(std::move(results));
584 Promise<std::vector<Try<T>>> p;
585 std::vector<Try<T>> results;
588 auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
// Each callback writes only its own slot i.
589 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
590 ctx->results[i] = std::move(t);
592 return ctx->p.getFuture();
// Shared state for the iterator form of collect().
// NOTE(review): several lines of this struct (the Nothing helper's header,
// one branch of each std::conditional, the header of the function containing
// the transform, and the promise member) are not part of this listing.
595 // collect (iterator)
599 template <typename T>
600 struct CollectContext {
602 explicit Nothing(int /* n */) {}
// Result is what the returned Future carries; InternalResult stores one
// Optional per input so slots can be filled in any order. Both switch on
// whether T is void.
605 using Result = typename std::conditional<
606 std::is_void<T>::value,
608 std::vector<T>>::type;
610 using InternalResult = typename std::conditional<
611 std::is_void<T>::value,
613 std::vector<Optional<T>>>::type;
615 explicit CollectContext(int n) : result(n) {}
// Only the first party to flip `threw` may complete the promise; here that
// is the success path, which unwraps every Optional into the final vector.
617 if (!threw.exchange(true)) {
618 // map Optional<T> -> T
619 std::vector<T> finalResult;
620 finalResult.reserve(result.size());
621 std::transform(result.begin(), result.end(),
622 std::back_inserter(finalResult),
623 [](Optional<T>& o) { return std::move(o.value()); });
624 p.setValue(std::move(finalResult));
// Stores the value of a successfully completed input at its position.
627 inline void setPartialResult(size_t i, Try<T>& t) {
628 result[i] = std::move(t.value());
631 InternalResult result;
632 std::atomic<bool> threw {false};
// Iterator form of collect(): builds a CollectContext sized to the range and
// attaches one callback per future.
637 template <class InputIterator>
638 Future<typename detail::CollectContext<
639 typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
640 collect(InputIterator first, InputIterator last) {
642 typename std::iterator_traits<InputIterator>::value_type::value_type T;
644 auto ctx = std::make_shared<detail::CollectContext<T>>(
645 std::distance(first, last));
646 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
// The first failing input wins the `threw` flag and completes the promise
// with its exception. Later failures are dropped, and values arriving after
// a failure are not stored.
647 if (t.hasException()) {
648 if (!ctx->threw.exchange(true)) {
649 ctx->p.setException(std::move(t.exception()));
651 } else if (!ctx->threw) {
652 ctx->setPartialResult(i, t);
655 return ctx->p.getFuture();
// Same wiring as the variadic collectAll, but with CollectVariadicContext as
// the shared state.
658 // collect (variadic)
660 template <typename... Fs>
661 typename detail::CollectVariadicContext<
662 typename std::decay<Fs>::type::value_type...>::type
663 collect(Fs&&... fs) {
664 auto ctx = std::make_shared<detail::CollectVariadicContext<
665 typename std::decay<Fs>::type::value_type...>>();
666 detail::collectVariadicHelper<detail::CollectVariadicContext>(
667 ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
668 return ctx->p.getFuture();
// Completes with (index, Try) of whichever input finishes first, whether it
// succeeded or failed.
671 // collectAny (iterator)
673 template <class InputIterator>
678 std::iterator_traits<InputIterator>::value_type::value_type>>>
679 collectAny(InputIterator first, InputIterator last) {
681 typename std::iterator_traits<InputIterator>::value_type::value_type T;
683 struct CollectAnyContext {
684 CollectAnyContext() {};
685 Promise<std::pair<size_t, Try<T>>> p;
686 std::atomic<bool> done {false};
689 auto ctx = std::make_shared<CollectAnyContext>();
690 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
// exchange(true) returns the previous value, so only the first callback to
// arrive sets the promise.
691 if (!ctx->done.exchange(true)) {
692 ctx->p.setValue(std::make_pair(i, std::move(t)));
695 return ctx->p.getFuture();
// Completes with (index, value) of the first input that finishes without an
// exception. If every input fails, the promise receives the exception of the
// one that finished last.
// NOTE(review): the nTotal member's declaration is not part of this listing.
698 // collectAnyWithoutException (iterator)
700 template <class InputIterator>
703 typename std::iterator_traits<InputIterator>::value_type::value_type>>
704 collectAnyWithoutException(InputIterator first, InputIterator last) {
706 typename std::iterator_traits<InputIterator>::value_type::value_type T;
708 struct CollectAnyWithoutExceptionContext {
709 CollectAnyWithoutExceptionContext(){};
710 Promise<std::pair<size_t, T>> p;
711 std::atomic<bool> done{false};
712 std::atomic<size_t> nFulfilled{0};
716 auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
717 ctx->nTotal = std::distance(first, last);
719 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
// The winning success does not bump nFulfilled, so the counter can reach
// nTotal only when no input succeeded.
720 if (!t.hasException() && !ctx->done.exchange(true)) {
721 ctx->p.setValue(std::make_pair(i, std::move(t.value())));
722 } else if (++ctx->nFulfilled == ctx->nTotal) {
723 ctx->p.setException(t.exception());
726 return ctx->p.getFuture();
// Completes with the (index, Try) pairs of the first n inputs to finish.
// NOTE(review): the declarations of the context's `v` and `p` members and the
// comparisons of `c` against n are not part of this listing.
729 // collectN (iterator)
731 template <class InputIterator>
732 Future<std::vector<std::pair<size_t, Try<typename
733 std::iterator_traits<InputIterator>::value_type::value_type>>>>
734 collectN(InputIterator first, InputIterator last, size_t n) {
736 std::iterator_traits<InputIterator>::value_type::value_type T;
737 typedef std::vector<std::pair<size_t, Try<T>>> V;
739 struct CollectNContext {
741 std::atomic<size_t> completed = {0};
744 auto ctx = std::make_shared<CollectNContext>();
// Fewer inputs than requested: fail up front.
746 if (size_t(std::distance(first, last)) < n) {
747 ctx->p.setException(std::runtime_error("Not enough futures"));
749 // for each completed Future, increase count and add to vector, until we
750 // have n completed futures at which point we fulfil our Promise with the
752 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
753 auto c = ++ctx->completed;
// NOTE(review): `completed` is atomic but `v` is appended to without any
// visible synchronisation; confirm callbacks cannot run concurrently.
755 assert(ctx->v.size() < n);
756 ctx->v.emplace_back(i, std::move(t));
758 ctx->p.setTry(Try<V>(std::move(ctx->v)));
764 return ctx->p.getFuture();
// Ordered reduction over [first, last): the first future is combined with
// `initial`, then each later future is combined with the running result by
// pairing them through collectAll.
// NOTE(review): T and F are deduced from forwarding references, yet initial
// and func are passed through std::move rather than std::forward, so lvalue
// arguments are moved from (and make_shared<F> would be asked for a
// reference type). Confirm this is intended.
// NOTE(review): the empty-range test, the Arg typedef's alternatives and the
// first call through sfunc are not part of this listing.
769 template <class It, class T, class F>
770 Future<T> reduce(It first, It last, T&& initial, F&& func) {
772 return makeFuture(std::move(initial));
775 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
777 typename std::conditional<detail::callableWith<F, T&&, Try<ItT>&&>::value,
780 typedef isTry<Arg> IsTry;
// func is shared so that every continuation in the chain can call it.
782 auto sfunc = std::make_shared<F>(std::move(func));
784 auto f = first->then(
785 [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
787 std::move(minitial), head.template get<IsTry::value, Arg&&>());
790 for (++first; first != last; ++first) {
// std::get<0>(t).value() is the running result (value() on a failed Try
// rethrows, which propagates the error down the chain).
791 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
792 return (*sfunc)(std::move(std::get<0>(t).value()),
793 // Either return a ItT&& or a Try<ItT>&& depending
794 // on the type of the argument of func.
795 std::get<1>(t).template get<IsTry::value, Arg&&>());
// Runs func over every element of input with at most n invocations started
// up front; each completion starts the next pending element. Returns one
// Future per input element, in input order.
// NOTE(review): the input_ / func_ member declarations and the rest of the
// constructor's initialiser list are not part of this listing.
802 // window (collection)
804 template <class Collection, class F, class ItT, class Result>
805 std::vector<Future<Result>>
806 window(Collection input, F func, size_t n) {
807 struct WindowContext {
808 WindowContext(Collection&& i, F&& fn)
809 : input_(std::move(i)), promises_(input_.size()),
// i_ is the next input index to start; the atomic post-increment in spawn()
// hands each index to exactly one caller.
812 std::atomic<size_t> i_ {0};
814 std::vector<Promise<Result>> promises_;
817 static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
818 size_t i = ctx->i_++;
819 if (i < ctx->input_.size()) {
820 // Using setCallback_ directly since we don't need the Future
821 ctx->func_(std::move(ctx->input_[i])).setCallback_(
822 // ctx is captured by value
823 [ctx, i](Try<Result>&& t) {
824 ctx->promises_[i].setTry(std::move(t));
825 // Chain another future onto this one
// NOTE(review): the lambda is not mutable, so ctx is const here and
// std::move has no effect; spawn takes a const reference anyway.
826 spawn(std::move(ctx));
// Never start more tasks than there are inputs.
832 auto max = std::min(n, input.size());
834 auto ctx = std::make_shared<WindowContext>(
835 std::move(input), std::move(func));
837 for (size_t i = 0; i < max; ++i) {
838 // Start the first n Futures
839 WindowContext::spawn(ctx);
842 std::vector<Future<Result>> futures;
843 futures.reserve(ctx->promises_.size());
844 for (auto& promise : ctx->promises_) {
845 futures.emplace_back(promise.getFuture());
// Member reduce: folds a sequence `vals` into a single I, starting from
// `initial` and applying mfunc left to right, moving the accumulator and each
// element into the call.
// NOTE(review): the then() call and the lambda's parameter list (which
// introduces `vals`) are not part of this listing.
854 template <class I, class F>
855 Future<I> Future<T>::reduce(I&& initial, F&& func) {
857 minitial = std::forward<I>(initial),
858 mfunc = std::forward<F>(func)
860 auto ret = std::move(minitial);
861 for (auto& val : vals) {
862 ret = mfunc(std::move(ret), std::move(val));
// Reduces the futures in completion order rather than input order.
// NOTE(review): the empty-range test, the memo_ / func_ / promise_ member
// declarations, the mapSetCallback call line and the assignment target of the
// memo_.then(...) expression are not part of this listing.
868 // unorderedReduce (iterator)
870 template <class It, class T, class F, class ItT, class Arg>
871 Future<T> unorderedReduce(It first, It last, T initial, F func) {
873 return makeFuture(std::move(initial));
876 typedef isTry<Arg> IsTry;
878 struct UnorderedReduceContext {
879 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
880 : lock_(), memo_(makeFuture<T>(std::move(memo))),
881 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
883 folly::MicroSpinLock lock_; // protects memo_ and numThens_
886 size_t numThens_; // how many Futures completed and called .then()
887 size_t numFutures_; // how many Futures in total
891 auto ctx = std::make_shared<UnorderedReduceContext>(
892 std::move(initial), std::move(func), std::distance(first, last));
897 [ctx](size_t /* i */, Try<ItT>&& t) {
898 // Futures can be completed in any order, simultaneously.
899 // To make this non-blocking, we create a new Future chain in
900 // the order of completion to reduce the values.
901 // The spinlock just protects chaining a new Future, not actually
902 // executing the reduce, which should be really fast.
903 folly::MSLGuard lock(ctx->lock_);
// The completed input's Try is moved into the continuation and combined with
// the running value v when the chain reaches it.
905 ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
906 // Either return a ItT&& or a Try<ItT>&& depending
907 // on the type of the argument of func.
908 return ctx->func_(std::move(v),
909 mt.template get<IsTry::value, Arg&&>());
911 if (++ctx->numThens_ == ctx->numFutures_) {
912 // After reducing the value of the last Future, fulfill the Promise
913 ctx->memo_.setCallback_(
914 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
918 return ctx->promise_.getFuture();
// Convenience overload: uses a default-constructed TimedOut as the timeout
// exception.
924 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
925 return within(dur, TimedOut(), tk);
// Races this Future against a timer of length dur. Whichever side flips
// `token` first completes the returned Future; the other side's result is
// dropped.
// NOTE(review): the Context struct header, its exception / promise member
// declarations and the null check on tk are not part of this listing.
930 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
933 Context(E ex) : exception(std::move(ex)), promise() {}
935 Future<Unit> thisFuture;
937 std::atomic<bool> token {false};
// tks keeps the singleton alive for the rest of this function while tk
// points into it.
940 std::shared_ptr<Timekeeper> tks;
942 tks = folly::detail::getTimekeeperSingleton();
943 tk = DCHECK_NOTNULL(tks.get());
946 auto ctx = std::make_shared<Context>(std::move(e));
948 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
949 // TODO: "this" completed first, cancel "after"
950 if (ctx->token.exchange(true) == false) {
951 ctx->promise.setTry(std::move(t));
955 tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
956 // "after" completed first, cancel "this"
// NOTE(review): the raise happens before the token check and always uses
// TimedOut, even when a custom exception E was supplied. Confirm both are
// intended.
957 ctx->thisFuture.raise(TimedOut());
958 if (ctx->token.exchange(true) == false) {
// A failure of the timer itself takes precedence over the caller-supplied
// exception.
959 if (t.hasException()) {
960 ctx->promise.setException(std::move(t.exception()));
962 ctx->promise.setException(std::move(ctx->exception));
// The result keeps this Future's executor.
967 return ctx->promise.getFuture().via(getExecutor());
// Waits for both this Future and a sleep of length dur, then re-emits this
// Future's Try; the sleep's own result is ignored.
973 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
974 return collectAll(*this, futures::sleep(dur, tk))
975 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
976 Try<T>& t = std::get<0>(tup);
977 return makeFuture<T>(std::move(t));
// Untimed blocking wait helper. The callback captures the stack-allocated
// baton by reference and posts it when f completes.
// NOTE(review): the statements that wait on the baton are not part of this
// listing.
985 void waitImpl(Future<T>& f) {
986 // short-circuit if there's nothing to do
987 if (f.isReady()) return;
989 FutureBatonType baton;
990 f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
// Timed blocking wait helper. f's result is redirected into a fresh promise
// whose Future is `ret`. The baton is heap-allocated and shared with the
// callback, so it remains valid if the wait times out before the callback
// runs.
// NOTE(review): the ready check, the promise declaration, the baton post and
// the reassignment of f are not part of this listing.
996 void waitImpl(Future<T>& f, Duration dur) {
997 // short-circuit if there's nothing to do
1003 auto ret = promise.getFuture();
1004 auto baton = std::make_shared<FutureBatonType>();
1005 f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1006 promise.setTry(std::move(t));
1010 if (baton->timed_wait(dur)) {
1011 assert(f.isReady());
// Waits for f by running work on the drivable executor e until f is ready.
// NOTE(review): the early-return check and the loop body are not part of
// this listing.
1016 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1017 // Set callback so to ensure that the via executor has something on it
1018 // so that once the preceding future triggers this callback, drive will
1019 // always have a callback to satisfy it
// f is replaced by an identity continuation scheduled on e.
1022 f = f.via(e).then([](T&& t) { return std::move(t); });
1023 while (!f.isReady()) {
1026 assert(f.isReady());
// Member wrappers over the detail:: wait helpers, each in an lvalue (&) and
// an rvalue (&&) flavour. The rvalue overloads return *this by move.
// NOTE(review): the return statements of the lvalue overloads are not part
// of this listing.
1032 Future<T>& Future<T>::wait() & {
1033 detail::waitImpl(*this);
1038 Future<T>&& Future<T>::wait() && {
1039 detail::waitImpl(*this);
1040 return std::move(*this);
1044 Future<T>& Future<T>::wait(Duration dur) & {
1045 detail::waitImpl(*this, dur);
1050 Future<T>&& Future<T>::wait(Duration dur) && {
1051 detail::waitImpl(*this, dur);
1052 return std::move(*this);
1056 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1057 detail::waitViaImpl(*this, e);
1062 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1063 detail::waitViaImpl(*this, e);
1064 return std::move(*this);
// Blocking accessors: wait for completion, then move the value out of the
// Future.
// NOTE(review): the timed overload's wait and timeout handling are not part
// of this listing.
1068 T Future<T>::get() {
1069 return std::move(wait().value());
1073 T Future<T>::get(Duration dur) {
1076 return std::move(value());
1083 T Future<T>::getVia(DrivableExecutor* e) {
1084 return std::move(waitVia(e).value());
// Compares the values held by two Trys with operator==.
// NOTE(review): the enclosing struct header is not part of this listing.
1090 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1091 return t1.value() == t2.value();
// Waits for both futures via collectAll. When both hold a value the result
// is their equality.
// NOTE(review): the branch taken when either side has no value is not part
// of this listing.
1097 Future<bool> Future<T>::willEqual(Future<T>& f) {
1098 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1099 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1100 return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
// Chains a continuation that throws PredicateDoesNotObtain when the predicate
// rejects the value. The predicate is called through a const reference, so
// it cannot modify val.
// NOTE(review): the statement returning val on success is not part of this
// listing.
1109 Future<T> Future<T>::filter(F&& predicate) {
1110 return this->then([p = std::forward<F>(predicate)](T val) {
1111 T const& valConstRef = val;
1112 if (!p(valConstRef)) {
1113 throw PredicateDoesNotObtain();
// Base case of the thenMulti recursion.
1120 template <class Callback>
1121 auto Future<T>::thenMulti(Callback&& fn)
1122 -> decltype(this->then(std::forward<Callback>(fn))) {
1123 // thenMulti with one callback is just a then
1124 return then(std::forward<Callback>(fn));
// Recursive case: applies the first callback with then() and hands the rest
// to thenMulti on the resulting Future.
1128 template <class Callback, class... Callbacks>
1129 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1130 -> decltype(this->then(std::forward<Callback>(fn)).
1131 thenMulti(std::forward<Callbacks>(fns)...)) {
1132 // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1133 return then(std::forward<Callback>(fn)).
1134 thenMulti(std::forward<Callbacks>(fns)...);
// Runs the callback chain and then routes the result back via the executor
// that was current on entry (oldX).
// NOTE(review): the remaining parameter line and the statement that installs
// x are not part of this listing.
1138 template <class Callback, class... Callbacks>
1139 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1141 -> decltype(this->then(std::forward<Callback>(fn)).
1142 thenMulti(std::forward<Callbacks>(fns)...)) {
1143 // thenMultiExecutor with two callbacks is
1144 // via(x).then(a).thenMulti(b, ...).via(oldX)
1145 auto oldX = getExecutor();
1147 return then(std::forward<Callback>(fn)).
1148 thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
// Single-callback case: delegates to the then(Executor*, ...) overload.
1152 template <class Callback>
1153 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1154 -> decltype(this->then(std::forward<Callback>(fn))) {
1155 // thenMulti with one callback is just a then with an executor
1156 return then(x, std::forward<Callback>(fn));
// Invokes thunk only when p is true and reduces its Future to Future<Unit>
// via unit(); otherwise returns the result of makeFuture() without calling
// thunk.
1160 inline Future<Unit> when(bool p, F&& thunk) {
1161 return p ? std::forward<F>(thunk)().unit() : makeFuture();
// Asynchronous loop: after thunk's Future completes, the continuation (which
// owns both predicate and thunk) calls whileDo again. The final return is the
// exit path, yielding the result of makeFuture().
// NOTE(review): the predicate test guarding the first thunk() call and the
// lambda's parameter list are not part of this listing.
1164 template <class P, class F>
1165 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1167 auto future = thunk();
1168 return future.then([
1169 predicate = std::forward<P>(predicate),
1170 thunk = std::forward<F>(thunk)
1172 return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1175 return makeFuture();
// Builds on whileDo with a counting predicate: fetch_add returns the previous
// count, so the predicate is true for its first n calls. The counter lives
// on the heap inside a unique_ptr owned by the predicate lambda.
1179 Future<Unit> times(const int n, F&& thunk) {
1180 return folly::whileDo(
1181 [ n, count = folly::make_unique<std::atomic<int>>(0) ]() mutable {
1182 return count->fetch_add(1) < n;
1184 std::forward<F>(thunk));
// Chains func onto every future in [first, last) and gathers the resulting
// futures in iteration order. func is passed to each then() as an lvalue.
// NOTE(review): post-increment on the iterator makes a copy per step;
// pre-increment would avoid it.
1188 template <class It, class F, class ItT, class Result>
1189 std::vector<Future<Result>> map(It first, It last, F func) {
1190 std::vector<Future<Result>> results;
1191 for (auto it = first; it != last; it++) {
1192 results.push_back(it->then(func));
// Tag types used to dispatch on the kind of retry policy.
1202 struct retrying_policy_raw_tag {};
1203 struct retrying_policy_fut_tag {};
// Classifies Policy by the return type of its operator()(size_t, const
// exception_wrapper&), const-qualified or not. bool maps to the raw tag,
// Future<bool> to the fut tag, and anything else to void. The raw check is
// made first.
1205 template <class Policy>
1206 struct retrying_policy_traits {
1207 using ew = exception_wrapper;
1208 FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1209 template <class Ret>
1210 using has_op = typename std::integral_constant<bool,
1211 has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1212 has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1213 using is_raw = has_op<bool>;
1214 using is_fut = has_op<Future<bool>>;
1215 using tag = typename std::conditional<
1216 is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1217 is_fut::value, retrying_policy_fut_tag, void>::type>::type;
// Core retry step for attempt counter k. The outer lambda receives the
// failure as an exception_wrapper and owns the policy and the factory. The
// inner lambda receives the policy's verdict r: true retries by recursing,
// false re-emits the original exception as a failed Future.
// NOTE(review): the call to ff, the onError / policy invocation and the inner
// lambda's parameter list are not part of this listing.
1220 template <class Policy, class FF>
1221 typename std::result_of<FF(size_t)>::type
1222 retrying(size_t k, Policy&& p, FF&& ff) {
1223 using F = typename std::result_of<FF(size_t)>::type;
1224 using T = typename F::value_type;
1227 [ k, pm = std::forward<Policy>(p), ffm = std::forward<FF>(ff) ](
1228 exception_wrapper x) mutable {
1231 [ k, xm = std::move(x), pm = std::move(pm), ffm = std::move(ffm) ](
1233 return r ? retrying(k, std::move(pm), std::move(ffm))
1234 : makeFuture<T>(std::move(xm));
// Raw-policy entry point: adapts a bool-returning policy into one returning
// Future<bool> and starts the retry loop at attempt 0.
1239 template <class Policy, class FF>
1240 typename std::result_of<FF(size_t)>::type
1241 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1242 auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
1243 return makeFuture<bool>(pm(k, x));
1245 return retrying(0, std::move(q), std::forward<FF>(ff));
// Future-policy entry point: the policy is used as-is, starting at attempt 0.
1248 template <class Policy, class FF>
1249 typename std::result_of<FF(size_t)>::type
1250 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1251 return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1254 // jittered exponential backoff, clamped to [backoff_min, backoff_max]
1255 template <class URNG>
1256 Duration retryingJitteredExponentialBackoffDur(
1258 Duration backoff_min,
1259 Duration backoff_max,
1260 double jitter_param,
1263 auto dist = std::normal_distribution<double>(0.0, jitter_param);
1264 auto jitter = std::exp(dist(rng));
1265 auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1266 return std::max(backoff_min, std::min(backoff_max, backoff));
// Builds the capped, jittered-backoff policy: gives up once n equals
// max_tries, otherwise consults the wrapped policy pm and, if the retry goes
// ahead, sleeps for the computed backoff before answering true.
// NOTE(review): the max_tries parameter, the trailing parameters, part of the
// capture list, the inner lambda's parameter list and its test are not part
// of this listing.
1269 template <class Policy, class URNG>
1270 std::function<Future<bool>(size_t, const exception_wrapper&)>
1271 retryingPolicyCappedJitteredExponentialBackoff(
1273 Duration backoff_min,
1274 Duration backoff_max,
1275 double jitter_param,
1279 pm = std::forward<Policy>(p),
1284 rngp = std::forward<URNG>(rng)
1285 ](size_t n, const exception_wrapper& ex) mutable {
1286 if (n == max_tries) {
1287 return makeFuture(false);
1289 return pm(n, ex).then(
// NOTE(review): rngp is moved out of the outer closure here, so a second
// invocation of the policy would see a moved-from generator. Confirm.
1290 [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
1293 return makeFuture(false);
1295 auto backoff = detail::retryingJitteredExponentialBackoffDur(
1296 n, backoff_min, backoff_max, jitter_param, rngp);
1297 return futures::sleep(backoff).then([] { return true; });
// Raw-policy overload: wraps the bool-returning policy so it yields a Future
// and forwards to the implementation overload.
// NOTE(review): several parameter and argument lines are not part of this
// listing.
1302 template <class Policy, class URNG>
1303 std::function<Future<bool>(size_t, const exception_wrapper&)>
1304 retryingPolicyCappedJitteredExponentialBackoff(
1306 Duration backoff_min,
1307 Duration backoff_max,
1308 double jitter_param,
1311 retrying_policy_raw_tag) {
1312 auto q = [pm = std::forward<Policy>(p)](
1313 size_t n, const exception_wrapper& e) {
1314 return makeFuture(pm(n, e));
1316 return retryingPolicyCappedJitteredExponentialBackoff(
1321 std::forward<URNG>(rng),
// Future-policy overload: forwards rng and the policy unchanged to the
// implementation overload.
// NOTE(review): several parameter and argument lines are not part of this
// listing.
1325 template <class Policy, class URNG>
1326 std::function<Future<bool>(size_t, const exception_wrapper&)>
1327 retryingPolicyCappedJitteredExponentialBackoff(
1329 Duration backoff_min,
1330 Duration backoff_max,
1331 double jitter_param,
1334 retrying_policy_fut_tag) {
1335 return retryingPolicyCappedJitteredExponentialBackoff(
1340 std::forward<URNG>(rng),
1341 std::forward<Policy>(p));
// Public entry point: picks the raw or Future-returning implementation by
// constructing the tag type computed by retrying_policy_traits.
1345 template <class Policy, class FF>
1346 typename std::result_of<FF(size_t)>::type
1347 retrying(Policy&& p, FF&& ff) {
1348 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1349 return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
// Policy that allows a retry while the attempt number n is below max_tries;
// the exception itself is ignored. max_tries is captured by copy.
// NOTE(review): the parameter line is not part of this listing.
1353 std::function<bool(size_t, const exception_wrapper&)>
1354 retryingPolicyBasic(
1356 return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
// Public overload that takes a custom policy: computes the policy's tag and
// dispatches to the matching detail:: overload.
// NOTE(review): several parameter and argument lines are not part of this
// listing.
1359 template <class Policy, class URNG>
1360 std::function<Future<bool>(size_t, const exception_wrapper&)>
1361 retryingPolicyCappedJitteredExponentialBackoff(
1363 Duration backoff_min,
1364 Duration backoff_max,
1365 double jitter_param,
1368 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1369 return detail::retryingPolicyCappedJitteredExponentialBackoff(
1374 std::forward<URNG>(rng),
1375 std::forward<Policy>(p),
// Public overload without a custom policy: uses an always-true policy, so
// only the cap and the backoff limit retries.
// NOTE(review): the max_tries parameter and the forwarded argument list are
// not part of this listing.
1380 std::function<Future<bool>(size_t, const exception_wrapper&)>
1381 retryingPolicyCappedJitteredExponentialBackoff(
1383 Duration backoff_min,
1384 Duration backoff_max,
1385 double jitter_param) {
1386 auto p = [](size_t, const exception_wrapper&) { return true; };
1387 return retryingPolicyCappedJitteredExponentialBackoff(
// Explicit instantiation declarations: translation units that include this
// header do not implicitly instantiate these specialisations, and instead
// rely on a matching explicit instantiation definition elsewhere.
1398 // Instantiate the most common Future types to save compile time
1399 extern template class Future<Unit>;
1400 extern template class Future<bool>;
1401 extern template class Future<int>;
1402 extern template class Future<int64_t>;
1403 extern template class Future<std::string>;
1404 extern template class Future<double>;
1406 } // namespace folly