summaryrefslogtreecommitdiff
path: root/net/rxrpc/txbuf.c
diff options
context:
space:
mode:
authorDavid Howells <dhowells@redhat.com>2022-04-01 01:55:08 +0300
committerDavid Howells <dhowells@redhat.com>2022-12-01 16:36:41 +0300
commitcf37b5987508878647161ec3cdba0bb00a1b607a (patch)
treed39fbe2796d73724a18b9ad8173b6236cc3a9b8b /net/rxrpc/txbuf.c
parentf3441d4125fc98995858550a5521b8d7daf0504a (diff)
downloadlinux-cf37b5987508878647161ec3cdba0bb00a1b607a.tar.xz
rxrpc: Move DATA transmission into call processor work item
Move DATA transmission into the call processor work item. In a future patch, this will be called from the I/O thread rather than being itsown work item. This will allow DATA transmission to be driven directly by incoming ACKs, pokes and timers as those are processed. The Tx queue is also split: The queue of packets prepared by sendmsg is now places in call->tx_sendmsg and the packet dispatcher decants the packets into call->tx_buffer as space becomes available in the transmission window. This allows sendmsg to run ahead of the available space to try and prevent an underflow in transmission. Signed-off-by: David Howells <dhowells@redhat.com> cc: Marc Dionne <marc.dionne@auristor.com> cc: linux-afs@lists.infradead.org
Diffstat (limited to 'net/rxrpc/txbuf.c')
-rw-r--r--net/rxrpc/txbuf.c10
1 files changed, 8 insertions, 2 deletions
diff --git a/net/rxrpc/txbuf.c b/net/rxrpc/txbuf.c
index 90ff00c340cd..a5054389dfbb 100644
--- a/net/rxrpc/txbuf.c
+++ b/net/rxrpc/txbuf.c
@@ -34,7 +34,7 @@ struct rxrpc_txbuf *rxrpc_alloc_txbuf(struct rxrpc_call *call, u8 packet_type,
txb->offset = 0;
txb->flags = 0;
txb->ack_why = 0;
- txb->seq = call->tx_top + 1;
+ txb->seq = call->tx_prepared + 1;
txb->wire.epoch = htonl(call->conn->proto.epoch);
txb->wire.cid = htonl(call->cid);
txb->wire.callNumber = htonl(call->call_id);
@@ -107,6 +107,7 @@ void rxrpc_shrink_call_tx_buffer(struct rxrpc_call *call)
{
struct rxrpc_txbuf *txb;
rxrpc_seq_t hard_ack = smp_load_acquire(&call->acks_hard_ack);
+ bool wake = false;
_enter("%x/%x/%x", call->tx_bottom, call->acks_hard_ack, call->tx_top);
@@ -123,7 +124,7 @@ void rxrpc_shrink_call_tx_buffer(struct rxrpc_call *call)
if (txb->seq != call->tx_bottom + 1)
rxrpc_see_txbuf(txb, rxrpc_txbuf_see_out_of_step);
ASSERTCMP(txb->seq, ==, call->tx_bottom + 1);
- call->tx_bottom++;
+ smp_store_release(&call->tx_bottom, call->tx_bottom + 1);
list_del_rcu(&txb->call_link);
trace_rxrpc_txqueue(call, rxrpc_txqueue_dequeue);
@@ -131,7 +132,12 @@ void rxrpc_shrink_call_tx_buffer(struct rxrpc_call *call)
spin_unlock(&call->tx_lock);
rxrpc_put_txbuf(txb, rxrpc_txbuf_put_rotated);
+ if (after(call->acks_hard_ack, call->tx_bottom + 128))
+ wake = true;
}
spin_unlock(&call->tx_lock);
+
+ if (wake)
+ wake_up(&call->waitq);
}