#include <boost/noncopyable.hpp>
#include <functional>
+#include <chrono>
namespace folly { namespace wangle {
+ // Like an Rx Scheduler. We should probably rename it to match now that it
+ // has scheduling semantics too, but that's a codemod for another lazy
+ // summer afternoon.
class Executor : boost::noncopyable {
public:
+ typedef std::function<void()> Action;
+ // Reality is that better than millisecond resolution is very hard to
+ // achieve. However, we reserve the right to be incredible.
+ typedef std::chrono::microseconds Duration;
+ typedef std::chrono::steady_clock::time_point TimePoint;
+
virtual ~Executor() = default;
- virtual void add(std::function<void()>&&) = 0;
+
+ /// Enqueue an action to be performed by this executor. This and all
+ /// schedule variants must be threadsafe.
+ virtual void add(Action&&) = 0;
+
+ /// Alias for add() (for Rx consistency)
+ void schedule(Action&& a) { add(std::move(a)); }
+
+ /// Schedule an action to be executed after dur time has elapsed
+ /// Expect millisecond resolution at best.
+ void schedule(Action&& a, Duration const& dur) {
+ scheduleAt(std::move(a), now() + dur);
+ }
+
+ /// Schedule an action to be executed at time t, or as soon afterward as
+ /// possible. Expect millisecond resolution at best. Must be threadsafe.
+ virtual void scheduleAt(Action&& a, TimePoint const& t) {
+ throw std::logic_error("unimplemented");
+ }
+
+ /// Get this executor's notion of time. Must be threadsafe.
+ virtual TimePoint now() {
+ return std::chrono::steady_clock::now();
+ }
};
}}
#include <string.h>
#include <string>
+#include <tuple>
#include <stdexcept>
void ManualExecutor::add(std::function<void()>&& callback) {
std::lock_guard<std::mutex> lock(lock_);
- runnables_.push(callback);
+ actions_.push(callback);
sem_post(&sem_);
}
size_t ManualExecutor::run() {
size_t count;
size_t n;
- std::function<void()> runnable;
+ Action action;
{
std::lock_guard<std::mutex> lock(lock_);
- n = runnables_.size();
+
+ while (!scheduledActions_.empty()) {
+ auto& sa = scheduledActions_.top();
+ if (sa.time > now_)
+ break;
+ actions_.push(sa.action);
+ scheduledActions_.pop();
+ }
+
+ n = actions_.size();
}
for (count = 0; count < n; count++) {
{
std::lock_guard<std::mutex> lock(lock_);
- if (runnables_.empty()) {
+ if (actions_.empty()) {
break;
}
// This may fail (with EAGAIN), that's fine.
sem_trywait(&sem_);
- runnable = std::move(runnables_.front());
- runnables_.pop();
+ action = std::move(actions_.front());
+ actions_.pop();
}
- runnable();
+ action();
}
return count;
while (true) {
{
std::lock_guard<std::mutex> lock(lock_);
- if (!runnables_.empty())
+ if (!actions_.empty())
break;
}
#include <memory>
#include <mutex>
#include <queue>
+#include <cstdio>
namespace folly { namespace wangle {
-
+ /// A ManualExecutor only does work when you turn the crank, by calling
+ /// run() or indirectly with makeProgress() or waitFor().
+ ///
+ /// The clock for a manual executor starts at 0 and advances only when you
+ /// ask it to. i.e. time is also under manual control.
+ ///
+ /// NB No attempt has been made to make anything other than add and schedule
+ /// threadsafe.
class ManualExecutor : public Executor {
public:
ManualExecutor();
- void add(std::function<void()>&&) override;
+ void add(Action&&) override;
- /// Do work. Returns the number of runnables that were executed (maybe 0).
- /// Non-blocking.
+ /// Do work. Returns the number of actions that were executed (maybe 0).
+ /// Non-blocking, in the sense that we don't wait for work (we can't
+ /// control whether one of the actions blocks).
+ /// This is stable, it will not chase an ever-increasing tail of work.
+ /// This also means, there may be more work available to perform at the
+ /// moment that this returns.
size_t run();
/// Wait for work to do.
run();
}
+ /// makeProgress until this Future is ready.
template <class F> void waitFor(F const& f) {
while (!f.isReady())
makeProgress();
}
+ virtual void scheduleAt(Action&& a, TimePoint const& t) override {
+ std::lock_guard<std::mutex> lock(lock_);
+ scheduledActions_.emplace(t, std::move(a));
+ sem_post(&sem_);
+ }
+
+ /// Advance the clock. The clock never advances on its own.
+ /// Advancing the clock causes some work to be done, if work is available
+ /// to do (perhaps newly available because of the advanced clock).
+ /// If dur is <= 0 this is a noop.
+ void advance(Duration const& dur) {
+ advanceTo(now_ + dur);
+ }
+
+ /// Advance the clock to this absolute time. If t is <= now(),
+ /// this is a noop.
+ void advanceTo(TimePoint const& t) {
+ if (t > now_) {
+ now_ = t;
+ }
+ run();
+ }
+
+ TimePoint now() override { return now_; }
+
private:
std::mutex lock_;
- std::queue<std::function<void()>> runnables_;
+ std::queue<Action> actions_;
sem_t sem_;
+
+ // helper class to enable ordering of scheduled events in the priority
+ // queue
+ struct ScheduledAction {
+ TimePoint time;
+ size_t ordinal;
+ Action action;
+
+ ScheduledAction(TimePoint const& t, Action&& a)
+ : time(t), action(std::move(a))
+ {
+ static size_t seq = 0;
+ ordinal = seq++;
+ }
+
+ bool operator<(ScheduledAction const& b) const {
+ if (time == b.time)
+ return ordinal < b.ordinal;
+ return time < b.time;
+ }
+ };
+ std::priority_queue<ScheduledAction> scheduledActions_;
+ TimePoint now_ = now_.min();
};
}}
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+#include "folly/wangle/ManualExecutor.h"
+
+using namespace testing;
+using namespace folly::wangle;
+using namespace std::chrono;
+
+TEST(ManualExecutor, runIsStable) {
+ ManualExecutor x;
+ size_t count = 0;
+ auto f1 = [&]() { count++; };
+ auto f2 = [&]() { x.add(f1); x.add(f1); };
+ x.add(f2);
+ x.run();
+}
+
+TEST(ManualExecutor, scheduleDur) {
+ ManualExecutor x;
+ size_t count = 0;
+ milliseconds dur {10};
+ x.schedule([&]{ count++; }, dur);
+ EXPECT_EQ(count, 0);
+ x.run();
+ EXPECT_EQ(count, 0);
+ x.advance(dur/2);
+ EXPECT_EQ(count, 0);
+ x.advance(dur/2);
+ EXPECT_EQ(count, 1);
+}
+
+TEST(ManualExecutor, clockStartsAt0) {
+ ManualExecutor x;
+ EXPECT_EQ(x.now(), x.now().min());
+}
+
+TEST(ManualExecutor, scheduleAbs) {
+ ManualExecutor x;
+ size_t count = 0;
+ x.scheduleAt([&]{ count++; }, x.now() + milliseconds(10));
+ EXPECT_EQ(count, 0);
+ x.advance(milliseconds(10));
+ EXPECT_EQ(count, 1);
+}
+
+TEST(ManualExecutor, advanceTo) {
+ ManualExecutor x;
+ size_t count = 0;
+ x.scheduleAt([&]{ count++; }, steady_clock::now());
+ EXPECT_EQ(count, 0);
+ x.advanceTo(steady_clock::now());
+ EXPECT_EQ(count, 1);
+}
+
+TEST(ManualExecutor, advanceBack) {
+ ManualExecutor x;
+ size_t count = 0;
+ x.advance(microseconds(5));
+ x.schedule([&]{ count++; }, microseconds(6));
+ EXPECT_EQ(count, 0);
+ x.advanceTo(x.now() - microseconds(1));
+ EXPECT_EQ(count, 0);
+}
+
+TEST(ManualExecutor, advanceNeg) {
+ ManualExecutor x;
+ size_t count = 0;
+ x.advance(microseconds(5));
+ x.schedule([&]{ count++; }, microseconds(6));
+ EXPECT_EQ(count, 0);
+ x.advance(microseconds(-1));
+ EXPECT_EQ(count, 0);
+}