• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::fmt;
2 use crate::io::prelude::*;
3 use crate::io::{BorrowedBuf, ErrorKind, IoSlice, IoSliceMut};
4 use crate::mem::MaybeUninit;
5 use crate::net::test::{next_test_ip4, next_test_ip6};
6 use crate::net::*;
7 use crate::sync::mpsc::channel;
8 use crate::thread;
9 use crate::time::{Duration, Instant};
10 
each_ip(f: &mut dyn FnMut(SocketAddr))11 fn each_ip(f: &mut dyn FnMut(SocketAddr)) {
12     f(next_test_ip4());
13     f(next_test_ip6());
14 }
15 
16 macro_rules! t {
17     ($e:expr) => {
18         match $e {
19             Ok(t) => t,
20             Err(e) => panic!("received error for `{}`: {}", stringify!($e), e),
21         }
22     };
23 }
24 
25 #[test]
bind_error()26 fn bind_error() {
27     match TcpListener::bind("1.1.1.1:9999") {
28         Ok(..) => panic!(),
29         Err(e) => assert_eq!(e.kind(), ErrorKind::AddrNotAvailable),
30     }
31 }
32 
33 #[test]
connect_error()34 fn connect_error() {
35     match TcpStream::connect("0.0.0.0:1") {
36         Ok(..) => panic!(),
37         Err(e) => assert!(
38             e.kind() == ErrorKind::ConnectionRefused
39                 || e.kind() == ErrorKind::InvalidInput
40                 || e.kind() == ErrorKind::AddrInUse
41                 || e.kind() == ErrorKind::AddrNotAvailable,
42             "bad error: {} {:?}",
43             e,
44             e.kind()
45         ),
46     }
47 }
48 
49 #[test]
50 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
connect_timeout_error()51 fn connect_timeout_error() {
52     let socket_addr = next_test_ip4();
53     let result = TcpStream::connect_timeout(&socket_addr, Duration::MAX);
54     assert!(!matches!(result, Err(e) if e.kind() == ErrorKind::TimedOut));
55 
56     let _listener = TcpListener::bind(&socket_addr).unwrap();
57     assert!(TcpStream::connect_timeout(&socket_addr, Duration::MAX).is_ok());
58 }
59 
60 #[test]
listen_localhost()61 fn listen_localhost() {
62     let socket_addr = next_test_ip4();
63     let listener = t!(TcpListener::bind(&socket_addr));
64 
65     let _t = thread::spawn(move || {
66         let mut stream = t!(TcpStream::connect(&("localhost", socket_addr.port())));
67         t!(stream.write(&[144]));
68     });
69 
70     let mut stream = t!(listener.accept()).0;
71     let mut buf = [0];
72     t!(stream.read(&mut buf));
73     assert!(buf[0] == 144);
74 }
75 
76 #[test]
connect_loopback()77 fn connect_loopback() {
78     each_ip(&mut |addr| {
79         let acceptor = t!(TcpListener::bind(&addr));
80 
81         let _t = thread::spawn(move || {
82             let host = match addr {
83                 SocketAddr::V4(..) => "127.0.0.1",
84                 SocketAddr::V6(..) => "::1",
85             };
86             let mut stream = t!(TcpStream::connect(&(host, addr.port())));
87             t!(stream.write(&[66]));
88         });
89 
90         let mut stream = t!(acceptor.accept()).0;
91         let mut buf = [0];
92         t!(stream.read(&mut buf));
93         assert!(buf[0] == 66);
94     })
95 }
96 
97 #[test]
smoke_test()98 fn smoke_test() {
99     each_ip(&mut |addr| {
100         let acceptor = t!(TcpListener::bind(&addr));
101 
102         let (tx, rx) = channel();
103         let _t = thread::spawn(move || {
104             let mut stream = t!(TcpStream::connect(&addr));
105             t!(stream.write(&[99]));
106             tx.send(t!(stream.local_addr())).unwrap();
107         });
108 
109         let (mut stream, addr) = t!(acceptor.accept());
110         let mut buf = [0];
111         t!(stream.read(&mut buf));
112         assert!(buf[0] == 99);
113         assert_eq!(addr, t!(rx.recv()));
114     })
115 }
116 
117 #[test]
read_eof()118 fn read_eof() {
119     each_ip(&mut |addr| {
120         let acceptor = t!(TcpListener::bind(&addr));
121 
122         let _t = thread::spawn(move || {
123             let _stream = t!(TcpStream::connect(&addr));
124             // Close
125         });
126 
127         let mut stream = t!(acceptor.accept()).0;
128         let mut buf = [0];
129         let nread = t!(stream.read(&mut buf));
130         assert_eq!(nread, 0);
131         let nread = t!(stream.read(&mut buf));
132         assert_eq!(nread, 0);
133     })
134 }
135 
136 #[test]
write_close()137 fn write_close() {
138     each_ip(&mut |addr| {
139         let acceptor = t!(TcpListener::bind(&addr));
140 
141         let (tx, rx) = channel();
142         let _t = thread::spawn(move || {
143             drop(t!(TcpStream::connect(&addr)));
144             tx.send(()).unwrap();
145         });
146 
147         let mut stream = t!(acceptor.accept()).0;
148         rx.recv().unwrap();
149         let buf = [0];
150         match stream.write(&buf) {
151             Ok(..) => {}
152             Err(e) => {
153                 assert!(
154                     e.kind() == ErrorKind::ConnectionReset
155                         || e.kind() == ErrorKind::BrokenPipe
156                         || e.kind() == ErrorKind::ConnectionAborted,
157                     "unknown error: {e}"
158                 );
159             }
160         }
161     })
162 }
163 
164 #[test]
multiple_connect_serial()165 fn multiple_connect_serial() {
166     each_ip(&mut |addr| {
167         let max = 10;
168         let acceptor = t!(TcpListener::bind(&addr));
169 
170         let _t = thread::spawn(move || {
171             for _ in 0..max {
172                 let mut stream = t!(TcpStream::connect(&addr));
173                 t!(stream.write(&[99]));
174             }
175         });
176 
177         for stream in acceptor.incoming().take(max) {
178             let mut stream = t!(stream);
179             let mut buf = [0];
180             t!(stream.read(&mut buf));
181             assert_eq!(buf[0], 99);
182         }
183     })
184 }
185 
186 #[test]
multiple_connect_interleaved_greedy_schedule()187 fn multiple_connect_interleaved_greedy_schedule() {
188     const MAX: usize = 10;
189     each_ip(&mut |addr| {
190         let acceptor = t!(TcpListener::bind(&addr));
191 
192         let _t = thread::spawn(move || {
193             let acceptor = acceptor;
194             for (i, stream) in acceptor.incoming().enumerate().take(MAX) {
195                 // Start another thread to handle the connection
196                 let _t = thread::spawn(move || {
197                     let mut stream = t!(stream);
198                     let mut buf = [0];
199                     t!(stream.read(&mut buf));
200                     assert!(buf[0] == i as u8);
201                 });
202             }
203         });
204 
205         connect(0, addr);
206     });
207 
208     fn connect(i: usize, addr: SocketAddr) {
209         if i == MAX {
210             return;
211         }
212 
213         let t = thread::spawn(move || {
214             let mut stream = t!(TcpStream::connect(&addr));
215             // Connect again before writing
216             connect(i + 1, addr);
217             t!(stream.write(&[i as u8]));
218         });
219         t.join().ok().expect("thread panicked");
220     }
221 }
222 
223 #[test]
multiple_connect_interleaved_lazy_schedule()224 fn multiple_connect_interleaved_lazy_schedule() {
225     const MAX: usize = 10;
226     each_ip(&mut |addr| {
227         let acceptor = t!(TcpListener::bind(&addr));
228 
229         let _t = thread::spawn(move || {
230             for stream in acceptor.incoming().take(MAX) {
231                 // Start another thread to handle the connection
232                 let _t = thread::spawn(move || {
233                     let mut stream = t!(stream);
234                     let mut buf = [0];
235                     t!(stream.read(&mut buf));
236                     assert!(buf[0] == 99);
237                 });
238             }
239         });
240 
241         connect(0, addr);
242     });
243 
244     fn connect(i: usize, addr: SocketAddr) {
245         if i == MAX {
246             return;
247         }
248 
249         let t = thread::spawn(move || {
250             let mut stream = t!(TcpStream::connect(&addr));
251             connect(i + 1, addr);
252             t!(stream.write(&[99]));
253         });
254         t.join().ok().expect("thread panicked");
255     }
256 }
257 
258 #[test]
socket_and_peer_name()259 fn socket_and_peer_name() {
260     each_ip(&mut |addr| {
261         let listener = t!(TcpListener::bind(&addr));
262         let so_name = t!(listener.local_addr());
263         assert_eq!(addr, so_name);
264         let _t = thread::spawn(move || {
265             t!(listener.accept());
266         });
267 
268         let stream = t!(TcpStream::connect(&addr));
269         assert_eq!(addr, t!(stream.peer_addr()));
270     })
271 }
272 
273 #[test]
partial_read()274 fn partial_read() {
275     each_ip(&mut |addr| {
276         let (tx, rx) = channel();
277         let srv = t!(TcpListener::bind(&addr));
278         let _t = thread::spawn(move || {
279             let mut cl = t!(srv.accept()).0;
280             cl.write(&[10]).unwrap();
281             let mut b = [0];
282             t!(cl.read(&mut b));
283             tx.send(()).unwrap();
284         });
285 
286         let mut c = t!(TcpStream::connect(&addr));
287         let mut b = [0; 10];
288         assert_eq!(c.read(&mut b).unwrap(), 1);
289         t!(c.write(&[1]));
290         rx.recv().unwrap();
291     })
292 }
293 
294 #[test]
read_buf()295 fn read_buf() {
296     each_ip(&mut |addr| {
297         let srv = t!(TcpListener::bind(&addr));
298         let t = thread::spawn(move || {
299             let mut s = t!(TcpStream::connect(&addr));
300             s.write_all(&[1, 2, 3, 4]).unwrap();
301         });
302 
303         let mut s = t!(srv.accept()).0;
304         let mut buf: [MaybeUninit<u8>; 128] = MaybeUninit::uninit_array();
305         let mut buf = BorrowedBuf::from(buf.as_mut_slice());
306         t!(s.read_buf(buf.unfilled()));
307         assert_eq!(buf.filled(), &[1, 2, 3, 4]);
308 
309         // FIXME: sgx uses default_read_buf that initializes the buffer.
310         if cfg!(not(target_env = "sgx")) {
311             // TcpStream::read_buf should omit buffer initialization.
312             assert_eq!(buf.init_len(), 4);
313         }
314 
315         t.join().ok().expect("thread panicked");
316     })
317 }
318 
319 #[test]
read_vectored()320 fn read_vectored() {
321     each_ip(&mut |addr| {
322         let srv = t!(TcpListener::bind(&addr));
323         let mut s1 = t!(TcpStream::connect(&addr));
324         let mut s2 = t!(srv.accept()).0;
325 
326         let len = s1.write(&[10, 11, 12]).unwrap();
327         assert_eq!(len, 3);
328 
329         let mut a = [];
330         let mut b = [0];
331         let mut c = [0; 3];
332         let len = t!(s2.read_vectored(&mut [
333             IoSliceMut::new(&mut a),
334             IoSliceMut::new(&mut b),
335             IoSliceMut::new(&mut c)
336         ],));
337         assert!(len > 0);
338         assert_eq!(b, [10]);
339         // some implementations don't support readv, so we may only fill the first buffer
340         assert!(len == 1 || c == [11, 12, 0]);
341     })
342 }
343 
344 #[test]
write_vectored()345 fn write_vectored() {
346     each_ip(&mut |addr| {
347         let srv = t!(TcpListener::bind(&addr));
348         let mut s1 = t!(TcpStream::connect(&addr));
349         let mut s2 = t!(srv.accept()).0;
350 
351         let a = [];
352         let b = [10];
353         let c = [11, 12];
354         t!(s1.write_vectored(&[IoSlice::new(&a), IoSlice::new(&b), IoSlice::new(&c)]));
355 
356         let mut buf = [0; 4];
357         let len = t!(s2.read(&mut buf));
358         // some implementations don't support writev, so we may only write the first buffer
359         if len == 1 {
360             assert_eq!(buf, [10, 0, 0, 0]);
361         } else {
362             assert_eq!(len, 3);
363             assert_eq!(buf, [10, 11, 12, 0]);
364         }
365     })
366 }
367 
368 #[test]
double_bind()369 fn double_bind() {
370     each_ip(&mut |addr| {
371         let listener1 = t!(TcpListener::bind(&addr));
372         match TcpListener::bind(&addr) {
373             Ok(listener2) => panic!(
374                 "This system (perhaps due to options set by TcpListener::bind) \
375                  permits double binding: {:?} and {:?}",
376                 listener1, listener2
377             ),
378             Err(e) => {
379                 assert!(
380                     e.kind() == ErrorKind::ConnectionRefused
381                         || e.kind() == ErrorKind::Uncategorized
382                         || e.kind() == ErrorKind::AddrInUse,
383                     "unknown error: {} {:?}",
384                     e,
385                     e.kind()
386                 );
387             }
388         }
389     })
390 }
391 
392 #[test]
tcp_clone_smoke()393 fn tcp_clone_smoke() {
394     each_ip(&mut |addr| {
395         let acceptor = t!(TcpListener::bind(&addr));
396 
397         let _t = thread::spawn(move || {
398             let mut s = t!(TcpStream::connect(&addr));
399             let mut buf = [0, 0];
400             assert_eq!(s.read(&mut buf).unwrap(), 1);
401             assert_eq!(buf[0], 1);
402             t!(s.write(&[2]));
403         });
404 
405         let mut s1 = t!(acceptor.accept()).0;
406         let s2 = t!(s1.try_clone());
407 
408         let (tx1, rx1) = channel();
409         let (tx2, rx2) = channel();
410         let _t = thread::spawn(move || {
411             let mut s2 = s2;
412             rx1.recv().unwrap();
413             t!(s2.write(&[1]));
414             tx2.send(()).unwrap();
415         });
416         tx1.send(()).unwrap();
417         let mut buf = [0, 0];
418         assert_eq!(s1.read(&mut buf).unwrap(), 1);
419         rx2.recv().unwrap();
420     })
421 }
422 
423 #[test]
tcp_clone_two_read()424 fn tcp_clone_two_read() {
425     each_ip(&mut |addr| {
426         let acceptor = t!(TcpListener::bind(&addr));
427         let (tx1, rx) = channel();
428         let tx2 = tx1.clone();
429 
430         let _t = thread::spawn(move || {
431             let mut s = t!(TcpStream::connect(&addr));
432             t!(s.write(&[1]));
433             rx.recv().unwrap();
434             t!(s.write(&[2]));
435             rx.recv().unwrap();
436         });
437 
438         let mut s1 = t!(acceptor.accept()).0;
439         let s2 = t!(s1.try_clone());
440 
441         let (done, rx) = channel();
442         let _t = thread::spawn(move || {
443             let mut s2 = s2;
444             let mut buf = [0, 0];
445             t!(s2.read(&mut buf));
446             tx2.send(()).unwrap();
447             done.send(()).unwrap();
448         });
449         let mut buf = [0, 0];
450         t!(s1.read(&mut buf));
451         tx1.send(()).unwrap();
452 
453         rx.recv().unwrap();
454     })
455 }
456 
457 #[test]
tcp_clone_two_write()458 fn tcp_clone_two_write() {
459     each_ip(&mut |addr| {
460         let acceptor = t!(TcpListener::bind(&addr));
461 
462         let _t = thread::spawn(move || {
463             let mut s = t!(TcpStream::connect(&addr));
464             let mut buf = [0, 1];
465             t!(s.read(&mut buf));
466             t!(s.read(&mut buf));
467         });
468 
469         let mut s1 = t!(acceptor.accept()).0;
470         let s2 = t!(s1.try_clone());
471 
472         let (done, rx) = channel();
473         let _t = thread::spawn(move || {
474             let mut s2 = s2;
475             t!(s2.write(&[1]));
476             done.send(()).unwrap();
477         });
478         t!(s1.write(&[2]));
479 
480         rx.recv().unwrap();
481     })
482 }
483 
484 #[test]
485 // FIXME: https://github.com/fortanix/rust-sgx/issues/110
486 #[cfg_attr(target_env = "sgx", ignore)]
shutdown_smoke()487 fn shutdown_smoke() {
488     each_ip(&mut |addr| {
489         let a = t!(TcpListener::bind(&addr));
490         let _t = thread::spawn(move || {
491             let mut c = t!(a.accept()).0;
492             let mut b = [0];
493             assert_eq!(c.read(&mut b).unwrap(), 0);
494             t!(c.write(&[1]));
495         });
496 
497         let mut s = t!(TcpStream::connect(&addr));
498         t!(s.shutdown(Shutdown::Write));
499         assert!(s.write(&[1]).is_err());
500         let mut b = [0, 0];
501         assert_eq!(t!(s.read(&mut b)), 1);
502         assert_eq!(b[0], 1);
503     })
504 }
505 
506 #[test]
507 // FIXME: https://github.com/fortanix/rust-sgx/issues/110
508 #[cfg_attr(target_env = "sgx", ignore)]
close_readwrite_smoke()509 fn close_readwrite_smoke() {
510     each_ip(&mut |addr| {
511         let a = t!(TcpListener::bind(&addr));
512         let (tx, rx) = channel::<()>();
513         let _t = thread::spawn(move || {
514             let _s = t!(a.accept());
515             let _ = rx.recv();
516         });
517 
518         let mut b = [0];
519         let mut s = t!(TcpStream::connect(&addr));
520         let mut s2 = t!(s.try_clone());
521 
522         // closing should prevent reads/writes
523         t!(s.shutdown(Shutdown::Write));
524         assert!(s.write(&[0]).is_err());
525         t!(s.shutdown(Shutdown::Read));
526         assert_eq!(s.read(&mut b).unwrap(), 0);
527 
528         // closing should affect previous handles
529         assert!(s2.write(&[0]).is_err());
530         assert_eq!(s2.read(&mut b).unwrap(), 0);
531 
532         // closing should affect new handles
533         let mut s3 = t!(s.try_clone());
534         assert!(s3.write(&[0]).is_err());
535         assert_eq!(s3.read(&mut b).unwrap(), 0);
536 
537         // make sure these don't die
538         let _ = s2.shutdown(Shutdown::Read);
539         let _ = s2.shutdown(Shutdown::Write);
540         let _ = s3.shutdown(Shutdown::Read);
541         let _ = s3.shutdown(Shutdown::Write);
542         drop(tx);
543     })
544 }
545 
546 #[test]
547 #[cfg_attr(target_env = "sgx", ignore)]
close_read_wakes_up()548 fn close_read_wakes_up() {
549     each_ip(&mut |addr| {
550         let a = t!(TcpListener::bind(&addr));
551         let (tx1, rx) = channel::<()>();
552         let _t = thread::spawn(move || {
553             let _s = t!(a.accept());
554             let _ = rx.recv();
555         });
556 
557         let s = t!(TcpStream::connect(&addr));
558         let s2 = t!(s.try_clone());
559         let (tx, rx) = channel();
560         let _t = thread::spawn(move || {
561             let mut s2 = s2;
562             assert_eq!(t!(s2.read(&mut [0])), 0);
563             tx.send(()).unwrap();
564         });
565         // this should wake up the child thread
566         t!(s.shutdown(Shutdown::Read));
567 
568         // this test will never finish if the child doesn't wake up
569         rx.recv().unwrap();
570         drop(tx1);
571     })
572 }
573 
574 #[test]
clone_while_reading()575 fn clone_while_reading() {
576     each_ip(&mut |addr| {
577         let accept = t!(TcpListener::bind(&addr));
578 
579         // Enqueue a thread to write to a socket
580         let (tx, rx) = channel();
581         let (txdone, rxdone) = channel();
582         let txdone2 = txdone.clone();
583         let _t = thread::spawn(move || {
584             let mut tcp = t!(TcpStream::connect(&addr));
585             rx.recv().unwrap();
586             t!(tcp.write(&[0]));
587             txdone2.send(()).unwrap();
588         });
589 
590         // Spawn off a reading clone
591         let tcp = t!(accept.accept()).0;
592         let tcp2 = t!(tcp.try_clone());
593         let txdone3 = txdone.clone();
594         let _t = thread::spawn(move || {
595             let mut tcp2 = tcp2;
596             t!(tcp2.read(&mut [0]));
597             txdone3.send(()).unwrap();
598         });
599 
600         // Try to ensure that the reading clone is indeed reading
601         for _ in 0..50 {
602             thread::yield_now();
603         }
604 
605         // clone the handle again while it's reading, then let it finish the
606         // read.
607         let _ = t!(tcp.try_clone());
608         tx.send(()).unwrap();
609         rxdone.recv().unwrap();
610         rxdone.recv().unwrap();
611     })
612 }
613 
614 #[test]
clone_accept_smoke()615 fn clone_accept_smoke() {
616     each_ip(&mut |addr| {
617         let a = t!(TcpListener::bind(&addr));
618         let a2 = t!(a.try_clone());
619 
620         let _t = thread::spawn(move || {
621             let _ = TcpStream::connect(&addr);
622         });
623         let _t = thread::spawn(move || {
624             let _ = TcpStream::connect(&addr);
625         });
626 
627         t!(a.accept());
628         t!(a2.accept());
629     })
630 }
631 
632 #[test]
clone_accept_concurrent()633 fn clone_accept_concurrent() {
634     each_ip(&mut |addr| {
635         let a = t!(TcpListener::bind(&addr));
636         let a2 = t!(a.try_clone());
637 
638         let (tx, rx) = channel();
639         let tx2 = tx.clone();
640 
641         let _t = thread::spawn(move || {
642             tx.send(t!(a.accept())).unwrap();
643         });
644         let _t = thread::spawn(move || {
645             tx2.send(t!(a2.accept())).unwrap();
646         });
647 
648         let _t = thread::spawn(move || {
649             let _ = TcpStream::connect(&addr);
650         });
651         let _t = thread::spawn(move || {
652             let _ = TcpStream::connect(&addr);
653         });
654 
655         rx.recv().unwrap();
656         rx.recv().unwrap();
657     })
658 }
659 
660 #[test]
debug()661 fn debug() {
662     #[cfg(not(target_env = "sgx"))]
663     fn render_socket_addr<'a>(addr: &'a SocketAddr) -> impl fmt::Debug + 'a {
664         addr
665     }
666     #[cfg(target_env = "sgx")]
667     fn render_socket_addr<'a>(addr: &'a SocketAddr) -> impl fmt::Debug + 'a {
668         addr.to_string()
669     }
670 
671     #[cfg(target_env = "sgx")]
672     use crate::os::fortanix_sgx::io::AsRawFd;
673     #[cfg(unix)]
674     use crate::os::unix::io::AsRawFd;
675     #[cfg(not(windows))]
676     fn render_inner(addr: &dyn AsRawFd) -> impl fmt::Debug {
677         addr.as_raw_fd()
678     }
679     #[cfg(windows)]
680     fn render_inner(addr: &dyn crate::os::windows::io::AsRawSocket) -> impl fmt::Debug {
681         addr.as_raw_socket()
682     }
683 
684     let inner_name = if cfg!(windows) { "socket" } else { "fd" };
685     let socket_addr = next_test_ip4();
686 
687     let listener = t!(TcpListener::bind(&socket_addr));
688     let compare = format!(
689         "TcpListener {{ addr: {:?}, {}: {:?} }}",
690         render_socket_addr(&socket_addr),
691         inner_name,
692         render_inner(&listener)
693     );
694     assert_eq!(format!("{listener:?}"), compare);
695 
696     let stream = t!(TcpStream::connect(&("localhost", socket_addr.port())));
697     let compare = format!(
698         "TcpStream {{ addr: {:?}, peer: {:?}, {}: {:?} }}",
699         render_socket_addr(&stream.local_addr().unwrap()),
700         render_socket_addr(&stream.peer_addr().unwrap()),
701         inner_name,
702         render_inner(&stream)
703     );
704     assert_eq!(format!("{stream:?}"), compare);
705 }
706 
707 // FIXME: re-enabled openbsd tests once their socket timeout code
708 //        no longer has rounding errors.
709 // VxWorks ignores SO_SNDTIMEO.
710 #[cfg_attr(
711     any(target_os = "netbsd", target_os = "openbsd", target_os = "vxworks", target_os = "nto"),
712     ignore
713 )]
714 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
715 #[test]
timeouts()716 fn timeouts() {
717     let addr = next_test_ip4();
718     let listener = t!(TcpListener::bind(&addr));
719 
720     let stream = t!(TcpStream::connect(&("localhost", addr.port())));
721     let dur = Duration::new(15410, 0);
722 
723     assert_eq!(None, t!(stream.read_timeout()));
724 
725     t!(stream.set_read_timeout(Some(dur)));
726     assert_eq!(Some(dur), t!(stream.read_timeout()));
727 
728     assert_eq!(None, t!(stream.write_timeout()));
729 
730     t!(stream.set_write_timeout(Some(dur)));
731     assert_eq!(Some(dur), t!(stream.write_timeout()));
732 
733     t!(stream.set_read_timeout(None));
734     assert_eq!(None, t!(stream.read_timeout()));
735 
736     t!(stream.set_write_timeout(None));
737     assert_eq!(None, t!(stream.write_timeout()));
738     drop(listener);
739 }
740 
741 #[test]
742 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
test_read_timeout()743 fn test_read_timeout() {
744     let addr = next_test_ip4();
745     let listener = t!(TcpListener::bind(&addr));
746 
747     let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
748     t!(stream.set_read_timeout(Some(Duration::from_millis(1000))));
749 
750     let mut buf = [0; 10];
751     let start = Instant::now();
752     let kind = stream.read_exact(&mut buf).err().expect("expected error").kind();
753     assert!(
754         kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut,
755         "unexpected_error: {:?}",
756         kind
757     );
758     assert!(start.elapsed() > Duration::from_millis(400));
759     drop(listener);
760 }
761 
762 #[test]
763 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
test_read_with_timeout()764 fn test_read_with_timeout() {
765     let addr = next_test_ip4();
766     let listener = t!(TcpListener::bind(&addr));
767 
768     let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
769     t!(stream.set_read_timeout(Some(Duration::from_millis(1000))));
770 
771     let mut other_end = t!(listener.accept()).0;
772     t!(other_end.write_all(b"hello world"));
773 
774     let mut buf = [0; 11];
775     t!(stream.read(&mut buf));
776     assert_eq!(b"hello world", &buf[..]);
777 
778     let start = Instant::now();
779     let kind = stream.read_exact(&mut buf).err().expect("expected error").kind();
780     assert!(
781         kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut,
782         "unexpected_error: {:?}",
783         kind
784     );
785     assert!(start.elapsed() > Duration::from_millis(400));
786     drop(listener);
787 }
788 
789 // Ensure the `set_read_timeout` and `set_write_timeout` calls return errors
790 // when passed zero Durations
791 #[test]
test_timeout_zero_duration()792 fn test_timeout_zero_duration() {
793     let addr = next_test_ip4();
794 
795     let listener = t!(TcpListener::bind(&addr));
796     let stream = t!(TcpStream::connect(&addr));
797 
798     let result = stream.set_write_timeout(Some(Duration::new(0, 0)));
799     let err = result.unwrap_err();
800     assert_eq!(err.kind(), ErrorKind::InvalidInput);
801 
802     let result = stream.set_read_timeout(Some(Duration::new(0, 0)));
803     let err = result.unwrap_err();
804     assert_eq!(err.kind(), ErrorKind::InvalidInput);
805 
806     drop(listener);
807 }
808 
809 #[test]
810 #[cfg_attr(target_env = "sgx", ignore)]
linger()811 fn linger() {
812     let addr = next_test_ip4();
813     let _listener = t!(TcpListener::bind(&addr));
814 
815     let stream = t!(TcpStream::connect(&("localhost", addr.port())));
816 
817     assert_eq!(None, t!(stream.linger()));
818     t!(stream.set_linger(Some(Duration::from_secs(1))));
819     assert_eq!(Some(Duration::from_secs(1)), t!(stream.linger()));
820     t!(stream.set_linger(None));
821     assert_eq!(None, t!(stream.linger()));
822 }
823 
824 #[test]
825 #[cfg_attr(target_env = "sgx", ignore)]
nodelay()826 fn nodelay() {
827     let addr = next_test_ip4();
828     let _listener = t!(TcpListener::bind(&addr));
829 
830     let stream = t!(TcpStream::connect(&("localhost", addr.port())));
831 
832     assert_eq!(false, t!(stream.nodelay()));
833     t!(stream.set_nodelay(true));
834     assert_eq!(true, t!(stream.nodelay()));
835     t!(stream.set_nodelay(false));
836     assert_eq!(false, t!(stream.nodelay()));
837 }
838 
839 #[test]
840 #[cfg_attr(target_env = "sgx", ignore)]
ttl()841 fn ttl() {
842     let ttl = 100;
843 
844     let addr = next_test_ip4();
845     let listener = t!(TcpListener::bind(&addr));
846 
847     t!(listener.set_ttl(ttl));
848     assert_eq!(ttl, t!(listener.ttl()));
849 
850     let stream = t!(TcpStream::connect(&("localhost", addr.port())));
851 
852     t!(stream.set_ttl(ttl));
853     assert_eq!(ttl, t!(stream.ttl()));
854 }
855 
856 #[test]
857 #[cfg_attr(target_env = "sgx", ignore)]
set_nonblocking()858 fn set_nonblocking() {
859     let addr = next_test_ip4();
860     let listener = t!(TcpListener::bind(&addr));
861 
862     t!(listener.set_nonblocking(true));
863     t!(listener.set_nonblocking(false));
864 
865     let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
866 
867     t!(stream.set_nonblocking(false));
868     t!(stream.set_nonblocking(true));
869 
870     let mut buf = [0];
871     match stream.read(&mut buf) {
872         Ok(_) => panic!("expected error"),
873         Err(ref e) if e.kind() == ErrorKind::WouldBlock => {}
874         Err(e) => panic!("unexpected error {e}"),
875     }
876 }
877 
878 #[test]
879 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
peek()880 fn peek() {
881     each_ip(&mut |addr| {
882         let (txdone, rxdone) = channel();
883 
884         let srv = t!(TcpListener::bind(&addr));
885         let _t = thread::spawn(move || {
886             let mut cl = t!(srv.accept()).0;
887             cl.write(&[1, 3, 3, 7]).unwrap();
888             t!(rxdone.recv());
889         });
890 
891         let mut c = t!(TcpStream::connect(&addr));
892         let mut b = [0; 10];
893         for _ in 1..3 {
894             let len = c.peek(&mut b).unwrap();
895             assert_eq!(len, 4);
896         }
897         let len = c.read(&mut b).unwrap();
898         assert_eq!(len, 4);
899 
900         t!(c.set_nonblocking(true));
901         match c.peek(&mut b) {
902             Ok(_) => panic!("expected error"),
903             Err(ref e) if e.kind() == ErrorKind::WouldBlock => {}
904             Err(e) => panic!("unexpected error {e}"),
905         }
906         t!(txdone.send(()));
907     })
908 }
909 
910 #[test]
911 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
connect_timeout_valid()912 fn connect_timeout_valid() {
913     let listener = TcpListener::bind("127.0.0.1:0").unwrap();
914     let addr = listener.local_addr().unwrap();
915     TcpStream::connect_timeout(&addr, Duration::from_secs(2)).unwrap();
916 }
917