2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
22 #include <folly/Baton.h>
23 #include <folly/Optional.h>
24 #include <folly/futures/detail/Core.h>
25 #include <folly/futures/Timekeeper.h>
// Forward declaration of the accessor for a process-wide Timekeeper.
// Future<T>::within() in this file assigns its result to `tk` (presumably
// only when the caller supplied none -- the guarding condition is not visible).
32 Timekeeper* getTimekeeperSingleton();
// Move constructor: copies other's core_ pointer and then nulls it out in
// `other`, so the moved-from Future no longer refers to the shared state.
36 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
37 other.core_ = nullptr;
// Move assignment: exchanges core_ with `other` instead of releasing it here,
// so whatever this Future previously held ends up in `other`.
41 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
42 std::swap(core_, other.core_);
// Fragment of a value-taking constructor (its name line is not visible in
// this excerpt): `val` is accepted by const reference for non-void F, and the
// object then adopts the future obtained from a promise `p`.
49 const typename std::enable_if<!std::is_void<F>::value, F>::type& val)
53 *this = p.getFuture();
// Rvalue counterpart of the value-taking constructor fragment: hands `val`
// to the promise `p` and adopts p's future. `val` is an rvalue reference to a
// non-deduced type, so for non-reference F std::forward<F> acts as a move.
59 typename std::enable_if<!std::is_void<F>::value, F>::type&& val)
62 p.setValue(std::forward<F>(val));
63 *this = p.getFuture();
// Default constructor of Future<void>, enabled only when F is void. Starts
// with a null core_ and then adopts the future of a promise `p` (the lines
// that declare and fulfil `p` are not visible in this excerpt).
68 typename std::enable_if<std::is_void<F>::value, int>::type>
69 Future<void>::Future() : core_(nullptr) {
72 *this = p.getFuture();
77 Future<T>::~Future() {
// Tells the shared state that this Future is letting go of it by calling
// core_->detachFuture(). Any null check or reset of core_ around this call is
// not visible in this excerpt.
82 void Future<T>::detach() {
84 core_->detachFuture();
90 void Future<T>::throwIfInvalid() const {
// Installs `func` as the callback stored in core_.
// NOTE(review): if F is a deduced template parameter (the template header is
// not visible here), `func` is a forwarding reference and std::move will also
// move from lvalue arguments; std::forward<F>(func) would preserve the
// caller's value category -- confirm against the declaration.
97 void Future<T>::setCallback_(F&& func) {
99 core_->setCallback(std::move(func));
// Flattens a Future whose value type is itself a Future: chains a
// continuation that receives the inner future and returns it unchanged, and
// returns the resulting Future<Inner>.
106 typename std::enable_if<isFuture<F>::value,
107 Future<typename isFuture<T>::Inner>>::type
108 Future<T>::unwrap() {
109 return then([](Future<typename isFuture<T>::Inner> internal_future) {
110 return internal_future;
116 // Variant: returns a value
117 // e.g. f.then([](Try<T>&& t){ return t.value(); });
// Selected when the callable's result is not a Future. A fresh Promise<B> and
// the callable are wrapped in MoveWrappers so the callback lambda can own
// them; the Future handed back to the caller is taken from the promise up
// front and is given this Future's executor. Inside the callback, when the
// callable does not take a Try (isTry == false) and the incoming Try holds an
// exception, that exception is passed to the promise; otherwise the callable
// is invoked with either the Try or its unwrapped value.
119 template <typename F, typename R, bool isTry, typename... Args>
120 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
121 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
122 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
123 typedef typename R::ReturnsFuture::Inner B;
127 // wrap these so we can move them into the lambda
128 folly::MoveWrapper<Promise<B>> p;
129 folly::MoveWrapper<F> funcm(std::forward<F>(func));
131 // grab the Future now before we lose our handle on the Promise
132 auto f = p->getFuture();
134 f.setExecutor(getExecutor());
137 /* This is a bit tricky.
139 We can't just close over *this in case this Future gets moved. So we
140 make a new dummy Future. We could figure out something more
141 sophisticated that avoids making a new Future object when it can, as an
142 optimization. But this is correct.
144 core_ can't be moved, it is explicitly disallowed (as is copying). But
145 if there's ever a reason to allow it, this is one place that makes that
146 assumption and would need to be fixed. We use a standard shared pointer
147 for core_ (by copying it in), which means in essence obj holds a shared
148 pointer to itself. But this shouldn't leak because Promise will not
149 outlive the continuation, because Promise will setException() with a
150 broken Promise if it is destructed before completed. We could use a
151 weak pointer but it would have to be converted to a shared pointer when
152 func is executed (because the Future returned by func may possibly
153 persist beyond the callback, if it gets moved), and so it is an
154 optimization to just make it shared from the get-go.
156 We have to move in the Promise and func using the MoveWrapper
157 hack. (func could be copied but it's a big drag on perf).
159 Two subtle but important points about this design. detail::Core has no
160 back pointers to Future or Promise, so if Future or Promise get moved
161 (and they will be moved in performant code) we don't have to do
162 anything fancy. And because we store the continuation in the
163 detail::Core, not in the Future, we can execute the continuation even
164 after the Future has gone out of scope. This is an intentional design
165 decision. It is likely we will want to be able to cancel a continuation
166 in some circumstances, but I think it should be explicit not implicit
167 in the destruction of the Future used to create it.
170 [p, funcm](Try<T>&& t) mutable {
171 if (!isTry && t.hasException()) {
172 p->setException(std::move(t.exception()));
175 return (*funcm)(t.template get<isTry, Args>()...);
183 // Variant: returns a Future
184 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
// Selected when the callable itself returns a Future. Setup mirrors the
// value-returning variant (MoveWrapper'd Promise<B> and callable, returned
// Future taken from the promise and given this Future's executor). In the
// callback the callable is invoked to obtain f2, and a second callback on f2
// forwards f2's Try into the promise. If invoking the callable throws, the
// exception is stored in the promise instead: std::exception-derived errors
// keep a reference to the caught object, and a second handler (its catch
// clause is not visible in this excerpt) stores only current_exception().
186 template <typename F, typename R, bool isTry, typename... Args>
187 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
188 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
189 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
190 typedef typename R::ReturnsFuture::Inner B;
194 // wrap these so we can move them into the lambda
195 folly::MoveWrapper<Promise<B>> p;
196 folly::MoveWrapper<F> funcm(std::forward<F>(func));
198 // grab the Future now before we lose our handle on the Promise
199 auto f = p->getFuture();
201 f.setExecutor(getExecutor());
205 [p, funcm](Try<T>&& t) mutable {
206 if (!isTry && t.hasException()) {
207 p->setException(std::move(t.exception()));
210 auto f2 = (*funcm)(t.template get<isTry, Args>()...);
211 // that didn't throw, now we can steal p
212 f2.setCallback_([p](Try<B>&& b) mutable {
213 p->fulfilTry(std::move(b));
215 } catch (const std::exception& e) {
216 p->setException(exception_wrapper(std::current_exception(), e));
218 p->setException(exception_wrapper(std::current_exception()));
// then() overload for a pointer-to-member-function plus the object to call
// it on. FirstArg is the member function's first parameter type with
// reference and cv qualifiers removed; isTry<FirstArg> decides whether the
// call receives the Try itself or the value extracted from it.
// `instance` is captured as a raw pointer and dereferenced when the
// continuation runs, so the pointee must still be alive at that point.
226 template <typename T>
227 template <typename R, typename Caller, typename... Args>
228 Future<typename isFuture<R>::Inner>
229 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
230 typedef typename std::remove_cv<
231 typename std::remove_reference<
232 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
233 return then([instance, func](Try<T>&& t){
234 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
// Argument-less then(): returns a Future<void> chained on this one. The
// continuation accepts the Try and does nothing with it, so the result
// (value or exception) of this Future is discarded.
239 Future<void> Future<T>::then() {
240 return then([] (Try<T>&& t) {});
243 // onError where the callback returns T
// Exn is the callback's first parameter type; the static_assert requires the
// callback's raw return type to be T. The promise and the callback are moved
// into MoveWrappers owned by the continuation. In the continuation,
// withException<Exn> is tried on the incoming Try; when it reports that no
// Exn was handled, the original Try is forwarded to the promise unchanged.
// (The body of the handler lambda is not visible in this excerpt.)
// NOTE(review): `func` is declared F&&; if F is deduced, std::move(func) also
// moves from lvalue callers, whereas onTimeout() in this file uses
// std::forward<F>(func) -- confirm which is intended.
246 typename std::enable_if<
247 !detail::Extract<F>::ReturnsFuture::value,
249 Future<T>::onError(F&& func) {
250 typedef typename detail::Extract<F>::FirstArg Exn;
252 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
253 "Return type of onError callback must be T or Future<T>");
256 auto f = p.getFuture();
257 auto pm = folly::makeMoveWrapper(std::move(p));
258 auto funcm = folly::makeMoveWrapper(std::move(func));
259 setCallback_([pm, funcm](Try<T>&& t) mutable {
260 if (!t.template withException<Exn>([&] (Exn& e) {
265 pm->fulfilTry(std::move(t));
272 // onError where the callback returns Future<T>
// Same shape as the value-returning onError, but the callback must return
// Future<T> (enforced by the static_assert). When the Try holds an Exn the
// callback is invoked to obtain f2, and a callback on f2 forwards f2's Try to
// the promise. If invoking the callback throws, the thrown exception is
// stored in the promise (std::exception-derived errors via the two-argument
// exception_wrapper constructor; the other handler's catch clause is not
// visible here). When no Exn is handled, the original Try is forwarded as is.
// NOTE(review): std::move(func) on an F&& parameter -- see whether
// std::forward<F>(func) is intended, as used by onTimeout() in this file.
275 typename std::enable_if<
276 detail::Extract<F>::ReturnsFuture::value,
278 Future<T>::onError(F&& func) {
280 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
281 "Return type of onError callback must be T or Future<T>");
282 typedef typename detail::Extract<F>::FirstArg Exn;
285 auto f = p.getFuture();
286 auto pm = folly::makeMoveWrapper(std::move(p));
287 auto funcm = folly::makeMoveWrapper(std::move(func));
288 setCallback_([pm, funcm](Try<T>&& t) mutable {
289 if (!t.template withException<Exn>([&] (Exn& e) {
291 auto f2 = (*funcm)(e);
292 f2.setCallback_([pm](Try<T>&& t2) mutable {
293 pm->fulfilTry(std::move(t2));
295 } catch (const std::exception& e2) {
296 pm->setException(exception_wrapper(std::current_exception(), e2));
298 pm->setException(exception_wrapper(std::current_exception()));
301 pm->fulfilTry(std::move(t));
// Chains a continuation that owns `func` (via MoveWrapper) and re-emits the
// incoming Try unchanged through makeFuture, so the original value or
// exception passes through. The line that actually invokes the wrapped
// callable is not visible in this excerpt -- presumably it runs first.
310 Future<T> Future<T>::ensure(F func) {
311 MoveWrapper<F> funcw(std::move(func));
312 return this->then([funcw](Try<T>&& t) {
314 return makeFuture(std::move(t));
// Applies within(dur, tk) to this Future and attaches an onError handler for
// TimedOut that ignores the exception object and returns func()'s result.
320 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
321 auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
322 return within(dur, tk)
323 .onError([funcw](TimedOut const&) { return (*funcw)(); });
// Returns an lvalue reference to the value held by core_'s Try. Any validity
// check preceding the return is not visible in this excerpt.
327 typename std::add_lvalue_reference<T>::type Future<T>::value() {
330 return core_->getTry().value();
// Const overload of value(): same lookup through core_'s Try, returning a
// reference to const T.
334 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
337 return core_->getTry().value();
// Returns a reference to the Try stored in core_ (value or exception).
341 Try<T>& Future<T>::getTry() {
344 return core_->getTry();
// Readiness probe that does not wait: when core_ reports ready, the Try is
// moved out of core_ into the Optional `o` (so core_'s Try is left in a
// moved-from state). The declaration of `o` and the return statement are not
// visible in this excerpt.
348 Optional<Try<T>> Future<T>::poll() {
350 if (core_->ready()) {
351 o = std::move(core_->getTry());
// Rvalue overload of via(): sets the executor on this Future and returns the
// Future itself, moved out.
357 template <typename Executor>
358 inline Future<T> Future<T>::via(Executor* executor) && {
361 setExecutor(executor);
363 return std::move(*this);
// Lvalue overload of via(): instead of moving *this, it creates a new
// promise/future pair, chains a continuation on this Future that forwards
// the Try into that promise, and returns the new future after applying the
// rvalue via(executor) to it.
367 template <typename Executor>
368 inline Future<T> Future<T>::via(Executor* executor) & {
371 MoveWrapper<Promise<T>> p;
372 auto f = p->getFuture();
373 then([p](Try<T>&& t) mutable { p->fulfilTry(std::move(t)); });
374 return std::move(f).via(executor);
// Reports whether the shared state is ready, as answered by core_->ready().
378 bool Future<T>::isReady() const {
380 return core_->ready();
// Passes `exception` (moved) to core_->raise().
384 void Future<T>::raise(exception_wrapper exception) {
385 core_->raise(std::move(exception));
// Creates an already-fulfilled Future from a value: the result type is the
// decayed T, and `t` is perfectly forwarded into the promise before its
// future is returned.
391 Future<typename std::decay<T>::type> makeFuture(T&& t) {
392 Promise<typename std::decay<T>::type> p;
393 p.setValue(std::forward<T>(t));
394 return p.getFuture();
// Argument-less makeFuture() producing a Future<void> from a promise `p`; the
// lines that declare and fulfil `p` are not visible in this excerpt. Declared
// inline because this non-template definition lives in a header.
397 inline // for multiple translation units
398 Future<void> makeFuture() {
401 return p.getFuture();
// Fragment whose name line is not visible; presumably the makeFutureTry
// overload for non-reference F that the `F const&` overload in this file
// delegates to. It creates a Promise for func()'s result type and returns
// that promise's future; how the promise gets fulfilled is not shown here.
407 typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
408 -> Future<decltype(func())> {
409 Promise<decltype(func())> p;
414 return p.getFuture();
// Const-reference overload: delegates to another makeFutureTry overload with
// a moved `copy` (declared on a line not visible here, presumably a copy of
// func).
418 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
420 return makeFutureTry(std::move(copy));
// makeFuture overload taking a std::exception_ptr. Only the signature and the
// final return of p's future are visible; the lines that create `p` and store
// `e` in it are not shown in this excerpt.
424 Future<T> makeFuture(std::exception_ptr const& e) {
427 return p.getFuture();
// makeFuture overload taking an exception_wrapper by value: moves it into the
// promise `p` as its exception and returns p's future.
431 Future<T> makeFuture(exception_wrapper ew) {
433 p.setException(std::move(ew));
434 return p.getFuture();
// makeFuture overload for exception objects: participates only when E derives
// from std::exception. The exception is wrapped with
// make_exception_wrapper<E> and stored in the promise whose future is
// returned.
437 template <class T, class E>
438 typename std::enable_if<std::is_base_of<std::exception, E>::value,
440 makeFuture(E const& e) {
442 p.setException(make_exception_wrapper<E>(e));
443 return p.getFuture();
// makeFuture overload taking a Try<T>: moves the Try (value or exception)
// into a promise via fulfilTry and returns that promise's future.
447 Future<T> makeFuture(Try<T>&& t) {
448 Promise<typename std::decay<T>::type> p;
449 p.fulfilTry(std::move(t));
450 return p.getFuture();
// Try<void> overload: when the Try holds an exception, returns a Future<void>
// built from that exception. The branch for the no-exception case is not
// visible in this excerpt.
454 inline Future<void> makeFuture(Try<void>&& t) {
455 if (t.hasException()) {
456 return makeFuture<void>(std::move(t.exception()));
// Free-function via(): starts a chain with a Future<void> from makeFuture()
// and applies via(executor) to it.
463 template <typename Executor>
464 Future<void> via(Executor* executor) {
465 return makeFuture().via(executor);
// Variadic aggregation (the line carrying the function name is not visible;
// presumably whenAll(Fs&&...)). A VariadicContext for the futures' value
// types is allocated with a raw `new`, its `total` is set to the number of
// arguments, the resulting future is taken from the context's promise, and
// the context plus all futures are handed to whenAllVariadicHelper.
// NOTE(review): the futures are forwarded as rvalues of their decayed type,
// i.e. moved from even when passed as lvalues; how `ctx` is eventually freed
// is not visible here -- confirm ownership.
470 template <typename... Fs>
471 typename detail::VariadicContext<
472 typename std::decay<Fs>::type::value_type...>::type
476 new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
477 ctx->total = sizeof...(fs);
478 auto f_saved = ctx->p.getFuture();
479 detail::whenAllVariadicHelper(ctx,
480 std::forward<typename std::decay<Fs>::type>(fs)...);
// Iterator-range whenAll: yields a vector<Try<T>> with one slot per input
// future, stored by input index (results[i]), so output order matches input
// order regardless of completion order. An early return produces an empty
// vector (its guarding condition is not visible; presumably an empty range).
// A WhenAllContext is allocated with a raw `new` and shared by all callbacks;
// the callback that brings `count` up to n fulfils the promise with the
// collected results.
// NOTE(review): where `ctx` is deleted is not visible in this excerpt.
486 template <class InputIterator>
489 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
490 whenAll(InputIterator first, InputIterator last)
493 typename std::iterator_traits<InputIterator>::value_type::value_type T;
496 return makeFuture(std::vector<Try<T>>());
498 size_t n = std::distance(first, last);
500 auto ctx = new detail::WhenAllContext<T>();
502 ctx->results.resize(n);
504 auto f_saved = ctx->p.getFuture();
506 for (size_t i = 0; first != last; ++first, ++i) {
509 f.setCallback_([ctx, i, n](Try<T>&& t) {
510 ctx->results[i] = std::move(t);
511 if (++ctx->count == n) {
512 ctx->p.setValue(std::move(ctx->results));
// whenAny: completes with (index, Try<T>) of the first input future whose
// callback wins the `done` flag. Every callback calls done.exchange(true);
// only the one that observes the previous value `false` fulfils the promise,
// so later completions are ignored. The context is allocated with a raw
// `new`, sized by the length of the range.
// NOTE(review): where `ctx` is deleted is not visible in this excerpt.
521 template <class InputIterator>
526 std::iterator_traits<InputIterator>::value_type::value_type> > >
527 whenAny(InputIterator first, InputIterator last) {
529 typename std::iterator_traits<InputIterator>::value_type::value_type T;
531 auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
532 auto f_saved = ctx->p.getFuture();
534 for (size_t i = 0; first != last; first++, i++) {
536 f.setCallback_([i, ctx](Try<T>&& t) {
537 if (!ctx->done.exchange(true)) {
538 ctx->p.setValue(std::make_pair(i, std::move(t)));
// whenN: yields a vector of (input index, Try<T>) pairs collected from the
// input futures as they complete. The context is held in a shared_ptr that
// each continuation captures by value. A "Not enough futures" runtime_error
// is set on the promise on a path whose condition is not visible here
// (presumably when the range holds fewer than n futures).
547 template <class InputIterator>
548 Future<std::vector<std::pair<size_t, Try<typename
549 std::iterator_traits<InputIterator>::value_type::value_type>>>>
550 whenN(InputIterator first, InputIterator last, size_t n) {
552 std::iterator_traits<InputIterator>::value_type::value_type T;
553 typedef std::vector<std::pair<size_t, Try<T>>> V;
560 auto ctx = std::make_shared<ctx_t>();
563 // for each completed Future, increase count and add to vector, until we
564 // have n completed futures at which point we fulfil our Promise with the
// collected vector of (index, Try<T>) pairs.
569 it->then([ctx, n, i](Try<T>&& t) {
571 auto c = ++ctx->completed;
573 assert(ctx->v.size() < n);
574 v.push_back(std::make_pair(i, std::move(t)));
576 ctx->p.fulfilTry(Try<V>(std::move(v)));
586 ctx->p.setException(std::runtime_error("Not enough futures"));
589 return ctx->p.getFuture();
// Convenience overload: within() with a default-constructed TimedOut as the
// exception to report.
593 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
594 return within(dur, TimedOut(), tk);
// Races this Future against a timer-driven continuation. A shared Context
// holds the caller's exception `e`, the result promise, and an atomic `token`.
// Both continuations call token.exchange(true); only the one that sees the
// previous value `false` completes the promise:
//  - timer side: forwards the timer Try's own exception if it has one,
//    otherwise sets the caller-supplied exception;
//  - this Future's side: forwards its Try to the promise.
// The expression that starts the timer chain is not visible in this excerpt.
// NOTE(review): on the timer side `t` is a const reference, so
// std::move(t.exception()) most likely degrades to a copy -- confirm.
599 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
602 Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
605 std::atomic<bool> token;
607 auto ctx = std::make_shared<Context>(std::move(e));
610 tk = folly::detail::getTimekeeperSingleton();
614 .then([ctx](Try<void> const& t) {
615 if (ctx->token.exchange(true) == false) {
616 if (t.hasException()) {
617 ctx->promise.setException(std::move(t.exception()));
619 ctx->promise.setException(std::move(ctx->exception));
624 this->then([ctx](Try<T>&& t) {
625 if (ctx->token.exchange(true) == false) {
626 ctx->promise.fulfilTry(std::move(t));
630 return ctx->promise.getFuture();
// Combines this Future with futures::sleep(dur, tk) through whenAll, so the
// result is produced only once both have completed, then re-emits this
// Future's Try (element 0 of the tuple); the sleep's Try is ignored.
634 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
635 return whenAll(*this, futures::sleep(dur, tk))
636 .then([](std::tuple<Try<T>, Try<void>> tup) {
637 Try<T>& t = std::get<0>(tup);
638 return makeFuture<T>(std::move(t));
// Blocking wait without a timeout. Returns immediately when `f` is already
// ready; otherwise `f` is replaced by a chained future whose continuation
// (capturing locals by reference) re-emits the Try. The lines that declare,
// post and wait on the baton mentioned below are not visible in this excerpt.
// The trailing loop yields the thread until the replacement `f` itself
// reports ready.
645 void waitImpl(Future<T>& f) {
646 // short-circuit if there's nothing to do
647 if (f.isReady()) return;
650 f = f.then([&](Try<T> t) {
652 return makeFuture(std::move(t));
656 // There's a race here between the return here and the actual finishing of
657 // the future. f is completed, but the setup may not have finished on done
658 // after the baton has posted.
659 while (!f.isReady()) {
660 std::this_thread::yield();
// Blocking wait with a timeout. Returns immediately when `f` is already
// ready. The baton lives in a shared_ptr that the continuation captures by
// value, so the continuation holds its own reference to it. The wait deadline
// is system_clock::now() + dur; only when timed_wait reports success does the
// code spin (yielding) until the replacement `f` is ready.
665 void waitImpl(Future<T>& f, Duration dur) {
666 // short-circuit if there's nothing to do
667 if (f.isReady()) return;
669 auto baton = std::make_shared<Baton<>>();
670 f = f.then([baton](Try<T> t) {
672 return makeFuture(std::move(t));
675 // Let's preserve the invariant that if we did not timeout (timed_wait returns
676 // true), then the returned Future is complete when it is returned to the
677 // caller. We need to wait out the race for that Future to complete.
678 if (baton->timed_wait(std::chrono::system_clock::now() + dur)) {
679 while (!f.isReady()) {
680 std::this_thread::yield();
// Loops until `f` is ready. The loop body is not visible in this excerpt;
// presumably it drives the executor `e` -- confirm.
686 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
687 while (!f.isReady()) {
// Lvalue wait(): blocks via detail::waitImpl on this Future. The return
// statement is not visible in this excerpt.
695 Future<T>& Future<T>::wait() & {
696 detail::waitImpl(*this);
// Rvalue wait(): blocks via detail::waitImpl, then returns this Future as an
// rvalue reference.
701 Future<T>&& Future<T>::wait() && {
702 detail::waitImpl(*this);
703 return std::move(*this);
// Lvalue wait(dur): timed wait via detail::waitImpl(*this, dur). The return
// statement is not visible in this excerpt.
707 Future<T>& Future<T>::wait(Duration dur) & {
708 detail::waitImpl(*this, dur);
// Rvalue wait(dur): timed wait via detail::waitImpl(*this, dur), then returns
// this Future as an rvalue reference.
713 Future<T>&& Future<T>::wait(Duration dur) && {
714 detail::waitImpl(*this, dur);
715 return std::move(*this);
// Lvalue waitVia(): delegates to detail::waitViaImpl with executor `e`. The
// return statement is not visible in this excerpt.
719 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
720 detail::waitViaImpl(*this, e);
// Rvalue waitVia(): delegates to detail::waitViaImpl with executor `e`, then
// returns this Future as an rvalue reference.
725 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
726 detail::waitViaImpl(*this, e);
727 return std::move(*this);
732 return std::move(wait().value());
736 inline void Future<void>::get() {
// get(dur): returns the value moved out of this Future. The lines between the
// signature and the return (presumably the timed wait and the handling of a
// timeout) are not visible in this excerpt.
741 T Future<T>::get(Duration dur) {
744 return std::move(value());
751 inline void Future<void>::get(Duration dur) {
// getVia(): waits through waitVia(e) and returns the value moved out of the
// Future.
761 T Future<T>::getVia(DrivableExecutor* e) {
762 return std::move(waitVia(e).value());
766 inline void Future<void>::getVia(DrivableExecutor* e) {
// Produces a Future<bool> from this Future and `f` combined through whenAll.
// When both Trys hold values, the result is their operator== comparison. The
// branch for the case where either Try lacks a value is not visible in this
// excerpt.
771 Future<bool> Future<T>::willEqual(Future<T>& f) {
772 return whenAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
773 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
774 return std::get<0>(t).value() == std::get<1>(t).value();
784 Future<Z> chainHelper(Future<Z> f) {
// Recursive step of chainHelper: attaches the first callback `fn` to `f` with
// then() and recurses on the resulting future with the remaining callbacks.
788 template <class Z, class F, class Fn, class... Callbacks>
789 Future<Z> chainHelper(F f, Fn fn, Callbacks... fns) {
790 return chainHelper<Z>(f.then(fn), fns...);
// Builds a reusable-looking callable from a list of callbacks: a Promise<A>
// is created, its future is run through chainHelper with `fns` to obtain the
// final Future<Z>, and both are stored in MoveWrappers captured by the
// returned lambda. Calling the lambda fulfils the promise with the supplied
// Try and returns the chained future.
// NOTE(review): the chained future is moved out of the wrapper on the call,
// so the returned std::function looks valid for a single invocation only --
// confirm.
794 template <class A, class Z, class... Callbacks>
795 std::function<Future<Z>(Try<A>)>
796 chain(Callbacks... fns) {
797 MoveWrapper<Promise<A>> pw;
798 MoveWrapper<Future<Z>> fw(chainHelper<Z>(pw->getFuture(), fns...));
799 return [=](Try<A> t) mutable {
800 pw->fulfilTry(std::move(t));
801 return std::move(*fw);
809 // I haven't included a Future<T&> specialization because I don't foresee us
810 // using it, however it is not difficult to add when needed. Refer to
811 // Future<void> for guidance. std::future and boost::future code would also be