Add ability to yield execution of the currently running fiber
[folly.git] / folly / experimental / fibers / FiberManager-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <cassert>
19
20 #include <folly/Memory.h>
21 #include <folly/Optional.h>
22 #include <folly/Portability.h>
23 #include <folly/ScopeGuard.h>
24 #include <folly/experimental/fibers/Baton.h>
25 #include <folly/experimental/fibers/Fiber.h>
26 #include <folly/experimental/fibers/Promise.h>
27 #include <folly/experimental/fibers/LoopController.h>
28 #include <folly/futures/Try.h>
29
30 namespace folly { namespace fibers {
31
32 inline void FiberManager::ensureLoopScheduled() {
33   if (isLoopScheduled_) {
34     return;
35   }
36
37   isLoopScheduled_ = true;
38   loopController_->schedule();
39 }
40
41 inline void FiberManager::runReadyFiber(Fiber* fiber) {
42   assert(fiber->state_ == Fiber::NOT_STARTED ||
43          fiber->state_ == Fiber::READY_TO_RUN);
44   currentFiber_ = fiber;
45
46   while (fiber->state_ == Fiber::NOT_STARTED ||
47          fiber->state_ == Fiber::READY_TO_RUN) {
48     activeFiber_ = fiber;
49     jumpContext(&mainContext_, &fiber->fcontext_, fiber->data_);
50     if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) {
51       try {
52         immediateFunc_();
53       } catch (...) {
54         exceptionCallback_(std::current_exception(), "running immediateFunc_");
55       }
56       immediateFunc_ = nullptr;
57       fiber->state_ = Fiber::READY_TO_RUN;
58     }
59   }
60
61   if (fiber->state_ == Fiber::AWAITING) {
62     awaitFunc_(*fiber);
63     awaitFunc_ = nullptr;
64   } else if (fiber->state_ == Fiber::INVALID) {
65     assert(fibersActive_ > 0);
66     --fibersActive_;
67     // Making sure that task functor is deleted once task is complete.
68     // NOTE: we must do it on main context, as the fiber is not
69     // running at this point.
70     fiber->func_ = nullptr;
71     fiber->resultFunc_ = nullptr;
72     if (fiber->finallyFunc_) {
73       try {
74         fiber->finallyFunc_();
75       } catch (...) {
76         exceptionCallback_(std::current_exception(), "running finallyFunc_");
77       }
78       fiber->finallyFunc_ = nullptr;
79     }
80     fiber->localData_.reset();
81
82     if (fibersPoolSize_ < options_.maxFibersPoolSize) {
83       fibersPool_.push_front(*fiber);
84       ++fibersPoolSize_;
85     } else {
86       delete fiber;
87       assert(fibersAllocated_ > 0);
88       --fibersAllocated_;
89     }
90   } else if (fiber->state_ == Fiber::YIELDED) {
91     fiber->state_ = Fiber::READY_TO_RUN;
92     yieldedFibers_.push_back(*fiber);
93   }
94   currentFiber_ = nullptr;
95 }
96
97 inline bool FiberManager::loopUntilNoReady() {
98   SCOPE_EXIT {
99     isLoopScheduled_ = false;
100     currentFiberManager_ = nullptr;
101   };
102
103   currentFiberManager_ = this;
104
105   bool hadRemoteFiber = true;
106   while (hadRemoteFiber) {
107     hadRemoteFiber = false;
108
109     while (!readyFibers_.empty()) {
110       auto& fiber = readyFibers_.front();
111       readyFibers_.pop_front();
112       runReadyFiber(&fiber);
113     }
114
115     remoteReadyQueue_.sweep(
116       [this, &hadRemoteFiber] (Fiber* fiber) {
117         runReadyFiber(fiber);
118         hadRemoteFiber = true;
119       }
120     );
121
122     remoteTaskQueue_.sweep(
123       [this, &hadRemoteFiber] (RemoteTask* taskPtr) {
124         std::unique_ptr<RemoteTask> task(taskPtr);
125         auto fiber = getFiber();
126         if (task->localData) {
127           fiber->localData_ = *task->localData;
128         }
129
130         fiber->setFunction(std::move(task->func));
131         fiber->data_ = reinterpret_cast<intptr_t>(fiber);
132         runReadyFiber(fiber);
133         hadRemoteFiber = true;
134       }
135     );
136   }
137
138   if (!yieldedFibers_.empty()) {
139     readyFibers_.splice(readyFibers_.end(), yieldedFibers_);
140     ensureLoopScheduled();
141   }
142
143   return fibersActive_ > 0;
144 }
145
146 // We need this to be in a struct, not inlined in addTask, because clang crashes
147 // otherwise.
148 template <typename F>
149 struct FiberManager::AddTaskHelper {
150   class Func;
151
152   static constexpr bool allocateInBuffer =
153     sizeof(Func) <= Fiber::kUserBufferSize;
154
155   class Func {
156    public:
157     Func(F&& func, FiberManager& fm) :
158         func_(std::forward<F>(func)), fm_(fm) {}
159
160     void operator()() {
161       try {
162         func_();
163       } catch (...) {
164         fm_.exceptionCallback_(std::current_exception(),
165                                "running Func functor");
166       }
167       if (allocateInBuffer) {
168         this->~Func();
169       } else {
170         delete this;
171       }
172     }
173
174    private:
175     F func_;
176     FiberManager& fm_;
177   };
178 };
179
180 template <typename F>
181 void FiberManager::addTask(F&& func) {
182   typedef AddTaskHelper<F> Helper;
183
184   auto fiber = getFiber();
185   initLocalData(*fiber);
186
187   if (Helper::allocateInBuffer) {
188     auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
189     new (funcLoc) typename Helper::Func(std::forward<F>(func), *this);
190
191     fiber->setFunction(std::ref(*funcLoc));
192   } else {
193     auto funcLoc = new typename Helper::Func(std::forward<F>(func), *this);
194
195     fiber->setFunction(std::ref(*funcLoc));
196   }
197
198   fiber->data_ = reinterpret_cast<intptr_t>(fiber);
199   readyFibers_.push_back(*fiber);
200
201   ensureLoopScheduled();
202 }
203
204 template <typename F>
205 void FiberManager::addTaskRemote(F&& func) {
206   auto task = [&]() {
207     auto currentFm = getFiberManagerUnsafe();
208     if (currentFm &&
209         currentFm->currentFiber_ &&
210         currentFm->localType_ == localType_) {
211       return folly::make_unique<RemoteTask>(
212         std::forward<F>(func),
213         currentFm->currentFiber_->localData_);
214     }
215     return folly::make_unique<RemoteTask>(std::forward<F>(func));
216   }();
217   if (remoteTaskQueue_.insertHead(task.release())) {
218     loopController_->scheduleThreadSafe();
219   }
220 }
221
222 template <typename X>
223 struct IsRvalueRefTry { static const bool value = false; };
224 template <typename T>
225 struct IsRvalueRefTry<folly::Try<T>&&> { static const bool value = true; };
226
227 // We need this to be in a struct, not inlined in addTaskFinally, because clang
228 // crashes otherwise.
229 template <typename F, typename G>
230 struct FiberManager::AddTaskFinallyHelper {
231   class Func;
232   class Finally;
233
234   typedef typename std::result_of<F()>::type Result;
235
236   static constexpr bool allocateInBuffer =
237     sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize;
238
239   class Finally {
240    public:
241     Finally(G&& finally,
242             FiberManager& fm) :
243         finally_(std::move(finally)),
244         fm_(fm) {
245     }
246
247     void operator()() {
248       try {
249         finally_(std::move(*result_));
250       } catch (...) {
251         fm_.exceptionCallback_(std::current_exception(),
252                                "running Finally functor");
253       }
254
255       if (allocateInBuffer) {
256         this->~Finally();
257       } else {
258         delete this;
259       }
260     }
261
262    private:
263     friend class Func;
264
265     G finally_;
266     folly::Optional<folly::Try<Result>> result_;
267     FiberManager& fm_;
268   };
269
270   class Func {
271    public:
272     Func(F&& func, Finally& finally) :
273         func_(std::move(func)), result_(finally.result_) {}
274
275     void operator()() {
276       result_ = folly::makeTryFunction(std::move(func_));
277
278       if (allocateInBuffer) {
279         this->~Func();
280       } else {
281         delete this;
282       }
283     }
284
285    private:
286     F func_;
287     folly::Optional<folly::Try<Result>>& result_;
288   };
289 };
290
291 template <typename F, typename G>
292 void FiberManager::addTaskFinally(F&& func, G&& finally) {
293   typedef typename std::result_of<F()>::type Result;
294
295   static_assert(
296     IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
297     "finally(arg): arg must be Try<T>&&");
298   static_assert(
299     std::is_convertible<
300       Result,
301       typename std::remove_reference<
302         typename FirstArgOf<G>::type
303       >::type::element_type
304     >::value,
305     "finally(Try<T>&&): T must be convertible from func()'s return type");
306
307   auto fiber = getFiber();
308   initLocalData(*fiber);
309
310   typedef AddTaskFinallyHelper<F,G> Helper;
311
312   if (Helper::allocateInBuffer) {
313     auto funcLoc = static_cast<typename Helper::Func*>(
314       fiber->getUserBuffer());
315     auto finallyLoc = static_cast<typename Helper::Finally*>(
316       static_cast<void*>(funcLoc + 1));
317
318     new (finallyLoc) typename Helper::Finally(std::move(finally), *this);
319     new (funcLoc) typename Helper::Func(std::move(func), *finallyLoc);
320
321     fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
322   } else {
323     auto finallyLoc = new typename Helper::Finally(std::move(finally), *this);
324     auto funcLoc = new typename Helper::Func(std::move(func), *finallyLoc);
325
326     fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
327   }
328
329   fiber->data_ = reinterpret_cast<intptr_t>(fiber);
330   readyFibers_.push_back(*fiber);
331
332   ensureLoopScheduled();
333 }
334
335 template <typename F>
336 typename std::result_of<F()>::type
337 FiberManager::runInMainContext(F&& func) {
338   return runInMainContextHelper(std::forward<F>(func));
339 }
340
341 template <typename F>
342 inline typename std::enable_if<
343   !std::is_same<typename std::result_of<F()>::type, void>::value,
344   typename std::result_of<F()>::type>::type
345 FiberManager::runInMainContextHelper(F&& func) {
346   if (UNLIKELY(activeFiber_ == nullptr)) {
347     return func();
348   }
349
350   typedef typename std::result_of<F()>::type Result;
351
352   folly::Try<Result> result;
353   auto f = [&func, &result]() mutable {
354     result = folly::makeTryFunction(std::forward<F>(func));
355   };
356
357   immediateFunc_ = std::ref(f);
358   activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
359
360   return std::move(result.value());
361 }
362
363 template <typename F>
364 inline typename std::enable_if<
365   std::is_same<typename std::result_of<F()>::type, void>::value,
366   void>::type
367 FiberManager::runInMainContextHelper(F&& func) {
368   if (UNLIKELY(activeFiber_ == nullptr)) {
369     func();
370     return;
371   }
372
373   immediateFunc_ = std::ref(func);
374   activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
375 }
376
377 inline FiberManager& FiberManager::getFiberManager() {
378   assert(currentFiberManager_ != nullptr);
379   return *currentFiberManager_;
380 }
381
382 inline FiberManager* FiberManager::getFiberManagerUnsafe() {
383   return currentFiberManager_;
384 }
385
386 inline bool FiberManager::hasActiveFiber() const {
387   return activeFiber_ != nullptr;
388 }
389
390 inline void FiberManager::yield() {
391   assert(currentFiberManager_ == this);
392   assert(activeFiber_ != nullptr);
393   assert(activeFiber_->state_ == Fiber::RUNNING);
394   activeFiber_->preempt(Fiber::YIELDED);
395 }
396
397 template <typename T>
398 T& FiberManager::local() {
399   if (std::type_index(typeid(T)) == localType_ && currentFiber_) {
400     return currentFiber_->localData_.get<T>();
401   }
402   return localThread<T>();
403 }
404
405 template <typename T>
406 T& FiberManager::localThread() {
407   static thread_local T t;
408   return t;
409 }
410
411 inline void FiberManager::initLocalData(Fiber& fiber) {
412   auto fm = getFiberManagerUnsafe();
413   if (fm && fm->currentFiber_ && fm->localType_ == localType_) {
414     fiber.localData_ = fm->currentFiber_->localData_;
415   }
416 }
417
418 template <typename LocalT>
419 FiberManager FiberManager::create(
420     std::unique_ptr<LoopController> loopController,
421     Options options) {
422   FiberManager fm(std::move(loopController), std::move(options));
423   fm.localType_ = typeid(LocalT);
424   return fm;
425 }
426
427 template <typename F>
428 typename FirstArgOf<F>::type::value_type
429 inline await(F&& func) {
430   typedef typename FirstArgOf<F>::type::value_type Result;
431
432   folly::Try<Result> result;
433
434   Baton baton;
435   baton.wait([&func, &result, &baton]() mutable {
436       func(Promise<Result>(result, baton));
437     });
438
439   return folly::moveFromTry(std::move(result));
440 }
441
442 }}