• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use std::future::Future;
2 use std::sync::Arc;
3 use std::task::Poll;
4 use tokio::sync::{OwnedSemaphorePermit, Semaphore};
5 use tokio_util::sync::PollSemaphore;
6 
7 type SemRet = Option<OwnedSemaphorePermit>;
8 
semaphore_poll( sem: &mut PollSemaphore, ) -> tokio_test::task::Spawn<impl Future<Output = SemRet> + '_>9 fn semaphore_poll(
10     sem: &mut PollSemaphore,
11 ) -> tokio_test::task::Spawn<impl Future<Output = SemRet> + '_> {
12     let fut = futures::future::poll_fn(move |cx| sem.poll_acquire(cx));
13     tokio_test::task::spawn(fut)
14 }
15 
semaphore_poll_many( sem: &mut PollSemaphore, permits: u32, ) -> tokio_test::task::Spawn<impl Future<Output = SemRet> + '_>16 fn semaphore_poll_many(
17     sem: &mut PollSemaphore,
18     permits: u32,
19 ) -> tokio_test::task::Spawn<impl Future<Output = SemRet> + '_> {
20     let fut = futures::future::poll_fn(move |cx| sem.poll_acquire_many(cx, permits));
21     tokio_test::task::spawn(fut)
22 }
23 
24 #[tokio::test]
it_works()25 async fn it_works() {
26     let sem = Arc::new(Semaphore::new(1));
27     let mut poll_sem = PollSemaphore::new(sem.clone());
28 
29     let permit = sem.acquire().await.unwrap();
30     let mut poll = semaphore_poll(&mut poll_sem);
31     assert!(poll.poll().is_pending());
32     drop(permit);
33 
34     assert!(matches!(poll.poll(), Poll::Ready(Some(_))));
35     drop(poll);
36 
37     sem.close();
38 
39     assert!(semaphore_poll(&mut poll_sem).await.is_none());
40 
41     // Check that it is fused.
42     assert!(semaphore_poll(&mut poll_sem).await.is_none());
43     assert!(semaphore_poll(&mut poll_sem).await.is_none());
44 }
45 
46 #[tokio::test]
can_acquire_many_permits()47 async fn can_acquire_many_permits() {
48     let sem = Arc::new(Semaphore::new(4));
49     let mut poll_sem = PollSemaphore::new(sem.clone());
50 
51     let permit1 = semaphore_poll(&mut poll_sem).poll();
52     assert!(matches!(permit1, Poll::Ready(Some(_))));
53 
54     let permit2 = semaphore_poll_many(&mut poll_sem, 2).poll();
55     assert!(matches!(permit2, Poll::Ready(Some(_))));
56 
57     assert_eq!(sem.available_permits(), 1);
58 
59     drop(permit2);
60 
61     let mut permit4 = semaphore_poll_many(&mut poll_sem, 4);
62     assert!(permit4.poll().is_pending());
63 
64     drop(permit1);
65 
66     let permit4 = permit4.poll();
67     assert!(matches!(permit4, Poll::Ready(Some(_))));
68     assert_eq!(sem.available_permits(), 0);
69 }
70 
71 #[tokio::test]
can_poll_different_amounts_of_permits()72 async fn can_poll_different_amounts_of_permits() {
73     let sem = Arc::new(Semaphore::new(4));
74     let mut poll_sem = PollSemaphore::new(sem.clone());
75     assert!(semaphore_poll_many(&mut poll_sem, 5).poll().is_pending());
76     assert!(semaphore_poll_many(&mut poll_sem, 4).poll().is_ready());
77 
78     let permit = sem.acquire_many(4).await.unwrap();
79     assert!(semaphore_poll_many(&mut poll_sem, 5).poll().is_pending());
80     assert!(semaphore_poll_many(&mut poll_sem, 4).poll().is_pending());
81     drop(permit);
82     assert!(semaphore_poll_many(&mut poll_sem, 5).poll().is_pending());
83     assert!(semaphore_poll_many(&mut poll_sem, 4).poll().is_ready());
84 }
85