cf10e9f466a280c5b3d68a8992a4c97878ff75ec
[folly.git] / folly / wangle / Future-inl.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <chrono>
20 #include <thread>
21
22 #include <folly/wangle/detail/Core.h>
23 #include <folly/Baton.h>
24
25 namespace folly { namespace wangle {
26
27 template <typename T>
28 struct isFuture {
29   static const bool value = false;
30 };
31
32 template <typename T>
33 struct isFuture<Future<T> > {
34   static const bool value = true;
35 };
36
37 template <class T>
38 Future<T>::Future(Future<T>&& other) noexcept : core_(nullptr) {
39   *this = std::move(other);
40 }
41
42 template <class T>
43 Future<T>& Future<T>::operator=(Future<T>&& other) {
44   std::swap(core_, other.core_);
45   return *this;
46 }
47
48 template <class T>
49 Future<T>::~Future() {
50   detach();
51 }
52
53 template <class T>
54 void Future<T>::detach() {
55   if (core_) {
56     core_->detachFuture();
57     core_ = nullptr;
58   }
59 }
60
61 template <class T>
62 void Future<T>::throwIfInvalid() const {
63   if (!core_)
64     throw NoState();
65 }
66
67 template <class T>
68 template <class F>
69 void Future<T>::setCallback_(F&& func) {
70   throwIfInvalid();
71   core_->setCallback(std::move(func));
72 }
73
74 template <class T>
75 template <class F>
76 typename std::enable_if<
77   !isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
78   Future<typename std::result_of<F(Try<T>&&)>::type> >::type
79 Future<T>::then(F&& func) {
80   typedef typename std::result_of<F(Try<T>&&)>::type B;
81
82   throwIfInvalid();
83
84   // wrap these so we can move them into the lambda
85   folly::MoveWrapper<Promise<B>> p;
86   folly::MoveWrapper<F> funcm(std::forward<F>(func));
87
88   // grab the Future now before we lose our handle on the Promise
89   auto f = p->getFuture();
90
91   /* This is a bit tricky.
92
93      We can't just close over *this in case this Future gets moved. So we
94      make a new dummy Future. We could figure out something more
95      sophisticated that avoids making a new Future object when it can, as an
96      optimization. But this is correct.
97
98      core_ can't be moved, it is explicitly disallowed (as is copying). But
99      if there's ever a reason to allow it, this is one place that makes that
100      assumption and would need to be fixed. We use a standard shared pointer
101      for core_ (by copying it in), which means in essence obj holds a shared
102      pointer to itself.  But this shouldn't leak because Promise will not
103      outlive the continuation, because Promise will setException() with a
104      broken Promise if it is destructed before completed. We could use a
105      weak pointer but it would have to be converted to a shared pointer when
106      func is executed (because the Future returned by func may possibly
107      persist beyond the callback, if it gets moved), and so it is an
108      optimization to just make it shared from the get-go.
109
110      We have to move in the Promise and func using the MoveWrapper
111      hack. (func could be copied but it's a big drag on perf).
112
113      Two subtle but important points about this design. detail::Core has no
114      back pointers to Future or Promise, so if Future or Promise get moved
115      (and they will be moved in performant code) we don't have to do
116      anything fancy. And because we store the continuation in the
117      detail::Core, not in the Future, we can execute the continuation even
118      after the Future has gone out of scope. This is an intentional design
119      decision. It is likely we will want to be able to cancel a continuation
120      in some circumstances, but I think it should be explicit not implicit
121      in the destruction of the Future used to create it.
122      */
123   setCallback_(
124     [p, funcm](Try<T>&& t) mutable {
125       p->fulfil([&]() {
126           return (*funcm)(std::move(t));
127         });
128     });
129
130   return std::move(f);
131 }
132
133 template <class T>
134 template <class F>
135 typename std::enable_if<
136   isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
137   Future<typename std::result_of<F(Try<T>&&)>::type::value_type> >::type
138 Future<T>::then(F&& func) {
139   typedef typename std::result_of<F(Try<T>&&)>::type::value_type B;
140
141   throwIfInvalid();
142
143   // wrap these so we can move them into the lambda
144   folly::MoveWrapper<Promise<B>> p;
145   folly::MoveWrapper<F> funcm(std::forward<F>(func));
146
147   // grab the Future now before we lose our handle on the Promise
148   auto f = p->getFuture();
149
150   setCallback_(
151     [p, funcm](Try<T>&& t) mutable {
152       try {
153         auto f2 = (*funcm)(std::move(t));
154         // that didn't throw, now we can steal p
155         f2.setCallback_([p](Try<B>&& b) mutable {
156             p->fulfilTry(std::move(b));
157           });
158       } catch (...) {
159         p->setException(std::current_exception());
160       }
161     });
162
163   return std::move(f);
164 }
165
166 template <class T>
167 Future<void> Future<T>::then() {
168   return then([] (Try<T>&& t) {});
169 }
170
171 template <class T>
172 typename std::add_lvalue_reference<T>::type Future<T>::value() {
173   throwIfInvalid();
174
175   return core_->value();
176 }
177
178 template <class T>
179 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
180   throwIfInvalid();
181
182   return core_->value();
183 }
184
185 template <class T>
186 Try<T>& Future<T>::getTry() {
187   throwIfInvalid();
188
189   return core_->getTry();
190 }
191
192 template <class T>
193 template <typename Executor>
194 inline Future<T> Future<T>::via(Executor* executor) {
195   throwIfInvalid();
196
197   this->deactivate();
198   core_->setExecutor(executor);
199
200   return std::move(*this);
201 }
202
203 template <class T>
204 bool Future<T>::isReady() const {
205   throwIfInvalid();
206   return core_->ready();
207 }
208
209 // makeFuture
210
211 template <class T>
212 Future<typename std::decay<T>::type> makeFuture(T&& t) {
213   Promise<typename std::decay<T>::type> p;
214   auto f = p.getFuture();
215   p.setValue(std::forward<T>(t));
216   return std::move(f);
217 }
218
219 inline // for multiple translation units
220 Future<void> makeFuture() {
221   Promise<void> p;
222   auto f = p.getFuture();
223   p.setValue();
224   return std::move(f);
225 }
226
227 template <class F>
228 auto makeFutureTry(
229     F&& func,
230     typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
231     -> Future<decltype(func())> {
232   Promise<decltype(func())> p;
233   auto f = p.getFuture();
234   p.fulfil(
235     [&func]() {
236       return (func)();
237     });
238   return std::move(f);
239 }
240
241 template <class F>
242 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
243   F copy = func;
244   return makeFutureTry(std::move(copy));
245 }
246
247 template <class T>
248 Future<T> makeFuture(std::exception_ptr const& e) {
249   Promise<T> p;
250   auto f = p.getFuture();
251   p.setException(e);
252   return std::move(f);
253 }
254
255 template <class T, class E>
256 typename std::enable_if<std::is_base_of<std::exception, E>::value,
257                         Future<T>>::type
258 makeFuture(E const& e) {
259   Promise<T> p;
260   auto f = p.getFuture();
261   p.fulfil([&]() -> T { throw e; });
262   return std::move(f);
263 }
264
265 template <class T>
266 Future<T> makeFuture(Try<T>&& t) {
267   try {
268     return makeFuture<T>(std::move(t.value()));
269   } catch (...) {
270     return makeFuture<T>(std::current_exception());
271   }
272 }
273
274 template <>
275 inline Future<void> makeFuture(Try<void>&& t) {
276   try {
277     t.throwIfFailed();
278     return makeFuture();
279   } catch (...) {
280     return makeFuture<void>(std::current_exception());
281   }
282 }
283
284 // when (variadic)
285
286 template <typename... Fs>
287 typename detail::VariadicContext<
288   typename std::decay<Fs>::type::value_type...>::type
289 whenAll(Fs&&... fs)
290 {
291   auto ctx =
292     new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
293   ctx->total = sizeof...(fs);
294   auto f_saved = ctx->p.getFuture();
295   detail::whenAllVariadicHelper(ctx,
296     std::forward<typename std::decay<Fs>::type>(fs)...);
297   return std::move(f_saved);
298 }
299
300 // when (iterator)
301
302 template <class InputIterator>
303 Future<
304   std::vector<
305   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
306 whenAll(InputIterator first, InputIterator last)
307 {
308   typedef
309     typename std::iterator_traits<InputIterator>::value_type::value_type T;
310
311   auto n = std::distance(first, last);
312   if (n == 0) {
313     return makeFuture(std::vector<Try<T>>());
314   }
315
316   auto ctx = new detail::WhenAllContext<T>();
317
318   ctx->total = n;
319   ctx->results.resize(ctx->total);
320
321   auto f_saved = ctx->p.getFuture();
322
323   for (size_t i = 0; first != last; ++first, ++i) {
324      auto& f = *first;
325      f.setCallback_([ctx, i](Try<T>&& t) {
326          ctx->results[i] = std::move(t);
327          if (++ctx->count == ctx->total) {
328            ctx->p.setValue(std::move(ctx->results));
329            delete ctx;
330          }
331        });
332   }
333
334   return std::move(f_saved);
335 }
336
337 template <class InputIterator>
338 Future<
339   std::pair<size_t,
340             Try<
341               typename
342               std::iterator_traits<InputIterator>::value_type::value_type> > >
343 whenAny(InputIterator first, InputIterator last) {
344   typedef
345     typename std::iterator_traits<InputIterator>::value_type::value_type T;
346
347   auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
348   auto f_saved = ctx->p.getFuture();
349
350   for (size_t i = 0; first != last; first++, i++) {
351     auto& f = *first;
352     f.setCallback_([i, ctx](Try<T>&& t) {
353       if (!ctx->done.exchange(true)) {
354         ctx->p.setValue(std::make_pair(i, std::move(t)));
355       }
356       ctx->decref();
357     });
358   }
359
360   return std::move(f_saved);
361 }
362
363 template <class InputIterator>
364 Future<std::vector<std::pair<size_t, Try<typename
365   std::iterator_traits<InputIterator>::value_type::value_type>>>>
366 whenN(InputIterator first, InputIterator last, size_t n) {
367   typedef typename
368     std::iterator_traits<InputIterator>::value_type::value_type T;
369   typedef std::vector<std::pair<size_t, Try<T>>> V;
370
371   struct ctx_t {
372     V v;
373     size_t completed;
374     Promise<V> p;
375   };
376   auto ctx = std::make_shared<ctx_t>();
377   ctx->completed = 0;
378
379   // for each completed Future, increase count and add to vector, until we
380   // have n completed futures at which point we fulfil our Promise with the
381   // vector
382   auto it = first;
383   size_t i = 0;
384   while (it != last) {
385     it->then([ctx, n, i](Try<T>&& t) {
386       auto& v = ctx->v;
387       auto c = ++ctx->completed;
388       if (c <= n) {
389         assert(ctx->v.size() < n);
390         v.push_back(std::make_pair(i, std::move(t)));
391         if (c == n) {
392           ctx->p.fulfilTry(Try<V>(std::move(v)));
393         }
394       }
395     });
396
397     it++;
398     i++;
399   }
400
401   if (i < n) {
402     ctx->p.setException(std::runtime_error("Not enough futures"));
403   }
404
405   return ctx->p.getFuture();
406 }
407
408 template <typename T>
409 Future<T>
410 waitWithSemaphore(Future<T>&& f) {
411   Baton<> baton;
412   auto done = f.then([&](Try<T> &&t) {
413     baton.post();
414     return std::move(t.value());
415   });
416   baton.wait();
417   while (!done.isReady()) {
418     // There's a race here between the return here and the actual finishing of
419     // the future. f is completed, but the setup may not have finished on done
420     // after the baton has posted.
421     std::this_thread::yield();
422   }
423   return done;
424 }
425
426 template<>
427 inline Future<void> waitWithSemaphore<void>(Future<void>&& f) {
428   Baton<> baton;
429   auto done = f.then([&](Try<void> &&t) {
430     baton.post();
431     t.value();
432   });
433   baton.wait();
434   while (!done.isReady()) {
435     // There's a race here between the return here and the actual finishing of
436     // the future. f is completed, but the setup may not have finished on done
437     // after the baton has posted.
438     std::this_thread::yield();
439   }
440   return done;
441 }
442
443 template <typename T, class Duration>
444 Future<T>
445 waitWithSemaphore(Future<T>&& f, Duration timeout) {
446   auto baton = std::make_shared<Baton<>>();
447   auto done = f.then([baton](Try<T> &&t) {
448     baton->post();
449     return std::move(t.value());
450   });
451   baton->timed_wait(std::chrono::system_clock::now() + timeout);
452   return done;
453 }
454
455 template <class Duration>
456 Future<void>
457 waitWithSemaphore(Future<void>&& f, Duration timeout) {
458   auto baton = std::make_shared<Baton<>>();
459   auto done = f.then([baton](Try<void> &&t) {
460     baton->post();
461     t.value();
462   });
463   baton->timed_wait(std::chrono::system_clock::now() + timeout);
464   return done;
465 }
466
467 }}
468
469 // I haven't included a Future<T&> specialization because I don't forsee us
470 // using it, however it is not difficult to add when needed. Refer to
471 // Future<void> for guidance. std::future and boost::future code would also be
472 // instructive.