Scheduler interface of Executor
[folly.git] / folly / wangle / ManualExecutor.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18 #include "folly/wangle/Executor.h"
19 #include <semaphore.h>
20 #include <memory>
21 #include <mutex>
22 #include <queue>
23 #include <cstdio>
24
25 namespace folly { namespace wangle {
26   /// A ManualExecutor only does work when you turn the crank, by calling
27   /// run() or indirectly with makeProgress() or waitFor().
28   ///
29   /// The clock for a manual executor starts at 0 and advances only when you
30   /// ask it to. i.e. time is also under manual control.
31   ///
32   /// NB No attempt has been made to make anything other than add and schedule
33   /// threadsafe.
34   class ManualExecutor : public Executor {
35    public:
36     ManualExecutor();
37
38     void add(Action&&) override;
39
40     /// Do work. Returns the number of actions that were executed (maybe 0).
41     /// Non-blocking, in the sense that we don't wait for work (we can't
42     /// control whether one of the actions blocks).
43     /// This is stable, it will not chase an ever-increasing tail of work.
44     /// This also means, there may be more work available to perform at the
45     /// moment that this returns.
46     size_t run();
47
48     /// Wait for work to do.
49     void wait();
50
51     /// Wait for work to do, and do it.
52     void makeProgress() {
53       wait();
54       run();
55     }
56
57     /// makeProgress until this Future is ready.
58     template <class F> void waitFor(F const& f) {
59       while (!f.isReady())
60         makeProgress();
61     }
62
63     virtual void scheduleAt(Action&& a, TimePoint const& t) override {
64       std::lock_guard<std::mutex> lock(lock_);
65       scheduledActions_.emplace(t, std::move(a));
66       sem_post(&sem_);
67     }
68
69     /// Advance the clock. The clock never advances on its own.
70     /// Advancing the clock causes some work to be done, if work is available
71     /// to do (perhaps newly available because of the advanced clock).
72     /// If dur is <= 0 this is a noop.
73     void advance(Duration const& dur) {
74       advanceTo(now_ + dur);
75     }
76
77     /// Advance the clock to this absolute time. If t is <= now(),
78     /// this is a noop.
79     void advanceTo(TimePoint const& t) {
80       if (t > now_) {
81         now_ = t;
82       }
83       run();
84     }
85
86     TimePoint now() override { return now_; }
87
88    private:
89     std::mutex lock_;
90     std::queue<Action> actions_;
91     sem_t sem_;
92
93     // helper class to enable ordering of scheduled events in the priority
94     // queue
95     struct ScheduledAction {
96       TimePoint time;
97       size_t ordinal;
98       Action action;
99
100       ScheduledAction(TimePoint const& t, Action&& a)
101         : time(t), action(std::move(a))
102       {
103         static size_t seq = 0;
104         ordinal = seq++;
105       }
106
107       bool operator<(ScheduledAction const& b) const {
108         if (time == b.time)
109           return ordinal < b.ordinal;
110         return time < b.time;
111       }
112     };
113     std::priority_queue<ScheduledAction> scheduledActions_;
114     TimePoint now_ = now_.min();
115   };
116
117 }}