#pragma once
#include <boost/noncopyable.hpp>
-#include <functional>
#include <chrono>
+#include <functional>
+#include <memory>
namespace folly { namespace wangle {
- // Like an Rx Scheduler. We should probably rename it to match now that it
- // has scheduling semantics too, but that's a codemod for another lazy
- // summer afternoon.
+ /// An Executor accepts units of work with add(), which should be
+ /// threadsafe.
+ /// Like an Rx Scheduler. We should probably rename it to match now that it
+ /// has scheduling semantics too, but that's a codemod for another lazy
+ /// summer afternoon.
class Executor : boost::noncopyable {
public:
typedef std::function<void()> Action;
/// schedule variants must be threadsafe.
virtual void add(Action&&) = 0;
+ /// A convenience function for shared_ptr to legacy functors.
+ ///
+ /// Sometimes you have a functor that is move-only, and therefore can't be
+ /// converted to a std::function (e.g. std::packaged_task). In that case,
+ /// wrap it in a shared_ptr (or maybe folly::MoveWrapper) and use this.
+ template <class P>
+ void addPtr(P fn) {
+ this->add([fn]() mutable { (*fn)(); });
+ }
+
/// Alias for add() (for Rx consistency)
void schedule(Action&& a) { add(std::move(a)); }
namespace folly { namespace wangle {
+ /// When work is "queued", execute it immediately inline.
+ /// Usually when you think you want this, you actually want a
+ /// QueuedImmediateExecutor.
class InlineExecutor : public Executor {
public:
void add(std::function<void()>&& f) override {
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "QueuedImmediateExecutor.h"
+#include "folly/ThreadLocal.h"
+#include <queue>
+
+namespace folly { namespace wangle {
+
+void QueuedImmediateExecutor::add(Action&& callback)
+{
+ thread_local std::queue<Action> q;
+
+ if (q.empty()) {
+ q.push(std::move(callback));
+ while (!q.empty()) {
+ q.front()();
+ q.pop();
+ }
+ } else {
+ q.push(callback);
+ }
+}
+
+}} // namespace
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "folly/wangle/Executor.h"
+
+namespace folly { namespace wangle {
+
+/**
+ * Runs inline like InlineExecutor, but with a queue so that any tasks added
+ * to this executor by one of its own callbacks will be queued instead of
+ * executed inline (nested). This is usually better behavior than Inline.
+ */
+class QueuedImmediateExecutor : public Executor {
+ public:
+ void add(Action&&) override;
+};
+
+}} // namespace
*/
#include <gtest/gtest.h>
+#include "folly/wangle/InlineExecutor.h"
#include "folly/wangle/ManualExecutor.h"
+#include "folly/wangle/QueuedImmediateExecutor.h"
-using namespace testing;
using namespace folly::wangle;
using namespace std::chrono;
+using namespace testing;
TEST(ManualExecutor, runIsStable) {
ManualExecutor x;
x.advance(microseconds(-1));
EXPECT_EQ(count, 0);
}
+
+TEST(Executor, InlineExecutor) {
+ InlineExecutor x;
+ size_t counter = 0;
+ x.add([&]{
+ x.add([&]{
+ EXPECT_EQ(counter++, 0);
+ });
+ EXPECT_EQ(counter++, 1);
+ });
+ EXPECT_EQ(counter, 2);
+}
+
+TEST(Executor, QueuedImmediateExecutor) {
+ QueuedImmediateExecutor x;
+ size_t counter = 0;
+ x.add([&]{
+ x.add([&]{
+ EXPECT_EQ(1, counter++);
+ });
+ EXPECT_EQ(0, counter++);
+ });
+ EXPECT_EQ(2, counter);
+}
+
+TEST(Executor, Runnable) {
+ InlineExecutor x;
+ size_t counter = 0;
+ struct Runnable {
+ std::function<void()> fn;
+ void operator()() { fn(); }
+ };
+ Runnable f;
+ f.fn = [&]{ counter++; };
+ x.add(f);
+ EXPECT_EQ(counter, 1);
+}
+
+TEST(Executor, RunnablePtr) {
+ InlineExecutor x;
+ struct Runnable {
+ std::function<void()> fn;
+ void operator()() { fn(); }
+ };
+ size_t counter = 0;
+ auto fnp = std::make_shared<Runnable>();
+ fnp->fn = [&]{ counter++; };
+ x.addPtr(fnp);
+ EXPECT_EQ(counter, 1);
+}