• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use std::future::Future;
2 use std::panic;
3 use std::pin::Pin;
4 use std::task::{Context, Poll};
5 
6 use crate::runtime::Builder;
7 use crate::sync::oneshot;
8 use crate::task::JoinHandle;
9 
10 use futures::future::FutureExt;
11 
12 // Enums for each option in the combinations being tested
13 
/// Which runtime flavour a combination is executed on.
#[derive(Debug, Clone, Copy, PartialEq)]
enum CombiRuntime {
    /// Runtime built with `Builder::new_current_thread()`.
    CurrentThread,
    /// Multi-thread runtime built with a single worker thread.
    Multi1,
    /// Multi-thread runtime built with two worker threads.
    Multi2,
}
/// Whether the task is spawned and driven through a `LocalSet`.
#[derive(Debug, Clone, Copy, PartialEq)]
enum CombiLocalSet {
    /// A `LocalSet` is created; spawning and blocking go through it.
    Yes,
    /// No `LocalSet`; the runtime is used directly.
    No,
}
/// Where the spawned future panics, if anywhere.
#[derive(Debug, Clone, Copy, PartialEq)]
enum CombiTask {
    /// The task body panics once it has been told to complete.
    PanicOnRun,
    /// The future wrapper panics when it is dropped.
    PanicOnDrop,
    /// Both of the above: panic in the body and again in the wrapper's drop.
    PanicOnRunAndDrop,
    /// The future never panics.
    NoPanic,
}
/// Whether the value produced by the task panics when dropped.
#[derive(Debug, Clone, Copy, PartialEq)]
enum CombiOutput {
    /// Dropping the output panics (unless it has been disarmed first).
    PanicOnDrop,
    /// Dropping the output is harmless.
    NoPanic,
}
/// Whether the `JoinHandle` is polled once right after the task is spawned.
#[derive(Debug, Clone, Copy, PartialEq)]
enum CombiJoinInterest {
    /// The handle is polled once (and must still be pending).
    Polled,
    /// The handle is not polled at that point.
    NotPolled,
}
/// The point in the scenario at which the `JoinHandle` is dropped.
///
/// The discriminants are significant: they are compared numerically with
/// those of `CombiAbort` to skip combinations in which the handle would be
/// dropped before the abort that needs it.
#[allow(clippy::enum_variant_names)] // variants are never glob-imported
#[derive(Debug, Clone, Copy, PartialEq)]
enum CombiJoinHandle {
    /// Dropped right after spawning.
    DropImmediately = 1,
    /// Dropped after waiting for the task's first poll.
    DropFirstPoll = 2,
    /// Dropped after the future has been dropped, without awaiting the handle.
    DropAfterNoConsume = 3,
    /// Dropped after the handle has been awaited.
    DropAfterConsume = 4,
}
/// The point in the scenario at which the task is aborted, if at all.
///
/// The discriminants are significant: they are compared numerically with
/// those of `CombiJoinHandle` to rule out aborting through a handle that has
/// already been dropped.
#[derive(Debug, Clone, Copy, PartialEq)]
enum CombiAbort {
    /// The task is never aborted.
    NotAborted = 0,
    /// Aborted right after spawning.
    AbortedImmediately = 1,
    /// Aborted after waiting for the task's first poll.
    AbortedFirstPoll = 2,
    /// `abort()` is called after the future has already been dropped.
    AbortedAfterFinish = 3,
    /// `abort()` is called after the handle has been awaited.
    AbortedAfterConsumeOutput = 4,
}
58 
/// Runs `test_combination` once for every element of the Cartesian product of
/// the option enums.
///
/// Under Miri (`cfg!(miri)`) the runtime dimension is restricted to
/// `CombiRuntime::CurrentThread`; all other dimensions are always fully
/// enumerated.
#[test]
fn test_combinations() {
    let runtimes: &[CombiRuntime] = if cfg!(miri) {
        &[CombiRuntime::CurrentThread]
    } else {
        &[
            CombiRuntime::CurrentThread,
            CombiRuntime::Multi1,
            CombiRuntime::Multi2,
        ]
    };

    let local_sets = [CombiLocalSet::Yes, CombiLocalSet::No];
    let tasks = [
        CombiTask::NoPanic,
        CombiTask::PanicOnRun,
        CombiTask::PanicOnDrop,
        CombiTask::PanicOnRunAndDrop,
    ];
    let outputs = [CombiOutput::NoPanic, CombiOutput::PanicOnDrop];
    let join_interests = [CombiJoinInterest::Polled, CombiJoinInterest::NotPolled];
    let join_handles = [
        CombiJoinHandle::DropImmediately,
        CombiJoinHandle::DropFirstPoll,
        CombiJoinHandle::DropAfterNoConsume,
        CombiJoinHandle::DropAfterConsume,
    ];
    let aborts = [
        CombiAbort::NotAborted,
        CombiAbort::AbortedImmediately,
        CombiAbort::AbortedFirstPoll,
        CombiAbort::AbortedAfterFinish,
        CombiAbort::AbortedAfterConsumeOutput,
    ];

    // Outermost dimension varies slowest; `abort` varies fastest.
    for &rt in runtimes {
        for &ls in &local_sets {
            for &task in &tasks {
                for &output in &outputs {
                    for &ji in &join_interests {
                        for &jh in &join_handles {
                            for &abort in &aborts {
                                test_combination(rt, ls, task, output, ji, jh, abort);
                            }
                        }
                    }
                }
            }
        }
    }
}
110 
test_combination( rt: CombiRuntime, ls: CombiLocalSet, task: CombiTask, output: CombiOutput, ji: CombiJoinInterest, jh: CombiJoinHandle, abort: CombiAbort, )111 fn test_combination(
112     rt: CombiRuntime,
113     ls: CombiLocalSet,
114     task: CombiTask,
115     output: CombiOutput,
116     ji: CombiJoinInterest,
117     jh: CombiJoinHandle,
118     abort: CombiAbort,
119 ) {
120     if (jh as usize) < (abort as usize) {
121         // drop before abort not possible
122         return;
123     }
124     if (task == CombiTask::PanicOnDrop) && (output == CombiOutput::PanicOnDrop) {
125         // this causes double panic
126         return;
127     }
128     if (task == CombiTask::PanicOnRunAndDrop) && (abort != CombiAbort::AbortedImmediately) {
129         // this causes double panic
130         return;
131     }
132 
133     println!("Runtime {:?}, LocalSet {:?}, Task {:?}, Output {:?}, JoinInterest {:?}, JoinHandle {:?}, Abort {:?}", rt, ls, task, output, ji, jh, abort);
134 
135     // A runtime optionally with a LocalSet
136     struct Rt {
137         rt: crate::runtime::Runtime,
138         ls: Option<crate::task::LocalSet>,
139     }
140     impl Rt {
141         fn new(rt: CombiRuntime, ls: CombiLocalSet) -> Self {
142             let rt = match rt {
143                 CombiRuntime::CurrentThread => Builder::new_current_thread().build().unwrap(),
144                 CombiRuntime::Multi1 => Builder::new_multi_thread()
145                     .worker_threads(1)
146                     .build()
147                     .unwrap(),
148                 CombiRuntime::Multi2 => Builder::new_multi_thread()
149                     .worker_threads(2)
150                     .build()
151                     .unwrap(),
152             };
153 
154             let ls = match ls {
155                 CombiLocalSet::Yes => Some(crate::task::LocalSet::new()),
156                 CombiLocalSet::No => None,
157             };
158 
159             Self { rt, ls }
160         }
161         fn block_on<T>(&self, task: T) -> T::Output
162         where
163             T: Future,
164         {
165             match &self.ls {
166                 Some(ls) => ls.block_on(&self.rt, task),
167                 None => self.rt.block_on(task),
168             }
169         }
170         fn spawn<T>(&self, task: T) -> JoinHandle<T::Output>
171         where
172             T: Future + Send + 'static,
173             T::Output: Send + 'static,
174         {
175             match &self.ls {
176                 Some(ls) => ls.spawn_local(task),
177                 None => self.rt.spawn(task),
178             }
179         }
180     }
181 
182     // The type used for the output of the future
183     struct Output {
184         panic_on_drop: bool,
185         on_drop: Option<oneshot::Sender<()>>,
186     }
187     impl Output {
188         fn disarm(&mut self) {
189             self.panic_on_drop = false;
190         }
191     }
192     impl Drop for Output {
193         fn drop(&mut self) {
194             let _ = self.on_drop.take().unwrap().send(());
195             if self.panic_on_drop {
196                 panic!("Panicking in Output");
197             }
198         }
199     }
200 
201     // A wrapper around the future that is spawned
202     struct FutWrapper<F> {
203         inner: F,
204         on_drop: Option<oneshot::Sender<()>>,
205         panic_on_drop: bool,
206     }
207     impl<F: Future> Future for FutWrapper<F> {
208         type Output = F::Output;
209         fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
210             unsafe {
211                 let me = Pin::into_inner_unchecked(self);
212                 let inner = Pin::new_unchecked(&mut me.inner);
213                 inner.poll(cx)
214             }
215         }
216     }
217     impl<F> Drop for FutWrapper<F> {
218         fn drop(&mut self) {
219             let _: Result<(), ()> = self.on_drop.take().unwrap().send(());
220             if self.panic_on_drop {
221                 panic!("Panicking in FutWrapper");
222             }
223         }
224     }
225 
226     // The channels passed to the task
227     struct Signals {
228         on_first_poll: Option<oneshot::Sender<()>>,
229         wait_complete: Option<oneshot::Receiver<()>>,
230         on_output_drop: Option<oneshot::Sender<()>>,
231     }
232 
233     // The task we will spawn
234     async fn my_task(mut signal: Signals, task: CombiTask, out: CombiOutput) -> Output {
235         // Signal that we have been polled once
236         let _ = signal.on_first_poll.take().unwrap().send(());
237 
238         // Wait for a signal, then complete the future
239         let _ = signal.wait_complete.take().unwrap().await;
240 
241         // If the task gets past wait_complete without yielding, then aborts
242         // may not be caught without this yield_now.
243         crate::task::yield_now().await;
244 
245         if task == CombiTask::PanicOnRun || task == CombiTask::PanicOnRunAndDrop {
246             panic!("Panicking in my_task on {:?}", std::thread::current().id());
247         }
248 
249         Output {
250             panic_on_drop: out == CombiOutput::PanicOnDrop,
251             on_drop: signal.on_output_drop.take(),
252         }
253     }
254 
255     let rt = Rt::new(rt, ls);
256 
257     let (on_first_poll, wait_first_poll) = oneshot::channel();
258     let (on_complete, wait_complete) = oneshot::channel();
259     let (on_future_drop, wait_future_drop) = oneshot::channel();
260     let (on_output_drop, wait_output_drop) = oneshot::channel();
261     let signal = Signals {
262         on_first_poll: Some(on_first_poll),
263         wait_complete: Some(wait_complete),
264         on_output_drop: Some(on_output_drop),
265     };
266 
267     // === Spawn task ===
268     let mut handle = Some(rt.spawn(FutWrapper {
269         inner: my_task(signal, task, output),
270         on_drop: Some(on_future_drop),
271         panic_on_drop: task == CombiTask::PanicOnDrop || task == CombiTask::PanicOnRunAndDrop,
272     }));
273 
274     // Keep track of whether the task has been killed with an abort
275     let mut aborted = false;
276 
277     // If we want to poll the JoinHandle, do it now
278     if ji == CombiJoinInterest::Polled {
279         assert!(
280             handle.as_mut().unwrap().now_or_never().is_none(),
281             "Polling handle succeeded"
282         );
283     }
284 
285     if abort == CombiAbort::AbortedImmediately {
286         handle.as_mut().unwrap().abort();
287         aborted = true;
288     }
289     if jh == CombiJoinHandle::DropImmediately {
290         drop(handle.take().unwrap());
291     }
292 
293     // === Wait for first poll ===
294     let got_polled = rt.block_on(wait_first_poll).is_ok();
295     if !got_polled {
296         // it's possible that we are aborted but still got polled
297         assert!(
298             aborted,
299             "Task completed without ever being polled but was not aborted."
300         );
301     }
302 
303     if abort == CombiAbort::AbortedFirstPoll {
304         handle.as_mut().unwrap().abort();
305         aborted = true;
306     }
307     if jh == CombiJoinHandle::DropFirstPoll {
308         drop(handle.take().unwrap());
309     }
310 
311     // Signal the future that it can return now
312     let _ = on_complete.send(());
313     // === Wait for future to be dropped ===
314     assert!(
315         rt.block_on(wait_future_drop).is_ok(),
316         "The future should always be dropped."
317     );
318 
319     if abort == CombiAbort::AbortedAfterFinish {
320         // Don't set aborted to true here as the task already finished
321         handle.as_mut().unwrap().abort();
322     }
323     if jh == CombiJoinHandle::DropAfterNoConsume {
324         // The runtime will usually have dropped every ref-count at this point,
325         // in which case dropping the JoinHandle drops the output.
326         //
327         // (But it might race and still hold a ref-count)
328         let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| {
329             drop(handle.take().unwrap());
330         }));
331         if panic.is_err() {
332             assert!(
333                 (output == CombiOutput::PanicOnDrop)
334                     && (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop))
335                     && !aborted,
336                 "Dropping JoinHandle shouldn't panic here"
337             );
338         }
339     }
340 
341     // Check whether we drop after consuming the output
342     if jh == CombiJoinHandle::DropAfterConsume {
343         // Using as_mut() to not immediately drop the handle
344         let result = rt.block_on(handle.as_mut().unwrap());
345 
346         match result {
347             Ok(mut output) => {
348                 // Don't panic here.
349                 output.disarm();
350                 assert!(!aborted, "Task was aborted but returned output");
351             }
352             Err(err) if err.is_cancelled() => assert!(aborted, "Cancelled output but not aborted"),
353             Err(err) if err.is_panic() => {
354                 assert!(
355                     (task == CombiTask::PanicOnRun)
356                         || (task == CombiTask::PanicOnDrop)
357                         || (task == CombiTask::PanicOnRunAndDrop)
358                         || (output == CombiOutput::PanicOnDrop),
359                     "Panic but nothing should panic"
360                 );
361             }
362             _ => unreachable!(),
363         }
364 
365         let handle = handle.take().unwrap();
366         if abort == CombiAbort::AbortedAfterConsumeOutput {
367             handle.abort();
368         }
369         drop(handle);
370     }
371 
372     // The output should have been dropped now. Check whether the output
373     // object was created at all.
374     let output_created = rt.block_on(wait_output_drop).is_ok();
375     assert_eq!(
376         output_created,
377         (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop)) && !aborted,
378         "Creation of output object"
379     );
380 }
381