• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use std::future::Future;
2 use std::pin::Pin;
3 use std::sync::atomic::{AtomicUsize, Ordering};
4 use std::task::{Context, Poll};
5 use std::thread;
6 use std::time::Duration;
7 
8 use async_task::Runnable;
9 use easy_parallel::Parallel;
10 use smol::future;
11 
// Builds a future instrumented with event counters.
//
// Usage: `future!(f, POLL, DROP_F, DROP_T)`
//
// Declares the three `AtomicUsize` statics named by the caller and binds the future to `f`.
// Every poll increments `POLL`, blocks the polling thread for `ms(400)`, and then
// resolves with `Poll::Ready`.
// Dropping the future increments `DROP_F`.
// Dropping the output it produced increments `DROP_T`.
macro_rules! future {
    ($name:pat, $poll:ident, $drop_f:ident, $drop_t:ident) => {
        static $poll: AtomicUsize = AtomicUsize::new(0);
        static $drop_f: AtomicUsize = AtomicUsize::new(0);
        static $drop_t: AtomicUsize = AtomicUsize::new(0);

        let $name = {
            // Output of the future. Only values flagged as `counted` bump the drop
            // counter, so a `Default`-constructed value (flag `false`) is not counted.
            #[derive(Default)]
            struct CountedOutput {
                _payload: Box<i32>,
                counted: bool,
            }

            impl Drop for CountedOutput {
                fn drop(&mut self) {
                    if self.counted {
                        $drop_t.fetch_add(1, Ordering::SeqCst);
                    }
                }
            }

            // The future itself; it owns a heap-allocated payload and counts its own drop.
            struct CountedFuture {
                _payload: Box<i32>,
            }

            impl Drop for CountedFuture {
                fn drop(&mut self) {
                    $drop_f.fetch_add(1, Ordering::SeqCst);
                }
            }

            impl Future for CountedFuture {
                type Output = CountedOutput;

                fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
                    $poll.fetch_add(1, Ordering::SeqCst);
                    // Block inside the poll so a test can act from another thread while
                    // the poll is still in progress.
                    thread::sleep(ms(400));
                    let output = CountedOutput {
                        _payload: Box::new(0),
                        counted: true,
                    };
                    Poll::Ready(output)
                }
            }

            CountedFuture {
                _payload: Box::new(0),
            }
        };
    };
}
60 
// Creates a schedule function with event counters.
//
// Usage: `schedule!(s, SCHED, DROP)`
//
// Declares the two `AtomicUsize` statics named by the caller and binds the closure to `s`.
// When `s` gets invoked, it calls `schedule()` on the runnable it was handed and then
// increments `SCHED`.
// When `s` gets dropped, `DROP` is incremented.
macro_rules! schedule {
    ($name:pat, $sched:ident, $drop:ident) => {
        static $drop: AtomicUsize = AtomicUsize::new(0);
        static $sched: AtomicUsize = AtomicUsize::new(0);

        let $name = {
            // Owned by the closure below; its destructor is what counts the closure's drop.
            struct Guard(Box<i32>);

            impl Drop for Guard {
                fn drop(&mut self) {
                    $drop.fetch_add(1, Ordering::SeqCst);
                }
            }

            let guard = Guard(Box::new(0));
            move |runnable: Runnable| {
                // Mention `guard` so the `move` closure captures it and dropping the
                // closure runs `Guard::drop`. Binding the borrow to `_` avoids the
                // `unused_must_use` warning that a bare `&guard;` statement triggers.
                let _ = &guard;
                runnable.schedule();
                $sched.fetch_add(1, Ordering::SeqCst);
            }
        };
    };
}
91 
// Shorthand for a `Duration` of `ms` milliseconds.
fn ms(ms: u64) -> Duration {
    const MILLIS_PER_SEC: u64 = 1_000;
    const NANOS_PER_MILLI: u32 = 1_000_000;

    let whole_secs = ms / MILLIS_PER_SEC;
    // The remainder is below 1_000, so the narrowing cast cannot truncate and the
    // nanosecond product stays below one second.
    let leftover_millis = (ms % MILLIS_PER_SEC) as u32;
    Duration::new(whole_secs, leftover_millis * NANOS_PER_MILLI)
}
95 
// Runs the runnable first and cancels the task afterwards; the test expects `cancel()`
// to still hand back an output.
#[test]
fn run_and_cancel() {
    // Reads a counter with the same ordering the macros use to update it.
    fn count(counter: &AtomicUsize) -> usize {
        counter.load(Ordering::SeqCst)
    }

    future!(f, POLL, DROP_F, DROP_T);
    schedule!(s, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // Phase 1: a single run polls the future once and drops it; the output and the
    // schedule function are expected to be still alive.
    runnable.run();
    assert_eq!(count(&POLL), 1);
    assert_eq!(count(&SCHEDULE), 0);
    assert_eq!(count(&DROP_F), 1);
    assert_eq!(count(&DROP_T), 0);
    assert_eq!(count(&DROP_S), 0);

    // Phase 2: cancelling yields the output. The returned `Option` is a temporary of
    // this statement, so the output is dropped before the counters are read below.
    let got_output = future::block_on(task.cancel()).is_some();
    assert!(got_output);
    assert_eq!(count(&POLL), 1);
    assert_eq!(count(&SCHEDULE), 0);
    assert_eq!(count(&DROP_F), 1);
    assert_eq!(count(&DROP_T), 1);
    assert_eq!(count(&DROP_S), 1);
}
116 
// Starts cancelling the task before its runnable is run; the test expects the future to
// be dropped without ever being polled and `cancel()` to yield no output.
#[test]
fn cancel_and_run() {
    // Reads a counter with the same ordering the macros use to update it.
    fn count(counter: &AtomicUsize) -> usize {
        counter.load(Ordering::SeqCst)
    }

    future!(f, POLL, DROP_F, DROP_T);
    schedule!(s, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // Waits 200 ms so the cancellation is requested first, then runs the runnable.
    let runner = || {
        thread::sleep(ms(200));
        runnable.run();

        assert_eq!(count(&POLL), 0);
        assert_eq!(count(&SCHEDULE), 0);
        assert_eq!(count(&DROP_F), 1);
        assert_eq!(count(&DROP_T), 0);

        thread::sleep(ms(200));

        assert_eq!(count(&DROP_S), 1);
    };

    // Requests cancellation right away and checks the counters once it has resolved.
    let canceller = || {
        let no_output = future::block_on(task.cancel()).is_none();
        assert!(no_output);

        thread::sleep(ms(200));

        assert_eq!(count(&POLL), 0);
        assert_eq!(count(&SCHEDULE), 0);
        assert_eq!(count(&DROP_F), 1);
        assert_eq!(count(&DROP_T), 0);

        thread::sleep(ms(200));

        assert_eq!(count(&DROP_S), 1);
    };

    Parallel::new().add(runner).add(canceller).run();
}
153 
// Cancels the task from a second thread while the first thread is still inside
// `runnable.run()` (the future's poll blocks for `ms(400)`, the cancel starts after
// `ms(200)`); the test expects `cancel()` to yield no output.
#[test]
fn cancel_during_run() {
    // Reads a counter with the same ordering the macros use to update it.
    fn count(counter: &AtomicUsize) -> usize {
        counter.load(Ordering::SeqCst)
    }

    future!(f, POLL, DROP_F, DROP_T);
    schedule!(s, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // Runs the runnable immediately, then checks the final counters after a pause.
    let runner = || {
        runnable.run();

        thread::sleep(ms(200));

        assert_eq!(count(&POLL), 1);
        assert_eq!(count(&SCHEDULE), 0);
        assert_eq!(count(&DROP_F), 1);
        assert_eq!(count(&DROP_T), 1);
        assert_eq!(count(&DROP_S), 1);
    };

    // Waits until the run is under way, cancels, and checks the counters right after.
    let canceller = || {
        thread::sleep(ms(200));

        let no_output = future::block_on(task.cancel()).is_none();
        assert!(no_output);
        assert_eq!(count(&POLL), 1);
        assert_eq!(count(&SCHEDULE), 0);
        assert_eq!(count(&DROP_F), 1);
        assert_eq!(count(&DROP_T), 1);
        assert_eq!(count(&DROP_S), 1);
    };

    Parallel::new().add(runner).add(canceller).run();
}
184