From: James Sedgwick Date: Tue, 23 Sep 2014 13:08:00 +0000 (-0700) Subject: stats for ThreadPoolExecutor X-Git-Tag: v0.22.0~336 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=7f3dac64242050ff84ac8c025c2ee0057128c4bf;p=folly.git stats for ThreadPoolExecutor Summary: pool-wide stats via a call on the pool, and per-task stats (e.g. to be funneled into a histogram) via an rx subscription rx needs a little work before this diff is safe - e.g. synchronization around the subscriber list, and perhaps exposing whether there are any subscribers so we can skip stat tracking if no one is listening won't commit this without moving rx into folly/experimental of course the idea is that timeout/expiration notifications can also go through the same subscription channel haven't run the benchmarks yet and have to leave for the evening but tmrw i'll commit changes to the benchmark and get this stuff into windtunnel so i don't have to do so much manual output inspection on future diffs Test Plan: added unit Reviewed By: davejwatson@fb.com Subscribers: fugalh, njormrod, bmatheny FB internal diff: D1558424 Tasks: 5002392, 5002425 --- diff --git a/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.cpp b/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.cpp index ba21ed6a..ca88e580 100644 --- a/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.cpp +++ b/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.cpp @@ -22,7 +22,7 @@ const size_t CPUThreadPoolExecutor::kDefaultMaxQueueSize = 1 << 18; CPUThreadPoolExecutor::CPUThreadPoolExecutor( size_t numThreads, - std::unique_ptr> taskQueue, + std::unique_ptr> taskQueue, std::unique_ptr threadFactory) : ThreadPoolExecutor(numThreads, std::move(threadFactory)), taskQueue_(std::move(taskQueue)) { @@ -37,29 +37,19 @@ CPUThreadPoolExecutor::~CPUThreadPoolExecutor() { void CPUThreadPoolExecutor::add(Func func) { // TODO handle enqueue failure, here and in other add() callsites - taskQueue_->add(Task(std::move(func))); + taskQueue_->add(CPUTask(std::move(func))); } void CPUThreadPoolExecutor::threadRun(std::shared_ptr thread) { while (1) { // TODO expiration / codel - auto t = taskQueue_->take(); - if (UNLIKELY(t.poison)) { + auto task = taskQueue_->take(); + if (UNLIKELY(task.poison)) { CHECK(threadsToStop_-- > 0); stoppedThreads_.add(thread); return; } else { - thread->idle = false; - try { - t.func(); - } catch (const std::exception& e) { - LOG(ERROR) << "CPUThreadPoolExecutor: func threw unhandled " << - typeid(e).name() << " exception: " << e.what(); - } catch (...) { - LOG(ERROR) << "CPUThreadPoolExecutor: func threw unhandled " - "non-exception object"; - } - thread->idle = true; + runTask(thread, std::move(task)); } if (UNLIKELY(threadsToStop_ > 0 && !isJoin_)) { @@ -77,8 +67,12 @@ void CPUThreadPoolExecutor::stopThreads(size_t n) { CHECK(stoppedThreads_.size() == 0); threadsToStop_ = n; for (int i = 0; i < n; i++) { - taskQueue_->add(Task()); + taskQueue_->add(CPUTask()); } } +uint64_t CPUThreadPoolExecutor::getPendingTaskCount() { + return taskQueue_->size(); +} + }} // folly::wangle diff --git a/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h b/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h index 575e23c6..7811c678 100644 --- a/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h +++ b/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h @@ -21,13 +21,12 @@ namespace folly { namespace wangle { class CPUThreadPoolExecutor : public ThreadPoolExecutor { public: - struct Task; + struct CPUTask; - // TODO thread naming, perhaps a required input to ThreadFactories explicit CPUThreadPoolExecutor( size_t numThreads, - std::unique_ptr> taskQueue = - folly::make_unique>( + std::unique_ptr> taskQueue = + folly::make_unique>( CPUThreadPoolExecutor::kDefaultMaxQueueSize), std::unique_ptr threadFactory = folly::make_unique("CPUThreadPool")); @@ -36,15 +35,14 @@ class CPUThreadPoolExecutor : public ThreadPoolExecutor { void add(Func func) override; - struct Task { - explicit Task(Func&& taskArg) : func(std::move(taskArg)), poison(false) {} - Task() : func(nullptr), poison(true) {} - Task(Task&& o) noexcept : func(std::move(o.func)), poison(o.poison) {} - Task(const Task&) = default; - Task& operator=(const Task&) = default; - Func func; + struct CPUTask : public ThreadPoolExecutor::Task { + // Must be noexcept move constructible so it can be used in MPMCQueue + explicit CPUTask(Func&& f) : Task(std::move(f)), poison(false) {} + CPUTask() : Task(nullptr), poison(true) {} + CPUTask(CPUTask&& o) noexcept : Task(std::move(o)), poison(o.poison) {} + CPUTask(const CPUTask&) = default; + CPUTask& operator=(const CPUTask&) = default; bool poison; - // TODO per-task stats, timeouts, expirations }; static const size_t kDefaultMaxQueueSize; @@ -52,8 +50,9 @@ class CPUThreadPoolExecutor : public ThreadPoolExecutor { private: void threadRun(ThreadPtr thread) override; void stopThreads(size_t n) override; + uint64_t getPendingTaskCount() override; - std::unique_ptr> taskQueue_; + std::unique_ptr> taskQueue_; std::atomic threadsToStop_{0}; }; diff --git a/folly/experimental/wangle/concurrent/Executor.h b/folly/experimental/wangle/concurrent/Executor.h index 49db177e..2687ee6b 100644 --- a/folly/experimental/wangle/concurrent/Executor.h +++ b/folly/experimental/wangle/concurrent/Executor.h @@ -22,10 +22,14 @@ namespace folly { namespace wangle { typedef std::function Func; +namespace experimental { // TODO(jsedgwick) merge with folly/wangle/Executor.h + class Executor { public: virtual ~Executor() {}; virtual void add(Func func) = 0; }; +} + }} // folly::wangle diff --git a/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp b/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp index cdc36ef4..6e106f92 100644 --- a/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp +++ b/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp @@ -42,15 +42,15 @@ void IOThreadPoolExecutor::add(Func func) { auto thread = threadList_.get()[nextThread_++ % threadList_.get().size()]; auto ioThread = std::static_pointer_cast(thread); - auto moveFunc = folly::makeMoveWrapper(std::move(func)); - auto wrappedFunc = [moveFunc, ioThread] () { - (*moveFunc)(); - ioThread->outstandingTasks--; + auto moveTask = folly::makeMoveWrapper(Task(std::move(func))); + auto wrappedFunc = [this, ioThread, moveTask] () mutable { + runTask(ioThread, std::move(*moveTask)); + ioThread->pendingTasks--; }; - ioThread->outstandingTasks++; + ioThread->pendingTasks++; if (!ioThread->eventBase.runInEventBaseThread(std::move(wrappedFunc))) { - ioThread->outstandingTasks--; + ioThread->pendingTasks--; throw std::runtime_error("Unable to run func in event base thread"); } } @@ -66,13 +66,14 @@ void IOThreadPoolExecutor::threadRun(ThreadPtr thread) { ioThread->eventBase.loopForever(); } if (isJoin_) { - while (ioThread->outstandingTasks > 0) { + while (ioThread->pendingTasks > 0) { ioThread->eventBase.loopOnce(); } } stoppedThreads_.add(ioThread); } +// threadListLock_ is writelocked void IOThreadPoolExecutor::stopThreads(size_t n) { for (int i = 0; i < n; i++) { const auto ioThread = std::static_pointer_cast( @@ -82,4 +83,18 @@ void IOThreadPoolExecutor::stopThreads(size_t n) { } } +// threadListLock_ is readlocked +uint64_t IOThreadPoolExecutor::getPendingTaskCount() { + uint64_t count = 0; + for (const auto& thread : threadList_.get()) { + auto ioThread = std::static_pointer_cast(thread); + size_t pendingTasks = ioThread->pendingTasks; + if (pendingTasks > 0 && !ioThread->idle) { + pendingTasks--; + } + count += pendingTasks; + } + return count; +} + }} // folly::wangle diff --git a/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h b/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h index 2b498bdc..c42da719 100644 --- a/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h +++ b/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h @@ -35,11 +35,12 @@ class IOThreadPoolExecutor : public ThreadPoolExecutor { ThreadPtr makeThread() override; void threadRun(ThreadPtr thread) override; void stopThreads(size_t n) override; + uint64_t getPendingTaskCount() override; struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING IOThread : public Thread { - IOThread() : shouldRun(true), outstandingTasks(0) {}; + IOThread() : shouldRun(true), pendingTasks(0) {}; std::atomic shouldRun; - std::atomic outstandingTasks; + std::atomic pendingTasks; EventBase eventBase; }; diff --git a/folly/experimental/wangle/concurrent/ThreadPoolExecutor.cpp b/folly/experimental/wangle/concurrent/ThreadPoolExecutor.cpp index 4d249b04..30e46f5c 100644 --- a/folly/experimental/wangle/concurrent/ThreadPoolExecutor.cpp +++ b/folly/experimental/wangle/concurrent/ThreadPoolExecutor.cpp @@ -27,6 +27,25 @@ ThreadPoolExecutor::~ThreadPoolExecutor() { CHECK(threadList_.get().size() == 0); } +void ThreadPoolExecutor::runTask( + const ThreadPtr& thread, + Task&& task) { + thread->idle = false; + task.started(); + try { + task.func(); + } catch (const std::exception& e) { + LOG(ERROR) << "ThreadPoolExecutor: func threw unhandled " << + typeid(e).name() << " exception: " << e.what(); + } catch (...) { + LOG(ERROR) << "ThreadPoolExecutor: func threw unhandled non-exception " + "object"; + } + task.completed(); + taskStatsSubject_.onNext(std::move(task.stats)); + thread->idle = true; +} + size_t ThreadPoolExecutor::numThreads() { RWSpinLock::ReadHolder{&threadListLock_}; return threadList_.get().size(); @@ -43,6 +62,7 @@ void ThreadPoolExecutor::setNumThreads(size_t n) { CHECK(threadList_.get().size() == n); } +// threadListLock_ is writelocked void ThreadPoolExecutor::addThreads(size_t n) { for (int i = 0; i < n; i++) { auto thread = makeThread(); @@ -54,6 +74,7 @@ void ThreadPoolExecutor::addThreads(size_t n) { } } +// threadListLock_ is writelocked void ThreadPoolExecutor::removeThreads(size_t n, bool isJoin) { CHECK(n <= threadList_.get().size()); CHECK(stoppedThreads_.size() == 0); @@ -79,6 +100,22 @@ void ThreadPoolExecutor::join() { CHECK(threadList_.get().size() == 0); } +ThreadPoolExecutor::PoolStats ThreadPoolExecutor::getPoolStats() { + RWSpinLock::ReadHolder{&threadListLock_}; + ThreadPoolExecutor::PoolStats stats; + stats.threadCount = threadList_.get().size(); + for (auto thread : threadList_.get()) { + if (thread->idle) { + stats.idleThreadCount++; + } else { + stats.activeThreadCount++; + } + } + stats.pendingTaskCount = getPendingTaskCount(); + stats.totalTaskCount = stats.pendingTaskCount + stats.activeThreadCount; + return stats; +} + std::atomic ThreadPoolExecutor::Thread::nextId(0); void ThreadPoolExecutor::StoppedThreadQueue::add( diff --git a/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h b/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h index f802af14..4eda2d36 100644 --- a/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h +++ b/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -29,7 +30,7 @@ namespace folly { namespace wangle { -class ThreadPoolExecutor : public Executor { +class ThreadPoolExecutor : public experimental::Executor { public: explicit ThreadPoolExecutor( size_t numThreads, @@ -41,10 +42,32 @@ class ThreadPoolExecutor : public Executor { void setNumThreads(size_t numThreads); void stop(); void join(); - // TODO expose stats + + struct PoolStats { + PoolStats() : threadCount(0), idleThreadCount(0), activeThreadCount(0), + pendingTaskCount(0), totalTaskCount(0) {} + size_t threadCount, idleThreadCount, activeThreadCount; + uint64_t pendingTaskCount, totalTaskCount; + }; + + PoolStats getPoolStats(); + + struct TaskStats { + TaskStats() : expired(false), waitTime(0), runTime(0) {} + bool expired; + std::chrono::microseconds waitTime; + std::chrono::microseconds runTime; + }; + + Subscription subscribeToTaskStats( + const ObserverPtr& observer) { + return taskStatsSubject_.subscribe(observer); + } protected: + // Prerequisite: threadListLock_ writelocked void addThreads(size_t n); + // Prerequisite: threadListLock_ writelocked void removeThreads(size_t n, bool isJoin); struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING Thread { @@ -54,20 +77,50 @@ class ThreadPoolExecutor : public Executor { uint64_t id; std::thread handle; bool idle; - // TODO per-thread stats go here }; typedef std::shared_ptr ThreadPtr; + struct Task { + explicit Task(Func&& f) : func(std::move(f)) { + // Assume that the task in enqueued on creation + intervalBegin = std::chrono::steady_clock::now(); + } + + Func func; + TaskStats stats; + // TODO per-task timeouts, expirations + + void started() { + auto now = std::chrono::steady_clock::now(); + stats.waitTime = std::chrono::duration_cast( + now - intervalBegin); + intervalBegin = now; + } + void completed() { + stats.runTime = std::chrono::duration_cast( + std::chrono::steady_clock::now() - intervalBegin); + } + + std::chrono::steady_clock::time_point intervalBegin; + }; + + void runTask(const ThreadPtr& thread, Task&& task); + // The function that will be bound to pool threads virtual void threadRun(ThreadPtr thread) = 0; - // Stop n threads and put their Thread structs in the threadsStopped_ queue + + // Stop n threads and put their ThreadPtrs in the threadsStopped_ queue + // Prerequisite: threadListLock_ writelocked virtual void stopThreads(size_t n) = 0; + // Create a suitable Thread struct virtual ThreadPtr makeThread() { return std::make_shared(); } - // need a stopThread(id) for keepalive feature + + // Prerequisite: threadListLock_ readlocked + virtual uint64_t getPendingTaskCount() = 0; class ThreadList { public: @@ -112,6 +165,8 @@ class ThreadPoolExecutor : public Executor { RWSpinLock threadListLock_; StoppedThreadQueue stoppedThreads_; std::atomic isJoin_; // whether the current downsizing is a join + + Subject taskStatsSubject_; }; }} // folly::wangle diff --git a/folly/experimental/wangle/concurrent/test/ThreadPoolExecutorTest.cpp b/folly/experimental/wangle/concurrent/test/ThreadPoolExecutorTest.cpp index fe7b8dcd..eb8527ca 100644 --- a/folly/experimental/wangle/concurrent/test/ThreadPoolExecutorTest.cpp +++ b/folly/experimental/wangle/concurrent/test/ThreadPoolExecutorTest.cpp @@ -22,6 +22,10 @@ using namespace folly::wangle; +static Func burnMs(uint64_t ms) { + return [ms]() { std::this_thread::sleep_for(std::chrono::milliseconds(ms)); }; +} + template static void basic() { // Create and destroy @@ -59,7 +63,7 @@ static void stop() { TPE tpe(10); std::atomic completed(0); auto f = [&](){ - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + burnMs(1)(); completed++; }; for (int i = 0; i < 1000; i++) { @@ -82,7 +86,7 @@ static void join() { TPE tpe(10); std::atomic completed(0); auto f = [&](){ - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + burnMs(1)(); completed++; }; for (int i = 0; i < 1000; i++) { @@ -105,7 +109,7 @@ static void resizeUnderLoad() { TPE tpe(10); std::atomic completed(0); auto f = [&](){ - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + burnMs(1)(); completed++; }; for (int i = 0; i < 1000; i++) { @@ -124,3 +128,75 @@ TEST(ThreadPoolExecutorTest, CPUResizeUnderLoad) { TEST(ThreadPoolExecutorTest, IOResizeUnderLoad) { resizeUnderLoad(); } + +template +static void poolStats() { + { + TPE tpe(10); + for (int i = 0; i < 20; i++) { + tpe.add(burnMs(20)); + } + burnMs(10)(); + auto stats = tpe.getPoolStats(); + EXPECT_EQ(10, stats.threadCount); + EXPECT_EQ(0, stats.idleThreadCount); + EXPECT_EQ(10, stats.activeThreadCount); + EXPECT_EQ(10, stats.pendingTaskCount); + EXPECT_EQ(20, stats.totalTaskCount); + } + + { + TPE tpe(10); + for (int i = 0; i < 5; i++) { + tpe.add(burnMs(20)); + } + burnMs(10)(); + auto stats = tpe.getPoolStats(); + EXPECT_EQ(10, stats.threadCount); + EXPECT_EQ(5, stats.idleThreadCount); + EXPECT_EQ(5, stats.activeThreadCount); + EXPECT_EQ(0, stats.pendingTaskCount); + EXPECT_EQ(5, stats.totalTaskCount); + } +} + +TEST(ThreadPoolExecutorTest, CPUPoolStats) { + poolStats(); +} + +TEST(ThreadPoolExecutorTest, IOPoolStats) { + poolStats(); +} + +template +static void taskStats() { + TPE tpe(10); + std::atomic c(0); + tpe.subscribeToTaskStats(Observer::create( + [&] (ThreadPoolExecutor::TaskStats stats) { + int i = c++; + if (i < 10) { + EXPECT_GE(10000, stats.waitTime.count()); + EXPECT_LE(20000, stats.runTime.count()); + } else { + EXPECT_LE(10000, stats.waitTime.count()); + EXPECT_LE(10000, stats.runTime.count()); + } + })); + for (int i = 0; i < 10; i++) { + tpe.add(burnMs(20)); + } + for (int i = 0; i < 10; i++) { + tpe.add(burnMs(10)); + } + tpe.join(); + EXPECT_EQ(20, c); +} + +TEST(ThreadPoolExecutorTest, CPUTaskStats) { + taskStats(); +} + +TEST(ThreadPoolExecutorTest, IOTaskStats) { + taskStats(); +} diff --git a/folly/experimental/wangle/rx/types.h b/folly/experimental/wangle/rx/types.h index 54dd0099..317fac14 100644 --- a/folly/experimental/wangle/rx/types.h +++ b/folly/experimental/wangle/rx/types.h @@ -17,6 +17,7 @@ #pragma once #include +#include namespace folly { namespace wangle { typedef folly::exception_wrapper Error;