1 use std::future::Future;
2 use std::panic;
3 use std::pin::Pin;
4 use std::task::{Context, Poll};
5
6 use crate::runtime::Builder;
7 use crate::sync::oneshot;
8 use crate::task::JoinHandle;
9
10 use futures::future::FutureExt;
11
12 // Enums for each option in the combinations being tested
13
/// The runtime flavor a combination is executed on.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum CombiRuntime {
    /// A runtime built with `Builder::new_current_thread()`.
    CurrentThread,
    /// A multi-thread runtime configured with one worker thread.
    Multi1,
    /// A multi-thread runtime configured with two worker threads.
    Multi2,
}
/// Whether the task is spawned through a `LocalSet` or directly on the runtime.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum CombiLocalSet {
    /// Create a `LocalSet`; spawn with `spawn_local` and block on the set.
    Yes,
    /// No `LocalSet`; spawn and block on the runtime itself.
    No,
}
/// Where, if anywhere, the spawned future panics.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum CombiTask {
    /// The task body panics instead of producing an output.
    PanicOnRun,
    /// The future wrapper panics when it is dropped.
    PanicOnDrop,
    /// Both of the above: the body panics and so does dropping the wrapper.
    PanicOnRunAndDrop,
    /// Neither running nor dropping the future panics.
    NoPanic,
}
/// Whether the value returned by the task panics when it is dropped.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum CombiOutput {
    /// Dropping the output panics (unless it has been disarmed first).
    PanicOnDrop,
    /// Dropping the output is silent.
    NoPanic,
}
/// Whether the `JoinHandle` is polled once right after the task is spawned.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum CombiJoinInterest {
    /// Poll the handle once immediately; the poll is expected to be pending.
    Polled,
    /// Do not poll the handle up front.
    NotPolled,
}
/// The point in the test at which the `JoinHandle` is dropped.
///
/// The explicit discriminants are significant: `test_combination` compares
/// them (via `as usize`) against the discriminants of `CombiAbort` to skip
/// combinations in which the handle would be dropped before the abort could
/// be issued. Keep the two enums' numbering aligned.
#[allow(clippy::enum_variant_names)] // we aren't using glob imports
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum CombiJoinHandle {
    /// Dropped right after spawning (and after an immediate abort, if any).
    DropImmediately = 1,
    /// Dropped once the task has been polled for the first time.
    DropFirstPoll = 2,
    /// Dropped after the future has been dropped, without reading the output.
    DropAfterNoConsume = 3,
    /// Dropped after the handle has been awaited and its result inspected.
    DropAfterConsume = 4,
}
/// The point in the test at which `abort()` is called on the `JoinHandle`.
///
/// The explicit discriminants are significant: `test_combination` compares
/// them (via `as usize`) against the discriminants of `CombiJoinHandle` to
/// skip combinations in which the handle would already be gone when the abort
/// is due. Keep the two enums' numbering aligned.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum CombiAbort {
    /// The task is never aborted.
    NotAborted = 0,
    /// Aborted right after spawning.
    AbortedImmediately = 1,
    /// Aborted once the task has been polled for the first time.
    AbortedFirstPoll = 2,
    /// Aborted after the future has already been dropped.
    AbortedAfterFinish = 3,
    /// Aborted after the handle's result has been consumed.
    AbortedAfterConsumeOutput = 4,
}
58
/// Drives `test_combination` over the cartesian product of every option.
///
/// Combinations that cannot occur or that would double-panic are filtered
/// out inside `test_combination` itself, not here.
#[test]
fn test_combinations() {
    // Under Miri only the current-thread runtime is exercised; otherwise all
    // three runtime flavors are.
    let runtimes: &[CombiRuntime] = if cfg!(miri) {
        &[CombiRuntime::CurrentThread]
    } else {
        &[
            CombiRuntime::CurrentThread,
            CombiRuntime::Multi1,
            CombiRuntime::Multi2,
        ]
    };

    let local_sets = [CombiLocalSet::Yes, CombiLocalSet::No];
    let tasks = [
        CombiTask::NoPanic,
        CombiTask::PanicOnRun,
        CombiTask::PanicOnDrop,
        CombiTask::PanicOnRunAndDrop,
    ];
    let outputs = [CombiOutput::NoPanic, CombiOutput::PanicOnDrop];
    let join_interests = [CombiJoinInterest::Polled, CombiJoinInterest::NotPolled];
    let join_handles = [
        CombiJoinHandle::DropImmediately,
        CombiJoinHandle::DropFirstPoll,
        CombiJoinHandle::DropAfterNoConsume,
        CombiJoinHandle::DropAfterConsume,
    ];
    let aborts = [
        CombiAbort::NotAborted,
        CombiAbort::AbortedImmediately,
        CombiAbort::AbortedFirstPoll,
        CombiAbort::AbortedAfterFinish,
        CombiAbort::AbortedAfterConsumeOutput,
    ];

    // The innermost option varies fastest, so the abort mode changes on every
    // call and the runtime flavor changes least often.
    for &runtime in runtimes {
        for &local_set in &local_sets {
            for &task_kind in &tasks {
                for &output_kind in &outputs {
                    for &interest in &join_interests {
                        for &handle_mode in &join_handles {
                            for &abort_mode in &aborts {
                                test_combination(
                                    runtime,
                                    local_set,
                                    task_kind,
                                    output_kind,
                                    interest,
                                    handle_mode,
                                    abort_mode,
                                );
                            }
                        }
                    }
                }
            }
        }
    }
}
110
test_combination( rt: CombiRuntime, ls: CombiLocalSet, task: CombiTask, output: CombiOutput, ji: CombiJoinInterest, jh: CombiJoinHandle, abort: CombiAbort, )111 fn test_combination(
112 rt: CombiRuntime,
113 ls: CombiLocalSet,
114 task: CombiTask,
115 output: CombiOutput,
116 ji: CombiJoinInterest,
117 jh: CombiJoinHandle,
118 abort: CombiAbort,
119 ) {
120 if (jh as usize) < (abort as usize) {
121 // drop before abort not possible
122 return;
123 }
124 if (task == CombiTask::PanicOnDrop) && (output == CombiOutput::PanicOnDrop) {
125 // this causes double panic
126 return;
127 }
128 if (task == CombiTask::PanicOnRunAndDrop) && (abort != CombiAbort::AbortedImmediately) {
129 // this causes double panic
130 return;
131 }
132
133 println!("Runtime {:?}, LocalSet {:?}, Task {:?}, Output {:?}, JoinInterest {:?}, JoinHandle {:?}, Abort {:?}", rt, ls, task, output, ji, jh, abort);
134
135 // A runtime optionally with a LocalSet
136 struct Rt {
137 rt: crate::runtime::Runtime,
138 ls: Option<crate::task::LocalSet>,
139 }
140 impl Rt {
141 fn new(rt: CombiRuntime, ls: CombiLocalSet) -> Self {
142 let rt = match rt {
143 CombiRuntime::CurrentThread => Builder::new_current_thread().build().unwrap(),
144 CombiRuntime::Multi1 => Builder::new_multi_thread()
145 .worker_threads(1)
146 .build()
147 .unwrap(),
148 CombiRuntime::Multi2 => Builder::new_multi_thread()
149 .worker_threads(2)
150 .build()
151 .unwrap(),
152 };
153
154 let ls = match ls {
155 CombiLocalSet::Yes => Some(crate::task::LocalSet::new()),
156 CombiLocalSet::No => None,
157 };
158
159 Self { rt, ls }
160 }
161 fn block_on<T>(&self, task: T) -> T::Output
162 where
163 T: Future,
164 {
165 match &self.ls {
166 Some(ls) => ls.block_on(&self.rt, task),
167 None => self.rt.block_on(task),
168 }
169 }
170 fn spawn<T>(&self, task: T) -> JoinHandle<T::Output>
171 where
172 T: Future + Send + 'static,
173 T::Output: Send + 'static,
174 {
175 match &self.ls {
176 Some(ls) => ls.spawn_local(task),
177 None => self.rt.spawn(task),
178 }
179 }
180 }
181
182 // The type used for the output of the future
183 struct Output {
184 panic_on_drop: bool,
185 on_drop: Option<oneshot::Sender<()>>,
186 }
187 impl Output {
188 fn disarm(&mut self) {
189 self.panic_on_drop = false;
190 }
191 }
192 impl Drop for Output {
193 fn drop(&mut self) {
194 let _ = self.on_drop.take().unwrap().send(());
195 if self.panic_on_drop {
196 panic!("Panicking in Output");
197 }
198 }
199 }
200
201 // A wrapper around the future that is spawned
202 struct FutWrapper<F> {
203 inner: F,
204 on_drop: Option<oneshot::Sender<()>>,
205 panic_on_drop: bool,
206 }
207 impl<F: Future> Future for FutWrapper<F> {
208 type Output = F::Output;
209 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
210 unsafe {
211 let me = Pin::into_inner_unchecked(self);
212 let inner = Pin::new_unchecked(&mut me.inner);
213 inner.poll(cx)
214 }
215 }
216 }
217 impl<F> Drop for FutWrapper<F> {
218 fn drop(&mut self) {
219 let _: Result<(), ()> = self.on_drop.take().unwrap().send(());
220 if self.panic_on_drop {
221 panic!("Panicking in FutWrapper");
222 }
223 }
224 }
225
226 // The channels passed to the task
227 struct Signals {
228 on_first_poll: Option<oneshot::Sender<()>>,
229 wait_complete: Option<oneshot::Receiver<()>>,
230 on_output_drop: Option<oneshot::Sender<()>>,
231 }
232
233 // The task we will spawn
234 async fn my_task(mut signal: Signals, task: CombiTask, out: CombiOutput) -> Output {
235 // Signal that we have been polled once
236 let _ = signal.on_first_poll.take().unwrap().send(());
237
238 // Wait for a signal, then complete the future
239 let _ = signal.wait_complete.take().unwrap().await;
240
241 // If the task gets past wait_complete without yielding, then aborts
242 // may not be caught without this yield_now.
243 crate::task::yield_now().await;
244
245 if task == CombiTask::PanicOnRun || task == CombiTask::PanicOnRunAndDrop {
246 panic!("Panicking in my_task on {:?}", std::thread::current().id());
247 }
248
249 Output {
250 panic_on_drop: out == CombiOutput::PanicOnDrop,
251 on_drop: signal.on_output_drop.take(),
252 }
253 }
254
255 let rt = Rt::new(rt, ls);
256
257 let (on_first_poll, wait_first_poll) = oneshot::channel();
258 let (on_complete, wait_complete) = oneshot::channel();
259 let (on_future_drop, wait_future_drop) = oneshot::channel();
260 let (on_output_drop, wait_output_drop) = oneshot::channel();
261 let signal = Signals {
262 on_first_poll: Some(on_first_poll),
263 wait_complete: Some(wait_complete),
264 on_output_drop: Some(on_output_drop),
265 };
266
267 // === Spawn task ===
268 let mut handle = Some(rt.spawn(FutWrapper {
269 inner: my_task(signal, task, output),
270 on_drop: Some(on_future_drop),
271 panic_on_drop: task == CombiTask::PanicOnDrop || task == CombiTask::PanicOnRunAndDrop,
272 }));
273
274 // Keep track of whether the task has been killed with an abort
275 let mut aborted = false;
276
277 // If we want to poll the JoinHandle, do it now
278 if ji == CombiJoinInterest::Polled {
279 assert!(
280 handle.as_mut().unwrap().now_or_never().is_none(),
281 "Polling handle succeeded"
282 );
283 }
284
285 if abort == CombiAbort::AbortedImmediately {
286 handle.as_mut().unwrap().abort();
287 aborted = true;
288 }
289 if jh == CombiJoinHandle::DropImmediately {
290 drop(handle.take().unwrap());
291 }
292
293 // === Wait for first poll ===
294 let got_polled = rt.block_on(wait_first_poll).is_ok();
295 if !got_polled {
296 // it's possible that we are aborted but still got polled
297 assert!(
298 aborted,
299 "Task completed without ever being polled but was not aborted."
300 );
301 }
302
303 if abort == CombiAbort::AbortedFirstPoll {
304 handle.as_mut().unwrap().abort();
305 aborted = true;
306 }
307 if jh == CombiJoinHandle::DropFirstPoll {
308 drop(handle.take().unwrap());
309 }
310
311 // Signal the future that it can return now
312 let _ = on_complete.send(());
313 // === Wait for future to be dropped ===
314 assert!(
315 rt.block_on(wait_future_drop).is_ok(),
316 "The future should always be dropped."
317 );
318
319 if abort == CombiAbort::AbortedAfterFinish {
320 // Don't set aborted to true here as the task already finished
321 handle.as_mut().unwrap().abort();
322 }
323 if jh == CombiJoinHandle::DropAfterNoConsume {
324 // The runtime will usually have dropped every ref-count at this point,
325 // in which case dropping the JoinHandle drops the output.
326 //
327 // (But it might race and still hold a ref-count)
328 let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| {
329 drop(handle.take().unwrap());
330 }));
331 if panic.is_err() {
332 assert!(
333 (output == CombiOutput::PanicOnDrop)
334 && (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop))
335 && !aborted,
336 "Dropping JoinHandle shouldn't panic here"
337 );
338 }
339 }
340
341 // Check whether we drop after consuming the output
342 if jh == CombiJoinHandle::DropAfterConsume {
343 // Using as_mut() to not immediately drop the handle
344 let result = rt.block_on(handle.as_mut().unwrap());
345
346 match result {
347 Ok(mut output) => {
348 // Don't panic here.
349 output.disarm();
350 assert!(!aborted, "Task was aborted but returned output");
351 }
352 Err(err) if err.is_cancelled() => assert!(aborted, "Cancelled output but not aborted"),
353 Err(err) if err.is_panic() => {
354 assert!(
355 (task == CombiTask::PanicOnRun)
356 || (task == CombiTask::PanicOnDrop)
357 || (task == CombiTask::PanicOnRunAndDrop)
358 || (output == CombiOutput::PanicOnDrop),
359 "Panic but nothing should panic"
360 );
361 }
362 _ => unreachable!(),
363 }
364
365 let handle = handle.take().unwrap();
366 if abort == CombiAbort::AbortedAfterConsumeOutput {
367 handle.abort();
368 }
369 drop(handle);
370 }
371
372 // The output should have been dropped now. Check whether the output
373 // object was created at all.
374 let output_created = rt.block_on(wait_output_drop).is_ok();
375 assert_eq!(
376 output_created,
377 (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop)) && !aborted,
378 "Creation of output object"
379 );
380 }
381