• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "full")]
3 
4 use tokio::runtime::Runtime;
5 use tokio::sync::oneshot;
6 use tokio::time::{timeout, Duration};
7 use tokio_test::{assert_err, assert_ok};
8 
9 use std::future::Future;
10 use std::pin::Pin;
11 use std::sync::atomic::{AtomicBool, Ordering};
12 use std::task::{Context, Poll};
13 use std::thread;
14 
15 mod support {
16     pub(crate) mod mpsc_stream;
17 }
18 
/// Wraps the given statements in a block that is compiled only when the
/// `tokio_unstable` cfg is set; otherwise the statements are removed entirely.
macro_rules! cfg_metrics {
    ($($body:tt)*) => {
        #[cfg(tokio_unstable)]
        {
            $($body)*
        }
    };
}
27 
/// A task spawned on the runtime returned by `rt()` must stay idle until the
/// runtime is driven through `block_on`.
#[test]
fn spawned_task_does_not_progress_without_block_on() {
    let (sender, mut receiver) = oneshot::channel();

    let runtime = rt();

    runtime.spawn(async move {
        assert_ok!(sender.send("hello"));
    });

    // Leave plenty of time for the task to run if something were driving it.
    thread::sleep(Duration::from_millis(50));

    // Nothing has polled the task, so the channel must still be empty.
    assert_err!(receiver.try_recv());

    // Driving the runtime lets the spawned task deliver its message.
    let received = runtime.block_on(async { assert_ok!(receiver.await) });

    assert_eq!(received, "hello");
}
46 
/// Counts how often a spawned stream consumer is polled and checks that no
/// superfluous polls occur: one on spawn, two per delivered item, and one when
/// the channel closes.
#[test]
fn no_extra_poll() {
    use pin_project_lite::pin_project;
    use std::pin::Pin;
    use std::sync::{
        atomic::{AtomicUsize, Ordering::SeqCst},
        Arc,
    };
    use std::task::{Context, Poll};
    use tokio_stream::{Stream, StreamExt};

    pin_project! {
        // Stream adapter that bumps a shared counter on every `poll_next`
        // call before forwarding the call to the wrapped stream.
        struct PollCounter<S> {
            polls: Arc<AtomicUsize>,
            #[pin]
            inner: S,
        }
    }

    impl<S: Stream> Stream for PollCounter<S> {
        type Item = S::Item;

        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            let projected = self.project();
            projected.polls.fetch_add(1, SeqCst);
            projected.inner.poll_next(cx)
        }
    }

    let (tx, rx) = support::mpsc_stream::unbounded_channel_stream::<()>();
    let polls = Arc::new(AtomicUsize::new(0));
    let counted = PollCounter {
        polls: Arc::clone(&polls),
        inner: rx,
    };

    let rt = rt();

    // TODO: the heap allocation could probably be avoided. A pinned box is
    // always `Unpin`, which is what `StreamExt::next` requires.
    let mut counted = Box::pin(counted);

    rt.spawn(async move { while counted.next().await.is_some() {} });

    // Yields once inside `block_on`, giving the spawned consumer a chance to run.
    let tick = || rt.block_on(async { tokio::task::yield_now().await });

    tick();
    // Only the initial poll should have happened so far.
    assert_eq!(polls.load(SeqCst), 1);

    tx.send(()).unwrap();
    tick();
    // Two further polls: one yielding `Some(())`, then one returning `Pending`.
    assert_eq!(polls.load(SeqCst), 1 + 2);

    drop(tx);
    tick();
    // One final poll, which observes the closed channel as `None`.
    assert_eq!(polls.load(SeqCst), 1 + 2 + 1);
}
114 
/// Drops a runtime while three chained tasks are still outstanding.
///
/// None of the tasks can ever finish: the last one awaits a future that never
/// resolves, and the other two wait on channels fed by their successor. The
/// `unreachable!()` calls assert that no task is resumed past its await point.
// NOTE(review): judging by the name, this presumably guards against a deadlock
// when task destructors run during runtime shutdown — confirm against history.
#[test]
fn acquire_mutex_in_drop() {
    let (first_tx, first_rx) = oneshot::channel();
    let (second_tx, second_rx) = oneshot::channel();

    let runtime = rt();

    // Waits on the second channel; must never be resumed.
    runtime.spawn(async move {
        let _ = second_rx.await;
        unreachable!();
    });

    // Waits on the first channel and would then feed the second one.
    runtime.spawn(async move {
        let _ = first_rx.await;
        second_tx.send(()).unwrap();
        unreachable!();
    });

    // This task never gets notified, so the first channel is never fed.
    runtime.spawn(async move {
        futures::future::pending::<()>().await;
        first_tx.send(()).unwrap();
    });

    // Run the scheduler once so every task reaches its await point.
    runtime.block_on(async {
        tokio::task::yield_now().await;
    });

    // Tear the runtime down with all three tasks still pending.
    drop(runtime);
}
150 
/// Verifies that a task dropped as part of dropping the runtime can still
/// obtain a runtime handle from inside its destructor.
#[test]
fn drop_tasks_in_context() {
    static HANDLE_SEEN_IN_DROP: AtomicBool = AtomicBool::new(false);

    /// Future that never completes and records, on drop, whether a runtime
    /// handle was reachable at that moment.
    struct ProbeOnDrop;

    impl Future for ProbeOnDrop {
        type Output = ();

        fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
            Poll::Pending
        }
    }

    impl Drop for ProbeOnDrop {
        fn drop(&mut self) {
            let inside_runtime = tokio::runtime::Handle::try_current().is_ok();
            if inside_runtime {
                HANDLE_SEEN_IN_DROP.store(true, Ordering::SeqCst);
            }
        }
    }

    let runtime = rt();
    runtime.spawn(ProbeOnDrop);
    // The spawned future is expected to be destroyed here, together with the runtime.
    drop(runtime);

    assert!(HANDLE_SEEN_IN_DROP.load(Ordering::SeqCst));
}
179 
/// Panics inside `block_on` while a spawned task owns a value whose destructor
/// sends on a oneshot channel that another spawned task is awaiting.
///
/// The test passes when the only panic observed is the expected "boom".
#[test]
#[cfg_attr(target_os = "wasi", ignore = "Wasi does not support panic recovery")]
#[should_panic(expected = "boom")]
fn wake_in_drop_after_panic() {
    let (tx, rx) = oneshot::channel::<()>();

    /// Sends on the wrapped channel when dropped.
    struct WakeOnDrop(Option<oneshot::Sender<()>>);

    impl Drop for WakeOnDrop {
        fn drop(&mut self) {
            let sender = self.0.take().unwrap();
            sender.send(()).unwrap();
        }
    }

    let runtime = rt();

    runtime.spawn(async move {
        let _guard = WakeOnDrop(Some(tx));
        // Never completes, so the guard is only dropped along with the task.
        futures::future::pending::<()>().await;
    });

    // Keeps the receiving half alive inside a second task.
    let _receiver_task = runtime.spawn(async move { rx.await });

    runtime.block_on(async {
        tokio::task::yield_now().await;
        panic!("boom");
    });
}
209 
/// Spawns a task that spawns a second task, and waits for the inner task's
/// message. With `tokio_unstable`, also checks that both spawns were counted
/// as local schedules and none as remote.
#[test]
fn spawn_two() {
    let rt = rt();

    let received = rt.block_on(async {
        let (tx, rx) = oneshot::channel();

        // The outer task only exists to spawn the inner one from task context.
        tokio::spawn(async move {
            tokio::spawn(async move {
                tx.send("ZOMG").unwrap();
            });
        });

        assert_ok!(rx.await)
    });

    assert_eq!(received, "ZOMG");

    cfg_metrics! {
        let metrics = rt.metrics();
        drop(rt);
        assert_eq!(0, metrics.remote_schedule_count());

        // Sum the local schedule counters across every worker.
        let local: u64 = (0..metrics.num_workers())
            .map(|worker| metrics.worker_local_schedule_count(worker))
            .sum();

        assert_eq!(2, local);
    }
}
241 
/// Completes a oneshot channel from a plain OS thread while a spawned task
/// awaits it. With `tokio_unstable`, also checks that exactly one remote and
/// one local schedule were recorded.
#[cfg_attr(target_os = "wasi", ignore = "WASI: std::thread::spawn not supported")]
#[test]
fn spawn_remote() {
    let rt = rt();

    let received = rt.block_on(async {
        let (tx, rx) = oneshot::channel();

        let task = tokio::spawn(async move {
            // Send from a separate thread after a short delay.
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(10));
                tx.send("ZOMG").unwrap();
            });

            rx.await.unwrap()
        });

        task.await.unwrap()
    });

    assert_eq!(received, "ZOMG");

    cfg_metrics! {
        let metrics = rt.metrics();
        drop(rt);
        assert_eq!(1, metrics.remote_schedule_count());

        // Sum the local schedule counters across every worker.
        let local: u64 = (0..metrics.num_workers())
            .map(|worker| metrics.worker_local_schedule_count(worker))
            .sum();

        assert_eq!(1, local);
    }
}
277 
/// Using `timeout` on a runtime built without `enable_time` must panic with
/// the message that points the user at `enable_time`.
#[test]
#[cfg_attr(target_os = "wasi", ignore = "Wasi does not support panic recovery")]
#[should_panic(
    expected = "A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers."
)]
fn timeout_panics_when_no_time_handle() {
    // Deliberately built without any drivers enabled.
    let runtime = tokio::runtime::Builder::new_current_thread().build().unwrap();

    runtime.block_on(async {
        // The sender stays bound so the receiver cannot resolve on its own.
        let (_tx, rx) = oneshot::channel::<()>();
        let _ = timeout(Duration::from_millis(20), rx).await;
    });
}
293 
/// Tests for runtime options that exist only when building with `tokio_unstable`.
#[cfg(tokio_unstable)]
mod unstable {
    use std::sync::{mpsc, Arc};
    use tokio::runtime::{Builder, RngSeed, UnhandledPanic};

