(folly/futures) Fix get() bug
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <chrono>
20 #include <thread>
21
22 #include <folly/Baton.h>
23 #include <folly/futures/detail/Core.h>
24 #include <folly/futures/Timekeeper.h>
25
26 namespace folly {
27
28 class Timekeeper;
29
30 namespace detail {
31   Timekeeper* getTimekeeperSingleton();
32 }
33
34 template <class T>
35 Future<T>::Future(Future<T>&& other) noexcept : core_(nullptr) {
36   *this = std::move(other);
37 }
38
39 template <class T>
40 Future<T>& Future<T>::operator=(Future<T>&& other) {
41   std::swap(core_, other.core_);
42   return *this;
43 }
44
45 template <class T>
46 template <class F>
47 Future<T>::Future(
48   const typename std::enable_if<!std::is_void<F>::value, F>::type& val)
49     : core_(nullptr) {
50   Promise<F> p;
51   p.setValue(val);
52   *this = p.getFuture();
53 }
54
55 template <class T>
56 template <class F>
57 Future<T>::Future(
58   typename std::enable_if<!std::is_void<F>::value, F>::type&& val)
59     : core_(nullptr) {
60   Promise<F> p;
61   p.setValue(std::forward<F>(val));
62   *this = p.getFuture();
63 }
64
65 template <>
66 template <class F,
67           typename std::enable_if<std::is_void<F>::value, int>::type>
68 Future<void>::Future() : core_(nullptr) {
69   Promise<void> p;
70   p.setValue();
71   *this = p.getFuture();
72 }
73
74
75 template <class T>
76 Future<T>::~Future() {
77   detach();
78 }
79
80 template <class T>
81 void Future<T>::detach() {
82   if (core_) {
83     core_->detachFuture();
84     core_ = nullptr;
85   }
86 }
87
88 template <class T>
89 void Future<T>::throwIfInvalid() const {
90   if (!core_)
91     throw NoState();
92 }
93
94 template <class T>
95 template <class F>
96 void Future<T>::setCallback_(F&& func) {
97   throwIfInvalid();
98   core_->setCallback(std::move(func));
99 }
100
101 // Variant: returns a value
102 // e.g. f.then([](Try<T>&& t){ return t.value(); });
103 template <class T>
104 template <typename F, typename R, bool isTry, typename... Args>
105 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
106 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
107   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
108   typedef typename R::ReturnsFuture::Inner B;
109
110   throwIfInvalid();
111
112   // wrap these so we can move them into the lambda
113   folly::MoveWrapper<Promise<B>> p;
114   folly::MoveWrapper<F> funcm(std::forward<F>(func));
115
116   // grab the Future now before we lose our handle on the Promise
117   auto f = p->getFuture();
118   if (getExecutor()) {
119     f.setExecutor(getExecutor());
120   }
121
122   /* This is a bit tricky.
123
124      We can't just close over *this in case this Future gets moved. So we
125      make a new dummy Future. We could figure out something more
126      sophisticated that avoids making a new Future object when it can, as an
127      optimization. But this is correct.
128
129      core_ can't be moved, it is explicitly disallowed (as is copying). But
130      if there's ever a reason to allow it, this is one place that makes that
131      assumption and would need to be fixed. We use a standard shared pointer
132      for core_ (by copying it in), which means in essence obj holds a shared
133      pointer to itself.  But this shouldn't leak because Promise will not
134      outlive the continuation, because Promise will setException() with a
135      broken Promise if it is destructed before completed. We could use a
136      weak pointer but it would have to be converted to a shared pointer when
137      func is executed (because the Future returned by func may possibly
138      persist beyond the callback, if it gets moved), and so it is an
139      optimization to just make it shared from the get-go.
140
141      We have to move in the Promise and func using the MoveWrapper
142      hack. (func could be copied but it's a big drag on perf).
143
144      Two subtle but important points about this design. detail::Core has no
145      back pointers to Future or Promise, so if Future or Promise get moved
146      (and they will be moved in performant code) we don't have to do
147      anything fancy. And because we store the continuation in the
148      detail::Core, not in the Future, we can execute the continuation even
149      after the Future has gone out of scope. This is an intentional design
150      decision. It is likely we will want to be able to cancel a continuation
151      in some circumstances, but I think it should be explicit not implicit
152      in the destruction of the Future used to create it.
153      */
154   setCallback_(
155     [p, funcm](Try<T>&& t) mutable {
156       if (!isTry && t.hasException()) {
157         p->setException(std::move(t.exception()));
158       } else {
159         p->fulfil([&]() {
160           return (*funcm)(t.template get<isTry, Args>()...);
161         });
162       }
163     });
164
165   return f;
166 }
167
168 // Variant: returns a Future
169 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
170 template <class T>
171 template <typename F, typename R, bool isTry, typename... Args>
172 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
173 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
174   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
175   typedef typename R::ReturnsFuture::Inner B;
176
177   throwIfInvalid();
178
179   // wrap these so we can move them into the lambda
180   folly::MoveWrapper<Promise<B>> p;
181   folly::MoveWrapper<F> funcm(std::forward<F>(func));
182
183   // grab the Future now before we lose our handle on the Promise
184   auto f = p->getFuture();
185   if (getExecutor()) {
186     f.setExecutor(getExecutor());
187   }
188
189   setCallback_(
190     [p, funcm](Try<T>&& t) mutable {
191       if (!isTry && t.hasException()) {
192         p->setException(std::move(t.exception()));
193       } else {
194         try {
195           auto f2 = (*funcm)(t.template get<isTry, Args>()...);
196           // that didn't throw, now we can steal p
197           f2.setCallback_([p](Try<B>&& b) mutable {
198             p->fulfilTry(std::move(b));
199           });
200         } catch (const std::exception& e) {
201           p->setException(exception_wrapper(std::current_exception(), e));
202         } catch (...) {
203           p->setException(exception_wrapper(std::current_exception()));
204         }
205       }
206     });
207
208   return f;
209 }
210
211 template <typename T>
212 template <typename R, typename Caller, typename... Args>
213   Future<typename isFuture<R>::Inner>
214 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
215   typedef typename std::remove_cv<
216     typename std::remove_reference<
217       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
218   return then([instance, func](Try<T>&& t){
219     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
220   });
221 }
222
223 template <class T>
224 Future<void> Future<T>::then() {
225   return then([] (Try<T>&& t) {});
226 }
227
228 // onError where the callback returns T
229 template <class T>
230 template <class F>
231 typename std::enable_if<
232   !detail::Extract<F>::ReturnsFuture::value,
233   Future<T>>::type
234 Future<T>::onError(F&& func) {
235   typedef typename detail::Extract<F>::FirstArg Exn;
236   static_assert(
237       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
238       "Return type of onError callback must be T or Future<T>");
239
240   Promise<T> p;
241   auto f = p.getFuture();
242   auto pm = folly::makeMoveWrapper(std::move(p));
243   auto funcm = folly::makeMoveWrapper(std::move(func));
244   setCallback_([pm, funcm](Try<T>&& t) mutable {
245     if (!t.template withException<Exn>([&] (Exn& e) {
246           pm->fulfil([&]{
247             return (*funcm)(e);
248           });
249         })) {
250       pm->fulfilTry(std::move(t));
251     }
252   });
253
254   return f;
255 }
256
257 // onError where the callback returns Future<T>
258 template <class T>
259 template <class F>
260 typename std::enable_if<
261   detail::Extract<F>::ReturnsFuture::value,
262   Future<T>>::type
263 Future<T>::onError(F&& func) {
264   static_assert(
265       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
266       "Return type of onError callback must be T or Future<T>");
267   typedef typename detail::Extract<F>::FirstArg Exn;
268
269   Promise<T> p;
270   auto f = p.getFuture();
271   auto pm = folly::makeMoveWrapper(std::move(p));
272   auto funcm = folly::makeMoveWrapper(std::move(func));
273   setCallback_([pm, funcm](Try<T>&& t) mutable {
274     if (!t.template withException<Exn>([&] (Exn& e) {
275           try {
276             auto f2 = (*funcm)(e);
277             f2.setCallback_([pm](Try<T>&& t2) mutable {
278               pm->fulfilTry(std::move(t2));
279             });
280           } catch (const std::exception& e2) {
281             pm->setException(exception_wrapper(std::current_exception(), e2));
282           } catch (...) {
283             pm->setException(exception_wrapper(std::current_exception()));
284           }
285         })) {
286       pm->fulfilTry(std::move(t));
287     }
288   });
289
290   return f;
291 }
292
293 template <class T>
294 template <class F>
295 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
296   auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
297   return within(dur, tk)
298     .onError([funcw](TimedOut const&) { return (*funcw)(); });
299 }
300
301 template <class T>
302 typename std::add_lvalue_reference<T>::type Future<T>::value() {
303   throwIfInvalid();
304
305   return core_->getTry().value();
306 }
307
308 template <class T>
309 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
310   throwIfInvalid();
311
312   return core_->getTry().value();
313 }
314
315 template <class T>
316 Try<T>& Future<T>::getTry() {
317   throwIfInvalid();
318
319   return core_->getTry();
320 }
321
322 template <class T>
323 template <typename Executor>
324 inline Future<T> Future<T>::via(Executor* executor) && {
325   throwIfInvalid();
326
327   setExecutor(executor);
328
329   return std::move(*this);
330 }
331
332 template <class T>
333 template <typename Executor>
334 inline Future<T> Future<T>::via(Executor* executor) & {
335   throwIfInvalid();
336
337   MoveWrapper<Promise<T>> p;
338   auto f = p->getFuture();
339   then([p](Try<T>&& t) mutable { p->fulfilTry(std::move(t)); });
340   return std::move(f).via(executor);
341 }
342
343 template <class T>
344 bool Future<T>::isReady() const {
345   throwIfInvalid();
346   return core_->ready();
347 }
348
349 template <class T>
350 void Future<T>::raise(exception_wrapper exception) {
351   core_->raise(std::move(exception));
352 }
353
354 // makeFuture
355
356 template <class T>
357 Future<typename std::decay<T>::type> makeFuture(T&& t) {
358   Promise<typename std::decay<T>::type> p;
359   p.setValue(std::forward<T>(t));
360   return p.getFuture();
361 }
362
363 inline // for multiple translation units
364 Future<void> makeFuture() {
365   Promise<void> p;
366   p.setValue();
367   return p.getFuture();
368 }
369
370 template <class F>
371 auto makeFutureTry(
372     F&& func,
373     typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
374     -> Future<decltype(func())> {
375   Promise<decltype(func())> p;
376   p.fulfil(
377     [&func]() {
378       return (func)();
379     });
380   return p.getFuture();
381 }
382
383 template <class F>
384 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
385   F copy = func;
386   return makeFutureTry(std::move(copy));
387 }
388
389 template <class T>
390 Future<T> makeFuture(std::exception_ptr const& e) {
391   Promise<T> p;
392   p.setException(e);
393   return p.getFuture();
394 }
395
396 template <class T>
397 Future<T> makeFuture(exception_wrapper ew) {
398   Promise<T> p;
399   p.setException(std::move(ew));
400   return p.getFuture();
401 }
402
403 template <class T, class E>
404 typename std::enable_if<std::is_base_of<std::exception, E>::value,
405                         Future<T>>::type
406 makeFuture(E const& e) {
407   Promise<T> p;
408   p.setException(make_exception_wrapper<E>(e));
409   return p.getFuture();
410 }
411
412 template <class T>
413 Future<T> makeFuture(Try<T>&& t) {
414   Promise<typename std::decay<T>::type> p;
415   p.fulfilTry(std::move(t));
416   return p.getFuture();
417 }
418
419 template <>
420 inline Future<void> makeFuture(Try<void>&& t) {
421   if (t.hasException()) {
422     return makeFuture<void>(std::move(t.exception()));
423   } else {
424     return makeFuture();
425   }
426 }
427
428 // via
429 template <typename Executor>
430 Future<void> via(Executor* executor) {
431   return makeFuture().via(executor);
432 }
433
434 // when (variadic)
435
436 template <typename... Fs>
437 typename detail::VariadicContext<
438   typename std::decay<Fs>::type::value_type...>::type
439 whenAll(Fs&&... fs)
440 {
441   auto ctx =
442     new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
443   ctx->total = sizeof...(fs);
444   auto f_saved = ctx->p.getFuture();
445   detail::whenAllVariadicHelper(ctx,
446     std::forward<typename std::decay<Fs>::type>(fs)...);
447   return f_saved;
448 }
449
450 // when (iterator)
451
452 template <class InputIterator>
453 Future<
454   std::vector<
455   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
456 whenAll(InputIterator first, InputIterator last)
457 {
458   typedef
459     typename std::iterator_traits<InputIterator>::value_type::value_type T;
460
461   if (first >= last) {
462     return makeFuture(std::vector<Try<T>>());
463   }
464   size_t n = std::distance(first, last);
465
466   auto ctx = new detail::WhenAllContext<T>();
467
468   ctx->results.resize(n);
469
470   auto f_saved = ctx->p.getFuture();
471
472   for (size_t i = 0; first != last; ++first, ++i) {
473      assert(i < n);
474      auto& f = *first;
475      f.setCallback_([ctx, i, n](Try<T>&& t) {
476          ctx->results[i] = std::move(t);
477          if (++ctx->count == n) {
478            ctx->p.setValue(std::move(ctx->results));
479            delete ctx;
480          }
481        });
482   }
483
484   return f_saved;
485 }
486
487 template <class InputIterator>
488 Future<
489   std::pair<size_t,
490             Try<
491               typename
492               std::iterator_traits<InputIterator>::value_type::value_type> > >
493 whenAny(InputIterator first, InputIterator last) {
494   typedef
495     typename std::iterator_traits<InputIterator>::value_type::value_type T;
496
497   auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
498   auto f_saved = ctx->p.getFuture();
499
500   for (size_t i = 0; first != last; first++, i++) {
501     auto& f = *first;
502     f.setCallback_([i, ctx](Try<T>&& t) {
503       if (!ctx->done.exchange(true)) {
504         ctx->p.setValue(std::make_pair(i, std::move(t)));
505       }
506       ctx->decref();
507     });
508   }
509
510   return f_saved;
511 }
512
513 template <class InputIterator>
514 Future<std::vector<std::pair<size_t, Try<typename
515   std::iterator_traits<InputIterator>::value_type::value_type>>>>
516 whenN(InputIterator first, InputIterator last, size_t n) {
517   typedef typename
518     std::iterator_traits<InputIterator>::value_type::value_type T;
519   typedef std::vector<std::pair<size_t, Try<T>>> V;
520
521   struct ctx_t {
522     V v;
523     size_t completed;
524     Promise<V> p;
525   };
526   auto ctx = std::make_shared<ctx_t>();
527   ctx->completed = 0;
528
529   // for each completed Future, increase count and add to vector, until we
530   // have n completed futures at which point we fulfil our Promise with the
531   // vector
532   auto it = first;
533   size_t i = 0;
534   while (it != last) {
535     it->then([ctx, n, i](Try<T>&& t) {
536       auto& v = ctx->v;
537       auto c = ++ctx->completed;
538       if (c <= n) {
539         assert(ctx->v.size() < n);
540         v.push_back(std::make_pair(i, std::move(t)));
541         if (c == n) {
542           ctx->p.fulfilTry(Try<V>(std::move(v)));
543         }
544       }
545     });
546
547     it++;
548     i++;
549   }
550
551   if (i < n) {
552     ctx->p.setException(std::runtime_error("Not enough futures"));
553   }
554
555   return ctx->p.getFuture();
556 }
557
558 template <class T>
559 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
560   return within(dur, TimedOut(), tk);
561 }
562
563 template <class T>
564 template <class E>
565 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
566
567   struct Context {
568     Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
569     E exception;
570     Promise<T> promise;
571     std::atomic<bool> token;
572   };
573   auto ctx = std::make_shared<Context>(std::move(e));
574
575   if (!tk) {
576     tk = folly::detail::getTimekeeperSingleton();
577   }
578
579   tk->after(dur)
580     .then([ctx](Try<void> const& t) {
581       if (ctx->token.exchange(true) == false) {
582         if (t.hasException()) {
583           ctx->promise.setException(std::move(t.exception()));
584         } else {
585           ctx->promise.setException(std::move(ctx->exception));
586         }
587       }
588     });
589
590   this->then([ctx](Try<T>&& t) {
591     if (ctx->token.exchange(true) == false) {
592       ctx->promise.fulfilTry(std::move(t));
593     }
594   });
595
596   return ctx->promise.getFuture();
597 }
598
599 template <class T>
600 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
601   return whenAll(*this, futures::sleep(dur, tk))
602     .then([](std::tuple<Try<T>, Try<void>> tup) {
603       Try<T>& t = std::get<0>(tup);
604       return makeFuture<T>(std::move(t));
605     });
606 }
607
608 namespace detail {
609
610 template <class T>
611 void waitImpl(Future<T>& f) {
612   // short-circuit if there's nothing to do
613   if (f.isReady()) return;
614
615   Baton<> baton;
616   f = f.then([&](Try<T> t) {
617     baton.post();
618     return makeFuture(std::move(t));
619   });
620   baton.wait();
621
622   // There's a race here between the return here and the actual finishing of
623   // the future. f is completed, but the setup may not have finished on done
624   // after the baton has posted.
625   while (!f.isReady()) {
626     std::this_thread::yield();
627   }
628 }
629
630 template <class T>
631 void waitImpl(Future<T>& f, Duration dur) {
632   // short-circuit if there's nothing to do
633   if (f.isReady()) return;
634
635   auto baton = std::make_shared<Baton<>>();
636   f = f.then([baton](Try<T> t) {
637     baton->post();
638     return makeFuture(std::move(t));
639   });
640
641   // Let's preserve the invariant that if we did not timeout (timed_wait returns
642   // true), then the returned Future is complete when it is returned to the
643   // caller. We need to wait out the race for that Future to complete.
644   if (baton->timed_wait(std::chrono::system_clock::now() + dur)) {
645     while (!f.isReady()) {
646       std::this_thread::yield();
647     }
648   }
649 }
650
651 template <class T>
652 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
653   while (!f.isReady()) {
654     e->drive();
655   }
656 }
657
658 } // detail
659
660 template <class T>
661 Future<T>& Future<T>::wait() & {
662   detail::waitImpl(*this);
663   return *this;
664 }
665
666 template <class T>
667 Future<T>&& Future<T>::wait() && {
668   detail::waitImpl(*this);
669   return std::move(*this);
670 }
671
672 template <class T>
673 Future<T>& Future<T>::wait(Duration dur) & {
674   detail::waitImpl(*this, dur);
675   return *this;
676 }
677
678 template <class T>
679 Future<T>&& Future<T>::wait(Duration dur) && {
680   detail::waitImpl(*this, dur);
681   return std::move(*this);
682 }
683
684 template <class T>
685 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
686   detail::waitViaImpl(*this, e);
687   return *this;
688 }
689
690 template <class T>
691 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
692   detail::waitViaImpl(*this, e);
693   return std::move(*this);
694 }
695
696 template <class T>
697 T Future<T>::get() {
698   return std::move(wait().value());
699 }
700
701 template <>
702 inline void Future<void>::get() {
703   wait().value();
704 }
705
706 template <class T>
707 T Future<T>::get(Duration dur) {
708   wait(dur);
709   if (isReady()) {
710     return std::move(value());
711   } else {
712     throw TimedOut();
713   }
714 }
715
716 template <>
717 inline void Future<void>::get(Duration dur) {
718   wait(dur);
719   if (isReady()) {
720     return;
721   } else {
722     throw TimedOut();
723   }
724 }
725
726 template <class T>
727 T Future<T>::getVia(DrivableExecutor* e) {
728   return std::move(waitVia(e).value());
729 }
730
731 template <>
732 inline void Future<void>::getVia(DrivableExecutor* e) {
733   waitVia(e).value();
734 }
735
736 namespace futures {
737
738   namespace {
739     template <class Z>
740     Future<Z> chainHelper(Future<Z> f) {
741       return f;
742     }
743
744     template <class Z, class F, class Fn, class... Callbacks>
745     Future<Z> chainHelper(F f, Fn fn, Callbacks... fns) {
746       return chainHelper<Z>(f.then(fn), fns...);
747     }
748   }
749
750   template <class A, class Z, class... Callbacks>
751   std::function<Future<Z>(Try<A>)>
752   chain(Callbacks... fns) {
753     MoveWrapper<Promise<A>> pw;
754     MoveWrapper<Future<Z>> fw(chainHelper<Z>(pw->getFuture(), fns...));
755     return [=](Try<A> t) mutable {
756       pw->fulfilTry(std::move(t));
757       return std::move(*fw);
758     };
759   }
760
761 }
762
763 } // namespace folly
764
765 // I haven't included a Future<T&> specialization because I don't forsee us
766 // using it, however it is not difficult to add when needed. Refer to
767 // Future<void> for guidance. std::future and boost::future code would also be
768 // instructive.