Diffstat (limited to 'net/xdp')
-rw-r--r--  net/xdp/xsk.c            366
-rw-r--r--  net/xdp/xsk_buff_pool.c    7
-rw-r--r--  net/xdp/xsk_queue.h       95
3 files changed, 354 insertions, 114 deletions
diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
index 10ea85c03147..fcfc8472f73d 100644
--- a/net/xdp/xsk.c
+++ b/net/xdp/xsk.c
@@ -25,6 +25,7 @@
 #include <linux/vmalloc.h>
 #include <net/xdp_sock_drv.h>
 #include <net/busy_poll.h>
+#include <net/netdev_rx_queue.h>
 #include <net/xdp.h>
 
 #include "xsk_queue.h"
@@ -135,14 +136,14 @@ int xsk_reg_pool_at_qid(struct net_device *dev, struct xsk_buff_pool *pool,
 	return 0;
 }
 
-static int __xsk_rcv_zc(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len)
+static int __xsk_rcv_zc(struct xdp_sock *xs, struct xdp_buff_xsk *xskb, u32 len,
+			u32 flags)
 {
-	struct xdp_buff_xsk *xskb = container_of(xdp, struct xdp_buff_xsk, xdp);
 	u64 addr;
 	int err;
 
 	addr = xp_get_handle(xskb);
-	err = xskq_prod_reserve_desc(xs->rx, addr, len);
+	err = xskq_prod_reserve_desc(xs->rx, addr, len, flags);
 	if (err) {
 		xs->rx_queue_full++;
 		return err;
@@ -152,48 +153,138 @@ static int __xsk_rcv_zc(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len)
 	return 0;
 }
 
-static void xsk_copy_xdp(struct xdp_buff *to, struct xdp_buff *from, u32 len)
+static int xsk_rcv_zc(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len)
 {
-	void *from_buf, *to_buf;
-	u32 metalen;
+	struct xdp_buff_xsk *xskb = container_of(xdp, struct xdp_buff_xsk, xdp);
+	u32 frags = xdp_buff_has_frags(xdp);
+	struct xdp_buff_xsk *pos, *tmp;
+	struct list_head *xskb_list;
+	u32 contd = 0;
+	int err;
 
-	if (unlikely(xdp_data_meta_unsupported(from))) {
-		from_buf = from->data;
-		to_buf = to->data;
-		metalen = 0;
-	} else {
-		from_buf = from->data_meta;
-		metalen = from->data - from->data_meta;
-		to_buf = to->data - metalen;
+	if (frags)
+		contd = XDP_PKT_CONTD;
+
+	err = __xsk_rcv_zc(xs, xskb, len, contd);
+	if (err || likely(!frags))
+		goto out;
+
+	xskb_list = &xskb->pool->xskb_list;
+	list_for_each_entry_safe(pos, tmp, xskb_list, xskb_list_node) {
+		if (list_is_singular(xskb_list))
+			contd = 0;
+		len = pos->xdp.data_end - pos->xdp.data;
+		err = __xsk_rcv_zc(xs, pos, len, contd);
+		if (err)
+			return err;
+		list_del(&pos->xskb_list_node);
 	}
 
-	memcpy(to_buf, from_buf, len + metalen);
+out:
+	return err;
 }
 
-static int __xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
+static void *xsk_copy_xdp_start(struct xdp_buff *from)
 {
+	if (unlikely(xdp_data_meta_unsupported(from)))
+		return from->data;
+	else
+		return from->data_meta;
+}
+
+static u32 xsk_copy_xdp(void *to, void **from, u32 to_len,
+			u32 *from_len, skb_frag_t **frag, u32 rem)
+{
+	u32 copied = 0;
+
+	while (1) {
+		u32 copy_len = min_t(u32, *from_len, to_len);
+
+		memcpy(to, *from, copy_len);
+		copied += copy_len;
+		if (rem == copied)
+			return copied;
+
+		if (*from_len == copy_len) {
+			*from = skb_frag_address(*frag);
+			*from_len = skb_frag_size((*frag)++);
+		} else {
+			*from += copy_len;
+			*from_len -= copy_len;
+		}
+		if (to_len == copy_len)
+			return copied;
+
+		to_len -= copy_len;
+		to += copy_len;
+	}
+}
+
+static int __xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len)
+{
+	u32 frame_size = xsk_pool_get_rx_frame_size(xs->pool);
+	void *copy_from = xsk_copy_xdp_start(xdp), *copy_to;
+	u32 from_len, meta_len, rem, num_desc;
+	struct xdp_buff_xsk *xskb;
 	struct xdp_buff *xsk_xdp;
-	int err;
-	u32 len;
+	skb_frag_t *frag;
 
-	len = xdp->data_end - xdp->data;
-	if (len > xsk_pool_get_rx_frame_size(xs->pool)) {
-		xs->rx_dropped++;
-		return -ENOSPC;
+	from_len = xdp->data_end - copy_from;
+	meta_len = xdp->data - copy_from;
+	rem = len + meta_len;
+
+	if (len <= frame_size && !xdp_buff_has_frags(xdp)) {
+		int err;
+
+		xsk_xdp = xsk_buff_alloc(xs->pool);
+		if (!xsk_xdp) {
+			xs->rx_dropped++;
+			return -ENOMEM;
+		}
+
+		memcpy(xsk_xdp->data - meta_len, copy_from, rem);
+		xskb = container_of(xsk_xdp, struct xdp_buff_xsk, xdp);
+		err = __xsk_rcv_zc(xs, xskb, len, 0);
+		if (err) {
+			xsk_buff_free(xsk_xdp);
+			return err;
+		}
+
+		return 0;
 	}
 
-	xsk_xdp = xsk_buff_alloc(xs->pool);
-	if (!xsk_xdp) {
+	num_desc = (len - 1) / frame_size + 1;
+
+	if (!xsk_buff_can_alloc(xs->pool, num_desc)) {
 		xs->rx_dropped++;
 		return -ENOMEM;
 	}
+	if (xskq_prod_nb_free(xs->rx, num_desc) < num_desc) {
+		xs->rx_queue_full++;
+		return -ENOBUFS;
+	}
 
-	xsk_copy_xdp(xsk_xdp, xdp, len);
-	err = __xsk_rcv_zc(xs, xsk_xdp, len);
-	if (err) {
-		xsk_buff_free(xsk_xdp);
-		return err;
+	if (xdp_buff_has_frags(xdp)) {
+		struct skb_shared_info *sinfo;
+
+		sinfo = xdp_get_shared_info_from_buff(xdp);
+		frag = &sinfo->frags[0];
 	}
 
+	do {
+		u32 to_len = frame_size + meta_len;
+		u32 copied;
+
+		xsk_xdp = xsk_buff_alloc(xs->pool);
+		copy_to = xsk_xdp->data - meta_len;
+
+		copied = xsk_copy_xdp(copy_to, &copy_from, to_len, &from_len, &frag, rem);
+		rem -= copied;
+
+		xskb = container_of(xsk_xdp, struct xdp_buff_xsk, xdp);
+		__xsk_rcv_zc(xs, xskb, copied - meta_len, rem ? XDP_PKT_CONTD : 0);
+		meta_len = 0;
+	} while (rem);
+
 	return 0;
 }
 
@@ -215,7 +306,7 @@ static bool xsk_is_bound(struct xdp_sock *xs)
 	return false;
 }
 
-static int xsk_rcv_check(struct xdp_sock *xs, struct xdp_buff *xdp)
+static int xsk_rcv_check(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len)
 {
 	if (!xsk_is_bound(xs))
 		return -ENXIO;
@@ -223,6 +314,11 @@ static int xsk_rcv_check(struct xdp_sock *xs, struct xdp_buff *xdp)
 	if (xs->dev != xdp->rxq->dev || xs->queue_id != xdp->rxq->queue_index)
 		return -EINVAL;
 
+	if (len > xsk_pool_get_rx_frame_size(xs->pool) && !xs->sg) {
+		xs->rx_dropped++;
+		return -ENOSPC;
+	}
+
 	sk_mark_napi_id_once_xdp(&xs->sk, xdp);
 	return 0;
 }
@@ -236,12 +332,13 @@ static void xsk_flush(struct xdp_sock *xs)
 
 int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
 {
+	u32 len = xdp_get_buff_len(xdp);
 	int err;
 
 	spin_lock_bh(&xs->rx_lock);
-	err = xsk_rcv_check(xs, xdp);
+	err = xsk_rcv_check(xs, xdp, len);
 	if (!err) {
-		err = __xsk_rcv(xs, xdp);
+		err = __xsk_rcv(xs, xdp, len);
 		xsk_flush(xs);
 	}
 	spin_unlock_bh(&xs->rx_lock);
@@ -250,19 +347,19 @@ int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
 
 static int xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
 {
+	u32 len = xdp_get_buff_len(xdp);
 	int err;
-	u32 len;
 
-	err = xsk_rcv_check(xs, xdp);
+	err = xsk_rcv_check(xs, xdp, len);
 	if (err)
 		return err;
 
 	if (xdp->rxq->mem.type == MEM_TYPE_XSK_BUFF_POOL) {
 		len = xdp->data_end - xdp->data;
-		return __xsk_rcv_zc(xs, xdp, len);
+		return xsk_rcv_zc(xs, xdp, len);
 	}
 
-	err = __xsk_rcv(xs, xdp);
+	err = __xsk_rcv(xs, xdp, len);
 	if (!err)
 		xdp_return_buff(xdp);
 	return err;
@@ -321,7 +418,8 @@ bool xsk_tx_peek_desc(struct xsk_buff_pool *pool, struct xdp_desc *desc)
 	rcu_read_lock();
 	list_for_each_entry_rcu(xs, &pool->xsk_tx_list, tx_list) {
 		if (!xskq_cons_peek_desc(xs->tx, desc, pool)) {
-			xs->tx->queue_empty_descs++;
+			if (xskq_has_descs(xs->tx))
+				xskq_cons_release(xs->tx);
 			continue;
 		}
 
@@ -408,37 +506,91 @@ static int xsk_wakeup(struct xdp_sock *xs, u8 flags)
 	return dev->netdev_ops->ndo_xsk_wakeup(dev, xs->queue_id, flags);
 }
 
-static void xsk_destruct_skb(struct sk_buff *skb)
+static int xsk_cq_reserve_addr_locked(struct xdp_sock *xs, u64 addr)
+{
+	unsigned long flags;
+	int ret;
+
+	spin_lock_irqsave(&xs->pool->cq_lock, flags);
+	ret = xskq_prod_reserve_addr(xs->pool->cq, addr);
+	spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
+
+	return ret;
+}
+
+static void xsk_cq_submit_locked(struct xdp_sock *xs, u32 n)
 {
-	u64 addr = (u64)(long)skb_shinfo(skb)->destructor_arg;
-	struct xdp_sock *xs = xdp_sk(skb->sk);
 	unsigned long flags;
 
 	spin_lock_irqsave(&xs->pool->cq_lock, flags);
-	xskq_prod_submit_addr(xs->pool->cq, addr);
+	xskq_prod_submit_n(xs->pool->cq, n);
 	spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
+}
+
+static void xsk_cq_cancel_locked(struct xdp_sock *xs, u32 n)
+{
+	unsigned long flags;
 
+	spin_lock_irqsave(&xs->pool->cq_lock, flags);
+	xskq_prod_cancel_n(xs->pool->cq, n);
+	spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
+}
+
+static u32 xsk_get_num_desc(struct sk_buff *skb)
+{
+	return skb ? (long)skb_shinfo(skb)->destructor_arg : 0;
+}
+
+static void xsk_destruct_skb(struct sk_buff *skb)
+{
+	xsk_cq_submit_locked(xdp_sk(skb->sk), xsk_get_num_desc(skb));
 	sock_wfree(skb);
 }
 
+static void xsk_set_destructor_arg(struct sk_buff *skb)
+{
+	long num = xsk_get_num_desc(xdp_sk(skb->sk)->skb) + 1;
+
+	skb_shinfo(skb)->destructor_arg = (void *)num;
+}
+
+static void xsk_consume_skb(struct sk_buff *skb)
+{
+	struct xdp_sock *xs = xdp_sk(skb->sk);
+
+	skb->destructor = sock_wfree;
+	xsk_cq_cancel_locked(xs, xsk_get_num_desc(skb));
+	/* Free skb without triggering the perf drop trace */
+	consume_skb(skb);
+	xs->skb = NULL;
+}
+
+static void xsk_drop_skb(struct sk_buff *skb)
+{
+	xdp_sk(skb->sk)->tx->invalid_descs += xsk_get_num_desc(skb);
+	xsk_consume_skb(skb);
+}
+
 static struct sk_buff *xsk_build_skb_zerocopy(struct xdp_sock *xs,
 					      struct xdp_desc *desc)
 {
 	struct xsk_buff_pool *pool = xs->pool;
 	u32 hr, len, ts, offset, copy, copied;
-	struct sk_buff *skb;
+	struct sk_buff *skb = xs->skb;
 	struct page *page;
 	void *buffer;
 	int err, i;
 	u64 addr;
 
-	hr = max(NET_SKB_PAD, L1_CACHE_ALIGN(xs->dev->needed_headroom));
+	if (!skb) {
+		hr = max(NET_SKB_PAD, L1_CACHE_ALIGN(xs->dev->needed_headroom));
 
-	skb = sock_alloc_send_skb(&xs->sk, hr, 1, &err);
-	if (unlikely(!skb))
-		return ERR_PTR(err);
+		skb = sock_alloc_send_skb(&xs->sk, hr, 1, &err);
+		if (unlikely(!skb))
+			return ERR_PTR(err);
 
-	skb_reserve(skb, hr);
+		skb_reserve(skb, hr);
+	}
 
 	addr = desc->addr;
 	len = desc->len;
@@ -448,7 +600,10 @@ static struct sk_buff *xsk_build_skb_zerocopy(struct xdp_sock *xs,
 	offset = offset_in_page(buffer);
 	addr = buffer - pool->addrs;
 
-	for (copied = 0, i = 0; copied < len; i++) {
+	for (copied = 0, i = skb_shinfo(skb)->nr_frags; copied < len; i++) {
+		if (unlikely(i >= MAX_SKB_FRAGS))
+			return ERR_PTR(-EFAULT);
+
 		page = pool->umem->pgs[addr >> PAGE_SHIFT];
 		get_page(page);
 
@@ -473,43 +628,77 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
 				     struct xdp_desc *desc)
 {
 	struct net_device *dev = xs->dev;
-	struct sk_buff *skb;
+	struct sk_buff *skb = xs->skb;
+	int err;
 
 	if (dev->priv_flags & IFF_TX_SKB_NO_LINEAR) {
 		skb = xsk_build_skb_zerocopy(xs, desc);
-		if (IS_ERR(skb))
-			return skb;
+		if (IS_ERR(skb)) {
+			err = PTR_ERR(skb);
+			goto free_err;
+		}
 	} else {
 		u32 hr, tr, len;
 		void *buffer;
-		int err;
 
-		hr = max(NET_SKB_PAD, L1_CACHE_ALIGN(dev->needed_headroom));
-		tr = dev->needed_tailroom;
+		buffer = xsk_buff_raw_get_data(xs->pool, desc->addr);
 		len = desc->len;
 
-		skb = sock_alloc_send_skb(&xs->sk, hr + len + tr, 1, &err);
-		if (unlikely(!skb))
-			return ERR_PTR(err);
+		if (!skb) {
+			hr = max(NET_SKB_PAD, L1_CACHE_ALIGN(dev->needed_headroom));
+			tr = dev->needed_tailroom;
+			skb = sock_alloc_send_skb(&xs->sk, hr + len + tr, 1, &err);
+			if (unlikely(!skb))
+				goto free_err;
 
-		skb_reserve(skb, hr);
-		skb_put(skb, len);
+			skb_reserve(skb, hr);
+			skb_put(skb, len);
 
-		buffer = xsk_buff_raw_get_data(xs->pool, desc->addr);
-		err = skb_store_bits(skb, 0, buffer, len);
-		if (unlikely(err)) {
-			kfree_skb(skb);
-			return ERR_PTR(err);
+			err = skb_store_bits(skb, 0, buffer, len);
+			if (unlikely(err))
+				goto free_err;
+		} else {
+			int nr_frags = skb_shinfo(skb)->nr_frags;
+			struct page *page;
+			u8 *vaddr;
+
+			if (unlikely(nr_frags == (MAX_SKB_FRAGS - 1) && xp_mb_desc(desc))) {
+				err = -EFAULT;
+				goto free_err;
+			}
+
+			page = alloc_page(xs->sk.sk_allocation);
+			if (unlikely(!page)) {
+				err = -EAGAIN;
+				goto free_err;
+			}
+
+			vaddr = kmap_local_page(page);
+			memcpy(vaddr, buffer, len);
+			kunmap_local(vaddr);
+
+			skb_add_rx_frag(skb, nr_frags, page, 0, len, 0);
 		}
 	}
 
 	skb->dev = dev;
 	skb->priority = xs->sk.sk_priority;
 	skb->mark = READ_ONCE(xs->sk.sk_mark);
-	skb_shinfo(skb)->destructor_arg = (void *)(long)desc->addr;
 	skb->destructor = xsk_destruct_skb;
+	xsk_set_destructor_arg(skb);
 
 	return skb;
+
+free_err:
+	if (err == -EAGAIN) {
+		xsk_cq_cancel_locked(xs, 1);
+	} else {
+		xsk_set_destructor_arg(skb);
+		xsk_drop_skb(skb);
+		xskq_cons_release(xs->tx);
+	}
+
+	return ERR_PTR(err);
 }
 
 static int __xsk_generic_xmit(struct sock *sk)
@@ -519,7 +708,6 @@ static int __xsk_generic_xmit(struct sock *sk)
 	bool sent_frame = false;
 	struct xdp_desc desc;
 	struct sk_buff *skb;
-	unsigned long flags;
 	int err = 0;
 
 	mutex_lock(&xs->mutex);
@@ -544,47 +732,51 @@ static int __xsk_generic_xmit(struct sock *sk)
 		 * if there is space in it. This avoids having to implement
 		 * any buffering in the Tx path.
 		 */
-		spin_lock_irqsave(&xs->pool->cq_lock, flags);
-		if (xskq_prod_reserve(xs->pool->cq)) {
-			spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
+		if (xsk_cq_reserve_addr_locked(xs, desc.addr))
 			goto out;
-		}
-		spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
 
 		skb = xsk_build_skb(xs, &desc);
 		if (IS_ERR(skb)) {
 			err = PTR_ERR(skb);
-			spin_lock_irqsave(&xs->pool->cq_lock, flags);
-			xskq_prod_cancel(xs->pool->cq);
-			spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
-			goto out;
+			if (err == -EAGAIN)
+				goto out;
+			err = 0;
+			continue;
+		}
+
+		xskq_cons_release(xs->tx);
+
+		if (xp_mb_desc(&desc)) {
+			xs->skb = skb;
+			continue;
 		}
 
 		err = __dev_direct_xmit(skb, xs->queue_id);
 		if (err == NETDEV_TX_BUSY) {
 			/* Tell user-space to retry the send */
-			skb->destructor = sock_wfree;
-			spin_lock_irqsave(&xs->pool->cq_lock, flags);
-			xskq_prod_cancel(xs->pool->cq);
-			spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
-			/* Free skb without triggering the perf drop trace */
-			consume_skb(skb);
+			xskq_cons_cancel_n(xs->tx, xsk_get_num_desc(skb));
+			xsk_consume_skb(skb);
 			err = -EAGAIN;
 			goto out;
 		}
 
-		xskq_cons_release(xs->tx);
 		/* Ignore NET_XMIT_CN as packet might have been sent */
 		if (err == NET_XMIT_DROP) {
 			/* SKB completed but not sent */
 			err = -EBUSY;
+			xs->skb = NULL;
 			goto out;
 		}
 
 		sent_frame = true;
+		xs->skb = NULL;
 	}
 
-	xs->tx->queue_empty_descs++;
+	if (xskq_has_descs(xs->tx)) {
+		if (xs->skb)
+			xsk_drop_skb(xs->skb);
+		xskq_cons_release(xs->tx);
+	}
 
 out:
 	if (sent_frame)
@@ -834,6 +1026,9 @@ static int xsk_release(struct socket *sock)
 
 	net = sock_net(sk);
 
+	if (xs->skb)
+		xsk_drop_skb(xs->skb);
+
 	mutex_lock(&net->xdp.lock);
 	sk_del_node_init_rcu(sk);
 	mutex_unlock(&net->xdp.lock);
@@ -897,7 +1092,7 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
 
 	flags = sxdp->sxdp_flags;
 	if (flags & ~(XDP_SHARED_UMEM | XDP_COPY | XDP_ZEROCOPY |
-		      XDP_USE_NEED_WAKEUP))
+		      XDP_USE_NEED_WAKEUP | XDP_USE_SG))
 		return -EINVAL;
 
 	bound_dev_if = READ_ONCE(sk->sk_bound_dev_if);
@@ -929,7 +1124,7 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
 		struct socket *sock;
 
 		if ((flags & XDP_COPY) || (flags & XDP_ZEROCOPY) ||
-		    (flags & XDP_USE_NEED_WAKEUP)) {
+		    (flags & XDP_USE_NEED_WAKEUP) || (flags & XDP_USE_SG)) {
 			/* Cannot specify flags for shared sockets. */
 			err = -EINVAL;
 			goto out_unlock;
@@ -1029,6 +1224,7 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
 
 	xs->dev = dev;
 	xs->zc = xs->umem->zc;
+	xs->sg = !!(flags & XDP_USE_SG);
 	xs->queue_id = qid;
 	xp_add_xsk(xs->pool, xs);
 
diff --git a/net/xdp/xsk_buff_pool.c b/net/xdp/xsk_buff_pool.c
index 26f6d304451e..b3f7b310811e 100644
--- a/net/xdp/xsk_buff_pool.c
+++ b/net/xdp/xsk_buff_pool.c
@@ -86,6 +86,7 @@ struct xsk_buff_pool *xp_create_and_assign_umem(struct xdp_sock *xs,
 	pool->umem = umem;
 	pool->addrs = umem->addrs;
 	INIT_LIST_HEAD(&pool->free_list);
+	INIT_LIST_HEAD(&pool->xskb_list);
 	INIT_LIST_HEAD(&pool->xsk_tx_list);
 	spin_lock_init(&pool->xsk_tx_list_lock);
 	spin_lock_init(&pool->cq_lock);
@@ -99,6 +100,7 @@ struct xsk_buff_pool *xp_create_and_assign_umem(struct xdp_sock *xs,
 		xskb->pool = pool;
 		xskb->xdp.frame_sz = umem->chunk_size - umem->headroom;
 		INIT_LIST_HEAD(&xskb->free_list_node);
+		INIT_LIST_HEAD(&xskb->xskb_list_node);
 		if (pool->unaligned)
 			pool->free_heads[i] = xskb;
 		else
@@ -187,6 +189,11 @@ int xp_assign_dev(struct xsk_buff_pool *pool,
 		goto err_unreg_pool;
 	}
 
+	if (netdev->xdp_zc_max_segs == 1 && (flags & XDP_USE_SG)) {
+		err = -EOPNOTSUPP;
+		goto err_unreg_pool;
+	}
+
 	bpf.command = XDP_SETUP_XSK_POOL;
 	bpf.xsk.pool = pool;
 	bpf.xsk.queue_id = queue_id;
diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
index 6d40a77fccbe..13354a1e4280 100644
--- a/net/xdp/xsk_queue.h
+++ b/net/xdp/xsk_queue.h
@@ -48,6 +48,11 @@ struct xsk_queue {
 	size_t ring_vmalloc_size;
 };
 
+struct parsed_desc {
+	u32 mb;
+	u32 valid;
+};
+
 /* The structure of the shared state of the rings are a simple
  * circular buffer, as outlined in
  * Documentation/core-api/circular-buffers.rst. For the Rx and
@@ -130,18 +135,26 @@ static inline bool xskq_cons_read_addr_unchecked(struct xsk_queue *q, u64 *addr)
 	return false;
 }
 
+static inline bool xp_unused_options_set(u32 options)
+{
+	return options & ~XDP_PKT_CONTD;
+}
+
 static inline bool xp_aligned_validate_desc(struct xsk_buff_pool *pool,
 					    struct xdp_desc *desc)
 {
 	u64 offset = desc->addr & (pool->chunk_size - 1);
 
+	if (!desc->len)
+		return false;
+
 	if (offset + desc->len > pool->chunk_size)
 		return false;
 
 	if (desc->addr >= pool->addrs_cnt)
 		return false;
 
-	if (desc->options)
+	if (xp_unused_options_set(desc->options))
 		return false;
 	return true;
 }
@@ -151,6 +164,9 @@ static inline bool xp_unaligned_validate_desc(struct xsk_buff_pool *pool,
 {
 	u64 addr = xp_unaligned_add_offset_to_addr(desc->addr);
 
+	if (!desc->len)
+		return false;
+
 	if (desc->len > pool->chunk_size)
 		return false;
 
@@ -158,7 +174,7 @@ static inline bool xp_unaligned_validate_desc(struct xsk_buff_pool *pool,
 	    xp_desc_crosses_non_contig_pg(pool, addr, desc->len))
 		return false;
 
-	if (desc->options)
+	if (xp_unused_options_set(desc->options))
 		return false;
 	return true;
 }
@@ -170,6 +186,11 @@ static inline bool xp_validate_desc(struct xsk_buff_pool *pool,
 		xp_aligned_validate_desc(pool, desc);
 }
 
+static inline bool xskq_has_descs(struct xsk_queue *q)
+{
+	return q->cached_cons != q->cached_prod;
+}
+
 static inline bool xskq_cons_is_valid_desc(struct xsk_queue *q,
 					   struct xdp_desc *d,
 					   struct xsk_buff_pool *pool)
@@ -185,17 +206,15 @@ static inline bool xskq_cons_read_desc(struct xsk_queue *q,
 				       struct xdp_desc *desc,
 				       struct xsk_buff_pool *pool)
 {
-	while (q->cached_cons != q->cached_prod) {
+	if (q->cached_cons != q->cached_prod) {
 		struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
 		u32 idx = q->cached_cons & q->ring_mask;
 
 		*desc = ring->desc[idx];
-		if (xskq_cons_is_valid_desc(q, desc, pool))
-			return true;
-
-		q->cached_cons++;
+		return xskq_cons_is_valid_desc(q, desc, pool);
 	}
 
+	q->queue_empty_descs++;
 	return false;
 }
 
@@ -204,30 +223,52 @@ static inline void xskq_cons_release_n(struct xsk_queue *q, u32 cnt)
 	q->cached_cons += cnt;
 }
 
-static inline u32 xskq_cons_read_desc_batch(struct xsk_queue *q, struct xsk_buff_pool *pool,
-					    u32 max)
+static inline void parse_desc(struct xsk_queue *q, struct xsk_buff_pool *pool,
+			      struct xdp_desc *desc, struct parsed_desc *parsed)
+{
+	parsed->valid = xskq_cons_is_valid_desc(q, desc, pool);
+	parsed->mb = xp_mb_desc(desc);
+}
+
+static inline
+u32 xskq_cons_read_desc_batch(struct xsk_queue *q, struct xsk_buff_pool *pool,
+			      u32 max)
 {
 	u32 cached_cons = q->cached_cons, nb_entries = 0;
 	struct xdp_desc *descs = pool->tx_descs;
+	u32 total_descs = 0, nr_frags = 0;
 
+	/* track first entry, if stumble upon *any* invalid descriptor, rewind
+	 * current packet that consists of frags and stop the processing
+	 */
 	while (cached_cons != q->cached_prod && nb_entries < max) {
 		struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
 		u32 idx = cached_cons & q->ring_mask;
+		struct parsed_desc parsed;
 
 		descs[nb_entries] = ring->desc[idx];
-		if (unlikely(!xskq_cons_is_valid_desc(q, &descs[nb_entries], pool))) {
-			/* Skip the entry */
-			cached_cons++;
-			continue;
+		cached_cons++;
+		parse_desc(q, pool, &descs[nb_entries], &parsed);
+		if (unlikely(!parsed.valid))
+			break;
+
+		if (likely(!parsed.mb)) {
+			total_descs += (nr_frags + 1);
+			nr_frags = 0;
+		} else {
+			nr_frags++;
+			if (nr_frags == pool->netdev->xdp_zc_max_segs) {
+				nr_frags = 0;
+				break;
+			}
 		}
-
 		nb_entries++;
-		cached_cons++;
 	}
 
+	cached_cons -= nr_frags;
 	/* Release valid plus any invalid entries */
 	xskq_cons_release_n(q, cached_cons - q->cached_cons);
-	return nb_entries;
+	return total_descs;
 }
 
 /* Functions for consumers */
@@ -292,6 +333,11 @@ static inline void xskq_cons_release(struct xsk_queue *q)
 	q->cached_cons++;
 }
 
+static inline void xskq_cons_cancel_n(struct xsk_queue *q, u32 cnt)
+{
+	q->cached_cons -= cnt;
+}
+
 static inline u32 xskq_cons_present_entries(struct xsk_queue *q)
 {
 	/* No barriers needed since data is not accessed */
@@ -319,9 +365,9 @@ static inline bool xskq_prod_is_full(struct xsk_queue *q)
 	return xskq_prod_nb_free(q, 1) ? false : true;
 }
 
-static inline void xskq_prod_cancel(struct xsk_queue *q)
+static inline void xskq_prod_cancel_n(struct xsk_queue *q, u32 cnt)
 {
-	q->cached_prod--;
+	q->cached_prod -= cnt;
 }
 
 static inline int xskq_prod_reserve(struct xsk_queue *q)
@@ -360,7 +406,7 @@ static inline void xskq_prod_write_addr_batch(struct xsk_queue *q, struct xdp_de
 }
 
 static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
-					 u64 addr, u32 len)
+					 u64 addr, u32 len, u32 flags)
 {
 	struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
 	u32 idx;
@@ -372,6 +418,7 @@ static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
 	idx = q->cached_prod++ & q->ring_mask;
 	ring->desc[idx].addr = addr;
 	ring->desc[idx].len = len;
+	ring->desc[idx].options = flags;
 
 	return 0;
 }
@@ -386,16 +433,6 @@ static inline void xskq_prod_submit(struct xsk_queue *q)
 	__xskq_prod_submit(q, q->cached_prod);
 }
 
-static inline void xskq_prod_submit_addr(struct xsk_queue *q, u64 addr)
-{
-	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
-	u32 idx = q->ring->producer;
-
-	ring->desc[idx++ & q->ring_mask] = addr;
-
-	__xskq_prod_submit(q, idx);
-}
-
 static inline void xskq_prod_submit_n(struct xsk_queue *q, u32 nb_entries)
 {
 	__xskq_prod_submit(q, q->ring->producer + nb_entries);
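
Usage note (not part of the commit): with this change, a multi-buffer packet appears on the RX and TX rings as a run of consecutive descriptors in which every descriptor except the last carries XDP_PKT_CONTD in xdp_desc.options, and a socket must be bound with XDP_USE_SG in sxdp_flags before descriptors longer than one frame are accepted. The sketch below shows one plausible consumer-side loop. next_rx_desc() is a hypothetical stand-in for whatever ring accessor the application already has (for example xsk_ring_cons__peek()/xsk_ring_cons__rx_desc() in libxdp); XDP_USE_SG and XDP_PKT_CONTD are the real uapi names from linux/if_xdp.h.

/* Minimal sketch: gather the fragments of one multi-buffer AF_XDP packet.
 * Assumes bind() was done with sxdp.sxdp_flags |= XDP_USE_SG and that
 * next_rx_desc() returns the next filled RX descriptor, or NULL when the
 * ring is empty (a hypothetical helper, not part of the kernel uapi).
 */
#include <linux/if_xdp.h>
#include <stddef.h>
#include <stdint.h>

struct rx_frag {
	uint64_t addr;	/* UMEM offset of this fragment's data */
	uint32_t len;	/* number of valid bytes at that offset */
};

extern const struct xdp_desc *next_rx_desc(void);

static int recv_one_packet(struct rx_frag *frags, int max_frags)
{
	int n = 0;

	while (n < max_frags) {
		const struct xdp_desc *d = next_rx_desc();

		if (!d)
			return -1;	/* ring drained mid-packet; retry later */

		frags[n].addr = d->addr;
		frags[n].len = d->len;
		n++;

		/* XDP_PKT_CONTD clear means this was the last fragment */
		if (!(d->options & XDP_PKT_CONTD))
			return n;
	}
	return -1;			/* more fragments than we can hold */
}

The TX side mirrors this: the producer fills several consecutive TX descriptors and sets XDP_PKT_CONTD on all but the last, and the kernel paths above (xskq_cons_read_desc_batch() for zero-copy, xsk_build_skb() for copy mode) stitch them back into a single frame, which is also why xskq_prod_reserve_desc() now stores a flags word into ring->desc[idx].options on RX.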