2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/fibers/Baton.h>
43 #if FOLLY_FUTURE_USING_FIBER
44 typedef folly::fibers::Baton FutureBatonType;
46 typedef folly::Baton<> FutureBatonType;
51 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
53 // Guarantees that the stored functor is destructed before the stored promise
54 // may be fulfilled. Assumes the stored functor to be noexcept-destructible.
55 template <typename T, typename F>
56 class CoreCallbackState {
58 template <typename FF>
59 CoreCallbackState(Promise<T>&& promise, FF&& func) noexcept(
60 noexcept(F(std::declval<FF>())))
61 : func_(std::forward<FF>(func)), promise_(std::move(promise)) {
62 assert(before_barrier());
65 CoreCallbackState(CoreCallbackState&& that) noexcept(
66 noexcept(F(std::declval<F>()))) {
67 if (that.before_barrier()) {
68 new (&func_) F(std::move(that.func_));
69 promise_ = that.stealPromise();
73 CoreCallbackState& operator=(CoreCallbackState&&) = delete;
75 ~CoreCallbackState() {
76 if (before_barrier()) {
81 template <typename... Args>
82 auto invoke(Args&&... args) noexcept(
83 noexcept(std::declval<F&&>()(std::declval<Args&&>()...))) {
84 assert(before_barrier());
85 return std::move(func_)(std::forward<Args>(args)...);
88 template <typename... Args>
89 auto tryInvoke(Args&&... args) noexcept {
90 return makeTryWith([&] { return invoke(std::forward<Args>(args)...); });
93 void setTry(Try<T>&& t) {
94 stealPromise().setTry(std::move(t));
97 void setException(exception_wrapper&& ew) {
98 stealPromise().setException(std::move(ew));
101 Promise<T> stealPromise() noexcept {
102 assert(before_barrier());
104 return std::move(promise_);
108 bool before_barrier() const noexcept {
109 return !promise_.isFulfilled();
115 Promise<T> promise_{detail::EmptyConstruct{}};
118 template <typename T, typename F>
119 inline auto makeCoreCallbackState(Promise<T>&& p, F&& f) noexcept(
120 noexcept(CoreCallbackState<T, _t<std::decay<F>>>(
121 std::declval<Promise<T>&&>(),
122 std::declval<F&&>()))) {
123 return CoreCallbackState<T, _t<std::decay<F>>>(
124 std::move(p), std::forward<F>(f));
129 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
130 other.core_ = nullptr;
134 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
135 std::swap(core_, other.core_);
142 typename std::enable_if<
143 !std::is_same<T, typename std::decay<T2>::type>::value &&
144 std::is_constructible<T, T2&&>::value &&
145 std::is_convertible<T2&&, T>::value,
147 Future<T>::Future(Future<T2>&& other)
148 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
153 typename std::enable_if<
154 !std::is_same<T, typename std::decay<T2>::type>::value &&
155 std::is_constructible<T, T2&&>::value &&
156 !std::is_convertible<T2&&, T>::value,
158 Future<T>::Future(Future<T2>&& other)
159 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
164 typename std::enable_if<
165 !std::is_same<T, typename std::decay<T2>::type>::value &&
166 std::is_constructible<T, T2&&>::value,
168 Future<T>& Future<T>::operator=(Future<T2>&& other) {
170 std::move(other).then([](T2&& v) { return T(std::move(v)); }));
174 template <class T2, typename>
175 Future<T>::Future(T2&& val)
176 : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
179 template <typename T2>
180 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
181 : core_(new detail::Core<T>(Try<T>(T()))) {}
184 Future<T>::~Future() {
189 void Future<T>::detach() {
191 core_->detachFuture();
197 void Future<T>::throwIfInvalid() const {
204 void Future<T>::setCallback_(F&& func) {
206 core_->setCallback(std::forward<F>(func));
213 typename std::enable_if<isFuture<F>::value,
214 Future<typename isFuture<T>::Inner>>::type
215 Future<T>::unwrap() {
216 return then([](Future<typename isFuture<T>::Inner> internal_future) {
217 return internal_future;
223 // Variant: returns a value
224 // e.g. f.then([](Try<T>&& t){ return t.value(); });
226 template <typename F, typename R, bool isTry, typename... Args>
227 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
228 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
229 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
230 typedef typename R::ReturnsFuture::Inner B;
235 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
237 // grab the Future now before we lose our handle on the Promise
238 auto f = p.getFuture();
239 f.core_->setExecutorNoLock(getExecutor());
241 /* This is a bit tricky.
243 We can't just close over *this in case this Future gets moved. So we
244 make a new dummy Future. We could figure out something more
245 sophisticated that avoids making a new Future object when it can, as an
246 optimization. But this is correct.
248 core_ can't be moved, it is explicitly disallowed (as is copying). But
249 if there's ever a reason to allow it, this is one place that makes that
250 assumption and would need to be fixed. We use a standard shared pointer
251 for core_ (by copying it in), which means in essence obj holds a shared
252 pointer to itself. But this shouldn't leak because Promise will not
253 outlive the continuation, because Promise will setException() with a
254 broken Promise if it is destructed before completed. We could use a
255 weak pointer but it would have to be converted to a shared pointer when
256 func is executed (because the Future returned by func may possibly
257 persist beyond the callback, if it gets moved), and so it is an
258 optimization to just make it shared from the get-go.
260 Two subtle but important points about this design. detail::Core has no
261 back pointers to Future or Promise, so if Future or Promise get moved
262 (and they will be moved in performant code) we don't have to do
263 anything fancy. And because we store the continuation in the
264 detail::Core, not in the Future, we can execute the continuation even
265 after the Future has gone out of scope. This is an intentional design
266 decision. It is likely we will want to be able to cancel a continuation
267 in some circumstances, but I think it should be explicit not implicit
268 in the destruction of the Future used to create it.
271 [state = detail::makeCoreCallbackState(
272 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
273 if (!isTry && t.hasException()) {
274 state.setException(std::move(t.exception()));
276 state.setTry(makeTryWith(
277 [&] { return state.invoke(t.template get<isTry, Args>()...); }));
284 // Variant: returns a Future
285 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
287 template <typename F, typename R, bool isTry, typename... Args>
288 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
289 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
290 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
291 typedef typename R::ReturnsFuture::Inner B;
296 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
298 // grab the Future now before we lose our handle on the Promise
299 auto f = p.getFuture();
300 f.core_->setExecutorNoLock(getExecutor());
303 [state = detail::makeCoreCallbackState(
304 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
305 if (!isTry && t.hasException()) {
306 state.setException(std::move(t.exception()));
308 auto tf2 = state.tryInvoke(t.template get<isTry, Args>()...);
309 if (tf2.hasException()) {
310 state.setException(std::move(tf2.exception()));
312 tf2->setCallback_([p = state.stealPromise()](Try<B> && b) mutable {
313 p.setTry(std::move(b));
322 template <typename T>
323 template <typename R, typename Caller, typename... Args>
324 Future<typename isFuture<R>::Inner>
325 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
326 typedef typename std::remove_cv<
327 typename std::remove_reference<
328 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
329 return then([instance, func](Try<T>&& t){
330 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
335 Future<Unit> Future<T>::then() {
336 return then([] () {});
339 // onError where the callback returns T
342 typename std::enable_if<
343 !detail::callableWith<F, exception_wrapper>::value &&
344 !detail::Extract<F>::ReturnsFuture::value,
346 Future<T>::onError(F&& func) {
347 typedef std::remove_reference_t<typename detail::Extract<F>::FirstArg> Exn;
349 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
350 "Return type of onError callback must be T or Future<T>");
353 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
354 auto f = p.getFuture();
357 [state = detail::makeCoreCallbackState(
358 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
359 if (auto e = t.template tryGetExceptionObject<Exn>()) {
360 state.setTry(makeTryWith([&] { return state.invoke(*e); }));
362 state.setTry(std::move(t));
369 // onError where the callback returns Future<T>
372 typename std::enable_if<
373 !detail::callableWith<F, exception_wrapper>::value &&
374 detail::Extract<F>::ReturnsFuture::value,
376 Future<T>::onError(F&& func) {
378 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
379 "Return type of onError callback must be T or Future<T>");
380 typedef std::remove_reference_t<typename detail::Extract<F>::FirstArg> Exn;
383 auto f = p.getFuture();
386 [state = detail::makeCoreCallbackState(
387 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
388 if (auto e = t.template tryGetExceptionObject<Exn>()) {
389 auto tf2 = state.tryInvoke(*e);
390 if (tf2.hasException()) {
391 state.setException(std::move(tf2.exception()));
393 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
394 p.setTry(std::move(t3));
398 state.setTry(std::move(t));
407 Future<T> Future<T>::ensure(F&& func) {
408 return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
410 return makeFuture(std::move(t));
416 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
417 return within(dur, tk).onError([funcw = std::forward<F>(func)](
418 TimedOut const&) { return std::move(funcw)(); });
423 typename std::enable_if<detail::callableWith<F, exception_wrapper>::value &&
424 detail::Extract<F>::ReturnsFuture::value,
426 Future<T>::onError(F&& func) {
428 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
429 "Return type of onError callback must be T or Future<T>");
432 auto f = p.getFuture();
434 [state = detail::makeCoreCallbackState(
435 std::move(p), std::forward<F>(func))](Try<T> t) mutable {
436 if (t.hasException()) {
437 auto tf2 = state.tryInvoke(std::move(t.exception()));
438 if (tf2.hasException()) {
439 state.setException(std::move(tf2.exception()));
441 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
442 p.setTry(std::move(t3));
446 state.setTry(std::move(t));
453 // onError(exception_wrapper) that returns T
456 typename std::enable_if<
457 detail::callableWith<F, exception_wrapper>::value &&
458 !detail::Extract<F>::ReturnsFuture::value,
460 Future<T>::onError(F&& func) {
462 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
463 "Return type of onError callback must be T or Future<T>");
466 auto f = p.getFuture();
468 [state = detail::makeCoreCallbackState(
469 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
470 if (t.hasException()) {
471 state.setTry(makeTryWith(
472 [&] { return state.invoke(std::move(t.exception())); }));
474 state.setTry(std::move(t));
482 typename std::add_lvalue_reference<T>::type Future<T>::value() {
485 return core_->getTry().value();
489 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
492 return core_->getTry().value();
496 Try<T>& Future<T>::getTry() {
499 return core_->getTry();
503 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
504 return waitVia(e).getTry();
508 Optional<Try<T>> Future<T>::poll() {
510 if (core_->ready()) {
511 o = std::move(core_->getTry());
517 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
520 setExecutor(executor, priority);
522 return std::move(*this);
526 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
530 auto f = p.getFuture();
531 then([p = std::move(p)](Try<T> && t) mutable { p.setTry(std::move(t)); });
532 return std::move(f).via(executor, priority);
535 template <class Func>
536 auto via(Executor* x, Func&& func)
537 -> Future<typename isFuture<decltype(std::declval<Func>()())>::Inner> {
538 // TODO make this actually more performant. :-P #7260175
539 return via(x).then(std::forward<Func>(func));
543 bool Future<T>::isReady() const {
545 return core_->ready();
549 bool Future<T>::hasValue() {
550 return getTry().hasValue();
554 bool Future<T>::hasException() {
555 return getTry().hasException();
559 void Future<T>::raise(exception_wrapper exception) {
560 core_->raise(std::move(exception));
566 Future<typename std::decay<T>::type> makeFuture(T&& t) {
567 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
570 inline // for multiple translation units
571 Future<Unit> makeFuture() {
572 return makeFuture(Unit{});
575 // makeFutureWith(Future<T>()) -> Future<T>
577 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
578 typename std::result_of<F()>::type>::type
579 makeFutureWith(F&& func) {
581 typename isFuture<typename std::result_of<F()>::type>::Inner;
583 return std::forward<F>(func)();
584 } catch (std::exception& e) {
585 return makeFuture<InnerType>(
586 exception_wrapper(std::current_exception(), e));
588 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
592 // makeFutureWith(T()) -> Future<T>
593 // makeFutureWith(void()) -> Future<Unit>
595 typename std::enable_if<
596 !(isFuture<typename std::result_of<F()>::type>::value),
597 Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
598 makeFutureWith(F&& func) {
600 typename Unit::Lift<typename std::result_of<F()>::type>::type;
601 return makeFuture<LiftedResult>(
602 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
606 Future<T> makeFuture(std::exception_ptr const& e) {
607 return makeFuture(Try<T>(e));
611 Future<T> makeFuture(exception_wrapper ew) {
612 return makeFuture(Try<T>(std::move(ew)));
615 template <class T, class E>
616 typename std::enable_if<std::is_base_of<std::exception, E>::value,
618 makeFuture(E const& e) {
619 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
623 Future<T> makeFuture(Try<T>&& t) {
624 return Future<T>(new detail::Core<T>(std::move(t)));
628 Future<Unit> via(Executor* executor, int8_t priority) {
629 return makeFuture().via(executor, priority);
632 // mapSetCallback calls func(i, Try<T>) when every future completes
634 template <class T, class InputIterator, class F>
635 void mapSetCallback(InputIterator first, InputIterator last, F func) {
636 for (size_t i = 0; first != last; ++first, ++i) {
637 first->setCallback_([func, i](Try<T>&& t) {
638 func(i, std::move(t));
643 // collectAll (variadic)
645 template <typename... Fs>
646 typename detail::CollectAllVariadicContext<
647 typename std::decay<Fs>::type::value_type...>::type
648 collectAll(Fs&&... fs) {
649 auto ctx = std::make_shared<detail::CollectAllVariadicContext<
650 typename std::decay<Fs>::type::value_type...>>();
651 detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
652 ctx, std::forward<Fs>(fs)...);
653 return ctx->p.getFuture();
656 // collectAll (iterator)
658 template <class InputIterator>
661 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
662 collectAll(InputIterator first, InputIterator last) {
664 typename std::iterator_traits<InputIterator>::value_type::value_type T;
666 struct CollectAllContext {
667 CollectAllContext(size_t n) : results(n) {}
668 ~CollectAllContext() {
669 p.setValue(std::move(results));
671 Promise<std::vector<Try<T>>> p;
672 std::vector<Try<T>> results;
676 std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
677 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
678 ctx->results[i] = std::move(t);
680 return ctx->p.getFuture();
683 // collect (iterator)
687 template <typename T>
688 struct CollectContext {
690 explicit Nothing(int /* n */) {}
693 using Result = typename std::conditional<
694 std::is_void<T>::value,
696 std::vector<T>>::type;
698 using InternalResult = typename std::conditional<
699 std::is_void<T>::value,
701 std::vector<Optional<T>>>::type;
703 explicit CollectContext(size_t n) : result(n) {}
705 if (!threw.exchange(true)) {
706 // map Optional<T> -> T
707 std::vector<T> finalResult;
708 finalResult.reserve(result.size());
709 std::transform(result.begin(), result.end(),
710 std::back_inserter(finalResult),
711 [](Optional<T>& o) { return std::move(o.value()); });
712 p.setValue(std::move(finalResult));
715 inline void setPartialResult(size_t i, Try<T>& t) {
716 result[i] = std::move(t.value());
719 InternalResult result;
720 std::atomic<bool> threw {false};
725 template <class InputIterator>
726 Future<typename detail::CollectContext<
727 typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
728 collect(InputIterator first, InputIterator last) {
730 typename std::iterator_traits<InputIterator>::value_type::value_type T;
732 auto ctx = std::make_shared<detail::CollectContext<T>>(
733 std::distance(first, last));
734 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
735 if (t.hasException()) {
736 if (!ctx->threw.exchange(true)) {
737 ctx->p.setException(std::move(t.exception()));
739 } else if (!ctx->threw) {
740 ctx->setPartialResult(i, t);
743 return ctx->p.getFuture();
746 // collect (variadic)
748 template <typename... Fs>
749 typename detail::CollectVariadicContext<
750 typename std::decay<Fs>::type::value_type...>::type
751 collect(Fs&&... fs) {
752 auto ctx = std::make_shared<detail::CollectVariadicContext<
753 typename std::decay<Fs>::type::value_type...>>();
754 detail::collectVariadicHelper<detail::CollectVariadicContext>(
755 ctx, std::forward<Fs>(fs)...);
756 return ctx->p.getFuture();
759 // collectAny (iterator)
761 template <class InputIterator>
766 std::iterator_traits<InputIterator>::value_type::value_type>>>
767 collectAny(InputIterator first, InputIterator last) {
769 typename std::iterator_traits<InputIterator>::value_type::value_type T;
771 struct CollectAnyContext {
772 CollectAnyContext() {}
773 Promise<std::pair<size_t, Try<T>>> p;
774 std::atomic<bool> done {false};
777 auto ctx = std::make_shared<CollectAnyContext>();
778 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
779 if (!ctx->done.exchange(true)) {
780 ctx->p.setValue(std::make_pair(i, std::move(t)));
783 return ctx->p.getFuture();
786 // collectAnyWithoutException (iterator)
788 template <class InputIterator>
791 typename std::iterator_traits<InputIterator>::value_type::value_type>>
792 collectAnyWithoutException(InputIterator first, InputIterator last) {
794 typename std::iterator_traits<InputIterator>::value_type::value_type T;
796 struct CollectAnyWithoutExceptionContext {
797 CollectAnyWithoutExceptionContext(){}
798 Promise<std::pair<size_t, T>> p;
799 std::atomic<bool> done{false};
800 std::atomic<size_t> nFulfilled{0};
804 auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
805 ctx->nTotal = size_t(std::distance(first, last));
807 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
808 if (!t.hasException() && !ctx->done.exchange(true)) {
809 ctx->p.setValue(std::make_pair(i, std::move(t.value())));
810 } else if (++ctx->nFulfilled == ctx->nTotal) {
811 ctx->p.setException(t.exception());
814 return ctx->p.getFuture();
817 // collectN (iterator)
819 template <class InputIterator>
820 Future<std::vector<std::pair<size_t, Try<typename
821 std::iterator_traits<InputIterator>::value_type::value_type>>>>
822 collectN(InputIterator first, InputIterator last, size_t n) {
824 std::iterator_traits<InputIterator>::value_type::value_type T;
825 typedef std::vector<std::pair<size_t, Try<T>>> V;
827 struct CollectNContext {
829 std::atomic<size_t> completed = {0};
832 auto ctx = std::make_shared<CollectNContext>();
834 if (size_t(std::distance(first, last)) < n) {
835 ctx->p.setException(std::runtime_error("Not enough futures"));
837 // for each completed Future, increase count and add to vector, until we
838 // have n completed futures at which point we fulfil our Promise with the
840 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
841 auto c = ++ctx->completed;
843 assert(ctx->v.size() < n);
844 ctx->v.emplace_back(i, std::move(t));
846 ctx->p.setTry(Try<V>(std::move(ctx->v)));
852 return ctx->p.getFuture();
857 template <class It, class T, class F>
858 Future<T> reduce(It first, It last, T&& initial, F&& func) {
860 return makeFuture(std::move(initial));
863 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
865 typename std::conditional<detail::callableWith<F, T&&, Try<ItT>&&>::value,
868 typedef isTry<Arg> IsTry;
870 auto sfunc = std::make_shared<F>(std::move(func));
872 auto f = first->then(
873 [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
875 std::move(minitial), head.template get<IsTry::value, Arg&&>());
878 for (++first; first != last; ++first) {
879 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
880 return (*sfunc)(std::move(std::get<0>(t).value()),
881 // Either return a ItT&& or a Try<ItT>&& depending
882 // on the type of the argument of func.
883 std::get<1>(t).template get<IsTry::value, Arg&&>());
890 // window (collection)
892 template <class Collection, class F, class ItT, class Result>
893 std::vector<Future<Result>>
894 window(Collection input, F func, size_t n) {
895 struct WindowContext {
896 WindowContext(Collection&& i, F&& fn)
897 : input_(std::move(i)), promises_(input_.size()),
900 std::atomic<size_t> i_ {0};
902 std::vector<Promise<Result>> promises_;
905 static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
906 size_t i = ctx->i_++;
907 if (i < ctx->input_.size()) {
908 // Using setCallback_ directly since we don't need the Future
909 ctx->func_(std::move(ctx->input_[i])).setCallback_(
910 // ctx is captured by value
911 [ctx, i](Try<Result>&& t) {
912 ctx->promises_[i].setTry(std::move(t));
913 // Chain another future onto this one
914 spawn(std::move(ctx));
920 auto max = std::min(n, input.size());
922 auto ctx = std::make_shared<WindowContext>(
923 std::move(input), std::move(func));
925 for (size_t i = 0; i < max; ++i) {
926 // Start the first n Futures
927 WindowContext::spawn(ctx);
930 std::vector<Future<Result>> futures;
931 futures.reserve(ctx->promises_.size());
932 for (auto& promise : ctx->promises_) {
933 futures.emplace_back(promise.getFuture());
942 template <class I, class F>
943 Future<I> Future<T>::reduce(I&& initial, F&& func) {
945 minitial = std::forward<I>(initial),
946 mfunc = std::forward<F>(func)
948 auto ret = std::move(minitial);
949 for (auto& val : vals) {
950 ret = mfunc(std::move(ret), std::move(val));
956 // unorderedReduce (iterator)
958 template <class It, class T, class F, class ItT, class Arg>
959 Future<T> unorderedReduce(It first, It last, T initial, F func) {
961 return makeFuture(std::move(initial));
964 typedef isTry<Arg> IsTry;
966 struct UnorderedReduceContext {
967 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
968 : lock_(), memo_(makeFuture<T>(std::move(memo))),
969 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
971 folly::MicroSpinLock lock_; // protects memo_ and numThens_
974 size_t numThens_; // how many Futures completed and called .then()
975 size_t numFutures_; // how many Futures in total
979 auto ctx = std::make_shared<UnorderedReduceContext>(
980 std::move(initial), std::move(func), std::distance(first, last));
985 [ctx](size_t /* i */, Try<ItT>&& t) {
986 // Futures can be completed in any order, simultaneously.
987 // To make this non-blocking, we create a new Future chain in
988 // the order of completion to reduce the values.
989 // The spinlock just protects chaining a new Future, not actually
990 // executing the reduce, which should be really fast.
991 folly::MSLGuard lock(ctx->lock_);
993 ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
994 // Either return a ItT&& or a Try<ItT>&& depending
995 // on the type of the argument of func.
996 return ctx->func_(std::move(v),
997 mt.template get<IsTry::value, Arg&&>());
999 if (++ctx->numThens_ == ctx->numFutures_) {
1000 // After reducing the value of the last Future, fulfill the Promise
1001 ctx->memo_.setCallback_(
1002 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
1006 return ctx->promise_.getFuture();
1012 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
1013 return within(dur, TimedOut(), tk);
1018 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
1021 Context(E ex) : exception(std::move(ex)), promise() {}
1023 Future<Unit> thisFuture;
1025 std::atomic<bool> token {false};
1028 std::shared_ptr<Timekeeper> tks;
1030 tks = folly::detail::getTimekeeperSingleton();
1031 tk = DCHECK_NOTNULL(tks.get());
1034 auto ctx = std::make_shared<Context>(std::move(e));
1036 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
1037 // TODO: "this" completed first, cancel "after"
1038 if (ctx->token.exchange(true) == false) {
1039 ctx->promise.setTry(std::move(t));
1043 tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
1044 // "after" completed first, cancel "this"
1045 ctx->thisFuture.raise(TimedOut());
1046 if (ctx->token.exchange(true) == false) {
1047 if (t.hasException()) {
1048 ctx->promise.setException(std::move(t.exception()));
1050 ctx->promise.setException(std::move(ctx->exception));
1055 return ctx->promise.getFuture().via(getExecutor());
1061 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
1062 return collectAll(*this, futures::sleep(dur, tk))
1063 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
1064 Try<T>& t = std::get<0>(tup);
1065 return makeFuture<T>(std::move(t));
1072 void waitImpl(Future<T>& f) {
1073 // short-circuit if there's nothing to do
1074 if (f.isReady()) return;
1076 FutureBatonType baton;
1077 f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1079 assert(f.isReady());
1083 void waitImpl(Future<T>& f, Duration dur) {
1084 // short-circuit if there's nothing to do
1090 auto ret = promise.getFuture();
1091 auto baton = std::make_shared<FutureBatonType>();
1092 f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1093 promise.setTry(std::move(t));
1097 if (baton->timed_wait(dur)) {
1098 assert(f.isReady());
1103 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1104 // Set callback so to ensure that the via executor has something on it
1105 // so that once the preceding future triggers this callback, drive will
1106 // always have a callback to satisfy it
1109 f = f.via(e).then([](T&& t) { return std::move(t); });
1110 while (!f.isReady()) {
1113 assert(f.isReady());
1119 Future<T>& Future<T>::wait() & {
1120 detail::waitImpl(*this);
1125 Future<T>&& Future<T>::wait() && {
1126 detail::waitImpl(*this);
1127 return std::move(*this);
1131 Future<T>& Future<T>::wait(Duration dur) & {
1132 detail::waitImpl(*this, dur);
1137 Future<T>&& Future<T>::wait(Duration dur) && {
1138 detail::waitImpl(*this, dur);
1139 return std::move(*this);
1143 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1144 detail::waitViaImpl(*this, e);
1149 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1150 detail::waitViaImpl(*this, e);
1151 return std::move(*this);
1155 T Future<T>::get() {
1156 return std::move(wait().value());
1160 T Future<T>::get(Duration dur) {
1163 return std::move(value());
1170 T Future<T>::getVia(DrivableExecutor* e) {
1171 return std::move(waitVia(e).value());
1177 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1178 return t1.value() == t2.value();
1184 Future<bool> Future<T>::willEqual(Future<T>& f) {
1185 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1186 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1187 return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1196 Future<T> Future<T>::filter(F&& predicate) {
1197 return this->then([p = std::forward<F>(predicate)](T val) {
1198 T const& valConstRef = val;
1199 if (!p(valConstRef)) {
1200 throw PredicateDoesNotObtain();
1207 inline Future<Unit> when(bool p, F&& thunk) {
1208 return p ? std::forward<F>(thunk)().unit() : makeFuture();
1211 template <class P, class F>
1212 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1214 auto future = thunk();
1215 return future.then([
1216 predicate = std::forward<P>(predicate),
1217 thunk = std::forward<F>(thunk)
1219 return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1222 return makeFuture();
1226 Future<Unit> times(const int n, F&& thunk) {
1227 return folly::whileDo(
1228 [ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
1229 return count->fetch_add(1) < n;
1231 std::forward<F>(thunk));
1235 template <class It, class F, class ItT, class Result>
1236 std::vector<Future<Result>> map(It first, It last, F func) {
1237 std::vector<Future<Result>> results;
1238 for (auto it = first; it != last; it++) {
1239 results.push_back(it->then(func));
1249 struct retrying_policy_raw_tag {};
1250 struct retrying_policy_fut_tag {};
1252 template <class Policy>
1253 struct retrying_policy_traits {
1254 using ew = exception_wrapper;
1255 FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1256 template <class Ret>
1257 using has_op = typename std::integral_constant<bool,
1258 has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1259 has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1260 using is_raw = has_op<bool>;
1261 using is_fut = has_op<Future<bool>>;
1262 using tag = typename std::conditional<
1263 is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1264 is_fut::value, retrying_policy_fut_tag, void>::type>::type;
1267 template <class Policy, class FF, class Prom>
1268 void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) {
1269 using F = typename std::result_of<FF(size_t)>::type;
1270 using T = typename F::value_type;
1271 auto f = makeFutureWith([&] { return ff(k++); });
1274 prom = std::move(prom),
1275 pm = std::forward<Policy>(p),
1276 ffm = std::forward<FF>(ff)
1277 ](Try<T> && t) mutable {
1279 prom.setValue(std::move(t).value());
1282 auto& x = t.exception();
1286 prom = std::move(prom),
1289 ffm = std::move(ffm)
1290 ](bool shouldRetry) mutable {
1292 retryingImpl(k, std::move(pm), std::move(ffm), std::move(prom));
1294 prom.setException(std::move(xm));
1300 template <class Policy, class FF>
1301 typename std::result_of<FF(size_t)>::type
1302 retrying(size_t k, Policy&& p, FF&& ff) {
1303 using F = typename std::result_of<FF(size_t)>::type;
1304 using T = typename F::value_type;
1305 auto prom = Promise<T>();
1306 auto f = prom.getFuture();
1308 k, std::forward<Policy>(p), std::forward<FF>(ff), std::move(prom));
1312 template <class Policy, class FF>
1313 typename std::result_of<FF(size_t)>::type
1314 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1315 auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
1316 return makeFuture<bool>(pm(k, x));
1318 return retrying(0, std::move(q), std::forward<FF>(ff));
1321 template <class Policy, class FF>
1322 typename std::result_of<FF(size_t)>::type
1323 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1324 return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1327 // jittered exponential backoff, clamped to [backoff_min, backoff_max]
1328 template <class URNG>
1329 Duration retryingJitteredExponentialBackoffDur(
1331 Duration backoff_min,
1332 Duration backoff_max,
1333 double jitter_param,
1336 auto dist = std::normal_distribution<double>(0.0, jitter_param);
1337 auto jitter = std::exp(dist(rng));
1338 auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1339 return std::max(backoff_min, std::min(backoff_max, backoff));
1342 template <class Policy, class URNG>
1343 std::function<Future<bool>(size_t, const exception_wrapper&)>
1344 retryingPolicyCappedJitteredExponentialBackoff(
1346 Duration backoff_min,
1347 Duration backoff_max,
1348 double jitter_param,
1352 pm = std::forward<Policy>(p),
1357 rngp = std::forward<URNG>(rng)
1358 ](size_t n, const exception_wrapper& ex) mutable {
1359 if (n == max_tries) {
1360 return makeFuture(false);
1362 return pm(n, ex).then(
1363 [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
1366 return makeFuture(false);
1368 auto backoff = detail::retryingJitteredExponentialBackoffDur(
1369 n, backoff_min, backoff_max, jitter_param, rngp);
1370 return futures::sleep(backoff).then([] { return true; });
1375 template <class Policy, class URNG>
1376 std::function<Future<bool>(size_t, const exception_wrapper&)>
1377 retryingPolicyCappedJitteredExponentialBackoff(
1379 Duration backoff_min,
1380 Duration backoff_max,
1381 double jitter_param,
1384 retrying_policy_raw_tag) {
1385 auto q = [pm = std::forward<Policy>(p)](
1386 size_t n, const exception_wrapper& e) {
1387 return makeFuture(pm(n, e));
1389 return retryingPolicyCappedJitteredExponentialBackoff(
1394 std::forward<URNG>(rng),
// Overload for inner policies that already return Future<bool>: the tag is
// dropped and all arguments are forwarded unchanged to the untagged overload.
template <class Policy, class URNG>
std::function<Future<bool>(size_t, const exception_wrapper&)>
retryingPolicyCappedJitteredExponentialBackoff(
    size_t max_tries,
    Duration backoff_min,
    Duration backoff_max,
    double jitter_param,
    URNG&& rng,
    Policy&& p,
    retrying_policy_fut_tag) {
  return retryingPolicyCappedJitteredExponentialBackoff(
      max_tries,
      backoff_min,
      backoff_max,
      jitter_param,
      std::forward<URNG>(rng),
      std::forward<Policy>(p));
}
1418 template <class Policy, class FF>
1419 typename std::result_of<FF(size_t)>::type
1420 retrying(Policy&& p, FF&& ff) {
1421 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1422 return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1426 std::function<bool(size_t, const exception_wrapper&)>
1427 retryingPolicyBasic(
1429 return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1432 template <class Policy, class URNG>
1433 std::function<Future<bool>(size_t, const exception_wrapper&)>
1434 retryingPolicyCappedJitteredExponentialBackoff(
1436 Duration backoff_min,
1437 Duration backoff_max,
1438 double jitter_param,
1441 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1442 return detail::retryingPolicyCappedJitteredExponentialBackoff(
1447 std::forward<URNG>(rng),
1448 std::forward<Policy>(p),
1453 std::function<Future<bool>(size_t, const exception_wrapper&)>
1454 retryingPolicyCappedJitteredExponentialBackoff(
1456 Duration backoff_min,
1457 Duration backoff_max,
1458 double jitter_param) {
1459 auto p = [](size_t, const exception_wrapper&) { return true; };
1460 return retryingPolicyCappedJitteredExponentialBackoff(
// Instantiate the most common Future types to save compile time.
// These are explicit instantiation declarations: they tell including
// translation units not to instantiate these specializations implicitly.
// The matching explicit instantiation definitions are expected to live in a
// source file that is not visible here.
extern template class Future<Unit>;
extern template class Future<bool>;
extern template class Future<int>;
extern template class Future<int64_t>;
extern template class Future<std::string>;
extern template class Future<double>;
1479 } // namespace folly