auto moveTask = folly::makeMoveWrapper(
Task(std::move(func), expiration, std::move(expireCallback)));
- auto wrappedFunc = [this, ioThread, moveTask] () mutable {
+ auto wrappedFunc = [ioThread, moveTask] () mutable {
runTask(ioThread, std::move(*moveTask));
ioThread->pendingTasks--;
};
std::shared_ptr<ThreadPoolExecutor::Thread>
IOThreadPoolExecutor::makeThread() {
- return std::make_shared<IOThread>();
+ return std::make_shared<IOThread>(this);
}
void IOThreadPoolExecutor::threadRun(ThreadPtr thread) {
uint64_t getPendingTaskCount() override;
struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING IOThread : public Thread {
- IOThread() : shouldRun(true), pendingTasks(0) {};
+ IOThread(IOThreadPoolExecutor* pool)
+ : Thread(pool),
+ shouldRun(true),
+ pendingTasks(0) {};
std::atomic<bool> shouldRun;
std::atomic<size_t> pendingTasks;
EventBase* eventBase;
ThreadPoolExecutor::ThreadPoolExecutor(
size_t numThreads,
std::shared_ptr<ThreadFactory> threadFactory)
- : threadFactory_(std::move(threadFactory)) {}
+ : threadFactory_(std::move(threadFactory)),
+ taskStatsSubject_(std::make_shared<Subject<TaskStats>>()) {}
ThreadPoolExecutor::~ThreadPoolExecutor() {
CHECK(threadList_.get().size() == 0);
task.stats_.runTime = std::chrono::steady_clock::now() - startTime;
}
thread->idle = true;
- taskStatsSubject_.onNext(std::move(task.stats_));
+ thread->taskStatsSubject->onNext(std::move(task.stats_));
}
size_t ThreadPoolExecutor::numThreads() {
Subscription<TaskStats> subscribeToTaskStats(
const ObserverPtr<TaskStats>& observer) {
- return taskStatsSubject_.subscribe(observer);
+ return taskStatsSubject_->subscribe(observer);
}
protected:
void removeThreads(size_t n, bool isJoin);
struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING Thread {
+ explicit Thread(ThreadPoolExecutor* pool)
+ : id(nextId++),
+ handle(),
+ idle(true),
+ taskStatsSubject(pool->taskStatsSubject_) {}
+
virtual ~Thread() {}
- Thread() : id(nextId++), handle(), idle(true) {};
+
static std::atomic<uint64_t> nextId;
uint64_t id;
std::thread handle;
bool idle;
Baton<> startupBaton;
+ std::shared_ptr<Subject<TaskStats>> taskStatsSubject;
};
typedef std::shared_ptr<Thread> ThreadPtr;
Func expireCallback_;
};
- void runTask(const ThreadPtr& thread, Task&& task);
+ static void runTask(const ThreadPtr& thread, Task&& task);
// The function that will be bound to pool threads. It must call
// thread->startupBaton.post() when it's ready to consume work.
// Create a suitable Thread struct
virtual ThreadPtr makeThread() {
- return std::make_shared<Thread>();
+ return std::make_shared<Thread>(this);
}
// Prerequisite: threadListLock_ readlocked
StoppedThreadQueue stoppedThreads_;
std::atomic<bool> isJoin_; // whether the current downsizing is a join
- Subject<TaskStats> taskStatsSubject_;
+ std::shared_ptr<Subject<TaskStats>> taskStatsSubject_;
};
}} // folly::wangle