onError callbacks
[folly.git] / folly / wangle / futures / Future-inl.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <chrono>
20 #include <thread>
21
22 #include <folly/wangle/futures/detail/Core.h>
23 #include <folly/Baton.h>
24
25 namespace folly { namespace wangle {
26
27 template <typename T>
28 struct isFuture {
29   static const bool value = false;
30 };
31
32 template <typename T>
33 struct isFuture<Future<T> > {
34   static const bool value = true;
35 };
36
37 template <class T>
38 Future<T>::Future(Future<T>&& other) noexcept : core_(nullptr) {
39   *this = std::move(other);
40 }
41
42 template <class T>
43 Future<T>& Future<T>::operator=(Future<T>&& other) {
44   std::swap(core_, other.core_);
45   return *this;
46 }
47
48 template <class T>
49 Future<T>::~Future() {
50   detach();
51 }
52
53 template <class T>
54 void Future<T>::detach() {
55   if (core_) {
56     core_->detachFuture();
57     core_ = nullptr;
58   }
59 }
60
61 template <class T>
62 void Future<T>::throwIfInvalid() const {
63   if (!core_)
64     throw NoState();
65 }
66
67 template <class T>
68 template <class F>
69 void Future<T>::setCallback_(F&& func) {
70   throwIfInvalid();
71   core_->setCallback(std::move(func));
72 }
73
74 // Variant: f.then([](Try<T>&& t){ return t.value(); });
75 template <class T>
76 template <class F>
77 typename std::enable_if<
78   !isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
79   Future<typename std::result_of<F(Try<T>&&)>::type> >::type
80 Future<T>::then(F&& func) {
81   typedef typename std::result_of<F(Try<T>&&)>::type B;
82
83   throwIfInvalid();
84
85   // wrap these so we can move them into the lambda
86   folly::MoveWrapper<Promise<B>> p;
87   folly::MoveWrapper<F> funcm(std::forward<F>(func));
88
89   // grab the Future now before we lose our handle on the Promise
90   auto f = p->getFuture();
91
92   /* This is a bit tricky.
93
94      We can't just close over *this in case this Future gets moved. So we
95      make a new dummy Future. We could figure out something more
96      sophisticated that avoids making a new Future object when it can, as an
97      optimization. But this is correct.
98
99      core_ can't be moved, it is explicitly disallowed (as is copying). But
100      if there's ever a reason to allow it, this is one place that makes that
101      assumption and would need to be fixed. We use a standard shared pointer
102      for core_ (by copying it in), which means in essence obj holds a shared
103      pointer to itself.  But this shouldn't leak because Promise will not
104      outlive the continuation, because Promise will setException() with a
105      broken Promise if it is destructed before completed. We could use a
106      weak pointer but it would have to be converted to a shared pointer when
107      func is executed (because the Future returned by func may possibly
108      persist beyond the callback, if it gets moved), and so it is an
109      optimization to just make it shared from the get-go.
110
111      We have to move in the Promise and func using the MoveWrapper
112      hack. (func could be copied but it's a big drag on perf).
113
114      Two subtle but important points about this design. detail::Core has no
115      back pointers to Future or Promise, so if Future or Promise get moved
116      (and they will be moved in performant code) we don't have to do
117      anything fancy. And because we store the continuation in the
118      detail::Core, not in the Future, we can execute the continuation even
119      after the Future has gone out of scope. This is an intentional design
120      decision. It is likely we will want to be able to cancel a continuation
121      in some circumstances, but I think it should be explicit not implicit
122      in the destruction of the Future used to create it.
123      */
124   setCallback_(
125     [p, funcm](Try<T>&& t) mutable {
126       p->fulfil([&]() {
127           return (*funcm)(std::move(t));
128         });
129     });
130
131   return std::move(f);
132 }
133
134 // Variant: f.then([](T&& t){ return t; });
135 template <class T>
136 template <class F>
137 typename std::enable_if<
138   !std::is_same<T, void>::value &&
139   !isFuture<typename std::result_of<
140     F(typename detail::AliasIfVoid<T>::type&&)>::type>::value,
141   Future<typename std::result_of<
142     F(typename detail::AliasIfVoid<T>::type&&)>::type> >::type
143 Future<T>::then(F&& func) {
144   typedef typename std::result_of<F(T&&)>::type B;
145
146   throwIfInvalid();
147
148   folly::MoveWrapper<Promise<B>> p;
149   folly::MoveWrapper<F> funcm(std::forward<F>(func));
150   auto f = p->getFuture();
151
152   setCallback_(
153     [p, funcm](Try<T>&& t) mutable {
154       if (t.hasException()) {
155         p->setException(t.getException());
156       } else {
157         p->fulfil([&]() {
158           return (*funcm)(std::move(t.value()));
159         });
160       }
161     });
162
163   return std::move(f);
164 }
165
166 // Variant: f.then([](){ return; });
167 template <class T>
168 template <class F>
169 typename std::enable_if<
170   std::is_same<T, void>::value &&
171   !isFuture<typename std::result_of<F()>::type>::value,
172   Future<typename std::result_of<F()>::type> >::type
173 Future<T>::then(F&& func) {
174   typedef typename std::result_of<F()>::type B;
175
176   throwIfInvalid();
177
178   folly::MoveWrapper<Promise<B>> p;
179   folly::MoveWrapper<F> funcm(std::forward<F>(func));
180   auto f = p->getFuture();
181
182   setCallback_(
183     [p, funcm](Try<T>&& t) mutable {
184       if (t.hasException()) {
185         p->setException(t.getException());
186       } else {
187         p->fulfil([&]() {
188           return (*funcm)();
189         });
190       }
191     });
192
193   return std::move(f);
194 }
195
196 // Variant: f.then([](Try<T>&& t){ return makeFuture<T>(t.value()); });
197 template <class T>
198 template <class F>
199 typename std::enable_if<
200   isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
201   Future<typename std::result_of<F(Try<T>&&)>::type::value_type> >::type
202 Future<T>::then(F&& func) {
203   typedef typename std::result_of<F(Try<T>&&)>::type::value_type B;
204
205   throwIfInvalid();
206
207   // wrap these so we can move them into the lambda
208   folly::MoveWrapper<Promise<B>> p;
209   folly::MoveWrapper<F> funcm(std::forward<F>(func));
210
211   // grab the Future now before we lose our handle on the Promise
212   auto f = p->getFuture();
213
214   setCallback_(
215     [p, funcm](Try<T>&& t) mutable {
216       try {
217         auto f2 = (*funcm)(std::move(t));
218         // that didn't throw, now we can steal p
219         f2.setCallback_([p](Try<B>&& b) mutable {
220             p->fulfilTry(std::move(b));
221           });
222       } catch (...) {
223         p->setException(std::current_exception());
224       }
225     });
226
227   return std::move(f);
228 }
229
230 // Variant: f.then([](T&& t){ return makeFuture<T>(t); });
231 template <class T>
232 template <class F>
233 typename std::enable_if<
234   !std::is_same<T, void>::value &&
235   isFuture<typename std::result_of<
236     F(typename detail::AliasIfVoid<T>::type&&)>::type>::value,
237   Future<typename std::result_of<
238     F(typename detail::AliasIfVoid<T>::type&&)>::type::value_type> >::type
239 Future<T>::then(F&& func) {
240   typedef typename std::result_of<F(T&&)>::type::value_type B;
241
242   throwIfInvalid();
243
244   folly::MoveWrapper<Promise<B>> p;
245   folly::MoveWrapper<F> funcm(std::forward<F>(func));
246   auto f = p->getFuture();
247
248   setCallback_(
249     [p, funcm](Try<T>&& t) mutable {
250       if (t.hasException()) {
251         p->setException(t.getException());
252       } else {
253         try {
254           auto f2 = (*funcm)(std::move(t.value()));
255           f2.setCallback_([p](Try<B>&& b) mutable {
256               p->fulfilTry(std::move(b));
257             });
258         } catch (...) {
259           p->setException(std::current_exception());
260         }
261       }
262     });
263
264   return std::move(f);
265 }
266
267 // Variant: f.then([](){ return makeFuture(); });
268 template <class T>
269 template <class F>
270 typename std::enable_if<
271   std::is_same<T, void>::value &&
272   isFuture<typename std::result_of<F()>::type>::value,
273   Future<typename std::result_of<F()>::type::value_type> >::type
274 Future<T>::then(F&& func) {
275   typedef typename std::result_of<F()>::type::value_type B;
276
277   throwIfInvalid();
278
279   folly::MoveWrapper<Promise<B>> p;
280   folly::MoveWrapper<F> funcm(std::forward<F>(func));
281
282   auto f = p->getFuture();
283
284   setCallback_(
285     [p, funcm](Try<T>&& t) mutable {
286       if (t.hasException()) {
287         p->setException(t.getException());
288       } else {
289         try {
290           auto f2 = (*funcm)();
291           f2.setCallback_([p](Try<B>&& b) mutable {
292               p->fulfilTry(std::move(b));
293             });
294         } catch (...) {
295           p->setException(std::current_exception());
296         }
297       }
298     });
299
300   return std::move(f);
301 }
302
303 template <class T>
304 Future<void> Future<T>::then() {
305   return then([] (Try<T>&& t) {});
306 }
307
308 // onError where the callback returns T
309 template <class T>
310 template <class F>
311 typename std::enable_if<
312   !detail::Extract<F>::ReturnsFuture::value,
313   Future<T>>::type
314 Future<T>::onError(F&& func) {
315   typedef typename detail::Extract<F>::FirstArg Exn;
316   static_assert(
317       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
318       "Return type of onError callback must be T or Future<T>");
319
320   Promise<T> p;
321   auto f = p.getFuture();
322   auto pm = folly::makeMoveWrapper(std::move(p));
323   auto funcm = folly::makeMoveWrapper(std::move(func));
324   setCallback_([pm, funcm](Try<T>&& t) mutable {
325     try {
326       t.throwIfFailed();
327     } catch (Exn& e) {
328       pm->fulfil([&]{
329         return (*funcm)(e);
330       });
331       return;
332     } catch (...) {
333       // fall through
334     }
335     pm->fulfilTry(std::move(t));
336   });
337
338   return f;
339 }
340
341 // onError where the callback returns Future<T>
342 template <class T>
343 template <class F>
344 typename std::enable_if<
345   detail::Extract<F>::ReturnsFuture::value,
346   Future<T>>::type
347 Future<T>::onError(F&& func) {
348   static_assert(
349       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
350       "Return type of onError callback must be T or Future<T>");
351   typedef typename detail::Extract<F>::FirstArg Exn;
352
353   Promise<T> p;
354   auto f = p.getFuture();
355   auto pm = folly::makeMoveWrapper(std::move(p));
356   auto funcm = folly::makeMoveWrapper(std::move(func));
357   setCallback_([pm, funcm](Try<T>&& t) mutable {
358     try {
359       t.throwIfFailed();
360     } catch (Exn& e) {
361       try {
362         auto f2 = (*funcm)(e);
363         f2.setCallback_([pm](Try<T>&& t2) mutable {
364           pm->fulfilTry(std::move(t2));
365         });
366       } catch (...) {
367         pm->setException(std::current_exception());
368       }
369       return;
370     } catch (...) {
371       // fall through
372     }
373     pm->fulfilTry(std::move(t));
374   });
375
376   return f;
377 }
378
379 template <class T>
380 typename std::add_lvalue_reference<T>::type Future<T>::value() {
381   throwIfInvalid();
382
383   return core_->getTry().value();
384 }
385
386 template <class T>
387 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
388   throwIfInvalid();
389
390   return core_->getTry().value();
391 }
392
393 template <class T>
394 Try<T>& Future<T>::getTry() {
395   throwIfInvalid();
396
397   return core_->getTry();
398 }
399
400 template <class T>
401 template <typename Executor>
402 inline Future<T> Future<T>::via(Executor* executor) && {
403   throwIfInvalid();
404
405   this->deactivate();
406   core_->setExecutor(executor);
407
408   return std::move(*this);
409 }
410
411 template <class T>
412 template <typename Executor>
413 inline Future<T> Future<T>::via(Executor* executor) & {
414   throwIfInvalid();
415
416   MoveWrapper<Promise<T>> p;
417   auto f = p->getFuture();
418   then([p](Try<T>&& t) mutable { p->fulfilTry(std::move(t)); });
419   return std::move(f).via(executor);
420 }
421
422 template <class T>
423 bool Future<T>::isReady() const {
424   throwIfInvalid();
425   return core_->ready();
426 }
427
428 template <class T>
429 void Future<T>::raise(std::exception_ptr exception) {
430   core_->raise(exception);
431 }
432
433 // makeFuture
434
435 template <class T>
436 Future<typename std::decay<T>::type> makeFuture(T&& t) {
437   Promise<typename std::decay<T>::type> p;
438   auto f = p.getFuture();
439   p.setValue(std::forward<T>(t));
440   return std::move(f);
441 }
442
443 inline // for multiple translation units
444 Future<void> makeFuture() {
445   Promise<void> p;
446   auto f = p.getFuture();
447   p.setValue();
448   return std::move(f);
449 }
450
451 template <class F>
452 auto makeFutureTry(
453     F&& func,
454     typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
455     -> Future<decltype(func())> {
456   Promise<decltype(func())> p;
457   auto f = p.getFuture();
458   p.fulfil(
459     [&func]() {
460       return (func)();
461     });
462   return std::move(f);
463 }
464
465 template <class F>
466 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
467   F copy = func;
468   return makeFutureTry(std::move(copy));
469 }
470
471 template <class T>
472 Future<T> makeFuture(std::exception_ptr const& e) {
473   Promise<T> p;
474   auto f = p.getFuture();
475   p.setException(e);
476   return std::move(f);
477 }
478
479 template <class T, class E>
480 typename std::enable_if<std::is_base_of<std::exception, E>::value,
481                         Future<T>>::type
482 makeFuture(E const& e) {
483   Promise<T> p;
484   auto f = p.getFuture();
485   p.fulfil([&]() -> T { throw e; });
486   return std::move(f);
487 }
488
489 template <class T>
490 Future<T> makeFuture(Try<T>&& t) {
491   try {
492     return makeFuture<T>(std::move(t.value()));
493   } catch (...) {
494     return makeFuture<T>(std::current_exception());
495   }
496 }
497
498 template <>
499 inline Future<void> makeFuture(Try<void>&& t) {
500   try {
501     t.throwIfFailed();
502     return makeFuture();
503   } catch (...) {
504     return makeFuture<void>(std::current_exception());
505   }
506 }
507
508 // via
509 template <typename Executor>
510 Future<void> via(Executor* executor) {
511   return makeFuture().via(executor);
512 }
513
514 // when (variadic)
515
516 template <typename... Fs>
517 typename detail::VariadicContext<
518   typename std::decay<Fs>::type::value_type...>::type
519 whenAll(Fs&&... fs)
520 {
521   auto ctx =
522     new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
523   ctx->total = sizeof...(fs);
524   auto f_saved = ctx->p.getFuture();
525   detail::whenAllVariadicHelper(ctx,
526     std::forward<typename std::decay<Fs>::type>(fs)...);
527   return std::move(f_saved);
528 }
529
530 // when (iterator)
531
532 template <class InputIterator>
533 Future<
534   std::vector<
535   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
536 whenAll(InputIterator first, InputIterator last)
537 {
538   typedef
539     typename std::iterator_traits<InputIterator>::value_type::value_type T;
540
541   auto n = std::distance(first, last);
542   if (n == 0) {
543     return makeFuture(std::vector<Try<T>>());
544   }
545
546   auto ctx = new detail::WhenAllContext<T>();
547
548   ctx->results.resize(n);
549
550   auto f_saved = ctx->p.getFuture();
551
552   for (size_t i = 0; first != last; ++first, ++i) {
553      assert(i < n);
554      auto& f = *first;
555      f.setCallback_([ctx, i, n](Try<T>&& t) {
556          ctx->results[i] = std::move(t);
557          if (++ctx->count == n) {
558            ctx->p.setValue(std::move(ctx->results));
559            delete ctx;
560          }
561        });
562   }
563
564   return std::move(f_saved);
565 }
566
567 template <class InputIterator>
568 Future<
569   std::pair<size_t,
570             Try<
571               typename
572               std::iterator_traits<InputIterator>::value_type::value_type> > >
573 whenAny(InputIterator first, InputIterator last) {
574   typedef
575     typename std::iterator_traits<InputIterator>::value_type::value_type T;
576
577   auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
578   auto f_saved = ctx->p.getFuture();
579
580   for (size_t i = 0; first != last; first++, i++) {
581     auto& f = *first;
582     f.setCallback_([i, ctx](Try<T>&& t) {
583       if (!ctx->done.exchange(true)) {
584         ctx->p.setValue(std::make_pair(i, std::move(t)));
585       }
586       ctx->decref();
587     });
588   }
589
590   return std::move(f_saved);
591 }
592
593 template <class InputIterator>
594 Future<std::vector<std::pair<size_t, Try<typename
595   std::iterator_traits<InputIterator>::value_type::value_type>>>>
596 whenN(InputIterator first, InputIterator last, size_t n) {
597   typedef typename
598     std::iterator_traits<InputIterator>::value_type::value_type T;
599   typedef std::vector<std::pair<size_t, Try<T>>> V;
600
601   struct ctx_t {
602     V v;
603     size_t completed;
604     Promise<V> p;
605   };
606   auto ctx = std::make_shared<ctx_t>();
607   ctx->completed = 0;
608
609   // for each completed Future, increase count and add to vector, until we
610   // have n completed futures at which point we fulfil our Promise with the
611   // vector
612   auto it = first;
613   size_t i = 0;
614   while (it != last) {
615     it->then([ctx, n, i](Try<T>&& t) {
616       auto& v = ctx->v;
617       auto c = ++ctx->completed;
618       if (c <= n) {
619         assert(ctx->v.size() < n);
620         v.push_back(std::make_pair(i, std::move(t)));
621         if (c == n) {
622           ctx->p.fulfilTry(Try<V>(std::move(v)));
623         }
624       }
625     });
626
627     it++;
628     i++;
629   }
630
631   if (i < n) {
632     ctx->p.setException(std::runtime_error("Not enough futures"));
633   }
634
635   return ctx->p.getFuture();
636 }
637
638 template <typename T>
639 Future<T>
640 waitWithSemaphore(Future<T>&& f) {
641   Baton<> baton;
642   auto done = f.then([&](Try<T> &&t) {
643     baton.post();
644     return std::move(t.value());
645   });
646   baton.wait();
647   while (!done.isReady()) {
648     // There's a race here between the return here and the actual finishing of
649     // the future. f is completed, but the setup may not have finished on done
650     // after the baton has posted.
651     std::this_thread::yield();
652   }
653   return done;
654 }
655
656 template<>
657 inline Future<void> waitWithSemaphore<void>(Future<void>&& f) {
658   Baton<> baton;
659   auto done = f.then([&](Try<void> &&t) {
660     baton.post();
661     t.value();
662   });
663   baton.wait();
664   while (!done.isReady()) {
665     // There's a race here between the return here and the actual finishing of
666     // the future. f is completed, but the setup may not have finished on done
667     // after the baton has posted.
668     std::this_thread::yield();
669   }
670   return done;
671 }
672
673 template <typename T, class Duration>
674 Future<T>
675 waitWithSemaphore(Future<T>&& f, Duration timeout) {
676   auto baton = std::make_shared<Baton<>>();
677   auto done = f.then([baton](Try<T> &&t) {
678     baton->post();
679     return std::move(t.value());
680   });
681   baton->timed_wait(std::chrono::system_clock::now() + timeout);
682   return done;
683 }
684
685 template <class Duration>
686 Future<void>
687 waitWithSemaphore(Future<void>&& f, Duration timeout) {
688   auto baton = std::make_shared<Baton<>>();
689   auto done = f.then([baton](Try<void> &&t) {
690     baton->post();
691     t.value();
692   });
693   baton->timed_wait(std::chrono::system_clock::now() + timeout);
694   return done;
695 }
696
697 }}
698
699 // I haven't included a Future<T&> specialization because I don't forsee us
700 // using it, however it is not difficult to add when needed. Refer to
701 // Future<void> for guidance. std::future and boost::future code would also be
702 // instructive.