2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 #include <folly/CPortability.h>
21 #include <folly/Memory.h>
22 #include <folly/Optional.h>
23 #include <folly/Portability.h>
24 #include <folly/ScopeGuard.h>
26 #include <folly/ThreadLocal.h>
28 #include <folly/experimental/fibers/Baton.h>
29 #include <folly/experimental/fibers/Fiber.h>
30 #include <folly/experimental/fibers/LoopController.h>
31 #include <folly/experimental/fibers/Promise.h>
32 #include <folly/futures/Promise.h>
33 #include <folly/futures/Try.h>
35 namespace folly { namespace fibers {
39 inline FiberManager::Options preprocessOptions(FiberManager::Options opts) {
40 #ifdef FOLLY_SANITIZE_ADDRESS
41 /* ASAN needs a lot of extra stack space.
42 16x is a conservative estimate, 8x also worked with tests
43 where it mattered. Note that overallocating here does not necessarily
44 increase RSS, since unused memory is pretty much free. */
52 inline void FiberManager::ensureLoopScheduled() {
53 if (isLoopScheduled_) {
57 isLoopScheduled_ = true;
58 loopController_->schedule();
61 inline void FiberManager::runReadyFiber(Fiber* fiber) {
63 assert(currentFiber_ == nullptr);
64 assert(activeFiber_ == nullptr);
67 assert(fiber->state_ == Fiber::NOT_STARTED ||
68 fiber->state_ == Fiber::READY_TO_RUN);
69 currentFiber_ = fiber;
70 fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
72 observer_->starting(reinterpret_cast<uintptr_t>(fiber));
75 while (fiber->state_ == Fiber::NOT_STARTED ||
76 fiber->state_ == Fiber::READY_TO_RUN) {
78 jumpContext(&mainContext_, &fiber->fcontext_, fiber->data_);
79 if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) {
83 exceptionCallback_(std::current_exception(), "running immediateFunc_");
85 immediateFunc_ = nullptr;
86 fiber->state_ = Fiber::READY_TO_RUN;
90 if (fiber->state_ == Fiber::AWAITING) {
94 observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
96 currentFiber_ = nullptr;
97 fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
98 } else if (fiber->state_ == Fiber::INVALID) {
99 assert(fibersActive_ > 0);
101 // Making sure that task functor is deleted once task is complete.
102 // NOTE: we must do it on main context, as the fiber is not
103 // running at this point.
104 fiber->func_ = nullptr;
105 fiber->resultFunc_ = nullptr;
106 if (fiber->finallyFunc_) {
108 fiber->finallyFunc_();
110 exceptionCallback_(std::current_exception(), "running finallyFunc_");
112 fiber->finallyFunc_ = nullptr;
114 // Make sure LocalData is not accessible from its destructor
116 observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
118 currentFiber_ = nullptr;
119 fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
120 fiber->localData_.reset();
121 fiber->rcontext_.reset();
123 if (fibersPoolSize_ < options_.maxFibersPoolSize ||
124 options_.fibersPoolResizePeriodMs > 0) {
125 fibersPool_.push_front(*fiber);
129 assert(fibersAllocated_ > 0);
132 } else if (fiber->state_ == Fiber::YIELDED) {
134 observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
136 currentFiber_ = nullptr;
137 fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
138 fiber->state_ = Fiber::READY_TO_RUN;
139 yieldedFibers_.push_back(*fiber);
143 inline bool FiberManager::loopUntilNoReady() {
144 // Support nested FiberManagers
145 auto originalFiberManager = this;
146 std::swap(currentFiberManager_, originalFiberManager);
149 isLoopScheduled_ = false;
150 if (!readyFibers_.empty()) {
151 ensureLoopScheduled();
153 std::swap(currentFiberManager_, originalFiberManager);
154 CHECK_EQ(this, originalFiberManager);
157 bool hadRemoteFiber = true;
158 while (hadRemoteFiber) {
159 hadRemoteFiber = false;
161 while (!readyFibers_.empty()) {
162 auto& fiber = readyFibers_.front();
163 readyFibers_.pop_front();
164 runReadyFiber(&fiber);
167 remoteReadyQueue_.sweep(
168 [this, &hadRemoteFiber] (Fiber* fiber) {
169 runReadyFiber(fiber);
170 hadRemoteFiber = true;
174 remoteTaskQueue_.sweep(
175 [this, &hadRemoteFiber] (RemoteTask* taskPtr) {
176 std::unique_ptr<RemoteTask> task(taskPtr);
177 auto fiber = getFiber();
178 if (task->localData) {
179 fiber->localData_ = *task->localData;
181 fiber->rcontext_ = std::move(task->rcontext);
183 fiber->setFunction(std::move(task->func));
184 fiber->data_ = reinterpret_cast<intptr_t>(fiber);
186 observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
188 runReadyFiber(fiber);
189 hadRemoteFiber = true;
195 for (auto& yielded : yieldedFibers_) {
196 observer_->runnable(reinterpret_cast<uintptr_t>(&yielded));
199 readyFibers_.splice(readyFibers_.end(), yieldedFibers_);
201 return fibersActive_ > 0;
204 // We need this to be in a struct, not inlined in addTask, because clang crashes
206 template <typename F>
207 struct FiberManager::AddTaskHelper {
210 static constexpr bool allocateInBuffer =
211 sizeof(Func) <= Fiber::kUserBufferSize;
215 Func(F&& func, FiberManager& fm) :
216 func_(std::forward<F>(func)), fm_(fm) {}
222 fm_.exceptionCallback_(std::current_exception(),
223 "running Func functor");
225 if (allocateInBuffer) {
238 template <typename F>
239 void FiberManager::addTask(F&& func) {
240 typedef AddTaskHelper<F> Helper;
242 auto fiber = getFiber();
243 initLocalData(*fiber);
245 if (Helper::allocateInBuffer) {
246 auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
247 new (funcLoc) typename Helper::Func(std::forward<F>(func), *this);
249 fiber->setFunction(std::ref(*funcLoc));
251 auto funcLoc = new typename Helper::Func(std::forward<F>(func), *this);
253 fiber->setFunction(std::ref(*funcLoc));
256 fiber->data_ = reinterpret_cast<intptr_t>(fiber);
257 readyFibers_.push_back(*fiber);
259 observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
262 ensureLoopScheduled();
265 template <typename F>
266 auto FiberManager::addTaskFuture(F&& func)
267 -> folly::Future<typename std::result_of<F()>::type> {
268 using T = typename std::result_of<F()>::type;
270 auto f = p.getFuture();
271 addTaskFinally([func = std::forward<F>(func)]() mutable { return func(); },
272 [p = std::move(p)](folly::Try<T> && t) mutable {
273 p.setTry(std::move(t));
278 template <typename F>
279 void FiberManager::addTaskRemote(F&& func) {
280 // addTaskRemote indirectly requires wrapping the function in a
281 // std::function, which must be copyable. As move-only lambdas may be
282 // passed in we wrap it first in a move wrapper and then capture the wrapped
284 auto functionWrapper = [f = folly::makeMoveWrapper(
285 std::forward<F>(func))]() mutable {
289 auto currentFm = getFiberManagerUnsafe();
291 currentFm->currentFiber_ &&
292 currentFm->localType_ == localType_) {
293 return folly::make_unique<RemoteTask>(
294 std::move(functionWrapper), currentFm->currentFiber_->localData_);
296 return folly::make_unique<RemoteTask>(std::move(functionWrapper));
299 [&]() { return remoteTaskQueue_.insertHead(task.release()); };
300 loopController_->scheduleThreadSafe(std::ref(insertHead));
303 template <typename F>
304 auto FiberManager::addTaskRemoteFuture(F&& func)
305 -> folly::Future<typename std::result_of<F()>::type> {
306 folly::Promise<typename std::result_of<F()>::type> p;
307 auto f = p.getFuture();
309 [ p = std::move(p), func = std::forward<F>(func), this ]() mutable {
310 auto t = folly::makeTryWith(std::forward<F>(func));
311 runInMainContext([&]() { p.setTry(std::move(t)); });
316 template <typename X>
317 struct IsRvalueRefTry { static const bool value = false; };
318 template <typename T>
319 struct IsRvalueRefTry<folly::Try<T>&&> { static const bool value = true; };
321 // We need this to be in a struct, not inlined in addTaskFinally, because clang
322 // crashes otherwise.
323 template <typename F, typename G>
324 struct FiberManager::AddTaskFinallyHelper {
328 typedef typename std::result_of<F()>::type Result;
330 static constexpr bool allocateInBuffer =
331 sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize;
337 finally_(std::move(finally)),
343 finally_(std::move(*result_));
345 fm_.exceptionCallback_(std::current_exception(),
346 "running Finally functor");
349 if (allocateInBuffer) {
360 folly::Optional<folly::Try<Result>> result_;
366 Func(F&& func, Finally& finally) :
367 func_(std::move(func)), result_(finally.result_) {}
370 result_ = folly::makeTryWith(std::move(func_));
372 if (allocateInBuffer) {
381 folly::Optional<folly::Try<Result>>& result_;
385 template <typename F, typename G>
386 void FiberManager::addTaskFinally(F&& func, G&& finally) {
387 typedef typename std::result_of<F()>::type Result;
390 IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
391 "finally(arg): arg must be Try<T>&&");
395 typename std::remove_reference<
396 typename FirstArgOf<G>::type
397 >::type::element_type
399 "finally(Try<T>&&): T must be convertible from func()'s return type");
401 auto fiber = getFiber();
402 initLocalData(*fiber);
404 typedef AddTaskFinallyHelper<F,G> Helper;
406 if (Helper::allocateInBuffer) {
407 auto funcLoc = static_cast<typename Helper::Func*>(
408 fiber->getUserBuffer());
409 auto finallyLoc = static_cast<typename Helper::Finally*>(
410 static_cast<void*>(funcLoc + 1));
412 new (finallyLoc) typename Helper::Finally(std::move(finally), *this);
413 new (funcLoc) typename Helper::Func(std::move(func), *finallyLoc);
415 fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
417 auto finallyLoc = new typename Helper::Finally(std::move(finally), *this);
418 auto funcLoc = new typename Helper::Func(std::move(func), *finallyLoc);
420 fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
423 fiber->data_ = reinterpret_cast<intptr_t>(fiber);
424 readyFibers_.push_back(*fiber);
426 observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
429 ensureLoopScheduled();
432 template <typename F>
433 typename std::result_of<F()>::type
434 FiberManager::runInMainContext(F&& func) {
435 if (UNLIKELY(activeFiber_ == nullptr)) {
439 typedef typename std::result_of<F()>::type Result;
441 folly::Try<Result> result;
442 auto f = [&func, &result]() mutable {
443 result = folly::makeTryWith(std::forward<F>(func));
446 immediateFunc_ = std::ref(f);
447 activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
449 return std::move(result).value();
452 inline FiberManager& FiberManager::getFiberManager() {
453 assert(currentFiberManager_ != nullptr);
454 return *currentFiberManager_;
457 inline FiberManager* FiberManager::getFiberManagerUnsafe() {
458 return currentFiberManager_;
461 inline bool FiberManager::hasActiveFiber() const {
462 return activeFiber_ != nullptr;
465 inline void FiberManager::yield() {
466 assert(currentFiberManager_ == this);
467 assert(activeFiber_ != nullptr);
468 assert(activeFiber_->state_ == Fiber::RUNNING);
469 activeFiber_->preempt(Fiber::YIELDED);
472 template <typename T>
473 T& FiberManager::local() {
474 if (std::type_index(typeid(T)) == localType_ && currentFiber_) {
475 return currentFiber_->localData_.get<T>();
477 return localThread<T>();
480 template <typename T>
481 T& FiberManager::localThread() {
483 static thread_local T t;
485 #else // osx doesn't support thread_local
486 static ThreadLocal<T> t;
491 inline void FiberManager::initLocalData(Fiber& fiber) {
492 auto fm = getFiberManagerUnsafe();
493 if (fm && fm->currentFiber_ && fm->localType_ == localType_) {
494 fiber.localData_ = fm->currentFiber_->localData_;
496 fiber.rcontext_ = RequestContext::saveContext();
499 template <typename LocalT>
500 FiberManager::FiberManager(
502 std::unique_ptr<LoopController> loopController__,
504 loopController_(std::move(loopController__)),
505 stackAllocator_(options.useGuardPages),
506 options_(preprocessOptions(std::move(options))),
507 exceptionCallback_([](std::exception_ptr eptr, std::string context) {
509 std::rethrow_exception(eptr);
510 } catch (const std::exception& e) {
511 LOG(DFATAL) << "Exception " << typeid(e).name()
512 << " with message '" << e.what() << "' was thrown in "
513 << "FiberManager with context '" << context << "'";
516 LOG(DFATAL) << "Unknown exception was thrown in FiberManager with "
517 << "context '" << context << "'";
521 timeoutManager_(std::make_shared<TimeoutController>(*loopController_)),
522 fibersPoolResizer_(*this),
523 localType_(typeid(LocalT)) {
524 loopController_->setFiberManager(this);
527 template <typename F>
528 typename FirstArgOf<F>::type::value_type
529 inline await(F&& func) {
530 typedef typename FirstArgOf<F>::type::value_type Result;
532 folly::Try<Result> result;
535 baton.wait([&func, &result, &baton]() mutable {
536 func(Promise<Result>(result, baton));
539 return folly::moveFromTry(result);