1 use futures::future::poll_fn;
2 use tokio::sync::mpsc::channel;
3 use tokio_test::task::spawn;
4 use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok};
5 use tokio_util::sync::PollSender;
6
7 #[tokio::test]
simple()8 async fn simple() {
9 let (send, mut recv) = channel(3);
10 let mut send = PollSender::new(send);
11
12 for i in 1..=3i32 {
13 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
14 assert_ready_ok!(reserve.poll());
15 send.send_item(i).unwrap();
16 }
17
18 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
19 assert_pending!(reserve.poll());
20
21 assert_eq!(recv.recv().await.unwrap(), 1);
22 assert!(reserve.is_woken());
23 assert_ready_ok!(reserve.poll());
24
25 drop(recv);
26
27 send.send_item(42).unwrap();
28 }
29
30 #[tokio::test]
simple_ref()31 async fn simple_ref() {
32 let v = vec![1, 2, 3i32];
33
34 let (send, mut recv) = channel(3);
35 let mut send = PollSender::new(send);
36
37 for vi in v.iter() {
38 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
39 assert_ready_ok!(reserve.poll());
40 send.send_item(vi).unwrap();
41 }
42
43 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
44 assert_pending!(reserve.poll());
45
46 assert_eq!(*recv.recv().await.unwrap(), 1);
47 assert!(reserve.is_woken());
48 assert_ready_ok!(reserve.poll());
49 drop(recv);
50 send.send_item(&42).unwrap();
51 }
52
53 #[tokio::test]
repeated_poll_reserve()54 async fn repeated_poll_reserve() {
55 let (send, mut recv) = channel::<i32>(1);
56 let mut send = PollSender::new(send);
57
58 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
59 assert_ready_ok!(reserve.poll());
60 assert_ready_ok!(reserve.poll());
61 send.send_item(1).unwrap();
62
63 assert_eq!(recv.recv().await.unwrap(), 1);
64 }
65
66 #[tokio::test]
abort_send()67 async fn abort_send() {
68 let (send, mut recv) = channel(3);
69 let mut send = PollSender::new(send);
70 let send2 = send.get_ref().cloned().unwrap();
71
72 for i in 1..=3i32 {
73 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
74 assert_ready_ok!(reserve.poll());
75 send.send_item(i).unwrap();
76 }
77
78 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
79 assert_pending!(reserve.poll());
80 assert_eq!(recv.recv().await.unwrap(), 1);
81 assert!(reserve.is_woken());
82 assert_ready_ok!(reserve.poll());
83
84 let mut send2_send = spawn(send2.send(5));
85 assert_pending!(send2_send.poll());
86 assert!(send.abort_send());
87 assert!(send2_send.is_woken());
88 assert_ready_ok!(send2_send.poll());
89
90 assert_eq!(recv.recv().await.unwrap(), 2);
91 assert_eq!(recv.recv().await.unwrap(), 3);
92 assert_eq!(recv.recv().await.unwrap(), 5);
93 }
94
95 #[tokio::test]
close_sender_last()96 async fn close_sender_last() {
97 let (send, mut recv) = channel::<i32>(3);
98 let mut send = PollSender::new(send);
99
100 let mut recv_task = spawn(recv.recv());
101 assert_pending!(recv_task.poll());
102
103 send.close();
104
105 assert!(recv_task.is_woken());
106 assert!(assert_ready!(recv_task.poll()).is_none());
107 }
108
109 #[tokio::test]
close_sender_not_last()110 async fn close_sender_not_last() {
111 let (send, mut recv) = channel::<i32>(3);
112 let mut send = PollSender::new(send);
113 let send2 = send.get_ref().cloned().unwrap();
114
115 let mut recv_task = spawn(recv.recv());
116 assert_pending!(recv_task.poll());
117
118 send.close();
119
120 assert!(!recv_task.is_woken());
121 assert_pending!(recv_task.poll());
122
123 drop(send2);
124
125 assert!(recv_task.is_woken());
126 assert!(assert_ready!(recv_task.poll()).is_none());
127 }
128
129 #[tokio::test]
close_sender_before_reserve()130 async fn close_sender_before_reserve() {
131 let (send, mut recv) = channel::<i32>(3);
132 let mut send = PollSender::new(send);
133
134 let mut recv_task = spawn(recv.recv());
135 assert_pending!(recv_task.poll());
136
137 send.close();
138
139 assert!(recv_task.is_woken());
140 assert!(assert_ready!(recv_task.poll()).is_none());
141
142 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
143 assert_ready_err!(reserve.poll());
144 }
145
146 #[tokio::test]
close_sender_after_pending_reserve()147 async fn close_sender_after_pending_reserve() {
148 let (send, mut recv) = channel::<i32>(1);
149 let mut send = PollSender::new(send);
150
151 let mut recv_task = spawn(recv.recv());
152 assert_pending!(recv_task.poll());
153
154 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
155 assert_ready_ok!(reserve.poll());
156 send.send_item(1).unwrap();
157
158 assert!(recv_task.is_woken());
159
160 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
161 assert_pending!(reserve.poll());
162 drop(reserve);
163
164 send.close();
165
166 assert!(send.is_closed());
167 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
168 assert_ready_err!(reserve.poll());
169 }
170
171 #[tokio::test]
close_sender_after_successful_reserve()172 async fn close_sender_after_successful_reserve() {
173 let (send, mut recv) = channel::<i32>(3);
174 let mut send = PollSender::new(send);
175
176 let mut recv_task = spawn(recv.recv());
177 assert_pending!(recv_task.poll());
178
179 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
180 assert_ready_ok!(reserve.poll());
181 drop(reserve);
182
183 send.close();
184 assert!(send.is_closed());
185 assert!(!recv_task.is_woken());
186 assert_pending!(recv_task.poll());
187
188 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
189 assert_ready_ok!(reserve.poll());
190 }
191
192 #[tokio::test]
abort_send_after_pending_reserve()193 async fn abort_send_after_pending_reserve() {
194 let (send, mut recv) = channel::<i32>(1);
195 let mut send = PollSender::new(send);
196
197 let mut recv_task = spawn(recv.recv());
198 assert_pending!(recv_task.poll());
199
200 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
201 assert_ready_ok!(reserve.poll());
202 send.send_item(1).unwrap();
203
204 assert_eq!(send.get_ref().unwrap().capacity(), 0);
205 assert!(!send.abort_send());
206
207 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
208 assert_pending!(reserve.poll());
209
210 assert!(send.abort_send());
211 assert_eq!(send.get_ref().unwrap().capacity(), 0);
212 }
213
214 #[tokio::test]
abort_send_after_successful_reserve()215 async fn abort_send_after_successful_reserve() {
216 let (send, mut recv) = channel::<i32>(1);
217 let mut send = PollSender::new(send);
218
219 let mut recv_task = spawn(recv.recv());
220 assert_pending!(recv_task.poll());
221
222 assert_eq!(send.get_ref().unwrap().capacity(), 1);
223 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
224 assert_ready_ok!(reserve.poll());
225 assert_eq!(send.get_ref().unwrap().capacity(), 0);
226
227 assert!(send.abort_send());
228 assert_eq!(send.get_ref().unwrap().capacity(), 1);
229 }
230
231 #[tokio::test]
closed_when_receiver_drops()232 async fn closed_when_receiver_drops() {
233 let (send, _) = channel::<i32>(1);
234 let mut send = PollSender::new(send);
235
236 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
237 assert_ready_err!(reserve.poll());
238 }
239
240 #[should_panic]
241 #[test]
start_send_panics_when_idle()242 fn start_send_panics_when_idle() {
243 let (send, _) = channel::<i32>(3);
244 let mut send = PollSender::new(send);
245
246 send.send_item(1).unwrap();
247 }
248
249 #[should_panic]
250 #[test]
start_send_panics_when_acquiring()251 fn start_send_panics_when_acquiring() {
252 let (send, _) = channel::<i32>(1);
253 let mut send = PollSender::new(send);
254
255 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
256 assert_ready_ok!(reserve.poll());
257 send.send_item(1).unwrap();
258
259 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
260 assert_pending!(reserve.poll());
261 send.send_item(2).unwrap();
262 }
263