summaryrefslogtreecommitdiff
path: root/net
diff options
context:
space:
mode:
Diffstat (limited to 'net')
-rw-r--r--net/rxrpc/ar-internal.h29
-rw-r--r--net/rxrpc/call_object.c7
-rw-r--r--net/rxrpc/conn_object.c2
-rw-r--r--net/rxrpc/input.c213
-rw-r--r--net/rxrpc/misc.c5
-rw-r--r--net/rxrpc/output.c95
-rw-r--r--net/rxrpc/proc.c7
-rw-r--r--net/rxrpc/recvmsg.c117
-rw-r--r--net/rxrpc/rxkad.c1
-rw-r--r--net/rxrpc/sysctl.c2
10 files changed, 279 insertions, 199 deletions
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index e93ed18816b1..c6c5bb3d3688 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -198,7 +198,6 @@ struct rxrpc_skb_priv {
u16 remain;
u16 offset; /* Offset of data */
u16 len; /* Length of data */
- u8 rx_flags; /* Received packet flags */
u8 flags;
#define RXRPC_RX_VERIFIED 0x01
@@ -644,8 +643,20 @@ struct rxrpc_call {
rxrpc_seq_t tx_hard_ack; /* Dead slot in buffer; the first transmitted but
* not hard-ACK'd packet follows this.
*/
+
+ /* Transmitted data tracking. */
rxrpc_seq_t tx_top; /* Highest Tx slot allocated. */
u16 tx_backoff; /* Delay to insert due to Tx failure */
+ u8 tx_winsize; /* Maximum size of Tx window */
+
+ /* Received data tracking */
+ struct sk_buff_head recvmsg_queue; /* Queue of packets ready for recvmsg() */
+ struct sk_buff_head rx_oos_queue; /* Queue of out of sequence packets */
+
+ rxrpc_seq_t rx_highest_seq; /* Higest sequence number received */
+ rxrpc_seq_t rx_consumed; /* Highest packet consumed */
+ rxrpc_serial_t rx_serial; /* Highest serial received for this call */
+ u8 rx_winsize; /* Size of Rx window */
/* TCP-style slow-start congestion control [RFC5681]. Since the SMSS
* is fixed, we keep these numbers in terms of segments (ie. DATA
@@ -660,23 +671,19 @@ struct rxrpc_call {
u8 cong_cumul_acks; /* Cumulative ACK count */
ktime_t cong_tstamp; /* Last time cwnd was changed */
- rxrpc_seq_t rx_hard_ack; /* Dead slot in buffer; the first received but not
- * consumed packet follows this.
- */
- rxrpc_seq_t rx_top; /* Highest Rx slot allocated. */
- rxrpc_seq_t rx_expect_next; /* Expected next packet sequence number */
- rxrpc_serial_t rx_serial; /* Highest serial received for this call */
- u8 rx_winsize; /* Size of Rx window */
- u8 tx_winsize; /* Maximum size of Tx window */
-
spinlock_t input_lock; /* Lock for packet input to this call */
/* Receive-phase ACK management (ACKs we send). */
u8 ackr_reason; /* reason to ACK */
rxrpc_serial_t ackr_serial; /* serial of packet being ACK'd */
- rxrpc_seq_t ackr_highest_seq; /* Higest sequence number received */
+ atomic64_t ackr_window; /* Base (in LSW) and top (in MSW) of SACK window */
atomic_t ackr_nr_unacked; /* Number of unacked packets */
atomic_t ackr_nr_consumed; /* Number of packets needing hard ACK */
+ struct {
+#define RXRPC_SACK_SIZE 256
+ /* SACK table for soft-acked packets */
+ u8 ackr_sack_table[RXRPC_SACK_SIZE];
+ } __aligned(8);
/* RTT management */
rxrpc_serial_t rtt_serial[4]; /* Serial number of DATA or PING sent */
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index 8f9e88897197..4d8601c6a32d 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -155,6 +155,8 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
INIT_LIST_HEAD(&call->accept_link);
INIT_LIST_HEAD(&call->recvmsg_link);
INIT_LIST_HEAD(&call->sock_link);
+ skb_queue_head_init(&call->recvmsg_queue);
+ skb_queue_head_init(&call->rx_oos_queue);
init_waitqueue_head(&call->waitq);
spin_lock_init(&call->lock);
spin_lock_init(&call->notify_lock);
@@ -165,13 +167,12 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
call->tx_total_len = -1;
call->next_rx_timo = 20 * HZ;
call->next_req_timo = 1 * HZ;
+ atomic64_set(&call->ackr_window, 0x100000001ULL);
memset(&call->sock_node, 0xed, sizeof(call->sock_node));
- /* Leave space in the ring to handle a maxed-out jumbo packet */
call->rx_winsize = rxrpc_rx_window_size;
call->tx_winsize = 16;
- call->rx_expect_next = 1;
call->cong_cwnd = 2;
call->cong_ssthresh = RXRPC_RXTX_BUFF_SIZE - 1;
@@ -519,6 +520,8 @@ static void rxrpc_cleanup_ring(struct rxrpc_call *call)
rxrpc_free_skb(call->rxtx_buffer[i], rxrpc_skb_cleaned);
call->rxtx_buffer[i] = NULL;
}
+ skb_queue_purge(&call->recvmsg_queue);
+ skb_queue_purge(&call->rx_oos_queue);
}
/*
diff --git a/net/rxrpc/conn_object.c b/net/rxrpc/conn_object.c
index 22089e37e97f..f7ea71ae6159 100644
--- a/net/rxrpc/conn_object.c
+++ b/net/rxrpc/conn_object.c
@@ -175,7 +175,7 @@ void __rxrpc_disconnect_call(struct rxrpc_connection *conn,
trace_rxrpc_disconnect_call(call);
switch (call->completion) {
case RXRPC_CALL_SUCCEEDED:
- chan->last_seq = call->rx_hard_ack;
+ chan->last_seq = call->rx_highest_seq;
chan->last_type = RXRPC_PACKET_TYPE_ACK;
break;
case RXRPC_CALL_LOCALLY_ABORTED:
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index 0e7545ed0128..947e7196e2be 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -312,18 +312,43 @@ static bool rxrpc_receiving_reply(struct rxrpc_call *call)
return rxrpc_end_tx_phase(call, true, "ETD");
}
+static void rxrpc_input_update_ack_window(struct rxrpc_call *call,
+ rxrpc_seq_t window, rxrpc_seq_t wtop)
+{
+ atomic64_set_release(&call->ackr_window, ((u64)wtop) << 32 | window);
+}
+
/*
- * Process a DATA packet, adding the packet to the Rx ring. The caller's
- * packet ref must be passed on or discarded.
+ * Push a DATA packet onto the Rx queue.
+ */
+static void rxrpc_input_queue_data(struct rxrpc_call *call, struct sk_buff *skb,
+ rxrpc_seq_t window, rxrpc_seq_t wtop,
+ enum rxrpc_receive_trace why)
+{
+ struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+ bool last = sp->hdr.flags & RXRPC_LAST_PACKET;
+
+ __skb_queue_tail(&call->recvmsg_queue, skb);
+ rxrpc_input_update_ack_window(call, window, wtop);
+
+ trace_rxrpc_receive(call, last ? why + 1 : why, sp->hdr.serial, sp->hdr.seq);
+}
+
+/*
+ * Process a DATA packet.
*/
static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb)
{
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+ struct sk_buff *oos;
rxrpc_serial_t serial = sp->hdr.serial;
- rxrpc_seq_t seq = sp->hdr.seq, hard_ack;
- unsigned int ix = seq & RXRPC_RXTX_BUFF_MASK;
+ u64 win = atomic64_read(&call->ackr_window);
+ rxrpc_seq_t window = lower_32_bits(win);
+ rxrpc_seq_t wtop = upper_32_bits(win);
+ rxrpc_seq_t wlimit = window + call->rx_winsize - 1;
+ rxrpc_seq_t seq = sp->hdr.seq;
bool last = sp->hdr.flags & RXRPC_LAST_PACKET;
- bool acked = false;
+ int ack_reason = -1;
rxrpc_inc_stat(call->rxnet, stat_rx_data);
if (sp->hdr.flags & RXRPC_REQUEST_ACK)
@@ -331,112 +356,135 @@ static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb)
if (sp->hdr.flags & RXRPC_JUMBO_PACKET)
rxrpc_inc_stat(call->rxnet, stat_rx_data_jumbo);
- hard_ack = READ_ONCE(call->rx_hard_ack);
-
- _proto("Rx DATA %%%u { #%x l=%u }", serial, seq, last);
-
if (last) {
- if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
- seq != call->rx_top) {
+ if (test_and_set_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
+ seq + 1 != wtop) {
rxrpc_proto_abort("LSN", call, seq);
- goto out;
+ goto err_free;
}
} else {
if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
- after_eq(seq, call->rx_top)) {
+ after_eq(seq, wtop)) {
+ pr_warn("Packet beyond last: c=%x q=%x window=%x-%x wlimit=%x\n",
+ call->debug_id, seq, window, wtop, wlimit);
rxrpc_proto_abort("LSA", call, seq);
- goto out;
+ goto err_free;
}
}
+ if (after(seq, call->rx_highest_seq))
+ call->rx_highest_seq = seq;
+
trace_rxrpc_rx_data(call->debug_id, seq, serial, sp->hdr.flags);
- if (before_eq(seq, hard_ack)) {
- rxrpc_send_ACK(call, RXRPC_ACK_DUPLICATE, serial,
- rxrpc_propose_ack_input_data);
- goto out;
+ if (before(seq, window)) {
+ ack_reason = RXRPC_ACK_DUPLICATE;
+ goto send_ack;
}
-
- if (call->rxtx_buffer[ix]) {
- rxrpc_send_ACK(call, RXRPC_ACK_DUPLICATE, serial,
- rxrpc_propose_ack_input_data);
- goto out;
+ if (after(seq, wlimit)) {
+ ack_reason = RXRPC_ACK_EXCEEDS_WINDOW;
+ goto send_ack;
}
- if (after(seq, hard_ack + call->rx_winsize)) {
- rxrpc_send_ACK(call, RXRPC_ACK_EXCEEDS_WINDOW, serial,
- rxrpc_propose_ack_input_data);
- goto out;
- }
+ /* Queue the packet. */
+ if (seq == window) {
+ rxrpc_seq_t reset_from;
+ bool reset_sack = false;
- if (sp->hdr.flags & RXRPC_REQUEST_ACK) {
- rxrpc_send_ACK(call, RXRPC_ACK_REQUESTED, serial,
- rxrpc_propose_ack_input_data);
- acked = true;
- }
+ if (sp->hdr.flags & RXRPC_REQUEST_ACK)
+ ack_reason = RXRPC_ACK_REQUESTED;
+ /* Send an immediate ACK if we fill in a hole */
+ else if (!skb_queue_empty(&call->rx_oos_queue))
+ ack_reason = RXRPC_ACK_DELAY;
- if (after(seq, call->ackr_highest_seq))
- call->ackr_highest_seq = seq;
+ window++;
+ if (after(window, wtop))
+ wtop = window;
- /* Queue the packet. We use a couple of memory barriers here as need
- * to make sure that rx_top is perceived to be set after the buffer
- * pointer and that the buffer pointer is set after the annotation and
- * the skb data.
- *
- * Barriers against rxrpc_recvmsg_data() and rxrpc_rotate_rx_window()
- * and also rxrpc_fill_out_ack().
- */
- call->rxtx_annotations[ix] = 1;
- smp_wmb();
- call->rxtx_buffer[ix] = skb;
- if (after(seq, call->rx_top)) {
- smp_store_release(&call->rx_top, seq);
- } else if (before(seq, call->rx_top)) {
- /* Send an immediate ACK if we fill in a hole */
- if (!acked) {
- rxrpc_send_ACK(call, RXRPC_ACK_DELAY, serial,
- rxrpc_propose_ack_input_data_hole);
- acked = true;
+ spin_lock(&call->recvmsg_queue.lock);
+ rxrpc_input_queue_data(call, skb, window, wtop, rxrpc_receive_queue);
+ skb = NULL;
+
+ while ((oos = skb_peek(&call->rx_oos_queue))) {
+ struct rxrpc_skb_priv *osp = rxrpc_skb(oos);
+
+ if (after(osp->hdr.seq, window))
+ break;
+
+ __skb_unlink(oos, &call->rx_oos_queue);
+ last = osp->hdr.flags & RXRPC_LAST_PACKET;
+ seq = osp->hdr.seq;
+ if (!reset_sack) {
+ reset_from = seq;
+ reset_sack = true;
+ }
+
+ window++;
+ rxrpc_input_queue_data(call, oos, window, wtop,
+ rxrpc_receive_queue_oos);
}
- }
- /* From this point on, we're not allowed to touch the packet any longer
- * as its ref now belongs to the Rx ring.
- */
- skb = NULL;
- sp = NULL;
+ spin_unlock(&call->recvmsg_queue.lock);
- if (last) {
- set_bit(RXRPC_CALL_RX_LAST, &call->flags);
- trace_rxrpc_receive(call, rxrpc_receive_queue_last, serial, seq);
+ if (reset_sack) {
+ do {
+ call->ackr_sack_table[reset_from % RXRPC_SACK_SIZE] = 0;
+ } while (reset_from++, before(reset_from, window));
+ }
} else {
- trace_rxrpc_receive(call, rxrpc_receive_queue, serial, seq);
- }
+ bool keep = false;
+
+ ack_reason = RXRPC_ACK_OUT_OF_SEQUENCE;
+
+ if (!call->ackr_sack_table[seq % RXRPC_SACK_SIZE]) {
+ call->ackr_sack_table[seq % RXRPC_SACK_SIZE] = 1;
+ keep = 1;
+ }
+
+ if (after(seq + 1, wtop)) {
+ wtop = seq + 1;
+ rxrpc_input_update_ack_window(call, window, wtop);
+ }
+
+ if (!keep) {
+ ack_reason = RXRPC_ACK_DUPLICATE;
+ goto send_ack;
+ }
+
+ skb_queue_walk(&call->rx_oos_queue, oos) {
+ struct rxrpc_skb_priv *osp = rxrpc_skb(oos);
- if (after_eq(seq, call->rx_expect_next)) {
- if (after(seq, call->rx_expect_next)) {
- _net("OOS %u > %u", seq, call->rx_expect_next);
- rxrpc_send_ACK(call, RXRPC_ACK_OUT_OF_SEQUENCE, serial,
- rxrpc_propose_ack_input_data);
- acked = true;
+ if (after(osp->hdr.seq, seq)) {
+ __skb_queue_before(&call->rx_oos_queue, oos, skb);
+ goto oos_queued;
+ }
}
- call->rx_expect_next = seq + 1;
+
+ __skb_queue_tail(&call->rx_oos_queue, skb);
+ oos_queued:
+ trace_rxrpc_receive(call, last ? rxrpc_receive_oos_last : rxrpc_receive_oos,
+ sp->hdr.serial, sp->hdr.seq);
+ skb = NULL;
}
-out:
- if (!acked &&
- atomic_inc_return(&call->ackr_nr_unacked) > 2)
- rxrpc_send_ACK(call, RXRPC_ACK_IDLE, serial,
+send_ack:
+ if (ack_reason < 0 &&
+ atomic_inc_return(&call->ackr_nr_unacked) > 2 &&
+ test_and_set_bit(RXRPC_CALL_IDLE_ACK_PENDING, &call->flags)) {
+ ack_reason = RXRPC_ACK_IDLE;
+ } else if (ack_reason >= 0) {
+ set_bit(RXRPC_CALL_IDLE_ACK_PENDING, &call->flags);
+ }
+
+ if (ack_reason >= 0)
+ rxrpc_send_ACK(call, ack_reason, serial,
rxrpc_propose_ack_input_data);
else
rxrpc_propose_delay_ACK(call, serial,
rxrpc_propose_ack_input_data);
- trace_rxrpc_notify_socket(call->debug_id, serial);
- rxrpc_notify_socket(call);
-
+err_free:
rxrpc_free_skb(skb, rxrpc_skb_freed);
- _leave(" [queued]");
}
/*
@@ -498,8 +546,9 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
rxrpc_serial_t serial = sp->hdr.serial;
rxrpc_seq_t seq0 = sp->hdr.seq;
- _enter("{%u,%u},{%u,%u}",
- call->rx_hard_ack, call->rx_top, skb->len, seq0);
+ _enter("{%llx,%x},{%u,%x}",
+ atomic64_read(&call->ackr_window), call->rx_highest_seq,
+ skb->len, seq0);
_proto("Rx DATA %%%u { #%u f=%02x }",
sp->hdr.serial, seq0, sp->hdr.flags);
diff --git a/net/rxrpc/misc.c b/net/rxrpc/misc.c
index f5f03f27bd6c..056c428d8bf3 100644
--- a/net/rxrpc/misc.c
+++ b/net/rxrpc/misc.c
@@ -40,10 +40,7 @@ unsigned long rxrpc_idle_ack_delay = HZ / 2;
* limit is hit, we should generate an EXCEEDS_WINDOW ACK and discard further
* packets.
*/
-unsigned int rxrpc_rx_window_size = RXRPC_INIT_RX_WINDOW_SIZE;
-#if (RXRPC_RXTX_BUFF_SIZE - 1) < RXRPC_INIT_RX_WINDOW_SIZE
-#error Need to reduce RXRPC_INIT_RX_WINDOW_SIZE
-#endif
+unsigned int rxrpc_rx_window_size = 255;
/*
* Maximum Rx MTU size. This indicates to the sender the size of jumbo packet
diff --git a/net/rxrpc/output.c b/net/rxrpc/output.c
index f7bb792c8aa1..0a4f37d7b6b5 100644
--- a/net/rxrpc/output.c
+++ b/net/rxrpc/output.c
@@ -74,47 +74,64 @@ static void rxrpc_set_keepalive(struct rxrpc_call *call)
*/
static size_t rxrpc_fill_out_ack(struct rxrpc_connection *conn,
struct rxrpc_call *call,
- struct rxrpc_txbuf *txb,
- rxrpc_seq_t *_hard_ack,
- rxrpc_seq_t *_top)
+ struct rxrpc_txbuf *txb)
{
struct rxrpc_ackinfo ackinfo;
- unsigned int tmp;
- rxrpc_seq_t hard_ack, top, seq;
- int ix;
+ unsigned int qsize;
+ rxrpc_seq_t window, wtop, wrap_point, ix, first;
+ int rsize;
+ u64 wtmp;
u32 mtu, jmax;
u8 *ackp = txb->acks;
+ u8 sack_buffer[sizeof(call->ackr_sack_table)] __aligned(8);
- tmp = atomic_xchg(&call->ackr_nr_unacked, 0);
- tmp |= atomic_xchg(&call->ackr_nr_consumed, 0);
- if (!tmp && (txb->ack.reason == RXRPC_ACK_DELAY ||
- txb->ack.reason == RXRPC_ACK_IDLE)) {
- rxrpc_inc_stat(call->rxnet, stat_tx_ack_skip);
- return 0;
- }
-
+ atomic_set(&call->ackr_nr_unacked, 0);
+ atomic_set(&call->ackr_nr_consumed, 0);
rxrpc_inc_stat(call->rxnet, stat_tx_ack_fill);
/* Barrier against rxrpc_input_data(). */
- hard_ack = READ_ONCE(call->rx_hard_ack);
- top = smp_load_acquire(&call->rx_top);
- *_hard_ack = hard_ack;
- *_top = top;
-
- txb->ack.firstPacket = htonl(hard_ack + 1);
- txb->ack.previousPacket = htonl(call->ackr_highest_seq);
- txb->ack.nAcks = top - hard_ack;
-
- if (txb->ack.nAcks) {
- seq = hard_ack + 1;
- do {
- ix = seq & RXRPC_RXTX_BUFF_MASK;
- if (call->rxtx_buffer[ix])
- *ackp++ = RXRPC_ACK_TYPE_ACK;
- else
- *ackp++ = RXRPC_ACK_TYPE_NACK;
- seq++;
- } while (before_eq(seq, top));
+retry:
+ wtmp = atomic64_read_acquire(&call->ackr_window);
+ window = lower_32_bits(wtmp);
+ wtop = upper_32_bits(wtmp);
+ txb->ack.firstPacket = htonl(window);
+ txb->ack.nAcks = 0;
+
+ if (after(wtop, window)) {
+ /* Try to copy the SACK ring locklessly. We can use the copy,
+ * only if the now-current top of the window didn't go past the
+ * previously read base - otherwise we can't know whether we
+ * have old data or new data.
+ */
+ memcpy(sack_buffer, call->ackr_sack_table, sizeof(sack_buffer));
+ wrap_point = window + RXRPC_SACK_SIZE - 1;
+ wtmp = atomic64_read_acquire(&call->ackr_window);
+ window = lower_32_bits(wtmp);
+ wtop = upper_32_bits(wtmp);
+ if (after(wtop, wrap_point)) {
+ cond_resched();
+ goto retry;
+ }
+
+ /* The buffer is maintained as a ring with an invariant mapping
+ * between bit position and sequence number, so we'll probably
+ * need to rotate it.
+ */
+ txb->ack.nAcks = wtop - window;
+ ix = window % RXRPC_SACK_SIZE;
+ first = sizeof(sack_buffer) - ix;
+
+ if (ix + txb->ack.nAcks <= RXRPC_SACK_SIZE) {
+ memcpy(txb->acks, sack_buffer + ix, txb->ack.nAcks);
+ } else {
+ memcpy(txb->acks, sack_buffer + ix, first);
+ memcpy(txb->acks + first, sack_buffer,
+ txb->ack.nAcks - first);
+ }
+
+ ackp += txb->ack.nAcks;
+ } else if (before(wtop, window)) {
+ pr_warn("ack window backward %x %x", window, wtop);
} else if (txb->ack.reason == RXRPC_ACK_DELAY) {
txb->ack.reason = RXRPC_ACK_IDLE;
}
@@ -122,16 +139,18 @@ static size_t rxrpc_fill_out_ack(struct rxrpc_connection *conn,
mtu = conn->params.peer->if_mtu;
mtu -= conn->params.peer->hdrsize;
jmax = rxrpc_rx_jumbo_max;
+ qsize = (window - 1) - call->rx_consumed;
+ rsize = max_t(int, call->rx_winsize - qsize, 0);
ackinfo.rxMTU = htonl(rxrpc_rx_mtu);
ackinfo.maxMTU = htonl(mtu);
- ackinfo.rwind = htonl(call->rx_winsize);
+ ackinfo.rwind = htonl(rsize);
ackinfo.jumbo_max = htonl(jmax);
*ackp++ = 0;
*ackp++ = 0;
*ackp++ = 0;
memcpy(ackp, &ackinfo, sizeof(ackinfo));
- return top - hard_ack + 3 + sizeof(ackinfo);
+ return txb->ack.nAcks + 3 + sizeof(ackinfo);
}
/*
@@ -188,7 +207,6 @@ static int rxrpc_send_ack_packet(struct rxrpc_local *local, struct rxrpc_txbuf *
struct msghdr msg;
struct kvec iov[1];
rxrpc_serial_t serial;
- rxrpc_seq_t hard_ack, top;
size_t len, n;
int ret, rtt_slot = -1;
@@ -212,7 +230,7 @@ static int rxrpc_send_ack_packet(struct rxrpc_local *local, struct rxrpc_txbuf *
clear_bit(RXRPC_CALL_IDLE_ACK_PENDING, &call->flags);
spin_lock_bh(&call->lock);
- n = rxrpc_fill_out_ack(conn, call, txb, &hard_ack, &top);
+ n = rxrpc_fill_out_ack(conn, call, txb);
spin_unlock_bh(&call->lock);
if (n == 0) {
kfree(pkt);
@@ -236,6 +254,9 @@ static int rxrpc_send_ack_packet(struct rxrpc_local *local, struct rxrpc_txbuf *
rxrpc_inc_stat(call->rxnet, stat_tx_ack_send);
+ /* Grab the highest received seq as late as possible */
+ txb->ack.previousPacket = htonl(call->rx_highest_seq);
+
iov_iter_kvec(&msg.msg_iter, WRITE, iov, 1, len);
ret = do_udp_sendmsg(conn->params.local->socket, &msg, len);
call->peer->last_tx_at = ktime_get_seconds();
diff --git a/net/rxrpc/proc.c b/net/rxrpc/proc.c
index 3877afd23598..d48af0178866 100644
--- a/net/rxrpc/proc.c
+++ b/net/rxrpc/proc.c
@@ -54,8 +54,9 @@ static int rxrpc_call_seq_show(struct seq_file *seq, void *v)
struct rxrpc_call *call;
struct rxrpc_net *rxnet = rxrpc_net(seq_file_net(seq));
unsigned long timeout = 0;
- rxrpc_seq_t tx_hard_ack, rx_hard_ack;
+ rxrpc_seq_t tx_hard_ack;
char lbuff[50], rbuff[50];
+ u64 wtmp;
if (v == &rxnet->calls) {
seq_puts(seq,
@@ -91,7 +92,7 @@ static int rxrpc_call_seq_show(struct seq_file *seq, void *v)
}
tx_hard_ack = READ_ONCE(call->tx_hard_ack);
- rx_hard_ack = READ_ONCE(call->rx_hard_ack);
+ wtmp = atomic64_read_acquire(&call->ackr_window);
seq_printf(seq,
"UDP %-47.47s %-47.47s %4x %08x %08x %s %3u"
" %-8.8s %08x %08x %08x %02x %08x %02x %08x %06lx\n",
@@ -106,7 +107,7 @@ static int rxrpc_call_seq_show(struct seq_file *seq, void *v)
call->abort_code,
call->debug_id,
tx_hard_ack, READ_ONCE(call->tx_top) - tx_hard_ack,
- rx_hard_ack, READ_ONCE(call->rx_top) - rx_hard_ack,
+ lower_32_bits(wtmp), upper_32_bits(wtmp) - lower_32_bits(wtmp),
call->rx_serial,
timeout);
diff --git a/net/rxrpc/recvmsg.c b/net/rxrpc/recvmsg.c
index 401aae687830..efb85f983657 100644
--- a/net/rxrpc/recvmsg.c
+++ b/net/rxrpc/recvmsg.c
@@ -173,7 +173,8 @@ static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
break;
}
- trace_rxrpc_recvdata(call, rxrpc_recvmsg_terminal, call->rx_hard_ack,
+ trace_rxrpc_recvdata(call, rxrpc_recvmsg_terminal,
+ lower_32_bits(atomic64_read(&call->ackr_window)) - 1,
call->rx_pkt_offset, call->rx_pkt_len, ret);
return ret;
}
@@ -183,10 +184,11 @@ static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
*/
static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
{
+ rxrpc_seq_t whigh = READ_ONCE(call->rx_highest_seq);
+
_enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]);
- trace_rxrpc_receive(call, rxrpc_receive_end, 0, call->rx_top);
- ASSERTCMP(call->rx_hard_ack, ==, call->rx_top);
+ trace_rxrpc_receive(call, rxrpc_receive_end, 0, whigh);
if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY)
rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_terminal_ack);
@@ -220,45 +222,53 @@ static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
struct rxrpc_skb_priv *sp;
struct sk_buff *skb;
rxrpc_serial_t serial;
- rxrpc_seq_t hard_ack, top;
- bool last = false;
- int ix;
+ rxrpc_seq_t old_consumed = call->rx_consumed, tseq;
+ bool last;
+ int acked;
_enter("%d", call->debug_id);
- hard_ack = call->rx_hard_ack;
- top = smp_load_acquire(&call->rx_top);
- ASSERT(before(hard_ack, top));
-
- hard_ack++;
- ix = hard_ack & RXRPC_RXTX_BUFF_MASK;
- skb = call->rxtx_buffer[ix];
+further_rotation:
+ skb = skb_dequeue(&call->recvmsg_queue);
rxrpc_see_skb(skb, rxrpc_skb_rotated);
- sp = rxrpc_skb(skb);
+ sp = rxrpc_skb(skb);
+ tseq = sp->hdr.seq;
serial = sp->hdr.serial;
+ last = sp->hdr.flags & RXRPC_LAST_PACKET;
- if (sp->hdr.flags & RXRPC_LAST_PACKET)
- last = true;
-
- call->rxtx_buffer[ix] = NULL;
- call->rxtx_annotations[ix] = 0;
/* Barrier against rxrpc_input_data(). */
- smp_store_release(&call->rx_hard_ack, hard_ack);
+ if (after(tseq, call->rx_consumed))
+ smp_store_release(&call->rx_consumed, tseq);
rxrpc_free_skb(skb, rxrpc_skb_freed);
- trace_rxrpc_receive(call, rxrpc_receive_rotate, serial, hard_ack);
+ trace_rxrpc_receive(call, last ? rxrpc_receive_rotate_last : rxrpc_receive_rotate,
+ serial, call->rx_consumed);
if (last) {
rxrpc_end_rx_phase(call, serial);
- } else {
- /* Check to see if there's an ACK that needs sending. */
- if (atomic_inc_return(&call->ackr_nr_consumed) > 2 &&
- !test_and_set_bit(RXRPC_CALL_IDLE_ACK_PENDING, &call->flags)) {
- rxrpc_send_ACK(call, RXRPC_ACK_IDLE, serial,
- rxrpc_propose_ack_rotate_rx);
- rxrpc_transmit_ack_packets(call->peer->local);
- }
+ return;
+ }
+
+ /* The next packet on the queue might entirely overlap with the one we
+ * just consumed; if so, rotate that away also.
+ */
+ skb = skb_peek(&call->recvmsg_queue);
+ if (skb) {
+ sp = rxrpc_skb(skb);
+ if (sp->hdr.seq != call->rx_consumed &&
+ after_eq(call->rx_consumed, sp->hdr.seq))
+ goto further_rotation;
+ }
+
+ /* Check to see if there's an ACK that needs sending. */
+ acked = atomic_add_return(call->rx_consumed - old_consumed,
+ &call->ackr_nr_consumed);
+ if (acked > 2 &&
+ !test_and_set_bit(RXRPC_CALL_IDLE_ACK_PENDING, &call->flags)) {
+ rxrpc_send_ACK(call, RXRPC_ACK_IDLE, serial,
+ rxrpc_propose_ack_rotate_rx);
+ rxrpc_transmit_ack_packets(call->peer->local);
}
}
@@ -285,46 +295,38 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
{
struct rxrpc_skb_priv *sp;
struct sk_buff *skb;
- rxrpc_serial_t serial;
- rxrpc_seq_t hard_ack, top, seq;
+ rxrpc_seq_t seq = 0;
size_t remain;
unsigned int rx_pkt_offset, rx_pkt_len;
- int ix, copy, ret = -EAGAIN, ret2;
+ int copy, ret = -EAGAIN, ret2;
rx_pkt_offset = call->rx_pkt_offset;
rx_pkt_len = call->rx_pkt_len;
if (call->state >= RXRPC_CALL_SERVER_ACK_REQUEST) {
- seq = call->rx_hard_ack;
+ seq = lower_32_bits(atomic64_read(&call->ackr_window)) - 1;
ret = 1;
goto done;
}
- /* Barriers against rxrpc_input_data(). */
- hard_ack = call->rx_hard_ack;
- seq = hard_ack + 1;
-
- while (top = smp_load_acquire(&call->rx_top),
- before_eq(seq, top)
- ) {
- ix = seq & RXRPC_RXTX_BUFF_MASK;
- skb = call->rxtx_buffer[ix];
- if (!skb) {
- trace_rxrpc_recvdata(call, rxrpc_recvmsg_hole, seq,
- rx_pkt_offset, rx_pkt_len, 0);
- rxrpc_transmit_ack_packets(call->peer->local);
- break;
- }
- smp_rmb();
+ /* No one else can be removing stuff from the queue, so we shouldn't
+ * need the Rx lock to walk it.
+ */
+ skb = skb_peek(&call->recvmsg_queue);
+ while (skb) {
rxrpc_see_skb(skb, rxrpc_skb_seen);
sp = rxrpc_skb(skb);
+ seq = sp->hdr.seq;
- if (!(flags & MSG_PEEK)) {
- serial = sp->hdr.serial;
- trace_rxrpc_receive(call, rxrpc_receive_front,
- serial, seq);
+ if (after_eq(call->rx_consumed, seq)) {
+ kdebug("obsolete %x %x", call->rx_consumed, seq);
+ goto skip_obsolete;
}
+ if (!(flags & MSG_PEEK))
+ trace_rxrpc_receive(call, rxrpc_receive_front,
+ sp->hdr.serial, seq);
+
if (msg)
sock_recv_timestamp(msg, sock->sk, skb);
@@ -338,6 +340,7 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
ret = ret2;
goto out;
}
+ rxrpc_transmit_ack_packets(call->peer->local);
} else {
trace_rxrpc_recvdata(call, rxrpc_recvmsg_cont, seq,
rx_pkt_offset, rx_pkt_len, 0);
@@ -370,16 +373,17 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
break;
}
+ skip_obsolete:
/* The whole packet has been transferred. */
if (sp->hdr.flags & RXRPC_LAST_PACKET)
ret = 1;
rx_pkt_offset = 0;
rx_pkt_len = 0;
+ skb = skb_peek_next(skb, &call->recvmsg_queue);
+
if (!(flags & MSG_PEEK))
rxrpc_rotate_rx_window(call);
-
- seq++;
}
out:
@@ -522,8 +526,7 @@ try_again:
ret = 0;
rxrpc_transmit_ack_packets(call->peer->local);
- if (after(call->rx_top, call->rx_hard_ack) &&
- call->rxtx_buffer[(call->rx_hard_ack + 1) & RXRPC_RXTX_BUFF_MASK])
+ if (!skb_queue_empty(&call->recvmsg_queue))
rxrpc_notify_socket(call);
break;
default:
diff --git a/net/rxrpc/rxkad.c b/net/rxrpc/rxkad.c
index b19937776aa9..d87c99b36e01 100644
--- a/net/rxrpc/rxkad.c
+++ b/net/rxrpc/rxkad.c
@@ -1191,7 +1191,6 @@ static int rxkad_verify_response(struct rxrpc_connection *conn,
abort_code = RXKADPACKETSHORT;
if (skb_copy_bits(skb, sizeof(struct rxrpc_wire_header) + sizeof(*response),
ticket, ticket_len) < 0)
- goto protocol_error_free;
ret = rxkad_decrypt_ticket(conn, server_key, skb, ticket, ticket_len,
&session_key, &expiry, _abort_code);
diff --git a/net/rxrpc/sysctl.c b/net/rxrpc/sysctl.c
index 2bd987364e44..cde3224a5cd2 100644
--- a/net/rxrpc/sysctl.c
+++ b/net/rxrpc/sysctl.c
@@ -14,7 +14,7 @@ static struct ctl_table_header *rxrpc_sysctl_reg_table;
static const unsigned int four = 4;
static const unsigned int max_backlog = RXRPC_BACKLOG_MAX - 1;
static const unsigned int n_65535 = 65535;
-static const unsigned int n_max_acks = RXRPC_RXTX_BUFF_SIZE - 1;
+static const unsigned int n_max_acks = 255;
static const unsigned long one_jiffy = 1;
static const unsigned long max_jiffies = MAX_JIFFY_OFFSET;