• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use std::cell::Cell;
2 use std::future::Future;
3 use std::pin::Pin;
4 use std::sync::atomic::{AtomicUsize, Ordering};
5 use std::task::{Context, Poll};
6 use std::thread;
7 use std::time::Duration;
8 
9 use async_task::Runnable;
10 use atomic_waker::AtomicWaker;
11 
// Creates a future with event counters.
//
// Usage: `future!(f, get_waker, POLL, DROP)`
//
// The future `f` sleeps for 200 ms on every poll, returns `Poll::Pending` the first time it is
// polled and `Poll::Ready` on every later poll.
// When it gets polled, `POLL` is incremented.
// When it gets dropped, `DROP` is incremented.
//
// Every time the future is polled, it registers the current waker in a static `AtomicWaker`.
// `get_waker()` takes that waker out of the static and panics if none is stored.
//
// The macro expands to `static` items plus a `let` binding, so it has to be invoked in
// statement position; the counters and `WAKER` are scoped to the invoking block.
macro_rules! future {
    ($name:pat, $get_waker:pat, $poll:ident, $drop:ident) => {
        static $poll: AtomicUsize = AtomicUsize::new(0);
        static $drop: AtomicUsize = AtomicUsize::new(0);
        static WAKER: AtomicWaker = AtomicWaker::new();

        let ($name, $get_waker) = {
            // Field 0: set to `true` after the first poll.
            // Field 1: a heap allocation that is never read in this macro
            // (presumably there so the future owns a real resource -- TODO confirm).
            struct Fut(Cell<bool>, Box<i32>);

            impl Future for Fut {
                type Output = Box<i32>;

                fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
                    // Store the waker first so the test can fetch it after this poll.
                    WAKER.register(cx.waker());
                    $poll.fetch_add(1, Ordering::SeqCst);
                    thread::sleep(ms(200));

                    if self.0.get() {
                        Poll::Ready(Box::new(0))
                    } else {
                        // First poll: remember it happened and stay pending.
                        self.0.set(true);
                        Poll::Pending
                    }
                }
            }

            impl Drop for Fut {
                fn drop(&mut self) {
                    $drop.fetch_add(1, Ordering::SeqCst);
                }
            }

            (Fut(Cell::new(false), Box::new(0)), || WAKER.take().unwrap())
        };
    };
}
58 
// Creates a schedule function with event counters.
//
// Usage: `schedule!(s, chan, SCHED, DROP)`
//
// The schedule function `s` pushes the task into `chan`.
// When it gets invoked, `SCHED` is incremented.
// When it gets dropped, `DROP` is incremented.
//
// Receiver `chan` extracts the task when it is scheduled.
//
// The macro expands to `static` items plus a `let` binding, so it has to be invoked in
// statement position.
macro_rules! schedule {
    ($name:pat, $chan:pat, $sched:ident, $drop:ident) => {
        static $drop: AtomicUsize = AtomicUsize::new(0);
        static $sched: AtomicUsize = AtomicUsize::new(0);

        let ($name, $chan) = {
            let (s, r) = flume::unbounded();

            // Bumps `$drop` when dropped; it is moved into the closure below, so it is
            // dropped together with the schedule function.
            struct Guard(Box<i32>);

            impl Drop for Guard {
                fn drop(&mut self) {
                    $drop.fetch_add(1, Ordering::SeqCst);
                }
            }

            let guard = Guard(Box::new(0));
            let sched = move |runnable: Runnable| {
                // Mention `guard` so the `move` closure captures (and therefore owns) it.
                let _ = &guard;
                $sched.fetch_add(1, Ordering::SeqCst);
                s.send(runnable).unwrap();
            };

            (sched, r)
        };
    };
}
95 
/// Shorthand for building a [`Duration`] of `ms` milliseconds.
fn ms(ms: u64) -> Duration {
    Duration::from_millis(ms)
}
99 
// Exercises `Waker::wake` (by value) before and after the task completes.
#[test]
fn wake() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (mut runnable, task) = async_task::spawn(f, s);
    task.detach();

    assert!(chan.is_empty());

    // First run: the future is polled once and stays pending.
    runnable.run();
    assert_eq!(POLL.load(Ordering::SeqCst), 1);
    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
    assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
    assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
    assert_eq!(chan.len(), 0);

    // Waking the stored waker is expected to schedule the task exactly once.
    get_waker().wake();
    runnable = chan.recv().unwrap();
    assert_eq!(POLL.load(Ordering::SeqCst), 1);
    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
    assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
    assert_eq!(chan.len(), 0);

    // Second run: the future returns `Ready` and is expected to be dropped.
    runnable.run();
    assert_eq!(POLL.load(Ordering::SeqCst), 2);
    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
    assert_eq!(chan.len(), 0);

    // Waking after completion must not reschedule; consuming this waker is expected to
    // release the schedule function.
    get_waker().wake();
    assert_eq!(POLL.load(Ordering::SeqCst), 2);
    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
    assert_eq!(chan.len(), 0);
}
138 
// Same scenario as `wake`, but the waker is triggered through `Waker::wake_by_ref`.
#[test]
fn wake_by_ref() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (mut runnable, task) = async_task::spawn(f, s);
    task.detach();

    assert!(chan.is_empty());

    // First run: the future is polled once and stays pending.
    runnable.run();
    assert_eq!(POLL.load(Ordering::SeqCst), 1);
    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
    assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
    assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
    assert_eq!(chan.len(), 0);

    // Waking by reference is expected to schedule the task exactly once; the temporary
    // waker is dropped at the end of the statement.
    get_waker().wake_by_ref();
    runnable = chan.recv().unwrap();
    assert_eq!(POLL.load(Ordering::SeqCst), 1);
    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
    assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
    assert_eq!(chan.len(), 0);

    // Second run: the future returns `Ready` and is expected to be dropped.
    runnable.run();
    assert_eq!(POLL.load(Ordering::SeqCst), 2);
    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
    assert_eq!(chan.len(), 0);

    // Waking after completion must not reschedule; dropping the temporary waker is
    // expected to release the schedule function.
    get_waker().wake_by_ref();
    assert_eq!(POLL.load(Ordering::SeqCst), 2);
    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
    assert_eq!(chan.len(), 0);
}
177 
// Checks that cloned wakers behave like the original and that the schedule function is
// only dropped once every outstanding waker is gone.
#[test]
fn clone() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (mut runnable, task) = async_task::spawn(f, s);
    task.detach();

    // First run: the future is polled once and stays pending.
    runnable.run();
    assert_eq!(POLL.load(Ordering::SeqCst), 1);
    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
    assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
    assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
    assert_eq!(chan.len(), 0);

    // Build a chain of clones; the waker returned by `get_waker()` itself is a temporary
    // and is dropped right after being cloned.
    let w2 = get_waker().clone();
    let w3 = w2.clone();
    let w4 = w3.clone();
    w4.wake();

    // The wake through a clone is expected to schedule the task; the second run completes it.
    runnable = chan.recv().unwrap();
    runnable.run();
    assert_eq!(POLL.load(Ordering::SeqCst), 2);
    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
    assert_eq!(chan.len(), 0);

    // Waking another clone after completion must not reschedule; `w2` and the waker
    // registered during the second poll are still alive, so the schedule function stays.
    w3.wake();
    assert_eq!(POLL.load(Ordering::SeqCst), 2);
    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
    assert_eq!(chan.len(), 0);

    // Dropping the remaining wakers is expected to release the schedule function.
    drop(w2);
    drop(get_waker());
    assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
}
216 
// Checks waking a task whose scheduled `Runnable` was dropped instead of being run.
#[test]
fn wake_dropped() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);
    task.detach();

    // First run: the future is polled once and stays pending.
    runnable.run();
    assert_eq!(POLL.load(Ordering::SeqCst), 1);
    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
    assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
    assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
    assert_eq!(chan.len(), 0);

    let waker = get_waker();

    // Schedule the task, then drop the runnable without running it: the future is
    // expected to be dropped without another poll.
    waker.wake_by_ref();
    drop(chan.recv().unwrap());
    assert_eq!(POLL.load(Ordering::SeqCst), 1);
    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
    assert_eq!(chan.len(), 0);

    // Waking afterwards must not reschedule; consuming the waker is expected to release
    // the schedule function.
    waker.wake();
    assert_eq!(POLL.load(Ordering::SeqCst), 1);
    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
    assert_eq!(chan.len(), 0);
}
248 
// Checks waking a task that has already run to completion.
#[test]
fn wake_completed() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);
    task.detach();

    // First run: the future is polled once and stays pending; keep the waker it registered.
    runnable.run();
    let waker = get_waker();
    assert_eq!(POLL.load(Ordering::SeqCst), 1);
    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
    assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
    assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
    assert_eq!(chan.len(), 0);

    // Wake, then run the scheduled runnable: the second poll completes the future.
    waker.wake();
    chan.recv().unwrap().run();
    assert_eq!(POLL.load(Ordering::SeqCst), 2);
    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
    assert_eq!(chan.len(), 0);

    // The waker registered during the second poll must not reschedule the finished task;
    // consuming it is expected to release the schedule function.
    get_waker().wake();
    assert_eq!(POLL.load(Ordering::SeqCst), 2);
    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
    assert_eq!(chan.len(), 0);
}
279