2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 #include <folly/Memory.h>
21 #include <folly/Optional.h>
22 #include <folly/Portability.h>
23 #include <folly/ScopeGuard.h>
24 #include <folly/experimental/fibers/Baton.h>
25 #include <folly/experimental/fibers/Fiber.h>
26 #include <folly/experimental/fibers/Promise.h>
27 #include <folly/experimental/fibers/LoopController.h>
28 #include <folly/futures/Try.h>
30 namespace folly { namespace fibers {
32 inline void FiberManager::ensureLoopScheduled() {
33 if (isLoopScheduled_) {
37 isLoopScheduled_ = true;
38 loopController_->schedule();
41 inline void FiberManager::runReadyFiber(Fiber* fiber) {
42 assert(fiber->state_ == Fiber::NOT_STARTED ||
43 fiber->state_ == Fiber::READY_TO_RUN);
44 currentFiber_ = fiber;
46 while (fiber->state_ == Fiber::NOT_STARTED ||
47 fiber->state_ == Fiber::READY_TO_RUN) {
49 jumpContext(&mainContext_, &fiber->fcontext_, fiber->data_);
50 if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) {
54 exceptionCallback_(std::current_exception(), "running immediateFunc_");
56 immediateFunc_ = nullptr;
57 fiber->state_ = Fiber::READY_TO_RUN;
61 if (fiber->state_ == Fiber::AWAITING) {
64 } else if (fiber->state_ == Fiber::INVALID) {
65 assert(fibersActive_ > 0);
67 // Making sure that task functor is deleted once task is complete.
68 // NOTE: we must do it on main context, as the fiber is not
69 // running at this point.
70 fiber->func_ = nullptr;
71 fiber->resultFunc_ = nullptr;
72 if (fiber->finallyFunc_) {
74 fiber->finallyFunc_();
76 exceptionCallback_(std::current_exception(), "running finallyFunc_");
78 fiber->finallyFunc_ = nullptr;
80 fiber->localData_.reset();
82 if (fibersPoolSize_ < options_.maxFibersPoolSize) {
83 fibersPool_.push_front(*fiber);
87 assert(fibersAllocated_ > 0);
90 } else if (fiber->state_ == Fiber::YIELDED) {
91 fiber->state_ = Fiber::READY_TO_RUN;
92 yieldedFibers_.push_back(*fiber);
94 currentFiber_ = nullptr;
97 inline bool FiberManager::loopUntilNoReady() {
99 isLoopScheduled_ = false;
100 if (!readyFibers_.empty()) {
101 ensureLoopScheduled();
103 currentFiberManager_ = nullptr;
106 currentFiberManager_ = this;
108 bool hadRemoteFiber = true;
109 while (hadRemoteFiber) {
110 hadRemoteFiber = false;
112 while (!readyFibers_.empty()) {
113 auto& fiber = readyFibers_.front();
114 readyFibers_.pop_front();
115 runReadyFiber(&fiber);
118 remoteReadyQueue_.sweep(
119 [this, &hadRemoteFiber] (Fiber* fiber) {
120 runReadyFiber(fiber);
121 hadRemoteFiber = true;
125 remoteTaskQueue_.sweep(
126 [this, &hadRemoteFiber] (RemoteTask* taskPtr) {
127 std::unique_ptr<RemoteTask> task(taskPtr);
128 auto fiber = getFiber();
129 if (task->localData) {
130 fiber->localData_ = *task->localData;
133 fiber->setFunction(std::move(task->func));
134 fiber->data_ = reinterpret_cast<intptr_t>(fiber);
135 runReadyFiber(fiber);
136 hadRemoteFiber = true;
141 readyFibers_.splice(readyFibers_.end(), yieldedFibers_);
143 return fibersActive_ > 0;
146 // We need this to be in a struct, not inlined in addTask, because clang crashes
148 template <typename F>
149 struct FiberManager::AddTaskHelper {
152 static constexpr bool allocateInBuffer =
153 sizeof(Func) <= Fiber::kUserBufferSize;
157 Func(F&& func, FiberManager& fm) :
158 func_(std::forward<F>(func)), fm_(fm) {}
164 fm_.exceptionCallback_(std::current_exception(),
165 "running Func functor");
167 if (allocateInBuffer) {
180 template <typename F>
181 void FiberManager::addTask(F&& func) {
182 typedef AddTaskHelper<F> Helper;
184 auto fiber = getFiber();
185 initLocalData(*fiber);
187 if (Helper::allocateInBuffer) {
188 auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
189 new (funcLoc) typename Helper::Func(std::forward<F>(func), *this);
191 fiber->setFunction(std::ref(*funcLoc));
193 auto funcLoc = new typename Helper::Func(std::forward<F>(func), *this);
195 fiber->setFunction(std::ref(*funcLoc));
198 fiber->data_ = reinterpret_cast<intptr_t>(fiber);
199 readyFibers_.push_back(*fiber);
201 ensureLoopScheduled();
204 template <typename F>
205 void FiberManager::addTaskRemote(F&& func) {
207 auto currentFm = getFiberManagerUnsafe();
209 currentFm->currentFiber_ &&
210 currentFm->localType_ == localType_) {
211 return folly::make_unique<RemoteTask>(
212 std::forward<F>(func),
213 currentFm->currentFiber_->localData_);
215 return folly::make_unique<RemoteTask>(std::forward<F>(func));
217 if (remoteTaskQueue_.insertHead(task.release())) {
218 loopController_->scheduleThreadSafe();
222 template <typename X>
223 struct IsRvalueRefTry { static const bool value = false; };
224 template <typename T>
225 struct IsRvalueRefTry<folly::Try<T>&&> { static const bool value = true; };
227 // We need this to be in a struct, not inlined in addTaskFinally, because clang
228 // crashes otherwise.
229 template <typename F, typename G>
230 struct FiberManager::AddTaskFinallyHelper {
234 typedef typename std::result_of<F()>::type Result;
236 static constexpr bool allocateInBuffer =
237 sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize;
243 finally_(std::move(finally)),
249 finally_(std::move(*result_));
251 fm_.exceptionCallback_(std::current_exception(),
252 "running Finally functor");
255 if (allocateInBuffer) {
266 folly::Optional<folly::Try<Result>> result_;
272 Func(F&& func, Finally& finally) :
273 func_(std::move(func)), result_(finally.result_) {}
276 result_ = folly::makeTryFunction(std::move(func_));
278 if (allocateInBuffer) {
287 folly::Optional<folly::Try<Result>>& result_;
291 template <typename F, typename G>
292 void FiberManager::addTaskFinally(F&& func, G&& finally) {
293 typedef typename std::result_of<F()>::type Result;
296 IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
297 "finally(arg): arg must be Try<T>&&");
301 typename std::remove_reference<
302 typename FirstArgOf<G>::type
303 >::type::element_type
305 "finally(Try<T>&&): T must be convertible from func()'s return type");
307 auto fiber = getFiber();
308 initLocalData(*fiber);
310 typedef AddTaskFinallyHelper<F,G> Helper;
312 if (Helper::allocateInBuffer) {
313 auto funcLoc = static_cast<typename Helper::Func*>(
314 fiber->getUserBuffer());
315 auto finallyLoc = static_cast<typename Helper::Finally*>(
316 static_cast<void*>(funcLoc + 1));
318 new (finallyLoc) typename Helper::Finally(std::move(finally), *this);
319 new (funcLoc) typename Helper::Func(std::move(func), *finallyLoc);
321 fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
323 auto finallyLoc = new typename Helper::Finally(std::move(finally), *this);
324 auto funcLoc = new typename Helper::Func(std::move(func), *finallyLoc);
326 fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
329 fiber->data_ = reinterpret_cast<intptr_t>(fiber);
330 readyFibers_.push_back(*fiber);
332 ensureLoopScheduled();
335 template <typename F>
336 typename std::result_of<F()>::type
337 FiberManager::runInMainContext(F&& func) {
338 return runInMainContextHelper(std::forward<F>(func));
341 template <typename F>
342 inline typename std::enable_if<
343 !std::is_same<typename std::result_of<F()>::type, void>::value,
344 typename std::result_of<F()>::type>::type
345 FiberManager::runInMainContextHelper(F&& func) {
346 if (UNLIKELY(activeFiber_ == nullptr)) {
350 typedef typename std::result_of<F()>::type Result;
352 folly::Try<Result> result;
353 auto f = [&func, &result]() mutable {
354 result = folly::makeTryFunction(std::forward<F>(func));
357 immediateFunc_ = std::ref(f);
358 activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
360 return std::move(result.value());
363 template <typename F>
364 inline typename std::enable_if<
365 std::is_same<typename std::result_of<F()>::type, void>::value,
367 FiberManager::runInMainContextHelper(F&& func) {
368 if (UNLIKELY(activeFiber_ == nullptr)) {
373 immediateFunc_ = std::ref(func);
374 activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
377 inline FiberManager& FiberManager::getFiberManager() {
378 assert(currentFiberManager_ != nullptr);
379 return *currentFiberManager_;
382 inline FiberManager* FiberManager::getFiberManagerUnsafe() {
383 return currentFiberManager_;
386 inline bool FiberManager::hasActiveFiber() const {
387 return activeFiber_ != nullptr;
390 inline void FiberManager::yield() {
391 assert(currentFiberManager_ == this);
392 assert(activeFiber_ != nullptr);
393 assert(activeFiber_->state_ == Fiber::RUNNING);
394 activeFiber_->preempt(Fiber::YIELDED);
397 template <typename T>
398 T& FiberManager::local() {
399 if (std::type_index(typeid(T)) == localType_ && currentFiber_) {
400 return currentFiber_->localData_.get<T>();
402 return localThread<T>();
405 template <typename T>
406 T& FiberManager::localThread() {
407 static thread_local T t;
411 inline void FiberManager::initLocalData(Fiber& fiber) {
412 auto fm = getFiberManagerUnsafe();
413 if (fm && fm->currentFiber_ && fm->localType_ == localType_) {
414 fiber.localData_ = fm->currentFiber_->localData_;
418 template <typename LocalT>
419 FiberManager::FiberManager(
421 std::unique_ptr<LoopController> loopController__,
423 loopController_(std::move(loopController__)),
424 options_(std::move(options)),
425 exceptionCallback_([](std::exception_ptr eptr, std::string context) {
427 std::rethrow_exception(eptr);
428 } catch (const std::exception& e) {
429 LOG(DFATAL) << "Exception " << typeid(e).name()
430 << " with message '" << e.what() << "' was thrown in "
431 << "FiberManager with context '" << context << "'";
434 LOG(DFATAL) << "Unknown exception was thrown in FiberManager with "
435 << "context '" << context << "'";
439 timeoutManager_(std::make_shared<TimeoutController>(*loopController_)),
440 localType_(typeid(LocalT)) {
441 loopController_->setFiberManager(this);
444 template <typename F>
445 typename FirstArgOf<F>::type::value_type
446 inline await(F&& func) {
447 typedef typename FirstArgOf<F>::type::value_type Result;
449 folly::Try<Result> result;
452 baton.wait([&func, &result, &baton]() mutable {
453 func(Promise<Result>(result, baton));
456 return folly::moveFromTry(std::move(result));