2 * Copyright 2014-present Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include <folly/executors/DrivableExecutor.h>
25 #include <folly/executors/ScheduledExecutor.h>
26 #include <folly/synchronization/LifoSem.h>
29 /// A ManualExecutor only does work when you turn the crank, by calling
30 /// run() or indirectly with makeProgress() or waitFor().
32 /// The clock for a manual executor starts at TimePoint::min() and advances
33 /// only when you ask it to, i.e. time is also under manual control.
35 /// NB No attempt has been made to make anything other than add and schedule thread-safe.
37 class ManualExecutor : public DrivableExecutor,
38 public ScheduledExecutor {
/// Overrides an inherited virtual and takes the callable by value. Only
/// declared here (defined out of line); presumably it enqueues the callable
/// onto funcs_ for a later run() — TODO confirm against the definition.
40 void add(Func) override;
42 /// Do work. Returns the number of functions that were executed (maybe 0).
43 /// Non-blocking, in the sense that we don't wait for work (we can't
44 /// control whether one of the functions blocks).
45 /// This is stable, it will not chase an ever-increasing tail of work.
46 /// This also means, there may be more work available to perform at the
47 /// moment that this returns.
50 // Do work until there is no more work to do.
51 // Returns the number of functions that were executed (maybe 0).
52 // Unlike run, this method is not stable. It will chase an infinite tail of
53 // work so should be used with care.
54 // There will be no work available to perform at the moment that this returns.
58 /// Wait for work to do.
61 /// Wait for work to do, and do it.
67 /// Implements DrivableExecutor
68 void drive() override {
72 /// makeProgress until this Future is ready.
/// F can be any type that exposes an isReady() member; f is only accessed
/// through a const reference.
73 template <class F> void waitFor(F const& f) {
// Repeat for as long as f reports that it is not ready.
79 while (!f.isReady()) {
/// Stores f in scheduledFuncs_ together with the time t. f is moved from.
/// Overrides an inherited virtual.
86 void scheduleAt(Func&& f, TimePoint const& t) override {
// lock_ is held from here until the lock_guard goes out of scope.
87 std::lock_guard<std::mutex> lock(lock_);
// Constructs the ScheduledFunc in place from (t, f).
88 scheduledFuncs_.emplace(t, std::move(f));
92 /// Advance the clock. The clock never advances on its own.
93 /// Advancing the clock causes some work to be done, if work is available
94 /// to do (perhaps newly available because of the advanced clock).
95 /// If dur is <= 0 this is a noop.
96 void advance(Duration const& dur) {
// Turn the relative step into an absolute target time and delegate to
// advanceTo().
// NOTE(review): now_ is initialised to TimePoint::min(); if TimePoint is a
// std::chrono time_point, a negative dur on a fresh executor would overflow
// in this addition before the documented "noop" rule can apply — confirm.
97 advanceTo(now_ + dur);
100 /// Advance the clock to this absolute time. If t is <= now(), this is a noop.
/// Defined out of line.
102 void advanceTo(TimePoint const& t);
/// Returns the current value of the manually controlled clock (now_).
104 TimePoint now() override { return now_; }
106 /// Flush the function queue. Destroys all stored functions without
107 /// executing them. Returns number of removed functions.
108 std::size_t clear() {
// Empty local containers; whatever is swapped into them is destroyed when
// they go out of scope at the end of this call, without being invoked.
109 std::queue<Func> funcs;
110 std::priority_queue<ScheduledFunc> scheduled_funcs;
// lock_ is taken before the member container is exchanged with the local.
113 std::lock_guard<std::mutex> lock(lock_);
115 scheduledFuncs_.swap(scheduled_funcs);
// The reported count is the combined size of both local containers.
118 return funcs.size() + scheduled_funcs.size();
123 std::queue<Func> funcs_;
126 // helper class to enable ordering of scheduled events in the priority
// queue: it pairs a function with a time and uses an ordinal to break ties
// between entries that share the same time.
128 struct ScheduledFunc {
// Stores the time and takes ownership of the function by moving it in.
133 ScheduledFunc(TimePoint const& t, Func&& f)
134 : time(t), func(std::move(f))
// Function-local static, so a single counter is shared by every
// ScheduledFunc regardless of which executor creates it.
// NOTE(review): the counter is a plain size_t; scheduleAt constructs entries
// while holding lock_ only — confirm that is sufficient if several
// executors schedule concurrently.
136 static size_t seq = 0;
// Ordering consumed by std::priority_queue<ScheduledFunc>.
140 bool operator<(ScheduledFunc const& b) const {
141 // Earlier-scheduled things must be *higher* priority
142 // in the max-based std::priority_queue
143 if (time == b.time) {
// Same time: the entry with the larger ordinal compares as "less".
144 return ordinal > b.ordinal;
// Different times: the later time compares as "less", so the earliest time
// ends up on top of the queue.
146 return time > b.time;
// Hands the stored function out as an rvalue so the caller can move from it.
// NOTE(review): this is a const member returning a non-const Func&&, which
// presumably relies on func being declared mutable — declaration not shown
// here, confirm.
149 Func&& moveOutFunc() const {
150 return std::move(func);
153 std::priority_queue<ScheduledFunc> scheduledFuncs_;
154 TimePoint now_ = TimePoint::min();