From 0a14b4ccb443adbe283e8681e4e151b2eb20ceca Mon Sep 17 00:00:00 2001 From: James Sedgwick Date: Mon, 17 Nov 2014 15:04:35 -0800 Subject: [PATCH] fix potential race/memory corruption in IOThreadPoolExecutor Summary: In unusual but possible circumstances, the EventBase and thus pending tasks will outlive the pool, so we shouldn't keep references of any kind to the pool in the task. The only reference we were keeping was used to access the task stats rx subject. Store the subject as a shared ptr and give a copy of the ptr to the Thread object, which is itself owned by a shared ptr and captured by every task. I thought this had to do with the thread local leak in mentioned in the test plan of D1682860 but this patch doesn't actually fix that :( Thankfully, while task surfing I saw @phillip's awesome D1682698. Patching that in fixes the leak! Woo. Either way, this is more correct. Test Plan: unit under clang/asan Reviewed By: davejwatson@fb.com Subscribers: trunkagent, fugalh, njormrod, folly-diffs@, philipp FB internal diff: D1683221 Tasks: 5336655 Signature: t1:1683221:1416264933:946d29b5a3eb22ed08812f2adefb7284b1899e4e --- .../wangle/concurrent/IOThreadPoolExecutor.cpp | 4 ++-- .../wangle/concurrent/IOThreadPoolExecutor.h | 5 ++++- .../wangle/concurrent/ThreadPoolExecutor.cpp | 5 +++-- .../wangle/concurrent/ThreadPoolExecutor.h | 17 ++++++++++++----- 4 files changed, 21 insertions(+), 10 deletions(-) diff --git a/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp b/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp index dfca6216..d60472fc 100644 --- a/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp +++ b/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp @@ -93,7 +93,7 @@ void IOThreadPoolExecutor::add( auto moveTask = folly::makeMoveWrapper( Task(std::move(func), expiration, std::move(expireCallback))); - auto wrappedFunc = [this, ioThread, moveTask] () mutable { + auto wrappedFunc = [ioThread, moveTask] () mutable { runTask(ioThread, std::move(*moveTask)); ioThread->pendingTasks--; }; @@ -107,7 +107,7 @@ void IOThreadPoolExecutor::add( std::shared_ptr IOThreadPoolExecutor::makeThread() { - return std::make_shared(); + return std::make_shared(this); } void IOThreadPoolExecutor::threadRun(ThreadPtr thread) { diff --git a/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h b/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h index 7acaafe8..35cce4c0 100644 --- a/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h +++ b/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h @@ -42,7 +42,10 @@ class IOThreadPoolExecutor : public ThreadPoolExecutor { uint64_t getPendingTaskCount() override; struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING IOThread : public Thread { - IOThread() : shouldRun(true), pendingTasks(0) {}; + IOThread(IOThreadPoolExecutor* pool) + : Thread(pool), + shouldRun(true), + pendingTasks(0) {}; std::atomic shouldRun; std::atomic pendingTasks; EventBase* eventBase; diff --git a/folly/experimental/wangle/concurrent/ThreadPoolExecutor.cpp b/folly/experimental/wangle/concurrent/ThreadPoolExecutor.cpp index 18d8c275..74890d7b 100644 --- a/folly/experimental/wangle/concurrent/ThreadPoolExecutor.cpp +++ b/folly/experimental/wangle/concurrent/ThreadPoolExecutor.cpp @@ -21,7 +21,8 @@ namespace folly { namespace wangle { ThreadPoolExecutor::ThreadPoolExecutor( size_t numThreads, std::shared_ptr threadFactory) - : threadFactory_(std::move(threadFactory)) {} + : threadFactory_(std::move(threadFactory)), + taskStatsSubject_(std::make_shared>()) {} ThreadPoolExecutor::~ThreadPoolExecutor() { CHECK(threadList_.get().size() == 0); @@ -63,7 +64,7 @@ void ThreadPoolExecutor::runTask( task.stats_.runTime = std::chrono::steady_clock::now() - startTime; } thread->idle = true; - taskStatsSubject_.onNext(std::move(task.stats_)); + thread->taskStatsSubject->onNext(std::move(task.stats_)); } size_t ThreadPoolExecutor::numThreads() { diff --git a/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h b/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h index 68da850a..b1a3dd38 100644 --- a/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h +++ b/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h @@ -73,7 +73,7 @@ class ThreadPoolExecutor : public Executor { Subscription subscribeToTaskStats( const ObserverPtr& observer) { - return taskStatsSubject_.subscribe(observer); + return taskStatsSubject_->subscribe(observer); } protected: @@ -83,13 +83,20 @@ class ThreadPoolExecutor : public Executor { void removeThreads(size_t n, bool isJoin); struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING Thread { + explicit Thread(ThreadPoolExecutor* pool) + : id(nextId++), + handle(), + idle(true), + taskStatsSubject(pool->taskStatsSubject_) {} + virtual ~Thread() {} - Thread() : id(nextId++), handle(), idle(true) {}; + static std::atomic nextId; uint64_t id; std::thread handle; bool idle; Baton<> startupBaton; + std::shared_ptr> taskStatsSubject; }; typedef std::shared_ptr ThreadPtr; @@ -106,7 +113,7 @@ class ThreadPoolExecutor : public Executor { Func expireCallback_; }; - void runTask(const ThreadPtr& thread, Task&& task); + static void runTask(const ThreadPtr& thread, Task&& task); // The function that will be bound to pool threads. It must call // thread->startupBaton.post() when it's ready to consume work. @@ -118,7 +125,7 @@ class ThreadPoolExecutor : public Executor { // Create a suitable Thread struct virtual ThreadPtr makeThread() { - return std::make_shared(); + return std::make_shared(this); } // Prerequisite: threadListLock_ readlocked @@ -168,7 +175,7 @@ class ThreadPoolExecutor : public Executor { StoppedThreadQueue stoppedThreads_; std::atomic isJoin_; // whether the current downsizing is a join - Subject taskStatsSubject_; + std::shared_ptr> taskStatsSubject_; }; }} // folly::wangle -- 2.34.1