1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "sync")]
3
4 #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
5 use wasm_bindgen_test::wasm_bindgen_test as test;
6 #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
7 use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test;
8
9 #[cfg(not(all(target_family = "wasm", not(target_os = "wasi"))))]
10 use tokio::test as maybe_tokio_test;
11
12 use tokio::sync::oneshot;
13 use tokio::sync::oneshot::error::TryRecvError;
14 use tokio_test::*;
15
16 use std::future::Future;
17 use std::pin::Pin;
18 use std::task::{Context, Poll};
19
20 trait AssertSend: Send {}
21 impl AssertSend for oneshot::Sender<i32> {}
22 impl AssertSend for oneshot::Receiver<i32> {}
23
/// Test-only extension trait exposing a poll-style check for closure.
trait SenderExt {
    /// Polls for closure once using the task context `cx`.
    ///
    /// Returns `Poll::Ready(())` or `Poll::Pending`, as decided by the
    /// implementor.
    fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()>;
}
27 impl<T> SenderExt for oneshot::Sender<T> {
poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()>28 fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
29 tokio::pin! {
30 let fut = self.closed();
31 }
32 fut.poll(cx)
33 }
34 }
35
// A receiver that polled before the send is woken by the send and then
// yields the sent value.
#[test]
fn send_recv() {
    let (sender, receiver) = oneshot::channel();
    let mut receiver = task::spawn(receiver);

    // Nothing has been sent yet.
    assert_pending!(receiver.poll());

    assert_ok!(sender.send(1));

    // The send must have notified the receiver task.
    assert!(receiver.is_woken());

    let received = assert_ready_ok!(receiver.poll());
    assert_eq!(received, 1);
}
50
51 #[maybe_tokio_test]
async_send_recv()52 async fn async_send_recv() {
53 let (tx, rx) = oneshot::channel();
54
55 assert_ok!(tx.send(1));
56 assert_eq!(1, assert_ok!(rx.await));
57 }
58
// Dropping the sender without sending wakes the receiver, whose next poll
// completes with an error.
#[test]
fn close_tx() {
    let (sender, receiver) = oneshot::channel::<i32>();
    let mut receiver = task::spawn(receiver);

    assert_pending!(receiver.poll());

    drop(sender);

    assert!(receiver.is_woken());
    assert_ready_err!(receiver.poll());
}
71
// Dropping the receiver makes sends fail and is observable via `poll_closed`.
#[test]
fn close_rx() {
    // Case 1: the receiver is gone before the sender ever polls for closure.
    let (sender, receiver) = oneshot::channel();
    drop(receiver);

    assert_err!(sender.send(1));

    // Case 2: the sender is already waiting in `poll_closed` when the
    // receiver is dropped.
    let (sender, receiver) = oneshot::channel();
    let mut sender = task::spawn(sender);

    assert_pending!(sender.enter(|cx, mut sender| sender.poll_closed(cx)));

    drop(receiver);

    assert!(sender.is_woken());
    assert!(sender.is_closed());
    assert_ready!(sender.enter(|cx, mut sender| sender.poll_closed(cx)));

    assert_err!(sender.into_inner().send(1));
}
95
// Awaiting `closed()` finishes after a spawned task drops the receiver.
#[tokio::test]
#[cfg(feature = "full")]
async fn async_rx_closed() {
    let (mut sender, receiver) = oneshot::channel::<()>();

    // Move the receiver into a separate task whose only job is to drop it.
    tokio::spawn(async move { drop(receiver) });

    sender.closed().await;
}
107
// Exercises `Receiver::close()` followed by polling the receiver.
#[test]
fn explicit_close_poll() {
    // Scenario 1: a value was sent before `close()`; polling still yields it.
    let (sender, receiver) = oneshot::channel();
    let mut receiver = task::spawn(receiver);

    assert_ok!(sender.send(1));

    receiver.close();

    let received = assert_ready_ok!(receiver.poll());
    assert_eq!(received, 1);

    // Scenario 2: nothing was sent before `close()`. The waiting sender is
    // woken, a later send fails, and the receiver polls to an error.
    let (sender, receiver) = oneshot::channel::<i32>();
    let mut sender = task::spawn(sender);
    let mut receiver = task::spawn(receiver);

    assert_pending!(sender.enter(|cx, mut sender| sender.poll_closed(cx)));

    receiver.close();

    assert!(sender.is_woken());
    assert!(sender.is_closed());
    assert_ready!(sender.enter(|cx, mut sender| sender.poll_closed(cx)));

    assert_err!(sender.into_inner().send(1));
    assert_ready_err!(receiver.poll());

    // Scenario 3: same as scenario 2, but no send is attempted after close.
    let (sender, receiver) = oneshot::channel::<i32>();
    let mut sender = task::spawn(sender);
    let mut receiver = task::spawn(receiver);

    assert_pending!(sender.enter(|cx, mut sender| sender.poll_closed(cx)));

    receiver.close();

    assert!(sender.is_woken());
    assert!(sender.is_closed());
    assert_ready!(sender.enter(|cx, mut sender| sender.poll_closed(cx)));

    assert_ready_err!(receiver.poll());
}
152
// Exercises `Receiver::close()` followed by `try_recv()`.
#[test]
fn explicit_close_try_recv() {
    // Scenario 1: a value was sent before `close()`; `try_recv` returns it.
    let (sender, mut receiver) = oneshot::channel();

    assert_ok!(sender.send(1));

    receiver.close();

    let received = assert_ok!(receiver.try_recv());
    assert_eq!(1, received);

    // Scenario 2: nothing was sent before `close()`. The waiting sender is
    // woken and `try_recv` reports an error.
    let (sender, mut receiver) = oneshot::channel::<i32>();
    let mut sender = task::spawn(sender);

    assert_pending!(sender.enter(|cx, mut sender| sender.poll_closed(cx)));

    receiver.close();

    assert!(sender.is_woken());
    assert!(sender.is_closed());
    assert_ready!(sender.enter(|cx, mut sender| sender.poll_closed(cx)));

    assert_err!(receiver.try_recv());
}
179
// After `close()` and a failed `try_recv()`, the test is expected to panic;
// the only statement left to do so is the final poll.
#[test]
#[should_panic]
#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
fn close_try_recv_poll() {
    // Keep the sender alive for the whole test.
    let (_sender, receiver) = oneshot::channel::<i32>();
    let mut receiver = task::spawn(receiver);

    receiver.close();

    assert_err!(receiver.try_recv());

    // The poll result is irrelevant; only the expected panic matters.
    let _ = receiver.poll();
}
193
// Calling `close()` after the value has already been received is allowed.
#[test]
fn close_after_recv() {
    let (sender, mut receiver) = oneshot::channel::<i32>();

    sender.send(17).unwrap();

    let received = receiver.try_recv().unwrap();
    assert_eq!(17, received);

    receiver.close();
}
203
// Once the value has been taken with `try_recv()`, a second `try_recv()`
// reports `Closed`.
#[test]
fn try_recv_after_completion() {
    let (sender, mut receiver) = oneshot::channel::<i32>();

    sender.send(17).unwrap();

    let first = receiver.try_recv().unwrap();
    assert_eq!(17, first);

    let second = receiver.try_recv();
    assert_eq!(Err(TryRecvError::Closed), second);

    receiver.close();
}
214
// Once the value has been taken by polling the receiver, `try_recv()`
// reports `Closed`.
#[test]
fn try_recv_after_completion_await() {
    let (sender, receiver) = oneshot::channel::<i32>();
    let mut receiver = task::spawn(receiver);

    sender.send(17).unwrap();

    let polled = assert_ready!(receiver.poll());
    assert_eq!(Ok(17), polled);

    let retried = receiver.try_recv();
    assert_eq!(Err(TryRecvError::Closed), retried);

    receiver.close();
}
226
// After both channel halves are dropped, each mock task's waker reference
// count is back to 1.
#[test]
fn drops_tasks() {
    let (mut sender, mut receiver) = oneshot::channel::<i32>();
    let mut sender_task = task::spawn(());
    let mut receiver_task = task::spawn(());

    // Poll each half once, each from its own task context.
    assert_pending!(sender_task.enter(|cx, _| sender.poll_closed(cx)));
    assert_pending!(receiver_task.enter(|cx, _| Pin::new(&mut receiver).poll(cx)));

    drop(sender);
    drop(receiver);

    assert_eq!(1, sender_task.waker_ref_count());
    assert_eq!(1, receiver_task.waker_ref_count());
}
242
// Polling the receiver from a second task moves the waker registration to
// that task, so only the second task is woken by the send.
#[test]
fn receiver_changes_task() {
    let (sender, mut receiver) = oneshot::channel();

    let mut first_task = task::spawn(());
    let mut second_task = task::spawn(());

    // Poll from the first task: its waker count goes to 2.
    assert_pending!(first_task.enter(|cx, _| Pin::new(&mut receiver).poll(cx)));

    assert_eq!(2, first_task.waker_ref_count());
    assert_eq!(1, second_task.waker_ref_count());

    // Poll from the second task: the counts swap.
    assert_pending!(second_task.enter(|cx, _| Pin::new(&mut receiver).poll(cx)));

    assert_eq!(1, first_task.waker_ref_count());
    assert_eq!(2, second_task.waker_ref_count());

    assert_ok!(sender.send(1));

    // Only the task that polled last is notified.
    assert!(!first_task.is_woken());
    assert!(second_task.is_woken());

    assert_ready_ok!(second_task.enter(|cx, _| Pin::new(&mut receiver).poll(cx)));
}
267
// Calling `poll_closed` from a second task moves the waker registration to
// that task, so only the second task is woken when the receiver drops.
#[test]
fn sender_changes_task() {
    let (mut sender, receiver) = oneshot::channel::<i32>();

    let mut first_task = task::spawn(());
    let mut second_task = task::spawn(());

    // Poll from the first task: its waker count goes to 2.
    assert_pending!(first_task.enter(|cx, _| sender.poll_closed(cx)));

    assert_eq!(2, first_task.waker_ref_count());
    assert_eq!(1, second_task.waker_ref_count());

    // Poll from the second task: the counts swap.
    assert_pending!(second_task.enter(|cx, _| sender.poll_closed(cx)));

    assert_eq!(1, first_task.waker_ref_count());
    assert_eq!(2, second_task.waker_ref_count());

    drop(receiver);

    // Only the task that polled last is notified.
    assert!(!first_task.is_woken());
    assert!(second_task.is_woken());

    assert_ready!(second_task.enter(|cx, _| sender.poll_closed(cx)));
}
292