• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* SPDX-License-Identifier: MIT */
2 /*
3  * Simple test case showing using send and recv bundles
4  */
5 #include <errno.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <string.h>
9 #include <unistd.h>
10 #include <arpa/inet.h>
11 #include <sys/types.h>
12 #include <sys/socket.h>
13 #include <pthread.h>
14 
15 #define MSG_SIZE 128
16 #define NR_MIN_MSGS	4
17 #define NR_MAX_MSGS	32
18 #define SEQ_SIZE	(MSG_SIZE / sizeof(unsigned long))
19 
20 static int nr_msgs;
21 static int use_tcp;
22 static int classic_buffers;
23 
24 #define RECV_BIDS	8192
25 #define RECV_BID_MASK	(RECV_BIDS - 1)
26 
27 #include "liburing.h"
28 #include "helpers.h"
29 
30 #define PORT	10202
31 #define HOST	"127.0.0.1"
32 
33 static int use_port = PORT;
34 
35 #define SEND_BGID	7
36 #define RECV_BGID	8
37 
38 static int no_send_mshot;
39 
40 struct recv_data {
41 	pthread_barrier_t connect;
42 	pthread_barrier_t startup;
43 	pthread_barrier_t barrier;
44 	pthread_barrier_t finish;
45 	unsigned long seq;
46 	int recv_bytes;
47 	int accept_fd;
48 	int abort;
49 	unsigned int max_sends;
50 	int to_eagain;
51 	void *recv_buf;
52 
53 	int send_bundle;
54 	int recv_bundle;
55 };
56 
arm_recv(struct io_uring * ring,struct recv_data * rd)57 static int arm_recv(struct io_uring *ring, struct recv_data *rd)
58 {
59 	struct io_uring_sqe *sqe;
60 	int ret;
61 
62 	sqe = io_uring_get_sqe(ring);
63 	io_uring_prep_recv_multishot(sqe, rd->accept_fd, NULL, 0, 0);
64 	if (rd->recv_bundle && use_tcp)
65 		sqe->ioprio |= IORING_RECVSEND_BUNDLE;
66 	sqe->buf_group = RECV_BGID;
67 	sqe->flags |= IOSQE_BUFFER_SELECT;
68 	sqe->user_data = 2;
69 
70 	ret = io_uring_submit(ring);
71 	if (ret != 1) {
72 		fprintf(stderr, "submit failed: %d\n", ret);
73 		return 1;
74 	}
75 
76 	return 0;
77 }
78 
recv_prep(struct io_uring * ring,struct recv_data * rd,int * sock)79 static int recv_prep(struct io_uring *ring, struct recv_data *rd, int *sock)
80 {
81 	struct sockaddr_in saddr;
82 	int sockfd, ret, val, use_fd;
83 	socklen_t socklen;
84 
85 	memset(&saddr, 0, sizeof(saddr));
86 	saddr.sin_family = AF_INET;
87 	saddr.sin_addr.s_addr = htonl(INADDR_ANY);
88 	saddr.sin_port = htons(use_port);
89 
90 	if (use_tcp)
91 		sockfd = socket(AF_INET, SOCK_STREAM, 0);
92 	else
93 		sockfd = socket(AF_INET, SOCK_DGRAM, 0);
94 	if (sockfd < 0) {
95 		perror("socket");
96 		return 1;
97 	}
98 
99 	val = 1;
100 	setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
101 
102 	ret = bind(sockfd, (struct sockaddr *)&saddr, sizeof(saddr));
103 	if (ret < 0) {
104 		perror("bind");
105 		goto err;
106 	}
107 
108 	if (use_tcp) {
109 		ret = listen(sockfd, 1);
110 		if (ret < 0) {
111 			perror("listen");
112 			goto err;
113 		}
114 
115 		pthread_barrier_wait(&rd->connect);
116 
117 		if (rd->abort)
118 			goto err;
119 
120 		socklen = sizeof(saddr);
121 		use_fd = accept(sockfd, (struct sockaddr *)&saddr, &socklen);
122 		if (use_fd < 0) {
123 			perror("accept");
124 			goto err;
125 		}
126 	} else {
127 		use_fd = sockfd;
128 		pthread_barrier_wait(&rd->connect);
129 	}
130 
131 	rd->accept_fd = use_fd;
132 	pthread_barrier_wait(&rd->startup);
133 	pthread_barrier_wait(&rd->barrier);
134 
135 	if (arm_recv(ring, rd))
136 		goto err;
137 
138 	*sock = sockfd;
139 	return 0;
140 err:
141 	close(sockfd);
142 	return 1;
143 }
144 
verify_seq(struct recv_data * rd,void * verify_ptr,int verify_sz,int start_bid)145 static int verify_seq(struct recv_data *rd, void *verify_ptr, int verify_sz,
146 		      int start_bid)
147 {
148 	unsigned long *seqp;
149 	int seq_size = verify_sz / sizeof(unsigned long);
150 	int i;
151 
152 	seqp = verify_ptr;
153 	for (i = 0; i < seq_size; i++) {
154 		if (rd->seq != *seqp) {
155 			fprintf(stderr, "bid=%d, got seq %lu, wanted %lu, offset %d\n", start_bid, *seqp, rd->seq, i);
156 			return 0;
157 		}
158 		seqp++;
159 		rd->seq++;
160 	}
161 
162 	return 1;
163 }
164 
recv_get_cqe(struct io_uring * ring,struct recv_data * rd,struct io_uring_cqe ** cqe)165 static int recv_get_cqe(struct io_uring *ring, struct recv_data *rd,
166 			struct io_uring_cqe **cqe)
167 {
168 	struct __kernel_timespec ts = { .tv_sec = 0, .tv_nsec = 100000000LL };
169 	int ret;
170 
171 	do {
172 		ret = io_uring_wait_cqe_timeout(ring, cqe, &ts);
173 		if (!ret)
174 			return 0;
175 		if (ret == -ETIME) {
176 			if (rd->abort)
177 				break;
178 			continue;
179 		}
180 		fprintf(stderr, "wait recv: %d\n", ret);
181 		break;
182 	} while (1);
183 
184 	return 1;
185 }
186 
do_recv(struct io_uring * ring,struct recv_data * rd)187 static int do_recv(struct io_uring *ring, struct recv_data *rd)
188 {
189 	struct io_uring_cqe *cqe;
190 	int bid, next_bid = 0;
191 	void *verify_ptr;
192 	int verify_sz = 0;
193 	int verify_bid = 0;
194 
195 	verify_ptr = malloc(rd->recv_bytes);
196 
197 	do {
198 		if (recv_get_cqe(ring, rd, &cqe))
199 			break;
200 		if (cqe->res == -EINVAL) {
201 			fprintf(stdout, "recv not supported, skipping\n");
202 			return 0;
203 		}
204 		if (cqe->res < 0) {
205 			fprintf(stderr, "failed recv cqe: %d\n", cqe->res);
206 			goto err;
207 		}
208 		if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
209 			fprintf(stderr, "no buffer set in recv\n");
210 			goto err;
211 		}
212 		bid = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
213 		if (bid != next_bid) {
214 			fprintf(stderr, "got bid %d, wanted %d\n", bid, next_bid);
215 			goto err;
216 		}
217 		if (!rd->recv_bundle && cqe->res > MSG_SIZE) {
218 			fprintf(stderr, "recv got wrong length: %d\n", cqe->res);
219 			goto err;
220 		}
221 		if (!(verify_sz % MSG_SIZE)) {
222 			if (!verify_seq(rd, verify_ptr, verify_sz, verify_bid))
223 				goto err;
224 			verify_bid += verify_sz / MSG_SIZE;
225 			verify_bid &= RECV_BID_MASK;
226 			verify_sz = 0;
227 		} else {
228 			memcpy(verify_ptr + verify_sz, rd->recv_buf + (bid * MSG_SIZE), cqe->res);
229 			verify_sz += cqe->res;
230 		}
231 		next_bid = bid + ((cqe->res + MSG_SIZE - 1) / MSG_SIZE);
232 		next_bid &= RECV_BID_MASK;
233 		rd->recv_bytes -= cqe->res;
234 		io_uring_cqe_seen(ring, cqe);
235 		if (!(cqe->flags & IORING_CQE_F_MORE) && rd->recv_bytes) {
236 			if (arm_recv(ring, rd))
237 				goto err;
238 		}
239 	} while (rd->recv_bytes);
240 
241 	if (verify_sz && !(verify_sz % MSG_SIZE) &&
242 	    !verify_seq(rd, verify_ptr, verify_sz, verify_bid))
243 		goto err;
244 
245 	pthread_barrier_wait(&rd->finish);
246 	return 0;
247 err:
248 	pthread_barrier_wait(&rd->finish);
249 	return 1;
250 }
251 
provide_classic_buffers(struct io_uring * ring,void * buf,int nbufs,int bgid)252 static int provide_classic_buffers(struct io_uring *ring, void *buf, int nbufs, int bgid)
253 {
254 	struct io_uring_sqe *sqe;
255 	struct io_uring_cqe *cqe;
256 	int ret;
257 
258 	sqe = io_uring_get_sqe(ring);
259 	io_uring_prep_provide_buffers(sqe, buf, MSG_SIZE, nbufs, bgid, 0);
260 	io_uring_submit(ring);
261 
262 	ret = io_uring_wait_cqe(ring, &cqe);
263 	if (ret) {
264 		fprintf(stderr, "provide buffer wait: %d\n", ret);
265 		return 1;
266 	}
267 	if (cqe->res) {
268 		fprintf(stderr, "provide buffers fail: %d\n", cqe->res);
269 		return 1;
270 	}
271 	io_uring_cqe_seen(ring, cqe);
272 	return 0;
273 }
274 
recv_fn(void * data)275 static void *recv_fn(void *data)
276 {
277 	struct recv_data *rd = data;
278 	struct io_uring_params p = { };
279 	struct io_uring ring;
280 	struct io_uring_buf_ring *br;
281 	void *buf = NULL, *ptr;
282 	int ret, sock, i;
283 
284 	p.cq_entries = 4096;
285 	p.flags = IORING_SETUP_CQSIZE;
286 	ret = t_create_ring_params(16, &ring, &p);
287 	if (ret == T_SETUP_SKIP) {
288 		ret = 0;
289 		goto err;
290 	} else if (ret < 0) {
291 		goto err;
292 	}
293 
294 	if (posix_memalign(&buf, 4096, MSG_SIZE * RECV_BIDS))
295 		goto err;
296 
297 	if (!classic_buffers) {
298 		br = io_uring_setup_buf_ring(&ring, RECV_BIDS, RECV_BGID, 0, &ret);
299 		if (!br) {
300 			if (ret != -EINVAL)
301 				fprintf(stderr, "failed setting up recv ring %d\n", ret);
302 			goto err;
303 		}
304 
305 		ptr = buf;
306 		for (i = 0; i < RECV_BIDS; i++) {
307 			io_uring_buf_ring_add(br, ptr, MSG_SIZE, i, RECV_BID_MASK, i);
308 			ptr += MSG_SIZE;
309 		}
310 		io_uring_buf_ring_advance(br, RECV_BIDS);
311 		rd->recv_buf = buf;
312 	} else {
313 		ret = provide_classic_buffers(&ring, buf, RECV_BIDS, RECV_BGID);
314 		if (ret) {
315 			fprintf(stderr, "failed providing classic buffers\n");
316 			goto err;
317 		}
318 	}
319 
320 	ret = recv_prep(&ring, rd, &sock);
321 	if (ret) {
322 		fprintf(stderr, "recv_prep failed: %d\n", ret);
323 		goto err;
324 	}
325 
326 	ret = do_recv(&ring, rd);
327 
328 	close(sock);
329 	close(rd->accept_fd);
330 	io_uring_queue_exit(&ring);
331 err:
332 	free(buf);
333 	return (void *)(intptr_t)ret;
334 }
335 
__do_send_bundle(struct recv_data * rd,struct io_uring * ring,int sockfd)336 static int __do_send_bundle(struct recv_data *rd, struct io_uring *ring, int sockfd)
337 {
338 	struct io_uring_cqe *cqe;
339 	struct io_uring_sqe *sqe;
340 	int bytes_needed = MSG_SIZE * nr_msgs;
341 	int i, ret;
342 
343 	sqe = io_uring_get_sqe(ring);
344 	io_uring_prep_send_bundle(sqe, sockfd, 0, 0);
345 	sqe->flags |= IOSQE_BUFFER_SELECT;
346 	sqe->buf_group = SEND_BGID;
347 	sqe->user_data = 1;
348 
349 	ret = io_uring_submit(ring);
350 	if (ret != 1)
351 		return 1;
352 
353 	pthread_barrier_wait(&rd->barrier);
354 
355 	for (i = 0; i < nr_msgs; i++) {
356 		ret = io_uring_wait_cqe(ring, &cqe);
357 		if (ret) {
358 			fprintf(stderr, "wait send: %d\n", ret);
359 			return 1;
360 		}
361 		if (!i && cqe->res == -EINVAL) {
362 			rd->abort = 1;
363 			no_send_mshot = 1;
364 			break;
365 		}
366 		if (cqe->res < 0) {
367 			fprintf(stderr, "bad send cqe res: %d\n", cqe->res);
368 			return 1;
369 		}
370 		bytes_needed -= cqe->res;
371 		if (!bytes_needed) {
372 			io_uring_cqe_seen(ring, cqe);
373 			break;
374 		}
375 		if (!(cqe->flags & IORING_CQE_F_MORE)) {
376 			fprintf(stderr, "expected more, but MORE not set\n");
377 			return 1;
378 		}
379 		io_uring_cqe_seen(ring, cqe);
380 	}
381 
382 	return 0;
383 }
384 
__do_send(struct recv_data * rd,struct io_uring * ring,int sockfd)385 static int __do_send(struct recv_data *rd, struct io_uring *ring, int sockfd)
386 {
387 	struct io_uring_cqe *cqe;
388 	struct io_uring_sqe *sqe;
389 	int bytes_needed = MSG_SIZE * nr_msgs;
390 	int i, ret;
391 
392 	for (i = 0; i < nr_msgs; i++) {
393 		sqe = io_uring_get_sqe(ring);
394 		io_uring_prep_send(sqe, sockfd, NULL, 0, 0);
395 		sqe->user_data = 10 + i;
396 		sqe->flags |= IOSQE_BUFFER_SELECT;
397 		sqe->buf_group = SEND_BGID;
398 
399 		ret = io_uring_submit(ring);
400 		if (ret != 1)
401 			return 1;
402 
403 		if (!i)
404 			pthread_barrier_wait(&rd->barrier);
405 		ret = io_uring_wait_cqe(ring, &cqe);
406 		if (ret) {
407 			fprintf(stderr, "send wait cqe %d\n", ret);
408 			return 1;
409 		}
410 
411 		if (!i && cqe->res == -EINVAL) {
412 			rd->abort = 1;
413 			no_send_mshot = 1;
414 			break;
415 		}
416 		if (cqe->res != MSG_SIZE) {
417 			fprintf(stderr, "send failed cqe: %d\n", cqe->res);
418 			return 1;
419 		}
420 		if (cqe->res < 0) {
421 			fprintf(stderr, "bad send cqe res: %d\n", cqe->res);
422 			return 1;
423 		}
424 		bytes_needed -= cqe->res;
425 		io_uring_cqe_seen(ring, cqe);
426 		if (!bytes_needed)
427 			break;
428 	}
429 
430 	return 0;
431 }
432 
do_send(struct recv_data * rd)433 static int do_send(struct recv_data *rd)
434 {
435 	struct sockaddr_in saddr;
436 	struct io_uring ring;
437 	unsigned long seq_buf[SEQ_SIZE], send_seq;
438 	struct io_uring_params p = { };
439 	struct io_uring_buf_ring *br;
440 	int sockfd, ret, len, i;
441 	socklen_t optlen;
442 	void *buf = NULL, *ptr;
443 
444 	ret = io_uring_queue_init_params(16, &ring, &p);
445 	if (ret) {
446 		fprintf(stderr, "queue init failed: %d\n", ret);
447 		return 1;
448 	}
449 	if (!(p.features & IORING_FEAT_RECVSEND_BUNDLE)) {
450 		rd->abort = 1;
451 		no_send_mshot = 1;
452 		pthread_barrier_wait(&rd->connect);
453 		return 0;
454 	}
455 
456 	if (posix_memalign(&buf, 4096, MSG_SIZE * nr_msgs))
457 		return 1;
458 
459 	if (!classic_buffers) {
460 		br = io_uring_setup_buf_ring(&ring, nr_msgs, SEND_BGID, 0, &ret);
461 		if (!br) {
462 			if (ret == -EINVAL) {
463 				fprintf(stderr, "einval on br setup\n");
464 				return 0;
465 			}
466 			fprintf(stderr, "failed setting up send ring %d\n", ret);
467 			return 1;
468 		}
469 
470 		ptr = buf;
471 		for (i = 0; i < nr_msgs; i++) {
472 			io_uring_buf_ring_add(br, ptr, MSG_SIZE, i, nr_msgs - 1, i);
473 			ptr += MSG_SIZE;
474 		}
475 		io_uring_buf_ring_advance(br, nr_msgs);
476 	} else {
477 		ret = provide_classic_buffers(&ring, buf, nr_msgs, SEND_BGID);
478 		if (ret) {
479 			fprintf(stderr, "failed providing classic buffers\n");
480 			return ret;
481 		}
482 	}
483 
484 	memset(&saddr, 0, sizeof(saddr));
485 	saddr.sin_family = AF_INET;
486 	saddr.sin_port = htons(use_port);
487 	inet_pton(AF_INET, HOST, &saddr.sin_addr);
488 
489 	if (use_tcp)
490 		sockfd = socket(AF_INET, SOCK_STREAM, 0);
491 	else
492 		sockfd = socket(AF_INET, SOCK_DGRAM, 0);
493 	if (sockfd < 0) {
494 		perror("socket");
495 		goto err2;
496 	}
497 
498 	pthread_barrier_wait(&rd->connect);
499 
500 	ret = connect(sockfd, (struct sockaddr *)&saddr, sizeof(saddr));
501 	if (ret < 0) {
502 		perror("connect");
503 		goto err;
504 	}
505 
506 	pthread_barrier_wait(&rd->startup);
507 
508 	optlen = sizeof(len);
509 	len = 1024 * MSG_SIZE;
510 	setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &len, optlen);
511 
512 	/* almost fill queue, leave room for one message */
513 	send_seq = 0;
514 	rd->to_eagain = 0;
515 	while (rd->max_sends && rd->max_sends--) {
516 		for (i = 0; i < SEQ_SIZE; i++)
517 			seq_buf[i] = send_seq++;
518 
519 		ret = send(sockfd, seq_buf, sizeof(seq_buf), MSG_DONTWAIT);
520 		if (ret < 0) {
521 			if (errno == EAGAIN) {
522 				send_seq -= SEQ_SIZE;
523 				break;
524 			}
525 			perror("send");
526 			return 1;
527 		} else if (ret != sizeof(seq_buf)) {
528 			fprintf(stderr, "short %d send\n", ret);
529 			return 1;
530 		}
531 
532 		rd->to_eagain++;
533 		rd->recv_bytes += sizeof(seq_buf);
534 	}
535 
536 	ptr = buf;
537 	for (i = 0; i < nr_msgs; i++) {
538 		unsigned long *pseq = ptr;
539 		int j;
540 
541 		for (j = 0; j < SEQ_SIZE; j++)
542 			pseq[j] = send_seq++;
543 		ptr += MSG_SIZE;
544 	}
545 
546 	/* prepare more messages, sending with bundle */
547 	rd->recv_bytes += (nr_msgs * MSG_SIZE);
548 	if (rd->send_bundle && use_tcp)
549 		ret = __do_send_bundle(rd, &ring, sockfd);
550 	else
551 		ret = __do_send(rd, &ring, sockfd);
552 	if (ret)
553 		goto err;
554 
555 	pthread_barrier_wait(&rd->finish);
556 
557 	close(sockfd);
558 	io_uring_queue_exit(&ring);
559 	free(buf);
560 	return 0;
561 
562 err:
563 	close(sockfd);
564 err2:
565 	io_uring_queue_exit(&ring);
566 	pthread_barrier_wait(&rd->finish);
567 	free(buf);
568 	return 1;
569 }
570 
test(int backlog,unsigned int max_sends,int * to_eagain,int send_bundle,int recv_bundle)571 static int test(int backlog, unsigned int max_sends, int *to_eagain,
572 		int send_bundle, int recv_bundle)
573 {
574 	pthread_t recv_thread;
575 	struct recv_data rd;
576 	int ret;
577 	void *retval;
578 
579 	/* backlog not reliable on UDP, skip it */
580 	if ((backlog || max_sends) && !use_tcp)
581 		return T_EXIT_PASS;
582 
583 	memset(&rd, 0, sizeof(rd));
584 	pthread_barrier_init(&rd.connect, NULL, 2);
585 	pthread_barrier_init(&rd.startup, NULL, 2);
586 	pthread_barrier_init(&rd.barrier, NULL, 2);
587 	pthread_barrier_init(&rd.finish, NULL, 2);
588 	rd.max_sends = max_sends;
589 	if (to_eagain)
590 		*to_eagain = 0;
591 
592 	rd.send_bundle = send_bundle;
593 	rd.recv_bundle = recv_bundle;
594 
595 	ret = pthread_create(&recv_thread, NULL, recv_fn, &rd);
596 	if (ret) {
597 		fprintf(stderr, "Thread create failed: %d\n", ret);
598 		return 1;
599 	}
600 
601 	ret = do_send(&rd);
602 	if (no_send_mshot) {
603 		fprintf(stderr, "no_send_mshot, aborting (ignore other errors)\n");
604 		rd.abort = 1;
605 		pthread_join(recv_thread, &retval);
606 		return 0;
607 	}
608 
609 	if (ret)
610 		return ret;
611 
612 	pthread_join(recv_thread, &retval);
613 	if (to_eagain)
614 		*to_eagain = rd.to_eagain;
615 	return (intptr_t)retval;
616 }
617 
run_tests(int is_udp)618 static int run_tests(int is_udp)
619 {
620 	int ret, eagain_hit;
621 
622 	nr_msgs = NR_MIN_MSGS;
623 
624 	/* test basic send bundle first */
625 	ret = test(0, 0, NULL, 0, 0);
626 	if (ret) {
627 		fprintf(stderr, "test a failed\n");
628 		return T_EXIT_FAIL;
629 	}
630 	if (no_send_mshot)
631 		return T_EXIT_SKIP;
632 
633 	/* test recv bundle */
634 	ret = test(0, 0, NULL, 0, 1);
635 	if (ret) {
636 		fprintf(stderr, "test b failed\n");
637 		return T_EXIT_FAIL;
638 	}
639 
640 	/* test bundling recv and send */
641 	ret = test(0, 0, NULL, 1, 1);
642 	if (ret) {
643 		fprintf(stderr, "test c failed\n");
644 		return T_EXIT_FAIL;
645 	}
646 
647 	/* test bundling with full socket */
648 	ret = test(1, 1000000, &eagain_hit, 1, 1);
649 	if (ret) {
650 		fprintf(stderr, "test d failed\n");
651 		return T_EXIT_FAIL;
652 	}
653 
654 	/* test bundling with almost full socket */
655 	ret = test(1, eagain_hit - (nr_msgs / 2), NULL, 1, 1);
656 	if (ret) {
657 		fprintf(stderr, "test e failed\n");
658 		return T_EXIT_FAIL;
659 	}
660 
661 	/* test recv bundle with almost full socket */
662 	ret = test(1, eagain_hit - (nr_msgs / 2), NULL, 0, 1);
663 	if (ret) {
664 		fprintf(stderr, "test f failed\n");
665 		return T_EXIT_FAIL;
666 	}
667 
668 	if (is_udp)
669 		return T_EXIT_PASS;
670 
671 	/* test send bundle with almost full socket */
672 	ret = test(1, eagain_hit - (nr_msgs / 2), &eagain_hit, 1, 0);
673 	if (ret) {
674 		fprintf(stderr, "test g failed\n");
675 		return T_EXIT_FAIL;
676 	}
677 
678 	/* now repeat the last three tests, but with > FAST_UIOV segments */
679 	nr_msgs = NR_MAX_MSGS;
680 
681 	/* test bundling with almost full socket */
682 	ret = test(1, eagain_hit - (nr_msgs / 2), NULL, 1, 1);
683 	if (ret) {
684 		fprintf(stderr, "test h failed\n");
685 		return T_EXIT_FAIL;
686 	}
687 
688 	/* test recv bundle with almost full socket */
689 	ret = test(1, eagain_hit - (nr_msgs / 2), NULL, 0, 1);
690 	if (ret) {
691 		fprintf(stderr, "test i failed\n");
692 		return T_EXIT_FAIL;
693 	}
694 
695 	/* test send bundle with almost full socket */
696 	ret = test(1, eagain_hit - (nr_msgs / 2), &eagain_hit, 1, 0);
697 	if (ret) {
698 		fprintf(stderr, "test j failed\n");
699 		return T_EXIT_FAIL;
700 	}
701 
702 	return T_EXIT_PASS;
703 }
704 
test_tcp(void)705 static int test_tcp(void)
706 {
707 	int ret;
708 
709 	use_tcp = 1;
710 	ret = run_tests(false);
711 	if (ret == T_EXIT_FAIL)
712 		fprintf(stderr, "TCP test case (classic=%d) failed\n", classic_buffers);
713 	return ret;
714 }
715 
test_udp(void)716 static int test_udp(void)
717 {
718 	int ret;
719 
720 	use_tcp = 0;
721 	use_port++;
722 	ret = run_tests(true);
723 	if (ret == T_EXIT_FAIL)
724 		fprintf(stderr, "UDP test case (classic=%d) failed\n", classic_buffers);
725 	return ret;
726 }
727 
main(int argc,char * argv[])728 int main(int argc, char *argv[])
729 {
730 	int ret;
731 
732 	if (argc > 1)
733 		return T_EXIT_SKIP;
734 
735 	ret = test_tcp();
736 	if (ret != T_EXIT_PASS)
737 		return ret;
738 
739 	ret = test_udp();
740 	if (ret != T_EXIT_PASS)
741 		return ret;
742 
743 	classic_buffers = 1;
744 
745 	ret = test_tcp();
746 	if (ret != T_EXIT_PASS)
747 		return ret;
748 
749 	ret = test_udp();
750 	if (ret != T_EXIT_PASS)
751 		return ret;
752 
753 	return T_EXIT_PASS;
754 }
755