summaryrefslogtreecommitdiff
path: root/net/rxrpc/call_event.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/rxrpc/call_event.c')
-rw-r--r--net/rxrpc/call_event.c74
1 files changed, 30 insertions, 44 deletions
diff --git a/net/rxrpc/call_event.c b/net/rxrpc/call_event.c
index fee8b6ddb334..8365d3366114 100644
--- a/net/rxrpc/call_event.c
+++ b/net/rxrpc/call_event.c
@@ -811,8 +811,9 @@ static int rxrpc_post_message(struct rxrpc_call *call, u32 mark, u32 error,
}
/*
- * handle background processing of incoming call packets and ACK / abort
- * generation
+ * Handle background processing of incoming call packets and ACK / abort
+ * generation. A ref on the call is donated to us by whoever queued the work
+ * item.
*/
void rxrpc_process_call(struct work_struct *work)
{
@@ -827,6 +828,7 @@ void rxrpc_process_call(struct work_struct *work)
unsigned long bits;
__be32 data, pad;
size_t len;
+ bool requeue = false;
int loop, nbit, ioc, ret, mtu;
u32 serial, abort_code = RX_PROTOCOL_ERROR;
u8 *acks = NULL;
@@ -838,6 +840,11 @@ void rxrpc_process_call(struct work_struct *work)
call->debug_id, rxrpc_call_states[call->state], call->events,
(jiffies - call->creation_jif) / (HZ / 10));
+ if (call->state >= RXRPC_CALL_COMPLETE) {
+ rxrpc_put_call(call, rxrpc_call_put);
+ return;
+ }
+
if (!call->conn)
goto skip_msg_init;
@@ -1088,16 +1095,21 @@ skip_msg_init:
spin_lock_bh(&call->lock);
if (call->state == RXRPC_CALL_SERVER_SECURING) {
+ struct rxrpc_sock *rx;
_debug("securing");
- write_lock(&call->socket->call_lock);
- if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
- !test_bit(RXRPC_CALL_EV_RELEASE, &call->events)) {
- _debug("not released");
- call->state = RXRPC_CALL_SERVER_ACCEPTING;
- list_move_tail(&call->accept_link,
- &call->socket->acceptq);
+ rcu_read_lock();
+ rx = rcu_dereference(call->socket);
+ if (rx) {
+ write_lock(&rx->call_lock);
+ if (!test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
+ _debug("not released");
+ call->state = RXRPC_CALL_SERVER_ACCEPTING;
+ list_move_tail(&call->accept_link,
+ &rx->acceptq);
+ }
+ write_unlock(&rx->call_lock);
}
- write_unlock(&call->socket->call_lock);
+ rcu_read_unlock();
read_lock(&call->state_lock);
if (call->state < RXRPC_CALL_COMPLETE)
set_bit(RXRPC_CALL_EV_POST_ACCEPT, &call->events);
@@ -1139,11 +1151,6 @@ skip_msg_init:
goto maybe_reschedule;
}
- if (test_bit(RXRPC_CALL_EV_RELEASE, &call->events)) {
- rxrpc_release_call(call);
- clear_bit(RXRPC_CALL_EV_RELEASE, &call->events);
- }
-
/* other events may have been raised since we started checking */
goto maybe_reschedule;
@@ -1209,10 +1216,8 @@ send_message_2:
&msg, iov, ioc, len);
if (ret < 0) {
_debug("sendmsg failed: %d", ret);
- read_lock_bh(&call->state_lock);
- if (call->state < RXRPC_CALL_DEAD)
- rxrpc_queue_call(call);
- read_unlock_bh(&call->state_lock);
+ if (call->state < RXRPC_CALL_COMPLETE)
+ requeue = true;
goto error;
}
@@ -1245,41 +1250,22 @@ send_message_2:
kill_ACKs:
del_timer_sync(&call->ack_timer);
- if (test_and_clear_bit(RXRPC_CALL_EV_ACK_FINAL, &call->events))
- rxrpc_put_call(call, rxrpc_call_put);
clear_bit(RXRPC_CALL_EV_ACK, &call->events);
maybe_reschedule:
if (call->events || !skb_queue_empty(&call->rx_queue)) {
- read_lock_bh(&call->state_lock);
- if (call->state < RXRPC_CALL_DEAD)
- rxrpc_queue_call(call);
- read_unlock_bh(&call->state_lock);
- }
-
- /* don't leave aborted connections on the accept queue */
- if (call->state >= RXRPC_CALL_COMPLETE &&
- !list_empty(&call->accept_link)) {
- _debug("X unlinking once-pending call %p { e=%lx f=%lx c=%x }",
- call, call->events, call->flags, call->conn->proto.cid);
-
- read_lock_bh(&call->state_lock);
- if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
- !test_and_set_bit(RXRPC_CALL_EV_RELEASE, &call->events))
- rxrpc_queue_call(call);
- read_unlock_bh(&call->state_lock);
+ if (call->state < RXRPC_CALL_COMPLETE)
+ requeue = true;
}
error:
kfree(acks);
- /* because we don't want two CPUs both processing the work item for one
- * call at the same time, we use a flag to note when it's busy; however
- * this means there's a race between clearing the flag and setting the
- * work pending bit and the work item being processed again */
- if (call->events && !work_pending(&call->processor)) {
+ if ((requeue || call->events) && !work_pending(&call->processor)) {
_debug("jumpstart %x", call->conn->proto.cid);
- rxrpc_queue_call(call);
+ __rxrpc_queue_call(call);
+ } else {
+ rxrpc_put_call(call, rxrpc_call_put);
}
_leave("");