From: Jon Maltiel Swenson Date: Fri, 30 Oct 2015 00:16:08 +0000 (-0700) Subject: Periodically reclaim unnecessary fibers from free pool X-Git-Tag: deprecate-dynamic-initializer~285 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=fe2bf2cdf9062abb56e474cbba2248461f8f5b0d;p=folly.git Periodically reclaim unnecessary fibers from free pool Summary: Reclaim unnecessary fibers from the free pool periodically. Turn this behavior on in mcrouter/proxy.cpp Reviewed By: pavlo-fb Differential Revision: D2584825 fb-gh-sync-id: eabc58eefe6fd38972c9e23ca3cbe79eb8316a3e --- diff --git a/folly/experimental/fibers/FiberManager-inl.h b/folly/experimental/fibers/FiberManager-inl.h index 8df38576..299ca709 100644 --- a/folly/experimental/fibers/FiberManager-inl.h +++ b/folly/experimental/fibers/FiberManager-inl.h @@ -119,7 +119,8 @@ inline void FiberManager::runReadyFiber(Fiber* fiber) { fiber->localData_.reset(); fiber->rcontext_.reset(); - if (fibersPoolSize_ < options_.maxFibersPoolSize) { + if (fibersPoolSize_ < options_.maxFibersPoolSize || + options_.fibersPoolResizePeriodMs > 0) { fibersPool_.push_front(*fiber); ++fibersPoolSize_; } else { @@ -503,6 +504,7 @@ FiberManager::FiberManager( } }), timeoutManager_(std::make_shared(*loopController_)), + fibersPoolResizer_(*this), localType_(typeid(LocalT)) { loopController_->setFiberManager(this); } diff --git a/folly/experimental/fibers/FiberManager.cpp b/folly/experimental/fibers/FiberManager.cpp index 2b19393a..1f0f264a 100644 --- a/folly/experimental/fibers/FiberManager.cpp +++ b/folly/experimental/fibers/FiberManager.cpp @@ -66,6 +66,12 @@ bool FiberManager::hasTasks() const { Fiber* FiberManager::getFiber() { Fiber* fiber = nullptr; + + if (options_.fibersPoolResizePeriodMs > 0 && !fibersPoolResizerScheduled_) { + fibersPoolResizer_(); + fibersPoolResizerScheduled_ = true; + } + if (fibersPool_.empty()) { fiber = new Fiber(*this); ++fibersAllocated_; @@ -76,7 +82,9 @@ Fiber* FiberManager::getFiber() { --fibersPoolSize_; } assert(fiber); - ++fibersActive_; + if (++fibersActive_ > maxFibersActiveLastPeriod_) { + maxFibersActiveLastPeriod_ = fibersActive_; + } ++fiberId_; bool recordStack = (options_.recordStackEvery != 0) && (fiberId_ % options_.recordStackEvery == 0); @@ -114,4 +122,26 @@ void FiberManager::setObserver(ExecutionObserver* observer) { observer_ = observer; } +void FiberManager::doFibersPoolResizing() { + while (fibersAllocated_ > maxFibersActiveLastPeriod_ && + fibersPoolSize_ > options_.maxFibersPoolSize) { + auto fiber = &fibersPool_.front(); + assert(fiber != nullptr); + fibersPool_.pop_front(); + delete fiber; + --fibersPoolSize_; + --fibersAllocated_; + } + + maxFibersActiveLastPeriod_ = fibersActive_; +} + +void FiberManager::FiberManager::FibersPoolResizer::operator()() { + fiberManager_.doFibersPoolResizing(); + fiberManager_.timeoutManager_->registerTimeout( + *this, + std::chrono::milliseconds( + fiberManager_.options_.fibersPoolResizePeriodMs)); +} + }} diff --git a/folly/experimental/fibers/FiberManager.h b/folly/experimental/fibers/FiberManager.h index 80ebd543..0a74500f 100644 --- a/folly/experimental/fibers/FiberManager.h +++ b/folly/experimental/fibers/FiberManager.h @@ -34,6 +34,7 @@ #include #include #include +#include #include namespace folly { namespace fibers { @@ -89,6 +90,13 @@ class FiberManager : public ::folly::Executor { */ bool useGuardPages{false}; + /** + * Free unnecessary fibers in the fibers pool every fibersPoolResizePeriodMs + * milliseconds. If value is 0, periodic resizing of the fibers pool is + * disabled. + */ + uint32_t fibersPoolResizePeriodMs{0}; + constexpr Options() {} }; @@ -306,6 +314,12 @@ class FiberManager : public ::folly::Executor { size_t fibersActive_{0}; /**< number of running or blocked fibers */ size_t fiberId_{0}; /**< id of last fiber used */ + /** + * Maximum number of active fibers in the last period lasting + * Options::fibersPoolResizePeriod milliseconds. + */ + size_t maxFibersActiveLastPeriod_{0}; + FContext::ContextStruct mainContext_; /**< stores loop function context */ std::unique_ptr loopController_; @@ -387,6 +401,19 @@ class FiberManager : public ::folly::Executor { std::shared_ptr timeoutManager_; + struct FibersPoolResizer { + explicit FibersPoolResizer(FiberManager& fm) : + fiberManager_(fm) {} + void operator()(); + private: + FiberManager& fiberManager_; + }; + + FibersPoolResizer fibersPoolResizer_; + bool fibersPoolResizerScheduled_{false}; + + void doFibersPoolResizing(); + /** * Only local of this type will be available for fibers. */ diff --git a/folly/experimental/fibers/test/FibersTest.cpp b/folly/experimental/fibers/test/FibersTest.cpp index 874bf7df..ed6156f1 100644 --- a/folly/experimental/fibers/test/FibersTest.cpp +++ b/folly/experimental/fibers/test/FibersTest.cpp @@ -1403,6 +1403,68 @@ TEST(FiberManager, RequestContext) { EXPECT_EQ(rcontext, folly::RequestContext::get()); } +TEST(FiberManager, resizePeriodically) { + FiberManager::Options opts; + opts.fibersPoolResizePeriodMs = 300; + opts.maxFibersPoolSize = 5; + + FiberManager manager(folly::make_unique(), opts); + + folly::EventBase evb; + dynamic_cast(manager.loopController()) + .attachEventBase(evb); + + std::vector batons(10); + + size_t tasksRun = 0; + for (size_t i = 0; i < 30; ++i) { + manager.addTask([i, &batons, &tasksRun]() { + ++tasksRun; + // Keep some fibers active indefinitely + if (i < batons.size()) { + batons[i].wait(); + } + }); + } + + EXPECT_EQ(0, tasksRun); + EXPECT_EQ(30, manager.fibersAllocated()); + EXPECT_EQ(0, manager.fibersPoolSize()); + + evb.loopOnce(); + EXPECT_EQ(30, tasksRun); + EXPECT_EQ(30, manager.fibersAllocated()); + // Can go over maxFibersPoolSize, 10 of 30 fibers still active + EXPECT_EQ(20, manager.fibersPoolSize()); + + std::this_thread::sleep_for(std::chrono::milliseconds(400)); + evb.loopOnce(); // no fibers active in this period + EXPECT_EQ(30, manager.fibersAllocated()); + EXPECT_EQ(20, manager.fibersPoolSize()); + + std::this_thread::sleep_for(std::chrono::milliseconds(400)); + evb.loopOnce(); // should shrink fibers pool to maxFibersPoolSize + EXPECT_EQ(15, manager.fibersAllocated()); + EXPECT_EQ(5, manager.fibersPoolSize()); + + for (size_t i = 0; i < batons.size(); ++i) { + batons[i].post(); + } + evb.loopOnce(); + EXPECT_EQ(15, manager.fibersAllocated()); + EXPECT_EQ(15, manager.fibersPoolSize()); + + std::this_thread::sleep_for(std::chrono::milliseconds(400)); + evb.loopOnce(); // 10 fibers active in last period + EXPECT_EQ(10, manager.fibersAllocated()); + EXPECT_EQ(10, manager.fibersPoolSize()); + + std::this_thread::sleep_for(std::chrono::milliseconds(400)); + evb.loopOnce(); + EXPECT_EQ(5, manager.fibersAllocated()); + EXPECT_EQ(5, manager.fibersPoolSize()); +} + static size_t sNumAwaits; void runBenchmark(size_t numAwaits, size_t toSend) {