• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![cfg(not(tokio_wasi))]
2 
3 use std::{task::Context, time::Duration};
4 
5 #[cfg(not(loom))]
6 use futures::task::noop_waker_ref;
7 
8 use crate::loom::sync::atomic::{AtomicBool, Ordering};
9 use crate::loom::sync::Arc;
10 use crate::loom::thread;
11 
12 use super::TimerEntry;
13 
block_on<T>(f: impl std::future::Future<Output = T>) -> T14 fn block_on<T>(f: impl std::future::Future<Output = T>) -> T {
15     #[cfg(loom)]
16     return loom::future::block_on(f);
17 
18     #[cfg(not(loom))]
19     {
20         let rt = crate::runtime::Builder::new_current_thread()
21             .build()
22             .unwrap();
23         rt.block_on(f)
24     }
25 }
26 
/// Runs the test body `f`.
///
/// Without `cfg(loom)` the closure is simply called once; with `cfg(loom)` it
/// is handed to `loom::model`.
fn model(f: impl Fn() + Send + Sync + 'static) {
    #[cfg(not(loom))]
    f();

    #[cfg(loom)]
    loom::model(f);
}
34 
rt(start_paused: bool) -> crate::runtime::Runtime35 fn rt(start_paused: bool) -> crate::runtime::Runtime {
36     crate::runtime::Builder::new_current_thread()
37         .enable_time()
38         .start_paused(start_paused)
39         .build()
40         .unwrap()
41 }
42 
/// A worker thread waits on a single timer entry (deadline: clock now + 1s)
/// while the main thread processes the time driver past that deadline.
#[test]
fn single_timer() {
    model(|| {
        let runtime = rt(false);
        let handle = runtime.handle();

        let worker_handle = handle.clone();
        let worker = thread::spawn(move || {
            let deadline = worker_handle.inner.driver().clock().now() + Duration::from_secs(1);
            let entry = TimerEntry::new(&worker_handle.inner, deadline);
            pin!(entry);

            // Block this thread until the entry reports that it has elapsed.
            let elapsed = futures::future::poll_fn(|cx| entry.as_mut().poll_elapsed(cx));
            block_on(elapsed).unwrap();
        });

        thread::yield_now();

        let time = handle.inner.driver().time();

        // Depending on how this races with the worker thread, processing may or
        // may not return Some. If it returns None, the timer is expected to
        // complete synchronously on the worker instead.
        let target = time.time_source().now() + 2_000_000_000;
        time.process_at_time(target);

        worker.join().unwrap();
    })
}
75 
/// A worker thread polls a timer entry twice with a no-op waker and then drops
/// it, while the main thread concurrently processes the time driver.
#[test]
fn drop_timer() {
    model(|| {
        let runtime = rt(false);
        let handle = runtime.handle();

        let worker_handle = handle.clone();
        let worker = thread::spawn(move || {
            let deadline = worker_handle.inner.driver().clock().now() + Duration::from_secs(1);
            let entry = TimerEntry::new(&worker_handle.inner, deadline);
            pin!(entry);

            // Both polls use the same no-op waker; their results are ignored.
            // The entry is dropped when this closure returns.
            let mut cx = Context::from_waker(futures::task::noop_waker_ref());
            let _ = entry.as_mut().poll_elapsed(&mut cx);
            let _ = entry.as_mut().poll_elapsed(&mut cx);
        });

        thread::yield_now();

        let time = handle.inner.driver().time();

        // Move the driver past the worker's deadline (the original note
        // describes this jump as 2s into the future).
        let target = time.time_source().now() + 2_000_000_000;
        time.process_at_time(target);

        worker.join().unwrap();
    })
}
108 
/// A worker thread first polls a timer entry with a no-op waker, then waits on
/// it via `block_on` (which polls with a different waker), while the main
/// thread processes the time driver past the deadline.
#[test]
fn change_waker() {
    model(|| {
        let runtime = rt(false);
        let handle = runtime.handle();

        let worker_handle = handle.clone();
        let worker = thread::spawn(move || {
            let deadline = worker_handle.inner.driver().clock().now() + Duration::from_secs(1);
            let entry = TimerEntry::new(&worker_handle.inner, deadline);
            pin!(entry);

            // Initial poll with the no-op waker; the result is ignored.
            let mut noop_cx = Context::from_waker(futures::task::noop_waker_ref());
            let _ = entry.as_mut().poll_elapsed(&mut noop_cx);

            // Subsequent polls go through whatever context `block_on` supplies.
            let elapsed = futures::future::poll_fn(|cx| entry.as_mut().poll_elapsed(cx));
            block_on(elapsed).unwrap();
        });

        thread::yield_now();

        let time = handle.inner.driver().time();

        // Move the driver past the worker's deadline (the original note
        // describes this jump as 2s).
        let target = time.time_source().now() + 2_000_000_000;
        time.process_at_time(target);

        worker.join().unwrap();
    })
}
143 
/// A timer entry created with a 1s deadline is reset to 2s after its first
/// poll. The test asserts that the worker has not completed after the driver
/// is processed at 1.5s, and that it has completed once the driver has been
/// processed at 2.5s and the worker joined.
#[test]
fn reset_future() {
    model(|| {
        // Set by the worker once its wait on the entry has returned.
        let completed = Arc::new(AtomicBool::new(false));

        let runtime = rt(false);
        let handle = runtime.handle();

        let worker_handle = handle.clone();
        let worker_completed = completed.clone();
        let start = handle.inner.driver().clock().now();

        let worker = thread::spawn(move || {
            let entry = TimerEntry::new(&worker_handle.inner, start + Duration::from_secs(1));
            pin!(entry);

            // Poll once with a no-op waker before changing the deadline.
            let mut noop_cx = Context::from_waker(futures::task::noop_waker_ref());
            let _ = entry.as_mut().poll_elapsed(&mut noop_cx);

            entry.as_mut().reset(start + Duration::from_secs(2));

            // After the reset this must not complete before the 2s mark.
            let elapsed = futures::future::poll_fn(|cx| entry.as_mut().poll_elapsed(cx));
            block_on(elapsed).unwrap();

            worker_completed.store(true, Ordering::Relaxed);
        });

        thread::yield_now();

        let time = handle.inner.driver().time();

        // Processing at 1.5s may or may not return a wakeup time.
        let before_deadline = time
            .time_source()
            .instant_to_tick(start + Duration::from_millis(1500));
        time.process_at_time(before_deadline);

        assert!(!completed.load(Ordering::Relaxed));

        let after_deadline = time
            .time_source()
            .instant_to_tick(start + Duration::from_millis(2500));
        time.process_at_time(after_deadline);

        worker.join().unwrap();

        assert!(completed.load(Ordering::Relaxed));
    })
}
199 
/// Selects between two values depending on whether the code is compiled under
/// Miri: returns `miri` when `cfg!(miri)` is true and `normal` otherwise.
#[cfg(not(loom))]
fn normal_or_miri<T>(normal: T, miri: T) -> T {
    if !cfg!(miri) {
        return normal;
    }
    miri
}
208 
/// Registers one timer entry per millisecond offset (1024 normally, 64 under
/// Miri), then advances the driver one tick at a time and checks that exactly
/// the entries whose index is at or below the current tick report ready.
#[test]
#[cfg(not(loom))]
fn poll_process_levels() {
    let runtime = rt(true);
    let handle = runtime.handle();

    // Millisecond offsets are `u64` (for `Duration::from_millis`).
    let entry_count: u64 = normal_or_miri(1024, 64);
    let mut entries: Vec<_> = (0..entry_count)
        .map(|offset_ms| {
            let deadline = handle.inner.driver().clock().now() + Duration::from_millis(offset_ms);
            let mut entry = Box::pin(TimerEntry::new(&handle.inner, deadline));

            // Poll once with a no-op waker; the result is ignored.
            let mut cx = Context::from_waker(noop_waker_ref());
            let _ = entry.as_mut().poll_elapsed(&mut cx);

            entry
        })
        .collect();

    // Ticks are `usize` so they compare directly with the enumerate index.
    let tick_limit: usize = normal_or_miri(1024, 64);
    for t in 1..tick_limit {
        handle.inner.driver().time().process_at_time(t as u64);

        for (deadline, future) in entries.iter_mut().enumerate() {
            let mut cx = Context::from_waker(noop_waker_ref());
            let poll = future.as_mut().poll_elapsed(&mut cx);
            if deadline > t {
                assert!(poll.is_pending());
            } else {
                assert!(poll.is_ready());
            }
        }
    }
}
243 
/// Targeted scenario: one entry with a 193ms deadline, with the driver
/// processed at tick 62 and then twice at tick 192.
#[test]
#[cfg(not(loom))]
fn poll_process_levels_targeted() {
    let runtime = rt(true);
    let handle = runtime.handle();

    let deadline = handle.inner.driver().clock().now() + Duration::from_millis(193);
    let e1 = TimerEntry::new(&handle.inner, deadline);
    pin!(e1);

    let time = handle.inner.driver().time();
    let mut cx = Context::from_waker(noop_waker_ref());

    time.process_at_time(62);
    assert!(e1.as_mut().poll_elapsed(&mut cx).is_pending());

    // The same tick is processed twice on purpose; the test passes as long as
    // neither call panics.
    // NOTE(review): presumably a regression check for re-processing an
    // already-processed tick — confirm against the driver's history.
    time.process_at_time(192);
    time.process_at_time(192);
}
265