2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/fibers/Baton.h>
43 #if FOLLY_FUTURE_USING_FIBER
44 typedef folly::fibers::Baton FutureBatonType;
46 typedef folly::Baton<> FutureBatonType;
51 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
53 // Guarantees that the stored functor is destructed before the stored promise
54 // may be fulfilled. Assumes the stored functor to be noexcept-destructible.
55 template <typename T, typename F>
56 class CoreCallbackState {
58 template <typename FF>
59 CoreCallbackState(Promise<T>&& promise, FF&& func) noexcept(
60 noexcept(F(std::declval<FF>())))
61 : func_(std::forward<FF>(func)), promise_(std::move(promise)) {
62 assert(before_barrier());
65 CoreCallbackState(CoreCallbackState&& that) noexcept(
66 noexcept(F(std::declval<F>()))) {
67 if (that.before_barrier()) {
68 new (&func_) F(std::move(that.func_));
69 promise_ = that.stealPromise();
73 CoreCallbackState& operator=(CoreCallbackState&&) = delete;
75 ~CoreCallbackState() {
76 if (before_barrier()) {
81 template <typename... Args>
82 auto invoke(Args&&... args) noexcept(
83 noexcept(std::declval<F&&>()(std::declval<Args&&>()...))) {
84 assert(before_barrier());
85 return std::move(func_)(std::forward<Args>(args)...);
88 template <typename... Args>
89 auto tryInvoke(Args&&... args) noexcept {
90 return makeTryWith([&] { return invoke(std::forward<Args>(args)...); });
93 void setTry(Try<T>&& t) {
94 stealPromise().setTry(std::move(t));
97 void setException(exception_wrapper&& ew) {
98 stealPromise().setException(std::move(ew));
101 Promise<T> stealPromise() noexcept {
102 assert(before_barrier());
104 return std::move(promise_);
108 bool before_barrier() const noexcept {
109 return !promise_.isFulfilled();
115 Promise<T> promise_{detail::EmptyConstruct{}};
118 template <typename T, typename F>
119 inline auto makeCoreCallbackState(Promise<T>&& p, F&& f) noexcept(
120 noexcept(CoreCallbackState<T, _t<std::decay<F>>>(
121 std::declval<Promise<T>&&>(),
122 std::declval<F&&>()))) {
123 return CoreCallbackState<T, _t<std::decay<F>>>(
124 std::move(p), std::forward<F>(f));
129 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
130 other.core_ = nullptr;
134 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
135 std::swap(core_, other.core_);
140 template <class T2, typename>
141 Future<T>::Future(T2&& val)
142 : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
145 template <typename T2>
146 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
147 : core_(new detail::Core<T>(Try<T>(T()))) {}
150 Future<T>::~Future() {
155 void Future<T>::detach() {
157 core_->detachFuture();
163 void Future<T>::throwIfInvalid() const {
170 void Future<T>::setCallback_(F&& func) {
172 core_->setCallback(std::forward<F>(func));
179 typename std::enable_if<isFuture<F>::value,
180 Future<typename isFuture<T>::Inner>>::type
181 Future<T>::unwrap() {
182 return then([](Future<typename isFuture<T>::Inner> internal_future) {
183 return internal_future;
189 // Variant: returns a value
190 // e.g. f.then([](Try<T>&& t){ return t.value(); });
192 template <typename F, typename R, bool isTry, typename... Args>
193 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
194 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
195 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
196 typedef typename R::ReturnsFuture::Inner B;
201 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
203 // grab the Future now before we lose our handle on the Promise
204 auto f = p.getFuture();
205 f.core_->setExecutorNoLock(getExecutor());
207 /* This is a bit tricky.
209 We can't just close over *this in case this Future gets moved. So we
210 make a new dummy Future. We could figure out something more
211 sophisticated that avoids making a new Future object when it can, as an
212 optimization. But this is correct.
214 core_ can't be moved, it is explicitly disallowed (as is copying). But
215 if there's ever a reason to allow it, this is one place that makes that
216 assumption and would need to be fixed. We use a standard shared pointer
217 for core_ (by copying it in), which means in essence obj holds a shared
218 pointer to itself. But this shouldn't leak because Promise will not
219 outlive the continuation, because Promise will setException() with a
220 broken Promise if it is destructed before completed. We could use a
221 weak pointer but it would have to be converted to a shared pointer when
222 func is executed (because the Future returned by func may possibly
223 persist beyond the callback, if it gets moved), and so it is an
224 optimization to just make it shared from the get-go.
226 Two subtle but important points about this design. detail::Core has no
227 back pointers to Future or Promise, so if Future or Promise get moved
228 (and they will be moved in performant code) we don't have to do
229 anything fancy. And because we store the continuation in the
230 detail::Core, not in the Future, we can execute the continuation even
231 after the Future has gone out of scope. This is an intentional design
232 decision. It is likely we will want to be able to cancel a continuation
233 in some circumstances, but I think it should be explicit not implicit
234 in the destruction of the Future used to create it.
237 [state = detail::makeCoreCallbackState(
238 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
239 if (!isTry && t.hasException()) {
240 state.setException(std::move(t.exception()));
242 state.setTry(makeTryWith(
243 [&] { return state.invoke(t.template get<isTry, Args>()...); }));
250 // Variant: returns a Future
251 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
253 template <typename F, typename R, bool isTry, typename... Args>
254 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
255 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
256 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
257 typedef typename R::ReturnsFuture::Inner B;
262 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
264 // grab the Future now before we lose our handle on the Promise
265 auto f = p.getFuture();
266 f.core_->setExecutorNoLock(getExecutor());
269 [state = detail::makeCoreCallbackState(
270 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
271 if (!isTry && t.hasException()) {
272 state.setException(std::move(t.exception()));
274 auto tf2 = state.tryInvoke(t.template get<isTry, Args>()...);
275 if (tf2.hasException()) {
276 state.setException(std::move(tf2.exception()));
278 tf2->setCallback_([p = state.stealPromise()](Try<B> && b) mutable {
279 p.setTry(std::move(b));
288 template <typename T>
289 template <typename R, typename Caller, typename... Args>
290 Future<typename isFuture<R>::Inner>
291 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
292 typedef typename std::remove_cv<
293 typename std::remove_reference<
294 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
295 return then([instance, func](Try<T>&& t){
296 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
301 template <class Executor, class Arg, class... Args>
302 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
303 -> decltype(this->then(std::forward<Arg>(arg),
304 std::forward<Args>(args)...))
306 auto oldX = getExecutor();
308 return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
313 Future<Unit> Future<T>::then() {
314 return then([] () {});
317 // onError where the callback returns T
320 typename std::enable_if<
321 !detail::callableWith<F, exception_wrapper>::value &&
322 !detail::Extract<F>::ReturnsFuture::value,
324 Future<T>::onError(F&& func) {
325 typedef std::remove_reference_t<typename detail::Extract<F>::FirstArg> Exn;
327 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
328 "Return type of onError callback must be T or Future<T>");
331 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
332 auto f = p.getFuture();
335 [state = detail::makeCoreCallbackState(
336 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
337 if (auto e = t.template tryGetExceptionObject<Exn>()) {
338 state.setTry(makeTryWith([&] { return state.invoke(*e); }));
340 state.setTry(std::move(t));
347 // onError where the callback returns Future<T>
350 typename std::enable_if<
351 !detail::callableWith<F, exception_wrapper>::value &&
352 detail::Extract<F>::ReturnsFuture::value,
354 Future<T>::onError(F&& func) {
356 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
357 "Return type of onError callback must be T or Future<T>");
358 typedef std::remove_reference_t<typename detail::Extract<F>::FirstArg> Exn;
361 auto f = p.getFuture();
364 [state = detail::makeCoreCallbackState(
365 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
366 if (auto e = t.template tryGetExceptionObject<Exn>()) {
367 auto tf2 = state.tryInvoke(*e);
368 if (tf2.hasException()) {
369 state.setException(std::move(tf2.exception()));
371 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
372 p.setTry(std::move(t3));
376 state.setTry(std::move(t));
385 Future<T> Future<T>::ensure(F&& func) {
386 return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
388 return makeFuture(std::move(t));
394 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
395 return within(dur, tk).onError([funcw = std::forward<F>(func)](
396 TimedOut const&) { return std::move(funcw)(); });
401 typename std::enable_if<detail::callableWith<F, exception_wrapper>::value &&
402 detail::Extract<F>::ReturnsFuture::value,
404 Future<T>::onError(F&& func) {
406 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
407 "Return type of onError callback must be T or Future<T>");
410 auto f = p.getFuture();
412 [state = detail::makeCoreCallbackState(
413 std::move(p), std::forward<F>(func))](Try<T> t) mutable {
414 if (t.hasException()) {
415 auto tf2 = state.tryInvoke(std::move(t.exception()));
416 if (tf2.hasException()) {
417 state.setException(std::move(tf2.exception()));
419 tf2->setCallback_([p = state.stealPromise()](Try<T> t3) mutable {
420 p.setTry(std::move(t3));
424 state.setTry(std::move(t));
431 // onError(exception_wrapper) that returns T
434 typename std::enable_if<
435 detail::callableWith<F, exception_wrapper>::value &&
436 !detail::Extract<F>::ReturnsFuture::value,
438 Future<T>::onError(F&& func) {
440 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
441 "Return type of onError callback must be T or Future<T>");
444 auto f = p.getFuture();
446 [state = detail::makeCoreCallbackState(
447 std::move(p), std::forward<F>(func))](Try<T> t) mutable {
448 if (t.hasException()) {
449 state.setTry(makeTryWith(
450 [&] { return state.invoke(std::move(t.exception())); }));
452 state.setTry(std::move(t));
460 typename std::add_lvalue_reference<T>::type Future<T>::value() {
463 return core_->getTry().value();
467 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
470 return core_->getTry().value();
474 Try<T>& Future<T>::getTry() {
477 return core_->getTry();
481 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
482 return waitVia(e).getTry();
486 Optional<Try<T>> Future<T>::poll() {
488 if (core_->ready()) {
489 o = std::move(core_->getTry());
495 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
498 setExecutor(executor, priority);
500 return std::move(*this);
504 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
508 auto f = p.getFuture();
509 then([p = std::move(p)](Try<T> && t) mutable { p.setTry(std::move(t)); });
510 return std::move(f).via(executor, priority);
513 template <class Func>
514 auto via(Executor* x, Func&& func)
515 -> Future<typename isFuture<decltype(std::declval<Func>()())>::Inner> {
516 // TODO make this actually more performant. :-P #7260175
517 return via(x).then(std::forward<Func>(func));
521 bool Future<T>::isReady() const {
523 return core_->ready();
527 bool Future<T>::hasValue() {
528 return getTry().hasValue();
532 bool Future<T>::hasException() {
533 return getTry().hasException();
537 void Future<T>::raise(exception_wrapper exception) {
538 core_->raise(std::move(exception));
544 Future<typename std::decay<T>::type> makeFuture(T&& t) {
545 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
548 inline // for multiple translation units
549 Future<Unit> makeFuture() {
550 return makeFuture(Unit{});
553 // makeFutureWith(Future<T>()) -> Future<T>
555 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
556 typename std::result_of<F()>::type>::type
557 makeFutureWith(F&& func) {
559 typename isFuture<typename std::result_of<F()>::type>::Inner;
561 return std::forward<F>(func)();
562 } catch (std::exception& e) {
563 return makeFuture<InnerType>(
564 exception_wrapper(std::current_exception(), e));
566 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
570 // makeFutureWith(T()) -> Future<T>
571 // makeFutureWith(void()) -> Future<Unit>
573 typename std::enable_if<
574 !(isFuture<typename std::result_of<F()>::type>::value),
575 Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
576 makeFutureWith(F&& func) {
578 typename Unit::Lift<typename std::result_of<F()>::type>::type;
579 return makeFuture<LiftedResult>(
580 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
584 Future<T> makeFuture(std::exception_ptr const& e) {
585 return makeFuture(Try<T>(e));
589 Future<T> makeFuture(exception_wrapper ew) {
590 return makeFuture(Try<T>(std::move(ew)));
593 template <class T, class E>
594 typename std::enable_if<std::is_base_of<std::exception, E>::value,
596 makeFuture(E const& e) {
597 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
601 Future<T> makeFuture(Try<T>&& t) {
602 return Future<T>(new detail::Core<T>(std::move(t)));
606 Future<Unit> via(Executor* executor, int8_t priority) {
607 return makeFuture().via(executor, priority);
610 // mapSetCallback calls func(i, Try<T>) when every future completes
612 template <class T, class InputIterator, class F>
613 void mapSetCallback(InputIterator first, InputIterator last, F func) {
614 for (size_t i = 0; first != last; ++first, ++i) {
615 first->setCallback_([func, i](Try<T>&& t) {
616 func(i, std::move(t));
621 // collectAll (variadic)
623 template <typename... Fs>
624 typename detail::CollectAllVariadicContext<
625 typename std::decay<Fs>::type::value_type...>::type
626 collectAll(Fs&&... fs) {
627 auto ctx = std::make_shared<detail::CollectAllVariadicContext<
628 typename std::decay<Fs>::type::value_type...>>();
629 detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
630 ctx, std::forward<Fs>(fs)...);
631 return ctx->p.getFuture();
634 // collectAll (iterator)
636 template <class InputIterator>
639 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
640 collectAll(InputIterator first, InputIterator last) {
642 typename std::iterator_traits<InputIterator>::value_type::value_type T;
644 struct CollectAllContext {
645 CollectAllContext(size_t n) : results(n) {}
646 ~CollectAllContext() {
647 p.setValue(std::move(results));
649 Promise<std::vector<Try<T>>> p;
650 std::vector<Try<T>> results;
654 std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
655 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
656 ctx->results[i] = std::move(t);
658 return ctx->p.getFuture();
661 // collect (iterator)
665 template <typename T>
666 struct CollectContext {
668 explicit Nothing(int /* n */) {}
671 using Result = typename std::conditional<
672 std::is_void<T>::value,
674 std::vector<T>>::type;
676 using InternalResult = typename std::conditional<
677 std::is_void<T>::value,
679 std::vector<Optional<T>>>::type;
681 explicit CollectContext(size_t n) : result(n) {}
683 if (!threw.exchange(true)) {
684 // map Optional<T> -> T
685 std::vector<T> finalResult;
686 finalResult.reserve(result.size());
687 std::transform(result.begin(), result.end(),
688 std::back_inserter(finalResult),
689 [](Optional<T>& o) { return std::move(o.value()); });
690 p.setValue(std::move(finalResult));
693 inline void setPartialResult(size_t i, Try<T>& t) {
694 result[i] = std::move(t.value());
697 InternalResult result;
698 std::atomic<bool> threw {false};
703 template <class InputIterator>
704 Future<typename detail::CollectContext<
705 typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
706 collect(InputIterator first, InputIterator last) {
708 typename std::iterator_traits<InputIterator>::value_type::value_type T;
710 auto ctx = std::make_shared<detail::CollectContext<T>>(
711 std::distance(first, last));
712 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
713 if (t.hasException()) {
714 if (!ctx->threw.exchange(true)) {
715 ctx->p.setException(std::move(t.exception()));
717 } else if (!ctx->threw) {
718 ctx->setPartialResult(i, t);
721 return ctx->p.getFuture();
724 // collect (variadic)
726 template <typename... Fs>
727 typename detail::CollectVariadicContext<
728 typename std::decay<Fs>::type::value_type...>::type
729 collect(Fs&&... fs) {
730 auto ctx = std::make_shared<detail::CollectVariadicContext<
731 typename std::decay<Fs>::type::value_type...>>();
732 detail::collectVariadicHelper<detail::CollectVariadicContext>(
733 ctx, std::forward<Fs>(fs)...);
734 return ctx->p.getFuture();
737 // collectAny (iterator)
739 template <class InputIterator>
744 std::iterator_traits<InputIterator>::value_type::value_type>>>
745 collectAny(InputIterator first, InputIterator last) {
747 typename std::iterator_traits<InputIterator>::value_type::value_type T;
749 struct CollectAnyContext {
750 CollectAnyContext() {}
751 Promise<std::pair<size_t, Try<T>>> p;
752 std::atomic<bool> done {false};
755 auto ctx = std::make_shared<CollectAnyContext>();
756 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
757 if (!ctx->done.exchange(true)) {
758 ctx->p.setValue(std::make_pair(i, std::move(t)));
761 return ctx->p.getFuture();
764 // collectAnyWithoutException (iterator)
766 template <class InputIterator>
769 typename std::iterator_traits<InputIterator>::value_type::value_type>>
770 collectAnyWithoutException(InputIterator first, InputIterator last) {
772 typename std::iterator_traits<InputIterator>::value_type::value_type T;
774 struct CollectAnyWithoutExceptionContext {
775 CollectAnyWithoutExceptionContext(){}
776 Promise<std::pair<size_t, T>> p;
777 std::atomic<bool> done{false};
778 std::atomic<size_t> nFulfilled{0};
782 auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
783 ctx->nTotal = size_t(std::distance(first, last));
785 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
786 if (!t.hasException() && !ctx->done.exchange(true)) {
787 ctx->p.setValue(std::make_pair(i, std::move(t.value())));
788 } else if (++ctx->nFulfilled == ctx->nTotal) {
789 ctx->p.setException(t.exception());
792 return ctx->p.getFuture();
795 // collectN (iterator)
797 template <class InputIterator>
798 Future<std::vector<std::pair<size_t, Try<typename
799 std::iterator_traits<InputIterator>::value_type::value_type>>>>
800 collectN(InputIterator first, InputIterator last, size_t n) {
802 std::iterator_traits<InputIterator>::value_type::value_type T;
803 typedef std::vector<std::pair<size_t, Try<T>>> V;
805 struct CollectNContext {
807 std::atomic<size_t> completed = {0};
810 auto ctx = std::make_shared<CollectNContext>();
812 if (size_t(std::distance(first, last)) < n) {
813 ctx->p.setException(std::runtime_error("Not enough futures"));
815 // for each completed Future, increase count and add to vector, until we
816 // have n completed futures at which point we fulfil our Promise with the
818 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
819 auto c = ++ctx->completed;
821 assert(ctx->v.size() < n);
822 ctx->v.emplace_back(i, std::move(t));
824 ctx->p.setTry(Try<V>(std::move(ctx->v)));
830 return ctx->p.getFuture();
835 template <class It, class T, class F>
836 Future<T> reduce(It first, It last, T&& initial, F&& func) {
838 return makeFuture(std::move(initial));
841 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
843 typename std::conditional<detail::callableWith<F, T&&, Try<ItT>&&>::value,
846 typedef isTry<Arg> IsTry;
848 auto sfunc = std::make_shared<F>(std::move(func));
850 auto f = first->then(
851 [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
853 std::move(minitial), head.template get<IsTry::value, Arg&&>());
856 for (++first; first != last; ++first) {
857 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
858 return (*sfunc)(std::move(std::get<0>(t).value()),
859 // Either return a ItT&& or a Try<ItT>&& depending
860 // on the type of the argument of func.
861 std::get<1>(t).template get<IsTry::value, Arg&&>());
868 // window (collection)
870 template <class Collection, class F, class ItT, class Result>
871 std::vector<Future<Result>>
872 window(Collection input, F func, size_t n) {
873 struct WindowContext {
874 WindowContext(Collection&& i, F&& fn)
875 : input_(std::move(i)), promises_(input_.size()),
878 std::atomic<size_t> i_ {0};
880 std::vector<Promise<Result>> promises_;
883 static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
884 size_t i = ctx->i_++;
885 if (i < ctx->input_.size()) {
886 // Using setCallback_ directly since we don't need the Future
887 ctx->func_(std::move(ctx->input_[i])).setCallback_(
888 // ctx is captured by value
889 [ctx, i](Try<Result>&& t) {
890 ctx->promises_[i].setTry(std::move(t));
891 // Chain another future onto this one
892 spawn(std::move(ctx));
898 auto max = std::min(n, input.size());
900 auto ctx = std::make_shared<WindowContext>(
901 std::move(input), std::move(func));
903 for (size_t i = 0; i < max; ++i) {
904 // Start the first n Futures
905 WindowContext::spawn(ctx);
908 std::vector<Future<Result>> futures;
909 futures.reserve(ctx->promises_.size());
910 for (auto& promise : ctx->promises_) {
911 futures.emplace_back(promise.getFuture());
920 template <class I, class F>
921 Future<I> Future<T>::reduce(I&& initial, F&& func) {
923 minitial = std::forward<I>(initial),
924 mfunc = std::forward<F>(func)
926 auto ret = std::move(minitial);
927 for (auto& val : vals) {
928 ret = mfunc(std::move(ret), std::move(val));
934 // unorderedReduce (iterator)
936 template <class It, class T, class F, class ItT, class Arg>
937 Future<T> unorderedReduce(It first, It last, T initial, F func) {
939 return makeFuture(std::move(initial));
942 typedef isTry<Arg> IsTry;
944 struct UnorderedReduceContext {
945 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
946 : lock_(), memo_(makeFuture<T>(std::move(memo))),
947 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
949 folly::MicroSpinLock lock_; // protects memo_ and numThens_
952 size_t numThens_; // how many Futures completed and called .then()
953 size_t numFutures_; // how many Futures in total
957 auto ctx = std::make_shared<UnorderedReduceContext>(
958 std::move(initial), std::move(func), std::distance(first, last));
963 [ctx](size_t /* i */, Try<ItT>&& t) {
964 // Futures can be completed in any order, simultaneously.
965 // To make this non-blocking, we create a new Future chain in
966 // the order of completion to reduce the values.
967 // The spinlock just protects chaining a new Future, not actually
968 // executing the reduce, which should be really fast.
969 folly::MSLGuard lock(ctx->lock_);
971 ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
972 // Either return a ItT&& or a Try<ItT>&& depending
973 // on the type of the argument of func.
974 return ctx->func_(std::move(v),
975 mt.template get<IsTry::value, Arg&&>());
977 if (++ctx->numThens_ == ctx->numFutures_) {
978 // After reducing the value of the last Future, fulfill the Promise
979 ctx->memo_.setCallback_(
980 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
984 return ctx->promise_.getFuture();
990 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
991 return within(dur, TimedOut(), tk);
996 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
999 Context(E ex) : exception(std::move(ex)), promise() {}
1001 Future<Unit> thisFuture;
1003 std::atomic<bool> token {false};
1006 std::shared_ptr<Timekeeper> tks;
1008 tks = folly::detail::getTimekeeperSingleton();
1009 tk = DCHECK_NOTNULL(tks.get());
1012 auto ctx = std::make_shared<Context>(std::move(e));
1014 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
1015 // TODO: "this" completed first, cancel "after"
1016 if (ctx->token.exchange(true) == false) {
1017 ctx->promise.setTry(std::move(t));
1021 tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
1022 // "after" completed first, cancel "this"
1023 ctx->thisFuture.raise(TimedOut());
1024 if (ctx->token.exchange(true) == false) {
1025 if (t.hasException()) {
1026 ctx->promise.setException(std::move(t.exception()));
1028 ctx->promise.setException(std::move(ctx->exception));
1033 return ctx->promise.getFuture().via(getExecutor());
1039 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
1040 return collectAll(*this, futures::sleep(dur, tk))
1041 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
1042 Try<T>& t = std::get<0>(tup);
1043 return makeFuture<T>(std::move(t));
1050 void waitImpl(Future<T>& f) {
1051 // short-circuit if there's nothing to do
1052 if (f.isReady()) return;
1054 FutureBatonType baton;
1055 f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1057 assert(f.isReady());
1061 void waitImpl(Future<T>& f, Duration dur) {
1062 // short-circuit if there's nothing to do
1068 auto ret = promise.getFuture();
1069 auto baton = std::make_shared<FutureBatonType>();
1070 f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1071 promise.setTry(std::move(t));
1075 if (baton->timed_wait(dur)) {
1076 assert(f.isReady());
1081 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1082 // Set callback so to ensure that the via executor has something on it
1083 // so that once the preceding future triggers this callback, drive will
1084 // always have a callback to satisfy it
1087 f = f.via(e).then([](T&& t) { return std::move(t); });
1088 while (!f.isReady()) {
1091 assert(f.isReady());
1097 Future<T>& Future<T>::wait() & {
1098 detail::waitImpl(*this);
1103 Future<T>&& Future<T>::wait() && {
1104 detail::waitImpl(*this);
1105 return std::move(*this);
1109 Future<T>& Future<T>::wait(Duration dur) & {
1110 detail::waitImpl(*this, dur);
1115 Future<T>&& Future<T>::wait(Duration dur) && {
1116 detail::waitImpl(*this, dur);
1117 return std::move(*this);
1121 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1122 detail::waitViaImpl(*this, e);
1127 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1128 detail::waitViaImpl(*this, e);
1129 return std::move(*this);
1133 T Future<T>::get() {
1134 return std::move(wait().value());
1138 T Future<T>::get(Duration dur) {
1141 return std::move(value());
1148 T Future<T>::getVia(DrivableExecutor* e) {
1149 return std::move(waitVia(e).value());
1155 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1156 return t1.value() == t2.value();
1162 Future<bool> Future<T>::willEqual(Future<T>& f) {
1163 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1164 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1165 return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1174 Future<T> Future<T>::filter(F&& predicate) {
1175 return this->then([p = std::forward<F>(predicate)](T val) {
1176 T const& valConstRef = val;
1177 if (!p(valConstRef)) {
1178 throw PredicateDoesNotObtain();
1185 template <class Callback>
1186 auto Future<T>::thenMulti(Callback&& fn)
1187 -> decltype(this->then(std::forward<Callback>(fn))) {
1188 // thenMulti with one callback is just a then
1189 return then(std::forward<Callback>(fn));
1193 template <class Callback, class... Callbacks>
1194 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1195 -> decltype(this->then(std::forward<Callback>(fn)).
1196 thenMulti(std::forward<Callbacks>(fns)...)) {
1197 // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1198 return then(std::forward<Callback>(fn)).
1199 thenMulti(std::forward<Callbacks>(fns)...);
1203 template <class Callback, class... Callbacks>
1204 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1206 -> decltype(this->then(std::forward<Callback>(fn)).
1207 thenMulti(std::forward<Callbacks>(fns)...)) {
1208 // thenMultiExecutor with two callbacks is
1209 // via(x).then(a).thenMulti(b, ...).via(oldX)
1210 auto oldX = getExecutor();
1212 return then(std::forward<Callback>(fn)).
1213 thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1217 template <class Callback>
1218 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1219 -> decltype(this->then(std::forward<Callback>(fn))) {
1220 // thenMulti with one callback is just a then with an executor
1221 return then(x, std::forward<Callback>(fn));
1225 inline Future<Unit> when(bool p, F&& thunk) {
1226 return p ? std::forward<F>(thunk)().unit() : makeFuture();
1229 template <class P, class F>
1230 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1232 auto future = thunk();
1233 return future.then([
1234 predicate = std::forward<P>(predicate),
1235 thunk = std::forward<F>(thunk)
1237 return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1240 return makeFuture();
1244 Future<Unit> times(const int n, F&& thunk) {
1245 return folly::whileDo(
1246 [ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
1247 return count->fetch_add(1) < n;
1249 std::forward<F>(thunk));
1253 template <class It, class F, class ItT, class Result>
1254 std::vector<Future<Result>> map(It first, It last, F func) {
1255 std::vector<Future<Result>> results;
1256 for (auto it = first; it != last; it++) {
1257 results.push_back(it->then(func));
1267 struct retrying_policy_raw_tag {};
1268 struct retrying_policy_fut_tag {};
1270 template <class Policy>
1271 struct retrying_policy_traits {
1272 using ew = exception_wrapper;
1273 FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1274 template <class Ret>
1275 using has_op = typename std::integral_constant<bool,
1276 has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1277 has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1278 using is_raw = has_op<bool>;
1279 using is_fut = has_op<Future<bool>>;
1280 using tag = typename std::conditional<
1281 is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1282 is_fut::value, retrying_policy_fut_tag, void>::type>::type;
1285 template <class Policy, class FF, class Prom>
1286 void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) {
1287 using F = typename std::result_of<FF(size_t)>::type;
1288 using T = typename F::value_type;
1289 auto f = makeFutureWith([&] { return ff(k++); });
1292 prom = std::move(prom),
1293 pm = std::forward<Policy>(p),
1294 ffm = std::forward<FF>(ff)
1295 ](Try<T> && t) mutable {
1297 prom.setValue(std::move(t).value());
1300 auto& x = t.exception();
1304 prom = std::move(prom),
1307 ffm = std::move(ffm)
1308 ](bool shouldRetry) mutable {
1310 retryingImpl(k, std::move(pm), std::move(ffm), std::move(prom));
1312 prom.setException(std::move(xm));
1318 template <class Policy, class FF>
1319 typename std::result_of<FF(size_t)>::type
1320 retrying(size_t k, Policy&& p, FF&& ff) {
1321 using F = typename std::result_of<FF(size_t)>::type;
1322 using T = typename F::value_type;
1323 auto prom = Promise<T>();
1324 auto f = prom.getFuture();
1326 k, std::forward<Policy>(p), std::forward<FF>(ff), std::move(prom));
1330 template <class Policy, class FF>
1331 typename std::result_of<FF(size_t)>::type
1332 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1333 auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
1334 return makeFuture<bool>(pm(k, x));
1336 return retrying(0, std::move(q), std::forward<FF>(ff));
1339 template <class Policy, class FF>
1340 typename std::result_of<FF(size_t)>::type
1341 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1342 return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1345 // jittered exponential backoff, clamped to [backoff_min, backoff_max]
1346 template <class URNG>
1347 Duration retryingJitteredExponentialBackoffDur(
1349 Duration backoff_min,
1350 Duration backoff_max,
1351 double jitter_param,
1354 auto dist = std::normal_distribution<double>(0.0, jitter_param);
1355 auto jitter = std::exp(dist(rng));
1356 auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1357 return std::max(backoff_min, std::min(backoff_max, backoff));
1360 template <class Policy, class URNG>
1361 std::function<Future<bool>(size_t, const exception_wrapper&)>
1362 retryingPolicyCappedJitteredExponentialBackoff(
1364 Duration backoff_min,
1365 Duration backoff_max,
1366 double jitter_param,
1370 pm = std::forward<Policy>(p),
1375 rngp = std::forward<URNG>(rng)
1376 ](size_t n, const exception_wrapper& ex) mutable {
1377 if (n == max_tries) {
1378 return makeFuture(false);
1380 return pm(n, ex).then(
1381 [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
1384 return makeFuture(false);
1386 auto backoff = detail::retryingJitteredExponentialBackoffDur(
1387 n, backoff_min, backoff_max, jitter_param, rngp);
1388 return futures::sleep(backoff).then([] { return true; });
1393 template <class Policy, class URNG>
1394 std::function<Future<bool>(size_t, const exception_wrapper&)>
1395 retryingPolicyCappedJitteredExponentialBackoff(
1397 Duration backoff_min,
1398 Duration backoff_max,
1399 double jitter_param,
1402 retrying_policy_raw_tag) {
1403 auto q = [pm = std::forward<Policy>(p)](
1404 size_t n, const exception_wrapper& e) {
1405 return makeFuture(pm(n, e));
1407 return retryingPolicyCappedJitteredExponentialBackoff(
1412 std::forward<URNG>(rng),
1416 template <class Policy, class URNG>
1417 std::function<Future<bool>(size_t, const exception_wrapper&)>
1418 retryingPolicyCappedJitteredExponentialBackoff(
1420 Duration backoff_min,
1421 Duration backoff_max,
1422 double jitter_param,
1425 retrying_policy_fut_tag) {
1426 return retryingPolicyCappedJitteredExponentialBackoff(
1431 std::forward<URNG>(rng),
1432 std::forward<Policy>(p));
1436 template <class Policy, class FF>
1437 typename std::result_of<FF(size_t)>::type
1438 retrying(Policy&& p, FF&& ff) {
1439 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1440 return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1444 std::function<bool(size_t, const exception_wrapper&)>
1445 retryingPolicyBasic(
1447 return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1450 template <class Policy, class URNG>
1451 std::function<Future<bool>(size_t, const exception_wrapper&)>
1452 retryingPolicyCappedJitteredExponentialBackoff(
1454 Duration backoff_min,
1455 Duration backoff_max,
1456 double jitter_param,
1459 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1460 return detail::retryingPolicyCappedJitteredExponentialBackoff(
1465 std::forward<URNG>(rng),
1466 std::forward<Policy>(p),
1471 std::function<Future<bool>(size_t, const exception_wrapper&)>
1472 retryingPolicyCappedJitteredExponentialBackoff(
1474 Duration backoff_min,
1475 Duration backoff_max,
1476 double jitter_param) {
1477 auto p = [](size_t, const exception_wrapper&) { return true; };
1478 return retryingPolicyCappedJitteredExponentialBackoff(
1489 // Instantiate the most common Future types to save compile time
1490 extern template class Future<Unit>;
1491 extern template class Future<bool>;
1492 extern template class Future<int>;
1493 extern template class Future<int64_t>;
1494 extern template class Future<std::string>;
1495 extern template class Future<double>;
1497 } // namespace folly