2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 #include <folly/CPortability.h>
21 #include <folly/experimental/fibers/Baton.h>
22 #include <folly/experimental/fibers/Fiber.h>
23 #include <folly/experimental/fibers/LoopController.h>
24 #include <folly/experimental/fibers/Promise.h>
25 #include <folly/futures/Try.h>
26 #include <folly/Memory.h>
27 #include <folly/Optional.h>
28 #include <folly/Portability.h>
29 #include <folly/ScopeGuard.h>
31 namespace folly { namespace fibers {
// Takes a FiberManager::Options by value and returns a FiberManager::Options.
// It is applied to the options handed to the FiberManager constructor before
// they are stored.
// NOTE(review): only the FOLLY_SANITIZE_ADDRESS guard and its explanatory
// comment are visible in this excerpt; the statement that actually adjusts
// the options and the return statement are not shown - confirm against the
// full file.
35 inline FiberManager::Options preprocessOptions(FiberManager::Options opts) {
36 #ifdef FOLLY_SANITIZE_ADDRESS
37 /* ASAN needs a lot of extra stack space.
38 16x is a conservative estimate, 8x also worked with tests
39 where it mattered. Note that overallocating here does not necessarily
40 increase RSS, since unused memory is pretty much free. */
// Requests a loop run from the loop controller, using isLoopScheduled_ as a
// guard flag: the flag is tested first, and it is set before schedule() is
// called. loopUntilNoReady() clears the flag again.
// NOTE(review): the body of the guard branch is not visible in this excerpt;
// presumably it returns early so that schedule() is requested only once per
// pending loop run - confirm.
48 inline void FiberManager::ensureLoopScheduled() {
49 if (isLoopScheduled_) {
53 isLoopScheduled_ = true;
54 loopController_->schedule();
// Runs one fiber from the main context. The fiber is resumed repeatedly while
// it is in the NOT_STARTED or READY_TO_RUN state; once it stops in another
// state, the tail of the function handles that state (AWAITING, INVALID or
// YIELDED).
//
// fiber: the fiber to run; asserted below to be NOT_STARTED or READY_TO_RUN.
57 inline void FiberManager::runReadyFiber(Fiber* fiber) {
// No fiber may be current or active when a new one is started.
59 assert(currentFiber_ == nullptr);
60 assert(activeFiber_ == nullptr);
63 assert(fiber->state_ == Fiber::NOT_STARTED ||
64 fiber->state_ == Fiber::READY_TO_RUN);
65 currentFiber_ = fiber;
67 observer_->starting();
70 while (fiber->state_ == Fiber::NOT_STARTED ||
71 fiber->state_ == Fiber::READY_TO_RUN) {
// Hand control to the fiber context; the state checks that follow inspect the
// state the fiber was left in.
73 jumpContext(&mainContext_, &fiber->fcontext_, fiber->data_);
// AWAITING_IMMEDIATE is the state runInMainContextHelper() preempts with after
// storing a functor in immediateFunc_. Failures are reported through
// exceptionCallback_, the functor is cleared, and the fiber is marked
// READY_TO_RUN so the enclosing loop resumes it.
// NOTE(review): the invocation of immediateFunc_ and the surrounding
// try/catch lines are not visible in this excerpt.
74 if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) {
78 exceptionCallback_(std::current_exception(), "running immediateFunc_");
80 immediateFunc_ = nullptr;
81 fiber->state_ = Fiber::READY_TO_RUN;
// The fiber stopped waiting on something: it is no longer the current fiber.
// NOTE(review): some statements of this branch are not visible here.
85 if (fiber->state_ == Fiber::AWAITING) {
91 currentFiber_ = nullptr;
// INVALID is handled as task completion: functors are released, the finally
// functor (if any) is run, local data is dropped and the fiber may be pooled.
92 } else if (fiber->state_ == Fiber::INVALID) {
93 assert(fibersActive_ > 0);
95 // Making sure that task functor is deleted once task is complete.
96 // NOTE: we must do it on main context, as the fiber is not
97 // running at this point.
98 fiber->func_ = nullptr;
99 fiber->resultFunc_ = nullptr;
// An exception from the finally functor is passed to exceptionCallback_ with
// the context string below; the functor is cleared afterwards.
100 if (fiber->finallyFunc_) {
102 fiber->finallyFunc_();
104 exceptionCallback_(std::current_exception(), "running finallyFunc_");
106 fiber->finallyFunc_ = nullptr;
108 // Make sure LocalData is not accessible from its destructor
110 observer_->stopped();
// currentFiber_ is cleared before the local data is reset, so local() falls
// back to localThread() while the data is being destroyed.
112 currentFiber_ = nullptr;
113 fiber->localData_.reset();
// Keep the finished fiber in fibersPool_ while the pool is below
// options_.maxFibersPoolSize.
// NOTE(review): the alternative branch is only partly visible (the assert on
// fibersAllocated_); presumably the fiber is released there - confirm.
115 if (fibersPoolSize_ < options_.maxFibersPoolSize) {
116 fibersPool_.push_front(*fiber);
120 assert(fibersAllocated_ > 0);
// A yielded fiber becomes runnable again immediately, but it is parked on
// yieldedFibers_, which loopUntilNoReady() moves to readyFibers_ at the end of
// its pass.
123 } else if (fiber->state_ == Fiber::YIELDED) {
125 observer_->stopped();
127 currentFiber_ = nullptr;
128 fiber->state_ = Fiber::READY_TO_RUN;
129 yieldedFibers_.push_back(*fiber);
// Runs fibers until a pass finds no remote work: each pass drains
// readyFibers_, then sweeps remoteReadyQueue_ and remoteTaskQueue_. Yielded
// fibers are moved back to readyFibers_ afterwards.
//
// Returns true while fibersActive_ is greater than zero.
133 inline bool FiberManager::loopUntilNoReady() {
// NOTE(review): the next four statements clear isLoopScheduled_, reschedule
// the loop if fibers are still ready, and reset currentFiberManager_, yet they
// are directly followed by currentFiberManager_ = this. Presumably they sit
// inside a scope-exit guard (folly/ScopeGuard.h is included) whose opening
// line is not visible in this excerpt - confirm.
135 isLoopScheduled_ = false;
136 if (!readyFibers_.empty()) {
137 ensureLoopScheduled();
139 currentFiberManager_ = nullptr;
142 currentFiberManager_ = this;
// Starts as true so the first pass always runs; each pass resets it and the
// sweep callbacks set it again when they ran something.
144 bool hadRemoteFiber = true;
145 while (hadRemoteFiber) {
146 hadRemoteFiber = false;
// The reference is taken before pop_front() and used afterwards.
// NOTE(review): presumably readyFibers_ is an intrusive list, so popping does
// not destroy the fiber - confirm against the container declaration.
148 while (!readyFibers_.empty()) {
149 auto& fiber = readyFibers_.front();
150 readyFibers_.pop_front();
151 runReadyFiber(&fiber);
// Fibers handed over through remoteReadyQueue_ are run directly.
154 remoteReadyQueue_.sweep(
155 [this, &hadRemoteFiber] (Fiber* fiber) {
156 runReadyFiber(fiber);
157 hadRemoteFiber = true;
// Tasks queued by addTaskRemote(): ownership of the raw pointer released there
// is taken back by the unique_ptr, a fiber is obtained from getFiber(), the
// captured local data (if any) is copied in, and the fiber is run at once.
161 remoteTaskQueue_.sweep(
162 [this, &hadRemoteFiber] (RemoteTask* taskPtr) {
163 std::unique_ptr<RemoteTask> task(taskPtr);
164 auto fiber = getFiber();
165 if (task->localData) {
166 fiber->localData_ = *task->localData;
169 fiber->setFunction(std::move(task->func));
170 fiber->data_ = reinterpret_cast<intptr_t>(fiber);
171 runReadyFiber(fiber);
172 hadRemoteFiber = true;
// Fibers parked by runReadyFiber() in the YIELDED case go to the back of the
// ready list.
177 readyFibers_.splice(readyFibers_.end(), yieldedFibers_);
179 return fibersActive_ > 0;
// Helper used by addTask(): its nested Func type wraps the user functor
// together with a reference to the owning FiberManager.
// NOTE(review): several lines of this struct are not visible in this excerpt.
182 // We need this to be in a struct, not inlined in addTask, because clang crashes
// otherwise.
184 template <typename F>
185 struct FiberManager::AddTaskHelper {
// True when a Func object fits into the fiber user buffer
// (Fiber::kUserBufferSize); addTask() then constructs it in place there.
188 static constexpr bool allocateInBuffer =
189 sizeof(Func) <= Fiber::kUserBufferSize;
// Stores the forwarded functor and the manager reference.
193 Func(F&& func, FiberManager& fm) :
194 func_(std::forward<F>(func)), fm_(fm) {}
// Exceptions are reported through the manager exceptionCallback_ with the
// context string below.
200 fm_.exceptionCallback_(std::current_exception(),
201 "running Func functor");
// NOTE(review): the body of this branch is not visible; presumably the object
// is destroyed in place when it lives in the user buffer and deleted
// otherwise - confirm.
203 if (allocateInBuffer) {
// Schedules func to run on a fiber of this manager.
//
// A fiber is obtained from getFiber(), local data is set up through
// initLocalData(), func is wrapped in AddTaskHelper<F>::Func, and the fiber is
// appended to readyFibers_ before the loop is scheduled.
//
// func: the functor to run; forwarded into the wrapper.
216 template <typename F>
217 void FiberManager::addTask(F&& func) {
218 typedef AddTaskHelper<F> Helper;
220 auto fiber = getFiber();
221 initLocalData(*fiber);
// When the wrapper fits, it is constructed with placement new inside the
// fiber user buffer, so no separate heap allocation is made for it.
223 if (Helper::allocateInBuffer) {
224 auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
225 new (funcLoc) typename Helper::Func(std::forward<F>(func), *this);
// The fiber only keeps a reference wrapper to the Func object.
227 fiber->setFunction(std::ref(*funcLoc));
// NOTE(review): the statements below redeclare funcLoc, so they belong to the
// alternative branch (wrapper allocated with new); the else line itself is
// not visible in this excerpt.
229 auto funcLoc = new typename Helper::Func(std::forward<F>(func), *this);
231 fiber->setFunction(std::ref(*funcLoc));
// data_ carries the fiber pointer; runReadyFiber() passes it to jumpContext().
234 fiber->data_ = reinterpret_cast<intptr_t>(fiber);
235 readyFibers_.push_back(*fiber);
237 ensureLoopScheduled();
// Queues func as a RemoteTask on remoteTaskQueue_; loopUntilNoReady() later
// sweeps that queue and runs each task on a fiber.
//
// func: the functor to run; forwarded into the RemoteTask.
240 template <typename F>
241 void FiberManager::addTaskRemote(F&& func) {
// NOTE(review): the two return statements below appear inside a void function
// and the variable task is used further down, so presumably the task is built
// by an immediately invoked lambda whose opening and closing lines are not
// visible in this excerpt - confirm.
243 auto currentFm = getFiberManagerUnsafe();
// When called from a fiber whose manager uses the same localType_, the task
// also receives the local data of that calling fiber.
245 currentFm->currentFiber_ &&
246 currentFm->localType_ == localType_) {
247 return folly::make_unique<RemoteTask>(
248 std::forward<F>(func),
249 currentFm->currentFiber_->localData_);
251 return folly::make_unique<RemoteTask>(std::forward<F>(func));
// Ownership of the task is released to the queue as a raw pointer (it is
// re-wrapped in a unique_ptr when the queue is swept). The thread-safe
// schedule call is made only when insertHead() returns true.
253 if (remoteTaskQueue_.insertHead(task.release())) {
254 loopController_->scheduleThreadSafe();
258 template <typename X>
259 struct IsRvalueRefTry { static const bool value = false; };
260 template <typename T>
261 struct IsRvalueRefTry<folly::Try<T>&&> { static const bool value = true; };
// Helper used by addTaskFinally(): Func runs the task and stores its outcome,
// Finally later consumes that outcome. The two objects share the result slot
// owned by Finally.
// NOTE(review): several lines of this struct are not visible in this excerpt.
263 // We need this to be in a struct, not inlined in addTaskFinally, because clang
264 // crashes otherwise.
265 template <typename F, typename G>
266 struct FiberManager::AddTaskFinallyHelper {
// Type produced by calling the task functor with no arguments.
270 typedef typename std::result_of<F()>::type Result;
// True when Func and Finally together fit into the fiber user buffer;
// addTaskFinally() then places them back to back in that buffer.
272 static constexpr bool allocateInBuffer =
273 sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize;
279 finally_(std::move(finally)),
// The stored result is moved into the finally functor; exceptions are
// reported through the manager exceptionCallback_ with the context below.
285 finally_(std::move(*result_));
287 fm_.exceptionCallback_(std::current_exception(),
288 "running Finally functor");
// NOTE(review): branch body not visible; presumably cleanup differs between
// in-buffer and heap placement - confirm.
291 if (allocateInBuffer) {
// Result slot owned by Finally; empty until Func assigns it.
302 folly::Optional<folly::Try<Result>> result_;
// Func keeps a reference to the result slot of the given Finally object, so
// that Finally object has to exist before Func is constructed.
308 Func(F&& func, Finally& finally) :
309 func_(std::move(func)), result_(finally.result_) {}
// The task outcome (value or exception) is captured as a Try.
312 result_ = folly::makeTryFunction(std::move(func_));
// NOTE(review): branch body not visible; see the matching note above.
314 if (allocateInBuffer) {
// Reference to Finally::result_ (bound in the constructor above).
323 folly::Optional<folly::Try<Result>>& result_;
// Schedules func to run on a fiber of this manager and registers finally to
// receive the outcome of func.
//
// func:    task functor, callable with no arguments.
// finally: functor whose first argument must be Try<T>&&, with T convertible
//          from the return type of func (both enforced by the static checks
//          whose messages appear below).
//
// NOTE(review): func and finally are forwarding references but are passed on
// with std::move rather than std::forward (addTask uses std::forward) -
// confirm this is intended when callers pass lvalues.
327 template <typename F, typename G>
328 void FiberManager::addTaskFinally(F&& func, G&& finally) {
329 typedef typename std::result_of<F()>::type Result;
// NOTE(review): the opening lines of the two static checks are not visible in
// this excerpt; only their conditions and messages are.
332 IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
333 "finally(arg): arg must be Try<T>&&");
337 typename std::remove_reference<
338 typename FirstArgOf<G>::type
339 >::type::element_type
341 "finally(Try<T>&&): T must be convertible from func()'s return type");
343 auto fiber = getFiber();
344 initLocalData(*fiber);
346 typedef AddTaskFinallyHelper<F,G> Helper;
// In-buffer layout: the Func object sits at the start of the user buffer and
// the Finally object directly after it (funcLoc + 1).
348 if (Helper::allocateInBuffer) {
349 auto funcLoc = static_cast<typename Helper::Func*>(
350 fiber->getUserBuffer());
351 auto finallyLoc = static_cast<typename Helper::Finally*>(
352 static_cast<void*>(funcLoc + 1));
// Finally is constructed first because the Func constructor takes a reference
// to it.
354 new (finallyLoc) typename Helper::Finally(std::move(finally), *this);
355 new (funcLoc) typename Helper::Func(std::move(func), *finallyLoc);
357 fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
// NOTE(review): the statements below redeclare finallyLoc and funcLoc, so they
// belong to the alternative branch (both objects allocated with new); the
// else line itself is not visible in this excerpt.
359 auto finallyLoc = new typename Helper::Finally(std::move(finally), *this);
360 auto funcLoc = new typename Helper::Func(std::move(func), *finallyLoc);
362 fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
// data_ carries the fiber pointer; runReadyFiber() passes it to jumpContext().
365 fiber->data_ = reinterpret_cast<intptr_t>(fiber);
366 readyFibers_.push_back(*fiber);
368 ensureLoopScheduled();
// Public entry point: forwards func to runInMainContextHelper(), which has
// separate overloads for void and non-void result types, and returns whatever
// the helper returns.
371 template <typename F>
372 typename std::result_of<F()>::type
373 FiberManager::runInMainContext(F&& func) {
374 return runInMainContextHelper(std::forward<F>(func));
// Overload selected when func() returns a non-void type.
//
// With an active fiber, func is wrapped in a local lambda that stores the
// outcome in a Try; the lambda is published through immediateFunc_ and the
// fiber is preempted with AWAITING_IMMEDIATE, the state runReadyFiber()
// handles on the main context. The stored value is returned afterwards.
377 template <typename F>
378 inline typename std::enable_if<
379 !std::is_same<typename std::result_of<F()>::type, void>::value,
380 typename std::result_of<F()>::type>::type
381 FiberManager::runInMainContextHelper(F&& func) {
// NOTE(review): the body of this branch is not visible in this excerpt;
// presumably func is simply called directly when no fiber is active - confirm.
382 if (UNLIKELY(activeFiber_ == nullptr)) {
386 typedef typename std::result_of<F()>::type Result;
388 folly::Try<Result> result;
389 auto f = [&func, &result]() mutable {
390 result = folly::makeTryFunction(std::forward<F>(func));
// Only a reference to the local lambda is stored, so f and result must stay
// alive until preempt() returns.
393 immediateFunc_ = std::ref(f);
394 activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
// value() is called on the Try that the lambda filled in.
396 return std::move(result.value());
// Overload selected when func() returns void.
//
// With an active fiber, a reference to func is published through
// immediateFunc_ and the fiber is preempted with AWAITING_IMMEDIATE, the state
// runReadyFiber() handles on the main context.
399 template <typename F>
400 inline typename std::enable_if<
401 std::is_same<typename std::result_of<F()>::type, void>::value,
403 FiberManager::runInMainContextHelper(F&& func) {
// NOTE(review): the body of this branch is not visible in this excerpt;
// presumably func is simply called directly when no fiber is active - confirm.
404 if (UNLIKELY(activeFiber_ == nullptr)) {
// Only a reference to func is stored, so func must stay alive until preempt()
// returns.
409 immediateFunc_ = std::ref(func);
410 activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
// Returns the value of currentFiberManager_ as a reference. The pointer is
// asserted to be non-null, so callers must only use this while a manager is
// current; use getFiberManagerUnsafe() when that is not guaranteed.
413 inline FiberManager& FiberManager::getFiberManager() {
414 assert(currentFiberManager_ != nullptr);
415 return *currentFiberManager_;
// Returns currentFiberManager_ without any check. The result can be null
// (loopUntilNoReady() resets the pointer to nullptr), so callers have to test
// it before dereferencing.
418 inline FiberManager* FiberManager::getFiberManagerUnsafe() {
419 return currentFiberManager_;
// Returns true when activeFiber_ is set (non-null).
422 inline bool FiberManager::hasActiveFiber() const {
423 return activeFiber_ != nullptr;
// Preempts the active fiber with the YIELDED state. runReadyFiber() handles
// that state by marking the fiber READY_TO_RUN and parking it on
// yieldedFibers_.
//
// Preconditions (asserted): this manager is the current one, a fiber is
// active, and that fiber is in the RUNNING state.
426 inline void FiberManager::yield() {
427 assert(currentFiberManager_ == this);
428 assert(activeFiber_ != nullptr);
429 assert(activeFiber_->state_ == Fiber::RUNNING);
430 activeFiber_->preempt(Fiber::YIELDED);
// Returns a reference to local storage of type T.
//
// When T is the local type this manager was constructed with (localType_) and
// a fiber is current, the data comes from that fiber (localData_). In every
// other case the call falls back to localThread<T>().
433 template <typename T>
434 T& FiberManager::local() {
435 if (std::type_index(typeid(T)) == localType_ && currentFiber_) {
436 return currentFiber_->localData_.get<T>();
438 return localThread<T>();
// Fallback storage used by local(): declares a function-local
// static thread_local T.
// NOTE(review): the return statement is not visible in this excerpt;
// presumably a reference to t is returned - confirm.
441 template <typename T>
442 T& FiberManager::localThread() {
443 static thread_local T t;
// Gives a newly obtained fiber the local data of the fiber that is creating
// it. The copy happens only when there is a current manager, that manager has
// a current fiber, and its localType_ matches the localType_ of this manager;
// otherwise the local data of the new fiber is left untouched.
//
// fiber: the fiber being prepared (called from addTask and addTaskFinally).
447 inline void FiberManager::initLocalData(Fiber& fiber) {
448 auto fm = getFiberManagerUnsafe();
449 if (fm && fm->currentFiber_ && fm->localType_ == localType_) {
450 fiber.localData_ = fm->currentFiber_->localData_;
// Constructor. LocalT is the type recorded in localType_, which local(),
// initLocalData() and addTaskRemote() compare against.
// NOTE(review): some parameter lines (including the options parameter used
// below) are not visible in this excerpt.
454 template <typename LocalT>
455 FiberManager::FiberManager(
457 std::unique_ptr<LoopController> loopController__,
// Takes ownership of the loop controller.
459 loopController_(std::move(loopController__)),
// Options pass through preprocessOptions() before being stored.
460 options_(preprocessOptions(std::move(options))),
// Default exception callback: rethrows the stored exception so it can be
// classified, then logs it at DFATAL together with the context string. Types
// derived from std::exception also get their type name and message logged.
461 exceptionCallback_([](std::exception_ptr eptr, std::string context) {
463 std::rethrow_exception(eptr);
464 } catch (const std::exception& e) {
465 LOG(DFATAL) << "Exception " << typeid(e).name()
466 << " with message '" << e.what() << "' was thrown in "
467 << "FiberManager with context '" << context << "'";
// Logged for anything that is not a std::exception.
470 LOG(DFATAL) << "Unknown exception was thrown in FiberManager with "
471 << "context '" << context << "'";
// NOTE(review): this dereferences loopController_ during member
// initialization, which relies on loopController_ being declared (and thus
// initialized) before timeoutManager_ - confirm in the class definition.
475 timeoutManager_(std::make_shared<TimeoutController>(*loopController_)),
476 localType_(typeid(LocalT)) {
// Registers this manager with its loop controller.
477 loopController_->setFiberManager(this);
// Calls func with a Promise and waits on a baton; the outcome delivered
// through the Promise is returned (or its exception rethrown, per
// folly::moveFromTry).
//
// func: callable taking a Promise<Result>; Result is the value_type of the
//       first argument type of func.
// NOTE(review): the declaration of baton is not visible in this excerpt.
480 template <typename F>
481 typename FirstArgOf<F>::type::value_type
482 inline await(F&& func) {
483 typedef typename FirstArgOf<F>::type::value_type Result;
// Slot the Promise writes the outcome into.
485 folly::Try<Result> result;
// func is invoked from the callable handed to baton.wait(); the Promise is
// bound to result and baton, both locals captured by reference.
488 baton.wait([&func, &result, &baton]() mutable {
489 func(Promise<Result>(result, baton));
492 return folly::moveFromTry(std::move(result));