From 2d0abe36cf13fb7b577949fd1539326adddcc9bc Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Wed, 19 Jun 2019 10:32:38 -0400 Subject: xprtrdma: Fix use-after-free in rpcrdma_post_recvs Dereference wr->next /before/ the memory backing wr has been released. This issue was found by code inspection. It is not expected to be a significant problem because it is in an error path that is almost never executed. Fixes: 7c8d9e7c8863 ("xprtrdma: Move Receive posting to ... ") Signed-off-by: Chuck Lever Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/verbs.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 84bb37924540..e71315e9dca2 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -1553,10 +1553,11 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp) rc = ib_post_recv(r_xprt->rx_ia.ri_id->qp, wr, (const struct ib_recv_wr **)&bad_wr); if (rc) { - for (wr = bad_wr; wr; wr = wr->next) { + for (wr = bad_wr; wr;) { struct rpcrdma_rep *rep; rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr); + wr = wr->next; rpcrdma_recv_buffer_put(rep); --count; } -- cgit v1.2.3 From 7e0a0e38fcfea47e74b0ff6da6266f00bcd2af43 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Wed, 1 May 2019 10:49:27 -0400 Subject: SUNRPC: Replace the queue timer with a delayed work function The queue timer function, which walks the RPC queue in order to locate candidates for waking up is one of the current constraints against removing the bh-safe queue spin locks. Replace it with a delayed work queue, so that we can do the actual rpc task wake ups from an ordinary process context. Signed-off-by: Trond Myklebust --- include/linux/sunrpc/sched.h | 3 ++- net/sunrpc/sched.c | 30 ++++++++++++++++++++---------- 2 files changed, 22 insertions(+), 11 deletions(-) (limited to 'net') diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h index d0e451868f02..7d8db5dcac04 100644 --- a/include/linux/sunrpc/sched.h +++ b/include/linux/sunrpc/sched.h @@ -183,8 +183,9 @@ struct rpc_task_setup { #define RPC_NR_PRIORITY (1 + RPC_PRIORITY_PRIVILEGED - RPC_PRIORITY_LOW) struct rpc_timer { - struct timer_list timer; struct list_head list; + unsigned long expires; + struct delayed_work dwork; }; /* diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index a2c114812717..e0a0cf381eba 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -46,7 +46,7 @@ static mempool_t *rpc_buffer_mempool __read_mostly; static void rpc_async_schedule(struct work_struct *); static void rpc_release_task(struct rpc_task *task); -static void __rpc_queue_timer_fn(struct timer_list *t); +static void __rpc_queue_timer_fn(struct work_struct *); /* * RPC tasks sit here while waiting for conditions to improve. @@ -87,13 +87,19 @@ __rpc_disable_timer(struct rpc_wait_queue *queue, struct rpc_task *task) task->tk_timeout = 0; list_del(&task->u.tk_wait.timer_list); if (list_empty(&queue->timer_list.list)) - del_timer(&queue->timer_list.timer); + cancel_delayed_work(&queue->timer_list.dwork); } static void rpc_set_queue_timer(struct rpc_wait_queue *queue, unsigned long expires) { - timer_reduce(&queue->timer_list.timer, expires); + unsigned long now = jiffies; + queue->timer_list.expires = expires; + if (time_before_eq(expires, now)) + expires = 0; + else + expires -= now; + mod_delayed_work(rpciod_workqueue, &queue->timer_list.dwork, expires); } /* @@ -107,7 +113,8 @@ __rpc_add_timer(struct rpc_wait_queue *queue, struct rpc_task *task, task->tk_pid, jiffies_to_msecs(timeout - jiffies)); task->tk_timeout = timeout; - rpc_set_queue_timer(queue, timeout); + if (list_empty(&queue->timer_list.list) || time_before(timeout, queue->timer_list.expires)) + rpc_set_queue_timer(queue, timeout); list_add(&task->u.tk_wait.timer_list, &queue->timer_list.list); } @@ -250,7 +257,8 @@ static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const c queue->maxpriority = nr_queues - 1; rpc_reset_waitqueue_priority(queue); queue->qlen = 0; - timer_setup(&queue->timer_list.timer, __rpc_queue_timer_fn, 0); + queue->timer_list.expires = 0; + INIT_DEFERRABLE_WORK(&queue->timer_list.dwork, __rpc_queue_timer_fn); INIT_LIST_HEAD(&queue->timer_list.list); rpc_assign_waitqueue_name(queue, qname); } @@ -269,7 +277,7 @@ EXPORT_SYMBOL_GPL(rpc_init_wait_queue); void rpc_destroy_wait_queue(struct rpc_wait_queue *queue) { - del_timer_sync(&queue->timer_list.timer); + cancel_delayed_work_sync(&queue->timer_list.dwork); } EXPORT_SYMBOL_GPL(rpc_destroy_wait_queue); @@ -759,13 +767,15 @@ void rpc_wake_up_status(struct rpc_wait_queue *queue, int status) } EXPORT_SYMBOL_GPL(rpc_wake_up_status); -static void __rpc_queue_timer_fn(struct timer_list *t) +static void __rpc_queue_timer_fn(struct work_struct *work) { - struct rpc_wait_queue *queue = from_timer(queue, t, timer_list.timer); + struct rpc_wait_queue *queue = container_of(work, + struct rpc_wait_queue, + timer_list.dwork.work); struct rpc_task *task, *n; unsigned long expires, now, timeo; - spin_lock(&queue->lock); + spin_lock_bh(&queue->lock); expires = now = jiffies; list_for_each_entry_safe(task, n, &queue->timer_list.list, u.tk_wait.timer_list) { timeo = task->tk_timeout; @@ -780,7 +790,7 @@ static void __rpc_queue_timer_fn(struct timer_list *t) } if (!list_empty(&queue->timer_list.list)) rpc_set_queue_timer(queue, expires); - spin_unlock(&queue->lock); + spin_unlock_bh(&queue->lock); } static void __rpc_atrun(struct rpc_task *task) -- cgit v1.2.3 From 4f8943f8088348ec01456b075d44ad19dce3d698 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Wed, 1 May 2019 16:28:29 -0400 Subject: SUNRPC: Replace direct task wakeups from softirq context Replace the direct task wakeups from inside a softirq context with wakeups from a process context. Signed-off-by: Trond Myklebust --- include/linux/sunrpc/xprtsock.h | 5 +++ net/sunrpc/xprtsock.c | 78 +++++++++++++++++++++++++++++++++++++---- 2 files changed, 77 insertions(+), 6 deletions(-) (limited to 'net') diff --git a/include/linux/sunrpc/xprtsock.h b/include/linux/sunrpc/xprtsock.h index b81d0b3e0799..7638dbe7bc50 100644 --- a/include/linux/sunrpc/xprtsock.h +++ b/include/linux/sunrpc/xprtsock.h @@ -56,6 +56,7 @@ struct sock_xprt { */ unsigned long sock_state; struct delayed_work connect_worker; + struct work_struct error_worker; struct work_struct recv_worker; struct mutex recv_mutex; struct sockaddr_storage srcaddr; @@ -84,6 +85,10 @@ struct sock_xprt { #define XPRT_SOCK_CONNECTING 1U #define XPRT_SOCK_DATA_READY (2) #define XPRT_SOCK_UPD_TIMEOUT (3) +#define XPRT_SOCK_WAKE_ERROR (4) +#define XPRT_SOCK_WAKE_WRITE (5) +#define XPRT_SOCK_WAKE_PENDING (6) +#define XPRT_SOCK_WAKE_DISCONNECT (7) #endif /* __KERNEL__ */ diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 36652352a38c..92af57019b96 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -1211,6 +1211,15 @@ static void xs_sock_reset_state_flags(struct rpc_xprt *xprt) struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state); + clear_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state); + clear_bit(XPRT_SOCK_WAKE_WRITE, &transport->sock_state); + clear_bit(XPRT_SOCK_WAKE_DISCONNECT, &transport->sock_state); +} + +static void xs_run_error_worker(struct sock_xprt *transport, unsigned int nr) +{ + set_bit(nr, &transport->sock_state); + queue_work(xprtiod_workqueue, &transport->error_worker); } static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt) @@ -1231,6 +1240,7 @@ static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt) */ static void xs_error_report(struct sock *sk) { + struct sock_xprt *transport; struct rpc_xprt *xprt; int err; @@ -1238,13 +1248,14 @@ static void xs_error_report(struct sock *sk) if (!(xprt = xprt_from_sock(sk))) goto out; + transport = container_of(xprt, struct sock_xprt, xprt); err = -sk->sk_err; if (err == 0) goto out; dprintk("RPC: xs_error_report client %p, error=%d...\n", xprt, -err); trace_rpc_socket_error(xprt, sk->sk_socket, err); - xprt_wake_pending_tasks(xprt, err); + xs_run_error_worker(transport, XPRT_SOCK_WAKE_ERROR); out: read_unlock_bh(&sk->sk_callback_lock); } @@ -1507,7 +1518,7 @@ static void xs_tcp_state_change(struct sock *sk) xprt->stat.connect_count++; xprt->stat.connect_time += (long)jiffies - xprt->stat.connect_start; - xprt_wake_pending_tasks(xprt, -EAGAIN); + xs_run_error_worker(transport, XPRT_SOCK_WAKE_PENDING); } spin_unlock(&xprt->transport_lock); break; @@ -1525,7 +1536,7 @@ static void xs_tcp_state_change(struct sock *sk) /* The server initiated a shutdown of the socket */ xprt->connect_cookie++; clear_bit(XPRT_CONNECTED, &xprt->state); - xs_tcp_force_close(xprt); + xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT); /* fall through */ case TCP_CLOSING: /* @@ -1547,7 +1558,7 @@ static void xs_tcp_state_change(struct sock *sk) xprt_clear_connecting(xprt); clear_bit(XPRT_CLOSING, &xprt->state); /* Trigger the socket release */ - xs_tcp_force_close(xprt); + xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT); } out: read_unlock_bh(&sk->sk_callback_lock); @@ -1556,6 +1567,7 @@ static void xs_tcp_state_change(struct sock *sk) static void xs_write_space(struct sock *sk) { struct socket_wq *wq; + struct sock_xprt *transport; struct rpc_xprt *xprt; if (!sk->sk_socket) @@ -1564,13 +1576,14 @@ static void xs_write_space(struct sock *sk) if (unlikely(!(xprt = xprt_from_sock(sk)))) return; + transport = container_of(xprt, struct sock_xprt, xprt); rcu_read_lock(); wq = rcu_dereference(sk->sk_wq); if (!wq || test_and_clear_bit(SOCKWQ_ASYNC_NOSPACE, &wq->flags) == 0) goto out; - if (xprt_write_space(xprt)) - sk->sk_write_pending--; + xs_run_error_worker(transport, XPRT_SOCK_WAKE_WRITE); + sk->sk_write_pending--; out: rcu_read_unlock(); } @@ -2461,6 +2474,56 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task) delay); } +static void xs_wake_disconnect(struct sock_xprt *transport) +{ + if (test_and_clear_bit(XPRT_SOCK_WAKE_DISCONNECT, &transport->sock_state)) + xs_tcp_force_close(&transport->xprt); +} + +static void xs_wake_write(struct sock_xprt *transport) +{ + if (test_and_clear_bit(XPRT_SOCK_WAKE_WRITE, &transport->sock_state)) + xprt_write_space(&transport->xprt); +} + +static void xs_wake_error(struct sock_xprt *transport) +{ + int sockerr; + int sockerr_len = sizeof(sockerr); + + if (!test_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state)) + return; + mutex_lock(&transport->recv_mutex); + if (transport->sock == NULL) + goto out; + if (!test_and_clear_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state)) + goto out; + if (kernel_getsockopt(transport->sock, SOL_SOCKET, SO_ERROR, + (char *)&sockerr, &sockerr_len) != 0) + goto out; + if (sockerr < 0) + xprt_wake_pending_tasks(&transport->xprt, sockerr); +out: + mutex_unlock(&transport->recv_mutex); +} + +static void xs_wake_pending(struct sock_xprt *transport) +{ + if (test_and_clear_bit(XPRT_SOCK_WAKE_PENDING, &transport->sock_state)) + xprt_wake_pending_tasks(&transport->xprt, -EAGAIN); +} + +static void xs_error_handle(struct work_struct *work) +{ + struct sock_xprt *transport = container_of(work, + struct sock_xprt, error_worker); + + xs_wake_disconnect(transport); + xs_wake_write(transport); + xs_wake_error(transport); + xs_wake_pending(transport); +} + /** * xs_local_print_stats - display AF_LOCAL socket-specifc stats * @xprt: rpc_xprt struct containing statistics @@ -2873,6 +2936,7 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args) xprt->timeout = &xs_local_default_timeout; INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn); + INIT_WORK(&transport->error_worker, xs_error_handle); INIT_DELAYED_WORK(&transport->connect_worker, xs_dummy_setup_socket); switch (sun->sun_family) { @@ -2943,6 +3007,7 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args) xprt->timeout = &xs_udp_default_timeout; INIT_WORK(&transport->recv_worker, xs_udp_data_receive_workfn); + INIT_WORK(&transport->error_worker, xs_error_handle); INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_setup_socket); switch (addr->sa_family) { @@ -3024,6 +3089,7 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args) (xprt->timeout->to_retries + 1); INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn); + INIT_WORK(&transport->error_worker, xs_error_handle); INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_setup_socket); switch (addr->sa_family) { -- cgit v1.2.3 From b5e924191f87239e555f3ef3b8d8e697bb95e7dc Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Thu, 2 May 2019 11:21:08 -0400 Subject: SUNRPC: Remove the bh-safe lock requirement on xprt->transport_lock Signed-off-by: Trond Myklebust --- net/sunrpc/xprt.c | 61 ++++++++++++++---------------- net/sunrpc/xprtrdma/rpc_rdma.c | 4 +- net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 4 +- net/sunrpc/xprtrdma/svc_rdma_transport.c | 8 ++-- net/sunrpc/xprtsock.c | 23 ++++++----- 5 files changed, 47 insertions(+), 53 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index f6c82b1651e7..8d41fcf25650 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -302,9 +302,9 @@ static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task) return 1; - spin_lock_bh(&xprt->transport_lock); + spin_lock(&xprt->transport_lock); retval = xprt->ops->reserve_xprt(xprt, task); - spin_unlock_bh(&xprt->transport_lock); + spin_unlock(&xprt->transport_lock); return retval; } @@ -381,9 +381,9 @@ static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *ta { if (xprt->snd_task != task) return; - spin_lock_bh(&xprt->transport_lock); + spin_lock(&xprt->transport_lock); xprt->ops->release_xprt(xprt, task); - spin_unlock_bh(&xprt->transport_lock); + spin_unlock(&xprt->transport_lock); } /* @@ -435,9 +435,9 @@ xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) if (req->rq_cong) return true; - spin_lock_bh(&xprt->transport_lock); + spin_lock(&xprt->transport_lock); ret = __xprt_get_cong(xprt, req) != 0; - spin_unlock_bh(&xprt->transport_lock); + spin_unlock(&xprt->transport_lock); return ret; } EXPORT_SYMBOL_GPL(xprt_request_get_cong); @@ -464,9 +464,9 @@ static void xprt_clear_congestion_window_wait(struct rpc_xprt *xprt) { if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) { - spin_lock_bh(&xprt->transport_lock); + spin_lock(&xprt->transport_lock); __xprt_lock_write_next_cong(xprt); - spin_unlock_bh(&xprt->transport_lock); + spin_unlock(&xprt->transport_lock); } } @@ -563,9 +563,9 @@ bool xprt_write_space(struct rpc_xprt *xprt) if (!test_bit(XPRT_WRITE_SPACE, &xprt->state)) return false; - spin_lock_bh(&xprt->transport_lock); + spin_lock(&xprt->transport_lock); ret = xprt_clear_write_space_locked(xprt); - spin_unlock_bh(&xprt->transport_lock); + spin_unlock(&xprt->transport_lock); return ret; } EXPORT_SYMBOL_GPL(xprt_write_space); @@ -634,9 +634,9 @@ int xprt_adjust_timeout(struct rpc_rqst *req) req->rq_retries = 0; xprt_reset_majortimeo(req); /* Reset the RTT counters == "slow start" */ - spin_lock_bh(&xprt->transport_lock); + spin_lock(&xprt->transport_lock); rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval); - spin_unlock_bh(&xprt->transport_lock); + spin_unlock(&xprt->transport_lock); status = -ETIMEDOUT; } @@ -668,11 +668,11 @@ static void xprt_autoclose(struct work_struct *work) void xprt_disconnect_done(struct rpc_xprt *xprt) { dprintk("RPC: disconnected transport %p\n", xprt); - spin_lock_bh(&xprt->transport_lock); + spin_lock(&xprt->transport_lock); xprt_clear_connected(xprt); xprt_clear_write_space_locked(xprt); xprt_wake_pending_tasks(xprt, -ENOTCONN); - spin_unlock_bh(&xprt->transport_lock); + spin_unlock(&xprt->transport_lock); } EXPORT_SYMBOL_GPL(xprt_disconnect_done); @@ -684,7 +684,7 @@ EXPORT_SYMBOL_GPL(xprt_disconnect_done); void xprt_force_disconnect(struct rpc_xprt *xprt) { /* Don't race with the test_bit() in xprt_clear_locked() */ - spin_lock_bh(&xprt->transport_lock); + spin_lock(&xprt->transport_lock); set_bit(XPRT_CLOSE_WAIT, &xprt->state); /* Try to schedule an autoclose RPC call */ if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) @@ -692,7 +692,7 @@ void xprt_force_disconnect(struct rpc_xprt *xprt) else if (xprt->snd_task) rpc_wake_up_queued_task_set_status(&xprt->pending, xprt->snd_task, -ENOTCONN); - spin_unlock_bh(&xprt->transport_lock); + spin_unlock(&xprt->transport_lock); } EXPORT_SYMBOL_GPL(xprt_force_disconnect); @@ -726,7 +726,7 @@ xprt_request_retransmit_after_disconnect(struct rpc_task *task) void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie) { /* Don't race with the test_bit() in xprt_clear_locked() */ - spin_lock_bh(&xprt->transport_lock); + spin_lock(&xprt->transport_lock); if (cookie != xprt->connect_cookie) goto out; if (test_bit(XPRT_CLOSING, &xprt->state)) @@ -737,7 +737,7 @@ void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie) queue_work(xprtiod_workqueue, &xprt->task_cleanup); xprt_wake_pending_tasks(xprt, -EAGAIN); out: - spin_unlock_bh(&xprt->transport_lock); + spin_unlock(&xprt->transport_lock); } static bool @@ -759,18 +759,13 @@ xprt_init_autodisconnect(struct timer_list *t) { struct rpc_xprt *xprt = from_timer(xprt, t, timer); - spin_lock(&xprt->transport_lock); if (!RB_EMPTY_ROOT(&xprt->recv_queue)) - goto out_abort; + return; /* Reset xprt->last_used to avoid connect/autodisconnect cycling */ xprt->last_used = jiffies; if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) - goto out_abort; - spin_unlock(&xprt->transport_lock); + return; queue_work(xprtiod_workqueue, &xprt->task_cleanup); - return; -out_abort: - spin_unlock(&xprt->transport_lock); } bool xprt_lock_connect(struct rpc_xprt *xprt, @@ -779,7 +774,7 @@ bool xprt_lock_connect(struct rpc_xprt *xprt, { bool ret = false; - spin_lock_bh(&xprt->transport_lock); + spin_lock(&xprt->transport_lock); if (!test_bit(XPRT_LOCKED, &xprt->state)) goto out; if (xprt->snd_task != task) @@ -787,13 +782,13 @@ bool xprt_lock_connect(struct rpc_xprt *xprt, xprt->snd_task = cookie; ret = true; out: - spin_unlock_bh(&xprt->transport_lock); + spin_unlock(&xprt->transport_lock); return ret; } void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie) { - spin_lock_bh(&xprt->transport_lock); + spin_lock(&xprt->transport_lock); if (xprt->snd_task != cookie) goto out; if (!test_bit(XPRT_LOCKED, &xprt->state)) @@ -802,7 +797,7 @@ void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie) xprt->ops->release_xprt(xprt, NULL); xprt_schedule_autodisconnect(xprt); out: - spin_unlock_bh(&xprt->transport_lock); + spin_unlock(&xprt->transport_lock); wake_up_bit(&xprt->state, XPRT_LOCKED); } @@ -1412,14 +1407,14 @@ xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task) xprt_inject_disconnect(xprt); task->tk_flags |= RPC_TASK_SENT; - spin_lock_bh(&xprt->transport_lock); + spin_lock(&xprt->transport_lock); xprt->stat.sends++; xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs; xprt->stat.bklog_u += xprt->backlog.qlen; xprt->stat.sending_u += xprt->sending.qlen; xprt->stat.pending_u += xprt->pending.qlen; - spin_unlock_bh(&xprt->transport_lock); + spin_unlock(&xprt->transport_lock); req->rq_connect_cookie = connect_cookie; out_dequeue: @@ -1770,13 +1765,13 @@ void xprt_release(struct rpc_task *task) else if (task->tk_client) rpc_count_iostats(task, task->tk_client->cl_metrics); xprt_request_dequeue_all(task, req); - spin_lock_bh(&xprt->transport_lock); + spin_lock(&xprt->transport_lock); xprt->ops->release_xprt(xprt, task); if (xprt->ops->release_request) xprt->ops->release_request(task); xprt->last_used = jiffies; xprt_schedule_autodisconnect(xprt); - spin_unlock_bh(&xprt->transport_lock); + spin_unlock(&xprt->transport_lock); if (req->rq_buffer) xprt->ops->buf_free(task); xprt_inject_disconnect(xprt); diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 85115a2e2639..7dc62e55f526 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -1360,10 +1360,10 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep) else if (credits > buf->rb_max_requests) credits = buf->rb_max_requests; if (buf->rb_credits != credits) { - spin_lock_bh(&xprt->transport_lock); + spin_lock(&xprt->transport_lock); buf->rb_credits = credits; xprt->cwnd = credits << RPC_CWNDSHIFT; - spin_unlock_bh(&xprt->transport_lock); + spin_unlock(&xprt->transport_lock); } req = rpcr_to_rdmar(rqst); diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c index bed57d8b5c19..d1fcc41d5eb5 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c +++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c @@ -72,9 +72,9 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp, else if (credits > r_xprt->rx_buf.rb_bc_max_requests) credits = r_xprt->rx_buf.rb_bc_max_requests; - spin_lock_bh(&xprt->transport_lock); + spin_lock(&xprt->transport_lock); xprt->cwnd = credits << RPC_CWNDSHIFT; - spin_unlock_bh(&xprt->transport_lock); + spin_unlock(&xprt->transport_lock); spin_lock(&xprt->queue_lock); ret = 0; diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c index 0004535c0188..3fe665152d95 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c @@ -226,9 +226,9 @@ static void handle_connect_req(struct rdma_cm_id *new_cma_id, * Enqueue the new transport on the accept queue of the listening * transport */ - spin_lock_bh(&listen_xprt->sc_lock); + spin_lock(&listen_xprt->sc_lock); list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q); - spin_unlock_bh(&listen_xprt->sc_lock); + spin_unlock(&listen_xprt->sc_lock); set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags); svc_xprt_enqueue(&listen_xprt->sc_xprt); @@ -401,7 +401,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt); clear_bit(XPT_CONN, &xprt->xpt_flags); /* Get the next entry off the accept list */ - spin_lock_bh(&listen_rdma->sc_lock); + spin_lock(&listen_rdma->sc_lock); if (!list_empty(&listen_rdma->sc_accept_q)) { newxprt = list_entry(listen_rdma->sc_accept_q.next, struct svcxprt_rdma, sc_accept_q); @@ -409,7 +409,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) } if (!list_empty(&listen_rdma->sc_accept_q)) set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags); - spin_unlock_bh(&listen_rdma->sc_lock); + spin_unlock(&listen_rdma->sc_lock); if (!newxprt) return NULL; diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 92af57019b96..97c15d47f343 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -880,7 +880,7 @@ static int xs_nospace(struct rpc_rqst *req) req->rq_slen); /* Protect against races with write_space */ - spin_lock_bh(&xprt->transport_lock); + spin_lock(&xprt->transport_lock); /* Don't race with disconnect */ if (xprt_connected(xprt)) { @@ -890,7 +890,7 @@ static int xs_nospace(struct rpc_rqst *req) } else ret = -ENOTCONN; - spin_unlock_bh(&xprt->transport_lock); + spin_unlock(&xprt->transport_lock); /* Race breaker in case memory is freed before above code is called */ if (ret == -EAGAIN) { @@ -1344,6 +1344,7 @@ static void xs_destroy(struct rpc_xprt *xprt) cancel_delayed_work_sync(&transport->connect_worker); xs_close(xprt); cancel_work_sync(&transport->recv_worker); + cancel_work_sync(&transport->error_worker); xs_xprt_free(xprt); module_put(THIS_MODULE); } @@ -1397,9 +1398,9 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt, } - spin_lock_bh(&xprt->transport_lock); + spin_lock(&xprt->transport_lock); xprt_adjust_cwnd(xprt, task, copied); - spin_unlock_bh(&xprt->transport_lock); + spin_unlock(&xprt->transport_lock); spin_lock(&xprt->queue_lock); xprt_complete_rqst(task, copied); __UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS); @@ -1509,7 +1510,6 @@ static void xs_tcp_state_change(struct sock *sk) trace_rpc_socket_state_change(xprt, sk->sk_socket); switch (sk->sk_state) { case TCP_ESTABLISHED: - spin_lock(&xprt->transport_lock); if (!xprt_test_and_set_connected(xprt)) { xprt->connect_cookie++; clear_bit(XPRT_SOCK_CONNECTING, &transport->sock_state); @@ -1520,7 +1520,6 @@ static void xs_tcp_state_change(struct sock *sk) xprt->stat.connect_start; xs_run_error_worker(transport, XPRT_SOCK_WAKE_PENDING); } - spin_unlock(&xprt->transport_lock); break; case TCP_FIN_WAIT1: /* The client initiated a shutdown of the socket */ @@ -1677,9 +1676,9 @@ static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t */ static void xs_udp_timer(struct rpc_xprt *xprt, struct rpc_task *task) { - spin_lock_bh(&xprt->transport_lock); + spin_lock(&xprt->transport_lock); xprt_adjust_cwnd(xprt, task, -ETIMEDOUT); - spin_unlock_bh(&xprt->transport_lock); + spin_unlock(&xprt->transport_lock); } static int xs_get_random_port(void) @@ -2214,13 +2213,13 @@ static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt, unsigned int opt_on = 1; unsigned int timeo; - spin_lock_bh(&xprt->transport_lock); + spin_lock(&xprt->transport_lock); keepidle = DIV_ROUND_UP(xprt->timeout->to_initval, HZ); keepcnt = xprt->timeout->to_retries + 1; timeo = jiffies_to_msecs(xprt->timeout->to_initval) * (xprt->timeout->to_retries + 1); clear_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state); - spin_unlock_bh(&xprt->transport_lock); + spin_unlock(&xprt->transport_lock); /* TCP Keepalive options */ kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, @@ -2245,7 +2244,7 @@ static void xs_tcp_set_connect_timeout(struct rpc_xprt *xprt, struct rpc_timeout to; unsigned long initval; - spin_lock_bh(&xprt->transport_lock); + spin_lock(&xprt->transport_lock); if (reconnect_timeout < xprt->max_reconnect_timeout) xprt->max_reconnect_timeout = reconnect_timeout; if (connect_timeout < xprt->connect_timeout) { @@ -2262,7 +2261,7 @@ static void xs_tcp_set_connect_timeout(struct rpc_xprt *xprt, xprt->connect_timeout = connect_timeout; } set_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state); - spin_unlock_bh(&xprt->transport_lock); + spin_unlock(&xprt->transport_lock); } static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) -- cgit v1.2.3 From 21f0ffaff510b0530bfdf77da7133c0b99dee2fe Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Fri, 28 Apr 2017 10:52:42 -0400 Subject: SUNRPC: Add basic load balancing to the transport switch For now, just count the queue length. It is less accurate than counting number of bytes queued, but easier to implement. Signed-off-by: Trond Myklebust --- include/linux/sunrpc/xprt.h | 1 + include/linux/sunrpc/xprtmultipath.h | 2 ++ net/sunrpc/clnt.c | 40 +++++++++++++++++++++++++++++++++--- net/sunrpc/xprtmultipath.c | 20 +++++++++++++++++- 4 files changed, 59 insertions(+), 4 deletions(-) (limited to 'net') diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h index a6d9fce7f20e..15322c1d9c8c 100644 --- a/include/linux/sunrpc/xprt.h +++ b/include/linux/sunrpc/xprt.h @@ -238,6 +238,7 @@ struct rpc_xprt { /* * Send stuff */ + atomic_long_t queuelen; spinlock_t transport_lock; /* lock transport info */ spinlock_t reserve_lock; /* lock slot table */ spinlock_t queue_lock; /* send/receive queue lock */ diff --git a/include/linux/sunrpc/xprtmultipath.h b/include/linux/sunrpc/xprtmultipath.h index af1257c030d2..c6cce3fbf29d 100644 --- a/include/linux/sunrpc/xprtmultipath.h +++ b/include/linux/sunrpc/xprtmultipath.h @@ -15,6 +15,8 @@ struct rpc_xprt_switch { struct kref xps_kref; unsigned int xps_nxprts; + unsigned int xps_nactive; + atomic_long_t xps_queuelen; struct list_head xps_xprt_list; struct net * xps_net; diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index b03bfa055c08..976eab68bb5d 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -968,13 +968,47 @@ out: } EXPORT_SYMBOL_GPL(rpc_bind_new_program); +static struct rpc_xprt * +rpc_task_get_xprt(struct rpc_clnt *clnt) +{ + struct rpc_xprt_switch *xps; + struct rpc_xprt *xprt= xprt_iter_get_next(&clnt->cl_xpi); + + if (!xprt) + return NULL; + rcu_read_lock(); + xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch); + atomic_long_inc(&xps->xps_queuelen); + rcu_read_unlock(); + atomic_long_inc(&xprt->queuelen); + + return xprt; +} + +static void +rpc_task_release_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt) +{ + struct rpc_xprt_switch *xps; + + atomic_long_dec(&xprt->queuelen); + rcu_read_lock(); + xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch); + atomic_long_dec(&xps->xps_queuelen); + rcu_read_unlock(); + + xprt_put(xprt); +} + void rpc_task_release_transport(struct rpc_task *task) { struct rpc_xprt *xprt = task->tk_xprt; if (xprt) { task->tk_xprt = NULL; - xprt_put(xprt); + if (task->tk_client) + rpc_task_release_xprt(task->tk_client, xprt); + else + xprt_put(xprt); } } EXPORT_SYMBOL_GPL(rpc_task_release_transport); @@ -983,6 +1017,7 @@ void rpc_task_release_client(struct rpc_task *task) { struct rpc_clnt *clnt = task->tk_client; + rpc_task_release_transport(task); if (clnt != NULL) { /* Remove from client task list */ spin_lock(&clnt->cl_lock); @@ -992,14 +1027,13 @@ void rpc_task_release_client(struct rpc_task *task) rpc_release_client(clnt); } - rpc_task_release_transport(task); } static void rpc_task_set_transport(struct rpc_task *task, struct rpc_clnt *clnt) { if (!task->tk_xprt) - task->tk_xprt = xprt_iter_get_next(&clnt->cl_xpi); + task->tk_xprt = rpc_task_get_xprt(clnt); } static diff --git a/net/sunrpc/xprtmultipath.c b/net/sunrpc/xprtmultipath.c index 8394124126f8..394e427533be 100644 --- a/net/sunrpc/xprtmultipath.c +++ b/net/sunrpc/xprtmultipath.c @@ -36,6 +36,7 @@ static void xprt_switch_add_xprt_locked(struct rpc_xprt_switch *xps, if (xps->xps_nxprts == 0) xps->xps_net = xprt->xprt_net; xps->xps_nxprts++; + xps->xps_nactive++; } /** @@ -62,6 +63,7 @@ static void xprt_switch_remove_xprt_locked(struct rpc_xprt_switch *xps, { if (unlikely(xprt == NULL)) return; + xps->xps_nactive--; xps->xps_nxprts--; if (xps->xps_nxprts == 0) xps->xps_net = NULL; @@ -317,8 +319,24 @@ struct rpc_xprt *xprt_switch_find_next_entry_roundrobin(struct list_head *head, static struct rpc_xprt *xprt_iter_next_entry_roundrobin(struct rpc_xprt_iter *xpi) { - return xprt_iter_next_entry_multiple(xpi, + struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); + struct rpc_xprt *xprt; + unsigned long xprt_queuelen; + unsigned long xps_queuelen; + unsigned long xps_avglen; + + do { + xprt = xprt_iter_next_entry_multiple(xpi, xprt_switch_find_next_entry_roundrobin); + if (xprt == NULL) + break; + xprt_queuelen = atomic_long_read(&xprt->queuelen); + if (xprt_queuelen <= 2) + break; + xps_queuelen = atomic_long_read(&xps->xps_queuelen); + xps_avglen = DIV_ROUND_UP(xps_queuelen, xps->xps_nactive); + } while (xprt_queuelen > xps_avglen); + return xprt; } static -- cgit v1.2.3 From c049f8ea9a0db11d87bc8cb4c106be65fe06b70b Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Thu, 2 May 2019 11:23:12 -0400 Subject: SUNRPC: Remove the bh-safe lock requirement on the rpc_wait_queue->lock Signed-off-by: Trond Myklebust --- net/sunrpc/sched.c | 50 +++++++++++++++++++++++++------------------------- 1 file changed, 25 insertions(+), 25 deletions(-) (limited to 'net') diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index e0a0cf381eba..191916168e67 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -432,9 +432,9 @@ void rpc_sleep_on_timeout(struct rpc_wait_queue *q, struct rpc_task *task, /* * Protect the queue operations. */ - spin_lock_bh(&q->lock); + spin_lock(&q->lock); __rpc_sleep_on_priority_timeout(q, task, timeout, task->tk_priority); - spin_unlock_bh(&q->lock); + spin_unlock(&q->lock); } EXPORT_SYMBOL_GPL(rpc_sleep_on_timeout); @@ -450,9 +450,9 @@ void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task, /* * Protect the queue operations. */ - spin_lock_bh(&q->lock); + spin_lock(&q->lock); __rpc_sleep_on_priority(q, task, task->tk_priority); - spin_unlock_bh(&q->lock); + spin_unlock(&q->lock); } EXPORT_SYMBOL_GPL(rpc_sleep_on); @@ -466,9 +466,9 @@ void rpc_sleep_on_priority_timeout(struct rpc_wait_queue *q, /* * Protect the queue operations. */ - spin_lock_bh(&q->lock); + spin_lock(&q->lock); __rpc_sleep_on_priority_timeout(q, task, timeout, priority); - spin_unlock_bh(&q->lock); + spin_unlock(&q->lock); } EXPORT_SYMBOL_GPL(rpc_sleep_on_priority_timeout); @@ -483,9 +483,9 @@ void rpc_sleep_on_priority(struct rpc_wait_queue *q, struct rpc_task *task, /* * Protect the queue operations. */ - spin_lock_bh(&q->lock); + spin_lock(&q->lock); __rpc_sleep_on_priority(q, task, priority); - spin_unlock_bh(&q->lock); + spin_unlock(&q->lock); } EXPORT_SYMBOL_GPL(rpc_sleep_on_priority); @@ -563,9 +563,9 @@ void rpc_wake_up_queued_task_on_wq(struct workqueue_struct *wq, { if (!RPC_IS_QUEUED(task)) return; - spin_lock_bh(&queue->lock); + spin_lock(&queue->lock); rpc_wake_up_task_on_wq_queue_locked(wq, queue, task); - spin_unlock_bh(&queue->lock); + spin_unlock(&queue->lock); } /* @@ -575,9 +575,9 @@ void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task { if (!RPC_IS_QUEUED(task)) return; - spin_lock_bh(&queue->lock); + spin_lock(&queue->lock); rpc_wake_up_task_queue_locked(queue, task); - spin_unlock_bh(&queue->lock); + spin_unlock(&queue->lock); } EXPORT_SYMBOL_GPL(rpc_wake_up_queued_task); @@ -610,9 +610,9 @@ rpc_wake_up_queued_task_set_status(struct rpc_wait_queue *queue, { if (!RPC_IS_QUEUED(task)) return; - spin_lock_bh(&queue->lock); + spin_lock(&queue->lock); rpc_wake_up_task_queue_set_status_locked(queue, task, status); - spin_unlock_bh(&queue->lock); + spin_unlock(&queue->lock); } /* @@ -675,12 +675,12 @@ struct rpc_task *rpc_wake_up_first_on_wq(struct workqueue_struct *wq, dprintk("RPC: wake_up_first(%p \"%s\")\n", queue, rpc_qname(queue)); - spin_lock_bh(&queue->lock); + spin_lock(&queue->lock); task = __rpc_find_next_queued(queue); if (task != NULL) task = rpc_wake_up_task_on_wq_queue_action_locked(wq, queue, task, func, data); - spin_unlock_bh(&queue->lock); + spin_unlock(&queue->lock); return task; } @@ -719,7 +719,7 @@ void rpc_wake_up(struct rpc_wait_queue *queue) { struct list_head *head; - spin_lock_bh(&queue->lock); + spin_lock(&queue->lock); head = &queue->tasks[queue->maxpriority]; for (;;) { while (!list_empty(head)) { @@ -733,7 +733,7 @@ void rpc_wake_up(struct rpc_wait_queue *queue) break; head--; } - spin_unlock_bh(&queue->lock); + spin_unlock(&queue->lock); } EXPORT_SYMBOL_GPL(rpc_wake_up); @@ -748,7 +748,7 @@ void rpc_wake_up_status(struct rpc_wait_queue *queue, int status) { struct list_head *head; - spin_lock_bh(&queue->lock); + spin_lock(&queue->lock); head = &queue->tasks[queue->maxpriority]; for (;;) { while (!list_empty(head)) { @@ -763,7 +763,7 @@ void rpc_wake_up_status(struct rpc_wait_queue *queue, int status) break; head--; } - spin_unlock_bh(&queue->lock); + spin_unlock(&queue->lock); } EXPORT_SYMBOL_GPL(rpc_wake_up_status); @@ -775,7 +775,7 @@ static void __rpc_queue_timer_fn(struct work_struct *work) struct rpc_task *task, *n; unsigned long expires, now, timeo; - spin_lock_bh(&queue->lock); + spin_lock(&queue->lock); expires = now = jiffies; list_for_each_entry_safe(task, n, &queue->timer_list.list, u.tk_wait.timer_list) { timeo = task->tk_timeout; @@ -790,7 +790,7 @@ static void __rpc_queue_timer_fn(struct work_struct *work) } if (!list_empty(&queue->timer_list.list)) rpc_set_queue_timer(queue, expires); - spin_unlock_bh(&queue->lock); + spin_unlock(&queue->lock); } static void __rpc_atrun(struct rpc_task *task) @@ -937,13 +937,13 @@ static void __rpc_execute(struct rpc_task *task) * rpc_task pointer may still be dereferenced. */ queue = task->tk_waitqueue; - spin_lock_bh(&queue->lock); + spin_lock(&queue->lock); if (!RPC_IS_QUEUED(task)) { - spin_unlock_bh(&queue->lock); + spin_unlock(&queue->lock); continue; } rpc_clear_running(task); - spin_unlock_bh(&queue->lock); + spin_unlock(&queue->lock); if (task_is_async) return; -- cgit v1.2.3 From 612b41f808a98a124b23d72229693c3181733291 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Thu, 27 Apr 2017 08:50:51 -0400 Subject: SUNRPC: Allow creation of RPC clients with multiple connections Add an argument to struct rpc_create_args that allows the specification of how many transport connections you want to set up to the server. Signed-off-by: Trond Myklebust --- include/linux/sunrpc/clnt.h | 1 + net/sunrpc/clnt.c | 17 ++++++++++++++++- net/sunrpc/xprtmultipath.c | 3 +-- 3 files changed, 18 insertions(+), 3 deletions(-) (limited to 'net') diff --git a/include/linux/sunrpc/clnt.h b/include/linux/sunrpc/clnt.h index 6e8073140a5d..4619098affa3 100644 --- a/include/linux/sunrpc/clnt.h +++ b/include/linux/sunrpc/clnt.h @@ -124,6 +124,7 @@ struct rpc_create_args { u32 prognumber; /* overrides program->number */ u32 version; rpc_authflavor_t authflavor; + u32 nconnect; unsigned long flags; char *client_name; struct svc_xprt *bc_xprt; /* NFSv4.1 backchannel */ diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index 976eab68bb5d..b6aca8cb5ae6 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -528,6 +528,8 @@ struct rpc_clnt *rpc_create(struct rpc_create_args *args) .bc_xprt = args->bc_xprt, }; char servername[48]; + struct rpc_clnt *clnt; + int i; if (args->bc_xprt) { WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC)); @@ -590,7 +592,15 @@ struct rpc_clnt *rpc_create(struct rpc_create_args *args) if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT) xprt->resvport = 0; - return rpc_create_xprt(args, xprt); + clnt = rpc_create_xprt(args, xprt); + if (IS_ERR(clnt) || args->nconnect <= 1) + return clnt; + + for (i = 0; i < args->nconnect - 1; i++) { + if (rpc_clnt_add_xprt(clnt, &xprtargs, NULL, NULL) < 0) + break; + } + return clnt; } EXPORT_SYMBOL_GPL(rpc_create); @@ -2730,6 +2740,10 @@ int rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt, return -ENOMEM; data->xps = xprt_switch_get(xps); data->xprt = xprt_get(xprt); + if (rpc_xprt_switch_has_addr(data->xps, (struct sockaddr *)&xprt->addr)) { + rpc_cb_add_xprt_release(data); + goto success; + } task = rpc_call_null_helper(clnt, xprt, NULL, RPC_TASK_SOFT|RPC_TASK_SOFTCONN|RPC_TASK_ASYNC|RPC_TASK_NULLCREDS, @@ -2737,6 +2751,7 @@ int rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt, if (IS_ERR(task)) return PTR_ERR(task); rpc_put_task(task); +success: return 1; } EXPORT_SYMBOL_GPL(rpc_clnt_test_and_add_xprt); diff --git a/net/sunrpc/xprtmultipath.c b/net/sunrpc/xprtmultipath.c index 394e427533be..9d66ce53355d 100644 --- a/net/sunrpc/xprtmultipath.c +++ b/net/sunrpc/xprtmultipath.c @@ -52,8 +52,7 @@ void rpc_xprt_switch_add_xprt(struct rpc_xprt_switch *xps, if (xprt == NULL) return; spin_lock(&xps->xps_lock); - if ((xps->xps_net == xprt->xprt_net || xps->xps_net == NULL) && - !rpc_xprt_switch_has_addr(xps, (struct sockaddr *)&xprt->addr)) + if (xps->xps_net == xprt->xprt_net || xps->xps_net == NULL) xprt_switch_add_xprt_locked(xps, xprt); spin_unlock(&xps->xps_lock); } -- cgit v1.2.3 From 5a0c257f8e0f4c4b3c33dff545317c21a921303e Mon Sep 17 00:00:00 2001 From: NeilBrown Date: Thu, 30 May 2019 10:41:28 +1000 Subject: NFS: send state management on a single connection. With NFSv4.1, different network connections need to be explicitly bound to a session. During session startup, this is not possible so only a single connection must be used for session startup. So add a task flag to disable the default round-robin choice of connections (when nconnect > 1) and force the use of a single connection. Then use that flag on all requests for session management - for consistence, include NFSv4.0 management (SETCLIENTID) and session destruction Reported-by: Chuck Lever Signed-off-by: NeilBrown Signed-off-by: Trond Myklebust --- fs/nfs/nfs4proc.c | 22 +++++++++++++--------- include/linux/sunrpc/sched.h | 1 + net/sunrpc/clnt.c | 24 +++++++++++++++++++++++- 3 files changed, 37 insertions(+), 10 deletions(-) (limited to 'net') diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c index 26626ea1f197..d115d9973efc 100644 --- a/fs/nfs/nfs4proc.c +++ b/fs/nfs/nfs4proc.c @@ -5992,7 +5992,7 @@ int nfs4_proc_setclientid(struct nfs_client *clp, u32 program, .rpc_message = &msg, .callback_ops = &nfs4_setclientid_ops, .callback_data = &setclientid, - .flags = RPC_TASK_TIMEOUT, + .flags = RPC_TASK_TIMEOUT | RPC_TASK_NO_ROUND_ROBIN, }; int status; @@ -6058,7 +6058,8 @@ int nfs4_proc_setclientid_confirm(struct nfs_client *clp, dprintk("NFS call setclientid_confirm auth=%s, (client ID %llx)\n", clp->cl_rpcclient->cl_auth->au_ops->au_name, clp->cl_clientid); - status = rpc_call_sync(clp->cl_rpcclient, &msg, RPC_TASK_TIMEOUT); + status = rpc_call_sync(clp->cl_rpcclient, &msg, + RPC_TASK_TIMEOUT | RPC_TASK_NO_ROUND_ROBIN); trace_nfs4_setclientid_confirm(clp, status); dprintk("NFS reply setclientid_confirm: %d\n", status); return status; @@ -7639,7 +7640,7 @@ static int _nfs4_proc_secinfo(struct inode *dir, const struct qstr *name, struct NFS_SP4_MACH_CRED_SECINFO, &clnt, &msg); status = nfs4_call_sync(clnt, NFS_SERVER(dir), &msg, &args.seq_args, - &res.seq_res, 0); + &res.seq_res, RPC_TASK_NO_ROUND_ROBIN); dprintk("NFS reply secinfo: %d\n", status); put_cred(cred); @@ -7977,7 +7978,7 @@ nfs4_run_exchange_id(struct nfs_client *clp, const struct cred *cred, .rpc_client = clp->cl_rpcclient, .callback_ops = &nfs4_exchange_id_call_ops, .rpc_message = &msg, - .flags = RPC_TASK_TIMEOUT, + .flags = RPC_TASK_TIMEOUT | RPC_TASK_NO_ROUND_ROBIN, }; struct nfs41_exchange_id_data *calldata; int status; @@ -8202,7 +8203,8 @@ static int _nfs4_proc_destroy_clientid(struct nfs_client *clp, }; int status; - status = rpc_call_sync(clp->cl_rpcclient, &msg, RPC_TASK_TIMEOUT); + status = rpc_call_sync(clp->cl_rpcclient, &msg, + RPC_TASK_TIMEOUT | RPC_TASK_NO_ROUND_ROBIN); trace_nfs4_destroy_clientid(clp, status); if (status) dprintk("NFS: Got error %d from the server %s on " @@ -8481,7 +8483,8 @@ static int _nfs4_proc_create_session(struct nfs_client *clp, nfs4_init_channel_attrs(&args, clp->cl_rpcclient); args.flags = (SESSION4_PERSIST | SESSION4_BACK_CHAN); - status = rpc_call_sync(session->clp->cl_rpcclient, &msg, RPC_TASK_TIMEOUT); + status = rpc_call_sync(session->clp->cl_rpcclient, &msg, + RPC_TASK_TIMEOUT | RPC_TASK_NO_ROUND_ROBIN); trace_nfs4_create_session(clp, status); switch (status) { @@ -8557,7 +8560,8 @@ int nfs4_proc_destroy_session(struct nfs4_session *session, if (!test_and_clear_bit(NFS4_SESSION_ESTABLISHED, &session->session_state)) return 0; - status = rpc_call_sync(session->clp->cl_rpcclient, &msg, RPC_TASK_TIMEOUT); + status = rpc_call_sync(session->clp->cl_rpcclient, &msg, + RPC_TASK_TIMEOUT | RPC_TASK_NO_ROUND_ROBIN); trace_nfs4_destroy_session(session->clp, status); if (status) @@ -8811,7 +8815,7 @@ static int nfs41_proc_reclaim_complete(struct nfs_client *clp, .rpc_client = clp->cl_rpcclient, .rpc_message = &msg, .callback_ops = &nfs4_reclaim_complete_call_ops, - .flags = RPC_TASK_ASYNC, + .flags = RPC_TASK_ASYNC | RPC_TASK_NO_ROUND_ROBIN, }; int status = -ENOMEM; @@ -9330,7 +9334,7 @@ _nfs41_proc_secinfo_no_name(struct nfs_server *server, struct nfs_fh *fhandle, dprintk("--> %s\n", __func__); status = nfs4_call_sync(clnt, server, &msg, &args.seq_args, - &res.seq_res, 0); + &res.seq_res, RPC_TASK_NO_ROUND_ROBIN); dprintk("<-- %s status=%d\n", __func__, status); put_cred(cred); diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h index d0e451868f02..11424bdf09e6 100644 --- a/include/linux/sunrpc/sched.h +++ b/include/linux/sunrpc/sched.h @@ -126,6 +126,7 @@ struct rpc_task_setup { #define RPC_CALL_MAJORSEEN 0x0020 /* major timeout seen */ #define RPC_TASK_ROOTCREDS 0x0040 /* force root creds */ #define RPC_TASK_DYNAMIC 0x0080 /* task was kmalloc'ed */ +#define RPC_TASK_NO_ROUND_ROBIN 0x0100 /* send requests on "main" xprt */ #define RPC_TASK_SOFT 0x0200 /* Use soft timeouts */ #define RPC_TASK_SOFTCONN 0x0400 /* Fail if can't connect */ #define RPC_TASK_SENT 0x0800 /* message was sent */ diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index b6aca8cb5ae6..d599fab8adcb 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -995,6 +995,24 @@ rpc_task_get_xprt(struct rpc_clnt *clnt) return xprt; } +static struct rpc_xprt * +rpc_task_get_first_xprt(struct rpc_clnt *clnt) +{ + struct rpc_xprt_switch *xps; + struct rpc_xprt *xprt; + + rcu_read_lock(); + xprt = xprt_get(rcu_dereference(clnt->cl_xprt)); + if (xprt) { + atomic_long_inc(&xprt->queuelen); + xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch); + atomic_long_inc(&xps->xps_queuelen); + } + rcu_read_unlock(); + + return xprt; +} + static void rpc_task_release_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt) { @@ -1042,7 +1060,11 @@ void rpc_task_release_client(struct rpc_task *task) static void rpc_task_set_transport(struct rpc_task *task, struct rpc_clnt *clnt) { - if (!task->tk_xprt) + if (task->tk_xprt) + return; + if (task->tk_flags & RPC_TASK_NO_ROUND_ROBIN) + task->tk_xprt = rpc_task_get_first_xprt(clnt); + else task->tk_xprt = rpc_task_get_xprt(clnt); } -- cgit v1.2.3 From 9dfe52a95a60096fc12234383a19b0f436304418 Mon Sep 17 00:00:00 2001 From: Dave Wysochanski Date: Thu, 23 May 2019 16:13:48 -0400 Subject: SUNRPC: Move call to rpc_count_iostats before rpc_call_done For diagnostic purposes, it would be useful to have an rpc_iostats metric of RPCs completing with tk_status < 0. Unfortunately, tk_status is reset inside the rpc_call_done functions for each operation, and the call to tally the per-op metrics comes after rpc_call_done. Refactor the call to rpc_count_iostat earlier in rpc_exit_task so we can count these RPCs completing in error. Signed-off-by: Dave Wysochanski Signed-off-by: Trond Myklebust --- net/sunrpc/sched.c | 5 +++++ net/sunrpc/xprt.c | 4 ---- 2 files changed, 5 insertions(+), 4 deletions(-) (limited to 'net') diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index a2c114812717..f8ea362fae91 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -23,6 +23,7 @@ #include #include +#include #include "sunrpc.h" @@ -832,6 +833,10 @@ rpc_reset_task_statistics(struct rpc_task *task) void rpc_exit_task(struct rpc_task *task) { task->tk_action = NULL; + if (task->tk_ops->rpc_count_stats) + task->tk_ops->rpc_count_stats(task, task->tk_calldata); + else if (task->tk_client) + rpc_count_iostats(task, task->tk_client->cl_metrics); if (task->tk_ops->rpc_call_done != NULL) { task->tk_ops->rpc_call_done(task, task->tk_calldata); if (task->tk_action != NULL) { diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index f6c82b1651e7..ab6b4c729ca5 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -1765,10 +1765,6 @@ void xprt_release(struct rpc_task *task) } xprt = req->rq_xprt; - if (task->tk_ops->rpc_count_stats != NULL) - task->tk_ops->rpc_count_stats(task, task->tk_calldata); - else if (task->tk_client) - rpc_count_iostats(task, task->tk_client->cl_metrics); xprt_request_dequeue_all(task, req); spin_lock_bh(&xprt->transport_lock); xprt->ops->release_xprt(xprt, task); -- cgit v1.2.3 From 93ba048e1b9f95ac7e1584fa33ed4d985bf1133d Mon Sep 17 00:00:00 2001 From: Dave Wysochanski Date: Thu, 23 May 2019 16:13:49 -0400 Subject: SUNRPC: Use proper printk specifiers for unsigned long long Update the printk specifiers inside _print_rpc_iostats to avoid a checkpatch warning. Signed-off-by: Dave Wysochanski Signed-off-by: Trond Myklebust --- net/sunrpc/stats.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'net') diff --git a/net/sunrpc/stats.c b/net/sunrpc/stats.c index 2b6dc7e5f74f..2f7bde82450b 100644 --- a/net/sunrpc/stats.c +++ b/net/sunrpc/stats.c @@ -225,7 +225,7 @@ static void _print_rpc_iostats(struct seq_file *seq, struct rpc_iostats *stats, int op, const struct rpc_procinfo *procs) { _print_name(seq, op, procs); - seq_printf(seq, "%lu %lu %lu %Lu %Lu %Lu %Lu %Lu\n", + seq_printf(seq, "%lu %lu %lu %llu %llu %llu %llu %llu\n", stats->om_ops, stats->om_ntrans, stats->om_timeouts, -- cgit v1.2.3 From 10db56917bcb80e70bbcd443d78bbfcb0b1e0652 Mon Sep 17 00:00:00 2001 From: NeilBrown Date: Thu, 30 May 2019 10:41:28 +1000 Subject: SUNRPC: enhance rpc_clnt_show_stats() to report on all xprts. Now that a client can have multiple xprts, we need to report the statistics for all of them. Reported-by: Chuck Lever Signed-off-by: NeilBrown Signed-off-by: Trond Myklebust --- net/sunrpc/stats.c | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) (limited to 'net') diff --git a/net/sunrpc/stats.c b/net/sunrpc/stats.c index 2b6dc7e5f74f..d26df6074bca 100644 --- a/net/sunrpc/stats.c +++ b/net/sunrpc/stats.c @@ -236,9 +236,16 @@ static void _print_rpc_iostats(struct seq_file *seq, struct rpc_iostats *stats, ktime_to_ms(stats->om_execute)); } +static int do_print_stats(struct rpc_clnt *clnt, struct rpc_xprt *xprt, void *seqv) +{ + struct seq_file *seq = seqv; + + xprt->ops->print_stats(xprt, seq); + return 0; +} + void rpc_clnt_show_stats(struct seq_file *seq, struct rpc_clnt *clnt) { - struct rpc_xprt *xprt; unsigned int op, maxproc = clnt->cl_maxproc; if (!clnt->cl_metrics) @@ -248,11 +255,7 @@ void rpc_clnt_show_stats(struct seq_file *seq, struct rpc_clnt *clnt) seq_printf(seq, "p/v: %u/%u (%s)\n", clnt->cl_prog, clnt->cl_vers, clnt->cl_program->name); - rcu_read_lock(); - xprt = rcu_dereference(clnt->cl_xprt); - if (xprt) - xprt->ops->print_stats(xprt, seq); - rcu_read_unlock(); + rpc_clnt_iterate_for_each_xprt(clnt, do_print_stats, seq); seq_printf(seq, "\tper-op statistics\n"); for (op = 0; op < maxproc; op++) { -- cgit v1.2.3 From a332518fda4731c07394164b3edcbb6efaf4c4d7 Mon Sep 17 00:00:00 2001 From: Dave Wysochanski Date: Thu, 23 May 2019 16:13:50 -0400 Subject: SUNRPC: Count ops completing with tk_status < 0 We often see various error conditions with NFS4.x that show up with a very high operation count all completing with tk_status < 0 in a short period of time. Add a count to rpc_iostats to record on a per-op basis the ops that complete in this manner, which will enable lower overhead diagnostics. Signed-off-by: Dave Wysochanski Signed-off-by: Trond Myklebust --- include/linux/sunrpc/metrics.h | 7 ++++++- net/sunrpc/stats.c | 8 ++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) (limited to 'net') diff --git a/include/linux/sunrpc/metrics.h b/include/linux/sunrpc/metrics.h index 1b3751327575..0ee3f7052846 100644 --- a/include/linux/sunrpc/metrics.h +++ b/include/linux/sunrpc/metrics.h @@ -30,7 +30,7 @@ #include #include -#define RPC_IOSTATS_VERS "1.0" +#define RPC_IOSTATS_VERS "1.1" struct rpc_iostats { spinlock_t om_lock; @@ -66,6 +66,11 @@ struct rpc_iostats { ktime_t om_queue, /* queued for xmit */ om_rtt, /* RPC RTT */ om_execute; /* RPC execution */ + /* + * The count of operations that complete with tk_status < 0. + * These statuses usually indicate error conditions. + */ + unsigned long om_error_status; } ____cacheline_aligned; struct rpc_task; diff --git a/net/sunrpc/stats.c b/net/sunrpc/stats.c index 2f7bde82450b..48ea776364f8 100644 --- a/net/sunrpc/stats.c +++ b/net/sunrpc/stats.c @@ -177,6 +177,8 @@ void rpc_count_iostats_metrics(const struct rpc_task *task, execute = ktime_sub(now, task->tk_start); op_metrics->om_execute = ktime_add(op_metrics->om_execute, execute); + if (task->tk_status < 0) + op_metrics->om_error_status++; spin_unlock(&op_metrics->om_lock); @@ -219,13 +221,14 @@ static void _add_rpc_iostats(struct rpc_iostats *a, struct rpc_iostats *b) a->om_queue = ktime_add(a->om_queue, b->om_queue); a->om_rtt = ktime_add(a->om_rtt, b->om_rtt); a->om_execute = ktime_add(a->om_execute, b->om_execute); + a->om_error_status += b->om_error_status; } static void _print_rpc_iostats(struct seq_file *seq, struct rpc_iostats *stats, int op, const struct rpc_procinfo *procs) { _print_name(seq, op, procs); - seq_printf(seq, "%lu %lu %lu %llu %llu %llu %llu %llu\n", + seq_printf(seq, "%lu %lu %lu %llu %llu %llu %llu %llu %lu\n", stats->om_ops, stats->om_ntrans, stats->om_timeouts, @@ -233,7 +236,8 @@ static void _print_rpc_iostats(struct seq_file *seq, struct rpc_iostats *stats, stats->om_bytes_recv, ktime_to_ms(stats->om_queue), ktime_to_ms(stats->om_rtt), - ktime_to_ms(stats->om_execute)); + ktime_to_ms(stats->om_execute), + stats->om_error_status); } void rpc_clnt_show_stats(struct seq_file *seq, struct rpc_clnt *clnt) -- cgit v1.2.3 From 2f34b8bfae19a244993e2b6cd0a8514f3ffd3fa8 Mon Sep 17 00:00:00 2001 From: NeilBrown Date: Thu, 30 May 2019 10:41:28 +1000 Subject: SUNRPC: add links for all client xprts to debugfs Now that a client can have multiple xprts, we need to add them all to debugs. The first one is still "xprt" Subsequent xprts are "xprt1", "xprt2", etc. Signed-off-by: NeilBrown Signed-off-by: Trond Myklebust --- net/sunrpc/debugfs.c | 46 +++++++++++++++++++++++++++++----------------- 1 file changed, 29 insertions(+), 17 deletions(-) (limited to 'net') diff --git a/net/sunrpc/debugfs.c b/net/sunrpc/debugfs.c index 95ebd76b132d..228bc7e8bca0 100644 --- a/net/sunrpc/debugfs.c +++ b/net/sunrpc/debugfs.c @@ -118,12 +118,38 @@ static const struct file_operations tasks_fops = { .release = tasks_release, }; +static int do_xprt_debugfs(struct rpc_clnt *clnt, struct rpc_xprt *xprt, void *numv) +{ + int len; + char name[24]; /* enough for "../../rpc_xprt/ + 8 hex digits + NULL */ + char link[9]; /* enough for 8 hex digits + NULL */ + int *nump = numv; + + if (IS_ERR_OR_NULL(xprt->debugfs)) + return 0; + len = snprintf(name, sizeof(name), "../../rpc_xprt/%s", + xprt->debugfs->d_name.name); + if (len > sizeof(name)) + return -1; + if (*nump == 0) + strcpy(link, "xprt"); + else { + len = snprintf(link, sizeof(link), "xprt%d", *nump); + if (len > sizeof(link)) + return -1; + } + if (!debugfs_create_symlink(link, clnt->cl_debugfs, name)) + return -1; + (*nump)++; + return 0; +} + void rpc_clnt_debugfs_register(struct rpc_clnt *clnt) { int len; - char name[24]; /* enough for "../../rpc_xprt/ + 8 hex digits + NULL */ - struct rpc_xprt *xprt; + char name[9]; /* enough for 8 hex digits + NULL */ + int xprtnum = 0; /* Already registered? */ if (clnt->cl_debugfs || !rpc_clnt_dir) @@ -143,21 +169,7 @@ rpc_clnt_debugfs_register(struct rpc_clnt *clnt) clnt, &tasks_fops)) goto out_err; - rcu_read_lock(); - xprt = rcu_dereference(clnt->cl_xprt); - /* no "debugfs" dentry? Don't bother with the symlink. */ - if (IS_ERR_OR_NULL(xprt->debugfs)) { - rcu_read_unlock(); - return; - } - len = snprintf(name, sizeof(name), "../../rpc_xprt/%s", - xprt->debugfs->d_name.name); - rcu_read_unlock(); - - if (len >= sizeof(name)) - goto out_err; - - if (!debugfs_create_symlink("xprt", clnt->cl_debugfs, name)) + if (rpc_clnt_iterate_for_each_xprt(clnt, do_xprt_debugfs, &xprtnum) < 0) goto out_err; return; -- cgit v1.2.3 From b6580ab39b092aac40871fed11faa410c44b1da0 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Thu, 30 May 2019 11:24:26 -0400 Subject: SUNRPC: Remove warning in debugfs.c when compiling with W=1 Remove the following warning: net/sunrpc/debugfs.c:13: warning: cannot understand function prototype: 'struct dentry *topdir; Signed-off-by: Trond Myklebust --- net/sunrpc/debugfs.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'net') diff --git a/net/sunrpc/debugfs.c b/net/sunrpc/debugfs.c index 228bc7e8bca0..105bea190a45 100644 --- a/net/sunrpc/debugfs.c +++ b/net/sunrpc/debugfs.c @@ -1,5 +1,5 @@ // SPDX-License-Identifier: GPL-2.0 -/** +/* * debugfs interface for sunrpc * * (c) 2014 Jeff Layton -- cgit v1.2.3 From 4368d77a4d985758303a4257c07991c079db7a75 Mon Sep 17 00:00:00 2001 From: Anna Schumaker Date: Wed, 19 Jun 2019 17:24:10 -0400 Subject: SUNRPC: Drop redundant CONFIG_ from CONFIG_SUNRPC_DISABLE_INSECURE_ENCTYPES The "CONFIG_" portion is added automatically, so this was being expanded into "CONFIG_CONFIG_SUNRPC_DISABLE_INSECURE_ENCTYPES" Signed-off-by: Anna Schumaker Signed-off-by: Trond Myklebust --- net/sunrpc/Kconfig | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'net') diff --git a/net/sunrpc/Kconfig b/net/sunrpc/Kconfig index aa307505ca54..3bcf985507be 100644 --- a/net/sunrpc/Kconfig +++ b/net/sunrpc/Kconfig @@ -35,7 +35,7 @@ config RPCSEC_GSS_KRB5 If unsure, say Y. -config CONFIG_SUNRPC_DISABLE_INSECURE_ENCTYPES +config SUNRPC_DISABLE_INSECURE_ENCTYPES bool "Secure RPC: Disable insecure Kerberos encryption types" depends on RPCSEC_GSS_KRB5 default n -- cgit v1.2.3 From 80d3c45fd765fbf4f10981b60ff6b1384bdbc706 Mon Sep 17 00:00:00 2001 From: Dave Wysochanski Date: Wed, 26 Jun 2019 16:30:24 -0400 Subject: SUNRPC: Fix possible autodisconnect during connect due to old last_used Ensure last_used is updated before calling mod_timer inside xprt_schedule_autodisconnect. This avoids a possible xprt_autoclose firing immediately after a successful connect when xprt_unlock_connect calls xprt_schedule_autodisconnect with an old value of last_used. Signed-off-by: Dave Wysochanski Signed-off-by: Trond Myklebust --- net/sunrpc/xprt.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'net') diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index c4d138202abb..70d6a1f10db9 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -750,6 +750,7 @@ static void xprt_schedule_autodisconnect(struct rpc_xprt *xprt) __must_hold(&xprt->transport_lock) { + xprt->last_used = jiffies; if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt)) mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout); } @@ -1765,7 +1766,6 @@ void xprt_release(struct rpc_task *task) xprt->ops->release_xprt(xprt, task); if (xprt->ops->release_request) xprt->ops->release_request(task); - xprt->last_used = jiffies; xprt_schedule_autodisconnect(xprt); spin_unlock(&xprt->transport_lock); if (req->rq_buffer) -- cgit v1.2.3 From 1310051c720a83c5717658bcbff710b260f2bff9 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Wed, 19 Jun 2019 10:32:43 -0400 Subject: xprtrdma: Replace use of xdr_stream_pos in rpcrdma_marshal_req This is a latent bug. xdr_stream_pos works by subtracting xdr_stream::nwords from xdr_buf::len. But xdr_stream::nwords is not initialized by xdr_init_encode(). It works today only because all fields in rpcrdma_req::rl_stream are initialized to zero by rpcrdma_req_create, making the subtraction in xdr_stream_pos always a no-op. I found this issue via code inspection. It was introduced by commit 39f4cd9e9982 ("xprtrdma: Harden chunk list encoding against send buffer overflow"), but the code has changed enough since then that this fix can't be automatically applied to stable. Signed-off-by: Chuck Lever Signed-off-by: Anna Schumaker --- include/trace/events/rpcrdma.h | 9 +++++---- net/sunrpc/xprtrdma/rpc_rdma.c | 6 +++--- 2 files changed, 8 insertions(+), 7 deletions(-) (limited to 'net') diff --git a/include/trace/events/rpcrdma.h b/include/trace/events/rpcrdma.h index f0678e3ac2d4..59492a93fe1d 100644 --- a/include/trace/events/rpcrdma.h +++ b/include/trace/events/rpcrdma.h @@ -470,13 +470,12 @@ TRACE_DEFINE_ENUM(rpcrdma_replych); TRACE_EVENT(xprtrdma_marshal, TP_PROTO( - const struct rpc_rqst *rqst, - unsigned int hdrlen, + const struct rpcrdma_req *req, unsigned int rtype, unsigned int wtype ), - TP_ARGS(rqst, hdrlen, rtype, wtype), + TP_ARGS(req, rtype, wtype), TP_STRUCT__entry( __field(unsigned int, task_id) @@ -491,10 +490,12 @@ TRACE_EVENT(xprtrdma_marshal, ), TP_fast_assign( + const struct rpc_rqst *rqst = &req->rl_slot; + __entry->task_id = rqst->rq_task->tk_pid; __entry->client_id = rqst->rq_task->tk_client->cl_clid; __entry->xid = be32_to_cpu(rqst->rq_xid); - __entry->hdrlen = hdrlen; + __entry->hdrlen = req->rl_hdrbuf.len; __entry->headlen = rqst->rq_snd_buf.head[0].iov_len; __entry->pagelen = rqst->rq_snd_buf.page_len; __entry->taillen = rqst->rq_snd_buf.tail[0].iov_len; diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 85115a2e2639..97bfb804b6c6 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -867,12 +867,12 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) if (ret) goto out_err; - trace_xprtrdma_marshal(rqst, xdr_stream_pos(xdr), rtype, wtype); - - ret = rpcrdma_prepare_send_sges(r_xprt, req, xdr_stream_pos(xdr), + ret = rpcrdma_prepare_send_sges(r_xprt, req, req->rl_hdrbuf.len, &rqst->rq_snd_buf, rtype); if (ret) goto out_err; + + trace_xprtrdma_marshal(req, rtype, wtype); return 0; out_err: -- cgit v1.2.3 From 05eb06d86685e7d9dac60e6bbb46d7f4c30b056e Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Wed, 19 Jun 2019 10:32:48 -0400 Subject: xprtrdma: Fix occasional transport deadlock Under high I/O workloads, I've noticed that an RPC/RDMA transport occasionally deadlocks (IOPS goes to zero, and doesn't recover). Diagnosis shows that the sendctx queue is empty, but when sendctxs are returned to the queue, the xprt_write_space wake-up never occurs. The wake-up logic in rpcrdma_sendctx_put_locked is racy. I noticed that both EMPTY_SCQ and XPRT_WRITE_SPACE are implemented via an atomic bit. Just one of those is sufficient. Removing EMPTY_SCQ in favor of the generic bit mechanism makes the deadlock un-reproducible. Without EMPTY_SCQ, rpcrdma_buffer::rb_flags is no longer used and is therefore removed. Unfortunately this patch does not apply cleanly to stable. If needed, someone will have to port it and test it. Fixes: 2fad659209d5 ("xprtrdma: Wait on empty sendctx queue") Signed-off-by: Chuck Lever Signed-off-by: Anna Schumaker --- include/trace/events/rpcrdma.h | 27 +++++++++++++++++++++++++++ net/sunrpc/xprtrdma/frwr_ops.c | 6 +++++- net/sunrpc/xprtrdma/rpc_rdma.c | 26 ++++++++++++-------------- net/sunrpc/xprtrdma/verbs.c | 11 +++-------- net/sunrpc/xprtrdma/xprt_rdma.h | 6 ------ 5 files changed, 47 insertions(+), 29 deletions(-) (limited to 'net') diff --git a/include/trace/events/rpcrdma.h b/include/trace/events/rpcrdma.h index 59492a93fe1d..2fb415136f7b 100644 --- a/include/trace/events/rpcrdma.h +++ b/include/trace/events/rpcrdma.h @@ -539,6 +539,33 @@ TRACE_EVENT(xprtrdma_marshal_failed, ) ); +TRACE_EVENT(xprtrdma_prepsend_failed, + TP_PROTO(const struct rpc_rqst *rqst, + int ret + ), + + TP_ARGS(rqst, ret), + + TP_STRUCT__entry( + __field(unsigned int, task_id) + __field(unsigned int, client_id) + __field(u32, xid) + __field(int, ret) + ), + + TP_fast_assign( + __entry->task_id = rqst->rq_task->tk_pid; + __entry->client_id = rqst->rq_task->tk_client->cl_clid; + __entry->xid = be32_to_cpu(rqst->rq_xid); + __entry->ret = ret; + ), + + TP_printk("task:%u@%u xid=0x%08x: ret=%d", + __entry->task_id, __entry->client_id, __entry->xid, + __entry->ret + ) +); + TRACE_EVENT(xprtrdma_post_send, TP_PROTO( const struct rpcrdma_req *req, diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index 794ba4ca0994..ac47314fb751 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -391,7 +391,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, rpcrdma_mr_recycle(mr); mr = rpcrdma_mr_get(r_xprt); if (!mr) - return ERR_PTR(-EAGAIN); + goto out_getmr_err; } while (mr->frwr.fr_state != FRWR_IS_INVALID); frwr = &mr->frwr; frwr->fr_state = FRWR_IS_VALID; @@ -448,6 +448,10 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, *out = mr; return seg; +out_getmr_err: + xprt_wait_for_buffer_space(&r_xprt->rx_xprt); + return ERR_PTR(-EAGAIN); + out_dmamap_err: mr->mr_dir = DMA_NONE; trace_xprtrdma_frwr_sgerr(mr, i); diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 97bfb804b6c6..59b214ba8813 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -699,22 +699,28 @@ rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, u32 hdrlen, struct xdr_buf *xdr, enum rpcrdma_chunktype rtype) { + int ret; + + ret = -EAGAIN; req->rl_sendctx = rpcrdma_sendctx_get_locked(r_xprt); if (!req->rl_sendctx) - return -EAGAIN; + goto err; req->rl_sendctx->sc_wr.num_sge = 0; req->rl_sendctx->sc_unmap_count = 0; req->rl_sendctx->sc_req = req; __clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags); + ret = -EIO; if (!rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen)) - return -EIO; - + goto err; if (rtype != rpcrdma_areadch) if (!rpcrdma_prepare_msg_sges(r_xprt, req, xdr, rtype)) - return -EIO; - + goto err; return 0; + +err: + trace_xprtrdma_prepsend_failed(&req->rl_slot, ret); + return ret; } /** @@ -877,15 +883,7 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) out_err: trace_xprtrdma_marshal_failed(rqst, ret); - switch (ret) { - case -EAGAIN: - xprt_wait_for_buffer_space(rqst->rq_xprt); - break; - case -ENOBUFS: - break; - default: - r_xprt->rx_stats.failed_marshal_count++; - } + r_xprt->rx_stats.failed_marshal_count++; return ret; } diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index e71315e9dca2..0be5a36cacb6 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -901,7 +901,7 @@ out_emptyq: * completions recently. This is a sign the Send Queue is * backing up. Cause the caller to pause and try again. */ - set_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags); + xprt_wait_for_buffer_space(&r_xprt->rx_xprt); r_xprt->rx_stats.empty_sendctx_q++; return NULL; } @@ -936,10 +936,7 @@ rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc) /* Paired with READ_ONCE */ smp_store_release(&buf->rb_sc_tail, next_tail); - if (test_and_clear_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags)) { - smp_mb__after_atomic(); - xprt_write_space(&sc->sc_xprt->rx_xprt); - } + xprt_write_space(&sc->sc_xprt->rx_xprt); } static void @@ -977,8 +974,6 @@ rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt) r_xprt->rx_stats.mrs_allocated += count; spin_unlock(&buf->rb_mrlock); trace_xprtrdma_createmrs(r_xprt, count); - - xprt_write_space(&r_xprt->rx_xprt); } static void @@ -990,6 +985,7 @@ rpcrdma_mr_refresh_worker(struct work_struct *work) rx_buf); rpcrdma_mrs_create(r_xprt); + xprt_write_space(&r_xprt->rx_xprt); } /** @@ -1089,7 +1085,6 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) struct rpcrdma_buffer *buf = &r_xprt->rx_buf; int i, rc; - buf->rb_flags = 0; buf->rb_max_requests = r_xprt->rx_ep.rep_max_requests; buf->rb_bc_srv_max_requests = 0; spin_lock_init(&buf->rb_mrlock); diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index d1e0749bcbc4..2c6c5d8c3de1 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -391,7 +391,6 @@ struct rpcrdma_buffer { struct list_head rb_recv_bufs; struct list_head rb_allreqs; - unsigned long rb_flags; u32 rb_max_requests; u32 rb_credits; /* most recent credit grant */ @@ -402,11 +401,6 @@ struct rpcrdma_buffer { struct delayed_work rb_refresh_worker; }; -/* rb_flags */ -enum { - RPCRDMA_BUF_F_EMPTY_SCQ = 0, -}; - /* * Statistics for RPCRDMA */ -- cgit v1.2.3 From 5809ea4f7c39bf38e3f85ec185b776da9d81717c Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Wed, 19 Jun 2019 10:32:54 -0400 Subject: xprtrdma: Remove the RPCRDMA_REQ_F_PENDING flag Commit 9590d083c1bb ("xprtrdma: Use xprt_pin_rqst in rpcrdma_reply_handler") pins incoming RPC/RDMA replies so they can be left in the pending requests queue while they are being processed without introducing a race between ->buf_free and the transport's reply handler. Therefore RPCRDMA_REQ_F_PENDING is no longer necessary. Signed-off-by: Chuck Lever Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/rpc_rdma.c | 1 - net/sunrpc/xprtrdma/transport.c | 4 +--- net/sunrpc/xprtrdma/xprt_rdma.h | 1 - 3 files changed, 1 insertion(+), 5 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 59b214ba8813..fbc0a9ff14b1 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -1371,7 +1371,6 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep) } req->rl_reply = rep; rep->rr_rqst = rqst; - clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags); trace_xprtrdma_reply(rqst->rq_task, rep, req, credits); queue_work(buf->rb_completion_wq, &rep->rr_work); diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 1f73a6a7e43c..f84375ddbb4d 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -618,8 +618,7 @@ xprt_rdma_free(struct rpc_task *task) struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); struct rpcrdma_req *req = rpcr_to_rdmar(rqst); - if (test_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags)) - rpcrdma_release_rqst(r_xprt, req); + rpcrdma_release_rqst(r_xprt, req); trace_xprtrdma_op_free(task, req); } @@ -667,7 +666,6 @@ xprt_rdma_send_request(struct rpc_rqst *rqst) goto drop_connection; rqst->rq_xtime = ktime_get(); - __set_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags); if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req)) goto drop_connection; diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 2c6c5d8c3de1..3c39aa3c113c 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -344,7 +344,6 @@ struct rpcrdma_req { /* rl_flags */ enum { - RPCRDMA_REQ_F_PENDING = 0, RPCRDMA_REQ_F_TX_RESOURCES, }; -- cgit v1.2.3 From 847568942f93e0af77e4bb8a098899f310cb3a88 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Wed, 19 Jun 2019 10:32:59 -0400 Subject: xprtrdma: Remove fr_state Now that both the Send and Receive completions are handled in process context, it is safe to DMA unmap and return MRs to the free or recycle lists directly in the completion handlers. Doing this means rpcrdma_frwr no longer needs to track the state of each MR, meaning that a VALID or FLUSHED MR can no longer appear on an xprt's MR free list. Thus there is no longer a need to track the MR's registration state in rpcrdma_frwr. Signed-off-by: Chuck Lever Signed-off-by: Anna Schumaker --- include/trace/events/rpcrdma.h | 19 +--- net/sunrpc/xprtrdma/frwr_ops.c | 204 ++++++++++++++++++---------------------- net/sunrpc/xprtrdma/rpc_rdma.c | 2 +- net/sunrpc/xprtrdma/xprt_rdma.h | 11 +-- 4 files changed, 96 insertions(+), 140 deletions(-) (limited to 'net') diff --git a/include/trace/events/rpcrdma.h b/include/trace/events/rpcrdma.h index 2fb415136f7b..1d2547084998 100644 --- a/include/trace/events/rpcrdma.h +++ b/include/trace/events/rpcrdma.h @@ -181,18 +181,6 @@ DECLARE_EVENT_CLASS(xprtrdma_wrch_event, ), \ TP_ARGS(task, mr, nsegs)) -TRACE_DEFINE_ENUM(FRWR_IS_INVALID); -TRACE_DEFINE_ENUM(FRWR_IS_VALID); -TRACE_DEFINE_ENUM(FRWR_FLUSHED_FR); -TRACE_DEFINE_ENUM(FRWR_FLUSHED_LI); - -#define xprtrdma_show_frwr_state(x) \ - __print_symbolic(x, \ - { FRWR_IS_INVALID, "INVALID" }, \ - { FRWR_IS_VALID, "VALID" }, \ - { FRWR_FLUSHED_FR, "FLUSHED_FR" }, \ - { FRWR_FLUSHED_LI, "FLUSHED_LI" }) - DECLARE_EVENT_CLASS(xprtrdma_frwr_done, TP_PROTO( const struct ib_wc *wc, @@ -203,22 +191,19 @@ DECLARE_EVENT_CLASS(xprtrdma_frwr_done, TP_STRUCT__entry( __field(const void *, mr) - __field(unsigned int, state) __field(unsigned int, status) __field(unsigned int, vendor_err) ), TP_fast_assign( __entry->mr = container_of(frwr, struct rpcrdma_mr, frwr); - __entry->state = frwr->fr_state; __entry->status = wc->status; __entry->vendor_err = __entry->status ? wc->vendor_err : 0; ), TP_printk( - "mr=%p state=%s: %s (%u/0x%x)", - __entry->mr, xprtrdma_show_frwr_state(__entry->state), - rdma_show_wc_status(__entry->status), + "mr=%p: %s (%u/0x%x)", + __entry->mr, rdma_show_wc_status(__entry->status), __entry->status, __entry->vendor_err ) ); diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index ac47314fb751..5c480bc13075 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -168,7 +168,6 @@ int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr) goto out_list_err; mr->frwr.fr_mr = frmr; - mr->frwr.fr_state = FRWR_IS_INVALID; mr->mr_dir = DMA_NONE; INIT_LIST_HEAD(&mr->mr_list); INIT_WORK(&mr->mr_recycle, frwr_mr_recycle_worker); @@ -297,65 +296,6 @@ size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt) (ia->ri_max_segs - 2) * ia->ri_max_frwr_depth); } -/** - * frwr_wc_fastreg - Invoked by RDMA provider for a flushed FastReg WC - * @cq: completion queue (ignored) - * @wc: completed WR - * - */ -static void -frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc) -{ - struct ib_cqe *cqe = wc->wr_cqe; - struct rpcrdma_frwr *frwr = - container_of(cqe, struct rpcrdma_frwr, fr_cqe); - - /* WARNING: Only wr_cqe and status are reliable at this point */ - if (wc->status != IB_WC_SUCCESS) - frwr->fr_state = FRWR_FLUSHED_FR; - trace_xprtrdma_wc_fastreg(wc, frwr); -} - -/** - * frwr_wc_localinv - Invoked by RDMA provider for a flushed LocalInv WC - * @cq: completion queue (ignored) - * @wc: completed WR - * - */ -static void -frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc) -{ - struct ib_cqe *cqe = wc->wr_cqe; - struct rpcrdma_frwr *frwr = container_of(cqe, struct rpcrdma_frwr, - fr_cqe); - - /* WARNING: Only wr_cqe and status are reliable at this point */ - if (wc->status != IB_WC_SUCCESS) - frwr->fr_state = FRWR_FLUSHED_LI; - trace_xprtrdma_wc_li(wc, frwr); -} - -/** - * frwr_wc_localinv_wake - Invoked by RDMA provider for a signaled LocalInv WC - * @cq: completion queue (ignored) - * @wc: completed WR - * - * Awaken anyone waiting for an MR to finish being fenced. - */ -static void -frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc) -{ - struct ib_cqe *cqe = wc->wr_cqe; - struct rpcrdma_frwr *frwr = container_of(cqe, struct rpcrdma_frwr, - fr_cqe); - - /* WARNING: Only wr_cqe and status are reliable at this point */ - if (wc->status != IB_WC_SUCCESS) - frwr->fr_state = FRWR_FLUSHED_LI; - trace_xprtrdma_wc_li_wake(wc, frwr); - complete(&frwr->fr_linv_done); -} - /** * frwr_map - Register a memory region * @r_xprt: controlling transport @@ -378,23 +318,15 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, { struct rpcrdma_ia *ia = &r_xprt->rx_ia; bool holes_ok = ia->ri_mrtype == IB_MR_TYPE_SG_GAPS; - struct rpcrdma_frwr *frwr; struct rpcrdma_mr *mr; struct ib_mr *ibmr; struct ib_reg_wr *reg_wr; int i, n; u8 key; - mr = NULL; - do { - if (mr) - rpcrdma_mr_recycle(mr); - mr = rpcrdma_mr_get(r_xprt); - if (!mr) - goto out_getmr_err; - } while (mr->frwr.fr_state != FRWR_IS_INVALID); - frwr = &mr->frwr; - frwr->fr_state = FRWR_IS_VALID; + mr = rpcrdma_mr_get(r_xprt); + if (!mr) + goto out_getmr_err; if (nsegs > ia->ri_max_frwr_depth) nsegs = ia->ri_max_frwr_depth; @@ -423,7 +355,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, if (!mr->mr_nents) goto out_dmamap_err; - ibmr = frwr->fr_mr; + ibmr = mr->frwr.fr_mr; n = ib_map_mr_sg(ibmr, mr->mr_sg, mr->mr_nents, NULL, PAGE_SIZE); if (unlikely(n != mr->mr_nents)) goto out_mapmr_err; @@ -433,7 +365,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, key = (u8)(ibmr->rkey & 0x000000FF); ib_update_fast_reg_key(ibmr, ++key); - reg_wr = &frwr->fr_regwr; + reg_wr = &mr->frwr.fr_regwr; reg_wr->mr = ibmr; reg_wr->key = ibmr->rkey; reg_wr->access = writing ? @@ -464,6 +396,23 @@ out_mapmr_err: return ERR_PTR(-EIO); } +/** + * frwr_wc_fastreg - Invoked by RDMA provider for a flushed FastReg WC + * @cq: completion queue (ignored) + * @wc: completed WR + * + */ +static void frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc) +{ + struct ib_cqe *cqe = wc->wr_cqe; + struct rpcrdma_frwr *frwr = + container_of(cqe, struct rpcrdma_frwr, fr_cqe); + + /* WARNING: Only wr_cqe and status are reliable at this point */ + trace_xprtrdma_wc_fastreg(wc, frwr); + /* The MR will get recycled when the associated req is retransmitted */ +} + /** * frwr_send - post Send WR containing the RPC Call message * @ia: interface adapter @@ -516,31 +465,72 @@ void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs) if (mr->mr_handle == rep->rr_inv_rkey) { list_del_init(&mr->mr_list); trace_xprtrdma_mr_remoteinv(mr); - mr->frwr.fr_state = FRWR_IS_INVALID; rpcrdma_mr_unmap_and_put(mr); break; /* only one invalidated MR per RPC */ } } +static void __frwr_release_mr(struct ib_wc *wc, struct rpcrdma_mr *mr) +{ + if (wc->status != IB_WC_SUCCESS) + rpcrdma_mr_recycle(mr); + else + rpcrdma_mr_unmap_and_put(mr); +} + /** - * frwr_unmap_sync - invalidate memory regions that were registered for @req - * @r_xprt: controlling transport - * @mrs: list of MRs to process + * frwr_wc_localinv - Invoked by RDMA provider for a LOCAL_INV WC + * @cq: completion queue (ignored) + * @wc: completed WR * - * Sleeps until it is safe for the host CPU to access the - * previously mapped memory regions. + */ +static void frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc) +{ + struct ib_cqe *cqe = wc->wr_cqe; + struct rpcrdma_frwr *frwr = + container_of(cqe, struct rpcrdma_frwr, fr_cqe); + struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr); + + /* WARNING: Only wr_cqe and status are reliable at this point */ + trace_xprtrdma_wc_li(wc, frwr); + __frwr_release_mr(wc, mr); +} + +/** + * frwr_wc_localinv_wake - Invoked by RDMA provider for a LOCAL_INV WC + * @cq: completion queue (ignored) + * @wc: completed WR * - * Caller ensures that @mrs is not empty before the call. This - * function empties the list. + * Awaken anyone waiting for an MR to finish being fenced. */ -void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mrs) +static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc) +{ + struct ib_cqe *cqe = wc->wr_cqe; + struct rpcrdma_frwr *frwr = + container_of(cqe, struct rpcrdma_frwr, fr_cqe); + struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr); + + /* WARNING: Only wr_cqe and status are reliable at this point */ + trace_xprtrdma_wc_li_wake(wc, frwr); + complete(&frwr->fr_linv_done); + __frwr_release_mr(wc, mr); +} + +/** + * frwr_unmap_sync - invalidate memory regions that were registered for @req + * @r_xprt: controlling transport instance + * @req: rpcrdma_req with a non-empty list of MRs to process + * + * Sleeps until it is safe for the host CPU to access the previously mapped + * memory regions. + */ +void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) { struct ib_send_wr *first, **prev, *last; const struct ib_send_wr *bad_wr; - struct rpcrdma_ia *ia = &r_xprt->rx_ia; struct rpcrdma_frwr *frwr; struct rpcrdma_mr *mr; - int count, rc; + int rc; /* ORDER: Invalidate all of the MRs first * @@ -548,33 +538,32 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mrs) * a single ib_post_send() call. */ frwr = NULL; - count = 0; prev = &first; - list_for_each_entry(mr, mrs, mr_list) { - mr->frwr.fr_state = FRWR_IS_INVALID; + while (!list_empty(&req->rl_registered)) { + mr = rpcrdma_mr_pop(&req->rl_registered); - frwr = &mr->frwr; trace_xprtrdma_mr_localinv(mr); + r_xprt->rx_stats.local_inv_needed++; + frwr = &mr->frwr; frwr->fr_cqe.done = frwr_wc_localinv; last = &frwr->fr_invwr; - memset(last, 0, sizeof(*last)); + last->next = NULL; last->wr_cqe = &frwr->fr_cqe; + last->sg_list = NULL; + last->num_sge = 0; last->opcode = IB_WR_LOCAL_INV; + last->send_flags = IB_SEND_SIGNALED; last->ex.invalidate_rkey = mr->mr_handle; - count++; *prev = last; prev = &last->next; } - if (!frwr) - goto unmap; /* Strong send queue ordering guarantees that when the * last WR in the chain completes, all WRs in the chain * are complete. */ - last->send_flags = IB_SEND_SIGNALED; frwr->fr_cqe.done = frwr_wc_localinv_wake; reinit_completion(&frwr->fr_linv_done); @@ -582,29 +571,20 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mrs) * replaces the QP. The RPC reply handler won't call us * unless ri_id->qp is a valid pointer. */ - r_xprt->rx_stats.local_inv_needed++; bad_wr = NULL; - rc = ib_post_send(ia->ri_id->qp, first, &bad_wr); - if (bad_wr != first) - wait_for_completion(&frwr->fr_linv_done); - if (rc) - goto out_release; + rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr); + trace_xprtrdma_post_send(req, rc); - /* ORDER: Now DMA unmap all of the MRs, and return - * them to the free MR list. + /* The final LOCAL_INV WR in the chain is supposed to + * do the wake. If it was never posted, the wake will + * not happen, so don't wait in that case. */ -unmap: - while (!list_empty(mrs)) { - mr = rpcrdma_mr_pop(mrs); - rpcrdma_mr_unmap_and_put(mr); - } - return; - -out_release: - pr_err("rpcrdma: FRWR invalidate ib_post_send returned %i\n", rc); + if (bad_wr != first) + wait_for_completion(&frwr->fr_linv_done); + if (!rc) + return; - /* Unmap and release the MRs in the LOCAL_INV WRs that did not - * get posted. + /* Recycle MRs in the LOCAL_INV chain that did not get posted. */ while (bad_wr) { frwr = container_of(bad_wr, struct rpcrdma_frwr, diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index fbc0a9ff14b1..f23450b176dd 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -1277,7 +1277,7 @@ void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) * RPC has relinquished all its Send Queue entries. */ if (!list_empty(&req->rl_registered)) - frwr_unmap_sync(r_xprt, &req->rl_registered); + frwr_unmap_sync(r_xprt, req); /* Ensure that any DMA mapped pages associated with * the Send of the RPC Call have been unmapped before diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 3c39aa3c113c..a9de116a5c1a 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -240,17 +240,9 @@ struct rpcrdma_sendctx { * An external memory region is any buffer or page that is registered * on the fly (ie, not pre-registered). */ -enum rpcrdma_frwr_state { - FRWR_IS_INVALID, /* ready to be used */ - FRWR_IS_VALID, /* in use */ - FRWR_FLUSHED_FR, /* flushed FASTREG WR */ - FRWR_FLUSHED_LI, /* flushed LOCALINV WR */ -}; - struct rpcrdma_frwr { struct ib_mr *fr_mr; struct ib_cqe fr_cqe; - enum rpcrdma_frwr_state fr_state; struct completion fr_linv_done; union { struct ib_reg_wr fr_regwr; @@ -567,8 +559,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr **mr); int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req); void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs); -void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, - struct list_head *mrs); +void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req); /* * RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c -- cgit v1.2.3 From 40088f0e9b62d7fa033918b54ef45f8bf7d1ad1c Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Wed, 19 Jun 2019 10:33:04 -0400 Subject: xprtrdma: Add mechanism to place MRs back on the free list When a marshal operation fails, any MRs that were already set up for that request are recycled. Recycling releases MRs and creates new ones, which is expensive. Since commit f2877623082b ("xprtrdma: Chain Send to FastReg WRs") was merged, recycling FRWRs is unnecessary. This is because before that commit, frwr_map had already posted FAST_REG Work Requests, so ownership of the MRs had already been passed to the NIC and thus dealing with them had to be delayed until they completed. Since that commit, however, FAST_REG WRs are posted at the same time as the Send WR. This means that if marshaling fails, we are certain the MRs are safe to simply unmap and place back on the free list because neither the Send nor the FAST_REG WRs have been posted yet. The kernel still has ownership of the MRs at this point. This reduces the total number of MRs that the xprt has to create under heavy workloads and makes the marshaling logic less brittle. Signed-off-by: Chuck Lever Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/frwr_ops.c | 20 ++++++++++++++++++++ net/sunrpc/xprtrdma/rpc_rdma.c | 1 + net/sunrpc/xprtrdma/xprt_rdma.h | 1 + 3 files changed, 22 insertions(+) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index 5c480bc13075..524cac0a0715 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -144,6 +144,26 @@ frwr_mr_recycle_worker(struct work_struct *work) frwr_release_mr(mr); } +/* frwr_reset - Place MRs back on the free list + * @req: request to reset + * + * Used after a failed marshal. For FRWR, this means the MRs + * don't have to be fully released and recreated. + * + * NB: This is safe only as long as none of @req's MRs are + * involved with an ongoing asynchronous FAST_REG or LOCAL_INV + * Work Request. + */ +void frwr_reset(struct rpcrdma_req *req) +{ + while (!list_empty(&req->rl_registered)) { + struct rpcrdma_mr *mr; + + mr = rpcrdma_mr_pop(&req->rl_registered); + rpcrdma_mr_unmap_and_put(mr); + } +} + /** * frwr_init_mr - Initialize one MR * @ia: interface adapter diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index f23450b176dd..67d72d68ca6c 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -884,6 +884,7 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) out_err: trace_xprtrdma_marshal_failed(rqst, ret); r_xprt->rx_stats.failed_marshal_count++; + frwr_reset(req); return ret; } diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index a9de116a5c1a..a39652884308 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -549,6 +549,7 @@ rpcrdma_data_dir(bool writing) /* Memory registration calls xprtrdma/frwr_ops.c */ bool frwr_is_supported(struct ib_device *device); +void frwr_reset(struct rpcrdma_req *req); int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep); int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr); void frwr_release_mr(struct rpcrdma_mr *mr); -- cgit v1.2.3 From d8099feda4833bab96b1bf312e9e6aad6b771570 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Wed, 19 Jun 2019 10:33:10 -0400 Subject: xprtrdma: Reduce context switching due to Local Invalidation Since commit ba69cd122ece ("xprtrdma: Remove support for FMR memory registration"), FRWR is the only supported memory registration mode. We can take advantage of the asynchronous nature of FRWR's LOCAL_INV Work Requests to get rid of the completion wait by having the LOCAL_INV completion handler take care of DMA unmapping MRs and waking the upper layer RPC waiter. This eliminates two context switches when local invalidation is necessary. As a side benefit, we will no longer need the per-xprt deferred completion work queue. Signed-off-by: Chuck Lever Signed-off-by: Anna Schumaker --- include/trace/events/rpcrdma.h | 1 + net/sunrpc/xprtrdma/frwr_ops.c | 103 +++++++++++++++++++++++++++++++++++++++- net/sunrpc/xprtrdma/rpc_rdma.c | 61 ++++++++++++------------ net/sunrpc/xprtrdma/verbs.c | 17 ------- net/sunrpc/xprtrdma/xprt_rdma.h | 8 ++-- 5 files changed, 137 insertions(+), 53 deletions(-) (limited to 'net') diff --git a/include/trace/events/rpcrdma.h b/include/trace/events/rpcrdma.h index 1d2547084998..98023d91a72d 100644 --- a/include/trace/events/rpcrdma.h +++ b/include/trace/events/rpcrdma.h @@ -712,6 +712,7 @@ TRACE_EVENT(xprtrdma_wc_receive, DEFINE_FRWR_DONE_EVENT(xprtrdma_wc_fastreg); DEFINE_FRWR_DONE_EVENT(xprtrdma_wc_li); DEFINE_FRWR_DONE_EVENT(xprtrdma_wc_li_wake); +DEFINE_FRWR_DONE_EVENT(xprtrdma_wc_li_done); TRACE_EVENT(xprtrdma_frwr_alloc, TP_PROTO( diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index 524cac0a0715..0b6dad7580a1 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -542,7 +542,10 @@ static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc) * @req: rpcrdma_req with a non-empty list of MRs to process * * Sleeps until it is safe for the host CPU to access the previously mapped - * memory regions. + * memory regions. This guarantees that registered MRs are properly fenced + * from the server before the RPC consumer accesses the data in them. It + * also ensures proper Send flow control: waking the next RPC waits until + * this RPC has relinquished all its Send Queue entries. */ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) { @@ -616,3 +619,101 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) rpcrdma_mr_recycle(mr); } } + +/** + * frwr_wc_localinv_done - Invoked by RDMA provider for a signaled LOCAL_INV WC + * @cq: completion queue (ignored) + * @wc: completed WR + * + */ +static void frwr_wc_localinv_done(struct ib_cq *cq, struct ib_wc *wc) +{ + struct ib_cqe *cqe = wc->wr_cqe; + struct rpcrdma_frwr *frwr = + container_of(cqe, struct rpcrdma_frwr, fr_cqe); + struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr); + + /* WARNING: Only wr_cqe and status are reliable at this point */ + trace_xprtrdma_wc_li_done(wc, frwr); + rpcrdma_complete_rqst(frwr->fr_req->rl_reply); + __frwr_release_mr(wc, mr); +} + +/** + * frwr_unmap_async - invalidate memory regions that were registered for @req + * @r_xprt: controlling transport instance + * @req: rpcrdma_req with a non-empty list of MRs to process + * + * This guarantees that registered MRs are properly fenced from the + * server before the RPC consumer accesses the data in them. It also + * ensures proper Send flow control: waking the next RPC waits until + * this RPC has relinquished all its Send Queue entries. + */ +void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) +{ + struct ib_send_wr *first, *last, **prev; + const struct ib_send_wr *bad_wr; + struct rpcrdma_frwr *frwr; + struct rpcrdma_mr *mr; + int rc; + + /* Chain the LOCAL_INV Work Requests and post them with + * a single ib_post_send() call. + */ + frwr = NULL; + prev = &first; + while (!list_empty(&req->rl_registered)) { + mr = rpcrdma_mr_pop(&req->rl_registered); + + trace_xprtrdma_mr_localinv(mr); + r_xprt->rx_stats.local_inv_needed++; + + frwr = &mr->frwr; + frwr->fr_cqe.done = frwr_wc_localinv; + frwr->fr_req = req; + last = &frwr->fr_invwr; + last->next = NULL; + last->wr_cqe = &frwr->fr_cqe; + last->sg_list = NULL; + last->num_sge = 0; + last->opcode = IB_WR_LOCAL_INV; + last->send_flags = IB_SEND_SIGNALED; + last->ex.invalidate_rkey = mr->mr_handle; + + *prev = last; + prev = &last->next; + } + + /* Strong send queue ordering guarantees that when the + * last WR in the chain completes, all WRs in the chain + * are complete. The last completion will wake up the + * RPC waiter. + */ + frwr->fr_cqe.done = frwr_wc_localinv_done; + + /* Transport disconnect drains the receive CQ before it + * replaces the QP. The RPC reply handler won't call us + * unless ri_id->qp is a valid pointer. + */ + bad_wr = NULL; + rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr); + trace_xprtrdma_post_send(req, rc); + if (!rc) + return; + + /* Recycle MRs in the LOCAL_INV chain that did not get posted. + */ + while (bad_wr) { + frwr = container_of(bad_wr, struct rpcrdma_frwr, fr_invwr); + mr = container_of(frwr, struct rpcrdma_mr, frwr); + bad_wr = bad_wr->next; + + rpcrdma_mr_recycle(mr); + } + + /* The final LOCAL_INV WR in the chain is supposed to + * do the wake. If it was never posted, the wake will + * not happen, so wake here in that case. + */ + rpcrdma_complete_rqst(req->rl_reply); +} diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 67d72d68ca6c..33b6e6a03f68 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -1268,24 +1268,15 @@ out_badheader: goto out; } -void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) +/* Ensure that any DMA mapped pages associated with + * the Send of the RPC Call have been unmapped before + * allowing the RPC to complete. This protects argument + * memory not controlled by the RPC client from being + * re-used before we're done with it. + */ +static void rpcrdma_release_tx(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req) { - /* Invalidate and unmap the data payloads before waking - * the waiting application. This guarantees the memory - * regions are properly fenced from the server before the - * application accesses the data. It also ensures proper - * send flow control: waking the next RPC waits until this - * RPC has relinquished all its Send Queue entries. - */ - if (!list_empty(&req->rl_registered)) - frwr_unmap_sync(r_xprt, req); - - /* Ensure that any DMA mapped pages associated with - * the Send of the RPC Call have been unmapped before - * allowing the RPC to complete. This protects argument - * memory not controlled by the RPC client from being - * re-used before we're done with it. - */ if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) { r_xprt->rx_stats.reply_waits_for_send++; out_of_line_wait_on_bit(&req->rl_flags, @@ -1295,24 +1286,23 @@ void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) } } -/* Reply handling runs in the poll worker thread. Anything that - * might wait is deferred to a separate workqueue. +/** + * rpcrdma_release_rqst - Release hardware resources + * @r_xprt: controlling transport instance + * @req: request with resources to release + * */ -void rpcrdma_deferred_completion(struct work_struct *work) +void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) { - struct rpcrdma_rep *rep = - container_of(work, struct rpcrdma_rep, rr_work); - struct rpcrdma_req *req = rpcr_to_rdmar(rep->rr_rqst); - struct rpcrdma_xprt *r_xprt = rep->rr_rxprt; + if (!list_empty(&req->rl_registered)) + frwr_unmap_sync(r_xprt, req); - trace_xprtrdma_defer_cmp(rep); - if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE) - frwr_reminv(rep, &req->rl_registered); - rpcrdma_release_rqst(r_xprt, req); - rpcrdma_complete_rqst(rep); + rpcrdma_release_tx(r_xprt, req); } -/* Process received RPC/RDMA messages. +/** + * rpcrdma_reply_handler - Process received RPC/RDMA messages + * @rep: Incoming rpcrdma_rep object to process * * Errors must result in the RPC task either being awakened, or * allowed to timeout, to discover the errors at that time. @@ -1374,7 +1364,16 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep) rep->rr_rqst = rqst; trace_xprtrdma_reply(rqst->rq_task, rep, req, credits); - queue_work(buf->rb_completion_wq, &rep->rr_work); + + if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE) + frwr_reminv(rep, &req->rl_registered); + if (!list_empty(&req->rl_registered)) { + frwr_unmap_async(r_xprt, req); + /* LocalInv completion will complete the RPC */ + } else { + rpcrdma_release_tx(r_xprt, req); + rpcrdma_complete_rqst(rep); + } return; out_badversion: diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 0be5a36cacb6..c50a4b295bd7 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -89,14 +89,12 @@ static void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp); */ static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt) { - struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpcrdma_ia *ia = &r_xprt->rx_ia; /* Flush Receives, then wait for deferred Reply work * to complete. */ ib_drain_rq(ia->ri_id->qp); - drain_workqueue(buf->rb_completion_wq); /* Deferred Reply processing might have scheduled * local invalidations. @@ -1056,7 +1054,6 @@ static bool rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, bool temp) rep->rr_cqe.done = rpcrdma_wc_receive; rep->rr_rxprt = r_xprt; - INIT_WORK(&rep->rr_work, rpcrdma_deferred_completion); rep->rr_recv_wr.next = NULL; rep->rr_recv_wr.wr_cqe = &rep->rr_cqe; rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov; @@ -1117,15 +1114,6 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) if (rc) goto out; - buf->rb_completion_wq = alloc_workqueue("rpcrdma-%s", - WQ_MEM_RECLAIM | WQ_HIGHPRI, - 0, - r_xprt->rx_xprt.address_strings[RPC_DISPLAY_ADDR]); - if (!buf->rb_completion_wq) { - rc = -ENOMEM; - goto out; - } - return 0; out: rpcrdma_buffer_destroy(buf); @@ -1199,11 +1187,6 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) { cancel_delayed_work_sync(&buf->rb_refresh_worker); - if (buf->rb_completion_wq) { - destroy_workqueue(buf->rb_completion_wq); - buf->rb_completion_wq = NULL; - } - rpcrdma_sendctxs_destroy(buf); while (!list_empty(&buf->rb_recv_bufs)) { diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index a39652884308..e465221c9c96 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -202,10 +202,9 @@ struct rpcrdma_rep { bool rr_temp; struct rpcrdma_regbuf *rr_rdmabuf; struct rpcrdma_xprt *rr_rxprt; - struct work_struct rr_work; + struct rpc_rqst *rr_rqst; struct xdr_buf rr_hdrbuf; struct xdr_stream rr_stream; - struct rpc_rqst *rr_rqst; struct list_head rr_list; struct ib_recv_wr rr_recv_wr; }; @@ -240,10 +239,12 @@ struct rpcrdma_sendctx { * An external memory region is any buffer or page that is registered * on the fly (ie, not pre-registered). */ +struct rpcrdma_req; struct rpcrdma_frwr { struct ib_mr *fr_mr; struct ib_cqe fr_cqe; struct completion fr_linv_done; + struct rpcrdma_req *fr_req; union { struct ib_reg_wr fr_regwr; struct ib_send_wr fr_invwr; @@ -388,7 +389,6 @@ struct rpcrdma_buffer { u32 rb_bc_srv_max_requests; u32 rb_bc_max_requests; - struct workqueue_struct *rb_completion_wq; struct delayed_work rb_refresh_worker; }; @@ -561,6 +561,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req); void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs); void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req); +void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req); /* * RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c @@ -585,7 +586,6 @@ void rpcrdma_complete_rqst(struct rpcrdma_rep *rep); void rpcrdma_reply_handler(struct rpcrdma_rep *rep); void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req); -void rpcrdma_deferred_completion(struct work_struct *work); static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len) { -- cgit v1.2.3 From 0ab115237025f5e379620bbcd56a02697d07b002 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Wed, 19 Jun 2019 10:33:15 -0400 Subject: xprtrdma: Wake RPCs directly in rpcrdma_wc_send path Eliminate a context switch in the path that handles RPC wake-ups when a Receive completion has to wait for a Send completion. Signed-off-by: Chuck Lever Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/rpc_rdma.c | 61 ++++++++++++++++------------------------- net/sunrpc/xprtrdma/transport.c | 10 ++++++- net/sunrpc/xprtrdma/verbs.c | 3 +- net/sunrpc/xprtrdma/xprt_rdma.h | 12 ++------ 4 files changed, 36 insertions(+), 50 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 33b6e6a03f68..caf0b1950d76 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -511,6 +511,16 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, return 0; } +static void rpcrdma_sendctx_done(struct kref *kref) +{ + struct rpcrdma_req *req = + container_of(kref, struct rpcrdma_req, rl_kref); + struct rpcrdma_rep *rep = req->rl_reply; + + rpcrdma_complete_rqst(rep); + rep->rr_rxprt->rx_stats.reply_waits_for_send++; +} + /** * rpcrdma_sendctx_unmap - DMA-unmap Send buffer * @sc: sendctx containing SGEs to unmap @@ -520,6 +530,9 @@ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc) { struct ib_sge *sge; + if (!sc->sc_unmap_count) + return; + /* The first two SGEs contain the transport header and * the inline buffer. These are always left mapped so * they can be cheaply re-used. @@ -529,9 +542,7 @@ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc) ib_dma_unmap_page(sc->sc_device, sge->addr, sge->length, DMA_TO_DEVICE); - if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, - &sc->sc_req->rl_flags)) - wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES); + kref_put(&sc->sc_req->rl_kref, rpcrdma_sendctx_done); } /* Prepare an SGE for the RPC-over-RDMA transport header. @@ -666,7 +677,7 @@ map_tail: out: sc->sc_wr.num_sge += sge_no; if (sc->sc_unmap_count) - __set_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags); + kref_get(&req->rl_kref); return true; out_regbuf: @@ -708,7 +719,7 @@ rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt, req->rl_sendctx->sc_wr.num_sge = 0; req->rl_sendctx->sc_unmap_count = 0; req->rl_sendctx->sc_req = req; - __clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags); + kref_init(&req->rl_kref); ret = -EIO; if (!rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen)) @@ -1268,36 +1279,12 @@ out_badheader: goto out; } -/* Ensure that any DMA mapped pages associated with - * the Send of the RPC Call have been unmapped before - * allowing the RPC to complete. This protects argument - * memory not controlled by the RPC client from being - * re-used before we're done with it. - */ -static void rpcrdma_release_tx(struct rpcrdma_xprt *r_xprt, - struct rpcrdma_req *req) +static void rpcrdma_reply_done(struct kref *kref) { - if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) { - r_xprt->rx_stats.reply_waits_for_send++; - out_of_line_wait_on_bit(&req->rl_flags, - RPCRDMA_REQ_F_TX_RESOURCES, - bit_wait, - TASK_UNINTERRUPTIBLE); - } -} + struct rpcrdma_req *req = + container_of(kref, struct rpcrdma_req, rl_kref); -/** - * rpcrdma_release_rqst - Release hardware resources - * @r_xprt: controlling transport instance - * @req: request with resources to release - * - */ -void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) -{ - if (!list_empty(&req->rl_registered)) - frwr_unmap_sync(r_xprt, req); - - rpcrdma_release_tx(r_xprt, req); + rpcrdma_complete_rqst(req->rl_reply); } /** @@ -1367,13 +1354,11 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep) if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE) frwr_reminv(rep, &req->rl_registered); - if (!list_empty(&req->rl_registered)) { + if (!list_empty(&req->rl_registered)) frwr_unmap_async(r_xprt, req); /* LocalInv completion will complete the RPC */ - } else { - rpcrdma_release_tx(r_xprt, req); - rpcrdma_complete_rqst(rep); - } + else + kref_put(&req->rl_kref, rpcrdma_reply_done); return; out_badversion: diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index f84375ddbb4d..9575f1d8db07 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -618,8 +618,16 @@ xprt_rdma_free(struct rpc_task *task) struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); struct rpcrdma_req *req = rpcr_to_rdmar(rqst); - rpcrdma_release_rqst(r_xprt, req); trace_xprtrdma_op_free(task, req); + + if (!list_empty(&req->rl_registered)) + frwr_unmap_sync(r_xprt, req); + + /* XXX: If the RPC is completing because of a signal and + * not because a reply was received, we ought to ensure + * that the Send completion has fired, so that memory + * involved with the Send is not still visible to the NIC. + */ } /** diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index c50a4b295bd7..4e22cc244149 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -1462,8 +1462,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia, struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr; int rc; - if (!ep->rep_send_count || - test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) { + if (!ep->rep_send_count || kref_read(&req->rl_kref) > 1) { send_wr->send_flags |= IB_SEND_SIGNALED; ep->rep_send_count = ep->rep_send_batch; } else { diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index e465221c9c96..5475f0dff22a 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -44,7 +44,8 @@ #include /* wait_queue_head_t, etc */ #include /* spinlock_t, etc */ -#include /* atomic_t, etc */ +#include /* atomic_t, etc */ +#include /* struct kref */ #include /* struct work_struct */ #include /* RDMA connection api */ @@ -329,17 +330,12 @@ struct rpcrdma_req { struct rpcrdma_regbuf *rl_recvbuf; /* rq_rcv_buf */ struct list_head rl_all; - unsigned long rl_flags; + struct kref rl_kref; struct list_head rl_registered; /* registered segments */ struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; }; -/* rl_flags */ -enum { - RPCRDMA_REQ_F_TX_RESOURCES, -}; - static inline struct rpcrdma_req * rpcr_to_rdmar(const struct rpc_rqst *rqst) { @@ -584,8 +580,6 @@ int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst); void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *); void rpcrdma_complete_rqst(struct rpcrdma_rep *rep); void rpcrdma_reply_handler(struct rpcrdma_rep *rep); -void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, - struct rpcrdma_req *req); static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len) { -- cgit v1.2.3 From 379d1bc5be373c920bcda16b9894ae99505ea127 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Wed, 19 Jun 2019 10:33:20 -0400 Subject: xprtrdma: Simplify rpcrdma_rep_create Clean up. Commit 7c8d9e7c8863 ("xprtrdma: Move Receive posting to Receive handler") reduced the number of rpcrdma_rep_create call sites to one. After that commit, the backchannel code no longer invokes it. Therefore the free list logic added by commit d698c4a02ee0 ("xprtrdma: Fix backchannel allocation of extra rpcrdma_reps") is no longer necessary, and in fact adds some extra overhead that we can do without. Simply post any newly created reps. They will get added back to the rb_recv_bufs list when they subsequently complete. Signed-off-by: Chuck Lever Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/verbs.c | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 4e22cc244149..de6be101abf2 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -1036,9 +1036,9 @@ out1: return NULL; } -static bool rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, bool temp) +static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, + bool temp) { - struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpcrdma_rep *rep; rep = kzalloc(sizeof(*rep), GFP_KERNEL); @@ -1049,9 +1049,9 @@ static bool rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, bool temp) DMA_FROM_DEVICE, GFP_KERNEL); if (!rep->rr_rdmabuf) goto out_free; + xdr_buf_init(&rep->rr_hdrbuf, rdmab_data(rep->rr_rdmabuf), rdmab_length(rep->rr_rdmabuf)); - rep->rr_cqe.done = rpcrdma_wc_receive; rep->rr_rxprt = r_xprt; rep->rr_recv_wr.next = NULL; @@ -1059,16 +1059,12 @@ static bool rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, bool temp) rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov; rep->rr_recv_wr.num_sge = 1; rep->rr_temp = temp; - - spin_lock(&buf->rb_lock); - list_add(&rep->rr_list, &buf->rb_recv_bufs); - spin_unlock(&buf->rb_lock); - return true; + return rep; out_free: kfree(rep); out: - return false; + return NULL; } /** @@ -1497,7 +1493,6 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp) count = 0; wr = NULL; while (needed) { - struct rpcrdma_regbuf *rb; struct rpcrdma_rep *rep; spin_lock(&buf->rb_lock); @@ -1507,13 +1502,12 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp) list_del(&rep->rr_list); spin_unlock(&buf->rb_lock); if (!rep) { - if (!rpcrdma_rep_create(r_xprt, temp)) + rep = rpcrdma_rep_create(r_xprt, temp); + if (!rep) break; - continue; } - rb = rep->rr_rdmabuf; - if (!rpcrdma_regbuf_dma_map(r_xprt, rb)) { + if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf)) { rpcrdma_recv_buffer_put(rep); break; } -- cgit v1.2.3 From 9ef33ef5b628037b694a433f8af014a04bb38126 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Wed, 19 Jun 2019 10:33:26 -0400 Subject: xprtrdma: Streamline rpcrdma_post_recvs rb_lock is contended between rpcrdma_buffer_create, rpcrdma_buffer_put, and rpcrdma_post_recvs. Commit e340c2d6ef2a ("xprtrdma: Reduce the doorbell rate (Receive)") causes rpcrdma_post_recvs to take the rb_lock repeatedly when it determines more Receives are needed. Streamline this code path so it takes the lock just once in most cases to build the Receive chain that is about to be posted. Signed-off-by: Chuck Lever Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/verbs.c | 59 +++++++++++++++++++++++++++++---------------- 1 file changed, 38 insertions(+), 21 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index de6be101abf2..3270c8ad1819 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -1478,11 +1478,13 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp) { struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpcrdma_ep *ep = &r_xprt->rx_ep; - struct ib_recv_wr *wr, *bad_wr; + struct ib_recv_wr *i, *wr, *bad_wr; + struct rpcrdma_rep *rep; int needed, count, rc; rc = 0; count = 0; + needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1); if (ep->rep_receive_count > needed) goto out; @@ -1490,39 +1492,48 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp) if (!temp) needed += RPCRDMA_MAX_RECV_BATCH; - count = 0; + /* fast path: all needed reps can be found on the free list */ wr = NULL; + spin_lock(&buf->rb_lock); while (needed) { - struct rpcrdma_rep *rep; - - spin_lock(&buf->rb_lock); rep = list_first_entry_or_null(&buf->rb_recv_bufs, struct rpcrdma_rep, rr_list); - if (likely(rep)) - list_del(&rep->rr_list); - spin_unlock(&buf->rb_lock); - if (!rep) { - rep = rpcrdma_rep_create(r_xprt, temp); - if (!rep) - break; - } + if (!rep) + break; - if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf)) { - rpcrdma_recv_buffer_put(rep); + list_del(&rep->rr_list); + rep->rr_recv_wr.next = wr; + wr = &rep->rr_recv_wr; + --needed; + } + spin_unlock(&buf->rb_lock); + + while (needed) { + rep = rpcrdma_rep_create(r_xprt, temp); + if (!rep) break; - } - trace_xprtrdma_post_recv(rep->rr_recv_wr.wr_cqe); rep->rr_recv_wr.next = wr; wr = &rep->rr_recv_wr; - ++count; --needed; } - if (!count) + if (!wr) goto out; + for (i = wr; i; i = i->next) { + rep = container_of(i, struct rpcrdma_rep, rr_recv_wr); + + if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf)) + goto release_wrs; + + trace_xprtrdma_post_recv(rep->rr_recv_wr.wr_cqe); + ++count; + } + rc = ib_post_recv(r_xprt->rx_ia.ri_id->qp, wr, (const struct ib_recv_wr **)&bad_wr); +out: + trace_xprtrdma_post_recvs(r_xprt, count, rc); if (rc) { for (wr = bad_wr; wr;) { struct rpcrdma_rep *rep; @@ -1534,6 +1545,12 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp) } } ep->rep_receive_count += count; -out: - trace_xprtrdma_post_recvs(r_xprt, count, rc); + return; + +release_wrs: + for (i = wr; i;) { + rep = container_of(i, struct rpcrdma_rep, rr_recv_wr); + i = i->next; + rpcrdma_recv_buffer_put(rep); + } } -- cgit v1.2.3 From 6a6c6def42469ce08023458ba439a8207fe87ae4 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Wed, 19 Jun 2019 10:33:31 -0400 Subject: xprtrdma: Refactor chunk encoding Clean up. Move the "not present" case into the individual chunk encoders. This improves code organization and readability. The reason for the original organization was to optimize for the case where there there are no chunks. The optimization turned out to be inconsequential, so let's err on the side of code readability. Signed-off-by: Chuck Lever Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/rpc_rdma.c | 36 ++++++++++++++++-------------------- 1 file changed, 16 insertions(+), 20 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index caf0b1950d76..d3515d3efe81 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -366,6 +366,9 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, unsigned int pos; int nsegs; + if (rtype == rpcrdma_noch) + goto done; + pos = rqst->rq_snd_buf.head[0].iov_len; if (rtype == rpcrdma_areadch) pos = 0; @@ -389,7 +392,8 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, nsegs -= mr->mr_nents; } while (nsegs); - return 0; +done: + return encode_item_not_present(xdr); } /* Register and XDR encode the Write list. Supports encoding a list @@ -417,6 +421,9 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, int nsegs, nchunks; __be32 *segcount; + if (wtype != rpcrdma_writech) + goto done; + seg = req->rl_segments; nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, rqst->rq_rcv_buf.head[0].iov_len, @@ -451,7 +458,8 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, /* Update count of segments in this Write chunk */ *segcount = cpu_to_be32(nchunks); - return 0; +done: + return encode_item_not_present(xdr); } /* Register and XDR encode the Reply chunk. Supports encoding an array @@ -476,6 +484,9 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, int nsegs, nchunks; __be32 *segcount; + if (wtype != rpcrdma_replych) + return encode_item_not_present(xdr); + seg = req->rl_segments; nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg); if (nsegs < 0) @@ -859,28 +870,13 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) * send a Call message with a Position Zero Read chunk and a * regular Read chunk at the same time. */ - if (rtype != rpcrdma_noch) { - ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype); - if (ret) - goto out_err; - } - ret = encode_item_not_present(xdr); + ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype); if (ret) goto out_err; - - if (wtype == rpcrdma_writech) { - ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype); - if (ret) - goto out_err; - } - ret = encode_item_not_present(xdr); + ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype); if (ret) goto out_err; - - if (wtype != rpcrdma_replych) - ret = encode_item_not_present(xdr); - else - ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype); + ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype); if (ret) goto out_err; -- cgit v1.2.3 From 5828cebad1c8d535f3c194439e394e92a2273fb2 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Wed, 19 Jun 2019 10:33:36 -0400 Subject: xprtrdma: Remove rpcrdma_req::rl_buffer Clean up. There is only one remaining function, rpcrdma_buffer_put(), that uses this field. Its caller can supply a pointer to the correct rpcrdma_buffer, enabling the removal of an 8-byte pointer field from a frequently-allocated shared data structure. Signed-off-by: Chuck Lever Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/transport.c | 5 ++++- net/sunrpc/xprtrdma/verbs.c | 6 ++---- net/sunrpc/xprtrdma/xprt_rdma.h | 4 ++-- 3 files changed, 8 insertions(+), 7 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 9575f1d8db07..3688e0782587 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -550,8 +550,11 @@ out_sleep: static void xprt_rdma_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *rqst) { + struct rpcrdma_xprt *r_xprt = + container_of(xprt, struct rpcrdma_xprt, rx_xprt); + memset(rqst, 0, sizeof(*rqst)); - rpcrdma_buffer_put(rpcr_to_rdmar(rqst)); + rpcrdma_buffer_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst)); rpc_wake_up_next(&xprt->backlog); } diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 3270c8ad1819..805b1f35e1ca 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -1019,7 +1019,6 @@ struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size, if (!req->rl_recvbuf) goto out4; - req->rl_buffer = buffer; INIT_LIST_HEAD(&req->rl_registered); spin_lock(&buffer->rb_lock); list_add(&req->rl_all, &buffer->rb_allreqs); @@ -1299,13 +1298,12 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers) /** * rpcrdma_buffer_put - Put request/reply buffers back into pool + * @buffers: buffer pool * @req: object to return * */ -void -rpcrdma_buffer_put(struct rpcrdma_req *req) +void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req) { - struct rpcrdma_buffer *buffers = req->rl_buffer; struct rpcrdma_rep *rep = req->rl_reply; req->rl_reply = NULL; diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 5475f0dff22a..117e32816e4f 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -320,7 +320,6 @@ struct rpcrdma_buffer; struct rpcrdma_req { struct list_head rl_list; struct rpc_rqst rl_slot; - struct rpcrdma_buffer *rl_buffer; struct rpcrdma_rep *rl_reply; struct xdr_stream rl_stream; struct xdr_buf rl_hdrbuf; @@ -499,7 +498,8 @@ rpcrdma_mr_recycle(struct rpcrdma_mr *mr) } struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *); -void rpcrdma_buffer_put(struct rpcrdma_req *); +void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, + struct rpcrdma_req *req); void rpcrdma_recv_buffer_put(struct rpcrdma_rep *); bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size, -- cgit v1.2.3 From 675dd90ad0932f2c03912a5252458d792bd7033a Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Wed, 19 Jun 2019 10:33:42 -0400 Subject: xprtrdma: Modernize ops->connect Adapt and apply changes that were made to the TCP socket connect code. See the following commits for details on the purpose of these changes: Commit 7196dbb02ea0 ("SUNRPC: Allow changing of the TCP timeout parameters on the fly") Commit 3851f1cdb2b8 ("SUNRPC: Limit the reconnect backoff timer to the max RPC message timeout") Commit 02910177aede ("SUNRPC: Fix reconnection timeouts") Some common transport code is moved to xprt.c to satisfy the code duplication police. Signed-off-by: Chuck Lever Signed-off-by: Anna Schumaker --- include/linux/sunrpc/xprt.h | 3 ++ include/trace/events/rpcrdma.h | 31 +++++++++++++++++++ net/sunrpc/sched.c | 1 + net/sunrpc/xprt.c | 32 ++++++++++++++++++++ net/sunrpc/xprtrdma/transport.c | 66 +++++++++++++++++++++++++++++++---------- net/sunrpc/xprtrdma/xprt_rdma.h | 1 + net/sunrpc/xprtsock.c | 23 ++------------ 7 files changed, 121 insertions(+), 36 deletions(-) (limited to 'net') diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h index a6d9fce7f20e..cc78fd38ea7d 100644 --- a/include/linux/sunrpc/xprt.h +++ b/include/linux/sunrpc/xprt.h @@ -334,6 +334,9 @@ struct xprt_class { */ struct rpc_xprt *xprt_create_transport(struct xprt_create *args); void xprt_connect(struct rpc_task *task); +unsigned long xprt_reconnect_delay(const struct rpc_xprt *xprt); +void xprt_reconnect_backoff(struct rpc_xprt *xprt, + unsigned long init_to); void xprt_reserve(struct rpc_task *task); void xprt_retry_reserve(struct rpc_task *task); int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task); diff --git a/include/trace/events/rpcrdma.h b/include/trace/events/rpcrdma.h index 98023d91a72d..f6a4eaa85a3e 100644 --- a/include/trace/events/rpcrdma.h +++ b/include/trace/events/rpcrdma.h @@ -375,6 +375,37 @@ DEFINE_RXPRT_EVENT(xprtrdma_op_inject_dsc); DEFINE_RXPRT_EVENT(xprtrdma_op_close); DEFINE_RXPRT_EVENT(xprtrdma_op_connect); +TRACE_EVENT(xprtrdma_op_set_cto, + TP_PROTO( + const struct rpcrdma_xprt *r_xprt, + unsigned long connect, + unsigned long reconnect + ), + + TP_ARGS(r_xprt, connect, reconnect), + + TP_STRUCT__entry( + __field(const void *, r_xprt) + __field(unsigned long, connect) + __field(unsigned long, reconnect) + __string(addr, rpcrdma_addrstr(r_xprt)) + __string(port, rpcrdma_portstr(r_xprt)) + ), + + TP_fast_assign( + __entry->r_xprt = r_xprt; + __entry->connect = connect; + __entry->reconnect = reconnect; + __assign_str(addr, rpcrdma_addrstr(r_xprt)); + __assign_str(port, rpcrdma_portstr(r_xprt)); + ), + + TP_printk("peer=[%s]:%s r_xprt=%p: connect=%lu reconnect=%lu", + __get_str(addr), __get_str(port), __entry->r_xprt, + __entry->connect / HZ, __entry->reconnect / HZ + ) +); + TRACE_EVENT(xprtrdma_qp_event, TP_PROTO( const struct rpcrdma_xprt *r_xprt, diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index bb04ae52803a..5ad5dead7bfc 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -58,6 +58,7 @@ static struct rpc_wait_queue delay_queue; */ struct workqueue_struct *rpciod_workqueue __read_mostly; struct workqueue_struct *xprtiod_workqueue __read_mostly; +EXPORT_SYMBOL_GPL(xprtiod_workqueue); unsigned long rpc_task_timeout(const struct rpc_task *task) diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index ad21880d5601..b1f54b7ccc0c 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -850,6 +850,38 @@ void xprt_connect(struct rpc_task *task) xprt_release_write(xprt, task); } +/** + * xprt_reconnect_delay - compute the wait before scheduling a connect + * @xprt: transport instance + * + */ +unsigned long xprt_reconnect_delay(const struct rpc_xprt *xprt) +{ + unsigned long start, now = jiffies; + + start = xprt->stat.connect_start + xprt->reestablish_timeout; + if (time_after(start, now)) + return start - now; + return 0; +} +EXPORT_SYMBOL_GPL(xprt_reconnect_delay); + +/** + * xprt_reconnect_backoff - compute the new re-establish timeout + * @xprt: transport instance + * @init_to: initial reestablish timeout + * + */ +void xprt_reconnect_backoff(struct rpc_xprt *xprt, unsigned long init_to) +{ + xprt->reestablish_timeout <<= 1; + if (xprt->reestablish_timeout > xprt->max_reconnect_timeout) + xprt->reestablish_timeout = xprt->max_reconnect_timeout; + if (xprt->reestablish_timeout < init_to) + xprt->reestablish_timeout = init_to; +} +EXPORT_SYMBOL_GPL(xprt_reconnect_backoff); + enum xprt_xid_rb_cmp { XID_RB_EQUAL, XID_RB_LEFT, diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 3688e0782587..4993aa49ecbe 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -298,6 +298,7 @@ xprt_rdma_destroy(struct rpc_xprt *xprt) module_put(THIS_MODULE); } +/* 60 second timeout, no retries */ static const struct rpc_timeout xprt_rdma_default_timeout = { .to_initval = 60 * HZ, .to_maxval = 60 * HZ, @@ -323,8 +324,9 @@ xprt_setup_rdma(struct xprt_create *args) if (!xprt) return ERR_PTR(-ENOMEM); - /* 60 second timeout, no retries */ xprt->timeout = &xprt_rdma_default_timeout; + xprt->connect_timeout = xprt->timeout->to_initval; + xprt->max_reconnect_timeout = xprt->timeout->to_maxval; xprt->bind_timeout = RPCRDMA_BIND_TO; xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO; xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO; @@ -487,31 +489,64 @@ xprt_rdma_timer(struct rpc_xprt *xprt, struct rpc_task *task) } /** - * xprt_rdma_connect - try to establish a transport connection + * xprt_rdma_set_connect_timeout - set timeouts for establishing a connection + * @xprt: controlling transport instance + * @connect_timeout: reconnect timeout after client disconnects + * @reconnect_timeout: reconnect timeout after server disconnects + * + */ +static void xprt_rdma_tcp_set_connect_timeout(struct rpc_xprt *xprt, + unsigned long connect_timeout, + unsigned long reconnect_timeout) +{ + struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); + + trace_xprtrdma_op_set_cto(r_xprt, connect_timeout, reconnect_timeout); + + spin_lock(&xprt->transport_lock); + + if (connect_timeout < xprt->connect_timeout) { + struct rpc_timeout to; + unsigned long initval; + + to = *xprt->timeout; + initval = connect_timeout; + if (initval < RPCRDMA_INIT_REEST_TO << 1) + initval = RPCRDMA_INIT_REEST_TO << 1; + to.to_initval = initval; + to.to_maxval = initval; + r_xprt->rx_timeout = to; + xprt->timeout = &r_xprt->rx_timeout; + xprt->connect_timeout = connect_timeout; + } + + if (reconnect_timeout < xprt->max_reconnect_timeout) + xprt->max_reconnect_timeout = reconnect_timeout; + + spin_unlock(&xprt->transport_lock); +} + +/** + * xprt_rdma_connect - schedule an attempt to reconnect * @xprt: transport state - * @task: RPC scheduler context + * @task: RPC scheduler context (unused) * */ static void xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task) { struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); + unsigned long delay; trace_xprtrdma_op_connect(r_xprt); + + delay = 0; if (r_xprt->rx_ep.rep_connected != 0) { - /* Reconnect */ - schedule_delayed_work(&r_xprt->rx_connect_worker, - xprt->reestablish_timeout); - xprt->reestablish_timeout <<= 1; - if (xprt->reestablish_timeout > RPCRDMA_MAX_REEST_TO) - xprt->reestablish_timeout = RPCRDMA_MAX_REEST_TO; - else if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO) - xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO; - } else { - schedule_delayed_work(&r_xprt->rx_connect_worker, 0); - if (!RPC_IS_ASYNC(task)) - flush_delayed_work(&r_xprt->rx_connect_worker); + delay = xprt_reconnect_delay(xprt); + xprt_reconnect_backoff(xprt, RPCRDMA_INIT_REEST_TO); } + queue_delayed_work(xprtiod_workqueue, &r_xprt->rx_connect_worker, + delay); } /** @@ -769,6 +804,7 @@ static const struct rpc_xprt_ops xprt_rdma_procs = { .send_request = xprt_rdma_send_request, .close = xprt_rdma_close, .destroy = xprt_rdma_destroy, + .set_connect_timeout = xprt_rdma_tcp_set_connect_timeout, .print_stats = xprt_rdma_print_stats, .enable_swap = xprt_rdma_enable_swap, .disable_swap = xprt_rdma_disable_swap, diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 117e32816e4f..8378f45d2da7 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -432,6 +432,7 @@ struct rpcrdma_xprt { struct rpcrdma_ep rx_ep; struct rpcrdma_buffer rx_buf; struct delayed_work rx_connect_worker; + struct rpc_timeout rx_timeout; struct rpcrdma_stats rx_stats; }; diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index c69951ed2ebc..b154600085d6 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -2402,25 +2402,6 @@ out: xprt_wake_pending_tasks(xprt, status); } -static unsigned long xs_reconnect_delay(const struct rpc_xprt *xprt) -{ - unsigned long start, now = jiffies; - - start = xprt->stat.connect_start + xprt->reestablish_timeout; - if (time_after(start, now)) - return start - now; - return 0; -} - -static void xs_reconnect_backoff(struct rpc_xprt *xprt) -{ - xprt->reestablish_timeout <<= 1; - if (xprt->reestablish_timeout > xprt->max_reconnect_timeout) - xprt->reestablish_timeout = xprt->max_reconnect_timeout; - if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO) - xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO; -} - /** * xs_connect - connect a socket to a remote endpoint * @xprt: pointer to transport structure @@ -2450,8 +2431,8 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task) /* Start by resetting any existing state */ xs_reset_transport(transport); - delay = xs_reconnect_delay(xprt); - xs_reconnect_backoff(xprt); + delay = xprt_reconnect_delay(xprt); + xprt_reconnect_backoff(xprt, XS_TCP_INIT_REEST_TO); } else dprintk("RPC: xs_connect scheduled xprt %p\n", xprt); -- cgit v1.2.3 From a101b043c44dfcb63bed7f29a675e9fa0259005e Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Thu, 11 Jul 2019 16:33:12 -0400 Subject: SUNRPC: Fix transport accounting when caller specifies an rpc_xprt Ensure that we do the required accounting for the round robin queue when the caller to rpc_init_task() has passed in a transport to be used. Reported-by: Olga Kornievskaia Reported-by: Neil Brown Signed-off-by: Trond Myklebust --- include/linux/sunrpc/clnt.h | 2 ++ net/sunrpc/clnt.c | 42 ++++++++++++++++++++---------------------- net/sunrpc/sched.c | 3 ++- 3 files changed, 24 insertions(+), 23 deletions(-) (limited to 'net') diff --git a/include/linux/sunrpc/clnt.h b/include/linux/sunrpc/clnt.h index 4619098affa3..4e070e00c143 100644 --- a/include/linux/sunrpc/clnt.h +++ b/include/linux/sunrpc/clnt.h @@ -164,6 +164,8 @@ void rpc_shutdown_client(struct rpc_clnt *); void rpc_release_client(struct rpc_clnt *); void rpc_task_release_transport(struct rpc_task *); void rpc_task_release_client(struct rpc_task *); +struct rpc_xprt *rpc_task_get_xprt(struct rpc_clnt *clnt, + struct rpc_xprt *xprt); int rpcb_create_local(struct net *); void rpcb_put_local(struct net *); diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index d599fab8adcb..383555d2b522 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -978,11 +978,10 @@ out: } EXPORT_SYMBOL_GPL(rpc_bind_new_program); -static struct rpc_xprt * -rpc_task_get_xprt(struct rpc_clnt *clnt) +struct rpc_xprt * +rpc_task_get_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt) { struct rpc_xprt_switch *xps; - struct rpc_xprt *xprt= xprt_iter_get_next(&clnt->cl_xpi); if (!xprt) return NULL; @@ -995,24 +994,6 @@ rpc_task_get_xprt(struct rpc_clnt *clnt) return xprt; } -static struct rpc_xprt * -rpc_task_get_first_xprt(struct rpc_clnt *clnt) -{ - struct rpc_xprt_switch *xps; - struct rpc_xprt *xprt; - - rcu_read_lock(); - xprt = xprt_get(rcu_dereference(clnt->cl_xprt)); - if (xprt) { - atomic_long_inc(&xprt->queuelen); - xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch); - atomic_long_inc(&xps->xps_queuelen); - } - rcu_read_unlock(); - - return xprt; -} - static void rpc_task_release_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt) { @@ -1057,6 +1038,23 @@ void rpc_task_release_client(struct rpc_task *task) } } +static struct rpc_xprt * +rpc_task_get_first_xprt(struct rpc_clnt *clnt) +{ + struct rpc_xprt *xprt; + + rcu_read_lock(); + xprt = xprt_get(rcu_dereference(clnt->cl_xprt)); + rcu_read_unlock(); + return rpc_task_get_xprt(clnt, xprt); +} + +static struct rpc_xprt * +rpc_task_get_next_xprt(struct rpc_clnt *clnt) +{ + return rpc_task_get_xprt(clnt, xprt_iter_get_next(&clnt->cl_xpi)); +} + static void rpc_task_set_transport(struct rpc_task *task, struct rpc_clnt *clnt) { @@ -1065,7 +1063,7 @@ void rpc_task_set_transport(struct rpc_task *task, struct rpc_clnt *clnt) if (task->tk_flags & RPC_TASK_NO_ROUND_ROBIN) task->tk_xprt = rpc_task_get_first_xprt(clnt); else - task->tk_xprt = rpc_task_get_xprt(clnt); + task->tk_xprt = rpc_task_get_next_xprt(clnt); } static diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index 8a0779e963f9..1f275aba786f 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -1092,7 +1092,8 @@ static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *ta /* Initialize workqueue for async tasks */ task->tk_workqueue = task_setup_data->workqueue; - task->tk_xprt = xprt_get(task_setup_data->rpc_xprt); + task->tk_xprt = rpc_task_get_xprt(task_setup_data->rpc_client, + xprt_get(task_setup_data->rpc_xprt)); task->tk_op_cred = get_rpccred(task_setup_data->rpc_op_cred); -- cgit v1.2.3 From 3cf7292280d5e484747a5f0ee4b62327b037344c Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Mon, 15 Jul 2019 15:12:08 -0400 Subject: SUNRPC: Replace division by multiplication in calculation of queue length When checking whether or not a particular xprt queue length is shorter than the average queue length for all xprts, prefer to use multiplication rather than division for performance reasons. Signed-off-by: Trond Myklebust --- net/sunrpc/xprtmultipath.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtmultipath.c b/net/sunrpc/xprtmultipath.c index 9d66ce53355d..5df4e7adedf0 100644 --- a/net/sunrpc/xprtmultipath.c +++ b/net/sunrpc/xprtmultipath.c @@ -322,7 +322,6 @@ struct rpc_xprt *xprt_iter_next_entry_roundrobin(struct rpc_xprt_iter *xpi) struct rpc_xprt *xprt; unsigned long xprt_queuelen; unsigned long xps_queuelen; - unsigned long xps_avglen; do { xprt = xprt_iter_next_entry_multiple(xpi, @@ -333,8 +332,8 @@ struct rpc_xprt *xprt_iter_next_entry_roundrobin(struct rpc_xprt_iter *xpi) if (xprt_queuelen <= 2) break; xps_queuelen = atomic_long_read(&xps->xps_queuelen); - xps_avglen = DIV_ROUND_UP(xps_queuelen, xps->xps_nactive); - } while (xprt_queuelen > xps_avglen); + /* Exit loop if xprt_queuelen <= average queue length */ + } while (xprt_queuelen * READ_ONCE(xps->xps_nactive) > xps_queuelen); return xprt; } -- cgit v1.2.3 From 163f88211c147d96b46e376337190a02203ace02 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Tue, 16 Jul 2019 07:07:28 -0400 Subject: SUNRPC: Skip zero-refcount transports When looking for the next transport to use for an RPC call, skip those that are in the process of being destroyed and that have a zero refcount. Signed-off-by: Trond Myklebust --- net/sunrpc/xprtmultipath.c | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) (limited to 'net') diff --git a/net/sunrpc/xprtmultipath.c b/net/sunrpc/xprtmultipath.c index 5df4e7adedf0..c12778e1235e 100644 --- a/net/sunrpc/xprtmultipath.c +++ b/net/sunrpc/xprtmultipath.c @@ -193,10 +193,22 @@ void xprt_iter_default_rewind(struct rpc_xprt_iter *xpi) WRITE_ONCE(xpi->xpi_cursor, NULL); } +static +bool xprt_is_active(const struct rpc_xprt *xprt) +{ + return kref_read(&xprt->kref) != 0; +} + static struct rpc_xprt *xprt_switch_find_first_entry(struct list_head *head) { - return list_first_or_null_rcu(head, struct rpc_xprt, xprt_switch); + struct rpc_xprt *pos; + + list_for_each_entry_rcu(pos, head, xprt_switch) { + if (xprt_is_active(pos)) + return pos; + } + return NULL; } static @@ -214,9 +226,12 @@ struct rpc_xprt *xprt_switch_find_current_entry(struct list_head *head, const struct rpc_xprt *cur) { struct rpc_xprt *pos; + bool found = false; list_for_each_entry_rcu(pos, head, xprt_switch) { if (cur == pos) + found = true; + if (found && xprt_is_active(pos)) return pos; } return NULL; @@ -261,9 +276,12 @@ struct rpc_xprt *xprt_switch_find_next_entry(struct list_head *head, const struct rpc_xprt *cur) { struct rpc_xprt *pos, *prev = NULL; + bool found = false; list_for_each_entry_rcu(pos, head, xprt_switch) { if (cur == prev) + found = true; + if (found && xprt_is_active(pos)) return pos; prev = pos; } -- cgit v1.2.3 From 9f98effc19bd490a375c4e764a56a2c015d27c3c Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Thu, 18 Jul 2019 01:10:51 -0400 Subject: SUNRPC: Fix initialisation of struct rpc_xprt_switch Ensure that we do initialise the fields xps_nactive, xps_queuelen and xps_net. Signed-off-by: Trond Myklebust --- net/sunrpc/xprtmultipath.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'net') diff --git a/net/sunrpc/xprtmultipath.c b/net/sunrpc/xprtmultipath.c index c12778e1235e..f3ecd0bee484 100644 --- a/net/sunrpc/xprtmultipath.c +++ b/net/sunrpc/xprtmultipath.c @@ -103,7 +103,9 @@ struct rpc_xprt_switch *xprt_switch_alloc(struct rpc_xprt *xprt, if (xps != NULL) { spin_lock_init(&xps->xps_lock); kref_init(&xps->xps_kref); - xps->xps_nxprts = 0; + xps->xps_nxprts = xps->xps_nactive = 0; + atomic_long_set(&xps->xps_queuelen, 0); + xps->xps_net = NULL; INIT_LIST_HEAD(&xps->xps_xprt_list); xps->xps_iter_ops = &rpc_xprt_iter_singular; xprt_switch_add_xprt_locked(xps, xprt); -- cgit v1.2.3 From 7402a4fedc2bc448100c2d086406c708451b16dc Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Tue, 16 Jul 2019 13:51:29 -0400 Subject: SUNRPC: Fix up backchannel slot table accounting Add a per-transport maximum limit in the socket case, and add helpers to allow the NFSv4 code to discover that limit. Signed-off-by: Trond Myklebust --- fs/nfs/nfs4proc.c | 3 +++ include/linux/sunrpc/bc_xprt.h | 1 + include/linux/sunrpc/clnt.h | 1 + include/linux/sunrpc/xprt.h | 6 ++++-- net/sunrpc/backchannel_rqst.c | 40 +++++++++++++++++++++------------------ net/sunrpc/clnt.c | 13 +++++++++++++ net/sunrpc/svc.c | 2 +- net/sunrpc/xprtrdma/backchannel.c | 7 +++++++ net/sunrpc/xprtrdma/transport.c | 1 + net/sunrpc/xprtrdma/xprt_rdma.h | 1 + net/sunrpc/xprtsock.c | 1 + 11 files changed, 55 insertions(+), 21 deletions(-) (limited to 'net') diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c index 52de7245a2ee..39896afc6edf 100644 --- a/fs/nfs/nfs4proc.c +++ b/fs/nfs/nfs4proc.c @@ -8380,6 +8380,7 @@ static void nfs4_init_channel_attrs(struct nfs41_create_session_args *args, { unsigned int max_rqst_sz, max_resp_sz; unsigned int max_bc_payload = rpc_max_bc_payload(clnt); + unsigned int max_bc_slots = rpc_num_bc_slots(clnt); max_rqst_sz = NFS_MAX_FILE_IO_SIZE + nfs41_maxwrite_overhead; max_resp_sz = NFS_MAX_FILE_IO_SIZE + nfs41_maxread_overhead; @@ -8402,6 +8403,8 @@ static void nfs4_init_channel_attrs(struct nfs41_create_session_args *args, args->bc_attrs.max_resp_sz_cached = 0; args->bc_attrs.max_ops = NFS4_MAX_BACK_CHANNEL_OPS; args->bc_attrs.max_reqs = max_t(unsigned short, max_session_cb_slots, 1); + if (args->bc_attrs.max_reqs > max_bc_slots) + args->bc_attrs.max_reqs = max_bc_slots; dprintk("%s: Back Channel : max_rqst_sz=%u max_resp_sz=%u " "max_resp_sz_cached=%u max_ops=%u max_reqs=%u\n", diff --git a/include/linux/sunrpc/bc_xprt.h b/include/linux/sunrpc/bc_xprt.h index d4229a78524a..87d27e13d885 100644 --- a/include/linux/sunrpc/bc_xprt.h +++ b/include/linux/sunrpc/bc_xprt.h @@ -43,6 +43,7 @@ void xprt_destroy_backchannel(struct rpc_xprt *, unsigned int max_reqs); int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs); void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs); void xprt_free_bc_rqst(struct rpc_rqst *req); +unsigned int xprt_bc_max_slots(struct rpc_xprt *xprt); /* * Determine if a shared backchannel is in use diff --git a/include/linux/sunrpc/clnt.h b/include/linux/sunrpc/clnt.h index 4e070e00c143..abc63bd1be2b 100644 --- a/include/linux/sunrpc/clnt.h +++ b/include/linux/sunrpc/clnt.h @@ -194,6 +194,7 @@ void rpc_setbufsize(struct rpc_clnt *, unsigned int, unsigned int); struct net * rpc_net_ns(struct rpc_clnt *); size_t rpc_max_payload(struct rpc_clnt *); size_t rpc_max_bc_payload(struct rpc_clnt *); +unsigned int rpc_num_bc_slots(struct rpc_clnt *); void rpc_force_rebind(struct rpc_clnt *); size_t rpc_peeraddr(struct rpc_clnt *, struct sockaddr *, size_t); const char *rpc_peeraddr2str(struct rpc_clnt *, enum rpc_display_format_t); diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h index ed76e5fb36c1..13e108bcc9eb 100644 --- a/include/linux/sunrpc/xprt.h +++ b/include/linux/sunrpc/xprt.h @@ -158,6 +158,7 @@ struct rpc_xprt_ops { int (*bc_setup)(struct rpc_xprt *xprt, unsigned int min_reqs); size_t (*bc_maxpayload)(struct rpc_xprt *xprt); + unsigned int (*bc_num_slots)(struct rpc_xprt *xprt); void (*bc_free_rqst)(struct rpc_rqst *rqst); void (*bc_destroy)(struct rpc_xprt *xprt, unsigned int max_reqs); @@ -251,8 +252,9 @@ struct rpc_xprt { #if defined(CONFIG_SUNRPC_BACKCHANNEL) struct svc_serv *bc_serv; /* The RPC service which will */ /* process the callback */ - int bc_alloc_count; /* Total number of preallocs */ - atomic_t bc_free_slots; + unsigned int bc_alloc_max; + unsigned int bc_alloc_count; /* Total number of preallocs */ + atomic_t bc_slot_count; /* Number of allocated slots */ spinlock_t bc_pa_lock; /* Protects the preallocated * items */ struct list_head bc_pa_list; /* List of preallocated diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c index c47d82622fd1..339e8c077c2d 100644 --- a/net/sunrpc/backchannel_rqst.c +++ b/net/sunrpc/backchannel_rqst.c @@ -31,25 +31,20 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #define RPCDBG_FACILITY RPCDBG_TRANS #endif +#define BC_MAX_SLOTS 64U + +unsigned int xprt_bc_max_slots(struct rpc_xprt *xprt) +{ + return BC_MAX_SLOTS; +} + /* * Helper routines that track the number of preallocation elements * on the transport. */ static inline int xprt_need_to_requeue(struct rpc_xprt *xprt) { - return xprt->bc_alloc_count < atomic_read(&xprt->bc_free_slots); -} - -static inline void xprt_inc_alloc_count(struct rpc_xprt *xprt, unsigned int n) -{ - atomic_add(n, &xprt->bc_free_slots); - xprt->bc_alloc_count += n; -} - -static inline int xprt_dec_alloc_count(struct rpc_xprt *xprt, unsigned int n) -{ - atomic_sub(n, &xprt->bc_free_slots); - return xprt->bc_alloc_count -= n; + return xprt->bc_alloc_count < xprt->bc_alloc_max; } /* @@ -145,6 +140,9 @@ int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs) dprintk("RPC: setup backchannel transport\n"); + if (min_reqs > BC_MAX_SLOTS) + min_reqs = BC_MAX_SLOTS; + /* * We use a temporary list to keep track of the preallocated * buffers. Once we're done building the list we splice it @@ -172,7 +170,9 @@ int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs) */ spin_lock(&xprt->bc_pa_lock); list_splice(&tmp_list, &xprt->bc_pa_list); - xprt_inc_alloc_count(xprt, min_reqs); + xprt->bc_alloc_count += min_reqs; + xprt->bc_alloc_max += min_reqs; + atomic_add(min_reqs, &xprt->bc_slot_count); spin_unlock(&xprt->bc_pa_lock); dprintk("RPC: setup backchannel transport done\n"); @@ -220,11 +220,13 @@ void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs) goto out; spin_lock_bh(&xprt->bc_pa_lock); - xprt_dec_alloc_count(xprt, max_reqs); + xprt->bc_alloc_max -= max_reqs; list_for_each_entry_safe(req, tmp, &xprt->bc_pa_list, rq_bc_pa_list) { dprintk("RPC: req=%p\n", req); list_del(&req->rq_bc_pa_list); xprt_free_allocation(req); + xprt->bc_alloc_count--; + atomic_dec(&xprt->bc_slot_count); if (--max_reqs == 0) break; } @@ -241,13 +243,14 @@ static struct rpc_rqst *xprt_get_bc_request(struct rpc_xprt *xprt, __be32 xid, struct rpc_rqst *req = NULL; dprintk("RPC: allocate a backchannel request\n"); - if (atomic_read(&xprt->bc_free_slots) <= 0) - goto not_found; if (list_empty(&xprt->bc_pa_list)) { if (!new) goto not_found; + if (atomic_read(&xprt->bc_slot_count) >= BC_MAX_SLOTS) + goto not_found; list_add_tail(&new->rq_bc_pa_list, &xprt->bc_pa_list); xprt->bc_alloc_count++; + atomic_inc(&xprt->bc_slot_count); } req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst, rq_bc_pa_list); @@ -291,6 +294,7 @@ void xprt_free_bc_rqst(struct rpc_rqst *req) if (xprt_need_to_requeue(xprt)) { list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list); xprt->bc_alloc_count++; + atomic_inc(&xprt->bc_slot_count); req = NULL; } spin_unlock_bh(&xprt->bc_pa_lock); @@ -357,7 +361,7 @@ void xprt_complete_bc_request(struct rpc_rqst *req, uint32_t copied) spin_lock(&xprt->bc_pa_lock); list_del(&req->rq_bc_pa_list); - xprt_dec_alloc_count(xprt, 1); + xprt->bc_alloc_count--; spin_unlock(&xprt->bc_pa_lock); req->rq_private_buf.len = copied; diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index 383555d2b522..79c849391cb9 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -1526,6 +1526,19 @@ size_t rpc_max_bc_payload(struct rpc_clnt *clnt) } EXPORT_SYMBOL_GPL(rpc_max_bc_payload); +unsigned int rpc_num_bc_slots(struct rpc_clnt *clnt) +{ + struct rpc_xprt *xprt; + unsigned int ret; + + rcu_read_lock(); + xprt = rcu_dereference(clnt->cl_xprt); + ret = xprt->ops->bc_num_slots(xprt); + rcu_read_unlock(); + return ret; +} +EXPORT_SYMBOL_GPL(rpc_num_bc_slots); + /** * rpc_force_rebind - force transport to check that remote port is unchanged * @clnt: client to rebind diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c index e15cb704453e..220b79988000 100644 --- a/net/sunrpc/svc.c +++ b/net/sunrpc/svc.c @@ -1595,7 +1595,7 @@ bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req, /* Parse and execute the bc call */ proc_error = svc_process_common(rqstp, argv, resv); - atomic_inc(&req->rq_xprt->bc_free_slots); + atomic_dec(&req->rq_xprt->bc_slot_count); if (!proc_error) { /* Processing error: drop the request */ xprt_free_bc_request(req); diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c index ce986591f213..59e624b1d7a0 100644 --- a/net/sunrpc/xprtrdma/backchannel.c +++ b/net/sunrpc/xprtrdma/backchannel.c @@ -52,6 +52,13 @@ size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt) return maxmsg - RPCRDMA_HDRLEN_MIN; } +unsigned int xprt_rdma_bc_max_slots(struct rpc_xprt *xprt) +{ + struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); + + return r_xprt->rx_buf.rb_bc_srv_max_requests; +} + static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst) { struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 4993aa49ecbe..52abddac19e5 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -812,6 +812,7 @@ static const struct rpc_xprt_ops xprt_rdma_procs = { #if defined(CONFIG_SUNRPC_BACKCHANNEL) .bc_setup = xprt_rdma_bc_setup, .bc_maxpayload = xprt_rdma_bc_maxpayload, + .bc_num_slots = xprt_rdma_bc_max_slots, .bc_free_rqst = xprt_rdma_bc_free_rqst, .bc_destroy = xprt_rdma_bc_destroy, #endif diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 8378f45d2da7..92ce09fcea74 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -605,6 +605,7 @@ void xprt_rdma_cleanup(void); #if defined(CONFIG_SUNRPC_BACKCHANNEL) int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int); size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *); +unsigned int xprt_rdma_bc_max_slots(struct rpc_xprt *); int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int); void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *); int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst); diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 3c2cc96afcaa..6b1fca51028a 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -2788,6 +2788,7 @@ static const struct rpc_xprt_ops xs_tcp_ops = { #ifdef CONFIG_SUNRPC_BACKCHANNEL .bc_setup = xprt_setup_bc, .bc_maxpayload = xs_tcp_bc_maxpayload, + .bc_num_slots = xprt_bc_max_slots, .bc_free_rqst = xprt_free_bc_rqst, .bc_destroy = xprt_destroy_bc, #endif -- cgit v1.2.3 From 75369089820473eac45e9ddd970081901a373c08 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Wed, 17 Jul 2019 21:22:38 -0400 Subject: SUNRPC: Ensure the bvecs are reset when we re-encode the RPC request The bvec tracks the list of pages, so if the number of pages changes due to a re-encode, we need to reset the bvec as well. Fixes: 277e4ab7d530 ("SUNRPC: Simplify TCP receive code by switching...") Signed-off-by: Trond Myklebust Cc: stable@vger.kernel.org # v4.20+ --- net/sunrpc/clnt.c | 3 +-- net/sunrpc/xprt.c | 2 ++ net/sunrpc/xprtsock.c | 1 + 3 files changed, 4 insertions(+), 2 deletions(-) (limited to 'net') diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index 79c849391cb9..d8679b6027e9 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -1865,6 +1865,7 @@ rpc_xdr_encode(struct rpc_task *task) req->rq_snd_buf.head[0].iov_len = 0; xdr_init_encode(&xdr, &req->rq_snd_buf, req->rq_snd_buf.head[0].iov_base, req); + xdr_free_bvec(&req->rq_snd_buf); if (rpc_encode_header(task, &xdr)) return; @@ -1904,8 +1905,6 @@ call_encode(struct rpc_task *task) rpc_call_rpcerror(task, task->tk_status); } return; - } else { - xprt_request_prepare(task->tk_rqstp); } /* Add task to reply queue before transmission to avoid races */ diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 70a704c44c6d..783748dc5e6f 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -1041,6 +1041,8 @@ xprt_request_enqueue_receive(struct rpc_task *task) if (!xprt_request_need_enqueue_receive(task, req)) return; + + xprt_request_prepare(task->tk_rqstp); spin_lock(&xprt->queue_lock); /* Update the softirq receive buffer */ diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 6b1fca51028a..e2176c167a57 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -909,6 +909,7 @@ static int xs_nospace(struct rpc_rqst *req) static void xs_stream_prepare_request(struct rpc_rqst *req) { + xdr_free_bvec(&req->rq_rcv_buf); req->rq_task->tk_status = xdr_alloc_bvec(&req->rq_rcv_buf, GFP_KERNEL); } -- cgit v1.2.3 From f554af280a80a49412acdd26a6371963f4741e70 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Tue, 16 Jul 2019 13:27:23 -0400 Subject: SUNRPC: Optimise transport balancing code Moves the balancing code to avoid doing cursor changes on every search iteration. Signed-off-by: Trond Myklebust --- net/sunrpc/xprtmultipath.c | 67 ++++++++++++++++++++++++++-------------------- 1 file changed, 38 insertions(+), 29 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtmultipath.c b/net/sunrpc/xprtmultipath.c index f3ecd0bee484..78c075a68c04 100644 --- a/net/sunrpc/xprtmultipath.c +++ b/net/sunrpc/xprtmultipath.c @@ -19,7 +19,7 @@ #include #include -typedef struct rpc_xprt *(*xprt_switch_find_xprt_t)(struct list_head *head, +typedef struct rpc_xprt *(*xprt_switch_find_xprt_t)(struct rpc_xprt_switch *xps, const struct rpc_xprt *cur); static const struct rpc_xprt_iter_ops rpc_xprt_iter_singular; @@ -291,22 +291,15 @@ struct rpc_xprt *xprt_switch_find_next_entry(struct list_head *head, } static -struct rpc_xprt *xprt_switch_set_next_cursor(struct list_head *head, +struct rpc_xprt *xprt_switch_set_next_cursor(struct rpc_xprt_switch *xps, struct rpc_xprt **cursor, xprt_switch_find_xprt_t find_next) { - struct rpc_xprt *cur, *pos, *old; + struct rpc_xprt *pos, *old; - cur = READ_ONCE(*cursor); - for (;;) { - old = cur; - pos = find_next(head, old); - if (pos == NULL) - break; - cur = cmpxchg_relaxed(cursor, old, pos); - if (cur == old) - break; - } + old = smp_load_acquire(cursor); + pos = find_next(xps, old); + smp_store_release(cursor, pos); return pos; } @@ -318,13 +311,11 @@ struct rpc_xprt *xprt_iter_next_entry_multiple(struct rpc_xprt_iter *xpi, if (xps == NULL) return NULL; - return xprt_switch_set_next_cursor(&xps->xps_xprt_list, - &xpi->xpi_cursor, - find_next); + return xprt_switch_set_next_cursor(xps, &xpi->xpi_cursor, find_next); } static -struct rpc_xprt *xprt_switch_find_next_entry_roundrobin(struct list_head *head, +struct rpc_xprt *__xprt_switch_find_next_entry_roundrobin(struct list_head *head, const struct rpc_xprt *cur) { struct rpc_xprt *ret; @@ -336,31 +327,49 @@ struct rpc_xprt *xprt_switch_find_next_entry_roundrobin(struct list_head *head, } static -struct rpc_xprt *xprt_iter_next_entry_roundrobin(struct rpc_xprt_iter *xpi) +struct rpc_xprt *xprt_switch_find_next_entry_roundrobin(struct rpc_xprt_switch *xps, + const struct rpc_xprt *cur) { - struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); + struct list_head *head = &xps->xps_xprt_list; struct rpc_xprt *xprt; - unsigned long xprt_queuelen; - unsigned long xps_queuelen; + unsigned int nactive; - do { - xprt = xprt_iter_next_entry_multiple(xpi, - xprt_switch_find_next_entry_roundrobin); - if (xprt == NULL) + for (;;) { + unsigned long xprt_queuelen, xps_queuelen; + + xprt = __xprt_switch_find_next_entry_roundrobin(head, cur); + if (!xprt) break; xprt_queuelen = atomic_long_read(&xprt->queuelen); - if (xprt_queuelen <= 2) - break; xps_queuelen = atomic_long_read(&xps->xps_queuelen); + nactive = READ_ONCE(xps->xps_nactive); /* Exit loop if xprt_queuelen <= average queue length */ - } while (xprt_queuelen * READ_ONCE(xps->xps_nactive) > xps_queuelen); + if (xprt_queuelen * nactive <= xps_queuelen) + break; + cur = xprt; + } return xprt; } +static +struct rpc_xprt *xprt_iter_next_entry_roundrobin(struct rpc_xprt_iter *xpi) +{ + return xprt_iter_next_entry_multiple(xpi, + xprt_switch_find_next_entry_roundrobin); +} + +static +struct rpc_xprt *xprt_switch_find_next_entry_all(struct rpc_xprt_switch *xps, + const struct rpc_xprt *cur) +{ + return xprt_switch_find_next_entry(&xps->xps_xprt_list, cur); +} + static struct rpc_xprt *xprt_iter_next_entry_all(struct rpc_xprt_iter *xpi) { - return xprt_iter_next_entry_multiple(xpi, xprt_switch_find_next_entry); + return xprt_iter_next_entry_multiple(xpi, + xprt_switch_find_next_entry_all); } /* -- cgit v1.2.3