#pragma once
#include <atomic>
+#include <climits>
#include <functional>
namespace folly {
/// variants must be threadsafe.
virtual void add(Func) = 0;
+ /// Enqueue a function with a given priority, where 0 is the medium priority
+ /// This is up to the implementation to enforce
+ /// Default: throws, because this Executor is not priority-aware; priority
+ /// support requires overriding this together with getNumPriorities().
+ virtual void addWithPriority(Func, int8_t priority) {
+ throw std::runtime_error(
+ "addWithPriority() is not implemented for this Executor");
+ }
+
+ /// Number of distinct priority levels this executor supports; the default
+ /// of 1 means "no priority support" (callers should use plain add()).
+ virtual uint8_t getNumPriorities() const {
+ return 1;
+ }
+
+ /// Convenience priorities: lowest, medium (the default), and highest.
+ static const int8_t LO_PRI = SCHAR_MIN;
+ static const int8_t MID_PRI = 0;
+ static const int8_t HI_PRI = SCHAR_MAX;
+
/// A convenience function for shared_ptr to legacy functors.
///
/// Sometimes you have a functor that is move-only, and therefore can't be
}
template <class T>
-inline Future<T> Future<T>::via(Executor* executor) && {
+// Rvalue overload: moves this Future onto `executor`; callbacks chained
+// afterwards are scheduled there, at `priority` when the executor supports it.
+inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
throwIfInvalid();
- setExecutor(executor);
+ setExecutor(executor, priority);
return std::move(*this);
}
template <class T>
-inline Future<T> Future<T>::via(Executor* executor) & {
+// Lvalue overload: leaves *this usable by chaining through a fresh
+// Promise/Future pair, then delegates to the rvalue overload.
+inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
throwIfInvalid();
MoveWrapper<Promise<T>> p;
auto f = p->getFuture();
then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
- return std::move(f).via(executor);
+ return std::move(f).via(executor, priority);
}
template <class T>
}
// via
-inline Future<void> via(Executor* executor) {
- return makeFuture().via(executor);
+// Must remain `inline`: this definition lives in a header and its prototype
+// is declared `inline`; dropping the keyword here would produce
+// multiple-definition (ODR) errors when the header is included in several TUs.
+inline Future<void> via(Executor* executor, int8_t priority) {
+ return makeFuture().via(executor, priority);
}
// mapSetCallback calls func(i, Try<T>) when every future completes
// The ref-qualifier allows for `this` to be moved out so we
// don't get access-after-free situations in chaining.
// https://akrzemi1.wordpress.com/2014/06/02/ref-qualifiers/
- inline Future<T> via(Executor* executor) &&;
+ inline Future<T> via(
+ Executor* executor,
+ int8_t priority = Executor::MID_PRI) &&;
/// This variant creates a new future, where the ref-qualifier && version
/// moves `this` out. This one is less efficient but avoids confusing users
/// when "return f.via(x);" fails.
- inline Future<T> via(Executor* executor) &;
+ inline Future<T> via(
+ Executor* executor,
+ int8_t priority = Executor::MID_PRI) &;
/** True when the result (or exception) is ready. */
bool isReady() const;
thenImplementation(F func, detail::argResult<isTry, F, Args...>);
Executor* getExecutor() { return core_->getExecutor(); }
- void setExecutor(Executor* x) { core_->setExecutor(x); }
+ // Forwards the executor and the desired scheduling priority to the shared
+ // Core; priority defaults to MID_PRI (0).
+ void setExecutor(Executor* x, int8_t priority = Executor::MID_PRI) {
+ core_->setExecutor(x, priority);
+ }
};
} // folly
bool isActive() { return active_; }
/// Call only from Future thread
- void setExecutor(Executor* x) {
+ void setExecutor(Executor* x, int8_t priority) {
+ // Spin lock: the critical section is just two stores, so a MicroSpinLock
+ // is cheaper than a mutex. Keeps executor_ and priority_ consistent as a
+ // pair with respect to the snapshot taken at dispatch time.
+ folly::MSLGuard g(executorLock_);
executor_ = x;
+ priority_ = priority;
}
Executor* getExecutor() {
+ // Same lock as setExecutor() so the read is not torn against a
+ // concurrent store (executor_ is no longer std::atomic).
+ folly::MSLGuard g(executorLock_);
return executor_;
}
/// Call only from Future thread
void raise(exception_wrapper e) {
- std::lock_guard<decltype(interruptLock_)> guard(interruptLock_);
+ // MicroSpinLock guard replaces std::lock_guard; the critical section is
+ // short, so spinning is acceptable here.
+ folly::MSLGuard guard(interruptLock_);
if (!interrupt_ && !hasResult()) {
interrupt_ = folly::make_unique<exception_wrapper>(std::move(e));
if (interruptHandler_) {
/// Call only from Promise thread
void setInterruptHandler(std::function<void(exception_wrapper const&)> fn) {
- std::lock_guard<decltype(interruptLock_)> guard(interruptLock_);
+ // Same MicroSpinLock as raise(): serializes handler installation against
+ // interrupt delivery.
+ folly::MSLGuard guard(interruptLock_);
if (!hasResult()) {
if (interrupt_) {
fn(*interrupt_);
RequestContext::setContext(context_);
+ // Snapshot executor_ and priority_ together under executorLock_ so the
+ // pair is read consistently. NOTE(review): the higher-level semantic race
+ // from TODO(6115514) — setExecutor() racing with this dispatch — is only
+ // narrowed by the lock, not eliminated; confirm callers' threading
+ // assumptions.
- Executor* x = executor_;
+ Executor* x;
+ int8_t priority;
+ {
+ folly::MSLGuard g(executorLock_);
+ x = executor_;
+ priority = priority_;
+ }
+
if (x) {
++attached_; // keep Core alive until executor did its thing
try {
- x->add([this]() mutable {
- SCOPE_EXIT { detachOne(); };
- callback_(std::move(*result_));
- });
+ // Fast path: executors that report a single priority level (the common
+ // case) take plain add(); only priority-aware executors pay for
+ // addWithPriority().
+ if (LIKELY(x->getNumPriorities() == 1)) {
+ x->add([this]() mutable {
+ SCOPE_EXIT { detachOne(); };
+ callback_(std::move(*result_));
+ });
+ } else {
+ x->addWithPriority([this]() mutable {
+ SCOPE_EXIT { detachOne(); };
+ callback_(std::move(*result_));
+ }, priority);
+ }
} catch (...) {
result_ = Try<T>(exception_wrapper(std::current_exception()));
callback_(std::move(*result_));
std::atomic<unsigned char> attached_ {2};
std::atomic<bool> active_ {true};
folly::MicroSpinLock interruptLock_ {0};
+ // Guards executor_ and priority_ as a pair (see setExecutor/getExecutor).
+ folly::MicroSpinLock executorLock_ {0};
+ // NOTE(review): the -1 default is never observed — dispatch only reads
+ // priority_ after setExecutor() has assigned it alongside a non-null
+ // executor_; consider MID_PRI (0) as a clearer default.
+ int8_t priority_ {-1};
+ // Plain pointer now that executorLock_ serializes access (was atomic).
+ Executor* executor_ {nullptr};
folly::Optional<Try<T>> result_ {};
std::function<void(Try<T>&&)> callback_ {nullptr};
static constexpr size_t lambdaBufSize = 8 * sizeof(void*);
char lambdaBuf_[lambdaBufSize];
std::shared_ptr<RequestContext> context_ {nullptr};
-std::atomic<Executor*> executor_ {nullptr};
std::unique_ptr<exception_wrapper> interrupt_ {};
std::function<void(exception_wrapper const&)> interruptHandler_ {nullptr};
};
* This is just syntactic sugar for makeFuture().via(executor)
*
* @param executor the Executor to call back on
+ * @param priority optionally, the priority to add with. Defaults to 0 which
+ * represents medium priority.
*
* @returns a void Future that will call back on the given executor
*/
-inline Future<void> via(Executor* executor);
+inline Future<void> via(
+ Executor* executor,
+ int8_t priority = Executor::MID_PRI);
/** When all the input Futures complete, the returned Future will complete.
Errors do not cause early termination; this Future will always succeed
EXPECT_EQ(3, count);
}
+// Test-only executor: never runs tasks, just records which of its three
+// priority buckets each addWithPriority() call maps to, mirroring the
+// clamping scheme used by PriorityLifoSemMPMCQueue.
+struct PriorityExecutor : public Executor {
+  void add(Func f) override {}
+
+  void addWithPriority(Func, int8_t priority) override {
+    const int mid = getNumPriorities() / 2;
+    const int bucket = priority < 0
+        ? std::max(0, mid + priority)
+        : std::min(getNumPriorities() - 1, mid + priority);
+    EXPECT_GE(bucket, 0);
+    EXPECT_LT(bucket, 3);
+    switch (bucket) {
+      case 0:
+        count0++;
+        break;
+      case 1:
+        count1++;
+        break;
+      case 2:
+        count2++;
+        break;
+    }
+  }
+
+  uint8_t getNumPriorities() const override {
+    return 3;
+  }
+
+  int count0{0};
+  int count1{0};
+  int count2{0};
+};
+
+// With 3 priorities the middle bucket is index 1; negative priorities clamp
+// down toward bucket 0 and positive priorities clamp up toward bucket 2:
+//   bucket 0: -1, -42, LO_PRI | bucket 1: 0, default | bucket 2: 1, 42, HI_PRI
+TEST(Via, priority) {
+ PriorityExecutor exe;
+ via(&exe, -1).then([]{});
+ via(&exe, 0).then([]{});
+ via(&exe, 1).then([]{});
+ via(&exe, 42).then([]{}); // overflow should go to max priority
+ via(&exe, -42).then([]{}); // underflow should go to min priority
+ via(&exe).then([]{}); // default to mid priority
+ via(&exe, Executor::LO_PRI).then([]{});
+ via(&exe, Executor::HI_PRI).then([]{});
+ EXPECT_EQ(3, exe.count0);
+ EXPECT_EQ(2, exe.count1);
+ EXPECT_EQ(3, exe.count2);
+}
+
TEST(Via, then2) {
ManualExecutor x1, x2;
bool a = false, b = false, c = false;
public:
virtual ~BlockingQueue() {}
virtual void add(T item) = 0;
+ // Default for non-priority queues: warn once, then ignore the priority
+ // and enqueue normally.
- virtual void addWithPriority(T item, uint32_t priority) {
+ virtual void addWithPriority(T item, int8_t priority) {
LOG_FIRST_N(WARNING, 1) <<
"add(item, priority) called on a non-priority queue";
add(std::move(item));
}
- virtual uint32_t getNumPriorities() {
- LOG_FIRST_N(WARNING, 1) <<
- "getNumPriorities() called on a non-priority queue";
+ // Querying the priority count is now a legitimate capability check (used
+ // to pick add() vs addWithPriority()), so the old warning is dropped;
+ // 1 means "no priority support".
+ virtual uint8_t getNumPriorities() {
return 1;
}
virtual T take() = 0;
namespace folly { namespace wangle {
const size_t CPUThreadPoolExecutor::kDefaultMaxQueueSize = 1 << 14;
-const size_t CPUThreadPoolExecutor::kDefaultNumPriorities = 2;
CPUThreadPoolExecutor::CPUThreadPoolExecutor(
size_t numThreads,
CPUThreadPoolExecutor::CPUThreadPoolExecutor(
size_t numThreads,
- uint32_t numPriorities,
+ int8_t numPriorities,
std::shared_ptr<ThreadFactory> threadFactory)
: CPUThreadPoolExecutor(
numThreads,
CPUThreadPoolExecutor::CPUThreadPoolExecutor(
size_t numThreads,
- uint32_t numPriorities,
+ int8_t numPriorities,
size_t maxQueueSize,
std::shared_ptr<ThreadFactory> threadFactory)
: CPUThreadPoolExecutor(
CPUTask(std::move(func), expiration, std::move(expireCallback)));
}
-void CPUThreadPoolExecutor::add(Func func, uint32_t priority) {
+// Executor interface: enqueue at the given priority with no expiration.
+void CPUThreadPoolExecutor::addWithPriority(Func func, int8_t priority) {
add(std::move(func), priority, std::chrono::milliseconds(0));
}
void CPUThreadPoolExecutor::add(
Func func,
- uint32_t priority,
+ int8_t priority,
std::chrono::milliseconds expiration,
Func expireCallback) {
- CHECK(priority < getNumPriorities());
+ // Any int8_t priority is valid now — the queue clamps it into range — so
+ // only require that at least one priority queue exists.
+ CHECK(getNumPriorities() > 0);
taskQueue_->addWithPriority(
CPUTask(std::move(func), expiration, std::move(expireCallback)),
priority);
}
-uint32_t CPUThreadPoolExecutor::getNumPriorities() const {
+// Delegates to the underlying queue; returns 1 for non-priority queues.
+uint8_t CPUThreadPoolExecutor::getNumPriorities() const {
return taskQueue_->getNumPriorities();
}
CHECK(stoppedThreads_.size() == 0);
threadsToStop_ = n;
for (size_t i = 0; i < n; i++) {
- taskQueue_->add(CPUTask());
+ // Default-constructed CPUTask acts as a stop sentinel; enqueued at the
+ // lowest priority, presumably so already-queued real work drains before
+ // threads exit — TODO confirm intent.
+ taskQueue_->addWithPriority(CPUTask(), Executor::LO_PRI);
}
}
public:
struct CPUTask;
- explicit CPUThreadPoolExecutor(
+ CPUThreadPoolExecutor(
size_t numThreads,
std::unique_ptr<BlockingQueue<CPUTask>> taskQueue,
std::shared_ptr<ThreadFactory> threadFactory =
std::make_shared<NamedThreadFactory>("CPUThreadPool"));
explicit CPUThreadPoolExecutor(size_t numThreads);
-
- explicit CPUThreadPoolExecutor(
+ // Fixed: the replacement declaration had lost its member indentation
+ // (every sibling constructor declaration is indented one level).
+ CPUThreadPoolExecutor(
size_t numThreads,
std::shared_ptr<ThreadFactory> threadFactory);
- explicit CPUThreadPoolExecutor(
+ CPUThreadPoolExecutor(
size_t numThreads,
- uint32_t numPriorities,
+ int8_t numPriorities,
std::shared_ptr<ThreadFactory> threadFactory =
std::make_shared<NamedThreadFactory>("CPUThreadPool"));
- explicit CPUThreadPoolExecutor(
+ CPUThreadPoolExecutor(
size_t numThreads,
- uint32_t numPriorities,
+ int8_t numPriorities,
size_t maxQueueSize,
std::shared_ptr<ThreadFactory> threadFactory =
std::make_shared<NamedThreadFactory>("CPUThreadPool"));
std::chrono::milliseconds expiration,
Func expireCallback = nullptr) override;
- void add(Func func, uint32_t priority);
+ void addWithPriority(Func func, int8_t priority) override;
void add(
Func func,
- uint32_t priority,
+ int8_t priority,
std::chrono::milliseconds expiration,
Func expireCallback = nullptr);
- uint32_t getNumPriorities() const;
+ uint8_t getNumPriorities() const override;
struct CPUTask : public ThreadPoolExecutor::Task {
// Must be noexcept move constructible so it can be used in MPMCQueue
};
static const size_t kDefaultMaxQueueSize;
- static const size_t kDefaultNumPriorities;
protected:
BlockingQueue<CPUTask>* getTaskQueue();
template <class T>
class PriorityLifoSemMPMCQueue : public BlockingQueue<T> {
public:
- explicit PriorityLifoSemMPMCQueue(uint32_t numPriorities, size_t capacity) {
+ // Builds one MPMCQueue per priority level. numPriorities must stay > 0:
+ // addWithPriority() computes getNumPriorities() - 1 and would index out
+ // of range on an empty queue vector, so keep failing fast here.
+ explicit PriorityLifoSemMPMCQueue(uint8_t numPriorities, size_t capacity) {
+ CHECK(numPriorities > 0);
queues_.reserve(numPriorities);
- for (uint32_t i = 0; i < numPriorities; i++) {
+ // Index must be unsigned: numPriorities is uint8_t and may exceed 127,
+ // which would wrap an int8_t index and loop far past the intended bound.
+ for (uint8_t i = 0; i < numPriorities; i++) {
queues_.push_back(MPMCQueue<T>(capacity));
}
}
- uint32_t getNumPriorities() override {
+ // Narrowing size_t -> uint8_t is safe here: the constructor creates at
+ // most numPriorities (itself a uint8_t, so <= 255) queues.
+ uint8_t getNumPriorities() override {
return queues_.size();
}
- // Add at lowest priority by default
+ // Add at medium priority by default (MID_PRI == 0 maps to the middle
+ // queue), matching Executor's default priority.
void add(T item) override {
- addWithPriority(std::move(item), 0);
+ addWithPriority(std::move(item), Executor::MID_PRI);
}
- void addWithPriority(T item, uint32_t priority) override {
- CHECK(priority < queues_.size());
- if (!queues_[priority].write(std::move(item))) {
+ // Map a signed priority onto a queue index: 0 lands on the middle queue,
+ // negative values clamp down to queue 0, positive values clamp up to the
+ // last queue — so LO_PRI/HI_PRI always hit the extremes regardless of
+ // how many priority levels exist.
+ void addWithPriority(T item, int8_t priority) override {
+ int mid = getNumPriorities() / 2;
+ size_t queue = priority < 0 ?
+ std::max(0, mid + priority) :
+ std::min(getNumPriorities() - 1, mid + priority);
+ CHECK(queue < queues_.size());
+ if (!queues_[queue].write(std::move(item))) {
throw std::runtime_error("LifoSemMPMCQueue full, can't add item");
}
+ // Wake a waiting consumer only on successful enqueue (throw skips this).
sem_.post();
};
CPUThreadPoolExecutor pool(0, 2);
for (int i = 0; i < 50; i++) {
- pool.add(lopri, 0);
+ pool.addWithPriority(lopri, Executor::LO_PRI);
}
for (int i = 0; i < 50; i++) {
- pool.add(hipri, 1);
+ pool.addWithPriority(hipri, Executor::HI_PRI);
}
pool.setNumThreads(1);
pool.join();
observer->checkCalls();
}
+
+// Non-priority executors must throw from addWithPriority(); a CPU pool with
+// 3 priorities clamps out-of-range values and runs every task exactly once.
+TEST(ThreadPoolExecutorTest, AddWithPriority) {
+ std::atomic_int c{0};
+ auto f = [&]{ c++; };
+
+ // IO exe doesn't support priorities
+ IOThreadPoolExecutor ioExe(10);
+ EXPECT_THROW(ioExe.addWithPriority(f, 0), std::runtime_error);
+
+ CPUThreadPoolExecutor cpuExe(10, 3);
+ cpuExe.addWithPriority(f, -1);
+ cpuExe.addWithPriority(f, 0);
+ cpuExe.addWithPriority(f, 1);
+ cpuExe.addWithPriority(f, -2); // will add at the lowest priority
+ cpuExe.addWithPriority(f, 2); // will add at the highest priority
+ cpuExe.addWithPriority(f, Executor::LO_PRI);
+ cpuExe.addWithPriority(f, Executor::HI_PRI);
+ cpuExe.join();
+
+ // Seven successful adds; the throwing IO add above must not have run f.
+ EXPECT_EQ(7, c);
+}