2 * Copyright 2014 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
22 #include <folly/Baton.h>
23 #include <folly/futures/detail/Core.h>
24 #include <folly/futures/Timekeeper.h>
// Declaration only. Callers in this file reach it as
// folly::detail::getTimekeeperSingleton() to obtain a Timekeeper for timeouts.
31 Timekeeper* getTimekeeperSingleton();
// Move constructor: starts with a null core_ and then adopts other's state
// through the move-assignment operator.
35 Future<T>::Future(Future<T>&& other) noexcept : core_(nullptr) {
36 *this = std::move(other);
// Move assignment: exchanges the core_ pointers, so `other` ends up holding
// whatever core this Future held before the call.
// NOTE(review): the noexcept move constructor calls this operator, which is
// not itself marked noexcept here — confirm that is intentional.
40 Future<T>& Future<T>::operator=(Future<T>&& other) {
41 std::swap(core_, other.core_);
// Value parameter taken by const reference; the enable_if makes this parameter
// type valid only for non-void F. The constructed Future adopts p.getFuture().
48 const typename std::enable_if<!std::is_void<F>::value, F>::type& val)
52 *this = p.getFuture();
// Value parameter taken by rvalue reference (non-void F only): val is
// forwarded into the promise, then this Future adopts the promise's Future.
58 typename std::enable_if<!std::is_void<F>::value, F>::type&& val)
61 p.setValue(std::forward<F>(val));
62 *this = p.getFuture();
// Default constructor of Future<void>; the enable_if template argument is only
// well-formed when F is void. core_ starts null and is then replaced by the
// core of p.getFuture().
67 typename std::enable_if<std::is_void<F>::value, int>::type>
68 Future<void>::Future() : core_(nullptr) {
71 *this = p.getFuture();
// Destructor of Future<T>.
76 Future<T>::~Future() {
// Tells the shared core that this Future is letting go of it by calling
// core_->detachFuture().
81 void Future<T>::detach() {
83 core_->detachFuture();
// Const validity check.
// NOTE(review): the name implies it throws when this Future has no usable
// core_ — confirm against the body.
89 void Future<T>::throwIfInvalid() const {
// Installs func as the callback stored in the core.
// NOTE(review): func is passed on with std::move rather than std::forward<F>;
// if F&& is a deduced forwarding reference, an lvalue argument would be moved
// from as well — confirm that is intended.
96 void Future<T>::setCallback_(F&& func) {
98 core_->setCallback(std::move(func));
101 // Variant: returns a value
102 // e.g. f.then([](Try<T>&& t){ return t.value(); });
// Selected by enable_if when the callback's result is not itself a Future
// (R::ReturnsFuture::value is false). The result is published through a new
// Promise<B>; its Future `f` is fetched up front because the promise is then
// captured by the continuation.
// isTry is used below both to pick how the argument is extracted from the Try
// and to decide whether exceptions bypass the callback.
104 template <typename F, typename R, bool isTry, typename... Args>
105 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
106 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
107 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
108 typedef typename R::ReturnsFuture::Inner B;
112 // wrap these so we can move them into the lambda
113 folly::MoveWrapper<Promise<B>> p;
114 folly::MoveWrapper<F> funcm(std::forward<F>(func));
116 // grab the Future now before we lose our handle on the Promise
117 auto f = p->getFuture();
119 /* This is a bit tricky.
121 We can't just close over *this in case this Future gets moved. So we
122 make a new dummy Future. We could figure out something more
123 sophisticated that avoids making a new Future object when it can, as an
124 optimization. But this is correct.
126 core_ can't be moved, it is explicitly disallowed (as is copying). But
127 if there's ever a reason to allow it, this is one place that makes that
128 assumption and would need to be fixed. We use a standard shared pointer
129 for core_ (by copying it in), which means in essence obj holds a shared
130 pointer to itself. But this shouldn't leak because Promise will not
131 outlive the continuation, because Promise will setException() with a
132 broken Promise if it is destructed before completed. We could use a
133 weak pointer but it would have to be converted to a shared pointer when
134 func is executed (because the Future returned by func may possibly
135 persist beyond the callback, if it gets moved), and so it is an
136 optimization to just make it shared from the get-go.
138 We have to move in the Promise and func using the MoveWrapper
139 hack. (func could be copied but it's a big drag on perf).
141 Two subtle but important points about this design. detail::Core has no
142 back pointers to Future or Promise, so if Future or Promise get moved
143 (and they will be moved in performant code) we don't have to do
144 anything fancy. And because we store the continuation in the
145 detail::Core, not in the Future, we can execute the continuation even
146 after the Future has gone out of scope. This is an intentional design
147 decision. It is likely we will want to be able to cancel a continuation
148 in some circumstances, but I think it should be explicit not implicit
149 in the destruction of the Future used to create it.
// Continuation: when isTry is false and the incoming Try holds an exception,
// the exception is handed directly to p; otherwise the callback is invoked
// with the argument extracted from t.
152 [p, funcm](Try<T>&& t) mutable {
153 if (!isTry && t.hasException()) {
154 p->setException(std::move(t.exception()));
157 return (*funcm)(t.template get<isTry, Args>()...);
165 // Variant: returns a Future
166 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
// Selected by enable_if when the callback's result is a Future
// (R::ReturnsFuture::value is true). The outer result is published through a
// new Promise<B> that is completed once the callback's own Future completes.
168 template <typename F, typename R, bool isTry, typename... Args>
169 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
170 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
171 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
172 typedef typename R::ReturnsFuture::Inner B;
176 // wrap these so we can move them into the lambda
177 folly::MoveWrapper<Promise<B>> p;
178 folly::MoveWrapper<F> funcm(std::forward<F>(func));
180 // grab the Future now before we lose our handle on the Promise
181 auto f = p->getFuture();
// Continuation: when isTry is false and the incoming Try holds an exception,
// the exception is handed directly to p and the callback is not consulted.
184 [p, funcm](Try<T>&& t) mutable {
185 if (!isTry && t.hasException()) {
186 p->setException(std::move(t.exception()));
// Otherwise run the callback to obtain the inner Future f2.
189 auto f2 = (*funcm)(t.template get<isTry, Args>()...);
190 // that didn't throw, now we can steal p
// p is captured by the inner callback, which forwards f2's Try to it.
191 f2.setCallback_([p](Try<B>&& b) mutable {
192 p->fulfilTry(std::move(b));
// A std::exception thrown by the callback is stored in p together with a
// reference to the caught object.
194 } catch (const std::exception& e) {
195 p->setException(exception_wrapper(std::current_exception(), e));
// This setException wraps only std::current_exception().
// NOTE(review): presumably the catch-all path — confirm.
197 p->setException(exception_wrapper(std::current_exception()));
// then() overload for a pointer-to-member-function plus the object to call it
// on. The call is wrapped in a lambda and passed to the generic then().
// FirstArg is the first parameter type with references and cv-qualifiers
// removed; isTry<FirstArg> selects how the argument is extracted from the Try.
// The lambda captures `instance` as a raw pointer, so the pointee has to stay
// alive until the continuation has run.
205 template <typename T>
206 template <typename R, typename Caller, typename... Args>
207 Future<typename isFuture<R>::Inner>
208 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
209 typedef typename std::remove_cv<
210 typename std::remove_reference<
211 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
212 return then([instance, func](Try<T>&& t){
213 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
// Argument-less then(): attaches a callback that accepts the Try and does
// nothing with it, yielding a Future<void>.
218 Future<void> Future<T>::then() {
219 return then([] (Try<T>&& t) {});
222 // onError where the callback returns T
// Enabled when the handler does not return a Future. Exn is the handler's
// first parameter type and is the exception type looked for via
// withException<Exn>. The static_assert requires the handler's raw return type
// to be exactly T.
225 typename std::enable_if<
226 !detail::Extract<F>::ReturnsFuture::value,
228 Future<T>::onError(F&& func) {
229 typedef typename detail::Extract<F>::FirstArg Exn;
231 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
232 "Return type of onError callback must be T or Future<T>");
// The result Future is taken from the promise before the promise is wrapped
// and captured by the callback.
235 auto f = p.getFuture();
236 auto pm = folly::makeMoveWrapper(std::move(p));
237 auto funcm = folly::makeMoveWrapper(std::move(func));
238 setCallback_([pm, funcm](Try<T>&& t) mutable {
// When withException<Exn> returns false, the original Try is passed through
// to the promise unchanged.
239 if (!t.template withException<Exn>([&] (Exn& e) {
244 pm->fulfilTry(std::move(t));
251 // onError where the callback returns Future<T>
// Enabled when the handler returns a Future; the static_assert requires that
// Future to be exactly Future<T>. Exn is the handler's first parameter type
// and is the exception type looked for via withException<Exn>.
254 typename std::enable_if<
255 detail::Extract<F>::ReturnsFuture::value,
257 Future<T>::onError(F&& func) {
259 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
260 "Return type of onError callback must be T or Future<T>");
261 typedef typename detail::Extract<F>::FirstArg Exn;
264 auto f = p.getFuture();
265 auto pm = folly::makeMoveWrapper(std::move(p));
266 auto funcm = folly::makeMoveWrapper(std::move(func));
267 setCallback_([pm, funcm](Try<T>&& t) mutable {
268 if (!t.template withException<Exn>([&] (Exn& e) {
// The handler produces a replacement Future f2; its eventual Try is what
// completes the promise.
270 auto f2 = (*funcm)(e);
271 f2.setCallback_([pm](Try<T>&& t2) mutable {
272 pm->fulfilTry(std::move(t2));
// Exceptions escaping the handler itself are stored in the promise.
274 } catch (const std::exception& e2) {
275 pm->setException(exception_wrapper(std::current_exception(), e2));
277 pm->setException(exception_wrapper(std::current_exception()));
// When withException<Exn> returns false, the original Try is passed through.
280 pm->fulfilTry(std::move(t));
// Combines within(dur, tk) with an onError handler for TimedOut: when that
// handler fires, the result of calling func() is returned from it.
289 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
290 auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
291 return within(dur, tk)
292 .onError([funcw](TimedOut const&) { return (*funcw)(); });
// Returns the result of value() on the Try held by the core, typed as an
// lvalue reference to T.
296 typename std::add_lvalue_reference<T>::type Future<T>::value() {
299 return core_->getTry().value();
// Const overload of value(): same lookup through the core's Try, typed as an
// lvalue reference to const T.
303 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
306 return core_->getTry().value();
// Returns a reference to the Try<T> stored in the core.
310 Try<T>& Future<T>::getTry() {
313 return core_->getTry();
// Rvalue overload of via(): records the executor on this Future's core and
// hands this same Future back to the caller by move.
317 template <typename Executor>
318 inline Future<T> Future<T>::via(Executor* executor) && {
322 core_->setExecutor(executor);
324 return std::move(*this);
// Lvalue overload of via(): creates a new promise/future pair, forwards this
// Future's Try into that promise through then(), and returns the new Future
// with the executor applied via the rvalue overload.
328 template <typename Executor>
329 inline Future<T> Future<T>::via(Executor* executor) & {
332 MoveWrapper<Promise<T>> p;
333 auto f = p->getFuture();
334 then([p](Try<T>&& t) mutable { p->fulfilTry(std::move(t)); });
335 return std::move(f).via(executor);
// Reports whatever the core's ready() returns.
339 bool Future<T>::isReady() const {
341 return core_->ready();
// Passes the exception (taken by value, then moved) on to core_->raise().
345 void Future<T>::raise(exception_wrapper exception) {
346 core_->raise(std::move(exception));
// Builds a Future from a value: the value is forwarded into a Promise of the
// decayed type, and that promise's Future is returned.
352 Future<typename std::decay<T>::type> makeFuture(T&& t) {
353 Promise<typename std::decay<T>::type> p;
354 p.setValue(std::forward<T>(t));
355 return p.getFuture();
// Non-template Future<void> factory; returns the Future of a local promise p.
358 inline // for multiple translation units
359 Future<void> makeFuture() {
362 return p.getFuture();
// The bool-typed parameter `sdf` exists for the enable_if, which accepts only
// a non-reference F. The result is a Future of whatever func() returns, taken
// from a Promise of that type.
368 typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
369 -> Future<decltype(func())> {
370 Promise<decltype(func())> p;
375 return p.getFuture();
// Const-reference overload: hands `copy` to makeFutureTry as an rvalue.
379 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
381 return makeFutureTry(std::move(copy));
// Factory taking a std::exception_ptr; returns the Future of a local promise.
385 Future<T> makeFuture(std::exception_ptr const& e) {
388 return p.getFuture();
// Factory taking an exception_wrapper by value: the wrapper is moved into the
// promise via setException, and the promise's Future is returned.
392 Future<T> makeFuture(exception_wrapper ew) {
394 p.setException(std::move(ew));
395 return p.getFuture();
// Factory for exception objects, enabled only when E derives from
// std::exception. The exception is wrapped with make_exception_wrapper<E>
// before being stored in the promise.
398 template <class T, class E>
399 typename std::enable_if<std::is_base_of<std::exception, E>::value,
401 makeFuture(E const& e) {
403 p.setException(make_exception_wrapper<E>(e));
404 return p.getFuture();
// Factory from a Try<T>: the Try is moved into a promise with fulfilTry and
// the promise's Future is returned.
408 Future<T> makeFuture(Try<T>&& t) {
409 Promise<typename std::decay<T>::type> p;
410 p.fulfilTry(std::move(t));
411 return p.getFuture();
// Try<void> overload: when the Try holds an exception, that exception becomes
// the content of the returned Future<void>.
415 inline Future<void> makeFuture(Try<void>&& t) {
416 if (t.hasException()) {
417 return makeFuture<void>(std::move(t.exception()));
// Free-function via(): a Future<void> from makeFuture() with the given
// executor applied through Future::via.
424 template <typename Executor>
425 Future<void> via(Executor* executor) {
426 return makeFuture().via(executor);
// Variadic whenAll: allocates a VariadicContext keyed on the value types of
// the input futures, records how many futures there are, takes the result
// Future from the context's promise, and passes the context plus the futures
// to whenAllVariadicHelper.
// NOTE(review): the context is allocated with a raw `new`; presumably the
// helper or the context itself is responsible for deleting it — confirm.
431 template <typename... Fs>
432 typename detail::VariadicContext<
433 typename std::decay<Fs>::type::value_type...>::type
437 new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
438 ctx->total = sizeof...(fs);
439 auto f_saved = ctx->p.getFuture();
440 detail::whenAllVariadicHelper(ctx,
441 std::forward<typename std::decay<Fs>::type>(fs)...);
// Iterator-range whenAll: T is the value type of the futures in the range and
// the result carries one Try<T> per input future.
447 template <class InputIterator>
450 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
451 whenAll(InputIterator first, InputIterator last)
454 typename std::iterator_traits<InputIterator>::value_type::value_type T;
// Early exit that yields an already-built empty vector.
// NOTE(review): presumably taken when first == last — confirm.
457 return makeFuture(std::vector<Try<T>>());
459 size_t n = std::distance(first, last);
// NOTE(review): raw `new`; presumably released after the last callback —
// confirm.
461 auto ctx = new detail::WhenAllContext<T>();
// One result slot per input future, so each callback can write by index.
463 ctx->results.resize(n);
465 auto f_saved = ctx->p.getFuture();
467 for (size_t i = 0; first != last; ++first, ++i) {
// Each callback stores its Try in slot i; the callback that brings the
// counter to n publishes the whole vector.
470 f.setCallback_([ctx, i, n](Try<T>&& t) {
471 ctx->results[i] = std::move(t);
472 if (++ctx->count == n) {
473 ctx->p.setValue(std::move(ctx->results));
// whenAny: the result is a pair of (index of the input future, its Try<T>).
482 template <class InputIterator>
487 std::iterator_traits<InputIterator>::value_type::value_type> > >
488 whenAny(InputIterator first, InputIterator last) {
490 typename std::iterator_traits<InputIterator>::value_type::value_type T;
// The context is constructed with the number of futures in the range.
// NOTE(review): raw `new`; presumably the context frees itself once every
// callback has run — confirm.
492 auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
493 auto f_saved = ctx->p.getFuture();
495 for (size_t i = 0; first != last; first++, i++) {
// Only the first callback to flip `done` from false to true fulfils the
// promise.
497 f.setCallback_([i, ctx](Try<T>&& t) {
498 if (!ctx->done.exchange(true)) {
499 ctx->p.setValue(std::make_pair(i, std::move(t)));
// whenN: the result is a vector of (index, Try<T>) pairs. The shared context
// is owned by a shared_ptr that every continuation captures by value.
508 template <class InputIterator>
509 Future<std::vector<std::pair<size_t, Try<typename
510 std::iterator_traits<InputIterator>::value_type::value_type>>>>
511 whenN(InputIterator first, InputIterator last, size_t n) {
513 std::iterator_traits<InputIterator>::value_type::value_type T;
514 typedef std::vector<std::pair<size_t, Try<T>>> V;
521 auto ctx = std::make_shared<ctx_t>();
524 // for each completed Future, increase count and add to vector, until we
525 // have n completed futures at which point we fulfil our Promise with the
530 it->then([ctx, n, i](Try<T>&& t) {
// c is the value of the completion counter after this callback's increment.
532 auto c = ++ctx->completed;
534 assert(ctx->v.size() < n);
535 v.push_back(std::make_pair(i, std::move(t)));
537 ctx->p.fulfilTry(Try<V>(std::move(v)));
// NOTE(review): presumably reached when the range holds fewer than n futures —
// confirm the guarding condition.
547 ctx->p.setException(std::runtime_error("Not enough futures"));
550 return ctx->p.getFuture();
// Helper taking a pointer to the Future: it attaches a continuation that
// captures the local Baton by reference.
// NOTE(review): the by-reference capture is only safe if this function blocks
// until the continuation has run — confirm.
555 void getWaitHelper(Future<T>* f) {
556 // If we already have a value do the cheap thing
561 folly::Baton<> baton;
562 f->then([&](Try<T> const&) {
// Races the Future *f against a timer obtained from the singleton Timekeeper
// and returns the Future of a promise fulfilled by whichever finishes first.
569 Future<T> getWaitTimeoutHelper(Future<T>* f, Duration dur) {
570 // TODO make and use variadic whenAny #5877971
// The token is shared by both continuations; exchange(true) returning false
// identifies the single winner.
572 auto token = std::make_shared<std::atomic<bool>>();
573 folly::Baton<> baton;
// Timer side: if it wins, the promise gets either the timer's own exception or
// TimedOut.
575 folly::detail::getTimekeeperSingleton()->after(dur)
576 .then([&,token](Try<void> const& t) {
577 if (token->exchange(true) == false) {
578 if (t.hasException()) {
579 p.setException(std::move(t.exception()));
581 p.setException(TimedOut());
// Future side: if it wins, the promise receives the Future's Try.
// In the lines shown, p is touched only inside the token-guarded branch.
587 f->then([&, token](Try<T>&& t) {
588 if (token->exchange(true) == false) {
589 p.fulfilTry(std::move(t));
595 return p.getFuture();
603 // Big assumption here: the then() call above, since it doesn't move out
604 // the value, leaves us with a value to return here. This would be a big
605 // no-no in user code, but I'm invoking internal developer privilege. This
606 // is slightly more efficient (save a move()) especially if there's an
607 // exception (save a throw).
// The stored value is moved out to the caller.
608 return std::move(value());
// Future<void> counterpart of get(); it has no value to return.
612 inline void Future<void>::get() {
// get() with a timeout: takes the Future from getWaitTimeoutHelper(this, dur)
// and moves its value out.
618 T Future<T>::get(Duration dur) {
619 return std::move(getWaitTimeoutHelper(this, dur).value());
// Future<void> counterpart: value() is called on the helper's Future and its
// result is not used.
// NOTE(review): presumably value() is what surfaces a stored exception —
// confirm.
623 inline void Future<void>::get(Duration dur) {
624 getWaitTimeoutHelper(this, dur).value();
// getVia: returns this Future's stored value by move; takes a
// DrivableExecutor.
628 T Future<T>::getVia(DrivableExecutor* e) {
632 return std::move(value());
// Future<void> counterpart of getVia(); it has no value to return.
636 inline void Future<void>::getVia(DrivableExecutor* e) {
// Convenience overload: forwards to within(dur, e, tk) with a
// default-constructed TimedOut as the exception.
644 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
645 return within(dur, TimedOut(), tk);
// Races this Future against a timer and returns the Future of a promise
// completed by whichever finishes first. The shared Context holds the
// caller's exception, the promise and an atomic token, and is kept alive by
// the shared_ptr captured in both continuations.
650 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
653 Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
656 std::atomic<bool> token;
658 auto ctx = std::make_shared<Context>(std::move(e));
// NOTE(review): presumably only executed when the caller passed no
// Timekeeper — confirm the guarding condition.
661 tk = folly::detail::getTimekeeperSingleton();
// Timer side: if it wins the token, the promise gets the timer's own exception
// when there is one, otherwise the caller-supplied exception.
665 .then([ctx](Try<void> const& t) {
666 if (ctx->token.exchange(true) == false) {
667 if (t.hasException()) {
668 ctx->promise.setException(std::move(t.exception()));
670 ctx->promise.setException(std::move(ctx->exception));
// Future side: if it wins the token, the promise receives this Future's Try.
675 this->then([ctx](Try<T>&& t) {
676 if (ctx->token.exchange(true) == false) {
677 ctx->promise.fulfilTry(std::move(t));
681 return ctx->promise.getFuture();
// Completes only after both this Future and futures::sleep(dur, tk) have
// completed (via whenAll), then re-publishes this Future's Try; the sleep's
// Try is ignored.
685 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
686 return whenAll(*this, futures::sleep(dur, tk))
687 .then([](std::tuple<Try<T>, Try<void>> tup) {
688 Try<T>& t = std::get<0>(tup);
689 return makeFuture<T>(std::move(t));
// Blocking wait without a timeout. f is re-pointed at the Future returned by
// then(), whose callback passes the Try along unchanged, so the caller's
// Future remains usable afterwards.
696 void waitImpl(Future<T>& f) {
698 f = f.then([&](Try<T> t) {
700 return makeFuture(std::move(t));
703 // There's a race here between the return here and the actual finishing of
704 // the future. f is completed, but the setup may not have finished on done
705 // after the baton has posted.
// Spin, yielding the thread, until the re-pointed Future reports ready.
706 while (!f.isReady()) {
707 std::this_thread::yield();
// Blocking wait with a timeout. The Baton is owned by a shared_ptr that the
// continuation captures by value, so the continuation keeps it alive even if
// it runs after this function has returned.
712 void waitImpl(Future<T>& f, Duration dur) {
713 auto baton = std::make_shared<Baton<>>();
714 f = f.then([baton](Try<T> t) {
716 return makeFuture(std::move(t));
718 // Let's preserve the invariant that if we did not timeout (timed_wait returns
719 // true), then the returned Future is complete when it is returned to the
720 // caller. We need to wait out the race for that Future to complete.
// The deadline is computed as system_clock::now() + dur.
721 if (baton->timed_wait(std::chrono::system_clock::now() + dur)) {
722 while (!f.isReady()) {
723 std::this_thread::yield();
// Loops until f reports ready.
// NOTE(review): presumably each iteration drives the executor e — confirm
// against the loop body.
729 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
730 while (!f.isReady()) {
// Lvalue overload of wait(): delegates to detail::waitImpl on *this.
738 Future<T>& Future<T>::wait() & {
739 detail::waitImpl(*this);
// Rvalue overload of wait(): delegates to detail::waitImpl, then returns
// *this as an rvalue reference so calls can be chained on a temporary.
744 Future<T>&& Future<T>::wait() && {
745 detail::waitImpl(*this);
746 return std::move(*this);
// Lvalue overload of the timed wait(): delegates to detail::waitImpl with
// dur.
750 Future<T>& Future<T>::wait(Duration dur) & {
751 detail::waitImpl(*this, dur);
// Rvalue overload of the timed wait(): delegates to detail::waitImpl with
// dur, then returns *this as an rvalue reference.
756 Future<T>&& Future<T>::wait(Duration dur) && {
757 detail::waitImpl(*this, dur);
758 return std::move(*this);
// Lvalue overload of waitVia(): delegates to detail::waitViaImpl with the
// given executor.
762 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
763 detail::waitViaImpl(*this, e);
// Rvalue overload of waitVia(): delegates to detail::waitViaImpl, then
// returns *this as an rvalue reference.
768 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
769 detail::waitViaImpl(*this, e);
770 return std::move(*this);
// Overload taking only a Future<Z>; it is the one the recursive chainHelper
// resolves to once no callbacks remain.
777 Future<Z> chainHelper(Future<Z> f) {
// Recursive step: attaches the first callback with then() and recurses on the
// resulting Future with the remaining callbacks.
781 template <class Z, class F, class Fn, class... Callbacks>
782 Future<Z> chainHelper(F f, Fn fn, Callbacks... fns) {
783 return chainHelper<Z>(f.then(fn), fns...);
// Builds a std::function mapping Try<A> to Future<Z>. The callback chain is
// attached up front to the Future of a fresh Promise<A>; calling the function
// fulfils that promise with the given Try and hands out the end-of-chain
// Future by move.
// NOTE(review): because the stored Future is moved out on the first call, a
// second call on the same function object looks unsafe — confirm whether
// single use is the intended contract.
787 template <class A, class Z, class... Callbacks>
788 std::function<Future<Z>(Try<A>)>
789 chain(Callbacks... fns) {
790 MoveWrapper<Promise<A>> pw;
791 MoveWrapper<Future<Z>> fw(chainHelper<Z>(pw->getFuture(), fns...));
792 return [=](Try<A> t) mutable {
793 pw->fulfilTry(std::move(t));
794 return std::move(*fw);
802 // I haven't included a Future<T&> specialization because I don't foresee us
803 // using it, however it is not difficult to add when needed. Refer to
804 // Future<void> for guidance. std::future and boost::future code would also be