• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "full")]
3 
4 use futures::{
5     future::{pending, ready},
6     FutureExt,
7 };
8 
9 use tokio::runtime;
10 use tokio::sync::{mpsc, oneshot};
11 use tokio::task::{self, LocalSet};
12 use tokio::time;
13 
14 #[cfg(not(tokio_wasi))]
15 use std::cell::Cell;
16 use std::sync::atomic::AtomicBool;
17 #[cfg(not(tokio_wasi))]
18 use std::sync::atomic::AtomicUsize;
19 use std::sync::atomic::Ordering;
20 #[cfg(not(tokio_wasi))]
21 use std::sync::atomic::Ordering::SeqCst;
22 use std::time::Duration;
23 
24 #[tokio::test(flavor = "current_thread")]
local_current_thread_scheduler()25 async fn local_current_thread_scheduler() {
26     LocalSet::new()
27         .run_until(async {
28             task::spawn_local(async {}).await.unwrap();
29         })
30         .await;
31 }
32 
33 #[cfg(not(tokio_wasi))] // Wasi doesn't support threads
34 #[tokio::test(flavor = "multi_thread")]
local_threadpool()35 async fn local_threadpool() {
36     thread_local! {
37         static ON_RT_THREAD: Cell<bool> = Cell::new(false);
38     }
39 
40     ON_RT_THREAD.with(|cell| cell.set(true));
41 
42     LocalSet::new()
43         .run_until(async {
44             assert!(ON_RT_THREAD.with(|cell| cell.get()));
45             task::spawn_local(async {
46                 assert!(ON_RT_THREAD.with(|cell| cell.get()));
47             })
48             .await
49             .unwrap();
50         })
51         .await;
52 }
53 
54 #[cfg(not(tokio_wasi))] // Wasi doesn't support threads
55 #[tokio::test(flavor = "multi_thread")]
localset_future_threadpool()56 async fn localset_future_threadpool() {
57     thread_local! {
58         static ON_LOCAL_THREAD: Cell<bool> = Cell::new(false);
59     }
60 
61     ON_LOCAL_THREAD.with(|cell| cell.set(true));
62 
63     let local = LocalSet::new();
64     local.spawn_local(async move {
65         assert!(ON_LOCAL_THREAD.with(|cell| cell.get()));
66     });
67     local.await;
68 }
69 
70 #[cfg(not(tokio_wasi))] // Wasi doesn't support threads
71 #[tokio::test(flavor = "multi_thread")]
localset_future_timers()72 async fn localset_future_timers() {
73     static RAN1: AtomicBool = AtomicBool::new(false);
74     static RAN2: AtomicBool = AtomicBool::new(false);
75 
76     let local = LocalSet::new();
77     local.spawn_local(async move {
78         time::sleep(Duration::from_millis(5)).await;
79         RAN1.store(true, Ordering::SeqCst);
80     });
81     local.spawn_local(async move {
82         time::sleep(Duration::from_millis(10)).await;
83         RAN2.store(true, Ordering::SeqCst);
84     });
85     local.await;
86     assert!(RAN1.load(Ordering::SeqCst));
87     assert!(RAN2.load(Ordering::SeqCst));
88 }
89 
90 #[tokio::test]
localset_future_drives_all_local_futs()91 async fn localset_future_drives_all_local_futs() {
92     static RAN1: AtomicBool = AtomicBool::new(false);
93     static RAN2: AtomicBool = AtomicBool::new(false);
94     static RAN3: AtomicBool = AtomicBool::new(false);
95 
96     let local = LocalSet::new();
97     local.spawn_local(async move {
98         task::spawn_local(async {
99             task::yield_now().await;
100             RAN3.store(true, Ordering::SeqCst);
101         });
102         task::yield_now().await;
103         RAN1.store(true, Ordering::SeqCst);
104     });
105     local.spawn_local(async move {
106         task::yield_now().await;
107         RAN2.store(true, Ordering::SeqCst);
108     });
109     local.await;
110     assert!(RAN1.load(Ordering::SeqCst));
111     assert!(RAN2.load(Ordering::SeqCst));
112     assert!(RAN3.load(Ordering::SeqCst));
113 }
114 
115 #[cfg(not(tokio_wasi))] // Wasi doesn't support threads
116 #[tokio::test(flavor = "multi_thread")]
local_threadpool_timer()117 async fn local_threadpool_timer() {
118     // This test ensures that runtime services like the timer are properly
119     // set for the local task set.
120     thread_local! {
121         static ON_RT_THREAD: Cell<bool> = Cell::new(false);
122     }
123 
124     ON_RT_THREAD.with(|cell| cell.set(true));
125 
126     LocalSet::new()
127         .run_until(async {
128             assert!(ON_RT_THREAD.with(|cell| cell.get()));
129             let join = task::spawn_local(async move {
130                 assert!(ON_RT_THREAD.with(|cell| cell.get()));
131                 time::sleep(Duration::from_millis(10)).await;
132                 assert!(ON_RT_THREAD.with(|cell| cell.get()));
133             });
134             join.await.unwrap();
135         })
136         .await;
137 }
/// Calls `spawn_local` while only the `LocalSet::enter` guard is held (no
/// runtime has been built yet), then drives the set with `block_on` and
/// checks the value returned by the spawned task.
#[test]
fn enter_guard_spawn() {
    let set = LocalSet::new();
    let _entered = set.enter();

    let handle = task::spawn_local(async { true });

    // Run the local task set.
    let runtime = runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    set.block_on(&runtime, async move {
        let value = handle.await.unwrap();
        assert!(value);
    });
}
153 
#[cfg(not(tokio_wasi))] // Wasi doesn't support panic recovery
#[test]
// This will panic, since the thread that calls `block_on` cannot use
// in-place blocking inside of `block_on`.
#[should_panic]
fn local_threadpool_blocking_in_place() {
    thread_local! {
        static ON_RT_THREAD: Cell<bool> = Cell::new(false);
    }

    /// Reads the flag for whichever thread calls this.
    fn on_rt_thread() -> bool {
        ON_RT_THREAD.with(|flag| flag.get())
    }

    // Mark the thread that will call `block_on`.
    ON_RT_THREAD.with(|flag| flag.set(true));

    let runtime = runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let set = LocalSet::new();
    set.block_on(&runtime, async {
        assert!(on_rt_thread());
        let handle = task::spawn_local(async move {
            assert!(on_rt_thread());
            task::block_in_place(|| {});
            assert!(on_rt_thread());
        });
        handle.await.unwrap();
    });
}
180 
181 #[cfg(not(tokio_wasi))] // Wasi doesn't support threads
182 #[tokio::test(flavor = "multi_thread")]
local_threadpool_blocking_run()183 async fn local_threadpool_blocking_run() {
184     thread_local! {
185         static ON_RT_THREAD: Cell<bool> = Cell::new(false);
186     }
187 
188     ON_RT_THREAD.with(|cell| cell.set(true));
189 
190     LocalSet::new()
191         .run_until(async {
192             assert!(ON_RT_THREAD.with(|cell| cell.get()));
193             let join = task::spawn_local(async move {
194                 assert!(ON_RT_THREAD.with(|cell| cell.get()));
195                 task::spawn_blocking(|| {
196                     assert!(
197                         !ON_RT_THREAD.with(|cell| cell.get()),
198                         "blocking must not run on the local task set's thread"
199                     );
200                 })
201                 .await
202                 .unwrap();
203                 assert!(ON_RT_THREAD.with(|cell| cell.get()));
204             });
205             join.await.unwrap();
206         })
207         .await;
208 }
209 
210 #[cfg(not(tokio_wasi))] // Wasi doesn't support threads
211 #[tokio::test(flavor = "multi_thread")]
all_spawns_are_local()212 async fn all_spawns_are_local() {
213     use futures::future;
214     thread_local! {
215         static ON_RT_THREAD: Cell<bool> = Cell::new(false);
216     }
217 
218     ON_RT_THREAD.with(|cell| cell.set(true));
219 
220     LocalSet::new()
221         .run_until(async {
222             assert!(ON_RT_THREAD.with(|cell| cell.get()));
223             let handles = (0..128)
224                 .map(|_| {
225                     task::spawn_local(async {
226                         assert!(ON_RT_THREAD.with(|cell| cell.get()));
227                     })
228                 })
229                 .collect::<Vec<_>>();
230             for joined in future::join_all(handles).await {
231                 joined.unwrap();
232             }
233         })
234         .await;
235 }
236 
237 #[cfg(not(tokio_wasi))] // Wasi doesn't support threads
238 #[tokio::test(flavor = "multi_thread")]
nested_spawn_is_local()239 async fn nested_spawn_is_local() {
240     thread_local! {
241         static ON_RT_THREAD: Cell<bool> = Cell::new(false);
242     }
243 
244     ON_RT_THREAD.with(|cell| cell.set(true));
245 
246     LocalSet::new()
247         .run_until(async {
248             assert!(ON_RT_THREAD.with(|cell| cell.get()));
249             task::spawn_local(async {
250                 assert!(ON_RT_THREAD.with(|cell| cell.get()));
251                 task::spawn_local(async {
252                     assert!(ON_RT_THREAD.with(|cell| cell.get()));
253                     task::spawn_local(async {
254                         assert!(ON_RT_THREAD.with(|cell| cell.get()));
255                         task::spawn_local(async {
256                             assert!(ON_RT_THREAD.with(|cell| cell.get()));
257                         })
258                         .await
259                         .unwrap();
260                     })
261                     .await
262                     .unwrap();
263                 })
264                 .await
265                 .unwrap();
266             })
267             .await
268             .unwrap();
269         })
270         .await;
271 }
272 
/// Awaits a local task's join handle from a task created with `task::spawn`.
/// The local task asserts it sees the thread-local flag; the spawned task
/// asserts it does not.
#[cfg(not(tokio_wasi))] // Wasi doesn't support threads
#[test]
fn join_local_future_elsewhere() {
    thread_local! {
        static ON_RT_THREAD: Cell<bool> = Cell::new(false);
    }

    /// Reads the flag for whichever thread calls this.
    fn on_rt_thread() -> bool {
        ON_RT_THREAD.with(|flag| flag.get())
    }

    // Mark the thread that will call `block_on`.
    ON_RT_THREAD.with(|flag| flag.set(true));

    let runtime = runtime::Runtime::new().unwrap();
    let set = LocalSet::new();
    set.block_on(&runtime, async move {
        let (release_tx, release_rx) = oneshot::channel();

        let local_handle = task::spawn_local(async move {
            assert!(
                on_rt_thread(),
                "local task must run on local thread, no matter where it is awaited"
            );
            // Stay alive until the other task signals us.
            release_rx.await.unwrap();

            "hello world"
        });

        let remote_handle = task::spawn(async move {
            assert!(!on_rt_thread(), "spawned task should be on a worker");

            release_tx
                .send(())
                .expect("task shouldn't have ended yet");

            local_handle
                .await
                .expect("task should complete successfully");
        });

        remote_handle.await.unwrap()
    });
}
308 
309 // Tests for <https://github.com/tokio-rs/tokio/issues/4973>
310 #[cfg(not(tokio_wasi))] // Wasi doesn't support threads
311 #[tokio::test(flavor = "multi_thread")]
localset_in_thread_local()312 async fn localset_in_thread_local() {
313     thread_local! {
314         static LOCAL_SET: LocalSet = LocalSet::new();
315     }
316 
317     // holds runtime thread until end of main fn.
318     let (_tx, rx) = oneshot::channel::<()>();
319     let handle = tokio::runtime::Handle::current();
320 
321     std::thread::spawn(move || {
322         LOCAL_SET.with(|local_set| {
323             handle.block_on(local_set.run_until(async move {
324                 let _ = rx.await;
325             }))
326         });
327     });
328 }
329 
/// Moves an `Rc` clone into a local task that never completes, then drops
/// the `LocalSet` and runtime and asserts that the clone was released.
#[test]
fn drop_cancels_tasks() {
    use std::rc::Rc;

    // This test reproduces issue #1842
    let runtime = rt();
    let tracker = Rc::new(());
    let held_by_task = Rc::clone(&tracker);

    let (started_tx, started_rx) = oneshot::channel();

    let set = LocalSet::new();
    set.spawn_local(async move {
        // Move this in
        let _held = held_by_task;

        started_tx.send(()).unwrap();
        futures::future::pending::<()>().await;
    });

    // Drive the set only until the task reports that it has started.
    set.block_on(&runtime, async {
        started_rx.await.unwrap();
    });
    drop(set);
    drop(runtime);

    // Only `tracker` itself may still hold a reference.
    assert_eq!(1, Rc::strong_count(&tracker));
}
358 
/// Runs a test function in a separate thread, and panics if the test does not
/// complete within the specified timeout, or if the test function panics.
///
/// This is intended for running tests whose failure mode is a hang or infinite
/// loop that cannot be detected otherwise.
///
/// `timeout` is the longest time to wait for `f` to report completion; `f` is
/// the test body, run on a freshly spawned thread.
///
/// # Panics
///
/// Panics if no completion message arrives within `timeout`, or if the thread
/// running `f` panicked.
fn with_timeout(timeout: Duration, f: impl FnOnce() + Send + 'static) {
    use std::sync::mpsc::RecvTimeoutError;

    let (done_tx, done_rx) = std::sync::mpsc::channel();
    let thread = std::thread::spawn(move || {
        f();

        // Send a message on the channel so that the test thread can
        // determine if we have entered an infinite loop:
        done_tx.send(()).unwrap();
    });

    // Since the failure mode of this test is an infinite loop, rather than
    // something we can easily make assertions about, we'll run it in a
    // thread. When the test thread finishes, it will send a message on a
    // channel to this thread. We'll wait for that message with the
    // caller-supplied timeout, and if we don't receive it, we assume the
    // test thread has hung.
    match done_rx.recv_timeout(timeout) {
        // `Duration`'s `Debug` output already carries its unit (e.g. `60s`),
        // so the message must not append another one.
        Err(RecvTimeoutError::Timeout) => panic!(
            "test did not complete within {:?}, \
             we have (probably) entered an infinite loop!",
            timeout,
        ),
        // Did the test thread panic? We'll find out for sure when we `join`
        // with it.
        Err(RecvTimeoutError::Disconnected) => {}
        // Test completed successfully!
        Ok(()) => {}
    }

    thread.join().expect("test thread should not panic!")
}
400 
#[cfg_attr(tokio_wasi, ignore = "`unwrap()` in `with_timeout()` panics on Wasi")]
#[test]
fn drop_cancels_remote_tasks() {
    // This test reproduces issue #1885.
    with_timeout(Duration::from_secs(60), || {
        let (sender, mut receiver) = mpsc::channel::<()>(1024);

        let runtime = rt();

        let set = LocalSet::new();
        // Local task that keeps receiving until the channel closes.
        set.spawn_local(async move {
            loop {
                if receiver.recv().await.is_none() {
                    break;
                }
            }
        });
        set.block_on(&runtime, async {
            time::sleep(Duration::from_millis(1)).await;
        });

        drop(sender);

        // This enters an infinite loop if the remote notified tasks are not
        // properly cancelled.
        drop(set);
    });
}
423 
#[cfg_attr(
    tokio_wasi,
    ignore = "FIXME: `task::spawn_local().await.unwrap()` panics on Wasi"
)]
#[test]
fn local_tasks_wake_join_all() {
    // This test reproduces issue #2460.
    with_timeout(Duration::from_secs(60), || {
        use futures::future::join_all;
        use tokio::task::LocalSet;

        let runtime = rt();
        let set = LocalSet::new();

        // 128 local tasks, each of which spawns and awaits a nested local
        // task.
        let handles: Vec<_> = (0..128)
            .map(|_| {
                set.spawn_local(async move {
                    tokio::task::spawn_local(async move {}).await.unwrap();
                })
            })
            .collect();

        let joined = join_all(handles);
        runtime.block_on(set.run_until(joined));
    });
}
448 
#[cfg(not(tokio_wasi))] // Wasi doesn't support panic recovery
#[test]
fn local_tasks_are_polled_after_tick() {
    // This test depends on timing, so we run it up to five times: four
    // attempts with panics caught, then one final attempt without.
    const CAUGHT_ATTEMPTS: usize = 4;

    // `any` stops at the first attempt that does not panic.
    let succeeded = (0..CAUGHT_ATTEMPTS)
        .any(|_| std::panic::catch_unwind(local_tasks_are_polled_after_tick_inner).is_ok());

    if !succeeded {
        // Test failed 4 times. Try one more time without catching panics. If
        // it fails again, the test fails.
        local_tasks_are_polled_after_tick_inner();
    }
}
465 
466 #[cfg(not(tokio_wasi))] // Wasi doesn't support panic recovery
467 #[tokio::main(flavor = "current_thread")]
local_tasks_are_polled_after_tick_inner()468 async fn local_tasks_are_polled_after_tick_inner() {
469     // Reproduces issues #1899 and #1900
470 
471     static RX1: AtomicUsize = AtomicUsize::new(0);
472     static RX2: AtomicUsize = AtomicUsize::new(0);
473     const EXPECTED: usize = 500;
474 
475     RX1.store(0, SeqCst);
476     RX2.store(0, SeqCst);
477 
478     let (tx, mut rx) = mpsc::unbounded_channel();
479 
480     let local = LocalSet::new();
481 
482     local
483         .run_until(async {
484             let task2 = task::spawn(async move {
485                 // Wait a bit
486                 time::sleep(Duration::from_millis(10)).await;
487 
488                 let mut oneshots = Vec::with_capacity(EXPECTED);
489 
490                 // Send values
491                 for _ in 0..EXPECTED {
492                     let (oneshot_tx, oneshot_rx) = oneshot::channel();
493                     oneshots.push(oneshot_tx);
494                     tx.send(oneshot_rx).unwrap();
495                 }
496 
497                 time::sleep(Duration::from_millis(10)).await;
498 
499                 for tx in oneshots.drain(..) {
500                     tx.send(()).unwrap();
501                 }
502 
503                 loop {
504                     time::sleep(Duration::from_millis(20)).await;
505                     let rx1 = RX1.load(SeqCst);
506                     let rx2 = RX2.load(SeqCst);
507 
508                     if rx1 == EXPECTED && rx2 == EXPECTED {
509                         break;
510                     }
511                 }
512             });
513 
514             while let Some(oneshot) = rx.recv().await {
515                 RX1.fetch_add(1, SeqCst);
516 
517                 task::spawn_local(async move {
518                     oneshot.await.unwrap();
519                     RX2.fetch_add(1, SeqCst);
520                 });
521             }
522 
523             task2.await.unwrap();
524         })
525         .await;
526 }
527 
528 #[tokio::test]
acquire_mutex_in_drop()529 async fn acquire_mutex_in_drop() {
530     use futures::future::pending;
531 
532     let (tx1, rx1) = oneshot::channel();
533     let (tx2, rx2) = oneshot::channel();
534     let local = LocalSet::new();
535 
536     local.spawn_local(async move {
537         let _ = rx2.await;
538         unreachable!();
539     });
540 
541     local.spawn_local(async move {
542         let _ = rx1.await;
543         tx2.send(()).unwrap();
544         unreachable!();
545     });
546 
547     // Spawn a task that will never notify
548     local.spawn_local(async move {
549         pending::<()>().await;
550         tx1.send(()).unwrap();
551     });
552 
553     // Tick the loop
554     local
555         .run_until(async {
556             task::yield_now().await;
557         })
558         .await;
559 
560     // Drop the LocalSet
561     drop(local);
562 }
563 
564 #[tokio::test]
spawn_wakes_localset()565 async fn spawn_wakes_localset() {
566     let local = LocalSet::new();
567     futures::select! {
568         _ = local.run_until(pending::<()>()).fuse() => unreachable!(),
569         ret = async { local.spawn_local(ready(())).await.unwrap()}.fuse() => ret
570     }
571 }
572 
/// Stores a current-thread runtime together with a `LocalSet` in a
/// thread-local and spawns a local task through it.
#[test]
fn store_local_set_in_thread_local_with_runtime() {
    use tokio::runtime::Runtime;

    /// A runtime paired with the `LocalSet` it drives.
    struct RtAndLocalSet {
        rt: Runtime,
        local: LocalSet,
    }

    impl RtAndLocalSet {
        /// Builds a current-thread runtime and an empty `LocalSet`.
        fn new() -> Self {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            let local = LocalSet::new();
            Self { rt, local }
        }

        /// Spawns an empty local task from inside `run_until`.
        async fn spawn_on_local(&self) {
            let spawn_one = async move {
                tokio::task::spawn_local(async {});
            };
            self.local.run_until(spawn_one).await
        }

        /// Blocks the owned runtime on `spawn_on_local`.
        fn run(&self) {
            self.rt.block_on(self.spawn_on_local());
        }
    }

    thread_local! {
        static CURRENT: RtAndLocalSet = RtAndLocalSet::new();
    }

    CURRENT.with(|current| {
        current.run();
    });
}
614 
#[cfg(tokio_unstable)]
mod unstable {
    use tokio::runtime::UnhandledPanic;
    use tokio::task::LocalSet;

