Skip to content

Commit 35fcde7

Browse files
magnus-karlsson authored and Alexei Starovoitov committed
xsk: support for Tx
Here, Tx support is added. The user fills the Tx queue with frames to be sent by the kernel, and lets the kernel know using the sendmsg syscall. Signed-off-by: Magnus Karlsson <[email protected]> Signed-off-by: Alexei Starovoitov <[email protected]>
1 parent 865b03f commit 35fcde7

File tree

2 files changed

+200
-4
lines changed

2 files changed

+200
-4
lines changed

net/xdp/xsk.c

Lines changed: 108 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
#include "xsk_queue.h"
3737
#include "xdp_umem.h"
3838

39+
#define TX_BATCH_SIZE 16
40+
3941
static struct xdp_sock *xdp_sk(struct sock *sk)
4042
{
4143
return (struct xdp_sock *)sk;
@@ -101,6 +103,108 @@ int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
101103
return err;
102104
}
103105

106+
static void xsk_destruct_skb(struct sk_buff *skb)
107+
{
108+
u32 id = (u32)(long)skb_shinfo(skb)->destructor_arg;
109+
struct xdp_sock *xs = xdp_sk(skb->sk);
110+
111+
WARN_ON_ONCE(xskq_produce_id(xs->umem->cq, id));
112+
113+
sock_wfree(skb);
114+
}
115+
116+
/* Generic (copy-mode) Tx path: drains up to TX_BATCH_SIZE descriptors from
 * the socket's Tx ring, copies each frame out of the umem into a freshly
 * allocated skb, and transmits it directly on the bound queue.
 *
 * Returns 0 when the ring was drained, or a negative errno:
 *   -ENOBUFS    no Tx ring bound to this socket
 *   -EOPNOTSUPP blocking sends are not supported (only MSG_DONTWAIT)
 *   -EAGAIN     batch limit hit, completion ring full, alloc failure,
 *               or the driver dropped/was busy
 *   -EMSGSIZE   descriptor length exceeds the device MTU
 */
static int xsk_generic_xmit(struct sock *sk, struct msghdr *m,
			    size_t total_len)
{
	bool need_wait = !(m->msg_flags & MSG_DONTWAIT);
	u32 max_batch = TX_BATCH_SIZE;
	struct xdp_sock *xs = xdp_sk(sk);
	bool sent_frame = false;
	struct xdp_desc desc;
	struct sk_buff *skb;
	int err = 0;

	if (unlikely(!xs->tx))
		return -ENOBUFS;
	if (need_wait)
		return -EOPNOTSUPP;

	/* Serializes concurrent senders on the same socket. */
	mutex_lock(&xs->mutex);

	while (xskq_peek_desc(xs->tx, &desc)) {
		char *buffer;
		u32 id, len;

		/* Bound the work done per sendmsg call. */
		if (max_batch-- == 0) {
			err = -EAGAIN;
			goto out;
		}

		/* Reserve a completion-ring slot up front so the skb
		 * destructor can always report the frame id back.
		 * NOTE(review): error exits below this point do not appear
		 * to release the reserved slot — confirm against the
		 * completion-ring producer logic.
		 */
		if (xskq_reserve_id(xs->umem->cq)) {
			err = -EAGAIN;
			goto out;
		}

		len = desc.len;
		if (unlikely(len > xs->dev->mtu)) {
			err = -EMSGSIZE;
			goto out;
		}

		skb = sock_alloc_send_skb(sk, len, !need_wait, &err);
		if (unlikely(!skb)) {
			err = -EAGAIN;
			goto out;
		}

		skb_put(skb, len);
		id = desc.idx;
		/* Copy the frame payload out of the umem into the skb. */
		buffer = xdp_umem_get_data(xs->umem, id) + desc.offset;
		err = skb_store_bits(skb, 0, buffer, len);
		if (unlikely(err)) {
			kfree_skb(skb);
			goto out;
		}

		skb->dev = xs->dev;
		skb->priority = sk->sk_priority;
		skb->mark = sk->sk_mark;
		/* Stash the frame id for xsk_destruct_skb() to complete. */
		skb_shinfo(skb)->destructor_arg = (void *)(long)id;
		skb->destructor = xsk_destruct_skb;

		err = dev_direct_xmit(skb, xs->queue_id);
		/* Ignore NET_XMIT_CN as packet might have been sent */
		if (err == NET_XMIT_DROP || err == NETDEV_TX_BUSY) {
			err = -EAGAIN;
			/* SKB consumed by dev_direct_xmit() */
			goto out;
		}

		sent_frame = true;
		/* Only consume the descriptor once the frame is handed off. */
		xskq_discard_desc(xs->tx);
	}

out:
	if (sent_frame)
		sk->sk_write_space(sk);

	mutex_unlock(&xs->mutex);
	return err;
}
194+
195+
/* sendmsg entry point for AF_XDP sockets: validates that the socket is
 * bound to a device that is up, then hands off to the generic Tx path.
 */
