}
intptr_t Fiber::preempt(State state) {
- DCHECK_EQ(fiberManager_.activeFiber_, this);
- DCHECK_EQ(state_, RUNNING);
- DCHECK_NE(state, RUNNING);
+ intptr_t ret;
- fiberManager_.activeFiber_ = nullptr;
- state_ = state;
+ auto preemptImpl = [&]() mutable {
+ DCHECK_EQ(fiberManager_.activeFiber_, this);
+ DCHECK_EQ(state_, RUNNING);
+ DCHECK_NE(state, RUNNING);
- recordStackPosition();
+ fiberManager_.activeFiber_ = nullptr;
+ state_ = state;
+
+ recordStackPosition();
+
+ ret = jumpContext(&fcontext_, &fiberManager_.mainContext_, 0);
- auto ret = jumpContext(&fcontext_, &fiberManager_.mainContext_, 0);
+ DCHECK_EQ(fiberManager_.activeFiber_, this);
+ DCHECK_EQ(state_, READY_TO_RUN);
+ state_ = RUNNING;
+ };
- DCHECK_EQ(fiberManager_.activeFiber_, this);
- DCHECK_EQ(state_, READY_TO_RUN);
- state_ = RUNNING;
+ if (fiberManager_.preemptRunner_) {
+ fiberManager_.preemptRunner_->run(std::ref(preemptImpl));
+ } else {
+ preemptImpl();
+ }
return ret;
}
observer_ = observer;
}
+void FiberManager::setPreemptRunner(InlineFunctionRunner* preemptRunner) {
+ preemptRunner_ = preemptRunner;
+}
+
void FiberManager::doFibersPoolResizing() {
while (fibersAllocated_ > maxFibersActiveLastPeriod_ &&
fibersPoolSize_ > options_.maxFibersPoolSize) {
class LocalType {
};
+class InlineFunctionRunner {
+ public:
+ virtual ~InlineFunctionRunner() {}
+
+ /**
+ * func must be executed inline and only once.
+ */
+ virtual void run(std::function<void()> func) = 0;
+};
+
/**
* @class FiberManager
* @brief Single-threaded task execution engine.
*/
void setObserver(ExecutionObserver* observer);
+ /**
+ * Setup fibers preempt runner.
+ */
+ void setPreemptRunner(InlineFunctionRunner* preemptRunner);
+
/**
* Returns an estimate of the number of fibers which are waiting to run (does
* not include fibers or tasks scheduled remotely).
*/
std::function<void()> immediateFunc_;
+ /**
+ * Preempt runner.
+ */
+ InlineFunctionRunner* preemptRunner_{nullptr};
+
/**
* Fiber's execution observer.
*/