    /// Seed material shared by the RNG determinism tests.
    const SEED: &[u8] = b"bytes used to generate seed";

    /// Builds a current-thread runtime configured with `UnhandledPanic::ShutdownRuntime`.
    fn shutdown_on_panic_rt() -> tokio::runtime::Runtime {
        Builder::new_current_thread()
            .unhandled_panic(UnhandledPanic::ShutdownRuntime)
            .build()
            .unwrap()
    }

    /// Builds a current-thread runtime whose RNG seed is derived from `SEED`.
    fn seeded_rt() -> tokio::runtime::Runtime {
        Builder::new_current_thread()
            .rng_seed(RngSeed::from_bytes(SEED))
            .build()
            .unwrap()
    }

    /// Draws two consecutive values from tokio's thread-local RNG helper.
    fn two_rand_values() -> (u32, u32) {
        let first = tokio::macros::support::thread_rng_n(100);
        let second = tokio::macros::support::thread_rng_n(100);

        (first, second)
    }

    /// A panic in a spawned task must surface as a panic out of `block_on`.
    #[test]
    #[should_panic(
        expected = "a spawned task panicked and the runtime is configured to shut down on unhandled panic"
    )]
    fn shutdown_on_panic() {
        let runtime = shutdown_on_panic_rt();

        runtime.block_on(async {
            tokio::spawn(async {
                panic!("boom");
            });

            // Park forever; only the runtime shutdown can end this call.
            futures::future::pending::<()>().await;
        })
    }

    /// After a task panic has taken the runtime down, a later spawn must
    /// produce a join handle that resolves to an error.
    #[test]
    #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support panic recovery")]
    fn spawns_do_nothing() {
        let owner = Arc::new(shutdown_on_panic_rt());
        let for_thread = Arc::clone(&owner);

        // Trigger the shutdown on another thread; its panic result is ignored.
        let _ = std::thread::spawn(move || {
            for_thread.block_on(async {
                tokio::spawn(async {
                    panic!("boom");
                });

                futures::future::pending::<()>().await;
            })
        })
        .join();

        let late_task = owner.spawn(async {});
        let outcome = futures::executor::block_on(late_task);
        assert!(outcome.is_err());
    }

    /// Every thread blocked in `block_on` on the same runtime must panic once
    /// a spawned task panics.
    #[test]
    #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support panic recovery")]
    fn shutdown_all_concurrent_block_on() {
        const N: usize = 2;

        let runtime = Arc::new(shutdown_on_panic_rt());
        let (ready_tx, ready_rx) = mpsc::channel();

        let workers: Vec<_> = (0..N)
            .map(|_| {
                let runtime = Arc::clone(&runtime);
                let ready_tx = ready_tx.clone();
                std::thread::spawn(move || {
                    runtime.block_on(async {
                        // Report that this thread is inside `block_on`, then park.
                        ready_tx.send(()).unwrap();
                        futures::future::pending::<()>().await;
                    });
                })
            })
            .collect();

        // Wait until every worker has entered `block_on`.
        for _ in 0..N {
            ready_rx.recv().unwrap();
        }

        runtime.spawn(async {
            panic!("boom");
        });

        for worker in workers {
            assert!(worker.join().is_err());
        }
    }

    /// Two runtimes built from the same seed must produce the same RNG values.
    #[test]
    fn rng_seed() {
        let rt1 = seeded_rt();
        let rt1_values = rt1.block_on(async { two_rand_values() });

        let rt2 = seeded_rt();
        let rt2_values = rt2.block_on(async { two_rand_values() });

        assert_eq!(rt1_values, rt2_values);
    }

    /// The RNG sequence must also match across runtimes when each runtime is
    /// entered more than once.
    #[test]
    fn rng_seed_multi_enter() {
        let rt1 = seeded_rt();
        let rt1_first = rt1.block_on(async { two_rand_values() });
        let rt1_second = rt1.block_on(async { two_rand_values() });

        let rt2 = seeded_rt();
        let rt2_first = rt2.block_on(async { two_rand_values() });
        let rt2_second = rt2.block_on(async { two_rand_values() });

        assert_eq!(rt1_first, rt2_first);
        assert_eq!(rt1_second, rt2_second);
    }
}
442 
rt() -> Runtime443 fn rt() -> Runtime {
444     tokio::runtime::Builder::new_current_thread()
445         .enable_all()
446         .build()
447         .unwrap()
448 }
449