1 //! A queue of delayed elements. 2 //! 3 //! See [`DelayQueue`] for more details. 4 //! 5 //! [`DelayQueue`]: struct@DelayQueue 6 7 use crate::time::wheel::{self, Wheel}; 8 9 use futures_core::ready; 10 use tokio::time::{sleep_until, Duration, Instant, Sleep}; 11 12 use core::ops::{Index, IndexMut}; 13 use slab::Slab; 14 use std::cmp; 15 use std::collections::HashMap; 16 use std::convert::From; 17 use std::fmt; 18 use std::fmt::Debug; 19 use std::future::Future; 20 use std::marker::PhantomData; 21 use std::pin::Pin; 22 use std::task::{self, Poll, Waker}; 23 24 /// A queue of delayed elements. 25 /// 26 /// Once an element is inserted into the `DelayQueue`, it is yielded once the 27 /// specified deadline has been reached. 28 /// 29 /// # Usage 30 /// 31 /// Elements are inserted into `DelayQueue` using the [`insert`] or 32 /// [`insert_at`] methods. A deadline is provided with the item and a [`Key`] is 33 /// returned. The key is used to remove the entry or to change the deadline at 34 /// which it should be yielded back. 35 /// 36 /// Once delays have been configured, the `DelayQueue` is used via its 37 /// [`Stream`] implementation. [`poll_expired`] is called. If an entry has reached its 38 /// deadline, it is returned. If not, `Poll::Pending` is returned indicating that the 39 /// current task will be notified once the deadline has been reached. 40 /// 41 /// # `Stream` implementation 42 /// 43 /// Items are retrieved from the queue via [`DelayQueue::poll_expired`]. If no delays have 44 /// expired, no items are returned. In this case, `Poll::Pending` is returned and the 45 /// current task is registered to be notified once the next item's delay has 46 /// expired. 47 /// 48 /// If no items are in the queue, i.e. `is_empty()` returns `true`, then `poll` 49 /// returns `Poll::Ready(None)`. This indicates that the stream has reached an end. 50 /// However, if a new item is inserted *after*, `poll` will once again start 51 /// returning items or `Poll::Pending`. 
52 /// 53 /// Items are returned ordered by their expirations. Items that are configured 54 /// to expire first will be returned first. There are no ordering guarantees 55 /// for items configured to expire at the same instant. Also note that delays are 56 /// rounded to the closest millisecond. 57 /// 58 /// # Implementation 59 /// 60 /// The [`DelayQueue`] is backed by a separate instance of a timer wheel similar to that used internally 61 /// by Tokio's standalone timer utilities such as [`sleep`]. Because of this, it offers the same 62 /// performance and scalability benefits. 63 /// 64 /// State associated with each entry is stored in a [`slab`]. This amortizes the cost of allocation, 65 /// and allows reuse of the memory allocated for expired entries. 66 /// 67 /// Capacity can be checked using [`capacity`] and allocated preemptively by using 68 /// the [`reserve`] method. 69 /// 70 /// # Usage 71 /// 72 /// Using `DelayQueue` to manage cache entries. 73 /// 74 /// ```rust,no_run 75 /// use tokio_util::time::{DelayQueue, delay_queue}; 76 /// 77 /// use futures::ready; 78 /// use std::collections::HashMap; 79 /// use std::task::{Context, Poll}; 80 /// use std::time::Duration; 81 /// # type CacheKey = String; 82 /// # type Value = String; 83 /// 84 /// struct Cache { 85 /// entries: HashMap<CacheKey, (Value, delay_queue::Key)>, 86 /// expirations: DelayQueue<CacheKey>, 87 /// } 88 /// 89 /// const TTL_SECS: u64 = 30; 90 /// 91 /// impl Cache { 92 /// fn insert(&mut self, key: CacheKey, value: Value) { 93 /// let delay = self.expirations 94 /// .insert(key.clone(), Duration::from_secs(TTL_SECS)); 95 /// 96 /// self.entries.insert(key, (value, delay)); 97 /// } 98 /// 99 /// fn get(&self, key: &CacheKey) -> Option<&Value> { 100 /// self.entries.get(key) 101 /// .map(|&(ref v, _)| v) 102 /// } 103 /// 104 /// fn remove(&mut self, key: &CacheKey) { 105 /// if let Some((_, cache_key)) = self.entries.remove(key) { 106 /// self.expirations.remove(&cache_key); 107 /// 
} 108 /// } 109 /// 110 /// fn poll_purge(&mut self, cx: &mut Context<'_>) -> Poll<()> { 111 /// while let Some(entry) = ready!(self.expirations.poll_expired(cx)) { 112 /// self.entries.remove(entry.get_ref()); 113 /// } 114 /// 115 /// Poll::Ready(()) 116 /// } 117 /// } 118 /// ``` 119 /// 120 /// [`insert`]: method@Self::insert 121 /// [`insert_at`]: method@Self::insert_at 122 /// [`Key`]: struct@Key 123 /// [`Stream`]: https://docs.rs/futures/0.1/futures/stream/trait.Stream.html 124 /// [`poll_expired`]: method@Self::poll_expired 125 /// [`Stream::poll_expired`]: method@Self::poll_expired 126 /// [`DelayQueue`]: struct@DelayQueue 127 /// [`sleep`]: fn@tokio::time::sleep 128 /// [`slab`]: slab 129 /// [`capacity`]: method@Self::capacity 130 /// [`reserve`]: method@Self::reserve 131 #[derive(Debug)] 132 pub struct DelayQueue<T> { 133 /// Stores data associated with entries 134 slab: SlabStorage<T>, 135 136 /// Lookup structure tracking all delays in the queue 137 wheel: Wheel<Stack<T>>, 138 139 /// Delays that were inserted when already expired. These cannot be stored 140 /// in the wheel 141 expired: Stack<T>, 142 143 /// Delay expiring when the *first* item in the queue expires 144 delay: Option<Pin<Box<Sleep>>>, 145 146 /// Wheel polling state 147 wheel_now: u64, 148 149 /// Instant at which the timer starts 150 start: Instant, 151 152 /// Waker that is invoked when we potentially need to reset the timer. 153 /// Because we lazily create the timer when the first entry is created, we 154 /// need to awaken any poller that polled us before that point. 155 waker: Option<Waker>, 156 } 157 158 #[derive(Default)] 159 struct SlabStorage<T> { 160 inner: Slab<Data<T>>, 161 162 // A `compact` call requires a re-mapping of the `Key`s that were changed 163 // during the `compact` call of the `slab`. Since the keys that were given out 164 // cannot be changed retroactively we need to keep track of these re-mappings. 
165 // The keys of `key_map` correspond to the old keys that were given out and 166 // the values to the `Key`s that were re-mapped by the `compact` call. 167 key_map: HashMap<Key, KeyInternal>, 168 169 // Index used to create new keys to hand out. 170 next_key_index: usize, 171 172 // Whether `compact` has been called, necessary in order to decide whether 173 // to include keys in `key_map`. 174 compact_called: bool, 175 } 176 177 impl<T> SlabStorage<T> { with_capacity(capacity: usize) -> SlabStorage<T>178 pub(crate) fn with_capacity(capacity: usize) -> SlabStorage<T> { 179 SlabStorage { 180 inner: Slab::with_capacity(capacity), 181 key_map: HashMap::new(), 182 next_key_index: 0, 183 compact_called: false, 184 } 185 } 186 187 // Inserts data into the inner slab and re-maps keys if necessary insert(&mut self, val: Data<T>) -> Key188 pub(crate) fn insert(&mut self, val: Data<T>) -> Key { 189 let mut key = KeyInternal::new(self.inner.insert(val)); 190 let key_contained = self.key_map.contains_key(&key.into()); 191 192 if key_contained { 193 // It's possible that a `compact` call creates capacity in `self.inner` in 194 // such a way that a `self.inner.insert` call creates a `key` which was 195 // previously given out during an `insert` call prior to the `compact` call. 196 // If `key` is contained in `self.key_map`, we have encountered this exact situation, 197 // We need to create a new key `key_to_give_out` and include the relation 198 // `key_to_give_out` -> `key` in `self.key_map`. 199 let key_to_give_out = self.create_new_key(); 200 assert!(!self.key_map.contains_key(&key_to_give_out.into())); 201 self.key_map.insert(key_to_give_out.into(), key); 202 key = key_to_give_out; 203 } else if self.compact_called { 204 // Include an identity mapping in `self.key_map` in order to allow us to 205 // panic if a key that was handed out is removed more than once. 
206 self.key_map.insert(key.into(), key); 207 } 208 209 key.into() 210 } 211 212 // Re-map the key in case compact was previously called. 213 // Note: Since we include identity mappings in key_map after compact was called, 214 // we have information about all keys that were handed out. In the case in which 215 // compact was called and we try to remove a Key that was previously removed 216 // we can detect invalid keys if no key is found in `key_map`. This is necessary 217 // in order to prevent situations in which a previously removed key 218 // corresponds to a re-mapped key internally and which would then be incorrectly 219 // removed from the slab. 220 // 221 // Example to illuminate this problem: 222 // 223 // Let's assume our `key_map` is {1 -> 2, 2 -> 1} and we call remove(1). If we 224 // were to remove 1 again, we would not find it inside `key_map` anymore. 225 // If we were to imply from this that no re-mapping was necessary, we would 226 // incorrectly remove 1 from `self.slab.inner`, which corresponds to the 227 // handed-out key 2. remove(&mut self, key: &Key) -> Data<T>228 pub(crate) fn remove(&mut self, key: &Key) -> Data<T> { 229 let remapped_key = if self.compact_called { 230 match self.key_map.remove(key) { 231 Some(key_internal) => key_internal, 232 None => panic!("invalid key"), 233 } 234 } else { 235 (*key).into() 236 }; 237 238 self.inner.remove(remapped_key.index) 239 } 240 shrink_to_fit(&mut self)241 pub(crate) fn shrink_to_fit(&mut self) { 242 self.inner.shrink_to_fit(); 243 self.key_map.shrink_to_fit(); 244 } 245 compact(&mut self)246 pub(crate) fn compact(&mut self) { 247 if !self.compact_called { 248 for (key, _) in self.inner.iter() { 249 self.key_map.insert(Key::new(key), KeyInternal::new(key)); 250 } 251 } 252 253 let mut remapping = HashMap::new(); 254 self.inner.compact(|_, from, to| { 255 remapping.insert(from, to); 256 true 257 }); 258 259 // At this point `key_map` contains a mapping for every element. 
260 for internal_key in self.key_map.values_mut() { 261 if let Some(new_internal_key) = remapping.get(&internal_key.index) { 262 *internal_key = KeyInternal::new(*new_internal_key); 263 } 264 } 265 266 if self.key_map.capacity() > 2 * self.key_map.len() { 267 self.key_map.shrink_to_fit(); 268 } 269 270 self.compact_called = true; 271 } 272 273 // Tries to re-map a `Key` that was given out to the user to its 274 // corresponding internal key. remap_key(&self, key: &Key) -> Option<KeyInternal>275 fn remap_key(&self, key: &Key) -> Option<KeyInternal> { 276 let key_map = &self.key_map; 277 if self.compact_called { 278 key_map.get(key).copied() 279 } else { 280 Some((*key).into()) 281 } 282 } 283 create_new_key(&mut self) -> KeyInternal284 fn create_new_key(&mut self) -> KeyInternal { 285 while self.key_map.contains_key(&Key::new(self.next_key_index)) { 286 self.next_key_index = self.next_key_index.wrapping_add(1); 287 } 288 289 KeyInternal::new(self.next_key_index) 290 } 291 len(&self) -> usize292 pub(crate) fn len(&self) -> usize { 293 self.inner.len() 294 } 295 capacity(&self) -> usize296 pub(crate) fn capacity(&self) -> usize { 297 self.inner.capacity() 298 } 299 clear(&mut self)300 pub(crate) fn clear(&mut self) { 301 self.inner.clear(); 302 self.key_map.clear(); 303 self.compact_called = false; 304 } 305 reserve(&mut self, additional: usize)306 pub(crate) fn reserve(&mut self, additional: usize) { 307 self.inner.reserve(additional); 308 309 if self.compact_called { 310 self.key_map.reserve(additional); 311 } 312 } 313 is_empty(&self) -> bool314 pub(crate) fn is_empty(&self) -> bool { 315 self.inner.is_empty() 316 } 317 contains(&self, key: &Key) -> bool318 pub(crate) fn contains(&self, key: &Key) -> bool { 319 let remapped_key = self.remap_key(key); 320 321 match remapped_key { 322 Some(internal_key) => self.inner.contains(internal_key.index), 323 None => false, 324 } 325 } 326 } 327 328 impl<T> fmt::Debug for SlabStorage<T> 329 where 330 T: fmt::Debug, 331 { 
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result332 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { 333 if fmt.alternate() { 334 fmt.debug_map().entries(self.inner.iter()).finish() 335 } else { 336 fmt.debug_struct("Slab") 337 .field("len", &self.len()) 338 .field("cap", &self.capacity()) 339 .finish() 340 } 341 } 342 } 343 344 impl<T> Index<Key> for SlabStorage<T> { 345 type Output = Data<T>; 346 index(&self, key: Key) -> &Self::Output347 fn index(&self, key: Key) -> &Self::Output { 348 let remapped_key = self.remap_key(&key); 349 350 match remapped_key { 351 Some(internal_key) => &self.inner[internal_key.index], 352 None => panic!("Invalid index {}", key.index), 353 } 354 } 355 } 356 357 impl<T> IndexMut<Key> for SlabStorage<T> { index_mut(&mut self, key: Key) -> &mut Data<T>358 fn index_mut(&mut self, key: Key) -> &mut Data<T> { 359 let remapped_key = self.remap_key(&key); 360 361 match remapped_key { 362 Some(internal_key) => &mut self.inner[internal_key.index], 363 None => panic!("Invalid index {}", key.index), 364 } 365 } 366 } 367 368 /// An entry in `DelayQueue` that has expired and been removed. 369 /// 370 /// Values are returned by [`DelayQueue::poll_expired`]. 371 /// 372 /// [`DelayQueue::poll_expired`]: method@DelayQueue::poll_expired 373 #[derive(Debug)] 374 pub struct Expired<T> { 375 /// The data stored in the queue 376 data: T, 377 378 /// The expiration time 379 deadline: Instant, 380 381 /// The key associated with the entry 382 key: Key, 383 } 384 385 /// Token to a value stored in a `DelayQueue`. 386 /// 387 /// Instances of `Key` are returned by [`DelayQueue::insert`]. See [`DelayQueue`] 388 /// documentation for more details. 
///
/// [`DelayQueue`]: struct@DelayQueue
/// [`DelayQueue::insert`]: method@DelayQueue::insert
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Key {
    /// Index value identifying the entry this key was handed out for.
    index: usize,
}

