1 use std::cell::Cell;
2 use std::future::Future;
3 use std::pin::Pin;
4 use std::sync::atomic::{AtomicUsize, Ordering};
5 use std::task::{Context, Poll};
6 use std::thread;
7 use std::time::Duration;
8
9 use async_task::Runnable;
10 use atomic_waker::AtomicWaker;
11
12 // Creates a future with event counters.
13 //
14 // Usage: `future!(f, get_waker, POLL, DROP)`
15 //
16 // The future `f` always sleeps for 200 ms, and returns `Poll::Ready` the second time it is polled.
17 // When it gets polled, `POLL` is incremented.
18 // When it gets dropped, `DROP` is incremented.
19 //
20 // Every time the future is run, it stores the waker into a global variable.
21 // This waker can be extracted using the `get_waker()` function.
22 macro_rules! future {
23 ($name:pat, $get_waker:pat, $poll:ident, $drop:ident) => {
24 static $poll: AtomicUsize = AtomicUsize::new(0);
25 static $drop: AtomicUsize = AtomicUsize::new(0);
26 static WAKER: AtomicWaker = AtomicWaker::new();
27
28 let ($name, $get_waker) = {
29 struct Fut(Cell<bool>, Box<i32>);
30
31 impl Future for Fut {
32 type Output = Box<i32>;
33
34 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
35 WAKER.register(cx.waker());
36 $poll.fetch_add(1, Ordering::SeqCst);
37 thread::sleep(ms(200));
38
39 if self.0.get() {
40 Poll::Ready(Box::new(0))
41 } else {
42 self.0.set(true);
43 Poll::Pending
44 }
45 }
46 }
47
48 impl Drop for Fut {
49 fn drop(&mut self) {
50 $drop.fetch_add(1, Ordering::SeqCst);
51 }
52 }
53
54 (Fut(Cell::new(false), Box::new(0)), || WAKER.take().unwrap())
55 };
56 };
57 }
58
59 // Creates a schedule function with event counters.
60 //
61 // Usage: `schedule!(s, chan, SCHED, DROP)`
62 //
63 // The schedule function `s` pushes the task into `chan`.
64 // When it gets invoked, `SCHED` is incremented.
65 // When it gets dropped, `DROP` is incremented.
66 //
67 // Receiver `chan` extracts the task when it is scheduled.
68 macro_rules! schedule {
69 ($name:pat, $chan:pat, $sched:ident, $drop:ident) => {
70 static $drop: AtomicUsize = AtomicUsize::new(0);
71 static $sched: AtomicUsize = AtomicUsize::new(0);
72
73 let ($name, $chan) = {
74 let (s, r) = flume::unbounded();
75
76 struct Guard(Box<i32>);
77
78 impl Drop for Guard {
79 fn drop(&mut self) {
80 $drop.fetch_add(1, Ordering::SeqCst);
81 }
82 }
83
84 let guard = Guard(Box::new(0));
85 let sched = move |runnable: Runnable| {
86 let _ = &guard;
87 $sched.fetch_add(1, Ordering::SeqCst);
88 s.send(runnable).unwrap();
89 };
90
91 (sched, r)
92 };
93 };
94 }
95
ms(ms: u64) -> Duration96 fn ms(ms: u64) -> Duration {
97 Duration::from_millis(ms)
98 }
99
100 #[test]
wake()101 fn wake() {
102 future!(f, get_waker, POLL, DROP_F);
103 schedule!(s, chan, SCHEDULE, DROP_S);
104 let (mut runnable, task) = async_task::spawn(f, s);
105 task.detach();
106
107 assert!(chan.is_empty());
108
109 runnable.run();
110 assert_eq!(POLL.load(Ordering::SeqCst), 1);
111 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
112 assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
113 assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
114 assert_eq!(chan.len(), 0);
115
116 get_waker().wake();
117 runnable = chan.recv().unwrap();
118 assert_eq!(POLL.load(Ordering::SeqCst), 1);
119 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
120 assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
121 assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
122 assert_eq!(chan.len(), 0);
123
124 runnable.run();
125 assert_eq!(POLL.load(Ordering::SeqCst), 2);
126 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
127 assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
128 assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
129 assert_eq!(chan.len(), 0);
130
131 get_waker().wake();
132 assert_eq!(POLL.load(Ordering::SeqCst), 2);
133 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
134 assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
135 assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
136 assert_eq!(chan.len(), 0);
137 }
138
139 #[test]
wake_by_ref()140 fn wake_by_ref() {
141 future!(f, get_waker, POLL, DROP_F);
142 schedule!(s, chan, SCHEDULE, DROP_S);
143 let (mut runnable, task) = async_task::spawn(f, s);
144 task.detach();
145
146 assert!(chan.is_empty());
147
148 runnable.run();
149 assert_eq!(POLL.load(Ordering::SeqCst), 1);
150 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
151 assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
152 assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
153 assert_eq!(chan.len(), 0);
154
155 get_waker().wake_by_ref();
156 runnable = chan.recv().unwrap();
157 assert_eq!(POLL.load(Ordering::SeqCst), 1);
158 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
159 assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
160 assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
161 assert_eq!(chan.len(), 0);
162
163 runnable.run();
164 assert_eq!(POLL.load(Ordering::SeqCst), 2);
165 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
166 assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
167 assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
168 assert_eq!(chan.len(), 0);
169
170 get_waker().wake_by_ref();
171 assert_eq!(POLL.load(Ordering::SeqCst), 2);
172 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
173 assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
174 assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
175 assert_eq!(chan.len(), 0);
176 }
177
178 #[test]
clone()179 fn clone() {
180 future!(f, get_waker, POLL, DROP_F);
181 schedule!(s, chan, SCHEDULE, DROP_S);
182 let (mut runnable, task) = async_task::spawn(f, s);
183 task.detach();
184
185 runnable.run();
186 assert_eq!(POLL.load(Ordering::SeqCst), 1);
187 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
188 assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
189 assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
190 assert_eq!(chan.len(), 0);
191
192 let w2 = get_waker().clone();
193 let w3 = w2.clone();
194 let w4 = w3.clone();
195 w4.wake();
196
197 runnable = chan.recv().unwrap();
198 runnable.run();
199 assert_eq!(POLL.load(Ordering::SeqCst), 2);
200 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
201 assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
202 assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
203 assert_eq!(chan.len(), 0);
204
205 w3.wake();
206 assert_eq!(POLL.load(Ordering::SeqCst), 2);
207 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
208 assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
209 assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
210 assert_eq!(chan.len(), 0);
211
212 drop(w2);
213 drop(get_waker());
214 assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
215 }
216
217 #[test]
wake_dropped()218 fn wake_dropped() {
219 future!(f, get_waker, POLL, DROP_F);
220 schedule!(s, chan, SCHEDULE, DROP_S);
221 let (runnable, task) = async_task::spawn(f, s);
222 task.detach();
223
224 runnable.run();
225 assert_eq!(POLL.load(Ordering::SeqCst), 1);
226 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
227 assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
228 assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
229 assert_eq!(chan.len(), 0);
230
231 let waker = get_waker();
232
233 waker.wake_by_ref();
234 drop(chan.recv().unwrap());
235 assert_eq!(POLL.load(Ordering::SeqCst), 1);
236 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
237 assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
238 assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
239 assert_eq!(chan.len(), 0);
240
241 waker.wake();
242 assert_eq!(POLL.load(Ordering::SeqCst), 1);
243 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
244 assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
245 assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
246 assert_eq!(chan.len(), 0);
247 }
248
249 #[test]
wake_completed()250 fn wake_completed() {
251 future!(f, get_waker, POLL, DROP_F);
252 schedule!(s, chan, SCHEDULE, DROP_S);
253 let (runnable, task) = async_task::spawn(f, s);
254 task.detach();
255
256 runnable.run();
257 let waker = get_waker();
258 assert_eq!(POLL.load(Ordering::SeqCst), 1);
259 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
260 assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
261 assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
262 assert_eq!(chan.len(), 0);
263
264 waker.wake();
265 chan.recv().unwrap().run();
266 assert_eq!(POLL.load(Ordering::SeqCst), 2);
267 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
268 assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
269 assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
270 assert_eq!(chan.len(), 0);
271
272 get_waker().wake();
273 assert_eq!(POLL.load(Ordering::SeqCst), 2);
274 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
275 assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
276 assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
277 assert_eq!(chan.len(), 0);
278 }
279