2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/futures/detail/Core.h>
28 #include <folly/futures/Timekeeper.h>
30 #if FOLLY_MOBILE || defined(__APPLE__)
31 #define FOLLY_FUTURE_USING_FIBER 0
33 #define FOLLY_FUTURE_USING_FIBER 1
34 #include <folly/fibers/Baton.h>
42 #if FOLLY_FUTURE_USING_FIBER
43 typedef folly::fibers::Baton FutureBatonType;
45 typedef folly::Baton<> FutureBatonType;
50 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
52 // Guarantees that the stored functor is destructed before the stored promise
53 // may be fulfilled. Assumes the stored functor to be noexcept-destructible.
//
// Pairs a continuation functor (func_) with the Promise<T> (promise_) that the
// continuation is meant to fulfill. The object is "before the barrier" for as
// long as promise_ reports that it has not been fulfilled.
54 template <typename T, typename F>
55 class CoreCallbackState {
// Stores the forwarded functor and takes ownership of the promise; a freshly
// built state must be before the barrier.
57 template <typename FF>
58 CoreCallbackState(Promise<T>&& promise, FF&& func) noexcept(
59 noexcept(F(std::declval<FF>())))
60 : func_(std::forward<FF>(func)), promise_(std::move(promise)) {
61 assert(before_barrier());
// Move construction transfers state only while `that` is still before the
// barrier: func_ is constructed in place from that.func_ and the promise is
// taken through that.stealPromise().
64 CoreCallbackState(CoreCallbackState&& that) noexcept(
65 noexcept(F(std::declval<F>()))) {
66 if (that.before_barrier()) {
67 new (&func_) F(std::move(that.func_));
68 promise_ = that.stealPromise();
// Move assignment is explicitly disabled.
72 CoreCallbackState& operator=(CoreCallbackState&&) = delete;
// Cleanup only applies while still before the barrier.
// NOTE(review): the guarded statement is not visible in this extract.
74 ~CoreCallbackState() {
75 if (before_barrier()) {
// Calls the stored functor as an rvalue with the forwarded arguments.
// Precondition (asserted): before_barrier().
80 template <typename... Args>
81 auto invoke(Args&&... args) noexcept(
82 noexcept(std::declval<F&&>()(std::declval<Args&&>()...))) {
83 assert(before_barrier());
84 return std::move(func_)(std::forward<Args>(args)...);
// Same as invoke(), but the call is routed through makeTryWith.
87 template <typename... Args>
88 auto tryInvoke(Args&&... args) noexcept {
89 return makeTryWith([&] { return invoke(std::forward<Args>(args)...); });
// Fulfills the stolen promise with the given Try.
92 void setTry(Try<T>&& t) {
93 stealPromise().setTry(std::move(t));
// Fulfills the stolen promise with the given exception.
96 void setException(exception_wrapper&& ew) {
97 stealPromise().setException(std::move(ew));
// Hands the promise to the caller. Precondition (asserted): before_barrier().
// NOTE(review): a statement between the assert and the return is missing from
// this extract; presumably it destroys func_ -- confirm against the full file.
100 Promise<T> stealPromise() noexcept {
101 assert(before_barrier());
103 return std::move(promise_);
// True while promise_ has not been fulfilled.
107 bool before_barrier() const noexcept {
108 return !promise_.isFulfilled();
// Starts out as an empty promise (Promise<T>::makeEmpty()).
114 Promise<T> promise_{Promise<T>::makeEmpty()};
// Factory for CoreCallbackState: T is deduced from the promise and the stored
// functor type is the decayed type of F. The noexcept specification mirrors
// that of the CoreCallbackState construction it performs.
117 template <typename T, typename F>
118 inline auto makeCoreCallbackState(Promise<T>&& p, F&& f) noexcept(
119 noexcept(CoreCallbackState<T, _t<std::decay<F>>>(
120 std::declval<Promise<T>&&>(),
121 std::declval<F&&>()))) {
122 return CoreCallbackState<T, _t<std::decay<F>>>(
123 std::move(p), std::forward<F>(f));
// Returns a Future built through the detail::EmptyConstruct constructor.
128 Future<T> Future<T>::makeEmpty() {
129 return Future<T>(detail::EmptyConstruct{});
// Move constructor: adopts other's core_ pointer and leaves other with nullptr.
133 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
134 other.core_ = nullptr;
// Move assignment: exchanges the core_ pointers of *this and other, so other
// ends up holding whatever core *this held before.
138 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
139 std::swap(core_, other.core_);
// Converting constructor from Future<T2>, selected when T2 is a different type
// from T, T is constructible from T2&&, and T2&& is also convertible to T.
// The conversion is done by chaining a then() callback that builds T from the
// moved value.
146 typename std::enable_if<
147 !std::is_same<T, typename std::decay<T2>::type>::value &&
148 std::is_constructible<T, T2&&>::value &&
149 std::is_convertible<T2&&, T>::value,
151 Future<T>::Future(Future<T2>&& other)
152 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
// Converting constructor from Future<T2>, selected when T2 is a different type
// from T and T is constructible from T2&& but T2&& is NOT convertible to T.
// The body is the same as the convertible overload: a then() callback
// constructs T from the moved value.
157 typename std::enable_if<
158 !std::is_same<T, typename std::decay<T2>::type>::value &&
159 std::is_constructible<T, T2&&>::value &&
160 !std::is_convertible<T2&&, T>::value,
162 Future<T>::Future(Future<T2>&& other)
163 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
// Converting move assignment from Future<T2>, enabled when T2 differs from T
// and T is constructible from T2&&. The source future is chained with a then()
// callback that constructs T from the moved value.
// NOTE(review): the statement that consumes the chained future is only partly
// visible in this extract.
168 typename std::enable_if<
169 !std::is_same<T, typename std::decay<T2>::type>::value &&
170 std::is_constructible<T, T2&&>::value,
172 Future<T>& Future<T>::operator=(Future<T2>&& other) {
174 std::move(other).then([](T2&& v) { return T(std::move(v)); }));
// Value constructor: allocates a detail::Core<T> that already holds a Try<T>
// built from the forwarded value.
178 template <class T2, typename>
179 Future<T>::Future(T2&& val)
180 : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
// Constructor that participates only when T2 is Unit: allocates a core holding
// a Try<T> wrapping a value-initialized T.
183 template <typename T2>
184 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
185 : core_(new detail::Core<T>(Try<T>(T()))) {}
188 Future<T>::~Future() {
// Tells the core that this Future is letting go of it (core_->detachFuture()).
// NOTE(review): any guard around the call is not visible in this extract.
193 void Future<T>::detach() {
195 core_->detachFuture();
201 void Future<T>::throwIfInvalid() const {
// Installs func as the core's callback, forwarding it unchanged.
// NOTE(review): a statement preceding the call is missing from this extract.
208 void Future<T>::setCallback_(F&& func) {
210 core_->setCallback(std::forward<F>(func));
// Flattens a future-of-future: chains a then() callback that receives the
// inner future by value and returns it, so the result has the inner type.
// Only enabled when isFuture<F>::value holds.
217 typename std::enable_if<isFuture<F>::value,
218 Future<typename isFuture<T>::Inner>>::type
219 Future<T>::unwrap() {
220 return then([](Future<typename isFuture<T>::Inner> internal_future) {
221 return internal_future;
// Implements then() for callbacks that return a plain value (the overload
// selected when R::ReturnsFuture::value is false).
// A promise `p` receives this future's interrupt handler, and the future taken
// from it receives this future's executor. The continuation, stored through
// CoreCallbackState, gets the Try<T> result: when the callback does not take a
// Try (isTry is false) and t holds an exception, that exception is passed
// straight to the promise; the promise is otherwise fulfilled with the Try
// produced by makeTryWith around state.invoke(...).
// NOTE(review): several lines of this definition (the declaration of `p`, the
// branch structure, the setCallback_ call and the return) are missing from
// this extract.
227 // Variant: returns a value
228 // e.g. f.then([](Try<T>&& t){ return t.value(); });
230 template <typename F, typename R, bool isTry, typename... Args>
231 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
232 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
233 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
234 typedef typename R::ReturnsFuture::Inner B;
239 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
241 // grab the Future now before we lose our handle on the Promise
242 auto f = p.getFuture();
243 f.core_->setExecutorNoLock(getExecutor());
245 /* This is a bit tricky.
247 We can't just close over *this in case this Future gets moved. So we
248 make a new dummy Future. We could figure out something more
249 sophisticated that avoids making a new Future object when it can, as an
250 optimization. But this is correct.
252 core_ can't be moved, it is explicitly disallowed (as is copying). But
253 if there's ever a reason to allow it, this is one place that makes that
254 assumption and would need to be fixed. We use a standard shared pointer
255 for core_ (by copying it in), which means in essence obj holds a shared
256 pointer to itself. But this shouldn't leak because Promise will not
257 outlive the continuation, because Promise will setException() with a
258 broken Promise if it is destructed before completed. We could use a
259 weak pointer but it would have to be converted to a shared pointer when
260 func is executed (because the Future returned by func may possibly
261 persist beyond the callback, if it gets moved), and so it is an
262 optimization to just make it shared from the get-go.
264 Two subtle but important points about this design. detail::Core has no
265 back pointers to Future or Promise, so if Future or Promise get moved
266 (and they will be moved in performant code) we don't have to do
267 anything fancy. And because we store the continuation in the
268 detail::Core, not in the Future, we can execute the continuation even
269 after the Future has gone out of scope. This is an intentional design
270 decision. It is likely we will want to be able to cancel a continuation
271 in some circumstances, but I think it should be explicit not implicit
272 in the destruction of the Future used to create it.
275 [state = detail::makeCoreCallbackState(
276 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
277 if (!isTry && t.hasException()) {
278 state.setException(std::move(t.exception()));
280 state.setTry(makeTryWith(
281 [&] { return state.invoke(t.template get<isTry, Args>()...); }));
// Implements then() for callbacks that themselves return a Future (the
// overload selected when R::ReturnsFuture::value is true).
// Setup matches the value-returning variant: promise `p` receives this
// future's interrupt handler and the future taken from it receives this
// future's executor.
// NOTE(review): the declaration of `p`, the branch structure and the return
// are missing from this extract.
288 // Variant: returns a Future
289 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
291 template <typename F, typename R, bool isTry, typename... Args>
292 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
293 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
294 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
295 typedef typename R::ReturnsFuture::Inner B;
300 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
302 // grab the Future now before we lose our handle on the Promise
303 auto f = p.getFuture();
304 f.core_->setExecutorNoLock(getExecutor());
307 [state = detail::makeCoreCallbackState(
308 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
// An upstream exception goes straight to the promise when the callback does
// not accept a Try.
309 if (!isTry && t.hasException()) {
310 state.setException(std::move(t.exception()));
// tryInvoke yields a Try around the future returned by the callback; an
// exception held there is passed on to the promise.
312 auto tf2 = state.tryInvoke(t.template get<isTry, Args>()...);
313 if (tf2.hasException()) {
314 state.setException(std::move(tf2.exception()));
// The promise is moved into a callback on the inner future, which fulfills it
// with that future's result.
316 tf2->setCallback_([p = state.stealPromise()](Try<B> && b) mutable {
317 p.setTry(std::move(b));
// then() overload taking a pointer to member function plus the instance to
// call it on. FirstArg is the member function's first parameter type with
// reference and cv-qualifiers removed; isTry<FirstArg> decides whether the
// member function is handed the Try itself or the contained value.
// The lambda captures the raw `instance` pointer by value.
326 template <typename T>
327 template <typename R, typename Caller, typename... Args>
328 Future<typename isFuture<R>::Inner>
329 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
330 typedef typename std::remove_cv<
331 typename std::remove_reference<
332 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
333 return then([instance, func](Try<T>&& t){
334 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
// Argument-less then(): chains an empty callback and returns Future<Unit>.
339 Future<Unit> Future<T>::then() {
340 return then([] () {});
// Selected when F is not callable with exception_wrapper and does not return
// a Future. Exn is the callback's first parameter type with the reference
// removed; the callback runs only when the Try holds an exception object of
// that type.
// NOTE(review): the declaration of `p`, the static_assert opener, the branch
// structure and the return are missing from this extract.
343 // onError where the callback returns T
346 typename std::enable_if<
347 !detail::callableWith<F, exception_wrapper>::value &&
348 !detail::Extract<F>::ReturnsFuture::value,
350 Future<T>::onError(F&& func) {
351 typedef std::remove_reference_t<typename detail::Extract<F>::FirstArg> Exn;
353 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
354 "Return type of onError callback must be T or Future<T>");
// The new promise receives this future's interrupt handler.
357 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
358 auto f = p.getFuture();
361 [state = detail::makeCoreCallbackState(
362 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
// Matching exception: the callback's result, wrapped by makeTryWith, fulfills
// the promise.
363 if (auto e = t.template tryGetExceptionObject<Exn>()) {
364 state.setTry(makeTryWith([&] { return state.invoke(*e); }));
// The original Try is also forwarded to the promise unchanged.
366 state.setTry(std::move(t));
// Selected when F is not callable with exception_wrapper and returns a
// Future. Exn is the callback's first parameter type with the reference
// removed.
// NOTE(review): the declaration of `p`, the static_assert opener, the branch
// structure and the return are missing from this extract.
373 // onError where the callback returns Future<T>
376 typename std::enable_if<
377 !detail::callableWith<F, exception_wrapper>::value &&
378 detail::Extract<F>::ReturnsFuture::value,
380 Future<T>::onError(F&& func) {
382 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
383 "Return type of onError callback must be T or Future<T>");
384 typedef std::remove_reference_t<typename detail::Extract<F>::FirstArg> Exn;
387 auto f = p.getFuture();
390 [state = detail::makeCoreCallbackState(
391 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
// Matching exception: call the handler through tryInvoke.
392 if (auto e = t.template tryGetExceptionObject<Exn>()) {
393 auto tf2 = state.tryInvoke(*e);
// An exception held by the resulting Try is passed on to the promise.
394 if (tf2.hasException()) {
395 state.setException(std::move(tf2.exception()));
// The promise is moved into a callback on the handler's future, which
// fulfills it with that future's result.
397 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
398 p.setTry(std::move(t3));
// The original Try is also forwarded to the promise unchanged.
402 state.setTry(std::move(t));
// Chains a callback that takes the Try<T> result and returns a future made
// from that same Try, so the original outcome is passed along.
// NOTE(review): the line that uses funcw is missing from this extract;
// presumably funcw is called before the return -- confirm.
411 Future<T> Future<T>::ensure(F&& func) {
412 return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
414 return makeFuture(std::move(t));
// Applies within(dur, tk) and installs an onError handler for TimedOut whose
// result is whatever func returns. func is forwarded into the lambda and
// called as an rvalue.
420 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
421 return within(dur, tk).onError([funcw = std::forward<F>(func)](
422 TimedOut const&) { return std::move(funcw)(); });
// onError overload for handlers that accept an exception_wrapper and return a
// Future. Unlike the typed overloads, the continuation here takes Try<T> by
// value.
// NOTE(review): the declaration of `p`, the static_assert opener, the branch
// structure and the return are missing from this extract.
427 typename std::enable_if<detail::callableWith<F, exception_wrapper>::value &&
428 detail::Extract<F>::ReturnsFuture::value,
430 Future<T>::onError(F&& func) {
432 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
433 "Return type of onError callback must be T or Future<T>");
436 auto f = p.getFuture();
438 [state = detail::makeCoreCallbackState(
439 std::move(p), std::forward<F>(func))](Try<T> t) mutable {
// Any exception: the handler is called with the moved exception_wrapper.
440 if (t.hasException()) {
441 auto tf2 = state.tryInvoke(std::move(t.exception()));
// An exception held by the resulting Try is passed on to the promise.
442 if (tf2.hasException()) {
443 state.setException(std::move(tf2.exception()));
// The promise is moved into a callback on the handler's future, which
// fulfills it with that future's result.
445 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
446 p.setTry(std::move(t3));
// The original Try is also forwarded to the promise unchanged.
450 state.setTry(std::move(t));
// Selected when F is callable with exception_wrapper and does not return a
// Future.
// NOTE(review): the declaration of `p`, the static_assert opener, the branch
// structure and the return are missing from this extract.
457 // onError(exception_wrapper) that returns T
460 typename std::enable_if<
461 detail::callableWith<F, exception_wrapper>::value &&
462 !detail::Extract<F>::ReturnsFuture::value,
464 Future<T>::onError(F&& func) {
466 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
467 "Return type of onError callback must be T or Future<T>");
470 auto f = p.getFuture();
472 [state = detail::makeCoreCallbackState(
473 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
// Any exception: the handler is called with the moved exception_wrapper and
// its result, wrapped by makeTryWith, fulfills the promise.
474 if (t.hasException()) {
475 state.setTry(makeTryWith(
476 [&] { return state.invoke(std::move(t.exception())); }));
// The original Try is also forwarded to the promise unchanged.
478 state.setTry(std::move(t));
// Returns an lvalue reference to the value held by the core's Try.
// NOTE(review): statements preceding the return are missing from this extract.
486 typename std::add_lvalue_reference<T>::type Future<T>::value() {
489 return core_->getTry().value();
// Const overload: returns a const lvalue reference to the value held by the
// core's Try.
// NOTE(review): statements preceding the return are missing from this extract.
493 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
496 return core_->getTry().value();
// Returns a reference to the Try stored in the core.
// NOTE(review): statements preceding the return are missing from this extract.
500 Try<T>& Future<T>::getTry() {
503 return core_->getTry();
// Runs waitVia(e) and then returns a reference to the resulting Try.
507 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
508 return waitVia(e).getTry();
// Readiness check that yields an Optional<Try<T>>: when the core reports
// ready, the Try is moved out of the core into the optional `o`.
// NOTE(review): the declaration and return of `o` are missing from this
// extract.
512 Optional<Try<T>> Future<T>::poll() {
514 if (core_->ready()) {
515 o = std::move(core_->getTry());
// Rvalue overload: sets the executor and priority on this future and returns
// it by move.
// NOTE(review): statements around setExecutor are missing from this extract.
521 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
524 setExecutor(executor, priority);
526 return std::move(*this);
// Lvalue overload: leaves this future's own executor untouched. The result of
// this future is forwarded into a promise `p`, and the future taken from `p`
// is returned after via(executor, priority) has been applied to it.
// NOTE(review): the declaration of `p` is missing from this extract.
530 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
534 auto f = p.getFuture();
535 then([p = std::move(p)](Try<T> && t) mutable { p.setTry(std::move(t)); });
536 return std::move(f).via(executor, priority);
// Free-function helper: obtains a future through via(x) and chains func onto
// it with then(). The result's value type is the Inner type of whatever func
// returns.
539 template <class Func>
540 auto via(Executor* x, Func&& func)
541 -> Future<typename isFuture<decltype(std::declval<Func>()())>::Inner> {
542 // TODO make this actually more performant. :-P #7260175
543 return via(x).then(std::forward<Func>(func));
// Reports whether the core is ready.
// NOTE(review): a statement preceding the return is missing from this extract.
547 bool Future<T>::isReady() const {
549 return core_->ready();
// True when the Try returned by getTry() holds a value.
553 bool Future<T>::hasValue() {
554 return getTry().hasValue();
// True when the Try returned by getTry() holds an exception.
558 bool Future<T>::hasException() {
559 return getTry().hasException();
// Passes the exception on to the core's raise(), moving it in.
563 void Future<T>::raise(exception_wrapper exception) {
564 core_->raise(std::move(exception));
568 Future<T>::Future(detail::EmptyConstruct) noexcept
// Builds a future from a value: wraps the forwarded value in a Try of the
// decayed type and delegates to the Try-taking makeFuture overload.
574 Future<typename std::decay<T>::type> makeFuture(T&& t) {
575 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
// Zero-argument form: returns a Future<Unit> made from Unit{}.
578 inline // for multiple translation units
579 Future<Unit> makeFuture() {
580 return makeFuture(Unit{});
// Overload for callables that already return a Future: the callable's future
// is returned directly. An exception caught as std::exception is converted
// into a future holding an exception_wrapper built from the current exception
// and the caught object; a second return builds the wrapper from
// std::current_exception() alone.
// NOTE(review): the `try` line and the second catch clause are missing from
// this extract; the second return is presumably inside a catch-all handler.
583 // makeFutureWith(Future<T>()) -> Future<T>
585 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
586 typename std::result_of<F()>::type>::type
587 makeFutureWith(F&& func) {
589 typename isFuture<typename std::result_of<F()>::type>::Inner;
591 return std::forward<F>(func)();
592 } catch (std::exception& e) {
593 return makeFuture<InnerType>(
594 exception_wrapper(std::current_exception(), e));
596 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
// Overload for callables that do not return a Future: the call is wrapped by
// makeTryWith and the resulting Try becomes the future's content. The result
// type is passed through Unit::Lift (the existing comment below says a void
// result maps to Future<Unit>).
600 // makeFutureWith(T()) -> Future<T>
601 // makeFutureWith(void()) -> Future<Unit>
603 typename std::enable_if<
604 !(isFuture<typename std::result_of<F()>::type>::value),
605 Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
606 makeFutureWith(F&& func) {
608 typename Unit::Lift<typename std::result_of<F()>::type>::type;
609 return makeFuture<LiftedResult>(
610 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
// Builds a future from a Try<T> constructed from the given std::exception_ptr.
614 Future<T> makeFuture(std::exception_ptr const& e) {
615 return makeFuture(Try<T>(e));
// Builds a future from a Try<T> constructed from the moved exception_wrapper.
619 Future<T> makeFuture(exception_wrapper ew) {
620 return makeFuture(Try<T>(std::move(ew)));
// Builds a future holding an exception; enabled only when E derives from
// std::exception. The exception object is passed to make_exception_wrapper<E>.
623 template <class T, class E>
624 typename std::enable_if<std::is_base_of<std::exception, E>::value,
626 makeFuture(E const& e) {
627 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
// Builds a future from a Try: allocates a detail::Core<T> holding the moved
// Try and constructs the Future from that core pointer.
631 Future<T> makeFuture(Try<T>&& t) {
632 return Future<T>(new detail::Core<T>(std::move(t)));
// Returns a Future<Unit> from makeFuture() with via(executor, priority)
// applied.
636 Future<Unit> via(Executor* executor, int8_t priority) {
637 return makeFuture().via(executor, priority);
// Walks [first, last) and installs, on each future, a callback that calls
// func with the future's zero-based position and its Try. func is copied into
// every callback.
640 // mapSetCallback calls func(i, Try<T>) when every future completes
642 template <class T, class InputIterator, class F>
643 void mapSetCallback(InputIterator first, InputIterator last, F func) {
644 for (size_t i = 0; first != last; ++first, ++i) {
645 first->setCallback_([func, i](Try<T>&& t) {
646 func(i, std::move(t));
// Creates a shared detail::CollectAllVariadicContext for the value types of
// the given futures, passes it and the forwarded futures to
// detail::collectVariadicHelper, and returns the future of the context's
// promise.
651 // collectAll (variadic)
653 template <typename... Fs>
654 typename detail::CollectAllVariadicContext<
655 typename std::decay<Fs>::type::value_type...>::type
656 collectAll(Fs&&... fs) {
657 auto ctx = std::make_shared<detail::CollectAllVariadicContext<
658 typename std::decay<Fs>::type::value_type...>>();
659 detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
660 ctx, std::forward<Fs>(fs)...);
661 return ctx->p.getFuture();
// Collects the Try of every future in [first, last) into a vector, indexed by
// the future's position in the range.
// The shared context fulfills its promise from its destructor, and every
// callback installed by mapSetCallback captures the shared_ptr to it.
664 // collectAll (iterator)
666 template <class InputIterator>
669 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
670 collectAll(InputIterator first, InputIterator last) {
672 typename std::iterator_traits<InputIterator>::value_type::value_type T;
674 struct CollectAllContext {
// One result slot per input future.
675 CollectAllContext(size_t n) : results(n) {}
// The promise is fulfilled with the gathered results on destruction.
676 ~CollectAllContext() {
677 p.setValue(std::move(results));
679 Promise<std::vector<Try<T>>> p;
680 std::vector<Try<T>> results;
684 std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
// Each callback writes only its own slot i.
685 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
686 ctx->results[i] = std::move(t);
688 return ctx->p.getFuture();
// Shared state for the iterator form of collect().
// Result is the type handed to the caller and InternalResult the type used
// while gathering; both depend on whether T is void. For non-void T the
// internal storage is a vector of Optional<T>.
// NOTE(review): the void-case branch of both conditionals, the Nothing struct
// header, the header of the function containing the finalization code, and
// the promise member are missing from this extract.
691 // collect (iterator)
695 template <typename T>
696 struct CollectContext {
698 explicit Nothing(int /* n */) {}
701 using Result = typename std::conditional<
702 std::is_void<T>::value,
704 std::vector<T>>::type;
706 using InternalResult = typename std::conditional<
707 std::is_void<T>::value,
709 std::vector<Optional<T>>>::type;
// Sizes the internal result storage to n entries.
711 explicit CollectContext(size_t n) : result(n) {}
// Finalization runs at most once: exchange(true) returns the previous flag
// value, so only the first caller gets past this check.
713 if (!threw.exchange(true)) {
714 // map Optional<T> -> T
715 std::vector<T> finalResult;
716 finalResult.reserve(result.size());
717 std::transform(result.begin(), result.end(),
718 std::back_inserter(finalResult),
719 [](Optional<T>& o) { return std::move(o.value()); });
720 p.setValue(std::move(finalResult));
// Moves the value out of t into slot i.
723 inline void setPartialResult(size_t i, Try<T>& t) {
724 result[i] = std::move(t.value());
727 InternalResult result;
// Set once the promise has been, or is about to be, fulfilled.
728 std::atomic<bool> threw {false};
// Iterator form of collect(): gathers the values of all futures in
// [first, last) through a shared detail::CollectContext.
// The first callback that sees an exception and wins the `threw` exchange
// fulfills the promise with that exception; value results are stored by index
// only while `threw` is still false.
733 template <class InputIterator>
734 Future<typename detail::CollectContext<
735 typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
736 collect(InputIterator first, InputIterator last) {
738 typename std::iterator_traits<InputIterator>::value_type::value_type T;
740 auto ctx = std::make_shared<detail::CollectContext<T>>(
741 std::distance(first, last));
742 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
743 if (t.hasException()) {
// exchange(true) returns the previous value, so only one callback enters.
744 if (!ctx->threw.exchange(true)) {
745 ctx->p.setException(std::move(t.exception()));
747 } else if (!ctx->threw) {
748 ctx->setPartialResult(i, t);
751 return ctx->p.getFuture();
// Creates a shared detail::CollectVariadicContext for the value types of the
// given futures, passes it and the forwarded futures to
// detail::collectVariadicHelper, and returns the future of the context's
// promise.
754 // collect (variadic)
756 template <typename... Fs>
757 typename detail::CollectVariadicContext<
758 typename std::decay<Fs>::type::value_type...>::type
759 collect(Fs&&... fs) {
760 auto ctx = std::make_shared<detail::CollectVariadicContext<
761 typename std::decay<Fs>::type::value_type...>>();
762 detail::collectVariadicHelper<detail::CollectVariadicContext>(
763 ctx, std::forward<Fs>(fs)...);
764 return ctx->p.getFuture();
// Completes with the (index, Try) pair of the first future in [first, last)
// whose callback runs, whether that Try holds a value or an exception.
// The atomic `done` flag ensures that only one callback fulfills the promise.
767 // collectAny (iterator)
769 template <class InputIterator>
774 std::iterator_traits<InputIterator>::value_type::value_type>>>
775 collectAny(InputIterator first, InputIterator last) {
777 typename std::iterator_traits<InputIterator>::value_type::value_type T;
779 struct CollectAnyContext {
780 CollectAnyContext() {}
781 Promise<std::pair<size_t, Try<T>>> p;
782 std::atomic<bool> done {false};
785 auto ctx = std::make_shared<CollectAnyContext>();
786 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
// exchange(true) returns the previous value: only the first caller sees false.
787 if (!ctx->done.exchange(true)) {
788 ctx->p.setValue(std::make_pair(i, std::move(t)));
791 return ctx->p.getFuture();
// Completes with the (index, value) pair of the first future in [first, last)
// that finishes without an exception.
// Callbacks that do not win are counted in nFulfilled; the one that brings the
// count up to nTotal fulfills the promise with its own exception.
// NOTE(review): the declaration of the nTotal member is missing from this
// extract.
794 // collectAnyWithoutException (iterator)
796 template <class InputIterator>
799 typename std::iterator_traits<InputIterator>::value_type::value_type>>
800 collectAnyWithoutException(InputIterator first, InputIterator last) {
802 typename std::iterator_traits<InputIterator>::value_type::value_type T;
804 struct CollectAnyWithoutExceptionContext {
805 CollectAnyWithoutExceptionContext(){}
806 Promise<std::pair<size_t, T>> p;
807 std::atomic<bool> done{false};
808 std::atomic<size_t> nFulfilled{0};
812 auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
813 ctx->nTotal = size_t(std::distance(first, last));
815 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
// Short-circuit: `done` is only exchanged for a Try without an exception.
816 if (!t.hasException() && !ctx->done.exchange(true)) {
817 ctx->p.setValue(std::make_pair(i, std::move(t.value())));
818 } else if (++ctx->nFulfilled == ctx->nTotal) {
819 ctx->p.setException(t.exception());
822 return ctx->p.getFuture();
// Completes with a vector of (index, Try) pairs taken from the futures in
// [first, last) as they complete. A range shorter than n fulfills the promise
// with a std::runtime_error instead.
// NOTE(review): the conditions guarding the emplace_back and the setTry are
// missing from this extract, as are the `v` and `p` member declarations.
// Confirm in the full file that concurrent callbacks cannot append to ctx->v
// at the same time, and check how n == 0 is handled.
825 // collectN (iterator)
827 template <class InputIterator>
828 Future<std::vector<std::pair<size_t, Try<typename
829 std::iterator_traits<InputIterator>::value_type::value_type>>>>
830 collectN(InputIterator first, InputIterator last, size_t n) {
832 std::iterator_traits<InputIterator>::value_type::value_type T;
833 typedef std::vector<std::pair<size_t, Try<T>>> V;
835 struct CollectNContext {
837 std::atomic<size_t> completed = {0};
840 auto ctx = std::make_shared<CollectNContext>();
842 if (size_t(std::distance(first, last)) < n) {
843 ctx->p.setException(std::runtime_error("Not enough futures"));
845 // for each completed Future, increase count and add to vector, until we
846 // have n completed futures at which point we fulfil our Promise with the
848 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
// c is this callback's 1-based completion rank.
849 auto c = ++ctx->completed;
851 assert(ctx->v.size() < n);
852 ctx->v.emplace_back(i, std::move(t));
854 ctx->p.setTry(Try<V>(std::move(ctx->v)));
860 return ctx->p.getFuture();
// Folds the futures in [first, last) in iteration order, starting from
// `initial`. func is kept in a shared_ptr so that every chained callback can
// share one instance. The first future is chained with then(); each later
// future is combined with the running result through collectAll(...).then().
// IsTry selects whether func receives each element as a Try<ItT> or as the
// contained value.
// NOTE(review): the guard for the early return, the remainder of the Arg
// typedef, the call line inside the first lambda and the final return are
// missing from this extract.
865 template <class It, class T, class F>
866 Future<T> reduce(It first, It last, T&& initial, F&& func) {
868 return makeFuture(std::move(initial));
871 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
873 typename std::conditional<detail::callableWith<F, T&&, Try<ItT>&&>::value,
876 typedef isTry<Arg> IsTry;
878 auto sfunc = std::make_shared<F>(std::move(func));
880 auto f = first->then(
881 [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
883 std::move(minitial), head.template get<IsTry::value, Arg&&>());
886 for (++first; first != last; ++first) {
887 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
888 return (*sfunc)(std::move(std::get<0>(t).value()),
889 // Either return a ItT&& or a Try<ItT>&& depending
890 // on the type of the argument of func.
891 std::get<1>(t).template get<IsTry::value, Arg&&>());
// Applies func to the elements of `input`, starting min(n, input.size())
// chains up front. Each completed call fulfills the promise at its own index
// and then spawns the next pending element. One future per input element is
// returned, taken from those promises.
// NOTE(review): the func_/input_ member declarations and the final return are
// missing from this extract.
898 // window (collection)
900 template <class Collection, class F, class ItT, class Result>
901 std::vector<Future<Result>>
902 window(Collection input, F func, size_t n) {
903 struct WindowContext {
904 WindowContext(Collection&& i, F&& fn)
905 : input_(std::move(i)), promises_(input_.size()),
// Next input index to start; incremented atomically by spawn().
908 std::atomic<size_t> i_ {0};
910 std::vector<Promise<Result>> promises_;
// Claims the next index; does nothing once the input is exhausted.
913 static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
914 size_t i = ctx->i_++;
915 if (i < ctx->input_.size()) {
916 // Using setCallback_ directly since we don't need the Future
917 ctx->func_(std::move(ctx->input_[i])).setCallback_(
918 // ctx is captured by value
919 [ctx, i](Try<Result>&& t) {
920 ctx->promises_[i].setTry(std::move(t));
921 // Chain another future onto this one
922 spawn(std::move(ctx));
// Computed before `input` is moved into the context below.
928 auto max = std::min(n, input.size());
930 auto ctx = std::make_shared<WindowContext>(
931 std::move(input), std::move(func));
933 for (size_t i = 0; i < max; ++i) {
934 // Start the first n Futures
935 WindowContext::spawn(ctx);
938 std::vector<Future<Result>> futures;
939 futures.reserve(ctx->promises_.size());
940 for (auto& promise : ctx->promises_) {
941 futures.emplace_back(promise.getFuture());
// Member reduce: folds a range `vals` into a single value, starting from
// `initial`; each step is ret = mfunc(ret, val) with both arguments moved.
// NOTE(review): the then(...) call opener, the lambda parameter list that
// declares `vals`, and the return are missing from this extract.
950 template <class I, class F>
951 Future<I> Future<T>::reduce(I&& initial, F&& func) {
953 minitial = std::forward<I>(initial),
954 mfunc = std::forward<F>(func)
956 auto ret = std::move(minitial);
957 for (auto& val : vals) {
958 ret = mfunc(std::move(ret), std::move(val));
// Reduces the futures in [first, last) in completion order rather than
// iteration order. The context keeps a running future (memo_) that each
// completion extends with another then() while holding a spin lock.
// NOTE(review): the guard for the early return, the memo_/func_/promise_
// member declarations, the mapSetCallback call opener and the assignment
// target of the then() chain are missing from this extract.
964 // unorderedReduce (iterator)
966 template <class It, class T, class F, class ItT, class Arg>
967 Future<T> unorderedReduce(It first, It last, T initial, F func) {
969 return makeFuture(std::move(initial));
972 typedef isTry<Arg> IsTry;
974 struct UnorderedReduceContext {
// memo_ starts as a future holding the initial value.
975 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
976 : lock_(), memo_(makeFuture<T>(std::move(memo))),
977 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
979 folly::MicroSpinLock lock_; // protects memo_ and numThens_
982 size_t numThens_; // how many Futures completed and called .then()
983 size_t numFutures_; // how many Futures in total
987 auto ctx = std::make_shared<UnorderedReduceContext>(
988 std::move(initial), std::move(func), std::distance(first, last));
993 [ctx](size_t /* i */, Try<ItT>&& t) {
994 // Futures can be completed in any order, simultaneously.
995 // To make this non-blocking, we create a new Future chain in
996 // the order of completion to reduce the values.
997 // The spinlock just protects chaining a new Future, not actually
998 // executing the reduce, which should be really fast.
999 folly::MSLGuard lock(ctx->lock_);
1001 ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
1002 // Either return a ItT&& or a Try<ItT>&& depending
1003 // on the type of the argument of func.
1004 return ctx->func_(std::move(v),
1005 mt.template get<IsTry::value, Arg&&>());
1007 if (++ctx->numThens_ == ctx->numFutures_) {
1008 // After reducing the value of the last Future, fulfill the Promise
1009 ctx->memo_.setCallback_(
1010 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
1014 return ctx->promise_.getFuture();
// Convenience overload: calls within(dur, e, tk) with a default-constructed
// TimedOut as the exception.
1020 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
1021 return within(dur, TimedOut(), tk);
// Races this future against a timer of length dur. Whichever callback flips
// the atomic `token` first fulfills the shared promise: with this future's
// result, or with an exception on the timer side. The returned future is
// bound to this future's executor.
// NOTE(review): the Context struct header, its exception/promise member
// declarations and the guard that selects the singleton timekeeper are
// missing from this extract.
1026 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
1029 Context(E ex) : exception(std::move(ex)), promise() {}
1031 Future<Unit> thisFuture;
1033 std::atomic<bool> token {false};
// tks holds the shared_ptr to the singleton while tk points into it.
1036 std::shared_ptr<Timekeeper> tks;
1038 tks = folly::detail::getTimekeeperSingleton();
1039 tk = DCHECK_NOTNULL(tks.get());
1042 auto ctx = std::make_shared<Context>(std::move(e));
1044 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
1045 // TODO: "this" completed first, cancel "after"
1046 if (ctx->token.exchange(true) == false) {
1047 ctx->promise.setTry(std::move(t));
1051 tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
1052 // "after" completed first, cancel "this"
// raise() is called before the token check, i.e. whenever the timer callback
// runs.
1053 ctx->thisFuture.raise(TimedOut());
1054 if (ctx->token.exchange(true) == false) {
// An exception from the timer itself is used when present; the caller's
// exception is the other outcome.
1055 if (t.hasException()) {
1056 ctx->promise.setException(std::move(t.exception()));
1058 ctx->promise.setException(std::move(ctx->exception));
1063 return ctx->promise.getFuture().via(getExecutor());
// Completes only after both this future and futures::sleep(dur, tk) have
// completed (joined with collectAll); the result is this future's own Try,
// and the sleep's Try is ignored.
1069 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
1070 return collectAll(*this, futures::sleep(dur, tk))
1071 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
1072 Try<T>& t = std::get<0>(tup);
1073 return makeFuture<T>(std::move(t));
// Untimed wait helper: installs a callback that posts a local baton when f
// completes. The callback captures the baton by reference.
// NOTE(review): the line that waits on the baton is missing from this extract.
1080 void waitImpl(Future<T>& f) {
1081 // short-circuit if there's nothing to do
1082 if (f.isReady()) return;
1084 FutureBatonType baton;
1085 f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1087 assert(f.isReady());
// Timed wait helper. The baton is held in a shared_ptr and the promise is
// moved into the callback, so neither is tied to this stack frame.
// f's result is forwarded into the promise, whose future is `ret`.
// NOTE(review): the readiness check, the promise declaration, the baton post
// and the reassignment of f are missing from this extract.
1091 void waitImpl(Future<T>& f, Duration dur) {
1092 // short-circuit if there's nothing to do
1098 auto ret = promise.getFuture();
1099 auto baton = std::make_shared<FutureBatonType>();
1100 f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1101 promise.setTry(std::move(t));
// timed_wait returning true is treated as "f is now ready".
1105 if (baton->timed_wait(dur)) {
1106 assert(f.isReady());
// Wait helper for a DrivableExecutor: rebinds f to e and chains a
// pass-through continuation, then loops until f reports ready.
// NOTE(review): the statements before the reassignment and the loop body are
// missing from this extract.
1111 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1112 // Set callback so to ensure that the via executor has something on it
1113 // so that once the preceding future triggers this callback, drive will
1114 // always have a callback to satisfy it
1117 f = f.via(e).then([](T&& t) { return std::move(t); });
1118 while (!f.isReady()) {
1121 assert(f.isReady());
// Lvalue wait(): delegates to detail::waitImpl on this future.
// NOTE(review): the return statement is missing from this extract.
1127 Future<T>& Future<T>::wait() & {
1128 detail::waitImpl(*this);
// Rvalue wait(): delegates to detail::waitImpl, then returns *this as an
// rvalue reference.
1133 Future<T>&& Future<T>::wait() && {
1134 detail::waitImpl(*this);
1135 return std::move(*this);
// Lvalue timed wait: delegates to detail::waitImpl with the duration.
// NOTE(review): the return statement is missing from this extract.
1139 Future<T>& Future<T>::wait(Duration dur) & {
1140 detail::waitImpl(*this, dur);
// Rvalue timed wait: delegates to detail::waitImpl with the duration, then
// returns *this as an rvalue reference.
1145 Future<T>&& Future<T>::wait(Duration dur) && {
1146 detail::waitImpl(*this, dur);
1147 return std::move(*this);
// Lvalue waitVia(): delegates to detail::waitViaImpl with the executor.
// NOTE(review): the return statement is missing from this extract.
1151 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1152 detail::waitViaImpl(*this, e);
// Rvalue waitVia(): delegates to detail::waitViaImpl, then returns *this as
// an rvalue reference.
1157 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1158 detail::waitViaImpl(*this, e);
1159 return std::move(*this);
// Calls wait() and returns the value, moved out of the future.
1163 T Future<T>::get() {
1164 return std::move(wait().value());
// Timed get(): returns the value, moved out of the future.
// NOTE(review): the timed wait and the handling of a future that is still not
// ready are missing from this extract.
1168 T Future<T>::get(Duration dur) {
1171 return std::move(value());
// Calls waitVia(e) and returns the value, moved out of the future.
1178 T Future<T>::getVia(DrivableExecutor* e) {
1179 return std::move(waitVia(e).value());
// Compares the values of two Trys with operator==. Both are dereferenced with
// value(); the visible caller (willEqual) checks hasValue() on both first.
1185 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1186 return t1.value() == t2.value();
// Returns a Future<bool> derived from this future and f once both have
// completed (joined with collectAll). When both Trys hold values the result
// is detail::TryEquals<T>::equals on them.
// NOTE(review): the result for the other case is missing from this extract.
1192 Future<bool> Future<T>::willEqual(Future<T>& f) {
1193 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1194 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1195 return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
// Chains a callback that tests the value against the predicate and throws
// PredicateDoesNotObtain when the predicate returns false. The predicate only
// ever sees the value through a const reference.
// NOTE(review): the return on the passing path is missing from this extract.
1204 Future<T> Future<T>::filter(F&& predicate) {
1205 return this->then([p = std::forward<F>(predicate)](T val) {
1206 T const& valConstRef = val;
1207 if (!p(valConstRef)) {
1208 throw PredicateDoesNotObtain();
// Runs thunk only when p is true and converts its result with unit(); when p
// is false the result is makeFuture().
1215 inline Future<Unit> when(bool p, F&& thunk) {
1216 return p ? std::forward<F>(thunk)().unit() : makeFuture();
// Asynchronous loop: calls thunk, then chains a continuation that calls
// whileDo again with the captured predicate and thunk. The other exit returns
// makeFuture().
// NOTE(review): the predicate check that selects between the two returns and
// the lambda's parameter list are missing from this extract.
1219 template <class P, class F>
1220 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1222 auto future = thunk();
1223 return future.then([
1224 predicate = std::forward<P>(predicate),
1225 thunk = std::forward<F>(thunk)
1227 return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1230 return makeFuture();
// Repeats thunk through whileDo with a counting predicate: the predicate
// returns true while the counter's value before the increment is below n.
// The counter is a heap-allocated std::atomic<int> owned by the predicate
// lambda.
1234 Future<Unit> times(const int n, F&& thunk) {
1235 return folly::whileDo(
1236 [ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
1237 return count->fetch_add(1) < n;
1239 std::forward<F>(thunk));
// Chains func onto every future in [first, last) with then() and collects the
// resulting futures, in iteration order, into a vector.
// NOTE(review): the return statement is missing from this extract.
1243 template <class It, class F, class ItT, class Result>
1244 std::vector<Future<Result>> map(It first, It last, F func) {
1245 std::vector<Future<Result>> results;
1246 for (auto it = first; it != last; it++) {
1247 results.push_back(it->then(func));
// Empty tag types naming the two retry-policy flavours: "raw" and "fut".
// retrying_policy_traits maps a bool-returning policy to the raw tag and a
// Future<bool>-returning policy to the fut tag.
1257 struct retrying_policy_raw_tag {};
1258 struct retrying_policy_fut_tag {};
// Classifies a retry policy by what it returns when called with
// (size_t, const exception_wrapper&): bool gives the raw tag, Future<bool>
// gives the fut tag, and anything else gives `void` as the tag.
1260 template <class Policy>
1261 struct retrying_policy_traits {
1262 using result = std::result_of_t<Policy(size_t, const exception_wrapper&)>;
1263 using is_raw = std::is_same<result, bool>;
1264 using is_fut = std::is_same<result, Future<bool>>;
1265 using tag = typename std::conditional<
1266 is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1267 is_fut::value, retrying_policy_fut_tag, void>::type>::type;
// One round of the retry loop. ff(k) is called through makeFutureWith and k is
// incremented as part of that call. The continuation owns the promise, the
// policy and the factory. When a retry is chosen, retryingImpl is called again
// with the incremented k; the promise is fulfilled with the value on one path
// and with the stored exception on another.
// NOTE(review): the then(...) call openers, the success/failure branch
// conditions, the policy call and several captures are missing from this
// extract.
1270 template <class Policy, class FF, class Prom>
1271 void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) {
1272 using F = typename std::result_of<FF(size_t)>::type;
1273 using T = typename F::value_type;
1274 auto f = makeFutureWith([&] { return ff(k++); });
1277 prom = std::move(prom),
1278 pm = std::forward<Policy>(p),
1279 ffm = std::forward<FF>(ff)
1280 ](Try<T> && t) mutable {
1282 prom.setValue(std::move(t).value());
1285 auto& x = t.exception();
1289 prom = std::move(prom),
1292 ffm = std::move(ffm)
1293 ](bool shouldRetry) mutable {
1295 retryingImpl(k, std::move(pm), std::move(ffm), std::move(prom));
1297 prom.setException(std::move(xm));
// Entry point with an explicit starting attempt number k: creates the promise
// that carries the final result and takes its future.
// NOTE(review): the name of the function receiving the forwarded arguments and
// the return are missing from this extract; presumably retryingImpl is called
// and f is returned -- confirm.
1303 template <class Policy, class FF>
1304 typename std::result_of<FF(size_t)>::type
1305 retrying(size_t k, Policy&& p, FF&& ff) {
1306 using F = typename std::result_of<FF(size_t)>::type;
1307 using T = typename F::value_type;
1308 auto prom = Promise<T>();
1309 auto f = prom.getFuture();
1311 k, std::forward<Policy>(p), std::forward<FF>(ff), std::move(prom));
1315 template <class Policy, class FF>
1316 typename std::result_of<FF(size_t)>::type
1317 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1318 auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
1319 return makeFuture<bool>(pm(k, x));
1321 return retrying(0, std::move(q), std::forward<FF>(ff));
1324 template <class Policy, class FF>
1325 typename std::result_of<FF(size_t)>::type
1326 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1327 return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1330 // jittered exponential backoff, clamped to [backoff_min, backoff_max]
1331 template <class URNG>
1332 Duration retryingJitteredExponentialBackoffDur(
1334 Duration backoff_min,
1335 Duration backoff_max,
1336 double jitter_param,
1339 auto dist = std::normal_distribution<double>(0.0, jitter_param);
1340 auto jitter = std::exp(dist(rng));
1341 auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1342 return std::max(backoff_min, std::min(backoff_max, backoff));
1345 template <class Policy, class URNG>
1346 std::function<Future<bool>(size_t, const exception_wrapper&)>
1347 retryingPolicyCappedJitteredExponentialBackoff(
1349 Duration backoff_min,
1350 Duration backoff_max,
1351 double jitter_param,
1355 pm = std::forward<Policy>(p),
1360 rngp = std::forward<URNG>(rng)
1361 ](size_t n, const exception_wrapper& ex) mutable {
1362 if (n == max_tries) {
1363 return makeFuture(false);
1365 return pm(n, ex).then(
1366 [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
1369 return makeFuture(false);
1371 auto backoff = detail::retryingJitteredExponentialBackoffDur(
1372 n, backoff_min, backoff_max, jitter_param, rngp);
1373 return futures::sleep(backoff).then([] { return true; });
1378 template <class Policy, class URNG>
1379 std::function<Future<bool>(size_t, const exception_wrapper&)>
1380 retryingPolicyCappedJitteredExponentialBackoff(
1382 Duration backoff_min,
1383 Duration backoff_max,
1384 double jitter_param,
1387 retrying_policy_raw_tag) {
1388 auto q = [pm = std::forward<Policy>(p)](
1389 size_t n, const exception_wrapper& e) {
1390 return makeFuture(pm(n, e));
1392 return retryingPolicyCappedJitteredExponentialBackoff(
1397 std::forward<URNG>(rng),
1401 template <class Policy, class URNG>
1402 std::function<Future<bool>(size_t, const exception_wrapper&)>
1403 retryingPolicyCappedJitteredExponentialBackoff(
1405 Duration backoff_min,
1406 Duration backoff_max,
1407 double jitter_param,
1410 retrying_policy_fut_tag) {
1411 return retryingPolicyCappedJitteredExponentialBackoff(
1416 std::forward<URNG>(rng),
1417 std::forward<Policy>(p));
1421 template <class Policy, class FF>
1422 typename std::result_of<FF(size_t)>::type
1423 retrying(Policy&& p, FF&& ff) {
1424 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1425 return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1429 std::function<bool(size_t, const exception_wrapper&)>
1430 retryingPolicyBasic(
1432 return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1435 template <class Policy, class URNG>
1436 std::function<Future<bool>(size_t, const exception_wrapper&)>
1437 retryingPolicyCappedJitteredExponentialBackoff(
1439 Duration backoff_min,
1440 Duration backoff_max,
1441 double jitter_param,
1444 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1445 return detail::retryingPolicyCappedJitteredExponentialBackoff(
1450 std::forward<URNG>(rng),
1451 std::forward<Policy>(p),
1456 std::function<Future<bool>(size_t, const exception_wrapper&)>
1457 retryingPolicyCappedJitteredExponentialBackoff(
1459 Duration backoff_min,
1460 Duration backoff_max,
1461 double jitter_param) {
1462 auto p = [](size_t, const exception_wrapper&) { return true; };
1463 return retryingPolicyCappedJitteredExponentialBackoff(
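// Explicit instantiation declarations for the most common Future types.
// `extern template` stops every includer from instantiating them implicitly,
// which saves compile time; the matching explicit instantiations are expected
// to be provided once elsewhere in the program.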
1475 extern template class Future<Unit>;
1476 extern template class Future<bool>;
1477 extern template class Future<int>;
1478 extern template class Future<int64_t>;
1479 extern template class Future<std::string>;
1480 extern template class Future<double>;
1482 } // namespace folly