summaryrefslogtreecommitdiff
path: root/net/tipc
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/bcast.c14
-rw-r--r--net/tipc/bearer.c64
-rw-r--r--net/tipc/bearer.h10
-rw-r--r--net/tipc/core.c5
-rw-r--r--net/tipc/core.h3
-rw-r--r--net/tipc/group.c4
-rw-r--r--net/tipc/link.c160
-rw-r--r--net/tipc/msg.c59
-rw-r--r--net/tipc/msg.h28
-rw-r--r--net/tipc/name_distr.c2
-rw-r--r--net/tipc/node.c14
-rw-r--r--net/tipc/node.h6
-rw-r--r--net/tipc/socket.c14
-rw-r--r--net/tipc/topsrv.c2
14 files changed, 333 insertions, 52 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 1336f3cdad38..6ef1abdd525f 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -185,7 +185,7 @@ static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq)
}
/* We have to transmit across all bearers */
- skb_queue_head_init(&_xmitq);
+ __skb_queue_head_init(&_xmitq);
for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
if (!bb->dests[bearer_id])
continue;
@@ -256,7 +256,7 @@ static int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
struct sk_buff_head xmitq;
int rc = 0;
- skb_queue_head_init(&xmitq);
+ __skb_queue_head_init(&xmitq);
tipc_bcast_lock(net);
if (tipc_link_bc_peers(l))
rc = tipc_link_xmit(l, pkts, &xmitq);
@@ -286,7 +286,7 @@ static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts,
u32 dnode, selector;
selector = msg_link_selector(buf_msg(skb_peek(pkts)));
- skb_queue_head_init(&_pkts);
+ __skb_queue_head_init(&_pkts);
list_for_each_entry_safe(dst, tmp, &dests->list, list) {
dnode = dst->node;
@@ -344,7 +344,7 @@ static int tipc_mcast_send_sync(struct net *net, struct sk_buff *skb,
msg_set_size(_hdr, MCAST_H_SIZE);
msg_set_is_rcast(_hdr, !msg_is_rcast(hdr));
- skb_queue_head_init(&tmpq);
+ __skb_queue_head_init(&tmpq);
__skb_queue_tail(&tmpq, _skb);
if (method->rcast)
tipc_bcast_xmit(net, &tmpq, cong_link_cnt);
@@ -378,7 +378,7 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
int rc = 0;
skb_queue_head_init(&inputq);
- skb_queue_head_init(&localq);
+ __skb_queue_head_init(&localq);
/* Clone packets before they are consumed by next call */
if (dests->local && !tipc_msg_reassemble(pkts, &localq)) {
@@ -406,8 +406,10 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
rc = tipc_bcast_xmit(net, pkts, cong_link_cnt);
}
- if (dests->local)
+ if (dests->local) {
+ tipc_loopback_trace(net, &localq);
tipc_sk_mcast_rcv(net, &localq, &inputq);
+ }
exit:
/* This queue should normally be empty by now */
__skb_queue_purge(pkts);
diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c
index a809c0ec8d15..0214aa1c4427 100644
--- a/net/tipc/bearer.c
+++ b/net/tipc/bearer.c
@@ -389,6 +389,11 @@ int tipc_enable_l2_media(struct net *net, struct tipc_bearer *b,
dev_put(dev);
return -EINVAL;
}
+ if (dev == net->loopback_dev) {
+ dev_put(dev);
+ pr_info("Enabling <%s> not permitted\n", b->name);
+ return -EINVAL;
+ }
/* Autoconfigure own node identity if needed */
if (!tipc_own_id(net) && hwaddr_len <= NODE_ID_LEN) {
@@ -674,6 +679,65 @@ void tipc_bearer_stop(struct net *net)
}
}
+void tipc_clone_to_loopback(struct net *net, struct sk_buff_head *pkts)
+{
+ struct net_device *dev = net->loopback_dev;
+ struct sk_buff *skb, *_skb;
+ int exp;
+
+ skb_queue_walk(pkts, _skb) {
+ skb = pskb_copy(_skb, GFP_ATOMIC);
+ if (!skb)
+ continue;
+
+ exp = SKB_DATA_ALIGN(dev->hard_header_len - skb_headroom(skb));
+ if (exp > 0 && pskb_expand_head(skb, exp, 0, GFP_ATOMIC)) {
+ kfree_skb(skb);
+ continue;
+ }
+
+ skb_reset_network_header(skb);
+ dev_hard_header(skb, dev, ETH_P_TIPC, dev->dev_addr,
+ dev->dev_addr, skb->len);
+ skb->dev = dev;
+ skb->pkt_type = PACKET_HOST;
+ skb->ip_summed = CHECKSUM_UNNECESSARY;
+ skb->protocol = eth_type_trans(skb, dev);
+ netif_rx_ni(skb);
+ }
+}
+
+static int tipc_loopback_rcv_pkt(struct sk_buff *skb, struct net_device *dev,
+ struct packet_type *pt, struct net_device *od)
+{
+ consume_skb(skb);
+ return NET_RX_SUCCESS;
+}
+
+int tipc_attach_loopback(struct net *net)
+{
+ struct net_device *dev = net->loopback_dev;
+ struct tipc_net *tn = tipc_net(net);
+
+ if (!dev)
+ return -ENODEV;
+
+ dev_hold(dev);
+ tn->loopback_pt.dev = dev;
+ tn->loopback_pt.type = htons(ETH_P_TIPC);
+ tn->loopback_pt.func = tipc_loopback_rcv_pkt;
+ dev_add_pack(&tn->loopback_pt);
+ return 0;
+}
+
+void tipc_detach_loopback(struct net *net)
+{
+ struct tipc_net *tn = tipc_net(net);
+
+ dev_remove_pack(&tn->loopback_pt);
+ dev_put(net->loopback_dev);
+}
+
/* Caller should hold rtnl_lock to protect the bearer */
static int __tipc_nl_add_bearer(struct tipc_nl_msg *msg,
struct tipc_bearer *bearer, int nlflags)
diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h
index 7f4c569594a5..ea0f3c49cbed 100644
--- a/net/tipc/bearer.h
+++ b/net/tipc/bearer.h
@@ -232,6 +232,16 @@ void tipc_bearer_xmit(struct net *net, u32 bearer_id,
struct tipc_media_addr *dst);
void tipc_bearer_bc_xmit(struct net *net, u32 bearer_id,
struct sk_buff_head *xmitq);
+void tipc_clone_to_loopback(struct net *net, struct sk_buff_head *pkts);
+int tipc_attach_loopback(struct net *net);
+void tipc_detach_loopback(struct net *net);
+
+static inline void tipc_loopback_trace(struct net *net,
+ struct sk_buff_head *pkts)
+{
+ if (unlikely(dev_nit_active(net->loopback_dev)))
+ tipc_clone_to_loopback(net, pkts);
+}
/* check if device MTU is too low for tipc headers */
static inline bool tipc_mtu_bad(struct net_device *dev, unsigned int reserve)
diff --git a/net/tipc/core.c b/net/tipc/core.c
index c8370722f0bb..23cb379a93d6 100644
--- a/net/tipc/core.c
+++ b/net/tipc/core.c
@@ -82,6 +82,10 @@ static int __net_init tipc_init_net(struct net *net)
if (err)
goto out_bclink;
+ err = tipc_attach_loopback(net);
+ if (err)
+ goto out_bclink;
+
return 0;
out_bclink:
@@ -94,6 +98,7 @@ out_sk_rht:
static void __net_exit tipc_exit_net(struct net *net)
{
+ tipc_detach_loopback(net);
tipc_net_stop(net);
tipc_bcast_stop(net);
tipc_nametbl_stop(net);
diff --git a/net/tipc/core.h b/net/tipc/core.h
index 7a68e1b6a066..60d829581068 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -125,6 +125,9 @@ struct tipc_net {
/* Cluster capabilities */
u16 capabilities;
+
+ /* Tracing of node internal messages */
+ struct packet_type loopback_pt;
};
static inline struct tipc_net *tipc_net(struct net *net)
diff --git a/net/tipc/group.c b/net/tipc/group.c
index 5f98d38bcf08..89257e2a980d 100644
--- a/net/tipc/group.c
+++ b/net/tipc/group.c
@@ -199,7 +199,7 @@ void tipc_group_join(struct net *net, struct tipc_group *grp, int *sk_rcvbuf)
struct tipc_member *m, *tmp;
struct sk_buff_head xmitq;
- skb_queue_head_init(&xmitq);
+ __skb_queue_head_init(&xmitq);
rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) {
tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, &xmitq);
tipc_group_update_member(m, 0);
@@ -435,7 +435,7 @@ bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport,
return true;
if (state == MBR_PENDING && adv == ADV_IDLE)
return true;
- skb_queue_head_init(&xmitq);
+ __skb_queue_head_init(&xmitq);
tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq);
tipc_node_distr_xmit(grp->net, &xmitq);
return true;
diff --git a/net/tipc/link.c b/net/tipc/link.c
index c2c5c53cad22..6cc75ffd9e2c 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -176,6 +176,7 @@ struct tipc_link {
/* Fragmentation/reassembly */
struct sk_buff *reasm_buf;
+ struct sk_buff *reasm_tnlmsg;
/* Broadcast */
u16 ackers;
@@ -849,18 +850,31 @@ static int link_schedule_user(struct tipc_link *l, struct tipc_msg *hdr)
*/
static void link_prepare_wakeup(struct tipc_link *l)
{
+ struct sk_buff_head *wakeupq = &l->wakeupq;
+ struct sk_buff_head *inputq = l->inputq;
struct sk_buff *skb, *tmp;
- int imp, i = 0;
+ struct sk_buff_head tmpq;
+ int avail[5] = {0,};
+ int imp = 0;
+
+ __skb_queue_head_init(&tmpq);
+
+ for (; imp <= TIPC_SYSTEM_IMPORTANCE; imp++)
+ avail[imp] = l->backlog[imp].limit - l->backlog[imp].len;
- skb_queue_walk_safe(&l->wakeupq, skb, tmp) {
+ skb_queue_walk_safe(wakeupq, skb, tmp) {
imp = TIPC_SKB_CB(skb)->chain_imp;
- if (l->backlog[imp].len < l->backlog[imp].limit) {
- skb_unlink(skb, &l->wakeupq);
- skb_queue_tail(l->inputq, skb);
- } else if (i++ > 10) {
- break;
- }
+ if (avail[imp] <= 0)
+ continue;
+ avail[imp]--;
+ __skb_unlink(skb, wakeupq);
+ __skb_queue_tail(&tmpq, skb);
}
+
+ spin_lock_bh(&inputq->lock);
+ skb_queue_splice_tail(&tmpq, inputq);
+ spin_unlock_bh(&inputq->lock);
+
}
void tipc_link_reset(struct tipc_link *l)
@@ -893,8 +907,10 @@ void tipc_link_reset(struct tipc_link *l)
l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0;
l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0;
kfree_skb(l->reasm_buf);
+ kfree_skb(l->reasm_tnlmsg);
kfree_skb(l->failover_reasm_skb);
l->reasm_buf = NULL;
+ l->reasm_tnlmsg = NULL;
l->failover_reasm_skb = NULL;
l->rcv_unacked = 0;
l->snd_nxt = 1;
@@ -936,7 +952,10 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
int rc = 0;
if (unlikely(msg_size(hdr) > mtu)) {
- skb_queue_purge(list);
+ pr_warn("Too large msg, purging xmit list %d %d %d %d %d!\n",
+ skb_queue_len(list), msg_user(hdr),
+ msg_type(hdr), msg_size(hdr), mtu);
+ __skb_queue_purge(list);
return -EMSGSIZE;
}
@@ -965,7 +984,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
if (likely(skb_queue_len(transmq) < maxwin)) {
_skb = skb_clone(skb, GFP_ATOMIC);
if (!_skb) {
- skb_queue_purge(list);
+ __skb_queue_purge(list);
return -ENOBUFS;
}
__skb_dequeue(list);
@@ -1238,6 +1257,7 @@ static int tipc_link_tnl_rcv(struct tipc_link *l, struct sk_buff *skb,
struct sk_buff_head *inputq)
{
struct sk_buff **reasm_skb = &l->failover_reasm_skb;
+ struct sk_buff **reasm_tnlmsg = &l->reasm_tnlmsg;
struct sk_buff_head *fdefq = &l->failover_deferdq;
struct tipc_msg *hdr = buf_msg(skb);
struct sk_buff *iskb;
@@ -1245,40 +1265,56 @@ static int tipc_link_tnl_rcv(struct tipc_link *l, struct sk_buff *skb,
int rc = 0;
u16 seqno;
- /* SYNCH_MSG */
- if (msg_type(hdr) == SYNCH_MSG)
- goto drop;
+ if (msg_type(hdr) == SYNCH_MSG) {
+ kfree_skb(skb);
+ return 0;
+ }
- /* FAILOVER_MSG */
- if (!tipc_msg_extract(skb, &iskb, &ipos)) {
- pr_warn_ratelimited("Cannot extract FAILOVER_MSG, defq: %d\n",
- skb_queue_len(fdefq));
- return rc;
+ /* Not a fragment? */
+ if (likely(!msg_nof_fragms(hdr))) {
+ if (unlikely(!tipc_msg_extract(skb, &iskb, &ipos))) {
+ pr_warn_ratelimited("Unable to extract msg, defq: %d\n",
+ skb_queue_len(fdefq));
+ return 0;
+ }
+ kfree_skb(skb);
+ } else {
+ /* Set fragment type for buf_append */
+ if (msg_fragm_no(hdr) == 1)
+ msg_set_type(hdr, FIRST_FRAGMENT);
+ else if (msg_fragm_no(hdr) < msg_nof_fragms(hdr))
+ msg_set_type(hdr, FRAGMENT);
+ else
+ msg_set_type(hdr, LAST_FRAGMENT);
+
+ if (!tipc_buf_append(reasm_tnlmsg, &skb)) {
+ /* Successful but non-complete reassembly? */
+ if (*reasm_tnlmsg || link_is_bc_rcvlink(l))
+ return 0;
+ pr_warn_ratelimited("Unable to reassemble tunnel msg\n");
+ return tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
+ }
+ iskb = skb;
}
do {
seqno = buf_seqno(iskb);
-
if (unlikely(less(seqno, l->drop_point))) {
kfree_skb(iskb);
continue;
}
-
if (unlikely(seqno != l->drop_point)) {
__tipc_skb_queue_sorted(fdefq, seqno, iskb);
continue;
}
l->drop_point++;
-
if (!tipc_data_input(l, iskb, inputq))
rc |= tipc_link_input(l, iskb, inputq, reasm_skb);
if (unlikely(rc))
break;
} while ((iskb = __tipc_skb_dequeue(fdefq, l->drop_point)));
-drop:
- kfree_skb(skb);
return rc;
}
@@ -1644,7 +1680,7 @@ void tipc_link_create_dummy_tnl_msg(struct tipc_link *l,
struct sk_buff *skb;
u32 dnode = l->addr;
- skb_queue_head_init(&tnlq);
+ __skb_queue_head_init(&tnlq);
skb = tipc_msg_create(TUNNEL_PROTOCOL, FAILOVER_MSG,
INT_H_SIZE, BASIC_H_SIZE,
dnode, onode, 0, 0, 0);
@@ -1675,14 +1711,18 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
struct sk_buff *skb, *tnlskb;
struct tipc_msg *hdr, tnlhdr;
struct sk_buff_head *queue = &l->transmq;
- struct sk_buff_head tmpxq, tnlq;
+ struct sk_buff_head tmpxq, tnlq, frags;
u16 pktlen, pktcnt, seqno = l->snd_nxt;
+ bool pktcnt_need_update = false;
+ u16 syncpt;
+ int rc;
if (!tnl)
return;
- skb_queue_head_init(&tnlq);
- skb_queue_head_init(&tmpxq);
+ __skb_queue_head_init(&tnlq);
+ __skb_queue_head_init(&tmpxq);
+ __skb_queue_head_init(&frags);
/* At least one packet required for safe algorithm => add dummy */
skb = tipc_msg_create(TIPC_LOW_IMPORTANCE, TIPC_DIRECT_MSG,
@@ -1692,10 +1732,35 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
pr_warn("%sunable to create tunnel packet\n", link_co_err);
return;
}
- skb_queue_tail(&tnlq, skb);
+ __skb_queue_tail(&tnlq, skb);
tipc_link_xmit(l, &tnlq, &tmpxq);
__skb_queue_purge(&tmpxq);
+ /* Link Synching:
+ * From now on, send only one single ("dummy") SYNCH message
+ * to peer. The SYNCH message does not contain any data, just
+ * a header conveying the synch point to the peer.
+ */
+ if (mtyp == SYNCH_MSG && (tnl->peer_caps & TIPC_TUNNEL_ENHANCED)) {
+ tnlskb = tipc_msg_create(TUNNEL_PROTOCOL, SYNCH_MSG,
+ INT_H_SIZE, 0, l->addr,
+ tipc_own_addr(l->net),
+ 0, 0, 0);
+ if (!tnlskb) {
+ pr_warn("%sunable to create dummy SYNCH_MSG\n",
+ link_co_err);
+ return;
+ }
+
+ hdr = buf_msg(tnlskb);
+ syncpt = l->snd_nxt + skb_queue_len(&l->backlogq) - 1;
+ msg_set_syncpt(hdr, syncpt);
+ msg_set_bearer_id(hdr, l->peer_bearer_id);
+ __skb_queue_tail(&tnlq, tnlskb);
+ tipc_link_xmit(tnl, &tnlq, xmitq);
+ return;
+ }
+
/* Initialize reusable tunnel packet header */
tipc_msg_init(tipc_own_addr(l->net), &tnlhdr, TUNNEL_PROTOCOL,
mtyp, INT_H_SIZE, l->addr);
@@ -1713,6 +1778,39 @@ tnl:
if (queue == &l->backlogq)
msg_set_seqno(hdr, seqno++);
pktlen = msg_size(hdr);
+
+ /* Tunnel link MTU is not large enough? This could be
+ * due to:
+ * 1) Link MTU has just changed or set differently;
+ * 2) Or FAILOVER on the top of a SYNCH message
+ *
+ * The 2nd case should not happen if peer supports
+ * TIPC_TUNNEL_ENHANCED
+ */
+ if (pktlen > tnl->mtu - INT_H_SIZE) {
+ if (mtyp == FAILOVER_MSG &&
+ (tnl->peer_caps & TIPC_TUNNEL_ENHANCED)) {
+ rc = tipc_msg_fragment(skb, &tnlhdr, tnl->mtu,
+ &frags);
+ if (rc) {
+ pr_warn("%sunable to frag msg: rc %d\n",
+ link_co_err, rc);
+ return;
+ }
+ pktcnt += skb_queue_len(&frags) - 1;
+ pktcnt_need_update = true;
+ skb_queue_splice_tail_init(&frags, &tnlq);
+ continue;
+ }
+ /* Unluckily, peer doesn't have TIPC_TUNNEL_ENHANCED
+ * => Just warn it and return!
+ */
+ pr_warn_ratelimited("%stoo large msg <%d, %d>: %d!\n",
+ link_co_err, msg_user(hdr),
+ msg_type(hdr), msg_size(hdr));
+ return;
+ }
+
msg_set_size(&tnlhdr, pktlen + INT_H_SIZE);
tnlskb = tipc_buf_acquire(pktlen + INT_H_SIZE, GFP_ATOMIC);
if (!tnlskb) {
@@ -1728,6 +1826,12 @@ tnl:
goto tnl;
}
+ if (pktcnt_need_update)
+ skb_queue_walk(&tnlq, skb) {
+ hdr = buf_msg(skb);
+ msg_set_msgcnt(hdr, pktcnt);
+ }
+
tipc_link_xmit(tnl, &tnlq, xmitq);
if (mtyp == FAILOVER_MSG) {
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index f48e5857210f..e6d49cdc61b4 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -244,6 +244,65 @@ bool tipc_msg_validate(struct sk_buff **_skb)
}
/**
+ * tipc_msg_fragment - build a fragment skb list for TIPC message
+ *
+ * @skb: TIPC message skb
+ * @hdr: internal msg header to be put on the top of the fragments
+ * @pktmax: max size of a fragment incl. the header
+ * @frags: returned fragment skb list
+ *
+ * Returns 0 if the fragmentation is successful, otherwise: -EINVAL
+ * or -ENOMEM
+ */
+int tipc_msg_fragment(struct sk_buff *skb, const struct tipc_msg *hdr,
+ int pktmax, struct sk_buff_head *frags)
+{
+ int pktno, nof_fragms, dsz, dmax, eat;
+ struct tipc_msg *_hdr;
+ struct sk_buff *_skb;
+ u8 *data;
+
+ /* Non-linear buffer? */
+ if (skb_linearize(skb))
+ return -ENOMEM;
+
+ data = (u8 *)skb->data;
+ dsz = msg_size(buf_msg(skb));
+ dmax = pktmax - INT_H_SIZE;
+ if (dsz <= dmax || !dmax)
+ return -EINVAL;
+
+ nof_fragms = dsz / dmax + 1;
+ for (pktno = 1; pktno <= nof_fragms; pktno++) {
+ if (pktno < nof_fragms)
+ eat = dmax;
+ else
+ eat = dsz % dmax;
+ /* Allocate a new fragment */
+ _skb = tipc_buf_acquire(INT_H_SIZE + eat, GFP_ATOMIC);
+ if (!_skb)
+ goto error;
+ skb_orphan(_skb);
+ __skb_queue_tail(frags, _skb);
+ /* Copy header & data to the fragment */
+ skb_copy_to_linear_data(_skb, hdr, INT_H_SIZE);
+ skb_copy_to_linear_data_offset(_skb, INT_H_SIZE, data, eat);
+ data += eat;
+ /* Update the fragment's header */
+ _hdr = buf_msg(_skb);
+ msg_set_fragm_no(_hdr, pktno);
+ msg_set_nof_fragms(_hdr, nof_fragms);
+ msg_set_size(_hdr, INT_H_SIZE + eat);
+ }
+ return 0;
+
+error:
+ __skb_queue_purge(frags);
+ __skb_queue_head_init(frags);
+ return -ENOMEM;
+}
+
+/**
* tipc_msg_build - create buffer chain containing specified header and data
* @mhdr: Message header, to be prepended to data
* @m: User message
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index d7ebc9e955f6..0daa6f04ca81 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -723,12 +723,26 @@ static inline void msg_set_last_bcast(struct tipc_msg *m, u32 n)
msg_set_bits(m, 4, 16, 0xffff, n);
}
+static inline u32 msg_nof_fragms(struct tipc_msg *m)
+{
+ return msg_bits(m, 4, 0, 0xffff);
+}
+
+static inline void msg_set_nof_fragms(struct tipc_msg *m, u32 n)
+{
+ msg_set_bits(m, 4, 0, 0xffff, n);
+}
+
+static inline u32 msg_fragm_no(struct tipc_msg *m)
+{
+ return msg_bits(m, 4, 16, 0xffff);
+}
+
static inline void msg_set_fragm_no(struct tipc_msg *m, u32 n)
{
msg_set_bits(m, 4, 16, 0xffff, n);
}
-
static inline u16 msg_next_sent(struct tipc_msg *m)
{
return msg_bits(m, 4, 0, 0xffff);
@@ -879,6 +893,16 @@ static inline void msg_set_msgcnt(struct tipc_msg *m, u16 n)
msg_set_bits(m, 9, 16, 0xffff, n);
}
+static inline u16 msg_syncpt(struct tipc_msg *m)
+{
+ return msg_bits(m, 9, 16, 0xffff);
+}
+
+static inline void msg_set_syncpt(struct tipc_msg *m, u16 n)
+{
+ msg_set_bits(m, 9, 16, 0xffff, n);
+}
+
static inline u32 msg_conn_ack(struct tipc_msg *m)
{
return msg_bits(m, 9, 16, 0xffff);
@@ -1037,6 +1061,8 @@ bool tipc_msg_bundle(struct sk_buff *skb, struct tipc_msg *msg, u32 mtu);
bool tipc_msg_make_bundle(struct sk_buff **skb, struct tipc_msg *msg,
u32 mtu, u32 dnode);
bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos);
+int tipc_msg_fragment(struct sk_buff *skb, const struct tipc_msg *hdr,
+ int pktmax, struct sk_buff_head *frags);
int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
int offset, int dsz, int mtu, struct sk_buff_head *list);
bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err);
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 241ed2274473..836e629e8f4a 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -190,7 +190,7 @@ void tipc_named_node_up(struct net *net, u32 dnode)
struct name_table *nt = tipc_name_table(net);
struct sk_buff_head head;
- skb_queue_head_init(&head);
+ __skb_queue_head_init(&head);
read_lock_bh(&nt->cluster_scope_lock);
named_distribute(net, &head, dnode, &nt->cluster_scope);
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 3a5be1d7e572..c8f6177dd5a2 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -1443,13 +1443,15 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
int rc;
if (in_own_node(net, dnode)) {
+ tipc_loopback_trace(net, list);
+ spin_lock_init(&list->lock);
tipc_sk_rcv(net, list);
return 0;
}
n = tipc_node_find(net, dnode);
if (unlikely(!n)) {
- skb_queue_purge(list);
+ __skb_queue_purge(list);
return -EHOSTUNREACH;
}
@@ -1458,7 +1460,7 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
if (unlikely(bearer_id == INVALID_BEARER_ID)) {
tipc_node_read_unlock(n);
tipc_node_put(n);
- skb_queue_purge(list);
+ __skb_queue_purge(list);
return -EHOSTUNREACH;
}
@@ -1490,7 +1492,7 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
{
struct sk_buff_head head;
- skb_queue_head_init(&head);
+ __skb_queue_head_init(&head);
__skb_queue_tail(&head, skb);
tipc_node_xmit(net, &head, dnode, selector);
return 0;
@@ -1649,7 +1651,6 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb,
int usr = msg_user(hdr);
int mtyp = msg_type(hdr);
u16 oseqno = msg_seqno(hdr);
- u16 iseqno = msg_seqno(msg_inner_hdr(hdr));
u16 exp_pkts = msg_msgcnt(hdr);
u16 rcv_nxt, syncpt, dlv_nxt, inputq_len;
int state = n->state;
@@ -1748,7 +1749,10 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb,
/* Initiate synch mode if applicable */
if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG) && (oseqno == 1)) {
- syncpt = iseqno + exp_pkts - 1;
+ if (n->capabilities & TIPC_TUNNEL_ENHANCED)
+ syncpt = msg_syncpt(hdr);
+ else
+ syncpt = msg_seqno(msg_inner_hdr(hdr)) + exp_pkts - 1;
if (!tipc_link_is_up(l))
__tipc_node_link_up(n, bearer_id, xmitq);
if (n->state == SELF_UP_PEER_UP) {
diff --git a/net/tipc/node.h b/net/tipc/node.h
index c0bf49ea3de4..291d0ecd4101 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -53,7 +53,8 @@ enum {
TIPC_NODE_ID128 = (1 << 5),
TIPC_LINK_PROTO_SEQNO = (1 << 6),
TIPC_MCAST_RBCTL = (1 << 7),
- TIPC_GAP_ACK_BLOCK = (1 << 8)
+ TIPC_GAP_ACK_BLOCK = (1 << 8),
+ TIPC_TUNNEL_ENHANCED = (1 << 9)
};
#define TIPC_NODE_CAPABILITIES (TIPC_SYN_BIT | \
@@ -64,7 +65,8 @@ enum {
TIPC_NODE_ID128 | \
TIPC_LINK_PROTO_SEQNO | \
TIPC_MCAST_RBCTL | \
- TIPC_GAP_ACK_BLOCK)
+ TIPC_GAP_ACK_BLOCK | \
+ TIPC_TUNNEL_ENHANCED)
#define INVALID_BEARER_ID -1
void tipc_node_stop(struct net *net);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 83ae41d7e554..3b9f8cc328f5 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -809,7 +809,7 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
msg_set_nameupper(hdr, seq->upper);
/* Build message as chain of buffers */
- skb_queue_head_init(&pkts);
+ __skb_queue_head_init(&pkts);
rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);
/* Send message if build was successful */
@@ -853,7 +853,7 @@ static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk,
msg_set_grp_bc_seqno(hdr, bc_snd_nxt);
/* Build message as chain of buffers */
- skb_queue_head_init(&pkts);
+ __skb_queue_head_init(&pkts);
mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
if (unlikely(rc != dlen))
@@ -1058,7 +1058,7 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
msg_set_grp_bc_ack_req(hdr, ack);
/* Build message as chain of buffers */
- skb_queue_head_init(&pkts);
+ __skb_queue_head_init(&pkts);
rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
if (unlikely(rc != dlen))
return rc;
@@ -1387,7 +1387,7 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
if (unlikely(rc))
return rc;
- skb_queue_head_init(&pkts);
+ __skb_queue_head_init(&pkts);
mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
if (unlikely(rc != dlen))
@@ -1445,7 +1445,7 @@ static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
int send, sent = 0;
int rc = 0;
- skb_queue_head_init(&pkts);
+ __skb_queue_head_init(&pkts);
if (unlikely(dlen > INT_MAX))
return -EMSGSIZE;
@@ -1805,7 +1805,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
/* Send group flow control advertisement when applicable */
if (tsk->group && msg_in_group(hdr) && !grp_evt) {
- skb_queue_head_init(&xmitq);
+ __skb_queue_head_init(&xmitq);
tipc_group_update_rcv_win(tsk->group, tsk_blocks(hlen + dlen),
msg_orignode(hdr), msg_origport(hdr),
&xmitq);
@@ -2674,7 +2674,7 @@ static void tipc_sk_timeout(struct timer_list *t)
struct sk_buff_head list;
int rc = 0;
- skb_queue_head_init(&list);
+ __skb_queue_head_init(&list);
bh_lock_sock(sk);
/* Try again later if socket is busy */
diff --git a/net/tipc/topsrv.c b/net/tipc/topsrv.c
index ca8ac96d22a9..3a12fc18239b 100644
--- a/net/tipc/topsrv.c
+++ b/net/tipc/topsrv.c
@@ -40,6 +40,7 @@
#include "socket.h"
#include "addr.h"
#include "msg.h"
+#include "bearer.h"
#include <net/sock.h>
#include <linux/module.h>
@@ -608,6 +609,7 @@ static void tipc_topsrv_kern_evt(struct net *net, struct tipc_event *evt)
memcpy(msg_data(buf_msg(skb)), evt, sizeof(*evt));
skb_queue_head_init(&evtq);
__skb_queue_tail(&evtq, skb);
+ tipc_loopback_trace(net, &evtq);
tipc_sk_rcv(net, &evtq);
}