• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![allow(clippy::needless_range_loop)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "full")]
4 
5 // Tests to run on both current-thread & multi-thread runtime variants.
6 
/// Expands the given test items once per runtime flavor.
///
/// Every generated module contains a copy of the tokens passed to the macro
/// plus two flavor-specific items that the tests rely on:
///
/// * `NUM_WORKERS`: the number of worker threads the flavor is built with.
/// * `rt()`: builds a fresh runtime of that flavor, wrapped in an `Arc`.
macro_rules! rt_test {
    ($($t:tt)*) => {
        mod current_thread_scheduler {
            $($t)*

            // Not defined on WASI; presumably only read by tests that are
            // themselves disabled there.
            #[cfg(not(target_os="wasi"))]
            const NUM_WORKERS: usize = 1;

            fn rt() -> Arc<Runtime> {
                tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .unwrap()
                    .into()
            }
        }

        #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
        mod threaded_scheduler_4_threads {
            $($t)*

            const NUM_WORKERS: usize = 4;

            fn rt() -> Arc<Runtime> {
                tokio::runtime::Builder::new_multi_thread()
                    .worker_threads(4)
                    .enable_all()
                    .build()
                    .unwrap()
                    .into()
            }
        }

        #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
        mod threaded_scheduler_1_thread {
            $($t)*

            const NUM_WORKERS: usize = 1;

            fn rt() -> Arc<Runtime> {
                tokio::runtime::Builder::new_multi_thread()
                    .worker_threads(1)
                    .enable_all()
                    .build()
                    .unwrap()
                    .into()
            }
        }

        #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
        #[cfg(tokio_unstable)]
        mod alt_threaded_scheduler_4_threads {
            $($t)*

            const NUM_WORKERS: usize = 4;

            fn rt() -> Arc<Runtime> {
                // Must select the alternate scheduler explicitly; building with
                // `new_multi_thread()` here would only repeat
                // `threaded_scheduler_4_threads`.
                tokio::runtime::Builder::new_multi_thread_alt()
                    .worker_threads(4)
                    .enable_all()
                    .build()
                    .unwrap()
                    .into()
            }
        }

        #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
        #[cfg(tokio_unstable)]
        mod alt_threaded_scheduler_1_thread {
            $($t)*

            const NUM_WORKERS: usize = 1;

            fn rt() -> Arc<Runtime> {
                // Must select the alternate scheduler explicitly; building with
                // `new_multi_thread()` here would only repeat
                // `threaded_scheduler_1_thread`.
                tokio::runtime::Builder::new_multi_thread_alt()
                    .worker_threads(1)
                    .enable_all()
                    .build()
                    .unwrap()
                    .into()
            }
        }
    }
}
91 
/// Compile-time check that `Runtime` is both `Send` and `Sync`.
#[test]
fn send_sync_bound() {
    use tokio::runtime::Runtime;

    // Instantiating this helper only type-checks when `T: Send + Sync`.
    fn assert_send_sync<T: Send + Sync>() {}

    assert_send_sync::<Runtime>();
}
99 
100 rt_test! {
101     #[cfg(not(target_os="wasi"))]
102     use tokio::net::{TcpListener, TcpStream};
103     #[cfg(not(target_os="wasi"))]
104     use tokio::io::{AsyncReadExt, AsyncWriteExt};
105 
106     use tokio::runtime::Runtime;
107     use tokio::sync::oneshot;
108     use tokio::{task, time};
109 
110     #[cfg(not(target_os="wasi"))]
111     use tokio_test::assert_err;
112     use tokio_test::assert_ok;
113 
114     use futures::future::poll_fn;
115     use std::future::Future;
116     use std::pin::Pin;
117 
118     #[cfg(not(target_os="wasi"))]
119     use std::sync::mpsc;
120 
121     use std::sync::Arc;
122     use std::task::{Context, Poll};
123 
124     #[cfg(not(target_os="wasi"))]
125     use std::thread;
126     use std::time::{Duration, Instant};
127 
128     #[test]
129     fn block_on_sync() {
130         let rt = rt();
131 
132         let mut win = false;
133         rt.block_on(async {
134             win = true;
135         });
136 
137         assert!(win);
138     }
139 
140 
141     #[cfg(not(target_os="wasi"))]
142     #[test]
143     fn block_on_async() {
144         let rt = rt();
145 
146         let out = rt.block_on(async {
147             let (tx, rx) = oneshot::channel();
148 
149             thread::spawn(move || {
150                 thread::sleep(Duration::from_millis(50));
151                 tx.send("ZOMG").unwrap();
152             });
153 
154             assert_ok!(rx.await)
155         });
156 
157         assert_eq!(out, "ZOMG");
158     }
159 
160     #[test]
161     fn spawn_one_bg() {
162         let rt = rt();
163 
164         let out = rt.block_on(async {
165             let (tx, rx) = oneshot::channel();
166 
167             tokio::spawn(async move {
168                 tx.send("ZOMG").unwrap();
169             });
170 
171             assert_ok!(rx.await)
172         });
173 
174         assert_eq!(out, "ZOMG");
175     }
176 
177     #[test]
178     fn spawn_one_join() {
179         let rt = rt();
180 
181         let out = rt.block_on(async {
182             let (tx, rx) = oneshot::channel();
183 
184             let handle = tokio::spawn(async move {
185                 tx.send("ZOMG").unwrap();
186                 "DONE"
187             });
188 
189             let msg = assert_ok!(rx.await);
190 
191             let out = assert_ok!(handle.await);
192             assert_eq!(out, "DONE");
193 
194             msg
195         });
196 
197         assert_eq!(out, "ZOMG");
198     }
199 
200     #[test]
201     fn spawn_two() {
202         let rt = rt();
203 
204         let out = rt.block_on(async {
205             let (tx1, rx1) = oneshot::channel();
206             let (tx2, rx2) = oneshot::channel();
207 
208             tokio::spawn(async move {
209                 assert_ok!(tx1.send("ZOMG"));
210             });
211 
212             tokio::spawn(async move {
213                 let msg = assert_ok!(rx1.await);
214                 assert_ok!(tx2.send(msg));
215             });
216 
217             assert_ok!(rx2.await)
218         });
219 
220         assert_eq!(out, "ZOMG");
221     }
222 
223     #[cfg(not(target_os="wasi"))] // Wasi does not support threads
224     #[test]
225     fn spawn_many_from_block_on() {
226         use tokio::sync::mpsc;
227 
228         const ITER: usize = 200;
229 
230         let rt = rt();
231 
232         let out = rt.block_on(async {
233             let (done_tx, mut done_rx) = mpsc::unbounded_channel();
234 
235             let mut txs = (0..ITER)
236                 .map(|i| {
237                     let (tx, rx) = oneshot::channel();
238                     let done_tx = done_tx.clone();
239 
240                     tokio::spawn(async move {
241                         let msg = assert_ok!(rx.await);
242                         assert_eq!(i, msg);
243                         assert_ok!(done_tx.send(msg));
244                     });
245 
246                     tx
247                 })
248                 .collect::<Vec<_>>();
249 
250             drop(done_tx);
251 
252             thread::spawn(move || {
253                 for (i, tx) in txs.drain(..).enumerate() {
254                     assert_ok!(tx.send(i));
255                 }
256             });
257 
258             let mut out = vec![];
259             while let Some(i) = done_rx.recv().await {
260                 out.push(i);
261             }
262 
263             out.sort_unstable();
264             out
265         });
266 
267         assert_eq!(ITER, out.len());
268 
269         for i in 0..ITER {
270             assert_eq!(i, out[i]);
271         }
272     }
273 
274     #[cfg(not(target_os="wasi"))] // Wasi does not support threads
275     #[test]
276     fn spawn_many_from_task() {
277         use tokio::sync::mpsc;
278 
279         const ITER: usize = 500;
280 
281         let rt = rt();
282 
283         let out = rt.block_on(async {
284             tokio::spawn(async move {
285                 let (done_tx, mut done_rx) = mpsc::unbounded_channel();
286 
287                 let mut txs = (0..ITER)
288                     .map(|i| {
289                         let (tx, rx) = oneshot::channel();
290                         let done_tx = done_tx.clone();
291 
292                         tokio::spawn(async move {
293                             let msg = assert_ok!(rx.await);
294                             assert_eq!(i, msg);
295                             assert_ok!(done_tx.send(msg));
296                         });
297 
298                         tx
299                     })
300                     .collect::<Vec<_>>();
301 
302                 drop(done_tx);
303 
304                 thread::spawn(move || {
305                     for (i, tx) in txs.drain(..).enumerate() {
306                         assert_ok!(tx.send(i));
307                     }
308                 });
309 
310                 let mut out = vec![];
311                 while let Some(i) = done_rx.recv().await {
312                     out.push(i);
313                 }
314 
315                 out.sort_unstable();
316                 out
317             }).await.unwrap()
318         });
319 
320         assert_eq!(ITER, out.len());
321 
322         for i in 0..ITER {
323             assert_eq!(i, out[i]);
324         }
325     }
326 
327     #[test]
328     fn spawn_one_from_block_on_called_on_handle() {
329         let rt = rt();
330         let (tx, rx) = oneshot::channel();
331 
332         #[allow(clippy::async_yields_async)]
333         let handle = rt.handle().block_on(async {
334             tokio::spawn(async move {
335                 tx.send("ZOMG").unwrap();
336                 "DONE"
337             })
338         });
339 
340         let out = rt.block_on(async {
341             let msg = assert_ok!(rx.await);
342 
343             let out = assert_ok!(handle.await);
344             assert_eq!(out, "DONE");
345 
346             msg
347         });
348 
349         assert_eq!(out, "ZOMG");
350     }
351 
352     #[test]
353     fn spawn_await_chain() {
354         let rt = rt();
355 
356         let out = rt.block_on(async {
357             assert_ok!(tokio::spawn(async {
358                 assert_ok!(tokio::spawn(async {
359                     "hello"
360                 }).await)
361             }).await)
362         });
363 
364         assert_eq!(out, "hello");
365     }
366 
367     #[test]
368     fn outstanding_tasks_dropped() {
369         let rt = rt();
370 
371         let cnt = Arc::new(());
372 
373         rt.block_on(async {
374             let cnt = cnt.clone();
375 
376             tokio::spawn(poll_fn(move |_| {
377                 assert_eq!(2, Arc::strong_count(&cnt));
378                 Poll::<()>::Pending
379             }));
380         });
381 
382         assert_eq!(2, Arc::strong_count(&cnt));
383 
384         drop(rt);
385 
386         assert_eq!(1, Arc::strong_count(&cnt));
387     }
388 
389     #[test]
390     #[should_panic]
391     fn nested_rt() {
392         let rt1 = rt();
393         let rt2 = rt();
394 
395         rt1.block_on(async { rt2.block_on(async { "hello" }) });
396     }
397 
398     #[test]
399     fn create_rt_in_block_on() {
400         let rt1 = rt();
401         let rt2 = rt1.block_on(async { rt() });
402         let out = rt2.block_on(async { "ZOMG" });
403 
404         assert_eq!(out, "ZOMG");
405     }
406 
407     #[cfg(not(target_os="wasi"))] // Wasi does not support threads
408     #[test]
409     fn complete_block_on_under_load() {
410         let rt = rt();
411 
412         rt.block_on(async {
413             let (tx, rx) = oneshot::channel();
414 
415             // Spin hard
416             tokio::spawn(async {
417                 loop {
418                     yield_once().await;
419                 }
420             });
421 
422             thread::spawn(move || {
423                 thread::sleep(Duration::from_millis(50));
424                 assert_ok!(tx.send(()));
425             });
426 
427             assert_ok!(rx.await);
428         });
429     }
430 
431     #[cfg(not(target_os="wasi"))] // Wasi does not support threads
432     #[test]
433     fn complete_task_under_load() {
434         let rt = rt();
435 
436         rt.block_on(async {
437             let (tx1, rx1) = oneshot::channel();
438             let (tx2, rx2) = oneshot::channel();
439 
440             // Spin hard
441             tokio::spawn(async {
442                 loop {
443                     yield_once().await;
444                 }
445             });
446 
447             thread::spawn(move || {
448                 thread::sleep(Duration::from_millis(50));
449                 assert_ok!(tx1.send(()));
450             });
451 
452             tokio::spawn(async move {
453                 assert_ok!(rx1.await);
454                 assert_ok!(tx2.send(()));
455             });
456 
457             assert_ok!(rx2.await);
458         });
459     }
460 
461     #[cfg(not(target_os="wasi"))] // Wasi does not support threads
462     #[test]
463     fn spawn_from_other_thread_idle() {
464         let rt = rt();
465         let handle = rt.clone();
466 
467         let (tx, rx) = oneshot::channel();
468 
469         thread::spawn(move || {
470             thread::sleep(Duration::from_millis(50));
471 
472             handle.spawn(async move {
473                 assert_ok!(tx.send(()));
474             });
475         });
476 
477         rt.block_on(async move {
478             assert_ok!(rx.await);
479         });
480     }
481 
482     #[cfg(not(target_os="wasi"))] // Wasi does not support threads
483     #[test]
484     fn spawn_from_other_thread_under_load() {
485         let rt = rt();
486         let handle = rt.clone();
487 
488         let (tx, rx) = oneshot::channel();
489 
490         thread::spawn(move || {
491             handle.spawn(async move {
492                 assert_ok!(tx.send(()));
493             });
494         });
495 
496         rt.block_on(async move {
497             // Spin hard
498             tokio::spawn(async {
499                 loop {
500                     yield_once().await;
501                 }
502             });
503 
504             assert_ok!(rx.await);
505         });
506     }
507 
508     #[test]
509     fn sleep_at_root() {
510         let rt = rt();
511 
512         let now = Instant::now();
513         let dur = Duration::from_millis(50);
514 
515         rt.block_on(async move {
516             time::sleep(dur).await;
517         });
518 
519         assert!(now.elapsed() >= dur);
520     }
521 
522     #[test]
523     fn sleep_in_spawn() {
524         let rt = rt();
525 
526         let now = Instant::now();
527         let dur = Duration::from_millis(50);
528 
529         rt.block_on(async move {
530             let (tx, rx) = oneshot::channel();
531 
532             tokio::spawn(async move {
533                 time::sleep(dur).await;
534                 assert_ok!(tx.send(()));
535             });
536 
537             assert_ok!(rx.await);
538         });
539 
540         assert!(now.elapsed() >= dur);
541     }
542 
543     #[cfg(not(target_os="wasi"))] // Wasi does not support bind
544     #[test]
545     fn block_on_socket() {
546         let rt = rt();
547 
548         rt.block_on(async move {
549             let (tx, rx) = oneshot::channel();
550 
551             let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
552             let addr = listener.local_addr().unwrap();
553 
554             tokio::spawn(async move {
555                 let _ = listener.accept().await;
556                 tx.send(()).unwrap();
557             });
558 
559             TcpStream::connect(&addr).await.unwrap();
560             rx.await.unwrap();
561         });
562     }
563 
564     #[cfg(not(target_os="wasi"))] // Wasi does not support threads
565     #[test]
566     fn spawn_from_blocking() {
567         let rt = rt();
568 
569         let out = rt.block_on(async move {
570             let inner = assert_ok!(tokio::task::spawn_blocking(|| {
571                 tokio::spawn(async move { "hello" })
572             }).await);
573 
574             assert_ok!(inner.await)
575         });
576 
577         assert_eq!(out, "hello")
578     }
579 
580     #[cfg(not(target_os="wasi"))] // Wasi does not support threads
581     #[test]
582     fn spawn_blocking_from_blocking() {
583         let rt = rt();
584 
585         let out = rt.block_on(async move {
586             let inner = assert_ok!(tokio::task::spawn_blocking(|| {
587                 tokio::task::spawn_blocking(|| "hello")
588             }).await);
589 
590             assert_ok!(inner.await)
591         });
592 
593         assert_eq!(out, "hello")
594     }
595 
596     #[cfg(not(target_os="wasi"))] // Wasi does not support threads
597     #[test]
598     fn sleep_from_blocking() {
599         let rt = rt();
600 
601         rt.block_on(async move {
602             assert_ok!(tokio::task::spawn_blocking(|| {
603                 let now = std::time::Instant::now();
604                 let dur = Duration::from_millis(1);
605 
606                 // use the futures' block_on fn to make sure we aren't setting
607                 // any Tokio context
608                 futures::executor::block_on(async {
609                     tokio::time::sleep(dur).await;
610                 });
611 
612                 assert!(now.elapsed() >= dur);
613             }).await);
614         });
615     }
616 
617     #[cfg(not(target_os="wasi"))] // Wasi does not support bind
618     #[test]
619     fn socket_from_blocking() {
620         let rt = rt();
621 
622         rt.block_on(async move {
623             let listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
624             let addr = assert_ok!(listener.local_addr());
625 
626             let peer = tokio::task::spawn_blocking(move || {
627                 // use the futures' block_on fn to make sure we aren't setting
628                 // any Tokio context
629                 futures::executor::block_on(async {
630                     assert_ok!(TcpStream::connect(addr).await);
631                 });
632             });
633 
634             // Wait for the client to connect
635             let _ = assert_ok!(listener.accept().await);
636 
637             assert_ok!(peer.await);
638         });
639     }
640 
641     #[cfg(not(target_os="wasi"))] // Wasi does not support threads
642     #[test]
643     fn always_active_parker() {
644         // This test it to show that we will always have
645         // an active parker even if we call block_on concurrently
646 
647         let rt = rt();
648         let rt2 = rt.clone();
649 
650         let (tx1, rx1) = oneshot::channel();
651         let (tx2, rx2) = oneshot::channel();
652 
653         let jh1 = thread::spawn(move || {
654                 rt.block_on(async move {
655                     rx2.await.unwrap();
656                     time::sleep(Duration::from_millis(5)).await;
657                     tx1.send(()).unwrap();
658                 });
659         });
660 
661         let jh2 = thread::spawn(move || {
662             rt2.block_on(async move {
663                 tx2.send(()).unwrap();
664                 time::sleep(Duration::from_millis(5)).await;
665                 rx1.await.unwrap();
666                 time::sleep(Duration::from_millis(5)).await;
667             });
668         });
669 
670         jh1.join().unwrap();
671         jh2.join().unwrap();
672     }
673 
674     #[test]
675     // IOCP requires setting the "max thread" concurrency value. The sane,
676     // default, is to set this to the number of cores. Threads that poll I/O
677     // become associated with the IOCP handle. Once those threads sleep for any
678     // reason (mutex), they yield their ownership.
679     //
680     // This test hits an edge case on windows where more threads than cores are
681     // created, none of those threads ever yield due to being at capacity, so
682     // IOCP gets "starved".
683     //
684     // For now, this is a very edge case that is probably not a real production
685     // concern. There also isn't a great/obvious solution to take. For now, the
686     // test is disabled.
687     #[cfg(not(windows))]
688     #[cfg(not(target_os="wasi"))] // Wasi does not support bind or threads
689     fn io_driver_called_when_under_load() {
690         let rt = rt();
691 
692         // Create a lot of constant load. The scheduler will always be busy.
693         for _ in 0..100 {
694             rt.spawn(async {
695                 loop {
696                     // Don't use Tokio's `yield_now()` to avoid special defer
697                     // logic.
698                     futures::future::poll_fn::<(), _>(|cx| {
699                         cx.waker().wake_by_ref();
700                         std::task::Poll::Pending
701                     }).await;
702                 }
703             });
704         }
705 
706         // Do some I/O work
707         rt.block_on(async {
708             let listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
709             let addr = assert_ok!(listener.local_addr());
710 
711             let srv = tokio::spawn(async move {
712                 let (mut stream, _) = assert_ok!(listener.accept().await);
713                 assert_ok!(stream.write_all(b"hello world").await);
714             });
715 
716             let cli = tokio::spawn(async move {
717                 let mut stream = assert_ok!(TcpStream::connect(addr).await);
718                 let mut dst = vec![0; 11];
719 
720                 assert_ok!(stream.read_exact(&mut dst).await);
721                 assert_eq!(dst, b"hello world");
722             });
723 
724             assert_ok!(srv.await);
725             assert_ok!(cli.await);
726         });
727     }
728 
729     /// Tests that yielded tasks are not scheduled until **after** resource
730     /// drivers are polled.
731     ///
732     /// The OS does not guarantee when I/O events are delivered, so there may be
733     /// more yields than anticipated. This makes the test slightly flaky. To
734     /// help avoid flakiness, we run the test 10 times and only fail it after
735     /// 10 failures in a row.
736     ///
737     /// Note that if the test fails by panicking rather than by returning false,
738     /// then we fail it immediately. That kind of failure should not happen
739     /// spuriously.
740     #[test]
741     #[cfg(not(target_os="wasi"))]
742     fn yield_defers_until_park() {
743         for _ in 0..10 {
744             if yield_defers_until_park_inner() {
745                 // test passed
746                 return;
747             }
748 
749             // Wait a bit and run the test again.
750             std::thread::sleep(std::time::Duration::from_secs(2));
751         }
752 
753         panic!("yield_defers_until_park is failing consistently");
754     }
755 
756     /// Implementation of `yield_defers_until_park` test. Returns `true` if the
757     /// test passed.
758     #[cfg(not(target_os="wasi"))]
759     fn yield_defers_until_park_inner() -> bool {
760         use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
761         use std::sync::Barrier;
762 
763         let rt = rt();
764 
765         let flag = Arc::new(AtomicBool::new(false));
766         let barrier = Arc::new(Barrier::new(NUM_WORKERS));
767 
768         rt.block_on(async {
769             // Make sure other workers cannot steal tasks
770             #[allow(clippy::reversed_empty_ranges)]
771             for _ in 0..(NUM_WORKERS-1) {
772                 let flag = flag.clone();
773                 let barrier = barrier.clone();
774 
775                 tokio::spawn(async move {
776                     barrier.wait();
777 
778                     while !flag.load(SeqCst) {
779                         std::thread::sleep(std::time::Duration::from_millis(1));
780                     }
781                 });
782             }
783 
784             barrier.wait();
785 
786             let (fail_test, fail_test_recv) = oneshot::channel::<()>();
787 
788             let jh = tokio::spawn(async move {
789                 // Create a TCP litener
790                 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
791                 let addr = listener.local_addr().unwrap();
792 
793                 tokio::join!(
794                     async {
795                         // Done in a blocking manner intentionally.
796                         let _socket = std::net::TcpStream::connect(addr).unwrap();
797 
798                         // Yield until connected
799                         let mut cnt = 0;
800                         while !flag.load(SeqCst){
801                             tokio::task::yield_now().await;
802                             cnt += 1;
803 
804                             if cnt >= 10 {
805                                 // yielded too many times; report failure and
806                                 // sleep forever so that the `fail_test` branch
807                                 // of the `select!` below triggers.
808                                 let _ = fail_test.send(());
809                                 futures::future::pending::<()>().await;
810                                 break;
811                             }
812                         }
813                     },
814                     async {
815                         let _ = listener.accept().await.unwrap();
816                         flag.store(true, SeqCst);
817                     }
818                 );
819             });
820 
821             // Wait until the spawned task completes or fails. If no message is
822             // sent on `fail_test`, then the test succeeds. Otherwise, it fails.
823             let success = fail_test_recv.await.is_err();
824 
825             if success {
826                 // Check for panics in spawned task.
827                 jh.abort();
828                 jh.await.unwrap();
829             }
830 
831             success
832         })
833     }
834 
835     #[cfg(not(target_os="wasi"))] // Wasi does not support threads
836     #[test]
837     fn client_server_block_on() {
838         let rt = rt();
839         let (tx, rx) = mpsc::channel();
840 
841         rt.block_on(async move { client_server(tx).await });
842 
843         assert_ok!(rx.try_recv());
844         assert_err!(rx.try_recv());
845     }
846 
847     #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support threads or panic recovery")]
848     #[cfg(panic = "unwind")]
849     #[test]
850     fn panic_in_task() {
851         let rt = rt();
852         let (tx, rx) = oneshot::channel();
853 
854         struct Boom(Option<oneshot::Sender<()>>);
855 
856         impl Future for Boom {
857             type Output = ();
858 
859             fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
860                 panic!();
861             }
862         }
863 
864         impl Drop for Boom {
865             fn drop(&mut self) {
866                 assert!(std::thread::panicking());
867                 self.0.take().unwrap().send(()).unwrap();
868             }
869         }
870 
871         rt.spawn(Boom(Some(tx)));
872         assert_ok!(rt.block_on(rx));
873     }
874 
875     #[test]
876     #[should_panic]
877     #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support panic recovery")]
878     fn panic_in_block_on() {
879         let rt = rt();
880         rt.block_on(async { panic!() });
881     }
882 
883     #[cfg(not(target_os="wasi"))] // Wasi does not support threads
884     async fn yield_once() {
885         let mut yielded = false;
886         poll_fn(|cx| {
887             if yielded {
888                 Poll::Ready(())
889             } else {
890                 yielded = true;
891                 cx.waker().wake_by_ref();
892                 Poll::Pending
893             }
894         })
895         .await
896     }
897 
898     #[test]
899     fn enter_and_spawn() {
900         let rt = rt();
901         let handle = {
902             let _enter = rt.enter();
903             tokio::spawn(async {})
904         };
905 
906         assert_ok!(rt.block_on(handle));
907     }
908 
909     #[test]
910     fn eagerly_drops_futures_on_shutdown() {
911         use std::sync::mpsc;
912 
913         struct Never {
914             drop_tx: mpsc::Sender<()>,
915         }
916 
917         impl Future for Never {
918             type Output = ();
919 
920             fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
921                 Poll::Pending
922             }
923         }
924 
925         impl Drop for Never {
926             fn drop(&mut self) {
927                 self.drop_tx.send(()).unwrap();
928             }
929         }
930 
931         let rt = rt();
932 
933         let (drop_tx, drop_rx) = mpsc::channel();
934         let (run_tx, run_rx) = oneshot::channel();
935 
936         rt.block_on(async move {
937             tokio::spawn(async move {
938                 assert_ok!(run_tx.send(()));
939 
940                 Never { drop_tx }.await
941             });
942 
943             assert_ok!(run_rx.await);
944         });
945 
946         drop(rt);
947 
948         assert_ok!(drop_rx.recv());
949     }
950 
951     #[test]
952     fn wake_while_rt_is_dropping() {
953         use tokio::sync::Barrier;
954 
955         struct OnDrop<F: FnMut()>(F);
956 
957         impl<F: FnMut()> Drop for OnDrop<F> {
958             fn drop(&mut self) {
959                 (self.0)()
960             }
961         }
962 
963         let (tx1, rx1) = oneshot::channel();
964         let (tx2, rx2) = oneshot::channel();
965 
966         let barrier = Arc::new(Barrier::new(3));
967         let barrier1 = barrier.clone();
968         let barrier2 = barrier.clone();
969 
970         let rt = rt();
971 
972         rt.spawn(async move {
973             let mut tx2 = Some(tx2);
974             let _d = OnDrop(move || {
975                 let _ = tx2.take().unwrap().send(());
976             });
977 
978             // Ensure a waker gets stored in oneshot 1.
979             let _ = tokio::join!(rx1, barrier1.wait());
980         });
981 
982         rt.spawn(async move {
983             let mut tx1 = Some(tx1);
984             let _d = OnDrop(move || {
985                 let _ = tx1.take().unwrap().send(());
986             });
987 
988             // Ensure a waker gets stored in oneshot 2.
989             let _ = tokio::join!(rx2, barrier2.wait());
990         });
991 
992         // Wait until every oneshot channel has been polled.
993         rt.block_on(barrier.wait());
994 
995         // Drop the rt. Regardless of which task is dropped first, its destructor will wake the
996         // other task.
997         drop(rt);
998     }
999 
1000     #[cfg(not(target_os="wasi"))] // Wasi doesn't support UDP or bind()
1001     #[test]
1002     fn io_notify_while_shutting_down() {
1003         use tokio::net::UdpSocket;
1004         use std::sync::Arc;
1005 
1006         for _ in 1..10 {
1007             let runtime = rt();
1008 
1009             runtime.block_on(async {
1010                 let socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
1011                 let addr = socket.local_addr().unwrap();
1012                 let send_half = Arc::new(socket);
1013                 let recv_half = send_half.clone();
1014 
1015                 tokio::spawn(async move {
1016                     let mut buf = [0];
1017                     loop {
1018                         recv_half.recv_from(&mut buf).await.unwrap();
1019                         std::thread::sleep(Duration::from_millis(2));
1020                     }
1021                 });
1022 
1023                 tokio::spawn(async move {
1024                     let buf = [0];
1025                     loop {
1026                         send_half.send_to(&buf, &addr).await.unwrap();
1027                         tokio::time::sleep(Duration::from_millis(1)).await;
1028                     }
1029                 });
1030 
1031                 tokio::time::sleep(Duration::from_millis(5)).await;
1032             });
1033         }
1034     }
1035 
1036     #[cfg(not(target_os="wasi"))] // Wasi does not support threads
1037     #[test]
1038     fn shutdown_timeout() {
1039         let (tx, rx) = oneshot::channel();
1040         let runtime = rt();
1041 
1042         runtime.block_on(async move {
1043             task::spawn_blocking(move || {
1044                 tx.send(()).unwrap();
1045                 thread::sleep(Duration::from_secs(10_000));
1046             });
1047 
1048             rx.await.unwrap();
1049         });
1050 
1051         Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_millis(100));
1052     }
1053 
1054     #[cfg(not(target_os="wasi"))] // Wasi does not support threads
1055     #[test]
1056     fn shutdown_timeout_0() {
1057         let runtime = rt();
1058 
1059         runtime.block_on(async move {
1060             task::spawn_blocking(move || {
1061                 thread::sleep(Duration::from_secs(10_000));
1062             });
1063         });
1064 
1065         let now = Instant::now();
1066         Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_nanos(0));
1067         assert!(now.elapsed().as_secs() < 1);
1068     }
1069 
1070     #[test]
1071     fn shutdown_wakeup_time() {
1072         let runtime = rt();
1073 
1074         runtime.block_on(async move {
1075             tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1076         });
1077 
1078         Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_secs(10_000));
1079     }
1080 
1081     // This test is currently ignored on Windows because of a
1082     // rust-lang issue in thread local storage destructors.
1083     // See https://github.com/rust-lang/rust/issues/74875
1084     #[test]
1085     #[cfg(not(windows))]
1086     #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support threads")]
1087     fn runtime_in_thread_local() {
1088         use std::cell::RefCell;
1089         use std::thread;
1090 
1091         thread_local!(
1092             static R: RefCell<Option<Runtime>> = RefCell::new(None);
1093         );
1094 
1095         thread::spawn(|| {
1096             R.with(|cell| {
1097                 let rt = rt();
1098                 let rt = Arc::try_unwrap(rt).unwrap();
1099                 *cell.borrow_mut() = Some(rt);
1100             });
1101 
1102             let _rt = rt();
1103         }).join().unwrap();
1104     }
1105 
1106     #[cfg(not(target_os="wasi"))] // Wasi does not support bind
1107     async fn client_server(tx: mpsc::Sender<()>) {
1108         let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
1109 
1110         // Get the assigned address
1111         let addr = assert_ok!(server.local_addr());
1112 
1113         // Spawn the server
1114         tokio::spawn(async move {
1115             // Accept a socket
1116             let (mut socket, _) = server.accept().await.unwrap();
1117 
1118             // Write some data
1119             socket.write_all(b"hello").await.unwrap();
1120         });
1121 
1122         let mut client = TcpStream::connect(&addr).await.unwrap();
1123 
1124         let mut buf = vec![];
1125         client.read_to_end(&mut buf).await.unwrap();
1126 
1127         assert_eq!(buf, b"hello");
1128         tx.send(()).unwrap();
1129     }
1130 
1131     #[cfg(not(target_os = "wasi"))] // Wasi does not support bind
1132     #[test]
1133     fn local_set_block_on_socket() {
1134         let rt = rt();
1135         let local = task::LocalSet::new();
1136 
1137         local.block_on(&rt, async move {
1138             let (tx, rx) = oneshot::channel();
1139 
1140             let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1141             let addr = listener.local_addr().unwrap();
1142 
1143             task::spawn_local(async move {
1144                 let _ = listener.accept().await;
1145                 tx.send(()).unwrap();
1146             });
1147 
1148             TcpStream::connect(&addr).await.unwrap();
1149             rx.await.unwrap();
1150         });
1151     }
1152 
1153     #[cfg(not(target_os = "wasi"))] // Wasi does not support bind
1154     #[test]
1155     fn local_set_client_server_block_on() {
1156         let rt = rt();
1157         let (tx, rx) = mpsc::channel();
1158 
1159         let local = task::LocalSet::new();
1160 
1161         local.block_on(&rt, async move { client_server_local(tx).await });
1162 
1163         assert_ok!(rx.try_recv());
1164         assert_err!(rx.try_recv());
1165     }
1166 
1167     #[cfg(not(target_os = "wasi"))] // Wasi does not support bind
1168     async fn client_server_local(tx: mpsc::Sender<()>) {
1169         let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
1170 
1171         // Get the assigned address
1172         let addr = assert_ok!(server.local_addr());
1173 
1174         // Spawn the server
1175         task::spawn_local(async move {
1176             // Accept a socket
1177             let (mut socket, _) = server.accept().await.unwrap();
1178 
1179             // Write some data
1180             socket.write_all(b"hello").await.unwrap();
1181         });
1182 
1183         let mut client = TcpStream::connect(&addr).await.unwrap();
1184 
1185         let mut buf = vec![];
1186         client.read_to_end(&mut buf).await.unwrap();
1187 
1188         assert_eq!(buf, b"hello");
1189         tx.send(()).unwrap();
1190     }
1191 
1192     #[test]
1193     fn coop() {
1194         use std::task::Poll::Ready;
1195         use tokio::sync::mpsc;
1196 
1197         let rt = rt();
1198 
1199         rt.block_on(async {
1200             let (send, mut recv) = mpsc::unbounded_channel();
1201 
1202             // Send a bunch of messages.
1203             for _ in 0..1_000 {
1204                 send.send(()).unwrap();
1205             }
1206 
1207             poll_fn(|cx| {
1208                 // At least one response should return pending.
1209                 for _ in 0..1_000 {
1210                     if recv.poll_recv(cx).is_pending() {
1211                         return Ready(());
1212                     }
1213                 }
1214 
1215                 panic!("did not yield");
1216             }).await;
1217         });
1218     }
1219 
1220     #[test]
1221     fn coop_unconstrained() {
1222         use std::task::Poll::Ready;
1223         use tokio::sync::mpsc;
1224 
1225         let rt = rt();
1226 
1227         rt.block_on(async {
1228             let (send, mut recv) = mpsc::unbounded_channel();
1229 
1230             // Send a bunch of messages.
1231             for _ in 0..1_000 {
1232                 send.send(()).unwrap();
1233             }
1234 
1235             tokio::task::unconstrained(poll_fn(|cx| {
1236                 // All the responses should be ready.
1237                 for _ in 0..1_000 {
1238                     assert_eq!(recv.poll_recv(cx), Poll::Ready(Some(())));
1239                 }
1240 
1241                 Ready(())
1242             })).await;
1243         });
1244     }
1245 
1246     #[cfg(tokio_unstable)]
1247     #[test]
1248     fn coop_consume_budget() {
1249         let rt = rt();
1250 
1251         rt.block_on(async {
1252             poll_fn(|cx| {
1253                 let counter = Arc::new(std::sync::Mutex::new(0));
1254                 let counter_clone = Arc::clone(&counter);
1255                 let mut worker = Box::pin(async move {
1256                     // Consume the budget until a yield happens
1257                     for _ in 0..1000 {
1258                         *counter.lock().unwrap() += 1;
1259                         task::consume_budget().await
1260                     }
1261                 });
1262                 // Assert that the worker was yielded and it didn't manage
1263                 // to finish the whole work (assuming the total budget of 128)
1264                 assert!(Pin::new(&mut worker).poll(cx).is_pending());
1265                 assert!(*counter_clone.lock().unwrap() < 1000);
1266                 std::task::Poll::Ready(())
1267             }).await;
1268         });
1269     }
1270 
1271     // Tests that the "next task" scheduler optimization is not able to starve
1272     // other tasks.
1273     #[test]
1274     fn ping_pong_saturation() {
1275         use std::sync::atomic::{Ordering, AtomicBool};
1276         use tokio::sync::mpsc;
1277 
1278         const NUM: usize = 100;
1279 
1280         let rt = rt();
1281 
1282         let running = Arc::new(AtomicBool::new(true));
1283 
1284         rt.block_on(async {
1285             let (spawned_tx, mut spawned_rx) = mpsc::unbounded_channel();
1286 
1287             let mut tasks = vec![];
1288             // Spawn a bunch of tasks that ping-pong between each other to
1289             // saturate the runtime.
1290             for _ in 0..NUM {
1291                 let (tx1, mut rx1) = mpsc::unbounded_channel();
1292                 let (tx2, mut rx2) = mpsc::unbounded_channel();
1293                 let spawned_tx = spawned_tx.clone();
1294                 let running = running.clone();
1295                 tasks.push(task::spawn(async move {
1296                     spawned_tx.send(()).unwrap();
1297 
1298 
1299                     while running.load(Ordering::Relaxed) {
1300                         tx1.send(()).unwrap();
1301                         rx2.recv().await.unwrap();
1302                     }
1303 
1304                     // Close the channel and wait for the other task to exit.
1305                     drop(tx1);
1306                     assert!(rx2.recv().await.is_none());
1307                 }));
1308 
1309                 tasks.push(task::spawn(async move {
1310                     while rx1.recv().await.is_some() {
1311                         tx2.send(()).unwrap();
1312                     }
1313                 }));
1314             }
1315 
1316             for _ in 0..NUM {
1317                 spawned_rx.recv().await.unwrap();
1318             }
1319 
1320             // spawn another task and wait for it to complete
1321             let handle = task::spawn(async {
1322                 for _ in 0..5 {
1323                     // Yielding forces it back into the local queue.
1324                     task::yield_now().await;
1325                 }
1326             });
1327             handle.await.unwrap();
1328             running.store(false, Ordering::Relaxed);
1329             for t in tasks {
1330                 t.await.unwrap();
1331             }
1332         });
1333     }
1334 
1335     #[test]
1336     #[cfg(not(target_os="wasi"))]
1337     fn shutdown_concurrent_spawn() {
1338         const NUM_TASKS: usize = 10_000;
1339         for _ in 0..5 {
1340             let (tx, rx) = std::sync::mpsc::channel();
1341             let rt = rt();
1342 
1343             let mut txs = vec![];
1344 
1345             for _ in 0..NUM_TASKS {
1346                 let (tx, rx) = tokio::sync::oneshot::channel();
1347                 txs.push(tx);
1348                 rt.spawn(async move {
1349                     rx.await.unwrap();
1350                 });
1351             }
1352 
1353             // Prime the tasks
1354             rt.block_on(async { tokio::task::yield_now().await });
1355 
1356             let th = std::thread::spawn(move || {
1357                 tx.send(()).unwrap();
1358                 for tx in txs.drain(..) {
1359                     let _ = tx.send(());
1360                 }
1361             });
1362 
1363             rx.recv().unwrap();
1364             drop(rt);
1365 
1366             th.join().unwrap();
1367         }
1368     }
1369 }
1370