2 * Copyright 2014 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
22 #include <folly/Baton.h>
23 #include <folly/wangle/futures/detail/Core.h>
24 #include <folly/wangle/futures/Timekeeper.h>
26 namespace folly { namespace wangle {
// Declared ahead of use: getWaitTimeoutHelper() and Future<T>::within() below
// call folly::wangle::detail::getTimekeeperSingleton() to obtain a Timekeeper.
31 Timekeeper* getTimekeeperSingleton();
// Presumably the general (non-Future) case of the isFuture trait: value is
// false -- confirm against the enclosing template header.
36 static const bool value = false;
// Specialization for Future<T>: value is true. The then() overloads below
// test isFuture<...>::value inside enable_if to choose between the
// "callback returns a plain value" and "callback returns a Future" variants.
40 struct isFuture<Future<T> > {
41 static const bool value = true;
// Move constructor. core_ starts out null and the work is delegated to move
// assignment, which swaps core_ with other.core_ -- so `other` ends up
// holding the null core.
45 Future<T>::Future(Future<T>&& other) noexcept : core_(nullptr) {
46 *this = std::move(other);
// Move assignment: exchanges core_ pointers with `other`, so `other` is left
// holding whatever core this Future had before the call.
50 Future<T>& Future<T>::operator=(Future<T>&& other) {
51 std::swap(core_, other.core_);
// Destructor. NOTE(review): presumably releases this Future's hold on core_
// via detach() -- confirm against the body.
56 Future<T>::~Future() {
// Notifies core_ (via detachFuture()) that this Future is detaching from it.
61 void Future<T>::detach() {
63 core_->detachFuture();
// Const validity check. NOTE(review): judging by the name, throws when this
// Future has no usable core_ -- confirm against the body.
69 void Future<T>::throwIfInvalid() const {
// Installs func as the callback held by core_. func is moved into the core,
// so the caller's object should be treated as moved-from afterwards.
76 void Future<T>::setCallback_(F&& func) {
78 core_->setCallback(std::move(func));
// then() overload for callbacks that accept Try<T>&& and return a plain
// (non-Future) value of type B. Selected by enable_if only when the
// callback's result type is not a Future; the return type is Future<B>.
81 // Variant: f.then([](Try<T>&& t){ return t.value(); });
84 typename std::enable_if<
85 !isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
86 Future<typename std::result_of<F(Try<T>&&)>::type> >::type
87 Future<T>::then(F&& func) {
88 typedef typename std::result_of<F(Try<T>&&)>::type B;
92 // wrap these so we can move them into the lambda
93 folly::MoveWrapper<Promise<B>> p;
94 folly::MoveWrapper<F> funcm(std::forward<F>(func));
96 // grab the Future now before we lose our handle on the Promise
97 auto f = p->getFuture();
99 /* This is a bit tricky.
101 We can't just close over *this in case this Future gets moved. So we
102 make a new dummy Future. We could figure out something more
103 sophisticated that avoids making a new Future object when it can, as an
104 optimization. But this is correct.
106 core_ can't be moved, it is explicitly disallowed (as is copying). But
107 if there's ever a reason to allow it, this is one place that makes that
108 assumption and would need to be fixed. We use a standard shared pointer
109 for core_ (by copying it in), which means in essence obj holds a shared
110 pointer to itself. But this shouldn't leak because Promise will not
111 outlive the continuation, because Promise will setException() with a
112 broken Promise if it is destructed before completed. We could use a
113 weak pointer but it would have to be converted to a shared pointer when
114 func is executed (because the Future returned by func may possibly
115 persist beyond the callback, if it gets moved), and so it is an
116 optimization to just make it shared from the get-go.
118 We have to move in the Promise and func using the MoveWrapper
119 hack. (func could be copied but it's a big drag on perf).
121 Two subtle but important points about this design. detail::Core has no
122 back pointers to Future or Promise, so if Future or Promise get moved
123 (and they will be moved in performant code) we don't have to do
124 anything fancy. And because we store the continuation in the
125 detail::Core, not in the Future, we can execute the continuation even
126 after the Future has gone out of scope. This is an intentional design
127 decision. It is likely we will want to be able to cancel a continuation
128 in some circumstances, but I think it should be explicit not implicit
129 in the destruction of the Future used to create it.
132 [p, funcm](Try<T>&& t) mutable {
134 return (*funcm)(std::move(t));
// then() overload for non-void T where the callback accepts the unwrapped
// value (T&&) and returns a plain (non-Future) value of type B. The enable_if
// spells the argument type via detail::AliasIfVoid<T>.
141 // Variant: f.then([](T&& t){ return t; });
144 typename std::enable_if<
145 !std::is_same<T, void>::value &&
146 !isFuture<typename std::result_of<
147 F(typename detail::AliasIfVoid<T>::type&&)>::type>::value,
148 Future<typename std::result_of<
149 F(typename detail::AliasIfVoid<T>::type&&)>::type> >::type
150 Future<T>::then(F&& func) {
151 typedef typename std::result_of<F(T&&)>::type B;
// MoveWrapper lets the promise and the callback be captured by the lambda
// below even though they are move-only / expensive to copy.
155 folly::MoveWrapper<Promise<B>> p;
156 folly::MoveWrapper<F> funcm(std::forward<F>(func));
157 auto f = p->getFuture();
160 [p, funcm](Try<T>&& t) mutable {
// Upstream failed: forward its exception to the downstream promise.
161 if (t.hasException()) {
162 p->setException(std::move(t.exception()));
// Upstream succeeded: hand the moved-out value to the user callback.
165 return (*funcm)(std::move(t.value()));
// then() overload for T = void where the callback takes no arguments and
// returns a plain (non-Future) value of type B.
173 // Variant: f.then([](){ return; });
176 typename std::enable_if<
177 std::is_same<T, void>::value &&
178 !isFuture<typename std::result_of<F()>::type>::value,
179 Future<typename std::result_of<F()>::type> >::type
180 Future<T>::then(F&& func) {
181 typedef typename std::result_of<F()>::type B;
185 folly::MoveWrapper<Promise<B>> p;
186 folly::MoveWrapper<F> funcm(std::forward<F>(func));
187 auto f = p->getFuture();
190 [p, funcm](Try<T>&& t) mutable {
// Upstream failed: forward its exception to the downstream promise.
191 if (t.hasException()) {
192 p->setException(std::move(t.exception()));
// then() overload for callbacks that accept Try<T>&& and themselves return a
// Future. B is that Future's value_type, so the result is Future<B> rather
// than a nested Future<Future<B>>.
203 // Variant: f.then([](Try<T>&& t){ return makeFuture<T>(t.value()); });
206 typename std::enable_if<
207 isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
208 Future<typename std::result_of<F(Try<T>&&)>::type::value_type> >::type
209 Future<T>::then(F&& func) {
210 typedef typename std::result_of<F(Try<T>&&)>::type::value_type B;
214 // wrap these so we can move them into the lambda
215 folly::MoveWrapper<Promise<B>> p;
216 folly::MoveWrapper<F> funcm(std::forward<F>(func));
218 // grab the Future now before we lose our handle on the Promise
219 auto f = p->getFuture();
222 [p, funcm](Try<T>&& t) mutable {
224 auto f2 = (*funcm)(std::move(t));
225 // that didn't throw, now we can steal p
// Chain onto the Future the callback returned: whatever it completes with
// is forwarded to the downstream promise.
226 f2.setCallback_([p](Try<B>&& b) mutable {
227 p->fulfilTry(std::move(b));
// Exceptions caught here are wrapped and delivered through the promise;
// the std::exception case also passes the typed reference to the wrapper.
229 } catch (const std::exception& e) {
230 p->setException(exception_wrapper(std::current_exception(), e));
232 p->setException(exception_wrapper(std::current_exception()));
// then() overload for non-void T where the callback accepts the unwrapped
// value (T&&) and returns a Future. B is that Future's value_type and the
// result is Future<B>.
239 // Variant: f.then([](T&& t){ return makeFuture<T>(t); });
242 typename std::enable_if<
243 !std::is_same<T, void>::value &&
244 isFuture<typename std::result_of<
245 F(typename detail::AliasIfVoid<T>::type&&)>::type>::value,
246 Future<typename std::result_of<
247 F(typename detail::AliasIfVoid<T>::type&&)>::type::value_type> >::type
248 Future<T>::then(F&& func) {
249 typedef typename std::result_of<F(T&&)>::type::value_type B;
253 folly::MoveWrapper<Promise<B>> p;
254 folly::MoveWrapper<F> funcm(std::forward<F>(func));
255 auto f = p->getFuture();
258 [p, funcm](Try<T>&& t) mutable {
// Upstream failed: forward its exception to the downstream promise.
259 if (t.hasException()) {
260 p->setException(std::move(t.exception()));
// Upstream succeeded: call the user callback with the moved-out value and
// forward the completion of the Future it returns to the promise.
263 auto f2 = (*funcm)(std::move(t.value()));
264 f2.setCallback_([p](Try<B>&& b) mutable {
265 p->fulfilTry(std::move(b));
// Exceptions caught here are wrapped and delivered through the promise.
267 } catch (const std::exception& e) {
268 p->setException(exception_wrapper(std::current_exception(), e));
270 p->setException(exception_wrapper(std::current_exception()));
// then() overload for T = void where the callback takes no arguments and
// returns a Future. B is that Future's value_type and the result is
// Future<B>.
278 // Variant: f.then([](){ return makeFuture(); });
281 typename std::enable_if<
282 std::is_same<T, void>::value &&
283 isFuture<typename std::result_of<F()>::type>::value,
284 Future<typename std::result_of<F()>::type::value_type> >::type
285 Future<T>::then(F&& func) {
286 typedef typename std::result_of<F()>::type::value_type B;
290 folly::MoveWrapper<Promise<B>> p;
291 folly::MoveWrapper<F> funcm(std::forward<F>(func));
293 auto f = p->getFuture();
296 [p, funcm](Try<T>&& t) mutable {
297 if (t.hasException()) {
// NOTE(review): the sibling then() overloads pass std::move(t.exception())
// here; this one passes t.exception() without the move. Consider aligning.
298 p->setException(t.exception());
// Upstream succeeded: run the callback and forward the completion of the
// Future it returns to the downstream promise.
301 auto f2 = (*funcm)();
302 f2.setCallback_([p](Try<B>&& b) mutable {
303 p->fulfilTry(std::move(b));
// Exceptions caught here are wrapped and delivered through the promise.
305 } catch (const std::exception& e) {
306 p->setException(exception_wrapper(std::current_exception(), e));
308 p->setException(exception_wrapper(std::current_exception()));
// Argument-less then(): chains a continuation that ignores the incoming
// Try<T> and returns nothing, giving a Future<void>.
317 Future<void> Future<T>::then() {
318 return then([] (Try<T>&& t) {});
// Selected when detail::Extract<F>::ReturnsFuture is false. Exn, the type of
// the callback's first argument, is the exception type handed to
// withException below.
321 // onError where the callback returns T
324 typename std::enable_if<
325 !detail::Extract<F>::ReturnsFuture::value,
327 Future<T>::onError(F&& func) {
328 typedef typename detail::Extract<F>::FirstArg Exn;
// Compile-time check that the handler produces exactly T.
330 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
331 "Return type of onError callback must be T or Future<T>");
334 auto f = p.getFuture();
// Wrap the promise and the handler so the lambda can capture them by move.
335 auto pm = folly::makeMoveWrapper(std::move(p));
336 auto funcm = folly::makeMoveWrapper(std::move(func));
337 setCallback_([pm, funcm](Try<T>&& t) mutable {
// Presumably withException returns false when t does not hold an Exn; in
// that case the original Try is forwarded to the promise untouched.
338 if (!t.template withException<Exn>([&] (Exn& e) {
343 pm->fulfilTry(std::move(t));
// Selected when detail::Extract<F>::ReturnsFuture is true. The handler must
// return exactly Future<T>; its completion is forwarded to the promise
// created here.
350 // onError where the callback returns Future<T>
353 typename std::enable_if<
354 detail::Extract<F>::ReturnsFuture::value,
356 Future<T>::onError(F&& func) {
358 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
359 "Return type of onError callback must be T or Future<T>");
// Exn, the callback's first argument type, is the exception type handed to
// withException below.
360 typedef typename detail::Extract<F>::FirstArg Exn;
363 auto f = p.getFuture();
364 auto pm = folly::makeMoveWrapper(std::move(p));
365 auto funcm = folly::makeMoveWrapper(std::move(func));
366 setCallback_([pm, funcm](Try<T>&& t) mutable {
367 if (!t.template withException<Exn>([&] (Exn& e) {
// Run the handler and pipe the completion of the Future it returns into
// the promise.
369 auto f2 = (*funcm)(e);
370 f2.setCallback_([pm](Try<T>&& t2) mutable {
371 pm->fulfilTry(std::move(t2));
// Exceptions caught here are wrapped and delivered through the promise.
373 } catch (const std::exception& e) {
374 pm->setException(exception_wrapper(std::current_exception(), e));
376 pm->setException(exception_wrapper(std::current_exception()));
// Presumably reached when t does not hold an Exn: forward the original
// Try unchanged.
379 pm->fulfilTry(std::move(t));
// Returns the value held by the core's Try as an lvalue reference
// (add_lvalue_reference keeps the signature well-formed for T = void).
387 typename std::add_lvalue_reference<T>::type Future<T>::value() {
390 return core_->getTry().value();
// Const overload of value(): same lookup through core_->getTry(), returning
// a reference to const T.
394 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
397 return core_->getTry().value();
// Returns a reference to the Try<T> held by core_.
401 Try<T>& Future<T>::getTry() {
404 return core_->getTry();
// Rvalue-qualified via(): sets `executor` on this Future's own core and
// returns this Future by move.
408 template <typename Executor>
409 inline Future<T> Future<T>::via(Executor* executor) && {
413 core_->setExecutor(executor);
415 return std::move(*this);
// Lvalue-qualified via(): instead of moving out of *this, create a fresh
// promise/future pair, forward this Future's result into it with then(), and
// apply the rvalue via() to the new Future -- so the executor is set on the
// new Future's core.
419 template <typename Executor>
420 inline Future<T> Future<T>::via(Executor* executor) & {
423 MoveWrapper<Promise<T>> p;
424 auto f = p->getFuture();
425 then([p](Try<T>&& t) mutable { p->fulfilTry(std::move(t)); });
426 return std::move(f).via(executor);
// Reports whether the core is ready (delegates to core_->ready()).
430 bool Future<T>::isReady() const {
432 return core_->ready();
// Hands `exception` (taken by value, then moved) to core_->raise().
436 void Future<T>::raise(exception_wrapper exception) {
437 core_->raise(std::move(exception));
// Builds a Future from a value: creates a Promise of the decayed type, takes
// its Future, then sets the value by perfect-forwarding t.
443 Future<typename std::decay<T>::type> makeFuture(T&& t) {
444 Promise<typename std::decay<T>::type> p;
445 auto f = p.getFuture();
446 p.setValue(std::forward<T>(t));
// Void flavour of makeFuture(). It is a non-template defined in a header,
// hence the `inline` (see the original comment on the next line).
450 inline // for multiple translation units
451 Future<void> makeFuture() {
453 auto f = p.getFuture();
// The `sdf` parameter carries the enable_if constraint that F is not a
// reference type. The resulting Future's value type is whatever func()
// returns.
461 typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
462 -> Future<decltype(func())> {
463 Promise<decltype(func())> p;
464 auto f = p.getFuture();
// Overload for callables passed by const reference: forwards a moved `copy`
// to another makeFutureTry overload (presumably the non-reference one, with
// `copy` being a copy of func -- confirm).
473 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
475 return makeFutureTry(std::move(copy));
// makeFuture overload taking a std::exception_ptr; presumably produces a
// Future<T> that already holds that exception -- confirm against the body.
479 Future<T> makeFuture(std::exception_ptr const& e) {
481 auto f = p.getFuture();
// makeFuture overload taking an exception_wrapper: the wrapper is moved into
// the promise with setException() before the Future is obtained from it.
487 Future<T> makeFuture(exception_wrapper ew) {
489 p.setException(std::move(ew));
490 return p.getFuture();
// makeFuture overload for a concrete exception object. Enabled only when E
// derives from std::exception; e is wrapped with make_exception_wrapper<E>
// before being set on the promise.
493 template <class T, class E>
494 typename std::enable_if<std::is_base_of<std::exception, E>::value,
496 makeFuture(E const& e) {
498 auto f = p.getFuture();
499 p.setException(make_exception_wrapper<E>(e));
// Converts a Try<T> into a Future<T>, dispatching on whether the Try holds
// an exception or a value; either one is moved out of t.
504 Future<T> makeFuture(Try<T>&& t) {
505 if (t.hasException()) {
506 return makeFuture<T>(std::move(t.exception()));
508 return makeFuture<T>(std::move(t.value()));
// Try<void> counterpart of makeFuture(Try<T>&&): a held exception is moved
// into makeFuture<void>.
513 inline Future<void> makeFuture(Try<void>&& t) {
514 if (t.hasException()) {
515 return makeFuture<void>(std::move(t.exception()));
// Free-function via(): makes a Future<void> with makeFuture() and applies
// via(executor) to it.
522 template <typename Executor>
523 Future<void> via(Executor* executor) {
524 return makeFuture().via(executor);
// Variadic whenAll: shared state lives in a detail::VariadicContext
// parameterised on each input Future's value_type.
529 template <typename... Fs>
530 typename detail::VariadicContext<
531 typename std::decay<Fs>::type::value_type...>::type
// NOTE(review): the context is allocated with a raw `new` and nothing in the
// visible code deletes it -- confirm that the helper/context owns its own
// cleanup.
535 new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
// total records how many futures were passed in.
536 ctx->total = sizeof...(fs);
// Take the result Future before the helper is given the context.
537 auto f_saved = ctx->p.getFuture();
538 detail::whenAllVariadicHelper(ctx,
539 std::forward<typename std::decay<Fs>::type>(fs)...);
540 return std::move(f_saved);
// Iterator-range whenAll: T is the value_type of the Futures in
// [first, last), and results are collected as a std::vector<Try<T>>.
545 template <class InputIterator>
548 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
549 whenAll(InputIterator first, InputIterator last)
552 typename std::iterator_traits<InputIterator>::value_type::value_type T;
554 auto n = std::distance(first, last);
// Completes immediately with an empty vector (presumably the n == 0 case).
556 return makeFuture(std::vector<Try<T>>());
// NOTE(review): raw `new` with no visible delete -- confirm who frees ctx.
559 auto ctx = new detail::WhenAllContext<T>();
// Pre-size the results so each callback can write its own slot by index.
561 ctx->results.resize(n);
563 auto f_saved = ctx->p.getFuture();
565 for (size_t i = 0; first != last; ++first, ++i) {
568 f.setCallback_([ctx, i, n](Try<T>&& t) {
// Store this input's result at its position i.
569 ctx->results[i] = std::move(t);
// The callback that brings the count up to n fulfils the promise.
570 if (++ctx->count == n) {
571 ctx->p.setValue(std::move(ctx->results));
577 return std::move(f_saved);
// whenAny: the promise is fulfilled with (index, Try<T>) from whichever
// input callback first flips ctx->done.
580 template <class InputIterator>
585 std::iterator_traits<InputIterator>::value_type::value_type> > >
586 whenAny(InputIterator first, InputIterator last) {
588 typename std::iterator_traits<InputIterator>::value_type::value_type T;
// NOTE(review): raw `new` with no visible delete -- confirm who frees ctx.
590 auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
591 auto f_saved = ctx->p.getFuture();
593 for (size_t i = 0; first != last; first++, i++) {
595 f.setCallback_([i, ctx](Try<T>&& t) {
// exchange(true) returns the previous value, so only the first callback to
// get here sees false and sets the result.
596 if (!ctx->done.exchange(true)) {
597 ctx->p.setValue(std::make_pair(i, std::move(t)));
603 return std::move(f_saved);
// whenN: collects (i, Try<T>) pairs from completed input futures into a
// vector V and fulfils the promise with it (see the comment in the body about
// reaching n completions). The shared state is a shared_ptr captured by value
// in every continuation.
606 template <class InputIterator>
607 Future<std::vector<std::pair<size_t, Try<typename
608 std::iterator_traits<InputIterator>::value_type::value_type>>>>
609 whenN(InputIterator first, InputIterator last, size_t n) {
611 std::iterator_traits<InputIterator>::value_type::value_type T;
612 typedef std::vector<std::pair<size_t, Try<T>>> V;
619 auto ctx = std::make_shared<ctx_t>();
622 // for each completed Future, increase count and add to vector, until we
623 // have n completed futures at which point we fulfil our Promise with the
628 it->then([ctx, n, i](Try<T>&& t) {
// c is the completion count after this callback's increment.
630 auto c = ++ctx->completed;
632 assert(ctx->v.size() < n);
// i is presumably the position of this Future in the input range.
633 v.push_back(std::make_pair(i, std::move(t)));
635 ctx->p.fulfilTry(Try<V>(std::move(v)));
// Error path, presumably for when fewer than n futures were supplied.
645 ctx->p.setException(std::runtime_error("Not enough futures"));
648 return ctx->p.getFuture();
// waitWithSemaphore (no timeout): chains a continuation onto f that passes
// the value through into `done`, then spins with yield() until `done`
// reports ready. The lambda captures by reference ([&]).
651 template <typename T>
653 waitWithSemaphore(Future<T>&& f) {
655 auto done = f.then([&](Try<T> &&t) {
657 return std::move(t.value());
660 while (!done.isReady()) {
661 // There's a race here between the return here and the actual finishing of
662 // the future. f is completed, but the setup may not have finished on done
663 // after the baton has posted.
664 std::this_thread::yield();
// Future<void> specialization of waitWithSemaphore: same structure as the
// generic version, with a Try<void> continuation and the same yield() loop
// on `done`.
670 inline Future<void> waitWithSemaphore<void>(Future<void>&& f) {
672 auto done = f.then([&](Try<void> &&t) {
677 while (!done.isReady()) {
678 // There's a race here between the return here and the actual finishing of
679 // the future. f is completed, but the setup may not have finished on done
680 // after the baton has posted.
681 std::this_thread::yield();
// Timed waitWithSemaphore. The Baton is held in a shared_ptr and captured by
// value in the continuation, so the continuation keeps it alive.
686 template <typename T, class Dur>
688 waitWithSemaphore(Future<T>&& f, Dur timeout) {
689 auto baton = std::make_shared<Baton<>>();
690 auto done = f.then([baton](Try<T> &&t) {
692 return std::move(t.value());
// The wait takes an absolute deadline, computed here as now + timeout.
// NOTE(review): system_clock is wall-clock time and can jump; confirm a
// monotonic clock is not required here.
694 baton->timed_wait(std::chrono::system_clock::now() + timeout);
// Timed waitWithSemaphore for Future<void>: same shape as the generic timed
// version -- a shared Baton captured by value, and a timed_wait against an
// absolute system_clock deadline.
700 waitWithSemaphore(Future<void>&& f, Dur timeout) {
701 auto baton = std::make_shared<Baton<>>();
702 auto done = f.then([baton](Try<void> &&t) {
// NOTE(review): system_clock is wall-clock time and can jump; confirm a
// monotonic clock is not required here.
706 baton->timed_wait(std::chrono::system_clock::now() + timeout);
// Helper taking the Future by pointer. A stack-allocated Baton is declared
// and a continuation that captures by reference ([&]) is chained onto f; the
// continuation takes the Try by const reference and so does not move the
// result out.
712 void getWaitHelper(Future<T>* f) {
713 // If we already have a value do the cheap thing
718 folly::Baton<> baton;
719 f->then([&](Try<T> const&) {
// Races f against a timer from the Timekeeper singleton and returns a Future
// completed by whichever callback claims `token` first.
726 Future<T> getWaitTimeoutHelper(Future<T>* f, Duration dur) {
727 // TODO make and use variadic whenAny #5877971
// token is shared by both callbacks: exchange(true) returns the previous
// value, so only the first caller sees false and gets to complete p.
729 auto token = std::make_shared<std::atomic<bool>>();
730 folly::Baton<> baton;
// NOTE(review): this callback captures locals by reference ([&]) and is
// scheduled on a timer. If it can run after this function has returned, the
// token check is the only thing keeping it off dead stack objects -- confirm.
732 folly::wangle::detail::getTimekeeperSingleton()->after(dur)
733 .then([&,token](Try<void> const& t) {
734 if (token->exchange(true) == false) {
// Timer won the race: complete p with TimedOut.
737 p.setException(TimedOut());
738 } catch (std::exception const& e) {
739 p.setException(exception_wrapper(std::current_exception(), e));
741 p.setException(exception_wrapper(std::current_exception()));
// f won the race: forward its result to p.
747 f->then([&, token](Try<T>&& t) {
748 if (token->exchange(true) == false) {
749 p.fulfilTry(std::move(t));
755 return p.getFuture();
763 // Big assumption here: the then() call above, since it doesn't move out
764 // the value, leaves us with a value to return here. This would be a big
765 // no-no in user code, but I'm invoking internal developer privilege. This
766 // is slightly more efficient (save a move()) especially if there's an
767 // exception (save a throw).
// value() yields an lvalue reference; std::move turns it into an rvalue for
// the return.
768 return std::move(value());
// Future<void> specialization of get(): returns nothing.
772 inline void Future<void>::get() {
// get() with a duration: delegates to getWaitTimeoutHelper and moves the
// value out of the Future it returns.
778 T Future<T>::get(Duration dur) {
779 return std::move(getWaitTimeoutHelper(this, dur).value());
// Future<void> specialization of get(dur): calls value() on the helper's
// Future and discards the result.
783 inline void Future<void>::get(Duration dur) {
784 getWaitTimeoutHelper(this, dur).value();
// Convenience overload: forwards to within(dur, e, tk) with a
// default-constructed TimedOut as the exception.
788 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
789 return within(dur, TimedOut(), tk);
// within(dur, e, tk): returns a Future completed by whichever of two
// callbacks claims the token first. One forwards this Future's result; the
// other sets exception e.
794 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
// Context bundles the caller's exception, the promise for the returned
// Future, and the token that arbitrates between the two callbacks.
797 Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
800 std::atomic<bool> token;
// Held in a shared_ptr and captured by value in both callbacks.
802 auto ctx = std::make_shared<Context>(std::move(e));
// Uses the singleton Timekeeper (presumably only when tk was passed as null).
805 tk = folly::wangle::detail::getTimekeeperSingleton();
809 .then([ctx](Try<void> const& t) {
// exchange(true) returns the previous value: only the first callback to
// arrive sees false and completes the promise.
810 if (ctx->token.exchange(true) == false) {
813 ctx->promise.setException(std::move(ctx->exception));
814 } catch (std::exception const& e) {
815 ctx->promise.setException(
816 exception_wrapper(std::current_exception(), e));
818 ctx->promise.setException(
819 exception_wrapper(std::current_exception()));
// This Future finished first: forward its Try to the promise.
824 this->then([ctx](Try<T>&& t) {
825 if (ctx->token.exchange(true) == false) {
826 ctx->promise.fulfilTry(std::move(t));
830 return ctx->promise.getFuture();
// delayed(): combines this Future with futures::sleep(dur, tk) through
// whenAll, then takes this Future's Try (tuple element 0) and re-wraps it
// with makeFuture<T>.
834 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
835 return whenAll(*this, futures::sleep(dur, tk))
836 .then([](Try<std::tuple<Try<T>, Try<void>>>&& tup) {
// Element 0 is this Future's result; element 1 is the sleep's Try<void>.
837 Try<T>& t = std::get<0>(tup.value());
838 return makeFuture<T>(std::move(t));
844 // I haven't included a Future<T&> specialization because I don't foresee us
845 // using it, however it is not difficult to add when needed. Refer to
846 // Future<void> for guidance. std::future and boost::future code would also be