static int xsk_sendmsg(struct socket *sock, struct msghdr *m, size_t total_len)
{
	struct xdp_sock *xs = xdp_sk(sock->sk);

	/* Not bound to a device yet? */
	if (unlikely(!xs->dev))
		return -ENXIO;

	/* Device must be administratively up to transmit. */
	if (unlikely(!(xs->dev->flags & IFF_UP)))
		return -ENETDOWN;

	return xsk_generic_xmit(sock->sk, m, total_len);
}
207+
104208
static unsigned int xsk_poll(struct file *file, struct socket *sock,
105209
struct poll_table_struct *wait)
106210
{
@@ -110,6 +214,8 @@ static unsigned int xsk_poll(struct file *file, struct socket *sock,
110214

111215
if (xs->rx && !xskq_empty_desc(xs->rx))
112216
mask |= POLLIN | POLLRDNORM;
217+
if (xs->tx && !xskq_full_desc(xs->tx))
218+
mask |= POLLOUT | POLLWRNORM;
113219

114220
return mask;
115221
}
@@ -270,6 +376,7 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
270376
xs->queue_id = sxdp->sxdp_queue_id;
271377

272378
xskq_set_umem(xs->rx, &xs->umem->props);
379+
xskq_set_umem(xs->tx, &xs->umem->props);
273380

274381
out_unlock:
275382
if (err)
@@ -383,8 +490,6 @@ static int xsk_mmap(struct file *file, struct socket *sock,
383490
q = xs->umem->fq;
384491
else if (offset == XDP_UMEM_PGOFF_COMPLETION_RING)
385492
q = xs->umem->cq;
386-
else
387-
return -EINVAL;
388493
}
389494

390495
if (!q)
@@ -420,7 +525,7 @@ static const struct proto_ops xsk_proto_ops = {
420525
.shutdown = sock_no_shutdown,
421526
.setsockopt = xsk_setsockopt,
422527
.getsockopt = sock_no_getsockopt,
423-
.sendmsg = sock_no_sendmsg,
528+
.sendmsg = xsk_sendmsg,
424529
.recvmsg = sock_no_recvmsg,
425530
.mmap = xsk_mmap,
426531
.sendpage = sock_no_sendpage,

net/xdp/xsk_queue.h

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,93 @@ static inline void xskq_discard_id(struct xsk_queue *q)
111111
(void)xskq_validate_id(q);
112112
}
113113

114-
/* Rx queue */
114+
/* Publish one frame id on an id ring (e.g. the completion ring).
 * Writes the id into the next producer slot, then makes it visible to the
 * consumer by advancing the shared producer pointer. Always returns 0;
 * the caller is expected to have reserved space via xskq_reserve_id().
 */
static inline int xskq_produce_id(struct xsk_queue *q, u32 id)
{
	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;

	ring->desc[q->prod_tail++ & q->ring_mask] = id;

	/* Order producer and data */
	smp_wmb();

	WRITE_ONCE(q->ring->producer, q->prod_tail);
	return 0;
}
126+
127+
static inline int xskq_reserve_id(struct xsk_queue *q)
128+
{
129+
if (xskq_nb_free(q, q->prod_head, 1) == 0)
130+
return -ENOSPC;
131+
132+
q->prod_head++;
133+
return 0;
134+
}
135+
136+
/* Rx/Tx queue */
137+
138+
static inline bool xskq_is_valid_desc(struct xsk_queue *q, struct xdp_desc *d)
139+
{
140+
u32 buff_len;
141+
142+
if (unlikely(d->idx >= q->umem_props.nframes)) {
143+
q->invalid_descs++;
144+
return false;
145+
}
146+
147+
buff_len = q->umem_props.frame_size;
148+
if (unlikely(d->len > buff_len || d->len == 0 ||
149+
d->offset > buff_len || d->offset + d->len > buff_len)) {
150+
q->invalid_descs++;
151+
return false;
152+
}
153+
154+
return true;
155+
}
156+
157+
/* Scan forward from the consumer tail for the next valid descriptor in the
 * already-fetched window [cons_tail, cons_head). Invalid descriptors are
 * skipped (cons_tail advances past them). On success, copies the descriptor
 * into *desc (if desc is non-NULL) and returns desc WITHOUT consuming it;
 * returns NULL when the window is exhausted.
 */
static inline struct xdp_desc *xskq_validate_desc(struct xsk_queue *q,
						  struct xdp_desc *desc)
{
	while (q->cons_tail != q->cons_head) {
		struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
		unsigned int idx = q->cons_tail & q->ring_mask;

		if (xskq_is_valid_desc(q, &ring->desc[idx])) {
			if (desc)
				*desc = ring->desc[idx];
			return desc;
		}

		/* Skip over the invalid entry and keep scanning. */
		q->cons_tail++;
	}

	return NULL;
}
175+
176+
/* Peek (without consuming) the next valid descriptor on an Rx/Tx ring.
 * When the locally cached window is empty, publishes the consumer position,
 * fetches up to RX_BATCH_SIZE newly available entries from the producer,
 * and validates the first one. Copies the descriptor into *desc and returns
 * desc, or NULL when nothing is available.
 */
static inline struct xdp_desc *xskq_peek_desc(struct xsk_queue *q,
					      struct xdp_desc *desc)
{
	struct xdp_rxtx_ring *ring;

	if (q->cons_tail == q->cons_head) {
		/* Tell the producer how far we have consumed. */
		WRITE_ONCE(q->ring->consumer, q->cons_tail);
		q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);

		/* Order consumer and data */
		smp_rmb();

		/* Freshly fetched entries must be validated first. */
		return xskq_validate_desc(q, desc);
	}

	/* Entries inside the cached window were already validated. */
	ring = (struct xdp_rxtx_ring *)q->ring;
	*desc = ring->desc[q->cons_tail & q->ring_mask];
	return desc;
}
195+
196+
/* Consume the descriptor last returned by xskq_peek_desc(), then advance
 * past any following invalid entries so the next peek lands on a valid
 * descriptor (the return value of the validation pass is not needed here).
 */
static inline void xskq_discard_desc(struct xsk_queue *q)
{
	q->cons_tail++;
	(void)xskq_validate_desc(q, NULL);
}
115201

116202
static inline int xskq_produce_batch_desc(struct xsk_queue *q,
117203
u32 id, u32 len, u16 offset)
@@ -139,6 +225,11 @@ static inline void xskq_produce_flush_desc(struct xsk_queue *q)
139225
WRITE_ONCE(q->ring->producer, q->prod_tail);
140226
}
141227

228+
static inline bool xskq_full_desc(struct xsk_queue *q)
229+
{
230+
return (xskq_nb_avail(q, q->nentries) == q->nentries);
231+
}
232+
142233
static inline bool xskq_empty_desc(struct xsk_queue *q)
143234
{
144235
return (xskq_nb_free(q, q->prod_tail, 1) == q->nentries);

0 commit comments

Comments
 (0)