• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "sync")]
3 
4 #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
5 use wasm_bindgen_test::wasm_bindgen_test as test;
6 #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
7 use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test;
8 
9 #[cfg(not(all(target_family = "wasm", not(target_os = "wasi"))))]
10 use tokio::test as maybe_tokio_test;
11 
12 use tokio::sync::oneshot;
13 use tokio::sync::oneshot::error::TryRecvError;
14 use tokio_test::*;
15 
16 use std::future::Future;
17 use std::pin::Pin;
18 use std::task::{Context, Poll};
19 
20 trait AssertSend: Send {}
21 impl AssertSend for oneshot::Sender<i32> {}
22 impl AssertSend for oneshot::Receiver<i32> {}
23 
24 trait SenderExt {
poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()>25     fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()>;
26 }
27 impl<T> SenderExt for oneshot::Sender<T> {
poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()>28     fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
29         tokio::pin! {
30             let fut = self.closed();
31         }
32         fut.poll(cx)
33     }
34 }
35 
36 #[test]
send_recv()37 fn send_recv() {
38     let (tx, rx) = oneshot::channel();
39     let mut rx = task::spawn(rx);
40 
41     assert_pending!(rx.poll());
42 
43     assert_ok!(tx.send(1));
44 
45     assert!(rx.is_woken());
46 
47     let val = assert_ready_ok!(rx.poll());
48     assert_eq!(val, 1);
49 }
50 
51 #[maybe_tokio_test]
async_send_recv()52 async fn async_send_recv() {
53     let (tx, rx) = oneshot::channel();
54 
55     assert_ok!(tx.send(1));
56     assert_eq!(1, assert_ok!(rx.await));
57 }
58 
59 #[test]
close_tx()60 fn close_tx() {
61     let (tx, rx) = oneshot::channel::<i32>();
62     let mut rx = task::spawn(rx);
63 
64     assert_pending!(rx.poll());
65 
66     drop(tx);
67 
68     assert!(rx.is_woken());
69     assert_ready_err!(rx.poll());
70 }
71 
72 #[test]
close_rx()73 fn close_rx() {
74     // First, without checking poll_closed()
75     //
76     let (tx, _) = oneshot::channel();
77 
78     assert_err!(tx.send(1));
79 
80     // Second, via poll_closed();
81 
82     let (tx, rx) = oneshot::channel();
83     let mut tx = task::spawn(tx);
84 
85     assert_pending!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
86 
87     drop(rx);
88 
89     assert!(tx.is_woken());
90     assert!(tx.is_closed());
91     assert_ready!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
92 
93     assert_err!(tx.into_inner().send(1));
94 }
95 
96 #[tokio::test]
97 #[cfg(feature = "full")]
async_rx_closed()98 async fn async_rx_closed() {
99     let (mut tx, rx) = oneshot::channel::<()>();
100 
101     tokio::spawn(async move {
102         drop(rx);
103     });
104 
105     tx.closed().await;
106 }
107 
108 #[test]
explicit_close_poll()109 fn explicit_close_poll() {
110     // First, with message sent
111     let (tx, rx) = oneshot::channel();
112     let mut rx = task::spawn(rx);
113 
114     assert_ok!(tx.send(1));
115 
116     rx.close();
117 
118     let value = assert_ready_ok!(rx.poll());
119     assert_eq!(value, 1);
120 
121     // Second, without the message sent
122     let (tx, rx) = oneshot::channel::<i32>();
123     let mut tx = task::spawn(tx);
124     let mut rx = task::spawn(rx);
125 
126     assert_pending!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
127 
128     rx.close();
129 
130     assert!(tx.is_woken());
131     assert!(tx.is_closed());
132     assert_ready!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
133 
134     assert_err!(tx.into_inner().send(1));
135     assert_ready_err!(rx.poll());
136 
137     // Again, but without sending the value this time
138     let (tx, rx) = oneshot::channel::<i32>();
139     let mut tx = task::spawn(tx);
140     let mut rx = task::spawn(rx);
141 
142     assert_pending!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
143 
144     rx.close();
145 
146     assert!(tx.is_woken());
147     assert!(tx.is_closed());
148     assert_ready!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
149 
150     assert_ready_err!(rx.poll());
151 }
152 
153 #[test]
explicit_close_try_recv()154 fn explicit_close_try_recv() {
155     // First, with message sent
156     let (tx, mut rx) = oneshot::channel();
157 
158     assert_ok!(tx.send(1));
159 
160     rx.close();
161 
162     let val = assert_ok!(rx.try_recv());
163     assert_eq!(1, val);
164 
165     // Second, without the message sent
166     let (tx, mut rx) = oneshot::channel::<i32>();
167     let mut tx = task::spawn(tx);
168 
169     assert_pending!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
170 
171     rx.close();
172 
173     assert!(tx.is_woken());
174     assert!(tx.is_closed());
175     assert_ready!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
176 
177     assert_err!(rx.try_recv());
178 }
179 
180 #[test]
181 #[should_panic]
182 #[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
close_try_recv_poll()183 fn close_try_recv_poll() {
184     let (_tx, rx) = oneshot::channel::<i32>();
185     let mut rx = task::spawn(rx);
186 
187     rx.close();
188 
189     assert_err!(rx.try_recv());
190 
191     let _ = rx.poll();
192 }
193 
194 #[test]
close_after_recv()195 fn close_after_recv() {
196     let (tx, mut rx) = oneshot::channel::<i32>();
197 
198     tx.send(17).unwrap();
199 
200     assert_eq!(17, rx.try_recv().unwrap());
201     rx.close();
202 }
203 
204 #[test]
try_recv_after_completion()205 fn try_recv_after_completion() {
206     let (tx, mut rx) = oneshot::channel::<i32>();
207 
208     tx.send(17).unwrap();
209 
210     assert_eq!(17, rx.try_recv().unwrap());
211     assert_eq!(Err(TryRecvError::Closed), rx.try_recv());
212     rx.close();
213 }
214 
215 #[test]
try_recv_after_completion_await()216 fn try_recv_after_completion_await() {
217     let (tx, rx) = oneshot::channel::<i32>();
218     let mut rx = task::spawn(rx);
219 
220     tx.send(17).unwrap();
221 
222     assert_eq!(Ok(17), assert_ready!(rx.poll()));
223     assert_eq!(Err(TryRecvError::Closed), rx.try_recv());
224     rx.close();
225 }
226 
227 #[test]
drops_tasks()228 fn drops_tasks() {
229     let (mut tx, mut rx) = oneshot::channel::<i32>();
230     let mut tx_task = task::spawn(());
231     let mut rx_task = task::spawn(());
232 
233     assert_pending!(tx_task.enter(|cx, _| tx.poll_closed(cx)));
234     assert_pending!(rx_task.enter(|cx, _| Pin::new(&mut rx).poll(cx)));
235 
236     drop(tx);
237     drop(rx);
238 
239     assert_eq!(1, tx_task.waker_ref_count());
240     assert_eq!(1, rx_task.waker_ref_count());
241 }
242 
243 #[test]
receiver_changes_task()244 fn receiver_changes_task() {
245     let (tx, mut rx) = oneshot::channel();
246 
247     let mut task1 = task::spawn(());
248     let mut task2 = task::spawn(());
249 
250     assert_pending!(task1.enter(|cx, _| Pin::new(&mut rx).poll(cx)));
251 
252     assert_eq!(2, task1.waker_ref_count());
253     assert_eq!(1, task2.waker_ref_count());
254 
255     assert_pending!(task2.enter(|cx, _| Pin::new(&mut rx).poll(cx)));
256 
257     assert_eq!(1, task1.waker_ref_count());
258     assert_eq!(2, task2.waker_ref_count());
259 
260     assert_ok!(tx.send(1));
261 
262     assert!(!task1.is_woken());
263     assert!(task2.is_woken());
264 
265     assert_ready_ok!(task2.enter(|cx, _| Pin::new(&mut rx).poll(cx)));
266 }
267 
268 #[test]
sender_changes_task()269 fn sender_changes_task() {
270     let (mut tx, rx) = oneshot::channel::<i32>();
271 
272     let mut task1 = task::spawn(());
273     let mut task2 = task::spawn(());
274 
275     assert_pending!(task1.enter(|cx, _| tx.poll_closed(cx)));
276 
277     assert_eq!(2, task1.waker_ref_count());
278     assert_eq!(1, task2.waker_ref_count());
279 
280     assert_pending!(task2.enter(|cx, _| tx.poll_closed(cx)));
281 
282     assert_eq!(1, task1.waker_ref_count());
283     assert_eq!(2, task2.waker_ref_count());
284 
285     drop(rx);
286 
287     assert!(!task1.is_woken());
288     assert!(task2.is_woken());
289 
290     assert_ready!(task2.enter(|cx, _| tx.poll_closed(cx)));
291 }
292