2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/futures/detail/Core.h>
28 #include <folly/futures/Timekeeper.h>
30 #if FOLLY_MOBILE || defined(__APPLE__)
31 #define FOLLY_FUTURE_USING_FIBER 0
33 #define FOLLY_FUTURE_USING_FIBER 1
34 #include <folly/fibers/Baton.h>
42 #if FOLLY_FUTURE_USING_FIBER
43 typedef folly::fibers::Baton FutureBatonType;
45 typedef folly::Baton<> FutureBatonType;
50 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
52 // Guarantees that the stored functor is destructed before the stored promise
53 // may be fulfilled. Assumes the stored functor to be noexcept-destructible.
54 template <typename T, typename F>
55 class CoreCallbackState {
57 template <typename FF>
58 CoreCallbackState(Promise<T>&& promise, FF&& func) noexcept(
59 noexcept(F(std::declval<FF>())))
60 : func_(std::forward<FF>(func)), promise_(std::move(promise)) {
61 assert(before_barrier());
64 CoreCallbackState(CoreCallbackState&& that) noexcept(
65 noexcept(F(std::declval<F>()))) {
66 if (that.before_barrier()) {
67 new (&func_) F(std::move(that.func_));
68 promise_ = that.stealPromise();
72 CoreCallbackState& operator=(CoreCallbackState&&) = delete;
74 ~CoreCallbackState() {
75 if (before_barrier()) {
80 template <typename... Args>
81 auto invoke(Args&&... args) noexcept(
82 noexcept(std::declval<F&&>()(std::declval<Args&&>()...))) {
83 assert(before_barrier());
84 return std::move(func_)(std::forward<Args>(args)...);
87 template <typename... Args>
88 auto tryInvoke(Args&&... args) noexcept {
89 return makeTryWith([&] { return invoke(std::forward<Args>(args)...); });
92 void setTry(Try<T>&& t) {
93 stealPromise().setTry(std::move(t));
96 void setException(exception_wrapper&& ew) {
97 stealPromise().setException(std::move(ew));
100 Promise<T> stealPromise() noexcept {
101 assert(before_barrier());
103 return std::move(promise_);
107 bool before_barrier() const noexcept {
108 return !promise_.isFulfilled();
114 Promise<T> promise_{Promise<T>::makeEmpty()};
117 template <typename T, typename F>
118 inline auto makeCoreCallbackState(Promise<T>&& p, F&& f) noexcept(
119 noexcept(CoreCallbackState<T, _t<std::decay<F>>>(
120 std::declval<Promise<T>&&>(),
121 std::declval<F&&>()))) {
122 return CoreCallbackState<T, _t<std::decay<F>>>(
123 std::move(p), std::forward<F>(f));
128 Future<T> Future<T>::makeEmpty() {
129 return Future<T>(detail::EmptyConstruct{});
133 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
134 other.core_ = nullptr;
138 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
139 std::swap(core_, other.core_);
146 typename std::enable_if<
147 !std::is_same<T, typename std::decay<T2>::type>::value &&
148 std::is_constructible<T, T2&&>::value &&
149 std::is_convertible<T2&&, T>::value,
151 Future<T>::Future(Future<T2>&& other)
152 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
157 typename std::enable_if<
158 !std::is_same<T, typename std::decay<T2>::type>::value &&
159 std::is_constructible<T, T2&&>::value &&
160 !std::is_convertible<T2&&, T>::value,
162 Future<T>::Future(Future<T2>&& other)
163 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
168 typename std::enable_if<
169 !std::is_same<T, typename std::decay<T2>::type>::value &&
170 std::is_constructible<T, T2&&>::value,
172 Future<T>& Future<T>::operator=(Future<T2>&& other) {
174 std::move(other).then([](T2&& v) { return T(std::move(v)); }));
178 template <class T2, typename>
179 Future<T>::Future(T2&& val)
180 : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
183 template <typename T2>
184 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
185 : core_(new detail::Core<T>(Try<T>(T()))) {}
190 typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
192 Future<T>::Future(in_place_t, Args&&... args)
193 : core_(new detail::Core<T>(in_place, std::forward<Args>(args)...)) {}
196 Future<T>::~Future() {
201 void Future<T>::detach() {
203 core_->detachFuture();
209 void Future<T>::throwIfInvalid() const {
216 void Future<T>::setCallback_(F&& func) {
218 core_->setCallback(std::forward<F>(func));
225 typename std::enable_if<isFuture<F>::value,
226 Future<typename isFuture<T>::Inner>>::type
227 Future<T>::unwrap() {
228 return then([](Future<typename isFuture<T>::Inner> internal_future) {
229 return internal_future;
235 // Variant: returns a value
236 // e.g. f.then([](Try<T>&& t){ return t.value(); });
238 template <typename F, typename R, bool isTry, typename... Args>
239 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
240 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
241 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
242 typedef typename R::ReturnsFuture::Inner B;
247 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
249 // grab the Future now before we lose our handle on the Promise
250 auto f = p.getFuture();
251 f.core_->setExecutorNoLock(getExecutor());
253 /* This is a bit tricky.
255 We can't just close over *this in case this Future gets moved. So we
256 make a new dummy Future. We could figure out something more
257 sophisticated that avoids making a new Future object when it can, as an
258 optimization. But this is correct.
260 core_ can't be moved, it is explicitly disallowed (as is copying). But
261 if there's ever a reason to allow it, this is one place that makes that
262 assumption and would need to be fixed. We use a standard shared pointer
263 for core_ (by copying it in), which means in essence obj holds a shared
264 pointer to itself. But this shouldn't leak because Promise will not
265 outlive the continuation, because Promise will setException() with a
266 broken Promise if it is destructed before completed. We could use a
267 weak pointer but it would have to be converted to a shared pointer when
268 func is executed (because the Future returned by func may possibly
269 persist beyond the callback, if it gets moved), and so it is an
270 optimization to just make it shared from the get-go.
272 Two subtle but important points about this design. detail::Core has no
273 back pointers to Future or Promise, so if Future or Promise get moved
274 (and they will be moved in performant code) we don't have to do
275 anything fancy. And because we store the continuation in the
276 detail::Core, not in the Future, we can execute the continuation even
277 after the Future has gone out of scope. This is an intentional design
278 decision. It is likely we will want to be able to cancel a continuation
279 in some circumstances, but I think it should be explicit not implicit
280 in the destruction of the Future used to create it.
283 [state = detail::makeCoreCallbackState(
284 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
285 if (!isTry && t.hasException()) {
286 state.setException(std::move(t.exception()));
288 state.setTry(makeTryWith(
289 [&] { return state.invoke(t.template get<isTry, Args>()...); }));
296 // Variant: returns a Future
297 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
299 template <typename F, typename R, bool isTry, typename... Args>
300 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
301 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
302 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
303 typedef typename R::ReturnsFuture::Inner B;
308 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
310 // grab the Future now before we lose our handle on the Promise
311 auto f = p.getFuture();
312 f.core_->setExecutorNoLock(getExecutor());
315 [state = detail::makeCoreCallbackState(
316 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
317 if (!isTry && t.hasException()) {
318 state.setException(std::move(t.exception()));
320 auto tf2 = state.tryInvoke(t.template get<isTry, Args>()...);
321 if (tf2.hasException()) {
322 state.setException(std::move(tf2.exception()));
324 tf2->setCallback_([p = state.stealPromise()](Try<B> && b) mutable {
325 p.setTry(std::move(b));
334 template <typename T>
335 template <typename R, typename Caller, typename... Args>
336 Future<typename isFuture<R>::Inner>
337 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
338 typedef typename std::remove_cv<
339 typename std::remove_reference<
340 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
341 return then([instance, func](Try<T>&& t){
342 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
347 Future<Unit> Future<T>::then() {
348 return then([] () {});
351 // onError where the callback returns T
354 typename std::enable_if<
355 !detail::callableWith<F, exception_wrapper>::value &&
356 !detail::Extract<F>::ReturnsFuture::value,
358 Future<T>::onError(F&& func) {
359 typedef std::remove_reference_t<typename detail::Extract<F>::FirstArg> Exn;
361 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
362 "Return type of onError callback must be T or Future<T>");
365 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
366 auto f = p.getFuture();
369 [state = detail::makeCoreCallbackState(
370 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
371 if (auto e = t.template tryGetExceptionObject<Exn>()) {
372 state.setTry(makeTryWith([&] { return state.invoke(*e); }));
374 state.setTry(std::move(t));
381 // onError where the callback returns Future<T>
384 typename std::enable_if<
385 !detail::callableWith<F, exception_wrapper>::value &&
386 detail::Extract<F>::ReturnsFuture::value,
388 Future<T>::onError(F&& func) {
390 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
391 "Return type of onError callback must be T or Future<T>");
392 typedef std::remove_reference_t<typename detail::Extract<F>::FirstArg> Exn;
395 auto f = p.getFuture();
398 [state = detail::makeCoreCallbackState(
399 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
400 if (auto e = t.template tryGetExceptionObject<Exn>()) {
401 auto tf2 = state.tryInvoke(*e);
402 if (tf2.hasException()) {
403 state.setException(std::move(tf2.exception()));
405 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
406 p.setTry(std::move(t3));
410 state.setTry(std::move(t));
419 Future<T> Future<T>::ensure(F&& func) {
420 return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
422 return makeFuture(std::move(t));
428 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
429 return within(dur, tk).onError([funcw = std::forward<F>(func)](
430 TimedOut const&) { return std::move(funcw)(); });
435 typename std::enable_if<detail::callableWith<F, exception_wrapper>::value &&
436 detail::Extract<F>::ReturnsFuture::value,
438 Future<T>::onError(F&& func) {
440 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
441 "Return type of onError callback must be T or Future<T>");
444 auto f = p.getFuture();
446 [state = detail::makeCoreCallbackState(
447 std::move(p), std::forward<F>(func))](Try<T> t) mutable {
448 if (t.hasException()) {
449 auto tf2 = state.tryInvoke(std::move(t.exception()));
450 if (tf2.hasException()) {
451 state.setException(std::move(tf2.exception()));
453 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
454 p.setTry(std::move(t3));
458 state.setTry(std::move(t));
465 // onError(exception_wrapper) that returns T
468 typename std::enable_if<
469 detail::callableWith<F, exception_wrapper>::value &&
470 !detail::Extract<F>::ReturnsFuture::value,
472 Future<T>::onError(F&& func) {
474 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
475 "Return type of onError callback must be T or Future<T>");
478 auto f = p.getFuture();
480 [state = detail::makeCoreCallbackState(
481 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
482 if (t.hasException()) {
483 state.setTry(makeTryWith(
484 [&] { return state.invoke(std::move(t.exception())); }));
486 state.setTry(std::move(t));
494 typename std::add_lvalue_reference<T>::type Future<T>::value() {
497 return core_->getTry().value();
501 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
504 return core_->getTry().value();
508 Try<T>& Future<T>::getTry() {
511 return core_->getTry();
515 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
516 return waitVia(e).getTry();
520 Optional<Try<T>> Future<T>::poll() {
522 if (core_->ready()) {
523 o = std::move(core_->getTry());
529 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
532 setExecutor(executor, priority);
534 return std::move(*this);
538 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
542 auto f = p.getFuture();
543 then([p = std::move(p)](Try<T> && t) mutable { p.setTry(std::move(t)); });
544 return std::move(f).via(executor, priority);
547 template <class Func>
548 auto via(Executor* x, Func&& func)
549 -> Future<typename isFuture<decltype(std::declval<Func>()())>::Inner> {
550 // TODO make this actually more performant. :-P #7260175
551 return via(x).then(std::forward<Func>(func));
555 bool Future<T>::isReady() const {
557 return core_->ready();
561 bool Future<T>::hasValue() {
562 return getTry().hasValue();
566 bool Future<T>::hasException() {
567 return getTry().hasException();
571 void Future<T>::raise(exception_wrapper exception) {
572 core_->raise(std::move(exception));
576 Future<T>::Future(detail::EmptyConstruct) noexcept
582 Future<typename std::decay<T>::type> makeFuture(T&& t) {
583 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
586 inline // for multiple translation units
587 Future<Unit> makeFuture() {
588 return makeFuture(Unit{});
591 // makeFutureWith(Future<T>()) -> Future<T>
593 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
594 typename std::result_of<F()>::type>::type
595 makeFutureWith(F&& func) {
597 typename isFuture<typename std::result_of<F()>::type>::Inner;
599 return std::forward<F>(func)();
600 } catch (std::exception& e) {
601 return makeFuture<InnerType>(
602 exception_wrapper(std::current_exception(), e));
604 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
608 // makeFutureWith(T()) -> Future<T>
609 // makeFutureWith(void()) -> Future<Unit>
611 typename std::enable_if<
612 !(isFuture<typename std::result_of<F()>::type>::value),
613 Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
614 makeFutureWith(F&& func) {
616 typename Unit::Lift<typename std::result_of<F()>::type>::type;
617 return makeFuture<LiftedResult>(
618 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
622 Future<T> makeFuture(std::exception_ptr const& e) {
623 return makeFuture(Try<T>(e));
627 Future<T> makeFuture(exception_wrapper ew) {
628 return makeFuture(Try<T>(std::move(ew)));
631 template <class T, class E>
632 typename std::enable_if<std::is_base_of<std::exception, E>::value,
634 makeFuture(E const& e) {
635 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
639 Future<T> makeFuture(Try<T>&& t) {
640 return Future<T>(new detail::Core<T>(std::move(t)));
644 Future<Unit> via(Executor* executor, int8_t priority) {
645 return makeFuture().via(executor, priority);
648 // mapSetCallback calls func(i, Try<T>) when every future completes
650 template <class T, class InputIterator, class F>
651 void mapSetCallback(InputIterator first, InputIterator last, F func) {
652 for (size_t i = 0; first != last; ++first, ++i) {
653 first->setCallback_([func, i](Try<T>&& t) {
654 func(i, std::move(t));
659 // collectAll (variadic)
661 template <typename... Fs>
662 typename detail::CollectAllVariadicContext<
663 typename std::decay<Fs>::type::value_type...>::type
664 collectAll(Fs&&... fs) {
665 auto ctx = std::make_shared<detail::CollectAllVariadicContext<
666 typename std::decay<Fs>::type::value_type...>>();
667 detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
668 ctx, std::forward<Fs>(fs)...);
669 return ctx->p.getFuture();
672 // collectAll (iterator)
674 template <class InputIterator>
677 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
678 collectAll(InputIterator first, InputIterator last) {
680 typename std::iterator_traits<InputIterator>::value_type::value_type T;
682 struct CollectAllContext {
683 CollectAllContext(size_t n) : results(n) {}
684 ~CollectAllContext() {
685 p.setValue(std::move(results));
687 Promise<std::vector<Try<T>>> p;
688 std::vector<Try<T>> results;
692 std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
693 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
694 ctx->results[i] = std::move(t);
696 return ctx->p.getFuture();
699 // collect (iterator)
703 template <typename T>
704 struct CollectContext {
706 explicit Nothing(int /* n */) {}
709 using Result = typename std::conditional<
710 std::is_void<T>::value,
712 std::vector<T>>::type;
714 using InternalResult = typename std::conditional<
715 std::is_void<T>::value,
717 std::vector<Optional<T>>>::type;
719 explicit CollectContext(size_t n) : result(n) {}
721 if (!threw.exchange(true)) {
722 // map Optional<T> -> T
723 std::vector<T> finalResult;
724 finalResult.reserve(result.size());
725 std::transform(result.begin(), result.end(),
726 std::back_inserter(finalResult),
727 [](Optional<T>& o) { return std::move(o.value()); });
728 p.setValue(std::move(finalResult));
731 inline void setPartialResult(size_t i, Try<T>& t) {
732 result[i] = std::move(t.value());
735 InternalResult result;
736 std::atomic<bool> threw {false};
741 template <class InputIterator>
742 Future<typename detail::CollectContext<
743 typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
744 collect(InputIterator first, InputIterator last) {
746 typename std::iterator_traits<InputIterator>::value_type::value_type T;
748 auto ctx = std::make_shared<detail::CollectContext<T>>(
749 std::distance(first, last));
750 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
751 if (t.hasException()) {
752 if (!ctx->threw.exchange(true)) {
753 ctx->p.setException(std::move(t.exception()));
755 } else if (!ctx->threw) {
756 ctx->setPartialResult(i, t);
759 return ctx->p.getFuture();
762 // collect (variadic)
764 template <typename... Fs>
765 typename detail::CollectVariadicContext<
766 typename std::decay<Fs>::type::value_type...>::type
767 collect(Fs&&... fs) {
768 auto ctx = std::make_shared<detail::CollectVariadicContext<
769 typename std::decay<Fs>::type::value_type...>>();
770 detail::collectVariadicHelper<detail::CollectVariadicContext>(
771 ctx, std::forward<Fs>(fs)...);
772 return ctx->p.getFuture();
775 // collectAny (iterator)
777 template <class InputIterator>
782 std::iterator_traits<InputIterator>::value_type::value_type>>>
783 collectAny(InputIterator first, InputIterator last) {
785 typename std::iterator_traits<InputIterator>::value_type::value_type T;
787 struct CollectAnyContext {
788 CollectAnyContext() {}
789 Promise<std::pair<size_t, Try<T>>> p;
790 std::atomic<bool> done {false};
793 auto ctx = std::make_shared<CollectAnyContext>();
794 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
795 if (!ctx->done.exchange(true)) {
796 ctx->p.setValue(std::make_pair(i, std::move(t)));
799 return ctx->p.getFuture();
802 // collectAnyWithoutException (iterator)
804 template <class InputIterator>
807 typename std::iterator_traits<InputIterator>::value_type::value_type>>
808 collectAnyWithoutException(InputIterator first, InputIterator last) {
810 typename std::iterator_traits<InputIterator>::value_type::value_type T;
812 struct CollectAnyWithoutExceptionContext {
813 CollectAnyWithoutExceptionContext(){}
814 Promise<std::pair<size_t, T>> p;
815 std::atomic<bool> done{false};
816 std::atomic<size_t> nFulfilled{0};
820 auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
821 ctx->nTotal = size_t(std::distance(first, last));
823 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
824 if (!t.hasException() && !ctx->done.exchange(true)) {
825 ctx->p.setValue(std::make_pair(i, std::move(t.value())));
826 } else if (++ctx->nFulfilled == ctx->nTotal) {
827 ctx->p.setException(t.exception());
830 return ctx->p.getFuture();
833 // collectN (iterator)
835 template <class InputIterator>
836 Future<std::vector<std::pair<size_t, Try<typename
837 std::iterator_traits<InputIterator>::value_type::value_type>>>>
838 collectN(InputIterator first, InputIterator last, size_t n) {
840 std::iterator_traits<InputIterator>::value_type::value_type T;
841 typedef std::vector<std::pair<size_t, Try<T>>> V;
843 struct CollectNContext {
845 std::atomic<size_t> completed = {0};
848 auto ctx = std::make_shared<CollectNContext>();
850 if (size_t(std::distance(first, last)) < n) {
851 ctx->p.setException(std::runtime_error("Not enough futures"));
853 // for each completed Future, increase count and add to vector, until we
854 // have n completed futures at which point we fulfil our Promise with the
856 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
857 auto c = ++ctx->completed;
859 assert(ctx->v.size() < n);
860 ctx->v.emplace_back(i, std::move(t));
862 ctx->p.setTry(Try<V>(std::move(ctx->v)));
868 return ctx->p.getFuture();
873 template <class It, class T, class F>
874 Future<T> reduce(It first, It last, T&& initial, F&& func) {
876 return makeFuture(std::move(initial));
879 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
881 typename std::conditional<detail::callableWith<F, T&&, Try<ItT>&&>::value,
884 typedef isTry<Arg> IsTry;
886 auto sfunc = std::make_shared<F>(std::move(func));
888 auto f = first->then(
889 [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
891 std::move(minitial), head.template get<IsTry::value, Arg&&>());
894 for (++first; first != last; ++first) {
895 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
896 return (*sfunc)(std::move(std::get<0>(t).value()),
897 // Either return a ItT&& or a Try<ItT>&& depending
898 // on the type of the argument of func.
899 std::get<1>(t).template get<IsTry::value, Arg&&>());
906 // window (collection)
908 template <class Collection, class F, class ItT, class Result>
909 std::vector<Future<Result>>
910 window(Collection input, F func, size_t n) {
911 struct WindowContext {
912 WindowContext(Collection&& i, F&& fn)
913 : input_(std::move(i)), promises_(input_.size()),
916 std::atomic<size_t> i_ {0};
918 std::vector<Promise<Result>> promises_;
921 static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
922 size_t i = ctx->i_++;
923 if (i < ctx->input_.size()) {
924 // Using setCallback_ directly since we don't need the Future
925 ctx->func_(std::move(ctx->input_[i])).setCallback_(
926 // ctx is captured by value
927 [ctx, i](Try<Result>&& t) {
928 ctx->promises_[i].setTry(std::move(t));
929 // Chain another future onto this one
930 spawn(std::move(ctx));
936 auto max = std::min(n, input.size());
938 auto ctx = std::make_shared<WindowContext>(
939 std::move(input), std::move(func));
941 for (size_t i = 0; i < max; ++i) {
942 // Start the first n Futures
943 WindowContext::spawn(ctx);
946 std::vector<Future<Result>> futures;
947 futures.reserve(ctx->promises_.size());
948 for (auto& promise : ctx->promises_) {
949 futures.emplace_back(promise.getFuture());
958 template <class I, class F>
959 Future<I> Future<T>::reduce(I&& initial, F&& func) {
961 minitial = std::forward<I>(initial),
962 mfunc = std::forward<F>(func)
964 auto ret = std::move(minitial);
965 for (auto& val : vals) {
966 ret = mfunc(std::move(ret), std::move(val));
972 // unorderedReduce (iterator)
974 template <class It, class T, class F, class ItT, class Arg>
975 Future<T> unorderedReduce(It first, It last, T initial, F func) {
977 return makeFuture(std::move(initial));
980 typedef isTry<Arg> IsTry;
982 struct UnorderedReduceContext {
983 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
984 : lock_(), memo_(makeFuture<T>(std::move(memo))),
985 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
987 folly::MicroSpinLock lock_; // protects memo_ and numThens_
990 size_t numThens_; // how many Futures completed and called .then()
991 size_t numFutures_; // how many Futures in total
995 auto ctx = std::make_shared<UnorderedReduceContext>(
996 std::move(initial), std::move(func), std::distance(first, last));
1001 [ctx](size_t /* i */, Try<ItT>&& t) {
1002 // Futures can be completed in any order, simultaneously.
1003 // To make this non-blocking, we create a new Future chain in
1004 // the order of completion to reduce the values.
1005 // The spinlock just protects chaining a new Future, not actually
1006 // executing the reduce, which should be really fast.
1007 folly::MSLGuard lock(ctx->lock_);
1009 ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
1010 // Either return a ItT&& or a Try<ItT>&& depending
1011 // on the type of the argument of func.
1012 return ctx->func_(std::move(v),
1013 mt.template get<IsTry::value, Arg&&>());
1015 if (++ctx->numThens_ == ctx->numFutures_) {
1016 // After reducing the value of the last Future, fulfill the Promise
1017 ctx->memo_.setCallback_(
1018 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
1022 return ctx->promise_.getFuture();
1028 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
1029 return within(dur, TimedOut(), tk);
1034 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
1037 Context(E ex) : exception(std::move(ex)), promise() {}
1039 Future<Unit> thisFuture;
1041 std::atomic<bool> token {false};
1044 std::shared_ptr<Timekeeper> tks;
1046 tks = folly::detail::getTimekeeperSingleton();
1047 tk = DCHECK_NOTNULL(tks.get());
1050 auto ctx = std::make_shared<Context>(std::move(e));
1052 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
1053 // TODO: "this" completed first, cancel "after"
1054 if (ctx->token.exchange(true) == false) {
1055 ctx->promise.setTry(std::move(t));
1059 tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
1060 // "after" completed first, cancel "this"
1061 ctx->thisFuture.raise(TimedOut());
1062 if (ctx->token.exchange(true) == false) {
1063 if (t.hasException()) {
1064 ctx->promise.setException(std::move(t.exception()));
1066 ctx->promise.setException(std::move(ctx->exception));
1071 return ctx->promise.getFuture().via(getExecutor());
1077 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
1078 return collectAll(*this, futures::sleep(dur, tk))
1079 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
1080 Try<T>& t = std::get<0>(tup);
1081 return makeFuture<T>(std::move(t));
1088 void waitImpl(Future<T>& f) {
1089 // short-circuit if there's nothing to do
1090 if (f.isReady()) return;
1092 FutureBatonType baton;
1093 f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1095 assert(f.isReady());
1099 void waitImpl(Future<T>& f, Duration dur) {
1100 // short-circuit if there's nothing to do
1106 auto ret = promise.getFuture();
1107 auto baton = std::make_shared<FutureBatonType>();
1108 f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1109 promise.setTry(std::move(t));
1113 if (baton->timed_wait(dur)) {
1114 assert(f.isReady());
1119 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1120 // Set callback so to ensure that the via executor has something on it
1121 // so that once the preceding future triggers this callback, drive will
1122 // always have a callback to satisfy it
1125 f = f.via(e).then([](T&& t) { return std::move(t); });
1126 while (!f.isReady()) {
1129 assert(f.isReady());
1135 Future<T>& Future<T>::wait() & {
1136 detail::waitImpl(*this);
1141 Future<T>&& Future<T>::wait() && {
1142 detail::waitImpl(*this);
1143 return std::move(*this);
1147 Future<T>& Future<T>::wait(Duration dur) & {
1148 detail::waitImpl(*this, dur);
1153 Future<T>&& Future<T>::wait(Duration dur) && {
1154 detail::waitImpl(*this, dur);
1155 return std::move(*this);
1159 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1160 detail::waitViaImpl(*this, e);
1165 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1166 detail::waitViaImpl(*this, e);
1167 return std::move(*this);
1171 T Future<T>::get() {
1172 return std::move(wait().value());
1176 T Future<T>::get(Duration dur) {
1179 return std::move(value());
1186 T Future<T>::getVia(DrivableExecutor* e) {
1187 return std::move(waitVia(e).value());
1193 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1194 return t1.value() == t2.value();
1200 Future<bool> Future<T>::willEqual(Future<T>& f) {
1201 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1202 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1203 return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1212 Future<T> Future<T>::filter(F&& predicate) {
1213 return this->then([p = std::forward<F>(predicate)](T val) {
1214 T const& valConstRef = val;
1215 if (!p(valConstRef)) {
1216 throw PredicateDoesNotObtain();
1223 inline Future<Unit> when(bool p, F&& thunk) {
1224 return p ? std::forward<F>(thunk)().unit() : makeFuture();
1227 template <class P, class F>
1228 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1230 auto future = thunk();
1231 return future.then([
1232 predicate = std::forward<P>(predicate),
1233 thunk = std::forward<F>(thunk)
1235 return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1238 return makeFuture();
1242 Future<Unit> times(const int n, F&& thunk) {
1243 return folly::whileDo(
1244 [ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
1245 return count->fetch_add(1) < n;
1247 std::forward<F>(thunk));
1251 template <class It, class F, class ItT, class Result>
1252 std::vector<Future<Result>> map(It first, It last, F func) {
1253 std::vector<Future<Result>> results;
1254 for (auto it = first; it != last; it++) {
1255 results.push_back(it->then(func));
1265 struct retrying_policy_raw_tag {};
1266 struct retrying_policy_fut_tag {};
1268 template <class Policy>
1269 struct retrying_policy_traits {
1270 using result = std::result_of_t<Policy(size_t, const exception_wrapper&)>;
1271 using is_raw = std::is_same<result, bool>;
1272 using is_fut = std::is_same<result, Future<bool>>;
1273 using tag = typename std::conditional<
1274 is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1275 is_fut::value, retrying_policy_fut_tag, void>::type>::type;
1278 template <class Policy, class FF, class Prom>
1279 void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) {
1280 using F = typename std::result_of<FF(size_t)>::type;
1281 using T = typename F::value_type;
1282 auto f = makeFutureWith([&] { return ff(k++); });
1285 prom = std::move(prom),
1286 pm = std::forward<Policy>(p),
1287 ffm = std::forward<FF>(ff)
1288 ](Try<T> && t) mutable {
1290 prom.setValue(std::move(t).value());
1293 auto& x = t.exception();
1297 prom = std::move(prom),
1300 ffm = std::move(ffm)
1301 ](bool shouldRetry) mutable {
1303 retryingImpl(k, std::move(pm), std::move(ffm), std::move(prom));
1305 prom.setException(std::move(xm));
1311 template <class Policy, class FF>
1312 typename std::result_of<FF(size_t)>::type
1313 retrying(size_t k, Policy&& p, FF&& ff) {
1314 using F = typename std::result_of<FF(size_t)>::type;
1315 using T = typename F::value_type;
1316 auto prom = Promise<T>();
1317 auto f = prom.getFuture();
1319 k, std::forward<Policy>(p), std::forward<FF>(ff), std::move(prom));
1323 template <class Policy, class FF>
1324 typename std::result_of<FF(size_t)>::type
1325 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1326 auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
1327 return makeFuture<bool>(pm(k, x));
1329 return retrying(0, std::move(q), std::forward<FF>(ff));
1332 template <class Policy, class FF>
1333 typename std::result_of<FF(size_t)>::type
1334 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1335 return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1338 // jittered exponential backoff, clamped to [backoff_min, backoff_max]
1339 template <class URNG>
1340 Duration retryingJitteredExponentialBackoffDur(
1342 Duration backoff_min,
1343 Duration backoff_max,
1344 double jitter_param,
1347 auto dist = std::normal_distribution<double>(0.0, jitter_param);
1348 auto jitter = std::exp(dist(rng));
1349 auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1350 return std::max(backoff_min, std::min(backoff_max, backoff));
1353 template <class Policy, class URNG>
1354 std::function<Future<bool>(size_t, const exception_wrapper&)>
1355 retryingPolicyCappedJitteredExponentialBackoff(
1357 Duration backoff_min,
1358 Duration backoff_max,
1359 double jitter_param,
1363 pm = std::forward<Policy>(p),
1368 rngp = std::forward<URNG>(rng)
1369 ](size_t n, const exception_wrapper& ex) mutable {
1370 if (n == max_tries) {
1371 return makeFuture(false);
1373 return pm(n, ex).then(
1374 [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
1377 return makeFuture(false);
1379 auto backoff = detail::retryingJitteredExponentialBackoffDur(
1380 n, backoff_min, backoff_max, jitter_param, rngp);
1381 return futures::sleep(backoff).then([] { return true; });
1386 template <class Policy, class URNG>
1387 std::function<Future<bool>(size_t, const exception_wrapper&)>
1388 retryingPolicyCappedJitteredExponentialBackoff(
1390 Duration backoff_min,
1391 Duration backoff_max,
1392 double jitter_param,
1395 retrying_policy_raw_tag) {
1396 auto q = [pm = std::forward<Policy>(p)](
1397 size_t n, const exception_wrapper& e) {
1398 return makeFuture(pm(n, e));
1400 return retryingPolicyCappedJitteredExponentialBackoff(
1405 std::forward<URNG>(rng),
1409 template <class Policy, class URNG>
1410 std::function<Future<bool>(size_t, const exception_wrapper&)>
1411 retryingPolicyCappedJitteredExponentialBackoff(
1413 Duration backoff_min,
1414 Duration backoff_max,
1415 double jitter_param,
1418 retrying_policy_fut_tag) {
1419 return retryingPolicyCappedJitteredExponentialBackoff(
1424 std::forward<URNG>(rng),
1425 std::forward<Policy>(p));
1429 template <class Policy, class FF>
1430 typename std::result_of<FF(size_t)>::type
1431 retrying(Policy&& p, FF&& ff) {
1432 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1433 return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1437 std::function<bool(size_t, const exception_wrapper&)>
1438 retryingPolicyBasic(
1440 return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1443 template <class Policy, class URNG>
1444 std::function<Future<bool>(size_t, const exception_wrapper&)>
1445 retryingPolicyCappedJitteredExponentialBackoff(
1447 Duration backoff_min,
1448 Duration backoff_max,
1449 double jitter_param,
1452 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1453 return detail::retryingPolicyCappedJitteredExponentialBackoff(
1458 std::forward<URNG>(rng),
1459 std::forward<Policy>(p),
1464 std::function<Future<bool>(size_t, const exception_wrapper&)>
1465 retryingPolicyCappedJitteredExponentialBackoff(
1467 Duration backoff_min,
1468 Duration backoff_max,
1469 double jitter_param) {
1470 auto p = [](size_t, const exception_wrapper&) { return true; };
1471 return retryingPolicyCappedJitteredExponentialBackoff(
1482 // Instantiate the most common Future types to save compile time
1483 extern template class Future<Unit>;
1484 extern template class Future<bool>;
1485 extern template class Future<int>;
1486 extern template class Future<int64_t>;
1487 extern template class Future<std::string>;
1488 extern template class Future<double>;
1490 } // namespace folly