• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![warn(rust_2018_idioms)]
2 #![cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support panic recovery
3 
4 use std::sync::Arc;
5 use std::thread::sleep;
6 use tokio::time::Duration;
7 
8 use tokio::runtime::Builder;
9 
/// Zero-sized guard whose destructor always panics.
///
/// Tests in this file move a value of this type into a spawned task so that
/// dropping the task's future runs a panicking destructor.
struct PanicOnDrop;

impl Drop for PanicOnDrop {
    /// Unconditionally panics with a fixed message.
    fn drop(&mut self) {
        panic!("Well what did you expect would happen...");
    }
}
17 
/// Checks that a suspended task can be aborted without panicking as reported in
/// issue #3157: <https://github.com/tokio-rs/tokio/issues/3157>.
#[test]
fn test_abort_without_panic_3157() {
    // Multi-threaded runtime restricted to a single worker, with the time
    // driver enabled so that `tokio::time::sleep` can be used.
    let rt = Builder::new_multi_thread()
        .enable_time()
        .worker_threads(1)
        .build()
        .unwrap();

    rt.block_on(async move {
        // The spawned task suspends on a timer far longer than the test runs.
        let handle = tokio::spawn(async move { tokio::time::sleep(Duration::new(100, 0)).await });

        // wait for task to sleep.
        tokio::time::sleep(Duration::from_millis(10)).await;

        handle.abort();
        // The join result is deliberately discarded: this test only checks
        // that aborting and then awaiting the handle does not panic.
        let _ = handle.await;
    });
}
38 
/// Checks that a suspended task can be aborted inside of a current_thread
/// executor without panicking as reported in issue #3662:
/// <https://github.com/tokio-rs/tokio/issues/3662>.
#[test]
fn test_abort_without_panic_3662() {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    /// Sets the shared flag to `true` when it is dropped, so the test can
    /// observe that the task's future has been destroyed.
    struct DropCheck(Arc<AtomicBool>);

    impl Drop for DropCheck {
        fn drop(&mut self) {
            self.0.store(true, Ordering::SeqCst);
        }
    }

    let rt = Builder::new_current_thread().build().unwrap();

    rt.block_on(async move {
        let drop_flag = Arc::new(AtomicBool::new(false));
        let drop_check = DropCheck(drop_flag.clone());

        let pending_task = tokio::spawn(async move {
            // NB: just grab the drop check here so that it becomes part of the
            // task.
            let _drop_check = drop_check;
            futures::future::pending::<()>().await;
        });

        let drop_flag2 = drop_flag.clone();

        // The handle is moved into the helper thread, aborted there, and then
        // handed back through `join` so it can be awaited on this thread.
        let aborted_task = std::thread::spawn(move || {
            // This runs in a separate thread so it doesn't have immediate
            // thread-local access to the executor. It does however transition
            // the underlying task to be completed, which will cause it to be
            // dropped (but not in this thread).
            assert!(!drop_flag2.load(Ordering::SeqCst));
            pending_task.abort();
            pending_task
        })
        .join()
        .unwrap();

        let result = aborted_task.await;
        // Once the handle resolves, the task's state must have been dropped
        // and the join error must report cancellation.
        assert!(drop_flag.load(Ordering::SeqCst));
        assert!(result.unwrap_err().is_cancelled());

        // Note: We do the following to trigger a deferred task cleanup.
        //
        // The relevant piece of code you want to look at is in:
        // `Inner::block_on` of `scheduler/current_thread.rs`.
        //
        // We cause the cleanup to happen by having a poll return Pending once
        // so that the scheduler can go into the "auxiliary tasks" mode, at
        // which point the task is removed from the scheduler.
        let yielding_task = tokio::spawn(async move {
            tokio::task::yield_now().await;
        });

        yielding_task.await.unwrap();
    });
}
101 
/// Checks that a suspended LocalSet task can be aborted from a remote thread
/// without panicking and without running the task's destructor on the wrong
/// thread.
/// <https://github.com/tokio-rs/tokio/issues/3929>
#[test]
fn remote_abort_local_set_3929() {
    /// Records the thread it was created on and panics if it is dropped on
    /// any other thread.
    struct DropCheck {
        created_on: std::thread::ThreadId,
        // Marker only: the raw-pointer `PhantomData` makes the type `!Send`.
        // The field is written in `new` and never read, hence the underscore.
        _not_send: std::marker::PhantomData<*const ()>,
    }

    impl DropCheck {
        fn new() -> Self {
            Self {
                created_on: std::thread::current().id(),
                _not_send: std::marker::PhantomData,
            }
        }
    }
    impl Drop for DropCheck {
        fn drop(&mut self) {
            if std::thread::current().id() != self.created_on {
                panic!("non-Send value dropped in another thread!");
            }
        }
    }

    let rt = Builder::new_current_thread().build().unwrap();
    let local = tokio::task::LocalSet::new();

    let check = DropCheck::new();
    let jh = local.spawn_local(async move {
        futures::future::pending::<()>().await;
        // Never reached (the future above never completes); referencing
        // `check` here is what moves it into the task.
        drop(check);
    });

    // Abort the local task from a different OS thread after a short delay.
    let jh2 = std::thread::spawn(move || {
        sleep(Duration::from_millis(10));
        jh.abort();
    });

    // Drive the LocalSet on this thread, then surface any panic raised on the
    // aborting thread.
    rt.block_on(local);
    jh2.join().unwrap();
}
145 
/// Checks that a suspended task can be aborted even if the `JoinHandle` is immediately dropped.
/// issue #3964: <https://github.com/tokio-rs/tokio/issues/3964>.
#[test]
fn test_abort_wakes_task_3964() {
    let rt = Builder::new_current_thread().enable_time().build().unwrap();

    rt.block_on(async move {
        // The strong `Arc` is moved into the task; the `Weak` kept here lets
        // the test observe when the task's state has been dropped.
        let notify_dropped = Arc::new(());
        let weak_notify_dropped = Arc::downgrade(&notify_dropped);

        let handle = tokio::spawn(async move {
            // Make sure the Arc is moved into the task
            let _notify_dropped = notify_dropped;
            tokio::time::sleep(Duration::new(100, 0)).await
        });

        // wait for task to sleep.
        tokio::time::sleep(Duration::from_millis(10)).await;

        handle.abort();
        drop(handle);

        // wait for task to abort.
        tokio::time::sleep(Duration::from_millis(10)).await;

        // Check that the Arc has been dropped.
        assert!(weak_notify_dropped.upgrade().is_none());
    });
}
175 
/// Checks that aborting a task whose destructor panics does not allow the
/// panic to escape the task.
#[test]
#[cfg(panic = "unwind")]
fn test_abort_task_that_panics_on_drop_contained() {
    let rt = Builder::new_current_thread().enable_time().build().unwrap();

    rt.block_on(async move {
        let handle = tokio::spawn(async move {
            // Make sure the panicking guard is moved into the task
            let _panic_dropped = PanicOnDrop;
            tokio::time::sleep(Duration::new(100, 0)).await
        });

        // wait for task to sleep.
        tokio::time::sleep(Duration::from_millis(10)).await;

        handle.abort();
        drop(handle);

        // wait for task to abort.
        //
        // There is no assertion: the test passes as long as the panic raised
        // by the guard's destructor does not propagate out of `block_on`.
        tokio::time::sleep(Duration::from_millis(10)).await;
    });
}
200 
/// Checks that aborting a task whose destructor panics has the expected result.
#[test]
#[cfg(panic = "unwind")]
fn test_abort_task_that_panics_on_drop_returned() {
    let rt = Builder::new_current_thread().enable_time().build().unwrap();

    rt.block_on(async move {
        let handle = tokio::spawn(async move {
            // Make sure the panicking guard is moved into the task
            let _panic_dropped = PanicOnDrop;
            tokio::time::sleep(Duration::new(100, 0)).await
        });

        // wait for task to sleep.
        tokio::time::sleep(Duration::from_millis(10)).await;

        handle.abort();
        // The join error must be classified as a panic, since the guard's
        // destructor panics while the aborted task is torn down.
        assert!(handle.await.unwrap_err().is_panic());
    });
}
221