2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/fibers/Baton.h>
43 #if FOLLY_FUTURE_USING_FIBER
44 typedef folly::fibers::Baton FutureBatonType;
46 typedef folly::Baton<> FutureBatonType;
51 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
55 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
56 other.core_ = nullptr;
60 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
61 std::swap(core_, other.core_);
66 template <typename U, typename>
67 Future<T>::Future(Future<U>&& other) noexcept
68 : core_(detail::Core<T>::convert(other.core_)) {
69 other.core_ = nullptr;
73 template <typename U, typename>
74 Future<T>& Future<T>::operator=(Future<U>&& other) noexcept {
75 std::swap(core_, detail::Core<T>::convert(other.core_));
80 template <class T2, typename>
81 Future<T>::Future(T2&& val)
82 : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
85 template <typename T2>
86 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
87 : core_(new detail::Core<T>(Try<T>(T()))) {}
90 Future<T>::~Future() {
95 void Future<T>::detach() {
97 core_->detachFuture();
103 void Future<T>::throwIfInvalid() const {
110 void Future<T>::setCallback_(F&& func) {
112 core_->setCallback(std::forward<F>(func));
119 typename std::enable_if<isFuture<F>::value,
120 Future<typename isFuture<T>::Inner>>::type
121 Future<T>::unwrap() {
122 return then([](Future<typename isFuture<T>::Inner> internal_future) {
123 return internal_future;
129 // Variant: returns a value
130 // e.g. f.then([](Try<T>&& t){ return t.value(); });
132 template <typename F, typename R, bool isTry, typename... Args>
133 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
134 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
135 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
136 typedef typename R::ReturnsFuture::Inner B;
141 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
143 // grab the Future now before we lose our handle on the Promise
144 auto f = p.getFuture();
145 f.core_->setExecutorNoLock(getExecutor());
147 /* This is a bit tricky.
149 We can't just close over *this in case this Future gets moved. So we
150 make a new dummy Future. We could figure out something more
151 sophisticated that avoids making a new Future object when it can, as an
152 optimization. But this is correct.
154 core_ can't be moved, it is explicitly disallowed (as is copying). But
155 if there's ever a reason to allow it, this is one place that makes that
156 assumption and would need to be fixed. We use a standard shared pointer
157 for core_ (by copying it in), which means in essence obj holds a shared
158 pointer to itself. But this shouldn't leak because Promise will not
159 outlive the continuation, because Promise will setException() with a
160 broken Promise if it is destructed before completed. We could use a
161 weak pointer but it would have to be converted to a shared pointer when
162 func is executed (because the Future returned by func may possibly
163 persist beyond the callback, if it gets moved), and so it is an
164 optimization to just make it shared from the get-go.
166 Two subtle but important points about this design. detail::Core has no
167 back pointers to Future or Promise, so if Future or Promise get moved
168 (and they will be moved in performant code) we don't have to do
169 anything fancy. And because we store the continuation in the
170 detail::Core, not in the Future, we can execute the continuation even
171 after the Future has gone out of scope. This is an intentional design
172 decision. It is likely we will want to be able to cancel a continuation
173 in some circumstances, but I think it should be explicit not implicit
174 in the destruction of the Future used to create it.
176 setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
177 Try<T> && t) mutable {
178 if (!isTry && t.hasException()) {
179 pm.setException(std::move(t.exception()));
181 pm.setWith([&]() { return funcm(t.template get<isTry, Args>()...); });
188 // Variant: returns a Future
189 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
191 template <typename F, typename R, bool isTry, typename... Args>
192 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
193 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
194 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
195 typedef typename R::ReturnsFuture::Inner B;
200 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
202 // grab the Future now before we lose our handle on the Promise
203 auto f = p.getFuture();
204 f.core_->setExecutorNoLock(getExecutor());
206 setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
207 Try<T> && t) mutable {
208 if (!isTry && t.hasException()) {
209 pm.setException(std::move(t.exception()));
212 auto f2 = funcm(t.template get<isTry, Args>()...);
213 // that didn't throw, now we can steal p
214 f2.setCallback_([p = std::move(pm)](Try<B> && b) mutable {
215 p.setTry(std::move(b));
217 } catch (const std::exception& e) {
218 pm.setException(exception_wrapper(std::current_exception(), e));
220 pm.setException(exception_wrapper(std::current_exception()));
228 template <typename T>
229 template <typename R, typename Caller, typename... Args>
230 Future<typename isFuture<R>::Inner>
231 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
232 typedef typename std::remove_cv<
233 typename std::remove_reference<
234 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
235 return then([instance, func](Try<T>&& t){
236 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
241 template <class Executor, class Arg, class... Args>
242 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
243 -> decltype(this->then(std::forward<Arg>(arg),
244 std::forward<Args>(args)...))
246 auto oldX = getExecutor();
248 return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
253 Future<Unit> Future<T>::then() {
254 return then([] () {});
257 // onError where the callback returns T
260 typename std::enable_if<
261 !detail::callableWith<F, exception_wrapper>::value &&
262 !detail::Extract<F>::ReturnsFuture::value,
264 Future<T>::onError(F&& func) {
265 typedef typename detail::Extract<F>::FirstArg Exn;
267 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
268 "Return type of onError callback must be T or Future<T>");
271 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
272 auto f = p.getFuture();
274 setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
275 Try<T> && t) mutable {
276 if (!t.template withException<Exn>(
277 [&](Exn& e) { pm.setWith([&] { return funcm(e); }); })) {
278 pm.setTry(std::move(t));
285 // onError where the callback returns Future<T>
288 typename std::enable_if<
289 !detail::callableWith<F, exception_wrapper>::value &&
290 detail::Extract<F>::ReturnsFuture::value,
292 Future<T>::onError(F&& func) {
294 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
295 "Return type of onError callback must be T or Future<T>");
296 typedef typename detail::Extract<F>::FirstArg Exn;
299 auto f = p.getFuture();
301 setCallback_([ pm = std::move(p), funcm = std::forward<F>(func) ](
302 Try<T> && t) mutable {
303 if (!t.template withException<Exn>([&](Exn& e) {
306 f2.setCallback_([pm = std::move(pm)](Try<T> && t2) mutable {
307 pm.setTry(std::move(t2));
309 } catch (const std::exception& e2) {
310 pm.setException(exception_wrapper(std::current_exception(), e2));
312 pm.setException(exception_wrapper(std::current_exception()));
315 pm.setTry(std::move(t));
324 Future<T> Future<T>::ensure(F&& func) {
325 return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
327 return makeFuture(std::move(t));
333 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
334 return within(dur, tk).onError([funcw = std::forward<F>(func)](
335 TimedOut const&) { return funcw(); });
340 typename std::enable_if<detail::callableWith<F, exception_wrapper>::value &&
341 detail::Extract<F>::ReturnsFuture::value,
343 Future<T>::onError(F&& func) {
345 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
346 "Return type of onError callback must be T or Future<T>");
349 auto f = p.getFuture();
351 [ pm = std::move(p), funcm = std::forward<F>(func) ](Try<T> t) mutable {
352 if (t.hasException()) {
354 auto f2 = funcm(std::move(t.exception()));
355 f2.setCallback_([pm = std::move(pm)](Try<T> t2) mutable {
356 pm.setTry(std::move(t2));
358 } catch (const std::exception& e2) {
359 pm.setException(exception_wrapper(std::current_exception(), e2));
361 pm.setException(exception_wrapper(std::current_exception()));
364 pm.setTry(std::move(t));
371 // onError(exception_wrapper) that returns T
374 typename std::enable_if<
375 detail::callableWith<F, exception_wrapper>::value &&
376 !detail::Extract<F>::ReturnsFuture::value,
378 Future<T>::onError(F&& func) {
380 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
381 "Return type of onError callback must be T or Future<T>");
384 auto f = p.getFuture();
386 [ pm = std::move(p), funcm = std::forward<F>(func) ](Try<T> t) mutable {
387 if (t.hasException()) {
388 pm.setWith([&] { return funcm(std::move(t.exception())); });
390 pm.setTry(std::move(t));
398 typename std::add_lvalue_reference<T>::type Future<T>::value() {
401 return core_->getTry().value();
405 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
408 return core_->getTry().value();
412 Try<T>& Future<T>::getTry() {
415 return core_->getTry();
419 Optional<Try<T>> Future<T>::poll() {
421 if (core_->ready()) {
422 o = std::move(core_->getTry());
428 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
431 setExecutor(executor, priority);
433 return std::move(*this);
437 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
441 auto f = p.getFuture();
442 then([p = std::move(p)](Try<T> && t) mutable { p.setTry(std::move(t)); });
443 return std::move(f).via(executor, priority);
446 template <class Func>
447 auto via(Executor* x, Func&& func)
448 -> Future<typename isFuture<decltype(func())>::Inner>
450 // TODO make this actually more performant. :-P #7260175
451 return via(x).then(std::forward<Func>(func));
455 bool Future<T>::isReady() const {
457 return core_->ready();
461 bool Future<T>::hasValue() {
462 return getTry().hasValue();
466 bool Future<T>::hasException() {
467 return getTry().hasException();
471 void Future<T>::raise(exception_wrapper exception) {
472 core_->raise(std::move(exception));
478 Future<typename std::decay<T>::type> makeFuture(T&& t) {
479 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
482 inline // for multiple translation units
483 Future<Unit> makeFuture() {
484 return makeFuture(Unit{});
487 // makeFutureWith(Future<T>()) -> Future<T>
489 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
490 typename std::result_of<F()>::type>::type
491 makeFutureWith(F&& func) {
493 typename isFuture<typename std::result_of<F()>::type>::Inner;
496 } catch (std::exception& e) {
497 return makeFuture<InnerType>(
498 exception_wrapper(std::current_exception(), e));
500 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
504 // makeFutureWith(T()) -> Future<T>
505 // makeFutureWith(void()) -> Future<Unit>
507 typename std::enable_if<
508 !(isFuture<typename std::result_of<F()>::type>::value),
509 Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
510 makeFutureWith(F&& func) {
512 typename Unit::Lift<typename std::result_of<F()>::type>::type;
513 return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
519 Future<T> makeFuture(std::exception_ptr const& e) {
520 return makeFuture(Try<T>(e));
524 Future<T> makeFuture(exception_wrapper ew) {
525 return makeFuture(Try<T>(std::move(ew)));
528 template <class T, class E>
529 typename std::enable_if<std::is_base_of<std::exception, E>::value,
531 makeFuture(E const& e) {
532 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
536 Future<T> makeFuture(Try<T>&& t) {
537 return Future<T>(new detail::Core<T>(std::move(t)));
541 Future<Unit> via(Executor* executor, int8_t priority) {
542 return makeFuture().via(executor, priority);
545 // mapSetCallback calls func(i, Try<T>) when every future completes
547 template <class T, class InputIterator, class F>
548 void mapSetCallback(InputIterator first, InputIterator last, F func) {
549 for (size_t i = 0; first != last; ++first, ++i) {
550 first->setCallback_([func, i](Try<T>&& t) {
551 func(i, std::move(t));
556 // collectAll (variadic)
558 template <typename... Fs>
559 typename detail::CollectAllVariadicContext<
560 typename std::decay<Fs>::type::value_type...>::type
561 collectAll(Fs&&... fs) {
562 auto ctx = std::make_shared<detail::CollectAllVariadicContext<
563 typename std::decay<Fs>::type::value_type...>>();
564 detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
565 ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
566 return ctx->p.getFuture();
569 // collectAll (iterator)
571 template <class InputIterator>
574 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
575 collectAll(InputIterator first, InputIterator last) {
577 typename std::iterator_traits<InputIterator>::value_type::value_type T;
579 struct CollectAllContext {
580 CollectAllContext(int n) : results(n) {}
581 ~CollectAllContext() {
582 p.setValue(std::move(results));
584 Promise<std::vector<Try<T>>> p;
585 std::vector<Try<T>> results;
588 auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
589 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
590 ctx->results[i] = std::move(t);
592 return ctx->p.getFuture();
595 // collect (iterator)
599 template <typename T>
600 struct CollectContext {
602 explicit Nothing(int /* n */) {}
605 using Result = typename std::conditional<
606 std::is_void<T>::value,
608 std::vector<T>>::type;
610 using InternalResult = typename std::conditional<
611 std::is_void<T>::value,
613 std::vector<Optional<T>>>::type;
615 explicit CollectContext(int n) : result(n) {}
617 if (!threw.exchange(true)) {
618 // map Optional<T> -> T
619 std::vector<T> finalResult;
620 finalResult.reserve(result.size());
621 std::transform(result.begin(), result.end(),
622 std::back_inserter(finalResult),
623 [](Optional<T>& o) { return std::move(o.value()); });
624 p.setValue(std::move(finalResult));
627 inline void setPartialResult(size_t i, Try<T>& t) {
628 result[i] = std::move(t.value());
631 InternalResult result;
632 std::atomic<bool> threw {false};
637 template <class InputIterator>
638 Future<typename detail::CollectContext<
639 typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
640 collect(InputIterator first, InputIterator last) {
642 typename std::iterator_traits<InputIterator>::value_type::value_type T;
644 auto ctx = std::make_shared<detail::CollectContext<T>>(
645 std::distance(first, last));
646 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
647 if (t.hasException()) {
648 if (!ctx->threw.exchange(true)) {
649 ctx->p.setException(std::move(t.exception()));
651 } else if (!ctx->threw) {
652 ctx->setPartialResult(i, t);
655 return ctx->p.getFuture();
658 // collect (variadic)
660 template <typename... Fs>
661 typename detail::CollectVariadicContext<
662 typename std::decay<Fs>::type::value_type...>::type
663 collect(Fs&&... fs) {
664 auto ctx = std::make_shared<detail::CollectVariadicContext<
665 typename std::decay<Fs>::type::value_type...>>();
666 detail::collectVariadicHelper<detail::CollectVariadicContext>(
667 ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
668 return ctx->p.getFuture();
671 // collectAny (iterator)
673 template <class InputIterator>
678 std::iterator_traits<InputIterator>::value_type::value_type>>>
679 collectAny(InputIterator first, InputIterator last) {
681 typename std::iterator_traits<InputIterator>::value_type::value_type T;
683 struct CollectAnyContext {
684 CollectAnyContext() {};
685 Promise<std::pair<size_t, Try<T>>> p;
686 std::atomic<bool> done {false};
689 auto ctx = std::make_shared<CollectAnyContext>();
690 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
691 if (!ctx->done.exchange(true)) {
692 ctx->p.setValue(std::make_pair(i, std::move(t)));
695 return ctx->p.getFuture();
698 // collectN (iterator)
700 template <class InputIterator>
701 Future<std::vector<std::pair<size_t, Try<typename
702 std::iterator_traits<InputIterator>::value_type::value_type>>>>
703 collectN(InputIterator first, InputIterator last, size_t n) {
705 std::iterator_traits<InputIterator>::value_type::value_type T;
706 typedef std::vector<std::pair<size_t, Try<T>>> V;
708 struct CollectNContext {
710 std::atomic<size_t> completed = {0};
713 auto ctx = std::make_shared<CollectNContext>();
715 if (size_t(std::distance(first, last)) < n) {
716 ctx->p.setException(std::runtime_error("Not enough futures"));
718 // for each completed Future, increase count and add to vector, until we
719 // have n completed futures at which point we fulfil our Promise with the
721 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
722 auto c = ++ctx->completed;
724 assert(ctx->v.size() < n);
725 ctx->v.emplace_back(i, std::move(t));
727 ctx->p.setTry(Try<V>(std::move(ctx->v)));
733 return ctx->p.getFuture();
738 template <class It, class T, class F>
739 Future<T> reduce(It first, It last, T&& initial, F&& func) {
741 return makeFuture(std::move(initial));
744 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
746 typename std::conditional<detail::callableWith<F, T&&, Try<ItT>&&>::value,
749 typedef isTry<Arg> IsTry;
751 auto sfunc = std::make_shared<F>(std::move(func));
753 auto f = first->then(
754 [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
756 std::move(minitial), head.template get<IsTry::value, Arg&&>());
759 for (++first; first != last; ++first) {
760 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
761 return (*sfunc)(std::move(std::get<0>(t).value()),
762 // Either return a ItT&& or a Try<ItT>&& depending
763 // on the type of the argument of func.
764 std::get<1>(t).template get<IsTry::value, Arg&&>());
771 // window (collection)
773 template <class Collection, class F, class ItT, class Result>
774 std::vector<Future<Result>>
775 window(Collection input, F func, size_t n) {
776 struct WindowContext {
777 WindowContext(Collection&& i, F&& fn)
778 : input_(std::move(i)), promises_(input_.size()),
781 std::atomic<size_t> i_ {0};
783 std::vector<Promise<Result>> promises_;
786 static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
787 size_t i = ctx->i_++;
788 if (i < ctx->input_.size()) {
789 // Using setCallback_ directly since we don't need the Future
790 ctx->func_(std::move(ctx->input_[i])).setCallback_(
791 // ctx is captured by value
792 [ctx, i](Try<Result>&& t) {
793 ctx->promises_[i].setTry(std::move(t));
794 // Chain another future onto this one
795 spawn(std::move(ctx));
801 auto max = std::min(n, input.size());
803 auto ctx = std::make_shared<WindowContext>(
804 std::move(input), std::move(func));
806 for (size_t i = 0; i < max; ++i) {
807 // Start the first n Futures
808 WindowContext::spawn(ctx);
811 std::vector<Future<Result>> futures;
812 futures.reserve(ctx->promises_.size());
813 for (auto& promise : ctx->promises_) {
814 futures.emplace_back(promise.getFuture());
823 template <class I, class F>
824 Future<I> Future<T>::reduce(I&& initial, F&& func) {
826 minitial = std::forward<I>(initial),
827 mfunc = std::forward<F>(func)
829 auto ret = std::move(minitial);
830 for (auto& val : vals) {
831 ret = mfunc(std::move(ret), std::move(val));
837 // unorderedReduce (iterator)
839 template <class It, class T, class F, class ItT, class Arg>
840 Future<T> unorderedReduce(It first, It last, T initial, F func) {
842 return makeFuture(std::move(initial));
845 typedef isTry<Arg> IsTry;
847 struct UnorderedReduceContext {
848 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
849 : lock_(), memo_(makeFuture<T>(std::move(memo))),
850 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
852 folly::MicroSpinLock lock_; // protects memo_ and numThens_
855 size_t numThens_; // how many Futures completed and called .then()
856 size_t numFutures_; // how many Futures in total
860 auto ctx = std::make_shared<UnorderedReduceContext>(
861 std::move(initial), std::move(func), std::distance(first, last));
866 [ctx](size_t /* i */, Try<ItT>&& t) {
867 // Futures can be completed in any order, simultaneously.
868 // To make this non-blocking, we create a new Future chain in
869 // the order of completion to reduce the values.
870 // The spinlock just protects chaining a new Future, not actually
871 // executing the reduce, which should be really fast.
872 folly::MSLGuard lock(ctx->lock_);
874 ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
875 // Either return a ItT&& or a Try<ItT>&& depending
876 // on the type of the argument of func.
877 return ctx->func_(std::move(v),
878 mt.template get<IsTry::value, Arg&&>());
880 if (++ctx->numThens_ == ctx->numFutures_) {
881 // After reducing the value of the last Future, fulfill the Promise
882 ctx->memo_.setCallback_(
883 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
887 return ctx->promise_.getFuture();
893 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
894 return within(dur, TimedOut(), tk);
899 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
902 Context(E ex) : exception(std::move(ex)), promise() {}
904 Future<Unit> thisFuture;
906 std::atomic<bool> token {false};
909 std::shared_ptr<Timekeeper> tks;
911 tks = folly::detail::getTimekeeperSingleton();
912 tk = DCHECK_NOTNULL(tks.get());
915 auto ctx = std::make_shared<Context>(std::move(e));
917 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
918 // TODO: "this" completed first, cancel "after"
919 if (ctx->token.exchange(true) == false) {
920 ctx->promise.setTry(std::move(t));
924 tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
925 // "after" completed first, cancel "this"
926 ctx->thisFuture.raise(TimedOut());
927 if (ctx->token.exchange(true) == false) {
928 if (t.hasException()) {
929 ctx->promise.setException(std::move(t.exception()));
931 ctx->promise.setException(std::move(ctx->exception));
936 return ctx->promise.getFuture().via(getExecutor());
942 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
943 return collectAll(*this, futures::sleep(dur, tk))
944 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
945 Try<T>& t = std::get<0>(tup);
946 return makeFuture<T>(std::move(t));
953 void waitImpl(Future<T>& f) {
954 // short-circuit if there's nothing to do
955 if (f.isReady()) return;
957 FutureBatonType baton;
958 f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
964 void waitImpl(Future<T>& f, Duration dur) {
965 // short-circuit if there's nothing to do
971 auto ret = promise.getFuture();
972 auto baton = std::make_shared<FutureBatonType>();
973 f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
974 promise.setTry(std::move(t));
978 if (baton->timed_wait(dur)) {
984 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
985 // Set callback so to ensure that the via executor has something on it
986 // so that once the preceding future triggers this callback, drive will
987 // always have a callback to satisfy it
990 f = f.then([](T&& t) { return std::move(t); });
991 while (!f.isReady()) {
1000 Future<T>& Future<T>::wait() & {
1001 detail::waitImpl(*this);
1006 Future<T>&& Future<T>::wait() && {
1007 detail::waitImpl(*this);
1008 return std::move(*this);
1012 Future<T>& Future<T>::wait(Duration dur) & {
1013 detail::waitImpl(*this, dur);
1018 Future<T>&& Future<T>::wait(Duration dur) && {
1019 detail::waitImpl(*this, dur);
1020 return std::move(*this);
1024 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1025 detail::waitViaImpl(*this, e);
1030 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1031 detail::waitViaImpl(*this, e);
1032 return std::move(*this);
1036 T Future<T>::get() {
1037 return std::move(wait().value());
1041 T Future<T>::get(Duration dur) {
1044 return std::move(value());
1051 T Future<T>::getVia(DrivableExecutor* e) {
1052 return std::move(waitVia(e).value());
1058 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1059 return t1.value() == t2.value();
1065 Future<bool> Future<T>::willEqual(Future<T>& f) {
1066 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1067 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1068 return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1077 Future<T> Future<T>::filter(F&& predicate) {
1078 return this->then([p = std::forward<F>(predicate)](T val) {
1079 T const& valConstRef = val;
1080 if (!p(valConstRef)) {
1081 throw PredicateDoesNotObtain();
1088 template <class Callback>
1089 auto Future<T>::thenMulti(Callback&& fn)
1090 -> decltype(this->then(std::forward<Callback>(fn))) {
1091 // thenMulti with one callback is just a then
1092 return then(std::forward<Callback>(fn));
1096 template <class Callback, class... Callbacks>
1097 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1098 -> decltype(this->then(std::forward<Callback>(fn)).
1099 thenMulti(std::forward<Callbacks>(fns)...)) {
1100 // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1101 return then(std::forward<Callback>(fn)).
1102 thenMulti(std::forward<Callbacks>(fns)...);
1106 template <class Callback, class... Callbacks>
1107 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1109 -> decltype(this->then(std::forward<Callback>(fn)).
1110 thenMulti(std::forward<Callbacks>(fns)...)) {
1111 // thenMultiExecutor with two callbacks is
1112 // via(x).then(a).thenMulti(b, ...).via(oldX)
1113 auto oldX = getExecutor();
1115 return then(std::forward<Callback>(fn)).
1116 thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1120 template <class Callback>
1121 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1122 -> decltype(this->then(std::forward<Callback>(fn))) {
1123 // thenMulti with one callback is just a then with an executor
1124 return then(x, std::forward<Callback>(fn));
1128 inline Future<Unit> when(bool p, F&& thunk) {
1129 return p ? std::forward<F>(thunk)().unit() : makeFuture();
1132 template <class P, class F>
1133 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1135 auto future = thunk();
1136 return future.then([
1137 predicate = std::forward<P>(predicate),
1138 thunk = std::forward<F>(thunk)
1140 return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1143 return makeFuture();
1147 Future<Unit> times(const int n, F&& thunk) {
1148 return folly::whileDo(
1149 [ n, count = folly::make_unique<std::atomic<int>>(0) ]() mutable {
1150 return count->fetch_add(1) < n;
1152 std::forward<F>(thunk));
1156 template <class It, class F, class ItT, class Result>
1157 std::vector<Future<Result>> map(It first, It last, F func) {
1158 std::vector<Future<Result>> results;
1159 for (auto it = first; it != last; it++) {
1160 results.push_back(it->then(func));
1170 struct retrying_policy_raw_tag {};
1171 struct retrying_policy_fut_tag {};
1173 template <class Policy>
1174 struct retrying_policy_traits {
1175 using ew = exception_wrapper;
1176 FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1177 template <class Ret>
1178 using has_op = typename std::integral_constant<bool,
1179 has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1180 has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1181 using is_raw = has_op<bool>;
1182 using is_fut = has_op<Future<bool>>;
1183 using tag = typename std::conditional<
1184 is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1185 is_fut::value, retrying_policy_fut_tag, void>::type>::type;
1188 template <class Policy, class FF>
1189 typename std::result_of<FF(size_t)>::type
1190 retrying(size_t k, Policy&& p, FF&& ff) {
1191 using F = typename std::result_of<FF(size_t)>::type;
1192 using T = typename F::value_type;
1195 [ k, pm = std::forward<Policy>(p), ffm = std::forward<FF>(ff) ](
1196 exception_wrapper x) mutable {
1199 [ k, xm = std::move(x), pm = std::move(pm), ffm = std::move(ffm) ](
1201 return r ? retrying(k, std::move(pm), std::move(ffm))
1202 : makeFuture<T>(std::move(xm));
1207 template <class Policy, class FF>
1208 typename std::result_of<FF(size_t)>::type
1209 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1210 auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
1211 return makeFuture<bool>(pm(k, x));
1213 return retrying(0, std::move(q), std::forward<FF>(ff));
1216 template <class Policy, class FF>
1217 typename std::result_of<FF(size_t)>::type
1218 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1219 return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1222 // jittered exponential backoff, clamped to [backoff_min, backoff_max]
1223 template <class URNG>
1224 Duration retryingJitteredExponentialBackoffDur(
1226 Duration backoff_min,
1227 Duration backoff_max,
1228 double jitter_param,
1231 auto dist = std::normal_distribution<double>(0.0, jitter_param);
1232 auto jitter = std::exp(dist(rng));
1233 auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1234 return std::max(backoff_min, std::min(backoff_max, backoff));
1237 template <class Policy, class URNG>
1238 std::function<Future<bool>(size_t, const exception_wrapper&)>
1239 retryingPolicyCappedJitteredExponentialBackoff(
1241 Duration backoff_min,
1242 Duration backoff_max,
1243 double jitter_param,
1247 pm = std::forward<Policy>(p),
1252 rngp = std::forward<URNG>(rng)
1253 ](size_t n, const exception_wrapper& ex) mutable {
1254 if (n == max_tries) {
1255 return makeFuture(false);
1257 return pm(n, ex).then(
1258 [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
1261 return makeFuture(false);
1263 auto backoff = detail::retryingJitteredExponentialBackoffDur(
1264 n, backoff_min, backoff_max, jitter_param, rngp);
1265 return futures::sleep(backoff).then([] { return true; });
1270 template <class Policy, class URNG>
1271 std::function<Future<bool>(size_t, const exception_wrapper&)>
1272 retryingPolicyCappedJitteredExponentialBackoff(
1274 Duration backoff_min,
1275 Duration backoff_max,
1276 double jitter_param,
1279 retrying_policy_raw_tag) {
1280 auto q = [pm = std::forward<Policy>(p)](
1281 size_t n, const exception_wrapper& e) {
1282 return makeFuture(pm(n, e));
1284 return retryingPolicyCappedJitteredExponentialBackoff(
1289 std::forward<URNG>(rng),
1293 template <class Policy, class URNG>
1294 std::function<Future<bool>(size_t, const exception_wrapper&)>
1295 retryingPolicyCappedJitteredExponentialBackoff(
1297 Duration backoff_min,
1298 Duration backoff_max,
1299 double jitter_param,
1302 retrying_policy_fut_tag) {
1303 return retryingPolicyCappedJitteredExponentialBackoff(
1308 std::forward<URNG>(rng),
1309 std::forward<Policy>(p));
1313 template <class Policy, class FF>
1314 typename std::result_of<FF(size_t)>::type
1315 retrying(Policy&& p, FF&& ff) {
1316 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1317 return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1321 std::function<bool(size_t, const exception_wrapper&)>
1322 retryingPolicyBasic(
1324 return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1327 template <class Policy, class URNG>
1328 std::function<Future<bool>(size_t, const exception_wrapper&)>
1329 retryingPolicyCappedJitteredExponentialBackoff(
1331 Duration backoff_min,
1332 Duration backoff_max,
1333 double jitter_param,
1336 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1337 return detail::retryingPolicyCappedJitteredExponentialBackoff(
1342 std::forward<URNG>(rng),
1343 std::forward<Policy>(p),
1348 std::function<Future<bool>(size_t, const exception_wrapper&)>
1349 retryingPolicyCappedJitteredExponentialBackoff(
1351 Duration backoff_min,
1352 Duration backoff_max,
1353 double jitter_param) {
1354 auto p = [](size_t, const exception_wrapper&) { return true; };
1355 return retryingPolicyCappedJitteredExponentialBackoff(
1366 // Instantiate the most common Future types to save compile time
1367 extern template class Future<Unit>;
1368 extern template class Future<bool>;
1369 extern template class Future<int>;
1370 extern template class Future<int64_t>;
1371 extern template class Future<std::string>;
1372 extern template class Future<double>;
1374 } // namespace folly