}
void CPUThreadPoolExecutor::add(Func func) {
+ add(std::move(func), std::chrono::milliseconds(0));
+}
+
+void CPUThreadPoolExecutor::add(
+ Func func,
+ std::chrono::milliseconds expiration,
+ Func expireCallback) {
// TODO handle enqueue failure, here and in other add() callsites
- taskQueue_->add(CPUTask(std::move(func)));
+ taskQueue_->add(
+ CPUTask(std::move(func), expiration, std::move(expireCallback)));
}
void CPUThreadPoolExecutor::threadRun(std::shared_ptr<Thread> thread) {
while (1) {
- // TODO expiration / codel
auto task = taskQueue_->take();
if (UNLIKELY(task.poison)) {
CHECK(threadsToStop_-- > 0);
~CPUThreadPoolExecutor();
void add(Func func) override;
+ void add(
+ Func func,
+ std::chrono::milliseconds expiration,
+ Func expireCallback = nullptr) override;
struct CPUTask : public ThreadPoolExecutor::Task {
// Must be noexcept move constructible so it can be used in MPMCQueue
- explicit CPUTask(Func&& f) : Task(std::move(f)), poison(false) {}
- CPUTask() : Task(nullptr), poison(true) {}
+ explicit CPUTask(
+ Func&& f,
+ std::chrono::milliseconds expiration,
+ Func&& expireCallback)
+ : Task(std::move(f), expiration, std::move(expireCallback)),
+ poison(false) {}
+ CPUTask()
+ : Task(nullptr, std::chrono::milliseconds(0), nullptr),
+ poison(true) {}
CPUTask(CPUTask&& o) noexcept : Task(std::move(o)), poison(o.poison) {}
CPUTask(const CPUTask&) = default;
CPUTask& operator=(const CPUTask&) = default;
}
void IOThreadPoolExecutor::add(Func func) {
+ add(std::move(func), std::chrono::milliseconds(0));
+}
+
+void IOThreadPoolExecutor::add(
+ Func func,
+ std::chrono::milliseconds expiration,
+ Func expireCallback) {
RWSpinLock::ReadHolder{&threadListLock_};
if (threadList_.get().empty()) {
throw std::runtime_error("No threads available");
auto thread = threadList_.get()[nextThread_++ % threadList_.get().size()];
auto ioThread = std::static_pointer_cast<IOThread>(thread);
- auto moveTask = folly::makeMoveWrapper(Task(std::move(func)));
+ auto moveTask = folly::makeMoveWrapper(
+ Task(std::move(func), expiration, std::move(expireCallback)));
auto wrappedFunc = [this, ioThread, moveTask] () mutable {
runTask(ioThread, std::move(*moveTask));
ioThread->pendingTasks--;
~IOThreadPoolExecutor();
void add(Func func) override;
+ void add(
+ Func func,
+ std::chrono::milliseconds expiration,
+ Func expireCallback = nullptr) override;
private:
ThreadPtr makeThread() override;
CHECK(threadList_.get().size() == 0);
}
+ThreadPoolExecutor::Task::Task(
+ Func&& func,
+ std::chrono::milliseconds expiration,
+ Func&& expireCallback)
+ : func_(std::move(func)),
+ expiration_(expiration),
+ expireCallback_(std::move(expireCallback)) {
+ // Assume that the task in enqueued on creation
+ enqueueTime_ = std::chrono::steady_clock::now();
+}
+
void ThreadPoolExecutor::runTask(
const ThreadPtr& thread,
Task&& task) {
thread->idle = false;
- task.started();
- try {
- task.func();
- } catch (const std::exception& e) {
- LOG(ERROR) << "ThreadPoolExecutor: func threw unhandled " <<
- typeid(e).name() << " exception: " << e.what();
- } catch (...) {
- LOG(ERROR) << "ThreadPoolExecutor: func threw unhandled non-exception "
- "object";
+ auto startTime = std::chrono::steady_clock::now();
+ task.stats_.waitTime = startTime - task.enqueueTime_;
+ if (task.expiration_ > std::chrono::milliseconds(0) &&
+ task.stats_.waitTime >= task.expiration_) {
+ task.stats_.expired = true;
+ if (task.expireCallback_ != nullptr) {
+ task.expireCallback_();
+ }
+ } else {
+ try {
+ task.func_();
+ } catch (const std::exception& e) {
+ LOG(ERROR) << "ThreadPoolExecutor: func threw unhandled " <<
+ typeid(e).name() << " exception: " << e.what();
+ } catch (...) {
+ LOG(ERROR) << "ThreadPoolExecutor: func threw unhandled non-exception "
+ "object";
+ }
+ task.stats_.runTime = std::chrono::steady_clock::now() - startTime;
}
- task.completed();
- taskStatsSubject_.onNext(std::move(task.stats));
thread->idle = true;
+ taskStatsSubject_.onNext(std::move(task.stats_));
}
size_t ThreadPoolExecutor::numThreads() {
~ThreadPoolExecutor();
+ virtual void add(Func func) override = 0;
+ virtual void add(
+ Func func,
+ std::chrono::milliseconds expiration,
+ Func expireCallback) = 0;
+
size_t numThreads();
void setNumThreads(size_t numThreads);
void stop();
struct TaskStats {
TaskStats() : expired(false), waitTime(0), runTime(0) {}
bool expired;
- std::chrono::microseconds waitTime;
- std::chrono::microseconds runTime;
+ std::chrono::nanoseconds waitTime;
+ std::chrono::nanoseconds runTime;
};
Subscription subscribeToTaskStats(
typedef std::shared_ptr<Thread> ThreadPtr;
struct Task {
- explicit Task(Func&& f) : func(std::move(f)) {
- // Assume that the task in enqueued on creation
- intervalBegin = std::chrono::steady_clock::now();
- }
-
- Func func;
- TaskStats stats;
- // TODO per-task timeouts, expirations
-
- void started() {
- auto now = std::chrono::steady_clock::now();
- stats.waitTime = std::chrono::duration_cast<std::chrono::microseconds>(
- now - intervalBegin);
- intervalBegin = now;
- }
- void completed() {
- stats.runTime = std::chrono::duration_cast<std::chrono::microseconds>(
- std::chrono::steady_clock::now() - intervalBegin);
- }
-
- std::chrono::steady_clock::time_point intervalBegin;
+ explicit Task(
+ Func&& func,
+ std::chrono::milliseconds expiration,
+ Func&& expireCallback);
+ Func func_;
+ TaskStats stats_;
+ std::chrono::steady_clock::time_point enqueueTime_;
+ std::chrono::milliseconds expiration_;
+ Func expireCallback_;
};
void runTask(const ThreadPtr& thread, Task&& task);
#include <gtest/gtest.h>
using namespace folly::wangle;
+using namespace std::chrono;
static Func burnMs(uint64_t ms) {
- return [ms]() { std::this_thread::sleep_for(std::chrono::milliseconds(ms)); };
+ return [ms]() { std::this_thread::sleep_for(milliseconds(ms)); };
}
template <class TPE>
[&] (ThreadPoolExecutor::TaskStats stats) {
int i = c++;
if (i < 10) {
- EXPECT_GE(10000, stats.waitTime.count());
- EXPECT_LE(20000, stats.runTime.count());
+ EXPECT_GE(milliseconds(10), stats.waitTime);
+ EXPECT_LE(milliseconds(20), stats.runTime);
} else {
- EXPECT_LE(10000, stats.waitTime.count());
- EXPECT_LE(10000, stats.runTime.count());
+ EXPECT_LE(milliseconds(10), stats.waitTime);
+ EXPECT_LE(milliseconds(10), stats.runTime);
}
}));
for (int i = 0; i < 10; i++) {
TEST(ThreadPoolExecutorTest, IOTaskStats) {
taskStats<IOThreadPoolExecutor>();
}
+
+template <class TPE>
+static void expiration() {
+ TPE tpe(1);
+ std::atomic<int> statCbCount(0);
+ tpe.subscribeToTaskStats(Observer<ThreadPoolExecutor::TaskStats>::create(
+ [&] (ThreadPoolExecutor::TaskStats stats) {
+ int i = statCbCount++;
+ if (i == 0) {
+ EXPECT_FALSE(stats.expired);
+ } else if (i == 1) {
+ EXPECT_TRUE(stats.expired);
+ } else {
+ FAIL();
+ }
+ }));
+ std::atomic<int> expireCbCount(0);
+ auto expireCb = [&] () { expireCbCount++; };
+ tpe.add(burnMs(10), milliseconds(10), expireCb);
+ tpe.add(burnMs(10), milliseconds(10), expireCb);
+ tpe.join();
+ EXPECT_EQ(2, statCbCount);
+ EXPECT_EQ(1, expireCbCount);
+}
+
+TEST(ThreadPoolExecutorTest, CPUExpiration) {
+ expiration<CPUThreadPoolExecutor>();
+}
+
+TEST(ThreadPoolExecutorTest, IOExpiration) {
+ expiration<IOThreadPoolExecutor>();
+}