• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "full")]
3 
4 use std::sync::Arc;
5 use std::thread::sleep;
6 use tokio::time::Duration;
7 
8 use tokio::runtime::Builder;
9 
10 struct PanicOnDrop;
11 
12 impl Drop for PanicOnDrop {
drop(&mut self)13     fn drop(&mut self) {
14         panic!("Well what did you expect would happen...");
15     }
16 }
17 
18 /// Checks that a suspended task can be aborted without panicking as reported in
19 /// issue #3157: <https://github.com/tokio-rs/tokio/issues/3157>.
20 #[test]
test_abort_without_panic_3157()21 fn test_abort_without_panic_3157() {
22     let rt = Builder::new_multi_thread()
23         .enable_time()
24         .worker_threads(1)
25         .build()
26         .unwrap();
27 
28     rt.block_on(async move {
29         let handle = tokio::spawn(async move {
30             println!("task started");
31             tokio::time::sleep(Duration::new(100, 0)).await
32         });
33 
34         // wait for task to sleep.
35         tokio::time::sleep(Duration::from_millis(10)).await;
36 
37         handle.abort();
38         let _ = handle.await;
39     });
40 }
41 
42 /// Checks that a suspended task can be aborted inside of a current_thread
43 /// executor without panicking as reported in issue #3662:
44 /// <https://github.com/tokio-rs/tokio/issues/3662>.
45 #[test]
test_abort_without_panic_3662()46 fn test_abort_without_panic_3662() {
47     use std::sync::atomic::{AtomicBool, Ordering};
48     use std::sync::Arc;
49 
50     struct DropCheck(Arc<AtomicBool>);
51 
52     impl Drop for DropCheck {
53         fn drop(&mut self) {
54             self.0.store(true, Ordering::SeqCst);
55         }
56     }
57 
58     let rt = Builder::new_current_thread().build().unwrap();
59 
60     rt.block_on(async move {
61         let drop_flag = Arc::new(AtomicBool::new(false));
62         let drop_check = DropCheck(drop_flag.clone());
63 
64         let j = tokio::spawn(async move {
65             // NB: just grab the drop check here so that it becomes part of the
66             // task.
67             let _drop_check = drop_check;
68             futures::future::pending::<()>().await;
69         });
70 
71         let drop_flag2 = drop_flag.clone();
72 
73         let task = std::thread::spawn(move || {
74             // This runs in a separate thread so it doesn't have immediate
75             // thread-local access to the executor. It does however transition
76             // the underlying task to be completed, which will cause it to be
77             // dropped (but not in this thread).
78             assert!(!drop_flag2.load(Ordering::SeqCst));
79             j.abort();
80             j
81         })
82         .join()
83         .unwrap();
84 
85         let result = task.await;
86         assert!(drop_flag.load(Ordering::SeqCst));
87         assert!(result.unwrap_err().is_cancelled());
88 
89         // Note: We do the following to trigger a deferred task cleanup.
90         //
91         // The relevant piece of code you want to look at is in:
92         // `Inner::block_on` of `basic_scheduler.rs`.
93         //
94         // We cause the cleanup to happen by having a poll return Pending once
95         // so that the scheduler can go into the "auxiliary tasks" mode, at
96         // which point the task is removed from the scheduler.
97         let i = tokio::spawn(async move {
98             tokio::task::yield_now().await;
99         });
100 
101         i.await.unwrap();
102     });
103 }
104 
105 /// Checks that a suspended LocalSet task can be aborted from a remote thread
106 /// without panicking and without running the tasks destructor on the wrong thread.
107 /// <https://github.com/tokio-rs/tokio/issues/3929>
108 #[test]
remote_abort_local_set_3929()109 fn remote_abort_local_set_3929() {
110     struct DropCheck {
111         created_on: std::thread::ThreadId,
112         not_send: std::marker::PhantomData<*const ()>,
113     }
114 
115     impl DropCheck {
116         fn new() -> Self {
117             Self {
118                 created_on: std::thread::current().id(),
119                 not_send: std::marker::PhantomData,
120             }
121         }
122     }
123     impl Drop for DropCheck {
124         fn drop(&mut self) {
125             if std::thread::current().id() != self.created_on {
126                 panic!("non-Send value dropped in another thread!");
127             }
128         }
129     }
130 
131     let rt = Builder::new_current_thread().build().unwrap();
132     let local = tokio::task::LocalSet::new();
133 
134     let check = DropCheck::new();
135     let jh = local.spawn_local(async move {
136         futures::future::pending::<()>().await;
137         drop(check);
138     });
139 
140     let jh2 = std::thread::spawn(move || {
141         sleep(Duration::from_millis(10));
142         jh.abort();
143     });
144 
145     rt.block_on(local);
146     jh2.join().unwrap();
147 }
148 
149 /// Checks that a suspended task can be aborted even if the `JoinHandle` is immediately dropped.
150 /// issue #3964: <https://github.com/tokio-rs/tokio/issues/3964>.
151 #[test]
test_abort_wakes_task_3964()152 fn test_abort_wakes_task_3964() {
153     let rt = Builder::new_current_thread().enable_time().build().unwrap();
154 
155     rt.block_on(async move {
156         let notify_dropped = Arc::new(());
157         let weak_notify_dropped = Arc::downgrade(&notify_dropped);
158 
159         let handle = tokio::spawn(async move {
160             // Make sure the Arc is moved into the task
161             let _notify_dropped = notify_dropped;
162             println!("task started");
163             tokio::time::sleep(Duration::new(100, 0)).await
164         });
165 
166         // wait for task to sleep.
167         tokio::time::sleep(Duration::from_millis(10)).await;
168 
169         handle.abort();
170         drop(handle);
171 
172         // wait for task to abort.
173         tokio::time::sleep(Duration::from_millis(10)).await;
174 
175         // Check that the Arc has been dropped.
176         assert!(weak_notify_dropped.upgrade().is_none());
177     });
178 }
179 
180 /// Checks that aborting a task whose destructor panics does not allow the
181 /// panic to escape the task.
182 #[test]
183 #[cfg(not(target_os = "android"))]
test_abort_task_that_panics_on_drop_contained()184 fn test_abort_task_that_panics_on_drop_contained() {
185     let rt = Builder::new_current_thread().enable_time().build().unwrap();
186 
187     rt.block_on(async move {
188         let handle = tokio::spawn(async move {
189             // Make sure the Arc is moved into the task
190             let _panic_dropped = PanicOnDrop;
191             println!("task started");
192             tokio::time::sleep(Duration::new(100, 0)).await
193         });
194 
195         // wait for task to sleep.
196         tokio::time::sleep(Duration::from_millis(10)).await;
197 
198         handle.abort();
199         drop(handle);
200 
201         // wait for task to abort.
202         tokio::time::sleep(Duration::from_millis(10)).await;
203     });
204 }
205 
206 /// Checks that aborting a task whose destructor panics has the expected result.
207 #[test]
208 #[cfg(not(target_os = "android"))]
test_abort_task_that_panics_on_drop_returned()209 fn test_abort_task_that_panics_on_drop_returned() {
210     let rt = Builder::new_current_thread().enable_time().build().unwrap();
211 
212     rt.block_on(async move {
213         let handle = tokio::spawn(async move {
214             // Make sure the Arc is moved into the task
215             let _panic_dropped = PanicOnDrop;
216             println!("task started");
217             tokio::time::sleep(Duration::new(100, 0)).await
218         });
219 
220         // wait for task to sleep.
221         tokio::time::sleep(Duration::from_millis(10)).await;
222 
223         handle.abort();
224         assert!(handle.await.unwrap_err().is_panic());
225     });
226 }
227