fiber->localData_.reset();
fiber->rcontext_.reset();
- if (fibersPoolSize_ < options_.maxFibersPoolSize) {
+ if (fibersPoolSize_ < options_.maxFibersPoolSize ||
+ options_.fibersPoolResizePeriodMs > 0) {
fibersPool_.push_front(*fiber);
++fibersPoolSize_;
} else {
}
}),
timeoutManager_(std::make_shared<TimeoutController>(*loopController_)),
+ fibersPoolResizer_(*this),
localType_(typeid(LocalT)) {
loopController_->setFiberManager(this);
}
Fiber* FiberManager::getFiber() {
Fiber* fiber = nullptr;
+
+ if (options_.fibersPoolResizePeriodMs > 0 && !fibersPoolResizerScheduled_) {
+ fibersPoolResizer_();
+ fibersPoolResizerScheduled_ = true;
+ }
+
if (fibersPool_.empty()) {
fiber = new Fiber(*this);
++fibersAllocated_;
--fibersPoolSize_;
}
assert(fiber);
- ++fibersActive_;
+ if (++fibersActive_ > maxFibersActiveLastPeriod_) {
+ maxFibersActiveLastPeriod_ = fibersActive_;
+ }
++fiberId_;
bool recordStack = (options_.recordStackEvery != 0) &&
(fiberId_ % options_.recordStackEvery == 0);
observer_ = observer;
}
+void FiberManager::doFibersPoolResizing() {
+ while (fibersAllocated_ > maxFibersActiveLastPeriod_ &&
+ fibersPoolSize_ > options_.maxFibersPoolSize) {
+ auto fiber = &fibersPool_.front();
+ assert(fiber != nullptr);
+ fibersPool_.pop_front();
+ delete fiber;
+ --fibersPoolSize_;
+ --fibersAllocated_;
+ }
+
+ maxFibersActiveLastPeriod_ = fibersActive_;
+}
+
+void FiberManager::FiberManager::FibersPoolResizer::operator()() {
+ fiberManager_.doFibersPoolResizing();
+ fiberManager_.timeoutManager_->registerTimeout(
+ *this,
+ std::chrono::milliseconds(
+ fiberManager_.options_.fibersPoolResizePeriodMs));
+}
+
}}
#include <folly/experimental/fibers/BoostContextCompatibility.h>
#include <folly/experimental/fibers/Fiber.h>
#include <folly/experimental/fibers/GuardPageAllocator.h>
+#include <folly/experimental/fibers/TimeoutController.h>
#include <folly/experimental/fibers/traits.h>
namespace folly { namespace fibers {
*/
bool useGuardPages{false};
+ /**
+ * Free unnecessary fibers in the fibers pool every fibersPoolResizePeriodMs
+ * milliseconds. If value is 0, periodic resizing of the fibers pool is
+ * disabled.
+ */
+ uint32_t fibersPoolResizePeriodMs{0};
+
constexpr Options() {}
};
size_t fibersActive_{0}; /**< number of running or blocked fibers */
size_t fiberId_{0}; /**< id of last fiber used */
+ /**
+ * Maximum number of active fibers in the last period lasting
+ * Options::fibersPoolResizePeriod milliseconds.
+ */
+ size_t maxFibersActiveLastPeriod_{0};
+
FContext::ContextStruct mainContext_; /**< stores loop function context */
std::unique_ptr<LoopController> loopController_;
std::shared_ptr<TimeoutController> timeoutManager_;
+ struct FibersPoolResizer {
+ explicit FibersPoolResizer(FiberManager& fm) :
+ fiberManager_(fm) {}
+ void operator()();
+ private:
+ FiberManager& fiberManager_;
+ };
+
+ FibersPoolResizer fibersPoolResizer_;
+ bool fibersPoolResizerScheduled_{false};
+
+ void doFibersPoolResizing();
+
/**
* Only local of this type will be available for fibers.
*/
EXPECT_EQ(rcontext, folly::RequestContext::get());
}
+TEST(FiberManager, resizePeriodically) {
+ FiberManager::Options opts;
+ opts.fibersPoolResizePeriodMs = 300;
+ opts.maxFibersPoolSize = 5;
+
+ FiberManager manager(folly::make_unique<EventBaseLoopController>(), opts);
+
+ folly::EventBase evb;
+ dynamic_cast<EventBaseLoopController&>(manager.loopController())
+ .attachEventBase(evb);
+
+ std::vector<Baton> batons(10);
+
+ size_t tasksRun = 0;
+ for (size_t i = 0; i < 30; ++i) {
+ manager.addTask([i, &batons, &tasksRun]() {
+ ++tasksRun;
+ // Keep some fibers active indefinitely
+ if (i < batons.size()) {
+ batons[i].wait();
+ }
+ });
+ }
+
+ EXPECT_EQ(0, tasksRun);
+ EXPECT_EQ(30, manager.fibersAllocated());
+ EXPECT_EQ(0, manager.fibersPoolSize());
+
+ evb.loopOnce();
+ EXPECT_EQ(30, tasksRun);
+ EXPECT_EQ(30, manager.fibersAllocated());
+ // Can go over maxFibersPoolSize, 10 of 30 fibers still active
+ EXPECT_EQ(20, manager.fibersPoolSize());
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(400));
+ evb.loopOnce(); // no fibers active in this period
+ EXPECT_EQ(30, manager.fibersAllocated());
+ EXPECT_EQ(20, manager.fibersPoolSize());
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(400));
+ evb.loopOnce(); // should shrink fibers pool to maxFibersPoolSize
+ EXPECT_EQ(15, manager.fibersAllocated());
+ EXPECT_EQ(5, manager.fibersPoolSize());
+
+ for (size_t i = 0; i < batons.size(); ++i) {
+ batons[i].post();
+ }
+ evb.loopOnce();
+ EXPECT_EQ(15, manager.fibersAllocated());
+ EXPECT_EQ(15, manager.fibersPoolSize());
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(400));
+ evb.loopOnce(); // 10 fibers active in last period
+ EXPECT_EQ(10, manager.fibersAllocated());
+ EXPECT_EQ(10, manager.fibersPoolSize());
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(400));
+ evb.loopOnce();
+ EXPECT_EQ(5, manager.fibersAllocated());
+ EXPECT_EQ(5, manager.fibersPoolSize());
+}
+
static size_t sNumAwaits;
void runBenchmark(size_t numAwaits, size_t toSend) {