    /// A panicking local task on a `LocalSet` configured with
    /// `UnhandledPanic::ShutdownRuntime` must make `run_until` panic with the
    /// expected message.
    #[tokio::test]
    #[should_panic(
        expected = "a spawned task panicked and the LocalSet is configured to shutdown on unhandled panic"
    )]
    async fn shutdown_on_panic() {
        LocalSet::new()
            .unhandled_panic(UnhandledPanic::ShutdownRuntime)
            .run_until(async {
                // The join handle is dropped; nothing awaits this task.
                tokio::task::spawn_local(async {
                    panic!("boom");
                });

                // Never completes on its own.
                futures::future::pending::<()>().await;
            })
            .await;
    }

    // This test compares that, when the task driving `run_until` has already
    // consumed budget, the `run_until` future has less budget than a "spawned"
    // task.
    //
    // "Budget" is a fuzzy metric as the Tokio runtime is able to change values
    // internally. This is why the test uses indirection to test this.
    #[tokio::test]
    async fn run_until_does_not_get_own_budget() {
        // Consume some budget
        tokio::task::consume_budget().await;

        LocalSet::new()
            .run_until(async {
                let spawned_task = tokio::spawn(async {
                    let mut spawned_iterations = 0;

                    {
                        // The inner scope ends the mutable borrow of the
                        // counter before it is returned.
                        let mut budget_loop = tokio_test::task::spawn(async {
                            loop {
                                spawned_iterations += 1;
                                tokio::task::consume_budget().await;
                            }
                        });
                        // Poll once
                        assert!(!budget_loop.poll().is_ready());
                    }

                    spawned_iterations
                });

                let mut run_until_iterations = 0;
                {
                    let mut budget_loop = tokio_test::task::spawn(async {
                        loop {
                            run_until_iterations += 1;
                            tokio::task::consume_budget().await;
                        }
                    });
                    // Poll once
                    assert!(!budget_loop.poll().is_ready());
                }

                let spawned_iterations = spawned_task.await.unwrap();
                assert_ne!(spawned_iterations, 0);
                assert_ne!(run_until_iterations, 0);
                assert!(spawned_iterations > run_until_iterations);
            })
            .await
    }
}
687 
rt() -> runtime::Runtime688 fn rt() -> runtime::Runtime {
689     tokio::runtime::Builder::new_current_thread()
690         .enable_all()
691         .build()
692         .unwrap()
693 }
694