• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use futures::future::poll_fn;
2 use tokio::sync::mpsc::channel;
3 use tokio_test::task::spawn;
4 use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok};
5 use tokio_util::sync::PollSender;
6 
7 #[tokio::test]
simple()8 async fn simple() {
9     let (send, mut recv) = channel(3);
10     let mut send = PollSender::new(send);
11 
12     for i in 1..=3i32 {
13         let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
14         assert_ready_ok!(reserve.poll());
15         send.send_item(i).unwrap();
16     }
17 
18     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
19     assert_pending!(reserve.poll());
20 
21     assert_eq!(recv.recv().await.unwrap(), 1);
22     assert!(reserve.is_woken());
23     assert_ready_ok!(reserve.poll());
24 
25     drop(recv);
26 
27     send.send_item(42).unwrap();
28 }
29 
30 #[tokio::test]
simple_ref()31 async fn simple_ref() {
32     let v = vec![1, 2, 3i32];
33 
34     let (send, mut recv) = channel(3);
35     let mut send = PollSender::new(send);
36 
37     for vi in v.iter() {
38         let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
39         assert_ready_ok!(reserve.poll());
40         send.send_item(vi).unwrap();
41     }
42 
43     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
44     assert_pending!(reserve.poll());
45 
46     assert_eq!(*recv.recv().await.unwrap(), 1);
47     assert!(reserve.is_woken());
48     assert_ready_ok!(reserve.poll());
49     drop(recv);
50     send.send_item(&42).unwrap();
51 }
52 
53 #[tokio::test]
repeated_poll_reserve()54 async fn repeated_poll_reserve() {
55     let (send, mut recv) = channel::<i32>(1);
56     let mut send = PollSender::new(send);
57 
58     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
59     assert_ready_ok!(reserve.poll());
60     assert_ready_ok!(reserve.poll());
61     send.send_item(1).unwrap();
62 
63     assert_eq!(recv.recv().await.unwrap(), 1);
64 }
65 
66 #[tokio::test]
abort_send()67 async fn abort_send() {
68     let (send, mut recv) = channel(3);
69     let mut send = PollSender::new(send);
70     let send2 = send.get_ref().cloned().unwrap();
71 
72     for i in 1..=3i32 {
73         let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
74         assert_ready_ok!(reserve.poll());
75         send.send_item(i).unwrap();
76     }
77 
78     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
79     assert_pending!(reserve.poll());
80     assert_eq!(recv.recv().await.unwrap(), 1);
81     assert!(reserve.is_woken());
82     assert_ready_ok!(reserve.poll());
83 
84     let mut send2_send = spawn(send2.send(5));
85     assert_pending!(send2_send.poll());
86     assert!(send.abort_send());
87     assert!(send2_send.is_woken());
88     assert_ready_ok!(send2_send.poll());
89 
90     assert_eq!(recv.recv().await.unwrap(), 2);
91     assert_eq!(recv.recv().await.unwrap(), 3);
92     assert_eq!(recv.recv().await.unwrap(), 5);
93 }
94 
95 #[tokio::test]
close_sender_last()96 async fn close_sender_last() {
97     let (send, mut recv) = channel::<i32>(3);
98     let mut send = PollSender::new(send);
99 
100     let mut recv_task = spawn(recv.recv());
101     assert_pending!(recv_task.poll());
102 
103     send.close();
104 
105     assert!(recv_task.is_woken());
106     assert!(assert_ready!(recv_task.poll()).is_none());
107 }
108 
109 #[tokio::test]
close_sender_not_last()110 async fn close_sender_not_last() {
111     let (send, mut recv) = channel::<i32>(3);
112     let mut send = PollSender::new(send);
113     let send2 = send.get_ref().cloned().unwrap();
114 
115     let mut recv_task = spawn(recv.recv());
116     assert_pending!(recv_task.poll());
117 
118     send.close();
119 
120     assert!(!recv_task.is_woken());
121     assert_pending!(recv_task.poll());
122 
123     drop(send2);
124 
125     assert!(recv_task.is_woken());
126     assert!(assert_ready!(recv_task.poll()).is_none());
127 }
128 
129 #[tokio::test]
close_sender_before_reserve()130 async fn close_sender_before_reserve() {
131     let (send, mut recv) = channel::<i32>(3);
132     let mut send = PollSender::new(send);
133 
134     let mut recv_task = spawn(recv.recv());
135     assert_pending!(recv_task.poll());
136 
137     send.close();
138 
139     assert!(recv_task.is_woken());
140     assert!(assert_ready!(recv_task.poll()).is_none());
141 
142     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
143     assert_ready_err!(reserve.poll());
144 }
145 
146 #[tokio::test]
close_sender_after_pending_reserve()147 async fn close_sender_after_pending_reserve() {
148     let (send, mut recv) = channel::<i32>(1);
149     let mut send = PollSender::new(send);
150 
151     let mut recv_task = spawn(recv.recv());
152     assert_pending!(recv_task.poll());
153 
154     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
155     assert_ready_ok!(reserve.poll());
156     send.send_item(1).unwrap();
157 
158     assert!(recv_task.is_woken());
159 
160     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
161     assert_pending!(reserve.poll());
162     drop(reserve);
163 
164     send.close();
165 
166     assert!(send.is_closed());
167     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
168     assert_ready_err!(reserve.poll());
169 }
170 
171 #[tokio::test]
close_sender_after_successful_reserve()172 async fn close_sender_after_successful_reserve() {
173     let (send, mut recv) = channel::<i32>(3);
174     let mut send = PollSender::new(send);
175 
176     let mut recv_task = spawn(recv.recv());
177     assert_pending!(recv_task.poll());
178 
179     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
180     assert_ready_ok!(reserve.poll());
181     drop(reserve);
182 
183     send.close();
184     assert!(send.is_closed());
185     assert!(!recv_task.is_woken());
186     assert_pending!(recv_task.poll());
187 
188     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
189     assert_ready_ok!(reserve.poll());
190 }
191 
192 #[tokio::test]
abort_send_after_pending_reserve()193 async fn abort_send_after_pending_reserve() {
194     let (send, mut recv) = channel::<i32>(1);
195     let mut send = PollSender::new(send);
196 
197     let mut recv_task = spawn(recv.recv());
198     assert_pending!(recv_task.poll());
199 
200     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
201     assert_ready_ok!(reserve.poll());
202     send.send_item(1).unwrap();
203 
204     assert_eq!(send.get_ref().unwrap().capacity(), 0);
205     assert!(!send.abort_send());
206 
207     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
208     assert_pending!(reserve.poll());
209 
210     assert!(send.abort_send());
211     assert_eq!(send.get_ref().unwrap().capacity(), 0);
212 }
213 
214 #[tokio::test]
abort_send_after_successful_reserve()215 async fn abort_send_after_successful_reserve() {
216     let (send, mut recv) = channel::<i32>(1);
217     let mut send = PollSender::new(send);
218 
219     let mut recv_task = spawn(recv.recv());
220     assert_pending!(recv_task.poll());
221 
222     assert_eq!(send.get_ref().unwrap().capacity(), 1);
223     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
224     assert_ready_ok!(reserve.poll());
225     assert_eq!(send.get_ref().unwrap().capacity(), 0);
226 
227     assert!(send.abort_send());
228     assert_eq!(send.get_ref().unwrap().capacity(), 1);
229 }
230 
231 #[tokio::test]
closed_when_receiver_drops()232 async fn closed_when_receiver_drops() {
233     let (send, _) = channel::<i32>(1);
234     let mut send = PollSender::new(send);
235 
236     let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
237     assert_ready_err!(reserve.poll());
238 }
239 
/// Calling `send_item` on a fresh `PollSender`, without any prior
/// `poll_reserve`, is expected to panic.
#[should_panic]
#[test]
fn start_send_panics_when_idle() {
    let (tx, _) = channel::<i32>(3);
    let mut sender = PollSender::new(tx);

    // No reservation was ever made.
    sender.send_item(1).unwrap();
}
248 
/// Calling `send_item` while a reservation is still pending is expected to
/// panic.
///
/// The receiver is bound to `_recv` rather than `_`. A `_` pattern drops the
/// receiver immediately, which closes the channel and makes the very first
/// `poll_reserve` fail (that is what `closed_when_receiver_drops` asserts).
/// The test would then panic on its first `assert_ready_ok!` and pass without
/// ever reaching the `send_item` call it is meant to exercise.
#[should_panic]
#[test]
fn start_send_panics_when_acquiring() {
    let (send, _recv) = channel::<i32>(1);
    let mut send = PollSender::new(send);

    // Fill the single slot so that the next reservation cannot complete.
    let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
    assert_ready_ok!(reserve.poll());
    send.send_item(1).unwrap();

    // Nothing is received, so this reservation stays pending.
    let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
    assert_pending!(reserve.poll());

    // Sending without a completed reservation is the expected panic.
    send.send_item(2).unwrap();
}
263