1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "full")]
3
4 use futures::{
5 future::{pending, ready},
6 FutureExt,
7 };
8
9 use tokio::runtime;
10 use tokio::sync::{mpsc, oneshot};
11 use tokio::task::{self, LocalSet};
12 use tokio::time;
13
14 #[cfg(not(target_os = "wasi"))]
15 use std::cell::Cell;
16 use std::sync::atomic::AtomicBool;
17 #[cfg(not(target_os = "wasi"))]
18 use std::sync::atomic::AtomicUsize;
19 use std::sync::atomic::Ordering;
20 #[cfg(not(target_os = "wasi"))]
21 use std::sync::atomic::Ordering::SeqCst;
22 use std::time::Duration;
23
24 #[tokio::test(flavor = "current_thread")]
local_current_thread_scheduler()25 async fn local_current_thread_scheduler() {
26 LocalSet::new()
27 .run_until(async {
28 task::spawn_local(async {}).await.unwrap();
29 })
30 .await;
31 }
32
33 #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
34 #[tokio::test(flavor = "multi_thread")]
local_threadpool()35 async fn local_threadpool() {
36 thread_local! {
37 static ON_RT_THREAD: Cell<bool> = Cell::new(false);
38 }
39
40 ON_RT_THREAD.with(|cell| cell.set(true));
41
42 LocalSet::new()
43 .run_until(async {
44 assert!(ON_RT_THREAD.with(|cell| cell.get()));
45 task::spawn_local(async {
46 assert!(ON_RT_THREAD.with(|cell| cell.get()));
47 })
48 .await
49 .unwrap();
50 })
51 .await;
52 }
53
54 #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
55 #[tokio::test(flavor = "multi_thread")]
localset_future_threadpool()56 async fn localset_future_threadpool() {
57 thread_local! {
58 static ON_LOCAL_THREAD: Cell<bool> = Cell::new(false);
59 }
60
61 ON_LOCAL_THREAD.with(|cell| cell.set(true));
62
63 let local = LocalSet::new();
64 local.spawn_local(async move {
65 assert!(ON_LOCAL_THREAD.with(|cell| cell.get()));
66 });
67 local.await;
68 }
69
70 #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
71 #[tokio::test(flavor = "multi_thread")]
localset_future_timers()72 async fn localset_future_timers() {
73 static RAN1: AtomicBool = AtomicBool::new(false);
74 static RAN2: AtomicBool = AtomicBool::new(false);
75
76 let local = LocalSet::new();
77 local.spawn_local(async move {
78 time::sleep(Duration::from_millis(5)).await;
79 RAN1.store(true, Ordering::SeqCst);
80 });
81 local.spawn_local(async move {
82 time::sleep(Duration::from_millis(10)).await;
83 RAN2.store(true, Ordering::SeqCst);
84 });
85 local.await;
86 assert!(RAN1.load(Ordering::SeqCst));
87 assert!(RAN2.load(Ordering::SeqCst));
88 }
89
90 #[tokio::test]
localset_future_drives_all_local_futs()91 async fn localset_future_drives_all_local_futs() {
92 static RAN1: AtomicBool = AtomicBool::new(false);
93 static RAN2: AtomicBool = AtomicBool::new(false);
94 static RAN3: AtomicBool = AtomicBool::new(false);
95
96 let local = LocalSet::new();
97 local.spawn_local(async move {
98 task::spawn_local(async {
99 task::yield_now().await;
100 RAN3.store(true, Ordering::SeqCst);
101 });
102 task::yield_now().await;
103 RAN1.store(true, Ordering::SeqCst);
104 });
105 local.spawn_local(async move {
106 task::yield_now().await;
107 RAN2.store(true, Ordering::SeqCst);
108 });
109 local.await;
110 assert!(RAN1.load(Ordering::SeqCst));
111 assert!(RAN2.load(Ordering::SeqCst));
112 assert!(RAN3.load(Ordering::SeqCst));
113 }
114
115 #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
116 #[tokio::test(flavor = "multi_thread")]
local_threadpool_timer()117 async fn local_threadpool_timer() {
118 // This test ensures that runtime services like the timer are properly
119 // set for the local task set.
120 thread_local! {
121 static ON_RT_THREAD: Cell<bool> = Cell::new(false);
122 }
123
124 ON_RT_THREAD.with(|cell| cell.set(true));
125
126 LocalSet::new()
127 .run_until(async {
128 assert!(ON_RT_THREAD.with(|cell| cell.get()));
129 let join = task::spawn_local(async move {
130 assert!(ON_RT_THREAD.with(|cell| cell.get()));
131 time::sleep(Duration::from_millis(10)).await;
132 assert!(ON_RT_THREAD.with(|cell| cell.get()));
133 });
134 join.await.unwrap();
135 })
136 .await;
137 }
138 #[test]
enter_guard_spawn()139 fn enter_guard_spawn() {
140 let local = LocalSet::new();
141 let _guard = local.enter();
142 // Run the local task set.
143
144 let join = task::spawn_local(async { true });
145 let rt = runtime::Builder::new_current_thread()
146 .enable_all()
147 .build()
148 .unwrap();
149 local.block_on(&rt, async move {
150 assert!(join.await.unwrap());
151 });
152 }
153
154 #[cfg(not(target_os = "wasi"))] // Wasi doesn't support panic recovery
155 #[test]
156 // This will panic, since the thread that calls `block_on` cannot use
157 // in-place blocking inside of `block_on`.
158 #[should_panic]
local_threadpool_blocking_in_place()159 fn local_threadpool_blocking_in_place() {
160 thread_local! {
161 static ON_RT_THREAD: Cell<bool> = Cell::new(false);
162 }
163
164 ON_RT_THREAD.with(|cell| cell.set(true));
165
166 let rt = runtime::Builder::new_current_thread()
167 .enable_all()
168 .build()
169 .unwrap();
170 LocalSet::new().block_on(&rt, async {
171 assert!(ON_RT_THREAD.with(|cell| cell.get()));
172 let join = task::spawn_local(async move {
173 assert!(ON_RT_THREAD.with(|cell| cell.get()));
174 task::block_in_place(|| {});
175 assert!(ON_RT_THREAD.with(|cell| cell.get()));
176 });
177 join.await.unwrap();
178 });
179 }
180
181 #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
182 #[tokio::test(flavor = "multi_thread")]
local_threadpool_blocking_run()183 async fn local_threadpool_blocking_run() {
184 thread_local! {
185 static ON_RT_THREAD: Cell<bool> = Cell::new(false);
186 }
187
188 ON_RT_THREAD.with(|cell| cell.set(true));
189
190 LocalSet::new()
191 .run_until(async {
192 assert!(ON_RT_THREAD.with(|cell| cell.get()));
193 let join = task::spawn_local(async move {
194 assert!(ON_RT_THREAD.with(|cell| cell.get()));
195 task::spawn_blocking(|| {
196 assert!(
197 !ON_RT_THREAD.with(|cell| cell.get()),
198 "blocking must not run on the local task set's thread"
199 );
200 })
201 .await
202 .unwrap();
203 assert!(ON_RT_THREAD.with(|cell| cell.get()));
204 });
205 join.await.unwrap();
206 })
207 .await;
208 }
209
210 #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
211 #[tokio::test(flavor = "multi_thread")]
all_spawns_are_local()212 async fn all_spawns_are_local() {
213 use futures::future;
214 thread_local! {
215 static ON_RT_THREAD: Cell<bool> = Cell::new(false);
216 }
217
218 ON_RT_THREAD.with(|cell| cell.set(true));
219
220 LocalSet::new()
221 .run_until(async {
222 assert!(ON_RT_THREAD.with(|cell| cell.get()));
223 let handles = (0..128)
224 .map(|_| {
225 task::spawn_local(async {
226 assert!(ON_RT_THREAD.with(|cell| cell.get()));
227 })
228 })
229 .collect::<Vec<_>>();
230 for joined in future::join_all(handles).await {
231 joined.unwrap();
232 }
233 })
234 .await;
235 }
236
237 #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
238 #[tokio::test(flavor = "multi_thread")]
nested_spawn_is_local()239 async fn nested_spawn_is_local() {
240 thread_local! {
241 static ON_RT_THREAD: Cell<bool> = Cell::new(false);
242 }
243
244 ON_RT_THREAD.with(|cell| cell.set(true));
245
246 LocalSet::new()
247 .run_until(async {
248 assert!(ON_RT_THREAD.with(|cell| cell.get()));
249 task::spawn_local(async {
250 assert!(ON_RT_THREAD.with(|cell| cell.get()));
251 task::spawn_local(async {
252 assert!(ON_RT_THREAD.with(|cell| cell.get()));
253 task::spawn_local(async {
254 assert!(ON_RT_THREAD.with(|cell| cell.get()));
255 task::spawn_local(async {
256 assert!(ON_RT_THREAD.with(|cell| cell.get()));
257 })
258 .await
259 .unwrap();
260 })
261 .await
262 .unwrap();
263 })
264 .await
265 .unwrap();
266 })
267 .await
268 .unwrap();
269 })
270 .await;
271 }
272
273 #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
274 #[test]
join_local_future_elsewhere()275 fn join_local_future_elsewhere() {
276 thread_local! {
277 static ON_RT_THREAD: Cell<bool> = Cell::new(false);
278 }
279
280 ON_RT_THREAD.with(|cell| cell.set(true));
281
282 let rt = runtime::Runtime::new().unwrap();
283 let local = LocalSet::new();
284 local.block_on(&rt, async move {
285 let (tx, rx) = oneshot::channel();
286 let join = task::spawn_local(async move {
287 assert!(
288 ON_RT_THREAD.with(|cell| cell.get()),
289 "local task must run on local thread, no matter where it is awaited"
290 );
291 rx.await.unwrap();
292
293 "hello world"
294 });
295 let join2 = task::spawn(async move {
296 assert!(
297 !ON_RT_THREAD.with(|cell| cell.get()),
298 "spawned task should be on a worker"
299 );
300
301 tx.send(()).expect("task shouldn't have ended yet");
302
303 join.await.expect("task should complete successfully");
304 });
305 join2.await.unwrap()
306 });
307 }
308
309 // Tests for <https://github.com/tokio-rs/tokio/issues/4973>
310 #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
311 #[tokio::test(flavor = "multi_thread")]
localset_in_thread_local()312 async fn localset_in_thread_local() {
313 thread_local! {
314 static LOCAL_SET: LocalSet = LocalSet::new();
315 }
316
317 // holds runtime thread until end of main fn.
318 let (_tx, rx) = oneshot::channel::<()>();
319 let handle = tokio::runtime::Handle::current();
320
321 std::thread::spawn(move || {
322 LOCAL_SET.with(|local_set| {
323 handle.block_on(local_set.run_until(async move {
324 let _ = rx.await;
325 }))
326 });
327 });
328 }
329
330 #[test]
drop_cancels_tasks()331 fn drop_cancels_tasks() {
332 use std::rc::Rc;
333
334 // This test reproduces issue #1842
335 let rt = rt();
336 let rc1 = Rc::new(());
337 let rc2 = rc1.clone();
338
339 let (started_tx, started_rx) = oneshot::channel();
340
341 let local = LocalSet::new();
342 local.spawn_local(async move {
343 // Move this in
344 let _rc2 = rc2;
345
346 started_tx.send(()).unwrap();
347 futures::future::pending::<()>().await;
348 });
349
350 local.block_on(&rt, async {
351 started_rx.await.unwrap();
352 });
353 drop(local);
354 drop(rt);
355
356 assert_eq!(1, Rc::strong_count(&rc1));
357 }
358
359 /// Runs a test function in a separate thread, and panics if the test does not
360 /// complete within the specified timeout, or if the test function panics.
361 ///
362 /// This is intended for running tests whose failure mode is a hang or infinite
363 /// loop that cannot be detected otherwise.
with_timeout(timeout: Duration, f: impl FnOnce() + Send + 'static)364 fn with_timeout(timeout: Duration, f: impl FnOnce() + Send + 'static) {
365 use std::sync::mpsc::RecvTimeoutError;
366
367 let (done_tx, done_rx) = std::sync::mpsc::channel();
368 let thread = std::thread::spawn(move || {
369 f();
370
371 // Send a message on the channel so that the test thread can
372 // determine if we have entered an infinite loop:
373 done_tx.send(()).unwrap();
374 });
375
376 // Since the failure mode of this test is an infinite loop, rather than
377 // something we can easily make assertions about, we'll run it in a
378 // thread. When the test thread finishes, it will send a message on a
379 // channel to this thread. We'll wait for that message with a fairly
380 // generous timeout, and if we don't receive it, we assume the test
381 // thread has hung.
382 //
383 // Note that it should definitely complete in under a minute, but just
384 // in case CI is slow, we'll give it a long timeout.
385 match done_rx.recv_timeout(timeout) {
386 Err(RecvTimeoutError::Timeout) => panic!(
387 "test did not complete within {:?} seconds, \
388 we have (probably) entered an infinite loop!",
389 timeout,
390 ),
391 // Did the test thread panic? We'll find out for sure when we `join`
392 // with it.
393 Err(RecvTimeoutError::Disconnected) => {}
394 // Test completed successfully!
395 Ok(()) => {}
396 }
397
398 thread.join().expect("test thread should not panic!")
399 }
400
401 #[cfg_attr(
402 target_os = "wasi",
403 ignore = "`unwrap()` in `with_timeout()` panics on Wasi"
404 )]
405 #[test]
drop_cancels_remote_tasks()406 fn drop_cancels_remote_tasks() {
407 // This test reproduces issue #1885.
408 with_timeout(Duration::from_secs(60), || {
409 let (tx, mut rx) = mpsc::channel::<()>(1024);
410
411 let rt = rt();
412
413 let local = LocalSet::new();
414 local.spawn_local(async move { while rx.recv().await.is_some() {} });
415 local.block_on(&rt, async {
416 time::sleep(Duration::from_millis(1)).await;
417 });
418
419 drop(tx);
420
421 // This enters an infinite loop if the remote notified tasks are not
422 // properly cancelled.
423 drop(local);
424 });
425 }
426
427 #[cfg_attr(
428 target_os = "wasi",
429 ignore = "FIXME: `task::spawn_local().await.unwrap()` panics on Wasi"
430 )]
431 #[test]
local_tasks_wake_join_all()432 fn local_tasks_wake_join_all() {
433 // This test reproduces issue #2460.
434 with_timeout(Duration::from_secs(60), || {
435 use futures::future::join_all;
436 use tokio::task::LocalSet;
437
438 let rt = rt();
439 let set = LocalSet::new();
440 let mut handles = Vec::new();
441
442 for _ in 1..=128 {
443 handles.push(set.spawn_local(async move {
444 tokio::task::spawn_local(async move {}).await.unwrap();
445 }));
446 }
447
448 rt.block_on(set.run_until(join_all(handles)));
449 });
450 }
451
452 #[cfg(not(target_os = "wasi"))] // Wasi doesn't support panic recovery
453 #[test]
local_tasks_are_polled_after_tick()454 fn local_tasks_are_polled_after_tick() {
455 // This test depends on timing, so we run it up to five times.
456 for _ in 0..4 {
457 let res = std::panic::catch_unwind(local_tasks_are_polled_after_tick_inner);
458 if res.is_ok() {
459 // success
460 return;
461 }
462 }
463
464 // Test failed 4 times. Try one more time without catching panics. If it
465 // fails again, the test fails.
466 local_tasks_are_polled_after_tick_inner();
467 }
468
469 #[cfg(not(target_os = "wasi"))] // Wasi doesn't support panic recovery
470 #[tokio::main(flavor = "current_thread")]
local_tasks_are_polled_after_tick_inner()471 async fn local_tasks_are_polled_after_tick_inner() {
472 // Reproduces issues #1899 and #1900
473
474 static RX1: AtomicUsize = AtomicUsize::new(0);
475 static RX2: AtomicUsize = AtomicUsize::new(0);
476 const EXPECTED: usize = 500;
477
478 RX1.store(0, SeqCst);
479 RX2.store(0, SeqCst);
480
481 let (tx, mut rx) = mpsc::unbounded_channel();
482
483 let local = LocalSet::new();
484
485 local
486 .run_until(async {
487 let task2 = task::spawn(async move {
488 // Wait a bit
489 time::sleep(Duration::from_millis(10)).await;
490
491 let mut oneshots = Vec::with_capacity(EXPECTED);
492
493 // Send values
494 for _ in 0..EXPECTED {
495 let (oneshot_tx, oneshot_rx) = oneshot::channel();
496 oneshots.push(oneshot_tx);
497 tx.send(oneshot_rx).unwrap();
498 }
499
500 time::sleep(Duration::from_millis(10)).await;
501
502 for tx in oneshots.drain(..) {
503 tx.send(()).unwrap();
504 }
505
506 loop {
507 time::sleep(Duration::from_millis(20)).await;
508 let rx1 = RX1.load(SeqCst);
509 let rx2 = RX2.load(SeqCst);
510
511 if rx1 == EXPECTED && rx2 == EXPECTED {
512 break;
513 }
514 }
515 });
516
517 while let Some(oneshot) = rx.recv().await {
518 RX1.fetch_add(1, SeqCst);
519
520 task::spawn_local(async move {
521 oneshot.await.unwrap();
522 RX2.fetch_add(1, SeqCst);
523 });
524 }
525
526 task2.await.unwrap();
527 })
528 .await;
529 }
530
531 #[tokio::test]
acquire_mutex_in_drop()532 async fn acquire_mutex_in_drop() {
533 use futures::future::pending;
534
535 let (tx1, rx1) = oneshot::channel();
536 let (tx2, rx2) = oneshot::channel();
537 let local = LocalSet::new();
538
539 local.spawn_local(async move {
540 let _ = rx2.await;
541 unreachable!();
542 });
543
544 local.spawn_local(async move {
545 let _ = rx1.await;
546 tx2.send(()).unwrap();
547 unreachable!();
548 });
549
550 // Spawn a task that will never notify
551 local.spawn_local(async move {
552 pending::<()>().await;
553 tx1.send(()).unwrap();
554 });
555
556 // Tick the loop
557 local
558 .run_until(async {
559 task::yield_now().await;
560 })
561 .await;
562
563 // Drop the LocalSet
564 drop(local);
565 }
566
567 #[tokio::test]
spawn_wakes_localset()568 async fn spawn_wakes_localset() {
569 let local = LocalSet::new();
570 futures::select! {
571 _ = local.run_until(pending::<()>()).fuse() => unreachable!(),
572 ret = async { local.spawn_local(ready(())).await.unwrap()}.fuse() => ret
573 }
574 }
575
576 #[test]
store_local_set_in_thread_local_with_runtime()577 fn store_local_set_in_thread_local_with_runtime() {
578 use tokio::runtime::Runtime;
579
580 thread_local! {
581 static CURRENT: RtAndLocalSet = RtAndLocalSet::new();
582 }
583
584 struct RtAndLocalSet {
585 rt: Runtime,
586 local: LocalSet,
587 }
588
589 impl RtAndLocalSet {
590 fn new() -> RtAndLocalSet {
591 RtAndLocalSet {
592 rt: tokio::runtime::Builder::new_current_thread()
593 .enable_all()
594 .build()
595 .unwrap(),
596 local: LocalSet::new(),
597 }
598 }
599
600 async fn inner_method(&self) {
601 self.local
602 .run_until(async move {
603 tokio::task::spawn_local(async {});
604 })
605 .await
606 }
607
608 fn method(&self) {
609 self.rt.block_on(self.inner_method());
610 }
611 }
612
613 CURRENT.with(|f| {
614 f.method();
615 });
616 }
617
618 #[cfg(tokio_unstable)]
619 mod unstable {
620 use tokio::runtime::UnhandledPanic;
621 use tokio::task::LocalSet;
622
623 #[tokio::test]
624 #[should_panic(
625 expected = "a spawned task panicked and the LocalSet is configured to shutdown on unhandled panic"
626 )]
shutdown_on_panic()627 async fn shutdown_on_panic() {
628 LocalSet::new()
629 .unhandled_panic(UnhandledPanic::ShutdownRuntime)
630 .run_until(async {
631 tokio::task::spawn_local(async {
632 panic!("boom");
633 });
634
635 futures::future::pending::<()>().await;
636 })
637 .await;
638 }
639
640 // This test compares that, when the task driving `run_until` has already
641 // consumed budget, the `run_until` future has less budget than a "spawned"
642 // task.
643 //
644 // "Budget" is a fuzzy metric as the Tokio runtime is able to change values
645 // internally. This is why the test uses indirection to test this.
646 #[tokio::test]
run_until_does_not_get_own_budget()647 async fn run_until_does_not_get_own_budget() {
648 // Consume some budget
649 tokio::task::consume_budget().await;
650
651 LocalSet::new()
652 .run_until(async {
653 let spawned = tokio::spawn(async {
654 let mut spawned_n = 0;
655
656 {
657 let mut spawned = tokio_test::task::spawn(async {
658 loop {
659 spawned_n += 1;
660 tokio::task::consume_budget().await;
661 }
662 });
663 // Poll once
664 assert!(!spawned.poll().is_ready());
665 }
666
667 spawned_n
668 });
669
670 let mut run_until_n = 0;
671 {
672 let mut run_until = tokio_test::task::spawn(async {
673 loop {
674 run_until_n += 1;
675 tokio::task::consume_budget().await;
676 }
677 });
678 // Poll once
679 assert!(!run_until.poll().is_ready());
680 }
681
682 let spawned_n = spawned.await.unwrap();
683 assert_ne!(spawned_n, 0);
684 assert_ne!(run_until_n, 0);
685 assert!(spawned_n > run_until_n);
686 })
687 .await
688 }
689 }
690
rt() -> runtime::Runtime691 fn rt() -> runtime::Runtime {
692 tokio::runtime::Builder::new_current_thread()
693 .enable_all()
694 .build()
695 .unwrap()
696 }
697