diff options
Diffstat (limited to 'fs/io-wq.c')
-rw-r--r-- | fs/io-wq.c | 275 |
1 files changed, 184 insertions, 91 deletions
diff --git a/fs/io-wq.c b/fs/io-wq.c index cf086b01c6c6..cd9bd095fb1b 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -51,6 +51,10 @@ struct io_worker { struct completion ref_done; + unsigned long create_state; + struct callback_head create_work; + int create_index; + struct rcu_head rcu; }; @@ -129,7 +133,8 @@ struct io_cb_cancel_data { bool cancel_all; }; -static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index); +static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index, bool first); +static void io_wqe_dec_running(struct io_worker *worker); static bool io_worker_get(struct io_worker *worker) { @@ -168,27 +173,22 @@ static void io_worker_exit(struct io_worker *worker) { struct io_wqe *wqe = worker->wqe; struct io_wqe_acct *acct = io_wqe_get_acct(worker); - unsigned flags; if (refcount_dec_and_test(&worker->ref)) complete(&worker->ref_done); wait_for_completion(&worker->ref_done); - preempt_disable(); - current->flags &= ~PF_IO_WORKER; - flags = worker->flags; - worker->flags = 0; - if (flags & IO_WORKER_F_RUNNING) - atomic_dec(&acct->nr_running); - worker->flags = 0; - preempt_enable(); - - raw_spin_lock_irq(&wqe->lock); - if (flags & IO_WORKER_F_FREE) + raw_spin_lock(&wqe->lock); + if (worker->flags & IO_WORKER_F_FREE) hlist_nulls_del_rcu(&worker->nulls_node); list_del_rcu(&worker->all_list); acct->nr_workers--; - raw_spin_unlock_irq(&wqe->lock); + preempt_disable(); + io_wqe_dec_running(worker); + worker->flags = 0; + current->flags &= ~PF_IO_WORKER; + preempt_enable(); + raw_spin_unlock(&wqe->lock); kfree_rcu(worker, rcu); io_worker_ref_put(wqe->wq); @@ -214,15 +214,19 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe) struct hlist_nulls_node *n; struct io_worker *worker; - n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list)); - if (is_a_nulls(n)) - return false; - - worker = hlist_nulls_entry(n, struct io_worker, nulls_node); - if (io_worker_get(worker)) { - wake_up_process(worker->task); + /* + * Iterate free_list and see if we can find an idle worker to + * activate. If a given worker is on the free_list but in the process + * of exiting, keep trying. + */ + hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) { + if (!io_worker_get(worker)) + continue; + if (wake_up_process(worker->task)) { + io_worker_release(worker); + return true; + } io_worker_release(worker); - return true; } return false; @@ -247,10 +251,22 @@ static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct) ret = io_wqe_activate_free_worker(wqe); rcu_read_unlock(); - if (!ret && acct->nr_workers < acct->max_workers) { - atomic_inc(&acct->nr_running); - atomic_inc(&wqe->wq->worker_refs); - create_io_worker(wqe->wq, wqe, acct->index); + if (!ret) { + bool do_create = false, first = false; + + raw_spin_lock(&wqe->lock); + if (acct->nr_workers < acct->max_workers) { + if (!acct->nr_workers) + first = true; + acct->nr_workers++; + do_create = true; + } + raw_spin_unlock(&wqe->lock); + if (do_create) { + atomic_inc(&acct->nr_running); + atomic_inc(&wqe->wq->worker_refs); + create_io_worker(wqe->wq, wqe, acct->index, first); + } } } @@ -261,42 +277,63 @@ static void io_wqe_inc_running(struct io_worker *worker) atomic_inc(&acct->nr_running); } -struct create_worker_data { - struct callback_head work; - struct io_wqe *wqe; - int index; -}; - static void create_worker_cb(struct callback_head *cb) { - struct create_worker_data *cwd; + struct io_worker *worker; struct io_wq *wq; - - cwd = container_of(cb, struct create_worker_data, work); - wq = cwd->wqe->wq; - create_io_worker(wq, cwd->wqe, cwd->index); - kfree(cwd); + struct io_wqe *wqe; + struct io_wqe_acct *acct; + bool do_create = false, first = false; + + worker = container_of(cb, struct io_worker, create_work); + wqe = worker->wqe; + wq = wqe->wq; + acct = &wqe->acct[worker->create_index]; + raw_spin_lock(&wqe->lock); + if (acct->nr_workers < acct->max_workers) { + if (!acct->nr_workers) + first = true; + acct->nr_workers++; + do_create = true; + } + raw_spin_unlock(&wqe->lock); + if (do_create) { + create_io_worker(wq, wqe, worker->create_index, first); + } else { + atomic_dec(&acct->nr_running); + io_worker_ref_put(wq); + } + clear_bit_unlock(0, &worker->create_state); + io_worker_release(worker); } -static void io_queue_worker_create(struct io_wqe *wqe, struct io_wqe_acct *acct) +static void io_queue_worker_create(struct io_wqe *wqe, struct io_worker *worker, + struct io_wqe_acct *acct) { - struct create_worker_data *cwd; struct io_wq *wq = wqe->wq; /* raced with exit, just ignore create call */ if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) goto fail; + if (!io_worker_get(worker)) + goto fail; + /* + * create_state manages ownership of create_work/index. We should + * only need one entry per worker, as the worker going to sleep + * will trigger the condition, and waking will clear it once it + * runs the task_work. + */ + if (test_bit(0, &worker->create_state) || + test_and_set_bit_lock(0, &worker->create_state)) + goto fail_release; - cwd = kmalloc(sizeof(*cwd), GFP_ATOMIC); - if (cwd) { - init_task_work(&cwd->work, create_worker_cb); - cwd->wqe = wqe; - cwd->index = acct->index; - if (!task_work_add(wq->task, &cwd->work, TWA_SIGNAL)) - return; - - kfree(cwd); - } + init_task_work(&worker->create_work, create_worker_cb); + worker->create_index = acct->index; + if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) + return; + clear_bit_unlock(0, &worker->create_state); +fail_release: + io_worker_release(worker); fail: atomic_dec(&acct->nr_running); io_worker_ref_put(wq); @@ -314,7 +351,7 @@ static void io_wqe_dec_running(struct io_worker *worker) if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe)) { atomic_inc(&acct->nr_running); atomic_inc(&wqe->wq->worker_refs); - io_queue_worker_create(wqe, acct); + io_queue_worker_create(wqe, worker, acct); } } @@ -387,7 +424,28 @@ static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash) spin_unlock(&wq->hash->wait.lock); } -static struct io_wq_work *io_get_next_work(struct io_wqe *wqe) +/* + * We can always run the work if the worker is currently the same type as + * the work (eg both are bound, or both are unbound). If they are not the + * same, only allow it if incrementing the worker count would be allowed. + */ +static bool io_worker_can_run_work(struct io_worker *worker, + struct io_wq_work *work) +{ + struct io_wqe_acct *acct; + + if (!(worker->flags & IO_WORKER_F_BOUND) != + !(work->flags & IO_WQ_WORK_UNBOUND)) + return true; + + /* not the same type, check if we'd go over the limit */ + acct = io_work_get_acct(worker->wqe, work); + return acct->nr_workers < acct->max_workers; +} + +static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, + struct io_worker *worker, + bool *stalled) __must_hold(wqe->lock) { struct io_wq_work_node *node, *prev; @@ -399,6 +457,9 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe) work = container_of(node, struct io_wq_work, list); + if (!io_worker_can_run_work(worker, work)) + break; + /* not hashed, can run anytime */ if (!io_wq_is_hashed(work)) { wq_list_del(&wqe->work_list, node, prev); @@ -425,6 +486,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe) raw_spin_unlock(&wqe->lock); io_wait_on_hash(wqe, stall_hash); raw_spin_lock(&wqe->lock); + *stalled = true; } return NULL; @@ -448,9 +510,9 @@ static void io_assign_current_work(struct io_worker *worker, cond_resched(); } - spin_lock_irq(&worker->lock); + spin_lock(&worker->lock); worker->cur_work = work; - spin_unlock_irq(&worker->lock); + spin_unlock(&worker->lock); } static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work); @@ -464,6 +526,7 @@ static void io_worker_handle_work(struct io_worker *worker) do { struct io_wq_work *work; + bool stalled; get_next: /* * If we got some work, mark us as busy. If we didn't, but @@ -472,13 +535,14 @@ get_next: * can't make progress, any work completion or insertion will * clear the stalled flag. */ - work = io_get_next_work(wqe); + stalled = false; + work = io_get_next_work(wqe, worker, &stalled); if (work) __io_worker_busy(wqe, worker, work); - else if (!wq_list_empty(&wqe->work_list)) + else if (stalled) wqe->flags |= IO_WQE_FLAG_STALLED; - raw_spin_unlock_irq(&wqe->lock); + raw_spin_unlock(&wqe->lock); if (!work) break; io_assign_current_work(worker, work); @@ -510,16 +574,16 @@ get_next: clear_bit(hash, &wq->hash->map); if (wq_has_sleeper(&wq->hash->wait)) wake_up(&wq->hash->wait); - raw_spin_lock_irq(&wqe->lock); + raw_spin_lock(&wqe->lock); wqe->flags &= ~IO_WQE_FLAG_STALLED; /* skip unnecessary unlock-lock wqe->lock */ if (!work) goto get_next; - raw_spin_unlock_irq(&wqe->lock); + raw_spin_unlock(&wqe->lock); } } while (work); - raw_spin_lock_irq(&wqe->lock); + raw_spin_lock(&wqe->lock); } while (1); } @@ -540,13 +604,13 @@ static int io_wqe_worker(void *data) set_current_state(TASK_INTERRUPTIBLE); loop: - raw_spin_lock_irq(&wqe->lock); + raw_spin_lock(&wqe->lock); if (io_wqe_run_queue(wqe)) { io_worker_handle_work(worker); goto loop; } __io_worker_idle(wqe, worker); - raw_spin_unlock_irq(&wqe->lock); + raw_spin_unlock(&wqe->lock); if (io_flush_signals()) continue; ret = schedule_timeout(WORKER_IDLE_TIMEOUT); @@ -565,7 +629,7 @@ loop: } if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) { - raw_spin_lock_irq(&wqe->lock); + raw_spin_lock(&wqe->lock); io_worker_handle_work(worker); } @@ -607,12 +671,12 @@ void io_wq_worker_sleeping(struct task_struct *tsk) worker->flags &= ~IO_WORKER_F_RUNNING; - raw_spin_lock_irq(&worker->wqe->lock); + raw_spin_lock(&worker->wqe->lock); io_wqe_dec_running(worker); - raw_spin_unlock_irq(&worker->wqe->lock); + raw_spin_unlock(&worker->wqe->lock); } -static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) +static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index, bool first) { struct io_wqe_acct *acct = &wqe->acct[index]; struct io_worker *worker; @@ -635,6 +699,9 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) kfree(worker); fail: atomic_dec(&acct->nr_running); + raw_spin_lock(&wqe->lock); + acct->nr_workers--; + raw_spin_unlock(&wqe->lock); io_worker_ref_put(wq); return; } @@ -644,16 +711,15 @@ fail: set_cpus_allowed_ptr(tsk, wqe->cpu_mask); tsk->flags |= PF_NO_SETAFFINITY; - raw_spin_lock_irq(&wqe->lock); + raw_spin_lock(&wqe->lock); hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list); list_add_tail_rcu(&worker->all_list, &wqe->all_list); worker->flags |= IO_WORKER_F_FREE; if (index == IO_WQ_ACCT_BOUND) worker->flags |= IO_WORKER_F_BOUND; - if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND)) + if (first && (worker->flags & IO_WORKER_F_BOUND)) worker->flags |= IO_WORKER_F_FIXED; - acct->nr_workers++; - raw_spin_unlock_irq(&wqe->lock); + raw_spin_unlock(&wqe->lock); wake_up_new_task(tsk); } @@ -728,8 +794,7 @@ append: static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) { struct io_wqe_acct *acct = io_work_get_acct(wqe, work); - int work_flags; - unsigned long flags; + bool do_wake; /* * If io-wq is exiting for this task, or if the request has explicitly @@ -741,14 +806,14 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) return; } - work_flags = work->flags; - raw_spin_lock_irqsave(&wqe->lock, flags); + raw_spin_lock(&wqe->lock); io_wqe_insert_work(wqe, work); wqe->flags &= ~IO_WQE_FLAG_STALLED; - raw_spin_unlock_irqrestore(&wqe->lock, flags); + do_wake = (work->flags & IO_WQ_WORK_CONCURRENT) || + !atomic_read(&acct->nr_running); + raw_spin_unlock(&wqe->lock); - if ((work_flags & IO_WQ_WORK_CONCURRENT) || - !atomic_read(&acct->nr_running)) + if (do_wake) io_wqe_wake_worker(wqe, acct); } @@ -774,19 +839,18 @@ void io_wq_hash_work(struct io_wq_work *work, void *val) static bool io_wq_worker_cancel(struct io_worker *worker, void *data) { struct io_cb_cancel_data *match = data; - unsigned long flags; /* * Hold the lock to avoid ->cur_work going out of scope, caller * may dereference the passed in work. */ - spin_lock_irqsave(&worker->lock, flags); + spin_lock(&worker->lock); if (worker->cur_work && match->fn(worker->cur_work, match->data)) { set_notify_signal(worker->task); match->nr_running++; } - spin_unlock_irqrestore(&worker->lock, flags); + spin_unlock(&worker->lock); return match->nr_running && !match->cancel_all; } @@ -814,16 +878,15 @@ static void io_wqe_cancel_pending_work(struct io_wqe *wqe, { struct io_wq_work_node *node, *prev; struct io_wq_work *work; - unsigned long flags; retry: - raw_spin_lock_irqsave(&wqe->lock, flags); + raw_spin_lock(&wqe->lock); wq_list_for_each(node, prev, &wqe->work_list) { work = container_of(node, struct io_wq_work, list); if (!match->fn(work, match->data)) continue; io_wqe_remove_pending(wqe, work, prev); - raw_spin_unlock_irqrestore(&wqe->lock, flags); + raw_spin_unlock(&wqe->lock); io_run_cancel(work, wqe); match->nr_pending++; if (!match->cancel_all) @@ -832,7 +895,7 @@ retry: /* not safe to continue after unlock */ goto retry; } - raw_spin_unlock_irqrestore(&wqe->lock, flags); + raw_spin_unlock(&wqe->lock); } static void io_wqe_cancel_running_work(struct io_wqe *wqe, @@ -973,12 +1036,12 @@ err_wq: static bool io_task_work_match(struct callback_head *cb, void *data) { - struct create_worker_data *cwd; + struct io_worker *worker; if (cb->func != create_worker_cb) return false; - cwd = container_of(cb, struct create_worker_data, work); - return cwd->wqe->wq == data; + worker = container_of(cb, struct io_worker, create_work); + return worker->wqe->wq == data; } void io_wq_exit_start(struct io_wq *wq) @@ -995,12 +1058,13 @@ static void io_wq_exit_workers(struct io_wq *wq) return; while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) { - struct create_worker_data *cwd; + struct io_worker *worker; - cwd = container_of(cb, struct create_worker_data, work); - atomic_dec(&cwd->wqe->acct[cwd->index].nr_running); + worker = container_of(cb, struct io_worker, create_work); + atomic_dec(&worker->wqe->acct[worker->create_index].nr_running); io_worker_ref_put(wq); - kfree(cwd); + clear_bit_unlock(0, &worker->create_state); + io_worker_release(worker); } rcu_read_lock(); @@ -1112,6 +1176,35 @@ int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask) return 0; } +/* + * Set max number of unbounded workers, returns old value. If new_count is 0, + * then just return the old value. + */ +int io_wq_max_workers(struct io_wq *wq, int *new_count) +{ + int i, node, prev = 0; + + for (i = 0; i < 2; i++) { + if (new_count[i] > task_rlimit(current, RLIMIT_NPROC)) + new_count[i] = task_rlimit(current, RLIMIT_NPROC); + } + + rcu_read_lock(); + for_each_node(node) { + struct io_wqe_acct *acct; + + for (i = 0; i < 2; i++) { + acct = &wq->wqes[node]->acct[i]; + prev = max_t(int, acct->max_workers, prev); + if (new_count[i]) + acct->max_workers = new_count[i]; + new_count[i] = prev; + } + } + rcu_read_unlock(); + return 0; +} + static __init int io_wq_init(void) { int ret; |