#include <linux/debug_locks.h>
#include <linux/lockdep.h>
#include <linux/idr.h>
+#include <linux/jhash.h>
#include <linux/hashtable.h>
+#include <linux/rculist.h>
#include "workqueue_internal.h"
* create_worker() is in progress.
*/
POOL_MANAGE_WORKERS = 1 << 0, /* need to manage workers */
- POOL_MANAGING_WORKERS = 1 << 1, /* managing workers */
POOL_DISASSOCIATED = 1 << 2, /* cpu can't serve workers */
POOL_FREEZING = 1 << 3, /* freeze in progress */
NR_STD_WORKER_POOLS = 2, /* # standard pools per cpu */
+ UNBOUND_POOL_HASH_ORDER = 6, /* hashed by pool->attrs */
BUSY_WORKER_HASH_ORDER = 6, /* 64 pointers */
MAX_IDLE_WORKERS_RATIO = 4, /* 1/4 of busy can be idle */
* F: wq->flush_mutex protected.
*
* W: workqueue_lock protected.
+ *
+ * R: workqueue_lock protected for writes. Sched-RCU protected for reads.
*/
/* struct worker is defined in workqueue_internal.h */
struct worker_pool {
spinlock_t lock; /* the pool lock */
- unsigned int cpu; /* I: the associated cpu */
+ int cpu; /* I: the associated cpu */
int id; /* I: pool ID */
unsigned int flags; /* X: flags */
DECLARE_HASHTABLE(busy_hash, BUSY_WORKER_HASH_ORDER);
/* L: hash of busy workers */
+ struct mutex manager_arb; /* manager arbitration */
struct mutex assoc_mutex; /* protect POOL_DISASSOCIATED */
struct ida worker_ida; /* L: for worker IDs */
+ struct workqueue_attrs *attrs; /* I: worker attributes */
+ struct hlist_node hash_node; /* R: unbound_pool_hash node */
+ int refcnt; /* refcnt for unbound pools */
+
/*
* The current concurrency level. As it's likely to be accessed
* from other CPUs during try_to_wake_up(), put it in a separate
* cacheline.
*/
atomic_t nr_running ____cacheline_aligned_in_smp;
+
+ /*
+ * Destruction of pool is sched-RCU protected to allow dereferences
+ * from get_work_pool().
+ */
+ struct rcu_head rcu;
} ____cacheline_aligned_in_smp;
/*
int nr_active; /* L: nr of active works */
int max_active; /* L: max active works */
struct list_head delayed_works; /* L: delayed works */
-};
+ struct list_head pwqs_node; /* R: node on wq->pwqs */
+ struct list_head mayday_node; /* W: node on wq->maydays */
+} __aligned(1 << WORK_STRUCT_FLAG_BITS);
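Illustrative note (not from this patch): the forced alignment keeps the low WORK_STRUCT_FLAG_BITS bits of a pool_workqueue pointer zero, so the pointer can be packed into work->data alongside WORK_STRUCT_* flags, roughly:

	set_work_data(work, (unsigned long)pwq,
		      WORK_STRUCT_PENDING | WORK_STRUCT_PWQ);

and recovered by masking with WORK_STRUCT_WQ_DATA_MASK, as in get_work_pool() below.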
/*
* Structure used to wait for workqueue flush.
struct completion done; /* flush completion */
};
-/*
- * All cpumasks are assumed to be always set on UP and thus can't be
- * used to determine whether there's something to be done.
- */
-#ifdef CONFIG_SMP
-typedef cpumask_var_t mayday_mask_t;
-#define mayday_test_and_set_cpu(cpu, mask) \
- cpumask_test_and_set_cpu((cpu), (mask))
-#define mayday_clear_cpu(cpu, mask) cpumask_clear_cpu((cpu), (mask))
-#define for_each_mayday_cpu(cpu, mask) for_each_cpu((cpu), (mask))
-#define alloc_mayday_mask(maskp, gfp) zalloc_cpumask_var((maskp), (gfp))
-#define free_mayday_mask(mask) free_cpumask_var((mask))
-#else
-typedef unsigned long mayday_mask_t;
-#define mayday_test_and_set_cpu(cpu, mask) test_and_set_bit(0, &(mask))
-#define mayday_clear_cpu(cpu, mask) clear_bit(0, &(mask))
-#define for_each_mayday_cpu(cpu, mask) if ((cpu) = 0, (mask))
-#define alloc_mayday_mask(maskp, gfp) true
-#define free_mayday_mask(mask) do { } while (0)
-#endif
-
/*
* The externally visible workqueue abstraction is an array of
* per-CPU workqueues:
*/
struct workqueue_struct {
unsigned int flags; /* W: WQ_* flags */
- union {
- struct pool_workqueue __percpu *pcpu;
- struct pool_workqueue *single;
- unsigned long v;
- } pool_wq; /* I: pwq's */
+ struct pool_workqueue __percpu *cpu_pwqs; /* I: per-cpu pwq's */
+ struct list_head pwqs; /* R: all pwqs of this wq */
struct list_head list; /* W: list of all workqueues */
struct mutex flush_mutex; /* protects wq flushing */
struct list_head flusher_queue; /* F: flush waiters */
struct list_head flusher_overflow; /* F: flush overflow list */
- mayday_mask_t mayday_mask; /* cpus requesting rescue */
+ struct list_head maydays; /* W: pwqs requesting rescue */
struct worker *rescuer; /* I: rescue worker */
int nr_drainers; /* W: drain in progress */
char name[]; /* I: workqueue name */
};
+static struct kmem_cache *pwq_cache;
+
+/* hash of all unbound pools keyed by pool->attrs */
+static DEFINE_HASHTABLE(unbound_pool_hash, UNBOUND_POOL_HASH_ORDER);
+
+static struct workqueue_attrs *unbound_std_wq_attrs[NR_STD_WORKER_POOLS];
+
struct workqueue_struct *system_wq __read_mostly;
EXPORT_SYMBOL_GPL(system_wq);
struct workqueue_struct *system_highpri_wq __read_mostly;
#define CREATE_TRACE_POINTS
#include <trace/events/workqueue.h>
+#define assert_rcu_or_wq_lock() \
+ rcu_lockdep_assert(rcu_read_lock_sched_held() || \
+ lockdep_is_held(&workqueue_lock), \
+ "sched RCU or workqueue lock should be held")
+
#define for_each_std_worker_pool(pool, cpu) \
for ((pool) = &std_worker_pools(cpu)[0]; \
(pool) < &std_worker_pools(cpu)[NR_STD_WORKER_POOLS]; (pool)++)
-#define for_each_busy_worker(worker, i, pos, pool) \
- hash_for_each(pool->busy_hash, i, pos, worker, hentry)
+#define for_each_busy_worker(worker, i, pool) \
+ hash_for_each(pool->busy_hash, i, worker, hentry)
static inline int __next_wq_cpu(int cpu, const struct cpumask *mask,
unsigned int sw)
return WORK_CPU_END;
}
-static inline int __next_pwq_cpu(int cpu, const struct cpumask *mask,
- struct workqueue_struct *wq)
-{
- return __next_wq_cpu(cpu, mask, !(wq->flags & WQ_UNBOUND) ? 1 : 2);
-}
-
/*
* CPU iterators
*
*
* for_each_wq_cpu() : possible CPUs + WORK_CPU_UNBOUND
* for_each_online_wq_cpu() : online CPUs + WORK_CPU_UNBOUND
- * for_each_pwq_cpu() : possible CPUs for bound workqueues,
- * WORK_CPU_UNBOUND for unbound workqueues
*/
#define for_each_wq_cpu(cpu) \
for ((cpu) = __next_wq_cpu(-1, cpu_possible_mask, 3); \
(cpu) < WORK_CPU_END; \
(cpu) = __next_wq_cpu((cpu), cpu_online_mask, 3))
-#define for_each_pwq_cpu(cpu, wq) \
- for ((cpu) = __next_pwq_cpu(-1, cpu_possible_mask, (wq)); \
- (cpu) < WORK_CPU_END; \
- (cpu) = __next_pwq_cpu((cpu), cpu_possible_mask, (wq)))
+/**
+ * for_each_pool - iterate through all worker_pools in the system
+ * @pool: iteration cursor
+ * @id: integer used for iteration
+ *
+ * This must be called either with workqueue_lock held or sched RCU read
+ * locked. If the pool needs to be used beyond the locking in effect, the
+ * caller is responsible for guaranteeing that the pool stays online.
+ *
+ * The if/else clause exists only for the lockdep assertion and can be
+ * ignored.
+ */
+#define for_each_pool(pool, id) \
+ idr_for_each_entry(&worker_pool_idr, pool, id) \
+ if (({ assert_rcu_or_wq_lock(); false; })) { } \
+ else
+
+/**
+ * for_each_pwq - iterate through all pool_workqueues of the specified workqueue
+ * @pwq: iteration cursor
+ * @wq: the target workqueue
+ *
+ * This must be called either with workqueue_lock held or sched RCU read
+ * locked. If the pwq needs to be used beyond the locking in effect, the
+ * caller is responsible for guaranteeing that the pwq stays online.
+ *
+ * The if/else clause exists only for the lockdep assertion and can be
+ * ignored.
+ */
+#define for_each_pwq(pwq, wq) \
+ list_for_each_entry_rcu((pwq), &(wq)->pwqs, pwqs_node) \
+ if (({ assert_rcu_or_wq_lock(); false; })) { } \
+ else
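A minimal usage sketch (hypothetical caller, not part of this patch): walking the pwqs of a workqueue only needs a sched-RCU read-side section, assuming a valid @wq:

	struct pool_workqueue *pwq;

	rcu_read_lock_sched();
	for_each_pwq(pwq, wq)
		pr_info("%s: pwq %p uses pool %d\n", wq->name, pwq, pwq->pool->id);
	rcu_read_unlock_sched();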
#ifdef CONFIG_DEBUG_OBJECTS_WORK
cpu_std_worker_pools);
static struct worker_pool unbound_std_worker_pools[NR_STD_WORKER_POOLS];
-/* idr of all pools */
-static DEFINE_MUTEX(worker_pool_idr_mutex);
+/*
+ * idr of all pools. Modifications are protected by workqueue_lock. Read
+ * accesses are sched-RCU protected.
+ */
static DEFINE_IDR(worker_pool_idr);
static int worker_thread(void *__worker);
{
int ret;
- mutex_lock(&worker_pool_idr_mutex);
- idr_pre_get(&worker_pool_idr, GFP_KERNEL);
- ret = idr_get_new(&worker_pool_idr, pool, &pool->id);
- mutex_unlock(&worker_pool_idr_mutex);
+ do {
+ if (!idr_pre_get(&worker_pool_idr, GFP_KERNEL))
+ return -ENOMEM;
- return ret;
-}
+ spin_lock_irq(&workqueue_lock);
+ ret = idr_get_new(&worker_pool_idr, pool, &pool->id);
+ spin_unlock_irq(&workqueue_lock);
+ } while (ret == -EAGAIN);
-/*
- * Lookup worker_pool by id. The idr currently is built during boot and
- * never modified. Don't worry about locking for now.
- */
-static struct worker_pool *worker_pool_by_id(int pool_id)
-{
- return idr_find(&worker_pool_idr, pool_id);
+ return ret;
}
static struct worker_pool *get_std_worker_pool(int cpu, bool highpri)
return &pools[highpri];
}
-static struct pool_workqueue *get_pwq(unsigned int cpu,
- struct workqueue_struct *wq)
+/**
+ * first_pwq - return the first pool_workqueue of the specified workqueue
+ * @wq: the target workqueue
+ *
+ * This must be called either with workqueue_lock held or sched RCU read
+ * locked. If the pwq needs to be used beyond the locking in effect, the
+ * caller is responsible for guaranteeing that the pwq stays online.
+ */
+static struct pool_workqueue *first_pwq(struct workqueue_struct *wq)
{
- if (!(wq->flags & WQ_UNBOUND)) {
- if (likely(cpu < nr_cpu_ids))
- return per_cpu_ptr(wq->pool_wq.pcpu, cpu);
- } else if (likely(cpu == WORK_CPU_UNBOUND))
- return wq->pool_wq.single;
- return NULL;
+ assert_rcu_or_wq_lock();
+ return list_first_or_null_rcu(&wq->pwqs, struct pool_workqueue,
+ pwqs_node);
}
static unsigned int work_color_to_flags(int color)
static inline void set_work_data(struct work_struct *work, unsigned long data,
unsigned long flags)
{
- BUG_ON(!work_pending(work));
+ WARN_ON_ONCE(!work_pending(work));
atomic_long_set(&work->data, data | flags | work_static(work));
}
* @work: the work item of interest
*
* Return the worker_pool @work was last associated with. %NULL if none.
+ *
+ * Pools are created and destroyed under workqueue_lock, and allow read
+ * access under sched-RCU read lock. As such, this function should be
+ * called under workqueue_lock or with preemption disabled.
+ *
+ * All fields of the returned pool are accessible as long as the above
+ * mentioned locking is in effect. If the returned pool needs to be used
+ * beyond the critical section, the caller is responsible for ensuring the
+ * returned pool is and stays online.
*/
static struct worker_pool *get_work_pool(struct work_struct *work)
{
unsigned long data = atomic_long_read(&work->data);
- struct worker_pool *pool;
int pool_id;
+ assert_rcu_or_wq_lock();
+
if (data & WORK_STRUCT_PWQ)
return ((struct pool_workqueue *)
(data & WORK_STRUCT_WQ_DATA_MASK))->pool;
if (pool_id == WORK_OFFQ_POOL_NONE)
return NULL;
- pool = worker_pool_by_id(pool_id);
- WARN_ON_ONCE(!pool);
- return pool;
+ return idr_find(&worker_pool_idr, pool_id);
}
/**
/* Do we have too many workers and should some go away? */
static bool too_many_workers(struct worker_pool *pool)
{
- bool managing = pool->flags & POOL_MANAGING_WORKERS;
+ bool managing = mutex_is_locked(&pool->manager_arb);
int nr_idle = pool->nr_idle + managing; /* manager is considered idle */
int nr_busy = pool->nr_workers - nr_idle;
* CONTEXT:
* spin_lock_irq(rq->lock)
*/
-void wq_worker_waking_up(struct task_struct *task, unsigned int cpu)
+void wq_worker_waking_up(struct task_struct *task, int cpu)
{
struct worker *worker = kthread_data(task);
* RETURNS:
* Worker task on @cpu to wake up, %NULL if none.
*/
-struct task_struct *wq_worker_sleeping(struct task_struct *task,
- unsigned int cpu)
+struct task_struct *wq_worker_sleeping(struct task_struct *task, int cpu)
{
struct worker *worker = kthread_data(task), *to_wakeup = NULL;
struct worker_pool *pool;
pool = worker->pool;
/* this can only happen on the local cpu */
- BUG_ON(cpu != raw_smp_processor_id());
+ if (WARN_ON_ONCE(cpu != raw_smp_processor_id()))
+ return NULL;
/*
* The counterpart of the following dec_and_test, implied mb,
struct work_struct *work)
{
struct worker *worker;
- struct hlist_node *tmp;
- hash_for_each_possible(pool->busy_hash, worker, tmp, hentry,
+ hash_for_each_possible(pool->busy_hash, worker, hentry,
(unsigned long)work)
if (worker->current_work == work &&
worker->current_func == work->func)
return worker && worker->current_pwq->wq == wq;
}
-static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
+static void __queue_work(int cpu, struct workqueue_struct *wq,
struct work_struct *work)
{
struct pool_workqueue *pwq;
* work needs to be queued on that cpu to guarantee
* non-reentrancy.
*/
- pwq = get_pwq(cpu, wq);
+ pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
last_pool = get_work_pool(work);
if (last_pool && last_pool != pwq->pool) {
worker = find_worker_executing_work(last_pool, work);
if (worker && worker->current_pwq->wq == wq) {
- pwq = get_pwq(last_pool->cpu, wq);
+ pwq = per_cpu_ptr(wq->cpu_pwqs, last_pool->cpu);
} else {
/* meh... not running there, queue here */
spin_unlock(&last_pool->lock);
spin_lock(&pwq->pool->lock);
}
} else {
- pwq = get_pwq(WORK_CPU_UNBOUND, wq);
+ pwq = first_pwq(wq);
spin_lock(&pwq->pool->lock);
}
{
struct worker_pool *pool = worker->pool;
- BUG_ON(worker->flags & WORKER_IDLE);
- BUG_ON(!list_empty(&worker->entry) &&
- (worker->hentry.next || worker->hentry.pprev));
+ if (WARN_ON_ONCE(worker->flags & WORKER_IDLE) ||
+ WARN_ON_ONCE(!list_empty(&worker->entry) &&
+ (worker->hentry.next || worker->hentry.pprev)))
+ return;
/* can't use worker_set_flags(), also called from start_worker() */
worker->flags |= WORKER_IDLE;
{
struct worker_pool *pool = worker->pool;
- BUG_ON(!(worker->flags & WORKER_IDLE));
+ if (WARN_ON_ONCE(!(worker->flags & WORKER_IDLE)))
+ return;
worker_clr_flags(worker, WORKER_IDLE);
pool->nr_idle--;
list_del_init(&worker->entry);
}
/**
- * worker_maybe_bind_and_lock - bind worker to its cpu if possible and lock pool
- * @worker: self
+ * worker_maybe_bind_and_lock - try to bind %current to worker_pool and lock it
+ * @pool: target worker_pool
+ *
+ * Bind %current to the cpu of @pool if it is associated and lock @pool.
*
* Works which are scheduled while the cpu is online must at least be
* scheduled to a worker which is bound to the cpu so that if they are
* flushed from cpu callbacks while cpu is going down, they are
* guaranteed to execute on the cpu.
*
- * This function is to be used by rogue workers and rescuers to bind
+ * This function is to be used by unbound workers and rescuers to bind
* themselves to the target cpu and may race with cpu going down or
* coming online. kthread_bind() can't be used because it may put the
* worker to already dead cpu and set_cpus_allowed_ptr() can't be used
* %true if the associated pool is online (@worker is successfully
* bound), %false if offline.
*/
-static bool worker_maybe_bind_and_lock(struct worker *worker)
+static bool worker_maybe_bind_and_lock(struct worker_pool *pool)
__acquires(&pool->lock)
{
- struct worker_pool *pool = worker->pool;
- struct task_struct *task = worker->task;
-
while (true) {
/*
* The following call may fail, succeed or succeed
* against POOL_DISASSOCIATED.
*/
if (!(pool->flags & POOL_DISASSOCIATED))
- set_cpus_allowed_ptr(task, get_cpu_mask(pool->cpu));
+ set_cpus_allowed_ptr(current, pool->attrs->cpumask);
spin_lock_irq(&pool->lock);
if (pool->flags & POOL_DISASSOCIATED)
return false;
- if (task_cpu(task) == pool->cpu &&
- cpumask_equal(&current->cpus_allowed,
- get_cpu_mask(pool->cpu)))
+ if (task_cpu(current) == pool->cpu &&
+ cpumask_equal(&current->cpus_allowed, pool->attrs->cpumask))
return true;
spin_unlock_irq(&pool->lock);
static void idle_worker_rebind(struct worker *worker)
{
/* CPU may go down again inbetween, clear UNBOUND only on success */
- if (worker_maybe_bind_and_lock(worker))
+ if (worker_maybe_bind_and_lock(worker->pool))
worker_clr_flags(worker, WORKER_UNBOUND);
/* rebind complete, become available again */
{
struct worker *worker = container_of(work, struct worker, rebind_work);
- if (worker_maybe_bind_and_lock(worker))
+ if (worker_maybe_bind_and_lock(worker->pool))
worker_clr_flags(worker, WORKER_UNBOUND);
spin_unlock_irq(&worker->pool->lock);
static void rebind_workers(struct worker_pool *pool)
{
struct worker *worker, *n;
- struct hlist_node *pos;
int i;
lockdep_assert_held(&pool->assoc_mutex);
}
/* rebind busy workers */
- for_each_busy_worker(worker, i, pos, pool) {
+ for_each_busy_worker(worker, i, pool) {
struct work_struct *rebind_work = &worker->rebind_work;
struct workqueue_struct *wq;
* wq doesn't really matter but let's keep @worker->pool
* and @pwq->pool consistent for sanity.
*/
- if (std_worker_pool_pri(worker->pool))
+ if (worker->pool->attrs->nice < 0)
wq = system_highpri_wq;
else
wq = system_wq;
- insert_work(get_pwq(pool->cpu, wq), rebind_work,
+ insert_work(per_cpu_ptr(wq->cpu_pwqs, pool->cpu), rebind_work,
worker->scheduled.next,
work_color_to_flags(WORK_NO_COLOR));
}
*/
static struct worker *create_worker(struct worker_pool *pool)
{
- const char *pri = std_worker_pool_pri(pool) ? "H" : "";
+ const char *pri = pool->attrs->nice < 0 ? "H" : "";
struct worker *worker = NULL;
int id = -1;
worker->pool = pool;
worker->id = id;
- if (pool->cpu != WORK_CPU_UNBOUND)
+ if (pool->cpu >= 0)
worker->task = kthread_create_on_node(worker_thread,
worker, cpu_to_node(pool->cpu),
- "kworker/%u:%d%s", pool->cpu, id, pri);
+ "kworker/%d:%d%s", pool->cpu, id, pri);
else
worker->task = kthread_create(worker_thread, worker,
"kworker/u:%d%s", id, pri);
if (IS_ERR(worker->task))
goto fail;
- if (std_worker_pool_pri(pool))
- set_user_nice(worker->task, HIGHPRI_NICE_LEVEL);
+ set_user_nice(worker->task, pool->attrs->nice);
+ set_cpus_allowed_ptr(worker->task, pool->attrs->cpumask);
/*
- * Determine CPU binding of the new worker depending on
- * %POOL_DISASSOCIATED. The caller is responsible for ensuring the
- * flag remains stable across this function. See the comments
- * above the flag definition for details.
- *
- * As an unbound worker may later become a regular one if CPU comes
- * online, make sure every worker has %PF_THREAD_BOUND set.
+ * %PF_THREAD_BOUND is used to prevent userland from meddling with
+ * cpumask of workqueue workers. This is an abuse. We need
+ * %PF_NO_SETAFFINITY.
*/
- if (!(pool->flags & POOL_DISASSOCIATED)) {
- kthread_bind(worker->task, pool->cpu);
- } else {
- worker->task->flags |= PF_THREAD_BOUND;
+ worker->task->flags |= PF_THREAD_BOUND;
+
+ /*
+ * The caller is responsible for ensuring %POOL_DISASSOCIATED
+ * remains stable across this function. See the comments above the
+ * flag definition for details.
+ */
+ if (pool->flags & POOL_DISASSOCIATED)
worker->flags |= WORKER_UNBOUND;
- }
return worker;
fail:
int id = worker->id;
/* sanity check frenzy */
- BUG_ON(worker->current_work);
- BUG_ON(!list_empty(&worker->scheduled));
+ if (WARN_ON(worker->current_work) ||
+ WARN_ON(!list_empty(&worker->scheduled)))
+ return;
if (worker->flags & WORKER_STARTED)
pool->nr_workers--;
spin_unlock_irq(&pool->lock);
}
-static bool send_mayday(struct work_struct *work)
+static void send_mayday(struct work_struct *work)
{
struct pool_workqueue *pwq = get_work_pwq(work);
struct workqueue_struct *wq = pwq->wq;
- unsigned int cpu;
+
+ lockdep_assert_held(&workqueue_lock);
if (!(wq->flags & WQ_RESCUER))
- return false;
+ return;
/* mayday mayday mayday */
- cpu = pwq->pool->cpu;
- /* WORK_CPU_UNBOUND can't be set in cpumask, use cpu 0 instead */
- if (cpu == WORK_CPU_UNBOUND)
- cpu = 0;
- if (!mayday_test_and_set_cpu(cpu, wq->mayday_mask))
+ if (list_empty(&pwq->mayday_node)) {
+ list_add_tail(&pwq->mayday_node, &wq->maydays);
wake_up_process(wq->rescuer->task);
- return true;
+ }
}
static void pool_mayday_timeout(unsigned long __pool)
struct worker_pool *pool = (void *)__pool;
struct work_struct *work;
- spin_lock_irq(&pool->lock);
+ spin_lock_irq(&workqueue_lock); /* for wq->maydays */
+ spin_lock(&pool->lock);
if (need_to_create_worker(pool)) {
/*
send_mayday(work);
}
- spin_unlock_irq(&pool->lock);
+ spin_unlock(&pool->lock);
+ spin_unlock_irq(&workqueue_lock);
mod_timer(&pool->mayday_timer, jiffies + MAYDAY_INTERVAL);
}
del_timer_sync(&pool->mayday_timer);
spin_lock_irq(&pool->lock);
start_worker(worker);
- BUG_ON(need_to_create_worker(pool));
+ if (WARN_ON_ONCE(need_to_create_worker(pool)))
+ goto restart;
return true;
}
struct worker_pool *pool = worker->pool;
bool ret = false;
- if (pool->flags & POOL_MANAGING_WORKERS)
+ if (!mutex_trylock(&pool->manager_arb))
return ret;
- pool->flags |= POOL_MANAGING_WORKERS;
-
/*
* To simplify both worker management and CPU hotplug, hold off
* management while hotplug is in progress. CPU hotplug path can't
- * grab %POOL_MANAGING_WORKERS to achieve this because that can
- * lead to idle worker depletion (all become busy thinking someone
- * else is managing) which in turn can result in deadlock under
- * extreme circumstances. Use @pool->assoc_mutex to synchronize
- * manager against CPU hotplug.
+ * grab @pool->manager_arb to achieve this because that can lead to
+ * idle worker depletion (all become busy thinking someone else is
+ * managing) which in turn can result in deadlock under extreme
+ * circumstances. Use @pool->assoc_mutex to synchronize manager
+ * against CPU hotplug.
*
* assoc_mutex would always be free unless CPU hotplug is in
* progress. trylock first without dropping @pool->lock.
* on @pool's current state. Try it and adjust
* %WORKER_UNBOUND accordingly.
*/
- if (worker_maybe_bind_and_lock(worker))
+ if (worker_maybe_bind_and_lock(pool))
worker->flags &= ~WORKER_UNBOUND;
else
worker->flags |= WORKER_UNBOUND;
ret |= maybe_destroy_workers(pool);
ret |= maybe_create_worker(pool);
- pool->flags &= ~POOL_MANAGING_WORKERS;
mutex_unlock(&pool->assoc_mutex);
+ mutex_unlock(&pool->manager_arb);
return ret;
}
* preparing to process a work or actually processing it.
* Make sure nobody diddled with it while I was sleeping.
*/
- BUG_ON(!list_empty(&worker->scheduled));
+ WARN_ON_ONCE(!list_empty(&worker->scheduled));
/*
* When control reaches this point, we're guaranteed to have
struct worker *rescuer = __rescuer;
struct workqueue_struct *wq = rescuer->rescue_wq;
struct list_head *scheduled = &rescuer->scheduled;
- bool is_unbound = wq->flags & WQ_UNBOUND;
- unsigned int cpu;
set_user_nice(current, RESCUER_NICE_LEVEL);
return 0;
}
- /*
- * See whether any cpu is asking for help. Unbounded
- * workqueues use cpu 0 in mayday_mask for CPU_UNBOUND.
- */
- for_each_mayday_cpu(cpu, wq->mayday_mask) {
- unsigned int tcpu = is_unbound ? WORK_CPU_UNBOUND : cpu;
- struct pool_workqueue *pwq = get_pwq(tcpu, wq);
+ /* see whether any pwq is asking for help */
+ spin_lock_irq(&workqueue_lock);
+
+ while (!list_empty(&wq->maydays)) {
+ struct pool_workqueue *pwq = list_first_entry(&wq->maydays,
+ struct pool_workqueue, mayday_node);
struct worker_pool *pool = pwq->pool;
struct work_struct *work, *n;
__set_current_state(TASK_RUNNING);
- mayday_clear_cpu(cpu, wq->mayday_mask);
+ list_del_init(&pwq->mayday_node);
+
+ spin_unlock_irq(&workqueue_lock);
/* migrate to the target cpu if possible */
+ worker_maybe_bind_and_lock(pool);
rescuer->pool = pool;
- worker_maybe_bind_and_lock(rescuer);
/*
* Slurp in all works issued via this workqueue and
* process'em.
*/
- BUG_ON(!list_empty(&rescuer->scheduled));
+ WARN_ON_ONCE(!list_empty(&rescuer->scheduled));
list_for_each_entry_safe(work, n, &pool->worklist, entry)
if (get_work_pwq(work) == pwq)
move_linked_works(work, scheduled, &n);
if (keep_working(pool))
wake_up_worker(pool);
- spin_unlock_irq(&pool->lock);
+ rescuer->pool = NULL;
+ spin_unlock(&pool->lock);
+ spin_lock(&workqueue_lock);
}
+ spin_unlock_irq(&workqueue_lock);
+
/* rescuers should never participate in concurrency management */
WARN_ON_ONCE(!(rescuer->flags & WORKER_NOT_RUNNING));
schedule();
int flush_color, int work_color)
{
bool wait = false;
- unsigned int cpu;
+ struct pool_workqueue *pwq;
if (flush_color >= 0) {
- BUG_ON(atomic_read(&wq->nr_pwqs_to_flush));
+ WARN_ON_ONCE(atomic_read(&wq->nr_pwqs_to_flush));
atomic_set(&wq->nr_pwqs_to_flush, 1);
}
- for_each_pwq_cpu(cpu, wq) {
- struct pool_workqueue *pwq = get_pwq(cpu, wq);
+ local_irq_disable();
+
+ for_each_pwq(pwq, wq) {
struct worker_pool *pool = pwq->pool;
- spin_lock_irq(&pool->lock);
+ spin_lock(&pool->lock);
if (flush_color >= 0) {
- BUG_ON(pwq->flush_color != -1);
+ WARN_ON_ONCE(pwq->flush_color != -1);
if (pwq->nr_in_flight[flush_color]) {
pwq->flush_color = flush_color;
}
if (work_color >= 0) {
- BUG_ON(work_color != work_next_color(pwq->work_color));
+ WARN_ON_ONCE(work_color != work_next_color(pwq->work_color));
pwq->work_color = work_color;
}
- spin_unlock_irq(&pool->lock);
+ spin_unlock(&pool->lock);
}
+ local_irq_enable();
+
if (flush_color >= 0 && atomic_dec_and_test(&wq->nr_pwqs_to_flush))
complete(&wq->first_flusher->done);
* becomes our flush_color and work_color is advanced
* by one.
*/
- BUG_ON(!list_empty(&wq->flusher_overflow));
+ WARN_ON_ONCE(!list_empty(&wq->flusher_overflow));
this_flusher.flush_color = wq->work_color;
wq->work_color = next_color;
if (!wq->first_flusher) {
/* no flush in progress, become the first flusher */
- BUG_ON(wq->flush_color != this_flusher.flush_color);
+ WARN_ON_ONCE(wq->flush_color != this_flusher.flush_color);
wq->first_flusher = &this_flusher;
}
} else {
/* wait in queue */
- BUG_ON(wq->flush_color == this_flusher.flush_color);
+ WARN_ON_ONCE(wq->flush_color == this_flusher.flush_color);
list_add_tail(&this_flusher.list, &wq->flusher_queue);
flush_workqueue_prep_pwqs(wq, -1, wq->work_color);
}
wq->first_flusher = NULL;
- BUG_ON(!list_empty(&this_flusher.list));
- BUG_ON(wq->flush_color != this_flusher.flush_color);
+ WARN_ON_ONCE(!list_empty(&this_flusher.list));
+ WARN_ON_ONCE(wq->flush_color != this_flusher.flush_color);
while (true) {
struct wq_flusher *next, *tmp;
complete(&next->done);
}
- BUG_ON(!list_empty(&wq->flusher_overflow) &&
- wq->flush_color != work_next_color(wq->work_color));
+ WARN_ON_ONCE(!list_empty(&wq->flusher_overflow) &&
+ wq->flush_color != work_next_color(wq->work_color));
/* this flush_color is finished, advance by one */
wq->flush_color = work_next_color(wq->flush_color);
}
if (list_empty(&wq->flusher_queue)) {
- BUG_ON(wq->flush_color != wq->work_color);
+ WARN_ON_ONCE(wq->flush_color != wq->work_color);
break;
}
* Need to flush more colors. Make the next flusher
* the new first flusher and arm pwqs.
*/
- BUG_ON(wq->flush_color == wq->work_color);
- BUG_ON(wq->flush_color != next->flush_color);
+ WARN_ON_ONCE(wq->flush_color == wq->work_color);
+ WARN_ON_ONCE(wq->flush_color != next->flush_color);
list_del_init(&next->list);
wq->first_flusher = next;
void drain_workqueue(struct workqueue_struct *wq)
{
unsigned int flush_cnt = 0;
- unsigned int cpu;
+ struct pool_workqueue *pwq;
/*
* __queue_work() needs to test whether there are drainers, is much
* hotter than drain_workqueue() and already looks at @wq->flags.
* Use WQ_DRAINING so that queue doesn't have to check nr_drainers.
*/
- spin_lock(&workqueue_lock);
+ spin_lock_irq(&workqueue_lock);
if (!wq->nr_drainers++)
wq->flags |= WQ_DRAINING;
- spin_unlock(&workqueue_lock);
+ spin_unlock_irq(&workqueue_lock);
reflush:
flush_workqueue(wq);
- for_each_pwq_cpu(cpu, wq) {
- struct pool_workqueue *pwq = get_pwq(cpu, wq);
+ local_irq_disable();
+
+ for_each_pwq(pwq, wq) {
bool drained;
- spin_lock_irq(&pwq->pool->lock);
+ spin_lock(&pwq->pool->lock);
drained = !pwq->nr_active && list_empty(&pwq->delayed_works);
- spin_unlock_irq(&pwq->pool->lock);
+ spin_unlock(&pwq->pool->lock);
if (drained)
continue;
(flush_cnt % 100 == 0 && flush_cnt <= 1000))
pr_warn("workqueue %s: flush on destruction isn't complete after %u tries\n",
wq->name, flush_cnt);
+
+ local_irq_enable();
goto reflush;
}
if (!--wq->nr_drainers)
wq->flags &= ~WQ_DRAINING;
spin_unlock(&workqueue_lock);
+
+ local_irq_enable();
}
EXPORT_SYMBOL_GPL(drain_workqueue);
struct pool_workqueue *pwq;
might_sleep();
+
+ local_irq_disable();
pool = get_work_pool(work);
- if (!pool)
+ if (!pool) {
+ local_irq_enable();
return false;
+ }
- spin_lock_irq(&pool->lock);
+ spin_lock(&pool->lock);
/* see the comment in try_to_grab_pending() with the same code */
pwq = get_work_pwq(work);
if (pwq) {
return system_wq != NULL;
}
-static int alloc_pwqs(struct workqueue_struct *wq)
+/**
+ * free_workqueue_attrs - free a workqueue_attrs
+ * @attrs: workqueue_attrs to free
+ *
+ * Undo alloc_workqueue_attrs().
+ */
+void free_workqueue_attrs(struct workqueue_attrs *attrs)
{
- /*
- * pwqs are forced aligned according to WORK_STRUCT_FLAG_BITS.
- * Make sure that the alignment isn't lower than that of
- * unsigned long long.
- */
- const size_t size = sizeof(struct pool_workqueue);
- const size_t align = max_t(size_t, 1 << WORK_STRUCT_FLAG_BITS,
- __alignof__(unsigned long long));
+ if (attrs) {
+ free_cpumask_var(attrs->cpumask);
+ kfree(attrs);
+ }
+}
- if (!(wq->flags & WQ_UNBOUND))
- wq->pool_wq.pcpu = __alloc_percpu(size, align);
- else {
- void *ptr;
+/**
+ * alloc_workqueue_attrs - allocate a workqueue_attrs
+ * @gfp_mask: allocation mask to use
+ *
+ * Allocate a new workqueue_attrs, initialize with default settings and
+ * return it. Returns NULL on failure.
+ */
+struct workqueue_attrs *alloc_workqueue_attrs(gfp_t gfp_mask)
+{
+ struct workqueue_attrs *attrs;
- /*
- * Allocate enough room to align pwq and put an extra
- * pointer at the end pointing back to the originally
- * allocated pointer which will be used for free.
- */
- ptr = kzalloc(size + align + sizeof(void *), GFP_KERNEL);
- if (ptr) {
- wq->pool_wq.single = PTR_ALIGN(ptr, align);
- *(void **)(wq->pool_wq.single + 1) = ptr;
+ attrs = kzalloc(sizeof(*attrs), gfp_mask);
+ if (!attrs)
+ goto fail;
+ if (!alloc_cpumask_var(&attrs->cpumask, gfp_mask))
+ goto fail;
+
+ cpumask_setall(attrs->cpumask);
+ return attrs;
+fail:
+ free_workqueue_attrs(attrs);
+ return NULL;
+}
+
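A minimal usage sketch of the new attrs API (hypothetical caller, not part of this patch), assuming the error is propagated as -ENOMEM:

	struct workqueue_attrs *attrs;

	attrs = alloc_workqueue_attrs(GFP_KERNEL);
	if (!attrs)
		return -ENOMEM;

	attrs->nice = HIGHPRI_NICE_LEVEL;		/* e.g. high priority */
	cpumask_copy(attrs->cpumask, cpumask_of(0));	/* e.g. restrict to CPU 0 */

	/* ... e.g. hand to get_unbound_pool() ... */

	free_workqueue_attrs(attrs);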
+static void copy_workqueue_attrs(struct workqueue_attrs *to,
+ const struct workqueue_attrs *from)
+{
+ to->nice = from->nice;
+ cpumask_copy(to->cpumask, from->cpumask);
+}
+
+/*
+ * Hacky implementation of jhash of bitmaps which only considers the
+ * specified number of bits. We probably want a proper implementation in
+ * include/linux/jhash.h.
+ */
+static u32 jhash_bitmap(const unsigned long *bitmap, int bits, u32 hash)
+{
+ int nr_longs = bits / BITS_PER_LONG;
+ int nr_leftover = bits % BITS_PER_LONG;
+ unsigned long leftover = 0;
+
+ if (nr_longs)
+ hash = jhash(bitmap, nr_longs * sizeof(long), hash);
+ if (nr_leftover) {
+ bitmap_copy(&leftover, bitmap + nr_longs, nr_leftover);
+ hash = jhash(&leftover, sizeof(long), hash);
+ }
+ return hash;
+}
+
+/* hash value of the content of @attr */
+static u32 wqattrs_hash(const struct workqueue_attrs *attrs)
+{
+ u32 hash = 0;
+
+ hash = jhash_1word(attrs->nice, hash);
+ hash = jhash_bitmap(cpumask_bits(attrs->cpumask), nr_cpu_ids, hash);
+ return hash;
+}
+
+/* content equality test */
+static bool wqattrs_equal(const struct workqueue_attrs *a,
+ const struct workqueue_attrs *b)
+{
+ if (a->nice != b->nice)
+ return false;
+ if (!cpumask_equal(a->cpumask, b->cpumask))
+ return false;
+ return true;
+}
+
+/**
+ * init_worker_pool - initialize a newly zalloc'd worker_pool
+ * @pool: worker_pool to initialize
+ *
+ * Initialize a newly zalloc'd @pool. It also allocates @pool->attrs.
+ * Returns 0 on success, -errno on failure. Even on failure, all fields
+ * inside @pool proper are initialized and put_unbound_pool() can be called
+ * on @pool safely to release it.
+ */
+static int init_worker_pool(struct worker_pool *pool)
+{
+ spin_lock_init(&pool->lock);
+ pool->id = -1;
+ pool->cpu = -1;
+ pool->flags |= POOL_DISASSOCIATED;
+ INIT_LIST_HEAD(&pool->worklist);
+ INIT_LIST_HEAD(&pool->idle_list);
+ hash_init(pool->busy_hash);
+
+ init_timer_deferrable(&pool->idle_timer);
+ pool->idle_timer.function = idle_worker_timeout;
+ pool->idle_timer.data = (unsigned long)pool;
+
+ setup_timer(&pool->mayday_timer, pool_mayday_timeout,
+ (unsigned long)pool);
+
+ mutex_init(&pool->manager_arb);
+ mutex_init(&pool->assoc_mutex);
+ ida_init(&pool->worker_ida);
+
+ INIT_HLIST_NODE(&pool->hash_node);
+ pool->refcnt = 1;
+
+ /* shouldn't fail above this point */
+ pool->attrs = alloc_workqueue_attrs(GFP_KERNEL);
+ if (!pool->attrs)
+ return -ENOMEM;
+ return 0;
+}
+
+static void rcu_free_pool(struct rcu_head *rcu)
+{
+ struct worker_pool *pool = container_of(rcu, struct worker_pool, rcu);
+
+ ida_destroy(&pool->worker_ida);
+ free_workqueue_attrs(pool->attrs);
+ kfree(pool);
+}
+
+/**
+ * put_unbound_pool - put a worker_pool
+ * @pool: worker_pool to put
+ *
+ * Put @pool. If its refcnt reaches zero, it gets destroyed in sched-RCU
+ * safe manner.
+ */
+static void put_unbound_pool(struct worker_pool *pool)
+{
+ struct worker *worker;
+
+ spin_lock_irq(&workqueue_lock);
+ if (--pool->refcnt) {
+ spin_unlock_irq(&workqueue_lock);
+ return;
+ }
+
+ /* sanity checks */
+ if (WARN_ON(!(pool->flags & POOL_DISASSOCIATED)) ||
+ WARN_ON(!list_empty(&pool->worklist))) {
+ spin_unlock_irq(&workqueue_lock);
+ return;
+ }
+
+ /* release id and unhash */
+ if (pool->id >= 0)
+ idr_remove(&worker_pool_idr, pool->id);
+ hash_del(&pool->hash_node);
+
+ spin_unlock_irq(&workqueue_lock);
+
+ /* lock out manager and destroy all workers */
+ mutex_lock(&pool->manager_arb);
+ spin_lock_irq(&pool->lock);
+
+ while ((worker = first_worker(pool)))
+ destroy_worker(worker);
+ WARN_ON(pool->nr_workers || pool->nr_idle);
+
+ spin_unlock_irq(&pool->lock);
+ mutex_unlock(&pool->manager_arb);
+
+ /* shut down the timers */
+ del_timer_sync(&pool->idle_timer);
+ del_timer_sync(&pool->mayday_timer);
+
+ /* sched-RCU protected to allow dereferences from get_work_pool() */
+ call_rcu_sched(&pool->rcu, rcu_free_pool);
+}
+
+/**
+ * get_unbound_pool - get a worker_pool with the specified attributes
+ * @attrs: the attributes of the worker_pool to get
+ *
+ * Obtain a worker_pool which has the same attributes as @attrs, bump the
+ * reference count and return it. If there already is a matching
+ * worker_pool, it will be used; otherwise, this function attempts to
+ * create a new one. On failure, returns NULL.
+ */
+static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
+{
+ static DEFINE_MUTEX(create_mutex);
+ u32 hash = wqattrs_hash(attrs);
+ struct worker_pool *pool;
+ struct worker *worker;
+
+ mutex_lock(&create_mutex);
+
+ /* do we already have a matching pool? */
+ spin_lock_irq(&workqueue_lock);
+ hash_for_each_possible(unbound_pool_hash, pool, hash_node, hash) {
+ if (wqattrs_equal(pool->attrs, attrs)) {
+ pool->refcnt++;
+ goto out_unlock;
}
}
+ spin_unlock_irq(&workqueue_lock);
+
+ /* nope, create a new one */
+ pool = kzalloc(sizeof(*pool), GFP_KERNEL);
+ if (!pool || init_worker_pool(pool) < 0)
+ goto fail;
- /* just in case, make sure it's actually aligned */
- BUG_ON(!IS_ALIGNED(wq->pool_wq.v, align));
- return wq->pool_wq.v ? 0 : -ENOMEM;
+ copy_workqueue_attrs(pool->attrs, attrs);
+
+ if (worker_pool_assign_id(pool) < 0)
+ goto fail;
+
+ /* create and start the initial worker */
+ worker = create_worker(pool);
+ if (!worker)
+ goto fail;
+
+ spin_lock_irq(&pool->lock);
+ start_worker(worker);
+ spin_unlock_irq(&pool->lock);
+
+ /* install */
+ spin_lock_irq(&workqueue_lock);
+ hash_add(unbound_pool_hash, &pool->hash_node, hash);
+out_unlock:
+ spin_unlock_irq(&workqueue_lock);
+ mutex_unlock(&create_mutex);
+ return pool;
+fail:
+ mutex_unlock(&create_mutex);
+ if (pool)
+ put_unbound_pool(pool);
+ return NULL;
+}
+
+static int alloc_and_link_pwqs(struct workqueue_struct *wq)
+{
+ bool highpri = wq->flags & WQ_HIGHPRI;
+ int cpu;
+
+ if (!(wq->flags & WQ_UNBOUND)) {
+ wq->cpu_pwqs = alloc_percpu(struct pool_workqueue);
+ if (!wq->cpu_pwqs)
+ return -ENOMEM;
+
+ for_each_possible_cpu(cpu) {
+ struct pool_workqueue *pwq =
+ per_cpu_ptr(wq->cpu_pwqs, cpu);
+
+ pwq->pool = get_std_worker_pool(cpu, highpri);
+ list_add_tail_rcu(&pwq->pwqs_node, &wq->pwqs);
+ }
+ } else {
+ struct pool_workqueue *pwq;
+
+ pwq = kmem_cache_zalloc(pwq_cache, GFP_KERNEL);
+ if (!pwq)
+ return -ENOMEM;
+
+ pwq->pool = get_unbound_pool(unbound_std_wq_attrs[highpri]);
+ if (!pwq->pool) {
+ kmem_cache_free(pwq_cache, pwq);
+ return -ENOMEM;
+ }
+
+ list_add_tail_rcu(&pwq->pwqs_node, &wq->pwqs);
+ }
+
+ return 0;
}
static void free_pwqs(struct workqueue_struct *wq)
{
if (!(wq->flags & WQ_UNBOUND))
- free_percpu(wq->pool_wq.pcpu);
- else if (wq->pool_wq.single) {
- /* the pointer to free is stored right after the pwq */
- kfree(*(void **)(wq->pool_wq.single + 1));
- }
+ free_percpu(wq->cpu_pwqs);
+ else if (!list_empty(&wq->pwqs))
+ kmem_cache_free(pwq_cache, list_first_entry(&wq->pwqs,
+ struct pool_workqueue, pwqs_node));
}
static int wq_clamp_max_active(int max_active, unsigned int flags,
{
va_list args, args1;
struct workqueue_struct *wq;
- unsigned int cpu;
+ struct pool_workqueue *pwq;
size_t namelen;
/* determine namelen, allocate wq and format name */
wq->saved_max_active = max_active;
mutex_init(&wq->flush_mutex);
atomic_set(&wq->nr_pwqs_to_flush, 0);
+ INIT_LIST_HEAD(&wq->pwqs);
INIT_LIST_HEAD(&wq->flusher_queue);
INIT_LIST_HEAD(&wq->flusher_overflow);
+ INIT_LIST_HEAD(&wq->maydays);
lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
INIT_LIST_HEAD(&wq->list);
- if (alloc_pwqs(wq) < 0)
+ if (alloc_and_link_pwqs(wq) < 0)
goto err;
- for_each_pwq_cpu(cpu, wq) {
- struct pool_workqueue *pwq = get_pwq(cpu, wq);
-
+ local_irq_disable();
+ for_each_pwq(pwq, wq) {
BUG_ON((unsigned long)pwq & WORK_STRUCT_FLAG_MASK);
- pwq->pool = get_std_worker_pool(cpu, flags & WQ_HIGHPRI);
pwq->wq = wq;
pwq->flush_color = -1;
pwq->max_active = max_active;
INIT_LIST_HEAD(&pwq->delayed_works);
+ INIT_LIST_HEAD(&pwq->mayday_node);
}
+ local_irq_enable();
if (flags & WQ_RESCUER) {
struct worker *rescuer;
- if (!alloc_mayday_mask(&wq->mayday_mask, GFP_KERNEL))
- goto err;
-
wq->rescuer = rescuer = alloc_worker();
if (!rescuer)
goto err;
* list. Grab it, set max_active accordingly and add the new
* workqueue to workqueues list.
*/
- spin_lock(&workqueue_lock);
+ spin_lock_irq(&workqueue_lock);
if (workqueue_freezing && wq->flags & WQ_FREEZABLE)
- for_each_pwq_cpu(cpu, wq)
- get_pwq(cpu, wq)->max_active = 0;
+ for_each_pwq(pwq, wq)
+ pwq->max_active = 0;
list_add(&wq->list, &workqueues);
- spin_unlock(&workqueue_lock);
+ spin_unlock_irq(&workqueue_lock);
return wq;
err:
if (wq) {
free_pwqs(wq);
- free_mayday_mask(wq->mayday_mask);
kfree(wq->rescuer);
kfree(wq);
}
*/
void destroy_workqueue(struct workqueue_struct *wq)
{
- unsigned int cpu;
+ struct pool_workqueue *pwq;
/* drain it before proceeding with destruction */
drain_workqueue(wq);
+ spin_lock_irq(&workqueue_lock);
+
+ /* sanity checks */
+ for_each_pwq(pwq, wq) {
+ int i;
+
+ for (i = 0; i < WORK_NR_COLORS; i++) {
+ if (WARN_ON(pwq->nr_in_flight[i])) {
+ spin_unlock_irq(&workqueue_lock);
+ return;
+ }
+ }
+
+ if (WARN_ON(pwq->nr_active) ||
+ WARN_ON(!list_empty(&pwq->delayed_works))) {
+ spin_unlock_irq(&workqueue_lock);
+ return;
+ }
+ }
+
/*
* wq list is used to freeze wq, remove from list after
* flushing is complete in case freeze races us.
*/
- spin_lock(&workqueue_lock);
list_del(&wq->list);
- spin_unlock(&workqueue_lock);
- /* sanity check */
- for_each_pwq_cpu(cpu, wq) {
- struct pool_workqueue *pwq = get_pwq(cpu, wq);
- int i;
-
- for (i = 0; i < WORK_NR_COLORS; i++)
- BUG_ON(pwq->nr_in_flight[i]);
- BUG_ON(pwq->nr_active);
- BUG_ON(!list_empty(&pwq->delayed_works));
- }
+ spin_unlock_irq(&workqueue_lock);
if (wq->flags & WQ_RESCUER) {
kthread_stop(wq->rescuer->task);
- free_mayday_mask(wq->mayday_mask);
kfree(wq->rescuer);
}
+ /*
+ * We're the sole accessor of @wq at this point. Directly access
+ * the first pwq and put its pool.
+ */
+ if (wq->flags & WQ_UNBOUND) {
+ pwq = list_first_entry(&wq->pwqs, struct pool_workqueue,
+ pwqs_node);
+ put_unbound_pool(pwq->pool);
+ }
free_pwqs(wq);
kfree(wq);
}
*/
void workqueue_set_max_active(struct workqueue_struct *wq, int max_active)
{
- unsigned int cpu;
+ struct pool_workqueue *pwq;
max_active = wq_clamp_max_active(max_active, wq->flags, wq->name);
- spin_lock(&workqueue_lock);
+ spin_lock_irq(&workqueue_lock);
wq->saved_max_active = max_active;
- for_each_pwq_cpu(cpu, wq) {
- struct pool_workqueue *pwq = get_pwq(cpu, wq);
+ for_each_pwq(pwq, wq) {
struct worker_pool *pool = pwq->pool;
- spin_lock_irq(&pool->lock);
+ spin_lock(&pool->lock);
if (!(wq->flags & WQ_FREEZABLE) ||
!(pool->flags & POOL_FREEZING))
pwq_set_max_active(pwq, max_active);
- spin_unlock_irq(&pool->lock);
+ spin_unlock(&pool->lock);
}
- spin_unlock(&workqueue_lock);
+ spin_unlock_irq(&workqueue_lock);
}
EXPORT_SYMBOL_GPL(workqueue_set_max_active);
* RETURNS:
* %true if congested, %false otherwise.
*/
-bool workqueue_congested(unsigned int cpu, struct workqueue_struct *wq)
+bool workqueue_congested(int cpu, struct workqueue_struct *wq)
{
- struct pool_workqueue *pwq = get_pwq(cpu, wq);
+ struct pool_workqueue *pwq;
+ bool ret;
+
+ preempt_disable();
- return !list_empty(&pwq->delayed_works);
+ if (!(wq->flags & WQ_UNBOUND))
+ pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
+ else
+ pwq = first_pwq(wq);
+
+ ret = !list_empty(&pwq->delayed_works);
+ preempt_enable();
+
+ return ret;
}
EXPORT_SYMBOL_GPL(workqueue_congested);
*/
unsigned int work_busy(struct work_struct *work)
{
- struct worker_pool *pool = get_work_pool(work);
+ struct worker_pool *pool;
unsigned long flags;
unsigned int ret = 0;
if (work_pending(work))
ret |= WORK_BUSY_PENDING;
+ local_irq_save(flags);
+ pool = get_work_pool(work);
if (pool) {
- spin_lock_irqsave(&pool->lock, flags);
+ spin_lock(&pool->lock);
if (find_worker_executing_work(pool, work))
ret |= WORK_BUSY_RUNNING;
- spin_unlock_irqrestore(&pool->lock, flags);
+ spin_unlock(&pool->lock);
}
+ local_irq_restore(flags);
return ret;
}
int cpu = smp_processor_id();
struct worker_pool *pool;
struct worker *worker;
- struct hlist_node *pos;
int i;
for_each_std_worker_pool(pool, cpu) {
- BUG_ON(cpu != smp_processor_id());
+ WARN_ON_ONCE(cpu != smp_processor_id());
mutex_lock(&pool->assoc_mutex);
spin_lock_irq(&pool->lock);
list_for_each_entry(worker, &pool->idle_list, entry)
worker->flags |= WORKER_UNBOUND;
- for_each_busy_worker(worker, i, pos, pool)
+ for_each_busy_worker(worker, i, pool)
worker->flags |= WORKER_UNBOUND;
pool->flags |= POOL_DISASSOCIATED;
unsigned long action,
void *hcpu)
{
- unsigned int cpu = (unsigned long)hcpu;
+ int cpu = (unsigned long)hcpu;
struct worker_pool *pool;
switch (action & ~CPU_TASKS_FROZEN) {
unsigned long action,
void *hcpu)
{
- unsigned int cpu = (unsigned long)hcpu;
+ int cpu = (unsigned long)hcpu;
struct work_struct unbind_work;
switch (action & ~CPU_TASKS_FROZEN) {
* It is up to the caller to ensure that the cpu doesn't go offline.
* The caller must not hold any locks which would prevent @fn from completing.
*/
-long work_on_cpu(unsigned int cpu, long (*fn)(void *), void *arg)
+long work_on_cpu(int cpu, long (*fn)(void *), void *arg)
{
struct work_for_cpu wfc = { .fn = fn, .arg = arg };
*/
void freeze_workqueues_begin(void)
{
- unsigned int cpu;
+ struct worker_pool *pool;
+ struct workqueue_struct *wq;
+ struct pool_workqueue *pwq;
+ int id;
- spin_lock(&workqueue_lock);
+ spin_lock_irq(&workqueue_lock);
- BUG_ON(workqueue_freezing);
+ WARN_ON_ONCE(workqueue_freezing);
workqueue_freezing = true;
- for_each_wq_cpu(cpu) {
- struct worker_pool *pool;
- struct workqueue_struct *wq;
-
- for_each_std_worker_pool(pool, cpu) {
- spin_lock_irq(&pool->lock);
-
- WARN_ON_ONCE(pool->flags & POOL_FREEZING);
- pool->flags |= POOL_FREEZING;
-
- list_for_each_entry(wq, &workqueues, list) {
- struct pool_workqueue *pwq = get_pwq(cpu, wq);
+ /* set FREEZING */
+ for_each_pool(pool, id) {
+ spin_lock(&pool->lock);
+ WARN_ON_ONCE(pool->flags & POOL_FREEZING);
+ pool->flags |= POOL_FREEZING;
+ spin_unlock(&pool->lock);
+ }
- if (pwq && pwq->pool == pool &&
- (wq->flags & WQ_FREEZABLE))
- pwq->max_active = 0;
- }
+ /* suppress further executions by setting max_active to zero */
+ list_for_each_entry(wq, &workqueues, list) {
+ if (!(wq->flags & WQ_FREEZABLE))
+ continue;
- spin_unlock_irq(&pool->lock);
+ for_each_pwq(pwq, wq) {
+ spin_lock(&pwq->pool->lock);
+ pwq->max_active = 0;
+ spin_unlock(&pwq->pool->lock);
}
}
- spin_unlock(&workqueue_lock);
+ spin_unlock_irq(&workqueue_lock);
}
/**
*/
bool freeze_workqueues_busy(void)
{
- unsigned int cpu;
bool busy = false;
+ struct workqueue_struct *wq;
+ struct pool_workqueue *pwq;
- spin_lock(&workqueue_lock);
+ spin_lock_irq(&workqueue_lock);
- BUG_ON(!workqueue_freezing);
+ WARN_ON_ONCE(!workqueue_freezing);
- for_each_wq_cpu(cpu) {
- struct workqueue_struct *wq;
+ list_for_each_entry(wq, &workqueues, list) {
+ if (!(wq->flags & WQ_FREEZABLE))
+ continue;
/*
* nr_active is monotonically decreasing. It's safe
* to peek without lock.
*/
- list_for_each_entry(wq, &workqueues, list) {
- struct pool_workqueue *pwq = get_pwq(cpu, wq);
-
- if (!pwq || !(wq->flags & WQ_FREEZABLE))
- continue;
-
- BUG_ON(pwq->nr_active < 0);
+ for_each_pwq(pwq, wq) {
+ WARN_ON_ONCE(pwq->nr_active < 0);
if (pwq->nr_active) {
busy = true;
goto out_unlock;
}
}
out_unlock:
- spin_unlock(&workqueue_lock);
+ spin_unlock_irq(&workqueue_lock);
return busy;
}
*/
void thaw_workqueues(void)
{
- unsigned int cpu;
+ struct workqueue_struct *wq;
+ struct pool_workqueue *pwq;
+ struct worker_pool *pool;
+ int id;
- spin_lock(&workqueue_lock);
+ spin_lock_irq(&workqueue_lock);
if (!workqueue_freezing)
goto out_unlock;
- for_each_wq_cpu(cpu) {
- struct worker_pool *pool;
- struct workqueue_struct *wq;
-
- for_each_std_worker_pool(pool, cpu) {
- spin_lock_irq(&pool->lock);
-
- WARN_ON_ONCE(!(pool->flags & POOL_FREEZING));
- pool->flags &= ~POOL_FREEZING;
-
- list_for_each_entry(wq, &workqueues, list) {
- struct pool_workqueue *pwq = get_pwq(cpu, wq);
-
- if (!pwq || pwq->pool != pool ||
- !(wq->flags & WQ_FREEZABLE))
- continue;
-
- /* restore max_active and repopulate worklist */
- pwq_set_max_active(pwq, wq->saved_max_active);
- }
+ /* clear FREEZING */
+ for_each_pool(pool, id) {
+ spin_lock(&pool->lock);
+ WARN_ON_ONCE(!(pool->flags & POOL_FREEZING));
+ pool->flags &= ~POOL_FREEZING;
+ spin_unlock(&pool->lock);
+ }
- wake_up_worker(pool);
+ /* restore max_active and repopulate worklist */
+ list_for_each_entry(wq, &workqueues, list) {
+ if (!(wq->flags & WQ_FREEZABLE))
+ continue;
- spin_unlock_irq(&pool->lock);
+ for_each_pwq(pwq, wq) {
+ spin_lock(&pwq->pool->lock);
+ pwq_set_max_active(pwq, wq->saved_max_active);
+ spin_unlock(&pwq->pool->lock);
}
}
+ /* kick workers */
+ for_each_pool(pool, id) {
+ spin_lock(&pool->lock);
+ wake_up_worker(pool);
+ spin_unlock(&pool->lock);
+ }
+
workqueue_freezing = false;
out_unlock:
- spin_unlock(&workqueue_lock);
+ spin_unlock_irq(&workqueue_lock);
}
#endif /* CONFIG_FREEZER */
static int __init init_workqueues(void)
{
- unsigned int cpu;
+ int std_nice[NR_STD_WORKER_POOLS] = { 0, HIGHPRI_NICE_LEVEL };
+ int i, cpu;
/* make sure we have enough bits for OFFQ pool ID */
BUILD_BUG_ON((1LU << (BITS_PER_LONG - WORK_OFFQ_POOL_SHIFT)) <
WORK_CPU_END * NR_STD_WORKER_POOLS);
+ WARN_ON(__alignof__(struct pool_workqueue) < __alignof__(long long));
+
+ pwq_cache = KMEM_CACHE(pool_workqueue, SLAB_PANIC);
+
cpu_notifier(workqueue_cpu_up_callback, CPU_PRI_WORKQUEUE_UP);
hotcpu_notifier(workqueue_cpu_down_callback, CPU_PRI_WORKQUEUE_DOWN);
/* initialize CPU pools */
- for_each_wq_cpu(cpu) {
+ for_each_possible_cpu(cpu) {
struct worker_pool *pool;
+ i = 0;
for_each_std_worker_pool(pool, cpu) {
- spin_lock_init(&pool->lock);
+ BUG_ON(init_worker_pool(pool));
pool->cpu = cpu;
- pool->flags |= POOL_DISASSOCIATED;
- INIT_LIST_HEAD(&pool->worklist);
- INIT_LIST_HEAD(&pool->idle_list);
- hash_init(pool->busy_hash);
-
- init_timer_deferrable(&pool->idle_timer);
- pool->idle_timer.function = idle_worker_timeout;
- pool->idle_timer.data = (unsigned long)pool;
-
- setup_timer(&pool->mayday_timer, pool_mayday_timeout,
- (unsigned long)pool);
-
- mutex_init(&pool->assoc_mutex);
- ida_init(&pool->worker_ida);
+ cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
+ pool->attrs->nice = std_nice[i++];
/* alloc pool ID */
BUG_ON(worker_pool_assign_id(pool));
}
/* create the initial worker */
- for_each_online_wq_cpu(cpu) {
+ for_each_online_cpu(cpu) {
struct worker_pool *pool;
for_each_std_worker_pool(pool, cpu) {
struct worker *worker;
- if (cpu != WORK_CPU_UNBOUND)
- pool->flags &= ~POOL_DISASSOCIATED;
+ pool->flags &= ~POOL_DISASSOCIATED;
worker = create_worker(pool);
BUG_ON(!worker);
}
}
+ /* create default unbound wq attrs */
+ for (i = 0; i < NR_STD_WORKER_POOLS; i++) {
+ struct workqueue_attrs *attrs;
+
+ BUG_ON(!(attrs = alloc_workqueue_attrs(GFP_KERNEL)));
+
+ attrs->nice = std_nice[i];
+ cpumask_setall(attrs->cpumask);
+
+ unbound_std_wq_attrs[i] = attrs;
+ }
+
system_wq = alloc_workqueue("events", 0, 0);
system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0);
system_long_wq = alloc_workqueue("events_long", 0, 0);