2 * Copyright 2014 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
22 #include <folly/Baton.h>
23 #include <folly/wangle/futures/detail/Core.h>
24 #include <folly/wangle/futures/Timekeeper.h>
26 namespace folly { namespace wangle {
31 Timekeeper* getTimekeeperSingleton();
36 static const bool value = false;
40 struct isFuture<Future<T> > {
41 static const bool value = true;
45 Future<T>::Future(Future<T>&& other) noexcept : core_(nullptr) {
46 *this = std::move(other);
50 Future<T>& Future<T>::operator=(Future<T>&& other) {
51 std::swap(core_, other.core_);
56 Future<T>::~Future() {
61 void Future<T>::detach() {
63 core_->detachFuture();
69 void Future<T>::throwIfInvalid() const {
76 void Future<T>::setCallback_(F&& func) {
78 core_->setCallback(std::move(func));
81 // Variant: f.then([](Try<T>&& t){ return t.value(); });
84 typename std::enable_if<
85 !isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
86 Future<typename std::result_of<F(Try<T>&&)>::type> >::type
87 Future<T>::then(F&& func) {
88 typedef typename std::result_of<F(Try<T>&&)>::type B;
92 // wrap these so we can move them into the lambda
93 folly::MoveWrapper<Promise<B>> p;
94 folly::MoveWrapper<F> funcm(std::forward<F>(func));
96 // grab the Future now before we lose our handle on the Promise
97 auto f = p->getFuture();
99 /* This is a bit tricky.
101 We can't just close over *this in case this Future gets moved. So we
102 make a new dummy Future. We could figure out something more
103 sophisticated that avoids making a new Future object when it can, as an
104 optimization. But this is correct.
106 core_ can't be moved, it is explicitly disallowed (as is copying). But
107 if there's ever a reason to allow it, this is one place that makes that
108 assumption and would need to be fixed. We use a standard shared pointer
109 for core_ (by copying it in), which means in essence obj holds a shared
110 pointer to itself. But this shouldn't leak because Promise will not
111 outlive the continuation, because Promise will setException() with a
112 broken Promise if it is destructed before completed. We could use a
113 weak pointer but it would have to be converted to a shared pointer when
114 func is executed (because the Future returned by func may possibly
115 persist beyond the callback, if it gets moved), and so it is an
116 optimization to just make it shared from the get-go.
118 We have to move in the Promise and func using the MoveWrapper
119 hack. (func could be copied but it's a big drag on perf).
121 Two subtle but important points about this design. detail::Core has no
122 back pointers to Future or Promise, so if Future or Promise get moved
123 (and they will be moved in performant code) we don't have to do
124 anything fancy. And because we store the continuation in the
125 detail::Core, not in the Future, we can execute the continuation even
126 after the Future has gone out of scope. This is an intentional design
127 decision. It is likely we will want to be able to cancel a continuation
128 in some circumstances, but I think it should be explicit not implicit
129 in the destruction of the Future used to create it.
132 [p, funcm](Try<T>&& t) mutable {
134 return (*funcm)(std::move(t));
141 // Variant: f.then([](T&& t){ return t; });
144 typename std::enable_if<
145 !std::is_same<T, void>::value &&
146 !isFuture<typename std::result_of<
147 F(typename detail::AliasIfVoid<T>::type&&)>::type>::value,
148 Future<typename std::result_of<
149 F(typename detail::AliasIfVoid<T>::type&&)>::type> >::type
150 Future<T>::then(F&& func) {
151 typedef typename std::result_of<F(T&&)>::type B;
155 folly::MoveWrapper<Promise<B>> p;
156 folly::MoveWrapper<F> funcm(std::forward<F>(func));
157 auto f = p->getFuture();
160 [p, funcm](Try<T>&& t) mutable {
161 if (t.hasException()) {
162 p->setException(t.getException());
165 return (*funcm)(std::move(t.value()));
173 // Variant: f.then([](){ return; });
176 typename std::enable_if<
177 std::is_same<T, void>::value &&
178 !isFuture<typename std::result_of<F()>::type>::value,
179 Future<typename std::result_of<F()>::type> >::type
180 Future<T>::then(F&& func) {
181 typedef typename std::result_of<F()>::type B;
185 folly::MoveWrapper<Promise<B>> p;
186 folly::MoveWrapper<F> funcm(std::forward<F>(func));
187 auto f = p->getFuture();
190 [p, funcm](Try<T>&& t) mutable {
191 if (t.hasException()) {
192 p->setException(t.getException());
203 // Variant: f.then([](Try<T>&& t){ return makeFuture<T>(t.value()); });
206 typename std::enable_if<
207 isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
208 Future<typename std::result_of<F(Try<T>&&)>::type::value_type> >::type
209 Future<T>::then(F&& func) {
210 typedef typename std::result_of<F(Try<T>&&)>::type::value_type B;
214 // wrap these so we can move them into the lambda
215 folly::MoveWrapper<Promise<B>> p;
216 folly::MoveWrapper<F> funcm(std::forward<F>(func));
218 // grab the Future now before we lose our handle on the Promise
219 auto f = p->getFuture();
222 [p, funcm](Try<T>&& t) mutable {
224 auto f2 = (*funcm)(std::move(t));
225 // that didn't throw, now we can steal p
226 f2.setCallback_([p](Try<B>&& b) mutable {
227 p->fulfilTry(std::move(b));
230 p->setException(std::current_exception());
237 // Variant: f.then([](T&& t){ return makeFuture<T>(t); });
240 typename std::enable_if<
241 !std::is_same<T, void>::value &&
242 isFuture<typename std::result_of<
243 F(typename detail::AliasIfVoid<T>::type&&)>::type>::value,
244 Future<typename std::result_of<
245 F(typename detail::AliasIfVoid<T>::type&&)>::type::value_type> >::type
246 Future<T>::then(F&& func) {
247 typedef typename std::result_of<F(T&&)>::type::value_type B;
251 folly::MoveWrapper<Promise<B>> p;
252 folly::MoveWrapper<F> funcm(std::forward<F>(func));
253 auto f = p->getFuture();
256 [p, funcm](Try<T>&& t) mutable {
257 if (t.hasException()) {
258 p->setException(t.getException());
261 auto f2 = (*funcm)(std::move(t.value()));
262 f2.setCallback_([p](Try<B>&& b) mutable {
263 p->fulfilTry(std::move(b));
266 p->setException(std::current_exception());
274 // Variant: f.then([](){ return makeFuture(); });
277 typename std::enable_if<
278 std::is_same<T, void>::value &&
279 isFuture<typename std::result_of<F()>::type>::value,
280 Future<typename std::result_of<F()>::type::value_type> >::type
281 Future<T>::then(F&& func) {
282 typedef typename std::result_of<F()>::type::value_type B;
286 folly::MoveWrapper<Promise<B>> p;
287 folly::MoveWrapper<F> funcm(std::forward<F>(func));
289 auto f = p->getFuture();
292 [p, funcm](Try<T>&& t) mutable {
293 if (t.hasException()) {
294 p->setException(t.getException());
297 auto f2 = (*funcm)();
298 f2.setCallback_([p](Try<B>&& b) mutable {
299 p->fulfilTry(std::move(b));
302 p->setException(std::current_exception());
311 Future<void> Future<T>::then() {
312 return then([] (Try<T>&& t) {});
315 // onError where the callback returns T
318 typename std::enable_if<
319 !detail::Extract<F>::ReturnsFuture::value,
321 Future<T>::onError(F&& func) {
322 typedef typename detail::Extract<F>::FirstArg Exn;
324 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
325 "Return type of onError callback must be T or Future<T>");
328 auto f = p.getFuture();
329 auto pm = folly::makeMoveWrapper(std::move(p));
330 auto funcm = folly::makeMoveWrapper(std::move(func));
331 setCallback_([pm, funcm](Try<T>&& t) mutable {
342 pm->fulfilTry(std::move(t));
348 // onError where the callback returns Future<T>
351 typename std::enable_if<
352 detail::Extract<F>::ReturnsFuture::value,
354 Future<T>::onError(F&& func) {
356 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
357 "Return type of onError callback must be T or Future<T>");
358 typedef typename detail::Extract<F>::FirstArg Exn;
361 auto f = p.getFuture();
362 auto pm = folly::makeMoveWrapper(std::move(p));
363 auto funcm = folly::makeMoveWrapper(std::move(func));
364 setCallback_([pm, funcm](Try<T>&& t) mutable {
369 auto f2 = (*funcm)(e);
370 f2.setCallback_([pm](Try<T>&& t2) mutable {
371 pm->fulfilTry(std::move(t2));
374 pm->setException(std::current_exception());
380 pm->fulfilTry(std::move(t));
387 typename std::add_lvalue_reference<T>::type Future<T>::value() {
390 return core_->getTry().value();
394 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
397 return core_->getTry().value();
401 Try<T>& Future<T>::getTry() {
404 return core_->getTry();
408 template <typename Executor>
409 inline Future<T> Future<T>::via(Executor* executor) && {
413 core_->setExecutor(executor);
415 return std::move(*this);
419 template <typename Executor>
420 inline Future<T> Future<T>::via(Executor* executor) & {
423 MoveWrapper<Promise<T>> p;
424 auto f = p->getFuture();
425 then([p](Try<T>&& t) mutable { p->fulfilTry(std::move(t)); });
426 return std::move(f).via(executor);
430 bool Future<T>::isReady() const {
432 return core_->ready();
436 void Future<T>::raise(std::exception_ptr exception) {
437 core_->raise(exception);
443 Future<typename std::decay<T>::type> makeFuture(T&& t) {
444 Promise<typename std::decay<T>::type> p;
445 auto f = p.getFuture();
446 p.setValue(std::forward<T>(t));
450 inline // for multiple translation units
451 Future<void> makeFuture() {
453 auto f = p.getFuture();
461 typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
462 -> Future<decltype(func())> {
463 Promise<decltype(func())> p;
464 auto f = p.getFuture();
473 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
475 return makeFutureTry(std::move(copy));
479 Future<T> makeFuture(std::exception_ptr const& e) {
481 auto f = p.getFuture();
486 template <class T, class E>
487 typename std::enable_if<std::is_base_of<std::exception, E>::value,
489 makeFuture(E const& e) {
491 auto f = p.getFuture();
492 p.fulfil([&]() -> T { throw e; });
497 Future<T> makeFuture(Try<T>&& t) {
499 return makeFuture<T>(std::move(t.value()));
501 return makeFuture<T>(std::current_exception());
506 inline Future<void> makeFuture(Try<void>&& t) {
511 return makeFuture<void>(std::current_exception());
516 template <typename Executor>
517 Future<void> via(Executor* executor) {
518 return makeFuture().via(executor);
523 template <typename... Fs>
524 typename detail::VariadicContext<
525 typename std::decay<Fs>::type::value_type...>::type
529 new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
530 ctx->total = sizeof...(fs);
531 auto f_saved = ctx->p.getFuture();
532 detail::whenAllVariadicHelper(ctx,
533 std::forward<typename std::decay<Fs>::type>(fs)...);
534 return std::move(f_saved);
539 template <class InputIterator>
542 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
543 whenAll(InputIterator first, InputIterator last)
546 typename std::iterator_traits<InputIterator>::value_type::value_type T;
548 auto n = std::distance(first, last);
550 return makeFuture(std::vector<Try<T>>());
553 auto ctx = new detail::WhenAllContext<T>();
555 ctx->results.resize(n);
557 auto f_saved = ctx->p.getFuture();
559 for (size_t i = 0; first != last; ++first, ++i) {
562 f.setCallback_([ctx, i, n](Try<T>&& t) {
563 ctx->results[i] = std::move(t);
564 if (++ctx->count == n) {
565 ctx->p.setValue(std::move(ctx->results));
571 return std::move(f_saved);
574 template <class InputIterator>
579 std::iterator_traits<InputIterator>::value_type::value_type> > >
580 whenAny(InputIterator first, InputIterator last) {
582 typename std::iterator_traits<InputIterator>::value_type::value_type T;
584 auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
585 auto f_saved = ctx->p.getFuture();
587 for (size_t i = 0; first != last; first++, i++) {
589 f.setCallback_([i, ctx](Try<T>&& t) {
590 if (!ctx->done.exchange(true)) {
591 ctx->p.setValue(std::make_pair(i, std::move(t)));
597 return std::move(f_saved);
600 template <class InputIterator>
601 Future<std::vector<std::pair<size_t, Try<typename
602 std::iterator_traits<InputIterator>::value_type::value_type>>>>
603 whenN(InputIterator first, InputIterator last, size_t n) {
605 std::iterator_traits<InputIterator>::value_type::value_type T;
606 typedef std::vector<std::pair<size_t, Try<T>>> V;
613 auto ctx = std::make_shared<ctx_t>();
616 // for each completed Future, increase count and add to vector, until we
617 // have n completed futures at which point we fulfil our Promise with the
622 it->then([ctx, n, i](Try<T>&& t) {
624 auto c = ++ctx->completed;
626 assert(ctx->v.size() < n);
627 v.push_back(std::make_pair(i, std::move(t)));
629 ctx->p.fulfilTry(Try<V>(std::move(v)));
639 ctx->p.setException(std::runtime_error("Not enough futures"));
642 return ctx->p.getFuture();
645 template <typename T>
647 waitWithSemaphore(Future<T>&& f) {
649 auto done = f.then([&](Try<T> &&t) {
651 return std::move(t.value());
654 while (!done.isReady()) {
655 // There's a race here between the return here and the actual finishing of
656 // the future. f is completed, but the setup may not have finished on done
657 // after the baton has posted.
658 std::this_thread::yield();
664 inline Future<void> waitWithSemaphore<void>(Future<void>&& f) {
666 auto done = f.then([&](Try<void> &&t) {
671 while (!done.isReady()) {
672 // There's a race here between the return here and the actual finishing of
673 // the future. f is completed, but the setup may not have finished on done
674 // after the baton has posted.
675 std::this_thread::yield();
680 template <typename T, class Dur>
682 waitWithSemaphore(Future<T>&& f, Dur timeout) {
683 auto baton = std::make_shared<Baton<>>();
684 auto done = f.then([baton](Try<T> &&t) {
686 return std::move(t.value());
688 baton->timed_wait(std::chrono::system_clock::now() + timeout);
694 waitWithSemaphore(Future<void>&& f, Dur timeout) {
695 auto baton = std::make_shared<Baton<>>();
696 auto done = f.then([baton](Try<void> &&t) {
700 baton->timed_wait(std::chrono::system_clock::now() + timeout);
706 void getWaitHelper(Future<T>* f) {
707 // If we already have a value do the cheap thing
712 folly::Baton<> baton;
713 f->then([&](Try<T> const&) {
720 Future<T> getWaitTimeoutHelper(Future<T>* f, Duration dur) {
721 // TODO make and use variadic whenAny #5877971
723 auto token = std::make_shared<std::atomic<bool>>();
724 folly::Baton<> baton;
726 folly::wangle::detail::getTimekeeperSingleton()->after(dur)
727 .then([&,token](Try<void> const& t) {
730 if (token->exchange(true) == false) {
731 p.setException(TimedOut());
734 } catch (std::exception const& e) {
735 if (token->exchange(true) == false) {
736 p.setException(std::current_exception());
742 f->then([&, token](Try<T>&& t) {
743 if (token->exchange(true) == false) {
744 p.fulfilTry(std::move(t));
750 return p.getFuture();
758 // Big assumption here: the then() call above, since it doesn't move out
759 // the value, leaves us with a value to return here. This would be a big
760 // no-no in user code, but I'm invoking internal developer privilege. This
761 // is slightly more efficient (save a move()) especially if there's an
762 // exception (save a throw).
763 return std::move(value());
767 inline void Future<void>::get() {
772 T Future<T>::get(Duration dur) {
773 return std::move(getWaitTimeoutHelper(this, dur).value());
777 inline void Future<void>::get(Duration dur) {
778 getWaitTimeoutHelper(this, dur).value();
782 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk)
784 return whenAll(*this, futures::sleep(dur, tk))
785 .then([](Try<std::tuple<Try<T>, Try<void>>>&& tup) {
786 Try<T>& t = std::get<0>(tup.value());
787 return makeFuture<T>(std::move(t));
793 // I haven't included a Future<T&> specialization because I don't forsee us
794 // using it, however it is not difficult to add when needed. Refer to
795 // Future<void> for guidance. std::future and boost::future code would also be