summaryrefslogtreecommitdiff
path: root/io_uring/io-wq.c
diff options
context:
space:
mode:
Diffstat (limited to 'io_uring/io-wq.c')
-rw-r--r--io_uring/io-wq.c70
1 files changed, 47 insertions, 23 deletions
diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c
index 399e9a15c38d..62f345587df5 100644
--- a/io_uring/io-wq.c
+++ b/io_uring/io-wq.c
@@ -232,17 +232,25 @@ static void io_worker_exit(struct io_worker *worker)
do_exit(0);
}
-static inline bool io_acct_run_queue(struct io_wq_acct *acct)
+static inline bool __io_acct_run_queue(struct io_wq_acct *acct)
{
- bool ret = false;
+ return !test_bit(IO_ACCT_STALLED_BIT, &acct->flags) &&
+ !wq_list_empty(&acct->work_list);
+}
+/*
+ * If there's work to do, returns true with acct->lock acquired. If not,
+ * returns false with no lock held.
+ */
+static inline bool io_acct_run_queue(struct io_wq_acct *acct)
+ __acquires(&acct->lock)
+{
raw_spin_lock(&acct->lock);
- if (!wq_list_empty(&acct->work_list) &&
- !test_bit(IO_ACCT_STALLED_BIT, &acct->flags))
- ret = true;
- raw_spin_unlock(&acct->lock);
+ if (__io_acct_run_queue(acct))
+ return true;
- return ret;
+ raw_spin_unlock(&acct->lock);
+ return false;
}
/*
@@ -268,11 +276,14 @@ static bool io_wq_activate_free_worker(struct io_wq *wq,
io_worker_release(worker);
continue;
}
- if (wake_up_process(worker->task)) {
- io_worker_release(worker);
- return true;
- }
+ /*
+ * If the worker is already running, it's either already
+ * starting work or finishing work. In either case, if it does
+ * to go sleep, we'll kick off a new task for this work anyway.
+ */
+ wake_up_process(worker->task);
io_worker_release(worker);
+ return true;
}
return false;
@@ -397,6 +408,7 @@ static void io_wq_dec_running(struct io_worker *worker)
if (!io_acct_run_queue(acct))
return;
+ raw_spin_unlock(&acct->lock);
atomic_inc(&acct->nr_running);
atomic_inc(&wq->worker_refs);
io_queue_worker_create(worker, acct, create_worker_cb);
@@ -521,9 +533,13 @@ static void io_assign_current_work(struct io_worker *worker,
raw_spin_unlock(&worker->lock);
}
-static void io_worker_handle_work(struct io_worker *worker)
+/*
+ * Called with acct->lock held, drops it before returning
+ */
+static void io_worker_handle_work(struct io_wq_acct *acct,
+ struct io_worker *worker)
+ __releases(&acct->lock)
{
- struct io_wq_acct *acct = io_wq_get_acct(worker);
struct io_wq *wq = worker->wq;
bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);
@@ -537,7 +553,6 @@ static void io_worker_handle_work(struct io_worker *worker)
* can't make progress, any work completion or insertion will
* clear the stalled flag.
*/
- raw_spin_lock(&acct->lock);
work = io_get_next_work(acct, worker);
raw_spin_unlock(&acct->lock);
if (work) {
@@ -591,6 +606,10 @@ static void io_worker_handle_work(struct io_worker *worker)
wake_up(&wq->hash->wait);
}
} while (work);
+
+ if (!__io_acct_run_queue(acct))
+ break;
+ raw_spin_lock(&acct->lock);
} while (1);
}
@@ -611,8 +630,13 @@ static int io_wq_worker(void *data)
long ret;
set_current_state(TASK_INTERRUPTIBLE);
+
+ /*
+ * If we have work to do, io_acct_run_queue() returns with
+ * the acct->lock held. If not, it will drop it.
+ */
while (io_acct_run_queue(acct))
- io_worker_handle_work(worker);
+ io_worker_handle_work(acct, worker);
raw_spin_lock(&wq->lock);
/*
@@ -645,8 +669,8 @@ static int io_wq_worker(void *data)
}
}
- if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
- io_worker_handle_work(worker);
+ if (test_bit(IO_WQ_BIT_EXIT, &wq->state) && io_acct_run_queue(acct))
+ io_worker_handle_work(acct, worker);
io_worker_exit(worker);
return 0;
@@ -909,13 +933,10 @@ void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
raw_spin_unlock(&acct->lock);
- raw_spin_lock(&wq->lock);
rcu_read_lock();
do_create = !io_wq_activate_free_worker(wq, acct);
rcu_read_unlock();
- raw_spin_unlock(&wq->lock);
-
if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
!atomic_read(&acct->nr_running))) {
bool did_create;
@@ -1285,13 +1306,16 @@ static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
return __io_wq_cpu_online(wq, cpu, false);
}
-int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
+int io_wq_cpu_affinity(struct io_uring_task *tctx, cpumask_var_t mask)
{
+ if (!tctx || !tctx->io_wq)
+ return -EINVAL;
+
rcu_read_lock();
if (mask)
- cpumask_copy(wq->cpu_mask, mask);
+ cpumask_copy(tctx->io_wq->cpu_mask, mask);
else
- cpumask_copy(wq->cpu_mask, cpu_possible_mask);
+ cpumask_copy(tctx->io_wq->cpu_mask, cpu_possible_mask);
rcu_read_unlock();
return 0;