via with priority
author     James Sedgwick <jsedgwick@fb.com>
           Thu, 14 May 2015 00:45:10 +0000 (17:45 -0700)
committer  Viswanath Sivakumar <viswanath@fb.com>
           Wed, 20 May 2015 17:57:06 +0000 (10:57 -0700)
Summary:
I wish I could just have an add(Func, priority) overload, but the damned overloaded-virtual warnings become a nightmare, so it's addWithPriority.
I also switched priority to a uint8_t in the hope of reducing Core size. Turns out std::atomic<uint8_t> is 8 bytes anyway :( I left it that way because, come on, you really shouldn't be using > 256 priorities.
The biggest remaining problem is the data race on the two atomics, executor_ and priority_. Should we just use a MicroSpinLock to co-protect them? That could probably also save some size relative to the atomics.
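Here's roughly how it reads at a call site (illustrative only; the thread and priority counts below are arbitrary and not part of this diff):

  // Illustrative sketch: scheduling future callbacks with the new priorities.
  // Assumes a priority-aware executor; priorities beyond the supported range
  // are clamped by the queue.
  #include <folly/futures/Future.h>
  #include <folly/wangle/concurrent/CPUThreadPoolExecutor.h>

  void example() {
    // 4 threads, 3 priority levels (both numbers arbitrary for the sketch).
    folly::wangle::CPUThreadPoolExecutor pool(4, 3);

    folly::via(&pool).then([] { /* medium priority (MID_PRI, the default) */ });
    folly::via(&pool, folly::Executor::HI_PRI).then([] { /* high priority */ });
    folly::via(&pool, folly::Executor::LO_PRI).then([] { /* low priority */ });

    pool.join();
  }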

Test Plan: unit

Reviewed By: hans@fb.com

Subscribers: hannesr, fugalh, folly-diffs@, jsedgwick, yfeldblum, chalfant

FB internal diff: D2039619

Tasks: 6928162

Signature: t1:2039619:1431551266:3b31ed2329301aaa9c32f0f41b6e61f3482d570e

folly/Executor.h
folly/futures/Future-inl.h
folly/futures/Future.h
folly/futures/detail/Core.h
folly/futures/helpers.h
folly/futures/test/ViaTest.cpp
folly/wangle/concurrent/BlockingQueue.h
folly/wangle/concurrent/CPUThreadPoolExecutor.cpp
folly/wangle/concurrent/CPUThreadPoolExecutor.h
folly/wangle/concurrent/PriorityLifoSemMPMCQueue.h
folly/wangle/concurrent/test/ThreadPoolExecutorTest.cpp

folly/Executor.h
index 175a7b09e0d6efec9b2ffaf7b5464117ba7094e3..c790045985885558b4b13c79995663ee825f33f7 100644
@@ -17,6 +17,7 @@
 #pragma once
 
 #include <atomic>
+#include <climits>
 #include <functional>
 
 namespace folly {
@@ -33,6 +34,21 @@ class Executor {
   /// variants must be threadsafe.
   virtual void add(Func) = 0;
 
+  /// Enqueue a function with a given priority, where 0 is the medium priority.
+  /// Enforcement of priorities is up to the implementation.
+  virtual void addWithPriority(Func, int8_t priority) {
+    throw std::runtime_error(
+        "addWithPriority() is not implemented for this Executor");
+  }
+
+  virtual uint8_t getNumPriorities() const {
+    return 1;
+  }
+
+  static const int8_t LO_PRI  = SCHAR_MIN;
+  static const int8_t MID_PRI = 0;
+  static const int8_t HI_PRI  = SCHAR_MAX;
+
   /// A convenience function for shared_ptr to legacy functors.
   ///
   /// Sometimes you have a functor that is move-only, and therefore can't be
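
Re the overloaded-virtual note in the summary: a second virtual add(Func, priority) overload would mean any subclass that overrides only one add hides the other, which tends to trip -Woverloaded-virtual. A minimal standalone sketch of that hiding problem (stand-in types, not folly code):

  // Sketch of the overload-hiding problem -Woverloaded-virtual complains about.
  // Build with -Woverloaded-virtual; compilers typically warn that Derived::add
  // hides Base::add(int, std::int8_t).
  #include <cstdint>

  struct Base {
    virtual ~Base() {}
    virtual void add(int func) {}                    // stand-in for add(Func)
    virtual void add(int func, std::int8_t pri) {}   // hypothetical overload
  };

  struct Derived : Base {
    // Overrides one overload, but name lookup in Derived now hides the other;
    // calling add(f, pri) on a Derived no longer compiles without a
    // using-declaration.
    void add(int func) override {}
  };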

folly/futures/Future-inl.h
index 003c9c896b62955a8c45bb7eff8b25ad8ee842a2..28cb216f8ef8abb7f528ad109106084e5cabb8dc 100644
@@ -422,22 +422,22 @@ Optional<Try<T>> Future<T>::poll() {
 }
 
 template <class T>
-inline Future<T> Future<T>::via(Executor* executor) && {
+inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
   throwIfInvalid();
 
-  setExecutor(executor);
+  setExecutor(executor, priority);
 
   return std::move(*this);
 }
 
 template <class T>
-inline Future<T> Future<T>::via(Executor* executor) & {
+inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
   throwIfInvalid();
 
   MoveWrapper<Promise<T>> p;
   auto f = p->getFuture();
   then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
-  return std::move(f).via(executor);
+  return std::move(f).via(executor, priority);
 }
 
 template <class T>
@@ -526,8 +526,8 @@ inline Future<void> makeFuture(Try<void>&& t) {
 }
 
 // via
-inline Future<void> via(Executor* executor) {
-  return makeFuture().via(executor);
+Future<void> via(Executor* executor, int8_t priority) {
+  return makeFuture().via(executor, priority);
 }
 
 // mapSetCallback calls func(i, Try<T>) when every future completes

folly/futures/Future.h
index d0824aa8a7b35b01cc88f1772efa05f19685d811..ccea88b3389a94808204607d3adf27ca83567cb4 100644
@@ -97,12 +97,16 @@ class Future {
   // The ref-qualifier allows for `this` to be moved out so we
   // don't get access-after-free situations in chaining.
   // https://akrzemi1.wordpress.com/2014/06/02/ref-qualifiers/
-  inline Future<T> via(Executor* executor) &&;
+  inline Future<T> via(
+      Executor* executor,
+      int8_t priority = Executor::MID_PRI) &&;
 
   /// This variant creates a new future, where the ref-qualifier && version
   /// moves `this` out. This one is less efficient but avoids confusing users
   /// when "return f.via(x);" fails.
-  inline Future<T> via(Executor* executor) &;
+  inline Future<T> via(
+      Executor* executor,
+      int8_t priority = Executor::MID_PRI) &;
 
   /** True when the result (or exception) is ready. */
   bool isReady() const;
@@ -405,7 +409,9 @@ class Future {
   thenImplementation(F func, detail::argResult<isTry, F, Args...>);
 
   Executor* getExecutor() { return core_->getExecutor(); }
-  void setExecutor(Executor* x) { core_->setExecutor(x); }
+  void setExecutor(Executor* x, int8_t priority = Executor::MID_PRI) {
+    core_->setExecutor(x, priority);
+  }
 };
 
 } // folly

folly/futures/detail/Core.h
index 65e2cb1d76d141f0dbc9f345fda92169dd7be02a..3729b622d75e73c4b9ac83b1a59ff14523c8d723 100644
@@ -227,17 +227,20 @@ class Core {
   bool isActive() { return active_; }
 
   /// Call only from Future thread
-  void setExecutor(Executor* x) {
+  void setExecutor(Executor* x, int8_t priority) {
+    folly::MSLGuard g(executorLock_);
     executor_ = x;
+    priority_ = priority;
   }
 
   Executor* getExecutor() {
+    folly::MSLGuard g(executorLock_);
     return executor_;
   }
 
   /// Call only from Future thread
   void raise(exception_wrapper e) {
-    std::lock_guard<decltype(interruptLock_)> guard(interruptLock_);
+    folly::MSLGuard guard(interruptLock_);
     if (!interrupt_ && !hasResult()) {
       interrupt_ = folly::make_unique<exception_wrapper>(std::move(e));
       if (interruptHandler_) {
@@ -248,7 +251,7 @@ class Core {
 
   /// Call only from Promise thread
   void setInterruptHandler(std::function<void(exception_wrapper const&)> fn) {
-    std::lock_guard<decltype(interruptLock_)> guard(interruptLock_);
+    folly::MSLGuard guard(interruptLock_);
     if (!hasResult()) {
       if (interrupt_) {
         fn(*interrupt_);
@@ -277,14 +280,28 @@ class Core {
     RequestContext::setContext(context_);
 
     // TODO(6115514) semantic race on reading executor_ and setExecutor()
-    Executor* x = executor_;
+    Executor* x;
+    int8_t priority;
+    {
+      folly::MSLGuard g(executorLock_);
+      x = executor_;
+      priority = priority_;
+    }
+
     if (x) {
       ++attached_; // keep Core alive until executor did its thing
       try {
-        x->add([this]() mutable {
-          SCOPE_EXIT { detachOne(); };
-          callback_(std::move(*result_));
-        });
+        if (LIKELY(x->getNumPriorities() == 1)) {
+          x->add([this]() mutable {
+            SCOPE_EXIT { detachOne(); };
+            callback_(std::move(*result_));
+          });
+        } else {
+          x->addWithPriority([this]() mutable {
+            SCOPE_EXIT { detachOne(); };
+            callback_(std::move(*result_));
+          }, priority);
+        }
       } catch (...) {
         result_ = Try<T>(exception_wrapper(std::current_exception()));
         callback_(std::move(*result_));
@@ -307,12 +324,14 @@ class Core {
   std::atomic<unsigned char> attached_ {2};
   std::atomic<bool> active_ {true};
   folly::MicroSpinLock interruptLock_ {0};
+  folly::MicroSpinLock executorLock_ {0};
+  int8_t priority_ {-1};
+  Executor* executor_ {nullptr};
   folly::Optional<Try<T>> result_ {};
   std::function<void(Try<T>&&)> callback_ {nullptr};
   static constexpr size_t lambdaBufSize = 8 * sizeof(void*);
   char lambdaBuf_[lambdaBufSize];
   std::shared_ptr<RequestContext> context_ {nullptr};
-  std::atomic<Executor*> executor_ {nullptr};
   std::unique_ptr<exception_wrapper> interrupt_ {};
   std::function<void(exception_wrapper const&)> interruptHandler_ {nullptr};
 };

folly/futures/helpers.h
index 744a01aca40b6110cf8172bd54d10ae35b8ed052..c74564111da8bbda230ef3b8a0b484cf5e445db8 100644
@@ -128,10 +128,14 @@ Future<T> makeFuture(Try<T>&& t);
  * This is just syntactic sugar for makeFuture().via(executor)
  *
  * @param executor the Executor to call back on
+ * @param priority optionally, the priority to enqueue with. Defaults to 0,
+ * which represents medium priority.
  *
  * @returns a void Future that will call back on the given executor
  */
-inline Future<void> via(Executor* executor);
+inline Future<void> via(
+    Executor* executor,
+    int8_t priority = Executor::MID_PRI);
 
 /** When all the input Futures complete, the returned Future will complete.
   Errors do not cause early termination; this Future will always succeed

folly/futures/test/ViaTest.cpp
index 3ade66790e2b98642ab047c28219e2859041e28c..061dc4c4b34de965cdf6f3de0852f61a8e2cf77d 100644
@@ -185,6 +185,49 @@ TEST(Via, chain3) {
   EXPECT_EQ(3, count);
 }
 
+struct PriorityExecutor : public Executor {
+  void add(Func f) override {}
+
+  void addWithPriority(Func, int8_t priority) override {
+    int mid = getNumPriorities() / 2;
+    int p = priority < 0 ?
+            std::max(0, mid + priority) :
+            std::min(getNumPriorities() - 1, mid + priority);
+    EXPECT_LT(p, 3);
+    EXPECT_GE(p, 0);
+    if (p == 0) {
+      count0++;
+    } else if (p == 1) {
+      count1++;
+    } else if (p == 2) {
+      count2++;
+    }
+  }
+
+  uint8_t getNumPriorities() const override {
+    return 3;
+  }
+
+  int count0{0};
+  int count1{0};
+  int count2{0};
+};
+
+TEST(Via, priority) {
+  PriorityExecutor exe;
+  via(&exe, -1).then([]{});
+  via(&exe, 0).then([]{});
+  via(&exe, 1).then([]{});
+  via(&exe, 42).then([]{});  // out of range; clamps to the highest priority
+  via(&exe, -42).then([]{}); // out of range; clamps to the lowest priority
+  via(&exe).then([]{});      // defaults to mid priority
+  via(&exe, Executor::LO_PRI).then([]{});
+  via(&exe, Executor::HI_PRI).then([]{});
+  EXPECT_EQ(3, exe.count0);
+  EXPECT_EQ(2, exe.count1);
+  EXPECT_EQ(3, exe.count2);
+}
+
 TEST(Via, then2) {
   ManualExecutor x1, x2;
   bool a = false, b = false, c = false;

folly/wangle/concurrent/BlockingQueue.h
index 14d9c6a5c65c43fe08c614b32a554be45acead8f..ebfdc18d5a4c4c32b76fa399639fd31e559dbc13 100644
@@ -25,14 +25,12 @@ class BlockingQueue {
  public:
   virtual ~BlockingQueue() {}
   virtual void add(T item) = 0;
-  virtual void addWithPriority(T item, uint32_t priority) {
+  virtual void addWithPriority(T item, int8_t priority) {
     LOG_FIRST_N(WARNING, 1) <<
       "add(item, priority) called on a non-priority queue";
     add(std::move(item));
   }
-  virtual uint32_t getNumPriorities() {
-    LOG_FIRST_N(WARNING, 1) <<
-      "getNumPriorities() called on a non-priority queue";
+  virtual uint8_t getNumPriorities() {
     return 1;
   }
   virtual T take() = 0;

folly/wangle/concurrent/CPUThreadPoolExecutor.cpp
index fcc835cda400b136d2c32517d62ec7374f113796..864bd3a1dfb112d685098aad95fb44a03990048a 100644
@@ -20,7 +20,6 @@
 namespace folly { namespace wangle {
 
 const size_t CPUThreadPoolExecutor::kDefaultMaxQueueSize = 1 << 14;
-const size_t CPUThreadPoolExecutor::kDefaultNumPriorities = 2;
 
 CPUThreadPoolExecutor::CPUThreadPoolExecutor(
     size_t numThreads,
@@ -48,7 +47,7 @@ CPUThreadPoolExecutor::CPUThreadPoolExecutor(size_t numThreads)
 
 CPUThreadPoolExecutor::CPUThreadPoolExecutor(
     size_t numThreads,
-    uint32_t numPriorities,
+    int8_t numPriorities,
     std::shared_ptr<ThreadFactory> threadFactory)
     : CPUThreadPoolExecutor(
           numThreads,
@@ -59,7 +58,7 @@ CPUThreadPoolExecutor::CPUThreadPoolExecutor(
 
 CPUThreadPoolExecutor::CPUThreadPoolExecutor(
     size_t numThreads,
-    uint32_t numPriorities,
+    int8_t numPriorities,
     size_t maxQueueSize,
     std::shared_ptr<ThreadFactory> threadFactory)
     : CPUThreadPoolExecutor(
@@ -87,22 +86,22 @@ void CPUThreadPoolExecutor::add(
       CPUTask(std::move(func), expiration, std::move(expireCallback)));
 }
 
-void CPUThreadPoolExecutor::add(Func func, uint32_t priority) {
+void CPUThreadPoolExecutor::addWithPriority(Func func, int8_t priority) {
   add(std::move(func), priority, std::chrono::milliseconds(0));
 }
 
 void CPUThreadPoolExecutor::add(
     Func func,
-    uint32_t priority,
+    int8_t priority,
     std::chrono::milliseconds expiration,
     Func expireCallback) {
-  CHECK(priority < getNumPriorities());
+  CHECK(getNumPriorities() > 0);
   taskQueue_->addWithPriority(
       CPUTask(std::move(func), expiration, std::move(expireCallback)),
       priority);
 }
 
-uint32_t CPUThreadPoolExecutor::getNumPriorities() const {
+uint8_t CPUThreadPoolExecutor::getNumPriorities() const {
   return taskQueue_->getNumPriorities();
 }
 
@@ -142,7 +141,7 @@ void CPUThreadPoolExecutor::stopThreads(size_t n) {
   CHECK(stoppedThreads_.size() == 0);
   threadsToStop_ = n;
   for (size_t i = 0; i < n; i++) {
-    taskQueue_->add(CPUTask());
+    taskQueue_->addWithPriority(CPUTask(), Executor::LO_PRI);
   }
 }
 

folly/wangle/concurrent/CPUThreadPoolExecutor.h
index 56833e22abd97d7fd8ee28f20e4b4425106a5096..7b85ae1f8cfbfbc7571cd2559898e4f15a55e701 100644
@@ -24,27 +24,26 @@ class CPUThreadPoolExecutor : public ThreadPoolExecutor {
  public:
   struct CPUTask;
 
-  explicit CPUThreadPoolExecutor(
+  CPUThreadPoolExecutor(
       size_t numThreads,
       std::unique_ptr<BlockingQueue<CPUTask>> taskQueue,
       std::shared_ptr<ThreadFactory> threadFactory =
           std::make_shared<NamedThreadFactory>("CPUThreadPool"));
 
   explicit CPUThreadPoolExecutor(size_t numThreads);
-
-  explicit CPUThreadPoolExecutor(
+  CPUThreadPoolExecutor(
       size_t numThreads,
       std::shared_ptr<ThreadFactory> threadFactory);
 
-  explicit CPUThreadPoolExecutor(
+  CPUThreadPoolExecutor(
       size_t numThreads,
-      uint32_t numPriorities,
+      int8_t numPriorities,
       std::shared_ptr<ThreadFactory> threadFactory =
           std::make_shared<NamedThreadFactory>("CPUThreadPool"));
 
-  explicit CPUThreadPoolExecutor(
+  CPUThreadPoolExecutor(
       size_t numThreads,
-      uint32_t numPriorities,
+      int8_t numPriorities,
       size_t maxQueueSize,
       std::shared_ptr<ThreadFactory> threadFactory =
           std::make_shared<NamedThreadFactory>("CPUThreadPool"));
@@ -57,14 +56,14 @@ class CPUThreadPoolExecutor : public ThreadPoolExecutor {
       std::chrono::milliseconds expiration,
       Func expireCallback = nullptr) override;
 
-  void add(Func func, uint32_t priority);
+  void addWithPriority(Func func, int8_t priority) override;
   void add(
       Func func,
-      uint32_t priority,
+      int8_t priority,
       std::chrono::milliseconds expiration,
       Func expireCallback = nullptr);
 
-  uint32_t getNumPriorities() const;
+  uint8_t getNumPriorities() const override;
 
   struct CPUTask : public ThreadPoolExecutor::Task {
     // Must be noexcept move constructible so it can be used in MPMCQueue
@@ -84,7 +83,6 @@ class CPUThreadPoolExecutor : public ThreadPoolExecutor {
   };
 
   static const size_t kDefaultMaxQueueSize;
-  static const size_t kDefaultNumPriorities;
 
  protected:
   BlockingQueue<CPUTask>* getTaskQueue();

folly/wangle/concurrent/PriorityLifoSemMPMCQueue.h
index 0e4847154ec5625ac6a0b2ee0c5e9ff19b4424c2..583a9a34b34001a9c9718e674e9d617c1caead55 100644
@@ -24,26 +24,29 @@ namespace folly { namespace wangle {
 template <class T>
 class PriorityLifoSemMPMCQueue : public BlockingQueue<T> {
  public:
-  explicit PriorityLifoSemMPMCQueue(uint32_t numPriorities, size_t capacity) {
-    CHECK(numPriorities > 0);
+  explicit PriorityLifoSemMPMCQueue(uint8_t numPriorities, size_t capacity) {
     queues_.reserve(numPriorities);
-    for (uint32_t i = 0; i < numPriorities; i++) {
+    for (int8_t i = 0; i < numPriorities; i++) {
       queues_.push_back(MPMCQueue<T>(capacity));
     }
   }
 
-  uint32_t getNumPriorities() override {
+  uint8_t getNumPriorities() override {
     return queues_.size();
   }
 
-  // Add at lowest priority by default
+  // Add at medium priority by default
   void add(T item) override {
-    addWithPriority(std::move(item), 0);
+    addWithPriority(std::move(item), Executor::MID_PRI);
   }
 
-  void addWithPriority(T item, uint32_t priority) override {
-    CHECK(priority < queues_.size());
-    if (!queues_[priority].write(std::move(item))) {
+  void addWithPriority(T item, int8_t priority) override {
+    int mid = getNumPriorities() / 2;
+    size_t queue = priority < 0 ?
+                   std::max(0, mid + priority) :
+                   std::min(getNumPriorities() - 1, mid + priority);
+    CHECK(queue < queues_.size());
+    if (!queues_[queue].write(std::move(item))) {
       throw std::runtime_error("LifoSemMPMCQueue full, can't add item");
     }
     sem_.post();
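
The addWithPriority() above folds the signed priority around the middle queue and clamps it into [0, numPriorities - 1]. A standalone sketch of that mapping (the helper name is illustrative, not part of the diff):

  // Standalone sketch of the priority -> queue-index clamping used above.
  #include <algorithm>
  #include <cstddef>
  #include <cstdint>

  std::size_t priorityToQueue(std::int8_t priority, std::uint8_t numPriorities) {
    int mid = numPriorities / 2;
    return priority < 0
        ? std::max(0, mid + priority)
        : std::min(numPriorities - 1, mid + priority);
  }

  // With numPriorities == 3 (mid == 1), queue 0 being the lowest priority:
  //   Executor::LO_PRI (-128) -> 0, Executor::MID_PRI (0) -> 1,
  //   Executor::HI_PRI  (127) -> 2; more extreme values clamp to the ends.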

folly/wangle/concurrent/test/ThreadPoolExecutorTest.cpp
index d3fca8c4ecccabeb103f7dfe057263a00af40d56..8a6fcc023bc04a7f40959a3930fc12c6eb5204d4 100644
@@ -310,10 +310,10 @@ TEST(ThreadPoolExecutorTest, PriorityPreemptionTest) {
   };
   CPUThreadPoolExecutor pool(0, 2);
   for (int i = 0; i < 50; i++) {
-    pool.add(lopri, 0);
+    pool.addWithPriority(lopri, Executor::LO_PRI);
   }
   for (int i = 0; i < 50; i++) {
-    pool.add(hipri, 1);
+    pool.addWithPriority(hipri, Executor::HI_PRI);
   }
   pool.setNumThreads(1);
   pool.join();
@@ -372,3 +372,24 @@ TEST(ThreadPoolExecutorTest, CPUObserver) {
 
   observer->checkCalls();
 }
+
+TEST(ThreadPoolExecutorTest, AddWithPriority) {
+  std::atomic_int c{0};
+  auto f = [&]{ c++; };
+
+  // IO exe doesn't support priorities
+  IOThreadPoolExecutor ioExe(10);
+  EXPECT_THROW(ioExe.addWithPriority(f, 0), std::runtime_error);
+
+  CPUThreadPoolExecutor cpuExe(10, 3);
+  cpuExe.addWithPriority(f, -1);
+  cpuExe.addWithPriority(f, 0);
+  cpuExe.addWithPriority(f, 1);
+  cpuExe.addWithPriority(f, -2); // will add at the lowest priority
+  cpuExe.addWithPriority(f, 2);  // will add at the highest priority
+  cpuExe.addWithPriority(f, Executor::LO_PRI);
+  cpuExe.addWithPriority(f, Executor::HI_PRI);
+  cpuExe.join();
+
+  EXPECT_EQ(7, c);
+}