• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![warn(rust_2018_idioms)]
2 #![cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
3 
4 use tokio::{runtime, task, time};
5 use tokio_test::assert_ok;
6 
7 use std::thread;
8 use std::time::Duration;
9 
10 mod support {
11     pub(crate) mod mpsc_stream;
12 }
13 
14 #[tokio::test]
basic_blocking()15 async fn basic_blocking() {
16     // Run a few times
17     for _ in 0..100 {
18         let out = assert_ok!(
19             tokio::spawn(async {
20                 assert_ok!(
21                     task::spawn_blocking(|| {
22                         thread::sleep(Duration::from_millis(5));
23                         "hello"
24                     })
25                     .await
26                 )
27             })
28             .await
29         );
30 
31         assert_eq!(out, "hello");
32     }
33 }
34 
35 #[tokio::test(flavor = "multi_thread")]
block_in_blocking()36 async fn block_in_blocking() {
37     // Run a few times
38     for _ in 0..100 {
39         let out = assert_ok!(
40             tokio::spawn(async {
41                 assert_ok!(
42                     task::spawn_blocking(|| {
43                         task::block_in_place(|| {
44                             thread::sleep(Duration::from_millis(5));
45                         });
46                         "hello"
47                     })
48                     .await
49                 )
50             })
51             .await
52         );
53 
54         assert_eq!(out, "hello");
55     }
56 }
57 
58 #[tokio::test(flavor = "multi_thread")]
block_in_block()59 async fn block_in_block() {
60     // Run a few times
61     for _ in 0..100 {
62         let out = assert_ok!(
63             tokio::spawn(async {
64                 task::block_in_place(|| {
65                     task::block_in_place(|| {
66                         thread::sleep(Duration::from_millis(5));
67                     });
68                     "hello"
69                 })
70             })
71             .await
72         );
73 
74         assert_eq!(out, "hello");
75     }
76 }
77 
78 #[tokio::test(flavor = "current_thread")]
79 #[should_panic]
no_block_in_current_thread_scheduler()80 async fn no_block_in_current_thread_scheduler() {
81     task::block_in_place(|| {});
82 }
83 
84 #[test]
yes_block_in_threaded_block_on()85 fn yes_block_in_threaded_block_on() {
86     let rt = runtime::Runtime::new().unwrap();
87     rt.block_on(async {
88         task::block_in_place(|| {});
89     });
90 }
91 
92 #[test]
93 #[should_panic]
no_block_in_current_thread_block_on()94 fn no_block_in_current_thread_block_on() {
95     let rt = runtime::Builder::new_current_thread().build().unwrap();
96     rt.block_on(async {
97         task::block_in_place(|| {});
98     });
99 }
100 
101 #[test]
can_enter_current_thread_rt_from_within_block_in_place()102 fn can_enter_current_thread_rt_from_within_block_in_place() {
103     let outer = tokio::runtime::Runtime::new().unwrap();
104 
105     outer.block_on(async {
106         tokio::task::block_in_place(|| {
107             let inner = tokio::runtime::Builder::new_current_thread()
108                 .build()
109                 .unwrap();
110 
111             inner.block_on(async {})
112         })
113     });
114 }
115 
116 #[test]
117 #[cfg(panic = "unwind")]
useful_panic_message_when_dropping_rt_in_rt()118 fn useful_panic_message_when_dropping_rt_in_rt() {
119     use std::panic::{catch_unwind, AssertUnwindSafe};
120 
121     let outer = tokio::runtime::Runtime::new().unwrap();
122 
123     let result = catch_unwind(AssertUnwindSafe(|| {
124         outer.block_on(async {
125             let _ = tokio::runtime::Builder::new_current_thread()
126                 .build()
127                 .unwrap();
128         });
129     }));
130 
131     assert!(result.is_err());
132     let err = result.unwrap_err();
133     let err: &'static str = err.downcast_ref::<&'static str>().unwrap();
134 
135     assert!(
136         err.contains("Cannot drop a runtime"),
137         "Wrong panic message: {:?}",
138         err
139     );
140 }
141 
142 #[test]
can_shutdown_with_zero_timeout_in_runtime()143 fn can_shutdown_with_zero_timeout_in_runtime() {
144     let outer = tokio::runtime::Runtime::new().unwrap();
145 
146     outer.block_on(async {
147         let rt = tokio::runtime::Builder::new_current_thread()
148             .build()
149             .unwrap();
150         rt.shutdown_timeout(Duration::from_nanos(0));
151     });
152 }
153 
154 #[test]
can_shutdown_now_in_runtime()155 fn can_shutdown_now_in_runtime() {
156     let outer = tokio::runtime::Runtime::new().unwrap();
157 
158     outer.block_on(async {
159         let rt = tokio::runtime::Builder::new_current_thread()
160             .build()
161             .unwrap();
162         rt.shutdown_background();
163     });
164 }
165 
166 #[test]
coop_disabled_in_block_in_place()167 fn coop_disabled_in_block_in_place() {
168     let outer = tokio::runtime::Builder::new_multi_thread()
169         .enable_time()
170         .build()
171         .unwrap();
172 
173     let (tx, rx) = support::mpsc_stream::unbounded_channel_stream();
174 
175     for i in 0..200 {
176         tx.send(i).unwrap();
177     }
178     drop(tx);
179 
180     outer.block_on(async move {
181         let jh = tokio::spawn(async move {
182             tokio::task::block_in_place(move || {
183                 futures::executor::block_on(async move {
184                     use tokio_stream::StreamExt;
185                     assert_eq!(rx.fold(0, |n, _| n + 1).await, 200);
186                 })
187             })
188         });
189 
190         tokio::time::timeout(Duration::from_secs(1), jh)
191             .await
192             .expect("timed out (probably hanging)")
193             .unwrap()
194     });
195 }
196 
197 #[test]
coop_disabled_in_block_in_place_in_block_on()198 fn coop_disabled_in_block_in_place_in_block_on() {
199     let (done_tx, done_rx) = std::sync::mpsc::channel();
200     let done = done_tx.clone();
201     thread::spawn(move || {
202         let outer = tokio::runtime::Runtime::new().unwrap();
203 
204         let (tx, rx) = support::mpsc_stream::unbounded_channel_stream();
205 
206         for i in 0..200 {
207             tx.send(i).unwrap();
208         }
209         drop(tx);
210 
211         outer.block_on(async move {
212             tokio::task::block_in_place(move || {
213                 futures::executor::block_on(async move {
214                     use tokio_stream::StreamExt;
215                     assert_eq!(rx.fold(0, |n, _| n + 1).await, 200);
216                 })
217             })
218         });
219 
220         let _ = done.send(Ok(()));
221     });
222 
223     thread::spawn(move || {
224         thread::sleep(Duration::from_secs(1));
225         let _ = done_tx.send(Err("timed out (probably hanging)"));
226     });
227 
228     done_rx.recv().unwrap().unwrap();
229 }
230 
231 #[cfg(feature = "test-util")]
232 #[tokio::test(start_paused = true)]
blocking_when_paused()233 async fn blocking_when_paused() {
234     // Do not auto-advance time when we have started a blocking task that has
235     // not yet finished.
236     time::timeout(
237         Duration::from_secs(3),
238         task::spawn_blocking(|| thread::sleep(Duration::from_millis(1))),
239     )
240     .await
241     .expect("timeout should not trigger")
242     .expect("blocking task should finish");
243 
244     // Really: Do not auto-advance time, even if the timeout is short and the
245     // blocking task runs for longer than that. It doesn't matter: Tokio time
246     // is paused; system time is not.
247     time::timeout(
248         Duration::from_millis(1),
249         task::spawn_blocking(|| thread::sleep(Duration::from_millis(50))),
250     )
251     .await
252     .expect("timeout should not trigger")
253     .expect("blocking task should finish");
254 }
255 
256 #[cfg(feature = "test-util")]
257 #[tokio::test(start_paused = true)]
blocking_task_wakes_paused_runtime()258 async fn blocking_task_wakes_paused_runtime() {
259     let t0 = std::time::Instant::now();
260     time::timeout(
261         Duration::from_secs(15),
262         task::spawn_blocking(|| thread::sleep(Duration::from_millis(1))),
263     )
264     .await
265     .expect("timeout should not trigger")
266     .expect("blocking task should finish");
267     assert!(
268         t0.elapsed() < Duration::from_secs(10),
269         "completing a spawn_blocking should wake the scheduler if it's parked while time is paused"
270     );
271 }
272 
273 #[cfg(feature = "test-util")]
274 #[tokio::test(start_paused = true)]
unawaited_blocking_task_wakes_paused_runtime()275 async fn unawaited_blocking_task_wakes_paused_runtime() {
276     let t0 = std::time::Instant::now();
277 
278     // When this task finishes, time should auto-advance, even though the
279     // JoinHandle has not been awaited yet.
280     let a = task::spawn_blocking(|| {
281         thread::sleep(Duration::from_millis(1));
282     });
283 
284     crate::time::sleep(Duration::from_secs(15)).await;
285     a.await.expect("blocking task should finish");
286     assert!(
287         t0.elapsed() < Duration::from_secs(10),
288         "completing a spawn_blocking should wake the scheduler if it's parked while time is paused"
289     );
290 }
291 
292 #[cfg(feature = "test-util")]
293 #[tokio::test(start_paused = true)]
panicking_blocking_task_wakes_paused_runtime()294 async fn panicking_blocking_task_wakes_paused_runtime() {
295     let t0 = std::time::Instant::now();
296     let result = time::timeout(
297         Duration::from_secs(15),
298         task::spawn_blocking(|| {
299             thread::sleep(Duration::from_millis(1));
300             panic!("blocking task panicked");
301         }),
302     )
303     .await
304     .expect("timeout should not trigger");
305     assert!(result.is_err(), "blocking task should have panicked");
306     assert!(
307         t0.elapsed() < Duration::from_secs(10),
308         "completing a spawn_blocking should wake the scheduler if it's parked while time is paused"
309     );
310 }
311