2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include <folly/experimental/fibers/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
// Forward declaration only; the definition is not part of this listing.
// Future<T>::within() below assigns its `tk` from
// folly::detail::getTimekeeperSingleton().
36 Timekeeper* getTimekeeperSingleton();
// Move constructor: takes over other's core_ pointer and then nulls
// other.core_, so `other` no longer refers to the core.
40 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
41 other.core_ = nullptr;
// Move assignment: exchanges the core_ pointers of *this and `other`, so the
// core previously held by *this ends up owned by `other`.
45 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
46 std::swap(core_, other.core_);
// Constructs an already-completed Future from a value: allocates a
// detail::Core<T> holding Try<T>(std::forward<T2>(val)).
// The unnamed second template parameter is presumably a constraint supplied
// by the class declaration - not visible here; confirm.
51 template <class T2, typename>
52 Future<T>::Future(T2&& val)
53 : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
// Constructor template with two unnamed template parameters; its declarator
// line is not visible in this listing. The initializer allocates a
// detail::Core<T> holding a Try<T> of a value-initialized T().
56 template <typename, typename>
58 : core_(new detail::Core<T>(Try<T>(T()))) {}
// Destructor. The body is not visible in this listing.
// NOTE(review): presumably releases the core via detach() - confirm.
61 Future<T>::~Future() {
// Tells the core that this Future is letting go of it by calling
// core_->detachFuture().
// NOTE(review): any null check on core_ and any reset of core_ afterwards are
// not visible in this listing - confirm against the full file.
66 void Future<T>::detach() {
68 core_->detachFuture();
// Validity check used by other members; the body is not visible here.
// NOTE(review): by its name presumably throws when core_ is null - confirm.
74 void Future<T>::throwIfInvalid() const {
// Installs `func` as the core's callback by moving it into
// core_->setCallback().
// NOTE(review): `func` is declared F&&; if F is a deduced template parameter
// this is a forwarding reference and std::move would also move from lvalue
// arguments. std::forward<F> may be intended - confirm.
81 void Future<T>::setCallback_(F&& func) {
83 core_->setCallback(std::move(func));
// Enabled only when isFuture<F>::value holds; the result type is
// Future<typename isFuture<T>::Inner>. The function's name line is not visible
// in this listing.
// Chains a continuation that receives the inner
// Future<typename isFuture<T>::Inner> and returns it unchanged.
90 typename std::enable_if<isFuture<F>::value,
91 Future<typename isFuture<T>::Inner>>::type
93 return then([](Future<typename isFuture<T>::Inner> internal_future) {
94 return internal_future;
// Continuation variant selected when R::ReturnsFuture::value is false, i.e.
// the callback produces a plain value rather than a Future.
//  - The callback may take at most one argument (static_assert below).
//  - A Promise<B> is created, inherits this Future's interrupt handler, and
//    its Future `f` is given this Future's executor.
//  - The continuation captures the promise and the callback by MoveWrapper.
//    When the callback does not take a Try (!isTry) and the incoming Try holds
//    an exception, that exception is forwarded to the promise; otherwise the
//    callback is invoked with the unpacked argument.
// NOTE(review): the block comment opened below has no visible terminator in
// this listing; some structural lines are missing from the excerpt.
100 // Variant: returns a value
101 // e.g. f.then([](Try<T>&& t){ return t.value(); });
103 template <typename F, typename R, bool isTry, typename... Args>
104 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
105 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
106 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
107 typedef typename R::ReturnsFuture::Inner B;
111 // wrap these so we can move them into the lambda
112 folly::MoveWrapper<Promise<B>> p;
113 p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
114 folly::MoveWrapper<F> funcm(std::forward<F>(func));
116 // grab the Future now before we lose our handle on the Promise
117 auto f = p->getFuture();
118 f.core_->setExecutorNoLock(getExecutor());
120 /* This is a bit tricky.
122 We can't just close over *this in case this Future gets moved. So we
123 make a new dummy Future. We could figure out something more
124 sophisticated that avoids making a new Future object when it can, as an
125 optimization. But this is correct.
127 core_ can't be moved, it is explicitly disallowed (as is copying). But
128 if there's ever a reason to allow it, this is one place that makes that
129 assumption and would need to be fixed. We use a standard shared pointer
130 for core_ (by copying it in), which means in essence obj holds a shared
131 pointer to itself. But this shouldn't leak because Promise will not
132 outlive the continuation, because Promise will setException() with a
133 broken Promise if it is destructed before completed. We could use a
134 weak pointer but it would have to be converted to a shared pointer when
135 func is executed (because the Future returned by func may possibly
136 persist beyond the callback, if it gets moved), and so it is an
137 optimization to just make it shared from the get-go.
139 We have to move in the Promise and func using the MoveWrapper
140 hack. (func could be copied but it's a big drag on perf).
142 Two subtle but important points about this design. detail::Core has no
143 back pointers to Future or Promise, so if Future or Promise get moved
144 (and they will be moved in performant code) we don't have to do
145 anything fancy. And because we store the continuation in the
146 detail::Core, not in the Future, we can execute the continuation even
147 after the Future has gone out of scope. This is an intentional design
148 decision. It is likely we will want to be able to cancel a continuation
149 in some circumstances, but I think it should be explicit not implicit
150 in the destruction of the Future used to create it.
153 [p, funcm](Try<T>&& t) mutable {
154 if (!isTry && t.hasException()) {
155 p->setException(std::move(t.exception()));
158 return (*funcm)(t.template get<isTry, Args>()...);
// Continuation variant selected when R::ReturnsFuture::value is true, i.e.
// the callback itself returns a Future whose inner type is B.
//  - Setup mirrors the value-returning variant: a Promise<B> inheriting this
//    Future's interrupt handler, and its Future `f` given this executor.
//  - In the continuation, an incoming exception is forwarded to the promise
//    when the callback does not take a Try (!isTry).
//  - Otherwise the callback is invoked; the Future it returns (f2) gets a
//    callback that forwards its Try<B> into the promise.
//  - Exceptions thrown are stored in the promise: std::exception is caught by
//    const reference; the last setException call presumably sits in a
//    catch-all handler whose line is not visible here.
166 // Variant: returns a Future
167 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
169 template <typename F, typename R, bool isTry, typename... Args>
170 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
171 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
172 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
173 typedef typename R::ReturnsFuture::Inner B;
177 // wrap these so we can move them into the lambda
178 folly::MoveWrapper<Promise<B>> p;
179 p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
180 folly::MoveWrapper<F> funcm(std::forward<F>(func));
182 // grab the Future now before we lose our handle on the Promise
183 auto f = p->getFuture();
184 f.core_->setExecutorNoLock(getExecutor());
187 [p, funcm](Try<T>&& t) mutable {
188 if (!isTry && t.hasException()) {
189 p->setException(std::move(t.exception()));
192 auto f2 = (*funcm)(t.template get<isTry, Args>()...);
193 // that didn't throw, now we can steal p
194 f2.setCallback_([p](Try<B>&& b) mutable {
195 p->setTry(std::move(b));
197 } catch (const std::exception& e) {
198 p->setException(exception_wrapper(std::current_exception(), e));
200 p->setException(exception_wrapper(std::current_exception()));
// then() overload for a pointer to member function plus the object to call
// it on.
// FirstArg is the first parameter type of `func` with reference and
// cv-qualifiers removed; isTry<FirstArg> selects whether the member function
// receives the Try<T> itself or the value unpacked from it.
// NOTE(review): `instance` is captured as a raw pointer, so the pointee has to
// outlive the continuation.
208 template <typename T>
209 template <typename R, typename Caller, typename... Args>
210 Future<typename isFuture<R>::Inner>
211 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
212 typedef typename std::remove_cv<
213 typename std::remove_reference<
214 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
215 return then([instance, func](Try<T>&& t){
216 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
// then() overload whose first argument is an Executor. The return type is
// whatever then(arg, args...) returns.
// The current executor is saved in oldX before delegating to
// then(arg, args...).
// NOTE(review): the statement that installs `x` and the call chained after
// the trailing '.' are not visible in this listing - confirm they switch to
// `x` and restore oldX respectively.
221 template <class Executor, class Arg, class... Args>
222 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
223 -> decltype(this->then(std::forward<Arg>(arg),
224 std::forward<Args>(args)...))
226 auto oldX = getExecutor();
228 return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
// Argument-less then(): chains an empty continuation and yields a
// Future<Unit>.
233 Future<Unit> Future<T>::then() {
234 return then([] () {});
// Selected when F is not callable with an exception_wrapper and does not
// return a Future. Exn is the callback's first argument type; the callback's
// raw return type must be exactly T (static_assert).
// A promise inheriting this Future's interrupt handler is created. The
// installed callback uses t.withException<Exn>(...) to run the handler for a
// matching exception; when withException reports no match, the original Try
// is passed through to the promise unchanged.
// NOTE(review): the handler body inside withException is not visible in this
// listing.
237 // onError where the callback returns T
240 typename std::enable_if<
241 !detail::callableWith<F, exception_wrapper>::value &&
242 !detail::Extract<F>::ReturnsFuture::value,
244 Future<T>::onError(F&& func) {
245 typedef typename detail::Extract<F>::FirstArg Exn;
247 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
248 "Return type of onError callback must be T or Future<T>");
251 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
252 auto f = p.getFuture();
253 auto pm = folly::makeMoveWrapper(std::move(p));
254 auto funcm = folly::makeMoveWrapper(std::move(func));
255 setCallback_([pm, funcm](Try<T>&& t) mutable {
256 if (!t.template withException<Exn>([&] (Exn& e) {
261 pm->setTry(std::move(t));
// Selected when F is not callable with an exception_wrapper and returns a
// Future; that Future type must be exactly Future<T> (static_assert).
// For an exception matching Exn, the handler is invoked and the Future it
// returns (f2) forwards its Try<T> into the promise. Exceptions thrown while
// doing so are stored in the promise. When withException reports no match,
// the original Try is passed through unchanged.
// NOTE(review): unlike the T-returning overload, no line copying this
// Future's interrupt handler to the promise is visible here - it may simply
// be missing from this listing; confirm.
268 // onError where the callback returns Future<T>
271 typename std::enable_if<
272 !detail::callableWith<F, exception_wrapper>::value &&
273 detail::Extract<F>::ReturnsFuture::value,
275 Future<T>::onError(F&& func) {
277 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
278 "Return type of onError callback must be T or Future<T>");
279 typedef typename detail::Extract<F>::FirstArg Exn;
282 auto f = p.getFuture();
283 auto pm = folly::makeMoveWrapper(std::move(p));
284 auto funcm = folly::makeMoveWrapper(std::move(func));
285 setCallback_([pm, funcm](Try<T>&& t) mutable {
286 if (!t.template withException<Exn>([&] (Exn& e) {
288 auto f2 = (*funcm)(e);
289 f2.setCallback_([pm](Try<T>&& t2) mutable {
290 pm->setTry(std::move(t2));
292 } catch (const std::exception& e2) {
293 pm->setException(exception_wrapper(std::current_exception(), e2));
295 pm->setException(exception_wrapper(std::current_exception()));
298 pm->setTry(std::move(t));
// Chains a continuation that receives the Try<T> regardless of outcome and
// re-emits it via makeFuture(std::move(t)), so the original result or
// exception is passed on.
// `func` is moved into a MoveWrapper so the lambda can capture it.
// NOTE(review): the line invoking the wrapped function is not visible in this
// listing; presumably it runs before the Try is re-emitted - confirm.
307 Future<T> Future<T>::ensure(F func) {
308 MoveWrapper<F> funcw(std::move(func));
309 return this->then([funcw](Try<T>&& t) mutable {
311 return makeFuture(std::move(t));
// Applies within(dur, tk) and attaches an onError handler for TimedOut that
// ignores the exception object and returns the result of calling `func`.
317 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
318 auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
319 return within(dur, tk)
320 .onError([funcw](TimedOut const&) { return (*funcw)(); });
// Selected when F is callable with an exception_wrapper and returns a Future;
// that Future type must be exactly Future<T> (static_assert).
// On any exception, the handler is called with the exception_wrapper moved
// out of the Try, and the Future it returns (f2) forwards its Try<T> into the
// promise. Exceptions thrown while doing so are stored in the promise.
// A Try without an exception is passed through unchanged.
// Both callbacks here take their Try<T> by value.
325 typename std::enable_if<
326 detail::callableWith<F, exception_wrapper>::value &&
327 detail::Extract<F>::ReturnsFuture::value,
329 Future<T>::onError(F&& func) {
331 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
332 "Return type of onError callback must be T or Future<T>");
335 auto f = p.getFuture();
336 auto pm = folly::makeMoveWrapper(std::move(p));
337 auto funcm = folly::makeMoveWrapper(std::move(func));
338 setCallback_([pm, funcm](Try<T> t) mutable {
339 if (t.hasException()) {
341 auto f2 = (*funcm)(std::move(t.exception()));
342 f2.setCallback_([pm](Try<T> t2) mutable {
343 pm->setTry(std::move(t2));
345 } catch (const std::exception& e2) {
346 pm->setException(exception_wrapper(std::current_exception(), e2));
348 pm->setException(exception_wrapper(std::current_exception()));
351 pm->setTry(std::move(t));
// Selected when F is callable with an exception_wrapper and does not return a
// Future. On any exception the handler is called with the exception_wrapper
// moved out of the Try; a Try without an exception is passed through to the
// promise unchanged.
// NOTE(review): this overload is for handlers returning T, yet the
// static_assert compares Extract<F>::Return with Future<T>, whereas the other
// T-returning overload checks Extract<F>::RawReturn against T. Confirm that
// Extract<F>::Return is the Future-wrapped return type.
358 // onError(exception_wrapper) that returns T
361 typename std::enable_if<
362 detail::callableWith<F, exception_wrapper>::value &&
363 !detail::Extract<F>::ReturnsFuture::value,
365 Future<T>::onError(F&& func) {
367 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
368 "Return type of onError callback must be T or Future<T>");
371 auto f = p.getFuture();
372 auto pm = folly::makeMoveWrapper(std::move(p));
373 auto funcm = folly::makeMoveWrapper(std::move(func));
374 setCallback_([pm, funcm](Try<T> t) mutable {
375 if (t.hasException()) {
377 return (*funcm)(std::move(t.exception()));
380 pm->setTry(std::move(t));
// value(): returns a reference to the value stored in the core's Try
// (non-const and const overloads).
// NOTE(review): any validity check preceding the return is not visible in
// this listing.
388 typename std::add_lvalue_reference<T>::type Future<T>::value() {
391 return core_->getTry().value();
395 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
398 return core_->getTry().value();
// Returns a reference to the Try<T> held by the core.
402 Try<T>& Future<T>::getTry() {
405 return core_->getTry();
// Non-blocking check: when the core is ready, the core's Try is moved into
// the optional `o` (declared on a line not visible in this listing).
// NOTE(review): because the Try is moved out of the core, later reads through
// getTry()/value() would see a moved-from Try - confirm this is intended.
409 Optional<Try<T>> Future<T>::poll() {
411 if (core_->ready()) {
412 o = std::move(core_->getTry());
// Rvalue-qualified via(): sets the executor and priority on this Future and
// returns it by move.
418 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
421 setExecutor(executor, priority);
423 return std::move(*this);
// Lvalue-qualified via(): creates a fresh promise/future pair, chains a
// continuation on this Future that forwards its Try into the promise, and
// returns the new future routed through the rvalue via() overload.
427 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
430 MoveWrapper<Promise<T>> p;
431 auto f = p->getFuture();
432 then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
433 return std::move(f).via(executor, priority);
// Free function: equivalent to via(x).then(func). The result is a Future of
// the inner type of whatever func() returns.
437 template <class Func>
438 auto via(Executor* x, Func func)
439 -> Future<typename isFuture<decltype(func())>::Inner>
441 // TODO make this actually more performant. :-P #7260175
442 return via(x).then(func);
// Reports whether the core is ready (core_->ready()).
446 bool Future<T>::isReady() const {
448 return core_->ready();
// True when the Try returned by getTry() holds a value.
452 bool Future<T>::hasValue() {
453 return getTry().hasValue();
// True when the Try returned by getTry() holds an exception.
457 bool Future<T>::hasException() {
458 return getTry().hasException();
// Forwards the exception to the core via core_->raise(); the parameter is
// taken by value and moved on.
462 void Future<T>::raise(exception_wrapper exception) {
463 core_->raise(std::move(exception));
// Makes a Future from a value: wraps the forwarded argument in a Try of the
// decayed type and delegates to the Try-taking makeFuture overload.
469 Future<typename std::decay<T>::type> makeFuture(T&& t) {
470 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
// Makes a Future<Unit> holding Unit{}.
473 inline // for multiple translation units
474 Future<Unit> makeFuture() {
475 return makeFuture(Unit{});
// Overload enabled when func() itself returns a Future; the result type is
// that same Future type. InnerType (its alias introducer line is not visible)
// is the inner value type of the returned Future.
// Exceptions are converted into a failed Future<InnerType>: std::exception is
// caught by reference; the final return presumably sits in a catch-all
// handler whose line is not visible in this listing.
478 // makeFutureWith(Future<T>()) -> Future<T>
480 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
481 typename std::result_of<F()>::type>::type
482 makeFutureWith(F&& func) {
484 typename isFuture<typename std::result_of<F()>::type>::Inner;
487 } catch (std::exception& e) {
488 return makeFuture<InnerType>(
489 exception_wrapper(std::current_exception(), e));
491 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
// Overload enabled when func() does not return a Future. The result type is a
// Future of Unit::Lift applied to func()'s return type (per the comment
// below, void maps to Unit).
// The call is wrapped with makeTryWith and the resulting Try is turned into a
// Future<LiftedResult>; the lambda body is not visible in this listing.
495 // makeFutureWith(T()) -> Future<T>
496 // makeFutureWith(void()) -> Future<Unit>
498 typename std::enable_if<
499 !(isFuture<typename std::result_of<F()>::type>::value),
500 Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
501 makeFutureWith(F&& func) {
503 typename Unit::Lift<typename std::result_of<F()>::type>::type;
504 return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
// makeFuture overloads producing a failed Future<T>. Each builds a Try<T>
// from the error and delegates to the Try-taking overload:
//  - from a std::exception_ptr,
//  - from an exception_wrapper (taken by value and moved),
//  - from an exception object of type E, enabled only when E derives from
//    std::exception; the object is wrapped with make_exception_wrapper<E>.
510 Future<T> makeFuture(std::exception_ptr const& e) {
511 return makeFuture(Try<T>(e));
515 Future<T> makeFuture(exception_wrapper ew) {
516 return makeFuture(Try<T>(std::move(ew)));
519 template <class T, class E>
520 typename std::enable_if<std::is_base_of<std::exception, E>::value,
522 makeFuture(E const& e) {
523 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
// Base overload: allocates a detail::Core<T> holding the moved-in Try and
// constructs the Future from that core pointer.
527 Future<T> makeFuture(Try<T>&& t) {
528 return Future<T>(new detail::Core<T>(std::move(t)));
// Starts a chain on `executor`: a Future<Unit> from makeFuture() routed via
// the executor with the given priority.
532 Future<Unit> via(Executor* executor, int8_t priority) {
533 return makeFuture().via(executor, priority);
// Walks [first, last) and installs on each future a callback that invokes
// func(i, std::move(t)), where i is the zero-based position of that future in
// the range. `func` is copied into every callback.
536 // mapSetCallback calls func(i, Try<T>) when every future completes
538 template <class T, class InputIterator, class F>
539 void mapSetCallback(InputIterator first, InputIterator last, F func) {
540 for (size_t i = 0; first != last; ++first, ++i) {
541 first->setCallback_([func, i](Try<T>&& t) {
542 func(i, std::move(t));
// Creates a shared detail::CollectAllVariadicContext over the value types of
// the given futures, hands the futures to detail::collectVariadicHelper, and
// returns the future of the context's promise.
// NOTE(review): std::forward<typename std::decay<Fs>::type>(fs) casts every
// argument to an rvalue, including lvalue arguments, so they are passed on as
// movable. Presumably intentional; confirm.
547 // collectAll (variadic)
549 template <typename... Fs>
550 typename detail::CollectAllVariadicContext<
551 typename std::decay<Fs>::type::value_type...>::type
552 collectAll(Fs&&... fs) {
553 auto ctx = std::make_shared<detail::CollectAllVariadicContext<
554 typename std::decay<Fs>::type::value_type...>>();
555 detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
556 ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
557 return ctx->p.getFuture();
// Collects the Try<T> of every future in [first, last) into a vector, in
// input order.
// The shared context pre-sizes `results`; each callback stores its Try at its
// own index. The promise is fulfilled from the context's destructor, which
// runs once the local `ctx` and every callback's captured copy are gone.
// NOTE(review): the constructor takes int while std::distance yields the
// iterator's difference type - a narrowing conversion for very large ranges.
560 // collectAll (iterator)
562 template <class InputIterator>
565 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
566 collectAll(InputIterator first, InputIterator last) {
568 typename std::iterator_traits<InputIterator>::value_type::value_type T;
570 struct CollectAllContext {
571 CollectAllContext(int n) : results(n) {}
572 ~CollectAllContext() {
573 p.setValue(std::move(results));
575 Promise<std::vector<Try<T>>> p;
576 std::vector<Try<T>> results;
579 auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
580 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
581 ctx->results[i] = std::move(t);
583 return ctx->p.getFuture();
// Shared state used by the iterator form of collect().
//  - Result / InternalResult pick a vector type for non-void T; the void
//    alternative is on lines not visible in this listing (`Nothing` is
//    presumably that alternative - confirm).
//  - `result` is sized to n on construction; setPartialResult() moves a
//    completed value into slot i.
//  - The block guarded by threw.exchange(true) unwraps every Optional<T> into
//    a vector<T> and fulfils the promise. Its enclosing function header is not
//    visible here (presumably the destructor - confirm).
//  - `threw` is an atomic flag, initially false, shared with collect().
586 // collect (iterator)
590 template <typename T>
591 struct CollectContext {
592 struct Nothing { explicit Nothing(int n) {} };
594 using Result = typename std::conditional<
595 std::is_void<T>::value,
597 std::vector<T>>::type;
599 using InternalResult = typename std::conditional<
600 std::is_void<T>::value,
602 std::vector<Optional<T>>>::type;
604 explicit CollectContext(int n) : result(n) {}
606 if (!threw.exchange(true)) {
607 // map Optional<T> -> T
608 std::vector<T> finalResult;
609 finalResult.reserve(result.size());
610 std::transform(result.begin(), result.end(),
611 std::back_inserter(finalResult),
612 [](Optional<T>& o) { return std::move(o.value()); });
613 p.setValue(std::move(finalResult));
616 inline void setPartialResult(size_t i, Try<T>& t) {
617 result[i] = std::move(t.value());
620 InternalResult result;
621 std::atomic<bool> threw {false};
// Iterator form of collect(): a shared detail::CollectContext<T> is sized to
// the range, and each callback either
//  - on exception: sets it on the promise, but only for the first callback to
//    flip `threw`; or
//  - on value: stores it at its index, unless `threw` is already set.
// Returns the future of the context's promise.
626 template <class InputIterator>
627 Future<typename detail::CollectContext<
628 typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
629 collect(InputIterator first, InputIterator last) {
631 typename std::iterator_traits<InputIterator>::value_type::value_type T;
633 auto ctx = std::make_shared<detail::CollectContext<T>>(
634 std::distance(first, last));
635 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
636 if (t.hasException()) {
637 if (!ctx->threw.exchange(true)) {
638 ctx->p.setException(std::move(t.exception()));
640 } else if (!ctx->threw) {
641 ctx->setPartialResult(i, t);
644 return ctx->p.getFuture();
// Creates a shared detail::CollectVariadicContext over the value types of the
// given futures, hands the futures to detail::collectVariadicHelper, and
// returns the future of the context's promise.
// NOTE(review): std::forward<typename std::decay<Fs>::type>(fs) casts every
// argument to an rvalue, including lvalue arguments; presumably intentional.
647 // collect (variadic)
649 template <typename... Fs>
650 typename detail::CollectVariadicContext<
651 typename std::decay<Fs>::type::value_type...>::type
652 collect(Fs&&... fs) {
653 auto ctx = std::make_shared<detail::CollectVariadicContext<
654 typename std::decay<Fs>::type::value_type...>>();
655 detail::collectVariadicHelper<detail::CollectVariadicContext>(
656 ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
657 return ctx->p.getFuture();
// Completes with the (index, Try) pair of whichever future's callback is the
// first to flip the atomic `done` flag; later completions are ignored.
660 // collectAny (iterator)
662 template <class InputIterator>
667 std::iterator_traits<InputIterator>::value_type::value_type>>>
668 collectAny(InputIterator first, InputIterator last) {
670 typename std::iterator_traits<InputIterator>::value_type::value_type T;
672 struct CollectAnyContext {
673 CollectAnyContext() {};
674 Promise<std::pair<size_t, Try<T>>> p;
675 std::atomic<bool> done {false};
678 auto ctx = std::make_shared<CollectAnyContext>();
679 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
680 if (!ctx->done.exchange(true)) {
681 ctx->p.setValue(std::make_pair(i, std::move(t)));
684 return ctx->p.getFuture();
// Completes with (index, Try) pairs of completed futures once n of them have
// finished.
//  - When the range holds fewer than n futures, the promise fails with
//    std::runtime_error("Not enough futures").
//  - Otherwise each callback bumps the atomic `completed` counter and appends
//    its pair to ctx->v; the vector is moved into the promise.
// NOTE(review): the conditions guarding the append and the setTry call are on
// lines not visible in this listing.
// NOTE(review): ctx->v is appended to without any visible synchronization; if
// input futures can complete concurrently on different threads this would be a
// data race - confirm.
687 // collectN (iterator)
689 template <class InputIterator>
690 Future<std::vector<std::pair<size_t, Try<typename
691 std::iterator_traits<InputIterator>::value_type::value_type>>>>
692 collectN(InputIterator first, InputIterator last, size_t n) {
694 std::iterator_traits<InputIterator>::value_type::value_type T;
695 typedef std::vector<std::pair<size_t, Try<T>>> V;
697 struct CollectNContext {
699 std::atomic<size_t> completed = {0};
702 auto ctx = std::make_shared<CollectNContext>();
704 if (size_t(std::distance(first, last)) < n) {
705 ctx->p.setException(std::runtime_error("Not enough futures"));
707 // for each completed Future, increase count and add to vector, until we
708 // have n completed futures at which point we fulfil our Promise with the
710 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
711 auto c = ++ctx->completed;
713 assert(ctx->v.size() < n);
714 ctx->v.emplace_back(i, std::move(t));
716 ctx->p.setTry(Try<V>(std::move(ctx->v)));
722 return ctx->p.getFuture();
// Folds the results of the futures in [first, last) with func, starting from
// `initial`, in iteration order.
//  - Arg is Try<ItT> when func is callable with (T&&, Try<ItT>&&), otherwise
//    ItT; IsTry records which form func expects.
//  - `func` is moved into a shared_ptr so every continuation can share it.
//  - The first future is combined with `initial`. Each later future is joined
//    to the running result with collectAll, then func is applied to the
//    previous value and the next element.
// NOTE(review): the guard for the early return of `initial` and the final
// return are on lines not visible in this listing (presumably an empty-range
// check - confirm).
727 template <class It, class T, class F>
728 Future<T> reduce(It first, It last, T&& initial, F&& func) {
730 return makeFuture(std::move(initial));
733 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
734 typedef typename std::conditional<
735 detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type Arg;
736 typedef isTry<Arg> IsTry;
738 folly::MoveWrapper<T> minitial(std::move(initial));
739 auto sfunc = std::make_shared<F>(std::move(func));
741 auto f = first->then([minitial, sfunc](Try<ItT>& head) mutable {
742 return (*sfunc)(std::move(*minitial),
743 head.template get<IsTry::value, Arg&&>());
746 for (++first; first != last; ++first) {
747 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
748 return (*sfunc)(std::move(std::get<0>(t).value()),
749 // Either return a ItT&& or a Try<ItT>&& depending
750 // on the type of the argument of func.
751 std::get<1>(t).template get<IsTry::value, Arg&&>());
// Applies func to every element of `input`, starting at most n invocations
// up front; returns one Future<Result> per input element.
//  - WindowContext owns the input, one Promise per element, and an atomic
//    cursor i_.
//  - spawn() claims the next index; if it is in range it calls func_ on that
//    element and, on completion, fulfils the matching promise and calls
//    spawn() again - so each finished call starts the next pending one.
//  - `max` is computed from input.size() before `input` is moved into the
//    context.
// NOTE(review): std::move(ctx) inside the callback has no effect - the lambda
// is not mutable and spawn() takes a const reference.
758 // window (collection)
760 template <class Collection, class F, class ItT, class Result>
761 std::vector<Future<Result>>
762 window(Collection input, F func, size_t n) {
763 struct WindowContext {
764 WindowContext(Collection&& i, F&& fn)
765 : input_(std::move(i)), promises_(input_.size()),
768 std::atomic<size_t> i_ {0};
770 std::vector<Promise<Result>> promises_;
773 static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
774 size_t i = ctx->i_++;
775 if (i < ctx->input_.size()) {
776 // Using setCallback_ directly since we don't need the Future
777 ctx->func_(std::move(ctx->input_[i])).setCallback_(
778 // ctx is captured by value
779 [ctx, i](Try<Result>&& t) {
780 ctx->promises_[i].setTry(std::move(t));
781 // Chain another future onto this one
782 spawn(std::move(ctx));
788 auto max = std::min(n, input.size());
790 auto ctx = std::make_shared<WindowContext>(
791 std::move(input), std::move(func));
793 for (size_t i = 0; i < max; ++i) {
794 // Start the first n Futures
795 WindowContext::spawn(ctx);
798 std::vector<Future<Result>> futures;
799 futures.reserve(ctx->promises_.size());
800 for (auto& promise : ctx->promises_) {
801 futures.emplace_back(promise.getFuture());
// Member reduce(): once this Future's value `vals` is available, folds its
// elements with func starting from `initial`. Both the accumulator and each
// element are moved into func on every step.
// `initial` and `func` are moved into MoveWrappers for capture.
// NOTE(review): the return of `ret` is on a line not visible in this listing.
810 template <class I, class F>
811 Future<I> Future<T>::reduce(I&& initial, F&& func) {
812 folly::MoveWrapper<I> minitial(std::move(initial));
813 folly::MoveWrapper<F> mfunc(std::move(func));
814 return then([minitial, mfunc](T& vals) mutable {
815 auto ret = std::move(*minitial);
816 for (auto& val : vals) {
817 ret = (*mfunc)(std::move(ret), std::move(val));
// Folds the futures in [first, last) with func in completion order rather
// than input order.
//  - The context holds the running result as a Future (`memo_`, seeded with
//    `initial`), the function, counters, and a MicroSpinLock.
//  - Each completion callback, under the lock, chains another then() onto
//    memo_ that applies func_ to the previous value and this element.
//  - When numThens_ reaches numFutures_, a final callback forwards memo_'s
//    Try into the promise.
// NOTE(review): the guard for the early return of `initial` is on a line not
// visible in this listing (presumably an empty-range check - confirm).
// NOTE(review): the final callback passes a Try<T> to promise_.setValue();
// confirm this is equivalent to setTry for this Promise type.
823 // unorderedReduce (iterator)
825 template <class It, class T, class F, class ItT, class Arg>
826 Future<T> unorderedReduce(It first, It last, T initial, F func) {
828 return makeFuture(std::move(initial));
831 typedef isTry<Arg> IsTry;
833 struct UnorderedReduceContext {
834 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
835 : lock_(), memo_(makeFuture<T>(std::move(memo))),
836 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
838 folly::MicroSpinLock lock_; // protects memo_ and numThens_
841 size_t numThens_; // how many Futures completed and called .then()
842 size_t numFutures_; // how many Futures in total
846 auto ctx = std::make_shared<UnorderedReduceContext>(
847 std::move(initial), std::move(func), std::distance(first, last));
849 mapSetCallback<ItT>(first, last, [ctx](size_t i, Try<ItT>&& t) {
850 folly::MoveWrapper<Try<ItT>> mt(std::move(t));
851 // Futures can be completed in any order, simultaneously.
852 // To make this non-blocking, we create a new Future chain in
853 // the order of completion to reduce the values.
854 // The spinlock just protects chaining a new Future, not actually
855 // executing the reduce, which should be really fast.
856 folly::MSLGuard lock(ctx->lock_);
857 ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
858 // Either return a ItT&& or a Try<ItT>&& depending
859 // on the type of the argument of func.
860 return ctx->func_(std::move(v), mt->template get<IsTry::value, Arg&&>());
862 if (++ctx->numThens_ == ctx->numFutures_) {
863 // After reducing the value of the last Future, fulfill the Promise
864 ctx->memo_.setCallback_([ctx](Try<T>&& t2) {
865 ctx->promise_.setValue(std::move(t2));
870 return ctx->promise_.getFuture();
// Convenience overload: within() with a default-constructed TimedOut as the
// exception to deliver.
876 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
877 return within(dur, TimedOut(), tk);
// Races this Future against a timer of length `dur`:
//  - the shared Context carries the exception `e`, the continuation future
//    (`thisFuture`), a promise, and an atomic `token` deciding the winner;
//  - if this Future completes first (wins the token), its Try goes to the
//    promise;
//  - when the timer fires it raises TimedOut on thisFuture and, if it wins the
//    token, fails the promise with the timer's own exception if it has one,
//    otherwise with `e`.
// The result is routed via this Future's executor.
// NOTE(review): the condition under which `tk` is replaced by the singleton is
// on a line not visible here (presumably a null check - confirm).
// NOTE(review): ctx->thisFuture is assigned from the return of then() while
// the timer callback reads it; if the timer can fire on another thread before
// that assignment completes, this may race - confirm.
882 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
885 Context(E ex) : exception(std::move(ex)), promise() {}
887 Future<Unit> thisFuture;
889 std::atomic<bool> token {false};
893 tk = folly::detail::getTimekeeperSingleton();
896 auto ctx = std::make_shared<Context>(std::move(e));
898 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
899 // TODO: "this" completed first, cancel "after"
900 if (ctx->token.exchange(true) == false) {
901 ctx->promise.setTry(std::move(t));
905 tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
906 // "after" completed first, cancel "this"
907 ctx->thisFuture.raise(TimedOut());
908 if (ctx->token.exchange(true) == false) {
909 if (t.hasException()) {
910 ctx->promise.setException(std::move(t.exception()));
912 ctx->promise.setException(std::move(ctx->exception));
917 return ctx->promise.getFuture().via(getExecutor());
// Joins this Future with futures::sleep(dur, tk) through collectAll, then
// re-emits only this Future's Try (element 0 of the tuple); the sleep's
// result is discarded.
923 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
924 return collectAll(*this, futures::sleep(dur, tk))
925 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
926 Try<T>& t = std::get<0>(tup);
927 return makeFuture<T>(std::move(t));
// Blocks until `f` is ready. Returns immediately when it already is.
// Otherwise `f` is replaced by a continuation that re-emits the Try, with a
// fibers::Baton captured by reference; afterwards the loop yields until the
// replacement future reports ready.
// NOTE(review): the baton post inside the continuation and the baton wait
// before the loop are on lines not visible in this listing.
934 void waitImpl(Future<T>& f) {
935 // short-circuit if there's nothing to do
936 if (f.isReady()) return;
938 folly::fibers::Baton baton;
939 f = f.then([&](Try<T> t) {
941 return makeFuture(std::move(t));
945 // There's a race here between the return here and the actual finishing of
946 // the future. f is completed, but the setup may not have finished on done
947 // after the baton has posted.
948 while (!f.isReady()) {
949 std::this_thread::yield();
// Timed variant: waits up to `dur` for `f`. The Baton is held in a shared_ptr
// and captured by value, so it stays alive for the continuation even if this
// function returns first.
// Only when timed_wait reports success does the function spin until `f`
// reports ready; on timeout it returns with `f` possibly still incomplete.
// NOTE(review): the baton post inside the continuation is on a line not
// visible in this listing.
954 void waitImpl(Future<T>& f, Duration dur) {
955 // short-circuit if there's nothing to do
956 if (f.isReady()) return;
958 auto baton = std::make_shared<folly::fibers::Baton>();
959 f = f.then([baton](Try<T> t) {
961 return makeFuture(std::move(t));
964 // Let's preserve the invariant that if we did not timeout (timed_wait returns
965 // true), then the returned Future is complete when it is returned to the
966 // caller. We need to wait out the race for that Future to complete.
967 if (baton->timed_wait(dur)) {
968 while (!f.isReady()) {
969 std::this_thread::yield();
// Loops until `f` is ready. The loop body is not visible in this listing.
// NOTE(review): presumably it drives the executor `e` on each iteration -
// confirm.
975 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
976 while (!f.isReady()) {
// wait(): blocks via detail::waitImpl(*this). The lvalue overload returns a
// Future<T>&; the rvalue overload returns std::move(*this).
984 Future<T>& Future<T>::wait() & {
985 detail::waitImpl(*this);
990 Future<T>&& Future<T>::wait() && {
991 detail::waitImpl(*this);
992 return std::move(*this);
// wait(dur): timed wait via detail::waitImpl(*this, dur). Lvalue and rvalue
// overloads mirror wait().
996 Future<T>& Future<T>::wait(Duration dur) & {
997 detail::waitImpl(*this, dur);
1002 Future<T>&& Future<T>::wait(Duration dur) && {
1003 detail::waitImpl(*this, dur);
1004 return std::move(*this);
// waitVia(e): waits via detail::waitViaImpl(*this, e). Lvalue and rvalue
// overloads mirror wait().
1008 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1009 detail::waitViaImpl(*this, e);
1014 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1015 detail::waitViaImpl(*this, e);
1016 return std::move(*this);
// get(): waits for completion and moves the value out of the future.
// get(dur): moves the value out as well.
// NOTE(review): in get(dur) the timed wait and the handling of a timeout are
// on lines not visible in this listing.
1020 T Future<T>::get() {
1021 return std::move(wait().value());
1025 T Future<T>::get(Duration dur) {
1028 return std::move(value());
// Waits via waitVia(e), then moves the value out of the future.
1035 T Future<T>::getVia(DrivableExecutor* e) {
1036 return std::move(waitVia(e).value());
// Compares the values held by two Trys with operator==. The enclosing struct
// is not visible in this listing; willEqual() refers to it as
// detail::TryEquals<T>.
1042 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1043 return t1.value() == t2.value();
// Joins this Future and `f` with collectAll; when both Trys hold values the
// result is their equality as decided by detail::TryEquals<T>::equals.
// NOTE(review): the result for the case where either side lacks a value is on
// a line not visible in this listing.
1049 Future<bool> Future<T>::willEqual(Future<T>& f) {
1050 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1051 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1052 return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
// Chains a continuation that evaluates `predicate` on a const reference to
// the value and throws PredicateDoesNotObtain when the predicate returns
// false.
// NOTE(review): the return for the passing case is on a line not visible in
// this listing.
1061 Future<T> Future<T>::filter(F predicate) {
1062 auto p = folly::makeMoveWrapper(std::move(predicate));
1063 return this->then([p](T val) {
1064 T const& valConstRef = val;
1065 if (!(*p)(valConstRef)) {
1066 throw PredicateDoesNotObtain();
// thenMulti: chains any number of callbacks in order.
//  - one callback: plain then(fn);
//  - several callbacks: then(fn) followed by thenMulti on the remainder.
1073 template <class Callback>
1074 auto Future<T>::thenMulti(Callback&& fn)
1075 -> decltype(this->then(std::forward<Callback>(fn))) {
1076 // thenMulti with one callback is just a then
1077 return then(std::forward<Callback>(fn));
1081 template <class Callback, class... Callbacks>
1082 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1083 -> decltype(this->then(std::forward<Callback>(fn)).
1084 thenMulti(std::forward<Callbacks>(fns)...)) {
1085 // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1086 return then(std::forward<Callback>(fn)).
1087 thenMulti(std::forward<Callbacks>(fns)...);
// thenMultiWithExecutor: like thenMulti, with an executor argument.
//  - several callbacks: remembers the current executor in oldX, chains the
//    callbacks, and finishes with via(oldX).
//  - one callback: delegates to then(x, fn).
// NOTE(review): in the multi-callback overload the statement installing `x`
// and part of the parameter list are on lines not visible in this listing.
1091 template <class Callback, class... Callbacks>
1092 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1094 -> decltype(this->then(std::forward<Callback>(fn)).
1095 thenMulti(std::forward<Callbacks>(fns)...)) {
1096 // thenMultiExecutor with two callbacks is
1097 // via(x).then(a).thenMulti(b, ...).via(oldX)
1098 auto oldX = getExecutor();
1100 return then(std::forward<Callback>(fn)).
1101 thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1105 template <class Callback>
1106 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1107 -> decltype(this->then(std::forward<Callback>(fn))) {
1108 // thenMulti with one callback is just a then with an executor
1109 return then(x, std::forward<Callback>(fn));
// Runs `thunk` only when p is true, converting its result with unit();
// otherwise returns a Future<Unit> from makeFuture() without calling thunk.
1113 inline Future<Unit> when(bool p, F thunk) {
1114 return p ? thunk().unit() : makeFuture();
// Asynchronous loop: after thunk()'s future completes, whileDo is invoked
// again with copies of predicate and thunk; the other path returns a
// Future<Unit> from makeFuture().
// NOTE(review): the predicate test selecting between the two paths is on a
// line not visible in this listing.
1117 template <class P, class F>
1118 Future<Unit> whileDo(P predicate, F thunk) {
1120 return thunk().then([=] {
1121 return whileDo(predicate, thunk);
1124 return makeFuture();
// Repeats via folly::whileDo using a heap-allocated atomic counter starting
// at 0. The predicate increments the counter with fetch_add and continues
// while the value before the increment is below n.
// NOTE(review): the second argument to whileDo (presumably `thunk`) is on a
// line not visible in this listing.
1128 Future<Unit> times(const int n, F thunk) {
1129 auto count = folly::makeMoveWrapper(
1130 std::unique_ptr<std::atomic<int>>(new std::atomic<int>(0))
1132 return folly::whileDo([=]() mutable {
1133 return (*count)->fetch_add(1) < n;
// Chains `func` onto every future in [first, last) with then() and collects
// the resulting futures, in order, into `results`.
// NOTE(review): `results` is grown with push_back without a prior reserve and
// the iterator is post-incremented; reserve(std::distance(first, last)) and
// ++it would avoid reallocations and iterator copies.
1138 template <class It, class F, class ItT, class Result>
1139 std::vector<Future<Result>> map(It first, It last, F func) {
1140 std::vector<Future<Result>> results;
1141 for (auto it = first; it != last; it++) {
1142 results.push_back(it->then(func));
// Empty tag types used for overload dispatch on the kind of retry policy:
// raw = the policy's call operator returns bool, fut = it returns
// Future<bool> (see retrying_policy_traits).
1152 struct retrying_policy_raw_tag {};
1153 struct retrying_policy_fut_tag {};
// Classifies a retry policy by the signature of its operator():
//  - is_raw: callable as bool(size_t, const exception_wrapper&), const or
//    non-const;
//  - is_fut: callable as Future<bool>(size_t, const exception_wrapper&).
// `tag` is the raw tag if is_raw, else the fut tag if is_fut, else void.
1155 template <class Policy>
1156 struct retrying_policy_traits {
1157 using ew = exception_wrapper;
1158 FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1159 template <class Ret>
1160 using has_op = typename std::integral_constant<bool,
1161 has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1162 has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1163 using is_raw = has_op<bool>;
1164 using is_fut = has_op<Future<bool>>;
1165 using tag = typename std::conditional<
1166 is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1167 is_fut::value, retrying_policy_fut_tag, void>::type>::type;
// Core retry loop. On an error from `f`, the policy is asked with (k, x); its
// Future<bool> answer selects between recursing into retrying() and
// re-emitting the original exception as a failed Future<T>.
// NOTE(review): the creation of `f` (presumably from ff) and the condition of
// the ternary are on lines not visible in this listing.
// NOTE(review): makeMoveWrapper(p) / makeMoveWrapper(ff) receive lvalues here;
// confirm whether a move or a copy is intended.
1170 template <class Policy, class FF>
1171 typename std::result_of<FF(size_t)>::type
1172 retrying(size_t k, Policy&& p, FF&& ff) {
1173 using F = typename std::result_of<FF(size_t)>::type;
1174 using T = typename F::value_type;
1176 auto pm = makeMoveWrapper(p);
1177 auto ffm = makeMoveWrapper(ff);
1178 return f.onError([=](exception_wrapper x) mutable {
1179 auto q = (*pm)(k, x);
1180 auto xm = makeMoveWrapper(std::move(x));
1181 return q.then([=](bool r) mutable {
1183 ? retrying(k, pm.move(), ffm.move())
1184 : makeFuture<T>(xm.move());
// Tag-dispatched entry points, both starting the retry loop at k = 0:
//  - raw tag: adapts a bool-returning policy by wrapping its answer with
//    makeFuture<bool>;
//  - fut tag: forwards a Future<bool>-returning policy unchanged.
1189 template <class Policy, class FF>
1190 typename std::result_of<FF(size_t)>::type
1191 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1192 auto pm = makeMoveWrapper(std::move(p));
1193 auto q = [=](size_t k, exception_wrapper x) {
1194 return makeFuture<bool>((*pm)(k, x));
1196 return retrying(0, std::move(q), std::forward<FF>(ff));
1199 template <class Policy, class FF>
1200 typename std::result_of<FF(size_t)>::type
1201 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1202 return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
// Computes one backoff delay:
//   jitter  = exp(sample from normal(0, jitter_param))
//   backoff = jitter * backoff_min * 2^(n - 1)
// and clamps the result to [backoff_min, backoff_max].
// NOTE(review): the declarations of `n`, `rng` and the alias `d` are on lines
// not visible in this listing. If `n` is unsigned and 0 is passed, n - 1
// wraps around; confirm callers only pass n >= 1.
1205 // jittered exponential backoff, clamped to [backoff_min, backoff_max]
1206 template <class URNG>
1207 Duration retryingJitteredExponentialBackoffDur(
1209 Duration backoff_min,
1210 Duration backoff_max,
1211 double jitter_param,
1214 auto dist = std::normal_distribution<double>(0.0, jitter_param);
1215 auto jitter = std::exp(dist(rng));
1216 auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1217 return std::max(backoff_min, std::min(backoff_max, backoff));
// Builds a Future<bool>-returning retry policy from an inner policy `p`:
//  - answers false once n equals max_tries;
//  - otherwise asks the inner policy; if it says false, answers false;
//  - if it says true, sleeps for a jittered exponential backoff and then
//    answers true.
// The RNG is moved into a shared_ptr so the returned closure and its nested
// continuation share one generator.
// NOTE(review): several parameter lines (including max_tries, p and rng) are
// not visible in this listing.
1220 template <class Policy, class URNG>
1221 std::function<Future<bool>(size_t, const exception_wrapper&)>
1222 retryingPolicyCappedJitteredExponentialBackoff(
1224 Duration backoff_min,
1225 Duration backoff_max,
1226 double jitter_param,
1229 auto pm = makeMoveWrapper(std::move(p));
1230 auto rngp = std::make_shared<URNG>(std::move(rng));
1231 return [=](size_t n, const exception_wrapper& ex) mutable {
1232 if (n == max_tries) { return makeFuture(false); }
1233 return (*pm)(n, ex).then([=](bool v) {
1234 if (!v) { return makeFuture(false); }
1235 auto backoff = detail::retryingJitteredExponentialBackoffDur(
1236 n, backoff_min, backoff_max, jitter_param, *rngp);
1237 return futures::sleep(backoff).then([] { return true; });
// Tag-dispatched overloads of the capped jittered backoff policy builder:
//  - raw tag: adapts a bool-returning policy by wrapping its answer with
//    makeFuture, then delegates;
//  - fut tag: delegates directly.
// NOTE(review): the argument lists of the delegating calls and several
// parameter lines are not visible in this listing.
1242 template <class Policy, class URNG>
1243 std::function<Future<bool>(size_t, const exception_wrapper&)>
1244 retryingPolicyCappedJitteredExponentialBackoff(
1246 Duration backoff_min,
1247 Duration backoff_max,
1248 double jitter_param,
1251 retrying_policy_raw_tag) {
1252 auto pm = makeMoveWrapper(std::move(p));
1253 auto q = [=](size_t n, const exception_wrapper& e) {
1254 return makeFuture((*pm)(n, e));
1256 return retryingPolicyCappedJitteredExponentialBackoff(
1265 template <class Policy, class URNG>
1266 std::function<Future<bool>(size_t, const exception_wrapper&)>
1267 retryingPolicyCappedJitteredExponentialBackoff(
1269 Duration backoff_min,
1270 Duration backoff_max,
1271 double jitter_param,
1274 retrying_policy_fut_tag) {
1275 return retryingPolicyCappedJitteredExponentialBackoff(
// Public entry point: picks the tag from
// detail::retrying_policy_traits<Policy> and dispatches to the matching
// detail::retrying overload. The result type is whatever ff(size_t) returns.
1286 template <class Policy, class FF>
1287 typename std::result_of<FF(size_t)>::type
1288 retrying(Policy&& p, FF&& ff) {
1289 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1290 return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
// Returns a policy that ignores the exception and allows a retry while
// n < max_tries.
// NOTE(review): the parameter line declaring max_tries is not visible in this
// listing.
1294 std::function<bool(size_t, const exception_wrapper&)>
1295 retryingPolicyBasic(
1297 return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
// Public overload taking a user policy: picks the tag from
// detail::retrying_policy_traits<Policy> and delegates to the detail
// implementation.
// NOTE(review): the delegating call's arguments and several parameter lines
// are not visible in this listing.
1300 template <class Policy, class URNG>
1301 std::function<Future<bool>(size_t, const exception_wrapper&)>
1302 retryingPolicyCappedJitteredExponentialBackoff(
1304 Duration backoff_min,
1305 Duration backoff_max,
1306 double jitter_param,
1309 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1310 return detail::retryingPolicyCappedJitteredExponentialBackoff(
// Convenience overload without a user policy: uses an inner policy that
// always answers true, then delegates to the policy-taking overload.
// NOTE(review): the delegating call's arguments and the first parameter line
// are not visible in this listing.
1321 std::function<Future<bool>(size_t, const exception_wrapper&)>
1322 retryingPolicyCappedJitteredExponentialBackoff(
1324 Duration backoff_min,
1325 Duration backoff_max,
1326 double jitter_param) {
1327 auto p = [](size_t, const exception_wrapper&) { return true; };
1328 return retryingPolicyCappedJitteredExponentialBackoff(
// `extern template` declarations: they suppress implicit instantiation of
// these specializations in every translation unit that includes this file.
// The matching explicit instantiation definitions are expected elsewhere (not
// part of this listing).
1339 // Instantiate the most common Future types to save compile time
1340 extern template class Future<Unit>;
1341 extern template class Future<bool>;
1342 extern template class Future<int>;
1343 extern template class Future<int64_t>;
1344 extern template class Future<std::string>;
1345 extern template class Future<double>;
1347 } // namespace folly