TEST serial_executor_test SOURCES SerialExecutorTest.cpp
TEST thread_pool_executor_test SOURCES ThreadPoolExecutorTest.cpp
TEST threaded_executor_test SOURCES ThreadedExecutorTest.cpp
+ TEST timed_drivable_executor_test SOURCES TimedDrivableExecutorTest.cpp
DIRECTORY executors/task_queue/test/
TEST unbounded_blocking_queue_test SOURCES UnboundedBlockingQueueTest.cpp
executors/SerialExecutor.h \
executors/ThreadPoolExecutor.h \
executors/ThreadedExecutor.h \
+ executors/TimedDrivableExecutor.h \
executors/task_queue/BlockingQueue.h \
executors/task_queue/LifoSemMPMCQueue.h \
executors/task_queue/PriorityLifoSemMPMCQueue.h \
executors/SerialExecutor.cpp \
executors/ThreadPoolExecutor.cpp \
executors/ThreadedExecutor.cpp \
+ executors/TimedDrivableExecutor.cpp \
executors/QueuedImmediateExecutor.cpp \
experimental/hazptr/hazptr.cpp \
experimental/hazptr/memory_resource.cpp \
--- /dev/null
+/*
+ * Copyright 2018-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/executors/TimedDrivableExecutor.h>
+
+#include <cstring>
+#include <ctime>
+#include <string>
+#include <tuple>
+
+namespace folly {
+
+void TimedDrivableExecutor::add(Func callback) {
+ queue_.enqueue(std::move(callback));
+}
+
+void TimedDrivableExecutor::drive() {
+ wait();
+ run();
+}
+
+size_t TimedDrivableExecutor::run() {
+ size_t count = 0;
+ size_t n = queue_.size();
+
+ // If we have waited already, then func_ may have a value
+ if (func_) {
+ auto f = std::move(func_);
+ f();
+ count = 1;
+ }
+
+ while (count < n && queue_.try_dequeue(func_)) {
+ auto f = std::move(func_);
+ f();
+ ++count;
+ }
+
+ return count;
+}
+
+size_t TimedDrivableExecutor::drain() {
+ size_t tasksRun = 0;
+ size_t tasksForSingleRun = 0;
+ while ((tasksForSingleRun = run()) != 0) {
+ tasksRun += tasksForSingleRun;
+ }
+ return tasksRun;
+}
+
+void TimedDrivableExecutor::wait() {
+ if (!func_) {
+ queue_.dequeue(func_);
+ }
+}
+
+} // namespace folly
--- /dev/null
+/*
+ * Copyright 2018-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <chrono>
+
+#include <folly/concurrency/UnboundedQueue.h>
+#include <folly/executors/DrivableExecutor.h>
+
+namespace folly {
+
+/*
+ * A DrivableExecutor can be driven via its drive() method or its driveUntil()
+ * that drives until some time point.
+ */
+class TimedDrivableExecutor : public DrivableExecutor {
+ public:
+ /// Implements DrivableExecutor
+ void drive() override;
+
+ // Make progress if there is work to do and return true. Otherwise return
+ // false.
+ bool try_drive() {
+ return try_wait() && run() > 0;
+ }
+
+ // Make progress on this Executor's work. Acts as drive, except it will only
+ // wait for a period of timeout for work to be enqueued. If no work is
+ // enqueued by that point, it will return.
+ template <typename Rep, typename Period>
+ bool try_drive_for(const std::chrono::duration<Rep, Period>& timeout) {
+ return try_wait_for(timeout) && run() > 0;
+ }
+
+ // Make progress on this Executor's work. Acts as drive, except it will only
+ // wait until deadline for work to be enqueued. If no work is enqueued by
+ // that point, it will return.
+ template <typename Clock, typename Duration>
+ bool try_drive_until(
+ const std::chrono::time_point<Clock, Duration>& deadline) {
+ return try_wait_until(deadline) && run() > 0;
+ }
+
+ void add(Func) override;
+
+ /// Do work. Returns the number of functions that were executed (maybe 0).
+ /// Non-blocking, in the sense that we don't wait for work (we can't
+ /// control whether one of the functions blocks).
+ /// This is stable, it will not chase an ever-increasing tail of work.
+ /// This also means, there may be more work available to perform at the
+ /// moment that this returns.
+ size_t run();
+
+ // Do work until there is no more work to do.
+ // Returns the number of functions that were executed (maybe 0).
+ // Unlike run, this method is not stable. It will chase an infinite tail of
+ // work so should be used with care.
+ // There will be no work available to perform at the moment that this
+ // returns.
+ size_t drain();
+
+ /// Wait for work to do.
+ void wait();
+
+ // Return true if there is work to do, false otherwise
+ bool try_wait() {
+ return func_ || queue_.try_dequeue(func_);
+ }
+
+ /// Wait for work to do or for a period of timeout, whichever is sooner.
+ template <typename Rep, typename Period>
+ bool try_wait_for(const std::chrono::duration<Rep, Period>& timeout) {
+ return func_ || queue_.try_dequeue_for(func_, timeout);
+ }
+
+ /// Wait for work to do or until deadline passes, whichever is sooner.
+ template <typename Clock, typename Duration>
+ bool try_wait_until(
+ const std::chrono::time_point<Clock, Duration>& deadline) {
+ return func_ || queue_.try_dequeue_until(func_, deadline);
+ }
+
+ private:
+ UMPSCQueue<Func, true> queue_;
+ Func func_;
+};
+
+} // namespace folly
--- /dev/null
+/*
+ * Copyright 2018-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/executors/TimedDrivableExecutor.h>
+
+#include <folly/futures/Future.h>
+#include <folly/portability/GTest.h>
+#include <folly/synchronization/Baton.h>
+
+using namespace folly;
+
+TEST(TimedDrivableExecutor, runIsStable) {
+ TimedDrivableExecutor x;
+ size_t count = 0;
+ auto f1 = [&]() { count++; };
+ auto f2 = [&]() {
+ x.add(f1);
+ x.add(f1);
+ };
+ x.add(f2);
+ x.run();
+ EXPECT_EQ(count, 0);
+}
+
+TEST(TimedDrivableExecutor, drainIsNotStable) {
+ TimedDrivableExecutor x;
+ size_t count = 0;
+ auto f1 = [&]() { count++; };
+ auto f2 = [&]() {
+ x.add(f1);
+ x.add(f1);
+ };
+ x.add(f2);
+ x.drain();
+ EXPECT_EQ(count, 2);
+}
+
+TEST(TimedDrivableExecutor, try_drive) {
+ TimedDrivableExecutor x;
+ size_t count = 0;
+ auto f1 = [&]() { count++; };
+ x.try_drive();
+ EXPECT_EQ(count, 0);
+ x.add(f1);
+ x.try_drive();
+ EXPECT_EQ(count, 1);
+}
+
+TEST(TimedDrivableExecutor, try_drive_for) {
+ TimedDrivableExecutor x;
+ size_t count = 0;
+ auto f1 = [&]() { count++; };
+ x.try_drive_for(std::chrono::milliseconds(100));
+ EXPECT_EQ(count, 0);
+ x.add(f1);
+ x.try_drive_for(std::chrono::milliseconds(100));
+ EXPECT_EQ(count, 1);
+}
+
+TEST(TimedDrivableExecutor, try_drive_until) {
+ TimedDrivableExecutor x;
+ size_t count = 0;
+ auto f1 = [&]() { count++; };
+ x.try_drive_until(
+ std::chrono::system_clock::now() + std::chrono::milliseconds(100));
+ EXPECT_EQ(count, 0);
+ x.add(f1);
+ x.try_drive_until(
+ std::chrono::system_clock::now() + std::chrono::milliseconds(100));
+ EXPECT_EQ(count, 1);
+}