1 use std::{task::Context, time::Duration};
2
3 #[cfg(not(loom))]
4 use futures::task::noop_waker_ref;
5
6 use crate::loom::sync::Arc;
7 use crate::loom::thread;
8 use crate::{
9 loom::sync::atomic::{AtomicBool, Ordering},
10 park::Unpark,
11 };
12
13 use super::{Handle, TimerEntry};
14
15 struct MockUnpark {}
16 impl Unpark for MockUnpark {
unpark(&self)17 fn unpark(&self) {}
18 }
19 impl MockUnpark {
mock() -> Box<dyn Unpark>20 fn mock() -> Box<dyn Unpark> {
21 Box::new(Self {})
22 }
23 }
24
block_on<T>(f: impl std::future::Future<Output = T>) -> T25 fn block_on<T>(f: impl std::future::Future<Output = T>) -> T {
26 #[cfg(loom)]
27 return loom::future::block_on(f);
28
29 #[cfg(not(loom))]
30 return futures::executor::block_on(f);
31 }
32
/// Executes the test body `f`.
///
/// With `cfg(loom)` the closure is handed to `loom::model`; in every other
/// build it is simply invoked once, directly.
fn model(f: impl Fn() + Send + Sync + 'static) {
    #[cfg(loom)]
    {
        loom::model(f);
    }

    #[cfg(not(loom))]
    {
        f();
    }
}
40
/// A worker thread arms a timer one second out and blocks on it, while the
/// main thread processes the driver at a tick past that deadline. The worker
/// is expected to finish without error (both `unwrap`s succeed).
#[test]
fn single_timer() {
    model(|| {
        let clock = crate::time::clock::Clock::new(true, false);
        let time_source = super::ClockTime::new(clock.clone());

        let driver = super::Inner::new(time_source.clone(), MockUnpark::mock());
        let handle = Handle::new(Arc::new(driver));

        let worker_handle = handle.clone();
        let worker = thread::spawn(move || {
            let deadline = clock.now() + Duration::from_secs(1);
            let timer = TimerEntry::new(&worker_handle, deadline);
            pin!(timer);

            // Wait for the timer using whatever waker `block_on` supplies.
            let elapsed = futures::future::poll_fn(|cx| timer.as_mut().poll_elapsed(cx));
            block_on(elapsed).unwrap();
        });

        thread::yield_now();

        // This call races with the worker thread, so its result (unused
        // here) may or may not be `Some`. When it is `None`, the timer is
        // expected to complete synchronously on the worker side instead.
        let target_tick = time_source.now() + 2_000_000_000;
        handle.process_at_time(target_tick);

        worker.join().unwrap();
    })
}
71
/// A worker thread polls a timer twice with a no-op waker and then lets it
/// go out of scope, while the main thread concurrently processes the driver
/// at a tick past the timer's deadline. Neither side may panic.
#[test]
fn drop_timer() {
    model(|| {
        let clock = crate::time::clock::Clock::new(true, false);
        let time_source = super::ClockTime::new(clock.clone());

        let driver = super::Inner::new(time_source.clone(), MockUnpark::mock());
        let handle = Handle::new(Arc::new(driver));

        let worker_handle = handle.clone();
        let worker = thread::spawn(move || {
            let deadline = clock.now() + Duration::from_secs(1);
            let timer = TimerEntry::new(&worker_handle, deadline);
            pin!(timer);

            // Both poll results are intentionally discarded; the entry is
            // dropped when this closure returns.
            let mut cx = Context::from_waker(futures::task::noop_waker_ref());
            let _ = timer.as_mut().poll_elapsed(&mut cx);
            let _ = timer.as_mut().poll_elapsed(&mut cx);
        });

        thread::yield_now();

        // Process the driver at a tick beyond the timer's 1s deadline.
        let target_tick = time_source.now() + 2_000_000_000;
        handle.process_at_time(target_tick);

        worker.join().unwrap();
    })
}
102
/// A worker thread first polls a timer with a no-op waker and then waits on
/// the same timer through `block_on`, which polls it with a different
/// context. The main thread processes the driver past the deadline, and the
/// worker is expected to complete without error.
#[test]
fn change_waker() {
    model(|| {
        let clock = crate::time::clock::Clock::new(true, false);
        let time_source = super::ClockTime::new(clock.clone());

        let driver = super::Inner::new(time_source.clone(), MockUnpark::mock());
        let handle = Handle::new(Arc::new(driver));

        let worker_handle = handle.clone();
        let worker = thread::spawn(move || {
            let deadline = clock.now() + Duration::from_secs(1);
            let timer = TimerEntry::new(&worker_handle, deadline);
            pin!(timer);

            // First poll: no-op waker, result ignored.
            let mut noop_cx = Context::from_waker(futures::task::noop_waker_ref());
            let _ = timer.as_mut().poll_elapsed(&mut noop_cx);

            // Subsequent polls: driven by `block_on` with its own context.
            let elapsed = futures::future::poll_fn(|cx| timer.as_mut().poll_elapsed(cx));
            block_on(elapsed).unwrap();
        });

        thread::yield_now();

        // Process the driver at a tick beyond the timer's 1s deadline.
        let target_tick = time_source.now() + 2_000_000_000;
        handle.process_at_time(target_tick);

        worker.join().unwrap();
    })
}
135
/// A timer created for `start + 1s` is polled once and then reset to
/// `start + 2s`. Processing the driver at `start + 1.5s` must not let the
/// worker finish; processing at `start + 2.5s` must.
///
/// The `finished_early` flag is set by the worker only after its wait on the
/// timer returns, so the main thread can observe whether completion happened
/// before the second processing step.
#[test]
fn reset_future() {
    model(|| {
        let finished_early = Arc::new(AtomicBool::new(false));

        let clock = crate::time::clock::Clock::new(true, false);
        let time_source = super::ClockTime::new(clock.clone());

        let driver = super::Inner::new(time_source.clone(), MockUnpark::mock());
        let handle = Handle::new(Arc::new(driver));

        let worker_handle = handle.clone();
        let finished_flag = finished_early.clone();
        let start = clock.now();

        let worker = thread::spawn(move || {
            let timer = TimerEntry::new(&worker_handle, start + Duration::from_secs(1));
            pin!(timer);

            // Initial poll with a no-op waker; the result is ignored.
            let mut noop_cx = Context::from_waker(futures::task::noop_waker_ref());
            let _ = timer.as_mut().poll_elapsed(&mut noop_cx);

            // Push the deadline out to two seconds after `start`.
            timer.as_mut().reset(start + Duration::from_secs(2));

            // This wait is not supposed to return before the 2s mark.
            let elapsed = futures::future::poll_fn(|cx| timer.as_mut().poll_elapsed(cx));
            block_on(elapsed).unwrap();

            finished_flag.store(true, Ordering::Relaxed);
        });

        thread::yield_now();

        // The (unused) result here may or may not carry a wakeup time.
        let tick_at_1500ms = time_source.instant_to_tick(start + Duration::from_millis(1500));
        handle.process_at_time(tick_at_1500ms);

        assert!(!finished_early.load(Ordering::Relaxed));

        let tick_at_2500ms = time_source.instant_to_tick(start + Duration::from_millis(2500));
        handle.process_at_time(tick_at_2500ms);

        worker.join().unwrap();

        assert!(finished_early.load(Ordering::Relaxed));
    })
}
184
/// Registers 1024 timers whose deadlines are 0..1024 ms after the (paused)
/// clock's current time, then steps the driver through ticks 1..1024. After
/// each step, the timer at index `deadline` must be ready when
/// `deadline <= t` and pending otherwise.
#[test]
#[cfg(not(loom))]
fn poll_process_levels() {
    let clock = crate::time::clock::Clock::new(true, false);
    clock.pause();

    let time_source = super::ClockTime::new(clock.clone());

    let driver = super::Inner::new(time_source, MockUnpark::mock());
    let handle = Handle::new(Arc::new(driver));

    // Create each entry and poll it once (result ignored) before moving on
    // to the next; the index in `entries` equals the offset in milliseconds.
    let mut entries: Vec<_> = (0..1024)
        .map(|offset_ms| {
            let mut entry = Box::pin(TimerEntry::new(
                &handle,
                clock.now() + Duration::from_millis(offset_ms),
            ));

            let _ = entry
                .as_mut()
                .poll_elapsed(&mut Context::from_waker(noop_waker_ref()));

            entry
        })
        .collect();

    let mut context = Context::from_waker(noop_waker_ref());

    for t in 1..1024 {
        handle.process_at_time(t as u64);

        for (deadline, future) in entries.iter_mut().enumerate() {
            if deadline > t {
                assert!(future.as_mut().poll_elapsed(&mut context).is_pending());
            } else {
                assert!(future.as_mut().poll_elapsed(&mut context).is_ready());
            }
        }
    }
}
223
/// Targeted scenario: a single timer 193 ms out is left unpolled while the
/// driver is processed at tick 62, is then observed to be pending, and the
/// driver is finally processed twice at tick 192. The test passes as long as
/// the assertion holds and nothing panics.
#[test]
#[cfg(not(loom))]
fn poll_process_levels_targeted() {
    let mut context = Context::from_waker(noop_waker_ref());

    let clock = crate::time::clock::Clock::new(true, false);
    clock.pause();

    let time_source = super::ClockTime::new(clock.clone());

    let driver = super::Inner::new(time_source, MockUnpark::mock());
    let handle = Handle::new(Arc::new(driver));

    let deadline = clock.now() + Duration::from_millis(193);
    let e1 = TimerEntry::new(&handle, deadline);
    pin!(e1);

    handle.process_at_time(62);
    assert!(e1.as_mut().poll_elapsed(&mut context).is_pending());

    // NOTE(review): the repeated call at the same tick looks deliberate
    // (presumably regression coverage for re-processing a tick) — confirm.
    handle.process_at_time(192);
    handle.process_at_time(192);
}
245
246 /*
247 #[test]
248 fn balanced_incr_and_decr() {
249 const OPS: usize = 5;
250
251 fn incr(inner: Arc<Inner>) {
252 for _ in 0..OPS {
253 inner.increment().expect("increment should not have failed");
254 thread::yield_now();
255 }
256 }
257
258 fn decr(inner: Arc<Inner>) {
259 let mut ops_performed = 0;
260 while ops_performed < OPS {
261 if inner.num(Ordering::Relaxed) > 0 {
262 ops_performed += 1;
263 inner.decrement();
264 }
265 thread::yield_now();
266 }
267 }
268
269 loom::model(|| {
270 let unpark = Box::new(MockUnpark);
271 let instant = Instant::now();
272
273 let inner = Arc::new(Inner::new(instant, unpark));
274
275 let incr_inner = inner.clone();
276 let decr_inner = inner.clone();
277
278 let incr_hndle = thread::spawn(move || incr(incr_inner));
279 let decr_hndle = thread::spawn(move || decr(decr_inner));
280
281 incr_hndle.join().expect("should never fail");
282 decr_hndle.join().expect("should never fail");
283
284 assert_eq!(inner.num(Ordering::SeqCst), 0);
285 })
286 }
287 */
288