From ca5fc3666621fbfe0ce1211e32ebe5f646e9af0c Mon Sep 17 00:00:00 2001 From: James Sedgwick Date: Wed, 1 Oct 2014 10:32:54 -0700 Subject: [PATCH] expose event base from IOThreadPoolExecutor Summary: I'm not 100% sure this is the best way to go about this but I don't hate it either. I'm going to start seeing how it might fit into tserver - my guess is that some sort Cpp2WorkerFactory which also manages those objects would get plugged in as the thread factory Haven't fleshed out how this would relate to TEventBaseManager Test Plan: added unit, starting to play with this in Thrift2 server Reviewed By: davejwatson@fb.com Subscribers: alandau, bmatheny, trunkagent, fugalh, njormrod FB internal diff: D1574660 --- .../wangle/concurrent/CPUThreadPoolExecutor.cpp | 1 + .../wangle/concurrent/IOThreadPoolExecutor.cpp | 12 ++++++++---- .../wangle/concurrent/IOThreadPoolExecutor.h | 2 +- .../wangle/concurrent/ThreadPoolExecutor.cpp | 8 +++++++- .../wangle/concurrent/ThreadPoolExecutor.h | 5 ++++- 5 files changed, 21 insertions(+), 7 deletions(-) diff --git a/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.cpp b/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.cpp index daac2eb9..6d826b55 100644 --- a/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.cpp +++ b/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.cpp @@ -49,6 +49,7 @@ void CPUThreadPoolExecutor::add( } void CPUThreadPoolExecutor::threadRun(std::shared_ptr thread) { + thread->startupBaton.post(); while (1) { auto task = taskQueue_->take(); if (UNLIKELY(task.poison)) { diff --git a/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp b/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp index 80d5ef73..8de3d5ab 100644 --- a/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp +++ b/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp @@ -18,6 +18,7 @@ #include #include +#include namespace folly { namespace wangle { @@ -57,7 +58,7 @@ void IOThreadPoolExecutor::add( }; ioThread->pendingTasks++; - if (!ioThread->eventBase.runInEventBaseThread(std::move(wrappedFunc))) { + if (!ioThread->eventBase->runInEventBaseThread(std::move(wrappedFunc))) { ioThread->pendingTasks--; throw std::runtime_error("Unable to run func in event base thread"); } @@ -70,12 +71,15 @@ IOThreadPoolExecutor::makeThread() { void IOThreadPoolExecutor::threadRun(ThreadPtr thread) { const auto ioThread = std::static_pointer_cast(thread); + ioThread->eventBase = + apache::thrift::async::TEventBaseManager::get()->getEventBase(); + thread->startupBaton.post(); while (ioThread->shouldRun) { - ioThread->eventBase.loopForever(); + ioThread->eventBase->loopForever(); } if (isJoin_) { while (ioThread->pendingTasks > 0) { - ioThread->eventBase.loopOnce(); + ioThread->eventBase->loopOnce(); } } stoppedThreads_.add(ioThread); @@ -87,7 +91,7 @@ void IOThreadPoolExecutor::stopThreads(size_t n) { const auto ioThread = std::static_pointer_cast( threadList_.get()[i]); ioThread->shouldRun = false; - ioThread->eventBase.terminateLoopSoon(); + ioThread->eventBase->terminateLoopSoon(); } } diff --git a/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h b/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h index 60f9d933..a6bf5215 100644 --- a/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h +++ b/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h @@ -45,7 +45,7 @@ class IOThreadPoolExecutor : public ThreadPoolExecutor { IOThread() : shouldRun(true), pendingTasks(0) {}; std::atomic shouldRun; std::atomic pendingTasks; - EventBase eventBase; + EventBase* eventBase; }; size_t nextThread_; diff --git a/folly/experimental/wangle/concurrent/ThreadPoolExecutor.cpp b/folly/experimental/wangle/concurrent/ThreadPoolExecutor.cpp index 8b0b158d..d8ddfac1 100644 --- a/folly/experimental/wangle/concurrent/ThreadPoolExecutor.cpp +++ b/folly/experimental/wangle/concurrent/ThreadPoolExecutor.cpp @@ -84,14 +84,20 @@ void ThreadPoolExecutor::setNumThreads(size_t n) { // threadListLock_ is writelocked void ThreadPoolExecutor::addThreads(size_t n) { + std::vector newThreads; for (int i = 0; i < n; i++) { - auto thread = makeThread(); + newThreads.push_back(makeThread()); + } + for (auto& thread : newThreads) { // TODO need a notion of failing to create the thread // and then handling for that case thread->handle = threadFactory_->newThread( std::bind(&ThreadPoolExecutor::threadRun, this, thread)); threadList_.add(thread); } + for (auto& thread : newThreads) { + thread->startupBaton.wait(); + } } // threadListLock_ is writelocked diff --git a/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h b/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h index bf0dfda8..54819ad6 100644 --- a/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h +++ b/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -83,6 +84,7 @@ class ThreadPoolExecutor : public experimental::Executor { uint64_t id; std::thread handle; bool idle; + Baton<> startupBaton; }; typedef std::shared_ptr ThreadPtr; @@ -101,7 +103,8 @@ class ThreadPoolExecutor : public experimental::Executor { void runTask(const ThreadPtr& thread, Task&& task); - // The function that will be bound to pool threads + // The function that will be bound to pool threads. It must call + // thread->startupBaton.post() when it's ready to consume work. virtual void threadRun(ThreadPtr thread) = 0; // Stop n threads and put their ThreadPtrs in the threadsStopped_ queue -- 2.34.1