// Whereas `Key` is given out to users that use `DelayQueue`, internally we use
// `KeyInternal` as the key type in order to make the logic of mapping between keys
// as a result of `compact` calls clearer.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct KeyInternal {
    /// Index of the entry inside the slab.
    index: usize,
}

#[derive(Debug)]
struct Stack<T> {
    /// Head of the stack
    head: Option<Key>,
    /// Ties the stack to the element type `T` without storing a `T`.
    _p: PhantomData<fn() -> T>,
}

#[derive(Debug)]
struct Data<T> {
    /// The data being stored in the queue and will be returned at the requested
    /// instant.
    inner: T,

    /// The instant at which the item is returned, expressed in milliseconds
    /// relative to the queue's `start` instant.
    when: u64,

    /// Set to true when stored in the `expired` queue
    expired: bool,

    /// Next entry in the stack
    next: Option<Key>,

    /// Previous entry in the stack
    prev: Option<Key>,
}

/// Maximum number of entries the queue can handle
const MAX_ENTRIES: usize = (1 << 30) - 1;

impl<T> DelayQueue<T> {
    /// Creates a new, empty, `DelayQueue`.
    ///
    /// The queue will not allocate storage until items are inserted into it.
    ///
    /// # Examples
    ///
    /// ```rust
    /// # use tokio_util::time::DelayQueue;
    /// let delay_queue: DelayQueue<u32> = DelayQueue::new();
    /// ```
    pub fn new() -> DelayQueue<T> {
        DelayQueue::with_capacity(0)
    }

