// SPDX-License-Identifier: GPL-2.0
#define _GNU_SOURCE
#define __EXPORTED_HEADERS__

#include <linux/uio.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdbool.h>
#include <string.h>
#include <errno.h>
#define __iovec_defined
#include <fcntl.h>
#include <malloc.h>
#include <error.h>

#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/mman.h>
#include <sys/ioctl.h>
#include <sys/syscall.h>

#include <linux/memfd.h>
#include <linux/dma-buf.h>
#include <linux/udmabuf.h>
#include <libmnl/libmnl.h>
#include <linux/types.h>
#include <linux/netlink.h>
#include <linux/genetlink.h>
#include <linux/netdev.h>
#include <time.h>
#include <net/if.h>

#include "netdev-user.h"
#include <ynl.h>

#define PAGE_SHIFT 12
#define TEST_PREFIX "ncdevmem"
#define NUM_PAGES 16000

#ifndef MSG_SOCK_DEVMEM
#define MSG_SOCK_DEVMEM 0x2000000
#endif
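
/*
 * MSG_SOCK_DEVMEM is the recvmsg() flag that opts a socket into devmem
 * (dmabuf-backed) receives; the fallback definition above covers builds
 * against uapi headers that do not provide it yet.
 */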

/*
 * tcpdevmem netcat. Works similarly to netcat but does device memory TCP
 * instead of regular TCP. Uses udmabuf to mock a dmabuf provider.
 *
 * Usage:
 *
 *	On server:
 *	ncdevmem -s <server IP> -c <client IP> -f eth1 -l -p 5201 -v 7
 *
 *	On client:
 *	yes $(echo -e \\x01\\x02\\x03\\x04\\x05\\x06) | \
 *		tr \\n \\0 | \
 *		head -c 5G | \
 *		nc <server IP> 5201 -p 5201
 *
 * Note this is compatible with regular netcat. i.e. the sender or receiver can
 * be replaced with regular netcat to test the RX or TX path in isolation.
 */

static char *server_ip = "192.168.1.4";
static char *client_ip;
static char *port = "5201";
static size_t do_validation;
static int start_queue = 8;
static int num_queues = 8;
static char *ifname = "eth1";
static unsigned int ifindex;
static unsigned int dmabuf_id;

struct memory_buffer {
	int fd;
	size_t size;

	int devfd;
	int memfd;
	char *buf_mem;
};

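/*
 * A memory_provider abstracts how the dmabuf backing memory is allocated,
 * freed and copied back to host memory for inspection. Only the
 * udmabuf-backed provider below is implemented here; it stands in for a
 * real device memory exporter.
 */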
struct memory_provider {
	struct memory_buffer *(*alloc)(size_t size);
	void (*free)(struct memory_buffer *ctx);
	void (*memcpy_from_device)(void *dst, struct memory_buffer *src,
				   size_t off, int n);
};

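/*
 * Allocate a udmabuf-backed buffer: create a memfd, seal it against
 * shrinking (required by udmabuf), size it, turn it into a dmabuf via the
 * UDMABUF_CREATE ioctl on /dev/udmabuf, and mmap the dmabuf so the test can
 * read the received payload back out.
 */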
static struct memory_buffer *udmabuf_alloc(size_t size)
{
	struct udmabuf_create create;
	struct memory_buffer *ctx;
	int ret;

	ctx = malloc(sizeof(*ctx));
	if (!ctx)
		error(1, ENOMEM, "malloc failed");

	ctx->size = size;

	ctx->devfd = open("/dev/udmabuf", O_RDWR);
	if (ctx->devfd < 0)
		error(1, errno,
		      "%s: [skip,no-udmabuf: Unable to access DMA buffer device file]\n",
		      TEST_PREFIX);

	ctx->memfd = memfd_create("udmabuf-test", MFD_ALLOW_SEALING);
	if (ctx->memfd < 0)
		error(1, errno, "%s: [skip,no-memfd]\n", TEST_PREFIX);

	ret = fcntl(ctx->memfd, F_ADD_SEALS, F_SEAL_SHRINK);
	if (ret < 0)
		error(1, errno, "%s: [skip,fcntl-add-seals]\n", TEST_PREFIX);

	ret = ftruncate(ctx->memfd, size);
	if (ret == -1)
		error(1, errno, "%s: [FAIL,memfd-truncate]\n", TEST_PREFIX);

	memset(&create, 0, sizeof(create));

	create.memfd = ctx->memfd;
	create.offset = 0;
	create.size = size;
	ctx->fd = ioctl(ctx->devfd, UDMABUF_CREATE, &create);
	if (ctx->fd < 0)
		error(1, errno, "%s: [FAIL, create udmabuf]\n", TEST_PREFIX);

	ctx->buf_mem = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED,
			    ctx->fd, 0);
	if (ctx->buf_mem == MAP_FAILED)
		error(1, errno, "%s: [FAIL, map udmabuf]\n", TEST_PREFIX);

	return ctx;
}

static void udmabuf_free(struct memory_buffer *ctx)
{
	munmap(ctx->buf_mem, ctx->size);
	close(ctx->fd);
	close(ctx->memfd);
	close(ctx->devfd);
	free(ctx);
}

static void udmabuf_memcpy_from_device(void *dst, struct memory_buffer *src,
				       size_t off, int n)
{
	struct dma_buf_sync sync = {};

	sync.flags = DMA_BUF_SYNC_START;
	ioctl(src->fd, DMA_BUF_IOCTL_SYNC, &sync);

	memcpy(dst, src->buf_mem + off, n);

	sync.flags = DMA_BUF_SYNC_END;
	ioctl(src->fd, DMA_BUF_IOCTL_SYNC, &sync);
}

static struct memory_provider udmabuf_memory_provider = {
	.alloc = udmabuf_alloc,
	.free = udmabuf_free,
	.memcpy_from_device = udmabuf_memcpy_from_device,
};

static struct memory_provider *provider = &udmabuf_memory_provider;

static void print_nonzero_bytes(void *ptr, size_t size)
{
	unsigned char *p = ptr;
	unsigned int i;

	for (i = 0; i < size; i++)
		putchar(p[i]);
}

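/*
 * Check the received bytes against the pattern the client is expected to
 * send: a repeating 1..(do_validation - 1) byte sequence followed by a 0
 * byte (see the usage example at the top of this file). The seed is static
 * so the expected pattern continues correctly across consecutive frags.
 */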
void validate_buffer(void *line, size_t size)
{
	static unsigned char seed = 1;
	unsigned char *ptr = line;
	int errors = 0;
	size_t i;

	for (i = 0; i < size; i++) {
		if (ptr[i] != seed) {
			fprintf(stderr,
				"Failed validation: expected=%u, actual=%u, index=%lu\n",
				seed, ptr[i], i);
			errors++;
			if (errors > 20)
				error(1, 0, "validation failed.");
		}
		seed++;
		if (seed == do_validation)
			seed = 0;
	}

	fprintf(stdout, "Validated buffer\n");
}

#define run_command(cmd, ...)                                           \
	({                                                              \
		char command[256];                                      \
		memset(command, 0, sizeof(command));                    \
		snprintf(command, sizeof(command), cmd, ##__VA_ARGS__); \
		fprintf(stderr, "Running: %s\n", command);              \
		system(command);                                        \
	})

static int reset_flow_steering(void)
{
	int ret = 0;

	ret = run_command("sudo ethtool -K %s ntuple off >&2", ifname);
	if (ret)
		return ret;

	return run_command("sudo ethtool -K %s ntuple on >&2", ifname);
}

static int configure_headersplit(bool on)
{
	return run_command("sudo ethtool -G %s tcp-data-split %s >&2", ifname,
			   on ? "on" : "off");
}

static int configure_rss(void)
{
	return run_command("sudo ethtool -X %s equal %d >&2", ifname, start_queue);
}

static int configure_channels(unsigned int rx, unsigned int tx)
{
	return run_command("sudo ethtool -L %s rx %u tx %u", ifname, rx, tx);
}

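/*
 * Install an ethtool ntuple rule that steers the test flow (matched on
 * destination IP/port, plus source IP/port when a client IP is given) to
 * start_queue, i.e. the first of the queues that will be bound to the
 * dmabuf.
 */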
static int configure_flow_steering(struct sockaddr_in6 *server_sin)
{
	const char *type = "tcp6";
	const char *server_addr;
	char buf[40];

	inet_ntop(AF_INET6, &server_sin->sin6_addr, buf, sizeof(buf));
	server_addr = buf;

	if (IN6_IS_ADDR_V4MAPPED(&server_sin->sin6_addr)) {
		type = "tcp4";
		server_addr = strrchr(server_addr, ':') + 1;
	}

	return run_command("sudo ethtool -N %s flow-type %s %s %s dst-ip %s %s %s dst-port %s queue %d >&2",
			   ifname,
			   type,
			   client_ip ? "src-ip" : "",
			   client_ip ?: "",
			   server_addr,
			   client_ip ? "src-port" : "",
			   client_ip ? port : "",
			   port, start_queue);
}

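/*
 * Bind the dmabuf to the given RX queues with the netdev netlink family's
 * bind-rx request (issued through YNL). On success the kernel returns a
 * dmabuf id, which is saved so received cmsgs can be checked against it.
 * The binding lasts as long as the returned ynl socket stays open.
 */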
static int bind_rx_queue(unsigned int ifindex, unsigned int dmabuf_fd,
			 struct netdev_queue_id *queues,
			 unsigned int n_queue_index, struct ynl_sock **ys)
{
	struct netdev_bind_rx_req *req = NULL;
	struct netdev_bind_rx_rsp *rsp = NULL;
	struct ynl_error yerr;

	*ys = ynl_sock_create(&ynl_netdev_family, &yerr);
	if (!*ys) {
		fprintf(stderr, "YNL: %s\n", yerr.msg);
		return -1;
	}

	req = netdev_bind_rx_req_alloc();
	netdev_bind_rx_req_set_ifindex(req, ifindex);
	netdev_bind_rx_req_set_fd(req, dmabuf_fd);
	__netdev_bind_rx_req_set_queues(req, queues, n_queue_index);

	rsp = netdev_bind_rx(*ys, req);
	if (!rsp) {
		perror("netdev_bind_rx");
		goto err_close;
	}

	if (!rsp->_present.id) {
		perror("id not present");
		goto err_close;
	}

	fprintf(stderr, "got dmabuf id=%d\n", rsp->id);
	dmabuf_id = rsp->id;

	netdev_bind_rx_req_free(req);
	netdev_bind_rx_rsp_free(rsp);

	return 0;

err_close:
	fprintf(stderr, "YNL failed: %s\n", (*ys)->err.msg);
	netdev_bind_rx_req_free(req);
	ynl_sock_destroy(*ys);
	return -1;
}

static void enable_reuseaddr(int fd)
{
	int opt = 1;
	int ret;

	ret = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt));
	if (ret)
		error(1, errno, "%s: [FAIL, SO_REUSEPORT]\n", TEST_PREFIX);

	ret = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
	if (ret)
		error(1, errno, "%s: [FAIL, SO_REUSEADDR]\n", TEST_PREFIX);
}

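/*
 * Parse an IPv4 or IPv6 address string into a sockaddr_in6. IPv4 addresses
 * are stored in v4-mapped form (::ffff:a.b.c.d) so a single AF_INET6 socket
 * can serve both families.
 */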
static int parse_address(const char *str, int port, struct sockaddr_in6 *sin6)
{
	int ret;

	sin6->sin6_family = AF_INET6;
	sin6->sin6_port = htons(port);

	ret = inet_pton(sin6->sin6_family, str, &sin6->sin6_addr);
	if (ret != 1) {
		/* fallback to plain IPv4 */
		ret = inet_pton(AF_INET, str, &sin6->sin6_addr.s6_addr32[3]);
		if (ret != 1)
			return -1;

		/* add ::ffff prefix */
		sin6->sin6_addr.s6_addr32[0] = 0;
		sin6->sin6_addr.s6_addr32[1] = 0;
		sin6->sin6_addr.s6_addr16[4] = 0;
		sin6->sin6_addr.s6_addr16[5] = 0xffff;
	}

	return 0;
}

static struct netdev_queue_id *create_queues(void)
{
	struct netdev_queue_id *queues;
	size_t i = 0;

	queues = calloc(num_queues, sizeof(*queues));
	for (i = 0; i < num_queues; i++) {
		queues[i]._present.type = 1;
		queues[i]._present.id = 1;
		queues[i].type = NETDEV_QUEUE_TYPE_RX;
		queues[i].id = start_queue + i;
	}

	return queues;
}

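/*
 * Server side of the test: steer the flow to the devmem queues, bind the
 * dmabuf to them, accept one TCP connection and receive into device memory,
 * validating (or dumping) every dmabuf frag and returning its token to the
 * kernel once the payload has been consumed.
 */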
int do_server(struct memory_buffer *mem)
{
	char ctrl_data[sizeof(int) * 20000];
	struct netdev_queue_id *queues;
	size_t non_page_aligned_frags = 0;
	struct sockaddr_in6 client_addr;
	struct sockaddr_in6 server_sin;
	size_t page_aligned_frags = 0;
	size_t total_received = 0;
	socklen_t client_addr_len;
	bool is_devmem = false;
	char *tmp_mem = NULL;
	struct ynl_sock *ys;
	char iobuf[819200];
	char buffer[256];
	int socket_fd;
	int client_fd;
	int ret;

	ret = parse_address(server_ip, atoi(port), &server_sin);
	if (ret < 0)
		error(1, 0, "parse server address");

	if (reset_flow_steering())
		error(1, 0, "Failed to reset flow steering\n");

	/* Configure RSS to divert all traffic away from our devmem queues */
	if (configure_rss())
		error(1, 0, "Failed to configure rss\n");

	/* Flow steer our devmem flows to start_queue */
	if (configure_flow_steering(&server_sin))
		error(1, 0, "Failed to configure flow steering\n");

	sleep(1);

	if (bind_rx_queue(ifindex, mem->fd, create_queues(), num_queues, &ys))
		error(1, 0, "Failed to bind\n");

	tmp_mem = malloc(mem->size);
	if (!tmp_mem)
		error(1, ENOMEM, "malloc failed");

	socket_fd = socket(AF_INET6, SOCK_STREAM, 0);
	if (socket_fd < 0)
		error(1, errno, "%s: [FAIL, create socket]\n", TEST_PREFIX);

	enable_reuseaddr(socket_fd);

	fprintf(stderr, "binding to address %s:%d\n", server_ip,
		ntohs(server_sin.sin6_port));

	ret = bind(socket_fd, &server_sin, sizeof(server_sin));
	if (ret)
		error(1, errno, "%s: [FAIL, bind]\n", TEST_PREFIX);

	ret = listen(socket_fd, 1);
	if (ret)
		error(1, errno, "%s: [FAIL, listen]\n", TEST_PREFIX);

	client_addr_len = sizeof(client_addr);

	inet_ntop(AF_INET6, &server_sin.sin6_addr, buffer,
		  sizeof(buffer));
	fprintf(stderr, "Waiting for connection on %s:%d\n", buffer,
		ntohs(server_sin.sin6_port));
	client_fd = accept(socket_fd, &client_addr, &client_addr_len);

	inet_ntop(AF_INET6, &client_addr.sin6_addr, buffer,
		  sizeof(buffer));
	fprintf(stderr, "Got connection from %s:%d\n", buffer,
		ntohs(client_addr.sin6_port));

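	/*
	 * Receive loop. recvmsg() is called with MSG_SOCK_DEVMEM, so payload
	 * that landed in the bound dmabuf is not copied into iobuf; instead
	 * each frag is described by a SCM_DEVMEM_DMABUF cmsg carrying a
	 * struct dmabuf_cmsg (offset, size, token, dmabuf id). Data that the
	 * kernel kept in the skb's linear buffer arrives as SCM_DEVMEM_LINEAR.
	 * Every dmabuf frag must be handed back with SO_DEVMEM_DONTNEED once
	 * its payload has been consumed.
	 */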
	while (1) {
		struct iovec iov = { .iov_base = iobuf,
				     .iov_len = sizeof(iobuf) };
		struct dmabuf_cmsg *dmabuf_cmsg = NULL;
		struct cmsghdr *cm = NULL;
		struct msghdr msg = { 0 };
		struct dmabuf_token token;
		ssize_t ret;

		is_devmem = false;

		msg.msg_iov = &iov;
		msg.msg_iovlen = 1;
		msg.msg_control = ctrl_data;
		msg.msg_controllen = sizeof(ctrl_data);
		ret = recvmsg(client_fd, &msg, MSG_SOCK_DEVMEM);
		fprintf(stderr, "recvmsg ret=%ld\n", ret);
		if (ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
			continue;
		if (ret < 0) {
			perror("recvmsg");
			continue;
		}
		if (ret == 0) {
			fprintf(stderr, "client exited\n");
			goto cleanup;
		}

		for (cm = CMSG_FIRSTHDR(&msg); cm; cm = CMSG_NXTHDR(&msg, cm)) {
			if (cm->cmsg_level != SOL_SOCKET ||
			    (cm->cmsg_type != SCM_DEVMEM_DMABUF &&
			     cm->cmsg_type != SCM_DEVMEM_LINEAR)) {
				fprintf(stderr, "skipping non-devmem cmsg\n");
				continue;
			}

			dmabuf_cmsg = (struct dmabuf_cmsg *)CMSG_DATA(cm);
			is_devmem = true;

			if (cm->cmsg_type == SCM_DEVMEM_LINEAR) {
				/* TODO: process data copied from skb's linear
				 * buffer.
				 */
				fprintf(stderr,
					"SCM_DEVMEM_LINEAR. dmabuf_cmsg->frag_size=%u\n",
					dmabuf_cmsg->frag_size);

				continue;
			}

			token.token_start = dmabuf_cmsg->frag_token;
			token.token_count = 1;

			total_received += dmabuf_cmsg->frag_size;
			fprintf(stderr,
				"received frag_page=%llu, in_page_offset=%llu, frag_offset=%llu, frag_size=%u, token=%u, total_received=%lu, dmabuf_id=%u\n",
				dmabuf_cmsg->frag_offset >> PAGE_SHIFT,
				dmabuf_cmsg->frag_offset % getpagesize(),
				dmabuf_cmsg->frag_offset,
				dmabuf_cmsg->frag_size, dmabuf_cmsg->frag_token,
				total_received, dmabuf_cmsg->dmabuf_id);

			if (dmabuf_cmsg->dmabuf_id != dmabuf_id)
				error(1, 0,
				      "received on wrong dmabuf_id: flow steering error\n");

			if (dmabuf_cmsg->frag_size % getpagesize())
				non_page_aligned_frags++;
			else
				page_aligned_frags++;

			provider->memcpy_from_device(tmp_mem, mem,
						     dmabuf_cmsg->frag_offset,
						     dmabuf_cmsg->frag_size);

			if (do_validation)
				validate_buffer(tmp_mem,
						dmabuf_cmsg->frag_size);
			else
				print_nonzero_bytes(tmp_mem,
						    dmabuf_cmsg->frag_size);

			ret = setsockopt(client_fd, SOL_SOCKET,
					 SO_DEVMEM_DONTNEED, &token,
					 sizeof(token));
			if (ret != 1)
				error(1, 0,
				      "SO_DEVMEM_DONTNEED not enough tokens");
		}
		if (!is_devmem)
			error(1, 0, "flow steering error\n");

		fprintf(stderr, "total_received=%lu\n", total_received);
	}

	fprintf(stderr, "%s: ok\n", TEST_PREFIX);

	fprintf(stderr, "page_aligned_frags=%lu, non_page_aligned_frags=%lu\n",
		page_aligned_frags, non_page_aligned_frags);

cleanup:

	free(tmp_mem);
	close(client_fd);
	close(socket_fd);
	ynl_sock_destroy(ys);

	return 0;
}

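/*
 * Sanity checks on the binding API itself: binding an empty queue set or
 * binding with header split disabled must fail, a correctly configured bind
 * must succeed, and a bound queue must not be deactivatable via ethtool -L.
 */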
void run_devmem_tests(void)
{
	struct memory_buffer *mem;
	struct ynl_sock *ys;

	mem = provider->alloc(getpagesize() * NUM_PAGES);

	/* Configure RSS to divert all traffic away from our devmem queues */
	if (configure_rss())
		error(1, 0, "rss error\n");

	if (configure_headersplit(1))
		error(1, 0, "Failed to configure header split\n");

	if (!bind_rx_queue(ifindex, mem->fd,
			   calloc(num_queues, sizeof(struct netdev_queue_id)),
			   num_queues, &ys))
		error(1, 0, "Binding empty queues array should have failed\n");

	if (configure_headersplit(0))
		error(1, 0, "Failed to configure header split\n");

	if (!bind_rx_queue(ifindex, mem->fd, create_queues(), num_queues, &ys))
		error(1, 0, "Configure dmabuf with header split off should have failed\n");

	if (configure_headersplit(1))
		error(1, 0, "Failed to configure header split\n");

	if (bind_rx_queue(ifindex, mem->fd, create_queues(), num_queues, &ys))
		error(1, 0, "Failed to bind\n");

	/* Deactivating a bound queue should not be legal */
	if (!configure_channels(num_queues, num_queues - 1))
		error(1, 0, "Deactivating a bound queue should be illegal.\n");

	/* Closing the netlink socket does an implicit unbind */
	ynl_sock_destroy(ys);

	provider->free(mem);
}

int main(int argc, char *argv[])
{
	struct memory_buffer *mem;
	int is_server = 0, opt;
	int ret;

	while ((opt = getopt(argc, argv, "ls:c:p:v:q:t:f:")) != -1) {
		switch (opt) {
		case 'l':
			is_server = 1;
			break;
		case 's':
			server_ip = optarg;
			break;
		case 'c':
			client_ip = optarg;
			break;
		case 'p':
			port = optarg;
			break;
		case 'v':
			do_validation = atoll(optarg);
			break;
		case 'q':
			num_queues = atoi(optarg);
			break;
		case 't':
			start_queue = atoi(optarg);
			break;
		case 'f':
			ifname = optarg;
			break;
		case '?':
			fprintf(stderr, "unknown option: %c\n", optopt);
			break;
		}
	}

	ifindex = if_nametoindex(ifname);

	for (; optind < argc; optind++)
		fprintf(stderr, "extra arguments: %s\n", argv[optind]);

	run_devmem_tests();

	mem = provider->alloc(getpagesize() * NUM_PAGES);
	ret = is_server ? do_server(mem) : 1;
	provider->free(mem);

	return ret;
}