1 use futures::future::poll_fn;
2 use tokio::sync::mpsc::channel;
3 use tokio_test::task::spawn;
4 use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok};
5 use tokio_util::sync::PollSender;
6
7 #[tokio::test]
simple()8 async fn simple() {
9 let (send, mut recv) = channel(3);
10 let mut send = PollSender::new(send);
11
12 for i in 1..=3i32 {
13 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
14 assert_ready_ok!(reserve.poll());
15 send.send_item(i).unwrap();
16 }
17
18 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
19 assert_pending!(reserve.poll());
20
21 assert_eq!(recv.recv().await.unwrap(), 1);
22 assert!(reserve.is_woken());
23 assert_ready_ok!(reserve.poll());
24
25 drop(recv);
26
27 send.send_item(42).unwrap();
28 }
29
30 #[tokio::test]
repeated_poll_reserve()31 async fn repeated_poll_reserve() {
32 let (send, mut recv) = channel::<i32>(1);
33 let mut send = PollSender::new(send);
34
35 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
36 assert_ready_ok!(reserve.poll());
37 assert_ready_ok!(reserve.poll());
38 send.send_item(1).unwrap();
39
40 assert_eq!(recv.recv().await.unwrap(), 1);
41 }
42
43 #[tokio::test]
abort_send()44 async fn abort_send() {
45 let (send, mut recv) = channel(3);
46 let mut send = PollSender::new(send);
47 let send2 = send.get_ref().cloned().unwrap();
48
49 for i in 1..=3i32 {
50 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
51 assert_ready_ok!(reserve.poll());
52 send.send_item(i).unwrap();
53 }
54
55 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
56 assert_pending!(reserve.poll());
57 assert_eq!(recv.recv().await.unwrap(), 1);
58 assert!(reserve.is_woken());
59 assert_ready_ok!(reserve.poll());
60
61 let mut send2_send = spawn(send2.send(5));
62 assert_pending!(send2_send.poll());
63 assert!(send.abort_send());
64 assert!(send2_send.is_woken());
65 assert_ready_ok!(send2_send.poll());
66
67 assert_eq!(recv.recv().await.unwrap(), 2);
68 assert_eq!(recv.recv().await.unwrap(), 3);
69 assert_eq!(recv.recv().await.unwrap(), 5);
70 }
71
72 #[tokio::test]
close_sender_last()73 async fn close_sender_last() {
74 let (send, mut recv) = channel::<i32>(3);
75 let mut send = PollSender::new(send);
76
77 let mut recv_task = spawn(recv.recv());
78 assert_pending!(recv_task.poll());
79
80 send.close();
81
82 assert!(recv_task.is_woken());
83 assert!(assert_ready!(recv_task.poll()).is_none());
84 }
85
86 #[tokio::test]
close_sender_not_last()87 async fn close_sender_not_last() {
88 let (send, mut recv) = channel::<i32>(3);
89 let mut send = PollSender::new(send);
90 let send2 = send.get_ref().cloned().unwrap();
91
92 let mut recv_task = spawn(recv.recv());
93 assert_pending!(recv_task.poll());
94
95 send.close();
96
97 assert!(!recv_task.is_woken());
98 assert_pending!(recv_task.poll());
99
100 drop(send2);
101
102 assert!(recv_task.is_woken());
103 assert!(assert_ready!(recv_task.poll()).is_none());
104 }
105
106 #[tokio::test]
close_sender_before_reserve()107 async fn close_sender_before_reserve() {
108 let (send, mut recv) = channel::<i32>(3);
109 let mut send = PollSender::new(send);
110
111 let mut recv_task = spawn(recv.recv());
112 assert_pending!(recv_task.poll());
113
114 send.close();
115
116 assert!(recv_task.is_woken());
117 assert!(assert_ready!(recv_task.poll()).is_none());
118
119 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
120 assert_ready_err!(reserve.poll());
121 }
122
123 #[tokio::test]
close_sender_after_pending_reserve()124 async fn close_sender_after_pending_reserve() {
125 let (send, mut recv) = channel::<i32>(1);
126 let mut send = PollSender::new(send);
127
128 let mut recv_task = spawn(recv.recv());
129 assert_pending!(recv_task.poll());
130
131 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
132 assert_ready_ok!(reserve.poll());
133 send.send_item(1).unwrap();
134
135 assert!(recv_task.is_woken());
136
137 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
138 assert_pending!(reserve.poll());
139 drop(reserve);
140
141 send.close();
142
143 assert!(send.is_closed());
144 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
145 assert_ready_err!(reserve.poll());
146 }
147
148 #[tokio::test]
close_sender_after_successful_reserve()149 async fn close_sender_after_successful_reserve() {
150 let (send, mut recv) = channel::<i32>(3);
151 let mut send = PollSender::new(send);
152
153 let mut recv_task = spawn(recv.recv());
154 assert_pending!(recv_task.poll());
155
156 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
157 assert_ready_ok!(reserve.poll());
158 drop(reserve);
159
160 send.close();
161 assert!(send.is_closed());
162 assert!(!recv_task.is_woken());
163 assert_pending!(recv_task.poll());
164
165 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
166 assert_ready_ok!(reserve.poll());
167 }
168
169 #[tokio::test]
abort_send_after_pending_reserve()170 async fn abort_send_after_pending_reserve() {
171 let (send, mut recv) = channel::<i32>(1);
172 let mut send = PollSender::new(send);
173
174 let mut recv_task = spawn(recv.recv());
175 assert_pending!(recv_task.poll());
176
177 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
178 assert_ready_ok!(reserve.poll());
179 send.send_item(1).unwrap();
180
181 assert_eq!(send.get_ref().unwrap().capacity(), 0);
182 assert!(!send.abort_send());
183
184 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
185 assert_pending!(reserve.poll());
186
187 assert!(send.abort_send());
188 assert_eq!(send.get_ref().unwrap().capacity(), 0);
189 }
190
191 #[tokio::test]
abort_send_after_successful_reserve()192 async fn abort_send_after_successful_reserve() {
193 let (send, mut recv) = channel::<i32>(1);
194 let mut send = PollSender::new(send);
195
196 let mut recv_task = spawn(recv.recv());
197 assert_pending!(recv_task.poll());
198
199 assert_eq!(send.get_ref().unwrap().capacity(), 1);
200 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
201 assert_ready_ok!(reserve.poll());
202 assert_eq!(send.get_ref().unwrap().capacity(), 0);
203
204 assert!(send.abort_send());
205 assert_eq!(send.get_ref().unwrap().capacity(), 1);
206 }
207
208 #[tokio::test]
closed_when_receiver_drops()209 async fn closed_when_receiver_drops() {
210 let (send, _) = channel::<i32>(1);
211 let mut send = PollSender::new(send);
212
213 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
214 assert_ready_err!(reserve.poll());
215 }
216
/// Calling `send_item` without any prior `poll_reserve` is expected to panic.
#[should_panic]
#[test]
fn start_send_panics_when_idle() {
    let (tx, rx) = channel::<i32>(3);
    // The receiver is not needed; release it right away.
    drop(rx);
    let mut tx = PollSender::new(tx);

    let outcome = tx.send_item(1);
    outcome.unwrap();
}
225
/// Calling `send_item` while a reservation is still pending is expected to
/// panic.
///
/// The channel must stay open for the whole test: every step before the final
/// `send_item` has to succeed so that the panic observed by `#[should_panic]`
/// can only come from that last call.
#[should_panic]
#[test]
fn start_send_panics_when_acquiring() {
    // Bind the receiver to a named variable so it lives until the end of the
    // test. A bare `_` pattern drops it immediately, which closes the channel;
    // the first `assert_ready_ok!` below would then panic and the test would
    // pass without ever reaching the code under test.
    let (send, _recv) = channel::<i32>(1);
    let mut send = PollSender::new(send);

    // Fill the single slot of the channel.
    let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
    assert_ready_ok!(reserve.poll());
    send.send_item(1).unwrap();

    // The channel is full, so this reservation stays pending.
    let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
    assert_pending!(reserve.poll());

    // No slot has been reserved yet: this call is the one expected to panic.
    send.send_item(2).unwrap();
}
240