• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use std::fmt;
2 use std::future::Future;
3 use std::panic;
4 use std::pin::Pin;
5 use std::task::{Context, Poll};
6 
7 use crate::runtime::task::AbortHandle;
8 use crate::runtime::Builder;
9 use crate::sync::oneshot;
10 use crate::task::JoinHandle;
11 
12 use futures::future::FutureExt;
13 
14 // Enums for each option in the combinations being tested
15 
16 #[derive(Copy, Clone, Debug, PartialEq)]
17 enum CombiRuntime {
18     CurrentThread,
19     Multi1,
20     Multi2,
21 }
22 #[derive(Copy, Clone, Debug, PartialEq)]
23 enum CombiLocalSet {
24     Yes,
25     No,
26 }
27 #[derive(Copy, Clone, Debug, PartialEq)]
28 enum CombiTask {
29     PanicOnRun,
30     PanicOnDrop,
31     PanicOnRunAndDrop,
32     NoPanic,
33 }
34 #[derive(Copy, Clone, Debug, PartialEq)]
35 enum CombiOutput {
36     PanicOnDrop,
37     NoPanic,
38 }
39 #[derive(Copy, Clone, Debug, PartialEq)]
40 enum CombiJoinInterest {
41     Polled,
42     NotPolled,
43 }
44 #[allow(clippy::enum_variant_names)] // we aren't using glob imports
45 #[derive(Copy, Clone, Debug, PartialEq)]
46 enum CombiJoinHandle {
47     DropImmediately = 1,
48     DropFirstPoll = 2,
49     DropAfterNoConsume = 3,
50     DropAfterConsume = 4,
51 }
52 #[derive(Copy, Clone, Debug, PartialEq)]
53 enum CombiAbort {
54     NotAborted = 0,
55     AbortedImmediately = 1,
56     AbortedFirstPoll = 2,
57     AbortedAfterFinish = 3,
58     AbortedAfterConsumeOutput = 4,
59 }
60 
61 #[derive(Copy, Clone, Debug, PartialEq)]
62 enum CombiAbortSource {
63     JoinHandle,
64     AbortHandle,
65 }
66 
67 #[test]
test_combinations()68 fn test_combinations() {
69     let mut rt = &[
70         CombiRuntime::CurrentThread,
71         CombiRuntime::Multi1,
72         CombiRuntime::Multi2,
73     ][..];
74 
75     if cfg!(miri) {
76         rt = &[CombiRuntime::CurrentThread];
77     }
78 
79     let ls = [CombiLocalSet::Yes, CombiLocalSet::No];
80     let task = [
81         CombiTask::NoPanic,
82         CombiTask::PanicOnRun,
83         CombiTask::PanicOnDrop,
84         CombiTask::PanicOnRunAndDrop,
85     ];
86     let output = [CombiOutput::NoPanic, CombiOutput::PanicOnDrop];
87     let ji = [CombiJoinInterest::Polled, CombiJoinInterest::NotPolled];
88     let jh = [
89         CombiJoinHandle::DropImmediately,
90         CombiJoinHandle::DropFirstPoll,
91         CombiJoinHandle::DropAfterNoConsume,
92         CombiJoinHandle::DropAfterConsume,
93     ];
94     let abort = [
95         CombiAbort::NotAborted,
96         CombiAbort::AbortedImmediately,
97         CombiAbort::AbortedFirstPoll,
98         CombiAbort::AbortedAfterFinish,
99         CombiAbort::AbortedAfterConsumeOutput,
100     ];
101     let ah = [
102         None,
103         Some(CombiJoinHandle::DropImmediately),
104         Some(CombiJoinHandle::DropFirstPoll),
105         Some(CombiJoinHandle::DropAfterNoConsume),
106         Some(CombiJoinHandle::DropAfterConsume),
107     ];
108 
109     for rt in rt.iter().copied() {
110         for ls in ls.iter().copied() {
111             for task in task.iter().copied() {
112                 for output in output.iter().copied() {
113                     for ji in ji.iter().copied() {
114                         for jh in jh.iter().copied() {
115                             for abort in abort.iter().copied() {
116                                 // abort via join handle --- abort  handles
117                                 // may be dropped at any point
118                                 for ah in ah.iter().copied() {
119                                     test_combination(
120                                         rt,
121                                         ls,
122                                         task,
123                                         output,
124                                         ji,
125                                         jh,
126                                         ah,
127                                         abort,
128                                         CombiAbortSource::JoinHandle,
129                                     );
130                                 }
131                                 // if aborting via AbortHandle, it will
132                                 // never be dropped.
133                                 test_combination(
134                                     rt,
135                                     ls,
136                                     task,
137                                     output,
138                                     ji,
139                                     jh,
140                                     None,
141                                     abort,
142                                     CombiAbortSource::AbortHandle,
143                                 );
144                             }
145                         }
146                     }
147                 }
148             }
149         }
150     }
151 }
152 
is_debug<T: fmt::Debug>(_: &T)153 fn is_debug<T: fmt::Debug>(_: &T) {}
154 
155 #[allow(clippy::too_many_arguments)]
test_combination( rt: CombiRuntime, ls: CombiLocalSet, task: CombiTask, output: CombiOutput, ji: CombiJoinInterest, jh: CombiJoinHandle, ah: Option<CombiJoinHandle>, abort: CombiAbort, abort_src: CombiAbortSource, )156 fn test_combination(
157     rt: CombiRuntime,
158     ls: CombiLocalSet,
159     task: CombiTask,
160     output: CombiOutput,
161     ji: CombiJoinInterest,
162     jh: CombiJoinHandle,
163     ah: Option<CombiJoinHandle>,
164     abort: CombiAbort,
165     abort_src: CombiAbortSource,
166 ) {
167     match (abort_src, ah) {
168         (CombiAbortSource::JoinHandle, _) if (jh as usize) < (abort as usize) => {
169             // join handle dropped prior to abort
170             return;
171         }
172         (CombiAbortSource::AbortHandle, Some(_)) => {
173             // abort handle dropped, we can't abort through the
174             // abort handle
175             return;
176         }
177 
178         _ => {}
179     }
180 
181     if (task == CombiTask::PanicOnDrop) && (output == CombiOutput::PanicOnDrop) {
182         // this causes double panic
183         return;
184     }
185     if (task == CombiTask::PanicOnRunAndDrop) && (abort != CombiAbort::AbortedImmediately) {
186         // this causes double panic
187         return;
188     }
189 
190     is_debug(&rt);
191     is_debug(&ls);
192     is_debug(&task);
193     is_debug(&output);
194     is_debug(&ji);
195     is_debug(&jh);
196     is_debug(&ah);
197     is_debug(&abort);
198     is_debug(&abort_src);
199 
200     // A runtime optionally with a LocalSet
201     struct Rt {
202         rt: crate::runtime::Runtime,
203         ls: Option<crate::task::LocalSet>,
204     }
205     impl Rt {
206         fn new(rt: CombiRuntime, ls: CombiLocalSet) -> Self {
207             let rt = match rt {
208                 CombiRuntime::CurrentThread => Builder::new_current_thread().build().unwrap(),
209                 CombiRuntime::Multi1 => Builder::new_multi_thread()
210                     .worker_threads(1)
211                     .build()
212                     .unwrap(),
213                 CombiRuntime::Multi2 => Builder::new_multi_thread()
214                     .worker_threads(2)
215                     .build()
216                     .unwrap(),
217             };
218 
219             let ls = match ls {
220                 CombiLocalSet::Yes => Some(crate::task::LocalSet::new()),
221                 CombiLocalSet::No => None,
222             };
223 
224             Self { rt, ls }
225         }
226         fn block_on<T>(&self, task: T) -> T::Output
227         where
228             T: Future,
229         {
230             match &self.ls {
231                 Some(ls) => ls.block_on(&self.rt, task),
232                 None => self.rt.block_on(task),
233             }
234         }
235         fn spawn<T>(&self, task: T) -> JoinHandle<T::Output>
236         where
237             T: Future + Send + 'static,
238             T::Output: Send + 'static,
239         {
240             match &self.ls {
241                 Some(ls) => ls.spawn_local(task),
242                 None => self.rt.spawn(task),
243             }
244         }
245     }
246 
247     // The type used for the output of the future
248     struct Output {
249         panic_on_drop: bool,
250         on_drop: Option<oneshot::Sender<()>>,
251     }
252     impl Output {
253         fn disarm(&mut self) {
254             self.panic_on_drop = false;
255         }
256     }
257     impl Drop for Output {
258         fn drop(&mut self) {
259             let _ = self.on_drop.take().unwrap().send(());
260             if self.panic_on_drop {
261                 panic!("Panicking in Output");
262             }
263         }
264     }
265 
266     // A wrapper around the future that is spawned
267     struct FutWrapper<F> {
268         inner: F,
269         on_drop: Option<oneshot::Sender<()>>,
270         panic_on_drop: bool,
271     }
272     impl<F: Future> Future for FutWrapper<F> {
273         type Output = F::Output;
274         fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
275             unsafe {
276                 let me = Pin::into_inner_unchecked(self);
277                 let inner = Pin::new_unchecked(&mut me.inner);
278                 inner.poll(cx)
279             }
280         }
281     }
282     impl<F> Drop for FutWrapper<F> {
283         fn drop(&mut self) {
284             let _: Result<(), ()> = self.on_drop.take().unwrap().send(());
285             if self.panic_on_drop {
286                 panic!("Panicking in FutWrapper");
287             }
288         }
289     }
290 
291     // The channels passed to the task
292     struct Signals {
293         on_first_poll: Option<oneshot::Sender<()>>,
294         wait_complete: Option<oneshot::Receiver<()>>,
295         on_output_drop: Option<oneshot::Sender<()>>,
296     }
297 
298     // The task we will spawn
299     async fn my_task(mut signal: Signals, task: CombiTask, out: CombiOutput) -> Output {
300         // Signal that we have been polled once
301         let _ = signal.on_first_poll.take().unwrap().send(());
302 
303         // Wait for a signal, then complete the future
304         let _ = signal.wait_complete.take().unwrap().await;
305 
306         // If the task gets past wait_complete without yielding, then aborts
307         // may not be caught without this yield_now.
308         crate::task::yield_now().await;
309 
310         if task == CombiTask::PanicOnRun || task == CombiTask::PanicOnRunAndDrop {
311             panic!("Panicking in my_task on {:?}", std::thread::current().id());
312         }
313 
314         Output {
315             panic_on_drop: out == CombiOutput::PanicOnDrop,
316             on_drop: signal.on_output_drop.take(),
317         }
318     }
319 
320     let rt = Rt::new(rt, ls);
321 
322     let (on_first_poll, wait_first_poll) = oneshot::channel();
323     let (on_complete, wait_complete) = oneshot::channel();
324     let (on_future_drop, wait_future_drop) = oneshot::channel();
325     let (on_output_drop, wait_output_drop) = oneshot::channel();
326     let signal = Signals {
327         on_first_poll: Some(on_first_poll),
328         wait_complete: Some(wait_complete),
329         on_output_drop: Some(on_output_drop),
330     };
331 
332     // === Spawn task ===
333     let mut handle = Some(rt.spawn(FutWrapper {
334         inner: my_task(signal, task, output),
335         on_drop: Some(on_future_drop),
336         panic_on_drop: task == CombiTask::PanicOnDrop || task == CombiTask::PanicOnRunAndDrop,
337     }));
338 
339     // Keep track of whether the task has been killed with an abort
340     let mut aborted = false;
341 
342     // If we want to poll the JoinHandle, do it now
343     if ji == CombiJoinInterest::Polled {
344         assert!(
345             handle.as_mut().unwrap().now_or_never().is_none(),
346             "Polling handle succeeded"
347         );
348     }
349 
350     // If we are either aborting the task via an abort handle, or dropping via
351     // an abort handle, do that now.
352     let mut abort_handle = if ah.is_some() || abort_src == CombiAbortSource::AbortHandle {
353         handle.as_ref().map(JoinHandle::abort_handle)
354     } else {
355         None
356     };
357 
358     let do_abort = |abort_handle: &mut Option<AbortHandle>,
359                     join_handle: Option<&mut JoinHandle<_>>| {
360         match abort_src {
361             CombiAbortSource::AbortHandle => abort_handle.take().unwrap().abort(),
362             CombiAbortSource::JoinHandle => join_handle.unwrap().abort(),
363         }
364     };
365 
366     if abort == CombiAbort::AbortedImmediately {
367         do_abort(&mut abort_handle, handle.as_mut());
368         aborted = true;
369     }
370     if jh == CombiJoinHandle::DropImmediately {
371         drop(handle.take().unwrap());
372     }
373 
374     // === Wait for first poll ===
375     let got_polled = rt.block_on(wait_first_poll).is_ok();
376     if !got_polled {
377         // it's possible that we are aborted but still got polled
378         assert!(
379             aborted,
380             "Task completed without ever being polled but was not aborted."
381         );
382     }
383 
384     if abort == CombiAbort::AbortedFirstPoll {
385         do_abort(&mut abort_handle, handle.as_mut());
386         aborted = true;
387     }
388     if jh == CombiJoinHandle::DropFirstPoll {
389         drop(handle.take().unwrap());
390     }
391     if ah == Some(CombiJoinHandle::DropFirstPoll) {
392         drop(abort_handle.take().unwrap());
393     }
394 
395     // Signal the future that it can return now
396     let _ = on_complete.send(());
397     // === Wait for future to be dropped ===
398     assert!(
399         rt.block_on(wait_future_drop).is_ok(),
400         "The future should always be dropped."
401     );
402 
403     if abort == CombiAbort::AbortedAfterFinish {
404         // Don't set aborted to true here as the task already finished
405         do_abort(&mut abort_handle, handle.as_mut());
406     }
407     if jh == CombiJoinHandle::DropAfterNoConsume {
408         if ah == Some(CombiJoinHandle::DropAfterNoConsume) {
409             drop(handle.take().unwrap());
410             // The runtime will usually have dropped every ref-count at this point,
411             // in which case dropping the AbortHandle drops the output.
412             //
413             // (But it might race and still hold a ref-count)
414             let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| {
415                 drop(abort_handle.take().unwrap());
416             }));
417             if panic.is_err() {
418                 assert!(
419                     (output == CombiOutput::PanicOnDrop)
420                         && (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop))
421                         && !aborted,
422                     "Dropping AbortHandle shouldn't panic here"
423                 );
424             }
425         } else {
426             // The runtime will usually have dropped every ref-count at this point,
427             // in which case dropping the JoinHandle drops the output.
428             //
429             // (But it might race and still hold a ref-count)
430             let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| {
431                 drop(handle.take().unwrap());
432             }));
433             if panic.is_err() {
434                 assert!(
435                     (output == CombiOutput::PanicOnDrop)
436                         && (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop))
437                         && !aborted,
438                     "Dropping JoinHandle shouldn't panic here"
439                 );
440             }
441         }
442     }
443 
444     // Check whether we drop after consuming the output
445     if jh == CombiJoinHandle::DropAfterConsume {
446         // Using as_mut() to not immediately drop the handle
447         let result = rt.block_on(handle.as_mut().unwrap());
448 
449         match result {
450             Ok(mut output) => {
451                 // Don't panic here.
452                 output.disarm();
453                 assert!(!aborted, "Task was aborted but returned output");
454             }
455             Err(err) if err.is_cancelled() => assert!(aborted, "Cancelled output but not aborted"),
456             Err(err) if err.is_panic() => {
457                 assert!(
458                     (task == CombiTask::PanicOnRun)
459                         || (task == CombiTask::PanicOnDrop)
460                         || (task == CombiTask::PanicOnRunAndDrop)
461                         || (output == CombiOutput::PanicOnDrop),
462                     "Panic but nothing should panic"
463                 );
464             }
465             _ => unreachable!(),
466         }
467 
468         let mut handle = handle.take().unwrap();
469         if abort == CombiAbort::AbortedAfterConsumeOutput {
470             do_abort(&mut abort_handle, Some(&mut handle));
471         }
472         drop(handle);
473 
474         if ah == Some(CombiJoinHandle::DropAfterConsume) {
475             drop(abort_handle.take());
476         }
477     }
478 
479     // The output should have been dropped now. Check whether the output
480     // object was created at all.
481     let output_created = rt.block_on(wait_output_drop).is_ok();
482     assert_eq!(
483         output_created,
484         (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop)) && !aborted,
485         "Creation of output object"
486     );
487 }
488