1 //! A queue of delayed elements. 2 //! 3 //! See [`DelayQueue`] for more details. 4 //! 5 //! [`DelayQueue`]: struct@DelayQueue 6 7 use crate::time::wheel::{self, Wheel}; 8 9 use tokio::time::{sleep_until, Duration, Instant, Sleep}; 10 11 use core::ops::{Index, IndexMut}; 12 use slab::Slab; 13 use std::cmp; 14 use std::collections::HashMap; 15 use std::convert::From; 16 use std::fmt; 17 use std::fmt::Debug; 18 use std::future::Future; 19 use std::marker::PhantomData; 20 use std::pin::Pin; 21 use std::task::{self, ready, Poll, Waker}; 22 23 /// A queue of delayed elements. 24 /// 25 /// Once an element is inserted into the `DelayQueue`, it is yielded once the 26 /// specified deadline has been reached. 27 /// 28 /// # Usage 29 /// 30 /// Elements are inserted into `DelayQueue` using the [`insert`] or 31 /// [`insert_at`] methods. A deadline is provided with the item and a [`Key`] is 32 /// returned. The key is used to remove the entry or to change the deadline at 33 /// which it should be yielded back. 34 /// 35 /// Once delays have been configured, the `DelayQueue` is used via its 36 /// [`Stream`] implementation. [`poll_expired`] is called. If an entry has reached its 37 /// deadline, it is returned. If not, `Poll::Pending` is returned indicating that the 38 /// current task will be notified once the deadline has been reached. 39 /// 40 /// # `Stream` implementation 41 /// 42 /// Items are retrieved from the queue via [`DelayQueue::poll_expired`]. If no delays have 43 /// expired, no items are returned. In this case, `Poll::Pending` is returned and the 44 /// current task is registered to be notified once the next item's delay has 45 /// expired. 46 /// 47 /// If no items are in the queue, i.e. `is_empty()` returns `true`, then `poll` 48 /// returns `Poll::Ready(None)`. This indicates that the stream has reached an end. 49 /// However, if a new item is inserted *after*, `poll` will once again start 50 /// returning items or `Poll::Pending`. 51 /// 52 /// Items are returned ordered by their expirations. Items that are configured 53 /// to expire first will be returned first. There are no ordering guarantees 54 /// for items configured to expire at the same instant. Also note that delays are 55 /// rounded to the closest millisecond. 56 /// 57 /// # Implementation 58 /// 59 /// The [`DelayQueue`] is backed by a separate instance of a timer wheel similar to that used internally 60 /// by Tokio's standalone timer utilities such as [`sleep`]. Because of this, it offers the same 61 /// performance and scalability benefits. 62 /// 63 /// State associated with each entry is stored in a [`slab`]. This amortizes the cost of allocation, 64 /// and allows reuse of the memory allocated for expired entries. 65 /// 66 /// Capacity can be checked using [`capacity`] and allocated preemptively by using 67 /// the [`reserve`] method. 68 /// 69 /// # Usage 70 /// 71 /// Using `DelayQueue` to manage cache entries. 72 /// 73 /// ```rust,no_run 74 /// use tokio_util::time::{DelayQueue, delay_queue}; 75 /// 76 /// use std::collections::HashMap; 77 /// use std::task::{ready, Context, Poll}; 78 /// use std::time::Duration; 79 /// # type CacheKey = String; 80 /// # type Value = String; 81 /// 82 /// struct Cache { 83 /// entries: HashMap<CacheKey, (Value, delay_queue::Key)>, 84 /// expirations: DelayQueue<CacheKey>, 85 /// } 86 /// 87 /// const TTL_SECS: u64 = 30; 88 /// 89 /// impl Cache { 90 /// fn insert(&mut self, key: CacheKey, value: Value) { 91 /// let delay = self.expirations 92 /// .insert(key.clone(), Duration::from_secs(TTL_SECS)); 93 /// 94 /// self.entries.insert(key, (value, delay)); 95 /// } 96 /// 97 /// fn get(&self, key: &CacheKey) -> Option<&Value> { 98 /// self.entries.get(key) 99 /// .map(|&(ref v, _)| v) 100 /// } 101 /// 102 /// fn remove(&mut self, key: &CacheKey) { 103 /// if let Some((_, cache_key)) = self.entries.remove(key) { 104 /// self.expirations.remove(&cache_key); 105 /// } 106 /// } 107 /// 108 /// fn poll_purge(&mut self, cx: &mut Context<'_>) -> Poll<()> { 109 /// while let Some(entry) = ready!(self.expirations.poll_expired(cx)) { 110 /// self.entries.remove(entry.get_ref()); 111 /// } 112 /// 113 /// Poll::Ready(()) 114 /// } 115 /// } 116 /// ``` 117 /// 118 /// [`insert`]: method@Self::insert 119 /// [`insert_at`]: method@Self::insert_at 120 /// [`Key`]: struct@Key 121 /// [`Stream`]: https://docs.rs/futures/0.1/futures/stream/trait.Stream.html 122 /// [`poll_expired`]: method@Self::poll_expired 123 /// [`Stream::poll_expired`]: method@Self::poll_expired 124 /// [`DelayQueue`]: struct@DelayQueue 125 /// [`sleep`]: fn@tokio::time::sleep 126 /// [`slab`]: slab 127 /// [`capacity`]: method@Self::capacity 128 /// [`reserve`]: method@Self::reserve 129 #[derive(Debug)] 130 pub struct DelayQueue<T> { 131 /// Stores data associated with entries 132 slab: SlabStorage<T>, 133 134 /// Lookup structure tracking all delays in the queue 135 wheel: Wheel<Stack<T>>, 136 137 /// Delays that were inserted when already expired. These cannot be stored 138 /// in the wheel 139 expired: Stack<T>, 140 141 /// Delay expiring when the *first* item in the queue expires 142 delay: Option<Pin<Box<Sleep>>>, 143 144 /// Wheel polling state 145 wheel_now: u64, 146 147 /// Instant at which the timer starts 148 start: Instant, 149 150 /// Waker that is invoked when we potentially need to reset the timer. 151 /// Because we lazily create the timer when the first entry is created, we 152 /// need to awaken any poller that polled us before that point. 153 waker: Option<Waker>, 154 } 155 156 #[derive(Default)] 157 struct SlabStorage<T> { 158 inner: Slab<Data<T>>, 159 160 // A `compact` call requires a re-mapping of the `Key`s that were changed 161 // during the `compact` call of the `slab`. Since the keys that were given out 162 // cannot be changed retroactively we need to keep track of these re-mappings. 163 // The keys of `key_map` correspond to the old keys that were given out and 164 // the values to the `Key`s that were re-mapped by the `compact` call. 165 key_map: HashMap<Key, KeyInternal>, 166 167 // Index used to create new keys to hand out. 168 next_key_index: usize, 169 170 // Whether `compact` has been called, necessary in order to decide whether 171 // to include keys in `key_map`. 172 compact_called: bool, 173 } 174 175 impl<T> SlabStorage<T> { with_capacity(capacity: usize) -> SlabStorage<T>176 pub(crate) fn with_capacity(capacity: usize) -> SlabStorage<T> { 177 SlabStorage { 178 inner: Slab::with_capacity(capacity), 179 key_map: HashMap::new(), 180 next_key_index: 0, 181 compact_called: false, 182 } 183 } 184 185 // Inserts data into the inner slab and re-maps keys if necessary insert(&mut self, val: Data<T>) -> Key186 pub(crate) fn insert(&mut self, val: Data<T>) -> Key { 187 let mut key = KeyInternal::new(self.inner.insert(val)); 188 let key_contained = self.key_map.contains_key(&key.into()); 189 190 if key_contained { 191 // It's possible that a `compact` call creates capacity in `self.inner` in 192 // such a way that a `self.inner.insert` call creates a `key` which was 193 // previously given out during an `insert` call prior to the `compact` call. 194 // If `key` is contained in `self.key_map`, we have encountered this exact situation, 195 // We need to create a new key `key_to_give_out` and include the relation 196 // `key_to_give_out` -> `key` in `self.key_map`. 197 let key_to_give_out = self.create_new_key(); 198 assert!(!self.key_map.contains_key(&key_to_give_out.into())); 199 self.key_map.insert(key_to_give_out.into(), key); 200 key = key_to_give_out; 201 } else if self.compact_called { 202 // Include an identity mapping in `self.key_map` in order to allow us to 203 // panic if a key that was handed out is removed more than once. 204 self.key_map.insert(key.into(), key); 205 } 206 207 key.into() 208 } 209 210 // Re-map the key in case compact was previously called. 211 // Note: Since we include identity mappings in key_map after compact was called, 212 // we have information about all keys that were handed out. In the case in which 213 // compact was called and we try to remove a Key that was previously removed 214 // we can detect invalid keys if no key is found in `key_map`. This is necessary 215 // in order to prevent situations in which a previously removed key 216 // corresponds to a re-mapped key internally and which would then be incorrectly 217 // removed from the slab. 218 // 219 // Example to illuminate this problem: 220 // 221 // Let's assume our `key_map` is {1 -> 2, 2 -> 1} and we call remove(1). If we 222 // were to remove 1 again, we would not find it inside `key_map` anymore. 223 // If we were to imply from this that no re-mapping was necessary, we would 224 // incorrectly remove 1 from `self.slab.inner`, which corresponds to the 225 // handed-out key 2. remove(&mut self, key: &Key) -> Data<T>226 pub(crate) fn remove(&mut self, key: &Key) -> Data<T> { 227 let remapped_key = if self.compact_called { 228 match self.key_map.remove(key) { 229 Some(key_internal) => key_internal, 230 None => panic!("invalid key"), 231 } 232 } else { 233 (*key).into() 234 }; 235 236 self.inner.remove(remapped_key.index) 237 } 238 shrink_to_fit(&mut self)239 pub(crate) fn shrink_to_fit(&mut self) { 240 self.inner.shrink_to_fit(); 241 self.key_map.shrink_to_fit(); 242 } 243 compact(&mut self)244 pub(crate) fn compact(&mut self) { 245 if !self.compact_called { 246 for (key, _) in self.inner.iter() { 247 self.key_map.insert(Key::new(key), KeyInternal::new(key)); 248 } 249 } 250 251 let mut remapping = HashMap::new(); 252 self.inner.compact(|_, from, to| { 253 remapping.insert(from, to); 254 true 255 }); 256 257 // At this point `key_map` contains a mapping for every element. 258 for internal_key in self.key_map.values_mut() { 259 if let Some(new_internal_key) = remapping.get(&internal_key.index) { 260 *internal_key = KeyInternal::new(*new_internal_key); 261 } 262 } 263 264 if self.key_map.capacity() > 2 * self.key_map.len() { 265 self.key_map.shrink_to_fit(); 266 } 267 268 self.compact_called = true; 269 } 270 271 // Tries to re-map a `Key` that was given out to the user to its 272 // corresponding internal key. remap_key(&self, key: &Key) -> Option<KeyInternal>273 fn remap_key(&self, key: &Key) -> Option<KeyInternal> { 274 let key_map = &self.key_map; 275 if self.compact_called { 276 key_map.get(key).copied() 277 } else { 278 Some((*key).into()) 279 } 280 } 281 create_new_key(&mut self) -> KeyInternal282 fn create_new_key(&mut self) -> KeyInternal { 283 while self.key_map.contains_key(&Key::new(self.next_key_index)) { 284 self.next_key_index = self.next_key_index.wrapping_add(1); 285 } 286 287 KeyInternal::new(self.next_key_index) 288 } 289 len(&self) -> usize290 pub(crate) fn len(&self) -> usize { 291 self.inner.len() 292 } 293 capacity(&self) -> usize294 pub(crate) fn capacity(&self) -> usize { 295 self.inner.capacity() 296 } 297 clear(&mut self)298 pub(crate) fn clear(&mut self) { 299 self.inner.clear(); 300 self.key_map.clear(); 301 self.compact_called = false; 302 } 303 reserve(&mut self, additional: usize)304 pub(crate) fn reserve(&mut self, additional: usize) { 305 self.inner.reserve(additional); 306 307 if self.compact_called { 308 self.key_map.reserve(additional); 309 } 310 } 311 is_empty(&self) -> bool312 pub(crate) fn is_empty(&self) -> bool { 313 self.inner.is_empty() 314 } 315 contains(&self, key: &Key) -> bool316 pub(crate) fn contains(&self, key: &Key) -> bool { 317 let remapped_key = self.remap_key(key); 318 319 match remapped_key { 320 Some(internal_key) => self.inner.contains(internal_key.index), 321 None => false, 322 } 323 } 324 } 325 326 impl<T> fmt::Debug for SlabStorage<T> 327 where 328 T: fmt::Debug, 329 { fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result330 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { 331 if fmt.alternate() { 332 fmt.debug_map().entries(self.inner.iter()).finish() 333 } else { 334 fmt.debug_struct("Slab") 335 .field("len", &self.len()) 336 .field("cap", &self.capacity()) 337 .finish() 338 } 339 } 340 } 341 342 impl<T> Index<Key> for SlabStorage<T> { 343 type Output = Data<T>; 344 index(&self, key: Key) -> &Self::Output345 fn index(&self, key: Key) -> &Self::Output { 346 let remapped_key = self.remap_key(&key); 347 348 match remapped_key { 349 Some(internal_key) => &self.inner[internal_key.index], 350 None => panic!("Invalid index {}", key.index), 351 } 352 } 353 } 354 355 impl<T> IndexMut<Key> for SlabStorage<T> { index_mut(&mut self, key: Key) -> &mut Data<T>356 fn index_mut(&mut self, key: Key) -> &mut Data<T> { 357 let remapped_key = self.remap_key(&key); 358 359 match remapped_key { 360 Some(internal_key) => &mut self.inner[internal_key.index], 361 None => panic!("Invalid index {}", key.index), 362 } 363 } 364 } 365 366 /// An entry in `DelayQueue` that has expired and been removed. 367 /// 368 /// Values are returned by [`DelayQueue::poll_expired`]. 369 /// 370 /// [`DelayQueue::poll_expired`]: method@DelayQueue::poll_expired 371 #[derive(Debug)] 372 pub struct Expired<T> { 373 /// The data stored in the queue 374 data: T, 375 376 /// The expiration time 377 deadline: Instant, 378 379 /// The key associated with the entry 380 key: Key, 381 } 382 383 /// Token to a value stored in a `DelayQueue`. 384 /// 385 /// Instances of `Key` are returned by [`DelayQueue::insert`]. See [`DelayQueue`] 386 /// documentation for more details. 387 /// 388 /// [`DelayQueue`]: struct@DelayQueue 389 /// [`DelayQueue::insert`]: method@DelayQueue::insert 390 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] 391 pub struct Key { 392 index: usize, 393 } 394 395 // Whereas `Key` is given out to users that use `DelayQueue`, internally we use 396 // `KeyInternal` as the key type in order to make the logic of mapping between keys 397 // as a result of `compact` calls clearer. 398 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] 399 struct KeyInternal { 400 index: usize, 401 } 402 403 #[derive(Debug)] 404 struct Stack<T> { 405 /// Head of the stack 406 head: Option<Key>, 407 _p: PhantomData<fn() -> T>, 408 } 409 410 #[derive(Debug)] 411 struct Data<T> { 412 /// The data being stored in the queue and will be returned at the requested 413 /// instant. 414 inner: T, 415 416 /// The instant at which the item is returned. 417 when: u64, 418 419 /// Set to true when stored in the `expired` queue 420 expired: bool, 421 422 /// Next entry in the stack 423 next: Option<Key>, 424 425 /// Previous entry in the stack 426 prev: Option<Key>, 427 } 428 429 /// Maximum number of entries the queue can handle 430 const MAX_ENTRIES: usize = (1 << 30) - 1; 431 432 impl<T> DelayQueue<T> { 433 /// Creates a new, empty, `DelayQueue`. 434 /// 435 /// The queue will not allocate storage until items are inserted into it. 436 /// 437 /// # Examples 438 /// 439 /// ```rust 440 /// # use tokio_util::time::DelayQueue; 441 /// let delay_queue: DelayQueue<u32> = DelayQueue::new(); 442 /// ``` new() -> DelayQueue<T>443 pub fn new() -> DelayQueue<T> { 444 DelayQueue::with_capacity(0) 445 } 446 447 /// Creates a new, empty, `DelayQueue` with the specified capacity. 448 /// 449 /// The queue will be able to hold at least `capacity` elements without 450 /// reallocating. If `capacity` is 0, the queue will not allocate for 451 /// storage. 452 /// 453 /// # Examples 454 /// 455 /// ```rust 456 /// # use tokio_util::time::DelayQueue; 457 /// # use std::time::Duration; 458 /// 459 /// # #[tokio::main] 460 /// # async fn main() { 461 /// let mut delay_queue = DelayQueue::with_capacity(10); 462 /// 463 /// // These insertions are done without further allocation 464 /// for i in 0..10 { 465 /// delay_queue.insert(i, Duration::from_secs(i)); 466 /// } 467 /// 468 /// // This will make the queue allocate additional storage 469 /// delay_queue.insert(11, Duration::from_secs(11)); 470 /// # } 471 /// ``` with_capacity(capacity: usize) -> DelayQueue<T>472 pub fn with_capacity(capacity: usize) -> DelayQueue<T> { 473 DelayQueue { 474 wheel: Wheel::new(), 475 slab: SlabStorage::with_capacity(capacity), 476 expired: Stack::default(), 477 delay: None, 478 wheel_now: 0, 479 start: Instant::now(), 480 waker: None, 481 } 482 } 483 484 /// Inserts `value` into the queue set to expire at a specific instant in 485 /// time. 486 /// 487 /// This function is identical to `insert`, but takes an `Instant` instead 488 /// of a `Duration`. 489 /// 490 /// `value` is stored in the queue until `when` is reached. At which point, 491 /// `value` will be returned from [`poll_expired`]. If `when` has already been 492 /// reached, then `value` is immediately made available to poll. 493 /// 494 /// The return value represents the insertion and is used as an argument to 495 /// [`remove`] and [`reset`]. Note that [`Key`] is a token and is reused once 496 /// `value` is removed from the queue either by calling [`poll_expired`] after 497 /// `when` is reached or by calling [`remove`]. At this point, the caller 498 /// must take care to not use the returned [`Key`] again as it may reference 499 /// a different item in the queue. 500 /// 501 /// See [type] level documentation for more details. 502 /// 503 /// # Panics 504 /// 505 /// This function panics if `when` is too far in the future. 506 /// 507 /// # Examples 508 /// 509 /// Basic usage 510 /// 511 /// ```rust 512 /// use tokio::time::{Duration, Instant}; 513 /// use tokio_util::time::DelayQueue; 514 /// 515 /// # #[tokio::main] 516 /// # async fn main() { 517 /// let mut delay_queue = DelayQueue::new(); 518 /// let key = delay_queue.insert_at( 519 /// "foo", Instant::now() + Duration::from_secs(5)); 520 /// 521 /// // Remove the entry 522 /// let item = delay_queue.remove(&key); 523 /// assert_eq!(*item.get_ref(), "foo"); 524 /// # } 525 /// ``` 526 /// 527 /// [`poll_expired`]: method@Self::poll_expired 528 /// [`remove`]: method@Self::remove 529 /// [`reset`]: method@Self::reset 530 /// [`Key`]: struct@Key 531 /// [type]: # 532 #[track_caller] insert_at(&mut self, value: T, when: Instant) -> Key533 pub fn insert_at(&mut self, value: T, when: Instant) -> Key { 534 assert!(self.slab.len() < MAX_ENTRIES, "max entries exceeded"); 535 536 // Normalize the deadline. Values cannot be set to expire in the past. 537 let when = self.normalize_deadline(when); 538 539 // Insert the value in the store 540 let key = self.slab.insert(Data { 541 inner: value, 542 when, 543 expired: false, 544 next: None, 545 prev: None, 546 }); 547 548 self.insert_idx(when, key); 549 550 // Set a new delay if the current's deadline is later than the one of the new item 551 let should_set_delay = if let Some(ref delay) = self.delay { 552 let current_exp = self.normalize_deadline(delay.deadline()); 553 current_exp > when 554 } else { 555 true 556 }; 557 558 if should_set_delay { 559 if let Some(waker) = self.waker.take() { 560 waker.wake(); 561 } 562 563 let delay_time = self.start + Duration::from_millis(when); 564 if let Some(ref mut delay) = &mut self.delay { 565 delay.as_mut().reset(delay_time); 566 } else { 567 self.delay = Some(Box::pin(sleep_until(delay_time))); 568 } 569 } 570 571 key 572 } 573 574 /// Attempts to pull out the next value of the delay queue, registering the 575 /// current task for wakeup if the value is not yet available, and returning 576 /// `None` if the queue is exhausted. poll_expired(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Expired<T>>>577 pub fn poll_expired(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Expired<T>>> { 578 if !self 579 .waker 580 .as_ref() 581 .map(|w| w.will_wake(cx.waker())) 582 .unwrap_or(false) 583 { 584 self.waker = Some(cx.waker().clone()); 585 } 586 587 let item = ready!(self.poll_idx(cx)); 588 Poll::Ready(item.map(|key| { 589 let data = self.slab.remove(&key); 590 debug_assert!(data.next.is_none()); 591 debug_assert!(data.prev.is_none()); 592 593 Expired { 594 key, 595 data: data.inner, 596 deadline: self.start + Duration::from_millis(data.when), 597 } 598 })) 599 } 600 601 /// Inserts `value` into the queue set to expire after the requested duration 602 /// elapses. 603 /// 604 /// This function is identical to `insert_at`, but takes a `Duration` 605 /// instead of an `Instant`. 606 /// 607 /// `value` is stored in the queue until `timeout` duration has 608 /// elapsed after `insert` was called. At that point, `value` will 609 /// be returned from [`poll_expired`]. If `timeout` is a `Duration` of 610 /// zero, then `value` is immediately made available to poll. 611 /// 612 /// The return value represents the insertion and is used as an 613 /// argument to [`remove`] and [`reset`]. Note that [`Key`] is a 614 /// token and is reused once `value` is removed from the queue 615 /// either by calling [`poll_expired`] after `timeout` has elapsed 616 /// or by calling [`remove`]. At this point, the caller must not 617 /// use the returned [`Key`] again as it may reference a different 618 /// item in the queue. 619 /// 620 /// See [type] level documentation for more details. 621 /// 622 /// # Panics 623 /// 624 /// This function panics if `timeout` is greater than the maximum 625 /// duration supported by the timer in the current `Runtime`. 626 /// 627 /// # Examples 628 /// 629 /// Basic usage 630 /// 631 /// ```rust 632 /// use tokio_util::time::DelayQueue; 633 /// use std::time::Duration; 634 /// 635 /// # #[tokio::main] 636 /// # async fn main() { 637 /// let mut delay_queue = DelayQueue::new(); 638 /// let key = delay_queue.insert("foo", Duration::from_secs(5)); 639 /// 640 /// // Remove the entry 641 /// let item = delay_queue.remove(&key); 642 /// assert_eq!(*item.get_ref(), "foo"); 643 /// # } 644 /// ``` 645 /// 646 /// [`poll_expired`]: method@Self::poll_expired 647 /// [`remove`]: method@Self::remove 648 /// [`reset`]: method@Self::reset 649 /// [`Key`]: struct@Key 650 /// [type]: # 651 #[track_caller] insert(&mut self, value: T, timeout: Duration) -> Key652 pub fn insert(&mut self, value: T, timeout: Duration) -> Key { 653 self.insert_at(value, Instant::now() + timeout) 654 } 655 656 #[track_caller] insert_idx(&mut self, when: u64, key: Key)657 fn insert_idx(&mut self, when: u64, key: Key) { 658 use self::wheel::{InsertError, Stack}; 659 660 // Register the deadline with the timer wheel 661 match self.wheel.insert(when, key, &mut self.slab) { 662 Ok(_) => {} 663 Err((_, InsertError::Elapsed)) => { 664 self.slab[key].expired = true; 665 // The delay is already expired, store it in the expired queue 666 self.expired.push(key, &mut self.slab); 667 } 668 Err((_, err)) => panic!("invalid deadline; err={err:?}"), 669 } 670 } 671 672 /// Returns the deadline of the item associated with `key`. 673 /// 674 /// Since the queue operates at millisecond granularity, the returned 675 /// deadline may not exactly match the value that was given when initially 676 /// inserting the item into the queue. 677 /// 678 /// # Panics 679 /// 680 /// This function panics if `key` is not contained by the queue. 681 /// 682 /// # Examples 683 /// 684 /// Basic usage 685 /// 686 /// ```rust 687 /// use tokio_util::time::DelayQueue; 688 /// use std::time::Duration; 689 /// 690 /// # #[tokio::main] 691 /// # async fn main() { 692 /// let mut delay_queue = DelayQueue::new(); 693 /// 694 /// let key1 = delay_queue.insert("foo", Duration::from_secs(5)); 695 /// let key2 = delay_queue.insert("bar", Duration::from_secs(10)); 696 /// 697 /// assert!(delay_queue.deadline(&key1) < delay_queue.deadline(&key2)); 698 /// # } 699 /// ``` 700 #[track_caller] deadline(&self, key: &Key) -> Instant701 pub fn deadline(&self, key: &Key) -> Instant { 702 self.start + Duration::from_millis(self.slab[*key].when) 703 } 704 705 /// Removes the key from the expired queue or the timer wheel 706 /// depending on its expiration status. 707 /// 708 /// # Panics 709 /// 710 /// Panics if the key is not contained in the expired queue or the wheel. 711 #[track_caller] remove_key(&mut self, key: &Key)712 fn remove_key(&mut self, key: &Key) { 713 use crate::time::wheel::Stack; 714 715 // Special case the `expired` queue 716 if self.slab[*key].expired { 717 self.expired.remove(key, &mut self.slab); 718 } else { 719 self.wheel.remove(key, &mut self.slab); 720 } 721 } 722 723 /// Removes the item associated with `key` from the queue. 724 /// 725 /// There must be an item associated with `key`. The function returns the 726 /// removed item as well as the `Instant` at which it will the delay will 727 /// have expired. 728 /// 729 /// # Panics 730 /// 731 /// The function panics if `key` is not contained by the queue. 732 /// 733 /// # Examples 734 /// 735 /// Basic usage 736 /// 737 /// ```rust 738 /// use tokio_util::time::DelayQueue; 739 /// use std::time::Duration; 740 /// 741 /// # #[tokio::main] 742 /// # async fn main() { 743 /// let mut delay_queue = DelayQueue::new(); 744 /// let key = delay_queue.insert("foo", Duration::from_secs(5)); 745 /// 746 /// // Remove the entry 747 /// let item = delay_queue.remove(&key); 748 /// assert_eq!(*item.get_ref(), "foo"); 749 /// # } 750 /// ``` 751 #[track_caller] remove(&mut self, key: &Key) -> Expired<T>752 pub fn remove(&mut self, key: &Key) -> Expired<T> { 753 let prev_deadline = self.next_deadline(); 754 755 self.remove_key(key); 756 let data = self.slab.remove(key); 757 758 let next_deadline = self.next_deadline(); 759 if prev_deadline != next_deadline { 760 match (next_deadline, &mut self.delay) { 761 (None, _) => self.delay = None, 762 (Some(deadline), Some(delay)) => delay.as_mut().reset(deadline), 763 (Some(deadline), None) => self.delay = Some(Box::pin(sleep_until(deadline))), 764 } 765 } 766 767 if self.slab.is_empty() { 768 if let Some(waker) = self.waker.take() { 769 waker.wake(); 770 } 771 } 772 773 Expired { 774 key: Key::new(key.index), 775 data: data.inner, 776 deadline: self.start + Duration::from_millis(data.when), 777 } 778 } 779 780 /// Attempts to remove the item associated with `key` from the queue. 781 /// 782 /// Removes the item associated with `key`, and returns it along with the 783 /// `Instant` at which it would have expired, if it exists. 784 /// 785 /// Returns `None` if `key` is not in the queue. 786 /// 787 /// # Examples 788 /// 789 /// Basic usage 790 /// 791 /// ```rust 792 /// use tokio_util::time::DelayQueue; 793 /// use std::time::Duration; 794 /// 795 /// # #[tokio::main(flavor = "current_thread")] 796 /// # async fn main() { 797 /// let mut delay_queue = DelayQueue::new(); 798 /// let key = delay_queue.insert("foo", Duration::from_secs(5)); 799 /// 800 /// // The item is in the queue, `try_remove` returns `Some(Expired("foo"))`. 801 /// let item = delay_queue.try_remove(&key); 802 /// assert_eq!(item.unwrap().into_inner(), "foo"); 803 /// 804 /// // The item is not in the queue anymore, `try_remove` returns `None`. 805 /// let item = delay_queue.try_remove(&key); 806 /// assert!(item.is_none()); 807 /// # } 808 /// ``` try_remove(&mut self, key: &Key) -> Option<Expired<T>>809 pub fn try_remove(&mut self, key: &Key) -> Option<Expired<T>> { 810 if self.slab.contains(key) { 811 Some(self.remove(key)) 812 } else { 813 None 814 } 815 } 816 817 /// Sets the delay of the item associated with `key` to expire at `when`. 818 /// 819 /// This function is identical to `reset` but takes an `Instant` instead of 820 /// a `Duration`. 821 /// 822 /// The item remains in the queue but the delay is set to expire at `when`. 823 /// If `when` is in the past, then the item is immediately made available to 824 /// the caller. 825 /// 826 /// # Panics 827 /// 828 /// This function panics if `when` is too far in the future or if `key` is 829 /// not contained by the queue. 830 /// 831 /// # Examples 832 /// 833 /// Basic usage 834 /// 835 /// ```rust 836 /// use tokio::time::{Duration, Instant}; 837 /// use tokio_util::time::DelayQueue; 838 /// 839 /// # #[tokio::main] 840 /// # async fn main() { 841 /// let mut delay_queue = DelayQueue::new(); 842 /// let key = delay_queue.insert("foo", Duration::from_secs(5)); 843 /// 844 /// // "foo" is scheduled to be returned in 5 seconds 845 /// 846 /// delay_queue.reset_at(&key, Instant::now() + Duration::from_secs(10)); 847 /// 848 /// // "foo" is now scheduled to be returned in 10 seconds 849 /// # } 850 /// ``` 851 #[track_caller] reset_at(&mut self, key: &Key, when: Instant)852 pub fn reset_at(&mut self, key: &Key, when: Instant) { 853 self.remove_key(key); 854 855 // Normalize the deadline. Values cannot be set to expire in the past. 856 let when = self.normalize_deadline(when); 857 858 self.slab[*key].when = when; 859 self.slab[*key].expired = false; 860 861 self.insert_idx(when, *key); 862 863 let next_deadline = self.next_deadline(); 864 if let (Some(ref mut delay), Some(deadline)) = (&mut self.delay, next_deadline) { 865 // This should awaken us if necessary (ie, if already expired) 866 delay.as_mut().reset(deadline); 867 } 868 } 869 870 /// Shrink the capacity of the slab, which `DelayQueue` uses internally for storage allocation. 871 /// This function is not guaranteed to, and in most cases, won't decrease the capacity of the slab 872 /// to the number of elements still contained in it, because elements cannot be moved to a different 873 /// index. To decrease the capacity to the size of the slab use [`compact`]. 874 /// 875 /// This function can take O(n) time even when the capacity cannot be reduced or the allocation is 876 /// shrunk in place. Repeated calls run in O(1) though. 877 /// 878 /// [`compact`]: method@Self::compact shrink_to_fit(&mut self)879 pub fn shrink_to_fit(&mut self) { 880 self.slab.shrink_to_fit(); 881 } 882 883 /// Shrink the capacity of the slab, which `DelayQueue` uses internally for storage allocation, 884 /// to the number of elements that are contained in it. 885 /// 886 /// This methods runs in O(n). 887 /// 888 /// # Examples 889 /// 890 /// Basic usage 891 /// 892 /// ```rust 893 /// use tokio_util::time::DelayQueue; 894 /// use std::time::Duration; 895 /// 896 /// # #[tokio::main] 897 /// # async fn main() { 898 /// let mut delay_queue = DelayQueue::with_capacity(10); 899 /// 900 /// let key1 = delay_queue.insert(5, Duration::from_secs(5)); 901 /// let key2 = delay_queue.insert(10, Duration::from_secs(10)); 902 /// let key3 = delay_queue.insert(15, Duration::from_secs(15)); 903 /// 904 /// delay_queue.remove(&key2); 905 /// 906 /// delay_queue.compact(); 907 /// assert_eq!(delay_queue.capacity(), 2); 908 /// # } 909 /// ``` compact(&mut self)910 pub fn compact(&mut self) { 911 self.slab.compact(); 912 } 913 914 /// Gets the [`Key`] that [`poll_expired`] will pull out of the queue next, without 915 /// pulling it out or waiting for the deadline to expire. 916 /// 917 /// Entries that have already expired may be returned in any order, but it is 918 /// guaranteed that this method returns them in the same order as when items 919 /// are popped from the `DelayQueue`. 920 /// 921 /// # Examples 922 /// 923 /// Basic usage 924 /// 925 /// ```rust 926 /// use tokio_util::time::DelayQueue; 927 /// use std::time::Duration; 928 /// 929 /// # #[tokio::main] 930 /// # async fn main() { 931 /// let mut delay_queue = DelayQueue::new(); 932 /// 933 /// let key1 = delay_queue.insert("foo", Duration::from_secs(10)); 934 /// let key2 = delay_queue.insert("bar", Duration::from_secs(5)); 935 /// let key3 = delay_queue.insert("baz", Duration::from_secs(15)); 936 /// 937 /// assert_eq!(delay_queue.peek().unwrap(), key2); 938 /// # } 939 /// ``` 940 /// 941 /// [`Key`]: struct@Key 942 /// [`poll_expired`]: method@Self::poll_expired peek(&self) -> Option<Key>943 pub fn peek(&self) -> Option<Key> { 944 use self::wheel::Stack; 945 946 self.expired.peek().or_else(|| self.wheel.peek()) 947 } 948 949 /// Returns the next time to poll as determined by the wheel. 950 /// 951 /// Note that this does not include deadlines in the `expired` queue. next_deadline(&self) -> Option<Instant>952 fn next_deadline(&self) -> Option<Instant> { 953 self.wheel 954 .poll_at() 955 .map(|poll_at| self.start + Duration::from_millis(poll_at)) 956 } 957 958 /// Sets the delay of the item associated with `key` to expire after 959 /// `timeout`. 960 /// 961 /// This function is identical to `reset_at` but takes a `Duration` instead 962 /// of an `Instant`. 963 /// 964 /// The item remains in the queue but the delay is set to expire after 965 /// `timeout`. If `timeout` is zero, then the item is immediately made 966 /// available to the caller. 967 /// 968 /// # Panics 969 /// 970 /// This function panics if `timeout` is greater than the maximum supported 971 /// duration or if `key` is not contained by the queue. 972 /// 973 /// # Examples 974 /// 975 /// Basic usage 976 /// 977 /// ```rust 978 /// use tokio_util::time::DelayQueue; 979 /// use std::time::Duration; 980 /// 981 /// # #[tokio::main] 982 /// # async fn main() { 983 /// let mut delay_queue = DelayQueue::new(); 984 /// let key = delay_queue.insert("foo", Duration::from_secs(5)); 985 /// 986 /// // "foo" is scheduled to be returned in 5 seconds 987 /// 988 /// delay_queue.reset(&key, Duration::from_secs(10)); 989 /// 990 /// // "foo"is now scheduled to be returned in 10 seconds 991 /// # } 992 /// ``` 993 #[track_caller] reset(&mut self, key: &Key, timeout: Duration)994 pub fn reset(&mut self, key: &Key, timeout: Duration) { 995 self.reset_at(key, Instant::now() + timeout); 996 } 997 998 /// Clears the queue, removing all items. 999 /// 1000 /// After calling `clear`, [`poll_expired`] will return `Ok(Ready(None))`. 1001 /// 1002 /// Note that this method has no effect on the allocated capacity. 1003 /// 1004 /// [`poll_expired`]: method@Self::poll_expired 1005 /// 1006 /// # Examples 1007 /// 1008 /// ```rust 1009 /// use tokio_util::time::DelayQueue; 1010 /// use std::time::Duration; 1011 /// 1012 /// # #[tokio::main] 1013 /// # async fn main() { 1014 /// let mut delay_queue = DelayQueue::new(); 1015 /// 1016 /// delay_queue.insert("foo", Duration::from_secs(5)); 1017 /// 1018 /// assert!(!delay_queue.is_empty()); 1019 /// 1020 /// delay_queue.clear(); 1021 /// 1022 /// assert!(delay_queue.is_empty()); 1023 /// # } 1024 /// ``` clear(&mut self)1025 pub fn clear(&mut self) { 1026 self.slab.clear(); 1027 self.expired = Stack::default(); 1028 self.wheel = Wheel::new(); 1029 self.delay = None; 1030 } 1031 1032 /// Returns the number of elements the queue can hold without reallocating. 1033 /// 1034 /// # Examples 1035 /// 1036 /// ```rust 1037 /// use tokio_util::time::DelayQueue; 1038 /// 1039 /// let delay_queue: DelayQueue<i32> = DelayQueue::with_capacity(10); 1040 /// assert_eq!(delay_queue.capacity(), 10); 1041 /// ``` capacity(&self) -> usize1042 pub fn capacity(&self) -> usize { 1043 self.slab.capacity() 1044 } 1045 1046 /// Returns the number of elements currently in the queue. 1047 /// 1048 /// # Examples 1049 /// 1050 /// ```rust 1051 /// use tokio_util::time::DelayQueue; 1052 /// use std::time::Duration; 1053 /// 1054 /// # #[tokio::main] 1055 /// # async fn main() { 1056 /// let mut delay_queue: DelayQueue<i32> = DelayQueue::with_capacity(10); 1057 /// assert_eq!(delay_queue.len(), 0); 1058 /// delay_queue.insert(3, Duration::from_secs(5)); 1059 /// assert_eq!(delay_queue.len(), 1); 1060 /// # } 1061 /// ``` len(&self) -> usize1062 pub fn len(&self) -> usize { 1063 self.slab.len() 1064 } 1065 1066 /// Reserves capacity for at least `additional` more items to be queued 1067 /// without allocating. 1068 /// 1069 /// `reserve` does nothing if the queue already has sufficient capacity for 1070 /// `additional` more values. If more capacity is required, a new segment of 1071 /// memory will be allocated and all existing values will be copied into it. 1072 /// As such, if the queue is already very large, a call to `reserve` can end 1073 /// up being expensive. 1074 /// 1075 /// The queue may reserve more than `additional` extra space in order to 1076 /// avoid frequent reallocations. 1077 /// 1078 /// # Panics 1079 /// 1080 /// Panics if the new capacity exceeds the maximum number of entries the 1081 /// queue can contain. 1082 /// 1083 /// # Examples 1084 /// 1085 /// ``` 1086 /// use tokio_util::time::DelayQueue; 1087 /// use std::time::Duration; 1088 /// 1089 /// # #[tokio::main] 1090 /// # async fn main() { 1091 /// let mut delay_queue = DelayQueue::new(); 1092 /// 1093 /// delay_queue.insert("hello", Duration::from_secs(10)); 1094 /// delay_queue.reserve(10); 1095 /// 1096 /// assert!(delay_queue.capacity() >= 11); 1097 /// # } 1098 /// ``` 1099 #[track_caller] reserve(&mut self, additional: usize)1100 pub fn reserve(&mut self, additional: usize) { 1101 assert!( 1102 self.slab.capacity() + additional <= MAX_ENTRIES, 1103 "max queue capacity exceeded" 1104 ); 1105 self.slab.reserve(additional); 1106 } 1107 1108 /// Returns `true` if there are no items in the queue. 1109 /// 1110 /// Note that this function returns `false` even if all items have not yet 1111 /// expired and a call to `poll` will return `Poll::Pending`. 1112 /// 1113 /// # Examples 1114 /// 1115 /// ``` 1116 /// use tokio_util::time::DelayQueue; 1117 /// use std::time::Duration; 1118 /// 1119 /// # #[tokio::main] 1120 /// # async fn main() { 1121 /// let mut delay_queue = DelayQueue::new(); 1122 /// assert!(delay_queue.is_empty()); 1123 /// 1124 /// delay_queue.insert("hello", Duration::from_secs(5)); 1125 /// assert!(!delay_queue.is_empty()); 1126 /// # } 1127 /// ``` is_empty(&self) -> bool1128 pub fn is_empty(&self) -> bool { 1129 self.slab.is_empty() 1130 } 1131 1132 /// Polls the queue, returning the index of the next slot in the slab that 1133 /// should be returned. 1134 /// 1135 /// A slot should be returned when the associated deadline has been reached. poll_idx(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Key>>1136 fn poll_idx(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Key>> { 1137 use self::wheel::Stack; 1138 1139 let expired = self.expired.pop(&mut self.slab); 1140 1141 if expired.is_some() { 1142 return Poll::Ready(expired); 1143 } 1144 1145 loop { 1146 if let Some(ref mut delay) = self.delay { 1147 if !delay.is_elapsed() { 1148 ready!(Pin::new(&mut *delay).poll(cx)); 1149 } 1150 1151 let now = crate::time::ms(delay.deadline() - self.start, crate::time::Round::Down); 1152 1153 self.wheel_now = now; 1154 } 1155 1156 // We poll the wheel to get the next value out before finding the next deadline. 1157 let wheel_idx = self.wheel.poll(self.wheel_now, &mut self.slab); 1158 1159 self.delay = self.next_deadline().map(|when| Box::pin(sleep_until(when))); 1160 1161 if let Some(idx) = wheel_idx { 1162 return Poll::Ready(Some(idx)); 1163 } 1164 1165 if self.delay.is_none() { 1166 return Poll::Ready(None); 1167 } 1168 } 1169 } 1170 normalize_deadline(&self, when: Instant) -> u641171 fn normalize_deadline(&self, when: Instant) -> u64 { 1172 let when = if when < self.start { 1173 0 1174 } else { 1175 crate::time::ms(when - self.start, crate::time::Round::Up) 1176 }; 1177 1178 cmp::max(when, self.wheel.elapsed()) 1179 } 1180 } 1181 1182 // We never put `T` in a `Pin`... 1183 impl<T> Unpin for DelayQueue<T> {} 1184 1185 impl<T> Default for DelayQueue<T> { default() -> DelayQueue<T>1186 fn default() -> DelayQueue<T> { 1187 DelayQueue::new() 1188 } 1189 } 1190 1191 impl<T> futures_core::Stream for DelayQueue<T> { 1192 // DelayQueue seems much more specific, where a user may care that it 1193 // has reached capacity, so return those errors instead of panicking. 1194 type Item = Expired<T>; 1195 poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>>1196 fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> { 1197 DelayQueue::poll_expired(self.get_mut(), cx) 1198 } 1199 } 1200 1201 impl<T> wheel::Stack for Stack<T> { 1202 type Owned = Key; 1203 type Borrowed = Key; 1204 type Store = SlabStorage<T>; 1205 is_empty(&self) -> bool1206 fn is_empty(&self) -> bool { 1207 self.head.is_none() 1208 } 1209 push(&mut self, item: Self::Owned, store: &mut Self::Store)1210 fn push(&mut self, item: Self::Owned, store: &mut Self::Store) { 1211 // Ensure the entry is not already in a stack. 1212 debug_assert!(store[item].next.is_none()); 1213 debug_assert!(store[item].prev.is_none()); 1214 1215 // Remove the old head entry 1216 let old = self.head.take(); 1217 1218 if let Some(idx) = old { 1219 store[idx].prev = Some(item); 1220 } 1221 1222 store[item].next = old; 1223 self.head = Some(item); 1224 } 1225 pop(&mut self, store: &mut Self::Store) -> Option<Self::Owned>1226 fn pop(&mut self, store: &mut Self::Store) -> Option<Self::Owned> { 1227 if let Some(key) = self.head { 1228 self.head = store[key].next; 1229 1230 if let Some(idx) = self.head { 1231 store[idx].prev = None; 1232 } 1233 1234 store[key].next = None; 1235 debug_assert!(store[key].prev.is_none()); 1236 1237 Some(key) 1238 } else { 1239 None 1240 } 1241 } 1242 peek(&self) -> Option<Self::Owned>1243 fn peek(&self) -> Option<Self::Owned> { 1244 self.head 1245 } 1246 1247 #[track_caller] remove(&mut self, item: &Self::Borrowed, store: &mut Self::Store)1248 fn remove(&mut self, item: &Self::Borrowed, store: &mut Self::Store) { 1249 let key = *item; 1250 assert!(store.contains(item)); 1251 1252 // Ensure that the entry is in fact contained by the stack 1253 debug_assert!({ 1254 // This walks the full linked list even if an entry is found. 1255 let mut next = self.head; 1256 let mut contains = false; 1257 1258 while let Some(idx) = next { 1259 let data = &store[idx]; 1260 1261 if idx == *item { 1262 debug_assert!(!contains); 1263 contains = true; 1264 } 1265 1266 next = data.next; 1267 } 1268 1269 contains 1270 }); 1271 1272 if let Some(next) = store[key].next { 1273 store[next].prev = store[key].prev; 1274 } 1275 1276 if let Some(prev) = store[key].prev { 1277 store[prev].next = store[key].next; 1278 } else { 1279 self.head = store[key].next; 1280 } 1281 1282 store[key].next = None; 1283 store[key].prev = None; 1284 } 1285 when(item: &Self::Borrowed, store: &Self::Store) -> u641286 fn when(item: &Self::Borrowed, store: &Self::Store) -> u64 { 1287 store[*item].when 1288 } 1289 } 1290 1291 impl<T> Default for Stack<T> { default() -> Stack<T>1292 fn default() -> Stack<T> { 1293 Stack { 1294 head: None, 1295 _p: PhantomData, 1296 } 1297 } 1298 } 1299 1300 impl Key { new(index: usize) -> Key1301 pub(crate) fn new(index: usize) -> Key { 1302 Key { index } 1303 } 1304 } 1305 1306 impl KeyInternal { new(index: usize) -> KeyInternal1307 pub(crate) fn new(index: usize) -> KeyInternal { 1308 KeyInternal { index } 1309 } 1310 } 1311 1312 impl From<Key> for KeyInternal { from(item: Key) -> Self1313 fn from(item: Key) -> Self { 1314 KeyInternal::new(item.index) 1315 } 1316 } 1317 1318 impl From<KeyInternal> for Key { from(item: KeyInternal) -> Self1319 fn from(item: KeyInternal) -> Self { 1320 Key::new(item.index) 1321 } 1322 } 1323 1324 impl<T> Expired<T> { 1325 /// Returns a reference to the inner value. get_ref(&self) -> &T1326 pub fn get_ref(&self) -> &T { 1327 &self.data 1328 } 1329 1330 /// Returns a mutable reference to the inner value. get_mut(&mut self) -> &mut T1331 pub fn get_mut(&mut self) -> &mut T { 1332 &mut self.data 1333 } 1334 1335 /// Consumes `self` and returns the inner value. into_inner(self) -> T1336 pub fn into_inner(self) -> T { 1337 self.data 1338 } 1339 1340 /// Returns the deadline that the expiration was set to. deadline(&self) -> Instant1341 pub fn deadline(&self) -> Instant { 1342 self.deadline 1343 } 1344 1345 /// Returns the key that the expiration is indexed by. key(&self) -> Key1346 pub fn key(&self) -> Key { 1347 self.key 1348 } 1349 } 1350