2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/fibers/Baton.h>
43 #if FOLLY_FUTURE_USING_FIBER
44 typedef folly::fibers::Baton FutureBatonType;
46 typedef folly::Baton<> FutureBatonType;
51 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
55 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
56 other.core_ = nullptr;
60 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
61 std::swap(core_, other.core_);
66 template <typename U, typename>
67 Future<T>::Future(Future<U>&& other) noexcept
68 : core_(detail::Core<T>::convert(other.core_)) {
69 other.core_ = nullptr;
73 template <typename U, typename>
74 Future<T>& Future<T>::operator=(Future<U>&& other) noexcept {
75 std::swap(core_, detail::Core<T>::convert(other.core_));
80 template <class T2, typename>
81 Future<T>::Future(T2&& val)
82 : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
85 template <typename T2>
86 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
87 : core_(new detail::Core<T>(Try<T>(T()))) {}
90 Future<T>::~Future() {
95 void Future<T>::detach() {
97 core_->detachFuture();
103 void Future<T>::throwIfInvalid() const {
110 void Future<T>::setCallback_(F&& func) {
112 core_->setCallback(std::forward<F>(func));
119 typename std::enable_if<isFuture<F>::value,
120 Future<typename isFuture<T>::Inner>>::type
121 Future<T>::unwrap() {
122 return then([](Future<typename isFuture<T>::Inner> internal_future) {
123 return internal_future;
129 // Variant: returns a value
130 // e.g. f.then([](Try<T>&& t){ return t.value(); });
132 template <typename F, typename R, bool isTry, typename... Args>
133 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
134 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
135 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
136 typedef typename R::ReturnsFuture::Inner B;
141 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
143 // grab the Future now before we lose our handle on the Promise
144 auto f = p.getFuture();
145 f.core_->setExecutorNoLock(getExecutor());
147 /* This is a bit tricky.
149 We can't just close over *this in case this Future gets moved. So we
150 make a new dummy Future. We could figure out something more
151 sophisticated that avoids making a new Future object when it can, as an
152 optimization. But this is correct.
154 core_ can't be moved, it is explicitly disallowed (as is copying). But
155 if there's ever a reason to allow it, this is one place that makes that
156 assumption and would need to be fixed. We use a standard shared pointer
157 for core_ (by copying it in), which means in essence obj holds a shared
158 pointer to itself. But this shouldn't leak because Promise will not
159 outlive the continuation, because Promise will setException() with a
160 broken Promise if it is destructed before completed. We could use a
161 weak pointer but it would have to be converted to a shared pointer when
162 func is executed (because the Future returned by func may possibly
163 persist beyond the callback, if it gets moved), and so it is an
164 optimization to just make it shared from the get-go.
166 Two subtle but important points about this design. detail::Core has no
167 back pointers to Future or Promise, so if Future or Promise get moved
168 (and they will be moved in performant code) we don't have to do
169 anything fancy. And because we store the continuation in the
170 detail::Core, not in the Future, we can execute the continuation even
171 after the Future has gone out of scope. This is an intentional design
172 decision. It is likely we will want to be able to cancel a continuation
173 in some circumstances, but I think it should be explicit not implicit
174 in the destruction of the Future used to create it.
176 setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
177 Try<T> && t) mutable {
178 if (!isTry && t.hasException()) {
179 pm.setException(std::move(t.exception()));
181 pm.setWith([&]() { return funcm(t.template get<isTry, Args>()...); });
188 // Variant: returns a Future
189 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
191 template <typename F, typename R, bool isTry, typename... Args>
192 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
193 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
194 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
195 typedef typename R::ReturnsFuture::Inner B;
200 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
202 // grab the Future now before we lose our handle on the Promise
203 auto f = p.getFuture();
204 f.core_->setExecutorNoLock(getExecutor());
206 setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
207 Try<T> && t) mutable {
208 if (!isTry && t.hasException()) {
209 pm.setException(std::move(t.exception()));
212 auto f2 = funcm(t.template get<isTry, Args>()...);
213 // that didn't throw, now we can steal p
214 f2.setCallback_([p = std::move(pm)](Try<B> && b) mutable {
215 p.setTry(std::move(b));
217 } catch (const std::exception& e) {
218 pm.setException(exception_wrapper(std::current_exception(), e));
220 pm.setException(exception_wrapper(std::current_exception()));
228 template <typename T>
229 template <typename R, typename Caller, typename... Args>
230 Future<typename isFuture<R>::Inner>
231 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
232 typedef typename std::remove_cv<
233 typename std::remove_reference<
234 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
235 return then([instance, func](Try<T>&& t){
236 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
241 template <class Executor, class Arg, class... Args>
242 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
243 -> decltype(this->then(std::forward<Arg>(arg),
244 std::forward<Args>(args)...))
246 auto oldX = getExecutor();
248 return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
253 Future<Unit> Future<T>::then() {
254 return then([] () {});
257 // onError where the callback returns T
260 typename std::enable_if<
261 !detail::callableWith<F, exception_wrapper>::value &&
262 !detail::Extract<F>::ReturnsFuture::value,
264 Future<T>::onError(F&& func) {
265 typedef typename detail::Extract<F>::FirstArg Exn;
267 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
268 "Return type of onError callback must be T or Future<T>");
271 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
272 auto f = p.getFuture();
274 setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
275 Try<T> && t) mutable {
276 if (!t.template withException<Exn>(
277 [&](Exn& e) { pm.setWith([&] { return funcm(e); }); })) {
278 pm.setTry(std::move(t));
285 // onError where the callback returns Future<T>
288 typename std::enable_if<
289 !detail::callableWith<F, exception_wrapper>::value &&
290 detail::Extract<F>::ReturnsFuture::value,
292 Future<T>::onError(F&& func) {
294 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
295 "Return type of onError callback must be T or Future<T>");
296 typedef typename detail::Extract<F>::FirstArg Exn;
299 auto f = p.getFuture();
301 setCallback_([ pm = std::move(p), funcm = std::forward<F>(func) ](
302 Try<T> && t) mutable {
303 if (!t.template withException<Exn>([&](Exn& e) {
306 f2.setCallback_([pm = std::move(pm)](Try<T> && t2) mutable {
307 pm.setTry(std::move(t2));
309 } catch (const std::exception& e2) {
310 pm.setException(exception_wrapper(std::current_exception(), e2));
312 pm.setException(exception_wrapper(std::current_exception()));
315 pm.setTry(std::move(t));
324 Future<T> Future<T>::ensure(F&& func) {
325 return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
327 return makeFuture(std::move(t));
333 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
334 return within(dur, tk).onError([funcw = std::forward<F>(func)](
335 TimedOut const&) { return funcw(); });
340 typename std::enable_if<detail::callableWith<F, exception_wrapper>::value &&
341 detail::Extract<F>::ReturnsFuture::value,
343 Future<T>::onError(F&& func) {
345 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
346 "Return type of onError callback must be T or Future<T>");
349 auto f = p.getFuture();
351 [ pm = std::move(p), funcm = std::forward<F>(func) ](Try<T> t) mutable {
352 if (t.hasException()) {
354 auto f2 = funcm(std::move(t.exception()));
355 f2.setCallback_([pm = std::move(pm)](Try<T> t2) mutable {
356 pm.setTry(std::move(t2));
358 } catch (const std::exception& e2) {
359 pm.setException(exception_wrapper(std::current_exception(), e2));
361 pm.setException(exception_wrapper(std::current_exception()));
364 pm.setTry(std::move(t));
371 // onError(exception_wrapper) that returns T
374 typename std::enable_if<
375 detail::callableWith<F, exception_wrapper>::value &&
376 !detail::Extract<F>::ReturnsFuture::value,
378 Future<T>::onError(F&& func) {
380 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
381 "Return type of onError callback must be T or Future<T>");
384 auto f = p.getFuture();
386 [ pm = std::move(p), funcm = std::forward<F>(func) ](Try<T> t) mutable {
387 if (t.hasException()) {
388 pm.setWith([&] { return funcm(std::move(t.exception())); });
390 pm.setTry(std::move(t));
398 typename std::add_lvalue_reference<T>::type Future<T>::value() {
401 return core_->getTry().value();
405 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
408 return core_->getTry().value();
412 Try<T>& Future<T>::getTry() {
415 return core_->getTry();
419 Optional<Try<T>> Future<T>::poll() {
421 if (core_->ready()) {
422 o = std::move(core_->getTry());
428 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
431 setExecutor(executor, priority);
433 return std::move(*this);
437 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
441 auto f = p.getFuture();
442 then([p = std::move(p)](Try<T> && t) mutable { p.setTry(std::move(t)); });
443 return std::move(f).via(executor, priority);
446 template <class Func>
447 auto via(Executor* x, Func&& func)
448 -> Future<typename isFuture<decltype(func())>::Inner>
450 // TODO make this actually more performant. :-P #7260175
451 return via(x).then(std::forward<Func>(func));
455 bool Future<T>::isReady() const {
457 return core_->ready();
461 bool Future<T>::hasValue() {
462 return getTry().hasValue();
466 bool Future<T>::hasException() {
467 return getTry().hasException();
471 void Future<T>::raise(exception_wrapper exception) {
472 core_->raise(std::move(exception));
478 Future<typename std::decay<T>::type> makeFuture(T&& t) {
479 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
482 inline // for multiple translation units
483 Future<Unit> makeFuture() {
484 return makeFuture(Unit{});
487 // makeFutureWith(Future<T>()) -> Future<T>
489 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
490 typename std::result_of<F()>::type>::type
491 makeFutureWith(F&& func) {
493 typename isFuture<typename std::result_of<F()>::type>::Inner;
496 } catch (std::exception& e) {
497 return makeFuture<InnerType>(
498 exception_wrapper(std::current_exception(), e));
500 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
504 // makeFutureWith(T()) -> Future<T>
505 // makeFutureWith(void()) -> Future<Unit>
507 typename std::enable_if<
508 !(isFuture<typename std::result_of<F()>::type>::value),
509 Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
510 makeFutureWith(F&& func) {
512 typename Unit::Lift<typename std::result_of<F()>::type>::type;
513 return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
519 Future<T> makeFuture(std::exception_ptr const& e) {
520 return makeFuture(Try<T>(e));
524 Future<T> makeFuture(exception_wrapper ew) {
525 return makeFuture(Try<T>(std::move(ew)));
528 template <class T, class E>
529 typename std::enable_if<std::is_base_of<std::exception, E>::value,
531 makeFuture(E const& e) {
532 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
536 Future<T> makeFuture(Try<T>&& t) {
537 return Future<T>(new detail::Core<T>(std::move(t)));
541 Future<Unit> via(Executor* executor, int8_t priority) {
542 return makeFuture().via(executor, priority);
545 // mapSetCallback calls func(i, Try<T>) when every future completes
547 template <class T, class InputIterator, class F>
548 void mapSetCallback(InputIterator first, InputIterator last, F func) {
549 for (size_t i = 0; first != last; ++first, ++i) {
550 first->setCallback_([func, i](Try<T>&& t) {
551 func(i, std::move(t));
556 // collectAll (variadic)
558 template <typename... Fs>
559 typename detail::CollectAllVariadicContext<
560 typename std::decay<Fs>::type::value_type...>::type
561 collectAll(Fs&&... fs) {
562 auto ctx = std::make_shared<detail::CollectAllVariadicContext<
563 typename std::decay<Fs>::type::value_type...>>();
564 detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
565 ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
566 return ctx->p.getFuture();
569 // collectAll (iterator)
571 template <class InputIterator>
574 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
575 collectAll(InputIterator first, InputIterator last) {
577 typename std::iterator_traits<InputIterator>::value_type::value_type T;
579 struct CollectAllContext {
580 CollectAllContext(int n) : results(n) {}
581 ~CollectAllContext() {
582 p.setValue(std::move(results));
584 Promise<std::vector<Try<T>>> p;
585 std::vector<Try<T>> results;
588 auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
589 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
590 ctx->results[i] = std::move(t);
592 return ctx->p.getFuture();
595 // collect (iterator)
599 template <typename T>
600 struct CollectContext {
602 explicit Nothing(int /* n */) {}
605 using Result = typename std::conditional<
606 std::is_void<T>::value,
608 std::vector<T>>::type;
610 using InternalResult = typename std::conditional<
611 std::is_void<T>::value,
613 std::vector<Optional<T>>>::type;
615 explicit CollectContext(int n) : result(n) {}
617 if (!threw.exchange(true)) {
618 // map Optional<T> -> T
619 std::vector<T> finalResult;
620 finalResult.reserve(result.size());
621 std::transform(result.begin(), result.end(),
622 std::back_inserter(finalResult),
623 [](Optional<T>& o) { return std::move(o.value()); });
624 p.setValue(std::move(finalResult));
627 inline void setPartialResult(size_t i, Try<T>& t) {
628 result[i] = std::move(t.value());
631 InternalResult result;
632 std::atomic<bool> threw {false};
637 template <class InputIterator>
638 Future<typename detail::CollectContext<
639 typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
640 collect(InputIterator first, InputIterator last) {
642 typename std::iterator_traits<InputIterator>::value_type::value_type T;
644 auto ctx = std::make_shared<detail::CollectContext<T>>(
645 std::distance(first, last));
646 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
647 if (t.hasException()) {
648 if (!ctx->threw.exchange(true)) {
649 ctx->p.setException(std::move(t.exception()));
651 } else if (!ctx->threw) {
652 ctx->setPartialResult(i, t);
655 return ctx->p.getFuture();
658 // collect (variadic)
660 template <typename... Fs>
661 typename detail::CollectVariadicContext<
662 typename std::decay<Fs>::type::value_type...>::type
663 collect(Fs&&... fs) {
664 auto ctx = std::make_shared<detail::CollectVariadicContext<
665 typename std::decay<Fs>::type::value_type...>>();
666 detail::collectVariadicHelper<detail::CollectVariadicContext>(
667 ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
668 return ctx->p.getFuture();
671 // collectAny (iterator)
673 template <class InputIterator>
678 std::iterator_traits<InputIterator>::value_type::value_type>>>
679 collectAny(InputIterator first, InputIterator last) {
681 typename std::iterator_traits<InputIterator>::value_type::value_type T;
683 struct CollectAnyContext {
684 CollectAnyContext() {};
685 Promise<std::pair<size_t, Try<T>>> p;
686 std::atomic<bool> done {false};
689 auto ctx = std::make_shared<CollectAnyContext>();
690 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
691 if (!ctx->done.exchange(true)) {
692 ctx->p.setValue(std::make_pair(i, std::move(t)));
695 return ctx->p.getFuture();
698 // collectAnyWithoutException (iterator)
700 template <class InputIterator>
703 typename std::iterator_traits<InputIterator>::value_type::value_type>>
704 collectAnyWithoutException(InputIterator first, InputIterator last) {
706 typename std::iterator_traits<InputIterator>::value_type::value_type T;
708 struct CollectAnyWithoutExceptionContext {
709 CollectAnyWithoutExceptionContext(){};
710 Promise<std::pair<size_t, T>> p;
711 std::atomic<bool> done{false};
712 std::atomic<size_t> nFulfilled{0};
716 auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
717 ctx->nTotal = std::distance(first, last);
719 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
720 if (!t.hasException() && !ctx->done.exchange(true)) {
721 ctx->p.setValue(std::make_pair(i, std::move(t.value())));
722 } else if (++ctx->nFulfilled == ctx->nTotal) {
723 ctx->p.setException(t.exception());
726 return ctx->p.getFuture();
729 // collectN (iterator)
731 template <class InputIterator>
732 Future<std::vector<std::pair<size_t, Try<typename
733 std::iterator_traits<InputIterator>::value_type::value_type>>>>
734 collectN(InputIterator first, InputIterator last, size_t n) {
736 std::iterator_traits<InputIterator>::value_type::value_type T;
737 typedef std::vector<std::pair<size_t, Try<T>>> V;
739 struct CollectNContext {
741 std::atomic<size_t> completed = {0};
744 auto ctx = std::make_shared<CollectNContext>();
746 if (size_t(std::distance(first, last)) < n) {
747 ctx->p.setException(std::runtime_error("Not enough futures"));
749 // for each completed Future, increase count and add to vector, until we
750 // have n completed futures at which point we fulfil our Promise with the
752 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
753 auto c = ++ctx->completed;
755 assert(ctx->v.size() < n);
756 ctx->v.emplace_back(i, std::move(t));
758 ctx->p.setTry(Try<V>(std::move(ctx->v)));
764 return ctx->p.getFuture();
769 template <class It, class T, class F>
770 Future<T> reduce(It first, It last, T&& initial, F&& func) {
772 return makeFuture(std::move(initial));
775 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
777 typename std::conditional<detail::callableWith<F, T&&, Try<ItT>&&>::value,
780 typedef isTry<Arg> IsTry;
782 auto sfunc = std::make_shared<F>(std::move(func));
784 auto f = first->then(
785 [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
787 std::move(minitial), head.template get<IsTry::value, Arg&&>());
790 for (++first; first != last; ++first) {
791 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
792 return (*sfunc)(std::move(std::get<0>(t).value()),
793 // Either return a ItT&& or a Try<ItT>&& depending
794 // on the type of the argument of func.
795 std::get<1>(t).template get<IsTry::value, Arg&&>());
802 // window (collection)
804 template <class Collection, class F, class ItT, class Result>
805 std::vector<Future<Result>>
806 window(Collection input, F func, size_t n) {
807 struct WindowContext {
808 WindowContext(Collection&& i, F&& fn)
809 : input_(std::move(i)), promises_(input_.size()),
812 std::atomic<size_t> i_ {0};
814 std::vector<Promise<Result>> promises_;
817 static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
818 size_t i = ctx->i_++;
819 if (i < ctx->input_.size()) {
820 // Using setCallback_ directly since we don't need the Future
821 ctx->func_(std::move(ctx->input_[i])).setCallback_(
822 // ctx is captured by value
823 [ctx, i](Try<Result>&& t) {
824 ctx->promises_[i].setTry(std::move(t));
825 // Chain another future onto this one
826 spawn(std::move(ctx));
832 auto max = std::min(n, input.size());
834 auto ctx = std::make_shared<WindowContext>(
835 std::move(input), std::move(func));
837 for (size_t i = 0; i < max; ++i) {
838 // Start the first n Futures
839 WindowContext::spawn(ctx);
842 std::vector<Future<Result>> futures;
843 futures.reserve(ctx->promises_.size());
844 for (auto& promise : ctx->promises_) {
845 futures.emplace_back(promise.getFuture());
854 template <class I, class F>
855 Future<I> Future<T>::reduce(I&& initial, F&& func) {
857 minitial = std::forward<I>(initial),
858 mfunc = std::forward<F>(func)
860 auto ret = std::move(minitial);
861 for (auto& val : vals) {
862 ret = mfunc(std::move(ret), std::move(val));
868 // unorderedReduce (iterator)
870 template <class It, class T, class F, class ItT, class Arg>
871 Future<T> unorderedReduce(It first, It last, T initial, F func) {
873 return makeFuture(std::move(initial));
876 typedef isTry<Arg> IsTry;
878 struct UnorderedReduceContext {
879 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
880 : lock_(), memo_(makeFuture<T>(std::move(memo))),
881 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
883 folly::MicroSpinLock lock_; // protects memo_ and numThens_
886 size_t numThens_; // how many Futures completed and called .then()
887 size_t numFutures_; // how many Futures in total
891 auto ctx = std::make_shared<UnorderedReduceContext>(
892 std::move(initial), std::move(func), std::distance(first, last));
897 [ctx](size_t /* i */, Try<ItT>&& t) {
898 // Futures can be completed in any order, simultaneously.
899 // To make this non-blocking, we create a new Future chain in
900 // the order of completion to reduce the values.
901 // The spinlock just protects chaining a new Future, not actually
902 // executing the reduce, which should be really fast.
903 folly::MSLGuard lock(ctx->lock_);
905 ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
906 // Either return a ItT&& or a Try<ItT>&& depending
907 // on the type of the argument of func.
908 return ctx->func_(std::move(v),
909 mt.template get<IsTry::value, Arg&&>());
911 if (++ctx->numThens_ == ctx->numFutures_) {
912 // After reducing the value of the last Future, fulfill the Promise
913 ctx->memo_.setCallback_(
914 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
918 return ctx->promise_.getFuture();
924 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
925 return within(dur, TimedOut(), tk);
930 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
933 Context(E ex) : exception(std::move(ex)), promise() {}
935 Future<Unit> thisFuture;
937 std::atomic<bool> token {false};
940 std::shared_ptr<Timekeeper> tks;
942 tks = folly::detail::getTimekeeperSingleton();
943 tk = DCHECK_NOTNULL(tks.get());
946 auto ctx = std::make_shared<Context>(std::move(e));
948 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
949 // TODO: "this" completed first, cancel "after"
950 if (ctx->token.exchange(true) == false) {
951 ctx->promise.setTry(std::move(t));
955 tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
956 // "after" completed first, cancel "this"
957 ctx->thisFuture.raise(TimedOut());
958 if (ctx->token.exchange(true) == false) {
959 if (t.hasException()) {
960 ctx->promise.setException(std::move(t.exception()));
962 ctx->promise.setException(std::move(ctx->exception));
967 return ctx->promise.getFuture().via(getExecutor());
973 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
974 return collectAll(*this, futures::sleep(dur, tk))
975 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
976 Try<T>& t = std::get<0>(tup);
977 return makeFuture<T>(std::move(t));
984 void waitImpl(Future<T>& f) {
985 // short-circuit if there's nothing to do
986 if (f.isReady()) return;
988 FutureBatonType baton;
989 f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
995 void waitImpl(Future<T>& f, Duration dur) {
996 // short-circuit if there's nothing to do
1002 auto ret = promise.getFuture();
1003 auto baton = std::make_shared<FutureBatonType>();
1004 f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1005 promise.setTry(std::move(t));
1009 if (baton->timed_wait(dur)) {
1010 assert(f.isReady());
1015 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1016 // Set callback so to ensure that the via executor has something on it
1017 // so that once the preceding future triggers this callback, drive will
1018 // always have a callback to satisfy it
1021 f = f.via(e).then([](T&& t) { return std::move(t); });
1022 while (!f.isReady()) {
1025 assert(f.isReady());
1031 Future<T>& Future<T>::wait() & {
1032 detail::waitImpl(*this);
1037 Future<T>&& Future<T>::wait() && {
1038 detail::waitImpl(*this);
1039 return std::move(*this);
1043 Future<T>& Future<T>::wait(Duration dur) & {
1044 detail::waitImpl(*this, dur);
1049 Future<T>&& Future<T>::wait(Duration dur) && {
1050 detail::waitImpl(*this, dur);
1051 return std::move(*this);
1055 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1056 detail::waitViaImpl(*this, e);
1061 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1062 detail::waitViaImpl(*this, e);
1063 return std::move(*this);
1067 T Future<T>::get() {
1068 return std::move(wait().value());
1072 T Future<T>::get(Duration dur) {
1075 return std::move(value());
1082 T Future<T>::getVia(DrivableExecutor* e) {
1083 return std::move(waitVia(e).value());
1089 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1090 return t1.value() == t2.value();
1096 Future<bool> Future<T>::willEqual(Future<T>& f) {
1097 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1098 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1099 return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1108 Future<T> Future<T>::filter(F&& predicate) {
1109 return this->then([p = std::forward<F>(predicate)](T val) {
1110 T const& valConstRef = val;
1111 if (!p(valConstRef)) {
1112 throw PredicateDoesNotObtain();
1119 template <class Callback>
1120 auto Future<T>::thenMulti(Callback&& fn)
1121 -> decltype(this->then(std::forward<Callback>(fn))) {
1122 // thenMulti with one callback is just a then
1123 return then(std::forward<Callback>(fn));
1127 template <class Callback, class... Callbacks>
1128 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1129 -> decltype(this->then(std::forward<Callback>(fn)).
1130 thenMulti(std::forward<Callbacks>(fns)...)) {
1131 // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1132 return then(std::forward<Callback>(fn)).
1133 thenMulti(std::forward<Callbacks>(fns)...);
1137 template <class Callback, class... Callbacks>
1138 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1140 -> decltype(this->then(std::forward<Callback>(fn)).
1141 thenMulti(std::forward<Callbacks>(fns)...)) {
1142 // thenMultiExecutor with two callbacks is
1143 // via(x).then(a).thenMulti(b, ...).via(oldX)
1144 auto oldX = getExecutor();
1146 return then(std::forward<Callback>(fn)).
1147 thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1151 template <class Callback>
1152 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1153 -> decltype(this->then(std::forward<Callback>(fn))) {
1154 // thenMulti with one callback is just a then with an executor
1155 return then(x, std::forward<Callback>(fn));
1159 inline Future<Unit> when(bool p, F&& thunk) {
1160 return p ? std::forward<F>(thunk)().unit() : makeFuture();
1163 template <class P, class F>
1164 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1166 auto future = thunk();
1167 return future.then([
1168 predicate = std::forward<P>(predicate),
1169 thunk = std::forward<F>(thunk)
1171 return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1174 return makeFuture();
1178 Future<Unit> times(const int n, F&& thunk) {
1179 return folly::whileDo(
1180 [ n, count = folly::make_unique<std::atomic<int>>(0) ]() mutable {
1181 return count->fetch_add(1) < n;
1183 std::forward<F>(thunk));
1187 template <class It, class F, class ItT, class Result>
1188 std::vector<Future<Result>> map(It first, It last, F func) {
1189 std::vector<Future<Result>> results;
1190 for (auto it = first; it != last; it++) {
1191 results.push_back(it->then(func));
1201 struct retrying_policy_raw_tag {};
1202 struct retrying_policy_fut_tag {};
1204 template <class Policy>
1205 struct retrying_policy_traits {
1206 using ew = exception_wrapper;
1207 FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1208 template <class Ret>
1209 using has_op = typename std::integral_constant<bool,
1210 has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1211 has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1212 using is_raw = has_op<bool>;
1213 using is_fut = has_op<Future<bool>>;
1214 using tag = typename std::conditional<
1215 is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1216 is_fut::value, retrying_policy_fut_tag, void>::type>::type;
1219 template <class Policy, class FF>
1220 typename std::result_of<FF(size_t)>::type
1221 retrying(size_t k, Policy&& p, FF&& ff) {
1222 using F = typename std::result_of<FF(size_t)>::type;
1223 using T = typename F::value_type;
1226 [ k, pm = std::forward<Policy>(p), ffm = std::forward<FF>(ff) ](
1227 exception_wrapper x) mutable {
1230 [ k, xm = std::move(x), pm = std::move(pm), ffm = std::move(ffm) ](
1232 return r ? retrying(k, std::move(pm), std::move(ffm))
1233 : makeFuture<T>(std::move(xm));
1238 template <class Policy, class FF>
1239 typename std::result_of<FF(size_t)>::type
1240 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1241 auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
1242 return makeFuture<bool>(pm(k, x));
1244 return retrying(0, std::move(q), std::forward<FF>(ff));
1247 template <class Policy, class FF>
1248 typename std::result_of<FF(size_t)>::type
1249 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1250 return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1253 // jittered exponential backoff, clamped to [backoff_min, backoff_max]
1254 template <class URNG>
1255 Duration retryingJitteredExponentialBackoffDur(
1257 Duration backoff_min,
1258 Duration backoff_max,
1259 double jitter_param,
1262 auto dist = std::normal_distribution<double>(0.0, jitter_param);
1263 auto jitter = std::exp(dist(rng));
1264 auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1265 return std::max(backoff_min, std::min(backoff_max, backoff));
1268 template <class Policy, class URNG>
1269 std::function<Future<bool>(size_t, const exception_wrapper&)>
1270 retryingPolicyCappedJitteredExponentialBackoff(
1272 Duration backoff_min,
1273 Duration backoff_max,
1274 double jitter_param,
1278 pm = std::forward<Policy>(p),
1283 rngp = std::forward<URNG>(rng)
1284 ](size_t n, const exception_wrapper& ex) mutable {
1285 if (n == max_tries) {
1286 return makeFuture(false);
1288 return pm(n, ex).then(
1289 [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
1292 return makeFuture(false);
1294 auto backoff = detail::retryingJitteredExponentialBackoffDur(
1295 n, backoff_min, backoff_max, jitter_param, rngp);
1296 return futures::sleep(backoff).then([] { return true; });
1301 template <class Policy, class URNG>
1302 std::function<Future<bool>(size_t, const exception_wrapper&)>
1303 retryingPolicyCappedJitteredExponentialBackoff(
1305 Duration backoff_min,
1306 Duration backoff_max,
1307 double jitter_param,
1310 retrying_policy_raw_tag) {
1311 auto q = [pm = std::forward<Policy>(p)](
1312 size_t n, const exception_wrapper& e) {
1313 return makeFuture(pm(n, e));
1315 return retryingPolicyCappedJitteredExponentialBackoff(
1320 std::forward<URNG>(rng),
1324 template <class Policy, class URNG>
1325 std::function<Future<bool>(size_t, const exception_wrapper&)>
1326 retryingPolicyCappedJitteredExponentialBackoff(
1328 Duration backoff_min,
1329 Duration backoff_max,
1330 double jitter_param,
1333 retrying_policy_fut_tag) {
1334 return retryingPolicyCappedJitteredExponentialBackoff(
1339 std::forward<URNG>(rng),
1340 std::forward<Policy>(p));
1344 template <class Policy, class FF>
1345 typename std::result_of<FF(size_t)>::type
1346 retrying(Policy&& p, FF&& ff) {
1347 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1348 return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1352 std::function<bool(size_t, const exception_wrapper&)>
1353 retryingPolicyBasic(
1355 return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1358 template <class Policy, class URNG>
1359 std::function<Future<bool>(size_t, const exception_wrapper&)>
1360 retryingPolicyCappedJitteredExponentialBackoff(
1362 Duration backoff_min,
1363 Duration backoff_max,
1364 double jitter_param,
1367 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1368 return detail::retryingPolicyCappedJitteredExponentialBackoff(
1373 std::forward<URNG>(rng),
1374 std::forward<Policy>(p),
1379 std::function<Future<bool>(size_t, const exception_wrapper&)>
1380 retryingPolicyCappedJitteredExponentialBackoff(
1382 Duration backoff_min,
1383 Duration backoff_max,
1384 double jitter_param) {
1385 auto p = [](size_t, const exception_wrapper&) { return true; };
1386 return retryingPolicyCappedJitteredExponentialBackoff(
1397 // Instantiate the most common Future types to save compile time
1398 extern template class Future<Unit>;
1399 extern template class Future<bool>;
1400 extern template class Future<int>;
1401 extern template class Future<int64_t>;
1402 extern template class Future<std::string>;
1403 extern template class Future<double>;
1405 } // namespace folly