diff options
Diffstat (limited to 'net/rxrpc/sendmsg.c')
-rw-r--r-- | net/rxrpc/sendmsg.c | 232 |
1 files changed, 159 insertions, 73 deletions
diff --git a/net/rxrpc/sendmsg.c b/net/rxrpc/sendmsg.c index 9ea6f972767e..09f2a3e05221 100644 --- a/net/rxrpc/sendmsg.c +++ b/net/rxrpc/sendmsg.c @@ -21,21 +21,78 @@ #include <net/af_rxrpc.h> #include "ar-internal.h" -enum rxrpc_command { - RXRPC_CMD_SEND_DATA, /* send data message */ - RXRPC_CMD_SEND_ABORT, /* request abort generation */ - RXRPC_CMD_ACCEPT, /* [server] accept incoming call */ - RXRPC_CMD_REJECT_BUSY, /* [server] reject a call as busy */ -}; - -struct rxrpc_send_params { - s64 tx_total_len; /* Total Tx data length (if send data) */ - unsigned long user_call_ID; /* User's call ID */ - u32 abort_code; /* Abort code to Tx (if abort) */ - enum rxrpc_command command : 8; /* The command to implement */ - bool exclusive; /* Shared or exclusive call */ - bool upgrade; /* If the connection is upgradeable */ -}; +/* + * Wait for space to appear in the Tx queue or a signal to occur. + */ +static int rxrpc_wait_for_tx_window_intr(struct rxrpc_sock *rx, + struct rxrpc_call *call, + long *timeo) +{ + for (;;) { + set_current_state(TASK_INTERRUPTIBLE); + if (call->tx_top - call->tx_hard_ack < + min_t(unsigned int, call->tx_winsize, + call->cong_cwnd + call->cong_extra)) + return 0; + + if (call->state >= RXRPC_CALL_COMPLETE) + return call->error; + + if (signal_pending(current)) + return sock_intr_errno(*timeo); + + trace_rxrpc_transmit(call, rxrpc_transmit_wait); + mutex_unlock(&call->user_mutex); + *timeo = schedule_timeout(*timeo); + if (mutex_lock_interruptible(&call->user_mutex) < 0) + return sock_intr_errno(*timeo); + } +} + +/* + * Wait for space to appear in the Tx queue uninterruptibly, but with + * a timeout of 2*RTT if no progress was made and a signal occurred. + */ +static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx, + struct rxrpc_call *call) +{ + rxrpc_seq_t tx_start, tx_win; + signed long rtt2, timeout; + u64 rtt; + + rtt = READ_ONCE(call->peer->rtt); + rtt2 = nsecs_to_jiffies64(rtt) * 2; + if (rtt2 < 1) + rtt2 = 1; + + timeout = rtt2; + tx_start = READ_ONCE(call->tx_hard_ack); + + for (;;) { + set_current_state(TASK_UNINTERRUPTIBLE); + + tx_win = READ_ONCE(call->tx_hard_ack); + if (call->tx_top - tx_win < + min_t(unsigned int, call->tx_winsize, + call->cong_cwnd + call->cong_extra)) + return 0; + + if (call->state >= RXRPC_CALL_COMPLETE) + return call->error; + + if (timeout == 0 && + tx_win == tx_start && signal_pending(current)) + return -EINTR; + + if (tx_win != tx_start) { + timeout = rtt2; + tx_start = tx_win; + } + + trace_rxrpc_transmit(call, rxrpc_transmit_wait); + timeout = schedule_timeout(timeout); + } +} /* * wait for space to appear in the transmit/ACK window @@ -43,7 +100,8 @@ struct rxrpc_send_params { */ static int rxrpc_wait_for_tx_window(struct rxrpc_sock *rx, struct rxrpc_call *call, - long *timeo) + long *timeo, + bool waitall) { DECLARE_WAITQUEUE(myself, current); int ret; @@ -53,30 +111,10 @@ static int rxrpc_wait_for_tx_window(struct rxrpc_sock *rx, add_wait_queue(&call->waitq, &myself); - for (;;) { - set_current_state(TASK_INTERRUPTIBLE); - ret = 0; - if (call->tx_top - call->tx_hard_ack < - min_t(unsigned int, call->tx_winsize, - call->cong_cwnd + call->cong_extra)) - break; - if (call->state >= RXRPC_CALL_COMPLETE) { - ret = call->error; - break; - } - if (signal_pending(current)) { - ret = sock_intr_errno(*timeo); - break; - } - - trace_rxrpc_transmit(call, rxrpc_transmit_wait); - mutex_unlock(&call->user_mutex); - *timeo = schedule_timeout(*timeo); - if (mutex_lock_interruptible(&call->user_mutex) < 0) { - ret = sock_intr_errno(*timeo); - break; - } - } + if (waitall) + ret = rxrpc_wait_for_tx_window_nonintr(rx, call); + else + ret = rxrpc_wait_for_tx_window_intr(rx, call, timeo); remove_wait_queue(&call->waitq, &myself); set_current_state(TASK_RUNNING); @@ -120,6 +158,7 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call, rxrpc_notify_end_tx_t notify_end_tx) { struct rxrpc_skb_priv *sp = rxrpc_skb(skb); + unsigned long now; rxrpc_seq_t seq = sp->hdr.seq; int ret, ix; u8 annotation = RXRPC_TX_ANNO_UNACK; @@ -159,13 +198,14 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call, break; case RXRPC_CALL_SERVER_ACK_REQUEST: call->state = RXRPC_CALL_SERVER_SEND_REPLY; - call->ack_at = call->expire_at; + now = jiffies; + WRITE_ONCE(call->ack_at, now + MAX_JIFFY_OFFSET); if (call->ackr_reason == RXRPC_ACK_DELAY) call->ackr_reason = 0; - __rxrpc_set_timer(call, rxrpc_timer_init_for_send_reply, - ktime_get_real()); + trace_rxrpc_timer(call, rxrpc_timer_init_for_send_reply, now); if (!last) break; + /* Fall through */ case RXRPC_CALL_SERVER_SEND_REPLY: call->state = RXRPC_CALL_SERVER_AWAIT_ACK; rxrpc_notify_end_tx(rx, call, notify_end_tx); @@ -184,14 +224,19 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call, _debug("need instant resend %d", ret); rxrpc_instant_resend(call, ix); } else { - ktime_t now = ktime_get_real(), resend_at; - - resend_at = ktime_add_ms(now, rxrpc_resend_timeout); - - if (ktime_before(resend_at, call->resend_at)) { - call->resend_at = resend_at; - rxrpc_set_timer(call, rxrpc_timer_set_for_send, now); - } + unsigned long now = jiffies, resend_at; + + if (call->peer->rtt_usage > 1) + resend_at = nsecs_to_jiffies(call->peer->rtt * 3 / 2); + else + resend_at = rxrpc_resend_timeout; + if (resend_at < 1) + resend_at = 1; + + resend_at += now; + WRITE_ONCE(call->resend_at, resend_at); + rxrpc_reduce_call_timer(call, resend_at, now, + rxrpc_timer_set_for_send); } rxrpc_free_skb(skb, rxrpc_skb_tx_freed); @@ -240,7 +285,7 @@ static int rxrpc_send_data(struct rxrpc_sock *rx, do { /* Check to see if there's a ping ACK to reply to. */ if (call->ackr_reason == RXRPC_ACK_PING_RESPONSE) - rxrpc_send_ack_packet(call, false); + rxrpc_send_ack_packet(call, false, NULL); if (!skb) { size_t size, chunk, max, space; @@ -254,7 +299,8 @@ static int rxrpc_send_data(struct rxrpc_sock *rx, if (msg->msg_flags & MSG_DONTWAIT) goto maybe_error; ret = rxrpc_wait_for_tx_window(rx, call, - &timeo); + &timeo, + msg->msg_flags & MSG_WAITALL); if (ret < 0) goto maybe_error; } @@ -424,11 +470,11 @@ static int rxrpc_sendmsg_cmsg(struct msghdr *msg, struct rxrpc_send_params *p) if (msg->msg_flags & MSG_CMSG_COMPAT) { if (len != sizeof(u32)) return -EINVAL; - p->user_call_ID = *(u32 *)CMSG_DATA(cmsg); + p->call.user_call_ID = *(u32 *)CMSG_DATA(cmsg); } else { if (len != sizeof(unsigned long)) return -EINVAL; - p->user_call_ID = *(unsigned long *) + p->call.user_call_ID = *(unsigned long *) CMSG_DATA(cmsg); } got_user_ID = true; @@ -466,13 +512,26 @@ static int rxrpc_sendmsg_cmsg(struct msghdr *msg, struct rxrpc_send_params *p) break; case RXRPC_TX_LENGTH: - if (p->tx_total_len != -1 || len != sizeof(__s64)) + if (p->call.tx_total_len != -1 || len != sizeof(__s64)) return -EINVAL; - p->tx_total_len = *(__s64 *)CMSG_DATA(cmsg); - if (p->tx_total_len < 0) + p->call.tx_total_len = *(__s64 *)CMSG_DATA(cmsg); + if (p->call.tx_total_len < 0) return -EINVAL; break; + case RXRPC_SET_CALL_TIMEOUT: + if (len & 3 || len < 4 || len > 12) + return -EINVAL; + memcpy(&p->call.timeouts, CMSG_DATA(cmsg), len); + p->call.nr_timeouts = len / 4; + if (p->call.timeouts.hard > INT_MAX / HZ) + return -ERANGE; + if (p->call.nr_timeouts >= 2 && p->call.timeouts.idle > 60 * 60 * 1000) + return -ERANGE; + if (p->call.nr_timeouts >= 3 && p->call.timeouts.normal > 60 * 60 * 1000) + return -ERANGE; + break; + default: return -EINVAL; } @@ -480,7 +539,7 @@ static int rxrpc_sendmsg_cmsg(struct msghdr *msg, struct rxrpc_send_params *p) if (!got_user_ID) return -EINVAL; - if (p->tx_total_len != -1 && p->command != RXRPC_CMD_SEND_DATA) + if (p->call.tx_total_len != -1 && p->command != RXRPC_CMD_SEND_DATA) return -EINVAL; _leave(" = 0"); return 0; @@ -520,8 +579,7 @@ rxrpc_new_client_call_for_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, cp.exclusive = rx->exclusive | p->exclusive; cp.upgrade = p->upgrade; cp.service_id = srx->srx_service; - call = rxrpc_new_client_call(rx, &cp, srx, p->user_call_ID, - p->tx_total_len, GFP_KERNEL); + call = rxrpc_new_client_call(rx, &cp, srx, &p->call, GFP_KERNEL); /* The socket is now unlocked */ _leave(" = %p\n", call); @@ -538,15 +596,17 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len) { enum rxrpc_call_state state; struct rxrpc_call *call; + unsigned long now, j; int ret; struct rxrpc_send_params p = { - .tx_total_len = -1, - .user_call_ID = 0, - .abort_code = 0, - .command = RXRPC_CMD_SEND_DATA, - .exclusive = false, - .upgrade = true, + .call.tx_total_len = -1, + .call.user_call_ID = 0, + .call.nr_timeouts = 0, + .abort_code = 0, + .command = RXRPC_CMD_SEND_DATA, + .exclusive = false, + .upgrade = false, }; _enter(""); @@ -559,15 +619,15 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len) ret = -EINVAL; if (rx->sk.sk_state != RXRPC_SERVER_LISTENING) goto error_release_sock; - call = rxrpc_accept_call(rx, p.user_call_ID, NULL); + call = rxrpc_accept_call(rx, p.call.user_call_ID, NULL); /* The socket is now unlocked. */ if (IS_ERR(call)) return PTR_ERR(call); - rxrpc_put_call(call, rxrpc_call_put); - return 0; + ret = 0; + goto out_put_unlock; } - call = rxrpc_find_call_by_user_ID(rx, p.user_call_ID); + call = rxrpc_find_call_by_user_ID(rx, p.call.user_call_ID); if (!call) { ret = -EBADSLT; if (p.command != RXRPC_CMD_SEND_DATA) @@ -597,16 +657,41 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len) goto error_put; } - if (p.tx_total_len != -1) { + if (p.call.tx_total_len != -1) { ret = -EINVAL; if (call->tx_total_len != -1 || call->tx_pending || call->tx_top != 0) goto error_put; - call->tx_total_len = p.tx_total_len; + call->tx_total_len = p.call.tx_total_len; } } + switch (p.call.nr_timeouts) { + case 3: + j = msecs_to_jiffies(p.call.timeouts.normal); + if (p.call.timeouts.normal > 0 && j == 0) + j = 1; + WRITE_ONCE(call->next_rx_timo, j); + /* Fall through */ + case 2: + j = msecs_to_jiffies(p.call.timeouts.idle); + if (p.call.timeouts.idle > 0 && j == 0) + j = 1; + WRITE_ONCE(call->next_req_timo, j); + /* Fall through */ + case 1: + if (p.call.timeouts.hard > 0) { + j = msecs_to_jiffies(p.call.timeouts.hard); + now = jiffies; + j += now; + WRITE_ONCE(call->expect_term_by, j); + rxrpc_reduce_call_timer(call, j, now, + rxrpc_timer_set_for_hard); + } + break; + } + state = READ_ONCE(call->state); _debug("CALL %d USR %lx ST %d on CONN %p", call->debug_id, call->user_call_ID, state, call->conn); @@ -633,6 +718,7 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len) ret = rxrpc_send_data(rx, call, msg, len, NULL); } +out_put_unlock: mutex_unlock(&call->user_mutex); error_put: rxrpc_put_call(call, rxrpc_call_put); |