• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use futures::future::poll_fn;
2 use tokio::sync::mpsc::channel;
3 use tokio_test::task::spawn;
4 use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok};
5 use tokio_util::sync::PollSender;
6 
7 #[tokio::test]
simple()8 async fn simple() {
9     let (send, mut recv) = channel(3);
10     let mut send = PollSender::new(send);
11 
12     for i in 1..=3i32 {
13         let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
14         assert_ready_ok!(reserve.poll());
15         send.send_item(i).unwrap();
16     }
17 
18     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
19     assert_pending!(reserve.poll());
20 
21     assert_eq!(recv.recv().await.unwrap(), 1);
22     assert!(reserve.is_woken());
23     assert_ready_ok!(reserve.poll());
24 
25     drop(recv);
26 
27     send.send_item(42).unwrap();
28 }
29 
30 #[tokio::test]
repeated_poll_reserve()31 async fn repeated_poll_reserve() {
32     let (send, mut recv) = channel::<i32>(1);
33     let mut send = PollSender::new(send);
34 
35     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
36     assert_ready_ok!(reserve.poll());
37     assert_ready_ok!(reserve.poll());
38     send.send_item(1).unwrap();
39 
40     assert_eq!(recv.recv().await.unwrap(), 1);
41 }
42 
43 #[tokio::test]
abort_send()44 async fn abort_send() {
45     let (send, mut recv) = channel(3);
46     let mut send = PollSender::new(send);
47     let send2 = send.get_ref().cloned().unwrap();
48 
49     for i in 1..=3i32 {
50         let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
51         assert_ready_ok!(reserve.poll());
52         send.send_item(i).unwrap();
53     }
54 
55     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
56     assert_pending!(reserve.poll());
57     assert_eq!(recv.recv().await.unwrap(), 1);
58     assert!(reserve.is_woken());
59     assert_ready_ok!(reserve.poll());
60 
61     let mut send2_send = spawn(send2.send(5));
62     assert_pending!(send2_send.poll());
63     assert!(send.abort_send());
64     assert!(send2_send.is_woken());
65     assert_ready_ok!(send2_send.poll());
66 
67     assert_eq!(recv.recv().await.unwrap(), 2);
68     assert_eq!(recv.recv().await.unwrap(), 3);
69     assert_eq!(recv.recv().await.unwrap(), 5);
70 }
71 
72 #[tokio::test]
close_sender_last()73 async fn close_sender_last() {
74     let (send, mut recv) = channel::<i32>(3);
75     let mut send = PollSender::new(send);
76 
77     let mut recv_task = spawn(recv.recv());
78     assert_pending!(recv_task.poll());
79 
80     send.close();
81 
82     assert!(recv_task.is_woken());
83     assert!(assert_ready!(recv_task.poll()).is_none());
84 }
85 
86 #[tokio::test]
close_sender_not_last()87 async fn close_sender_not_last() {
88     let (send, mut recv) = channel::<i32>(3);
89     let mut send = PollSender::new(send);
90     let send2 = send.get_ref().cloned().unwrap();
91 
92     let mut recv_task = spawn(recv.recv());
93     assert_pending!(recv_task.poll());
94 
95     send.close();
96 
97     assert!(!recv_task.is_woken());
98     assert_pending!(recv_task.poll());
99 
100     drop(send2);
101 
102     assert!(recv_task.is_woken());
103     assert!(assert_ready!(recv_task.poll()).is_none());
104 }
105 
106 #[tokio::test]
close_sender_before_reserve()107 async fn close_sender_before_reserve() {
108     let (send, mut recv) = channel::<i32>(3);
109     let mut send = PollSender::new(send);
110 
111     let mut recv_task = spawn(recv.recv());
112     assert_pending!(recv_task.poll());
113 
114     send.close();
115 
116     assert!(recv_task.is_woken());
117     assert!(assert_ready!(recv_task.poll()).is_none());
118 
119     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
120     assert_ready_err!(reserve.poll());
121 }
122 
123 #[tokio::test]
close_sender_after_pending_reserve()124 async fn close_sender_after_pending_reserve() {
125     let (send, mut recv) = channel::<i32>(1);
126     let mut send = PollSender::new(send);
127 
128     let mut recv_task = spawn(recv.recv());
129     assert_pending!(recv_task.poll());
130 
131     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
132     assert_ready_ok!(reserve.poll());
133     send.send_item(1).unwrap();
134 
135     assert!(recv_task.is_woken());
136 
137     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
138     assert_pending!(reserve.poll());
139     drop(reserve);
140 
141     send.close();
142 
143     assert!(send.is_closed());
144     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
145     assert_ready_err!(reserve.poll());
146 }
147 
148 #[tokio::test]
close_sender_after_successful_reserve()149 async fn close_sender_after_successful_reserve() {
150     let (send, mut recv) = channel::<i32>(3);
151     let mut send = PollSender::new(send);
152 
153     let mut recv_task = spawn(recv.recv());
154     assert_pending!(recv_task.poll());
155 
156     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
157     assert_ready_ok!(reserve.poll());
158     drop(reserve);
159 
160     send.close();
161     assert!(send.is_closed());
162     assert!(!recv_task.is_woken());
163     assert_pending!(recv_task.poll());
164 
165     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
166     assert_ready_ok!(reserve.poll());
167 }
168 
169 #[tokio::test]
abort_send_after_pending_reserve()170 async fn abort_send_after_pending_reserve() {
171     let (send, mut recv) = channel::<i32>(1);
172     let mut send = PollSender::new(send);
173 
174     let mut recv_task = spawn(recv.recv());
175     assert_pending!(recv_task.poll());
176 
177     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
178     assert_ready_ok!(reserve.poll());
179     send.send_item(1).unwrap();
180 
181     assert_eq!(send.get_ref().unwrap().capacity(), 0);
182     assert!(!send.abort_send());
183 
184     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
185     assert_pending!(reserve.poll());
186 
187     assert!(send.abort_send());
188     assert_eq!(send.get_ref().unwrap().capacity(), 0);
189 }
190 
191 #[tokio::test]
abort_send_after_successful_reserve()192 async fn abort_send_after_successful_reserve() {
193     let (send, mut recv) = channel::<i32>(1);
194     let mut send = PollSender::new(send);
195 
196     let mut recv_task = spawn(recv.recv());
197     assert_pending!(recv_task.poll());
198 
199     assert_eq!(send.get_ref().unwrap().capacity(), 1);
200     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
201     assert_ready_ok!(reserve.poll());
202     assert_eq!(send.get_ref().unwrap().capacity(), 0);
203 
204     assert!(send.abort_send());
205     assert_eq!(send.get_ref().unwrap().capacity(), 1);
206 }
207 
208 #[tokio::test]
closed_when_receiver_drops()209 async fn closed_when_receiver_drops() {
210     let (send, _) = channel::<i32>(1);
211     let mut send = PollSender::new(send);
212 
213     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
214     assert_ready_err!(reserve.poll());
215 }
216 
/// Expects `send_item(..).unwrap()` to panic when no `poll_reserve` call
/// preceded it.
#[test]
#[should_panic]
fn start_send_panics_when_idle() {
    // The receiver half is not bound and is therefore dropped immediately.
    let (tx, _) = channel::<i32>(3);
    let mut tx = PollSender::new(tx);

    // No reservation was made before this send.
    tx.send_item(1).unwrap();
}
225 
/// Expects `send_item` to panic when it is called while a reservation attempt
/// is still pending (the channel is full and `poll_reserve` returned pending).
#[should_panic]
#[test]
fn start_send_panics_when_acquiring() {
    // Keep the receiver alive for the whole test. Binding it to `_` would drop
    // it immediately and close the channel, so the first `poll_reserve` below
    // would fail and `#[should_panic]` would be satisfied by `assert_ready_ok!`
    // instead of by the `send_item` call this test is about.
    let (send, _recv) = channel::<i32>(1);
    let mut send = PollSender::new(send);

    // Use up the only slot of the channel.
    let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
    assert_ready_ok!(reserve.poll());
    send.send_item(1).unwrap();

    // Nothing is received, so the next reservation stays pending.
    let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
    assert_pending!(reserve.poll());

    // Sending without a completed reservation is the panic under test.
    send.send_item(2).unwrap();
}
240