+++ /dev/null
-/*
- * Copyright 2014 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <functional>
-
-namespace folly { namespace wangle {
-
-typedef std::function<void()> Func;
-
-namespace experimental { // TODO(jsedgwick) merge with folly/wangle/Executor.h
-
-class Executor {
- public:
- virtual ~Executor() {};
- virtual void add(Func func) = 0;
-};
-
-}
-
-}} // folly::wangle
*/
#pragma once
-#include <folly/experimental/wangle/concurrent/Executor.h>
+#include <folly/wangle/Executor.h>
#include <thread>
*/
#pragma once
-#include <folly/experimental/wangle/concurrent/Executor.h>
+#include <folly/wangle/Executor.h>
#include <folly/experimental/wangle/concurrent/LifoSemMPMCQueue.h>
#include <folly/experimental/wangle/concurrent/NamedThreadFactory.h>
#include <folly/experimental/wangle/rx/Observable.h>
namespace folly { namespace wangle {
-class ThreadPoolExecutor : public experimental::Executor {
+class ThreadPoolExecutor : public Executor {
public:
explicit ThreadPoolExecutor(
size_t numThreads,
#pragma once
#include <boost/noncopyable.hpp>
-#include <chrono>
#include <functional>
-#include <memory>
-#include <stdexcept>
namespace folly { namespace wangle {
+ typedef std::function<void()> Func;
+
/// An Executor accepts units of work with add(), which should be
/// threadsafe.
- /// Like an Rx Scheduler. We should probably rename it to match now that it
- /// has scheduling semantics too, but that's a codemod for another lazy
- /// summer afternoon.
class Executor : boost::noncopyable {
public:
- typedef std::function<void()> Action;
- // Reality is that better than millisecond resolution is very hard to
- // achieve. However, we reserve the right to be incredible.
- typedef std::chrono::microseconds Duration;
- typedef std::chrono::steady_clock::time_point TimePoint;
-
- virtual ~Executor() = default;
-
- /// Enqueue an action to be performed by this executor. This and all
- /// schedule variants must be threadsafe.
- virtual void add(Action&&) = 0;
-
- /// A convenience function for shared_ptr to legacy functors.
- ///
- /// Sometimes you have a functor that is move-only, and therefore can't be
- /// converted to a std::function (e.g. std::packaged_task). In that case,
- /// wrap it in a shared_ptr (or maybe folly::MoveWrapper) and use this.
- template <class P>
- void addPtr(P fn) {
- this->add([fn]() mutable { (*fn)(); });
- }
-
- /// Alias for add() (for Rx consistency)
- void schedule(Action&& a) { add(std::move(a)); }
-
- /// Schedule an action to be executed after dur time has elapsed
- /// Expect millisecond resolution at best.
- void schedule(Action&& a, Duration const& dur) {
- scheduleAt(std::move(a), now() + dur);
- }
-
- /// Schedule an action to be executed at time t, or as soon afterward as
- /// possible. Expect millisecond resolution at best. Must be threadsafe.
- virtual void scheduleAt(Action&& a, TimePoint const& t) {
- throw std::logic_error("unimplemented");
- }
-
- /// Get this executor's notion of time. Must be threadsafe.
- virtual TimePoint now() {
- return std::chrono::steady_clock::now();
- }
+ virtual ~Executor() = default;
+
+ /// Enqueue a function to executed by this executor. This and all
+ /// variants must be threadsafe.
+ virtual void add(Func) = 0;
+
+ /// A convenience function for shared_ptr to legacy functors.
+ ///
+ /// Sometimes you have a functor that is move-only, and therefore can't be
+ /// converted to a std::function (e.g. std::packaged_task). In that case,
+ /// wrap it in a shared_ptr (or maybe folly::MoveWrapper) and use this.
+ template <class P>
+ void addPtr(P fn) {
+ this->add([fn]() mutable { (*fn)(); });
+ }
};
}}
/// QueuedImmediateExecutor.
class InlineExecutor : public Executor {
public:
- void add(std::function<void()>&& f) override {
+ void add(Func f) override {
f();
}
};
}
}
-void ManualExecutor::add(std::function<void()>&& callback) {
+void ManualExecutor::add(Func callback) {
std::lock_guard<std::mutex> lock(lock_);
- actions_.push(callback);
+ funcs_.push(std::move(callback));
sem_post(&sem_);
}
size_t ManualExecutor::run() {
size_t count;
size_t n;
- Action action;
+ Func func;
{
std::lock_guard<std::mutex> lock(lock_);
- while (!scheduledActions_.empty()) {
- auto& sa = scheduledActions_.top();
- if (sa.time > now_)
+ while (!scheduledFuncs_.empty()) {
+ auto& sf = scheduledFuncs_.top();
+ if (sf.time > now_)
break;
- actions_.push(sa.action);
- scheduledActions_.pop();
+ funcs_.push(sf.func);
+ scheduledFuncs_.pop();
}
- n = actions_.size();
+ n = funcs_.size();
}
for (count = 0; count < n; count++) {
{
std::lock_guard<std::mutex> lock(lock_);
- if (actions_.empty()) {
+ if (funcs_.empty()) {
break;
}
// This may fail (with EAGAIN), that's fine.
sem_trywait(&sem_);
- action = std::move(actions_.front());
- actions_.pop();
+ func = std::move(funcs_.front());
+ funcs_.pop();
}
- action();
+ func();
}
return count;
while (true) {
{
std::lock_guard<std::mutex> lock(lock_);
- if (!actions_.empty())
+ if (!funcs_.empty())
break;
}
*/
#pragma once
-#include <folly/wangle/Executor.h>
+#include <folly/wangle/ScheduledExecutor.h>
#include <semaphore.h>
#include <memory>
#include <mutex>
///
/// NB No attempt has been made to make anything other than add and schedule
/// threadsafe.
- class ManualExecutor : public Executor {
+ class ManualExecutor : public ScheduledExecutor {
public:
ManualExecutor();
- void add(Action&&) override;
+ void add(Func) override;
- /// Do work. Returns the number of actions that were executed (maybe 0).
+ /// Do work. Returns the number of functions that were executed (maybe 0).
/// Non-blocking, in the sense that we don't wait for work (we can't
- /// control whether one of the actions blocks).
+ /// control whether one of the functions blocks).
/// This is stable, it will not chase an ever-increasing tail of work.
/// This also means, there may be more work available to perform at the
/// moment that this returns.
makeProgress();
}
- virtual void scheduleAt(Action&& a, TimePoint const& t) override {
+ virtual void scheduleAt(Func&& f, TimePoint const& t) override {
std::lock_guard<std::mutex> lock(lock_);
- scheduledActions_.emplace(t, std::move(a));
+ scheduledFuncs_.emplace(t, std::move(f));
sem_post(&sem_);
}
private:
std::mutex lock_;
- std::queue<Action> actions_;
+ std::queue<Func> funcs_;
sem_t sem_;
// helper class to enable ordering of scheduled events in the priority
// queue
- struct ScheduledAction {
+ struct ScheduledFunc {
TimePoint time;
size_t ordinal;
- Action action;
+ Func func;
- ScheduledAction(TimePoint const& t, Action&& a)
- : time(t), action(std::move(a))
+ ScheduledFunc(TimePoint const& t, Func&& f)
+ : time(t), func(std::move(f))
{
static size_t seq = 0;
ordinal = seq++;
}
- bool operator<(ScheduledAction const& b) const {
+ bool operator<(ScheduledFunc const& b) const {
if (time == b.time)
return ordinal < b.ordinal;
return time < b.time;
}
};
- std::priority_queue<ScheduledAction> scheduledActions_;
+ std::priority_queue<ScheduledFunc> scheduledFuncs_;
TimePoint now_ = now_.min();
};
namespace folly { namespace wangle {
-void QueuedImmediateExecutor::add(Action&& callback)
-{
- thread_local std::queue<Action> q;
+void QueuedImmediateExecutor::add(Func callback) {
+ thread_local std::queue<Func> q;
if (q.empty()) {
q.push(std::move(callback));
*/
class QueuedImmediateExecutor : public Executor {
public:
- void add(Action&&) override;
+ void add(Func) override;
};
}} // namespace
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/wangle/Executor.h>
+#include <chrono>
+#include <memory>
+#include <stdexcept>
+
+namespace folly { namespace wangle {
+ // An executor that supports timed scheduling. Like RxScheduler.
+ class ScheduledExecutor : public Executor {
+ public:
+ // Reality is that better than millisecond resolution is very hard to
+ // achieve. However, we reserve the right to be incredible.
+ typedef std::chrono::microseconds Duration;
+ typedef std::chrono::steady_clock::time_point TimePoint;
+
+ virtual ~ScheduledExecutor() = default;
+
+ virtual void add(Func) override = 0;
+
+ /// Alias for add() (for Rx consistency)
+ void schedule(Func&& a) { add(std::move(a)); }
+
+ /// Schedule a Func to be executed after dur time has elapsed
+ /// Expect millisecond resolution at best.
+ void schedule(Func&& a, Duration const& dur) {
+ scheduleAt(std::move(a), now() + dur);
+ }
+
+ /// Schedule a Func to be executed at time t, or as soon afterward as
+ /// possible. Expect millisecond resolution at best. Must be threadsafe.
+ virtual void scheduleAt(Func&& a, TimePoint const& t) {
+ throw std::logic_error("unimplemented");
+ }
+
+ /// Get this executor's notion of time. Must be threadsafe.
+ virtual TimePoint now() {
+ return std::chrono::steady_clock::now();
+ }
+ };
+}}