Diffstat (limited to 'io_uring/io_uring.c')
-rw-r--r-- | io_uring/io_uring.c | 509 (lines changed)
1 file changed, 330 insertions, 179 deletions
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c index 58ac13b69dc8..3b915deb4d08 100644 --- a/io_uring/io_uring.c +++ b/io_uring/io_uring.c @@ -151,7 +151,7 @@ static void io_move_task_work_from_local(struct io_ring_ctx *ctx); static void __io_submit_flush_completions(struct io_ring_ctx *ctx); static __cold void io_fallback_tw(struct io_uring_task *tctx); -static struct kmem_cache *req_cachep; +struct kmem_cache *req_cachep; struct sock *io_uring_get_socket(struct file *file) { @@ -230,6 +230,7 @@ static inline void req_fail_link_node(struct io_kiocb *req, int res) static inline void io_req_add_to_cache(struct io_kiocb *req, struct io_ring_ctx *ctx) { wq_stack_add_head(&req->comp_list, &ctx->submit_state.free_list); + kasan_poison_object_data(req_cachep, req); } static __cold void io_ring_ctx_ref_free(struct percpu_ref *ref) @@ -245,17 +246,15 @@ static __cold void io_fallback_req_func(struct work_struct *work) fallback_work.work); struct llist_node *node = llist_del_all(&ctx->fallback_llist); struct io_kiocb *req, *tmp; - bool locked = false; + bool locked = true; - percpu_ref_get(&ctx->refs); + mutex_lock(&ctx->uring_lock); llist_for_each_entry_safe(req, tmp, node, io_task_work.node) req->io_task_work.func(req, &locked); - - if (locked) { - io_submit_flush_completions(ctx); - mutex_unlock(&ctx->uring_lock); - } - percpu_ref_put(&ctx->refs); + if (WARN_ON_ONCE(!locked)) + return; + io_submit_flush_completions(ctx); + mutex_unlock(&ctx->uring_lock); } static int io_alloc_hash_table(struct io_hash_table *table, unsigned bits) @@ -316,6 +315,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) xa_init_flags(&ctx->personalities, XA_FLAGS_ALLOC1); mutex_init(&ctx->uring_lock); init_waitqueue_head(&ctx->cq_wait); + init_waitqueue_head(&ctx->poll_wq); spin_lock_init(&ctx->completion_lock); spin_lock_init(&ctx->timeout_lock); INIT_WQ_LIST(&ctx->iopoll_list); @@ -407,7 +407,7 @@ static inline void io_arm_ltimeout(struct io_kiocb *req) static void io_prep_async_work(struct io_kiocb *req) { - const struct io_op_def *def = &io_op_defs[req->opcode]; + const struct io_issue_def *def = &io_issue_defs[req->opcode]; struct io_ring_ctx *ctx = req->ctx; if (!(req->flags & REQ_F_CREDS)) { @@ -572,6 +572,8 @@ static void io_eventfd_flush_signal(struct io_ring_ctx *ctx) void __io_commit_cqring_flush(struct io_ring_ctx *ctx) { + if (ctx->poll_activated) + io_poll_wq_wake(ctx); if (ctx->off_timeout_used) io_flush_timeouts(ctx); if (ctx->drain_active) { @@ -618,6 +620,25 @@ static inline void __io_cq_unlock_post(struct io_ring_ctx *ctx) io_cqring_wake(ctx); } +static inline void __io_cq_unlock_post_flush(struct io_ring_ctx *ctx) + __releases(ctx->completion_lock) +{ + io_commit_cqring(ctx); + __io_cq_unlock(ctx); + io_commit_cqring_flush(ctx); + + /* + * As ->task_complete implies that the ring is single tasked, cq_wait + * may only be waited on by the current in io_cqring_wait(), but since + * it will re-check the wakeup conditions once we return we can safely + * skip waking it up. 
+ */ + if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN)) { + smp_mb(); + __io_cqring_wake(ctx); + } +} + void io_cq_unlock_post(struct io_ring_ctx *ctx) __releases(ctx->completion_lock) { @@ -645,7 +666,6 @@ static void io_cqring_overflow_kill(struct io_ring_ctx *ctx) } } -/* Returns true if there are no backlogged entries after the flush */ static void __io_cqring_overflow_flush(struct io_ring_ctx *ctx) { size_t cqe_size = sizeof(struct io_uring_cqe); @@ -693,7 +713,8 @@ static void io_cqring_overflow_flush(struct io_ring_ctx *ctx) io_cqring_do_overflow_flush(ctx); } -void __io_put_task(struct task_struct *task, int nr) +/* can be called by any task */ +static void io_put_task_remote(struct task_struct *task, int nr) { struct io_uring_task *tctx = task->io_uring; @@ -703,6 +724,21 @@ void __io_put_task(struct task_struct *task, int nr) put_task_struct_many(task, nr); } +/* used by a task to put its own references */ +static void io_put_task_local(struct task_struct *task, int nr) +{ + task->io_uring->cached_refs += nr; +} + +/* must to be called somewhat shortly after putting a request */ +static inline void io_put_task(struct task_struct *task, int nr) +{ + if (likely(task == current)) + io_put_task_local(task, nr); + else + io_put_task_remote(task, nr); +} + void io_task_refs_refill(struct io_uring_task *tctx) { unsigned int refill = -tctx->cached_refs + IO_TCTX_REFS_CACHE_NR; @@ -731,6 +767,8 @@ static bool io_cqring_event_overflow(struct io_ring_ctx *ctx, u64 user_data, size_t ocq_size = sizeof(struct io_overflow_cqe); bool is_cqe32 = (ctx->flags & IORING_SETUP_CQE32); + lockdep_assert_held(&ctx->completion_lock); + if (is_cqe32) ocq_size += sizeof(struct io_uring_cqe); @@ -820,9 +858,6 @@ static bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, { struct io_uring_cqe *cqe; - if (!ctx->task_complete) - lockdep_assert_held(&ctx->completion_lock); - ctx->cq_extra++; /* @@ -946,15 +981,15 @@ static void __io_req_complete_post(struct io_kiocb *req) req->link = NULL; } } + io_put_kbuf_comp(req); + io_dismantle_req(req); io_req_put_rsrc(req); /* * Selected buffer deallocation in io_clean_op() assumes that * we don't hold ->completion_lock. Clean them here to avoid * deadlocks. 
*/ - io_put_kbuf_comp(req); - io_dismantle_req(req); - io_put_task(req->task, 1); + io_put_task_remote(req->task, 1); wq_list_add_head(&req->comp_list, &ctx->locked_free_list); ctx->locked_free_nr++; } @@ -981,7 +1016,7 @@ void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags) void io_req_defer_failed(struct io_kiocb *req, s32 res) __must_hold(&ctx->uring_lock) { - const struct io_op_def *def = &io_op_defs[req->opcode]; + const struct io_cold_def *def = &io_cold_defs[req->opcode]; lockdep_assert_held(&req->ctx->uring_lock); @@ -1077,7 +1112,7 @@ __cold void io_free_req(struct io_kiocb *req) io_req_put_rsrc(req); io_dismantle_req(req); - io_put_task(req->task, 1); + io_put_task_remote(req->task, 1); spin_lock(&ctx->completion_lock); wq_list_add_head(&req->comp_list, &ctx->locked_free_list); @@ -1131,7 +1166,7 @@ static unsigned int handle_tw_list(struct llist_node *node, { unsigned int count = 0; - while (node != last) { + while (node && node != last) { struct llist_node *next = node->next; struct io_kiocb *req = container_of(node, struct io_kiocb, io_task_work.node); @@ -1144,10 +1179,16 @@ static unsigned int handle_tw_list(struct llist_node *node, /* if not contended, grab and improve batching */ *locked = mutex_trylock(&(*ctx)->uring_lock); percpu_ref_get(&(*ctx)->refs); - } + } else if (!*locked) + *locked = mutex_trylock(&(*ctx)->uring_lock); req->io_task_work.func(req, locked); node = next; count++; + if (unlikely(need_resched())) { + ctx_flush_and_put(*ctx, locked); + *ctx = NULL; + cond_resched(); + } } return count; @@ -1191,23 +1232,29 @@ void tctx_task_work(struct callback_head *cb) task_work); struct llist_node fake = {}; struct llist_node *node; - unsigned int loops = 1; - unsigned int count; + unsigned int loops = 0; + unsigned int count = 0; if (unlikely(current->flags & PF_EXITING)) { io_fallback_tw(tctx); return; } - node = io_llist_xchg(&tctx->task_list, &fake); - count = handle_tw_list(node, &ctx, &uring_locked, NULL); - node = io_llist_cmpxchg(&tctx->task_list, &fake, NULL); - while (node != &fake) { + do { loops++; node = io_llist_xchg(&tctx->task_list, &fake); count += handle_tw_list(node, &ctx, &uring_locked, &fake); + + /* skip expensive cmpxchg if there are items in the list */ + if (READ_ONCE(tctx->task_list.first) != &fake) + continue; + if (uring_locked && !wq_list_empty(&ctx->submit_state.compl_reqs)) { + io_submit_flush_completions(ctx); + if (READ_ONCE(tctx->task_list.first) != &fake) + continue; + } node = io_llist_cmpxchg(&tctx->task_list, &fake, NULL); - } + } while (node != &fake); ctx_flush_and_put(ctx, &uring_locked); @@ -1236,22 +1283,29 @@ static void io_req_local_work_add(struct io_kiocb *req) { struct io_ring_ctx *ctx = req->ctx; - if (!llist_add(&req->io_task_work.node, &ctx->work_llist)) + percpu_ref_get(&ctx->refs); + + if (!llist_add(&req->io_task_work.node, &ctx->work_llist)) { + percpu_ref_put(&ctx->refs); return; - /* need it for the following io_cqring_wake() */ + } + /* needed for the following wake up */ smp_mb__after_atomic(); if (unlikely(atomic_read(&req->task->io_uring->in_idle))) { io_move_task_work_from_local(ctx); + percpu_ref_put(&ctx->refs); return; } if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); - if (ctx->has_evfd) io_eventfd_signal(ctx); - __io_cqring_wake(ctx); + + if (READ_ONCE(ctx->cq_waiting)) + wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE); + percpu_ref_put(&ctx->refs); } void __io_req_task_work_add(struct io_kiocb *req, bool allow_local) @@ 
-1291,21 +1345,19 @@ static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx) } } -int __io_run_local_work(struct io_ring_ctx *ctx, bool *locked) +static int __io_run_local_work(struct io_ring_ctx *ctx, bool *locked) { struct llist_node *node; - struct llist_node fake; - struct llist_node *current_final = NULL; - int ret; - unsigned int loops = 1; + unsigned int loops = 0; + int ret = 0; - if (unlikely(ctx->submitter_task != current)) + if (WARN_ON_ONCE(ctx->submitter_task != current)) return -EEXIST; - - node = io_llist_xchg(&ctx->work_llist, &fake); - ret = 0; + if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) + atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); again: - while (node != current_final) { + node = io_llist_xchg(&ctx->work_llist, NULL); + while (node) { struct llist_node *next = node->next; struct io_kiocb *req = container_of(node, struct io_kiocb, io_task_work.node); @@ -1314,26 +1366,20 @@ again: ret++; node = next; } + loops++; - if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) - atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); - - node = io_llist_cmpxchg(&ctx->work_llist, &fake, NULL); - if (node != &fake) { - loops++; - current_final = &fake; - node = io_llist_xchg(&ctx->work_llist, &fake); + if (!llist_empty(&ctx->work_llist)) goto again; - } - - if (*locked) + if (*locked) { io_submit_flush_completions(ctx); + if (!llist_empty(&ctx->work_llist)) + goto again; + } trace_io_uring_local_work_run(ctx, ret, loops); return ret; - } -int io_run_local_work(struct io_ring_ctx *ctx) +static inline int io_run_local_work_locked(struct io_ring_ctx *ctx) { bool locked; int ret; @@ -1341,8 +1387,19 @@ int io_run_local_work(struct io_ring_ctx *ctx) if (llist_empty(&ctx->work_llist)) return 0; - __set_current_state(TASK_RUNNING); - locked = mutex_trylock(&ctx->uring_lock); + locked = true; + ret = __io_run_local_work(ctx, &locked); + /* shouldn't happen! 
*/ + if (WARN_ON_ONCE(!locked)) + mutex_lock(&ctx->uring_lock); + return ret; +} + +static int io_run_local_work(struct io_ring_ctx *ctx) +{ + bool locked = mutex_trylock(&ctx->uring_lock); + int ret; + ret = __io_run_local_work(ctx, &locked); if (locked) mutex_unlock(&ctx->uring_lock); @@ -1360,10 +1417,12 @@ void io_req_task_submit(struct io_kiocb *req, bool *locked) { io_tw_lock(req->ctx, locked); /* req->task == current here, checking PF_EXITING is safe */ - if (likely(!(req->task->flags & PF_EXITING))) - io_queue_sqe(req); - else + if (unlikely(req->task->flags & PF_EXITING)) io_req_defer_failed(req, -EFAULT); + else if (req->flags & REQ_F_FORCE_ASYNC) + io_queue_iowq(req, locked); + else + io_queue_sqe(req); } void io_req_task_queue_fail(struct io_kiocb *req, int ret) @@ -1462,7 +1521,7 @@ static void __io_submit_flush_completions(struct io_ring_ctx *ctx) } } } - __io_cq_unlock_post(ctx); + __io_cq_unlock_post_flush(ctx); if (!wq_list_empty(&ctx->submit_state.compl_reqs)) { io_free_batch_list(ctx, state->compl_reqs.first); @@ -1703,8 +1762,8 @@ unsigned int io_file_get_flags(struct file *file) bool io_alloc_async_data(struct io_kiocb *req) { - WARN_ON_ONCE(!io_op_defs[req->opcode].async_size); - req->async_data = kmalloc(io_op_defs[req->opcode].async_size, GFP_KERNEL); + WARN_ON_ONCE(!io_cold_defs[req->opcode].async_size); + req->async_data = kmalloc(io_cold_defs[req->opcode].async_size, GFP_KERNEL); if (req->async_data) { req->flags |= REQ_F_ASYNC_DATA; return false; @@ -1714,20 +1773,21 @@ bool io_alloc_async_data(struct io_kiocb *req) int io_req_prep_async(struct io_kiocb *req) { - const struct io_op_def *def = &io_op_defs[req->opcode]; + const struct io_cold_def *cdef = &io_cold_defs[req->opcode]; + const struct io_issue_def *def = &io_issue_defs[req->opcode]; /* assign early for deferred execution for non-fixed file */ if (def->needs_file && !(req->flags & REQ_F_FIXED_FILE)) req->file = io_file_get_normal(req, req->cqe.fd); - if (!def->prep_async) + if (!cdef->prep_async) return 0; if (WARN_ON_ONCE(req_has_async_data(req))) return -EFAULT; - if (!io_op_defs[req->opcode].manual_alloc) { + if (!def->manual_alloc) { if (io_alloc_async_data(req)) return -EAGAIN; } - return def->prep_async(req); + return cdef->prep_async(req); } static u32 io_get_sequence(struct io_kiocb *req) @@ -1760,17 +1820,12 @@ queue: } spin_unlock(&ctx->completion_lock); - ret = io_req_prep_async(req); - if (ret) { -fail: - io_req_defer_failed(req, ret); - return; - } io_prep_async_link(req); de = kmalloc(sizeof(*de), GFP_KERNEL); if (!de) { ret = -ENOMEM; - goto fail; + io_req_defer_failed(req, ret); + return; } spin_lock(&ctx->completion_lock); @@ -1796,7 +1851,7 @@ static void io_clean_op(struct io_kiocb *req) } if (req->flags & REQ_F_NEED_CLEANUP) { - const struct io_op_def *def = &io_op_defs[req->opcode]; + const struct io_cold_def *def = &io_cold_defs[req->opcode]; if (def->cleanup) def->cleanup(req); @@ -1820,9 +1875,10 @@ static void io_clean_op(struct io_kiocb *req) req->flags &= ~IO_REQ_CLEAN_FLAGS; } -static bool io_assign_file(struct io_kiocb *req, unsigned int issue_flags) +static bool io_assign_file(struct io_kiocb *req, const struct io_issue_def *def, + unsigned int issue_flags) { - if (req->file || !io_op_defs[req->opcode].needs_file) + if (req->file || !def->needs_file) return true; if (req->flags & REQ_F_FIXED_FILE) @@ -1835,11 +1891,11 @@ static bool io_assign_file(struct io_kiocb *req, unsigned int issue_flags) static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags) { - 
const struct io_op_def *def = &io_op_defs[req->opcode]; + const struct io_issue_def *def = &io_issue_defs[req->opcode]; const struct cred *creds = NULL; int ret; - if (unlikely(!io_assign_file(req, issue_flags))) + if (unlikely(!io_assign_file(req, def, issue_flags))) return -EBADF; if (unlikely((req->flags & REQ_F_CREDS) && req->creds != current_cred())) @@ -1889,7 +1945,7 @@ struct io_wq_work *io_wq_free_work(struct io_wq_work *work) void io_wq_submit_work(struct io_wq_work *work) { struct io_kiocb *req = container_of(work, struct io_kiocb, work); - const struct io_op_def *def = &io_op_defs[req->opcode]; + const struct io_issue_def *def = &io_issue_defs[req->opcode]; unsigned int issue_flags = IO_URING_F_UNLOCKED | IO_URING_F_IOWQ; bool needs_poll = false; int ret = 0, err = -ECANCELED; @@ -1908,7 +1964,7 @@ fail: io_req_task_queue_fail(req, err); return; } - if (!io_assign_file(req, issue_flags)) { + if (!io_assign_file(req, def, issue_flags)) { err = -EBADF; work->flags |= IO_WQ_WORK_CANCEL; goto fail; @@ -2043,13 +2099,16 @@ static void io_queue_sqe_fallback(struct io_kiocb *req) req->flags &= ~REQ_F_HARDLINK; req->flags |= REQ_F_LINK; io_req_defer_failed(req, req->cqe.res); - } else if (unlikely(req->ctx->drain_active)) { - io_drain_req(req); } else { int ret = io_req_prep_async(req); - if (unlikely(ret)) + if (unlikely(ret)) { io_req_defer_failed(req, ret); + return; + } + + if (unlikely(req->ctx->drain_active)) + io_drain_req(req); else io_queue_iowq(req, NULL); } @@ -2101,7 +2160,7 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req, const struct io_uring_sqe *sqe) __must_hold(&ctx->uring_lock) { - const struct io_op_def *def; + const struct io_issue_def *def; unsigned int sqe_flags; int personality; u8 opcode; @@ -2119,7 +2178,7 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req, req->opcode = 0; return -EINVAL; } - def = &io_op_defs[opcode]; + def = &io_issue_defs[opcode]; if (unlikely(sqe_flags & ~SQE_COMMON_FLAGS)) { /* enforce forwards compatibility on users */ if (sqe_flags & ~SQE_VALID_FLAGS) @@ -2330,7 +2389,7 @@ static void io_commit_sqring(struct io_ring_ctx *ctx) * used, it's important that those reads are done through READ_ONCE() to * prevent a re-load down the line. 
*/ -static const struct io_uring_sqe *io_get_sqe(struct io_ring_ctx *ctx) +static bool io_get_sqe(struct io_ring_ctx *ctx, const struct io_uring_sqe **sqe) { unsigned head, mask = ctx->sq_entries - 1; unsigned sq_idx = ctx->cached_sq_head++ & mask; @@ -2348,14 +2407,15 @@ static const struct io_uring_sqe *io_get_sqe(struct io_ring_ctx *ctx) /* double index for 128-byte SQEs, twice as long */ if (ctx->flags & IORING_SETUP_SQE128) head <<= 1; - return &ctx->sq_sqes[head]; + *sqe = &ctx->sq_sqes[head]; + return true; } /* drop invalid entries */ ctx->cq_extra--; WRITE_ONCE(ctx->rings->sq_dropped, READ_ONCE(ctx->rings->sq_dropped) + 1); - return NULL; + return false; } int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr) @@ -2376,11 +2436,9 @@ int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr) const struct io_uring_sqe *sqe; struct io_kiocb *req; - if (unlikely(!io_alloc_req_refill(ctx))) + if (unlikely(!io_alloc_req(ctx, &req))) break; - req = io_alloc_req(ctx); - sqe = io_get_sqe(ctx); - if (unlikely(!sqe)) { + if (unlikely(!io_get_sqe(ctx, &sqe))) { io_req_add_to_cache(req, ctx); break; } @@ -2415,13 +2473,13 @@ struct io_wait_queue { struct io_ring_ctx *ctx; unsigned cq_tail; unsigned nr_timeouts; + ktime_t timeout; }; static inline bool io_has_work(struct io_ring_ctx *ctx) { return test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq) || - ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) && - !llist_empty(&ctx->work_llist)); + !llist_empty(&ctx->work_llist); } static inline bool io_should_wake(struct io_wait_queue *iowq) @@ -2440,22 +2498,25 @@ static inline bool io_should_wake(struct io_wait_queue *iowq) static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode, int wake_flags, void *key) { - struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue, - wq); - struct io_ring_ctx *ctx = iowq->ctx; + struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue, wq); /* * Cannot safely flush overflowed CQEs from here, ensure we wake up * the task, and the next invocation will do it. 
*/ - if (io_should_wake(iowq) || io_has_work(ctx)) + if (io_should_wake(iowq) || io_has_work(iowq->ctx)) return autoremove_wake_function(curr, mode, wake_flags, key); return -1; } int io_run_task_work_sig(struct io_ring_ctx *ctx) { - if (io_run_task_work_ctx(ctx) > 0) + if (!llist_empty(&ctx->work_llist)) { + __set_current_state(TASK_RUNNING); + if (io_run_local_work(ctx) > 0) + return 1; + } + if (io_run_task_work() > 0) return 1; if (task_sigpending(current)) return -EINTR; @@ -2464,35 +2525,23 @@ int io_run_task_work_sig(struct io_ring_ctx *ctx) /* when returns >0, the caller should retry */ static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx, - struct io_wait_queue *iowq, - ktime_t timeout) + struct io_wait_queue *iowq) { - int ret; - unsigned long check_cq; - - /* make sure we run task_work before checking for signals */ - ret = io_run_task_work_sig(ctx); - if (ret || io_should_wake(iowq)) - return ret; - - check_cq = READ_ONCE(ctx->check_cq); - if (unlikely(check_cq)) { - /* let the caller flush overflows, retry */ - if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT)) - return 1; - if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT)) - return -EBADR; - } - if (!schedule_hrtimeout(&timeout, HRTIMER_MODE_ABS)) + if (unlikely(READ_ONCE(ctx->check_cq))) + return 1; + if (unlikely(!llist_empty(&ctx->work_llist))) + return 1; + if (unlikely(test_thread_flag(TIF_NOTIFY_SIGNAL))) + return 1; + if (unlikely(task_sigpending(current))) + return -EINTR; + if (unlikely(io_should_wake(iowq))) + return 0; + if (iowq->timeout == KTIME_MAX) + schedule(); + else if (!schedule_hrtimeout(&iowq->timeout, HRTIMER_MODE_ABS)) return -ETIME; - - /* - * Run task_work after scheduling. If we got woken because of - * task_work being processed, run it now rather than let the caller - * do another wait loop. - */ - ret = io_run_task_work_sig(ctx); - return ret < 0 ? 
ret : 1; + return 0; } /* @@ -2505,23 +2554,17 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, { struct io_wait_queue iowq; struct io_rings *rings = ctx->rings; - ktime_t timeout = KTIME_MAX; int ret; if (!io_allowed_run_tw(ctx)) return -EEXIST; - - do { - /* always run at least 1 task work to process local work */ - ret = io_run_task_work_ctx(ctx); - if (ret < 0) - return ret; - io_cqring_overflow_flush(ctx); - - /* if user messes with these they will just get an early return */ - if (__io_cqring_events_user(ctx) >= min_events) - return 0; - } while (ret > 0); + if (!llist_empty(&ctx->work_llist)) + io_run_local_work(ctx); + io_run_task_work(); + io_cqring_overflow_flush(ctx); + /* if user messes with these they will just get an early return */ + if (__io_cqring_events_user(ctx) >= min_events) + return 0; if (sig) { #ifdef CONFIG_COMPAT @@ -2536,36 +2579,69 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, return ret; } - if (uts) { - struct timespec64 ts; - - if (get_timespec64(&ts, uts)) - return -EFAULT; - timeout = ktime_add_ns(timespec64_to_ktime(ts), ktime_get_ns()); - } - init_waitqueue_func_entry(&iowq.wq, io_wake_function); iowq.wq.private = current; INIT_LIST_HEAD(&iowq.wq.entry); iowq.ctx = ctx; iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts); iowq.cq_tail = READ_ONCE(ctx->rings->cq.head) + min_events; + iowq.timeout = KTIME_MAX; + + if (uts) { + struct timespec64 ts; + + if (get_timespec64(&ts, uts)) + return -EFAULT; + iowq.timeout = ktime_add_ns(timespec64_to_ktime(ts), ktime_get_ns()); + } trace_io_uring_cqring_wait(ctx, min_events); do { - if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) { - finish_wait(&ctx->cq_wait, &iowq.wq); - io_cqring_do_overflow_flush(ctx); + unsigned long check_cq; + + if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) { + WRITE_ONCE(ctx->cq_waiting, 1); + set_current_state(TASK_INTERRUPTIBLE); + } else { + prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq, + TASK_INTERRUPTIBLE); } - prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq, - TASK_INTERRUPTIBLE); - ret = io_cqring_wait_schedule(ctx, &iowq, timeout); - if (__io_cqring_events_user(ctx) >= min_events) + + ret = io_cqring_wait_schedule(ctx, &iowq); + __set_current_state(TASK_RUNNING); + WRITE_ONCE(ctx->cq_waiting, 0); + + if (ret < 0) break; + /* + * Run task_work after scheduling and before io_should_wake(). + * If we got woken because of task_work being processed, run it + * now rather than let the caller do another wait loop. + */ + io_run_task_work(); + if (!llist_empty(&ctx->work_llist)) + io_run_local_work(ctx); + + check_cq = READ_ONCE(ctx->check_cq); + if (unlikely(check_cq)) { + /* let the caller flush overflows, retry */ + if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT)) + io_cqring_do_overflow_flush(ctx); + if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT)) { + ret = -EBADR; + break; + } + } + + if (io_should_wake(&iowq)) { + ret = 0; + break; + } cond_resched(); - } while (ret > 0); + } while (1); - finish_wait(&ctx->cq_wait, &iowq.wq); + if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN)) + finish_wait(&ctx->cq_wait, &iowq.wq); restore_saved_sigmask_unless(ret == -EINTR); return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? 
ret : 0; @@ -2680,14 +2756,14 @@ static int io_eventfd_unregister(struct io_ring_ctx *ctx) static void io_req_caches_free(struct io_ring_ctx *ctx) { + struct io_kiocb *req; int nr = 0; mutex_lock(&ctx->uring_lock); io_flush_cached_locked_reqs(ctx, &ctx->submit_state); while (!io_req_cache_empty(ctx)) { - struct io_kiocb *req = io_alloc_req(ctx); - + req = io_extract_req(ctx); kmem_cache_free(req_cachep, req); nr++; } @@ -2759,12 +2835,54 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx) kfree(ctx); } +static __cold void io_activate_pollwq_cb(struct callback_head *cb) +{ + struct io_ring_ctx *ctx = container_of(cb, struct io_ring_ctx, + poll_wq_task_work); + + mutex_lock(&ctx->uring_lock); + ctx->poll_activated = true; + mutex_unlock(&ctx->uring_lock); + + /* + * Wake ups for some events between start of polling and activation + * might've been lost due to loose synchronisation. + */ + wake_up_all(&ctx->poll_wq); + percpu_ref_put(&ctx->refs); +} + +static __cold void io_activate_pollwq(struct io_ring_ctx *ctx) +{ + spin_lock(&ctx->completion_lock); + /* already activated or in progress */ + if (ctx->poll_activated || ctx->poll_wq_task_work.func) + goto out; + if (WARN_ON_ONCE(!ctx->task_complete)) + goto out; + if (!ctx->submitter_task) + goto out; + /* + * with ->submitter_task only the submitter task completes requests, we + * only need to sync with it, which is done by injecting a tw + */ + init_task_work(&ctx->poll_wq_task_work, io_activate_pollwq_cb); + percpu_ref_get(&ctx->refs); + if (task_work_add(ctx->submitter_task, &ctx->poll_wq_task_work, TWA_SIGNAL)) + percpu_ref_put(&ctx->refs); +out: + spin_unlock(&ctx->completion_lock); +} + static __poll_t io_uring_poll(struct file *file, poll_table *wait) { struct io_ring_ctx *ctx = file->private_data; __poll_t mask = 0; - poll_wait(file, &ctx->cq_wait, wait); + if (unlikely(!ctx->poll_activated)) + io_activate_pollwq(ctx); + + poll_wait(file, &ctx->poll_wq, wait); /* * synchronizes with barrier from wq_has_sleeper call in * io_commit_cqring @@ -2787,7 +2905,7 @@ static __poll_t io_uring_poll(struct file *file, poll_table *wait) * pushes them to do the flush. 
*/ - if (io_cqring_events(ctx) || io_has_work(ctx)) + if (__io_cqring_events_user(ctx) || io_has_work(ctx)) mask |= EPOLLIN | EPOLLRDNORM; return mask; @@ -3050,10 +3168,12 @@ static __cold bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx, while (!wq_list_empty(&ctx->iopoll_list)) { io_iopoll_try_reap_events(ctx); ret = true; + cond_resched(); } } - if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) + if ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) && + io_allowed_defer_tw_run(ctx)) ret |= io_run_local_work(ctx) > 0; ret |= io_cancel_defer_files(ctx, task, cancel_all); mutex_lock(&ctx->uring_lock); @@ -3325,11 +3445,9 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit, } if (flags & IORING_ENTER_SQ_WAKEUP) wake_up(&ctx->sq_data->wait); - if (flags & IORING_ENTER_SQ_WAIT) { - ret = io_sqpoll_wait_sq(ctx); - if (ret) - goto out; - } + if (flags & IORING_ENTER_SQ_WAIT) + io_sqpoll_wait_sq(ctx); + ret = to_submit; } else if (to_submit) { ret = io_uring_add_tctx_node(ctx); @@ -3570,6 +3688,13 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p, ctx->task_complete = true; /* + * lazy poll_wq activation relies on ->task_complete for synchronisation + * purposes, see io_activate_pollwq() + */ + if (!ctx->task_complete) + ctx->poll_activated = true; + + /* * When SETUP_IOPOLL and SETUP_SQPOLL are both enabled, user * space applications don't need to do io completion events * polling again, they can rely on io_sq_thread to do polling @@ -3660,7 +3785,7 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p, IORING_FEAT_POLL_32BITS | IORING_FEAT_SQPOLL_NONFIXED | IORING_FEAT_EXT_ARG | IORING_FEAT_NATIVE_WORKERS | IORING_FEAT_RSRC_TAGS | IORING_FEAT_CQE_SKIP | - IORING_FEAT_LINKED_FILE; + IORING_FEAT_LINKED_FILE | IORING_FEAT_REG_REG_RING; if (copy_to_user(params, p, sizeof(*p))) { ret = -EFAULT; @@ -3669,7 +3794,7 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p, if (ctx->flags & IORING_SETUP_SINGLE_ISSUER && !(ctx->flags & IORING_SETUP_R_DISABLED)) - ctx->submitter_task = get_task_struct(current); + WRITE_ONCE(ctx->submitter_task, get_task_struct(current)); file = io_uring_get_file(ctx); if (IS_ERR(file)) { @@ -3757,7 +3882,7 @@ static __cold int io_probe(struct io_ring_ctx *ctx, void __user *arg, for (i = 0; i < nr_args; i++) { p->ops[i].op = i; - if (!io_op_defs[i].not_supported) + if (!io_issue_defs[i].not_supported) p->ops[i].flags = IO_URING_OP_SUPPORTED; } p->ops_len = i; @@ -3862,8 +3987,15 @@ static int io_register_enable_rings(struct io_ring_ctx *ctx) if (!(ctx->flags & IORING_SETUP_R_DISABLED)) return -EBADFD; - if (ctx->flags & IORING_SETUP_SINGLE_ISSUER && !ctx->submitter_task) - ctx->submitter_task = get_task_struct(current); + if (ctx->flags & IORING_SETUP_SINGLE_ISSUER && !ctx->submitter_task) { + WRITE_ONCE(ctx->submitter_task, get_task_struct(current)); + /* + * Lazy activation attempts would fail if it was polled before + * submitter_task is set. 
+ */ + if (wq_has_sleeper(&ctx->poll_wq)) + io_activate_pollwq(ctx); + } if (ctx->restrictions.registered) ctx->restricted = 1; @@ -4174,17 +4306,36 @@ SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode, struct io_ring_ctx *ctx; long ret = -EBADF; struct fd f; + bool use_registered_ring; + + use_registered_ring = !!(opcode & IORING_REGISTER_USE_REGISTERED_RING); + opcode &= ~IORING_REGISTER_USE_REGISTERED_RING; if (opcode >= IORING_REGISTER_LAST) return -EINVAL; - f = fdget(fd); - if (!f.file) - return -EBADF; + if (use_registered_ring) { + /* + * Ring fd has been registered via IORING_REGISTER_RING_FDS, we + * need only dereference our task private array to find it. + */ + struct io_uring_task *tctx = current->io_uring; - ret = -EOPNOTSUPP; - if (!io_is_uring_fops(f.file)) - goto out_fput; + if (unlikely(!tctx || fd >= IO_RINGFD_REG_MAX)) + return -EINVAL; + fd = array_index_nospec(fd, IO_RINGFD_REG_MAX); + f.file = tctx->registered_rings[fd]; + f.flags = 0; + if (unlikely(!f.file)) + return -EBADF; + } else { + f = fdget(fd); + if (unlikely(!f.file)) + return -EBADF; + ret = -EOPNOTSUPP; + if (!io_is_uring_fops(f.file)) + goto out_fput; + } ctx = f.file->private_data; |
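The final hunk above teaches io_uring_register() to accept IORING_REGISTER_USE_REGISTERED_RING, so a task can address the ring through its registered-ring-fd index instead of a normal file descriptor (advertised to userspace by the new IORING_FEAT_REG_REG_RING feature bit added earlier in the diff). As a rough, unofficial sketch of how userspace might exercise that path, assuming the ring was registered beforehand with IORING_REGISTER_RING_FDS and that ring_index is the slot it was assigned:

/*
 * Illustrative only, not part of the commit. Assumes a uapi
 * <linux/io_uring.h> that defines IORING_REGISTER_USE_REGISTERED_RING,
 * and that ring_index was obtained via IORING_REGISTER_RING_FDS.
 */
#include <linux/io_uring.h>
#include <sys/syscall.h>
#include <unistd.h>

static long io_uring_register_via_index(unsigned int ring_index,
					unsigned int opcode,
					void *arg, unsigned int nr_args)
{
	/*
	 * The kernel masks the flag off, then treats "fd" as an index into
	 * the task's registered_rings[] array (see the hunk above) rather
	 * than going through fdget().
	 */
	return syscall(__NR_io_uring_register, ring_index,
		       opcode | IORING_REGISTER_USE_REGISTERED_RING,
		       arg, nr_args);
}

/* e.g. enable a ring that was created with IORING_SETUP_R_DISABLED: */
/* io_uring_register_via_index(idx, IORING_REGISTER_ENABLE_RINGS, NULL, 0); */

This mirrors the io_register_enable_rings() hunk in the diff, which is also where the lazily activated poll_wq gets woken if something polled the ring before submitter_task was set.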