Summary: Observer methods, so users of IOThreadPoolExecutor can do stuff when threads are added/removed. As a use case, previously the thrift server only used the threads already started when it started up, and assumed iothreadpool was never resized.
Test Plan: Added several unittests
Reviewed By: jsedgwick@fb.com
Subscribers: trunkagent, doug, fugalh, alandau, bmatheny, mshneer, folly-diffs@
FB internal diff:
D1753861
Signature: t1:
1753861:
1420236825:
54cbdfee0efb3b97dea35faba29c134f2b10a480
auto task = taskQueue_->take();
if (UNLIKELY(task.poison)) {
CHECK(threadsToStop_-- > 0);
+ for (auto& o : observers_) {
+ o->threadStopped(thread.get());
+ }
+
stoppedThreads_.add(thread);
return;
} else {
return pickThread()->eventBase;
}
+EventBase* IOThreadPoolExecutor::getEventBase(
+ ThreadPoolExecutor::ThreadHandle* h) {
+ auto thread = dynamic_cast<IOThread*>(h);
+
+ if (thread) {
+ return thread->eventBase;
+ }
+
+ return nullptr;
+}
+
std::shared_ptr<ThreadPoolExecutor::Thread>
IOThreadPoolExecutor::makeThread() {
return std::make_shared<IOThread>(this);
for (size_t i = 0; i < n; i++) {
const auto ioThread = std::static_pointer_cast<IOThread>(
threadList_.get()[i]);
+ for (auto& o : observers_) {
+ o->threadStopped(ioThread.get());
+ }
ioThread->shouldRun = false;
ioThread->eventBase->terminateLoopSoon();
}
}
-std::vector<EventBase*> IOThreadPoolExecutor::getEventBases() {
- std::vector<EventBase*> bases;
- RWSpinLock::ReadHolder{&threadListLock_};
- for (const auto& thread : threadList_.get()) {
- auto ioThread = std::static_pointer_cast<IOThread>(thread);
- bases.push_back(ioThread->eventBase);
- }
- return bases;
-}
-
// threadListLock_ is readlocked
uint64_t IOThreadPoolExecutor::getPendingTaskCount() {
uint64_t count = 0;
EventBase* getEventBase() override;
- std::vector<EventBase*> getEventBases();
+ EventBase* getEventBase(ThreadPoolExecutor::ThreadHandle*);
private:
struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING IOThread : public Thread {
for (auto& thread : newThreads) {
thread->startupBaton.wait();
}
+ for (auto& o : observers_) {
+ for (auto& thread : newThreads) {
+ o->threadStarted(thread.get());
+ }
+ }
}
// threadListLock_ is writelocked
return queue_.size();
}
+void ThreadPoolExecutor::addObserver(std::shared_ptr<Observer> o) {
+ RWSpinLock::ReadHolder{&threadListLock_};
+ observers_.push_back(o);
+ for (auto& thread : threadList_.get()) {
+ o->threadPreviouslyStarted(thread.get());
+ }
+}
+
+void ThreadPoolExecutor::removeObserver(std::shared_ptr<Observer> o) {
+ RWSpinLock::ReadHolder{&threadListLock_};
+ for (auto& thread : threadList_.get()) {
+ o->threadNotYetStopped(thread.get());
+ }
+
+ for (auto it = observers_.begin(); it != observers_.end(); it++) {
+ if (*it == o) {
+ observers_.erase(it);
+ return;
+ }
+ }
+ DCHECK(false);
+}
+
}} // folly::wangle
return taskStatsSubject_->subscribe(observer);
}
+ /**
+ * Base class for threads created with ThreadPoolExecutor.
+ * Some subclasses have methods that operate on these
+ * handles.
+ */
+ class ThreadHandle {
+ public:
+ virtual ~ThreadHandle() = default;
+ };
+
+ /**
+ * Observer interface for thread start/stop.
+ * Provides hooks so actions can be taken when
+ * threads are created
+ */
+ class Observer {
+ public:
+ virtual void threadStarted(ThreadHandle*) = 0;
+ virtual void threadStopped(ThreadHandle*) = 0;
+ virtual void threadPreviouslyStarted(ThreadHandle*) = 0;
+ virtual void threadNotYetStopped(ThreadHandle*) = 0;
+ virtual ~Observer() = default;
+ };
+
+ void addObserver(std::shared_ptr<Observer>);
+ void removeObserver(std::shared_ptr<Observer>);
+
protected:
// Prerequisite: threadListLock_ writelocked
void addThreads(size_t n);
// Prerequisite: threadListLock_ writelocked
void removeThreads(size_t n, bool isJoin);
- struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING Thread {
+ struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING Thread : public ThreadHandle {
explicit Thread(ThreadPoolExecutor* pool)
: id(nextId++),
handle(),
std::atomic<bool> isJoin_; // whether the current downsizing is a join
std::shared_ptr<Subject<TaskStats>> taskStatsSubject_;
+ std::vector<std::shared_ptr<Observer>> observers_;
};
}} // folly::wangle
pool.join();
EXPECT_EQ(100, completed);
}
+
+class TestObserver : public ThreadPoolExecutor::Observer {
+ public:
+ void threadStarted(ThreadPoolExecutor::ThreadHandle*) {
+ threads_++;
+ }
+ void threadStopped(ThreadPoolExecutor::ThreadHandle*) {
+ threads_--;
+ }
+ void threadPreviouslyStarted(ThreadPoolExecutor::ThreadHandle*) {
+ threads_++;
+ }
+ void threadNotYetStopped(ThreadPoolExecutor::ThreadHandle*) {
+ threads_--;
+ }
+ void checkCalls() {
+ ASSERT_EQ(threads_, 0);
+ }
+ private:
+ int threads_{0};
+};
+
+TEST(ThreadPoolExecutorTest, IOObserver) {
+ auto observer = std::make_shared<TestObserver>();
+
+ {
+ IOThreadPoolExecutor exe(10);
+ exe.addObserver(observer);
+ exe.setNumThreads(3);
+ exe.setNumThreads(0);
+ exe.setNumThreads(7);
+ exe.removeObserver(observer);
+ exe.setNumThreads(10);
+ }
+
+ observer->checkCalls();
+}
+
+TEST(ThreadPoolExecutorTest, CPUObserver) {
+ auto observer = std::make_shared<TestObserver>();
+
+ {
+ CPUThreadPoolExecutor exe(10);
+ exe.addObserver(observer);
+ exe.setNumThreads(3);
+ exe.setNumThreads(0);
+ exe.setNumThreads(7);
+ exe.removeObserver(observer);
+ exe.setNumThreads(10);
+ }
+
+ observer->checkCalls();
+}