1 use std::future::Future;
2 use std::pin::Pin;
3 use std::sync::atomic::{AtomicUsize, Ordering};
4 use std::task::{Context, Poll};
5
6 use async_task::Runnable;
7 use smol::future;
8
// Builds an always-ready future instrumented with event counters.
//
// Usage: `future!(f, POLL, DROP)`
//
// Declares the static counters `POLL` and `DROP` (both starting at zero) and
// binds the future to the pattern `f`.
// Every poll of `f` increments `POLL` and resolves with `Poll::Ready`.
// Dropping `f` increments `DROP`.
macro_rules! future {
    ($name:pat, $poll:ident, $drop:ident) => {
        static $poll: AtomicUsize = AtomicUsize::new(0);
        static $drop: AtomicUsize = AtomicUsize::new(0);

        let $name = {
            struct CountedFuture(Box<i32>);

            // Record the moment the future itself is destroyed.
            impl Drop for CountedFuture {
                fn drop(&mut self) {
                    $drop.fetch_add(1, Ordering::SeqCst);
                }
            }

            impl Future for CountedFuture {
                type Output = Box<i32>;

                fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
                    $poll.fetch_add(1, Ordering::SeqCst);
                    let output = Box::new(0);
                    Poll::Ready(output)
                }
            }

            CountedFuture(Box::new(0))
        };
    };
}
43
// Builds a no-op schedule function instrumented with event counters.
//
// Usage: `schedule!(s, SCHED, DROP)`
//
// Declares the static counters `SCHED` and `DROP` (both starting at zero) and
// binds the schedule closure to the pattern `s`.
// Every call of `s` increments `SCHED`; the argument is otherwise ignored.
// Dropping `s` increments `DROP`.
macro_rules! schedule {
    ($name:pat, $sched:ident, $drop:ident) => {
        static $sched: AtomicUsize = AtomicUsize::new(0);
        static $drop: AtomicUsize = AtomicUsize::new(0);

        let $name = {
            struct DropCounter(Box<i32>);

            impl Drop for DropCounter {
                fn drop(&mut self) {
                    $drop.fetch_add(1, Ordering::SeqCst);
                }
            }

            let counter = DropCounter(Box::new(0));
            move |_runnable| {
                // Mentioning `counter` makes the `move` closure own it, so the
                // drop counter fires exactly when the closure is destroyed.
                let _ = &counter;
                $sched.fetch_add(1, Ordering::SeqCst);
            }
        };
    };
}
73
try_await<T>(f: impl Future<Output = T>) -> Option<T>74 fn try_await<T>(f: impl Future<Output = T>) -> Option<T> {
75 future::block_on(future::poll_once(f))
76 }
77
// Dropping the runnable before it ever runs drops the future unpolled; the
// schedule function is only dropped once the task handle is detached.
#[test]
fn drop_and_detach() {
    future!(f, POLL, DROP_F);
    schedule!(s, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // Snapshot of the counters as (POLL, SCHEDULE, DROP_F, DROP_S).
    let counts = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
        )
    };

    assert_eq!(counts(), (0, 0, 0, 0));

    drop(runnable);
    assert_eq!(counts(), (0, 0, 1, 0));

    task.detach();
    assert_eq!(counts(), (0, 0, 1, 1));
}
101
// Detaching first changes nothing by itself; dropping the runnable afterwards
// releases both the future and the schedule function without any poll.
#[test]
fn detach_and_drop() {
    future!(f, POLL, DROP_F);
    schedule!(s, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // Snapshot of the counters as (POLL, SCHEDULE, DROP_F, DROP_S).
    let counts = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
        )
    };

    task.detach();
    assert_eq!(counts(), (0, 0, 0, 0));

    drop(runnable);
    assert_eq!(counts(), (0, 0, 1, 1));
}
120
// A detached task that is then run gets polled once, after which both the
// future and the schedule function are dropped.
#[test]
fn detach_and_run() {
    future!(f, POLL, DROP_F);
    schedule!(s, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // Snapshot of the counters as (POLL, SCHEDULE, DROP_F, DROP_S).
    let counts = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
        )
    };

    task.detach();
    assert_eq!(counts(), (0, 0, 0, 0));

    runnable.run();
    assert_eq!(counts(), (1, 0, 1, 1));
}
139
// Running first polls and drops the future; the schedule function stays alive
// until the task handle is detached.
#[test]
fn run_and_detach() {
    future!(f, POLL, DROP_F);
    schedule!(s, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // Snapshot of the counters as (POLL, SCHEDULE, DROP_F, DROP_S).
    let counts = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
        )
    };

    runnable.run();
    assert_eq!(counts(), (1, 0, 1, 0));

    task.detach();
    assert_eq!(counts(), (1, 0, 1, 1));
}
158
// Dropping the task handle before the runnable runs leaves the counters
// untouched; the later `run` drops the future and schedule function without
// polling.
#[test]
fn cancel_and_run() {
    future!(f, POLL, DROP_F);
    schedule!(s, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // Snapshot of the counters as (POLL, SCHEDULE, DROP_F, DROP_S).
    let counts = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
        )
    };

    drop(task);
    assert_eq!(counts(), (0, 0, 0, 0));

    runnable.run();
    assert_eq!(counts(), (0, 0, 1, 1));
}
177
// Running to completion polls and drops the future; dropping the task handle
// afterwards releases the schedule function.
#[test]
fn run_and_cancel() {
    future!(f, POLL, DROP_F);
    schedule!(s, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // Snapshot of the counters as (POLL, SCHEDULE, DROP_F, DROP_S).
    let counts = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
        )
    };

    runnable.run();
    assert_eq!(counts(), (1, 0, 1, 0));

    drop(task);
    assert_eq!(counts(), (1, 0, 1, 1));
}
196
// Awaiting the task before it has run yields nothing; after `run` the output
// can be taken from the task, and dropping the task handle finally releases
// the schedule function.
#[test]
fn cancel_join() {
    future!(f, POLL, DROP_F);
    schedule!(s, SCHEDULE, DROP_S);
    let (runnable, mut task) = async_task::spawn(f, s);

    // Snapshot of the counters as (POLL, SCHEDULE, DROP_F, DROP_S).
    let counts = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
        )
    };

    // Not run yet: a single poll of the task handle must come back empty.
    assert!(try_await(&mut task).is_none());
    assert_eq!(counts(), (0, 0, 0, 0));

    runnable.run();
    assert_eq!(counts(), (1, 0, 1, 0));

    // The output is now available, and fetching it changes no counter.
    assert!(try_await(&mut task).is_some());
    assert_eq!(counts(), (1, 0, 1, 0));

    drop(task);
    assert_eq!(counts(), (1, 0, 1, 1));
}
227
// Each call to `Runnable::schedule` hands exactly one runnable to the schedule
// function, which forwards it through the channel.
#[test]
fn schedule() {
    let (tx, rx) = flume::unbounded();
    let schedule = move |runnable| tx.send(runnable).unwrap();
    let never_ready = future::poll_fn(|_| Poll::<()>::Pending);
    let (runnable, _task) = async_task::spawn(never_ready, schedule);

    // Round one: nothing queued until we schedule explicitly.
    assert!(rx.is_empty());
    runnable.schedule();

    // Round two: exactly one runnable arrived; send it around again.
    let runnable = rx.recv().unwrap();
    assert!(rx.is_empty());
    runnable.schedule();

    // Round three.
    let runnable = rx.recv().unwrap();
    assert!(rx.is_empty());
    runnable.schedule();

    // The last runnable is received and discarded right away.
    drop(rx.recv().unwrap());
}
247
// Scheduling the same task three times must invoke the schedule function
// exactly three times.
#[test]
fn schedule_counter() {
    static SCHEDULED: AtomicUsize = AtomicUsize::new(0);

    let (tx, rx) = flume::unbounded();
    let schedule = move |runnable: Runnable| {
        SCHEDULED.fetch_add(1, Ordering::SeqCst);
        tx.send(runnable).unwrap();
    };
    let never_ready = future::poll_fn(|_| Poll::<()>::Pending);
    let (runnable, _task) = async_task::spawn(never_ready, schedule);
    runnable.schedule();

    let second = rx.recv().unwrap();
    second.schedule();
    let third = rx.recv().unwrap();
    third.schedule();
    assert_eq!(SCHEDULED.load(Ordering::SeqCst), 3);

    // Drain the runnable produced by the third schedule call.
    drop(rx.recv().unwrap());
}
265
// Dropping the runnable from inside the schedule function must not drop the
// schedule function's own captured state while it is still executing.
#[test]
fn drop_inside_schedule() {
    struct DropGuard(AtomicUsize);

    impl Drop for DropGuard {
        fn drop(&mut self) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    let sentinel = DropGuard(AtomicUsize::new(0));

    let schedule = move |runnable: Runnable| {
        // The sentinel is owned by this closure; its counter must still be
        // zero both before and after the runnable is dropped.
        assert_eq!(sentinel.0.load(Ordering::SeqCst), 0);
        drop(runnable);
        assert_eq!(sentinel.0.load(Ordering::SeqCst), 0);
    };

    // The `_` pattern leaves the task handle unbound, so it is dropped at the
    // end of this statement rather than at the end of the function.
    let (runnable, _) = async_task::spawn(async {}, schedule);
    runnable.schedule();
}
283
// Waking through a waker obtained from the runnable reschedules the task, both
// via `wake_by_ref` and via the consuming `wake`.
#[test]
fn waker() {
    let (tx, rx) = flume::unbounded();
    let schedule = move |runnable| tx.send(runnable).unwrap();
    let never_ready = future::poll_fn(|_| Poll::<()>::Pending);
    let (runnable, _task) = async_task::spawn(never_ready, schedule);

    assert!(rx.is_empty());

    // Grab the waker before `run` consumes the runnable.
    let task_waker = runnable.waker();
    runnable.run();
    task_waker.wake_by_ref();

    // The wake-up must have pushed a runnable into the channel.
    let rescheduled = rx.recv().unwrap();
    rescheduled.run();
    task_waker.wake();
    drop(rx.recv().unwrap());
}
300