From: James Sedgwick Date: Wed, 15 Oct 2014 22:57:47 +0000 (-0700) Subject: merge wangle/Executor.h and experimental/wangle/concurrent/Executor.h X-Git-Tag: v0.22.0~273 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=ea078b876b76faed4ba1585a3197bd94c54ec261;p=folly.git merge wangle/Executor.h and experimental/wangle/concurrent/Executor.h Summary: the one in concurrent/ is a bit more generic, so I kept that as Executor and renamed the existing one ScheduledExecutor because Hans is surfing I took the liberty of renaming Action->Func as an alias for std::function, because I think it's more reflective also kept the version of add() that doesn't force rvalue-reference as it's more user friendly and probably not less performant in common cases (insert reference to "want speed? pass by value" here) Test Plan: compiled some major relevant bits, will let contbuild show me anything I missed Reviewed By: hans@fb.com Subscribers: trunkagent, rushix, fbcode-common-diffs@, fugalh, msk, njormrod FB internal diff: D1591237 Tasks: 5279196 --- diff --git a/folly/experimental/wangle/concurrent/Executor.h b/folly/experimental/wangle/concurrent/Executor.h deleted file mode 100644 index 2687ee6b..00000000 --- a/folly/experimental/wangle/concurrent/Executor.h +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright 2014 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include - -namespace folly { namespace wangle { - -typedef std::function Func; - -namespace experimental { // TODO(jsedgwick) merge with folly/wangle/Executor.h - -class Executor { - public: - virtual ~Executor() {}; - virtual void add(Func func) = 0; -}; - -} - -}} // folly::wangle diff --git a/folly/experimental/wangle/concurrent/ThreadFactory.h b/folly/experimental/wangle/concurrent/ThreadFactory.h index b5da0758..8f799065 100644 --- a/folly/experimental/wangle/concurrent/ThreadFactory.h +++ b/folly/experimental/wangle/concurrent/ThreadFactory.h @@ -15,7 +15,7 @@ */ #pragma once -#include +#include #include diff --git a/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h b/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h index 88c25e88..84b61051 100644 --- a/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h +++ b/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h @@ -15,7 +15,7 @@ */ #pragma once -#include +#include #include #include #include @@ -31,7 +31,7 @@ namespace folly { namespace wangle { -class ThreadPoolExecutor : public experimental::Executor { +class ThreadPoolExecutor : public Executor { public: explicit ThreadPoolExecutor( size_t numThreads, diff --git a/folly/wangle/Executor.h b/folly/wangle/Executor.h index 00a52b17..cc9ce34d 100644 --- a/folly/wangle/Executor.h +++ b/folly/wangle/Executor.h @@ -17,59 +17,29 @@ #pragma once #include -#include #include -#include -#include namespace folly { namespace wangle { + typedef std::function Func; + /// An Executor accepts units of work with add(), which should be /// threadsafe. - /// Like an Rx Scheduler. We should probably rename it to match now that it - /// has scheduling semantics too, but that's a codemod for another lazy - /// summer afternoon. class Executor : boost::noncopyable { public: - typedef std::function Action; - // Reality is that better than millisecond resolution is very hard to - // achieve. However, we reserve the right to be incredible. - typedef std::chrono::microseconds Duration; - typedef std::chrono::steady_clock::time_point TimePoint; - - virtual ~Executor() = default; - - /// Enqueue an action to be performed by this executor. This and all - /// schedule variants must be threadsafe. - virtual void add(Action&&) = 0; - - /// A convenience function for shared_ptr to legacy functors. - /// - /// Sometimes you have a functor that is move-only, and therefore can't be - /// converted to a std::function (e.g. std::packaged_task). In that case, - /// wrap it in a shared_ptr (or maybe folly::MoveWrapper) and use this. - template - void addPtr(P fn) { - this->add([fn]() mutable { (*fn)(); }); - } - - /// Alias for add() (for Rx consistency) - void schedule(Action&& a) { add(std::move(a)); } - - /// Schedule an action to be executed after dur time has elapsed - /// Expect millisecond resolution at best. - void schedule(Action&& a, Duration const& dur) { - scheduleAt(std::move(a), now() + dur); - } - - /// Schedule an action to be executed at time t, or as soon afterward as - /// possible. Expect millisecond resolution at best. Must be threadsafe. - virtual void scheduleAt(Action&& a, TimePoint const& t) { - throw std::logic_error("unimplemented"); - } - - /// Get this executor's notion of time. Must be threadsafe. - virtual TimePoint now() { - return std::chrono::steady_clock::now(); - } + virtual ~Executor() = default; + + /// Enqueue a function to executed by this executor. This and all + /// variants must be threadsafe. + virtual void add(Func) = 0; + + /// A convenience function for shared_ptr to legacy functors. + /// + /// Sometimes you have a functor that is move-only, and therefore can't be + /// converted to a std::function (e.g. std::packaged_task). In that case, + /// wrap it in a shared_ptr (or maybe folly::MoveWrapper) and use this. + template + void addPtr(P fn) { + this->add([fn]() mutable { (*fn)(); }); + } }; }} diff --git a/folly/wangle/InlineExecutor.h b/folly/wangle/InlineExecutor.h index 48f82bdd..df9bfe29 100644 --- a/folly/wangle/InlineExecutor.h +++ b/folly/wangle/InlineExecutor.h @@ -24,7 +24,7 @@ namespace folly { namespace wangle { /// QueuedImmediateExecutor. class InlineExecutor : public Executor { public: - void add(std::function&& f) override { + void add(Func f) override { f(); } }; diff --git a/folly/wangle/ManualExecutor.cpp b/folly/wangle/ManualExecutor.cpp index 560475cc..4a2ee3a3 100644 --- a/folly/wangle/ManualExecutor.cpp +++ b/folly/wangle/ManualExecutor.cpp @@ -30,35 +30,35 @@ ManualExecutor::ManualExecutor() { } } -void ManualExecutor::add(std::function&& callback) { +void ManualExecutor::add(Func callback) { std::lock_guard lock(lock_); - actions_.push(callback); + funcs_.push(std::move(callback)); sem_post(&sem_); } size_t ManualExecutor::run() { size_t count; size_t n; - Action action; + Func func; { std::lock_guard lock(lock_); - while (!scheduledActions_.empty()) { - auto& sa = scheduledActions_.top(); - if (sa.time > now_) + while (!scheduledFuncs_.empty()) { + auto& sf = scheduledFuncs_.top(); + if (sf.time > now_) break; - actions_.push(sa.action); - scheduledActions_.pop(); + funcs_.push(sf.func); + scheduledFuncs_.pop(); } - n = actions_.size(); + n = funcs_.size(); } for (count = 0; count < n; count++) { { std::lock_guard lock(lock_); - if (actions_.empty()) { + if (funcs_.empty()) { break; } @@ -67,10 +67,10 @@ size_t ManualExecutor::run() { // This may fail (with EAGAIN), that's fine. sem_trywait(&sem_); - action = std::move(actions_.front()); - actions_.pop(); + func = std::move(funcs_.front()); + funcs_.pop(); } - action(); + func(); } return count; @@ -80,7 +80,7 @@ void ManualExecutor::wait() { while (true) { { std::lock_guard lock(lock_); - if (!actions_.empty()) + if (!funcs_.empty()) break; } diff --git a/folly/wangle/ManualExecutor.h b/folly/wangle/ManualExecutor.h index 76be4b43..27caf28c 100644 --- a/folly/wangle/ManualExecutor.h +++ b/folly/wangle/ManualExecutor.h @@ -15,7 +15,7 @@ */ #pragma once -#include +#include #include #include #include @@ -31,15 +31,15 @@ namespace folly { namespace wangle { /// /// NB No attempt has been made to make anything other than add and schedule /// threadsafe. - class ManualExecutor : public Executor { + class ManualExecutor : public ScheduledExecutor { public: ManualExecutor(); - void add(Action&&) override; + void add(Func) override; - /// Do work. Returns the number of actions that were executed (maybe 0). + /// Do work. Returns the number of functions that were executed (maybe 0). /// Non-blocking, in the sense that we don't wait for work (we can't - /// control whether one of the actions blocks). + /// control whether one of the functions blocks). /// This is stable, it will not chase an ever-increasing tail of work. /// This also means, there may be more work available to perform at the /// moment that this returns. @@ -60,9 +60,9 @@ namespace folly { namespace wangle { makeProgress(); } - virtual void scheduleAt(Action&& a, TimePoint const& t) override { + virtual void scheduleAt(Func&& f, TimePoint const& t) override { std::lock_guard lock(lock_); - scheduledActions_.emplace(t, std::move(a)); + scheduledFuncs_.emplace(t, std::move(f)); sem_post(&sem_); } @@ -82,30 +82,30 @@ namespace folly { namespace wangle { private: std::mutex lock_; - std::queue actions_; + std::queue funcs_; sem_t sem_; // helper class to enable ordering of scheduled events in the priority // queue - struct ScheduledAction { + struct ScheduledFunc { TimePoint time; size_t ordinal; - Action action; + Func func; - ScheduledAction(TimePoint const& t, Action&& a) - : time(t), action(std::move(a)) + ScheduledFunc(TimePoint const& t, Func&& f) + : time(t), func(std::move(f)) { static size_t seq = 0; ordinal = seq++; } - bool operator<(ScheduledAction const& b) const { + bool operator<(ScheduledFunc const& b) const { if (time == b.time) return ordinal < b.ordinal; return time < b.time; } }; - std::priority_queue scheduledActions_; + std::priority_queue scheduledFuncs_; TimePoint now_ = now_.min(); }; diff --git a/folly/wangle/QueuedImmediateExecutor.cpp b/folly/wangle/QueuedImmediateExecutor.cpp index 469d0296..739ba5ef 100644 --- a/folly/wangle/QueuedImmediateExecutor.cpp +++ b/folly/wangle/QueuedImmediateExecutor.cpp @@ -20,9 +20,8 @@ namespace folly { namespace wangle { -void QueuedImmediateExecutor::add(Action&& callback) -{ - thread_local std::queue q; +void QueuedImmediateExecutor::add(Func callback) { + thread_local std::queue q; if (q.empty()) { q.push(std::move(callback)); diff --git a/folly/wangle/QueuedImmediateExecutor.h b/folly/wangle/QueuedImmediateExecutor.h index 46f14d4d..a4c4985d 100644 --- a/folly/wangle/QueuedImmediateExecutor.h +++ b/folly/wangle/QueuedImmediateExecutor.h @@ -27,7 +27,7 @@ namespace folly { namespace wangle { */ class QueuedImmediateExecutor : public Executor { public: - void add(Action&&) override; + void add(Func) override; }; }} // namespace diff --git a/folly/wangle/ScheduledExecutor.h b/folly/wangle/ScheduledExecutor.h new file mode 100644 index 00000000..fe5f6f12 --- /dev/null +++ b/folly/wangle/ScheduledExecutor.h @@ -0,0 +1,57 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +namespace folly { namespace wangle { + // An executor that supports timed scheduling. Like RxScheduler. + class ScheduledExecutor : public Executor { + public: + // Reality is that better than millisecond resolution is very hard to + // achieve. However, we reserve the right to be incredible. + typedef std::chrono::microseconds Duration; + typedef std::chrono::steady_clock::time_point TimePoint; + + virtual ~ScheduledExecutor() = default; + + virtual void add(Func) override = 0; + + /// Alias for add() (for Rx consistency) + void schedule(Func&& a) { add(std::move(a)); } + + /// Schedule a Func to be executed after dur time has elapsed + /// Expect millisecond resolution at best. + void schedule(Func&& a, Duration const& dur) { + scheduleAt(std::move(a), now() + dur); + } + + /// Schedule a Func to be executed at time t, or as soon afterward as + /// possible. Expect millisecond resolution at best. Must be threadsafe. + virtual void scheduleAt(Func&& a, TimePoint const& t) { + throw std::logic_error("unimplemented"); + } + + /// Get this executor's notion of time. Must be threadsafe. + virtual TimePoint now() { + return std::chrono::steady_clock::now(); + } + }; +}}