    /// Creates a new, empty, `DelayQueue` with the specified capacity.
    ///
    /// The queue will be able to hold at least `capacity` elements without
    /// reallocating. If `capacity` is 0, the queue will not allocate for
    /// storage.
454 /// 455 /// # Examples 456 /// 457 /// ```rust 458 /// # use tokio_util::time::DelayQueue; 459 /// # use std::time::Duration; 460 /// 461 /// # #[tokio::main] 462 /// # async fn main() { 463 /// let mut delay_queue = DelayQueue::with_capacity(10); 464 /// 465 /// // These insertions are done without further allocation 466 /// for i in 0..10 { 467 /// delay_queue.insert(i, Duration::from_secs(i)); 468 /// } 469 /// 470 /// // This will make the queue allocate additional storage 471 /// delay_queue.insert(11, Duration::from_secs(11)); 472 /// # } 473 /// ``` with_capacity(capacity: usize) -> DelayQueue<T>474 pub fn with_capacity(capacity: usize) -> DelayQueue<T> { 475 DelayQueue { 476 wheel: Wheel::new(), 477 slab: SlabStorage::with_capacity(capacity), 478 expired: Stack::default(), 479 delay: None, 480 wheel_now: 0, 481 start: Instant::now(), 482 waker: None, 483 } 484 } 485 486 /// Inserts `value` into the queue set to expire at a specific instant in 487 /// time. 488 /// 489 /// This function is identical to `insert`, but takes an `Instant` instead 490 /// of a `Duration`. 491 /// 492 /// `value` is stored in the queue until `when` is reached. At which point, 493 /// `value` will be returned from [`poll_expired`]. If `when` has already been 494 /// reached, then `value` is immediately made available to poll. 495 /// 496 /// The return value represents the insertion and is used as an argument to 497 /// [`remove`] and [`reset`]. Note that [`Key`] is a token and is reused once 498 /// `value` is removed from the queue either by calling [`poll_expired`] after 499 /// `when` is reached or by calling [`remove`]. At this point, the caller 500 /// must take care to not use the returned [`Key`] again as it may reference 501 /// a different item in the queue. 502 /// 503 /// See [type] level documentation for more details. 504 /// 505 /// # Panics 506 /// 507 /// This function panics if `when` is too far in the future. 
508 /// 509 /// # Examples 510 /// 511 /// Basic usage 512 /// 513 /// ```rust 514 /// use tokio::time::{Duration, Instant}; 515 /// use tokio_util::time::DelayQueue; 516 /// 517 /// # #[tokio::main] 518 /// # async fn main() { 519 /// let mut delay_queue = DelayQueue::new(); 520 /// let key = delay_queue.insert_at( 521 /// "foo", Instant::now() + Duration::from_secs(5)); 522 /// 523 /// // Remove the entry 524 /// let item = delay_queue.remove(&key); 525 /// assert_eq!(*item.get_ref(), "foo"); 526 /// # } 527 /// ``` 528 /// 529 /// [`poll_expired`]: method@Self::poll_expired 530 /// [`remove`]: method@Self::remove 531 /// [`reset`]: method@Self::reset 532 /// [`Key`]: struct@Key 533 /// [type]: # 534 #[track_caller] insert_at(&mut self, value: T, when: Instant) -> Key535 pub fn insert_at(&mut self, value: T, when: Instant) -> Key { 536 assert!(self.slab.len() < MAX_ENTRIES, "max entries exceeded"); 537 538 // Normalize the deadline. Values cannot be set to expire in the past. 539 let when = self.normalize_deadline(when); 540 541 // Insert the value in the store 542 let key = self.slab.insert(Data { 543 inner: value, 544 when, 545 expired: false, 546 next: None, 547 prev: None, 548 }); 549 550 self.insert_idx(when, key); 551 552 // Set a new delay if the current's deadline is later than the one of the new item 553 let should_set_delay = if let Some(ref delay) = self.delay { 554 let current_exp = self.normalize_deadline(delay.deadline()); 555 current_exp > when 556 } else { 557 true 558 }; 559 560 if should_set_delay { 561 if let Some(waker) = self.waker.take() { 562 waker.wake(); 563 } 564 565 let delay_time = self.start + Duration::from_millis(when); 566 if let Some(ref mut delay) = &mut self.delay { 567 delay.as_mut().reset(delay_time); 568 } else { 569 self.delay = Some(Box::pin(sleep_until(delay_time))); 570 } 571 } 572 573 key 574 } 575 576 /// Attempts to pull out the next value of the delay queue, registering the 577 /// current task for wakeup if the value 
is not yet available, and returning 578 /// `None` if the queue is exhausted. poll_expired(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Expired<T>>>579 pub fn poll_expired(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Expired<T>>> { 580 if !self 581 .waker 582 .as_ref() 583 .map(|w| w.will_wake(cx.waker())) 584 .unwrap_or(false) 585 { 586 self.waker = Some(cx.waker().clone()); 587 } 588 589 let item = ready!(self.poll_idx(cx)); 590 Poll::Ready(item.map(|key| { 591 let data = self.slab.remove(&key); 592 debug_assert!(data.next.is_none()); 593 debug_assert!(data.prev.is_none()); 594 595 Expired { 596 key, 597 data: data.inner, 598 deadline: self.start + Duration::from_millis(data.when), 599 } 600 })) 601 } 602 603 /// Inserts `value` into the queue set to expire after the requested duration 604 /// elapses. 605 /// 606 /// This function is identical to `insert_at`, but takes a `Duration` 607 /// instead of an `Instant`. 608 /// 609 /// `value` is stored in the queue until `timeout` duration has 610 /// elapsed after `insert` was called. At that point, `value` will 611 /// be returned from [`poll_expired`]. If `timeout` is a `Duration` of 612 /// zero, then `value` is immediately made available to poll. 613 /// 614 /// The return value represents the insertion and is used as an 615 /// argument to [`remove`] and [`reset`]. Note that [`Key`] is a 616 /// token and is reused once `value` is removed from the queue 617 /// either by calling [`poll_expired`] after `timeout` has elapsed 618 /// or by calling [`remove`]. At this point, the caller must not 619 /// use the returned [`Key`] again as it may reference a different 620 /// item in the queue. 621 /// 622 /// See [type] level documentation for more details. 623 /// 624 /// # Panics 625 /// 626 /// This function panics if `timeout` is greater than the maximum 627 /// duration supported by the timer in the current `Runtime`. 
628 /// 629 /// # Examples 630 /// 631 /// Basic usage 632 /// 633 /// ```rust 634 /// use tokio_util::time::DelayQueue; 635 /// use std::time::Duration; 636 /// 637 /// # #[tokio::main] 638 /// # async fn main() { 639 /// let mut delay_queue = DelayQueue::new(); 640 /// let key = delay_queue.insert("foo", Duration::from_secs(5)); 641 /// 642 /// // Remove the entry 643 /// let item = delay_queue.remove(&key); 644 /// assert_eq!(*item.get_ref(), "foo"); 645 /// # } 646 /// ``` 647 /// 648 /// [`poll_expired`]: method@Self::poll_expired 649 /// [`remove`]: method@Self::remove 650 /// [`reset`]: method@Self::reset 651 /// [`Key`]: struct@Key 652 /// [type]: # 653 #[track_caller] insert(&mut self, value: T, timeout: Duration) -> Key654 pub fn insert(&mut self, value: T, timeout: Duration) -> Key { 655 self.insert_at(value, Instant::now() + timeout) 656 } 657 658 #[track_caller] insert_idx(&mut self, when: u64, key: Key)659 fn insert_idx(&mut self, when: u64, key: Key) { 660 use self::wheel::{InsertError, Stack}; 661 662 // Register the deadline with the timer wheel 663 match self.wheel.insert(when, key, &mut self.slab) { 664 Ok(_) => {} 665 Err((_, InsertError::Elapsed)) => { 666 self.slab[key].expired = true; 667 // The delay is already expired, store it in the expired queue 668 self.expired.push(key, &mut self.slab); 669 } 670 Err((_, err)) => panic!("invalid deadline; err={:?}", err), 671 } 672 } 673 674 /// Removes the key from the expired queue or the timer wheel 675 /// depending on its expiration status. 676 /// 677 /// # Panics 678 /// 679 /// Panics if the key is not contained in the expired queue or the wheel. 
680 #[track_caller] remove_key(&mut self, key: &Key)681 fn remove_key(&mut self, key: &Key) { 682 use crate::time::wheel::Stack; 683 684 // Special case the `expired` queue 685 if self.slab[*key].expired { 686 self.expired.remove(key, &mut self.slab); 687 } else { 688 self.wheel.remove(key, &mut self.slab); 689 } 690 } 691 692 /// Removes the item associated with `key` from the queue. 693 /// 694 /// There must be an item associated with `key`. The function returns the 695 /// removed item as well as the `Instant` at which it will the delay will 696 /// have expired. 697 /// 698 /// # Panics 699 /// 700 /// The function panics if `key` is not contained by the queue. 701 /// 702 /// # Examples 703 /// 704 /// Basic usage 705 /// 706 /// ```rust 707 /// use tokio_util::time::DelayQueue; 708 /// use std::time::Duration; 709 /// 710 /// # #[tokio::main] 711 /// # async fn main() { 712 /// let mut delay_queue = DelayQueue::new(); 713 /// let key = delay_queue.insert("foo", Duration::from_secs(5)); 714 /// 715 /// // Remove the entry 716 /// let item = delay_queue.remove(&key); 717 /// assert_eq!(*item.get_ref(), "foo"); 718 /// # } 719 /// ``` 720 #[track_caller] remove(&mut self, key: &Key) -> Expired<T>721 pub fn remove(&mut self, key: &Key) -> Expired<T> { 722 let prev_deadline = self.next_deadline(); 723 724 self.remove_key(key); 725 let data = self.slab.remove(key); 726 727 let next_deadline = self.next_deadline(); 728 if prev_deadline != next_deadline { 729 match (next_deadline, &mut self.delay) { 730 (None, _) => self.delay = None, 731 (Some(deadline), Some(delay)) => delay.as_mut().reset(deadline), 732 (Some(deadline), None) => self.delay = Some(Box::pin(sleep_until(deadline))), 733 } 734 } 735 736 Expired { 737 key: Key::new(key.index), 738 data: data.inner, 739 deadline: self.start + Duration::from_millis(data.when), 740 } 741 } 742 743 /// Attempts to remove the item associated with `key` from the queue. 
744 /// 745 /// Removes the item associated with `key`, and returns it along with the 746 /// `Instant` at which it would have expired, if it exists. 747 /// 748 /// Returns `None` if `key` is not in the queue. 749 /// 750 /// # Examples 751 /// 752 /// Basic usage 753 /// 754 /// ```rust 755 /// use tokio_util::time::DelayQueue; 756 /// use std::time::Duration; 757 /// 758 /// # #[tokio::main(flavor = "current_thread")] 759 /// # async fn main() { 760 /// let mut delay_queue = DelayQueue::new(); 761 /// let key = delay_queue.insert("foo", Duration::from_secs(5)); 762 /// 763 /// // The item is in the queue, `try_remove` returns `Some(Expired("foo"))`. 764 /// let item = delay_queue.try_remove(&key); 765 /// assert_eq!(item.unwrap().into_inner(), "foo"); 766 /// 767 /// // The item is not in the queue anymore, `try_remove` returns `None`. 768 /// let item = delay_queue.try_remove(&key); 769 /// assert!(item.is_none()); 770 /// # } 771 /// ``` try_remove(&mut self, key: &Key) -> Option<Expired<T>>772 pub fn try_remove(&mut self, key: &Key) -> Option<Expired<T>> { 773 if self.slab.contains(key) { 774 Some(self.remove(key)) 775 } else { 776 None 777 } 778 } 779 780 /// Sets the delay of the item associated with `key` to expire at `when`. 781 /// 782 /// This function is identical to `reset` but takes an `Instant` instead of 783 /// a `Duration`. 784 /// 785 /// The item remains in the queue but the delay is set to expire at `when`. 786 /// If `when` is in the past, then the item is immediately made available to 787 /// the caller. 788 /// 789 /// # Panics 790 /// 791 /// This function panics if `when` is too far in the future or if `key` is 792 /// not contained by the queue. 
793 /// 794 /// # Examples 795 /// 796 /// Basic usage 797 /// 798 /// ```rust 799 /// use tokio::time::{Duration, Instant}; 800 /// use tokio_util::time::DelayQueue; 801 /// 802 /// # #[tokio::main] 803 /// # async fn main() { 804 /// let mut delay_queue = DelayQueue::new(); 805 /// let key = delay_queue.insert("foo", Duration::from_secs(5)); 806 /// 807 /// // "foo" is scheduled to be returned in 5 seconds 808 /// 809 /// delay_queue.reset_at(&key, Instant::now() + Duration::from_secs(10)); 810 /// 811 /// // "foo" is now scheduled to be returned in 10 seconds 812 /// # } 813 /// ``` 814 #[track_caller] reset_at(&mut self, key: &Key, when: Instant)815 pub fn reset_at(&mut self, key: &Key, when: Instant) { 816 self.remove_key(key); 817 818 // Normalize the deadline. Values cannot be set to expire in the past. 819 let when = self.normalize_deadline(when); 820 821 self.slab[*key].when = when; 822 self.slab[*key].expired = false; 823 824 self.insert_idx(when, *key); 825 826 let next_deadline = self.next_deadline(); 827 if let (Some(ref mut delay), Some(deadline)) = (&mut self.delay, next_deadline) { 828 // This should awaken us if necessary (ie, if already expired) 829 delay.as_mut().reset(deadline); 830 } 831 } 832 833 /// Shrink the capacity of the slab, which `DelayQueue` uses internally for storage allocation. 834 /// This function is not guaranteed to, and in most cases, won't decrease the capacity of the slab 835 /// to the number of elements still contained in it, because elements cannot be moved to a different 836 /// index. To decrease the capacity to the size of the slab use [`compact`]. 837 /// 838 /// This function can take O(n) time even when the capacity cannot be reduced or the allocation is 839 /// shrunk in place. Repeated calls run in O(1) though. 
///
/// [`compact`]: method@Self::compact
pub fn shrink_to_fit(&mut self) {
    self.slab.shrink_to_fit();
}

/// Shrink the capacity of the slab, which `DelayQueue` uses internally for storage allocation,
/// to the number of elements that are contained in it.
///
/// This method runs in O(n).
///
/// # Examples
///
/// Basic usage
///
/// ```rust
/// use tokio_util::time::DelayQueue;
/// use std::time::Duration;
///
/// # #[tokio::main]
/// # async fn main() {
/// let mut delay_queue = DelayQueue::with_capacity(10);
///
/// let key1 = delay_queue.insert(5, Duration::from_secs(5));
/// let key2 = delay_queue.insert(10, Duration::from_secs(10));
/// let key3 = delay_queue.insert(15, Duration::from_secs(15));
///
/// delay_queue.remove(&key2);
///
/// delay_queue.compact();
/// assert_eq!(delay_queue.capacity(), 2);
/// # }
/// ```
pub fn compact(&mut self) {
    self.slab.compact();
}

/// Gets the [`Key`] that [`poll_expired`] will pull out of the queue next, without
/// pulling it out or waiting for the deadline to expire.
///
/// Entries that have already expired may be returned in any order, but it is
/// guaranteed that this method returns them in the same order as when items
/// are popped from the `DelayQueue`.
883 /// 884 /// # Examples 885 /// 886 /// Basic usage 887 /// 888 /// ```rust 889 /// use tokio_util::time::DelayQueue; 890 /// use std::time::Duration; 891 /// 892 /// # #[tokio::main] 893 /// # async fn main() { 894 /// let mut delay_queue = DelayQueue::new(); 895 /// 896 /// let key1 = delay_queue.insert("foo", Duration::from_secs(10)); 897 /// let key2 = delay_queue.insert("bar", Duration::from_secs(5)); 898 /// let key3 = delay_queue.insert("baz", Duration::from_secs(15)); 899 /// 900 /// assert_eq!(delay_queue.peek().unwrap(), key2); 901 /// # } 902 /// ``` 903 /// 904 /// [`Key`]: struct@Key 905 /// [`poll_expired`]: method@Self::poll_expired peek(&self) -> Option<Key>906 pub fn peek(&self) -> Option<Key> { 907 use self::wheel::Stack; 908 909 self.expired.peek().or_else(|| self.wheel.peek()) 910 } 911 912 /// Returns the next time to poll as determined by the wheel next_deadline(&mut self) -> Option<Instant>913 fn next_deadline(&mut self) -> Option<Instant> { 914 self.wheel 915 .poll_at() 916 .map(|poll_at| self.start + Duration::from_millis(poll_at)) 917 } 918 919 /// Sets the delay of the item associated with `key` to expire after 920 /// `timeout`. 921 /// 922 /// This function is identical to `reset_at` but takes a `Duration` instead 923 /// of an `Instant`. 924 /// 925 /// The item remains in the queue but the delay is set to expire after 926 /// `timeout`. If `timeout` is zero, then the item is immediately made 927 /// available to the caller. 928 /// 929 /// # Panics 930 /// 931 /// This function panics if `timeout` is greater than the maximum supported 932 /// duration or if `key` is not contained by the queue. 
///
/// # Examples
///
/// Basic usage
///
/// ```rust
/// use tokio_util::time::DelayQueue;
/// use std::time::Duration;
///
/// # #[tokio::main]
/// # async fn main() {
/// let mut delay_queue = DelayQueue::new();
/// let key = delay_queue.insert("foo", Duration::from_secs(5));
///
/// // "foo" is scheduled to be returned in 5 seconds
///
/// delay_queue.reset(&key, Duration::from_secs(10));
///
/// // "foo" is now scheduled to be returned in 10 seconds
/// # }
/// ```
#[track_caller]
pub fn reset(&mut self, key: &Key, timeout: Duration) {
    self.reset_at(key, Instant::now() + timeout);
}

/// Clears the queue, removing all items.
///
/// After calling `clear`, [`poll_expired`] will return `Poll::Ready(None)`.
///
/// Note that this method has no effect on the allocated capacity.
///
/// [`poll_expired`]: method@Self::poll_expired
///
/// # Examples
///
/// ```rust
/// use tokio_util::time::DelayQueue;
/// use std::time::Duration;
///
/// # #[tokio::main]
/// # async fn main() {
/// let mut delay_queue = DelayQueue::new();
///
/// delay_queue.insert("foo", Duration::from_secs(5));
///
/// assert!(!delay_queue.is_empty());
///
/// delay_queue.clear();
///
/// assert!(delay_queue.is_empty());
/// # }
/// ```
pub fn clear(&mut self) {
    // Empty the storage, then replace the expired stack and the wheel with
    // fresh instances and drop the pending timer.
    self.slab.clear();
    self.expired = Stack::default();
    self.wheel = Wheel::new();
    self.delay = None;
}

/// Returns the number of elements the queue can hold without reallocating.
///
    /// # Examples
    ///
    /// ```rust
    /// use tokio_util::time::DelayQueue;
    ///
    /// let delay_queue: DelayQueue<i32> = DelayQueue::with_capacity(10);
    /// assert_eq!(delay_queue.capacity(), 10);
    /// ```
    pub fn capacity(&self) -> usize {
        // The queue's capacity is the capacity of its backing slab.
        self.slab.capacity()
    }

