1 use std::fmt;
2 use std::future::Future;
3 use std::panic;
4 use std::pin::Pin;
5 use std::task::{Context, Poll};
6
7 use crate::runtime::task::AbortHandle;
8 use crate::runtime::Builder;
9 use crate::sync::oneshot;
10 use crate::task::JoinHandle;
11
12 use futures::future::FutureExt;
13
14 // Enums for each option in the combinations being tested
15
/// Which runtime flavor a combination is executed on.
///
/// `test_combination` maps each variant to a `Builder` configuration.
#[derive(Copy, Clone, Debug, PartialEq)]
enum CombiRuntime {
    /// Runtime built with `Builder::new_current_thread()`.
    CurrentThread,
    /// Runtime built with `Builder::new_multi_thread()` and one worker thread.
    Multi1,
    /// Runtime built with `Builder::new_multi_thread()` and two worker threads.
    Multi2,
}
/// Whether the task is spawned on (and driven through) a `LocalSet`.
#[derive(Copy, Clone, Debug, PartialEq)]
enum CombiLocalSet {
    /// Spawn with `LocalSet::spawn_local` and block on the `LocalSet`.
    Yes,
    /// Spawn and block directly on the runtime.
    No,
}
/// Where (if anywhere) the spawned future panics.
#[derive(Copy, Clone, Debug, PartialEq)]
enum CombiTask {
    /// The task body panics instead of producing an output.
    PanicOnRun,
    /// The future wrapper panics when it is dropped.
    PanicOnDrop,
    /// The task body panics and the future wrapper panics on drop as well.
    PanicOnRunAndDrop,
    /// Neither running nor dropping the future panics.
    NoPanic,
}
/// Whether the value produced by the task panics when it is dropped.
#[derive(Copy, Clone, Debug, PartialEq)]
enum CombiOutput {
    /// Dropping the task output panics (unless it was disarmed first).
    PanicOnDrop,
    /// Dropping the task output is silent.
    NoPanic,
}
/// Whether the `JoinHandle` is polled once right after the task is spawned.
#[derive(Copy, Clone, Debug, PartialEq)]
enum CombiJoinInterest {
    /// The handle is polled once (and must not be ready yet).
    Polled,
    /// The handle is not polled before the rest of the scenario runs.
    NotPolled,
}
/// The point in the scenario at which a handle is dropped.
///
/// Used both for the `JoinHandle` and, wrapped in an `Option`, for the
/// `AbortHandle`.
///
/// The explicit discriminants are significant: `test_combination` compares
/// them numerically against [`CombiAbort`] to skip combinations in which the
/// `JoinHandle` would already be gone when it is supposed to abort the task.
#[allow(clippy::enum_variant_names)] // we aren't using glob imports
#[derive(Copy, Clone, Debug, PartialEq)]
enum CombiJoinHandle {
    /// Dropped right after spawning.
    DropImmediately = 1,
    /// Dropped once the task has signalled its first poll.
    DropFirstPoll = 2,
    /// Dropped after the future is gone, without reading the output.
    DropAfterNoConsume = 3,
    /// Dropped after the output has been read through the handle.
    DropAfterConsume = 4,
}
/// The point in the scenario at which the task is aborted, if at all.
///
/// The explicit discriminants line up with [`CombiJoinHandle`] so the two can
/// be compared numerically ("was the join handle dropped before this abort
/// point?").
#[derive(Copy, Clone, Debug, PartialEq)]
enum CombiAbort {
    /// The task is never aborted.
    NotAborted = 0,
    /// Aborted right after spawning.
    AbortedImmediately = 1,
    /// Aborted once the task has signalled its first poll.
    AbortedFirstPoll = 2,
    /// Abort requested after the future has already been dropped.
    AbortedAfterFinish = 3,
    /// Abort requested after the output was read through the `JoinHandle`.
    AbortedAfterConsumeOutput = 4,
}
60
/// Which handle is used to request the abort.
#[derive(Copy, Clone, Debug, PartialEq)]
enum CombiAbortSource {
    /// Abort by calling `abort()` on the `JoinHandle`.
    JoinHandle,
    /// Abort by calling `abort()` on an `AbortHandle` taken from the `JoinHandle`.
    AbortHandle,
}
66
/// Runs `test_combination` over the full cartesian product of runtime,
/// `LocalSet`, task, output, join-interest, join-handle, abort-handle and
/// abort options.
///
/// Combinations that make no sense (or that would double panic) are filtered
/// out inside `test_combination` itself, so this driver simply enumerates
/// everything.
#[test]
fn test_combinations() {
    // Under Miri only the current-thread runtime is exercised.
    let rt: &[CombiRuntime] = if cfg!(miri) {
        &[CombiRuntime::CurrentThread]
    } else {
        &[
            CombiRuntime::CurrentThread,
            CombiRuntime::Multi1,
            CombiRuntime::Multi2,
        ]
    };

    let ls = [CombiLocalSet::Yes, CombiLocalSet::No];
    let task = [
        CombiTask::NoPanic,
        CombiTask::PanicOnRun,
        CombiTask::PanicOnDrop,
        CombiTask::PanicOnRunAndDrop,
    ];
    let output = [CombiOutput::NoPanic, CombiOutput::PanicOnDrop];
    let ji = [CombiJoinInterest::Polled, CombiJoinInterest::NotPolled];
    let jh = [
        CombiJoinHandle::DropImmediately,
        CombiJoinHandle::DropFirstPoll,
        CombiJoinHandle::DropAfterNoConsume,
        CombiJoinHandle::DropAfterConsume,
    ];
    let abort = [
        CombiAbort::NotAborted,
        CombiAbort::AbortedImmediately,
        CombiAbort::AbortedFirstPoll,
        CombiAbort::AbortedAfterFinish,
        CombiAbort::AbortedAfterConsumeOutput,
    ];
    // `None` means no abort handle is created just to be dropped; `Some(_)`
    // names the point at which the abort handle is dropped.
    let ah = [
        None,
        Some(CombiJoinHandle::DropImmediately),
        Some(CombiJoinHandle::DropFirstPoll),
        Some(CombiJoinHandle::DropAfterNoConsume),
        Some(CombiJoinHandle::DropAfterConsume),
    ];

    for rt in rt.iter().copied() {
        for ls in ls.iter().copied() {
            for task in task.iter().copied() {
                for output in output.iter().copied() {
                    for ji in ji.iter().copied() {
                        for jh in jh.iter().copied() {
                            for abort in abort.iter().copied() {
                                // abort via join handle --- abort handles
                                // may be dropped at any point
                                for ah in ah.iter().copied() {
                                    test_combination(
                                        rt,
                                        ls,
                                        task,
                                        output,
                                        ji,
                                        jh,
                                        ah,
                                        abort,
                                        CombiAbortSource::JoinHandle,
                                    );
                                }
                                // if aborting via AbortHandle, it will
                                // never be dropped.
                                test_combination(
                                    rt,
                                    ls,
                                    task,
                                    output,
                                    ji,
                                    jh,
                                    None,
                                    abort,
                                    CombiAbortSource::AbortHandle,
                                );
                            }
                        }
                    }
                }
            }
        }
    }
}
152
/// Compile-time check that `T` implements [`fmt::Debug`].
///
/// Does nothing at runtime; it only exists so that a call fails to compile
/// if the argument's type is not `Debug`.
fn is_debug<T: fmt::Debug>(_: &T) {}
154
155 #[allow(clippy::too_many_arguments)]
test_combination( rt: CombiRuntime, ls: CombiLocalSet, task: CombiTask, output: CombiOutput, ji: CombiJoinInterest, jh: CombiJoinHandle, ah: Option<CombiJoinHandle>, abort: CombiAbort, abort_src: CombiAbortSource, )156 fn test_combination(
157 rt: CombiRuntime,
158 ls: CombiLocalSet,
159 task: CombiTask,
160 output: CombiOutput,
161 ji: CombiJoinInterest,
162 jh: CombiJoinHandle,
163 ah: Option<CombiJoinHandle>,
164 abort: CombiAbort,
165 abort_src: CombiAbortSource,
166 ) {
167 match (abort_src, ah) {
168 (CombiAbortSource::JoinHandle, _) if (jh as usize) < (abort as usize) => {
169 // join handle dropped prior to abort
170 return;
171 }
172 (CombiAbortSource::AbortHandle, Some(_)) => {
173 // abort handle dropped, we can't abort through the
174 // abort handle
175 return;
176 }
177
178 _ => {}
179 }
180
181 if (task == CombiTask::PanicOnDrop) && (output == CombiOutput::PanicOnDrop) {
182 // this causes double panic
183 return;
184 }
185 if (task == CombiTask::PanicOnRunAndDrop) && (abort != CombiAbort::AbortedImmediately) {
186 // this causes double panic
187 return;
188 }
189
190 is_debug(&rt);
191 is_debug(&ls);
192 is_debug(&task);
193 is_debug(&output);
194 is_debug(&ji);
195 is_debug(&jh);
196 is_debug(&ah);
197 is_debug(&abort);
198 is_debug(&abort_src);
199
200 // A runtime optionally with a LocalSet
201 struct Rt {
202 rt: crate::runtime::Runtime,
203 ls: Option<crate::task::LocalSet>,
204 }
205 impl Rt {
206 fn new(rt: CombiRuntime, ls: CombiLocalSet) -> Self {
207 let rt = match rt {
208 CombiRuntime::CurrentThread => Builder::new_current_thread().build().unwrap(),
209 CombiRuntime::Multi1 => Builder::new_multi_thread()
210 .worker_threads(1)
211 .build()
212 .unwrap(),
213 CombiRuntime::Multi2 => Builder::new_multi_thread()
214 .worker_threads(2)
215 .build()
216 .unwrap(),
217 };
218
219 let ls = match ls {
220 CombiLocalSet::Yes => Some(crate::task::LocalSet::new()),
221 CombiLocalSet::No => None,
222 };
223
224 Self { rt, ls }
225 }
226 fn block_on<T>(&self, task: T) -> T::Output
227 where
228 T: Future,
229 {
230 match &self.ls {
231 Some(ls) => ls.block_on(&self.rt, task),
232 None => self.rt.block_on(task),
233 }
234 }
235 fn spawn<T>(&self, task: T) -> JoinHandle<T::Output>
236 where
237 T: Future + Send + 'static,
238 T::Output: Send + 'static,
239 {
240 match &self.ls {
241 Some(ls) => ls.spawn_local(task),
242 None => self.rt.spawn(task),
243 }
244 }
245 }
246
247 // The type used for the output of the future
248 struct Output {
249 panic_on_drop: bool,
250 on_drop: Option<oneshot::Sender<()>>,
251 }
252 impl Output {
253 fn disarm(&mut self) {
254 self.panic_on_drop = false;
255 }
256 }
257 impl Drop for Output {
258 fn drop(&mut self) {
259 let _ = self.on_drop.take().unwrap().send(());
260 if self.panic_on_drop {
261 panic!("Panicking in Output");
262 }
263 }
264 }
265
266 // A wrapper around the future that is spawned
267 struct FutWrapper<F> {
268 inner: F,
269 on_drop: Option<oneshot::Sender<()>>,
270 panic_on_drop: bool,
271 }
272 impl<F: Future> Future for FutWrapper<F> {
273 type Output = F::Output;
274 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
275 unsafe {
276 let me = Pin::into_inner_unchecked(self);
277 let inner = Pin::new_unchecked(&mut me.inner);
278 inner.poll(cx)
279 }
280 }
281 }
282 impl<F> Drop for FutWrapper<F> {
283 fn drop(&mut self) {
284 let _: Result<(), ()> = self.on_drop.take().unwrap().send(());
285 if self.panic_on_drop {
286 panic!("Panicking in FutWrapper");
287 }
288 }
289 }
290
291 // The channels passed to the task
292 struct Signals {
293 on_first_poll: Option<oneshot::Sender<()>>,
294 wait_complete: Option<oneshot::Receiver<()>>,
295 on_output_drop: Option<oneshot::Sender<()>>,
296 }
297
298 // The task we will spawn
299 async fn my_task(mut signal: Signals, task: CombiTask, out: CombiOutput) -> Output {
300 // Signal that we have been polled once
301 let _ = signal.on_first_poll.take().unwrap().send(());
302
303 // Wait for a signal, then complete the future
304 let _ = signal.wait_complete.take().unwrap().await;
305
306 // If the task gets past wait_complete without yielding, then aborts
307 // may not be caught without this yield_now.
308 crate::task::yield_now().await;
309
310 if task == CombiTask::PanicOnRun || task == CombiTask::PanicOnRunAndDrop {
311 panic!("Panicking in my_task on {:?}", std::thread::current().id());
312 }
313
314 Output {
315 panic_on_drop: out == CombiOutput::PanicOnDrop,
316 on_drop: signal.on_output_drop.take(),
317 }
318 }
319
320 let rt = Rt::new(rt, ls);
321
322 let (on_first_poll, wait_first_poll) = oneshot::channel();
323 let (on_complete, wait_complete) = oneshot::channel();
324 let (on_future_drop, wait_future_drop) = oneshot::channel();
325 let (on_output_drop, wait_output_drop) = oneshot::channel();
326 let signal = Signals {
327 on_first_poll: Some(on_first_poll),
328 wait_complete: Some(wait_complete),
329 on_output_drop: Some(on_output_drop),
330 };
331
332 // === Spawn task ===
333 let mut handle = Some(rt.spawn(FutWrapper {
334 inner: my_task(signal, task, output),
335 on_drop: Some(on_future_drop),
336 panic_on_drop: task == CombiTask::PanicOnDrop || task == CombiTask::PanicOnRunAndDrop,
337 }));
338
339 // Keep track of whether the task has been killed with an abort
340 let mut aborted = false;
341
342 // If we want to poll the JoinHandle, do it now
343 if ji == CombiJoinInterest::Polled {
344 assert!(
345 handle.as_mut().unwrap().now_or_never().is_none(),
346 "Polling handle succeeded"
347 );
348 }
349
350 // If we are either aborting the task via an abort handle, or dropping via
351 // an abort handle, do that now.
352 let mut abort_handle = if ah.is_some() || abort_src == CombiAbortSource::AbortHandle {
353 handle.as_ref().map(JoinHandle::abort_handle)
354 } else {
355 None
356 };
357
358 let do_abort = |abort_handle: &mut Option<AbortHandle>,
359 join_handle: Option<&mut JoinHandle<_>>| {
360 match abort_src {
361 CombiAbortSource::AbortHandle => abort_handle.take().unwrap().abort(),
362 CombiAbortSource::JoinHandle => join_handle.unwrap().abort(),
363 }
364 };
365
366 if abort == CombiAbort::AbortedImmediately {
367 do_abort(&mut abort_handle, handle.as_mut());
368 aborted = true;
369 }
370 if jh == CombiJoinHandle::DropImmediately {
371 drop(handle.take().unwrap());
372 }
373
374 // === Wait for first poll ===
375 let got_polled = rt.block_on(wait_first_poll).is_ok();
376 if !got_polled {
377 // it's possible that we are aborted but still got polled
378 assert!(
379 aborted,
380 "Task completed without ever being polled but was not aborted."
381 );
382 }
383
384 if abort == CombiAbort::AbortedFirstPoll {
385 do_abort(&mut abort_handle, handle.as_mut());
386 aborted = true;
387 }
388 if jh == CombiJoinHandle::DropFirstPoll {
389 drop(handle.take().unwrap());
390 }
391 if ah == Some(CombiJoinHandle::DropFirstPoll) {
392 drop(abort_handle.take().unwrap());
393 }
394
395 // Signal the future that it can return now
396 let _ = on_complete.send(());
397 // === Wait for future to be dropped ===
398 assert!(
399 rt.block_on(wait_future_drop).is_ok(),
400 "The future should always be dropped."
401 );
402
403 if abort == CombiAbort::AbortedAfterFinish {
404 // Don't set aborted to true here as the task already finished
405 do_abort(&mut abort_handle, handle.as_mut());
406 }
407 if jh == CombiJoinHandle::DropAfterNoConsume {
408 if ah == Some(CombiJoinHandle::DropAfterNoConsume) {
409 drop(handle.take().unwrap());
410 // The runtime will usually have dropped every ref-count at this point,
411 // in which case dropping the AbortHandle drops the output.
412 //
413 // (But it might race and still hold a ref-count)
414 let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| {
415 drop(abort_handle.take().unwrap());
416 }));
417 if panic.is_err() {
418 assert!(
419 (output == CombiOutput::PanicOnDrop)
420 && (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop))
421 && !aborted,
422 "Dropping AbortHandle shouldn't panic here"
423 );
424 }
425 } else {
426 // The runtime will usually have dropped every ref-count at this point,
427 // in which case dropping the JoinHandle drops the output.
428 //
429 // (But it might race and still hold a ref-count)
430 let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| {
431 drop(handle.take().unwrap());
432 }));
433 if panic.is_err() {
434 assert!(
435 (output == CombiOutput::PanicOnDrop)
436 && (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop))
437 && !aborted,
438 "Dropping JoinHandle shouldn't panic here"
439 );
440 }
441 }
442 }
443
444 // Check whether we drop after consuming the output
445 if jh == CombiJoinHandle::DropAfterConsume {
446 // Using as_mut() to not immediately drop the handle
447 let result = rt.block_on(handle.as_mut().unwrap());
448
449 match result {
450 Ok(mut output) => {
451 // Don't panic here.
452 output.disarm();
453 assert!(!aborted, "Task was aborted but returned output");
454 }
455 Err(err) if err.is_cancelled() => assert!(aborted, "Cancelled output but not aborted"),
456 Err(err) if err.is_panic() => {
457 assert!(
458 (task == CombiTask::PanicOnRun)
459 || (task == CombiTask::PanicOnDrop)
460 || (task == CombiTask::PanicOnRunAndDrop)
461 || (output == CombiOutput::PanicOnDrop),
462 "Panic but nothing should panic"
463 );
464 }
465 _ => unreachable!(),
466 }
467
468 let mut handle = handle.take().unwrap();
469 if abort == CombiAbort::AbortedAfterConsumeOutput {
470 do_abort(&mut abort_handle, Some(&mut handle));
471 }
472 drop(handle);
473
474 if ah == Some(CombiJoinHandle::DropAfterConsume) {
475 drop(abort_handle.take());
476 }
477 }
478
479 // The output should have been dropped now. Check whether the output
480 // object was created at all.
481 let output_created = rt.block_on(wait_output_drop).is_ok();
482 assert_eq!(
483 output_created,
484 (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop)) && !aborted,
485 "Creation of output object"
486 );
487 }
488