• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![allow(clippy::disallowed_names)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "full")]
4 
5 use futures::StreamExt;
6 use tokio::time::{self, sleep, sleep_until, Duration, Instant};
7 use tokio_test::{assert_pending, assert_ready, task};
8 use tokio_util::time::DelayQueue;
9 
/// Calls `enter` on the spawned queue named by `$spawned` and, inside the
/// closure, invokes `poll_expired` with the context that `enter` provides.
macro_rules! poll {
    ($spawned:ident) => {
        $spawned.enter(|cx, mut inner| inner.poll_expired(cx))
    };
}
15 
/// Asserts that `$e` is ready and evaluates to the value inside `Some`.
/// Panics with the message "None" when the ready value is `None`.
macro_rules! assert_ready_some {
    ($e:expr) => {{
        if let Some(value) = assert_ready!($e) {
            value
        } else {
            panic!("None")
        }
    }};
}
24 
25 #[tokio::test]
single_immediate_delay()26 async fn single_immediate_delay() {
27     time::pause();
28 
29     let mut queue = task::spawn(DelayQueue::new());
30     let _key = queue.insert_at("foo", Instant::now());
31 
32     // Advance time by 1ms to handle thee rounding
33     sleep(ms(1)).await;
34 
35     assert_ready_some!(poll!(queue));
36 
37     let entry = assert_ready!(poll!(queue));
38     assert!(entry.is_none())
39 }
40 
41 #[tokio::test]
multi_immediate_delays()42 async fn multi_immediate_delays() {
43     time::pause();
44 
45     let mut queue = task::spawn(DelayQueue::new());
46 
47     let _k = queue.insert_at("1", Instant::now());
48     let _k = queue.insert_at("2", Instant::now());
49     let _k = queue.insert_at("3", Instant::now());
50 
51     sleep(ms(1)).await;
52 
53     let mut res = vec![];
54 
55     while res.len() < 3 {
56         let entry = assert_ready_some!(poll!(queue));
57         res.push(entry.into_inner());
58     }
59 
60     let entry = assert_ready!(poll!(queue));
61     assert!(entry.is_none());
62 
63     res.sort_unstable();
64 
65     assert_eq!("1", res[0]);
66     assert_eq!("2", res[1]);
67     assert_eq!("3", res[2]);
68 }
69 
70 #[tokio::test]
single_short_delay()71 async fn single_short_delay() {
72     time::pause();
73 
74     let mut queue = task::spawn(DelayQueue::new());
75     let _key = queue.insert_at("foo", Instant::now() + ms(5));
76 
77     assert_pending!(poll!(queue));
78 
79     sleep(ms(1)).await;
80 
81     assert!(!queue.is_woken());
82 
83     sleep(ms(5)).await;
84 
85     assert!(queue.is_woken());
86 
87     let entry = assert_ready_some!(poll!(queue));
88     assert_eq!(*entry.get_ref(), "foo");
89 
90     let entry = assert_ready!(poll!(queue));
91     assert!(entry.is_none());
92 }
93 
94 #[tokio::test]
multi_delay_at_start()95 async fn multi_delay_at_start() {
96     time::pause();
97 
98     let long = 262_144 + 9 * 4096;
99     let delays = &[1000, 2, 234, long, 60, 10];
100 
101     let mut queue = task::spawn(DelayQueue::new());
102 
103     // Setup the delays
104     for &i in delays {
105         let _key = queue.insert_at(i, Instant::now() + ms(i));
106     }
107 
108     assert_pending!(poll!(queue));
109     assert!(!queue.is_woken());
110 
111     let start = Instant::now();
112     for elapsed in 0..1200 {
113         println!("elapsed: {:?}", elapsed);
114         let elapsed = elapsed + 1;
115         tokio::time::sleep_until(start + ms(elapsed)).await;
116 
117         if delays.contains(&elapsed) {
118             assert!(queue.is_woken());
119             assert_ready!(poll!(queue));
120             assert_pending!(poll!(queue));
121         } else if queue.is_woken() {
122             let cascade = &[192, 960];
123             assert!(
124                 cascade.contains(&elapsed),
125                 "elapsed={} dt={:?}",
126                 elapsed,
127                 Instant::now() - start
128             );
129 
130             assert_pending!(poll!(queue));
131         }
132     }
133     println!("finished multi_delay_start");
134 }
135 
136 #[tokio::test]
insert_in_past_fires_immediately()137 async fn insert_in_past_fires_immediately() {
138     println!("running insert_in_past_fires_immediately");
139     time::pause();
140 
141     let mut queue = task::spawn(DelayQueue::new());
142     let now = Instant::now();
143 
144     sleep(ms(10)).await;
145 
146     queue.insert_at("foo", now);
147 
148     assert_ready!(poll!(queue));
149     println!("finished insert_in_past_fires_immediately");
150 }
151 
152 #[tokio::test]
remove_entry()153 async fn remove_entry() {
154     time::pause();
155 
156     let mut queue = task::spawn(DelayQueue::new());
157 
158     let key = queue.insert_at("foo", Instant::now() + ms(5));
159 
160     assert_pending!(poll!(queue));
161 
162     let entry = queue.remove(&key);
163     assert_eq!(entry.into_inner(), "foo");
164 
165     sleep(ms(10)).await;
166 
167     let entry = assert_ready!(poll!(queue));
168     assert!(entry.is_none());
169 }
170 
171 #[tokio::test]
reset_entry()172 async fn reset_entry() {
173     time::pause();
174 
175     let mut queue = task::spawn(DelayQueue::new());
176 
177     let now = Instant::now();
178     let key = queue.insert_at("foo", now + ms(5));
179 
180     assert_pending!(poll!(queue));
181     sleep(ms(1)).await;
182 
183     queue.reset_at(&key, now + ms(10));
184 
185     assert_pending!(poll!(queue));
186 
187     sleep(ms(7)).await;
188 
189     assert!(!queue.is_woken());
190 
191     assert_pending!(poll!(queue));
192 
193     sleep(ms(3)).await;
194 
195     assert!(queue.is_woken());
196 
197     let entry = assert_ready_some!(poll!(queue));
198     assert_eq!(*entry.get_ref(), "foo");
199 
200     let entry = assert_ready!(poll!(queue));
201     assert!(entry.is_none())
202 }
203 
204 // Reproduces tokio-rs/tokio#849.
205 #[tokio::test]
reset_much_later()206 async fn reset_much_later() {
207     time::pause();
208 
209     let mut queue = task::spawn(DelayQueue::new());
210 
211     let now = Instant::now();
212     sleep(ms(1)).await;
213 
214     let key = queue.insert_at("foo", now + ms(200));
215     assert_pending!(poll!(queue));
216 
217     sleep(ms(3)).await;
218 
219     queue.reset_at(&key, now + ms(10));
220 
221     sleep(ms(20)).await;
222 
223     assert!(queue.is_woken());
224 }
225 
226 // Reproduces tokio-rs/tokio#849.
227 #[tokio::test]
reset_twice()228 async fn reset_twice() {
229     time::pause();
230 
231     let mut queue = task::spawn(DelayQueue::new());
232     let now = Instant::now();
233 
234     sleep(ms(1)).await;
235 
236     let key = queue.insert_at("foo", now + ms(200));
237 
238     assert_pending!(poll!(queue));
239 
240     sleep(ms(3)).await;
241 
242     queue.reset_at(&key, now + ms(50));
243 
244     sleep(ms(20)).await;
245 
246     queue.reset_at(&key, now + ms(40));
247 
248     sleep(ms(20)).await;
249 
250     assert!(queue.is_woken());
251 }
252 
253 /// Regression test: Given an entry inserted with a deadline in the past, so
254 /// that it is placed directly on the expired queue, reset the entry to a
255 /// deadline in the future. Validate that this leaves the entry and queue in an
256 /// internally consistent state by running an additional reset on the entry
257 /// before polling it to completion.
258 #[tokio::test]
repeatedly_reset_entry_inserted_as_expired()259 async fn repeatedly_reset_entry_inserted_as_expired() {
260     time::pause();
261 
262     // Instants before the start of the test seem to break in wasm.
263     time::sleep(ms(1000)).await;
264 
265     let mut queue = task::spawn(DelayQueue::new());
266     let now = Instant::now();
267 
268     let key = queue.insert_at("foo", now - ms(100));
269 
270     queue.reset_at(&key, now + ms(100));
271     queue.reset_at(&key, now + ms(50));
272 
273     assert_pending!(poll!(queue));
274 
275     time::sleep_until(now + ms(60)).await;
276 
277     assert!(queue.is_woken());
278 
279     let entry = assert_ready_some!(poll!(queue)).into_inner();
280     assert_eq!(entry, "foo");
281 
282     let entry = assert_ready!(poll!(queue));
283     assert!(entry.is_none());
284 }
285 
286 #[tokio::test]
remove_expired_item()287 async fn remove_expired_item() {
288     time::pause();
289 
290     let mut queue = DelayQueue::new();
291 
292     let now = Instant::now();
293 
294     sleep(ms(10)).await;
295 
296     let key = queue.insert_at("foo", now);
297 
298     let entry = queue.remove(&key);
299     assert_eq!(entry.into_inner(), "foo");
300 }
301 
302 /// Regression test: it should be possible to remove entries which fall in the
303 /// 0th slot of the internal timer wheel — that is, entries whose expiration
304 /// (a) falls at the beginning of one of the wheel's hierarchical levels and (b)
305 /// is equal to the wheel's current elapsed time.
306 #[tokio::test]
remove_at_timer_wheel_threshold()307 async fn remove_at_timer_wheel_threshold() {
308     time::pause();
309 
310     let mut queue = task::spawn(DelayQueue::new());
311 
312     let now = Instant::now();
313 
314     let key1 = queue.insert_at("foo", now + ms(64));
315     let key2 = queue.insert_at("bar", now + ms(64));
316 
317     sleep(ms(80)).await;
318 
319     let entry = assert_ready_some!(poll!(queue)).into_inner();
320 
321     match entry {
322         "foo" => {
323             let entry = queue.remove(&key2).into_inner();
324             assert_eq!(entry, "bar");
325         }
326         "bar" => {
327             let entry = queue.remove(&key1).into_inner();
328             assert_eq!(entry, "foo");
329         }
330         other => panic!("other: {:?}", other),
331     }
332 }
333 
334 #[tokio::test]
expires_before_last_insert()335 async fn expires_before_last_insert() {
336     time::pause();
337 
338     let mut queue = task::spawn(DelayQueue::new());
339 
340     let now = Instant::now();
341 
342     queue.insert_at("foo", now + ms(10_000));
343 
344     // Delay should be set to 8.192s here.
345     assert_pending!(poll!(queue));
346 
347     // Delay should be set to the delay of the new item here
348     queue.insert_at("bar", now + ms(600));
349 
350     assert_pending!(poll!(queue));
351 
352     sleep(ms(600)).await;
353 
354     assert!(queue.is_woken());
355 
356     let entry = assert_ready_some!(poll!(queue)).into_inner();
357     assert_eq!(entry, "bar");
358 }
359 
360 #[tokio::test]
multi_reset()361 async fn multi_reset() {
362     time::pause();
363 
364     let mut queue = task::spawn(DelayQueue::new());
365 
366     let now = Instant::now();
367 
368     let one = queue.insert_at("one", now + ms(200));
369     let two = queue.insert_at("two", now + ms(250));
370 
371     assert_pending!(poll!(queue));
372 
373     queue.reset_at(&one, now + ms(300));
374     queue.reset_at(&two, now + ms(350));
375     queue.reset_at(&one, now + ms(400));
376 
377     sleep(ms(310)).await;
378 
379     assert_pending!(poll!(queue));
380 
381     sleep(ms(50)).await;
382 
383     let entry = assert_ready_some!(poll!(queue));
384     assert_eq!(*entry.get_ref(), "two");
385 
386     assert_pending!(poll!(queue));
387 
388     sleep(ms(50)).await;
389 
390     let entry = assert_ready_some!(poll!(queue));
391     assert_eq!(*entry.get_ref(), "one");
392 
393     let entry = assert_ready!(poll!(queue));
394     assert!(entry.is_none())
395 }
396 
397 #[tokio::test]
expire_first_key_when_reset_to_expire_earlier()398 async fn expire_first_key_when_reset_to_expire_earlier() {
399     time::pause();
400 
401     let mut queue = task::spawn(DelayQueue::new());
402 
403     let now = Instant::now();
404 
405     let one = queue.insert_at("one", now + ms(200));
406     queue.insert_at("two", now + ms(250));
407 
408     assert_pending!(poll!(queue));
409 
410     queue.reset_at(&one, now + ms(100));
411 
412     sleep(ms(100)).await;
413 
414     assert!(queue.is_woken());
415 
416     let entry = assert_ready_some!(poll!(queue)).into_inner();
417     assert_eq!(entry, "one");
418 }
419 
420 #[tokio::test]
expire_second_key_when_reset_to_expire_earlier()421 async fn expire_second_key_when_reset_to_expire_earlier() {
422     time::pause();
423 
424     let mut queue = task::spawn(DelayQueue::new());
425 
426     let now = Instant::now();
427 
428     queue.insert_at("one", now + ms(200));
429     let two = queue.insert_at("two", now + ms(250));
430 
431     assert_pending!(poll!(queue));
432 
433     queue.reset_at(&two, now + ms(100));
434 
435     sleep(ms(100)).await;
436 
437     assert!(queue.is_woken());
438 
439     let entry = assert_ready_some!(poll!(queue)).into_inner();
440     assert_eq!(entry, "two");
441 }
442 
443 #[tokio::test]
reset_first_expiring_item_to_expire_later()444 async fn reset_first_expiring_item_to_expire_later() {
445     time::pause();
446 
447     let mut queue = task::spawn(DelayQueue::new());
448 
449     let now = Instant::now();
450 
451     let one = queue.insert_at("one", now + ms(200));
452     let _two = queue.insert_at("two", now + ms(250));
453 
454     assert_pending!(poll!(queue));
455 
456     queue.reset_at(&one, now + ms(300));
457     sleep(ms(250)).await;
458 
459     assert!(queue.is_woken());
460 
461     let entry = assert_ready_some!(poll!(queue)).into_inner();
462     assert_eq!(entry, "two");
463 }
464 
465 #[tokio::test]
insert_before_first_after_poll()466 async fn insert_before_first_after_poll() {
467     time::pause();
468 
469     let mut queue = task::spawn(DelayQueue::new());
470 
471     let now = Instant::now();
472 
473     let _one = queue.insert_at("one", now + ms(200));
474 
475     assert_pending!(poll!(queue));
476 
477     let _two = queue.insert_at("two", now + ms(100));
478 
479     sleep(ms(99)).await;
480 
481     assert_pending!(poll!(queue));
482 
483     sleep(ms(1)).await;
484 
485     assert!(queue.is_woken());
486 
487     let entry = assert_ready_some!(poll!(queue)).into_inner();
488     assert_eq!(entry, "two");
489 }
490 
491 #[tokio::test]
insert_after_ready_poll()492 async fn insert_after_ready_poll() {
493     time::pause();
494 
495     let mut queue = task::spawn(DelayQueue::new());
496 
497     let now = Instant::now();
498 
499     queue.insert_at("1", now + ms(100));
500     queue.insert_at("2", now + ms(100));
501     queue.insert_at("3", now + ms(100));
502 
503     assert_pending!(poll!(queue));
504 
505     sleep(ms(100)).await;
506 
507     assert!(queue.is_woken());
508 
509     let mut res = vec![];
510 
511     while res.len() < 3 {
512         let entry = assert_ready_some!(poll!(queue));
513         res.push(entry.into_inner());
514         queue.insert_at("foo", now + ms(500));
515     }
516 
517     res.sort_unstable();
518 
519     assert_eq!("1", res[0]);
520     assert_eq!("2", res[1]);
521     assert_eq!("3", res[2]);
522 }
523 
524 #[tokio::test]
reset_later_after_slot_starts()525 async fn reset_later_after_slot_starts() {
526     time::pause();
527 
528     let mut queue = task::spawn(DelayQueue::new());
529 
530     let now = Instant::now();
531 
532     let foo = queue.insert_at("foo", now + ms(100));
533 
534     assert_pending!(poll!(queue));
535 
536     sleep_until(now + Duration::from_millis(80)).await;
537 
538     assert!(!queue.is_woken());
539 
540     // At this point the queue hasn't been polled, so `elapsed` on the wheel
541     // for the queue is still at 0 and hence the 1ms resolution slots cover
542     // [0-64).  Resetting the time on the entry to 120 causes it to get put in
543     // the [64-128) slot.  As the queue knows that the first entry is within
544     // that slot, but doesn't know when, it must wake immediately to advance
545     // the wheel.
546     queue.reset_at(&foo, now + ms(120));
547     assert!(queue.is_woken());
548 
549     assert_pending!(poll!(queue));
550 
551     sleep_until(now + Duration::from_millis(119)).await;
552     assert!(!queue.is_woken());
553 
554     sleep(ms(1)).await;
555     assert!(queue.is_woken());
556 
557     let entry = assert_ready_some!(poll!(queue)).into_inner();
558     assert_eq!(entry, "foo");
559 }
560 
561 #[tokio::test]
reset_inserted_expired()562 async fn reset_inserted_expired() {
563     time::pause();
564 
565     // Instants before the start of the test seem to break in wasm.
566     time::sleep(ms(1000)).await;
567 
568     let mut queue = task::spawn(DelayQueue::new());
569     let now = Instant::now();
570 
571     let key = queue.insert_at("foo", now - ms(100));
572 
573     // this causes the panic described in #2473
574     queue.reset_at(&key, now + ms(100));
575 
576     assert_eq!(1, queue.len());
577 
578     sleep(ms(200)).await;
579 
580     let entry = assert_ready_some!(poll!(queue)).into_inner();
581     assert_eq!(entry, "foo");
582 
583     assert_eq!(queue.len(), 0);
584 }
585 
586 #[tokio::test]
reset_earlier_after_slot_starts()587 async fn reset_earlier_after_slot_starts() {
588     time::pause();
589 
590     let mut queue = task::spawn(DelayQueue::new());
591 
592     let now = Instant::now();
593 
594     let foo = queue.insert_at("foo", now + ms(200));
595 
596     assert_pending!(poll!(queue));
597 
598     sleep_until(now + Duration::from_millis(80)).await;
599 
600     assert!(!queue.is_woken());
601 
602     // At this point the queue hasn't been polled, so `elapsed` on the wheel
603     // for the queue is still at 0 and hence the 1ms resolution slots cover
604     // [0-64).  Resetting the time on the entry to 120 causes it to get put in
605     // the [64-128) slot.  As the queue knows that the first entry is within
606     // that slot, but doesn't know when, it must wake immediately to advance
607     // the wheel.
608     queue.reset_at(&foo, now + ms(120));
609     assert!(queue.is_woken());
610 
611     assert_pending!(poll!(queue));
612 
613     sleep_until(now + Duration::from_millis(119)).await;
614     assert!(!queue.is_woken());
615 
616     sleep(ms(1)).await;
617     assert!(queue.is_woken());
618 
619     let entry = assert_ready_some!(poll!(queue)).into_inner();
620     assert_eq!(entry, "foo");
621 }
622 
623 #[tokio::test]
insert_in_past_after_poll_fires_immediately()624 async fn insert_in_past_after_poll_fires_immediately() {
625     time::pause();
626 
627     let mut queue = task::spawn(DelayQueue::new());
628 
629     let now = Instant::now();
630 
631     queue.insert_at("foo", now + ms(200));
632 
633     assert_pending!(poll!(queue));
634 
635     sleep(ms(80)).await;
636 
637     assert!(!queue.is_woken());
638     queue.insert_at("bar", now + ms(40));
639 
640     assert!(queue.is_woken());
641 
642     let entry = assert_ready_some!(poll!(queue)).into_inner();
643     assert_eq!(entry, "bar");
644 }
645 
646 #[tokio::test]
delay_queue_poll_expired_when_empty()647 async fn delay_queue_poll_expired_when_empty() {
648     let mut delay_queue = task::spawn(DelayQueue::new());
649     let key = delay_queue.insert(0, std::time::Duration::from_secs(10));
650     assert_pending!(poll!(delay_queue));
651 
652     delay_queue.remove(&key);
653     assert!(assert_ready!(poll!(delay_queue)).is_none());
654 }
655 
656 #[tokio::test(start_paused = true)]
compact_expire_empty()657 async fn compact_expire_empty() {
658     let mut queue = task::spawn(DelayQueue::new());
659 
660     let now = Instant::now();
661 
662     queue.insert_at("foo1", now + ms(10));
663     queue.insert_at("foo2", now + ms(10));
664 
665     sleep(ms(10)).await;
666 
667     let mut res = vec![];
668     while res.len() < 2 {
669         let entry = assert_ready_some!(poll!(queue));
670         res.push(entry.into_inner());
671     }
672 
673     queue.compact();
674 
675     assert_eq!(queue.len(), 0);
676     assert_eq!(queue.capacity(), 0);
677 }
678 
679 #[tokio::test(start_paused = true)]
compact_remove_empty()680 async fn compact_remove_empty() {
681     let mut queue = task::spawn(DelayQueue::new());
682 
683     let now = Instant::now();
684 
685     let key1 = queue.insert_at("foo1", now + ms(10));
686     let key2 = queue.insert_at("foo2", now + ms(10));
687 
688     queue.remove(&key1);
689     queue.remove(&key2);
690 
691     queue.compact();
692 
693     assert_eq!(queue.len(), 0);
694     assert_eq!(queue.capacity(), 0);
695 }
696 
697 #[tokio::test(start_paused = true)]
698 // Trigger a re-mapping of keys in the slab due to a `compact` call and
699 // test removal of re-mapped keys
compact_remove_remapped_keys()700 async fn compact_remove_remapped_keys() {
701     let mut queue = task::spawn(DelayQueue::new());
702 
703     let now = Instant::now();
704 
705     queue.insert_at("foo1", now + ms(10));
706     queue.insert_at("foo2", now + ms(10));
707 
708     // should be assigned indices 3 and 4
709     let key3 = queue.insert_at("foo3", now + ms(20));
710     let key4 = queue.insert_at("foo4", now + ms(20));
711 
712     sleep(ms(10)).await;
713 
714     let mut res = vec![];
715     while res.len() < 2 {
716         let entry = assert_ready_some!(poll!(queue));
717         res.push(entry.into_inner());
718     }
719 
720     // items corresponding to `foo3` and `foo4` will be assigned
721     // new indices here
722     queue.compact();
723 
724     queue.insert_at("foo5", now + ms(10));
725 
726     // test removal of re-mapped keys
727     let expired3 = queue.remove(&key3);
728     let expired4 = queue.remove(&key4);
729 
730     assert_eq!(expired3.into_inner(), "foo3");
731     assert_eq!(expired4.into_inner(), "foo4");
732 
733     queue.compact();
734     assert_eq!(queue.len(), 1);
735     assert_eq!(queue.capacity(), 1);
736 }
737 
738 #[tokio::test(start_paused = true)]
compact_change_deadline()739 async fn compact_change_deadline() {
740     let mut queue = task::spawn(DelayQueue::new());
741 
742     let mut now = Instant::now();
743 
744     queue.insert_at("foo1", now + ms(10));
745     queue.insert_at("foo2", now + ms(10));
746 
747     // should be assigned indices 3 and 4
748     queue.insert_at("foo3", now + ms(20));
749     let key4 = queue.insert_at("foo4", now + ms(20));
750 
751     sleep(ms(10)).await;
752 
753     let mut res = vec![];
754     while res.len() < 2 {
755         let entry = assert_ready_some!(poll!(queue));
756         res.push(entry.into_inner());
757     }
758 
759     // items corresponding to `foo3` and `foo4` should be assigned
760     // new indices
761     queue.compact();
762 
763     now = Instant::now();
764 
765     queue.insert_at("foo5", now + ms(10));
766     let key6 = queue.insert_at("foo6", now + ms(10));
767 
768     queue.reset_at(&key4, now + ms(20));
769     queue.reset_at(&key6, now + ms(20));
770 
771     // foo3 and foo5 will expire
772     sleep(ms(10)).await;
773 
774     while res.len() < 4 {
775         let entry = assert_ready_some!(poll!(queue));
776         res.push(entry.into_inner());
777     }
778 
779     sleep(ms(10)).await;
780 
781     while res.len() < 6 {
782         let entry = assert_ready_some!(poll!(queue));
783         res.push(entry.into_inner());
784     }
785 
786     let entry = assert_ready!(poll!(queue));
787     assert!(entry.is_none());
788 }
789 
790 #[tokio::test(start_paused = true)]
item_expiry_greater_than_wheel()791 async fn item_expiry_greater_than_wheel() {
792     // This function tests that a delay queue that has existed for at least 2^36 milliseconds won't panic when a new item is inserted.
793     let mut queue = DelayQueue::new();
794     for _ in 0..2 {
795         tokio::time::advance(Duration::from_millis(1 << 35)).await;
796         queue.insert(0, Duration::from_millis(0));
797         queue.next().await;
798     }
799     // This should not panic
800     let no_panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
801         queue.insert(1, Duration::from_millis(1));
802     }));
803     assert!(no_panic.is_ok());
804 }
805 
806 #[cfg_attr(target_os = "wasi", ignore = "FIXME: Does not seem to work with WASI")]
807 #[tokio::test(start_paused = true)]
remove_after_compact()808 async fn remove_after_compact() {
809     let now = Instant::now();
810     let mut queue = DelayQueue::new();
811 
812     let foo_key = queue.insert_at("foo", now + ms(10));
813     queue.insert_at("bar", now + ms(20));
814     queue.remove(&foo_key);
815     queue.compact();
816 
817     let panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
818         queue.remove(&foo_key);
819     }));
820     assert!(panic.is_err());
821 }
822 
823 #[cfg_attr(target_os = "wasi", ignore = "FIXME: Does not seem to work with WASI")]
824 #[tokio::test(start_paused = true)]
remove_after_compact_poll()825 async fn remove_after_compact_poll() {
826     let now = Instant::now();
827     let mut queue = task::spawn(DelayQueue::new());
828 
829     let foo_key = queue.insert_at("foo", now + ms(10));
830     queue.insert_at("bar", now + ms(20));
831 
832     sleep(ms(10)).await;
833     assert_eq!(assert_ready_some!(poll!(queue)).key(), foo_key);
834 
835     queue.compact();
836 
837     let panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
838         queue.remove(&foo_key);
839     }));
840     assert!(panic.is_err());
841 }
842 
843 #[tokio::test(start_paused = true)]
peek()844 async fn peek() {
845     let mut queue = task::spawn(DelayQueue::new());
846 
847     let now = Instant::now();
848 
849     let key = queue.insert_at("foo", now + ms(5));
850     let key2 = queue.insert_at("bar", now);
851     let key3 = queue.insert_at("baz", now + ms(10));
852 
853     assert_eq!(queue.peek(), Some(key2));
854 
855     sleep(ms(6)).await;
856 
857     assert_eq!(queue.peek(), Some(key2));
858 
859     let entry = assert_ready_some!(poll!(queue));
860     assert_eq!(entry.get_ref(), &"bar");
861 
862     assert_eq!(queue.peek(), Some(key));
863 
864     let entry = assert_ready_some!(poll!(queue));
865     assert_eq!(entry.get_ref(), &"foo");
866 
867     assert_eq!(queue.peek(), Some(key3));
868 
869     assert_pending!(poll!(queue));
870 
871     sleep(ms(5)).await;
872 
873     assert_eq!(queue.peek(), Some(key3));
874 
875     let entry = assert_ready_some!(poll!(queue));
876     assert_eq!(entry.get_ref(), &"baz");
877 
878     assert!(queue.peek().is_none());
879 }
880 
/// Shorthand used throughout these tests: builds a [`Duration`] of `n`
/// milliseconds.
fn ms(n: u64) -> Duration {
    Duration::from_millis(n)
}
884