From 395c7e78e17e9521fcb9b8e3474d7ff17095b3ac Mon Sep 17 00:00:00 2001 From: James Sedgwick Date: Tue, 23 Sep 2014 11:17:03 -0700 Subject: [PATCH] user-defined expirations Summary: Couple of notes: 1. is it a bummer not to have per-task callbacks of some kind? the interfaces set up here only tell you that some task expired, not which one expired. TM calls back with the Runnable object. is that useful? 2. std::chrono::* business is frustratingly verbose, but the safety/explicitness is nice. Not sure how I feel overall. 3. perhaps expirations should be given in microseconds even if we don't think we can accurately accomplish that Test Plan: added unit Reviewed By: hans@fb.com Subscribers: fugalh, njormrod, bmatheny FB internal diff: D1563520 --- .../concurrent/CPUThreadPoolExecutor.cpp | 11 ++++- .../wangle/concurrent/CPUThreadPoolExecutor.h | 15 ++++++- .../concurrent/IOThreadPoolExecutor.cpp | 10 ++++- .../wangle/concurrent/IOThreadPoolExecutor.h | 4 ++ .../wangle/concurrent/ThreadPoolExecutor.cpp | 42 +++++++++++++----- .../wangle/concurrent/ThreadPoolExecutor.h | 40 ++++++++--------- .../test/ThreadPoolExecutorTest.cpp | 43 ++++++++++++++++--- 7 files changed, 121 insertions(+), 44 deletions(-) diff --git a/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.cpp b/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.cpp index ca88e580..daac2eb9 100644 --- a/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.cpp +++ b/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.cpp @@ -36,13 +36,20 @@ CPUThreadPoolExecutor::~CPUThreadPoolExecutor() { } void CPUThreadPoolExecutor::add(Func func) { + add(std::move(func), std::chrono::milliseconds(0)); +} + +void CPUThreadPoolExecutor::add( + Func func, + std::chrono::milliseconds expiration, + Func expireCallback) { // TODO handle enqueue failure, here and in other add() callsites - taskQueue_->add(CPUTask(std::move(func))); + taskQueue_->add( + CPUTask(std::move(func), expiration, std::move(expireCallback))); } void CPUThreadPoolExecutor::threadRun(std::shared_ptr thread) { while (1) { - // TODO expiration / codel auto task = taskQueue_->take(); if (UNLIKELY(task.poison)) { CHECK(threadsToStop_-- > 0); diff --git a/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h b/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h index 7811c678..28e2dad6 100644 --- a/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h +++ b/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h @@ -34,11 +34,22 @@ class CPUThreadPoolExecutor : public ThreadPoolExecutor { ~CPUThreadPoolExecutor(); void add(Func func) override; + void add( + Func func, + std::chrono::milliseconds expiration, + Func expireCallback = nullptr) override; struct CPUTask : public ThreadPoolExecutor::Task { // Must be noexcept move constructible so it can be used in MPMCQueue - explicit CPUTask(Func&& f) : Task(std::move(f)), poison(false) {} - CPUTask() : Task(nullptr), poison(true) {} + explicit CPUTask( + Func&& f, + std::chrono::milliseconds expiration, + Func&& expireCallback) + : Task(std::move(f), expiration, std::move(expireCallback)), + poison(false) {} + CPUTask() + : Task(nullptr, std::chrono::milliseconds(0), nullptr), + poison(true) {} CPUTask(CPUTask&& o) noexcept : Task(std::move(o)), poison(o.poison) {} CPUTask(const CPUTask&) = default; CPUTask& operator=(const CPUTask&) = default; diff --git a/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp b/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp index 6e106f92..80d5ef73 100644 --- a/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp +++ b/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp @@ -35,6 +35,13 @@ IOThreadPoolExecutor::~IOThreadPoolExecutor() { } void IOThreadPoolExecutor::add(Func func) { + add(std::move(func), std::chrono::milliseconds(0)); +} + +void IOThreadPoolExecutor::add( + Func func, + std::chrono::milliseconds expiration, + Func expireCallback) { RWSpinLock::ReadHolder{&threadListLock_}; if (threadList_.get().empty()) { throw std::runtime_error("No threads available"); @@ -42,7 +49,8 @@ void IOThreadPoolExecutor::add(Func func) { auto thread = threadList_.get()[nextThread_++ % threadList_.get().size()]; auto ioThread = std::static_pointer_cast(thread); - auto moveTask = folly::makeMoveWrapper(Task(std::move(func))); + auto moveTask = folly::makeMoveWrapper( + Task(std::move(func), expiration, std::move(expireCallback))); auto wrappedFunc = [this, ioThread, moveTask] () mutable { runTask(ioThread, std::move(*moveTask)); ioThread->pendingTasks--; diff --git a/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h b/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h index c42da719..60f9d933 100644 --- a/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h +++ b/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h @@ -30,6 +30,10 @@ class IOThreadPoolExecutor : public ThreadPoolExecutor { ~IOThreadPoolExecutor(); void add(Func func) override; + void add( + Func func, + std::chrono::milliseconds expiration, + Func expireCallback = nullptr) override; private: ThreadPtr makeThread() override; diff --git a/folly/experimental/wangle/concurrent/ThreadPoolExecutor.cpp b/folly/experimental/wangle/concurrent/ThreadPoolExecutor.cpp index 30e46f5c..8b0b158d 100644 --- a/folly/experimental/wangle/concurrent/ThreadPoolExecutor.cpp +++ b/folly/experimental/wangle/concurrent/ThreadPoolExecutor.cpp @@ -27,23 +27,43 @@ ThreadPoolExecutor::~ThreadPoolExecutor() { CHECK(threadList_.get().size() == 0); } +ThreadPoolExecutor::Task::Task( + Func&& func, + std::chrono::milliseconds expiration, + Func&& expireCallback) + : func_(std::move(func)), + expiration_(expiration), + expireCallback_(std::move(expireCallback)) { + // Assume that the task in enqueued on creation + enqueueTime_ = std::chrono::steady_clock::now(); +} + void ThreadPoolExecutor::runTask( const ThreadPtr& thread, Task&& task) { thread->idle = false; - task.started(); - try { - task.func(); - } catch (const std::exception& e) { - LOG(ERROR) << "ThreadPoolExecutor: func threw unhandled " << - typeid(e).name() << " exception: " << e.what(); - } catch (...) { - LOG(ERROR) << "ThreadPoolExecutor: func threw unhandled non-exception " - "object"; + auto startTime = std::chrono::steady_clock::now(); + task.stats_.waitTime = startTime - task.enqueueTime_; + if (task.expiration_ > std::chrono::milliseconds(0) && + task.stats_.waitTime >= task.expiration_) { + task.stats_.expired = true; + if (task.expireCallback_ != nullptr) { + task.expireCallback_(); + } + } else { + try { + task.func_(); + } catch (const std::exception& e) { + LOG(ERROR) << "ThreadPoolExecutor: func threw unhandled " << + typeid(e).name() << " exception: " << e.what(); + } catch (...) { + LOG(ERROR) << "ThreadPoolExecutor: func threw unhandled non-exception " + "object"; + } + task.stats_.runTime = std::chrono::steady_clock::now() - startTime; } - task.completed(); - taskStatsSubject_.onNext(std::move(task.stats)); thread->idle = true; + taskStatsSubject_.onNext(std::move(task.stats_)); } size_t ThreadPoolExecutor::numThreads() { diff --git a/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h b/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h index 4eda2d36..bf0dfda8 100644 --- a/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h +++ b/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h @@ -38,6 +38,12 @@ class ThreadPoolExecutor : public experimental::Executor { ~ThreadPoolExecutor(); + virtual void add(Func func) override = 0; + virtual void add( + Func func, + std::chrono::milliseconds expiration, + Func expireCallback) = 0; + size_t numThreads(); void setNumThreads(size_t numThreads); void stop(); @@ -55,8 +61,8 @@ class ThreadPoolExecutor : public experimental::Executor { struct TaskStats { TaskStats() : expired(false), waitTime(0), runTime(0) {} bool expired; - std::chrono::microseconds waitTime; - std::chrono::microseconds runTime; + std::chrono::nanoseconds waitTime; + std::chrono::nanoseconds runTime; }; Subscription subscribeToTaskStats( @@ -82,27 +88,15 @@ class ThreadPoolExecutor : public experimental::Executor { typedef std::shared_ptr ThreadPtr; struct Task { - explicit Task(Func&& f) : func(std::move(f)) { - // Assume that the task in enqueued on creation - intervalBegin = std::chrono::steady_clock::now(); - } - - Func func; - TaskStats stats; - // TODO per-task timeouts, expirations - - void started() { - auto now = std::chrono::steady_clock::now(); - stats.waitTime = std::chrono::duration_cast( - now - intervalBegin); - intervalBegin = now; - } - void completed() { - stats.runTime = std::chrono::duration_cast( - std::chrono::steady_clock::now() - intervalBegin); - } - - std::chrono::steady_clock::time_point intervalBegin; + explicit Task( + Func&& func, + std::chrono::milliseconds expiration, + Func&& expireCallback); + Func func_; + TaskStats stats_; + std::chrono::steady_clock::time_point enqueueTime_; + std::chrono::milliseconds expiration_; + Func expireCallback_; }; void runTask(const ThreadPtr& thread, Task&& task); diff --git a/folly/experimental/wangle/concurrent/test/ThreadPoolExecutorTest.cpp b/folly/experimental/wangle/concurrent/test/ThreadPoolExecutorTest.cpp index eb8527ca..8b972773 100644 --- a/folly/experimental/wangle/concurrent/test/ThreadPoolExecutorTest.cpp +++ b/folly/experimental/wangle/concurrent/test/ThreadPoolExecutorTest.cpp @@ -21,9 +21,10 @@ #include using namespace folly::wangle; +using namespace std::chrono; static Func burnMs(uint64_t ms) { - return [ms]() { std::this_thread::sleep_for(std::chrono::milliseconds(ms)); }; + return [ms]() { std::this_thread::sleep_for(milliseconds(ms)); }; } template @@ -176,11 +177,11 @@ static void taskStats() { [&] (ThreadPoolExecutor::TaskStats stats) { int i = c++; if (i < 10) { - EXPECT_GE(10000, stats.waitTime.count()); - EXPECT_LE(20000, stats.runTime.count()); + EXPECT_GE(milliseconds(10), stats.waitTime); + EXPECT_LE(milliseconds(20), stats.runTime); } else { - EXPECT_LE(10000, stats.waitTime.count()); - EXPECT_LE(10000, stats.runTime.count()); + EXPECT_LE(milliseconds(10), stats.waitTime); + EXPECT_LE(milliseconds(10), stats.runTime); } })); for (int i = 0; i < 10; i++) { @@ -200,3 +201,35 @@ TEST(ThreadPoolExecutorTest, CPUTaskStats) { TEST(ThreadPoolExecutorTest, IOTaskStats) { taskStats(); } + +template +static void expiration() { + TPE tpe(1); + std::atomic statCbCount(0); + tpe.subscribeToTaskStats(Observer::create( + [&] (ThreadPoolExecutor::TaskStats stats) { + int i = statCbCount++; + if (i == 0) { + EXPECT_FALSE(stats.expired); + } else if (i == 1) { + EXPECT_TRUE(stats.expired); + } else { + FAIL(); + } + })); + std::atomic expireCbCount(0); + auto expireCb = [&] () { expireCbCount++; }; + tpe.add(burnMs(10), milliseconds(10), expireCb); + tpe.add(burnMs(10), milliseconds(10), expireCb); + tpe.join(); + EXPECT_EQ(2, statCbCount); + EXPECT_EQ(1, expireCbCount); +} + +TEST(ThreadPoolExecutorTest, CPUExpiration) { + expiration(); +} + +TEST(ThreadPoolExecutorTest, IOExpiration) { + expiration(); +} -- 2.34.1