summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/uapi/linux/tipc.h1
-rw-r--r--net/tipc/msg.c53
-rw-r--r--net/tipc/msg.h12
-rw-r--r--net/tipc/node.h7
-rw-r--r--net/tipc/socket.c117
5 files changed, 170 insertions, 20 deletions
diff --git a/include/uapi/linux/tipc.h b/include/uapi/linux/tipc.h
index 7df026ea6aff..76421b878767 100644
--- a/include/uapi/linux/tipc.h
+++ b/include/uapi/linux/tipc.h
@@ -191,6 +191,7 @@ struct sockaddr_tipc {
#define TIPC_GROUP_JOIN 135 /* Takes struct tipc_group_req* */
#define TIPC_GROUP_LEAVE 136 /* No argument */
#define TIPC_SOCK_RECVQ_USED 137 /* Default: none (read only) */
+#define TIPC_NODELAY 138 /* Default: false */
/*
* Flag values
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 922d262e153f..973795a1a968 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -190,6 +190,59 @@ err:
return 0;
}
+/**
+ * tipc_msg_append(): Append data to tail of an existing buffer queue
+ * @hdr: header to be used
+ * @m: the data to be appended
+ * @mss: max allowable size of buffer
+ * @dlen: size of data to be appended
+ * @txq: queue to appand to
+ * Returns the number og 1k blocks appended or errno value
+ */
+int tipc_msg_append(struct tipc_msg *_hdr, struct msghdr *m, int dlen,
+ int mss, struct sk_buff_head *txq)
+{
+ struct sk_buff *skb, *prev;
+ int accounted, total, curr;
+ int mlen, cpy, rem = dlen;
+ struct tipc_msg *hdr;
+
+ skb = skb_peek_tail(txq);
+ accounted = skb ? msg_blocks(buf_msg(skb)) : 0;
+ total = accounted;
+
+ while (rem) {
+ if (!skb || skb->len >= mss) {
+ prev = skb;
+ skb = tipc_buf_acquire(mss, GFP_KERNEL);
+ if (unlikely(!skb))
+ return -ENOMEM;
+ skb_orphan(skb);
+ skb_trim(skb, MIN_H_SIZE);
+ hdr = buf_msg(skb);
+ skb_copy_to_linear_data(skb, _hdr, MIN_H_SIZE);
+ msg_set_hdr_sz(hdr, MIN_H_SIZE);
+ msg_set_size(hdr, MIN_H_SIZE);
+ __skb_queue_tail(txq, skb);
+ total += 1;
+ if (prev)
+ msg_set_ack_required(buf_msg(prev), 0);
+ msg_set_ack_required(hdr, 1);
+ }
+ hdr = buf_msg(skb);
+ curr = msg_blocks(hdr);
+ mlen = msg_size(hdr);
+ cpy = min_t(int, rem, mss - mlen);
+ if (cpy != copy_from_iter(skb->data + mlen, cpy, &m->msg_iter))
+ return -EFAULT;
+ msg_set_size(hdr, mlen + cpy);
+ skb_put(skb, cpy);
+ rem -= cpy;
+ total += msg_blocks(hdr) - curr;
+ }
+ return total - accounted;
+}
+
/* tipc_msg_validate - validate basic format of received message
*
* This routine ensures a TIPC message has an acceptable header, and at least
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 2d7cb66a6912..0435dda4b90c 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -290,6 +290,16 @@ static inline void msg_set_src_droppable(struct tipc_msg *m, u32 d)
msg_set_bits(m, 0, 18, 1, d);
}
+static inline int msg_ack_required(struct tipc_msg *m)
+{
+ return msg_bits(m, 0, 18, 1);
+}
+
+static inline void msg_set_ack_required(struct tipc_msg *m, u32 d)
+{
+ msg_set_bits(m, 0, 18, 1, d);
+}
+
static inline bool msg_is_rcast(struct tipc_msg *m)
{
return msg_bits(m, 0, 18, 0x1);
@@ -1079,6 +1089,8 @@ int tipc_msg_fragment(struct sk_buff *skb, const struct tipc_msg *hdr,
int pktmax, struct sk_buff_head *frags);
int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
int offset, int dsz, int mtu, struct sk_buff_head *list);
+int tipc_msg_append(struct tipc_msg *hdr, struct msghdr *m, int dlen,
+ int mss, struct sk_buff_head *txq);
bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err);
bool tipc_msg_assemble(struct sk_buff_head *list);
bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq);
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 30563c4f35d5..c39cd861c07d 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -54,7 +54,8 @@ enum {
TIPC_LINK_PROTO_SEQNO = (1 << 6),
TIPC_MCAST_RBCTL = (1 << 7),
TIPC_GAP_ACK_BLOCK = (1 << 8),
- TIPC_TUNNEL_ENHANCED = (1 << 9)
+ TIPC_TUNNEL_ENHANCED = (1 << 9),
+ TIPC_NAGLE = (1 << 10)
};
#define TIPC_NODE_CAPABILITIES (TIPC_SYN_BIT | \
@@ -66,7 +67,9 @@ enum {
TIPC_LINK_PROTO_SEQNO | \
TIPC_MCAST_RBCTL | \
TIPC_GAP_ACK_BLOCK | \
- TIPC_TUNNEL_ENHANCED)
+ TIPC_TUNNEL_ENHANCED | \
+ TIPC_NAGLE)
+
#define INVALID_BEARER_ID -1
void tipc_node_stop(struct net *net);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 2bcacd6022d5..3e99a122e321 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -75,6 +75,7 @@ struct sockaddr_pair {
* @conn_instance: TIPC instance used when connection was established
* @published: non-zero if port has one or more associated names
* @max_pkt: maximum packet size "hint" used when building messages sent by port
+ * @maxnagle: maximum size of msg which can be subject to nagle
* @portid: unique port identity in TIPC socket hash table
* @phdr: preformatted message header used when sending messages
* #cong_links: list of congested links
@@ -97,6 +98,7 @@ struct tipc_sock {
u32 conn_instance;
int published;
u32 max_pkt;
+ u32 maxnagle;
u32 portid;
struct tipc_msg phdr;
struct list_head cong_links;
@@ -116,6 +118,10 @@ struct tipc_sock {
struct tipc_mc_method mc_method;
struct rcu_head rcu;
struct tipc_group *group;
+ u32 oneway;
+ u16 snd_backlog;
+ bool expect_ack;
+ bool nodelay;
bool group_is_open;
};
@@ -137,6 +143,7 @@ static int tipc_sk_insert(struct tipc_sock *tsk);
static void tipc_sk_remove(struct tipc_sock *tsk);
static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz);
static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
+static void tipc_sk_push_backlog(struct tipc_sock *tsk);
static const struct proto_ops packet_ops;
static const struct proto_ops stream_ops;
@@ -227,6 +234,26 @@ static u16 tsk_inc(struct tipc_sock *tsk, int msglen)
return 1;
}
+/* tsk_set_nagle - enable/disable nagle property by manipulating maxnagle
+ */
+static void tsk_set_nagle(struct tipc_sock *tsk)
+{
+ struct sock *sk = &tsk->sk;
+
+ tsk->maxnagle = 0;
+ if (sk->sk_type != SOCK_STREAM)
+ return;
+ if (tsk->nodelay)
+ return;
+ if (!(tsk->peer_caps & TIPC_NAGLE))
+ return;
+ /* Limit node local buffer size to avoid receive queue overflow */
+ if (tsk->max_pkt == MAX_MSG_SIZE)
+ tsk->maxnagle = 1500;
+ else
+ tsk->maxnagle = tsk->max_pkt;
+}
+
/**
* tsk_advance_rx_queue - discard first buffer in socket receive queue
*
@@ -446,6 +473,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
tsk = tipc_sk(sk);
tsk->max_pkt = MAX_PKT_DEFAULT;
+ tsk->maxnagle = 0;
INIT_LIST_HEAD(&tsk->publications);
INIT_LIST_HEAD(&tsk->cong_links);
msg = &tsk->phdr;
@@ -512,8 +540,12 @@ static void __tipc_shutdown(struct socket *sock, int error)
tipc_wait_for_cond(sock, &timeout, (!tsk->cong_link_cnt &&
!tsk_conn_cong(tsk)));
- /* Remove any pending SYN message */
- __skb_queue_purge(&sk->sk_write_queue);
+ /* Push out unsent messages or remove if pending SYN */
+ skb = skb_peek(&sk->sk_write_queue);
+ if (skb && !msg_is_syn(buf_msg(skb)))
+ tipc_sk_push_backlog(tsk);
+ else
+ __skb_queue_purge(&sk->sk_write_queue);
/* Reject all unreceived messages, except on an active connection
* (which disconnects locally & sends a 'FIN+' to peer).
@@ -1208,6 +1240,27 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
tipc_sk_rcv(net, inputq);
}
+/* tipc_sk_push_backlog(): send accumulated buffers in socket write queue
+ * when socket is in Nagle mode
+ */
+static void tipc_sk_push_backlog(struct tipc_sock *tsk)
+{
+ struct sk_buff_head *txq = &tsk->sk.sk_write_queue;
+ struct net *net = sock_net(&tsk->sk);
+ u32 dnode = tsk_peer_node(tsk);
+ int rc;
+
+ if (skb_queue_empty(txq) || tsk->cong_link_cnt)
+ return;
+
+ tsk->snt_unacked += tsk->snd_backlog;
+ tsk->snd_backlog = 0;
+ tsk->expect_ack = true;
+ rc = tipc_node_xmit(net, txq, dnode, tsk->portid);
+ if (rc == -ELINKCONG)
+ tsk->cong_link_cnt = 1;
+}
+
/**
* tipc_sk_conn_proto_rcv - receive a connection mng protocol message
* @tsk: receiving socket
@@ -1221,7 +1274,7 @@ static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
u32 onode = tsk_own_node(tsk);
struct sock *sk = &tsk->sk;
int mtyp = msg_type(hdr);
- bool conn_cong;
+ bool was_cong;
/* Ignore if connection cannot be validated: */
if (!tsk_peer_msg(tsk, hdr)) {
@@ -1254,11 +1307,13 @@ static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
__skb_queue_tail(xmitq, skb);
return;
} else if (mtyp == CONN_ACK) {
- conn_cong = tsk_conn_cong(tsk);
+ was_cong = tsk_conn_cong(tsk);
+ tsk->expect_ack = false;
+ tipc_sk_push_backlog(tsk);
tsk->snt_unacked -= msg_conn_ack(hdr);
if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
tsk->snd_win = msg_adv_win(hdr);
- if (conn_cong)
+ if (was_cong && !tsk_conn_cong(tsk))
sk->sk_write_space(sk);
} else if (mtyp != CONN_PROBE_REPLY) {
pr_warn("Received unknown CONN_PROTO msg\n");
@@ -1437,15 +1492,15 @@ static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
struct sock *sk = sock->sk;
DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
+ struct sk_buff_head *txq = &sk->sk_write_queue;
struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_msg *hdr = &tsk->phdr;
struct net *net = sock_net(sk);
- struct sk_buff_head pkts;
u32 dnode = tsk_peer_node(tsk);
+ int maxnagle = tsk->maxnagle;
+ int maxpkt = tsk->max_pkt;
int send, sent = 0;
- int rc = 0;
-
- __skb_queue_head_init(&pkts);
+ int blocks, rc = 0;
if (unlikely(dlen > INT_MAX))
return -EMSGSIZE;
@@ -1467,21 +1522,35 @@ static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
tipc_sk_connected(sk)));
if (unlikely(rc))
break;
-
send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE);
- rc = tipc_msg_build(hdr, m, sent, send, tsk->max_pkt, &pkts);
- if (unlikely(rc != send))
- break;
-
- trace_tipc_sk_sendstream(sk, skb_peek(&pkts),
+ blocks = tsk->snd_backlog;
+ if (tsk->oneway++ >= 4 && send <= maxnagle) {
+ rc = tipc_msg_append(hdr, m, send, maxnagle, txq);
+ if (unlikely(rc < 0))
+ break;
+ blocks += rc;
+ if (blocks <= 64 && tsk->expect_ack) {
+ tsk->snd_backlog = blocks;
+ sent += send;
+ break;
+ }
+ tsk->expect_ack = true;
+ } else {
+ rc = tipc_msg_build(hdr, m, sent, send, maxpkt, txq);
+ if (unlikely(rc != send))
+ break;
+ blocks += tsk_inc(tsk, send + MIN_H_SIZE);
+ }
+ trace_tipc_sk_sendstream(sk, skb_peek(txq),
TIPC_DUMP_SK_SNDQ, " ");
- rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
+ rc = tipc_node_xmit(net, txq, dnode, tsk->portid);
if (unlikely(rc == -ELINKCONG)) {
tsk->cong_link_cnt = 1;
rc = 0;
}
if (likely(!rc)) {
- tsk->snt_unacked += tsk_inc(tsk, send + MIN_H_SIZE);
+ tsk->snt_unacked += blocks;
+ tsk->snd_backlog = 0;
sent += send;
}
} while (sent < dlen && !rc);
@@ -1528,6 +1597,7 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid, true);
tsk->peer_caps = tipc_node_get_capabilities(net, peer_node);
+ tsk_set_nagle(tsk);
__skb_queue_purge(&sk->sk_write_queue);
if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
return;
@@ -1848,6 +1918,7 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m,
bool peek = flags & MSG_PEEK;
int offset, required, copy, copied = 0;
int hlen, dlen, err, rc;
+ bool ack = false;
long timeout;
/* Catch invalid receive attempts */
@@ -1892,6 +1963,7 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m,
/* Copy data if msg ok, otherwise return error/partial data */
if (likely(!err)) {
+ ack = msg_ack_required(hdr);
offset = skb_cb->bytes_read;
copy = min_t(int, dlen - offset, buflen - copied);
rc = skb_copy_datagram_msg(skb, hlen + offset, m, copy);
@@ -1919,7 +1991,7 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m,
/* Send connection flow control advertisement when applicable */
tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
- if (unlikely(tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE))
+ if (ack || tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
tipc_sk_send_ack(tsk);
/* Exit if all requested data or FIN/error received */
@@ -1990,6 +2062,7 @@ static void tipc_sk_proto_rcv(struct sock *sk,
smp_wmb();
tsk->cong_link_cnt--;
wakeup = true;
+ tipc_sk_push_backlog(tsk);
break;
case GROUP_PROTOCOL:
tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq);
@@ -2029,6 +2102,7 @@ static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
if (unlikely(msg_mcast(hdr)))
return false;
+ tsk->oneway = 0;
switch (sk->sk_state) {
case TIPC_CONNECTING:
@@ -2074,6 +2148,8 @@ static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
return true;
return false;
case TIPC_ESTABLISHED:
+ if (!skb_queue_empty(&sk->sk_write_queue))
+ tipc_sk_push_backlog(tsk);
/* Accept only connection-based messages sent by peer */
if (likely(con_msg && !err && pport == oport && pnode == onode))
return true;
@@ -2959,6 +3035,7 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
case TIPC_SRC_DROPPABLE:
case TIPC_DEST_DROPPABLE:
case TIPC_CONN_TIMEOUT:
+ case TIPC_NODELAY:
if (ol < sizeof(value))
return -EINVAL;
if (get_user(value, (u32 __user *)ov))
@@ -3007,6 +3084,10 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
case TIPC_GROUP_LEAVE:
res = tipc_sk_leave(tsk);
break;
+ case TIPC_NODELAY:
+ tsk->nodelay = !!value;
+ tsk_set_nagle(tsk);
+ break;
default:
res = -EINVAL;
}