1 #![cfg(not(tokio_wasi))]
2
3 use std::{task::Context, time::Duration};
4
5 #[cfg(not(loom))]
6 use futures::task::noop_waker_ref;
7
8 use crate::loom::sync::atomic::{AtomicBool, Ordering};
9 use crate::loom::sync::Arc;
10 use crate::loom::thread;
11
12 use super::TimerEntry;
13
block_on<T>(f: impl std::future::Future<Output = T>) -> T14 fn block_on<T>(f: impl std::future::Future<Output = T>) -> T {
15 #[cfg(loom)]
16 return loom::future::block_on(f);
17
18 #[cfg(not(loom))]
19 {
20 let rt = crate::runtime::Builder::new_current_thread()
21 .build()
22 .unwrap();
23 rt.block_on(f)
24 }
25 }
26
model(f: impl Fn() + Send + Sync + 'static)27 fn model(f: impl Fn() + Send + Sync + 'static) {
28 #[cfg(loom)]
29 loom::model(f);
30
31 #[cfg(not(loom))]
32 f();
33 }
34
rt(start_paused: bool) -> crate::runtime::Runtime35 fn rt(start_paused: bool) -> crate::runtime::Runtime {
36 crate::runtime::Builder::new_current_thread()
37 .enable_time()
38 .start_paused(start_paused)
39 .build()
40 .unwrap()
41 }
42
43 #[test]
single_timer()44 fn single_timer() {
45 model(|| {
46 let rt = rt(false);
47 let handle = rt.handle();
48
49 let handle_ = handle.clone();
50 let jh = thread::spawn(move || {
51 let entry = TimerEntry::new(
52 &handle_.inner,
53 handle_.inner.driver().clock().now() + Duration::from_secs(1),
54 );
55 pin!(entry);
56
57 block_on(futures::future::poll_fn(|cx| {
58 entry.as_mut().poll_elapsed(cx)
59 }))
60 .unwrap();
61 });
62
63 thread::yield_now();
64
65 let handle = handle.inner.driver().time();
66
67 // This may or may not return Some (depending on how it races with the
68 // thread). If it does return None, however, the timer should complete
69 // synchronously.
70 handle.process_at_time(handle.time_source().now() + 2_000_000_000);
71
72 jh.join().unwrap();
73 })
74 }
75
76 #[test]
drop_timer()77 fn drop_timer() {
78 model(|| {
79 let rt = rt(false);
80 let handle = rt.handle();
81
82 let handle_ = handle.clone();
83 let jh = thread::spawn(move || {
84 let entry = TimerEntry::new(
85 &handle_.inner,
86 handle_.inner.driver().clock().now() + Duration::from_secs(1),
87 );
88 pin!(entry);
89
90 let _ = entry
91 .as_mut()
92 .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref()));
93 let _ = entry
94 .as_mut()
95 .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref()));
96 });
97
98 thread::yield_now();
99
100 let handle = handle.inner.driver().time();
101
102 // advance 2s in the future.
103 handle.process_at_time(handle.time_source().now() + 2_000_000_000);
104
105 jh.join().unwrap();
106 })
107 }
108
109 #[test]
change_waker()110 fn change_waker() {
111 model(|| {
112 let rt = rt(false);
113 let handle = rt.handle();
114
115 let handle_ = handle.clone();
116 let jh = thread::spawn(move || {
117 let entry = TimerEntry::new(
118 &handle_.inner,
119 handle_.inner.driver().clock().now() + Duration::from_secs(1),
120 );
121 pin!(entry);
122
123 let _ = entry
124 .as_mut()
125 .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref()));
126
127 block_on(futures::future::poll_fn(|cx| {
128 entry.as_mut().poll_elapsed(cx)
129 }))
130 .unwrap();
131 });
132
133 thread::yield_now();
134
135 let handle = handle.inner.driver().time();
136
137 // advance 2s
138 handle.process_at_time(handle.time_source().now() + 2_000_000_000);
139
140 jh.join().unwrap();
141 })
142 }
143
144 #[test]
reset_future()145 fn reset_future() {
146 model(|| {
147 let finished_early = Arc::new(AtomicBool::new(false));
148
149 let rt = rt(false);
150 let handle = rt.handle();
151
152 let handle_ = handle.clone();
153 let finished_early_ = finished_early.clone();
154 let start = handle.inner.driver().clock().now();
155
156 let jh = thread::spawn(move || {
157 let entry = TimerEntry::new(&handle_.inner, start + Duration::from_secs(1));
158 pin!(entry);
159
160 let _ = entry
161 .as_mut()
162 .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref()));
163
164 entry.as_mut().reset(start + Duration::from_secs(2));
165
166 // shouldn't complete before 2s
167 block_on(futures::future::poll_fn(|cx| {
168 entry.as_mut().poll_elapsed(cx)
169 }))
170 .unwrap();
171
172 finished_early_.store(true, Ordering::Relaxed);
173 });
174
175 thread::yield_now();
176
177 let handle = handle.inner.driver().time();
178
179 // This may or may not return a wakeup time.
180 handle.process_at_time(
181 handle
182 .time_source()
183 .instant_to_tick(start + Duration::from_millis(1500)),
184 );
185
186 assert!(!finished_early.load(Ordering::Relaxed));
187
188 handle.process_at_time(
189 handle
190 .time_source()
191 .instant_to_tick(start + Duration::from_millis(2500)),
192 );
193
194 jh.join().unwrap();
195
196 assert!(finished_early.load(Ordering::Relaxed));
197 })
198 }
199
200 #[cfg(not(loom))]
normal_or_miri<T>(normal: T, miri: T) -> T201 fn normal_or_miri<T>(normal: T, miri: T) -> T {
202 if cfg!(miri) {
203 miri
204 } else {
205 normal
206 }
207 }
208
209 #[test]
210 #[cfg(not(loom))]
poll_process_levels()211 fn poll_process_levels() {
212 let rt = rt(true);
213 let handle = rt.handle();
214
215 let mut entries = vec![];
216
217 for i in 0..normal_or_miri(1024, 64) {
218 let mut entry = Box::pin(TimerEntry::new(
219 &handle.inner,
220 handle.inner.driver().clock().now() + Duration::from_millis(i),
221 ));
222
223 let _ = entry
224 .as_mut()
225 .poll_elapsed(&mut Context::from_waker(noop_waker_ref()));
226
227 entries.push(entry);
228 }
229
230 for t in 1..normal_or_miri(1024, 64) {
231 handle.inner.driver().time().process_at_time(t as u64);
232
233 for (deadline, future) in entries.iter_mut().enumerate() {
234 let mut context = Context::from_waker(noop_waker_ref());
235 if deadline <= t {
236 assert!(future.as_mut().poll_elapsed(&mut context).is_ready());
237 } else {
238 assert!(future.as_mut().poll_elapsed(&mut context).is_pending());
239 }
240 }
241 }
242 }
243
244 #[test]
245 #[cfg(not(loom))]
poll_process_levels_targeted()246 fn poll_process_levels_targeted() {
247 let mut context = Context::from_waker(noop_waker_ref());
248
249 let rt = rt(true);
250 let handle = rt.handle();
251
252 let e1 = TimerEntry::new(
253 &handle.inner,
254 handle.inner.driver().clock().now() + Duration::from_millis(193),
255 );
256 pin!(e1);
257
258 let handle = handle.inner.driver().time();
259
260 handle.process_at_time(62);
261 assert!(e1.as_mut().poll_elapsed(&mut context).is_pending());
262 handle.process_at_time(192);
263 handle.process_at_time(192);
264 }
265