• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "full")]
3 
// All io tests that deal with shutdown are currently ignored because there are known bugs with
// shutting down the io driver while concurrently registering new resources. See
// https://github.com/tokio-rs/tokio/pull/3569#pullrequestreview-612703467 for more details.
//
// When this has been fixed we want to re-enable these tests.
9 
10 use std::time::Duration;
11 use tokio::runtime::{Handle, Runtime};
12 use tokio::sync::mpsc;
13 #[cfg(not(tokio_wasi))]
14 use tokio::{net, time};
15 
16 #[cfg(not(tokio_wasi))] // Wasi doesn't support threads
17 macro_rules! multi_threaded_rt_test {
18     ($($t:tt)*) => {
19         mod threaded_scheduler_4_threads_only {
20             use super::*;
21 
22             $($t)*
23 
24             fn rt() -> Runtime {
25                 tokio::runtime::Builder::new_multi_thread()
26                     .worker_threads(4)
27                     .enable_all()
28                     .build()
29                     .unwrap()
30             }
31         }
32 
33         mod threaded_scheduler_1_thread_only {
34             use super::*;
35 
36             $($t)*
37 
38             fn rt() -> Runtime {
39                 tokio::runtime::Builder::new_multi_thread()
40                     .worker_threads(1)
41                     .enable_all()
42                     .build()
43                     .unwrap()
44             }
45         }
46     }
47 }
48 
// Expands the given test items into three modules: one for the current-thread
// scheduler and one each for the multi-threaded scheduler with four workers
// and with a single worker. Each module provides its own `rt()` constructor
// for the test items to use.
#[cfg(not(tokio_wasi))]
macro_rules! rt_test {
    ($($tests:tt)*) => {
        mod current_thread_scheduler {
            use super::*;

            $($tests)*

            // Current-thread runtime with `enable_all()` applied.
            fn rt() -> Runtime {
                let mut builder = tokio::runtime::Builder::new_current_thread();
                builder.enable_all();
                builder.build().unwrap()
            }
        }

        mod threaded_scheduler_4_threads {
            use super::*;

            $($tests)*

            // Multi-threaded runtime with four workers and `enable_all()` applied.
            fn rt() -> Runtime {
                let mut builder = tokio::runtime::Builder::new_multi_thread();
                builder.worker_threads(4).enable_all();
                builder.build().unwrap()
            }
        }

        mod threaded_scheduler_1_thread {
            use super::*;

            $($tests)*

            // Multi-threaded runtime with a single worker and `enable_all()` applied.
            fn rt() -> Runtime {
                let mut builder = tokio::runtime::Builder::new_multi_thread();
                builder.worker_threads(1).enable_all();
                builder.build().unwrap()
            }
        }
    }
}
94 
95 // ==== runtime independent futures ======
96 
// Blocks on a trivial future through `Handle::block_on`; `test_with_runtimes`
// runs the closure on every runtime flavor, before and after shutdown.
#[test]
fn basic() {
    test_with_runtimes(|| {
        let handle = Handle::current();
        let one = handle.block_on(async { 1 });
        assert_eq!(1, one);
    });
}
104 
// Sends and receives a single value over a bounded channel, driving both
// futures through `Handle::block_on`.
#[test]
fn bounded_mpsc_channel() {
    test_with_runtimes(|| {
        let (sender, mut receiver) = mpsc::channel(1024);

        let sent = Handle::current().block_on(sender.send(42));
        sent.unwrap();

        let received = Handle::current().block_on(receiver.recv());
        let value = received.unwrap();
        assert_eq!(value, 42);
    });
}
116 
// Sends a value over an unbounded channel synchronously and receives it
// through `Handle::block_on`.
#[test]
fn unbounded_mpsc_channel() {
    test_with_runtimes(|| {
        let (tx, mut rx) = mpsc::unbounded_channel();

        // `rx` is still alive at this point, so the send is expected to
        // succeed. Unwrap instead of discarding the result so that a failure
        // is reported here rather than as an opaque `recv()` error below.
        tx.send(42).unwrap();

        let value = Handle::current().block_on(rx.recv()).unwrap();
        assert_eq!(value, 42);
    });
}
128 
129 #[cfg(not(tokio_wasi))] // Wasi doesn't support file operations or bind
130 rt_test! {
131     use tokio::fs;
132     // ==== spawn blocking futures ======
133 
134     #[test]
135     fn basic_fs() {
136         let rt = rt();
137         let _enter = rt.enter();
138 
139         let contents = Handle::current()
140             .block_on(fs::read_to_string("Cargo.toml"))
141             .unwrap();
142         assert!(contents.contains("https://tokio.rs"));
143     }
144 
145     #[test]
146     fn fs_shutdown_before_started() {
147         let rt = rt();
148         let _enter = rt.enter();
149         rt.shutdown_timeout(Duration::from_secs(1000));
150 
151         let err: std::io::Error = Handle::current()
152             .block_on(fs::read_to_string("Cargo.toml"))
153             .unwrap_err();
154 
155         assert_eq!(err.kind(), std::io::ErrorKind::Other);
156 
157         let inner_err = err.get_ref().expect("no inner error");
158         assert_eq!(inner_err.to_string(), "background task failed");
159     }
160 
161     #[test]
162     fn basic_spawn_blocking() {
163         use tokio::task::spawn_blocking;
164         let rt = rt();
165         let _enter = rt.enter();
166 
167         let answer = Handle::current()
168             .block_on(spawn_blocking(|| {
169                 std::thread::sleep(Duration::from_millis(100));
170                 42
171             }))
172             .unwrap();
173 
174         assert_eq!(answer, 42);
175     }
176 
177     #[test]
178     fn spawn_blocking_after_shutdown_fails() {
179         use tokio::task::spawn_blocking;
180         let rt = rt();
181         let _enter = rt.enter();
182         rt.shutdown_timeout(Duration::from_secs(1000));
183 
184         let join_err = Handle::current()
185             .block_on(spawn_blocking(|| {
186                 std::thread::sleep(Duration::from_millis(100));
187                 42
188             }))
189             .unwrap_err();
190 
191         assert!(join_err.is_cancelled());
192     }
193 
194     #[test]
195     fn spawn_blocking_started_before_shutdown_continues() {
196         use tokio::task::spawn_blocking;
197         let rt = rt();
198         let _enter = rt.enter();
199 
200         let handle = spawn_blocking(|| {
201             std::thread::sleep(Duration::from_secs(1));
202             42
203         });
204 
205         rt.shutdown_timeout(Duration::from_secs(1000));
206 
207         let answer = Handle::current().block_on(handle).unwrap();
208 
209         assert_eq!(answer, 42);
210     }
211 
212     // ==== net ======
213 
214     #[test]
215     fn tcp_listener_bind() {
216         let rt = rt();
217         let _enter = rt.enter();
218 
219         Handle::current()
220             .block_on(net::TcpListener::bind("127.0.0.1:0"))
221             .unwrap();
222     }
223 
224     // All io tests are ignored for now. See above why that is.
225     #[ignore]
226     #[test]
227     fn tcp_listener_connect_after_shutdown() {
228         let rt = rt();
229         let _enter = rt.enter();
230 
231         rt.shutdown_timeout(Duration::from_secs(1000));
232 
233         let err = Handle::current()
234             .block_on(net::TcpListener::bind("127.0.0.1:0"))
235             .unwrap_err();
236 
237         assert_eq!(err.kind(), std::io::ErrorKind::Other);
238         assert_eq!(
239             err.get_ref().unwrap().to_string(),
240             "A Tokio 1.x context was found, but it is being shutdown.",
241         );
242     }
243 
244     // All io tests are ignored for now. See above why that is.
245     #[ignore]
246     #[test]
247     fn tcp_listener_connect_before_shutdown() {
248         let rt = rt();
249         let _enter = rt.enter();
250 
251         let bind_future = net::TcpListener::bind("127.0.0.1:0");
252 
253         rt.shutdown_timeout(Duration::from_secs(1000));
254 
255         let err = Handle::current().block_on(bind_future).unwrap_err();
256 
257         assert_eq!(err.kind(), std::io::ErrorKind::Other);
258         assert_eq!(
259             err.get_ref().unwrap().to_string(),
260             "A Tokio 1.x context was found, but it is being shutdown.",
261         );
262     }
263 
264     #[test]
265     fn udp_socket_bind() {
266         let rt = rt();
267         let _enter = rt.enter();
268 
269         Handle::current()
270             .block_on(net::UdpSocket::bind("127.0.0.1:0"))
271             .unwrap();
272     }
273 
274     // All io tests are ignored for now. See above why that is.
275     #[ignore]
276     #[test]
277     fn udp_stream_bind_after_shutdown() {
278         let rt = rt();
279         let _enter = rt.enter();
280 
281         rt.shutdown_timeout(Duration::from_secs(1000));
282 
283         let err = Handle::current()
284             .block_on(net::UdpSocket::bind("127.0.0.1:0"))
285             .unwrap_err();
286 
287         assert_eq!(err.kind(), std::io::ErrorKind::Other);
288         assert_eq!(
289             err.get_ref().unwrap().to_string(),
290             "A Tokio 1.x context was found, but it is being shutdown.",
291         );
292     }
293 
294     // All io tests are ignored for now. See above why that is.
295     #[ignore]
296     #[test]
297     fn udp_stream_bind_before_shutdown() {
298         let rt = rt();
299         let _enter = rt.enter();
300 
301         let bind_future = net::UdpSocket::bind("127.0.0.1:0");
302 
303         rt.shutdown_timeout(Duration::from_secs(1000));
304 
305         let err = Handle::current().block_on(bind_future).unwrap_err();
306 
307         assert_eq!(err.kind(), std::io::ErrorKind::Other);
308         assert_eq!(
309             err.get_ref().unwrap().to_string(),
310             "A Tokio 1.x context was found, but it is being shutdown.",
311         );
312     }
313 
314     // All io tests are ignored for now. See above why that is.
315     #[ignore]
316     #[cfg(unix)]
317     #[test]
318     fn unix_listener_bind_after_shutdown() {
319         let rt = rt();
320         let _enter = rt.enter();
321 
322         let dir = tempfile::tempdir().unwrap();
323         let path = dir.path().join("socket");
324 
325         rt.shutdown_timeout(Duration::from_secs(1000));
326 
327         let err = net::UnixListener::bind(path).unwrap_err();
328 
329         assert_eq!(err.kind(), std::io::ErrorKind::Other);
330         assert_eq!(
331             err.get_ref().unwrap().to_string(),
332             "A Tokio 1.x context was found, but it is being shutdown.",
333         );
334     }
335 
336     // All io tests are ignored for now. See above why that is.
337     #[ignore]
338     #[cfg(unix)]
339     #[test]
340     fn unix_listener_shutdown_after_bind() {
341         let rt = rt();
342         let _enter = rt.enter();
343 
344         let dir = tempfile::tempdir().unwrap();
345         let path = dir.path().join("socket");
346 
347         let listener = net::UnixListener::bind(path).unwrap();
348 
349         rt.shutdown_timeout(Duration::from_secs(1000));
350 
351         // this should not timeout but fail immediately since the runtime has been shutdown
352         let err = Handle::current().block_on(listener.accept()).unwrap_err();
353 
354         assert_eq!(err.kind(), std::io::ErrorKind::Other);
355         assert_eq!(err.get_ref().unwrap().to_string(), "reactor gone");
356     }
357 
358     // All io tests are ignored for now. See above why that is.
359     #[ignore]
360     #[cfg(unix)]
361     #[test]
362     fn unix_listener_shutdown_after_accept() {
363         let rt = rt();
364         let _enter = rt.enter();
365 
366         let dir = tempfile::tempdir().unwrap();
367         let path = dir.path().join("socket");
368 
369         let listener = net::UnixListener::bind(path).unwrap();
370 
371         let accept_future = listener.accept();
372 
373         rt.shutdown_timeout(Duration::from_secs(1000));
374 
375         // this should not timeout but fail immediately since the runtime has been shutdown
376         let err = Handle::current().block_on(accept_future).unwrap_err();
377 
378         assert_eq!(err.kind(), std::io::ErrorKind::Other);
379         assert_eq!(err.get_ref().unwrap().to_string(), "reactor gone");
380     }
381 
382     // ==== nesting ======
383 
384     #[test]
385     #[should_panic(
386         expected = "Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks."
387     )]
388     fn nesting() {
389         fn some_non_async_function() -> i32 {
390             Handle::current().block_on(time::sleep(Duration::from_millis(10)));
391             1
392         }
393 
394         let rt = rt();
395 
396         rt.block_on(async { some_non_async_function() });
397     }
398 
399     #[test]
400     fn spawn_after_runtime_dropped() {
401         use futures::future::FutureExt;
402 
403         let rt = rt();
404 
405         let handle = rt.block_on(async move {
406             Handle::current()
407         });
408 
409         let jh1 = handle.spawn(futures::future::pending::<()>());
410 
411         drop(rt);
412 
413         let jh2 = handle.spawn(futures::future::pending::<()>());
414 
415         let err1 = jh1.now_or_never().unwrap().unwrap_err();
416         let err2 = jh2.now_or_never().unwrap().unwrap_err();
417         assert!(err1.is_cancelled());
418         assert!(err2.is_cancelled());
419     }
420 }
421 
422 #[cfg(not(tokio_wasi))]
423 multi_threaded_rt_test! {
424     #[cfg(unix)]
425     #[test]
426     fn unix_listener_bind() {
427         let rt = rt();
428         let _enter = rt.enter();
429 
430         let dir = tempfile::tempdir().unwrap();
431         let path = dir.path().join("socket");
432 
433         let listener = net::UnixListener::bind(path).unwrap();
434 
435         // this should timeout and not fail immediately since the runtime has not been shutdown
436         let _: tokio::time::error::Elapsed = Handle::current()
437             .block_on(tokio::time::timeout(
438                 Duration::from_millis(10),
439                 listener.accept(),
440             ))
441             .unwrap_err();
442     }
443 
444     // ==== timers ======
445 
446     // `Handle::block_on` doesn't work with timer futures on a current thread runtime as there is no
447     // one to drive the timers so they will just hang forever. Therefore they are not tested.
448 
449     #[test]
450     fn sleep() {
451         let rt = rt();
452         let _enter = rt.enter();
453 
454         Handle::current().block_on(time::sleep(Duration::from_millis(100)));
455     }
456 
457     #[test]
458     #[should_panic(expected = "A Tokio 1.x context was found, but it is being shutdown.")]
459     fn sleep_before_shutdown_panics() {
460         let rt = rt();
461         let _enter = rt.enter();
462 
463         let f = time::sleep(Duration::from_millis(100));
464 
465         rt.shutdown_timeout(Duration::from_secs(1000));
466 
467         Handle::current().block_on(f);
468     }
469 
470     #[test]
471     #[should_panic(expected = "A Tokio 1.x context was found, but it is being shutdown.")]
472     fn sleep_after_shutdown_panics() {
473         let rt = rt();
474         let _enter = rt.enter();
475 
476         rt.shutdown_timeout(Duration::from_secs(1000));
477 
478         Handle::current().block_on(time::sleep(Duration::from_millis(100)));
479     }
480 }
481 
482 // ==== utils ======
483 
484 /// Create a new multi threaded runtime
485 #[cfg(not(tokio_wasi))]
new_multi_thread(n: usize) -> Runtime486 fn new_multi_thread(n: usize) -> Runtime {
487     tokio::runtime::Builder::new_multi_thread()
488         .worker_threads(n)
489         .enable_all()
490         .build()
491         .unwrap()
492 }
493 
494 /// Create a new single threaded runtime
new_current_thread() -> Runtime495 fn new_current_thread() -> Runtime {
496     tokio::runtime::Builder::new_current_thread()
497         .enable_all()
498         .build()
499         .unwrap()
500 }
501 
502 /// Utility to test things on both kinds of runtimes both before and after shutting it down.
test_with_runtimes<F>(f: F) where F: Fn(),503 fn test_with_runtimes<F>(f: F)
504 where
505     F: Fn(),
506 {
507     {
508         let rt = new_current_thread();
509         let _enter = rt.enter();
510         f();
511 
512         rt.shutdown_timeout(Duration::from_secs(1000));
513         f();
514     }
515 
516     #[cfg(not(tokio_wasi))]
517     {
518         let rt = new_multi_thread(1);
519         let _enter = rt.enter();
520         f();
521 
522         rt.shutdown_timeout(Duration::from_secs(1000));
523         f();
524     }
525 
526     #[cfg(not(tokio_wasi))]
527     {
528         let rt = new_multi_thread(4);
529         let _enter = rt.enter();
530         f();
531 
532         rt.shutdown_timeout(Duration::from_secs(1000));
533         f();
534     }
535 }
536