• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "full")]
3 
4 use futures::future::poll_fn;
5 use std::io;
6 use std::sync::Arc;
7 use tokio::{io::ReadBuf, net::UdpSocket};
8 use tokio_test::assert_ok;
9 
10 const MSG: &[u8] = b"hello";
11 const MSG_LEN: usize = MSG.len();
12 
13 #[tokio::test]
send_recv() -> std::io::Result<()>14 async fn send_recv() -> std::io::Result<()> {
15     let sender = UdpSocket::bind("127.0.0.1:0").await?;
16     let receiver = UdpSocket::bind("127.0.0.1:0").await?;
17 
18     sender.connect(receiver.local_addr()?).await?;
19     receiver.connect(sender.local_addr()?).await?;
20 
21     sender.send(MSG).await?;
22 
23     let mut recv_buf = [0u8; 32];
24     let len = receiver.recv(&mut recv_buf[..]).await?;
25 
26     assert_eq!(&recv_buf[..len], MSG);
27     Ok(())
28 }
29 
30 #[tokio::test]
send_recv_poll() -> std::io::Result<()>31 async fn send_recv_poll() -> std::io::Result<()> {
32     let sender = UdpSocket::bind("127.0.0.1:0").await?;
33     let receiver = UdpSocket::bind("127.0.0.1:0").await?;
34 
35     sender.connect(receiver.local_addr()?).await?;
36     receiver.connect(sender.local_addr()?).await?;
37 
38     poll_fn(|cx| sender.poll_send(cx, MSG)).await?;
39 
40     let mut recv_buf = [0u8; 32];
41     let mut read = ReadBuf::new(&mut recv_buf);
42     let _len = poll_fn(|cx| receiver.poll_recv(cx, &mut read)).await?;
43 
44     assert_eq!(read.filled(), MSG);
45     Ok(())
46 }
47 
48 #[tokio::test]
send_to_recv_from() -> std::io::Result<()>49 async fn send_to_recv_from() -> std::io::Result<()> {
50     let sender = UdpSocket::bind("127.0.0.1:0").await?;
51     let receiver = UdpSocket::bind("127.0.0.1:0").await?;
52 
53     let receiver_addr = receiver.local_addr()?;
54     sender.send_to(MSG, &receiver_addr).await?;
55 
56     let mut recv_buf = [0u8; 32];
57     let (len, addr) = receiver.recv_from(&mut recv_buf[..]).await?;
58 
59     assert_eq!(&recv_buf[..len], MSG);
60     assert_eq!(addr, sender.local_addr()?);
61     Ok(())
62 }
63 
64 #[tokio::test]
send_to_recv_from_poll() -> std::io::Result<()>65 async fn send_to_recv_from_poll() -> std::io::Result<()> {
66     let sender = UdpSocket::bind("127.0.0.1:0").await?;
67     let receiver = UdpSocket::bind("127.0.0.1:0").await?;
68 
69     let receiver_addr = receiver.local_addr()?;
70     poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?;
71 
72     let mut recv_buf = [0u8; 32];
73     let mut read = ReadBuf::new(&mut recv_buf);
74     let addr = poll_fn(|cx| receiver.poll_recv_from(cx, &mut read)).await?;
75 
76     assert_eq!(read.filled(), MSG);
77     assert_eq!(addr, sender.local_addr()?);
78     Ok(())
79 }
80 
81 #[tokio::test]
send_to_peek_from() -> std::io::Result<()>82 async fn send_to_peek_from() -> std::io::Result<()> {
83     let sender = UdpSocket::bind("127.0.0.1:0").await?;
84     let receiver = UdpSocket::bind("127.0.0.1:0").await?;
85 
86     let receiver_addr = receiver.local_addr()?;
87     poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?;
88 
89     // peek
90     let mut recv_buf = [0u8; 32];
91     let (n, addr) = receiver.peek_from(&mut recv_buf).await?;
92     assert_eq!(&recv_buf[..n], MSG);
93     assert_eq!(addr, sender.local_addr()?);
94 
95     // peek
96     let mut recv_buf = [0u8; 32];
97     let (n, addr) = receiver.peek_from(&mut recv_buf).await?;
98     assert_eq!(&recv_buf[..n], MSG);
99     assert_eq!(addr, sender.local_addr()?);
100 
101     let mut recv_buf = [0u8; 32];
102     let (n, addr) = receiver.recv_from(&mut recv_buf).await?;
103     assert_eq!(&recv_buf[..n], MSG);
104     assert_eq!(addr, sender.local_addr()?);
105 
106     Ok(())
107 }
108 
109 #[tokio::test]
send_to_peek_from_poll() -> std::io::Result<()>110 async fn send_to_peek_from_poll() -> std::io::Result<()> {
111     let sender = UdpSocket::bind("127.0.0.1:0").await?;
112     let receiver = UdpSocket::bind("127.0.0.1:0").await?;
113 
114     let receiver_addr = receiver.local_addr()?;
115     poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?;
116 
117     let mut recv_buf = [0u8; 32];
118     let mut read = ReadBuf::new(&mut recv_buf);
119     let addr = poll_fn(|cx| receiver.poll_peek_from(cx, &mut read)).await?;
120 
121     assert_eq!(read.filled(), MSG);
122     assert_eq!(addr, sender.local_addr()?);
123 
124     let mut recv_buf = [0u8; 32];
125     let mut read = ReadBuf::new(&mut recv_buf);
126     poll_fn(|cx| receiver.poll_peek_from(cx, &mut read)).await?;
127 
128     assert_eq!(read.filled(), MSG);
129     let mut recv_buf = [0u8; 32];
130     let mut read = ReadBuf::new(&mut recv_buf);
131 
132     poll_fn(|cx| receiver.poll_recv_from(cx, &mut read)).await?;
133     assert_eq!(read.filled(), MSG);
134     Ok(())
135 }
136 
137 #[tokio::test]
split() -> std::io::Result<()>138 async fn split() -> std::io::Result<()> {
139     let socket = UdpSocket::bind("127.0.0.1:0").await?;
140     let s = Arc::new(socket);
141     let r = s.clone();
142 
143     let addr = s.local_addr()?;
144     tokio::spawn(async move {
145         s.send_to(MSG, &addr).await.unwrap();
146     });
147     let mut recv_buf = [0u8; 32];
148     let (len, _) = r.recv_from(&mut recv_buf[..]).await?;
149     assert_eq!(&recv_buf[..len], MSG);
150     Ok(())
151 }
152 
153 #[tokio::test]
split_chan() -> std::io::Result<()>154 async fn split_chan() -> std::io::Result<()> {
155     // setup UdpSocket that will echo all sent items
156     let socket = UdpSocket::bind("127.0.0.1:0").await?;
157     let addr = socket.local_addr().unwrap();
158     let s = Arc::new(socket);
159     let r = s.clone();
160 
161     let (tx, mut rx) = tokio::sync::mpsc::channel::<(Vec<u8>, std::net::SocketAddr)>(1_000);
162     tokio::spawn(async move {
163         while let Some((bytes, addr)) = rx.recv().await {
164             s.send_to(&bytes, &addr).await.unwrap();
165         }
166     });
167 
168     tokio::spawn(async move {
169         let mut buf = [0u8; 32];
170         loop {
171             let (len, addr) = r.recv_from(&mut buf).await.unwrap();
172             tx.send((buf[..len].to_vec(), addr)).await.unwrap();
173         }
174     });
175 
176     // test that we can send a value and get back some response
177     let sender = UdpSocket::bind("127.0.0.1:0").await?;
178     sender.send_to(MSG, addr).await?;
179     let mut recv_buf = [0u8; 32];
180     let (len, _) = sender.recv_from(&mut recv_buf).await?;
181     assert_eq!(&recv_buf[..len], MSG);
182     Ok(())
183 }
184 
185 #[tokio::test]
split_chan_poll() -> std::io::Result<()>186 async fn split_chan_poll() -> std::io::Result<()> {
187     // setup UdpSocket that will echo all sent items
188     let socket = UdpSocket::bind("127.0.0.1:0").await?;
189     let addr = socket.local_addr().unwrap();
190     let s = Arc::new(socket);
191     let r = s.clone();
192 
193     let (tx, mut rx) = tokio::sync::mpsc::channel::<(Vec<u8>, std::net::SocketAddr)>(1_000);
194     tokio::spawn(async move {
195         while let Some((bytes, addr)) = rx.recv().await {
196             poll_fn(|cx| s.poll_send_to(cx, &bytes, addr))
197                 .await
198                 .unwrap();
199         }
200     });
201 
202     tokio::spawn(async move {
203         let mut recv_buf = [0u8; 32];
204         let mut read = ReadBuf::new(&mut recv_buf);
205         loop {
206             let addr = poll_fn(|cx| r.poll_recv_from(cx, &mut read)).await.unwrap();
207             tx.send((read.filled().to_vec(), addr)).await.unwrap();
208         }
209     });
210 
211     // test that we can send a value and get back some response
212     let sender = UdpSocket::bind("127.0.0.1:0").await?;
213     poll_fn(|cx| sender.poll_send_to(cx, MSG, addr)).await?;
214 
215     let mut recv_buf = [0u8; 32];
216     let mut read = ReadBuf::new(&mut recv_buf);
217     let _ = poll_fn(|cx| sender.poll_recv_from(cx, &mut read)).await?;
218     assert_eq!(read.filled(), MSG);
219     Ok(())
220 }
221 
222 // # Note
223 //
224 // This test is purposely written such that each time `sender` sends data on
225 // the socket, `receiver` awaits the data. On Unix, it would be okay waiting
226 // until the end of the test to receive all the data. On Windows, this would
227 // **not** be okay because it's resources are completion based (via IOCP).
228 // If data is sent and not yet received, attempting to send more data will
229 // result in `ErrorKind::WouldBlock` until the first operation completes.
230 #[tokio::test]
try_send_spawn()231 async fn try_send_spawn() {
232     const MSG2: &[u8] = b"world!";
233     const MSG2_LEN: usize = MSG2.len();
234 
235     let sender = UdpSocket::bind("127.0.0.1:0").await.unwrap();
236     let receiver = UdpSocket::bind("127.0.0.1:0").await.unwrap();
237 
238     receiver
239         .connect(sender.local_addr().unwrap())
240         .await
241         .unwrap();
242 
243     sender.writable().await.unwrap();
244 
245     let sent = &sender
246         .try_send_to(MSG, receiver.local_addr().unwrap())
247         .unwrap();
248     assert_eq!(sent, &MSG_LEN);
249     let mut buf = [0u8; 32];
250     let mut received = receiver.recv(&mut buf[..]).await.unwrap();
251 
252     sender
253         .connect(receiver.local_addr().unwrap())
254         .await
255         .unwrap();
256     let sent = &sender.try_send(MSG2).unwrap();
257     assert_eq!(sent, &MSG2_LEN);
258     received += receiver.recv(&mut buf[..]).await.unwrap();
259 
260     std::thread::spawn(move || {
261         let sent = &sender.try_send(MSG).unwrap();
262         assert_eq!(sent, &MSG_LEN);
263     })
264     .join()
265     .unwrap();
266     received += receiver.recv(&mut buf[..]).await.unwrap();
267 
268     assert_eq!(received, MSG_LEN * 2 + MSG2_LEN);
269 }
270 
271 #[tokio::test]
try_send_recv()272 async fn try_send_recv() {
273     // Create listener
274     let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
275 
276     // Create socket pair
277     let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
278 
279     // Connect the two
280     client.connect(server.local_addr().unwrap()).await.unwrap();
281     server.connect(client.local_addr().unwrap()).await.unwrap();
282 
283     for _ in 0..5 {
284         loop {
285             client.writable().await.unwrap();
286 
287             match client.try_send(b"hello world") {
288                 Ok(n) => {
289                     assert_eq!(n, 11);
290                     break;
291                 }
292                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
293                 Err(e) => panic!("{:?}", e),
294             }
295         }
296 
297         loop {
298             server.readable().await.unwrap();
299 
300             let mut buf = [0; 512];
301 
302             match server.try_recv(&mut buf) {
303                 Ok(n) => {
304                     assert_eq!(n, 11);
305                     assert_eq!(&buf[0..11], &b"hello world"[..]);
306                     break;
307                 }
308                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
309                 Err(e) => panic!("{:?}", e),
310             }
311         }
312     }
313 }
314 
315 #[tokio::test]
try_send_to_recv_from()316 async fn try_send_to_recv_from() {
317     // Create listener
318     let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
319     let saddr = server.local_addr().unwrap();
320 
321     // Create socket pair
322     let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
323     let caddr = client.local_addr().unwrap();
324 
325     for _ in 0..5 {
326         loop {
327             client.writable().await.unwrap();
328 
329             match client.try_send_to(b"hello world", saddr) {
330                 Ok(n) => {
331                     assert_eq!(n, 11);
332                     break;
333                 }
334                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
335                 Err(e) => panic!("{:?}", e),
336             }
337         }
338 
339         loop {
340             server.readable().await.unwrap();
341 
342             let mut buf = [0; 512];
343 
344             match server.try_recv_from(&mut buf) {
345                 Ok((n, addr)) => {
346                     assert_eq!(n, 11);
347                     assert_eq!(addr, caddr);
348                     assert_eq!(&buf[0..11], &b"hello world"[..]);
349                     break;
350                 }
351                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
352                 Err(e) => panic!("{:?}", e),
353             }
354         }
355     }
356 }
357 
358 #[tokio::test]
try_recv_buf()359 async fn try_recv_buf() {
360     // Create listener
361     let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
362 
363     // Create socket pair
364     let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
365 
366     // Connect the two
367     client.connect(server.local_addr().unwrap()).await.unwrap();
368     server.connect(client.local_addr().unwrap()).await.unwrap();
369 
370     for _ in 0..5 {
371         loop {
372             client.writable().await.unwrap();
373 
374             match client.try_send(b"hello world") {
375                 Ok(n) => {
376                     assert_eq!(n, 11);
377                     break;
378                 }
379                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
380                 Err(e) => panic!("{:?}", e),
381             }
382         }
383 
384         loop {
385             server.readable().await.unwrap();
386 
387             let mut buf = Vec::with_capacity(512);
388 
389             match server.try_recv_buf(&mut buf) {
390                 Ok(n) => {
391                     assert_eq!(n, 11);
392                     assert_eq!(&buf[0..11], &b"hello world"[..]);
393                     break;
394                 }
395                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
396                 Err(e) => panic!("{:?}", e),
397             }
398         }
399     }
400 }
401 
402 #[tokio::test]
try_recv_buf_from()403 async fn try_recv_buf_from() {
404     // Create listener
405     let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
406     let saddr = server.local_addr().unwrap();
407 
408     // Create socket pair
409     let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
410     let caddr = client.local_addr().unwrap();
411 
412     for _ in 0..5 {
413         loop {
414             client.writable().await.unwrap();
415 
416             match client.try_send_to(b"hello world", saddr) {
417                 Ok(n) => {
418                     assert_eq!(n, 11);
419                     break;
420                 }
421                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
422                 Err(e) => panic!("{:?}", e),
423             }
424         }
425 
426         loop {
427             server.readable().await.unwrap();
428 
429             let mut buf = Vec::with_capacity(512);
430 
431             match server.try_recv_buf_from(&mut buf) {
432                 Ok((n, addr)) => {
433                     assert_eq!(n, 11);
434                     assert_eq!(addr, caddr);
435                     assert_eq!(&buf[0..11], &b"hello world"[..]);
436                     break;
437                 }
438                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
439                 Err(e) => panic!("{:?}", e),
440             }
441         }
442     }
443 }
444 
445 #[tokio::test]
poll_ready()446 async fn poll_ready() {
447     // Create listener
448     let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
449     let saddr = server.local_addr().unwrap();
450 
451     // Create socket pair
452     let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
453     let caddr = client.local_addr().unwrap();
454 
455     for _ in 0..5 {
456         loop {
457             assert_ok!(poll_fn(|cx| client.poll_send_ready(cx)).await);
458 
459             match client.try_send_to(b"hello world", saddr) {
460                 Ok(n) => {
461                     assert_eq!(n, 11);
462                     break;
463                 }
464                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
465                 Err(e) => panic!("{:?}", e),
466             }
467         }
468 
469         loop {
470             assert_ok!(poll_fn(|cx| server.poll_recv_ready(cx)).await);
471 
472             let mut buf = Vec::with_capacity(512);
473 
474             match server.try_recv_buf_from(&mut buf) {
475                 Ok((n, addr)) => {
476                     assert_eq!(n, 11);
477                     assert_eq!(addr, caddr);
478                     assert_eq!(&buf[0..11], &b"hello world"[..]);
479                     break;
480                 }
481                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
482                 Err(e) => panic!("{:?}", e),
483             }
484         }
485     }
486 }
487