From: Hans Fugal Date: Thu, 19 Jun 2014 01:03:45 +0000 (-0700) Subject: Scheduler interface of Executor X-Git-Tag: v0.22.0~501 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=d7a43326b76def70da0b780d71813eab5a887e2c;p=folly.git Scheduler interface of Executor Summary: and ManualExecutor implementation Test Plan: unit tests, contbuild Reviewed By: davejwatson@fb.com Subscribers: bmatheny, folly@lists, net-systems@, fugalh, exa, marccelani, jsedgwick FB internal diff: D1392999 Tasks: 4548494 --- diff --git a/folly/wangle/Executor.h b/folly/wangle/Executor.h index 29037b47..8cbd089a 100644 --- a/folly/wangle/Executor.h +++ b/folly/wangle/Executor.h @@ -18,11 +18,44 @@ #include #include +#include namespace folly { namespace wangle { + // Like an Rx Scheduler. We should probably rename it to match now that it + // has scheduling semantics too, but that's a codemod for another lazy + // summer afternoon. class Executor : boost::noncopyable { public: + typedef std::function Action; + // Reality is that better than millisecond resolution is very hard to + // achieve. However, we reserve the right to be incredible. + typedef std::chrono::microseconds Duration; + typedef std::chrono::steady_clock::time_point TimePoint; + virtual ~Executor() = default; - virtual void add(std::function&&) = 0; + + /// Enqueue an action to be performed by this executor. This and all + /// schedule variants must be threadsafe. + virtual void add(Action&&) = 0; + + /// Alias for add() (for Rx consistency) + void schedule(Action&& a) { add(std::move(a)); } + + /// Schedule an action to be executed after dur time has elapsed + /// Expect millisecond resolution at best. + void schedule(Action&& a, Duration const& dur) { + scheduleAt(std::move(a), now() + dur); + } + + /// Schedule an action to be executed at time t, or as soon afterward as + /// possible. Expect millisecond resolution at best. Must be threadsafe. + virtual void scheduleAt(Action&& a, TimePoint const& t) { + throw std::logic_error("unimplemented"); + } + + /// Get this executor's notion of time. Must be threadsafe. + virtual TimePoint now() { + return std::chrono::steady_clock::now(); + } }; }} diff --git a/folly/wangle/ManualExecutor.cpp b/folly/wangle/ManualExecutor.cpp index cb9e6aaf..eed8a84d 100644 --- a/folly/wangle/ManualExecutor.cpp +++ b/folly/wangle/ManualExecutor.cpp @@ -18,6 +18,7 @@ #include #include +#include #include @@ -31,24 +32,33 @@ ManualExecutor::ManualExecutor() { void ManualExecutor::add(std::function&& callback) { std::lock_guard lock(lock_); - runnables_.push(callback); + actions_.push(callback); sem_post(&sem_); } size_t ManualExecutor::run() { size_t count; size_t n; - std::function runnable; + Action action; { std::lock_guard lock(lock_); - n = runnables_.size(); + + while (!scheduledActions_.empty()) { + auto& sa = scheduledActions_.top(); + if (sa.time > now_) + break; + actions_.push(sa.action); + scheduledActions_.pop(); + } + + n = actions_.size(); } for (count = 0; count < n; count++) { { std::lock_guard lock(lock_); - if (runnables_.empty()) { + if (actions_.empty()) { break; } @@ -57,10 +67,10 @@ size_t ManualExecutor::run() { // This may fail (with EAGAIN), that's fine. sem_trywait(&sem_); - runnable = std::move(runnables_.front()); - runnables_.pop(); + action = std::move(actions_.front()); + actions_.pop(); } - runnable(); + action(); } return count; @@ -70,7 +80,7 @@ void ManualExecutor::wait() { while (true) { { std::lock_guard lock(lock_); - if (!runnables_.empty()) + if (!actions_.empty()) break; } diff --git a/folly/wangle/ManualExecutor.h b/folly/wangle/ManualExecutor.h index 14c455fa..e7fb8190 100644 --- a/folly/wangle/ManualExecutor.h +++ b/folly/wangle/ManualExecutor.h @@ -20,17 +20,29 @@ #include #include #include +#include namespace folly { namespace wangle { - + /// A ManualExecutor only does work when you turn the crank, by calling + /// run() or indirectly with makeProgress() or waitFor(). + /// + /// The clock for a manual executor starts at 0 and advances only when you + /// ask it to. i.e. time is also under manual control. + /// + /// NB No attempt has been made to make anything other than add and schedule + /// threadsafe. class ManualExecutor : public Executor { public: ManualExecutor(); - void add(std::function&&) override; + void add(Action&&) override; - /// Do work. Returns the number of runnables that were executed (maybe 0). - /// Non-blocking. + /// Do work. Returns the number of actions that were executed (maybe 0). + /// Non-blocking, in the sense that we don't wait for work (we can't + /// control whether one of the actions blocks). + /// This is stable, it will not chase an ever-increasing tail of work. + /// This also means, there may be more work available to perform at the + /// moment that this returns. size_t run(); /// Wait for work to do. @@ -42,15 +54,64 @@ namespace folly { namespace wangle { run(); } + /// makeProgress until this Future is ready. template void waitFor(F const& f) { while (!f.isReady()) makeProgress(); } + virtual void scheduleAt(Action&& a, TimePoint const& t) override { + std::lock_guard lock(lock_); + scheduledActions_.emplace(t, std::move(a)); + sem_post(&sem_); + } + + /// Advance the clock. The clock never advances on its own. + /// Advancing the clock causes some work to be done, if work is available + /// to do (perhaps newly available because of the advanced clock). + /// If dur is <= 0 this is a noop. + void advance(Duration const& dur) { + advanceTo(now_ + dur); + } + + /// Advance the clock to this absolute time. If t is <= now(), + /// this is a noop. + void advanceTo(TimePoint const& t) { + if (t > now_) { + now_ = t; + } + run(); + } + + TimePoint now() override { return now_; } + private: std::mutex lock_; - std::queue> runnables_; + std::queue actions_; sem_t sem_; + + // helper class to enable ordering of scheduled events in the priority + // queue + struct ScheduledAction { + TimePoint time; + size_t ordinal; + Action action; + + ScheduledAction(TimePoint const& t, Action&& a) + : time(t), action(std::move(a)) + { + static size_t seq = 0; + ordinal = seq++; + } + + bool operator<(ScheduledAction const& b) const { + if (time == b.time) + return ordinal < b.ordinal; + return time < b.time; + } + }; + std::priority_queue scheduledActions_; + TimePoint now_ = now_.min(); }; }} diff --git a/folly/wangle/test/ExecutorTest.cpp b/folly/wangle/test/ExecutorTest.cpp new file mode 100644 index 00000000..5230869f --- /dev/null +++ b/folly/wangle/test/ExecutorTest.cpp @@ -0,0 +1,88 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include "folly/wangle/ManualExecutor.h" + +using namespace testing; +using namespace folly::wangle; +using namespace std::chrono; + +TEST(ManualExecutor, runIsStable) { + ManualExecutor x; + size_t count = 0; + auto f1 = [&]() { count++; }; + auto f2 = [&]() { x.add(f1); x.add(f1); }; + x.add(f2); + x.run(); +} + +TEST(ManualExecutor, scheduleDur) { + ManualExecutor x; + size_t count = 0; + milliseconds dur {10}; + x.schedule([&]{ count++; }, dur); + EXPECT_EQ(count, 0); + x.run(); + EXPECT_EQ(count, 0); + x.advance(dur/2); + EXPECT_EQ(count, 0); + x.advance(dur/2); + EXPECT_EQ(count, 1); +} + +TEST(ManualExecutor, clockStartsAt0) { + ManualExecutor x; + EXPECT_EQ(x.now(), x.now().min()); +} + +TEST(ManualExecutor, scheduleAbs) { + ManualExecutor x; + size_t count = 0; + x.scheduleAt([&]{ count++; }, x.now() + milliseconds(10)); + EXPECT_EQ(count, 0); + x.advance(milliseconds(10)); + EXPECT_EQ(count, 1); +} + +TEST(ManualExecutor, advanceTo) { + ManualExecutor x; + size_t count = 0; + x.scheduleAt([&]{ count++; }, steady_clock::now()); + EXPECT_EQ(count, 0); + x.advanceTo(steady_clock::now()); + EXPECT_EQ(count, 1); +} + +TEST(ManualExecutor, advanceBack) { + ManualExecutor x; + size_t count = 0; + x.advance(microseconds(5)); + x.schedule([&]{ count++; }, microseconds(6)); + EXPECT_EQ(count, 0); + x.advanceTo(x.now() - microseconds(1)); + EXPECT_EQ(count, 0); +} + +TEST(ManualExecutor, advanceNeg) { + ManualExecutor x; + size_t count = 0; + x.advance(microseconds(5)); + x.schedule([&]{ count++; }, microseconds(6)); + EXPECT_EQ(count, 0); + x.advance(microseconds(-1)); + EXPECT_EQ(count, 0); +}