Diffstat (limited to 'fs/io-wq.c')
-rw-r--r--  fs/io-wq.c  275
1 file changed, 184 insertions, 91 deletions
diff --git a/fs/io-wq.c b/fs/io-wq.c
index cf086b01c6c6..cd9bd095fb1b 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -51,6 +51,10 @@ struct io_worker {
struct completion ref_done;
+ unsigned long create_state;
+ struct callback_head create_work;
+ int create_index;
+
struct rcu_head rcu;
};
@@ -129,7 +133,8 @@ struct io_cb_cancel_data {
bool cancel_all;
};
-static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
+static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index, bool first);
+static void io_wqe_dec_running(struct io_worker *worker);
static bool io_worker_get(struct io_worker *worker)
{
@@ -168,27 +173,22 @@ static void io_worker_exit(struct io_worker *worker)
{
struct io_wqe *wqe = worker->wqe;
struct io_wqe_acct *acct = io_wqe_get_acct(worker);
- unsigned flags;
if (refcount_dec_and_test(&worker->ref))
complete(&worker->ref_done);
wait_for_completion(&worker->ref_done);
- preempt_disable();
- current->flags &= ~PF_IO_WORKER;
- flags = worker->flags;
- worker->flags = 0;
- if (flags & IO_WORKER_F_RUNNING)
- atomic_dec(&acct->nr_running);
- worker->flags = 0;
- preempt_enable();
-
- raw_spin_lock_irq(&wqe->lock);
- if (flags & IO_WORKER_F_FREE)
+ raw_spin_lock(&wqe->lock);
+ if (worker->flags & IO_WORKER_F_FREE)
hlist_nulls_del_rcu(&worker->nulls_node);
list_del_rcu(&worker->all_list);
acct->nr_workers--;
- raw_spin_unlock_irq(&wqe->lock);
+ preempt_disable();
+ io_wqe_dec_running(worker);
+ worker->flags = 0;
+ current->flags &= ~PF_IO_WORKER;
+ preempt_enable();
+ raw_spin_unlock(&wqe->lock);
kfree_rcu(worker, rcu);
io_worker_ref_put(wqe->wq);
@@ -214,15 +214,19 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
struct hlist_nulls_node *n;
struct io_worker *worker;
- n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list));
- if (is_a_nulls(n))
- return false;
-
- worker = hlist_nulls_entry(n, struct io_worker, nulls_node);
- if (io_worker_get(worker)) {
- wake_up_process(worker->task);
+ /*
+ * Iterate free_list and see if we can find an idle worker to
+ * activate. If a given worker is on the free_list but in the process
+ * of exiting, keep trying.
+ */
+ hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) {
+ if (!io_worker_get(worker))
+ continue;
+ if (wake_up_process(worker->task)) {
+ io_worker_release(worker);
+ return true;
+ }
io_worker_release(worker);
- return true;
}
return false;
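
The comment in the hunk above describes the new activation scan: take a reference on a free worker, skip it if the reference cannot be taken (it is exiting), and stop at the first worker that wake_up_process() actually wakes. Below is a minimal user-space analogue of that take-ref/skip/release loop using C11 atomics; all names (fake_worker, try_get_worker, and so on) are invented for illustration and this is not the kernel code.

    /* Illustrative user-space analogue of the free-list scan above: take a
     * reference before touching a worker, skip workers that are already
     * exiting (refcount hit zero), and stop at the first one we can wake. */
    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stdio.h>

    struct fake_worker {
        atomic_int ref;     /* 0 means the worker is exiting */
        bool sleeping;      /* stands in for wake_up_process() success */
    };

    static bool try_get_worker(struct fake_worker *w)
    {
        int old = atomic_load(&w->ref);

        /* like refcount_inc_not_zero(): only take a ref if still live */
        while (old != 0) {
            if (atomic_compare_exchange_weak(&w->ref, &old, old + 1))
                return true;
        }
        return false;
    }

    static void put_worker(struct fake_worker *w)
    {
        atomic_fetch_sub(&w->ref, 1);
    }

    static bool activate_free_worker(struct fake_worker *list, int n)
    {
        for (int i = 0; i < n; i++) {
            struct fake_worker *w = &list[i];

            if (!try_get_worker(w))
                continue;          /* exiting, keep scanning */
            if (w->sleeping) {     /* wake_up_process() "succeeded" */
                w->sleeping = false;
                put_worker(w);
                return true;
            }
            put_worker(w);         /* already running, try the next one */
        }
        return false;
    }

    int main(void)
    {
        struct fake_worker list[3] = {
            { .ref = 0, .sleeping = true },  /* exiting, must be skipped */
            { .ref = 1, .sleeping = false }, /* awake already */
            { .ref = 1, .sleeping = true },  /* this one gets woken */
        };

        printf("woke a worker: %d\n", activate_free_worker(list, 3));
        return 0;
    }
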
@@ -247,10 +251,22 @@ static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
ret = io_wqe_activate_free_worker(wqe);
rcu_read_unlock();
- if (!ret && acct->nr_workers < acct->max_workers) {
- atomic_inc(&acct->nr_running);
- atomic_inc(&wqe->wq->worker_refs);
- create_io_worker(wqe->wq, wqe, acct->index);
+ if (!ret) {
+ bool do_create = false, first = false;
+
+ raw_spin_lock(&wqe->lock);
+ if (acct->nr_workers < acct->max_workers) {
+ if (!acct->nr_workers)
+ first = true;
+ acct->nr_workers++;
+ do_create = true;
+ }
+ raw_spin_unlock(&wqe->lock);
+ if (do_create) {
+ atomic_inc(&acct->nr_running);
+ atomic_inc(&wqe->wq->worker_refs);
+ create_io_worker(wqe->wq, wqe, acct->index, first);
+ }
}
}
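
The hunk above moves worker-count accounting under wqe->lock: a creator reserves a slot (acct->nr_workers++) while holding the lock, performs the actual creation after dropping it, and the later fail path in create_io_worker() gives the slot back. A rough user-space sketch of that reserve-then-create shape, with invented names and a pthread mutex standing in for the wqe lock:

    /* Minimal sketch (invented names) of the pattern above: reserve a
     * worker slot while holding the lock, create the worker with the
     * lock dropped, and undo the reservation if creation fails. */
    #include <pthread.h>
    #include <stdbool.h>
    #include <stdio.h>

    struct acct {
        pthread_mutex_t lock;
        int nr_workers;
        int max_workers;
    };

    static bool spawn_worker(struct acct *a)
    {
        (void)a;        /* stands in for create_io_worker(); may fail */
        return true;
    }

    static void wake_or_create(struct acct *a)
    {
        bool do_create = false, first = false;

        pthread_mutex_lock(&a->lock);
        if (a->nr_workers < a->max_workers) {
            if (!a->nr_workers)
                first = true;   /* this one becomes the "fixed" worker */
            a->nr_workers++;    /* reserve the slot before unlocking */
            do_create = true;
        }
        pthread_mutex_unlock(&a->lock);

        if (do_create && !spawn_worker(a)) {
            /* creation failed: give the reserved slot back */
            pthread_mutex_lock(&a->lock);
            a->nr_workers--;
            pthread_mutex_unlock(&a->lock);
        }
        printf("workers=%d first=%d\n", a->nr_workers, first);
    }

    int main(void)
    {
        struct acct a = { PTHREAD_MUTEX_INITIALIZER, 0, 2 };

        wake_or_create(&a);
        wake_or_create(&a);
        wake_or_create(&a);     /* at the cap, no new worker is reserved */
        return 0;
    }
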
@@ -261,42 +277,63 @@ static void io_wqe_inc_running(struct io_worker *worker)
atomic_inc(&acct->nr_running);
}
-struct create_worker_data {
- struct callback_head work;
- struct io_wqe *wqe;
- int index;
-};
-
static void create_worker_cb(struct callback_head *cb)
{
- struct create_worker_data *cwd;
+ struct io_worker *worker;
struct io_wq *wq;
-
- cwd = container_of(cb, struct create_worker_data, work);
- wq = cwd->wqe->wq;
- create_io_worker(wq, cwd->wqe, cwd->index);
- kfree(cwd);
+ struct io_wqe *wqe;
+ struct io_wqe_acct *acct;
+ bool do_create = false, first = false;
+
+ worker = container_of(cb, struct io_worker, create_work);
+ wqe = worker->wqe;
+ wq = wqe->wq;
+ acct = &wqe->acct[worker->create_index];
+ raw_spin_lock(&wqe->lock);
+ if (acct->nr_workers < acct->max_workers) {
+ if (!acct->nr_workers)
+ first = true;
+ acct->nr_workers++;
+ do_create = true;
+ }
+ raw_spin_unlock(&wqe->lock);
+ if (do_create) {
+ create_io_worker(wq, wqe, worker->create_index, first);
+ } else {
+ atomic_dec(&acct->nr_running);
+ io_worker_ref_put(wq);
+ }
+ clear_bit_unlock(0, &worker->create_state);
+ io_worker_release(worker);
}
-static void io_queue_worker_create(struct io_wqe *wqe, struct io_wqe_acct *acct)
+static void io_queue_worker_create(struct io_wqe *wqe, struct io_worker *worker,
+ struct io_wqe_acct *acct)
{
- struct create_worker_data *cwd;
struct io_wq *wq = wqe->wq;
/* raced with exit, just ignore create call */
if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
goto fail;
+ if (!io_worker_get(worker))
+ goto fail;
+ /*
+ * create_state manages ownership of create_work/index. We should
+ * only need one entry per worker, as the worker going to sleep
+ * will trigger the condition, and waking will clear it once it
+ * runs the task_work.
+ */
+ if (test_bit(0, &worker->create_state) ||
+ test_and_set_bit_lock(0, &worker->create_state))
+ goto fail_release;
- cwd = kmalloc(sizeof(*cwd), GFP_ATOMIC);
- if (cwd) {
- init_task_work(&cwd->work, create_worker_cb);
- cwd->wqe = wqe;
- cwd->index = acct->index;
- if (!task_work_add(wq->task, &cwd->work, TWA_SIGNAL))
- return;
-
- kfree(cwd);
- }
+ init_task_work(&worker->create_work, create_worker_cb);
+ worker->create_index = acct->index;
+ if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL))
+ return;
+ clear_bit_unlock(0, &worker->create_state);
+fail_release:
+ io_worker_release(worker);
fail:
atomic_dec(&acct->nr_running);
io_worker_ref_put(wq);
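
The create_state comment above describes a one-bit ownership lock over the embedded create_work/create_index slot: a cheap unlocked test_bit() first, test_and_set_bit_lock() to take ownership with acquire semantics, and clear_bit_unlock() to release it once the task_work has run or on the failure paths. A user-space analogue of that bit-lock pattern with C11 atomics; the names here are invented and this is not the kernel helpers themselves.

    /* User-space analogue of the create_state bit lock: bit 0 guards the
     * single embedded create_work/create_index slot, so a worker can have
     * at most one pending create request in flight. */
    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stdio.h>

    struct fake_worker {
        atomic_ulong create_state;      /* bit 0: create request pending */
        int create_index;
    };

    static bool queue_create(struct fake_worker *w, int index)
    {
        /* cheap unlocked peek first, like test_bit() in the patch */
        if (atomic_load_explicit(&w->create_state, memory_order_relaxed) & 1UL)
            return false;
        /* test_and_set_bit_lock(): acquire ownership of the slot */
        if (atomic_fetch_or_explicit(&w->create_state, 1UL,
                                     memory_order_acquire) & 1UL)
            return false;

        w->create_index = index;        /* we own the slot now */
        return true;
    }

    static void create_done(struct fake_worker *w)
    {
        /* clear_bit_unlock(): release ownership after the callback ran */
        atomic_fetch_and_explicit(&w->create_state, ~1UL,
                                  memory_order_release);
    }

    int main(void)
    {
        struct fake_worker w = { .create_state = 0, .create_index = -1 };

        printf("first queue:  %d\n", queue_create(&w, 1));  /* 1 */
        printf("second queue: %d\n", queue_create(&w, 0));  /* 0, pending */
        create_done(&w);
        printf("after done:   %d\n", queue_create(&w, 0));  /* 1 again */
        return 0;
    }
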
@@ -314,7 +351,7 @@ static void io_wqe_dec_running(struct io_worker *worker)
if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe)) {
atomic_inc(&acct->nr_running);
atomic_inc(&wqe->wq->worker_refs);
- io_queue_worker_create(wqe, acct);
+ io_queue_worker_create(wqe, worker, acct);
}
}
@@ -387,7 +424,28 @@ static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
spin_unlock(&wq->hash->wait.lock);
}
-static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
+/*
+ * We can always run the work if the worker is currently the same type as
+ * the work (eg both are bound, or both are unbound). If they are not the
+ * same, only allow it if incrementing the worker count would be allowed.
+ */
+static bool io_worker_can_run_work(struct io_worker *worker,
+ struct io_wq_work *work)
+{
+ struct io_wqe_acct *acct;
+
+ if (!(worker->flags & IO_WORKER_F_BOUND) !=
+ !(work->flags & IO_WQ_WORK_UNBOUND))
+ return true;
+
+ /* not the same type, check if we'd go over the limit */
+ acct = io_work_get_acct(worker->wqe, work);
+ return acct->nr_workers < acct->max_workers;
+}
+
+static struct io_wq_work *io_get_next_work(struct io_wqe *wqe,
+ struct io_worker *worker,
+ bool *stalled)
__must_hold(wqe->lock)
{
struct io_wq_work_node *node, *prev;
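
The check in io_worker_can_run_work() compares two normalized booleans: "worker is unbound" versus "work is bound". When they differ, worker and work are the same type and the work can always run; when they match, the types differ and the per-type worker limit is consulted. A tiny standalone program walking the four cases; the flag values are made up, only the boolean shape matters.

    /* Truth table for the type test above (illustrative flag values). */
    #include <stdbool.h>
    #include <stdio.h>

    #define IO_WORKER_F_BOUND   (1 << 3)    /* illustrative value */
    #define IO_WQ_WORK_UNBOUND  (1 << 2)    /* illustrative value */

    static bool same_type(unsigned worker_flags, unsigned work_flags)
    {
        /* identical shape to the check in io_worker_can_run_work() */
        return !(worker_flags & IO_WORKER_F_BOUND) !=
               !(work_flags & IO_WQ_WORK_UNBOUND);
    }

    int main(void)
    {
        printf("bound worker,   bound work:   %d\n",
               same_type(IO_WORKER_F_BOUND, 0));                  /* 1 */
        printf("bound worker,   unbound work: %d\n",
               same_type(IO_WORKER_F_BOUND, IO_WQ_WORK_UNBOUND)); /* 0 */
        printf("unbound worker, bound work:   %d\n",
               same_type(0, 0));                                  /* 0 */
        printf("unbound worker, unbound work: %d\n",
               same_type(0, IO_WQ_WORK_UNBOUND));                 /* 1 */
        return 0;
    }
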
@@ -399,6 +457,9 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
work = container_of(node, struct io_wq_work, list);
+ if (!io_worker_can_run_work(worker, work))
+ break;
+
/* not hashed, can run anytime */
if (!io_wq_is_hashed(work)) {
wq_list_del(&wqe->work_list, node, prev);
@@ -425,6 +486,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
raw_spin_unlock(&wqe->lock);
io_wait_on_hash(wqe, stall_hash);
raw_spin_lock(&wqe->lock);
+ *stalled = true;
}
return NULL;
@@ -448,9 +510,9 @@ static void io_assign_current_work(struct io_worker *worker,
cond_resched();
}
- spin_lock_irq(&worker->lock);
+ spin_lock(&worker->lock);
worker->cur_work = work;
- spin_unlock_irq(&worker->lock);
+ spin_unlock(&worker->lock);
}
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
@@ -464,6 +526,7 @@ static void io_worker_handle_work(struct io_worker *worker)
do {
struct io_wq_work *work;
+ bool stalled;
get_next:
/*
* If we got some work, mark us as busy. If we didn't, but
@@ -472,13 +535,14 @@ get_next:
* can't make progress, any work completion or insertion will
* clear the stalled flag.
*/
- work = io_get_next_work(wqe);
+ stalled = false;
+ work = io_get_next_work(wqe, worker, &stalled);
if (work)
__io_worker_busy(wqe, worker, work);
- else if (!wq_list_empty(&wqe->work_list))
+ else if (stalled)
wqe->flags |= IO_WQE_FLAG_STALLED;
- raw_spin_unlock_irq(&wqe->lock);
+ raw_spin_unlock(&wqe->lock);
if (!work)
break;
io_assign_current_work(worker, work);
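
Because io_get_next_work() can now stop early on work the current worker must not run, a non-empty work_list no longer proves a stall on hashed work; the new *stalled out-parameter lets the caller set IO_WQE_FLAG_STALLED only when the getter really parked on a hash bucket. A compact user-space sketch of that caller/callee contract, with names invented for the example:

    /* Sketch of the *stalled contract: the getter reports a stall
     * explicitly instead of the caller inferring it from a non-empty
     * list, which can now also mean "work this worker may not run". */
    #include <stdbool.h>
    #include <stdio.h>

    struct queue {
        int pending;        /* work left on the list */
        bool hash_busy;     /* head of the list waits on a hash bucket */
        unsigned flags;
    };
    #define QUEUE_STALLED   1u

    static int get_next_work(struct queue *q, bool *stalled)
    {
        if (q->pending && q->hash_busy) {
            *stalled = true;        /* genuinely blocked on hashed work */
            return 0;
        }
        if (q->pending)
            return q->pending--;    /* hand out one unit of work */
        return 0;                   /* empty, but not stalled */
    }

    static void worker_loop_step(struct queue *q)
    {
        bool stalled = false;
        int work = get_next_work(q, &stalled);

        if (!work && stalled)
            q->flags |= QUEUE_STALLED;
        printf("work=%d stalled=%d flags=%u\n", work, stalled, q->flags);
    }

    int main(void)
    {
        struct queue q = { .pending = 1, .hash_busy = true, .flags = 0 };

        worker_loop_step(&q);   /* stalls: flag gets set */
        q.hash_busy = false;
        worker_loop_step(&q);   /* now the work is handed out */
        return 0;
    }
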
@@ -510,16 +574,16 @@ get_next:
clear_bit(hash, &wq->hash->map);
if (wq_has_sleeper(&wq->hash->wait))
wake_up(&wq->hash->wait);
- raw_spin_lock_irq(&wqe->lock);
+ raw_spin_lock(&wqe->lock);
wqe->flags &= ~IO_WQE_FLAG_STALLED;
/* skip unnecessary unlock-lock wqe->lock */
if (!work)
goto get_next;
- raw_spin_unlock_irq(&wqe->lock);
+ raw_spin_unlock(&wqe->lock);
}
} while (work);
- raw_spin_lock_irq(&wqe->lock);
+ raw_spin_lock(&wqe->lock);
} while (1);
}
@@ -540,13 +604,13 @@ static int io_wqe_worker(void *data)
set_current_state(TASK_INTERRUPTIBLE);
loop:
- raw_spin_lock_irq(&wqe->lock);
+ raw_spin_lock(&wqe->lock);
if (io_wqe_run_queue(wqe)) {
io_worker_handle_work(worker);
goto loop;
}
__io_worker_idle(wqe, worker);
- raw_spin_unlock_irq(&wqe->lock);
+ raw_spin_unlock(&wqe->lock);
if (io_flush_signals())
continue;
ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
@@ -565,7 +629,7 @@ loop:
}
if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
- raw_spin_lock_irq(&wqe->lock);
+ raw_spin_lock(&wqe->lock);
io_worker_handle_work(worker);
}
@@ -607,12 +671,12 @@ void io_wq_worker_sleeping(struct task_struct *tsk)
worker->flags &= ~IO_WORKER_F_RUNNING;
- raw_spin_lock_irq(&worker->wqe->lock);
+ raw_spin_lock(&worker->wqe->lock);
io_wqe_dec_running(worker);
- raw_spin_unlock_irq(&worker->wqe->lock);
+ raw_spin_unlock(&worker->wqe->lock);
}
-static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
+static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index, bool first)
{
struct io_wqe_acct *acct = &wqe->acct[index];
struct io_worker *worker;
@@ -635,6 +699,9 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
kfree(worker);
fail:
atomic_dec(&acct->nr_running);
+ raw_spin_lock(&wqe->lock);
+ acct->nr_workers--;
+ raw_spin_unlock(&wqe->lock);
io_worker_ref_put(wq);
return;
}
@@ -644,16 +711,15 @@ fail:
set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
tsk->flags |= PF_NO_SETAFFINITY;
- raw_spin_lock_irq(&wqe->lock);
+ raw_spin_lock(&wqe->lock);
hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
list_add_tail_rcu(&worker->all_list, &wqe->all_list);
worker->flags |= IO_WORKER_F_FREE;
if (index == IO_WQ_ACCT_BOUND)
worker->flags |= IO_WORKER_F_BOUND;
- if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
+ if (first && (worker->flags & IO_WORKER_F_BOUND))
worker->flags |= IO_WORKER_F_FIXED;
- acct->nr_workers++;
- raw_spin_unlock_irq(&wqe->lock);
+ raw_spin_unlock(&wqe->lock);
wake_up_new_task(tsk);
}
@@ -728,8 +794,7 @@ append:
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
- int work_flags;
- unsigned long flags;
+ bool do_wake;
/*
* If io-wq is exiting for this task, or if the request has explicitly
@@ -741,14 +806,14 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
return;
}
- work_flags = work->flags;
- raw_spin_lock_irqsave(&wqe->lock, flags);
+ raw_spin_lock(&wqe->lock);
io_wqe_insert_work(wqe, work);
wqe->flags &= ~IO_WQE_FLAG_STALLED;
- raw_spin_unlock_irqrestore(&wqe->lock, flags);
+ do_wake = (work->flags & IO_WQ_WORK_CONCURRENT) ||
+ !atomic_read(&acct->nr_running);
+ raw_spin_unlock(&wqe->lock);
- if ((work_flags & IO_WQ_WORK_CONCURRENT) ||
- !atomic_read(&acct->nr_running))
+ if (do_wake)
io_wqe_wake_worker(wqe, acct);
}
@@ -774,19 +839,18 @@ void io_wq_hash_work(struct io_wq_work *work, void *val)
static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
struct io_cb_cancel_data *match = data;
- unsigned long flags;
/*
* Hold the lock to avoid ->cur_work going out of scope, caller
* may dereference the passed in work.
*/
- spin_lock_irqsave(&worker->lock, flags);
+ spin_lock(&worker->lock);
if (worker->cur_work &&
match->fn(worker->cur_work, match->data)) {
set_notify_signal(worker->task);
match->nr_running++;
}
- spin_unlock_irqrestore(&worker->lock, flags);
+ spin_unlock(&worker->lock);
return match->nr_running && !match->cancel_all;
}
@@ -814,16 +878,15 @@ static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
{
struct io_wq_work_node *node, *prev;
struct io_wq_work *work;
- unsigned long flags;
retry:
- raw_spin_lock_irqsave(&wqe->lock, flags);
+ raw_spin_lock(&wqe->lock);
wq_list_for_each(node, prev, &wqe->work_list) {
work = container_of(node, struct io_wq_work, list);
if (!match->fn(work, match->data))
continue;
io_wqe_remove_pending(wqe, work, prev);
- raw_spin_unlock_irqrestore(&wqe->lock, flags);
+ raw_spin_unlock(&wqe->lock);
io_run_cancel(work, wqe);
match->nr_pending++;
if (!match->cancel_all)
@@ -832,7 +895,7 @@ retry:
/* not safe to continue after unlock */
goto retry;
}
- raw_spin_unlock_irqrestore(&wqe->lock, flags);
+ raw_spin_unlock(&wqe->lock);
}
static void io_wqe_cancel_running_work(struct io_wqe *wqe,
@@ -973,12 +1036,12 @@ err_wq:
static bool io_task_work_match(struct callback_head *cb, void *data)
{
- struct create_worker_data *cwd;
+ struct io_worker *worker;
if (cb->func != create_worker_cb)
return false;
- cwd = container_of(cb, struct create_worker_data, work);
- return cwd->wqe->wq == data;
+ worker = container_of(cb, struct io_worker, create_work);
+ return worker->wqe->wq == data;
}
void io_wq_exit_start(struct io_wq *wq)
@@ -995,12 +1058,13 @@ static void io_wq_exit_workers(struct io_wq *wq)
return;
while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
- struct create_worker_data *cwd;
+ struct io_worker *worker;
- cwd = container_of(cb, struct create_worker_data, work);
- atomic_dec(&cwd->wqe->acct[cwd->index].nr_running);
+ worker = container_of(cb, struct io_worker, create_work);
+ atomic_dec(&worker->wqe->acct[worker->create_index].nr_running);
io_worker_ref_put(wq);
- kfree(cwd);
+ clear_bit_unlock(0, &worker->create_state);
+ io_worker_release(worker);
}
rcu_read_lock();
@@ -1112,6 +1176,35 @@ int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
return 0;
}
+/*
+ * Set the max number of bound and unbound workers, returns the old values.
+ * If a new_count entry is 0, just return the old value for that type.
+ */
+int io_wq_max_workers(struct io_wq *wq, int *new_count)
+{
+ int i, node, prev = 0;
+
+ for (i = 0; i < 2; i++) {
+ if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
+ new_count[i] = task_rlimit(current, RLIMIT_NPROC);
+ }
+
+ rcu_read_lock();
+ for_each_node(node) {
+ struct io_wqe_acct *acct;
+
+ for (i = 0; i < 2; i++) {
+ acct = &wq->wqes[node]->acct[i];
+ prev = max_t(int, acct->max_workers, prev);
+ if (new_count[i])
+ acct->max_workers = new_count[i];
+ new_count[i] = prev;
+ }
+ }
+ rcu_read_unlock();
+ return 0;
+}
+
static __init int io_wq_init(void)
{
int ret;
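
The io_wq_max_workers() comment above describes an exchange convention: a non-zero new_count entry installs a new cap, a zero entry leaves it alone, and the old value is handed back through the same array, so an all-zero array simply reads the current caps. Below is a simplified, single-node user-space analogue of that convention (the kernel loop additionally folds the per-node values together); all names are illustrative.

    /* User-space analogue of the "0 means keep, non-zero means replace,
     * old value handed back" convention used by io_wq_max_workers(). */
    #include <stdio.h>

    static int caps[2] = { 64, 512 };   /* current bound/unbound caps */

    static void max_workers(int *new_count)
    {
        for (int i = 0; i < 2; i++) {
            int prev = caps[i];

            if (new_count[i])
                caps[i] = new_count[i];
            new_count[i] = prev;        /* hand the old cap back */
        }
    }

    int main(void)
    {
        int set[2]   = { 0, 128 };      /* keep bound cap, shrink unbound */
        int query[2] = { 0, 0 };        /* zeroes: just read the caps */

        max_workers(set);
        max_workers(query);
        printf("old unbound cap %d, now %d/%d\n", set[1], query[0], query[1]);
        return 0;
    }
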