1 use crate::fmt;
2 use crate::io::prelude::*;
3 use crate::io::{BorrowedBuf, ErrorKind, IoSlice, IoSliceMut};
4 use crate::mem::MaybeUninit;
5 use crate::net::test::{next_test_ip4, next_test_ip6};
6 use crate::net::*;
7 use crate::sync::mpsc::channel;
8 use crate::thread;
9 use crate::time::{Duration, Instant};
10
each_ip(f: &mut dyn FnMut(SocketAddr))11 fn each_ip(f: &mut dyn FnMut(SocketAddr)) {
12 f(next_test_ip4());
13 f(next_test_ip6());
14 }
15
16 macro_rules! t {
17 ($e:expr) => {
18 match $e {
19 Ok(t) => t,
20 Err(e) => panic!("received error for `{}`: {}", stringify!($e), e),
21 }
22 };
23 }
24
25 #[test]
bind_error()26 fn bind_error() {
27 match TcpListener::bind("1.1.1.1:9999") {
28 Ok(..) => panic!(),
29 Err(e) => assert_eq!(e.kind(), ErrorKind::AddrNotAvailable),
30 }
31 }
32
33 #[test]
connect_error()34 fn connect_error() {
35 match TcpStream::connect("0.0.0.0:1") {
36 Ok(..) => panic!(),
37 Err(e) => assert!(
38 e.kind() == ErrorKind::ConnectionRefused
39 || e.kind() == ErrorKind::InvalidInput
40 || e.kind() == ErrorKind::AddrInUse
41 || e.kind() == ErrorKind::AddrNotAvailable,
42 "bad error: {} {:?}",
43 e,
44 e.kind()
45 ),
46 }
47 }
48
49 #[test]
50 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
connect_timeout_error()51 fn connect_timeout_error() {
52 let socket_addr = next_test_ip4();
53 let result = TcpStream::connect_timeout(&socket_addr, Duration::MAX);
54 assert!(!matches!(result, Err(e) if e.kind() == ErrorKind::TimedOut));
55
56 let _listener = TcpListener::bind(&socket_addr).unwrap();
57 assert!(TcpStream::connect_timeout(&socket_addr, Duration::MAX).is_ok());
58 }
59
60 #[test]
listen_localhost()61 fn listen_localhost() {
62 let socket_addr = next_test_ip4();
63 let listener = t!(TcpListener::bind(&socket_addr));
64
65 let _t = thread::spawn(move || {
66 let mut stream = t!(TcpStream::connect(&("localhost", socket_addr.port())));
67 t!(stream.write(&[144]));
68 });
69
70 let mut stream = t!(listener.accept()).0;
71 let mut buf = [0];
72 t!(stream.read(&mut buf));
73 assert!(buf[0] == 144);
74 }
75
76 #[test]
connect_loopback()77 fn connect_loopback() {
78 each_ip(&mut |addr| {
79 let acceptor = t!(TcpListener::bind(&addr));
80
81 let _t = thread::spawn(move || {
82 let host = match addr {
83 SocketAddr::V4(..) => "127.0.0.1",
84 SocketAddr::V6(..) => "::1",
85 };
86 let mut stream = t!(TcpStream::connect(&(host, addr.port())));
87 t!(stream.write(&[66]));
88 });
89
90 let mut stream = t!(acceptor.accept()).0;
91 let mut buf = [0];
92 t!(stream.read(&mut buf));
93 assert!(buf[0] == 66);
94 })
95 }
96
97 #[test]
smoke_test()98 fn smoke_test() {
99 each_ip(&mut |addr| {
100 let acceptor = t!(TcpListener::bind(&addr));
101
102 let (tx, rx) = channel();
103 let _t = thread::spawn(move || {
104 let mut stream = t!(TcpStream::connect(&addr));
105 t!(stream.write(&[99]));
106 tx.send(t!(stream.local_addr())).unwrap();
107 });
108
109 let (mut stream, addr) = t!(acceptor.accept());
110 let mut buf = [0];
111 t!(stream.read(&mut buf));
112 assert!(buf[0] == 99);
113 assert_eq!(addr, t!(rx.recv()));
114 })
115 }
116
117 #[test]
read_eof()118 fn read_eof() {
119 each_ip(&mut |addr| {
120 let acceptor = t!(TcpListener::bind(&addr));
121
122 let _t = thread::spawn(move || {
123 let _stream = t!(TcpStream::connect(&addr));
124 // Close
125 });
126
127 let mut stream = t!(acceptor.accept()).0;
128 let mut buf = [0];
129 let nread = t!(stream.read(&mut buf));
130 assert_eq!(nread, 0);
131 let nread = t!(stream.read(&mut buf));
132 assert_eq!(nread, 0);
133 })
134 }
135
136 #[test]
write_close()137 fn write_close() {
138 each_ip(&mut |addr| {
139 let acceptor = t!(TcpListener::bind(&addr));
140
141 let (tx, rx) = channel();
142 let _t = thread::spawn(move || {
143 drop(t!(TcpStream::connect(&addr)));
144 tx.send(()).unwrap();
145 });
146
147 let mut stream = t!(acceptor.accept()).0;
148 rx.recv().unwrap();
149 let buf = [0];
150 match stream.write(&buf) {
151 Ok(..) => {}
152 Err(e) => {
153 assert!(
154 e.kind() == ErrorKind::ConnectionReset
155 || e.kind() == ErrorKind::BrokenPipe
156 || e.kind() == ErrorKind::ConnectionAborted,
157 "unknown error: {e}"
158 );
159 }
160 }
161 })
162 }
163
164 #[test]
multiple_connect_serial()165 fn multiple_connect_serial() {
166 each_ip(&mut |addr| {
167 let max = 10;
168 let acceptor = t!(TcpListener::bind(&addr));
169
170 let _t = thread::spawn(move || {
171 for _ in 0..max {
172 let mut stream = t!(TcpStream::connect(&addr));
173 t!(stream.write(&[99]));
174 }
175 });
176
177 for stream in acceptor.incoming().take(max) {
178 let mut stream = t!(stream);
179 let mut buf = [0];
180 t!(stream.read(&mut buf));
181 assert_eq!(buf[0], 99);
182 }
183 })
184 }
185
186 #[test]
multiple_connect_interleaved_greedy_schedule()187 fn multiple_connect_interleaved_greedy_schedule() {
188 const MAX: usize = 10;
189 each_ip(&mut |addr| {
190 let acceptor = t!(TcpListener::bind(&addr));
191
192 let _t = thread::spawn(move || {
193 let acceptor = acceptor;
194 for (i, stream) in acceptor.incoming().enumerate().take(MAX) {
195 // Start another thread to handle the connection
196 let _t = thread::spawn(move || {
197 let mut stream = t!(stream);
198 let mut buf = [0];
199 t!(stream.read(&mut buf));
200 assert!(buf[0] == i as u8);
201 });
202 }
203 });
204
205 connect(0, addr);
206 });
207
208 fn connect(i: usize, addr: SocketAddr) {
209 if i == MAX {
210 return;
211 }
212
213 let t = thread::spawn(move || {
214 let mut stream = t!(TcpStream::connect(&addr));
215 // Connect again before writing
216 connect(i + 1, addr);
217 t!(stream.write(&[i as u8]));
218 });
219 t.join().ok().expect("thread panicked");
220 }
221 }
222
223 #[test]
multiple_connect_interleaved_lazy_schedule()224 fn multiple_connect_interleaved_lazy_schedule() {
225 const MAX: usize = 10;
226 each_ip(&mut |addr| {
227 let acceptor = t!(TcpListener::bind(&addr));
228
229 let _t = thread::spawn(move || {
230 for stream in acceptor.incoming().take(MAX) {
231 // Start another thread to handle the connection
232 let _t = thread::spawn(move || {
233 let mut stream = t!(stream);
234 let mut buf = [0];
235 t!(stream.read(&mut buf));
236 assert!(buf[0] == 99);
237 });
238 }
239 });
240
241 connect(0, addr);
242 });
243
244 fn connect(i: usize, addr: SocketAddr) {
245 if i == MAX {
246 return;
247 }
248
249 let t = thread::spawn(move || {
250 let mut stream = t!(TcpStream::connect(&addr));
251 connect(i + 1, addr);
252 t!(stream.write(&[99]));
253 });
254 t.join().ok().expect("thread panicked");
255 }
256 }
257
258 #[test]
socket_and_peer_name()259 fn socket_and_peer_name() {
260 each_ip(&mut |addr| {
261 let listener = t!(TcpListener::bind(&addr));
262 let so_name = t!(listener.local_addr());
263 assert_eq!(addr, so_name);
264 let _t = thread::spawn(move || {
265 t!(listener.accept());
266 });
267
268 let stream = t!(TcpStream::connect(&addr));
269 assert_eq!(addr, t!(stream.peer_addr()));
270 })
271 }
272
273 #[test]
partial_read()274 fn partial_read() {
275 each_ip(&mut |addr| {
276 let (tx, rx) = channel();
277 let srv = t!(TcpListener::bind(&addr));
278 let _t = thread::spawn(move || {
279 let mut cl = t!(srv.accept()).0;
280 cl.write(&[10]).unwrap();
281 let mut b = [0];
282 t!(cl.read(&mut b));
283 tx.send(()).unwrap();
284 });
285
286 let mut c = t!(TcpStream::connect(&addr));
287 let mut b = [0; 10];
288 assert_eq!(c.read(&mut b).unwrap(), 1);
289 t!(c.write(&[1]));
290 rx.recv().unwrap();
291 })
292 }
293
294 #[test]
read_buf()295 fn read_buf() {
296 each_ip(&mut |addr| {
297 let srv = t!(TcpListener::bind(&addr));
298 let t = thread::spawn(move || {
299 let mut s = t!(TcpStream::connect(&addr));
300 s.write_all(&[1, 2, 3, 4]).unwrap();
301 });
302
303 let mut s = t!(srv.accept()).0;
304 let mut buf: [MaybeUninit<u8>; 128] = MaybeUninit::uninit_array();
305 let mut buf = BorrowedBuf::from(buf.as_mut_slice());
306 t!(s.read_buf(buf.unfilled()));
307 assert_eq!(buf.filled(), &[1, 2, 3, 4]);
308
309 // FIXME: sgx uses default_read_buf that initializes the buffer.
310 if cfg!(not(target_env = "sgx")) {
311 // TcpStream::read_buf should omit buffer initialization.
312 assert_eq!(buf.init_len(), 4);
313 }
314
315 t.join().ok().expect("thread panicked");
316 })
317 }
318
319 #[test]
read_vectored()320 fn read_vectored() {
321 each_ip(&mut |addr| {
322 let srv = t!(TcpListener::bind(&addr));
323 let mut s1 = t!(TcpStream::connect(&addr));
324 let mut s2 = t!(srv.accept()).0;
325
326 let len = s1.write(&[10, 11, 12]).unwrap();
327 assert_eq!(len, 3);
328
329 let mut a = [];
330 let mut b = [0];
331 let mut c = [0; 3];
332 let len = t!(s2.read_vectored(&mut [
333 IoSliceMut::new(&mut a),
334 IoSliceMut::new(&mut b),
335 IoSliceMut::new(&mut c)
336 ],));
337 assert!(len > 0);
338 assert_eq!(b, [10]);
339 // some implementations don't support readv, so we may only fill the first buffer
340 assert!(len == 1 || c == [11, 12, 0]);
341 })
342 }
343
344 #[test]
write_vectored()345 fn write_vectored() {
346 each_ip(&mut |addr| {
347 let srv = t!(TcpListener::bind(&addr));
348 let mut s1 = t!(TcpStream::connect(&addr));
349 let mut s2 = t!(srv.accept()).0;
350
351 let a = [];
352 let b = [10];
353 let c = [11, 12];
354 t!(s1.write_vectored(&[IoSlice::new(&a), IoSlice::new(&b), IoSlice::new(&c)]));
355
356 let mut buf = [0; 4];
357 let len = t!(s2.read(&mut buf));
358 // some implementations don't support writev, so we may only write the first buffer
359 if len == 1 {
360 assert_eq!(buf, [10, 0, 0, 0]);
361 } else {
362 assert_eq!(len, 3);
363 assert_eq!(buf, [10, 11, 12, 0]);
364 }
365 })
366 }
367
368 #[test]
double_bind()369 fn double_bind() {
370 each_ip(&mut |addr| {
371 let listener1 = t!(TcpListener::bind(&addr));
372 match TcpListener::bind(&addr) {
373 Ok(listener2) => panic!(
374 "This system (perhaps due to options set by TcpListener::bind) \
375 permits double binding: {:?} and {:?}",
376 listener1, listener2
377 ),
378 Err(e) => {
379 assert!(
380 e.kind() == ErrorKind::ConnectionRefused
381 || e.kind() == ErrorKind::Uncategorized
382 || e.kind() == ErrorKind::AddrInUse,
383 "unknown error: {} {:?}",
384 e,
385 e.kind()
386 );
387 }
388 }
389 })
390 }
391
392 #[test]
tcp_clone_smoke()393 fn tcp_clone_smoke() {
394 each_ip(&mut |addr| {
395 let acceptor = t!(TcpListener::bind(&addr));
396
397 let _t = thread::spawn(move || {
398 let mut s = t!(TcpStream::connect(&addr));
399 let mut buf = [0, 0];
400 assert_eq!(s.read(&mut buf).unwrap(), 1);
401 assert_eq!(buf[0], 1);
402 t!(s.write(&[2]));
403 });
404
405 let mut s1 = t!(acceptor.accept()).0;
406 let s2 = t!(s1.try_clone());
407
408 let (tx1, rx1) = channel();
409 let (tx2, rx2) = channel();
410 let _t = thread::spawn(move || {
411 let mut s2 = s2;
412 rx1.recv().unwrap();
413 t!(s2.write(&[1]));
414 tx2.send(()).unwrap();
415 });
416 tx1.send(()).unwrap();
417 let mut buf = [0, 0];
418 assert_eq!(s1.read(&mut buf).unwrap(), 1);
419 rx2.recv().unwrap();
420 })
421 }
422
423 #[test]
tcp_clone_two_read()424 fn tcp_clone_two_read() {
425 each_ip(&mut |addr| {
426 let acceptor = t!(TcpListener::bind(&addr));
427 let (tx1, rx) = channel();
428 let tx2 = tx1.clone();
429
430 let _t = thread::spawn(move || {
431 let mut s = t!(TcpStream::connect(&addr));
432 t!(s.write(&[1]));
433 rx.recv().unwrap();
434 t!(s.write(&[2]));
435 rx.recv().unwrap();
436 });
437
438 let mut s1 = t!(acceptor.accept()).0;
439 let s2 = t!(s1.try_clone());
440
441 let (done, rx) = channel();
442 let _t = thread::spawn(move || {
443 let mut s2 = s2;
444 let mut buf = [0, 0];
445 t!(s2.read(&mut buf));
446 tx2.send(()).unwrap();
447 done.send(()).unwrap();
448 });
449 let mut buf = [0, 0];
450 t!(s1.read(&mut buf));
451 tx1.send(()).unwrap();
452
453 rx.recv().unwrap();
454 })
455 }
456
457 #[test]
tcp_clone_two_write()458 fn tcp_clone_two_write() {
459 each_ip(&mut |addr| {
460 let acceptor = t!(TcpListener::bind(&addr));
461
462 let _t = thread::spawn(move || {
463 let mut s = t!(TcpStream::connect(&addr));
464 let mut buf = [0, 1];
465 t!(s.read(&mut buf));
466 t!(s.read(&mut buf));
467 });
468
469 let mut s1 = t!(acceptor.accept()).0;
470 let s2 = t!(s1.try_clone());
471
472 let (done, rx) = channel();
473 let _t = thread::spawn(move || {
474 let mut s2 = s2;
475 t!(s2.write(&[1]));
476 done.send(()).unwrap();
477 });
478 t!(s1.write(&[2]));
479
480 rx.recv().unwrap();
481 })
482 }
483
484 #[test]
485 // FIXME: https://github.com/fortanix/rust-sgx/issues/110
486 #[cfg_attr(target_env = "sgx", ignore)]
shutdown_smoke()487 fn shutdown_smoke() {
488 each_ip(&mut |addr| {
489 let a = t!(TcpListener::bind(&addr));
490 let _t = thread::spawn(move || {
491 let mut c = t!(a.accept()).0;
492 let mut b = [0];
493 assert_eq!(c.read(&mut b).unwrap(), 0);
494 t!(c.write(&[1]));
495 });
496
497 let mut s = t!(TcpStream::connect(&addr));
498 t!(s.shutdown(Shutdown::Write));
499 assert!(s.write(&[1]).is_err());
500 let mut b = [0, 0];
501 assert_eq!(t!(s.read(&mut b)), 1);
502 assert_eq!(b[0], 1);
503 })
504 }
505
506 #[test]
507 // FIXME: https://github.com/fortanix/rust-sgx/issues/110
508 #[cfg_attr(target_env = "sgx", ignore)]
close_readwrite_smoke()509 fn close_readwrite_smoke() {
510 each_ip(&mut |addr| {
511 let a = t!(TcpListener::bind(&addr));
512 let (tx, rx) = channel::<()>();
513 let _t = thread::spawn(move || {
514 let _s = t!(a.accept());
515 let _ = rx.recv();
516 });
517
518 let mut b = [0];
519 let mut s = t!(TcpStream::connect(&addr));
520 let mut s2 = t!(s.try_clone());
521
522 // closing should prevent reads/writes
523 t!(s.shutdown(Shutdown::Write));
524 assert!(s.write(&[0]).is_err());
525 t!(s.shutdown(Shutdown::Read));
526 assert_eq!(s.read(&mut b).unwrap(), 0);
527
528 // closing should affect previous handles
529 assert!(s2.write(&[0]).is_err());
530 assert_eq!(s2.read(&mut b).unwrap(), 0);
531
532 // closing should affect new handles
533 let mut s3 = t!(s.try_clone());
534 assert!(s3.write(&[0]).is_err());
535 assert_eq!(s3.read(&mut b).unwrap(), 0);
536
537 // make sure these don't die
538 let _ = s2.shutdown(Shutdown::Read);
539 let _ = s2.shutdown(Shutdown::Write);
540 let _ = s3.shutdown(Shutdown::Read);
541 let _ = s3.shutdown(Shutdown::Write);
542 drop(tx);
543 })
544 }
545
546 #[test]
547 #[cfg_attr(target_env = "sgx", ignore)]
close_read_wakes_up()548 fn close_read_wakes_up() {
549 each_ip(&mut |addr| {
550 let a = t!(TcpListener::bind(&addr));
551 let (tx1, rx) = channel::<()>();
552 let _t = thread::spawn(move || {
553 let _s = t!(a.accept());
554 let _ = rx.recv();
555 });
556
557 let s = t!(TcpStream::connect(&addr));
558 let s2 = t!(s.try_clone());
559 let (tx, rx) = channel();
560 let _t = thread::spawn(move || {
561 let mut s2 = s2;
562 assert_eq!(t!(s2.read(&mut [0])), 0);
563 tx.send(()).unwrap();
564 });
565 // this should wake up the child thread
566 t!(s.shutdown(Shutdown::Read));
567
568 // this test will never finish if the child doesn't wake up
569 rx.recv().unwrap();
570 drop(tx1);
571 })
572 }
573
574 #[test]
clone_while_reading()575 fn clone_while_reading() {
576 each_ip(&mut |addr| {
577 let accept = t!(TcpListener::bind(&addr));
578
579 // Enqueue a thread to write to a socket
580 let (tx, rx) = channel();
581 let (txdone, rxdone) = channel();
582 let txdone2 = txdone.clone();
583 let _t = thread::spawn(move || {
584 let mut tcp = t!(TcpStream::connect(&addr));
585 rx.recv().unwrap();
586 t!(tcp.write(&[0]));
587 txdone2.send(()).unwrap();
588 });
589
590 // Spawn off a reading clone
591 let tcp = t!(accept.accept()).0;
592 let tcp2 = t!(tcp.try_clone());
593 let txdone3 = txdone.clone();
594 let _t = thread::spawn(move || {
595 let mut tcp2 = tcp2;
596 t!(tcp2.read(&mut [0]));
597 txdone3.send(()).unwrap();
598 });
599
600 // Try to ensure that the reading clone is indeed reading
601 for _ in 0..50 {
602 thread::yield_now();
603 }
604
605 // clone the handle again while it's reading, then let it finish the
606 // read.
607 let _ = t!(tcp.try_clone());
608 tx.send(()).unwrap();
609 rxdone.recv().unwrap();
610 rxdone.recv().unwrap();
611 })
612 }
613
614 #[test]
clone_accept_smoke()615 fn clone_accept_smoke() {
616 each_ip(&mut |addr| {
617 let a = t!(TcpListener::bind(&addr));
618 let a2 = t!(a.try_clone());
619
620 let _t = thread::spawn(move || {
621 let _ = TcpStream::connect(&addr);
622 });
623 let _t = thread::spawn(move || {
624 let _ = TcpStream::connect(&addr);
625 });
626
627 t!(a.accept());
628 t!(a2.accept());
629 })
630 }
631
632 #[test]
clone_accept_concurrent()633 fn clone_accept_concurrent() {
634 each_ip(&mut |addr| {
635 let a = t!(TcpListener::bind(&addr));
636 let a2 = t!(a.try_clone());
637
638 let (tx, rx) = channel();
639 let tx2 = tx.clone();
640
641 let _t = thread::spawn(move || {
642 tx.send(t!(a.accept())).unwrap();
643 });
644 let _t = thread::spawn(move || {
645 tx2.send(t!(a2.accept())).unwrap();
646 });
647
648 let _t = thread::spawn(move || {
649 let _ = TcpStream::connect(&addr);
650 });
651 let _t = thread::spawn(move || {
652 let _ = TcpStream::connect(&addr);
653 });
654
655 rx.recv().unwrap();
656 rx.recv().unwrap();
657 })
658 }
659
660 #[test]
debug()661 fn debug() {
662 #[cfg(not(target_env = "sgx"))]
663 fn render_socket_addr<'a>(addr: &'a SocketAddr) -> impl fmt::Debug + 'a {
664 addr
665 }
666 #[cfg(target_env = "sgx")]
667 fn render_socket_addr<'a>(addr: &'a SocketAddr) -> impl fmt::Debug + 'a {
668 addr.to_string()
669 }
670
671 #[cfg(target_env = "sgx")]
672 use crate::os::fortanix_sgx::io::AsRawFd;
673 #[cfg(unix)]
674 use crate::os::unix::io::AsRawFd;
675 #[cfg(not(windows))]
676 fn render_inner(addr: &dyn AsRawFd) -> impl fmt::Debug {
677 addr.as_raw_fd()
678 }
679 #[cfg(windows)]
680 fn render_inner(addr: &dyn crate::os::windows::io::AsRawSocket) -> impl fmt::Debug {
681 addr.as_raw_socket()
682 }
683
684 let inner_name = if cfg!(windows) { "socket" } else { "fd" };
685 let socket_addr = next_test_ip4();
686
687 let listener = t!(TcpListener::bind(&socket_addr));
688 let compare = format!(
689 "TcpListener {{ addr: {:?}, {}: {:?} }}",
690 render_socket_addr(&socket_addr),
691 inner_name,
692 render_inner(&listener)
693 );
694 assert_eq!(format!("{listener:?}"), compare);
695
696 let stream = t!(TcpStream::connect(&("localhost", socket_addr.port())));
697 let compare = format!(
698 "TcpStream {{ addr: {:?}, peer: {:?}, {}: {:?} }}",
699 render_socket_addr(&stream.local_addr().unwrap()),
700 render_socket_addr(&stream.peer_addr().unwrap()),
701 inner_name,
702 render_inner(&stream)
703 );
704 assert_eq!(format!("{stream:?}"), compare);
705 }
706
707 // FIXME: re-enabled openbsd tests once their socket timeout code
708 // no longer has rounding errors.
709 // VxWorks ignores SO_SNDTIMEO.
710 #[cfg_attr(
711 any(target_os = "netbsd", target_os = "openbsd", target_os = "vxworks", target_os = "nto"),
712 ignore
713 )]
714 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
715 #[test]
timeouts()716 fn timeouts() {
717 let addr = next_test_ip4();
718 let listener = t!(TcpListener::bind(&addr));
719
720 let stream = t!(TcpStream::connect(&("localhost", addr.port())));
721 let dur = Duration::new(15410, 0);
722
723 assert_eq!(None, t!(stream.read_timeout()));
724
725 t!(stream.set_read_timeout(Some(dur)));
726 assert_eq!(Some(dur), t!(stream.read_timeout()));
727
728 assert_eq!(None, t!(stream.write_timeout()));
729
730 t!(stream.set_write_timeout(Some(dur)));
731 assert_eq!(Some(dur), t!(stream.write_timeout()));
732
733 t!(stream.set_read_timeout(None));
734 assert_eq!(None, t!(stream.read_timeout()));
735
736 t!(stream.set_write_timeout(None));
737 assert_eq!(None, t!(stream.write_timeout()));
738 drop(listener);
739 }
740
741 #[test]
742 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
test_read_timeout()743 fn test_read_timeout() {
744 let addr = next_test_ip4();
745 let listener = t!(TcpListener::bind(&addr));
746
747 let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
748 t!(stream.set_read_timeout(Some(Duration::from_millis(1000))));
749
750 let mut buf = [0; 10];
751 let start = Instant::now();
752 let kind = stream.read_exact(&mut buf).err().expect("expected error").kind();
753 assert!(
754 kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut,
755 "unexpected_error: {:?}",
756 kind
757 );
758 assert!(start.elapsed() > Duration::from_millis(400));
759 drop(listener);
760 }
761
762 #[test]
763 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
test_read_with_timeout()764 fn test_read_with_timeout() {
765 let addr = next_test_ip4();
766 let listener = t!(TcpListener::bind(&addr));
767
768 let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
769 t!(stream.set_read_timeout(Some(Duration::from_millis(1000))));
770
771 let mut other_end = t!(listener.accept()).0;
772 t!(other_end.write_all(b"hello world"));
773
774 let mut buf = [0; 11];
775 t!(stream.read(&mut buf));
776 assert_eq!(b"hello world", &buf[..]);
777
778 let start = Instant::now();
779 let kind = stream.read_exact(&mut buf).err().expect("expected error").kind();
780 assert!(
781 kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut,
782 "unexpected_error: {:?}",
783 kind
784 );
785 assert!(start.elapsed() > Duration::from_millis(400));
786 drop(listener);
787 }
788
789 // Ensure the `set_read_timeout` and `set_write_timeout` calls return errors
790 // when passed zero Durations
791 #[test]
test_timeout_zero_duration()792 fn test_timeout_zero_duration() {
793 let addr = next_test_ip4();
794
795 let listener = t!(TcpListener::bind(&addr));
796 let stream = t!(TcpStream::connect(&addr));
797
798 let result = stream.set_write_timeout(Some(Duration::new(0, 0)));
799 let err = result.unwrap_err();
800 assert_eq!(err.kind(), ErrorKind::InvalidInput);
801
802 let result = stream.set_read_timeout(Some(Duration::new(0, 0)));
803 let err = result.unwrap_err();
804 assert_eq!(err.kind(), ErrorKind::InvalidInput);
805
806 drop(listener);
807 }
808
809 #[test]
810 #[cfg_attr(target_env = "sgx", ignore)]
linger()811 fn linger() {
812 let addr = next_test_ip4();
813 let _listener = t!(TcpListener::bind(&addr));
814
815 let stream = t!(TcpStream::connect(&("localhost", addr.port())));
816
817 assert_eq!(None, t!(stream.linger()));
818 t!(stream.set_linger(Some(Duration::from_secs(1))));
819 assert_eq!(Some(Duration::from_secs(1)), t!(stream.linger()));
820 t!(stream.set_linger(None));
821 assert_eq!(None, t!(stream.linger()));
822 }
823
824 #[test]
825 #[cfg_attr(target_env = "sgx", ignore)]
nodelay()826 fn nodelay() {
827 let addr = next_test_ip4();
828 let _listener = t!(TcpListener::bind(&addr));
829
830 let stream = t!(TcpStream::connect(&("localhost", addr.port())));
831
832 assert_eq!(false, t!(stream.nodelay()));
833 t!(stream.set_nodelay(true));
834 assert_eq!(true, t!(stream.nodelay()));
835 t!(stream.set_nodelay(false));
836 assert_eq!(false, t!(stream.nodelay()));
837 }
838
839 #[test]
840 #[cfg_attr(target_env = "sgx", ignore)]
ttl()841 fn ttl() {
842 let ttl = 100;
843
844 let addr = next_test_ip4();
845 let listener = t!(TcpListener::bind(&addr));
846
847 t!(listener.set_ttl(ttl));
848 assert_eq!(ttl, t!(listener.ttl()));
849
850 let stream = t!(TcpStream::connect(&("localhost", addr.port())));
851
852 t!(stream.set_ttl(ttl));
853 assert_eq!(ttl, t!(stream.ttl()));
854 }
855
856 #[test]
857 #[cfg_attr(target_env = "sgx", ignore)]
set_nonblocking()858 fn set_nonblocking() {
859 let addr = next_test_ip4();
860 let listener = t!(TcpListener::bind(&addr));
861
862 t!(listener.set_nonblocking(true));
863 t!(listener.set_nonblocking(false));
864
865 let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
866
867 t!(stream.set_nonblocking(false));
868 t!(stream.set_nonblocking(true));
869
870 let mut buf = [0];
871 match stream.read(&mut buf) {
872 Ok(_) => panic!("expected error"),
873 Err(ref e) if e.kind() == ErrorKind::WouldBlock => {}
874 Err(e) => panic!("unexpected error {e}"),
875 }
876 }
877
878 #[test]
879 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
peek()880 fn peek() {
881 each_ip(&mut |addr| {
882 let (txdone, rxdone) = channel();
883
884 let srv = t!(TcpListener::bind(&addr));
885 let _t = thread::spawn(move || {
886 let mut cl = t!(srv.accept()).0;
887 cl.write(&[1, 3, 3, 7]).unwrap();
888 t!(rxdone.recv());
889 });
890
891 let mut c = t!(TcpStream::connect(&addr));
892 let mut b = [0; 10];
893 for _ in 1..3 {
894 let len = c.peek(&mut b).unwrap();
895 assert_eq!(len, 4);
896 }
897 let len = c.read(&mut b).unwrap();
898 assert_eq!(len, 4);
899
900 t!(c.set_nonblocking(true));
901 match c.peek(&mut b) {
902 Ok(_) => panic!("expected error"),
903 Err(ref e) if e.kind() == ErrorKind::WouldBlock => {}
904 Err(e) => panic!("unexpected error {e}"),
905 }
906 t!(txdone.send(()));
907 })
908 }
909
910 #[test]
911 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
connect_timeout_valid()912 fn connect_timeout_valid() {
913 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
914 let addr = listener.local_addr().unwrap();
915 TcpStream::connect_timeout(&addr, Duration::from_secs(2)).unwrap();
916 }
917