1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "full")]
3
4 use futures::{
5 future::{pending, ready},
6 FutureExt,
7 };
8
9 use tokio::runtime;
10 use tokio::sync::{mpsc, oneshot};
11 use tokio::task::{self, LocalSet};
12 use tokio::time;
13
14 #[cfg(not(tokio_wasi))]
15 use std::cell::Cell;
16 use std::sync::atomic::AtomicBool;
17 #[cfg(not(tokio_wasi))]
18 use std::sync::atomic::AtomicUsize;
19 use std::sync::atomic::Ordering;
20 #[cfg(not(tokio_wasi))]
21 use std::sync::atomic::Ordering::SeqCst;
22 use std::time::Duration;
23
24 #[tokio::test(flavor = "current_thread")]
local_current_thread_scheduler()25 async fn local_current_thread_scheduler() {
26 LocalSet::new()
27 .run_until(async {
28 task::spawn_local(async {}).await.unwrap();
29 })
30 .await;
31 }
32
33 #[cfg(not(tokio_wasi))] // Wasi doesn't support threads
34 #[tokio::test(flavor = "multi_thread")]
local_threadpool()35 async fn local_threadpool() {
36 thread_local! {
37 static ON_RT_THREAD: Cell<bool> = Cell::new(false);
38 }
39
40 ON_RT_THREAD.with(|cell| cell.set(true));
41
42 LocalSet::new()
43 .run_until(async {
44 assert!(ON_RT_THREAD.with(|cell| cell.get()));
45 task::spawn_local(async {
46 assert!(ON_RT_THREAD.with(|cell| cell.get()));
47 })
48 .await
49 .unwrap();
50 })
51 .await;
52 }
53
54 #[cfg(not(tokio_wasi))] // Wasi doesn't support threads
55 #[tokio::test(flavor = "multi_thread")]
localset_future_threadpool()56 async fn localset_future_threadpool() {
57 thread_local! {
58 static ON_LOCAL_THREAD: Cell<bool> = Cell::new(false);
59 }
60
61 ON_LOCAL_THREAD.with(|cell| cell.set(true));
62
63 let local = LocalSet::new();
64 local.spawn_local(async move {
65 assert!(ON_LOCAL_THREAD.with(|cell| cell.get()));
66 });
67 local.await;
68 }
69
70 #[cfg(not(tokio_wasi))] // Wasi doesn't support threads
71 #[tokio::test(flavor = "multi_thread")]
localset_future_timers()72 async fn localset_future_timers() {
73 static RAN1: AtomicBool = AtomicBool::new(false);
74 static RAN2: AtomicBool = AtomicBool::new(false);
75
76 let local = LocalSet::new();
77 local.spawn_local(async move {
78 time::sleep(Duration::from_millis(5)).await;
79 RAN1.store(true, Ordering::SeqCst);
80 });
81 local.spawn_local(async move {
82 time::sleep(Duration::from_millis(10)).await;
83 RAN2.store(true, Ordering::SeqCst);
84 });
85 local.await;
86 assert!(RAN1.load(Ordering::SeqCst));
87 assert!(RAN2.load(Ordering::SeqCst));
88 }
89
90 #[tokio::test]
localset_future_drives_all_local_futs()91 async fn localset_future_drives_all_local_futs() {
92 static RAN1: AtomicBool = AtomicBool::new(false);
93 static RAN2: AtomicBool = AtomicBool::new(false);
94 static RAN3: AtomicBool = AtomicBool::new(false);
95
96 let local = LocalSet::new();
97 local.spawn_local(async move {
98 task::spawn_local(async {
99 task::yield_now().await;
100 RAN3.store(true, Ordering::SeqCst);
101 });
102 task::yield_now().await;
103 RAN1.store(true, Ordering::SeqCst);
104 });
105 local.spawn_local(async move {
106 task::yield_now().await;
107 RAN2.store(true, Ordering::SeqCst);
108 });
109 local.await;
110 assert!(RAN1.load(Ordering::SeqCst));
111 assert!(RAN2.load(Ordering::SeqCst));
112 assert!(RAN3.load(Ordering::SeqCst));
113 }
114
115 #[cfg(not(tokio_wasi))] // Wasi doesn't support threads
116 #[tokio::test(flavor = "multi_thread")]
local_threadpool_timer()117 async fn local_threadpool_timer() {
118 // This test ensures that runtime services like the timer are properly
119 // set for the local task set.
120 thread_local! {
121 static ON_RT_THREAD: Cell<bool> = Cell::new(false);
122 }
123
124 ON_RT_THREAD.with(|cell| cell.set(true));
125
126 LocalSet::new()
127 .run_until(async {
128 assert!(ON_RT_THREAD.with(|cell| cell.get()));
129 let join = task::spawn_local(async move {
130 assert!(ON_RT_THREAD.with(|cell| cell.get()));
131 time::sleep(Duration::from_millis(10)).await;
132 assert!(ON_RT_THREAD.with(|cell| cell.get()));
133 });
134 join.await.unwrap();
135 })
136 .await;
137 }
138 #[test]
enter_guard_spawn()139 fn enter_guard_spawn() {
140 let local = LocalSet::new();
141 let _guard = local.enter();
142 // Run the local task set.
143
144 let join = task::spawn_local(async { true });
145 let rt = runtime::Builder::new_current_thread()
146 .enable_all()
147 .build()
148 .unwrap();
149 local.block_on(&rt, async move {
150 assert!(join.await.unwrap());
151 });
152 }
153
154 #[cfg(not(tokio_wasi))] // Wasi doesn't support panic recovery
155 #[test]
156 // This will panic, since the thread that calls `block_on` cannot use
157 // in-place blocking inside of `block_on`.
158 #[should_panic]
local_threadpool_blocking_in_place()159 fn local_threadpool_blocking_in_place() {
160 thread_local! {
161 static ON_RT_THREAD: Cell<bool> = Cell::new(false);
162 }
163
164 ON_RT_THREAD.with(|cell| cell.set(true));
165
166 let rt = runtime::Builder::new_current_thread()
167 .enable_all()
168 .build()
169 .unwrap();
170 LocalSet::new().block_on(&rt, async {
171 assert!(ON_RT_THREAD.with(|cell| cell.get()));
172 let join = task::spawn_local(async move {
173 assert!(ON_RT_THREAD.with(|cell| cell.get()));
174 task::block_in_place(|| {});
175 assert!(ON_RT_THREAD.with(|cell| cell.get()));
176 });
177 join.await.unwrap();
178 });
179 }
180
181 #[cfg(not(tokio_wasi))] // Wasi doesn't support threads
182 #[tokio::test(flavor = "multi_thread")]
local_threadpool_blocking_run()183 async fn local_threadpool_blocking_run() {
184 thread_local! {
185 static ON_RT_THREAD: Cell<bool> = Cell::new(false);
186 }
187
188 ON_RT_THREAD.with(|cell| cell.set(true));
189
190 LocalSet::new()
191 .run_until(async {
192 assert!(ON_RT_THREAD.with(|cell| cell.get()));
193 let join = task::spawn_local(async move {
194 assert!(ON_RT_THREAD.with(|cell| cell.get()));
195 task::spawn_blocking(|| {
196 assert!(
197 !ON_RT_THREAD.with(|cell| cell.get()),
198 "blocking must not run on the local task set's thread"
199 );
200 })
201 .await
202 .unwrap();
203 assert!(ON_RT_THREAD.with(|cell| cell.get()));
204 });
205 join.await.unwrap();
206 })
207 .await;
208 }
209
210 #[cfg(not(tokio_wasi))] // Wasi doesn't support threads
211 #[tokio::test(flavor = "multi_thread")]
all_spawns_are_local()212 async fn all_spawns_are_local() {
213 use futures::future;
214 thread_local! {
215 static ON_RT_THREAD: Cell<bool> = Cell::new(false);
216 }
217
218 ON_RT_THREAD.with(|cell| cell.set(true));
219
220 LocalSet::new()
221 .run_until(async {
222 assert!(ON_RT_THREAD.with(|cell| cell.get()));
223 let handles = (0..128)
224 .map(|_| {
225 task::spawn_local(async {
226 assert!(ON_RT_THREAD.with(|cell| cell.get()));
227 })
228 })
229 .collect::<Vec<_>>();
230 for joined in future::join_all(handles).await {
231 joined.unwrap();
232 }
233 })
234 .await;
235 }
236
237 #[cfg(not(tokio_wasi))] // Wasi doesn't support threads
238 #[tokio::test(flavor = "multi_thread")]
nested_spawn_is_local()239 async fn nested_spawn_is_local() {
240 thread_local! {
241 static ON_RT_THREAD: Cell<bool> = Cell::new(false);
242 }
243
244 ON_RT_THREAD.with(|cell| cell.set(true));
245
246 LocalSet::new()
247 .run_until(async {
248 assert!(ON_RT_THREAD.with(|cell| cell.get()));
249 task::spawn_local(async {
250 assert!(ON_RT_THREAD.with(|cell| cell.get()));
251 task::spawn_local(async {
252 assert!(ON_RT_THREAD.with(|cell| cell.get()));
253 task::spawn_local(async {
254 assert!(ON_RT_THREAD.with(|cell| cell.get()));
255 task::spawn_local(async {
256 assert!(ON_RT_THREAD.with(|cell| cell.get()));
257 })
258 .await
259 .unwrap();
260 })
261 .await
262 .unwrap();
263 })
264 .await
265 .unwrap();
266 })
267 .await
268 .unwrap();
269 })
270 .await;
271 }
272
273 #[cfg(not(tokio_wasi))] // Wasi doesn't support threads
274 #[test]
join_local_future_elsewhere()275 fn join_local_future_elsewhere() {
276 thread_local! {
277 static ON_RT_THREAD: Cell<bool> = Cell::new(false);
278 }
279
280 ON_RT_THREAD.with(|cell| cell.set(true));
281
282 let rt = runtime::Runtime::new().unwrap();
283 let local = LocalSet::new();
284 local.block_on(&rt, async move {
285 let (tx, rx) = oneshot::channel();
286 let join = task::spawn_local(async move {
287 assert!(
288 ON_RT_THREAD.with(|cell| cell.get()),
289 "local task must run on local thread, no matter where it is awaited"
290 );
291 rx.await.unwrap();
292
293 "hello world"
294 });
295 let join2 = task::spawn(async move {
296 assert!(
297 !ON_RT_THREAD.with(|cell| cell.get()),
298 "spawned task should be on a worker"
299 );
300
301 tx.send(()).expect("task shouldn't have ended yet");
302
303 join.await.expect("task should complete successfully");
304 });
305 join2.await.unwrap()
306 });
307 }
308
309 // Tests for <https://github.com/tokio-rs/tokio/issues/4973>
310 #[cfg(not(tokio_wasi))] // Wasi doesn't support threads
311 #[tokio::test(flavor = "multi_thread")]
localset_in_thread_local()312 async fn localset_in_thread_local() {
313 thread_local! {
314 static LOCAL_SET: LocalSet = LocalSet::new();
315 }
316
317 // holds runtime thread until end of main fn.
318 let (_tx, rx) = oneshot::channel::<()>();
319 let handle = tokio::runtime::Handle::current();
320
321 std::thread::spawn(move || {
322 LOCAL_SET.with(|local_set| {
323 handle.block_on(local_set.run_until(async move {
324 let _ = rx.await;
325 }))
326 });
327 });
328 }
329
330 #[test]
drop_cancels_tasks()331 fn drop_cancels_tasks() {
332 use std::rc::Rc;
333
334 // This test reproduces issue #1842
335 let rt = rt();
336 let rc1 = Rc::new(());
337 let rc2 = rc1.clone();
338
339 let (started_tx, started_rx) = oneshot::channel();
340
341 let local = LocalSet::new();
342 local.spawn_local(async move {
343 // Move this in
344 let _rc2 = rc2;
345
346 started_tx.send(()).unwrap();
347 futures::future::pending::<()>().await;
348 });
349
350 local.block_on(&rt, async {
351 started_rx.await.unwrap();
352 });
353 drop(local);
354 drop(rt);
355
356 assert_eq!(1, Rc::strong_count(&rc1));
357 }
358
359 /// Runs a test function in a separate thread, and panics if the test does not
360 /// complete within the specified timeout, or if the test function panics.
361 ///
362 /// This is intended for running tests whose failure mode is a hang or infinite
363 /// loop that cannot be detected otherwise.
with_timeout(timeout: Duration, f: impl FnOnce() + Send + 'static)364 fn with_timeout(timeout: Duration, f: impl FnOnce() + Send + 'static) {
365 use std::sync::mpsc::RecvTimeoutError;
366
367 let (done_tx, done_rx) = std::sync::mpsc::channel();
368 let thread = std::thread::spawn(move || {
369 f();
370
371 // Send a message on the channel so that the test thread can
372 // determine if we have entered an infinite loop:
373 done_tx.send(()).unwrap();
374 });
375
376 // Since the failure mode of this test is an infinite loop, rather than
377 // something we can easily make assertions about, we'll run it in a
378 // thread. When the test thread finishes, it will send a message on a
379 // channel to this thread. We'll wait for that message with a fairly
380 // generous timeout, and if we don't receive it, we assume the test
381 // thread has hung.
382 //
383 // Note that it should definitely complete in under a minute, but just
384 // in case CI is slow, we'll give it a long timeout.
385 match done_rx.recv_timeout(timeout) {
386 Err(RecvTimeoutError::Timeout) => panic!(
387 "test did not complete within {:?} seconds, \
388 we have (probably) entered an infinite loop!",
389 timeout,
390 ),
391 // Did the test thread panic? We'll find out for sure when we `join`
392 // with it.
393 Err(RecvTimeoutError::Disconnected) => {}
394 // Test completed successfully!
395 Ok(()) => {}
396 }
397
398 thread.join().expect("test thread should not panic!")
399 }
400
401 #[cfg_attr(tokio_wasi, ignore = "`unwrap()` in `with_timeout()` panics on Wasi")]
402 #[test]
drop_cancels_remote_tasks()403 fn drop_cancels_remote_tasks() {
404 // This test reproduces issue #1885.
405 with_timeout(Duration::from_secs(60), || {
406 let (tx, mut rx) = mpsc::channel::<()>(1024);
407
408 let rt = rt();
409
410 let local = LocalSet::new();
411 local.spawn_local(async move { while rx.recv().await.is_some() {} });
412 local.block_on(&rt, async {
413 time::sleep(Duration::from_millis(1)).await;
414 });
415
416 drop(tx);
417
418 // This enters an infinite loop if the remote notified tasks are not
419 // properly cancelled.
420 drop(local);
421 });
422 }
423
424 #[cfg_attr(
425 tokio_wasi,
426 ignore = "FIXME: `task::spawn_local().await.unwrap()` panics on Wasi"
427 )]
428 #[test]
local_tasks_wake_join_all()429 fn local_tasks_wake_join_all() {
430 // This test reproduces issue #2460.
431 with_timeout(Duration::from_secs(60), || {
432 use futures::future::join_all;
433 use tokio::task::LocalSet;
434
435 let rt = rt();
436 let set = LocalSet::new();
437 let mut handles = Vec::new();
438
439 for _ in 1..=128 {
440 handles.push(set.spawn_local(async move {
441 tokio::task::spawn_local(async move {}).await.unwrap();
442 }));
443 }
444
445 rt.block_on(set.run_until(join_all(handles)));
446 });
447 }
448
449 #[cfg(not(tokio_wasi))] // Wasi doesn't support panic recovery
450 #[test]
local_tasks_are_polled_after_tick()451 fn local_tasks_are_polled_after_tick() {
452 // This test depends on timing, so we run it up to five times.
453 for _ in 0..4 {
454 let res = std::panic::catch_unwind(local_tasks_are_polled_after_tick_inner);
455 if res.is_ok() {
456 // success
457 return;
458 }
459 }
460
461 // Test failed 4 times. Try one more time without catching panics. If it
462 // fails again, the test fails.
463 local_tasks_are_polled_after_tick_inner();
464 }
465
466 #[cfg(not(tokio_wasi))] // Wasi doesn't support panic recovery
467 #[tokio::main(flavor = "current_thread")]
local_tasks_are_polled_after_tick_inner()468 async fn local_tasks_are_polled_after_tick_inner() {
469 // Reproduces issues #1899 and #1900
470
471 static RX1: AtomicUsize = AtomicUsize::new(0);
472 static RX2: AtomicUsize = AtomicUsize::new(0);
473 const EXPECTED: usize = 500;
474
475 RX1.store(0, SeqCst);
476 RX2.store(0, SeqCst);
477
478 let (tx, mut rx) = mpsc::unbounded_channel();
479
480 let local = LocalSet::new();
481
482 local
483 .run_until(async {
484 let task2 = task::spawn(async move {
485 // Wait a bit
486 time::sleep(Duration::from_millis(10)).await;
487
488 let mut oneshots = Vec::with_capacity(EXPECTED);
489
490 // Send values
491 for _ in 0..EXPECTED {
492 let (oneshot_tx, oneshot_rx) = oneshot::channel();
493 oneshots.push(oneshot_tx);
494 tx.send(oneshot_rx).unwrap();
495 }
496
497 time::sleep(Duration::from_millis(10)).await;
498
499 for tx in oneshots.drain(..) {
500 tx.send(()).unwrap();
501 }
502
503 loop {
504 time::sleep(Duration::from_millis(20)).await;
505 let rx1 = RX1.load(SeqCst);
506 let rx2 = RX2.load(SeqCst);
507
508 if rx1 == EXPECTED && rx2 == EXPECTED {
509 break;
510 }
511 }
512 });
513
514 while let Some(oneshot) = rx.recv().await {
515 RX1.fetch_add(1, SeqCst);
516
517 task::spawn_local(async move {
518 oneshot.await.unwrap();
519 RX2.fetch_add(1, SeqCst);
520 });
521 }
522
523 task2.await.unwrap();
524 })
525 .await;
526 }
527
528 #[tokio::test]
acquire_mutex_in_drop()529 async fn acquire_mutex_in_drop() {
530 use futures::future::pending;
531
532 let (tx1, rx1) = oneshot::channel();
533 let (tx2, rx2) = oneshot::channel();
534 let local = LocalSet::new();
535
536 local.spawn_local(async move {
537 let _ = rx2.await;
538 unreachable!();
539 });
540
541 local.spawn_local(async move {
542 let _ = rx1.await;
543 tx2.send(()).unwrap();
544 unreachable!();
545 });
546
547 // Spawn a task that will never notify
548 local.spawn_local(async move {
549 pending::<()>().await;
550 tx1.send(()).unwrap();
551 });
552
553 // Tick the loop
554 local
555 .run_until(async {
556 task::yield_now().await;
557 })
558 .await;
559
560 // Drop the LocalSet
561 drop(local);
562 }
563
564 #[tokio::test]
spawn_wakes_localset()565 async fn spawn_wakes_localset() {
566 let local = LocalSet::new();
567 futures::select! {
568 _ = local.run_until(pending::<()>()).fuse() => unreachable!(),
569 ret = async { local.spawn_local(ready(())).await.unwrap()}.fuse() => ret
570 }
571 }
572
573 #[test]
store_local_set_in_thread_local_with_runtime()574 fn store_local_set_in_thread_local_with_runtime() {
575 use tokio::runtime::Runtime;
576
577 thread_local! {
578 static CURRENT: RtAndLocalSet = RtAndLocalSet::new();
579 }
580
581 struct RtAndLocalSet {
582 rt: Runtime,
583 local: LocalSet,
584 }
585
586 impl RtAndLocalSet {
587 fn new() -> RtAndLocalSet {
588 RtAndLocalSet {
589 rt: tokio::runtime::Builder::new_current_thread()
590 .enable_all()
591 .build()
592 .unwrap(),
593 local: LocalSet::new(),
594 }
595 }
596
597 async fn inner_method(&self) {
598 self.local
599 .run_until(async move {
600 tokio::task::spawn_local(async {});
601 })
602 .await
603 }
604
605 fn method(&self) {
606 self.rt.block_on(self.inner_method());
607 }
608 }
609
610 CURRENT.with(|f| {
611 f.method();
612 });
613 }
614
615 #[cfg(tokio_unstable)]
616 mod unstable {
617 use tokio::runtime::UnhandledPanic;
618 use tokio::task::LocalSet;
619
620 #[tokio::test]
621 #[should_panic(
622 expected = "a spawned task panicked and the LocalSet is configured to shutdown on unhandled panic"
623 )]
shutdown_on_panic()624 async fn shutdown_on_panic() {
625 LocalSet::new()
626 .unhandled_panic(UnhandledPanic::ShutdownRuntime)
627 .run_until(async {
628 tokio::task::spawn_local(async {
629 panic!("boom");
630 });
631
632 futures::future::pending::<()>().await;
633 })
634 .await;
635 }
636
637 // This test compares that, when the task driving `run_until` has already
638 // consumed budget, the `run_until` future has less budget than a "spawned"
639 // task.
640 //
641 // "Budget" is a fuzzy metric as the Tokio runtime is able to change values
642 // internally. This is why the test uses indirection to test this.
643 #[tokio::test]
run_until_does_not_get_own_budget()644 async fn run_until_does_not_get_own_budget() {
645 // Consume some budget
646 tokio::task::consume_budget().await;
647
648 LocalSet::new()
649 .run_until(async {
650 let spawned = tokio::spawn(async {
651 let mut spawned_n = 0;
652
653 {
654 let mut spawned = tokio_test::task::spawn(async {
655 loop {
656 spawned_n += 1;
657 tokio::task::consume_budget().await;
658 }
659 });
660 // Poll once
661 assert!(!spawned.poll().is_ready());
662 }
663
664 spawned_n
665 });
666
667 let mut run_until_n = 0;
668 {
669 let mut run_until = tokio_test::task::spawn(async {
670 loop {
671 run_until_n += 1;
672 tokio::task::consume_budget().await;
673 }
674 });
675 // Poll once
676 assert!(!run_until.poll().is_ready());
677 }
678
679 let spawned_n = spawned.await.unwrap();
680 assert_ne!(spawned_n, 0);
681 assert_ne!(run_until_n, 0);
682 assert!(spawned_n > run_until_n);
683 })
684 .await
685 }
686 }
687
rt() -> runtime::Runtime688 fn rt() -> runtime::Runtime {
689 tokio::runtime::Builder::new_current_thread()
690 .enable_all()
691 .build()
692 .unwrap()
693 }
694