1 // Currently, rust warns when an unsafe fn contains an unsafe {} block. However, 2 // in the future, this will change to the reverse. For now, suppress this 3 // warning and generally stick with being explicit about unsafety. 4 #![allow(unused_unsafe)] 5 #![cfg_attr(not(feature = "rt"), allow(dead_code))] 6 7 //! Time driver 8 9 mod entry; 10 pub(self) use self::entry::{EntryList, TimerEntry, TimerHandle, TimerShared}; 11 12 mod handle; 13 pub(crate) use self::handle::Handle; 14 15 mod wheel; 16 17 pub(super) mod sleep; 18 19 use crate::loom::sync::atomic::{AtomicBool, Ordering}; 20 use crate::loom::sync::{Arc, Mutex}; 21 use crate::park::{Park, Unpark}; 22 use crate::time::error::Error; 23 use crate::time::{Clock, Duration, Instant}; 24 25 use std::convert::TryInto; 26 use std::fmt; 27 use std::{num::NonZeroU64, ptr::NonNull, task::Waker}; 28 29 /// Time implementation that drives [`Sleep`][sleep], [`Interval`][interval], and [`Timeout`][timeout]. 30 /// 31 /// A `Driver` instance tracks the state necessary for managing time and 32 /// notifying the [`Sleep`][sleep] instances once their deadlines are reached. 33 /// 34 /// It is expected that a single instance manages many individual [`Sleep`][sleep] 35 /// instances. The `Driver` implementation is thread-safe and, as such, is able 36 /// to handle callers from across threads. 37 /// 38 /// After creating the `Driver` instance, the caller must repeatedly call `park` 39 /// or `park_timeout`. The time driver will perform no work unless `park` or 40 /// `park_timeout` is called repeatedly. 41 /// 42 /// The driver has a resolution of one millisecond. Any unit of time that falls 43 /// between milliseconds are rounded up to the next millisecond. 44 /// 45 /// When an instance is dropped, any outstanding [`Sleep`][sleep] instance that has not 46 /// elapsed will be notified with an error. At this point, calling `poll` on the 47 /// [`Sleep`][sleep] instance will result in panic. 48 /// 49 /// # Implementation 50 /// 51 /// The time driver is based on the [paper by Varghese and Lauck][paper]. 52 /// 53 /// A hashed timing wheel is a vector of slots, where each slot handles a time 54 /// slice. As time progresses, the timer walks over the slot for the current 55 /// instant, and processes each entry for that slot. When the timer reaches the 56 /// end of the wheel, it starts again at the beginning. 57 /// 58 /// The implementation maintains six wheels arranged in a set of levels. As the 59 /// levels go up, the slots of the associated wheel represent larger intervals 60 /// of time. At each level, the wheel has 64 slots. Each slot covers a range of 61 /// time equal to the wheel at the lower level. At level zero, each slot 62 /// represents one millisecond of time. 63 /// 64 /// The wheels are: 65 /// 66 /// * Level 0: 64 x 1 millisecond slots. 67 /// * Level 1: 64 x 64 millisecond slots. 68 /// * Level 2: 64 x ~4 second slots. 69 /// * Level 3: 64 x ~4 minute slots. 70 /// * Level 4: 64 x ~4 hour slots. 71 /// * Level 5: 64 x ~12 day slots. 72 /// 73 /// When the timer processes entries at level zero, it will notify all the 74 /// `Sleep` instances as their deadlines have been reached. For all higher 75 /// levels, all entries will be redistributed across the wheel at the next level 76 /// down. Eventually, as time progresses, entries with [`Sleep`][sleep] instances will 77 /// either be canceled (dropped) or their associated entries will reach level 78 /// zero and be notified. 79 /// 80 /// [paper]: http://www.cs.columbia.edu/~nahum/w6998/papers/ton97-timing-wheels.pdf 81 /// [sleep]: crate::time::Sleep 82 /// [timeout]: crate::time::Timeout 83 /// [interval]: crate::time::Interval 84 #[derive(Debug)] 85 pub(crate) struct Driver<P: Park + 'static> { 86 /// Timing backend in use 87 time_source: ClockTime, 88 89 /// Shared state 90 handle: Handle, 91 92 /// Parker to delegate to 93 park: P, 94 } 95 96 /// A structure which handles conversion from Instants to u64 timestamps. 97 #[derive(Debug, Clone)] 98 pub(self) struct ClockTime { 99 clock: super::clock::Clock, 100 start_time: Instant, 101 } 102 103 impl ClockTime { new(clock: Clock) -> Self104 pub(self) fn new(clock: Clock) -> Self { 105 Self { 106 start_time: clock.now(), 107 clock, 108 } 109 } 110 deadline_to_tick(&self, t: Instant) -> u64111 pub(self) fn deadline_to_tick(&self, t: Instant) -> u64 { 112 // Round up to the end of a ms 113 self.instant_to_tick(t + Duration::from_nanos(999_999)) 114 } 115 instant_to_tick(&self, t: Instant) -> u64116 pub(self) fn instant_to_tick(&self, t: Instant) -> u64 { 117 // round up 118 let dur: Duration = t 119 .checked_duration_since(self.start_time) 120 .unwrap_or_else(|| Duration::from_secs(0)); 121 let ms = dur.as_millis(); 122 123 ms.try_into().expect("Duration too far into the future") 124 } 125 tick_to_duration(&self, t: u64) -> Duration126 pub(self) fn tick_to_duration(&self, t: u64) -> Duration { 127 Duration::from_millis(t) 128 } 129 now(&self) -> u64130 pub(self) fn now(&self) -> u64 { 131 self.instant_to_tick(self.clock.now()) 132 } 133 } 134 135 /// Timer state shared between `Driver`, `Handle`, and `Registration`. 136 struct Inner { 137 // The state is split like this so `Handle` can access `is_shutdown` without locking the mutex 138 pub(super) state: Mutex<InnerState>, 139 140 /// True if the driver is being shutdown 141 pub(super) is_shutdown: AtomicBool, 142 } 143 144 /// Time state shared which must be protected by a `Mutex` 145 struct InnerState { 146 /// Timing backend in use 147 time_source: ClockTime, 148 149 /// The last published timer `elapsed` value. 150 elapsed: u64, 151 152 /// The earliest time at which we promise to wake up without unparking 153 next_wake: Option<NonZeroU64>, 154 155 /// Timer wheel 156 wheel: wheel::Wheel, 157 158 /// Unparker that can be used to wake the time driver 159 unpark: Box<dyn Unpark>, 160 } 161 162 // ===== impl Driver ===== 163 164 impl<P> Driver<P> 165 where 166 P: Park + 'static, 167 { 168 /// Creates a new `Driver` instance that uses `park` to block the current 169 /// thread and `time_source` to get the current time and convert to ticks. 170 /// 171 /// Specifying the source of time is useful when testing. new(park: P, clock: Clock) -> Driver<P>172 pub(crate) fn new(park: P, clock: Clock) -> Driver<P> { 173 let time_source = ClockTime::new(clock); 174 175 let inner = Inner::new(time_source.clone(), Box::new(park.unpark())); 176 177 Driver { 178 time_source, 179 handle: Handle::new(Arc::new(inner)), 180 park, 181 } 182 } 183 184 /// Returns a handle to the timer. 185 /// 186 /// The `Handle` is how `Sleep` instances are created. The `Sleep` instances 187 /// can either be created directly or the `Handle` instance can be passed to 188 /// `with_default`, setting the timer as the default timer for the execution 189 /// context. handle(&self) -> Handle190 pub(crate) fn handle(&self) -> Handle { 191 self.handle.clone() 192 } 193 park_internal(&mut self, limit: Option<Duration>) -> Result<(), P::Error>194 fn park_internal(&mut self, limit: Option<Duration>) -> Result<(), P::Error> { 195 let clock = &self.time_source.clock; 196 197 let mut lock = self.handle.get().state.lock(); 198 199 assert!(!self.handle.is_shutdown()); 200 201 let next_wake = lock.wheel.next_expiration_time(); 202 lock.next_wake = 203 next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap())); 204 205 drop(lock); 206 207 match next_wake { 208 Some(when) => { 209 let now = self.time_source.now(); 210 // Note that we effectively round up to 1ms here - this avoids 211 // very short-duration microsecond-resolution sleeps that the OS 212 // might treat as zero-length. 213 let mut duration = self.time_source.tick_to_duration(when.saturating_sub(now)); 214 215 if duration > Duration::from_millis(0) { 216 if let Some(limit) = limit { 217 duration = std::cmp::min(limit, duration); 218 } 219 220 if clock.is_paused() { 221 self.park.park_timeout(Duration::from_secs(0))?; 222 223 // Simulate advancing time 224 clock.advance(duration); 225 } else { 226 self.park.park_timeout(duration)?; 227 } 228 } else { 229 self.park.park_timeout(Duration::from_secs(0))?; 230 } 231 } 232 None => { 233 if let Some(duration) = limit { 234 if clock.is_paused() { 235 self.park.park_timeout(Duration::from_secs(0))?; 236 clock.advance(duration); 237 } else { 238 self.park.park_timeout(duration)?; 239 } 240 } else { 241 self.park.park()?; 242 } 243 } 244 } 245 246 // Process pending timers after waking up 247 self.handle.process(); 248 249 Ok(()) 250 } 251 } 252 253 impl Handle { 254 /// Runs timer related logic, and returns the next wakeup time process(&self)255 pub(self) fn process(&self) { 256 let now = self.time_source().now(); 257 258 self.process_at_time(now) 259 } 260 process_at_time(&self, now: u64)261 pub(self) fn process_at_time(&self, now: u64) { 262 let mut waker_list: [Option<Waker>; 32] = Default::default(); 263 let mut waker_idx = 0; 264 265 let mut lock = self.get().lock(); 266 267 assert!(now >= lock.elapsed); 268 269 while let Some(entry) = lock.wheel.poll(now) { 270 debug_assert!(unsafe { entry.is_pending() }); 271 272 // SAFETY: We hold the driver lock, and just removed the entry from any linked lists. 273 if let Some(waker) = unsafe { entry.fire(Ok(())) } { 274 waker_list[waker_idx] = Some(waker); 275 276 waker_idx += 1; 277 278 if waker_idx == waker_list.len() { 279 // Wake a batch of wakers. To avoid deadlock, we must do this with the lock temporarily dropped. 280 drop(lock); 281 282 for waker in waker_list.iter_mut() { 283 waker.take().unwrap().wake(); 284 } 285 286 waker_idx = 0; 287 288 lock = self.get().lock(); 289 } 290 } 291 } 292 293 // Update the elapsed cache 294 lock.elapsed = lock.wheel.elapsed(); 295 lock.next_wake = lock 296 .wheel 297 .poll_at() 298 .map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap())); 299 300 drop(lock); 301 302 for waker in waker_list[0..waker_idx].iter_mut() { 303 waker.take().unwrap().wake(); 304 } 305 } 306 307 /// Removes a registered timer from the driver. 308 /// 309 /// The timer will be moved to the cancelled state. Wakers will _not_ be 310 /// invoked. If the timer is already completed, this function is a no-op. 311 /// 312 /// This function always acquires the driver lock, even if the entry does 313 /// not appear to be registered. 314 /// 315 /// SAFETY: The timer must not be registered with some other driver, and 316 /// `add_entry` must not be called concurrently. clear_entry(&self, entry: NonNull<TimerShared>)317 pub(self) unsafe fn clear_entry(&self, entry: NonNull<TimerShared>) { 318 unsafe { 319 let mut lock = self.get().lock(); 320 321 if entry.as_ref().might_be_registered() { 322 lock.wheel.remove(entry); 323 } 324 325 entry.as_ref().handle().fire(Ok(())); 326 } 327 } 328 329 /// Removes and re-adds an entry to the driver. 330 /// 331 /// SAFETY: The timer must be either unregistered, or registered with this 332 /// driver. No other threads are allowed to concurrently manipulate the 333 /// timer at all (the current thread should hold an exclusive reference to 334 /// the `TimerEntry`) reregister(&self, new_tick: u64, entry: NonNull<TimerShared>)335 pub(self) unsafe fn reregister(&self, new_tick: u64, entry: NonNull<TimerShared>) { 336 let waker = unsafe { 337 let mut lock = self.get().lock(); 338 339 // We may have raced with a firing/deregistration, so check before 340 // deregistering. 341 if unsafe { entry.as_ref().might_be_registered() } { 342 lock.wheel.remove(entry); 343 } 344 345 // Now that we have exclusive control of this entry, mint a handle to reinsert it. 346 let entry = entry.as_ref().handle(); 347 348 if self.is_shutdown() { 349 unsafe { entry.fire(Err(crate::time::error::Error::shutdown())) } 350 } else { 351 entry.set_expiration(new_tick); 352 353 // Note: We don't have to worry about racing with some other resetting 354 // thread, because add_entry and reregister require exclusive control of 355 // the timer entry. 356 match unsafe { lock.wheel.insert(entry) } { 357 Ok(when) => { 358 if lock 359 .next_wake 360 .map(|next_wake| when < next_wake.get()) 361 .unwrap_or(true) 362 { 363 lock.unpark.unpark(); 364 } 365 366 None 367 } 368 Err((entry, super::error::InsertError::Elapsed)) => unsafe { 369 entry.fire(Ok(())) 370 }, 371 } 372 } 373 374 // Must release lock before invoking waker to avoid the risk of deadlock. 375 }; 376 377 // The timer was fired synchronously as a result of the reregistration. 378 // Wake the waker; this is needed because we might reset _after_ a poll, 379 // and otherwise the task won't be awoken to poll again. 380 if let Some(waker) = waker { 381 waker.wake(); 382 } 383 } 384 } 385 386 impl<P> Park for Driver<P> 387 where 388 P: Park + 'static, 389 { 390 type Unpark = P::Unpark; 391 type Error = P::Error; 392 unpark(&self) -> Self::Unpark393 fn unpark(&self) -> Self::Unpark { 394 self.park.unpark() 395 } 396 park(&mut self) -> Result<(), Self::Error>397 fn park(&mut self) -> Result<(), Self::Error> { 398 self.park_internal(None) 399 } 400 park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error>401 fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { 402 self.park_internal(Some(duration)) 403 } 404 shutdown(&mut self)405 fn shutdown(&mut self) { 406 if self.handle.is_shutdown() { 407 return; 408 } 409 410 self.handle.get().is_shutdown.store(true, Ordering::SeqCst); 411 412 // Advance time forward to the end of time. 413 414 self.handle.process_at_time(u64::MAX); 415 416 self.park.shutdown(); 417 } 418 } 419 420 impl<P> Drop for Driver<P> 421 where 422 P: Park + 'static, 423 { drop(&mut self)424 fn drop(&mut self) { 425 self.shutdown(); 426 } 427 } 428 429 // ===== impl Inner ===== 430 431 impl Inner { new(time_source: ClockTime, unpark: Box<dyn Unpark>) -> Self432 pub(self) fn new(time_source: ClockTime, unpark: Box<dyn Unpark>) -> Self { 433 Inner { 434 state: Mutex::new(InnerState { 435 time_source, 436 elapsed: 0, 437 next_wake: None, 438 unpark, 439 wheel: wheel::Wheel::new(), 440 }), 441 is_shutdown: AtomicBool::new(false), 442 } 443 } 444 445 /// Locks the driver's inner structure lock(&self) -> crate::loom::sync::MutexGuard<'_, InnerState>446 pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, InnerState> { 447 self.state.lock() 448 } 449 450 // Check whether the driver has been shutdown is_shutdown(&self) -> bool451 pub(super) fn is_shutdown(&self) -> bool { 452 self.is_shutdown.load(Ordering::SeqCst) 453 } 454 } 455 456 impl fmt::Debug for Inner { fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result457 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { 458 fmt.debug_struct("Inner").finish() 459 } 460 } 461 462 #[cfg(test)] 463 mod tests; 464