2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 #include <folly/CPortability.h>
21 #include <folly/Memory.h>
22 #include <folly/Optional.h>
23 #include <folly/Portability.h>
24 #include <folly/ScopeGuard.h>
26 #include <folly/ThreadLocal.h>
28 #include <folly/experimental/fibers/Baton.h>
29 #include <folly/experimental/fibers/Fiber.h>
30 #include <folly/experimental/fibers/LoopController.h>
31 #include <folly/experimental/fibers/Promise.h>
32 #include <folly/futures/Promise.h>
33 #include <folly/futures/Try.h>
35 namespace folly { namespace fibers {
39 inline FiberManager::Options preprocessOptions(FiberManager::Options opts) {
40 #ifdef FOLLY_SANITIZE_ADDRESS
41 /* ASAN needs a lot of extra stack space.
42 16x is a conservative estimate, 8x also worked with tests
43 where it mattered. Note that overallocating here does not necessarily
44 increase RSS, since unused memory is pretty much free. */
52 inline void FiberManager::ensureLoopScheduled() {
53 if (isLoopScheduled_) {
57 isLoopScheduled_ = true;
58 loopController_->schedule();
61 inline intptr_t FiberManager::activateFiber(Fiber* fiber) {
62 DCHECK_EQ(activeFiber_, (Fiber*)nullptr);
64 #ifdef FOLLY_SANITIZE_ADDRESS
65 registerFiberActivationWithAsan(fiber);
69 return jumpContext(&mainContext_, &fiber->fcontext_, fiber->data_);
72 inline intptr_t FiberManager::deactivateFiber(Fiber* fiber) {
73 DCHECK_EQ(activeFiber_, fiber);
75 #ifdef FOLLY_SANITIZE_ADDRESS
76 registerFiberDeactivationWithAsan(fiber);
79 activeFiber_ = nullptr;
80 return jumpContext(&fiber->fcontext_, &mainContext_, 0);
83 inline void FiberManager::runReadyFiber(Fiber* fiber) {
85 assert(currentFiber_ == nullptr);
86 assert(activeFiber_ == nullptr);
89 assert(fiber->state_ == Fiber::NOT_STARTED ||
90 fiber->state_ == Fiber::READY_TO_RUN);
91 currentFiber_ = fiber;
92 fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
94 observer_->starting(reinterpret_cast<uintptr_t>(fiber));
97 while (fiber->state_ == Fiber::NOT_STARTED ||
98 fiber->state_ == Fiber::READY_TO_RUN) {
100 if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) {
104 exceptionCallback_(std::current_exception(), "running immediateFunc_");
106 immediateFunc_ = nullptr;
107 fiber->state_ = Fiber::READY_TO_RUN;
111 if (fiber->state_ == Fiber::AWAITING) {
113 awaitFunc_ = nullptr;
115 observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
117 currentFiber_ = nullptr;
118 fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
119 } else if (fiber->state_ == Fiber::INVALID) {
120 assert(fibersActive_ > 0);
122 // Making sure that task functor is deleted once task is complete.
123 // NOTE: we must do it on main context, as the fiber is not
124 // running at this point.
125 fiber->func_ = nullptr;
126 fiber->resultFunc_ = nullptr;
127 if (fiber->finallyFunc_) {
129 fiber->finallyFunc_();
131 exceptionCallback_(std::current_exception(), "running finallyFunc_");
133 fiber->finallyFunc_ = nullptr;
135 // Make sure LocalData is not accessible from its destructor
137 observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
139 currentFiber_ = nullptr;
140 fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
141 fiber->localData_.reset();
142 fiber->rcontext_.reset();
144 if (fibersPoolSize_ < options_.maxFibersPoolSize ||
145 options_.fibersPoolResizePeriodMs > 0) {
146 fibersPool_.push_front(*fiber);
150 assert(fibersAllocated_ > 0);
153 } else if (fiber->state_ == Fiber::YIELDED) {
155 observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
157 currentFiber_ = nullptr;
158 fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
159 fiber->state_ = Fiber::READY_TO_RUN;
160 yieldedFibers_.push_back(*fiber);
164 inline bool FiberManager::loopUntilNoReady() {
165 // Support nested FiberManagers
166 auto originalFiberManager = this;
167 std::swap(currentFiberManager_, originalFiberManager);
170 isLoopScheduled_ = false;
171 if (!readyFibers_.empty()) {
172 ensureLoopScheduled();
174 std::swap(currentFiberManager_, originalFiberManager);
175 CHECK_EQ(this, originalFiberManager);
178 bool hadRemoteFiber = true;
179 while (hadRemoteFiber) {
180 hadRemoteFiber = false;
182 while (!readyFibers_.empty()) {
183 auto& fiber = readyFibers_.front();
184 readyFibers_.pop_front();
185 runReadyFiber(&fiber);
188 remoteReadyQueue_.sweep(
189 [this, &hadRemoteFiber] (Fiber* fiber) {
190 runReadyFiber(fiber);
191 hadRemoteFiber = true;
195 remoteTaskQueue_.sweep(
196 [this, &hadRemoteFiber] (RemoteTask* taskPtr) {
197 std::unique_ptr<RemoteTask> task(taskPtr);
198 auto fiber = getFiber();
199 if (task->localData) {
200 fiber->localData_ = *task->localData;
202 fiber->rcontext_ = std::move(task->rcontext);
204 fiber->setFunction(std::move(task->func));
205 fiber->data_ = reinterpret_cast<intptr_t>(fiber);
207 observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
209 runReadyFiber(fiber);
210 hadRemoteFiber = true;
216 for (auto& yielded : yieldedFibers_) {
217 observer_->runnable(reinterpret_cast<uintptr_t>(&yielded));
220 readyFibers_.splice(readyFibers_.end(), yieldedFibers_);
222 return fibersActive_ > 0;
225 // We need this to be in a struct, not inlined in addTask, because clang crashes
227 template <typename F>
228 struct FiberManager::AddTaskHelper {
231 static constexpr bool allocateInBuffer =
232 sizeof(Func) <= Fiber::kUserBufferSize;
236 Func(F&& func, FiberManager& fm) :
237 func_(std::forward<F>(func)), fm_(fm) {}
243 fm_.exceptionCallback_(std::current_exception(),
244 "running Func functor");
246 if (allocateInBuffer) {
259 template <typename F>
260 void FiberManager::addTask(F&& func) {
261 typedef AddTaskHelper<F> Helper;
263 auto fiber = getFiber();
264 initLocalData(*fiber);
266 if (Helper::allocateInBuffer) {
267 auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
268 new (funcLoc) typename Helper::Func(std::forward<F>(func), *this);
270 fiber->setFunction(std::ref(*funcLoc));
272 auto funcLoc = new typename Helper::Func(std::forward<F>(func), *this);
274 fiber->setFunction(std::ref(*funcLoc));
277 fiber->data_ = reinterpret_cast<intptr_t>(fiber);
278 readyFibers_.push_back(*fiber);
280 observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
283 ensureLoopScheduled();
286 template <typename F>
287 auto FiberManager::addTaskFuture(F&& func)
288 -> folly::Future<typename std::result_of<F()>::type> {
289 using T = typename std::result_of<F()>::type;
291 auto f = p.getFuture();
292 addTaskFinally([func = std::forward<F>(func)]() mutable { return func(); },
293 [p = std::move(p)](folly::Try<T> && t) mutable {
294 p.setTry(std::move(t));
299 template <typename F>
300 void FiberManager::addTaskRemote(F&& func) {
302 auto currentFm = getFiberManagerUnsafe();
304 currentFm->currentFiber_ &&
305 currentFm->localType_ == localType_) {
306 return folly::make_unique<RemoteTask>(
307 std::forward<F>(func), currentFm->currentFiber_->localData_);
309 return folly::make_unique<RemoteTask>(std::forward<F>(func));
312 [&]() { return remoteTaskQueue_.insertHead(task.release()); };
313 loopController_->scheduleThreadSafe(std::ref(insertHead));
316 template <typename F>
317 auto FiberManager::addTaskRemoteFuture(F&& func)
318 -> folly::Future<typename std::result_of<F()>::type> {
319 folly::Promise<typename std::result_of<F()>::type> p;
320 auto f = p.getFuture();
322 [ p = std::move(p), func = std::forward<F>(func), this ]() mutable {
323 auto t = folly::makeTryWith(std::forward<F>(func));
324 runInMainContext([&]() { p.setTry(std::move(t)); });
329 template <typename X>
330 struct IsRvalueRefTry { static const bool value = false; };
331 template <typename T>
332 struct IsRvalueRefTry<folly::Try<T>&&> { static const bool value = true; };
334 // We need this to be in a struct, not inlined in addTaskFinally, because clang
335 // crashes otherwise.
336 template <typename F, typename G>
337 struct FiberManager::AddTaskFinallyHelper {
340 typedef typename std::result_of<F()>::type Result;
344 Finally(G finally, FiberManager& fm)
345 : finally_(std::move(finally)), fm_(fm) {}
349 finally_(std::move(*result_));
351 fm_.exceptionCallback_(std::current_exception(),
352 "running Finally functor");
355 if (allocateInBuffer) {
366 folly::Optional<folly::Try<Result>> result_;
372 Func(F func, Finally& finally)
373 : func_(std::move(func)), result_(finally.result_) {}
376 result_ = folly::makeTryWith(std::move(func_));
378 if (allocateInBuffer) {
387 folly::Optional<folly::Try<Result>>& result_;
390 static constexpr bool allocateInBuffer =
391 sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize;
394 template <typename F, typename G>
395 void FiberManager::addTaskFinally(F&& func, G&& finally) {
396 typedef typename std::result_of<F()>::type Result;
399 IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
400 "finally(arg): arg must be Try<T>&&");
404 typename std::remove_reference<
405 typename FirstArgOf<G>::type
406 >::type::element_type
408 "finally(Try<T>&&): T must be convertible from func()'s return type");
410 auto fiber = getFiber();
411 initLocalData(*fiber);
413 typedef AddTaskFinallyHelper<
414 typename std::decay<F>::type,
415 typename std::decay<G>::type>
418 if (Helper::allocateInBuffer) {
419 auto funcLoc = static_cast<typename Helper::Func*>(
420 fiber->getUserBuffer());
421 auto finallyLoc = static_cast<typename Helper::Finally*>(
422 static_cast<void*>(funcLoc + 1));
424 new (finallyLoc) typename Helper::Finally(std::forward<G>(finally), *this);
425 new (funcLoc) typename Helper::Func(std::forward<F>(func), *finallyLoc);
427 fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
430 new typename Helper::Finally(std::forward<G>(finally), *this);
432 new typename Helper::Func(std::forward<F>(func), *finallyLoc);
434 fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
437 fiber->data_ = reinterpret_cast<intptr_t>(fiber);
438 readyFibers_.push_back(*fiber);
440 observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
443 ensureLoopScheduled();
446 template <typename F>
447 typename std::result_of<F()>::type
448 FiberManager::runInMainContext(F&& func) {
449 if (UNLIKELY(activeFiber_ == nullptr)) {
453 typedef typename std::result_of<F()>::type Result;
455 folly::Try<Result> result;
456 auto f = [&func, &result]() mutable {
457 result = folly::makeTryWith(std::forward<F>(func));
460 immediateFunc_ = std::ref(f);
461 activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
463 return std::move(result).value();
466 inline FiberManager& FiberManager::getFiberManager() {
467 assert(currentFiberManager_ != nullptr);
468 return *currentFiberManager_;
471 inline FiberManager* FiberManager::getFiberManagerUnsafe() {
472 return currentFiberManager_;
475 inline bool FiberManager::hasActiveFiber() const {
476 return activeFiber_ != nullptr;
479 inline void FiberManager::yield() {
480 assert(currentFiberManager_ == this);
481 assert(activeFiber_ != nullptr);
482 assert(activeFiber_->state_ == Fiber::RUNNING);
483 activeFiber_->preempt(Fiber::YIELDED);
486 template <typename T>
487 T& FiberManager::local() {
488 if (std::type_index(typeid(T)) == localType_ && currentFiber_) {
489 return currentFiber_->localData_.get<T>();
491 return localThread<T>();
494 template <typename T>
495 T& FiberManager::localThread() {
497 static thread_local T t;
499 #else // osx doesn't support thread_local
500 static ThreadLocal<T> t;
505 inline void FiberManager::initLocalData(Fiber& fiber) {
506 auto fm = getFiberManagerUnsafe();
507 if (fm && fm->currentFiber_ && fm->localType_ == localType_) {
508 fiber.localData_ = fm->currentFiber_->localData_;
510 fiber.rcontext_ = RequestContext::saveContext();
513 template <typename LocalT>
514 FiberManager::FiberManager(
516 std::unique_ptr<LoopController> loopController__,
518 loopController_(std::move(loopController__)),
519 stackAllocator_(options.useGuardPages),
520 options_(preprocessOptions(std::move(options))),
521 exceptionCallback_([](std::exception_ptr eptr, std::string context) {
523 std::rethrow_exception(eptr);
524 } catch (const std::exception& e) {
525 LOG(DFATAL) << "Exception " << typeid(e).name()
526 << " with message '" << e.what() << "' was thrown in "
527 << "FiberManager with context '" << context << "'";
530 LOG(DFATAL) << "Unknown exception was thrown in FiberManager with "
531 << "context '" << context << "'";
535 timeoutManager_(std::make_shared<TimeoutController>(*loopController_)),
536 fibersPoolResizer_(*this),
537 localType_(typeid(LocalT)) {
538 loopController_->setFiberManager(this);
541 template <typename F>
542 typename FirstArgOf<F>::type::value_type
543 inline await(F&& func) {
544 typedef typename FirstArgOf<F>::type::value_type Result;
546 folly::Try<Result> result;
549 baton.wait([&func, &result, &baton]() mutable {
550 func(Promise<Result>(result, baton));
553 return folly::moveFromTry(result);