1 use std::future::Future;
2 use std::pin::Pin;
3 use std::sync::atomic::{AtomicUsize, Ordering};
4 use std::task::{Context, Poll};
5 use std::thread;
6 use std::time::Duration;
7
8 use async_task::Runnable;
9 use easy_parallel::Parallel;
10 use smol::future;
11
// Creates a future with event counters.
//
// Usage: `future!(f, POLL, DROP_F, DROP_T)`
//
// The macro declares three `AtomicUsize` statics named by `POLL`, `DROP_F` and `DROP_T`
// (each starting at 0) and binds the freshly created future to `f`.
//
// The future `f` completes on its first poll with `Poll::Ready`, after blocking the polling
// thread for 400 ms.
// When it gets polled, `POLL` is incremented.
// When it gets dropped, `DROP_F` is incremented.
// When the output gets dropped, `DROP_T` is incremented.
//
// The expansion site must have `ms`, `thread`, `Future`, `Pin`, `Context`, `Poll`,
// `AtomicUsize` and `Ordering` in scope.
macro_rules! future {
    ($name:pat, $poll:ident, $drop_f:ident, $drop_t:ident) => {
        static $poll: AtomicUsize = AtomicUsize::new(0);
        static $drop_f: AtomicUsize = AtomicUsize::new(0);
        static $drop_t: AtomicUsize = AtomicUsize::new(0);

        let $name = {
            // The boxed payload is never read; it only gives the future a heap allocation.
            struct Fut(Box<i32>);

            impl Future for Fut {
                type Output = Out;

                fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
                    $poll.fetch_add(1, Ordering::SeqCst);
                    // Blocks inside `poll` on purpose — presumably so another thread can act
                    // while the task is still running.
                    thread::sleep(ms(400));
                    Poll::Ready(Out(Box::new(0), true))
                }
            }

            impl Drop for Fut {
                fn drop(&mut self) {
                    $drop_f.fetch_add(1, Ordering::SeqCst);
                }
            }

            // The `bool` marks outputs produced by `poll`; a defaulted `Out` carries `false`
            // and is therefore not counted when dropped.
            #[derive(Default)]
            struct Out(Box<i32>, bool);

            impl Drop for Out {
                fn drop(&mut self) {
                    if self.1 {
                        $drop_t.fetch_add(1, Ordering::SeqCst);
                    }
                }
            }

            Fut(Box::new(0))
        };
    };
}
60
// Creates a schedule function with event counters.
//
// Usage: `schedule!(s, SCHED, DROP)`
//
// The macro declares two `AtomicUsize` statics named by `SCHED` and `DROP` (each starting
// at 0) and binds the schedule closure to `s`.
//
// The schedule function `s` calls `runnable.schedule()` on the runnable it receives.
// When it gets invoked, `SCHED` is incremented.
// When it gets dropped, `DROP` is incremented.
//
// NOTE(review): `runnable.schedule()` presumably hands the runnable back to this same
// schedule function — confirm against the `async_task` docs. The tests below assert that
// their schedule counter stays at 0, so this closure body is not expected to run there.
macro_rules! schedule {
    ($name:pat, $sched:ident, $drop:ident) => {
        static $drop: AtomicUsize = AtomicUsize::new(0);
        static $sched: AtomicUsize = AtomicUsize::new(0);

        let $name = {
            // Drop sentinel owned by the closure: dropping the closure drops the guard.
            struct Guard(Box<i32>);

            impl Drop for Guard {
                fn drop(&mut self) {
                    $drop.fetch_add(1, Ordering::SeqCst);
                }
            }

            let guard = Guard(Box::new(0));
            move |runnable: Runnable| {
                // Mention `guard` so the `move` closure captures it; `let _ =` avoids the
                // "unused borrow that must be used" lint a bare `&guard;` statement triggers.
                let _ = &guard;
                runnable.schedule();
                $sched.fetch_add(1, Ordering::SeqCst);
            }
        };
    };
}
91
// Shorthand for building a `Duration` from a whole number of milliseconds.
fn ms(ms: u64) -> Duration {
    Duration::from_millis(ms)
}
95
// Runs the task to completion first and cancels it afterwards.
//
// What the assertions expect: the single `run()` polls the future once and drops it, while
// the output and the schedule function are still alive. `cancel()` then yields `Some(..)`;
// once that temporary has been dropped, both the output and the schedule function count as
// dropped.
#[test]
fn run_and_cancel() {
    future!(f, POLL, DROP_F, DROP_T);
    schedule!(s, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // `run()` blocks for the 400 ms the future sleeps inside `poll`.
    runnable.run();
    assert_eq!(POLL.load(Ordering::SeqCst), 1);
    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
    assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_T.load(Ordering::SeqCst), 0);
    assert_eq!(DROP_S.load(Ordering::SeqCst), 0);

    // The `Option` returned by `block_on` is a temporary, so the output it holds is dropped
    // at the end of this statement.
    assert!(future::block_on(task.cancel()).is_some());
    assert_eq!(POLL.load(Ordering::SeqCst), 1);
    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
    assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_T.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
}
116
// Requests cancellation before the runnable is run.
//
// One thread waits 200 ms and then runs the runnable; the other calls `cancel()` right away.
// What the assertions expect: the future is dropped without ever being polled, `cancel()`
// yields `None`, no output is ever produced, and the schedule function has been dropped by
// the time both threads finish.
#[test]
fn cancel_and_run() {
    future!(f, POLL, DROP_F, DROP_T);
    schedule!(s, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    Parallel::new()
        .add(|| {
            // Delay the run so the other thread's `cancel()` is issued first.
            thread::sleep(ms(200));
            runnable.run();

            assert_eq!(POLL.load(Ordering::SeqCst), 0);
            assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
            assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_T.load(Ordering::SeqCst), 0);

            // Presumably leaves time for the cancelling thread to release its handle before
            // the schedule function is checked.
            thread::sleep(ms(200));

            assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
        })
        .add(|| {
            assert!(future::block_on(task.cancel()).is_none());

            thread::sleep(ms(200));

            assert_eq!(POLL.load(Ordering::SeqCst), 0);
            assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
            assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_T.load(Ordering::SeqCst), 0);

            thread::sleep(ms(200));

            assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
        })
        .run();
}
153
// Requests cancellation while the runnable is being run.
//
// One thread runs the runnable immediately (the future blocks for 400 ms inside `poll`); the
// other waits 200 ms and then calls `cancel()`, i.e. roughly in the middle of that poll.
// What the assertions expect: the future is polled once and dropped, `cancel()` yields
// `None`, and the produced output as well as the schedule function have been dropped.
#[test]
fn cancel_during_run() {
    future!(f, POLL, DROP_F, DROP_T);
    schedule!(s, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    Parallel::new()
        .add(|| {
            runnable.run();

            // Presumably leaves time for the cancelling thread to finish before the
            // counters are checked.
            thread::sleep(ms(200));

            assert_eq!(POLL.load(Ordering::SeqCst), 1);
            assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
            assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_T.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
        })
        .add(|| {
            // Wait until the other thread is inside `run()` before cancelling.
            thread::sleep(ms(200));

            assert!(future::block_on(task.cancel()).is_none());
            assert_eq!(POLL.load(Ordering::SeqCst), 1);
            assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
            assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_T.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
        })
        .run();
}
184