2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include <folly/Optional.h>
25 #include <folly/executors/InlineExecutor.h>
26 #include <folly/futures/Timekeeper.h>
27 #include <folly/futures/detail/Core.h>
28 #include <folly/synchronization/Baton.h>
30 #ifndef FOLLY_FUTURE_USING_FIBER
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/fibers/Baton.h>
// Picks the baton type used by the futures code. When
// FOLLY_FUTURE_USING_FIBER is nonzero the fiber baton is used; the second
// typedef is presumably the #else branch (the directive itself is not visible
// in this excerpt).
45 #if FOLLY_FUTURE_USING_FIBER
46 typedef folly::fibers::Baton FutureBatonType;
48 typedef folly::Baton<> FutureBatonType;
51 } // namespace futures
// Forward declaration only: hands back shared ownership of a Timekeeper.
// The definition is not part of this section.
54 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
59 // Guarantees that the stored functor is destructed before the stored promise
60 // may be fulfilled. Assumes the stored functor to be noexcept-destructible.
//
// Bundles a continuation functor (F) with the Promise<T> it will eventually
// fulfil. "Before the barrier" means the promise has not been fulfilled yet
// (see before_barrier()).
61 template <typename T, typename F>
62 class CoreCallbackState {
// Takes ownership of the promise and stores the functor, perfectly forwarded
// into F. noexcept exactly when constructing F from FF is noexcept.
64 template <typename FF>
65 CoreCallbackState(Promise<T>&& promise, FF&& func) noexcept(
66 noexcept(F(std::declval<FF>())))
67 : func_(std::forward<FF>(func)), promise_(std::move(promise)) {
68 assert(before_barrier());
// Move constructor. State is transferred only while `that` is still before
// the barrier: func_ is placement-constructed from that.func_, then the
// promise is taken out of `that` via stealPromise().
71 CoreCallbackState(CoreCallbackState&& that) noexcept(
72 noexcept(F(std::declval<F>()))) {
73 if (that.before_barrier()) {
74 new (&func_) F(std::move(that.func_));
75 promise_ = that.stealPromise();
// Move assignment is explicitly disabled.
79 CoreCallbackState& operator=(CoreCallbackState&&) = delete;
// Cleanup happens only while still before the barrier.
// NOTE(review): the statement inside the if is not visible in this excerpt.
81 ~CoreCallbackState() {
82 if (before_barrier()) {
// Calls the stored functor as an rvalue with the forwarded arguments.
// Precondition (asserted): the promise has not been fulfilled yet.
87 template <typename... Args>
88 auto invoke(Args&&... args) noexcept(
89 noexcept(std::declval<F&&>()(std::declval<Args&&>()...))) {
90 assert(before_barrier());
91 return std::move(func_)(std::forward<Args>(args)...);
// Same call as invoke(), routed through makeTryWith so the outcome comes back
// as whatever Try type makeTryWith produces.
94 template <typename... Args>
95 auto tryInvoke(Args&&... args) noexcept {
96 return makeTryWith([&] { return invoke(std::forward<Args>(args)...); });
// Fulfil the promise with a Try; the promise is stolen first.
99 void setTry(Try<T>&& t) {
100 stealPromise().setTry(std::move(t));
// Fulfil the promise with an exception; the promise is stolen first.
103 void setException(exception_wrapper&& ew) {
104 stealPromise().setException(std::move(ew));
// Moves the promise out of this object. Asserts we are still before the
// barrier.
// NOTE(review): there is a numbering gap between the assert and the return;
// per the class comment the functor is presumably destroyed there - confirm.
107 Promise<T> stealPromise() noexcept {
108 assert(before_barrier());
110 return std::move(promise_);
// True while promise_ reports that it has not been fulfilled.
114 bool before_barrier() const noexcept {
115 return !promise_.isFulfilled();
// Starts out as Promise<T>::makeEmpty(); the constructors above replace it.
121 Promise<T> promise_{Promise<T>::makeEmpty()};
// Factory for CoreCallbackState: decays F to pick the stored functor type and
// forwards the promise and functor to the constructor. The noexcept
// specification mirrors that constructor call.
124 template <typename T, typename F>
125 inline auto makeCoreCallbackState(Promise<T>&& p, F&& f) noexcept(
126 noexcept(CoreCallbackState<T, _t<std::decay<F>>>(
127 std::declval<Promise<T>&&>(),
128 std::declval<F&&>()))) {
129 return CoreCallbackState<T, _t<std::decay<F>>>(
130 std::move(p), std::forward<F>(f));
// Move-construct from a SemiFuture: take over its core pointer and leave the
// source with a null core.
134 FutureBase<T>::FutureBase(SemiFuture<T>&& other) noexcept : core_(other.core_) {
135 other.core_ = nullptr;
// Same ownership transfer, from a Future.
139 FutureBase<T>::FutureBase(Future<T>&& other) noexcept : core_(other.core_) {
140 other.core_ = nullptr;
// Value constructor: allocates a new Core initialised with a Try<T> built from
// the forwarded value, so the future starts out completed.
144 template <class T2, typename>
145 FutureBase<T>::FutureBase(T2&& val)
146 : core_(new futures::detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
// Enabled only when T2 is Unit (see the enable_if in the parameter type): the
// new Core holds a Try<T> wrapping a value-initialised T.
149 template <typename T2>
150 FutureBase<T>::FutureBase(
151 typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
152 : core_(new futures::detail::Core<T>(Try<T>(T()))) {}
// In-place constructor, constrained on T being constructible from Args&&...;
// the arguments are forwarded to Core's in_place constructor.
157 typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
159 FutureBase<T>::FutureBase(in_place_t, Args&&... args)
161 new futures::detail::Core<T>(in_place, std::forward<Args>(args)...)) {
// Exchanges core pointers with `other` (any future type exposing core_).
// After the call `other` holds whatever core this object had before.
165 template <class FutureType>
166 void FutureBase<T>::assign(FutureType& other) noexcept {
167 std::swap(core_, other.core_);
// Destructor. NOTE(review): the body is not visible in this excerpt.
171 FutureBase<T>::~FutureBase() {
// Result accessors, one per value category of *this. Each reads the value out
// of the core's Try; the rvalue-qualified overloads std::move it.
// NOTE(review): lines between each signature and its return are missing from
// this excerpt (presumably a validity check) - confirm against the full file.
176 T& FutureBase<T>::value() & {
179 return core_->getTry().value();
183 T const& FutureBase<T>::value() const& {
186 return core_->getTry().value();
190 T&& FutureBase<T>::value() && {
193 return std::move(core_->getTry().value());
197 T const&& FutureBase<T>::value() const&& {
200 return std::move(core_->getTry().value());
// Reports whether the core is ready().
204 bool FutureBase<T>::isReady() const {
206 return core_->ready();
// True when the core's Try currently holds a value.
210 bool FutureBase<T>::hasValue() {
211 return core_->getTry().hasValue();
// True when the core's Try currently holds an exception.
215 bool FutureBase<T>::hasException() {
216 return core_->getTry().hasException();
// Notifies the core, via detachFuture(), that this future is letting go of it.
// NOTE(review): any guard around the call is not visible in this excerpt.
220 void FutureBase<T>::detach() {
222 core_->detachFuture();
// Validity check that other members call before touching core_ (see the
// thenImplementation and via definitions). Body not visible in this excerpt.
228 void FutureBase<T>::throwIfInvalid() const {
// Non-blocking probe: when the core is ready, its Try is moved into `o`
// (declared on a line not shown here; presumably the returned Optional).
235 Optional<Try<T>> FutureBase<T>::poll() {
237 if (core_->ready()) {
238 o = std::move(core_->getTry());
// Hands the exception to the core's raise(); the wrapper is taken by value
// and moved on.
244 void FutureBase<T>::raise(exception_wrapper exception) {
245 core_->raise(std::move(exception));
// Installs func as the core's callback, perfectly forwarded.
// NOTE(review): a line between the signature and this call is not visible.
250 void FutureBase<T>::setCallback_(F&& func) {
252 core_->setCallback(std::forward<F>(func));
// Tag constructor selected by futures::detail::EmptyConstruct (used by the
// makeEmpty() factories). The member initialiser is not visible here.
256 FutureBase<T>::FutureBase(futures::detail::EmptyConstruct) noexcept
// Continuation attachment for callbacks that return a plain value.
// Visible steps:
//   1. Statically rejects callbacks taking more than one argument.
//   2. Validates this future (throwIfInvalid).
//   3. The new promise `p` (declared on a line not shown) inherits this
//      future's interrupt handler; the future `f` obtained from it inherits
//      this future's executor.
//   4. The installed callback owns `p` and `func` through a CoreCallbackState.
//      If the callback does not take a Try and the incoming Try holds an
//      exception, the exception is forwarded without invoking func; otherwise
//      func is invoked inside makeTryWith and the outcome fulfils the promise.
// NOTE(review): the terminator of the block comment below is missing from this
// excerpt.
261 // Variant: returns a value
262 // e.g. f.then([](Try<T>&& t){ return t.value(); });
264 template <typename F, typename R, bool isTry, typename... Args>
265 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
266 FutureBase<T>::thenImplementation(
268 futures::detail::argResult<isTry, F, Args...>) {
269 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
270 typedef typename R::ReturnsFuture::Inner B;
272 this->throwIfInvalid();
275 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
277 // grab the Future now before we lose our handle on the Promise
278 auto f = p.getFuture();
279 f.core_->setExecutorNoLock(this->getExecutor());
281 /* This is a bit tricky.
283 We can't just close over *this in case this Future gets moved. So we
284 make a new dummy Future. We could figure out something more
285 sophisticated that avoids making a new Future object when it can, as an
286 optimization. But this is correct.
288 core_ can't be moved, it is explicitly disallowed (as is copying). But
289 if there's ever a reason to allow it, this is one place that makes that
290 assumption and would need to be fixed. We use a standard shared pointer
291 for core_ (by copying it in), which means in essence obj holds a shared
292 pointer to itself. But this shouldn't leak because Promise will not
293 outlive the continuation, because Promise will setException() with a
294 broken Promise if it is destructed before completed. We could use a
295 weak pointer but it would have to be converted to a shared pointer when
296 func is executed (because the Future returned by func may possibly
297 persist beyond the callback, if it gets moved), and so it is an
298 optimization to just make it shared from the get-go.
300 Two subtle but important points about this design. futures::detail::Core
301 has no back pointers to Future or Promise, so if Future or Promise get
302 moved (and they will be moved in performant code) we don't have to do
303 anything fancy. And because we store the continuation in the
304 futures::detail::Core, not in the Future, we can execute the continuation
305 even after the Future has gone out of scope. This is an intentional design
306 decision. It is likely we will want to be able to cancel a continuation
307 in some circumstances, but I think it should be explicit not implicit
308 in the destruction of the Future used to create it.
311 [state = futures::detail::makeCoreCallbackState(
312 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
314 if (!isTry && t.hasException()) {
315 state.setException(std::move(t.exception()));
317 state.setTry(makeTryWith(
318 [&] { return state.invoke(t.template get<isTry, Args>()...); }));
// Continuation attachment for callbacks that themselves return a Future.
// Setup matches the value-returning variant: argument-count static_assert,
// validity check, interrupt handler copied onto the new promise `p` (declared
// on a line not shown), executor copied onto its future `f`.
// Inside the installed callback:
//   - an incoming exception is forwarded without calling func when the
//     callback does not take a Try;
//   - otherwise func runs through tryInvoke; if that produced an exception it
//     is forwarded;
//   - otherwise the promise is stolen from `state` and moved into a callback
//     on the returned inner future, which completes it with that result.
324 // Variant: returns a Future
325 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
327 template <typename F, typename R, bool isTry, typename... Args>
328 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
329 FutureBase<T>::thenImplementation(
331 futures::detail::argResult<isTry, F, Args...>) {
332 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
333 typedef typename R::ReturnsFuture::Inner B;
334 this->throwIfInvalid();
337 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
339 // grab the Future now before we lose our handle on the Promise
340 auto f = p.getFuture();
341 f.core_->setExecutorNoLock(this->getExecutor());
344 [state = futures::detail::makeCoreCallbackState(
345 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
346 if (!isTry && t.hasException()) {
347 state.setException(std::move(t.exception()));
349 auto tf2 = state.tryInvoke(t.template get<isTry, Args>()...);
350 if (tf2.hasException()) {
351 state.setException(std::move(tf2.exception()));
353 tf2->setCallback_([p = state.stealPromise()](Try<B> && b) mutable {
354 p.setTry(std::move(b));
362 } // namespace detail
363 } // namespace futures
// Wraps a value in a Try of its decayed type and delegates to the Try
// overload of makeSemiFuture.
366 SemiFuture<typename std::decay<T>::type> makeSemiFuture(T&& t) {
367 return makeSemiFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
// Overload selected when func() itself yields a SemiFuture: the call result is
// returned directly. An exception derived from std::exception is turned into a
// failed SemiFuture<InnerType> whose exception_wrapper keeps both the
// exception_ptr and the typed reference. The final return builds the wrapper
// from the exception_ptr alone - presumably inside a catch-all handler whose
// line is not visible here.
370 // makeSemiFutureWith(SemiFuture<T>()) -> SemiFuture<T>
372 typename std::enable_if<
373 isSemiFuture<typename std::result_of<F()>::type>::value,
374 typename std::result_of<F()>::type>::type
375 makeSemiFutureWith(F&& func) {
377 typename isSemiFuture<typename std::result_of<F()>::type>::Inner;
379 return std::forward<F>(func)();
380 } catch (std::exception& e) {
381 return makeSemiFuture<InnerType>(
382 exception_wrapper(std::current_exception(), e));
384 return makeSemiFuture<InnerType>(
385 exception_wrapper(std::current_exception()));
// Overload selected when func() does not yield a SemiFuture. The result type
// is passed through Unit::LiftT, and the call is made inside makeTryWith so
// its outcome is delivered as a Try to makeSemiFuture.
389 // makeSemiFutureWith(T()) -> SemiFuture<T>
390 // makeSemiFutureWith(void()) -> SemiFuture<Unit>
392 typename std::enable_if<
393 !(isSemiFuture<typename std::result_of<F()>::type>::value),
394 SemiFuture<Unit::LiftT<typename std::result_of<F()>::type>>>::type
395 makeSemiFutureWith(F&& func) {
396 using LiftedResult = Unit::LiftT<typename std::result_of<F()>::type>;
397 return makeSemiFuture<LiftedResult>(
398 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
// Failed SemiFuture from an exception_ptr: wrapped in a Try<T> and forwarded.
402 SemiFuture<T> makeSemiFuture(std::exception_ptr const& e) {
403 return makeSemiFuture(Try<T>(e));
// Failed SemiFuture from an exception_wrapper (taken by value, then moved).
407 SemiFuture<T> makeSemiFuture(exception_wrapper ew) {
408 return makeSemiFuture(Try<T>(std::move(ew)));
// Failed SemiFuture from a concrete exception object; only participates for
// types derived from std::exception. The object is copied into an
// exception_wrapper via make_exception_wrapper<E>.
411 template <class T, class E>
413 enable_if<std::is_base_of<std::exception, E>::value, SemiFuture<T>>::type
414 makeSemiFuture(E const& e) {
415 return makeSemiFuture(Try<T>(make_exception_wrapper<E>(e)));
// Base overload all the others funnel into: allocates a Core holding the Try
// and wraps it in a SemiFuture.
419 SemiFuture<T> makeSemiFuture(Try<T>&& t) {
420 return SemiFuture<T>(new futures::detail::Core<T>(std::move(t)));
// Zero-argument form: a SemiFuture<Unit> built from a default Unit value.
423 // This must be defined after the constructors to avoid a bug in MSVC
424 // https://connect.microsoft.com/VisualStudio/feedback/details/3142777/out-of-line-constructor-definition-after-implicit-reference-causes-incorrect-c2244
425 inline SemiFuture<Unit> makeSemiFuture() {
426 return makeSemiFuture(Unit{});
// Builds a SemiFuture through the EmptyConstruct tag constructor.
430 SemiFuture<T> SemiFuture<T>::makeEmpty() {
431 return SemiFuture<T>(futures::detail::EmptyConstruct{});
// Move constructor: defers entirely to the FutureBase move constructor.
435 SemiFuture<T>::SemiFuture(SemiFuture<T>&& other) noexcept
436 : futures::detail::FutureBase<T>(std::move(other)) {}
// Construct from a Future: takes over its state through FutureBase, then
// clears the executor.
// NOTE(review): a line before the setExecutor call is missing from this
// excerpt (possibly a guard) - confirm.
439 SemiFuture<T>::SemiFuture(Future<T>&& other) noexcept
440 : futures::detail::FutureBase<T>(std::move(other)) {
441 // SemiFuture should not have an executor on construction
443 this->setExecutor(nullptr);
// Move assignment from another SemiFuture. Body not visible in this excerpt.
448 SemiFuture<T>& SemiFuture<T>::operator=(SemiFuture<T>&& other) noexcept {
// Move assignment from a Future. The visible part clears the executor
// afterwards, matching the Future-taking constructor; the remaining statements
// are not visible here.
454 SemiFuture<T>& SemiFuture<T>::operator=(Future<T>&& other) noexcept {
456 // SemiFuture should not have an executor on construction
458 this->setExecutor(nullptr);
// If an executor is attached, treats it as a DeferredExecutor (checked with
// DCHECK + dynamic_cast in debug builds), obtains a keep-alive token from it
// and then calls boost() on it. Does nothing when there is no executor.
464 void SemiFuture<T>::boost_() {
465 // If a SemiFuture has an executor it should be deferred, so boost it
466 if (auto e = this->getExecutor()) {
467 // We know in a SemiFuture that if we have an executor it should be
468 // DeferredExecutor. Verify this in debug mode.
469 DCHECK(nullptr != dynamic_cast<DeferredExecutor*>(e));
471 auto ka = static_cast<DeferredExecutor*>(e)->getKeepAliveToken();
472 static_cast<DeferredExecutor*>(e)->boost();
// Converts this SemiFuture (rvalue) into a Future bound to `executor`.
// When an executor is already attached and a different, non-null one is
// requested, a task is queued on the new executor that boosts the old
// (deferred) executor; the task holds a keep-alive token for the old one.
// Afterwards the executor/priority are set, a Future is built from the same
// core, and this object's core pointer is cleared.
// NOTE(review): lines right after the signature are missing from this excerpt.
477 inline Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) && {
483 // If current executor is deferred, boost block to ensure that work
484 // progresses and is run on the new executor.
485 auto oldExecutor = this->getExecutor();
486 if (oldExecutor && executor && (executor != oldExecutor)) {
487 // We know in a SemiFuture that if we have an executor it should be
488 // DeferredExecutor. Verify this in debug mode.
489 DCHECK(nullptr != dynamic_cast<DeferredExecutor*>(this->getExecutor()));
// NOTE(review): oldExecutor is already known to be non-null here, so this
// static_cast test cannot be false; a dynamic_cast may have been intended.
490 if (static_cast<DeferredExecutor*>(oldExecutor)) {
491 executor->add([oldExecutorKA = oldExecutor->getKeepAliveToken()]() {
492 static_cast<DeferredExecutor*>(oldExecutorKA.get())->boost();
497 this->setExecutor(executor, priority);
499 auto newFuture = Future<T>(this->core_);
500 this->core_ = nullptr;
// Attaches deferred work to this SemiFuture (rvalue) and returns a new
// SemiFuture of the callback's result type. A keep-alive token is taken from
// the existing executor, or a fresh DeferredExecutor is created when there is
// none; func is wrapped together with that token via DeferredExecutor::wrap.
// NOTE(review): the statements that chain via/then and convert back are only
// partly visible in this excerpt.
505 template <typename F>
506 SemiFuture<typename futures::detail::callableResult<T, F>::Return::value_type>
507 SemiFuture<T>::defer(F&& func) && {
508 // If we already have a deferred executor, use it, otherwise create one
509 auto defKeepAlive = this->getExecutor()
510 ? this->getExecutor()->getKeepAliveToken()
511 : DeferredExecutor::create();
512 auto e = defKeepAlive.get();
513 // We know in a SemiFuture that if we have an executor it should be
514 // DeferredExecutor (either it was that way before, or we just created it).
515 // Verify this in debug mode.
516 DCHECK(nullptr != dynamic_cast<DeferredExecutor*>(e));
517 // Convert to a folly::future with a deferred executor
518 // Will be low-cost if this is not a new executor as via optimises for that
523 // Then add the work, with a wrapper function that captures the
524 // keepAlive so the executor is destroyed at the right time.
526 DeferredExecutor::wrap(std::move(defKeepAlive), std::move(func)))
527 // Finally, convert back to a folly::SemiFuture to hide the executor
529 // Carry deferred executor through chain as constructor from Future will
// Builds a Future through the EmptyConstruct tag constructor.
536 Future<T> Future<T>::makeEmpty() {
537 return Future<T>(futures::detail::EmptyConstruct{});
// Move constructor: defers entirely to the FutureBase move constructor.
541 Future<T>::Future(Future<T>&& other) noexcept
542 : futures::detail::FutureBase<T>(std::move(other)) {}
// Move assignment. Body not visible in this excerpt.
545 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
// Converting constructor from Future<T2>, for T2 different from T where T is
// constructible from T2&& AND T2&& is implicitly convertible to T. Chains a
// continuation that builds a T from the moved T2 value.
553 typename std::enable_if<
554 !std::is_same<T, typename std::decay<T2>::type>::value &&
555 std::is_constructible<T, T2&&>::value &&
556 std::is_convertible<T2&&, T>::value,
558 Future<T>::Future(Future<T2>&& other)
559 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
// Same body, but selected when T2&& is NOT implicitly convertible to T
// (presumably the explicit counterpart; the declaration is outside this
// excerpt).
564 typename std::enable_if<
565 !std::is_same<T, typename std::decay<T2>::type>::value &&
566 std::is_constructible<T, T2&&>::value &&
567 !std::is_convertible<T2&&, T>::value,
569 Future<T>::Future(Future<T2>&& other)
570 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
// Converting assignment from Future<T2> (T2 different from T, T constructible
// from T2&&). Uses the same value-converting continuation as the converting
// constructors; the surrounding statement is only partly visible here.
575 typename std::enable_if<
576 !std::is_same<T, typename std::decay<T2>::type>::value &&
577 std::is_constructible<T, T2&&>::value,
579 Future<T>& Future<T>::operator=(Future<T2>&& other) {
581 std::move(other).then([](T2&& v) { return T(std::move(v)); }));
// Flattens a Future whose value is itself a Future: attaches a continuation
// that simply returns the inner future, so then() yields a
// Future<isFuture<T>::Inner>.
589 enable_if<isFuture<F>::value, Future<typename isFuture<T>::Inner>>::type
590 Future<T>::unwrap() {
591 return then([](Future<typename isFuture<T>::Inner> internal_future) {
592 return internal_future;
// Rvalue overload: validates this future, sets the executor and priority on
// the existing core, then hands the core to a new Future and clears this
// object's core pointer.
597 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
598 this->throwIfInvalid();
600 this->setExecutor(executor, priority);
602 auto newFuture = Future<T>(this->core_);
603 this->core_ = nullptr;
// Lvalue overload: this object keeps its core. A promise `p` (declared on a
// line not shown) is fulfilled by a continuation attached through
// thenImplementation that forwards the incoming Try unchanged; the promise's
// future is then moved onto `executor` via the rvalue overload.
608 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
609 this->throwIfInvalid();
611 auto f = p.getFuture();
612 auto func = [p = std::move(p)](Try<T>&& t) mutable {
613 p.setTry(std::move(t));
615 using R = futures::detail::callableResult<T, decltype(func)>;
616 this->template thenImplementation<decltype(func), R>(
617 std::move(func), typename R::Arg());
618 return std::move(f).via(executor, priority);
// then() for a pointer-to-member-function plus object pointer. The lambda
// copies both pointers and calls (instance->*func) with the value or Try
// extracted from the result, depending on whether FirstArg is a Try.
// The raw `instance` pointer is captured as-is, so the pointee must still be
// alive when the continuation runs.
621 template <typename T>
622 template <typename R, typename Caller, typename... Args>
623 Future<typename isFuture<R>::Inner>
624 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
625 typedef typename std::remove_cv<typename std::remove_reference<
626 typename futures::detail::ArgType<Args...>::FirstArg>::type>::type
629 return then([instance, func](Try<T>&& t){
630 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
// Argument-less then(): chains an empty callback, yielding a Future<Unit>.
635 Future<Unit> Future<T>::then() {
636 return then([] () {});
// Typed error handler whose callback returns T (selected when F is not
// callable with exception_wrapper and does not return a Future).
// Exn is derived from the callback's first argument type. In the installed
// callback: if the Try holds an exception object of type Exn, func is invoked
// on it inside makeTryWith and that outcome fulfils the promise; otherwise the
// original Try is passed through untouched.
// The new promise `p` (declared on a line not shown) inherits this future's
// interrupt handler.
639 // onError where the callback returns T
642 typename std::enable_if<
643 !futures::detail::callableWith<F, exception_wrapper>::value &&
644 !futures::detail::callableWith<F, exception_wrapper&>::value &&
645 !futures::detail::Extract<F>::ReturnsFuture::value,
647 Future<T>::onError(F&& func) {
648 typedef std::remove_reference_t<
649 typename futures::detail::Extract<F>::FirstArg>
652 std::is_same<typename futures::detail::Extract<F>::RawReturn, T>::value,
653 "Return type of onError callback must be T or Future<T>");
656 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
657 auto f = p.getFuture();
660 [state = futures::detail::makeCoreCallbackState(
661 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
662 if (auto e = t.template tryGetExceptionObject<Exn>()) {
663 state.setTry(makeTryWith([&] { return state.invoke(*e); }));
665 state.setTry(std::move(t));
// Typed error handler whose callback returns Future<T> (selected when F is not
// callable with exception_wrapper and does return a Future).
// In the installed callback: when the Try holds an exception of type Exn, func
// is run through tryInvoke. A throw from func is forwarded as the result;
// otherwise the promise is stolen and completed from the returned future's
// callback. A Try that does not match is passed through untouched.
672 // onError where the callback returns Future<T>
675 typename std::enable_if<
676 !futures::detail::callableWith<F, exception_wrapper>::value &&
677 !futures::detail::callableWith<F, exception_wrapper&>::value &&
678 futures::detail::Extract<F>::ReturnsFuture::value,
680 Future<T>::onError(F&& func) {
682 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
684 "Return type of onError callback must be T or Future<T>");
685 typedef std::remove_reference_t<
686 typename futures::detail::Extract<F>::FirstArg>
690 auto f = p.getFuture();
693 [state = futures::detail::makeCoreCallbackState(
694 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
695 if (auto e = t.template tryGetExceptionObject<Exn>()) {
696 auto tf2 = state.tryInvoke(*e);
697 if (tf2.hasException()) {
698 state.setException(std::move(tf2.exception()));
700 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
701 p.setTry(std::move(t3));
705 state.setTry(std::move(t));
// Chains a continuation that owns a forwarded copy of func (funcw) and then
// re-emits the original Try unchanged through makeFuture.
// NOTE(review): the line that calls funcw is not visible in this excerpt.
714 Future<T> Future<T>::ensure(F&& func) {
715 return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
717 return makeFuture(std::move(t));
// Applies within(dur, tk) and installs an onError handler for TimedOut that
// ignores the exception object and returns the result of calling func.
723 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
724 return within(dur, tk).onError([funcw = std::forward<F>(func)](
725 TimedOut const&) { return std::move(funcw)(); });
// Catch-all error handler taking exception_wrapper and returning Future<T>.
// In the installed callback (which takes the Try by value): on any exception,
// func is run through tryInvoke with the moved exception_wrapper. A throw from
// func is forwarded; otherwise the promise is stolen and completed from the
// returned future's callback. A successful Try is passed through untouched.
730 typename std::enable_if<
731 futures::detail::callableWith<F, exception_wrapper>::value &&
732 futures::detail::Extract<F>::ReturnsFuture::value,
734 Future<T>::onError(F&& func) {
736 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
738 "Return type of onError callback must be T or Future<T>");
741 auto f = p.getFuture();
743 [state = futures::detail::makeCoreCallbackState(
744 std::move(p), std::forward<F>(func))](Try<T> t) mutable {
745 if (t.hasException()) {
746 auto tf2 = state.tryInvoke(std::move(t.exception()));
747 if (tf2.hasException()) {
748 state.setException(std::move(tf2.exception()));
750 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
751 p.setTry(std::move(t3));
755 state.setTry(std::move(t));
// Catch-all error handler taking exception_wrapper and returning T.
// In the installed callback: on any exception, func is invoked with the moved
// exception_wrapper inside makeTryWith and that outcome fulfils the promise; a
// successful Try is passed through untouched.
// NOTE(review): the static_assert compares Extract<F>::Return with Future<T>
// even though this overload is for non-Future callbacks; presumably
// Extract::Return is already the future-wrapped type - confirm.
762 // onError(exception_wrapper) that returns T
765 typename std::enable_if<
766 futures::detail::callableWith<F, exception_wrapper>::value &&
767 !futures::detail::Extract<F>::ReturnsFuture::value,
769 Future<T>::onError(F&& func) {
771 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
773 "Return type of onError callback must be T or Future<T>");
776 auto f = p.getFuture();
778 [state = futures::detail::makeCoreCallbackState(
779 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
780 if (t.hasException()) {
781 state.setTry(makeTryWith(
782 [&] { return state.invoke(std::move(t.exception())); }));
784 state.setTry(std::move(t));
// Free function: starts a chain on executor x and immediately attaches func,
// i.e. via(x).then(func). The return type is the Future form of func's result.
791 template <class Func>
792 auto via(Executor* x, Func&& func)
793 -> Future<typename isFuture<decltype(std::declval<Func>()())>::Inner> {
794 // TODO make this actually more performant. :-P #7260175
795 return via(x).then(std::forward<Func>(func));
// Wraps a value in a Try of its decayed type and delegates to the Try
// overload of makeFuture.
801 Future<typename std::decay<T>::type> makeFuture(T&& t) {
802 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
// Zero-argument form: a Future<Unit> built from a default Unit value.
805 inline Future<Unit> makeFuture() {
806 return makeFuture(Unit{});
// Overload selected when func() itself yields a Future: the call result is
// returned directly. An exception derived from std::exception becomes a failed
// Future<InnerType> whose exception_wrapper keeps both the exception_ptr and
// the typed reference. The final return builds the wrapper from the
// exception_ptr alone - presumably inside a catch-all handler whose line is
// not visible here.
809 // makeFutureWith(Future<T>()) -> Future<T>
811 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
812 typename std::result_of<F()>::type>::type
813 makeFutureWith(F&& func) {
815 typename isFuture<typename std::result_of<F()>::type>::Inner;
817 return std::forward<F>(func)();
818 } catch (std::exception& e) {
819 return makeFuture<InnerType>(
820 exception_wrapper(std::current_exception(), e));
822 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
// Overload selected when func() does not yield a Future. The result type is
// passed through Unit::LiftT, and the call is made inside makeTryWith so its
// outcome is delivered as a Try to makeFuture.
826 // makeFutureWith(T()) -> Future<T>
827 // makeFutureWith(void()) -> Future<Unit>
829 typename std::enable_if<
830 !(isFuture<typename std::result_of<F()>::type>::value),
831 Future<Unit::LiftT<typename std::result_of<F()>::type>>>::type
832 makeFutureWith(F&& func) {
833 using LiftedResult = Unit::LiftT<typename std::result_of<F()>::type>;
834 return makeFuture<LiftedResult>(
835 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
// Failed Future from an exception_ptr: wrapped in a Try<T> and forwarded.
839 Future<T> makeFuture(std::exception_ptr const& e) {
840 return makeFuture(Try<T>(e));
// Failed Future from an exception_wrapper (taken by value, then moved).
844 Future<T> makeFuture(exception_wrapper ew) {
845 return makeFuture(Try<T>(std::move(ew)));
// Failed Future from a concrete exception object; only participates for types
// derived from std::exception. The object is copied into an exception_wrapper
// via make_exception_wrapper<E>.
848 template <class T, class E>
849 typename std::enable_if<std::is_base_of<std::exception, E>::value,
851 makeFuture(E const& e) {
852 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
// Base overload all the others funnel into: allocates a Core holding the Try
// and wraps it in a Future.
856 Future<T> makeFuture(Try<T>&& t) {
857 return Future<T>(new futures::detail::Core<T>(std::move(t)));
// Free function: a completed Future<Unit> moved onto `executor` with the given
// priority, as a starting point for a chain.
861 Future<Unit> via(Executor* executor, int8_t priority) {
862 return makeFuture().via(executor, priority);
// Walks [first, last) and installs a callback on each future. Every callback
// holds its own copy of func plus the element's zero-based position i, and
// forwards the completed Try as func(i, Try<T>&&).
865 // mapSetCallback calls func(i, Try<T>) when every future completes
867 template <class T, class InputIterator, class F>
868 void mapSetCallback(InputIterator first, InputIterator last, F func) {
869 for (size_t i = 0; first != last; ++first, ++i) {
870 first->setCallback_([func, i](Try<T>&& t) {
871 func(i, std::move(t));
// Variadic form: creates a shared CollectAllVariadicContext over the value
// types of the given futures, lets collectVariadicHelper wire each future to
// it, and returns the future of the context's promise.
876 // collectAll (variadic)
878 template <typename... Fs>
879 typename futures::detail::CollectAllVariadicContext<
880 typename std::decay<Fs>::type::value_type...>::type
881 collectAll(Fs&&... fs) {
882 auto ctx = std::make_shared<futures::detail::CollectAllVariadicContext<
883 typename std::decay<Fs>::type::value_type...>>();
884 futures::detail::collectVariadicHelper<
885 futures::detail::CollectAllVariadicContext>(ctx, std::forward<Fs>(fs)...);
886 return ctx->p.getFuture();
// Iterator form: gathers every input's Try, in input order, into a vector.
// The shared context pre-sizes `results` to the range length; each callback
// stores its Try at its own index. The promise is fulfilled from the context's
// destructor, i.e. once the last shared_ptr copy held by the callbacks (and
// the local one) is released.
889 // collectAll (iterator)
891 template <class InputIterator>
894 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
895 collectAll(InputIterator first, InputIterator last) {
897 typename std::iterator_traits<InputIterator>::value_type::value_type T;
899 struct CollectAllContext {
900 CollectAllContext(size_t n) : results(n) {}
901 ~CollectAllContext() {
902 p.setValue(std::move(results));
904 Promise<std::vector<Try<T>>> p;
905 std::vector<Try<T>> results;
909 std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
910 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
911 ctx->results[i] = std::move(t);
913 return ctx->p.getFuture();
916 // collect (iterator)
// Shared state for collect (iterator form). Result and InternalResult switch
// on whether T is void; for non-void T they are std::vector<T> and
// std::vector<Optional<T>> respectively (the void alternative is on lines not
// shown - presumably the Nothing helper).
// `threw` is an atomic flag claimed with exchange(true) by whoever completes
// the promise first.
921 template <typename T>
922 struct CollectContext {
924 explicit Nothing(int /* n */) {}
927 using Result = typename std::conditional<
928 std::is_void<T>::value,
930 std::vector<T>>::type;
932 using InternalResult = typename std::conditional<
933 std::is_void<T>::value,
935 std::vector<Optional<T>>>::type;
// Pre-sizes the per-input slots.
937 explicit CollectContext(size_t n) : result(n) {}
// If nobody has claimed `threw` yet, unwraps every Optional<T> into a plain
// vector<T> and fulfils the promise with it.
// NOTE(review): the enclosing function header is not visible; presumably this
// is the destructor.
939 if (!threw.exchange(true)) {
940 // map Optional<T> -> T
941 std::vector<T> finalResult;
942 finalResult.reserve(result.size());
943 std::transform(result.begin(), result.end(),
944 std::back_inserter(finalResult),
945 [](Optional<T>& o) { return std::move(o.value()); });
946 p.setValue(std::move(finalResult));
// Stores input i's value by moving it out of the Try.
949 inline void setPartialResult(size_t i, Try<T>& t) {
950 result[i] = std::move(t.value());
953 InternalResult result;
954 std::atomic<bool> threw {false};
957 } // namespace detail
958 } // namespace futures
// Iterator form of collect. A shared CollectContext is sized to the range.
// Per completed input: an exception fulfils the promise with that exception
// only for the first callback to flip `threw`; a value is recorded via
// setPartialResult as long as `threw` is still false.
960 template <class InputIterator>
961 Future<typename futures::detail::CollectContext<typename std::iterator_traits<
962 InputIterator>::value_type::value_type>::Result>
963 collect(InputIterator first, InputIterator last) {
965 typename std::iterator_traits<InputIterator>::value_type::value_type T;
967 auto ctx = std::make_shared<futures::detail::CollectContext<T>>(
968 std::distance(first, last));
969 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
970 if (t.hasException()) {
971 if (!ctx->threw.exchange(true)) {
972 ctx->p.setException(std::move(t.exception()));
974 } else if (!ctx->threw) {
975 ctx->setPartialResult(i, t);
978 return ctx->p.getFuture();
// Variadic form: creates a shared CollectVariadicContext over the value types
// of the given futures, lets collectVariadicHelper wire each future to it, and
// returns the future of the context's promise.
981 // collect (variadic)
983 template <typename... Fs>
984 typename futures::detail::CollectVariadicContext<
985 typename std::decay<Fs>::type::value_type...>::type
986 collect(Fs&&... fs) {
987 auto ctx = std::make_shared<futures::detail::CollectVariadicContext<
988 typename std::decay<Fs>::type::value_type...>>();
989 futures::detail::collectVariadicHelper<
990 futures::detail::CollectVariadicContext>(ctx, std::forward<Fs>(fs)...);
991 return ctx->p.getFuture();
// Completes with (index, Try) of whichever input finishes first, success or
// failure. The atomic `done` flag is claimed with exchange(true), so only the
// first callback fulfils the promise; later completions are ignored.
994 // collectAny (iterator)
996 template <class InputIterator>
1001 std::iterator_traits<InputIterator>::value_type::value_type>>>
1002 collectAny(InputIterator first, InputIterator last) {
1004 typename std::iterator_traits<InputIterator>::value_type::value_type T;
1006 struct CollectAnyContext {
1007 CollectAnyContext() {}
1008 Promise<std::pair<size_t, Try<T>>> p;
1009 std::atomic<bool> done {false};
1012 auto ctx = std::make_shared<CollectAnyContext>();
1013 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
1014 if (!ctx->done.exchange(true)) {
1015 ctx->p.setValue(std::make_pair(i, std::move(t)));
// Completes with (index, value) of the first input to finish successfully.
// Per completed input:
//   - a value whose callback is first to claim `done` fulfils the promise;
//   - every other completion increments nFulfilled, and the one that brings it
//     to nTotal sets the promise's exception from its own Try.
// The claiming callback does not increment nFulfilled, so the exception branch
// can only fire when no input claimed `done`.
// nTotal (declared on a line not shown) is set to the range length.
1021 // collectAnyWithoutException (iterator)
1023 template <class InputIterator>
1026 typename std::iterator_traits<InputIterator>::value_type::value_type>>
1027 collectAnyWithoutException(InputIterator first, InputIterator last) {
1029 typename std::iterator_traits<InputIterator>::value_type::value_type T;
1031 struct CollectAnyWithoutExceptionContext {
1032 CollectAnyWithoutExceptionContext(){}
1033 Promise<std::pair<size_t, T>> p;
1034 std::atomic<bool> done{false};
1035 std::atomic<size_t> nFulfilled{0};
1039 auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
1040 ctx->nTotal = size_t(std::distance(first, last));
1042 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
1043 if (!t.hasException() && !ctx->done.exchange(true)) {
1044 ctx->p.setValue(std::make_pair(i, std::move(t.value())));
1045 } else if (++ctx->nFulfilled == ctx->nTotal) {
1046 ctx->p.setException(t.exception());
1049 return ctx->p.getFuture();
// Completes once n of the inputs have completed, yielding (index, Try) pairs
// in the order they were appended. If the range holds fewer than n futures the
// promise is failed with std::runtime_error("Not enough futures").
// Each callback bumps the atomic `completed` counter and appends its result to
// the shared vector; the promise is fulfilled with that vector.
// NOTE(review): the conditions around the append and the setTry are on lines
// not visible here, and no lock around `v` is visible either - if callbacks
// can run concurrently, confirm the emplace_back is safe.
1052 // collectN (iterator)
1054 template <class InputIterator>
1055 Future<std::vector<std::pair<size_t, Try<typename
1056 std::iterator_traits<InputIterator>::value_type::value_type>>>>
1057 collectN(InputIterator first, InputIterator last, size_t n) {
1059 std::iterator_traits<InputIterator>::value_type::value_type T;
1060 typedef std::vector<std::pair<size_t, Try<T>>> V;
1062 struct CollectNContext {
1064 std::atomic<size_t> completed = {0};
1067 auto ctx = std::make_shared<CollectNContext>();
1069 if (size_t(std::distance(first, last)) < n) {
1070 ctx->p.setException(std::runtime_error("Not enough futures"));
1072 // for each completed Future, increase count and add to vector, until we
1073 // have n completed futures at which point we fulfil our Promise with the
1075 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
1076 auto c = ++ctx->completed;
1078 assert(ctx->v.size() < n);
1079 ctx->v.emplace_back(i, std::move(t));
1081 ctx->p.setTry(Try<V>(std::move(ctx->v)));
1087 return ctx->p.getFuture();
// Ordered reduction over a range of futures.
// An empty range yields a completed future holding `initial`. Otherwise func
// is moved into a shared_ptr so every continuation can share it; the first
// input is combined with `initial`, and each following input is paired with
// the running result via collectAll and combined by calling (*sfunc).
// IsTry decides whether func receives the element as a value or as a Try
// (the Arg typedef is only partly visible in this excerpt).
1090 // reduce (iterator)
1092 template <class It, class T, class F>
1093 Future<T> reduce(It first, It last, T&& initial, F&& func) {
1094 if (first == last) {
1095 return makeFuture(std::move(initial));
1098 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
1099 typedef typename std::conditional<
1100 futures::detail::callableWith<F, T&&, Try<ItT>&&>::value,
1103 typedef isTry<Arg> IsTry;
1105 auto sfunc = std::make_shared<F>(std::move(func));
1107 auto f = first->then(
1108 [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
1110 std::move(minitial), head.template get<IsTry::value, Arg&&>());
1113 for (++first; first != last; ++first) {
1114 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
1115 return (*sfunc)(std::move(std::get<0>(t).value()),
1116 // Either return a ItT&& or a Try<ItT>&& depending
1117 // on the type of the argument of func.
1118 std::get<1>(t).template get<IsTry::value, Arg&&>());
// Convenience overload: runs the windowed computation on the global
// InlineExecutor instance by delegating to the executor-taking overload.
1125 // window (collection)
1127 template <class Collection, class F, class ItT, class Result>
1128 std::vector<Future<Result>>
1129 window(Collection input, F func, size_t n) {
1130 // Use global inline executor singleton
1131 auto executor = &InlineExecutor::instance();
1132 return window(executor, std::move(input), std::move(func), n);
// Applies func to every element of `input`, starting at most n chains up
// front, and returns one future per input element (same order as the promises
// vector).
// WindowContext owns the executor pointer, the input, func, one promise per
// input, and an atomic cursor `i`. spawn() claims the next index; if one is
// left it calls func on that element and, when the resulting future
// completes, queues a task on the executor that fulfils promise i and then
// calls spawn() again. The initial loop queues min(n, input.size()) spawn
// tasks.
1135 template <class Collection, class F, class ItT, class Result>
1136 std::vector<Future<Result>>
1137 window(Executor* executor, Collection input, F func, size_t n) {
1138 struct WindowContext {
1139 WindowContext(Executor* executor_, Collection&& input_, F&& func_)
1140 : executor(executor_),
1141 input(std::move(input_)),
1142 promises(input.size()),
1143 func(std::move(func_)) {}
1144 std::atomic<size_t> i{0};
1147 std::vector<Promise<Result>> promises;
1150 static inline void spawn(std::shared_ptr<WindowContext> ctx) {
1151 size_t i = ctx->i++;
1152 if (i < ctx->input.size()) {
1153 auto fut = ctx->func(std::move(ctx->input[i]));
1154 fut.setCallback_([ctx = std::move(ctx), i](Try<Result>&& t) mutable {
1155 const auto executor_ = ctx->executor;
1156 executor_->add([ctx = std::move(ctx), i, t = std::move(t)]() mutable {
1157 ctx->promises[i].setTry(std::move(t));
1158 // Chain another future onto this one
1159 spawn(std::move(ctx));
// Computed before `input` is moved into the context below.
1166 auto max = std::min(n, input.size());
1168 auto ctx = std::make_shared<WindowContext>(
1169 executor, std::move(input), std::move(func));
1171 // Start the first n Futures
1172 for (size_t i = 0; i < max; ++i) {
1173 executor->add([ctx]() mutable { WindowContext::spawn(std::move(ctx)); });
1176 std::vector<Future<Result>> futures;
1177 futures.reserve(ctx->promises.size());
1178 for (auto& promise : ctx->promises) {
1179 futures.emplace_back(promise.getFuture());
// Member reduce for a Future whose value is iterable: once the value is
// available, folds its elements left to right with
//   ret = mfunc(std::move(ret), std::move(val))
// starting from the forwarded `initial`. Both the accumulator and each
// element are moved into the call.
// NOTE(review): the statement that attaches this lambda and the lambda's
// return are not visible in this excerpt.
1188 template <class I, class F>
1189 Future<I> Future<T>::reduce(I&& initial, F&& func) {
1191 minitial = std::forward<I>(initial),
1192 mfunc = std::forward<F>(func)
1193 ](T& vals) mutable {
1194 auto ret = std::move(minitial);
1195 for (auto& val : vals) {
1196 ret = mfunc(std::move(ret), std::move(val));
// Reduction that combines inputs in completion order rather than input order.
// An empty range yields a completed future holding `initial`.
// The shared context holds a spinlock, the running `memo_` future (seeded
// with `initial`), the reducer, completion counters and the output promise.
// Each input's callback takes the lock and chains a continuation on memo_
// that applies func_ to the running value and this input. The callback that
// makes numThens_ equal numFutures_ installs a final callback on memo_ that
// fulfils the promise.
// NOTE(review): the statement that stores the chained future back (start of
// the memo_.then line) is only partly visible in this excerpt.
1202 // unorderedReduce (iterator)
1204 template <class It, class T, class F, class ItT, class Arg>
1205 Future<T> unorderedReduce(It first, It last, T initial, F func) {
1206 if (first == last) {
1207 return makeFuture(std::move(initial));
1210 typedef isTry<Arg> IsTry;
1212 struct UnorderedReduceContext {
1213 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
1214 : lock_(), memo_(makeFuture<T>(std::move(memo))),
1215 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
1217 folly::MicroSpinLock lock_; // protects memo_ and numThens_
1220 size_t numThens_; // how many Futures completed and called .then()
1221 size_t numFutures_; // how many Futures in total
1222 Promise<T> promise_;
1225 auto ctx = std::make_shared<UnorderedReduceContext>(
1226 std::move(initial), std::move(func), std::distance(first, last));
1228 mapSetCallback<ItT>(
1231 [ctx](size_t /* i */, Try<ItT>&& t) {
1232 // Futures can be completed in any order, simultaneously.
1233 // To make this non-blocking, we create a new Future chain in
1234 // the order of completion to reduce the values.
1235 // The spinlock just protects chaining a new Future, not actually
1236 // executing the reduce, which should be really fast.
1237 folly::MSLGuard lock(ctx->lock_);
1239 ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
1240 // Either return a ItT&& or a Try<ItT>&& depending
1241 // on the type of the argument of func.
1242 return ctx->func_(std::move(v),
1243 mt.template get<IsTry::value, Arg&&>());
1245 if (++ctx->numThens_ == ctx->numFutures_) {
1246 // After reducing the value of the last Future, fulfill the Promise
1247 ctx->memo_.setCallback_(
1248 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
1252 return ctx->promise_.getFuture();
// Convenience overload: delegates to within(dur, e, tk) with a
// default-constructed TimedOut as the exception.
1258 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
1259 return within(dur, TimedOut(), tk);
// Races this future against a timer obtained from `tk->after(dur)`.
//
// A shared Context holds the user-supplied exception, the promise backing the
// returned future, the continuation future of `this`, and an atomic token.
// Whichever side flips the token first fulfils the promise: the continuation
// forwards this future's Try, the timer sets an exception.
//
// Returns early with the moved-out future when it is already ready, and with
// a NoTimekeeper failure when no timekeeper is available.
1264 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
1267 Context(E ex) : exception(std::move(ex)), promise() {}
1269 Future<Unit> thisFuture;
1271 std::atomic<bool> token {false};
1274 if (this->isReady()) {
1275 return std::move(*this);
// NOTE(review): the condition guarding this singleton fallback is not visible
// in this excerpt; presumably it applies only when `tk` is null.
1278 std::shared_ptr<Timekeeper> tks;
1280 tks = folly::detail::getTimekeeperSingleton();
1284 if (UNLIKELY(!tk)) {
1285 return makeFuture<T>(NoTimekeeper());
1288 auto ctx = std::make_shared<Context>(std::move(e));
// Result path: only the first side to flip the token sets the promise.
1290 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
1291 if (ctx->token.exchange(true) == false) {
1292 ctx->promise.setTry(std::move(t));
1296 // Have time keeper use a weak ptr to hold ctx,
1297 // so that ctx can be deallocated as soon as the future job finished.
1298 tk->after(dur).then([weakCtx = to_weak_ptr(ctx)](Try<Unit> const& t) mutable {
1299 auto lockedCtx = weakCtx.lock();
1301 // ctx already released. "this" completed first, cancel "after"
1304 // "after" completed first, cancel "this"
1305 lockedCtx->thisFuture.raise(TimedOut());
1306 if (lockedCtx->token.exchange(true) == false) {
// If the timer itself failed, that failure is set; otherwise the caller's
// exception `e` is set.
1307 if (t.hasException()) {
1308 lockedCtx->promise.setException(std::move(t.exception()));
1310 lockedCtx->promise.setException(std::move(lockedCtx->exception));
// The returned future is routed via this future's executor.
1315 return ctx->promise.getFuture().via(this->getExecutor());
// Combines this future with futures::sleep(dur, tk) through collectAll and
// returns a future built from this future's Try (element 0 of the tuple).
// The sleep's Try (element 1) is not read.
1321 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
1322 return collectAll(*this, futures::sleep(dur, tk))
1323 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
1324 Try<T>& t = std::get<0>(tup);
1325 return makeFuture<T>(std::move(t));
// Overload for folly::Future: intentionally does nothing (empty body).
1333 void doBoost(folly::Future<T>& /* unused */) {}
// Overload for folly::SemiFuture.
// NOTE(review): the body is not visible in this excerpt.
1336 void doBoost(folly::SemiFuture<T>& f) {
// Untimed wait helper: installs a callback on `f` that posts a local baton,
// and asserts that `f` is ready before returning.
// NOTE(review): the ready short-circuit and the blocking wait on the baton
// are not visible in this excerpt.
1340 template <class FutureType, typename T = typename FutureType::value_type>
1341 void waitImpl(FutureType& f) {
1342 // short-circuit if there's nothing to do
1347 FutureBatonType baton;
// The baton is captured by reference in the callback.
1348 f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1351 assert(f.isReady());
// Timed wait helper: moves a promise into a callback on `f` that forwards the
// result Try to it, then waits on a baton for at most `dur`.
// NOTE(review): the promise declaration, the baton post inside the callback,
// and the reassignment of `f` are not visible in this excerpt.
1354 template <class FutureType, typename T = typename FutureType::value_type>
1355 void waitImpl(FutureType& f, Duration dur) {
1356 // short-circuit if there's nothing to do
1362 auto ret = promise.getFuture();
// The baton is held in a shared_ptr and captured by value in the callback.
1363 auto baton = std::make_shared<FutureBatonType>();
1364 f.setCallback_([baton, promise = std::move(promise)](Try<T>&& t) mutable {
1365 promise.setTry(std::move(t));
// The readiness assert runs only when try_wait_for reports success.
1370 if (baton->try_wait_for(dur)) {
1371 assert(f.isReady());
// Future overload: reroutes `f` through executor `e` with a pass-through
// continuation, loops until `f` is ready, then asserts readiness.
// NOTE(review): the loop body is not visible in this excerpt; presumably it
// drives `e`.
1376 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1377 // Set callback so to ensure that the via executor has something on it
1378 // so that once the preceding future triggers this callback, drive will
1379 // always have a callback to satisfy it
1383 f = f.via(e).then([](T&& t) { return std::move(t); });
1384 while (!f.isReady()) {
1387 assert(f.isReady());
// SemiFuture overload: moves `f` onto executor `e`, appends a pass-through
// continuation, assigns the result back to `f`, loops until `f` is ready,
// then asserts readiness.
// NOTE(review): the loop body is not visible in this excerpt; presumably it
// drives `e`.
1391 void waitViaImpl(SemiFuture<T>& f, DrivableExecutor* e) {
1392 // Set callback so to ensure that the via executor has something on it
1393 // so that once the preceding future triggers this callback, drive will
1394 // always have a callback to satisfy it
1398 f = std::move(f).via(e).then([](T&& t) { return std::move(t); });
1399 while (!f.isReady()) {
1402 assert(f.isReady());
1405 } // namespace detail
1406 } // namespace futures
// Lvalue overload: runs the untimed wait helper on this SemiFuture.
// NOTE(review): the return statement is not visible in this excerpt.
1409 SemiFuture<T>& SemiFuture<T>::wait() & {
1410 futures::detail::waitImpl(*this);
// Rvalue overload: runs the untimed wait helper, then returns *this as an
// rvalue.
1415 SemiFuture<T>&& SemiFuture<T>::wait() && {
1416 futures::detail::waitImpl(*this);
1417 return std::move(*this);
// Lvalue overload: runs the timed wait helper with `dur`.
// NOTE(review): the return statement is not visible in this excerpt.
1421 SemiFuture<T>& SemiFuture<T>::wait(Duration dur) & {
1422 futures::detail::waitImpl(*this, dur);
// Rvalue overload: runs the timed wait helper with `dur`, then returns *this
// as an rvalue.
1427 SemiFuture<T>&& SemiFuture<T>::wait(Duration dur) && {
1428 futures::detail::waitImpl(*this, dur);
1429 return std::move(*this);
// Lvalue overload: runs waitViaImpl on this SemiFuture with executor `e`.
// NOTE(review): the return statement is not visible in this excerpt.
1433 SemiFuture<T>& SemiFuture<T>::waitVia(DrivableExecutor* e) & {
1434 futures::detail::waitViaImpl(*this, e);
// Rvalue overload: runs waitViaImpl with executor `e`, then returns *this as
// an rvalue.
1439 SemiFuture<T>&& SemiFuture<T>::waitVia(DrivableExecutor* e) && {
1440 futures::detail::waitViaImpl(*this, e);
1441 return std::move(*this);
// Calls wait() and returns the value moved out of the future.
1445 T SemiFuture<T>::get() && {
1446 return std::move(wait().value());
// Timed get: when the future is ready, returns its value moved out.
// NOTE(review): the timed wait that presumably precedes the readiness check,
// and the not-ready path, are not visible in this excerpt.
1450 T SemiFuture<T>::get(Duration dur) && {
1452 if (this->isReady()) {
1453 return std::move(this->value());
// Returns the Try held by the core, moved out.
// NOTE(review): any wait preceding this return is not visible in this
// excerpt.
1460 Try<T> SemiFuture<T>::getTry() && {
1462 return std::move(this->core_->getTry());
// Calls waitVia(e) and returns the value moved out of the future.
1466 T SemiFuture<T>::getVia(DrivableExecutor* e) && {
1467 return std::move(waitVia(e).value());
// Returns the Try held by the core, moved out.
// NOTE(review): the step that uses `e` is not visible in this excerpt.
1471 Try<T> SemiFuture<T>::getTryVia(DrivableExecutor* e) && {
1473 return std::move(this->core_->getTry());
// Lvalue overload: runs the untimed wait helper on this Future.
// NOTE(review): the return statement is not visible in this excerpt.
1477 Future<T>& Future<T>::wait() & {
1478 futures::detail::waitImpl(*this);
// Rvalue overload: runs the untimed wait helper, then returns *this as an
// rvalue.
1483 Future<T>&& Future<T>::wait() && {
1484 futures::detail::waitImpl(*this);
1485 return std::move(*this);
// Lvalue overload: runs the timed wait helper with `dur`.
// NOTE(review): the return statement is not visible in this excerpt.
1489 Future<T>& Future<T>::wait(Duration dur) & {
1490 futures::detail::waitImpl(*this, dur);
// Rvalue overload: runs the timed wait helper with `dur`, then returns *this
// as an rvalue.
1495 Future<T>&& Future<T>::wait(Duration dur) && {
1496 futures::detail::waitImpl(*this, dur);
1497 return std::move(*this);
// Lvalue overload: runs waitViaImpl on this Future with executor `e`.
// NOTE(review): the return statement is not visible in this excerpt.
1501 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1502 futures::detail::waitViaImpl(*this, e);
// Rvalue overload: runs waitViaImpl with executor `e`, then returns *this as
// an rvalue.
1507 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1508 futures::detail::waitViaImpl(*this, e);
1509 return std::move(*this);
// Calls wait() and returns the value moved out of the future.
1513 T Future<T>::get() {
1514 return std::move(wait().value());
// Timed get: when the future is ready, returns its value moved out.
// NOTE(review): the timed wait that presumably precedes the readiness check,
// and the not-ready path, are not visible in this excerpt.
1518 T Future<T>::get(Duration dur) {
1520 if (this->isReady()) {
1521 return std::move(this->value());
// Returns a reference to the Try held by the core (no copy or move).
// NOTE(review): any precondition check before the return is not visible in
// this excerpt.
1528 Try<T>& Future<T>::getTry() {
1531 return this->core_->getTry();
// Calls waitVia(e) and returns the value moved out of the future.
1535 T Future<T>::getVia(DrivableExecutor* e) {
1536 return std::move(waitVia(e).value());
// Calls waitVia(e) and returns the result of getTry() on this future.
1540 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
1541 return waitVia(e).getTry();
// Compares the values held by two Try objects with operator==.
// It calls value() on both arguments without checking them first.
// NOTE(review): the enclosing struct declaration is not visible in this
// excerpt.
1548 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1549 return t1.value() == t2.value();
1552 } // namespace detail
1553 } // namespace futures
// Returns a future that compares the results of this future and `f` once
// collectAll has completed for both. When both hold values, the result is
// TryEquals<T>::equals on the two Try objects.
// NOTE(review): the branch taken when either side lacks a value is not
// visible in this excerpt.
1556 Future<bool> Future<T>::willEqual(Future<T>& f) {
1557 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1558 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1559 return futures::detail::TryEquals<T>::equals(
1560 std::get<0>(t), std::get<1>(t));
// Chains a continuation that tests the value against `predicate` and calls
// throwPredicateDoesNotObtain() when the predicate returns false.
// NOTE(review): the statement returning the value on success is not visible
// in this excerpt.
1569 Future<T> Future<T>::filter(F&& predicate) {
1570 return this->then([p = std::forward<F>(predicate)](T val) {
// The predicate is invoked through a const reference to the value.
1571 T const& valConstRef = val;
1572 if (!p(valConstRef)) {
1573 throwPredicateDoesNotObtain();
// Conditionally runs `thunk`. When `p` is true, the thunk is invoked and
// .unit() is called on its result; when `p` is false, the thunk is not
// invoked and makeFuture() is returned instead.
1580 inline Future<Unit> when(bool p, F&& thunk) {
1581 return p ? std::forward<F>(thunk)().unit() : makeFuture();
// Asynchronous loop: invokes `thunk`, and from a continuation on the returned
// future calls whileDo again with the captured predicate and thunk. The final
// visible statement returns makeFuture().
// NOTE(review): the predicate check is not visible in this excerpt;
// presumably it guards the thunk invocation, with makeFuture() as the
// predicate-false path.
1584 template <class P, class F>
1585 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1587 auto future = thunk();
1588 return future.then([
1589 predicate = std::forward<P>(predicate),
1590 thunk = std::forward<F>(thunk)
1592 return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1595 return makeFuture();
// Builds on folly::whileDo with a counting predicate. The predicate owns an
// atomic<int> counter starting at 0 (held in a unique_ptr) and returns true
// while the pre-increment count is below `n`.
1599 Future<Unit> times(const int n, F&& thunk) {
1600 return folly::whileDo(
1601 [ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
// fetch_add returns the value before the increment.
1602 return count->fetch_add(1) < n;
1604 std::forward<F>(thunk));
// Applies `func` as a continuation to every future in [first, last) and
// appends the resulting futures to `results` in iteration order.
// NOTE(review): the return statement is not visible in this excerpt.
1608 template <class It, class F, class ItT, class Result>
1609 std::vector<Future<Result>> map(It first, It last, F func) {
1610 std::vector<Future<Result>> results;
1611 for (auto it = first; it != last; it++) {
1612 results.push_back(it->then(func));
1616 } // namespace futures
1618 // Instantiate the most common Future types to save compile time
1619 extern template class Future<Unit>;
1620 extern template class Future<bool>;
1621 extern template class Future<int>;
1622 extern template class Future<int64_t>;
1623 extern template class Future<std::string>;
1624 extern template class Future<double>;
1625 } // namespace folly