2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 #include <folly/CPortability.h>
21 #include <folly/Memory.h>
22 #include <folly/Optional.h>
23 #include <folly/Portability.h>
24 #include <folly/ScopeGuard.h>
26 #include <folly/ThreadLocal.h>
28 #include <folly/experimental/fibers/Baton.h>
29 #include <folly/experimental/fibers/Fiber.h>
30 #include <folly/experimental/fibers/LoopController.h>
31 #include <folly/experimental/fibers/Promise.h>
32 #include <folly/futures/Try.h>
34 namespace folly { namespace fibers {
// Takes a FiberManager::Options by value and yields a FiberManager::Options
// (per the signature); used by the FiberManager constructor to initialize
// options_.
// NOTE(review): the embedded numbering stops at 43 and resumes at 51, so the
// statement that applies the ASAN adjustment described below, the matching
// #endif and the return are not part of this listing -- confirm against the
// full file.
38 inline FiberManager::Options preprocessOptions(FiberManager::Options opts) {
39 #ifdef FOLLY_SANITIZE_ADDRESS
40 /* ASAN needs a lot of extra stack space.
41 16x is a conservative estimate, 8x also worked with tests
42 where it mattered. Note that overallocating here does not necessarily
43 increase RSS, since unused memory is pretty much free. */
// Requests a loop run from the loop controller: sets isLoopScheduled_ and
// calls loopController_->schedule().
51 inline void FiberManager::ensureLoopScheduled() {
// NOTE(review): the body of this check (embedded lines 53-55) is missing from
// this listing; presumably it returns early when a run is already scheduled
// -- confirm.
52 if (isLoopScheduled_) {
56 isLoopScheduled_ = true;
57 loopController_->schedule();
// Runs a single fiber. The fiber must be NOT_STARTED or READY_TO_RUN on entry;
// it is resumed for as long as it is in one of those states, and the state it
// finally stops in (AWAITING, INVALID or YIELDED) is handled afterwards.
// NOTE(review): the embedded numbering has gaps throughout this function
// (61, 64-65, 69, 71-72, 75, 78-80, 82, 85-87, 89-91, 93, 97, 104, 106, 108,
// 110, 112, 114, 117, 120-122, 124-125, 127, 129, 133-135), so several
// statements, guards and closing braces are not visible in this listing.
60 inline void FiberManager::runReadyFiber(Fiber* fiber) {
// Checks that no fiber is recorded as current or active.
// NOTE(review): the line before these asserts is missing; they may sit inside
// a scope guard (folly/ScopeGuard.h is included) and so run on exit rather
// than on entry -- confirm.
62 assert(currentFiber_ == nullptr);
63 assert(activeFiber_ == nullptr);
// Precondition: the fiber has not started yet or is ready to be resumed.
66 assert(fiber->state_ == Fiber::NOT_STARTED ||
67 fiber->state_ == Fiber::READY_TO_RUN);
68 currentFiber_ = fiber;
// Notify the observer; the fiber's address serves as its identifier.
70 observer_->starting(reinterpret_cast<uintptr_t>(fiber));
// Keep resuming the fiber while it is runnable.
73 while (fiber->state_ == Fiber::NOT_STARTED ||
74 fiber->state_ == Fiber::READY_TO_RUN) {
// Presumably switches from mainContext_ into the fiber's context, passing
// fiber->data_ along -- jumpContext is defined elsewhere.
76 jumpContext(&mainContext_, &fiber->fcontext_, fiber->data_);
// AWAITING_IMMEDIATE is the state requested by runInMainContextHelper() through
// preempt(); it pairs with immediateFunc_.
77 if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) {
// Exceptions are reported through exceptionCallback_ with this context string.
// NOTE(review): the try / call / catch lines (78-80) are missing here.
81 exceptionCallback_(std::current_exception(), "running immediateFunc_");
// Clear the pending function and mark the fiber runnable so the loop resumes it.
83 immediateFunc_ = nullptr;
84 fiber->state_ = Fiber::READY_TO_RUN;
// AWAITING: the fiber is no longer current.
// NOTE(review): embedded lines 89-91 are missing from this branch.
88 if (fiber->state_ == Fiber::AWAITING) {
92 observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
94 currentFiber_ = nullptr;
// INVALID: per the comment below, the task is complete.
95 } else if (fiber->state_ == Fiber::INVALID) {
96 assert(fibersActive_ > 0);
98 // Making sure that task functor is deleted once task is complete.
99 // NOTE: we must do it on main context, as the fiber is not
100 // running at this point.
101 fiber->func_ = nullptr;
102 fiber->resultFunc_ = nullptr;
// Run the optional finally-callback; exceptions go to exceptionCallback_.
// NOTE(review): the surrounding try/catch lines (104, 106) are missing here.
103 if (fiber->finallyFunc_) {
105 fiber->finallyFunc_();
107 exceptionCallback_(std::current_exception(), "running finallyFunc_");
109 fiber->finallyFunc_ = nullptr;
111 // Make sure LocalData is not accessible from its destructor
113 observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
// currentFiber_ is cleared before the local data is reset (see comment above).
115 currentFiber_ = nullptr;
116 fiber->localData_.reset();
// Keep the fiber in the pool while the pool is below its configured maximum.
// NOTE(review): the rest of this branch and the alternative branch
// (120-122, 124-125) are only partly visible.
118 if (fibersPoolSize_ < options_.maxFibersPoolSize) {
119 fibersPool_.push_front(*fiber);
123 assert(fibersAllocated_ > 0);
// YIELDED (set by yield()): mark the fiber runnable again and park it in
// yieldedFibers_; loopUntilNoReady() later moves that list to readyFibers_.
126 } else if (fiber->state_ == Fiber::YIELDED) {
128 observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
130 currentFiber_ = nullptr;
131 fiber->state_ = Fiber::READY_TO_RUN;
132 yieldedFibers_.push_back(*fiber);
// Runs fibers from readyFibers_, remoteReadyQueue_ and remoteTaskQueue_,
// repeating while a pass found remote work, then moves the yielded fibers onto
// the ready list.
// Returns true when fibersActive_ is still greater than zero.
// NOTE(review): the embedded numbering has gaps throughout (137, 141, 143-144,
// 146, 150, 155-156, 161-163, 170-171, 174, 176, 179-183, 186-187, 189), so
// guards and closing lines are not visible in this listing.
136 inline bool FiberManager::loopUntilNoReady() {
// NOTE(review): the lines around the next four statements are missing; they
// look like the body of a scope guard that runs on exit (clear the scheduled
// flag, reschedule if ready fibers remain, clear currentFiberManager_)
// -- confirm against the full file.
138 isLoopScheduled_ = false;
139 if (!readyFibers_.empty()) {
140 ensureLoopScheduled();
142 currentFiberManager_ = nullptr;
// Record this manager as the current one.
145 currentFiberManager_ = this;
// Starts true so the loop body runs at least once; each pass resets the flag
// and the sweeps below set it again when they run something.
147 bool hadRemoteFiber = true;
148 while (hadRemoteFiber) {
149 hadRemoteFiber = false;
// Drain the local ready list.
151 while (!readyFibers_.empty()) {
// The reference is used after pop_front().
// NOTE(review): this assumes the list does not own its elements (presumably
// an intrusive list) -- confirm.
152 auto& fiber = readyFibers_.front();
153 readyFibers_.pop_front();
154 runReadyFiber(&fiber);
// Run every fiber swept out of remoteReadyQueue_.
157 remoteReadyQueue_.sweep(
158 [this, &hadRemoteFiber] (Fiber* fiber) {
159 runReadyFiber(fiber);
160 hadRemoteFiber = true;
// Turn every RemoteTask swept out of remoteTaskQueue_ into a fiber and run it.
164 remoteTaskQueue_.sweep(
165 [this, &hadRemoteFiber] (RemoteTask* taskPtr) {
// Take ownership of the raw pointer that addTaskRemote() released into the queue.
166 std::unique_ptr<RemoteTask> task(taskPtr);
167 auto fiber = getFiber();
// Copy the task's local data onto the fiber when the task carries any.
168 if (task->localData) {
169 fiber->localData_ = *task->localData;
172 fiber->setFunction(std::move(task->func));
// The fiber's own address is stored as its data value.
173 fiber->data_ = reinterpret_cast<intptr_t>(fiber);
175 observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
177 runReadyFiber(fiber);
178 hadRemoteFiber = true;
// Report each yielded fiber as runnable, then append the whole yielded list to
// the ready list.
184 for (auto& yielded : yieldedFibers_) {
185 observer_->runnable(reinterpret_cast<uintptr_t>(&yielded));
188 readyFibers_.splice(readyFibers_.end(), yieldedFibers_);
190 return fibersActive_ > 0;
193 // We need this to be in a struct, not inlined in addTask, because clang crashes otherwise.
// Helper for addTask(): provides the nested Func wrapper that addTask()
// constructs around the user functor F, plus a flag telling whether that
// wrapper fits in a fiber's user buffer.
// NOTE(review): the embedded numbering has gaps (197-198, 201-203, 206-210,
// 213, 215-226), so the Func class head, its call operator, its data members
// and the closing lines are not visible in this listing.
195 template <typename F>
196 struct FiberManager::AddTaskHelper {
// True when sizeof(Func) does not exceed Fiber::kUserBufferSize; addTask()
// uses this to choose between placement-new in the buffer and a heap allocation.
199 static constexpr bool allocateInBuffer =
200 sizeof(Func) <= Fiber::kUserBufferSize;
// Stores the forwarded functor and a reference to the owning FiberManager.
204 Func(F&& func, FiberManager& fm) :
205 func_(std::forward<F>(func)), fm_(fm) {}
// Exceptions are reported to the manager's exceptionCallback_ with this
// context string.
211 fm_.exceptionCallback_(std::current_exception(),
212 "running Func functor");
// NOTE(review): both branches of this check are missing; presumably the
// wrapper is destroyed in place when it lives in the buffer and deleted
// otherwise -- confirm.
214 if (allocateInBuffer) {
// Schedules func to run on a fiber of this manager.
//
// Obtains a fiber from getFiber(), initializes its local data, wraps func in
// AddTaskHelper<F>::Func, appends the fiber to readyFibers_, reports it to the
// observer as runnable and makes sure a loop run is scheduled.
227 template <typename F>
228 void FiberManager::addTask(F&& func) {
229 typedef AddTaskHelper<F> Helper;
231 auto fiber = getFiber();
232 initLocalData(*fiber);
// When the wrapper fits, construct it with placement new inside the fiber's
// own user buffer.
234 if (Helper::allocateInBuffer) {
235 auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
236 new (funcLoc) typename Helper::Func(std::forward<F>(func), *this);
// std::ref: the fiber refers to the wrapper in place instead of copying it.
238 fiber->setFunction(std::ref(*funcLoc));
// Otherwise allocate the wrapper on the heap.
// NOTE(review): the "else" line (239) is missing from this listing, and the
// release of this allocation is not visible in this function.
240 auto funcLoc = new typename Helper::Func(std::forward<F>(func), *this);
242 fiber->setFunction(std::ref(*funcLoc));
// The fiber's own address is stored as its data value.
245 fiber->data_ = reinterpret_cast<intptr_t>(fiber);
246 readyFibers_.push_back(*fiber);
// NOTE(review): a guard around the observer call (line 247) may be missing.
248 observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
251 ensureLoopScheduled();
// Wraps func in a RemoteTask and inserts it into remoteTaskQueue_;
// loopUntilNoReady() later sweeps that queue and runs each task on a fiber.
// NOTE(review): embedded lines 256, 258, 264 and 266 are missing, so the start
// of the expression that produces `task` and the first part of the condition
// below are not visible in this listing.
254 template <typename F>
255 void FiberManager::addTaskRemote(F&& func) {
// May be nullptr; the Unsafe getter does not check.
257 auto currentFm = getFiberManagerUnsafe();
// When the caller has a current fiber and its manager uses the same local-data
// type, the task is created with that fiber's localData_.
259 currentFm->currentFiber_ &&
260 currentFm->localType_ == localType_) {
261 return folly::make_unique<RemoteTask>(
262 std::forward<F>(func),
263 currentFm->currentFiber_->localData_);
// Otherwise the task is created from the functor alone.
265 return folly::make_unique<RemoteTask>(std::forward<F>(func));
// Ownership passes to the queue as a raw pointer; the loop is scheduled through
// the controller's thread-safe entry point when insertHead() returns true.
267 if (remoteTaskQueue_.insertHead(task.release())) {
268 loopController_->scheduleThreadSafe();
272 template <typename X>
273 struct IsRvalueRefTry { static const bool value = false; };
274 template <typename T>
275 struct IsRvalueRefTry<folly::Try<T>&&> { static const bool value = true; };
277 // We need this to be in a struct, not inlined in addTaskFinally, because clang
278 // crashes otherwise.
// Helper for addTaskFinally(): provides the nested Func and Finally wrappers.
// Func runs the task functor F and stores its outcome in a slot owned by
// Finally; Finally passes that stored outcome to the functor G.
// NOTE(review): the embedded numbering has many gaps (281-283, 285, 288-292,
// 294-298, 300, 303-304, 306-315, 317-321, 324-325, 327, 329-336, 338-340), so
// the class heads, call-operator signatures, several members and the closing
// lines are not visible in this listing.
279 template <typename F, typename G>
280 struct FiberManager::AddTaskFinallyHelper {
// Type produced by calling F with no arguments.
284 typedef typename std::result_of<F()>::type Result;
// True when a Func and a Finally together fit within Fiber::kUserBufferSize;
// addTaskFinally() lays them out back to back in that case.
286 static constexpr bool allocateInBuffer =
287 sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize;
293 finally_(std::move(finally)),
// Pass the stored outcome to the finally functor; exceptions are reported to
// the manager's exceptionCallback_ with the context string below.
299 finally_(std::move(*result_));
301 fm_.exceptionCallback_(std::current_exception(),
302 "running Finally functor");
// NOTE(review): both branches of this check are missing from the listing.
305 if (allocateInBuffer) {
// Result slot; Func writes it through the reference it holds (see its
// constructor below).
316 folly::Optional<folly::Try<Result>> result_;
// Func binds its result_ reference to the Finally object's result_ slot.
322 Func(F&& func, Finally& finally) :
323 func_(std::move(func)), result_(finally.result_) {}
// Invoke the functor through folly::makeTryWith and store the resulting Try in
// the shared slot.
326 result_ = folly::makeTryWith(std::move(func_));
// NOTE(review): both branches of this check are missing from the listing.
328 if (allocateInBuffer) {
337 folly::Optional<folly::Try<Result>>& result_;
// Schedules func to run on a fiber of this manager and registers finally as
// the fiber's finally-function (via setFunctionFinally).
//
// Contract, as stated by the assertion messages below: the first argument of
// finally must be Try<T>&&, and T must be convertible from func()'s return
// type.
341 template <typename F, typename G>
342 void FiberManager::addTaskFinally(F&& func, G&& finally) {
343 typedef typename std::result_of<F()>::type Result;
// NOTE(review): the lines that open the two compile-time checks (344-345,
// 348-350, 354) are missing from this listing; only their conditions and
// messages are visible.
346 IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
347 "finally(arg): arg must be Try<T>&&");
351 typename std::remove_reference<
352 typename FirstArgOf<G>::type
353 >::type::element_type
355 "finally(Try<T>&&): T must be convertible from func()'s return type");
357 auto fiber = getFiber();
358 initLocalData(*fiber);
360 typedef AddTaskFinallyHelper<F,G> Helper;
// In-buffer layout: the Func object first, the Finally object directly after it.
362 if (Helper::allocateInBuffer) {
363 auto funcLoc = static_cast<typename Helper::Func*>(
364 fiber->getUserBuffer());
365 auto finallyLoc = static_cast<typename Helper::Finally*>(
366 static_cast<void*>(funcLoc + 1));
// Finally is constructed first because Func's constructor takes a reference to it.
368 new (finallyLoc) typename Helper::Finally(std::move(finally), *this);
369 new (funcLoc) typename Helper::Func(std::move(func), *finallyLoc);
371 fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
// Otherwise both wrappers are allocated on the heap, in the same order.
// NOTE(review): the "else" line (372) is missing from this listing, and the
// release of these allocations is not visible in this function.
373 auto finallyLoc = new typename Helper::Finally(std::move(finally), *this);
374 auto funcLoc = new typename Helper::Func(std::move(func), *finallyLoc);
376 fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
// The fiber's own address is stored as its data value.
379 fiber->data_ = reinterpret_cast<intptr_t>(fiber);
380 readyFibers_.push_back(*fiber);
382 observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
385 ensureLoopScheduled();
// Forwards func to runInMainContextHelper() and returns whatever it returns.
// The helper has two overloads, selected on whether func() returns void.
388 template <typename F>
389 typename std::result_of<F()>::type
390 FiberManager::runInMainContext(F&& func) {
391 return runInMainContextHelper(std::forward<F>(func));
// Overload for functors whose call result is not void.
//
// Wraps func in a local lambda that stores its outcome in a folly::Try, makes
// immediateFunc_ refer to that lambda, preempts the active fiber with
// AWAITING_IMMEDIATE (runReadyFiber() handles that state), and afterwards
// returns the stored value.
394 template <typename F>
395 inline typename std::enable_if<
396 !std::is_same<typename std::result_of<F()>::type, void>::value,
397 typename std::result_of<F()>::type>::type
398 FiberManager::runInMainContextHelper(F&& func) {
// NOTE(review): the body for the no-active-fiber case (embedded lines 400-402)
// is missing from this listing; presumably func is called directly -- confirm.
399 if (UNLIKELY(activeFiber_ == nullptr)) {
403 typedef typename std::result_of<F()>::type Result;
405 folly::Try<Result> result;
// Captures by reference: the lambda writes into the local `result`.
406 auto f = [&func, &result]() mutable {
407 result = folly::makeTryWith(std::forward<F>(func));
// std::ref: immediateFunc_ refers to the local lambda instead of copying it.
410 immediateFunc_ = std::ref(f);
411 activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
413 return std::move(result.value());
// Overload for functors whose call result is void.
//
// Makes immediateFunc_ refer to func and preempts the active fiber with
// AWAITING_IMMEDIATE (runReadyFiber() handles that state).
// NOTE(review): embedded line 419 (the remainder of the return type) and lines
// 422-425 (the body for the no-active-fiber case) are missing from this
// listing.
416 template <typename F>
417 inline typename std::enable_if<
418 std::is_same<typename std::result_of<F()>::type, void>::value,
420 FiberManager::runInMainContextHelper(F&& func) {
421 if (UNLIKELY(activeFiber_ == nullptr)) {
// std::ref: immediateFunc_ refers to the caller's functor instead of copying it.
426 immediateFunc_ = std::ref(func);
427 activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
// Returns the manager recorded in currentFiberManager_ by reference.
// Asserts that one is set.
430 inline FiberManager& FiberManager::getFiberManager() {
431 assert(currentFiberManager_ != nullptr);
432 return *currentFiberManager_;
// Returns currentFiberManager_ as a pointer without checking it; callers in
// this file test the result for null before use.
435 inline FiberManager* FiberManager::getFiberManagerUnsafe() {
436 return currentFiberManager_;
// Returns true when activeFiber_ is set.
439 inline bool FiberManager::hasActiveFiber() const {
440 return activeFiber_ != nullptr;
// Preempts the active fiber with state YIELDED; runReadyFiber() then marks it
// READY_TO_RUN and appends it to yieldedFibers_.
// Asserts that this manager is the current one and that the active fiber is
// set and RUNNING.
443 inline void FiberManager::yield() {
444 assert(currentFiberManager_ == this);
445 assert(activeFiber_ != nullptr);
446 assert(activeFiber_->state_ == Fiber::RUNNING);
447 activeFiber_->preempt(Fiber::YIELDED);
// Returns a reference to local data of type T.
// When T is this manager's local-data type (localType_) and a fiber is
// current, the data comes from that fiber's localData_; otherwise the
// per-thread instance from localThread<T>() is returned.
450 template <typename T>
451 T& FiberManager::local() {
452 if (std::type_index(typeid(T)) == localType_ && currentFiber_) {
453 return currentFiber_->localData_.get<T>();
455 return localThread<T>();
// Provides a T held in a function-local static: `thread_local T` in one
// preprocessor branch, folly's ThreadLocal<T> in the other (per the comment
// below, for OS X).
// NOTE(review): the preprocessor condition (line 460), both return statements
// and the closing #endif are missing from this listing.
458 template <typename T>
459 T& FiberManager::localThread() {
461 static thread_local T t;
463 #else // osx doesn't support thread_local
464 static ThreadLocal<T> t;
// Copies the current fiber's local data into `fiber`.
// The copy happens only when a current manager exists, it has a current fiber,
// and its local-data type equals this manager's localType_.
469 inline void FiberManager::initLocalData(Fiber& fiber) {
// May be nullptr, hence the check below.
470 auto fm = getFiberManagerUnsafe();
471 if (fm && fm->currentFiber_ && fm->localType_ == localType_) {
472 fiber.localData_ = fm->currentFiber_->localData_;
// Constructs a FiberManager whose local-data type is LocalT.
// Takes ownership of the loop controller, runs the options through
// preprocessOptions(), installs a default exception callback that logs at
// DFATAL, creates the timeout manager, and registers this manager with the
// loop controller.
// NOTE(review): embedded lines 478, 480, 484, 490-491 and 494-496 are missing,
// so some parameters (including `options`), the try line and the catch-all
// line of the default callback are not visible in this listing.
476 template <typename LocalT>
477 FiberManager::FiberManager(
479 std::unique_ptr<LoopController> loopController__,
481 loopController_(std::move(loopController__)),
482 options_(preprocessOptions(std::move(options))),
// Default callback: rethrows eptr so the exception's dynamic type and message
// can be logged together with the caller-supplied context string.
483 exceptionCallback_([](std::exception_ptr eptr, std::string context) {
485 std::rethrow_exception(eptr);
486 } catch (const std::exception& e) {
487 LOG(DFATAL) << "Exception " << typeid(e).name()
488 << " with message '" << e.what() << "' was thrown in "
489 << "FiberManager with context '" << context << "'";
// Message used when the exception could not be reported with type and text.
492 LOG(DFATAL) << "Unknown exception was thrown in FiberManager with "
493 << "context '" << context << "'";
497 timeoutManager_(std::make_shared<TimeoutController>(*loopController_)),
498 localType_(typeid(LocalT)) {
499 loopController_->setFiberManager(this);
// Calls func with a Promise<Result> bound to a local folly::Try and a baton,
// from inside the callable passed to baton.wait(), then returns the value
// taken out of that Try with folly::moveFromTry.
// Result is the value_type of func's first argument type.
// NOTE(review): the declaration of `baton` (embedded lines 508-509) and the
// lambda's closing line are missing from this listing.
502 template <typename F>
503 typename FirstArgOf<F>::type::value_type
504 inline await(F&& func) {
505 typedef typename FirstArgOf<F>::type::value_type Result;
507 folly::Try<Result> result;
// Captures by reference: the promise is built over the local result and baton.
510 baton.wait([&func, &result, &baton]() mutable {
511 func(Promise<Result>(result, baton));
514 return folly::moveFromTry(std::move(result));