    /// Returns the number of elements currently in the queue.
    ///
    /// The count is read from the backing slab.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use tokio_util::time::DelayQueue;
    /// use std::time::Duration;
    ///
    /// # #[tokio::main]
    /// # async fn main() {
    /// let mut delay_queue: DelayQueue<i32> = DelayQueue::with_capacity(10);
    /// assert_eq!(delay_queue.len(), 0);
    /// delay_queue.insert(3, Duration::from_secs(5));
    /// assert_eq!(delay_queue.len(), 1);
    /// # }
    /// ```
    pub fn len(&self) -> usize {
        self.slab.len()
    }

    /// Reserves capacity for at least `additional` more items to be queued
    /// without allocating.
    ///
    /// `reserve` does nothing if the queue already has sufficient capacity for
    /// `additional` more values. If more capacity is required, a new segment of
    /// memory will be allocated and all existing values will be copied into it.
    /// As such, if the queue is already very large, a call to `reserve` can end
    /// up being expensive.
    ///
    /// The queue may reserve more than `additional` extra space in order to
    /// avoid frequent reallocations.
    ///
    /// # Panics
    ///
    /// Panics if the new capacity exceeds the maximum number of entries the
    /// queue can contain.
1043 /// 1044 /// # Examples 1045 /// 1046 /// ``` 1047 /// use tokio_util::time::DelayQueue; 1048 /// use std::time::Duration; 1049 /// 1050 /// # #[tokio::main] 1051 /// # async fn main() { 1052 /// let mut delay_queue = DelayQueue::new(); 1053 /// 1054 /// delay_queue.insert("hello", Duration::from_secs(10)); 1055 /// delay_queue.reserve(10); 1056 /// 1057 /// assert!(delay_queue.capacity() >= 11); 1058 /// # } 1059 /// ``` 1060 #[track_caller] reserve(&mut self, additional: usize)1061 pub fn reserve(&mut self, additional: usize) { 1062 assert!( 1063 self.slab.capacity() + additional <= MAX_ENTRIES, 1064 "max queue capacity exceeded" 1065 ); 1066 self.slab.reserve(additional); 1067 } 1068 1069 /// Returns `true` if there are no items in the queue. 1070 /// 1071 /// Note that this function returns `false` even if all items have not yet 1072 /// expired and a call to `poll` will return `Poll::Pending`. 1073 /// 1074 /// # Examples 1075 /// 1076 /// ``` 1077 /// use tokio_util::time::DelayQueue; 1078 /// use std::time::Duration; 1079 /// 1080 /// # #[tokio::main] 1081 /// # async fn main() { 1082 /// let mut delay_queue = DelayQueue::new(); 1083 /// assert!(delay_queue.is_empty()); 1084 /// 1085 /// delay_queue.insert("hello", Duration::from_secs(5)); 1086 /// assert!(!delay_queue.is_empty()); 1087 /// # } 1088 /// ``` is_empty(&self) -> bool1089 pub fn is_empty(&self) -> bool { 1090 self.slab.is_empty() 1091 } 1092 1093 /// Polls the queue, returning the index of the next slot in the slab that 1094 /// should be returned. 1095 /// 1096 /// A slot should be returned when the associated deadline has been reached. 
poll_idx(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Key>>1097 fn poll_idx(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Key>> { 1098 use self::wheel::Stack; 1099 1100 let expired = self.expired.pop(&mut self.slab); 1101 1102 if expired.is_some() { 1103 return Poll::Ready(expired); 1104 } 1105 1106 loop { 1107 if let Some(ref mut delay) = self.delay { 1108 if !delay.is_elapsed() { 1109 ready!(Pin::new(&mut *delay).poll(cx)); 1110 } 1111 1112 let now = crate::time::ms(delay.deadline() - self.start, crate::time::Round::Down); 1113 1114 self.wheel_now = now; 1115 } 1116 1117 // We poll the wheel to get the next value out before finding the next deadline. 1118 let wheel_idx = self.wheel.poll(self.wheel_now, &mut self.slab); 1119 1120 self.delay = self.next_deadline().map(|when| Box::pin(sleep_until(when))); 1121 1122 if let Some(idx) = wheel_idx { 1123 return Poll::Ready(Some(idx)); 1124 } 1125 1126 if self.delay.is_none() { 1127 return Poll::Ready(None); 1128 } 1129 } 1130 } 1131 normalize_deadline(&self, when: Instant) -> u641132 fn normalize_deadline(&self, when: Instant) -> u64 { 1133 let when = if when < self.start { 1134 0 1135 } else { 1136 crate::time::ms(when - self.start, crate::time::Round::Up) 1137 }; 1138 1139 cmp::max(when, self.wheel.elapsed()) 1140 } 1141 } 1142 1143 // We never put `T` in a `Pin`... 1144 impl<T> Unpin for DelayQueue<T> {} 1145 1146 impl<T> Default for DelayQueue<T> { default() -> DelayQueue<T>1147 fn default() -> DelayQueue<T> { 1148 DelayQueue::new() 1149 } 1150 } 1151 1152 impl<T> futures_core::Stream for DelayQueue<T> { 1153 // DelayQueue seems much more specific, where a user may care that it 1154 // has reached capacity, so return those errors instead of panicking. 
type Item = Expired<T>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
        // `DelayQueue` is `Unpin`, so the pin can be unwrapped and the call
        // forwarded to the inherent polling method.
        let queue = self.get_mut();
        queue.poll_expired(cx)
    }
}

// The stack is a doubly linked list whose `next`/`prev` links live in the
// entries of the store; the stack itself only remembers the head key.
impl<T> wheel::Stack for Stack<T> {
    type Owned = Key;
    type Borrowed = Key;
    type Store = SlabStorage<T>;

    fn is_empty(&self) -> bool {
        self.head.is_none()
    }

    fn push(&mut self, item: Self::Owned, store: &mut Self::Store) {
        // The entry must arrive unlinked, i.e. not be part of any stack.
        debug_assert!(store[item].next.is_none());
        debug_assert!(store[item].prev.is_none());

        // Install `item` as the new head and link it in front of the old one.
        let previous_head = self.head.replace(item);

        if let Some(old_key) = previous_head {
            store[old_key].prev = Some(item);
        }

        store[item].next = previous_head;
    }

    fn pop(&mut self, store: &mut Self::Store) -> Option<Self::Owned> {
        let key = self.head?;

        // Detach the head: its successor (if any) becomes the new head and
        // loses its back-link.
        let successor = store[key].next.take();
        self.head = successor;

        if let Some(new_head) = successor {
            store[new_head].prev = None;
        }

        debug_assert!(store[key].prev.is_none());

        Some(key)
    }

    fn peek(&self) -> Option<Self::Owned> {
        self.head
    }

    #[track_caller]
    fn remove(&mut self, item: &Self::Borrowed, store: &mut Self::Store) {
        let key = *item;
        assert!(store.contains(item));

        // Debug-only sanity check: the entry must be reachable from the head,
        // and only once.
        debug_assert!({
            // The whole list is walked, even after the entry has been found.
            let mut cursor = self.head;
            let mut found = false;

            while let Some(idx) = cursor {
                let entry = &store[idx];

                if idx == key {
                    debug_assert!(!found);
                    found = true;
                }

                cursor = entry.next;
            }

            found
        });

        // Snapshot both neighbours, then splice the entry out of the list.
        let before = store[key].prev;
        let after = store[key].next;

        if let Some(next_key) = after {
            store[next_key].prev = before;
        }

        match before {
            Some(prev_key) => store[prev_key].next = after,
            // No predecessor means the entry was the head.
            None => self.head = after,
        }

        store[key].next = None;
        store[key].prev = None;
    }

    fn when(item: &Self::Borrowed, store: &Self::Store) -> u64 {
        store[*item].when
    }
}

impl<T> Default for Stack<T> {
    fn default() -> Stack<T> {
        Self {
            head: None,
            _p: PhantomData,
        }
    }
}

impl Key {
    pub(crate) fn new(index: usize) -> Key {
        Self { index }
    }
}

impl KeyInternal {
    pub(crate) fn new(index: usize) -> KeyInternal {
        Self { index }
    }
}

impl From<Key> for KeyInternal {
    fn from(item: Key) -> Self {
        Self::new(item.index)
    }
}

impl From<KeyInternal> for Key {
    fn from(item: KeyInternal) -> Self {
        Self::new(item.index)
    }
}

impl<T> Expired<T> {
    /// Returns a reference to the inner value.
    pub fn get_ref(&self) -> &T {
        &self.data
    }

    /// Returns a mutable reference to the inner value.
    pub fn get_mut(&mut self) -> &mut T {
        &mut self.data
    }

    /// Consumes `self` and returns the inner value.
pub fn into_inner(self) -> T {
        self.data
    }

    /// Returns the deadline that the expiration was set to.
    pub fn deadline(&self) -> Instant {
        self.deadline
    }

    /// Returns the key that the expiration is indexed by.
    ///
    /// The key is returned by value.
    pub fn key(&self) -> Key {
        self.key
    }
}